SUPPORT-9518: configure batch listener; add parallel file handling; add @Retryable for S3

gulnaz 2025-11-01 11:15:27 +03:00
parent 83de2bb8b0
commit 3667597949
5 changed files with 115 additions and 28 deletions

View file

@@ -7,6 +7,11 @@ AV_KAFKA_GROUP_ID=file-to-upload-consumers
 AV_KAFKA_TOPIC_NAME=file-to-upload
 AV_KAFKA_STATUS_TOPIC_NAME=ervu.lkrp.av-fileupload-status
 AV_KAFKA_TOPIC_CONSUMER_CONCURRENCY=1
+AV_KAFKA_CONSUMER_FETCH_MIN_BYTES=1
+AV_KAFKA_CONSUMER_FETCH_MAX_WAIT=500
+AV_KAFKA_CONSUMER_MAX_POLL_RECORDS=500
+AV_KAFKA_CONSUMER_RECOVERY_INTERVAL=0
+AV_KAFKA_CONSUMER_RECOVERY_ATTEMPTS=0
 ERVU_KAFKA_BOOTSTRAP_SERVERS=10.10.31.11:32609
 ERVU_KAFKA_SECURITY_PROTOCOL=SASL_PLAINTEXT
@@ -19,6 +24,7 @@ ERVU_KAFKA_SUCCESS_TOPIC_NAME=ervu.lkrp.download.request
 ERVU_KAFKA_RESPONSE_TOPIC_NAME=ervu.lkrp.download.response
 AV_CHECK_ENABLED=true
+AV_THREAD_COUNT=8
 AV_REST_ADDRESS=http://10.10.31.118:8085/scans
 AV_FIRST_TIMEOUT_MILLISECONDS=1000
 AV_RETRY_MAX_ATTEMPTS_COUNT=10

View file

@@ -8,14 +8,19 @@ import org.apache.kafka.clients.CommonClientConfigs;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.common.config.SaslConfigs;
 import org.apache.kafka.common.serialization.StringDeserializer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Value;
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
+import org.springframework.kafka.KafkaException;
 import org.springframework.kafka.annotation.EnableKafka;
 import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
 import org.springframework.kafka.core.ConsumerFactory;
 import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
 import org.springframework.kafka.listener.ContainerProperties;
+import org.springframework.kafka.listener.DefaultErrorHandler;
+import org.springframework.util.backoff.FixedBackOff;
 
 /**
  * @author r.latypov
@@ -23,6 +28,8 @@ import org.springframework.kafka.listener.ContainerProperties;
 @Configuration
 @EnableKafka
 public class InputKafkaConsumerConfig {
+    private static final Logger LOGGER = LoggerFactory.getLogger(InputKafkaConsumerConfig.class);
+
     @Value("${spring.kafka.consumer.bootstrap.servers}")
     private List<String> bootstrapAddress;
     @Value("${spring.kafka.consumer.security.protocol}")
@@ -33,10 +40,20 @@ public class InputKafkaConsumerConfig {
     private String saslMechanism;
     @Value("${spring.kafka.consumer.enable.auto.commit}")
     private String enableAutoCommit;
+    @Value("${spring.kafka.consumer.fetch.min.bytes}")
+    private int fetchMinBytes;
+    @Value("${spring.kafka.consumer.fetch.max.wait}")
+    private int fetchMaxWait;
+    @Value("${spring.kafka.consumer.max.poll.records}")
+    private int maxPollRecords;
     @Value("${spring.kafka.listener.ack.mode}")
     private String ackMode;
-    @Value("${spring.kafka.consumer.concurrency:1}")
+    @Value("${spring.kafka.consumer.concurrency}")
     private int consumerConcurrency;
+    @Value("${spring.kafka.consumer.recovery.interval}")
+    private long recoveryInterval;
+    @Value("${spring.kafka.consumer.recovery.attempts}")
+    private long maxRecoveryAttempts;
 
     @Bean
     public ConsumerFactory<String, String> inputConsumerFactory() {
@@ -51,6 +68,9 @@ public class InputKafkaConsumerConfig {
         configs.put(SaslConfigs.SASL_MECHANISM, saslMechanism);
         configs.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);
+        configs.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, fetchMinBytes);
+        configs.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, fetchMaxWait);
+        configs.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords);
 
         return new DefaultKafkaConsumerFactory<>(configs);
     }
 
@@ -60,8 +80,20 @@ public class InputKafkaConsumerConfig {
         ConcurrentKafkaListenerContainerFactory<String, String> factory =
                 new ConcurrentKafkaListenerContainerFactory<>();
         factory.setConsumerFactory(inputConsumerFactory());
-        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
+        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.BATCH);
         factory.setConcurrency(consumerConcurrency);
+        factory.setBatchListener(true);
+        factory.setCommonErrorHandler(defaultErrorHandler());
         return factory;
     }
+
+    @Bean
+    public DefaultErrorHandler defaultErrorHandler() {
+        DefaultErrorHandler handler = new DefaultErrorHandler(
+                (record, ex) -> LOGGER.error("recovery failed for message {}, {}", record.value(), ex.getMessage()),
+                new FixedBackOff(recoveryInterval, maxRecoveryAttempts)
+        );
+        handler.setLogLevel(KafkaException.Level.TRACE);
+        return handler;
+    }
 }
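
A minimal sketch of how a batch listener wired to this factory cooperates with the error handler above: throwing BatchListenerFailedException for the failing record lets DefaultErrorHandler commit the records processed before it and retry the rest according to the FixedBackOff before handing the record to the logging recoverer. The class, topic and group names below are illustrative placeholders, not part of this commit.

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.listener.BatchListenerFailedException;
import org.springframework.stereotype.Service;

@Service
public class ExampleBatchListener {

    @KafkaListener(topics = "example-topic", groupId = "example-group",
            batch = "true", containerFactory = "inputKafkaListenerContainerFactory")
    public void onBatch(ConsumerRecords<String, String> records) {
        for (ConsumerRecord<String, String> record : records) {
            try {
                process(record.value());
            }
            catch (Exception e) {
                // Points the DefaultErrorHandler at the failing record: earlier records
                // in the batch are committed, this one is retried per the FixedBackOff
                // and finally handed to the logging recoverer configured above.
                throw new BatchListenerFailedException(e.getMessage(), record);
            }
        }
    }

    private void process(String value) {
        // placeholder for real per-record work
    }
}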

View file

@@ -6,8 +6,11 @@ import com.amazonaws.AmazonServiceException;
 import com.amazonaws.services.s3.AmazonS3;
 import jakarta.annotation.PostConstruct;
 import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.retry.annotation.Backoff;
+import org.springframework.retry.annotation.Retryable;
 import org.springframework.stereotype.Service;
 import ru.micord.ervu.av.exception.FileUploadException;
+import ru.micord.ervu.av.exception.RetryableException;
 
 /**
  * @author r.latypov
@@ -32,6 +35,9 @@ public class S3Service {
         }
     }
 
+    @Retryable(retryFor = {RetryableException.class},
+            maxAttemptsExpression = "${av.retry.max.attempts.count}",
+            backoff = @Backoff(delayExpression = "${av.retry.delay.milliseconds}"))
     public String putFile(Path filePath, String key) throws FileUploadException {
         try {
             outClient.putObject(outBucketName, key, filePath.toFile());
@@ -43,6 +49,9 @@
         }
     }
 
+    @Retryable(retryFor = {RetryableException.class},
+            maxAttemptsExpression = "${av.retry.max.attempts.count}",
+            backoff = @Backoff(delayExpression = "${av.retry.delay.milliseconds}"))
     public void deleteFileByUrl(String url) {
         if (!url.startsWith(s3Prefix)) {
             throw new IllegalArgumentException("Invalid S3 URL: " + url);
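
The @Retryable annotations only take effect if Spring Retry is enabled somewhere in the context and a RetryableException actually reaches the proxy; neither piece is visible in these hunks, so the following is only a sketch of the assumed wiring (the class name is illustrative, not from this commit).

import org.springframework.context.annotation.Configuration;
import org.springframework.retry.annotation.EnableRetry;

// Assumed, not shown in this diff: without @EnableRetry the @Retryable
// annotations on S3Service are never proxied and the backoff settings are ignored.
@Configuration
@EnableRetry
public class RetryConfig {
}

Likewise, the catch blocks in putFile/deleteFileByUrl would have to rethrow transient failures (for example an AmazonServiceException whose isRetryable() returns true) as RetryableException for retryFor to match; that throw sits outside the visible hunks.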

View file

@@ -5,15 +5,22 @@ import java.nio.charset.StandardCharsets;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.Paths;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 
 import com.google.gson.Gson;
 import com.google.gson.GsonBuilder;
 import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.header.Headers;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
@@ -21,10 +28,10 @@ import org.springframework.beans.factory.annotation.Qualifier;
 import org.springframework.beans.factory.annotation.Value;
 import org.springframework.kafka.annotation.KafkaListener;
 import org.springframework.kafka.core.KafkaTemplate;
+import org.springframework.kafka.listener.BatchListenerFailedException;
 import org.springframework.kafka.support.Acknowledgment;
 import org.springframework.kafka.support.SendResult;
 import org.springframework.lang.NonNull;
-import org.springframework.messaging.handler.annotation.Headers;
 import org.springframework.stereotype.Service;
 import ru.micord.ervu.av.exception.FileUploadException;
 import ru.micord.ervu.av.exception.InvalidHttpFileUrlException;
@@ -41,10 +48,14 @@ public class FileUploadService {
     private static final int INFECTED_CODE = 72;
     private static final int PASS_PROTECTED_CODE = 73;
     private static final Logger LOGGER = LoggerFactory.getLogger(FileUploadService.class);
     @Value("${av.check.enabled}")
     private boolean avCheckEnabled;
+
+    @Value("${av.thread.count}")
+    private int threadCount;
+
     @Value("${file.saving.path}")
     private String fileSavingPath;
     private final KafkaTemplate<String, String> inKafkaTemplate;
     private final NewTopic inStatusTopic;
     private final FileManager fIleManager;
@@ -65,11 +76,27 @@
     }
 
     @KafkaListener(id = "${spring.kafka.consumer.group.id}", topics = "${kafka.in.topic.name}",
-            containerFactory = "inputKafkaListenerContainerFactory")
-    public void listenKafkaIn(String kafkaInMessage, Acknowledgment acknowledgment,
-            @Headers Map<String, Object> headers) {
-        DownloadRequest downloadRequest = new Gson().fromJson(kafkaInMessage, DownloadRequest.class);
+            batch = "true", containerFactory = "inputKafkaListenerContainerFactory")
+    public void listenKafkaIn(ConsumerRecords<String, String> records) {
+        ExecutorService executorService = Executors.newFixedThreadPool(threadCount);
+        try {
+            List<CompletableFuture<Void>> futures = new ArrayList<>();
+            records.forEach(record -> {
+                CompletableFuture<Void> future = CompletableFuture.runAsync(() -> handleRecord(record),
+                        executorService);
+                futures.add(future);
+            });
+            CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
+        }
+        finally {
+            executorService.shutdown();
+        }
+    }
+
+    private void handleRecord(ConsumerRecord<String, String> record) {
+        String kafkaInMessage = record.value();
+        DownloadRequest downloadRequest = new Gson().fromJson(kafkaInMessage, DownloadRequest.class);
 
         Map<Path, DownloadRequest.FileInfo> tempFilesMap = new HashMap<>();
 
         try {
@@ -124,7 +151,7 @@
                 fIleManager.deleteFile(fileUrl.fileUrl());
             }
         }
 
-        sendMessage(inStatusTopic.name(), downloadRequest, headers, inKafkaTemplate);
+        sendMessage(inStatusTopic.name(), downloadRequest, record.headers(), inKafkaTemplate);
         for (Path filePath : tempFilesMap.keySet()) {
             try {
@@ -143,15 +170,16 @@
                 fileInfo.setFileUrl(null);
                 fileInfo.setFileStatus(FileStatus.FILE_STATUS_11);
             }
 
-            sendMessage(inStatusTopic.name(), downloadRequest, headers, inKafkaTemplate);
+            sendMessage(inStatusTopic.name(), downloadRequest, record.headers(), inKafkaTemplate);
         }
         catch (FileUploadException | IOException e) {
             // assume the message has to be read again:
             // log the error and leave the message unconsumed
             LOGGER.error(e.getMessage(), e);
         }
-        finally {
-            acknowledgment.acknowledge();
+        catch (Exception e) {
+            // guarantees that records successfully processed before the error are committed
+            throw new BatchListenerFailedException(e.getMessage(), record);
         }
     }
@@ -170,17 +198,14 @@
         }
     }
 
-    private void sendMessage(@NonNull String topicName, Object object, Map<String, Object> headers,
+    private void sendMessage(@NonNull String topicName, Object object, Headers headers,
             KafkaTemplate<String, String> template) {
         ProducerRecord<String, String> record = new ProducerRecord<>(topicName,
                 new GsonBuilder().setPrettyPrinting().create().toJson(object));
         if (headers != null) {
-            headers.forEach((key, value) -> {
-                if (value instanceof byte[]) {
-                    record.headers().add(key, (byte[]) value);
-                }
-                else if (value != null) {
-                    record.headers().add(key, value.toString().getBytes(StandardCharsets.UTF_8));
+            headers.forEach(header -> {
+                if (header.value() != null) {
+                    record.headers().add(header.key(), header.value());
                 }
             });
         }
@@ -200,13 +225,22 @@
     }
 
     @KafkaListener(id = "${spring.kafka.s3.consumer.group.id}", topics = "${kafka.clear.s3.topic.name}",
-            containerFactory = "inputKafkaListenerContainerFactory")
-    public void listenKafkaIn(String kafkaMessage, Acknowledgment acknowledgment) {
-        DownloadRequest downloadRequest = new Gson().fromJson(kafkaMessage,
-                DownloadRequest.class
-        );
-        Arrays.stream(downloadRequest.filesInfo())
-                .forEach(fileInfo -> s3Service.deleteFileByUrl(fileInfo.getFileUrl()));
-        acknowledgment.acknowledge();
+            batch = "true", containerFactory = "inputKafkaListenerContainerFactory")
+    public void listenKafkaInForDelete(ConsumerRecords<String, String> records) {
+        records.forEach(record -> {
+            DownloadRequest downloadRequest = new Gson().fromJson(record.value(), DownloadRequest.class);
+            Arrays.stream(downloadRequest.filesInfo())
+                    .forEach(fileInfo -> {
+                        try {
+                            s3Service.deleteFileByUrl(fileInfo.getFileUrl());
+                        }
+                        catch (IllegalArgumentException e) {
+                            LOGGER.error(e.getMessage());
+                        }
+                        catch (Exception e) {
+                            throw new BatchListenerFailedException(e.getMessage(), record);
+                        }
+                    });
+        });
     }
 }
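
For reference, a self-contained sketch of the fan-out pattern listenKafkaIn now uses: one fixed pool per batch, one CompletableFuture per record, and allOf(...).join() to block until the whole batch is handled (surfacing the first failure) before the listener returns. The class name and the string batch below are illustrative stand-ins for the Kafka types, not code from this commit.

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ParallelBatchSketch {

    public static void main(String[] args) {
        List<String> batch = List.of("record-1", "record-2", "record-3"); // stands in for ConsumerRecords
        ExecutorService pool = Executors.newFixedThreadPool(2);           // av.thread.count in the service
        try {
            List<CompletableFuture<Void>> futures = new ArrayList<>();
            for (String record : batch) {
                futures.add(CompletableFuture.runAsync(() -> handle(record), pool));
            }
            // join() blocks until every record is handled; if any handler threw,
            // the exception resurfaces here and the whole batch fails together.
            CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
        }
        finally {
            pool.shutdown(); // the service likewise builds and discards a pool per batch
        }
    }

    private static void handle(String record) {
        System.out.println(Thread.currentThread().getName() + " handled " + record);
    }
}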

View file

@@ -9,9 +9,14 @@ spring.kafka.consumer.security.protocol=${AV_KAFKA_SECURITY_PROTOCOL:SASL_PLAINT
 spring.kafka.consumer.properties.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="${AV_KAFKA_USERNAME}" password="${AV_KAFKA_PASSWORD}";
 spring.kafka.consumer.properties.sasl.mechanism=${AV_KAFKA_SASL_MECHANISM:SCRAM-SHA-256}
 #
-spring.kafka.consumer.enable.auto.commit=false
+spring.kafka.consumer.enable.auto.commit=true
+spring.kafka.consumer.fetch.min.bytes=${AV_KAFKA_CONSUMER_FETCH_MIN_BYTES:1}
+spring.kafka.consumer.fetch.max.wait=${AV_KAFKA_CONSUMER_FETCH_MAX_WAIT:500}
+spring.kafka.consumer.max.poll.records=${AV_KAFKA_CONSUMER_MAX_POLL_RECORDS:500}
 spring.kafka.consumer.group.id=${AV_KAFKA_GROUP_ID:file-to-upload-consumers}
-spring.kafka.consumer.concurrency=${AV_KAFKA_TOPIC_CONSUMER_CONCURRENCY}
+spring.kafka.consumer.concurrency=${AV_KAFKA_TOPIC_CONSUMER_CONCURRENCY:1}
+spring.kafka.consumer.recovery.interval=${AV_KAFKA_CONSUMER_RECOVERY_INTERVAL:0}
+spring.kafka.consumer.recovery.attempts=${AV_KAFKA_CONSUMER_RECOVERY_ATTEMPTS:0}
 # kafka in listeners
 spring.kafka.listener.ack.mode=MANUAL_IMMEDIATE
 # kafka in producer (with possibility for default bean)
@@ -36,6 +41,7 @@ kafka.in.status.topic.name=${AV_KAFKA_STATUS_TOPIC_NAME}
 kafka.clear.s3.topic.name=${AV_KAFKA_CLEAR_S3_TOPIC_NAME}
 #
 av.check.enabled=${AV_CHECK_ENABLED:true}
+av.thread.count=${AV_THREAD_COUNT:8}
 av.rest.address=${AV_REST_ADDRESS}
 av.first.timeout.milliseconds=${AV_FIRST_TIMEOUT_MILLISECONDS:1000}
 av.retry.max.attempts.count=${AV_RETRY_MAX_ATTEMPTS_COUNT:10}