diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..d99b869 --- /dev/null +++ b/.gitignore @@ -0,0 +1,22 @@ +#ignore target and tmp dir +target*/ +tmp*/ +debug.log + +# +# IntelliJ IDEA project files +# +.idea*/ +.classes*/ +*.ipr +*.iml +*.iws +*.ids + +# os meta files +Thumbs.db +.DS_Store + +*.orig +pom.xml.versionsBackup +.flattened-pom.xml diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 0000000..0aaea2c --- /dev/null +++ b/Dockerfile @@ -0,0 +1,4 @@ +FROM bellsoft/liberica-openjdk-alpine:17-cds +COPY target/*.jar app.jar + +CMD ["java", "-jar", "app.jar"] \ No newline at end of file diff --git a/docker-compose.yaml b/docker-compose.yaml index e2a2b70..20e91ad 100644 --- a/docker-compose.yaml +++ b/docker-compose.yaml @@ -8,19 +8,6 @@ services: networks: - lkrp_av - minio: - image: minio/minio - restart: always - env_file: - - test.env - volumes: - - minio_data:/data - ports: - - '${MINIO_HOST_PORT}:9000' - command: server /data - networks: - - lkrp_av - networks: lkrp_av: diff --git a/pom.xml b/pom.xml new file mode 100644 index 0000000..2050e66 --- /dev/null +++ b/pom.xml @@ -0,0 +1,141 @@ + + + 4.0.0 + + + org.springframework.boot + spring-boot-starter-parent + 3.3.0 + + + + ru.micord.ervu.lkrp + file-upload + 1.0.0-SNAPSHOT + + + + + com.amazonaws + aws-java-sdk-bom + 1.12.759 + pom + import + + + + com.google.code.gson + gson + 2.11.0 + + + + org.apache.httpcomponents + httpclient + 4.5.14 + + + org.apache.httpcomponents + httpmime + 4.5.14 + + + + org.postgresql + postgresql + 42.7.3 + + + + org.projectlombok + lombok + 1.18.34 + provided + + + + + + + com.amazonaws + aws-java-sdk-s3 + + + + com.google.code.gson + gson + + + + org.apache.httpcomponents + httpclient + + + org.apache.httpcomponents + httpmime + + + + org.postgresql + postgresql + + + + org.projectlombok + lombok + + + + org.springframework + spring-aspects + + + org.springframework + spring-web + + + + org.springframework.boot + spring-boot-starter + + + org.springframework.boot + spring-boot-starter-web + + + org.springframework.boot + spring-boot-starter-jooq + + + org.springframework.boot + spring-boot-starter-actuator + + + + org.springframework.kafka + spring-kafka + + + io.github.leofuso + actuator-kafka + v3.0.2.0.RELEASE + + + + org.springframework.retry + spring-retry + + + + + + + org.springframework.boot + spring-boot-maven-plugin + + + + + diff --git a/src/main/java/ru/micord/ervu/av/FileUploadApplication.java b/src/main/java/ru/micord/ervu/av/FileUploadApplication.java new file mode 100644 index 0000000..e1e7979 --- /dev/null +++ b/src/main/java/ru/micord/ervu/av/FileUploadApplication.java @@ -0,0 +1,19 @@ +package ru.micord.ervu.av; + +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.retry.annotation.EnableRetry; +import org.springframework.transaction.annotation.EnableTransactionManagement; + +/** + * @author r.latypov + */ +@EnableRetry +@EnableTransactionManagement +@SpringBootApplication +public class FileUploadApplication { + + public static void main(String[] args) { + SpringApplication.run(FileUploadApplication.class, args); + } +} diff --git a/src/main/java/ru/micord/ervu/av/exception/FileUploadException.java b/src/main/java/ru/micord/ervu/av/exception/FileUploadException.java new file mode 100644 index 0000000..5c2b06a --- /dev/null +++ b/src/main/java/ru/micord/ervu/av/exception/FileUploadException.java @@ -0,0 +1,18 @@ +package ru.micord.ervu.av.exception; + +/** + * @author r.latypov + */ +public class FileUploadException extends Exception { + public FileUploadException(String message) { + super(message); + } + + public FileUploadException(Throwable cause) { + super(cause); + } + + public FileUploadException(String message, Throwable cause) { + super(message, cause); + } +} diff --git a/src/main/java/ru/micord/ervu/av/exception/InvalidHttpFileUrlException.java b/src/main/java/ru/micord/ervu/av/exception/InvalidHttpFileUrlException.java new file mode 100644 index 0000000..aaa4a81 --- /dev/null +++ b/src/main/java/ru/micord/ervu/av/exception/InvalidHttpFileUrlException.java @@ -0,0 +1,16 @@ +package ru.micord.ervu.av.exception; + +/** + * @author r.latypov + */ +public class InvalidHttpFileUrlException extends Exception { + private static final String MESSAGE = "file url is not valid"; + + public InvalidHttpFileUrlException(String message) { + super(String.join(" : ", MESSAGE, message)); + } + + public InvalidHttpFileUrlException(String message, Throwable cause) { + super(message, cause); + } +} diff --git a/src/main/java/ru/micord/ervu/av/exception/RetryableException.java b/src/main/java/ru/micord/ervu/av/exception/RetryableException.java new file mode 100644 index 0000000..3cc7f82 --- /dev/null +++ b/src/main/java/ru/micord/ervu/av/exception/RetryableException.java @@ -0,0 +1,7 @@ +package ru.micord.ervu.av.exception; + +/** + * @author r.latypov + */ +public class RetryableException extends RuntimeException { +} diff --git a/src/main/java/ru/micord/ervu/av/kafka/config/input/InputKafkaConsumerConfig.java b/src/main/java/ru/micord/ervu/av/kafka/config/input/InputKafkaConsumerConfig.java new file mode 100644 index 0000000..7c8f597 --- /dev/null +++ b/src/main/java/ru/micord/ervu/av/kafka/config/input/InputKafkaConsumerConfig.java @@ -0,0 +1,64 @@ +package ru.micord.ervu.av.kafka.config.input; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.common.config.SaslConfigs; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.kafka.annotation.EnableKafka; +import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; +import org.springframework.kafka.core.ConsumerFactory; +import org.springframework.kafka.core.DefaultKafkaConsumerFactory; +import org.springframework.kafka.listener.ContainerProperties; + +/** + * @author r.latypov + */ +@Configuration +@EnableKafka +public class InputKafkaConsumerConfig { + @Value("${spring.kafka.consumer.bootstrap.servers}") + private List bootstrapAddress; + @Value("${spring.kafka.consumer.security.protocol}") + private String securityProtocol; + @Value("${spring.kafka.consumer.properties.sasl.jaas.config}") + private String jaasConfig; + @Value("${spring.kafka.consumer.properties.sasl.mechanism}") + private String saslMechanism; + @Value("${spring.kafka.consumer.enable.auto.commit}") + private String enableAutoCommit; + @Value("${spring.kafka.listener.ack.mode}") + private String ackMode; + + @Bean + public ConsumerFactory inputConsumerFactory() { + Map configs = new HashMap<>(); + + configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress); + configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); + configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); + + configs.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, securityProtocol); + configs.put(SaslConfigs.SASL_JAAS_CONFIG, jaasConfig); + configs.put(SaslConfigs.SASL_MECHANISM, saslMechanism); + + configs.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit); + + return new DefaultKafkaConsumerFactory<>(configs); + } + + @Bean + public ConcurrentKafkaListenerContainerFactory inputKafkaListenerContainerFactory() { + ConcurrentKafkaListenerContainerFactory factory = + new ConcurrentKafkaListenerContainerFactory<>(); + factory.setConsumerFactory(inputConsumerFactory()); + factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE); + return factory; + } +} diff --git a/src/main/java/ru/micord/ervu/av/kafka/config/output/OutputKafkaConsumerConfig.java b/src/main/java/ru/micord/ervu/av/kafka/config/output/OutputKafkaConsumerConfig.java new file mode 100644 index 0000000..bc5ac81 --- /dev/null +++ b/src/main/java/ru/micord/ervu/av/kafka/config/output/OutputKafkaConsumerConfig.java @@ -0,0 +1,64 @@ +package ru.micord.ervu.av.kafka.config.output; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.common.config.SaslConfigs; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.kafka.annotation.EnableKafka; +import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; +import org.springframework.kafka.core.ConsumerFactory; +import org.springframework.kafka.core.DefaultKafkaConsumerFactory; +import org.springframework.kafka.listener.ContainerProperties; + +/** + * @author r.latypov + */ +@Configuration +@EnableKafka +public class OutputKafkaConsumerConfig { + @Value("${spring.kafka.out.consumer.bootstrap.servers}") + private List bootstrapAddress; + @Value("${spring.kafka.out.consumer.security.protocol}") + private String securityProtocol; + @Value("${spring.kafka.out.consumer.properties.sasl.jaas.config}") + private String jaasConfig; + @Value("${spring.kafka.out.consumer.properties.sasl.mechanism}") + private String saslMechanism; + @Value("${spring.kafka.out.consumer.enable.auto.commit}") + private String enableAutoCommit; + @Value("${spring.kafka.out.listener.ack.mode}") + private String ackMode; + + @Bean + public ConsumerFactory outputConsumerFactory() { + Map configs = new HashMap<>(); + + configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress); + configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); + configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); + + configs.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, securityProtocol); + configs.put(SaslConfigs.SASL_JAAS_CONFIG, jaasConfig); + configs.put(SaslConfigs.SASL_MECHANISM, saslMechanism); + + configs.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit); + + return new DefaultKafkaConsumerFactory<>(configs); + } + + @Bean + public ConcurrentKafkaListenerContainerFactory outputKafkaListenerContainerFactory() { + ConcurrentKafkaListenerContainerFactory factory = + new ConcurrentKafkaListenerContainerFactory<>(); + factory.setConsumerFactory(outputConsumerFactory()); + factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE); + return factory; + } +} diff --git a/src/main/java/ru/micord/ervu/av/kafka/config/output/OutputKafkaProducerConfig.java b/src/main/java/ru/micord/ervu/av/kafka/config/output/OutputKafkaProducerConfig.java new file mode 100644 index 0000000..2a14b10 --- /dev/null +++ b/src/main/java/ru/micord/ervu/av/kafka/config/output/OutputKafkaProducerConfig.java @@ -0,0 +1,51 @@ +package ru.micord.ervu.av.kafka.config.output; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.common.config.SaslConfigs; +import org.apache.kafka.common.serialization.StringSerializer; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.kafka.core.DefaultKafkaProducerFactory; +import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.kafka.core.ProducerFactory; + +/** + * @author r.latypov + */ +@Configuration +public class OutputKafkaProducerConfig { + @Value("${spring.kafka.producer.bootstrap.servers}") + private List bootstrapAddress; + @Value("${spring.kafka.producer.security.protocol}") + private String securityProtocol; + @Value("${spring.kafka.producer.properties.sasl.jaas.config}") + private String jaasConfig; + @Value("${spring.kafka.producer.properties.sasl.mechanism}") + private String saslMechanism; + + @Bean + public ProducerFactory producerFactory() { + Map configs = new HashMap<>(); + + configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress); + configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); + configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); + + configs.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, securityProtocol); + configs.put(SaslConfigs.SASL_JAAS_CONFIG, jaasConfig); + configs.put(SaslConfigs.SASL_MECHANISM, saslMechanism); + + return new DefaultKafkaProducerFactory<>(configs); + } + + @Bean + public KafkaTemplate kafkaTemplate() { + return new KafkaTemplate<>(producerFactory()); + } +} diff --git a/src/main/java/ru/micord/ervu/av/kafka/config/output/OutputKafkaTopicConfig.java b/src/main/java/ru/micord/ervu/av/kafka/config/output/OutputKafkaTopicConfig.java new file mode 100644 index 0000000..79f1ef8 --- /dev/null +++ b/src/main/java/ru/micord/ervu/av/kafka/config/output/OutputKafkaTopicConfig.java @@ -0,0 +1,28 @@ +package ru.micord.ervu.av.kafka.config.output; + +import org.apache.kafka.clients.admin.NewTopic; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.kafka.config.TopicBuilder; + +/** + * @author r.latypov + */ +@Configuration +public class OutputKafkaTopicConfig { + @Value("${kafka.out.error.topic.name}") + private String kafkaOutErrorTopicName; + @Value("${kafka.out.success.topic.name}") + private String kafkaOutSuccessTopicName; + + @Bean + public NewTopic outErrorTopic() { + return TopicBuilder.name(kafkaOutErrorTopicName).build(); + } + + @Bean + public NewTopic outSuccessTopic() { + return TopicBuilder.name(kafkaOutSuccessTopicName).build(); + } +} diff --git a/src/main/java/ru/micord/ervu/av/kafka/dto/DownloadRequest.java b/src/main/java/ru/micord/ervu/av/kafka/dto/DownloadRequest.java new file mode 100644 index 0000000..eb49660 --- /dev/null +++ b/src/main/java/ru/micord/ervu/av/kafka/dto/DownloadRequest.java @@ -0,0 +1,27 @@ +package ru.micord.ervu.av.kafka.dto; + +import lombok.AllArgsConstructor; +import lombok.Getter; +import lombok.Setter; +import org.springframework.lang.NonNull; + +/** + * @author r.latypov + */ +public record DownloadRequest(OrgInfo orgInfo, @NonNull FileInfo fileInfo) { + + @Getter + @AllArgsConstructor + public static class FileInfo { + @Setter + private String fileUrl; + private final String fileId; + private final String fileName; + private final String filePatternCode; + private final String FilePatternName; + private final String departureDateTime; + private final String timeZone; + @Setter + private FileStatus fileStatus; + } +} diff --git a/src/main/java/ru/micord/ervu/av/kafka/dto/DownloadResponse.java b/src/main/java/ru/micord/ervu/av/kafka/dto/DownloadResponse.java new file mode 100644 index 0000000..56d7b7e --- /dev/null +++ b/src/main/java/ru/micord/ervu/av/kafka/dto/DownloadResponse.java @@ -0,0 +1,11 @@ +package ru.micord.ervu.av.kafka.dto; + +import org.springframework.lang.NonNull; + +/** + * @author r.latypov + */ +public record DownloadResponse(OrgInfo orgInfo, @NonNull FileInfo fileInfo) { + public record FileInfo(String fileId, FileStatus fileStatus) { + } +} diff --git a/src/main/java/ru/micord/ervu/av/kafka/dto/FileStatus.java b/src/main/java/ru/micord/ervu/av/kafka/dto/FileStatus.java new file mode 100644 index 0000000..95729f0 --- /dev/null +++ b/src/main/java/ru/micord/ervu/av/kafka/dto/FileStatus.java @@ -0,0 +1,19 @@ +package ru.micord.ervu.av.kafka.dto; + +/** + * @author r.latypov + */ +public record FileStatus(String code, String status, String description) { + public static final FileStatus FILE_STATUS_01 = new FileStatus("01", "Загрузка", + "Файл принят до проверки на вирусы" + ); + public static final FileStatus FILE_STATUS_02 = new FileStatus("02", "Проверка не пройдена", + "Проверка на вирусы не пройдена" + ); + public static final FileStatus FILE_STATUS_03 = new FileStatus("03", "Направлено в ЕРВУ", + "Проверка на вирусы пройдена успешно, файл направлен в очередь" + ); + public static final FileStatus FILE_STATUS_04 = new FileStatus("04", "Получен ЕРВУ", + "Файл был принят в обработку" + ); +} diff --git a/src/main/java/ru/micord/ervu/av/kafka/dto/OrgInfo.java b/src/main/java/ru/micord/ervu/av/kafka/dto/OrgInfo.java new file mode 100644 index 0000000..3b50b59 --- /dev/null +++ b/src/main/java/ru/micord/ervu/av/kafka/dto/OrgInfo.java @@ -0,0 +1,7 @@ +package ru.micord.ervu.av.kafka.dto; + +/** + * @author r.latypov + */ +public record OrgInfo(String orgId, String orgName, SenderInfo senderInfo) { +} diff --git a/src/main/java/ru/micord/ervu/av/kafka/dto/SenderInfo.java b/src/main/java/ru/micord/ervu/av/kafka/dto/SenderInfo.java new file mode 100644 index 0000000..af8f0b0 --- /dev/null +++ b/src/main/java/ru/micord/ervu/av/kafka/dto/SenderInfo.java @@ -0,0 +1,7 @@ +package ru.micord.ervu.av.kafka.dto; + +/** + * @author Eduard Tihomirov + */ +public record SenderInfo(String prnOid, String lastName, String firstName, String middleName) { +} diff --git a/src/main/java/ru/micord/ervu/av/response/AvFileSendResponse.java b/src/main/java/ru/micord/ervu/av/response/AvFileSendResponse.java new file mode 100644 index 0000000..ea8dd00 --- /dev/null +++ b/src/main/java/ru/micord/ervu/av/response/AvFileSendResponse.java @@ -0,0 +1,11 @@ +package ru.micord.ervu.av.response; + +/** + * @author r.latypov + */ +public record AvFileSendResponse(String id, String location, Error error, String status) { + public static final String STATUS_ERROR = "error"; + + public record Error(String code, String message) { + } +} diff --git a/src/main/java/ru/micord/ervu/av/response/AvResponse.java b/src/main/java/ru/micord/ervu/av/response/AvResponse.java new file mode 100644 index 0000000..48e71ae --- /dev/null +++ b/src/main/java/ru/micord/ervu/av/response/AvResponse.java @@ -0,0 +1,17 @@ +package ru.micord.ervu.av.response; + +import java.util.Map; + +/** + * @author r.latypov + */ +public record AvResponse(String completed, String created, Integer progress, + Map scan_result, String status, String[] verdicts) { + public record Scan(String started, String stopped, Threat[] threats, String verdict) { + public static final String VERDICT_CLEAN = "clean"; + public static final String VERDICT_INFECTED = "infected"; + + public record Threat(String name, String object) { + } + } +} diff --git a/src/main/java/ru/micord/ervu/av/s3/S3Connection.java b/src/main/java/ru/micord/ervu/av/s3/S3Connection.java new file mode 100644 index 0000000..fd6813e --- /dev/null +++ b/src/main/java/ru/micord/ervu/av/s3/S3Connection.java @@ -0,0 +1,52 @@ +package ru.micord.ervu.av.s3; + +import com.amazonaws.auth.AWSCredentials; +import com.amazonaws.auth.AWSStaticCredentialsProvider; +import com.amazonaws.auth.BasicAWSCredentials; +import com.amazonaws.client.builder.AwsClientBuilder; +import com.amazonaws.regions.Region; +import com.amazonaws.regions.Regions; +import com.amazonaws.services.s3.AmazonS3; +import com.amazonaws.services.s3.AmazonS3ClientBuilder; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +/** + * @author r.latypov + */ +@Configuration +public class S3Connection { + @Value("${s3.out.endpoint}") + private String endpointOut; + @Value("${s3.out.port:9000}") + private int portOut; + @Value("${s3.out.access_key}") + private String accessKeyOut; + @Value("${s3.out.secret_key}") + private String secretKeyOut; + @Value("${s3.out.bucket_name}") + private String bucketNameOut; + + @Bean("outBucketName") + public String getBucketNameOut() { + return bucketNameOut; + } + + @Bean("outClient") + public AmazonS3 getS3OutClient() { + return getS3Client(endpointOut, portOut, accessKeyOut, secretKeyOut); + } + + private static AmazonS3 getS3Client(String endpoint, int port, String accessKey, + String secretKey) { + AWSCredentials credentials = new BasicAWSCredentials(accessKey, secretKey); + String s3Endpoint = endpoint + ":" + port; + String region = Region.getRegion(Regions.DEFAULT_REGION).toString(); + + return AmazonS3ClientBuilder.standard() + .withEndpointConfiguration(new AwsClientBuilder.EndpointConfiguration(s3Endpoint, region)) + .withCredentials(new AWSStaticCredentialsProvider(credentials)) + .build(); + } +} diff --git a/src/main/java/ru/micord/ervu/av/s3/S3Service.java b/src/main/java/ru/micord/ervu/av/s3/S3Service.java new file mode 100644 index 0000000..9178d2c --- /dev/null +++ b/src/main/java/ru/micord/ervu/av/s3/S3Service.java @@ -0,0 +1,48 @@ +package ru.micord.ervu.av.s3; + +import java.io.File; + +import com.amazonaws.AmazonServiceException; +import com.amazonaws.services.s3.AmazonS3; +import jakarta.annotation.PostConstruct; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; +import ru.micord.ervu.av.exception.FileUploadException; + +/** + * @author r.latypov + */ +@Service +public class S3Service { + private final String outBucketName; + private final AmazonS3 outClient; + + @Autowired + public S3Service(String outBucketName, AmazonS3 outClient) { + this.outBucketName = outBucketName; + this.outClient = outClient; + } + + @PostConstruct + private void init() { + if (!outClient.doesBucketExistV2(outBucketName)) { + outClient.createBucket(outBucketName); + } + } + + public void putFile(String filePath, String key) throws FileUploadException { + try { + outClient.putObject(outBucketName, generateResourceName(outBucketName, key), + new File(filePath) + ); + } + catch (AmazonServiceException e) { + // todo message + throw new FileUploadException(e); + } + } + + private static String generateResourceName(String bucketName, String key) { + return String.join("/", bucketName, key); + } +} diff --git a/src/main/java/ru/micord/ervu/av/service/FileStatusService.java b/src/main/java/ru/micord/ervu/av/service/FileStatusService.java new file mode 100644 index 0000000..82e2c7c --- /dev/null +++ b/src/main/java/ru/micord/ervu/av/service/FileStatusService.java @@ -0,0 +1,40 @@ +package ru.micord.ervu.av.service; + +import org.jooq.DSLContext; +import org.jooq.Field; +import org.jooq.Record; +import org.jooq.Table; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; + +import static org.jooq.impl.DSL.field; +import static org.jooq.impl.DSL.name; +import static org.jooq.impl.DSL.table; + +/** + * @author r.latypov + */ +@Service +public class FileStatusService { + public static final Table INTERACTION_LOG = table(name("public", "interaction_log")); + public static final Field INTERACTION_LOG_ID = field(name("id"), Long.class); + public static final Field INTERACTION_LOG_FILE_ID = field(name("file_id"), String.class); + public static final Field INTERACTION_LOG_STATUS = field(name("status"), String.class); + + @Autowired + private DSLContext dslContext; + + public void setStatus(Long id, String status) { + dslContext.update(INTERACTION_LOG) + .set(INTERACTION_LOG_STATUS, status) + .where(INTERACTION_LOG_ID.eq(id)) + .execute(); + } + + public void setStatus(String fileId, String status) { + dslContext.update(INTERACTION_LOG) + .set(INTERACTION_LOG_STATUS, status) + .where(INTERACTION_LOG_FILE_ID.eq(fileId)) + .execute(); + } +} diff --git a/src/main/java/ru/micord/ervu/av/service/FileUploadService.java b/src/main/java/ru/micord/ervu/av/service/FileUploadService.java new file mode 100644 index 0000000..380cca8 --- /dev/null +++ b/src/main/java/ru/micord/ervu/av/service/FileUploadService.java @@ -0,0 +1,294 @@ +package ru.micord.ervu.av.service; + +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.util.Arrays; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; + +import com.google.gson.Gson; +import com.google.gson.GsonBuilder; +import com.google.gson.JsonSyntaxException; +import org.apache.http.HttpEntity; +import org.apache.http.client.methods.CloseableHttpResponse; +import org.apache.http.client.methods.HttpDelete; +import org.apache.http.client.methods.HttpGet; +import org.apache.http.client.methods.HttpPost; +import org.apache.http.conn.HttpHostConnectException; +import org.apache.http.entity.mime.MultipartEntityBuilder; +import org.apache.http.entity.mime.content.FileBody; +import org.apache.http.impl.client.CloseableHttpClient; +import org.apache.http.impl.client.HttpClients; +import org.apache.http.util.EntityUtils; +import org.apache.kafka.clients.admin.NewTopic; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.http.HttpStatus; +import org.springframework.kafka.annotation.KafkaListener; +import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.kafka.support.Acknowledgment; +import org.springframework.kafka.support.SendResult; +import org.springframework.lang.NonNull; +import org.springframework.stereotype.Service; +import ru.micord.ervu.av.exception.FileUploadException; +import ru.micord.ervu.av.exception.InvalidHttpFileUrlException; +import ru.micord.ervu.av.kafka.dto.DownloadRequest; +import ru.micord.ervu.av.kafka.dto.DownloadResponse; +import ru.micord.ervu.av.kafka.dto.FileStatus; +import ru.micord.ervu.av.response.AvFileSendResponse; +import ru.micord.ervu.av.response.AvResponse; +import ru.micord.ervu.av.s3.S3Service; + +/** + * @author r.latypov + */ +@Service +public class FileUploadService { + private static final Logger logger = LoggerFactory.getLogger(FileUploadService.class); + @Value("${av.rest.address}") + private String avRestAddress; + @Value("${av.first.timeout.milliseconds}") + private Long avFirstTimeoutMilliseconds; + @Value("${file.saving.path}") + private String fileSavingPath; + private final KafkaTemplate kafkaTemplate; + private final NewTopic outErrorTopic; + private final NewTopic outSuccessTopic; + private final ReceiveScanReportRetryable receiveScanReportRetryable; + private final FileStatusService fileStatusService; + private final S3Service s3Service; + + @Autowired + public FileUploadService(KafkaTemplate kafkaTemplate, NewTopic outErrorTopic, + NewTopic outSuccessTopic, ReceiveScanReportRetryable receiveScanReportRetryable, + FileStatusService fileStatusService, S3Service s3Service) { + this.kafkaTemplate = kafkaTemplate; + this.outErrorTopic = outErrorTopic; + this.outSuccessTopic = outSuccessTopic; + this.receiveScanReportRetryable = receiveScanReportRetryable; + this.fileStatusService = fileStatusService; + this.s3Service = s3Service; + } + + @KafkaListener(id = "${spring.kafka.consumer.group.id}", topics = "${kafka.in.topic.name}", + containerFactory = "inputKafkaListenerContainerFactory") + public void listenKafkaIn(String kafkaInMessage, Acknowledgment acknowledgment) { + DownloadRequest downloadRequest = new Gson().fromJson(kafkaInMessage, DownloadRequest.class); + String fileId = downloadRequest.fileInfo().getFileId(); + + try { + FileUrl fileUrl = parseFileUrl(downloadRequest.fileInfo().getFileUrl()); + String filePath = fileSavingPath + fileUrl.fileName(); + String downloadUrl = fileUrl.fileUrl(); + downloadFile(downloadUrl, filePath); + + AvResponse avResponse = checkFile(filePath); + + boolean clean = Arrays.stream(avResponse.verdicts()) + .anyMatch(verdict -> verdict.equalsIgnoreCase(AvResponse.Scan.VERDICT_CLEAN)); + boolean infected = Arrays.stream(avResponse.verdicts()) + .anyMatch(verdict -> verdict.equalsIgnoreCase(AvResponse.Scan.VERDICT_INFECTED)); + + if (infected || !clean) { + downloadRequest.fileInfo().setFileUrl(null); + downloadRequest.fileInfo().setFileStatus(FileStatus.FILE_STATUS_02); + sendMessage(outErrorTopic.name(), downloadRequest); + + fileStatusService.setStatus(fileId, FileStatus.FILE_STATUS_02.status()); + } + else { + s3Service.putFile(filePath, fileUrl.fileName()); + + downloadRequest.fileInfo().setFileUrl(fileUrl.fileName()); + downloadRequest.fileInfo().setFileStatus(FileStatus.FILE_STATUS_03); + sendMessage(outSuccessTopic.name(), downloadRequest); + + fileStatusService.setStatus(fileId, FileStatus.FILE_STATUS_03.status()); + } + + deleteFile(downloadUrl); + if (new File(filePath).delete()) { + acknowledgment.acknowledge(); + } + } + catch (InvalidHttpFileUrlException e) { + // считаем, что повторная обработка сообщения не нужна + // ошибку логируем, сообщаем об ошибке, помечаем прочтение сообщения + logger.error(e.getMessage() + ": " + kafkaInMessage); + acknowledgment.acknowledge(); + throw new RuntimeException(kafkaInMessage, e); + } + catch (FileUploadException e) { + // считаем, что нужно повторное считывание сообщения + // ошибку логируем, сообщение оставляем непрочитанным + logger.error(e.getMessage(), e); + } + } + + @KafkaListener(id = "${spring.kafka.out.consumer.group.id}", + topics = "${kafka.out.response.topic.name}", + containerFactory = "outputKafkaListenerContainerFactory") + public void listenKafkaOut(String kafkaOutResponseMessage, Acknowledgment acknowledgment) { + DownloadResponse downloadResponse = new Gson().fromJson(kafkaOutResponseMessage, + DownloadResponse.class + ); + + String fileId = downloadResponse.fileInfo().fileId(); + FileStatus fileStatus = downloadResponse.fileInfo().fileStatus(); + if (fileStatus.code().equalsIgnoreCase(FileStatus.FILE_STATUS_04.code())) { + fileStatusService.setStatus(fileId, FileStatus.FILE_STATUS_04.status()); + } + + acknowledgment.acknowledge(); + } + + /* метод для выделения UUID файла из ссылки на файл + сохраняем на диске и отправляем файл в хранилище под тем же UUID, сохраняя расширение файла + */ + private static FileUrl parseFileUrl(@NonNull String fileUrl) throws InvalidHttpFileUrlException { + String[] substrings = fileUrl.split("/"); + String fileName = substrings[substrings.length - 1]; + if (substrings.length == 1 || fileName.isBlank()) { + // ошибка данных; сообщение некорректно + throw new InvalidHttpFileUrlException(fileUrl); + } + else { + return new FileUrl(fileName, fileUrl); + } + } + + private void downloadFile(String fileUrl, String filePath) + throws InvalidHttpFileUrlException, FileUploadException { + File file = new File(filePath); + HttpGet request = new HttpGet(fileUrl); + + try (CloseableHttpClient client = HttpClients.createDefault(); + CloseableHttpResponse response = client.execute(request)) { + + int statusCode = response.getStatusLine().getStatusCode(); + if (statusCode == HttpStatus.OK.value()) { + HttpEntity entity = response.getEntity(); + if (entity != null) { + try (FileOutputStream outputStream = new FileOutputStream(file)) { + entity.writeTo(outputStream); + } + } + } + else { + // в хранилище не обнаружено файла; сообщение некорректно + String message = "http status code " + statusCode + " : " + fileUrl; + throw new InvalidHttpFileUrlException(message); + } + } + catch (HttpHostConnectException e) { + throw new FileUploadException(e); + } + catch (IOException e) { + // хранилище недоступно; сообщение некорректно + String message = + (e.getMessage() == null ? e.getCause().getMessage() : e.getMessage()) + " : " + fileUrl; + throw new InvalidHttpFileUrlException(message, e); + } + } + + private AvResponse checkFile(String filePath) throws FileUploadException { + File file = new File(filePath); + + try (CloseableHttpClient client = HttpClients.createDefault()) { + HttpPost post = new HttpPost(avRestAddress); + HttpEntity entity = MultipartEntityBuilder.create() + .addPart("file", new FileBody(file)) + .build(); + post.setEntity(entity); + + try (CloseableHttpResponse postResponse = client.execute(post)) { + int postStatusCode = postResponse.getStatusLine().getStatusCode(); + String postResponseJson = EntityUtils.toString(postResponse.getEntity()); + + AvFileSendResponse avFileSendResponse; + try { + avFileSendResponse = new Gson().fromJson(postResponseJson, AvFileSendResponse.class); + } + catch (JsonSyntaxException e) { + throw new FileUploadException("error json: " + postResponseJson, e); + } + + if (postStatusCode != HttpStatus.CREATED.value()) { + StringBuilder stringBuilder = new StringBuilder( + "http status code " + postStatusCode + " for " + avRestAddress + " post request."); + + String status = avFileSendResponse.status(); + if (status != null) { + stringBuilder.append(" Status: ").append(status).append("."); + } + if (avFileSendResponse.error() != null) { + stringBuilder.append(" Error code: ") + .append(avFileSendResponse.error().code()) + .append(". Error message: ") + .append(avFileSendResponse.error().message()) + .append(". "); + } + throw new FileUploadException(stringBuilder.toString()); + } + + String id = avFileSendResponse.id(); + String reportRequestUri = avRestAddress + "/" + id; + HttpGet get = new HttpGet(reportRequestUri); + + // waiting for timeout time before first request + try { + TimeUnit.MILLISECONDS.sleep( + avFirstTimeoutMilliseconds == null ? 1000L : avFirstTimeoutMilliseconds); + } + catch (InterruptedException e) { + throw new FileUploadException(e); + } + + return receiveScanReportRetryable.receiveScanReport(client, get); + } + } + catch (IOException e) { + // непредусмотренная ошибка доступа через http-клиент + throw new FileUploadException(e); + } + } + + private void deleteFile(String fileUrl) throws FileUploadException { + try (CloseableHttpClient client = HttpClients.createDefault()) { + HttpDelete delete = new HttpDelete(fileUrl); + + try (CloseableHttpResponse response = client.execute(delete)) { + int statusCode = response.getStatusLine().getStatusCode(); + if (statusCode != HttpStatus.NO_CONTENT.value()) { + String message = "http status code " + statusCode + " : " + fileUrl; + throw new RuntimeException(message); + } + } + } + catch (IOException e) { + // непредусмотренная ошибка доступа через http-клиент + throw new FileUploadException(e); + } + } + + private void sendMessage(@NonNull String topicName, Object object) { + CompletableFuture> future = kafkaTemplate.send(topicName, + new GsonBuilder().setPrettyPrinting().create().toJson(object) + ); + + future.whenComplete((result, e) -> { + if (e != null) { + String message = + "Unable to send message [" + result.getProducerRecord().value() + "] into kafka topic '" + + topicName + "' due to : " + e.getMessage(); + throw new RuntimeException(message, e); + } + }); + } + + private record FileUrl(String fileName, String fileUrl) { + } +} diff --git a/src/main/java/ru/micord/ervu/av/service/ReceiveScanReportRetryable.java b/src/main/java/ru/micord/ervu/av/service/ReceiveScanReportRetryable.java new file mode 100644 index 0000000..77d3838 --- /dev/null +++ b/src/main/java/ru/micord/ervu/av/service/ReceiveScanReportRetryable.java @@ -0,0 +1,54 @@ +package ru.micord.ervu.av.service; + +import java.io.IOException; + +import com.google.gson.Gson; +import org.apache.http.client.ClientProtocolException; +import org.apache.http.client.methods.CloseableHttpResponse; +import org.apache.http.client.methods.HttpGet; +import org.apache.http.impl.client.CloseableHttpClient; +import org.apache.http.util.EntityUtils; +import org.springframework.http.HttpStatus; +import org.springframework.retry.annotation.Backoff; +import org.springframework.retry.annotation.Retryable; +import org.springframework.stereotype.Service; +import ru.micord.ervu.av.exception.FileUploadException; +import ru.micord.ervu.av.exception.RetryableException; +import ru.micord.ervu.av.response.AvResponse; + +/** + * @author r.latypov + */ +@Service +public class ReceiveScanReportRetryable { + @Retryable(retryFor = {RetryableException.class}, + maxAttemptsExpression = "${av.retry.max.attempts.count}", + backoff = @Backoff(delayExpression = "${av.retry.delay.milliseconds}")) + public AvResponse receiveScanReport(CloseableHttpClient client, HttpGet get) + throws FileUploadException { + + try (CloseableHttpResponse getResponse = client.execute(get)) { + int getStatusCode = getResponse.getStatusLine().getStatusCode(); + if (getStatusCode == HttpStatus.OK.value()) { + String getResponseJson = EntityUtils.toString(getResponse.getEntity()); + AvResponse avResponse = new Gson().fromJson(getResponseJson, AvResponse.class); + if (avResponse.completed() == null) { + throw new RetryableException(); + } + return avResponse; + } + else { + throw new FileUploadException("http status code " + getStatusCode + " for " + get.getURI() + + " get request."); + } + } + catch (ClientProtocolException e) { + // непредусмотренная ошибка доступа через http-клиент + throw new RuntimeException(e); + } + catch (IOException e) { + // непредусмотренная ошибка доступа через http-клиент + throw new FileUploadException(e); + } + } +} diff --git a/src/main/resources/application.properties b/src/main/resources/application.properties new file mode 100644 index 0000000..5165cad --- /dev/null +++ b/src/main/resources/application.properties @@ -0,0 +1,84 @@ +server.servlet.context-path=/av + +# spring kafka default beans properties begin -> +# kafka out admin bean settings +spring.kafka.admin.security.protocol=SASL_PLAINTEXT +#login password to set +spring.kafka.admin.properties.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="${OUT_KAFKA_USERNAME}" password="${OUT_KAFKA_PASSWORD}"; +spring.kafka.admin.properties.sasl.mechanism=SCRAM-SHA-256 +# kafka out servers for admin bean +#host1:port1, host2:port2 +spring.kafka.bootstrap.servers=${OUT_KAFKA_SERVERS} +# +# kafka in consumer (with possibility for default bean) +#host1:port1, host2:port2 +spring.kafka.consumer.bootstrap.servers=${AV_KAFKA_SERVERS} +spring.kafka.consumer.security.protocol=SASL_PLAINTEXT +#login password to set +spring.kafka.consumer.properties.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="${AV_KAFKA_USERNAME}" password="${AV_KAFKA_PASSWORD}"; +spring.kafka.consumer.properties.sasl.mechanism=SCRAM-SHA-256 +# +spring.kafka.consumer.enable.auto.commit=false +spring.kafka.consumer.group.id=file-to-upload-consumers +# kafka in listeners +spring.kafka.listener.ack.mode=MANUAL_IMMEDIATE +# +# kafka out producer (with possibility for default bean) +#host1:port1, host2:port2 +spring.kafka.producer.bootstrap.servers=${ERVU_KAFKA_SERVERS} +spring.kafka.producer.security.protocol=SASL_PLAINTEXT +#login password to set +spring.kafka.producer.properties.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="${ERVU_KAFKA_USERNAME}" password="${ERVU_KAFKA_PASSWORD}"; +spring.kafka.producer.properties.sasl.mechanism=SCRAM-SHA-256 +# +# kafka out general +spring.kafka.properties.security.protocol=SASL_PLAINTEXT +#login password to set +spring.kafka.properties.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="${ERVU_KAFKA_USERNAME}" password="${ERVU_KAFKA_PASSWORD}"; +spring.kafka.properties.sasl.mechanism=SCRAM-SHA-256 +# spring kafka default beans properties <- end +# +# kafka out consumer (not for default bean creation by spring) +#host1:port1, host2:port2 +spring.kafka.out.consumer.bootstrap.servers=${ERVU_KAFKA_SERVERS} +spring.kafka.out.consumer.security.protocol=SASL_PLAINTEXT +#login password to set +spring.kafka.out.consumer.properties.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="${ERVU_KAFKA_USERNAME}" password="${ERVU_KAFKA_PASSWORD}"; +spring.kafka.out.consumer.properties.sasl.mechanism=SCRAM-SHA-256 +# +spring.kafka.out.consumer.enable.auto.commit=false +spring.kafka.out.consumer.group.id=response-consumers +# kafka out listeners +spring.kafka.out.listener.ack.mode=MANUAL_IMMEDIATE +# +# +kafka.in.topic.name=${AV_KAFKA_TOPIC_NAME} +kafka.out.error.topic.name=${ERVU_KAFKA_ERROR_TOPIC_NAME} +kafka.out.success.topic.name=${ERVU_KAFKA_SUCCESS_TOPIC_NAME} +kafka.out.response.topic.name=${ERVU_KAFKA_RESPONSE_TOPIC_NAME} +# +av.rest.address=${AV_REST_ADDRESS} +av.first.timeout.milliseconds=${AV_FIRST_TIMEOUT_MILLISECONDS} +av.retry.max.attempts.count=${AV_RETRY_MAX_ATTEMPTS_COUNT} +av.retry.delay.milliseconds=${AV_RETRY_DELAY_MILLISECONDS} +file.saving.path=${FILE_SAVING_PATH} +# +s3.out.endpoint=${S3_ENDPOINT} +s3.out.port=${S3_PORT} +s3.out.access_key=${S3_ACCESS_KEY} +s3.out.secret_key=${S3_SECRET_KEY} +s3.out.bucket_name=${S3_OUT_BUCKET_NAME} +# +# spring jooq dsl bean properties begin -> +spring.jooq.sql-dialect=Postgres +spring.datasource.driver-class-name=org.postgresql.Driver +#host:port/database_name +spring.datasource.url=jdbc:postgresql://${SPRING_DATASOURCE_URL} +spring.datasource.username=${SPRING_DATASOURCE_USERNAME} +spring.datasource.password=${SPRING_DATASOURCE_PASSWORD} +# spring jooq dsl bean properties <- end + +# endpoints management +management.endpoints.web.exposure.include = health, info, metrics +management.endpoint.health.show-details = always +management.endpoint.health.group.readiness.include = kafka diff --git a/test.env b/test.env index f0a194e..8d3216d 100644 --- a/test.env +++ b/test.env @@ -1,2 +1,23 @@ -MINIO_ROOT_USER=changeIt123 -MINIO_ROOT_PASSWORD=changeIt123 \ No newline at end of file +AV_KAFKA_SERVERS=10.10.31.11:32609 +AV_KAFKA_USERNAME=user1 +AV_KAFKA_PASSWORD=Blfi9d2OFG +AV_KAFKA_TOPIC_NAME=file-to-upload +ERVU_KAFKA_SERVERS=10.10.31.11:32609 +ERVU_KAFKA_USERNAME=user1 +ERVU_KAFKA_PASSWORD=Blfi9d2OFG +ERVU_KAFKA_ERROR_TOPIC_NAME=ervu.lkrp.download.request +ERVU_KAFKA_SUCCESS_TOPIC_NAME=ervu.lkrp.download.request +ERVU_KAFKA_RESPONSE_TOPIC_NAME=ervu.lkrp.download.response +AV_REST_ADDRESS=http://10.10.31.118:8085/scans +AV_FIRST_TIMEOUT_MILLISECONDS=1000 +AV_RETRY_MAX_ATTEMPTS_COUNT=10 +AV_RETRY_DELAY_MILLISECONDS=1000 +FILE_SAVING_PATH=/transfer/ +S3_ENDPOINT=http://ervu-minio.k8s.micord.ru +S3_PORT=31900 +S3_ACCESS_KEY=Keyq0l8IRarEf5GmpvEO +S3_SECRET_KEY=8A2epSoI6OjdHHwA5F6tHxeYRThv47GdGwcBrV7a +S3_OUT_BUCKET_NAME=default-out-bucket +SPRING_DATASOURCE_URL=10.10.31.119:5432/ervu-lkrp-ul +SPRING_DATASOURCE_USERNAME=ervu-lkrp-ul +SPRING_DATASOURCE_PASSWORD=ervu-lkrp-ul