SAPPORT-8400: fix for review (3)
This commit is contained in:
parent
fb99f534cc
commit
3b8a978a05
11 changed files with 60 additions and 41 deletions
|
|
@ -1,4 +1,4 @@
|
|||
package ru.micord;
|
||||
package ru.micord.ervu.av;
|
||||
|
||||
import org.springframework.boot.SpringApplication;
|
||||
import org.springframework.boot.autoconfigure.SpringBootApplication;
|
||||
|
|
@ -1,4 +1,4 @@
|
|||
package ru.micord.exception;
|
||||
package ru.micord.ervu.av.exception;
|
||||
|
||||
/**
|
||||
* @author r.latypov
|
||||
|
|
@ -1,4 +1,4 @@
|
|||
package ru.micord.exception;
|
||||
package ru.micord.ervu.av.exception;
|
||||
|
||||
/**
|
||||
* @author r.latypov
|
||||
|
|
@ -6,10 +6,6 @@ package ru.micord.exception;
|
|||
public class InvalidHttpFileUrlException extends Exception {
|
||||
private static final String MESSAGE = "file url is not valid";
|
||||
|
||||
public InvalidHttpFileUrlException() {
|
||||
super(MESSAGE);
|
||||
}
|
||||
|
||||
public InvalidHttpFileUrlException(String message) {
|
||||
super(String.join(" : ", MESSAGE, message));
|
||||
}
|
||||
|
|
@ -1,4 +1,4 @@
|
|||
package ru.micord.kafka.config.input;
|
||||
package ru.micord.ervu.av.kafka.config.input;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
|
|
@ -1,4 +1,4 @@
|
|||
package ru.micord.kafka.config.output;
|
||||
package ru.micord.ervu.av.kafka.config.output;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
|
|
@ -1,4 +1,4 @@
|
|||
package ru.micord.kafka.config.output;
|
||||
package ru.micord.ervu.av.kafka.config.output;
|
||||
|
||||
import org.apache.kafka.clients.admin.NewTopic;
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
|
|
@ -1,4 +1,4 @@
|
|||
package ru.micord.kafka.dto;
|
||||
package ru.micord.ervu.av.kafka.dto;
|
||||
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.Getter;
|
||||
|
|
@ -1,7 +1,7 @@
|
|||
package ru.micord.kafka.dto;
|
||||
package ru.micord.ervu.av.kafka.dto;
|
||||
|
||||
import org.springframework.lang.NonNull;
|
||||
import ru.micord.av.AvResponse;
|
||||
import ru.micord.ervu.av.response.AvResponse;
|
||||
|
||||
/**
|
||||
* @author r.latypov
|
||||
|
|
@ -1,4 +1,4 @@
|
|||
package ru.micord.av;
|
||||
package ru.micord.ervu.av.response;
|
||||
|
||||
/**
|
||||
* @author r.latypov
|
||||
|
|
@ -1,9 +1,10 @@
|
|||
package ru.micord.service;
|
||||
package ru.micord.ervu.av.service;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.FileOutputStream;
|
||||
import java.io.IOException;
|
||||
import java.util.Arrays;
|
||||
import java.util.UUID;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
|
||||
import com.google.gson.Gson;
|
||||
|
|
@ -24,17 +25,17 @@ import org.slf4j.Logger;
|
|||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.kafka.annotation.KafkaListener;
|
||||
import org.springframework.kafka.core.KafkaTemplate;
|
||||
import org.springframework.kafka.support.Acknowledgment;
|
||||
import org.springframework.kafka.support.SendResult;
|
||||
import org.springframework.lang.NonNull;
|
||||
import org.springframework.stereotype.Service;
|
||||
import ru.micord.av.AvResponse;
|
||||
import ru.micord.exception.InvalidHttpFileUrlException;
|
||||
import ru.micord.kafka.dto.InMessage;
|
||||
import ru.micord.kafka.dto.OutErrorMessage;
|
||||
|
||||
import ru.micord.exception.FileUploadException;
|
||||
import ru.micord.ervu.av.exception.FileUploadException;
|
||||
import ru.micord.ervu.av.exception.InvalidHttpFileUrlException;
|
||||
import ru.micord.ervu.av.kafka.dto.InMessage;
|
||||
import ru.micord.ervu.av.kafka.dto.OutErrorMessage;
|
||||
import ru.micord.ervu.av.response.AvResponse;
|
||||
|
||||
/**
|
||||
* @author r.latypov
|
||||
|
|
@ -60,7 +61,7 @@ public class FileUploadService {
|
|||
this.outSuccessTopic = outSuccessTopic;
|
||||
}
|
||||
|
||||
//@KafkaListener(id = "${spring.kafka.consumer.group-id}", topics = "${kafka-in.topic.name}")
|
||||
@KafkaListener(id = "${spring.kafka.consumer.group-id}", topics = "${kafka-in.topic.name}")
|
||||
public void listenKafkaIn(String kafkaInMessage, Acknowledgment acknowledgment) {
|
||||
InMessage inMessage = new Gson().fromJson(kafkaInMessage, InMessage.class);
|
||||
|
||||
|
|
@ -68,48 +69,56 @@ public class FileUploadService {
|
|||
FileUrl fileUrl = parseFileUrl(inMessage.fileInfo().getFileNameBase());
|
||||
String filePath = fileSavingPath + fileUrl.fileName();
|
||||
String downloadUrl = fileUrl.fileUrl();
|
||||
downloadFileByHttp(downloadUrl, filePath);
|
||||
downloadFile(downloadUrl, filePath);
|
||||
|
||||
AvResponse avResponse = sendFileToAvScan(filePath);
|
||||
AvResponse avResponse = checkFile(filePath);
|
||||
|
||||
boolean infected = Arrays.stream(avResponse.verdicts())
|
||||
.anyMatch(verdict -> verdict.equalsIgnoreCase("infected"));
|
||||
|
||||
if (infected) {
|
||||
sendKafkaMessage(outErrorTopic.name(),
|
||||
sendMessage(outErrorTopic.name(),
|
||||
new OutErrorMessage("file is infected", avResponse, inMessage)
|
||||
);
|
||||
}
|
||||
else {
|
||||
String uploadUrl = httpFileServerOutAddress + "/" + fileUrl.fileName();
|
||||
uploadFileByHttp(filePath, uploadUrl);
|
||||
uploadFile(filePath, uploadUrl);
|
||||
|
||||
inMessage.fileInfo().setFileNameBase(uploadUrl);
|
||||
sendKafkaMessage(outSuccessTopic.name(), inMessage);
|
||||
sendMessage(outSuccessTopic.name(), inMessage);
|
||||
}
|
||||
|
||||
deleteFileByHttp(downloadUrl);
|
||||
deleteFile(downloadUrl);
|
||||
if (new File(filePath).delete()) {
|
||||
acknowledgment.acknowledge();
|
||||
}
|
||||
}
|
||||
catch (InvalidHttpFileUrlException e) {
|
||||
// считаем, что повторная обработка сообщения не нужна
|
||||
// ошибку логируем, сообщаем об ошибке, помечаем прочтение сообщения
|
||||
logger.error(e.getMessage() + ": " + kafkaInMessage);
|
||||
|
||||
sendKafkaMessage(outErrorTopic.name(),
|
||||
sendMessage(outErrorTopic.name(),
|
||||
new OutErrorMessage(e.getMessage(), null, inMessage)
|
||||
);
|
||||
acknowledgment.acknowledge();
|
||||
}
|
||||
catch (FileUploadException e) {
|
||||
// считаем, что нужно повторное считывание сообщения
|
||||
// ошибку логируем, сообщение оставляем непрочитанным
|
||||
logger.error(e.getMessage(), e);
|
||||
}
|
||||
}
|
||||
|
||||
/* метод для выделения UUID файла из ссылки на файл
|
||||
сохраняем на диске и отправляем файл в хранилище под тем же UUID, сохраняя расширение файла
|
||||
*/
|
||||
private static FileUrl parseFileUrl(@NonNull String fileUrl) throws InvalidHttpFileUrlException {
|
||||
String[] substrings = fileUrl.split("/");
|
||||
String fileName = substrings[substrings.length - 1];
|
||||
if (substrings.length == 1 || fileName.isBlank()) {
|
||||
// ошибка данных; сообщение некорректно
|
||||
throw new InvalidHttpFileUrlException(fileUrl);
|
||||
}
|
||||
else {
|
||||
|
|
@ -117,7 +126,7 @@ public class FileUploadService {
|
|||
}
|
||||
}
|
||||
|
||||
private void downloadFileByHttp(String fileUrl, String filePath)
|
||||
private void downloadFile(String fileUrl, String filePath)
|
||||
throws InvalidHttpFileUrlException, FileUploadException {
|
||||
File file = new File(filePath);
|
||||
HttpGet request = new HttpGet(fileUrl);
|
||||
|
|
@ -135,6 +144,7 @@ public class FileUploadService {
|
|||
}
|
||||
}
|
||||
else {
|
||||
// в хранилище не обнаружено файла; сообщение некорректно
|
||||
String message = "http status code " + statusCode + " : " + fileUrl;
|
||||
throw new InvalidHttpFileUrlException(message);
|
||||
}
|
||||
|
|
@ -143,13 +153,14 @@ public class FileUploadService {
|
|||
throw new FileUploadException(e);
|
||||
}
|
||||
catch (IOException e) {
|
||||
// хранилище недоступно; сообщение некорректно
|
||||
String message =
|
||||
(e.getMessage() == null ? e.getCause().getMessage() : e.getMessage()) + " : " + fileUrl;
|
||||
throw new InvalidHttpFileUrlException(message, e);
|
||||
}
|
||||
}
|
||||
|
||||
private AvResponse sendFileToAvScan(String filePath) throws FileUploadException {
|
||||
private AvResponse checkFile(String filePath) throws FileUploadException {
|
||||
File file = new File(filePath);
|
||||
|
||||
try (CloseableHttpClient client = HttpClients.createDefault()) {
|
||||
|
|
@ -170,11 +181,12 @@ public class FileUploadService {
|
|||
}
|
||||
}
|
||||
catch (IOException e) {
|
||||
// непредусмотренная ошибка доступа через http-клиент
|
||||
throw new FileUploadException(e);
|
||||
}
|
||||
}
|
||||
|
||||
private void uploadFileByHttp(String filePath, String uploadUrl) throws FileUploadException {
|
||||
private void uploadFile(String filePath, String uploadUrl) throws FileUploadException {
|
||||
File file = new File(filePath);
|
||||
|
||||
try (CloseableHttpClient client = HttpClients.createDefault()) {
|
||||
|
|
@ -187,6 +199,8 @@ public class FileUploadService {
|
|||
try (CloseableHttpResponse response = client.execute(put)) {
|
||||
int statusCode = response.getStatusLine().getStatusCode();
|
||||
if (statusCode == 204) {
|
||||
// считаем, что происходит повторная обработка
|
||||
// этот же файл доставлен при предыдущей обработке и не удален из входного хранилища
|
||||
logger.warn("file already exists : " + uploadUrl);
|
||||
}
|
||||
else if (statusCode != 201) {
|
||||
|
|
@ -196,11 +210,12 @@ public class FileUploadService {
|
|||
}
|
||||
}
|
||||
catch (IOException e) {
|
||||
// непредусмотренная ошибка доступа через http-клиент
|
||||
throw new FileUploadException(e);
|
||||
}
|
||||
}
|
||||
|
||||
private void deleteFileByHttp(String fileUrl) throws FileUploadException {
|
||||
private void deleteFile(String fileUrl) throws FileUploadException {
|
||||
try (CloseableHttpClient client = HttpClients.createDefault()) {
|
||||
HttpDelete delete = new HttpDelete(fileUrl);
|
||||
|
||||
|
|
@ -213,11 +228,12 @@ public class FileUploadService {
|
|||
}
|
||||
}
|
||||
catch (IOException e) {
|
||||
// непредусмотренная ошибка доступа через http-клиент
|
||||
throw new FileUploadException(e);
|
||||
}
|
||||
}
|
||||
|
||||
private void sendKafkaMessage(@NonNull String topicName, Object object) {
|
||||
private void sendMessage(@NonNull String topicName, Object object) {
|
||||
CompletableFuture<SendResult<String, String>> future = kafkaTemplate.send(topicName,
|
||||
new GsonBuilder().setPrettyPrinting().create().toJson(object)
|
||||
);
|
||||
|
|
@ -1,12 +1,16 @@
|
|||
spring.kafka.admin.security.protocol=SASL_PLAINTEXT
|
||||
spring.kafka.admin.properties.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="user1" password="Blfi9d2OFG";
|
||||
#login password to set
|
||||
spring.kafka.admin.properties.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="" password="";
|
||||
spring.kafka.admin.properties.sasl.mechanism=SCRAM-SHA-256
|
||||
#
|
||||
spring.kafka.bootstrap-servers=10.10.31.11:32609
|
||||
#host1:port1, host2:port2
|
||||
spring.kafka.bootstrap-servers=
|
||||
#
|
||||
spring.kafka.consumer.bootstrap-servers=10.10.31.11:32609
|
||||
#host1:port1, host2:port2
|
||||
spring.kafka.consumer.bootstrap-servers=
|
||||
spring.kafka.consumer.security.protocol=SASL_PLAINTEXT
|
||||
spring.kafka.consumer.properties.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="user1" password="Blfi9d2OFG";
|
||||
#login password to set
|
||||
spring.kafka.consumer.properties.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="" password="";
|
||||
spring.kafka.consumer.properties.sasl.mechanism=SCRAM-SHA-256
|
||||
#
|
||||
spring.kafka.consumer.enable-auto-commit=false
|
||||
|
|
@ -14,13 +18,16 @@ spring.kafka.consumer.group-id=file-to-upload-consumers
|
|||
#
|
||||
spring.kafka.listener.ack-mode=MANUAL_IMMEDIATE
|
||||
#
|
||||
spring.kafka.producer.bootstrap-servers=10.10.31.11:32609
|
||||
#host1:port1, host2:port2
|
||||
spring.kafka.producer.bootstrap-servers=
|
||||
spring.kafka.producer.security.protocol=SASL_PLAINTEXT
|
||||
spring.kafka.producer.properties.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="user1" password="Blfi9d2OFG";
|
||||
#login password to set
|
||||
spring.kafka.producer.properties.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="" password="";
|
||||
spring.kafka.producer.properties.sasl.mechanism=SCRAM-SHA-256
|
||||
#
|
||||
spring.kafka.properties.security.protocol=SASL_PLAINTEXT
|
||||
spring.kafka.properties.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="user1" password="Blfi9d2OFG";
|
||||
#login password to set
|
||||
spring.kafka.properties.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="" password="";
|
||||
spring.kafka.properties.sasl.mechanism=SCRAM-SHA-256
|
||||
#
|
||||
kafka-in.topic.name=file-to-upload
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue