SUPPORT-8696: Fix

This commit is contained in:
Eduard Tihomirov 2025-03-27 15:01:51 +03:00
parent c8aa2b8b08
commit 9ea36b152d
7 changed files with 209 additions and 209 deletions

View file

@ -228,6 +228,14 @@
<groupId>org.springframework.retry</groupId>
<artifactId>spring-retry</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
</dependency>
</dependencies>
<build>
<finalName>${project.parent.artifactId}</finalName>

View file

@ -0,0 +1,54 @@
package ru.micord.ervu.account_applications.kafka;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
/**
* @author Eduard Tihomirov
*/
@Configuration
@EnableKafka
public class KafkaConfig {
@Value("${kafka.hosts}")
private String bootstrapServers;
@Value("${kafka.auth_sec_proto}")
private String securityProtocol;
@Value("${kafka.auth_sasl_module}")
private String loginModule;
@Value("${kafka.user}")
private String username;
@Value("${kafka.pass}")
private String password;
@Value("${kafka.auth_sasl_mech}")
private String saslMechanism;
@Bean
public ConsumerFactory<String, String> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs());
}
@Bean
public Map<String, Object> consumerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, securityProtocol);
props.put(SaslConfigs.SASL_JAAS_CONFIG, loginModule + " required username=\""
+ username + "\" password=\"" + password + "\";");
props.put(SaslConfigs.SASL_MECHANISM, saslMechanism);
return props;
}
}

View file

@ -7,43 +7,16 @@ import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
* @author Eduard Tihomirov
*/
@JsonIgnoreProperties(ignoreUnknown = true)
public class RecruitmentApiResponse {
private int totalRows;
private int page;
private int perPage;
private List<Record> records;
public class RecruitmentResponse {
private List<Record> data;
public int getTotalRows() {
return totalRows;
public List<Record> getData() {
return data;
}
public void setTotalRows(int totalRows) {
this.totalRows = totalRows;
}
public int getPage() {
return page;
}
public void setPage(int page) {
this.page = page;
}
public int getPerPage() {
return perPage;
}
public void setPerPage(int perPage) {
this.perPage = perPage;
}
public List<Record> getRecords() {
return records;
}
public void setRecords(
List<Record> records) {
this.records = records;
public void setData(
List<Record> data) {
this.data = data;
}
@JsonIgnoreProperties(ignoreUnknown = true)
@ -78,8 +51,9 @@ public class RecruitmentApiResponse {
private String militaryCode;
private Long createDate;
private Long modified;
private ParentRecord parent;
private RegionInfo regionInfo;
private String parent;
private String regionId;
private String regionCode;
public String getId() {
return id;
@ -273,21 +247,21 @@ public class RecruitmentApiResponse {
this.reportsEnabled = reportsEnabled;
}
public ParentRecord getParent() {
public String getParent() {
return parent;
}
public void setParent(ParentRecord parent) {
public void setParent(String parent) {
this.parent = parent;
}
public RegionInfo getRegionInfo() {
return regionInfo;
public String getRegionId() {
return regionId;
}
public void setRegionInfo(
RegionInfo regionInfo) {
this.regionInfo = regionInfo;
public void setRegionId(
String regionId) {
this.regionId = regionId;
}
public String getSubpoenaSeriesCode() {
@ -338,39 +312,12 @@ public class RecruitmentApiResponse {
this.modified = modified;
}
@JsonIgnoreProperties(ignoreUnknown = true)
public static class ParentRecord {
private String id;
public String getId() {
return id;
}
public void setId(String id) {
this.id = id;
}
public String getRegionCode() {
return regionCode;
}
@JsonIgnoreProperties(ignoreUnknown = true)
public static class RegionInfo {
private String regionId;
private String regionCode;
public String getRegionId() {
return regionId;
}
public void setRegionId(String regionId) {
this.regionId = regionId;
}
public String getRegionCode() {
return regionCode;
}
public void setRegionCode(String regionCode) {
this.regionCode = regionCode;
}
public void setRegionCode(String regionCode) {
this.regionCode = regionCode;
}
}
}

View file

@ -8,43 +8,16 @@ import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
* @author Eduard Tihomirov
*/
@JsonIgnoreProperties(ignoreUnknown = true)
public class RoleApiResponse {
private int totalRows;
private int page;
private int perPage;
private List<RoleApiResponse.Record> records;
public class RoleResponse {
private List<RoleResponse.Record> data;
public int getTotalRows() {
return totalRows;
public List<RoleResponse.Record> getData() {
return data;
}
public void setTotalRows(int totalRows) {
this.totalRows = totalRows;
}
public int getPage() {
return page;
}
public void setPage(int page) {
this.page = page;
}
public int getPerPage() {
return perPage;
}
public void setPerPage(int perPage) {
this.perPage = perPage;
}
public List<RoleApiResponse.Record> getRecords() {
return records;
}
public void setRecords(
List<RoleApiResponse.Record> records) {
this.records = records;
public void setData(
List<RoleResponse.Record> data) {
this.data = data;
}
@JsonIgnoreProperties(ignoreUnknown = true)
public static class Record {

View file

@ -1,8 +1,6 @@
package ru.micord.ervu.account_applications.service;
import java.net.URI;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.lang.invoke.MethodHandles;
import java.sql.Timestamp;
import java.time.Instant;
import java.util.ArrayList;
@ -10,19 +8,27 @@ import java.util.Arrays;
import java.util.List;
import java.util.UUID;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.jooq.DSLContext;
import org.jooq.Record2;
import org.jooq.Result;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.http.HttpEntity;
import org.springframework.http.HttpHeaders;
import org.springframework.http.MediaType;
import org.springframework.http.ResponseEntity;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.web.client.RestTemplate;
import org.springframework.web.util.UriComponentsBuilder;
import ru.micord.ervu.account_applications.db_beans.public_.tables.records.RecruitmentRecord;
import ru.micord.ervu.account_applications.db_beans.public_.tables.records.UserApplicationRoleRecord;
import ru.micord.ervu.account_applications.model.RecruitmentApiResponse;
import ru.micord.ervu.account_applications.model.RoleApiResponse;
import ru.micord.ervu.account_applications.model.RecruitmentResponse;
import ru.micord.ervu.account_applications.model.RoleResponse;
import static ru.micord.ervu.account_applications.db_beans.public_.tables.Recruitment.RECRUITMENT;
import static ru.micord.ervu.account_applications.db_beans.public_.tables.UserApplicationRole.USER_APPLICATION_ROLE;
@ -32,69 +38,83 @@ import static ru.micord.ervu.account_applications.db_beans.public_.tables.UserAp
*/
@Service
public class ErvuDirectoriesService {
private static final Logger LOGGER = LoggerFactory.getLogger(
MethodHandles.lookup().lookupClass());
@Value("${ervu.url}")
private String ervuUrl;
@Value("${ervu.page.size:50}")
private Integer perPage;
@Value("${ervu.recruitment.schemas:Organization, Department, Region, Ministry}")
private String schemas;
@Value("${ervu.collection:domain, role}")
private String ervuCollection;
@Autowired
private DSLContext dsl;
@Autowired
private RestTemplate restTemplate;
@Autowired
private ObjectMapper objectMapper;
Result<Record2<UUID, String>> domainIds = null;
List<String> roleIds = null;
@Transactional
public void updateDirectories() {
fetchAndUpsetRecruitmentData();
fetchAndUpsetRoleData();
}
public void fetchAndUpsetRecruitmentData() {
String[] schemaArray = schemas.split(",");
Arrays.stream(schemaArray).forEach(schema -> {
schema = schema.trim();
int page = 1;
boolean hasMorePages;
Result<Record2<UUID, String>> ids = dsl.select(RECRUITMENT.ID, RECRUITMENT.IDM_ID)
.from(RECRUITMENT)
.fetch();
try {
do {
String url = ervuUrl + "/service/idm/domains";
String encodedSchemaValue = URLEncoder.encode("schema=in=(" + schema + ")",
StandardCharsets.UTF_8
);
URI uri = UriComponentsBuilder.fromHttpUrl(url)
.queryParam("query", encodedSchemaValue)
.queryParam("expand", "parent")
.queryParam("page", page)
.queryParam("perPage", perPage)
.build(true)
.toUri();
RecruitmentApiResponse recruitmentApiResponse = restTemplate.getForObject(uri,
RecruitmentApiResponse.class
);
if (recruitmentApiResponse != null && recruitmentApiResponse.getRecords() != null) {
upsertRecruitmentData(recruitmentApiResponse.getRecords(), ids);
hasMorePages = recruitmentApiResponse.getRecords().size() == perPage;
}
else {
hasMorePages = false;
}
page++;
try {
initIds();
String[] ervuCollectionArray = ervuCollection.split(",");
Arrays.stream(ervuCollectionArray).forEach(ervuCollection -> {
String targetUrl = ervuUrl + "/service/idm/reconcile/"+ ervuCollection + "/to/kafka/v1";
HttpHeaders headers = new HttpHeaders();
headers.setContentType(MediaType.APPLICATION_JSON);
String emptyJson = "{}";
HttpEntity<String> requestEntity = new HttpEntity<>(emptyJson, headers);
ResponseEntity<String> response = restTemplate.postForEntity(targetUrl, requestEntity, String.class);
if (!response.getStatusCode().is2xxSuccessful()) {
LOGGER.error(
"Error in " + ervuCollection + " request. Status code: " + response.getStatusCode()
+ "; Body: " + response.getBody());
}
while (hasMorePages);
}
catch (Exception e) {
throw new RuntimeException(e);
}
});
});
}
catch (Exception e) {
LOGGER.error(e.getMessage());
}
}
private void upsertRecruitmentData(List<RecruitmentApiResponse.Record> recordList,
@KafkaListener(id = "${kafka.consumer_group_id}", topics = "${kafka.domain.reconciliation}")
@Transactional
public void listenKafkaDomain(String kafkaMessage) {
RecruitmentResponse response = null;
try {
response = objectMapper.readValue(kafkaMessage, RecruitmentResponse.class);
}
catch (JsonProcessingException e) {
throw new RuntimeException("Error with parsing domain kafka message", e);
}
if (response.getData() != null && !response.getData().isEmpty()) {
if (domainIds == null) {
initIds();
}
upsertRecruitmentData(response.getData(), domainIds);
}
}
@KafkaListener(id = "${kafka.consumer_group_id}", topics = "${kafka.role.reconciliation}")
@Transactional
public void listenKafkaRole(String kafkaMessage) {
RoleResponse response = null;
try {
response = objectMapper.readValue(kafkaMessage, RoleResponse.class);
}
catch (JsonProcessingException e) {
throw new RuntimeException("Error with parsing role kafka message", e);
}
if (response.getData() != null && !response.getData().isEmpty()) {
if (roleIds == null) {
initIds();
}
upsertRoleData(response.getData(), roleIds);
}
}
private void upsertRecruitmentData(List<RecruitmentResponse.Record> recordList,
Result<Record2<UUID, String>> ids) {
List<RecruitmentRecord> newRecruitmentRecords = new ArrayList<>();
List<RecruitmentRecord> recruitmentRecords = new ArrayList<>();
@ -126,14 +146,12 @@ public class ErvuDirectoriesService {
recruitmentRecord.setEnabled(record.getEnabled() != null ? record.getEnabled() : true);
recruitmentRecord.setTimezone(record.getTimezone());
recruitmentRecord.setReportsEnabled(record.getReportsEnabled());
recruitmentRecord.setParentId(record.getParent() != null ? record.getParent().getId() : null);
recruitmentRecord.setParentId(record.getParent());
recruitmentRecord.setSubpoenaSeriesCode(record.getSubpoenaSeriesCode());
recruitmentRecord.setAddressId(record.getAddressId());
recruitmentRecord.setPostalAddressId(record.getPostalAddressId());
recruitmentRecord.setRegionId(
record.getRegionInfo() != null ? record.getRegionInfo().getRegionId() : null);
recruitmentRecord.setRegionCode(
record.getRegionInfo() != null ? record.getRegionInfo().getRegionCode() : null);
recruitmentRecord.setRegionId(record.getRegionId());
recruitmentRecord.setRegionCode(record.getRegionCode());
recruitmentRecord.setMilitaryCode(record.getMilitaryCode());
recruitmentRecord.setCreatedAt(createdAt);
recruitmentRecord.setUpdatedAt(updatedAt);
@ -154,42 +172,7 @@ public class ErvuDirectoriesService {
dsl.batchUpdate(recruitmentRecords).execute();
}
public void fetchAndUpsetRoleData() {
int page = 1;
boolean hasMorePages;
List<String> ids = dsl.select(USER_APPLICATION_ROLE.USER_ROLE_ID)
.from(USER_APPLICATION_ROLE)
.fetch(USER_APPLICATION_ROLE.USER_ROLE_ID);
try {
do {
String url = ervuUrl + "/service/idm/roles";
String encodedErvuRole = URLEncoder.encode("ervuRole=like=true", StandardCharsets.UTF_8);
URI uri = UriComponentsBuilder.fromHttpUrl(url)
.queryParam("query", encodedErvuRole)
.queryParam("page", page)
.queryParam("perPage", perPage)
.build(true)
.toUri();
RoleApiResponse roleApiResponse = restTemplate.getForObject(uri, RoleApiResponse.class);
if (roleApiResponse != null && roleApiResponse.getRecords() != null) {
upsertRoleData(roleApiResponse.getRecords(), ids);
hasMorePages = roleApiResponse.getRecords().size() == perPage;
}
else {
hasMorePages = false;
}
page++;
}
while (hasMorePages);
}
catch (Exception e) {
throw new RuntimeException(e);
}
}
private void upsertRoleData(List<RoleApiResponse.Record> recordList, List<String> ids) {
private void upsertRoleData(List<RoleResponse.Record> recordList, List<String> ids) {
List<UserApplicationRoleRecord> newRoleRecords = new ArrayList<>();
List<UserApplicationRoleRecord> roleRecords = new ArrayList<>();
recordList.forEach(record -> {
@ -216,4 +199,13 @@ public class ErvuDirectoriesService {
dsl.batchUpdate(roleRecords).execute();
}
private void initIds() {
domainIds = dsl.select(RECRUITMENT.ID, RECRUITMENT.IDM_ID)
.from(RECRUITMENT)
.fetch();
roleIds = dsl.select(USER_APPLICATION_ROLE.USER_ROLE_ID)
.from(USER_APPLICATION_ROLE)
.fetch(USER_APPLICATION_ROLE.USER_ROLE_ID);
}
}

View file

@ -1,5 +1,7 @@
package ru.micord.ervu.account_applications.service;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import javax.annotation.PostConstruct;
import net.javacrumbs.shedlock.core.SchedulerLock;
@ -8,7 +10,6 @@ import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.DependsOn;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import static org.springframework.scheduling.config.ScheduledTaskRegistrar.CRON_DISABLED;
@ -28,7 +29,9 @@ public class ErvuDirectoriesUpdateShedulerService {
@PostConstruct
public void init() {
if (!cronLoad.equals(CRON_DISABLED)) {
run();
ExecutorService executor = Executors.newSingleThreadExecutor();
executor.execute(this::run);
executor.shutdown();
}
}

23
pom.xml
View file

@ -21,6 +21,7 @@
<webbpm-platform.version>3.192.4</webbpm-platform.version>
<wbp.overall-timeout>72000</wbp.overall-timeout>
<jasperreports.version>6.15.0</jasperreports.version>
<spring-kafka.version>2.9.13</spring-kafka.version>
</properties>
<dependencyManagement>
<dependencies>
@ -312,6 +313,28 @@
<artifactId>spring-retry</artifactId>
<version>2.0.11</version>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>${spring-kafka.version}</version>
<exclusions>
<exclusion>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>3.9.0</version>
<exclusions>
<exclusion>
<groupId>org.xerial.snappy</groupId>
<artifactId>snappy-java</artifactId>
</exclusion>
</exclusions>
</dependency>
</dependencies>
</dependencyManagement>
<repositories>