commit f7eda6f989cf2c6b5ddc48b595e93130a9433d15 Author: fg-admin Date: Fri Dec 30 14:28:38 2022 +0300 Initial commit diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..dc6f790 --- /dev/null +++ b/.gitignore @@ -0,0 +1,11 @@ +target/ +*.iml +.idea/ +venv/* +__pycahe__/* +.DS_Store +*.code-workspace +.vscode/ +.settings/ +.classpath +.project diff --git a/LICENSE b/LICENSE new file mode 100644 index 0000000..7a4a3ea --- /dev/null +++ b/LICENSE @@ -0,0 +1,202 @@ + + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + + END OF TERMS AND CONDITIONS + + APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "[]" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. + + Copyright [yyyy] [name of copyright owner] + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. \ No newline at end of file diff --git a/README.md b/README.md new file mode 100644 index 0000000..3b616f9 --- /dev/null +++ b/README.md @@ -0,0 +1 @@ +# Core diff --git a/pom.xml b/pom.xml new file mode 100644 index 0000000..2929837 --- /dev/null +++ b/pom.xml @@ -0,0 +1,142 @@ + + 4.0.0 + org.listware + core + 1.0 + jar + + + UTF-8 + 11 + 11 + + + + + org.apache.flink + statefun-sdk-embedded + 3.2.0 + provided + + + org.apache.flink + statefun-flink-io-bundle + 3.2.0 + provided + + + com.google.protobuf + protobuf-java + 3.7.1 + + + org.apache.httpcomponents + httpclient + 4.5.13 + + + com.arangodb + arangodb-java-driver + 6.19.0 + + + com.arangodb + jackson-dataformat-velocypack + 3.0.1 + + + com.github.fge + jackson-coreutils + 1.8 + + + io.grpc + grpc-stub + 1.51.0 + + + io.grpc + grpc-protobuf + 1.51.0 + + + javax.annotation + javax.annotation-api + 1.3.2 + + + io.grpc + grpc-netty-shaded + 1.50.0 + runtime + + + org.apache.httpcomponents + httpcore + 4.4.8 + + + com.google.guava + guava + 23.6-jre + + + + org.listware + io + 1.0 + provided + + + org.listware + proto + 1.0 + + + + + + + + org.apache.maven.plugins + maven-shade-plugin + 3.0.0 + + + + package + + shade + + + false + + + + + + + + + + + + + gitea + http://git.fg-tech.ru/api/packages/listware/maven + + + + + gitea + http://git.fg-tech.ru/api/packages/listware/maven + + + gitea + http://git.fg-tech.ru/api/packages/listware/maven + + + diff --git a/src/main/java/org/listware/core/FunctionContext.java b/src/main/java/org/listware/core/FunctionContext.java new file mode 100644 index 0000000..c4d8f61 --- /dev/null +++ b/src/main/java/org/listware/core/FunctionContext.java @@ -0,0 +1,117 @@ +/* Copyright 2022 Listware */ + +package org.listware.core; + +import org.apache.flink.statefun.sdk.Context; +import org.apache.flink.statefun.sdk.FunctionType; +import org.apache.flink.statefun.sdk.reqreply.generated.TypedValue; + +import org.listware.sdk.Functions; +import org.listware.io.utils.TypedValueDeserializer; +import org.listware.core.documents.ObjectDocument; +import org.listware.core.provider.utils.Cmdb.Matcher; +import org.listware.core.provider.utils.Cmdb.SystemKeys; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class FunctionContext { + private static final Logger LOG = LoggerFactory.getLogger(FunctionContext.class); + + private Context flinkContext; + private ObjectDocument document; + private Functions.FunctionContext functionContext; + private boolean isExecutedCallback = false; + + public FunctionContext(Context context, ObjectDocument document, Functions.FunctionContext functionContext) { + this.flinkContext = context; + this.document = document; + this.functionContext = functionContext; + } + + public Context getFlinkContext() { + return flinkContext; + } + + public ObjectDocument getDocument() { + return document; + } + + public Functions.FunctionContext getFunctionContext() { + return functionContext; + } + + /** + * root + ** + */ + public boolean isRoot() { + return document.getKey().equals(SystemKeys.ROOT); + } + + /** + * objects.root + ** + */ + public boolean isObjects() { + return document.getKey().equals(SystemKeys.OBJECTS); + } + + /** + * types.root + ** + */ + public boolean isTypes() { + return document.getKey().equals(SystemKeys.TYPES); + } + + /** + * is link? + ** + */ + public boolean isLink() { + return document.getKey().matches(Matcher.NUMERIC_STRING); + } + + /** + * is object? + ** + */ + public boolean isObject() { + return document.getKey().matches(Matcher.UUID_V4_STRING); + } + + /** + * is type? + ** + */ + public boolean isType() { + return (!isRoot() && !isObjects() && !isTypes() && !isLink() && !isObject()); + } + + // You can execute callback only once, getCallback() to inherit func or + // Callback() after invoke + public void callback() throws Exception { + Functions.FunctionContext callback = getCallback(); + if (callback != null) { + String namespace = callback.getFunctionType().getNamespace(); + String type = callback.getFunctionType().getType(); + FunctionType functionType = new FunctionType(namespace, type); + + LOG.info("send: " + functionType + " id " + callback.getId()); + + TypedValue typedValue = TypedValueDeserializer.fromMessageLite(callback); + ; + + flinkContext.send(functionType, callback.getId(), typedValue); + } + } + + public Functions.FunctionContext getCallback() { + if (functionContext.hasCallback() && !isExecutedCallback) { + isExecutedCallback = true; + return functionContext.getCallback(); + } + return null; + } +} diff --git a/src/main/java/org/listware/core/Module.java b/src/main/java/org/listware/core/Module.java new file mode 100644 index 0000000..88c27d8 --- /dev/null +++ b/src/main/java/org/listware/core/Module.java @@ -0,0 +1,35 @@ +/* Copyright 2022 Listware */ + +package org.listware.core; + +import java.util.Map; + +import com.google.auto.service.AutoService; + +import org.apache.flink.statefun.sdk.spi.StatefulFunctionModule; +import org.listware.core.provider.FunctionProvider; +import org.listware.core.provider.functions.Link; +import org.listware.core.provider.functions.Log; +import org.listware.core.provider.functions.Object; +import org.listware.core.provider.functions.ObjectTrigger; +import org.listware.core.provider.functions.Register; +import org.listware.core.provider.functions.Router; +import org.listware.core.provider.functions.Type; +import org.listware.core.provider.functions.TypeTrigger; + +@AutoService(StatefulFunctionModule.class) +public final class Module implements StatefulFunctionModule { + private FunctionProvider provider = new FunctionProvider(); + + @Override + public void configure(Map globalConfiguration, Binder binder) { + binder.bindFunctionProvider(Type.FUNCTION_TYPE, provider); + binder.bindFunctionProvider(TypeTrigger.FUNCTION_TYPE, provider); + binder.bindFunctionProvider(Object.FUNCTION_TYPE, provider); + binder.bindFunctionProvider(ObjectTrigger.FUNCTION_TYPE, provider); + binder.bindFunctionProvider(Link.FUNCTION_TYPE, provider); + binder.bindFunctionProvider(Router.FUNCTION_TYPE, provider); + binder.bindFunctionProvider(Log.FUNCTION_TYPE, provider); + binder.bindFunctionProvider(Register.FUNCTION_TYPE, provider); + } +} \ No newline at end of file diff --git a/src/main/java/org/listware/core/documents/LinkDocument.java b/src/main/java/org/listware/core/documents/LinkDocument.java new file mode 100644 index 0000000..d26fa26 --- /dev/null +++ b/src/main/java/org/listware/core/documents/LinkDocument.java @@ -0,0 +1,177 @@ +/* Copyright 2022 Listware */ + +package org.listware.core.documents; + +import java.util.Map; + +import org.listware.core.documents.entity.DocumentFields; +import org.listware.core.documents.entity.Name; +import org.listware.core.documents.entity.Type; +import org.listware.core.provider.utils.exceptions.PayloadNotFoundException; + +import com.arangodb.entity.From; +import com.arangodb.entity.To; +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.github.fge.jackson.JsonLoader; +import com.google.protobuf.ByteString; + +public class LinkDocument extends ObjectDocument { + private static final long serialVersionUID = 6904923804449368783L; + + @From + private String from; + @To + private String to; + + @Name + private String name; + + @Type + private String type; + + public LinkDocument() { + super(); + } + + public LinkDocument(final String from, final String to) { + super(); + this.from = from; + this.to = to; + } + + public LinkDocument(final String key, final String from, final String to) { + super(key); + this.from = from; + this.to = to; + } + + public LinkDocument(final String from, final String to, final String name, final String type) { + this(from, to); + this.name = name; + this.type = type; + } + + public LinkDocument(final Map properties) { + super(properties); + final Object tmpFrom = properties.remove(DocumentFields.FROM); + if (tmpFrom != null) { + from = tmpFrom.toString(); + } + final Object tmpTo = properties.remove(DocumentFields.TO); + if (tmpTo != null) { + to = tmpTo.toString(); + } + final Object tmpName = properties.remove(DocumentFields.NAME); + if (tmpName != null) { + name = tmpName.toString(); + } + final Object tmpType = properties.remove(DocumentFields.TYPE); + if (tmpType != null) { + type = tmpType.toString(); + } + } + + public String getFrom() { + return from; + } + + public void setFrom(final String from) { + this.from = from; + } + + public String getTo() { + return to; + } + + public void setTo(final String to) { + this.to = to; + } + + public String getName() { + return name; + } + + public void setName(String name) { + this.name = name; + } + + public String getType() { + return type; + } + + public void setType(String type) { + this.type = type; + } + + @Override + public String toString() { + return "BaseDocument [documentRevision=" + revision + ", documentHandle=" + id + ", documentKey=" + key + + ", from=" + from + ", to=" + to + ", properties=" + properties + "]"; + } + + @Override + public int hashCode() { + final int prime = 31; + int result = super.hashCode(); + result = prime * result + ((from == null) ? 0 : from.hashCode()); + result = prime * result + ((to == null) ? 0 : to.hashCode()); + result = prime * result + ((to == null) ? 0 : name.hashCode()); + result = prime * result + ((to == null) ? 0 : type.hashCode()); + return result; + } + + @Override + public boolean equals(final Object obj) { + if (this == obj) { + return true; + } + if (!super.equals(obj)) { + return false; + } + if (getClass() != obj.getClass()) { + return false; + } + final LinkDocument other = (LinkDocument) obj; + if (name == null) { + if (other.name != null) { + return false; + } + } else if (!name.equals(other.name)) { + return false; + } + if (type == null) { + if (other.type != null) { + return false; + } + } else if (!type.equals(other.type)) { + return false; + } + if (from == null) { + if (other.from != null) { + return false; + } + } else if (!from.equals(other.from)) { + return false; + } + if (to == null) { + return other.to == null; + } else + return to.equals(other.to); + } + + public static LinkDocument deserialize(ByteString payload) throws Exception { + if (payload.isEmpty()) { + throw new PayloadNotFoundException(); + } + + JsonNode jsonNode = JsonLoader.fromString(payload.toStringUtf8()); + ObjectMapper mapper = new ObjectMapper(); + + TypeReference> ref = new TypeReference>() { + }; + Map values = mapper.convertValue(jsonNode, ref); + return new LinkDocument(values); + } +} diff --git a/src/main/java/org/listware/core/documents/MetaDocument.java b/src/main/java/org/listware/core/documents/MetaDocument.java new file mode 100644 index 0000000..ea35f8c --- /dev/null +++ b/src/main/java/org/listware/core/documents/MetaDocument.java @@ -0,0 +1,92 @@ +/* Copyright 2022 Listware */ + +package org.listware.core.documents; + +import java.io.Serializable; +import java.util.Map; + +import org.listware.core.documents.entity.Created; +import org.listware.core.documents.entity.DocumentFields; +import org.listware.core.documents.entity.Updated; + +public class MetaDocument implements Serializable { + private static final long serialVersionUID = 8363296886824978390L; + + @Created + private Number created; + + @Updated + private Number updated; + + public MetaDocument() { + super(); + created = System.currentTimeMillis(); + updated = created; + } + + public MetaDocument(final Map properties) { + super(); + final Object tmpCreated = properties.remove(DocumentFields.CREATED); + if (tmpCreated != null) { + created = (Number) tmpCreated; + } + final Object tmpUpdated = properties.remove(DocumentFields.UPDATED); + if (tmpUpdated != null) { + updated = (Number) tmpUpdated; + } + } + + public Number getCreated() { + return created; + } + + public void setCreated(Number created) { + this.created = created; + } + + public Number getUpdated() { + return updated; + } + + public void setUpdated(Number updated) { + this.updated = updated; + } + + public void update() { + updated = System.currentTimeMillis(); + } + + @Override + public int hashCode() { + final int prime = 31; + int result = 1; + result = prime * result + ((created == null) ? 0 : created.hashCode()); + result = prime * result + ((updated == null) ? 0 : updated.hashCode()); + return result; + } + + @Override + public boolean equals(final Object obj) { + if (this == obj) { + return true; + } + if (obj == null) { + return false; + } + if (getClass() != obj.getClass()) { + return false; + } + final MetaDocument other = (MetaDocument) obj; + if (created == null) { + if (other.created != null) { + return false; + } + } else if (!created.equals(other.created)) { + return false; + } + if (updated == null) { + return other.updated == null; + } else + return updated.equals(other.updated); + } +} diff --git a/src/main/java/org/listware/core/documents/ObjectDocument.java b/src/main/java/org/listware/core/documents/ObjectDocument.java new file mode 100644 index 0000000..2bc6fca --- /dev/null +++ b/src/main/java/org/listware/core/documents/ObjectDocument.java @@ -0,0 +1,205 @@ +/* Copyright 2022 Listware */ + +package org.listware.core.documents; + +import java.io.Serializable; +import java.util.HashMap; +import java.util.Map; + +import org.listware.core.documents.entity.DocumentFields; +import org.listware.core.documents.entity.Meta; +import org.listware.core.provider.utils.exceptions.PayloadNotFoundException; + +import com.arangodb.entity.Id; +import com.arangodb.entity.Key; +import com.arangodb.entity.Rev; +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.github.fge.jackson.JsonLoader; +import com.google.protobuf.ByteString; + +public class ObjectDocument implements Serializable { + private static final long serialVersionUID = -1824742667228719116L; + + @Id + protected String id; + @Key + protected String key; + @Rev + protected String revision; + @Meta + protected MetaDocument meta; + + protected Map properties; + + public ObjectDocument() { + super(); + properties = new HashMap<>(); + meta = new MetaDocument(); + } + + public ObjectDocument(final String key) { + this(); + this.key = key; + } + + @SuppressWarnings("unchecked") + public ObjectDocument(final Map properties) { + this(); + final Object tmpId = properties.remove(DocumentFields.ID); + if (tmpId != null) { + id = tmpId.toString(); + } + final Object tmpKey = properties.remove(DocumentFields.KEY); + if (tmpKey != null) { + key = tmpKey.toString(); + } + final Object tmpRev = properties.remove(DocumentFields.REV); + if (tmpRev != null) { + revision = tmpRev.toString(); + } + final Object tmpMeta = properties.remove(DocumentFields.META); + if (tmpMeta != null) { + meta = new MetaDocument((Map) tmpMeta); + } + this.properties = properties; + } + + public String getId() { + return id; + } + + public void setId(final String id) { + this.id = id; + } + + public String getKey() { + return key; + } + + public void setKey(final String key) { + this.key = key; + } + + public String getRevision() { + return revision; + } + + public void setRevision(final String revision) { + this.revision = revision; + } + + public MetaDocument getMeta() { + return meta; + } + + public void setMeta(MetaDocument meta) { + this.meta = meta; + } + + public Map getProperties() { + return properties; + } + + public void setProperties(final Map properties) { + this.properties = properties; + } + + public void addAttribute(final String key, final Object value) { + properties.put(key, value); + } + + public void updateAttribute(final String key, final Object value) { + if (properties.containsKey(key)) { + properties.put(key, value); + } + } + + public Object getAttribute(final String key) { + return properties.get(key); + } + + @Override + public String toString() { + return "BaseDocument [documentRevision=" + revision + ", documentHandle=" + id + ", documentKey=" + key + + ", properties=" + properties + "]"; + } + + @Override + public int hashCode() { + final int prime = 31; + int result = 1; + result = prime * result + ((id == null) ? 0 : id.hashCode()); + result = prime * result + ((key == null) ? 0 : key.hashCode()); + result = prime * result + ((properties == null) ? 0 : properties.hashCode()); + result = prime * result + ((revision == null) ? 0 : revision.hashCode()); + result = prime * result + ((meta == null) ? 0 : meta.hashCode()); + return result; + } + + @Override + public boolean equals(final Object obj) { + if (this == obj) { + return true; + } + if (obj == null) { + return false; + } + if (getClass() != obj.getClass()) { + return false; + } + final ObjectDocument other = (ObjectDocument) obj; + if (id == null) { + if (other.id != null) { + return false; + } + } else if (!id.equals(other.id)) { + return false; + } + if (key == null) { + if (other.key != null) { + return false; + } + } else if (!key.equals(other.key)) { + return false; + } + if (properties == null) { + if (other.properties != null) { + return false; + } + } else if (!properties.equals(other.properties)) { + return false; + } + if (meta == null) { + if (other.meta != null) { + return false; + } + } else if (!meta.equals(other.meta)) { + return false; + } + if (revision == null) { + return other.revision == null; + } else + return revision.equals(other.revision); + } + + public void updateMeta() { + meta.update(); + } + + + public static ObjectDocument deserialize(ByteString payload) throws Exception { + if (payload.isEmpty()) { + throw new PayloadNotFoundException(); + } + + JsonNode jsonNode = JsonLoader.fromString(payload.toStringUtf8()); + ObjectMapper mapper = new ObjectMapper(); + + TypeReference> ref = new TypeReference>() { + }; + Map values = mapper.convertValue(jsonNode, ref); + return new ObjectDocument(values); + } +} diff --git a/src/main/java/org/listware/core/documents/entity/Created.java b/src/main/java/org/listware/core/documents/entity/Created.java new file mode 100644 index 0000000..71f9901 --- /dev/null +++ b/src/main/java/org/listware/core/documents/entity/Created.java @@ -0,0 +1,21 @@ +/* Copyright 2022 Listware */ + +package org.listware.core.documents.entity; + +import java.lang.annotation.ElementType; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; + +import com.fasterxml.jackson.annotation.JacksonAnnotationsInside; +import com.fasterxml.jackson.annotation.JsonInclude; +import com.fasterxml.jackson.annotation.JsonProperty; + +//TODO: in v7 add targets ElementType.METHOD and ElementType.PARAMETER +@Target(ElementType.FIELD) +@Retention(RetentionPolicy.RUNTIME) +@JacksonAnnotationsInside +@JsonProperty(DocumentFields.CREATED) +@JsonInclude(JsonInclude.Include.NON_NULL) +public @interface Created { +} \ No newline at end of file diff --git a/src/main/java/org/listware/core/documents/entity/DocumentFields.java b/src/main/java/org/listware/core/documents/entity/DocumentFields.java new file mode 100644 index 0000000..53a17af --- /dev/null +++ b/src/main/java/org/listware/core/documents/entity/DocumentFields.java @@ -0,0 +1,20 @@ +/* Copyright 2022 Listware */ + +package org.listware.core.documents.entity; + +public class DocumentFields { + + private DocumentFields() { + } + + public static final String ID = "_id"; + public static final String KEY = "_key"; + public static final String REV = "_rev"; + public static final String FROM = "_from"; + public static final String TO = "_to"; + public static final String NAME = "_name"; + public static final String TYPE = "_type"; + public static final String META = "_meta"; + public static final String CREATED = "created"; + public static final String UPDATED = "updated"; +} diff --git a/src/main/java/org/listware/core/documents/entity/Meta.java b/src/main/java/org/listware/core/documents/entity/Meta.java new file mode 100644 index 0000000..243a7a2 --- /dev/null +++ b/src/main/java/org/listware/core/documents/entity/Meta.java @@ -0,0 +1,21 @@ +/* Copyright 2022 Listware */ + +package org.listware.core.documents.entity; + +import java.lang.annotation.ElementType; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; + +import com.fasterxml.jackson.annotation.JacksonAnnotationsInside; +import com.fasterxml.jackson.annotation.JsonInclude; +import com.fasterxml.jackson.annotation.JsonProperty; + +// TODO: in v7 add targets ElementType.METHOD and ElementType.PARAMETER +@Target(ElementType.FIELD) +@Retention(RetentionPolicy.RUNTIME) +@JacksonAnnotationsInside +@JsonProperty(DocumentFields.META) +@JsonInclude(JsonInclude.Include.NON_NULL) +public @interface Meta { +} diff --git a/src/main/java/org/listware/core/documents/entity/Name.java b/src/main/java/org/listware/core/documents/entity/Name.java new file mode 100644 index 0000000..5a884b0 --- /dev/null +++ b/src/main/java/org/listware/core/documents/entity/Name.java @@ -0,0 +1,21 @@ +/* Copyright 2022 Listware */ + +package org.listware.core.documents.entity; + +import java.lang.annotation.ElementType; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; + +import com.fasterxml.jackson.annotation.JacksonAnnotationsInside; +import com.fasterxml.jackson.annotation.JsonInclude; +import com.fasterxml.jackson.annotation.JsonProperty; + +// TODO: in v7 add targets ElementType.METHOD and ElementType.PARAMETER +@Target(ElementType.FIELD) +@Retention(RetentionPolicy.RUNTIME) +@JacksonAnnotationsInside +@JsonProperty(DocumentFields.NAME) +@JsonInclude(JsonInclude.Include.NON_NULL) +public @interface Name { +} diff --git a/src/main/java/org/listware/core/documents/entity/Type.java b/src/main/java/org/listware/core/documents/entity/Type.java new file mode 100644 index 0000000..0f83447 --- /dev/null +++ b/src/main/java/org/listware/core/documents/entity/Type.java @@ -0,0 +1,21 @@ +/* Copyright 2022 Listware */ + +package org.listware.core.documents.entity; + +import java.lang.annotation.ElementType; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; + +import com.fasterxml.jackson.annotation.JacksonAnnotationsInside; +import com.fasterxml.jackson.annotation.JsonInclude; +import com.fasterxml.jackson.annotation.JsonProperty; + +//TODO: in v7 add targets ElementType.METHOD and ElementType.PARAMETER +@Target(ElementType.FIELD) +@Retention(RetentionPolicy.RUNTIME) +@JacksonAnnotationsInside +@JsonProperty(DocumentFields.TYPE) +@JsonInclude(JsonInclude.Include.NON_NULL) +public @interface Type { +} diff --git a/src/main/java/org/listware/core/documents/entity/Updated.java b/src/main/java/org/listware/core/documents/entity/Updated.java new file mode 100644 index 0000000..0fd5223 --- /dev/null +++ b/src/main/java/org/listware/core/documents/entity/Updated.java @@ -0,0 +1,21 @@ +/* Copyright 2022 Listware */ + +package org.listware.core.documents.entity; + +import java.lang.annotation.ElementType; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; + +import com.fasterxml.jackson.annotation.JacksonAnnotationsInside; +import com.fasterxml.jackson.annotation.JsonInclude; +import com.fasterxml.jackson.annotation.JsonProperty; + +//TODO: in v7 add targets ElementType.METHOD and ElementType.PARAMETER +@Target(ElementType.FIELD) +@Retention(RetentionPolicy.RUNTIME) +@JacksonAnnotationsInside +@JsonProperty(DocumentFields.UPDATED) +@JsonInclude(JsonInclude.Include.NON_NULL) +public @interface Updated { +} \ No newline at end of file diff --git a/src/main/java/org/listware/core/provider/FunctionProvider.java b/src/main/java/org/listware/core/provider/FunctionProvider.java new file mode 100644 index 0000000..d9a4e03 --- /dev/null +++ b/src/main/java/org/listware/core/provider/FunctionProvider.java @@ -0,0 +1,303 @@ +/* Copyright 2022 Listware */ + +package org.listware.core.provider; + +import java.util.Arrays; +import java.util.Collection; + +import java.security.KeyManagementException; +import java.security.NoSuchAlgorithmException; +import java.security.cert.CertificateException; +import java.security.cert.X509Certificate; + +import javax.net.ssl.HostnameVerifier; +import javax.net.ssl.HttpsURLConnection; +import javax.net.ssl.SSLContext; +import javax.net.ssl.SSLSession; +import javax.net.ssl.TrustManager; +import javax.net.ssl.X509TrustManager; + +import org.apache.flink.statefun.sdk.FunctionType; +import org.apache.flink.statefun.sdk.StatefulFunction; +import org.apache.flink.statefun.sdk.StatefulFunctionProvider; +import org.listware.io.utils.Constants.Cmdb; +import org.listware.io.utils.QDSLClient; +import org.listware.sdk.pbcmdb.pbqdsl.QDSL; +import org.listware.core.documents.LinkDocument; +import org.listware.core.documents.ObjectDocument; +import org.listware.core.provider.functions.Link; +import org.listware.core.provider.functions.Log; +import org.listware.core.provider.functions.Object; +import org.listware.core.provider.functions.ObjectTrigger; +import org.listware.core.provider.functions.Register; +import org.listware.core.provider.functions.Router; +import org.listware.core.provider.functions.Type; +import org.listware.core.provider.functions.TypeTrigger; +import org.listware.core.provider.utils.Cmdb.Collections; +import org.listware.core.provider.utils.Cmdb.LinkTypes; +import org.listware.core.provider.utils.Cmdb.SystemKeys; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.arangodb.ArangoCollection; +import com.arangodb.ArangoDB; +import com.arangodb.ArangoDatabase; +import com.arangodb.ArangoGraph; +import com.arangodb.DbName; +import com.arangodb.entity.CollectionType; +import com.arangodb.entity.DocumentCreateEntity; +import com.arangodb.entity.EdgeDefinition; +import com.arangodb.entity.KeyType; +import com.arangodb.mapping.ArangoJack; +import com.arangodb.model.CollectionCreateOptions; +import com.arangodb.model.DocumentCreateOptions; + +public class FunctionProvider implements StatefulFunctionProvider { + @SuppressWarnings("unused") + private static final Logger LOG = LoggerFactory.getLogger(FunctionProvider.class); + + // graph name + private static final String SYSTEM_GRAPH = "system"; + + // system function types + private static final String SYSTEM_TYPE = "system"; + private static final String FUNCTION_CONTAINER_TYPE = "function-container"; + private static final String FUNCTION_TYPE = "function"; + private static final String FUNCTIONS_LINK_NAME = "functions"; + private static final String SYSTEM_LINK_NAME = "system"; + private static final String TYPES_FUNCTION_LINK_NAME = "types"; + private static final String OBJECTS_FUNCTION_LINK_NAME = "objects"; + private static final String LINKS_FUNCTION_LINK_NAME = "links"; + private static final String ROUTER_FUNCTION_LINK_NAME = "router"; + + private static final DbName DB_NAME = DbName.of(Cmdb.DBNAME); + + private QDSLClient client = new QDSLClient(); + private Log log = new Log(); + private Register register = new Register(); + + private ArangoGraph graph; + + private Object object; + private ObjectTrigger objectTrigger; + private Type type; + private TypeTrigger typeTrigger; + private Link link; + private Router qdsl; + + private ArangoCollection system, objects, types, links; + + public FunctionProvider() { + graph = bootstrap(); + + qdsl = new Router(client); + type = new Type(graph); + typeTrigger = new TypeTrigger(graph); + object = new Object(graph); + objectTrigger = new ObjectTrigger(graph); + link = new Link(graph); + } + + @Override + public StatefulFunction functionOfType(FunctionType functionType) { + if (functionType.equals(Router.FUNCTION_TYPE)) { + return qdsl; + } + + if (functionType.equals(Type.FUNCTION_TYPE)) { + return type; + } + if (functionType.equals(TypeTrigger.FUNCTION_TYPE)) { + return typeTrigger; + } + + if (functionType.equals(Object.FUNCTION_TYPE)) { + return object; + } + if (functionType.equals(ObjectTrigger.FUNCTION_TYPE)) { + return objectTrigger; + } + + if (functionType.equals(Link.FUNCTION_TYPE)) { + return link; + } + + if (functionType.equals(Log.FUNCTION_TYPE)) { + return log; + } + + if (functionType.equals(Register.FUNCTION_TYPE)) { + return register; + } + + return null; + } + + private ArangoGraph bootstrap() { + // TODO db and collections bootstrap + ArangoDB arango = new ArangoDB.Builder().host(Cmdb.ADDR, Cmdb.PORT) + // .useSsl(true).sslContext(sslContext) + .serializer(new ArangoJack()).user(Cmdb.USER).password(Cmdb.PASSWORD).build(); + + ArangoDatabase db = arango.db(DB_NAME); + if (!db.exists()) { + db.create(); + } + + system = db.collection(Collections.SYSTEM); + if (!system.exists()) { + // TODO set 'isSystem' for system + CollectionCreateOptions opts = new CollectionCreateOptions().isSystem(true); + system.create(opts); + } + + types = db.collection(Collections.TYPES); + if (!types.exists()) { + types.create(); + } + + objects = db.collection(Collections.OBJECTS); + if (!objects.exists()) { + // TODO set 'uuid' for objects + CollectionCreateOptions opts = new CollectionCreateOptions().keyOptions(false, KeyType.uuid, null, null); + objects.create(opts); + } + + links = db.collection(Collections.LINKS); + if (!links.exists()) { + // TODO set 'edge' for links + CollectionCreateOptions ops = new CollectionCreateOptions().type(CollectionType.EDGES); + links.create(ops); + } + + // TODO create 'root' + if (!system.documentExists(SystemKeys.ROOT)) { + system.insertDocument(new ObjectDocument(SystemKeys.ROOT)); + } + + ObjectDocument root = system.getDocument(SystemKeys.ROOT, ObjectDocument.class); + + // TODO create 'objects' + if (!system.documentExists(SystemKeys.OBJECTS)) { + insertSystem(root, SystemKeys.OBJECTS); + } + + ObjectDocument objectsBaseDocument = system.getDocument(SystemKeys.OBJECTS, ObjectDocument.class); + + // TODO create 'types' + if (!system.documentExists(SystemKeys.TYPES)) { + insertSystem(root, SystemKeys.TYPES); + } + + ObjectDocument typesBaseDocument = system.getDocument(SystemKeys.TYPES, ObjectDocument.class); + + // TODO create 'function-container' + if (!types.documentExists(FUNCTION_CONTAINER_TYPE)) { + insertType(typesBaseDocument, FUNCTION_CONTAINER_TYPE); + } + + // TODO create 'function' + if (!types.documentExists(FUNCTION_TYPE)) { + insertType(typesBaseDocument, FUNCTION_TYPE); + } + + QDSL.Options options = QDSL.Options.newBuilder().build(); + + // TODO create 'functions.root' from type 'function-container' + QDSL.Elements elements = client.qdsl("functions.root", options); + if (elements.getElementsCount() == 0) { + + ObjectDocument typeBaseDocument = types.getDocument(FUNCTION_CONTAINER_TYPE, ObjectDocument.class); + + ObjectDocument functionsBaseDocument = insertObject(typeBaseDocument, objectsBaseDocument, root, + FUNCTIONS_LINK_NAME); + + ObjectDocument systemBaseDocument = insertObject(typeBaseDocument, objectsBaseDocument, + functionsBaseDocument, SYSTEM_LINK_NAME); + + typeBaseDocument = types.getDocument(FUNCTION_TYPE, ObjectDocument.class); + insertObject(typeBaseDocument, objectsBaseDocument, systemBaseDocument, TYPES_FUNCTION_LINK_NAME); + insertObject(typeBaseDocument, objectsBaseDocument, systemBaseDocument, OBJECTS_FUNCTION_LINK_NAME); + insertObject(typeBaseDocument, objectsBaseDocument, systemBaseDocument, LINKS_FUNCTION_LINK_NAME); + insertObject(typeBaseDocument, objectsBaseDocument, systemBaseDocument, ROUTER_FUNCTION_LINK_NAME); + } + + // TODO graph bootstrap + ArangoGraph graph = db.graph(SYSTEM_GRAPH); + if (!graph.exists()) { + // TODO one edge of 1 links collection + EdgeDefinition EdgeDefinition = new EdgeDefinition().collection(Collections.LINKS) + .from(Collections.SYSTEM, Collections.TYPES, Collections.OBJECTS) + .to(Collections.TYPES, Collections.OBJECTS); + + Collection edgeDefinitions = Arrays.asList(EdgeDefinition); + db.createGraph(SYSTEM_GRAPH, edgeDefinitions); + } + return graph; + } + + private ObjectDocument insertObject(ObjectDocument type, ObjectDocument object, ObjectDocument parent, + String name) { + // insert to 'objects' + DocumentCreateEntity documentCreateEntity = objects.insertDocument(new ObjectDocument(), + new DocumentCreateOptions().returnNew(true)); + + String id = documentCreateEntity.getId(); + String key = documentCreateEntity.getKey(); + + // Link type -> object ($uuid) + insertLink(type.getId(), id, key, type.getKey()); + + // Link objects -> object ($uuid) + insertLink(object.getId(), id, key, type.getKey()); + + // Link parent -> object ('name') + insertLink(parent.getId(), id, name, type.getKey()); + + return documentCreateEntity.getNew(); + } + + private void insertType(ObjectDocument typesBaseDocument, String key) { + DocumentCreateEntity documentCreateEntity = types.insertDocument(new ObjectDocument(key)); + insertLink(typesBaseDocument.getId(), documentCreateEntity.getId(), key, LinkTypes.TYPE); + } + + private void insertSystem(ObjectDocument root, String key) { + DocumentCreateEntity documentCreateEntity = system.insertDocument(new ObjectDocument(key)); + insertLink(root.getId(), documentCreateEntity.getId(), key, SYSTEM_TYPE); + } + + private void insertLink(String from, String to, String name, String type) { + LinkDocument baseEdgeDocument = new LinkDocument(from, to, name, type); + links.insertDocument(baseEdgeDocument); + } + + SSLContext createSSLContext() throws NoSuchAlgorithmException, KeyManagementException { + final SSLContext sslContext = SSLContext.getInstance("TLS"); + + sslContext.init(null, new TrustManager[] { new X509TrustManager() { + @Override + public void checkClientTrusted(X509Certificate[] x509Certificates, String s) throws CertificateException { + } + + @Override + public void checkServerTrusted(X509Certificate[] x509Certificates, String s) throws CertificateException { + } + + @Override + public X509Certificate[] getAcceptedIssuers() { + return new X509Certificate[0]; + } + } }, null); + + HttpsURLConnection.setDefaultSSLSocketFactory(sslContext.getSocketFactory()); + HttpsURLConnection.setDefaultHostnameVerifier(new HostnameVerifier() { + + public boolean verify(String hostname, SSLSession session) { + return true; + } + }); + return sslContext; + } + +} diff --git a/src/main/java/org/listware/core/provider/functions/Arango.java b/src/main/java/org/listware/core/provider/functions/Arango.java new file mode 100644 index 0000000..54382f2 --- /dev/null +++ b/src/main/java/org/listware/core/provider/functions/Arango.java @@ -0,0 +1,174 @@ +/* Copyright 2022 Listware */ + +package org.listware.core.provider.functions; + +import java.util.HashMap; +import java.util.Map; + +import org.apache.flink.statefun.sdk.Context; + +import com.arangodb.ArangoCursor; +import com.arangodb.ArangoEdgeCollection; +import com.arangodb.ArangoGraph; +import com.arangodb.ArangoVertexCollection; +import com.arangodb.entity.EdgeEntity; +import com.google.protobuf.ByteString; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.listware.io.utils.QDSLClient; +import org.listware.io.utils.VertexClient; +import org.listware.sdk.Functions; +import org.listware.sdk.pbcmdb.Core; +import org.listware.core.FunctionContext; +import org.listware.core.documents.LinkDocument; +import org.listware.core.documents.ObjectDocument; +import org.listware.core.provider.utils.Cmdb.Collections; +import org.listware.core.provider.utils.Cmdb.Matcher; +import org.listware.core.provider.utils.Cmdb.SystemKeys; +import org.listware.core.provider.utils.exceptions.NoLinkException; + +/** + * Middleware for CUD functions + ** + */ +public class Arango extends Base { + private static final Logger LOG = LoggerFactory.getLogger(Arango.class); + + private VertexClient vertexClient = new VertexClient(); + protected QDSLClient client = new QDSLClient(); + + protected ArangoGraph graph; + protected ArangoVertexCollection system, types, objects; + protected ArangoEdgeCollection links; + + public Arango(ArangoGraph graph) { + this.graph = graph; + + system = graph.vertexCollection(Collections.SYSTEM); + types = graph.vertexCollection(Collections.TYPES); + objects = graph.vertexCollection(Collections.OBJECTS); + links = graph.edgeCollection(Collections.LINKS); + } + + @Override + public void invoke(Context context, Functions.FunctionContext pbFunctionContext) throws Exception { + String id = context.self().id(); + ObjectDocument baseDocument = readContext(id); + + FunctionContext functionContext = new FunctionContext(context, baseDocument, pbFunctionContext); + + invoke(functionContext); + + functionContext.callback(); + } + + public void invoke(FunctionContext functionContext) throws Exception { + + } + + protected ObjectDocument readContext(String key) throws Exception { + if (key.equals(SystemKeys.ROOT) || key.equals(SystemKeys.OBJECTS) || key.equals(SystemKeys.TYPES)) { + + Core.Response resp = vertexClient.read(key, Collections.SYSTEM); + + return ObjectDocument.deserialize(resp.getPayload()); + } else if (key.matches(Matcher.UUID_V4_STRING)) { + Core.Response resp = vertexClient.read(key, Collections.OBJECTS); + + return ObjectDocument.deserialize(resp.getPayload()); + } else if (key.matches(Matcher.NUMERIC_STRING)) { + Core.Response resp = vertexClient.read(key, Collections.LINKS); + + return LinkDocument.deserialize(resp.getPayload()); + } else { + Core.Response resp = vertexClient.read(key, Collections.TYPES); + + return ObjectDocument.deserialize(resp.getPayload()); + } + } + +// *[?$._from == "%s" && $._name == "%s"?] + + protected LinkDocument findByFromName(String from, String name) throws NoLinkException { +// QDSL.Options options = QDSL.Options.newBuilder().setObject(true).build(); +// +// String query = String.format("%s.objects", functionContext.Context().self().id()); +// +// QDSL.Elements elements = client.qdsl(query, options); +// + + String query = "FOR t IN links FILTER t._name == @name && t._from == @from RETURN t"; + + Map bindVars = new HashMap(); + bindVars.put("name", name); + bindVars.put("from", from); + ArangoCursor cursor = graph.db().query(query, bindVars, LinkDocument.class); + + if (cursor.hasNext()) { + return cursor.next(); + } + + throw new NoLinkException(from, name); + } + + private void insertLink(LinkDocument linkDocument) throws Exception { + EdgeEntity edgeEntity = links.insertEdge(linkDocument); + + LOG.info("created " + edgeEntity.getId()); + +// pbqdsl.Options options = pbqdsl.Options.newBuilder().setType(true).build(); +// +// String query = String.format("*[?$._to == '%s'?].objects", linkDocument.getFrom()); +// +// pbqdsl.Elements elements = client.qdsl(query, options); +// +// if (elements.getElementsCount() > 0) { +// String type = elements.getElements(0).getType(); +// LOG.info("insert: from " + type); +// } +// +// query = String.format("*[?$._to == '%s'?].objects", linkDocument.getTo()); +// +// elements = client.qdsl(query, options); +// +// if (elements.getElementsCount() > 0) { +// String type = elements.getElements(0).getType(); +// LOG.info("insert: to " + type); +// } + + } + + protected void insertLink(String from, String to, String name, String type) throws Exception { + LinkDocument linkDocument = new LinkDocument(from, to, name, type); + insertLink(linkDocument); + } + + protected void insertLink(String from, String to, String name, String type, ByteString payload) throws Exception { + LinkDocument linkDocument = LinkDocument.deserialize(payload); + linkDocument.setFrom(from); + linkDocument.setTo(to); + linkDocument.setName(name); + linkDocument.setType(type); + insertLink(linkDocument); + } + + /** + * Exec function + ** + * @param id string + * @param namespace string + * @param type string + * @param method Method + */ + public static Functions.FunctionContext Exec(String id, String namespace, String type, Core.Method method) { + Functions.FunctionType functionType = Functions.FunctionType.newBuilder().setNamespace(namespace).setType(type) + .build(); + + Core.TypeMessage typeMessage = Core.TypeMessage.newBuilder().setMethod(method).build(); + + Functions.FunctionContext.Builder builder = Functions.FunctionContext.newBuilder().setFunctionType(functionType) + .setId(id).setValue(typeMessage.toByteString()); + return builder.build(); + } +} \ No newline at end of file diff --git a/src/main/java/org/listware/core/provider/functions/Base.java b/src/main/java/org/listware/core/provider/functions/Base.java new file mode 100644 index 0000000..e66cb77 --- /dev/null +++ b/src/main/java/org/listware/core/provider/functions/Base.java @@ -0,0 +1,67 @@ +/* Copyright 2022 Listware */ + +package org.listware.core.provider.functions; + +import org.apache.flink.statefun.sdk.Context; +import org.apache.flink.statefun.sdk.StatefulFunction; +import org.apache.flink.statefun.sdk.reqreply.generated.TypedValue; +import org.listware.io.functions.egress.Egress; +import org.listware.sdk.Functions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class Base implements StatefulFunction { + private static final Logger LOG = LoggerFactory.getLogger(Base.class); + + @Override + public void invoke(Context context, java.lang.Object input) { + long startTime = System.currentTimeMillis(); + + LOG.info(context.self().toString()); + + if (!(input instanceof TypedValue)) { + LOG.error("unknown message received: " + input); + return; + } + + Functions.FunctionResult.Builder functionResultBuilder = Functions.FunctionResult.newBuilder(); + + TypedValue typedValue = (TypedValue) input; + + Functions.FunctionContext functionContext = null; + + try { + functionContext = Functions.FunctionContext.parseFrom(typedValue.getValue()); + + invoke(context, functionContext); + + functionResultBuilder.setComplete(true); + } catch (Exception e) { + LOG.error(e.getLocalizedMessage()); + functionResultBuilder.setError(e.getLocalizedMessage()); + } finally { + if (functionContext.hasReplyEgress()) { + Functions.ReplyEgress replyEgress = functionContext.getReplyEgress(); + + Functions.FunctionResult functionResult = functionResultBuilder.setReplyEgress(replyEgress).build(); + + TypedValue newTypedValue = TypedValue.newBuilder().setValue(functionResult.toByteString()) + .setHasValue(true).build(); + + LOG.info("ReplyEgress result: " + replyEgress.getId()); + + context.send(Egress.EGRESS, newTypedValue); + } + + long endTime = System.currentTimeMillis(); + + LOG.info(context.self().type() + ": took " + (endTime - startTime) + " milliseconds"); + } + + } + + public void invoke(Context context, Functions.FunctionContext functionContext) throws Exception { + + } + +} diff --git a/src/main/java/org/listware/core/provider/functions/Link.java b/src/main/java/org/listware/core/provider/functions/Link.java new file mode 100644 index 0000000..09bfc54 --- /dev/null +++ b/src/main/java/org/listware/core/provider/functions/Link.java @@ -0,0 +1,139 @@ +/* Copyright 2022 Listware */ + +package org.listware.core.provider.functions; + +import javax.annotation.Nullable; + +import org.apache.flink.statefun.sdk.FunctionType; + +import com.arangodb.ArangoGraph; +import com.arangodb.entity.EdgeUpdateEntity; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.listware.io.utils.Constants.Namespaces; +import org.listware.sdk.Functions; +import org.listware.sdk.pbcmdb.Core; +import org.listware.core.FunctionContext; +import org.listware.core.documents.LinkDocument; +import org.listware.core.documents.ObjectDocument; +import org.listware.core.provider.utils.exceptions.AlreadyLinkException; +import org.listware.core.provider.utils.exceptions.NoLinkException; +import org.listware.core.provider.utils.exceptions.UnknownIdException; +import org.listware.core.provider.utils.exceptions.UnknownMethodException; + +/** + * Link CUD arangodb service for links + ** + */ +public class Link extends Arango { + private static final Logger LOG = LoggerFactory.getLogger(Link.class); + + public static final String TYPE = "links.system.functions.root"; + + public static final FunctionType FUNCTION_TYPE = new FunctionType(Namespaces.INTERNAL, TYPE); + + public Link(ArangoGraph graph) { + super(graph); + } + + @Override + public void invoke(FunctionContext functionContext) throws Exception { + Core.LinkMessage message = Core.LinkMessage.parseFrom(functionContext.getFunctionContext().getValue()); + + switch (message.getMethod()) { + case CREATE: + create(functionContext, message); + break; + + case UPDATE: + update(functionContext, message); + break; + + case DELETE: + delete(functionContext, message); + break; + + default: + throw new UnknownMethodException(message.getMethod()); + } + } + + private void create(FunctionContext functionContext, Core.LinkMessage message) throws Exception { + // do not create link from link + if (functionContext.isLink()) { + throw new UnknownIdException(functionContext.getFlinkContext().self().id()); + } + + try { + // or by from+name + findByFromName(functionContext.getDocument().getId(), message.getName()); + throw new AlreadyLinkException(functionContext.getDocument().getId(), message.getName()); + } catch (NoLinkException ignored) { + } + + ObjectDocument baseDocument = readContext(message.getTo()); + + // FIXME _type? + // types -> type == `Cmdb.TYPE_TYPE` + // type -> object == type + // object1 -> object2 == type of object2 + insertLink(functionContext.getDocument().getId(), baseDocument.getId(), message.getName(), message.getType(), + message.getPayload()); + } + + private void update(FunctionContext functionContext, Core.LinkMessage message) throws Exception { + LinkDocument document = null; + + if (!functionContext.isLink()) { + document = findByFromName(functionContext.getDocument().getId(), message.getName()); + } else { + document = (LinkDocument) functionContext.getDocument(); + } + + LinkDocument newDocument = LinkDocument.deserialize(message.getPayload()); + + document.setProperties(newDocument.getProperties()); + + document.updateMeta(); + + EdgeUpdateEntity edgeUpdateEntity = links.replaceEdge(document.getKey(), document); + LOG.info("updated link " + edgeUpdateEntity.getId()); + } + + private void delete(FunctionContext functionContext, Core.LinkMessage message) throws Exception { + ObjectDocument prevLinkDocument = functionContext.getDocument(); + + if (!functionContext.isLink()) { + prevLinkDocument = findByFromName(functionContext.getDocument().getId(), message.getName()); + } + + links.deleteEdge(prevLinkDocument.getKey()); + LOG.info("deleted link " + prevLinkDocument.getId()); + } + + /** + * CreateLink create link 'from' -> 'to' with 'name' + ** + * @param from will be ('root', 'node', '17136214') + * @param to will be ('root', 'node', '17136214') + * @param name string + * @param callback FunctionContext (optional) + */ + public static Functions.FunctionContext CreateLink(String from, String to, String name, + @Nullable Functions.FunctionContext callback) { + Functions.FunctionType functionType = Functions.FunctionType.newBuilder().setNamespace(Namespaces.INTERNAL) + .setType(Link.TYPE).build(); + + Core.LinkMessage linkMessage = Core.LinkMessage.newBuilder().setMethod(Core.Method.CREATE).setName(name) + .setTo(to).build(); + + Functions.FunctionContext.Builder builder = Functions.FunctionContext.newBuilder().setFunctionType(functionType) + .setId(from).setValue(linkMessage.toByteString()); + + if (callback != null) { + builder = builder.setCallback(callback); + } + return builder.build(); + } +} diff --git a/src/main/java/org/listware/core/provider/functions/Log.java b/src/main/java/org/listware/core/provider/functions/Log.java new file mode 100644 index 0000000..788bb40 --- /dev/null +++ b/src/main/java/org/listware/core/provider/functions/Log.java @@ -0,0 +1,51 @@ +/* Copyright 2022 Listware */ + +package org.listware.core.provider.functions; + +import org.apache.flink.statefun.sdk.Context; +import org.apache.flink.statefun.sdk.FunctionType; +import org.apache.flink.statefun.sdk.StatefulFunction; +import org.apache.flink.statefun.sdk.reqreply.generated.TypedValue; +import org.listware.io.utils.Constants.Namespaces; +import org.listware.sdk.Functions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class Log implements StatefulFunction { + private static final Logger LOG = LoggerFactory.getLogger(Log.class); + + public static final String TYPE = "log.system.functions.root"; + + public static final FunctionType FUNCTION_TYPE = new FunctionType(Namespaces.INTERNAL, TYPE); + + @Override + public void invoke(Context context, java.lang.Object input) { + LOG.info(context.self().toString()); + + long startTime = System.currentTimeMillis(); + + try { + middleware(context, input); + } catch (Exception e) { + LOG.error(e.getLocalizedMessage()); + } + long endTime = System.currentTimeMillis(); + + LOG.info(context.self().type() + ": took " + (endTime - startTime) + " milliseconds"); + } + + @SuppressWarnings("unused") + private void middleware(Context context, java.lang.Object input) throws Exception { + LOG.info("LOG: called from " + context.caller().toString()); + + if (!(input instanceof TypedValue)) { + throw new IllegalArgumentException("unknown message received " + input); + } + + TypedValue value = (TypedValue) input; + + // TODO use message + Functions.FunctionContext functionContext = Functions.FunctionContext.parseFrom(value.getValue()); + } + +} diff --git a/src/main/java/org/listware/core/provider/functions/Object.java b/src/main/java/org/listware/core/provider/functions/Object.java new file mode 100644 index 0000000..6840102 --- /dev/null +++ b/src/main/java/org/listware/core/provider/functions/Object.java @@ -0,0 +1,95 @@ +/* Copyright 2022 Listware */ + +package org.listware.core.provider.functions; + +import org.apache.flink.statefun.sdk.FunctionType; +import org.apache.flink.statefun.sdk.reqreply.generated.TypedValue; + +import com.arangodb.ArangoGraph; +import com.arangodb.entity.VertexUpdateEntity; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.listware.io.utils.TypedValueDeserializer; +import org.listware.io.utils.Constants.Namespaces; +import org.listware.sdk.Functions; +import org.listware.sdk.pbcmdb.Core; +import org.listware.core.FunctionContext; +import org.listware.core.documents.ObjectDocument; +import org.listware.core.provider.utils.exceptions.UnknownIdException; +import org.listware.core.provider.utils.exceptions.UnknownMethodException; + +/** + * Object CUD arangodb service for objects + ** + */ +public class Object extends Arango { + private static final Logger LOG = LoggerFactory.getLogger(Object.class); + + public static final String TYPE = "objects.system.functions.root"; + + public static final FunctionType FUNCTION_TYPE = new FunctionType(Namespaces.INTERNAL, TYPE); + + public Object(ArangoGraph graph) { + super(graph); + } + + @Override + public void invoke(FunctionContext functionContext) throws Exception { + Core.ObjectMessage message = Core.ObjectMessage.parseFrom(functionContext.getFunctionContext().getValue()); + + switch (message.getMethod()) { + case CREATE_CHILD: + createChild(functionContext, message); + break; + + case UPDATE: + update(functionContext, message); + break; + + case DELETE: + delete(functionContext); + break; + + default: + throw new UnknownMethodException(message.getMethod()); + } + } + + private void createChild(FunctionContext functionContext, Core.ObjectMessage message) throws Exception { + Functions.FunctionContext pbFunctionContext = Type.CreateObject(message.getType(), message.getName(), + message.getPayload(), functionContext.getCallback()); + + TypedValue typedValue = TypedValueDeserializer.fromMessageLite(pbFunctionContext); + + functionContext.getFlinkContext().send(Type.FUNCTION_TYPE, message.getType(), typedValue); + } + + private void update(FunctionContext functionContext, Core.ObjectMessage message) throws Exception { + if (!functionContext.isObject()) { + throw new UnknownIdException(functionContext.getFlinkContext().self().id()); + } + + ObjectDocument document = functionContext.getDocument(); + + ObjectDocument newDocument = ObjectDocument.deserialize(message.getPayload()); + + document.setProperties(newDocument.getProperties()); + + document.updateMeta(); + + VertexUpdateEntity vertexUpdateDocument = types.replaceVertex(functionContext.getFlinkContext().self().id(), + document); + + LOG.info("updated object " + vertexUpdateDocument.getId()); + } + + private void delete(FunctionContext functionContext) throws Exception { + if (!functionContext.isObject()) { + throw new UnknownIdException(functionContext.getFlinkContext().self().id()); + } + + objects.deleteVertex(functionContext.getFlinkContext().self().id()); + LOG.info("deleted object " + functionContext.getFlinkContext().self().id()); + } +} diff --git a/src/main/java/org/listware/core/provider/functions/ObjectTrigger.java b/src/main/java/org/listware/core/provider/functions/ObjectTrigger.java new file mode 100644 index 0000000..0c83bfe --- /dev/null +++ b/src/main/java/org/listware/core/provider/functions/ObjectTrigger.java @@ -0,0 +1,65 @@ +/* Copyright 2022 Listware */ + +package org.listware.core.provider.functions; + +import org.apache.flink.statefun.sdk.FunctionType; +import org.apache.flink.statefun.sdk.reqreply.generated.TypedValue; +import org.listware.io.utils.TypedValueDeserializer; +import org.listware.io.utils.Constants.Namespaces; +import org.listware.sdk.Functions; +import org.listware.sdk.pbcmdb.Core; +import org.listware.sdk.pbcmdb.pbqdsl.QDSL; +import org.listware.core.FunctionContext; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.arangodb.ArangoGraph; + +public class ObjectTrigger extends Arango { + @SuppressWarnings("unused") + private static final Logger LOG = LoggerFactory.getLogger(ObjectTrigger.class); + + private static final String TYPE = "trigger.objects.system.functions.root"; + + public static final FunctionType FUNCTION_TYPE = new FunctionType(Namespaces.INTERNAL, TYPE); + + public ObjectTrigger(ArangoGraph graph) { + super(graph); + } + + @Override + public void invoke(FunctionContext functionContext) throws Exception { + Core.ObjectMessage message = Core.ObjectMessage.parseFrom(functionContext.getFunctionContext().getValue()); + + QDSL.Options options = QDSL.Options.newBuilder().setType(true).build(); + + String query = String.format("%s.objects", functionContext.getFlinkContext().self().id()); + + QDSL.Elements elements = client.qdsl(query, options); + + for (QDSL.Element element : elements.getElementsList()) { + Functions.FunctionContext pbFunctionContext = TypeTrigger.Trigger(element.getType(), message.getMethod()); + + TypedValue typedValue = TypedValueDeserializer.fromMessageLite(pbFunctionContext); + + functionContext.getFlinkContext().send(TypeTrigger.FUNCTION_TYPE, element.getType(), typedValue); + } + } + + /** + * Trigger trigger + ** + * @param id string + * @param method Method + */ + public static Functions.FunctionContext Trigger(String id, Core.Method method) { + Functions.FunctionType functionType = Functions.FunctionType.newBuilder().setNamespace(Namespaces.INTERNAL) + .setType(TYPE).build(); + + Core.ObjectMessage message = Core.ObjectMessage.newBuilder().setMethod(method).build(); + + Functions.FunctionContext.Builder builder = Functions.FunctionContext.newBuilder().setFunctionType(functionType) + .setId(id).setValue(message.toByteString()); + return builder.build(); + } +} diff --git a/src/main/java/org/listware/core/provider/functions/Register.java b/src/main/java/org/listware/core/provider/functions/Register.java new file mode 100644 index 0000000..0fa3154 --- /dev/null +++ b/src/main/java/org/listware/core/provider/functions/Register.java @@ -0,0 +1,79 @@ +/* Copyright 2022 Listware */ + +package org.listware.core.provider.functions; + +import org.apache.flink.statefun.sdk.Context; +import org.apache.flink.statefun.sdk.FunctionType; +import org.apache.flink.statefun.sdk.reqreply.generated.TypedValue; +import org.listware.io.functions.egress.EgressReader; +import org.listware.io.utils.TypedValueDeserializer; +import org.listware.io.utils.Constants.Namespaces; +import org.listware.sdk.Functions; +import org.listware.sdk.Functions.ReplyEgress; +import org.listware.sdk.pbcmdb.Core; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class Register extends Base { + @SuppressWarnings("unused") + private static final Logger LOG = LoggerFactory.getLogger(Register.class); + + public static final String TYPE = "register.system.functions.root"; + public static final FunctionType FUNCTION_TYPE = new FunctionType(Namespaces.INTERNAL, TYPE); + + private EgressReader egressReader = new EgressReader(TYPE, TYPE); + + @Override + public void invoke(Context context, Functions.FunctionContext functionContext) throws Exception { + Core.RegisterMessage registerMessage = Core.RegisterMessage.parseFrom(functionContext.getValue()); + + registerTypes(context, registerMessage); + + registerObjects(context, registerMessage); + } + + private void registerTypes(Context context, Core.RegisterMessage registerMessage) throws Exception { + for (Core.RegisterTypeMessage registerTypeMessage : registerMessage.getTypeMessagesList()) { + + Functions.FunctionType functionType = Functions.FunctionType.newBuilder().setNamespace(Namespaces.INTERNAL) + .setType(Type.TYPE).build(); + + ReplyEgress replyEgress = egressReader.replyEgress(); + + Functions.FunctionContext.Builder builder = Functions.FunctionContext.newBuilder() + .setReplyEgress(replyEgress).setId(registerTypeMessage.getId()).setFunctionType(functionType) + .setValue(registerTypeMessage.getTypeMessage().toByteString()); + + Functions.FunctionContext newFunctionContext = builder.build(); + + TypedValue newTypedValue = TypedValueDeserializer.fromMessageLite(newFunctionContext); + + context.send(Type.FUNCTION_TYPE, newFunctionContext.getId(), newTypedValue); + + egressReader.wait(replyEgress.getId()); + } + } + + private void registerObjects(Context context, Core.RegisterMessage registerMessage) throws Exception { + for (Core.RegisterObjectMessage registerObjectMessage : registerMessage.getObjectMessagesList()) { + + Functions.FunctionType functionType = Functions.FunctionType.newBuilder().setNamespace(Namespaces.INTERNAL) + .setType(Object.TYPE).build(); + + ReplyEgress replyEgress = egressReader.replyEgress(); + + Functions.FunctionContext.Builder builder = Functions.FunctionContext.newBuilder() + .setFunctionType(functionType).setId(registerObjectMessage.getId()).setReplyEgress(replyEgress) + .setValue(registerObjectMessage.getObjectMessage().toByteString()); + + Functions.FunctionContext newFunctionContext = builder.build(); + + TypedValue newTypedValue = TypedValueDeserializer.fromMessageLite(newFunctionContext); + + context.send(Object.FUNCTION_TYPE, newFunctionContext.getId(), newTypedValue); + + egressReader.wait(replyEgress.getId()); + } + } + +} diff --git a/src/main/java/org/listware/core/provider/functions/Router.java b/src/main/java/org/listware/core/provider/functions/Router.java new file mode 100644 index 0000000..a7a1f27 --- /dev/null +++ b/src/main/java/org/listware/core/provider/functions/Router.java @@ -0,0 +1,65 @@ +/* Copyright 2022 Listware */ + +package org.listware.core.provider.functions; + +import org.apache.flink.statefun.sdk.Context; +import org.apache.flink.statefun.sdk.FunctionType; +import org.apache.flink.statefun.sdk.reqreply.generated.TypedValue; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.listware.io.functions.egress.EgressReader; +import org.listware.io.utils.QDSLClient; +import org.listware.io.utils.TypedValueDeserializer; +import org.listware.io.utils.Constants.Namespaces; +import org.listware.sdk.Functions; +import org.listware.sdk.Functions.ReplyEgress; +import org.listware.sdk.pbcmdb.pbqdsl.QDSL; + +/** + * Function router by qdsl + ** + */ +public class Router extends Base { + private static final Logger LOG = LoggerFactory.getLogger(Router.class); + + public static final String TYPE = "router.system.functions.root"; + + public static final FunctionType FUNCTION_TYPE = new FunctionType(Namespaces.INTERNAL, TYPE); + + private QDSLClient client; + + private EgressReader egressReader = new EgressReader(TYPE, TYPE); + + public Router(QDSLClient client) { + this.client = client; + } + + @Override + public void invoke(Context context, Functions.FunctionContext functionContext) throws Exception { + QDSL.Options options = QDSL.Options.newBuilder().setKey(true).build(); + + QDSL.Elements elements = client.qdsl(context.self().id(), options); + + Functions.FunctionContext.Builder builder = Functions.FunctionContext.parseFrom(functionContext.getValue()) + .toBuilder(); + + for (QDSL.Element element : elements.getElementsList()) { + ReplyEgress replyEgress = egressReader.replyEgress(); + + Functions.FunctionContext newFunctionContext = builder.setReplyEgress(replyEgress).setId(element.getKey()) + .build(); + + String namespace = newFunctionContext.getFunctionType().getNamespace(); + String type = newFunctionContext.getFunctionType().getType(); + FunctionType functionType = new FunctionType(namespace, type); + + TypedValue typedValue = TypedValueDeserializer.fromMessageLite(newFunctionContext); + + LOG.info("send: " + functionType + " id " + newFunctionContext.getId()); + + context.send(functionType, newFunctionContext.getId(), typedValue); + egressReader.wait(replyEgress.getId()); + } + } + +} diff --git a/src/main/java/org/listware/core/provider/functions/Type.java b/src/main/java/org/listware/core/provider/functions/Type.java new file mode 100644 index 0000000..83f8ee6 --- /dev/null +++ b/src/main/java/org/listware/core/provider/functions/Type.java @@ -0,0 +1,215 @@ +/* Copyright 2022 Listware */ + +package org.listware.core.provider.functions; + +import org.apache.flink.statefun.sdk.FunctionType; +import org.apache.flink.statefun.sdk.reqreply.generated.TypedValue; + +import com.arangodb.ArangoGraph; +import com.arangodb.entity.VertexEntity; +import com.arangodb.entity.VertexUpdateEntity; + +import com.google.protobuf.ByteString; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.listware.io.utils.TypedValueDeserializer; +import org.listware.io.utils.Constants.Namespaces; +import org.listware.sdk.Functions; +import org.listware.sdk.pbcmdb.Core; +import org.listware.core.FunctionContext; +import org.listware.core.documents.ObjectDocument; +import org.listware.core.provider.utils.Trigger; +import org.listware.core.provider.utils.Cmdb.LinkTypes; +import org.listware.core.provider.utils.Cmdb.SystemKeys; +import org.listware.core.provider.utils.exceptions.AlreadyLinkException; +import org.listware.core.provider.utils.exceptions.NoLinkException; +import org.listware.core.provider.utils.exceptions.UnknownIdException; +import org.listware.core.provider.utils.exceptions.UnknownMethodException; + +/** + * Type CUD arangodb service for types + ** + */ +public class Type extends Arango { +// @Persisted +// private final PersistedValue TargetControllerData = PersistedValue.of("", String.class); + + private static final Logger LOG = LoggerFactory.getLogger(Type.class); + + public static final String TYPE = "types.system.functions.root"; + + public static final FunctionType FUNCTION_TYPE = new FunctionType(Namespaces.INTERNAL, TYPE); + + public Type(ArangoGraph graph) { + super(graph); + } + + @Override + public void invoke(FunctionContext functionContext) throws Exception { + // TODO only create from 'types.root' or work with 'type' + if (!functionContext.isType() && !functionContext.isTypes()) { + throw new UnknownIdException(functionContext.getFlinkContext().self().id()); + } + + Core.TypeMessage message = Core.TypeMessage.parseFrom(functionContext.getFunctionContext().getValue()); + + switch (message.getMethod()) { + case CREATE: + create(functionContext, message); + break; + + case CREATE_CHILD: + createChild(functionContext, message); + break; + + case UPDATE: + update(functionContext, message); + break; + + case DELETE: + delete(functionContext); + break; + + case CREATE_TRIGGER: + createTrigger(functionContext, message); + break; + + case DELETE_TRIGGER: + deleteTrigger(functionContext, message); + break; + + default: + throw new UnknownMethodException(message.getMethod()); + } + } + + private void create(FunctionContext functionContext, Core.TypeMessage message) throws Exception { + if (!functionContext.isTypes()) { + throw new UnknownIdException(functionContext.getFlinkContext().self().id()); + } + + ObjectDocument baseDocument = ObjectDocument.deserialize(message.getPayload()); + baseDocument.setKey(message.getName()); + + VertexEntity vertexEntity = types.insertVertex(baseDocument); + + LOG.info("created type " + vertexEntity.getId()); + + // TODO link from types -> type + insertLink(functionContext.getDocument().getId(), vertexEntity.getId(), message.getName(), LinkTypes.TYPE); + } + + private void update(FunctionContext functionContext, Core.TypeMessage message) throws Exception { + ObjectDocument document = functionContext.getDocument(); + + ObjectDocument newDocument = ObjectDocument.deserialize(message.getPayload()); + + document.setProperties(newDocument.getProperties()); + + document.updateMeta(); + + VertexUpdateEntity vertexUpdateDocument = types.replaceVertex(functionContext.getFlinkContext().self().id(), + document); + + LOG.info("updated type " + vertexUpdateDocument.getId()); + } + + // TODO delete all objects with type + private void delete(FunctionContext functionContext) throws Exception { + types.deleteVertex(functionContext.getFlinkContext().self().id()); + LOG.info("deleted type " + functionContext.getFlinkContext().self().id()); + } + + private void createChild(FunctionContext functionContext, Core.TypeMessage message) throws Exception { + ObjectDocument callerBaseDocument = null; + + if (functionContext.getFlinkContext().caller() != null) { + try { + callerBaseDocument = readContext(functionContext.getFlinkContext().caller().id()); + try { + // do not duplicate link with name + findByFromName(callerBaseDocument.getId(), message.getName()); + + throw new AlreadyLinkException(callerBaseDocument.getId(), message.getName()); + } catch (NoLinkException ignored) { + } + } catch (IllegalArgumentException ignored) { + } + } + ObjectDocument baseDocument = ObjectDocument.deserialize(message.getPayload()); + + VertexEntity vertexEntity = objects.insertVertex(baseDocument); + + // Link type -> object ($uuid) + insertLink(functionContext.getDocument().getId(), vertexEntity.getId(), vertexEntity.getKey(), + functionContext.getDocument().getKey()); + + // trigger!!! + Functions.FunctionContext pbFunctionContext = ObjectTrigger.Trigger(vertexEntity.getKey(), Core.Method.CREATE); + + TypedValue typedValue = TypedValueDeserializer.fromMessageLite(pbFunctionContext); + + functionContext.getFlinkContext().send(ObjectTrigger.FUNCTION_TYPE, vertexEntity.getKey(), typedValue); + + // Link objects -> object ($uuid) + ObjectDocument objects = system.getVertex(SystemKeys.OBJECTS, ObjectDocument.class); + insertLink(objects.getId(), vertexEntity.getId(), vertexEntity.getKey(), + functionContext.getDocument().getKey()); + + // if caller, link caller -> object (name) + if (callerBaseDocument != null) { + insertLink(callerBaseDocument.getId(), vertexEntity.getId(), message.getName(), + functionContext.getDocument().getKey()); + } + } + + private void createTrigger(FunctionContext functionContext, Core.TypeMessage message) throws Exception { + Core.Trigger trigger = Core.Trigger.parseFrom(message.getPayload()); + + ObjectDocument document = Trigger.add(functionContext.getDocument(), trigger); + + document.updateMeta(); + + VertexUpdateEntity vertexUpdateDocument = types.replaceVertex(functionContext.getFlinkContext().self().id(), + document); + LOG.info("created trigger " + vertexUpdateDocument.getId()); + } + + private void deleteTrigger(FunctionContext functionContext, Core.TypeMessage message) throws Exception { + Core.Trigger trigger = Core.Trigger.parseFrom(message.getPayload()); + + ObjectDocument document = Trigger.delete(functionContext.getDocument(), trigger); + + document.updateMeta(); + + VertexUpdateEntity vertexUpdateDocument = types.replaceVertex(functionContext.getFlinkContext().self().id(), + document); + LOG.info("deleted trigger " + vertexUpdateDocument.getId()); + } + + /** + * CreateObject create new object + ** + * @param type string + * @param name string + * @param payload ByteString + * @param callback FunctionContext (optional) + */ + public static Functions.FunctionContext CreateObject(String type, String name, ByteString payload, + Functions.FunctionContext callback) { + Functions.FunctionType functionType = Functions.FunctionType.newBuilder().setNamespace(Namespaces.INTERNAL) + .setType(TYPE).build(); + + Core.TypeMessage typeMessage = Core.TypeMessage.newBuilder().setMethod(Core.Method.CREATE_CHILD).setName(name) + .setPayload(payload).build(); + + Functions.FunctionContext.Builder builder = Functions.FunctionContext.newBuilder().setFunctionType(functionType) + .setId(type).setValue(typeMessage.toByteString()); + if (callback != null) { + builder = builder.setCallback(callback); + } + return builder.build(); + } + +} diff --git a/src/main/java/org/listware/core/provider/functions/TypeTrigger.java b/src/main/java/org/listware/core/provider/functions/TypeTrigger.java new file mode 100644 index 0000000..12ba790 --- /dev/null +++ b/src/main/java/org/listware/core/provider/functions/TypeTrigger.java @@ -0,0 +1,87 @@ +/* Copyright 2022 Listware */ + +package org.listware.core.provider.functions; + +import java.util.Map; + +import org.apache.flink.statefun.sdk.FunctionType; +import org.apache.flink.statefun.sdk.reqreply.generated.TypedValue; +import org.listware.io.utils.TypedValueDeserializer; +import org.listware.io.utils.Constants.Namespaces; +import org.listware.sdk.Functions; +import org.listware.sdk.pbcmdb.Core; +import org.listware.core.FunctionContext; +import org.listware.core.provider.utils.Trigger; +import org.listware.core.provider.utils.exceptions.UnknownMethodException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.arangodb.ArangoGraph; + +public class TypeTrigger extends Arango { + @SuppressWarnings("unused") + private static final Logger LOG = LoggerFactory.getLogger(TypeTrigger.class); + + private static final String TYPE = "trigger.types.system.functions.root"; + + public static final FunctionType FUNCTION_TYPE = new FunctionType(Namespaces.INTERNAL, TYPE); + + public TypeTrigger(ArangoGraph graph) { + super(graph); + } + + @Override + public void invoke(FunctionContext functionContext) throws Exception { + Core.TypeMessage message = Core.TypeMessage.parseFrom(functionContext.getFunctionContext().getValue()); + + switch (message.getMethod()) { + case CREATE: + try { + Map create = Trigger.getByType(functionContext.getDocument(), "create"); + + for (Trigger trigger : create.values()) { + Functions.FunctionContext pbFunctionContext = Arango.Exec( + functionContext.getFlinkContext().caller().id(), trigger.getNamespace(), trigger.getType(), + message.getMethod()); + + TypedValue typedValue = TypedValueDeserializer.fromMessageLite(pbFunctionContext); + + FunctionType functionType = new FunctionType(trigger.getNamespace(), trigger.getType()); + + functionContext.getFlinkContext().send(functionType, + functionContext.getFlinkContext().caller().id(), typedValue); + } + } catch (Exception ex) { + + } + + break; + + case UPDATE: + break; + + case DELETE: + break; + + default: + throw new UnknownMethodException(message.getMethod()); + } + } + + /** + * Trigger trigger + ** + * @param type string + * @param method Method + */ + public static Functions.FunctionContext Trigger(String type, Core.Method method) { + Functions.FunctionType functionType = Functions.FunctionType.newBuilder().setNamespace(Namespaces.INTERNAL) + .setType(TYPE).build(); + + Core.TypeMessage typeMessage = Core.TypeMessage.newBuilder().setMethod(method).build(); + + Functions.FunctionContext.Builder builder = Functions.FunctionContext.newBuilder().setFunctionType(functionType) + .setId(type).setValue(typeMessage.toByteString()); + return builder.build(); + } +} diff --git a/src/main/java/org/listware/core/provider/utils/Cmdb.java b/src/main/java/org/listware/core/provider/utils/Cmdb.java new file mode 100644 index 0000000..2512edb --- /dev/null +++ b/src/main/java/org/listware/core/provider/utils/Cmdb.java @@ -0,0 +1,30 @@ +/* Copyright 2022 Listware */ + +package org.listware.core.provider.utils; + +public class Cmdb { + // Constant system cmdb keys + public class SystemKeys { + public static final String ROOT = "root"; + public static final String OBJECTS = "objects"; + public static final String TYPES = "types"; + } + + // Collection names + public class Collections { + public static final String SYSTEM = "system"; + public static final String TYPES = "types"; + public static final String OBJECTS = "objects"; + public static final String LINKS = "links"; + } + + // Link entries + public class LinkTypes { + public static final String TYPE = "type"; + } + + public class Matcher { + public static final String UUID_V4_STRING = "[a-fA-F0-9]{8}-[a-fA-F0-9]{4}-4[a-fA-F0-9]{3}-[89abAB][a-fA-F0-9]{3}-[a-fA-F0-9]{12}"; + public static final String NUMERIC_STRING = "\\d+"; + } +} diff --git a/src/main/java/org/listware/core/provider/utils/JsonDeserializer.java b/src/main/java/org/listware/core/provider/utils/JsonDeserializer.java new file mode 100644 index 0000000..6e439e4 --- /dev/null +++ b/src/main/java/org/listware/core/provider/utils/JsonDeserializer.java @@ -0,0 +1,21 @@ +/* Copyright 2022 Listware */ + +package org.listware.core.provider.utils; + +import java.util.Map; + +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.SerializationFeature; + + +public class JsonDeserializer { + public static Map> triggers(Object from) throws Exception { + ObjectMapper mapper = new ObjectMapper(); + mapper.configure(SerializationFeature.FAIL_ON_SELF_REFERENCES, false); + + TypeReference>> ref = new TypeReference>>() { + }; + return mapper.convertValue(from, ref); + } +} diff --git a/src/main/java/org/listware/core/provider/utils/Trigger.java b/src/main/java/org/listware/core/provider/utils/Trigger.java new file mode 100644 index 0000000..28d36d2 --- /dev/null +++ b/src/main/java/org/listware/core/provider/utils/Trigger.java @@ -0,0 +1,116 @@ +/* Copyright 2022 Listware */ + +package org.listware.core.provider.utils; + +import java.util.HashMap; +import java.util.Map; + +import org.listware.sdk.pbcmdb.Core; +import org.listware.core.documents.ObjectDocument; +import org.listware.core.provider.utils.exceptions.AlreadyTriggerException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.fasterxml.jackson.annotation.JsonProperty; + +public class Trigger { + @SuppressWarnings("unused") + private static final Logger LOG = LoggerFactory.getLogger(Trigger.class); + + private static String triggersKey = "triggers"; + + public Trigger(Core.Trigger trigger) { + this.namespace = trigger.getFunctionType().getNamespace(); + this.type = trigger.getFunctionType().getType(); + } + + @JsonProperty("namespace") + private String namespace; + @JsonProperty("type") + private String type; + + /** + * + * @return The namespace + */ + @JsonProperty("namespace") + public String getNamespace() { + return namespace; + } + + /** + * + * @return The type + */ + @JsonProperty("type") + public String getType() { + return type; + } + + public static Map getByType(ObjectDocument baseDocument, String type) throws Exception { + Map properties = baseDocument.getProperties(); + java.lang.Object object = properties.get(triggersKey); + Map> triggersMap = JsonDeserializer.triggers(object); + return triggersMap.get(type); + } + + public static ObjectDocument add(ObjectDocument baseDocument, Core.Trigger trigger) throws Exception { + String key = trigger.getFunctionType().getNamespace() + "/" + trigger.getFunctionType().getType(); + + Map properties = baseDocument.getProperties(); + + Map> triggersMap; + if (properties.containsKey(triggersKey)) { + java.lang.Object object = properties.get(triggersKey); + triggersMap = JsonDeserializer.triggers(object); + } else { + triggersMap = new HashMap>(); + } + + Map triggers; + if (triggersMap.containsKey(trigger.getType())) { + triggers = triggersMap.get(trigger.getType()); + } else { + triggers = new HashMap(); + } + + if (triggers.containsKey(key)) { + throw new AlreadyTriggerException(); + } + + triggers.put(key, new Trigger(trigger)); + + triggersMap.put(trigger.getType(), triggers); + + properties.put(triggersKey, triggersMap); + + baseDocument.setProperties(properties); + return baseDocument; + } + + public static ObjectDocument delete(ObjectDocument baseDocument, Core.Trigger trigger) throws Exception { + Map properties = baseDocument.getProperties(); + java.lang.Object object = properties.get(triggersKey); + + // triggers map + Map> triggersMap = JsonDeserializer.triggers(object); + + // type map + Map triggers = triggersMap.get(trigger.getType()); + + String key = trigger.getFunctionType().getNamespace() + "/" + trigger.getFunctionType().getType(); + + if (!triggers.containsKey(key)) { + return baseDocument; + } + + triggers.remove(key); + + triggersMap.put(trigger.getType(), triggers); + + properties.put(triggersKey, triggersMap); + + baseDocument.setProperties(properties); + return baseDocument; + } +} diff --git a/src/main/java/org/listware/core/provider/utils/exceptions/AlreadyLinkException.java b/src/main/java/org/listware/core/provider/utils/exceptions/AlreadyLinkException.java new file mode 100644 index 0000000..d160f2b --- /dev/null +++ b/src/main/java/org/listware/core/provider/utils/exceptions/AlreadyLinkException.java @@ -0,0 +1,11 @@ +/* Copyright 2022 Listware */ + +package org.listware.core.provider.utils.exceptions; + +public class AlreadyLinkException extends Exception { + private static final long serialVersionUID = 1L; + + public AlreadyLinkException(String from, String name) { + super(String.format("link %s -> %s already exists: ", from, name)); + } +} diff --git a/src/main/java/org/listware/core/provider/utils/exceptions/AlreadyTriggerException.java b/src/main/java/org/listware/core/provider/utils/exceptions/AlreadyTriggerException.java new file mode 100644 index 0000000..1828cf5 --- /dev/null +++ b/src/main/java/org/listware/core/provider/utils/exceptions/AlreadyTriggerException.java @@ -0,0 +1,11 @@ +/* Copyright 2022 Listware */ + +package org.listware.core.provider.utils.exceptions; + +public class AlreadyTriggerException extends Exception { + private static final long serialVersionUID = 1L; + + public AlreadyTriggerException() { + super("trigger already exists"); + } +} diff --git a/src/main/java/org/listware/core/provider/utils/exceptions/NoLinkException.java b/src/main/java/org/listware/core/provider/utils/exceptions/NoLinkException.java new file mode 100644 index 0000000..9718790 --- /dev/null +++ b/src/main/java/org/listware/core/provider/utils/exceptions/NoLinkException.java @@ -0,0 +1,11 @@ +/* Copyright 2022 Listware */ + +package org.listware.core.provider.utils.exceptions; + +public class NoLinkException extends Exception { + private static final long serialVersionUID = 1L; + + public NoLinkException(String from, String name) { + super(String.format("link %s -> %s not found: ", from, name)); + } +} diff --git a/src/main/java/org/listware/core/provider/utils/exceptions/PayloadNotFoundException.java b/src/main/java/org/listware/core/provider/utils/exceptions/PayloadNotFoundException.java new file mode 100644 index 0000000..da40339 --- /dev/null +++ b/src/main/java/org/listware/core/provider/utils/exceptions/PayloadNotFoundException.java @@ -0,0 +1,11 @@ +/* Copyright 2022 Listware */ + +package org.listware.core.provider.utils.exceptions; + +public class PayloadNotFoundException extends Exception { + private static final long serialVersionUID = 1L; + + public PayloadNotFoundException() { + super("payload not found"); + } +} diff --git a/src/main/java/org/listware/core/provider/utils/exceptions/TriggerNotFoundException.java b/src/main/java/org/listware/core/provider/utils/exceptions/TriggerNotFoundException.java new file mode 100644 index 0000000..c463826 --- /dev/null +++ b/src/main/java/org/listware/core/provider/utils/exceptions/TriggerNotFoundException.java @@ -0,0 +1,11 @@ +/* Copyright 2022 Listware */ + +package org.listware.core.provider.utils.exceptions; + +public class TriggerNotFoundException extends Exception { + private static final long serialVersionUID = 1L; + + public TriggerNotFoundException() { + super("trigger not found"); + } +} \ No newline at end of file diff --git a/src/main/java/org/listware/core/provider/utils/exceptions/UnknownIdException.java b/src/main/java/org/listware/core/provider/utils/exceptions/UnknownIdException.java new file mode 100644 index 0000000..d2c5b8e --- /dev/null +++ b/src/main/java/org/listware/core/provider/utils/exceptions/UnknownIdException.java @@ -0,0 +1,12 @@ +/* Copyright 2022 Listware */ + +package org.listware.core.provider.utils.exceptions; + +public class UnknownIdException extends Exception { + + private static final long serialVersionUID = 1L; + + public UnknownIdException(String id) { + super(String.format("unknown id '%s'", id)); + } +} diff --git a/src/main/java/org/listware/core/provider/utils/exceptions/UnknownMethodException.java b/src/main/java/org/listware/core/provider/utils/exceptions/UnknownMethodException.java new file mode 100644 index 0000000..36b40e7 --- /dev/null +++ b/src/main/java/org/listware/core/provider/utils/exceptions/UnknownMethodException.java @@ -0,0 +1,13 @@ +/* Copyright 2022 Listware */ + +package org.listware.core.provider.utils.exceptions; + +import org.listware.sdk.pbcmdb.Core; + +public class UnknownMethodException extends Exception { + private static final long serialVersionUID = 1L; + + public UnknownMethodException(Core.Method method) { + super(String.format("unknown method '%s'", method.toString())); + } +} diff --git a/src/main/resources/META-INF/services/org.apache.flink.statefun.sdk.spi.StatefulFunctionModule b/src/main/resources/META-INF/services/org.apache.flink.statefun.sdk.spi.StatefulFunctionModule new file mode 100644 index 0000000..4da5d44 --- /dev/null +++ b/src/main/resources/META-INF/services/org.apache.flink.statefun.sdk.spi.StatefulFunctionModule @@ -0,0 +1 @@ +org.listware.core.Module