From 9f22b174181084b6cee82a27156716e7ce2b7d65 Mon Sep 17 00:00:00 2001 From: "adel.ka" Date: Wed, 12 Nov 2025 17:37:34 +0300 Subject: [PATCH] fix property drivers_licence --- .../parallel/drivers_licence_flow.hpl | 261 +++--- .../parallel/drivers_licence_flow_delta.hpl | 339 ++++---- .../parallel/drivers_licence_flow_repeat.hpl | 62 +- .../property/parallel/property_flow.hpl | 700 ++++++++-------- .../property/parallel/property_flow_delta.hpl | 757 +++++++++--------- .../parallel/property_flow_repeat.hpl | 470 ++++++----- 6 files changed, 1248 insertions(+), 1341 deletions(-) diff --git a/mappings/info_recruits/citizen_tables/drivers_licence/parallel/drivers_licence_flow.hpl b/mappings/info_recruits/citizen_tables/drivers_licence/parallel/drivers_licence_flow.hpl index d53be1a..e25cc9c 100644 --- a/mappings/info_recruits/citizen_tables/drivers_licence/parallel/drivers_licence_flow.hpl +++ b/mappings/info_recruits/citizen_tables/drivers_licence/parallel/drivers_licence_flow.hpl @@ -60,16 +60,6 @@ driver_licence_group_by_recruit_id Y - - driver_licence_group_by_recruit_id - driver_licence_java_expression - Y - - - driver_licence_java_expression - update_drivers_licence_flags - Y - drivers_licence_output Identify last row in a stream @@ -85,6 +75,11 @@ driver_licence_success_job Y + + driver_licence_group_by_recruit_id + update_drivers_licence_flags + Y + Abort @@ -106,115 +101,6 @@ 512 - - driver_license_failure_job - ExecSql - - Y - - 1 - - none - - - - - error_description - - - ervu-dashboard - - Y - - N - - Y - Y - N - UPDATE etl.job_execution -SET status = 'ERROR', - error_description = ?, - execution_end_datetime = current_timestamp -WHERE job_name = '${JOB_NAME}' -and recruitment_id = '${IDM_ID}'; - - - - - - 1376 - 512 - - - - driver_licence_success_job - ExecSql - - Y - - 1 - - none - - - - - ervu-dashboard - - Y - - N - - Y - N - N - UPDATE etl.job_execution -SET status = 'SUCCESS', - execution_end_datetime = current_timestamp -WHERE job_name = '${JOB_NAME}' -and recruitment_id = '${IDM_ID}'; - - - - - - 816 - 208 - - - - driver_lisence_job_create - ExecSql - - Y - - 1 - - none - - - - - ervu-dashboard - - N - - N - - Y - N - N - INSERT INTO etl.job_execution (id, job_name, status, execution_datetime, error_description, recruitment_id) -VALUES (DEFAULT, '${JOB_NAME}', 'PROCESSING', DEFAULT, null, '${IDM_ID}'); - - - - - - 288 - 512 - - Detect empty stream DetectEmptyStream @@ -286,39 +172,6 @@ VALUES (DEFAULT, '${JOB_NAME}', 'PROCESSING', DEFAULT, null, '${IDM_ID}'); 352 - - driver_licence_java_expression - Janino - - Y - - 1 - - none - - - - has_driver_license - has_driver_license_raw == 1 - Boolean - -1 - -1 - - - - has_tractor_license - has_tractor_license_raw == 1 - Boolean - -1 - -1 - - - - - 848 - 752 - - driver_licence_group_by_recruit_id GroupBy @@ -335,14 +188,16 @@ VALUES (DEFAULT, '${JOB_NAME}', 'PROCESSING', DEFAULT, null, '${IDM_ID}'); ${java.io.tmpdir} - has_driver_license_raw + has_driver_license is_has_driver_license MAX + - has_tractor_license_raw + has_tractor_license is_has_tractor_license MAX + N @@ -352,6 +207,7 @@ VALUES (DEFAULT, '${JOB_NAME}', 'PROCESSING', DEFAULT, null, '${IDM_ID}'); N + grp @@ -359,6 +215,103 @@ VALUES (DEFAULT, '${JOB_NAME}', 'PROCESSING', DEFAULT, null, '${IDM_ID}'); 752 + + driver_licence_success_job + ExecSql + + Y + + 1 + + none + + + + + ervu-dashboard + Y + N + Y + N + N + UPDATE etl.job_execution +SET status = 'SUCCESS', + execution_end_datetime = current_timestamp +WHERE job_name = '${JOB_NAME}' +and recruitment_id = '${IDM_ID}'; + + + + + 816 + 208 + + + + driver_license_failure_job + ExecSql + + Y + + 1 + + none + + + + + error_description + + + ervu-dashboard + Y + N + Y + Y + N + UPDATE etl.job_execution +SET status = 'ERROR', + error_description = ?, + execution_end_datetime = current_timestamp +WHERE job_name = '${JOB_NAME}' +and recruitment_id = '${IDM_ID}'; + + + + + 1376 + 512 + + + + driver_lisence_job_create + ExecSql + + Y + + 1 + + none + + + + + ervu-dashboard + N + N + Y + N + N + INSERT INTO etl.job_execution (id, job_name, status, execution_datetime, error_description, recruitment_id) +VALUES (DEFAULT, '${JOB_NAME}', 'PROCESSING', DEFAULT, null, '${IDM_ID}'); + + + + + 288 + 512 + + drivers_licence_input TableInput diff --git a/mappings/info_recruits/citizen_tables/drivers_licence/parallel/drivers_licence_flow_delta.hpl b/mappings/info_recruits/citizen_tables/drivers_licence/parallel/drivers_licence_flow_delta.hpl index c46594e..0071305 100644 --- a/mappings/info_recruits/citizen_tables/drivers_licence/parallel/drivers_licence_flow_delta.hpl +++ b/mappings/info_recruits/citizen_tables/drivers_licence/parallel/drivers_licence_flow_delta.hpl @@ -30,21 +30,11 @@ Table input 2 Y - - Detect empty stream 2 - driver_licence_success_job - Y - driver_licence_insert_or_update driver_lisence_failure_job Y - - Table input 2 - Detect empty stream 2 - Y - update_driver_licence_flag driver_lisence_failure_job @@ -55,21 +45,11 @@ driver_licence_need_update_filter Y - - driver_licence_group_by_recruit_id - driver_licence_java_expression - Y - Table input 2 driver_licence_group_by_recruit_id Y - - driver_licence_java_expression - update_driver_licence_flag - Y - Table input 2 driver_licence_need_delete_filter @@ -80,6 +60,21 @@ driver_licence_insert_or_update Y + + Identify last row in a stream 2 2 + Filter rows 2 + Y + + + driver_licence_group_by_recruit_id + update_driver_licence_flag + Y + + + Filter rows 2 + driver_licence_success_job + Y + driver_licence_need_delete_filter delete_act_tractor @@ -87,32 +82,27 @@ delete_act_tractor - driver_lisence_failure_job + Detect empty stream Y - Filter rows 2 - Unique rows - Y - - - Unique rows - driver_licence_success_job + Detect empty stream + Identify last row in a stream 2 2 Y driver_licence_insert_or_update + Detect empty stream 2 + Y + + + Detect empty stream 2 Identify last row in a stream 2 2 Y delete_act_tractor - Identify last row in a stream 2 2 - Y - - - Identify last row in a stream 2 2 - Filter rows 2 + driver_lisence_failure_job Y @@ -137,8 +127,8 @@ - driver_license_job_create - ExecSql + Detect empty stream + DetectEmptyStream Y @@ -147,29 +137,10 @@ none - - - ervu-dashboard - - N - - N - - Y - N - N - UPDATE etl.job_execution -SET - status = 'DELTA_PROCESSING', - execution_datetime = DEFAULT, - error_description = NULL -where job_name = '${JOB_NAME}' -and recruitment_id = '${IDM_ID}'; - - 432 - 1264 + 1408 + 1136 @@ -185,8 +156,8 @@ and recruitment_id = '${IDM_ID}'; - 768 - 976 + 1568 + 1232 @@ -218,11 +189,11 @@ and recruitment_id = '${IDM_ID}'; - Unique rows + driver_licence_success_job - 1744 - 1184 + 1760 + 1136 @@ -239,8 +210,8 @@ and recruitment_id = '${IDM_ID}'; last_row - 1552 - 1184 + 1568 + 1136 @@ -256,7 +227,6 @@ and recruitment_id = '${IDM_ID}'; ervu-dashboard N - WITH base AS ( SELECT @@ -278,24 +248,25 @@ and recruitment_id = '${IDM_ID}'; THEN to_date(ri.info->'svedFL'->'svedUdostTraktMash'->'udostTraktMash'->'predUTM'->>'dataSved', 'YYYY-MM-DD') END as pred_update_date FROM ervu_dashboard.recruits_info ri - JOIN ervu_dashboard.citizen r ON r.recruit_id = ri.recruit_id AND '${IDM_ID}' != '' -- Проверка на пустую строку - AND COALESCE(ri.current_recruitment_id, ri.target_recruitment_id) = '${IDM_ID}' + JOIN ervu_dashboard.citizen r ON r.recruit_id = ri.recruit_id AND '${IDM_ID}' != '' -- Проверка на пустую строку + AND COALESCE(ri.current_recruitment_id, ri.target_recruitment_id) = '${IDM_ID}' WHERE (jsonb_typeof(ri.info->'svedFL'->'svedVoditUdost'->'voditUdost'->'svedKat') = 'array' OR ri.info->'svedFL'->'svedUdostTraktMash'->'udostTraktMash'->'aktUTM' IS NOT NULL OR ri.info->'svedFL'->'svedUdostTraktMash'->'udostTraktMash'->'predUTM' IS NOT NULL) ), all_data AS ( + -- Водительские права (обычные) SELECT ri.recruit_id, - (ri.info->'svedFL'->'svedVoditUdost'->'voditUdost'->> 'id') as source_id, + (ri.info->'svedFL'->'svedVoditUdost'->'voditUdost'->> 'id') as source_id, ri.driver_update_date AS source_update_date, ri.info->'svedFL'->'svedVoditUdost'->'voditUdost'->>'kodStatus' AS status, ri.info->'svedFL'->'svedVoditUdost'->'voditUdost'->>'serNomDok' AS licence_series, NULL AS licence_number, array_to_string(array( - SELECT kat->>'naimKatTS' - FROM jsonb_array_elements(ri.info->'svedFL'->'svedVoditUdost'->'voditUdost'->'svedKat') AS kat - WHERE jsonb_typeof(ri.info->'svedFL'->'svedVoditUdost'->'voditUdost'->'svedKat') = 'array'), ',') AS categories, + SELECT kat->>'naimKatTS' + FROM jsonb_array_elements(ri.info->'svedFL'->'svedVoditUdost'->'voditUdost'->'svedKat') AS kat + WHERE jsonb_typeof(ri.info->'svedFL'->'svedVoditUdost'->'voditUdost'->'svedKat') = 'array'), ',') AS categories, to_date(ri.info->'svedFL'->'svedVoditUdost'->'voditUdost'->>'dataVydDok', 'YYYY-MM-DD') AS start_date, to_date(ri.info->'svedFL'->'svedVoditUdost'->'voditUdost'->>'srokDeistvDok', 'YYYY-MM-DD') AS close_date, NULL AS replacement_date, @@ -308,6 +279,7 @@ and recruitment_id = '${IDM_ID}'; UNION ALL + -- Актуальные трактористские права (aktUTM) SELECT ri.recruit_id, (ri.info->'svedFL'->'svedUdostTraktMash'->'udostTraktMash'->'aktUTM'->> 'id') as source_id, @@ -316,18 +288,20 @@ and recruitment_id = '${IDM_ID}'; ri.info->'svedFL'->'svedUdostTraktMash'->'udostTraktMash'->'aktUTM'->>'serUTM' AS licence_series, ri.info->'svedFL'->'svedUdostTraktMash'->'udostTraktMash'->'aktUTM'->>'nomUTM' AS licence_number, (SELECT array_to_string(ARRAY(SELECT jsonb_array_elements_text( - ri.info->'svedFL'->'svedUdostTraktMash'->'udostTraktMash'->'aktUTM'->'extend'->'kodKatUTM')), ',')) AS categories, + ri.info->'svedFL'->'svedUdostTraktMash'->'udostTraktMash'->'aktUTM'->'extend'->'kodKatUTM')), ',')) AS categories, to_date(ri.info->'svedFL'->'svedUdostTraktMash'->'udostTraktMash'->'aktUTM'->>'dataVydUTM', 'YYYY-MM-DD') AS start_date, to_date(ri.info->'svedFL'->'svedUdostTraktMash'->'udostTraktMash'->'aktUTM'->>'dataOkonchUTM', 'YYYY-MM-DD') AS close_date, to_date(ri.info->'svedFL'->'svedUdostTraktMash'->'udostTraktMash'->'aktUTM'->>'dataZameny', 'YYYY-MM-DD') AS replacement_date, to_date(ri.info->'svedFL'->'svedUdostTraktMash'->'udostTraktMash'->'aktUTM'->>'dataVozvr', 'YYYY-MM-DD') AS return_date, true AS tractor_driver, ri.akt_update_date > '${MAX_SOURCE_UPDATE_DATE}'::date as need_update, - NOT ri.has_akt_utm_info as need_to_delete + false as need_to_delete -- актуальные права не помечаем на удаление FROM base ri + WHERE ri.has_akt_utm_info UNION ALL + -- Предыдущие трактористские права (predUTM) SELECT ri.recruit_id, (ri.info->'svedFL'->'svedUdostTraktMash'->'udostTraktMash'->'predUTM'->> 'id') as source_id, @@ -336,7 +310,7 @@ and recruitment_id = '${IDM_ID}'; ri.info->'svedFL'->'svedUdostTraktMash'->'udostTraktMash'->'predUTM'->>'serUTM' AS licence_series, ri.info->'svedFL'->'svedUdostTraktMash'->'udostTraktMash'->'predUTM'->>'nomUTM' AS licence_number, (SELECT array_to_string(ARRAY(SELECT jsonb_array_elements_text( - ri.info->'svedFL'->'svedUdostTraktMash'->'udostTraktMash'->'predUTM'->'extend'->'kodKatUTM')), ',')) AS categories, + ri.info->'svedFL'->'svedUdostTraktMash'->'udostTraktMash'->'predUTM'->'extend'->'kodKatUTM')), ',')) AS categories, to_date(ri.info->'svedFL'->'svedUdostTraktMash'->'udostTraktMash'->'predUTM'->>'dataVydUTM', 'YYYY-MM-DD') AS start_date, to_date(ri.info->'svedFL'->'svedUdostTraktMash'->'udostTraktMash'->'predUTM'->>'dataOkonchUTM', 'YYYY-MM-DD') AS close_date, to_date(ri.info->'svedFL'->'svedUdostTraktMash'->'udostTraktMash'->'predUTM'->>'dataZameny', 'YYYY-MM-DD') AS replacement_date, @@ -346,17 +320,37 @@ and recruitment_id = '${IDM_ID}'; false as need_to_delete FROM base ri WHERE ri.has_pred_utm_info + + UNION ALL + -- Актуальные трактористские права на удаление + SELECT + ri.recruit_id, + NULL as source_id, + NULL AS source_update_date, + '1' AS status, + NULL AS licence_series, + NULL AS licence_number, + NULL AS categories, + NULL AS start_date, + NULL AS close_date, + NULL AS replacement_date, + NULL AS return_date, + true AS tractor_driver, + false as need_update, + true as need_to_delete + FROM base ri + WHERE ri.has_pred_utm_info AND NOT ri.has_akt_utm_info and ri.pred_update_date > '${MAX_SOURCE_UPDATE_DATE}'::date ) SELECT *, CASE - WHEN tractor_driver = false AND status = '1' THEN 1 - ELSE 0 - END AS is_has_driver_license, + WHEN tractor_driver = false AND status = '1' THEN TRUE + ELSE FALSE + END AS has_driver_license, CASE - WHEN tractor_driver = true AND status = '1' AND NOT need_to_delete THEN 1 - ELSE 0 - END AS is_has_tractor_license + WHEN tractor_driver = true AND status = '1' AND NOT need_to_delete THEN TRUE + ELSE FALSE + END AS has_tractor_license FROM all_data ORDER BY recruit_id Y @@ -366,27 +360,6 @@ ORDER BY recruit_id 1264 - - Unique rows - Unique - - Y - - 1 - - none - - - N - - - N - - - 1744 - 1072 - - delete_act_tractor Delete @@ -421,8 +394,8 @@ ORDER BY recruit_id - 1232 - 1184 + 1248 + 1136 @@ -441,13 +414,13 @@ ORDER BY recruit_id ${java.io.tmpdir} - has_driver_license_raw - is_has_driver_license + has_driver_license + has_driver_license MAX - has_tractor_license_raw - is_has_tractor_license + has_tractor_license + has_tractor_license MAX @@ -465,42 +438,6 @@ ORDER BY recruit_id 1520 - - driver_lisence_failure_job - ExecSql - - Y - - 1 - - none - - - - - error_description - - - ervu-dashboard - Y - N - Y - Y - N - UPDATE etl.job_execution -SET status = 'DELTA_ERROR', - error_description = ?, - execution_end_datetime = current_timestamp -WHERE job_name = '${JOB_NAME}' -and recruitment_id = '${IDM_ID}'; - - - - - 1552 - 1328 - - driver_licence_insert_or_update InsertUpdate @@ -519,7 +456,6 @@ and recruitment_id = '${IDM_ID}'; = source_id source_id - ervu_dashboard drivers_licence
@@ -591,39 +527,6 @@ and recruitment_id = '${IDM_ID}'; 1328
- - driver_licence_java_expression - Janino - - Y - - 1 - - none - - - - has_driver_license - has_driver_license_raw == 1 - Boolean - -1 - -1 - - - - has_tractor_license - has_tractor_license_raw == 1 - Boolean - -1 - -1 - - - - - 1056 - 1520 - - driver_licence_need_delete_filter FilterRows @@ -653,11 +556,12 @@ and recruitment_id = '${IDM_ID}'; + Sample rows delete_act_tractor - 992 - 1184 + 1008 + 1136 @@ -689,6 +593,7 @@ and recruitment_id = '${IDM_ID}'; + Sample rows driver_licence_insert_or_update @@ -710,11 +615,8 @@ and recruitment_id = '${IDM_ID}'; ervu-dashboard - Y - N - Y N N @@ -725,13 +627,80 @@ WHERE job_name = '${JOB_NAME}' and recruitment_id = '${IDM_ID}'; - - 1744 + 1760 976 + + driver_license_job_create + ExecSql + + Y + + 1 + + none + + + + + ervu-dashboard + N + N + Y + N + N + UPDATE etl.job_execution +SET + status = 'DELTA_PROCESSING', + execution_datetime = DEFAULT, + error_description = NULL +where job_name = '${JOB_NAME}' +and recruitment_id = '${IDM_ID}'; + + + 432 + 1264 + + + + driver_lisence_failure_job + ExecSql + + Y + + 1 + + none + + + + + error_description + + + ervu-dashboard + Y + N + Y + Y + N + UPDATE etl.job_execution +SET status = 'DELTA_ERROR', + error_description = ?, + execution_end_datetime = current_timestamp +WHERE job_name = '${JOB_NAME}' +and recruitment_id = '${IDM_ID}'; + + + + + 1552 + 1328 + + update_driver_licence_flag Update @@ -746,13 +715,11 @@ and recruitment_id = '${IDM_ID}'; 10000 ervu-dashboard Y - = recruit_id recruit_id - ervu_dashboard citizen
diff --git a/mappings/info_recruits/citizen_tables/drivers_licence/parallel/drivers_licence_flow_repeat.hpl b/mappings/info_recruits/citizen_tables/drivers_licence/parallel/drivers_licence_flow_repeat.hpl index 454d6ea..f6c32ae 100644 --- a/mappings/info_recruits/citizen_tables/drivers_licence/parallel/drivers_licence_flow_repeat.hpl +++ b/mappings/info_recruits/citizen_tables/drivers_licence/parallel/drivers_licence_flow_repeat.hpl @@ -46,16 +46,6 @@ Filter rows 2 Y - - driver_licence_java_expression - update_drivers_licence_flags - Y - - - driver_licence_group_by_recruit_id - driver_licence_java_expression - Y - drivers_licence_input Detect empty stream 2 @@ -86,6 +76,11 @@ driver_license_failure_job Y + + driver_licence_group_by_recruit_id + update_drivers_licence_flags + Y + Abort 2 @@ -283,14 +278,16 @@ ${java.io.tmpdir} - has_driver_license_raw + has_driver_license is_has_driver_license MAX + - has_tractor_license_raw + has_tractor_license is_has_tractor_license MAX + N @@ -300,6 +297,7 @@ N + grp @@ -307,39 +305,6 @@ 1024 - - driver_licence_java_expression - Janino - - Y - - 1 - - none - - - - has_driver_license - has_driver_license_raw == 1 - Boolean - -1 - -1 - - - - has_tractor_license - has_tractor_license_raw == 1 - Boolean - -1 - -1 - - - - - 928 - 1024 - - driver_licence_success_job ExecSql @@ -422,11 +387,8 @@ and recruitment_id = '${IDM_ID}'; ervu-dashboard - N - N - Y N N @@ -451,7 +413,6 @@ DO UPDATE SET status = 'PROCESSING', execution_datetime = DEFAULT, error_description = NULL; - 368 @@ -471,7 +432,6 @@ DO UPDATE SET ervu-dashboard N - WITH base AS ( SELECT @@ -582,13 +542,11 @@ ORDER BY recruit_id 10000 ervu-dashboard Y - = recruit_id recruit_id - ervu_dashboard citizen
diff --git a/mappings/info_recruits/citizen_tables/property/parallel/property_flow.hpl b/mappings/info_recruits/citizen_tables/property/parallel/property_flow.hpl index d6e02b9..02494e1 100644 --- a/mappings/info_recruits/citizen_tables/property/parallel/property_flow.hpl +++ b/mappings/info_recruits/citizen_tables/property/parallel/property_flow.hpl @@ -21,62 +21,62 @@ - Create job execution record - Table input - Y - - - Filter rows - Change job status on success + property_create_job + property_input Y Detect empty stream - Change job status on success + property_success_job Y - Change job status on error + property_failure_job Abort Y - Table output - Change job status on error + property_output + property_failure_job Y - Update - Change job status on error + property_update_flags + property_failure_job Y - Table input + property_input + Detect empty stream + Y + + + property_input + property_output + Y + + + property_input + property_group_by + Y + + + property_group_by + property_update_flags + Y + + + Filter rows + property_success_job + Y + + + property_output Identify last row in a stream Y Identify last row in a stream - Table output - Y - - - Identify last row in a stream - sort_by_recruit_id - Y - - - sort_by_recruit_id - Update - Y - - - Identify last row in a stream - Detect empty stream - Y - - - Table output Filter rows Y @@ -97,105 +97,8 @@ 0 - 1376 - 528 - -
- - Change job status on error - ExecSql - - Y - - 1 - - none - - - - - error_description - - - ervu-dashboard - Y - N - Y - Y - N - UPDATE etl.job_execution -SET status = 'ERROR', - error_description = ?, - execution_end_datetime = current_timestamp -WHERE job_name = '${JOB_NAME}' -and recruitment_id = '${IDM_ID}'; - - - - - 1120 - 528 - - - - Change job status on success - ExecSql - - Y - - 1 - - none - - - - - ervu-dashboard - Y - N - Y - N - N - UPDATE etl.job_execution -SET status = 'SUCCESS', - execution_end_datetime = current_timestamp -WHERE job_name = '${JOB_NAME}' -and recruitment_id = '${IDM_ID}'; - - - - - 1376 - 160 - - - - Create job execution record - ExecSql - - Y - - 1 - - none - - - - - ervu-dashboard - N - N - Y - N - N - INSERT INTO etl.job_execution (id, job_name, status, execution_datetime, error_description, recruitment_id) -VALUES (DEFAULT, '${JOB_NAME}', 'PROCESSING', DEFAULT, null, '${IDM_ID}'); - - - - - 288 - 320 + 1392 + 864 @@ -212,7 +115,7 @@ VALUES (DEFAULT, '${JOB_NAME}', 'PROCESSING', DEFAULT, null, '${IDM_ID}'); 688 - 160 + 0 @@ -244,10 +147,10 @@ VALUES (DEFAULT, '${JOB_NAME}', 'PROCESSING', DEFAULT, null, '${IDM_ID}'); - Change job status on success + property_success_job - 1376 + 1408 320 @@ -255,7 +158,7 @@ VALUES (DEFAULT, '${JOB_NAME}', 'PROCESSING', DEFAULT, null, '${IDM_ID}'); Identify last row in a stream DetectLastRow - N + Y 1 @@ -265,13 +168,13 @@ VALUES (DEFAULT, '${JOB_NAME}', 'PROCESSING', DEFAULT, null, '${IDM_ID}'); last_row - 688 + 1248 320
- Table input - TableInput + property_create_job + ExecSql Y @@ -280,168 +183,271 @@ VALUES (DEFAULT, '${JOB_NAME}', 'PROCESSING', DEFAULT, null, '${IDM_ID}'); none + + ervu-dashboard N - WITH -base as( + N + Y + N + N + INSERT INTO etl.job_execution (id, job_name, status, execution_datetime, error_description, recruitment_id) +VALUES (DEFAULT, '${JOB_NAME}', 'PROCESSING', DEFAULT, null, '${IDM_ID}'); + + + + + 288 + 320 + + + + property_failure_job + ExecSql + + Y + + 1 + + none + + + + + error_description + + + ervu-dashboard + Y + N + Y + Y + N + UPDATE etl.job_execution +SET status = 'ERROR', + error_description = ?, + execution_end_datetime = current_timestamp +WHERE job_name = '${JOB_NAME}' +and recruitment_id = '${IDM_ID}'; + + + + + 1056 + 864 + + + + property_group_by + GroupBy + + Y + + 1 + + none + + + N + N + ${java.io.tmpdir} + + + has_property + has_property + MAX + + + has_ground_transport + has_ground_transport + MAX + + + has_air_transport + has_air_transport + MAX + + + has_water_transport + has_water_transport + MAX + + + has_possibility_reg_residence + has_possibility_reg_residence + MAX + + + N + + + recruit_id + + + N + grp + + + 688 + 576 + + + + property_input + TableInput + + N + + 1 + + none + + + ervu-dashboard + N + WITH base AS ( SELECT ri.recruit_id, - ri.info + ri.info, + jsonb_typeof(ri.info -> 'svedFL' -> 'svedON' -> 'on') = 'array' AS has_realty, + jsonb_typeof(ri.info -> 'svedFL' -> 'svedNazTS' -> 'nazTS') = 'array' AS has_ground, + jsonb_typeof(ri.info -> 'svedFL' -> 'svedVozTS' -> 'vozTS') = 'array' AS has_air, + jsonb_typeof(ri.info -> 'svedFL' -> 'svedVodTS' -> 'vodTS') = 'array' AS has_water FROM ervu_dashboard.recruits_info ri - join ervu_dashboard.citizen r ON r.recruit_id = ri.recruit_id AND '${IDM_ID}' != '' -- Проверка на пустую строку - AND (ri.current_recruitment_id = '${IDM_ID}' or (ri.current_recruitment_id is null and ri.target_recruitment_id = '${IDM_ID}')) + WHERE + '${IDM_ID}' != '' AND + COALESCE(ri.current_recruitment_id, ri.target_recruitment_id) = '${IDM_ID}' AND + (jsonb_typeof(info -> 'svedFL' -> 'svedON' -> 'on') = 'array' OR + jsonb_typeof(info -> 'svedFL' -> 'svedNazTS' -> 'nazTS') = 'array' OR + jsonb_typeof(info -> 'svedFL' -> 'svedVozTS' -> 'vozTS') = 'array' OR + jsonb_typeof(info -> 'svedFL' -> 'svedVodTS' -> 'vodTS') = 'array') ), -prop AS ( - SELECT - base.recruit_id, - 'realty' AS property_type, - property_data.value AS property_data - FROM base - LEFT JOIN LATERAL jsonb_array_elements( - CASE - WHEN jsonb_typeof(base.info->'svedFL'->'svedON'->'on') = 'array' - THEN base.info->'svedFL'->'svedON'->'on' - ELSE '[]'::jsonb - END - ) AS property_data ON true - WHERE jsonb_typeof(base.info->'svedFL'->'svedON'->'on') = 'array' - - UNION ALL - - SELECT - base.recruit_id, - 'ground_transportation' AS property_type, - property_data.value AS property_data - FROM base - LEFT JOIN LATERAL jsonb_array_elements( - CASE - WHEN jsonb_typeof(base.info->'svedFL'->'svedNazTS'->'nazTS') = 'array' - THEN base.info->'svedFL'->'svedNazTS'->'nazTS' - ELSE '[]'::jsonb - END - ) AS property_data ON true - WHERE jsonb_typeof(base.info->'svedFL'->'svedNazTS'->'nazTS') = 'array' - - UNION ALL - - SELECT - base.recruit_id, - 'air_vehicles' AS property_type, - property_data.value AS property_data - FROM base - LEFT JOIN LATERAL jsonb_array_elements( - CASE - WHEN jsonb_typeof(base.info->'svedFL'->'svedVozTS'->'vozTS') = 'array' - THEN base.info->'svedFL'->'svedVozTS'->'vozTS' - ELSE '[]'::jsonb - END - ) AS property_data ON true - WHERE jsonb_typeof(base.info->'svedFL'->'svedVozTS'->'vozTS') = 'array' - - UNION ALL - - SELECT - base.recruit_id, - 'water_vehicles' AS property_type, - property_data.value AS property_data - FROM base - LEFT JOIN LATERAL jsonb_array_elements( - CASE - WHEN jsonb_typeof(base.info->'svedFL'->'svedVodTS'->'vodTS') = 'array' - THEN base.info->'svedFL'->'svedVodTS'->'vodTS' - ELSE '[]'::jsonb - END - ) AS property_data ON true - WHERE jsonb_typeof(base.info->'svedFL'->'svedVodTS'->'vodTS') = 'array' -), -all_data as( - -- Недвижимость - SELECT - base.recruit_id, - to_date(property_data->>'dataSved', 'YYYY-MM-DD') AS source_update_date, - property_data->>'naimVidPrav' AS kind_right, -- Исправлено название поля - property_data->'adrObSob'->>'adrObSobTekst' AS address, - property_data->>'kodVidPrav' AS property_code, - NULL AS issue_year, - NULL AS brand_model, - NULL AS reg_plate, - NULL AS vin, - NULL AS vehicle_category, - to_date(property_data->>'dataRegPrav', 'YYYY-MM-DD') AS start_date, - to_date(property_data->>'dataPrekPrav', 'YYYY-MM-DD') AS close_date, - 'realty' AS type_property, - true AS has_possibility_reg_residence - FROM base - JOIN prop ON base.recruit_id = prop.recruit_id - WHERE prop.property_type = 'realty' - UNION ALL + prop AS ( + SELECT + base.recruit_id, + 'realty' AS property_type, + property_data.value AS property_data + FROM base + JOIN LATERAL jsonb_array_elements(base.info -> 'svedFL' -> 'svedON' -> 'on') AS property_data ON true + WHERE base.has_realty - -- Наземный транспорт - SELECT - base.recruit_id, - to_date(property_data->>'dataSved', 'YYYY-MM-DD') AS source_update_date, - NULL AS kind_right, - NULL AS address, - property_data->>'vidTS' AS property_code, - property_data->>'godVyp' AS issue_year, - property_data->>'markaTS' AS brand_model, - property_data->>'gosRegZn' AS reg_plate, - property_data->>'idenNom' AS vin, - NULL AS vehicle_category, - to_date(property_data->>'dataRegPr', 'YYYY-MM-DD') AS start_date, - to_date(property_data->>'dataPrekPr', 'YYYY-MM-DD') AS close_date, - 'ground_transportation' AS type_property, - false AS has_possibility_reg_residence - FROM base - JOIN prop ON base.recruit_id = prop.recruit_id - WHERE prop.property_type = 'ground_transportation' + UNION ALL - UNION ALL + SELECT + base.recruit_id, + 'ground_transportation' AS property_type, + property_data.value AS property_data + FROM base + JOIN LATERAL jsonb_array_elements(base.info -> 'svedFL' -> 'svedNazTS' -> 'nazTS') AS property_data ON true + WHERE base.has_ground - -- Воздушный транспорт - SELECT - base.recruit_id, - to_date(property_data->>'dataSved', 'YYYY-MM-DD') AS source_update_date, - NULL AS kind_right, - NULL AS address, - property_data->>'naimVid' AS property_code, - NULL AS issue_year, - NULL AS brand_model, - NULL AS reg_plate, - NULL AS vin, - property_data->>'naznKat' AS vehicle_category, - to_date(property_data->>'dataRegPr', 'YYYY-MM-DD') AS start_date, - to_date(property_data->>'dataPrekPr', 'YYYY-MM-DD') AS close_date, - 'air_vehicles' AS type_property, - false AS has_possibility_reg_residence - FROM base - JOIN prop ON base.recruit_id = prop.recruit_id - WHERE prop.property_type = 'air_vehicles' + UNION ALL - UNION ALL + SELECT + base.recruit_id, + 'air_vehicles' AS property_type, + property_data.value AS property_data + FROM base + JOIN LATERAL jsonb_array_elements(base.info -> 'svedFL' -> 'svedVozTS' -> 'vozTS') AS property_data ON true + WHERE base.has_air - -- Водный транспорт - SELECT - base.recruit_id, - to_date(property_data->>'dataSved', 'YYYY-MM-DD') AS source_update_date, - NULL AS kind_right, - NULL AS address, - property_data->>'naimVid' AS property_code, - NULL AS issue_year, - NULL AS brand_model, - NULL AS reg_plate, - NULL AS vin, - property_data->>'naznKat' AS vehicle_category, - to_date(property_data->>'dataRegPr', 'YYYY-MM-DD') AS start_date, - to_date(property_data->>'dataPrekPr', 'YYYY-MM-DD') AS close_date, - 'water_vehicles' AS type_property, - false AS has_possibility_reg_residence - FROM base - JOIN prop ON base.recruit_id = prop.recruit_id - WHERE prop.property_type = 'water_vehicles' -) -select + UNION ALL + + SELECT + base.recruit_id, + 'water_vehicles' AS property_type, + property_data.value AS property_data + FROM base + JOIN LATERAL jsonb_array_elements(base.info -> 'svedFL' -> 'svedVodTS' -> 'vodTS') AS property_data ON true + WHERE base.has_water + ), + + all_data AS ( + SELECT + prop.recruit_id, + to_date(property_data ->> 'dataSved', 'YYYY-MM-DD') AS source_update_date, + property_data ->> 'naimVidPrav' AS kind_right, + property_data -> 'adrObSob' ->> 'adrObSobTekst' AS address, + property_data ->> 'kodVidPrav' AS property_code, + NULL AS issue_year, + NULL AS brand_model, + NULL AS reg_plate, + NULL AS vin, + NULL AS vehicle_category, + to_date(property_data ->> 'dataRegPrav', 'YYYY-MM-DD') AS start_date, + to_date(property_data ->> 'dataPrekPrav', 'YYYY-MM-DD') AS close_date, + 'realty' AS type_property, + true AS has_possibility_reg_residence, + property_data ->> 'id' AS source_id + FROM prop + WHERE prop.property_type = 'realty' + + UNION ALL + + SELECT + prop.recruit_id, + to_date(property_data ->> 'dataSved', 'YYYY-MM-DD'), + NULL AS kind_right, + NULL AS address, + property_data ->> 'vidTS', + property_data ->> 'godVyp', + property_data ->> 'markaTS', + property_data ->> 'gosRegZn', + property_data ->> 'idenNom', + NULL AS vehicle_category, + to_date(property_data ->> 'dataRegPr', 'YYYY-MM-DD'), + to_date(property_data ->> 'dataPrekPr', 'YYYY-MM-DD'), + 'ground_transportation', + false, + property_data ->> 'id' AS source_id + FROM prop + WHERE prop.property_type = 'ground_transportation' + + UNION ALL + + SELECT + prop.recruit_id, + to_date(property_data ->> 'dataSved', 'YYYY-MM-DD'), + NULL, + NULL, + property_data ->> 'naimVid', + NULL, + NULL, + NULL, + NULL, + property_data ->> 'naznKat', + to_date(property_data ->> 'dataRegPr', 'YYYY-MM-DD'), + to_date(property_data ->> 'dataPrekPr', 'YYYY-MM-DD'), + 'air_vehicles', + false, + property_data ->> 'id' AS source_id + FROM prop + WHERE prop.property_type = 'air_vehicles' + + UNION ALL + + SELECT + prop.recruit_id, + to_date(property_data ->> 'dataSved', 'YYYY-MM-DD'), + NULL, + NULL, + property_data ->> 'naimVid', + NULL, + NULL, + NULL, + NULL, + property_data ->> 'naznKat', + to_date(property_data ->> 'dataRegPr', 'YYYY-MM-DD'), + to_date(property_data ->> 'dataPrekPr', 'YYYY-MM-DD'), + 'water_vehicles', + false, + property_data ->> 'id' AS source_id + FROM prop + WHERE prop.property_type = 'water_vehicles' + ) + +SELECT recruit_id, source_update_date, kind_right, @@ -455,33 +461,23 @@ select start_date, close_date, type_property, - has_possibility_reg_residence, - CASE - WHEN type_property = 'realty' THEN true - ELSE false - END AS has_property, -- недвиж//has_property - CASE - WHEN type_property = 'ground_transportation' THEN true - ELSE false - END AS has_ground_transport, -- наземный транспорт//has_ground_transport - CASE - WHEN type_property = 'air_vehicles' THEN true - ELSE false - END AS has_air_transport, -- воздушный транспорт//has_air_transport - CASE - WHEN type_property = 'water_vehicles' THEN true - ELSE false - END AS has_water_transport -- водный транспорт//has_water_transport -from all_data + has_possibility_reg_residence AS has_possibility_reg_residence, + source_id, + type_property = 'realty' AS has_property, + type_property = 'ground_transportation' AS has_ground_transport, + type_property = 'air_vehicles' AS has_air_transport, + type_property = 'water_vehicles' AS has_water_transport +FROM all_data +ORDER BY recruit_id; Y - 496 + 688 320 - Table output + property_output TableOutput Y @@ -546,6 +542,10 @@ from all_data source_update_date source_update_date + + source_id + source_id + N N @@ -562,12 +562,44 @@ from all_data Y - 912 + 1056 320 - Update + property_success_job + ExecSql + + Y + + 1 + + none + + + + + ervu-dashboard + Y + N + Y + N + N + UPDATE etl.job_execution +SET status = 'SUCCESS', + execution_end_datetime = current_timestamp +WHERE job_name = '${JOB_NAME}' +and recruitment_id = '${IDM_ID}'; + + + + + 1408 + 0 + + + + property_update_flags Update N @@ -612,49 +644,15 @@ from all_data N Y - - 912 - 528 - - - - sort_by_recruit_id - SortRows - - Y - - 1 - - none - - - ${java.io.tmpdir} - srt - 100000 - - N - - Y - - - recruit_id - Y - N - N - 0 - N - - - 688 - 528 + 864 - Table output - Change job status on error + property_output + property_failure_job Y error_description @@ -665,8 +663,8 @@ from all_data - Update - Change job status on error + property_update_flags + property_failure_job Y error_description diff --git a/mappings/info_recruits/citizen_tables/property/parallel/property_flow_delta.hpl b/mappings/info_recruits/citizen_tables/property/parallel/property_flow_delta.hpl index af83226..db58253 100644 --- a/mappings/info_recruits/citizen_tables/property/parallel/property_flow_delta.hpl +++ b/mappings/info_recruits/citizen_tables/property/parallel/property_flow_delta.hpl @@ -22,12 +22,7 @@ Create job execution record - Table input - Y - - - Detect empty stream - Change job status on success + property_input Y @@ -41,45 +36,50 @@ Y - Table input - Identify last row in a stream - Y - - - Identify last row in a stream - Insert / update - Y - - - Identify last row in a stream - sort_by_recruit_id - Y - - - Identify last row in a stream - Detect empty stream - Y - - - Update + property_update_flags Change job status on error Y - sort_by_recruit_id - Update - Y - - - Insert / update + property_insert_or_update Change job status on error Y - Insert / update + property_input + property_group_by + Y + + + property_group_by + property_update_flags + Y + + + property_input + Filter rows 2 + Y + + + Filter rows 2 + property_insert_or_update + Y + + + Identify last row in a stream Filter rows Y + + property_insert_or_update + Detect empty stream 2 + Y + + + Detect empty stream 2 + Identify last row in a stream + Y + Abort @@ -97,8 +97,8 @@ 0 - 1456 - 512 + 1520 + 768 @@ -133,8 +133,8 @@ and recruitment_id = '${IDM_ID}'; - 1200 - 512 + 1088 + 768 @@ -166,7 +166,7 @@ and recruitment_id = '${IDM_ID}'; 1520 - 208 + 256 @@ -198,11 +198,11 @@ and recruitment_id = '${IDM_ID}'; 352 - 352 + 560 - Detect empty stream + Detect empty stream 2 DetectEmptyStream Y @@ -214,8 +214,8 @@ and recruitment_id = '${IDM_ID}'; - 768 - 208 + 1296 + 560 @@ -251,14 +251,51 @@ and recruitment_id = '${IDM_ID}'; 1520 - 352 + 416 + + + + Filter rows 2 + FilterRows + + Y + + 1 + + none + + + + + + + = + need_update + N + - + + N + -1 + constant + -1 + Y + Boolean + + + + Sample rows + property_insert_or_update + + + 864 + 560 Identify last row in a stream DetectLastRow - N + Y 1 @@ -268,12 +305,313 @@ and recruitment_id = '${IDM_ID}'; last_row - 768 - 352 + 1520 + 560 - Insert / update + property_update_flags + Update + + N + + 1 + + none + + + 10000 + ervu-dashboard + Y + + + + = + recruit_id + recruit_id + + + ervu_dashboard + citizen
+ + has_property + has_property + + + has_ground_transport + has_ground_transport + + + has_air_transport + has_air_transport + + + has_water_transport + has_water_transport + + + has_possibility_reg_residence + has_possibility_reg_residence + +
+ N + Y + + + 1088 + 976 + +
+ + property_group_by + GroupBy + + Y + + 1 + + none + + + N + N + ${java.io.tmpdir} + + + has_property + has_property + MAX + + + has_ground_transport + has_ground_transport + MAX + + + has_air_transport + has_air_transport + MAX + + + has_water_transport + has_water_transport + MAX + + + has_possibility_reg_residence + has_possibility_reg_residence + MAX + + + N + + + recruit_id + + + N + grp + + + 640 + 976 + + + + property_input + TableInput + + N + + 1 + + none + + + ervu-dashboard + N + WITH base AS ( + SELECT + ri.recruit_id, + ri.info, + jsonb_typeof(ri.info -> 'svedFL' -> 'svedON' -> 'on') = 'array' AS has_realty, + jsonb_typeof(ri.info -> 'svedFL' -> 'svedNazTS' -> 'nazTS') = 'array' AS has_ground, + jsonb_typeof(ri.info -> 'svedFL' -> 'svedVozTS' -> 'vozTS') = 'array' AS has_air, + jsonb_typeof(ri.info -> 'svedFL' -> 'svedVodTS' -> 'vodTS') = 'array' AS has_water + FROM ervu_dashboard.recruits_info ri + WHERE + '${IDM_ID}' != '' AND + COALESCE(ri.current_recruitment_id, ri.target_recruitment_id) = '${IDM_ID}' AND + (jsonb_typeof(info -> 'svedFL' -> 'svedON' -> 'on') = 'array' OR + jsonb_typeof(info -> 'svedFL' -> 'svedNazTS' -> 'nazTS') = 'array' OR + jsonb_typeof(info -> 'svedFL' -> 'svedVozTS' -> 'vozTS') = 'array' OR + jsonb_typeof(info -> 'svedFL' -> 'svedVodTS' -> 'vodTS') = 'array') +), + + prop AS ( + SELECT + base.recruit_id, + 'realty' AS property_type, + property_data.value AS property_data, + to_date(property_data->>'dataSved', 'YYYY-MM-DD') > '${MAX_SOURCE_UPDATE_DATE}'::date AS need_update + FROM base + JOIN LATERAL jsonb_array_elements(base.info -> 'svedFL' -> 'svedON' -> 'on') AS property_data ON true + WHERE base.has_realty + + UNION ALL + + SELECT + base.recruit_id, + 'ground_transportation' AS property_type, + property_data.value AS property_data, + to_date(property_data->>'dataSved', 'YYYY-MM-DD') > '${MAX_SOURCE_UPDATE_DATE}'::date AS need_update + FROM base + JOIN LATERAL jsonb_array_elements(base.info -> 'svedFL' -> 'svedNazTS' -> 'nazTS') AS property_data ON true + WHERE base.has_ground + + UNION ALL + + SELECT + base.recruit_id, + 'air_vehicles' AS property_type, + property_data.value AS property_data, + to_date(property_data->>'dataSved', 'YYYY-MM-DD') > '${MAX_SOURCE_UPDATE_DATE}'::date AS need_update + FROM base + JOIN LATERAL jsonb_array_elements(base.info -> 'svedFL' -> 'svedVozTS' -> 'vozTS') AS property_data ON true + WHERE base.has_air + + UNION ALL + + SELECT + base.recruit_id, + 'water_vehicles' AS property_type, + property_data.value AS property_data, + to_date(property_data->>'dataSved', 'YYYY-MM-DD') > '${MAX_SOURCE_UPDATE_DATE}'::date AS need_update + FROM base + JOIN LATERAL jsonb_array_elements(base.info -> 'svedFL' -> 'svedVodTS' -> 'vodTS') AS property_data ON true + WHERE base.has_water + ), + + all_data AS ( + SELECT + prop.recruit_id, + to_date(property_data ->> 'dataSved', 'YYYY-MM-DD') AS source_update_date, + property_data ->> 'naimVidPrav' AS kind_right, + property_data -> 'adrObSob' ->> 'adrObSobTekst' AS address, + property_data ->> 'kodVidPrav' AS property_code, + NULL AS issue_year, + NULL AS brand_model, + NULL AS reg_plate, + NULL AS vin, + NULL AS vehicle_category, + to_date(property_data ->> 'dataRegPrav', 'YYYY-MM-DD') AS start_date, + to_date(property_data ->> 'dataPrekPrav', 'YYYY-MM-DD') AS close_date, + 'realty' AS type_property, + true AS has_possibility_reg_residence, + property_data ->> 'id' AS source_id, + prop.need_update + FROM prop + WHERE prop.property_type = 'realty' + + UNION ALL + + SELECT + prop.recruit_id, + to_date(property_data ->> 'dataSved', 'YYYY-MM-DD'), + NULL AS kind_right, + NULL AS address, + property_data ->> 'vidTS', + property_data ->> 'godVyp', + property_data ->> 'markaTS', + property_data ->> 'gosRegZn', + property_data ->> 'idenNom', + NULL AS vehicle_category, + to_date(property_data ->> 'dataRegPr', 'YYYY-MM-DD'), + to_date(property_data ->> 'dataPrekPr', 'YYYY-MM-DD'), + 'ground_transportation', + false, + property_data ->> 'id' AS source_id, + prop.need_update + FROM prop + WHERE prop.property_type = 'ground_transportation' + + UNION ALL + + SELECT + prop.recruit_id, + to_date(property_data ->> 'dataSved', 'YYYY-MM-DD'), + NULL, + NULL, + property_data ->> 'naimVid', + NULL, + NULL, + NULL, + NULL, + property_data ->> 'naznKat', + to_date(property_data ->> 'dataRegPr', 'YYYY-MM-DD'), + to_date(property_data ->> 'dataPrekPr', 'YYYY-MM-DD'), + 'air_vehicles', + false, + property_data ->> 'id' AS source_id, + prop.need_update + FROM prop + WHERE prop.property_type = 'air_vehicles' + + UNION ALL + + SELECT + prop.recruit_id, + to_date(property_data ->> 'dataSved', 'YYYY-MM-DD'), + NULL, + NULL, + property_data ->> 'naimVid', + NULL, + NULL, + NULL, + NULL, + property_data ->> 'naznKat', + to_date(property_data ->> 'dataRegPr', 'YYYY-MM-DD'), + to_date(property_data ->> 'dataPrekPr', 'YYYY-MM-DD'), + 'water_vehicles', + false, + property_data ->> 'id' AS source_id, + prop.need_update + FROM prop + WHERE prop.property_type = 'water_vehicles' + ) + +SELECT + recruit_id, + source_update_date, + kind_right, + address, + property_code, + issue_year::int, + brand_model, + reg_plate, + vin, + vehicle_category, + start_date, + close_date, + type_property, + has_possibility_reg_residence AS has_possibility_reg_residence, + source_id, + need_update, + type_property = 'realty' AS has_property, + type_property = 'ground_transportation' AS has_ground_transport, + type_property = 'air_vehicles' AS has_air_transport, + type_property = 'water_vehicles' AS has_water_transport +FROM all_data +ORDER BY recruit_id; + Y + + + 640 + 560 + + + + property_insert_or_update InsertUpdate Y @@ -288,18 +626,8 @@ and recruitment_id = '${IDM_ID}'; = - recruit_id - recruit_id - - - = - start_date - start_date - - - = - type_property - type_property + source_id + source_id ervu_dashboard property
@@ -368,313 +696,22 @@ and recruitment_id = '${IDM_ID}'; source_update_date Y + + source_id + source_id + N +
N - 960 - 352 - -
- - Table input - TableInput - - Y - - 1 - - none - - - ervu-dashboard - N - WITH -base as( - SELECT - ri.recruit_id, - ri.info - FROM ervu_dashboard.recruits_info ri - JOIN ervu_dashboard.citizen r ON r.recruit_id = ri.recruit_id AND '${IDM_ID}' != '' - AND (ri.current_recruitment_id = '${IDM_ID}' or (ri.current_recruitment_id is null and ri.target_recruitment_id = '${IDM_ID}')) -), -prop AS ( - SELECT - base.recruit_id, - 'realty' AS property_type, - property_data.value AS property_data - FROM base - LEFT JOIN LATERAL jsonb_array_elements( - CASE - WHEN jsonb_typeof(base.info->'svedFL'->'svedON'->'on') = 'array' - THEN base.info->'svedFL'->'svedON'->'on' - ELSE '[]'::jsonb - END - ) AS property_data ON true - WHERE jsonb_typeof(base.info->'svedFL'->'svedON'->'on') = 'array' - AND to_date(property_data->>'dataSved', 'YYYY-MM-DD') > '${MAX_SOURCE_UPDATE_DATE}'::date - - UNION ALL - - SELECT - base.recruit_id, - 'ground_transportation' AS property_type, - property_data.value AS property_data - FROM base - LEFT JOIN LATERAL jsonb_array_elements( - CASE - WHEN jsonb_typeof(base.info->'svedFL'->'svedNazTS'->'nazTS') = 'array' - THEN base.info->'svedFL'->'svedNazTS'->'nazTS' - ELSE '[]'::jsonb - END - ) AS property_data ON true - WHERE jsonb_typeof(base.info->'svedFL'->'svedNazTS'->'nazTS') = 'array' - AND to_date(property_data->>'dataSved', 'YYYY-MM-DD') > '${MAX_SOURCE_UPDATE_DATE}'::date - - UNION ALL - - SELECT - base.recruit_id, - 'air_vehicles' AS property_type, - property_data.value AS property_data - FROM base - LEFT JOIN LATERAL jsonb_array_elements( - CASE - WHEN jsonb_typeof(base.info->'svedFL'->'svedVozTS'->'vozTS') = 'array' - THEN base.info->'svedFL'->'svedVozTS'->'vozTS' - ELSE '[]'::jsonb - END - ) AS property_data ON true - WHERE jsonb_typeof(base.info->'svedFL'->'svedVozTS'->'vozTS') = 'array' - AND to_date(property_data->>'dataSved', 'YYYY-MM-DD') > '${MAX_SOURCE_UPDATE_DATE}'::date - - UNION ALL - - SELECT - base.recruit_id, - 'water_vehicles' AS property_type, - property_data.value AS property_data - FROM base - LEFT JOIN LATERAL jsonb_array_elements( - CASE - WHEN jsonb_typeof(base.info->'svedFL'->'svedVodTS'->'vodTS') = 'array' - THEN base.info->'svedFL'->'svedVodTS'->'vodTS' - ELSE '[]'::jsonb - END - ) AS property_data ON true - WHERE jsonb_typeof(base.info->'svedFL'->'svedVodTS'->'vodTS') = 'array' - AND to_date(property_data->>'dataSved', 'YYYY-MM-DD') > '${MAX_SOURCE_UPDATE_DATE}'::date -), -all_data as( - -- Недвижимость - SELECT - prop.recruit_id, - to_date(property_data->>'dataSved', 'YYYY-MM-DD') AS source_update_date, - property_data->>'naimVidPrav' AS kind_right, - property_data->'adrObSob'->>'adrObSobTekst' AS address, - property_data->>'kodVidPrav' AS property_code, - NULL AS issue_year, - NULL AS brand_model, - NULL AS reg_plate, - NULL AS vin, - NULL AS vehicle_category, - to_date(property_data->>'dataRegPrav', 'YYYY-MM-DD') AS start_date, - to_date(property_data->>'dataPrekPrav', 'YYYY-MM-DD') AS close_date, - 'realty' AS type_property, - true AS has_possibility_reg_residence - FROM prop - WHERE prop.property_type = 'realty' - - UNION ALL - - -- Наземный транспорт - SELECT - prop.recruit_id, - to_date(property_data->>'dataSved', 'YYYY-MM-DD') AS source_update_date, - NULL AS kind_right, - NULL AS address, - property_data->>'vidTS' AS property_code, - property_data->>'godVyp' AS issue_year, - property_data->>'markaTS' AS brand_model, - property_data->>'gosRegZn' AS reg_plate, - property_data->>'idenNom' AS vin, - NULL AS vehicle_category, - to_date(property_data->>'dataRegPr', 'YYYY-MM-DD') AS start_date, - to_date(property_data->>'dataPrekPr', 'YYYY-MM-DD') AS close_date, - 'ground_transportation' AS type_property, - false AS has_possibility_reg_residence - FROM prop - WHERE prop.property_type = 'ground_transportation' - - UNION ALL - - -- Воздушный транспорт - SELECT - prop.recruit_id, - to_date(property_data->>'dataSved', 'YYYY-MM-DD') AS source_update_date, - NULL AS kind_right, - NULL AS address, - property_data->>'naimVid' AS property_code, - NULL AS issue_year, - NULL AS brand_model, - NULL AS reg_plate, - NULL AS vin, - property_data->>'naznKat' AS vehicle_category, - to_date(property_data->>'dataRegPr', 'YYYY-MM-DD') AS start_date, - to_date(property_data->>'dataPrekPr', 'YYYY-MM-DD') AS close_date, - 'air_vehicles' AS type_property, - false AS has_possibility_reg_residence - FROM prop - WHERE prop.property_type = 'air_vehicles' - - UNION ALL - - -- Водный транспорт - SELECT - prop.recruit_id, - to_date(property_data->>'dataSved', 'YYYY-MM-DD') AS source_update_date, - NULL AS kind_right, - NULL AS address, - property_data->>'naimVid' AS property_code, - NULL AS issue_year, - NULL AS brand_model, - NULL AS reg_plate, - NULL AS vin, - property_data->>'naznKat' AS vehicle_category, - to_date(property_data->>'dataRegPr', 'YYYY-MM-DD') AS start_date, - to_date(property_data->>'dataPrekPr', 'YYYY-MM-DD') AS close_date, - 'water_vehicles' AS type_property, - false AS has_possibility_reg_residence - FROM prop - WHERE prop.property_type = 'water_vehicles' -) -SELECT - recruit_id, - source_update_date, - kind_right, - address, - property_code, - issue_year::int, - brand_model, - reg_plate, - vin, - vehicle_category, - start_date, - close_date, - type_property, - has_possibility_reg_residence, - CASE - WHEN type_property = 'realty' THEN true - ELSE false - END AS has_property, - CASE - WHEN type_property = 'ground_transportation' THEN true - ELSE false - END AS has_ground_transport, - CASE - WHEN type_property = 'air_vehicles' THEN true - ELSE false - END AS has_air_transport, - CASE - WHEN type_property = 'water_vehicles' THEN true - ELSE false - END AS has_water_transport -FROM all_data - Y - - - 560 - 352 - - - - Update - Update - - N - - 1 - - none - - - 10000 - ervu-dashboard - Y - - - = - recruit_id - recruit_id - - ervu_dashboard - citizen
- - has_property - has_property - - - has_ground_transport - has_ground_transport - - - has_air_transport - has_air_transport - - - has_water_transport - has_water_transport - - - has_possibility_reg_residence - has_possibility_reg_residence - -
- N - Y - - - 960 - 512 - -
- - sort_by_recruit_id - SortRows - - Y - - 1 - - none - - - ${java.io.tmpdir} - srt - 100000 - - N - - Y - - - recruit_id - Y - N - N - 0 - N - - - - - 768 - 512 + 1088 + 560 - Insert / update + property_update_flags Change job status on error Y @@ -686,7 +723,7 @@ FROM all_data - Update + property_insert_or_update Change job status on error Y diff --git a/mappings/info_recruits/citizen_tables/property/parallel/property_flow_repeat.hpl b/mappings/info_recruits/citizen_tables/property/parallel/property_flow_repeat.hpl index ce2a248..9d6f673 100644 --- a/mappings/info_recruits/citizen_tables/property/parallel/property_flow_repeat.hpl +++ b/mappings/info_recruits/citizen_tables/property/parallel/property_flow_repeat.hpl @@ -22,12 +22,7 @@ Create job execution record - Table input - Y - - - Filter rows - Change job status on success + property_input Y @@ -41,42 +36,47 @@ Y - Insert / update + property_insert_or_update Change job status on error Y - - Table input - Identify last row in a stream - Y - - - Identify last row in a stream - Insert / update - Y - - - Identify last row in a stream - sort_by_recruit_id - Y - - - Identify last row in a stream - Detect empty stream - Y - - - sort_by_recruit_id - Update - Y - Update Change job status on error Y - Insert / update + property_input + property_insert_or_update + Y + + + property_input + Detect empty stream + Y + + + property_input + property_group_by + Y + + + property_group_by + Update + Y + + + Filter rows + Change job status on success + Y + + + property_insert_or_update + Identify last row in a stream + Y + + + Identify last row in a stream Filter rows Y @@ -98,7 +98,7 @@ 1744 - 688 + 832 @@ -134,7 +134,7 @@ and recruitment_id = '${IDM_ID}'; 1328 - 688 + 832 @@ -166,7 +166,7 @@ and recruitment_id = '${IDM_ID}'; 1744 - 352 + 256 @@ -230,7 +230,7 @@ DO UPDATE SET 768 - 352 + 256 @@ -273,7 +273,7 @@ DO UPDATE SET Identify last row in a stream DetectLastRow - N + Y 1 @@ -283,12 +283,12 @@ DO UPDATE SET last_row - 768 + 1552 496 - Insert / update + property_insert_or_update InsertUpdate Y @@ -309,14 +309,8 @@ DO UPDATE SET = - start_date - start_date - - - - = - type_property - type_property + source_id + source_id ervu_dashboard @@ -386,19 +380,24 @@ DO UPDATE SET source_update_date Y + + source_id + source_id + N + Y - 1024 + 1328 496 - Table input + property_input TableInput - Y + N 1 @@ -407,166 +406,150 @@ DO UPDATE SET ervu-dashboard N - WITH -base as( + + WITH base AS ( SELECT ri.recruit_id, - ri.info + ri.info, + jsonb_typeof(ri.info -> 'svedFL' -> 'svedON' -> 'on') = 'array' AS has_realty, + jsonb_typeof(ri.info -> 'svedFL' -> 'svedNazTS' -> 'nazTS') = 'array' AS has_ground, + jsonb_typeof(ri.info -> 'svedFL' -> 'svedVozTS' -> 'vozTS') = 'array' AS has_air, + jsonb_typeof(ri.info -> 'svedFL' -> 'svedVodTS' -> 'vodTS') = 'array' AS has_water FROM ervu_dashboard.recruits_info ri - join ervu_dashboard.citizen r ON r.recruit_id = ri.recruit_id AND '${IDM_ID}' != '' -- Проверка на пустую строку - AND (ri.current_recruitment_id = '${IDM_ID}' or (ri.current_recruitment_id is null and ri.target_recruitment_id = '${IDM_ID}')) + WHERE + '${IDM_ID}' != '' AND + COALESCE(ri.current_recruitment_id, ri.target_recruitment_id) = '${IDM_ID}' AND + (jsonb_typeof(info -> 'svedFL' -> 'svedON' -> 'on') = 'array' OR + jsonb_typeof(info -> 'svedFL' -> 'svedNazTS' -> 'nazTS') = 'array' OR + jsonb_typeof(info -> 'svedFL' -> 'svedVozTS' -> 'vozTS') = 'array' OR + jsonb_typeof(info -> 'svedFL' -> 'svedVodTS' -> 'vodTS') = 'array') ), -prop AS ( - SELECT - base.recruit_id, - 'realty' AS property_type, - property_data.value AS property_data - FROM base - LEFT JOIN LATERAL jsonb_array_elements( - CASE - WHEN jsonb_typeof(base.info->'svedFL'->'svedON'->'on') = 'array' - THEN base.info->'svedFL'->'svedON'->'on' - ELSE '[]'::jsonb - END - ) AS property_data ON true - WHERE jsonb_typeof(base.info->'svedFL'->'svedON'->'on') = 'array' - - UNION ALL - - SELECT - base.recruit_id, - 'ground_transportation' AS property_type, - property_data.value AS property_data - FROM base - LEFT JOIN LATERAL jsonb_array_elements( - CASE - WHEN jsonb_typeof(base.info->'svedFL'->'svedNazTS'->'nazTS') = 'array' - THEN base.info->'svedFL'->'svedNazTS'->'nazTS' - ELSE '[]'::jsonb - END - ) AS property_data ON true - WHERE jsonb_typeof(base.info->'svedFL'->'svedNazTS'->'nazTS') = 'array' - - UNION ALL - - SELECT - base.recruit_id, - 'air_vehicles' AS property_type, - property_data.value AS property_data - FROM base - LEFT JOIN LATERAL jsonb_array_elements( - CASE - WHEN jsonb_typeof(base.info->'svedFL'->'svedVozTS'->'vozTS') = 'array' - THEN base.info->'svedFL'->'svedVozTS'->'vozTS' - ELSE '[]'::jsonb - END - ) AS property_data ON true - WHERE jsonb_typeof(base.info->'svedFL'->'svedVozTS'->'vozTS') = 'array' - - UNION ALL - - SELECT - base.recruit_id, - 'water_vehicles' AS property_type, - property_data.value AS property_data - FROM base - LEFT JOIN LATERAL jsonb_array_elements( - CASE - WHEN jsonb_typeof(base.info->'svedFL'->'svedVodTS'->'vodTS') = 'array' - THEN base.info->'svedFL'->'svedVodTS'->'vodTS' - ELSE '[]'::jsonb - END - ) AS property_data ON true - WHERE jsonb_typeof(base.info->'svedFL'->'svedVodTS'->'vodTS') = 'array' -), -all_data as( - -- Недвижимость - SELECT - base.recruit_id, - to_date(property_data->>'dataSved', 'YYYY-MM-DD') AS source_update_date, - property_data->>'naimVidPrav' AS kind_right, -- Исправлено название поля - property_data->'adrObSob'->>'adrObSobTekst' AS address, - property_data->>'kodVidPrav' AS property_code, - NULL AS issue_year, - NULL AS brand_model, - NULL AS reg_plate, - NULL AS vin, - NULL AS vehicle_category, - to_date(property_data->>'dataRegPrav', 'YYYY-MM-DD') AS start_date, - to_date(property_data->>'dataPrekPrav', 'YYYY-MM-DD') AS close_date, - 'realty' AS type_property, - true AS has_possibility_reg_residence - FROM base - JOIN prop ON base.recruit_id = prop.recruit_id - WHERE prop.property_type = 'realty' - UNION ALL + prop AS ( + SELECT + base.recruit_id, + 'realty' AS property_type, + property_data.value AS property_data + FROM base + JOIN LATERAL jsonb_array_elements(base.info -> 'svedFL' -> 'svedON' -> 'on') AS property_data ON true + WHERE base.has_realty - -- Наземный транспорт - SELECT - base.recruit_id, - to_date(property_data->>'dataSved', 'YYYY-MM-DD') AS source_update_date, - NULL AS kind_right, - NULL AS address, - property_data->>'vidTS' AS property_code, - property_data->>'godVyp' AS issue_year, - property_data->>'markaTS' AS brand_model, - property_data->>'gosRegZn' AS reg_plate, - property_data->>'idenNom' AS vin, - NULL AS vehicle_category, - to_date(property_data->>'dataRegPr', 'YYYY-MM-DD') AS start_date, - to_date(property_data->>'dataPrekPr', 'YYYY-MM-DD') AS close_date, - 'ground_transportation' AS type_property, - false AS has_possibility_reg_residence - FROM base - JOIN prop ON base.recruit_id = prop.recruit_id - WHERE prop.property_type = 'ground_transportation' + UNION ALL - UNION ALL + SELECT + base.recruit_id, + 'ground_transportation' AS property_type, + property_data.value AS property_data + FROM base + JOIN LATERAL jsonb_array_elements(base.info -> 'svedFL' -> 'svedNazTS' -> 'nazTS') AS property_data ON true + WHERE base.has_ground - -- Воздушный транспорт - SELECT - base.recruit_id, - to_date(property_data->>'dataSved', 'YYYY-MM-DD') AS source_update_date, - NULL AS kind_right, - NULL AS address, - property_data->>'naimVid' AS property_code, - NULL AS issue_year, - NULL AS brand_model, - NULL AS reg_plate, - NULL AS vin, - property_data->>'naznKat' AS vehicle_category, - to_date(property_data->>'dataRegPr', 'YYYY-MM-DD') AS start_date, - to_date(property_data->>'dataPrekPr', 'YYYY-MM-DD') AS close_date, - 'air_vehicles' AS type_property, - false AS has_possibility_reg_residence - FROM base - JOIN prop ON base.recruit_id = prop.recruit_id - WHERE prop.property_type = 'air_vehicles' + UNION ALL - UNION ALL + SELECT + base.recruit_id, + 'air_vehicles' AS property_type, + property_data.value AS property_data + FROM base + JOIN LATERAL jsonb_array_elements(base.info -> 'svedFL' -> 'svedVozTS' -> 'vozTS') AS property_data ON true + WHERE base.has_air - -- Водный транспорт - SELECT - base.recruit_id, - to_date(property_data->>'dataSved', 'YYYY-MM-DD') AS source_update_date, - NULL AS kind_right, - NULL AS address, - property_data->>'naimVid' AS property_code, - NULL AS issue_year, - NULL AS brand_model, - NULL AS reg_plate, - NULL AS vin, - property_data->>'naznKat' AS vehicle_category, - to_date(property_data->>'dataRegPr', 'YYYY-MM-DD') AS start_date, - to_date(property_data->>'dataPrekPr', 'YYYY-MM-DD') AS close_date, - 'water_vehicles' AS type_property, - false AS has_possibility_reg_residence - FROM base - JOIN prop ON base.recruit_id = prop.recruit_id - WHERE prop.property_type = 'water_vehicles' -) -select + UNION ALL + + SELECT + base.recruit_id, + 'water_vehicles' AS property_type, + property_data.value AS property_data + FROM base + JOIN LATERAL jsonb_array_elements(base.info -> 'svedFL' -> 'svedVodTS' -> 'vodTS') AS property_data ON true + WHERE base.has_water + ), + + all_data AS ( + SELECT + prop.recruit_id, + to_date(property_data ->> 'dataSved', 'YYYY-MM-DD') AS source_update_date, + property_data ->> 'naimVidPrav' AS kind_right, + property_data -> 'adrObSob' ->> 'adrObSobTekst' AS address, + property_data ->> 'kodVidPrav' AS property_code, + NULL AS issue_year, + NULL AS brand_model, + NULL AS reg_plate, + NULL AS vin, + NULL AS vehicle_category, + to_date(property_data ->> 'dataRegPrav', 'YYYY-MM-DD') AS start_date, + to_date(property_data ->> 'dataPrekPrav', 'YYYY-MM-DD') AS close_date, + 'realty' AS type_property, + true AS has_possibility_reg_residence, + property_data ->> 'id' AS source_id + FROM prop + WHERE prop.property_type = 'realty' + + UNION ALL + + SELECT + prop.recruit_id, + to_date(property_data ->> 'dataSved', 'YYYY-MM-DD'), + NULL AS kind_right, + NULL AS address, + property_data ->> 'vidTS', + property_data ->> 'godVyp', + property_data ->> 'markaTS', + property_data ->> 'gosRegZn', + property_data ->> 'idenNom', + NULL AS vehicle_category, + to_date(property_data ->> 'dataRegPr', 'YYYY-MM-DD'), + to_date(property_data ->> 'dataPrekPr', 'YYYY-MM-DD'), + 'ground_transportation', + false, + property_data ->> 'id' AS source_id + FROM prop + WHERE prop.property_type = 'ground_transportation' + + UNION ALL + + SELECT + prop.recruit_id, + to_date(property_data ->> 'dataSved', 'YYYY-MM-DD'), + NULL, + NULL, + property_data ->> 'naimVid', + NULL, + NULL, + NULL, + NULL, + property_data ->> 'naznKat', + to_date(property_data ->> 'dataRegPr', 'YYYY-MM-DD'), + to_date(property_data ->> 'dataPrekPr', 'YYYY-MM-DD'), + 'air_vehicles', + false, + property_data ->> 'id' AS source_id + FROM prop + WHERE prop.property_type = 'air_vehicles' + + UNION ALL + + SELECT + prop.recruit_id, + to_date(property_data ->> 'dataSved', 'YYYY-MM-DD'), + NULL, + NULL, + property_data ->> 'naimVid', + NULL, + NULL, + NULL, + NULL, + property_data ->> 'naznKat', + to_date(property_data ->> 'dataRegPr', 'YYYY-MM-DD'), + to_date(property_data ->> 'dataPrekPr', 'YYYY-MM-DD'), + 'water_vehicles', + false, + property_data ->> 'id' AS source_id + FROM prop + WHERE prop.property_type = 'water_vehicles' + ) + +SELECT recruit_id, source_update_date, kind_right, @@ -580,28 +563,18 @@ select start_date, close_date, type_property, - has_possibility_reg_residence, - CASE - WHEN type_property = 'realty' THEN true - ELSE false - END AS has_property, -- недвиж//has_property - CASE - WHEN type_property = 'ground_transportation' THEN true - ELSE false - END AS has_ground_transport, -- наземный транспорт//has_ground_transport - CASE - WHEN type_property = 'air_vehicles' THEN true - ELSE false - END AS has_air_transport, -- воздушный транспорт//has_air_transport - CASE - WHEN type_property = 'water_vehicles' THEN true - ELSE false - END AS has_water_transport -- водный транспорт//has_water_transport -from all_data + has_possibility_reg_residence AS has_possibility_reg_residence, + source_id, + type_property = 'realty' AS has_property, + type_property = 'ground_transportation' AS has_ground_transport, + type_property = 'air_vehicles' AS has_air_transport, + type_property = 'water_vehicles' AS has_water_transport +FROM all_data +ORDER BY recruit_id; Y - 496 + 768 496 @@ -652,13 +625,13 @@ from all_data Y - 992 - 688 + 768 + 832 - sort_by_recruit_id - SortRows + property_group_by + GroupBy Y @@ -667,32 +640,53 @@ from all_data none + N + N ${java.io.tmpdir} - srt - 100000 - - N - - Y - recruit_id - Y - N - N - 0 - N + has_property + has_property + MAX + + + has_ground_transport + has_ground_transport + MAX + + + has_air_transport + has_air_transport + MAX + + + has_water_transport + has_water_transport + MAX + + + has_possibility_reg_residence + has_possibility_reg_residence + MAX + N + + + recruit_id + + + N + grp 768 - 688 + 704 - Insert / update + property_insert_or_update Change job status on error Y