diff --git a/pom.xml b/pom.xml index 9883b66..fcd2336 100644 --- a/pom.xml +++ b/pom.xml @@ -7,7 +7,7 @@ org.springframework.boot spring-boot-starter-parent - 3.3.0 + 3.3.5 diff --git a/src/main/java/ru/micord/ervu/av/service/FileManager.java b/src/main/java/ru/micord/ervu/av/service/FileManager.java new file mode 100644 index 0000000..4b52afe --- /dev/null +++ b/src/main/java/ru/micord/ervu/av/service/FileManager.java @@ -0,0 +1,86 @@ +package ru.micord.ervu.av.service; + +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.nio.file.Path; + +import org.apache.http.HttpEntity; +import org.apache.http.client.methods.CloseableHttpResponse; +import org.apache.http.client.methods.HttpDelete; +import org.apache.http.client.methods.HttpGet; +import org.apache.http.conn.HttpHostConnectException; +import org.apache.http.impl.client.CloseableHttpClient; +import org.apache.http.impl.client.HttpClients; +import org.springframework.http.HttpStatus; +import org.springframework.retry.annotation.Backoff; +import org.springframework.retry.annotation.Retryable; +import org.springframework.stereotype.Component; +import ru.micord.ervu.av.exception.FileUploadException; +import ru.micord.ervu.av.exception.InvalidHttpFileUrlException; +import ru.micord.ervu.av.exception.RetryableException; + +/** + * @author gulnaz + */ +@Component +public class FileManager { + + @Retryable(retryFor = {InvalidHttpFileUrlException.class}, + maxAttemptsExpression = "${av.retry.max.attempts.count}", + backoff = @Backoff(delayExpression = "${av.retry.delay.milliseconds}")) + public void downloadFile(String fileUrl, Path filePath) + throws InvalidHttpFileUrlException, FileUploadException { + File file = filePath.toFile(); + HttpGet request = new HttpGet(fileUrl); + + try (CloseableHttpClient client = HttpClients.createDefault(); + CloseableHttpResponse response = client.execute(request)) { + + int statusCode = response.getStatusLine().getStatusCode(); + if (statusCode == HttpStatus.OK.value()) { + HttpEntity entity = response.getEntity(); + if (entity != null) { + try (FileOutputStream outputStream = new FileOutputStream(file)) { + entity.writeTo(outputStream); + } + } + } + else { + // в хранилище не обнаружено файла; сообщение некорректно + String message = "http status code " + statusCode + " : " + fileUrl; + throw new InvalidHttpFileUrlException(message); + } + } + catch (HttpHostConnectException e) { + throw new FileUploadException(e); + } + catch (IOException e) { + // хранилище недоступно; сообщение некорректно + String message = + (e.getMessage() == null ? e.getCause().getMessage() : e.getMessage()) + " : " + fileUrl; + throw new InvalidHttpFileUrlException(message, e); + } + } + + @Retryable(retryFor = {RetryableException.class}, + maxAttemptsExpression = "${av.retry.max.attempts.count}", + backoff = @Backoff(delayExpression = "${av.retry.delay.milliseconds}")) + public void deleteFile(String fileUrl) throws FileUploadException { + try (CloseableHttpClient client = HttpClients.createDefault()) { + HttpDelete delete = new HttpDelete(fileUrl); + + try (CloseableHttpResponse response = client.execute(delete)) { + int statusCode = response.getStatusLine().getStatusCode(); + if (statusCode != HttpStatus.NO_CONTENT.value()) { + String message = "http status code " + statusCode + " : " + fileUrl; + throw new RetryableException(message); + } + } + } + catch (IOException e) { + // непредусмотренная ошибка доступа через http-клиент + throw new FileUploadException(e); + } + } +} diff --git a/src/main/java/ru/micord/ervu/av/service/FileUploadService.java b/src/main/java/ru/micord/ervu/av/service/FileUploadService.java index a4186a8..b4f44a7 100644 --- a/src/main/java/ru/micord/ervu/av/service/FileUploadService.java +++ b/src/main/java/ru/micord/ervu/av/service/FileUploadService.java @@ -1,7 +1,5 @@ package ru.micord.ervu.av.service; -import java.io.File; -import java.io.FileOutputStream; import java.io.IOException; import java.nio.charset.StandardCharsets; import java.nio.file.Files; @@ -12,13 +10,6 @@ import java.util.concurrent.CompletableFuture; import com.google.gson.Gson; import com.google.gson.GsonBuilder; -import org.apache.http.HttpEntity; -import org.apache.http.client.methods.CloseableHttpResponse; -import org.apache.http.client.methods.HttpDelete; -import org.apache.http.client.methods.HttpGet; -import org.apache.http.conn.HttpHostConnectException; -import org.apache.http.impl.client.CloseableHttpClient; -import org.apache.http.impl.client.HttpClients; import org.apache.kafka.clients.admin.NewTopic; import org.apache.kafka.clients.producer.ProducerRecord; import org.slf4j.Logger; @@ -26,7 +17,6 @@ import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.beans.factory.annotation.Value; -import org.springframework.http.HttpStatus; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.support.Acknowledgment; @@ -58,14 +48,18 @@ public class FileUploadService { private final NewTopic outErrorTopic; private final NewTopic outSuccessTopic; private final NewTopic inStatusTopic; + private final FileManager fIleManager; private final ReceiveScanReportRetryable receiveScanReportRetryable; private final S3Service s3Service; @Autowired - public FileUploadService(@Qualifier("outputKafkaTemplate") KafkaTemplate kafkaTemplate, + public FileUploadService( + @Qualifier("outputKafkaTemplate") KafkaTemplate kafkaTemplate, @Qualifier("inputKafkaTemplate") KafkaTemplate inKafkaTemplate, - NewTopic outErrorTopic, NewTopic outSuccessTopic, ReceiveScanReportRetryable receiveScanReportRetryable, - S3Service s3Service, NewTopic inStatusTopic) { + NewTopic outErrorTopic, NewTopic outSuccessTopic, + ReceiveScanReportRetryable receiveScanReportRetryable, + S3Service s3Service, NewTopic inStatusTopic, + FileManager fIleManager) { this.kafkaTemplate = kafkaTemplate; this.outErrorTopic = outErrorTopic; this.outSuccessTopic = outSuccessTopic; @@ -73,6 +67,7 @@ public class FileUploadService { this.s3Service = s3Service; this.inKafkaTemplate = inKafkaTemplate; this.inStatusTopic = inStatusTopic; + this.fIleManager = fIleManager; } @KafkaListener(id = "${spring.kafka.consumer.group.id}", topics = "${kafka.in.topic.name}", @@ -93,7 +88,7 @@ public class FileUploadService { logger.info("working in {}", System.getProperty("user.home")); Path filePath = Paths.get(fileSavingPath, fileUrl.fileName()); String downloadUrl = fileUrl.fileUrl(); - downloadFile(downloadUrl, filePath); + fIleManager.downloadFile(downloadUrl, filePath); boolean isAvError = false; boolean clean = true; boolean infected = false; @@ -130,7 +125,7 @@ public class FileUploadService { } sendMessage(inStatusTopic.name(), downloadRequest, messageId, inKafkaTemplate); - deleteFile(downloadUrl); + fIleManager.deleteFile(downloadUrl); try { Files.delete(filePath); @@ -144,10 +139,12 @@ public class FileUploadService { } catch (InvalidHttpFileUrlException e) { // считаем, что повторная обработка сообщения не нужна - // ошибку логируем, сообщаем об ошибке, помечаем прочтение сообщения + // ошибку логируем, отправляем сообщение с новым статусом, помечаем прочтение сообщения logger.error(e.getMessage() + ": " + kafkaInMessage); + downloadRequest.fileInfo().setFileUrl(null); + downloadRequest.fileInfo().setFileStatus(FileStatus.FILE_STATUS_11); + sendMessage(inStatusTopic.name(), downloadRequest, messageId, inKafkaTemplate); acknowledgment.acknowledge(); - throw new RuntimeException(kafkaInMessage, e); } catch (FileUploadException e) { // считаем, что нужно повторное считывание сообщения @@ -187,58 +184,6 @@ public class FileUploadService { } } - private void downloadFile(String fileUrl, Path filePath) - throws InvalidHttpFileUrlException, FileUploadException { - File file = filePath.toFile(); - HttpGet request = new HttpGet(fileUrl); - - try (CloseableHttpClient client = HttpClients.createDefault(); - CloseableHttpResponse response = client.execute(request)) { - - int statusCode = response.getStatusLine().getStatusCode(); - if (statusCode == HttpStatus.OK.value()) { - HttpEntity entity = response.getEntity(); - if (entity != null) { - try (FileOutputStream outputStream = new FileOutputStream(file)) { - entity.writeTo(outputStream); - } - } - } - else { - // в хранилище не обнаружено файла; сообщение некорректно - String message = "http status code " + statusCode + " : " + fileUrl; - throw new InvalidHttpFileUrlException(message); - } - } - catch (HttpHostConnectException e) { - throw new FileUploadException(e); - } - catch (IOException e) { - // хранилище недоступно; сообщение некорректно - String message = - (e.getMessage() == null ? e.getCause().getMessage() : e.getMessage()) + " : " + fileUrl; - throw new InvalidHttpFileUrlException(message, e); - } - } - - private void deleteFile(String fileUrl) throws FileUploadException { - try (CloseableHttpClient client = HttpClients.createDefault()) { - HttpDelete delete = new HttpDelete(fileUrl); - - try (CloseableHttpResponse response = client.execute(delete)) { - int statusCode = response.getStatusLine().getStatusCode(); - if (statusCode != HttpStatus.NO_CONTENT.value()) { - String message = "http status code " + statusCode + " : " + fileUrl; - throw new RuntimeException(message); - } - } - } - catch (IOException e) { - // непредусмотренная ошибка доступа через http-клиент - throw new FileUploadException(e); - } - } - private void sendMessage(@NonNull String topicName, Object object, String messageId, KafkaTemplate template) { ProducerRecord record = new ProducerRecord<>(topicName, diff --git a/src/main/java/ru/micord/ervu/av/service/ReceiveScanReportRetryable.java b/src/main/java/ru/micord/ervu/av/service/ReceiveScanReportRetryable.java index cc52a26..1b4fbc4 100644 --- a/src/main/java/ru/micord/ervu/av/service/ReceiveScanReportRetryable.java +++ b/src/main/java/ru/micord/ervu/av/service/ReceiveScanReportRetryable.java @@ -41,7 +41,7 @@ public class ReceiveScanReportRetryable { @Retryable(retryFor = {RetryableException.class}, maxAttemptsExpression = "${av.retry.max.attempts.count}", backoff = @Backoff(delayExpression = "${av.retry.delay.milliseconds}")) - public AvResponse checkFile(Path filePath) throws FileUploadException { + public AvResponse checkFile(Path filePath) throws RetryableException, FileUploadException { File file = filePath.toFile(); try (CloseableHttpClient client = HttpClients.createDefault()) { @@ -107,7 +107,7 @@ public class ReceiveScanReportRetryable { } private AvResponse receiveScanReport(CloseableHttpClient client, HttpGet get) - throws FileUploadException { + throws RetryableException, FileUploadException { try (CloseableHttpResponse getResponse = client.execute(get)) { int getStatusCode = getResponse.getStatusLine().getStatusCode();