SUPPORT-9122:fix

This commit is contained in:
adel.ka 2025-04-23 20:13:28 +03:00
parent 57fe1628de
commit 9e2c1189bf

View file

@ -4,11 +4,8 @@ import javax.annotation.PostConstruct;
import ervu_business_metrics.config.IdmReconcileEnabledCondition;
import ervu_business_metrics.service.IdmDirectoriesService;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Conditional;
import org.springframework.context.annotation.DependsOn;
import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
import org.springframework.kafka.listener.MessageListenerContainer;
import org.springframework.stereotype.Component;
/**
@ -18,37 +15,24 @@ import org.springframework.stereotype.Component;
@DependsOn("idmDirectoriesListener")
@Conditional(IdmReconcileEnabledCondition.class)
public class KafkaConsumerInitializer {
private final KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry;
private final IdmDirectoriesService idmDirectoriesService;
@Value("${kafka.domain.group.id}")
private String domainGroupId;
@Value("${kafka.role.group.id}")
private String roleGroupId;
@Value("${kafka.account.group.id}")
private String accountGroupId;
public KafkaConsumerInitializer(KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry,
IdmDirectoriesService idmDirectoriesService) {
this.kafkaListenerEndpointRegistry = kafkaListenerEndpointRegistry;
public KafkaConsumerInitializer(IdmDirectoriesService idmDirectoriesService) {
this.idmDirectoriesService = idmDirectoriesService;
}
@PostConstruct
public void initialize() {
startKafkaListener(domainGroupId);
startKafkaListener(roleGroupId);
startKafkaListener(accountGroupId);
new Thread(idmDirectoriesService::updateDirectories).start();
new Thread(this::runWithSleep).start();
}
private void startKafkaListener(String listenerId) {
MessageListenerContainer container = kafkaListenerEndpointRegistry.getListenerContainer(listenerId);
if (container != null) {
container.start();
private void runWithSleep() {
try {
Thread.sleep(10000);
}
else {
throw new IllegalStateException("Kafka Listener not found: " + listenerId);
catch (InterruptedException e) {
throw new RuntimeException(e);
}
idmDirectoriesService.updateDirectories();
}
}