This commit is contained in:
Eduard Tihomirov 2025-04-22 12:48:14 +03:00
parent bafe64e6af
commit d1716cd9c8
2 changed files with 22 additions and 18 deletions

View file

@ -14,14 +14,12 @@ public class ErvuDirectoriesListener {
@Autowired
private ErvuDirectoriesService ervuDirectoriesService;
@KafkaListener(id = "${kafka.domain.group.id}", topics = "${kafka.domain.reconciliation}",
autoStartup = "false")
@KafkaListener(id = "${kafka.domain.group.id}", topics = "${kafka.domain.reconciliation}")
public void listenKafkaDomain(String kafkaMessage) {
ervuDirectoriesService.upsertKafkaDomainMessage(kafkaMessage);
}
@KafkaListener(id = "${kafka.role.group.id}", topics = "${kafka.role.reconciliation}",
autoStartup = "false")
@KafkaListener(id = "${kafka.role.group.id}", topics = "${kafka.role.reconciliation}")
public void listenKafkaRole(String kafkaMessage) {
ervuDirectoriesService.upsertKafkaRoleMessage(kafkaMessage);
}

View file

@ -1,9 +1,8 @@
package ru.micord.ervu.account_applications.kafka;
import javax.annotation.PostConstruct;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.DependsOn;
import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
import org.springframework.kafka.listener.MessageListenerContainer;
import org.springframework.stereotype.Component;
import ru.micord.ervu.account_applications.service.ErvuDirectoriesService;
@ -13,22 +12,29 @@ import ru.micord.ervu.account_applications.service.ErvuDirectoriesService;
@Component
@DependsOn("ervuDirectoriesListener")
public class KafkaConsumerInitializer {
@Value("${kafka.domain.group.id}")
private String domainGroupId;
@Value("${kafka.role.group.id}")
private String roleGroupId;
@Value("${load.directories:true}")
private Boolean loadDirectories;
public KafkaConsumerInitializer(KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry, ErvuDirectoriesService ervuDirectoriesService) {
MessageListenerContainer listenerContainerDomain = kafkaListenerEndpointRegistry.getListenerContainer(domainGroupId);
MessageListenerContainer listenerContainerRole = kafkaListenerEndpointRegistry.getListenerContainer(roleGroupId);
listenerContainerDomain.start();
listenerContainerRole.start();
private final ErvuDirectoriesService ervuDirectoriesService;
public KafkaConsumerInitializer(ErvuDirectoriesService ervuDirectoriesService) {
this.ervuDirectoriesService = ervuDirectoriesService;
}
@PostConstruct
public void init() {
if (loadDirectories) {
new Thread(ervuDirectoriesService::updateDirectories).start();
new Thread(this::runWithSleep).start();
}
}
private void runWithSleep() {
try {
Thread.sleep(10000);
}
catch (InterruptedException e) {
throw new RuntimeException(e);
}
ervuDirectoriesService.updateDirectories();
}
}