From e22c69ad0ff020122f9e8871b350a34abf012f53 Mon Sep 17 00:00:00 2001 From: fg-admin Date: Wed, 25 Jan 2023 13:56:13 +0300 Subject: [PATCH] Initial commit --- .gitignore | 2 + LICENSE | 202 ++++++++++++++++++++++++ README.md | 53 +++++++ cmd/example/system/disk.go | 70 +++++++++ cmd/example/system/link.go | 48 ++++++ cmd/example/system/main.go | 77 ++++++++++ cmd/example/system/node.go | 113 ++++++++++++++ cmd/example/system/object.go | 18 +++ cmd/example/system/qdsl.go | 67 ++++++++ cmd/example/system/trigger.go | 44 ++++++ cmd/example/system/type.go | 73 +++++++++ go.mod | 41 +++++ go.sum | 109 +++++++++++++ pkg/client/system/advanced_links.go | 62 ++++++++ pkg/client/system/errors/errors.go | 16 ++ pkg/client/system/links.go | 190 +++++++++++++++++++++++ pkg/client/system/objects.go | 131 ++++++++++++++++ pkg/client/system/register.go | 38 +++++ pkg/client/system/router.go | 30 ++++ pkg/client/system/system.go | 14 ++ pkg/client/system/types.go | 230 ++++++++++++++++++++++++++++ pkg/executor/executor.go | 145 ++++++++++++++++++ pkg/log/log.go | 16 ++ pkg/module/context.go | 37 +++++ pkg/module/error.go | 23 +++ pkg/module/message.go | 23 +++ pkg/module/middleware.go | 84 ++++++++++ pkg/module/module.go | 107 +++++++++++++ pkg/module/opts.go | 21 +++ 29 files changed, 2084 insertions(+) create mode 100644 .gitignore create mode 100644 LICENSE create mode 100644 README.md create mode 100644 cmd/example/system/disk.go create mode 100644 cmd/example/system/link.go create mode 100644 cmd/example/system/main.go create mode 100644 cmd/example/system/node.go create mode 100644 cmd/example/system/object.go create mode 100644 cmd/example/system/qdsl.go create mode 100644 cmd/example/system/trigger.go create mode 100644 cmd/example/system/type.go create mode 100644 go.mod create mode 100644 go.sum create mode 100644 pkg/client/system/advanced_links.go create mode 100644 pkg/client/system/errors/errors.go create mode 100644 pkg/client/system/links.go create mode 100644 pkg/client/system/objects.go create mode 100644 pkg/client/system/register.go create mode 100644 pkg/client/system/router.go create mode 100644 pkg/client/system/system.go create mode 100644 pkg/client/system/types.go create mode 100644 pkg/executor/executor.go create mode 100644 pkg/log/log.go create mode 100644 pkg/module/context.go create mode 100644 pkg/module/error.go create mode 100644 pkg/module/message.go create mode 100644 pkg/module/middleware.go create mode 100644 pkg/module/module.go create mode 100644 pkg/module/opts.go diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..c740c37 --- /dev/null +++ b/.gitignore @@ -0,0 +1,2 @@ +/system +/cmd/example/system/system diff --git a/LICENSE b/LICENSE new file mode 100644 index 0000000..7a4a3ea --- /dev/null +++ b/LICENSE @@ -0,0 +1,202 @@ + + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + + END OF TERMS AND CONDITIONS + + APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "[]" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. + + Copyright [yyyy] [name of copyright owner] + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. \ No newline at end of file diff --git a/README.md b/README.md new file mode 100644 index 0000000..3a4f065 --- /dev/null +++ b/README.md @@ -0,0 +1,53 @@ +# cmdb functions + +## Type (ex. Profiletype) +- **`kafka topic (ingress input)- type`** +- **`kafka key - type name`** +- **`kafka message`** +``` +{ + "method": "create", + "name": "nodes", + "payload": {} +} +``` +- **method** - может принимать значения (create, createChild, read, update, delete) +- **payload** - произвольный объект, будет как единственный профиль в cmdb +- **name** - имя линка + + +### Object (ex. MgmtObject) +- **`kafka topic (ingress input)- object`** +- **`kafka key - object uuid`** +- **`kafka message`** +``` +{ + "method": "createChild", + "name": "nodes", + "type": "node", + "payload": {} +} +``` +- **method** - может принимать значения (createChild, read, update, delete) +- **type** - произвольный тип +- **payload** - произвольный объект, будет как единственный профиль в cmdb (обязателен при createChild/update) +- **name** - имя линка + + +## Link +- **`kafka topic (ingress input)- link`** +- **`kafka key - from uuid or link uuid`** +- **`kafka message`** +``` +{ + "method": "create", + "name": "nvme0n1", + "to": "aec5b90c-1922-4ec5-a448-76c4710fc4e2", + "payload": {} +} +``` +- **method** - может принимать значения (create, read, update, delete) +- **name** - имя линка +- **to** - uuid объекта к которому линк +- **payload** - произвольный объект, будет как единственный профиль в cmdb + diff --git a/cmd/example/system/disk.go b/cmd/example/system/disk.go new file mode 100644 index 0000000..2b7e18b --- /dev/null +++ b/cmd/example/system/disk.go @@ -0,0 +1,70 @@ +// Copyright 2022 Listware + +package main + +import ( + "context" + + "git.fg-tech.ru/listware/cmdb/pkg/cmdb/vertex/types" + "git.fg-tech.ru/listware/go-core/pkg/client/system" +) + +type Disk struct { + Name string `json:"name,omitempty"` +} + +func createDiskType(ctx context.Context) (err error) { + disk := Disk{ + Name: "nvme0n1", + } + + pt := types.ReflectType(disk) + + message, err := system.CreateType(pt) + if err != nil { + return + } + + return exec.ExecAsync(ctx, message) +} + +func updateDiskType(ctx context.Context) (err error) { + disk := Disk{ + Name: "nvme0n1", + } + + pt := types.ReflectType(disk) + + //pt.Triggers["create"] = append(pt.Triggers["create"], logTriger) + + message, err := system.UpdateType(pt) + if err != nil { + return + } + + return exec.ExecAsync(ctx, message) +} + +func deleteDiskType(ctx context.Context) (err error) { + message, err := system.DeleteType("disk") + if err != nil { + return + } + + return exec.ExecAsync(ctx, message) +} + +func createDiskObject(ctx context.Context) (err error) { + disk := Disk{ + Name: "nvme0n1", + } + + typeName := "disk" + + message, err := system.CreateObject(typeName, disk) + if err != nil { + return + } + + return exec.ExecAsync(ctx, message) +} diff --git a/cmd/example/system/link.go b/cmd/example/system/link.go new file mode 100644 index 0000000..0545e1c --- /dev/null +++ b/cmd/example/system/link.go @@ -0,0 +1,48 @@ +// Copyright 2022 Listware + +package main + +import ( + "context" + + "git.fg-tech.ru/listware/go-core/pkg/client/system" +) + +func createLink(ctx context.Context) (err error) { + node := Node{ + Hostname: "sky", + } + + message, err := system.CreateLink("objects/a0c17af0-45aa-4cdb-ace4-f60440583c8a", "objects/64f21ca8-c2c7-4fc1-bccc-de04842dbd49", "parent", "node", node) + if err != nil { + return + } + + return exec.ExecSync(ctx, message) +} + +func updateLink(ctx context.Context) (err error) { + disk := Disk{} + message, err := system.UpdateLink("objects/a0c17af0-45aa-4cdb-ace4-f60440583c8a", "parent", disk) + if err != nil { + return + } + + return exec.ExecSync(ctx, message) +} +func deleteLinkById(ctx context.Context) (err error) { + message, err := system.DeleteLink("21668", "") + if err != nil { + return + } + + return exec.ExecAsync(ctx, message) +} +func deleteLinkByFromName(ctx context.Context) (err error) { + message, err := system.DeleteLink("2aac7fcf-a3c5-4089-b873-11c0f52c488c", "nvme0n1") + if err != nil { + return + } + + return exec.ExecAsync(ctx, message) +} diff --git a/cmd/example/system/main.go b/cmd/example/system/main.go new file mode 100644 index 0000000..2f90096 --- /dev/null +++ b/cmd/example/system/main.go @@ -0,0 +1,77 @@ +// Copyright 2022 Listware + +package main + +import ( + "context" + "fmt" + + "git.fg-tech.ru/listware/cmdb/pkg/cmdb/vertex/types" + "git.fg-tech.ru/listware/go-core/pkg/client/system" + "git.fg-tech.ru/listware/go-core/pkg/executor" + "git.fg-tech.ru/listware/proto/sdk/pbcmdb" +) + +var ( + exec executor.Executor +) + +func main() { + var err error + exec, err = executor.New("127.0.0.1:9092") + if err != nil { + fmt.Println(err) + return + } + + defer exec.Close() + + ctx := context.Background() + if err = register(ctx); err != nil { + fmt.Println(err) + return + } + return +} + +type NodeContainer struct{} + +func register(ctx context.Context) (err error) { + var registerTypes = []*pbcmdb.RegisterTypeMessage{} + + nodeContainerType, err := system.RegisterType(types.ReflectType(NodeContainer{}), true) + if err != nil { + return err + } + + nodeType, err := system.RegisterType(types.ReflectType(Node{}), true) + if err != nil { + return err + } + _ = nodeType + + registerTypes = append(registerTypes, nodeContainerType, nodeType) + + var registerObjects = []*pbcmdb.RegisterObjectMessage{} + + nodeContainer, err := system.RegisterObject("system/root", "types/node-container", "nodes", NodeContainer{}, false, false) + if err != nil { + return + } + + nodeContainer1, err := system.RegisterObject("nodes.root", "types/node", "node1", NodeContainer{}, false, true) + if err != nil { + return + } + registerObjects = append(registerObjects, nodeContainer, nodeContainer1) + + message, err := system.Register("appname", nil, registerObjects, nil) + if err != nil { + return + } + + if err = exec.ExecSync(ctx, message); err != nil { + return err + } + return +} diff --git a/cmd/example/system/node.go b/cmd/example/system/node.go new file mode 100644 index 0000000..673668e --- /dev/null +++ b/cmd/example/system/node.go @@ -0,0 +1,113 @@ +// Copyright 2022 Listware + +package main + +import ( + "context" + + "git.fg-tech.ru/listware/cmdb/pkg/cmdb/vertex/types" + "git.fg-tech.ru/listware/go-core/pkg/client/system" +) + +type Node struct { + Hostname string `json:"hostname,omitempty"` + Domain string `json:"domain"` + Model string `json:"model"` +} + +func createNodeType(ctx context.Context) (err error) { + node := Node{ + Hostname: "sky01", + } + + pt := types.ReflectType(node) + // pt.Triggers["create"] = append(pt.Triggers["create"], logTriger) + + message, err := system.CreateType(pt) + if err != nil { + return + } + + return exec.ExecAsync(ctx, message) +} + +func updateNodeType(ctx context.Context) (err error) { + node := Node{ + Hostname: "sky01", + } + + pt := types.ReflectType(node) + // pt.Triggers["create"] = append(pt.Triggers["create"], logTriger) + + message, err := system.UpdateType(pt) + if err != nil { + return + } + + return exec.ExecAsync(ctx, message) +} + +func deleteNodeType(ctx context.Context) (err error) { + message, err := system.DeleteType("node") + if err != nil { + return + } + + return exec.ExecAsync(ctx, message) +} + +func createNodeObject(ctx context.Context) (err error) { + node := Node{ + Hostname: "sky01", + } + + message, err := system.CreateObject("node", node) + if err != nil { + return + } + + return exec.ExecAsync(ctx, message) +} + +func createChild(ctx context.Context) (err error) { + disk := Disk{ + Name: "nvme4n1", + } + + message, err := system.CreateChild("objects/64f21ca8-c2c7-4fc1-bccc-de04842dbd49", "types/disk", disk.Name, disk) + if err != nil { + return + } + + return exec.ExecSync(ctx, message) +} + +func updateObject(ctx context.Context) (err error) { + node := Node{ + Hostname: "sky0asd112", + } + + message, err := system.UpdateObject("58d12e3e-63f7-4c90-9d28-3812aebf81ce", node) + if err != nil { + return + } + + return exec.ExecAsync(ctx, message) +} + +func createNodeTypeTrigger(ctx context.Context) (err error) { + message, err := system.AddTrigger("types/node", logTrigger) + if err != nil { + return + } + + return exec.ExecAsync(ctx, message) +} +func deleteNodeTypeTrigger(ctx context.Context) (err error) { + message, err := system.DeleteTrigger("types/node", logTrigger) + if err != nil { + return + } + + return exec.ExecAsync(ctx, message) +} diff --git a/cmd/example/system/object.go b/cmd/example/system/object.go new file mode 100644 index 0000000..5ee3a52 --- /dev/null +++ b/cmd/example/system/object.go @@ -0,0 +1,18 @@ +// Copyright 2022 Listware + +package main + +import ( + "context" + + "git.fg-tech.ru/listware/go-core/pkg/client/system" +) + +func deleteObject(ctx context.Context) (err error) { + message, err := system.DeleteObject("17177459") + if err != nil { + return + } + + return exec.ExecAsync(ctx, message) +} diff --git a/cmd/example/system/qdsl.go b/cmd/example/system/qdsl.go new file mode 100644 index 0000000..b1bedbd --- /dev/null +++ b/cmd/example/system/qdsl.go @@ -0,0 +1,67 @@ +// Copyright 2022 Listware + +package main + +import ( + "context" + + "git.fg-tech.ru/listware/cmdb/pkg/cmdb/vertex/types" + "git.fg-tech.ru/listware/go-core/pkg/client/system" + "git.fg-tech.ru/listware/proto/sdk/pbtypes" + "google.golang.org/protobuf/proto" +) + +func qdsl(ctx context.Context) (err error) { + disk := Disk{ + Name: "nvme0n1", + } + + ffc, err := system.CreateChild("", "disk", disk.Name, disk) + if err != nil { + return + } + + message, err := system.QdslRouter("*.node.types", ffc) + if err != nil { + return + } + + return exec.ExecAsync(ctx, message) +} + +func qdsl2(ctx context.Context) (err error) { + disk := Disk{ + Name: "nvme0n1", + } + + pt := types.ReflectType(disk) + ffc, err := system.CreateType(pt) + if err != nil { + return + } + + ///// + + ft := &pbtypes.FunctionType{ + Namespace: "dev0.office", + Type: "worker", + } + + goFuncCtx := &pbtypes.FunctionContext{ + FunctionType: ft, + } + + if goFuncCtx.Value, err = proto.Marshal(ffc); err != nil { + return + } + + _ = goFuncCtx + + _ = ffc + message, err := system.QdslRouter("nodes.root", ffc) + if err != nil { + return + } + _ = message + return exec.ExecAsync(ctx, message) +} diff --git a/cmd/example/system/trigger.go b/cmd/example/system/trigger.go new file mode 100644 index 0000000..c284452 --- /dev/null +++ b/cmd/example/system/trigger.go @@ -0,0 +1,44 @@ +// Copyright 2022 Listware + +package main + +import ( + "context" + + "git.fg-tech.ru/listware/go-core/pkg/client/system" + "git.fg-tech.ru/listware/proto/sdk/pbcmdb" + "git.fg-tech.ru/listware/proto/sdk/pbtypes" +) + +var logTrigger = &pbcmdb.Trigger{ + Type: "create", + FunctionType: &pbtypes.FunctionType{ + Namespace: "system", + Type: "log.system.functions.root", + }, +} +var initTrigger = &pbcmdb.Trigger{ + Type: "update", + FunctionType: &pbtypes.FunctionType{ + Namespace: "proxy", + Type: "init.inventory.functions.root", + }, +} + +func createInitTrigger(ctx context.Context) (err error) { + message, err := system.AddLinkTrigger("types/node", "types/function", initTrigger) + if err != nil { + return + } + + return exec.ExecSync(ctx, message) +} + +func deleteInitTrigger(ctx context.Context) (err error) { + message, err := system.DeleteLinkTrigger("types/node", "types/function", initTrigger) + if err != nil { + return + } + + return exec.ExecSync(ctx, message) +} diff --git a/cmd/example/system/type.go b/cmd/example/system/type.go new file mode 100644 index 0000000..f142843 --- /dev/null +++ b/cmd/example/system/type.go @@ -0,0 +1,73 @@ +// Copyright 2022 Listware + +package main + +import ( + "context" + + "git.fg-tech.ru/listware/cmdb/pkg/cmdb/vertex/types" + "git.fg-tech.ru/listware/go-core/pkg/client/system" +) + +type Str struct{} + +func createTypes(ctx context.Context) (err error) { + var names = []string{"group", "node", "cpu", "os", "baseboard", "bios", "mem", "netlink", "temp"} + str := Str{} + pt := types.ReflectType(str) + + for _, name := range names { + pt.Schema.Title = name + message, err := system.CreateType(pt) + if err != nil { + return err + } + + if err = exec.ExecAsync(ctx, message); err != nil { + return err + } + } + + return +} + +func deleteType(ctx context.Context, ptName string) (err error) { + message, err := system.DeleteType(ptName) + if err != nil { + return + } + + return exec.ExecSync(ctx, message) +} + +func createType(ctx context.Context) (err error) { + str := Str{} + pt := types.ReflectType(str) + + pt.Schema.Title = "temp" + + message, err := system.CreateType(pt) + if err != nil { + return err + } + + if err = exec.ExecAsync(ctx, message); err != nil { + return err + } + + return +} + +func updateType(ctx context.Context) (err error) { + str := Str{} + pt := types.ReflectType(str) + + pt.Schema.Title = "temp" + + message, err := system.UpdateType(pt) + if err != nil { + return + } + + return exec.ExecSync(ctx, message) +} diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..8dfd367 --- /dev/null +++ b/go.mod @@ -0,0 +1,41 @@ +module git.fg-tech.ru/listware/go-core + +go 1.19 + +require ( + git.fg-tech.ru/listware/cmdb v0.1.0 + git.fg-tech.ru/listware/proto v0.1.1 + github.com/Shopify/sarama v1.38.1 + github.com/apache/flink-statefun/statefun-sdk-go/v3 v3.2.0 + github.com/google/uuid v1.3.0 + google.golang.org/protobuf v1.28.1 +) + +require ( + github.com/arangodb/go-driver v1.4.1 // indirect + github.com/arangodb/go-velocypack v0.0.0-20200318135517-5af53c29c67e // indirect + github.com/davecgh/go-spew v1.1.1 // indirect + github.com/eapache/go-resiliency v1.3.0 // indirect + github.com/eapache/go-xerial-snappy v0.0.0-20230111030713-bf00bc1b83b6 // indirect + github.com/eapache/queue v1.1.0 // indirect + github.com/golang/protobuf v1.5.2 // indirect + github.com/golang/snappy v0.0.4 // indirect + github.com/hashicorp/errwrap v1.0.0 // indirect + github.com/hashicorp/go-multierror v1.1.1 // indirect + github.com/hashicorp/go-uuid v1.0.3 // indirect + github.com/jcmturner/aescts/v2 v2.0.0 // indirect + github.com/jcmturner/dnsutils/v2 v2.0.0 // indirect + github.com/jcmturner/gofork v1.7.6 // indirect + github.com/jcmturner/gokrb5/v8 v8.4.3 // indirect + github.com/jcmturner/rpc/v2 v2.0.3 // indirect + github.com/klauspost/compress v1.15.14 // indirect + github.com/pierrec/lz4/v4 v4.1.17 // indirect + github.com/pkg/errors v0.9.1 // indirect + github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 // indirect + golang.org/x/crypto v0.0.0-20220722155217-630584e8d5aa // indirect + golang.org/x/net v0.5.0 // indirect + golang.org/x/sys v0.4.0 // indirect + golang.org/x/text v0.6.0 // indirect + google.golang.org/genproto v0.0.0-20221118155620-16455021b5e6 // indirect + google.golang.org/grpc v1.52.1 // indirect +) diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..4ba7f16 --- /dev/null +++ b/go.sum @@ -0,0 +1,109 @@ +git.fg-tech.ru/listware/cmdb v0.1.0 h1:K4A8XI8X5nh975+m+LVea/kIa2hSQSXCnH3qVjDjOwk= +git.fg-tech.ru/listware/cmdb v0.1.0/go.mod h1:kke3p3xl6lPlqwSfmA+grlKhGX6zKXctjcbE1Pcs9Ik= +git.fg-tech.ru/listware/proto v0.1.1 h1:CSqteAtgysiJe7+KtLOEXSIvxypmlJCKwQtla1d2v+A= +git.fg-tech.ru/listware/proto v0.1.1/go.mod h1:t5lyMTuX/if05HI/na9tJAlHCHHMdhdPLBTkhvscedQ= +github.com/Shopify/sarama v1.38.1 h1:lqqPUPQZ7zPqYlWpTh+LQ9bhYNu2xJL6k1SJN4WVe2A= +github.com/Shopify/sarama v1.38.1/go.mod h1:iwv9a67Ha8VNa+TifujYoWGxWnu2kNVAQdSdZ4X2o5g= +github.com/Shopify/toxiproxy/v2 v2.5.0 h1:i4LPT+qrSlKNtQf5QliVjdP08GyAH8+BUIc9gT0eahc= +github.com/apache/flink-statefun/statefun-sdk-go/v3 v3.2.0 h1:OfLhhWnnOfBUvzbQuhE7hCKJdlBW41nV3CfCF/q7UJs= +github.com/apache/flink-statefun/statefun-sdk-go/v3 v3.2.0/go.mod h1:uHiPJsi71a161NMH/ISkkSPIXenkcG9A2m+uhT8UlJ4= +github.com/arangodb/go-driver v1.4.1 h1:Jg0N7XKxiKwjswmAcMCnefWmt81KJEqybqRAGJDRWlo= +github.com/arangodb/go-driver v1.4.1/go.mod h1:UTtaxTUMmyPWzKc2dsWWOZzZ3yM6aHWxn/eubGa3YmQ= +github.com/arangodb/go-velocypack v0.0.0-20200318135517-5af53c29c67e h1:Xg+hGrY2LcQBbxd0ZFdbGSyRKTYMZCfBbw/pMJFOk1g= +github.com/arangodb/go-velocypack v0.0.0-20200318135517-5af53c29c67e/go.mod h1:mq7Shfa/CaixoDxiyAAc5jZ6CVBAyPaNQCGS7mkj4Ho= +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/eapache/go-resiliency v1.3.0 h1:RRL0nge+cWGlxXbUzJ7yMcq6w2XBEr19dCN6HECGaT0= +github.com/eapache/go-resiliency v1.3.0/go.mod h1:5yPzW0MIvSe0JDsv0v+DvcjEv2FyD6iZYSs1ZI+iQho= +github.com/eapache/go-xerial-snappy v0.0.0-20230111030713-bf00bc1b83b6 h1:8yY/I9ndfrgrXUbOGObLHKBR4Fl3nZXwM2c7OYTT8hM= +github.com/eapache/go-xerial-snappy v0.0.0-20230111030713-bf00bc1b83b6/go.mod h1:YvSRo5mw33fLEx1+DlK6L2VV43tJt5Eyel9n9XBcR+0= +github.com/eapache/queue v1.1.0 h1:YOEu7KNc61ntiQlcEeUIoDTJ2o8mQznoNvUhiigpIqc= +github.com/eapache/queue v1.1.0/go.mod h1:6eCeP0CKFpHLu8blIFXhExK/dRa7WDZfr6jVFPTqq+I= +github.com/fortytw2/leaktest v1.3.0 h1:u8491cBMTQ8ft8aeV+adlcytMZylmA5nnwwkRZjI8vw= +github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk= +github.com/golang/protobuf v1.5.2 h1:ROPKBNFfQgOUMifHyP+KYbvpjbdoFNs+aK7DXlji0Tw= +github.com/golang/protobuf v1.5.2/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY= +github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM= +github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= +github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38= +github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I= +github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/gorilla/securecookie v1.1.1/go.mod h1:ra0sb63/xPlUeL+yeDciTfxMRAA+MP+HVt/4epWDjd4= +github.com/gorilla/sessions v1.2.1/go.mod h1:dk2InVEVJ0sfLlnXv9EAgkf6ecYs/i80K/zI+bUmuGM= +github.com/hashicorp/errwrap v1.0.0 h1:hLrqtEDnRye3+sgx6z4qVLNuviH3MR5aQ0ykNJa/UYA= +github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4= +github.com/hashicorp/go-multierror v1.1.1 h1:H5DkEtf6CXdFp0N0Em5UCwQpXMWke8IA0+lD48awMYo= +github.com/hashicorp/go-multierror v1.1.1/go.mod h1:iw975J/qwKPdAO1clOe2L8331t/9/fmwbPZ6JB6eMoM= +github.com/hashicorp/go-uuid v1.0.2/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/bN7x4byOro= +github.com/hashicorp/go-uuid v1.0.3 h1:2gKiV6YVmrJ1i2CKKa9obLvRieoRGviZFL26PcT/Co8= +github.com/hashicorp/go-uuid v1.0.3/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/bN7x4byOro= +github.com/jcmturner/aescts/v2 v2.0.0 h1:9YKLH6ey7H4eDBXW8khjYslgyqG2xZikXP0EQFKrle8= +github.com/jcmturner/aescts/v2 v2.0.0/go.mod h1:AiaICIRyfYg35RUkr8yESTqvSy7csK90qZ5xfvvsoNs= +github.com/jcmturner/dnsutils/v2 v2.0.0 h1:lltnkeZGL0wILNvrNiVCR6Ro5PGU/SeBvVO/8c/iPbo= +github.com/jcmturner/dnsutils/v2 v2.0.0/go.mod h1:b0TnjGOvI/n42bZa+hmXL+kFJZsFT7G4t3HTlQ184QM= +github.com/jcmturner/gofork v1.7.6 h1:QH0l3hzAU1tfT3rZCnW5zXl+orbkNMMRGJfdJjHVETg= +github.com/jcmturner/gofork v1.7.6/go.mod h1:1622LH6i/EZqLloHfE7IeZ0uEJwMSUyQ/nDd82IeqRo= +github.com/jcmturner/goidentity/v6 v6.0.1 h1:VKnZd2oEIMorCTsFBnJWbExfNN7yZr3EhJAxwOkZg6o= +github.com/jcmturner/goidentity/v6 v6.0.1/go.mod h1:X1YW3bgtvwAXju7V3LCIMpY0Gbxyjn/mY9zx4tFonSg= +github.com/jcmturner/gokrb5/v8 v8.4.3 h1:iTonLeSJOn7MVUtyMT+arAn5AKAPrkilzhGw8wE/Tq8= +github.com/jcmturner/gokrb5/v8 v8.4.3/go.mod h1:dqRwJGXznQrzw6cWmyo6kH+E7jksEQG/CyVWsJEsJO0= +github.com/jcmturner/rpc/v2 v2.0.3 h1:7FXXj8Ti1IaVFpSAziCZWNzbNuZmnvw/i6CqLNdWfZY= +github.com/jcmturner/rpc/v2 v2.0.3/go.mod h1:VUJYCIDm3PVOEHw8sgt091/20OJjskO/YJki3ELg/Hc= +github.com/klauspost/compress v1.15.14 h1:i7WCKDToww0wA+9qrUZ1xOjp218vfFo3nTU6UHp+gOc= +github.com/klauspost/compress v1.15.14/go.mod h1:QPwzmACJjUTFsnSHH934V6woptycfrDDJnH7hvFVbGM= +github.com/pierrec/lz4/v4 v4.1.17 h1:kV4Ip+/hUBC+8T6+2EgburRtkE9ef4nbY3f4dFhGjMc= +github.com/pierrec/lz4/v4 v4.1.17/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4= +github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= +github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 h1:N/ElC8H3+5XpJzTSTfLsJV/mx9Q9g7kxmchpfZyxgzM= +github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= +github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= +github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA= +github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= +github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk= +golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/crypto v0.0.0-20220722155217-630584e8d5aa h1:zuSxTR4o9y82ebqCUJYNGJbGPo6sKVl54f/TVDObg1c= +golang.org/x/crypto v0.0.0-20220722155217-630584e8d5aa/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4= +golang.org/x/net v0.0.0-20200114155413-6afb5195e5aa/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= +golang.org/x/net v0.0.0-20220725212005-46097bf591d3/go.mod h1:AaygXjzTFtRAg2ttMY5RMuhpJ3cNnI0XpyFJD1iQRSM= +golang.org/x/net v0.5.0 h1:GyT4nK/YDHSqa1c4753ouYCDajOYKTja9Xb/OHtgvSw= +golang.org/x/net v0.5.0/go.mod h1:DivGGAXEgPSlEBzxGzZI+ZLohi+xUj054jfeKui00ws= +golang.org/x/sync v0.1.0 h1:wsuoTGHzEhffawBOhz5CYhcrV4IdKZbEyZjBMuTp12o= +golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.4.0 h1:Zr2JFtRQNX3BCZ8YtxRE9hNJYC8J6I1MVbMg6owUp18= +golang.org/x/sys v0.4.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= +golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= +golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= +golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= +golang.org/x/text v0.6.0 h1:3XmdazWV+ubf7QgHSTWeykHOci5oeekaGJBLkrkaw4k= +golang.org/x/text v0.6.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= +golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +google.golang.org/genproto v0.0.0-20221118155620-16455021b5e6 h1:a2S6M0+660BgMNl++4JPlcAO/CjkqYItDEZwkoDQK7c= +google.golang.org/genproto v0.0.0-20221118155620-16455021b5e6/go.mod h1:rZS5c/ZVYMaOGBfO68GWtjOw/eLaZM1X6iVtgjZ+EWg= +google.golang.org/grpc v1.52.1 h1:2NpOPk5g5Xtb0qebIEs7hNIa++PdtZLo2AQUpc1YnSU= +google.golang.org/grpc v1.52.1/go.mod h1:pu6fVzoFb+NBYNAvQL08ic+lvB2IojljRYuun5vorUY= +google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw= +google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= +google.golang.org/protobuf v1.28.1 h1:d0NfwRgPtno5B1Wa6L2DAG+KivqkdutMf1UhdNx175w= +google.golang.org/protobuf v1.28.1/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/pkg/client/system/advanced_links.go b/pkg/client/system/advanced_links.go new file mode 100644 index 0000000..e162426 --- /dev/null +++ b/pkg/client/system/advanced_links.go @@ -0,0 +1,62 @@ +// Copyright 2022 Listware + +package system + +import ( + "encoding/json" + + "git.fg-tech.ru/listware/go-core/pkg/client/system/errors" + "git.fg-tech.ru/listware/proto/sdk/pbcmdb" + "git.fg-tech.ru/listware/proto/sdk/pbtypes" + "google.golang.org/protobuf/proto" +) + +func prepareAdvancedLink(id string) (fc *pbtypes.FunctionContext) { + ft := &pbtypes.FunctionType{ + Namespace: namespace, + Type: advancedLinksType, + } + + fc = &pbtypes.FunctionContext{ + Id: id, + FunctionType: ft, + } + return +} + +func UpdateAdvancedLink(id string, payload any) (fc *pbtypes.FunctionContext, err error) { + if payload == nil { + return nil, errors.ErrPayloadNil + } + + payloadRaw, ok := payload.([]byte) + if !ok { + if payloadRaw, err = json.Marshal(payload); err != nil { + return + } + } + + fc = prepareAdvancedLink(id) + + om := &pbcmdb.LinkMessage{ + Method: pbcmdb.Method_UPDATE, + Payload: payloadRaw, + } + + if fc.Value, err = proto.Marshal(om); err != nil { + return + } + return +} +func DeleteAdvancedLink(id string) (fc *pbtypes.FunctionContext, err error) { + fc = prepareLink(id) + + om := &pbcmdb.LinkMessage{ + Method: pbcmdb.Method_DELETE, + } + + if fc.Value, err = proto.Marshal(om); err != nil { + return + } + return +} diff --git a/pkg/client/system/errors/errors.go b/pkg/client/system/errors/errors.go new file mode 100644 index 0000000..e61dd8a --- /dev/null +++ b/pkg/client/system/errors/errors.go @@ -0,0 +1,16 @@ +// Copyright 2022 Listware + +package errors + +import ( + "fmt" +) + +var ( + ErrPayloadNil = fmt.Errorf("payload can't be empty") + ErrBadCmdbContext = fmt.Errorf("bad cmdb context") +) + +func ErrFunctionId(id string) error { + return fmt.Errorf("unsupported function id: %s", id) +} diff --git a/pkg/client/system/links.go b/pkg/client/system/links.go new file mode 100644 index 0000000..22adeb5 --- /dev/null +++ b/pkg/client/system/links.go @@ -0,0 +1,190 @@ +// Copyright 2022 Listware + +package system + +import ( + "encoding/json" + + "git.fg-tech.ru/listware/go-core/pkg/client/system/errors" + "git.fg-tech.ru/listware/proto/sdk/pbcmdb" + "git.fg-tech.ru/listware/proto/sdk/pbtypes" + "google.golang.org/protobuf/proto" +) + +func prepareLink(id string) (fc *pbtypes.FunctionContext) { + ft := &pbtypes.FunctionType{ + Namespace: namespace, + Type: linksType, + } + + fc = &pbtypes.FunctionContext{ + Id: id, + FunctionType: ft, + } + return +} + +func CreateLinkMessage(to, name, linktype string, payload any) (linkMessage *pbcmdb.LinkMessage, err error) { + if payload == nil { + return nil, errors.ErrPayloadNil + } + + payloadRaw, err := json.Marshal(payload) + if err != nil { + return + } + + linkMessage = &pbcmdb.LinkMessage{ + Method: pbcmdb.Method_CREATE, + Name: name, + To: to, + Type: linktype, + Payload: payloadRaw, + } + return +} + +// CreateLink create link 'from' -> 'to' with 'name' +// from will be 'root', 'node', '17136214' +// to will be 'root', 'node', '17136214' +func CreateLink(from, to, name, linktype string, payload any) (functionContext *pbtypes.FunctionContext, err error) { + linkMessage, err := CreateLinkMessage(to, name, linktype, payload) + if err != nil { + return + } + + functionContext = prepareLink(from) + + if functionContext.Value, err = proto.Marshal(linkMessage); err != nil { + return + } + return +} + +func UpdateLink(from, name string, payload any) (fc *pbtypes.FunctionContext, err error) { + if payload == nil { + return nil, errors.ErrPayloadNil + } + + payloadRaw, ok := payload.([]byte) + if !ok { + if payloadRaw, err = json.Marshal(payload); err != nil { + return + } + } + + fc = prepareLink(from) + + om := &pbcmdb.LinkMessage{ + Method: pbcmdb.Method_UPDATE, + Name: name, + Payload: payloadRaw, + } + + if fc.Value, err = proto.Marshal(om); err != nil { + return + } + return +} +func DeleteLink(from, name string) (fc *pbtypes.FunctionContext, err error) { + fc = prepareLink(from) + + om := &pbcmdb.LinkMessage{ + Method: pbcmdb.Method_DELETE, + Name: name, + } + + if fc.Value, err = proto.Marshal(om); err != nil { + return + } + return +} + +func AddLinkTriggerMessage(to string, trigger *pbcmdb.Trigger) (linkMessage *pbcmdb.LinkMessage, err error) { + if trigger == nil { + return nil, errors.ErrPayloadNil + } + + linkMessage = &pbcmdb.LinkMessage{ + Method: pbcmdb.Method_CREATE_TRIGGER, + To: to, + } + + if linkMessage.Payload, err = proto.Marshal(trigger); err != nil { + return + } + return +} + +func AddLinkTrigger(from, to string, trigger *pbcmdb.Trigger) (functionContext *pbtypes.FunctionContext, err error) { + linkMessage, err := AddLinkTriggerMessage(to, trigger) + if err != nil { + return + } + + functionContext = prepareLink(from) + + if functionContext.Value, err = proto.Marshal(linkMessage); err != nil { + return + } + return +} + +func DeleteLinkTriggerMessage(to string, trigger *pbcmdb.Trigger) (linkMessage *pbcmdb.LinkMessage, err error) { + if trigger == nil { + return nil, errors.ErrPayloadNil + } + + linkMessage = &pbcmdb.LinkMessage{ + Method: pbcmdb.Method_DELETE_TRIGGER, + To: to, + } + + if linkMessage.Payload, err = proto.Marshal(trigger); err != nil { + return + } + return +} + +func DeleteLinkTrigger(from, to string, trigger *pbcmdb.Trigger) (functionContext *pbtypes.FunctionContext, err error) { + linkMessage, err := DeleteLinkTriggerMessage(to, trigger) + if err != nil { + return + } + + functionContext = prepareLink(from) + + if functionContext.Value, err = proto.Marshal(linkMessage); err != nil { + return + } + + return +} + +func RegisterLink(from, to, name, linktype string, payload any, async bool) (registerLinkMessage *pbcmdb.RegisterLinkMessage, err error) { + linkMessage, err := CreateLinkMessage(to, name, linktype, payload) + if err != nil { + return + } + + registerLinkMessage = &pbcmdb.RegisterLinkMessage{ + Id: from, + LinkMessage: linkMessage, + Async: async, + } + return +} + +func RegisterLinkTrigger(from, to string, trigger *pbcmdb.Trigger, async bool) (registerLinkMessage *pbcmdb.RegisterLinkMessage, err error) { + linkMessage, err := AddLinkTriggerMessage(to, trigger) + if err != nil { + return + } + + registerLinkMessage = &pbcmdb.RegisterLinkMessage{ + Id: from, + LinkMessage: linkMessage, + Async: async, + } + return +} diff --git a/pkg/client/system/objects.go b/pkg/client/system/objects.go new file mode 100644 index 0000000..565fb7c --- /dev/null +++ b/pkg/client/system/objects.go @@ -0,0 +1,131 @@ +// Copyright 2022 Listware + +package system + +import ( + "encoding/json" + + "git.fg-tech.ru/listware/go-core/pkg/client/system/errors" + "git.fg-tech.ru/listware/proto/sdk/pbcmdb" + "git.fg-tech.ru/listware/proto/sdk/pbtypes" + "google.golang.org/protobuf/proto" +) + +func prepareObject(id string) (functionContext *pbtypes.FunctionContext) { + functionType := &pbtypes.FunctionType{ + Namespace: namespace, + Type: objectsType, + } + + functionContext = &pbtypes.FunctionContext{ + Id: id, + FunctionType: functionType, + } + return +} + +func CreateChildMessage(moType, name string, payload any, functions ...*pbtypes.FunctionMessage) (objectMessage *pbcmdb.ObjectMessage, err error) { + if payload == nil { + return nil, errors.ErrPayloadNil + } + + payloadRaw, ok := payload.([]byte) + if !ok { + if payloadRaw, err = json.Marshal(payload); err != nil { + return + } + } + + objectMessage = &pbcmdb.ObjectMessage{ + Method: pbcmdb.Method_CREATE_CHILD, + Type: moType, + Name: name, + Payload: payloadRaw, + Functions: functions, + } + + return +} + +func CreateChild(from, moType, name string, payload any, functions ...*pbtypes.FunctionMessage) (functionContext *pbtypes.FunctionContext, err error) { + objectMessage, err := CreateChildMessage(moType, name, payload, functions...) + if err != nil { + return + } + + functionContext = prepareObject(from) + + if functionContext.Value, err = proto.Marshal(objectMessage); err != nil { + return + } + return +} + +func RegisterObject(from, moType, name string, payload any, async, router bool, functions ...*pbtypes.FunctionMessage) (registerObjectMessage *pbcmdb.RegisterObjectMessage, err error) { + objectMessage, err := CreateChildMessage(moType, name, payload, functions...) + if err != nil { + return + } + + registerObjectMessage = &pbcmdb.RegisterObjectMessage{ + Id: from, + ObjectMessage: objectMessage, + Async: async, + Router: router, + } + return +} + +func UpdateObjectMessage(payload any) (objectMessage *pbcmdb.ObjectMessage, err error) { + if payload == nil { + return nil, errors.ErrPayloadNil + } + + payloadRaw, ok := payload.([]byte) + if !ok { + if payloadRaw, err = json.Marshal(payload); err != nil { + return + } + } + + objectMessage = &pbcmdb.ObjectMessage{ + Method: pbcmdb.Method_UPDATE, + Payload: payloadRaw, + } + return +} + +func UpdateObject(id string, payload any) (functionContext *pbtypes.FunctionContext, err error) { + functionContext = prepareObject(id) + + objectMessage, err := UpdateObjectMessage(payload) + if err != nil { + return + } + + if functionContext.Value, err = proto.Marshal(objectMessage); err != nil { + return + } + return +} + +func DeleteObjectMessage() (objectMessage *pbcmdb.ObjectMessage, err error) { + objectMessage = &pbcmdb.ObjectMessage{ + Method: pbcmdb.Method_DELETE, + } + return +} + +func DeleteObject(id string) (functionContext *pbtypes.FunctionContext, err error) { + functionContext = prepareObject(id) + + objectMessage, err := DeleteObjectMessage() + if err != nil { + return + } + + if functionContext.Value, err = proto.Marshal(objectMessage); err != nil { + return + } + return +} diff --git a/pkg/client/system/register.go b/pkg/client/system/register.go new file mode 100644 index 0000000..a73169f --- /dev/null +++ b/pkg/client/system/register.go @@ -0,0 +1,38 @@ +// Copyright 2022 Listware + +package system + +import ( + "git.fg-tech.ru/listware/proto/sdk/pbcmdb" + "git.fg-tech.ru/listware/proto/sdk/pbtypes" + "google.golang.org/protobuf/proto" +) + +func prepareRegister(id string) (functionContext *pbtypes.FunctionContext) { + functionType := &pbtypes.FunctionType{ + Namespace: namespace, + Type: registerType, + } + + functionContext = &pbtypes.FunctionContext{ + Id: id, + FunctionType: functionType, + } + return +} + +func Register(name string, types []*pbcmdb.RegisterTypeMessage, objects []*pbcmdb.RegisterObjectMessage, links []*pbcmdb.RegisterLinkMessage) (functionContext *pbtypes.FunctionContext, err error) { + functionContext = prepareRegister(name) + + registerMessage := &pbcmdb.RegisterMessage{ + TypeMessages: types, + ObjectMessages: objects, + LinkMessages: links, + } + + if functionContext.Value, err = proto.Marshal(registerMessage); err != nil { + return + } + + return +} diff --git a/pkg/client/system/router.go b/pkg/client/system/router.go new file mode 100644 index 0000000..fee869e --- /dev/null +++ b/pkg/client/system/router.go @@ -0,0 +1,30 @@ +// Copyright 2022 Listware + +package system + +import ( + "git.fg-tech.ru/listware/proto/sdk/pbtypes" + "google.golang.org/protobuf/proto" +) + +func prepareRouter(qdsl string) (fc *pbtypes.FunctionContext) { + ft := &pbtypes.FunctionType{ + Namespace: namespace, + Type: routerType, + } + + fc = &pbtypes.FunctionContext{ + Id: qdsl, + FunctionType: ft, + } + return +} + +func QdslRouter(qdsl string, ffc *pbtypes.FunctionContext) (fc *pbtypes.FunctionContext, err error) { + fc = prepareRouter(qdsl) + + if fc.Value, err = proto.Marshal(ffc); err != nil { + return + } + return +} diff --git a/pkg/client/system/system.go b/pkg/client/system/system.go new file mode 100644 index 0000000..ce619c1 --- /dev/null +++ b/pkg/client/system/system.go @@ -0,0 +1,14 @@ +// Copyright 2022 Listware + +package system + +const ( + namespace = "system" + + typesType = "types.system.functions.root" + routerType = "router.system.functions.root" + objectsType = "objects.system.functions.root" + linksType = "links.system.functions.root" + advancedLinksType = "advanced.links.system.functions.root" + registerType = "register.system.functions.root" +) diff --git a/pkg/client/system/types.go b/pkg/client/system/types.go new file mode 100644 index 0000000..786accb --- /dev/null +++ b/pkg/client/system/types.go @@ -0,0 +1,230 @@ +// Copyright 2022 Listware + +package system + +import ( + "encoding/json" + + "git.fg-tech.ru/listware/cmdb/pkg/cmdb/vertex/types" + "git.fg-tech.ru/listware/go-core/pkg/client/system/errors" + "git.fg-tech.ru/listware/proto/sdk/pbcmdb" + "git.fg-tech.ru/listware/proto/sdk/pbtypes" + "google.golang.org/protobuf/proto" +) + +func prepareType(id string) (functionContext *pbtypes.FunctionContext) { + functionType := &pbtypes.FunctionType{ + Namespace: namespace, + Type: typesType, + } + + functionContext = &pbtypes.FunctionContext{ + Id: id, + FunctionType: functionType, + } + return +} + +func CreateTypeMessage(pt *types.Type) (typeMessage *pbcmdb.TypeMessage, err error) { + if pt == nil { + return nil, errors.ErrPayloadNil + } + + if pt.Schema == nil { + return nil, errors.ErrPayloadNil + } + + typeMessage = &pbcmdb.TypeMessage{ + Method: pbcmdb.Method_CREATE, + Name: pt.Schema.Title, + } + + if typeMessage.Payload, err = json.Marshal(pt); err != nil { + return + } + + return +} + +func CreateType(pt *types.Type) (functionContext *pbtypes.FunctionContext, err error) { + typeMessage, err := CreateTypeMessage(pt) + if err != nil { + return + } + + functionContext = prepareType("system/types") + + if functionContext.Value, err = proto.Marshal(typeMessage); err != nil { + return + } + + return +} + +func RegisterType(pt *types.Type, async bool) (registerTypeMessage *pbcmdb.RegisterTypeMessage, err error) { + typeMessage, err := CreateTypeMessage(pt) + if err != nil { + return + } + + registerTypeMessage = &pbcmdb.RegisterTypeMessage{ + Id: "system/types", + TypeMessage: typeMessage, + Async: async, + } + return +} + +func UpdateTypeMessage(pt *types.Type) (typeMessage *pbcmdb.TypeMessage, err error) { + if pt == nil { + return nil, errors.ErrPayloadNil + } + + if pt.Schema == nil { + return nil, errors.ErrPayloadNil + } + + typeMessage = &pbcmdb.TypeMessage{ + Method: pbcmdb.Method_UPDATE, + Name: pt.Schema.Title, + } + + if typeMessage.Payload, err = json.Marshal(pt); err != nil { + return + } + + return +} + +func UpdateType(pt *types.Type) (functionContext *pbtypes.FunctionContext, err error) { + typeMessage, err := UpdateTypeMessage(pt) + if err != nil { + return + } + + functionContext = prepareType("types/" + pt.Schema.Title) + + if functionContext.Value, err = proto.Marshal(typeMessage); err != nil { + return + } + + return +} + +func DeleteTypeMessage(moType string) (typeMessage *pbcmdb.TypeMessage, err error) { + typeMessage = &pbcmdb.TypeMessage{ + Method: pbcmdb.Method_DELETE, + Name: moType, + } + + return +} + +func DeleteType(moType string) (functionContext *pbtypes.FunctionContext, err error) { + typeMessage, err := DeleteTypeMessage(moType) + if err != nil { + return + } + + functionContext = prepareType(moType) + + if functionContext.Value, err = proto.Marshal(typeMessage); err != nil { + return + } + return +} + +func CreateObjectMessage(moType string, payload any, functions ...*pbtypes.FunctionMessage) (typeMessage *pbcmdb.TypeMessage, err error) { + if payload == nil { + return nil, errors.ErrPayloadNil + } + + payloadRaw, ok := payload.([]byte) + if !ok { + if payloadRaw, err = json.Marshal(payload); err != nil { + return + } + } + + typeMessage = &pbcmdb.TypeMessage{ + Method: pbcmdb.Method_CREATE_CHILD, + Name: moType, + Functions: functions, + Payload: payloadRaw, + } + return +} + +func CreateObject(moType string, payload any, functions ...*pbtypes.FunctionMessage) (functionContext *pbtypes.FunctionContext, err error) { + typeMessage, err := CreateObjectMessage(moType, payload, functions...) + if err != nil { + return + } + + functionContext = prepareType(moType) + + if functionContext.Value, err = proto.Marshal(typeMessage); err != nil { + return + } + return +} + +func AddTriggerMessage(moType string, trigger *pbcmdb.Trigger) (typeMessage *pbcmdb.TypeMessage, err error) { + if trigger == nil { + return nil, errors.ErrPayloadNil + } + + typeMessage = &pbcmdb.TypeMessage{ + Method: pbcmdb.Method_CREATE_TRIGGER, + Name: moType, + } + + if typeMessage.Payload, err = proto.Marshal(trigger); err != nil { + return + } + return +} + +func AddTrigger(moType string, trigger *pbcmdb.Trigger) (functionContext *pbtypes.FunctionContext, err error) { + typeMessage, err := AddTriggerMessage(moType, trigger) + if err != nil { + return + } + + functionContext = prepareType(moType) + + if functionContext.Value, err = proto.Marshal(typeMessage); err != nil { + return + } + return +} + +func DeleteTriggerMessage(moType string, trigger *pbcmdb.Trigger) (typeMessage *pbcmdb.TypeMessage, err error) { + if trigger == nil { + return nil, errors.ErrPayloadNil + } + + typeMessage = &pbcmdb.TypeMessage{ + Method: pbcmdb.Method_DELETE_TRIGGER, + Name: moType, + } + + if typeMessage.Payload, err = proto.Marshal(trigger); err != nil { + return + } + return +} +func DeleteTrigger(moType string, trigger *pbcmdb.Trigger) (functionContext *pbtypes.FunctionContext, err error) { + typeMessage, err := DeleteTriggerMessage(moType, trigger) + if err != nil { + return + } + + functionContext = prepareType(moType) + + if functionContext.Value, err = proto.Marshal(typeMessage); err != nil { + return + } + + return +} diff --git a/pkg/executor/executor.go b/pkg/executor/executor.go new file mode 100644 index 0000000..a29aec1 --- /dev/null +++ b/pkg/executor/executor.go @@ -0,0 +1,145 @@ +// Copyright 2022 Listware + +package executor + +import ( + "bytes" + "context" + "fmt" + "strings" + "time" + + "git.fg-tech.ru/listware/proto/sdk/pbflink" + "git.fg-tech.ru/listware/proto/sdk/pbtypes" + "github.com/Shopify/sarama" + "github.com/apache/flink-statefun/statefun-sdk-go/v3/pkg/statefun" + "github.com/google/uuid" +) + +const ( + defaultBroker = "0.0.0.0:9092" + + inputTopic = "router.system" +) + +type Executor interface { + ExecAsync(context.Context, ...*pbtypes.FunctionContext) error + + ExecSync(context.Context, *pbtypes.FunctionContext) error + + Close() +} + +type executor struct { + p sarama.SyncProducer + c sarama.Consumer +} + +func New(brokers ...string) (Executor, error) { + if len(brokers) == 0 { + brokers = append(brokers, defaultBroker) + } + e := &executor{} + + config := sarama.NewConfig() + config.Producer.Return.Successes = true + config.Producer.RequiredAcks = sarama.WaitForAll + config.Producer.Retry.Max = 5 + var err error + if e.p, err = sarama.NewSyncProducer(brokers, config); err != nil { + return nil, err + } + + if e.c, err = sarama.NewConsumer(brokers, config); err != nil { + return nil, err + } + + return e, err +} + +func (k *executor) Close() { + k.p.Close() + k.c.Close() +} + +func (k *executor) ExecAsync(ctx context.Context, msgs ...*pbtypes.FunctionContext) (err error) { + for _, msg := range msgs { + var buffer bytes.Buffer + if err = statefun.MakeProtobufType(msg).Serialize(&buffer, msg); err != nil { + return + } + + message := &sarama.ProducerMessage{ + Topic: inputTopic, + // Key: sarama.StringEncoder(msg.GetId()), + Value: sarama.ByteEncoder(buffer.Bytes()), + } + + if _, _, err = k.p.SendMessage(message); err != nil { + return + } + } + return +} + +func (k *executor) ExecSync(ctx context.Context, msg *pbtypes.FunctionContext) (err error) { + ctx, cancel := context.WithTimeout(ctx, time.Second*5) + defer cancel() + + replyResult := &pbtypes.ReplyResult{ + Namespace: msg.FunctionType.Namespace, + Topic: msg.FunctionType.Type, + Key: uuid.New().String(), + } + + msg.ReplyResult = replyResult + + var buffer bytes.Buffer + if err = statefun.MakeProtobufType(msg).Serialize(&buffer, msg); err != nil { + return + } + + message := &sarama.ProducerMessage{ + Topic: inputTopic, + // Key: sarama.StringEncoder(msg.GetId()), + Value: sarama.ByteEncoder(buffer.Bytes()), + } + + consumer, err := k.c.ConsumePartition(replyResult.Topic, 0, sarama.OffsetOldest) + if err != nil { + return + } + defer consumer.Close() + inputChan := consumer.Messages() + errorsInput := consumer.Errors() + if _, _, err = k.p.SendMessage(message); err != nil { + return + } + + for { + select { + case <-ctx.Done(): + return ctx.Err() + + case m := <-inputChan: + if string(m.Key) == replyResult.Key { + var typedValue pbflink.TypedValue + if err = statefun.MakeProtobufType(&typedValue).Deserialize(bytes.NewReader(m.Value), &typedValue); err != nil { + return err + } + + var functionResult pbtypes.FunctionResult + if err = statefun.MakeProtobufType(&functionResult).Deserialize(bytes.NewReader(typedValue.Value), &functionResult); err != nil { + return err + } + if !functionResult.Complete { + err = fmt.Errorf(strings.Join(functionResult.Errors, ";")) + } + return err + } + case err = <-errorsInput: + return + } + + } +} diff --git a/pkg/log/log.go b/pkg/log/log.go new file mode 100644 index 0000000..6f8f3fb --- /dev/null +++ b/pkg/log/log.go @@ -0,0 +1,16 @@ +// Copyright 2022 Listware + +package log + +import ( + "fmt" +) + +// чтобы потом в одном месте сменить реализацию +func Debug(a ...any) { + fmt.Println(a...) +} + +func Debugf(format string, a ...any) { + fmt.Printf(format, a...) +} diff --git a/pkg/module/context.go b/pkg/module/context.go new file mode 100644 index 0000000..185c04b --- /dev/null +++ b/pkg/module/context.go @@ -0,0 +1,37 @@ +// Copyright 2022 Listware + +package module + +import ( + "encoding/json" + + "github.com/apache/flink-statefun/statefun-sdk-go/v3/pkg/statefun" +) + +type Context interface { + statefun.Context + Message() json.RawMessage + States() []statefun.ValueSpec + + CmdbContext() json.RawMessage +} + +type contextAdapter struct { + statefun.Context + message json.RawMessage + states []statefun.ValueSpec + + cmdbContext json.RawMessage +} + +func (c *contextAdapter) Message() json.RawMessage { + return c.message +} + +func (c *contextAdapter) States() []statefun.ValueSpec { + return c.states +} + +func (c *contextAdapter) CmdbContext() json.RawMessage { + return c.cmdbContext +} diff --git a/pkg/module/error.go b/pkg/module/error.go new file mode 100644 index 0000000..8b62044 --- /dev/null +++ b/pkg/module/error.go @@ -0,0 +1,23 @@ +// Copyright 2022 Listware + +package module + +import ( + "fmt" +) + +type errorRetry struct { + err error +} + +func (e *errorRetry) Error() string { + return fmt.Sprintf("retry: %+v", e.err) +} + +func (e *errorRetry) Unwrap() error { + return e.err +} + +func NewError(err error) error { + return &errorRetry{err} +} diff --git a/pkg/module/message.go b/pkg/module/message.go new file mode 100644 index 0000000..d0f9656 --- /dev/null +++ b/pkg/module/message.go @@ -0,0 +1,23 @@ +// Copyright 2022 Listware + +package module + +import ( + "git.fg-tech.ru/listware/proto/sdk/pbtypes" + "github.com/apache/flink-statefun/statefun-sdk-go/v3/pkg/statefun" +) + +func ToMessage(functionContext *pbtypes.FunctionContext) (message statefun.MessageBuilder, err error) { + message = statefun.MessageBuilder{ + Target: statefun.Address{ + Id: functionContext.GetId(), + }, + Value: functionContext, + } + + if message.Target.FunctionType, err = statefun.TypeNameFromParts(functionContext.GetFunctionType().GetNamespace(), functionContext.GetFunctionType().GetType()); err != nil { + return + } + message.ValueType = statefun.MakeProtobufType(functionContext) + return +} diff --git a/pkg/module/middleware.go b/pkg/module/middleware.go new file mode 100644 index 0000000..bd8bbb8 --- /dev/null +++ b/pkg/module/middleware.go @@ -0,0 +1,84 @@ +// Copyright 2022 Listware + +package module + +import ( + "errors" + "strings" + "time" + + "git.fg-tech.ru/listware/cmdb/pkg/cmdb/vertex" + "git.fg-tech.ru/listware/go-core/pkg/log" + "git.fg-tech.ru/listware/proto/sdk/pbtypes" + "github.com/apache/flink-statefun/statefun-sdk-go/v3/pkg/statefun" +) + +var ( + functionContextTypeName = statefun.MakeProtobufTypeWithTypeName(statefun.TypeNameFrom("type.googleapis.com/FunctionContext")) +) + +// StatefulFunction function interface +type StatefulFunction func(Context) error + +// functionMiddleware adapter implemets middleware +type functionMiddleware struct { + parent *module + f StatefulFunction +} + +// Invoke statefun.StatefulFunction implementation +func (fm *functionMiddleware) Invoke(ctx statefun.Context, message statefun.Message) (err error) { + start := time.Now() + defer func() { + log.Debug(ctx.Self(), "took", time.Since(start)) + }() + log.Debug(ctx.Self()) + + // if ctx.Caller().FunctionType != nil { + // log.Debug("caller", ctx.Caller()) + // } + // + + var functionContext pbtypes.FunctionContext + if err = message.As(functionContextTypeName, &functionContext); err != nil { + log.Debug(ctx.Self(), "proto", err) + // do not retry idempotent call + return nil + } + + context := &contextAdapter{ + Context: ctx, + message: functionContext.Value, + states: fm.parent.states[ctx.Self().FunctionType.String()], + } + + collectionKey := strings.Split(ctx.Self().Id, "/") + if len(collectionKey) != 2 { + log.Debug(ctx.Self(), "bad id") + return nil + } + + response, err := vertex.Read(ctx, collectionKey[1], collectionKey[0]) + if err != nil { + log.Debug(err) + return nil + } + context.cmdbContext = response.GetPayload() + + if context.cmdbContext == nil { + log.Debug(ctx.Self(), "bad context") + return nil + } + + if err = fm.f(context); err != nil { + log.Debug(err) + // print all errors + } + + if err = errors.Unwrap(err); err != nil { + // retry only if wrapped error + return + } + + return nil +} diff --git a/pkg/module/module.go b/pkg/module/module.go new file mode 100644 index 0000000..63a707c --- /dev/null +++ b/pkg/module/module.go @@ -0,0 +1,107 @@ +// Copyright 2022 Listware + +package module + +import ( + "context" + "fmt" + "net" + "net/http" + "os" + "path" + + "github.com/apache/flink-statefun/statefun-sdk-go/v3/pkg/statefun" +) + +type Module interface { + // Bind new function to module + Bind(string, StatefulFunction, ...statefun.ValueSpec) error + + // RegisterAndListen register module and listen port + RegisterAndListen(context.Context) error + + // listener port + Port() int + // addr for register + Addr() string +} + +type module struct { + builder statefun.StatefulFunctions + + namespace string + port int + + states map[string][]statefun.ValueSpec + + listener net.Listener +} + +// New module +func newModule(namespace string) *module { + return &module{ + builder: statefun.StatefulFunctionsBuilder(), + states: make(map[string][]statefun.ValueSpec), + namespace: namespace, + } +} + +func New(namespace string, opts ...Opt) Module { + m := newModule(namespace) + + for _, opt := range opts { + opt(m) + } + return m +} + +// Bind new function to module +func (m *module) Bind(methodType string, f StatefulFunction, states ...statefun.ValueSpec) error { + typeName := statefun.TypeNameFrom(path.Join(m.namespace, methodType)) + m.states[typeName.String()] = states + return m.builder.WithSpec(statefun.StatefulFunctionSpec{ + FunctionType: typeName, + States: states, + Function: &functionMiddleware{parent: m, f: f}, + }) +} + +// RegisterAndListen register module and listen port +func (m *module) RegisterAndListen(ctx context.Context) (err error) { + mux := http.NewServeMux() + mux.Handle("/statefun", m.builder.AsHandler()) + srv := &http.Server{Handler: mux} + + if m.listener, err = net.Listen("tcp4", fmt.Sprintf(":%d", m.port)); err != nil { + return + } + + ctx, cancel := context.WithCancel(ctx) + go func() { + if err = srv.Serve(m.listener); err != nil { + cancel() + } + }() + + <-ctx.Done() + + if err != nil { + return + } + + return srv.Shutdown(ctx) +} + +func (m *module) Port() int { + if m.listener != nil { + if addr, ok := m.listener.Addr().(*net.TCPAddr); ok { + return addr.Port + } + } + return m.port +} + +func (m *module) Addr() string { + hostname, _ := os.Hostname() + return fmt.Sprintf("http://%s:%d/statefun", hostname, m.Port()) +} diff --git a/pkg/module/opts.go b/pkg/module/opts.go new file mode 100644 index 0000000..2b0ebef --- /dev/null +++ b/pkg/module/opts.go @@ -0,0 +1,21 @@ +// Copyright 2022 Listware + +package module + +import ( + "github.com/apache/flink-statefun/statefun-sdk-go/v3/pkg/statefun" +) + +type Opt func(*module) + +func WithStatefulFunctions(builder statefun.StatefulFunctions) Opt { + return func(m *module) { + m.builder = builder + } +} + +func WithPort(port int) Opt { + return func(m *module) { + m.port = port + } +}