From 9e5474681acf10fe18fd72de7c3a169a80135d4e Mon Sep 17 00:00:00 2001
From: Eduard Tihomirov
Date: Mon, 8 Sep 2025 10:26:02 +0300
Subject: [PATCH] SUPPORT-9322: Remove Kafka out pipeline, add S3 cleanup listener

Drop the output Kafka consumer/producer/topic configuration and the
DownloadResponse DTO, forward all incoming message headers instead of
only messageId, report the new status for every file in the request
rather than for the CSV file only, and add a listener that deletes
files from S3 on a dedicated cleanup topic.
---
 .../output/OutputKafkaConsumerConfig.java     | 64 ---------------
 .../output/OutputKafkaProducerConfig.java     | 52 -------------
 .../config/output/OutputKafkaTopicConfig.java | 28 -------
 .../ervu/av/kafka/dto/DownloadRequest.java    |  1 +
 .../ervu/av/kafka/dto/DownloadResponse.java   | 11 ---
 .../java/ru/micord/ervu/av/s3/S3Service.java  |  9 +++
 .../ervu/av/service/FileUploadService.java    | 78 +++++++------------
 src/main/resources/application.properties     | 18 +----
 8 files changed, 39 insertions(+), 222 deletions(-)
 delete mode 100644 src/main/java/ru/micord/ervu/av/kafka/config/output/OutputKafkaConsumerConfig.java
 delete mode 100644 src/main/java/ru/micord/ervu/av/kafka/config/output/OutputKafkaProducerConfig.java
 delete mode 100644 src/main/java/ru/micord/ervu/av/kafka/config/output/OutputKafkaTopicConfig.java
 delete mode 100644 src/main/java/ru/micord/ervu/av/kafka/dto/DownloadResponse.java

diff --git a/src/main/java/ru/micord/ervu/av/kafka/config/output/OutputKafkaConsumerConfig.java b/src/main/java/ru/micord/ervu/av/kafka/config/output/OutputKafkaConsumerConfig.java
deleted file mode 100644
index bc5ac81..0000000
--- a/src/main/java/ru/micord/ervu/av/kafka/config/output/OutputKafkaConsumerConfig.java
+++ /dev/null
@@ -1,64 +0,0 @@
-package ru.micord.ervu.av.kafka.config.output;
-
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.kafka.clients.CommonClientConfigs;
-import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.common.config.SaslConfigs;
-import org.apache.kafka.common.serialization.StringDeserializer;
-import org.springframework.beans.factory.annotation.Value;
-import org.springframework.context.annotation.Bean;
-import org.springframework.context.annotation.Configuration;
-import org.springframework.kafka.annotation.EnableKafka;
-import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
-import org.springframework.kafka.core.ConsumerFactory;
-import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
-import org.springframework.kafka.listener.ContainerProperties;
-
-/**
- * @author r.latypov
- */
-@Configuration
-@EnableKafka
-public class OutputKafkaConsumerConfig {
-    @Value("${spring.kafka.out.consumer.bootstrap.servers}")
-    private List<String> bootstrapAddress;
-    @Value("${spring.kafka.out.consumer.security.protocol}")
-    private String securityProtocol;
-    @Value("${spring.kafka.out.consumer.properties.sasl.jaas.config}")
-    private String jaasConfig;
-    @Value("${spring.kafka.out.consumer.properties.sasl.mechanism}")
-    private String saslMechanism;
-    @Value("${spring.kafka.out.consumer.enable.auto.commit}")
-    private String enableAutoCommit;
-    @Value("${spring.kafka.out.listener.ack.mode}")
-    private String ackMode;
-
-    @Bean
-    public ConsumerFactory<String, String> outputConsumerFactory() {
-        Map<String, Object> configs = new HashMap<>();
-
-        configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
-        configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
-        configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
-
-        configs.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, securityProtocol);
-        configs.put(SaslConfigs.SASL_JAAS_CONFIG, jaasConfig);
-        configs.put(SaslConfigs.SASL_MECHANISM, saslMechanism);
-
-        configs.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);
-
-        return new DefaultKafkaConsumerFactory<>(configs);
-    }
-
-    @Bean
-    public ConcurrentKafkaListenerContainerFactory<String, String> outputKafkaListenerContainerFactory() {
-        ConcurrentKafkaListenerContainerFactory<String, String> factory =
-                new ConcurrentKafkaListenerContainerFactory<>();
-        factory.setConsumerFactory(outputConsumerFactory());
-        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
-        return factory;
-    }
-}
diff --git a/src/main/java/ru/micord/ervu/av/kafka/config/output/OutputKafkaProducerConfig.java b/src/main/java/ru/micord/ervu/av/kafka/config/output/OutputKafkaProducerConfig.java
deleted file mode 100644
index b57b6a9..0000000
--- a/src/main/java/ru/micord/ervu/av/kafka/config/output/OutputKafkaProducerConfig.java
+++ /dev/null
@@ -1,52 +0,0 @@
-package ru.micord.ervu.av.kafka.config.output;
-
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.kafka.clients.CommonClientConfigs;
-import org.apache.kafka.clients.producer.ProducerConfig;
-import org.apache.kafka.common.config.SaslConfigs;
-import org.apache.kafka.common.serialization.StringSerializer;
-import org.springframework.beans.factory.annotation.Qualifier;
-import org.springframework.beans.factory.annotation.Value;
-import org.springframework.context.annotation.Bean;
-import org.springframework.context.annotation.Configuration;
-import org.springframework.kafka.core.DefaultKafkaProducerFactory;
-import org.springframework.kafka.core.KafkaTemplate;
-import org.springframework.kafka.core.ProducerFactory;
-
-/**
- * @author r.latypov
- */
-@Configuration
-public class OutputKafkaProducerConfig {
-    @Value("${spring.kafka.producer.bootstrap.servers}")
-    private List<String> bootstrapAddress;
-    @Value("${spring.kafka.producer.security.protocol}")
-    private String securityProtocol;
-    @Value("${spring.kafka.producer.properties.sasl.jaas.config}")
-    private String jaasConfig;
-    @Value("${spring.kafka.producer.properties.sasl.mechanism}")
-    private String saslMechanism;
-
-    @Bean
-    public ProducerFactory<String, String> outputProducerFactory() {
-        Map<String, Object> configs = new HashMap<>();
-
-        configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
-        configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
-        configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
-
-        configs.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, securityProtocol);
-        configs.put(SaslConfigs.SASL_JAAS_CONFIG, jaasConfig);
-        configs.put(SaslConfigs.SASL_MECHANISM, saslMechanism);
-
-        return new DefaultKafkaProducerFactory<>(configs);
-    }
-
-    @Bean
-    public KafkaTemplate<String, String> outputKafkaTemplate() {
-        return new KafkaTemplate<>(outputProducerFactory());
-    }
-}
diff --git a/src/main/java/ru/micord/ervu/av/kafka/config/output/OutputKafkaTopicConfig.java b/src/main/java/ru/micord/ervu/av/kafka/config/output/OutputKafkaTopicConfig.java
deleted file mode 100644
index 79f1ef8..0000000
--- a/src/main/java/ru/micord/ervu/av/kafka/config/output/OutputKafkaTopicConfig.java
+++ /dev/null
@@ -1,28 +0,0 @@
-package ru.micord.ervu.av.kafka.config.output;
-
-import org.apache.kafka.clients.admin.NewTopic;
-import org.springframework.beans.factory.annotation.Value;
-import org.springframework.context.annotation.Bean;
-import org.springframework.context.annotation.Configuration;
-import org.springframework.kafka.config.TopicBuilder;
-
-/**
- * @author r.latypov
- */
-@Configuration
-public class OutputKafkaTopicConfig {
-    @Value("${kafka.out.error.topic.name}")
-    private String kafkaOutErrorTopicName;
-    @Value("${kafka.out.success.topic.name}")
-    private String kafkaOutSuccessTopicName;
-
-    @Bean
-    public NewTopic outErrorTopic() {
-        return TopicBuilder.name(kafkaOutErrorTopicName).build();
-    }
-
-    @Bean
-    public NewTopic outSuccessTopic() {
-        return TopicBuilder.name(kafkaOutSuccessTopicName).build();
-    }
-}
diff --git a/src/main/java/ru/micord/ervu/av/kafka/dto/DownloadRequest.java b/src/main/java/ru/micord/ervu/av/kafka/dto/DownloadRequest.java
index 886297d..b785940 100644
--- a/src/main/java/ru/micord/ervu/av/kafka/dto/DownloadRequest.java
+++ b/src/main/java/ru/micord/ervu/av/kafka/dto/DownloadRequest.java
@@ -22,6 +22,7 @@ public record DownloadRequest(OrgInfo orgInfo, @NonNull FileInfo[] filesInfo) {
         private final String filePatternName;
         private final String departureDateTime;
         private final String timeZone;
+        private final String type;
         @Setter
         private FileStatus fileStatus;
     }
 }
diff --git a/src/main/java/ru/micord/ervu/av/kafka/dto/DownloadResponse.java b/src/main/java/ru/micord/ervu/av/kafka/dto/DownloadResponse.java
deleted file mode 100644
index 56d7b7e..0000000
--- a/src/main/java/ru/micord/ervu/av/kafka/dto/DownloadResponse.java
+++ /dev/null
@@ -1,11 +0,0 @@
-package ru.micord.ervu.av.kafka.dto;
-
-import org.springframework.lang.NonNull;
-
-/**
- * @author r.latypov
- */
-public record DownloadResponse(OrgInfo orgInfo, @NonNull FileInfo fileInfo) {
-    public record FileInfo(String fileId, FileStatus fileStatus) {
-    }
-}
diff --git a/src/main/java/ru/micord/ervu/av/s3/S3Service.java b/src/main/java/ru/micord/ervu/av/s3/S3Service.java
index 8537eed..e5a3b2d 100644
--- a/src/main/java/ru/micord/ervu/av/s3/S3Service.java
+++ b/src/main/java/ru/micord/ervu/av/s3/S3Service.java
@@ -40,4 +40,13 @@ public class S3Service {
             throw new FileUploadException(e);
         }
     }
+
+    public void deleteFileByUrl(String url) {
+        String prefix = "s3://" + outBucketName + "/";
+        if (!url.startsWith(prefix)) {
+            throw new IllegalArgumentException("Invalid S3 URL: " + url);
+        }
+        String key = url.substring(prefix.length());
+        outClient.deleteObject(outBucketName, key);
+    }
 }
diff --git a/src/main/java/ru/micord/ervu/av/service/FileUploadService.java b/src/main/java/ru/micord/ervu/av/service/FileUploadService.java
index 850c331..833012a 100644
--- a/src/main/java/ru/micord/ervu/av/service/FileUploadService.java
+++ b/src/main/java/ru/micord/ervu/av/service/FileUploadService.java
@@ -24,13 +24,12 @@ import org.springframework.kafka.core.KafkaTemplate;
 import org.springframework.kafka.support.Acknowledgment;
 import org.springframework.kafka.support.SendResult;
 import org.springframework.lang.NonNull;
-import org.springframework.messaging.handler.annotation.Header;
+import org.springframework.messaging.handler.annotation.Headers;
 import org.springframework.stereotype.Service;
 import ru.micord.ervu.av.exception.FileUploadException;
 import ru.micord.ervu.av.exception.InvalidHttpFileUrlException;
 import ru.micord.ervu.av.exception.RetryableException;
 import ru.micord.ervu.av.kafka.dto.DownloadRequest;
-import ru.micord.ervu.av.kafka.dto.DownloadResponse;
 import ru.micord.ervu.av.kafka.dto.FileStatus;
 import ru.micord.ervu.av.s3.S3Service;
 
@@ -46,10 +45,7 @@ public class FileUploadService {
     private boolean avCheckEnabled;
     @Value("${file.saving.path}")
     private String fileSavingPath;
-    private final KafkaTemplate<String, String> kafkaTemplate;
     private final KafkaTemplate<String, String> inKafkaTemplate;
-    private final NewTopic outErrorTopic;
-    private final NewTopic outSuccessTopic;
     private final NewTopic inStatusTopic;
     private final FileManager fIleManager;
     private final ReceiveScanReportRetryable receiveScanReportRetryable;
 
@@ -57,15 +53,10 @@ public class FileUploadService {
 
     @Autowired
     public FileUploadService(
-            @Qualifier("outputKafkaTemplate") KafkaTemplate<String, String> kafkaTemplate,
             @Qualifier("inputKafkaTemplate") KafkaTemplate<String, String> inKafkaTemplate,
-            NewTopic outErrorTopic, NewTopic outSuccessTopic,
             ReceiveScanReportRetryable receiveScanReportRetryable, S3Service s3Service,
             NewTopic inStatusTopic, FileManager fIleManager) {
-        this.kafkaTemplate = kafkaTemplate;
-        this.outErrorTopic = outErrorTopic;
-        this.outSuccessTopic = outSuccessTopic;
         this.receiveScanReportRetryable = receiveScanReportRetryable;
         this.s3Service = s3Service;
         this.inKafkaTemplate = inKafkaTemplate;
@@ -76,7 +67,7 @@ public class FileUploadService {
     @KafkaListener(id = "${spring.kafka.consumer.group.id}", topics = "${kafka.in.topic.name}",
             containerFactory = "inputKafkaListenerContainerFactory")
     public void listenKafkaIn(String kafkaInMessage, Acknowledgment acknowledgment,
-            @Header("messageId") String messageId) {
+            @Headers Map<String, Object> headers) {
         DownloadRequest downloadRequest = new Gson().fromJson(kafkaInMessage, DownloadRequest.class);
         Map<Path, DownloadRequest.FileInfo> tempFilesMap = new HashMap<>();
@@ -121,9 +112,6 @@
                     FileUrl fileUrl = parseFileUrl(fileInfo.getFileUrl());
                     fIleManager.deleteFile(fileUrl.fileUrl());
                 }
-                if (!isAvError && exitCode == INFECTED_CODE) {
-                    sendMessage(outErrorTopic.name(), downloadRequest, messageId, kafkaTemplate);
-                }
             }
             else {
                 for (Map.Entry<Path, DownloadRequest.FileInfo> entry : tempFilesMap.entrySet()) {
@@ -135,13 +123,8 @@
                     FileUrl fileUrl = parseFileUrl(fileInfo.getFileUrl());
                     fIleManager.deleteFile(fileUrl.fileUrl());
                 }
-                sendMessage(outSuccessTopic.name(), downloadRequest, messageId, kafkaTemplate);
             }
-            DownloadRequest.FileInfo csvFile = getCsvFile(downloadRequest);
-            DownloadRequest statusRequest = new DownloadRequest(downloadRequest.orgInfo(),
-                    new DownloadRequest.FileInfo[] {csvFile}
-            );
-            sendMessage(inStatusTopic.name(), statusRequest, messageId, inKafkaTemplate);
+            sendMessage(inStatusTopic.name(), downloadRequest, headers, inKafkaTemplate);
 
             for (Path filePath : tempFilesMap.keySet()) {
                 try {
@@ -156,13 +139,11 @@
            // we assume the message does not need to be reprocessed:
            // log the error, send a message with the new status, acknowledge the message
            LOGGER.error(e.getMessage() + ": " + kafkaInMessage);
-           DownloadRequest.FileInfo csvFile = getCsvFile(downloadRequest);
-           csvFile.setFileStatus(FileStatus.FILE_STATUS_11);
-           csvFile.setFileUrl(null);
-           DownloadRequest statusRequest = new DownloadRequest(downloadRequest.orgInfo(),
-                   new DownloadRequest.FileInfo[] {csvFile}
-           );
-           sendMessage(inStatusTopic.name(), statusRequest, messageId, inKafkaTemplate);
+           for (DownloadRequest.FileInfo fileInfo : downloadRequest.filesInfo()) {
+               fileInfo.setFileUrl(null);
+               fileInfo.setFileStatus(FileStatus.FILE_STATUS_11);
+           }
+           sendMessage(inStatusTopic.name(), downloadRequest, headers, inKafkaTemplate);
         }
         catch (FileUploadException | IOException e) {
            // we assume the message has to be read again
@@ -174,22 +155,6 @@
         }
     }
 
-    @KafkaListener(id = "${spring.kafka.out.consumer.group.id}",
-            topics = "${kafka.out.response.topic.name}",
-            containerFactory = "outputKafkaListenerContainerFactory")
-    public void listenKafkaOut(String kafkaOutResponseMessage, Acknowledgment acknowledgment,
-            @Header("messageId") String messageId) {
-        DownloadResponse downloadResponse = new Gson().fromJson(kafkaOutResponseMessage,
-                DownloadResponse.class
-        );
-        FileStatus fileStatus = downloadResponse.fileInfo().fileStatus();
-        if (fileStatus.code().equalsIgnoreCase(FileStatus.FILE_STATUS_04.code())) {
-            sendMessage(inStatusTopic.name(), downloadResponse, messageId, inKafkaTemplate);
-        }
-
-        acknowledgment.acknowledge();
-    }
-
     /* method that extracts the file UUID from the file link; the file is
        saved to disk and sent to storage under the same UUID,
        keeping the file extension */
@@ -205,11 +170,20 @@
         }
     }
 
-    private void sendMessage(@NonNull String topicName, Object object, String messageId,
+    private void sendMessage(@NonNull String topicName, Object object, Map<String, Object> headers,
             KafkaTemplate<String, String> template) {
         ProducerRecord<String, String> record = new ProducerRecord<>(topicName,
                 new GsonBuilder().setPrettyPrinting().create().toJson(object));
-        record.headers().add("messageId", messageId.getBytes(StandardCharsets.UTF_8));
+        if (headers != null) {
+            headers.forEach((key, value) -> {
+                if (value instanceof byte[]) {
+                    record.headers().add(key, (byte[]) value);
+                }
+                else if (value != null) {
+                    record.headers().add(key, value.toString().getBytes(StandardCharsets.UTF_8));
+                }
+            });
+        }
         CompletableFuture<SendResult<String, String>> future = template.send(record);
 
         future.whenComplete((result, e) -> {
@@ -225,10 +199,14 @@
     private record FileUrl(String fileName, String fileUrl) {
     }
 
-    private DownloadRequest.FileInfo getCsvFile(DownloadRequest downloadRequest) {
-        return Arrays.stream(downloadRequest.filesInfo())
-                .filter(fi -> fi.getFileName() != null && fi.getFileName().endsWith(".csv"))
-                .findFirst()
-                .orElse(null);
+    @KafkaListener(id = "${spring.kafka.consumer.group.id}-clear-s3", topics = "${kafka.clear.s3.topic.name}",
+            containerFactory = "inputKafkaListenerContainerFactory")
+    public void listenKafkaClearS3(String kafkaMessage, Acknowledgment acknowledgment) {
+        DownloadRequest downloadRequest = new Gson().fromJson(kafkaMessage,
+                DownloadRequest.class
+        );
+        Arrays.stream(downloadRequest.filesInfo())
+                .forEach(fileInfo -> s3Service.deleteFileByUrl(fileInfo.getFileUrl()));
+        acknowledgment.acknowledge();
     }
 }
diff --git a/src/main/resources/application.properties b/src/main/resources/application.properties
index ddf45e4..70767b4 100644
--- a/src/main/resources/application.properties
+++ b/src/main/resources/application.properties
@@ -30,25 +30,9 @@ spring.kafka.producer.properties.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="${ERVU_KAFKA_USERNAME}" password="${ERVU_KAFKA_PASSWORD}";
 spring.kafka.producer.properties.sasl.mechanism=${ERVU_KAFKA_SASL_MECHANISM:SCRAM-SHA-256}
 # spring kafka default beans properties <- end
 #
-# kafka out consumer (not for default bean creation by spring)
-#host1:port1, host2:port2
-spring.kafka.out.consumer.bootstrap.servers=${ERVU_KAFKA_BOOTSTRAP_SERVERS}
-spring.kafka.out.consumer.security.protocol=${ERVU_KAFKA_SECURITY_PROTOCOL:SASL_PLAINTEXT}
-#login password to set
-spring.kafka.out.consumer.properties.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="${ERVU_KAFKA_USERNAME}" password="${ERVU_KAFKA_PASSWORD}";
-spring.kafka.out.consumer.properties.sasl.mechanism=${ERVU_KAFKA_SASL_MECHANISM:SCRAM-SHA-256}
-#
-spring.kafka.out.consumer.enable.auto.commit=false
-spring.kafka.out.consumer.group.id=${ERVU_KAFKA_GROUP_ID:response-consumers}
-# kafka out listeners
-spring.kafka.out.listener.ack.mode=MANUAL_IMMEDIATE
-#
-#
 kafka.in.topic.name=${AV_KAFKA_TOPIC_NAME}
 kafka.in.status.topic.name=${AV_KAFKA_STATUS_TOPIC_NAME}
-kafka.out.error.topic.name=${ERVU_KAFKA_ERROR_TOPIC_NAME}
-kafka.out.success.topic.name=${ERVU_KAFKA_SUCCESS_TOPIC_NAME}
-kafka.out.response.topic.name=${ERVU_KAFKA_RESPONSE_TOPIC_NAME}
+kafka.clear.s3.topic.name=${AV_KAFKA_CLEAR_S3_TOPIC_NAME}
 #
 av.check.enabled=${AV_CHECK_ENABLED:true}
 av.rest.address=${AV_REST_ADDRESS}
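
For reference, the s3://bucket/key parsing that the new S3Service.deleteFileByUrl
relies on can be exercised in isolation. A minimal standalone sketch; the bucket
name and URLs below are made-up examples, not values from the patch (the real
bucket comes from the service's configuration):

import java.util.List;

// Standalone sketch of the key extraction performed by S3Service.deleteFileByUrl.
// "out-bucket" and the URLs are illustrative values only.
public class S3UrlSketch {
    static String extractKey(String url, String bucketName) {
        String prefix = "s3://" + bucketName + "/";
        // same guard as the patch (URLs outside the expected bucket are rejected),
        // plus an explicit null check added here for the sketch
        if (url == null || !url.startsWith(prefix)) {
            throw new IllegalArgumentException("Invalid S3 URL: " + url);
        }
        return url.substring(prefix.length());
    }

    public static void main(String[] args) {
        for (String url : List.of("s3://out-bucket/2025/09/report.csv",
                "s3://out-bucket/ab12cd34.pdf")) {
            System.out.println(url + " -> " + extractKey(url, "out-bucket"));
        }
    }
}

Note that the clear-s3 listener calls deleteFileByUrl for every file in the
request, so a null fileUrl (as set for FILE_STATUS_11 status messages) would
fail this guard; whether such messages can reach the cleanup topic is worth
confirming.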
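
The reworked sendMessage forwards every incoming header rather than just
messageId: byte[] values are copied verbatim, any other non-null value via
toString(). A minimal standalone sketch of that rule, needing only
kafka-clients on the classpath; the topic and header entries are illustrative:

import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;

import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.Header;

// Standalone sketch of the header-copy rule in FileUploadService.sendMessage.
public class HeaderCopySketch {
    public static void main(String[] args) {
        Map<String, Object> headers = new LinkedHashMap<>();
        headers.put("messageId", "42".getBytes(StandardCharsets.UTF_8)); // raw bytes, copied as-is
        headers.put("kafka_receivedTopic", "av-in");                     // non-byte[], copied via toString()
        headers.put("ignored", null);                                    // nulls are skipped

        ProducerRecord<String, String> record = new ProducerRecord<>("status-topic", "{}");
        headers.forEach((key, value) -> {
            if (value instanceof byte[]) {
                record.headers().add(key, (byte[]) value);
            }
            else if (value != null) {
                record.headers().add(key, value.toString().getBytes(StandardCharsets.UTF_8));
            }
        });

        for (Header header : record.headers()) {
            System.out.println(header.key() + " = "
                    + new String(header.value(), StandardCharsets.UTF_8));
        }
    }
}

One side effect to be aware of: with @Headers Map<String, Object>, Spring Kafka
also injects its own kafka_-prefixed entries (received topic, partition, offset,
and so on), so those are forwarded to the status topic as well unless filtered.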