Add link triggers

Move arangodb to other pakage
Add register function
This commit is contained in:
fg-admin 2023-01-25 11:53:59 +03:00
parent f7eda6f989
commit 9289858320
42 changed files with 2200 additions and 1247 deletions

View File

@ -2,7 +2,7 @@
<modelVersion>4.0.0</modelVersion> <modelVersion>4.0.0</modelVersion>
<groupId>org.listware</groupId> <groupId>org.listware</groupId>
<artifactId>core</artifactId> <artifactId>core</artifactId>
<version>1.0</version> <version>1.1</version>
<packaging>jar</packaging> <packaging>jar</packaging>
<properties> <properties>
@ -84,13 +84,13 @@
<dependency> <dependency>
<groupId>org.listware</groupId> <groupId>org.listware</groupId>
<artifactId>io</artifactId> <artifactId>io</artifactId>
<version>1.0</version> <version>1.1</version>
<scope>provided</scope> <scope>provided</scope>
</dependency> </dependency>
<dependency> <dependency>
<groupId>org.listware</groupId> <groupId>org.listware</groupId>
<artifactId>proto</artifactId> <artifactId>proto</artifactId>
<version>1.0</version> <version>1.1</version>
</dependency> </dependency>
</dependencies> </dependencies>

View File

@ -3,25 +3,26 @@
package org.listware.core; package org.listware.core;
import org.apache.flink.statefun.sdk.Context; import org.apache.flink.statefun.sdk.Context;
import org.apache.flink.statefun.sdk.FunctionType;
import org.apache.flink.statefun.sdk.reqreply.generated.TypedValue;
import org.listware.sdk.Functions; import org.listware.sdk.Functions;
import org.listware.io.utils.TypedValueDeserializer; import org.listware.core.cmdb.Cmdb.SystemKeys;
import org.listware.core.documents.ObjectDocument; import org.listware.core.documents.ObjectDocument;
import org.listware.core.provider.utils.Cmdb.Matcher;
import org.listware.core.provider.utils.Cmdb.SystemKeys;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
public class FunctionContext { public class FunctionContext {
@SuppressWarnings("unused")
private static final Logger LOG = LoggerFactory.getLogger(FunctionContext.class); private static final Logger LOG = LoggerFactory.getLogger(FunctionContext.class);
private Context flinkContext; private Context flinkContext;
private ObjectDocument document; private ObjectDocument document;
private Functions.FunctionContext functionContext; private Functions.FunctionContext functionContext;
private boolean isExecutedCallback = false;
private class Matcher {
public static final String UUID_V4_STRING = "[a-fA-F0-9]{8}-[a-fA-F0-9]{4}-4[a-fA-F0-9]{3}-[89abAB][a-fA-F0-9]{3}-[a-fA-F0-9]{12}";
public static final String NUMERIC_STRING = "\\d+";
}
public FunctionContext(Context context, ObjectDocument document, Functions.FunctionContext functionContext) { public FunctionContext(Context context, ObjectDocument document, Functions.FunctionContext functionContext) {
this.flinkContext = context; this.flinkContext = context;
@ -88,30 +89,4 @@ public class FunctionContext {
public boolean isType() { public boolean isType() {
return (!isRoot() && !isObjects() && !isTypes() && !isLink() && !isObject()); return (!isRoot() && !isObjects() && !isTypes() && !isLink() && !isObject());
} }
// You can execute callback only once, getCallback() to inherit func or
// Callback() after invoke
public void callback() throws Exception {
Functions.FunctionContext callback = getCallback();
if (callback != null) {
String namespace = callback.getFunctionType().getNamespace();
String type = callback.getFunctionType().getType();
FunctionType functionType = new FunctionType(namespace, type);
LOG.info("send: " + functionType + " id " + callback.getId());
TypedValue typedValue = TypedValueDeserializer.fromMessageLite(callback);
;
flinkContext.send(functionType, callback.getId(), typedValue);
}
}
public Functions.FunctionContext getCallback() {
if (functionContext.hasCallback() && !isExecutedCallback) {
isExecutedCallback = true;
return functionContext.getCallback();
}
return null;
}
} }

View File

@ -8,14 +8,16 @@ import com.google.auto.service.AutoService;
import org.apache.flink.statefun.sdk.spi.StatefulFunctionModule; import org.apache.flink.statefun.sdk.spi.StatefulFunctionModule;
import org.listware.core.provider.FunctionProvider; import org.listware.core.provider.FunctionProvider;
import org.listware.core.provider.functions.Link;
import org.listware.core.provider.functions.Log; import org.listware.core.provider.functions.Log;
import org.listware.core.provider.functions.Object;
import org.listware.core.provider.functions.ObjectTrigger;
import org.listware.core.provider.functions.Register; import org.listware.core.provider.functions.Register;
import org.listware.core.provider.functions.Router; import org.listware.core.provider.functions.Router;
import org.listware.core.provider.functions.Type; import org.listware.core.provider.functions.link.AdvancedLink;
import org.listware.core.provider.functions.TypeTrigger; import org.listware.core.provider.functions.link.LinkTrigger;
import org.listware.core.provider.functions.object.Link;
import org.listware.core.provider.functions.object.Object;
import org.listware.core.provider.functions.object.ObjectTrigger;
import org.listware.core.provider.functions.object.Type;
import org.listware.core.provider.functions.object.TypeTrigger;
@AutoService(StatefulFunctionModule.class) @AutoService(StatefulFunctionModule.class)
public final class Module implements StatefulFunctionModule { public final class Module implements StatefulFunctionModule {
@ -31,5 +33,7 @@ public final class Module implements StatefulFunctionModule {
binder.bindFunctionProvider(Router.FUNCTION_TYPE, provider); binder.bindFunctionProvider(Router.FUNCTION_TYPE, provider);
binder.bindFunctionProvider(Log.FUNCTION_TYPE, provider); binder.bindFunctionProvider(Log.FUNCTION_TYPE, provider);
binder.bindFunctionProvider(Register.FUNCTION_TYPE, provider); binder.bindFunctionProvider(Register.FUNCTION_TYPE, provider);
binder.bindFunctionProvider(AdvancedLink.FUNCTION_TYPE, provider);
binder.bindFunctionProvider(LinkTrigger.FUNCTION_TYPE, provider);
} }
} }

View File

@ -0,0 +1,407 @@
/*
* Copyright 2022
* Listware
*/
package org.listware.core.cmdb;
import org.apache.flink.statefun.sdk.Context;
import org.apache.flink.statefun.sdk.reqreply.generated.TypedValue;
import org.listware.core.documents.LinkDocument;
import org.listware.core.documents.ObjectDocument;
import org.listware.core.provider.functions.link.LinkTrigger;
import org.listware.core.provider.functions.object.ObjectTrigger;
import org.listware.core.utils.exceptions.AlreadyLinkException;
import org.listware.core.utils.exceptions.NoLinkException;
import org.listware.core.utils.exceptions.UnknownIdException;
import org.listware.io.grpc.FinderClient;
import org.listware.io.grpc.QDSLClient;
import org.listware.io.utils.TypedValueDeserializer;
import org.listware.sdk.Functions;
import org.listware.sdk.pbcmdb.Core;
import org.listware.sdk.pbcmdb.pbfinder.Finder;
import org.listware.sdk.pbcmdb.pbqdsl.QDSL;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.protobuf.ByteString;
public class Cmdb {
@SuppressWarnings("unused")
private static final Logger LOG = LoggerFactory.getLogger(Cmdb.class);
// Constant system cmdb keys
public class SystemKeys {
public static final String ROOT = "root";
public static final String OBJECTS = "objects";
public static final String TYPES = "types";
}
// Collection names
public class Collections {
public static final String SYSTEM = "system";
public static final String TYPES = "types";
public static final String OBJECTS = "objects";
public static final String LINKS = "links";
}
// Link entries
public class LinkTypes {
public static final String TYPE = "type";
public static final String SYSTEM = "system";
}
private ObjectClient objectClient = new ObjectClient();
public LinkClient linkClient = new LinkClient();
public QDSLClient qdslClient = new QDSLClient();
public FinderClient finderClient = new FinderClient();
public void shutdown() throws InterruptedException {
objectClient.shutdown();
linkClient.shutdown();
qdslClient.shutdown();
finderClient.shutdown();
}
// R same for types/objects/system
public ObjectDocument readDocument(String id) throws Exception {
return objectClient.readDocument(id);
}
private ObjectDocument updateDocument(ObjectDocument document) throws Exception {
document = objectClient.updateDocument(document.getId(), document.serialize());
LOG.info("updated " + document.getId());
return document;
}
// D same for types/objects/system
private void removeDocument(ObjectDocument document) throws Exception {
objectClient.removeDocument(document.getId());
LOG.info("deleted " + document.getId());
}
// R links
public LinkDocument readLinkDocument(String id) throws Exception {
return linkClient.readDocument(id);
}
public LinkDocument readLinkDocument(String from, String name) throws Exception {
Finder.Response response = finderClient.from(from, name);
if (response.getLinksCount() == 0) {
throw new NoLinkException(from, name);
}
return LinkDocument.deserialize(response.getLinks(0).getPayload());
}
// do not duplicate link with name
public void checkFrom(ObjectDocument parent, String name) throws Exception {
Finder.Response response = finderClient.from(parent.getId(), name);
if (response.getLinksCount() > 0) {
throw new AlreadyLinkException(parent.getId(), name);
}
}
public LinkDocument updateLinkDocument(LinkDocument document) throws Exception {
document = linkClient.updateDocument(document.getId(), document.serialize());
LOG.info("updated " + document.getId());
return document;
}
// D links
public void removeDocument(LinkDocument document) throws Exception {
linkClient.removeDocument(document.getId());
LOG.info("deleted " + document.getId());
}
/*******************************************************************************************/
// C for SYSTEM
public ObjectDocument createSystem(ObjectDocument document) throws Exception {
document = objectClient.createDocument(Collections.SYSTEM, document.serialize());
LOG.info("created system " + document.getId());
return document;
}
public ObjectDocument createSystem(ObjectDocument parent, ObjectDocument document) throws Exception {
document = objectClient.createDocument(Collections.SYSTEM, document.serialize());
LOG.info("created system " + document.getId());
// link root -> object
createLink(parent, document, LinkTypes.SYSTEM, document.getKey());
return document;
}
/*******************************************************************************************/
// types C
public ObjectDocument createType(ObjectDocument document) throws Exception {
document = objectClient.createDocument(Collections.TYPES, document.serialize());
ObjectDocument types = readDocument("system/types");
// link from types -> type
createLink(types, document, LinkTypes.TYPE, document.getKey());
LOG.info("created type " + document.getId());
return document;
}
public ObjectDocument updateType(Context context, ObjectDocument document) throws Exception {
document = updateDocument(document);
return document;
}
public void removeType(Context context, ObjectDocument document) throws Exception {
removeDocument(document);
}
/*******************************************************************************************/
// only from type
public ObjectDocument createObject(ObjectDocument type, ObjectDocument document) throws Exception {
ObjectDocument objects = readDocument("system/objects");
document = objectClient.createDocument(Collections.OBJECTS, document.serialize());
// link type -> object ($uuid)
createLink(type, document, type.getKey(), document.getKey());
// link objects -> object ($uuid)
createLink(objects, document, type.getKey(), document.getKey());
LOG.info("created object " + document.getId());
return document;
}
public ObjectDocument createObject(Context context, ObjectDocument type, ObjectDocument document) throws Exception {
document = createObject(type, document);
Functions.FunctionContext pbFunctionContext = ObjectTrigger.Trigger(document.getId(), Core.Method.CREATE);
TypedValue typedValue = TypedValueDeserializer.fromMessageLite(pbFunctionContext);
context.send(ObjectTrigger.FUNCTION_TYPE, document.getId(), typedValue);
return document;
}
// with parent
public ObjectDocument createObject(ObjectDocument type, ObjectDocument parent, ObjectDocument document, String name)
throws Exception {
checkFrom(parent, name);
document = createObject(type, document);
// link parent -> object ($uuid)
createLink(parent, document, type.getKey(), name);
return document;
}
public ObjectDocument createObject(Context context, ObjectDocument type, ObjectDocument parent,
ObjectDocument document, String name) throws Exception {
checkFrom(parent, name);
document = createObject(context, type, document);
// link parent -> object ($uuid)
createLink(parent, document, type.getKey(), name);
return document;
}
public ObjectDocument updateObject(Context context, ObjectDocument document) throws Exception {
document = updateDocument(document);
Functions.FunctionContext pbFunctionContext = ObjectTrigger.Trigger(document.getId(), Core.Method.UPDATE);
TypedValue typedValue = TypedValueDeserializer.fromMessageLite(pbFunctionContext);
context.send(ObjectTrigger.FUNCTION_TYPE, document.getId(), typedValue);
return document;
}
public void removeObject(Context context, ObjectDocument document) throws Exception {
removeDocument(document);
// move trigger to link type -> objects
// Functions.FunctionContext pbFunctionContext = ObjectTrigger.Trigger(document.getId(), Core.Method.DELETE);
//
// TypedValue typedValue = TypedValueDeserializer.fromMessageLite(pbFunctionContext);
//
// context.send(ObjectTrigger.FUNCTION_TYPE, document.getId(), typedValue);
}
// Links
public LinkDocument createLink(ObjectDocument parent, ObjectDocument document, String type, String name)
throws Exception {
checkFrom(parent, name);
LinkDocument link = new LinkDocument(parent.getId(), document.getId(), type, name);
link = linkClient.createDocument(Collections.LINKS, link.serialize());
LOG.info("created link" + link.getId());
return link;
}
public LinkDocument createLink(ObjectDocument parent, ObjectDocument document, String type, String name,
ByteString payload) throws Exception {
checkFrom(parent, name);
LinkDocument link = new LinkDocument(parent.getId(), document.getId(), type, name);
link.replaceProperties(payload);
link = linkClient.createDocument(Collections.LINKS, link.serialize());
LOG.info("created link" + link.getId());
return link;
}
public LinkDocument createLink(Context context, ObjectDocument parent, ObjectDocument document, String type,
String name, ByteString payload) throws Exception {
LinkDocument link = createLink(parent, document, type, name, payload);
Functions.FunctionContext pbFunctionContext = LinkTrigger.Trigger(link.getId(), Core.Method.CREATE);
TypedValue typedValue = TypedValueDeserializer.fromMessageLite(pbFunctionContext);
context.send(LinkTrigger.FUNCTION_TYPE, link.getId(), typedValue);
LOG.info("created link" + link.getId());
return link;
}
public LinkDocument updateLink(Context context, LinkDocument document) throws Exception {
document = updateLinkDocument(document);
Functions.FunctionContext pbFunctionContext = LinkTrigger.Trigger(document.getId(), Core.Method.UPDATE);
TypedValue typedValue = TypedValueDeserializer.fromMessageLite(pbFunctionContext);
context.send(LinkTrigger.FUNCTION_TYPE, document.getId(), typedValue);
return document;
}
public void bootstrap() throws Exception {
ObjectDocument root = null;
try {
root = readDocument("system/root");
} catch (Exception ex) {
LOG.error(ex.getLocalizedMessage());
root = new ObjectDocument(SystemKeys.ROOT);
root = createSystem(root);
}
ObjectDocument objects = null;
try {
objects = readDocument("system/objects");
} catch (Exception ex) {
LOG.error(ex.getLocalizedMessage());
objects = new ObjectDocument(SystemKeys.OBJECTS);
objects = createSystem(root, objects);
}
ObjectDocument types = null;
try {
types = readDocument("system/types");
} catch (Exception ex) {
LOG.error(ex.getLocalizedMessage());
types = new ObjectDocument(SystemKeys.TYPES);
types = createSystem(root, types);
}
ObjectDocument functionContainer = null;
try {
functionContainer = readDocument("types/function-container");
} catch (Exception ex) {
LOG.error(ex.getLocalizedMessage());
functionContainer = new ObjectDocument("function-container");
functionContainer = createType(functionContainer);
}
ObjectDocument function = null;
try {
function = readDocument("types/function");
} catch (Exception ex) {
LOG.error(ex.getLocalizedMessage());
function = new ObjectDocument("function");
function = createType(function);
}
QDSL.Options options = QDSL.Options.newBuilder().build();
QDSL.Elements elements = qdslClient.qdsl("functions.root", options);
ObjectDocument functions = null;
if (elements.getElementsCount() == 0) {
functions = new ObjectDocument();
functions = createObject(function, root, functions, "functions");
} else {
functions = readDocument(elements.getElements(0).getId());
}
elements = qdslClient.qdsl("system.functions.root", options);
ObjectDocument system = null;
if (elements.getElementsCount() == 0) {
system = new ObjectDocument();
system = createObject(functionContainer, functions, system, "system");
} else {
system = readDocument(elements.getElements(0).getId());
}
elements = qdslClient.qdsl("types.system.functions.root", options);
ObjectDocument typesFunction = null;
if (elements.getElementsCount() == 0) {
typesFunction = new ObjectDocument();
typesFunction = createObject(function, system, typesFunction, "types");
} else {
typesFunction = readDocument(elements.getElements(0).getId());
}
elements = qdslClient.qdsl("objects.system.functions.root", options);
ObjectDocument objectsFunction = null;
if (elements.getElementsCount() == 0) {
objectsFunction = new ObjectDocument();
objectsFunction = createObject(function, system, objectsFunction, "objects");
} else {
objectsFunction = readDocument(elements.getElements(0).getId());
}
elements = qdslClient.qdsl("links.system.functions.root", options);
ObjectDocument linksFunction = null;
if (elements.getElementsCount() == 0) {
linksFunction = new ObjectDocument();
linksFunction = createObject(function, system, linksFunction, "links");
} else {
linksFunction = readDocument(elements.getElements(0).getId());
}
}
public String getTypeId(String id) throws Exception {
QDSL.Options options = QDSL.Options.newBuilder().setType(true).build();
String query = String.format("*[?@._id == '%s'?].objects", id);
QDSL.Elements elements = qdslClient.qdsl(query, options);
for (QDSL.Element element : elements.getElementsList()) {
return "types/" + element.getType();
}
throw new UnknownIdException(id);
}
}

View File

@ -0,0 +1,76 @@
/*
* Copyright 2022
* Listware
*/
package org.listware.core.cmdb;
import org.listware.core.documents.LinkDocument;
import org.listware.core.utils.exceptions.PayloadNotFoundException;
import org.listware.core.utils.exceptions.UnknownIdException;
import org.listware.io.grpc.EdgeClient;
import org.listware.sdk.pbcmdb.Core;
import com.google.protobuf.ByteString;
public class LinkClient extends EdgeClient {
public LinkDocument createDocument(String collection, ByteString payload) throws Exception {
Core.Response resp = create(collection, payload);
return LinkDocument.deserialize(resp.getPayload());
}
public LinkDocument readDocument(String id) throws Exception {
Parser parser = new Parser(id);
Core.Response resp = read(parser.getCollection(), parser.getKey());
if (resp.getPayload().isEmpty()) {
throw new PayloadNotFoundException();
}
return LinkDocument.deserialize(resp.getPayload());
}
public LinkDocument updateDocument(String id, ByteString payload) throws Exception {
Parser parser = new Parser(id);
Core.Response resp = update(parser.getCollection(), parser.getKey(), payload);
if (resp.getPayload().isEmpty()) {
throw new PayloadNotFoundException();
}
return LinkDocument.deserialize(resp.getPayload());
}
public void removeDocument(String id) throws Exception {
Parser parser = new Parser(id);
remove(parser.getCollection(), parser.getKey());
}
class Parser {
private String collection;
private String key;
public Parser(String id) throws Exception {
String[] separated = id.split("\\/");
if (separated.length < 2) {
throw new UnknownIdException(id);
}
this.collection = separated[0];
this.key = separated[1];
}
public String getCollection() {
return collection;
}
public void setCollection(String collection) {
this.collection = collection;
}
public String getKey() {
return key;
}
public void setKey(String key) {
this.key = key;
}
}
}

View File

@ -0,0 +1,81 @@
/*
* Copyright 2022
* Listware
*/
package org.listware.core.cmdb;
import org.listware.core.documents.ObjectDocument;
import org.listware.core.utils.exceptions.PayloadNotFoundException;
import org.listware.core.utils.exceptions.UnknownIdException;
import org.listware.io.grpc.VertexClient;
import org.listware.sdk.pbcmdb.Core;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.protobuf.ByteString;
public class ObjectClient extends VertexClient {
@SuppressWarnings("unused")
private static final Logger LOG = LoggerFactory.getLogger(ObjectClient.class);
public ObjectDocument createDocument(String collection, ByteString payload) throws Exception {
Core.Response resp = create(collection, payload);
return ObjectDocument.deserialize(resp.getPayload());
}
public ObjectDocument readDocument(String id) throws Exception {
Parser parser = new Parser(id);
Core.Response resp = read(parser.getCollection(), parser.getKey());
if (resp.getPayload().isEmpty()) {
throw new PayloadNotFoundException();
}
return ObjectDocument.deserialize(resp.getPayload());
}
public ObjectDocument updateDocument(String id, ByteString payload) throws Exception {
Parser parser = new Parser(id);
Core.Response resp = update(parser.getCollection(), parser.getKey(), payload);
if (resp.getPayload().isEmpty()) {
throw new PayloadNotFoundException();
}
return ObjectDocument.deserialize(resp.getPayload());
}
public void removeDocument(String id) throws Exception {
Parser parser = new Parser(id);
remove(parser.getCollection(), parser.getKey());
}
class Parser {
private String collection;
private String key;
public Parser(String id) throws Exception {
String[] separated = id.split("\\/");
if (separated.length < 2) {
throw new UnknownIdException(id);
}
this.collection = separated[0];
this.key = separated[1];
}
public String getCollection() {
return collection;
}
public void setCollection(String collection) {
this.collection = collection;
}
public String getKey() {
return key;
}
public void setKey(String key) {
this.key = key;
}
}
}

View File

@ -0,0 +1,107 @@
/* Copyright 2022 Listware */
package org.listware.core.cmdb;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import org.listware.core.provider.functions.Register;
import org.listware.sdk.pbcmdb.Core;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class RegisterMessage {
@SuppressWarnings("unused")
private static final Logger LOG = LoggerFactory.getLogger(Register.class);
private List<byte[]> types = new ArrayList<>();
private List<byte[]> objects = new ArrayList<>();;
private List<byte[]> links = new ArrayList<>();;
public RegisterMessage() {
// POJO
}
public RegisterMessage(Core.RegisterMessage registerMessage) {
registerMessage.getTypeMessagesList().forEach(msg -> types.add(msg.toByteArray()));
registerMessage.getObjectMessagesList().forEach(msg -> objects.add(msg.toByteArray()));
registerMessage.getLinkMessagesList().forEach(msg -> links.add(msg.toByteArray()));
}
public List<byte[]> getTypes() {
return types;
}
public void setTypes(List<byte[]> types) {
this.types = types;
}
public List<byte[]> getObjects() {
return objects;
}
public void setObjects(List<byte[]> objects) {
this.objects = objects;
}
public List<byte[]> getLinks() {
return links;
}
public void setLinks(List<byte[]> links) {
this.links = links;
}
public List<Core.RegisterTypeMessage> listTypes() throws Exception {
List<Core.RegisterTypeMessage> registerMessages = new ArrayList<>();
Iterator<byte[]> iterator = types.iterator();
while (iterator.hasNext()) {
byte[] data = iterator.next();
Core.RegisterTypeMessage registerMessage = Core.RegisterTypeMessage.parseFrom(data);
registerMessages.add(registerMessage);
iterator.remove();
if (!registerMessage.getAsync()) {
return registerMessages;
}
}
return registerMessages;
}
public List<Core.RegisterObjectMessage> listObjects() throws Exception {
List<Core.RegisterObjectMessage> registerMessages = new ArrayList<>();
Iterator<byte[]> iterator = objects.iterator();
while (iterator.hasNext()) {
byte[] data = iterator.next();
Core.RegisterObjectMessage registerMessage = Core.RegisterObjectMessage.parseFrom(data);
registerMessages.add(registerMessage);
iterator.remove();
if (!registerMessage.getAsync()) {
return registerMessages;
}
}
return registerMessages;
}
public List<Core.RegisterLinkMessage> listLinks() throws Exception {
List<Core.RegisterLinkMessage> registerMessages = new ArrayList<>();
Iterator<byte[]> iterator = links.iterator();
while (iterator.hasNext()) {
byte[] data = iterator.next();
Core.RegisterLinkMessage registerMessage = Core.RegisterLinkMessage.parseFrom(data);
registerMessages.add(registerMessage);
iterator.remove();
if (!registerMessage.getAsync()) {
return registerMessages;
}
}
return registerMessages;
}
}

View File

@ -0,0 +1,209 @@
/* Copyright 2022 Listware */
package org.listware.core.cmdb;
import java.util.HashMap;
import java.util.Map;
import org.listware.core.documents.LinkDocument;
import org.listware.core.documents.ObjectDocument;
import org.listware.core.utils.exceptions.AlreadyTriggerException;
import org.listware.core.utils.exceptions.TriggerNotFoundException;
import org.listware.sdk.pbcmdb.Core;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
public class Trigger {
@SuppressWarnings("unused")
private static final Logger LOG = LoggerFactory.getLogger(Trigger.class);
private static String triggersKey = "triggers";
public static final String CREATE = "create";
public static final String UPDATE = "update";
public static final String DELETE = "delete";
public Trigger() {
// POJO
}
public Trigger(Core.Trigger trigger) {
this();
this.namespace = trigger.getFunctionType().getNamespace();
this.type = trigger.getFunctionType().getType();
}
@JsonProperty("namespace")
private String namespace;
@JsonProperty("type")
private String type;
/**
*
* @return The namespace
*/
@JsonProperty("namespace")
public String getNamespace() {
return namespace;
}
/**
*
* @return The type
*/
@JsonProperty("type")
public String getType() {
return type;
}
public static Map<String, Trigger> getByType(ObjectDocument baseDocument, String type) throws Exception {
Map<String, java.lang.Object> properties = baseDocument.getProperties();
if (!properties.containsKey(triggersKey)) {
throw new TriggerNotFoundException();
}
java.lang.Object object = properties.get(triggersKey);
Map<String, Map<String, Trigger>> triggersMap = deserialize(object);
if (!triggersMap.containsKey(type)) {
throw new TriggerNotFoundException();
}
return triggersMap.get(type);
}
public static ObjectDocument add(ObjectDocument document, Core.Trigger trigger) throws Exception {
String key = trigger.getFunctionType().getNamespace() + "/" + trigger.getFunctionType().getType();
Map<String, java.lang.Object> properties = document.getProperties();
Map<String, Map<String, Trigger>> triggersMap;
if (properties.containsKey(triggersKey)) {
java.lang.Object object = properties.get(triggersKey);
triggersMap = deserialize(object);
} else {
triggersMap = new HashMap<String, Map<String, Trigger>>();
}
Map<String, Trigger> triggers;
if (triggersMap.containsKey(trigger.getType())) {
triggers = triggersMap.get(trigger.getType());
} else {
triggers = new HashMap<String, Trigger>();
}
if (triggers.containsKey(key)) {
throw new AlreadyTriggerException();
}
triggers.put(key, new Trigger(trigger));
triggersMap.put(trigger.getType(), triggers);
properties.put(triggersKey, triggersMap);
document.updateProperties(properties);
return document;
}
public static LinkDocument add(LinkDocument document, Core.Trigger trigger) throws Exception {
String key = trigger.getFunctionType().getNamespace() + "/" + trigger.getFunctionType().getType();
Map<String, java.lang.Object> properties = document.getProperties();
Map<String, Map<String, Trigger>> triggersMap;
if (properties.containsKey(triggersKey)) {
java.lang.Object object = properties.get(triggersKey);
triggersMap = deserialize(object);
} else {
triggersMap = new HashMap<String, Map<String, Trigger>>();
}
Map<String, Trigger> triggers;
if (triggersMap.containsKey(trigger.getType())) {
triggers = triggersMap.get(trigger.getType());
} else {
triggers = new HashMap<String, Trigger>();
}
if (triggers.containsKey(key)) {
throw new AlreadyTriggerException();
}
triggers.put(key, new Trigger(trigger));
triggersMap.put(trigger.getType(), triggers);
properties.put(triggersKey, triggersMap);
document.updateProperties(properties);
return document;
}
public static ObjectDocument delete(ObjectDocument document, Core.Trigger trigger) throws Exception {
Map<String, java.lang.Object> properties = document.getProperties();
java.lang.Object object = properties.get(triggersKey);
// triggers map
Map<String, Map<String, Trigger>> triggersMap = deserialize(object);
// type map
Map<String, Trigger> triggers = triggersMap.get(trigger.getType());
String key = trigger.getFunctionType().getNamespace() + "/" + trigger.getFunctionType().getType();
if (!triggers.containsKey(key)) {
return document;
}
triggers.remove(key);
triggersMap.put(trigger.getType(), triggers);
properties.put(triggersKey, triggersMap);
document.updateProperties(properties);
return document;
}
public static LinkDocument delete(LinkDocument document, Core.Trigger trigger) throws Exception {
Map<String, java.lang.Object> properties = document.getProperties();
java.lang.Object object = properties.get(triggersKey);
// triggers map
Map<String, Map<String, Trigger>> triggersMap = deserialize(object);
// type map
Map<String, Trigger> triggers = triggersMap.get(trigger.getType());
String key = trigger.getFunctionType().getNamespace() + "/" + trigger.getFunctionType().getType();
if (!triggers.containsKey(key)) {
return document;
}
triggers.remove(key);
triggersMap.put(trigger.getType(), triggers);
properties.put(triggersKey, triggersMap);
document.updateProperties(properties);
return document;
}
public static Map<String, Map<String, Trigger>> deserialize(Object from) throws Exception {
ObjectMapper mapper = new ObjectMapper();
mapper.configure(SerializationFeature.FAIL_ON_SELF_REFERENCES, false);
TypeReference<Map<String, Map<String, Trigger>>> ref = new TypeReference<Map<String, Map<String, Trigger>>>() {
};
return mapper.convertValue(from, ref);
}
}

View File

@ -7,7 +7,7 @@ import java.util.Map;
import org.listware.core.documents.entity.DocumentFields; import org.listware.core.documents.entity.DocumentFields;
import org.listware.core.documents.entity.Name; import org.listware.core.documents.entity.Name;
import org.listware.core.documents.entity.Type; import org.listware.core.documents.entity.Type;
import org.listware.core.provider.utils.exceptions.PayloadNotFoundException; import org.listware.core.utils.exceptions.PayloadNotFoundException;
import com.arangodb.entity.From; import com.arangodb.entity.From;
import com.arangodb.entity.To; import com.arangodb.entity.To;
@ -47,30 +47,15 @@ public class LinkDocument extends ObjectDocument {
this.to = to; this.to = to;
} }
public LinkDocument(final String from, final String to, final String name, final String type) { public LinkDocument(final String from, final String to, final String type, final String name) {
this(from, to); this(from, to);
this.name = name; this.name = name;
this.type = type; this.type = type;
} }
public LinkDocument(final Map<String, Object> properties) { public LinkDocument(final Map<String, Object> properties) {
super(properties); super();
final Object tmpFrom = properties.remove(DocumentFields.FROM); replaceProperties(properties);
if (tmpFrom != null) {
from = tmpFrom.toString();
}
final Object tmpTo = properties.remove(DocumentFields.TO);
if (tmpTo != null) {
to = tmpTo.toString();
}
final Object tmpName = properties.remove(DocumentFields.NAME);
if (tmpName != null) {
name = tmpName.toString();
}
final Object tmpType = properties.remove(DocumentFields.TYPE);
if (tmpType != null) {
type = tmpType.toString();
}
} }
public String getFrom() { public String getFrom() {
@ -101,6 +86,27 @@ public class LinkDocument extends ObjectDocument {
return type; return type;
} }
@Override
public void replaceProperties(final Map<String, Object> properties) {
final Object tmpFrom = properties.remove(DocumentFields.FROM);
if (tmpFrom != null) {
from = tmpFrom.toString();
}
final Object tmpTo = properties.remove(DocumentFields.TO);
if (tmpTo != null) {
to = tmpTo.toString();
}
final Object tmpName = properties.remove(DocumentFields.NAME);
if (tmpName != null) {
name = tmpName.toString();
}
final Object tmpType = properties.remove(DocumentFields.TYPE);
if (tmpType != null) {
type = tmpType.toString();
}
super.replaceProperties(properties);
}
public void setType(String type) { public void setType(String type) {
this.type = type; this.type = type;
} }
@ -108,7 +114,7 @@ public class LinkDocument extends ObjectDocument {
@Override @Override
public String toString() { public String toString() {
return "BaseDocument [documentRevision=" + revision + ", documentHandle=" + id + ", documentKey=" + key return "BaseDocument [documentRevision=" + revision + ", documentHandle=" + id + ", documentKey=" + key
+ ", from=" + from + ", to=" + to + ", properties=" + properties + "]"; + ", from=" + from + ", to=" + to + ", properties=" + this.getProperties() + "]";
} }
@Override @Override

View File

@ -8,11 +8,16 @@ import java.util.Map;
import org.listware.core.documents.entity.DocumentFields; import org.listware.core.documents.entity.DocumentFields;
import org.listware.core.documents.entity.Meta; import org.listware.core.documents.entity.Meta;
import org.listware.core.provider.utils.exceptions.PayloadNotFoundException; import org.listware.core.utils.exceptions.PayloadNotFoundException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.arangodb.entity.Id; import com.arangodb.entity.Id;
import com.arangodb.entity.Key; import com.arangodb.entity.Key;
import com.arangodb.entity.Rev; import com.arangodb.entity.Rev;
import com.fasterxml.jackson.annotation.JsonAnyGetter;
import com.fasterxml.jackson.annotation.JsonAnySetter;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
@ -20,6 +25,9 @@ import com.github.fge.jackson.JsonLoader;
import com.google.protobuf.ByteString; import com.google.protobuf.ByteString;
public class ObjectDocument implements Serializable { public class ObjectDocument implements Serializable {
@SuppressWarnings("unused")
private static final Logger LOG = LoggerFactory.getLogger(ObjectDocument.class);
private static final long serialVersionUID = -1824742667228719116L; private static final long serialVersionUID = -1824742667228719116L;
@Id @Id
@ -29,9 +37,9 @@ public class ObjectDocument implements Serializable {
@Rev @Rev
protected String revision; protected String revision;
@Meta @Meta
protected MetaDocument meta; private MetaDocument meta;
@JsonIgnore
protected Map<String, Object> properties; private Map<String, Object> properties;
public ObjectDocument() { public ObjectDocument() {
super(); super();
@ -44,26 +52,9 @@ public class ObjectDocument implements Serializable {
this.key = key; this.key = key;
} }
@SuppressWarnings("unchecked")
public ObjectDocument(final Map<String, Object> properties) { public ObjectDocument(final Map<String, Object> properties) {
this(); this();
final Object tmpId = properties.remove(DocumentFields.ID); replaceProperties(properties);
if (tmpId != null) {
id = tmpId.toString();
}
final Object tmpKey = properties.remove(DocumentFields.KEY);
if (tmpKey != null) {
key = tmpKey.toString();
}
final Object tmpRev = properties.remove(DocumentFields.REV);
if (tmpRev != null) {
revision = tmpRev.toString();
}
final Object tmpMeta = properties.remove(DocumentFields.META);
if (tmpMeta != null) {
meta = new MetaDocument((Map<String, Object>) tmpMeta);
}
this.properties = properties;
} }
public String getId() { public String getId() {
@ -98,21 +89,63 @@ public class ObjectDocument implements Serializable {
this.meta = meta; this.meta = meta;
} }
@JsonAnyGetter
public Map<String, Object> getProperties() { public Map<String, Object> getProperties() {
return properties; return properties;
} }
@JsonAnySetter
public void setProperties(final Map<String, Object> properties) { public void setProperties(final Map<String, Object> properties) {
this.properties = properties; this.properties = properties;
} }
@SuppressWarnings("unchecked")
public void replaceProperties(final Map<String, Object> properties) {
final Object tmpId = properties.remove(DocumentFields.ID);
if (tmpId != null) {
id = tmpId.toString();
}
final Object tmpKey = properties.remove(DocumentFields.KEY);
if (tmpKey != null) {
key = tmpKey.toString();
}
final Object tmpRev = properties.remove(DocumentFields.REV);
if (tmpRev != null) {
revision = tmpRev.toString();
}
final Object tmpMeta = properties.remove(DocumentFields.META);
if (tmpMeta != null) {
meta = new MetaDocument((Map<String, Object>) tmpMeta);
}
this.properties = properties;
meta.update();
}
public void replaceProperties(ByteString payload) throws Exception {
if (payload.isEmpty()) {
throw new PayloadNotFoundException();
}
JsonNode jsonNode = JsonLoader.fromString(payload.toStringUtf8());
ObjectMapper mapper = new ObjectMapper();
TypeReference<Map<String, Object>> ref = new TypeReference<Map<String, Object>>() {
};
Map<String, Object> values = mapper.convertValue(jsonNode, ref);
replaceProperties(values);
}
public void addAttribute(final String key, final Object value) { public void addAttribute(final String key, final Object value) {
properties.put(key, value); properties.put(key, value);
meta.update();
} }
public void updateAttribute(final String key, final Object value) { public void updateAttribute(final String key, final Object value) {
if (properties.containsKey(key)) { if (properties.containsKey(key)) {
properties.put(key, value); properties.put(key, value);
meta.update();
} }
} }
@ -120,6 +153,15 @@ public class ObjectDocument implements Serializable {
return properties.get(key); return properties.get(key);
} }
public void updateProperties(final Map<String, Object> properties) {
properties.remove(DocumentFields.ID);
properties.remove(DocumentFields.KEY);
properties.remove(DocumentFields.REV);
properties.remove(DocumentFields.META);
this.properties = properties;
meta.update();
}
@Override @Override
public String toString() { public String toString() {
return "BaseDocument [documentRevision=" + revision + ", documentHandle=" + id + ", documentKey=" + key return "BaseDocument [documentRevision=" + revision + ", documentHandle=" + id + ", documentKey=" + key
@ -184,22 +226,26 @@ public class ObjectDocument implements Serializable {
return revision.equals(other.revision); return revision.equals(other.revision);
} }
public void updateMeta() { public ByteString serialize() throws Exception {
meta.update(); ObjectMapper mapper = new ObjectMapper();
byte[] values = mapper.writeValueAsBytes(this);
return ByteString.copyFrom(values);
} }
public static ObjectDocument deserialize(ByteString payload) throws Exception { public static ObjectDocument deserialize(ByteString payload) throws Exception {
if (payload.isEmpty()) { if (payload.isEmpty()) {
throw new PayloadNotFoundException(); throw new PayloadNotFoundException();
} }
JsonNode jsonNode = JsonLoader.fromString(payload.toStringUtf8()); JsonNode jsonNode = JsonLoader.fromString(payload.toStringUtf8());
ObjectMapper mapper = new ObjectMapper(); ObjectMapper mapper = new ObjectMapper();
TypeReference<Map<String, Object>> ref = new TypeReference<Map<String, Object>>() { TypeReference<Map<String, Object>> ref = new TypeReference<Map<String, Object>>() {
}; };
Map<String, Object> values = mapper.convertValue(jsonNode, ref); Map<String, Object> values = mapper.convertValue(jsonNode, ref);
return new ObjectDocument(values); return new ObjectDocument(values);
} }
} }

View File

@ -12,6 +12,7 @@ public class DocumentFields {
public static final String REV = "_rev"; public static final String REV = "_rev";
public static final String FROM = "_from"; public static final String FROM = "_from";
public static final String TO = "_to"; public static final String TO = "_to";
public static final String NAME = "_name"; public static final String NAME = "_name";
public static final String TYPE = "_type"; public static final String TYPE = "_type";
public static final String META = "_meta"; public static final String META = "_meta";

View File

@ -2,106 +2,52 @@
package org.listware.core.provider; package org.listware.core.provider;
import java.util.Arrays;
import java.util.Collection;
import java.security.KeyManagementException;
import java.security.NoSuchAlgorithmException;
import java.security.cert.CertificateException;
import java.security.cert.X509Certificate;
import javax.net.ssl.HostnameVerifier;
import javax.net.ssl.HttpsURLConnection;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLSession;
import javax.net.ssl.TrustManager;
import javax.net.ssl.X509TrustManager;
import org.apache.flink.statefun.sdk.FunctionType; import org.apache.flink.statefun.sdk.FunctionType;
import org.apache.flink.statefun.sdk.StatefulFunction; import org.apache.flink.statefun.sdk.StatefulFunction;
import org.apache.flink.statefun.sdk.StatefulFunctionProvider; import org.apache.flink.statefun.sdk.StatefulFunctionProvider;
import org.listware.io.utils.Constants.Cmdb; import org.listware.core.cmdb.Cmdb;
import org.listware.io.utils.QDSLClient;
import org.listware.sdk.pbcmdb.pbqdsl.QDSL;
import org.listware.core.documents.LinkDocument;
import org.listware.core.documents.ObjectDocument;
import org.listware.core.provider.functions.Link;
import org.listware.core.provider.functions.Log; import org.listware.core.provider.functions.Log;
import org.listware.core.provider.functions.Object;
import org.listware.core.provider.functions.ObjectTrigger;
import org.listware.core.provider.functions.Register; import org.listware.core.provider.functions.Register;
import org.listware.core.provider.functions.Router; import org.listware.core.provider.functions.Router;
import org.listware.core.provider.functions.Type; import org.listware.core.provider.functions.link.AdvancedLink;
import org.listware.core.provider.functions.TypeTrigger; import org.listware.core.provider.functions.link.LinkTrigger;
import org.listware.core.provider.utils.Cmdb.Collections; import org.listware.core.provider.functions.object.Link;
import org.listware.core.provider.utils.Cmdb.LinkTypes; import org.listware.core.provider.functions.object.Object;
import org.listware.core.provider.utils.Cmdb.SystemKeys; import org.listware.core.provider.functions.object.ObjectTrigger;
import org.listware.core.provider.functions.object.Type;
import org.listware.core.provider.functions.object.TypeTrigger;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import com.arangodb.ArangoCollection;
import com.arangodb.ArangoDB;
import com.arangodb.ArangoDatabase;
import com.arangodb.ArangoGraph;
import com.arangodb.DbName;
import com.arangodb.entity.CollectionType;
import com.arangodb.entity.DocumentCreateEntity;
import com.arangodb.entity.EdgeDefinition;
import com.arangodb.entity.KeyType;
import com.arangodb.mapping.ArangoJack;
import com.arangodb.model.CollectionCreateOptions;
import com.arangodb.model.DocumentCreateOptions;
public class FunctionProvider implements StatefulFunctionProvider { public class FunctionProvider implements StatefulFunctionProvider {
@SuppressWarnings("unused") @SuppressWarnings("unused")
private static final Logger LOG = LoggerFactory.getLogger(FunctionProvider.class); private static final Logger LOG = LoggerFactory.getLogger(FunctionProvider.class);
// graph name
private static final String SYSTEM_GRAPH = "system";
// system function types
private static final String SYSTEM_TYPE = "system";
private static final String FUNCTION_CONTAINER_TYPE = "function-container";
private static final String FUNCTION_TYPE = "function";
private static final String FUNCTIONS_LINK_NAME = "functions";
private static final String SYSTEM_LINK_NAME = "system";
private static final String TYPES_FUNCTION_LINK_NAME = "types";
private static final String OBJECTS_FUNCTION_LINK_NAME = "objects";
private static final String LINKS_FUNCTION_LINK_NAME = "links";
private static final String ROUTER_FUNCTION_LINK_NAME = "router";
private static final DbName DB_NAME = DbName.of(Cmdb.DBNAME);
private QDSLClient client = new QDSLClient();
private Log log = new Log(); private Log log = new Log();
private Register register = new Register(); private Register register = new Register();
private Object object = new Object();
private ArangoGraph graph; private ObjectTrigger objectTrigger = new ObjectTrigger();
private Type type = new Type();
private Object object; private TypeTrigger typeTrigger = new TypeTrigger();
private ObjectTrigger objectTrigger; private Link link = new Link();
private Type type; private AdvancedLink advancedlink = new AdvancedLink();
private TypeTrigger typeTrigger; private LinkTrigger linkTrigger = new LinkTrigger();
private Link link; private Router router = new Router();
private Router qdsl;
private ArangoCollection system, objects, types, links;
public FunctionProvider() { public FunctionProvider() {
graph = bootstrap(); try {
Cmdb cmdb = new Cmdb();
qdsl = new Router(client); cmdb.bootstrap();
type = new Type(graph); cmdb.shutdown();
typeTrigger = new TypeTrigger(graph); } catch (Exception e) {
object = new Object(graph); LOG.error(e.getLocalizedMessage());
objectTrigger = new ObjectTrigger(graph); }
link = new Link(graph);
} }
@Override @Override
public StatefulFunction functionOfType(FunctionType functionType) { public StatefulFunction functionOfType(FunctionType functionType) {
if (functionType.equals(Router.FUNCTION_TYPE)) { if (functionType.equals(Router.FUNCTION_TYPE)) {
return qdsl; return router;
} }
if (functionType.equals(Type.FUNCTION_TYPE)) { if (functionType.equals(Type.FUNCTION_TYPE)) {
@ -130,174 +76,14 @@ public class FunctionProvider implements StatefulFunctionProvider {
return register; return register;
} }
if (functionType.equals(AdvancedLink.FUNCTION_TYPE)) {
return advancedlink;
}
if (functionType.equals(LinkTrigger.FUNCTION_TYPE)) {
return linkTrigger;
}
return null; return null;
} }
private ArangoGraph bootstrap() {
// TODO db and collections bootstrap
ArangoDB arango = new ArangoDB.Builder().host(Cmdb.ADDR, Cmdb.PORT)
// .useSsl(true).sslContext(sslContext)
.serializer(new ArangoJack()).user(Cmdb.USER).password(Cmdb.PASSWORD).build();
ArangoDatabase db = arango.db(DB_NAME);
if (!db.exists()) {
db.create();
}
system = db.collection(Collections.SYSTEM);
if (!system.exists()) {
// TODO set 'isSystem' for system
CollectionCreateOptions opts = new CollectionCreateOptions().isSystem(true);
system.create(opts);
}
types = db.collection(Collections.TYPES);
if (!types.exists()) {
types.create();
}
objects = db.collection(Collections.OBJECTS);
if (!objects.exists()) {
// TODO set 'uuid' for objects
CollectionCreateOptions opts = new CollectionCreateOptions().keyOptions(false, KeyType.uuid, null, null);
objects.create(opts);
}
links = db.collection(Collections.LINKS);
if (!links.exists()) {
// TODO set 'edge' for links
CollectionCreateOptions ops = new CollectionCreateOptions().type(CollectionType.EDGES);
links.create(ops);
}
// TODO create 'root'
if (!system.documentExists(SystemKeys.ROOT)) {
system.insertDocument(new ObjectDocument(SystemKeys.ROOT));
}
ObjectDocument root = system.getDocument(SystemKeys.ROOT, ObjectDocument.class);
// TODO create 'objects'
if (!system.documentExists(SystemKeys.OBJECTS)) {
insertSystem(root, SystemKeys.OBJECTS);
}
ObjectDocument objectsBaseDocument = system.getDocument(SystemKeys.OBJECTS, ObjectDocument.class);
// TODO create 'types'
if (!system.documentExists(SystemKeys.TYPES)) {
insertSystem(root, SystemKeys.TYPES);
}
ObjectDocument typesBaseDocument = system.getDocument(SystemKeys.TYPES, ObjectDocument.class);
// TODO create 'function-container'
if (!types.documentExists(FUNCTION_CONTAINER_TYPE)) {
insertType(typesBaseDocument, FUNCTION_CONTAINER_TYPE);
}
// TODO create 'function'
if (!types.documentExists(FUNCTION_TYPE)) {
insertType(typesBaseDocument, FUNCTION_TYPE);
}
QDSL.Options options = QDSL.Options.newBuilder().build();
// TODO create 'functions.root' from type 'function-container'
QDSL.Elements elements = client.qdsl("functions.root", options);
if (elements.getElementsCount() == 0) {
ObjectDocument typeBaseDocument = types.getDocument(FUNCTION_CONTAINER_TYPE, ObjectDocument.class);
ObjectDocument functionsBaseDocument = insertObject(typeBaseDocument, objectsBaseDocument, root,
FUNCTIONS_LINK_NAME);
ObjectDocument systemBaseDocument = insertObject(typeBaseDocument, objectsBaseDocument,
functionsBaseDocument, SYSTEM_LINK_NAME);
typeBaseDocument = types.getDocument(FUNCTION_TYPE, ObjectDocument.class);
insertObject(typeBaseDocument, objectsBaseDocument, systemBaseDocument, TYPES_FUNCTION_LINK_NAME);
insertObject(typeBaseDocument, objectsBaseDocument, systemBaseDocument, OBJECTS_FUNCTION_LINK_NAME);
insertObject(typeBaseDocument, objectsBaseDocument, systemBaseDocument, LINKS_FUNCTION_LINK_NAME);
insertObject(typeBaseDocument, objectsBaseDocument, systemBaseDocument, ROUTER_FUNCTION_LINK_NAME);
}
// TODO graph bootstrap
ArangoGraph graph = db.graph(SYSTEM_GRAPH);
if (!graph.exists()) {
// TODO one edge of 1 links collection
EdgeDefinition EdgeDefinition = new EdgeDefinition().collection(Collections.LINKS)
.from(Collections.SYSTEM, Collections.TYPES, Collections.OBJECTS)
.to(Collections.TYPES, Collections.OBJECTS);
Collection<EdgeDefinition> edgeDefinitions = Arrays.asList(EdgeDefinition);
db.createGraph(SYSTEM_GRAPH, edgeDefinitions);
}
return graph;
}
private ObjectDocument insertObject(ObjectDocument type, ObjectDocument object, ObjectDocument parent,
String name) {
// insert to 'objects'
DocumentCreateEntity<ObjectDocument> documentCreateEntity = objects.insertDocument(new ObjectDocument(),
new DocumentCreateOptions().returnNew(true));
String id = documentCreateEntity.getId();
String key = documentCreateEntity.getKey();
// Link type -> object ($uuid)
insertLink(type.getId(), id, key, type.getKey());
// Link objects -> object ($uuid)
insertLink(object.getId(), id, key, type.getKey());
// Link parent -> object ('name')
insertLink(parent.getId(), id, name, type.getKey());
return documentCreateEntity.getNew();
}
private void insertType(ObjectDocument typesBaseDocument, String key) {
DocumentCreateEntity<ObjectDocument> documentCreateEntity = types.insertDocument(new ObjectDocument(key));
insertLink(typesBaseDocument.getId(), documentCreateEntity.getId(), key, LinkTypes.TYPE);
}
private void insertSystem(ObjectDocument root, String key) {
DocumentCreateEntity<ObjectDocument> documentCreateEntity = system.insertDocument(new ObjectDocument(key));
insertLink(root.getId(), documentCreateEntity.getId(), key, SYSTEM_TYPE);
}
private void insertLink(String from, String to, String name, String type) {
LinkDocument baseEdgeDocument = new LinkDocument(from, to, name, type);
links.insertDocument(baseEdgeDocument);
}
SSLContext createSSLContext() throws NoSuchAlgorithmException, KeyManagementException {
final SSLContext sslContext = SSLContext.getInstance("TLS");
sslContext.init(null, new TrustManager[] { new X509TrustManager() {
@Override
public void checkClientTrusted(X509Certificate[] x509Certificates, String s) throws CertificateException {
}
@Override
public void checkServerTrusted(X509Certificate[] x509Certificates, String s) throws CertificateException {
}
@Override
public X509Certificate[] getAcceptedIssuers() {
return new X509Certificate[0];
}
} }, null);
HttpsURLConnection.setDefaultSSLSocketFactory(sslContext.getSocketFactory());
HttpsURLConnection.setDefaultHostnameVerifier(new HostnameVerifier() {
public boolean verify(String hostname, SSLSession session) {
return true;
}
});
return sslContext;
}
} }

View File

@ -1,174 +0,0 @@
/* Copyright 2022 Listware */
package org.listware.core.provider.functions;
import java.util.HashMap;
import java.util.Map;
import org.apache.flink.statefun.sdk.Context;
import com.arangodb.ArangoCursor;
import com.arangodb.ArangoEdgeCollection;
import com.arangodb.ArangoGraph;
import com.arangodb.ArangoVertexCollection;
import com.arangodb.entity.EdgeEntity;
import com.google.protobuf.ByteString;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.listware.io.utils.QDSLClient;
import org.listware.io.utils.VertexClient;
import org.listware.sdk.Functions;
import org.listware.sdk.pbcmdb.Core;
import org.listware.core.FunctionContext;
import org.listware.core.documents.LinkDocument;
import org.listware.core.documents.ObjectDocument;
import org.listware.core.provider.utils.Cmdb.Collections;
import org.listware.core.provider.utils.Cmdb.Matcher;
import org.listware.core.provider.utils.Cmdb.SystemKeys;
import org.listware.core.provider.utils.exceptions.NoLinkException;
/**
* Middleware for CUD functions
**
*/
public class Arango extends Base {
private static final Logger LOG = LoggerFactory.getLogger(Arango.class);
private VertexClient vertexClient = new VertexClient();
protected QDSLClient client = new QDSLClient();
protected ArangoGraph graph;
protected ArangoVertexCollection system, types, objects;
protected ArangoEdgeCollection links;
public Arango(ArangoGraph graph) {
this.graph = graph;
system = graph.vertexCollection(Collections.SYSTEM);
types = graph.vertexCollection(Collections.TYPES);
objects = graph.vertexCollection(Collections.OBJECTS);
links = graph.edgeCollection(Collections.LINKS);
}
@Override
public void invoke(Context context, Functions.FunctionContext pbFunctionContext) throws Exception {
String id = context.self().id();
ObjectDocument baseDocument = readContext(id);
FunctionContext functionContext = new FunctionContext(context, baseDocument, pbFunctionContext);
invoke(functionContext);
functionContext.callback();
}
public void invoke(FunctionContext functionContext) throws Exception {
}
protected ObjectDocument readContext(String key) throws Exception {
if (key.equals(SystemKeys.ROOT) || key.equals(SystemKeys.OBJECTS) || key.equals(SystemKeys.TYPES)) {
Core.Response resp = vertexClient.read(key, Collections.SYSTEM);
return ObjectDocument.deserialize(resp.getPayload());
} else if (key.matches(Matcher.UUID_V4_STRING)) {
Core.Response resp = vertexClient.read(key, Collections.OBJECTS);
return ObjectDocument.deserialize(resp.getPayload());
} else if (key.matches(Matcher.NUMERIC_STRING)) {
Core.Response resp = vertexClient.read(key, Collections.LINKS);
return LinkDocument.deserialize(resp.getPayload());
} else {
Core.Response resp = vertexClient.read(key, Collections.TYPES);
return ObjectDocument.deserialize(resp.getPayload());
}
}
// *[?$._from == "%s" && $._name == "%s"?]
protected LinkDocument findByFromName(String from, String name) throws NoLinkException {
// QDSL.Options options = QDSL.Options.newBuilder().setObject(true).build();
//
// String query = String.format("%s.objects", functionContext.Context().self().id());
//
// QDSL.Elements elements = client.qdsl(query, options);
//
String query = "FOR t IN links FILTER t._name == @name && t._from == @from RETURN t";
Map<String, java.lang.Object> bindVars = new HashMap<String, java.lang.Object>();
bindVars.put("name", name);
bindVars.put("from", from);
ArangoCursor<LinkDocument> cursor = graph.db().query(query, bindVars, LinkDocument.class);
if (cursor.hasNext()) {
return cursor.next();
}
throw new NoLinkException(from, name);
}
private void insertLink(LinkDocument linkDocument) throws Exception {
EdgeEntity edgeEntity = links.insertEdge(linkDocument);
LOG.info("created " + edgeEntity.getId());
// pbqdsl.Options options = pbqdsl.Options.newBuilder().setType(true).build();
//
// String query = String.format("*[?$._to == '%s'?].objects", linkDocument.getFrom());
//
// pbqdsl.Elements elements = client.qdsl(query, options);
//
// if (elements.getElementsCount() > 0) {
// String type = elements.getElements(0).getType();
// LOG.info("insert: from " + type);
// }
//
// query = String.format("*[?$._to == '%s'?].objects", linkDocument.getTo());
//
// elements = client.qdsl(query, options);
//
// if (elements.getElementsCount() > 0) {
// String type = elements.getElements(0).getType();
// LOG.info("insert: to " + type);
// }
}
protected void insertLink(String from, String to, String name, String type) throws Exception {
LinkDocument linkDocument = new LinkDocument(from, to, name, type);
insertLink(linkDocument);
}
protected void insertLink(String from, String to, String name, String type, ByteString payload) throws Exception {
LinkDocument linkDocument = LinkDocument.deserialize(payload);
linkDocument.setFrom(from);
linkDocument.setTo(to);
linkDocument.setName(name);
linkDocument.setType(type);
insertLink(linkDocument);
}
/**
* Exec function
**
* @param id string
* @param namespace string
* @param type string
* @param method Method
*/
public static Functions.FunctionContext Exec(String id, String namespace, String type, Core.Method method) {
Functions.FunctionType functionType = Functions.FunctionType.newBuilder().setNamespace(namespace).setType(type)
.build();
Core.TypeMessage typeMessage = Core.TypeMessage.newBuilder().setMethod(method).build();
Functions.FunctionContext.Builder builder = Functions.FunctionContext.newBuilder().setFunctionType(functionType)
.setId(id).setValue(typeMessage.toByteString());
return builder.build();
}
}

View File

@ -5,63 +5,65 @@ package org.listware.core.provider.functions;
import org.apache.flink.statefun.sdk.Context; import org.apache.flink.statefun.sdk.Context;
import org.apache.flink.statefun.sdk.StatefulFunction; import org.apache.flink.statefun.sdk.StatefulFunction;
import org.apache.flink.statefun.sdk.reqreply.generated.TypedValue; import org.apache.flink.statefun.sdk.reqreply.generated.TypedValue;
import org.listware.io.functions.egress.Egress; import org.listware.core.FunctionContext;
import org.listware.core.cmdb.Cmdb;
import org.listware.sdk.Functions; import org.listware.sdk.Functions;
import org.listware.sdk.Result;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
public class Base implements StatefulFunction { public class Base extends Sync implements StatefulFunction {
private static final Logger LOG = LoggerFactory.getLogger(Base.class); private static final Logger LOG = LoggerFactory.getLogger(Base.class);
protected Cmdb cmdb = new Cmdb();
public Base(String groupID, String topic) {
super(groupID, topic);
}
@Override @Override
public void invoke(Context context, java.lang.Object input) { public void invoke(Context context, java.lang.Object input) {
long startTime = System.currentTimeMillis(); long startTime = System.currentTimeMillis();
LOG.info(context.self().toString());
if (!(input instanceof TypedValue)) {
LOG.error("unknown message received: " + input);
return;
}
Functions.FunctionResult.Builder functionResultBuilder = Functions.FunctionResult.newBuilder();
TypedValue typedValue = (TypedValue) input;
Functions.FunctionContext functionContext = null;
try { try {
functionContext = Functions.FunctionContext.parseFrom(typedValue.getValue()); LOG.info(context.self().toString());
invoke(context, functionContext); if (input instanceof TypedValue) {
TypedValue typedValue = (TypedValue) input;
functionResultBuilder.setComplete(true); Functions.FunctionContext functionContext = Functions.FunctionContext.parseFrom(typedValue.getValue());
} catch (Exception e) {
LOG.error(e.getLocalizedMessage());
functionResultBuilder.setError(e.getLocalizedMessage());
} finally {
if (functionContext.hasReplyEgress()) {
Functions.ReplyEgress replyEgress = functionContext.getReplyEgress();
Functions.FunctionResult functionResult = functionResultBuilder.setReplyEgress(replyEgress).build(); onInit(context, functionContext);
invoke(context, functionContext);
TypedValue newTypedValue = TypedValue.newBuilder().setValue(functionResult.toByteString()) } else if (input instanceof Result.FunctionResult) {
.setHasValue(true).build(); Result.FunctionResult functionResult = (Result.FunctionResult) input;
onResult(context, functionResult);
LOG.info("ReplyEgress result: " + replyEgress.getId()); } else {
LOG.error(context.self() + " unknown message received: " + input);
context.send(Egress.EGRESS, newTypedValue);
} }
long endTime = System.currentTimeMillis(); } catch (Exception e) {
LOG.error(context.self() + " " + e.getLocalizedMessage());
onException(context, e.getLocalizedMessage());
} finally {
try {
onReply(context);
LOG.info(context.self().type() + ": took " + (endTime - startTime) + " milliseconds"); long endTime = System.currentTimeMillis();
LOG.info(context.self() + ": took " + (endTime - startTime) + " milliseconds");
} catch (Exception e) {
LOG.error(context.self() + ": " + e.getLocalizedMessage());
}
} }
} }
public void invoke(Context context, Functions.FunctionContext functionContext) throws Exception { public void invoke(Context context, Functions.FunctionContext functionContext) throws Exception {
}
public void invoke(FunctionContext functionContext) throws Exception {
} }
} }

View File

@ -1,139 +0,0 @@
/* Copyright 2022 Listware */
package org.listware.core.provider.functions;
import javax.annotation.Nullable;
import org.apache.flink.statefun.sdk.FunctionType;
import com.arangodb.ArangoGraph;
import com.arangodb.entity.EdgeUpdateEntity;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.listware.io.utils.Constants.Namespaces;
import org.listware.sdk.Functions;
import org.listware.sdk.pbcmdb.Core;
import org.listware.core.FunctionContext;
import org.listware.core.documents.LinkDocument;
import org.listware.core.documents.ObjectDocument;
import org.listware.core.provider.utils.exceptions.AlreadyLinkException;
import org.listware.core.provider.utils.exceptions.NoLinkException;
import org.listware.core.provider.utils.exceptions.UnknownIdException;
import org.listware.core.provider.utils.exceptions.UnknownMethodException;
/**
* Link CUD arangodb service for links
**
*/
public class Link extends Arango {
private static final Logger LOG = LoggerFactory.getLogger(Link.class);
public static final String TYPE = "links.system.functions.root";
public static final FunctionType FUNCTION_TYPE = new FunctionType(Namespaces.INTERNAL, TYPE);
public Link(ArangoGraph graph) {
super(graph);
}
@Override
public void invoke(FunctionContext functionContext) throws Exception {
Core.LinkMessage message = Core.LinkMessage.parseFrom(functionContext.getFunctionContext().getValue());
switch (message.getMethod()) {
case CREATE:
create(functionContext, message);
break;
case UPDATE:
update(functionContext, message);
break;
case DELETE:
delete(functionContext, message);
break;
default:
throw new UnknownMethodException(message.getMethod());
}
}
private void create(FunctionContext functionContext, Core.LinkMessage message) throws Exception {
// do not create link from link
if (functionContext.isLink()) {
throw new UnknownIdException(functionContext.getFlinkContext().self().id());
}
try {
// or by from+name
findByFromName(functionContext.getDocument().getId(), message.getName());
throw new AlreadyLinkException(functionContext.getDocument().getId(), message.getName());
} catch (NoLinkException ignored) {
}
ObjectDocument baseDocument = readContext(message.getTo());
// FIXME _type?
// types -> type == `Cmdb.TYPE_TYPE`
// type -> object == type
// object1 -> object2 == type of object2
insertLink(functionContext.getDocument().getId(), baseDocument.getId(), message.getName(), message.getType(),
message.getPayload());
}
private void update(FunctionContext functionContext, Core.LinkMessage message) throws Exception {
LinkDocument document = null;
if (!functionContext.isLink()) {
document = findByFromName(functionContext.getDocument().getId(), message.getName());
} else {
document = (LinkDocument) functionContext.getDocument();
}
LinkDocument newDocument = LinkDocument.deserialize(message.getPayload());
document.setProperties(newDocument.getProperties());
document.updateMeta();
EdgeUpdateEntity edgeUpdateEntity = links.replaceEdge(document.getKey(), document);
LOG.info("updated link " + edgeUpdateEntity.getId());
}
private void delete(FunctionContext functionContext, Core.LinkMessage message) throws Exception {
ObjectDocument prevLinkDocument = functionContext.getDocument();
if (!functionContext.isLink()) {
prevLinkDocument = findByFromName(functionContext.getDocument().getId(), message.getName());
}
links.deleteEdge(prevLinkDocument.getKey());
LOG.info("deleted link " + prevLinkDocument.getId());
}
/**
* CreateLink create link 'from' -> 'to' with 'name'
**
* @param from will be ('root', 'node', '17136214')
* @param to will be ('root', 'node', '17136214')
* @param name string
* @param callback FunctionContext (optional)
*/
public static Functions.FunctionContext CreateLink(String from, String to, String name,
@Nullable Functions.FunctionContext callback) {
Functions.FunctionType functionType = Functions.FunctionType.newBuilder().setNamespace(Namespaces.INTERNAL)
.setType(Link.TYPE).build();
Core.LinkMessage linkMessage = Core.LinkMessage.newBuilder().setMethod(Core.Method.CREATE).setName(name)
.setTo(to).build();
Functions.FunctionContext.Builder builder = Functions.FunctionContext.newBuilder().setFunctionType(functionType)
.setId(from).setValue(linkMessage.toByteString());
if (callback != null) {
builder = builder.setCallback(callback);
}
return builder.build();
}
}

View File

@ -4,15 +4,20 @@ package org.listware.core.provider.functions;
import org.apache.flink.statefun.sdk.Context; import org.apache.flink.statefun.sdk.Context;
import org.apache.flink.statefun.sdk.FunctionType; import org.apache.flink.statefun.sdk.FunctionType;
import org.apache.flink.statefun.sdk.annotations.Persisted;
import org.apache.flink.statefun.sdk.reqreply.generated.TypedValue; import org.apache.flink.statefun.sdk.reqreply.generated.TypedValue;
import org.listware.io.functions.egress.EgressReader; import org.apache.flink.statefun.sdk.state.PersistedTable;
import org.listware.core.cmdb.RegisterMessage;
import org.listware.io.utils.TypedValueDeserializer; import org.listware.io.utils.TypedValueDeserializer;
import org.listware.io.utils.Constants.Namespaces; import org.listware.io.utils.Constants.Namespaces;
import org.listware.sdk.Functions; import org.listware.sdk.Functions;
import org.listware.sdk.Functions.ReplyEgress; import org.listware.sdk.Result;
import org.listware.sdk.pbcmdb.Core; import org.listware.sdk.pbcmdb.Core;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.listware.core.provider.functions.object.Type;
import org.listware.core.provider.functions.object.Object;
import org.listware.core.provider.functions.object.Link;
public class Register extends Base { public class Register extends Base {
@SuppressWarnings("unused") @SuppressWarnings("unused")
@ -21,59 +26,186 @@ public class Register extends Base {
public static final String TYPE = "register.system.functions.root"; public static final String TYPE = "register.system.functions.root";
public static final FunctionType FUNCTION_TYPE = new FunctionType(Namespaces.INTERNAL, TYPE); public static final FunctionType FUNCTION_TYPE = new FunctionType(Namespaces.INTERNAL, TYPE);
private EgressReader egressReader = new EgressReader(TYPE, TYPE); private static final String MESSAGES_TABLE = "messages-table";
private static final String STATE_TABLE = "types-state-table";
@Persisted
private PersistedTable<String, RegisterMessage> messagesTable = PersistedTable.of(MESSAGES_TABLE, String.class,
RegisterMessage.class);
@Persisted
private PersistedTable<String, Boolean> stateTable = PersistedTable.of(STATE_TABLE, String.class, Boolean.class);
public Register() {
super(TYPE, TYPE);
}
@Override @Override
public void invoke(Context context, Functions.FunctionContext functionContext) throws Exception { public void invoke(Context context, Functions.FunctionContext functionContext) throws Exception {
Core.RegisterMessage registerMessage = Core.RegisterMessage.parseFrom(functionContext.getValue()); Core.RegisterMessage registerMessage = Core.RegisterMessage.parseFrom(functionContext.getValue());
registerTypes(context, registerMessage); RegisterMessage message = new RegisterMessage(registerMessage);
messagesTable.set(this.key, message);
registerObjects(context, registerMessage); registerNext(context);
} }
private void registerTypes(Context context, Core.RegisterMessage registerMessage) throws Exception { private void registerRouter(Context context, Functions.FunctionContext.Builder builder) throws Exception {
for (Core.RegisterTypeMessage registerTypeMessage : registerMessage.getTypeMessagesList()) { Result.ReplyResult replyResult = replyResult(context);
stateTable.set(replyResult.getKey(), true);
Functions.FunctionType functionType = Functions.FunctionType.newBuilder().setNamespace(Namespaces.INTERNAL) Functions.FunctionContext functionContext = builder.build();
.setType(Type.TYPE).build();
ReplyEgress replyEgress = egressReader.replyEgress(); Functions.FunctionType functionType = Functions.FunctionType.newBuilder().setNamespace(Namespaces.INTERNAL)
.setType(Router.TYPE).build();
Functions.FunctionContext.Builder builder = Functions.FunctionContext.newBuilder() Functions.FunctionContext routerFunctionContext = Functions.FunctionContext.newBuilder()
.setReplyEgress(replyEgress).setId(registerTypeMessage.getId()).setFunctionType(functionType) .setReplyResult(replyResult).setId(functionContext.getId()).setFunctionType(functionType)
.setValue(registerTypeMessage.getTypeMessage().toByteString()); .setValue(functionContext.toByteString()).build();
Functions.FunctionContext newFunctionContext = builder.build(); TypedValue typedValue = TypedValueDeserializer.fromMessageLite(routerFunctionContext);
TypedValue newTypedValue = TypedValueDeserializer.fromMessageLite(newFunctionContext); context.send(Router.FUNCTION_TYPE, routerFunctionContext.getId(), typedValue);
}
context.send(Type.FUNCTION_TYPE, newFunctionContext.getId(), newTypedValue); private void registerBuilder(Context context, FunctionType functionType, Functions.FunctionContext.Builder builder)
throws Exception {
Result.ReplyResult replyResult = replyResult(context);
stateTable.set(replyResult.getKey(), true);
egressReader.wait(replyEgress.getId()); Functions.FunctionContext functionContext = builder.setReplyResult(replyResult).build();
TypedValue typedValue = TypedValueDeserializer.fromMessageLite(functionContext);
context.send(functionType, functionContext.getId(), typedValue);
}
private void registerType(Context context, Core.RegisterTypeMessage message) throws Exception {
Functions.FunctionType functionType = Functions.FunctionType.newBuilder().setNamespace(Namespaces.INTERNAL)
.setType(Type.TYPE).build();
Functions.FunctionContext.Builder builder = Functions.FunctionContext.newBuilder().setId(message.getId())
.setFunctionType(functionType).setValue(message.getTypeMessage().toByteString());
if (message.getRouter()) {
registerRouter(context, builder);
} else {
registerBuilder(context, Type.FUNCTION_TYPE, builder);
}
}
private void registerObject(Context context, Core.RegisterObjectMessage message) throws Exception {
Functions.FunctionType functionType = Functions.FunctionType.newBuilder().setNamespace(Namespaces.INTERNAL)
.setType(Object.TYPE).build();
Functions.FunctionContext.Builder builder = Functions.FunctionContext.newBuilder().setId(message.getId())
.setFunctionType(functionType).setValue(message.getObjectMessage().toByteString());
if (message.getRouter()) {
registerRouter(context, builder);
} else {
registerBuilder(context, Object.FUNCTION_TYPE, builder);
} }
} }
private void registerObjects(Context context, Core.RegisterMessage registerMessage) throws Exception { private void registerLink(Context context, Core.RegisterLinkMessage message) throws Exception {
for (Core.RegisterObjectMessage registerObjectMessage : registerMessage.getObjectMessagesList()) { Functions.FunctionType functionType = Functions.FunctionType.newBuilder().setNamespace(Namespaces.INTERNAL)
.setType(Object.TYPE).build();
Functions.FunctionType functionType = Functions.FunctionType.newBuilder().setNamespace(Namespaces.INTERNAL) Functions.FunctionContext.Builder builder = Functions.FunctionContext.newBuilder().setId(message.getId())
.setType(Object.TYPE).build(); .setFunctionType(functionType).setValue(message.getLinkMessage().toByteString());
ReplyEgress replyEgress = egressReader.replyEgress(); registerBuilder(context, Link.FUNCTION_TYPE, builder);
}
Functions.FunctionContext.Builder builder = Functions.FunctionContext.newBuilder() @Override
.setFunctionType(functionType).setId(registerObjectMessage.getId()).setReplyEgress(replyEgress) protected void onResult(Context context, Result.FunctionResult functionResult) throws Exception {
.setValue(registerObjectMessage.getObjectMessage().toByteString()); super.onResult(context, functionResult);
Functions.FunctionContext newFunctionContext = builder.build(); String key = functionResult.getReplyEgress().getKey();
TypedValue newTypedValue = TypedValueDeserializer.fromMessageLite(newFunctionContext); stateTable.remove(key);
context.send(Object.FUNCTION_TYPE, newFunctionContext.getId(), newTypedValue); registerNext(context);
}
egressReader.wait(replyEgress.getId()); @Override
protected void onReply(Context context) throws Exception {
RegisterMessage message = messagesTable.get(this.key);
// wait all answers
if (stateTable.keys().iterator().hasNext()) {
return;
} }
// if errors - reply answer
if (errorContainer.getComplete()) {
if (!message.getTypes().isEmpty()) {
return;
}
if (!message.getObjects().isEmpty()) {
return;
}
if (!message.getLinks().isEmpty()) {
return;
}
}
messagesTable.remove(this.key);
super.onReply(context);
}
private void registerTypes(Context context) throws Exception {
RegisterMessage message = messagesTable.get(this.key);
for (Core.RegisterTypeMessage registerMessage : message.listTypes()) {
registerType(context, registerMessage);
}
messagesTable.set(this.key, message);
}
private void registerObjects(Context context) throws Exception {
RegisterMessage message = messagesTable.get(this.key);
for (Core.RegisterObjectMessage registerMessage : message.listObjects()) {
registerObject(context, registerMessage);
}
messagesTable.set(this.key, message);
}
private void registerLinks(Context context) throws Exception {
RegisterMessage message = messagesTable.get(this.key);
for (Core.RegisterLinkMessage registerMessage : message.listLinks()) {
registerLink(context, registerMessage);
}
messagesTable.set(this.key, message);
}
private void registerNext(Context context) throws Exception {
RegisterMessage message = messagesTable.get(this.key);
if (stateTable.keys().iterator().hasNext()) {
return;
}
if (!errorContainer.getComplete()) {
return;
}
if (!message.getTypes().isEmpty()) {
registerTypes(context);
} else if (!message.getObjects().isEmpty()) {
registerObjects(context);
} else if (!message.getLinks().isEmpty()) {
registerLinks(context);
}
} }
} }

View File

@ -7,12 +7,11 @@ import org.apache.flink.statefun.sdk.FunctionType;
import org.apache.flink.statefun.sdk.reqreply.generated.TypedValue; import org.apache.flink.statefun.sdk.reqreply.generated.TypedValue;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.listware.io.functions.egress.EgressReader; import org.listware.io.grpc.QDSLClient;
import org.listware.io.utils.QDSLClient;
import org.listware.io.utils.TypedValueDeserializer; import org.listware.io.utils.TypedValueDeserializer;
import org.listware.io.utils.Constants.Namespaces; import org.listware.io.utils.Constants.Namespaces;
import org.listware.sdk.Functions; import org.listware.sdk.Functions;
import org.listware.sdk.Functions.ReplyEgress; import org.listware.sdk.Result;
import org.listware.sdk.pbcmdb.pbqdsl.QDSL; import org.listware.sdk.pbcmdb.pbqdsl.QDSL;
/** /**
@ -20,23 +19,22 @@ import org.listware.sdk.pbcmdb.pbqdsl.QDSL;
** **
*/ */
public class Router extends Base { public class Router extends Base {
@SuppressWarnings("unused")
private static final Logger LOG = LoggerFactory.getLogger(Router.class); private static final Logger LOG = LoggerFactory.getLogger(Router.class);
public static final String TYPE = "router.system.functions.root"; public static final String TYPE = "router.system.functions.root";
public static final FunctionType FUNCTION_TYPE = new FunctionType(Namespaces.INTERNAL, TYPE); public static final FunctionType FUNCTION_TYPE = new FunctionType(Namespaces.INTERNAL, TYPE);
private QDSLClient client; private QDSLClient client = new QDSLClient();
private EgressReader egressReader = new EgressReader(TYPE, TYPE); public Router() {
super(TYPE, TYPE);
public Router(QDSLClient client) {
this.client = client;
} }
@Override @Override
public void invoke(Context context, Functions.FunctionContext functionContext) throws Exception { public void invoke(Context context, Functions.FunctionContext functionContext) throws Exception {
QDSL.Options options = QDSL.Options.newBuilder().setKey(true).build(); QDSL.Options options = QDSL.Options.newBuilder().setId(true).build();
QDSL.Elements elements = client.qdsl(context.self().id(), options); QDSL.Elements elements = client.qdsl(context.self().id(), options);
@ -44,9 +42,9 @@ public class Router extends Base {
.toBuilder(); .toBuilder();
for (QDSL.Element element : elements.getElementsList()) { for (QDSL.Element element : elements.getElementsList()) {
ReplyEgress replyEgress = egressReader.replyEgress(); Result.ReplyResult replyResult = replyResult(context);
Functions.FunctionContext newFunctionContext = builder.setReplyEgress(replyEgress).setId(element.getKey()) Functions.FunctionContext newFunctionContext = builder.setReplyResult(replyResult).setId(element.getId())
.build(); .build();
String namespace = newFunctionContext.getFunctionType().getNamespace(); String namespace = newFunctionContext.getFunctionType().getNamespace();
@ -55,10 +53,7 @@ public class Router extends Base {
TypedValue typedValue = TypedValueDeserializer.fromMessageLite(newFunctionContext); TypedValue typedValue = TypedValueDeserializer.fromMessageLite(newFunctionContext);
LOG.info("send: " + functionType + " id " + newFunctionContext.getId());
context.send(functionType, newFunctionContext.getId(), typedValue); context.send(functionType, newFunctionContext.getId(), typedValue);
egressReader.wait(replyEgress.getId());
} }
} }

View File

@ -0,0 +1,150 @@
/* Copyright 2022 Listware */
package org.listware.core.provider.functions;
import java.util.Iterator;
import java.util.UUID;
import org.apache.flink.statefun.sdk.Address;
import org.apache.flink.statefun.sdk.Context;
import org.apache.flink.statefun.sdk.annotations.Persisted;
import org.apache.flink.statefun.sdk.reqreply.generated.TypedValue;
import org.apache.flink.statefun.sdk.state.PersistedTable;
import org.listware.core.utils.ErrorContainer;
import org.listware.io.functions.result.Egress;
import org.listware.io.functions.result.EgressReader;
import org.listware.sdk.Functions;
import org.listware.sdk.Result;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class Sync {
@SuppressWarnings("unused")
private static final Logger LOG = LoggerFactory.getLogger(Sync.class);
private static final String RESULT_TABLE = "result-table";
private static final String REPLY_TABLE = "reply-table";
private static final String ERRORS_TABLE = "errors-table";
@Persisted
private PersistedTable<String, EgressReader.ReplyResult> replyTable = PersistedTable.of(REPLY_TABLE, String.class,
EgressReader.ReplyResult.class);
@Persisted
private PersistedTable<String, ErrorContainer> errorsTable = PersistedTable.of(ERRORS_TABLE, String.class,
ErrorContainer.class);
@Persisted
private PersistedTable<String, String> resultTable = PersistedTable.of(RESULT_TABLE, String.class, String.class);
private EgressReader egressReader = null;
protected String key = null;
protected ErrorContainer errorContainer = null;
public Sync(String groupID, String topic) {
egressReader = new EgressReader(groupID, topic);
UUID uuid = UUID.randomUUID();
key = uuid.toString();
}
protected Result.ReplyResult replyResult(Context context) {
Result.ReplyResult replyResult = egressReader.replyResult(context.self().id());
String key = replyResult.getKey();
resultTable.set(key, this.key);
return replyResult;
}
protected void onInit(Context context, Functions.FunctionContext functionContext) throws Exception {
if (functionContext.hasReplyResult()) {
EgressReader.ReplyResult replyResult = new EgressReader.ReplyResult(functionContext.getReplyResult());
if (context.caller() != null) {
replyResult.setIsEgress(false);
} else {
replyResult.setIsEgress(true);
}
this.key = replyResult.getKey();
replyTable.set(this.key, replyResult);
} else {
UUID uuid = UUID.randomUUID();
key = uuid.toString();
}
errorContainer = new ErrorContainer();
errorsTable.set(this.key, errorContainer);
}
protected void onResult(Context context, Result.FunctionResult functionResult) throws Exception {
String key = functionResult.getReplyEgress().getKey();
this.key = resultTable.get(key);
resultTable.remove(key);
errorContainer = errorsTable.get(this.key);
if (!functionResult.getComplete()) {
if (errorContainer != null) {
errorContainer.appendAll(functionResult.getErrorsList());
errorsTable.set(this.key, errorContainer);
}
}
}
protected void onException(Context context, String message) {
errorContainer = errorsTable.get(this.key);
if (errorContainer != null) {
errorContainer.append(message);
errorsTable.set(this.key, errorContainer);
}
}
protected void onReply(Context context) throws Exception {
errorContainer = errorsTable.get(this.key);
if (errorContainer == null) {
return;
}
Iterator<String> it = resultTable.values().iterator();
while (it.hasNext()) {
if (it.next().equals(this.key)) {
return;
}
}
errorsTable.remove(this.key);
EgressReader.ReplyResult replyResult = replyTable.get(this.key);
if (replyResult == null) {
return;
}
replyTable.remove(this.key);
reply(context, replyResult, errorContainer);
}
protected void reply(Context context, EgressReader.ReplyResult replyResult, ErrorContainer errorContainer)
throws Exception {
Result.FunctionResult functionResult = errorContainer.toFunctionResult(replyResult);
if (!replyResult.getIsEgress()) {
Address address = replyResult.toAddress();
// send result to caller
context.send(address, functionResult);
} else {
// TODO egress from TypedValue to FunctionResult
TypedValue newTypedValue = TypedValue.newBuilder().setValue(functionResult.toByteString()).setHasValue(true)
.build();
// send result to egress
context.send(Egress.EGRESS, newTypedValue);
}
}
}

View File

@ -1,215 +0,0 @@
/* Copyright 2022 Listware */
package org.listware.core.provider.functions;
import org.apache.flink.statefun.sdk.FunctionType;
import org.apache.flink.statefun.sdk.reqreply.generated.TypedValue;
import com.arangodb.ArangoGraph;
import com.arangodb.entity.VertexEntity;
import com.arangodb.entity.VertexUpdateEntity;
import com.google.protobuf.ByteString;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.listware.io.utils.TypedValueDeserializer;
import org.listware.io.utils.Constants.Namespaces;
import org.listware.sdk.Functions;
import org.listware.sdk.pbcmdb.Core;
import org.listware.core.FunctionContext;
import org.listware.core.documents.ObjectDocument;
import org.listware.core.provider.utils.Trigger;
import org.listware.core.provider.utils.Cmdb.LinkTypes;
import org.listware.core.provider.utils.Cmdb.SystemKeys;
import org.listware.core.provider.utils.exceptions.AlreadyLinkException;
import org.listware.core.provider.utils.exceptions.NoLinkException;
import org.listware.core.provider.utils.exceptions.UnknownIdException;
import org.listware.core.provider.utils.exceptions.UnknownMethodException;
/**
* Type CUD arangodb service for types
**
*/
public class Type extends Arango {
// @Persisted
// private final PersistedValue<String> TargetControllerData = PersistedValue.of("", String.class);
private static final Logger LOG = LoggerFactory.getLogger(Type.class);
public static final String TYPE = "types.system.functions.root";
public static final FunctionType FUNCTION_TYPE = new FunctionType(Namespaces.INTERNAL, TYPE);
public Type(ArangoGraph graph) {
super(graph);
}
@Override
public void invoke(FunctionContext functionContext) throws Exception {
// TODO only create from 'types.root' or work with 'type'
if (!functionContext.isType() && !functionContext.isTypes()) {
throw new UnknownIdException(functionContext.getFlinkContext().self().id());
}
Core.TypeMessage message = Core.TypeMessage.parseFrom(functionContext.getFunctionContext().getValue());
switch (message.getMethod()) {
case CREATE:
create(functionContext, message);
break;
case CREATE_CHILD:
createChild(functionContext, message);
break;
case UPDATE:
update(functionContext, message);
break;
case DELETE:
delete(functionContext);
break;
case CREATE_TRIGGER:
createTrigger(functionContext, message);
break;
case DELETE_TRIGGER:
deleteTrigger(functionContext, message);
break;
default:
throw new UnknownMethodException(message.getMethod());
}
}
private void create(FunctionContext functionContext, Core.TypeMessage message) throws Exception {
if (!functionContext.isTypes()) {
throw new UnknownIdException(functionContext.getFlinkContext().self().id());
}
ObjectDocument baseDocument = ObjectDocument.deserialize(message.getPayload());
baseDocument.setKey(message.getName());
VertexEntity vertexEntity = types.insertVertex(baseDocument);
LOG.info("created type " + vertexEntity.getId());
// TODO link from types -> type
insertLink(functionContext.getDocument().getId(), vertexEntity.getId(), message.getName(), LinkTypes.TYPE);
}
private void update(FunctionContext functionContext, Core.TypeMessage message) throws Exception {
ObjectDocument document = functionContext.getDocument();
ObjectDocument newDocument = ObjectDocument.deserialize(message.getPayload());
document.setProperties(newDocument.getProperties());
document.updateMeta();
VertexUpdateEntity vertexUpdateDocument = types.replaceVertex(functionContext.getFlinkContext().self().id(),
document);
LOG.info("updated type " + vertexUpdateDocument.getId());
}
// TODO delete all objects with type
private void delete(FunctionContext functionContext) throws Exception {
types.deleteVertex(functionContext.getFlinkContext().self().id());
LOG.info("deleted type " + functionContext.getFlinkContext().self().id());
}
private void createChild(FunctionContext functionContext, Core.TypeMessage message) throws Exception {
ObjectDocument callerBaseDocument = null;
if (functionContext.getFlinkContext().caller() != null) {
try {
callerBaseDocument = readContext(functionContext.getFlinkContext().caller().id());
try {
// do not duplicate link with name
findByFromName(callerBaseDocument.getId(), message.getName());
throw new AlreadyLinkException(callerBaseDocument.getId(), message.getName());
} catch (NoLinkException ignored) {
}
} catch (IllegalArgumentException ignored) {
}
}
ObjectDocument baseDocument = ObjectDocument.deserialize(message.getPayload());
VertexEntity vertexEntity = objects.insertVertex(baseDocument);
// Link type -> object ($uuid)
insertLink(functionContext.getDocument().getId(), vertexEntity.getId(), vertexEntity.getKey(),
functionContext.getDocument().getKey());
// trigger!!!
Functions.FunctionContext pbFunctionContext = ObjectTrigger.Trigger(vertexEntity.getKey(), Core.Method.CREATE);
TypedValue typedValue = TypedValueDeserializer.fromMessageLite(pbFunctionContext);
functionContext.getFlinkContext().send(ObjectTrigger.FUNCTION_TYPE, vertexEntity.getKey(), typedValue);
// Link objects -> object ($uuid)
ObjectDocument objects = system.getVertex(SystemKeys.OBJECTS, ObjectDocument.class);
insertLink(objects.getId(), vertexEntity.getId(), vertexEntity.getKey(),
functionContext.getDocument().getKey());
// if caller, link caller -> object (name)
if (callerBaseDocument != null) {
insertLink(callerBaseDocument.getId(), vertexEntity.getId(), message.getName(),
functionContext.getDocument().getKey());
}
}
private void createTrigger(FunctionContext functionContext, Core.TypeMessage message) throws Exception {
Core.Trigger trigger = Core.Trigger.parseFrom(message.getPayload());
ObjectDocument document = Trigger.add(functionContext.getDocument(), trigger);
document.updateMeta();
VertexUpdateEntity vertexUpdateDocument = types.replaceVertex(functionContext.getFlinkContext().self().id(),
document);
LOG.info("created trigger " + vertexUpdateDocument.getId());
}
private void deleteTrigger(FunctionContext functionContext, Core.TypeMessage message) throws Exception {
Core.Trigger trigger = Core.Trigger.parseFrom(message.getPayload());
ObjectDocument document = Trigger.delete(functionContext.getDocument(), trigger);
document.updateMeta();
VertexUpdateEntity vertexUpdateDocument = types.replaceVertex(functionContext.getFlinkContext().self().id(),
document);
LOG.info("deleted trigger " + vertexUpdateDocument.getId());
}
/**
* CreateObject create new object
**
* @param type string
* @param name string
* @param payload ByteString
* @param callback FunctionContext (optional)
*/
public static Functions.FunctionContext CreateObject(String type, String name, ByteString payload,
Functions.FunctionContext callback) {
Functions.FunctionType functionType = Functions.FunctionType.newBuilder().setNamespace(Namespaces.INTERNAL)
.setType(TYPE).build();
Core.TypeMessage typeMessage = Core.TypeMessage.newBuilder().setMethod(Core.Method.CREATE_CHILD).setName(name)
.setPayload(payload).build();
Functions.FunctionContext.Builder builder = Functions.FunctionContext.newBuilder().setFunctionType(functionType)
.setId(type).setValue(typeMessage.toByteString());
if (callback != null) {
builder = builder.setCallback(callback);
}
return builder.build();
}
}

View File

@ -1,87 +0,0 @@
/* Copyright 2022 Listware */
package org.listware.core.provider.functions;
import java.util.Map;
import org.apache.flink.statefun.sdk.FunctionType;
import org.apache.flink.statefun.sdk.reqreply.generated.TypedValue;
import org.listware.io.utils.TypedValueDeserializer;
import org.listware.io.utils.Constants.Namespaces;
import org.listware.sdk.Functions;
import org.listware.sdk.pbcmdb.Core;
import org.listware.core.FunctionContext;
import org.listware.core.provider.utils.Trigger;
import org.listware.core.provider.utils.exceptions.UnknownMethodException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.arangodb.ArangoGraph;
public class TypeTrigger extends Arango {
@SuppressWarnings("unused")
private static final Logger LOG = LoggerFactory.getLogger(TypeTrigger.class);
private static final String TYPE = "trigger.types.system.functions.root";
public static final FunctionType FUNCTION_TYPE = new FunctionType(Namespaces.INTERNAL, TYPE);
public TypeTrigger(ArangoGraph graph) {
super(graph);
}
@Override
public void invoke(FunctionContext functionContext) throws Exception {
Core.TypeMessage message = Core.TypeMessage.parseFrom(functionContext.getFunctionContext().getValue());
switch (message.getMethod()) {
case CREATE:
try {
Map<String, Trigger> create = Trigger.getByType(functionContext.getDocument(), "create");
for (Trigger trigger : create.values()) {
Functions.FunctionContext pbFunctionContext = Arango.Exec(
functionContext.getFlinkContext().caller().id(), trigger.getNamespace(), trigger.getType(),
message.getMethod());
TypedValue typedValue = TypedValueDeserializer.fromMessageLite(pbFunctionContext);
FunctionType functionType = new FunctionType(trigger.getNamespace(), trigger.getType());
functionContext.getFlinkContext().send(functionType,
functionContext.getFlinkContext().caller().id(), typedValue);
}
} catch (Exception ex) {
}
break;
case UPDATE:
break;
case DELETE:
break;
default:
throw new UnknownMethodException(message.getMethod());
}
}
/**
* Trigger trigger
**
* @param type string
* @param method Method
*/
public static Functions.FunctionContext Trigger(String type, Core.Method method) {
Functions.FunctionType functionType = Functions.FunctionType.newBuilder().setNamespace(Namespaces.INTERNAL)
.setType(TYPE).build();
Core.TypeMessage typeMessage = Core.TypeMessage.newBuilder().setMethod(method).build();
Functions.FunctionContext.Builder builder = Functions.FunctionContext.newBuilder().setFunctionType(functionType)
.setId(type).setValue(typeMessage.toByteString());
return builder.build();
}
}

View File

@ -0,0 +1,77 @@
/* Copyright 2022 Listware */
package org.listware.core.provider.functions.link;
import org.apache.flink.statefun.sdk.FunctionType;
import org.listware.core.FunctionContext;
import org.listware.core.documents.LinkDocument;
import org.listware.core.utils.exceptions.UnknownMethodException;
import org.listware.io.utils.Constants.Namespaces;
import org.listware.sdk.Functions;
import org.listware.sdk.Result;
import org.listware.sdk.pbcmdb.Core;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Link CUD arangodb service for links
**
*/
public class AdvancedLink extends LinkContext {
@SuppressWarnings("unused")
private static final Logger LOG = LoggerFactory.getLogger(AdvancedLink.class);
public static final String TYPE = "advanced.links.system.functions.root";
public static final FunctionType FUNCTION_TYPE = new FunctionType(Namespaces.INTERNAL, TYPE);
public AdvancedLink() {
super(TYPE, TYPE);
}
@Override
public void invoke(FunctionContext functionContext) throws Exception {
Core.LinkMessage message = Core.LinkMessage.parseFrom(functionContext.getFunctionContext().getValue());
switch (message.getMethod()) {
case UPDATE:
update(functionContext, message);
break;
case DELETE:
delete(functionContext, message);
break;
default:
throw new UnknownMethodException(message.getMethod());
}
}
private void update(FunctionContext functionContext, Core.LinkMessage message) throws Exception {
LinkDocument document = (LinkDocument) functionContext.getDocument();
document.replaceProperties(message.getPayload());
document.setId(functionContext.getFlinkContext().self().id());
document = cmdb.updateLink(functionContext.getFlinkContext(), document);
}
private void delete(FunctionContext functionContext, Core.LinkMessage message) throws Exception {
LinkDocument document = (LinkDocument) functionContext.getDocument();
document.setId(functionContext.getFlinkContext().self().id());
cmdb.removeDocument(document);
}
public static Functions.FunctionContext ProxyMessage(String id, Core.LinkMessage linkMessage,
Result.ReplyResult replyResult) {
Functions.FunctionType functionType = Functions.FunctionType.newBuilder().setNamespace(Namespaces.INTERNAL)
.setType(AdvancedLink.TYPE).build();
Functions.FunctionContext.Builder builder = Functions.FunctionContext.newBuilder().setFunctionType(functionType)
.setId(id).setValue(linkMessage.toByteString());
if (replyResult != null) {
builder = builder.setReplyResult(replyResult);
}
return builder.build();
}
}

View File

@ -0,0 +1,34 @@
/* Copyright 2022 Listware */
package org.listware.core.provider.functions.link;
import org.apache.flink.statefun.sdk.Context;
import org.listware.core.FunctionContext;
import org.listware.core.documents.LinkDocument;
import org.listware.core.provider.functions.Base;
import org.listware.sdk.Functions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Middleware for CUD functions
**
*/
public class LinkContext extends Base {
@SuppressWarnings("unused")
private static final Logger LOG = LoggerFactory.getLogger(LinkContext.class);
public LinkContext(String groupID, String topic) {
super(groupID, topic);
}
@Override
public void invoke(Context context, Functions.FunctionContext pbFunctionContext) throws Exception {
LinkDocument document = cmdb.readLinkDocument(context.self().id());
FunctionContext functionContext = new FunctionContext(context, document, pbFunctionContext);
invoke(functionContext);
}
}

View File

@ -0,0 +1,127 @@
/*
* Copyright 2022
* Listware
*/
package org.listware.core.provider.functions.link;
import org.apache.flink.statefun.sdk.Context;
import java.util.HashMap;
import java.util.Map;
import org.apache.flink.statefun.sdk.FunctionType;
import org.apache.flink.statefun.sdk.reqreply.generated.TypedValue;
import org.listware.core.FunctionContext;
import org.listware.core.cmdb.Trigger;
import org.listware.core.documents.LinkDocument;
import org.listware.core.utils.exceptions.UnknownMethodException;
import org.listware.io.utils.TypedValueDeserializer;
import org.listware.io.utils.Constants.Namespaces;
import org.listware.sdk.Functions;
import org.listware.sdk.pbcmdb.Core;
import org.listware.sdk.pbcmdb.pbqdsl.QDSL;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class LinkTrigger extends LinkContext {
@SuppressWarnings("unused")
private static final Logger LOG = LoggerFactory.getLogger(LinkTrigger.class);
public static final String TYPE = "trigger.advanced.links.system.functions.root";
public static final FunctionType FUNCTION_TYPE = new FunctionType(Namespaces.INTERNAL, TYPE);
public LinkTrigger() {
super(TYPE, TYPE);
}
@Override
public void invoke(FunctionContext functionContext) throws Exception {
Core.LinkMessage message = Core.LinkMessage.parseFrom(functionContext.getFunctionContext().getValue());
LinkDocument document = (LinkDocument) functionContext.getDocument();
String from = cmdb.getTypeId(document.getFrom());
String to = cmdb.getTypeId(document.getTo());
QDSL.Options options = QDSL.Options.newBuilder().setLink(true).build();
String query = String.format("*[?@._id == '%s'?].*[?@._id == '%s'?].types", to, from);
QDSL.Elements elements = cmdb.qdslClient.qdsl(query, options);
for (QDSL.Element element : elements.getElementsList()) {
LinkDocument link = LinkDocument.deserialize(element.getLink());
Map<String, Trigger> triggers = new HashMap<>();
switch (message.getMethod()) {
case CREATE:
triggers = Trigger.getByType(link, Trigger.CREATE);
break;
case UPDATE:
triggers = Trigger.getByType(link, Trigger.UPDATE);
break;
case DELETE:
triggers = Trigger.getByType(link, Trigger.DELETE);
break;
default:
throw new UnknownMethodException(message.getMethod());
}
for (Trigger trigger : triggers.values()) {
execTrigger(functionContext.getFlinkContext(), document.getFrom(), trigger, message.getMethod());
execTrigger(functionContext.getFlinkContext(), document.getTo(), trigger, message.getMethod());
}
}
}
private void execTrigger(Context context, String id, Trigger trigger, Core.Method method) {
Functions.FunctionContext pbFunctionContext = Exec(id, trigger.getNamespace(), trigger.getType(), method);
TypedValue typedValue = TypedValueDeserializer.fromMessageLite(pbFunctionContext);
FunctionType functionType = new FunctionType(trigger.getNamespace(), trigger.getType());
context.send(functionType, pbFunctionContext.getId(), typedValue);
}
/**
* Trigger trigger
**
* @param id string
* @param method Method
*/
public static Functions.FunctionContext Trigger(String id, Core.Method method) {
Functions.FunctionType functionType = Functions.FunctionType.newBuilder().setNamespace(Namespaces.INTERNAL)
.setType(TYPE).build();
Core.LinkMessage message = Core.LinkMessage.newBuilder().setMethod(method).build();
Functions.FunctionContext.Builder builder = Functions.FunctionContext.newBuilder().setFunctionType(functionType)
.setId(id).setValue(message.toByteString());
return builder.build();
}
/**
* Exec function
**
* @param id string
* @param namespace string
* @param type string
* @param method Method
*/
public static Functions.FunctionContext Exec(String id, String namespace, String type, Core.Method method) {
Functions.FunctionType functionType = Functions.FunctionType.newBuilder().setNamespace(namespace).setType(type)
.build();
Functions.FunctionContext.Builder builder = Functions.FunctionContext.newBuilder().setFunctionType(functionType)
.setId(id);
return builder.build();
}
}

View File

@ -0,0 +1,163 @@
/* Copyright 2022 Listware */
package org.listware.core.provider.functions.object;
import org.apache.flink.statefun.sdk.FunctionType;
import org.apache.flink.statefun.sdk.reqreply.generated.TypedValue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.listware.core.FunctionContext;
import org.listware.core.cmdb.Trigger;
import org.listware.core.documents.LinkDocument;
import org.listware.core.documents.ObjectDocument;
import org.listware.core.provider.functions.link.AdvancedLink;
import org.listware.core.utils.exceptions.AlreadyTriggerException;
import org.listware.core.utils.exceptions.UnknownIdException;
import org.listware.core.utils.exceptions.UnknownMethodException;
import org.listware.io.utils.TypedValueDeserializer;
import org.listware.io.utils.Constants.Namespaces;
import org.listware.sdk.Functions;
import org.listware.sdk.Result;
import org.listware.sdk.pbcmdb.Core;
/**
* Link CUD arangodb service for links
**
*/
public class Link extends ObjectContext {
@SuppressWarnings("unused")
private static final Logger LOG = LoggerFactory.getLogger(Link.class);
public static final String TYPE = "links.system.functions.root";
public static final FunctionType FUNCTION_TYPE = new FunctionType(Namespaces.INTERNAL, TYPE);
public Link() {
super(TYPE, TYPE);
}
@Override
public void invoke(FunctionContext functionContext) throws Exception {
Core.LinkMessage message = Core.LinkMessage.parseFrom(functionContext.getFunctionContext().getValue());
switch (message.getMethod()) {
case CREATE:
create(functionContext, message);
break;
case UPDATE:
update(functionContext, message);
break;
case DELETE:
delete(functionContext, message);
break;
case CREATE_TRIGGER:
createTrigger(functionContext, message);
break;
case DELETE_TRIGGER:
deleteTrigger(functionContext, message);
break;
default:
throw new UnknownMethodException(message.getMethod());
}
}
private void create(FunctionContext functionContext, Core.LinkMessage message) throws Exception {
cmdb.checkFrom(functionContext.getDocument(), message.getName());
ObjectDocument document = cmdb.readDocument(message.getTo());
cmdb.createLink(functionContext.getFlinkContext(), functionContext.getDocument(), document, message.getType(),
message.getName(), message.getPayload());
}
private void update(FunctionContext functionContext, Core.LinkMessage message) throws Exception {
Result.ReplyResult replyResult = replyResult(functionContext.getFlinkContext());
LinkDocument document = cmdb.readLinkDocument(functionContext.getDocument().getId(), message.getName());
Functions.FunctionContext pbFunctionContext = AdvancedLink.ProxyMessage(document.getId(), message, replyResult);
TypedValue typedValue = TypedValueDeserializer.fromMessageLite(pbFunctionContext);
functionContext.getFlinkContext().send(AdvancedLink.FUNCTION_TYPE, document.getId(), typedValue);
}
private void delete(FunctionContext functionContext, Core.LinkMessage message) throws Exception {
Result.ReplyResult replyResult = replyResult(functionContext.getFlinkContext());
LinkDocument document = cmdb.readLinkDocument(functionContext.getDocument().getId(), message.getName());
Functions.FunctionContext pbFunctionContext = AdvancedLink.ProxyMessage(document.getId(), message, replyResult);
TypedValue typedValue = TypedValueDeserializer.fromMessageLite(pbFunctionContext);
functionContext.getFlinkContext().send(AdvancedLink.FUNCTION_TYPE, document.getId(), typedValue);
}
private void createTrigger(FunctionContext functionContext, Core.LinkMessage message) throws Exception {
if (!functionContext.isType()) {
throw new UnknownIdException(functionContext.getFlinkContext().self().id());
}
Core.Trigger trigger = Core.Trigger.parseFrom(message.getPayload());
ObjectDocument from = functionContext.getDocument();
ObjectDocument to = cmdb.readDocument(message.getTo());
LinkDocument document = null;
try {
document = cmdb.readLinkDocument(from.getId(), to.getKey());
document = Trigger.add(document, trigger);
document = cmdb.updateLinkDocument(document);
} catch (AlreadyTriggerException ex) {
throw ex;
} catch (Exception ex) {
document = Trigger.add(new LinkDocument(), trigger);
document = cmdb.createLink(from, to, "trigger", to.getKey(), document.serialize());
}
LOG.info("created link trigger " + document.getId());
}
private void deleteTrigger(FunctionContext functionContext, Core.LinkMessage message) throws Exception {
if (!functionContext.isType()) {
throw new UnknownIdException(functionContext.getFlinkContext().self().id());
}
Core.Trigger trigger = Core.Trigger.parseFrom(message.getPayload());
ObjectDocument from = functionContext.getDocument();
ObjectDocument to = cmdb.readDocument(message.getTo());
LinkDocument document = cmdb.readLinkDocument(from.getId(), to.getKey());
document = Trigger.delete(document, trigger);
document = cmdb.updateLinkDocument(document);
LOG.info("deleted type trigger " + document.getId());
}
/**
* CreateLink create link 'from' -> 'to' with 'name'
**
* @param from will be ('system/root', 'types/node', '/$uuid')
* @param to will be ('system/root', 'types/node', '/$uuid')
* @param name string
*/
public static Functions.FunctionContext CreateLink(String from, String to, String name) {
Functions.FunctionType functionType = Functions.FunctionType.newBuilder().setNamespace(Namespaces.INTERNAL)
.setType(Link.TYPE).build();
Core.LinkMessage linkMessage = Core.LinkMessage.newBuilder().setMethod(Core.Method.CREATE).setName(name)
.setTo(to).build();
Functions.FunctionContext.Builder builder = Functions.FunctionContext.newBuilder().setFunctionType(functionType)
.setId(from).setValue(linkMessage.toByteString());
return builder.build();
}
}

View File

@ -1,41 +1,41 @@
/* Copyright 2022 Listware */ /* Copyright 2022 Listware */
package org.listware.core.provider.functions; package org.listware.core.provider.functions.object;
import org.apache.flink.statefun.sdk.FunctionType; import org.apache.flink.statefun.sdk.FunctionType;
import org.apache.flink.statefun.sdk.reqreply.generated.TypedValue; import org.apache.flink.statefun.sdk.reqreply.generated.TypedValue;
import com.arangodb.ArangoGraph;
import com.arangodb.entity.VertexUpdateEntity;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.listware.core.FunctionContext;
import org.listware.core.documents.ObjectDocument;
import org.listware.core.utils.exceptions.UnknownIdException;
import org.listware.core.utils.exceptions.UnknownMethodException;
import org.listware.io.utils.TypedValueDeserializer; import org.listware.io.utils.TypedValueDeserializer;
import org.listware.io.utils.Constants.Namespaces; import org.listware.io.utils.Constants.Namespaces;
import org.listware.sdk.Functions; import org.listware.sdk.Functions;
import org.listware.sdk.Result;
import org.listware.sdk.pbcmdb.Core; import org.listware.sdk.pbcmdb.Core;
import org.listware.core.FunctionContext;
import org.listware.core.documents.ObjectDocument;
import org.listware.core.provider.utils.exceptions.UnknownIdException;
import org.listware.core.provider.utils.exceptions.UnknownMethodException;
/** /**
* Object CUD arangodb service for objects * Object CUD arangodb service for objects
** **
*/ */
public class Object extends Arango { public class Object extends ObjectContext {
@SuppressWarnings("unused")
private static final Logger LOG = LoggerFactory.getLogger(Object.class); private static final Logger LOG = LoggerFactory.getLogger(Object.class);
public static final String TYPE = "objects.system.functions.root"; public static final String TYPE = "objects.system.functions.root";
public static final FunctionType FUNCTION_TYPE = new FunctionType(Namespaces.INTERNAL, TYPE); public static final FunctionType FUNCTION_TYPE = new FunctionType(Namespaces.INTERNAL, TYPE);
public Object(ArangoGraph graph) { public Object() {
super(graph); super(TYPE, TYPE);
} }
@Override @Override
public void invoke(FunctionContext functionContext) throws Exception { public void invoke(FunctionContext functionContext) throws Exception {
Core.ObjectMessage message = Core.ObjectMessage.parseFrom(functionContext.getFunctionContext().getValue()); Core.ObjectMessage message = Core.ObjectMessage.parseFrom(functionContext.getFunctionContext().getValue());
switch (message.getMethod()) { switch (message.getMethod()) {
@ -57,8 +57,10 @@ public class Object extends Arango {
} }
private void createChild(FunctionContext functionContext, Core.ObjectMessage message) throws Exception { private void createChild(FunctionContext functionContext, Core.ObjectMessage message) throws Exception {
Result.ReplyResult replyResult = replyResult(functionContext.getFlinkContext());
Functions.FunctionContext pbFunctionContext = Type.CreateObject(message.getType(), message.getName(), Functions.FunctionContext pbFunctionContext = Type.CreateObject(message.getType(), message.getName(),
message.getPayload(), functionContext.getCallback()); message.getPayload(), replyResult);
TypedValue typedValue = TypedValueDeserializer.fromMessageLite(pbFunctionContext); TypedValue typedValue = TypedValueDeserializer.fromMessageLite(pbFunctionContext);
@ -71,25 +73,18 @@ public class Object extends Arango {
} }
ObjectDocument document = functionContext.getDocument(); ObjectDocument document = functionContext.getDocument();
document.replaceProperties(message.getPayload());
ObjectDocument newDocument = ObjectDocument.deserialize(message.getPayload()); document.setId(functionContext.getFlinkContext().self().id());
document = cmdb.updateObject(functionContext.getFlinkContext(), document);
document.setProperties(newDocument.getProperties());
document.updateMeta();
VertexUpdateEntity vertexUpdateDocument = types.replaceVertex(functionContext.getFlinkContext().self().id(),
document);
LOG.info("updated object " + vertexUpdateDocument.getId());
} }
private void delete(FunctionContext functionContext) throws Exception { private void delete(FunctionContext functionContext) throws Exception {
if (!functionContext.isObject()) { if (!functionContext.isObject()) {
throw new UnknownIdException(functionContext.getFlinkContext().self().id()); throw new UnknownIdException(functionContext.getFlinkContext().self().id());
} }
ObjectDocument document = functionContext.getDocument();
document.setId(functionContext.getFlinkContext().self().id());
objects.deleteVertex(functionContext.getFlinkContext().self().id()); cmdb.removeObject(functionContext.getFlinkContext(), document);
LOG.info("deleted object " + functionContext.getFlinkContext().self().id());
} }
} }

View File

@ -0,0 +1,36 @@
/* Copyright 2022 Listware */
package org.listware.core.provider.functions.object;
import org.apache.flink.statefun.sdk.Context;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.listware.core.FunctionContext;
import org.listware.core.documents.ObjectDocument;
import org.listware.core.provider.functions.Base;
import org.listware.sdk.Functions;
/**
* Middleware for CUD functions
**
*/
public class ObjectContext extends Base {
@SuppressWarnings("unused")
private static final Logger LOG = LoggerFactory.getLogger(ObjectContext.class);
public ObjectContext(String groupID, String topic) {
super(groupID, topic);
}
@Override
public void invoke(Context context, Functions.FunctionContext pbFunctionContext) throws Exception {
ObjectDocument document = cmdb.readDocument(context.self().id());
FunctionContext functionContext = new FunctionContext(context, document, pbFunctionContext);
invoke(functionContext);
}
}

View File

@ -1,21 +1,18 @@
/* Copyright 2022 Listware */ /* Copyright 2022 Listware */
package org.listware.core.provider.functions; package org.listware.core.provider.functions.object;
import org.apache.flink.statefun.sdk.FunctionType; import org.apache.flink.statefun.sdk.FunctionType;
import org.apache.flink.statefun.sdk.reqreply.generated.TypedValue; import org.apache.flink.statefun.sdk.reqreply.generated.TypedValue;
import org.listware.core.FunctionContext;
import org.listware.io.utils.TypedValueDeserializer; import org.listware.io.utils.TypedValueDeserializer;
import org.listware.io.utils.Constants.Namespaces; import org.listware.io.utils.Constants.Namespaces;
import org.listware.sdk.Functions; import org.listware.sdk.Functions;
import org.listware.sdk.pbcmdb.Core; import org.listware.sdk.pbcmdb.Core;
import org.listware.sdk.pbcmdb.pbqdsl.QDSL;
import org.listware.core.FunctionContext;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import com.arangodb.ArangoGraph; public class ObjectTrigger extends ObjectContext {
public class ObjectTrigger extends Arango {
@SuppressWarnings("unused") @SuppressWarnings("unused")
private static final Logger LOG = LoggerFactory.getLogger(ObjectTrigger.class); private static final Logger LOG = LoggerFactory.getLogger(ObjectTrigger.class);
@ -23,27 +20,20 @@ public class ObjectTrigger extends Arango {
public static final FunctionType FUNCTION_TYPE = new FunctionType(Namespaces.INTERNAL, TYPE); public static final FunctionType FUNCTION_TYPE = new FunctionType(Namespaces.INTERNAL, TYPE);
public ObjectTrigger(ArangoGraph graph) { public ObjectTrigger() {
super(graph); super(TYPE, TYPE);
} }
@Override @Override
public void invoke(FunctionContext functionContext) throws Exception { public void invoke(FunctionContext functionContext) throws Exception {
Core.ObjectMessage message = Core.ObjectMessage.parseFrom(functionContext.getFunctionContext().getValue()); Core.ObjectMessage message = Core.ObjectMessage.parseFrom(functionContext.getFunctionContext().getValue());
QDSL.Options options = QDSL.Options.newBuilder().setType(true).build(); String typeId = cmdb.getTypeId(functionContext.getFlinkContext().self().id());
Functions.FunctionContext pbFunctionContext = TypeTrigger.Trigger(typeId, message.getMethod());
String query = String.format("%s.objects", functionContext.getFlinkContext().self().id()); TypedValue typedValue = TypedValueDeserializer.fromMessageLite(pbFunctionContext);
QDSL.Elements elements = client.qdsl(query, options); functionContext.getFlinkContext().send(TypeTrigger.FUNCTION_TYPE, pbFunctionContext.getId(), typedValue);
for (QDSL.Element element : elements.getElementsList()) {
Functions.FunctionContext pbFunctionContext = TypeTrigger.Trigger(element.getType(), message.getMethod());
TypedValue typedValue = TypedValueDeserializer.fromMessageLite(pbFunctionContext);
functionContext.getFlinkContext().send(TypeTrigger.FUNCTION_TYPE, element.getType(), typedValue);
}
} }
/** /**

View File

@ -0,0 +1,171 @@
/* Copyright 2022 Listware */
package org.listware.core.provider.functions.object;
import org.apache.flink.statefun.sdk.FunctionType;
import com.google.protobuf.ByteString;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.listware.core.FunctionContext;
import org.listware.core.cmdb.Trigger;
import org.listware.core.documents.ObjectDocument;
import org.listware.core.utils.exceptions.UnknownIdException;
import org.listware.core.utils.exceptions.UnknownMethodException;
import org.listware.io.utils.Constants.Namespaces;
import org.listware.sdk.Functions;
import org.listware.sdk.Result;
import org.listware.sdk.pbcmdb.Core;
/**
* Type CUD arangodb service for types
**
*/
public class Type extends ObjectContext {
private static final Logger LOG = LoggerFactory.getLogger(Type.class);
public static final String TYPE = "types.system.functions.root";
public static final FunctionType FUNCTION_TYPE = new FunctionType(Namespaces.INTERNAL, TYPE);
public Type() {
super(TYPE, TYPE);
}
@Override
public void invoke(FunctionContext functionContext) throws Exception {
// TODO only create from 'types.root' or work with 'type'
if (!functionContext.isType() && !functionContext.isTypes()) {
throw new UnknownIdException(functionContext.getFlinkContext().self().id());
}
Core.TypeMessage message = Core.TypeMessage.parseFrom(functionContext.getFunctionContext().getValue());
switch (message.getMethod()) {
case CREATE:
create(functionContext, message);
break;
case CREATE_CHILD:
createChild(functionContext, message);
break;
case UPDATE:
update(functionContext, message);
break;
case DELETE:
delete(functionContext);
break;
case CREATE_TRIGGER:
createTrigger(functionContext, message);
break;
case DELETE_TRIGGER:
deleteTrigger(functionContext, message);
break;
default:
throw new UnknownMethodException(message.getMethod());
}
}
private void create(FunctionContext functionContext, Core.TypeMessage message) throws Exception {
if (!functionContext.isTypes()) {
throw new UnknownIdException(functionContext.getFlinkContext().self().id());
}
ObjectDocument document = ObjectDocument.deserialize(message.getPayload());
document.setKey(message.getName());
document = cmdb.createType(document);
}
private void update(FunctionContext functionContext, Core.TypeMessage message) throws Exception {
if (!functionContext.isType()) {
throw new UnknownIdException(functionContext.getFlinkContext().self().id());
}
ObjectDocument document = functionContext.getDocument();
document.replaceProperties(message.getPayload());
document.setId(functionContext.getFlinkContext().self().id());
document = cmdb.updateType(functionContext.getFlinkContext(), document);
}
// TODO delete all objects with type
private void delete(FunctionContext functionContext) throws Exception {
if (!functionContext.isType()) {
throw new UnknownIdException(functionContext.getFlinkContext().self().id());
}
ObjectDocument document = functionContext.getDocument();
document.setId(functionContext.getFlinkContext().self().id());
cmdb.removeType(functionContext.getFlinkContext(), document);
}
private void createChild(FunctionContext functionContext, Core.TypeMessage message) throws Exception {
if (!functionContext.isType()) {
throw new UnknownIdException(functionContext.getFlinkContext().self().id());
}
ObjectDocument document = ObjectDocument.deserialize(message.getPayload());
ObjectDocument type = functionContext.getDocument();
if (functionContext.getFlinkContext().caller() != null) {
ObjectDocument parent = cmdb.readDocument(functionContext.getFlinkContext().caller().id());
document = cmdb.createObject(functionContext.getFlinkContext(), type, parent, document, message.getName());
} else {
document = cmdb.createObject(functionContext.getFlinkContext(), type, document);
}
}
private void createTrigger(FunctionContext functionContext, Core.TypeMessage message) throws Exception {
if (!functionContext.isType()) {
throw new UnknownIdException(functionContext.getFlinkContext().self().id());
}
Core.Trigger trigger = Core.Trigger.parseFrom(message.getPayload());
ObjectDocument document = Trigger.add(functionContext.getDocument(), trigger);
document = cmdb.updateType(functionContext.getFlinkContext(), document);
LOG.info("created type trigger " + document.getId());
}
private void deleteTrigger(FunctionContext functionContext, Core.TypeMessage message) throws Exception {
if (!functionContext.isType()) {
throw new UnknownIdException(functionContext.getFlinkContext().self().id());
}
Core.Trigger trigger = Core.Trigger.parseFrom(message.getPayload());
ObjectDocument document = Trigger.delete(functionContext.getDocument(), trigger);
document = cmdb.updateType(functionContext.getFlinkContext(), document);
LOG.info("deleted type trigger " + document.getId());
}
/**
* CreateObject create new object
**
* @param type string
* @param name string
* @param payload ByteString
* @param replyEgress ReplyEgress (optional)
*/
public static Functions.FunctionContext CreateObject(String type, String name, ByteString payload,
Result.ReplyResult replyResult) {
Functions.FunctionType functionType = Functions.FunctionType.newBuilder().setNamespace(Namespaces.INTERNAL)
.setType(TYPE).build();
Core.TypeMessage typeMessage = Core.TypeMessage.newBuilder().setMethod(Core.Method.CREATE_CHILD).setName(name)
.setPayload(payload).build();
Functions.FunctionContext.Builder builder = Functions.FunctionContext.newBuilder().setFunctionType(functionType)
.setId(type).setValue(typeMessage.toByteString());
if (replyResult != null) {
builder = builder.setReplyResult(replyResult);
}
return builder.build();
}
}

View File

@ -0,0 +1,102 @@
/* Copyright 2022 Listware */
package org.listware.core.provider.functions.object;
import java.util.Map;
import org.apache.flink.statefun.sdk.FunctionType;
import org.apache.flink.statefun.sdk.reqreply.generated.TypedValue;
import org.listware.core.FunctionContext;
import org.listware.core.cmdb.Trigger;
import org.listware.core.utils.exceptions.UnknownMethodException;
import org.listware.io.utils.TypedValueDeserializer;
import org.listware.io.utils.Constants.Namespaces;
import org.listware.sdk.Functions;
import org.listware.sdk.pbcmdb.Core;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class TypeTrigger extends ObjectContext {
@SuppressWarnings("unused")
private static final Logger LOG = LoggerFactory.getLogger(TypeTrigger.class);
private static final String TYPE = "trigger.types.system.functions.root";
public static final FunctionType FUNCTION_TYPE = new FunctionType(Namespaces.INTERNAL, TYPE);
public TypeTrigger() {
super(TYPE, TYPE);
}
@Override
public void invoke(FunctionContext functionContext) throws Exception {
Core.TypeMessage message = Core.TypeMessage.parseFrom(functionContext.getFunctionContext().getValue());
Map<String, Trigger> triggers = null;
switch (message.getMethod()) {
case CREATE:
triggers = Trigger.getByType(functionContext.getDocument(), Trigger.CREATE);
break;
case UPDATE:
triggers = Trigger.getByType(functionContext.getDocument(), Trigger.UPDATE);
break;
case DELETE:
triggers = Trigger.getByType(functionContext.getDocument(), Trigger.DELETE);
break;
default:
throw new UnknownMethodException(message.getMethod());
}
for (Trigger trigger : triggers.values()) {
Functions.FunctionContext pbFunctionContext = Exec(functionContext.getFlinkContext().caller().id(),
trigger.getNamespace(), trigger.getType(), message.getMethod());
TypedValue typedValue = TypedValueDeserializer.fromMessageLite(pbFunctionContext);
FunctionType functionType = new FunctionType(trigger.getNamespace(), trigger.getType());
functionContext.getFlinkContext().send(functionType, functionContext.getFlinkContext().caller().id(),
typedValue);
}
}
/**
* Trigger trigger
**
* @param id string
* @param method Method
*/
public static Functions.FunctionContext Trigger(String id, Core.Method method) {
Functions.FunctionType functionType = Functions.FunctionType.newBuilder().setNamespace(Namespaces.INTERNAL)
.setType(TYPE).build();
Core.TypeMessage typeMessage = Core.TypeMessage.newBuilder().setMethod(method).build();
Functions.FunctionContext.Builder builder = Functions.FunctionContext.newBuilder().setFunctionType(functionType)
.setId(id).setValue(typeMessage.toByteString());
return builder.build();
}
/**
* Exec function
**
* @param id string
* @param namespace string
* @param type string
* @param method Method
*/
public static Functions.FunctionContext Exec(String id, String namespace, String type, Core.Method method) {
Functions.FunctionType functionType = Functions.FunctionType.newBuilder().setNamespace(namespace).setType(type)
.build();
Core.TypeMessage typeMessage = Core.TypeMessage.newBuilder().setMethod(method).build();
Functions.FunctionContext.Builder builder = Functions.FunctionContext.newBuilder().setFunctionType(functionType)
.setId(id).setValue(typeMessage.toByteString());
return builder.build();
}
}

View File

@ -1,30 +0,0 @@
/* Copyright 2022 Listware */
package org.listware.core.provider.utils;
public class Cmdb {
// Constant system cmdb keys
public class SystemKeys {
public static final String ROOT = "root";
public static final String OBJECTS = "objects";
public static final String TYPES = "types";
}
// Collection names
public class Collections {
public static final String SYSTEM = "system";
public static final String TYPES = "types";
public static final String OBJECTS = "objects";
public static final String LINKS = "links";
}
// Link entries
public class LinkTypes {
public static final String TYPE = "type";
}
public class Matcher {
public static final String UUID_V4_STRING = "[a-fA-F0-9]{8}-[a-fA-F0-9]{4}-4[a-fA-F0-9]{3}-[89abAB][a-fA-F0-9]{3}-[a-fA-F0-9]{12}";
public static final String NUMERIC_STRING = "\\d+";
}
}

View File

@ -1,21 +0,0 @@
/* Copyright 2022 Listware */
package org.listware.core.provider.utils;
import java.util.Map;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
public class JsonDeserializer {
public static Map<String, Map<String, Trigger>> triggers(Object from) throws Exception {
ObjectMapper mapper = new ObjectMapper();
mapper.configure(SerializationFeature.FAIL_ON_SELF_REFERENCES, false);
TypeReference<Map<String, Map<String, Trigger>>> ref = new TypeReference<Map<String, Map<String, Trigger>>>() {
};
return mapper.convertValue(from, ref);
}
}

View File

@ -1,116 +0,0 @@
/* Copyright 2022 Listware */
package org.listware.core.provider.utils;
import java.util.HashMap;
import java.util.Map;
import org.listware.sdk.pbcmdb.Core;
import org.listware.core.documents.ObjectDocument;
import org.listware.core.provider.utils.exceptions.AlreadyTriggerException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.annotation.JsonProperty;
public class Trigger {
@SuppressWarnings("unused")
private static final Logger LOG = LoggerFactory.getLogger(Trigger.class);
private static String triggersKey = "triggers";
public Trigger(Core.Trigger trigger) {
this.namespace = trigger.getFunctionType().getNamespace();
this.type = trigger.getFunctionType().getType();
}
@JsonProperty("namespace")
private String namespace;
@JsonProperty("type")
private String type;
/**
*
* @return The namespace
*/
@JsonProperty("namespace")
public String getNamespace() {
return namespace;
}
/**
*
* @return The type
*/
@JsonProperty("type")
public String getType() {
return type;
}
public static Map<String, Trigger> getByType(ObjectDocument baseDocument, String type) throws Exception {
Map<String, java.lang.Object> properties = baseDocument.getProperties();
java.lang.Object object = properties.get(triggersKey);
Map<String, Map<String, Trigger>> triggersMap = JsonDeserializer.triggers(object);
return triggersMap.get(type);
}
public static ObjectDocument add(ObjectDocument baseDocument, Core.Trigger trigger) throws Exception {
String key = trigger.getFunctionType().getNamespace() + "/" + trigger.getFunctionType().getType();
Map<String, java.lang.Object> properties = baseDocument.getProperties();
Map<String, Map<String, Trigger>> triggersMap;
if (properties.containsKey(triggersKey)) {
java.lang.Object object = properties.get(triggersKey);
triggersMap = JsonDeserializer.triggers(object);
} else {
triggersMap = new HashMap<String, Map<String, Trigger>>();
}
Map<String, Trigger> triggers;
if (triggersMap.containsKey(trigger.getType())) {
triggers = triggersMap.get(trigger.getType());
} else {
triggers = new HashMap<String, Trigger>();
}
if (triggers.containsKey(key)) {
throw new AlreadyTriggerException();
}
triggers.put(key, new Trigger(trigger));
triggersMap.put(trigger.getType(), triggers);
properties.put(triggersKey, triggersMap);
baseDocument.setProperties(properties);
return baseDocument;
}
public static ObjectDocument delete(ObjectDocument baseDocument, Core.Trigger trigger) throws Exception {
Map<String, java.lang.Object> properties = baseDocument.getProperties();
java.lang.Object object = properties.get(triggersKey);
// triggers map
Map<String, Map<String, Trigger>> triggersMap = JsonDeserializer.triggers(object);
// type map
Map<String, Trigger> triggers = triggersMap.get(trigger.getType());
String key = trigger.getFunctionType().getNamespace() + "/" + trigger.getFunctionType().getType();
if (!triggers.containsKey(key)) {
return baseDocument;
}
triggers.remove(key);
triggersMap.put(trigger.getType(), triggers);
properties.put(triggersKey, triggersMap);
baseDocument.setProperties(properties);
return baseDocument;
}
}

View File

@ -0,0 +1,59 @@
/* Copyright 2022 Listware */
package org.listware.core.utils;
import java.util.ArrayList;
import java.util.List;
import org.listware.io.functions.result.EgressReader;
import org.listware.sdk.Result;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class ErrorContainer {
@SuppressWarnings("unused")
private static final Logger LOG = LoggerFactory.getLogger(ErrorContainer.class);
private List<String> errors = new ArrayList<String>();
private Boolean complete = true;
public ErrorContainer() {
// POJO
}
public Boolean getComplete() {
return complete;
}
public void setComplete(Boolean complete) {
this.complete = complete;
}
public List<String> getErrors() {
return errors;
}
public void setErrors(List<String> errors) {
this.errors = errors;
}
public void append(String error) {
errors.add(error);
complete = false;
}
public void appendAll(List<String> errors) {
for (String error : errors) {
append(error);
}
}
public Result.FunctionResult toFunctionResult(EgressReader.ReplyResult replyEgress) {
Result.FunctionResult.Builder functionResultBuilder = Result.FunctionResult.newBuilder();
functionResultBuilder.setComplete(getComplete());
functionResultBuilder.addAllErrors(getErrors());
Result.FunctionResult functionResult = functionResultBuilder.setReplyEgress(replyEgress.toProto()).build();
return functionResult;
}
}

View File

@ -1,6 +1,6 @@
/* Copyright 2022 Listware */ /* Copyright 2022 Listware */
package org.listware.core.provider.utils.exceptions; package org.listware.core.utils.exceptions;
public class AlreadyLinkException extends Exception { public class AlreadyLinkException extends Exception {
private static final long serialVersionUID = 1L; private static final long serialVersionUID = 1L;

View File

@ -1,6 +1,6 @@
/* Copyright 2022 Listware */ /* Copyright 2022 Listware */
package org.listware.core.provider.utils.exceptions; package org.listware.core.utils.exceptions;
public class AlreadyTriggerException extends Exception { public class AlreadyTriggerException extends Exception {
private static final long serialVersionUID = 1L; private static final long serialVersionUID = 1L;

View File

@ -1,6 +1,6 @@
/* Copyright 2022 Listware */ /* Copyright 2022 Listware */
package org.listware.core.provider.utils.exceptions; package org.listware.core.utils.exceptions;
public class NoLinkException extends Exception { public class NoLinkException extends Exception {
private static final long serialVersionUID = 1L; private static final long serialVersionUID = 1L;
@ -8,4 +8,8 @@ public class NoLinkException extends Exception {
public NoLinkException(String from, String name) { public NoLinkException(String from, String name) {
super(String.format("link %s -> %s not found: ", from, name)); super(String.format("link %s -> %s not found: ", from, name));
} }
public NoLinkException(String from) {
super(String.format("link from %s not found: ", from));
}
} }

View File

@ -1,6 +1,6 @@
/* Copyright 2022 Listware */ /* Copyright 2022 Listware */
package org.listware.core.provider.utils.exceptions; package org.listware.core.utils.exceptions;
public class PayloadNotFoundException extends Exception { public class PayloadNotFoundException extends Exception {
private static final long serialVersionUID = 1L; private static final long serialVersionUID = 1L;

View File

@ -1,6 +1,6 @@
/* Copyright 2022 Listware */ /* Copyright 2022 Listware */
package org.listware.core.provider.utils.exceptions; package org.listware.core.utils.exceptions;
public class TriggerNotFoundException extends Exception { public class TriggerNotFoundException extends Exception {
private static final long serialVersionUID = 1L; private static final long serialVersionUID = 1L;

View File

@ -1,6 +1,6 @@
/* Copyright 2022 Listware */ /* Copyright 2022 Listware */
package org.listware.core.provider.utils.exceptions; package org.listware.core.utils.exceptions;
public class UnknownIdException extends Exception { public class UnknownIdException extends Exception {

View File

@ -1,6 +1,6 @@
/* Copyright 2022 Listware */ /* Copyright 2022 Listware */
package org.listware.core.provider.utils.exceptions; package org.listware.core.utils.exceptions;
import org.listware.sdk.pbcmdb.Core; import org.listware.sdk.pbcmdb.Core;