SUPPORT-9165: add wait for accumulator

This commit is contained in:
adel.kalimullin 2025-05-13 14:59:59 +03:00
parent bc0f2af3f1
commit 682d9313dc

View file

@ -12,6 +12,8 @@ import java.util.concurrent.atomic.AtomicInteger;
import ervu_business_metrics.config.KafkaEnabledCondition; import ervu_business_metrics.config.KafkaEnabledCondition;
import ervu_business_metrics.model.sso.UserSessionInfo; import ervu_business_metrics.model.sso.UserSessionInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.annotation.Conditional; import org.springframework.context.annotation.Conditional;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
@ -21,6 +23,7 @@ import org.springframework.stereotype.Component;
@Component @Component
@Conditional(KafkaEnabledCondition.class) @Conditional(KafkaEnabledCondition.class)
public class SessionResponseHolder { public class SessionResponseHolder {
private final Logger LOGGER = LoggerFactory.getLogger(SessionResponseHolder.class);
private final Map<String, SessionAccumulator> sessionsMap = new ConcurrentHashMap<>(); private final Map<String, SessionAccumulator> sessionsMap = new ConcurrentHashMap<>();
public void initIfAbsent(String requestKey, int totalExpected) { public void initIfAbsent(String requestKey, int totalExpected) {
@ -37,9 +40,21 @@ public class SessionResponseHolder {
public List<UserSessionInfo> awaitSessions(String requestKey, long timeout, TimeUnit unit) public List<UserSessionInfo> awaitSessions(String requestKey, long timeout, TimeUnit unit)
throws ExecutionException, InterruptedException, TimeoutException { throws ExecutionException, InterruptedException, TimeoutException {
SessionAccumulator acc = sessionsMap.get(requestKey); SessionAccumulator acc = sessionsMap.get(requestKey);
long startTime = System.nanoTime();
LOGGER.info("Начало ожидания сессий для requestKey = {}", requestKey);
while (acc == null && (System.nanoTime() - startTime) < unit.toNanos(timeout)) {
LOGGER.debug("Аккумулятор для requestKey = {} еще не найден, повторная проверка", requestKey);
TimeUnit.MILLISECONDS.sleep(50);
acc = sessionsMap.get(requestKey);
}
if (acc == null) { if (acc == null) {
LOGGER.error("Аккумулятор для requestKey: {} не был инициализирован за время ожидания.", requestKey);
return List.of(); return List.of();
} }
return acc.getFuture().get(timeout, unit); return acc.getFuture().get(timeout, unit);
} }