listen and pass messageId

This commit is contained in:
gulnaz 2024-10-02 14:33:02 +03:00
parent ca10109318
commit 8c314f651c

View file

@ -3,6 +3,7 @@ package ru.micord.ervu.av.service;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
@ -25,6 +26,7 @@ import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.util.EntityUtils;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
@ -36,6 +38,7 @@ import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.kafka.support.SendResult;
import org.springframework.lang.NonNull;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Service;
import ru.micord.ervu.av.exception.FileUploadException;
import ru.micord.ervu.av.exception.InvalidHttpFileUrlException;
@ -84,7 +87,8 @@ public class FileUploadService {
@KafkaListener(id = "${spring.kafka.consumer.group.id}", topics = "${kafka.in.topic.name}",
containerFactory = "inputKafkaListenerContainerFactory")
public void listenKafkaIn(String kafkaInMessage, Acknowledgment acknowledgment) {
public void listenKafkaIn(String kafkaInMessage, Acknowledgment acknowledgment,
@Header("messageId") String messageId) {
DownloadRequest downloadRequest = new Gson().fromJson(kafkaInMessage, DownloadRequest.class);
try {
@ -115,16 +119,16 @@ public class FileUploadService {
if (infected || !clean) {
downloadRequest.fileInfo().setFileUrl(null);
downloadRequest.fileInfo().setFileStatus(FileStatus.FILE_STATUS_02);
sendMessage(outErrorTopic.name(), downloadRequest, kafkaTemplate);
sendMessage(outErrorTopic.name(), downloadRequest, messageId, kafkaTemplate);
}
else {
String fileRef = s3Service.putFile(filePath, fileUrl.fileName());
downloadRequest.fileInfo().setFileUrl(fileRef);
downloadRequest.fileInfo().setFileStatus(FileStatus.FILE_STATUS_03);
sendMessage(outSuccessTopic.name(), downloadRequest, kafkaTemplate);
sendMessage(outSuccessTopic.name(), downloadRequest, messageId, kafkaTemplate);
}
sendMessage(inStatusTopic.name(), downloadRequest, inKafkaTemplate);
sendMessage(inStatusTopic.name(), downloadRequest, messageId, inKafkaTemplate);
deleteFile(downloadUrl);
@ -155,13 +159,14 @@ public class FileUploadService {
@KafkaListener(id = "${spring.kafka.out.consumer.group.id}",
topics = "${kafka.out.response.topic.name}",
containerFactory = "outputKafkaListenerContainerFactory")
public void listenKafkaOut(String kafkaOutResponseMessage, Acknowledgment acknowledgment) {
public void listenKafkaOut(String kafkaOutResponseMessage, Acknowledgment acknowledgment,
@Header("messageId") String messageId) {
DownloadResponse downloadResponse = new Gson().fromJson(kafkaOutResponseMessage,
DownloadResponse.class
);
FileStatus fileStatus = downloadResponse.fileInfo().fileStatus();
if (fileStatus.code().equalsIgnoreCase(FileStatus.FILE_STATUS_04.code())) {
sendMessage(inStatusTopic.name(), downloadResponse, inKafkaTemplate);
sendMessage(inStatusTopic.name(), downloadResponse, messageId, inKafkaTemplate);
}
acknowledgment.acknowledge();
@ -296,10 +301,12 @@ public class FileUploadService {
}
}
private void sendMessage(@NonNull String topicName, Object object, KafkaTemplate<String, String> template) {
CompletableFuture<SendResult<String, String>> future = template.send(topicName,
new GsonBuilder().setPrettyPrinting().create().toJson(object)
);
private void sendMessage(@NonNull String topicName, Object object, String messageId,
KafkaTemplate<String, String> template) {
ProducerRecord<String, String> record = new ProducerRecord<>(topicName,
new GsonBuilder().setPrettyPrinting().create().toJson(object));
record.headers().add("messageId", messageId.getBytes(StandardCharsets.UTF_8));
CompletableFuture<SendResult<String, String>> future = template.send(record);
future.whenComplete((result, e) -> {
if (e != null) {