SUPPORT-8556: Fix
This commit is contained in:
parent
87f3e27640
commit
e2c5f44c68
7 changed files with 102 additions and 68 deletions
|
|
@ -0,0 +1,54 @@
|
|||
package ru.micord.ervu.av.kafka.config.input;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
import org.apache.kafka.clients.CommonClientConfigs;
|
||||
import org.apache.kafka.clients.producer.ProducerConfig;
|
||||
import org.apache.kafka.common.config.SaslConfigs;
|
||||
import org.apache.kafka.common.serialization.StringSerializer;
|
||||
import org.springframework.beans.factory.annotation.Qualifier;
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
|
||||
import org.springframework.kafka.core.KafkaTemplate;
|
||||
import org.springframework.kafka.core.ProducerFactory;
|
||||
|
||||
/**
|
||||
* @author r.latypov
|
||||
*/
|
||||
@Configuration
|
||||
public class InputKafkaProducerConfig {
|
||||
@Value("${spring.kafka.in.producer.bootstrap.servers}")
|
||||
private List<String> bootstrapAddress;
|
||||
@Value("${spring.kafka.in.producer.security.protocol}")
|
||||
private String securityProtocol;
|
||||
@Value("${spring.kafka.in.producer.properties.sasl.jaas.config}")
|
||||
private String jaasConfig;
|
||||
@Value("${spring.kafka.in.producer.properties.sasl.mechanism}")
|
||||
private String saslMechanism;
|
||||
|
||||
@Bean
|
||||
@Qualifier("in-prod-factory")
|
||||
public ProducerFactory<String, String> producerFactory() {
|
||||
Map<String, Object> configs = new HashMap<>();
|
||||
|
||||
configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
|
||||
configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
|
||||
configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
|
||||
|
||||
configs.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, securityProtocol);
|
||||
configs.put(SaslConfigs.SASL_JAAS_CONFIG, jaasConfig);
|
||||
configs.put(SaslConfigs.SASL_MECHANISM, saslMechanism);
|
||||
|
||||
return new DefaultKafkaProducerFactory<>(configs);
|
||||
}
|
||||
|
||||
@Bean
|
||||
@Qualifier("in-template")
|
||||
public KafkaTemplate<String, String> kafkaTemplate() {
|
||||
return new KafkaTemplate<>(producerFactory());
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,21 @@
|
|||
package ru.micord.ervu.av.kafka.config.input;
|
||||
|
||||
import org.apache.kafka.clients.admin.NewTopic;
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
import org.springframework.kafka.config.TopicBuilder;
|
||||
|
||||
/**
|
||||
* @author r.latypov
|
||||
*/
|
||||
@Configuration
|
||||
public class InputKafkaTopicConfig {
|
||||
@Value("${kafka.in.status.topic.name}")
|
||||
private String kafkaInStatusTopicName;
|
||||
|
||||
@Bean
|
||||
public NewTopic inStatusTopic() {
|
||||
return TopicBuilder.name(kafkaInStatusTopicName).build();
|
||||
}
|
||||
}
|
||||
|
|
@ -8,6 +8,7 @@ import org.apache.kafka.clients.CommonClientConfigs;
|
|||
import org.apache.kafka.clients.producer.ProducerConfig;
|
||||
import org.apache.kafka.common.config.SaslConfigs;
|
||||
import org.apache.kafka.common.serialization.StringSerializer;
|
||||
import org.springframework.beans.factory.annotation.Qualifier;
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
|
|
@ -30,6 +31,7 @@ public class OutputKafkaProducerConfig {
|
|||
private String saslMechanism;
|
||||
|
||||
@Bean
|
||||
@Qualifier("out-prod-factory")
|
||||
public ProducerFactory<String, String> producerFactory() {
|
||||
Map<String, Object> configs = new HashMap<>();
|
||||
|
||||
|
|
@ -45,6 +47,7 @@ public class OutputKafkaProducerConfig {
|
|||
}
|
||||
|
||||
@Bean
|
||||
@Qualifier("out-template")
|
||||
public KafkaTemplate<String, String> kafkaTemplate() {
|
||||
return new KafkaTemplate<>(producerFactory());
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,40 +0,0 @@
|
|||
package ru.micord.ervu.av.service;
|
||||
|
||||
import org.jooq.DSLContext;
|
||||
import org.jooq.Field;
|
||||
import org.jooq.Record;
|
||||
import org.jooq.Table;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.stereotype.Service;
|
||||
|
||||
import static org.jooq.impl.DSL.field;
|
||||
import static org.jooq.impl.DSL.name;
|
||||
import static org.jooq.impl.DSL.table;
|
||||
|
||||
/**
|
||||
* @author r.latypov
|
||||
*/
|
||||
@Service
|
||||
public class FileStatusService {
|
||||
public static final Table<Record> INTERACTION_LOG = table(name("public", "interaction_log"));
|
||||
public static final Field<Long> INTERACTION_LOG_ID = field(name("id"), Long.class);
|
||||
public static final Field<String> INTERACTION_LOG_FILE_ID = field(name("file_id"), String.class);
|
||||
public static final Field<String> INTERACTION_LOG_STATUS = field(name("status"), String.class);
|
||||
|
||||
@Autowired
|
||||
private DSLContext dslContext;
|
||||
|
||||
public void setStatus(Long id, String status) {
|
||||
dslContext.update(INTERACTION_LOG)
|
||||
.set(INTERACTION_LOG_STATUS, status)
|
||||
.where(INTERACTION_LOG_ID.eq(id))
|
||||
.execute();
|
||||
}
|
||||
|
||||
public void setStatus(String fileId, String status) {
|
||||
dslContext.update(INTERACTION_LOG)
|
||||
.set(INTERACTION_LOG_STATUS, status)
|
||||
.where(INTERACTION_LOG_FILE_ID.eq(fileId))
|
||||
.execute();
|
||||
}
|
||||
}
|
||||
|
|
@ -25,6 +25,7 @@ import org.apache.kafka.clients.admin.NewTopic;
|
|||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.beans.factory.annotation.Qualifier;
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.http.HttpStatus;
|
||||
import org.springframework.kafka.annotation.KafkaListener;
|
||||
|
|
@ -55,29 +56,31 @@ public class FileUploadService {
|
|||
@Value("${file.saving.path}")
|
||||
private String fileSavingPath;
|
||||
private final KafkaTemplate<String, String> kafkaTemplate;
|
||||
private final KafkaTemplate<String, String> inKafkaTemplate;
|
||||
private final NewTopic outErrorTopic;
|
||||
private final NewTopic outSuccessTopic;
|
||||
private final NewTopic inStatusTopic;
|
||||
private final ReceiveScanReportRetryable receiveScanReportRetryable;
|
||||
private final FileStatusService fileStatusService;
|
||||
private final S3Service s3Service;
|
||||
|
||||
@Autowired
|
||||
public FileUploadService(KafkaTemplate<String, String> kafkaTemplate, NewTopic outErrorTopic,
|
||||
NewTopic outSuccessTopic, ReceiveScanReportRetryable receiveScanReportRetryable,
|
||||
FileStatusService fileStatusService, S3Service s3Service) {
|
||||
public FileUploadService(@Qualifier("out-template") KafkaTemplate<String, String> kafkaTemplate,
|
||||
@Qualifier("in-template") KafkaTemplate<String, String> inKafkaTemplate,
|
||||
NewTopic outErrorTopic, NewTopic outSuccessTopic, ReceiveScanReportRetryable receiveScanReportRetryable,
|
||||
S3Service s3Service, NewTopic inStatusTopic) {
|
||||
this.kafkaTemplate = kafkaTemplate;
|
||||
this.outErrorTopic = outErrorTopic;
|
||||
this.outSuccessTopic = outSuccessTopic;
|
||||
this.receiveScanReportRetryable = receiveScanReportRetryable;
|
||||
this.fileStatusService = fileStatusService;
|
||||
this.s3Service = s3Service;
|
||||
this.inKafkaTemplate = inKafkaTemplate;
|
||||
this.inStatusTopic = inStatusTopic;
|
||||
}
|
||||
|
||||
@KafkaListener(id = "${spring.kafka.consumer.group.id}", topics = "${kafka.in.topic.name}",
|
||||
containerFactory = "inputKafkaListenerContainerFactory")
|
||||
public void listenKafkaIn(String kafkaInMessage, Acknowledgment acknowledgment) {
|
||||
DownloadRequest downloadRequest = new Gson().fromJson(kafkaInMessage, DownloadRequest.class);
|
||||
String fileId = downloadRequest.fileInfo().getFileId();
|
||||
|
||||
try {
|
||||
FileUrl fileUrl = parseFileUrl(downloadRequest.fileInfo().getFileUrl());
|
||||
|
|
@ -95,19 +98,16 @@ public class FileUploadService {
|
|||
if (infected || !clean) {
|
||||
downloadRequest.fileInfo().setFileUrl(null);
|
||||
downloadRequest.fileInfo().setFileStatus(FileStatus.FILE_STATUS_02);
|
||||
sendMessage(outErrorTopic.name(), downloadRequest);
|
||||
|
||||
fileStatusService.setStatus(fileId, FileStatus.FILE_STATUS_02.status());
|
||||
sendMessage(outErrorTopic.name(), downloadRequest, kafkaTemplate);
|
||||
}
|
||||
else {
|
||||
s3Service.putFile(filePath, fileUrl.fileName());
|
||||
|
||||
downloadRequest.fileInfo().setFileUrl(fileUrl.fileName());
|
||||
downloadRequest.fileInfo().setFileStatus(FileStatus.FILE_STATUS_03);
|
||||
sendMessage(outSuccessTopic.name(), downloadRequest);
|
||||
|
||||
fileStatusService.setStatus(fileId, FileStatus.FILE_STATUS_03.status());
|
||||
sendMessage(outSuccessTopic.name(), downloadRequest, kafkaTemplate);
|
||||
}
|
||||
sendMessage(inStatusTopic.name(), downloadRequest, inKafkaTemplate);
|
||||
|
||||
deleteFile(downloadUrl);
|
||||
if (new File(filePath).delete()) {
|
||||
|
|
@ -135,11 +135,9 @@ public class FileUploadService {
|
|||
DownloadResponse downloadResponse = new Gson().fromJson(kafkaOutResponseMessage,
|
||||
DownloadResponse.class
|
||||
);
|
||||
|
||||
String fileId = downloadResponse.fileInfo().fileId();
|
||||
FileStatus fileStatus = downloadResponse.fileInfo().fileStatus();
|
||||
if (fileStatus.code().equalsIgnoreCase(FileStatus.FILE_STATUS_04.code())) {
|
||||
fileStatusService.setStatus(fileId, FileStatus.FILE_STATUS_04.status());
|
||||
sendMessage(inStatusTopic.name(), downloadResponse, inKafkaTemplate);
|
||||
}
|
||||
|
||||
acknowledgment.acknowledge();
|
||||
|
|
@ -274,8 +272,8 @@ public class FileUploadService {
|
|||
}
|
||||
}
|
||||
|
||||
private void sendMessage(@NonNull String topicName, Object object) {
|
||||
CompletableFuture<SendResult<String, String>> future = kafkaTemplate.send(topicName,
|
||||
private void sendMessage(@NonNull String topicName, Object object, KafkaTemplate<String, String> template) {
|
||||
CompletableFuture<SendResult<String, String>> future = template.send(topicName,
|
||||
new GsonBuilder().setPrettyPrinting().create().toJson(object)
|
||||
);
|
||||
|
||||
|
|
|
|||
|
|
@ -20,6 +20,13 @@ spring.kafka.consumer.enable.auto.commit=false
|
|||
spring.kafka.consumer.group.id=file-to-upload-consumers
|
||||
# kafka in listeners
|
||||
spring.kafka.listener.ack.mode=MANUAL_IMMEDIATE
|
||||
# kafka in producer (with possibility for default bean)
|
||||
#host1:port1, host2:port2
|
||||
spring.kafka.in.producer.bootstrap.servers=${IN_KAFKA_SERVERS}
|
||||
spring.kafka.in.producer.security.protocol=SASL_PLAINTEXT
|
||||
#login password to set
|
||||
spring.kafka.in.producer.properties.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="${IN_KAFKA_USERNAME}" password="${IN_KAFKA_PASSWORD}";
|
||||
spring.kafka.in.producer.properties.sasl.mechanism=SCRAM-SHA-256
|
||||
#
|
||||
# kafka out producer (with possibility for default bean)
|
||||
#host1:port1, host2:port2
|
||||
|
|
@ -51,6 +58,7 @@ spring.kafka.out.listener.ack.mode=MANUAL_IMMEDIATE
|
|||
#
|
||||
#
|
||||
kafka.in.topic.name=${IN_KAFKA_TOPIC_NAME}
|
||||
kafka.in.status.topic.name=${IN_KAFKA_TOPIC_NAME}
|
||||
kafka.out.error.topic.name=${OUT_KAFKA_ERROR_TOPIC_NAME}
|
||||
kafka.out.success.topic.name=${OUT_KAFKA_SUCCESS_TOPIC_NAME}
|
||||
kafka.out.response.topic.name=${OUT_KAFKA_RESPONSE_TOPIC_NAME}
|
||||
|
|
@ -67,11 +75,3 @@ s3.out.access_key=${S3_ACCESS_KEY}
|
|||
s3.out.secret_key=${S3_SECRET_KEY}
|
||||
s3.out.bucket_name=${S3_OUT_BUCKET_NAME}
|
||||
#
|
||||
# spring jooq dsl bean properties begin ->
|
||||
spring.jooq.sql-dialect=Postgres
|
||||
spring.datasource.driver-class-name=org.postgresql.Driver
|
||||
#host:port/database_name
|
||||
spring.datasource.url=jdbc:postgresql://${SPRING_DATASOURCE_URL}
|
||||
spring.datasource.username=${SPRING_DATASOURCE_USERNAME}
|
||||
spring.datasource.password=${SPRING_DATASOURCE_PASSWORD}
|
||||
# spring jooq dsl bean properties <- end
|
||||
|
|
|
|||
4
test.env
4
test.env
|
|
@ -4,6 +4,7 @@ IN_KAFKA_SERVERS=10.10.31.11:32609
|
|||
IN_KAFKA_USERNAME=user1
|
||||
IN_KAFKA_PASSWORD=Blfi9d2OFG
|
||||
IN_KAFKA_TOPIC_NAME=file-to-upload
|
||||
IN_KAFKA_STATUS_TOPIC_NAME=file-status
|
||||
OUT_KAFKA_SERVERS=10.10.31.11:32609
|
||||
OUT_KAFKA_USERNAME=user1
|
||||
OUT_KAFKA_PASSWORD=Blfi9d2OFG
|
||||
|
|
@ -20,6 +21,3 @@ S3_PORT=31900
|
|||
S3_ACCESS_KEY=Keyq0l8IRarEf5GmpvEO
|
||||
S3_SECRET_KEY=8A2epSoI6OjdHHwA5F6tHxeYRThv47GdGwcBrV7a
|
||||
S3_OUT_BUCKET_NAME=default-out-bucket
|
||||
SPRING_DATASOURCE_URL=10.10.31.119:5432/ervu-lkrp-ul
|
||||
SPRING_DATASOURCE_USERNAME=ervu-lkrp-ul
|
||||
SPRING_DATASOURCE_PASSWORD=ervu-lkrp-ul
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue