From 83901de08213e58f06ad6930f87a58ed5fb4696e Mon Sep 17 00:00:00 2001 From: Eduard Tihomirov Date: Wed, 25 Sep 2024 14:00:12 +0300 Subject: [PATCH 1/6] SUPPORT-8556: Fix --- ...ProducerConfig.java => AvKafkaConfig.java} | 28 ++++++++++++++++--- .../model/fileupload/DownloadResponse.java | 7 +++++ .../EmployeeInfoFileUploadService.java | 22 +++++++++++++-- .../ervu/service/InteractionService.java | 2 ++ .../ervu/service/InteractionServiceImpl.java | 11 +++++++- 5 files changed, 62 insertions(+), 8 deletions(-) rename backend/src/main/java/ervu/{KafkaProducerConfig.java => AvKafkaConfig.java} (65%) create mode 100644 backend/src/main/java/ervu/model/fileupload/DownloadResponse.java diff --git a/backend/src/main/java/ervu/KafkaProducerConfig.java b/backend/src/main/java/ervu/AvKafkaConfig.java similarity index 65% rename from backend/src/main/java/ervu/KafkaProducerConfig.java rename to backend/src/main/java/ervu/AvKafkaConfig.java index d13116a8..a6082681 100644 --- a/backend/src/main/java/ervu/KafkaProducerConfig.java +++ b/backend/src/main/java/ervu/AvKafkaConfig.java @@ -4,21 +4,21 @@ import java.util.HashMap; import java.util.Map; import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.common.config.SaslConfigs; +import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.serialization.StringSerializer; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; -import org.springframework.kafka.core.DefaultKafkaProducerFactory; -import org.springframework.kafka.core.KafkaTemplate; -import org.springframework.kafka.core.ProducerFactory; +import org.springframework.kafka.core.*; /** * @author Alexandr Shalaginov */ @Configuration -public class KafkaProducerConfig { +public class AvKafkaConfig { @Value("${av.kafka.bootstrap.servers}") private String kafkaUrl; @Value("${av.kafka.security.protocol}") @@ -52,6 +52,26 @@ public class KafkaProducerConfig { return props; } + @Bean("av-cons-factory") + public ConsumerFactory consumerFactory() { + return new DefaultKafkaConsumerFactory<>(consumerConfig()); + } + + @Bean("av-cons-configs") + public Map consumerConfig() { + Map props = new HashMap<>(); + props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, this.kafkaUrl); + props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); + props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); + + props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, securityProtocol); + props.put(SaslConfigs.SASL_JAAS_CONFIG, loginModule + " required username=\"" + + username + "\" password=\"" + password + "\";"); + props.put(SaslConfigs.SASL_MECHANISM, saslMechanism); + + return props; + } + @Bean("av-template") public KafkaTemplate kafkaTemplate() { return new KafkaTemplate<>(producerFactory()); diff --git a/backend/src/main/java/ervu/model/fileupload/DownloadResponse.java b/backend/src/main/java/ervu/model/fileupload/DownloadResponse.java new file mode 100644 index 00000000..25f28c08 --- /dev/null +++ b/backend/src/main/java/ervu/model/fileupload/DownloadResponse.java @@ -0,0 +1,7 @@ +package ervu.model.fileupload; + +/** + * @author r.latypov + */ +public record DownloadResponse(OrgInfo orgInfo, FileInfo fileInfo) { +} diff --git a/backend/src/main/java/ervu/service/fileupload/EmployeeInfoFileUploadService.java b/backend/src/main/java/ervu/service/fileupload/EmployeeInfoFileUploadService.java index a566eddd..c4b71b9d 100644 --- a/backend/src/main/java/ervu/service/fileupload/EmployeeInfoFileUploadService.java +++ b/backend/src/main/java/ervu/service/fileupload/EmployeeInfoFileUploadService.java @@ -9,14 +9,13 @@ import java.util.UUID; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import ervu.client.fileupload.FileUploadWebDavClient; -import ervu.model.fileupload.EmployeeInfoFileFormType; -import ervu.model.fileupload.EmployeeInfoKafkaMessage; -import ervu.model.fileupload.FileStatus; +import ervu.model.fileupload.*; import org.apache.kafka.clients.producer.ProducerRecord; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.beans.factory.annotation.Value; +import org.springframework.kafka.annotation.KafkaListener; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.stereotype.Service; import org.springframework.web.multipart.MultipartFile; @@ -147,4 +146,21 @@ public class EmployeeInfoFileUploadService { private String getTimeZone() { return ZonedDateTime.now().getOffset().toString(); } + + @KafkaListener(id = "${av.kafka.group.id}", topics = "${av.kafka.download.response}", + containerFactory = "av-cons-factory") + public void listenKafka(String kafkaMessage) { + ObjectMapper mapper = new ObjectMapper(); + try { + DownloadResponse downloadResponse = mapper.readValue(kafkaMessage, DownloadResponse.class); + FileInfo fileInfo = downloadResponse.fileInfo(); + interactionService.updateStatus(fileInfo.getFileId(), fileInfo.getFileStatus().getStatus(), + downloadResponse.orgInfo().getOrgId() + ); + } + catch (JsonProcessingException e) { + throw new RuntimeException(String.format("Fail get json from: %s", kafkaMessage), e); + } + + } } diff --git a/backend/src/main/java/ru/micord/ervu/service/InteractionService.java b/backend/src/main/java/ru/micord/ervu/service/InteractionService.java index 9be9cb11..fc61c684 100644 --- a/backend/src/main/java/ru/micord/ervu/service/InteractionService.java +++ b/backend/src/main/java/ru/micord/ervu/service/InteractionService.java @@ -13,4 +13,6 @@ public interface InteractionService { List get(String ervuId, String[] excludedStatuses); void setStatus(String fileId, String status, String fileName, String form, Timestamp timestamp, String sender, Integer count, String ervuId); + + void updateStatus(String fileId, String status, String ervuId); } diff --git a/backend/src/main/java/ru/micord/ervu/service/InteractionServiceImpl.java b/backend/src/main/java/ru/micord/ervu/service/InteractionServiceImpl.java index d2adcdd9..76f34cef 100644 --- a/backend/src/main/java/ru/micord/ervu/service/InteractionServiceImpl.java +++ b/backend/src/main/java/ru/micord/ervu/service/InteractionServiceImpl.java @@ -42,6 +42,15 @@ public class InteractionServiceImpl implements InteractionService { .set(INTERACTION_LOG.SENDER, sender) .set(INTERACTION_LOG.FILE_NAME, fileName) .set(INTERACTION_LOG.RECORDS_SENT, count) - .set(INTERACTION_LOG.ERVU_ID, ervuId); + .set(INTERACTION_LOG.ERVU_ID, ervuId) + .execute(); + } + + public void updateStatus(String fileId, String status, String ervuId) { + dslContext.update(INTERACTION_LOG) + .set(INTERACTION_LOG.STATUS, status) + .where(INTERACTION_LOG.ERVU_ID.eq(ervuId)) + .and(INTERACTION_LOG.FILE_ID.eq(fileId)) + .execute(); } } From a398e26794123aacc9c4da94cfb83be180e63259 Mon Sep 17 00:00:00 2001 From: Eduard Tihomirov Date: Thu, 26 Sep 2024 09:40:06 +0300 Subject: [PATCH 2/6] SUPPORT-8556: Fix --- backend/src/main/java/ervu/AvKafkaConfig.java | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/backend/src/main/java/ervu/AvKafkaConfig.java b/backend/src/main/java/ervu/AvKafkaConfig.java index a6082681..9788644e 100644 --- a/backend/src/main/java/ervu/AvKafkaConfig.java +++ b/backend/src/main/java/ervu/AvKafkaConfig.java @@ -4,15 +4,17 @@ import java.util.HashMap; import java.util.Map; import org.apache.kafka.clients.CommonClientConfigs; -import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.common.config.SaslConfigs; -import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.serialization.StringSerializer; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; -import org.springframework.kafka.core.*; +import org.springframework.kafka.core.ConsumerFactory; +import org.springframework.kafka.core.DefaultKafkaConsumerFactory; +import org.springframework.kafka.core.DefaultKafkaProducerFactory; +import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.kafka.core.ProducerFactory; /** * @author Alexandr Shalaginov From 723a4b965494d797f70e3fde3075e87ff611bc64 Mon Sep 17 00:00:00 2001 From: Eduard Tihomirov Date: Thu, 26 Sep 2024 14:10:57 +0300 Subject: [PATCH 3/6] SUPPORT-8556: Fix --- config/micord.env | 2 ++ config/standalone/dev/standalone.xml | 2 ++ 2 files changed, 4 insertions(+) diff --git a/config/micord.env b/config/micord.env index 42f78eb9..66a9973c 100644 --- a/config/micord.env +++ b/config/micord.env @@ -15,6 +15,8 @@ AV_KAFKA_SECURITY_PROTOCOL=SASL_PLAINTEXT AV_KAFKA_SASL_MECHANISM=SCRAM-SHA-256 AV_KAFKA_USERNAME=user1 AV_KAFKA_PASSWORD=Blfi9d2OFG +AV_KAFKA_GROUP_ID=1 +AV_KAFKA_DOWNLOAD_RESPONSE=file-status ERVU_FILEUPLOAD_MAX_FILE_SIZE=5242880 ERVU_FILEUPLOAD_MAX_REQUEST_SIZE=6291456 ERVU_FILEUPLOAD_FILE_SIZE_THRESHOLD=0 diff --git a/config/standalone/dev/standalone.xml b/config/standalone/dev/standalone.xml index 9daf4143..49203d51 100644 --- a/config/standalone/dev/standalone.xml +++ b/config/standalone/dev/standalone.xml @@ -89,6 +89,8 @@ + + From c9d7135af8d76d122cf37275e564abb8ecb8604c Mon Sep 17 00:00:00 2001 From: Eduard Tihomirov Date: Thu, 26 Sep 2024 14:19:01 +0300 Subject: [PATCH 4/6] SUPPORT-8556: Fix --- .../service/fileupload/EmployeeInfoFileUploadService.java | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/backend/src/main/java/ervu/service/fileupload/EmployeeInfoFileUploadService.java b/backend/src/main/java/ervu/service/fileupload/EmployeeInfoFileUploadService.java index c4b71b9d..634a53d0 100644 --- a/backend/src/main/java/ervu/service/fileupload/EmployeeInfoFileUploadService.java +++ b/backend/src/main/java/ervu/service/fileupload/EmployeeInfoFileUploadService.java @@ -9,7 +9,11 @@ import java.util.UUID; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import ervu.client.fileupload.FileUploadWebDavClient; -import ervu.model.fileupload.*; +import ervu.model.fileupload.DownloadResponse; +import ervu.model.fileupload.EmployeeInfoFileFormType; +import ervu.model.fileupload.EmployeeInfoKafkaMessage; +import ervu.model.fileupload.FileInfo; +import ervu.model.fileupload.FileStatus; import org.apache.kafka.clients.producer.ProducerRecord; import org.slf4j.Logger; import org.slf4j.LoggerFactory; From ea48e67cb90adf5edcbb0668f0647b5e54ab8280 Mon Sep 17 00:00:00 2001 From: Eduard Tihomirov Date: Fri, 27 Sep 2024 09:45:12 +0300 Subject: [PATCH 5/6] SUPPORT-8556: Fix --- backend/src/main/java/ervu/AvKafkaConfig.java | 20 ++++++++++++------- 1 file changed, 13 insertions(+), 7 deletions(-) diff --git a/backend/src/main/java/ervu/AvKafkaConfig.java b/backend/src/main/java/ervu/AvKafkaConfig.java index 9788644e..ea74f176 100644 --- a/backend/src/main/java/ervu/AvKafkaConfig.java +++ b/backend/src/main/java/ervu/AvKafkaConfig.java @@ -7,6 +7,7 @@ import org.apache.kafka.clients.CommonClientConfigs; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.common.config.SaslConfigs; import org.apache.kafka.common.serialization.StringSerializer; +import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @@ -34,12 +35,14 @@ public class AvKafkaConfig { @Value("${av.kafka.sasl.mechanism}") private String saslMechanism; - @Bean("av-factory") + @Bean() + @Qualifier("avProducerFactory") public ProducerFactory producerFactory() { return new DefaultKafkaProducerFactory<>(producerConfigs()); } - @Bean("av-configs") + @Bean() + @Qualifier("avProducerConfigs") public Map producerConfigs() { Map props = new HashMap<>(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, this.kafkaUrl); @@ -54,13 +57,15 @@ public class AvKafkaConfig { return props; } - @Bean("av-cons-factory") + @Bean() + @Qualifier("avConsumerFactory") public ConsumerFactory consumerFactory() { - return new DefaultKafkaConsumerFactory<>(consumerConfig()); + return new DefaultKafkaConsumerFactory<>(consumerConfigs()); } - @Bean("av-cons-configs") - public Map consumerConfig() { + @Bean() + @Qualifier("avConsumerConfigs") + public Map consumerConfigs() { Map props = new HashMap<>(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, this.kafkaUrl); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); @@ -74,7 +79,8 @@ public class AvKafkaConfig { return props; } - @Bean("av-template") + @Bean() + @Qualifier("avTemplate") public KafkaTemplate kafkaTemplate() { return new KafkaTemplate<>(producerFactory()); } From 365e5e4b7d1c33fcaf030167ddbc606b40ad52d7 Mon Sep 17 00:00:00 2001 From: Eduard Tihomirov Date: Tue, 1 Oct 2024 10:59:53 +0300 Subject: [PATCH 6/6] SUPPORT-8556: Fix --- config/micord.env | 2 +- config/standalone/dev/standalone.xml | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/config/micord.env b/config/micord.env index 99fadcbc..90cc544f 100644 --- a/config/micord.env +++ b/config/micord.env @@ -16,7 +16,7 @@ AV_KAFKA_SASL_MECHANISM=SCRAM-SHA-256 AV_KAFKA_USERNAME=user1 AV_KAFKA_PASSWORD=Blfi9d2OFG AV_KAFKA_GROUP_ID=1 -AV_KAFKA_DOWNLOAD_RESPONSE=file-status +AV_KAFKA_DOWNLOAD_RESPONSE=ervu.lkrp.av-fileupload-status ERVU_FILEUPLOAD_MAX_FILE_SIZE=5242880 ERVU_FILEUPLOAD_MAX_REQUEST_SIZE=6291456 ERVU_FILEUPLOAD_FILE_SIZE_THRESHOLD=0 diff --git a/config/standalone/dev/standalone.xml b/config/standalone/dev/standalone.xml index 2ff5e841..1fc85543 100644 --- a/config/standalone/dev/standalone.xml +++ b/config/standalone/dev/standalone.xml @@ -95,7 +95,7 @@ - +