SUPPORT-8400: FileUploadService
This commit is contained in:
parent
4ae87e163e
commit
828884954d
14 changed files with 646 additions and 0 deletions
22
.gitignore
vendored
Normal file
22
.gitignore
vendored
Normal file
|
|
@ -0,0 +1,22 @@
|
||||||
|
#ignore target and tmp dir
|
||||||
|
target*/
|
||||||
|
tmp*/
|
||||||
|
debug.log
|
||||||
|
|
||||||
|
#
|
||||||
|
# IntelliJ IDEA project files
|
||||||
|
#
|
||||||
|
.idea*/
|
||||||
|
.classes*/
|
||||||
|
*.ipr
|
||||||
|
*.iml
|
||||||
|
*.iws
|
||||||
|
*.ids
|
||||||
|
|
||||||
|
# os meta files
|
||||||
|
Thumbs.db
|
||||||
|
.DS_Store
|
||||||
|
|
||||||
|
*.orig
|
||||||
|
pom.xml.versionsBackup
|
||||||
|
.flattened-pom.xml
|
||||||
96
file-upload/pom.xml
Normal file
96
file-upload/pom.xml
Normal file
|
|
@ -0,0 +1,96 @@
|
||||||
|
<?xml version="1.0" encoding="UTF-8"?>
|
||||||
|
<project xmlns="http://maven.apache.org/POM/4.0.0"
|
||||||
|
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||||
|
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
|
||||||
|
<modelVersion>4.0.0</modelVersion>
|
||||||
|
|
||||||
|
<parent>
|
||||||
|
<groupId>org.springframework.boot</groupId>
|
||||||
|
<artifactId>spring-boot-starter-parent</artifactId>
|
||||||
|
<version>3.3.0</version>
|
||||||
|
<relativePath/>
|
||||||
|
</parent>
|
||||||
|
|
||||||
|
<groupId>ru.cg.subproject</groupId>
|
||||||
|
<artifactId>file-upload</artifactId>
|
||||||
|
<version>0.0.1-SNAPSHOT</version>
|
||||||
|
|
||||||
|
<properties>
|
||||||
|
<java.version>17</java.version>
|
||||||
|
</properties>
|
||||||
|
|
||||||
|
<dependencyManagement>
|
||||||
|
<dependencies>
|
||||||
|
<dependency>
|
||||||
|
<groupId>com.google.code.gson</groupId>
|
||||||
|
<artifactId>gson</artifactId>
|
||||||
|
<version>2.11.0</version>
|
||||||
|
</dependency>
|
||||||
|
|
||||||
|
<dependency>
|
||||||
|
<groupId>org.apache.httpcomponents</groupId>
|
||||||
|
<artifactId>httpclient</artifactId>
|
||||||
|
<version>4.5.14</version>
|
||||||
|
</dependency>
|
||||||
|
<dependency>
|
||||||
|
<groupId>org.apache.httpcomponents</groupId>
|
||||||
|
<artifactId>httpmime</artifactId>
|
||||||
|
<version>4.5.14</version>
|
||||||
|
</dependency>
|
||||||
|
|
||||||
|
<dependency>
|
||||||
|
<groupId>org.projectlombok</groupId>
|
||||||
|
<artifactId>lombok</artifactId>
|
||||||
|
<version>1.18.34</version>
|
||||||
|
<scope>provided</scope>
|
||||||
|
</dependency>
|
||||||
|
|
||||||
|
<dependency>
|
||||||
|
<groupId>org.springframework.kafka</groupId>
|
||||||
|
<artifactId>spring-kafka</artifactId>
|
||||||
|
<version>3.2.1</version>
|
||||||
|
</dependency>
|
||||||
|
</dependencies>
|
||||||
|
</dependencyManagement>
|
||||||
|
|
||||||
|
<dependencies>
|
||||||
|
<dependency>
|
||||||
|
<groupId>com.google.code.gson</groupId>
|
||||||
|
<artifactId>gson</artifactId>
|
||||||
|
</dependency>
|
||||||
|
|
||||||
|
<dependency>
|
||||||
|
<groupId>org.apache.httpcomponents</groupId>
|
||||||
|
<artifactId>httpclient</artifactId>
|
||||||
|
</dependency>
|
||||||
|
<dependency>
|
||||||
|
<groupId>org.apache.httpcomponents</groupId>
|
||||||
|
<artifactId>httpmime</artifactId>
|
||||||
|
</dependency>
|
||||||
|
|
||||||
|
<dependency>
|
||||||
|
<groupId>org.projectlombok</groupId>
|
||||||
|
<artifactId>lombok</artifactId>
|
||||||
|
</dependency>
|
||||||
|
|
||||||
|
<dependency>
|
||||||
|
<groupId>org.springframework.boot</groupId>
|
||||||
|
<artifactId>spring-boot-starter</artifactId>
|
||||||
|
</dependency>
|
||||||
|
|
||||||
|
<dependency>
|
||||||
|
<groupId>org.springframework.kafka</groupId>
|
||||||
|
<artifactId>spring-kafka</artifactId>
|
||||||
|
</dependency>
|
||||||
|
</dependencies>
|
||||||
|
|
||||||
|
<build>
|
||||||
|
<plugins>
|
||||||
|
<plugin>
|
||||||
|
<groupId>org.springframework.boot</groupId>
|
||||||
|
<artifactId>spring-boot-maven-plugin</artifactId>
|
||||||
|
</plugin>
|
||||||
|
</plugins>
|
||||||
|
</build>
|
||||||
|
|
||||||
|
</project>
|
||||||
|
|
@ -0,0 +1,15 @@
|
||||||
|
package ru.cg.subproject;
|
||||||
|
|
||||||
|
import org.springframework.boot.SpringApplication;
|
||||||
|
import org.springframework.boot.autoconfigure.SpringBootApplication;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @author r.latypov
|
||||||
|
*/
|
||||||
|
@SpringBootApplication
|
||||||
|
public class FileUploadApplication {
|
||||||
|
|
||||||
|
public static void main(String[] args) {
|
||||||
|
SpringApplication.run(FileUploadApplication.class, args);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
@ -0,0 +1,14 @@
|
||||||
|
package ru.cg.subproject.av;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @author r.latypov
|
||||||
|
*/
|
||||||
|
public record AvResponse(String completed, String created, String progress, ScanResult scan_result,
|
||||||
|
String status, String[] verdicts) {
|
||||||
|
public record ScanResult(Scan noname) {
|
||||||
|
public record Scan(String started, String stopped, Threat[] threats, String verdict) {
|
||||||
|
public record Threat(String name, String object) {
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
@ -0,0 +1,14 @@
|
||||||
|
package ru.cg.subproject.exception;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @author r.latypov
|
||||||
|
*/
|
||||||
|
public class FileUploadException extends Exception {
|
||||||
|
public FileUploadException(String message) {
|
||||||
|
super(message);
|
||||||
|
}
|
||||||
|
|
||||||
|
public FileUploadException(Throwable cause) {
|
||||||
|
super(cause);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
@ -0,0 +1,20 @@
|
||||||
|
package ru.cg.subproject.exception;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @author r.latypov
|
||||||
|
*/
|
||||||
|
public class InvalidHttpFileUrlException extends Exception {
|
||||||
|
private static final String MESSAGE = "file url is not valid";
|
||||||
|
|
||||||
|
public InvalidHttpFileUrlException() {
|
||||||
|
super(MESSAGE);
|
||||||
|
}
|
||||||
|
|
||||||
|
public InvalidHttpFileUrlException(String message) {
|
||||||
|
super(String.join(" : ", MESSAGE, message));
|
||||||
|
}
|
||||||
|
|
||||||
|
public InvalidHttpFileUrlException(String message, Throwable cause) {
|
||||||
|
super(message, cause);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
@ -0,0 +1,64 @@
|
||||||
|
package ru.cg.subproject.kafka.config;
|
||||||
|
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
|
|
||||||
|
import org.apache.kafka.clients.CommonClientConfigs;
|
||||||
|
import org.apache.kafka.clients.consumer.ConsumerConfig;
|
||||||
|
import org.apache.kafka.common.config.SaslConfigs;
|
||||||
|
import org.apache.kafka.common.serialization.StringDeserializer;
|
||||||
|
import org.springframework.beans.factory.annotation.Value;
|
||||||
|
import org.springframework.context.annotation.Bean;
|
||||||
|
import org.springframework.context.annotation.Configuration;
|
||||||
|
import org.springframework.kafka.annotation.EnableKafka;
|
||||||
|
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
|
||||||
|
import org.springframework.kafka.core.ConsumerFactory;
|
||||||
|
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
|
||||||
|
import org.springframework.kafka.listener.ContainerProperties;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @author r.latypov
|
||||||
|
*/
|
||||||
|
@Configuration
|
||||||
|
@EnableKafka
|
||||||
|
public class KafkaConsumerConfig {
|
||||||
|
@Value("${spring.kafka.consumer.bootstrap-servers}")
|
||||||
|
private List<String> bootstrapAddress;
|
||||||
|
@Value("${spring.kafka.consumer.security.protocol}")
|
||||||
|
private String securityProtocol;
|
||||||
|
@Value("${spring.kafka.consumer.properties.sasl.jaas.config}")
|
||||||
|
private String jaasConfig;
|
||||||
|
@Value("${spring.kafka.consumer.properties.sasl.mechanism}")
|
||||||
|
private String saslMechanism;
|
||||||
|
@Value("${spring.kafka.consumer.enable-auto-commit}")
|
||||||
|
private String enableAutoCommit;
|
||||||
|
@Value("${spring.kafka.listener.ack-mode}")
|
||||||
|
private String ackMode;
|
||||||
|
|
||||||
|
@Bean
|
||||||
|
public ConsumerFactory<String, String> consumerFactory() {
|
||||||
|
Map<String, Object> configs = new HashMap<>();
|
||||||
|
|
||||||
|
configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
|
||||||
|
configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
|
||||||
|
configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
|
||||||
|
|
||||||
|
configs.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, securityProtocol);
|
||||||
|
configs.put(SaslConfigs.SASL_JAAS_CONFIG, jaasConfig);
|
||||||
|
configs.put(SaslConfigs.SASL_MECHANISM, saslMechanism);
|
||||||
|
|
||||||
|
configs.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);
|
||||||
|
|
||||||
|
return new DefaultKafkaConsumerFactory<>(configs);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Bean
|
||||||
|
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
|
||||||
|
ConcurrentKafkaListenerContainerFactory<String, String> factory =
|
||||||
|
new ConcurrentKafkaListenerContainerFactory<>();
|
||||||
|
factory.setConsumerFactory(consumerFactory());
|
||||||
|
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
|
||||||
|
return factory;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
@ -0,0 +1,51 @@
|
||||||
|
package ru.cg.subproject.kafka.config;
|
||||||
|
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
|
|
||||||
|
import org.apache.kafka.clients.CommonClientConfigs;
|
||||||
|
import org.apache.kafka.clients.producer.ProducerConfig;
|
||||||
|
import org.apache.kafka.common.config.SaslConfigs;
|
||||||
|
import org.apache.kafka.common.serialization.StringSerializer;
|
||||||
|
import org.springframework.beans.factory.annotation.Value;
|
||||||
|
import org.springframework.context.annotation.Bean;
|
||||||
|
import org.springframework.context.annotation.Configuration;
|
||||||
|
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
|
||||||
|
import org.springframework.kafka.core.KafkaTemplate;
|
||||||
|
import org.springframework.kafka.core.ProducerFactory;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @author r.latypov
|
||||||
|
*/
|
||||||
|
@Configuration
|
||||||
|
public class KafkaProducerConfig {
|
||||||
|
@Value("${spring.kafka.producer.bootstrap-servers}")
|
||||||
|
private List<String> bootstrapAddress;
|
||||||
|
@Value("${spring.kafka.producer.security.protocol}")
|
||||||
|
private String securityProtocol;
|
||||||
|
@Value("${spring.kafka.producer.properties.sasl.jaas.config}")
|
||||||
|
private String jaasConfig;
|
||||||
|
@Value("${spring.kafka.producer.properties.sasl.mechanism}")
|
||||||
|
private String saslMechanism;
|
||||||
|
|
||||||
|
@Bean
|
||||||
|
public ProducerFactory<String, String> producerFactory() {
|
||||||
|
Map<String, Object> configs = new HashMap<>();
|
||||||
|
|
||||||
|
configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
|
||||||
|
configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
|
||||||
|
configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
|
||||||
|
|
||||||
|
configs.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, securityProtocol);
|
||||||
|
configs.put(SaslConfigs.SASL_JAAS_CONFIG, jaasConfig);
|
||||||
|
configs.put(SaslConfigs.SASL_MECHANISM, saslMechanism);
|
||||||
|
|
||||||
|
return new DefaultKafkaProducerFactory<>(configs);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Bean
|
||||||
|
public KafkaTemplate<String, String> kafkaTemplate() {
|
||||||
|
return new KafkaTemplate<>(producerFactory());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
@ -0,0 +1,28 @@
|
||||||
|
package ru.cg.subproject.kafka.config;
|
||||||
|
|
||||||
|
import org.apache.kafka.clients.admin.NewTopic;
|
||||||
|
import org.springframework.beans.factory.annotation.Value;
|
||||||
|
import org.springframework.context.annotation.Bean;
|
||||||
|
import org.springframework.context.annotation.Configuration;
|
||||||
|
import org.springframework.kafka.config.TopicBuilder;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @author r.latypov
|
||||||
|
*/
|
||||||
|
@Configuration
|
||||||
|
public class KafkaTopicConfig {
|
||||||
|
@Value("${kafka-out.topic.error.name}")
|
||||||
|
private String kafkaOutTopicErrorName;
|
||||||
|
@Value("${kafka-out.topic.success.name}")
|
||||||
|
private String kafkaOutTopicSuccessName;
|
||||||
|
|
||||||
|
@Bean
|
||||||
|
public NewTopic outErrorTopic() {
|
||||||
|
return TopicBuilder.name(kafkaOutTopicErrorName).build();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Bean
|
||||||
|
public NewTopic outSuccessTopic() {
|
||||||
|
return TopicBuilder.name(kafkaOutTopicSuccessName).build();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
@ -0,0 +1,36 @@
|
||||||
|
package ru.cg.subproject.kafka.dto;
|
||||||
|
|
||||||
|
import lombok.AllArgsConstructor;
|
||||||
|
import lombok.Getter;
|
||||||
|
import lombok.Setter;
|
||||||
|
import org.springframework.lang.NonNull;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @author r.latypov
|
||||||
|
*/
|
||||||
|
public record InMessage(OrgInfo orgInfo, SenderInfo senderInfo, @NonNull FileInfo fileInfo) {
|
||||||
|
public record OrgInfo(String orgName, String orgTypeCode, String orgTypeName, String ogrn,
|
||||||
|
String in, String kpp) {
|
||||||
|
}
|
||||||
|
|
||||||
|
public record SenderInfo(String senderLastName, String senderFirstName,
|
||||||
|
String senderMiddleName, String birthDate, String senderRoleCode,
|
||||||
|
String senderRoleName, String snils, String idERN,
|
||||||
|
Document document) {
|
||||||
|
|
||||||
|
public record Document(String series, String number, String issueDate) {
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Getter
|
||||||
|
@AllArgsConstructor
|
||||||
|
public static class FileInfo {
|
||||||
|
@NonNull
|
||||||
|
@Setter
|
||||||
|
private String fileNameBase;
|
||||||
|
private final String fileName;
|
||||||
|
private final String filePatternCode;
|
||||||
|
private final String FilePatternName;
|
||||||
|
private final String departureDateTime;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
@ -0,0 +1,12 @@
|
||||||
|
package ru.cg.subproject.kafka.dto;
|
||||||
|
|
||||||
|
import org.springframework.lang.NonNull;
|
||||||
|
|
||||||
|
import ru.cg.subproject.av.AvResponse;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @author r.latypov
|
||||||
|
*/
|
||||||
|
public record OutErrorMessage(String errorMessage, AvResponse avResponse,
|
||||||
|
@NonNull InMessage inMessage) {
|
||||||
|
}
|
||||||
|
|
@ -0,0 +1,235 @@
|
||||||
|
package ru.cg.subproject.service;
|
||||||
|
|
||||||
|
import java.io.File;
|
||||||
|
import java.io.FileOutputStream;
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.Arrays;
|
||||||
|
import java.util.concurrent.CompletableFuture;
|
||||||
|
|
||||||
|
import com.google.gson.Gson;
|
||||||
|
import com.google.gson.GsonBuilder;
|
||||||
|
import org.apache.http.HttpEntity;
|
||||||
|
import org.apache.http.client.methods.CloseableHttpResponse;
|
||||||
|
import org.apache.http.client.methods.HttpDelete;
|
||||||
|
import org.apache.http.client.methods.HttpGet;
|
||||||
|
import org.apache.http.client.methods.HttpPost;
|
||||||
|
import org.apache.http.client.methods.HttpPut;
|
||||||
|
import org.apache.http.conn.HttpHostConnectException;
|
||||||
|
import org.apache.http.entity.mime.MultipartEntityBuilder;
|
||||||
|
import org.apache.http.entity.mime.content.FileBody;
|
||||||
|
import org.apache.http.impl.client.CloseableHttpClient;
|
||||||
|
import org.apache.http.impl.client.HttpClients;
|
||||||
|
import org.apache.kafka.clients.admin.NewTopic;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
|
import org.springframework.beans.factory.annotation.Value;
|
||||||
|
import org.springframework.kafka.annotation.KafkaListener;
|
||||||
|
import org.springframework.kafka.core.KafkaTemplate;
|
||||||
|
import org.springframework.kafka.support.Acknowledgment;
|
||||||
|
import org.springframework.kafka.support.SendResult;
|
||||||
|
import org.springframework.lang.NonNull;
|
||||||
|
import org.springframework.stereotype.Service;
|
||||||
|
|
||||||
|
import ru.cg.subproject.av.AvResponse;
|
||||||
|
import ru.cg.subproject.exception.FileUploadException;
|
||||||
|
import ru.cg.subproject.exception.InvalidHttpFileUrlException;
|
||||||
|
import ru.cg.subproject.kafka.dto.InMessage;
|
||||||
|
import ru.cg.subproject.kafka.dto.OutErrorMessage;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @author r.latypov
|
||||||
|
*/
|
||||||
|
@Service
|
||||||
|
public class FileUploadService {
|
||||||
|
private static final Logger logger = LoggerFactory.getLogger(FileUploadService.class);
|
||||||
|
@Value("${av.rest.address}")
|
||||||
|
private String avRestAddress;
|
||||||
|
@Value("${file.before.av.path:}")
|
||||||
|
private String fileBeforeAvPath;
|
||||||
|
@Value("${http.file.server.out.address:http://localhost/out}")
|
||||||
|
private String httpFileServerOutAddress;
|
||||||
|
private final KafkaTemplate<String, String> kafkaTemplate;
|
||||||
|
private final NewTopic outErrorTopic;
|
||||||
|
private final NewTopic outSuccessTopic;
|
||||||
|
|
||||||
|
@Autowired
|
||||||
|
public FileUploadService(KafkaTemplate<String, String> kafkaTemplate, NewTopic outErrorTopic,
|
||||||
|
NewTopic outSuccessTopic) {
|
||||||
|
this.kafkaTemplate = kafkaTemplate;
|
||||||
|
this.outErrorTopic = outErrorTopic;
|
||||||
|
this.outSuccessTopic = outSuccessTopic;
|
||||||
|
}
|
||||||
|
|
||||||
|
@KafkaListener(id = "${spring.kafka.consumer.group-id}", topics = "${kafka-in.topic.name}")
|
||||||
|
public void listenKafkaIn(String kafkaInMessage, Acknowledgment acknowledgment) {
|
||||||
|
InMessage inMessage = new Gson().fromJson(kafkaInMessage, InMessage.class);
|
||||||
|
|
||||||
|
try {
|
||||||
|
FileUrl fileUrl = parseFileUrl(inMessage.fileInfo().getFileNameBase());
|
||||||
|
String filePath = fileBeforeAvPath + fileUrl.fileName();
|
||||||
|
String downloadUrl = fileUrl.fileUrl();
|
||||||
|
downloadFileByHttp(downloadUrl, filePath);
|
||||||
|
|
||||||
|
AvResponse avResponse = sendFileToAvScan(filePath);
|
||||||
|
|
||||||
|
boolean infected = Arrays.stream(avResponse.verdicts())
|
||||||
|
.anyMatch(verdict -> verdict.equalsIgnoreCase("infected"));
|
||||||
|
|
||||||
|
if (infected) {
|
||||||
|
sendKafkaMessage(outErrorTopic.name(),
|
||||||
|
new OutErrorMessage("file is infected", avResponse, inMessage)
|
||||||
|
);
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
String uploadUrl = httpFileServerOutAddress + "/" + fileUrl.fileName();
|
||||||
|
uploadFileByHttp(filePath, uploadUrl);
|
||||||
|
|
||||||
|
inMessage.fileInfo().setFileNameBase(uploadUrl);
|
||||||
|
sendKafkaMessage(outSuccessTopic.name(), inMessage);
|
||||||
|
}
|
||||||
|
|
||||||
|
deleteFileByHttp(downloadUrl);
|
||||||
|
if (new File(filePath).delete()) {
|
||||||
|
acknowledgment.acknowledge();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
catch (InvalidHttpFileUrlException e) {
|
||||||
|
logger.error(e.getMessage() + ": " + kafkaInMessage);
|
||||||
|
|
||||||
|
sendKafkaMessage(outErrorTopic.name(),
|
||||||
|
new OutErrorMessage(e.getMessage(), null, inMessage)
|
||||||
|
);
|
||||||
|
acknowledgment.acknowledge();
|
||||||
|
}
|
||||||
|
catch (FileUploadException e) {
|
||||||
|
logger.error(e.getMessage(), e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private static FileUrl parseFileUrl(@NonNull String fileUrl) throws InvalidHttpFileUrlException {
|
||||||
|
String[] substrings = fileUrl.split("/");
|
||||||
|
String fileName = substrings[substrings.length - 1];
|
||||||
|
if (substrings.length == 1 || fileName.isBlank()) {
|
||||||
|
throw new InvalidHttpFileUrlException(fileUrl);
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
return new FileUrl(fileName, fileUrl);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void downloadFileByHttp(String fileUrl, String filePath)
|
||||||
|
throws InvalidHttpFileUrlException, FileUploadException {
|
||||||
|
File file = new File(filePath);
|
||||||
|
HttpGet request = new HttpGet(fileUrl);
|
||||||
|
|
||||||
|
try (CloseableHttpClient client = HttpClients.createDefault();
|
||||||
|
CloseableHttpResponse response = client.execute(request)) {
|
||||||
|
|
||||||
|
int statusCode = response.getStatusLine().getStatusCode();
|
||||||
|
if (statusCode == 200) {
|
||||||
|
HttpEntity entity = response.getEntity();
|
||||||
|
if (entity != null) {
|
||||||
|
try (FileOutputStream outputStream = new FileOutputStream(file)) {
|
||||||
|
entity.writeTo(outputStream);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
String message = "http status code " + statusCode + " : " + fileUrl;
|
||||||
|
throw new InvalidHttpFileUrlException(message);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
catch (HttpHostConnectException e) {
|
||||||
|
throw new FileUploadException(e);
|
||||||
|
}
|
||||||
|
catch (IOException e) {
|
||||||
|
String message =
|
||||||
|
(e.getMessage() == null ? e.getCause().getMessage() : e.getMessage()) + " : " + fileUrl;
|
||||||
|
throw new InvalidHttpFileUrlException(message, e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private AvResponse sendFileToAvScan(String filePath) throws FileUploadException {
|
||||||
|
File file = new File(filePath);
|
||||||
|
|
||||||
|
try (CloseableHttpClient client = HttpClients.createDefault()) {
|
||||||
|
HttpPost post = new HttpPost(avRestAddress);
|
||||||
|
post.addHeader("Content-type", "application/json");
|
||||||
|
HttpEntity entity = MultipartEntityBuilder.create()
|
||||||
|
.addPart("file", new FileBody(file))
|
||||||
|
.build();
|
||||||
|
post.setEntity(entity);
|
||||||
|
|
||||||
|
try (CloseableHttpResponse response = client.execute(post)) {
|
||||||
|
int statusCode = response.getStatusLine().getStatusCode();
|
||||||
|
if (statusCode != 200) {
|
||||||
|
String message = "http status code " + statusCode + " for " + avRestAddress + " request";
|
||||||
|
throw new FileUploadException(message);
|
||||||
|
}
|
||||||
|
return new Gson().fromJson(response.getEntity().toString(), AvResponse.class);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
catch (IOException e) {
|
||||||
|
throw new FileUploadException(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void uploadFileByHttp(String filePath, String uploadUrl) throws FileUploadException {
|
||||||
|
File file = new File(filePath);
|
||||||
|
|
||||||
|
try (CloseableHttpClient client = HttpClients.createDefault()) {
|
||||||
|
HttpPut put = new HttpPut(uploadUrl);
|
||||||
|
HttpEntity entity = MultipartEntityBuilder.create()
|
||||||
|
.addPart("file", new FileBody(file))
|
||||||
|
.build();
|
||||||
|
put.setEntity(entity);
|
||||||
|
|
||||||
|
try (CloseableHttpResponse response = client.execute(put)) {
|
||||||
|
int statusCode = response.getStatusLine().getStatusCode();
|
||||||
|
if (statusCode == 204) {
|
||||||
|
logger.warn("file already exists : " + uploadUrl);
|
||||||
|
}
|
||||||
|
else if (statusCode != 201) {
|
||||||
|
String message = "http status code " + statusCode + " : " + uploadUrl;
|
||||||
|
throw new RuntimeException(message);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
catch (IOException e) {
|
||||||
|
throw new FileUploadException(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void deleteFileByHttp(String fileUrl) throws FileUploadException {
|
||||||
|
try (CloseableHttpClient client = HttpClients.createDefault()) {
|
||||||
|
HttpDelete delete = new HttpDelete(fileUrl);
|
||||||
|
|
||||||
|
try (CloseableHttpResponse response = client.execute(delete)) {
|
||||||
|
int statusCode = response.getStatusLine().getStatusCode();
|
||||||
|
if (statusCode != 204) {
|
||||||
|
String message = "http status code " + statusCode + " : " + fileUrl;
|
||||||
|
throw new RuntimeException(message);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
catch (IOException e) {
|
||||||
|
throw new FileUploadException(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void sendKafkaMessage(@NonNull String topicName, Object object) {
|
||||||
|
CompletableFuture<SendResult<String, String>> future = kafkaTemplate.send(topicName,
|
||||||
|
new GsonBuilder().setPrettyPrinting().create().toJson(object)
|
||||||
|
);
|
||||||
|
|
||||||
|
future.whenComplete((result, e) -> {
|
||||||
|
if (e != null) {
|
||||||
|
String message =
|
||||||
|
"Unable to send message [" + result.getProducerRecord().value() + "] into kafka topic '"
|
||||||
|
+ topicName + "' due to : " + e.getMessage();
|
||||||
|
throw new RuntimeException(message, e);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
@ -0,0 +1,7 @@
|
||||||
|
package ru.cg.subproject.service;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @author r.latypov
|
||||||
|
*/
|
||||||
|
public record FileUrl(String fileName, String fileUrl) {
|
||||||
|
}
|
||||||
32
file-upload/src/main/resources/application.properties
Normal file
32
file-upload/src/main/resources/application.properties
Normal file
|
|
@ -0,0 +1,32 @@
|
||||||
|
spring.kafka.admin.security.protocol=SASL_PLAINTEXT
|
||||||
|
spring.kafka.admin.properties.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="user1" password="Blfi9d2OFG";
|
||||||
|
spring.kafka.admin.properties.sasl.mechanism=SCRAM-SHA-256
|
||||||
|
#
|
||||||
|
spring.kafka.bootstrap-servers=10.10.31.11:32609
|
||||||
|
#
|
||||||
|
spring.kafka.consumer.bootstrap-servers=10.10.31.11:32609
|
||||||
|
spring.kafka.consumer.security.protocol=SASL_PLAINTEXT
|
||||||
|
spring.kafka.consumer.properties.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="user1" password="Blfi9d2OFG";
|
||||||
|
spring.kafka.consumer.properties.sasl.mechanism=SCRAM-SHA-256
|
||||||
|
#
|
||||||
|
spring.kafka.consumer.enable-auto-commit=false
|
||||||
|
spring.kafka.consumer.group-id=file-to-upload-consumers
|
||||||
|
#
|
||||||
|
spring.kafka.listener.ack-mode=MANUAL_IMMEDIATE
|
||||||
|
#
|
||||||
|
spring.kafka.producer.bootstrap-servers=10.10.31.11:32609
|
||||||
|
spring.kafka.producer.security.protocol=SASL_PLAINTEXT
|
||||||
|
spring.kafka.producer.properties.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="user1" password="Blfi9d2OFG";
|
||||||
|
spring.kafka.producer.properties.sasl.mechanism=SCRAM-SHA-256
|
||||||
|
#
|
||||||
|
spring.kafka.properties.security.protocol=SASL_PLAINTEXT
|
||||||
|
spring.kafka.properties.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="user1" password="Blfi9d2OFG";
|
||||||
|
spring.kafka.properties.sasl.mechanism=SCRAM-SHA-256
|
||||||
|
#
|
||||||
|
kafka-in.topic.name=file-to-upload
|
||||||
|
kafka-out.topic.error.name=error
|
||||||
|
kafka-out.topic.success.name=success
|
||||||
|
#
|
||||||
|
av.rest.address=http://<server>:<port>/scans
|
||||||
|
file.before.av.path=/nginx/transfer/
|
||||||
|
http.file.server.out.address=http://localhost/out
|
||||||
Loading…
Add table
Add a link
Reference in a new issue