SUPPORT-8413: refactor kafka config; rename dtos; use ervuId in requests

gulnaz 2024-09-20 15:48:56 +03:00
parent 6b757c805c
commit f1a2bfdb47
15 changed files with 306 additions and 296 deletions

View file

@@ -1,14 +1,18 @@
package ru.micord.ervu.controller;
import com.google.protobuf.InvalidProtocolBufferException;
import org.apache.kafka.common.utils.Bytes;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import ru.micord.ervu.converter.SummonsResponseDataConverter;
import ru.micord.ervu.dto.PersonRequestDto;
import ru.micord.ervu.dto.SubpoenaDto;
import ru.micord.ervu.service.ReplyingKafkaService;
import ru.micord.ervu.dto.SubpoenaRequestDto;
import ru.micord.ervu.dto.SubpoenaResponseDto;
import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RestController;
import proto.ervu.rp.summons.SummonsResponseData;
import ru.micord.ervu.kafka.service.ReplyingKafkaService;
import ru.micord.ervu.security.webbpm.jwt.service.JwtTokenService;
/**
* @author gulnaz
@@ -16,11 +20,20 @@ import proto.ervu.rp.summons.SummonsResponseData;
@RestController
public class ErvuDataController {
private final ReplyingKafkaService replyingKafkaService;
private final JwtTokenService jwtTokenService;
private final ReplyingKafkaService<Object, Bytes> replyingKafkaService;
private final SummonsResponseDataConverter converter;
public ErvuDataController(ReplyingKafkaService replyingKafkaService,
@Value("${ervu.kafka.recruit.request.topic}")
private String recruitRequestTopic;
@Value("${ervu.kafka.recruit.reply.topic}")
private String recruitReplyTopic;
public ErvuDataController(
JwtTokenService jwtTokenService,
@Qualifier("subpoena") ReplyingKafkaService<Object, Bytes> replyingKafkaService,
SummonsResponseDataConverter converter) {
this.jwtTokenService = jwtTokenService;
this.replyingKafkaService = replyingKafkaService;
this.converter = converter;
}
@@ -29,10 +42,10 @@ public class ErvuDataController {
value = "/get-data",
produces = MediaType.APPLICATION_JSON_VALUE
)
public SubpoenaDto getData() {
//TODO get id from token
PersonRequestDto personRequestDto = new PersonRequestDto("6fb62081-7345-4a9d-a1d0-68b46bd8faac");
byte[] reply = replyingKafkaService.sendRequestAndGetPersonReply(personRequestDto);
public SubpoenaResponseDto getData() {
SubpoenaRequestDto subpoenaRequestDto = new SubpoenaRequestDto(jwtTokenService.getErvuId());
byte[] reply = replyingKafkaService.sendMessageAndGetReply(recruitRequestTopic,
recruitReplyTopic, subpoenaRequestDto).get();
try {
SummonsResponseData responseData = SummonsResponseData.parseFrom(reply);
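Note: the subpoena flow's producer uses Spring Kafka's JsonSerializer (see ReplyingKafkaConfig below), so the message published to the recruit request topic is simply the record's JSON form, e.g. {"ervuId":"6fb62081-7345-4a9d-a1d0-68b46bd8faac"} (the UUID reuses the placeholder this commit removes). The Bytes reply is unwrapped to byte[] with .get() before protobuf parsing.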

View file

@@ -6,7 +6,7 @@ import java.util.Comparator;
import java.util.List;
import java.util.Optional;
import ru.micord.ervu.dto.SubpoenaDto;
import ru.micord.ervu.dto.SubpoenaResponseDto;
import ru.micord.ervu.util.DateUtil;
import org.springframework.stereotype.Component;
import proto.ervu.rp.summons.MeasuresTemporary;
@@ -27,8 +27,8 @@ public class SummonsResponseDataConverter {
private static final String STAY_ADDRESS_CODE = "_2";
private static final String ACTUAL_ADDRESS_CODE = "_3";
public SubpoenaDto convert(SummonsResponseData responseData) {
SubpoenaDto.Builder builder = new SubpoenaDto.Builder()
public SubpoenaResponseDto convert(SummonsResponseData responseData) {
SubpoenaResponseDto.Builder builder = new SubpoenaResponseDto.Builder()
.personName(responseData.getFirstName(), responseData.getMiddleName(),
responseData.getLastName()
)

View file

@@ -3,5 +3,5 @@ package ru.micord.ervu.dto;
/**
* @author ya.kuznetsova
*/
public record PersonRequestDto(String ervuId) {
public record SubpoenaRequestDto(String ervuId) {
}

View file

@@ -9,17 +9,17 @@ import static ru.micord.ervu.util.DateUtil.convertToString;
/**
* @author gulnaz
*/
public record SubpoenaDto(String personName, String birthDate, String docType, String docNumber,
String issueDate, String issueOrg, String issueIdCode,
String residenceAddress, String actualAddress, String stayAddress,
String seriesAndNumber, String visitDateTime,
String militaryCommissariatName, String militaryCommissariatAddress,
String militaryCommissar, String placementDateSummons, String reasonName,
String summonsStatusName, String deliveryTypeSummonsName,
String estimatedDateSummons, List<Restriction> restrictions,
String firstRestrictionName, String firstRestrictionStartDate,
int otherRestrictionCount, int recruitmentStatusCode,
String recruitmentStartDate, int daysToAppearance) {
public record SubpoenaResponseDto(String personName, String birthDate, String docType, String docNumber,
String issueDate, String issueOrg, String issueIdCode,
String residenceAddress, String actualAddress, String stayAddress,
String seriesAndNumber, String visitDateTime,
String militaryCommissariatName, String militaryCommissariatAddress,
String militaryCommissar, String placementDateSummons, String reasonName,
String summonsStatusName, String deliveryTypeSummonsName,
String estimatedDateSummons, List<Restriction> restrictions,
String firstRestrictionName, String firstRestrictionStartDate,
int otherRestrictionCount, int recruitmentStatusCode,
String recruitmentStartDate, int daysToAppearance) {
public static final class Builder {
String personName;
@@ -188,8 +188,8 @@ public record SubpoenaDto(String personName, String birthDate, String docType, S
return this;
}
public SubpoenaDto build() {
return new SubpoenaDto(personName, birthDate, docType, docNumber, issueDate, issueOrg,
public SubpoenaResponseDto build() {
return new SubpoenaResponseDto(personName, birthDate, docType, docNumber, issueDate, issueOrg,
issueIdCode, residenceAddress, actualAddress, stayAddress, seriesAndNumber, visitDateTime,
militaryCommissariatName, militaryCommissariatAddress, militaryCommissar,
placementDateSummons, reasonName, summonsStatusName, deliveryTypeSummonsName,

View file

@@ -1,89 +0,0 @@
package ru.micord.ervu.kafka;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.serialization.BytesDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.Bytes;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.support.serializer.JsonSerializer;
/**
* @author ya.kuznetsova
*/
@Configuration
@EnableKafka
public class ErvuKafkaConfig {
@Value("${kafka.ervu.send.url}")
private String kafkaUrl;
@Value("${kafka.ervu.security.protocol}")
private String securityProtocol;
@Value("${kafka.ervu.doc.login.module}")
private String loginModule;
@Value("${kafka.ervu.username}")
private String username;
@Value("${kafka.ervu.password}")
private String password;
@Value("${kafka.ervu.sasl.mechanism}")
private String saslMechanism;
@Bean
public Map<String, Object> producerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, this.kafkaUrl);
props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, securityProtocol);
props.put(SaslConfigs.SASL_JAAS_CONFIG, loginModule + " required username=\""
+ username + "\" password=\"" + password + "\";");
props.put(SaslConfigs.SASL_MECHANISM, saslMechanism);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
return props;
}
@Bean
public ProducerFactory<String, Object> producerFactory() {
return new DefaultKafkaProducerFactory<>(producerConfigs());
}
@Bean
public KafkaTemplate<String, Object> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
@Bean
public ConsumerFactory<String, Bytes> consumerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, this.kafkaUrl);
props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, securityProtocol);
props.put(SaslConfigs.SASL_JAAS_CONFIG, loginModule + " required username=\""
+ username + "\" password=\"" + password + "\";");
props.put(SaslConfigs.SASL_MECHANISM, saslMechanism);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, BytesDeserializer.class);
return new DefaultKafkaConsumerFactory<>(props);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, Bytes> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, Bytes> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
return factory;
}
}

View file

@@ -1,9 +1,14 @@
package ru.micord.ervu.kafka;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.serialization.BytesDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.Bytes;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@@ -12,6 +17,7 @@ import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.*;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.requestreply.ReplyingKafkaTemplate;
import org.springframework.kafka.support.serializer.JsonSerializer;
import java.time.Duration;
import java.util.HashMap;
@@ -23,40 +29,82 @@ public class ReplyingKafkaConfig {
@Value("${ervu.kafka.bootstrap.servers}")
private String bootstrapServers;
@Value("${ervu.kafka.security.protocol}")
private String securityProtocol;
@Value("${ervu.kafka.doc.login.module}")
private String loginModule;
@Value("${ervu.kafka.username}")
private String username;
@Value("${ervu.kafka.password}")
private String password;
@Value("${ervu.kafka.sasl.mechanism}")
private String saslMechanism;
@Value("${ervu.kafka.reply.topic}")
private String replyTopic;
@Value("${ervu.kafka.recruit.reply.topic}")
private String recruitReplyTopic;
@Value("${ervu.kafka.subpoena.extract.reply.topic}")
private String subpoenaExtractReplyTopic;
@Value("${ervu.kafka.registry.extract.reply.topic}")
private String registryExtractReplyTopic;
@Value("${ervu.kafka.group.id}")
private String groupId;
@Value("${ervu.kafka.reply.timeout:30}")
private long replyTimeout;
@Bean
public ProducerFactory<String, String> producerFactory() {
Map<String, Object> configProps = new HashMap<>();
configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
Map<String, Object> configProps = producerConfig();
configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
return new DefaultKafkaProducerFactory<>(configProps);
}
@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
public ProducerFactory<String, Object> subpoenaProducerFactory() {
Map<String, Object> configProps = producerConfig();
configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
return new DefaultKafkaProducerFactory<>(configProps);
}
private Map<String, Object> producerConfig() {
Map<String, Object> configProps = new HashMap<>();
configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
// configProps.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, securityProtocol);
// configProps.put(SaslConfigs.SASL_JAAS_CONFIG, loginModule + " required username=\""
// + username + "\" password=\"" + password + "\";");
// configProps.put(SaslConfigs.SASL_MECHANISM, saslMechanism);
configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
return configProps;
}
@Bean
public ConsumerFactory<String, String> consumerFactory() {
Map<String, Object> configProps = new HashMap<>();
configProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
configProps.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
configProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
Map<String, Object> configProps = consumerConfig();
configProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
return new DefaultKafkaConsumerFactory<>(configProps);
}
@Bean
public ConsumerFactory<String, Bytes> subpoenaConsumerFactory() {
Map<String, Object> configProps = consumerConfig();
configProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, BytesDeserializer.class);
return new DefaultKafkaConsumerFactory<>(configProps);
}
private Map<String, Object> consumerConfig() {
Map<String, Object> configProps = new HashMap<>();
configProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
// configProps.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, securityProtocol);
// configProps.put(SaslConfigs.SASL_JAAS_CONFIG, loginModule + " required username=\""
// + username + "\" password=\"" + password + "\";");
// configProps.put(SaslConfigs.SASL_MECHANISM, saslMechanism);
configProps.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
configProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
return configProps;
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
@@ -64,22 +112,52 @@ public class ReplyingKafkaConfig {
return factory;
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, Bytes> subpoenaKafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, Bytes> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(subpoenaConsumerFactory());
return factory;
}
@Bean
public ConcurrentMessageListenerContainer<String, String> replyContainer(
ConcurrentKafkaListenerContainerFactory<String, String> factory) {
ConcurrentMessageListenerContainer<String, String> container = factory.createContainer(
replyTopic);
ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory) {
ConcurrentMessageListenerContainer<String, String> container =
kafkaListenerContainerFactory.createContainer(replyTopic);
container.getContainerProperties().setGroupId(groupId);
return container;
}
@Bean
public ConcurrentMessageListenerContainer<String, Bytes> subpoenaReplyContainer(
ConcurrentKafkaListenerContainerFactory<String, Bytes> personKafkaListenerContainerFactory) {
ConcurrentMessageListenerContainer<String, Bytes> container =
personKafkaListenerContainerFactory.createContainer(recruitReplyTopic,
subpoenaExtractReplyTopic, registryExtractReplyTopic);
container.getContainerProperties().setGroupId(groupId);
return container;
}
@Bean
@Qualifier("person")
public ReplyingKafkaTemplate<String, String, String> replyingKafkaTemplate(
ProducerFactory<String, String> pf,
ConcurrentMessageListenerContainer<String, String> container) {
ProducerFactory<String, String> producerFactory,
ConcurrentMessageListenerContainer<String, String> replyContainer) {
ReplyingKafkaTemplate<String, String, String> replyingKafkaTemplate =
new ReplyingKafkaTemplate<>(pf, container);
replyingKafkaTemplate.setCorrelationHeaderName("messageID");
new ReplyingKafkaTemplate<>(producerFactory, replyContainer);
replyingKafkaTemplate.setCorrelationHeaderName("messageId");
replyingKafkaTemplate.setDefaultReplyTimeout(Duration.ofSeconds(replyTimeout));
return replyingKafkaTemplate;
}
@Bean
@Qualifier("subpoena")
public ReplyingKafkaTemplate<String, Object, Bytes> subpoenaReplyingKafkaTemplate(
ProducerFactory<String, Object> personProducerFactory,
ConcurrentMessageListenerContainer<String, Bytes> personReplyContainer) {
ReplyingKafkaTemplate<String, Object, Bytes> replyingKafkaTemplate =
new ReplyingKafkaTemplate<>(personProducerFactory, personReplyContainer);
replyingKafkaTemplate.setCorrelationHeaderName("messageId");
replyingKafkaTemplate.setDefaultReplyTimeout(Duration.ofSeconds(replyTimeout));
return replyingKafkaTemplate;
}
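Note: a sketch of the ervu.kafka.* properties referenced across this commit (all values hypothetical; the SASL keys are bound here but only take effect once the commented-out security lines are restored):

    ervu.kafka.bootstrap.servers=kafka-1:9092
    ervu.kafka.security.protocol=SASL_PLAINTEXT
    ervu.kafka.doc.login.module=org.apache.kafka.common.security.plain.PlainLoginModule
    ervu.kafka.username=ervu-user
    ervu.kafka.password=changeit
    ervu.kafka.sasl.mechanism=PLAIN
    ervu.kafka.group.id=ervu-lk
    ervu.kafka.reply.timeout=30
    ervu.kafka.reply.topic=ervu.person.reply
    ervu.kafka.recruit.request.topic=ervu.recruit.request
    ervu.kafka.recruit.reply.topic=ervu.recruit.reply
    ervu.kafka.subpoena.extract.request.topic=ervu.subpoena.extract.request
    ervu.kafka.subpoena.extract.reply.topic=ervu.subpoena.extract.reply
    ervu.kafka.registry.extract.request.topic=ervu.registry.extract.request
    ervu.kafka.registry.extract.reply.topic=ervu.registry.extract.reply
    ervu.kafka.recruit.header.class=SummonsRequest
    ervu.kafka.extract.header.class=ExtractRequest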

View file

@@ -1,8 +1,8 @@
package ru.micord.ervu.kafka.service;
public interface ReplyingKafkaService {
public interface ReplyingKafkaService<T, V> {
String sendMessageAndGetReply(String requestTopic,
String requestReplyTopic,
String requestMessage);
V sendMessageAndGetReply(String requestTopic,
String replyTopic,
T requestMessage);
}
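Note: the interface is now generic over the request (T) and reply (V) payload types, so each flow binds an implementation by qualifier instead of exposing one method per use case. A minimal hypothetical call site for the subpoena flow (topic names come from configuration):

    // replyingKafkaService is the @Qualifier("subpoena") ReplyingKafkaService<Object, Bytes> bean
    Bytes reply = replyingKafkaService.sendMessageAndGetReply(requestTopic, replyTopic, requestDto);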

View file

@@ -0,0 +1,36 @@
package ru.micord.ervu.kafka.service.impl;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.springframework.kafka.requestreply.ReplyingKafkaTemplate;
import org.springframework.kafka.requestreply.RequestReplyFuture;
import ru.micord.ervu.kafka.service.ReplyingKafkaService;
/**
* @author gulnaz
*/
public abstract class BaseReplyingKafkaService<T, V> implements ReplyingKafkaService<T, V> {
@Override
public V sendMessageAndGetReply(String requestTopic, String replyTopic, T requestMessage) {
RequestReplyFuture<String, T, V> replyFuture = getTemplate().sendAndReceive(
getProducerRecord(requestTopic, replyTopic, requestMessage));
try {
return Optional.ofNullable(replyFuture.get())
.map(ConsumerRecord::value)
.orElseThrow(() -> new RuntimeException("Kafka return result is null"));
}
catch (InterruptedException | ExecutionException e) {
throw new RuntimeException("Failed to get kafka response", e);
}
}
protected abstract ReplyingKafkaTemplate<String, T, V> getTemplate();
protected abstract ProducerRecord<String, T> getProducerRecord(String requestTopic,
String replyTopic, T requestMessage);
}
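Note: this base class is a textbook template method: the transport plumbing (sendAndReceive, the null-reply check, exception translation) lives here once, and subclasses supply only their ReplyingKafkaTemplate bean plus flow-specific ProducerRecord decoration (keys and headers).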

View file

@@ -0,0 +1,36 @@
package ru.micord.ervu.kafka.service.impl;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.kafka.requestreply.ReplyingKafkaTemplate;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.stereotype.Service;
/**
* @author Eduard Tihomirov
*/
@Service
@Qualifier("person")
public class PersonReplyingKafkaService extends BaseReplyingKafkaService<String, String> {
private final ReplyingKafkaTemplate<String, String, String> replyingKafkaTemplate;
public PersonReplyingKafkaService(@Qualifier("person")
ReplyingKafkaTemplate<String, String, String> replyingKafkaTemplate) {
this.replyingKafkaTemplate = replyingKafkaTemplate;
}
@Override
protected ReplyingKafkaTemplate<String, String, String> getTemplate() {
return replyingKafkaTemplate;
}
@Override
protected ProducerRecord<String, String> getProducerRecord(String requestTopic, String replyTopic,
String requestMessage) {
ProducerRecord<String, String> record = new ProducerRecord<>(requestTopic, requestMessage);
record.headers().add(new RecordHeader(KafkaHeaders.REPLY_TOPIC, replyTopic.getBytes()));
return record;
}
}

View file

@@ -1,42 +0,0 @@
package ru.micord.ervu.kafka.service.impl;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.requestreply.ReplyingKafkaTemplate;
import org.springframework.kafka.requestreply.RequestReplyFuture;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.stereotype.Service;
import ru.micord.ervu.kafka.service.ReplyingKafkaService;
/**
* @author Eduard Tihomirov
*/
@Service
public class ReplyingKafkaServiceImpl implements ReplyingKafkaService {
@Autowired
private ReplyingKafkaTemplate<String, String, String> replyingKafkaTemplate;
public String sendMessageAndGetReply(String requestTopic,
String requestReplyTopic,
String requestMessage) {
ProducerRecord<String, String> record = new ProducerRecord<>(requestTopic, requestMessage);
record.headers().add(new RecordHeader(KafkaHeaders.REPLY_TOPIC, requestReplyTopic.getBytes()));
RequestReplyFuture<String, String, String> replyFuture = replyingKafkaTemplate.sendAndReceive(
record);
try {
Optional<ConsumerRecord<String, String>> consumerRecord = Optional.ofNullable(replyFuture.get());
return consumerRecord.map(ConsumerRecord::value)
.orElseThrow(() -> new RuntimeException("Kafka return result is null."));
}
catch (InterruptedException | ExecutionException e) {
throw new RuntimeException("Failed to get kafka response.", e);
}
}
}

View file

@@ -0,0 +1,51 @@
package ru.micord.ervu.kafka.service.impl;
import java.nio.charset.StandardCharsets;
import java.util.UUID;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.apache.kafka.common.utils.Bytes;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.requestreply.ReplyingKafkaTemplate;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.stereotype.Service;
import ru.micord.ervu.dto.SubpoenaRequestDto;
/**
* @author gulnaz
*/
@Service
@Qualifier("subpoena")
public class SubpoenaReplyingKafkaService extends BaseReplyingKafkaService<Object, Bytes> {
private final ReplyingKafkaTemplate<String, Object, Bytes> replyingKafkaTemplate;
@Value("${ervu.kafka.recruit.header.class}")
private String recruitHeaderClass;
@Value("${ervu.kafka.extract.header.class}")
private String extractHeaderClass;
public SubpoenaReplyingKafkaService(@Qualifier("subpoena")
ReplyingKafkaTemplate<String, Object, Bytes> replyingKafkaTemplate) {
this.replyingKafkaTemplate = replyingKafkaTemplate;
}
@Override
protected ReplyingKafkaTemplate<String, Object, Bytes> getTemplate() {
return replyingKafkaTemplate;
}
@Override
protected ProducerRecord<String, Object> getProducerRecord(String requestTopic, String replyTopic,
Object requestMessage) {
ProducerRecord<String, Object> record = new ProducerRecord<>(requestTopic,
UUID.randomUUID().toString(), requestMessage);
record.headers().add(new RecordHeader(KafkaHeaders.REPLY_TOPIC, replyTopic.getBytes()));
String headerClass = requestMessage instanceof SubpoenaRequestDto
? recruitHeaderClass : extractHeaderClass;
record.headers().add("class", headerClass.getBytes(StandardCharsets.UTF_8));
return record;
}
}
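Note: for the round trip to complete, the consuming side must answer on the topic named in the REPLY_TOPIC header and echo back the renamed correlation header ("messageId"). A hand-rolled responder sketch, entirely hypothetical since the real ERVU consumer lives outside this repository:

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.springframework.kafka.annotation.KafkaListener;
    import org.springframework.kafka.core.KafkaTemplate;
    import org.springframework.kafka.support.KafkaHeaders;
    import org.springframework.stereotype.Component;

    @Component
    public class RecruitRequestResponder {
      private final KafkaTemplate<String, byte[]> kafkaTemplate; // byte[]-valued template, configured elsewhere

      public RecruitRequestResponder(KafkaTemplate<String, byte[]> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
      }

      @KafkaListener(topics = "${ervu.kafka.recruit.request.topic}")
      public void onRequest(ConsumerRecord<String, String> request) {
        // reply where the requester asked us to
        String replyTopic = new String(request.headers().lastHeader(KafkaHeaders.REPLY_TOPIC).value());
        ProducerRecord<String, byte[]> reply = new ProducerRecord<>(replyTopic, buildProtoReply(request.value()));
        // echo the custom correlation header so ReplyingKafkaTemplate can match the reply
        reply.headers().add(request.headers().lastHeader("messageId"));
        kafkaTemplate.send(reply);
      }

      private byte[] buildProtoReply(String requestJson) {
        // hypothetical: build and serialize the SummonsResponseData protobuf here
        return new byte[0];
      }
    }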

View file

@@ -20,6 +20,7 @@ import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import ru.micord.ervu.kafka.model.Document;
import ru.micord.ervu.kafka.model.Person;
@@ -57,7 +58,8 @@ public class EsiaAuthService {
private JwtTokenService jwtTokenService;
@Autowired
private ReplyingKafkaService replyingKafkaService;
@Qualifier("person")
private ReplyingKafkaService<String, String> replyingKafkaService;
@Autowired
private PersonalDataService personalDataService;

View file

@@ -3,6 +3,7 @@ package ru.micord.ervu.security.webbpm.jwt.service;
import java.lang.invoke.MethodHandles;
import java.util.Base64;
import java.util.Date;
import java.util.Optional;
import javax.crypto.SecretKey;
import io.jsonwebtoken.Claims;
@@ -12,7 +13,10 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.security.core.Authentication;
import org.springframework.security.core.context.SecurityContextHolder;
import org.springframework.stereotype.Component;
import ru.micord.ervu.security.webbpm.jwt.JwtAuthentication;
import ru.micord.ervu.security.webbpm.jwt.model.Token;
import ru.cg.webbpm.modules.resources.api.ResourceMetadataUtils;
@@ -25,7 +29,7 @@ public class JwtTokenService {
private static final Logger LOGGER = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
@Value("${webbpm.security.token.issuer}")
@Value("${webbpm.security.token.issuer:}")
private final String tokenIssuerName =
ResourceMetadataUtils.PROJECT_GROUP_ID + "." + ResourceMetadataUtils.PROJECT_ARTIFACT_ID;
private final SecretKey SIGNING_KEY;
@@ -72,4 +76,12 @@ public class JwtTokenService {
return new Token(claims.getSubject(), claims.getIssuer(), claims.getExpiration(), token);
}
public String getErvuId() {
Optional<Authentication> authentication = Optional.ofNullable(
SecurityContextHolder.getContext().getAuthentication());
String jwtToken = authentication.map(auth -> ((JwtAuthentication) auth).getToken())
.orElseThrow(() -> new RuntimeException("Failed to get auth data. User unauthorized."));
return getToken(jwtToken).getUserAccountId().split(":")[1];
}
}
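Note: getErvuId() assumes the token subject (userAccountId) is a colon-separated pair whose second part is the ERVU id. A worked example (prefix hypothetical; the UUID reuses the placeholder this commit removes):

    String subject = "esia:6fb62081-7345-4a9d-a1d0-68b46bd8faac"; // assumed "<idType>:<ervuId>" layout
    String ervuId = subject.split(":")[1];                        // "6fb62081-7345-4a9d-a1d0-68b46bd8faac"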

View file

@@ -1,109 +0,0 @@
package ru.micord.ervu.service;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.Collections;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.apache.kafka.common.utils.Bytes;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.requestreply.ReplyingKafkaTemplate;
import org.springframework.kafka.requestreply.RequestReplyFuture;
import org.springframework.kafka.support.KafkaHeaders;
import ru.micord.ervu.dto.PersonRequestDto;
import ru.micord.ervu.dto.ExtractRequestDto;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
/**
* @author ya.kuznetsova
*/
@Service
public class ReplyingKafkaService {
// private final ReplyingKafkaTemplate<String, Object, Bytes> replyingKafkaTemplate;
private final KafkaTemplate<String, Object> kafkaTemplate;
private final ConsumerFactory<String, Bytes> consumerFactory;
@Value("${kafka.ervu.recruit.request.topic}")
private String recruitRequestTopic;
@Value("${kafka.ervu.recruit.reply.topic}")
private String recruitReplyTopic;
@Value("${kafka.ervu.recruit.header.class}")
private String recruitHeaderClass;
@Value("${kafka.ervu.subpoena.extract.request.topic}")
private String subpoenaExtractRequestTopic;
@Value("${kafka.ervu.subpoena.extract.reply.topic}")
private String subpoenaExtractReplyTopic;
@Value("${kafka.ervu.registry.extract.request.topic}")
private String registryExtractRequestTopic;
@Value("${kafka.ervu.registry.extract.reply.topic}")
private String registryExtractReplyTopic;
@Value("${kafka.ervu.extract.header.class}")
private String extractHeaderClass;
public ReplyingKafkaService(KafkaTemplate<String, Object> kafkaTemplate,
ConsumerFactory<String, Bytes> consumerFactory) {
this.kafkaTemplate = kafkaTemplate;
this.consumerFactory = consumerFactory;
}
public byte[] sendRequestAndGetPersonReply(PersonRequestDto request) {
return sendRequestAndGetPersonReply(request, recruitRequestTopic, recruitReplyTopic, recruitHeaderClass);
}
public byte[] sendRequestAndGetExtractReply(ExtractRequestDto request) {
return sendRequestAndGetPersonReply(request,
request.formatExtractRegistry().equals("1")
? subpoenaExtractRequestTopic
: registryExtractRequestTopic,
request.formatExtractRegistry().equals("1")
? subpoenaExtractReplyTopic
: registryExtractReplyTopic,
extractHeaderClass);
}
private byte[] sendRequestAndGetPersonReply(Object request, String requestTopic, String replyTopic,
String headerClass) {
ProducerRecord<String, Object> record = new ProducerRecord<>(requestTopic,
UUID.randomUUID().toString(), request);
record.headers().add(new RecordHeader(KafkaHeaders.REPLY_TOPIC, replyTopic.getBytes()));
record.headers().add("class", headerClass.getBytes(StandardCharsets.UTF_8));
//TODO fix No pending reply error (replyingKafkaTemplate not working properly with mock service)
// RequestReplyFuture<String, Object, Bytes> replyFuture = replyingKafkaTemplate.sendAndReceive(record);
//
// try {
// return Optional.ofNullable(replyFuture.get())
// .map(t -> t.value().get())
// .orElseThrow(() -> new RuntimeException("Kafka return result is null"));
// }
// catch (InterruptedException | ExecutionException e) {
// throw new RuntimeException("Failed to get kafka response", e);
// }
record.headers().add("messageId", UUID.randomUUID().toString().getBytes());
kafkaTemplate.send(record);
AtomicReference<Bytes> bytesRef = new AtomicReference<>(null);
try (Consumer<String, Bytes> consumer =
consumerFactory.createConsumer("1", null)) {
consumer.subscribe(Collections.singletonList(replyTopic));
ConsumerRecords<String, Bytes> consumerRecords = consumer.poll(Duration.ofSeconds(10));
consumerRecords.forEach(consumerRecord -> bytesRef.set(consumerRecord.value()));
consumer.commitSync();
}
return Optional.ofNullable(bytesRef.get())
.map(Bytes::get)
.orElseThrow(() -> new RuntimeException("Kafka return result is null"));
}
}

View file

@@ -1,11 +1,15 @@
package ru.micord.ervu.service.rpc;
import com.google.protobuf.InvalidProtocolBufferException;
import org.apache.kafka.common.utils.Bytes;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import ru.micord.ervu.dto.FileData;
import ru.micord.ervu.dto.ExtractRequestDto;
import ru.micord.ervu.service.ReplyingKafkaService;
import rtl.pgs.ervu.proto.ExtractRegistry;
import rtl.pgs.ervu.proto.ResponseData;
import ru.micord.ervu.kafka.service.ReplyingKafkaService;
import ru.micord.ervu.security.webbpm.jwt.service.JwtTokenService;
import ru.cg.webbpm.modules.standard_annotations.validation.NotNull;
import ru.cg.webbpm.modules.webkit.annotations.RpcCall;
@@ -18,21 +22,39 @@ import ru.cg.webbpm.modules.webkit.beans.Behavior;
@RpcService
public class ExtractRpcService extends Behavior {
private final JwtTokenService jwtTokenService;
private final ReplyingKafkaService<Object, Bytes> replyingKafkaService;
@Value("${ervu.kafka.subpoena.extract.request.topic}")
private String subpoenaExtractRequestTopic;
@Value("${ervu.kafka.subpoena.extract.reply.topic}")
private String subpoenaExtractReplyTopic;
@Value("${ervu.kafka.registry.extract.request.topic}")
private String registryExtractRequestTopic;
@Value("${ervu.kafka.registry.extract.reply.topic}")
private String registryExtractReplyTopic;
@NotNull()
public String formatExtractRegistry;
private final ReplyingKafkaService replyingKafkaService;
public ExtractRpcService(ReplyingKafkaService replyingKafkaService) {
public ExtractRpcService(
JwtTokenService jwtTokenService,
@Qualifier("subpoena") ReplyingKafkaService<Object, Bytes> replyingKafkaService) {
this.jwtTokenService = jwtTokenService;
this.replyingKafkaService = replyingKafkaService;
}
@RpcCall
public FileData getExtract() {
//TODO get id from token
ExtractRequestDto request = new ExtractRequestDto("6fb62081-7345-4a9d-a1d0-68b46bd8faac",
ExtractRequestDto request = new ExtractRequestDto(jwtTokenService.getErvuId(),
formatExtractRegistry);
byte[] reply = replyingKafkaService.sendRequestAndGetExtractReply(request);
String requestTopic = request.formatExtractRegistry().equals("1")
? subpoenaExtractRequestTopic
: registryExtractRequestTopic;
String replyTopic = request.formatExtractRegistry().equals("1")
? subpoenaExtractReplyTopic
: registryExtractReplyTopic;
byte[] reply = replyingKafkaService.sendMessageAndGetReply(requestTopic, replyTopic, request).get();
try {
ResponseData responseData = ResponseData.parseFrom(reply);