SUPPORT-9322: Fix
This commit is contained in:
parent
4c3a992270
commit
53ff1458b4
2 changed files with 73 additions and 36 deletions
|
|
@ -8,7 +8,7 @@ import org.springframework.lang.NonNull;
|
|||
/**
|
||||
* @author r.latypov
|
||||
*/
|
||||
public record DownloadRequest(OrgInfo orgInfo, @NonNull FileInfo fileInfo) {
|
||||
public record DownloadRequest(OrgInfo orgInfo, @NonNull FileInfo[] filesInfo) {
|
||||
|
||||
@Getter
|
||||
@AllArgsConstructor
|
||||
|
|
|
|||
|
|
@ -5,6 +5,9 @@ import java.nio.charset.StandardCharsets;
|
|||
import java.nio.file.Files;
|
||||
import java.nio.file.Path;
|
||||
import java.nio.file.Paths;
|
||||
import java.util.Arrays;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
|
||||
import com.google.gson.Gson;
|
||||
|
|
@ -76,72 +79,99 @@ public class FileUploadService {
|
|||
@Header("messageId") String messageId) {
|
||||
DownloadRequest downloadRequest = new Gson().fromJson(kafkaInMessage, DownloadRequest.class);
|
||||
|
||||
Map<Path, DownloadRequest.FileInfo> tempFilesMap = new HashMap<>();
|
||||
|
||||
try {
|
||||
FileUrl fileUrl = parseFileUrl(downloadRequest.fileInfo().getFileUrl());
|
||||
Files.createDirectories(Paths.get(fileSavingPath));
|
||||
LOGGER.info("working in {}", System.getProperty("user.home"));
|
||||
Path filePath = Paths.get(fileSavingPath, fileUrl.fileName());
|
||||
String downloadUrl = fileUrl.fileUrl();
|
||||
fIleManager.downloadFile(downloadUrl, filePath);
|
||||
boolean isAvError = false;
|
||||
int exitCode = 0;
|
||||
boolean hasError = false;
|
||||
|
||||
if (avCheckEnabled) {
|
||||
try {
|
||||
exitCode = receiveScanReportRetryable.checkFile(filePath);
|
||||
for (DownloadRequest.FileInfo fileInfo : downloadRequest.filesInfo()) {
|
||||
FileUrl fileUrl = parseFileUrl(fileInfo.getFileUrl());
|
||||
Files.createDirectories(Paths.get(fileSavingPath));
|
||||
LOGGER.info("working in {}", System.getProperty("user.home"));
|
||||
Path filePath = Paths.get(fileSavingPath, fileUrl.fileName());
|
||||
String downloadUrl = fileUrl.fileUrl();
|
||||
fIleManager.downloadFile(downloadUrl, filePath);
|
||||
tempFilesMap.put(filePath, fileInfo);
|
||||
|
||||
if (avCheckEnabled) {
|
||||
try {
|
||||
exitCode = receiveScanReportRetryable.checkFile(filePath);
|
||||
}
|
||||
catch (FileUploadException | RetryableException e) {
|
||||
LOGGER.error(e.getMessage(), e);
|
||||
isAvError = true;
|
||||
}
|
||||
}
|
||||
catch (FileUploadException | RetryableException e) {
|
||||
LOGGER.error(e.getMessage(), e);
|
||||
isAvError = true;
|
||||
|
||||
if (isAvError || exitCode == INFECTED_CODE || exitCode == PASS_PROTECTED_CODE) {
|
||||
hasError = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if (isAvError || exitCode == INFECTED_CODE || exitCode == PASS_PROTECTED_CODE) {
|
||||
downloadRequest.fileInfo().setFileUrl(null);
|
||||
if (hasError) {
|
||||
FileStatus fileStatus = (exitCode == PASS_PROTECTED_CODE || isAvError)
|
||||
? FileStatus.FILE_STATUS_11
|
||||
: FileStatus.FILE_STATUS_02;
|
||||
downloadRequest.fileInfo().setFileStatus(fileStatus);
|
||||
|
||||
for (DownloadRequest.FileInfo fileInfo : downloadRequest.filesInfo()) {
|
||||
fileInfo.setFileUrl(null);
|
||||
fileInfo.setFileStatus(fileStatus);
|
||||
FileUrl fileUrl = parseFileUrl(fileInfo.getFileUrl());
|
||||
fIleManager.deleteFile(fileUrl.fileUrl());
|
||||
}
|
||||
if (!isAvError && exitCode == INFECTED_CODE) {
|
||||
sendMessage(outErrorTopic.name(), downloadRequest, messageId, kafkaTemplate);
|
||||
}
|
||||
}
|
||||
else {
|
||||
String fileRef = s3Service.putFile(filePath, fileUrl.fileName());
|
||||
|
||||
downloadRequest.fileInfo().setFileUrl(fileRef);
|
||||
downloadRequest.fileInfo().setFileStatus(FileStatus.FILE_STATUS_03);
|
||||
for (Map.Entry<Path, DownloadRequest.FileInfo> entry : tempFilesMap.entrySet()) {
|
||||
Path filePath = entry.getKey();
|
||||
DownloadRequest.FileInfo fileInfo = entry.getValue();
|
||||
String fileRef = s3Service.putFile(filePath, filePath.getFileName().toString());
|
||||
fileInfo.setFileUrl(fileRef);
|
||||
fileInfo.setFileStatus(FileStatus.FILE_STATUS_03);
|
||||
FileUrl fileUrl = parseFileUrl(fileInfo.getFileUrl());
|
||||
fIleManager.deleteFile(fileUrl.fileUrl());
|
||||
}
|
||||
sendMessage(outSuccessTopic.name(), downloadRequest, messageId, kafkaTemplate);
|
||||
}
|
||||
sendMessage(inStatusTopic.name(), downloadRequest, messageId, inKafkaTemplate);
|
||||
DownloadRequest.FileInfo csvFile = getCsvFile(downloadRequest);
|
||||
DownloadRequest statusRequest = new DownloadRequest(downloadRequest.orgInfo(),
|
||||
new DownloadRequest.FileInfo[] {csvFile}
|
||||
);
|
||||
sendMessage(inStatusTopic.name(), statusRequest, messageId, inKafkaTemplate);
|
||||
|
||||
fIleManager.deleteFile(downloadUrl);
|
||||
|
||||
try {
|
||||
Files.delete(filePath);
|
||||
}
|
||||
catch (IOException e) {
|
||||
LOGGER.error("Failed to delete file " + filePath, e);
|
||||
}
|
||||
finally {
|
||||
acknowledgment.acknowledge();
|
||||
for (Path filePath : tempFilesMap.keySet()) {
|
||||
try {
|
||||
Files.deleteIfExists(filePath);
|
||||
}
|
||||
catch (IOException e) {
|
||||
LOGGER.error("Failed to delete file " + filePath, e);
|
||||
}
|
||||
}
|
||||
}
|
||||
catch (InvalidHttpFileUrlException e) {
|
||||
// считаем, что повторная обработка сообщения не нужна
|
||||
// ошибку логируем, отправляем сообщение с новым статусом, помечаем прочтение сообщения
|
||||
LOGGER.error(e.getMessage() + ": " + kafkaInMessage);
|
||||
downloadRequest.fileInfo().setFileUrl(null);
|
||||
downloadRequest.fileInfo().setFileStatus(FileStatus.FILE_STATUS_11);
|
||||
sendMessage(inStatusTopic.name(), downloadRequest, messageId, inKafkaTemplate);
|
||||
acknowledgment.acknowledge();
|
||||
DownloadRequest.FileInfo csvFile = getCsvFile(downloadRequest);
|
||||
csvFile.setFileStatus(FileStatus.FILE_STATUS_11);
|
||||
csvFile.setFileUrl(null);
|
||||
DownloadRequest statusRequest = new DownloadRequest(downloadRequest.orgInfo(),
|
||||
new DownloadRequest.FileInfo[] {csvFile}
|
||||
);
|
||||
sendMessage(inStatusTopic.name(), statusRequest, messageId, inKafkaTemplate);
|
||||
}
|
||||
catch (FileUploadException | IOException e) {
|
||||
// считаем, что нужно повторное считывание сообщения
|
||||
// ошибку логируем, сообщение оставляем непрочитанным
|
||||
LOGGER.error(e.getMessage(), e);
|
||||
}
|
||||
finally {
|
||||
acknowledgment.acknowledge();
|
||||
}
|
||||
}
|
||||
|
||||
@KafkaListener(id = "${spring.kafka.out.consumer.group.id}",
|
||||
|
|
@ -194,4 +224,11 @@ public class FileUploadService {
|
|||
|
||||
private record FileUrl(String fileName, String fileUrl) {
|
||||
}
|
||||
|
||||
private DownloadRequest.FileInfo getCsvFile(DownloadRequest downloadRequest) {
|
||||
return Arrays.stream(downloadRequest.filesInfo())
|
||||
.filter(fi -> fi.getFileName() != null && fi.getFileName().endsWith(".csv"))
|
||||
.findFirst()
|
||||
.orElse(null);
|
||||
}
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue