From 9ea36b152d3dbd0a9507a61b548c80a2123aeaf9 Mon Sep 17 00:00:00 2001 From: Eduard Tihomirov Date: Thu, 27 Mar 2025 15:01:51 +0300 Subject: [PATCH] SUPPORT-8696: Fix --- backend/pom.xml | 8 + .../kafka/KafkaConfig.java | 54 +++++ ...Response.java => RecruitmentResponse.java} | 95 ++------- ...RoleApiResponse.java => RoleResponse.java} | 41 +--- .../service/ErvuDirectoriesService.java | 190 +++++++++--------- .../ErvuDirectoriesUpdateShedulerService.java | 7 +- pom.xml | 23 +++ 7 files changed, 209 insertions(+), 209 deletions(-) create mode 100644 backend/src/main/java/ru/micord/ervu/account_applications/kafka/KafkaConfig.java rename backend/src/main/java/ru/micord/ervu/account_applications/model/{RecruitmentApiResponse.java => RecruitmentResponse.java} (77%) rename backend/src/main/java/ru/micord/ervu/account_applications/model/{RoleApiResponse.java => RoleResponse.java} (62%) diff --git a/backend/pom.xml b/backend/pom.xml index 329f3d4a..54be276c 100644 --- a/backend/pom.xml +++ b/backend/pom.xml @@ -228,6 +228,14 @@ org.springframework.retry spring-retry + + org.springframework.kafka + spring-kafka + + + org.apache.kafka + kafka-clients + ${project.parent.artifactId} diff --git a/backend/src/main/java/ru/micord/ervu/account_applications/kafka/KafkaConfig.java b/backend/src/main/java/ru/micord/ervu/account_applications/kafka/KafkaConfig.java new file mode 100644 index 00000000..3ec477f1 --- /dev/null +++ b/backend/src/main/java/ru/micord/ervu/account_applications/kafka/KafkaConfig.java @@ -0,0 +1,54 @@ +package ru.micord.ervu.account_applications.kafka; + +import java.util.HashMap; +import java.util.Map; + +import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.common.config.SaslConfigs; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.kafka.annotation.EnableKafka; +import org.springframework.kafka.core.ConsumerFactory; +import org.springframework.kafka.core.DefaultKafkaConsumerFactory; + +/** + * @author Eduard Tihomirov + */ +@Configuration +@EnableKafka +public class KafkaConfig { + @Value("${kafka.hosts}") + private String bootstrapServers; + @Value("${kafka.auth_sec_proto}") + private String securityProtocol; + @Value("${kafka.auth_sasl_module}") + private String loginModule; + @Value("${kafka.user}") + private String username; + @Value("${kafka.pass}") + private String password; + @Value("${kafka.auth_sasl_mech}") + private String saslMechanism; + + @Bean + public ConsumerFactory consumerFactory() { + return new DefaultKafkaConsumerFactory<>(consumerConfigs()); + } + + @Bean + public Map consumerConfigs() { + Map props = new HashMap<>(); + props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); + props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); + props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); + props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, securityProtocol); + props.put(SaslConfigs.SASL_JAAS_CONFIG, loginModule + " required username=\"" + + username + "\" password=\"" + password + "\";"); + props.put(SaslConfigs.SASL_MECHANISM, saslMechanism); + + return props; + } +} diff --git a/backend/src/main/java/ru/micord/ervu/account_applications/model/RecruitmentApiResponse.java b/backend/src/main/java/ru/micord/ervu/account_applications/model/RecruitmentResponse.java similarity index 77% rename from backend/src/main/java/ru/micord/ervu/account_applications/model/RecruitmentApiResponse.java rename to backend/src/main/java/ru/micord/ervu/account_applications/model/RecruitmentResponse.java index 90986eb2..c0a126a5 100644 --- a/backend/src/main/java/ru/micord/ervu/account_applications/model/RecruitmentApiResponse.java +++ b/backend/src/main/java/ru/micord/ervu/account_applications/model/RecruitmentResponse.java @@ -7,43 +7,16 @@ import com.fasterxml.jackson.annotation.JsonIgnoreProperties; * @author Eduard Tihomirov */ @JsonIgnoreProperties(ignoreUnknown = true) -public class RecruitmentApiResponse { - private int totalRows; - private int page; - private int perPage; - private List records; +public class RecruitmentResponse { + private List data; - public int getTotalRows() { - return totalRows; + public List getData() { + return data; } - public void setTotalRows(int totalRows) { - this.totalRows = totalRows; - } - - public int getPage() { - return page; - } - - public void setPage(int page) { - this.page = page; - } - - public int getPerPage() { - return perPage; - } - - public void setPerPage(int perPage) { - this.perPage = perPage; - } - - public List getRecords() { - return records; - } - - public void setRecords( - List records) { - this.records = records; + public void setData( + List data) { + this.data = data; } @JsonIgnoreProperties(ignoreUnknown = true) @@ -78,8 +51,9 @@ public class RecruitmentApiResponse { private String militaryCode; private Long createDate; private Long modified; - private ParentRecord parent; - private RegionInfo regionInfo; + private String parent; + private String regionId; + private String regionCode; public String getId() { return id; @@ -273,21 +247,21 @@ public class RecruitmentApiResponse { this.reportsEnabled = reportsEnabled; } - public ParentRecord getParent() { + public String getParent() { return parent; } - public void setParent(ParentRecord parent) { + public void setParent(String parent) { this.parent = parent; } - public RegionInfo getRegionInfo() { - return regionInfo; + public String getRegionId() { + return regionId; } - public void setRegionInfo( - RegionInfo regionInfo) { - this.regionInfo = regionInfo; + public void setRegionId( + String regionId) { + this.regionId = regionId; } public String getSubpoenaSeriesCode() { @@ -338,39 +312,12 @@ public class RecruitmentApiResponse { this.modified = modified; } - @JsonIgnoreProperties(ignoreUnknown = true) - public static class ParentRecord { - private String id; - - public String getId() { - return id; - } - - public void setId(String id) { - this.id = id; - } + public String getRegionCode() { + return regionCode; } - @JsonIgnoreProperties(ignoreUnknown = true) - public static class RegionInfo { - private String regionId; - private String regionCode; - - public String getRegionId() { - return regionId; - } - - public void setRegionId(String regionId) { - this.regionId = regionId; - } - - public String getRegionCode() { - return regionCode; - } - - public void setRegionCode(String regionCode) { - this.regionCode = regionCode; - } + public void setRegionCode(String regionCode) { + this.regionCode = regionCode; } } } diff --git a/backend/src/main/java/ru/micord/ervu/account_applications/model/RoleApiResponse.java b/backend/src/main/java/ru/micord/ervu/account_applications/model/RoleResponse.java similarity index 62% rename from backend/src/main/java/ru/micord/ervu/account_applications/model/RoleApiResponse.java rename to backend/src/main/java/ru/micord/ervu/account_applications/model/RoleResponse.java index 571705f5..44917e1a 100644 --- a/backend/src/main/java/ru/micord/ervu/account_applications/model/RoleApiResponse.java +++ b/backend/src/main/java/ru/micord/ervu/account_applications/model/RoleResponse.java @@ -8,43 +8,16 @@ import com.fasterxml.jackson.annotation.JsonIgnoreProperties; * @author Eduard Tihomirov */ @JsonIgnoreProperties(ignoreUnknown = true) -public class RoleApiResponse { - private int totalRows; - private int page; - private int perPage; - private List records; +public class RoleResponse { + private List data; - public int getTotalRows() { - return totalRows; + public List getData() { + return data; } - public void setTotalRows(int totalRows) { - this.totalRows = totalRows; - } - - public int getPage() { - return page; - } - - public void setPage(int page) { - this.page = page; - } - - public int getPerPage() { - return perPage; - } - - public void setPerPage(int perPage) { - this.perPage = perPage; - } - - public List getRecords() { - return records; - } - - public void setRecords( - List records) { - this.records = records; + public void setData( + List data) { + this.data = data; } @JsonIgnoreProperties(ignoreUnknown = true) public static class Record { diff --git a/backend/src/main/java/ru/micord/ervu/account_applications/service/ErvuDirectoriesService.java b/backend/src/main/java/ru/micord/ervu/account_applications/service/ErvuDirectoriesService.java index 791ab8ce..2e8aefcd 100644 --- a/backend/src/main/java/ru/micord/ervu/account_applications/service/ErvuDirectoriesService.java +++ b/backend/src/main/java/ru/micord/ervu/account_applications/service/ErvuDirectoriesService.java @@ -1,8 +1,6 @@ package ru.micord.ervu.account_applications.service; -import java.net.URI; -import java.net.URLEncoder; -import java.nio.charset.StandardCharsets; +import java.lang.invoke.MethodHandles; import java.sql.Timestamp; import java.time.Instant; import java.util.ArrayList; @@ -10,19 +8,27 @@ import java.util.Arrays; import java.util.List; import java.util.UUID; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; import org.jooq.DSLContext; import org.jooq.Record2; import org.jooq.Result; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; +import org.springframework.http.HttpEntity; +import org.springframework.http.HttpHeaders; +import org.springframework.http.MediaType; +import org.springframework.http.ResponseEntity; +import org.springframework.kafka.annotation.KafkaListener; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; import org.springframework.web.client.RestTemplate; -import org.springframework.web.util.UriComponentsBuilder; import ru.micord.ervu.account_applications.db_beans.public_.tables.records.RecruitmentRecord; import ru.micord.ervu.account_applications.db_beans.public_.tables.records.UserApplicationRoleRecord; -import ru.micord.ervu.account_applications.model.RecruitmentApiResponse; -import ru.micord.ervu.account_applications.model.RoleApiResponse; +import ru.micord.ervu.account_applications.model.RecruitmentResponse; +import ru.micord.ervu.account_applications.model.RoleResponse; import static ru.micord.ervu.account_applications.db_beans.public_.tables.Recruitment.RECRUITMENT; import static ru.micord.ervu.account_applications.db_beans.public_.tables.UserApplicationRole.USER_APPLICATION_ROLE; @@ -32,69 +38,83 @@ import static ru.micord.ervu.account_applications.db_beans.public_.tables.UserAp */ @Service public class ErvuDirectoriesService { + private static final Logger LOGGER = LoggerFactory.getLogger( + MethodHandles.lookup().lookupClass()); @Value("${ervu.url}") private String ervuUrl; - @Value("${ervu.page.size:50}") - private Integer perPage; - @Value("${ervu.recruitment.schemas:Organization, Department, Region, Ministry}") - private String schemas; + @Value("${ervu.collection:domain, role}") + private String ervuCollection; @Autowired private DSLContext dsl; - @Autowired private RestTemplate restTemplate; + @Autowired + private ObjectMapper objectMapper; + Result> domainIds = null; + List roleIds = null; + - @Transactional public void updateDirectories() { - fetchAndUpsetRecruitmentData(); - fetchAndUpsetRoleData(); - } - - public void fetchAndUpsetRecruitmentData() { - String[] schemaArray = schemas.split(","); - Arrays.stream(schemaArray).forEach(schema -> { - schema = schema.trim(); - int page = 1; - boolean hasMorePages; - Result> ids = dsl.select(RECRUITMENT.ID, RECRUITMENT.IDM_ID) - .from(RECRUITMENT) - .fetch(); - try { - do { - String url = ervuUrl + "/service/idm/domains"; - String encodedSchemaValue = URLEncoder.encode("schema=in=(" + schema + ")", - StandardCharsets.UTF_8 - ); - URI uri = UriComponentsBuilder.fromHttpUrl(url) - .queryParam("query", encodedSchemaValue) - .queryParam("expand", "parent") - .queryParam("page", page) - .queryParam("perPage", perPage) - .build(true) - .toUri(); - - RecruitmentApiResponse recruitmentApiResponse = restTemplate.getForObject(uri, - RecruitmentApiResponse.class - ); - - if (recruitmentApiResponse != null && recruitmentApiResponse.getRecords() != null) { - upsertRecruitmentData(recruitmentApiResponse.getRecords(), ids); - hasMorePages = recruitmentApiResponse.getRecords().size() == perPage; - } - else { - hasMorePages = false; - } - page++; + try { + initIds(); + String[] ervuCollectionArray = ervuCollection.split(","); + Arrays.stream(ervuCollectionArray).forEach(ervuCollection -> { + String targetUrl = ervuUrl + "/service/idm/reconcile/"+ ervuCollection + "/to/kafka/v1"; + HttpHeaders headers = new HttpHeaders(); + headers.setContentType(MediaType.APPLICATION_JSON); + String emptyJson = "{}"; + HttpEntity requestEntity = new HttpEntity<>(emptyJson, headers); + ResponseEntity response = restTemplate.postForEntity(targetUrl, requestEntity, String.class); + if (!response.getStatusCode().is2xxSuccessful()) { + LOGGER.error( + "Error in " + ervuCollection + " request. Status code: " + response.getStatusCode() + + "; Body: " + response.getBody()); } - while (hasMorePages); - } - catch (Exception e) { - throw new RuntimeException(e); - } - }); + }); + } + catch (Exception e) { + LOGGER.error(e.getMessage()); + } + } - private void upsertRecruitmentData(List recordList, + @KafkaListener(id = "${kafka.consumer_group_id}", topics = "${kafka.domain.reconciliation}") + @Transactional + public void listenKafkaDomain(String kafkaMessage) { + RecruitmentResponse response = null; + try { + response = objectMapper.readValue(kafkaMessage, RecruitmentResponse.class); + } + catch (JsonProcessingException e) { + throw new RuntimeException("Error with parsing domain kafka message", e); + } + if (response.getData() != null && !response.getData().isEmpty()) { + if (domainIds == null) { + initIds(); + } + upsertRecruitmentData(response.getData(), domainIds); + } + } + + @KafkaListener(id = "${kafka.consumer_group_id}", topics = "${kafka.role.reconciliation}") + @Transactional + public void listenKafkaRole(String kafkaMessage) { + RoleResponse response = null; + try { + response = objectMapper.readValue(kafkaMessage, RoleResponse.class); + } + catch (JsonProcessingException e) { + throw new RuntimeException("Error with parsing role kafka message", e); + } + if (response.getData() != null && !response.getData().isEmpty()) { + if (roleIds == null) { + initIds(); + } + upsertRoleData(response.getData(), roleIds); + } + } + + private void upsertRecruitmentData(List recordList, Result> ids) { List newRecruitmentRecords = new ArrayList<>(); List recruitmentRecords = new ArrayList<>(); @@ -126,14 +146,12 @@ public class ErvuDirectoriesService { recruitmentRecord.setEnabled(record.getEnabled() != null ? record.getEnabled() : true); recruitmentRecord.setTimezone(record.getTimezone()); recruitmentRecord.setReportsEnabled(record.getReportsEnabled()); - recruitmentRecord.setParentId(record.getParent() != null ? record.getParent().getId() : null); + recruitmentRecord.setParentId(record.getParent()); recruitmentRecord.setSubpoenaSeriesCode(record.getSubpoenaSeriesCode()); recruitmentRecord.setAddressId(record.getAddressId()); recruitmentRecord.setPostalAddressId(record.getPostalAddressId()); - recruitmentRecord.setRegionId( - record.getRegionInfo() != null ? record.getRegionInfo().getRegionId() : null); - recruitmentRecord.setRegionCode( - record.getRegionInfo() != null ? record.getRegionInfo().getRegionCode() : null); + recruitmentRecord.setRegionId(record.getRegionId()); + recruitmentRecord.setRegionCode(record.getRegionCode()); recruitmentRecord.setMilitaryCode(record.getMilitaryCode()); recruitmentRecord.setCreatedAt(createdAt); recruitmentRecord.setUpdatedAt(updatedAt); @@ -154,42 +172,7 @@ public class ErvuDirectoriesService { dsl.batchUpdate(recruitmentRecords).execute(); } - public void fetchAndUpsetRoleData() { - int page = 1; - boolean hasMorePages; - List ids = dsl.select(USER_APPLICATION_ROLE.USER_ROLE_ID) - .from(USER_APPLICATION_ROLE) - .fetch(USER_APPLICATION_ROLE.USER_ROLE_ID); - try { - do { - String url = ervuUrl + "/service/idm/roles"; - String encodedErvuRole = URLEncoder.encode("ervuRole=like=true", StandardCharsets.UTF_8); - URI uri = UriComponentsBuilder.fromHttpUrl(url) - .queryParam("query", encodedErvuRole) - .queryParam("page", page) - .queryParam("perPage", perPage) - .build(true) - .toUri(); - - RoleApiResponse roleApiResponse = restTemplate.getForObject(uri, RoleApiResponse.class); - - if (roleApiResponse != null && roleApiResponse.getRecords() != null) { - upsertRoleData(roleApiResponse.getRecords(), ids); - hasMorePages = roleApiResponse.getRecords().size() == perPage; - } - else { - hasMorePages = false; - } - page++; - } - while (hasMorePages); - } - catch (Exception e) { - throw new RuntimeException(e); - } - } - - private void upsertRoleData(List recordList, List ids) { + private void upsertRoleData(List recordList, List ids) { List newRoleRecords = new ArrayList<>(); List roleRecords = new ArrayList<>(); recordList.forEach(record -> { @@ -216,4 +199,13 @@ public class ErvuDirectoriesService { dsl.batchUpdate(roleRecords).execute(); } + private void initIds() { + domainIds = dsl.select(RECRUITMENT.ID, RECRUITMENT.IDM_ID) + .from(RECRUITMENT) + .fetch(); + roleIds = dsl.select(USER_APPLICATION_ROLE.USER_ROLE_ID) + .from(USER_APPLICATION_ROLE) + .fetch(USER_APPLICATION_ROLE.USER_ROLE_ID); + } + } diff --git a/backend/src/main/java/ru/micord/ervu/account_applications/service/ErvuDirectoriesUpdateShedulerService.java b/backend/src/main/java/ru/micord/ervu/account_applications/service/ErvuDirectoriesUpdateShedulerService.java index 3685d358..aee62c77 100644 --- a/backend/src/main/java/ru/micord/ervu/account_applications/service/ErvuDirectoriesUpdateShedulerService.java +++ b/backend/src/main/java/ru/micord/ervu/account_applications/service/ErvuDirectoriesUpdateShedulerService.java @@ -1,5 +1,7 @@ package ru.micord.ervu.account_applications.service; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import javax.annotation.PostConstruct; import net.javacrumbs.shedlock.core.SchedulerLock; @@ -8,7 +10,6 @@ import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.DependsOn; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Service; -import org.springframework.transaction.annotation.Transactional; import static org.springframework.scheduling.config.ScheduledTaskRegistrar.CRON_DISABLED; @@ -28,7 +29,9 @@ public class ErvuDirectoriesUpdateShedulerService { @PostConstruct public void init() { if (!cronLoad.equals(CRON_DISABLED)) { - run(); + ExecutorService executor = Executors.newSingleThreadExecutor(); + executor.execute(this::run); + executor.shutdown(); } } diff --git a/pom.xml b/pom.xml index 32463457..18448dcb 100644 --- a/pom.xml +++ b/pom.xml @@ -21,6 +21,7 @@ 3.192.4 72000 6.15.0 + 2.9.13 @@ -312,6 +313,28 @@ spring-retry 2.0.11 + + org.springframework.kafka + spring-kafka + ${spring-kafka.version} + + + org.apache.kafka + kafka-clients + + + + + org.apache.kafka + kafka-clients + 3.9.0 + + + org.xerial.snappy + snappy-java + + +