Merge branch 'feature/SUPPORT-9322_new_files' into develop

Eduard Tihomiorv 2025-10-20 12:25:00 +03:00
commit 876028c2dc
8 changed files with 96 additions and 241 deletions

View file

@@ -1,64 +0,0 @@
package ru.micord.ervu.av.kafka.config.output;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ContainerProperties;
/**
* @author r.latypov
*/
@Configuration
@EnableKafka
public class OutputKafkaConsumerConfig {
@Value("${spring.kafka.out.consumer.bootstrap.servers}")
private List<String> bootstrapAddress;
@Value("${spring.kafka.out.consumer.security.protocol}")
private String securityProtocol;
@Value("${spring.kafka.out.consumer.properties.sasl.jaas.config}")
private String jaasConfig;
@Value("${spring.kafka.out.consumer.properties.sasl.mechanism}")
private String saslMechanism;
@Value("${spring.kafka.out.consumer.enable.auto.commit}")
private String enableAutoCommit;
@Value("${spring.kafka.out.listener.ack.mode}")
private String ackMode;
@Bean
public ConsumerFactory<String, String> outputConsumerFactory() {
Map<String, Object> configs = new HashMap<>();
configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
configs.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, securityProtocol);
configs.put(SaslConfigs.SASL_JAAS_CONFIG, jaasConfig);
configs.put(SaslConfigs.SASL_MECHANISM, saslMechanism);
configs.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);
return new DefaultKafkaConsumerFactory<>(configs);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> outputKafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(outputConsumerFactory());
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
return factory;
}
}

View file

@@ -1,52 +0,0 @@
package ru.micord.ervu.av.kafka.config.output;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
/**
* @author r.latypov
*/
@Configuration
public class OutputKafkaProducerConfig {
@Value("${spring.kafka.producer.bootstrap.servers}")
private List<String> bootstrapAddress;
@Value("${spring.kafka.producer.security.protocol}")
private String securityProtocol;
@Value("${spring.kafka.producer.properties.sasl.jaas.config}")
private String jaasConfig;
@Value("${spring.kafka.producer.properties.sasl.mechanism}")
private String saslMechanism;
@Bean
public ProducerFactory<String, String> outputProducerFactory() {
Map<String, Object> configs = new HashMap<>();
configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
configs.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, securityProtocol);
configs.put(SaslConfigs.SASL_JAAS_CONFIG, jaasConfig);
configs.put(SaslConfigs.SASL_MECHANISM, saslMechanism);
return new DefaultKafkaProducerFactory<>(configs);
}
@Bean
public KafkaTemplate<String, String> outputKafkaTemplate() {
return new KafkaTemplate<>(outputProducerFactory());
}
}

View file

@@ -1,28 +0,0 @@
package ru.micord.ervu.av.kafka.config.output;
import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.TopicBuilder;
/**
* @author r.latypov
*/
@Configuration
public class OutputKafkaTopicConfig {
@Value("${kafka.out.error.topic.name}")
private String kafkaOutErrorTopicName;
@Value("${kafka.out.success.topic.name}")
private String kafkaOutSuccessTopicName;
@Bean
public NewTopic outErrorTopic() {
return TopicBuilder.name(kafkaOutErrorTopicName).build();
}
@Bean
public NewTopic outSuccessTopic() {
return TopicBuilder.name(kafkaOutSuccessTopicName).build();
}
}

View file

@@ -8,7 +8,7 @@ import org.springframework.lang.NonNull;
/**
* @author r.latypov
*/
public record DownloadRequest(OrgInfo orgInfo, @NonNull FileInfo fileInfo) {
public record DownloadRequest(OrgInfo orgInfo, @NonNull FileInfo[] filesInfo) {
@Getter
@AllArgsConstructor
@@ -22,6 +22,7 @@ public record DownloadRequest(OrgInfo orgInfo, @NonNull FileInfo fileInfo) {
private final String filePatternName;
private final String departureDateTime;
private final String timeZone;
private final String type;
@Setter
private FileStatus fileStatus;
}
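
Aside (illustrative, not part of the commit): a minimal sketch of how a payload with the new filesInfo array might deserialize, assuming the JSON keys match the record components and getters shown above; the sample values are hypothetical.

    String json = """
        {
          "orgInfo": {},
          "filesInfo": [
            {"fileUrl": "http://host/files/3f2a.pdf", "filePatternName": "report", "type": "pdf"}
          ]
        }
        """;
    DownloadRequest request = new Gson().fromJson(json, DownloadRequest.class);
    for (DownloadRequest.FileInfo fileInfo : request.filesInfo()) {
        System.out.println(fileInfo.getFileUrl()); // each file is now handled in a loop
    }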

View file

@@ -1,11 +0,0 @@
package ru.micord.ervu.av.kafka.dto;
import org.springframework.lang.NonNull;
/**
* @author r.latypov
*/
public record DownloadResponse(OrgInfo orgInfo, @NonNull FileInfo fileInfo) {
public record FileInfo(String fileId, FileStatus fileStatus) {
}
}

View file

@@ -16,11 +16,13 @@ import ru.micord.ervu.av.exception.FileUploadException;
public class S3Service {
private final String outBucketName;
private final AmazonS3 outClient;
private final String s3Prefix;
@Autowired
public S3Service(String outBucketName, AmazonS3 outClient) {
this.outBucketName = outBucketName;
this.outClient = outClient;
this.s3Prefix = "s3://" + outBucketName + "/";
}
@PostConstruct
@@ -33,11 +35,19 @@ public class S3Service {
public String putFile(Path filePath, String key) throws FileUploadException {
try {
outClient.putObject(outBucketName, key, filePath.toFile());
return String.join("/", "s3:/", outBucketName, key);
return s3Prefix + key;
}
catch (AmazonServiceException e) {
// todo message
throw new FileUploadException(e);
}
}
public void deleteFileByUrl(String url) {
if (!url.startsWith(s3Prefix)) {
throw new IllegalArgumentException("Invalid S3 URL: " + url);
}
String key = url.substring(s3Prefix.length());
outClient.deleteObject(outBucketName, key);
}
}
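
Aside (illustrative, not part of the commit): a quick sketch of the round trip between putFile and the new deleteFileByUrl, assuming the configured bucket is named "out-bucket"; names and paths are placeholders.

    void roundTrip(S3Service s3Service) throws FileUploadException {
        // putFile now returns s3Prefix + key, e.g. "s3://out-bucket/report.pdf"
        String ref = s3Service.putFile(Paths.get("/tmp/report.pdf"), "report.pdf");
        // deleteFileByUrl strips the same prefix back off to recover the object key
        s3Service.deleteFileByUrl(ref);
        // a URL outside the configured bucket would throw IllegalArgumentException:
        // s3Service.deleteFileByUrl("s3://other-bucket/report.pdf");
    }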

View file

@@ -5,6 +5,9 @@ import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import com.google.gson.Gson;
@@ -21,13 +24,12 @@ import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.kafka.support.SendResult;
import org.springframework.lang.NonNull;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.messaging.handler.annotation.Headers;
import org.springframework.stereotype.Service;
import ru.micord.ervu.av.exception.FileUploadException;
import ru.micord.ervu.av.exception.InvalidHttpFileUrlException;
import ru.micord.ervu.av.exception.RetryableException;
import ru.micord.ervu.av.kafka.dto.DownloadRequest;
import ru.micord.ervu.av.kafka.dto.DownloadResponse;
import ru.micord.ervu.av.kafka.dto.FileStatus;
import ru.micord.ervu.av.s3.S3Service;
@@ -43,10 +45,7 @@ public class FileUploadService {
private boolean avCheckEnabled;
@Value("${file.saving.path}")
private String fileSavingPath;
private final KafkaTemplate<String, String> kafkaTemplate;
private final KafkaTemplate<String, String> inKafkaTemplate;
private final NewTopic outErrorTopic;
private final NewTopic outSuccessTopic;
private final NewTopic inStatusTopic;
private final FileManager fIleManager;
private final ReceiveScanReportRetryable receiveScanReportRetryable;
@@ -54,15 +53,10 @@ public class FileUploadService {
@Autowired
public FileUploadService(
@Qualifier("outputKafkaTemplate") KafkaTemplate<String, String> kafkaTemplate,
@Qualifier("inputKafkaTemplate") KafkaTemplate<String, String> inKafkaTemplate,
NewTopic outErrorTopic, NewTopic outSuccessTopic,
ReceiveScanReportRetryable receiveScanReportRetryable,
S3Service s3Service, NewTopic inStatusTopic,
FileManager fIleManager) {
this.kafkaTemplate = kafkaTemplate;
this.outErrorTopic = outErrorTopic;
this.outSuccessTopic = outSuccessTopic;
this.receiveScanReportRetryable = receiveScanReportRetryable;
this.s3Service = s3Service;
this.inKafkaTemplate = inKafkaTemplate;
@@ -73,18 +67,24 @@ public class FileUploadService {
@KafkaListener(id = "${spring.kafka.consumer.group.id}", topics = "${kafka.in.topic.name}",
containerFactory = "inputKafkaListenerContainerFactory")
public void listenKafkaIn(String kafkaInMessage, Acknowledgment acknowledgment,
@Header("messageId") String messageId) {
@Headers Map<String, Object> headers) {
DownloadRequest downloadRequest = new Gson().fromJson(kafkaInMessage, DownloadRequest.class);
Map<Path, DownloadRequest.FileInfo> tempFilesMap = new HashMap<>();
try {
FileUrl fileUrl = parseFileUrl(downloadRequest.fileInfo().getFileUrl());
boolean isAvError = false;
int exitCode = 0;
boolean hasError = false;
for (DownloadRequest.FileInfo fileInfo : downloadRequest.filesInfo()) {
FileUrl fileUrl = parseFileUrl(fileInfo.getFileUrl());
Files.createDirectories(Paths.get(fileSavingPath));
LOGGER.info("working in {}", System.getProperty("user.home"));
Path filePath = Paths.get(fileSavingPath, fileUrl.fileName());
String downloadUrl = fileUrl.fileUrl();
fIleManager.downloadFile(downloadUrl, filePath);
boolean isAvError = false;
int exitCode = 0;
tempFilesMap.put(filePath, fileInfo);
if (avCheckEnabled) {
try {
@@ -97,68 +97,63 @@ public class FileUploadService {
}
if (isAvError || exitCode == INFECTED_CODE || exitCode == PASS_PROTECTED_CODE) {
downloadRequest.fileInfo().setFileUrl(null);
hasError = true;
break;
}
}
if (hasError) {
FileStatus fileStatus = (exitCode == PASS_PROTECTED_CODE || isAvError)
? FileStatus.FILE_STATUS_11
: FileStatus.FILE_STATUS_02;
downloadRequest.fileInfo().setFileStatus(fileStatus);
if (!isAvError && exitCode == INFECTED_CODE) {
sendMessage(outErrorTopic.name(), downloadRequest, messageId, kafkaTemplate);
for (DownloadRequest.FileInfo fileInfo : downloadRequest.filesInfo()) {
// parse and delete the source file before the URL is cleared
FileUrl fileUrl = parseFileUrl(fileInfo.getFileUrl());
fIleManager.deleteFile(fileUrl.fileUrl());
fileInfo.setFileUrl(null);
fileInfo.setFileStatus(fileStatus);
}
}
else {
String fileRef = s3Service.putFile(filePath, fileUrl.fileName());
downloadRequest.fileInfo().setFileUrl(fileRef);
downloadRequest.fileInfo().setFileStatus(FileStatus.FILE_STATUS_03);
sendMessage(outSuccessTopic.name(), downloadRequest, messageId, kafkaTemplate);
for (Map.Entry<Path, DownloadRequest.FileInfo> entry : tempFilesMap.entrySet()) {
Path filePath = entry.getKey();
DownloadRequest.FileInfo fileInfo = entry.getValue();
// capture the original download URL before it is overwritten with the S3 reference
FileUrl fileUrl = parseFileUrl(fileInfo.getFileUrl());
String fileRef = s3Service.putFile(filePath, filePath.getFileName().toString());
fileInfo.setFileUrl(fileRef);
fileInfo.setFileStatus(FileStatus.FILE_STATUS_03);
fIleManager.deleteFile(fileUrl.fileUrl());
}
sendMessage(inStatusTopic.name(), downloadRequest, messageId, inKafkaTemplate);
fIleManager.deleteFile(downloadUrl);
}
sendMessage(inStatusTopic.name(), downloadRequest, headers, inKafkaTemplate);
for (Path filePath : tempFilesMap.keySet()) {
try {
Files.delete(filePath);
Files.deleteIfExists(filePath);
}
catch (IOException e) {
LOGGER.error("Failed to delete file " + filePath, e);
}
finally {
acknowledgment.acknowledge();
}
}
catch (InvalidHttpFileUrlException e) {
// we assume the message does not need to be reprocessed:
// log the error, send a message with the updated status, and acknowledge the message
LOGGER.error(e.getMessage() + ": " + kafkaInMessage);
downloadRequest.fileInfo().setFileUrl(null);
downloadRequest.fileInfo().setFileStatus(FileStatus.FILE_STATUS_11);
sendMessage(inStatusTopic.name(), downloadRequest, messageId, inKafkaTemplate);
acknowledgment.acknowledge();
for (DownloadRequest.FileInfo fileInfo : downloadRequest.filesInfo()) {
fileInfo.setFileUrl(null);
fileInfo.setFileStatus(FileStatus.FILE_STATUS_11);
}
sendMessage(inStatusTopic.name(), downloadRequest, headers, inKafkaTemplate);
}
catch (FileUploadException | IOException e) {
// we assume the message should be re-read later:
// log the error and leave the message unacknowledged
LOGGER.error(e.getMessage(), e);
}
}
@KafkaListener(id = "${spring.kafka.out.consumer.group.id}",
topics = "${kafka.out.response.topic.name}",
containerFactory = "outputKafkaListenerContainerFactory")
public void listenKafkaOut(String kafkaOutResponseMessage, Acknowledgment acknowledgment,
@Header("messageId") String messageId) {
DownloadResponse downloadResponse = new Gson().fromJson(kafkaOutResponseMessage,
DownloadResponse.class
);
FileStatus fileStatus = downloadResponse.fileInfo().fileStatus();
if (fileStatus.code().equalsIgnoreCase(FileStatus.FILE_STATUS_04.code())) {
sendMessage(inStatusTopic.name(), downloadResponse, messageId, inKafkaTemplate);
}
finally {
acknowledgment.acknowledge();
}
}
/* method for extracting the file UUID from the file link;
the file is saved to disk and sent to storage under the same UUID, keeping the file extension
@@ -175,11 +170,20 @@ public class FileUploadService {
}
}
private void sendMessage(@NonNull String topicName, Object object, String messageId,
private void sendMessage(@NonNull String topicName, Object object, Map<String, Object> headers,
KafkaTemplate<String, String> template) {
ProducerRecord<String, String> record = new ProducerRecord<>(topicName,
new GsonBuilder().setPrettyPrinting().create().toJson(object));
record.headers().add("messageId", messageId.getBytes(StandardCharsets.UTF_8));
if (headers != null) {
headers.forEach((key, value) -> {
if (value instanceof byte[]) {
record.headers().add(key, (byte[]) value);
}
else if (value != null) {
record.headers().add(key, value.toString().getBytes(StandardCharsets.UTF_8));
}
});
}
CompletableFuture<SendResult<String, String>> future = template.send(record);
future.whenComplete((result, e) -> {
@@ -194,4 +198,15 @@ public class FileUploadService {
private record FileUrl(String fileName, String fileUrl) {
}
@KafkaListener(id = "${spring.kafka.s3.consumer.group.id}", topics = "${kafka.clear.s3.topic.name}",
containerFactory = "inputKafkaListenerContainerFactory")
public void listenKafkaIn(String kafkaMessage, Acknowledgment acknowledgment) {
DownloadRequest downloadRequest = new Gson().fromJson(kafkaMessage,
DownloadRequest.class
);
Arrays.stream(downloadRequest.filesInfo())
.forEach(fileInfo -> s3Service.deleteFileByUrl(fileInfo.getFileUrl()));
acknowledgment.acknowledge();
}
}
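
Aside (illustrative, not part of the commit): a hypothetical message for the new clear-S3 listener, assuming each fileUrl carries an "s3://<bucket>/<key>" reference previously returned by S3Service.putFile; topic name and values are placeholders.

    // A message published to ${kafka.clear.s3.topic.name} might look like this:
    String clearMessage = """
        {
          "filesInfo": [
            {"fileUrl": "s3://out-bucket/3f2a.pdf"},
            {"fileUrl": "s3://out-bucket/9c1d.xml"}
          ]
        }
        """;
    // listenKafkaIn(clearMessage, acknowledgment) deserializes it into DownloadRequest and
    // calls s3Service.deleteFileByUrl(...) for every entry before acknowledging.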

View file

@@ -30,25 +30,9 @@ spring.kafka.producer.properties.sasl.jaas.config=org.apache.kafka.common.securi
spring.kafka.producer.properties.sasl.mechanism=${ERVU_KAFKA_SASL_MECHANISM:SCRAM-SHA-256}
# spring kafka default beans properties <- end
#
# kafka out consumer (not for default bean creation by spring)
#host1:port1, host2:port2
spring.kafka.out.consumer.bootstrap.servers=${ERVU_KAFKA_BOOTSTRAP_SERVERS}
spring.kafka.out.consumer.security.protocol=${ERVU_KAFKA_SECURITY_PROTOCOL:SASL_PLAINTEXT}
#login password to set
spring.kafka.out.consumer.properties.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="${ERVU_KAFKA_USERNAME}" password="${ERVU_KAFKA_PASSWORD}";
spring.kafka.out.consumer.properties.sasl.mechanism=${ERVU_KAFKA_SASL_MECHANISM:SCRAM-SHA-256}
#
spring.kafka.out.consumer.enable.auto.commit=false
spring.kafka.out.consumer.group.id=${ERVU_KAFKA_GROUP_ID:response-consumers}
# kafka out listeners
spring.kafka.out.listener.ack.mode=MANUAL_IMMEDIATE
#
#
kafka.in.topic.name=${AV_KAFKA_TOPIC_NAME}
kafka.in.status.topic.name=${AV_KAFKA_STATUS_TOPIC_NAME}
kafka.out.error.topic.name=${ERVU_KAFKA_ERROR_TOPIC_NAME}
kafka.out.success.topic.name=${ERVU_KAFKA_SUCCESS_TOPIC_NAME}
kafka.out.response.topic.name=${ERVU_KAFKA_RESPONSE_TOPIC_NAME}
kafka.clear.s3.topic.name=${AV_KAFKA_CLEAR_S3_TOPIC_NAME}
#
av.check.enabled=${AV_CHECK_ENABLED:true}
av.rest.address=${AV_REST_ADDRESS}