SUPPORT-8411: add topic properties
This commit is contained in:
parent
282241217f
commit
e678fa3ac6
6 changed files with 22 additions and 26 deletions
|
|
@ -1,8 +1,11 @@
|
|||
package ervu.dto.journal;
|
||||
|
||||
import com.fasterxml.jackson.annotation.JsonProperty;
|
||||
|
||||
public class JournalFileDataRequest {
|
||||
|
||||
private String orgIdErvu; // идентификатор организации в ЕРВУ
|
||||
@JsonProperty("orgId_ERVU")
|
||||
private String orgIdErvu; // идентификатор организации в ЕРВУ
|
||||
private String prnOid; // идентификатор сотрудника в ЕСИА ответственный за воинский учёт
|
||||
|
||||
public String getOrgIdErvu() {
|
||||
|
|
@ -22,15 +25,4 @@ public class JournalFileDataRequest {
|
|||
this.prnOid = prnOid;
|
||||
return this;
|
||||
}
|
||||
|
||||
/*
|
||||
Приведение в формат message подходящий для отправки в kafka.
|
||||
*/
|
||||
public String toString() {
|
||||
return String.format("{"
|
||||
+ "\"orgId_ERVU\": \"%s\","
|
||||
+ "\"prnOid\": \"%s\""
|
||||
+ "}", orgIdErvu, prnOid
|
||||
);
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -32,21 +32,20 @@ public class JournalInMemoryStaticGridLoadService implements
|
|||
private ReplyingKafkaService ervuReplyingKafkaService;
|
||||
@Autowired
|
||||
private ObjectMapper objectMapper;
|
||||
@Value("${ervu-journal-kafka.reply-topic:ervu.organization.journal.request}")
|
||||
@Value("${ervu-journal-kafka.request-topic}")
|
||||
private String requestTopic;
|
||||
@Value("${ervu-journal-kafka.reply-topic:ervu.organization.journal.response}")
|
||||
@Value("${ervu-journal-kafka.reply-topic}")
|
||||
private String replyTopic;
|
||||
|
||||
@Override
|
||||
public List<JournalDto> loadData() {
|
||||
JournalFileDataRequest journalFileDataRequest = initJournalFileDataRequest();
|
||||
String responseJsonString = ervuReplyingKafkaService.sendMessageAndGetReply(requestTopic,
|
||||
replyTopic, journalFileDataRequest.toString()
|
||||
);
|
||||
|
||||
try {
|
||||
String responseJsonString = ervuReplyingKafkaService.sendMessageAndGetReply(requestTopic,
|
||||
replyTopic, objectMapper.writeValueAsString(journalFileDataRequest));
|
||||
JournalFileDataResponse journalFileDataResponse = objectMapper.readValue(responseJsonString,
|
||||
JournalFileDataResponse.class
|
||||
);
|
||||
JournalFileDataResponse.class);
|
||||
return journalFileDataResponse.getFilesInfo().stream()
|
||||
.map(JournalDtoMapper::mapToJournalDto)
|
||||
.toList();
|
||||
|
|
@ -56,7 +55,7 @@ public class JournalInMemoryStaticGridLoadService implements
|
|||
}
|
||||
}
|
||||
|
||||
//todo не раскоментить после тестирования
|
||||
//todo раскоментить после тестирования
|
||||
private JournalFileDataRequest initJournalFileDataRequest() {
|
||||
// Optional<Authentication> authentication = Optional.ofNullable(
|
||||
// SecurityContextHolder.getContext().getAuthentication()
|
||||
|
|
|
|||
|
|
@ -30,7 +30,7 @@ public class ReplyingKafkaConfig {
|
|||
private String bootstrapServers;
|
||||
@Value("${ervu-kafka.reply-topic}")
|
||||
private String orgReplyTopic;
|
||||
@Value("${ervu-journal-kafka.reply-topic:ervu.organization.journal.response}")
|
||||
@Value("${ervu-journal-kafka.reply-topic}")
|
||||
private String journalReplyTopic;
|
||||
@Value("${ervu-kafka.group-id}")
|
||||
private String groupId;
|
||||
|
|
|
|||
|
|
@ -24,11 +24,12 @@ public abstract class BaseReplyingKafkaServiceImpl implements ReplyingKafkaServi
|
|||
ProducerRecord<String, String> record = new ProducerRecord<>(requestTopic, requestMessage);
|
||||
record.headers().add(new RecordHeader(KafkaHeaders.REPLY_TOPIC, requestReplyTopic.getBytes()));
|
||||
|
||||
RequestReplyFuture<String, String, String> replyFuture = getReplyingKafkaTemplate().sendAndReceive(
|
||||
record);
|
||||
RequestReplyFuture<String, String, String> replyFuture = getReplyingKafkaTemplate()
|
||||
.sendAndReceive(record);
|
||||
|
||||
try {
|
||||
Optional<ConsumerRecord<String, String>> consumerRecord = Optional.ofNullable(replyFuture.get());
|
||||
return consumerRecord.map(ConsumerRecord::value)
|
||||
return Optional.ofNullable(replyFuture.get())
|
||||
.map(ConsumerRecord::value)
|
||||
.orElseThrow(() -> new RuntimeException("Kafka return result is null."));
|
||||
}
|
||||
catch (InterruptedException | ExecutionException e) {
|
||||
|
|
|
|||
|
|
@ -55,3 +55,5 @@ xa-data-source add \
|
|||
/system-property=ervu-kafka.reply-topic:add(value="ervu.organization.response")
|
||||
/system-property=ervu-kafka.group-id:add(value="1")
|
||||
/system-property=ervu-kafka.request-topic:add(value="ervu.organization.request")
|
||||
/system-property=ervu-journal-kafka.request-topic:add(value="ervu.organization.journal.request")
|
||||
/system-property=ervu-journal-kafka.reply-topic:add(value="ervu.organization.journal.response")
|
||||
|
|
|
|||
|
|
@ -82,6 +82,8 @@
|
|||
<property name="ervu-kafka.request-topic" value="ervu.organization.request"/>
|
||||
<property name="client-cert-hash" value="04508B4B0B58776A954A0E15F574B4E58799D74C61EE020B3330716C203E3BDD"/>
|
||||
<property name="bpmn.enable" value="false"/>
|
||||
<property name="ervu-journal-kafka.request-topic" value="ervu.organization.journal.request"/>
|
||||
<property name="ervu-journal-kafka.reply-topic" value="ervu.organization.journal.response"/>
|
||||
</system-properties>
|
||||
<management>
|
||||
<audit-log>
|
||||
|
|
@ -585,4 +587,4 @@
|
|||
<remote-destination host="${jboss.mail.server.host:localhost}" port="${jboss.mail.server.port:25}"/>
|
||||
</outbound-socket-binding>
|
||||
</socket-binding-group>
|
||||
</server>
|
||||
</server>
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue