diff --git a/pom.xml b/pom.xml
index b2c3c2f..e0d11d0 100644
--- a/pom.xml
+++ b/pom.xml
@@ -42,6 +42,12 @@
                 <version>4.5.14</version>
+            <dependency>
+                <groupId>org.postgresql</groupId>
+                <artifactId>postgresql</artifactId>
+                <version>42.7.3</version>
+            </dependency>
+
                 <groupId>org.projectlombok</groupId>
                 <artifactId>lombok</artifactId>
@@ -71,6 +77,11 @@
                 <artifactId>httpmime</artifactId>
+            <dependency>
+                <groupId>org.postgresql</groupId>
+                <artifactId>postgresql</artifactId>
+            </dependency>
+
                 <groupId>org.projectlombok</groupId>
                 <artifactId>lombok</artifactId>
@@ -89,6 +100,10 @@
                 <groupId>org.springframework.boot</groupId>
                 <artifactId>spring-boot-starter</artifactId>
+            <dependency>
+                <groupId>org.springframework.boot</groupId>
+                <artifactId>spring-boot-starter-jooq</artifactId>
+            </dependency>
                 <groupId>org.springframework.kafka</groupId>
diff --git a/src/main/java/ru/micord/ervu/av/FileUploadApplication.java b/src/main/java/ru/micord/ervu/av/FileUploadApplication.java
index 4ef2923..e1e7979 100644
--- a/src/main/java/ru/micord/ervu/av/FileUploadApplication.java
+++ b/src/main/java/ru/micord/ervu/av/FileUploadApplication.java
@@ -3,11 +3,13 @@ package ru.micord.ervu.av;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.retry.annotation.EnableRetry;
+import org.springframework.transaction.annotation.EnableTransactionManagement;
/**
* @author r.latypov
*/
@EnableRetry
+@EnableTransactionManagement
@SpringBootApplication
public class FileUploadApplication {
diff --git a/src/main/java/ru/micord/ervu/av/kafka/config/input/InputKafkaConsumerConfig.java b/src/main/java/ru/micord/ervu/av/kafka/config/input/InputKafkaConsumerConfig.java
index 672a8f3..7d1f8cc 100644
--- a/src/main/java/ru/micord/ervu/av/kafka/config/input/InputKafkaConsumerConfig.java
+++ b/src/main/java/ru/micord/ervu/av/kafka/config/input/InputKafkaConsumerConfig.java
@@ -37,7 +37,7 @@ public class InputKafkaConsumerConfig {
private String ackMode;
@Bean
-    public ConsumerFactory<String, String> consumerFactory() {
+    public ConsumerFactory<String, String> inputConsumerFactory() {
         Map<String, Object> configs = new HashMap<>();
configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
@@ -54,10 +54,10 @@ public class InputKafkaConsumerConfig {
}
@Bean
-    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
+    public ConcurrentKafkaListenerContainerFactory<String, String> inputKafkaListenerContainerFactory() {
         ConcurrentKafkaListenerContainerFactory<String, String> factory =
                 new ConcurrentKafkaListenerContainerFactory<>();
-        factory.setConsumerFactory(consumerFactory());
+        factory.setConsumerFactory(inputConsumerFactory());
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
return factory;
}
diff --git a/src/main/java/ru/micord/ervu/av/kafka/config/output/OutputKafkaConsumerConfig.java b/src/main/java/ru/micord/ervu/av/kafka/config/output/OutputKafkaConsumerConfig.java
new file mode 100644
index 0000000..5588680
--- /dev/null
+++ b/src/main/java/ru/micord/ervu/av/kafka/config/output/OutputKafkaConsumerConfig.java
@@ -0,0 +1,64 @@
+package ru.micord.ervu.av.kafka.config.output;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.config.SaslConfigs;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.kafka.annotation.EnableKafka;
+import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
+import org.springframework.kafka.core.ConsumerFactory;
+import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
+import org.springframework.kafka.listener.ContainerProperties;
+
+/**
+ * @author r.latypov
+ */
+@Configuration
+@EnableKafka
+public class OutputKafkaConsumerConfig {
+ @Value("${spring.kafka.bootstrap-servers}")
+    private List<String> bootstrapAddress;
+ @Value("${spring.kafka.consumer.security.protocol}")
+ private String securityProtocol;
+ @Value("${spring.kafka.consumer.properties.sasl.jaas.config}")
+ private String jaasConfig;
+ @Value("${spring.kafka.consumer.properties.sasl.mechanism}")
+ private String saslMechanism;
+ @Value("${spring.kafka.consumer.enable-auto-commit}")
+ private String enableAutoCommit;
+ @Value("${spring.kafka.listener.ack-mode}")
+ private String ackMode;
+
+ @Bean
+    public ConsumerFactory<String, String> outputConsumerFactory() {
+        Map<String, Object> configs = new HashMap<>();
+
+ configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
+ configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+ configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+
+ configs.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, securityProtocol);
+ configs.put(SaslConfigs.SASL_JAAS_CONFIG, jaasConfig);
+ configs.put(SaslConfigs.SASL_MECHANISM, saslMechanism);
+
+ configs.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);
+
+ return new DefaultKafkaConsumerFactory<>(configs);
+ }
+
+ @Bean
+    public ConcurrentKafkaListenerContainerFactory<String, String> outputKafkaListenerContainerFactory() {
+        ConcurrentKafkaListenerContainerFactory<String, String> factory =
+                new ConcurrentKafkaListenerContainerFactory<>();
+ factory.setConsumerFactory(outputConsumerFactory());
+ factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
+ return factory;
+ }
+}
diff --git a/src/main/java/ru/micord/ervu/av/kafka/dto/DownloadRequest.java b/src/main/java/ru/micord/ervu/av/kafka/dto/DownloadRequest.java
new file mode 100644
index 0000000..eb49660
--- /dev/null
+++ b/src/main/java/ru/micord/ervu/av/kafka/dto/DownloadRequest.java
@@ -0,0 +1,27 @@
+package ru.micord.ervu.av.kafka.dto;
+
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+import lombok.Setter;
+import org.springframework.lang.NonNull;
+
+/**
+ * @author r.latypov
+ */
+public record DownloadRequest(OrgInfo orgInfo, @NonNull FileInfo fileInfo) {
+
+ @Getter
+ @AllArgsConstructor
+ public static class FileInfo {
+ @Setter
+ private String fileUrl;
+ private final String fileId;
+ private final String fileName;
+ private final String filePatternCode;
+ private final String FilePatternName;
+ private final String departureDateTime;
+ private final String timeZone;
+ @Setter
+ private FileStatus fileStatus;
+ }
+}
diff --git a/src/main/java/ru/micord/ervu/av/kafka/dto/DownloadResponse.java b/src/main/java/ru/micord/ervu/av/kafka/dto/DownloadResponse.java
new file mode 100644
index 0000000..56d7b7e
--- /dev/null
+++ b/src/main/java/ru/micord/ervu/av/kafka/dto/DownloadResponse.java
@@ -0,0 +1,11 @@
+package ru.micord.ervu.av.kafka.dto;
+
+import org.springframework.lang.NonNull;
+
+/**
+ * @author r.latypov
+ */
+public record DownloadResponse(OrgInfo orgInfo, @NonNull FileInfo fileInfo) {
+ public record FileInfo(String fileId, FileStatus fileStatus) {
+ }
+}
diff --git a/src/main/java/ru/micord/ervu/av/kafka/dto/FileStatus.java b/src/main/java/ru/micord/ervu/av/kafka/dto/FileStatus.java
new file mode 100644
index 0000000..95729f0
--- /dev/null
+++ b/src/main/java/ru/micord/ervu/av/kafka/dto/FileStatus.java
@@ -0,0 +1,19 @@
+package ru.micord.ervu.av.kafka.dto;
+
+/**
+ * @author r.latypov
+ */
+public record FileStatus(String code, String status, String description) {
+ public static final FileStatus FILE_STATUS_01 = new FileStatus("01", "Загрузка",
+ "Файл принят до проверки на вирусы"
+ );
+ public static final FileStatus FILE_STATUS_02 = new FileStatus("02", "Проверка не пройдена",
+ "Проверка на вирусы не пройдена"
+ );
+ public static final FileStatus FILE_STATUS_03 = new FileStatus("03", "Направлено в ЕРВУ",
+ "Проверка на вирусы пройдена успешно, файл направлен в очередь"
+ );
+ public static final FileStatus FILE_STATUS_04 = new FileStatus("04", "Получен ЕРВУ",
+ "Файл был принят в обработку"
+ );
+}
diff --git a/src/main/java/ru/micord/ervu/av/kafka/dto/InMessage.java b/src/main/java/ru/micord/ervu/av/kafka/dto/InMessage.java
deleted file mode 100644
index 98f997e..0000000
--- a/src/main/java/ru/micord/ervu/av/kafka/dto/InMessage.java
+++ /dev/null
@@ -1,36 +0,0 @@
-package ru.micord.ervu.av.kafka.dto;
-
-import lombok.AllArgsConstructor;
-import lombok.Getter;
-import lombok.Setter;
-import org.springframework.lang.NonNull;
-
-/**
- * @author r.latypov
- */
-public record InMessage(OrgInfo orgInfo, SenderInfo senderInfo, @NonNull FileInfo fileInfo) {
- public record OrgInfo(String orgName, String orgTypeCode, String orgTypeName, String ogrn,
- String in, String kpp) {
- }
-
- public record SenderInfo(String senderLastName, String senderFirstName,
- String senderMiddleName, String birthDate, String senderRoleCode,
- String senderRoleName, String snils, String idERN,
- Document document) {
-
- public record Document(String series, String number, String issueDate) {
- }
- }
-
- @Getter
- @AllArgsConstructor
- public static class FileInfo {
- @NonNull
- @Setter
- private String fileNameBase;
- private final String fileName;
- private final String filePatternCode;
- private final String FilePatternName;
- private final String departureDateTime;
- }
-}
diff --git a/src/main/java/ru/micord/ervu/av/kafka/dto/OrgInfo.java b/src/main/java/ru/micord/ervu/av/kafka/dto/OrgInfo.java
new file mode 100644
index 0000000..12fcbbf
--- /dev/null
+++ b/src/main/java/ru/micord/ervu/av/kafka/dto/OrgInfo.java
@@ -0,0 +1,7 @@
+package ru.micord.ervu.av.kafka.dto;
+
+/**
+ * @author r.latypov
+ */
+public record OrgInfo(String orgId, String orgName, String prnOid) {
+}
diff --git a/src/main/java/ru/micord/ervu/av/kafka/dto/OutErrorMessage.java b/src/main/java/ru/micord/ervu/av/kafka/dto/OutErrorMessage.java
index eca7a3a..7a9add0 100644
--- a/src/main/java/ru/micord/ervu/av/kafka/dto/OutErrorMessage.java
+++ b/src/main/java/ru/micord/ervu/av/kafka/dto/OutErrorMessage.java
@@ -7,5 +7,5 @@ import ru.micord.ervu.av.response.AvResponse;
* @author r.latypov
*/
public record OutErrorMessage(String errorMessage, AvResponse avResponse,
- @NonNull InMessage inMessage) {
+ @NonNull DownloadRequest inMessage) {
}
diff --git a/src/main/java/ru/micord/ervu/av/service/FileStatusService.java b/src/main/java/ru/micord/ervu/av/service/FileStatusService.java
new file mode 100644
index 0000000..82e2c7c
--- /dev/null
+++ b/src/main/java/ru/micord/ervu/av/service/FileStatusService.java
@@ -0,0 +1,40 @@
+package ru.micord.ervu.av.service;
+
+import org.jooq.DSLContext;
+import org.jooq.Field;
+import org.jooq.Record;
+import org.jooq.Table;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+import static org.jooq.impl.DSL.field;
+import static org.jooq.impl.DSL.name;
+import static org.jooq.impl.DSL.table;
+
+/**
+ * @author r.latypov
+ */
+@Service
+public class FileStatusService {
+    public static final Table<Record> INTERACTION_LOG = table(name("public", "interaction_log"));
+    public static final Field<Long> INTERACTION_LOG_ID = field(name("id"), Long.class);
+    public static final Field<String> INTERACTION_LOG_FILE_ID = field(name("file_id"), String.class);
+    public static final Field<String> INTERACTION_LOG_STATUS = field(name("status"), String.class);
+
+ @Autowired
+ private DSLContext dslContext;
+
+ public void setStatus(Long id, String status) {
+ dslContext.update(INTERACTION_LOG)
+ .set(INTERACTION_LOG_STATUS, status)
+ .where(INTERACTION_LOG_ID.eq(id))
+ .execute();
+ }
+
+ public void setStatus(String fileId, String status) {
+ dslContext.update(INTERACTION_LOG)
+ .set(INTERACTION_LOG_STATUS, status)
+ .where(INTERACTION_LOG_FILE_ID.eq(fileId))
+ .execute();
+ }
+}
diff --git a/src/main/java/ru/micord/ervu/av/service/FileUploadService.java b/src/main/java/ru/micord/ervu/av/service/FileUploadService.java
index f180a03..bd00a74 100644
--- a/src/main/java/ru/micord/ervu/av/service/FileUploadService.java
+++ b/src/main/java/ru/micord/ervu/av/service/FileUploadService.java
@@ -35,8 +35,9 @@ import org.springframework.lang.NonNull;
import org.springframework.stereotype.Service;
import ru.micord.ervu.av.exception.FileUploadException;
import ru.micord.ervu.av.exception.InvalidHttpFileUrlException;
-import ru.micord.ervu.av.kafka.dto.InMessage;
-import ru.micord.ervu.av.kafka.dto.OutErrorMessage;
+import ru.micord.ervu.av.kafka.dto.DownloadRequest;
+import ru.micord.ervu.av.kafka.dto.DownloadResponse;
+import ru.micord.ervu.av.kafka.dto.FileStatus;
import ru.micord.ervu.av.response.AvFileSendResponse;
import ru.micord.ervu.av.response.AvResponse;
import ru.micord.ervu.av.s3.S3Service;
@@ -57,25 +58,29 @@ public class FileUploadService {
private final NewTopic outErrorTopic;
private final NewTopic outSuccessTopic;
private final ReceiveScanReportRetryable receiveScanReportRetryable;
+ private final FileStatusService fileStatusService;
private final S3Service s3Service;
@Autowired
     public FileUploadService(KafkaTemplate<String, String> kafkaTemplate, NewTopic outErrorTopic,
NewTopic outSuccessTopic, ReceiveScanReportRetryable receiveScanReportRetryable,
- S3Service s3Service) {
+ FileStatusService fileStatusService, S3Service s3Service) {
this.kafkaTemplate = kafkaTemplate;
this.outErrorTopic = outErrorTopic;
this.outSuccessTopic = outSuccessTopic;
this.receiveScanReportRetryable = receiveScanReportRetryable;
+ this.fileStatusService = fileStatusService;
this.s3Service = s3Service;
}
- @KafkaListener(id = "${spring.kafka.consumer.group-id}", topics = "${kafka-in.topic.name}")
+ @KafkaListener(id = "${spring.kafka.consumer.group-id}", topics = "${kafka-in.topic.name}",
+ containerFactory = "inputKafkaListenerContainerFactory")
public void listenKafkaIn(String kafkaInMessage, Acknowledgment acknowledgment) {
- InMessage inMessage = new Gson().fromJson(kafkaInMessage, InMessage.class);
+ DownloadRequest downloadRequest = new Gson().fromJson(kafkaInMessage, DownloadRequest.class);
+ String fileId = downloadRequest.fileInfo().getFileId();
try {
- FileUrl fileUrl = parseFileUrl(inMessage.fileInfo().getFileNameBase());
+ FileUrl fileUrl = parseFileUrl(downloadRequest.fileInfo().getFileUrl());
String filePath = fileSavingPath + fileUrl.fileName();
String downloadUrl = fileUrl.fileUrl();
downloadFile(downloadUrl, filePath);
@@ -88,15 +93,20 @@ public class FileUploadService {
.anyMatch(verdict -> verdict.equalsIgnoreCase(AvResponse.Scan.VERDICT_INFECTED));
if (infected || !clean) {
- sendMessage(outErrorTopic.name(),
- new OutErrorMessage("file is infected or not clean", avResponse, inMessage)
- );
+ downloadRequest.fileInfo().setFileUrl(null);
+ downloadRequest.fileInfo().setFileStatus(FileStatus.FILE_STATUS_02);
+ sendMessage(outErrorTopic.name(), downloadRequest);
+
+ fileStatusService.setStatus(fileId, FileStatus.FILE_STATUS_02.status());
}
else {
s3Service.putFile(filePath, fileUrl.fileName());
- inMessage.fileInfo().setFileNameBase(fileUrl.fileName());
- sendMessage(outSuccessTopic.name(), inMessage);
+ downloadRequest.fileInfo().setFileUrl(fileUrl.fileName());
+ downloadRequest.fileInfo().setFileStatus(FileStatus.FILE_STATUS_03);
+ sendMessage(outSuccessTopic.name(), downloadRequest);
+
+ fileStatusService.setStatus(fileId, FileStatus.FILE_STATUS_03.status());
}
deleteFile(downloadUrl);
@@ -108,11 +118,8 @@ public class FileUploadService {
// считаем, что повторная обработка сообщения не нужна
// ошибку логируем, сообщаем об ошибке, помечаем прочтение сообщения
logger.error(e.getMessage() + ": " + kafkaInMessage);
-
- sendMessage(outErrorTopic.name(),
- new OutErrorMessage(e.getMessage(), null, inMessage)
- );
acknowledgment.acknowledge();
+ throw new RuntimeException(kafkaInMessage, e);
}
catch (FileUploadException e) {
// считаем, что нужно повторное считывание сообщения
@@ -121,6 +128,23 @@ public class FileUploadService {
}
}
+ @KafkaListener(id = "${spring.kafka.out.consumer.group-id}",
+ topics = "${kafka-out.response.topic.name}",
+ containerFactory = "outputKafkaListenerContainerFactory")
+ public void listenKafkaOut(String kafkaOutResponseMessage, Acknowledgment acknowledgment) {
+ DownloadResponse downloadResponse = new Gson().fromJson(kafkaOutResponseMessage,
+ DownloadResponse.class
+ );
+
+ String fileId = downloadResponse.fileInfo().fileId();
+ FileStatus fileStatus = downloadResponse.fileInfo().fileStatus();
+ if (fileStatus.code().equalsIgnoreCase(FileStatus.FILE_STATUS_04.code())) {
+ fileStatusService.setStatus(fileId, FileStatus.FILE_STATUS_04.status());
+ }
+
+ acknowledgment.acknowledge();
+ }
+
/* метод для выделения UUID файла из ссылки на файл
сохраняем на диске и отправляем файл в хранилище под тем же UUID, сохраняя расширение файла
*/
diff --git a/src/main/java/ru/micord/ervu/av/service/ReceiveScanReportRetryable.java b/src/main/java/ru/micord/ervu/av/service/ReceiveScanReportRetryable.java
index 1cb8027..3c4f2cf 100644
--- a/src/main/java/ru/micord/ervu/av/service/ReceiveScanReportRetryable.java
+++ b/src/main/java/ru/micord/ervu/av/service/ReceiveScanReportRetryable.java
@@ -21,8 +21,9 @@ import ru.micord.ervu.av.response.AvResponse;
*/
@Service
public class ReceiveScanReportRetryable {
- @Retryable(retryFor = {RetryableException.class}, maxAttempts = 10,
- backoff = @Backoff(delay = 1000))
+ @Retryable(retryFor = {RetryableException.class},
+ maxAttemptsExpression = "${av.retry.max-attempts.count}",
+ backoff = @Backoff(delayExpression = "${av.retry.delay.milliseconds}"))
public AvResponse receiveScanReport(CloseableHttpClient client, HttpGet get)
throws FileUploadException {
diff --git a/src/main/resources/application.properties b/src/main/resources/application.properties
index 43f5667..2238b29 100644
--- a/src/main/resources/application.properties
+++ b/src/main/resources/application.properties
@@ -15,6 +15,7 @@ spring.kafka.consumer.properties.sasl.mechanism=SCRAM-SHA-256
#
spring.kafka.consumer.enable-auto-commit=false
spring.kafka.consumer.group-id=file-to-upload-consumers
+spring.kafka.out.consumer.group-id=response-consumers
#
spring.kafka.listener.ack-mode=MANUAL_IMMEDIATE
#
@@ -33,9 +34,12 @@ spring.kafka.properties.sasl.mechanism=SCRAM-SHA-256
kafka-in.topic.name=${IN_KAFKA_TOPIC_NAME}
kafka-out.error.topic.name=${OUT_KAFKA_ERROR_TOPIC_NAME}
kafka-out.success.topic.name=${OUT_KAFKA_SUCCESS_TOPIC_NAME}
+kafka-out.response.topic.name=${OUT_KAFKA_RESPONSE_TOPIC_NAME}
#
av.rest.address=${AV_REST_ADDRESS}
av.first.timeout.milliseconds=${AV_FIRST_TIMEOUT_MILLISECONDS}
+av.retry.max-attempts.count=${AV_RETRY_MAX_ATTEMPTS_COUNT}
+av.retry.delay.milliseconds=${AV_RETRY_DELAY_MILLISECONDS}
file.saving.path=${FILE_SAVING_PATH}
#
s3.out.endpoint=${S3_ENDPOINT}
@@ -43,3 +47,10 @@ s3.out.port=${S3_PORT}
s3.out.access_key=${S3_ACCESS_KEY}
s3.out.secret_key=${S3_SECRET_KEY}
s3.out.bucket_name=${S3_OUT_BUCKET_NAME}
+#
+spring.jooq.sql-dialect=Postgres
+spring.datasource.driver-class-name=org.postgresql.Driver
+#host:port/database_name
+spring.datasource.url=jdbc:postgresql://${SPRING_DATASOURCE_URL}
+spring.datasource.username=${SPRING_DATASOURCE_USERNAME}
+spring.datasource.password=${SPRING_DATASOURCE_PASSWORD}
diff --git a/test.env b/test.env
index aece32d..01c5d43 100644
--- a/test.env
+++ b/test.env
@@ -7,13 +7,19 @@ IN_KAFKA_TOPIC_NAME=file-to-upload
OUT_KAFKA_SERVERS=10.10.31.11:32609
OUT_KAFKA_USERNAME=user1
OUT_KAFKA_PASSWORD=Blfi9d2OFG
-OUT_KAFKA_ERROR_TOPIC_NAME=error
-OUT_KAFKA_SUCCESS_TOPIC_NAME=success
+OUT_KAFKA_ERROR_TOPIC_NAME=ervu.lkrp.download.request
+OUT_KAFKA_SUCCESS_TOPIC_NAME=ervu.lkrp.download.request
+OUT_KAFKA_RESPONSE_TOPIC_NAME=ervu.lkrp.download.response
AV_REST_ADDRESS=http://10.10.31.118:8085/scans
AV_FIRST_TIMEOUT_MILLISECONDS=1000
+AV_RETRY_MAX_ATTEMPTS_COUNT=10
+AV_RETRY_DELAY_MILLISECONDS=1000
FILE_SAVING_PATH=/transfer/
S3_ENDPOINT=http://ervu-minio.k8s.micord.ru
S3_PORT=31900
S3_ACCESS_KEY=Keyq0l8IRarEf5GmpvEO
S3_SECRET_KEY=8A2epSoI6OjdHHwA5F6tHxeYRThv47GdGwcBrV7a
S3_OUT_BUCKET_NAME=default-out-bucket
+SPRING_DATASOURCE_URL=10.10.31.119:5432/ervu-lkrp-ul
+SPRING_DATASOURCE_USERNAME=ervu-lkrp-ul
+SPRING_DATASOURCE_PASSWORD=ervu-lkrp-ul