SUPPORT-8413: add kafka config and producer; add ervu data controller; add new properties to standalone

This commit is contained in:
gulnaz 2024-08-15 13:21:45 +03:00
parent 38f2b4eab1
commit 716c6f8c3a
5 changed files with 218 additions and 2 deletions

View file

@ -0,0 +1,89 @@
package ervu_lkrp_fl.ervu_lkrp_fl.config;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.serialization.BytesDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.Bytes;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.support.serializer.JsonSerializer;
/**
* @author ya.kuznetsova
*/
@Configuration
@EnableKafka
public class ErvuKafkaConfig {
@Value("${kafka.ervu.send.url}")
private String kafkaUrl;
@Value("${kafka.ervu.security.protocol}")
private String securityProtocol;
@Value("${kafka.ervu.doc.login.module}")
private String loginModule;
@Value("${kafka.ervu.username}")
private String username;
@Value("${kafka.ervu.password}")
private String password;
@Value("${kafka.ervu.sasl.mechanism}")
private String saslMechanism;
@Bean
public Map<String, Object> producerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, this.kafkaUrl);
props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, securityProtocol);
props.put(SaslConfigs.SASL_JAAS_CONFIG, loginModule + " required username=\""
+ username + "\" password=\"" + password + "\";");
props.put(SaslConfigs.SASL_MECHANISM, saslMechanism);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
return props;
}
@Bean
public ProducerFactory<String, Object> producerFactory() {
return new DefaultKafkaProducerFactory<>(producerConfigs());
}
@Bean
public KafkaTemplate<String, Object> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
@Bean
public ConsumerFactory<String, Bytes> consumerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, this.kafkaUrl);
props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, securityProtocol);
props.put(SaslConfigs.SASL_JAAS_CONFIG, loginModule + " required username=\""
+ username + "\" password=\"" + password + "\";");
props.put(SaslConfigs.SASL_MECHANISM, saslMechanism);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, BytesDeserializer.class);
return new DefaultKafkaConsumerFactory<>(props);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, Bytes> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, Bytes> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
return factory;
}
}

View file

@ -0,0 +1,73 @@
package ervu_lkrp_fl.ervu_lkrp_fl.controller;
import java.time.Duration;
import java.util.Collections;
import java.util.concurrent.atomic.AtomicReference;
import com.google.protobuf.InvalidProtocolBufferException;
import ervu_lkrp_fl.ervu_lkrp_fl.converter.SummonsResponseDataConverter;
import ervu_lkrp_fl.ervu_lkrp_fl.dto.PersonRequestDto;
import ervu_lkrp_fl.ervu_lkrp_fl.dto.SubpoenaDto;
import ervu_lkrp_fl.ervu_lkrp_fl.service.KafkaProducerService;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.utils.Bytes;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.http.MediaType;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RestController;
import proto.ervu.rp.summons.SummonsResponseData;
/**
* @author gulnaz
*/
@RestController
public class ErvuDataController {
private final KafkaProducerService kafkaProducerService;
private final ConsumerFactory<String, Bytes> consumerFactory;
private final SummonsResponseDataConverter converter;
@Value("${kafka.ervu.recruit.reply.topic}")
private String replyTopic;
public ErvuDataController(KafkaProducerService kafkaProducerService,
ConsumerFactory<String, Bytes> consumerFactory,
SummonsResponseDataConverter converter) {
this.kafkaProducerService = kafkaProducerService;
this.consumerFactory = consumerFactory;
this.converter = converter;
}
@PostMapping(
value = "/get-data",
consumes = MediaType.APPLICATION_JSON_VALUE,
produces = MediaType.APPLICATION_JSON_VALUE
)
public SubpoenaDto getData(@RequestBody PersonRequestDto personRequestDto) {
//TODO replace on interaction via ReplyingKafkaTemplate
kafkaProducerService.sendRequest(personRequestDto);
AtomicReference<SubpoenaDto> subpoenaDto = new AtomicReference<>();
try (Consumer<String, Bytes> consumer =
consumerFactory.createConsumer("fl-recruit", null)) {
consumer.subscribe(Collections.singletonList(replyTopic));
ConsumerRecords<String, Bytes> consumerRecords = consumer.poll(Duration.ofSeconds(10));
consumerRecords.forEach(record -> {
try {
SummonsResponseData responseData = SummonsResponseData.parseFrom(record.value().get());
subpoenaDto.set(converter.convert(responseData));
}
catch (InvalidProtocolBufferException e) {
throw new RuntimeException("Failed to parse data", e);
}
});
consumer.commitSync();
}
return subpoenaDto.get();
}
}

View file

@ -0,0 +1,36 @@
package ervu_lkrp_fl.ervu_lkrp_fl.service;
import java.nio.charset.StandardCharsets;
import java.util.UUID;
import ervu_lkrp_fl.ervu_lkrp_fl.dto.PersonRequestDto;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
/**
* @author ya.kuznetsova
*/
@Service
public class KafkaProducerService {
private final KafkaTemplate<String, Object> kafkaTemplate;
@Value("${kafka.ervu.recruit.request.topic}")
private String requestTopic;
@Value("${kafka.ervu.recruit.header.class}")
private String headerClass;
public KafkaProducerService(KafkaTemplate<String, Object> kafkaTemplate) {
this.kafkaTemplate = kafkaTemplate;
}
public void sendRequest(PersonRequestDto request) {
ProducerRecord<String, Object> record = new ProducerRecord<>(requestTopic,
UUID.randomUUID().toString(), request);
record.headers().add("class", headerClass.getBytes(StandardCharsets.UTF_8));
kafkaTemplate.send(record);
}
}

View file

@ -61,4 +61,13 @@ xa-data-source add \
/system-property=webbpm.security.session.active.count:add(value="20") /system-property=webbpm.security.session.active.count:add(value="20")
/system-property=gar.enable:add(value=false) /system-property=gar.enable:add(value=false)
/system-property=security.password.regex:add(value="^((?=(.*\\d){1,})(?=.*[a-zа-яё])(?=.*[A-ZА-ЯЁ]).{8,})$") /system-property=security.password.regex:add(value="^((?=(.*\\d){1,})(?=.*[a-zа-яё])(?=.*[A-ZА-ЯЁ]).{8,})$")
/system-property=fias.enable:add(value=false) /system-property=fias.enable:add(value=false)
/system-property=kafka.ervu.send.url:add(value="http://10.10.31.11:32609")
/system-property=kafka.ervu.security.protocol:add(value="SASL_PLAINTEXT")
/system-property=kafka.ervu.doc.login.module:add(value="org.apache.kafka.common.security.scram.ScramLoginModule")
/system-property=kafka.ervu.sasl.mechanism:add(value="SCRAM-SHA-256")
/system-property=kafka.ervu.username:add(value="user1")
/system-property=kafka.ervu.password:add(value="Blfi9d2OFG")
/system-property=kafka.ervu.recruit.request.topic:add(value="mock.ervu.recruit.info.request")
/system-property=kafka.ervu.recruit.reply.topic:add(value="ervu.recruit.info.response")
/system-property=kafka.ervu.recruit.header.class:add(value="Request@urn://rostelekom.ru/RP-SummonsTR/1.0.5")

View file

@ -54,6 +54,15 @@
<property name="security.password.regex" value="^((?=(.*\d){1,})(?=.*[a-zа-яё])(?=.*[A-ZА-ЯЁ]).{8,})$"/> <property name="security.password.regex" value="^((?=(.*\d){1,})(?=.*[a-zа-яё])(?=.*[A-ZА-ЯЁ]).{8,})$"/>
<property name="fias.enable" value="false"/> <property name="fias.enable" value="false"/>
<property name="com.arjuna.ats.arjuna.allowMultipleLastResources" value="true"/> <property name="com.arjuna.ats.arjuna.allowMultipleLastResources" value="true"/>
<property name="kafka.ervu.send.url" value="http://10.10.31.11:32609"/>
<property name="kafka.ervu.security.protocol" value="SASL_PLAINTEXT"/>
<property name="kafka.ervu.doc.login.module" value="org.apache.kafka.common.security.scram.ScramLoginModule"/>
<property name="kafka.ervu.sasl.mechanism" value="SCRAM-SHA-256"/>
<property name="kafka.ervu.username" value="user1"/>
<property name="kafka.ervu.password" value="Blfi9d2OFG"/>
<property name="kafka.ervu.recruit.request.topic" value="mock.ervu.recruit.info.request"/>
<property name="kafka.ervu.recruit.reply.topic" value="ervu.recruit.info.response"/>
<property name="kafka.ervu.recruit.header.class" value="Request@urn://rostelekom.ru/RP-SummonsTR/1.0.5"/>
</system-properties> </system-properties>
<management> <management>
<audit-log> <audit-log>
@ -557,4 +566,4 @@
<remote-destination host="${jboss.mail.server.host:localhost}" port="${jboss.mail.server.port:25}"/> <remote-destination host="${jboss.mail.server.host:localhost}" port="${jboss.mail.server.port:25}"/>
</outbound-socket-binding> </outbound-socket-binding>
</socket-binding-group> </socket-binding-group>
</server> </server>