SUPPORT-8539: SUPPORT-8507: application improvements

Рауф Латыпов 2024-09-13 07:28:38 +03:00
parent 9380f59587
commit 4317a82f50
15 changed files with 250 additions and 59 deletions

pom.xml

@@ -42,6 +42,12 @@
<version>4.5.14</version>
</dependency>
+<dependency>
+<groupId>org.postgresql</groupId>
+<artifactId>postgresql</artifactId>
+<version>42.7.3</version>
+</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
@@ -71,6 +77,11 @@
<artifactId>httpmime</artifactId>
</dependency>
+<dependency>
+<groupId>org.postgresql</groupId>
+<artifactId>postgresql</artifactId>
+</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
@@ -89,6 +100,10 @@
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
+<dependency>
+<groupId>org.springframework.boot</groupId>
+<artifactId>spring-boot-starter-jooq</artifactId>
+</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>

src/main/java/ru/micord/ervu/av/FileUploadApplication.java

@@ -3,11 +3,13 @@ package ru.micord.ervu.av;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.retry.annotation.EnableRetry;
+import org.springframework.transaction.annotation.EnableTransactionManagement;
/**
* @author r.latypov
*/
@EnableRetry
+@EnableTransactionManagement
@SpringBootApplication
public class FileUploadApplication {

InputKafkaConsumerConfig.java

@@ -37,7 +37,7 @@ public class InputKafkaConsumerConfig {
private String ackMode;
@Bean
-public ConsumerFactory<String, String> consumerFactory() {
+public ConsumerFactory<String, String> inputConsumerFactory() {
Map<String, Object> configs = new HashMap<>();
configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
@@ -54,10 +54,10 @@ public class InputKafkaConsumerConfig {
}
@Bean
-public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
+public ConcurrentKafkaListenerContainerFactory<String, String> inputKafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
-factory.setConsumerFactory(consumerFactory());
+factory.setConsumerFactory(inputConsumerFactory());
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
return factory;
}
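The rename matters because this commit adds a second consumer configuration (OutputKafkaConsumerConfig, next file): with two ConsumerFactory and container-factory beans in the same context, each needs a distinct bean name, and every @KafkaListener must select its factory explicitly. A minimal sketch of that binding, assuming a hypothetical topic and group id (the real wiring appears later in FileUploadService):

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;

// Hypothetical listener: containerFactory selects the input-side beans by name.
@Component
public class ExampleInputListener {
    @KafkaListener(topics = "example-topic", groupId = "example-group",
            containerFactory = "inputKafkaListenerContainerFactory")
    public void listen(String message, Acknowledgment acknowledgment) {
        // With AckMode.MANUAL_IMMEDIATE the offset is committed only on this call.
        acknowledgment.acknowledge();
    }
}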

src/main/java/ru/micord/ervu/av/kafka/config/output/OutputKafkaConsumerConfig.java

@@ -0,0 +1,64 @@
package ru.micord.ervu.av.kafka.config.output;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ContainerProperties;
/**
* @author r.latypov
*/
@Configuration
@EnableKafka
public class OutputKafkaConsumerConfig {
@Value("${spring.kafka.bootstrap-servers}")
private List<String> bootstrapAddress;
@Value("${spring.kafka.consumer.security.protocol}")
private String securityProtocol;
@Value("${spring.kafka.consumer.properties.sasl.jaas.config}")
private String jaasConfig;
@Value("${spring.kafka.consumer.properties.sasl.mechanism}")
private String saslMechanism;
@Value("${spring.kafka.consumer.enable-auto-commit}")
private String enableAutoCommit;
@Value("${spring.kafka.listener.ack-mode}")
private String ackMode;
@Bean
public ConsumerFactory<String, String> outputConsumerFactory() {
Map<String, Object> configs = new HashMap<>();
configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
configs.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, securityProtocol);
configs.put(SaslConfigs.SASL_JAAS_CONFIG, jaasConfig);
configs.put(SaslConfigs.SASL_MECHANISM, saslMechanism);
configs.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);
return new DefaultKafkaConsumerFactory<>(configs);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> outputKafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(outputConsumerFactory());
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
return factory;
}
}

src/main/java/ru/micord/ervu/av/kafka/dto/DownloadRequest.java

@@ -0,0 +1,27 @@
package ru.micord.ervu.av.kafka.dto;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.Setter;
import org.springframework.lang.NonNull;
/**
* @author r.latypov
*/
public record DownloadRequest(OrgInfo orgInfo, @NonNull FileInfo fileInfo) {
@Getter
@AllArgsConstructor
public static class FileInfo {
@Setter
private String fileUrl;
private final String fileId;
private final String fileName;
private final String filePatternCode;
private final String filePatternName;
private final String departureDateTime;
private final String timeZone;
@Setter
private FileStatus fileStatus;
}
}

src/main/java/ru/micord/ervu/av/kafka/dto/DownloadResponse.java

@@ -0,0 +1,11 @@
package ru.micord.ervu.av.kafka.dto;
import org.springframework.lang.NonNull;
/**
* @author r.latypov
*/
public record DownloadResponse(OrgInfo orgInfo, @NonNull FileInfo fileInfo) {
public record FileInfo(String fileId, FileStatus fileStatus) {
}
}

src/main/java/ru/micord/ervu/av/kafka/dto/FileStatus.java

@@ -0,0 +1,19 @@
package ru.micord.ervu.av.kafka.dto;
/**
* @author r.latypov
*/
public record FileStatus(String code, String status, String description) {
public static final FileStatus FILE_STATUS_01 = new FileStatus("01", "Загрузка",
"Файл принят до проверки на вирусы"
);
public static final FileStatus FILE_STATUS_02 = new FileStatus("02", "Проверка не пройдена",
"Проверка на вирусы не пройдена"
);
public static final FileStatus FILE_STATUS_03 = new FileStatus("03", "Направлено в ЕРВУ",
"Проверка на вирусы пройдена успешно, файл направлен в очередь"
);
public static final FileStatus FILE_STATUS_04 = new FileStatus("04", "Получен ЕРВУ",
"Файл был принят в обработку"
);
}
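Together the four constants encode the file lifecycle: 01 when the file is accepted before scanning, 02 when the antivirus check fails, 03 when the clean file is queued for ERVU, 04 when ERVU confirms receipt. A hedged sketch of resolving a constant from the code field carried in a message (the fromCode helper is illustrative and not part of this commit):

import java.util.List;

// Illustrative helper, not in this commit: map a status code from a
// Kafka payload back to the matching FileStatus constant.
public final class FileStatuses {
    private static final List<FileStatus> ALL = List.of(
            FileStatus.FILE_STATUS_01, FileStatus.FILE_STATUS_02,
            FileStatus.FILE_STATUS_03, FileStatus.FILE_STATUS_04);

    private FileStatuses() {
    }

    public static FileStatus fromCode(String code) {
        return ALL.stream()
                .filter(status -> status.code().equalsIgnoreCase(code))
                .findFirst()
                .orElseThrow(() -> new IllegalArgumentException("unknown status code: " + code));
    }
}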

src/main/java/ru/micord/ervu/av/kafka/dto/InMessage.java

@@ -1,36 +0,0 @@
package ru.micord.ervu.av.kafka.dto;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.Setter;
import org.springframework.lang.NonNull;
/**
* @author r.latypov
*/
public record InMessage(OrgInfo orgInfo, SenderInfo senderInfo, @NonNull FileInfo fileInfo) {
public record OrgInfo(String orgName, String orgTypeCode, String orgTypeName, String ogrn,
String in, String kpp) {
}
public record SenderInfo(String senderLastName, String senderFirstName,
String senderMiddleName, String birthDate, String senderRoleCode,
String senderRoleName, String snils, String idERN,
Document document) {
public record Document(String series, String number, String issueDate) {
}
}
@Getter
@AllArgsConstructor
public static class FileInfo {
@NonNull
@Setter
private String fileNameBase;
private final String fileName;
private final String filePatternCode;
private final String FilePatternName;
private final String departureDateTime;
}
}

src/main/java/ru/micord/ervu/av/kafka/dto/OrgInfo.java

@@ -0,0 +1,7 @@
package ru.micord.ervu.av.kafka.dto;
/**
* @author r.latypov
*/
public record OrgInfo(String orgId, String orgName, String prnOid) {
}

OutErrorMessage.java

@@ -7,5 +7,5 @@ import ru.micord.ervu.av.response.AvResponse;
* @author r.latypov
*/
public record OutErrorMessage(String errorMessage, AvResponse avResponse,
-@NonNull InMessage inMessage) {
+@NonNull DownloadRequest inMessage) {
}

src/main/java/ru/micord/ervu/av/service/FileStatusService.java

@@ -0,0 +1,40 @@
package ru.micord.ervu.av.service;
import org.jooq.DSLContext;
import org.jooq.Field;
import org.jooq.Record;
import org.jooq.Table;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import static org.jooq.impl.DSL.field;
import static org.jooq.impl.DSL.name;
import static org.jooq.impl.DSL.table;
/**
* @author r.latypov
*/
@Service
public class FileStatusService {
public static final Table<Record> INTERACTION_LOG = table(name("public", "interaction_log"));
public static final Field<Long> INTERACTION_LOG_ID = field(name("id"), Long.class);
public static final Field<String> INTERACTION_LOG_FILE_ID = field(name("file_id"), String.class);
public static final Field<String> INTERACTION_LOG_STATUS = field(name("status"), String.class);
@Autowired
private DSLContext dslContext;
public void setStatus(Long id, String status) {
dslContext.update(INTERACTION_LOG)
.set(INTERACTION_LOG_STATUS, status)
.where(INTERACTION_LOG_ID.eq(id))
.execute();
}
public void setStatus(String fileId, String status) {
dslContext.update(INTERACTION_LOG)
.set(INTERACTION_LOG_STATUS, status)
.where(INTERACTION_LOG_FILE_ID.eq(fileId))
.execute();
}
}
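The service addresses the public.interaction_log table and its columns through jOOQ's dynamic DSL (table(name(...)), field(name(...))) rather than generated metamodel classes, so no jOOQ code generation step is required. A minimal usage sketch under that assumption (the call site below is hypothetical, not part of this commit):

import org.springframework.stereotype.Component;
import ru.micord.ervu.av.kafka.dto.FileStatus;

// Hypothetical call site: record that a file passed the antivirus
// check and was queued for ERVU (status 03).
@Component
public class FileStatusExample {
    private final FileStatusService fileStatusService;

    public FileStatusExample(FileStatusService fileStatusService) {
        this.fileStatusService = fileStatusService;
    }

    public void markQueued(String fileId) {
        fileStatusService.setStatus(fileId, FileStatus.FILE_STATUS_03.status());
    }
}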

FileUploadService.java

@@ -35,8 +35,9 @@ import org.springframework.lang.NonNull;
import org.springframework.stereotype.Service;
import ru.micord.ervu.av.exception.FileUploadException;
import ru.micord.ervu.av.exception.InvalidHttpFileUrlException;
-import ru.micord.ervu.av.kafka.dto.InMessage;
-import ru.micord.ervu.av.kafka.dto.OutErrorMessage;
+import ru.micord.ervu.av.kafka.dto.DownloadRequest;
+import ru.micord.ervu.av.kafka.dto.DownloadResponse;
+import ru.micord.ervu.av.kafka.dto.FileStatus;
import ru.micord.ervu.av.response.AvFileSendResponse;
import ru.micord.ervu.av.response.AvResponse;
import ru.micord.ervu.av.s3.S3Service;
@@ -57,25 +58,29 @@ public class FileUploadService {
private final NewTopic outErrorTopic;
private final NewTopic outSuccessTopic;
private final ReceiveScanReportRetryable receiveScanReportRetryable;
+private final FileStatusService fileStatusService;
private final S3Service s3Service;
@Autowired
public FileUploadService(KafkaTemplate<String, String> kafkaTemplate, NewTopic outErrorTopic,
NewTopic outSuccessTopic, ReceiveScanReportRetryable receiveScanReportRetryable,
-S3Service s3Service) {
+FileStatusService fileStatusService, S3Service s3Service) {
this.kafkaTemplate = kafkaTemplate;
this.outErrorTopic = outErrorTopic;
this.outSuccessTopic = outSuccessTopic;
this.receiveScanReportRetryable = receiveScanReportRetryable;
+this.fileStatusService = fileStatusService;
this.s3Service = s3Service;
}
-@KafkaListener(id = "${spring.kafka.consumer.group-id}", topics = "${kafka-in.topic.name}")
+@KafkaListener(id = "${spring.kafka.consumer.group-id}", topics = "${kafka-in.topic.name}",
+containerFactory = "inputKafkaListenerContainerFactory")
public void listenKafkaIn(String kafkaInMessage, Acknowledgment acknowledgment) {
-InMessage inMessage = new Gson().fromJson(kafkaInMessage, InMessage.class);
+DownloadRequest downloadRequest = new Gson().fromJson(kafkaInMessage, DownloadRequest.class);
+String fileId = downloadRequest.fileInfo().getFileId();
try {
-FileUrl fileUrl = parseFileUrl(inMessage.fileInfo().getFileNameBase());
+FileUrl fileUrl = parseFileUrl(downloadRequest.fileInfo().getFileUrl());
String filePath = fileSavingPath + fileUrl.fileName();
String downloadUrl = fileUrl.fileUrl();
downloadFile(downloadUrl, filePath);
@@ -88,15 +93,20 @@ public class FileUploadService {
.anyMatch(verdict -> verdict.equalsIgnoreCase(AvResponse.Scan.VERDICT_INFECTED));
if (infected || !clean) {
-sendMessage(outErrorTopic.name(),
-new OutErrorMessage("file is infected or not clean", avResponse, inMessage)
-);
+downloadRequest.fileInfo().setFileUrl(null);
+downloadRequest.fileInfo().setFileStatus(FileStatus.FILE_STATUS_02);
+sendMessage(outErrorTopic.name(), downloadRequest);
+fileStatusService.setStatus(fileId, FileStatus.FILE_STATUS_02.status());
}
else {
s3Service.putFile(filePath, fileUrl.fileName());
-inMessage.fileInfo().setFileNameBase(fileUrl.fileName());
-sendMessage(outSuccessTopic.name(), inMessage);
+downloadRequest.fileInfo().setFileUrl(fileUrl.fileName());
+downloadRequest.fileInfo().setFileStatus(FileStatus.FILE_STATUS_03);
+sendMessage(outSuccessTopic.name(), downloadRequest);
+fileStatusService.setStatus(fileId, FileStatus.FILE_STATUS_03.status());
}
deleteFile(downloadUrl);
@@ -108,11 +118,8 @@ public class FileUploadService {
// we assume the message does not need to be reprocessed:
// log the error, report it, and acknowledge the message as read
logger.error(e.getMessage() + ": " + kafkaInMessage);
-sendMessage(outErrorTopic.name(),
-new OutErrorMessage(e.getMessage(), null, inMessage)
-);
acknowledgment.acknowledge();
throw new RuntimeException(kafkaInMessage, e);
}
catch (FileUploadException e) {
// we assume the message needs to be read again
@@ -121,6 +128,23 @@
}
}
+@KafkaListener(id = "${spring.kafka.out.consumer.group-id}",
+topics = "${kafka-out.response.topic.name}",
+containerFactory = "outputKafkaListenerContainerFactory")
+public void listenKafkaOut(String kafkaOutResponseMessage, Acknowledgment acknowledgment) {
+DownloadResponse downloadResponse = new Gson().fromJson(kafkaOutResponseMessage,
+DownloadResponse.class
+);
+String fileId = downloadResponse.fileInfo().fileId();
+FileStatus fileStatus = downloadResponse.fileInfo().fileStatus();
+if (fileStatus.code().equalsIgnoreCase(FileStatus.FILE_STATUS_04.code())) {
+fileStatusService.setStatus(fileId, FileStatus.FILE_STATUS_04.status());
+}
+acknowledgment.acknowledge();
+}
/* method for extracting the file's UUID from the file link:
we save the file to disk and send it to storage under the same UUID, preserving the file extension
*/
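The comment above describes parseFileUrl's contract: pull the file's UUID (plus extension) out of the download link so the same name is reused on disk and in S3. The method body sits outside this diff; the following regex-based sketch is one assumed way to satisfy that contract, not the commit's actual code:

import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Assumed sketch: extract "<uuid>.<ext>" from a link such as
// https://host/files/3f8a2c1e-9d4b-4f6e-8a2c-1e9d4b4f6e8a.pdf
final class FileUrlParser {
    private static final Pattern UUID_FILE = Pattern.compile(
            "[0-9a-fA-F]{8}(?:-[0-9a-fA-F]{4}){3}-[0-9a-fA-F]{12}(?:\\.\\w+)?");

    static String fileName(String url) {
        Matcher matcher = UUID_FILE.matcher(url);
        if (!matcher.find()) {
            throw new IllegalArgumentException("no file UUID in url: " + url);
        }
        return matcher.group();
    }
}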

ReceiveScanReportRetryable.java

@@ -21,8 +21,9 @@ import ru.micord.ervu.av.response.AvResponse;
*/
@Service
public class ReceiveScanReportRetryable {
-@Retryable(retryFor = {RetryableException.class}, maxAttempts = 10,
-backoff = @Backoff(delay = 1000))
+@Retryable(retryFor = {RetryableException.class},
+maxAttemptsExpression = "${av.retry.max-attempts.count}",
+backoff = @Backoff(delayExpression = "${av.retry.delay.milliseconds}"))
public AvResponse receiveScanReport(CloseableHttpClient client, HttpGet get)
throws FileUploadException {

src/main/resources/application.properties

@@ -15,6 +15,7 @@ spring.kafka.consumer.properties.sasl.mechanism=SCRAM-SHA-256
#
spring.kafka.consumer.enable-auto-commit=false
spring.kafka.consumer.group-id=file-to-upload-consumers
+spring.kafka.out.consumer.group-id=response-consumers
#
spring.kafka.listener.ack-mode=MANUAL_IMMEDIATE
#
@@ -33,9 +34,12 @@ spring.kafka.properties.sasl.mechanism=SCRAM-SHA-256
kafka-in.topic.name=${IN_KAFKA_TOPIC_NAME}
kafka-out.error.topic.name=${OUT_KAFKA_ERROR_TOPIC_NAME}
kafka-out.success.topic.name=${OUT_KAFKA_SUCCESS_TOPIC_NAME}
+kafka-out.response.topic.name=${OUT_KAFKA_RESPONSE_TOPIC_NAME}
#
av.rest.address=${AV_REST_ADDRESS}
av.first.timeout.milliseconds=${AV_FIRST_TIMEOUT_MILLISECONDS}
+av.retry.max-attempts.count=${AV_RETRY_MAX_ATTEMPTS_COUNT}
+av.retry.delay.milliseconds=${AV_RETRY_DELAY_MILLISECONDS}
file.saving.path=${FILE_SAVING_PATH}
#
s3.out.endpoint=${S3_ENDPOINT}
@@ -43,3 +47,10 @@ s3.out.port=${S3_PORT}
s3.out.access_key=${S3_ACCESS_KEY}
s3.out.secret_key=${S3_SECRET_KEY}
s3.out.bucket_name=${S3_OUT_BUCKET_NAME}
+#
+spring.jooq.sql-dialect=Postgres
+spring.datasource.driver-class-name=org.postgresql.Driver
+# host:port/database_name
+spring.datasource.url=jdbc:postgresql://${SPRING_DATASOURCE_URL}
+spring.datasource.username=${SPRING_DATASOURCE_USERNAME}
+spring.datasource.password=${SPRING_DATASOURCE_PASSWORD}
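With these properties, spring-boot-starter-jooq auto-configures the DSLContext that FileStatusService injects above; note that SPRING_DATASOURCE_URL supplies only the host:port/database_name part, and the jdbc:postgresql:// prefix is prepended here. A hedged startup smoke test, purely illustrative and not part of this commit:

import org.jooq.DSLContext;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;

// Illustrative only: a SELECT 1 round-trip at startup confirms the
// datasource URL, credentials and driver wiring behind the DSLContext.
@Component
public class DatasourceSmokeCheck implements CommandLineRunner {
    private final DSLContext dsl;

    public DatasourceSmokeCheck(DSLContext dsl) {
        this.dsl = dsl;
    }

    @Override
    public void run(String... args) {
        dsl.selectOne().fetch();
    }
}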

.env

@@ -7,13 +7,19 @@ IN_KAFKA_TOPIC_NAME=file-to-upload
OUT_KAFKA_SERVERS=10.10.31.11:32609
OUT_KAFKA_USERNAME=user1
OUT_KAFKA_PASSWORD=Blfi9d2OFG
-OUT_KAFKA_ERROR_TOPIC_NAME=error
-OUT_KAFKA_SUCCESS_TOPIC_NAME=success
+OUT_KAFKA_ERROR_TOPIC_NAME=ervu.lkrp.download.request
+OUT_KAFKA_SUCCESS_TOPIC_NAME=ervu.lkrp.download.request
+OUT_KAFKA_RESPONSE_TOPIC_NAME=ervu.lkrp.download.response
AV_REST_ADDRESS=http://10.10.31.118:8085/scans
AV_FIRST_TIMEOUT_MILLISECONDS=1000
+AV_RETRY_MAX_ATTEMPTS_COUNT=10
+AV_RETRY_DELAY_MILLISECONDS=1000
FILE_SAVING_PATH=/transfer/
S3_ENDPOINT=http://ervu-minio.k8s.micord.ru
S3_PORT=31900
S3_ACCESS_KEY=Keyq0l8IRarEf5GmpvEO
S3_SECRET_KEY=8A2epSoI6OjdHHwA5F6tHxeYRThv47GdGwcBrV7a
S3_OUT_BUCKET_NAME=default-out-bucket
+SPRING_DATASOURCE_URL=10.10.31.119:5432/ervu-lkrp-ul
+SPRING_DATASOURCE_USERNAME=ervu-lkrp-ul
+SPRING_DATASOURCE_PASSWORD=ervu-lkrp-ul