SUPPORT-9416: listen application-status from kafka

This commit is contained in:
adel.ka 2025-09-18 16:31:15 +03:00
parent a1563263c1
commit 38a95b7ec0
14 changed files with 176 additions and 161 deletions

View file

@ -202,10 +202,10 @@ export class UserManagementService extends Behavior {
this.httpClient.post(url, request).toPromise()
.then((response: ProcessResponse) => {
let code = response.code;
if (code !== '200') {
this.saveError(appNumber, response.msg);
}
this.statusUpdateService.trackApplication(appNumber);
})
.catch(reason => {
console.error("Error while executing request:", reason.toString());

View file

@ -1,9 +1,6 @@
import {Injectable, OnDestroy} from "@angular/core";
import {Subject} from "rxjs";
import {HttpClient} from "@angular/common/http";
import {WebsocketService} from "../websocket/websocket.service";
import {StatusUpdateService} from "./status-update.service";
import {ErvuPermission} from "../enum/ErvuPermission";
export interface UserSession {
userId: string,
@ -14,14 +11,13 @@ export interface UserSession {
}
@Injectable({providedIn: 'root'})
export class AuthorizationService implements OnDestroy {
export class AuthorizationService{
private session: UserSession;
public onSessionUpdate: Subject<UserSession> = new Subject<UserSession>();
constructor(protected httpClient: HttpClient, protected websocketService: WebsocketService,
protected statusUpdateService: StatusUpdateService) {}
constructor(protected httpClient: HttpClient) {}
public getCurrentSession(): Promise<any> {
if (this.session) return new Promise(resolve => resolve(this.session));
@ -30,18 +26,6 @@ export class AuthorizationService implements OnDestroy {
.then((session: UserSession) => {
this.session = session;
this.onSessionUpdate.next(session);
if (this.hasPermission(ErvuPermission.APPROVER)) {
this.websocketService.subscribe(({data}) => {
let parsedObj = JSON.parse(data);
if (parsedObj && parsedObj.body && parsedObj.body.applicationNumber) {
if (parsedObj.className === 'update' || parsedObj.className === 'processError') {
this.statusUpdateService.update(parsedObj);
}
}
});
}
return session;
})
}
@ -77,8 +61,4 @@ export class AuthorizationService implements OnDestroy {
getPermissions(): string[] {
return this.isAuthorized() ? this.session.permissions : null;
}
ngOnDestroy(): void {
this.websocketService.unsubscribe();
}
}

View file

@ -2,9 +2,15 @@ import {Injectable} from "@angular/core";
import {BehaviorSubject} from "rxjs";
import {HttpClient} from "@angular/common/http";
export interface StatusResponse {
appNumber: number;
status: string;
}
enum ApplicationStatus {
AGREED = 'Согласована',
ACCEPTED = 'Исполнена'
ACCEPTED = 'Исполнена',
SENT = 'Отправлена'
}
@Injectable({providedIn: 'root'})
@ -14,19 +20,57 @@ export class StatusUpdateService {
}
public statusMessage = new BehaviorSubject<any>(null);
private pendingApplications = new Set<number>();
private pollingInterval: any;
public update(responseObj: any): void {
this.httpClient.put<number>('status', responseObj)
.toPromise()
.then(appNumber => this.publishStatus(appNumber, responseObj.className === 'update'))
.catch(err => console.log('failed to update application status', err));
public trackApplication(appNumber: number): void {
this.pendingApplications.add(appNumber);
this.startPolling();
}
public publishStatus(appNumber: number, accepted: boolean) {
this.statusMessage.next(
{
appNumber: appNumber,
status: accepted ? ApplicationStatus.ACCEPTED : ApplicationStatus.AGREED
});
this.statusMessage.next({
appNumber: appNumber,
status: accepted ? ApplicationStatus.ACCEPTED : ApplicationStatus.AGREED
});
}
}
private startPolling(): void {
if (this.pendingApplications.size === 0 || this.pollingInterval) return;
this.pollingInterval = setInterval(() => {
this.checkPendingStatuses();
}, 5000);
}
private stopPolling(): void {
if (this.pollingInterval) {
clearInterval(this.pollingInterval);
this.pollingInterval = null;
}
}
private checkPendingStatuses(): void {
const appNumbers = Array.from(this.pendingApplications);
if (appNumbers.length === 0) {
this.stopPolling();
return;
}
this.httpClient.post<StatusResponse[]>('status/batch', appNumbers)
.toPromise()
.then(responses => {
responses.forEach(response => {
if (response.status !== 'SENT') {
this.pendingApplications.delete(response.appNumber);
this.publishStatus(response.appNumber, response.status === 'ACCEPTED');
}
});
if (this.pendingApplications.size === 0) {
this.stopPolling();
}
})
.catch(err => console.error('Failed to check statuses', err));
}
}

View file

@ -1,37 +0,0 @@
import {Injectable} from "@angular/core";
@Injectable({providedIn: 'root'})
export class WebsocketService {
private initialData;
public subscribe(fn: Function): void {
let property = Object.getOwnPropertyDescriptor(MessageEvent.prototype, "data");
const data = property.get;
this.initialData = data;
// wrapper that replaces getter
function lookAtMessage() {
let socket = this.currentTarget instanceof WebSocket;
if (!socket || !this.currentTarget.url.endsWith('notifier.message.send.push')) {
return data.call(this);
}
let msg = data.call(this);
Object.defineProperty(this, "data", { value: msg }); //anti-loop
fn({ data: msg, socket: this.currentTarget, event: this });
return msg;
}
property.get = lookAtMessage;
Object.defineProperty(MessageEvent.prototype, "data", property);
}
public unsubscribe(): void {
let property = Object.getOwnPropertyDescriptor(MessageEvent.prototype, "data");
if (this.initialData) {
property.get = this.initialData;
Object.defineProperty(MessageEvent.prototype, "data", property);
}
}
}