From 3667597949b7c15f3b55762c0feefebbeca381dd Mon Sep 17 00:00:00 2001
From: gulnaz
Date: Sat, 1 Nov 2025 11:15:27 +0300
Subject: [PATCH] SUPPORT-9518: configure batch listener; add parallel file handling; add retryable for s3

---
 micord.env                                    |  6 ++
 .../input/InputKafkaConsumerConfig.java       | 36 +++++++-
 .../java/ru/micord/ervu/av/s3/S3Service.java  |  9 ++
 .../ervu/av/service/FileUploadService.java    | 82 +++++++++++++------
 src/main/resources/application.properties     | 10 ++-
 5 files changed, 115 insertions(+), 28 deletions(-)

diff --git a/micord.env b/micord.env
index 37d7519..09c87be 100644
--- a/micord.env
+++ b/micord.env
@@ -7,6 +7,11 @@ AV_KAFKA_GROUP_ID=file-to-upload-consumers
 AV_KAFKA_TOPIC_NAME=file-to-upload
 AV_KAFKA_STATUS_TOPIC_NAME=ervu.lkrp.av-fileupload-status
 AV_KAFKA_TOPIC_CONSUMER_CONCURRENCY=1
+AV_KAFKA_CONSUMER_FETCH_MIN_BYTES=1
+AV_KAFKA_CONSUMER_FETCH_MAX_WAIT=500
+AV_KAFKA_CONSUMER_MAX_POLL_RECORDS=500
+AV_KAFKA_CONSUMER_RECOVERY_INTERVAL=0
+AV_KAFKA_CONSUMER_RECOVERY_ATTEMPTS=0
 
 ERVU_KAFKA_BOOTSTRAP_SERVERS=10.10.31.11:32609
 ERVU_KAFKA_SECURITY_PROTOCOL=SASL_PLAINTEXT
@@ -19,6 +24,7 @@ ERVU_KAFKA_SUCCESS_TOPIC_NAME=ervu.lkrp.download.request
 ERVU_KAFKA_RESPONSE_TOPIC_NAME=ervu.lkrp.download.response
 
 AV_CHECK_ENABLED=true
+AV_THREAD_COUNT=8
 AV_REST_ADDRESS=http://10.10.31.118:8085/scans
 AV_FIRST_TIMEOUT_MILLISECONDS=1000
 AV_RETRY_MAX_ATTEMPTS_COUNT=10

diff --git a/src/main/java/ru/micord/ervu/av/kafka/config/input/InputKafkaConsumerConfig.java b/src/main/java/ru/micord/ervu/av/kafka/config/input/InputKafkaConsumerConfig.java
index 2233795..2900f58 100644
--- a/src/main/java/ru/micord/ervu/av/kafka/config/input/InputKafkaConsumerConfig.java
+++ b/src/main/java/ru/micord/ervu/av/kafka/config/input/InputKafkaConsumerConfig.java
@@ -8,14 +8,19 @@ import org.apache.kafka.clients.CommonClientConfigs;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.common.config.SaslConfigs;
 import org.apache.kafka.common.serialization.StringDeserializer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Value;
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
+import org.springframework.kafka.KafkaException;
 import org.springframework.kafka.annotation.EnableKafka;
 import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
 import org.springframework.kafka.core.ConsumerFactory;
 import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
 import org.springframework.kafka.listener.ContainerProperties;
+import org.springframework.kafka.listener.DefaultErrorHandler;
+import org.springframework.util.backoff.FixedBackOff;
 
 /**
  * @author r.latypov
@@ -23,6 +28,8 @@ import org.springframework.kafka.listener.ContainerProperties;
 @Configuration
 @EnableKafka
 public class InputKafkaConsumerConfig {
+    private static final Logger LOGGER = LoggerFactory.getLogger(InputKafkaConsumerConfig.class);
+
     @Value("${spring.kafka.consumer.bootstrap.servers}")
     private List<String> bootstrapAddress;
     @Value("${spring.kafka.consumer.security.protocol}")
@@ -33,10 +40,20 @@ public class InputKafkaConsumerConfig {
     private String saslMechanism;
     @Value("${spring.kafka.consumer.enable.auto.commit}")
     private String enableAutoCommit;
+    @Value("${spring.kafka.consumer.fetch.min.bytes}")
+    private int fetchMinBytes;
+    @Value("${spring.kafka.consumer.fetch.max.wait}")
+    private int fetchMaxWait;
+    @Value("${spring.kafka.consumer.max.poll.records}")
+    private int maxPollRecords;
     @Value("${spring.kafka.listener.ack.mode}")
     private String ackMode;
-    @Value("${spring.kafka.consumer.concurrency:1}")
+    @Value("${spring.kafka.consumer.concurrency}")
     private int consumerConcurrency;
+    @Value("${spring.kafka.consumer.recovery.interval}")
+    private long recoveryInterval;
+    @Value("${spring.kafka.consumer.recovery.attempts}")
+    private long maxRecoveryAttempts;
 
     @Bean
     public ConsumerFactory<String, String> inputConsumerFactory() {
@@ -51,6 +68,9 @@ public class InputKafkaConsumerConfig {
         configs.put(SaslConfigs.SASL_MECHANISM, saslMechanism);
         configs.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);
+        configs.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, fetchMinBytes);
+        configs.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, fetchMaxWait);
+        configs.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords);
 
         return new DefaultKafkaConsumerFactory<>(configs);
     }
@@ -60,8 +80,20 @@ public class InputKafkaConsumerConfig {
         ConcurrentKafkaListenerContainerFactory<String, String> factory =
                 new ConcurrentKafkaListenerContainerFactory<>();
         factory.setConsumerFactory(inputConsumerFactory());
-        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
+        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.BATCH);
         factory.setConcurrency(consumerConcurrency);
+        factory.setBatchListener(true);
+        factory.setCommonErrorHandler(defaultErrorHandler());
         return factory;
     }
+
+    @Bean
+    public DefaultErrorHandler defaultErrorHandler() {
+        DefaultErrorHandler handler = new DefaultErrorHandler(
+                (record, ex) -> LOGGER.error("recovery failed for message {}, {}", record.value(), ex.getMessage()),
+                new FixedBackOff(recoveryInterval, maxRecoveryAttempts)
+        );
+        handler.setLogLevel(KafkaException.Level.TRACE);
+        return handler;
+    }
 }

diff --git a/src/main/java/ru/micord/ervu/av/s3/S3Service.java b/src/main/java/ru/micord/ervu/av/s3/S3Service.java
index 721b9dc..4357451 100644
--- a/src/main/java/ru/micord/ervu/av/s3/S3Service.java
+++ b/src/main/java/ru/micord/ervu/av/s3/S3Service.java
@@ -6,8 +6,11 @@ import com.amazonaws.AmazonServiceException;
 import com.amazonaws.services.s3.AmazonS3;
 import jakarta.annotation.PostConstruct;
 import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.retry.annotation.Backoff;
+import org.springframework.retry.annotation.Retryable;
 import org.springframework.stereotype.Service;
 import ru.micord.ervu.av.exception.FileUploadException;
+import ru.micord.ervu.av.exception.RetryableException;
 
 /**
  * @author r.latypov
@@ -32,6 +35,9 @@ public class S3Service {
         }
     }
 
+    @Retryable(retryFor = {RetryableException.class},
+            maxAttemptsExpression = "${av.retry.max.attempts.count}",
+            backoff = @Backoff(delayExpression = "${av.retry.delay.milliseconds}"))
     public String putFile(Path filePath, String key) throws FileUploadException {
         try {
             outClient.putObject(outBucketName, key, filePath.toFile());
@@ -43,6 +49,9 @@ public class S3Service {
         }
     }
 
+    @Retryable(retryFor = {RetryableException.class},
+            maxAttemptsExpression = "${av.retry.max.attempts.count}",
+            backoff = @Backoff(delayExpression = "${av.retry.delay.milliseconds}"))
     public void deleteFileByUrl(String url) {
         if (!url.startsWith(s3Prefix)) {
             throw new IllegalArgumentException("Некорректный S3 URL: " + url);

diff --git a/src/main/java/ru/micord/ervu/av/service/FileUploadService.java b/src/main/java/ru/micord/ervu/av/service/FileUploadService.java
index 78eef23..af748b5f 100644
--- a/src/main/java/ru/micord/ervu/av/service/FileUploadService.java
+++ b/src/main/java/ru/micord/ervu/av/service/FileUploadService.java
@@ -5,15 +5,22 @@ import java.nio.charset.StandardCharsets;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.Paths;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 
 import com.google.gson.Gson;
 import com.google.gson.GsonBuilder;
 import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.header.Headers;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
@@ -21,10 +28,10 @@ import org.springframework.beans.factory.annotation.Qualifier;
 import org.springframework.beans.factory.annotation.Value;
 import org.springframework.kafka.annotation.KafkaListener;
 import org.springframework.kafka.core.KafkaTemplate;
+import org.springframework.kafka.listener.BatchListenerFailedException;
 import org.springframework.kafka.support.Acknowledgment;
 import org.springframework.kafka.support.SendResult;
 import org.springframework.lang.NonNull;
-import org.springframework.messaging.handler.annotation.Headers;
 import org.springframework.stereotype.Service;
 import ru.micord.ervu.av.exception.FileUploadException;
 import ru.micord.ervu.av.exception.InvalidHttpFileUrlException;
@@ -41,10 +48,14 @@ public class FileUploadService {
     private static final int INFECTED_CODE = 72;
     private static final int PASS_PROTECTED_CODE = 73;
     private static final Logger LOGGER = LoggerFactory.getLogger(FileUploadService.class);
+
     @Value("${av.check.enabled}")
     private boolean avCheckEnabled;
+    @Value("${av.thread.count}")
+    private int threadCount;
     @Value("${file.saving.path}")
     private String fileSavingPath;
+
     private final KafkaTemplate<String, String> inKafkaTemplate;
     private final NewTopic inStatusTopic;
     private final FileManager fIleManager;
@@ -65,11 +76,27 @@ public class FileUploadService {
     }
 
     @KafkaListener(id = "${spring.kafka.consumer.group.id}", topics = "${kafka.in.topic.name}",
-            containerFactory = "inputKafkaListenerContainerFactory")
-    public void listenKafkaIn(String kafkaInMessage, Acknowledgment acknowledgment,
-            @Headers Map<String, Object> headers) {
-        DownloadRequest downloadRequest = new Gson().fromJson(kafkaInMessage, DownloadRequest.class);
+            batch = "true", containerFactory = "inputKafkaListenerContainerFactory")
+    public void listenKafkaIn(ConsumerRecords<String, String> records) {
+        ExecutorService executorService = Executors.newFixedThreadPool(threadCount);
+        try {
+            List<CompletableFuture<Void>> futures = new ArrayList<>();
+            records.forEach(record -> {
+                CompletableFuture<Void> future = CompletableFuture.runAsync(() -> handleRecord(record),
+                        executorService);
+                futures.add(future);
+            });
+            CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
+        }
+        finally {
+            executorService.shutdown();
+        }
+    }
+
+    private void handleRecord(ConsumerRecord<String, String> record) {
+        String kafkaInMessage = record.value();
+        DownloadRequest downloadRequest = new Gson().fromJson(kafkaInMessage, DownloadRequest.class);
 
         Map tempFilesMap = new HashMap<>();
         try {
@@ -124,7 +151,7 @@ public class FileUploadService {
                     fIleManager.deleteFile(fileUrl.fileUrl());
                 }
             }
-            sendMessage(inStatusTopic.name(), downloadRequest, headers, inKafkaTemplate);
+            sendMessage(inStatusTopic.name(), downloadRequest, record.headers(), inKafkaTemplate);
 
             for (Path filePath : tempFilesMap.keySet()) {
                 try {
@@ -143,15 +170,16 @@ public class FileUploadService {
                 fileInfo.setFileUrl(null);
                 fileInfo.setFileStatus(FileStatus.FILE_STATUS_11);
             }
-            sendMessage(inStatusTopic.name(), downloadRequest, headers, inKafkaTemplate);
+            sendMessage(inStatusTopic.name(), downloadRequest, record.headers(), inKafkaTemplate);
         }
         catch (FileUploadException | IOException e) {
             // assume the message has to be read again:
            // log the error and leave the message unconsumed
             LOGGER.error(e.getMessage(), e);
         }
-        finally {
-            acknowledgment.acknowledge();
+        catch (Exception e) {
+            // guarantees a commit of the records that were processed successfully before the error occurred
+            throw new BatchListenerFailedException(e.getMessage(), record);
         }
     }
@@ -170,17 +198,14 @@ public class FileUploadService {
         }
     }
 
-    private void sendMessage(@NonNull String topicName, Object object, Map<String, Object> headers,
+    private void sendMessage(@NonNull String topicName, Object object, Headers headers,
             KafkaTemplate<String, String> template) {
         ProducerRecord<String, String> record = new ProducerRecord<>(topicName,
                 new GsonBuilder().setPrettyPrinting().create().toJson(object));
         if (headers != null) {
-            headers.forEach((key, value) -> {
-                if (value instanceof byte[]) {
-                    record.headers().add(key, (byte[]) value);
-                }
-                else if (value != null) {
-                    record.headers().add(key, value.toString().getBytes(StandardCharsets.UTF_8));
+            headers.forEach(header -> {
+                if (header.value() != null) {
+                    record.headers().add(header.key(), header.value());
                 }
             });
         }
@@ -200,13 +225,22 @@ public class FileUploadService {
     }
 
     @KafkaListener(id = "${spring.kafka.s3.consumer.group.id}", topics = "${kafka.clear.s3.topic.name}",
-            containerFactory = "inputKafkaListenerContainerFactory")
-    public void listenKafkaIn(String kafkaMessage, Acknowledgment acknowledgment) {
-        DownloadRequest downloadRequest = new Gson().fromJson(kafkaMessage,
-                DownloadRequest.class
-        );
-        Arrays.stream(downloadRequest.filesInfo())
-                .forEach(fileInfo -> s3Service.deleteFileByUrl(fileInfo.getFileUrl()));
-        acknowledgment.acknowledge();
+            batch = "true", containerFactory = "inputKafkaListenerContainerFactory")
+    public void listenKafkaInForDelete(ConsumerRecords<String, String> records) {
+        records.forEach(record -> {
+            DownloadRequest downloadRequest = new Gson().fromJson(record.value(), DownloadRequest.class);
+            Arrays.stream(downloadRequest.filesInfo())
+                    .forEach(fileInfo -> {
+                        try {
+                            s3Service.deleteFileByUrl(fileInfo.getFileUrl());
+                        }
+                        catch (IllegalArgumentException e) {
+                            LOGGER.error(e.getMessage());
+                        }
+                        catch (Exception e) {
+                            throw new BatchListenerFailedException(e.getMessage(), record);
+                        }
+                    });
+        });
     }
 }

diff --git a/src/main/resources/application.properties b/src/main/resources/application.properties
index fe3c07d..f71ee4f 100644
--- a/src/main/resources/application.properties
+++ b/src/main/resources/application.properties
@@ -9,9 +9,14 @@ spring.kafka.consumer.security.protocol=${AV_KAFKA_SECURITY_PROTOCOL:SASL_PLAINT
 spring.kafka.consumer.properties.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="${AV_KAFKA_USERNAME}" password="${AV_KAFKA_PASSWORD}";
 spring.kafka.consumer.properties.sasl.mechanism=${AV_KAFKA_SASL_MECHANISM:SCRAM-SHA-256}
 #
-spring.kafka.consumer.enable.auto.commit=false
+spring.kafka.consumer.enable.auto.commit=true
+spring.kafka.consumer.fetch.min.bytes=${AV_KAFKA_CONSUMER_FETCH_MIN_BYTES:1}
+spring.kafka.consumer.fetch.max.wait=${AV_KAFKA_CONSUMER_FETCH_MAX_WAIT:500}
+spring.kafka.consumer.max.poll.records=${AV_KAFKA_CONSUMER_MAX_POLL_RECORDS:500}
 spring.kafka.consumer.group.id=${AV_KAFKA_GROUP_ID:file-to-upload-consumers}
-spring.kafka.consumer.concurrency=${AV_KAFKA_TOPIC_CONSUMER_CONCURRENCY}
+spring.kafka.consumer.concurrency=${AV_KAFKA_TOPIC_CONSUMER_CONCURRENCY:1}
+spring.kafka.consumer.recovery.interval=${AV_KAFKA_CONSUMER_RECOVERY_INTERVAL:0}
+spring.kafka.consumer.recovery.attempts=${AV_KAFKA_CONSUMER_RECOVERY_ATTEMPTS:0}
 # kafka in listeners
 spring.kafka.listener.ack.mode=MANUAL_IMMEDIATE
 # kafka in producer (with possibility for default bean)
@@ -36,6 +41,7 @@ kafka.in.status.topic.name=${AV_KAFKA_STATUS_TOPIC_NAME}
 kafka.clear.s3.topic.name=${AV_KAFKA_CLEAR_S3_TOPIC_NAME}
 #
 av.check.enabled=${AV_CHECK_ENABLED:true}
+av.thread.count=${AV_THREAD_COUNT:8}
 av.rest.address=${AV_REST_ADDRESS}
 av.first.timeout.milliseconds=${AV_FIRST_TIMEOUT_MILLISECONDS:1000}
 av.retry.max.attempts.count=${AV_RETRY_MAX_ATTEMPTS_COUNT:10}
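
Note on the error-handling contract the patch relies on: with factory.setBatchListener(true) and the DefaultErrorHandler bean above, a batch listener signals a failure by throwing BatchListenerFailedException for the record that could not be processed; the records earlier in the batch are then treated as successfully consumed, and only the failed record is retried according to the FixedBackOff before being handed to the recoverer (the logging lambda in defaultErrorHandler()). The sketch below is a minimal illustration of that contract against the inputKafkaListenerContainerFactory from this patch; the class, topic and group names (DemoBatchListener, demo-topic, demo-group) are placeholders and are not part of the change.

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.listener.BatchListenerFailedException;
import org.springframework.stereotype.Component;

@Component
public class DemoBatchListener {
    private static final Logger LOGGER = LoggerFactory.getLogger(DemoBatchListener.class);

    @KafkaListener(topics = "demo-topic", groupId = "demo-group", batch = "true",
            containerFactory = "inputKafkaListenerContainerFactory")
    public void listen(ConsumerRecords<String, String> records) {
        for (ConsumerRecord<String, String> record : records) {
            try {
                // per-record business logic goes here
                LOGGER.info("processing partition {} offset {}", record.partition(), record.offset());
            }
            catch (Exception e) {
                // Tell DefaultErrorHandler exactly which record failed: the records handled
                // before it are considered done, and this one is retried per the FixedBackOff
                // before being passed to the recoverer configured in defaultErrorHandler().
                throw new BatchListenerFailedException(e.getMessage(), record);
            }
        }
    }
}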