Merge branch 'feature/SUPPORT-8507_adding_av_responses_treatments' into release/1.0.0

This commit is contained in:
Рауф Латыпов 2024-09-16 10:47:42 +03:00
commit df9c67aaa6
25 changed files with 1081 additions and 14 deletions

22
.gitignore vendored Normal file

@@ -0,0 +1,22 @@
#ignore target and tmp dir
target*/
tmp*/
debug.log
#
# IntelliJ IDEA project files
#
.idea*/
.classes*/
*.ipr
*.iml
*.iws
*.ids
# os meta files
Thumbs.db
.DS_Store
*.orig
pom.xml.versionsBackup
.flattened-pom.xml

4
Dockerfile Normal file

@@ -0,0 +1,4 @@
FROM bellsoft/liberica-openjdk-alpine:17-cds
COPY target/*.jar app.jar
CMD ["java", "-jar", "app.jar"]

View file

@@ -8,19 +8,6 @@ services:
networks:
- lkrp_av
minio:
image: minio/minio
restart: always
env_file:
- test.env
volumes:
- minio_data:/data
ports:
- '${MINIO_HOST_PORT}:9000'
command: server /data
networks:
- lkrp_av
networks:
lkrp_av:

128
pom.xml Normal file

@@ -0,0 +1,128 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>3.3.0</version>
<relativePath/>
</parent>
<groupId>ru.cg.subproject</groupId>
<artifactId>file-upload</artifactId>
<version>0.0.1-SNAPSHOT</version>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk-bom</artifactId>
<version>1.12.759</version>
<type>pom</type>
<scope>import</scope>
</dependency>
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
<version>2.11.0</version>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
<version>4.5.14</version>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpmime</artifactId>
<version>4.5.14</version>
</dependency>
<dependency>
<groupId>org.postgresql</groupId>
<artifactId>postgresql</artifactId>
<version>42.7.3</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.34</version>
<scope>provided</scope>
</dependency>
</dependencies>
</dependencyManagement>
<dependencies>
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk-s3</artifactId>
</dependency>
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpmime</artifactId>
</dependency>
<dependency>
<groupId>org.postgresql</groupId>
<artifactId>postgresql</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-aspects</artifactId>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-jooq</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.retry</groupId>
<artifactId>spring-retry</artifactId>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>

View file

@@ -0,0 +1,19 @@
package ru.micord.ervu.av;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.retry.annotation.EnableRetry;
import org.springframework.transaction.annotation.EnableTransactionManagement;
/**
* @author r.latypov
*/
@EnableRetry
@EnableTransactionManagement
@SpringBootApplication
public class FileUploadApplication {
public static void main(String[] args) {
SpringApplication.run(FileUploadApplication.class, args);
}
}

View file

@@ -0,0 +1,18 @@
package ru.micord.ervu.av.exception;
/**
* @author r.latypov
*/
public class FileUploadException extends Exception {
public FileUploadException(String message) {
super(message);
}
public FileUploadException(Throwable cause) {
super(cause);
}
public FileUploadException(String message, Throwable cause) {
super(message, cause);
}
}

View file

@@ -0,0 +1,16 @@
package ru.micord.ervu.av.exception;
/**
* @author r.latypov
*/
public class InvalidHttpFileUrlException extends Exception {
private static final String MESSAGE = "file url is not valid";
public InvalidHttpFileUrlException(String message) {
super(String.join(" : ", MESSAGE, message));
}
public InvalidHttpFileUrlException(String message, Throwable cause) {
super(message, cause);
}
}

View file

@@ -0,0 +1,7 @@
package ru.micord.ervu.av.exception;
/**
* @author r.latypov
*/
public class RetryableException extends RuntimeException {
}

View file

@@ -0,0 +1,64 @@
package ru.micord.ervu.av.kafka.config.input;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ContainerProperties;
/**
* @author r.latypov
*/
@Configuration
@EnableKafka
public class InputKafkaConsumerConfig {
@Value("${spring.kafka.consumer.bootstrap-servers}")
private List<String> bootstrapAddress;
@Value("${spring.kafka.consumer.security.protocol}")
private String securityProtocol;
@Value("${spring.kafka.consumer.properties.sasl.jaas.config}")
private String jaasConfig;
@Value("${spring.kafka.consumer.properties.sasl.mechanism}")
private String saslMechanism;
@Value("${spring.kafka.consumer.enable-auto-commit}")
private String enableAutoCommit;
@Value("${spring.kafka.listener.ack-mode}")
private String ackMode;
@Bean
public ConsumerFactory<String, String> inputConsumerFactory() {
Map<String, Object> configs = new HashMap<>();
configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
configs.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, securityProtocol);
configs.put(SaslConfigs.SASL_JAAS_CONFIG, jaasConfig);
configs.put(SaslConfigs.SASL_MECHANISM, saslMechanism);
configs.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);
return new DefaultKafkaConsumerFactory<>(configs);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> inputKafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(inputConsumerFactory());
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
return factory;
}
}

View file

@@ -0,0 +1,64 @@
package ru.micord.ervu.av.kafka.config.output;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ContainerProperties;
/**
* @author r.latypov
*/
@Configuration
@EnableKafka
public class OutputKafkaConsumerConfig {
@Value("${spring.kafka.bootstrap-servers}")
private List<String> bootstrapAddress;
@Value("${spring.kafka.consumer.security.protocol}")
private String securityProtocol;
@Value("${spring.kafka.consumer.properties.sasl.jaas.config}")
private String jaasConfig;
@Value("${spring.kafka.consumer.properties.sasl.mechanism}")
private String saslMechanism;
@Value("${spring.kafka.consumer.enable-auto-commit}")
private String enableAutoCommit;
@Value("${spring.kafka.listener.ack-mode}")
private String ackMode;
@Bean
public ConsumerFactory<String, String> outputConsumerFactory() {
Map<String, Object> configs = new HashMap<>();
configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
configs.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, securityProtocol);
configs.put(SaslConfigs.SASL_JAAS_CONFIG, jaasConfig);
configs.put(SaslConfigs.SASL_MECHANISM, saslMechanism);
configs.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);
return new DefaultKafkaConsumerFactory<>(configs);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> outputKafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(outputConsumerFactory());
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
return factory;
}
}

View file

@@ -0,0 +1,51 @@
package ru.micord.ervu.av.kafka.config.output;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
/**
* @author r.latypov
*/
@Configuration
public class OutputKafkaProducerConfig {
@Value("${spring.kafka.producer.bootstrap-servers}")
private List<String> bootstrapAddress;
@Value("${spring.kafka.producer.security.protocol}")
private String securityProtocol;
@Value("${spring.kafka.producer.properties.sasl.jaas.config}")
private String jaasConfig;
@Value("${spring.kafka.producer.properties.sasl.mechanism}")
private String saslMechanism;
@Bean
public ProducerFactory<String, String> producerFactory() {
Map<String, Object> configs = new HashMap<>();
configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
configs.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, securityProtocol);
configs.put(SaslConfigs.SASL_JAAS_CONFIG, jaasConfig);
configs.put(SaslConfigs.SASL_MECHANISM, saslMechanism);
return new DefaultKafkaProducerFactory<>(configs);
}
@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
}

View file

@@ -0,0 +1,28 @@
package ru.micord.ervu.av.kafka.config.output;
import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.TopicBuilder;
/**
* @author r.latypov
*/
@Configuration
public class OutputKafkaTopicConfig {
@Value("${kafka-out.error.topic.name}")
private String kafkaOutErrorTopicName;
@Value("${kafka-out.success.topic.name}")
private String kafkaOutSuccessTopicName;
@Bean
public NewTopic outErrorTopic() {
return TopicBuilder.name(kafkaOutErrorTopicName).build();
}
@Bean
public NewTopic outSuccessTopic() {
return TopicBuilder.name(kafkaOutSuccessTopicName).build();
}
}

View file

@@ -0,0 +1,27 @@
package ru.micord.ervu.av.kafka.dto;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.Setter;
import org.springframework.lang.NonNull;
/**
* @author r.latypov
*/
public record DownloadRequest(OrgInfo orgInfo, @NonNull FileInfo fileInfo) {
@Getter
@AllArgsConstructor
public static class FileInfo {
@Setter
private String fileUrl;
private final String fileId;
private final String fileName;
private final String filePatternCode;
private final String FilePatternName;
private final String departureDateTime;
private final String timeZone;
@Setter
private FileStatus fileStatus;
}
}

View file

@@ -0,0 +1,11 @@
package ru.micord.ervu.av.kafka.dto;
import org.springframework.lang.NonNull;
/**
* @author r.latypov
*/
public record DownloadResponse(OrgInfo orgInfo, @NonNull FileInfo fileInfo) {
public record FileInfo(String fileId, FileStatus fileStatus) {
}
}

View file

@@ -0,0 +1,19 @@
package ru.micord.ervu.av.kafka.dto;
/**
* @author r.latypov
*/
public record FileStatus(String code, String status, String description) {
public static final FileStatus FILE_STATUS_01 = new FileStatus("01", "Загрузка",
"Файл принят до проверки на вирусы"
);
public static final FileStatus FILE_STATUS_02 = new FileStatus("02", "Проверка не пройдена",
"Проверка на вирусы не пройдена"
);
public static final FileStatus FILE_STATUS_03 = new FileStatus("03", "Направлено в ЕРВУ",
"Проверка на вирусы пройдена успешно, файл направлен в очередь"
);
public static final FileStatus FILE_STATUS_04 = new FileStatus("04", "Получен ЕРВУ",
"Файл был принят в обработку"
);
}

View file

@@ -0,0 +1,7 @@
package ru.micord.ervu.av.kafka.dto;
/**
* @author r.latypov
*/
public record OrgInfo(String orgId, String orgName, String prnOid) {
}

View file

@@ -0,0 +1,11 @@
package ru.micord.ervu.av.response;
/**
* @author r.latypov
*/
public record AvFileSendResponse(String id, String location, Error error, String status) {
public static final String STATUS_ERROR = "error";
public record Error(String code, String message) {
}
}

View file

@@ -0,0 +1,17 @@
package ru.micord.ervu.av.response;
import java.util.Map;
/**
* @author r.latypov
*/
public record AvResponse(String completed, String created, Integer progress,
Map<String, Scan> scan_result, String status, String[] verdicts) {
public record Scan(String started, String stopped, Threat[] threats, String verdict) {
public static final String VERDICT_CLEAN = "clean";
public static final String VERDICT_INFECTED = "infected";
public record Threat(String name, String object) {
}
}
}

View file

@@ -0,0 +1,52 @@
package ru.micord.ervu.av.s3;
import com.amazonaws.auth.AWSCredentials;
import com.amazonaws.auth.AWSStaticCredentialsProvider;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.client.builder.AwsClientBuilder;
import com.amazonaws.regions.Region;
import com.amazonaws.regions.Regions;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3ClientBuilder;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* @author r.latypov
*/
@Configuration
public class S3Connection {
@Value("${s3.out.endpoint}")
private String endpointOut;
@Value("${s3.out.port:9000}")
private int portOut;
@Value("${s3.out.access_key}")
private String accessKeyOut;
@Value("${s3.out.secret_key}")
private String secretKeyOut;
@Value("${s3.out.bucket_name}")
private String bucketNameOut;
@Bean("outBucketName")
public String getBucketNameOut() {
return bucketNameOut;
}
@Bean("outClient")
public AmazonS3 getS3OutClient() {
return getS3Client(endpointOut, portOut, accessKeyOut, secretKeyOut);
}
private static AmazonS3 getS3Client(String endpoint, int port, String accessKey,
String secretKey) {
AWSCredentials credentials = new BasicAWSCredentials(accessKey, secretKey);
String s3Endpoint = endpoint + ":" + port;
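// with the sample values from test.env further below this becomes "http://ervu-minio.k8s.micord.ru:31900" (illustrative only)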
String region = Region.getRegion(Regions.DEFAULT_REGION).toString();
return AmazonS3ClientBuilder.standard()
.withEndpointConfiguration(new AwsClientBuilder.EndpointConfiguration(s3Endpoint, region))
.withCredentials(new AWSStaticCredentialsProvider(credentials))
.build();
}
}

View file

@@ -0,0 +1,48 @@
package ru.micord.ervu.av.s3;
import java.io.File;
import com.amazonaws.AmazonServiceException;
import com.amazonaws.services.s3.AmazonS3;
import jakarta.annotation.PostConstruct;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import ru.micord.ervu.av.exception.FileUploadException;
/**
* @author r.latypov
*/
@Service
public class S3Service {
private final String outBucketName;
private final AmazonS3 outClient;
@Autowired
public S3Service(String outBucketName, AmazonS3 outClient) {
this.outBucketName = outBucketName;
this.outClient = outClient;
}
@PostConstruct
private void init() {
if (!outClient.doesBucketExistV2(outBucketName)) {
outClient.createBucket(outBucketName);
}
}
public void putFile(String filePath, String key) throws FileUploadException {
try {
outClient.putObject(outBucketName, generateResourceName(outBucketName, key),
new File(filePath)
);
}
catch (AmazonServiceException e) {
// todo message
throw new FileUploadException(e);
}
}
private static String generateResourceName(String bucketName, String key) {
return String.join("/", bucketName, key);
}
}
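For illustration only, not part of the commit: a minimal sketch of the object key that generateResourceName produces. Because putFile joins the bucket name and the key, the uploaded object lands under a prefix that repeats the bucket name. The bucket name matches S3_OUT_BUCKET_NAME from test.env below; the file name is made up.
// hedged sketch mirroring S3Service.generateResourceName; values are illustrative
String bucketName = "default-out-bucket";
String key = "3fa85f64-report.pdf";
String resourceName = String.join("/", bucketName, key);
// resourceName is "default-out-bucket/3fa85f64-report.pdf", so putFile above is equivalent to
// outClient.putObject("default-out-bucket", "default-out-bucket/3fa85f64-report.pdf", new File(filePath))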

View file

@@ -0,0 +1,40 @@
package ru.micord.ervu.av.service;
import org.jooq.DSLContext;
import org.jooq.Field;
import org.jooq.Record;
import org.jooq.Table;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import static org.jooq.impl.DSL.field;
import static org.jooq.impl.DSL.name;
import static org.jooq.impl.DSL.table;
/**
* @author r.latypov
*/
@Service
public class FileStatusService {
public static final Table<Record> INTERACTION_LOG = table(name("public", "interaction_log"));
public static final Field<Long> INTERACTION_LOG_ID = field(name("id"), Long.class);
public static final Field<String> INTERACTION_LOG_FILE_ID = field(name("file_id"), String.class);
public static final Field<String> INTERACTION_LOG_STATUS = field(name("status"), String.class);
@Autowired
private DSLContext dslContext;
public void setStatus(Long id, String status) {
dslContext.update(INTERACTION_LOG)
.set(INTERACTION_LOG_STATUS, status)
.where(INTERACTION_LOG_ID.eq(id))
.execute();
}
public void setStatus(String fileId, String status) {
dslContext.update(INTERACTION_LOG)
.set(INTERACTION_LOG_STATUS, status)
.where(INTERACTION_LOG_FILE_ID.eq(fileId))
.execute();
}
}

View file

@@ -0,0 +1,294 @@
package ru.micord.ervu.av.service;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.util.Arrays;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.google.gson.JsonSyntaxException;
import org.apache.http.HttpEntity;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpDelete;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.conn.HttpHostConnectException;
import org.apache.http.entity.mime.MultipartEntityBuilder;
import org.apache.http.entity.mime.content.FileBody;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.util.EntityUtils;
import org.apache.kafka.clients.admin.NewTopic;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.http.HttpStatus;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.kafka.support.SendResult;
import org.springframework.lang.NonNull;
import org.springframework.stereotype.Service;
import ru.micord.ervu.av.exception.FileUploadException;
import ru.micord.ervu.av.exception.InvalidHttpFileUrlException;
import ru.micord.ervu.av.kafka.dto.DownloadRequest;
import ru.micord.ervu.av.kafka.dto.DownloadResponse;
import ru.micord.ervu.av.kafka.dto.FileStatus;
import ru.micord.ervu.av.response.AvFileSendResponse;
import ru.micord.ervu.av.response.AvResponse;
import ru.micord.ervu.av.s3.S3Service;
/**
* @author r.latypov
*/
@Service
public class FileUploadService {
private static final Logger logger = LoggerFactory.getLogger(FileUploadService.class);
@Value("${av.rest.address}")
private String avRestAddress;
@Value("${av.first.timeout.milliseconds}")
private Long avFirstTimeoutMilliseconds;
@Value("${file.saving.path}")
private String fileSavingPath;
private final KafkaTemplate<String, String> kafkaTemplate;
private final NewTopic outErrorTopic;
private final NewTopic outSuccessTopic;
private final ReceiveScanReportRetryable receiveScanReportRetryable;
private final FileStatusService fileStatusService;
private final S3Service s3Service;
@Autowired
public FileUploadService(KafkaTemplate<String, String> kafkaTemplate, NewTopic outErrorTopic,
NewTopic outSuccessTopic, ReceiveScanReportRetryable receiveScanReportRetryable,
FileStatusService fileStatusService, S3Service s3Service) {
this.kafkaTemplate = kafkaTemplate;
this.outErrorTopic = outErrorTopic;
this.outSuccessTopic = outSuccessTopic;
this.receiveScanReportRetryable = receiveScanReportRetryable;
this.fileStatusService = fileStatusService;
this.s3Service = s3Service;
}
@KafkaListener(id = "${spring.kafka.consumer.group-id}", topics = "${kafka-in.topic.name}",
containerFactory = "inputKafkaListenerContainerFactory")
public void listenKafkaIn(String kafkaInMessage, Acknowledgment acknowledgment) {
DownloadRequest downloadRequest = new Gson().fromJson(kafkaInMessage, DownloadRequest.class);
String fileId = downloadRequest.fileInfo().getFileId();
try {
FileUrl fileUrl = parseFileUrl(downloadRequest.fileInfo().getFileUrl());
String filePath = fileSavingPath + fileUrl.fileName();
String downloadUrl = fileUrl.fileUrl();
downloadFile(downloadUrl, filePath);
AvResponse avResponse = checkFile(filePath);
boolean clean = Arrays.stream(avResponse.verdicts())
.anyMatch(verdict -> verdict.equalsIgnoreCase(AvResponse.Scan.VERDICT_CLEAN));
boolean infected = Arrays.stream(avResponse.verdicts())
.anyMatch(verdict -> verdict.equalsIgnoreCase(AvResponse.Scan.VERDICT_INFECTED));
if (infected || !clean) {
downloadRequest.fileInfo().setFileUrl(null);
downloadRequest.fileInfo().setFileStatus(FileStatus.FILE_STATUS_02);
sendMessage(outErrorTopic.name(), downloadRequest);
fileStatusService.setStatus(fileId, FileStatus.FILE_STATUS_02.status());
}
else {
s3Service.putFile(filePath, fileUrl.fileName());
downloadRequest.fileInfo().setFileUrl(fileUrl.fileName());
downloadRequest.fileInfo().setFileStatus(FileStatus.FILE_STATUS_03);
sendMessage(outSuccessTopic.name(), downloadRequest);
fileStatusService.setStatus(fileId, FileStatus.FILE_STATUS_03.status());
}
deleteFile(downloadUrl);
if (new File(filePath).delete()) {
acknowledgment.acknowledge();
}
}
catch (InvalidHttpFileUrlException e) {
// we assume the message does not need to be reprocessed
// log the error, report it, and mark the message as read (acknowledge)
logger.error(e.getMessage() + ": " + kafkaInMessage);
acknowledgment.acknowledge();
throw new RuntimeException(kafkaInMessage, e);
}
catch (FileUploadException e) {
// we assume the message needs to be read again
// log the error and leave the message unacknowledged
logger.error(e.getMessage(), e);
}
}
@KafkaListener(id = "${spring.kafka.out.consumer.group-id}",
topics = "${kafka-out.response.topic.name}",
containerFactory = "outputKafkaListenerContainerFactory")
public void listenKafkaOut(String kafkaOutResponseMessage, Acknowledgment acknowledgment) {
DownloadResponse downloadResponse = new Gson().fromJson(kafkaOutResponseMessage,
DownloadResponse.class
);
String fileId = downloadResponse.fileInfo().fileId();
FileStatus fileStatus = downloadResponse.fileInfo().fileStatus();
if (fileStatus.code().equalsIgnoreCase(FileStatus.FILE_STATUS_04.code())) {
fileStatusService.setStatus(fileId, FileStatus.FILE_STATUS_04.status());
}
acknowledgment.acknowledge();
}
/* method for extracting the file UUID from the file url
the file is saved to disk and sent to the storage under the same UUID, keeping the file extension
*/
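// e.g. a hypothetical url "http://file-storage/files/3fa85f64-report.pdf" yields fileName "3fa85f64-report.pdf"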
private static FileUrl parseFileUrl(@NonNull String fileUrl) throws InvalidHttpFileUrlException {
String[] substrings = fileUrl.split("/");
String fileName = substrings[substrings.length - 1];
if (substrings.length == 1 || fileName.isBlank()) {
// data error; the message is invalid
throw new InvalidHttpFileUrlException(fileUrl);
}
else {
return new FileUrl(fileName, fileUrl);
}
}
private void downloadFile(String fileUrl, String filePath)
throws InvalidHttpFileUrlException, FileUploadException {
File file = new File(filePath);
HttpGet request = new HttpGet(fileUrl);
try (CloseableHttpClient client = HttpClients.createDefault();
CloseableHttpResponse response = client.execute(request)) {
int statusCode = response.getStatusLine().getStatusCode();
if (statusCode == HttpStatus.OK.value()) {
HttpEntity entity = response.getEntity();
if (entity != null) {
try (FileOutputStream outputStream = new FileOutputStream(file)) {
entity.writeTo(outputStream);
}
}
}
else {
// the file was not found in the storage; the message is invalid
String message = "http status code " + statusCode + " : " + fileUrl;
throw new InvalidHttpFileUrlException(message);
}
}
catch (HttpHostConnectException e) {
throw new FileUploadException(e);
}
catch (IOException e) {
// the storage is unavailable; the message is invalid
String message =
(e.getMessage() == null ? e.getCause().getMessage() : e.getMessage()) + " : " + fileUrl;
throw new InvalidHttpFileUrlException(message, e);
}
}
private AvResponse checkFile(String filePath) throws FileUploadException {
File file = new File(filePath);
try (CloseableHttpClient client = HttpClients.createDefault()) {
HttpPost post = new HttpPost(avRestAddress);
HttpEntity entity = MultipartEntityBuilder.create()
.addPart("file", new FileBody(file))
.build();
post.setEntity(entity);
try (CloseableHttpResponse postResponse = client.execute(post)) {
int postStatusCode = postResponse.getStatusLine().getStatusCode();
String postResponseJson = EntityUtils.toString(postResponse.getEntity());
AvFileSendResponse avFileSendResponse;
try {
avFileSendResponse = new Gson().fromJson(postResponseJson, AvFileSendResponse.class);
}
catch (JsonSyntaxException e) {
throw new FileUploadException("error json: " + postResponseJson, e);
}
if (postStatusCode != HttpStatus.CREATED.value()) {
StringBuilder stringBuilder = new StringBuilder(
"http status code " + postStatusCode + " for " + avRestAddress + " post request.");
String status = avFileSendResponse.status();
if (status != null) {
stringBuilder.append(" Status: ").append(status).append(".");
}
if (avFileSendResponse.error() != null) {
stringBuilder.append(" Error code: ")
.append(avFileSendResponse.error().code())
.append(". Error message: ")
.append(avFileSendResponse.error().message())
.append(". ");
}
throw new FileUploadException(stringBuilder.toString());
}
String id = avFileSendResponse.id();
String reportRequestUri = avRestAddress + "/" + id;
HttpGet get = new HttpGet(reportRequestUri);
// wait for the configured timeout before sending the first report request
try {
TimeUnit.MILLISECONDS.sleep(
avFirstTimeoutMilliseconds == null ? 1000L : avFirstTimeoutMilliseconds);
}
catch (InterruptedException e) {
throw new FileUploadException(e);
}
return receiveScanReportRetryable.receiveScanReport(client, get);
}
}
catch (IOException e) {
// unexpected error while accessing via the http client
throw new FileUploadException(e);
}
}
private void deleteFile(String fileUrl) throws FileUploadException {
try (CloseableHttpClient client = HttpClients.createDefault()) {
HttpDelete delete = new HttpDelete(fileUrl);
try (CloseableHttpResponse response = client.execute(delete)) {
int statusCode = response.getStatusLine().getStatusCode();
if (statusCode != HttpStatus.NO_CONTENT.value()) {
String message = "http status code " + statusCode + " : " + fileUrl;
throw new RuntimeException(message);
}
}
}
catch (IOException e) {
// unexpected error while accessing via the http client
throw new FileUploadException(e);
}
}
private void sendMessage(@NonNull String topicName, Object object) {
String json = new GsonBuilder().setPrettyPrinting().create().toJson(object);
CompletableFuture<SendResult<String, String>> future = kafkaTemplate.send(topicName, json);
future.whenComplete((result, e) -> {
if (e != null) {
// result is null when the send fails, so the message references the serialized payload instead
String message = "Unable to send message [" + json + "] into kafka topic '"
+ topicName + "' due to : " + e.getMessage();
throw new RuntimeException(message, e);
}
});
}
private record FileUrl(String fileName, String fileUrl) {
}
}
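For illustration only, not part of the commit: a minimal sketch of the JSON that listenKafkaIn expects to deserialize into DownloadRequest. Field names follow the DownloadRequest, OrgInfo and FileInfo types above; every value is hypothetical.
// hedged sketch of an incoming kafka-in message; all values are made up
String sample = """
    {
      "orgInfo": {"orgId": "1", "orgName": "Org", "prnOid": "1.2.3"},
      "fileInfo": {
        "fileUrl": "http://file-storage/files/3fa85f64-report.pdf",
        "fileId": "42",
        "fileName": "report.pdf",
        "filePatternCode": "P01",
        "FilePatternName": "pattern name",
        "departureDateTime": "2024-09-16T10:47:42",
        "timeZone": "+03:00"
      }
    }
    """;
DownloadRequest request = new Gson().fromJson(sample, DownloadRequest.class);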

View file

@@ -0,0 +1,54 @@
package ru.micord.ervu.av.service;
import java.io.IOException;
import com.google.gson.Gson;
import org.apache.http.client.ClientProtocolException;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.util.EntityUtils;
import org.springframework.http.HttpStatus;
import org.springframework.retry.annotation.Backoff;
import org.springframework.retry.annotation.Retryable;
import org.springframework.stereotype.Service;
import ru.micord.ervu.av.exception.FileUploadException;
import ru.micord.ervu.av.exception.RetryableException;
import ru.micord.ervu.av.response.AvResponse;
/**
* @author r.latypov
*/
@Service
public class ReceiveScanReportRetryable {
@Retryable(retryFor = {RetryableException.class},
maxAttemptsExpression = "${av.retry.max-attempts.count}",
backoff = @Backoff(delayExpression = "${av.retry.delay.milliseconds}"))
public AvResponse receiveScanReport(CloseableHttpClient client, HttpGet get)
throws FileUploadException {
try (CloseableHttpResponse getResponse = client.execute(get)) {
int getStatusCode = getResponse.getStatusLine().getStatusCode();
if (getStatusCode == HttpStatus.OK.value()) {
String getResponseJson = EntityUtils.toString(getResponse.getEntity());
AvResponse avResponse = new Gson().fromJson(getResponseJson, AvResponse.class);
if (avResponse.completed() == null) {
throw new RetryableException();
}
return avResponse;
}
else {
throw new FileUploadException("http status code " + getStatusCode + " for " + get.getURI()
+ " get request.");
}
}
catch (ClientProtocolException e) {
// unexpected error while accessing via the http client
throw new RuntimeException(e);
}
catch (IOException e) {
// unexpected error while accessing via the http client
throw new FileUploadException(e);
}
}
}

View file

@@ -0,0 +1,56 @@
spring.kafka.admin.security.protocol=SASL_PLAINTEXT
#login password to set
spring.kafka.admin.properties.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="${OUT_KAFKA_USERNAME}" password="${OUT_KAFKA_PASSWORD}";
spring.kafka.admin.properties.sasl.mechanism=SCRAM-SHA-256
#
#host1:port1, host2:port2
spring.kafka.bootstrap-servers=${OUT_KAFKA_SERVERS}
#
#host1:port1, host2:port2
spring.kafka.consumer.bootstrap-servers=${IN_KAFKA_SERVERS}
spring.kafka.consumer.security.protocol=SASL_PLAINTEXT
#login password to set
spring.kafka.consumer.properties.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="${IN_KAFKA_USERNAME}" password="${IN_KAFKA_PASSWORD}";
spring.kafka.consumer.properties.sasl.mechanism=SCRAM-SHA-256
#
spring.kafka.consumer.enable-auto-commit=false
spring.kafka.consumer.group-id=file-to-upload-consumers
spring.kafka.out.consumer.group-id=response-consumers
#
spring.kafka.listener.ack-mode=MANUAL_IMMEDIATE
#
#host1:port1, host2:port2
spring.kafka.producer.bootstrap-servers=${OUT_KAFKA_SERVERS}
spring.kafka.producer.security.protocol=SASL_PLAINTEXT
#login password to set
spring.kafka.producer.properties.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="${OUT_KAFKA_USERNAME}" password="${OUT_KAFKA_PASSWORD}";
spring.kafka.producer.properties.sasl.mechanism=SCRAM-SHA-256
#
spring.kafka.properties.security.protocol=SASL_PLAINTEXT
#login password to set
spring.kafka.properties.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="${OUT_KAFKA_USERNAME}" password="${OUT_KAFKA_PASSWORD}";
spring.kafka.properties.sasl.mechanism=SCRAM-SHA-256
#
kafka-in.topic.name=${IN_KAFKA_TOPIC_NAME}
kafka-out.error.topic.name=${OUT_KAFKA_ERROR_TOPIC_NAME}
kafka-out.success.topic.name=${OUT_KAFKA_SUCCESS_TOPIC_NAME}
kafka-out.response.topic.name=${OUT_KAFKA_RESPONSE_TOPIC_NAME}
#
av.rest.address=${AV_REST_ADDRESS}
av.first.timeout.milliseconds=${AV_FIRST_TIMEOUT_MILLISECONDS}
av.retry.max-attempts.count=${AV_RETRY_MAX_ATTEMPTS_COUNT}
av.retry.delay.milliseconds=${AV_RETRY_DELAY_MILLISECONDS}
file.saving.path=${FILE_SAVING_PATH}
#
s3.out.endpoint=${S3_ENDPOINT}
s3.out.port=${S3_PORT}
s3.out.access_key=${S3_ACCESS_KEY}
s3.out.secret_key=${S3_SECRET_KEY}
s3.out.bucket_name=${S3_OUT_BUCKET_NAME}
#
spring.jooq.sql-dialect=Postgres
spring.datasource.driver-class-name=org.postgresql.Driver
#host:port/database_name
spring.datasource.url=jdbc:postgresql://${SPRING_DATASOURCE_URL}
spring.datasource.username=${SPRING_DATASOURCE_USERNAME}
spring.datasource.password=${SPRING_DATASOURCE_PASSWORD}

View file

@@ -1,2 +1,25 @@
MINIO_ROOT_USER=changeIt123
MINIO_ROOT_PASSWORD=changeIt123
IN_KAFKA_SERVERS=10.10.31.11:32609
IN_KAFKA_USERNAME=user1
IN_KAFKA_PASSWORD=Blfi9d2OFG
IN_KAFKA_TOPIC_NAME=file-to-upload
OUT_KAFKA_SERVERS=10.10.31.11:32609
OUT_KAFKA_USERNAME=user1
OUT_KAFKA_PASSWORD=Blfi9d2OFG
OUT_KAFKA_ERROR_TOPIC_NAME=ervu.lkrp.download.request
OUT_KAFKA_SUCCESS_TOPIC_NAME=ervu.lkrp.download.request
OUT_KAFKA_RESPONSE_TOPIC_NAME=ervu.lkrp.download.response
AV_REST_ADDRESS=http://10.10.31.118:8085/scans
AV_FIRST_TIMEOUT_MILLISECONDS=1000
AV_RETRY_MAX_ATTEMPTS_COUNT=10
AV_RETRY_DELAY_MILLISECONDS=1000
FILE_SAVING_PATH=/transfer/
S3_ENDPOINT=http://ervu-minio.k8s.micord.ru
S3_PORT=31900
S3_ACCESS_KEY=Keyq0l8IRarEf5GmpvEO
S3_SECRET_KEY=8A2epSoI6OjdHHwA5F6tHxeYRThv47GdGwcBrV7a
S3_OUT_BUCKET_NAME=default-out-bucket
SPRING_DATASOURCE_URL=10.10.31.119:5432/ervu-lkrp-ul
SPRING_DATASOURCE_USERNAME=ervu-lkrp-ul
SPRING_DATASOURCE_PASSWORD=ervu-lkrp-ul