diff --git a/backend/pom.xml b/backend/pom.xml
index d1592c8b..77e756f5 100644
--- a/backend/pom.xml
+++ b/backend/pom.xml
@@ -97,14 +97,6 @@
org.xerial.snappy
snappy-java
-
- org.apache.kafka
- kafka-clients
-
-
- org.xerial.snappy
- snappy-java
-
ru.cg.webbpm.modules
inject
@@ -206,9 +198,24 @@
org.apache.httpcomponents
httpmime
+
+ com.google.protobuf
+ protobuf-java
+
+
+ com.google.protobuf
+ protobuf-java-util
+
${project.parent.artifactId}
+
+
+ kr.motd.maven
+ os-maven-plugin
+ 1.6.2
+
+
maven-compiler-plugin
@@ -237,6 +244,22 @@
+
+ org.xolstice.maven.plugins
+ protobuf-maven-plugin
+
+
+
+ compile
+
+
+
+
+ com.google.protobuf:protoc:4.27.3:exe:${os.detected.classifier}
+ ${project.parent.basedir}/backend/src/main/resources
+ ${project.parent.basedir}/backend/target/generated-sources/java/protobuf
+
+
diff --git a/backend/src/main/java/ervu/service/fileupload/EmployeeInfoFileUploadService.java b/backend/src/main/java/ervu/service/fileupload/EmployeeInfoFileUploadService.java
index b7952719..40be9cca 100644
--- a/backend/src/main/java/ervu/service/fileupload/EmployeeInfoFileUploadService.java
+++ b/backend/src/main/java/ervu/service/fileupload/EmployeeInfoFileUploadService.java
@@ -87,7 +87,7 @@ public class EmployeeInfoFileUploadService {
"messages/common_errors_messages");
private final WebDavClient webDavClient;
private final EmployeeInfoKafkaMessageService employeeInfoKafkaMessageService;
- private final ReplyingKafkaService replyingKafkaService;
+ private final ReplyingKafkaService replyingKafkaService;
private final KafkaTemplate kafkaTemplate;
private final InteractionService interactionService;
private final UlDataService ulDataService;
@@ -110,7 +110,7 @@ public class EmployeeInfoFileUploadService {
public EmployeeInfoFileUploadService(
WebDavClient webDavClient,
EmployeeInfoKafkaMessageService employeeInfoKafkaMessageService,
- ReplyingKafkaService replyingKafkaService,
+ ReplyingKafkaService replyingKafkaService,
InteractionService interactionService,
UlDataService ulDataService,
AuditService auditService,
diff --git a/backend/src/main/java/ru/micord/ervu/controller/ValidationFileController.java b/backend/src/main/java/ru/micord/ervu/controller/ValidationFileController.java
new file mode 100644
index 00000000..6384ea29
--- /dev/null
+++ b/backend/src/main/java/ru/micord/ervu/controller/ValidationFileController.java
@@ -0,0 +1,88 @@
+package ru.micord.ervu.controller;
+
+import java.net.URLEncoder;
+import java.nio.charset.StandardCharsets;
+import javax.servlet.http.HttpServletRequest;
+
+import com.google.protobuf.ByteString;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.core.io.InputStreamResource;
+import org.springframework.core.io.Resource;
+import org.springframework.http.HttpHeaders;
+import org.springframework.http.MediaType;
+import org.springframework.http.ResponseEntity;
+import org.springframework.util.StringUtils;
+import org.springframework.web.bind.annotation.GetMapping;
+import org.springframework.web.bind.annotation.PathVariable;
+import org.springframework.web.bind.annotation.RequestMapping;
+import org.springframework.web.bind.annotation.RestController;
+import ru.micord.ervu.audit.constants.AuditConstants;
+import ru.micord.ervu.audit.service.AuditService;
+import ru.micord.ervu.model.ValidateExportResponse;
+import ru.micord.ervu.security.webbpm.jwt.UserIdsPair;
+import ru.micord.ervu.security.webbpm.jwt.util.SecurityUtil;
+import ru.micord.ervu.service.ValidationFileService;
+
+@RestController
+@RequestMapping("/validate")
+public class ValidationFileController {
+ private static final Logger LOGGER = LoggerFactory.getLogger(ValidationFileController.class);
+ private final AuditService auditService;
+ private final ValidationFileService validationFileService;
+
+ public ValidationFileController(AuditService auditService,
+ ValidationFileService validationFileService) {
+ this.auditService = auditService;
+ this.validationFileService = validationFileService;
+ }
+
+ @GetMapping("/export/{fileId}")
+ public ResponseEntity exportFile(HttpServletRequest servletRequest,
+ @PathVariable String fileId) {
+ int size = 0;
+ String fileName = null;
+ String status = null;
+
+ try {
+ if (!StringUtils.hasText(fileId)) {
+ return ResponseEntity.notFound().build();
+ }
+
+ UserIdsPair userIdsPair = SecurityUtil.getUserIdsPair();
+ String ervuId = userIdsPair.getErvuId();
+ ValidateExportResponse validateExportResponse = validationFileService.exportFile(ervuId,
+ fileId
+ );
+
+ if (!validateExportResponse.hasFile()) {
+ LOGGER.error("Response does not contain file content for fileId: {}, user: {}", fileId, ervuId);
+ status = AuditConstants.FAILURE_STATUS_TYPE;
+ return ResponseEntity.noContent().build();
+ }
+
+ ByteString file = validateExportResponse.getFile();
+ size = file.size();
+ fileName = validateExportResponse.getFileName();
+      String encodedFilename = URLEncoder.encode(fileName, StandardCharsets.UTF_8).replace("+", "%20");
+ InputStreamResource resource = new InputStreamResource(file.newInput());
+
+ status = AuditConstants.SUCCESS_STATUS_TYPE;
+ return ResponseEntity.ok()
+ .header(HttpHeaders.CONTENT_DISPOSITION,
+ "attachment; filename*=UTF-8''" + encodedFilename
+ )
+ .contentType(MediaType.APPLICATION_OCTET_STREAM)
+ .body(resource);
+ }
+ catch (Exception e) {
+ if (status == null) {
+ status = AuditConstants.FAILURE_STATUS_TYPE;
+ }
+ throw e;
+ }
+ finally {
+ auditService.processDownloadEvent(servletRequest, size, fileName, 2, status, null);
+ }
+ }
+}
diff --git a/backend/src/main/java/ru/micord/ervu/exception/ExportException.java b/backend/src/main/java/ru/micord/ervu/exception/ExportException.java
new file mode 100644
index 00000000..5603f06f
--- /dev/null
+++ b/backend/src/main/java/ru/micord/ervu/exception/ExportException.java
@@ -0,0 +1,14 @@
+package ru.micord.ervu.exception;
+
+/**
+ * @author Adel Kalimullin
+ */
+public class ExportException extends RuntimeException {
+ public ExportException(String message) {
+ super(message);
+ }
+
+ public ExportException(String message, Throwable cause) {
+ super(message, cause);
+ }
+}
\ No newline at end of file
diff --git a/backend/src/main/java/ru/micord/ervu/journal/JournalDto.java b/backend/src/main/java/ru/micord/ervu/journal/JournalDto.java
index 6adc2e2f..cbc23a70 100644
--- a/backend/src/main/java/ru/micord/ervu/journal/JournalDto.java
+++ b/backend/src/main/java/ru/micord/ervu/journal/JournalDto.java
@@ -2,14 +2,18 @@ package ru.micord.ervu.journal;
public class JournalDto {
+ private Integer documentNumber;
private String fileId;
private String departureDateTime;
private String fileName;
private Integer filePatternCode;
private String senderFio;
private String status;
- public Integer filesSentCount;
- public Integer acceptedFilesCount;
+ private Integer rowsCount;
+ private Integer rowsSuccess;
+ private Integer rowsError;
+ private Boolean hasFailedRows;
+
public String getDepartureDateTime() {
return departureDateTime;
@@ -65,21 +69,48 @@ public class JournalDto {
return this;
}
- public Integer getFilesSentCount() {
- return filesSentCount;
+ public Integer getRowsCount() {
+ return rowsCount;
}
- public JournalDto setFilesSentCount(Integer filesSentCount) {
- this.filesSentCount = filesSentCount;
+ public JournalDto setRowsCount(Integer rowsCount) {
+ this.rowsCount = rowsCount;
return this;
}
- public Integer getAcceptedFilesCount() {
- return acceptedFilesCount;
+ public Integer getRowsSuccess() {
+ return rowsSuccess;
}
- public JournalDto setAcceptedFilesCount(Integer acceptedFilesCount) {
- this.acceptedFilesCount = acceptedFilesCount;
+ public JournalDto setRowsSuccess(Integer rowsSuccess) {
+ this.rowsSuccess = rowsSuccess;
+ return this;
+ }
+
+ public Integer getRowsError() {
+ return rowsError;
+ }
+
+ public JournalDto setRowsError(Integer rowsError) {
+ this.rowsError = rowsError;
+ return this;
+ }
+
+ public Integer getDocumentNumber() {
+ return documentNumber;
+ }
+
+ public JournalDto setDocumentNumber(Integer documentNumber) {
+ this.documentNumber = documentNumber;
+ return this;
+ }
+
+ public Boolean isHasFailedRows() {
+ return hasFailedRows;
+ }
+
+ public JournalDto setHasFailedRows(Boolean hasFailedRows) {
+ this.hasFailedRows = hasFailedRows;
return this;
}
}
diff --git a/backend/src/main/java/ru/micord/ervu/journal/JournalFileInfo.java b/backend/src/main/java/ru/micord/ervu/journal/JournalFileInfo.java
index 6dc27a6e..7ffb5251 100644
--- a/backend/src/main/java/ru/micord/ervu/journal/JournalFileInfo.java
+++ b/backend/src/main/java/ru/micord/ervu/journal/JournalFileInfo.java
@@ -14,6 +14,7 @@ public class JournalFileInfo {
private SenderInfo senderInfo;
private Integer rowsCount; //Общее количество записей отправленных в файле
private Integer rowsSuccess; //Количество записей принятых в файле
+ private Integer rowsError; //Количество записей непринятых в файле
public List getPackFiles() {
return packFiles;
@@ -51,6 +52,15 @@ public class JournalFileInfo {
return this;
}
+ public Integer getRowsError() {
+ return rowsError;
+ }
+
+ public JournalFileInfo setRowsError(Integer rowsError) {
+ this.rowsError = rowsError;
+ return this;
+ }
+
@JsonIgnoreProperties(ignoreUnknown = true)
public static class JournalFileDetails {
private String fileId; //ИД файла полученный при создании записи о файле в реестр организаций (в ЕРВУ)
diff --git a/backend/src/main/java/ru/micord/ervu/journal/mapper/JournalDtoMapper.java b/backend/src/main/java/ru/micord/ervu/journal/mapper/JournalDtoMapper.java
index cbc144d9..332e1e28 100644
--- a/backend/src/main/java/ru/micord/ervu/journal/mapper/JournalDtoMapper.java
+++ b/backend/src/main/java/ru/micord/ervu/journal/mapper/JournalDtoMapper.java
@@ -24,8 +24,9 @@ public class JournalDtoMapper {
)
)
.setStatus(journalFileDetails.getFileStatus().getStatus())
- .setFilesSentCount(journalFileInfo.getRowsCount())
- .setAcceptedFilesCount(journalFileInfo.getRowsSuccess());
+ .setRowsCount(journalFileInfo.getRowsCount())
+ .setRowsSuccess(journalFileInfo.getRowsSuccess())
+ .setRowsError(journalFileInfo.getRowsError());
}
public static JournalDto mapToJournalDto(InteractionLogRecord record) {
@@ -35,8 +36,9 @@ public class JournalDtoMapper {
.setFilePatternCode(Integer.valueOf(record.getForm()))
.setSenderFio(record.getSender())
.setStatus(record.getStatus())
- .setFilesSentCount(record.getRecordsSent())
- .setAcceptedFilesCount(record.getRecordsAccepted())
+ .setRowsCount(0)
+ .setRowsSuccess(0)
+ .setRowsError(0)
.setFileId(record.getFileId());
}
}
diff --git a/backend/src/main/java/ru/micord/ervu/kafka/ReplyingKafkaConfig.java b/backend/src/main/java/ru/micord/ervu/kafka/ReplyingKafkaConfig.java
index 3f6df834..0c7b28a9 100644
--- a/backend/src/main/java/ru/micord/ervu/kafka/ReplyingKafkaConfig.java
+++ b/backend/src/main/java/ru/micord/ervu/kafka/ReplyingKafkaConfig.java
@@ -4,8 +4,10 @@ import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.config.SaslConfigs;
+import org.apache.kafka.common.serialization.BytesDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.utils.Bytes;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
@@ -54,63 +56,102 @@ public class ReplyingKafkaConfig {
private String saslMechanism;
@Value("${av.kafka.download.response}")
private String avReplyTopic;
+ @Value("${ervu.kafka.validate.export.reply.topic}")
+ private String validateReplyTopic;
- @Bean("ervuProducerFactory")
+ @Bean
public ProducerFactory producerFactory() {
- Map configProps = new HashMap<>();
- configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
- configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+ Map configProps = commonProducerConfig();
configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
- configProps.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, securityProtocol);
- configProps.put(SaslConfigs.SASL_JAAS_CONFIG, loginModule + " required username=\""
- + username + "\" password=\"" + password + "\";");
- configProps.put(SaslConfigs.SASL_MECHANISM, saslMechanism);
return new DefaultKafkaProducerFactory<>(configProps);
}
- @Bean
- public ConsumerFactory consumerFactory() {
+ private Map commonProducerConfig() {
Map configProps = new HashMap<>();
- configProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
- configProps.put(ConsumerConfig.GROUP_ID_CONFIG, groupId + "-" + UUID.randomUUID());
- configProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
- configProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
- configProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+ configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
configProps.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, securityProtocol);
- configProps.put(SaslConfigs.SASL_JAAS_CONFIG, loginModule + " required username=\""
- + username + "\" password=\"" + password + "\";");
+ configProps.put(SaslConfigs.SASL_JAAS_CONFIG, loginModule + " required username=\"" + username + "\" password=\"" + password + "\";");
configProps.put(SaslConfigs.SASL_MECHANISM, saslMechanism);
+ configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+ return configProps;
+ }
+
+ @Bean("ervuConsumerFactory")
+ public ConsumerFactory ervuConsumerFactory() {
+ Map configProps = commonConsumerConfig();
+ configProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
return new DefaultKafkaConsumerFactory<>(configProps);
}
+ @Bean("validateConsumerFactory")
+ public ConsumerFactory validateConsumerFactory() {
+ Map configProps = commonConsumerConfig();
+ configProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, BytesDeserializer.class);
+ return new DefaultKafkaConsumerFactory<>(configProps);
+ }
+
+ private Map commonConsumerConfig() {
+ Map configProps = new HashMap<>();
+ configProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
+ configProps.put(ConsumerConfig.GROUP_ID_CONFIG, groupId + "-" + UUID.randomUUID());
+ configProps.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, securityProtocol);
+ configProps.put(SaslConfigs.SASL_JAAS_CONFIG, loginModule + " required username=\"" + username + "\" password=\"" + password + "\";");
+ configProps.put(SaslConfigs.SASL_MECHANISM, saslMechanism);
+ configProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
+ configProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+ return configProps;
+ }
+
@Bean("ervuContainerFactory")
- public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory() {
- ConcurrentKafkaListenerContainerFactory factory =
- new ConcurrentKafkaListenerContainerFactory<>();
- factory.setConsumerFactory(consumerFactory());
+ public ConcurrentKafkaListenerContainerFactory ervuContainerFactory() {
+ ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>();
+ factory.setConsumerFactory(ervuConsumerFactory());
return factory;
}
- @Bean
- public ConcurrentMessageListenerContainer replyContainer(
- @Qualifier("ervuContainerFactory")
- ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory) {
- return kafkaListenerContainerFactory.createContainer(orgReplyTopic, excerptReplyTopic,
- journalReplyTopic, avReplyTopic
- );
+ @Bean("validateContainerFactory")
+ public ConcurrentKafkaListenerContainerFactory validateContainerFactory() {
+ ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>();
+ factory.setConsumerFactory(validateConsumerFactory());
+ return factory;
}
- @Bean
- public ReplyingKafkaTemplate replyingKafkaTemplate(
- @Qualifier("ervuProducerFactory") ProducerFactory producerFactory,
- ConcurrentMessageListenerContainer replyContainer) {
- ReplyingKafkaTemplate replyingKafkaTemplate =
- new ReplyingKafkaTemplate<>(producerFactory, replyContainer);
- replyingKafkaTemplate.setCorrelationHeaderName("messageId");
- replyingKafkaTemplate.setCorrelationIdStrategy(record ->
+ @Bean("ervuContainer")
+ public ConcurrentMessageListenerContainer ervuContainer(
+ @Qualifier("ervuContainerFactory") ConcurrentKafkaListenerContainerFactory factory) {
+ return factory.createContainer(orgReplyTopic, excerptReplyTopic,
+ journalReplyTopic, avReplyTopic);
+ }
+
+ @Bean("validateContainer")
+ public ConcurrentMessageListenerContainer validateContainer(
+ @Qualifier("validateContainerFactory") ConcurrentKafkaListenerContainerFactory factory) {
+ return factory.createContainer(validateReplyTopic);
+ }
+
+ @Bean("ervuReplyingTemplate")
+ public ReplyingKafkaTemplate ervuReplyingTemplate(
+ ProducerFactory producerFactory,
+ @Qualifier("ervuContainer") ConcurrentMessageListenerContainer container) {
+ ReplyingKafkaTemplate template = new ReplyingKafkaTemplate<>(producerFactory, container);
+ customizeTemplate(template);
+ return template;
+ }
+
+ @Bean("validateReplyingTemplate")
+ public ReplyingKafkaTemplate validateReplyingTemplate(
+ ProducerFactory producerFactory,
+ @Qualifier("validateContainer") ConcurrentMessageListenerContainer container) {
+ ReplyingKafkaTemplate template = new ReplyingKafkaTemplate<>(producerFactory, container);
+ customizeTemplate(template);
+ return template;
+ }
+
+  private void customizeTemplate(ReplyingKafkaTemplate<?, ?, ?> template) {
+ template.setCorrelationHeaderName("messageId");
+ template.setCorrelationIdStrategy(record ->
new CorrelationKey(UUID.randomUUID().toString().getBytes(StandardCharsets.UTF_8)));
- replyingKafkaTemplate.setDefaultReplyTimeout(Duration.ofSeconds(replyTimeout));
- replyingKafkaTemplate.setSharedReplyTopic(true);
- return replyingKafkaTemplate;
+ template.setDefaultReplyTimeout(Duration.ofSeconds(replyTimeout));
+ template.setSharedReplyTopic(true);
}
}
diff --git a/backend/src/main/java/ru/micord/ervu/kafka/controller/ErvuKafkaController.java b/backend/src/main/java/ru/micord/ervu/kafka/controller/ErvuKafkaController.java
index 6b29a84e..1488cb8f 100644
--- a/backend/src/main/java/ru/micord/ervu/kafka/controller/ErvuKafkaController.java
+++ b/backend/src/main/java/ru/micord/ervu/kafka/controller/ErvuKafkaController.java
@@ -31,7 +31,7 @@ import ru.micord.ervu.util.UrlUtils;
public class ErvuKafkaController {
@Autowired
- private ReplyingKafkaService replyingKafkaService;
+ private ReplyingKafkaService replyingKafkaService;
@Autowired
private AuditService auditService;
diff --git a/backend/src/main/java/ru/micord/ervu/kafka/service/ReplyingKafkaService.java b/backend/src/main/java/ru/micord/ervu/kafka/service/ReplyingKafkaService.java
index 49918e61..876048d6 100644
--- a/backend/src/main/java/ru/micord/ervu/kafka/service/ReplyingKafkaService.java
+++ b/backend/src/main/java/ru/micord/ervu/kafka/service/ReplyingKafkaService.java
@@ -1,8 +1,9 @@
package ru.micord.ervu.kafka.service;
-public interface ReplyingKafkaService {
- String sendMessageAndGetReply(String requestTopic,
- String requestReplyTopic,
- String requestMessage);
-}
+public interface ReplyingKafkaService {
+
+ V sendMessageAndGetReply(String requestTopic,
+ String replyTopic,
+ T requestMessage);
+}
\ No newline at end of file
diff --git a/backend/src/main/java/ru/micord/ervu/kafka/service/impl/BaseReplyingKafkaServiceImpl.java b/backend/src/main/java/ru/micord/ervu/kafka/service/impl/BaseReplyingKafkaService.java
similarity index 54%
rename from backend/src/main/java/ru/micord/ervu/kafka/service/impl/BaseReplyingKafkaServiceImpl.java
rename to backend/src/main/java/ru/micord/ervu/kafka/service/impl/BaseReplyingKafkaService.java
index 03e19f72..c849daa2 100644
--- a/backend/src/main/java/ru/micord/ervu/kafka/service/impl/BaseReplyingKafkaServiceImpl.java
+++ b/backend/src/main/java/ru/micord/ervu/kafka/service/impl/BaseReplyingKafkaService.java
@@ -6,12 +6,10 @@ import java.util.concurrent.ExecutionException;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.common.header.internals.RecordHeader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.requestreply.ReplyingKafkaTemplate;
import org.springframework.kafka.requestreply.RequestReplyFuture;
-import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.stereotype.Service;
import ru.micord.ervu.kafka.exception.KafkaMessageException;
import ru.micord.ervu.kafka.exception.KafkaMessageReplyTimeoutException;
@@ -21,35 +19,30 @@ import ru.micord.ervu.kafka.service.ReplyingKafkaService;
* @author Eduard Tihomirov
*/
@Service
-public class BaseReplyingKafkaServiceImpl implements ReplyingKafkaService {
+public abstract class BaseReplyingKafkaService implements ReplyingKafkaService {
private static final Logger LOGGER = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
- private final ReplyingKafkaTemplate replyingKafkaTemplate;
- public BaseReplyingKafkaServiceImpl(
- ReplyingKafkaTemplate replyingKafkaTemplate) {
- this.replyingKafkaTemplate = replyingKafkaTemplate;
- }
-
- public String sendMessageAndGetReply(String requestTopic,
- String replyTopic,
- String requestMessage) {
+ @Override
+ public V sendMessageAndGetReply(String requestTopic, String replyTopic, T requestMessage) {
long startTime = System.currentTimeMillis();
- ProducerRecord record = new ProducerRecord<>(requestTopic, requestMessage);
- record.headers().add(new RecordHeader(KafkaHeaders.REPLY_TOPIC, replyTopic.getBytes()));
- RequestReplyFuture replyFuture = replyingKafkaTemplate.sendAndReceive(record);
+ RequestReplyFuture replyFuture = getTemplate().sendAndReceive(
+ getProducerRecord(requestTopic, replyTopic, requestMessage));
try {
- String result = Optional.ofNullable(replyFuture.get())
- .map(ConsumerRecord::value)
- .orElseThrow(() -> new KafkaMessageException("Kafka return result is null."));
- LOGGER.info("Thread {} - KafkaSendMessageAndGetReply: {} ms",
- Thread.currentThread().getId(), System.currentTimeMillis() - startTime);
- return result;
+ ConsumerRecord result = Optional.ofNullable(replyFuture.get())
+ .orElseThrow(() -> new KafkaMessageException("Kafka return result is null"));
+ LOGGER.info("Thread {} - KafkaSendMessageAndGetReply: {} ms, replyTopic: {}",
+ Thread.currentThread().getId(), System.currentTimeMillis() - startTime, replyTopic);
+ return result.value();
}
catch (InterruptedException | ExecutionException e) {
- LOGGER.error("Thread {} - KafkaSendMessageAndGetReply: {} ms",
- Thread.currentThread().getId(), System.currentTimeMillis() - startTime);
+ LOGGER.error("Thread {} - KafkaSendMessageAndGetReply: {} ms, replyTopic: {}",
+ Thread.currentThread().getId(), System.currentTimeMillis() - startTime, replyTopic);
throw new KafkaMessageReplyTimeoutException(e);
}
}
+ protected abstract ReplyingKafkaTemplate getTemplate();
+
+ protected abstract ProducerRecord getProducerRecord(String requestTopic,
+ String replyTopic, T requestMessage);
}
diff --git a/backend/src/main/java/ru/micord/ervu/kafka/service/impl/ErvuReplyingKafkaService.java b/backend/src/main/java/ru/micord/ervu/kafka/service/impl/ErvuReplyingKafkaService.java
new file mode 100644
index 00000000..51c3d452
--- /dev/null
+++ b/backend/src/main/java/ru/micord/ervu/kafka/service/impl/ErvuReplyingKafkaService.java
@@ -0,0 +1,31 @@
+package ru.micord.ervu.kafka.service.impl;
+
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.header.internals.RecordHeader;
+import org.springframework.beans.factory.annotation.Qualifier;
+import org.springframework.kafka.requestreply.ReplyingKafkaTemplate;
+import org.springframework.kafka.support.KafkaHeaders;
+import org.springframework.stereotype.Service;
+
+@Service
+public class ErvuReplyingKafkaService extends BaseReplyingKafkaService<String, String> {
+ private final ReplyingKafkaTemplate replyingKafkaTemplate;
+
+ public ErvuReplyingKafkaService(
+ @Qualifier("ervuReplyingTemplate") ReplyingKafkaTemplate replyingKafkaTemplate) {
+ this.replyingKafkaTemplate = replyingKafkaTemplate;
+ }
+
+ @Override
+ protected ReplyingKafkaTemplate getTemplate() {
+ return replyingKafkaTemplate;
+ }
+
+ @Override
+ protected ProducerRecord getProducerRecord(String requestTopic, String replyTopic,
+ String requestMessage) {
+ ProducerRecord record = new ProducerRecord<>(requestTopic, requestMessage);
+ record.headers().add(new RecordHeader(KafkaHeaders.REPLY_TOPIC, replyTopic.getBytes()));
+ return record;
+ }
+}
diff --git a/backend/src/main/java/ru/micord/ervu/kafka/service/impl/ValidateReplyingKafkaService.java b/backend/src/main/java/ru/micord/ervu/kafka/service/impl/ValidateReplyingKafkaService.java
new file mode 100644
index 00000000..10903afc
--- /dev/null
+++ b/backend/src/main/java/ru/micord/ervu/kafka/service/impl/ValidateReplyingKafkaService.java
@@ -0,0 +1,32 @@
+package ru.micord.ervu.kafka.service.impl;
+
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.header.internals.RecordHeader;
+import org.apache.kafka.common.utils.Bytes;
+import org.springframework.beans.factory.annotation.Qualifier;
+import org.springframework.kafka.requestreply.ReplyingKafkaTemplate;
+import org.springframework.kafka.support.KafkaHeaders;
+import org.springframework.stereotype.Service;
+
+@Service
+public class ValidateReplyingKafkaService extends BaseReplyingKafkaService<String, Bytes> {
+  private final ReplyingKafkaTemplate replyingKafkaTemplate;
+
+  public ValidateReplyingKafkaService(
+      @Qualifier("validateReplyingTemplate") ReplyingKafkaTemplate replyingKafkaTemplate) {
+ this.replyingKafkaTemplate = replyingKafkaTemplate;
+ }
+
+ @Override
+ protected ReplyingKafkaTemplate getTemplate() {
+ return replyingKafkaTemplate;
+ }
+
+ @Override
+ protected ProducerRecord getProducerRecord(String requestTopic, String replyTopic,
+ String requestMessage) {
+ ProducerRecord record = new ProducerRecord<>(requestTopic, requestMessage);
+ record.headers().add(new RecordHeader(KafkaHeaders.REPLY_TOPIC, replyTopic.getBytes()));
+ return record;
+ }
+}
diff --git a/backend/src/main/java/ru/micord/ervu/model/ValidateExportRequest.java b/backend/src/main/java/ru/micord/ervu/model/ValidateExportRequest.java
new file mode 100644
index 00000000..65dbdd7a
--- /dev/null
+++ b/backend/src/main/java/ru/micord/ervu/model/ValidateExportRequest.java
@@ -0,0 +1,4 @@
+package ru.micord.ervu.model;
+
+public record ValidateExportRequest(String orgId, String fileId) {
+}
diff --git a/backend/src/main/java/ru/micord/ervu/model/ValidateExportResponse.java b/backend/src/main/java/ru/micord/ervu/model/ValidateExportResponse.java
new file mode 100644
index 00000000..8e8c8cd9
--- /dev/null
+++ b/backend/src/main/java/ru/micord/ervu/model/ValidateExportResponse.java
@@ -0,0 +1,28 @@
+package ru.micord.ervu.model;
+
+import com.google.protobuf.ByteString;
+import com.google.protobuf.InvalidProtocolBufferException;
+
+public class ValidateExportResponse {
+ private String fileName;
+ private ByteString file;
+
+ public ValidateExportResponse(byte[] bytes) throws InvalidProtocolBufferException {
+ // TODO: Заменить ValidateExportResponseProto на реальный protobuf класс
+// ValidateExportResponseProto protoResponse = ValidateExportResponseProto.parseFrom(bytes);
+// this.fileName = protoResponse.getFileName();
+// this.file = protoResponse.getFile();
+ }
+
+ public String getFileName() {
+ return fileName;
+ }
+
+ public ByteString getFile() {
+ return file;
+ }
+
+ public boolean hasFile() {
+ return file != null && !file.isEmpty();
+ }
+}
\ No newline at end of file
diff --git a/backend/src/main/java/ru/micord/ervu/property/grid/FilterType.java b/backend/src/main/java/ru/micord/ervu/property/grid/FilterType.java
index 7795ebc0..8d3f53f1 100644
--- a/backend/src/main/java/ru/micord/ervu/property/grid/FilterType.java
+++ b/backend/src/main/java/ru/micord/ervu/property/grid/FilterType.java
@@ -7,5 +7,6 @@ public enum FilterType {
TEXT,
DATE,
NUMBER,
- SET
+ SET,
+ FILE
}
diff --git a/backend/src/main/java/ru/micord/ervu/security/esia/service/EsiaAuthService.java b/backend/src/main/java/ru/micord/ervu/security/esia/service/EsiaAuthService.java
index 63143c16..a9f403db 100644
--- a/backend/src/main/java/ru/micord/ervu/security/esia/service/EsiaAuthService.java
+++ b/backend/src/main/java/ru/micord/ervu/security/esia/service/EsiaAuthService.java
@@ -86,7 +86,7 @@ public class EsiaAuthService {
@Autowired
private JwtTokenService jwtTokenService;
@Autowired
- private ReplyingKafkaService replyingKafkaService;
+ private ReplyingKafkaService replyingKafkaService;
@Autowired
private OkopfService okopfService;
@Autowired
diff --git a/backend/src/main/java/ru/micord/ervu/service/InteractionServiceImpl.java b/backend/src/main/java/ru/micord/ervu/service/InteractionServiceImpl.java
index c5688d16..b02c9e2c 100644
--- a/backend/src/main/java/ru/micord/ervu/service/InteractionServiceImpl.java
+++ b/backend/src/main/java/ru/micord/ervu/service/InteractionServiceImpl.java
@@ -41,8 +41,6 @@ public class InteractionServiceImpl implements InteractionService {
.set(INTERACTION_LOG.SENT_DATE, timestamp)
.set(INTERACTION_LOG.SENDER, sender)
.set(INTERACTION_LOG.FILE_NAME, fileName)
- .set(INTERACTION_LOG.RECORDS_SENT, 0)
- .set(INTERACTION_LOG.RECORDS_ACCEPTED, 0)
.set(INTERACTION_LOG.ERVU_ID, ervuId)
.execute();
}
diff --git a/backend/src/main/java/ru/micord/ervu/service/ValidationFileService.java b/backend/src/main/java/ru/micord/ervu/service/ValidationFileService.java
new file mode 100644
index 00000000..c116bf26
--- /dev/null
+++ b/backend/src/main/java/ru/micord/ervu/service/ValidationFileService.java
@@ -0,0 +1,48 @@
+package ru.micord.ervu.service;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.kafka.common.utils.Bytes;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.stereotype.Service;
+import ru.micord.ervu.exception.ExportException;
+import ru.micord.ervu.kafka.service.ReplyingKafkaService;
+import ru.micord.ervu.model.ValidateExportRequest;
+import ru.micord.ervu.model.ValidateExportResponse;
+
+/**
+ * @author Adel Kalimullin
+ */
+@Service
+public class ValidationFileService {
+ private final ReplyingKafkaService