From e2c5f44c6884a545da1b9dd05fe316cb6b47aa02 Mon Sep 17 00:00:00 2001 From: Eduard Tihomirov Date: Thu, 26 Sep 2024 09:52:49 +0300 Subject: [PATCH] SUPPORT-8556: Fix --- .../input/InputKafkaProducerConfig.java | 54 +++++++++++++++++++ .../config/input/InputKafkaTopicConfig.java | 21 ++++++++ .../output/OutputKafkaProducerConfig.java | 3 ++ .../ervu/av/service/FileStatusService.java | 40 -------------- .../ervu/av/service/FileUploadService.java | 32 ++++++----- src/main/resources/application.properties | 16 +++--- test.env | 4 +- 7 files changed, 102 insertions(+), 68 deletions(-) create mode 100644 src/main/java/ru/micord/ervu/av/kafka/config/input/InputKafkaProducerConfig.java create mode 100644 src/main/java/ru/micord/ervu/av/kafka/config/input/InputKafkaTopicConfig.java delete mode 100644 src/main/java/ru/micord/ervu/av/service/FileStatusService.java diff --git a/src/main/java/ru/micord/ervu/av/kafka/config/input/InputKafkaProducerConfig.java b/src/main/java/ru/micord/ervu/av/kafka/config/input/InputKafkaProducerConfig.java new file mode 100644 index 0000000..77a1323 --- /dev/null +++ b/src/main/java/ru/micord/ervu/av/kafka/config/input/InputKafkaProducerConfig.java @@ -0,0 +1,54 @@ +package ru.micord.ervu.av.kafka.config.input; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.common.config.SaslConfigs; +import org.apache.kafka.common.serialization.StringSerializer; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.kafka.core.DefaultKafkaProducerFactory; +import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.kafka.core.ProducerFactory; + +/** + * @author 
r.latypov
+ */
+@Configuration
+public class InputKafkaProducerConfig {
+  @Value("${spring.kafka.in.producer.bootstrap.servers}")
+  private List<String> bootstrapAddress;
+  @Value("${spring.kafka.in.producer.security.protocol}")
+  private String securityProtocol;
+  @Value("${spring.kafka.in.producer.properties.sasl.jaas.config}")
+  private String jaasConfig;
+  @Value("${spring.kafka.in.producer.properties.sasl.mechanism}")
+  private String saslMechanism;
+
+  @Bean
+  @Qualifier("in-prod-factory")
+  public ProducerFactory<String, String> inputProducerFactory() {
+    Map<String, Object> configs = new HashMap<>();
+
+    configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
+    configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+    configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+
+    configs.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, securityProtocol);
+    configs.put(SaslConfigs.SASL_JAAS_CONFIG, jaasConfig);
+    configs.put(SaslConfigs.SASL_MECHANISM, saslMechanism);
+
+    return new DefaultKafkaProducerFactory<>(configs);
+  }
+
+  @Bean
+  @Qualifier("in-template")
+  public KafkaTemplate<String, String> inputKafkaTemplate() {
+    return new KafkaTemplate<>(inputProducerFactory());
+  }
+}
diff --git a/src/main/java/ru/micord/ervu/av/kafka/config/input/InputKafkaTopicConfig.java b/src/main/java/ru/micord/ervu/av/kafka/config/input/InputKafkaTopicConfig.java
new file mode 100644
index 0000000..a7211ba
--- /dev/null
+++ b/src/main/java/ru/micord/ervu/av/kafka/config/input/InputKafkaTopicConfig.java
@@ -0,0 +1,21 @@
+package ru.micord.ervu.av.kafka.config.input;
+
+import org.apache.kafka.clients.admin.NewTopic;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.kafka.config.TopicBuilder;
+
+/**
+ * @author r.latypov
+ */
+@Configuration
+public class InputKafkaTopicConfig {
+  @Value("${kafka.in.status.topic.name}")
+ 
private String kafkaInStatusTopicName; + + @Bean + public NewTopic inStatusTopic() { + return TopicBuilder.name(kafkaInStatusTopicName).build(); + } +} diff --git a/src/main/java/ru/micord/ervu/av/kafka/config/output/OutputKafkaProducerConfig.java b/src/main/java/ru/micord/ervu/av/kafka/config/output/OutputKafkaProducerConfig.java index 2a14b10..8a0ee0b 100644 --- a/src/main/java/ru/micord/ervu/av/kafka/config/output/OutputKafkaProducerConfig.java +++ b/src/main/java/ru/micord/ervu/av/kafka/config/output/OutputKafkaProducerConfig.java @@ -8,6 +8,7 @@ import org.apache.kafka.clients.CommonClientConfigs; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.common.config.SaslConfigs; import org.apache.kafka.common.serialization.StringSerializer; +import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @@ -30,6 +31,7 @@ public class OutputKafkaProducerConfig { private String saslMechanism; @Bean + @Qualifier("out-prod-factory") public ProducerFactory producerFactory() { Map configs = new HashMap<>(); @@ -45,6 +47,7 @@ public class OutputKafkaProducerConfig { } @Bean + @Qualifier("out-template") public KafkaTemplate kafkaTemplate() { return new KafkaTemplate<>(producerFactory()); } diff --git a/src/main/java/ru/micord/ervu/av/service/FileStatusService.java b/src/main/java/ru/micord/ervu/av/service/FileStatusService.java deleted file mode 100644 index 82e2c7c..0000000 --- a/src/main/java/ru/micord/ervu/av/service/FileStatusService.java +++ /dev/null @@ -1,40 +0,0 @@ -package ru.micord.ervu.av.service; - -import org.jooq.DSLContext; -import org.jooq.Field; -import org.jooq.Record; -import org.jooq.Table; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.stereotype.Service; - -import static org.jooq.impl.DSL.field; -import 
static org.jooq.impl.DSL.name; -import static org.jooq.impl.DSL.table; - -/** - * @author r.latypov - */ -@Service -public class FileStatusService { - public static final Table INTERACTION_LOG = table(name("public", "interaction_log")); - public static final Field INTERACTION_LOG_ID = field(name("id"), Long.class); - public static final Field INTERACTION_LOG_FILE_ID = field(name("file_id"), String.class); - public static final Field INTERACTION_LOG_STATUS = field(name("status"), String.class); - - @Autowired - private DSLContext dslContext; - - public void setStatus(Long id, String status) { - dslContext.update(INTERACTION_LOG) - .set(INTERACTION_LOG_STATUS, status) - .where(INTERACTION_LOG_ID.eq(id)) - .execute(); - } - - public void setStatus(String fileId, String status) { - dslContext.update(INTERACTION_LOG) - .set(INTERACTION_LOG_STATUS, status) - .where(INTERACTION_LOG_FILE_ID.eq(fileId)) - .execute(); - } -} diff --git a/src/main/java/ru/micord/ervu/av/service/FileUploadService.java b/src/main/java/ru/micord/ervu/av/service/FileUploadService.java index 380cca8..6b9e1e2 100644 --- a/src/main/java/ru/micord/ervu/av/service/FileUploadService.java +++ b/src/main/java/ru/micord/ervu/av/service/FileUploadService.java @@ -25,6 +25,7 @@ import org.apache.kafka.clients.admin.NewTopic; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.beans.factory.annotation.Value; import org.springframework.http.HttpStatus; import org.springframework.kafka.annotation.KafkaListener; @@ -55,29 +56,31 @@ public class FileUploadService { @Value("${file.saving.path}") private String fileSavingPath; private final KafkaTemplate kafkaTemplate; + private final KafkaTemplate inKafkaTemplate; private final NewTopic outErrorTopic; private final NewTopic outSuccessTopic; + private final NewTopic inStatusTopic; private final 
ReceiveScanReportRetryable receiveScanReportRetryable; - private final FileStatusService fileStatusService; private final S3Service s3Service; @Autowired - public FileUploadService(KafkaTemplate kafkaTemplate, NewTopic outErrorTopic, - NewTopic outSuccessTopic, ReceiveScanReportRetryable receiveScanReportRetryable, - FileStatusService fileStatusService, S3Service s3Service) { + public FileUploadService(@Qualifier("out-template") KafkaTemplate kafkaTemplate, + @Qualifier("in-template") KafkaTemplate inKafkaTemplate, + NewTopic outErrorTopic, NewTopic outSuccessTopic, ReceiveScanReportRetryable receiveScanReportRetryable, + S3Service s3Service, NewTopic inStatusTopic) { this.kafkaTemplate = kafkaTemplate; this.outErrorTopic = outErrorTopic; this.outSuccessTopic = outSuccessTopic; this.receiveScanReportRetryable = receiveScanReportRetryable; - this.fileStatusService = fileStatusService; this.s3Service = s3Service; + this.inKafkaTemplate = inKafkaTemplate; + this.inStatusTopic = inStatusTopic; } @KafkaListener(id = "${spring.kafka.consumer.group.id}", topics = "${kafka.in.topic.name}", containerFactory = "inputKafkaListenerContainerFactory") public void listenKafkaIn(String kafkaInMessage, Acknowledgment acknowledgment) { DownloadRequest downloadRequest = new Gson().fromJson(kafkaInMessage, DownloadRequest.class); - String fileId = downloadRequest.fileInfo().getFileId(); try { FileUrl fileUrl = parseFileUrl(downloadRequest.fileInfo().getFileUrl()); @@ -95,19 +98,16 @@ public class FileUploadService { if (infected || !clean) { downloadRequest.fileInfo().setFileUrl(null); downloadRequest.fileInfo().setFileStatus(FileStatus.FILE_STATUS_02); - sendMessage(outErrorTopic.name(), downloadRequest); - - fileStatusService.setStatus(fileId, FileStatus.FILE_STATUS_02.status()); + sendMessage(outErrorTopic.name(), downloadRequest, kafkaTemplate); } else { s3Service.putFile(filePath, fileUrl.fileName()); downloadRequest.fileInfo().setFileUrl(fileUrl.fileName()); 
downloadRequest.fileInfo().setFileStatus(FileStatus.FILE_STATUS_03); - sendMessage(outSuccessTopic.name(), downloadRequest); - - fileStatusService.setStatus(fileId, FileStatus.FILE_STATUS_03.status()); + sendMessage(outSuccessTopic.name(), downloadRequest, kafkaTemplate); } + sendMessage(inStatusTopic.name(), downloadRequest, inKafkaTemplate); deleteFile(downloadUrl); if (new File(filePath).delete()) { @@ -135,11 +135,9 @@ public class FileUploadService { DownloadResponse downloadResponse = new Gson().fromJson(kafkaOutResponseMessage, DownloadResponse.class ); - - String fileId = downloadResponse.fileInfo().fileId(); FileStatus fileStatus = downloadResponse.fileInfo().fileStatus(); if (fileStatus.code().equalsIgnoreCase(FileStatus.FILE_STATUS_04.code())) { - fileStatusService.setStatus(fileId, FileStatus.FILE_STATUS_04.status()); + sendMessage(inStatusTopic.name(), downloadResponse, inKafkaTemplate); } acknowledgment.acknowledge(); @@ -274,8 +272,8 @@ public class FileUploadService { } } - private void sendMessage(@NonNull String topicName, Object object) { - CompletableFuture> future = kafkaTemplate.send(topicName, + private void sendMessage(@NonNull String topicName, Object object, KafkaTemplate template) { + CompletableFuture> future = template.send(topicName, new GsonBuilder().setPrettyPrinting().create().toJson(object) ); diff --git a/src/main/resources/application.properties b/src/main/resources/application.properties index 153cb49..0601a13 100644 --- a/src/main/resources/application.properties +++ b/src/main/resources/application.properties @@ -20,6 +20,13 @@ spring.kafka.consumer.enable.auto.commit=false spring.kafka.consumer.group.id=file-to-upload-consumers # kafka in listeners spring.kafka.listener.ack.mode=MANUAL_IMMEDIATE +# kafka in producer (with possibility for default bean) +#host1:port1, host2:port2 +spring.kafka.in.producer.bootstrap.servers=${IN_KAFKA_SERVERS} +spring.kafka.in.producer.security.protocol=SASL_PLAINTEXT +#login password to set 
+spring.kafka.in.producer.properties.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="${IN_KAFKA_USERNAME}" password="${IN_KAFKA_PASSWORD}";
+spring.kafka.in.producer.properties.sasl.mechanism=SCRAM-SHA-256
 #
 # kafka out producer (with possibility for default bean)
 #host1:port1, host2:port2
@@ -51,6 +58,7 @@ spring.kafka.out.listener.ack.mode=MANUAL_IMMEDIATE
 #
 #
 kafka.in.topic.name=${IN_KAFKA_TOPIC_NAME}
+kafka.in.status.topic.name=${IN_KAFKA_STATUS_TOPIC_NAME}
 kafka.out.error.topic.name=${OUT_KAFKA_ERROR_TOPIC_NAME}
 kafka.out.success.topic.name=${OUT_KAFKA_SUCCESS_TOPIC_NAME}
 kafka.out.response.topic.name=${OUT_KAFKA_RESPONSE_TOPIC_NAME}
@@ -67,11 +75,3 @@ s3.out.access_key=${S3_ACCESS_KEY}
 s3.out.secret_key=${S3_SECRET_KEY}
 s3.out.bucket_name=${S3_OUT_BUCKET_NAME}
 #
-# spring jooq dsl bean properties begin ->
-spring.jooq.sql-dialect=Postgres
-spring.datasource.driver-class-name=org.postgresql.Driver
-#host:port/database_name
-spring.datasource.url=jdbc:postgresql://${SPRING_DATASOURCE_URL}
-spring.datasource.username=${SPRING_DATASOURCE_USERNAME}
-spring.datasource.password=${SPRING_DATASOURCE_PASSWORD}
-# spring jooq dsl bean properties <- end
diff --git a/test.env b/test.env
index 01c5d43..41d7c2f 100644
--- a/test.env
+++ b/test.env
@@ -4,6 +4,7 @@ IN_KAFKA_SERVERS=10.10.31.11:32609
 IN_KAFKA_USERNAME=user1
 IN_KAFKA_PASSWORD=Blfi9d2OFG
 IN_KAFKA_TOPIC_NAME=file-to-upload
+IN_KAFKA_STATUS_TOPIC_NAME=file-status
 OUT_KAFKA_SERVERS=10.10.31.11:32609
 OUT_KAFKA_USERNAME=user1
 OUT_KAFKA_PASSWORD=Blfi9d2OFG
@@ -20,6 +21,3 @@ S3_PORT=31900
 S3_ACCESS_KEY=Keyq0l8IRarEf5GmpvEO
 S3_SECRET_KEY=8A2epSoI6OjdHHwA5F6tHxeYRThv47GdGwcBrV7a
 S3_OUT_BUCKET_NAME=default-out-bucket
-SPRING_DATASOURCE_URL=10.10.31.119:5432/ervu-lkrp-ul
-SPRING_DATASOURCE_USERNAME=ervu-lkrp-ul
-SPRING_DATASOURCE_PASSWORD=ervu-lkrp-ul