add file manager
This commit is contained in:
parent
4a1e31dd6a
commit
6d71d89577
3 changed files with 102 additions and 71 deletions
86
src/main/java/ru/micord/ervu/av/service/FileManager.java
Normal file
86
src/main/java/ru/micord/ervu/av/service/FileManager.java
Normal file
|
|
@ -0,0 +1,86 @@
|
||||||
|
package ru.micord.ervu.av.service;
|
||||||
|
|
||||||
|
import java.io.File;
|
||||||
|
import java.io.FileOutputStream;
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.nio.file.Path;
|
||||||
|
|
||||||
|
import org.apache.http.HttpEntity;
|
||||||
|
import org.apache.http.client.methods.CloseableHttpResponse;
|
||||||
|
import org.apache.http.client.methods.HttpDelete;
|
||||||
|
import org.apache.http.client.methods.HttpGet;
|
||||||
|
import org.apache.http.conn.HttpHostConnectException;
|
||||||
|
import org.apache.http.impl.client.CloseableHttpClient;
|
||||||
|
import org.apache.http.impl.client.HttpClients;
|
||||||
|
import org.springframework.http.HttpStatus;
|
||||||
|
import org.springframework.retry.annotation.Backoff;
|
||||||
|
import org.springframework.retry.annotation.Retryable;
|
||||||
|
import org.springframework.stereotype.Component;
|
||||||
|
import ru.micord.ervu.av.exception.FileUploadException;
|
||||||
|
import ru.micord.ervu.av.exception.InvalidHttpFileUrlException;
|
||||||
|
import ru.micord.ervu.av.exception.RetryableException;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @author gulnaz
|
||||||
|
*/
|
||||||
|
@Component
|
||||||
|
public class FileManager {
|
||||||
|
|
||||||
|
@Retryable(retryFor = {InvalidHttpFileUrlException.class},
|
||||||
|
maxAttemptsExpression = "${av.retry.max.attempts.count}",
|
||||||
|
backoff = @Backoff(delayExpression = "${av.retry.delay.milliseconds}"))
|
||||||
|
public void downloadFile(String fileUrl, Path filePath)
|
||||||
|
throws InvalidHttpFileUrlException, FileUploadException {
|
||||||
|
File file = filePath.toFile();
|
||||||
|
HttpGet request = new HttpGet(fileUrl);
|
||||||
|
|
||||||
|
try (CloseableHttpClient client = HttpClients.createDefault();
|
||||||
|
CloseableHttpResponse response = client.execute(request)) {
|
||||||
|
|
||||||
|
int statusCode = response.getStatusLine().getStatusCode();
|
||||||
|
if (statusCode == HttpStatus.OK.value()) {
|
||||||
|
HttpEntity entity = response.getEntity();
|
||||||
|
if (entity != null) {
|
||||||
|
try (FileOutputStream outputStream = new FileOutputStream(file)) {
|
||||||
|
entity.writeTo(outputStream);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
// в хранилище не обнаружено файла; сообщение некорректно
|
||||||
|
String message = "http status code " + statusCode + " : " + fileUrl;
|
||||||
|
throw new InvalidHttpFileUrlException(message);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
catch (HttpHostConnectException e) {
|
||||||
|
throw new FileUploadException(e);
|
||||||
|
}
|
||||||
|
catch (IOException e) {
|
||||||
|
// хранилище недоступно; сообщение некорректно
|
||||||
|
String message =
|
||||||
|
(e.getMessage() == null ? e.getCause().getMessage() : e.getMessage()) + " : " + fileUrl;
|
||||||
|
throw new InvalidHttpFileUrlException(message, e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Retryable(retryFor = {RetryableException.class},
|
||||||
|
maxAttemptsExpression = "${av.retry.max.attempts.count}",
|
||||||
|
backoff = @Backoff(delayExpression = "${av.retry.delay.milliseconds}"))
|
||||||
|
public void deleteFile(String fileUrl) throws FileUploadException {
|
||||||
|
try (CloseableHttpClient client = HttpClients.createDefault()) {
|
||||||
|
HttpDelete delete = new HttpDelete(fileUrl);
|
||||||
|
|
||||||
|
try (CloseableHttpResponse response = client.execute(delete)) {
|
||||||
|
int statusCode = response.getStatusLine().getStatusCode();
|
||||||
|
if (statusCode != HttpStatus.NO_CONTENT.value()) {
|
||||||
|
String message = "http status code " + statusCode + " : " + fileUrl;
|
||||||
|
throw new RetryableException(message);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
catch (IOException e) {
|
||||||
|
// непредусмотренная ошибка доступа через http-клиент
|
||||||
|
throw new FileUploadException(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
@ -1,7 +1,5 @@
|
||||||
package ru.micord.ervu.av.service;
|
package ru.micord.ervu.av.service;
|
||||||
|
|
||||||
import java.io.File;
|
|
||||||
import java.io.FileOutputStream;
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.nio.charset.StandardCharsets;
|
import java.nio.charset.StandardCharsets;
|
||||||
import java.nio.file.Files;
|
import java.nio.file.Files;
|
||||||
|
|
@ -12,13 +10,6 @@ import java.util.concurrent.CompletableFuture;
|
||||||
|
|
||||||
import com.google.gson.Gson;
|
import com.google.gson.Gson;
|
||||||
import com.google.gson.GsonBuilder;
|
import com.google.gson.GsonBuilder;
|
||||||
import org.apache.http.HttpEntity;
|
|
||||||
import org.apache.http.client.methods.CloseableHttpResponse;
|
|
||||||
import org.apache.http.client.methods.HttpDelete;
|
|
||||||
import org.apache.http.client.methods.HttpGet;
|
|
||||||
import org.apache.http.conn.HttpHostConnectException;
|
|
||||||
import org.apache.http.impl.client.CloseableHttpClient;
|
|
||||||
import org.apache.http.impl.client.HttpClients;
|
|
||||||
import org.apache.kafka.clients.admin.NewTopic;
|
import org.apache.kafka.clients.admin.NewTopic;
|
||||||
import org.apache.kafka.clients.producer.ProducerRecord;
|
import org.apache.kafka.clients.producer.ProducerRecord;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
|
|
@ -26,7 +17,6 @@ import org.slf4j.LoggerFactory;
|
||||||
import org.springframework.beans.factory.annotation.Autowired;
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
import org.springframework.beans.factory.annotation.Qualifier;
|
import org.springframework.beans.factory.annotation.Qualifier;
|
||||||
import org.springframework.beans.factory.annotation.Value;
|
import org.springframework.beans.factory.annotation.Value;
|
||||||
import org.springframework.http.HttpStatus;
|
|
||||||
import org.springframework.kafka.annotation.KafkaListener;
|
import org.springframework.kafka.annotation.KafkaListener;
|
||||||
import org.springframework.kafka.core.KafkaTemplate;
|
import org.springframework.kafka.core.KafkaTemplate;
|
||||||
import org.springframework.kafka.support.Acknowledgment;
|
import org.springframework.kafka.support.Acknowledgment;
|
||||||
|
|
@ -58,14 +48,18 @@ public class FileUploadService {
|
||||||
private final NewTopic outErrorTopic;
|
private final NewTopic outErrorTopic;
|
||||||
private final NewTopic outSuccessTopic;
|
private final NewTopic outSuccessTopic;
|
||||||
private final NewTopic inStatusTopic;
|
private final NewTopic inStatusTopic;
|
||||||
|
private final FileManager fIleManager;
|
||||||
private final ReceiveScanReportRetryable receiveScanReportRetryable;
|
private final ReceiveScanReportRetryable receiveScanReportRetryable;
|
||||||
private final S3Service s3Service;
|
private final S3Service s3Service;
|
||||||
|
|
||||||
@Autowired
|
@Autowired
|
||||||
public FileUploadService(@Qualifier("outputKafkaTemplate") KafkaTemplate<String, String> kafkaTemplate,
|
public FileUploadService(
|
||||||
|
@Qualifier("outputKafkaTemplate") KafkaTemplate<String, String> kafkaTemplate,
|
||||||
@Qualifier("inputKafkaTemplate") KafkaTemplate<String, String> inKafkaTemplate,
|
@Qualifier("inputKafkaTemplate") KafkaTemplate<String, String> inKafkaTemplate,
|
||||||
NewTopic outErrorTopic, NewTopic outSuccessTopic, ReceiveScanReportRetryable receiveScanReportRetryable,
|
NewTopic outErrorTopic, NewTopic outSuccessTopic,
|
||||||
S3Service s3Service, NewTopic inStatusTopic) {
|
ReceiveScanReportRetryable receiveScanReportRetryable,
|
||||||
|
S3Service s3Service, NewTopic inStatusTopic,
|
||||||
|
FileManager fIleManager) {
|
||||||
this.kafkaTemplate = kafkaTemplate;
|
this.kafkaTemplate = kafkaTemplate;
|
||||||
this.outErrorTopic = outErrorTopic;
|
this.outErrorTopic = outErrorTopic;
|
||||||
this.outSuccessTopic = outSuccessTopic;
|
this.outSuccessTopic = outSuccessTopic;
|
||||||
|
|
@ -73,6 +67,7 @@ public class FileUploadService {
|
||||||
this.s3Service = s3Service;
|
this.s3Service = s3Service;
|
||||||
this.inKafkaTemplate = inKafkaTemplate;
|
this.inKafkaTemplate = inKafkaTemplate;
|
||||||
this.inStatusTopic = inStatusTopic;
|
this.inStatusTopic = inStatusTopic;
|
||||||
|
this.fIleManager = fIleManager;
|
||||||
}
|
}
|
||||||
|
|
||||||
@KafkaListener(id = "${spring.kafka.consumer.group.id}", topics = "${kafka.in.topic.name}",
|
@KafkaListener(id = "${spring.kafka.consumer.group.id}", topics = "${kafka.in.topic.name}",
|
||||||
|
|
@ -93,7 +88,7 @@ public class FileUploadService {
|
||||||
logger.info("working in {}", System.getProperty("user.home"));
|
logger.info("working in {}", System.getProperty("user.home"));
|
||||||
Path filePath = Paths.get(fileSavingPath, fileUrl.fileName());
|
Path filePath = Paths.get(fileSavingPath, fileUrl.fileName());
|
||||||
String downloadUrl = fileUrl.fileUrl();
|
String downloadUrl = fileUrl.fileUrl();
|
||||||
downloadFile(downloadUrl, filePath);
|
fIleManager.downloadFile(downloadUrl, filePath);
|
||||||
boolean isAvError = false;
|
boolean isAvError = false;
|
||||||
boolean clean = true;
|
boolean clean = true;
|
||||||
boolean infected = false;
|
boolean infected = false;
|
||||||
|
|
@ -130,7 +125,7 @@ public class FileUploadService {
|
||||||
}
|
}
|
||||||
sendMessage(inStatusTopic.name(), downloadRequest, messageId, inKafkaTemplate);
|
sendMessage(inStatusTopic.name(), downloadRequest, messageId, inKafkaTemplate);
|
||||||
|
|
||||||
deleteFile(downloadUrl);
|
fIleManager.deleteFile(downloadUrl);
|
||||||
|
|
||||||
try {
|
try {
|
||||||
Files.delete(filePath);
|
Files.delete(filePath);
|
||||||
|
|
@ -144,10 +139,12 @@ public class FileUploadService {
|
||||||
}
|
}
|
||||||
catch (InvalidHttpFileUrlException e) {
|
catch (InvalidHttpFileUrlException e) {
|
||||||
// считаем, что повторная обработка сообщения не нужна
|
// считаем, что повторная обработка сообщения не нужна
|
||||||
// ошибку логируем, сообщаем об ошибке, помечаем прочтение сообщения
|
// ошибку логируем, отправляем сообщение с новым статусом, помечаем прочтение сообщения
|
||||||
logger.error(e.getMessage() + ": " + kafkaInMessage);
|
logger.error(e.getMessage() + ": " + kafkaInMessage);
|
||||||
|
downloadRequest.fileInfo().setFileUrl(null);
|
||||||
|
downloadRequest.fileInfo().setFileStatus(FileStatus.FILE_STATUS_11);
|
||||||
|
sendMessage(inStatusTopic.name(), downloadRequest, messageId, inKafkaTemplate);
|
||||||
acknowledgment.acknowledge();
|
acknowledgment.acknowledge();
|
||||||
throw new RuntimeException(kafkaInMessage, e);
|
|
||||||
}
|
}
|
||||||
catch (FileUploadException e) {
|
catch (FileUploadException e) {
|
||||||
// считаем, что нужно повторное считывание сообщения
|
// считаем, что нужно повторное считывание сообщения
|
||||||
|
|
@ -187,58 +184,6 @@ public class FileUploadService {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private void downloadFile(String fileUrl, Path filePath)
|
|
||||||
throws InvalidHttpFileUrlException, FileUploadException {
|
|
||||||
File file = filePath.toFile();
|
|
||||||
HttpGet request = new HttpGet(fileUrl);
|
|
||||||
|
|
||||||
try (CloseableHttpClient client = HttpClients.createDefault();
|
|
||||||
CloseableHttpResponse response = client.execute(request)) {
|
|
||||||
|
|
||||||
int statusCode = response.getStatusLine().getStatusCode();
|
|
||||||
if (statusCode == HttpStatus.OK.value()) {
|
|
||||||
HttpEntity entity = response.getEntity();
|
|
||||||
if (entity != null) {
|
|
||||||
try (FileOutputStream outputStream = new FileOutputStream(file)) {
|
|
||||||
entity.writeTo(outputStream);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
else {
|
|
||||||
// в хранилище не обнаружено файла; сообщение некорректно
|
|
||||||
String message = "http status code " + statusCode + " : " + fileUrl;
|
|
||||||
throw new InvalidHttpFileUrlException(message);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
catch (HttpHostConnectException e) {
|
|
||||||
throw new FileUploadException(e);
|
|
||||||
}
|
|
||||||
catch (IOException e) {
|
|
||||||
// хранилище недоступно; сообщение некорректно
|
|
||||||
String message =
|
|
||||||
(e.getMessage() == null ? e.getCause().getMessage() : e.getMessage()) + " : " + fileUrl;
|
|
||||||
throw new InvalidHttpFileUrlException(message, e);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private void deleteFile(String fileUrl) throws FileUploadException {
|
|
||||||
try (CloseableHttpClient client = HttpClients.createDefault()) {
|
|
||||||
HttpDelete delete = new HttpDelete(fileUrl);
|
|
||||||
|
|
||||||
try (CloseableHttpResponse response = client.execute(delete)) {
|
|
||||||
int statusCode = response.getStatusLine().getStatusCode();
|
|
||||||
if (statusCode != HttpStatus.NO_CONTENT.value()) {
|
|
||||||
String message = "http status code " + statusCode + " : " + fileUrl;
|
|
||||||
throw new RuntimeException(message);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
catch (IOException e) {
|
|
||||||
// непредусмотренная ошибка доступа через http-клиент
|
|
||||||
throw new FileUploadException(e);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private void sendMessage(@NonNull String topicName, Object object, String messageId,
|
private void sendMessage(@NonNull String topicName, Object object, String messageId,
|
||||||
KafkaTemplate<String, String> template) {
|
KafkaTemplate<String, String> template) {
|
||||||
ProducerRecord<String, String> record = new ProducerRecord<>(topicName,
|
ProducerRecord<String, String> record = new ProducerRecord<>(topicName,
|
||||||
|
|
|
||||||
|
|
@ -41,7 +41,7 @@ public class ReceiveScanReportRetryable {
|
||||||
@Retryable(retryFor = {RetryableException.class},
|
@Retryable(retryFor = {RetryableException.class},
|
||||||
maxAttemptsExpression = "${av.retry.max.attempts.count}",
|
maxAttemptsExpression = "${av.retry.max.attempts.count}",
|
||||||
backoff = @Backoff(delayExpression = "${av.retry.delay.milliseconds}"))
|
backoff = @Backoff(delayExpression = "${av.retry.delay.milliseconds}"))
|
||||||
public AvResponse checkFile(Path filePath) throws FileUploadException {
|
public AvResponse checkFile(Path filePath) throws RetryableException, FileUploadException {
|
||||||
File file = filePath.toFile();
|
File file = filePath.toFile();
|
||||||
|
|
||||||
try (CloseableHttpClient client = HttpClients.createDefault()) {
|
try (CloseableHttpClient client = HttpClients.createDefault()) {
|
||||||
|
|
@ -107,7 +107,7 @@ public class ReceiveScanReportRetryable {
|
||||||
}
|
}
|
||||||
|
|
||||||
private AvResponse receiveScanReport(CloseableHttpClient client, HttpGet get)
|
private AvResponse receiveScanReport(CloseableHttpClient client, HttpGet get)
|
||||||
throws FileUploadException {
|
throws RetryableException, FileUploadException {
|
||||||
|
|
||||||
try (CloseableHttpResponse getResponse = client.execute(get)) {
|
try (CloseableHttpResponse getResponse = client.execute(get)) {
|
||||||
int getStatusCode = getResponse.getStatusLine().getStatusCode();
|
int getStatusCode = getResponse.getStatusLine().getStatusCode();
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue