SUPPORT-8934: handle empty extract case for existing recruit

This commit is contained in:
gulnaz 2025-02-20 11:51:02 +03:00
parent 94e2e6a390
commit 8a354074f9
11 changed files with 108 additions and 43 deletions

View file

@ -2,9 +2,11 @@ package ru.micord.ervu.controller;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import com.google.protobuf.ByteString;
import com.google.protobuf.InvalidProtocolBufferException;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.utils.Bytes;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.core.io.InputStreamResource;
@ -15,12 +17,13 @@ import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RestController;
import rtl.pgs.ervu.proto.ExtractRegistry;
import rtl.pgs.ervu.proto.ResponseData;
import ru.micord.ervu.audit.constants.AuditConstants;
import ru.micord.ervu.audit.service.AuditService;
import ru.micord.ervu.dto.ExtractEmptyRequestDto;
import ru.micord.ervu.dto.ExtractRequestDto;
import ru.micord.ervu.kafka.dto.EmptyExtract;
import ru.micord.ervu.kafka.dto.Extract;
import ru.micord.ervu.kafka.dto.FullExtract;
import ru.micord.ervu.kafka.service.ReplyingKafkaService;
import ru.micord.ervu.security.esia.model.PersonModel;
import ru.micord.ervu.security.esia.service.PersonalDataService;
@ -45,6 +48,8 @@ public class ExtractController {
private String registryExtractRequestTopic;
@Value("${ervu.kafka.registry.extract.reply.topic}")
private String registryExtractReplyTopic;
@Value("${ervu.kafka.registry.extract.type.header}")
private String registryExtractTypeHeader;
public ExtractController(
PersonalDataService personalDataService,
@ -59,45 +64,43 @@ public class ExtractController {
public ResponseEntity<Resource> getExtract(HttpServletRequest servletRequest, @PathVariable String formatRegistry) {
UserIdsPair userIdsPair = SecurityUtil.getUserIdsPair();
String ervuId = userIdsPair.getErvuId();
ConsumerRecord<String, Bytes> record;
boolean isEmpty = true;
if (ervuId != null) {
ExtractRequestDto request = new ExtractRequestDto(ervuId, formatRegistry);
record = replyingKafkaService.sendMessageAndGetReply(
registryExtractRequestTopic, registryExtractReplyTopic, request);
isEmpty = Arrays.stream(record.headers().toArray())
.filter(header -> header.key().equals(registryExtractTypeHeader))
.findFirst()
.map(header -> Boolean.parseBoolean(new String(header.value(), StandardCharsets.UTF_8)))
.orElseThrow();
}
else {
String esiaUserId = userIdsPair.getEsiaUserId(); // esiaUserId is not null here
String esiaAccessToken = EsiaTokensStore.getAccessToken(esiaUserId);
PersonModel personModel = personalDataService.getPersonModel(esiaAccessToken);
ExtractEmptyRequestDto emptyRequest = new ExtractEmptyRequestDto(
personModel.getLastName(),
personModel.getFirstName(), personModel.getMiddleName(), personModel.getBirthDate(),
personModel.getSnils(), formatRegistry
);
record = replyingKafkaService.sendMessageAndGetReply(registryExtractEmptyRequestTopic,
registryExtractReplyTopic, emptyRequest);
}
byte[] bytes = record.value().get();
String fileName = null;
ByteString file;
int size = 0;
try {
if (ervuId != null) {
ExtractRequestDto request = new ExtractRequestDto(ervuId, formatRegistry);
byte[] reply = replyingKafkaService.sendMessageAndGetReply(registryExtractRequestTopic,
registryExtractReplyTopic, request).get();
ResponseData responseData = ResponseData.parseFrom(reply);
ExtractRegistry extractRegistry = responseData.getDataRegistryInformation()
.getExtractRegistry();
fileName = extractRegistry.getFileName();
file = extractRegistry.getFile();
}
else {
String esiaUserId = userIdsPair.getEsiaUserId(); // esiaUserid is not null here
String esiaAccessToken = EsiaTokensStore.getAccessToken(esiaUserId);
PersonModel personModel = personalDataService.getPersonModel(esiaAccessToken);
ExtractEmptyRequestDto emptyRequest = new ExtractEmptyRequestDto(
personModel.getLastName(),
personModel.getFirstName(), personModel.getMiddleName(), personModel.getBirthDate(),
personModel.getSnils(), formatRegistry
);
byte[] reply = replyingKafkaService.sendMessageAndGetReply(registryExtractEmptyRequestTopic,
registryExtractReplyTopic, emptyRequest).get();
rtl.pgs.ervu.proto.emptyrequest.ResponseData responseData = rtl.pgs.ervu.proto.emptyrequest.ResponseData
.parseFrom(reply);
rtl.pgs.ervu.proto.emptyrequest.ExtractRegistry extractRegistry = responseData.getDataRegistryInformation()
.getExtractRegistry();
fileName = extractRegistry.getFileName();
file = extractRegistry.getFile();
}
size = file.size();
Extract extract = ervuId == null || isEmpty ? new EmptyExtract(bytes) : new FullExtract(bytes);
fileName = extract.getFileName();
String encodedFilename = URLEncoder.encode(fileName, StandardCharsets.UTF_8);
ByteString file = extract.getFile();
InputStreamResource resource = new InputStreamResource(file.newInput());
size = file.size();
auditService.processDownloadEvent(servletRequest, size, fileName, formatRegistry,
AuditConstants.SUCCESS_STATUS
);

View file

@ -0,0 +1,19 @@
package ru.micord.ervu.kafka.dto;
import com.google.protobuf.InvalidProtocolBufferException;
import rtl.pgs.ervu.proto.emptyrequest.ExtractRegistry;
import rtl.pgs.ervu.proto.emptyrequest.ResponseData;
/**
* @author gulnaz
*/
public class EmptyExtract extends Extract {
public EmptyExtract(byte[] bytes) throws InvalidProtocolBufferException {
ResponseData responseData = ResponseData.parseFrom(bytes);
ExtractRegistry extractRegistry = responseData.getDataRegistryInformation()
.getExtractRegistry();
fileName = extractRegistry.getFileName();
file = extractRegistry.getFile();
}
}

View file

@ -0,0 +1,20 @@
package ru.micord.ervu.kafka.dto;
import com.google.protobuf.ByteString;
/**
* @author gulnaz
*/
public abstract class Extract {
protected String fileName;
protected ByteString file;
public String getFileName() {
return fileName;
}
public ByteString getFile() {
return file;
}
}

View file

@ -0,0 +1,19 @@
package ru.micord.ervu.kafka.dto;
import com.google.protobuf.InvalidProtocolBufferException;
import rtl.pgs.ervu.proto.ExtractRegistry;
import rtl.pgs.ervu.proto.ResponseData;
/**
* @author gulnaz
*/
public class FullExtract extends Extract {
public FullExtract(byte[] bytes) throws InvalidProtocolBufferException {
ResponseData responseData = ResponseData.parseFrom(bytes);
ExtractRegistry extractRegistry = responseData.getDataRegistryInformation()
.getExtractRegistry();
fileName = extractRegistry.getFileName();
file = extractRegistry.getFile();
}
}

View file

@ -1,8 +1,10 @@
package ru.micord.ervu.kafka.service;
import org.apache.kafka.clients.consumer.ConsumerRecord;
public interface ReplyingKafkaService<T, V> {
V sendMessageAndGetReply(String requestTopic,
String replyTopic,
T requestMessage);
ConsumerRecord<String, V> sendMessageAndGetReply(String requestTopic,
String replyTopic,
T requestMessage);
}

View file

@ -20,14 +20,13 @@ public abstract class BaseReplyingKafkaService<T, V> implements ReplyingKafkaSer
private static final Logger LOGGER = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
@Override
public V sendMessageAndGetReply(String requestTopic, String replyTopic, T requestMessage) {
public ConsumerRecord<String, V> sendMessageAndGetReply(String requestTopic, String replyTopic, T requestMessage) {
long startTime = System.currentTimeMillis();
RequestReplyFuture<String, T, V> replyFuture = getTemplate().sendAndReceive(
getProducerRecord(requestTopic, replyTopic, requestMessage));
try {
V result = Optional.ofNullable(replyFuture.get())
.map(ConsumerRecord::value)
ConsumerRecord<String, V> result = Optional.ofNullable(replyFuture.get())
.orElseThrow(() -> new RuntimeException("Kafka return result is null"));
LOGGER.info("Thread {} - KafkaSendMessageAndGetReply: {} ms",
Thread.currentThread().getId(), System.currentTimeMillis() - startTime);

View file

@ -401,7 +401,7 @@ public class EsiaAuthService {
Person person = copyToPerson(personModel);
String kafkaResponse = replyingKafkaService.sendMessageAndGetReply(requestTopic,
requestReplyTopic, objectMapper.writeValueAsString(person)
);
).value();
return objectMapper.readValue(kafkaResponse, Response.class);
}

View file

@ -40,7 +40,7 @@ public class SubpoenaService {
}
SubpoenaRequestDto subpoenaRequestDto = new SubpoenaRequestDto(ervuId);
byte[] reply = replyingKafkaService.sendMessageAndGetReply(recruitRequestTopic,
recruitReplyTopic, subpoenaRequestDto).get();
recruitReplyTopic, subpoenaRequestDto).value().get();
try {
SummonsResponseData responseData = SummonsResponseData.parseFrom(reply);

View file

@ -32,6 +32,7 @@ ERVU_KAFKA_RECRUIT_HEADER_CLASS=Request@urn://rostelekom.ru/RP-SummonsTR/1.0.5
ERVU_KAFKA_REGISTRY_EXTRACT_EMPTY_REQUEST_TOPIC=ervu.extract.empty.request
ERVU_KAFKA_REGISTRY_EXTRACT_REQUEST_TOPIC=ervu.extract.info.request
ERVU_KAFKA_REGISTRY_EXTRACT_REPLY_TOPIC=ervu.extract.info.response
ERVU_KAFKA_REGISTRY_EXTRACT_TYPE_HEADER=empty
ERVU_KAFKA_EXTRACT_HEADER_CLASS=request@urn://rostelekom.ru/ERVU-extractFromRegistryTR/1.0.3
ERVU_KAFKA_DOC_LOGIN_MODULE=org.apache.kafka.common.security.plain.PlainLoginModule

View file

@ -31,6 +31,7 @@ ERVU_KAFKA_RECRUIT_HEADER_CLASS=Request@urn://rostelekom.ru/RP-SummonsTR/1.0.5
ERVU_KAFKA_REGISTRY_EXTRACT_EMPTY_REQUEST_TOPIC=ervu.extract.empty.request
ERVU_KAFKA_REGISTRY_EXTRACT_REQUEST_TOPIC=ervu.extract.info.request
ERVU_KAFKA_REGISTRY_EXTRACT_REPLY_TOPIC=ervu.extract.info.response
ERVU_KAFKA_REGISTRY_EXTRACT_TYPE_HEADER=empty
ERVU_KAFKA_EXTRACT_HEADER_CLASS=request@urn://rostelekom.ru/ERVU-extractFromRegistryTR/1.0.3
ERVU_KAFKA_DOC_LOGIN_MODULE=org.apache.kafka.common.security.scram.ScramLoginModule
AUDIT_KAFKA_AUTHORIZATION_TOPIC=ervu.lkrp.auth.events

View file

@ -78,6 +78,7 @@
<property name="ervu.kafka.registry.extract.empty.request.topic" value="ervu.extract.empty.request"/>
<property name="ervu.kafka.registry.extract.request.topic" value="ervu.extract.info.request"/>
<property name="ervu.kafka.registry.extract.reply.topic" value="ervu.extract.info.response"/>
<property name="ervu.kafka.registry.extract.type.header" value="empty"/>
<property name="ervu.kafka.extract.header.class" value="request@urn://rostelekom.ru/ERVU-extractFromRegistryTR/1.0.3"/>
<property name="esia.token.clear.cron" value="0 0 */1 * * *"/>
<property name="audit.kafka.bootstrap.servers" value="localhost:9092"/>