From 9289858320eb61ba668d989d5d682ce386ab1900 Mon Sep 17 00:00:00 2001 From: fg-admin Date: Wed, 25 Jan 2023 11:53:59 +0300 Subject: [PATCH] Add link triggers Move arangodb to other pakage Add register function --- pom.xml | 6 +- .../org/listware/core/FunctionContext.java | 43 +- src/main/java/org/listware/core/Module.java | 16 +- .../java/org/listware/core/cmdb/Cmdb.java | 407 ++++++++++++++++++ .../org/listware/core/cmdb/LinkClient.java | 76 ++++ .../org/listware/core/cmdb/ObjectClient.java | 81 ++++ .../listware/core/cmdb/RegisterMessage.java | 107 +++++ .../java/org/listware/core/cmdb/Trigger.java | 209 +++++++++ .../listware/core/documents/LinkDocument.java | 46 +- .../core/documents/ObjectDocument.java | 98 +++-- .../core/documents/entity/Created.java | 2 +- .../core/documents/entity/DocumentFields.java | 1 + .../core/documents/entity/Updated.java | 2 +- .../core/provider/FunctionProvider.java | 278 ++---------- .../core/provider/functions/Arango.java | 174 -------- .../core/provider/functions/Base.java | 68 +-- .../core/provider/functions/Link.java | 139 ------ .../core/provider/functions/Register.java | 190 ++++++-- .../core/provider/functions/Router.java | 23 +- .../core/provider/functions/Sync.java | 150 +++++++ .../core/provider/functions/Type.java | 215 --------- .../core/provider/functions/TypeTrigger.java | 87 ---- .../provider/functions/link/AdvancedLink.java | 77 ++++ .../provider/functions/link/LinkContext.java | 34 ++ .../provider/functions/link/LinkTrigger.java | 127 ++++++ .../core/provider/functions/object/Link.java | 163 +++++++ .../functions/{ => object}/Object.java | 45 +- .../functions/object/ObjectContext.java | 36 ++ .../functions/{ => object}/ObjectTrigger.java | 28 +- .../core/provider/functions/object/Type.java | 171 ++++++++ .../functions/object/TypeTrigger.java | 102 +++++ .../listware/core/provider/utils/Cmdb.java | 30 -- .../core/provider/utils/JsonDeserializer.java | 21 - .../listware/core/provider/utils/Trigger.java | 116 ----- .../listware/core/utils/ErrorContainer.java | 59 +++ .../exceptions/AlreadyLinkException.java | 2 +- .../exceptions/AlreadyTriggerException.java | 2 +- .../utils/exceptions/NoLinkException.java | 6 +- .../exceptions/PayloadNotFoundException.java | 2 +- .../exceptions/TriggerNotFoundException.java | 4 +- .../utils/exceptions/UnknownIdException.java | 2 +- .../exceptions/UnknownMethodException.java | 2 +- 42 files changed, 2200 insertions(+), 1247 deletions(-) create mode 100644 src/main/java/org/listware/core/cmdb/Cmdb.java create mode 100644 src/main/java/org/listware/core/cmdb/LinkClient.java create mode 100644 src/main/java/org/listware/core/cmdb/ObjectClient.java create mode 100644 src/main/java/org/listware/core/cmdb/RegisterMessage.java create mode 100644 src/main/java/org/listware/core/cmdb/Trigger.java delete mode 100644 src/main/java/org/listware/core/provider/functions/Arango.java delete mode 100644 src/main/java/org/listware/core/provider/functions/Link.java create mode 100644 src/main/java/org/listware/core/provider/functions/Sync.java delete mode 100644 src/main/java/org/listware/core/provider/functions/Type.java delete mode 100644 src/main/java/org/listware/core/provider/functions/TypeTrigger.java create mode 100644 src/main/java/org/listware/core/provider/functions/link/AdvancedLink.java create mode 100644 src/main/java/org/listware/core/provider/functions/link/LinkContext.java create mode 100644 src/main/java/org/listware/core/provider/functions/link/LinkTrigger.java create mode 100644 src/main/java/org/listware/core/provider/functions/object/Link.java rename src/main/java/org/listware/core/provider/functions/{ => object}/Object.java (70%) create mode 100644 src/main/java/org/listware/core/provider/functions/object/ObjectContext.java rename src/main/java/org/listware/core/provider/functions/{ => object}/ObjectTrigger.java (64%) create mode 100644 src/main/java/org/listware/core/provider/functions/object/Type.java create mode 100644 src/main/java/org/listware/core/provider/functions/object/TypeTrigger.java delete mode 100644 src/main/java/org/listware/core/provider/utils/Cmdb.java delete mode 100644 src/main/java/org/listware/core/provider/utils/JsonDeserializer.java delete mode 100644 src/main/java/org/listware/core/provider/utils/Trigger.java create mode 100644 src/main/java/org/listware/core/utils/ErrorContainer.java rename src/main/java/org/listware/core/{provider => }/utils/exceptions/AlreadyLinkException.java (83%) rename src/main/java/org/listware/core/{provider => }/utils/exceptions/AlreadyTriggerException.java (80%) rename src/main/java/org/listware/core/{provider => }/utils/exceptions/NoLinkException.java (63%) rename src/main/java/org/listware/core/{provider => }/utils/exceptions/PayloadNotFoundException.java (80%) rename src/main/java/org/listware/core/{provider => }/utils/exceptions/TriggerNotFoundException.java (79%) rename src/main/java/org/listware/core/{provider => }/utils/exceptions/UnknownIdException.java (81%) rename src/main/java/org/listware/core/{provider => }/utils/exceptions/UnknownMethodException.java (85%) diff --git a/pom.xml b/pom.xml index 2929837..8424107 100644 --- a/pom.xml +++ b/pom.xml @@ -2,7 +2,7 @@ 4.0.0 org.listware core - 1.0 + 1.1 jar @@ -84,13 +84,13 @@ org.listware io - 1.0 + 1.1 provided org.listware proto - 1.0 + 1.1 diff --git a/src/main/java/org/listware/core/FunctionContext.java b/src/main/java/org/listware/core/FunctionContext.java index c4d8f61..0db0c76 100644 --- a/src/main/java/org/listware/core/FunctionContext.java +++ b/src/main/java/org/listware/core/FunctionContext.java @@ -3,25 +3,26 @@ package org.listware.core; import org.apache.flink.statefun.sdk.Context; -import org.apache.flink.statefun.sdk.FunctionType; -import org.apache.flink.statefun.sdk.reqreply.generated.TypedValue; import org.listware.sdk.Functions; -import org.listware.io.utils.TypedValueDeserializer; +import org.listware.core.cmdb.Cmdb.SystemKeys; import org.listware.core.documents.ObjectDocument; -import org.listware.core.provider.utils.Cmdb.Matcher; -import org.listware.core.provider.utils.Cmdb.SystemKeys; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class FunctionContext { + @SuppressWarnings("unused") private static final Logger LOG = LoggerFactory.getLogger(FunctionContext.class); private Context flinkContext; private ObjectDocument document; private Functions.FunctionContext functionContext; - private boolean isExecutedCallback = false; + + + private class Matcher { + public static final String UUID_V4_STRING = "[a-fA-F0-9]{8}-[a-fA-F0-9]{4}-4[a-fA-F0-9]{3}-[89abAB][a-fA-F0-9]{3}-[a-fA-F0-9]{12}"; + public static final String NUMERIC_STRING = "\\d+"; + } public FunctionContext(Context context, ObjectDocument document, Functions.FunctionContext functionContext) { this.flinkContext = context; @@ -88,30 +89,4 @@ public class FunctionContext { public boolean isType() { return (!isRoot() && !isObjects() && !isTypes() && !isLink() && !isObject()); } - - // You can execute callback only once, getCallback() to inherit func or - // Callback() after invoke - public void callback() throws Exception { - Functions.FunctionContext callback = getCallback(); - if (callback != null) { - String namespace = callback.getFunctionType().getNamespace(); - String type = callback.getFunctionType().getType(); - FunctionType functionType = new FunctionType(namespace, type); - - LOG.info("send: " + functionType + " id " + callback.getId()); - - TypedValue typedValue = TypedValueDeserializer.fromMessageLite(callback); - ; - - flinkContext.send(functionType, callback.getId(), typedValue); - } - } - - public Functions.FunctionContext getCallback() { - if (functionContext.hasCallback() && !isExecutedCallback) { - isExecutedCallback = true; - return functionContext.getCallback(); - } - return null; - } -} +} \ No newline at end of file diff --git a/src/main/java/org/listware/core/Module.java b/src/main/java/org/listware/core/Module.java index 88c27d8..2c9a2b0 100644 --- a/src/main/java/org/listware/core/Module.java +++ b/src/main/java/org/listware/core/Module.java @@ -8,14 +8,16 @@ import com.google.auto.service.AutoService; import org.apache.flink.statefun.sdk.spi.StatefulFunctionModule; import org.listware.core.provider.FunctionProvider; -import org.listware.core.provider.functions.Link; import org.listware.core.provider.functions.Log; -import org.listware.core.provider.functions.Object; -import org.listware.core.provider.functions.ObjectTrigger; import org.listware.core.provider.functions.Register; import org.listware.core.provider.functions.Router; -import org.listware.core.provider.functions.Type; -import org.listware.core.provider.functions.TypeTrigger; +import org.listware.core.provider.functions.link.AdvancedLink; +import org.listware.core.provider.functions.link.LinkTrigger; +import org.listware.core.provider.functions.object.Link; +import org.listware.core.provider.functions.object.Object; +import org.listware.core.provider.functions.object.ObjectTrigger; +import org.listware.core.provider.functions.object.Type; +import org.listware.core.provider.functions.object.TypeTrigger; @AutoService(StatefulFunctionModule.class) public final class Module implements StatefulFunctionModule { @@ -31,5 +33,7 @@ public final class Module implements StatefulFunctionModule { binder.bindFunctionProvider(Router.FUNCTION_TYPE, provider); binder.bindFunctionProvider(Log.FUNCTION_TYPE, provider); binder.bindFunctionProvider(Register.FUNCTION_TYPE, provider); + binder.bindFunctionProvider(AdvancedLink.FUNCTION_TYPE, provider); + binder.bindFunctionProvider(LinkTrigger.FUNCTION_TYPE, provider); } -} \ No newline at end of file +} diff --git a/src/main/java/org/listware/core/cmdb/Cmdb.java b/src/main/java/org/listware/core/cmdb/Cmdb.java new file mode 100644 index 0000000..573de1c --- /dev/null +++ b/src/main/java/org/listware/core/cmdb/Cmdb.java @@ -0,0 +1,407 @@ +/* + * Copyright 2022 + * Listware + */ + +package org.listware.core.cmdb; + +import org.apache.flink.statefun.sdk.Context; +import org.apache.flink.statefun.sdk.reqreply.generated.TypedValue; +import org.listware.core.documents.LinkDocument; +import org.listware.core.documents.ObjectDocument; +import org.listware.core.provider.functions.link.LinkTrigger; +import org.listware.core.provider.functions.object.ObjectTrigger; +import org.listware.core.utils.exceptions.AlreadyLinkException; +import org.listware.core.utils.exceptions.NoLinkException; +import org.listware.core.utils.exceptions.UnknownIdException; +import org.listware.io.grpc.FinderClient; +import org.listware.io.grpc.QDSLClient; +import org.listware.io.utils.TypedValueDeserializer; +import org.listware.sdk.Functions; +import org.listware.sdk.pbcmdb.Core; +import org.listware.sdk.pbcmdb.pbfinder.Finder; +import org.listware.sdk.pbcmdb.pbqdsl.QDSL; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.protobuf.ByteString; + +public class Cmdb { + @SuppressWarnings("unused") + private static final Logger LOG = LoggerFactory.getLogger(Cmdb.class); + + // Constant system cmdb keys + public class SystemKeys { + public static final String ROOT = "root"; + public static final String OBJECTS = "objects"; + public static final String TYPES = "types"; + } + + // Collection names + public class Collections { + public static final String SYSTEM = "system"; + public static final String TYPES = "types"; + public static final String OBJECTS = "objects"; + public static final String LINKS = "links"; + } + + // Link entries + public class LinkTypes { + public static final String TYPE = "type"; + public static final String SYSTEM = "system"; + } + + private ObjectClient objectClient = new ObjectClient(); + public LinkClient linkClient = new LinkClient(); + public QDSLClient qdslClient = new QDSLClient(); + public FinderClient finderClient = new FinderClient(); + + public void shutdown() throws InterruptedException { + objectClient.shutdown(); + linkClient.shutdown(); + qdslClient.shutdown(); + finderClient.shutdown(); + } + + // R same for types/objects/system + public ObjectDocument readDocument(String id) throws Exception { + return objectClient.readDocument(id); + } + + private ObjectDocument updateDocument(ObjectDocument document) throws Exception { + document = objectClient.updateDocument(document.getId(), document.serialize()); + LOG.info("updated " + document.getId()); + return document; + } + + // D same for types/objects/system + private void removeDocument(ObjectDocument document) throws Exception { + objectClient.removeDocument(document.getId()); + LOG.info("deleted " + document.getId()); + } + + // R links + public LinkDocument readLinkDocument(String id) throws Exception { + return linkClient.readDocument(id); + } + + public LinkDocument readLinkDocument(String from, String name) throws Exception { + Finder.Response response = finderClient.from(from, name); + if (response.getLinksCount() == 0) { + throw new NoLinkException(from, name); + } + return LinkDocument.deserialize(response.getLinks(0).getPayload()); + } + + // do not duplicate link with name + public void checkFrom(ObjectDocument parent, String name) throws Exception { + Finder.Response response = finderClient.from(parent.getId(), name); + if (response.getLinksCount() > 0) { + throw new AlreadyLinkException(parent.getId(), name); + } + } + + public LinkDocument updateLinkDocument(LinkDocument document) throws Exception { + document = linkClient.updateDocument(document.getId(), document.serialize()); + LOG.info("updated " + document.getId()); + return document; + } + + // D links + public void removeDocument(LinkDocument document) throws Exception { + linkClient.removeDocument(document.getId()); + LOG.info("deleted " + document.getId()); + } + + /*******************************************************************************************/ + // C for SYSTEM + public ObjectDocument createSystem(ObjectDocument document) throws Exception { + document = objectClient.createDocument(Collections.SYSTEM, document.serialize()); + + LOG.info("created system " + document.getId()); + + return document; + } + + public ObjectDocument createSystem(ObjectDocument parent, ObjectDocument document) throws Exception { + document = objectClient.createDocument(Collections.SYSTEM, document.serialize()); + + LOG.info("created system " + document.getId()); + + // link root -> object + createLink(parent, document, LinkTypes.SYSTEM, document.getKey()); + + return document; + } + + /*******************************************************************************************/ + + // types C + public ObjectDocument createType(ObjectDocument document) throws Exception { + document = objectClient.createDocument(Collections.TYPES, document.serialize()); + + ObjectDocument types = readDocument("system/types"); + + // link from types -> type + createLink(types, document, LinkTypes.TYPE, document.getKey()); + + LOG.info("created type " + document.getId()); + + return document; + } + + public ObjectDocument updateType(Context context, ObjectDocument document) throws Exception { + document = updateDocument(document); + + return document; + } + + public void removeType(Context context, ObjectDocument document) throws Exception { + removeDocument(document); + } + + /*******************************************************************************************/ + + // only from type + public ObjectDocument createObject(ObjectDocument type, ObjectDocument document) throws Exception { + ObjectDocument objects = readDocument("system/objects"); + + document = objectClient.createDocument(Collections.OBJECTS, document.serialize()); + + // link type -> object ($uuid) + createLink(type, document, type.getKey(), document.getKey()); + + // link objects -> object ($uuid) + createLink(objects, document, type.getKey(), document.getKey()); + + LOG.info("created object " + document.getId()); + + return document; + } + + public ObjectDocument createObject(Context context, ObjectDocument type, ObjectDocument document) throws Exception { + document = createObject(type, document); + + Functions.FunctionContext pbFunctionContext = ObjectTrigger.Trigger(document.getId(), Core.Method.CREATE); + + TypedValue typedValue = TypedValueDeserializer.fromMessageLite(pbFunctionContext); + + context.send(ObjectTrigger.FUNCTION_TYPE, document.getId(), typedValue); + + return document; + } + + // with parent + public ObjectDocument createObject(ObjectDocument type, ObjectDocument parent, ObjectDocument document, String name) + throws Exception { + + checkFrom(parent, name); + + document = createObject(type, document); + + // link parent -> object ($uuid) + createLink(parent, document, type.getKey(), name); + + return document; + } + + public ObjectDocument createObject(Context context, ObjectDocument type, ObjectDocument parent, + ObjectDocument document, String name) throws Exception { + + checkFrom(parent, name); + + document = createObject(context, type, document); + + // link parent -> object ($uuid) + createLink(parent, document, type.getKey(), name); + + return document; + } + + public ObjectDocument updateObject(Context context, ObjectDocument document) throws Exception { + document = updateDocument(document); + Functions.FunctionContext pbFunctionContext = ObjectTrigger.Trigger(document.getId(), Core.Method.UPDATE); + TypedValue typedValue = TypedValueDeserializer.fromMessageLite(pbFunctionContext); + context.send(ObjectTrigger.FUNCTION_TYPE, document.getId(), typedValue); + + return document; + } + + public void removeObject(Context context, ObjectDocument document) throws Exception { + removeDocument(document); + // move trigger to link type -> objects + +// Functions.FunctionContext pbFunctionContext = ObjectTrigger.Trigger(document.getId(), Core.Method.DELETE); +// +// TypedValue typedValue = TypedValueDeserializer.fromMessageLite(pbFunctionContext); +// +// context.send(ObjectTrigger.FUNCTION_TYPE, document.getId(), typedValue); + } + + // Links + + public LinkDocument createLink(ObjectDocument parent, ObjectDocument document, String type, String name) + throws Exception { + + checkFrom(parent, name); + + LinkDocument link = new LinkDocument(parent.getId(), document.getId(), type, name); + + link = linkClient.createDocument(Collections.LINKS, link.serialize()); + + LOG.info("created link" + link.getId()); + + return link; + } + + public LinkDocument createLink(ObjectDocument parent, ObjectDocument document, String type, String name, + ByteString payload) throws Exception { + + checkFrom(parent, name); + + LinkDocument link = new LinkDocument(parent.getId(), document.getId(), type, name); + link.replaceProperties(payload); + + link = linkClient.createDocument(Collections.LINKS, link.serialize()); + + LOG.info("created link" + link.getId()); + + return link; + } + + public LinkDocument createLink(Context context, ObjectDocument parent, ObjectDocument document, String type, + String name, ByteString payload) throws Exception { + + LinkDocument link = createLink(parent, document, type, name, payload); + + Functions.FunctionContext pbFunctionContext = LinkTrigger.Trigger(link.getId(), Core.Method.CREATE); + + TypedValue typedValue = TypedValueDeserializer.fromMessageLite(pbFunctionContext); + + context.send(LinkTrigger.FUNCTION_TYPE, link.getId(), typedValue); + + LOG.info("created link" + link.getId()); + + return link; + } + + public LinkDocument updateLink(Context context, LinkDocument document) throws Exception { + document = updateLinkDocument(document); + + Functions.FunctionContext pbFunctionContext = LinkTrigger.Trigger(document.getId(), Core.Method.UPDATE); + + TypedValue typedValue = TypedValueDeserializer.fromMessageLite(pbFunctionContext); + + context.send(LinkTrigger.FUNCTION_TYPE, document.getId(), typedValue); + + return document; + } + + public void bootstrap() throws Exception { + ObjectDocument root = null; + try { + root = readDocument("system/root"); + } catch (Exception ex) { + LOG.error(ex.getLocalizedMessage()); + root = new ObjectDocument(SystemKeys.ROOT); + root = createSystem(root); + } + + ObjectDocument objects = null; + try { + objects = readDocument("system/objects"); + } catch (Exception ex) { + LOG.error(ex.getLocalizedMessage()); + objects = new ObjectDocument(SystemKeys.OBJECTS); + objects = createSystem(root, objects); + } + + ObjectDocument types = null; + try { + types = readDocument("system/types"); + } catch (Exception ex) { + LOG.error(ex.getLocalizedMessage()); + types = new ObjectDocument(SystemKeys.TYPES); + types = createSystem(root, types); + } + + ObjectDocument functionContainer = null; + try { + functionContainer = readDocument("types/function-container"); + } catch (Exception ex) { + LOG.error(ex.getLocalizedMessage()); + functionContainer = new ObjectDocument("function-container"); + functionContainer = createType(functionContainer); + } + + ObjectDocument function = null; + try { + function = readDocument("types/function"); + } catch (Exception ex) { + LOG.error(ex.getLocalizedMessage()); + function = new ObjectDocument("function"); + function = createType(function); + } + + QDSL.Options options = QDSL.Options.newBuilder().build(); + + QDSL.Elements elements = qdslClient.qdsl("functions.root", options); + ObjectDocument functions = null; + if (elements.getElementsCount() == 0) { + functions = new ObjectDocument(); + functions = createObject(function, root, functions, "functions"); + } else { + functions = readDocument(elements.getElements(0).getId()); + } + + elements = qdslClient.qdsl("system.functions.root", options); + ObjectDocument system = null; + if (elements.getElementsCount() == 0) { + system = new ObjectDocument(); + system = createObject(functionContainer, functions, system, "system"); + } else { + system = readDocument(elements.getElements(0).getId()); + } + + elements = qdslClient.qdsl("types.system.functions.root", options); + ObjectDocument typesFunction = null; + if (elements.getElementsCount() == 0) { + typesFunction = new ObjectDocument(); + typesFunction = createObject(function, system, typesFunction, "types"); + } else { + typesFunction = readDocument(elements.getElements(0).getId()); + } + + elements = qdslClient.qdsl("objects.system.functions.root", options); + ObjectDocument objectsFunction = null; + if (elements.getElementsCount() == 0) { + objectsFunction = new ObjectDocument(); + objectsFunction = createObject(function, system, objectsFunction, "objects"); + } else { + objectsFunction = readDocument(elements.getElements(0).getId()); + } + + elements = qdslClient.qdsl("links.system.functions.root", options); + ObjectDocument linksFunction = null; + if (elements.getElementsCount() == 0) { + linksFunction = new ObjectDocument(); + linksFunction = createObject(function, system, linksFunction, "links"); + } else { + linksFunction = readDocument(elements.getElements(0).getId()); + } + } + + public String getTypeId(String id) throws Exception { + QDSL.Options options = QDSL.Options.newBuilder().setType(true).build(); + + String query = String.format("*[?@._id == '%s'?].objects", id); + + QDSL.Elements elements = qdslClient.qdsl(query, options); + + for (QDSL.Element element : elements.getElementsList()) { + return "types/" + element.getType(); + } + throw new UnknownIdException(id); + } + +} diff --git a/src/main/java/org/listware/core/cmdb/LinkClient.java b/src/main/java/org/listware/core/cmdb/LinkClient.java new file mode 100644 index 0000000..26e6857 --- /dev/null +++ b/src/main/java/org/listware/core/cmdb/LinkClient.java @@ -0,0 +1,76 @@ +/* + * Copyright 2022 + * Listware + */ + +package org.listware.core.cmdb; + +import org.listware.core.documents.LinkDocument; +import org.listware.core.utils.exceptions.PayloadNotFoundException; +import org.listware.core.utils.exceptions.UnknownIdException; +import org.listware.io.grpc.EdgeClient; +import org.listware.sdk.pbcmdb.Core; + +import com.google.protobuf.ByteString; + +public class LinkClient extends EdgeClient { + + public LinkDocument createDocument(String collection, ByteString payload) throws Exception { + Core.Response resp = create(collection, payload); + return LinkDocument.deserialize(resp.getPayload()); + } + + public LinkDocument readDocument(String id) throws Exception { + Parser parser = new Parser(id); + Core.Response resp = read(parser.getCollection(), parser.getKey()); + if (resp.getPayload().isEmpty()) { + throw new PayloadNotFoundException(); + } + return LinkDocument.deserialize(resp.getPayload()); + } + + public LinkDocument updateDocument(String id, ByteString payload) throws Exception { + Parser parser = new Parser(id); + Core.Response resp = update(parser.getCollection(), parser.getKey(), payload); + if (resp.getPayload().isEmpty()) { + throw new PayloadNotFoundException(); + } + return LinkDocument.deserialize(resp.getPayload()); + } + + public void removeDocument(String id) throws Exception { + Parser parser = new Parser(id); + remove(parser.getCollection(), parser.getKey()); + } + + class Parser { + private String collection; + private String key; + + public Parser(String id) throws Exception { + String[] separated = id.split("\\/"); + if (separated.length < 2) { + throw new UnknownIdException(id); + } + this.collection = separated[0]; + this.key = separated[1]; + } + + public String getCollection() { + return collection; + } + + public void setCollection(String collection) { + this.collection = collection; + } + + public String getKey() { + return key; + } + + public void setKey(String key) { + this.key = key; + } + } + +} diff --git a/src/main/java/org/listware/core/cmdb/ObjectClient.java b/src/main/java/org/listware/core/cmdb/ObjectClient.java new file mode 100644 index 0000000..8f8f75e --- /dev/null +++ b/src/main/java/org/listware/core/cmdb/ObjectClient.java @@ -0,0 +1,81 @@ +/* + * Copyright 2022 + * Listware + */ + +package org.listware.core.cmdb; + +import org.listware.core.documents.ObjectDocument; +import org.listware.core.utils.exceptions.PayloadNotFoundException; +import org.listware.core.utils.exceptions.UnknownIdException; +import org.listware.io.grpc.VertexClient; +import org.listware.sdk.pbcmdb.Core; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.protobuf.ByteString; + +public class ObjectClient extends VertexClient { + @SuppressWarnings("unused") + private static final Logger LOG = LoggerFactory.getLogger(ObjectClient.class); + + public ObjectDocument createDocument(String collection, ByteString payload) throws Exception { + Core.Response resp = create(collection, payload); + return ObjectDocument.deserialize(resp.getPayload()); + } + + public ObjectDocument readDocument(String id) throws Exception { + Parser parser = new Parser(id); + Core.Response resp = read(parser.getCollection(), parser.getKey()); + if (resp.getPayload().isEmpty()) { + throw new PayloadNotFoundException(); + } + return ObjectDocument.deserialize(resp.getPayload()); + } + + public ObjectDocument updateDocument(String id, ByteString payload) throws Exception { + Parser parser = new Parser(id); + Core.Response resp = update(parser.getCollection(), parser.getKey(), payload); + if (resp.getPayload().isEmpty()) { + throw new PayloadNotFoundException(); + } + return ObjectDocument.deserialize(resp.getPayload()); + } + + public void removeDocument(String id) throws Exception { + Parser parser = new Parser(id); + remove(parser.getCollection(), parser.getKey()); + } + + class Parser { + private String collection; + private String key; + + public Parser(String id) throws Exception { + String[] separated = id.split("\\/"); + if (separated.length < 2) { + throw new UnknownIdException(id); + } + + this.collection = separated[0]; + this.key = separated[1]; + } + + public String getCollection() { + return collection; + } + + public void setCollection(String collection) { + this.collection = collection; + } + + public String getKey() { + return key; + } + + public void setKey(String key) { + this.key = key; + } + } + +} diff --git a/src/main/java/org/listware/core/cmdb/RegisterMessage.java b/src/main/java/org/listware/core/cmdb/RegisterMessage.java new file mode 100644 index 0000000..5d28446 --- /dev/null +++ b/src/main/java/org/listware/core/cmdb/RegisterMessage.java @@ -0,0 +1,107 @@ +/* Copyright 2022 Listware */ + +package org.listware.core.cmdb; + +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; + +import org.listware.core.provider.functions.Register; +import org.listware.sdk.pbcmdb.Core; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class RegisterMessage { + @SuppressWarnings("unused") + private static final Logger LOG = LoggerFactory.getLogger(Register.class); + + private List types = new ArrayList<>(); + private List objects = new ArrayList<>();; + private List links = new ArrayList<>();; + + public RegisterMessage() { + // POJO + } + + public RegisterMessage(Core.RegisterMessage registerMessage) { + registerMessage.getTypeMessagesList().forEach(msg -> types.add(msg.toByteArray())); + registerMessage.getObjectMessagesList().forEach(msg -> objects.add(msg.toByteArray())); + registerMessage.getLinkMessagesList().forEach(msg -> links.add(msg.toByteArray())); + } + + public List getTypes() { + return types; + } + + public void setTypes(List types) { + this.types = types; + } + + public List getObjects() { + return objects; + } + + public void setObjects(List objects) { + this.objects = objects; + } + + public List getLinks() { + return links; + } + + public void setLinks(List links) { + this.links = links; + } + + public List listTypes() throws Exception { + List registerMessages = new ArrayList<>(); + + Iterator iterator = types.iterator(); + while (iterator.hasNext()) { + byte[] data = iterator.next(); + Core.RegisterTypeMessage registerMessage = Core.RegisterTypeMessage.parseFrom(data); + registerMessages.add(registerMessage); + iterator.remove(); + if (!registerMessage.getAsync()) { + return registerMessages; + } + } + + return registerMessages; + } + + public List listObjects() throws Exception { + List registerMessages = new ArrayList<>(); + + Iterator iterator = objects.iterator(); + while (iterator.hasNext()) { + byte[] data = iterator.next(); + Core.RegisterObjectMessage registerMessage = Core.RegisterObjectMessage.parseFrom(data); + registerMessages.add(registerMessage); + iterator.remove(); + if (!registerMessage.getAsync()) { + return registerMessages; + } + } + + return registerMessages; + } + + public List listLinks() throws Exception { + List registerMessages = new ArrayList<>(); + + Iterator iterator = links.iterator(); + while (iterator.hasNext()) { + byte[] data = iterator.next(); + Core.RegisterLinkMessage registerMessage = Core.RegisterLinkMessage.parseFrom(data); + registerMessages.add(registerMessage); + iterator.remove(); + if (!registerMessage.getAsync()) { + return registerMessages; + } + } + + return registerMessages; + } + +} diff --git a/src/main/java/org/listware/core/cmdb/Trigger.java b/src/main/java/org/listware/core/cmdb/Trigger.java new file mode 100644 index 0000000..399ad3a --- /dev/null +++ b/src/main/java/org/listware/core/cmdb/Trigger.java @@ -0,0 +1,209 @@ +/* Copyright 2022 Listware */ + +package org.listware.core.cmdb; + +import java.util.HashMap; +import java.util.Map; + +import org.listware.core.documents.LinkDocument; +import org.listware.core.documents.ObjectDocument; +import org.listware.core.utils.exceptions.AlreadyTriggerException; +import org.listware.core.utils.exceptions.TriggerNotFoundException; +import org.listware.sdk.pbcmdb.Core; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.SerializationFeature; + +public class Trigger { + @SuppressWarnings("unused") + private static final Logger LOG = LoggerFactory.getLogger(Trigger.class); + + private static String triggersKey = "triggers"; + + public static final String CREATE = "create"; + public static final String UPDATE = "update"; + public static final String DELETE = "delete"; + + public Trigger() { + // POJO + } + + public Trigger(Core.Trigger trigger) { + this(); + this.namespace = trigger.getFunctionType().getNamespace(); + this.type = trigger.getFunctionType().getType(); + } + + @JsonProperty("namespace") + private String namespace; + @JsonProperty("type") + private String type; + + /** + * + * @return The namespace + */ + @JsonProperty("namespace") + public String getNamespace() { + return namespace; + } + + /** + * + * @return The type + */ + @JsonProperty("type") + public String getType() { + return type; + } + + public static Map getByType(ObjectDocument baseDocument, String type) throws Exception { + Map properties = baseDocument.getProperties(); + if (!properties.containsKey(triggersKey)) { + throw new TriggerNotFoundException(); + } + java.lang.Object object = properties.get(triggersKey); + Map> triggersMap = deserialize(object); + + if (!triggersMap.containsKey(type)) { + throw new TriggerNotFoundException(); + } + + return triggersMap.get(type); + } + + public static ObjectDocument add(ObjectDocument document, Core.Trigger trigger) throws Exception { + String key = trigger.getFunctionType().getNamespace() + "/" + trigger.getFunctionType().getType(); + + Map properties = document.getProperties(); + + Map> triggersMap; + if (properties.containsKey(triggersKey)) { + java.lang.Object object = properties.get(triggersKey); + triggersMap = deserialize(object); + } else { + triggersMap = new HashMap>(); + } + + Map triggers; + if (triggersMap.containsKey(trigger.getType())) { + triggers = triggersMap.get(trigger.getType()); + } else { + triggers = new HashMap(); + } + + if (triggers.containsKey(key)) { + throw new AlreadyTriggerException(); + } + + triggers.put(key, new Trigger(trigger)); + + triggersMap.put(trigger.getType(), triggers); + + properties.put(triggersKey, triggersMap); + + document.updateProperties(properties); + + return document; + } + + public static LinkDocument add(LinkDocument document, Core.Trigger trigger) throws Exception { + String key = trigger.getFunctionType().getNamespace() + "/" + trigger.getFunctionType().getType(); + + Map properties = document.getProperties(); + + Map> triggersMap; + if (properties.containsKey(triggersKey)) { + java.lang.Object object = properties.get(triggersKey); + triggersMap = deserialize(object); + } else { + triggersMap = new HashMap>(); + } + + Map triggers; + if (triggersMap.containsKey(trigger.getType())) { + triggers = triggersMap.get(trigger.getType()); + } else { + triggers = new HashMap(); + } + + if (triggers.containsKey(key)) { + throw new AlreadyTriggerException(); + } + + triggers.put(key, new Trigger(trigger)); + + triggersMap.put(trigger.getType(), triggers); + + properties.put(triggersKey, triggersMap); + + document.updateProperties(properties); + + return document; + } + + public static ObjectDocument delete(ObjectDocument document, Core.Trigger trigger) throws Exception { + Map properties = document.getProperties(); + java.lang.Object object = properties.get(triggersKey); + + // triggers map + Map> triggersMap = deserialize(object); + + // type map + Map triggers = triggersMap.get(trigger.getType()); + + String key = trigger.getFunctionType().getNamespace() + "/" + trigger.getFunctionType().getType(); + + if (!triggers.containsKey(key)) { + return document; + } + + triggers.remove(key); + + triggersMap.put(trigger.getType(), triggers); + + properties.put(triggersKey, triggersMap); + + document.updateProperties(properties); + return document; + } + + public static LinkDocument delete(LinkDocument document, Core.Trigger trigger) throws Exception { + Map properties = document.getProperties(); + java.lang.Object object = properties.get(triggersKey); + + // triggers map + Map> triggersMap = deserialize(object); + + // type map + Map triggers = triggersMap.get(trigger.getType()); + + String key = trigger.getFunctionType().getNamespace() + "/" + trigger.getFunctionType().getType(); + + if (!triggers.containsKey(key)) { + return document; + } + + triggers.remove(key); + + triggersMap.put(trigger.getType(), triggers); + + properties.put(triggersKey, triggersMap); + + document.updateProperties(properties); + return document; + } + + public static Map> deserialize(Object from) throws Exception { + ObjectMapper mapper = new ObjectMapper(); + mapper.configure(SerializationFeature.FAIL_ON_SELF_REFERENCES, false); + + TypeReference>> ref = new TypeReference>>() { + }; + return mapper.convertValue(from, ref); + } +} diff --git a/src/main/java/org/listware/core/documents/LinkDocument.java b/src/main/java/org/listware/core/documents/LinkDocument.java index d26fa26..0e7b662 100644 --- a/src/main/java/org/listware/core/documents/LinkDocument.java +++ b/src/main/java/org/listware/core/documents/LinkDocument.java @@ -7,7 +7,7 @@ import java.util.Map; import org.listware.core.documents.entity.DocumentFields; import org.listware.core.documents.entity.Name; import org.listware.core.documents.entity.Type; -import org.listware.core.provider.utils.exceptions.PayloadNotFoundException; +import org.listware.core.utils.exceptions.PayloadNotFoundException; import com.arangodb.entity.From; import com.arangodb.entity.To; @@ -47,30 +47,15 @@ public class LinkDocument extends ObjectDocument { this.to = to; } - public LinkDocument(final String from, final String to, final String name, final String type) { + public LinkDocument(final String from, final String to, final String type, final String name) { this(from, to); this.name = name; this.type = type; } public LinkDocument(final Map properties) { - super(properties); - final Object tmpFrom = properties.remove(DocumentFields.FROM); - if (tmpFrom != null) { - from = tmpFrom.toString(); - } - final Object tmpTo = properties.remove(DocumentFields.TO); - if (tmpTo != null) { - to = tmpTo.toString(); - } - final Object tmpName = properties.remove(DocumentFields.NAME); - if (tmpName != null) { - name = tmpName.toString(); - } - final Object tmpType = properties.remove(DocumentFields.TYPE); - if (tmpType != null) { - type = tmpType.toString(); - } + super(); + replaceProperties(properties); } public String getFrom() { @@ -101,6 +86,27 @@ public class LinkDocument extends ObjectDocument { return type; } + @Override + public void replaceProperties(final Map properties) { + final Object tmpFrom = properties.remove(DocumentFields.FROM); + if (tmpFrom != null) { + from = tmpFrom.toString(); + } + final Object tmpTo = properties.remove(DocumentFields.TO); + if (tmpTo != null) { + to = tmpTo.toString(); + } + final Object tmpName = properties.remove(DocumentFields.NAME); + if (tmpName != null) { + name = tmpName.toString(); + } + final Object tmpType = properties.remove(DocumentFields.TYPE); + if (tmpType != null) { + type = tmpType.toString(); + } + super.replaceProperties(properties); + } + public void setType(String type) { this.type = type; } @@ -108,7 +114,7 @@ public class LinkDocument extends ObjectDocument { @Override public String toString() { return "BaseDocument [documentRevision=" + revision + ", documentHandle=" + id + ", documentKey=" + key - + ", from=" + from + ", to=" + to + ", properties=" + properties + "]"; + + ", from=" + from + ", to=" + to + ", properties=" + this.getProperties() + "]"; } @Override diff --git a/src/main/java/org/listware/core/documents/ObjectDocument.java b/src/main/java/org/listware/core/documents/ObjectDocument.java index 2bc6fca..27310df 100644 --- a/src/main/java/org/listware/core/documents/ObjectDocument.java +++ b/src/main/java/org/listware/core/documents/ObjectDocument.java @@ -8,11 +8,16 @@ import java.util.Map; import org.listware.core.documents.entity.DocumentFields; import org.listware.core.documents.entity.Meta; -import org.listware.core.provider.utils.exceptions.PayloadNotFoundException; +import org.listware.core.utils.exceptions.PayloadNotFoundException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import com.arangodb.entity.Id; import com.arangodb.entity.Key; import com.arangodb.entity.Rev; +import com.fasterxml.jackson.annotation.JsonAnyGetter; +import com.fasterxml.jackson.annotation.JsonAnySetter; +import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; @@ -20,6 +25,9 @@ import com.github.fge.jackson.JsonLoader; import com.google.protobuf.ByteString; public class ObjectDocument implements Serializable { + @SuppressWarnings("unused") + private static final Logger LOG = LoggerFactory.getLogger(ObjectDocument.class); + private static final long serialVersionUID = -1824742667228719116L; @Id @@ -29,9 +37,9 @@ public class ObjectDocument implements Serializable { @Rev protected String revision; @Meta - protected MetaDocument meta; - - protected Map properties; + private MetaDocument meta; + @JsonIgnore + private Map properties; public ObjectDocument() { super(); @@ -44,26 +52,9 @@ public class ObjectDocument implements Serializable { this.key = key; } - @SuppressWarnings("unchecked") public ObjectDocument(final Map properties) { this(); - final Object tmpId = properties.remove(DocumentFields.ID); - if (tmpId != null) { - id = tmpId.toString(); - } - final Object tmpKey = properties.remove(DocumentFields.KEY); - if (tmpKey != null) { - key = tmpKey.toString(); - } - final Object tmpRev = properties.remove(DocumentFields.REV); - if (tmpRev != null) { - revision = tmpRev.toString(); - } - final Object tmpMeta = properties.remove(DocumentFields.META); - if (tmpMeta != null) { - meta = new MetaDocument((Map) tmpMeta); - } - this.properties = properties; + replaceProperties(properties); } public String getId() { @@ -98,21 +89,63 @@ public class ObjectDocument implements Serializable { this.meta = meta; } + @JsonAnyGetter public Map getProperties() { return properties; } + @JsonAnySetter public void setProperties(final Map properties) { this.properties = properties; } + @SuppressWarnings("unchecked") + public void replaceProperties(final Map properties) { + final Object tmpId = properties.remove(DocumentFields.ID); + if (tmpId != null) { + id = tmpId.toString(); + } + final Object tmpKey = properties.remove(DocumentFields.KEY); + if (tmpKey != null) { + key = tmpKey.toString(); + } + final Object tmpRev = properties.remove(DocumentFields.REV); + if (tmpRev != null) { + revision = tmpRev.toString(); + } + final Object tmpMeta = properties.remove(DocumentFields.META); + if (tmpMeta != null) { + meta = new MetaDocument((Map) tmpMeta); + } + this.properties = properties; + meta.update(); + } + + public void replaceProperties(ByteString payload) throws Exception { + if (payload.isEmpty()) { + throw new PayloadNotFoundException(); + } + + JsonNode jsonNode = JsonLoader.fromString(payload.toStringUtf8()); + + ObjectMapper mapper = new ObjectMapper(); + + TypeReference> ref = new TypeReference>() { + }; + + Map values = mapper.convertValue(jsonNode, ref); + replaceProperties(values); + } + public void addAttribute(final String key, final Object value) { properties.put(key, value); + meta.update(); } public void updateAttribute(final String key, final Object value) { if (properties.containsKey(key)) { properties.put(key, value); + meta.update(); } } @@ -120,6 +153,15 @@ public class ObjectDocument implements Serializable { return properties.get(key); } + public void updateProperties(final Map properties) { + properties.remove(DocumentFields.ID); + properties.remove(DocumentFields.KEY); + properties.remove(DocumentFields.REV); + properties.remove(DocumentFields.META); + this.properties = properties; + meta.update(); + } + @Override public String toString() { return "BaseDocument [documentRevision=" + revision + ", documentHandle=" + id + ", documentKey=" + key @@ -183,11 +225,12 @@ public class ObjectDocument implements Serializable { } else return revision.equals(other.revision); } - - public void updateMeta() { - meta.update(); - } + public ByteString serialize() throws Exception { + ObjectMapper mapper = new ObjectMapper(); + byte[] values = mapper.writeValueAsBytes(this); + return ByteString.copyFrom(values); + } public static ObjectDocument deserialize(ByteString payload) throws Exception { if (payload.isEmpty()) { @@ -195,11 +238,14 @@ public class ObjectDocument implements Serializable { } JsonNode jsonNode = JsonLoader.fromString(payload.toStringUtf8()); + ObjectMapper mapper = new ObjectMapper(); TypeReference> ref = new TypeReference>() { }; + Map values = mapper.convertValue(jsonNode, ref); + return new ObjectDocument(values); } } diff --git a/src/main/java/org/listware/core/documents/entity/Created.java b/src/main/java/org/listware/core/documents/entity/Created.java index 71f9901..e1c98fe 100644 --- a/src/main/java/org/listware/core/documents/entity/Created.java +++ b/src/main/java/org/listware/core/documents/entity/Created.java @@ -18,4 +18,4 @@ import com.fasterxml.jackson.annotation.JsonProperty; @JsonProperty(DocumentFields.CREATED) @JsonInclude(JsonInclude.Include.NON_NULL) public @interface Created { -} \ No newline at end of file +} diff --git a/src/main/java/org/listware/core/documents/entity/DocumentFields.java b/src/main/java/org/listware/core/documents/entity/DocumentFields.java index 53a17af..e07ec34 100644 --- a/src/main/java/org/listware/core/documents/entity/DocumentFields.java +++ b/src/main/java/org/listware/core/documents/entity/DocumentFields.java @@ -12,6 +12,7 @@ public class DocumentFields { public static final String REV = "_rev"; public static final String FROM = "_from"; public static final String TO = "_to"; + public static final String NAME = "_name"; public static final String TYPE = "_type"; public static final String META = "_meta"; diff --git a/src/main/java/org/listware/core/documents/entity/Updated.java b/src/main/java/org/listware/core/documents/entity/Updated.java index 0fd5223..b61ac74 100644 --- a/src/main/java/org/listware/core/documents/entity/Updated.java +++ b/src/main/java/org/listware/core/documents/entity/Updated.java @@ -18,4 +18,4 @@ import com.fasterxml.jackson.annotation.JsonProperty; @JsonProperty(DocumentFields.UPDATED) @JsonInclude(JsonInclude.Include.NON_NULL) public @interface Updated { -} \ No newline at end of file +} diff --git a/src/main/java/org/listware/core/provider/FunctionProvider.java b/src/main/java/org/listware/core/provider/FunctionProvider.java index d9a4e03..ebda790 100644 --- a/src/main/java/org/listware/core/provider/FunctionProvider.java +++ b/src/main/java/org/listware/core/provider/FunctionProvider.java @@ -2,106 +2,52 @@ package org.listware.core.provider; -import java.util.Arrays; -import java.util.Collection; - -import java.security.KeyManagementException; -import java.security.NoSuchAlgorithmException; -import java.security.cert.CertificateException; -import java.security.cert.X509Certificate; - -import javax.net.ssl.HostnameVerifier; -import javax.net.ssl.HttpsURLConnection; -import javax.net.ssl.SSLContext; -import javax.net.ssl.SSLSession; -import javax.net.ssl.TrustManager; -import javax.net.ssl.X509TrustManager; - import org.apache.flink.statefun.sdk.FunctionType; import org.apache.flink.statefun.sdk.StatefulFunction; import org.apache.flink.statefun.sdk.StatefulFunctionProvider; -import org.listware.io.utils.Constants.Cmdb; -import org.listware.io.utils.QDSLClient; -import org.listware.sdk.pbcmdb.pbqdsl.QDSL; -import org.listware.core.documents.LinkDocument; -import org.listware.core.documents.ObjectDocument; -import org.listware.core.provider.functions.Link; +import org.listware.core.cmdb.Cmdb; import org.listware.core.provider.functions.Log; -import org.listware.core.provider.functions.Object; -import org.listware.core.provider.functions.ObjectTrigger; import org.listware.core.provider.functions.Register; import org.listware.core.provider.functions.Router; -import org.listware.core.provider.functions.Type; -import org.listware.core.provider.functions.TypeTrigger; -import org.listware.core.provider.utils.Cmdb.Collections; -import org.listware.core.provider.utils.Cmdb.LinkTypes; -import org.listware.core.provider.utils.Cmdb.SystemKeys; +import org.listware.core.provider.functions.link.AdvancedLink; +import org.listware.core.provider.functions.link.LinkTrigger; +import org.listware.core.provider.functions.object.Link; +import org.listware.core.provider.functions.object.Object; +import org.listware.core.provider.functions.object.ObjectTrigger; +import org.listware.core.provider.functions.object.Type; +import org.listware.core.provider.functions.object.TypeTrigger; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.arangodb.ArangoCollection; -import com.arangodb.ArangoDB; -import com.arangodb.ArangoDatabase; -import com.arangodb.ArangoGraph; -import com.arangodb.DbName; -import com.arangodb.entity.CollectionType; -import com.arangodb.entity.DocumentCreateEntity; -import com.arangodb.entity.EdgeDefinition; -import com.arangodb.entity.KeyType; -import com.arangodb.mapping.ArangoJack; -import com.arangodb.model.CollectionCreateOptions; -import com.arangodb.model.DocumentCreateOptions; - public class FunctionProvider implements StatefulFunctionProvider { @SuppressWarnings("unused") private static final Logger LOG = LoggerFactory.getLogger(FunctionProvider.class); - // graph name - private static final String SYSTEM_GRAPH = "system"; - - // system function types - private static final String SYSTEM_TYPE = "system"; - private static final String FUNCTION_CONTAINER_TYPE = "function-container"; - private static final String FUNCTION_TYPE = "function"; - private static final String FUNCTIONS_LINK_NAME = "functions"; - private static final String SYSTEM_LINK_NAME = "system"; - private static final String TYPES_FUNCTION_LINK_NAME = "types"; - private static final String OBJECTS_FUNCTION_LINK_NAME = "objects"; - private static final String LINKS_FUNCTION_LINK_NAME = "links"; - private static final String ROUTER_FUNCTION_LINK_NAME = "router"; - - private static final DbName DB_NAME = DbName.of(Cmdb.DBNAME); - - private QDSLClient client = new QDSLClient(); private Log log = new Log(); private Register register = new Register(); - - private ArangoGraph graph; - - private Object object; - private ObjectTrigger objectTrigger; - private Type type; - private TypeTrigger typeTrigger; - private Link link; - private Router qdsl; - - private ArangoCollection system, objects, types, links; + private Object object = new Object(); + private ObjectTrigger objectTrigger = new ObjectTrigger(); + private Type type = new Type(); + private TypeTrigger typeTrigger = new TypeTrigger(); + private Link link = new Link(); + private AdvancedLink advancedlink = new AdvancedLink(); + private LinkTrigger linkTrigger = new LinkTrigger(); + private Router router = new Router(); public FunctionProvider() { - graph = bootstrap(); - - qdsl = new Router(client); - type = new Type(graph); - typeTrigger = new TypeTrigger(graph); - object = new Object(graph); - objectTrigger = new ObjectTrigger(graph); - link = new Link(graph); + try { + Cmdb cmdb = new Cmdb(); + cmdb.bootstrap(); + cmdb.shutdown(); + } catch (Exception e) { + LOG.error(e.getLocalizedMessage()); + } } @Override public StatefulFunction functionOfType(FunctionType functionType) { if (functionType.equals(Router.FUNCTION_TYPE)) { - return qdsl; + return router; } if (functionType.equals(Type.FUNCTION_TYPE)) { @@ -130,174 +76,14 @@ public class FunctionProvider implements StatefulFunctionProvider { return register; } + if (functionType.equals(AdvancedLink.FUNCTION_TYPE)) { + return advancedlink; + } + + if (functionType.equals(LinkTrigger.FUNCTION_TYPE)) { + return linkTrigger; + } + return null; } - - private ArangoGraph bootstrap() { - // TODO db and collections bootstrap - ArangoDB arango = new ArangoDB.Builder().host(Cmdb.ADDR, Cmdb.PORT) - // .useSsl(true).sslContext(sslContext) - .serializer(new ArangoJack()).user(Cmdb.USER).password(Cmdb.PASSWORD).build(); - - ArangoDatabase db = arango.db(DB_NAME); - if (!db.exists()) { - db.create(); - } - - system = db.collection(Collections.SYSTEM); - if (!system.exists()) { - // TODO set 'isSystem' for system - CollectionCreateOptions opts = new CollectionCreateOptions().isSystem(true); - system.create(opts); - } - - types = db.collection(Collections.TYPES); - if (!types.exists()) { - types.create(); - } - - objects = db.collection(Collections.OBJECTS); - if (!objects.exists()) { - // TODO set 'uuid' for objects - CollectionCreateOptions opts = new CollectionCreateOptions().keyOptions(false, KeyType.uuid, null, null); - objects.create(opts); - } - - links = db.collection(Collections.LINKS); - if (!links.exists()) { - // TODO set 'edge' for links - CollectionCreateOptions ops = new CollectionCreateOptions().type(CollectionType.EDGES); - links.create(ops); - } - - // TODO create 'root' - if (!system.documentExists(SystemKeys.ROOT)) { - system.insertDocument(new ObjectDocument(SystemKeys.ROOT)); - } - - ObjectDocument root = system.getDocument(SystemKeys.ROOT, ObjectDocument.class); - - // TODO create 'objects' - if (!system.documentExists(SystemKeys.OBJECTS)) { - insertSystem(root, SystemKeys.OBJECTS); - } - - ObjectDocument objectsBaseDocument = system.getDocument(SystemKeys.OBJECTS, ObjectDocument.class); - - // TODO create 'types' - if (!system.documentExists(SystemKeys.TYPES)) { - insertSystem(root, SystemKeys.TYPES); - } - - ObjectDocument typesBaseDocument = system.getDocument(SystemKeys.TYPES, ObjectDocument.class); - - // TODO create 'function-container' - if (!types.documentExists(FUNCTION_CONTAINER_TYPE)) { - insertType(typesBaseDocument, FUNCTION_CONTAINER_TYPE); - } - - // TODO create 'function' - if (!types.documentExists(FUNCTION_TYPE)) { - insertType(typesBaseDocument, FUNCTION_TYPE); - } - - QDSL.Options options = QDSL.Options.newBuilder().build(); - - // TODO create 'functions.root' from type 'function-container' - QDSL.Elements elements = client.qdsl("functions.root", options); - if (elements.getElementsCount() == 0) { - - ObjectDocument typeBaseDocument = types.getDocument(FUNCTION_CONTAINER_TYPE, ObjectDocument.class); - - ObjectDocument functionsBaseDocument = insertObject(typeBaseDocument, objectsBaseDocument, root, - FUNCTIONS_LINK_NAME); - - ObjectDocument systemBaseDocument = insertObject(typeBaseDocument, objectsBaseDocument, - functionsBaseDocument, SYSTEM_LINK_NAME); - - typeBaseDocument = types.getDocument(FUNCTION_TYPE, ObjectDocument.class); - insertObject(typeBaseDocument, objectsBaseDocument, systemBaseDocument, TYPES_FUNCTION_LINK_NAME); - insertObject(typeBaseDocument, objectsBaseDocument, systemBaseDocument, OBJECTS_FUNCTION_LINK_NAME); - insertObject(typeBaseDocument, objectsBaseDocument, systemBaseDocument, LINKS_FUNCTION_LINK_NAME); - insertObject(typeBaseDocument, objectsBaseDocument, systemBaseDocument, ROUTER_FUNCTION_LINK_NAME); - } - - // TODO graph bootstrap - ArangoGraph graph = db.graph(SYSTEM_GRAPH); - if (!graph.exists()) { - // TODO one edge of 1 links collection - EdgeDefinition EdgeDefinition = new EdgeDefinition().collection(Collections.LINKS) - .from(Collections.SYSTEM, Collections.TYPES, Collections.OBJECTS) - .to(Collections.TYPES, Collections.OBJECTS); - - Collection edgeDefinitions = Arrays.asList(EdgeDefinition); - db.createGraph(SYSTEM_GRAPH, edgeDefinitions); - } - return graph; - } - - private ObjectDocument insertObject(ObjectDocument type, ObjectDocument object, ObjectDocument parent, - String name) { - // insert to 'objects' - DocumentCreateEntity documentCreateEntity = objects.insertDocument(new ObjectDocument(), - new DocumentCreateOptions().returnNew(true)); - - String id = documentCreateEntity.getId(); - String key = documentCreateEntity.getKey(); - - // Link type -> object ($uuid) - insertLink(type.getId(), id, key, type.getKey()); - - // Link objects -> object ($uuid) - insertLink(object.getId(), id, key, type.getKey()); - - // Link parent -> object ('name') - insertLink(parent.getId(), id, name, type.getKey()); - - return documentCreateEntity.getNew(); - } - - private void insertType(ObjectDocument typesBaseDocument, String key) { - DocumentCreateEntity documentCreateEntity = types.insertDocument(new ObjectDocument(key)); - insertLink(typesBaseDocument.getId(), documentCreateEntity.getId(), key, LinkTypes.TYPE); - } - - private void insertSystem(ObjectDocument root, String key) { - DocumentCreateEntity documentCreateEntity = system.insertDocument(new ObjectDocument(key)); - insertLink(root.getId(), documentCreateEntity.getId(), key, SYSTEM_TYPE); - } - - private void insertLink(String from, String to, String name, String type) { - LinkDocument baseEdgeDocument = new LinkDocument(from, to, name, type); - links.insertDocument(baseEdgeDocument); - } - - SSLContext createSSLContext() throws NoSuchAlgorithmException, KeyManagementException { - final SSLContext sslContext = SSLContext.getInstance("TLS"); - - sslContext.init(null, new TrustManager[] { new X509TrustManager() { - @Override - public void checkClientTrusted(X509Certificate[] x509Certificates, String s) throws CertificateException { - } - - @Override - public void checkServerTrusted(X509Certificate[] x509Certificates, String s) throws CertificateException { - } - - @Override - public X509Certificate[] getAcceptedIssuers() { - return new X509Certificate[0]; - } - } }, null); - - HttpsURLConnection.setDefaultSSLSocketFactory(sslContext.getSocketFactory()); - HttpsURLConnection.setDefaultHostnameVerifier(new HostnameVerifier() { - - public boolean verify(String hostname, SSLSession session) { - return true; - } - }); - return sslContext; - } - } diff --git a/src/main/java/org/listware/core/provider/functions/Arango.java b/src/main/java/org/listware/core/provider/functions/Arango.java deleted file mode 100644 index 54382f2..0000000 --- a/src/main/java/org/listware/core/provider/functions/Arango.java +++ /dev/null @@ -1,174 +0,0 @@ -/* Copyright 2022 Listware */ - -package org.listware.core.provider.functions; - -import java.util.HashMap; -import java.util.Map; - -import org.apache.flink.statefun.sdk.Context; - -import com.arangodb.ArangoCursor; -import com.arangodb.ArangoEdgeCollection; -import com.arangodb.ArangoGraph; -import com.arangodb.ArangoVertexCollection; -import com.arangodb.entity.EdgeEntity; -import com.google.protobuf.ByteString; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.listware.io.utils.QDSLClient; -import org.listware.io.utils.VertexClient; -import org.listware.sdk.Functions; -import org.listware.sdk.pbcmdb.Core; -import org.listware.core.FunctionContext; -import org.listware.core.documents.LinkDocument; -import org.listware.core.documents.ObjectDocument; -import org.listware.core.provider.utils.Cmdb.Collections; -import org.listware.core.provider.utils.Cmdb.Matcher; -import org.listware.core.provider.utils.Cmdb.SystemKeys; -import org.listware.core.provider.utils.exceptions.NoLinkException; - -/** - * Middleware for CUD functions - ** - */ -public class Arango extends Base { - private static final Logger LOG = LoggerFactory.getLogger(Arango.class); - - private VertexClient vertexClient = new VertexClient(); - protected QDSLClient client = new QDSLClient(); - - protected ArangoGraph graph; - protected ArangoVertexCollection system, types, objects; - protected ArangoEdgeCollection links; - - public Arango(ArangoGraph graph) { - this.graph = graph; - - system = graph.vertexCollection(Collections.SYSTEM); - types = graph.vertexCollection(Collections.TYPES); - objects = graph.vertexCollection(Collections.OBJECTS); - links = graph.edgeCollection(Collections.LINKS); - } - - @Override - public void invoke(Context context, Functions.FunctionContext pbFunctionContext) throws Exception { - String id = context.self().id(); - ObjectDocument baseDocument = readContext(id); - - FunctionContext functionContext = new FunctionContext(context, baseDocument, pbFunctionContext); - - invoke(functionContext); - - functionContext.callback(); - } - - public void invoke(FunctionContext functionContext) throws Exception { - - } - - protected ObjectDocument readContext(String key) throws Exception { - if (key.equals(SystemKeys.ROOT) || key.equals(SystemKeys.OBJECTS) || key.equals(SystemKeys.TYPES)) { - - Core.Response resp = vertexClient.read(key, Collections.SYSTEM); - - return ObjectDocument.deserialize(resp.getPayload()); - } else if (key.matches(Matcher.UUID_V4_STRING)) { - Core.Response resp = vertexClient.read(key, Collections.OBJECTS); - - return ObjectDocument.deserialize(resp.getPayload()); - } else if (key.matches(Matcher.NUMERIC_STRING)) { - Core.Response resp = vertexClient.read(key, Collections.LINKS); - - return LinkDocument.deserialize(resp.getPayload()); - } else { - Core.Response resp = vertexClient.read(key, Collections.TYPES); - - return ObjectDocument.deserialize(resp.getPayload()); - } - } - -// *[?$._from == "%s" && $._name == "%s"?] - - protected LinkDocument findByFromName(String from, String name) throws NoLinkException { -// QDSL.Options options = QDSL.Options.newBuilder().setObject(true).build(); -// -// String query = String.format("%s.objects", functionContext.Context().self().id()); -// -// QDSL.Elements elements = client.qdsl(query, options); -// - - String query = "FOR t IN links FILTER t._name == @name && t._from == @from RETURN t"; - - Map bindVars = new HashMap(); - bindVars.put("name", name); - bindVars.put("from", from); - ArangoCursor cursor = graph.db().query(query, bindVars, LinkDocument.class); - - if (cursor.hasNext()) { - return cursor.next(); - } - - throw new NoLinkException(from, name); - } - - private void insertLink(LinkDocument linkDocument) throws Exception { - EdgeEntity edgeEntity = links.insertEdge(linkDocument); - - LOG.info("created " + edgeEntity.getId()); - -// pbqdsl.Options options = pbqdsl.Options.newBuilder().setType(true).build(); -// -// String query = String.format("*[?$._to == '%s'?].objects", linkDocument.getFrom()); -// -// pbqdsl.Elements elements = client.qdsl(query, options); -// -// if (elements.getElementsCount() > 0) { -// String type = elements.getElements(0).getType(); -// LOG.info("insert: from " + type); -// } -// -// query = String.format("*[?$._to == '%s'?].objects", linkDocument.getTo()); -// -// elements = client.qdsl(query, options); -// -// if (elements.getElementsCount() > 0) { -// String type = elements.getElements(0).getType(); -// LOG.info("insert: to " + type); -// } - - } - - protected void insertLink(String from, String to, String name, String type) throws Exception { - LinkDocument linkDocument = new LinkDocument(from, to, name, type); - insertLink(linkDocument); - } - - protected void insertLink(String from, String to, String name, String type, ByteString payload) throws Exception { - LinkDocument linkDocument = LinkDocument.deserialize(payload); - linkDocument.setFrom(from); - linkDocument.setTo(to); - linkDocument.setName(name); - linkDocument.setType(type); - insertLink(linkDocument); - } - - /** - * Exec function - ** - * @param id string - * @param namespace string - * @param type string - * @param method Method - */ - public static Functions.FunctionContext Exec(String id, String namespace, String type, Core.Method method) { - Functions.FunctionType functionType = Functions.FunctionType.newBuilder().setNamespace(namespace).setType(type) - .build(); - - Core.TypeMessage typeMessage = Core.TypeMessage.newBuilder().setMethod(method).build(); - - Functions.FunctionContext.Builder builder = Functions.FunctionContext.newBuilder().setFunctionType(functionType) - .setId(id).setValue(typeMessage.toByteString()); - return builder.build(); - } -} \ No newline at end of file diff --git a/src/main/java/org/listware/core/provider/functions/Base.java b/src/main/java/org/listware/core/provider/functions/Base.java index e66cb77..6ef1e36 100644 --- a/src/main/java/org/listware/core/provider/functions/Base.java +++ b/src/main/java/org/listware/core/provider/functions/Base.java @@ -5,63 +5,65 @@ package org.listware.core.provider.functions; import org.apache.flink.statefun.sdk.Context; import org.apache.flink.statefun.sdk.StatefulFunction; import org.apache.flink.statefun.sdk.reqreply.generated.TypedValue; -import org.listware.io.functions.egress.Egress; +import org.listware.core.FunctionContext; +import org.listware.core.cmdb.Cmdb; import org.listware.sdk.Functions; +import org.listware.sdk.Result; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class Base implements StatefulFunction { +public class Base extends Sync implements StatefulFunction { private static final Logger LOG = LoggerFactory.getLogger(Base.class); + protected Cmdb cmdb = new Cmdb(); + + public Base(String groupID, String topic) { + super(groupID, topic); + } + @Override public void invoke(Context context, java.lang.Object input) { long startTime = System.currentTimeMillis(); - LOG.info(context.self().toString()); - - if (!(input instanceof TypedValue)) { - LOG.error("unknown message received: " + input); - return; - } - - Functions.FunctionResult.Builder functionResultBuilder = Functions.FunctionResult.newBuilder(); - - TypedValue typedValue = (TypedValue) input; - - Functions.FunctionContext functionContext = null; - try { - functionContext = Functions.FunctionContext.parseFrom(typedValue.getValue()); + LOG.info(context.self().toString()); - invoke(context, functionContext); + if (input instanceof TypedValue) { + TypedValue typedValue = (TypedValue) input; - functionResultBuilder.setComplete(true); - } catch (Exception e) { - LOG.error(e.getLocalizedMessage()); - functionResultBuilder.setError(e.getLocalizedMessage()); - } finally { - if (functionContext.hasReplyEgress()) { - Functions.ReplyEgress replyEgress = functionContext.getReplyEgress(); + Functions.FunctionContext functionContext = Functions.FunctionContext.parseFrom(typedValue.getValue()); - Functions.FunctionResult functionResult = functionResultBuilder.setReplyEgress(replyEgress).build(); + onInit(context, functionContext); + invoke(context, functionContext); - TypedValue newTypedValue = TypedValue.newBuilder().setValue(functionResult.toByteString()) - .setHasValue(true).build(); + } else if (input instanceof Result.FunctionResult) { + Result.FunctionResult functionResult = (Result.FunctionResult) input; + onResult(context, functionResult); - LOG.info("ReplyEgress result: " + replyEgress.getId()); - - context.send(Egress.EGRESS, newTypedValue); + } else { + LOG.error(context.self() + " unknown message received: " + input); } - long endTime = System.currentTimeMillis(); + } catch (Exception e) { + LOG.error(context.self() + " " + e.getLocalizedMessage()); + onException(context, e.getLocalizedMessage()); + } finally { + try { + onReply(context); - LOG.info(context.self().type() + ": took " + (endTime - startTime) + " milliseconds"); + long endTime = System.currentTimeMillis(); + + LOG.info(context.self() + ": took " + (endTime - startTime) + " milliseconds"); + } catch (Exception e) { + LOG.error(context.self() + ": " + e.getLocalizedMessage()); + } } - } public void invoke(Context context, Functions.FunctionContext functionContext) throws Exception { + } + public void invoke(FunctionContext functionContext) throws Exception { } } diff --git a/src/main/java/org/listware/core/provider/functions/Link.java b/src/main/java/org/listware/core/provider/functions/Link.java deleted file mode 100644 index 09bfc54..0000000 --- a/src/main/java/org/listware/core/provider/functions/Link.java +++ /dev/null @@ -1,139 +0,0 @@ -/* Copyright 2022 Listware */ - -package org.listware.core.provider.functions; - -import javax.annotation.Nullable; - -import org.apache.flink.statefun.sdk.FunctionType; - -import com.arangodb.ArangoGraph; -import com.arangodb.entity.EdgeUpdateEntity; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.listware.io.utils.Constants.Namespaces; -import org.listware.sdk.Functions; -import org.listware.sdk.pbcmdb.Core; -import org.listware.core.FunctionContext; -import org.listware.core.documents.LinkDocument; -import org.listware.core.documents.ObjectDocument; -import org.listware.core.provider.utils.exceptions.AlreadyLinkException; -import org.listware.core.provider.utils.exceptions.NoLinkException; -import org.listware.core.provider.utils.exceptions.UnknownIdException; -import org.listware.core.provider.utils.exceptions.UnknownMethodException; - -/** - * Link CUD arangodb service for links - ** - */ -public class Link extends Arango { - private static final Logger LOG = LoggerFactory.getLogger(Link.class); - - public static final String TYPE = "links.system.functions.root"; - - public static final FunctionType FUNCTION_TYPE = new FunctionType(Namespaces.INTERNAL, TYPE); - - public Link(ArangoGraph graph) { - super(graph); - } - - @Override - public void invoke(FunctionContext functionContext) throws Exception { - Core.LinkMessage message = Core.LinkMessage.parseFrom(functionContext.getFunctionContext().getValue()); - - switch (message.getMethod()) { - case CREATE: - create(functionContext, message); - break; - - case UPDATE: - update(functionContext, message); - break; - - case DELETE: - delete(functionContext, message); - break; - - default: - throw new UnknownMethodException(message.getMethod()); - } - } - - private void create(FunctionContext functionContext, Core.LinkMessage message) throws Exception { - // do not create link from link - if (functionContext.isLink()) { - throw new UnknownIdException(functionContext.getFlinkContext().self().id()); - } - - try { - // or by from+name - findByFromName(functionContext.getDocument().getId(), message.getName()); - throw new AlreadyLinkException(functionContext.getDocument().getId(), message.getName()); - } catch (NoLinkException ignored) { - } - - ObjectDocument baseDocument = readContext(message.getTo()); - - // FIXME _type? - // types -> type == `Cmdb.TYPE_TYPE` - // type -> object == type - // object1 -> object2 == type of object2 - insertLink(functionContext.getDocument().getId(), baseDocument.getId(), message.getName(), message.getType(), - message.getPayload()); - } - - private void update(FunctionContext functionContext, Core.LinkMessage message) throws Exception { - LinkDocument document = null; - - if (!functionContext.isLink()) { - document = findByFromName(functionContext.getDocument().getId(), message.getName()); - } else { - document = (LinkDocument) functionContext.getDocument(); - } - - LinkDocument newDocument = LinkDocument.deserialize(message.getPayload()); - - document.setProperties(newDocument.getProperties()); - - document.updateMeta(); - - EdgeUpdateEntity edgeUpdateEntity = links.replaceEdge(document.getKey(), document); - LOG.info("updated link " + edgeUpdateEntity.getId()); - } - - private void delete(FunctionContext functionContext, Core.LinkMessage message) throws Exception { - ObjectDocument prevLinkDocument = functionContext.getDocument(); - - if (!functionContext.isLink()) { - prevLinkDocument = findByFromName(functionContext.getDocument().getId(), message.getName()); - } - - links.deleteEdge(prevLinkDocument.getKey()); - LOG.info("deleted link " + prevLinkDocument.getId()); - } - - /** - * CreateLink create link 'from' -> 'to' with 'name' - ** - * @param from will be ('root', 'node', '17136214') - * @param to will be ('root', 'node', '17136214') - * @param name string - * @param callback FunctionContext (optional) - */ - public static Functions.FunctionContext CreateLink(String from, String to, String name, - @Nullable Functions.FunctionContext callback) { - Functions.FunctionType functionType = Functions.FunctionType.newBuilder().setNamespace(Namespaces.INTERNAL) - .setType(Link.TYPE).build(); - - Core.LinkMessage linkMessage = Core.LinkMessage.newBuilder().setMethod(Core.Method.CREATE).setName(name) - .setTo(to).build(); - - Functions.FunctionContext.Builder builder = Functions.FunctionContext.newBuilder().setFunctionType(functionType) - .setId(from).setValue(linkMessage.toByteString()); - - if (callback != null) { - builder = builder.setCallback(callback); - } - return builder.build(); - } -} diff --git a/src/main/java/org/listware/core/provider/functions/Register.java b/src/main/java/org/listware/core/provider/functions/Register.java index 0fa3154..585af3d 100644 --- a/src/main/java/org/listware/core/provider/functions/Register.java +++ b/src/main/java/org/listware/core/provider/functions/Register.java @@ -4,15 +4,20 @@ package org.listware.core.provider.functions; import org.apache.flink.statefun.sdk.Context; import org.apache.flink.statefun.sdk.FunctionType; +import org.apache.flink.statefun.sdk.annotations.Persisted; import org.apache.flink.statefun.sdk.reqreply.generated.TypedValue; -import org.listware.io.functions.egress.EgressReader; +import org.apache.flink.statefun.sdk.state.PersistedTable; +import org.listware.core.cmdb.RegisterMessage; import org.listware.io.utils.TypedValueDeserializer; import org.listware.io.utils.Constants.Namespaces; import org.listware.sdk.Functions; -import org.listware.sdk.Functions.ReplyEgress; +import org.listware.sdk.Result; import org.listware.sdk.pbcmdb.Core; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.listware.core.provider.functions.object.Type; +import org.listware.core.provider.functions.object.Object; +import org.listware.core.provider.functions.object.Link; public class Register extends Base { @SuppressWarnings("unused") @@ -21,59 +26,186 @@ public class Register extends Base { public static final String TYPE = "register.system.functions.root"; public static final FunctionType FUNCTION_TYPE = new FunctionType(Namespaces.INTERNAL, TYPE); - private EgressReader egressReader = new EgressReader(TYPE, TYPE); + private static final String MESSAGES_TABLE = "messages-table"; + private static final String STATE_TABLE = "types-state-table"; + + @Persisted + private PersistedTable messagesTable = PersistedTable.of(MESSAGES_TABLE, String.class, + RegisterMessage.class); + + @Persisted + private PersistedTable stateTable = PersistedTable.of(STATE_TABLE, String.class, Boolean.class); + + public Register() { + super(TYPE, TYPE); + } @Override public void invoke(Context context, Functions.FunctionContext functionContext) throws Exception { Core.RegisterMessage registerMessage = Core.RegisterMessage.parseFrom(functionContext.getValue()); - registerTypes(context, registerMessage); + RegisterMessage message = new RegisterMessage(registerMessage); + messagesTable.set(this.key, message); - registerObjects(context, registerMessage); + registerNext(context); } - private void registerTypes(Context context, Core.RegisterMessage registerMessage) throws Exception { - for (Core.RegisterTypeMessage registerTypeMessage : registerMessage.getTypeMessagesList()) { + private void registerRouter(Context context, Functions.FunctionContext.Builder builder) throws Exception { + Result.ReplyResult replyResult = replyResult(context); + stateTable.set(replyResult.getKey(), true); - Functions.FunctionType functionType = Functions.FunctionType.newBuilder().setNamespace(Namespaces.INTERNAL) - .setType(Type.TYPE).build(); + Functions.FunctionContext functionContext = builder.build(); - ReplyEgress replyEgress = egressReader.replyEgress(); + Functions.FunctionType functionType = Functions.FunctionType.newBuilder().setNamespace(Namespaces.INTERNAL) + .setType(Router.TYPE).build(); - Functions.FunctionContext.Builder builder = Functions.FunctionContext.newBuilder() - .setReplyEgress(replyEgress).setId(registerTypeMessage.getId()).setFunctionType(functionType) - .setValue(registerTypeMessage.getTypeMessage().toByteString()); + Functions.FunctionContext routerFunctionContext = Functions.FunctionContext.newBuilder() + .setReplyResult(replyResult).setId(functionContext.getId()).setFunctionType(functionType) + .setValue(functionContext.toByteString()).build(); - Functions.FunctionContext newFunctionContext = builder.build(); + TypedValue typedValue = TypedValueDeserializer.fromMessageLite(routerFunctionContext); - TypedValue newTypedValue = TypedValueDeserializer.fromMessageLite(newFunctionContext); + context.send(Router.FUNCTION_TYPE, routerFunctionContext.getId(), typedValue); + } - context.send(Type.FUNCTION_TYPE, newFunctionContext.getId(), newTypedValue); + private void registerBuilder(Context context, FunctionType functionType, Functions.FunctionContext.Builder builder) + throws Exception { + Result.ReplyResult replyResult = replyResult(context); + stateTable.set(replyResult.getKey(), true); - egressReader.wait(replyEgress.getId()); + Functions.FunctionContext functionContext = builder.setReplyResult(replyResult).build(); + TypedValue typedValue = TypedValueDeserializer.fromMessageLite(functionContext); + + context.send(functionType, functionContext.getId(), typedValue); + } + + private void registerType(Context context, Core.RegisterTypeMessage message) throws Exception { + Functions.FunctionType functionType = Functions.FunctionType.newBuilder().setNamespace(Namespaces.INTERNAL) + .setType(Type.TYPE).build(); + + Functions.FunctionContext.Builder builder = Functions.FunctionContext.newBuilder().setId(message.getId()) + .setFunctionType(functionType).setValue(message.getTypeMessage().toByteString()); + + if (message.getRouter()) { + registerRouter(context, builder); + } else { + registerBuilder(context, Type.FUNCTION_TYPE, builder); + } + + } + + private void registerObject(Context context, Core.RegisterObjectMessage message) throws Exception { + Functions.FunctionType functionType = Functions.FunctionType.newBuilder().setNamespace(Namespaces.INTERNAL) + .setType(Object.TYPE).build(); + + Functions.FunctionContext.Builder builder = Functions.FunctionContext.newBuilder().setId(message.getId()) + .setFunctionType(functionType).setValue(message.getObjectMessage().toByteString()); + + if (message.getRouter()) { + registerRouter(context, builder); + } else { + registerBuilder(context, Object.FUNCTION_TYPE, builder); } } - private void registerObjects(Context context, Core.RegisterMessage registerMessage) throws Exception { - for (Core.RegisterObjectMessage registerObjectMessage : registerMessage.getObjectMessagesList()) { + private void registerLink(Context context, Core.RegisterLinkMessage message) throws Exception { + Functions.FunctionType functionType = Functions.FunctionType.newBuilder().setNamespace(Namespaces.INTERNAL) + .setType(Object.TYPE).build(); - Functions.FunctionType functionType = Functions.FunctionType.newBuilder().setNamespace(Namespaces.INTERNAL) - .setType(Object.TYPE).build(); + Functions.FunctionContext.Builder builder = Functions.FunctionContext.newBuilder().setId(message.getId()) + .setFunctionType(functionType).setValue(message.getLinkMessage().toByteString()); - ReplyEgress replyEgress = egressReader.replyEgress(); + registerBuilder(context, Link.FUNCTION_TYPE, builder); + } - Functions.FunctionContext.Builder builder = Functions.FunctionContext.newBuilder() - .setFunctionType(functionType).setId(registerObjectMessage.getId()).setReplyEgress(replyEgress) - .setValue(registerObjectMessage.getObjectMessage().toByteString()); + @Override + protected void onResult(Context context, Result.FunctionResult functionResult) throws Exception { + super.onResult(context, functionResult); - Functions.FunctionContext newFunctionContext = builder.build(); + String key = functionResult.getReplyEgress().getKey(); - TypedValue newTypedValue = TypedValueDeserializer.fromMessageLite(newFunctionContext); + stateTable.remove(key); - context.send(Object.FUNCTION_TYPE, newFunctionContext.getId(), newTypedValue); + registerNext(context); + } - egressReader.wait(replyEgress.getId()); + @Override + protected void onReply(Context context) throws Exception { + RegisterMessage message = messagesTable.get(this.key); + + // wait all answers + if (stateTable.keys().iterator().hasNext()) { + return; } + + // if errors - reply answer + if (errorContainer.getComplete()) { + if (!message.getTypes().isEmpty()) { + return; + } + + if (!message.getObjects().isEmpty()) { + return; + } + + if (!message.getLinks().isEmpty()) { + return; + } + } + + messagesTable.remove(this.key); + super.onReply(context); + } + + private void registerTypes(Context context) throws Exception { + RegisterMessage message = messagesTable.get(this.key); + + for (Core.RegisterTypeMessage registerMessage : message.listTypes()) { + registerType(context, registerMessage); + } + + messagesTable.set(this.key, message); + } + + private void registerObjects(Context context) throws Exception { + RegisterMessage message = messagesTable.get(this.key); + + for (Core.RegisterObjectMessage registerMessage : message.listObjects()) { + registerObject(context, registerMessage); + } + + messagesTable.set(this.key, message); + } + + private void registerLinks(Context context) throws Exception { + RegisterMessage message = messagesTable.get(this.key); + + for (Core.RegisterLinkMessage registerMessage : message.listLinks()) { + registerLink(context, registerMessage); + } + + messagesTable.set(this.key, message); + } + + private void registerNext(Context context) throws Exception { + RegisterMessage message = messagesTable.get(this.key); + + if (stateTable.keys().iterator().hasNext()) { + return; + } + + if (!errorContainer.getComplete()) { + return; + } + + if (!message.getTypes().isEmpty()) { + registerTypes(context); + } else if (!message.getObjects().isEmpty()) { + registerObjects(context); + } else if (!message.getLinks().isEmpty()) { + registerLinks(context); + } + } } diff --git a/src/main/java/org/listware/core/provider/functions/Router.java b/src/main/java/org/listware/core/provider/functions/Router.java index a7a1f27..c3749dd 100644 --- a/src/main/java/org/listware/core/provider/functions/Router.java +++ b/src/main/java/org/listware/core/provider/functions/Router.java @@ -7,12 +7,11 @@ import org.apache.flink.statefun.sdk.FunctionType; import org.apache.flink.statefun.sdk.reqreply.generated.TypedValue; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.listware.io.functions.egress.EgressReader; -import org.listware.io.utils.QDSLClient; +import org.listware.io.grpc.QDSLClient; import org.listware.io.utils.TypedValueDeserializer; import org.listware.io.utils.Constants.Namespaces; import org.listware.sdk.Functions; -import org.listware.sdk.Functions.ReplyEgress; +import org.listware.sdk.Result; import org.listware.sdk.pbcmdb.pbqdsl.QDSL; /** @@ -20,23 +19,22 @@ import org.listware.sdk.pbcmdb.pbqdsl.QDSL; ** */ public class Router extends Base { + @SuppressWarnings("unused") private static final Logger LOG = LoggerFactory.getLogger(Router.class); public static final String TYPE = "router.system.functions.root"; public static final FunctionType FUNCTION_TYPE = new FunctionType(Namespaces.INTERNAL, TYPE); - private QDSLClient client; + private QDSLClient client = new QDSLClient(); - private EgressReader egressReader = new EgressReader(TYPE, TYPE); - - public Router(QDSLClient client) { - this.client = client; + public Router() { + super(TYPE, TYPE); } @Override public void invoke(Context context, Functions.FunctionContext functionContext) throws Exception { - QDSL.Options options = QDSL.Options.newBuilder().setKey(true).build(); + QDSL.Options options = QDSL.Options.newBuilder().setId(true).build(); QDSL.Elements elements = client.qdsl(context.self().id(), options); @@ -44,9 +42,9 @@ public class Router extends Base { .toBuilder(); for (QDSL.Element element : elements.getElementsList()) { - ReplyEgress replyEgress = egressReader.replyEgress(); + Result.ReplyResult replyResult = replyResult(context); - Functions.FunctionContext newFunctionContext = builder.setReplyEgress(replyEgress).setId(element.getKey()) + Functions.FunctionContext newFunctionContext = builder.setReplyResult(replyResult).setId(element.getId()) .build(); String namespace = newFunctionContext.getFunctionType().getNamespace(); @@ -55,10 +53,7 @@ public class Router extends Base { TypedValue typedValue = TypedValueDeserializer.fromMessageLite(newFunctionContext); - LOG.info("send: " + functionType + " id " + newFunctionContext.getId()); - context.send(functionType, newFunctionContext.getId(), typedValue); - egressReader.wait(replyEgress.getId()); } } diff --git a/src/main/java/org/listware/core/provider/functions/Sync.java b/src/main/java/org/listware/core/provider/functions/Sync.java new file mode 100644 index 0000000..4969b5c --- /dev/null +++ b/src/main/java/org/listware/core/provider/functions/Sync.java @@ -0,0 +1,150 @@ +/* Copyright 2022 Listware */ + +package org.listware.core.provider.functions; + +import java.util.Iterator; +import java.util.UUID; + +import org.apache.flink.statefun.sdk.Address; +import org.apache.flink.statefun.sdk.Context; +import org.apache.flink.statefun.sdk.annotations.Persisted; +import org.apache.flink.statefun.sdk.reqreply.generated.TypedValue; +import org.apache.flink.statefun.sdk.state.PersistedTable; +import org.listware.core.utils.ErrorContainer; +import org.listware.io.functions.result.Egress; +import org.listware.io.functions.result.EgressReader; +import org.listware.sdk.Functions; +import org.listware.sdk.Result; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class Sync { + @SuppressWarnings("unused") + private static final Logger LOG = LoggerFactory.getLogger(Sync.class); + + private static final String RESULT_TABLE = "result-table"; + private static final String REPLY_TABLE = "reply-table"; + private static final String ERRORS_TABLE = "errors-table"; + + @Persisted + private PersistedTable replyTable = PersistedTable.of(REPLY_TABLE, String.class, + EgressReader.ReplyResult.class); + + @Persisted + private PersistedTable errorsTable = PersistedTable.of(ERRORS_TABLE, String.class, + ErrorContainer.class); + + @Persisted + private PersistedTable resultTable = PersistedTable.of(RESULT_TABLE, String.class, String.class); + + private EgressReader egressReader = null; + + protected String key = null; + + protected ErrorContainer errorContainer = null; + + public Sync(String groupID, String topic) { + egressReader = new EgressReader(groupID, topic); + UUID uuid = UUID.randomUUID(); + key = uuid.toString(); + } + + protected Result.ReplyResult replyResult(Context context) { + Result.ReplyResult replyResult = egressReader.replyResult(context.self().id()); + + String key = replyResult.getKey(); + + resultTable.set(key, this.key); + + return replyResult; + } + + protected void onInit(Context context, Functions.FunctionContext functionContext) throws Exception { + if (functionContext.hasReplyResult()) { + EgressReader.ReplyResult replyResult = new EgressReader.ReplyResult(functionContext.getReplyResult()); + + if (context.caller() != null) { + replyResult.setIsEgress(false); + } else { + replyResult.setIsEgress(true); + } + + this.key = replyResult.getKey(); + + replyTable.set(this.key, replyResult); + } else { + UUID uuid = UUID.randomUUID(); + key = uuid.toString(); + } + errorContainer = new ErrorContainer(); + errorsTable.set(this.key, errorContainer); + } + + protected void onResult(Context context, Result.FunctionResult functionResult) throws Exception { + String key = functionResult.getReplyEgress().getKey(); + + this.key = resultTable.get(key); + resultTable.remove(key); + + errorContainer = errorsTable.get(this.key); + + if (!functionResult.getComplete()) { + if (errorContainer != null) { + errorContainer.appendAll(functionResult.getErrorsList()); + errorsTable.set(this.key, errorContainer); + } + } + } + + protected void onException(Context context, String message) { + errorContainer = errorsTable.get(this.key); + if (errorContainer != null) { + errorContainer.append(message); + errorsTable.set(this.key, errorContainer); + } + } + + protected void onReply(Context context) throws Exception { + errorContainer = errorsTable.get(this.key); + if (errorContainer == null) { + return; + } + + Iterator it = resultTable.values().iterator(); + + while (it.hasNext()) { + if (it.next().equals(this.key)) { + return; + } + } + + errorsTable.remove(this.key); + + EgressReader.ReplyResult replyResult = replyTable.get(this.key); + if (replyResult == null) { + return; + } + + replyTable.remove(this.key); + + reply(context, replyResult, errorContainer); + } + + protected void reply(Context context, EgressReader.ReplyResult replyResult, ErrorContainer errorContainer) + throws Exception { + Result.FunctionResult functionResult = errorContainer.toFunctionResult(replyResult); + + if (!replyResult.getIsEgress()) { + Address address = replyResult.toAddress(); + // send result to caller + context.send(address, functionResult); + } else { + // TODO egress from TypedValue to FunctionResult + TypedValue newTypedValue = TypedValue.newBuilder().setValue(functionResult.toByteString()).setHasValue(true) + .build(); + // send result to egress + context.send(Egress.EGRESS, newTypedValue); + } + } + +} diff --git a/src/main/java/org/listware/core/provider/functions/Type.java b/src/main/java/org/listware/core/provider/functions/Type.java deleted file mode 100644 index 83f8ee6..0000000 --- a/src/main/java/org/listware/core/provider/functions/Type.java +++ /dev/null @@ -1,215 +0,0 @@ -/* Copyright 2022 Listware */ - -package org.listware.core.provider.functions; - -import org.apache.flink.statefun.sdk.FunctionType; -import org.apache.flink.statefun.sdk.reqreply.generated.TypedValue; - -import com.arangodb.ArangoGraph; -import com.arangodb.entity.VertexEntity; -import com.arangodb.entity.VertexUpdateEntity; - -import com.google.protobuf.ByteString; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.listware.io.utils.TypedValueDeserializer; -import org.listware.io.utils.Constants.Namespaces; -import org.listware.sdk.Functions; -import org.listware.sdk.pbcmdb.Core; -import org.listware.core.FunctionContext; -import org.listware.core.documents.ObjectDocument; -import org.listware.core.provider.utils.Trigger; -import org.listware.core.provider.utils.Cmdb.LinkTypes; -import org.listware.core.provider.utils.Cmdb.SystemKeys; -import org.listware.core.provider.utils.exceptions.AlreadyLinkException; -import org.listware.core.provider.utils.exceptions.NoLinkException; -import org.listware.core.provider.utils.exceptions.UnknownIdException; -import org.listware.core.provider.utils.exceptions.UnknownMethodException; - -/** - * Type CUD arangodb service for types - ** - */ -public class Type extends Arango { -// @Persisted -// private final PersistedValue TargetControllerData = PersistedValue.of("", String.class); - - private static final Logger LOG = LoggerFactory.getLogger(Type.class); - - public static final String TYPE = "types.system.functions.root"; - - public static final FunctionType FUNCTION_TYPE = new FunctionType(Namespaces.INTERNAL, TYPE); - - public Type(ArangoGraph graph) { - super(graph); - } - - @Override - public void invoke(FunctionContext functionContext) throws Exception { - // TODO only create from 'types.root' or work with 'type' - if (!functionContext.isType() && !functionContext.isTypes()) { - throw new UnknownIdException(functionContext.getFlinkContext().self().id()); - } - - Core.TypeMessage message = Core.TypeMessage.parseFrom(functionContext.getFunctionContext().getValue()); - - switch (message.getMethod()) { - case CREATE: - create(functionContext, message); - break; - - case CREATE_CHILD: - createChild(functionContext, message); - break; - - case UPDATE: - update(functionContext, message); - break; - - case DELETE: - delete(functionContext); - break; - - case CREATE_TRIGGER: - createTrigger(functionContext, message); - break; - - case DELETE_TRIGGER: - deleteTrigger(functionContext, message); - break; - - default: - throw new UnknownMethodException(message.getMethod()); - } - } - - private void create(FunctionContext functionContext, Core.TypeMessage message) throws Exception { - if (!functionContext.isTypes()) { - throw new UnknownIdException(functionContext.getFlinkContext().self().id()); - } - - ObjectDocument baseDocument = ObjectDocument.deserialize(message.getPayload()); - baseDocument.setKey(message.getName()); - - VertexEntity vertexEntity = types.insertVertex(baseDocument); - - LOG.info("created type " + vertexEntity.getId()); - - // TODO link from types -> type - insertLink(functionContext.getDocument().getId(), vertexEntity.getId(), message.getName(), LinkTypes.TYPE); - } - - private void update(FunctionContext functionContext, Core.TypeMessage message) throws Exception { - ObjectDocument document = functionContext.getDocument(); - - ObjectDocument newDocument = ObjectDocument.deserialize(message.getPayload()); - - document.setProperties(newDocument.getProperties()); - - document.updateMeta(); - - VertexUpdateEntity vertexUpdateDocument = types.replaceVertex(functionContext.getFlinkContext().self().id(), - document); - - LOG.info("updated type " + vertexUpdateDocument.getId()); - } - - // TODO delete all objects with type - private void delete(FunctionContext functionContext) throws Exception { - types.deleteVertex(functionContext.getFlinkContext().self().id()); - LOG.info("deleted type " + functionContext.getFlinkContext().self().id()); - } - - private void createChild(FunctionContext functionContext, Core.TypeMessage message) throws Exception { - ObjectDocument callerBaseDocument = null; - - if (functionContext.getFlinkContext().caller() != null) { - try { - callerBaseDocument = readContext(functionContext.getFlinkContext().caller().id()); - try { - // do not duplicate link with name - findByFromName(callerBaseDocument.getId(), message.getName()); - - throw new AlreadyLinkException(callerBaseDocument.getId(), message.getName()); - } catch (NoLinkException ignored) { - } - } catch (IllegalArgumentException ignored) { - } - } - ObjectDocument baseDocument = ObjectDocument.deserialize(message.getPayload()); - - VertexEntity vertexEntity = objects.insertVertex(baseDocument); - - // Link type -> object ($uuid) - insertLink(functionContext.getDocument().getId(), vertexEntity.getId(), vertexEntity.getKey(), - functionContext.getDocument().getKey()); - - // trigger!!! - Functions.FunctionContext pbFunctionContext = ObjectTrigger.Trigger(vertexEntity.getKey(), Core.Method.CREATE); - - TypedValue typedValue = TypedValueDeserializer.fromMessageLite(pbFunctionContext); - - functionContext.getFlinkContext().send(ObjectTrigger.FUNCTION_TYPE, vertexEntity.getKey(), typedValue); - - // Link objects -> object ($uuid) - ObjectDocument objects = system.getVertex(SystemKeys.OBJECTS, ObjectDocument.class); - insertLink(objects.getId(), vertexEntity.getId(), vertexEntity.getKey(), - functionContext.getDocument().getKey()); - - // if caller, link caller -> object (name) - if (callerBaseDocument != null) { - insertLink(callerBaseDocument.getId(), vertexEntity.getId(), message.getName(), - functionContext.getDocument().getKey()); - } - } - - private void createTrigger(FunctionContext functionContext, Core.TypeMessage message) throws Exception { - Core.Trigger trigger = Core.Trigger.parseFrom(message.getPayload()); - - ObjectDocument document = Trigger.add(functionContext.getDocument(), trigger); - - document.updateMeta(); - - VertexUpdateEntity vertexUpdateDocument = types.replaceVertex(functionContext.getFlinkContext().self().id(), - document); - LOG.info("created trigger " + vertexUpdateDocument.getId()); - } - - private void deleteTrigger(FunctionContext functionContext, Core.TypeMessage message) throws Exception { - Core.Trigger trigger = Core.Trigger.parseFrom(message.getPayload()); - - ObjectDocument document = Trigger.delete(functionContext.getDocument(), trigger); - - document.updateMeta(); - - VertexUpdateEntity vertexUpdateDocument = types.replaceVertex(functionContext.getFlinkContext().self().id(), - document); - LOG.info("deleted trigger " + vertexUpdateDocument.getId()); - } - - /** - * CreateObject create new object - ** - * @param type string - * @param name string - * @param payload ByteString - * @param callback FunctionContext (optional) - */ - public static Functions.FunctionContext CreateObject(String type, String name, ByteString payload, - Functions.FunctionContext callback) { - Functions.FunctionType functionType = Functions.FunctionType.newBuilder().setNamespace(Namespaces.INTERNAL) - .setType(TYPE).build(); - - Core.TypeMessage typeMessage = Core.TypeMessage.newBuilder().setMethod(Core.Method.CREATE_CHILD).setName(name) - .setPayload(payload).build(); - - Functions.FunctionContext.Builder builder = Functions.FunctionContext.newBuilder().setFunctionType(functionType) - .setId(type).setValue(typeMessage.toByteString()); - if (callback != null) { - builder = builder.setCallback(callback); - } - return builder.build(); - } - -} diff --git a/src/main/java/org/listware/core/provider/functions/TypeTrigger.java b/src/main/java/org/listware/core/provider/functions/TypeTrigger.java deleted file mode 100644 index 12ba790..0000000 --- a/src/main/java/org/listware/core/provider/functions/TypeTrigger.java +++ /dev/null @@ -1,87 +0,0 @@ -/* Copyright 2022 Listware */ - -package org.listware.core.provider.functions; - -import java.util.Map; - -import org.apache.flink.statefun.sdk.FunctionType; -import org.apache.flink.statefun.sdk.reqreply.generated.TypedValue; -import org.listware.io.utils.TypedValueDeserializer; -import org.listware.io.utils.Constants.Namespaces; -import org.listware.sdk.Functions; -import org.listware.sdk.pbcmdb.Core; -import org.listware.core.FunctionContext; -import org.listware.core.provider.utils.Trigger; -import org.listware.core.provider.utils.exceptions.UnknownMethodException; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.arangodb.ArangoGraph; - -public class TypeTrigger extends Arango { - @SuppressWarnings("unused") - private static final Logger LOG = LoggerFactory.getLogger(TypeTrigger.class); - - private static final String TYPE = "trigger.types.system.functions.root"; - - public static final FunctionType FUNCTION_TYPE = new FunctionType(Namespaces.INTERNAL, TYPE); - - public TypeTrigger(ArangoGraph graph) { - super(graph); - } - - @Override - public void invoke(FunctionContext functionContext) throws Exception { - Core.TypeMessage message = Core.TypeMessage.parseFrom(functionContext.getFunctionContext().getValue()); - - switch (message.getMethod()) { - case CREATE: - try { - Map create = Trigger.getByType(functionContext.getDocument(), "create"); - - for (Trigger trigger : create.values()) { - Functions.FunctionContext pbFunctionContext = Arango.Exec( - functionContext.getFlinkContext().caller().id(), trigger.getNamespace(), trigger.getType(), - message.getMethod()); - - TypedValue typedValue = TypedValueDeserializer.fromMessageLite(pbFunctionContext); - - FunctionType functionType = new FunctionType(trigger.getNamespace(), trigger.getType()); - - functionContext.getFlinkContext().send(functionType, - functionContext.getFlinkContext().caller().id(), typedValue); - } - } catch (Exception ex) { - - } - - break; - - case UPDATE: - break; - - case DELETE: - break; - - default: - throw new UnknownMethodException(message.getMethod()); - } - } - - /** - * Trigger trigger - ** - * @param type string - * @param method Method - */ - public static Functions.FunctionContext Trigger(String type, Core.Method method) { - Functions.FunctionType functionType = Functions.FunctionType.newBuilder().setNamespace(Namespaces.INTERNAL) - .setType(TYPE).build(); - - Core.TypeMessage typeMessage = Core.TypeMessage.newBuilder().setMethod(method).build(); - - Functions.FunctionContext.Builder builder = Functions.FunctionContext.newBuilder().setFunctionType(functionType) - .setId(type).setValue(typeMessage.toByteString()); - return builder.build(); - } -} diff --git a/src/main/java/org/listware/core/provider/functions/link/AdvancedLink.java b/src/main/java/org/listware/core/provider/functions/link/AdvancedLink.java new file mode 100644 index 0000000..175dd5b --- /dev/null +++ b/src/main/java/org/listware/core/provider/functions/link/AdvancedLink.java @@ -0,0 +1,77 @@ +/* Copyright 2022 Listware */ + +package org.listware.core.provider.functions.link; + +import org.apache.flink.statefun.sdk.FunctionType; +import org.listware.core.FunctionContext; +import org.listware.core.documents.LinkDocument; +import org.listware.core.utils.exceptions.UnknownMethodException; +import org.listware.io.utils.Constants.Namespaces; +import org.listware.sdk.Functions; +import org.listware.sdk.Result; +import org.listware.sdk.pbcmdb.Core; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Link CUD arangodb service for links + ** + */ +public class AdvancedLink extends LinkContext { + @SuppressWarnings("unused") + private static final Logger LOG = LoggerFactory.getLogger(AdvancedLink.class); + + public static final String TYPE = "advanced.links.system.functions.root"; + + public static final FunctionType FUNCTION_TYPE = new FunctionType(Namespaces.INTERNAL, TYPE); + + public AdvancedLink() { + super(TYPE, TYPE); + } + + @Override + public void invoke(FunctionContext functionContext) throws Exception { + Core.LinkMessage message = Core.LinkMessage.parseFrom(functionContext.getFunctionContext().getValue()); + + switch (message.getMethod()) { + case UPDATE: + update(functionContext, message); + break; + + case DELETE: + delete(functionContext, message); + break; + + default: + throw new UnknownMethodException(message.getMethod()); + } + } + + private void update(FunctionContext functionContext, Core.LinkMessage message) throws Exception { + LinkDocument document = (LinkDocument) functionContext.getDocument(); + document.replaceProperties(message.getPayload()); + document.setId(functionContext.getFlinkContext().self().id()); + + document = cmdb.updateLink(functionContext.getFlinkContext(), document); + } + + private void delete(FunctionContext functionContext, Core.LinkMessage message) throws Exception { + LinkDocument document = (LinkDocument) functionContext.getDocument(); + document.setId(functionContext.getFlinkContext().self().id()); + + cmdb.removeDocument(document); + } + + public static Functions.FunctionContext ProxyMessage(String id, Core.LinkMessage linkMessage, + Result.ReplyResult replyResult) { + Functions.FunctionType functionType = Functions.FunctionType.newBuilder().setNamespace(Namespaces.INTERNAL) + .setType(AdvancedLink.TYPE).build(); + + Functions.FunctionContext.Builder builder = Functions.FunctionContext.newBuilder().setFunctionType(functionType) + .setId(id).setValue(linkMessage.toByteString()); + if (replyResult != null) { + builder = builder.setReplyResult(replyResult); + } + return builder.build(); + } +} \ No newline at end of file diff --git a/src/main/java/org/listware/core/provider/functions/link/LinkContext.java b/src/main/java/org/listware/core/provider/functions/link/LinkContext.java new file mode 100644 index 0000000..7566044 --- /dev/null +++ b/src/main/java/org/listware/core/provider/functions/link/LinkContext.java @@ -0,0 +1,34 @@ +/* Copyright 2022 Listware */ + +package org.listware.core.provider.functions.link; + +import org.apache.flink.statefun.sdk.Context; +import org.listware.core.FunctionContext; +import org.listware.core.documents.LinkDocument; +import org.listware.core.provider.functions.Base; +import org.listware.sdk.Functions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Middleware for CUD functions + ** + */ +public class LinkContext extends Base { + @SuppressWarnings("unused") + private static final Logger LOG = LoggerFactory.getLogger(LinkContext.class); + + public LinkContext(String groupID, String topic) { + super(groupID, topic); + } + + @Override + public void invoke(Context context, Functions.FunctionContext pbFunctionContext) throws Exception { + LinkDocument document = cmdb.readLinkDocument(context.self().id()); + + FunctionContext functionContext = new FunctionContext(context, document, pbFunctionContext); + + invoke(functionContext); + } + +} diff --git a/src/main/java/org/listware/core/provider/functions/link/LinkTrigger.java b/src/main/java/org/listware/core/provider/functions/link/LinkTrigger.java new file mode 100644 index 0000000..02fd5dc --- /dev/null +++ b/src/main/java/org/listware/core/provider/functions/link/LinkTrigger.java @@ -0,0 +1,127 @@ +/* + * Copyright 2022 + * Listware + */ + +package org.listware.core.provider.functions.link; + +import org.apache.flink.statefun.sdk.Context; + +import java.util.HashMap; +import java.util.Map; + +import org.apache.flink.statefun.sdk.FunctionType; +import org.apache.flink.statefun.sdk.reqreply.generated.TypedValue; +import org.listware.core.FunctionContext; +import org.listware.core.cmdb.Trigger; +import org.listware.core.documents.LinkDocument; +import org.listware.core.utils.exceptions.UnknownMethodException; +import org.listware.io.utils.TypedValueDeserializer; +import org.listware.io.utils.Constants.Namespaces; +import org.listware.sdk.Functions; +import org.listware.sdk.pbcmdb.Core; +import org.listware.sdk.pbcmdb.pbqdsl.QDSL; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class LinkTrigger extends LinkContext { + @SuppressWarnings("unused") + private static final Logger LOG = LoggerFactory.getLogger(LinkTrigger.class); + + public static final String TYPE = "trigger.advanced.links.system.functions.root"; + + public static final FunctionType FUNCTION_TYPE = new FunctionType(Namespaces.INTERNAL, TYPE); + + public LinkTrigger() { + super(TYPE, TYPE); + } + + @Override + public void invoke(FunctionContext functionContext) throws Exception { + Core.LinkMessage message = Core.LinkMessage.parseFrom(functionContext.getFunctionContext().getValue()); + + LinkDocument document = (LinkDocument) functionContext.getDocument(); + + String from = cmdb.getTypeId(document.getFrom()); + String to = cmdb.getTypeId(document.getTo()); + + QDSL.Options options = QDSL.Options.newBuilder().setLink(true).build(); + + String query = String.format("*[?@._id == '%s'?].*[?@._id == '%s'?].types", to, from); + + QDSL.Elements elements = cmdb.qdslClient.qdsl(query, options); + + for (QDSL.Element element : elements.getElementsList()) { + LinkDocument link = LinkDocument.deserialize(element.getLink()); + + Map triggers = new HashMap<>(); + + switch (message.getMethod()) { + case CREATE: + triggers = Trigger.getByType(link, Trigger.CREATE); + break; + + case UPDATE: + triggers = Trigger.getByType(link, Trigger.UPDATE); + + break; + + case DELETE: + triggers = Trigger.getByType(link, Trigger.DELETE); + break; + + default: + throw new UnknownMethodException(message.getMethod()); + } + + for (Trigger trigger : triggers.values()) { + execTrigger(functionContext.getFlinkContext(), document.getFrom(), trigger, message.getMethod()); + execTrigger(functionContext.getFlinkContext(), document.getTo(), trigger, message.getMethod()); + } + + } + } + + private void execTrigger(Context context, String id, Trigger trigger, Core.Method method) { + Functions.FunctionContext pbFunctionContext = Exec(id, trigger.getNamespace(), trigger.getType(), method); + + TypedValue typedValue = TypedValueDeserializer.fromMessageLite(pbFunctionContext); + + FunctionType functionType = new FunctionType(trigger.getNamespace(), trigger.getType()); + + context.send(functionType, pbFunctionContext.getId(), typedValue); + } + + /** + * Trigger trigger + ** + * @param id string + * @param method Method + */ + public static Functions.FunctionContext Trigger(String id, Core.Method method) { + Functions.FunctionType functionType = Functions.FunctionType.newBuilder().setNamespace(Namespaces.INTERNAL) + .setType(TYPE).build(); + + Core.LinkMessage message = Core.LinkMessage.newBuilder().setMethod(method).build(); + + Functions.FunctionContext.Builder builder = Functions.FunctionContext.newBuilder().setFunctionType(functionType) + .setId(id).setValue(message.toByteString()); + return builder.build(); + } + + /** + * Exec function + ** + * @param id string + * @param namespace string + * @param type string + * @param method Method + */ + public static Functions.FunctionContext Exec(String id, String namespace, String type, Core.Method method) { + Functions.FunctionType functionType = Functions.FunctionType.newBuilder().setNamespace(namespace).setType(type) + .build(); + Functions.FunctionContext.Builder builder = Functions.FunctionContext.newBuilder().setFunctionType(functionType) + .setId(id); + return builder.build(); + } +} diff --git a/src/main/java/org/listware/core/provider/functions/object/Link.java b/src/main/java/org/listware/core/provider/functions/object/Link.java new file mode 100644 index 0000000..93debd0 --- /dev/null +++ b/src/main/java/org/listware/core/provider/functions/object/Link.java @@ -0,0 +1,163 @@ +/* Copyright 2022 Listware */ + +package org.listware.core.provider.functions.object; + +import org.apache.flink.statefun.sdk.FunctionType; +import org.apache.flink.statefun.sdk.reqreply.generated.TypedValue; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.listware.core.FunctionContext; +import org.listware.core.cmdb.Trigger; +import org.listware.core.documents.LinkDocument; +import org.listware.core.documents.ObjectDocument; +import org.listware.core.provider.functions.link.AdvancedLink; +import org.listware.core.utils.exceptions.AlreadyTriggerException; +import org.listware.core.utils.exceptions.UnknownIdException; +import org.listware.core.utils.exceptions.UnknownMethodException; +import org.listware.io.utils.TypedValueDeserializer; +import org.listware.io.utils.Constants.Namespaces; +import org.listware.sdk.Functions; +import org.listware.sdk.Result; +import org.listware.sdk.pbcmdb.Core; + +/** + * Link CUD arangodb service for links + ** + */ +public class Link extends ObjectContext { + @SuppressWarnings("unused") + private static final Logger LOG = LoggerFactory.getLogger(Link.class); + + public static final String TYPE = "links.system.functions.root"; + + public static final FunctionType FUNCTION_TYPE = new FunctionType(Namespaces.INTERNAL, TYPE); + + public Link() { + super(TYPE, TYPE); + } + + @Override + public void invoke(FunctionContext functionContext) throws Exception { + Core.LinkMessage message = Core.LinkMessage.parseFrom(functionContext.getFunctionContext().getValue()); + + switch (message.getMethod()) { + case CREATE: + create(functionContext, message); + break; + + case UPDATE: + update(functionContext, message); + break; + + case DELETE: + delete(functionContext, message); + break; + + case CREATE_TRIGGER: + createTrigger(functionContext, message); + break; + + case DELETE_TRIGGER: + deleteTrigger(functionContext, message); + break; + + default: + throw new UnknownMethodException(message.getMethod()); + } + } + + private void create(FunctionContext functionContext, Core.LinkMessage message) throws Exception { + cmdb.checkFrom(functionContext.getDocument(), message.getName()); + + ObjectDocument document = cmdb.readDocument(message.getTo()); + + cmdb.createLink(functionContext.getFlinkContext(), functionContext.getDocument(), document, message.getType(), + message.getName(), message.getPayload()); + } + + private void update(FunctionContext functionContext, Core.LinkMessage message) throws Exception { + Result.ReplyResult replyResult = replyResult(functionContext.getFlinkContext()); + + LinkDocument document = cmdb.readLinkDocument(functionContext.getDocument().getId(), message.getName()); + + Functions.FunctionContext pbFunctionContext = AdvancedLink.ProxyMessage(document.getId(), message, replyResult); + + TypedValue typedValue = TypedValueDeserializer.fromMessageLite(pbFunctionContext); + + functionContext.getFlinkContext().send(AdvancedLink.FUNCTION_TYPE, document.getId(), typedValue); + } + + private void delete(FunctionContext functionContext, Core.LinkMessage message) throws Exception { + Result.ReplyResult replyResult = replyResult(functionContext.getFlinkContext()); + + LinkDocument document = cmdb.readLinkDocument(functionContext.getDocument().getId(), message.getName()); + + Functions.FunctionContext pbFunctionContext = AdvancedLink.ProxyMessage(document.getId(), message, replyResult); + + TypedValue typedValue = TypedValueDeserializer.fromMessageLite(pbFunctionContext); + + functionContext.getFlinkContext().send(AdvancedLink.FUNCTION_TYPE, document.getId(), typedValue); + } + + private void createTrigger(FunctionContext functionContext, Core.LinkMessage message) throws Exception { + if (!functionContext.isType()) { + throw new UnknownIdException(functionContext.getFlinkContext().self().id()); + } + + Core.Trigger trigger = Core.Trigger.parseFrom(message.getPayload()); + + ObjectDocument from = functionContext.getDocument(); + ObjectDocument to = cmdb.readDocument(message.getTo()); + + LinkDocument document = null; + try { + document = cmdb.readLinkDocument(from.getId(), to.getKey()); + document = Trigger.add(document, trigger); + document = cmdb.updateLinkDocument(document); + } catch (AlreadyTriggerException ex) { + throw ex; + } catch (Exception ex) { + document = Trigger.add(new LinkDocument(), trigger); + document = cmdb.createLink(from, to, "trigger", to.getKey(), document.serialize()); + } + + LOG.info("created link trigger " + document.getId()); + } + + private void deleteTrigger(FunctionContext functionContext, Core.LinkMessage message) throws Exception { + if (!functionContext.isType()) { + throw new UnknownIdException(functionContext.getFlinkContext().self().id()); + } + + Core.Trigger trigger = Core.Trigger.parseFrom(message.getPayload()); + + ObjectDocument from = functionContext.getDocument(); + ObjectDocument to = cmdb.readDocument(message.getTo()); + + LinkDocument document = cmdb.readLinkDocument(from.getId(), to.getKey()); + document = Trigger.delete(document, trigger); + document = cmdb.updateLinkDocument(document); + + LOG.info("deleted type trigger " + document.getId()); + } + + /** + * CreateLink create link 'from' -> 'to' with 'name' + ** + * @param from will be ('system/root', 'types/node', '/$uuid') + * @param to will be ('system/root', 'types/node', '/$uuid') + * @param name string + */ + public static Functions.FunctionContext CreateLink(String from, String to, String name) { + Functions.FunctionType functionType = Functions.FunctionType.newBuilder().setNamespace(Namespaces.INTERNAL) + .setType(Link.TYPE).build(); + + Core.LinkMessage linkMessage = Core.LinkMessage.newBuilder().setMethod(Core.Method.CREATE).setName(name) + .setTo(to).build(); + + Functions.FunctionContext.Builder builder = Functions.FunctionContext.newBuilder().setFunctionType(functionType) + .setId(from).setValue(linkMessage.toByteString()); + return builder.build(); + } +} diff --git a/src/main/java/org/listware/core/provider/functions/Object.java b/src/main/java/org/listware/core/provider/functions/object/Object.java similarity index 70% rename from src/main/java/org/listware/core/provider/functions/Object.java rename to src/main/java/org/listware/core/provider/functions/object/Object.java index 6840102..cabd031 100644 --- a/src/main/java/org/listware/core/provider/functions/Object.java +++ b/src/main/java/org/listware/core/provider/functions/object/Object.java @@ -1,41 +1,41 @@ /* Copyright 2022 Listware */ -package org.listware.core.provider.functions; +package org.listware.core.provider.functions.object; import org.apache.flink.statefun.sdk.FunctionType; import org.apache.flink.statefun.sdk.reqreply.generated.TypedValue; -import com.arangodb.ArangoGraph; -import com.arangodb.entity.VertexUpdateEntity; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.listware.core.FunctionContext; +import org.listware.core.documents.ObjectDocument; +import org.listware.core.utils.exceptions.UnknownIdException; +import org.listware.core.utils.exceptions.UnknownMethodException; import org.listware.io.utils.TypedValueDeserializer; import org.listware.io.utils.Constants.Namespaces; import org.listware.sdk.Functions; +import org.listware.sdk.Result; import org.listware.sdk.pbcmdb.Core; -import org.listware.core.FunctionContext; -import org.listware.core.documents.ObjectDocument; -import org.listware.core.provider.utils.exceptions.UnknownIdException; -import org.listware.core.provider.utils.exceptions.UnknownMethodException; /** * Object CUD arangodb service for objects ** */ -public class Object extends Arango { +public class Object extends ObjectContext { + @SuppressWarnings("unused") private static final Logger LOG = LoggerFactory.getLogger(Object.class); public static final String TYPE = "objects.system.functions.root"; public static final FunctionType FUNCTION_TYPE = new FunctionType(Namespaces.INTERNAL, TYPE); - public Object(ArangoGraph graph) { - super(graph); + public Object() { + super(TYPE, TYPE); } @Override public void invoke(FunctionContext functionContext) throws Exception { + Core.ObjectMessage message = Core.ObjectMessage.parseFrom(functionContext.getFunctionContext().getValue()); switch (message.getMethod()) { @@ -57,8 +57,10 @@ public class Object extends Arango { } private void createChild(FunctionContext functionContext, Core.ObjectMessage message) throws Exception { + Result.ReplyResult replyResult = replyResult(functionContext.getFlinkContext()); + Functions.FunctionContext pbFunctionContext = Type.CreateObject(message.getType(), message.getName(), - message.getPayload(), functionContext.getCallback()); + message.getPayload(), replyResult); TypedValue typedValue = TypedValueDeserializer.fromMessageLite(pbFunctionContext); @@ -71,25 +73,18 @@ public class Object extends Arango { } ObjectDocument document = functionContext.getDocument(); - - ObjectDocument newDocument = ObjectDocument.deserialize(message.getPayload()); - - document.setProperties(newDocument.getProperties()); - - document.updateMeta(); - - VertexUpdateEntity vertexUpdateDocument = types.replaceVertex(functionContext.getFlinkContext().self().id(), - document); - - LOG.info("updated object " + vertexUpdateDocument.getId()); + document.replaceProperties(message.getPayload()); + document.setId(functionContext.getFlinkContext().self().id()); + document = cmdb.updateObject(functionContext.getFlinkContext(), document); } private void delete(FunctionContext functionContext) throws Exception { if (!functionContext.isObject()) { throw new UnknownIdException(functionContext.getFlinkContext().self().id()); } + ObjectDocument document = functionContext.getDocument(); + document.setId(functionContext.getFlinkContext().self().id()); - objects.deleteVertex(functionContext.getFlinkContext().self().id()); - LOG.info("deleted object " + functionContext.getFlinkContext().self().id()); + cmdb.removeObject(functionContext.getFlinkContext(), document); } } diff --git a/src/main/java/org/listware/core/provider/functions/object/ObjectContext.java b/src/main/java/org/listware/core/provider/functions/object/ObjectContext.java new file mode 100644 index 0000000..43c798e --- /dev/null +++ b/src/main/java/org/listware/core/provider/functions/object/ObjectContext.java @@ -0,0 +1,36 @@ +/* Copyright 2022 Listware */ + +package org.listware.core.provider.functions.object; + +import org.apache.flink.statefun.sdk.Context; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.listware.core.FunctionContext; +import org.listware.core.documents.ObjectDocument; +import org.listware.core.provider.functions.Base; +import org.listware.sdk.Functions; + +/** + * Middleware for CUD functions + ** + */ +public class ObjectContext extends Base { + @SuppressWarnings("unused") + private static final Logger LOG = LoggerFactory.getLogger(ObjectContext.class); + + + public ObjectContext(String groupID, String topic) { + super(groupID, topic); + } + + @Override + public void invoke(Context context, Functions.FunctionContext pbFunctionContext) throws Exception { + ObjectDocument document = cmdb.readDocument(context.self().id()); + + FunctionContext functionContext = new FunctionContext(context, document, pbFunctionContext); + + invoke(functionContext); + } + +} diff --git a/src/main/java/org/listware/core/provider/functions/ObjectTrigger.java b/src/main/java/org/listware/core/provider/functions/object/ObjectTrigger.java similarity index 64% rename from src/main/java/org/listware/core/provider/functions/ObjectTrigger.java rename to src/main/java/org/listware/core/provider/functions/object/ObjectTrigger.java index 0c83bfe..5ac2982 100644 --- a/src/main/java/org/listware/core/provider/functions/ObjectTrigger.java +++ b/src/main/java/org/listware/core/provider/functions/object/ObjectTrigger.java @@ -1,21 +1,18 @@ /* Copyright 2022 Listware */ -package org.listware.core.provider.functions; +package org.listware.core.provider.functions.object; import org.apache.flink.statefun.sdk.FunctionType; import org.apache.flink.statefun.sdk.reqreply.generated.TypedValue; +import org.listware.core.FunctionContext; import org.listware.io.utils.TypedValueDeserializer; import org.listware.io.utils.Constants.Namespaces; import org.listware.sdk.Functions; import org.listware.sdk.pbcmdb.Core; -import org.listware.sdk.pbcmdb.pbqdsl.QDSL; -import org.listware.core.FunctionContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.arangodb.ArangoGraph; - -public class ObjectTrigger extends Arango { +public class ObjectTrigger extends ObjectContext { @SuppressWarnings("unused") private static final Logger LOG = LoggerFactory.getLogger(ObjectTrigger.class); @@ -23,27 +20,20 @@ public class ObjectTrigger extends Arango { public static final FunctionType FUNCTION_TYPE = new FunctionType(Namespaces.INTERNAL, TYPE); - public ObjectTrigger(ArangoGraph graph) { - super(graph); + public ObjectTrigger() { + super(TYPE, TYPE); } @Override public void invoke(FunctionContext functionContext) throws Exception { Core.ObjectMessage message = Core.ObjectMessage.parseFrom(functionContext.getFunctionContext().getValue()); - QDSL.Options options = QDSL.Options.newBuilder().setType(true).build(); + String typeId = cmdb.getTypeId(functionContext.getFlinkContext().self().id()); + Functions.FunctionContext pbFunctionContext = TypeTrigger.Trigger(typeId, message.getMethod()); - String query = String.format("%s.objects", functionContext.getFlinkContext().self().id()); + TypedValue typedValue = TypedValueDeserializer.fromMessageLite(pbFunctionContext); - QDSL.Elements elements = client.qdsl(query, options); - - for (QDSL.Element element : elements.getElementsList()) { - Functions.FunctionContext pbFunctionContext = TypeTrigger.Trigger(element.getType(), message.getMethod()); - - TypedValue typedValue = TypedValueDeserializer.fromMessageLite(pbFunctionContext); - - functionContext.getFlinkContext().send(TypeTrigger.FUNCTION_TYPE, element.getType(), typedValue); - } + functionContext.getFlinkContext().send(TypeTrigger.FUNCTION_TYPE, pbFunctionContext.getId(), typedValue); } /** diff --git a/src/main/java/org/listware/core/provider/functions/object/Type.java b/src/main/java/org/listware/core/provider/functions/object/Type.java new file mode 100644 index 0000000..7e8572d --- /dev/null +++ b/src/main/java/org/listware/core/provider/functions/object/Type.java @@ -0,0 +1,171 @@ +/* Copyright 2022 Listware */ + +package org.listware.core.provider.functions.object; + +import org.apache.flink.statefun.sdk.FunctionType; + +import com.google.protobuf.ByteString; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.listware.core.FunctionContext; +import org.listware.core.cmdb.Trigger; +import org.listware.core.documents.ObjectDocument; +import org.listware.core.utils.exceptions.UnknownIdException; +import org.listware.core.utils.exceptions.UnknownMethodException; +import org.listware.io.utils.Constants.Namespaces; +import org.listware.sdk.Functions; +import org.listware.sdk.Result; +import org.listware.sdk.pbcmdb.Core; + +/** + * Type CUD arangodb service for types + ** + */ +public class Type extends ObjectContext { + private static final Logger LOG = LoggerFactory.getLogger(Type.class); + + public static final String TYPE = "types.system.functions.root"; + + public static final FunctionType FUNCTION_TYPE = new FunctionType(Namespaces.INTERNAL, TYPE); + + public Type() { + super(TYPE, TYPE); + } + + @Override + public void invoke(FunctionContext functionContext) throws Exception { + // TODO only create from 'types.root' or work with 'type' + if (!functionContext.isType() && !functionContext.isTypes()) { + throw new UnknownIdException(functionContext.getFlinkContext().self().id()); + } + + Core.TypeMessage message = Core.TypeMessage.parseFrom(functionContext.getFunctionContext().getValue()); + + switch (message.getMethod()) { + case CREATE: + create(functionContext, message); + break; + + case CREATE_CHILD: + createChild(functionContext, message); + break; + + case UPDATE: + update(functionContext, message); + break; + + case DELETE: + delete(functionContext); + break; + + case CREATE_TRIGGER: + createTrigger(functionContext, message); + break; + + case DELETE_TRIGGER: + deleteTrigger(functionContext, message); + break; + + default: + throw new UnknownMethodException(message.getMethod()); + } + } + + private void create(FunctionContext functionContext, Core.TypeMessage message) throws Exception { + if (!functionContext.isTypes()) { + throw new UnknownIdException(functionContext.getFlinkContext().self().id()); + } + ObjectDocument document = ObjectDocument.deserialize(message.getPayload()); + document.setKey(message.getName()); + + document = cmdb.createType(document); + } + + private void update(FunctionContext functionContext, Core.TypeMessage message) throws Exception { + if (!functionContext.isType()) { + throw new UnknownIdException(functionContext.getFlinkContext().self().id()); + } + ObjectDocument document = functionContext.getDocument(); + document.replaceProperties(message.getPayload()); + document.setId(functionContext.getFlinkContext().self().id()); + document = cmdb.updateType(functionContext.getFlinkContext(), document); + } + + // TODO delete all objects with type + private void delete(FunctionContext functionContext) throws Exception { + if (!functionContext.isType()) { + throw new UnknownIdException(functionContext.getFlinkContext().self().id()); + } + ObjectDocument document = functionContext.getDocument(); + document.setId(functionContext.getFlinkContext().self().id()); + + cmdb.removeType(functionContext.getFlinkContext(), document); + } + + private void createChild(FunctionContext functionContext, Core.TypeMessage message) throws Exception { + if (!functionContext.isType()) { + throw new UnknownIdException(functionContext.getFlinkContext().self().id()); + } + + ObjectDocument document = ObjectDocument.deserialize(message.getPayload()); + ObjectDocument type = functionContext.getDocument(); + + if (functionContext.getFlinkContext().caller() != null) { + ObjectDocument parent = cmdb.readDocument(functionContext.getFlinkContext().caller().id()); + document = cmdb.createObject(functionContext.getFlinkContext(), type, parent, document, message.getName()); + } else { + document = cmdb.createObject(functionContext.getFlinkContext(), type, document); + } + } + + private void createTrigger(FunctionContext functionContext, Core.TypeMessage message) throws Exception { + if (!functionContext.isType()) { + throw new UnknownIdException(functionContext.getFlinkContext().self().id()); + } + Core.Trigger trigger = Core.Trigger.parseFrom(message.getPayload()); + ObjectDocument document = Trigger.add(functionContext.getDocument(), trigger); + + document = cmdb.updateType(functionContext.getFlinkContext(), document); + + LOG.info("created type trigger " + document.getId()); + } + + private void deleteTrigger(FunctionContext functionContext, Core.TypeMessage message) throws Exception { + if (!functionContext.isType()) { + throw new UnknownIdException(functionContext.getFlinkContext().self().id()); + } + Core.Trigger trigger = Core.Trigger.parseFrom(message.getPayload()); + ObjectDocument document = Trigger.delete(functionContext.getDocument(), trigger); + + document = cmdb.updateType(functionContext.getFlinkContext(), document); + + LOG.info("deleted type trigger " + document.getId()); + } + + /** + * CreateObject create new object + ** + * @param type string + * @param name string + * @param payload ByteString + * @param replyEgress ReplyEgress (optional) + */ + public static Functions.FunctionContext CreateObject(String type, String name, ByteString payload, + Result.ReplyResult replyResult) { + Functions.FunctionType functionType = Functions.FunctionType.newBuilder().setNamespace(Namespaces.INTERNAL) + .setType(TYPE).build(); + + Core.TypeMessage typeMessage = Core.TypeMessage.newBuilder().setMethod(Core.Method.CREATE_CHILD).setName(name) + .setPayload(payload).build(); + + Functions.FunctionContext.Builder builder = Functions.FunctionContext.newBuilder().setFunctionType(functionType) + .setId(type).setValue(typeMessage.toByteString()); + if (replyResult != null) { + builder = builder.setReplyResult(replyResult); + } + return builder.build(); + } + +} diff --git a/src/main/java/org/listware/core/provider/functions/object/TypeTrigger.java b/src/main/java/org/listware/core/provider/functions/object/TypeTrigger.java new file mode 100644 index 0000000..465ab31 --- /dev/null +++ b/src/main/java/org/listware/core/provider/functions/object/TypeTrigger.java @@ -0,0 +1,102 @@ +/* Copyright 2022 Listware */ + +package org.listware.core.provider.functions.object; + +import java.util.Map; + +import org.apache.flink.statefun.sdk.FunctionType; +import org.apache.flink.statefun.sdk.reqreply.generated.TypedValue; +import org.listware.core.FunctionContext; +import org.listware.core.cmdb.Trigger; +import org.listware.core.utils.exceptions.UnknownMethodException; +import org.listware.io.utils.TypedValueDeserializer; +import org.listware.io.utils.Constants.Namespaces; +import org.listware.sdk.Functions; +import org.listware.sdk.pbcmdb.Core; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class TypeTrigger extends ObjectContext { + @SuppressWarnings("unused") + private static final Logger LOG = LoggerFactory.getLogger(TypeTrigger.class); + + private static final String TYPE = "trigger.types.system.functions.root"; + + public static final FunctionType FUNCTION_TYPE = new FunctionType(Namespaces.INTERNAL, TYPE); + + public TypeTrigger() { + super(TYPE, TYPE); + } + + @Override + public void invoke(FunctionContext functionContext) throws Exception { + Core.TypeMessage message = Core.TypeMessage.parseFrom(functionContext.getFunctionContext().getValue()); + + Map triggers = null; + + switch (message.getMethod()) { + case CREATE: + triggers = Trigger.getByType(functionContext.getDocument(), Trigger.CREATE); + break; + + case UPDATE: + triggers = Trigger.getByType(functionContext.getDocument(), Trigger.UPDATE); + break; + + case DELETE: + triggers = Trigger.getByType(functionContext.getDocument(), Trigger.DELETE); + break; + + default: + throw new UnknownMethodException(message.getMethod()); + } + + for (Trigger trigger : triggers.values()) { + Functions.FunctionContext pbFunctionContext = Exec(functionContext.getFlinkContext().caller().id(), + trigger.getNamespace(), trigger.getType(), message.getMethod()); + + TypedValue typedValue = TypedValueDeserializer.fromMessageLite(pbFunctionContext); + + FunctionType functionType = new FunctionType(trigger.getNamespace(), trigger.getType()); + + functionContext.getFlinkContext().send(functionType, functionContext.getFlinkContext().caller().id(), + typedValue); + } + } + + /** + * Trigger trigger + ** + * @param id string + * @param method Method + */ + public static Functions.FunctionContext Trigger(String id, Core.Method method) { + Functions.FunctionType functionType = Functions.FunctionType.newBuilder().setNamespace(Namespaces.INTERNAL) + .setType(TYPE).build(); + + Core.TypeMessage typeMessage = Core.TypeMessage.newBuilder().setMethod(method).build(); + + Functions.FunctionContext.Builder builder = Functions.FunctionContext.newBuilder().setFunctionType(functionType) + .setId(id).setValue(typeMessage.toByteString()); + return builder.build(); + } + + /** + * Exec function + ** + * @param id string + * @param namespace string + * @param type string + * @param method Method + */ + public static Functions.FunctionContext Exec(String id, String namespace, String type, Core.Method method) { + Functions.FunctionType functionType = Functions.FunctionType.newBuilder().setNamespace(namespace).setType(type) + .build(); + + Core.TypeMessage typeMessage = Core.TypeMessage.newBuilder().setMethod(method).build(); + + Functions.FunctionContext.Builder builder = Functions.FunctionContext.newBuilder().setFunctionType(functionType) + .setId(id).setValue(typeMessage.toByteString()); + return builder.build(); + } +} diff --git a/src/main/java/org/listware/core/provider/utils/Cmdb.java b/src/main/java/org/listware/core/provider/utils/Cmdb.java deleted file mode 100644 index 2512edb..0000000 --- a/src/main/java/org/listware/core/provider/utils/Cmdb.java +++ /dev/null @@ -1,30 +0,0 @@ -/* Copyright 2022 Listware */ - -package org.listware.core.provider.utils; - -public class Cmdb { - // Constant system cmdb keys - public class SystemKeys { - public static final String ROOT = "root"; - public static final String OBJECTS = "objects"; - public static final String TYPES = "types"; - } - - // Collection names - public class Collections { - public static final String SYSTEM = "system"; - public static final String TYPES = "types"; - public static final String OBJECTS = "objects"; - public static final String LINKS = "links"; - } - - // Link entries - public class LinkTypes { - public static final String TYPE = "type"; - } - - public class Matcher { - public static final String UUID_V4_STRING = "[a-fA-F0-9]{8}-[a-fA-F0-9]{4}-4[a-fA-F0-9]{3}-[89abAB][a-fA-F0-9]{3}-[a-fA-F0-9]{12}"; - public static final String NUMERIC_STRING = "\\d+"; - } -} diff --git a/src/main/java/org/listware/core/provider/utils/JsonDeserializer.java b/src/main/java/org/listware/core/provider/utils/JsonDeserializer.java deleted file mode 100644 index 6e439e4..0000000 --- a/src/main/java/org/listware/core/provider/utils/JsonDeserializer.java +++ /dev/null @@ -1,21 +0,0 @@ -/* Copyright 2022 Listware */ - -package org.listware.core.provider.utils; - -import java.util.Map; - -import com.fasterxml.jackson.core.type.TypeReference; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.databind.SerializationFeature; - - -public class JsonDeserializer { - public static Map> triggers(Object from) throws Exception { - ObjectMapper mapper = new ObjectMapper(); - mapper.configure(SerializationFeature.FAIL_ON_SELF_REFERENCES, false); - - TypeReference>> ref = new TypeReference>>() { - }; - return mapper.convertValue(from, ref); - } -} diff --git a/src/main/java/org/listware/core/provider/utils/Trigger.java b/src/main/java/org/listware/core/provider/utils/Trigger.java deleted file mode 100644 index 28d36d2..0000000 --- a/src/main/java/org/listware/core/provider/utils/Trigger.java +++ /dev/null @@ -1,116 +0,0 @@ -/* Copyright 2022 Listware */ - -package org.listware.core.provider.utils; - -import java.util.HashMap; -import java.util.Map; - -import org.listware.sdk.pbcmdb.Core; -import org.listware.core.documents.ObjectDocument; -import org.listware.core.provider.utils.exceptions.AlreadyTriggerException; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.fasterxml.jackson.annotation.JsonProperty; - -public class Trigger { - @SuppressWarnings("unused") - private static final Logger LOG = LoggerFactory.getLogger(Trigger.class); - - private static String triggersKey = "triggers"; - - public Trigger(Core.Trigger trigger) { - this.namespace = trigger.getFunctionType().getNamespace(); - this.type = trigger.getFunctionType().getType(); - } - - @JsonProperty("namespace") - private String namespace; - @JsonProperty("type") - private String type; - - /** - * - * @return The namespace - */ - @JsonProperty("namespace") - public String getNamespace() { - return namespace; - } - - /** - * - * @return The type - */ - @JsonProperty("type") - public String getType() { - return type; - } - - public static Map getByType(ObjectDocument baseDocument, String type) throws Exception { - Map properties = baseDocument.getProperties(); - java.lang.Object object = properties.get(triggersKey); - Map> triggersMap = JsonDeserializer.triggers(object); - return triggersMap.get(type); - } - - public static ObjectDocument add(ObjectDocument baseDocument, Core.Trigger trigger) throws Exception { - String key = trigger.getFunctionType().getNamespace() + "/" + trigger.getFunctionType().getType(); - - Map properties = baseDocument.getProperties(); - - Map> triggersMap; - if (properties.containsKey(triggersKey)) { - java.lang.Object object = properties.get(triggersKey); - triggersMap = JsonDeserializer.triggers(object); - } else { - triggersMap = new HashMap>(); - } - - Map triggers; - if (triggersMap.containsKey(trigger.getType())) { - triggers = triggersMap.get(trigger.getType()); - } else { - triggers = new HashMap(); - } - - if (triggers.containsKey(key)) { - throw new AlreadyTriggerException(); - } - - triggers.put(key, new Trigger(trigger)); - - triggersMap.put(trigger.getType(), triggers); - - properties.put(triggersKey, triggersMap); - - baseDocument.setProperties(properties); - return baseDocument; - } - - public static ObjectDocument delete(ObjectDocument baseDocument, Core.Trigger trigger) throws Exception { - Map properties = baseDocument.getProperties(); - java.lang.Object object = properties.get(triggersKey); - - // triggers map - Map> triggersMap = JsonDeserializer.triggers(object); - - // type map - Map triggers = triggersMap.get(trigger.getType()); - - String key = trigger.getFunctionType().getNamespace() + "/" + trigger.getFunctionType().getType(); - - if (!triggers.containsKey(key)) { - return baseDocument; - } - - triggers.remove(key); - - triggersMap.put(trigger.getType(), triggers); - - properties.put(triggersKey, triggersMap); - - baseDocument.setProperties(properties); - return baseDocument; - } -} diff --git a/src/main/java/org/listware/core/utils/ErrorContainer.java b/src/main/java/org/listware/core/utils/ErrorContainer.java new file mode 100644 index 0000000..af2e932 --- /dev/null +++ b/src/main/java/org/listware/core/utils/ErrorContainer.java @@ -0,0 +1,59 @@ +/* Copyright 2022 Listware */ + +package org.listware.core.utils; + +import java.util.ArrayList; +import java.util.List; + +import org.listware.io.functions.result.EgressReader; +import org.listware.sdk.Result; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class ErrorContainer { + @SuppressWarnings("unused") + private static final Logger LOG = LoggerFactory.getLogger(ErrorContainer.class); + + private List errors = new ArrayList(); + private Boolean complete = true; + + public ErrorContainer() { + // POJO + } + + public Boolean getComplete() { + return complete; + } + + public void setComplete(Boolean complete) { + this.complete = complete; + } + + public List getErrors() { + return errors; + } + + public void setErrors(List errors) { + this.errors = errors; + } + + public void append(String error) { + errors.add(error); + complete = false; + } + + public void appendAll(List errors) { + for (String error : errors) { + append(error); + } + } + + public Result.FunctionResult toFunctionResult(EgressReader.ReplyResult replyEgress) { + Result.FunctionResult.Builder functionResultBuilder = Result.FunctionResult.newBuilder(); + functionResultBuilder.setComplete(getComplete()); + functionResultBuilder.addAllErrors(getErrors()); + Result.FunctionResult functionResult = functionResultBuilder.setReplyEgress(replyEgress.toProto()).build(); + return functionResult; + } + +} diff --git a/src/main/java/org/listware/core/provider/utils/exceptions/AlreadyLinkException.java b/src/main/java/org/listware/core/utils/exceptions/AlreadyLinkException.java similarity index 83% rename from src/main/java/org/listware/core/provider/utils/exceptions/AlreadyLinkException.java rename to src/main/java/org/listware/core/utils/exceptions/AlreadyLinkException.java index d160f2b..0a4343e 100644 --- a/src/main/java/org/listware/core/provider/utils/exceptions/AlreadyLinkException.java +++ b/src/main/java/org/listware/core/utils/exceptions/AlreadyLinkException.java @@ -1,6 +1,6 @@ /* Copyright 2022 Listware */ -package org.listware.core.provider.utils.exceptions; +package org.listware.core.utils.exceptions; public class AlreadyLinkException extends Exception { private static final long serialVersionUID = 1L; diff --git a/src/main/java/org/listware/core/provider/utils/exceptions/AlreadyTriggerException.java b/src/main/java/org/listware/core/utils/exceptions/AlreadyTriggerException.java similarity index 80% rename from src/main/java/org/listware/core/provider/utils/exceptions/AlreadyTriggerException.java rename to src/main/java/org/listware/core/utils/exceptions/AlreadyTriggerException.java index 1828cf5..8b02f49 100644 --- a/src/main/java/org/listware/core/provider/utils/exceptions/AlreadyTriggerException.java +++ b/src/main/java/org/listware/core/utils/exceptions/AlreadyTriggerException.java @@ -1,6 +1,6 @@ /* Copyright 2022 Listware */ -package org.listware.core.provider.utils.exceptions; +package org.listware.core.utils.exceptions; public class AlreadyTriggerException extends Exception { private static final long serialVersionUID = 1L; diff --git a/src/main/java/org/listware/core/provider/utils/exceptions/NoLinkException.java b/src/main/java/org/listware/core/utils/exceptions/NoLinkException.java similarity index 63% rename from src/main/java/org/listware/core/provider/utils/exceptions/NoLinkException.java rename to src/main/java/org/listware/core/utils/exceptions/NoLinkException.java index 9718790..c0e129d 100644 --- a/src/main/java/org/listware/core/provider/utils/exceptions/NoLinkException.java +++ b/src/main/java/org/listware/core/utils/exceptions/NoLinkException.java @@ -1,6 +1,6 @@ /* Copyright 2022 Listware */ -package org.listware.core.provider.utils.exceptions; +package org.listware.core.utils.exceptions; public class NoLinkException extends Exception { private static final long serialVersionUID = 1L; @@ -8,4 +8,8 @@ public class NoLinkException extends Exception { public NoLinkException(String from, String name) { super(String.format("link %s -> %s not found: ", from, name)); } + + public NoLinkException(String from) { + super(String.format("link from %s not found: ", from)); + } } diff --git a/src/main/java/org/listware/core/provider/utils/exceptions/PayloadNotFoundException.java b/src/main/java/org/listware/core/utils/exceptions/PayloadNotFoundException.java similarity index 80% rename from src/main/java/org/listware/core/provider/utils/exceptions/PayloadNotFoundException.java rename to src/main/java/org/listware/core/utils/exceptions/PayloadNotFoundException.java index da40339..331ce46 100644 --- a/src/main/java/org/listware/core/provider/utils/exceptions/PayloadNotFoundException.java +++ b/src/main/java/org/listware/core/utils/exceptions/PayloadNotFoundException.java @@ -1,6 +1,6 @@ /* Copyright 2022 Listware */ -package org.listware.core.provider.utils.exceptions; +package org.listware.core.utils.exceptions; public class PayloadNotFoundException extends Exception { private static final long serialVersionUID = 1L; diff --git a/src/main/java/org/listware/core/provider/utils/exceptions/TriggerNotFoundException.java b/src/main/java/org/listware/core/utils/exceptions/TriggerNotFoundException.java similarity index 79% rename from src/main/java/org/listware/core/provider/utils/exceptions/TriggerNotFoundException.java rename to src/main/java/org/listware/core/utils/exceptions/TriggerNotFoundException.java index c463826..e510a08 100644 --- a/src/main/java/org/listware/core/provider/utils/exceptions/TriggerNotFoundException.java +++ b/src/main/java/org/listware/core/utils/exceptions/TriggerNotFoundException.java @@ -1,6 +1,6 @@ /* Copyright 2022 Listware */ -package org.listware.core.provider.utils.exceptions; +package org.listware.core.utils.exceptions; public class TriggerNotFoundException extends Exception { private static final long serialVersionUID = 1L; @@ -8,4 +8,4 @@ public class TriggerNotFoundException extends Exception { public TriggerNotFoundException() { super("trigger not found"); } -} \ No newline at end of file +} diff --git a/src/main/java/org/listware/core/provider/utils/exceptions/UnknownIdException.java b/src/main/java/org/listware/core/utils/exceptions/UnknownIdException.java similarity index 81% rename from src/main/java/org/listware/core/provider/utils/exceptions/UnknownIdException.java rename to src/main/java/org/listware/core/utils/exceptions/UnknownIdException.java index d2c5b8e..7189ad7 100644 --- a/src/main/java/org/listware/core/provider/utils/exceptions/UnknownIdException.java +++ b/src/main/java/org/listware/core/utils/exceptions/UnknownIdException.java @@ -1,6 +1,6 @@ /* Copyright 2022 Listware */ -package org.listware.core.provider.utils.exceptions; +package org.listware.core.utils.exceptions; public class UnknownIdException extends Exception { diff --git a/src/main/java/org/listware/core/provider/utils/exceptions/UnknownMethodException.java b/src/main/java/org/listware/core/utils/exceptions/UnknownMethodException.java similarity index 85% rename from src/main/java/org/listware/core/provider/utils/exceptions/UnknownMethodException.java rename to src/main/java/org/listware/core/utils/exceptions/UnknownMethodException.java index 36b40e7..8ee8ef6 100644 --- a/src/main/java/org/listware/core/provider/utils/exceptions/UnknownMethodException.java +++ b/src/main/java/org/listware/core/utils/exceptions/UnknownMethodException.java @@ -1,6 +1,6 @@ /* Copyright 2022 Listware */ -package org.listware.core.provider.utils.exceptions; +package org.listware.core.utils.exceptions; import org.listware.sdk.pbcmdb.Core;