Merge branch 'feature/SUPPORT-8591_use_replying_kafka_template' into develop

This commit is contained in:
gulnaz 2024-10-11 11:32:17 +03:00
commit 10b1e7f478
9 changed files with 54 additions and 235 deletions

View file

@ -1,33 +0,0 @@
package ru.micord.ervu.kafka;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.UUID;
import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
/**
* @author gulnaz
*/
public class ErvuProducerInterceptor implements ProducerInterceptor<String, String> {
@Override
public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
if (record.topic().startsWith("ervu")) {
record.headers().remove("messageId");
record.headers().add("messageId", UUID.randomUUID().toString().getBytes(StandardCharsets.UTF_8));
}
return record;
}
@Override
public void onAcknowledgement(RecordMetadata recordMetadata, Exception e) {}
@Override
public void close() {}
@Override
public void configure(Map<String, ?> map) {}
}

View file

@ -6,6 +6,7 @@ import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@ -14,11 +15,16 @@ import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.requestreply.CorrelationKey;
import org.springframework.kafka.requestreply.ReplyingKafkaTemplate;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
@Configuration
@EnableKafka
@ -47,8 +53,8 @@ public class ReplyingKafkaConfig {
@Value("${ervu.kafka.sasl.mechanism}")
private String saslMechanism;
@Bean
public ProducerFactory<String, String> ervuProducerFactory() {
@Bean("ervuProducerFactory")
public ProducerFactory<String, String> producerFactory() {
Map<String, Object> configProps = new HashMap<>();
configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
@ -60,13 +66,8 @@ public class ReplyingKafkaConfig {
return new DefaultKafkaProducerFactory<>(configProps);
}
@Bean("ervuTemplate")
public KafkaTemplate<String, String> kafkaTemplate() {
return new KafkaTemplate<>(ervuProducerFactory());
}
@Bean("ervuConsumerFactory")
public ConsumerFactory<String, String> ervuConsumerFactory() {
@Bean
public ConsumerFactory<String, String> consumerFactory() {
Map<String, Object> configProps = new HashMap<>();
configProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
configProps.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
@ -79,68 +80,33 @@ public class ReplyingKafkaConfig {
return new DefaultKafkaConsumerFactory<>(configProps);
}
@Bean
@Bean("ervuContainerFactory")
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(ervuConsumerFactory());
ConcurrentKafkaListenerContainerFactory<String, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
return factory;
}
// @Bean("excerpt-container")
// public ConcurrentMessageListenerContainer<String, String> excerptReplyContainer(
// ConcurrentKafkaListenerContainerFactory<String, String> factory) {
// ConcurrentMessageListenerContainer<String, String> container = factory.createContainer(
// excerptReplyTopic);
// container.getContainerProperties().setGroupId(groupId);
// return container;
// }
//
// @Bean("excerpt-template")
// public ReplyingKafkaTemplate<String, String, String> excerptReplyingKafkaTemplate(
// @Qualifier("ervu") ProducerFactory<String, String> pf,
// @Qualifier("excerpt-container") ConcurrentMessageListenerContainer<String, String> container) {
// return initReplyingKafkaTemplate(pf, container);
// }
//
// @Bean("org")
// public ConcurrentMessageListenerContainer<String, String> replyContainer(
// ConcurrentKafkaListenerContainerFactory<String, String> factory) {
// ConcurrentMessageListenerContainer<String, String> container = factory.createContainer(
// orgReplyTopic);
// container.getContainerProperties().setGroupId(groupId);
// return container;
// }
//
// @Bean("journal")
// public ConcurrentMessageListenerContainer<String, String> journalReplyContainer(
// ConcurrentKafkaListenerContainerFactory<String, String> factory) {
// ConcurrentMessageListenerContainer<String, String> container = factory.createContainer(
// journalReplyTopic);
// container.getContainerProperties().setGroupId(groupId);
// return container;
// }
//
// @Bean("org")
// public ReplyingKafkaTemplate<String, String, String> orgReplyingKafkaTemplate(
// @Qualifier("ervu") ProducerFactory<String, String> pf,
// @Qualifier("org") ConcurrentMessageListenerContainer<String, String> container) {
// return initReplyingKafkaTemplate(pf, container);
// }
//
// @Bean("journal")
// public ReplyingKafkaTemplate<String, String, String> journalReplyingKafkaTemplate(
// @Qualifier("ervu") ProducerFactory<String, String> pf,
// @Qualifier("journal") ConcurrentMessageListenerContainer<String, String> container) {
// return initReplyingKafkaTemplate(pf, container);
// }
//
// private ReplyingKafkaTemplate<String, String, String> initReplyingKafkaTemplate(
// ProducerFactory<String, String> pf,
// ConcurrentMessageListenerContainer<String, String> container) {
// ReplyingKafkaTemplate<String, String, String> replyingKafkaTemplate =
// new ReplyingKafkaTemplate<>(pf, container);
// replyingKafkaTemplate.setCorrelationHeaderName("messageId");
// replyingKafkaTemplate.setDefaultReplyTimeout(Duration.ofSeconds(replyTimeout));
// return replyingKafkaTemplate;
// }
@Bean
public ConcurrentMessageListenerContainer<String, String> replyContainer(
@Qualifier("ervuContainerFactory") ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory) {
ConcurrentMessageListenerContainer<String, String> container =
kafkaListenerContainerFactory.createContainer(orgReplyTopic, excerptReplyTopic, journalReplyTopic);
container.getContainerProperties().setGroupId(groupId);
return container;
}
@Bean
public ReplyingKafkaTemplate<String, String, String> replyingKafkaTemplate(
@Qualifier("ervuProducerFactory") ProducerFactory<String, String> producerFactory,
ConcurrentMessageListenerContainer<String, String> replyContainer) {
ReplyingKafkaTemplate<String, String, String> replyingKafkaTemplate =
new ReplyingKafkaTemplate<>(producerFactory, replyContainer);
replyingKafkaTemplate.setCorrelationHeaderName("messageId");
replyingKafkaTemplate.setCorrelationIdStrategy(record ->
new CorrelationKey(UUID.randomUUID().toString().getBytes(StandardCharsets.UTF_8)));
replyingKafkaTemplate.setDefaultReplyTimeout(Duration.ofSeconds(replyTimeout));
return replyingKafkaTemplate;
}
}

View file

@ -24,7 +24,6 @@ import ru.micord.ervu.security.webbpm.jwt.service.JwtTokenService;
public class ErvuKafkaController {
@Autowired
// @Qualifier("excerpt-service")
private ReplyingKafkaService replyingKafkaService;
@Autowired

View file

@ -1,22 +1,13 @@
package ru.micord.ervu.kafka.service.impl;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.ExecutionException;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.requestreply.ReplyingKafkaTemplate;
import org.springframework.kafka.requestreply.RequestReplyFuture;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.stereotype.Service;
import ru.micord.ervu.kafka.service.ReplyingKafkaService;
@ -27,22 +18,11 @@ import ru.micord.ervu.kafka.service.ReplyingKafkaService;
@Service
public class BaseReplyingKafkaServiceImpl implements ReplyingKafkaService {
private static final String MESSAGE_ID_HEADER = "messageId";
// protected abstract ReplyingKafkaTemplate<String, String, String> getReplyingKafkaTemplate()
private final KafkaTemplate<String, String> kafkaTemplate;
private final ConsumerFactory<String, String> consumerFactory;
@Value("${ervu.kafka.group.id}")
private String groupId;
@Value("${ervu.kafka.reply.timeout:30}")
private long replyTimeout;
private final ReplyingKafkaTemplate<String, String, String> replyingKafkaTemplate;
public BaseReplyingKafkaServiceImpl(
@Qualifier("ervuTemplate") KafkaTemplate<String, String> kafkaTemplate,
@Qualifier("ervuConsumerFactory") ConsumerFactory<String, String> consumerFactory) {
this.kafkaTemplate = kafkaTemplate;
this.consumerFactory = consumerFactory;
ReplyingKafkaTemplate<String, String, String> replyingKafkaTemplate) {
this.replyingKafkaTemplate = replyingKafkaTemplate;
}
public String sendMessageAndGetReply(String requestTopic,
@ -50,43 +30,15 @@ public class BaseReplyingKafkaServiceImpl implements ReplyingKafkaService {
String requestMessage) {
ProducerRecord<String, String> record = new ProducerRecord<>(requestTopic, requestMessage);
record.headers().add(new RecordHeader(KafkaHeaders.REPLY_TOPIC, replyTopic.getBytes()));
//TODO fix No pending reply error SUPPORT-8591
// RequestReplyFuture<String, String, String> replyFuture = getReplyingKafkaTemplate()
// .sendAndReceive(record);
//
// try {
// return Optional.ofNullable(replyFuture.get())
// .map(ConsumerRecord::value)
// .orElseThrow(() -> new RuntimeException("Kafka return result is null."));
// }
// catch (InterruptedException | ExecutionException e) {
// throw new RuntimeException("Failed to get kafka response.", e);
// }
RequestReplyFuture<String, String, String> replyFuture = replyingKafkaTemplate.sendAndReceive(record);
String messageId = UUID.randomUUID().toString();
record.headers().add(MESSAGE_ID_HEADER, messageId.getBytes(StandardCharsets.UTF_8));
kafkaTemplate.send(record);
AtomicReference<String> responseRef = new AtomicReference<>(null);
try (Consumer<String, String> consumer =
consumerFactory.createConsumer(groupId, null)) {
consumer.subscribe(Collections.singletonList(replyTopic));
ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofSeconds(replyTimeout));
for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
boolean match = Arrays.stream(consumerRecord.headers().toArray())
.anyMatch(header -> header.key().equals(MESSAGE_ID_HEADER)
&& messageId.equals(
new String(header.value(), StandardCharsets.UTF_8)));
if (match) {
responseRef.set(consumerRecord.value());
consumer.commitSync();
break;
}
}
try {
return Optional.ofNullable(replyFuture.get())
.map(ConsumerRecord::value)
.orElseThrow(() -> new RuntimeException("Kafka return result is null."));
}
catch (InterruptedException | ExecutionException e) {
throw new RuntimeException("Failed to get kafka response.", e);
}
return Optional.ofNullable(responseRef.get())
.orElseThrow(() -> new RuntimeException("Kafka return result is null"));
}
}

View file

@ -1,23 +0,0 @@
package ru.micord.ervu.kafka.service.impl;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.kafka.requestreply.ReplyingKafkaTemplate;
import org.springframework.stereotype.Service;
/**
* @author Eduard Tihomirov
*/
//@Service
//@Qualifier("excerpt-service")
public class ExcerptReplyingKafkaServiceImpl {
// @Autowired
// @Qualifier("excerpt-template")
// private ReplyingKafkaTemplate<String, String, String> excerptReplyingKafkaTemplate;
//
// @Override
// protected ReplyingKafkaTemplate<String, String, String> getReplyingKafkaTemplate() {
// return excerptReplyingKafkaTemplate;
// }
}

View file

@ -1,20 +0,0 @@
package ru.micord.ervu.kafka.service.impl;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.kafka.requestreply.ReplyingKafkaTemplate;
import org.springframework.stereotype.Service;
//@Service
//@Qualifier("journal")
public class JournalReplyingKafkaServiceImpl {
// @Autowired
// @Qualifier("journal")
// private ReplyingKafkaTemplate<String, String, String> journalReplyingKafkaTemplate;
//
// @Override
// protected ReplyingKafkaTemplate<String, String, String> getReplyingKafkaTemplate() {
// return journalReplyingKafkaTemplate;
// }
}

View file

@ -1,20 +0,0 @@
package ru.micord.ervu.kafka.service.impl;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.kafka.requestreply.ReplyingKafkaTemplate;
import org.springframework.stereotype.Service;
//@Service
//@Qualifier("org")
public class OrgReplyingKafkaServiceImpl {
// @Autowired
// @Qualifier("org")
// private ReplyingKafkaTemplate<String, String, String> orgReplyingKafkaTemplate;
//
// @Override
// protected ReplyingKafkaTemplate<String, String, String> getReplyingKafkaTemplate() {
// return orgReplyingKafkaTemplate;
// }
}

View file

@ -66,7 +66,6 @@ public class EsiaAuthService {
private JwtTokenService jwtTokenService;
@Autowired
// @Qualifier("org")
private ReplyingKafkaService replyingKafkaService;
@Autowired

View file

@ -28,11 +28,10 @@ public class JournalInMemoryStaticGridLoadService implements
private final JwtTokenService jwtTokenService;
private final InteractionService interactionService;
private final ReplyingKafkaService ervuReplyingKafkaService;
private final ReplyingKafkaService replyingKafkaService;
private final ObjectMapper objectMapper;
private final HttpServletRequest request;
@Value("${ervu.kafka.journal.request.topic}")
private String requestTopic;
@Value("${ervu.kafka.journal.reply.topic}")
@ -42,11 +41,11 @@ public class JournalInMemoryStaticGridLoadService implements
public JournalInMemoryStaticGridLoadService(JwtTokenService jwtTokenService,
InteractionService interactionService,
ReplyingKafkaService ervuReplyingKafkaService,
ReplyingKafkaService replyingKafkaService,
ObjectMapper objectMapper, HttpServletRequest request) {
this.jwtTokenService = jwtTokenService;
this.interactionService = interactionService;
this.ervuReplyingKafkaService = ervuReplyingKafkaService;
this.replyingKafkaService = replyingKafkaService;
this.objectMapper = objectMapper;
this.request = request;
}
@ -61,7 +60,7 @@ public class JournalInMemoryStaticGridLoadService implements
List<JournalDto> ervuJournalList;
try {
String responseJsonString = ervuReplyingKafkaService.sendMessageAndGetReply(requestTopic,
String responseJsonString = replyingKafkaService.sendMessageAndGetReply(requestTopic,
replyTopic, objectMapper.writeValueAsString(journalFileDataRequest));
JournalFileDataResponse journalFileDataResponse = objectMapper.readValue(responseJsonString,
JournalFileDataResponse.class);