diff --git a/src/main/java/ru/micord/ervu/av/kafka/dto/DownloadRequest.java b/src/main/java/ru/micord/ervu/av/kafka/dto/DownloadRequest.java index 822fcf9..886297d 100644 --- a/src/main/java/ru/micord/ervu/av/kafka/dto/DownloadRequest.java +++ b/src/main/java/ru/micord/ervu/av/kafka/dto/DownloadRequest.java @@ -8,7 +8,7 @@ import org.springframework.lang.NonNull; /** * @author r.latypov */ -public record DownloadRequest(OrgInfo orgInfo, @NonNull FileInfo fileInfo) { +public record DownloadRequest(OrgInfo orgInfo, @NonNull FileInfo[] filesInfo) { @Getter @AllArgsConstructor diff --git a/src/main/java/ru/micord/ervu/av/service/FileUploadService.java b/src/main/java/ru/micord/ervu/av/service/FileUploadService.java index 897f56c..850c331 100644 --- a/src/main/java/ru/micord/ervu/av/service/FileUploadService.java +++ b/src/main/java/ru/micord/ervu/av/service/FileUploadService.java @@ -5,6 +5,9 @@ import java.nio.charset.StandardCharsets; import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.Paths; +import java.util.Arrays; +import java.util.HashMap; +import java.util.Map; import java.util.concurrent.CompletableFuture; import com.google.gson.Gson; @@ -76,72 +79,99 @@ public class FileUploadService { @Header("messageId") String messageId) { DownloadRequest downloadRequest = new Gson().fromJson(kafkaInMessage, DownloadRequest.class); + Map tempFilesMap = new HashMap<>(); + try { - FileUrl fileUrl = parseFileUrl(downloadRequest.fileInfo().getFileUrl()); - Files.createDirectories(Paths.get(fileSavingPath)); - LOGGER.info("working in {}", System.getProperty("user.home")); - Path filePath = Paths.get(fileSavingPath, fileUrl.fileName()); - String downloadUrl = fileUrl.fileUrl(); - fIleManager.downloadFile(downloadUrl, filePath); boolean isAvError = false; int exitCode = 0; + boolean hasError = false; - if (avCheckEnabled) { - try { - exitCode = receiveScanReportRetryable.checkFile(filePath); + for (DownloadRequest.FileInfo fileInfo : downloadRequest.filesInfo()) { + FileUrl fileUrl = parseFileUrl(fileInfo.getFileUrl()); + Files.createDirectories(Paths.get(fileSavingPath)); + LOGGER.info("working in {}", System.getProperty("user.home")); + Path filePath = Paths.get(fileSavingPath, fileUrl.fileName()); + String downloadUrl = fileUrl.fileUrl(); + fIleManager.downloadFile(downloadUrl, filePath); + tempFilesMap.put(filePath, fileInfo); + + if (avCheckEnabled) { + try { + exitCode = receiveScanReportRetryable.checkFile(filePath); + } + catch (FileUploadException | RetryableException e) { + LOGGER.error(e.getMessage(), e); + isAvError = true; + } } - catch (FileUploadException | RetryableException e) { - LOGGER.error(e.getMessage(), e); - isAvError = true; + + if (isAvError || exitCode == INFECTED_CODE || exitCode == PASS_PROTECTED_CODE) { + hasError = true; + break; } } - if (isAvError || exitCode == INFECTED_CODE || exitCode == PASS_PROTECTED_CODE) { - downloadRequest.fileInfo().setFileUrl(null); + if (hasError) { FileStatus fileStatus = (exitCode == PASS_PROTECTED_CODE || isAvError) ? FileStatus.FILE_STATUS_11 : FileStatus.FILE_STATUS_02; - downloadRequest.fileInfo().setFileStatus(fileStatus); - + for (DownloadRequest.FileInfo fileInfo : downloadRequest.filesInfo()) { + fileInfo.setFileUrl(null); + fileInfo.setFileStatus(fileStatus); + FileUrl fileUrl = parseFileUrl(fileInfo.getFileUrl()); + fIleManager.deleteFile(fileUrl.fileUrl()); + } if (!isAvError && exitCode == INFECTED_CODE) { sendMessage(outErrorTopic.name(), downloadRequest, messageId, kafkaTemplate); } } else { - String fileRef = s3Service.putFile(filePath, fileUrl.fileName()); - - downloadRequest.fileInfo().setFileUrl(fileRef); - downloadRequest.fileInfo().setFileStatus(FileStatus.FILE_STATUS_03); + for (Map.Entry entry : tempFilesMap.entrySet()) { + Path filePath = entry.getKey(); + DownloadRequest.FileInfo fileInfo = entry.getValue(); + String fileRef = s3Service.putFile(filePath, filePath.getFileName().toString()); + fileInfo.setFileUrl(fileRef); + fileInfo.setFileStatus(FileStatus.FILE_STATUS_03); + FileUrl fileUrl = parseFileUrl(fileInfo.getFileUrl()); + fIleManager.deleteFile(fileUrl.fileUrl()); + } sendMessage(outSuccessTopic.name(), downloadRequest, messageId, kafkaTemplate); } - sendMessage(inStatusTopic.name(), downloadRequest, messageId, inKafkaTemplate); + DownloadRequest.FileInfo csvFile = getCsvFile(downloadRequest); + DownloadRequest statusRequest = new DownloadRequest(downloadRequest.orgInfo(), + new DownloadRequest.FileInfo[] {csvFile} + ); + sendMessage(inStatusTopic.name(), statusRequest, messageId, inKafkaTemplate); - fIleManager.deleteFile(downloadUrl); - - try { - Files.delete(filePath); - } - catch (IOException e) { - LOGGER.error("Failed to delete file " + filePath, e); - } - finally { - acknowledgment.acknowledge(); + for (Path filePath : tempFilesMap.keySet()) { + try { + Files.deleteIfExists(filePath); + } + catch (IOException e) { + LOGGER.error("Failed to delete file " + filePath, e); + } } } catch (InvalidHttpFileUrlException e) { // считаем, что повторная обработка сообщения не нужна // ошибку логируем, отправляем сообщение с новым статусом, помечаем прочтение сообщения LOGGER.error(e.getMessage() + ": " + kafkaInMessage); - downloadRequest.fileInfo().setFileUrl(null); - downloadRequest.fileInfo().setFileStatus(FileStatus.FILE_STATUS_11); - sendMessage(inStatusTopic.name(), downloadRequest, messageId, inKafkaTemplate); - acknowledgment.acknowledge(); + DownloadRequest.FileInfo csvFile = getCsvFile(downloadRequest); + csvFile.setFileStatus(FileStatus.FILE_STATUS_11); + csvFile.setFileUrl(null); + DownloadRequest statusRequest = new DownloadRequest(downloadRequest.orgInfo(), + new DownloadRequest.FileInfo[] {csvFile} + ); + sendMessage(inStatusTopic.name(), statusRequest, messageId, inKafkaTemplate); } catch (FileUploadException | IOException e) { // считаем, что нужно повторное считывание сообщения // ошибку логируем, сообщение оставляем непрочитанным LOGGER.error(e.getMessage(), e); } + finally { + acknowledgment.acknowledge(); + } } @KafkaListener(id = "${spring.kafka.out.consumer.group.id}", @@ -194,4 +224,11 @@ public class FileUploadService { private record FileUrl(String fileName, String fileUrl) { } + + private DownloadRequest.FileInfo getCsvFile(DownloadRequest downloadRequest) { + return Arrays.stream(downloadRequest.filesInfo()) + .filter(fi -> fi.getFileName() != null && fi.getFileName().endsWith(".csv")) + .findFirst() + .orElse(null); + } }