This commit is contained in:
adel.ka 2025-10-24 10:48:07 +03:00
parent 0a02fd319e
commit 3cd263e016
3 changed files with 370 additions and 194 deletions

View file

@ -80,6 +80,16 @@
<to>spouse_success_job_status</to>
<enabled>Y</enabled>
</hop>
<hop>
<from>spouse_sort_rows</from>
<to>spouse_update_flags</to>
<enabled>Y</enabled>
</hop>
<hop>
<from>spouse_update_flags</from>
<to>spouse_error_job_status</to>
<enabled>Y</enabled>
</hop>
</order>
<transform>
<name>Abort</name>
@ -101,83 +111,6 @@
<yloc>192</yloc>
</GUI>
</transform>
<transform>
<name>spouse_error_job_status</name>
<type>ExecSql</type>
<description/>
<distribute>Y</distribute>
<custom_distribution/>
<copies>1</copies>
<partitioning>
<method>none</method>
<schema_name/>
</partitioning>
<arguments>
<argument>
<name>error_description</name>
</argument>
</arguments>
<connection>ervu-dashboard</connection>
<delete_field/>
<execute_each_row>Y</execute_each_row>
<insert_field/>
<quoteString>N</quoteString>
<read_field/>
<replace_variables>Y</replace_variables>
<set_params>Y</set_params>
<single_statement>N</single_statement>
<sql>UPDATE etl.job_execution
SET status = 'ERROR',
error_description = ?,
execution_end_datetime = current_timestamp
WHERE job_name = '${JOB_NAME}'
and recruitment_id = '${IDM_ID}';
</sql>
<update_field/>
<attributes/>
<GUI>
<xloc>800</xloc>
<yloc>192</yloc>
</GUI>
</transform>
<transform>
<name>spouse_success_job_status</name>
<type>ExecSql</type>
<description/>
<distribute>Y</distribute>
<custom_distribution/>
<copies>1</copies>
<partitioning>
<method>none</method>
<schema_name/>
</partitioning>
<arguments>
</arguments>
<connection>ervu-dashboard</connection>
<delete_field/>
<execute_each_row>Y</execute_each_row>
<insert_field/>
<quoteString>N</quoteString>
<read_field/>
<replace_variables>Y</replace_variables>
<set_params>N</set_params>
<single_statement>N</single_statement>
<sql>UPDATE etl.job_execution
SET status = 'SUCCESS',
execution_end_datetime = current_timestamp
WHERE job_name = '${JOB_NAME}'
and recruitment_id = '${IDM_ID}'
and status = 'PROCESSING';
</sql>
<update_field/>
<attributes/>
<GUI>
<xloc>592</xloc>
<yloc>720</yloc>
</GUI>
</transform>
<transform>
<name>Create job execution record</name>
<type>ExecSql</type>
@ -207,23 +140,6 @@ VALUES (DEFAULT, '${JOB_NAME}', 'PROCESSING', DEFAULT, null, '${IDM_ID}');
<yloc>576</yloc>
</GUI>
</transform>
<transform>
<name>spouse_detect_empty_stream</name>
<type>DetectEmptyStream</type>
<description/>
<distribute>Y</distribute>
<custom_distribution/>
<copies>1</copies>
<partitioning>
<method>none</method>
<schema_name/>
</partitioning>
<attributes/>
<GUI>
<xloc>352</xloc>
<yloc>720</yloc>
</GUI>
</transform>
<transform>
<name>Filter rows</name>
<type>FilterRows</type>
@ -261,8 +177,8 @@ VALUES (DEFAULT, '${JOB_NAME}', 'PROCESSING', DEFAULT, null, '${IDM_ID}');
</GUI>
</transform>
<transform>
<name>spouse_sort_rows</name>
<type>SortRows</type>
<name>Unique rows</name>
<type>Unique</type>
<description/>
<distribute>Y</distribute>
<custom_distribution/>
@ -271,94 +187,18 @@ VALUES (DEFAULT, '${JOB_NAME}', 'PROCESSING', DEFAULT, null, '${IDM_ID}');
<method>none</method>
<schema_name/>
</partitioning>
<directory>${java.io.tmpdir}</directory>
<prefix>out</prefix>
<sort_size>100000</sort_size>
<free_memory/>
<compress>N</compress>
<compress_variable/>
<unique_rows>Y</unique_rows>
<count_rows>N</count_rows>
<fields>
<field>
<case_insensitive>N</case_insensitive>
<name>recruit_id</name>
<ascending>Y</ascending>
<case_sensitive>N</case_sensitive>
<collator_enabled>N</collator_enabled>
<collator_strength>0</collator_strength>
<presorted>N</presorted>
</field>
</fields>
<reject_duplicate_row>N</reject_duplicate_row>
<attributes/>
<GUI>
<xloc>352</xloc>
<yloc>192</yloc>
</GUI>
</transform>
<transform>
<name>spouse_input</name>
<type>TableInput</type>
<description/>
<distribute>N</distribute>
<custom_distribution/>
<copies>1</copies>
<partitioning>
<method>none</method>
<schema_name/>
</partitioning>
<connection>ervu-dashboard</connection>
<execute_each_row>N</execute_each_row>
<limit/>
<sql>WITH spouse_data AS (
SELECT
ri.recruit_id AS recruit_id,
(supr->>'id')::uuid AS spouse_external_id,
NULLIF(supr->>'idERN', '') AS spouse_id_ern,
NULLIF(supr->>'rodstvSvyazSuprug', '')::int AS kinship_type,
supr->'svedFLBS'->'fio'->>'familiya' AS last_name,
supr->'svedFLBS'->'fio'->>'imya' AS first_name,
supr->'svedFLBS'->'fio'->>'otchestvo' AS middle_name,
CONCAT_WS(' ',
supr->'svedFLBS'->'fio'->>'familiya',
supr->'svedFLBS'->'fio'->>'imya',
supr->'svedFLBS'->'fio'->>'otchestvo'
) AS full_name,
MAKE_DATE(
NULLIF(supr->'svedFLBS'->'dataRozhdDok'->>'god', '')::int,
NULLIF(supr->'svedFLBS'->'dataRozhdDok'->>'mesyacz', '')::int,
NULLIF(supr->'svedFLBS'->'dataRozhdDok'->>'den', '')::int
) AS birth_date,
(supr->'svedSmert'->'extend'->>'dataSmert')::date AS death_date,
supr->'svedSmert'->>'nomerZapis' AS death_az_number,
ri.info->'svedFL'->'svedSemPolozh'->'svedAZBrak'->>'nomerZapis' AS marriage_az_number,
(ri.info->'svedFL'->'svedSemPolozh'->'svedAZBrak'->>'dataBrakKalend')::date AS marriage_date,
ri.info->'svedFL'->'svedSemPolozh'->'svedAZRazvod'->>'nomerZapis' AS divorce_az_number,
(ri.info->'svedFL'->'svedSemPolozh'->'svedAZRazvod'->>'dataRastBrakKalend')::date AS divorce_date,
CASE
WHEN ri.info->'svedFL'->'svedSemPolozh'->>'svedSuprIskl' = '1' THEN true
ELSE false
END AS information_excluded,
ri.ctid as source_ctid
FROM ervu_dashboard.recruits_info ri
JOIN ervu_dashboard.citizen r ON r.recruit_id = ri.recruit_id
CROSS JOIN LATERAL (
SELECT supri
FROM jsonb_array_elements(ri.info->'svedFL'->'svedSemPolozh'->'suprugi') AS supri
WHERE jsonb_typeof(ri.info->'svedFL'->'svedSemPolozh'->'suprugi') = 'array'
) AS supr(supr)
WHERE
'${IDM_ID}' != ''
AND COALESCE(ri.current_recruitment_id, ri.target_recruitment_id) = '${IDM_ID}'
)
SELECT
*,
source_ctid = (SELECT MAX(source_ctid) FROM spouse_data) as last_row
FROM spouse_data;
</sql>
<variables_active>Y</variables_active>
<attributes/>
<GUI>
<xloc>352</xloc>
<yloc>576</yloc>
<xloc>592</xloc>
<yloc>912</yloc>
</GUI>
</transform>
<transform>
@ -428,14 +268,11 @@ FROM spouse_data;
<only_when_have_rows>N</only_when_have_rows>
<partitioning_daily>N</partitioning_daily>
<partitioning_enabled>N</partitioning_enabled>
<partitioning_field/>
<partitioning_monthly>Y</partitioning_monthly>
<return_field/>
<return_keys>N</return_keys>
<schema>ervu_dashboard</schema>
<specify_fields>Y</specify_fields>
<table>citizen_spouse</table>
<tablename_field/>
<tablename_in_field>N</tablename_in_field>
<tablename_in_table>Y</tablename_in_table>
<truncate>N</truncate>
@ -485,27 +322,24 @@ FROM spouse_data;
<only_when_have_rows>N</only_when_have_rows>
<partitioning_daily>N</partitioning_daily>
<partitioning_enabled>N</partitioning_enabled>
<partitioning_field/>
<partitioning_monthly>Y</partitioning_monthly>
<return_field/>
<return_keys>N</return_keys>
<schema>ervu_dashboard</schema>
<specify_fields>Y</specify_fields>
<table>marriage_registry</table>
<tablename_field/>
<tablename_in_field>N</tablename_in_field>
<tablename_in_table>Y</tablename_in_table>
<truncate>N</truncate>
<use_batch>Y</use_batch>
<attributes/>
<GUI>
<xloc>544</xloc>
<xloc>576</xloc>
<yloc>192</yloc>
</GUI>
</transform>
<transform>
<name>Unique rows</name>
<type>Unique</type>
<name>spouse_detect_empty_stream</name>
<type>DetectEmptyStream</type>
<description/>
<distribute>Y</distribute>
<custom_distribution/>
@ -514,18 +348,223 @@ FROM spouse_data;
<method>none</method>
<schema_name/>
</partitioning>
<count_rows>N</count_rows>
<attributes/>
<GUI>
<xloc>352</xloc>
<yloc>720</yloc>
</GUI>
</transform>
<transform>
<name>spouse_error_job_status</name>
<type>ExecSql</type>
<description/>
<distribute>Y</distribute>
<custom_distribution/>
<copies>1</copies>
<partitioning>
<method>none</method>
<schema_name/>
</partitioning>
<arguments>
<argument>
<name>error_description</name>
</argument>
</arguments>
<connection>ervu-dashboard</connection>
<execute_each_row>Y</execute_each_row>
<quoteString>N</quoteString>
<replace_variables>Y</replace_variables>
<set_params>Y</set_params>
<single_statement>N</single_statement>
<sql>UPDATE etl.job_execution
SET status = 'ERROR',
error_description = ?,
execution_end_datetime = current_timestamp
WHERE job_name = '${JOB_NAME}'
and recruitment_id = '${IDM_ID}';
</sql>
<attributes/>
<GUI>
<xloc>800</xloc>
<yloc>192</yloc>
</GUI>
</transform>
<transform>
<name>spouse_input</name>
<type>TableInput</type>
<description/>
<distribute>N</distribute>
<custom_distribution/>
<copies>1</copies>
<partitioning>
<method>none</method>
<schema_name/>
</partitioning>
<connection>ervu-dashboard</connection>
<execute_each_row>N</execute_each_row>
<sql>WITH spouse_data AS (
SELECT
ri.recruit_id AS recruit_id,
(supr->>'id')::uuid AS spouse_external_id,
NULLIF(supr->>'idERN', '') AS spouse_id_ern,
NULLIF(supr->>'rodstvSvyazSuprug', '')::int AS kinship_type,
supr->'svedFLBS'->'fio'->>'familiya' AS last_name,
supr->'svedFLBS'->'fio'->>'imya' AS first_name,
supr->'svedFLBS'->'fio'->>'otchestvo' AS middle_name,
CONCAT_WS(' ',
supr->'svedFLBS'->'fio'->>'familiya',
supr->'svedFLBS'->'fio'->>'imya',
supr->'svedFLBS'->'fio'->>'otchestvo'
) AS full_name,
MAKE_DATE(
NULLIF(supr->'svedFLBS'->'dataRozhdDok'->>'god', '')::int,
NULLIF(supr->'svedFLBS'->'dataRozhdDok'->>'mesyacz', '')::int,
NULLIF(supr->'svedFLBS'->'dataRozhdDok'->>'den', '')::int
) AS birth_date,
(supr->'svedSmert'->'extend'->>'dataSmert')::date AS death_date,
supr->'svedSmert'->>'nomerZapis' AS death_az_number,
ri.info->'svedFL'->'svedSemPolozh'->'svedAZBrak'->>'nomerZapis' AS marriage_az_number,
(ri.info->'svedFL'->'svedSemPolozh'->'svedAZBrak'->>'dataBrakKalend')::date AS marriage_date,
ri.info->'svedFL'->'svedSemPolozh'->'svedAZRazvod'->>'nomerZapis' AS divorce_az_number,
(ri.info->'svedFL'->'svedSemPolozh'->'svedAZRazvod'->>'dataRastBrakKalend')::date AS divorce_date,
NULLIF(ri.info->'svedFL'->'svedSemPolozh'->>'semPolozh','')::int AS marital_status,
CASE
WHEN ri.info->'svedFL'->'svedSemPolozh'->>'svedSuprIskl' = '1' THEN true
ELSE false
END AS information_excluded,
ri.ctid as source_ctid
FROM ervu_dashboard.recruits_info ri
JOIN ervu_dashboard.citizen r ON r.recruit_id = ri.recruit_id
CROSS JOIN LATERAL (
SELECT supri
FROM jsonb_array_elements(ri.info->'svedFL'->'svedSemPolozh'->'suprugi') AS supri
WHERE jsonb_typeof(ri.info->'svedFL'->'svedSemPolozh'->'suprugi') = 'array'
) AS supr(supr)
WHERE
'${IDM_ID}' != ''
AND COALESCE(ri.current_recruitment_id, ri.target_recruitment_id) = '${IDM_ID}'
)
SELECT
*,
source_ctid = (SELECT MAX(source_ctid) FROM spouse_data) as last_row
FROM spouse_data;
</sql>
<variables_active>Y</variables_active>
<attributes/>
<GUI>
<xloc>352</xloc>
<yloc>576</yloc>
</GUI>
</transform>
<transform>
<name>spouse_sort_rows</name>
<type>SortRows</type>
<description/>
<distribute>N</distribute>
<custom_distribution/>
<copies>1</copies>
<partitioning>
<method>none</method>
<schema_name/>
</partitioning>
<directory>${java.io.tmpdir}</directory>
<prefix>out</prefix>
<sort_size>100000</sort_size>
<free_memory/>
<compress>N</compress>
<compress_variable/>
<unique_rows>Y</unique_rows>
<fields>
<field>
<case_insensitive>N</case_insensitive>
<name>recruit_id</name>
<ascending>Y</ascending>
<case_sensitive>N</case_sensitive>
<collator_enabled>N</collator_enabled>
<collator_strength>0</collator_strength>
<presorted>N</presorted>
</field>
</fields>
<reject_duplicate_row>N</reject_duplicate_row>
<attributes/>
<GUI>
<xloc>352</xloc>
<yloc>192</yloc>
</GUI>
</transform>
<transform>
<name>spouse_success_job_status</name>
<type>ExecSql</type>
<description/>
<distribute>Y</distribute>
<custom_distribution/>
<copies>1</copies>
<partitioning>
<method>none</method>
<schema_name/>
</partitioning>
<arguments>
</arguments>
<connection>ervu-dashboard</connection>
<execute_each_row>Y</execute_each_row>
<quoteString>N</quoteString>
<replace_variables>Y</replace_variables>
<set_params>N</set_params>
<single_statement>N</single_statement>
<sql>UPDATE etl.job_execution
SET status = 'SUCCESS',
execution_end_datetime = current_timestamp
WHERE job_name = '${JOB_NAME}'
and recruitment_id = '${IDM_ID}'
and status = 'PROCESSING';
</sql>
<attributes/>
<GUI>
<xloc>592</xloc>
<yloc>912</yloc>
<yloc>720</yloc>
</GUI>
</transform>
<transform>
<name>spouse_update_flags</name>
<type>Update</type>
<description/>
<distribute>Y</distribute>
<custom_distribution/>
<copies>1</copies>
<partitioning>
<method>none</method>
<schema_name/>
</partitioning>
<commit>10000</commit>
<connection>ervu-dashboard</connection>
<error_ignored>Y</error_ignored>
<ignore_flag_field/>
<lookup>
<key>
<condition>=</condition>
<field>recruit_id</field>
<name>recruit_id</name>
<name2/>
</key>
<key>
<condition>&lt;></condition>
<field>marital_status</field>
<name>marital_status</name>
<name2/>
</key>
<schema>ervu_dashboard</schema>
<table>citizen</table>
<value>
<name>marital_status</name>
<rename>marital_status</rename>
</value>
</lookup>
<skip_lookup>N</skip_lookup>
<use_batch>Y</use_batch>
<attributes/>
<GUI>
<xloc>576</xloc>
<yloc>32</yloc>
</GUI>
</transform>
<transform_error_handling>
@ -553,6 +592,18 @@ FROM spouse_data;
<max_pct_errors/>
<min_pct_rows/>
</error>
<error>
<source_transform>spouse_update_flags</source_transform>
<target_transform>spouse_error_job_status</target_transform>
<is_enabled>Y</is_enabled>
<nr_valuename/>
<descriptions_valuename>error_description</descriptions_valuename>
<fields_valuename/>
<codes_valuename/>
<max_errors/>
<max_pct_errors/>
<min_pct_rows/>
</error>
</transform_error_handling>
<attributes/>
</pipeline>

View file

@ -80,6 +80,16 @@
<to>spouse_success_job_status</to>
<enabled>Y</enabled>
</hop>
<hop>
<from>Sort rows</from>
<to>spouse_update_flags</to>
<enabled>Y</enabled>
</hop>
<hop>
<from>spouse_update_flags</from>
<to>spouse_error_job_status</to>
<enabled>Y</enabled>
</hop>
</order>
<transform>
<name>Abort</name>
@ -189,7 +199,7 @@ and recruitment_id = '${IDM_ID}';</sql>
<name>Sort rows</name>
<type>SortRows</type>
<description/>
<distribute>Y</distribute>
<distribute>N</distribute>
<custom_distribution/>
<copies>1</copies>
<partitioning>
@ -350,6 +360,7 @@ and recruitment_id = '${IDM_ID}';
</partitioning>
<connection>ervu-dashboard</connection>
<execute_each_row>N</execute_each_row>
<limit/>
<sql>WITH spouse_data AS (
SELECT
ri.recruit_id AS recruit_id,
@ -375,6 +386,7 @@ and recruitment_id = '${IDM_ID}';
(ri.info->'svedFL'->'svedSemPolozh'->'svedAZBrak'->>'dataBrakKalend')::date AS marriage_date,
ri.info->'svedFL'->'svedSemPolozh'->'svedAZRazvod'->>'nomerZapis' AS divorce_az_number,
(ri.info->'svedFL'->'svedSemPolozh'->'svedAZRazvod'->>'dataRastBrakKalend')::date AS divorce_date,
NULLIF(ri.info->'svedFL'->'svedSemPolozh'->>'semPolozh','')::int AS marital_status,
CASE
WHEN ri.info->'svedFL'->'svedSemPolozh'->>'svedSuprIskl' = '1' THEN true
ELSE false
@ -437,6 +449,46 @@ WHERE job_name = '${JOB_NAME}'
<yloc>688</yloc>
</GUI>
</transform>
<transform>
<name>spouse_update_flags</name>
<type>Update</type>
<description/>
<distribute>Y</distribute>
<custom_distribution/>
<copies>1</copies>
<partitioning>
<method>none</method>
<schema_name/>
</partitioning>
<commit>10000</commit>
<connection>ervu-dashboard</connection>
<error_ignored>Y</error_ignored>
<lookup>
<key>
<condition>=</condition>
<field>recruit_id</field>
<name>recruit_id</name>
</key>
<key>
<condition>&lt;></condition>
<field>marital_status</field>
<name>marital_status</name>
</key>
<schema>ervu_dashboard</schema>
<table>citizen</table>
<value>
<name>marital_status</name>
<rename>marital_status</rename>
</value>
</lookup>
<skip_lookup>N</skip_lookup>
<use_batch>Y</use_batch>
<attributes/>
<GUI>
<xloc>624</xloc>
<yloc>320</yloc>
</GUI>
</transform>
<transform>
<name>spouse_update_or_insert</name>
<type>InsertUpdate</type>
@ -544,6 +596,18 @@ WHERE job_name = '${JOB_NAME}'
<max_pct_errors/>
<min_pct_rows/>
</error>
<error>
<source_transform>spouse_update_flags</source_transform>
<target_transform>spouse_error_job_status</target_transform>
<is_enabled>Y</is_enabled>
<nr_valuename/>
<descriptions_valuename>error_description</descriptions_valuename>
<fields_valuename/>
<codes_valuename/>
<max_errors/>
<max_pct_errors/>
<min_pct_rows/>
</error>
<error>
<source_transform>spouse_update_or_insert</source_transform>
<target_transform>spouse_error_job_status</target_transform>

View file

@ -107,6 +107,16 @@ M_R_CR_DATE
<to>spouse_success_job_status</to>
<enabled>Y</enabled>
</hop>
<hop>
<from>Sort rows</from>
<to>spouse_update_flags</to>
<enabled>Y</enabled>
</hop>
<hop>
<from>spouse_update_flags</from>
<to>spouse_error_job_status</to>
<enabled>Y</enabled>
</hop>
</order>
<transform>
<name>Abort</name>
@ -231,7 +241,7 @@ DO UPDATE SET
<name>Sort rows</name>
<type>SortRows</type>
<description/>
<distribute>Y</distribute>
<distribute>N</distribute>
<custom_distribution/>
<copies>1</copies>
<partitioning>
@ -417,6 +427,7 @@ and recruitment_id = '${IDM_ID}';
(ri.info->'svedFL'->'svedSemPolozh'->'svedAZBrak'->>'dataBrakKalend')::date AS marriage_date,
ri.info->'svedFL'->'svedSemPolozh'->'svedAZRazvod'->>'nomerZapis' AS divorce_az_number,
(ri.info->'svedFL'->'svedSemPolozh'->'svedAZRazvod'->>'dataRastBrakKalend')::date AS divorce_date,
NULLIF(ri.info->'svedFL'->'svedSemPolozh'->>'semPolozh','')::int AS marital_status,
CASE
WHEN ri.info->'svedFL'->'svedSemPolozh'->>'svedSuprIskl' = '1' THEN true
ELSE false
@ -498,13 +509,11 @@ and status = 'PROCESSING';
<condition>=</condition>
<field>recruit_id</field>
<name>recruit_id</name>
<name2/>
</key>
<key>
<condition>=</condition>
<field>spouse_external_id</field>
<name>spouse_external_id</name>
<name2/>
</key>
<schema>ervu_dashboard</schema>
<table>citizen_spouse</table>
@ -576,6 +585,46 @@ and status = 'PROCESSING';
<yloc>528</yloc>
</GUI>
</transform>
<transform>
<name>spouse_update_flags</name>
<type>Update</type>
<description/>
<distribute>Y</distribute>
<custom_distribution/>
<copies>1</copies>
<partitioning>
<method>none</method>
<schema_name/>
</partitioning>
<commit>10000</commit>
<connection>ervu-dashboard</connection>
<error_ignored>Y</error_ignored>
<lookup>
<key>
<condition>=</condition>
<field>recruit_id</field>
<name>recruit_id</name>
</key>
<key>
<condition>&lt;></condition>
<field>marital_status</field>
<name>marital_status</name>
</key>
<schema>ervu_dashboard</schema>
<table>citizen</table>
<value>
<name>marital_status</name>
<rename>marital_status</rename>
</value>
</lookup>
<skip_lookup>N</skip_lookup>
<use_batch>Y</use_batch>
<attributes/>
<GUI>
<xloc>512</xloc>
<yloc>368</yloc>
</GUI>
</transform>
<transform_error_handling>
<error>
<source_transform>marriage_registry_update_or_insert</source_transform>
@ -601,6 +650,18 @@ and status = 'PROCESSING';
<max_pct_errors/>
<min_pct_rows/>
</error>
<error>
<source_transform>spouse_update_flags</source_transform>
<target_transform>spouse_error_job_status</target_transform>
<is_enabled>Y</is_enabled>
<nr_valuename/>
<descriptions_valuename>error_description</descriptions_valuename>
<fields_valuename/>
<codes_valuename/>
<max_errors/>
<max_pct_errors/>
<min_pct_rows/>
</error>
</transform_error_handling>
<attributes/>
</pipeline>