diff --git a/src/main/java/ru/micord/ervu/av/service/FileUploadService.java b/src/main/java/ru/micord/ervu/av/service/FileUploadService.java index 5d52d0d..87457b3 100644 --- a/src/main/java/ru/micord/ervu/av/service/FileUploadService.java +++ b/src/main/java/ru/micord/ervu/av/service/FileUploadService.java @@ -3,6 +3,7 @@ package ru.micord.ervu.av.service; import java.io.File; import java.io.FileOutputStream; import java.io.IOException; +import java.nio.charset.StandardCharsets; import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.Paths; @@ -25,6 +26,7 @@ import org.apache.http.impl.client.CloseableHttpClient; import org.apache.http.impl.client.HttpClients; import org.apache.http.util.EntityUtils; import org.apache.kafka.clients.admin.NewTopic; +import org.apache.kafka.clients.producer.ProducerRecord; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; @@ -36,6 +38,7 @@ import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.support.Acknowledgment; import org.springframework.kafka.support.SendResult; import org.springframework.lang.NonNull; +import org.springframework.messaging.handler.annotation.Header; import org.springframework.stereotype.Service; import ru.micord.ervu.av.exception.FileUploadException; import ru.micord.ervu.av.exception.InvalidHttpFileUrlException; @@ -84,7 +87,8 @@ public class FileUploadService { @KafkaListener(id = "${spring.kafka.consumer.group.id}", topics = "${kafka.in.topic.name}", containerFactory = "inputKafkaListenerContainerFactory") - public void listenKafkaIn(String kafkaInMessage, Acknowledgment acknowledgment) { + public void listenKafkaIn(String kafkaInMessage, Acknowledgment acknowledgment, + @Header("messageId") String messageId) { DownloadRequest downloadRequest = new Gson().fromJson(kafkaInMessage, DownloadRequest.class); try { @@ -115,16 +119,16 @@ public class FileUploadService { if (infected || !clean) { downloadRequest.fileInfo().setFileUrl(null); downloadRequest.fileInfo().setFileStatus(FileStatus.FILE_STATUS_02); - sendMessage(outErrorTopic.name(), downloadRequest, kafkaTemplate); + sendMessage(outErrorTopic.name(), downloadRequest, messageId, kafkaTemplate); } else { String fileRef = s3Service.putFile(filePath, fileUrl.fileName()); downloadRequest.fileInfo().setFileUrl(fileRef); downloadRequest.fileInfo().setFileStatus(FileStatus.FILE_STATUS_03); - sendMessage(outSuccessTopic.name(), downloadRequest, kafkaTemplate); + sendMessage(outSuccessTopic.name(), downloadRequest, messageId, kafkaTemplate); } - sendMessage(inStatusTopic.name(), downloadRequest, inKafkaTemplate); + sendMessage(inStatusTopic.name(), downloadRequest, messageId, inKafkaTemplate); deleteFile(downloadUrl); @@ -155,13 +159,14 @@ public class FileUploadService { @KafkaListener(id = "${spring.kafka.out.consumer.group.id}", topics = "${kafka.out.response.topic.name}", containerFactory = "outputKafkaListenerContainerFactory") - public void listenKafkaOut(String kafkaOutResponseMessage, Acknowledgment acknowledgment) { + public void listenKafkaOut(String kafkaOutResponseMessage, Acknowledgment acknowledgment, + @Header("messageId") String messageId) { DownloadResponse downloadResponse = new Gson().fromJson(kafkaOutResponseMessage, DownloadResponse.class ); FileStatus fileStatus = downloadResponse.fileInfo().fileStatus(); if (fileStatus.code().equalsIgnoreCase(FileStatus.FILE_STATUS_04.code())) { - sendMessage(inStatusTopic.name(), downloadResponse, inKafkaTemplate); + sendMessage(inStatusTopic.name(), downloadResponse, messageId, inKafkaTemplate); } acknowledgment.acknowledge(); @@ -296,10 +301,12 @@ public class FileUploadService { } } - private void sendMessage(@NonNull String topicName, Object object, KafkaTemplate template) { - CompletableFuture> future = template.send(topicName, - new GsonBuilder().setPrettyPrinting().create().toJson(object) - ); + private void sendMessage(@NonNull String topicName, Object object, String messageId, + KafkaTemplate template) { + ProducerRecord record = new ProducerRecord<>(topicName, + new GsonBuilder().setPrettyPrinting().create().toJson(object)); + record.headers().add("messageId", messageId.getBytes(StandardCharsets.UTF_8)); + CompletableFuture> future = template.send(record); future.whenComplete((result, e) -> { if (e != null) {