diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..d99b869
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1,22 @@
+# ignore target and tmp dirs
+target*/
+tmp*/
+debug.log
+
+#
+# IntelliJ IDEA project files
+#
+.idea*/
+.classes*/
+*.ipr
+*.iml
+*.iws
+*.ids
+
+# os meta files
+Thumbs.db
+.DS_Store
+
+*.orig
+pom.xml.versionsBackup
+.flattened-pom.xml
diff --git a/file-upload/pom.xml b/file-upload/pom.xml
new file mode 100644
index 0000000..816b840
--- /dev/null
+++ b/file-upload/pom.xml
@@ -0,0 +1,96 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>org.springframework.boot</groupId>
+        <artifactId>spring-boot-starter-parent</artifactId>
+        <version>3.3.0</version>
+    </parent>
+
+    <groupId>ru.cg.subproject</groupId>
+    <artifactId>file-upload</artifactId>
+    <version>0.0.1-SNAPSHOT</version>
+
+    <properties>
+        <java.version>17</java.version>
+    </properties>
+
+    <dependencyManagement>
+        <dependencies>
+            <dependency>
+                <groupId>com.google.code.gson</groupId>
+                <artifactId>gson</artifactId>
+                <version>2.11.0</version>
+            </dependency>
+
+            <dependency>
+                <groupId>org.apache.httpcomponents</groupId>
+                <artifactId>httpclient</artifactId>
+                <version>4.5.14</version>
+            </dependency>
+            <dependency>
+                <groupId>org.apache.httpcomponents</groupId>
+                <artifactId>httpmime</artifactId>
+                <version>4.5.14</version>
+            </dependency>
+
+            <dependency>
+                <groupId>org.projectlombok</groupId>
+                <artifactId>lombok</artifactId>
+                <version>1.18.34</version>
+                <scope>provided</scope>
+            </dependency>
+
+            <dependency>
+                <groupId>org.springframework.kafka</groupId>
+                <artifactId>spring-kafka</artifactId>
+                <version>3.2.1</version>
+            </dependency>
+        </dependencies>
+    </dependencyManagement>
+
+    <dependencies>
+        <dependency>
+            <groupId>com.google.code.gson</groupId>
+            <artifactId>gson</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.httpcomponents</groupId>
+            <artifactId>httpclient</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.httpcomponents</groupId>
+            <artifactId>httpmime</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>org.projectlombok</groupId>
+            <artifactId>lombok</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>org.springframework.boot</groupId>
+            <artifactId>spring-boot-starter</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>org.springframework.kafka</groupId>
+            <artifactId>spring-kafka</artifactId>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.springframework.boot</groupId>
+                <artifactId>spring-boot-maven-plugin</artifactId>
+            </plugin>
+        </plugins>
+    </build>
+</project>
diff --git a/file-upload/src/main/java/ru/cg/subproject/FileUploadApplication.java b/file-upload/src/main/java/ru/cg/subproject/FileUploadApplication.java
new file mode 100644
index 0000000..f9f4b06
--- /dev/null
+++ b/file-upload/src/main/java/ru/cg/subproject/FileUploadApplication.java
@@ -0,0 +1,15 @@
+package ru.cg.subproject;
+
+import org.springframework.boot.SpringApplication;
+import org.springframework.boot.autoconfigure.SpringBootApplication;
+
+/**
+ * @author r.latypov
+ */
+@SpringBootApplication
+public class FileUploadApplication {
+
+ public static void main(String[] args) {
+ SpringApplication.run(FileUploadApplication.class, args);
+ }
+}
diff --git a/file-upload/src/main/java/ru/cg/subproject/av/AvResponse.java b/file-upload/src/main/java/ru/cg/subproject/av/AvResponse.java
new file mode 100644
index 0000000..92a894f
--- /dev/null
+++ b/file-upload/src/main/java/ru/cg/subproject/av/AvResponse.java
@@ -0,0 +1,14 @@
+package ru.cg.subproject.av;
+
+/**
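+ * Response payload of the antivirus scan REST service, deserialized with Gson;
+ * field names mirror the JSON keys of the response (hence the snake_case scan_result).
+ *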
+ * @author r.latypov
+ */
+public record AvResponse(String completed, String created, String progress, ScanResult scan_result,
+ String status, String[] verdicts) {
+ public record ScanResult(Scan noname) {
+ public record Scan(String started, String stopped, Threat[] threats, String verdict) {
+ public record Threat(String name, String object) {
+ }
+ }
+ }
+}
diff --git a/file-upload/src/main/java/ru/cg/subproject/exception/FileUploadException.java b/file-upload/src/main/java/ru/cg/subproject/exception/FileUploadException.java
new file mode 100644
index 0000000..b11dea2
--- /dev/null
+++ b/file-upload/src/main/java/ru/cg/subproject/exception/FileUploadException.java
@@ -0,0 +1,14 @@
+package ru.cg.subproject.exception;
+
+/**
+ * @author r.latypov
+ */
+public class FileUploadException extends Exception {
+ public FileUploadException(String message) {
+ super(message);
+ }
+
+ public FileUploadException(Throwable cause) {
+ super(cause);
+ }
+}
diff --git a/file-upload/src/main/java/ru/cg/subproject/exception/InvalidHttpFileUrlException.java b/file-upload/src/main/java/ru/cg/subproject/exception/InvalidHttpFileUrlException.java
new file mode 100644
index 0000000..9ff1f80
--- /dev/null
+++ b/file-upload/src/main/java/ru/cg/subproject/exception/InvalidHttpFileUrlException.java
@@ -0,0 +1,20 @@
+package ru.cg.subproject.exception;
+
+/**
+ * @author r.latypov
+ */
+public class InvalidHttpFileUrlException extends Exception {
+ private static final String MESSAGE = "file url is not valid";
+
+ public InvalidHttpFileUrlException() {
+ super(MESSAGE);
+ }
+
+ public InvalidHttpFileUrlException(String message) {
+ super(String.join(" : ", MESSAGE, message));
+ }
+
+ public InvalidHttpFileUrlException(String message, Throwable cause) {
+ super(message, cause);
+ }
+}
diff --git a/file-upload/src/main/java/ru/cg/subproject/kafka/config/KafkaConsumerConfig.java b/file-upload/src/main/java/ru/cg/subproject/kafka/config/KafkaConsumerConfig.java
new file mode 100644
index 0000000..f428816
--- /dev/null
+++ b/file-upload/src/main/java/ru/cg/subproject/kafka/config/KafkaConsumerConfig.java
@@ -0,0 +1,64 @@
+package ru.cg.subproject.kafka.config;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.config.SaslConfigs;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.kafka.annotation.EnableKafka;
+import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
+import org.springframework.kafka.core.ConsumerFactory;
+import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
+import org.springframework.kafka.listener.ContainerProperties;
+
+/**
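+ * Builds the String/String consumer factory with the SASL settings from application.properties
+ * and a listener container factory that uses manual-immediate acknowledgment.
+ *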
+ * @author r.latypov
+ */
+@Configuration
+@EnableKafka
+public class KafkaConsumerConfig {
+ @Value("${spring.kafka.consumer.bootstrap-servers}")
+ private List<String> bootstrapAddress;
+ @Value("${spring.kafka.consumer.security.protocol}")
+ private String securityProtocol;
+ @Value("${spring.kafka.consumer.properties.sasl.jaas.config}")
+ private String jaasConfig;
+ @Value("${spring.kafka.consumer.properties.sasl.mechanism}")
+ private String saslMechanism;
+ @Value("${spring.kafka.consumer.enable-auto-commit}")
+ private String enableAutoCommit;
+ @Value("${spring.kafka.listener.ack-mode}")
+ private String ackMode;
+
+ @Bean
+ public ConsumerFactory<String, String> consumerFactory() {
+ Map<String, Object> configs = new HashMap<>();
+
+ configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
+ configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+ configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+
+ configs.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, securityProtocol);
+ configs.put(SaslConfigs.SASL_JAAS_CONFIG, jaasConfig);
+ configs.put(SaslConfigs.SASL_MECHANISM, saslMechanism);
+
+ configs.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);
+
+ return new DefaultKafkaConsumerFactory<>(configs);
+ }
+
+ @Bean
+ public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
+ ConcurrentKafkaListenerContainerFactory<String, String> factory =
+ new ConcurrentKafkaListenerContainerFactory<>();
+ factory.setConsumerFactory(consumerFactory());
+ factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
+ return factory;
+ }
+}
diff --git a/file-upload/src/main/java/ru/cg/subproject/kafka/config/KafkaProducerConfig.java b/file-upload/src/main/java/ru/cg/subproject/kafka/config/KafkaProducerConfig.java
new file mode 100644
index 0000000..389ec9b
--- /dev/null
+++ b/file-upload/src/main/java/ru/cg/subproject/kafka/config/KafkaProducerConfig.java
@@ -0,0 +1,51 @@
+package ru.cg.subproject.kafka.config;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.config.SaslConfigs;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.kafka.core.DefaultKafkaProducerFactory;
+import org.springframework.kafka.core.KafkaTemplate;
+import org.springframework.kafka.core.ProducerFactory;
+
+/**
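+ * Builds the String/String producer factory and the KafkaTemplate used to publish result
+ * messages, reusing the SASL settings from application.properties.
+ *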
+ * @author r.latypov
+ */
+@Configuration
+public class KafkaProducerConfig {
+ @Value("${spring.kafka.producer.bootstrap-servers}")
+ private List<String> bootstrapAddress;
+ @Value("${spring.kafka.producer.security.protocol}")
+ private String securityProtocol;
+ @Value("${spring.kafka.producer.properties.sasl.jaas.config}")
+ private String jaasConfig;
+ @Value("${spring.kafka.producer.properties.sasl.mechanism}")
+ private String saslMechanism;
+
+ @Bean
+ public ProducerFactory<String, String> producerFactory() {
+ Map<String, Object> configs = new HashMap<>();
+
+ configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
+ configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+ configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+
+ configs.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, securityProtocol);
+ configs.put(SaslConfigs.SASL_JAAS_CONFIG, jaasConfig);
+ configs.put(SaslConfigs.SASL_MECHANISM, saslMechanism);
+
+ return new DefaultKafkaProducerFactory<>(configs);
+ }
+
+ @Bean
+ public KafkaTemplate<String, String> kafkaTemplate() {
+ return new KafkaTemplate<>(producerFactory());
+ }
+}
diff --git a/file-upload/src/main/java/ru/cg/subproject/kafka/config/KafkaTopicConfig.java b/file-upload/src/main/java/ru/cg/subproject/kafka/config/KafkaTopicConfig.java
new file mode 100644
index 0000000..a44b82f
--- /dev/null
+++ b/file-upload/src/main/java/ru/cg/subproject/kafka/config/KafkaTopicConfig.java
@@ -0,0 +1,28 @@
+package ru.cg.subproject.kafka.config;
+
+import org.apache.kafka.clients.admin.NewTopic;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.kafka.config.TopicBuilder;
+
+/**
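+ * Declares the outgoing error and success topics so the Kafka admin client can create them on startup.
+ *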
+ * @author r.latypov
+ */
+@Configuration
+public class KafkaTopicConfig {
+ @Value("${kafka-out.topic.error.name}")
+ private String kafkaOutTopicErrorName;
+ @Value("${kafka-out.topic.success.name}")
+ private String kafkaOutTopicSuccessName;
+
+ @Bean
+ public NewTopic outErrorTopic() {
+ return TopicBuilder.name(kafkaOutTopicErrorName).build();
+ }
+
+ @Bean
+ public NewTopic outSuccessTopic() {
+ return TopicBuilder.name(kafkaOutTopicSuccessName).build();
+ }
+}
diff --git a/file-upload/src/main/java/ru/cg/subproject/kafka/dto/InMessage.java b/file-upload/src/main/java/ru/cg/subproject/kafka/dto/InMessage.java
new file mode 100644
index 0000000..1b388e1
--- /dev/null
+++ b/file-upload/src/main/java/ru/cg/subproject/kafka/dto/InMessage.java
@@ -0,0 +1,36 @@
+package ru.cg.subproject.kafka.dto;
+
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+import lombok.Setter;
+import org.springframework.lang.NonNull;
+
+/**
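+ * Incoming Kafka message describing a file to process; fileInfo.fileNameBase initially carries the
+ * source file URL and is rewritten to the upload URL before the message is forwarded downstream.
+ *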
+ * @author r.latypov
+ */
+public record InMessage(OrgInfo orgInfo, SenderInfo senderInfo, @NonNull FileInfo fileInfo) {
+ public record OrgInfo(String orgName, String orgTypeCode, String orgTypeName, String ogrn,
+ String in, String kpp) {
+ }
+
+ public record SenderInfo(String senderLastName, String senderFirstName,
+ String senderMiddleName, String birthDate, String senderRoleCode,
+ String senderRoleName, String snils, String idERN,
+ Document document) {
+
+ public record Document(String series, String number, String issueDate) {
+ }
+ }
+
+ @Getter
+ @AllArgsConstructor
+ public static class FileInfo {
+ @NonNull
+ @Setter
+ private String fileNameBase;
+ private final String fileName;
+ private final String filePatternCode;
+ private final String filePatternName;
+ private final String departureDateTime;
+ }
+}
diff --git a/file-upload/src/main/java/ru/cg/subproject/kafka/dto/OutErrorMessage.java b/file-upload/src/main/java/ru/cg/subproject/kafka/dto/OutErrorMessage.java
new file mode 100644
index 0000000..0cba965
--- /dev/null
+++ b/file-upload/src/main/java/ru/cg/subproject/kafka/dto/OutErrorMessage.java
@@ -0,0 +1,12 @@
+package ru.cg.subproject.kafka.dto;
+
+import org.springframework.lang.NonNull;
+
+import ru.cg.subproject.av.AvResponse;
+
+/**
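+ * Message published to the error topic when a file is infected or its source URL is invalid;
+ * avResponse is null when no antivirus scan was performed.
+ *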
+ * @author r.latypov
+ */
+public record OutErrorMessage(String errorMessage, AvResponse avResponse,
+ @NonNull InMessage inMessage) {
+}
diff --git a/file-upload/src/main/java/ru/cg/subproject/service/FileUploadService.java b/file-upload/src/main/java/ru/cg/subproject/service/FileUploadService.java
new file mode 100644
index 0000000..78965f4
--- /dev/null
+++ b/file-upload/src/main/java/ru/cg/subproject/service/FileUploadService.java
@@ -0,0 +1,235 @@
+package ru.cg.subproject.service;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.concurrent.CompletableFuture;
+
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+import org.apache.http.HttpEntity;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpDelete;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.client.methods.HttpPut;
+import org.apache.http.conn.HttpHostConnectException;
+import org.apache.http.entity.mime.MultipartEntityBuilder;
+import org.apache.http.entity.mime.content.FileBody;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClients;
+import org.apache.http.util.EntityUtils;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.kafka.annotation.KafkaListener;
+import org.springframework.kafka.core.KafkaTemplate;
+import org.springframework.kafka.support.Acknowledgment;
+import org.springframework.kafka.support.SendResult;
+import org.springframework.lang.NonNull;
+import org.springframework.stereotype.Service;
+
+import ru.cg.subproject.av.AvResponse;
+import ru.cg.subproject.exception.FileUploadException;
+import ru.cg.subproject.exception.InvalidHttpFileUrlException;
+import ru.cg.subproject.kafka.dto.InMessage;
+import ru.cg.subproject.kafka.dto.OutErrorMessage;
+
+/**
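+ * Consumes file-upload requests from Kafka, downloads the referenced file over HTTP, submits it
+ * to the antivirus REST service, and then either uploads the clean file to the outgoing HTTP file
+ * server and reports success, or publishes an error message for infected files and invalid URLs.
+ *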
+ * @author r.latypov
+ */
+@Service
+public class FileUploadService {
+ private static final Logger logger = LoggerFactory.getLogger(FileUploadService.class);
+ @Value("${av.rest.address}")
+ private String avRestAddress;
+ @Value("${file.before.av.path:}")
+ private String fileBeforeAvPath;
+ @Value("${http.file.server.out.address:http://localhost/out}")
+ private String httpFileServerOutAddress;
+ private final KafkaTemplate<String, String> kafkaTemplate;
+ private final NewTopic outErrorTopic;
+ private final NewTopic outSuccessTopic;
+
+ @Autowired
+ public FileUploadService(KafkaTemplate<String, String> kafkaTemplate, NewTopic outErrorTopic,
+ NewTopic outSuccessTopic) {
+ this.kafkaTemplate = kafkaTemplate;
+ this.outErrorTopic = outErrorTopic;
+ this.outSuccessTopic = outSuccessTopic;
+ }
+
+ @KafkaListener(id = "${spring.kafka.consumer.group-id}", topics = "${kafka-in.topic.name}")
+ public void listenKafkaIn(String kafkaInMessage, Acknowledgment acknowledgment) {
+ InMessage inMessage = new Gson().fromJson(kafkaInMessage, InMessage.class);
+
+ try {
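+ // Resolve the source file URL and download a local copy into the pre-scan directory.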
+ FileUrl fileUrl = parseFileUrl(inMessage.fileInfo().getFileNameBase());
+ String filePath = fileBeforeAvPath + fileUrl.fileName();
+ String downloadUrl = fileUrl.fileUrl();
+ downloadFileByHttp(downloadUrl, filePath);
+
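+ // Hand the local copy to the antivirus service and evaluate its verdicts.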
+ AvResponse avResponse = sendFileToAvScan(filePath);
+
+ boolean infected = Arrays.stream(avResponse.verdicts())
+ .anyMatch(verdict -> verdict.equalsIgnoreCase("infected"));
+
+ if (infected) {
+ sendKafkaMessage(outErrorTopic.name(),
+ new OutErrorMessage("file is infected", avResponse, inMessage)
+ );
+ }
+ else {
+ String uploadUrl = httpFileServerOutAddress + "/" + fileUrl.fileName();
+ uploadFileByHttp(filePath, uploadUrl);
+
+ inMessage.fileInfo().setFileNameBase(uploadUrl);
+ sendKafkaMessage(outSuccessTopic.name(), inMessage);
+ }
+
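+ // Remove the remote source file and the local copy; acknowledge only after the local file is gone.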
+ deleteFileByHttp(downloadUrl);
+ if (new File(filePath).delete()) {
+ acknowledgment.acknowledge();
+ }
+ }
+ catch (InvalidHttpFileUrlException e) {
+ logger.error(e.getMessage() + ": " + kafkaInMessage);
+
+ sendKafkaMessage(outErrorTopic.name(),
+ new OutErrorMessage(e.getMessage(), null, inMessage)
+ );
+ acknowledgment.acknowledge();
+ }
+ catch (FileUploadException e) {
+ logger.error(e.getMessage(), e);
+ }
+ }
+
+ private static FileUrl parseFileUrl(@NonNull String fileUrl) throws InvalidHttpFileUrlException {
+ String[] substrings = fileUrl.split("/");
+ String fileName = substrings[substrings.length - 1];
+ if (substrings.length == 1 || fileName.isBlank()) {
+ throw new InvalidHttpFileUrlException(fileUrl);
+ }
+ else {
+ return new FileUrl(fileName, fileUrl);
+ }
+ }
+
+ private void downloadFileByHttp(String fileUrl, String filePath)
+ throws InvalidHttpFileUrlException, FileUploadException {
+ File file = new File(filePath);
+ HttpGet request = new HttpGet(fileUrl);
+
+ try (CloseableHttpClient client = HttpClients.createDefault();
+ CloseableHttpResponse response = client.execute(request)) {
+
+ int statusCode = response.getStatusLine().getStatusCode();
+ if (statusCode == 200) {
+ HttpEntity entity = response.getEntity();
+ if (entity != null) {
+ try (FileOutputStream outputStream = new FileOutputStream(file)) {
+ entity.writeTo(outputStream);
+ }
+ }
+ }
+ else {
+ String message = "http status code " + statusCode + " : " + fileUrl;
+ throw new InvalidHttpFileUrlException(message);
+ }
+ }
+ catch (HttpHostConnectException e) {
+ throw new FileUploadException(e);
+ }
+ catch (IOException e) {
+ String message =
+ (e.getMessage() == null ? e.getCause().getMessage() : e.getMessage()) + " : " + fileUrl;
+ throw new InvalidHttpFileUrlException(message, e);
+ }
+ }
+
+ private AvResponse sendFileToAvScan(String filePath) throws FileUploadException {
+ File file = new File(filePath);
+
+ try (CloseableHttpClient client = HttpClients.createDefault()) {
+ HttpPost post = new HttpPost(avRestAddress);
+ // The multipart entity supplies its own Content-Type header (multipart/form-data with boundary).
+ HttpEntity entity = MultipartEntityBuilder.create()
+ .addPart("file", new FileBody(file))
+ .build();
+ post.setEntity(entity);
+
+ try (CloseableHttpResponse response = client.execute(post)) {
+ int statusCode = response.getStatusLine().getStatusCode();
+ if (statusCode != 200) {
+ String message = "http status code " + statusCode + " for " + avRestAddress + " request";
+ throw new FileUploadException(message);
+ }
+ return new Gson().fromJson(EntityUtils.toString(response.getEntity()), AvResponse.class);
+ }
+ }
+ catch (IOException e) {
+ throw new FileUploadException(e);
+ }
+ }
+
+ private void uploadFileByHttp(String filePath, String uploadUrl) throws FileUploadException {
+ File file = new File(filePath);
+
+ try (CloseableHttpClient client = HttpClients.createDefault()) {
+ HttpPut put = new HttpPut(uploadUrl);
+ HttpEntity entity = MultipartEntityBuilder.create()
+ .addPart("file", new FileBody(file))
+ .build();
+ put.setEntity(entity);
+
+ try (CloseableHttpResponse response = client.execute(put)) {
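+ // The file server is expected to answer 201 Created for a new file and 204 No Content if it already exists.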
+ int statusCode = response.getStatusLine().getStatusCode();
+ if (statusCode == 204) {
+ logger.warn("file already exists : " + uploadUrl);
+ }
+ else if (statusCode != 201) {
+ String message = "http status code " + statusCode + " : " + uploadUrl;
+ throw new FileUploadException(message);
+ }
+ }
+ }
+ catch (IOException e) {
+ throw new FileUploadException(e);
+ }
+ }
+
+ private void deleteFileByHttp(String fileUrl) throws FileUploadException {
+ try (CloseableHttpClient client = HttpClients.createDefault()) {
+ HttpDelete delete = new HttpDelete(fileUrl);
+
+ try (CloseableHttpResponse response = client.execute(delete)) {
+ int statusCode = response.getStatusLine().getStatusCode();
+ if (statusCode != 204) {
+ String message = "http status code " + statusCode + " : " + fileUrl;
+ throw new FileUploadException(message);
+ }
+ }
+ }
+ catch (IOException e) {
+ throw new FileUploadException(e);
+ }
+ }
+
+ private void sendKafkaMessage(@NonNull String topicName, Object object) {
+ String json = new GsonBuilder().setPrettyPrinting().create().toJson(object);
+ CompletableFuture<SendResult<String, String>> future = kafkaTemplate.send(topicName, json);
+
+ future.whenComplete((result, e) -> {
+ if (e != null) {
+ logger.error("Unable to send message [{}] into kafka topic '{}' due to : {}",
+ json, topicName, e.getMessage(), e);
+ }
+ });
+ }
+}
diff --git a/file-upload/src/main/java/ru/cg/subproject/service/FileUrl.java b/file-upload/src/main/java/ru/cg/subproject/service/FileUrl.java
new file mode 100644
index 0000000..910181d
--- /dev/null
+++ b/file-upload/src/main/java/ru/cg/subproject/service/FileUrl.java
@@ -0,0 +1,7 @@
+package ru.cg.subproject.service;
+
+/**
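+ * File name extracted from a URL, together with the full URL it was parsed from.
+ *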
+ * @author r.latypov
+ */
+public record FileUrl(String fileName, String fileUrl) {
+}
diff --git a/file-upload/src/main/resources/application.properties b/file-upload/src/main/resources/application.properties
new file mode 100644
index 0000000..35d1ad6
--- /dev/null
+++ b/file-upload/src/main/resources/application.properties
@@ -0,0 +1,32 @@
+spring.kafka.admin.security.protocol=SASL_PLAINTEXT
+spring.kafka.admin.properties.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="user1" password="Blfi9d2OFG";
+spring.kafka.admin.properties.sasl.mechanism=SCRAM-SHA-256
+#
+spring.kafka.bootstrap-servers=10.10.31.11:32609
+#
+spring.kafka.consumer.bootstrap-servers=10.10.31.11:32609
+spring.kafka.consumer.security.protocol=SASL_PLAINTEXT
+spring.kafka.consumer.properties.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="user1" password="Blfi9d2OFG";
+spring.kafka.consumer.properties.sasl.mechanism=SCRAM-SHA-256
+#
+spring.kafka.consumer.enable-auto-commit=false
+spring.kafka.consumer.group-id=file-to-upload-consumers
+#
+spring.kafka.listener.ack-mode=MANUAL_IMMEDIATE
+#
+spring.kafka.producer.bootstrap-servers=10.10.31.11:32609
+spring.kafka.producer.security.protocol=SASL_PLAINTEXT
+spring.kafka.producer.properties.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="user1" password="Blfi9d2OFG";
+spring.kafka.producer.properties.sasl.mechanism=SCRAM-SHA-256
+#
+spring.kafka.properties.security.protocol=SASL_PLAINTEXT
+spring.kafka.properties.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="user1" password="Blfi9d2OFG";
+spring.kafka.properties.sasl.mechanism=SCRAM-SHA-256
+#
+kafka-in.topic.name=file-to-upload
+kafka-out.topic.error.name=error
+kafka-out.topic.success.name=success
+#
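+# av.rest.address: antivirus scan endpoint; host and port are environment-specific and left blank here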
+av.rest.address=http://:/scans
+file.before.av.path=/nginx/transfer/
+http.file.server.out.address=http://localhost/out