Merge branch 'feature/SUPPORT-9416_listen_kafka' into develop
This commit is contained in:
commit
9378464f6a
15 changed files with 232 additions and 182 deletions
|
|
@ -7,7 +7,7 @@ import {
|
|||
TextField,
|
||||
Visible
|
||||
} from "@webbpm/base-package";
|
||||
import {HttpClient, HttpErrorResponse, HttpResponse} from "@angular/common/http";
|
||||
import {HttpClient} from "@angular/common/http";
|
||||
import {FormField} from "../field/FormField";
|
||||
import {AuthorizationService} from "../../../modules/app/service/authorization.service";
|
||||
import {ApplicationKind} from "../enum/ApplicationKind";
|
||||
|
|
@ -202,10 +202,12 @@ export class UserManagementService extends Behavior {
|
|||
this.httpClient.post(url, request).toPromise()
|
||||
.then((response: ProcessResponse) => {
|
||||
let code = response.code;
|
||||
|
||||
if (code !== '200') {
|
||||
this.saveError(appNumber, response.msg);
|
||||
}
|
||||
else {
|
||||
this.statusUpdateService.trackDeclaration(appNumber);
|
||||
}
|
||||
})
|
||||
.catch(reason => {
|
||||
console.error("Error while executing request:", reason.toString());
|
||||
|
|
|
|||
|
|
@ -1,9 +1,6 @@
|
|||
import {Injectable, OnDestroy} from "@angular/core";
|
||||
import {Injectable} from "@angular/core";
|
||||
import {Subject} from "rxjs";
|
||||
import {HttpClient} from "@angular/common/http";
|
||||
import {WebsocketService} from "../websocket/websocket.service";
|
||||
import {StatusUpdateService} from "./status-update.service";
|
||||
import {ErvuPermission} from "../enum/ErvuPermission";
|
||||
|
||||
export interface UserSession {
|
||||
userId: string,
|
||||
|
|
@ -14,14 +11,13 @@ export interface UserSession {
|
|||
}
|
||||
|
||||
@Injectable({providedIn: 'root'})
|
||||
export class AuthorizationService implements OnDestroy {
|
||||
export class AuthorizationService{
|
||||
|
||||
private session: UserSession;
|
||||
|
||||
public onSessionUpdate: Subject<UserSession> = new Subject<UserSession>();
|
||||
|
||||
constructor(protected httpClient: HttpClient, protected websocketService: WebsocketService,
|
||||
protected statusUpdateService: StatusUpdateService) {}
|
||||
constructor(protected httpClient: HttpClient) {}
|
||||
|
||||
public getCurrentSession(): Promise<any> {
|
||||
if (this.session) return new Promise(resolve => resolve(this.session));
|
||||
|
|
@ -30,18 +26,6 @@ export class AuthorizationService implements OnDestroy {
|
|||
.then((session: UserSession) => {
|
||||
this.session = session;
|
||||
this.onSessionUpdate.next(session);
|
||||
|
||||
if (this.hasPermission(ErvuPermission.APPROVER)) {
|
||||
this.websocketService.subscribe(({data}) => {
|
||||
let parsedObj = JSON.parse(data);
|
||||
|
||||
if (parsedObj && parsedObj.body && parsedObj.body.applicationNumber) {
|
||||
if (parsedObj.className === 'update' || parsedObj.className === 'processError') {
|
||||
this.statusUpdateService.update(parsedObj);
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
return session;
|
||||
})
|
||||
}
|
||||
|
|
@ -77,8 +61,4 @@ export class AuthorizationService implements OnDestroy {
|
|||
getPermissions(): string[] {
|
||||
return this.isAuthorized() ? this.session.permissions : null;
|
||||
}
|
||||
|
||||
ngOnDestroy(): void {
|
||||
this.websocketService.unsubscribe();
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -2,6 +2,11 @@ import {Injectable} from "@angular/core";
|
|||
import {BehaviorSubject} from "rxjs";
|
||||
import {HttpClient} from "@angular/common/http";
|
||||
|
||||
export interface StatusResponse {
|
||||
appNumber: number;
|
||||
status: string;
|
||||
}
|
||||
|
||||
enum ApplicationStatus {
|
||||
AGREED = 'Согласована',
|
||||
ACCEPTED = 'Исполнена'
|
||||
|
|
@ -14,19 +19,82 @@ export class StatusUpdateService {
|
|||
}
|
||||
|
||||
public statusMessage = new BehaviorSubject<any>(null);
|
||||
private pendingDeclarations = new Map<number, number>();
|
||||
private pollingInterval: any;
|
||||
private readonly MAX_ATTEMPTS = 12;
|
||||
|
||||
public update(responseObj: any): void {
|
||||
this.httpClient.put<number>('status', responseObj)
|
||||
.toPromise()
|
||||
.then(appNumber => this.publishStatus(appNumber, responseObj.className === 'update'))
|
||||
.catch(err => console.log('failed to update application status', err));
|
||||
public trackDeclaration(appNumber: number): void {
|
||||
if (!this.pendingDeclarations.has(appNumber)) {
|
||||
this.pendingDeclarations.set(appNumber, 0);
|
||||
this.startPolling();
|
||||
}
|
||||
}
|
||||
|
||||
public publishStatus(appNumber: number, accepted: boolean) {
|
||||
this.statusMessage.next(
|
||||
{
|
||||
appNumber: appNumber,
|
||||
status: accepted ? ApplicationStatus.ACCEPTED : ApplicationStatus.AGREED
|
||||
this.statusMessage.next({
|
||||
appNumber: appNumber,
|
||||
status: accepted
|
||||
? ApplicationStatus.ACCEPTED
|
||||
: ApplicationStatus.AGREED
|
||||
});
|
||||
}
|
||||
|
||||
private startPolling(): void {
|
||||
if (this.pendingDeclarations.size === 0 || this.pollingInterval) {
|
||||
return;
|
||||
}
|
||||
|
||||
this.pollingInterval = setInterval(() => {
|
||||
this.checkPendingStatuses();
|
||||
}, 5000);
|
||||
}
|
||||
|
||||
private stopPolling(): void {
|
||||
if (this.pollingInterval) {
|
||||
clearInterval(this.pollingInterval);
|
||||
this.pollingInterval = null;
|
||||
}
|
||||
}
|
||||
|
||||
private checkPendingStatuses(): void {
|
||||
const appNumbers = Array.from(this.pendingDeclarations.keys());
|
||||
if (appNumbers.length === 0) {
|
||||
this.stopPolling();
|
||||
return;
|
||||
}
|
||||
|
||||
this.httpClient.post<StatusResponse[]>('status/batch', appNumbers)
|
||||
.toPromise()
|
||||
.then(responses => {
|
||||
responses.forEach(response => {
|
||||
const attemptCount = (this.pendingDeclarations.get(response.appNumber) || 0) + 1;
|
||||
this.pendingDeclarations.set(response.appNumber, attemptCount);
|
||||
|
||||
if (response.status !== 'SENT') {
|
||||
this.pendingDeclarations.delete(response.appNumber);
|
||||
this.publishStatus(response.appNumber, response.status === 'ACCEPTED');
|
||||
}
|
||||
else if (attemptCount >= this.MAX_ATTEMPTS) {
|
||||
this.pendingDeclarations.delete(response.appNumber);
|
||||
console.warn(`Max attempts exceeded for declaration ${response.appNumber}`);
|
||||
}
|
||||
});
|
||||
|
||||
if (this.pendingDeclarations.size === 0) {
|
||||
this.stopPolling();
|
||||
}
|
||||
})
|
||||
.catch(err => {
|
||||
console.error('Failed to check statuses', err);
|
||||
appNumbers.forEach(appNumber => {
|
||||
const attemptCount = (this.pendingDeclarations.get(appNumber) || 0) + 1;
|
||||
this.pendingDeclarations.set(appNumber, attemptCount);
|
||||
|
||||
if (attemptCount >= this.MAX_ATTEMPTS) {
|
||||
this.pendingDeclarations.delete(appNumber);
|
||||
console.warn(`Max attempts exceeded for declaration ${appNumber} due to errors`);
|
||||
}
|
||||
});
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -1,37 +0,0 @@
|
|||
import {Injectable} from "@angular/core";
|
||||
|
||||
@Injectable({providedIn: 'root'})
|
||||
export class WebsocketService {
|
||||
|
||||
private initialData;
|
||||
|
||||
public subscribe(fn: Function): void {
|
||||
let property = Object.getOwnPropertyDescriptor(MessageEvent.prototype, "data");
|
||||
const data = property.get;
|
||||
this.initialData = data;
|
||||
|
||||
// wrapper that replaces getter
|
||||
function lookAtMessage() {
|
||||
let socket = this.currentTarget instanceof WebSocket;
|
||||
|
||||
if (!socket || !this.currentTarget.url.endsWith('notifier.message.send.push')) {
|
||||
return data.call(this);
|
||||
}
|
||||
let msg = data.call(this);
|
||||
Object.defineProperty(this, "data", { value: msg }); //anti-loop
|
||||
fn({ data: msg, socket: this.currentTarget, event: this });
|
||||
return msg;
|
||||
}
|
||||
property.get = lookAtMessage;
|
||||
Object.defineProperty(MessageEvent.prototype, "data", property);
|
||||
}
|
||||
|
||||
public unsubscribe(): void {
|
||||
let property = Object.getOwnPropertyDescriptor(MessageEvent.prototype, "data");
|
||||
|
||||
if (this.initialData) {
|
||||
property.get = this.initialData;
|
||||
Object.defineProperty(MessageEvent.prototype, "data", property);
|
||||
}
|
||||
}
|
||||
}
|
||||
Loading…
Add table
Add a link
Reference in a new issue