SUPPORT-8474: Add kafka

This commit is contained in:
Eduard Tihomirov 2024-09-08 12:34:02 +03:00
parent b89b4a70c3
commit 0ecfa00e8a
11 changed files with 385 additions and 5 deletions

View file

@ -216,6 +216,10 @@
<groupId>org.bouncycastle</groupId>
<artifactId>bcprov-jdk15on</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.bouncycastle</groupId>
<artifactId>bcpkix-jdk15on</artifactId>

View file

@ -0,0 +1,86 @@
package ru.micord.ervu.kafka;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.*;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.requestreply.ReplyingKafkaTemplate;
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
/**
 * Spring configuration for Kafka request/reply messaging.
 * Builds a String-keyed/String-valued producer and consumer, a listener container
 * on the configured reply topic, and a {@link ReplyingKafkaTemplate} that correlates
 * outgoing requests with incoming replies via a "messageID" header.
 */
@Configuration
@EnableKafka
public class ReplyingKafkaConfig {
// Comma-separated Kafka broker list, e.g. "host1:9092,host2:9092".
@Value("${ervu-kafka.bootstrap-servers}")
private String bootstrapServers;
// Topic on which replies to our requests are expected.
@Value("${ervu-kafka.reply-topic}")
private String replyTopic;
// Consumer group id used both by the listener factory and the reply container.
@Value("${ervu-kafka.group-id}")
private String groupId;
// Max seconds to wait for a reply before the template times out (default 30).
@Value("${ervu-kafka.reply-connection-timeout:30}")
private long connectionTimeout;
/**
 * Producer factory serializing both key and value as plain strings.
 */
@Bean
public ProducerFactory<String, String> producerFactory() {
Map<String, Object> configProps = new HashMap<>();
configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
return new DefaultKafkaProducerFactory<>(configProps);
}
/**
 * Plain template for fire-and-forget sends (request/reply goes through
 * {@link #replyingKafkaTemplate} instead).
 */
@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
/**
 * Consumer factory deserializing both key and value as plain strings.
 * NOTE(review): no auto.offset.reset is set, so the Kafka default ("latest")
 * applies — confirm that is intended for the reply consumer.
 */
@Bean
public ConsumerFactory<String, String> consumerFactory() {
Map<String, Object> configProps = new HashMap<>();
configProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
configProps.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
configProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
configProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
return new DefaultKafkaConsumerFactory<>(configProps);
}
/**
 * Listener container factory used to create the reply container below.
 */
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
return factory;
}
/**
 * Message listener container subscribed to the reply topic; feeds replies
 * back into the replying template.
 */
@Bean
public ConcurrentMessageListenerContainer<String, String> replyContainer(
ConcurrentKafkaListenerContainerFactory<String, String> factory) {
ConcurrentMessageListenerContainer<String, String> container = factory.createContainer(
replyTopic);
// Override the container's group id so replies are consumed under our configured group.
container.getContainerProperties().setGroupId(groupId);
return container;
}
/**
 * Request/reply template: correlates requests and replies through the custom
 * "messageID" header and waits up to {@code connectionTimeout} seconds per reply.
 */
@Bean
public ReplyingKafkaTemplate<String, String, String> replyingKafkaTemplate(
ProducerFactory<String, String> pf,
ConcurrentMessageListenerContainer<String, String> container) {
ReplyingKafkaTemplate<String, String, String> replyingKafkaTemplate =
new ReplyingKafkaTemplate<>(pf, container);
// Responder must echo this header back; default would be KafkaHeaders.CORRELATION_ID.
replyingKafkaTemplate.setCorrelationHeaderName("messageID");
replyingKafkaTemplate.setDefaultReplyTimeout(Duration.ofSeconds(connectionTimeout));
return replyingKafkaTemplate;
}
}

View file

@ -0,0 +1,43 @@
package ru.micord.ervu.kafka.model;
import java.io.Serializable;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
/**
 * Identity document (series, number, issue date) carried in the Kafka person payload.
 * Unknown JSON properties are ignored on deserialization.
 *
 * @author Eduard Tihomirov
 */
@JsonIgnoreProperties(ignoreUnknown = true)
public class Document implements Serializable {

    private static final long serialVersionUID = 1L;

    private String series;
    private String number;
    private String issueDate;

    /** @return the document series */
    public String getSeries() {
        return series;
    }

    /** @return the document number */
    public String getNumber() {
        return number;
    }

    /** @return the issue date (string form) */
    public String getIssueDate() {
        return issueDate;
    }

    public void setSeries(String series) {
        this.series = series;
    }

    public void setNumber(String number) {
        this.number = number;
    }

    public void setIssueDate(String issueDate) {
        this.issueDate = issueDate;
    }
}

View file

@ -0,0 +1,33 @@
package ru.micord.ervu.kafka.model;
import java.io.Serializable;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
/**
 * Error descriptor (code plus human-readable name) returned in a Kafka reply.
 * Unknown JSON properties are ignored on deserialization.
 *
 * @author Eduard Tihomirov
 */
@JsonIgnoreProperties(ignoreUnknown = true)
public class ErrorData implements Serializable {

    private static final long serialVersionUID = 1L;

    private String code;
    private String name;

    /** @return the machine-readable error code */
    public String getCode() {
        return code;
    }

    /** @return the human-readable error name */
    public String getName() {
        return name;
    }

    public void setCode(String code) {
        this.code = code;
    }

    public void setName(String name) {
        this.name = name;
    }
}

View file

@ -0,0 +1,73 @@
package ru.micord.ervu.kafka.model;
import java.io.Serializable;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
/**
 * Personal data payload sent to ERVU over Kafka: full name, birth date, SNILS,
 * and the identity {@link Document}. Unknown JSON properties are ignored.
 *
 * @author Eduard Tihomirov
 */
@JsonIgnoreProperties(ignoreUnknown = true)
public class Person implements Serializable {

    private static final long serialVersionUID = 1L;

    private String lastName;
    private String firstName;
    private String middleName;
    private String birthDate;
    private String snils;
    private Document document;

    /** @return the last (family) name */
    public String getLastName() {
        return lastName;
    }

    /** @return the first (given) name */
    public String getFirstName() {
        return firstName;
    }

    /** @return the middle (patronymic) name */
    public String getMiddleName() {
        return middleName;
    }

    /** @return the birth date (string form) */
    public String getBirthDate() {
        return birthDate;
    }

    /** @return the SNILS identifier */
    public String getSnils() {
        return snils;
    }

    /** @return the identity document, if set */
    public Document getDocument() {
        return document;
    }

    public void setLastName(String lastName) {
        this.lastName = lastName;
    }

    public void setFirstName(String firstName) {
        this.firstName = firstName;
    }

    public void setMiddleName(String middleName) {
        this.middleName = middleName;
    }

    public void setBirthDate(String birthDate) {
        this.birthDate = birthDate;
    }

    public void setSnils(String snils) {
        this.snils = snils;
    }

    public void setDocument(Document document) {
        this.document = document;
    }
}

View file

@ -0,0 +1,33 @@
package ru.micord.ervu.kafka.model;
import java.io.Serializable;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
/**
 * Kafka reply payload: either the resolved ERVU identifier or an {@link ErrorData}
 * describing why resolution failed. Unknown JSON properties are ignored.
 *
 * @author Eduard Tihomirov
 */
@JsonIgnoreProperties(ignoreUnknown = true)
public class Response implements Serializable {

    private static final long serialVersionUID = 1L;

    private String ervuId;
    private ErrorData errorData;

    /** @return the resolved ERVU id, or null when the reply carries an error */
    public String getErvuId() {
        return ervuId;
    }

    /** @return error details, or null on success */
    public ErrorData getErrorData() {
        return errorData;
    }

    public void setErvuId(String ervuId) {
        this.ervuId = ervuId;
    }

    public void setErrorData(ErrorData errorData) {
        this.errorData = errorData;
    }
}

View file

@ -0,0 +1,8 @@
package ru.micord.ervu.kafka.service;
/**
 * Synchronous request/reply messaging over Kafka.
 */
public interface ReplyingKafkaService {
/**
 * Sends {@code requestMessage} to {@code requestTopic} and blocks until the
 * correlated reply arrives on {@code requestReplyTopic}.
 *
 * @param requestTopic topic the request is published to
 * @param requestReplyTopic topic the responder must publish the reply to
 * @param requestMessage request payload (typically JSON)
 * @return the reply payload
 * @throws RuntimeException if the reply is missing, the wait is interrupted, or the send fails
 */
String sendMessageAndGetReply(String requestTopic,
String requestReplyTopic,
String requestMessage);
}

View file

@ -0,0 +1,42 @@
package ru.micord.ervu.kafka.service.impl;
import java.nio.charset.StandardCharsets;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.requestreply.ReplyingKafkaTemplate;
import org.springframework.kafka.requestreply.RequestReplyFuture;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.stereotype.Service;
import ru.micord.ervu.kafka.service.ReplyingKafkaService;
/**
* @author Eduard Tihomirov
*/
@Service
public class ReplyingKafkaServiceImpl implements ReplyingKafkaService {
@Autowired
private ReplyingKafkaTemplate<String, String, String> replyingKafkaTemplate;
public String sendMessageAndGetReply(String requestTopic,
String requestReplyTopic,
String requestMessage) {
ProducerRecord<String, String> record = new ProducerRecord<>(requestTopic, requestMessage);
record.headers().add(new RecordHeader(KafkaHeaders.REPLY_TOPIC, requestReplyTopic.getBytes()));
RequestReplyFuture<String, String, String> replyFuture = replyingKafkaTemplate.sendAndReceive(
record);
try {
Optional<ConsumerRecord<String, String>> consumerRecord = Optional.ofNullable(replyFuture.get());
return consumerRecord.map(ConsumerRecord::value)
.orElseThrow(() -> new RuntimeException("Kafka return result is null."));
}
catch (InterruptedException | ExecutionException e) {
throw new RuntimeException("Failed to get kafka response.", e);
}
}
}

View file

@ -21,6 +21,11 @@ import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.beans.factory.annotation.Value;
import ru.micord.ervu.kafka.model.Document;
import ru.micord.ervu.kafka.model.Person;
import ru.micord.ervu.kafka.model.Response;
import ru.micord.ervu.kafka.service.ReplyingKafkaService;
import ru.micord.ervu.security.esia.config.EsiaConfig;
import ru.micord.ervu.security.esia.model.FormUrlencoded;
import ru.micord.ervu.security.esia.model.EsiaAccessToken;
@ -55,6 +60,15 @@ public class EsiaAuthService {
@Autowired
private JwtTokenService jwtTokenService;
@Autowired
private ReplyingKafkaService replyingKafkaService;
@Value("${ervu-kafka.reply-topic}")
private String requestReplyTopic;
@Value("${ervu-kafka.request-topic}")
private String requestTopic;
public String generateAuthCodeUrl() {
try {
String clientId = esiaConfig.getClientId();
@ -191,7 +205,8 @@ public class EsiaAuthService {
accessToken.substring(accessToken.indexOf('.') + 1, accessToken.lastIndexOf('.')));
String decodedString = new String(decodedBytes);
EsiaAccessToken esiaAccessToken = objectMapper.readValue(decodedString, EsiaAccessToken.class);
Token token = jwtTokenService.createAccessToken(esiaAccessToken.getSbj_id(), tokenResponse.getExpires_in());
String ervuId = getErvuId(accessToken);
Token token = jwtTokenService.createAccessToken(esiaAccessToken.getSbj_id(), tokenResponse.getExpires_in(), ervuId);
Cookie authToken = new Cookie("auth_token", token.getValue());
authToken.setPath(path + "/");
authToken.setHttpOnly(true);
@ -282,7 +297,8 @@ public class EsiaAuthService {
accessToken.substring(accessToken.indexOf('.') + 1, accessToken.lastIndexOf('.')));
String decodedString = new String(decodedBytes);
EsiaAccessToken esiaAccessToken = objectMapper.readValue(decodedString, EsiaAccessToken.class);
Token token = jwtTokenService.createAccessToken(esiaAccessToken.getSbj_id(), tokenResponse.getExpires_in());
String ervuId = getErvuId(accessToken);
Token token = jwtTokenService.createAccessToken(esiaAccessToken.getSbj_id(), tokenResponse.getExpires_in(), ervuId);
Cookie authToken = new Cookie("auth_token", token.getValue());
authToken.setPath(path + "/");
authToken.setHttpOnly(true);
@ -359,4 +375,40 @@ public class EsiaAuthService {
throw new RuntimeException(e);
}
}
/**
 * Resolves the ERVU identifier for the authenticated user: loads the person's
 * data via the ESIA access token, sends it over Kafka, and parses the reply.
 *
 * @param accessToken ESIA access token identifying the user
 * @return the ERVU id from the Kafka reply
 * @throws RuntimeException if the reply carries error data or the exchange fails
 */
public String getErvuId(String accessToken) {
    try {
        PersonModel personModel = personalDataService.getPersonModel(accessToken);
        Person person = copyToPerson(personModel);
        String kafkaResponse = replyingKafkaService.sendMessageAndGetReply(requestTopic,
                requestReplyTopic, objectMapper.writeValueAsString(person)
        );
        Response response = objectMapper.readValue(kafkaResponse, Response.class);
        if (response.getErrorData() != null) {
            throw new RuntimeException(
                    "Error code = " + response.getErrorData().getCode() + ", error name = "
                            + response.getErrorData().getName());
        }
        return response.getErvuId();
    }
    catch (RuntimeException e) {
        // Don't re-wrap: preserves the error-reply message thrown above
        // (and any other runtime failure) instead of burying it as a cause.
        throw e;
    }
    catch (Exception e) {
        throw new RuntimeException("Failed to resolve ervuId.", e);
    }
}
/**
 * Maps the ESIA person model to the Kafka transport {@link Person}.
 * NOTE(review): assumes getBirthDate(), getPassportModel() and its getIssueDate()
 * are non-null — confirm upstream guarantees, otherwise this throws NPE.
 */
private Person copyToPerson(PersonModel personModel) {
    Person person = new Person();
    person.setBirthDate(personModel.getBirthDate().toString());
    person.setFirstName(personModel.getFirstName());
    person.setLastName(personModel.getLastName());
    person.setSnils(personModel.getSnils());
    person.setMiddleName(personModel.getMiddleName());
    Document document = new Document();
    document.setNumber(personModel.getPassportModel().getNumber());
    document.setSeries(personModel.getPassportModel().getSeries());
    document.setIssueDate(personModel.getPassportModel().getIssueDate().toString());
    // BUG FIX: the populated document was built but never attached, so passport
    // data was silently dropped from the Kafka request payload.
    person.setDocument(document);
    return person;
}
}

View file

@ -38,17 +38,17 @@ public class JwtTokenService {
this.SIGNING_KEY = Keys.hmacShaKeyFor(encodedKey);
}
public Token createAccessToken(String userAccountId, Long expiresIn) {
public Token createAccessToken(String userAccountId, Long expiresIn, String ervuId) {
Date expirationDate = new Date(System.currentTimeMillis() + 1000L * expiresIn);
String value = Jwts.builder()
.setSubject(userAccountId)
.setSubject(userAccountId + ":" + ervuId)
.setIssuer(tokenIssuerName)
.setIssuedAt(new Date(System.currentTimeMillis()))
.setExpiration(expirationDate)
.signWith(SIGNING_KEY)
.compact();
return new Token(userAccountId, tokenIssuerName, expirationDate, value);
return new Token(userAccountId + ":" + ervuId, tokenIssuerName, expirationDate, value);
}
public boolean isValid(Token token) {

View file

@ -16,6 +16,7 @@
</scm>
<properties>
<spring-security-kerberos.version>1.0.1.RELEASE</spring-security-kerberos.version>
<spring-kafka.version>2.6.13</spring-kafka.version>
<org.bouncycastle.version>1.60</org.bouncycastle.version>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<enable.version.in.url>false</enable.version.in.url>
@ -350,6 +351,11 @@
<artifactId>slf4j-simple</artifactId>
<version>1.7.28</version>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>${spring-kafka.version}</version>
</dependency>
</dependencies>
</dependencyManagement>
<repositories>