SUPPORT-8413: comment interaction with ReplyingKafkaTemplate; fix requests
This commit is contained in:
parent
5e69cead4b
commit
ee597574d1
10 changed files with 289 additions and 195 deletions
|
|
@ -1,22 +1,12 @@
|
|||
package ru.micord.ervu.controller;
|
||||
|
||||
import java.time.Duration;
|
||||
import java.util.Collections;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
|
||||
import com.google.protobuf.InvalidProtocolBufferException;
|
||||
import ru.micord.ervu.converter.SummonsResponseDataConverter;
|
||||
import ru.micord.ervu.dto.PersonRequestDto;
|
||||
import ru.micord.ervu.dto.SubpoenaDto;
|
||||
import ru.micord.ervu.service.KafkaProducerService;
|
||||
import org.apache.kafka.clients.consumer.Consumer;
|
||||
import org.apache.kafka.clients.consumer.ConsumerRecords;
|
||||
import org.apache.kafka.common.utils.Bytes;
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
import ru.micord.ervu.service.ReplyingKafkaService;
|
||||
import org.springframework.http.MediaType;
|
||||
import org.springframework.kafka.core.ConsumerFactory;
|
||||
import org.springframework.web.bind.annotation.PostMapping;
|
||||
import org.springframework.web.bind.annotation.RequestBody;
|
||||
import org.springframework.web.bind.annotation.RestController;
|
||||
import proto.ervu.rp.summons.SummonsResponseData;
|
||||
|
||||
|
|
@ -26,52 +16,30 @@ import proto.ervu.rp.summons.SummonsResponseData;
|
|||
@RestController
|
||||
public class ErvuDataController {
|
||||
|
||||
private final KafkaProducerService kafkaProducerService;
|
||||
private final ConsumerFactory<String, Bytes> consumerFactory;
|
||||
private final ReplyingKafkaService replyingKafkaService;
|
||||
private final SummonsResponseDataConverter converter;
|
||||
|
||||
@Value("${kafka.ervu.recruit.reply.topic}")
|
||||
private String replyTopic;
|
||||
@Value("${kafka.ervu.subpoena.timeout:10}")
|
||||
private int timeout;
|
||||
|
||||
public ErvuDataController(KafkaProducerService kafkaProducerService,
|
||||
ConsumerFactory<String, Bytes> consumerFactory,
|
||||
SummonsResponseDataConverter converter) {
|
||||
this.kafkaProducerService = kafkaProducerService;
|
||||
this.consumerFactory = consumerFactory;
|
||||
public ErvuDataController(ReplyingKafkaService replyingKafkaService,
|
||||
SummonsResponseDataConverter converter) {
|
||||
this.replyingKafkaService = replyingKafkaService;
|
||||
this.converter = converter;
|
||||
}
|
||||
|
||||
@PostMapping(
|
||||
value = "/get-data",
|
||||
consumes = MediaType.APPLICATION_JSON_VALUE,
|
||||
produces = MediaType.APPLICATION_JSON_VALUE
|
||||
)
|
||||
public SubpoenaDto getData(@RequestBody PersonRequestDto personRequestDto) {
|
||||
// TODO replace on interaction via ReplyingKafkaTemplate
|
||||
// see feature/SUPPORT-8413_fill_recruit_data_sync
|
||||
// temporally designed for demo
|
||||
kafkaProducerService.sendRequest(personRequestDto);
|
||||
AtomicReference<SubpoenaDto> subpoenaDto = new AtomicReference<>();
|
||||
public SubpoenaDto getData() {
|
||||
//TODO get id from token
|
||||
PersonRequestDto personRequestDto = new PersonRequestDto("6fb62081-7345-4a9d-a1d0-68b46bd8faac");
|
||||
byte[] reply = replyingKafkaService.sendRequestAndGetPersonReply(personRequestDto);
|
||||
|
||||
try (Consumer<String, Bytes> consumer =
|
||||
consumerFactory.createConsumer("fl-recruit", null)) {
|
||||
consumer.subscribe(Collections.singletonList(replyTopic));
|
||||
ConsumerRecords<String, Bytes> consumerRecords = consumer.poll(Duration.ofSeconds(timeout));
|
||||
consumerRecords.forEach(record -> {
|
||||
|
||||
try {
|
||||
SummonsResponseData responseData = SummonsResponseData.parseFrom(record.value().get());
|
||||
subpoenaDto.set(converter.convert(responseData));
|
||||
}
|
||||
catch (InvalidProtocolBufferException e) {
|
||||
throw new RuntimeException("Failed to parse data", e);
|
||||
}
|
||||
});
|
||||
consumer.commitSync();
|
||||
try {
|
||||
SummonsResponseData responseData = SummonsResponseData.parseFrom(reply);
|
||||
return converter.convert(responseData);
|
||||
}
|
||||
catch (InvalidProtocolBufferException e) {
|
||||
throw new RuntimeException("Failed to parse data", e);
|
||||
}
|
||||
|
||||
return subpoenaDto.get();
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,4 +1,4 @@
|
|||
package ru.micord.ervu.config;
|
||||
package ru.micord.ervu.kafka;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
|
@ -47,10 +47,10 @@ public class ErvuKafkaConfig {
|
|||
public Map<String, Object> producerConfigs() {
|
||||
Map<String, Object> props = new HashMap<>();
|
||||
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, this.kafkaUrl);
|
||||
props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, securityProtocol);
|
||||
props.put(SaslConfigs.SASL_JAAS_CONFIG, loginModule + " required username=\""
|
||||
+ username + "\" password=\"" + password + "\";");
|
||||
props.put(SaslConfigs.SASL_MECHANISM, saslMechanism);
|
||||
// props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, securityProtocol);
|
||||
// props.put(SaslConfigs.SASL_JAAS_CONFIG, loginModule + " required username=\""
|
||||
// + username + "\" password=\"" + password + "\";");
|
||||
// props.put(SaslConfigs.SASL_MECHANISM, saslMechanism);
|
||||
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
|
||||
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
|
||||
return props;
|
||||
|
|
@ -70,10 +70,10 @@ public class ErvuKafkaConfig {
|
|||
public ConsumerFactory<String, Bytes> consumerFactory() {
|
||||
Map<String, Object> props = new HashMap<>();
|
||||
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, this.kafkaUrl);
|
||||
props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, securityProtocol);
|
||||
props.put(SaslConfigs.SASL_JAAS_CONFIG, loginModule + " required username=\""
|
||||
+ username + "\" password=\"" + password + "\";");
|
||||
props.put(SaslConfigs.SASL_MECHANISM, saslMechanism);
|
||||
// props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, securityProtocol);
|
||||
// props.put(SaslConfigs.SASL_JAAS_CONFIG, loginModule + " required username=\""
|
||||
// + username + "\" password=\"" + password + "\";");
|
||||
// props.put(SaslConfigs.SASL_MECHANISM, saslMechanism);
|
||||
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
|
||||
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, BytesDeserializer.class);
|
||||
return new DefaultKafkaConsumerFactory<>(props);
|
||||
|
|
@ -0,0 +1,115 @@
|
|||
package ru.micord.ervu.kafka;
|
||||
|
||||
import java.time.Duration;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
||||
import org.apache.kafka.clients.consumer.ConsumerConfig;
|
||||
import org.apache.kafka.clients.producer.ProducerConfig;
|
||||
import org.apache.kafka.common.serialization.BytesDeserializer;
|
||||
import org.apache.kafka.common.serialization.StringDeserializer;
|
||||
import org.apache.kafka.common.serialization.StringSerializer;
|
||||
import org.apache.kafka.common.utils.Bytes;
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
import org.springframework.kafka.annotation.EnableKafka;
|
||||
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
|
||||
import org.springframework.kafka.core.ConsumerFactory;
|
||||
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
|
||||
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
|
||||
import org.springframework.kafka.core.KafkaTemplate;
|
||||
import org.springframework.kafka.core.ProducerFactory;
|
||||
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
|
||||
import org.springframework.kafka.requestreply.ReplyingKafkaTemplate;
|
||||
import org.springframework.kafka.support.serializer.JsonSerializer;
|
||||
|
||||
//@Configuration
|
||||
//@EnableKafka
|
||||
public class ReplyingKafkaConfig {
|
||||
|
||||
@Value("${kafka.ervu.send.url}")
|
||||
private String kafkaUrl;
|
||||
@Value("${kafka.ervu.security.protocol}")
|
||||
private String securityProtocol;
|
||||
@Value("${kafka.ervu.doc.login.module}")
|
||||
private String loginModule;
|
||||
@Value("${kafka.ervu.username}")
|
||||
private String username;
|
||||
@Value("${kafka.ervu.password}")
|
||||
private String password;
|
||||
@Value("${kafka.ervu.sasl.mechanism}")
|
||||
private String saslMechanism;
|
||||
|
||||
@Value("${kafka.ervu.recruit.reply.topic}")
|
||||
private String recruitReplyTopic;
|
||||
@Value("${kafka.ervu.subpoena.extract.reply.topic}")
|
||||
private String subpoenaExtractReplyTopic;
|
||||
@Value("${kafka.ervu.registry.extract.reply.topic}")
|
||||
private String registryExtractReplyTopic;
|
||||
|
||||
@Value("${kafka.ervu.group.id:1}")
|
||||
private String groupId;
|
||||
@Value("${ervu-kafka.reply-timeout:10}")
|
||||
private long replyTimeout;
|
||||
|
||||
@Bean
|
||||
public ProducerFactory<String, Object> producerFactory() {
|
||||
Map<String, Object> props = new HashMap<>();
|
||||
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, this.kafkaUrl);
|
||||
// props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, securityProtocol);
|
||||
// props.put(SaslConfigs.SASL_JAAS_CONFIG, loginModule + " required username=\""
|
||||
// + username + "\" password=\"" + password + "\";");
|
||||
// props.put(SaslConfigs.SASL_MECHANISM, saslMechanism);
|
||||
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
|
||||
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
|
||||
return new DefaultKafkaProducerFactory<>(props);
|
||||
}
|
||||
|
||||
@Bean
|
||||
public KafkaTemplate<String, Object> kafkaTemplate() {
|
||||
return new KafkaTemplate<>(producerFactory());
|
||||
}
|
||||
|
||||
@Bean
|
||||
public ConsumerFactory<String, Bytes> consumerFactory() {
|
||||
Map<String, Object> props = new HashMap<>();
|
||||
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, this.kafkaUrl);
|
||||
// props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, securityProtocol);
|
||||
// props.put(SaslConfigs.SASL_JAAS_CONFIG, loginModule + " required username=\""
|
||||
// + username + "\" password=\"" + password + "\";");
|
||||
// props.put(SaslConfigs.SASL_MECHANISM, saslMechanism);
|
||||
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
|
||||
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
|
||||
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, BytesDeserializer.class);
|
||||
return new DefaultKafkaConsumerFactory<>(props);
|
||||
}
|
||||
|
||||
@Bean
|
||||
public ConcurrentKafkaListenerContainerFactory<String, Bytes> kafkaListenerContainerFactory() {
|
||||
ConcurrentKafkaListenerContainerFactory<String, Bytes> factory =
|
||||
new ConcurrentKafkaListenerContainerFactory<>();
|
||||
factory.setConsumerFactory(consumerFactory());
|
||||
return factory;
|
||||
}
|
||||
|
||||
@Bean
|
||||
public ConcurrentMessageListenerContainer<String, Bytes> replyContainer(
|
||||
ConcurrentKafkaListenerContainerFactory<String, Bytes> factory) {
|
||||
ConcurrentMessageListenerContainer<String, Bytes> container = factory.createContainer(
|
||||
recruitReplyTopic, subpoenaExtractReplyTopic, registryExtractReplyTopic);
|
||||
container.getContainerProperties().setGroupId(groupId);
|
||||
return container;
|
||||
}
|
||||
|
||||
@Bean
|
||||
public ReplyingKafkaTemplate<String, Object, Bytes> replyingKafkaTemplate(
|
||||
ProducerFactory<String, Object> pf,
|
||||
ConcurrentMessageListenerContainer<String, Bytes> container) {
|
||||
ReplyingKafkaTemplate<String, Object, Bytes> replyingKafkaTemplate =
|
||||
new ReplyingKafkaTemplate<>(pf, container);
|
||||
replyingKafkaTemplate.setCorrelationHeaderName("messageId");
|
||||
replyingKafkaTemplate.setDefaultReplyTimeout(Duration.ofSeconds(replyTimeout));
|
||||
return replyingKafkaTemplate;
|
||||
}
|
||||
}
|
||||
|
|
@ -1,54 +0,0 @@
|
|||
package ru.micord.ervu.service;
|
||||
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.util.UUID;
|
||||
|
||||
import ru.micord.ervu.dto.PersonRequestDto;
|
||||
import ru.micord.ervu.dto.ExtractRequestDto;
|
||||
import org.apache.kafka.clients.producer.ProducerRecord;
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.kafka.core.KafkaTemplate;
|
||||
import org.springframework.stereotype.Service;
|
||||
|
||||
/**
|
||||
* @author ya.kuznetsova
|
||||
*/
|
||||
|
||||
@Service
|
||||
public class KafkaProducerService {
|
||||
|
||||
private final KafkaTemplate<String, Object> kafkaTemplate;
|
||||
|
||||
@Value("${kafka.ervu.recruit.request.topic}")
|
||||
private String recruitRequestTopic;
|
||||
@Value("${kafka.ervu.recruit.header.class}")
|
||||
private String recruitHeaderClass;
|
||||
@Value("${kafka.ervu.subpoena.extract.request.topic}")
|
||||
private String subpoenaExtractRequestTopic;
|
||||
@Value("${kafka.ervu.registry.extract.request.topic}")
|
||||
private String registryExtractRequestTopic;
|
||||
@Value("${kafka.ervu.extract.header.class}")
|
||||
private String extractHeaderClass;
|
||||
|
||||
public KafkaProducerService(KafkaTemplate<String, Object> kafkaTemplate) {
|
||||
this.kafkaTemplate = kafkaTemplate;
|
||||
}
|
||||
|
||||
public void sendRequest(PersonRequestDto request) {
|
||||
sendRequest(request, recruitRequestTopic, recruitHeaderClass);
|
||||
}
|
||||
|
||||
public void sendRequestForExtract(ExtractRequestDto request) {
|
||||
String topic = request.formatExtractRegistry().equals("1")
|
||||
? subpoenaExtractRequestTopic
|
||||
: registryExtractRequestTopic;
|
||||
sendRequest(request, topic, extractHeaderClass);
|
||||
}
|
||||
|
||||
private void sendRequest(Object request, String topic, String headerClass) {
|
||||
ProducerRecord<String, Object> record = new ProducerRecord<>(topic,
|
||||
UUID.randomUUID().toString(), request);
|
||||
record.headers().add("class", headerClass.getBytes(StandardCharsets.UTF_8));
|
||||
kafkaTemplate.send(record);
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,109 @@
|
|||
package ru.micord.ervu.service;
|
||||
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.time.Duration;
|
||||
import java.util.Collections;
|
||||
import java.util.Optional;
|
||||
import java.util.UUID;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
|
||||
import org.apache.kafka.clients.consumer.Consumer;
|
||||
import org.apache.kafka.clients.consumer.ConsumerRecords;
|
||||
import org.apache.kafka.common.header.internals.RecordHeader;
|
||||
import org.apache.kafka.common.utils.Bytes;
|
||||
import org.springframework.kafka.core.ConsumerFactory;
|
||||
import org.springframework.kafka.core.KafkaTemplate;
|
||||
import org.springframework.kafka.requestreply.ReplyingKafkaTemplate;
|
||||
import org.springframework.kafka.requestreply.RequestReplyFuture;
|
||||
import org.springframework.kafka.support.KafkaHeaders;
|
||||
import ru.micord.ervu.dto.PersonRequestDto;
|
||||
import ru.micord.ervu.dto.ExtractRequestDto;
|
||||
import org.apache.kafka.clients.producer.ProducerRecord;
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.stereotype.Service;
|
||||
|
||||
/**
|
||||
* @author ya.kuznetsova
|
||||
*/
|
||||
|
||||
@Service
|
||||
public class ReplyingKafkaService {
|
||||
|
||||
// private final ReplyingKafkaTemplate<String, Object, Bytes> replyingKafkaTemplate;
|
||||
private final KafkaTemplate<String, Object> kafkaTemplate;
|
||||
private final ConsumerFactory<String, Bytes> consumerFactory;
|
||||
|
||||
@Value("${kafka.ervu.recruit.request.topic}")
|
||||
private String recruitRequestTopic;
|
||||
@Value("${kafka.ervu.recruit.reply.topic}")
|
||||
private String recruitReplyTopic;
|
||||
@Value("${kafka.ervu.recruit.header.class}")
|
||||
private String recruitHeaderClass;
|
||||
|
||||
@Value("${kafka.ervu.subpoena.extract.request.topic}")
|
||||
private String subpoenaExtractRequestTopic;
|
||||
@Value("${kafka.ervu.subpoena.extract.reply.topic}")
|
||||
private String subpoenaExtractReplyTopic;
|
||||
|
||||
@Value("${kafka.ervu.registry.extract.request.topic}")
|
||||
private String registryExtractRequestTopic;
|
||||
@Value("${kafka.ervu.registry.extract.reply.topic}")
|
||||
private String registryExtractReplyTopic;
|
||||
@Value("${kafka.ervu.extract.header.class}")
|
||||
private String extractHeaderClass;
|
||||
|
||||
public ReplyingKafkaService(KafkaTemplate<String, Object> kafkaTemplate,
|
||||
ConsumerFactory<String, Bytes> consumerFactory) {
|
||||
this.kafkaTemplate = kafkaTemplate;
|
||||
this.consumerFactory = consumerFactory;
|
||||
}
|
||||
|
||||
public byte[] sendRequestAndGetPersonReply(PersonRequestDto request) {
|
||||
return sendRequestAndGetPersonReply(request, recruitRequestTopic, recruitReplyTopic, recruitHeaderClass);
|
||||
}
|
||||
|
||||
public byte[] sendRequestAndGetExtractReply(ExtractRequestDto request) {
|
||||
return sendRequestAndGetPersonReply(request,
|
||||
request.formatExtractRegistry().equals("1")
|
||||
? subpoenaExtractRequestTopic
|
||||
: registryExtractRequestTopic,
|
||||
request.formatExtractRegistry().equals("1")
|
||||
? subpoenaExtractReplyTopic
|
||||
: registryExtractReplyTopic,
|
||||
extractHeaderClass);
|
||||
}
|
||||
|
||||
private byte[] sendRequestAndGetPersonReply(Object request, String requestTopic, String replyTopic,
|
||||
String headerClass) {
|
||||
ProducerRecord<String, Object> record = new ProducerRecord<>(requestTopic,
|
||||
UUID.randomUUID().toString(), request);
|
||||
record.headers().add(new RecordHeader(KafkaHeaders.REPLY_TOPIC, replyTopic.getBytes()));
|
||||
record.headers().add("class", headerClass.getBytes(StandardCharsets.UTF_8));
|
||||
//TODO fix No pending reply error (replyingKafkaTemplate not working properly with mock service)
|
||||
// RequestReplyFuture<String, Object, Bytes> replyFuture = replyingKafkaTemplate.sendAndReceive(record);
|
||||
//
|
||||
// try {
|
||||
// return Optional.ofNullable(replyFuture.get())
|
||||
// .map(t -> t.value().get())
|
||||
// .orElseThrow(() -> new RuntimeException("Kafka return result is null"));
|
||||
// }
|
||||
// catch (InterruptedException | ExecutionException e) {
|
||||
// throw new RuntimeException("Failed to get kafka response", e);
|
||||
// }
|
||||
record.headers().add("messageId", UUID.randomUUID().toString().getBytes());
|
||||
kafkaTemplate.send(record);
|
||||
AtomicReference<Bytes> bytesRef = new AtomicReference<>(null);
|
||||
|
||||
try (Consumer<String, Bytes> consumer =
|
||||
consumerFactory.createConsumer("1", null)) {
|
||||
consumer.subscribe(Collections.singletonList(replyTopic));
|
||||
ConsumerRecords<String, Bytes> consumerRecords = consumer.poll(Duration.ofSeconds(10));
|
||||
consumerRecords.forEach(consumerRecord -> bytesRef.set(consumerRecord.value()));
|
||||
consumer.commitSync();
|
||||
}
|
||||
return Optional.ofNullable(bytesRef.get())
|
||||
.map(Bytes::get)
|
||||
.orElseThrow(() -> new RuntimeException("Kafka return result is null"));
|
||||
}
|
||||
}
|
||||
|
|
@ -1,22 +1,13 @@
|
|||
package ru.micord.ervu.service.rpc;
|
||||
|
||||
import java.time.Duration;
|
||||
import java.util.Collections;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
|
||||
import com.google.protobuf.InvalidProtocolBufferException;
|
||||
import ru.micord.ervu.dto.FileData;
|
||||
import ru.micord.ervu.dto.ExtractRequestDto;
|
||||
import ru.micord.ervu.service.KafkaProducerService;
|
||||
import org.apache.kafka.clients.consumer.Consumer;
|
||||
import org.apache.kafka.clients.consumer.ConsumerRecords;
|
||||
import org.apache.kafka.common.utils.Bytes;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.kafka.core.ConsumerFactory;
|
||||
import ru.micord.ervu.service.ReplyingKafkaService;
|
||||
import rtl.pgs.ervu.proto.ExtractRegistry;
|
||||
import rtl.pgs.ervu.proto.ResponseData;
|
||||
|
||||
import ru.cg.webbpm.modules.standard_annotations.validation.NotNull;
|
||||
import ru.cg.webbpm.modules.webkit.annotations.RpcCall;
|
||||
import ru.cg.webbpm.modules.webkit.annotations.RpcService;
|
||||
import ru.cg.webbpm.modules.webkit.beans.Behavior;
|
||||
|
|
@ -27,49 +18,31 @@ import ru.cg.webbpm.modules.webkit.beans.Behavior;
|
|||
@RpcService
|
||||
public class ExtractRpcService extends Behavior {
|
||||
|
||||
@Value("${kafka.ervu.subpoena.extract.reply.topic}")
|
||||
private String subpoenaExtractReplyTopic;
|
||||
@Value("${kafka.ervu.registry.extract.reply.topic}")
|
||||
private String registryExtractReplyTopic;
|
||||
@Value("${kafka.ervu.extract.timeout:20}")
|
||||
private int timeout;
|
||||
@NotNull()
|
||||
public String formatExtractRegistry;
|
||||
|
||||
private final Consumer<String, Bytes> consumer;
|
||||
private final KafkaProducerService kafkaProducerService;
|
||||
private final ReplyingKafkaService replyingKafkaService;
|
||||
|
||||
@Autowired
|
||||
public ExtractRpcService(ConsumerFactory<String, Bytes> consumerFactory,
|
||||
KafkaProducerService kafkaProducerService) {
|
||||
this.consumer = consumerFactory.createConsumer("fl-extract", null);
|
||||
this.kafkaProducerService = kafkaProducerService;
|
||||
public ExtractRpcService(ReplyingKafkaService replyingKafkaService) {
|
||||
this.replyingKafkaService = replyingKafkaService;
|
||||
}
|
||||
|
||||
@RpcCall
|
||||
public FileData getExtract(ExtractRequestDto request) {
|
||||
kafkaProducerService.sendRequestForExtract(request);
|
||||
AtomicReference<FileData> fileDataRef = new AtomicReference<>();
|
||||
public FileData getExtract() {
|
||||
//TODO get id from token
|
||||
ExtractRequestDto request = new ExtractRequestDto("6fb62081-7345-4a9d-a1d0-68b46bd8faac",
|
||||
formatExtractRegistry);
|
||||
byte[] reply = replyingKafkaService.sendRequestAndGetExtractReply(request);
|
||||
|
||||
String topic = request.formatExtractRegistry().equals("1")
|
||||
? subpoenaExtractReplyTopic
|
||||
: registryExtractReplyTopic;
|
||||
consumer.subscribe(Collections.singletonList(topic));
|
||||
ConsumerRecords<String, Bytes> consumerRecords = consumer.poll(Duration.ofSeconds(timeout));
|
||||
consumerRecords.forEach(record -> {
|
||||
|
||||
try {
|
||||
ResponseData responseData = ResponseData.parseFrom(record.value().get());
|
||||
ExtractRegistry extractRegistry = responseData.getDataRegistryInformation()
|
||||
.getExtractRegistry();
|
||||
fileDataRef.set(new FileData(extractRegistry.getFileName(), extractRegistry.getFileType(),
|
||||
extractRegistry.getFile().toByteArray()
|
||||
));
|
||||
}
|
||||
catch (InvalidProtocolBufferException e) {
|
||||
throw new RuntimeException("Failed to parse data", e);
|
||||
}
|
||||
});
|
||||
consumer.commitSync();
|
||||
|
||||
return fileDataRef.get();
|
||||
try {
|
||||
ResponseData responseData = ResponseData.parseFrom(reply);
|
||||
ExtractRegistry extractRegistry = responseData.getDataRegistryInformation()
|
||||
.getExtractRegistry();
|
||||
return new FileData(extractRegistry.getFileName(), extractRegistry.getFileType(),
|
||||
extractRegistry.getFile().toByteArray());
|
||||
}
|
||||
catch (InvalidProtocolBufferException e) {
|
||||
throw new RuntimeException("Failed to parse data", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -3,17 +3,10 @@ import {
|
|||
AnalyticalScope,
|
||||
Behavior,
|
||||
Event,
|
||||
NotNull,
|
||||
Visible
|
||||
} from "@webbpm/base-package";
|
||||
import {ExtractRpcService} from "../../../generated/ru/micord/ervu/service/rpc/ExtractRpcService";
|
||||
|
||||
// TODO remove; replace REQUEST_JSON with user id from token
|
||||
const REQUEST_JSON = {
|
||||
"ervuId": "6fb62081-7345-4a9d-a1d0-68b46bd8faac",
|
||||
"formatExtractRegistry": "1"
|
||||
}
|
||||
|
||||
@AnalyticalScope(AbstractButton)
|
||||
export class ExtractLoadService extends Behavior {
|
||||
|
||||
|
|
@ -21,8 +14,6 @@ export class ExtractLoadService extends Behavior {
|
|||
public successEvent: Event<boolean> = new Event<boolean>();
|
||||
@Visible("false")
|
||||
public errorEvent: Event<boolean> = new Event<boolean>();
|
||||
@NotNull()
|
||||
public formatExtractRegistry: string;
|
||||
|
||||
private button: AbstractButton;
|
||||
private rpc: ExtractRpcService;
|
||||
|
|
@ -33,8 +24,7 @@ export class ExtractLoadService extends Behavior {
|
|||
this.button = this.getScript(AbstractButton);
|
||||
this.rpc = this.getScript(ExtractRpcService);
|
||||
this.onClickFunction = () => {
|
||||
REQUEST_JSON.formatExtractRegistry = this.formatExtractRegistry;
|
||||
this.rpc.getExtract(REQUEST_JSON)
|
||||
this.rpc.getExtract()
|
||||
.then(fileData => {
|
||||
const newBlob = new Blob([fileData['file']],
|
||||
{ type: fileData['fileType'] });
|
||||
|
|
|
|||
|
|
@ -2,11 +2,6 @@ import {Injectable} from "@angular/core";
|
|||
import {BehaviorSubject} from "rxjs";
|
||||
import {HttpClient} from "@angular/common/http";
|
||||
|
||||
// TODO remove
|
||||
const REQUEST_JSON = {
|
||||
"ervuId": "6fb62081-7345-4a9d-a1d0-68b46bd8faac"
|
||||
}
|
||||
|
||||
@Injectable({providedIn:'root'})
|
||||
export class ErvuDataService {
|
||||
|
||||
|
|
@ -17,8 +12,7 @@ export class ErvuDataService {
|
|||
|
||||
public getData(): any {
|
||||
this.httpClient
|
||||
//TODO replace REQUEST_JSON with user id from token
|
||||
.post("get-data", REQUEST_JSON,
|
||||
.post("get-data", null,
|
||||
{
|
||||
headers: {
|
||||
"Content-type": "application/json"
|
||||
|
|
|
|||
2
pom.xml
2
pom.xml
|
|
@ -16,7 +16,7 @@
|
|||
</scm>
|
||||
<properties>
|
||||
<spring-security-kerberos.version>1.0.1.RELEASE</spring-security-kerberos.version>
|
||||
<spring-kafka.version>2.6.1</spring-kafka.version>
|
||||
<spring-kafka.version>2.6.13</spring-kafka.version>
|
||||
<org.bouncycastle.version>1.60</org.bouncycastle.version>
|
||||
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
|
||||
<enable.version.in.url>false</enable.version.in.url>
|
||||
|
|
|
|||
|
|
@ -1897,14 +1897,6 @@
|
|||
</classRef>
|
||||
<enabled>true</enabled>
|
||||
<expanded>true</expanded>
|
||||
<properties>
|
||||
<entry>
|
||||
<key>formatExtractRegistry</key>
|
||||
<value>
|
||||
<simple>"1"</simple>
|
||||
</value>
|
||||
</entry>
|
||||
</properties>
|
||||
</scripts>
|
||||
<scripts id="3380990f-3be0-442d-af4e-d31c8da7d39e">
|
||||
<classRef type="JAVA">
|
||||
|
|
@ -1913,6 +1905,14 @@
|
|||
</classRef>
|
||||
<enabled>true</enabled>
|
||||
<expanded>true</expanded>
|
||||
<properties>
|
||||
<entry>
|
||||
<key>formatExtractRegistry</key>
|
||||
<value>
|
||||
<simple>"1"</simple>
|
||||
</value>
|
||||
</entry>
|
||||
</properties>
|
||||
</scripts>
|
||||
</children>
|
||||
<children id="d671da65-59d4-4b34-8850-2e3039a502e5">
|
||||
|
|
@ -4043,7 +4043,6 @@
|
|||
<componentRootId>304824d5-9f9f-4af9-9b08-6232f7536774</componentRootId>
|
||||
<name>FS - 1.1.3 (Воинский учёт)</name>
|
||||
<container>true</container>
|
||||
<expanded>false</expanded>
|
||||
<childrenReordered>false</childrenReordered>
|
||||
<scripts id="46f20297-81d1-4786-bb17-2a78ca6fda6f">
|
||||
<properties>
|
||||
|
|
@ -5000,14 +4999,6 @@
|
|||
</classRef>
|
||||
<enabled>true</enabled>
|
||||
<expanded>true</expanded>
|
||||
<properties>
|
||||
<entry>
|
||||
<key>formatExtractRegistry</key>
|
||||
<value>
|
||||
<simple>"2"</simple>
|
||||
</value>
|
||||
</entry>
|
||||
</properties>
|
||||
</scripts>
|
||||
<scripts id="b821375f-f5a9-4a85-8b0a-2fff5e2da2a5">
|
||||
<classRef type="JAVA">
|
||||
|
|
@ -5016,6 +5007,14 @@
|
|||
</classRef>
|
||||
<enabled>true</enabled>
|
||||
<expanded>true</expanded>
|
||||
<properties>
|
||||
<entry>
|
||||
<key>formatExtractRegistry</key>
|
||||
<value>
|
||||
<simple>"2"</simple>
|
||||
</value>
|
||||
</entry>
|
||||
</properties>
|
||||
</scripts>
|
||||
</children>
|
||||
<children id="f10c801c-e47e-4383-87fd-cce707956c3b">
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue