From d1716cd9c8aadcc1536a405db9b44f2c5c726325 Mon Sep 17 00:00:00 2001 From: Eduard Tihomirov Date: Tue, 22 Apr 2025 12:48:14 +0300 Subject: [PATCH] fix --- .../kafka/ErvuDirectoriesListener.java | 6 ++-- .../kafka/KafkaConsumerInitializer.java | 34 +++++++++++-------- 2 files changed, 22 insertions(+), 18 deletions(-) diff --git a/backend/src/main/java/ru/micord/ervu/account_applications/kafka/ErvuDirectoriesListener.java b/backend/src/main/java/ru/micord/ervu/account_applications/kafka/ErvuDirectoriesListener.java index ab690889..b61f7aed 100644 --- a/backend/src/main/java/ru/micord/ervu/account_applications/kafka/ErvuDirectoriesListener.java +++ b/backend/src/main/java/ru/micord/ervu/account_applications/kafka/ErvuDirectoriesListener.java @@ -14,14 +14,12 @@ public class ErvuDirectoriesListener { @Autowired private ErvuDirectoriesService ervuDirectoriesService; - @KafkaListener(id = "${kafka.domain.group.id}", topics = "${kafka.domain.reconciliation}", - autoStartup = "false") + @KafkaListener(id = "${kafka.domain.group.id}", topics = "${kafka.domain.reconciliation}") public void listenKafkaDomain(String kafkaMessage) { ervuDirectoriesService.upsertKafkaDomainMessage(kafkaMessage); } - @KafkaListener(id = "${kafka.role.group.id}", topics = "${kafka.role.reconciliation}", - autoStartup = "false") + @KafkaListener(id = "${kafka.role.group.id}", topics = "${kafka.role.reconciliation}") public void listenKafkaRole(String kafkaMessage) { ervuDirectoriesService.upsertKafkaRoleMessage(kafkaMessage); } diff --git a/backend/src/main/java/ru/micord/ervu/account_applications/kafka/KafkaConsumerInitializer.java b/backend/src/main/java/ru/micord/ervu/account_applications/kafka/KafkaConsumerInitializer.java index 1b443443..81e21891 100644 --- a/backend/src/main/java/ru/micord/ervu/account_applications/kafka/KafkaConsumerInitializer.java +++ b/backend/src/main/java/ru/micord/ervu/account_applications/kafka/KafkaConsumerInitializer.java @@ -1,9 +1,8 @@ package ru.micord.ervu.account_applications.kafka; +import javax.annotation.PostConstruct; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.DependsOn; -import org.springframework.kafka.config.KafkaListenerEndpointRegistry; -import org.springframework.kafka.listener.MessageListenerContainer; import org.springframework.stereotype.Component; import ru.micord.ervu.account_applications.service.ErvuDirectoriesService; @@ -13,22 +12,29 @@ import ru.micord.ervu.account_applications.service.ErvuDirectoriesService; @Component @DependsOn("ervuDirectoriesListener") public class KafkaConsumerInitializer { - @Value("${kafka.domain.group.id}") - private String domainGroupId; - - @Value("${kafka.role.group.id}") - private String roleGroupId; - @Value("${load.directories:true}") private Boolean loadDirectories; - public KafkaConsumerInitializer(KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry, ErvuDirectoriesService ervuDirectoriesService) { - MessageListenerContainer listenerContainerDomain = kafkaListenerEndpointRegistry.getListenerContainer(domainGroupId); - MessageListenerContainer listenerContainerRole = kafkaListenerEndpointRegistry.getListenerContainer(roleGroupId); - listenerContainerDomain.start(); - listenerContainerRole.start(); + private final ErvuDirectoriesService ervuDirectoriesService; + + public KafkaConsumerInitializer(ErvuDirectoriesService ervuDirectoriesService) { + this.ervuDirectoriesService = ervuDirectoriesService; + } + + @PostConstruct + public void init() { if (loadDirectories) { - new Thread(ervuDirectoriesService::updateDirectories).start(); + new Thread(this::runWithSleep).start(); } } + + private void runWithSleep() { + try { + Thread.sleep(10000); + } + catch (InterruptedException e) { + throw new RuntimeException(e); + } + ervuDirectoriesService.updateDirectories(); + } }