Merge tag '1.8.0' into develop
Tag release
This commit is contained in:
commit
cabf259245
10 changed files with 183 additions and 101 deletions
|
|
@ -5,6 +5,7 @@ AV_KAFKA_USERNAME=user1
|
||||||
AV_KAFKA_PASSWORD=Blfi9d2OFG
|
AV_KAFKA_PASSWORD=Blfi9d2OFG
|
||||||
AV_KAFKA_GROUP_ID=file-to-upload-consumers
|
AV_KAFKA_GROUP_ID=file-to-upload-consumers
|
||||||
AV_KAFKA_TOPIC_NAME=file-to-upload
|
AV_KAFKA_TOPIC_NAME=file-to-upload
|
||||||
|
AV_KAFKA_STATUS_TOPIC_NAME=ervu.lkrp.av-fileupload-status
|
||||||
|
|
||||||
ERVU_KAFKA_BOOTSTRAP_SERVERS=10.10.31.11:32609
|
ERVU_KAFKA_BOOTSTRAP_SERVERS=10.10.31.11:32609
|
||||||
ERVU_KAFKA_SECURITY_PROTOCOL=SASL_PLAINTEXT
|
ERVU_KAFKA_SECURITY_PROTOCOL=SASL_PLAINTEXT
|
||||||
|
|
@ -16,6 +17,7 @@ ERVU_KAFKA_ERROR_TOPIC_NAME=ervu.lkrp.download.request
|
||||||
ERVU_KAFKA_SUCCESS_TOPIC_NAME=ervu.lkrp.download.request
|
ERVU_KAFKA_SUCCESS_TOPIC_NAME=ervu.lkrp.download.request
|
||||||
ERVU_KAFKA_RESPONSE_TOPIC_NAME=ervu.lkrp.download.response
|
ERVU_KAFKA_RESPONSE_TOPIC_NAME=ervu.lkrp.download.response
|
||||||
|
|
||||||
|
AV_CHECK_ENABLED=true
|
||||||
AV_REST_ADDRESS=http://10.10.31.118:8085/scans
|
AV_REST_ADDRESS=http://10.10.31.118:8085/scans
|
||||||
AV_FIRST_TIMEOUT_MILLISECONDS=1000
|
AV_FIRST_TIMEOUT_MILLISECONDS=1000
|
||||||
AV_RETRY_MAX_ATTEMPTS_COUNT=10
|
AV_RETRY_MAX_ATTEMPTS_COUNT=10
|
||||||
|
|
@ -26,8 +28,8 @@ FILE_SAVING_PATH=/transfer/
|
||||||
S3_ENDPOINT=http://ervu-minio.k8s.micord.ru:31900
|
S3_ENDPOINT=http://ervu-minio.k8s.micord.ru:31900
|
||||||
S3_ACCESS_KEY=rlTdTvkmSXu9FsLhfecw
|
S3_ACCESS_KEY=rlTdTvkmSXu9FsLhfecw
|
||||||
S3_SECRET_KEY=NUmY0wwRIEyAd98GCKd1cOgJWvLQYAcMMul5Ulu0
|
S3_SECRET_KEY=NUmY0wwRIEyAd98GCKd1cOgJWvLQYAcMMul5Ulu0
|
||||||
S3_OUT_BUCKET_NAME=default-out-bucket
|
S3_BUCKET_NAME=default-out-bucket
|
||||||
S3_OUT_PATH_STYLE_ACCESS_ENABLED=true
|
S3_PATH_STYLE_ACCESS_ENABLED=true
|
||||||
|
|
||||||
SPRING_DATASOURCE_URL=jdbc:postgresql://10.10.31.119:5432/ervu-lkrp-ul
|
SPRING_DATASOURCE_URL=jdbc:postgresql://10.10.31.119:5432/ervu-lkrp-ul
|
||||||
SPRING_DATASOURCE_USERNAME=ervu-lkrp-ul
|
SPRING_DATASOURCE_USERNAME=ervu-lkrp-ul
|
||||||
|
|
|
||||||
13
pom.xml
13
pom.xml
|
|
@ -130,6 +130,19 @@
|
||||||
<groupId>org.springframework.boot</groupId>
|
<groupId>org.springframework.boot</groupId>
|
||||||
<artifactId>spring-boot-maven-plugin</artifactId>
|
<artifactId>spring-boot-maven-plugin</artifactId>
|
||||||
</plugin>
|
</plugin>
|
||||||
|
<plugin>
|
||||||
|
<groupId>com.amashchenko.maven.plugin</groupId>
|
||||||
|
<artifactId>gitflow-maven-plugin</artifactId>
|
||||||
|
<version>1.21.0</version>
|
||||||
|
<configuration>
|
||||||
|
<useSnapshotInRelease>true</useSnapshotInRelease>
|
||||||
|
<useSnapshotInHotfix>true</useSnapshotInHotfix>
|
||||||
|
<versionDigitToIncrement>1</versionDigitToIncrement>
|
||||||
|
<pushRemote>true</pushRemote>
|
||||||
|
<fetchRemote>true</fetchRemote>
|
||||||
|
<commitDevelopmentVersionAtStart>true</commitDevelopmentVersionAtStart>
|
||||||
|
</configuration>
|
||||||
|
</plugin>
|
||||||
</plugins>
|
</plugins>
|
||||||
</build>
|
</build>
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -0,0 +1,52 @@
|
||||||
|
package ru.micord.ervu.av.kafka.config.input;
|
||||||
|
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
|
|
||||||
|
import org.apache.kafka.clients.CommonClientConfigs;
|
||||||
|
import org.apache.kafka.clients.producer.ProducerConfig;
|
||||||
|
import org.apache.kafka.common.config.SaslConfigs;
|
||||||
|
import org.apache.kafka.common.serialization.StringSerializer;
|
||||||
|
import org.springframework.beans.factory.annotation.Qualifier;
|
||||||
|
import org.springframework.beans.factory.annotation.Value;
|
||||||
|
import org.springframework.context.annotation.Bean;
|
||||||
|
import org.springframework.context.annotation.Configuration;
|
||||||
|
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
|
||||||
|
import org.springframework.kafka.core.KafkaTemplate;
|
||||||
|
import org.springframework.kafka.core.ProducerFactory;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @author r.latypov
|
||||||
|
*/
|
||||||
|
@Configuration
|
||||||
|
public class InputKafkaProducerConfig {
|
||||||
|
@Value("${spring.kafka.in.producer.bootstrap.servers}")
|
||||||
|
private List<String> bootstrapAddress;
|
||||||
|
@Value("${spring.kafka.in.producer.security.protocol}")
|
||||||
|
private String securityProtocol;
|
||||||
|
@Value("${spring.kafka.in.producer.properties.sasl.jaas.config}")
|
||||||
|
private String jaasConfig;
|
||||||
|
@Value("${spring.kafka.in.producer.properties.sasl.mechanism}")
|
||||||
|
private String saslMechanism;
|
||||||
|
|
||||||
|
@Bean
|
||||||
|
public ProducerFactory<String, String> inputProducerFactory() {
|
||||||
|
Map<String, Object> configs = new HashMap<>();
|
||||||
|
|
||||||
|
configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
|
||||||
|
configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
|
||||||
|
configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
|
||||||
|
|
||||||
|
configs.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, securityProtocol);
|
||||||
|
configs.put(SaslConfigs.SASL_JAAS_CONFIG, jaasConfig);
|
||||||
|
configs.put(SaslConfigs.SASL_MECHANISM, saslMechanism);
|
||||||
|
|
||||||
|
return new DefaultKafkaProducerFactory<>(configs);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Bean
|
||||||
|
public KafkaTemplate<String, String> inputKafkaTemplate() {
|
||||||
|
return new KafkaTemplate<>(inputProducerFactory());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
@ -0,0 +1,21 @@
|
||||||
|
package ru.micord.ervu.av.kafka.config.input;
|
||||||
|
|
||||||
|
import org.apache.kafka.clients.admin.NewTopic;
|
||||||
|
import org.springframework.beans.factory.annotation.Value;
|
||||||
|
import org.springframework.context.annotation.Bean;
|
||||||
|
import org.springframework.context.annotation.Configuration;
|
||||||
|
import org.springframework.kafka.config.TopicBuilder;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @author r.latypov
|
||||||
|
*/
|
||||||
|
@Configuration
|
||||||
|
public class InputKafkaTopicConfig {
|
||||||
|
@Value("${kafka.in.status.topic.name}")
|
||||||
|
private String kafkaInStatusTopicName;
|
||||||
|
|
||||||
|
@Bean
|
||||||
|
public NewTopic inStatusTopic() {
|
||||||
|
return TopicBuilder.name(kafkaInStatusTopicName).build();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
@ -8,6 +8,7 @@ import org.apache.kafka.clients.CommonClientConfigs;
|
||||||
import org.apache.kafka.clients.producer.ProducerConfig;
|
import org.apache.kafka.clients.producer.ProducerConfig;
|
||||||
import org.apache.kafka.common.config.SaslConfigs;
|
import org.apache.kafka.common.config.SaslConfigs;
|
||||||
import org.apache.kafka.common.serialization.StringSerializer;
|
import org.apache.kafka.common.serialization.StringSerializer;
|
||||||
|
import org.springframework.beans.factory.annotation.Qualifier;
|
||||||
import org.springframework.beans.factory.annotation.Value;
|
import org.springframework.beans.factory.annotation.Value;
|
||||||
import org.springframework.context.annotation.Bean;
|
import org.springframework.context.annotation.Bean;
|
||||||
import org.springframework.context.annotation.Configuration;
|
import org.springframework.context.annotation.Configuration;
|
||||||
|
|
@ -30,7 +31,7 @@ public class OutputKafkaProducerConfig {
|
||||||
private String saslMechanism;
|
private String saslMechanism;
|
||||||
|
|
||||||
@Bean
|
@Bean
|
||||||
public ProducerFactory<String, String> producerFactory() {
|
public ProducerFactory<String, String> outputProducerFactory() {
|
||||||
Map<String, Object> configs = new HashMap<>();
|
Map<String, Object> configs = new HashMap<>();
|
||||||
|
|
||||||
configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
|
configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
|
||||||
|
|
@ -45,7 +46,7 @@ public class OutputKafkaProducerConfig {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Bean
|
@Bean
|
||||||
public KafkaTemplate<String, String> kafkaTemplate() {
|
public KafkaTemplate<String, String> outputKafkaTemplate() {
|
||||||
return new KafkaTemplate<>(producerFactory());
|
return new KafkaTemplate<>(outputProducerFactory());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -16,26 +16,26 @@ import org.springframework.context.annotation.Configuration;
|
||||||
* @author r.latypov
|
* @author r.latypov
|
||||||
*/
|
*/
|
||||||
@Configuration
|
@Configuration
|
||||||
public class S3Connection {
|
public class S3Config {
|
||||||
@Value("${s3.out.endpoint}")
|
@Value("${s3.endpoint}")
|
||||||
private String endpointOut;
|
private String endpoint;
|
||||||
@Value("${s3.out.access_key}")
|
@Value("${s3.access_key}")
|
||||||
private String accessKeyOut;
|
private String accessKey;
|
||||||
@Value("${s3.out.secret_key}")
|
@Value("${s3.secret_key}")
|
||||||
private String secretKeyOut;
|
private String secretKey;
|
||||||
@Value("${s3.out.bucket_name}")
|
@Value("${s3.bucket_name}")
|
||||||
private String bucketNameOut;
|
private String bucketName;
|
||||||
@Value("${s3.out.path.style.access.enabled:true}")
|
@Value("${s3.path.style.enabled:true}")
|
||||||
private boolean pathStyleAccessEnabled;
|
private boolean pathStyleAccessEnabled;
|
||||||
|
|
||||||
@Bean("outBucketName")
|
@Bean("outBucketName")
|
||||||
public String getBucketNameOut() {
|
public String getBucket() {
|
||||||
return bucketNameOut;
|
return bucketName;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Bean("outClient")
|
@Bean("outClient")
|
||||||
public AmazonS3 getS3OutClient() {
|
public AmazonS3 getS3OutClient() {
|
||||||
return getS3Client(endpointOut, accessKeyOut, secretKeyOut, pathStyleAccessEnabled);
|
return getS3Client(endpoint, accessKey, secretKey, pathStyleAccessEnabled);
|
||||||
}
|
}
|
||||||
|
|
||||||
private static AmazonS3 getS3Client(String endpoint, String accessKey,
|
private static AmazonS3 getS3Client(String endpoint, String accessKey,
|
||||||
|
|
@ -1,6 +1,6 @@
|
||||||
package ru.micord.ervu.av.s3;
|
package ru.micord.ervu.av.s3;
|
||||||
|
|
||||||
import java.io.File;
|
import java.nio.file.Path;
|
||||||
|
|
||||||
import com.amazonaws.AmazonServiceException;
|
import com.amazonaws.AmazonServiceException;
|
||||||
import com.amazonaws.services.s3.AmazonS3;
|
import com.amazonaws.services.s3.AmazonS3;
|
||||||
|
|
@ -30,19 +30,14 @@ public class S3Service {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public void putFile(String filePath, String key) throws FileUploadException {
|
public String putFile(Path filePath, String key) throws FileUploadException {
|
||||||
try {
|
try {
|
||||||
outClient.putObject(outBucketName, generateResourceName(outBucketName, key),
|
outClient.putObject(outBucketName, key, filePath.toFile());
|
||||||
new File(filePath)
|
return String.join("/", "s3:/", outBucketName, key);
|
||||||
);
|
|
||||||
}
|
}
|
||||||
catch (AmazonServiceException e) {
|
catch (AmazonServiceException e) {
|
||||||
// todo message
|
// todo message
|
||||||
throw new FileUploadException(e);
|
throw new FileUploadException(e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private static String generateResourceName(String bucketName, String key) {
|
|
||||||
return String.join("/", bucketName, key);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -1,40 +0,0 @@
|
||||||
package ru.micord.ervu.av.service;
|
|
||||||
|
|
||||||
import org.jooq.DSLContext;
|
|
||||||
import org.jooq.Field;
|
|
||||||
import org.jooq.Record;
|
|
||||||
import org.jooq.Table;
|
|
||||||
import org.springframework.beans.factory.annotation.Autowired;
|
|
||||||
import org.springframework.stereotype.Service;
|
|
||||||
|
|
||||||
import static org.jooq.impl.DSL.field;
|
|
||||||
import static org.jooq.impl.DSL.name;
|
|
||||||
import static org.jooq.impl.DSL.table;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* @author r.latypov
|
|
||||||
*/
|
|
||||||
@Service
|
|
||||||
public class FileStatusService {
|
|
||||||
public static final Table<Record> INTERACTION_LOG = table(name("public", "interaction_log"));
|
|
||||||
public static final Field<Long> INTERACTION_LOG_ID = field(name("id"), Long.class);
|
|
||||||
public static final Field<String> INTERACTION_LOG_FILE_ID = field(name("file_id"), String.class);
|
|
||||||
public static final Field<String> INTERACTION_LOG_STATUS = field(name("status"), String.class);
|
|
||||||
|
|
||||||
@Autowired
|
|
||||||
private DSLContext dslContext;
|
|
||||||
|
|
||||||
public void setStatus(Long id, String status) {
|
|
||||||
dslContext.update(INTERACTION_LOG)
|
|
||||||
.set(INTERACTION_LOG_STATUS, status)
|
|
||||||
.where(INTERACTION_LOG_ID.eq(id))
|
|
||||||
.execute();
|
|
||||||
}
|
|
||||||
|
|
||||||
public void setStatus(String fileId, String status) {
|
|
||||||
dslContext.update(INTERACTION_LOG)
|
|
||||||
.set(INTERACTION_LOG_STATUS, status)
|
|
||||||
.where(INTERACTION_LOG_FILE_ID.eq(fileId))
|
|
||||||
.execute();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
@ -3,6 +3,10 @@ package ru.micord.ervu.av.service;
|
||||||
import java.io.File;
|
import java.io.File;
|
||||||
import java.io.FileOutputStream;
|
import java.io.FileOutputStream;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
import java.nio.charset.StandardCharsets;
|
||||||
|
import java.nio.file.Files;
|
||||||
|
import java.nio.file.Path;
|
||||||
|
import java.nio.file.Paths;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
import java.util.concurrent.CompletableFuture;
|
import java.util.concurrent.CompletableFuture;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
@ -22,9 +26,11 @@ import org.apache.http.impl.client.CloseableHttpClient;
|
||||||
import org.apache.http.impl.client.HttpClients;
|
import org.apache.http.impl.client.HttpClients;
|
||||||
import org.apache.http.util.EntityUtils;
|
import org.apache.http.util.EntityUtils;
|
||||||
import org.apache.kafka.clients.admin.NewTopic;
|
import org.apache.kafka.clients.admin.NewTopic;
|
||||||
|
import org.apache.kafka.clients.producer.ProducerRecord;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
import org.springframework.beans.factory.annotation.Autowired;
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
|
import org.springframework.beans.factory.annotation.Qualifier;
|
||||||
import org.springframework.beans.factory.annotation.Value;
|
import org.springframework.beans.factory.annotation.Value;
|
||||||
import org.springframework.http.HttpStatus;
|
import org.springframework.http.HttpStatus;
|
||||||
import org.springframework.kafka.annotation.KafkaListener;
|
import org.springframework.kafka.annotation.KafkaListener;
|
||||||
|
|
@ -32,6 +38,7 @@ import org.springframework.kafka.core.KafkaTemplate;
|
||||||
import org.springframework.kafka.support.Acknowledgment;
|
import org.springframework.kafka.support.Acknowledgment;
|
||||||
import org.springframework.kafka.support.SendResult;
|
import org.springframework.kafka.support.SendResult;
|
||||||
import org.springframework.lang.NonNull;
|
import org.springframework.lang.NonNull;
|
||||||
|
import org.springframework.messaging.handler.annotation.Header;
|
||||||
import org.springframework.stereotype.Service;
|
import org.springframework.stereotype.Service;
|
||||||
import ru.micord.ervu.av.exception.FileUploadException;
|
import ru.micord.ervu.av.exception.FileUploadException;
|
||||||
import ru.micord.ervu.av.exception.InvalidHttpFileUrlException;
|
import ru.micord.ervu.av.exception.InvalidHttpFileUrlException;
|
||||||
|
|
@ -48,6 +55,8 @@ import ru.micord.ervu.av.s3.S3Service;
|
||||||
@Service
|
@Service
|
||||||
public class FileUploadService {
|
public class FileUploadService {
|
||||||
private static final Logger logger = LoggerFactory.getLogger(FileUploadService.class);
|
private static final Logger logger = LoggerFactory.getLogger(FileUploadService.class);
|
||||||
|
@Value("${av.check.enabled}")
|
||||||
|
private boolean avCheckEnabled;
|
||||||
@Value("${av.rest.address}")
|
@Value("${av.rest.address}")
|
||||||
private String avRestAddress;
|
private String avRestAddress;
|
||||||
@Value("${av.first.timeout.milliseconds}")
|
@Value("${av.first.timeout.milliseconds}")
|
||||||
|
|
@ -55,62 +64,81 @@ public class FileUploadService {
|
||||||
@Value("${file.saving.path}")
|
@Value("${file.saving.path}")
|
||||||
private String fileSavingPath;
|
private String fileSavingPath;
|
||||||
private final KafkaTemplate<String, String> kafkaTemplate;
|
private final KafkaTemplate<String, String> kafkaTemplate;
|
||||||
|
private final KafkaTemplate<String, String> inKafkaTemplate;
|
||||||
private final NewTopic outErrorTopic;
|
private final NewTopic outErrorTopic;
|
||||||
private final NewTopic outSuccessTopic;
|
private final NewTopic outSuccessTopic;
|
||||||
|
private final NewTopic inStatusTopic;
|
||||||
private final ReceiveScanReportRetryable receiveScanReportRetryable;
|
private final ReceiveScanReportRetryable receiveScanReportRetryable;
|
||||||
private final FileStatusService fileStatusService;
|
|
||||||
private final S3Service s3Service;
|
private final S3Service s3Service;
|
||||||
|
|
||||||
@Autowired
|
@Autowired
|
||||||
public FileUploadService(KafkaTemplate<String, String> kafkaTemplate, NewTopic outErrorTopic,
|
public FileUploadService(@Qualifier("outputKafkaTemplate") KafkaTemplate<String, String> kafkaTemplate,
|
||||||
NewTopic outSuccessTopic, ReceiveScanReportRetryable receiveScanReportRetryable,
|
@Qualifier("inputKafkaTemplate") KafkaTemplate<String, String> inKafkaTemplate,
|
||||||
FileStatusService fileStatusService, S3Service s3Service) {
|
NewTopic outErrorTopic, NewTopic outSuccessTopic, ReceiveScanReportRetryable receiveScanReportRetryable,
|
||||||
|
S3Service s3Service, NewTopic inStatusTopic) {
|
||||||
this.kafkaTemplate = kafkaTemplate;
|
this.kafkaTemplate = kafkaTemplate;
|
||||||
this.outErrorTopic = outErrorTopic;
|
this.outErrorTopic = outErrorTopic;
|
||||||
this.outSuccessTopic = outSuccessTopic;
|
this.outSuccessTopic = outSuccessTopic;
|
||||||
this.receiveScanReportRetryable = receiveScanReportRetryable;
|
this.receiveScanReportRetryable = receiveScanReportRetryable;
|
||||||
this.fileStatusService = fileStatusService;
|
|
||||||
this.s3Service = s3Service;
|
this.s3Service = s3Service;
|
||||||
|
this.inKafkaTemplate = inKafkaTemplate;
|
||||||
|
this.inStatusTopic = inStatusTopic;
|
||||||
}
|
}
|
||||||
|
|
||||||
@KafkaListener(id = "${spring.kafka.consumer.group.id}", topics = "${kafka.in.topic.name}",
|
@KafkaListener(id = "${spring.kafka.consumer.group.id}", topics = "${kafka.in.topic.name}",
|
||||||
containerFactory = "inputKafkaListenerContainerFactory")
|
containerFactory = "inputKafkaListenerContainerFactory")
|
||||||
public void listenKafkaIn(String kafkaInMessage, Acknowledgment acknowledgment) {
|
public void listenKafkaIn(String kafkaInMessage, Acknowledgment acknowledgment,
|
||||||
|
@Header("messageId") String messageId) {
|
||||||
DownloadRequest downloadRequest = new Gson().fromJson(kafkaInMessage, DownloadRequest.class);
|
DownloadRequest downloadRequest = new Gson().fromJson(kafkaInMessage, DownloadRequest.class);
|
||||||
String fileId = downloadRequest.fileInfo().getFileId();
|
|
||||||
|
|
||||||
try {
|
try {
|
||||||
FileUrl fileUrl = parseFileUrl(downloadRequest.fileInfo().getFileUrl());
|
FileUrl fileUrl = parseFileUrl(downloadRequest.fileInfo().getFileUrl());
|
||||||
String filePath = fileSavingPath + fileUrl.fileName();
|
|
||||||
|
try {
|
||||||
|
Files.createDirectories(Paths.get(fileSavingPath));
|
||||||
|
}
|
||||||
|
catch (IOException e) {
|
||||||
|
throw new RuntimeException("Failed to create directory " + fileSavingPath, e);
|
||||||
|
}
|
||||||
|
logger.info("working in {}", System.getProperty("user.home"));
|
||||||
|
Path filePath = Paths.get(fileSavingPath, fileUrl.fileName());
|
||||||
String downloadUrl = fileUrl.fileUrl();
|
String downloadUrl = fileUrl.fileUrl();
|
||||||
downloadFile(downloadUrl, filePath);
|
downloadFile(downloadUrl, filePath);
|
||||||
|
boolean clean = true;
|
||||||
|
boolean infected = false;
|
||||||
|
|
||||||
AvResponse avResponse = checkFile(filePath);
|
if (avCheckEnabled) {
|
||||||
|
AvResponse avResponse = checkFile(filePath);
|
||||||
|
|
||||||
boolean clean = Arrays.stream(avResponse.verdicts())
|
clean = Arrays.stream(avResponse.verdicts())
|
||||||
.anyMatch(verdict -> verdict.equalsIgnoreCase(AvResponse.Scan.VERDICT_CLEAN));
|
.anyMatch(verdict -> verdict.equalsIgnoreCase(AvResponse.Scan.VERDICT_CLEAN));
|
||||||
boolean infected = Arrays.stream(avResponse.verdicts())
|
infected = Arrays.stream(avResponse.verdicts())
|
||||||
.anyMatch(verdict -> verdict.equalsIgnoreCase(AvResponse.Scan.VERDICT_INFECTED));
|
.anyMatch(verdict -> verdict.equalsIgnoreCase(AvResponse.Scan.VERDICT_INFECTED));
|
||||||
|
}
|
||||||
|
|
||||||
if (infected || !clean) {
|
if (infected || !clean) {
|
||||||
downloadRequest.fileInfo().setFileUrl(null);
|
downloadRequest.fileInfo().setFileUrl(null);
|
||||||
downloadRequest.fileInfo().setFileStatus(FileStatus.FILE_STATUS_02);
|
downloadRequest.fileInfo().setFileStatus(FileStatus.FILE_STATUS_02);
|
||||||
sendMessage(outErrorTopic.name(), downloadRequest);
|
sendMessage(outErrorTopic.name(), downloadRequest, messageId, kafkaTemplate);
|
||||||
|
|
||||||
fileStatusService.setStatus(fileId, FileStatus.FILE_STATUS_02.status());
|
|
||||||
}
|
}
|
||||||
else {
|
else {
|
||||||
s3Service.putFile(filePath, fileUrl.fileName());
|
String fileRef = s3Service.putFile(filePath, fileUrl.fileName());
|
||||||
|
|
||||||
downloadRequest.fileInfo().setFileUrl(fileUrl.fileName());
|
downloadRequest.fileInfo().setFileUrl(fileRef);
|
||||||
downloadRequest.fileInfo().setFileStatus(FileStatus.FILE_STATUS_03);
|
downloadRequest.fileInfo().setFileStatus(FileStatus.FILE_STATUS_03);
|
||||||
sendMessage(outSuccessTopic.name(), downloadRequest);
|
sendMessage(outSuccessTopic.name(), downloadRequest, messageId, kafkaTemplate);
|
||||||
|
|
||||||
fileStatusService.setStatus(fileId, FileStatus.FILE_STATUS_03.status());
|
|
||||||
}
|
}
|
||||||
|
sendMessage(inStatusTopic.name(), downloadRequest, messageId, inKafkaTemplate);
|
||||||
|
|
||||||
deleteFile(downloadUrl);
|
deleteFile(downloadUrl);
|
||||||
if (new File(filePath).delete()) {
|
|
||||||
|
try {
|
||||||
|
Files.delete(filePath);
|
||||||
|
}
|
||||||
|
catch (IOException e) {
|
||||||
|
throw new RuntimeException("Failed to delete file " + filePath.getFileName());
|
||||||
|
}
|
||||||
|
finally {
|
||||||
acknowledgment.acknowledge();
|
acknowledgment.acknowledge();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
@ -131,15 +159,14 @@ public class FileUploadService {
|
||||||
@KafkaListener(id = "${spring.kafka.out.consumer.group.id}",
|
@KafkaListener(id = "${spring.kafka.out.consumer.group.id}",
|
||||||
topics = "${kafka.out.response.topic.name}",
|
topics = "${kafka.out.response.topic.name}",
|
||||||
containerFactory = "outputKafkaListenerContainerFactory")
|
containerFactory = "outputKafkaListenerContainerFactory")
|
||||||
public void listenKafkaOut(String kafkaOutResponseMessage, Acknowledgment acknowledgment) {
|
public void listenKafkaOut(String kafkaOutResponseMessage, Acknowledgment acknowledgment,
|
||||||
|
@Header("messageId") String messageId) {
|
||||||
DownloadResponse downloadResponse = new Gson().fromJson(kafkaOutResponseMessage,
|
DownloadResponse downloadResponse = new Gson().fromJson(kafkaOutResponseMessage,
|
||||||
DownloadResponse.class
|
DownloadResponse.class
|
||||||
);
|
);
|
||||||
|
|
||||||
String fileId = downloadResponse.fileInfo().fileId();
|
|
||||||
FileStatus fileStatus = downloadResponse.fileInfo().fileStatus();
|
FileStatus fileStatus = downloadResponse.fileInfo().fileStatus();
|
||||||
if (fileStatus.code().equalsIgnoreCase(FileStatus.FILE_STATUS_04.code())) {
|
if (fileStatus.code().equalsIgnoreCase(FileStatus.FILE_STATUS_04.code())) {
|
||||||
fileStatusService.setStatus(fileId, FileStatus.FILE_STATUS_04.status());
|
sendMessage(inStatusTopic.name(), downloadResponse, messageId, inKafkaTemplate);
|
||||||
}
|
}
|
||||||
|
|
||||||
acknowledgment.acknowledge();
|
acknowledgment.acknowledge();
|
||||||
|
|
@ -160,9 +187,9 @@ public class FileUploadService {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private void downloadFile(String fileUrl, String filePath)
|
private void downloadFile(String fileUrl, Path filePath)
|
||||||
throws InvalidHttpFileUrlException, FileUploadException {
|
throws InvalidHttpFileUrlException, FileUploadException {
|
||||||
File file = new File(filePath);
|
File file = filePath.toFile();
|
||||||
HttpGet request = new HttpGet(fileUrl);
|
HttpGet request = new HttpGet(fileUrl);
|
||||||
|
|
||||||
try (CloseableHttpClient client = HttpClients.createDefault();
|
try (CloseableHttpClient client = HttpClients.createDefault();
|
||||||
|
|
@ -194,8 +221,8 @@ public class FileUploadService {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private AvResponse checkFile(String filePath) throws FileUploadException {
|
private AvResponse checkFile(Path filePath) throws FileUploadException {
|
||||||
File file = new File(filePath);
|
File file = filePath.toFile();
|
||||||
|
|
||||||
try (CloseableHttpClient client = HttpClients.createDefault()) {
|
try (CloseableHttpClient client = HttpClients.createDefault()) {
|
||||||
HttpPost post = new HttpPost(avRestAddress);
|
HttpPost post = new HttpPost(avRestAddress);
|
||||||
|
|
@ -274,10 +301,12 @@ public class FileUploadService {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private void sendMessage(@NonNull String topicName, Object object) {
|
private void sendMessage(@NonNull String topicName, Object object, String messageId,
|
||||||
CompletableFuture<SendResult<String, String>> future = kafkaTemplate.send(topicName,
|
KafkaTemplate<String, String> template) {
|
||||||
new GsonBuilder().setPrettyPrinting().create().toJson(object)
|
ProducerRecord<String, String> record = new ProducerRecord<>(topicName,
|
||||||
);
|
new GsonBuilder().setPrettyPrinting().create().toJson(object));
|
||||||
|
record.headers().add("messageId", messageId.getBytes(StandardCharsets.UTF_8));
|
||||||
|
CompletableFuture<SendResult<String, String>> future = template.send(record);
|
||||||
|
|
||||||
future.whenComplete((result, e) -> {
|
future.whenComplete((result, e) -> {
|
||||||
if (e != null) {
|
if (e != null) {
|
||||||
|
|
|
||||||
|
|
@ -13,6 +13,13 @@ spring.kafka.consumer.enable.auto.commit=false
|
||||||
spring.kafka.consumer.group.id=${AV_KAFKA_GROUP_ID:file-to-upload-consumers}
|
spring.kafka.consumer.group.id=${AV_KAFKA_GROUP_ID:file-to-upload-consumers}
|
||||||
# kafka in listeners
|
# kafka in listeners
|
||||||
spring.kafka.listener.ack.mode=MANUAL_IMMEDIATE
|
spring.kafka.listener.ack.mode=MANUAL_IMMEDIATE
|
||||||
|
# kafka in producer (with possibility for default bean)
|
||||||
|
#host1:port1, host2:port2
|
||||||
|
spring.kafka.in.producer.bootstrap.servers=${AV_KAFKA_BOOTSTRAP_SERVERS}
|
||||||
|
spring.kafka.in.producer.security.protocol=${AV_KAFKA_SECURITY_PROTOCOL:SASL_PLAINTEXT}
|
||||||
|
#login password to set
|
||||||
|
spring.kafka.in.producer.properties.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="${AV_KAFKA_USERNAME}" password="${AV_KAFKA_PASSWORD}";
|
||||||
|
spring.kafka.in.producer.properties.sasl.mechanism=${AV_KAFKA_SASL_MECHANISM:SCRAM-SHA-256}
|
||||||
#
|
#
|
||||||
# kafka out producer (with possibility for default bean)
|
# kafka out producer (with possibility for default bean)
|
||||||
#host1:port1, host2:port2
|
#host1:port1, host2:port2
|
||||||
|
|
@ -38,10 +45,12 @@ spring.kafka.out.listener.ack.mode=MANUAL_IMMEDIATE
|
||||||
#
|
#
|
||||||
#
|
#
|
||||||
kafka.in.topic.name=${AV_KAFKA_TOPIC_NAME}
|
kafka.in.topic.name=${AV_KAFKA_TOPIC_NAME}
|
||||||
|
kafka.in.status.topic.name=${AV_KAFKA_STATUS_TOPIC_NAME}
|
||||||
kafka.out.error.topic.name=${ERVU_KAFKA_ERROR_TOPIC_NAME}
|
kafka.out.error.topic.name=${ERVU_KAFKA_ERROR_TOPIC_NAME}
|
||||||
kafka.out.success.topic.name=${ERVU_KAFKA_SUCCESS_TOPIC_NAME}
|
kafka.out.success.topic.name=${ERVU_KAFKA_SUCCESS_TOPIC_NAME}
|
||||||
kafka.out.response.topic.name=${ERVU_KAFKA_RESPONSE_TOPIC_NAME}
|
kafka.out.response.topic.name=${ERVU_KAFKA_RESPONSE_TOPIC_NAME}
|
||||||
#
|
#
|
||||||
|
av.check.enabled=${AV_CHECK_ENABLED:true}
|
||||||
av.rest.address=${AV_REST_ADDRESS}
|
av.rest.address=${AV_REST_ADDRESS}
|
||||||
av.first.timeout.milliseconds=${AV_FIRST_TIMEOUT_MILLISECONDS:1000}
|
av.first.timeout.milliseconds=${AV_FIRST_TIMEOUT_MILLISECONDS:1000}
|
||||||
av.retry.max.attempts.count=${AV_RETRY_MAX_ATTEMPTS_COUNT:10}
|
av.retry.max.attempts.count=${AV_RETRY_MAX_ATTEMPTS_COUNT:10}
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue