diff --git a/src/main/java/ru/micord/ervu/av/kafka/config/output/OutputKafkaConsumerConfig.java b/src/main/java/ru/micord/ervu/av/kafka/config/output/OutputKafkaConsumerConfig.java
deleted file mode 100644
index bc5ac81..0000000
--- a/src/main/java/ru/micord/ervu/av/kafka/config/output/OutputKafkaConsumerConfig.java
+++ /dev/null
@@ -1,64 +0,0 @@
-package ru.micord.ervu.av.kafka.config.output;
-
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.kafka.clients.CommonClientConfigs;
-import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.common.config.SaslConfigs;
-import org.apache.kafka.common.serialization.StringDeserializer;
-import org.springframework.beans.factory.annotation.Value;
-import org.springframework.context.annotation.Bean;
-import org.springframework.context.annotation.Configuration;
-import org.springframework.kafka.annotation.EnableKafka;
-import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
-import org.springframework.kafka.core.ConsumerFactory;
-import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
-import org.springframework.kafka.listener.ContainerProperties;
-
-/**
- * @author r.latypov
- */
-@Configuration
-@EnableKafka
-public class OutputKafkaConsumerConfig {
-    @Value("${spring.kafka.out.consumer.bootstrap.servers}")
-    private List<String> bootstrapAddress;
-    @Value("${spring.kafka.out.consumer.security.protocol}")
-    private String securityProtocol;
-    @Value("${spring.kafka.out.consumer.properties.sasl.jaas.config}")
-    private String jaasConfig;
-    @Value("${spring.kafka.out.consumer.properties.sasl.mechanism}")
-    private String saslMechanism;
-    @Value("${spring.kafka.out.consumer.enable.auto.commit}")
-    private String enableAutoCommit;
-    @Value("${spring.kafka.out.listener.ack.mode}")
-    private String ackMode;
-
-    @Bean
-    public ConsumerFactory<String, String> outputConsumerFactory() {
-        Map<String, Object> configs = new HashMap<>();
-
-        configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
-        configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
-        configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
-
-        configs.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, securityProtocol);
-        configs.put(SaslConfigs.SASL_JAAS_CONFIG, jaasConfig);
-        configs.put(SaslConfigs.SASL_MECHANISM, saslMechanism);
-
-        configs.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);
-
-        return new DefaultKafkaConsumerFactory<>(configs);
-    }
-
-    @Bean
-    public ConcurrentKafkaListenerContainerFactory<String, String> outputKafkaListenerContainerFactory() {
-        ConcurrentKafkaListenerContainerFactory<String, String> factory =
-                new ConcurrentKafkaListenerContainerFactory<>();
-        factory.setConsumerFactory(outputConsumerFactory());
-        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
-        return factory;
-    }
-}
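All listeners that survive this changeset run on `inputKafkaListenerContainerFactory`, which is defined outside this diff. A minimal sketch of what that factory presumably looks like, assuming it mirrors the deleted output-side factory above but reads the `spring.kafka.consumer.*` properties (the class name, property keys, and defaults below are assumptions, not taken from this changeset):

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ContainerProperties;

// Hypothetical input-side analogue of the deleted OutputKafkaConsumerConfig; not part of this diff.
@Configuration
@EnableKafka
public class InputKafkaConsumerConfig {
    @Value("${spring.kafka.consumer.bootstrap.servers}")
    private List<String> bootstrapAddress;

    @Bean
    public ConsumerFactory<String, String> inputConsumerFactory() {
        Map<String, Object> configs = new HashMap<>();
        configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        // manual acknowledgment requires auto-commit to be off
        configs.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        return new DefaultKafkaConsumerFactory<>(configs);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> inputKafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(inputConsumerFactory());
        // MANUAL_IMMEDIATE matches the explicit acknowledgment.acknowledge() calls
        // in the FileUploadService listeners further down in this diff
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
        return factory;
    }
}
```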
diff --git a/src/main/java/ru/micord/ervu/av/kafka/config/output/OutputKafkaProducerConfig.java b/src/main/java/ru/micord/ervu/av/kafka/config/output/OutputKafkaProducerConfig.java
deleted file mode 100644
index b57b6a9..0000000
--- a/src/main/java/ru/micord/ervu/av/kafka/config/output/OutputKafkaProducerConfig.java
+++ /dev/null
@@ -1,52 +0,0 @@
-package ru.micord.ervu.av.kafka.config.output;
-
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.kafka.clients.CommonClientConfigs;
-import org.apache.kafka.clients.producer.ProducerConfig;
-import org.apache.kafka.common.config.SaslConfigs;
-import org.apache.kafka.common.serialization.StringSerializer;
-import org.springframework.beans.factory.annotation.Qualifier;
-import org.springframework.beans.factory.annotation.Value;
-import org.springframework.context.annotation.Bean;
-import org.springframework.context.annotation.Configuration;
-import org.springframework.kafka.core.DefaultKafkaProducerFactory;
-import org.springframework.kafka.core.KafkaTemplate;
-import org.springframework.kafka.core.ProducerFactory;
-
-/**
- * @author r.latypov
- */
-@Configuration
-public class OutputKafkaProducerConfig {
-    @Value("${spring.kafka.producer.bootstrap.servers}")
-    private List<String> bootstrapAddress;
-    @Value("${spring.kafka.producer.security.protocol}")
-    private String securityProtocol;
-    @Value("${spring.kafka.producer.properties.sasl.jaas.config}")
-    private String jaasConfig;
-    @Value("${spring.kafka.producer.properties.sasl.mechanism}")
-    private String saslMechanism;
-
-    @Bean
-    public ProducerFactory<String, String> outputProducerFactory() {
-        Map<String, Object> configs = new HashMap<>();
-
-        configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
-        configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
-        configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
-
-        configs.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, securityProtocol);
-        configs.put(SaslConfigs.SASL_JAAS_CONFIG, jaasConfig);
-        configs.put(SaslConfigs.SASL_MECHANISM, saslMechanism);
-
-        return new DefaultKafkaProducerFactory<>(configs);
-    }
-
-    @Bean
-    public KafkaTemplate<String, String> outputKafkaTemplate() {
-        return new KafkaTemplate<>(outputProducerFactory());
-    }
-}
diff --git a/src/main/java/ru/micord/ervu/av/kafka/config/output/OutputKafkaTopicConfig.java b/src/main/java/ru/micord/ervu/av/kafka/config/output/OutputKafkaTopicConfig.java
deleted file mode 100644
index 79f1ef8..0000000
--- a/src/main/java/ru/micord/ervu/av/kafka/config/output/OutputKafkaTopicConfig.java
+++ /dev/null
@@ -1,28 +0,0 @@
-package ru.micord.ervu.av.kafka.config.output;
-
-import org.apache.kafka.clients.admin.NewTopic;
-import org.springframework.beans.factory.annotation.Value;
-import org.springframework.context.annotation.Bean;
-import org.springframework.context.annotation.Configuration;
-import org.springframework.kafka.config.TopicBuilder;
-
-/**
- * @author r.latypov
- */
-@Configuration
-public class OutputKafkaTopicConfig {
-    @Value("${kafka.out.error.topic.name}")
-    private String kafkaOutErrorTopicName;
-    @Value("${kafka.out.success.topic.name}")
-    private String kafkaOutSuccessTopicName;
-
-    @Bean
-    public NewTopic outErrorTopic() {
-        return TopicBuilder.name(kafkaOutErrorTopicName).build();
-    }
-
-    @Bean
-    public NewTopic outSuccessTopic() {
-        return TopicBuilder.name(kafkaOutSuccessTopicName).build();
-    }
-}
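With the out-error and out-success topic beans gone, the only `NewTopic` the service still injects is `inStatusTopic` (see `FileUploadService` below). Assuming it is declared the same way the deleted beans were, backed by the `kafka.in.status.topic.name` property that remains in application.properties, the surviving bean presumably looks like this sketch (illustrative, not part of this changeset):

```java
// Hypothetical sketch of the remaining in-status topic bean, following the
// same TopicBuilder pattern as the beans deleted above.
@Bean
public NewTopic inStatusTopic(@Value("${kafka.in.status.topic.name}") String inStatusTopicName) {
    return TopicBuilder.name(inStatusTopicName).build();
}
```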
diff --git a/src/main/java/ru/micord/ervu/av/kafka/dto/DownloadRequest.java b/src/main/java/ru/micord/ervu/av/kafka/dto/DownloadRequest.java
index 822fcf9..b785940 100644
--- a/src/main/java/ru/micord/ervu/av/kafka/dto/DownloadRequest.java
+++ b/src/main/java/ru/micord/ervu/av/kafka/dto/DownloadRequest.java
@@ -8,7 +8,7 @@ import org.springframework.lang.NonNull;
 /**
  * @author r.latypov
  */
-public record DownloadRequest(OrgInfo orgInfo, @NonNull FileInfo fileInfo) {
+public record DownloadRequest(OrgInfo orgInfo, @NonNull FileInfo[] filesInfo) {
 
     @Getter
     @AllArgsConstructor
@@ -22,6 +22,7 @@ public record DownloadRequest(OrgInfo orgInfo, @NonNull FileInfo fileInfo) {
         private final String filePatternName;
         private final String departureDateTime;
         private final String timeZone;
+        private final String type;
         @Setter
         private FileStatus fileStatus;
     }
diff --git a/src/main/java/ru/micord/ervu/av/kafka/dto/DownloadResponse.java b/src/main/java/ru/micord/ervu/av/kafka/dto/DownloadResponse.java
deleted file mode 100644
index 56d7b7e..0000000
--- a/src/main/java/ru/micord/ervu/av/kafka/dto/DownloadResponse.java
+++ /dev/null
@@ -1,11 +0,0 @@
-package ru.micord.ervu.av.kafka.dto;
-
-import org.springframework.lang.NonNull;
-
-/**
- * @author r.latypov
- */
-public record DownloadResponse(OrgInfo orgInfo, @NonNull FileInfo fileInfo) {
-    public record FileInfo(String fileId, FileStatus fileStatus) {
-    }
-}
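`DownloadRequest` now carries an array of file entries plus a per-file `type`, and `DownloadResponse` is gone along with the response-topic listener removed later in this diff. A message the listeners below would deserialize presumably looks like this (field names come from the record; the values, URLs, and the `OrgInfo` payload are illustrative):

```java
// Illustrative payload for the new DownloadRequest shape; values are made up.
String kafkaInMessage = """
        {
          "orgInfo": null,
          "filesInfo": [
            {"fileUrl": "http://files.example/550e8400-e29b-41d4-a716-446655440000", "type": "report"},
            {"fileUrl": "http://files.example/6ba7b810-9dad-11d1-80b4-00c04fd430c8", "type": "attachment"}
          ]
        }
        """;
DownloadRequest request = new Gson().fromJson(kafkaInMessage, DownloadRequest.class);
```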
diff --git a/src/main/java/ru/micord/ervu/av/s3/S3Service.java b/src/main/java/ru/micord/ervu/av/s3/S3Service.java
index 8537eed..721b9dc 100644
--- a/src/main/java/ru/micord/ervu/av/s3/S3Service.java
+++ b/src/main/java/ru/micord/ervu/av/s3/S3Service.java
@@ -16,11 +16,13 @@ import ru.micord.ervu.av.exception.FileUploadException;
 public class S3Service {
     private final String outBucketName;
     private final AmazonS3 outClient;
+    private final String s3Prefix;
 
     @Autowired
     public S3Service(String outBucketName, AmazonS3 outClient) {
         this.outBucketName = outBucketName;
         this.outClient = outClient;
+        this.s3Prefix = "s3://" + outBucketName + "/";
     }
 
     @PostConstruct
@@ -33,11 +35,19 @@ public class S3Service {
     public String putFile(Path filePath, String key) throws FileUploadException {
         try {
             outClient.putObject(outBucketName, key, filePath.toFile());
-            return String.join("/", "s3:/", outBucketName, key);
+            return s3Prefix + key;
         }
         catch (AmazonServiceException e) {
             // todo message
             throw new FileUploadException(e);
         }
     }
+
+    public void deleteFileByUrl(String url) {
+        if (!url.startsWith(s3Prefix)) {
+            throw new IllegalArgumentException("Invalid S3 URL: " + url);
+        }
+        String key = url.substring(s3Prefix.length());
+        outClient.deleteObject(outBucketName, key);
+    }
 }
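`putFile` and `deleteFileByUrl` are now inverses through the precomputed `s3Prefix`. A worked example, with a hypothetical bucket name:

```java
// Assuming outBucketName == "av-out" (hypothetical), so s3Prefix == "s3://av-out/".
// Call site must handle the checked FileUploadException thrown by putFile.
String url = s3Service.putFile(Path.of("/tmp/report.pdf"), "report.pdf");
// url == "s3://av-out/report.pdf"
s3Service.deleteFileByUrl(url);            // strips "s3://av-out/", deletes key "report.pdf"
s3Service.deleteFileByUrl("s3://other/x"); // throws IllegalArgumentException: wrong prefix
```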
diff --git a/src/main/java/ru/micord/ervu/av/service/FileUploadService.java b/src/main/java/ru/micord/ervu/av/service/FileUploadService.java
index 897f56c..78eef23 100644
--- a/src/main/java/ru/micord/ervu/av/service/FileUploadService.java
+++ b/src/main/java/ru/micord/ervu/av/service/FileUploadService.java
@@ -5,6 +5,9 @@ import java.nio.charset.StandardCharsets;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.Paths;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 
 import com.google.gson.Gson;
@@ -21,13 +24,12 @@ import org.springframework.kafka.core.KafkaTemplate;
 import org.springframework.kafka.support.Acknowledgment;
 import org.springframework.kafka.support.SendResult;
 import org.springframework.lang.NonNull;
-import org.springframework.messaging.handler.annotation.Header;
+import org.springframework.messaging.handler.annotation.Headers;
 import org.springframework.stereotype.Service;
 import ru.micord.ervu.av.exception.FileUploadException;
 import ru.micord.ervu.av.exception.InvalidHttpFileUrlException;
 import ru.micord.ervu.av.exception.RetryableException;
 import ru.micord.ervu.av.kafka.dto.DownloadRequest;
-import ru.micord.ervu.av.kafka.dto.DownloadResponse;
 import ru.micord.ervu.av.kafka.dto.FileStatus;
 import ru.micord.ervu.av.s3.S3Service;
 
@@ -43,10 +45,7 @@ public class FileUploadService {
     private boolean avCheckEnabled;
     @Value("${file.saving.path}")
     private String fileSavingPath;
-    private final KafkaTemplate<String, String> kafkaTemplate;
     private final KafkaTemplate<String, String> inKafkaTemplate;
-    private final NewTopic outErrorTopic;
-    private final NewTopic outSuccessTopic;
     private final NewTopic inStatusTopic;
     private final FileManager fIleManager;
     private final ReceiveScanReportRetryable receiveScanReportRetryable;
@@ -54,15 +53,10 @@ public class FileUploadService {
 
     @Autowired
     public FileUploadService(
-            @Qualifier("outputKafkaTemplate") KafkaTemplate<String, String> kafkaTemplate,
             @Qualifier("inputKafkaTemplate") KafkaTemplate<String, String> inKafkaTemplate,
-            NewTopic outErrorTopic, NewTopic outSuccessTopic,
             ReceiveScanReportRetryable receiveScanReportRetryable, S3Service s3Service,
             NewTopic inStatusTopic, FileManager fIleManager) {
-        this.kafkaTemplate = kafkaTemplate;
-        this.outErrorTopic = outErrorTopic;
-        this.outSuccessTopic = outSuccessTopic;
         this.receiveScanReportRetryable = receiveScanReportRetryable;
         this.s3Service = s3Service;
         this.inKafkaTemplate = inKafkaTemplate;
@@ -73,91 +67,92 @@ public class FileUploadService {
     @KafkaListener(id = "${spring.kafka.consumer.group.id}", topics = "${kafka.in.topic.name}",
             containerFactory = "inputKafkaListenerContainerFactory")
     public void listenKafkaIn(String kafkaInMessage, Acknowledgment acknowledgment,
-            @Header("messageId") String messageId) {
+            @Headers Map<String, Object> headers) {
         DownloadRequest downloadRequest = new Gson().fromJson(kafkaInMessage, DownloadRequest.class);
+        Map<Path, DownloadRequest.FileInfo> tempFilesMap = new HashMap<>();
+
         try {
-            FileUrl fileUrl = parseFileUrl(downloadRequest.fileInfo().getFileUrl());
-            Files.createDirectories(Paths.get(fileSavingPath));
-            LOGGER.info("working in {}", System.getProperty("user.home"));
-            Path filePath = Paths.get(fileSavingPath, fileUrl.fileName());
-            String downloadUrl = fileUrl.fileUrl();
-            fIleManager.downloadFile(downloadUrl, filePath);
             boolean isAvError = false;
             int exitCode = 0;
+            boolean hasError = false;
 
-            if (avCheckEnabled) {
-                try {
-                    exitCode = receiveScanReportRetryable.checkFile(filePath);
+            for (DownloadRequest.FileInfo fileInfo : downloadRequest.filesInfo()) {
+                FileUrl fileUrl = parseFileUrl(fileInfo.getFileUrl());
+                Files.createDirectories(Paths.get(fileSavingPath));
+                LOGGER.info("working in {}", System.getProperty("user.home"));
+                Path filePath = Paths.get(fileSavingPath, fileUrl.fileName());
+                String downloadUrl = fileUrl.fileUrl();
+                fIleManager.downloadFile(downloadUrl, filePath);
+                tempFilesMap.put(filePath, fileInfo);
+
+                if (avCheckEnabled) {
+                    try {
+                        exitCode = receiveScanReportRetryable.checkFile(filePath);
+                    }
+                    catch (FileUploadException | RetryableException e) {
+                        LOGGER.error(e.getMessage(), e);
+                        isAvError = true;
+                    }
                 }
-                catch (FileUploadException | RetryableException e) {
-                    LOGGER.error(e.getMessage(), e);
-                    isAvError = true;
+
+                if (isAvError || exitCode == INFECTED_CODE || exitCode == PASS_PROTECTED_CODE) {
+                    hasError = true;
+                    break;
                 }
             }
 
-            if (isAvError || exitCode == INFECTED_CODE || exitCode == PASS_PROTECTED_CODE) {
-                downloadRequest.fileInfo().setFileUrl(null);
+            if (hasError) {
                 FileStatus fileStatus = (exitCode == PASS_PROTECTED_CODE || isAvError) ?
                         FileStatus.FILE_STATUS_11 : FileStatus.FILE_STATUS_02;
-                downloadRequest.fileInfo().setFileStatus(fileStatus);
-
-                if (!isAvError && exitCode == INFECTED_CODE) {
-                    sendMessage(outErrorTopic.name(), downloadRequest, messageId, kafkaTemplate);
+                for (DownloadRequest.FileInfo fileInfo : downloadRequest.filesInfo()) {
+                    FileUrl fileUrl = parseFileUrl(fileInfo.getFileUrl());
+                    fileInfo.setFileUrl(null);
+                    fileInfo.setFileStatus(fileStatus);
+                    fIleManager.deleteFile(fileUrl.fileUrl());
                 }
             }
             else {
-                String fileRef = s3Service.putFile(filePath, fileUrl.fileName());
-
-                downloadRequest.fileInfo().setFileUrl(fileRef);
-                downloadRequest.fileInfo().setFileStatus(FileStatus.FILE_STATUS_03);
-                sendMessage(outSuccessTopic.name(), downloadRequest, messageId, kafkaTemplate);
+                for (Map.Entry<Path, DownloadRequest.FileInfo> entry : tempFilesMap.entrySet()) {
+                    Path filePath = entry.getKey();
+                    DownloadRequest.FileInfo fileInfo = entry.getValue();
+                    FileUrl fileUrl = parseFileUrl(fileInfo.getFileUrl());
+                    String fileRef = s3Service.putFile(filePath, filePath.getFileName().toString());
+                    fileInfo.setFileUrl(fileRef);
+                    fileInfo.setFileStatus(FileStatus.FILE_STATUS_03);
+                    fIleManager.deleteFile(fileUrl.fileUrl());
+                }
             }
 
-            sendMessage(inStatusTopic.name(), downloadRequest, messageId, inKafkaTemplate);
+            sendMessage(inStatusTopic.name(), downloadRequest, headers, inKafkaTemplate);
 
-            fIleManager.deleteFile(downloadUrl);
-
-            try {
-                Files.delete(filePath);
-            }
-            catch (IOException e) {
-                LOGGER.error("Failed to delete file " + filePath, e);
-            }
-            finally {
-                acknowledgment.acknowledge();
+            for (Path filePath : tempFilesMap.keySet()) {
+                try {
+                    Files.deleteIfExists(filePath);
+                }
+                catch (IOException e) {
+                    LOGGER.error("Failed to delete file " + filePath, e);
+                }
             }
         }
         catch (InvalidHttpFileUrlException e) {
             // we assume the message does not need to be reprocessed:
             // log the error, send a message with the new status, and acknowledge the message
             LOGGER.error(e.getMessage() + ": " + kafkaInMessage);
-            downloadRequest.fileInfo().setFileUrl(null);
-            downloadRequest.fileInfo().setFileStatus(FileStatus.FILE_STATUS_11);
-            sendMessage(inStatusTopic.name(), downloadRequest, messageId, inKafkaTemplate);
-            acknowledgment.acknowledge();
+            for (DownloadRequest.FileInfo fileInfo : downloadRequest.filesInfo()) {
+                fileInfo.setFileUrl(null);
+                fileInfo.setFileStatus(FileStatus.FILE_STATUS_11);
+            }
+            sendMessage(inStatusTopic.name(), downloadRequest, headers, inKafkaTemplate);
         }
         catch (FileUploadException | IOException e) {
             // we assume the message needs to be read again:
             // log the error and leave the message unacknowledged
             LOGGER.error(e.getMessage(), e);
         }
-    }
-
-    @KafkaListener(id = "${spring.kafka.out.consumer.group.id}",
-            topics = "${kafka.out.response.topic.name}",
-            containerFactory = "outputKafkaListenerContainerFactory")
-    public void listenKafkaOut(String kafkaOutResponseMessage, Acknowledgment acknowledgment,
-            @Header("messageId") String messageId) {
-        DownloadResponse downloadResponse = new Gson().fromJson(kafkaOutResponseMessage,
-                DownloadResponse.class
-        );
-        FileStatus fileStatus = downloadResponse.fileInfo().fileStatus();
-        if (fileStatus.code().equalsIgnoreCase(FileStatus.FILE_STATUS_04.code())) {
-            sendMessage(inStatusTopic.name(), downloadResponse, messageId, inKafkaTemplate);
+        finally {
+            acknowledgment.acknowledge();
         }
-
-        acknowledgment.acknowledge();
     }
 
     /* method for extracting the file UUID from the file link
@@ -175,11 +170,20 @@ public class FileUploadService {
         }
     }
 
-    private void sendMessage(@NonNull String topicName, Object object, String messageId,
-            KafkaTemplate<String, String> template) {
+    private void sendMessage(@NonNull String topicName, Object object, Map<String, Object> headers,
+            KafkaTemplate<String, String> template) {
         ProducerRecord<String, String> record = new ProducerRecord<>(topicName,
                 new GsonBuilder().setPrettyPrinting().create().toJson(object));
-        record.headers().add("messageId", messageId.getBytes(StandardCharsets.UTF_8));
+        if (headers != null) {
+            headers.forEach((key, value) -> {
+                if (value instanceof byte[]) {
+                    record.headers().add(key, (byte[]) value);
+                }
+                else if (value != null) {
+                    record.headers().add(key, value.toString().getBytes(StandardCharsets.UTF_8));
+                }
+            });
+        }
 
         CompletableFuture<SendResult<String, String>> future = template.send(record);
         future.whenComplete((result, e) -> {
@@ -194,4 +198,15 @@ public class FileUploadService {
 
     private record FileUrl(String fileName, String fileUrl) {
     }
+
+    @KafkaListener(id = "${spring.kafka.s3.consumer.group.id}", topics = "${kafka.clear.s3.topic.name}",
+            containerFactory = "inputKafkaListenerContainerFactory")
+    public void listenKafkaIn(String kafkaMessage, Acknowledgment acknowledgment) {
+        DownloadRequest downloadRequest = new Gson().fromJson(kafkaMessage,
+                DownloadRequest.class
+        );
+        Arrays.stream(downloadRequest.filesInfo())
+                .forEach(fileInfo -> s3Service.deleteFileByUrl(fileInfo.getFileUrl()));
+        acknowledgment.acknowledge();
+    }
 }
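The reworked `sendMessage` forwards the entire inbound header map instead of a single `messageId`. With `@Headers Map<String, Object>`, raw Kafka record headers arrive as `byte[]`, while Spring also injects framework entries such as the received topic or offset as ordinary `String`/`Long` values, hence the two branches. A small sketch of what the forwarding does with a typical inbound map (values illustrative):

```java
// Raw Kafka headers are copied verbatim; everything else is re-encoded as UTF-8 bytes.
Map<String, Object> headers = new HashMap<>();
headers.put("messageId", "42".getBytes(StandardCharsets.UTF_8)); // byte[] -> copied as-is
headers.put(KafkaHeaders.RECEIVED_TOPIC, "av-in");               // String -> "av-in" as UTF-8 bytes
headers.put(KafkaHeaders.OFFSET, 1234L);                         // Long -> "1234" as UTF-8 bytes
```

Note that this also copies Spring's framework metadata headers (e.g. `kafka_receivedTopic`) onto the outgoing record, which may or may not be intended.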
diff --git a/src/main/resources/application.properties b/src/main/resources/application.properties
index ddf45e4..70767b4 100644
--- a/src/main/resources/application.properties
+++ b/src/main/resources/application.properties
@@ -30,25 +30,9 @@ spring.kafka.producer.properties.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="${ERVU_KAFKA_USERNAME}" password="${ERVU_KAFKA_PASSWORD}";
 spring.kafka.producer.properties.sasl.mechanism=${ERVU_KAFKA_SASL_MECHANISM:SCRAM-SHA-256}
 # spring kafka default beans properties <- end
 #
-# kafka out consumer (not for default bean creation by spring)
-#host1:port1, host2:port2
-spring.kafka.out.consumer.bootstrap.servers=${ERVU_KAFKA_BOOTSTRAP_SERVERS}
-spring.kafka.out.consumer.security.protocol=${ERVU_KAFKA_SECURITY_PROTOCOL:SASL_PLAINTEXT}
-#login password to set
-spring.kafka.out.consumer.properties.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="${ERVU_KAFKA_USERNAME}" password="${ERVU_KAFKA_PASSWORD}";
-spring.kafka.out.consumer.properties.sasl.mechanism=${ERVU_KAFKA_SASL_MECHANISM:SCRAM-SHA-256}
-#
-spring.kafka.out.consumer.enable.auto.commit=false
-spring.kafka.out.consumer.group.id=${ERVU_KAFKA_GROUP_ID:response-consumers}
-# kafka out listeners
-spring.kafka.out.listener.ack.mode=MANUAL_IMMEDIATE
-#
-#
 kafka.in.topic.name=${AV_KAFKA_TOPIC_NAME}
 kafka.in.status.topic.name=${AV_KAFKA_STATUS_TOPIC_NAME}
-kafka.out.error.topic.name=${ERVU_KAFKA_ERROR_TOPIC_NAME}
-kafka.out.success.topic.name=${ERVU_KAFKA_SUCCESS_TOPIC_NAME}
-kafka.out.response.topic.name=${ERVU_KAFKA_RESPONSE_TOPIC_NAME}
+kafka.clear.s3.topic.name=${AV_KAFKA_CLEAR_S3_TOPIC_NAME}
 #
 av.check.enabled=${AV_CHECK_ENABLED:true}
 av.rest.address=${AV_REST_ADDRESS}
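The out-topic variables (`ERVU_KAFKA_ERROR_TOPIC_NAME`, `ERVU_KAFKA_SUCCESS_TOPIC_NAME`, `ERVU_KAFKA_RESPONSE_TOPIC_NAME`) are no longer read anywhere; the one new required variable is `AV_KAFKA_CLEAR_S3_TOPIC_NAME`. Note that `spring.kafka.s3.consumer.group.id`, which the new clear-S3 listener references, is not added to this file by the diff and presumably has to be defined elsewhere. A hypothetical deployment snippet (the values and the `ERVU_KAFKA_S3_GROUP_ID` variable name are illustrative; only the property names come from the code above):

```properties
# Hypothetical values for illustration only.
AV_KAFKA_CLEAR_S3_TOPIC_NAME=av-clear-s3
spring.kafka.s3.consumer.group.id=${ERVU_KAFKA_S3_GROUP_ID:s3-clear-consumers}
```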