SUPPORT-9086: listen socket messages

This commit is contained in:
gulnaz 2025-04-15 16:07:05 +03:00
parent da8aab60f7
commit f7c14e39cc
11 changed files with 92 additions and 296 deletions

View file

@ -0,0 +1,54 @@
package ru.micord.ervu.account_applications.controller;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.http.MediaType;
import org.springframework.util.StringUtils;
import org.springframework.web.bind.annotation.PutMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RestController;
import ru.micord.ervu.account_applications.security.service.EncryptionService;
import ru.micord.ervu.account_applications.service.UserApplicationListService;
import ru.micord.ervu.account_applications.websocket.dto.ProcessResponseDto;
/**
* @author gulnaz
*/
@RestController
public class UserApplicationController {
private static final Logger LOGGER = LoggerFactory.getLogger(UserApplicationController.class);
private final UserApplicationListService applicationService;
private final EncryptionService encryptionService;
public UserApplicationController(UserApplicationListService applicationService,
EncryptionService encryptionService) {
this.applicationService = applicationService;
this.encryptionService = encryptionService;
}
@PutMapping(value = "/status", consumes = MediaType.APPLICATION_JSON_VALUE)
public void updateStatus(@RequestBody ProcessResponseDto data) {
String traceId = data.traceId();
switch (data.className()) {
case UPDATE -> {
LOGGER.info("update by traceId = {}", traceId);
String tempPass = data.body().tempPass();
if (StringUtils.hasText(tempPass)) {
String encryptedPassword = encryptionService.encrypt(tempPass);
applicationService.savePassword(traceId, encryptedPassword);
}
else {
applicationService.saveAcceptedStatus(traceId);
}
}
case PROCESS_ERROR -> {
String msg = data.body().msg().message();
LOGGER.error("error by traceId = {}, message: {}", traceId, msg);
applicationService.saveError(traceId, msg);
}
}
}
}

View file

@ -1,34 +0,0 @@
package ru.micord.ervu.account_applications.security.listener;
import org.springframework.context.ApplicationListener;
import org.springframework.security.authentication.event.AuthenticationSuccessEvent;
import org.springframework.stereotype.Component;
import ru.micord.ervu.account_applications.security.model.jwt.UserSession;
import ru.micord.ervu.account_applications.security.model.jwt.authentication.JwtTokenAuthentication;
import ru.micord.ervu.account_applications.websocket.service.WebSocketService;
/**
* @author gulnaz
*/
@Component
public class SuccessfulAuthListener implements ApplicationListener<AuthenticationSuccessEvent> {
private static final String ADMIN_ROLE = "security_administrator";
private final WebSocketService webSocketService;
public SuccessfulAuthListener(WebSocketService webSocketService) {
this.webSocketService = webSocketService;
}
@Override
public void onApplicationEvent(AuthenticationSuccessEvent event) {
JwtTokenAuthentication authentication = (JwtTokenAuthentication) event.getAuthentication();
UserSession userSession = authentication.getUserSession();
boolean isAdmin = userSession.roles().stream()
.anyMatch(ervuRoleAuthority -> ervuRoleAuthority.getAuthority().equals(ADMIN_ROLE));
if (isAdmin) {
webSocketService.connectToSocket(userSession.userId(), authentication.getCredentials().toString());
}
}
}

View file

@ -1,28 +0,0 @@
package ru.micord.ervu.account_applications.websocket;
import java.util.HashMap;
import java.util.Map;
import javax.net.ssl.SSLContext;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.client.WebSocketClient;
import org.springframework.web.socket.client.standard.StandardWebSocketClient;
/**
* @author gulnaz
*/
@Configuration
public class WebSocketConfig {
@Autowired
private SSLContext sslContext;
@Bean
public WebSocketClient webSocketClient() {
Map<String, Object> userProperties = new HashMap<>();
userProperties.put("org.apache.tomcat.websocket.SSL_CONTEXT", sslContext);
StandardWebSocketClient webSocketClient = new StandardWebSocketClient();
webSocketClient.setUserProperties(userProperties);
return webSocketClient;
}
}

View file

@ -1,11 +0,0 @@
package ru.micord.ervu.account_applications.websocket.exception;
/**
* @author gulnaz
*/
public class WebSocketConnectionException extends RuntimeException {
public WebSocketConnectionException(String message) {
super(message);
}
}

View file

@ -1,123 +0,0 @@
package ru.micord.ervu.account_applications.websocket.handler;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;
import org.springframework.web.socket.CloseStatus;
import org.springframework.web.socket.WebSocketMessage;
import org.springframework.web.socket.WebSocketSession;
import org.springframework.web.socket.handler.TextWebSocketHandler;
import ru.micord.ervu.account_applications.security.service.EncryptionService;
import ru.micord.ervu.account_applications.service.UserApplicationListService;
import ru.micord.ervu.account_applications.websocket.dto.ProcessResponseDto;
import ru.micord.ervu.account_applications.websocket.service.WebSocketService;
/**
* @author gulnaz
*/
@Component
public class ClientSocketHandler extends TextWebSocketHandler {
private static final Logger LOGGER = LoggerFactory.getLogger(TextWebSocketHandler.class);
private static final Map<String, WebSocketSession> sessionByUserId = new ConcurrentHashMap<>();
private static final Map<String, UserData> userDataBySessionId = new ConcurrentHashMap<>();
private final ObjectMapper objectMapper;
private final UserApplicationListService applicationService;
private final EncryptionService encryptionService;
private final WebSocketService webSocketService;
public ClientSocketHandler(ObjectMapper objectMapper,
UserApplicationListService applicationService,
EncryptionService encryptionService,
@Lazy WebSocketService webSocketService) {
this.objectMapper = objectMapper;
this.applicationService = applicationService;
this.encryptionService = encryptionService;
this.webSocketService = webSocketService;
}
@Override
public void afterConnectionEstablished(WebSocketSession session) {
LOGGER.info("established connection {}", session);
}
@Override
public void handleMessage(WebSocketSession session, WebSocketMessage<?> message)
throws JsonProcessingException {
ProcessResponseDto dto = objectMapper.readValue(message.getPayload().toString(),
ProcessResponseDto.class);
String traceId = dto.traceId();
switch (dto.className()) {
case UPDATE -> {
LOGGER.info("update by traceId = {}", traceId);
String tempPass = dto.body().tempPass();
if (StringUtils.hasText(tempPass)) {
String encryptedPassword = encryptionService.encrypt(tempPass);
applicationService.savePassword(traceId, encryptedPassword);
}
else {
applicationService.saveAcceptedStatus(traceId);
}
}
case PROCESS_ERROR -> {
String msg = dto.body().msg().message();
LOGGER.error("error by traceId = {}, message: {}", traceId, msg);
applicationService.saveError(traceId, msg);
}
}
}
@Override
public void handleTransportError(WebSocketSession session, Throwable exception) {
LOGGER.error("Transport error {}", exception.getMessage());
try {
session.close();
}
catch (IOException e) {
LOGGER.error("Failed to close session on handleTransportError ", e);
}
}
@Override
public void afterConnectionClosed(WebSocketSession session, CloseStatus status) {
LOGGER.info("Connection closed");
if (!status.equals(CloseStatus.NORMAL)) {
try {
session.close();
}
catch (IOException e) {
LOGGER.error("Failed to close session on afterConnectionClosed ", e);
}
}
String sessionId = session.getId();
UserData userData = userDataBySessionId.get(sessionId);
userDataBySessionId.remove(sessionId);
webSocketService.connectToSocket(userData.userId(), userData.token());
}
public boolean isSessionOpen(String userId) {
return sessionByUserId.get(userId) != null && sessionByUserId.get(userId).isOpen();
}
public void putSession(String userId, WebSocketSession session) {
sessionByUserId.put(userId, session);
}
public void putUserData(String sessionId, String userId, String token) {
userDataBySessionId.put(sessionId, new UserData(userId, token));
}
private record UserData(String userId, String token) {}
}

View file

@ -1,75 +0,0 @@
package ru.micord.ervu.account_applications.websocket.service;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.retry.annotation.Retryable;
import org.springframework.retry.support.RetrySynchronizationManager;
import org.springframework.stereotype.Service;
import org.springframework.web.socket.WebSocketHandler;
import org.springframework.web.socket.WebSocketHttpHeaders;
import org.springframework.web.socket.WebSocketSession;
import org.springframework.web.socket.client.WebSocketClient;
import ru.micord.ervu.account_applications.websocket.exception.WebSocketConnectionException;
import ru.micord.ervu.account_applications.websocket.handler.ClientSocketHandler;
/**
* @author gulnaz
*/
@Service
public class WebSocketService {
private static final Logger LOGGER = LoggerFactory.getLogger(WebSocketService.class);
private final WebSocketClient webSocketClient;
private final WebSocketHandler webSocketHandler;
@Value("${ervu.url}")
private String ervuUrl;
@Value("${ervu.socket.queue:/service/notifier/gateway/notify/notifier.message.send.push}")
private String socketQueue;
@Value("${ervu.socket.connection_timeout:30}")
private long timeout;
public WebSocketService(WebSocketClient webSocketClient, WebSocketHandler webSocketHandler) {
this.webSocketClient = webSocketClient;
this.webSocketHandler = webSocketHandler;
}
@Retryable(
retryFor = {WebSocketConnectionException.class},
maxAttemptsExpression = "${socket.connect.max_attempts:3}")
public void connectToSocket(String userId, String token) {
ClientSocketHandler socketHandler = (ClientSocketHandler) this.webSocketHandler;
if (socketHandler.isSessionOpen(userId)) {
return;
}
WebSocketHttpHeaders headers = new WebSocketHttpHeaders();
headers.set("Content-Type", "application/json");
headers.add("Authorization", "Bearer " + token);
headers.add("Cookie", "JWT=" + token); // to listen private messages
try {
String host = new URI(ervuUrl).getHost();
WebSocketSession session = webSocketClient.doHandshake(this.webSocketHandler, headers,
URI.create("wss://" + host + socketQueue)).get(timeout, TimeUnit.SECONDS);
socketHandler.putSession(userId, session);
socketHandler.putUserData(session.getId(), userId, token);
}
catch (InterruptedException | ExecutionException | TimeoutException e) {
LOGGER.error("Failed to connect socket", e);
LOGGER.error("Failed to connect socket on retry {}",
RetrySynchronizationManager.getContext().getRetryCount());
throw new WebSocketConnectionException(e.getMessage());
}
catch (URISyntaxException e) {
LOGGER.error("Failed to parse url: {}", ervuUrl, e);
}
}
}

View file

@ -1,3 +0,0 @@
export class SocketProvider {
public getSocket(): Promise<any> { return null }
}

View file

@ -1,7 +1,7 @@
import {Injectable} from "@angular/core";
import {Subject} from "rxjs";
import {HttpClient} from "@angular/common/http";
import {SocketProvider} from "../provider/socket.provider";
import {WebsocketService} from "../websocket/websocket.service";
export interface UserSession {
userId: string,
@ -18,28 +18,29 @@ export class AuthorizationService {
public onSessionUpdate: Subject<UserSession> = new Subject<UserSession>();
constructor(protected httpClient: HttpClient, protected socketProvider: SocketProvider) {}
constructor(protected httpClient: HttpClient, protected websocketService: WebsocketService) {}
public getCurrentSession(): Promise<any> {
if (this.session) return new Promise(resolve => resolve(this.session));
//TODO remove after test
this.listenSocket();
return this.httpClient.get('session')
.toPromise()
.then((session: UserSession) => {
this.session = session;
this.onSessionUpdate.next(session);
if (this.hasRole('security_administrator')) {
this.websocketService.listen(({data}) => {
let parsedObj = JSON.parse(data);
if (parsedObj && parsedObj.traceId) {
this.httpClient.put('status', data);
}
});
}
return session;
})
}
private async listenSocket(): Promise<void> {
const webSocketPlugin = await this.socketProvider.getSocket();
webSocketPlugin.addListener((e: MessageEvent) => {
console.log('socket msg!!!', JSON.parse(e.data));
});
}
isAuthorized(): boolean {
return !!this.session;
}

View file

@ -0,0 +1,26 @@
import {Injectable} from "@angular/core";
@Injectable({providedIn: 'root'})
export class WebsocketService {
public listen(fn){
fn = fn || console.log;
let property = Object.getOwnPropertyDescriptor(MessageEvent.prototype, "data");
const data = property.get;
// wrapper that replaces getter
function lookAtMessage() {
let socket = this.currentTarget instanceof WebSocket;
if (!socket) {
return data.call(this);
}
let msg = data.call(this);
Object.defineProperty(this, "data", { value: msg }); //anti-loop
fn({ data: msg, socket: this.currentTarget, event: this });
return msg;
}
property.get = lookAtMessage;
Object.defineProperty(MessageEvent.prototype, "data", property);
}
}

View file

@ -28,8 +28,6 @@ import {TokenProvider} from "../app/provider/token.provider";
import {MfeTokenProvider} from "./provider/mfe-token.provider";
import {DEFAULT_HTTP_INTERCEPTOR_PROVIDERS} from "./interceptor/mfe-default-interceptors.prod";
import {MfeOverlayContainer} from "./overlay/mfe-overlay-container.service";
import {MfeSocketProvider} from "./provider/mfe-socket-provider";
import {SocketProvider} from "../app/provider/socket.provider";
let IMPORTS = [
@ -63,7 +61,6 @@ let IMPORTS = [
{provide: RolesGuard, useClass: MfeRolesGuard},
{provide: TokenProvider, useClass: MfeTokenProvider},
{provide: OverlayContainer, useClass: MfeOverlayContainer},
{provide: SocketProvider, useClass: MfeSocketProvider},
DEFAULT_HTTP_INTERCEPTOR_PROVIDERS
],
bootstrap: [

View file

@ -1,8 +0,0 @@
import {fireMfeEventToContainer} from "../../../mfe-app-tools";
import {SocketProvider} from "../../app/provider/socket.provider";
export class MfeSocketProvider extends SocketProvider {
getSocket(): Promise<any> {
return fireMfeEventToContainer('ws-request', {});
}
}