From 38a95b7ec092c32f2c2083ebff82304b7270d601 Mon Sep 17 00:00:00 2001 From: "adel.ka" Date: Thu, 18 Sep 2025 16:31:15 +0300 Subject: [PATCH] SUPPORT-9416: listen application-status from kafka --- .../controller/UserApplicationController.java | 55 +++++---------- .../dao/UserApplicationListDao.java | 19 ++++++ .../kafka/ApplicationStatusListener.java | 57 ++++++++++++++++ .../kafka/model/ApplicationStatus.java | 14 ++++ .../model/StatusResponse.java | 6 ++ .../service/UserApplicationListService.java | 5 ++ .../websocket/dto/ProcessErrorMsg.java | 10 --- .../websocket/dto/ProcessResponseBody.java | 13 ---- .../websocket/dto/ProcessResponseDto.java | 11 --- .../websocket/enums/ClassName.java | 16 ----- .../component/button/UserManagementService.ts | 2 +- .../app/service/authorization.service.ts | 24 +------ .../app/service/status-update.service.ts | 68 +++++++++++++++---- .../app/websocket/websocket.service.ts | 37 ---------- 14 files changed, 176 insertions(+), 161 deletions(-) create mode 100644 backend/src/main/java/ru/micord/ervu/account_applications/kafka/ApplicationStatusListener.java create mode 100644 backend/src/main/java/ru/micord/ervu/account_applications/kafka/model/ApplicationStatus.java create mode 100644 backend/src/main/java/ru/micord/ervu/account_applications/model/StatusResponse.java delete mode 100644 backend/src/main/java/ru/micord/ervu/account_applications/websocket/dto/ProcessErrorMsg.java delete mode 100644 backend/src/main/java/ru/micord/ervu/account_applications/websocket/dto/ProcessResponseBody.java delete mode 100644 backend/src/main/java/ru/micord/ervu/account_applications/websocket/dto/ProcessResponseDto.java delete mode 100644 backend/src/main/java/ru/micord/ervu/account_applications/websocket/enums/ClassName.java delete mode 100644 frontend/src/ts/modules/app/websocket/websocket.service.ts diff --git a/backend/src/main/java/ru/micord/ervu/account_applications/controller/UserApplicationController.java b/backend/src/main/java/ru/micord/ervu/account_applications/controller/UserApplicationController.java index a533865e..da341da6 100644 --- a/backend/src/main/java/ru/micord/ervu/account_applications/controller/UserApplicationController.java +++ b/backend/src/main/java/ru/micord/ervu/account_applications/controller/UserApplicationController.java @@ -1,58 +1,35 @@ package ru.micord.ervu.account_applications.controller; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.http.MediaType; -import org.springframework.util.StringUtils; -import org.springframework.web.bind.annotation.PutMapping; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +import org.springframework.web.bind.annotation.PostMapping; import org.springframework.web.bind.annotation.RequestBody; import org.springframework.web.bind.annotation.RestController; -import ru.micord.ervu.account_applications.security.service.EncryptionService; +import ru.micord.ervu.account_applications.model.StatusResponse; import ru.micord.ervu.account_applications.service.UserApplicationListService; -import ru.micord.ervu.account_applications.websocket.dto.ProcessErrorMsg; -import ru.micord.ervu.account_applications.websocket.dto.ProcessResponseDto; /** * @author gulnaz */ @RestController public class UserApplicationController { - private static final Logger LOGGER = LoggerFactory.getLogger(UserApplicationController.class); - private final UserApplicationListService applicationService; - private final EncryptionService encryptionService; - public UserApplicationController(UserApplicationListService applicationService, - EncryptionService encryptionService) { + public UserApplicationController(UserApplicationListService applicationService) { this.applicationService = applicationService; - this.encryptionService = encryptionService; } - @PutMapping(value = "/status", consumes = MediaType.APPLICATION_JSON_VALUE) - public Long updateStatus(@RequestBody ProcessResponseDto data) { - Long appNumber = data.body().applicationNumber(); - switch (data.className()) { - case UPDATE -> { - LOGGER.info("update by appNumber = {}", appNumber); - String tempPass = data.body().tempPass(); - - if (StringUtils.hasText(tempPass)) { - String encryptedPassword = encryptionService.encrypt(tempPass); - applicationService.savePassword(appNumber, encryptedPassword); - } - else { - applicationService.saveAcceptedStatus(appNumber); - } - } - case PROCESS_ERROR -> { - ProcessErrorMsg errorMsg = data.body().msg(); - String msg = errorMsg == null ? "unknown error" : errorMsg.message(); - LOGGER.error("error by appNumber = {}, message: {}", appNumber, msg); - applicationService.saveError(appNumber, msg); - } - } - - return appNumber; + @PostMapping("/status/batch") + public List getBatchStatuses(@RequestBody List appNumbers) { + Map statuses = applicationService.getStatusesBatch(appNumbers); + return appNumbers.stream() + .map(appNumber -> new StatusResponse( + appNumber, + statuses.get(appNumber) + )) + .collect(Collectors.toList()); } } diff --git a/backend/src/main/java/ru/micord/ervu/account_applications/dao/UserApplicationListDao.java b/backend/src/main/java/ru/micord/ervu/account_applications/dao/UserApplicationListDao.java index d905ab7a..41742d73 100644 --- a/backend/src/main/java/ru/micord/ervu/account_applications/dao/UserApplicationListDao.java +++ b/backend/src/main/java/ru/micord/ervu/account_applications/dao/UserApplicationListDao.java @@ -2,6 +2,9 @@ package ru.micord.ervu.account_applications.dao; import java.sql.Timestamp; import java.time.LocalDateTime; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; import org.jooq.DSLContext; import org.springframework.stereotype.Repository; @@ -23,6 +26,22 @@ public class UserApplicationListDao { this.dslContext = dslContext; } + public Map getStatusesBatch(List appNumbers) { + if (appNumbers == null || appNumbers.isEmpty()) { + return Map.of(); + } + + return dslContext.select(USER_APPLICATION_LIST.NUMBER_APP, USER_APPLICATION_LIST.APPLICATION_STATUS) + .from(USER_APPLICATION_LIST) + .where(USER_APPLICATION_LIST.NUMBER_APP.in(appNumbers)) + .fetch() + .stream() + .collect(Collectors.toMap( + record -> record.get(USER_APPLICATION_LIST.NUMBER_APP), + record -> record.get(USER_APPLICATION_LIST.APPLICATION_STATUS) + )); + } + public void savePassword(Long appNumber, String encodedPass) { dslContext.update(USER_APPLICATION_LIST) .set(USER_APPLICATION_LIST.APPLICATION_STATUS, ACCEPTED.name()) diff --git a/backend/src/main/java/ru/micord/ervu/account_applications/kafka/ApplicationStatusListener.java b/backend/src/main/java/ru/micord/ervu/account_applications/kafka/ApplicationStatusListener.java new file mode 100644 index 00000000..777e0252 --- /dev/null +++ b/backend/src/main/java/ru/micord/ervu/account_applications/kafka/ApplicationStatusListener.java @@ -0,0 +1,57 @@ +package ru.micord.ervu.account_applications.kafka; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.kafka.annotation.KafkaListener; +import org.springframework.stereotype.Component; +import org.springframework.util.StringUtils; +import ru.micord.ervu.account_applications.kafka.model.ApplicationStatus; +import ru.micord.ervu.account_applications.security.service.EncryptionService; +import ru.micord.ervu.account_applications.service.UserApplicationListService; + +/** + * @author Adel Kalimullin + */ +@Component +public class ApplicationStatusListener { + private static final Logger LOGGER = LoggerFactory.getLogger(ApplicationStatusListener.class); + private final ObjectMapper mapper; + private final UserApplicationListService applicationService; + private final EncryptionService encryptionService; + + public ApplicationStatusListener(ObjectMapper mapper, + UserApplicationListService applicationService, EncryptionService encryptionService) { + this.mapper = mapper; + this.applicationService = applicationService; + this.encryptionService = encryptionService; + } + + @KafkaListener(id = "${kafka.application.status.group.id}", topics = "${kafka.application.status}") + public void listenKafkaDomain(String kafkaMessage) { + try { + ApplicationStatus applicationStatus = mapper.readValue(kafkaMessage, ApplicationStatus.class); + Long applicationNumber = applicationStatus.applicationNumber(); + if (applicationStatus.status()) { + LOGGER.info("update by appNumber = {}", applicationNumber); + String tempPass = applicationStatus.password(); + if (StringUtils.hasText(tempPass)) { + String encryptedPassword = encryptionService.encrypt(tempPass); + applicationService.savePassword(applicationNumber, encryptedPassword); + } + else { + applicationService.saveAcceptedStatus(applicationNumber); + } + } + else { + String errorMsg = applicationStatus.errorMsg(); + LOGGER.error("error by appNumber = {}, message: {}", applicationNumber, errorMsg); + applicationService.saveError(applicationNumber, errorMsg); + } + } + catch (JsonProcessingException e) { + LOGGER.error("Failed to deserialize message: {}", kafkaMessage, e); + } + } +} diff --git a/backend/src/main/java/ru/micord/ervu/account_applications/kafka/model/ApplicationStatus.java b/backend/src/main/java/ru/micord/ervu/account_applications/kafka/model/ApplicationStatus.java new file mode 100644 index 00000000..5ef0a452 --- /dev/null +++ b/backend/src/main/java/ru/micord/ervu/account_applications/kafka/model/ApplicationStatus.java @@ -0,0 +1,14 @@ +package ru.micord.ervu.account_applications.kafka.model; + +import com.fasterxml.jackson.annotation.JsonProperty; + +/** + * @author Adel Kalimullin + */ +public record ApplicationStatus( + @JsonProperty("applicationNumber") Long applicationNumber, + @JsonProperty("password") String password, + @JsonProperty("status") boolean status, + @JsonProperty("userName") String userName, + @JsonProperty("description") String errorMsg +) {} \ No newline at end of file diff --git a/backend/src/main/java/ru/micord/ervu/account_applications/model/StatusResponse.java b/backend/src/main/java/ru/micord/ervu/account_applications/model/StatusResponse.java new file mode 100644 index 00000000..1a9444bc --- /dev/null +++ b/backend/src/main/java/ru/micord/ervu/account_applications/model/StatusResponse.java @@ -0,0 +1,6 @@ +package ru.micord.ervu.account_applications.model; + +/** + * @author Adel Kalimullin + */ +public record StatusResponse(Long appNumber, String status) {} \ No newline at end of file diff --git a/backend/src/main/java/ru/micord/ervu/account_applications/service/UserApplicationListService.java b/backend/src/main/java/ru/micord/ervu/account_applications/service/UserApplicationListService.java index 9735df52..96c27042 100644 --- a/backend/src/main/java/ru/micord/ervu/account_applications/service/UserApplicationListService.java +++ b/backend/src/main/java/ru/micord/ervu/account_applications/service/UserApplicationListService.java @@ -3,6 +3,7 @@ package ru.micord.ervu.account_applications.service; import java.sql.Timestamp; import java.util.Date; import java.util.List; +import java.util.Map; import org.springframework.stereotype.Service; import ru.micord.ervu.account_applications.component.dao.AuditDao; @@ -30,6 +31,10 @@ public class UserApplicationListService { this.securityContext = securityContext; } + public Map getStatusesBatch(List appNumbers) { + return dao.getStatusesBatch(appNumbers); + } + public void savePassword(Long appNumber, String encodedPass) { dao.savePassword(appNumber, encodedPass); saveAuditStatusByAppNumber(appNumber, ACCEPTED.name()); diff --git a/backend/src/main/java/ru/micord/ervu/account_applications/websocket/dto/ProcessErrorMsg.java b/backend/src/main/java/ru/micord/ervu/account_applications/websocket/dto/ProcessErrorMsg.java deleted file mode 100644 index f6736468..00000000 --- a/backend/src/main/java/ru/micord/ervu/account_applications/websocket/dto/ProcessErrorMsg.java +++ /dev/null @@ -1,10 +0,0 @@ -package ru.micord.ervu.account_applications.websocket.dto; - -import com.fasterxml.jackson.annotation.JsonIgnoreProperties; - -/** - * @author gulnaz - */ -@JsonIgnoreProperties(ignoreUnknown = true) -public record ProcessErrorMsg(String message) { -} diff --git a/backend/src/main/java/ru/micord/ervu/account_applications/websocket/dto/ProcessResponseBody.java b/backend/src/main/java/ru/micord/ervu/account_applications/websocket/dto/ProcessResponseBody.java deleted file mode 100644 index cbda2a0d..00000000 --- a/backend/src/main/java/ru/micord/ervu/account_applications/websocket/dto/ProcessResponseBody.java +++ /dev/null @@ -1,13 +0,0 @@ -package ru.micord.ervu.account_applications.websocket.dto; - -import com.fasterxml.jackson.annotation.JsonIgnoreProperties; -import com.fasterxml.jackson.annotation.JsonProperty; - -/** - * @author gulnaz - */ -@JsonIgnoreProperties(ignoreUnknown = true) -public record ProcessResponseBody(String type, Long applicationNumber, String userName, - @JsonProperty("value") String tempPass, - String secretLink, ProcessErrorMsg msg) { -} diff --git a/backend/src/main/java/ru/micord/ervu/account_applications/websocket/dto/ProcessResponseDto.java b/backend/src/main/java/ru/micord/ervu/account_applications/websocket/dto/ProcessResponseDto.java deleted file mode 100644 index c801403e..00000000 --- a/backend/src/main/java/ru/micord/ervu/account_applications/websocket/dto/ProcessResponseDto.java +++ /dev/null @@ -1,11 +0,0 @@ -package ru.micord.ervu.account_applications.websocket.dto; - -import com.fasterxml.jackson.annotation.JsonIgnoreProperties; -import ru.micord.ervu.account_applications.websocket.enums.ClassName; - -/** - * @author gulnaz - */ -@JsonIgnoreProperties(ignoreUnknown = true) -public record ProcessResponseDto(String forUser, ClassName className, ProcessResponseBody body) { -} diff --git a/backend/src/main/java/ru/micord/ervu/account_applications/websocket/enums/ClassName.java b/backend/src/main/java/ru/micord/ervu/account_applications/websocket/enums/ClassName.java deleted file mode 100644 index 5eb35739..00000000 --- a/backend/src/main/java/ru/micord/ervu/account_applications/websocket/enums/ClassName.java +++ /dev/null @@ -1,16 +0,0 @@ -package ru.micord.ervu.account_applications.websocket.enums; - -import com.fasterxml.jackson.annotation.JsonEnumDefaultValue; -import com.fasterxml.jackson.annotation.JsonProperty; - -/** - * @author gulnaz - */ -public enum ClassName { - @JsonProperty("update") - UPDATE, - @JsonProperty("processError") - PROCESS_ERROR, - @JsonEnumDefaultValue - SKIP -} diff --git a/frontend/src/ts/account_applications/component/button/UserManagementService.ts b/frontend/src/ts/account_applications/component/button/UserManagementService.ts index 64a2df0d..76956e32 100644 --- a/frontend/src/ts/account_applications/component/button/UserManagementService.ts +++ b/frontend/src/ts/account_applications/component/button/UserManagementService.ts @@ -202,10 +202,10 @@ export class UserManagementService extends Behavior { this.httpClient.post(url, request).toPromise() .then((response: ProcessResponse) => { let code = response.code; - if (code !== '200') { this.saveError(appNumber, response.msg); } + this.statusUpdateService.trackApplication(appNumber); }) .catch(reason => { console.error("Error while executing request:", reason.toString()); diff --git a/frontend/src/ts/modules/app/service/authorization.service.ts b/frontend/src/ts/modules/app/service/authorization.service.ts index ca9086d8..57f83c27 100644 --- a/frontend/src/ts/modules/app/service/authorization.service.ts +++ b/frontend/src/ts/modules/app/service/authorization.service.ts @@ -1,9 +1,6 @@ import {Injectable, OnDestroy} from "@angular/core"; import {Subject} from "rxjs"; import {HttpClient} from "@angular/common/http"; -import {WebsocketService} from "../websocket/websocket.service"; -import {StatusUpdateService} from "./status-update.service"; -import {ErvuPermission} from "../enum/ErvuPermission"; export interface UserSession { userId: string, @@ -14,14 +11,13 @@ export interface UserSession { } @Injectable({providedIn: 'root'}) -export class AuthorizationService implements OnDestroy { +export class AuthorizationService{ private session: UserSession; public onSessionUpdate: Subject = new Subject(); - constructor(protected httpClient: HttpClient, protected websocketService: WebsocketService, - protected statusUpdateService: StatusUpdateService) {} + constructor(protected httpClient: HttpClient) {} public getCurrentSession(): Promise { if (this.session) return new Promise(resolve => resolve(this.session)); @@ -30,18 +26,6 @@ export class AuthorizationService implements OnDestroy { .then((session: UserSession) => { this.session = session; this.onSessionUpdate.next(session); - - if (this.hasPermission(ErvuPermission.APPROVER)) { - this.websocketService.subscribe(({data}) => { - let parsedObj = JSON.parse(data); - - if (parsedObj && parsedObj.body && parsedObj.body.applicationNumber) { - if (parsedObj.className === 'update' || parsedObj.className === 'processError') { - this.statusUpdateService.update(parsedObj); - } - } - }); - } return session; }) } @@ -77,8 +61,4 @@ export class AuthorizationService implements OnDestroy { getPermissions(): string[] { return this.isAuthorized() ? this.session.permissions : null; } - - ngOnDestroy(): void { - this.websocketService.unsubscribe(); - } } diff --git a/frontend/src/ts/modules/app/service/status-update.service.ts b/frontend/src/ts/modules/app/service/status-update.service.ts index 93034e00..7a23925d 100644 --- a/frontend/src/ts/modules/app/service/status-update.service.ts +++ b/frontend/src/ts/modules/app/service/status-update.service.ts @@ -2,9 +2,15 @@ import {Injectable} from "@angular/core"; import {BehaviorSubject} from "rxjs"; import {HttpClient} from "@angular/common/http"; +export interface StatusResponse { + appNumber: number; + status: string; +} + enum ApplicationStatus { AGREED = 'Согласована', - ACCEPTED = 'Исполнена' + ACCEPTED = 'Исполнена', + SENT = 'Отправлена' } @Injectable({providedIn: 'root'}) @@ -14,19 +20,57 @@ export class StatusUpdateService { } public statusMessage = new BehaviorSubject(null); + private pendingApplications = new Set(); + private pollingInterval: any; - public update(responseObj: any): void { - this.httpClient.put('status', responseObj) - .toPromise() - .then(appNumber => this.publishStatus(appNumber, responseObj.className === 'update')) - .catch(err => console.log('failed to update application status', err)); + public trackApplication(appNumber: number): void { + this.pendingApplications.add(appNumber); + this.startPolling(); } public publishStatus(appNumber: number, accepted: boolean) { - this.statusMessage.next( - { - appNumber: appNumber, - status: accepted ? ApplicationStatus.ACCEPTED : ApplicationStatus.AGREED - }); + this.statusMessage.next({ + appNumber: appNumber, + status: accepted ? ApplicationStatus.ACCEPTED : ApplicationStatus.AGREED + }); } -} + + private startPolling(): void { + if (this.pendingApplications.size === 0 || this.pollingInterval) return; + + this.pollingInterval = setInterval(() => { + this.checkPendingStatuses(); + }, 5000); + } + + private stopPolling(): void { + if (this.pollingInterval) { + clearInterval(this.pollingInterval); + this.pollingInterval = null; + } + } + + private checkPendingStatuses(): void { + const appNumbers = Array.from(this.pendingApplications); + if (appNumbers.length === 0) { + this.stopPolling(); + return; + } + + this.httpClient.post('status/batch', appNumbers) + .toPromise() + .then(responses => { + responses.forEach(response => { + if (response.status !== 'SENT') { + this.pendingApplications.delete(response.appNumber); + this.publishStatus(response.appNumber, response.status === 'ACCEPTED'); + } + }); + + if (this.pendingApplications.size === 0) { + this.stopPolling(); + } + }) + .catch(err => console.error('Failed to check statuses', err)); + } +} \ No newline at end of file diff --git a/frontend/src/ts/modules/app/websocket/websocket.service.ts b/frontend/src/ts/modules/app/websocket/websocket.service.ts deleted file mode 100644 index 41f68924..00000000 --- a/frontend/src/ts/modules/app/websocket/websocket.service.ts +++ /dev/null @@ -1,37 +0,0 @@ -import {Injectable} from "@angular/core"; - -@Injectable({providedIn: 'root'}) -export class WebsocketService { - - private initialData; - - public subscribe(fn: Function): void { - let property = Object.getOwnPropertyDescriptor(MessageEvent.prototype, "data"); - const data = property.get; - this.initialData = data; - - // wrapper that replaces getter - function lookAtMessage() { - let socket = this.currentTarget instanceof WebSocket; - - if (!socket || !this.currentTarget.url.endsWith('notifier.message.send.push')) { - return data.call(this); - } - let msg = data.call(this); - Object.defineProperty(this, "data", { value: msg }); //anti-loop - fn({ data: msg, socket: this.currentTarget, event: this }); - return msg; - } - property.get = lookAtMessage; - Object.defineProperty(MessageEvent.prototype, "data", property); - } - - public unsubscribe(): void { - let property = Object.getOwnPropertyDescriptor(MessageEvent.prototype, "data"); - - if (this.initialData) { - property.get = this.initialData; - Object.defineProperty(MessageEvent.prototype, "data", property); - } - } -}