package ervu_lkrp_fl.ervu_lkrp_fl.config;

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.serialization.BytesDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.Bytes;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.support.serializer.JsonSerializer;

/**
 * Kafka configuration for the ERVU integration: a JSON-serializing producer
 * (String key / JSON value) and a raw-bytes consumer (String key / {@link Bytes}
 * value), both authenticated via SASL using externally supplied properties.
 *
 * <p>All connection settings come from {@code kafka.ervu.*} system properties
 * (see config/patches/default.cli).
 *
 * @author ya.kuznetsova
 */
@Configuration
@EnableKafka
public class ErvuKafkaConfig {

    @Value("${kafka.ervu.send.url}")
    private String kafkaUrl;
    @Value("${kafka.ervu.security.protocol}")
    private String securityProtocol;
    @Value("${kafka.ervu.doc.login.module}")
    private String loginModule;
    @Value("${kafka.ervu.username}")
    private String username;
    @Value("${kafka.ervu.password}")
    private String password;
    @Value("${kafka.ervu.sasl.mechanism}")
    private String saslMechanism;

    /**
     * Connection and SASL settings shared by producer and consumer.
     * Extracted to avoid duplicating the JAAS string in two places.
     * NOTE: the JAAS config embeds the password — never log this map.
     */
    private Map<String, Object> commonSecurityProps() {
        Map<String, Object> props = new HashMap<>();
        props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, securityProtocol);
        props.put(SaslConfigs.SASL_JAAS_CONFIG, loginModule + " required username=\""
                + username + "\" password=\"" + password + "\";");
        props.put(SaslConfigs.SASL_MECHANISM, saslMechanism);
        return props;
    }

    /**
     * Producer properties: String keys, JSON-serialized values.
     *
     * @return mutable map of producer client configuration
     */
    @Bean
    public Map<String, Object> producerConfigs() {
        Map<String, Object> props = commonSecurityProps();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, this.kafkaUrl);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        return props;
    }

    /** Producer factory backing {@link #kafkaTemplate()}. */
    @Bean
    public ProducerFactory<String, Object> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }

    /** Template used by KafkaProducerService to publish ERVU requests. */
    @Bean
    public KafkaTemplate<String, Object> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }

    /**
     * Consumer factory producing consumers that deserialize values as raw
     * {@link Bytes}; payloads are decoded downstream (protobuf parsing in
     * ErvuDataController).
     */
    @Bean
    public ConsumerFactory<String, Bytes> consumerFactory() {
        Map<String, Object> props = commonSecurityProps();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, this.kafkaUrl);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, BytesDeserializer.class);
        return new DefaultKafkaConsumerFactory<>(props);
    }

    /** Listener container factory for @KafkaListener methods, if any are added. */
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Bytes> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, Bytes> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }
}
org.springframework.web.bind.annotation.RestController; +import proto.ervu.rp.summons.SummonsResponseData; + +/** + * @author gulnaz + */ +@RestController +public class ErvuDataController { + + private final KafkaProducerService kafkaProducerService; + private final ConsumerFactory consumerFactory; + private final SummonsResponseDataConverter converter; + + @Value("${kafka.ervu.recruit.reply.topic}") + private String replyTopic; + + public ErvuDataController(KafkaProducerService kafkaProducerService, + ConsumerFactory consumerFactory, + SummonsResponseDataConverter converter) { + this.kafkaProducerService = kafkaProducerService; + this.consumerFactory = consumerFactory; + this.converter = converter; + } + + @PostMapping( + value = "/get-data", + consumes = MediaType.APPLICATION_JSON_VALUE, + produces = MediaType.APPLICATION_JSON_VALUE + ) + public SubpoenaDto getData(@RequestBody PersonRequestDto personRequestDto) { + //TODO replace on interaction via ReplyingKafkaTemplate + kafkaProducerService.sendRequest(personRequestDto); + AtomicReference subpoenaDto = new AtomicReference<>(); + + try (Consumer consumer = + consumerFactory.createConsumer("fl-recruit", null)) { + consumer.subscribe(Collections.singletonList(replyTopic)); + ConsumerRecords consumerRecords = consumer.poll(Duration.ofSeconds(10)); + consumerRecords.forEach(record -> { + + try { + SummonsResponseData responseData = SummonsResponseData.parseFrom(record.value().get()); + subpoenaDto.set(converter.convert(responseData)); + } + catch (InvalidProtocolBufferException e) { + throw new RuntimeException("Failed to parse data", e); + } + }); + consumer.commitSync(); + } + + return subpoenaDto.get(); + } +} diff --git a/backend/src/main/java/ervu_lkrp_fl/ervu_lkrp_fl/service/KafkaProducerService.java b/backend/src/main/java/ervu_lkrp_fl/ervu_lkrp_fl/service/KafkaProducerService.java new file mode 100644 index 0000000..6b045ff --- /dev/null +++ 
b/backend/src/main/java/ervu_lkrp_fl/ervu_lkrp_fl/service/KafkaProducerService.java @@ -0,0 +1,36 @@ +package ervu_lkrp_fl.ervu_lkrp_fl.service; + +import java.nio.charset.StandardCharsets; +import java.util.UUID; + +import ervu_lkrp_fl.ervu_lkrp_fl.dto.PersonRequestDto; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.stereotype.Service; + +/** + * @author ya.kuznetsova + */ + +@Service +public class KafkaProducerService { + + private final KafkaTemplate kafkaTemplate; + + @Value("${kafka.ervu.recruit.request.topic}") + private String requestTopic; + @Value("${kafka.ervu.recruit.header.class}") + private String headerClass; + + public KafkaProducerService(KafkaTemplate kafkaTemplate) { + this.kafkaTemplate = kafkaTemplate; + } + + public void sendRequest(PersonRequestDto request) { + ProducerRecord record = new ProducerRecord<>(requestTopic, + UUID.randomUUID().toString(), request); + record.headers().add("class", headerClass.getBytes(StandardCharsets.UTF_8)); + kafkaTemplate.send(record); + } +} diff --git a/config/patches/default.cli b/config/patches/default.cli index a965cb0..8cb8a26 100644 --- a/config/patches/default.cli +++ b/config/patches/default.cli @@ -61,4 +61,13 @@ xa-data-source add \ /system-property=webbpm.security.session.active.count:add(value="20") /system-property=gar.enable:add(value=false) /system-property=security.password.regex:add(value="^((?=(.*\\d){1,})(?=.*[a-zа-яё])(?=.*[A-ZА-ЯЁ]).{8,})$") -/system-property=fias.enable:add(value=false) \ No newline at end of file +/system-property=fias.enable:add(value=false) +/system-property=kafka.ervu.send.url:add(value="http://10.10.31.11:32609") +/system-property=kafka.ervu.security.protocol:add(value="SASL_PLAINTEXT") +/system-property=kafka.ervu.doc.login.module:add(value="org.apache.kafka.common.security.scram.ScramLoginModule") 
+/system-property=kafka.ervu.sasl.mechanism:add(value="SCRAM-SHA-256") +/system-property=kafka.ervu.username:add(value="user1") +/system-property=kafka.ervu.password:add(value="Blfi9d2OFG") +/system-property=kafka.ervu.recruit.request.topic:add(value="mock.ervu.recruit.info.request") +/system-property=kafka.ervu.recruit.reply.topic:add(value="ervu.recruit.info.response") +/system-property=kafka.ervu.recruit.header.class:add(value="Request@urn://rostelekom.ru/RP-SummonsTR/1.0.5") diff --git a/config/standalone/dev/standalone.xml b/config/standalone/dev/standalone.xml index e3b7d41..8d80bdd 100644 --- a/config/standalone/dev/standalone.xml +++ b/config/standalone/dev/standalone.xml @@ -54,6 +54,15 @@ + + + + + + + + + @@ -557,4 +566,4 @@ - \ No newline at end of file +