Add Finder service

This commit is contained in:
fg-admin 2023-01-25 11:48:21 +03:00
parent 29a0525b59
commit eb4d523751
13 changed files with 302 additions and 136 deletions

View File

@ -2,7 +2,7 @@
<modelVersion>4.0.0</modelVersion> <modelVersion>4.0.0</modelVersion>
<groupId>org.listware</groupId> <groupId>org.listware</groupId>
<artifactId>io</artifactId> <artifactId>io</artifactId>
<version>1.0</version> <version>1.1</version>
<packaging>jar</packaging> <packaging>jar</packaging>
<properties> <properties>
@ -44,7 +44,7 @@
<dependency> <dependency>
<groupId>org.listware</groupId> <groupId>org.listware</groupId>
<artifactId>proto</artifactId> <artifactId>proto</artifactId>
<version>1.0</version> <version>1.1</version>
</dependency> </dependency>
</dependencies> </dependencies>
@ -93,4 +93,4 @@
<url>http://git.fg-tech.ru/api/packages/listware/maven</url> <url>http://git.fg-tech.ru/api/packages/listware/maven</url>
</snapshotRepository> </snapshotRepository>
</distributionManagement> </distributionManagement>
</project> </project>

View File

@ -5,7 +5,7 @@ package org.listware.io;
import java.util.Map; import java.util.Map;
import org.apache.flink.statefun.sdk.spi.StatefulFunctionModule; import org.apache.flink.statefun.sdk.spi.StatefulFunctionModule;
import org.listware.io.functions.egress.Egress; import org.listware.io.functions.result.Egress;
import org.listware.io.router.IngressRouter; import org.listware.io.router.IngressRouter;
import com.google.auto.service.AutoService; import com.google.auto.service.AutoService;

View File

@ -1,100 +0,0 @@
/* Copyright 2022 Listware */
package org.listware.io.functions.egress;
import java.time.Duration;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import java.util.UUID;
import org.apache.flink.statefun.sdk.reqreply.generated.TypedValue;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.listware.sdk.Functions;
import org.listware.io.utils.Constants;
import org.listware.io.utils.KafkaTypedValueDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class EgressReader {
@SuppressWarnings("unused")
private final Logger LOG = LoggerFactory.getLogger(EgressReader.class);
private Properties properties = new Properties();
private KafkaConsumer<String, TypedValue> consumer;
private String topic;
private Duration duration = Duration.ofSeconds(5);
public EgressReader(String groupID, String topic) {
this.topic = topic;
properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, Constants.Kafka.SERVER);
properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
KafkaTypedValueDeserializer.class.getName());
properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupID);
consumer = new KafkaConsumer<String, TypedValue>(properties);
TopicPartition tp = new TopicPartition(topic, 0);
List<TopicPartition> tps = Arrays.asList(tp);
consumer.assign(tps);
consumer.seekToEnd(tps);
}
public void wait(String key) throws Exception {
long startTime = System.currentTimeMillis();
LOG.info("ReplyEgress wait: " + key);
for (;;) {
long endTime = System.currentTimeMillis();
if (endTime - startTime >= duration.getSeconds())
break;
ConsumerRecords<String, TypedValue> records = consumer.poll(Duration.ofMillis(500));
for (ConsumerRecord<String, TypedValue> record : records) {
if (record.key().equals(key)) {
TypedValue typedValue = record.value();
Functions.FunctionResult functionResult = Functions.FunctionResult
.parseFrom(typedValue.toByteArray());
if (!functionResult.getComplete()) {
throw new ResultException(functionResult.getError());
}
return;
}
}
}
throw new KeyNotFoundException(key);
}
public Functions.ReplyEgress replyEgress() {
UUID uuid = UUID.randomUUID();
LOG.info("ReplyEgress new: " + uuid.toString());
return Functions.ReplyEgress.newBuilder().setNamespace(Constants.Namespaces.INTERNAL).setTopic(topic)
.setId(uuid.toString()).build();
}
public class KeyNotFoundException extends Exception {
private static final long serialVersionUID = 1L;
public KeyNotFoundException(String key) {
super(String.format("key '%s' not found", key));
}
}
public class ResultException extends Exception {
private static final long serialVersionUID = 1L;
public ResultException(String reason) {
super(reason);
}
}
}

View File

@ -1,6 +1,6 @@
/* Copyright 2022 Listware */ /* Copyright 2022 Listware */
package org.listware.io.functions.egress; package org.listware.io.functions.result;
import org.apache.flink.statefun.sdk.io.EgressIdentifier; import org.apache.flink.statefun.sdk.io.EgressIdentifier;
import org.apache.flink.statefun.sdk.io.EgressSpec; import org.apache.flink.statefun.sdk.io.EgressSpec;

View File

@ -0,0 +1,144 @@
/* Copyright 2022 Listware */
package org.listware.io.functions.result;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import java.util.UUID;
import org.apache.flink.statefun.sdk.Address;
import org.apache.flink.statefun.sdk.FunctionType;
import org.apache.flink.statefun.sdk.reqreply.generated.TypedValue;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.listware.io.utils.Constants;
import org.listware.io.utils.KafkaTypedValueDeserializer;
import org.listware.sdk.Result;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class EgressReader {
@SuppressWarnings("unused")
private final Logger LOG = LoggerFactory.getLogger(EgressReader.class);
private Properties properties = new Properties();
private KafkaConsumer<String, TypedValue> consumer;
private String topic;
// private Duration duration = Duration.ofSeconds(5);
public EgressReader(String groupID, String topic) {
this.topic = topic;
properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, Constants.Kafka.SERVER);
properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
KafkaTypedValueDeserializer.class.getName());
properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupID);
consumer = new KafkaConsumer<String, TypedValue>(properties);
TopicPartition tp = new TopicPartition(topic, 0);
List<TopicPartition> tps = Arrays.asList(tp);
consumer.assign(tps);
consumer.seekToEnd(tps);
}
public Result.ReplyResult replyResult(String id) {
UUID uuid = UUID.randomUUID();
Result.ReplyResult replyEgress = Result.ReplyResult.newBuilder().setKey(uuid.toString())
.setNamespace(Constants.Namespaces.INTERNAL).setTopic(topic).setId(id).build();
return replyEgress;
}
public static class ReplyResult {
private String key;
private String id;
private String namespace;
private String topic;
private Boolean isEgress;
public ReplyResult() {
// POJO
}
public ReplyResult(Result.ReplyResult replyResult) {
super();
this.id = replyResult.getId();
this.key = replyResult.getKey();
this.namespace = replyResult.getNamespace();
this.topic = replyResult.getTopic();
}
public String getKey() {
return key;
}
public void setKey(String key) {
this.key = key;
}
public String getId() {
return id;
}
public void setId(String id) {
this.id = id;
}
public String getNamespace() {
return namespace;
}
public void setNamespace(String namespace) {
this.namespace = namespace;
}
public String getTopic() {
return topic;
}
public void setTopic(String topic) {
this.topic = topic;
}
public Boolean getIsEgress() {
return isEgress;
}
public void setIsEgress(Boolean isEgress) {
this.isEgress = isEgress;
}
public Result.ReplyResult toProto() {
return Result.ReplyResult.newBuilder().setId(id).setNamespace(namespace).setKey(key).setTopic(topic)
.build();
}
public Address toAddress() {
FunctionType functionType = new FunctionType(getNamespace(), getTopic());
Address address = new Address(functionType, getId());
return address;
}
}
public static class KeyNotFoundException extends Exception {
private static final long serialVersionUID = 1L;
public KeyNotFoundException(String key) {
// TODO rename to timeout error
super(String.format("key '%s' not found", key));
}
}
public static class ResultException extends Exception {
private static final long serialVersionUID = 1L;
public ResultException(String reason) {
super(reason);
}
}
}

View File

@ -1,6 +1,6 @@
/* Copyright 2022 Listware */ /* Copyright 2022 Listware */
package org.listware.io.functions.egress; package org.listware.io.functions.result;
import java.util.Properties; import java.util.Properties;

View File

@ -0,0 +1,60 @@
/* Copyright 2022 Listware */
package org.listware.io.grpc;
import java.util.concurrent.TimeUnit;
import org.listware.sdk.pbcmdb.Core.Request;
import org.listware.sdk.pbcmdb.Core.Response;
import org.listware.sdk.pbcmdb.EdgeServiceGrpc;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.protobuf.ByteString;
import org.listware.io.utils.Constants.Cmdb;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
// interface over database, in future will be not only 'arangodb'
public class EdgeClient {
@SuppressWarnings("unused")
private static final Logger LOG = LoggerFactory.getLogger(EdgeClient.class);
private final ManagedChannel channel;
private final EdgeServiceGrpc.EdgeServiceBlockingStub blockingStub;
public EdgeClient() {
this(ManagedChannelBuilder.forAddress(Cmdb.ADDR, Cmdb.PORT).usePlaintext().build());
}
public EdgeClient(ManagedChannel channel) {
this.channel = channel;
blockingStub = EdgeServiceGrpc.newBlockingStub(channel);
}
public void shutdown() throws InterruptedException {
channel.shutdown().awaitTermination(5, TimeUnit.SECONDS);
}
public Response create(String collection, ByteString payload) throws Exception {
Request request = Request.newBuilder().setCollection(collection).setPayload(payload).build();
return blockingStub.create(request);
}
public Response read(String collection, String key) throws Exception {
Request request = Request.newBuilder().setCollection(collection).setKey(key).build();
return blockingStub.read(request);
}
public Response update(String collection, String key, ByteString payload) throws Exception {
Request request = Request.newBuilder().setCollection(collection).setKey(key).setPayload(payload).build();
return blockingStub.update(request);
}
public Response remove(String collection, String key) throws Exception {
Request request = Request.newBuilder().setCollection(collection).setKey(key).build();
return blockingStub.remove(request);
}
}

View File

@ -0,0 +1,59 @@
package org.listware.io.grpc;
import java.util.concurrent.TimeUnit;
import org.listware.io.utils.Constants.Cmdb;
import org.listware.sdk.pbcmdb.pbfinder.Finder;
import org.listware.sdk.pbcmdb.pbfinder.FinderServiceGrpc;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.StatusRuntimeException;
public class FinderClient {
private final ManagedChannel channel;
private final FinderServiceGrpc.FinderServiceBlockingStub blockingStub;
public FinderClient() {
this(ManagedChannelBuilder.forAddress(Cmdb.ADDR, Cmdb.PORT).usePlaintext().build());
}
public FinderClient(ManagedChannel channel) {
this.channel = channel;
blockingStub = FinderServiceGrpc.newBlockingStub(channel);
}
public void shutdown() throws InterruptedException {
channel.shutdown().awaitTermination(5, TimeUnit.SECONDS);
}
public Finder.Response from(String from) throws StatusRuntimeException {
Finder.Request request = Finder.Request.newBuilder().setFrom(from).build();
return blockingStub.links(request);
}
public Finder.Response from(String from, String name) throws StatusRuntimeException {
Finder.Request request = Finder.Request.newBuilder().setFrom(from).setName(name).build();
return blockingStub.links(request);
}
public Finder.Response to(String to) throws StatusRuntimeException {
Finder.Request request = Finder.Request.newBuilder().setTo(to).build();
return blockingStub.links(request);
}
public Finder.Response to(String to, String name) throws StatusRuntimeException {
Finder.Request request = Finder.Request.newBuilder().setTo(to).setName(name).build();
return blockingStub.links(request);
}
public Finder.Response all(String from, String to) throws StatusRuntimeException {
Finder.Request request = Finder.Request.newBuilder().setFrom(from).setTo(to).build();
return blockingStub.links(request);
}
public Finder.Response all(String from, String to, String name) throws StatusRuntimeException {
Finder.Request request = Finder.Request.newBuilder().setFrom(from).setTo(to).setName(name).build();
return blockingStub.links(request);
}
}

View File

@ -1,6 +1,6 @@
/* Copyright 2022 Listware */ /* Copyright 2022 Listware */
package org.listware.io.utils; package org.listware.io.grpc;
import io.grpc.ManagedChannel; import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder; import io.grpc.ManagedChannelBuilder;
@ -8,16 +8,16 @@ import io.grpc.StatusRuntimeException;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import org.listware.io.utils.Constants.Cmdb;
import org.listware.sdk.pbcmdb.pbqdsl.QDSL; import org.listware.sdk.pbcmdb.pbqdsl.QDSL;
import org.listware.sdk.pbcmdb.pbqdsl.QdslServiceGrpc; import org.listware.sdk.pbcmdb.pbqdsl.QdslServiceGrpc;
import org.listware.io.utils.Constants.Cmdb.Qdsl;
public class QDSLClient { public class QDSLClient {
private final ManagedChannel channel; private final ManagedChannel channel;
private final QdslServiceGrpc.QdslServiceBlockingStub blockingStub; private final QdslServiceGrpc.QdslServiceBlockingStub blockingStub;
public QDSLClient() { public QDSLClient() {
this(ManagedChannelBuilder.forAddress(Qdsl.ADDR, Qdsl.PORT).usePlaintext().build()); this(ManagedChannelBuilder.forAddress(Cmdb.ADDR, Cmdb.PORT).usePlaintext().build());
} }
public QDSLClient(ManagedChannel channel) { public QDSLClient(ManagedChannel channel) {

View File

@ -1,28 +1,32 @@
/* Copyright 2022 Listware */ /* Copyright 2022 Listware */
package org.listware.io.utils; package org.listware.io.grpc;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import org.listware.sdk.pbcmdb.Core.Request; import org.listware.sdk.pbcmdb.Core.Request;
import org.listware.sdk.pbcmdb.Core.Response; import org.listware.sdk.pbcmdb.Core.Response;
import org.listware.io.utils.Constants.Cmdb.Qdsl;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import com.google.protobuf.ByteString;
import org.listware.io.utils.Constants.Cmdb;
import org.listware.sdk.pbcmdb.VertexServiceGrpc; import org.listware.sdk.pbcmdb.VertexServiceGrpc;
import io.grpc.ManagedChannel; import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder; import io.grpc.ManagedChannelBuilder;
import io.grpc.StatusRuntimeException;
// interface over database, in future will be not only 'arangodb'
public class VertexClient { public class VertexClient {
@SuppressWarnings("unused")
private static final Logger LOG = LoggerFactory.getLogger(VertexClient.class); private static final Logger LOG = LoggerFactory.getLogger(VertexClient.class);
private final ManagedChannel channel; private final ManagedChannel channel;
private final VertexServiceGrpc.VertexServiceBlockingStub blockingStub; private final VertexServiceGrpc.VertexServiceBlockingStub blockingStub;
public VertexClient() { public VertexClient() {
this(ManagedChannelBuilder.forAddress(Qdsl.ADDR, Qdsl.PORT).usePlaintext().build()); this(ManagedChannelBuilder.forAddress(Cmdb.ADDR, Cmdb.PORT).usePlaintext().build());
} }
public VertexClient(ManagedChannel channel) { public VertexClient(ManagedChannel channel) {
@ -34,14 +38,22 @@ public class VertexClient {
channel.shutdown().awaitTermination(5, TimeUnit.SECONDS); channel.shutdown().awaitTermination(5, TimeUnit.SECONDS);
} }
public Response read(String key, String collection) throws StatusRuntimeException { public Response create(String collection, ByteString payload) throws Exception {
LOG.info("read " + key + " from " + collection); Request request = Request.newBuilder().setCollection(collection).setPayload(payload).build();
Request request = Request.newBuilder().setCollection(collection).setKey(key).build(); return blockingStub.create(request);
Response resp = blockingStub.read(request);
return resp;
} }
public Response remove(String key, String collection) throws StatusRuntimeException { public Response read(String collection, String key) throws Exception {
Request request = Request.newBuilder().setCollection(collection).setKey(key).build();
return blockingStub.read(request);
}
public Response update(String collection, String key, ByteString payload) throws Exception {
Request request = Request.newBuilder().setCollection(collection).setKey(key).setPayload(payload).build();
return blockingStub.update(request);
}
public Response remove(String collection, String key) throws Exception {
Request request = Request.newBuilder().setCollection(collection).setKey(key).build(); Request request = Request.newBuilder().setCollection(collection).setKey(key).build();
return blockingStub.remove(request); return blockingStub.remove(request);
} }

View File

@ -9,9 +9,9 @@ import org.apache.flink.statefun.sdk.io.Router;
import org.apache.flink.statefun.sdk.kafka.KafkaIngressBuilder; import org.apache.flink.statefun.sdk.kafka.KafkaIngressBuilder;
import org.apache.flink.statefun.sdk.reqreply.generated.TypedValue; import org.apache.flink.statefun.sdk.reqreply.generated.TypedValue;
import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.listware.sdk.Functions.FunctionContext;
import org.listware.io.utils.Constants; import org.listware.io.utils.Constants;
import org.listware.io.utils.TypedValueDeserializer; import org.listware.io.utils.TypedValueDeserializer;
import org.listware.sdk.Functions.FunctionContext;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;

View File

@ -7,7 +7,7 @@ public class Constants {
// Internal java functions namespace // Internal java functions namespace
public static final String INTERNAL = "system"; public static final String INTERNAL = "system";
// External proxy functions namespace // External proxy functions namespace
public static final String EXTERNAL = "proxy"; public static final String EXTERNAL = "proxy.listware";
} }
public class Kafka { public class Kafka {
@ -15,17 +15,8 @@ public class Constants {
} }
public class Cmdb { public class Cmdb {
// FIXME from secrets public static final String ADDR = "cmdb";
public static final String ADDR = "cmdb.service.consul"; public static final int PORT = 31415;
public static final int PORT = 8529;
public static final String USER = "root";
public static final String PASSWORD = "password";
public static final String DBNAME = "CMDBv2";
public class Qdsl {
public static final String ADDR = "qdsl.cmdb.service.consul";
public static final int PORT = 31415;
}
} }
} }

View File

@ -7,7 +7,7 @@ import java.nio.charset.StandardCharsets;
import org.apache.flink.statefun.sdk.kafka.KafkaEgressSerializer; import org.apache.flink.statefun.sdk.kafka.KafkaEgressSerializer;
import org.apache.flink.statefun.sdk.reqreply.generated.TypedValue; import org.apache.flink.statefun.sdk.reqreply.generated.TypedValue;
import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.ProducerRecord;
import org.listware.sdk.Functions; import org.listware.sdk.Result;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -21,13 +21,13 @@ public class KafkaEgressTypedValueSerializer implements KafkaEgressSerializer<Ty
@Override @Override
public ProducerRecord<byte[], byte[]> serialize(TypedValue message) { public ProducerRecord<byte[], byte[]> serialize(TypedValue message) {
try { try {
Functions.FunctionResult functionResult = Functions.FunctionResult.parseFrom(message.getValue()); Result.FunctionResult functionResult = Result.FunctionResult.parseFrom(message.getValue());
Functions.ReplyEgress replyEgress = functionResult.getReplyEgress(); Result.ReplyResult replyEgress = functionResult.getReplyEgress();
String topic = replyEgress.getTopic(); String topic = replyEgress.getTopic();
byte[] key = replyEgress.getId().getBytes(StandardCharsets.UTF_8); byte[] key = replyEgress.getKey().getBytes(StandardCharsets.UTF_8);
byte[] value = message.toByteArray(); byte[] value = message.toByteArray();
return new ProducerRecord<byte[], byte[]>(topic, key, value); return new ProducerRecord<byte[], byte[]>(topic, key, value);