SUPPORT-9322: Fix

This commit is contained in:
Eduard Tihomirov 2025-09-08 10:26:02 +03:00
parent 53ff1458b4
commit 9e5474681a
8 changed files with 39 additions and 222 deletions

View file

@ -1,64 +0,0 @@
package ru.micord.ervu.av.kafka.config.output;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ContainerProperties;
/**
* @author r.latypov
*/
@Configuration
@EnableKafka
public class OutputKafkaConsumerConfig {
@Value("${spring.kafka.out.consumer.bootstrap.servers}")
private List<String> bootstrapAddress;
@Value("${spring.kafka.out.consumer.security.protocol}")
private String securityProtocol;
@Value("${spring.kafka.out.consumer.properties.sasl.jaas.config}")
private String jaasConfig;
@Value("${spring.kafka.out.consumer.properties.sasl.mechanism}")
private String saslMechanism;
@Value("${spring.kafka.out.consumer.enable.auto.commit}")
private String enableAutoCommit;
@Value("${spring.kafka.out.listener.ack.mode}")
private String ackMode;
@Bean
public ConsumerFactory<String, String> outputConsumerFactory() {
Map<String, Object> configs = new HashMap<>();
configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
configs.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, securityProtocol);
configs.put(SaslConfigs.SASL_JAAS_CONFIG, jaasConfig);
configs.put(SaslConfigs.SASL_MECHANISM, saslMechanism);
configs.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);
return new DefaultKafkaConsumerFactory<>(configs);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> outputKafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(outputConsumerFactory());
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
return factory;
}
}

View file

@ -1,52 +0,0 @@
package ru.micord.ervu.av.kafka.config.output;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
/**
* @author r.latypov
*/
@Configuration
public class OutputKafkaProducerConfig {
@Value("${spring.kafka.producer.bootstrap.servers}")
private List<String> bootstrapAddress;
@Value("${spring.kafka.producer.security.protocol}")
private String securityProtocol;
@Value("${spring.kafka.producer.properties.sasl.jaas.config}")
private String jaasConfig;
@Value("${spring.kafka.producer.properties.sasl.mechanism}")
private String saslMechanism;
@Bean
public ProducerFactory<String, String> outputProducerFactory() {
Map<String, Object> configs = new HashMap<>();
configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
configs.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, securityProtocol);
configs.put(SaslConfigs.SASL_JAAS_CONFIG, jaasConfig);
configs.put(SaslConfigs.SASL_MECHANISM, saslMechanism);
return new DefaultKafkaProducerFactory<>(configs);
}
@Bean
public KafkaTemplate<String, String> outputKafkaTemplate() {
return new KafkaTemplate<>(outputProducerFactory());
}
}

View file

@ -1,28 +0,0 @@
package ru.micord.ervu.av.kafka.config.output;
import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.TopicBuilder;
/**
* @author r.latypov
*/
@Configuration
public class OutputKafkaTopicConfig {
@Value("${kafka.out.error.topic.name}")
private String kafkaOutErrorTopicName;
@Value("${kafka.out.success.topic.name}")
private String kafkaOutSuccessTopicName;
@Bean
public NewTopic outErrorTopic() {
return TopicBuilder.name(kafkaOutErrorTopicName).build();
}
@Bean
public NewTopic outSuccessTopic() {
return TopicBuilder.name(kafkaOutSuccessTopicName).build();
}
}

View file

@ -22,6 +22,7 @@ public record DownloadRequest(OrgInfo orgInfo, @NonNull FileInfo[] filesInfo) {
private final String filePatternName; private final String filePatternName;
private final String departureDateTime; private final String departureDateTime;
private final String timeZone; private final String timeZone;
private final String type;
@Setter @Setter
private FileStatus fileStatus; private FileStatus fileStatus;
} }

View file

@ -1,11 +0,0 @@
package ru.micord.ervu.av.kafka.dto;
import org.springframework.lang.NonNull;
/**
* @author r.latypov
*/
public record DownloadResponse(OrgInfo orgInfo, @NonNull FileInfo fileInfo) {
public record FileInfo(String fileId, FileStatus fileStatus) {
}
}

View file

@ -40,4 +40,13 @@ public class S3Service {
throw new FileUploadException(e); throw new FileUploadException(e);
} }
} }
public void deleteFileByUrl(String url) {
String prefix = "s3://" + outBucketName + "/";
if (!url.startsWith(prefix)) {
throw new IllegalArgumentException("Некорректный S3 URL: " + url);
}
String key = url.substring(prefix.length());
outClient.deleteObject(outBucketName, key);
}
} }

View file

@ -24,13 +24,12 @@ import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.Acknowledgment; import org.springframework.kafka.support.Acknowledgment;
import org.springframework.kafka.support.SendResult; import org.springframework.kafka.support.SendResult;
import org.springframework.lang.NonNull; import org.springframework.lang.NonNull;
import org.springframework.messaging.handler.annotation.Header; import org.springframework.messaging.handler.annotation.Headers;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import ru.micord.ervu.av.exception.FileUploadException; import ru.micord.ervu.av.exception.FileUploadException;
import ru.micord.ervu.av.exception.InvalidHttpFileUrlException; import ru.micord.ervu.av.exception.InvalidHttpFileUrlException;
import ru.micord.ervu.av.exception.RetryableException; import ru.micord.ervu.av.exception.RetryableException;
import ru.micord.ervu.av.kafka.dto.DownloadRequest; import ru.micord.ervu.av.kafka.dto.DownloadRequest;
import ru.micord.ervu.av.kafka.dto.DownloadResponse;
import ru.micord.ervu.av.kafka.dto.FileStatus; import ru.micord.ervu.av.kafka.dto.FileStatus;
import ru.micord.ervu.av.s3.S3Service; import ru.micord.ervu.av.s3.S3Service;
@ -46,10 +45,7 @@ public class FileUploadService {
private boolean avCheckEnabled; private boolean avCheckEnabled;
@Value("${file.saving.path}") @Value("${file.saving.path}")
private String fileSavingPath; private String fileSavingPath;
private final KafkaTemplate<String, String> kafkaTemplate;
private final KafkaTemplate<String, String> inKafkaTemplate; private final KafkaTemplate<String, String> inKafkaTemplate;
private final NewTopic outErrorTopic;
private final NewTopic outSuccessTopic;
private final NewTopic inStatusTopic; private final NewTopic inStatusTopic;
private final FileManager fIleManager; private final FileManager fIleManager;
private final ReceiveScanReportRetryable receiveScanReportRetryable; private final ReceiveScanReportRetryable receiveScanReportRetryable;
@ -57,15 +53,10 @@ public class FileUploadService {
@Autowired @Autowired
public FileUploadService( public FileUploadService(
@Qualifier("outputKafkaTemplate") KafkaTemplate<String, String> kafkaTemplate,
@Qualifier("inputKafkaTemplate") KafkaTemplate<String, String> inKafkaTemplate, @Qualifier("inputKafkaTemplate") KafkaTemplate<String, String> inKafkaTemplate,
NewTopic outErrorTopic, NewTopic outSuccessTopic,
ReceiveScanReportRetryable receiveScanReportRetryable, ReceiveScanReportRetryable receiveScanReportRetryable,
S3Service s3Service, NewTopic inStatusTopic, S3Service s3Service, NewTopic inStatusTopic,
FileManager fIleManager) { FileManager fIleManager) {
this.kafkaTemplate = kafkaTemplate;
this.outErrorTopic = outErrorTopic;
this.outSuccessTopic = outSuccessTopic;
this.receiveScanReportRetryable = receiveScanReportRetryable; this.receiveScanReportRetryable = receiveScanReportRetryable;
this.s3Service = s3Service; this.s3Service = s3Service;
this.inKafkaTemplate = inKafkaTemplate; this.inKafkaTemplate = inKafkaTemplate;
@ -76,7 +67,7 @@ public class FileUploadService {
@KafkaListener(id = "${spring.kafka.consumer.group.id}", topics = "${kafka.in.topic.name}", @KafkaListener(id = "${spring.kafka.consumer.group.id}", topics = "${kafka.in.topic.name}",
containerFactory = "inputKafkaListenerContainerFactory") containerFactory = "inputKafkaListenerContainerFactory")
public void listenKafkaIn(String kafkaInMessage, Acknowledgment acknowledgment, public void listenKafkaIn(String kafkaInMessage, Acknowledgment acknowledgment,
@Header("messageId") String messageId) { @Headers Map<String, Object> headers) {
DownloadRequest downloadRequest = new Gson().fromJson(kafkaInMessage, DownloadRequest.class); DownloadRequest downloadRequest = new Gson().fromJson(kafkaInMessage, DownloadRequest.class);
Map<Path, DownloadRequest.FileInfo> tempFilesMap = new HashMap<>(); Map<Path, DownloadRequest.FileInfo> tempFilesMap = new HashMap<>();
@ -121,9 +112,6 @@ public class FileUploadService {
FileUrl fileUrl = parseFileUrl(fileInfo.getFileUrl()); FileUrl fileUrl = parseFileUrl(fileInfo.getFileUrl());
fIleManager.deleteFile(fileUrl.fileUrl()); fIleManager.deleteFile(fileUrl.fileUrl());
} }
if (!isAvError && exitCode == INFECTED_CODE) {
sendMessage(outErrorTopic.name(), downloadRequest, messageId, kafkaTemplate);
}
} }
else { else {
for (Map.Entry<Path, DownloadRequest.FileInfo> entry : tempFilesMap.entrySet()) { for (Map.Entry<Path, DownloadRequest.FileInfo> entry : tempFilesMap.entrySet()) {
@ -135,13 +123,8 @@ public class FileUploadService {
FileUrl fileUrl = parseFileUrl(fileInfo.getFileUrl()); FileUrl fileUrl = parseFileUrl(fileInfo.getFileUrl());
fIleManager.deleteFile(fileUrl.fileUrl()); fIleManager.deleteFile(fileUrl.fileUrl());
} }
sendMessage(outSuccessTopic.name(), downloadRequest, messageId, kafkaTemplate);
} }
DownloadRequest.FileInfo csvFile = getCsvFile(downloadRequest); sendMessage(inStatusTopic.name(), downloadRequest, headers, inKafkaTemplate);
DownloadRequest statusRequest = new DownloadRequest(downloadRequest.orgInfo(),
new DownloadRequest.FileInfo[] {csvFile}
);
sendMessage(inStatusTopic.name(), statusRequest, messageId, inKafkaTemplate);
for (Path filePath : tempFilesMap.keySet()) { for (Path filePath : tempFilesMap.keySet()) {
try { try {
@ -156,13 +139,11 @@ public class FileUploadService {
// считаем, что повторная обработка сообщения не нужна // считаем, что повторная обработка сообщения не нужна
// ошибку логируем, отправляем сообщение с новым статусом, помечаем прочтение сообщения // ошибку логируем, отправляем сообщение с новым статусом, помечаем прочтение сообщения
LOGGER.error(e.getMessage() + ": " + kafkaInMessage); LOGGER.error(e.getMessage() + ": " + kafkaInMessage);
DownloadRequest.FileInfo csvFile = getCsvFile(downloadRequest); for (DownloadRequest.FileInfo fileInfo : downloadRequest.filesInfo()) {
csvFile.setFileStatus(FileStatus.FILE_STATUS_11); fileInfo.setFileUrl(null);
csvFile.setFileUrl(null); fileInfo.setFileStatus(FileStatus.FILE_STATUS_11);
DownloadRequest statusRequest = new DownloadRequest(downloadRequest.orgInfo(), }
new DownloadRequest.FileInfo[] {csvFile} sendMessage(inStatusTopic.name(), downloadRequest, headers, inKafkaTemplate);
);
sendMessage(inStatusTopic.name(), statusRequest, messageId, inKafkaTemplate);
} }
catch (FileUploadException | IOException e) { catch (FileUploadException | IOException e) {
// считаем, что нужно повторное считывание сообщения // считаем, что нужно повторное считывание сообщения
@ -174,22 +155,6 @@ public class FileUploadService {
} }
} }
@KafkaListener(id = "${spring.kafka.out.consumer.group.id}",
topics = "${kafka.out.response.topic.name}",
containerFactory = "outputKafkaListenerContainerFactory")
public void listenKafkaOut(String kafkaOutResponseMessage, Acknowledgment acknowledgment,
@Header("messageId") String messageId) {
DownloadResponse downloadResponse = new Gson().fromJson(kafkaOutResponseMessage,
DownloadResponse.class
);
FileStatus fileStatus = downloadResponse.fileInfo().fileStatus();
if (fileStatus.code().equalsIgnoreCase(FileStatus.FILE_STATUS_04.code())) {
sendMessage(inStatusTopic.name(), downloadResponse, messageId, inKafkaTemplate);
}
acknowledgment.acknowledge();
}
/* метод для выделения UUID файла из ссылки на файл /* метод для выделения UUID файла из ссылки на файл
сохраняем на диске и отправляем файл в хранилище под тем же UUID, сохраняя расширение файла сохраняем на диске и отправляем файл в хранилище под тем же UUID, сохраняя расширение файла
*/ */
@ -205,11 +170,20 @@ public class FileUploadService {
} }
} }
private void sendMessage(@NonNull String topicName, Object object, String messageId, private void sendMessage(@NonNull String topicName, Object object, Map<String, Object> headers,
KafkaTemplate<String, String> template) { KafkaTemplate<String, String> template) {
ProducerRecord<String, String> record = new ProducerRecord<>(topicName, ProducerRecord<String, String> record = new ProducerRecord<>(topicName,
new GsonBuilder().setPrettyPrinting().create().toJson(object)); new GsonBuilder().setPrettyPrinting().create().toJson(object));
record.headers().add("messageId", messageId.getBytes(StandardCharsets.UTF_8)); if (headers != null) {
headers.forEach((key, value) -> {
if (value instanceof byte[]) {
record.headers().add(key, (byte[]) value);
}
else if (value != null) {
record.headers().add(key, value.toString().getBytes(StandardCharsets.UTF_8));
}
});
}
CompletableFuture<SendResult<String, String>> future = template.send(record); CompletableFuture<SendResult<String, String>> future = template.send(record);
future.whenComplete((result, e) -> { future.whenComplete((result, e) -> {
@ -225,10 +199,14 @@ public class FileUploadService {
private record FileUrl(String fileName, String fileUrl) { private record FileUrl(String fileName, String fileUrl) {
} }
private DownloadRequest.FileInfo getCsvFile(DownloadRequest downloadRequest) { @KafkaListener(id = "${spring.kafka.consumer.group.id}", topics = "${kafka.clear.s3.topic.name}",
return Arrays.stream(downloadRequest.filesInfo()) containerFactory = "inputKafkaListenerContainerFactory")
.filter(fi -> fi.getFileName() != null && fi.getFileName().endsWith(".csv")) public void listenKafkaIn(String kafkaMessage, Acknowledgment acknowledgment) {
.findFirst() DownloadRequest downloadRequest = new Gson().fromJson(kafkaMessage,
.orElse(null); DownloadRequest.class
);
Arrays.stream(downloadRequest.filesInfo())
.forEach(fileInfo -> s3Service.deleteFileByUrl(fileInfo.getFileUrl()));
acknowledgment.acknowledge();
} }
} }

View file

@ -30,25 +30,9 @@ spring.kafka.producer.properties.sasl.jaas.config=org.apache.kafka.common.securi
spring.kafka.producer.properties.sasl.mechanism=${ERVU_KAFKA_SASL_MECHANISM:SCRAM-SHA-256} spring.kafka.producer.properties.sasl.mechanism=${ERVU_KAFKA_SASL_MECHANISM:SCRAM-SHA-256}
# spring kafka default beans properties <- end # spring kafka default beans properties <- end
# #
# kafka out consumer (not for default bean creation by spring)
#host1:port1, host2:port2
spring.kafka.out.consumer.bootstrap.servers=${ERVU_KAFKA_BOOTSTRAP_SERVERS}
spring.kafka.out.consumer.security.protocol=${ERVU_KAFKA_SECURITY_PROTOCOL:SASL_PLAINTEXT}
#login password to set
spring.kafka.out.consumer.properties.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="${ERVU_KAFKA_USERNAME}" password="${ERVU_KAFKA_PASSWORD}";
spring.kafka.out.consumer.properties.sasl.mechanism=${ERVU_KAFKA_SASL_MECHANISM:SCRAM-SHA-256}
#
spring.kafka.out.consumer.enable.auto.commit=false
spring.kafka.out.consumer.group.id=${ERVU_KAFKA_GROUP_ID:response-consumers}
# kafka out listeners
spring.kafka.out.listener.ack.mode=MANUAL_IMMEDIATE
#
#
kafka.in.topic.name=${AV_KAFKA_TOPIC_NAME} kafka.in.topic.name=${AV_KAFKA_TOPIC_NAME}
kafka.in.status.topic.name=${AV_KAFKA_STATUS_TOPIC_NAME} kafka.in.status.topic.name=${AV_KAFKA_STATUS_TOPIC_NAME}
kafka.out.error.topic.name=${ERVU_KAFKA_ERROR_TOPIC_NAME} kafka.clear.s3.topic.name=${AV_KAFKA_CLEAR_S3_TOPIC_NAME}
kafka.out.success.topic.name=${ERVU_KAFKA_SUCCESS_TOPIC_NAME}
kafka.out.response.topic.name=${ERVU_KAFKA_RESPONSE_TOPIC_NAME}
# #
av.check.enabled=${AV_CHECK_ENABLED:true} av.check.enabled=${AV_CHECK_ENABLED:true}
av.rest.address=${AV_REST_ADDRESS} av.rest.address=${AV_REST_ADDRESS}