SUPPORT-8591: use KafkaTemplate

This commit is contained in:
gulnaz 2024-10-08 17:55:21 +03:00
parent 93ac6c5e37
commit 986aa01830
5 changed files with 155 additions and 68 deletions

View file

@ -8,7 +8,6 @@ import org.apache.kafka.common.serialization.BytesDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.Bytes;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@ -79,6 +78,16 @@ public class ReplyingKafkaConfig {
return configProps;
}
@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
@Bean
public KafkaTemplate<String, Object> subpoenaKafkaTemplate() {
return new KafkaTemplate<>(subpoenaProducerFactory());
}
@Bean
public ConsumerFactory<String, String> consumerFactory() {
Map<String, Object> configProps = consumerConfig();
@ -119,46 +128,44 @@ public class ReplyingKafkaConfig {
return factory;
}
@Bean
public ConcurrentMessageListenerContainer<String, String> replyContainer(
ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory) {
ConcurrentMessageListenerContainer<String, String> container =
kafkaListenerContainerFactory.createContainer(replyTopic);
container.getContainerProperties().setGroupId(groupId);
return container;
}
@Bean
public ConcurrentMessageListenerContainer<String, Bytes> subpoenaReplyContainer(
ConcurrentKafkaListenerContainerFactory<String, Bytes> personKafkaListenerContainerFactory) {
ConcurrentMessageListenerContainer<String, Bytes> container =
personKafkaListenerContainerFactory.createContainer(recruitReplyTopic,
subpoenaExtractReplyTopic, registryExtractReplyTopic);
container.getContainerProperties().setGroupId(groupId);
return container;
}
@Bean
@Qualifier("person")
public ReplyingKafkaTemplate<String, String, String> replyingKafkaTemplate(
ProducerFactory<String, String> producerFactory,
ConcurrentMessageListenerContainer<String, String> replyContainer) {
ReplyingKafkaTemplate<String, String, String> replyingKafkaTemplate =
new ReplyingKafkaTemplate<>(producerFactory, replyContainer);
replyingKafkaTemplate.setCorrelationHeaderName("messageId");
replyingKafkaTemplate.setDefaultReplyTimeout(Duration.ofSeconds(replyTimeout));
return replyingKafkaTemplate;
}
@Bean
@Qualifier("subpoena")
public ReplyingKafkaTemplate<String, Object, Bytes> subpoenaReplyingKafkaTemplate(
ProducerFactory<String, Object> personProducerFactory,
ConcurrentMessageListenerContainer<String, Bytes> personReplyContainer) {
ReplyingKafkaTemplate<String, Object, Bytes> replyingKafkaTemplate =
new ReplyingKafkaTemplate<>(personProducerFactory, personReplyContainer);
replyingKafkaTemplate.setCorrelationHeaderName("messageId");
replyingKafkaTemplate.setDefaultReplyTimeout(Duration.ofSeconds(replyTimeout));
return replyingKafkaTemplate;
}
// @Bean
// public ConcurrentMessageListenerContainer<String, String> replyContainer(
// ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory) {
// ConcurrentMessageListenerContainer<String, String> container =
// kafkaListenerContainerFactory.createContainer(replyTopic);
// container.getContainerProperties().setGroupId(groupId);
// return container;
// }
//
// @Bean
// public ConcurrentMessageListenerContainer<String, Bytes> subpoenaReplyContainer(
// ConcurrentKafkaListenerContainerFactory<String, Bytes> personKafkaListenerContainerFactory) {
// ConcurrentMessageListenerContainer<String, Bytes> container =
// personKafkaListenerContainerFactory.createContainer(recruitReplyTopic,
// subpoenaExtractReplyTopic, registryExtractReplyTopic);
// container.getContainerProperties().setGroupId(groupId);
// return container;
// }
//
// @Bean(name = "person")
// public ReplyingKafkaTemplate<String, String, String> replyingKafkaTemplate(
// ProducerFactory<String, String> producerFactory,
// ConcurrentMessageListenerContainer<String, String> replyContainer) {
// ReplyingKafkaTemplate<String, String, String> replyingKafkaTemplate =
// new ReplyingKafkaTemplate<>(producerFactory, replyContainer);
// replyingKafkaTemplate.setCorrelationHeaderName("messageId");
// replyingKafkaTemplate.setDefaultReplyTimeout(Duration.ofSeconds(replyTimeout));
// return replyingKafkaTemplate;
// }
//
// @Bean(name = "subpoena")
// public ReplyingKafkaTemplate<String, Object, Bytes> subpoenaReplyingKafkaTemplate(
// ProducerFactory<String, Object> personProducerFactory,
// ConcurrentMessageListenerContainer<String, Bytes> personReplyContainer) {
// ReplyingKafkaTemplate<String, Object, Bytes> replyingKafkaTemplate =
// new ReplyingKafkaTemplate<>(personProducerFactory, personReplyContainer);
// replyingKafkaTemplate.setCorrelationHeaderName("messageId");
// replyingKafkaTemplate.setDefaultReplyTimeout(Duration.ofSeconds(replyTimeout));
// return replyingKafkaTemplate;
// }
}

View file

@ -1,10 +1,21 @@
package ru.micord.ervu.kafka.service.impl;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.requestreply.ReplyingKafkaTemplate;
import org.springframework.kafka.requestreply.RequestReplyFuture;
import ru.micord.ervu.kafka.service.ReplyingKafkaService;
@ -14,22 +25,61 @@ import ru.micord.ervu.kafka.service.ReplyingKafkaService;
*/
public abstract class BaseReplyingKafkaService<T, V> implements ReplyingKafkaService<T, V> {
private static final String MESSAGE_ID_HEADER = "messageId";
@Value("${ervu.kafka.group.id}")
private String groupId;
@Value("${ervu.kafka.reply.timeout:30}")
private long replyTimeout;
@Override
public V sendMessageAndGetReply(String requestTopic, String replyTopic, T requestMessage) {
RequestReplyFuture<String, T, V> replyFuture = getTemplate().sendAndReceive(
getProducerRecord(requestTopic, replyTopic, requestMessage));
//TODO fix No pending reply error SUPPORT-8591
// RequestReplyFuture<String, T, V> replyFuture = getTemplate().sendAndReceive(
// getProducerRecord(requestTopic, replyTopic, requestMessage));
//
// try {
// return Optional.ofNullable(replyFuture.get())
// .map(ConsumerRecord::value)
// .orElseThrow(() -> new RuntimeException("Kafka return result is null"));
// }
// catch (InterruptedException | ExecutionException e) {
// throw new RuntimeException("Failed to get kafka response", e);
// }
ProducerRecord<String, T> producerRecord = getProducerRecord(requestTopic, replyTopic,
requestMessage);
String messageId = UUID.randomUUID().toString();
producerRecord.headers().add(MESSAGE_ID_HEADER, messageId.getBytes(StandardCharsets.UTF_8));
getTemplate().send(producerRecord);
AtomicReference<V> responseRef = new AtomicReference<>(null);
try {
return Optional.ofNullable(replyFuture.get())
.map(ConsumerRecord::value)
.orElseThrow(() -> new RuntimeException("Kafka return result is null"));
}
catch (InterruptedException | ExecutionException e) {
throw new RuntimeException("Failed to get kafka response", e);
try (Consumer<String, V> consumer =
getConsumerFactory().createConsumer(groupId, null)) {
consumer.subscribe(Collections.singletonList(replyTopic));
ConsumerRecords<String, V> consumerRecords = consumer.poll(Duration.ofSeconds(replyTimeout));
for (ConsumerRecord<String, V> consumerRecord : consumerRecords) {
boolean match = Arrays.stream(consumerRecord.headers().toArray())
.anyMatch(header -> header.key().equals(MESSAGE_ID_HEADER)
&& messageId.equals(
new String(header.value(), StandardCharsets.UTF_8)));
if (match) {
responseRef.set(consumerRecord.value());
break;
}
}
consumer.commitSync();
}
return Optional.ofNullable(responseRef.get())
.orElseThrow(() -> new RuntimeException("Kafka return result is null"));
}
protected abstract ReplyingKafkaTemplate<String, T, V> getTemplate();
// protected abstract ReplyingKafkaTemplate<String, T, V> getTemplate();
protected abstract KafkaTemplate<String, T> getTemplate();
protected abstract ConsumerFactory<String, V> getConsumerFactory();
protected abstract ProducerRecord<String, T> getProducerRecord(String requestTopic,
String replyTopic, T requestMessage);

View file

@ -3,7 +3,8 @@ package ru.micord.ervu.kafka.service.impl;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.kafka.requestreply.ReplyingKafkaTemplate;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.stereotype.Service;
@ -14,16 +15,30 @@ import org.springframework.stereotype.Service;
@Qualifier("person")
public class PersonReplyingKafkaService extends BaseReplyingKafkaService<String, String> {
private final ReplyingKafkaTemplate<String, String, String> replyingKafkaTemplate;
// private final ReplyingKafkaTemplate<String, String, String> replyingKafkaTemplate;
private final KafkaTemplate<String, String> kafkaTemplate;
private final ConsumerFactory<String, String> consumerFactory;
public PersonReplyingKafkaService(@Qualifier("person")
ReplyingKafkaTemplate<String, String, String> replyingKafkaTemplate) {
this.replyingKafkaTemplate = replyingKafkaTemplate;
public PersonReplyingKafkaService(KafkaTemplate<String, String> kafkaTemplate,
ConsumerFactory<String, String> consumerFactory) {
this.kafkaTemplate = kafkaTemplate;
this.consumerFactory = consumerFactory;
}
// @Override
// protected ReplyingKafkaTemplate<String, String, String> getTemplate() {
// return replyingKafkaTemplate;
// }
@Override
protected KafkaTemplate<String, String> getTemplate() {
return kafkaTemplate;
}
@Override
protected ReplyingKafkaTemplate<String, String, String> getTemplate() {
return replyingKafkaTemplate;
protected ConsumerFactory<String, String> getConsumerFactory() {
return consumerFactory;
}
@Override

View file

@ -8,7 +8,8 @@ import org.apache.kafka.common.header.internals.RecordHeader;
import org.apache.kafka.common.utils.Bytes;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.requestreply.ReplyingKafkaTemplate;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.stereotype.Service;
import ru.micord.ervu.dto.SubpoenaRequestDto;
@ -20,21 +21,35 @@ import ru.micord.ervu.dto.SubpoenaRequestDto;
@Qualifier("subpoena")
public class SubpoenaReplyingKafkaService extends BaseReplyingKafkaService<Object, Bytes> {
private final ReplyingKafkaTemplate<String, Object, Bytes> replyingKafkaTemplate;
// private final ReplyingKafkaTemplate<String, Object, Bytes> replyingKafkaTemplate;
private final KafkaTemplate<String, Object> subpoenaKafkaTemplate;
private final ConsumerFactory<String, Bytes> subpoenaConsumerFactory;
@Value("${ervu.kafka.recruit.header.class}")
private String recruitHeaderClass;
@Value("${ervu.kafka.extract.header.class}")
private String extractHeaderClass;
public SubpoenaReplyingKafkaService(@Qualifier("subpoena")
ReplyingKafkaTemplate<String, Object, Bytes> replyingKafkaTemplate) {
this.replyingKafkaTemplate = replyingKafkaTemplate;
public SubpoenaReplyingKafkaService(KafkaTemplate<String, Object> subpoenaKafkaTemplate,
ConsumerFactory<String, Bytes> subpoenaConsumerFactory) {
this.subpoenaKafkaTemplate = subpoenaKafkaTemplate;
this.subpoenaConsumerFactory = subpoenaConsumerFactory;
}
// @Override
// protected ReplyingKafkaTemplate<String, Object, Bytes> getTemplate() {
// return replyingKafkaTemplate;
// }
@Override
protected KafkaTemplate<String, Object> getTemplate() {
return subpoenaKafkaTemplate;
}
@Override
protected ReplyingKafkaTemplate<String, Object, Bytes> getTemplate() {
return replyingKafkaTemplate;
protected ConsumerFactory<String, Bytes> getConsumerFactory() {
return subpoenaConsumerFactory;
}
@Override