Merge branch 'develop' of 10.10.31.70:/ervu-dashboard-etl into develop

This commit is contained in:
Ruslan 2025-09-29 21:04:11 +05:00
commit e6a58272aa
3 changed files with 509 additions and 34 deletions

View file

@ -50,11 +50,6 @@
<to>Change job status on error</to>
<enabled>Y</enabled>
</hop>
<hop>
<from>incapacity_output</from>
<to>Filter rows</to>
<enabled>Y</enabled>
</hop>
<hop>
<from>incapacity_input</from>
<to>Identify last row in a stream</to>
@ -65,6 +60,31 @@
<to>Change job status on success</to>
<enabled>Y</enabled>
</hop>
<!-- Hops wiring the citizen flag update into the pipeline:
     incapacity_output feeds Block until transforms finish, which feeds Filter rows;
     Identify last row in a stream feeds Group by, which feeds Update;
     Update also has a hop to Change job status on error.
     NOTE(review): if Identify last row in a stream has another outgoing hop besides
     the one to Group by, confirm that it copies rows to both targets rather than
     distributing them. -->
<hop>
<from>incapacity_output</from>
<to>Block until transforms finish</to>
<enabled>Y</enabled>
</hop>
<hop>
<from>Block until transforms finish</from>
<to>Filter rows</to>
<enabled>Y</enabled>
</hop>
<hop>
<from>Identify last row in a stream</from>
<to>Group by</to>
<enabled>Y</enabled>
</hop>
<hop>
<from>Group by</from>
<to>Update</to>
<enabled>Y</enabled>
</hop>
<hop>
<from>Update</from>
<to>Change job status on error</to>
<enabled>Y</enabled>
</hop>
</order>
<transform>
<name>Abort</name>
@ -82,8 +102,30 @@
<row_threshold>0</row_threshold>
<attributes/>
<GUI>
<xloc>1216</xloc>
<yloc>576</yloc>
<xloc>1424</xloc>
<yloc>560</yloc>
</GUI>
</transform>
<!-- BlockUntilTransformsFinish transform. Its watch list names a single transform, Update.
     NOTE(review): presumably rows arriving here are held back until Update has finished,
     so that the downstream status check only runs after the citizen flag is written;
     confirm against the Hop documentation for this transform type. -->
<transform>
<name>Block until transforms finish</name>
<type>BlockUntilTransformsFinish</type>
<description/>
<distribute>Y</distribute>
<custom_distribution/>
<copies>1</copies>
<partitioning>
<method>none</method>
<schema_name/>
</partitioning>
<!-- Transforms this one waits on. -->
<transforms>
<transform>
<name>Update</name>
</transform>
</transforms>
<attributes/>
<GUI>
<xloc>1168</xloc>
<yloc>416</yloc>
</GUI>
</transform>
<transform>
@ -117,8 +159,8 @@ and recruitment_id = '${IDM_ID}';
</sql>
<attributes/>
<GUI>
<xloc>992</xloc>
<yloc>576</yloc>
<xloc>1136</xloc>
<yloc>560</yloc>
</GUI>
</transform>
<transform>
@ -148,7 +190,7 @@ and recruitment_id = '${IDM_ID}';
</sql>
<attributes/>
<GUI>
<xloc>1216</xloc>
<xloc>1424</xloc>
<yloc>240</yloc>
</GUI>
</transform>
@ -230,10 +272,80 @@ VALUES (DEFAULT, '${JOB_NAME}', 'PROCESSING', DEFAULT, null, '${IDM_ID}');
<send_true_to>Change job status on success</send_true_to>
<attributes/>
<GUI>
<xloc>1216</xloc>
<xloc>1424</xloc>
<yloc>416</yloc>
</GUI>
</transform>
<!-- GroupBy transform: groups the stream on recruit_id and carries each listed field
     through under its own name using the FIRST_INCL_NULL aggregate type.
     The Update transform in this pipeline only references recruit_id and
     has_incapacity_info from the result.
     NOTE(review): Hop's GroupBy is usually documented as requiring input sorted on the
     group key; no sort transform is visible in this view. Confirm upstream ordering. -->
<transform>
<name>Group by</name>
<type>GroupBy</type>
<description/>
<distribute>Y</distribute>
<custom_distribution/>
<copies>1</copies>
<partitioning>
<method>none</method>
<schema_name/>
</partitioning>
<add_linenr>N</add_linenr>
<all_rows>N</all_rows>
<directory>${java.io.tmpdir}</directory>
<!-- Aggregated fields: output name (aggregate) equals input name (subject) for each one. -->
<fields>
<field>
<aggregate>has_incapacity_info</aggregate>
<subject>has_incapacity_info</subject>
<type>FIRST_INCL_NULL</type>
</field>
<field>
<aggregate>source_id</aggregate>
<subject>source_id</subject>
<type>FIRST_INCL_NULL</type>
</field>
<field>
<aggregate>source_update_date</aggregate>
<subject>source_update_date</subject>
<type>FIRST_INCL_NULL</type>
</field>
<field>
<aggregate>capacity_status</aggregate>
<subject>capacity_status</subject>
<type>FIRST_INCL_NULL</type>
</field>
<field>
<aggregate>court_name</aggregate>
<subject>court_name</subject>
<type>FIRST_INCL_NULL</type>
</field>
<field>
<aggregate>register_date</aggregate>
<subject>register_date</subject>
<type>FIRST_INCL_NULL</type>
</field>
<field>
<aggregate>deregistration_date</aggregate>
<subject>deregistration_date</subject>
<type>FIRST_INCL_NULL</type>
</field>
<field>
<aggregate>last_row</aggregate>
<subject>last_row</subject>
<type>FIRST_INCL_NULL</type>
</field>
</fields>
<give_back_row>N</give_back_row>
<!-- Grouping key. -->
<group>
<field>
<name>recruit_id</name>
</field>
</group>
<ignore_aggregate>N</ignore_aggregate>
<prefix>grp</prefix>
<attributes/>
<GUI>
<xloc>704</xloc>
<yloc>560</yloc>
</GUI>
</transform>
<transform>
<name>Identify last row in a stream</name>
<type>DetectLastRow</type>
@ -252,6 +364,41 @@ VALUES (DEFAULT, '${JOB_NAME}', 'PROCESSING', DEFAULT, null, '${IDM_ID}');
<yloc>416</yloc>
</GUI>
</transform>
<!-- Update transform: writes has_incapacity_info into ervu_dashboard.citizen over the
     ervu-dashboard connection, matching on recruit_id = recruit_id.
     Commit size is 10000, batch mode is on, and errors are not ignored. -->
<transform>
<name>Update</name>
<type>Update</type>
<description/>
<distribute>Y</distribute>
<custom_distribution/>
<copies>1</copies>
<partitioning>
<method>none</method>
<schema_name/>
</partitioning>
<commit>10000</commit>
<connection>ervu-dashboard</connection>
<error_ignored>N</error_ignored>
<lookup>
<!-- Match key: table column and stream field are both named recruit_id. -->
<key>
<condition>=</condition>
<field>recruit_id</field>
<name>recruit_id</name>
</key>
<schema>ervu_dashboard</schema>
<table>citizen</table>
<!-- The only column written by this transform. -->
<value>
<name>has_incapacity_info</name>
<rename>has_incapacity_info</rename>
</value>
</lookup>
<!-- NOTE(review): skip_lookup = Y presumably issues the update without a prior lookup
     query; confirm against the Hop Update transform documentation. -->
<skip_lookup>Y</skip_lookup>
<use_batch>Y</use_batch>
<attributes/>
<GUI>
<xloc>912</xloc>
<yloc>560</yloc>
</GUI>
</transform>
<transform>
<name>incapacity_input</name>
<type>TableInput</type>
@ -265,13 +412,12 @@ VALUES (DEFAULT, '${JOB_NAME}', 'PROCESSING', DEFAULT, null, '${IDM_ID}');
</partitioning>
<connection>ervu-dashboard</connection>
<execute_each_row>N</execute_each_row>
<limit/>
<sql>WITH filteredData AS (
SELECT ri.recruit_id,
ri.info -> 'svedFL' -> 'svedNedeesp' -> 'nedeesposob' as nedeesposob_arr
FROM recruits_info ri
WHERE jsonb_typeof(ri.info -> 'svedFL' -> 'svedNedeesp' -> 'nedeesposob') = 'array'
AND COALESCE(ri.current_recruitment_id, ri.target_recruitment_id) = '${IDM_ID}'
-- AND COALESCE(ri.current_recruitment_id, ri.target_recruitment_id) = '${IDM_ID}'
)
SELECT fd.recruit_id,
nedeesposob_elem ->> 'id' AS source_id,
@ -279,7 +425,8 @@ SELECT fd.recruit_id,
nedeesposob_elem ->> 'naimStatus' AS capacity_status,
nedeesposob_elem ->> 'naimOrg' AS court_name,
to_date(nedeesposob_elem ->> 'dataPrisv', 'YYYY-MM-DD') AS register_date,
to_date(nedeesposob_elem ->> 'dataOkonch', 'YYYY-MM-DD') AS deregistration_date
to_date(nedeesposob_elem ->> 'dataOkonch', 'YYYY-MM-DD') AS deregistration_date,
true as has_incapacity_info
FROM filteredData fd
CROSS JOIN LATERAL jsonb_array_elements(fd.nedeesposob_arr -> 'svedFL' -> 'svedNedeesp' -> 'nedeesposob') AS nedeesposob_elem;</sql>
<variables_active>Y</variables_active>
@ -347,11 +494,23 @@ FROM filteredData fd
<use_batch>Y</use_batch>
<attributes/>
<GUI>
<xloc>992</xloc>
<xloc>912</xloc>
<yloc>416</yloc>
</GUI>
</transform>
<transform_error_handling>
<!-- Error handling for Update: enabled, failed rows go to Change job status on error.
     The error description is written to the field error_description; the remaining
     value names and the error thresholds are left empty. -->
<error>
<source_transform>Update</source_transform>
<target_transform>Change job status on error</target_transform>
<is_enabled>Y</is_enabled>
<nr_valuename/>
<descriptions_valuename>error_description</descriptions_valuename>
<fields_valuename/>
<codes_valuename/>
<max_errors/>
<max_pct_errors/>
<min_pct_rows/>
</error>
<error>
<source_transform>incapacity_output</source_transform>
<target_transform>Change job status on error</target_transform>

View file

@ -56,13 +56,33 @@
<enabled>Y</enabled>
</hop>
<hop>
<from>incapacity_upsert</from>
<to>Filter rows</to>
<from>Detect empty stream</from>
<to>Change job status on success</to>
<enabled>Y</enabled>
</hop>
<hop>
<from>Detect empty stream</from>
<to>Change job status on success</to>
<from>Group by</from>
<to>Update</to>
<enabled>Y</enabled>
</hop>
<!-- Hops for the citizen flag update path:
     Update has a hop to Change job status on error;
     Identify last row in a stream feeds Group by;
     incapacity_upsert feeds Block until transforms finish, which feeds Filter rows.
     NOTE(review): if Identify last row in a stream has another outgoing hop besides
     the one to Group by, confirm that it copies rows to both targets rather than
     distributing them. -->
<hop>
<from>Update</from>
<to>Change job status on error</to>
<enabled>Y</enabled>
</hop>
<hop>
<from>Identify last row in a stream</from>
<to>Group by</to>
<enabled>Y</enabled>
</hop>
<hop>
<from>incapacity_upsert</from>
<to>Block until transforms finish</to>
<enabled>Y</enabled>
</hop>
<hop>
<from>Block until transforms finish</from>
<to>Filter rows</to>
<enabled>Y</enabled>
</hop>
</order>
@ -82,7 +102,7 @@
<row_threshold>0</row_threshold>
<attributes/>
<GUI>
<xloc>1744</xloc>
<xloc>1888</xloc>
<yloc>560</yloc>
</GUI>
</transform>
@ -117,7 +137,7 @@ and recruitment_id = '${IDM_ID}';
</sql>
<attributes/>
<GUI>
<xloc>1520</xloc>
<xloc>1568</xloc>
<yloc>560</yloc>
</GUI>
</transform>
@ -148,7 +168,7 @@ and recruitment_id = '${IDM_ID}';
</sql>
<attributes/>
<GUI>
<xloc>1744</xloc>
<xloc>1888</xloc>
<yloc>208</yloc>
</GUI>
</transform>
@ -233,10 +253,80 @@ and recruitment_id = '${IDM_ID}';</sql>
<send_true_to>Change job status on success</send_true_to>
<attributes/>
<GUI>
<xloc>1744</xloc>
<xloc>1888</xloc>
<yloc>384</yloc>
</GUI>
</transform>
<!-- GroupBy transform: groups the stream on recruit_id and carries each listed field
     through under its own name using the FIRST_INCL_NULL aggregate type.
     The Update transform in this pipeline only references recruit_id and
     has_incapacity_info from the result.
     NOTE(review): Hop's GroupBy is usually documented as requiring input sorted on the
     group key; no sort transform is visible in this view. Confirm upstream ordering. -->
<transform>
<name>Group by</name>
<type>GroupBy</type>
<description/>
<distribute>Y</distribute>
<custom_distribution/>
<copies>1</copies>
<partitioning>
<method>none</method>
<schema_name/>
</partitioning>
<add_linenr>N</add_linenr>
<all_rows>N</all_rows>
<directory>${java.io.tmpdir}</directory>
<!-- Aggregated fields: output name (aggregate) equals input name (subject) for each one. -->
<fields>
<field>
<aggregate>has_incapacity_info</aggregate>
<subject>has_incapacity_info</subject>
<type>FIRST_INCL_NULL</type>
</field>
<field>
<aggregate>source_id</aggregate>
<subject>source_id</subject>
<type>FIRST_INCL_NULL</type>
</field>
<field>
<aggregate>source_update_date</aggregate>
<subject>source_update_date</subject>
<type>FIRST_INCL_NULL</type>
</field>
<field>
<aggregate>capacity_status</aggregate>
<subject>capacity_status</subject>
<type>FIRST_INCL_NULL</type>
</field>
<field>
<aggregate>court_name</aggregate>
<subject>court_name</subject>
<type>FIRST_INCL_NULL</type>
</field>
<field>
<aggregate>register_date</aggregate>
<subject>register_date</subject>
<type>FIRST_INCL_NULL</type>
</field>
<field>
<aggregate>deregistration_date</aggregate>
<subject>deregistration_date</subject>
<type>FIRST_INCL_NULL</type>
</field>
<field>
<aggregate>last_row</aggregate>
<subject>last_row</subject>
<type>FIRST_INCL_NULL</type>
</field>
</fields>
<give_back_row>N</give_back_row>
<!-- Grouping key. -->
<group>
<field>
<name>recruit_id</name>
</field>
</group>
<ignore_aggregate>N</ignore_aggregate>
<prefix>grp</prefix>
<attributes/>
<GUI>
<xloc>1232</xloc>
<yloc>560</yloc>
</GUI>
</transform>
<transform>
<name>Identify last row in a stream</name>
<type>DetectLastRow</type>
@ -255,6 +345,41 @@ and recruitment_id = '${IDM_ID}';</sql>
<yloc>384</yloc>
</GUI>
</transform>
<!-- Update transform: writes has_incapacity_info into ervu_dashboard.citizen over the
     ervu-dashboard connection, matching on recruit_id = recruit_id.
     Commit size is 10000, batch mode is on, and errors are not ignored. -->
<transform>
<name>Update</name>
<type>Update</type>
<description/>
<distribute>Y</distribute>
<custom_distribution/>
<copies>1</copies>
<partitioning>
<method>none</method>
<schema_name/>
</partitioning>
<commit>10000</commit>
<connection>ervu-dashboard</connection>
<error_ignored>N</error_ignored>
<lookup>
<!-- Match key: table column and stream field are both named recruit_id. -->
<key>
<condition>=</condition>
<field>recruit_id</field>
<name>recruit_id</name>
</key>
<schema>ervu_dashboard</schema>
<table>citizen</table>
<!-- The only column written by this transform. -->
<value>
<name>has_incapacity_info</name>
<rename>has_incapacity_info</rename>
</value>
</lookup>
<!-- NOTE(review): skip_lookup = Y presumably issues the update without a prior lookup
     query; confirm against the Hop Update transform documentation. -->
<skip_lookup>Y</skip_lookup>
<use_batch>Y</use_batch>
<attributes/>
<GUI>
<xloc>1392</xloc>
<yloc>560</yloc>
</GUI>
</transform>
<transform>
<name>incapacity_input</name>
<type>TableInput</type>
@ -268,7 +393,6 @@ and recruitment_id = '${IDM_ID}';</sql>
</partitioning>
<connection>ervu-dashboard</connection>
<execute_each_row>N</execute_each_row>
<limit/>
<sql>WITH filteredData AS (SELECT ri.recruit_id,
ri.info -> 'svedFL' -> 'svedNedeesp' -> 'nedeesposob' as nedeesposob_arr
FROM recruits_info ri
@ -351,11 +475,45 @@ WHERE to_date(nedeesposob_elem ->> 'dataSved', 'YYYY-MM-DD') > '${MAX_SOURCE_UPD
<update_bypassed>N</update_bypassed>
<attributes/>
<GUI>
<xloc>1520</xloc>
<xloc>1568</xloc>
<yloc>384</yloc>
</GUI>
</transform>
<!-- BlockUntilTransformsFinish transform. Its watch list names a single transform, Update.
     NOTE(review): presumably rows arriving here are held back until Update has finished,
     so that the downstream status check only runs after the citizen flag is written;
     confirm against the Hop documentation for this transform type. -->
<transform>
<name>Block until transforms finish</name>
<type>BlockUntilTransformsFinish</type>
<description/>
<distribute>Y</distribute>
<custom_distribution/>
<copies>1</copies>
<partitioning>
<method>none</method>
<schema_name/>
</partitioning>
<!-- Transforms this one waits on. -->
<transforms>
<transform>
<name>Update</name>
</transform>
</transforms>
<attributes/>
<GUI>
<xloc>1744</xloc>
<yloc>384</yloc>
</GUI>
</transform>
<transform_error_handling>
<!-- Error handling for Update: enabled, failed rows go to Change job status on error.
     All value names, including the error description field, and all thresholds are empty.
     NOTE(review): a sibling pipeline in the same commit sets descriptions_valuename to
     error_description for its Update entry; confirm the empty value here is intentional
     and that the target transform does not expect that field. -->
<error>
<source_transform>Update</source_transform>
<target_transform>Change job status on error</target_transform>
<is_enabled>Y</is_enabled>
<nr_valuename/>
<descriptions_valuename/>
<fields_valuename/>
<codes_valuename/>
<max_errors/>
<max_pct_errors/>
<min_pct_rows/>
</error>
<error>
<source_transform>incapacity_upsert</source_transform>
<target_transform>Change job status on error</target_transform>

View file

@ -50,11 +50,6 @@
<to>incapacity_upsert</to>
<enabled>Y</enabled>
</hop>
<hop>
<from>incapacity_upsert</from>
<to>Filter rows</to>
<enabled>Y</enabled>
</hop>
<hop>
<from>incapacity_upsert</from>
<to>Change job status on error</to>
@ -65,6 +60,31 @@
<to>Change job status on success</to>
<enabled>Y</enabled>
</hop>
<!-- Hops wiring the citizen flag update into the pipeline:
     Group by feeds Update, and Update has a hop to Change job status on error;
     Identify last row in a stream feeds Group by;
     incapacity_upsert feeds Block until transforms finish, which feeds Filter rows.
     NOTE(review): if Identify last row in a stream has another outgoing hop besides
     the one to Group by, confirm that it copies rows to both targets rather than
     distributing them. -->
<hop>
<from>Group by</from>
<to>Update</to>
<enabled>Y</enabled>
</hop>
<hop>
<from>Update</from>
<to>Change job status on error</to>
<enabled>Y</enabled>
</hop>
<hop>
<from>Identify last row in a stream</from>
<to>Group by</to>
<enabled>Y</enabled>
</hop>
<hop>
<from>incapacity_upsert</from>
<to>Block until transforms finish</to>
<enabled>Y</enabled>
</hop>
<hop>
<from>Block until transforms finish</from>
<to>Filter rows</to>
<enabled>Y</enabled>
</hop>
</order>
<transform>
<name>Abort</name>
@ -82,7 +102,7 @@
<row_threshold>0</row_threshold>
<attributes/>
<GUI>
<xloc>1344</xloc>
<xloc>1424</xloc>
<yloc>640</yloc>
</GUI>
</transform>
@ -148,7 +168,7 @@ and recruitment_id = '${IDM_ID}';
</sql>
<attributes/>
<GUI>
<xloc>1344</xloc>
<xloc>1424</xloc>
<yloc>256</yloc>
</GUI>
</transform>
@ -233,7 +253,7 @@ and recruitment_id = '${IDM_ID}';</sql>
<send_true_to>Change job status on success</send_true_to>
<attributes/>
<GUI>
<xloc>1344</xloc>
<xloc>1424</xloc>
<yloc>432</yloc>
</GUI>
</transform>
@ -268,7 +288,6 @@ and recruitment_id = '${IDM_ID}';</sql>
</partitioning>
<connection>ervu-dashboard</connection>
<execute_each_row>N</execute_each_row>
<limit/>
<sql>WITH filteredData AS (
SELECT ri.recruit_id,
ri.info -> 'svedFL' -> 'svedNedeesp' -> 'nedeesposob' as nedeesposob_arr
@ -356,6 +375,133 @@ FROM filteredData fd
<yloc>432</yloc>
</GUI>
</transform>
<!-- GroupBy transform: groups the stream on recruit_id and carries each listed field
     through under its own name using the FIRST_INCL_NULL aggregate type.
     The Update transform in this pipeline only references recruit_id and
     has_incapacity_info from the result.
     NOTE(review): Hop's GroupBy is usually documented as requiring input sorted on the
     group key; no sort transform is visible in this view. Confirm upstream ordering. -->
<transform>
<name>Group by</name>
<type>GroupBy</type>
<description/>
<distribute>Y</distribute>
<custom_distribution/>
<copies>1</copies>
<partitioning>
<method>none</method>
<schema_name/>
</partitioning>
<add_linenr>N</add_linenr>
<all_rows>N</all_rows>
<directory>${java.io.tmpdir}</directory>
<!-- Aggregated fields: output name (aggregate) equals input name (subject) for each one. -->
<fields>
<field>
<aggregate>has_incapacity_info</aggregate>
<subject>has_incapacity_info</subject>
<type>FIRST_INCL_NULL</type>
</field>
<field>
<aggregate>source_id</aggregate>
<subject>source_id</subject>
<type>FIRST_INCL_NULL</type>
</field>
<field>
<aggregate>source_update_date</aggregate>
<subject>source_update_date</subject>
<type>FIRST_INCL_NULL</type>
</field>
<field>
<aggregate>capacity_status</aggregate>
<subject>capacity_status</subject>
<type>FIRST_INCL_NULL</type>
</field>
<field>
<aggregate>court_name</aggregate>
<subject>court_name</subject>
<type>FIRST_INCL_NULL</type>
</field>
<field>
<aggregate>register_date</aggregate>
<subject>register_date</subject>
<type>FIRST_INCL_NULL</type>
</field>
<field>
<aggregate>deregistration_date</aggregate>
<subject>deregistration_date</subject>
<type>FIRST_INCL_NULL</type>
</field>
<field>
<aggregate>last_row</aggregate>
<subject>last_row</subject>
<type>FIRST_INCL_NULL</type>
</field>
</fields>
<give_back_row>N</give_back_row>
<!-- Grouping key. -->
<group>
<field>
<name>recruit_id</name>
</field>
</group>
<ignore_aggregate>N</ignore_aggregate>
<prefix>grp</prefix>
<attributes/>
<GUI>
<xloc>832</xloc>
<yloc>640</yloc>
</GUI>
</transform>
<!-- Update transform: writes has_incapacity_info into ervu_dashboard.citizen over the
     ervu-dashboard connection, matching on recruit_id = recruit_id.
     Commit size is 10000, batch mode is on, and errors are not ignored. -->
<transform>
<name>Update</name>
<type>Update</type>
<description/>
<distribute>Y</distribute>
<custom_distribution/>
<copies>1</copies>
<partitioning>
<method>none</method>
<schema_name/>
</partitioning>
<commit>10000</commit>
<connection>ervu-dashboard</connection>
<error_ignored>N</error_ignored>
<lookup>
<!-- Match key: table column and stream field are both named recruit_id. -->
<key>
<condition>=</condition>
<field>recruit_id</field>
<name>recruit_id</name>
</key>
<schema>ervu_dashboard</schema>
<table>citizen</table>
<!-- The only column written by this transform. -->
<value>
<name>has_incapacity_info</name>
<rename>has_incapacity_info</rename>
</value>
</lookup>
<!-- NOTE(review): skip_lookup = Y presumably issues the update without a prior lookup
     query; confirm against the Hop Update transform documentation. -->
<skip_lookup>Y</skip_lookup>
<use_batch>Y</use_batch>
<attributes/>
<GUI>
<xloc>976</xloc>
<yloc>640</yloc>
</GUI>
</transform>
<!-- BlockUntilTransformsFinish transform. Its watch list names a single transform, Update.
     NOTE(review): presumably rows arriving here are held back until Update has finished,
     so that the downstream status check only runs after the citizen flag is written;
     confirm against the Hop documentation for this transform type. -->
<transform>
<name>Block until transforms finish</name>
<type>BlockUntilTransformsFinish</type>
<description/>
<distribute>Y</distribute>
<custom_distribution/>
<copies>1</copies>
<partitioning>
<method>none</method>
<schema_name/>
</partitioning>
<!-- Transforms this one waits on. -->
<transforms>
<transform>
<name>Update</name>
</transform>
</transforms>
<attributes/>
<GUI>
<xloc>1280</xloc>
<yloc>432</yloc>
</GUI>
</transform>
<transform_error_handling>
<error>
<source_transform>incapacity_upsert</source_transform>
@ -369,6 +515,18 @@ FROM filteredData fd
<max_pct_errors/>
<min_pct_rows/>
</error>
<!-- Error handling for Update: enabled, failed rows go to Change job status on error.
     All value names, including the error description field, and all thresholds are empty.
     NOTE(review): a sibling pipeline in the same commit sets descriptions_valuename to
     error_description for its Update entry; confirm the empty value here is intentional
     and that the target transform does not expect that field. -->
<error>
<source_transform>Update</source_transform>
<target_transform>Change job status on error</target_transform>
<is_enabled>Y</is_enabled>
<nr_valuename/>
<descriptions_valuename/>
<fields_valuename/>
<codes_valuename/>
<max_errors/>
<max_pct_errors/>
<min_pct_rows/>
</error>
</transform_error_handling>
<attributes/>
</pipeline>