diff --git a/backend/src/main/java/ervu/client/okopf/EsnsiOkopfClient.java b/backend/src/main/java/ervu/client/okopf/EsnsiOkopfClient.java index 6e351269..27e88617 100644 --- a/backend/src/main/java/ervu/client/okopf/EsnsiOkopfClient.java +++ b/backend/src/main/java/ervu/client/okopf/EsnsiOkopfClient.java @@ -1,14 +1,17 @@ package ervu.client.okopf; -import java.io.*; -import java.net.URI; -import java.net.http.HttpClient; -import java.net.http.HttpRequest; -import java.net.http.HttpResponse; -import java.util.concurrent.TimeoutException; +import java.io.BufferedInputStream; +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStreamReader; +import java.net.URL; +import java.nio.charset.StandardCharsets; +import java.util.Objects; import java.util.stream.Collectors; import java.util.zip.ZipInputStream; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Value; import org.springframework.retry.annotation.Backoff; import org.springframework.retry.annotation.Retryable; @@ -19,37 +22,27 @@ import org.springframework.stereotype.Component; */ @Component public class EsnsiOkopfClient { + private static final Logger logger = LoggerFactory.getLogger(EsnsiOkopfClient.class); @Value("${esnsi.okopf.url}") - private String uri; + private String url; - @Retryable(value = {TimeoutException.class}, backoff = - @Backoff(delay = 2000)) + @Retryable(value = IOException.class, maxAttemptsExpression = "${esnsi.okopf.retry.max.attempts.load:3}", backoff = + @Backoff(delayExpression = "${esnsi.okopf.retry.delay.load:30000}")) public String getJsonOkopFormData() { - HttpClient client = HttpClient.newHttpClient(); - HttpRequest request = HttpRequest.newBuilder() - .uri(URI.create(uri)) - .GET() - .build() ; - try { - HttpResponse response = client.send(request, - HttpResponse.BodyHandlers.ofInputStream() - ); - if (response.statusCode() >= 200 && response.statusCode() <= 202) { - return unzipFile(new 
ZipInputStream(response.body())); + try (BufferedInputStream in = new BufferedInputStream(new URL(url).openStream()); + ZipInputStream archiveStream = new ZipInputStream(in); + BufferedReader br = new BufferedReader( + new InputStreamReader(archiveStream, StandardCharsets.UTF_8))) { + if (Objects.nonNull(archiveStream.getNextEntry())) { + logger.info("Received a non-empty archive in response."); + return br.lines().collect(Collectors.joining(System.lineSeparator())); } - throw new RuntimeException("The returned status " + response.statusCode() + " is incorrect. Json file has not be unzip"); + logger.info("Received an empty archive in response. Skipping load okopf file process"); } - catch (IOException | InterruptedException e) { - throw new RuntimeException(e); + catch (IOException e) { + logger.error("Failed to send HTTP request {} or process the response for okopf file.", url, e); } - } - - private String unzipFile(ZipInputStream zis) throws IOException { - if (zis.getNextEntry() != null) { - BufferedReader br = new BufferedReader(new InputStreamReader(new ByteArrayInputStream(zis.readAllBytes()))); - return br.lines().collect(Collectors.joining(System.lineSeparator())); - } - throw new RuntimeException("ZipInputStream is empty and has not been unzipped"); + return null; } } diff --git a/backend/src/main/java/ervu/dao/okopf/OkopfDao.java b/backend/src/main/java/ervu/dao/okopf/OkopfDao.java index b576f826..fafa7689 100644 --- a/backend/src/main/java/ervu/dao/okopf/OkopfDao.java +++ b/backend/src/main/java/ervu/dao/okopf/OkopfDao.java @@ -10,7 +10,7 @@ import ervu.model.okopf.OkopfModel; * @author Artyom Hackimullin */ public interface OkopfDao { - void save(List recordModels); + void saveOrUpdate(List recordModels); String fetchTitleByLeg(String leg); } diff --git a/backend/src/main/java/ervu/dao/okopf/OkopfDaoImpl.java b/backend/src/main/java/ervu/dao/okopf/OkopfDaoImpl.java index aa7fa197..5c732bdf 100644 --- 
a/backend/src/main/java/ervu/dao/okopf/OkopfDaoImpl.java +++ b/backend/src/main/java/ervu/dao/okopf/OkopfDaoImpl.java @@ -1,8 +1,11 @@ package ervu.dao.okopf; import java.util.List; +import java.util.Set; +import java.util.stream.Collectors; import ervu.model.okopf.OkopfModel; +import ervu_lkrp_ul.ervu_lkrp_ul.db_beans.public_.tables.records.OkopfRecordsRecord; import org.jooq.DSLContext; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Repository; @@ -20,18 +23,12 @@ public class OkopfDaoImpl implements OkopfDao { private DSLContext dsl; @Override - public void save(List recordModels) { - var queries = recordModels.stream().map(record -> - dsl.insertInto(OKOPF_RECORDS, OKOPF_RECORDS.OKOPF_RECORDS_ID, OKOPF_RECORDS.NAME, OKOPF_RECORDS.VERSION) - .values(record.getCode(), record.getName(), record.getVersion()) - .onConflict(OKOPF_RECORDS.OKOPF_RECORDS_ID) - .doUpdate() - .set(OKOPF_RECORDS.NAME, record.getName()) - .set(OKOPF_RECORDS.VERSION, record.getVersion()) - .where(OKOPF_RECORDS.OKOPF_RECORDS_ID.eq(record.getCode())) - ).toList(); - - dsl.batch(queries).execute(); + public void saveOrUpdate(List okopfModels) { + deleteNotActualOkopfRecords(okopfModels); + dsl.batchMerge(okopfModels.stream() + .map(this::mapOkopfModelToRecord) + .toList()) + .execute(); } @Override @@ -41,4 +38,23 @@ public class OkopfDaoImpl implements OkopfDao { .where(OKOPF_RECORDS.OKOPF_RECORDS_ID.eq(leg)) .fetchOne(OKOPF_RECORDS.NAME); } + + private void deleteNotActualOkopfRecords(List recordModels) { + Set ids = recordModels + .stream() + .map(OkopfModel::getCode) + .collect(Collectors.toSet()); + + dsl.deleteFrom(OKOPF_RECORDS) + .where(OKOPF_RECORDS.OKOPF_RECORDS_ID.notIn(ids)) + .execute(); + } + + private OkopfRecordsRecord mapOkopfModelToRecord(OkopfModel model) { + OkopfRecordsRecord record = dsl.newRecord(OKOPF_RECORDS); + record.setValue(OKOPF_RECORDS.OKOPF_RECORDS_ID, model.getCode()); + record.setValue(OKOPF_RECORDS.NAME, 
model.getName()); + record.setValue(OKOPF_RECORDS.VERSION, model.getVersion()); + return record; + } } diff --git a/backend/src/main/java/ervu/service/scheduler/EsnsiOkopfSchedulerServiceImpl.java b/backend/src/main/java/ervu/service/scheduler/EsnsiOkopfSchedulerServiceImpl.java index cdd4da77..1263d883 100644 --- a/backend/src/main/java/ervu/service/scheduler/EsnsiOkopfSchedulerServiceImpl.java +++ b/backend/src/main/java/ervu/service/scheduler/EsnsiOkopfSchedulerServiceImpl.java @@ -5,7 +5,6 @@ import java.util.List; import java.util.stream.Stream; import javax.annotation.PostConstruct; -import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import ervu.client.okopf.EsnsiOkopfClient; import ervu.dao.okopf.OkopfDao; @@ -24,6 +23,7 @@ import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; import static org.springframework.scheduling.config.ScheduledTaskRegistrar.CRON_DISABLED; +import static org.springframework.util.StringUtils.hasText; /** @@ -48,11 +48,11 @@ public class EsnsiOkopfSchedulerServiceImpl implements EsnsiOkopfSchedulerServic @Transactional public void init() { if (!cronLoad.equals(CRON_DISABLED)) { - logger.info("Synchronization with OKOPF enabled"); + logger.info("Synchronization with esnsi okopf enabled"); load(); } else { - logger.info("Synchronization with OKOPF disabled"); + logger.info("Synchronization with esnsi okopf disabled"); } } @@ -60,20 +60,25 @@ public class EsnsiOkopfSchedulerServiceImpl implements EsnsiOkopfSchedulerServic @SchedulerLock(name = "loadOkopf") @Transactional public void load() { + logger.info("Loading okopf file"); + String data = esnsiOkopfClient.getJsonOkopFormData(); try { - logger.info("Loading okopf file"); - String data = esnsiOkopfClient.getJsonOkopFormData(); - OkopfOrgModel orgModel = mapper.readValue(data, OkopfOrgModel.class); - int currentVersion = 
mapper.readTree(data).findValue("version").asInt(); - List okopfRecords = mapToOkopfRecords(orgModel.getData(), currentVersion); - okopfDao.save(okopfRecords); + if (hasText(data)) { + logger.info("Beginning to save okopf data"); + OkopfOrgModel orgModel = mapper.readValue(data, OkopfOrgModel.class); + int currentVersion = mapper.readTree(data).findValue("version").asInt(); + List okopfRecords = mapToOkopfRecords(orgModel.getData(), currentVersion); + okopfDao.saveOrUpdate(okopfRecords); + logger.info("Saved okopf data"); + } } - catch (JsonProcessingException e) { + catch (Exception e) { throw new RuntimeException(e); } } private List mapToOkopfRecords(OkopfDataModel dataModel, int version) { + logger.info("Parsing from json file to okopf model"); return Arrays.stream(dataModel.getDetails()) .flatMap(detail -> { OkopfAttributeValueModel[] attributeValues = detail.getAttributeValues(); diff --git a/config.md b/config.md index 90604b6f..7e14c875 100644 --- a/config.md +++ b/config.md @@ -783,7 +783,12 @@ JBPM использует 3 корневых категории логирова #### Взаимодействие с ЕСНСИ в части получения справочника ОКОПФ - `ESNSI_OKOPF_URL` - url который обращается к еснси для получения справочника и скачивает данные спровочников организации в виде заархивированного json файла. 
-- `ESNSI_OKOPF_CRON_LOAD` - настройка, которая указывет расписание для загрузки справочника окопф и сохранение данных по справкам в БД +- `ESNSI_OKOPF_CRON_LOAD` - настройка, которая указывает расписание для загрузки справочника окопф и + сохранение данных по справкам в БД +- `ESNSI_OKOPF_RETRY_DELAY_LOAD` - настройка, которая указывает на повторную попытку загрузить + справочник окопф с задержкой +- `ESNSI_OKOPF_RETRY_MAX_ATTEMPTS_LOAD` - настройка, которая указывает на максимальное кол-во попыток + повторно загрузить справочник окопф #### Взаимодействие с WebDav diff --git a/config/micord.env b/config/micord.env index 7316852b..dea33f20 100644 --- a/config/micord.env +++ b/config/micord.env @@ -35,12 +35,18 @@ ERVU_KAFKA_JOURNAL_REPLY_TOPIC=ervu.organization.journal.response DB.JOURNAL.EXCLUDED.STATUSES=Направлено в ЕРВУ,Получен ЕРВУ ESNSI_OKOPF_URL=https://esnsi.gosuslugi.ru/rest/ext/v1/classifiers/11465/file?extension=JSON&encoding=UTF_8 ESNSI_OKOPF_CRON_LOAD=0 0 */1 * * * +ESNSI_OKOPF_RETRY_MAX_ATTEMPTS_LOAD=3 +ESNSI_OKOPF_RETRY_DELAY_LOAD=1000 ERVU_KAFKA_SECURITY_PROTOCOL=SASL_PLAINTEXT ERVU_KAFKA_SASL_MECHANISM=SCRAM-SHA-256 ERVU_KAFKA_USERNAME=user1 ERVU_KAFKA_PASSWORD=Blfi9d2OFG ERVU_KAFKA_EXCERPT_REPLY_TOPIC=ervu.lkrp.excerpt.response ERVU_KAFKA_EXCERPT_REQUEST_TOPIC=ervu.lkrp.excerpt.request +ESNSI_OKOPF_URL=https://esnsi.gosuslugi.ru/rest/ext/v1/classifiers/16271/file?extension=JSON&encoding=UTF_8 +ESNSI_OKOPF_CRON_LOAD=0 0 */1 * * * +ESNSI_OKOPF_RETRY_MAX_ATTEMPTS_LOAD=3 +ESNSI_OKOPF_RETRY_DELAY_LOAD=30000 ERVU_FILE_UPLOAD_MAX_FILE_SIZE=5242880 ERVU_FILE_UPLOAD_MAX_REQUEST_SIZE=6291456 diff --git a/config/standalone/dev/standalone.xml b/config/standalone/dev/standalone.xml index 0d8881d9..b595148b 100644 --- a/config/standalone/dev/standalone.xml +++ b/config/standalone/dev/standalone.xml @@ -83,7 +83,9 @@ - + + +