Merge remote-tracking branch 'origin/release/1.0.0' into release/1.0.0
This commit is contained in:
commit
8b7a8845f5
7 changed files with 75 additions and 5 deletions
|
|
@ -7,9 +7,12 @@ import org.apache.kafka.clients.CommonClientConfigs;
|
|||
import org.apache.kafka.clients.producer.ProducerConfig;
|
||||
import org.apache.kafka.common.config.SaslConfigs;
|
||||
import org.apache.kafka.common.serialization.StringSerializer;
|
||||
import org.springframework.beans.factory.annotation.Qualifier;
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
import org.springframework.kafka.core.ConsumerFactory;
|
||||
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
|
||||
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
|
||||
import org.springframework.kafka.core.KafkaTemplate;
|
||||
import org.springframework.kafka.core.ProducerFactory;
|
||||
|
|
@ -18,7 +21,7 @@ import org.springframework.kafka.core.ProducerFactory;
|
|||
* @author Alexandr Shalaginov
|
||||
*/
|
||||
@Configuration
|
||||
public class KafkaProducerConfig {
|
||||
public class AvKafkaConfig {
|
||||
@Value("${av.kafka.bootstrap.servers}")
|
||||
private String kafkaUrl;
|
||||
@Value("${av.kafka.security.protocol}")
|
||||
|
|
@ -32,12 +35,14 @@ public class KafkaProducerConfig {
|
|||
@Value("${av.kafka.sasl.mechanism}")
|
||||
private String saslMechanism;
|
||||
|
||||
@Bean("av-factory")
|
||||
@Bean()
|
||||
@Qualifier("avProducerFactory")
|
||||
public ProducerFactory<String, String> producerFactory() {
|
||||
return new DefaultKafkaProducerFactory<>(producerConfigs());
|
||||
}
|
||||
|
||||
@Bean("av-configs")
|
||||
@Bean()
|
||||
@Qualifier("avProducerConfigs")
|
||||
public Map<String, Object> producerConfigs() {
|
||||
Map<String, Object> props = new HashMap<>();
|
||||
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, this.kafkaUrl);
|
||||
|
|
@ -52,7 +57,30 @@ public class KafkaProducerConfig {
|
|||
return props;
|
||||
}
|
||||
|
||||
@Bean("av-template")
|
||||
@Bean()
|
||||
@Qualifier("avConsumerFactory")
|
||||
public ConsumerFactory<String, String> consumerFactory() {
|
||||
return new DefaultKafkaConsumerFactory<>(consumerConfigs());
|
||||
}
|
||||
|
||||
@Bean()
|
||||
@Qualifier("avConsumerConfigs")
|
||||
public Map<String, Object> consumerConfigs() {
|
||||
Map<String, Object> props = new HashMap<>();
|
||||
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, this.kafkaUrl);
|
||||
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
|
||||
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
|
||||
|
||||
props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, securityProtocol);
|
||||
props.put(SaslConfigs.SASL_JAAS_CONFIG, loginModule + " required username=\""
|
||||
+ username + "\" password=\"" + password + "\";");
|
||||
props.put(SaslConfigs.SASL_MECHANISM, saslMechanism);
|
||||
|
||||
return props;
|
||||
}
|
||||
|
||||
@Bean()
|
||||
@Qualifier("avTemplate")
|
||||
public KafkaTemplate<String, String> kafkaTemplate() {
|
||||
return new KafkaTemplate<>(producerFactory());
|
||||
}
|
||||
|
|
@ -0,0 +1,7 @@
|
|||
package ervu.model.fileupload;
|
||||
|
||||
/**
|
||||
* @author r.latypov
|
||||
*/
|
||||
public record DownloadResponse(OrgInfo orgInfo, FileInfo fileInfo) {
|
||||
}
|
||||
|
|
@ -9,14 +9,17 @@ import java.util.UUID;
|
|||
import com.fasterxml.jackson.core.JsonProcessingException;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import ervu.client.fileupload.FileUploadWebDavClient;
|
||||
import ervu.model.fileupload.DownloadResponse;
|
||||
import ervu.model.fileupload.EmployeeInfoFileFormType;
|
||||
import ervu.model.fileupload.EmployeeInfoKafkaMessage;
|
||||
import ervu.model.fileupload.FileInfo;
|
||||
import ervu.model.fileupload.FileStatus;
|
||||
import org.apache.kafka.clients.producer.ProducerRecord;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.beans.factory.annotation.Qualifier;
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.kafka.annotation.KafkaListener;
|
||||
import org.springframework.kafka.core.KafkaTemplate;
|
||||
import org.springframework.stereotype.Service;
|
||||
import org.springframework.web.multipart.MultipartFile;
|
||||
|
|
@ -147,4 +150,21 @@ public class EmployeeInfoFileUploadService {
|
|||
private String getTimeZone() {
|
||||
return ZonedDateTime.now().getOffset().toString();
|
||||
}
|
||||
|
||||
@KafkaListener(id = "${av.kafka.group.id}", topics = "${av.kafka.download.response}",
|
||||
containerFactory = "av-cons-factory")
|
||||
public void listenKafka(String kafkaMessage) {
|
||||
ObjectMapper mapper = new ObjectMapper();
|
||||
try {
|
||||
DownloadResponse downloadResponse = mapper.readValue(kafkaMessage, DownloadResponse.class);
|
||||
FileInfo fileInfo = downloadResponse.fileInfo();
|
||||
interactionService.updateStatus(fileInfo.getFileId(), fileInfo.getFileStatus().getStatus(),
|
||||
downloadResponse.orgInfo().getOrgId()
|
||||
);
|
||||
}
|
||||
catch (JsonProcessingException e) {
|
||||
throw new RuntimeException(String.format("Fail get json from: %s", kafkaMessage), e);
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -13,4 +13,6 @@ public interface InteractionService {
|
|||
List<InteractionLogRecord> get(String ervuId, String[] excludedStatuses);
|
||||
|
||||
void setStatus(String fileId, String status, String fileName, String form, Timestamp timestamp, String sender, Integer count, String ervuId);
|
||||
|
||||
void updateStatus(String fileId, String status, String ervuId);
|
||||
}
|
||||
|
|
|
|||
|
|
@ -42,6 +42,15 @@ public class InteractionServiceImpl implements InteractionService {
|
|||
.set(INTERACTION_LOG.SENDER, sender)
|
||||
.set(INTERACTION_LOG.FILE_NAME, fileName)
|
||||
.set(INTERACTION_LOG.RECORDS_SENT, count)
|
||||
.set(INTERACTION_LOG.ERVU_ID, ervuId);
|
||||
.set(INTERACTION_LOG.ERVU_ID, ervuId)
|
||||
.execute();
|
||||
}
|
||||
|
||||
public void updateStatus(String fileId, String status, String ervuId) {
|
||||
dslContext.update(INTERACTION_LOG)
|
||||
.set(INTERACTION_LOG.STATUS, status)
|
||||
.where(INTERACTION_LOG.ERVU_ID.eq(ervuId))
|
||||
.and(INTERACTION_LOG.FILE_ID.eq(fileId))
|
||||
.execute();
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -15,6 +15,8 @@ AV_KAFKA_SECURITY_PROTOCOL=SASL_PLAINTEXT
|
|||
AV_KAFKA_SASL_MECHANISM=SCRAM-SHA-256
|
||||
AV_KAFKA_USERNAME=user1
|
||||
AV_KAFKA_PASSWORD=Blfi9d2OFG
|
||||
AV_KAFKA_GROUP_ID=1
|
||||
AV_KAFKA_DOWNLOAD_RESPONSE=ervu.lkrp.av-fileupload-status
|
||||
ERVU_FILEUPLOAD_MAX_FILE_SIZE=5242880
|
||||
ERVU_FILEUPLOAD_MAX_REQUEST_SIZE=6291456
|
||||
ERVU_FILEUPLOAD_FILE_SIZE_THRESHOLD=0
|
||||
|
|
|
|||
|
|
@ -91,6 +91,8 @@
|
|||
<property name="s3.endpoint" value="http://ervu-minio.k8s.micord.ru:31900"/>
|
||||
<property name="s3.access_key" value="rlTdTvkmSXu9FsLhfecw"/>
|
||||
<property name="s3.secret_key" value="NUmY0wwRIEyAd98GCKd1cOgJWvLQYAcMMul5Ulu0"/>
|
||||
<property name="av.kafka.group.id" value="1"/>
|
||||
<property name="av.kafka.download.response" value="ervu.lkrp.av-fileupload-status"/>
|
||||
</system-properties>
|
||||
<management>
|
||||
<audit-log>
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue