SUPPORT-9518: init executor service in postconstruct

This commit is contained in:
gulnaz 2025-11-01 12:51:23 +03:00
parent d94a164876
commit 83bd48db19

View file

@ -13,9 +13,12 @@ import java.util.Map;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import com.google.gson.Gson; import com.google.gson.Gson;
import com.google.gson.GsonBuilder; import com.google.gson.GsonBuilder;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.PreDestroy;
import org.apache.kafka.clients.admin.NewTopic; import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.ConsumerRecords;
@ -61,6 +64,7 @@ public class FileUploadService {
private final FileManager fIleManager; private final FileManager fIleManager;
private final ReceiveScanReportRetryable receiveScanReportRetryable; private final ReceiveScanReportRetryable receiveScanReportRetryable;
private final S3Service s3Service; private final S3Service s3Service;
private ExecutorService executorService;
@Autowired @Autowired
public FileUploadService( public FileUploadService(
@ -75,23 +79,21 @@ public class FileUploadService {
this.fIleManager = fIleManager; this.fIleManager = fIleManager;
} }
@PostConstruct
public void init() {
executorService = Executors.newFixedThreadPool(threadCount);
}
@KafkaListener(id = "${spring.kafka.consumer.group.id}", topics = "${kafka.in.topic.name}", @KafkaListener(id = "${spring.kafka.consumer.group.id}", topics = "${kafka.in.topic.name}",
batch = "true", containerFactory = "inputKafkaListenerContainerFactory") batch = "true", containerFactory = "inputKafkaListenerContainerFactory")
public void listenKafkaIn(ConsumerRecords<String, String> records) { public void listenKafkaIn(ConsumerRecords<String, String> records) {
ExecutorService executorService = Executors.newFixedThreadPool(threadCount); List<CompletableFuture<Void>> futures = new ArrayList<>();
records.forEach(record -> {
try { CompletableFuture<Void> future = CompletableFuture.runAsync(() -> handleRecord(record),
List<CompletableFuture<Void>> futures = new ArrayList<>(); executorService);
records.forEach(record -> { futures.add(future);
CompletableFuture<Void> future = CompletableFuture.runAsync(() -> handleRecord(record), });
executorService); CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
futures.add(future);
});
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
}
finally {
executorService.shutdown();
}
} }
private void handleRecord(ConsumerRecord<String, String> record) { private void handleRecord(ConsumerRecord<String, String> record) {
@ -243,4 +245,18 @@ public class FileUploadService {
}); });
}); });
} }
@PreDestroy
public void destroy() {
if (executorService != null && !executorService.isShutdown()) {
executorService.shutdown();
try {
executorService.awaitTermination(1, TimeUnit.SECONDS);
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
} }