From 6fd41db91b6410bdb47fdd54b0d7eb3e1a37b178 Mon Sep 17 00:00:00 2001 From: "adel.ka" Date: Fri, 14 Nov 2025 14:05:46 +0300 Subject: [PATCH] =?UTF-8?q?=D1=84=D0=B8=D0=BA=D1=81=20rdi=20=D0=B8=20rd?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../parallel/restriction_document_flow.hpl | 194 +++++++------- .../restriction_document_flow_delta.hpl | 90 +++---- .../restriction_document_flow_repeat.hpl | 138 +++++----- .../recruitment_five_flow_delta.hpl | 225 +++++++++++++--- .../restriction_document_item_flow.hpl | 224 ++++++++-------- .../restriction_document_item_flow_delta.hpl | 240 +++++++++--------- .../restriction_document_item_flow_repeat.hpl | 227 ++++++++--------- .../recruitment_five_flow_delta.hpl | 217 +++++++++++++--- 8 files changed, 911 insertions(+), 644 deletions(-) diff --git a/mappings/info_recruits/raw_data/restriction_document/parallel/restriction_document_flow.hpl b/mappings/info_recruits/raw_data/restriction_document/parallel/restriction_document_flow.hpl index eaa3249..ca36184 100644 --- a/mappings/info_recruits/raw_data/restriction_document/parallel/restriction_document_flow.hpl +++ b/mappings/info_recruits/raw_data/restriction_document/parallel/restriction_document_flow.hpl @@ -21,53 +21,82 @@ - Create job execution record - Table input + Create_job_execution_record_RD + input_RD Y - Filter rows - Change job status on success + filter_rows_RD + сhange_status_on_success_RD Y - Detect empty stream - Change job status on success + detect_empty_stream_RD + сhange_status_on_success_RD Y - Change job status on error - Abort + change_job_status_on_error_RD + abort_RD Y - Table input - Identify last row in a stream + output_RD + change_job_status_on_error_RD Y - Identify last row in a stream - Table output + output_RD + detect_empty_stream_RD Y - Table output - Filter rows + output_RD + identify_last_row_in_a_stream_RD Y - Table output - Change job status on error + identify_last_row_in_a_stream_RD + filter_rows_RD Y - Table output - Detect empty stream + input_RD + output_RD Y - Abort + Create_job_execution_record_RD + ExecSql + + Y + + 1 + + none + + + + + ervu-dashboard + N + N + Y + N + N + INSERT INTO etl.job_execution (id, job_name, status, execution_datetime, error_description, recruitment_id) +VALUES (DEFAULT, '${JOB_NAME}', 'PROCESSING', DEFAULT, null, '${IDM_ID}'); + + + + + 288 + 400 + + + + abort_RD Abort Y @@ -83,11 +112,11 @@ 1376 - 512 + 592 - Change job status on error + change_job_status_on_error_RD ExecSql Y @@ -118,71 +147,11 @@ and recruitment_id = '${IDM_ID}'; 1024 - 512 + 592 - Change job status on success - ExecSql - - Y - - 1 - - none - - - - - ervu-dashboard - Y - N - Y - N - N - UPDATE etl.job_execution -SET status = 'SUCCESS' -WHERE job_name = '${JOB_NAME}' -and recruitment_id = '${IDM_ID}'; - - - - - 1632 - 160 - - - - Create job execution record - ExecSql - - Y - - 1 - - none - - - - - ervu-dashboard - N - N - Y - N - N - INSERT INTO etl.job_execution (id, job_name, status, execution_datetime, error_description, recruitment_id) -VALUES (DEFAULT, '${JOB_NAME}', 'PROCESSING', DEFAULT, null, '${IDM_ID}'); - - - - - 288 - 320 - - - - Detect empty stream + detect_empty_stream_RD DetectEmptyStream Y @@ -195,11 +164,11 @@ VALUES (DEFAULT, '${JOB_NAME}', 'PROCESSING', DEFAULT, null, '${IDM_ID}'); 1632 - 320 + 400 - Filter rows + filter_rows_RD FilterRows Y @@ -227,15 +196,15 @@ VALUES (DEFAULT, '${JOB_NAME}', 'PROCESSING', DEFAULT, null, '${IDM_ID}'); - Change job status on success + сhange_status_on_success_RD 1024 - 160 + 64 - Identify last row in a stream + identify_last_row_in_a_stream_RD DetectLastRow Y @@ -248,12 +217,12 @@ VALUES (DEFAULT, '${JOB_NAME}', 'PROCESSING', DEFAULT, null, '${IDM_ID}'); last_row - 752 - 320 + 1024 + 224 - Table input + input_RD TableInput N @@ -275,12 +244,12 @@ WHERE Y - 496 - 320 + 640 + 400 - Table output + output_RD TableOutput N @@ -366,13 +335,44 @@ WHERE 1024 - 320 + 400 + + + + сhange_status_on_success_RD + ExecSql + + Y + + 1 + + none + + + + + ervu-dashboard + Y + N + Y + N + N + UPDATE etl.job_execution +SET status = 'SUCCESS' +WHERE job_name = '${JOB_NAME}' +and recruitment_id = '${IDM_ID}'; + + + + + 1632 + 64 - Table output - Change job status on error + output_RD + change_job_status_on_error_RD Y error_description diff --git a/mappings/info_recruits/raw_data/restriction_document/parallel/restriction_document_flow_delta.hpl b/mappings/info_recruits/raw_data/restriction_document/parallel/restriction_document_flow_delta.hpl index 592136d..1f79c2a 100644 --- a/mappings/info_recruits/raw_data/restriction_document/parallel/restriction_document_flow_delta.hpl +++ b/mappings/info_recruits/raw_data/restriction_document/parallel/restriction_document_flow_delta.hpl @@ -21,53 +21,53 @@ - Create job execution record - Table input + create_job_execution_record_RD + table_input_RD Y - Table input - Identify last row in a stream + detect_empty_stream_RD + change_job_status_on_success_RD Y - Detect empty stream - Change job status on success + filter_rows_RD + change_job_status_on_success_RD Y - Filter rows - Change job status on success + change_job_status_on_error_RD + abort_RD Y - Change job status on error - Abort + insert_or_update_RD + detect_empty_stream_RD Y - Identify last row in a stream - Insert / update + insert_or_update_RD + change_job_status_on_error_RD Y - Insert / update - Filter rows + insert_or_update_RD + identify_last_row_in_a_stream_RD Y - Insert / update - Detect empty stream + identify_last_row_in_a_stream_RD + filter_rows_RD Y - Insert / update - Change job status on error + table_input_RD + insert_or_update_RD Y - Abort + abort_RD Abort Y @@ -83,11 +83,11 @@ 1168 - 400 + 544 - Change job status on error + change_job_status_on_error_RD ExecSql Y @@ -118,11 +118,11 @@ and recruitment_id = '${IDM_ID}'; 944 - 400 + 544 - Change job status on success + change_job_status_on_success_RD ExecSql Y @@ -153,7 +153,7 @@ and recruitment_id = '${IDM_ID}'; - Create job execution record + create_job_execution_record_RD ExecSql Y @@ -181,11 +181,11 @@ and recruitment_id = '${IDM_ID}'; 336 - 224 + 368 - Detect empty stream + detect_empty_stream_RD DetectEmptyStream Y @@ -198,11 +198,11 @@ and recruitment_id = '${IDM_ID}'; 1248 - 224 + 368 - Filter rows + filter_rows_RD FilterRows Y @@ -230,7 +230,7 @@ and recruitment_id = '${IDM_ID}'; - Change job status on success + change_job_status_on_success_RD 944 @@ -238,7 +238,7 @@ and recruitment_id = '${IDM_ID}'; - Identify last row in a stream + identify_last_row_in_a_stream_RD DetectLastRow Y @@ -251,12 +251,12 @@ and recruitment_id = '${IDM_ID}'; last_row - 736 - 224 + 944 + 208 - Insert / update + insert_or_update_RD InsertUpdate N @@ -274,11 +274,6 @@ and recruitment_id = '${IDM_ID}'; id id - - = - created_at - created_at - ervu_dashboard restriction_document
@@ -289,12 +284,12 @@ and recruitment_id = '${IDM_ID}'; subpoena_id subpoena_id - Y + N created_at created_at - N + Y updated_at @@ -356,11 +351,11 @@ and recruitment_id = '${IDM_ID}'; 944 - 224 + 368
- Table input + table_input_RD TableInput N @@ -372,26 +367,25 @@ and recruitment_id = '${IDM_ID}'; postgres.subpoena N - SELECT * FROM public.restriction_document WHERE '${IDM_ID}' != '' -- Проверка на пустую строку AND vk_id = '${IDM_ID}' - AND updated_at >= '${M_RESTRDOC_UP_DATE}'::timestamp + AND updated_at >= '${MAX_UPDATE_DATE}'::timestamp Y - 544 - 224 + 640 + 368 - Insert / update - Change job status on error + insert_or_update_RD + change_job_status_on_error_RD Y error_description diff --git a/mappings/info_recruits/raw_data/restriction_document/parallel/restriction_document_flow_repeat.hpl b/mappings/info_recruits/raw_data/restriction_document/parallel/restriction_document_flow_repeat.hpl index b74cd46..8ca5fab 100644 --- a/mappings/info_recruits/raw_data/restriction_document/parallel/restriction_document_flow_repeat.hpl +++ b/mappings/info_recruits/raw_data/restriction_document/parallel/restriction_document_flow_repeat.hpl @@ -21,53 +21,53 @@ - Create job execution record - Table input + create_job_execution_record_RD + input_RD Y - Table input - Identify last row in a stream + filter_rows_RD + change_job_status_on_success_RD Y - Filter rows - Change job status on success + detect_empty_stream_RD + change_job_status_on_success_RD Y - Detect empty stream - Change job status on success + change_job_status_on_error_RD + abort_RD Y - Change job status on error - Abort + insert_or_update_RD + detect_empty_stream_RD Y - Identify last row in a stream - Insert / update + insert_or_update_RD + change_job_status_on_error_RD Y - Insert / update - Filter rows + insert_or_update_RD + identify_last_row_in_a_stream_RD Y - Insert / update - Detect empty stream + identify_last_row_in_a_stream_RD + filter_rows_RD Y - Insert / update - Change job status on error + input_RD + insert_or_update_RD Y - Abort + abort_RD Abort Y @@ -83,11 +83,11 @@ 1248 - 400 + 656 - Change job status on error + change_job_status_on_error_RD ExecSql Y @@ -118,11 +118,11 @@ and recruitment_id = '${IDM_ID}'; 960 - 400 + 656 - Change job status on success + change_job_status_on_success_RD ExecSql Y @@ -153,7 +153,7 @@ and recruitment_id = '${IDM_ID}'; - Create job execution record + create_job_execution_record_RD ExecSql Y @@ -196,11 +196,11 @@ DO UPDATE SET 272 - 208 + 416 - Detect empty stream + detect_empty_stream_RD DetectEmptyStream Y @@ -213,11 +213,11 @@ DO UPDATE SET 1440 - 208 + 416 - Filter rows + filter_rows_RD FilterRows Y @@ -245,7 +245,7 @@ DO UPDATE SET - Change job status on success + change_job_status_on_success_RD 960 @@ -253,7 +253,7 @@ DO UPDATE SET - Identify last row in a stream + identify_last_row_in_a_stream_RD DetectLastRow Y @@ -266,12 +266,39 @@ DO UPDATE SET last_row - 720 - 208 + 960 + 224 - Insert / update + input_RD + TableInput + + N + + 1 + + none + + + postgres.subpoena + N + SELECT +* +FROM public.restriction_document +WHERE + '${IDM_ID}' != '' -- Проверка на пустую строку + AND vk_id = '${IDM_ID}' + + Y + + + 608 + 416 + + + + insert_or_update_RD InsertUpdate N @@ -288,11 +315,7 @@ DO UPDATE SET = id id - - - = - created_at - created_at + ervu_dashboard restriction_document
@@ -304,12 +327,12 @@ DO UPDATE SET subpoena_id subpoena_id - Y + N created_at created_at - N + Y updated_at @@ -367,44 +390,17 @@ DO UPDATE SET Y - N + Y 960 - 208 - -
- - Table input - TableInput - - N - - 1 - - none - - - postgres.subpoena - N - SELECT -* -FROM public.restriction_document -WHERE - '${IDM_ID}' != '' -- Проверка на пустую строку - AND vk_id = '${IDM_ID}' - - Y - - - 480 - 208 + 416 - Insert / update - Change job status on error + insert_or_update_RD + change_job_status_on_error_RD Y error_description diff --git a/mappings/info_recruits/raw_data/restriction_document/recruitment_five_flow_delta.hpl b/mappings/info_recruits/raw_data/restriction_document/recruitment_five_flow_delta.hpl index 00e079c..ba6d13f 100644 --- a/mappings/info_recruits/raw_data/restriction_document/recruitment_five_flow_delta.hpl +++ b/mappings/info_recruits/raw_data/restriction_document/recruitment_five_flow_delta.hpl @@ -21,33 +21,111 @@ - Table input - restriction_document_flow_delta.hpl + max_update_date_from_restriction_document + failed_restriction_document_idm_ids_on_delta Y - Table input + max_update_date_from_restriction_document + updated_idm_ids_from_source_restriction_document + Y + + + Sort rows + restriction_document_flow_delta.hpl 1 + Y + + + Sort rows restriction_document_flow_delta.hpl 2 Y - Table input + Sort rows restriction_document_flow_delta.hpl 3 Y - Table input + Sort rows restriction_document_flow_delta.hpl 4 Y - Table input + Sort rows restriction_document_flow_delta.hpl 5 Y + + updated_idm_ids_from_source_restriction_document + Append streams + Y + + + failed_restriction_document_idm_ids_on_delta + Append streams + Y + + + Append streams + Sort rows + Y + - Table input + Append streams + Append + + Y + + 1 + + none + + + updated_idm_ids_from_source_restriction_document + failed_restriction_document_idm_ids_on_delta + + + 592 + 800 + + + + Sort rows + SortRows + + Y + + 1 + + none + + + ${java.io.tmpdir} + out + 100000 + + N + + Y + + + recruitment_id + Y + N + N + 0 + N + + + + + 768 + 800 + + + + failed_restriction_document_idm_ids_on_delta TableInput Y @@ -60,36 +138,46 @@ ervu-dashboard N 0 - WITH mud AS (SELECT recruitment_id, - MAX(execution_datetime) AS max_upd_date - FROM etl.job_execution - WHERE job_name = '${JOB_NAME}' - AND status IN ('SUCCESS', 'DELTA_SUCCESS') - GROUP BY recruitment_id) -SELECT r.idm_id -FROM ervu_dashboard.recruitment r - JOIN mud ON mud.recruitment_id = r.idm_id - JOIN recruits_info ri - ON COALESCE(ri.current_recruitment_id, ri.target_recruitment_id) = r.idm_id - AND ri.updated_at > mud.max_upd_date - -union - -select r2.idm_id -from ervu_dashboard.recruitment r2 + max_update_date_from_restriction_document + select r.idm_id as recruitment_id, + ? as max_update_date +from ervu_dashboard.recruitment r join etl.job_execution je - on r2.idm_id = je.recruitment_id + on r.idm_id = je.recruitment_id where je.status in ('DELTA_ERROR', 'DELTA_PROCESSING') and je.job_name = '${JOB_NAME}'; Y 352 - 288 + 1008 - restriction_document_flow_delta.hpl + max_update_date_from_restriction_document + TableInput + + N + + 1 + + none + + + ervu-dashboard + N + 0 + select max(updated_at) as max_update_date + from ervu_dashboard.restriction_document; + N + + + 352 + 800 + + + + restriction_document_flow_delta.hpl 1 PipelineExecutor Y @@ -109,7 +197,12 @@ where je.status in ('DELTA_ERROR', 'DELTA_PROCESSING') IDM_ID - idm_id + recruitment_id + + + + MAX_UPDATE_DATE + max_update_date Y @@ -135,8 +228,8 @@ where je.status in ('DELTA_ERROR', 'DELTA_PROCESSING') - 656 - 128 + 928 + 672 @@ -160,7 +253,12 @@ where je.status in ('DELTA_ERROR', 'DELTA_PROCESSING') IDM_ID - idm_id + recruitment_id + + + + MAX_UPDATE_DATE + max_update_date Y @@ -186,8 +284,8 @@ where je.status in ('DELTA_ERROR', 'DELTA_PROCESSING') - 656 - 208 + 1024 + 752 @@ -211,7 +309,12 @@ where je.status in ('DELTA_ERROR', 'DELTA_PROCESSING') IDM_ID - idm_id + recruitment_id + + + + MAX_UPDATE_DATE + max_update_date Y @@ -237,8 +340,8 @@ where je.status in ('DELTA_ERROR', 'DELTA_PROCESSING') - 656 - 288 + 1024 + 848 @@ -262,7 +365,12 @@ where je.status in ('DELTA_ERROR', 'DELTA_PROCESSING') IDM_ID - idm_id + recruitment_id + + + + MAX_UPDATE_DATE + max_update_date Y @@ -288,8 +396,8 @@ where je.status in ('DELTA_ERROR', 'DELTA_PROCESSING') - 656 - 368 + 1024 + 944 @@ -313,7 +421,12 @@ where je.status in ('DELTA_ERROR', 'DELTA_PROCESSING') IDM_ID - idm_id + recruitment_id + + + + MAX_UPDATE_DATE + max_update_date Y @@ -339,8 +452,36 @@ where je.status in ('DELTA_ERROR', 'DELTA_PROCESSING') - 656 - 448 + 976 + 1040 + + + + updated_idm_ids_from_source_restriction_document + TableInput + + Y + + 1 + + none + + + postgres.subpoena + N + 0 + max_update_date_from_restriction_document + WITH max_update_date(val) AS (SELECT CAST(? AS timestamp)) +SELECT DISTINCT ra.vk_id as recruitment_id, + mud.val as max_update_date +FROM public.restriction_document ra +JOIN max_update_date mud ON TRUE +WHERE (mud.val IS NULL OR ra.updated_at > mud.val); + Y + + + 352 + 608 diff --git a/mappings/info_recruits/raw_data/restriction_document_item/parallel/restriction_document_item_flow.hpl b/mappings/info_recruits/raw_data/restriction_document_item/parallel/restriction_document_item_flow.hpl index 737c73c..288fafb 100644 --- a/mappings/info_recruits/raw_data/restriction_document_item/parallel/restriction_document_item_flow.hpl +++ b/mappings/info_recruits/raw_data/restriction_document_item/parallel/restriction_document_item_flow.hpl @@ -21,73 +21,68 @@ - Create job execution record - Table input + create_job_execution_record_RDI + table_input_RDI Y - Filter rows - Change job status on success + filter_rows_RDI + change_job_status_on_success_RDI Y - Detect empty stream - Change job status on success + detect_empty_stream_RDI + change_job_status_on_success_RDI Y - Change job status on error - Abort + change_job_status_on_error_RDI + abort_RDI Y - Table output - Change job status on error + table_output_RDI + change_job_status_on_error_RDI Y - Update - Change job status on error + update_flags_RDI + change_job_status_on_error_RDI Y - Table input - Identify last row in a stream + group_by_RDI + update_flags_RDI Y - Identify last row in a stream - Table output + table_output_RDI + identify_last_row_in_a_stream_RDI Y - Identify last row in a stream - Detect empty stream + identify_last_row_in_a_stream_RDI + filter_rows_RDI Y - Identify last row in a stream - sort_by_recruit_id + table_input_RDI + detect_empty_stream_RDI Y - sort_by_recruit_id - Update + table_input_RDI + group_by_RDI Y - Table output - Block until transforms finish - Y - - - Block until transforms finish - Filter rows + table_input_RDI + table_output_RDI Y - Abort + abort_RDI Abort Y @@ -99,37 +94,16 @@ ABORT_WITH_ERROR Y + 0 1360 - 496 + 752 - Block until transforms finish - BlockUntilTransformsFinish - - Y - - 1 - - none - - - - - Update - - - - - 1152 - 320 - - - - Change job status on error + change_job_status_on_error_RDI ExecSql Y @@ -145,8 +119,11 @@ ervu-dashboard + Y + N + Y Y N @@ -157,14 +134,15 @@ WHERE job_name = '${JOB_NAME}' and recruitment_id = '${IDM_ID}'; + - 1168 - 496 + 1008 + 752 - Change job status on success + change_job_status_on_success_RDI ExecSql Y @@ -177,8 +155,11 @@ and recruitment_id = '${IDM_ID}'; ervu-dashboard + Y + N + Y N N @@ -188,14 +169,15 @@ WHERE job_name = '${JOB_NAME}' and recruitment_id = '${IDM_ID}'; + - 1344 + 1424 176 - Create job execution record + create_job_execution_record_RDI ExecSql Y @@ -208,8 +190,11 @@ and recruitment_id = '${IDM_ID}'; ervu-dashboard + N + N + Y N N @@ -217,14 +202,15 @@ and recruitment_id = '${IDM_ID}'; VALUES (DEFAULT, '${JOB_NAME}', 'PROCESSING', DEFAULT, null, '${IDM_ID}'); + 288 - 320 + 400 - Detect empty stream + detect_empty_stream_RDI DetectEmptyStream Y @@ -241,7 +227,7 @@ VALUES (DEFAULT, '${JOB_NAME}', 'PROCESSING', DEFAULT, null, '${IDM_ID}'); - Filter rows + filter_rows_RDI FilterRows Y @@ -269,15 +255,52 @@ VALUES (DEFAULT, '${JOB_NAME}', 'PROCESSING', DEFAULT, null, '${IDM_ID}'); - Change job status on success + change_job_status_on_success_RDI - 1344 - 320 + 1424 + 400 - Identify last row in a stream + group_by_RDI + GroupBy + + Y + + 1 + + none + + + N + N + ${java.io.tmpdir} + + + has_active_temporary_measure + has_active_temporary_measure + MAX + + + + N + + + recruit_id + + + N + + grp + + + 704 + 576 + + + + identify_last_row_in_a_stream_RDI DetectLastRow N @@ -290,15 +313,15 @@ VALUES (DEFAULT, '${JOB_NAME}', 'PROCESSING', DEFAULT, null, '${IDM_ID}'); last_row - 704 - 320 + 1200 + 400 - Table input + table_input_RDI TableInput - Y + N 1 @@ -323,16 +346,16 @@ s.id = rd.subpoena_id WHERE '${IDM_ID}' != '' -- Проверка на пустую строку AND rd.vk_id = '${IDM_ID}' - +ORDER BY s.recruit_id Y - 496 - 320 + 704 + 400 - Table output + table_output_RDI TableOutput Y @@ -410,23 +433,26 @@ WHERE N N N + Y + N ervu_dashboard Y restriction_document_item
+ N Y N Y - 928 - 320 + 1008 + 400
- Update + update_flags_RDI Update N @@ -439,11 +465,13 @@ WHERE 10000 ervu-dashboard Y + = recruit_id recruit_id + ervu_dashboard citizen
@@ -455,47 +483,15 @@ WHERE N Y - - 928 - 496 - -
- - sort_by_recruit_id - SortRows - - Y - - 1 - - none - - - N - ${java.io.tmpdir} - - - Y - N - N - 0 - recruit_id - N - - - srt - 1000000 - Y - 704 - 496 + 752 - Table output - Change job status on error + table_output_RDI + change_job_status_on_error_RDI Y error_description @@ -506,8 +502,8 @@ WHERE - Update - Change job status on error + update_flags_RDI + change_job_status_on_error_RDI Y error_description diff --git a/mappings/info_recruits/raw_data/restriction_document_item/parallel/restriction_document_item_flow_delta.hpl b/mappings/info_recruits/raw_data/restriction_document_item/parallel/restriction_document_item_flow_delta.hpl index f8cdb0f..cc8da14 100644 --- a/mappings/info_recruits/raw_data/restriction_document_item/parallel/restriction_document_item_flow_delta.hpl +++ b/mappings/info_recruits/raw_data/restriction_document_item/parallel/restriction_document_item_flow_delta.hpl @@ -21,73 +21,68 @@ - Create job execution record - Table input + create_job_execution_record_RDI + table_input_RDI Y - Detect empty stream - Change job status on success + detect_empty_stream_RDI + change_job_status_on_success_RDI Y - Filter rows - Change job status on success + filter_rows_RDI + change_job_status_on_success_RDI Y - Change job status on error - Abort + change_job_status_on_error_RDI + abort_RDI Y - Insert / update - Change job status on error + insert_or_update_RDI + change_job_status_on_error_RDI Y - Update - Change job status on error + update_flags_RDI + change_job_status_on_error_RDI Y - Table input - Identify last row in a stream + insert_or_update_RDI + identify_last_row_in_a_stream_RDI Y - Identify last row in a stream - Insert / update + identify_last_row_in_a_stream_RDI + filter_rows_RDI Y - Identify last row in a stream - Detect empty stream + table_input_RDI + insert_or_update_RDI Y - Insert / update - Block until transforms finish + table_input_RDI + detect_empty_stream_RDI Y - Block until transforms finish - Filter rows + table_input_RDI + group_by_RDI Y - Identify last row in a stream - sort_by_recruit_id - Y - - - sort_by_recruit_id - Update + group_by_RDI + update_flags_RDI Y - Abort + abort_RDI Abort Y @@ -99,37 +94,16 @@ ABORT_WITH_ERROR Y + 0 - 1408 - 368 + 1488 + 784 - Block until transforms finish - BlockUntilTransformsFinish - - Y - - 1 - - none - - - - - Update - - - - - 1264 - 224 - - - - Change job status on error + change_job_status_on_error_RDI ExecSql Y @@ -145,8 +119,11 @@ ervu-dashboard + Y + N + Y Y N @@ -157,14 +134,15 @@ WHERE job_name = '${JOB_NAME}' and recruitment_id = '${IDM_ID}'; + - 1232 - 368 + 1152 + 784 - Change job status on success + change_job_status_on_success_RDI ExecSql Y @@ -177,8 +155,11 @@ and recruitment_id = '${IDM_ID}'; ervu-dashboard + Y + N + Y N N @@ -188,14 +169,15 @@ WHERE job_name = '${JOB_NAME}' and recruitment_id = '${IDM_ID}'; + - 1472 - 80 + 1552 + 304 - Create job execution record + create_job_execution_record_RDI ExecSql Y @@ -208,8 +190,11 @@ and recruitment_id = '${IDM_ID}'; ervu-dashboard + N + N + Y N N @@ -220,14 +205,15 @@ SET error_description = NULL where job_name = '${JOB_NAME}' and recruitment_id = '${IDM_ID}'; + - 336 - 224 + 320 + 576 - Detect empty stream + detect_empty_stream_RDI DetectEmptyStream Y @@ -239,12 +225,12 @@ and recruitment_id = '${IDM_ID}'; - 768 - 80 + 752 + 304 - Filter rows + filter_rows_RDI FilterRows Y @@ -272,15 +258,52 @@ and recruitment_id = '${IDM_ID}'; - Change job status on success + change_job_status_on_success_RDI - 1472 - 224 + 1552 + 576 - Identify last row in a stream + group_by_RDI + GroupBy + + Y + + 1 + + none + + + N + N + ${java.io.tmpdir} + + + has_active_temporary_measure + has_active_temporary_measure + MAX + + + + N + + + recruit_id + + + N + + grp + + + 752 + 784 + + + + identify_last_row_in_a_stream_RDI DetectLastRow N @@ -293,12 +316,12 @@ and recruitment_id = '${IDM_ID}'; last_row - 768 - 224 + 1344 + 576 - Insert / update + insert_or_update_RDI InsertUpdate Y @@ -315,11 +338,13 @@ and recruitment_id = '${IDM_ID}'; = id id + = created_at created_at + ervu_dashboard restriction_document_item
@@ -402,23 +427,24 @@ and recruitment_id = '${IDM_ID}'; N - 992 - 224 + 1152 + 576
- Table input + table_input_RDI TableInput - Y + N 1 none - ervu_person_registry + postgres.subpoena N + select s.recruit_id, CASE @@ -434,16 +460,17 @@ s.id = rd.subpoena_id WHERE '${IDM_ID}' != '' -- Проверка на пустую строку AND rd.vk_id = '${IDM_ID}' - AND rdi.updated_at >= '${M_RDI_UP_DATE}'::timestamp + AND rdi.updated_at >= '${MAX_UPDATE_DATE}'::timestamp +ORDER BY s.recruit_id Y - 544 - 224 + 752 + 576 - Update + update_flags_RDI Update N @@ -456,16 +483,19 @@ WHERE 10000 ervu-dashboard Y + = recruit_id recruit_id + <> has_active_temporary_measure has_active_temporary_measure + ervu_dashboard citizen
@@ -478,46 +508,14 @@ WHERE Y - 992 - 368 - -
- - sort_by_recruit_id - SortRows - - Y - - 1 - - none - - - N - ${java.io.tmpdir} - - - Y - N - N - 0 - recruit_id - N - - - srt - 1000000 - Y - - - 768 - 368 + 960 + 784 - Insert / update - Change job status on error + insert_or_update_RDI + change_job_status_on_error_RDI Y error_description @@ -528,8 +526,8 @@ WHERE - Update - Change job status on error + update_flags_RDI + change_job_status_on_error_RDI Y error_description diff --git a/mappings/info_recruits/raw_data/restriction_document_item/parallel/restriction_document_item_flow_repeat.hpl b/mappings/info_recruits/raw_data/restriction_document_item/parallel/restriction_document_item_flow_repeat.hpl index efddae3..f1b8766 100644 --- a/mappings/info_recruits/raw_data/restriction_document_item/parallel/restriction_document_item_flow_repeat.hpl +++ b/mappings/info_recruits/raw_data/restriction_document_item/parallel/restriction_document_item_flow_repeat.hpl @@ -21,73 +21,68 @@ - Create job execution record - Table input + create_job_execution_record_RDI + table_input_RDI Y - Filter rows - Change job status on success + filter_rows_RDI + change_job_status_on_success_RDI Y - Detect empty stream - Change job status on success + detect_empty_stream_RDI + change_job_status_on_success_RDI Y - Change job status on error - Abort + change_job_status_on_error_RDI + abort_RDI Y - Insert / update - Change job status on error + insert_or_update_RDI + change_job_status_on_error_RDI Y - Update - Change job status on error + update_flags_RDI + change_job_status_on_error_RDI Y - Table input - Identify last row in a stream + table_input_RDI + group_by_RDI Y - Identify last row in a stream - Insert / update + group_by_RDI + update_flags_RDI Y - Insert / update - Block until transforms finish + identify_last_row_in_a_stream_RDI + filter_rows_RDI Y - Identify last row in a stream - sort_by_recruit_id + detect_empty_stream_RDI + table_input_RDI Y - sort_by_recruit_id - Update + table_input_RDI + insert_or_update_RDI Y - Identify last row in a stream - Detect empty stream - Y - - - Block until transforms finish - Filter rows + insert_or_update_RDI + identify_last_row_in_a_stream_RDI Y - Abort + abort_RDI Abort Y @@ -99,37 +94,16 @@ ABORT_WITH_ERROR Y + 0 - 1312 - 368 + 1440 + 448 - Block until transforms finish - BlockUntilTransformsFinish - - Y - - 1 - - none - - - - - Update - - - - - 1184 - 208 - - - - Change job status on error + change_job_status_on_error_RDI ExecSql Y @@ -145,8 +119,11 @@ ervu-dashboard + Y + N + Y Y N @@ -157,14 +134,15 @@ WHERE job_name = '${JOB_NAME}' and recruitment_id = '${IDM_ID}'; + - 1136 - 368 + 1024 + 448 - Change job status on success + change_job_status_on_success_RDI ExecSql Y @@ -177,8 +155,11 @@ and recruitment_id = '${IDM_ID}'; ervu-dashboard + Y + N + Y N N @@ -188,6 +169,7 @@ WHERE job_name = '${JOB_NAME}' and recruitment_id = '${IDM_ID}'; + 1440 @@ -195,7 +177,7 @@ and recruitment_id = '${IDM_ID}'; - Create job execution record + create_job_execution_record_RDI ExecSql Y @@ -208,8 +190,11 @@ and recruitment_id = '${IDM_ID}'; ervu-dashboard + N + N + Y N N @@ -235,17 +220,18 @@ DO UPDATE SET execution_datetime = DEFAULT, error_description = NULL + 272 - 208 + 256 - Detect empty stream + detect_empty_stream_RDI DetectEmptyStream - Y + N 1 @@ -254,12 +240,12 @@ DO UPDATE SET - 672 + 656 64 - Filter rows + filter_rows_RDI FilterRows Y @@ -287,15 +273,52 @@ DO UPDATE SET - Change job status on success + change_job_status_on_success_RDI 1440 - 208 + 256 - Identify last row in a stream + group_by_RDI + GroupBy + + Y + + 1 + + none + + + N + N + ${java.io.tmpdir} + + + has_active_temporary_measure + has_active_temporary_measure + MAX + + + + N + + + recruit_id + + + N + + grp + + + 656 + 448 + + + + identify_last_row_in_a_stream_RDI DetectLastRow N @@ -308,12 +331,12 @@ DO UPDATE SET last_row - 672 - 208 + 1264 + 256 - Insert / update + insert_or_update_RDI InsertUpdate Y @@ -330,11 +353,13 @@ DO UPDATE SET = id id + = created_at created_at + ervu_dashboard restriction_document_item
@@ -417,15 +442,15 @@ DO UPDATE SET N - 912 - 208 + 1024 + 256
- Table input + table_input_RDI TableInput - Y + N 1 @@ -450,16 +475,17 @@ s.id = rd.subpoena_id WHERE '${IDM_ID}' != '' -- Проверка на пустую строку AND rd.vk_id = '${IDM_ID}' +ORDER BY s.recruit_id Y - 480 - 208 + 656 + 256 - Update + update_flags_RDI Update N @@ -472,16 +498,19 @@ WHERE 10000 ervu-dashboard Y + = recruit_id recruit_id + <> has_active_temporary_measure has_active_temporary_measure + ervu_dashboard citizen
@@ -494,46 +523,14 @@ WHERE Y - 912 - 368 - -
- - sort_by_recruit_id - SortRows - - Y - - 1 - - none - - - N - ${java.io.tmpdir} - - - Y - N - N - 0 - recruit_id - N - - - srt - 1000000 - Y - - - 672 - 368 + 832 + 448 - Insert / update - Change job status on error + insert_or_update_RDI + change_job_status_on_error_RDI Y error_description @@ -544,8 +541,8 @@ WHERE - Update - Change job status on error + update_flags_RDI + change_job_status_on_error_RDI Y error_description diff --git a/mappings/info_recruits/raw_data/restriction_document_item/recruitment_five_flow_delta.hpl b/mappings/info_recruits/raw_data/restriction_document_item/recruitment_five_flow_delta.hpl index 249b05c..002c79e 100644 --- a/mappings/info_recruits/raw_data/restriction_document_item/recruitment_five_flow_delta.hpl +++ b/mappings/info_recruits/raw_data/restriction_document_item/recruitment_five_flow_delta.hpl @@ -21,33 +21,111 @@ - Table input + Append streams + Sort rows + Y + + + failed_restriction_document_item_idm_ids_on_delta + Append streams + Y + + + max_update_date_from_restriction_document_item + failed_restriction_document_item_idm_ids_on_delta + Y + + + max_update_date_from_restriction_document_item + updated_idm_ids_from_source_restriction_document_item + Y + + + updated_idm_ids_from_source_restriction_document_item + Append streams + Y + + + Sort rows restriction_document_item_flow_delta.hpl Y - Table input + Sort rows restriction_document_item_flow_delta.hpl 2 Y - Table input + Sort rows restriction_document_item_flow_delta.hpl 3 Y - Table input + Sort rows restriction_document_item_flow_delta.hpl 4 Y - Table input + Sort rows restriction_document_item_flow_delta.hpl 5 Y - Table input + Append streams + Append + + Y + + 1 + + none + + + updated_idm_ids_from_source_restriction_document_item + failed_restriction_document_item_idm_ids_on_delta + + + 272 + 592 + + + + Sort rows + SortRows + + Y + + 1 + + none + + + ${java.io.tmpdir} + out + 100000 + + N + + Y + + + recruitment_id + Y + N + N + 0 + N + + + + + 448 + 592 + + + + failed_restriction_document_item_idm_ids_on_delta TableInput Y @@ -60,32 +138,42 @@ ervu-dashboard N 0 - WITH mud AS (SELECT recruitment_id, - MAX(execution_datetime) AS max_upd_date - FROM etl.job_execution - WHERE job_name = '${JOB_NAME}' - AND status IN ('SUCCESS', 'DELTA_SUCCESS') - GROUP BY recruitment_id) -SELECT r.idm_id -FROM ervu_dashboard.recruitment r - JOIN mud ON mud.recruitment_id = r.idm_id - JOIN recruits_info ri - ON COALESCE(ri.current_recruitment_id, ri.target_recruitment_id) = r.idm_id - AND ri.updated_at > mud.max_upd_date - -union - -select r2.idm_id -from ervu_dashboard.recruitment r2 + max_update_date_from_restriction_document_item + select r.idm_id as recruitment_id, + ? as max_update_date +from ervu_dashboard.recruitment r join etl.job_execution je - on r2.idm_id = je.recruitment_id + on r.idm_id = je.recruitment_id where je.status in ('DELTA_ERROR', 'DELTA_PROCESSING') and je.job_name = '${JOB_NAME}'; Y - 352 - 288 + 0 + 784 + + + + max_update_date_from_restriction_document_item + TableInput + + N + + 1 + + none + + + ervu-dashboard + N + 0 + select max(updated_at) as max_update_date + from ervu_dashboard.restriction_document_item; + N + + + 0 + 576 @@ -112,6 +200,11 @@ where je.status in ('DELTA_ERROR', 'DELTA_PROCESSING') recruitment_id + + MAX_UPDATE_DATE + max_update_date + + Y @@ -135,8 +228,8 @@ where je.status in ('DELTA_ERROR', 'DELTA_PROCESSING') - 656 - 128 + 688 + 432 @@ -163,6 +256,11 @@ where je.status in ('DELTA_ERROR', 'DELTA_PROCESSING') recruitment_id + + MAX_UPDATE_DATE + max_update_date + + Y @@ -186,8 +284,8 @@ where je.status in ('DELTA_ERROR', 'DELTA_PROCESSING') - 656 - 208 + 688 + 512 @@ -214,6 +312,11 @@ where je.status in ('DELTA_ERROR', 'DELTA_PROCESSING') recruitment_id + + MAX_UPDATE_DATE + max_update_date + + Y @@ -237,8 +340,8 @@ where je.status in ('DELTA_ERROR', 'DELTA_PROCESSING') - 656 - 288 + 688 + 592 @@ -265,6 +368,11 @@ where je.status in ('DELTA_ERROR', 'DELTA_PROCESSING') recruitment_id + + MAX_UPDATE_DATE + max_update_date + + Y @@ -288,8 +396,8 @@ where je.status in ('DELTA_ERROR', 'DELTA_PROCESSING') - 656 - 368 + 688 + 672 @@ -316,6 +424,11 @@ where je.status in ('DELTA_ERROR', 'DELTA_PROCESSING') recruitment_id + + MAX_UPDATE_DATE + max_update_date + + Y @@ -339,8 +452,40 @@ where je.status in ('DELTA_ERROR', 'DELTA_PROCESSING') - 656 - 448 + 688 + 752 + + + + updated_idm_ids_from_source_restriction_document_item + TableInput + + Y + + 1 + + none + + + postgres.subpoena + N + 0 + max_update_date_from_restriction_document_item + WITH max_update_date(val) AS (SELECT CAST(? AS timestamp)) +SELECT DISTINCT rd.vk_id as recruitment_id, + mud.val as max_update_date +FROM public.restriction_document_item rdi + join public.restriction_document rd on + rd.id = coalesce(rdi.restriction_document_create_id,rdi.restriction_document_cancel_id) + join public.subpoena s on + s.id = rd.subpoena_id + JOIN max_update_date mud ON TRUE +WHERE (mud.val IS NULL OR rdi.updated_at > mud.val); + Y + + + 0 + 384