diff --git a/pom.xml b/pom.xml index ffe3ff2..839545b 100644 --- a/pom.xml +++ b/pom.xml @@ -2,7 +2,7 @@ 4.0.0 org.listware io - 1.0 + 1.1 jar @@ -44,7 +44,7 @@ org.listware proto - 1.0 + 1.1 @@ -93,4 +93,4 @@ http://git.fg-tech.ru/api/packages/listware/maven - \ No newline at end of file + diff --git a/src/main/java/org/listware/io/Module.java b/src/main/java/org/listware/io/Module.java index d7b85b8..bde2f9b 100644 --- a/src/main/java/org/listware/io/Module.java +++ b/src/main/java/org/listware/io/Module.java @@ -5,7 +5,7 @@ package org.listware.io; import java.util.Map; import org.apache.flink.statefun.sdk.spi.StatefulFunctionModule; -import org.listware.io.functions.egress.Egress; +import org.listware.io.functions.result.Egress; import org.listware.io.router.IngressRouter; import com.google.auto.service.AutoService; diff --git a/src/main/java/org/listware/io/functions/egress/EgressReader.java b/src/main/java/org/listware/io/functions/egress/EgressReader.java deleted file mode 100644 index 0d69fc0..0000000 --- a/src/main/java/org/listware/io/functions/egress/EgressReader.java +++ /dev/null @@ -1,100 +0,0 @@ -/* Copyright 2022 Listware */ - -package org.listware.io.functions.egress; - -import java.time.Duration; -import java.util.Arrays; -import java.util.List; -import java.util.Properties; -import java.util.UUID; - -import org.apache.flink.statefun.sdk.reqreply.generated.TypedValue; -import org.apache.kafka.clients.consumer.ConsumerConfig; -import org.apache.kafka.clients.consumer.ConsumerRecord; -import org.apache.kafka.clients.consumer.ConsumerRecords; -import org.apache.kafka.clients.consumer.KafkaConsumer; -import org.apache.kafka.common.TopicPartition; -import org.apache.kafka.common.serialization.StringDeserializer; -import org.listware.sdk.Functions; -import org.listware.io.utils.Constants; -import org.listware.io.utils.KafkaTypedValueDeserializer; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class EgressReader { - @SuppressWarnings("unused") - private final Logger LOG = LoggerFactory.getLogger(EgressReader.class); - - private Properties properties = new Properties(); - private KafkaConsumer consumer; - private String topic; - private Duration duration = Duration.ofSeconds(5); - - public EgressReader(String groupID, String topic) { - this.topic = topic; - - properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, Constants.Kafka.SERVER); - properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); - properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, - KafkaTypedValueDeserializer.class.getName()); - properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupID); - consumer = new KafkaConsumer(properties); - TopicPartition tp = new TopicPartition(topic, 0); - List tps = Arrays.asList(tp); - consumer.assign(tps); - consumer.seekToEnd(tps); - } - - public void wait(String key) throws Exception { - long startTime = System.currentTimeMillis(); - - LOG.info("ReplyEgress wait: " + key); - - for (;;) { - long endTime = System.currentTimeMillis(); - - if (endTime - startTime >= duration.getSeconds()) - break; - - ConsumerRecords records = consumer.poll(Duration.ofMillis(500)); - - for (ConsumerRecord record : records) { - if (record.key().equals(key)) { - TypedValue typedValue = record.value(); - Functions.FunctionResult functionResult = Functions.FunctionResult - .parseFrom(typedValue.toByteArray()); - if (!functionResult.getComplete()) { - throw new ResultException(functionResult.getError()); - } - return; - } - } - } - - throw new KeyNotFoundException(key); - } - - public Functions.ReplyEgress replyEgress() { - UUID uuid = UUID.randomUUID(); - LOG.info("ReplyEgress new: " + uuid.toString()); - return Functions.ReplyEgress.newBuilder().setNamespace(Constants.Namespaces.INTERNAL).setTopic(topic) - .setId(uuid.toString()).build(); - } - - public class KeyNotFoundException extends Exception { - private static final long serialVersionUID = 1L; - - public KeyNotFoundException(String key) { - super(String.format("key '%s' not found", key)); - } - } - - public class ResultException extends Exception { - private static final long serialVersionUID = 1L; - - public ResultException(String reason) { - super(reason); - } - } - -} diff --git a/src/main/java/org/listware/io/functions/egress/Egress.java b/src/main/java/org/listware/io/functions/result/Egress.java similarity index 95% rename from src/main/java/org/listware/io/functions/egress/Egress.java rename to src/main/java/org/listware/io/functions/result/Egress.java index b00ea32..152e64f 100644 --- a/src/main/java/org/listware/io/functions/egress/Egress.java +++ b/src/main/java/org/listware/io/functions/result/Egress.java @@ -1,6 +1,6 @@ /* Copyright 2022 Listware */ -package org.listware.io.functions.egress; +package org.listware.io.functions.result; import org.apache.flink.statefun.sdk.io.EgressIdentifier; import org.apache.flink.statefun.sdk.io.EgressSpec; diff --git a/src/main/java/org/listware/io/functions/result/EgressReader.java b/src/main/java/org/listware/io/functions/result/EgressReader.java new file mode 100644 index 0000000..2ec1e16 --- /dev/null +++ b/src/main/java/org/listware/io/functions/result/EgressReader.java @@ -0,0 +1,144 @@ +/* Copyright 2022 Listware */ + +package org.listware.io.functions.result; + +import java.util.Arrays; +import java.util.List; +import java.util.Properties; +import java.util.UUID; + +import org.apache.flink.statefun.sdk.Address; +import org.apache.flink.statefun.sdk.FunctionType; +import org.apache.flink.statefun.sdk.reqreply.generated.TypedValue; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.listware.io.utils.Constants; +import org.listware.io.utils.KafkaTypedValueDeserializer; +import org.listware.sdk.Result; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class EgressReader { + @SuppressWarnings("unused") + private final Logger LOG = LoggerFactory.getLogger(EgressReader.class); + + private Properties properties = new Properties(); + private KafkaConsumer consumer; + private String topic; + // private Duration duration = Duration.ofSeconds(5); + + public EgressReader(String groupID, String topic) { + this.topic = topic; + + properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, Constants.Kafka.SERVER); + properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); + properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, + KafkaTypedValueDeserializer.class.getName()); + properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupID); + consumer = new KafkaConsumer(properties); + TopicPartition tp = new TopicPartition(topic, 0); + List tps = Arrays.asList(tp); + consumer.assign(tps); + consumer.seekToEnd(tps); + } + + public Result.ReplyResult replyResult(String id) { + UUID uuid = UUID.randomUUID(); + + Result.ReplyResult replyEgress = Result.ReplyResult.newBuilder().setKey(uuid.toString()) + .setNamespace(Constants.Namespaces.INTERNAL).setTopic(topic).setId(id).build(); + + return replyEgress; + } + + public static class ReplyResult { + private String key; + private String id; + private String namespace; + private String topic; + private Boolean isEgress; + + public ReplyResult() { + // POJO + } + + public ReplyResult(Result.ReplyResult replyResult) { + super(); + this.id = replyResult.getId(); + this.key = replyResult.getKey(); + this.namespace = replyResult.getNamespace(); + this.topic = replyResult.getTopic(); + } + + public String getKey() { + return key; + } + + public void setKey(String key) { + this.key = key; + } + + public String getId() { + return id; + } + + public void setId(String id) { + this.id = id; + } + + public String getNamespace() { + return namespace; + } + + public void setNamespace(String namespace) { + this.namespace = namespace; + } + + public String getTopic() { + return topic; + } + + public void setTopic(String topic) { + this.topic = topic; + } + + public Boolean getIsEgress() { + return isEgress; + } + + public void setIsEgress(Boolean isEgress) { + this.isEgress = isEgress; + } + + public Result.ReplyResult toProto() { + return Result.ReplyResult.newBuilder().setId(id).setNamespace(namespace).setKey(key).setTopic(topic) + .build(); + } + + public Address toAddress() { + FunctionType functionType = new FunctionType(getNamespace(), getTopic()); + Address address = new Address(functionType, getId()); + return address; + } + } + + public static class KeyNotFoundException extends Exception { + private static final long serialVersionUID = 1L; + + public KeyNotFoundException(String key) { + // TODO rename to timeout error + super(String.format("key '%s' not found", key)); + } + } + + public static class ResultException extends Exception { + private static final long serialVersionUID = 1L; + + public ResultException(String reason) { + super(reason); + } + } + +} diff --git a/src/main/java/org/listware/io/functions/egress/EgressWriter.java b/src/main/java/org/listware/io/functions/result/EgressWriter.java similarity index 97% rename from src/main/java/org/listware/io/functions/egress/EgressWriter.java rename to src/main/java/org/listware/io/functions/result/EgressWriter.java index 47ba4e4..fc970e3 100644 --- a/src/main/java/org/listware/io/functions/egress/EgressWriter.java +++ b/src/main/java/org/listware/io/functions/result/EgressWriter.java @@ -1,6 +1,6 @@ /* Copyright 2022 Listware */ -package org.listware.io.functions.egress; +package org.listware.io.functions.result; import java.util.Properties; diff --git a/src/main/java/org/listware/io/grpc/EdgeClient.java b/src/main/java/org/listware/io/grpc/EdgeClient.java new file mode 100644 index 0000000..37f0caf --- /dev/null +++ b/src/main/java/org/listware/io/grpc/EdgeClient.java @@ -0,0 +1,60 @@ +/* Copyright 2022 Listware */ + +package org.listware.io.grpc; + +import java.util.concurrent.TimeUnit; + +import org.listware.sdk.pbcmdb.Core.Request; +import org.listware.sdk.pbcmdb.Core.Response; +import org.listware.sdk.pbcmdb.EdgeServiceGrpc; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.protobuf.ByteString; + +import org.listware.io.utils.Constants.Cmdb; + +import io.grpc.ManagedChannel; +import io.grpc.ManagedChannelBuilder; + +// interface over database, in future will be not only 'arangodb' +public class EdgeClient { + @SuppressWarnings("unused") + private static final Logger LOG = LoggerFactory.getLogger(EdgeClient.class); + + private final ManagedChannel channel; + private final EdgeServiceGrpc.EdgeServiceBlockingStub blockingStub; + + public EdgeClient() { + this(ManagedChannelBuilder.forAddress(Cmdb.ADDR, Cmdb.PORT).usePlaintext().build()); + } + + public EdgeClient(ManagedChannel channel) { + this.channel = channel; + blockingStub = EdgeServiceGrpc.newBlockingStub(channel); + } + + public void shutdown() throws InterruptedException { + channel.shutdown().awaitTermination(5, TimeUnit.SECONDS); + } + + public Response create(String collection, ByteString payload) throws Exception { + Request request = Request.newBuilder().setCollection(collection).setPayload(payload).build(); + return blockingStub.create(request); + } + + public Response read(String collection, String key) throws Exception { + Request request = Request.newBuilder().setCollection(collection).setKey(key).build(); + return blockingStub.read(request); + } + + public Response update(String collection, String key, ByteString payload) throws Exception { + Request request = Request.newBuilder().setCollection(collection).setKey(key).setPayload(payload).build(); + return blockingStub.update(request); + } + + public Response remove(String collection, String key) throws Exception { + Request request = Request.newBuilder().setCollection(collection).setKey(key).build(); + return blockingStub.remove(request); + } +} diff --git a/src/main/java/org/listware/io/grpc/FinderClient.java b/src/main/java/org/listware/io/grpc/FinderClient.java new file mode 100644 index 0000000..9c409fd --- /dev/null +++ b/src/main/java/org/listware/io/grpc/FinderClient.java @@ -0,0 +1,59 @@ +package org.listware.io.grpc; + +import java.util.concurrent.TimeUnit; + +import org.listware.io.utils.Constants.Cmdb; +import org.listware.sdk.pbcmdb.pbfinder.Finder; +import org.listware.sdk.pbcmdb.pbfinder.FinderServiceGrpc; + +import io.grpc.ManagedChannel; +import io.grpc.ManagedChannelBuilder; +import io.grpc.StatusRuntimeException; + +public class FinderClient { + private final ManagedChannel channel; + private final FinderServiceGrpc.FinderServiceBlockingStub blockingStub; + + public FinderClient() { + this(ManagedChannelBuilder.forAddress(Cmdb.ADDR, Cmdb.PORT).usePlaintext().build()); + } + + public FinderClient(ManagedChannel channel) { + this.channel = channel; + blockingStub = FinderServiceGrpc.newBlockingStub(channel); + } + + public void shutdown() throws InterruptedException { + channel.shutdown().awaitTermination(5, TimeUnit.SECONDS); + } + + public Finder.Response from(String from) throws StatusRuntimeException { + Finder.Request request = Finder.Request.newBuilder().setFrom(from).build(); + return blockingStub.links(request); + } + + public Finder.Response from(String from, String name) throws StatusRuntimeException { + Finder.Request request = Finder.Request.newBuilder().setFrom(from).setName(name).build(); + return blockingStub.links(request); + } + + public Finder.Response to(String to) throws StatusRuntimeException { + Finder.Request request = Finder.Request.newBuilder().setTo(to).build(); + return blockingStub.links(request); + } + + public Finder.Response to(String to, String name) throws StatusRuntimeException { + Finder.Request request = Finder.Request.newBuilder().setTo(to).setName(name).build(); + return blockingStub.links(request); + } + + public Finder.Response all(String from, String to) throws StatusRuntimeException { + Finder.Request request = Finder.Request.newBuilder().setFrom(from).setTo(to).build(); + return blockingStub.links(request); + } + + public Finder.Response all(String from, String to, String name) throws StatusRuntimeException { + Finder.Request request = Finder.Request.newBuilder().setFrom(from).setTo(to).setName(name).build(); + return blockingStub.links(request); + } +} diff --git a/src/main/java/org/listware/io/utils/QDSLClient.java b/src/main/java/org/listware/io/grpc/QDSLClient.java similarity index 86% rename from src/main/java/org/listware/io/utils/QDSLClient.java rename to src/main/java/org/listware/io/grpc/QDSLClient.java index e49e766..c779f61 100644 --- a/src/main/java/org/listware/io/utils/QDSLClient.java +++ b/src/main/java/org/listware/io/grpc/QDSLClient.java @@ -1,6 +1,6 @@ /* Copyright 2022 Listware */ -package org.listware.io.utils; +package org.listware.io.grpc; import io.grpc.ManagedChannel; import io.grpc.ManagedChannelBuilder; @@ -8,16 +8,16 @@ import io.grpc.StatusRuntimeException; import java.util.concurrent.TimeUnit; +import org.listware.io.utils.Constants.Cmdb; import org.listware.sdk.pbcmdb.pbqdsl.QDSL; import org.listware.sdk.pbcmdb.pbqdsl.QdslServiceGrpc; -import org.listware.io.utils.Constants.Cmdb.Qdsl; public class QDSLClient { private final ManagedChannel channel; private final QdslServiceGrpc.QdslServiceBlockingStub blockingStub; public QDSLClient() { - this(ManagedChannelBuilder.forAddress(Qdsl.ADDR, Qdsl.PORT).usePlaintext().build()); + this(ManagedChannelBuilder.forAddress(Cmdb.ADDR, Cmdb.PORT).usePlaintext().build()); } public QDSLClient(ManagedChannel channel) { diff --git a/src/main/java/org/listware/io/utils/VertexClient.java b/src/main/java/org/listware/io/grpc/VertexClient.java similarity index 54% rename from src/main/java/org/listware/io/utils/VertexClient.java rename to src/main/java/org/listware/io/grpc/VertexClient.java index 5b11b80..f9d3bb7 100644 --- a/src/main/java/org/listware/io/utils/VertexClient.java +++ b/src/main/java/org/listware/io/grpc/VertexClient.java @@ -1,28 +1,32 @@ /* Copyright 2022 Listware */ -package org.listware.io.utils; +package org.listware.io.grpc; import java.util.concurrent.TimeUnit; import org.listware.sdk.pbcmdb.Core.Request; import org.listware.sdk.pbcmdb.Core.Response; -import org.listware.io.utils.Constants.Cmdb.Qdsl; import org.slf4j.Logger; import org.slf4j.LoggerFactory; + +import com.google.protobuf.ByteString; + +import org.listware.io.utils.Constants.Cmdb; import org.listware.sdk.pbcmdb.VertexServiceGrpc; import io.grpc.ManagedChannel; import io.grpc.ManagedChannelBuilder; -import io.grpc.StatusRuntimeException; +// interface over database, in future will be not only 'arangodb' public class VertexClient { + @SuppressWarnings("unused") private static final Logger LOG = LoggerFactory.getLogger(VertexClient.class); private final ManagedChannel channel; private final VertexServiceGrpc.VertexServiceBlockingStub blockingStub; public VertexClient() { - this(ManagedChannelBuilder.forAddress(Qdsl.ADDR, Qdsl.PORT).usePlaintext().build()); + this(ManagedChannelBuilder.forAddress(Cmdb.ADDR, Cmdb.PORT).usePlaintext().build()); } public VertexClient(ManagedChannel channel) { @@ -34,14 +38,22 @@ public class VertexClient { channel.shutdown().awaitTermination(5, TimeUnit.SECONDS); } - public Response read(String key, String collection) throws StatusRuntimeException { - LOG.info("read " + key + " from " + collection); - Request request = Request.newBuilder().setCollection(collection).setKey(key).build(); - Response resp = blockingStub.read(request); - return resp; + public Response create(String collection, ByteString payload) throws Exception { + Request request = Request.newBuilder().setCollection(collection).setPayload(payload).build(); + return blockingStub.create(request); } - public Response remove(String key, String collection) throws StatusRuntimeException { + public Response read(String collection, String key) throws Exception { + Request request = Request.newBuilder().setCollection(collection).setKey(key).build(); + return blockingStub.read(request); + } + + public Response update(String collection, String key, ByteString payload) throws Exception { + Request request = Request.newBuilder().setCollection(collection).setKey(key).setPayload(payload).build(); + return blockingStub.update(request); + } + + public Response remove(String collection, String key) throws Exception { Request request = Request.newBuilder().setCollection(collection).setKey(key).build(); return blockingStub.remove(request); } diff --git a/src/main/java/org/listware/io/router/IngressRouter.java b/src/main/java/org/listware/io/router/IngressRouter.java index 7a6ba7e..5c7839d 100644 --- a/src/main/java/org/listware/io/router/IngressRouter.java +++ b/src/main/java/org/listware/io/router/IngressRouter.java @@ -9,9 +9,9 @@ import org.apache.flink.statefun.sdk.io.Router; import org.apache.flink.statefun.sdk.kafka.KafkaIngressBuilder; import org.apache.flink.statefun.sdk.reqreply.generated.TypedValue; import org.apache.kafka.clients.consumer.ConsumerConfig; -import org.listware.sdk.Functions.FunctionContext; import org.listware.io.utils.Constants; import org.listware.io.utils.TypedValueDeserializer; +import org.listware.sdk.Functions.FunctionContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/src/main/java/org/listware/io/utils/Constants.java b/src/main/java/org/listware/io/utils/Constants.java index 11ea8d0..5321ba9 100644 --- a/src/main/java/org/listware/io/utils/Constants.java +++ b/src/main/java/org/listware/io/utils/Constants.java @@ -7,7 +7,7 @@ public class Constants { // Internal java functions namespace public static final String INTERNAL = "system"; // External proxy functions namespace - public static final String EXTERNAL = "proxy"; + public static final String EXTERNAL = "proxy.listware"; } public class Kafka { @@ -15,17 +15,8 @@ public class Constants { } public class Cmdb { - // FIXME from secrets - public static final String ADDR = "cmdb.service.consul"; - public static final int PORT = 8529; - public static final String USER = "root"; - public static final String PASSWORD = "password"; - public static final String DBNAME = "CMDBv2"; - - public class Qdsl { - public static final String ADDR = "qdsl.cmdb.service.consul"; - public static final int PORT = 31415; - } + public static final String ADDR = "cmdb"; + public static final int PORT = 31415; } } diff --git a/src/main/java/org/listware/io/utils/KafkaEgressTypedValueSerializer.java b/src/main/java/org/listware/io/utils/KafkaEgressTypedValueSerializer.java index bd8dbfa..2c61203 100644 --- a/src/main/java/org/listware/io/utils/KafkaEgressTypedValueSerializer.java +++ b/src/main/java/org/listware/io/utils/KafkaEgressTypedValueSerializer.java @@ -7,7 +7,7 @@ import java.nio.charset.StandardCharsets; import org.apache.flink.statefun.sdk.kafka.KafkaEgressSerializer; import org.apache.flink.statefun.sdk.reqreply.generated.TypedValue; import org.apache.kafka.clients.producer.ProducerRecord; -import org.listware.sdk.Functions; +import org.listware.sdk.Result; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -21,13 +21,13 @@ public class KafkaEgressTypedValueSerializer implements KafkaEgressSerializer serialize(TypedValue message) { try { - Functions.FunctionResult functionResult = Functions.FunctionResult.parseFrom(message.getValue()); + Result.FunctionResult functionResult = Result.FunctionResult.parseFrom(message.getValue()); - Functions.ReplyEgress replyEgress = functionResult.getReplyEgress(); + Result.ReplyResult replyEgress = functionResult.getReplyEgress(); String topic = replyEgress.getTopic(); - byte[] key = replyEgress.getId().getBytes(StandardCharsets.UTF_8); + byte[] key = replyEgress.getKey().getBytes(StandardCharsets.UTF_8); byte[] value = message.toByteArray(); return new ProducerRecord(topic, key, value);