From 828884954d9d56956a08696abc6c37d23d35cd39 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=A0=D0=B0=D1=83=D1=84=20=D0=9B=D0=B0=D1=82=D1=8B=D0=BF?= =?UTF-8?q?=D0=BE=D0=B2?= Date: Wed, 24 Jul 2024 09:59:34 +0300 Subject: [PATCH] SUPPORT-8400: FileUploadService --- .gitignore | 22 ++ file-upload/pom.xml | 96 +++++++ .../cg/subproject/FileUploadApplication.java | 15 ++ .../java/ru/cg/subproject/av/AvResponse.java | 14 ++ .../exception/FileUploadException.java | 14 ++ .../InvalidHttpFileUrlException.java | 20 ++ .../kafka/config/KafkaConsumerConfig.java | 64 +++++ .../kafka/config/KafkaProducerConfig.java | 51 ++++ .../kafka/config/KafkaTopicConfig.java | 28 +++ .../ru/cg/subproject/kafka/dto/InMessage.java | 36 +++ .../subproject/kafka/dto/OutErrorMessage.java | 12 + .../subproject/service/FileUploadService.java | 235 ++++++++++++++++++ .../ru/cg/subproject/service/FileUrl.java | 7 + .../src/main/resources/application.properties | 32 +++ 14 files changed, 646 insertions(+) create mode 100644 .gitignore create mode 100644 file-upload/pom.xml create mode 100644 file-upload/src/main/java/ru/cg/subproject/FileUploadApplication.java create mode 100644 file-upload/src/main/java/ru/cg/subproject/av/AvResponse.java create mode 100644 file-upload/src/main/java/ru/cg/subproject/exception/FileUploadException.java create mode 100644 file-upload/src/main/java/ru/cg/subproject/exception/InvalidHttpFileUrlException.java create mode 100644 file-upload/src/main/java/ru/cg/subproject/kafka/config/KafkaConsumerConfig.java create mode 100644 file-upload/src/main/java/ru/cg/subproject/kafka/config/KafkaProducerConfig.java create mode 100644 file-upload/src/main/java/ru/cg/subproject/kafka/config/KafkaTopicConfig.java create mode 100644 file-upload/src/main/java/ru/cg/subproject/kafka/dto/InMessage.java create mode 100644 file-upload/src/main/java/ru/cg/subproject/kafka/dto/OutErrorMessage.java create mode 100644 file-upload/src/main/java/ru/cg/subproject/service/FileUploadService.java create mode 100644 file-upload/src/main/java/ru/cg/subproject/service/FileUrl.java create mode 100644 file-upload/src/main/resources/application.properties diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..d99b869 --- /dev/null +++ b/.gitignore @@ -0,0 +1,22 @@ +#ignore target and tmp dir +target*/ +tmp*/ +debug.log + +# +# IntelliJ IDEA project files +# +.idea*/ +.classes*/ +*.ipr +*.iml +*.iws +*.ids + +# os meta files +Thumbs.db +.DS_Store + +*.orig +pom.xml.versionsBackup +.flattened-pom.xml diff --git a/file-upload/pom.xml b/file-upload/pom.xml new file mode 100644 index 0000000..816b840 --- /dev/null +++ b/file-upload/pom.xml @@ -0,0 +1,96 @@ + + + 4.0.0 + + + org.springframework.boot + spring-boot-starter-parent + 3.3.0 + + + + ru.cg.subproject + file-upload + 0.0.1-SNAPSHOT + + + 17 + + + + + + com.google.code.gson + gson + 2.11.0 + + + + org.apache.httpcomponents + httpclient + 4.5.14 + + + org.apache.httpcomponents + httpmime + 4.5.14 + + + + org.projectlombok + lombok + 1.18.34 + provided + + + + org.springframework.kafka + spring-kafka + 3.2.1 + + + + + + + com.google.code.gson + gson + + + + org.apache.httpcomponents + httpclient + + + org.apache.httpcomponents + httpmime + + + + org.projectlombok + lombok + + + + org.springframework.boot + spring-boot-starter + + + + org.springframework.kafka + spring-kafka + + + + + + + org.springframework.boot + spring-boot-maven-plugin + + + + + diff --git a/file-upload/src/main/java/ru/cg/subproject/FileUploadApplication.java b/file-upload/src/main/java/ru/cg/subproject/FileUploadApplication.java new file mode 100644 index 0000000..f9f4b06 --- /dev/null +++ b/file-upload/src/main/java/ru/cg/subproject/FileUploadApplication.java @@ -0,0 +1,15 @@ +package ru.cg.subproject; + +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; + +/** + * @author r.latypov + */ +@SpringBootApplication +public class FileUploadApplication { + + public static void main(String[] args) { + SpringApplication.run(FileUploadApplication.class, args); + } +} diff --git a/file-upload/src/main/java/ru/cg/subproject/av/AvResponse.java b/file-upload/src/main/java/ru/cg/subproject/av/AvResponse.java new file mode 100644 index 0000000..92a894f --- /dev/null +++ b/file-upload/src/main/java/ru/cg/subproject/av/AvResponse.java @@ -0,0 +1,14 @@ +package ru.cg.subproject.av; + +/** + * @author r.latypov + */ +public record AvResponse(String completed, String created, String progress, ScanResult scan_result, + String status, String[] verdicts) { + public record ScanResult(Scan noname) { + public record Scan(String started, String stopped, Threat[] threats, String verdict) { + public record Threat(String name, String object) { + } + } + } +} diff --git a/file-upload/src/main/java/ru/cg/subproject/exception/FileUploadException.java b/file-upload/src/main/java/ru/cg/subproject/exception/FileUploadException.java new file mode 100644 index 0000000..b11dea2 --- /dev/null +++ b/file-upload/src/main/java/ru/cg/subproject/exception/FileUploadException.java @@ -0,0 +1,14 @@ +package ru.cg.subproject.exception; + +/** + * @author r.latypov + */ +public class FileUploadException extends Exception { + public FileUploadException(String message) { + super(message); + } + + public FileUploadException(Throwable cause) { + super(cause); + } +} diff --git a/file-upload/src/main/java/ru/cg/subproject/exception/InvalidHttpFileUrlException.java b/file-upload/src/main/java/ru/cg/subproject/exception/InvalidHttpFileUrlException.java new file mode 100644 index 0000000..9ff1f80 --- /dev/null +++ b/file-upload/src/main/java/ru/cg/subproject/exception/InvalidHttpFileUrlException.java @@ -0,0 +1,20 @@ +package ru.cg.subproject.exception; + +/** + * @author r.latypov + */ +public class InvalidHttpFileUrlException extends Exception { + private static final String MESSAGE = "file url is not valid"; + + public InvalidHttpFileUrlException() { + super(MESSAGE); + } + + public InvalidHttpFileUrlException(String message) { + super(String.join(" : ", MESSAGE, message)); + } + + public InvalidHttpFileUrlException(String message, Throwable cause) { + super(message, cause); + } +} diff --git a/file-upload/src/main/java/ru/cg/subproject/kafka/config/KafkaConsumerConfig.java b/file-upload/src/main/java/ru/cg/subproject/kafka/config/KafkaConsumerConfig.java new file mode 100644 index 0000000..f428816 --- /dev/null +++ b/file-upload/src/main/java/ru/cg/subproject/kafka/config/KafkaConsumerConfig.java @@ -0,0 +1,64 @@ +package ru.cg.subproject.kafka.config; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.common.config.SaslConfigs; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.kafka.annotation.EnableKafka; +import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; +import org.springframework.kafka.core.ConsumerFactory; +import org.springframework.kafka.core.DefaultKafkaConsumerFactory; +import org.springframework.kafka.listener.ContainerProperties; + +/** + * @author r.latypov + */ +@Configuration +@EnableKafka +public class KafkaConsumerConfig { + @Value("${spring.kafka.consumer.bootstrap-servers}") + private List bootstrapAddress; + @Value("${spring.kafka.consumer.security.protocol}") + private String securityProtocol; + @Value("${spring.kafka.consumer.properties.sasl.jaas.config}") + private String jaasConfig; + @Value("${spring.kafka.consumer.properties.sasl.mechanism}") + private String saslMechanism; + @Value("${spring.kafka.consumer.enable-auto-commit}") + private String enableAutoCommit; + @Value("${spring.kafka.listener.ack-mode}") + private String ackMode; + + @Bean + public ConsumerFactory consumerFactory() { + Map configs = new HashMap<>(); + + configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress); + configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); + configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); + + configs.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, securityProtocol); + configs.put(SaslConfigs.SASL_JAAS_CONFIG, jaasConfig); + configs.put(SaslConfigs.SASL_MECHANISM, saslMechanism); + + configs.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit); + + return new DefaultKafkaConsumerFactory<>(configs); + } + + @Bean + public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory() { + ConcurrentKafkaListenerContainerFactory factory = + new ConcurrentKafkaListenerContainerFactory<>(); + factory.setConsumerFactory(consumerFactory()); + factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE); + return factory; + } +} diff --git a/file-upload/src/main/java/ru/cg/subproject/kafka/config/KafkaProducerConfig.java b/file-upload/src/main/java/ru/cg/subproject/kafka/config/KafkaProducerConfig.java new file mode 100644 index 0000000..389ec9b --- /dev/null +++ b/file-upload/src/main/java/ru/cg/subproject/kafka/config/KafkaProducerConfig.java @@ -0,0 +1,51 @@ +package ru.cg.subproject.kafka.config; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.common.config.SaslConfigs; +import org.apache.kafka.common.serialization.StringSerializer; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.kafka.core.DefaultKafkaProducerFactory; +import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.kafka.core.ProducerFactory; + +/** + * @author r.latypov + */ +@Configuration +public class KafkaProducerConfig { + @Value("${spring.kafka.producer.bootstrap-servers}") + private List bootstrapAddress; + @Value("${spring.kafka.producer.security.protocol}") + private String securityProtocol; + @Value("${spring.kafka.producer.properties.sasl.jaas.config}") + private String jaasConfig; + @Value("${spring.kafka.producer.properties.sasl.mechanism}") + private String saslMechanism; + + @Bean + public ProducerFactory producerFactory() { + Map configs = new HashMap<>(); + + configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress); + configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); + configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); + + configs.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, securityProtocol); + configs.put(SaslConfigs.SASL_JAAS_CONFIG, jaasConfig); + configs.put(SaslConfigs.SASL_MECHANISM, saslMechanism); + + return new DefaultKafkaProducerFactory<>(configs); + } + + @Bean + public KafkaTemplate kafkaTemplate() { + return new KafkaTemplate<>(producerFactory()); + } +} diff --git a/file-upload/src/main/java/ru/cg/subproject/kafka/config/KafkaTopicConfig.java b/file-upload/src/main/java/ru/cg/subproject/kafka/config/KafkaTopicConfig.java new file mode 100644 index 0000000..a44b82f --- /dev/null +++ b/file-upload/src/main/java/ru/cg/subproject/kafka/config/KafkaTopicConfig.java @@ -0,0 +1,28 @@ +package ru.cg.subproject.kafka.config; + +import org.apache.kafka.clients.admin.NewTopic; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.kafka.config.TopicBuilder; + +/** + * @author r.latypov + */ +@Configuration +public class KafkaTopicConfig { + @Value("${kafka-out.topic.error.name}") + private String kafkaOutTopicErrorName; + @Value("${kafka-out.topic.success.name}") + private String kafkaOutTopicSuccessName; + + @Bean + public NewTopic outErrorTopic() { + return TopicBuilder.name(kafkaOutTopicErrorName).build(); + } + + @Bean + public NewTopic outSuccessTopic() { + return TopicBuilder.name(kafkaOutTopicSuccessName).build(); + } +} diff --git a/file-upload/src/main/java/ru/cg/subproject/kafka/dto/InMessage.java b/file-upload/src/main/java/ru/cg/subproject/kafka/dto/InMessage.java new file mode 100644 index 0000000..1b388e1 --- /dev/null +++ b/file-upload/src/main/java/ru/cg/subproject/kafka/dto/InMessage.java @@ -0,0 +1,36 @@ +package ru.cg.subproject.kafka.dto; + +import lombok.AllArgsConstructor; +import lombok.Getter; +import lombok.Setter; +import org.springframework.lang.NonNull; + +/** + * @author r.latypov + */ +public record InMessage(OrgInfo orgInfo, SenderInfo senderInfo, @NonNull FileInfo fileInfo) { + public record OrgInfo(String orgName, String orgTypeCode, String orgTypeName, String ogrn, + String in, String kpp) { + } + + public record SenderInfo(String senderLastName, String senderFirstName, + String senderMiddleName, String birthDate, String senderRoleCode, + String senderRoleName, String snils, String idERN, + Document document) { + + public record Document(String series, String number, String issueDate) { + } + } + + @Getter + @AllArgsConstructor + public static class FileInfo { + @NonNull + @Setter + private String fileNameBase; + private final String fileName; + private final String filePatternCode; + private final String FilePatternName; + private final String departureDateTime; + } +} diff --git a/file-upload/src/main/java/ru/cg/subproject/kafka/dto/OutErrorMessage.java b/file-upload/src/main/java/ru/cg/subproject/kafka/dto/OutErrorMessage.java new file mode 100644 index 0000000..0cba965 --- /dev/null +++ b/file-upload/src/main/java/ru/cg/subproject/kafka/dto/OutErrorMessage.java @@ -0,0 +1,12 @@ +package ru.cg.subproject.kafka.dto; + +import org.springframework.lang.NonNull; + +import ru.cg.subproject.av.AvResponse; + +/** + * @author r.latypov + */ +public record OutErrorMessage(String errorMessage, AvResponse avResponse, + @NonNull InMessage inMessage) { +} diff --git a/file-upload/src/main/java/ru/cg/subproject/service/FileUploadService.java b/file-upload/src/main/java/ru/cg/subproject/service/FileUploadService.java new file mode 100644 index 0000000..78965f4 --- /dev/null +++ b/file-upload/src/main/java/ru/cg/subproject/service/FileUploadService.java @@ -0,0 +1,235 @@ +package ru.cg.subproject.service; + +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.util.Arrays; +import java.util.concurrent.CompletableFuture; + +import com.google.gson.Gson; +import com.google.gson.GsonBuilder; +import org.apache.http.HttpEntity; +import org.apache.http.client.methods.CloseableHttpResponse; +import org.apache.http.client.methods.HttpDelete; +import org.apache.http.client.methods.HttpGet; +import org.apache.http.client.methods.HttpPost; +import org.apache.http.client.methods.HttpPut; +import org.apache.http.conn.HttpHostConnectException; +import org.apache.http.entity.mime.MultipartEntityBuilder; +import org.apache.http.entity.mime.content.FileBody; +import org.apache.http.impl.client.CloseableHttpClient; +import org.apache.http.impl.client.HttpClients; +import org.apache.kafka.clients.admin.NewTopic; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.kafka.annotation.KafkaListener; +import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.kafka.support.Acknowledgment; +import org.springframework.kafka.support.SendResult; +import org.springframework.lang.NonNull; +import org.springframework.stereotype.Service; + +import ru.cg.subproject.av.AvResponse; +import ru.cg.subproject.exception.FileUploadException; +import ru.cg.subproject.exception.InvalidHttpFileUrlException; +import ru.cg.subproject.kafka.dto.InMessage; +import ru.cg.subproject.kafka.dto.OutErrorMessage; + +/** + * @author r.latypov + */ +@Service +public class FileUploadService { + private static final Logger logger = LoggerFactory.getLogger(FileUploadService.class); + @Value("${av.rest.address}") + private String avRestAddress; + @Value("${file.before.av.path:}") + private String fileBeforeAvPath; + @Value("${http.file.server.out.address:http://localhost/out}") + private String httpFileServerOutAddress; + private final KafkaTemplate kafkaTemplate; + private final NewTopic outErrorTopic; + private final NewTopic outSuccessTopic; + + @Autowired + public FileUploadService(KafkaTemplate kafkaTemplate, NewTopic outErrorTopic, + NewTopic outSuccessTopic) { + this.kafkaTemplate = kafkaTemplate; + this.outErrorTopic = outErrorTopic; + this.outSuccessTopic = outSuccessTopic; + } + + @KafkaListener(id = "${spring.kafka.consumer.group-id}", topics = "${kafka-in.topic.name}") + public void listenKafkaIn(String kafkaInMessage, Acknowledgment acknowledgment) { + InMessage inMessage = new Gson().fromJson(kafkaInMessage, InMessage.class); + + try { + FileUrl fileUrl = parseFileUrl(inMessage.fileInfo().getFileNameBase()); + String filePath = fileBeforeAvPath + fileUrl.fileName(); + String downloadUrl = fileUrl.fileUrl(); + downloadFileByHttp(downloadUrl, filePath); + + AvResponse avResponse = sendFileToAvScan(filePath); + + boolean infected = Arrays.stream(avResponse.verdicts()) + .anyMatch(verdict -> verdict.equalsIgnoreCase("infected")); + + if (infected) { + sendKafkaMessage(outErrorTopic.name(), + new OutErrorMessage("file is infected", avResponse, inMessage) + ); + } + else { + String uploadUrl = httpFileServerOutAddress + "/" + fileUrl.fileName(); + uploadFileByHttp(filePath, uploadUrl); + + inMessage.fileInfo().setFileNameBase(uploadUrl); + sendKafkaMessage(outSuccessTopic.name(), inMessage); + } + + deleteFileByHttp(downloadUrl); + if (new File(filePath).delete()) { + acknowledgment.acknowledge(); + } + } + catch (InvalidHttpFileUrlException e) { + logger.error(e.getMessage() + ": " + kafkaInMessage); + + sendKafkaMessage(outErrorTopic.name(), + new OutErrorMessage(e.getMessage(), null, inMessage) + ); + acknowledgment.acknowledge(); + } + catch (FileUploadException e) { + logger.error(e.getMessage(), e); + } + } + + private static FileUrl parseFileUrl(@NonNull String fileUrl) throws InvalidHttpFileUrlException { + String[] substrings = fileUrl.split("/"); + String fileName = substrings[substrings.length - 1]; + if (substrings.length == 1 || fileName.isBlank()) { + throw new InvalidHttpFileUrlException(fileUrl); + } + else { + return new FileUrl(fileName, fileUrl); + } + } + + private void downloadFileByHttp(String fileUrl, String filePath) + throws InvalidHttpFileUrlException, FileUploadException { + File file = new File(filePath); + HttpGet request = new HttpGet(fileUrl); + + try (CloseableHttpClient client = HttpClients.createDefault(); + CloseableHttpResponse response = client.execute(request)) { + + int statusCode = response.getStatusLine().getStatusCode(); + if (statusCode == 200) { + HttpEntity entity = response.getEntity(); + if (entity != null) { + try (FileOutputStream outputStream = new FileOutputStream(file)) { + entity.writeTo(outputStream); + } + } + } + else { + String message = "http status code " + statusCode + " : " + fileUrl; + throw new InvalidHttpFileUrlException(message); + } + } + catch (HttpHostConnectException e) { + throw new FileUploadException(e); + } + catch (IOException e) { + String message = + (e.getMessage() == null ? e.getCause().getMessage() : e.getMessage()) + " : " + fileUrl; + throw new InvalidHttpFileUrlException(message, e); + } + } + + private AvResponse sendFileToAvScan(String filePath) throws FileUploadException { + File file = new File(filePath); + + try (CloseableHttpClient client = HttpClients.createDefault()) { + HttpPost post = new HttpPost(avRestAddress); + post.addHeader("Content-type", "application/json"); + HttpEntity entity = MultipartEntityBuilder.create() + .addPart("file", new FileBody(file)) + .build(); + post.setEntity(entity); + + try (CloseableHttpResponse response = client.execute(post)) { + int statusCode = response.getStatusLine().getStatusCode(); + if (statusCode != 200) { + String message = "http status code " + statusCode + " for " + avRestAddress + " request"; + throw new FileUploadException(message); + } + return new Gson().fromJson(response.getEntity().toString(), AvResponse.class); + } + } + catch (IOException e) { + throw new FileUploadException(e); + } + } + + private void uploadFileByHttp(String filePath, String uploadUrl) throws FileUploadException { + File file = new File(filePath); + + try (CloseableHttpClient client = HttpClients.createDefault()) { + HttpPut put = new HttpPut(uploadUrl); + HttpEntity entity = MultipartEntityBuilder.create() + .addPart("file", new FileBody(file)) + .build(); + put.setEntity(entity); + + try (CloseableHttpResponse response = client.execute(put)) { + int statusCode = response.getStatusLine().getStatusCode(); + if (statusCode == 204) { + logger.warn("file already exists : " + uploadUrl); + } + else if (statusCode != 201) { + String message = "http status code " + statusCode + " : " + uploadUrl; + throw new RuntimeException(message); + } + } + } + catch (IOException e) { + throw new FileUploadException(e); + } + } + + private void deleteFileByHttp(String fileUrl) throws FileUploadException { + try (CloseableHttpClient client = HttpClients.createDefault()) { + HttpDelete delete = new HttpDelete(fileUrl); + + try (CloseableHttpResponse response = client.execute(delete)) { + int statusCode = response.getStatusLine().getStatusCode(); + if (statusCode != 204) { + String message = "http status code " + statusCode + " : " + fileUrl; + throw new RuntimeException(message); + } + } + } + catch (IOException e) { + throw new FileUploadException(e); + } + } + + private void sendKafkaMessage(@NonNull String topicName, Object object) { + CompletableFuture> future = kafkaTemplate.send(topicName, + new GsonBuilder().setPrettyPrinting().create().toJson(object) + ); + + future.whenComplete((result, e) -> { + if (e != null) { + String message = + "Unable to send message [" + result.getProducerRecord().value() + "] into kafka topic '" + + topicName + "' due to : " + e.getMessage(); + throw new RuntimeException(message, e); + } + }); + } +} diff --git a/file-upload/src/main/java/ru/cg/subproject/service/FileUrl.java b/file-upload/src/main/java/ru/cg/subproject/service/FileUrl.java new file mode 100644 index 0000000..910181d --- /dev/null +++ b/file-upload/src/main/java/ru/cg/subproject/service/FileUrl.java @@ -0,0 +1,7 @@ +package ru.cg.subproject.service; + +/** + * @author r.latypov + */ +public record FileUrl(String fileName, String fileUrl) { +} diff --git a/file-upload/src/main/resources/application.properties b/file-upload/src/main/resources/application.properties new file mode 100644 index 0000000..35d1ad6 --- /dev/null +++ b/file-upload/src/main/resources/application.properties @@ -0,0 +1,32 @@ +spring.kafka.admin.security.protocol=SASL_PLAINTEXT +spring.kafka.admin.properties.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="user1" password="Blfi9d2OFG"; +spring.kafka.admin.properties.sasl.mechanism=SCRAM-SHA-256 +# +spring.kafka.bootstrap-servers=10.10.31.11:32609 +# +spring.kafka.consumer.bootstrap-servers=10.10.31.11:32609 +spring.kafka.consumer.security.protocol=SASL_PLAINTEXT +spring.kafka.consumer.properties.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="user1" password="Blfi9d2OFG"; +spring.kafka.consumer.properties.sasl.mechanism=SCRAM-SHA-256 +# +spring.kafka.consumer.enable-auto-commit=false +spring.kafka.consumer.group-id=file-to-upload-consumers +# +spring.kafka.listener.ack-mode=MANUAL_IMMEDIATE +# +spring.kafka.producer.bootstrap-servers=10.10.31.11:32609 +spring.kafka.producer.security.protocol=SASL_PLAINTEXT +spring.kafka.producer.properties.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="user1" password="Blfi9d2OFG"; +spring.kafka.producer.properties.sasl.mechanism=SCRAM-SHA-256 +# +spring.kafka.properties.security.protocol=SASL_PLAINTEXT +spring.kafka.properties.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="user1" password="Blfi9d2OFG"; +spring.kafka.properties.sasl.mechanism=SCRAM-SHA-256 +# +kafka-in.topic.name=file-to-upload +kafka-out.topic.error.name=error +kafka-out.topic.success.name=success +# +av.rest.address=http://:/scans +file.before.av.path=/nginx/transfer/ +http.file.server.out.address=http://localhost/out