diff --git a/src/main/java/ru/micord/ervu/av/service/FileUploadService.java b/src/main/java/ru/micord/ervu/av/service/FileUploadService.java index af748b5f..5cc07d4 100644 --- a/src/main/java/ru/micord/ervu/av/service/FileUploadService.java +++ b/src/main/java/ru/micord/ervu/av/service/FileUploadService.java @@ -13,9 +13,12 @@ import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; import com.google.gson.Gson; import com.google.gson.GsonBuilder; +import jakarta.annotation.PostConstruct; +import jakarta.annotation.PreDestroy; import org.apache.kafka.clients.admin.NewTopic; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; @@ -61,6 +64,7 @@ public class FileUploadService { private final FileManager fIleManager; private final ReceiveScanReportRetryable receiveScanReportRetryable; private final S3Service s3Service; + private ExecutorService executorService; @Autowired public FileUploadService( @@ -75,23 +79,21 @@ public class FileUploadService { this.fIleManager = fIleManager; } + @PostConstruct + public void init() { + executorService = Executors.newFixedThreadPool(threadCount); + } + @KafkaListener(id = "${spring.kafka.consumer.group.id}", topics = "${kafka.in.topic.name}", batch = "true", containerFactory = "inputKafkaListenerContainerFactory") public void listenKafkaIn(ConsumerRecords records) { - ExecutorService executorService = Executors.newFixedThreadPool(threadCount); - - try { - List> futures = new ArrayList<>(); - records.forEach(record -> { - CompletableFuture future = CompletableFuture.runAsync(() -> handleRecord(record), - executorService); - futures.add(future); - }); - CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join(); - } - finally { - executorService.shutdown(); - } + List> futures = new ArrayList<>(); + records.forEach(record -> { + CompletableFuture future = CompletableFuture.runAsync(() -> handleRecord(record), + executorService); + futures.add(future); + }); + CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join(); } private void handleRecord(ConsumerRecord record) { @@ -243,4 +245,18 @@ public class FileUploadService { }); }); } + + @PreDestroy + public void destroy() { + if (executorService != null && !executorService.isShutdown()) { + executorService.shutdown(); + + try { + executorService.awaitTermination(1, TimeUnit.SECONDS); + } + catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } + } }