From 3a41351297958f9ace4402918b95ac60a691fa7f Mon Sep 17 00:00:00 2001 From: "adel.ka" Date: Tue, 11 Nov 2025 14:06:22 +0300 Subject: [PATCH] snils inn fix --- .../citizen_tables/inn/parallel/inn_flow.hpl | 288 ++++++------- .../inn/parallel/inn_flow_delta.hpl | 384 +++++++++--------- .../inn/parallel/inn_flow_repeat.hpl | 294 +++++++------- .../snils/parallel/snils_flow.hpl | 302 +++++++------- .../snils/parallel/snils_flow_delta.hpl | 144 +++---- .../snils/parallel/snils_flow_repeat.hpl | 287 ++++++------- .../snils/recruitment_five_flow_on_error.hpl | 2 +- .../snils/recruitments_five_flow.hpl | 2 +- 8 files changed, 865 insertions(+), 838 deletions(-) diff --git a/mappings/info_recruits/citizen_tables/inn/parallel/inn_flow.hpl b/mappings/info_recruits/citizen_tables/inn/parallel/inn_flow.hpl index 93060a2..2796be3 100644 --- a/mappings/info_recruits/citizen_tables/inn/parallel/inn_flow.hpl +++ b/mappings/info_recruits/citizen_tables/inn/parallel/inn_flow.hpl @@ -21,50 +21,50 @@ - Create job execution record - Table input + inn_create_job + inn_input Y Filter rows - Change job status on success + inn_success_job Y Detect empty stream - Change job status on success + inn_success_job Y - Change job status on error + inn_failure_job Abort Y - Table input + inn_output + inn_failure_job + Y + + + inn_input + inn_output + Y + + + inn_output Identify last row in a stream Y Identify last row in a stream - Table output inn - Y - - - Table output inn Filter rows Y - Table output inn + inn_input Detect empty stream Y - - Table output inn - Change job status on error - Y - Abort @@ -83,104 +83,7 @@ 1376 - 512 - - - - Change job status on error - ExecSql - - Y - - 1 - - none - - - - - error_description - - - ervu-dashboard - Y - N - Y - Y - N - UPDATE etl.job_execution -SET status = 'ERROR', - error_description = ?, - execution_end_datetime = current_timestamp -WHERE job_name = '${JOB_NAME}' -and recruitment_id = '${IDM_ID}'; - - - - - 1024 - 512 - - - - Change job status on success - ExecSql - - Y - - 1 - - none - - - - - ervu-dashboard - Y - N - Y - N - N - UPDATE etl.job_execution -SET status = 'SUCCESS', - execution_end_datetime = current_timestamp -WHERE job_name = '${JOB_NAME}' -and recruitment_id = '${IDM_ID}'; - - - - - 1632 - 160 - - - - Create job execution record - ExecSql - - Y - - 1 - - none - - - - - ervu-dashboard - N - N - Y - N - N - INSERT INTO etl.job_execution (id, job_name, status, execution_datetime, error_description, recruitment_id) -VALUES (DEFAULT, '${JOB_NAME}', 'PROCESSING', DEFAULT, null, '${IDM_ID}'); - - - - - 288 - 320 + 752 @@ -196,8 +99,8 @@ VALUES (DEFAULT, '${JOB_NAME}', 'PROCESSING', DEFAULT, null, '${IDM_ID}'); - 1632 - 320 + 640 + 352 @@ -229,11 +132,11 @@ VALUES (DEFAULT, '${JOB_NAME}', 'PROCESSING', DEFAULT, null, '${IDM_ID}'); - Change job status on success + inn_success_job 1024 - 160 + 128 @@ -250,12 +153,77 @@ VALUES (DEFAULT, '${JOB_NAME}', 'PROCESSING', DEFAULT, null, '${IDM_ID}'); last_row - 768 - 320 + 1024 + 352 - Table input + inn_create_job + ExecSql + + Y + + 1 + + none + + + + + ervu-dashboard + N + N + Y + N + N + INSERT INTO etl.job_execution (id, job_name, status, execution_datetime, error_description, recruitment_id) +VALUES (DEFAULT, '${JOB_NAME}', 'PROCESSING', DEFAULT, null, '${IDM_ID}'); + + + + + 288 + 560 + + + + inn_failure_job + ExecSql + + Y + + 1 + + none + + + + + error_description + + + ervu-dashboard + Y + N + Y + Y + N + UPDATE etl.job_execution +SET status = 'ERROR', + error_description = ?, + execution_end_datetime = current_timestamp +WHERE job_name = '${JOB_NAME}' +and recruitment_id = '${IDM_ID}'; + + + + + 1024 + 752 + + + + inn_input TableInput N @@ -267,44 +235,48 @@ VALUES (DEFAULT, '${JOB_NAME}', 'PROCESSING', DEFAULT, null, '${IDM_ID}'); ervu-dashboard N - WITH base as ( - SELECT - ri.recruit_id, - ri.info - FROM ervu_dashboard.recruits_info ri - join ervu_dashboard.citizen r ON r.recruit_id = ri.recruit_id AND '${IDM_ID}' != '' -- Проверка на пустую строку - AND (ri.current_recruitment_id = '${IDM_ID}' or (ri.current_recruitment_id is null and ri.target_recruitment_id = '${IDM_ID}')) + SELECT + ri.recruit_id, + ri.info, + REGEXP_REPLACE(ri.info->'svedFL'->'svedBS'->'inn'->'aktINN'->>'inn', '[^0-9]', '', 'g') as cleaned_akt_inn, + REGEXP_REPLACE(ri.info->'svedFL'->'svedBS'->'inn'->'predINN'->>'inn', '[^0-9]', '', 'g') as cleaned_pred_inn + FROM ervu_dashboard.recruits_info ri + JOIN ervu_dashboard.citizen r ON r.recruit_id = ri.recruit_id + WHERE + '${IDM_ID}' != '' + AND COALESCE(ri.current_recruitment_id, ri.target_recruitment_id) = '${IDM_ID}' ) + SELECT - recruit_id, + recruit_id, info->'svedFL'->'svedBS'->'inn'->'aktINN'->>'id' AS source_id, to_date(info->'svedFL'->'svedBS'->'inn'->'aktINN'->>'dataSved', 'YYYY-MM-DD') AS source_update_date, - REGEXP_REPLACE(info->'svedFL'->'svedBS'->'inn'->'aktINN'->>'inn', '[^0-9]', '', 'g') AS inn, + cleaned_akt_inn AS inn, true as actual FROM base -WHERE LENGTH(REGEXP_REPLACE(info->'svedFL'->'svedBS'->'inn'->'aktINN'->>'inn', '[^0-9]', '', 'g')) = 12 +WHERE LENGTH(cleaned_akt_inn) = 12 UNION ALL SELECT - recruit_id, + recruit_id, info->'svedFL'->'svedBS'->'inn'->'predINN'->>'id' AS source_id, to_date(info->'svedFL'->'svedBS'->'inn'->'predINN'->>'dataSved', 'YYYY-MM-DD') AS source_update_date, - REGEXP_REPLACE(info->'svedFL'->'svedBS'->'inn'->'predINN'->>'inn', '[^0-9]', '', 'g') AS inn, + cleaned_pred_inn AS inn, false as actual FROM base -WHERE LENGTH(REGEXP_REPLACE(info->'svedFL'->'svedBS'->'inn'->'predINN'->>'inn', '[^0-9]', '', 'g')) = 12 +WHERE LENGTH(cleaned_pred_inn) = 12 Y - 496 - 320 + 640 + 560 - Table output inn + inn_output TableOutput N @@ -314,7 +286,7 @@ WHERE LENGTH(REGEXP_REPLACE(info->'svedFL'->'svedBS'->'inn'->'predINN'->>'inn', none - 1000 + 10000 ervu-dashboard @@ -354,13 +326,45 @@ WHERE LENGTH(REGEXP_REPLACE(info->'svedFL'->'svedBS'->'inn'->'predINN'->>'inn', 1024 - 320 + 560 + + + + inn_success_job + ExecSql + + Y + + 1 + + none + + + + + ervu-dashboard + Y + N + Y + N + N + UPDATE etl.job_execution +SET status = 'SUCCESS', + execution_end_datetime = current_timestamp +WHERE job_name = '${JOB_NAME}' +and recruitment_id = '${IDM_ID}'; + + + + + 640 + 128 - Table output inn - Change job status on error + inn_output + inn_failure_job Y error_description diff --git a/mappings/info_recruits/citizen_tables/inn/parallel/inn_flow_delta.hpl b/mappings/info_recruits/citizen_tables/inn/parallel/inn_flow_delta.hpl index 27813c5..f719002 100644 --- a/mappings/info_recruits/citizen_tables/inn/parallel/inn_flow_delta.hpl +++ b/mappings/info_recruits/citizen_tables/inn/parallel/inn_flow_delta.hpl @@ -21,48 +21,48 @@ - Create job execution record - Table input - Y - - - Table input - Identify last row in a stream + inn_create_job + inn_input Y Detect empty stream - Change job status on success + inn_success_job Y Filter rows - Change job status on success + inn_success_job Y - Change job status on error + inn_failure_job Abort Y - Identify last row in a stream - Insert / update inn + inn_insert_or_update + inn_failure_job Y - Insert / update inn - Filter rows - Y - - - Insert / update inn + inn_input Detect empty stream Y - Insert / update inn - Change job status on error + inn_insert_or_update + Identify last row in a stream + Y + + + Identify last row in a stream + Filter rows + Y + + + inn_input + inn_insert_or_update Y @@ -83,105 +83,7 @@ 1328 - 400 - - - - Change job status on error - ExecSql - - Y - - 1 - - none - - - - - error_description - - - ervu-dashboard - Y - N - Y - Y - N - UPDATE etl.job_execution -SET status = 'DELTA_ERROR', - error_description = ? -WHERE job_name = '${JOB_NAME}' -and recruitment_id = '${IDM_ID}'; - - - - - 1104 - 400 - - - - Change job status on success - ExecSql - - Y - - 1 - - none - - - - - ervu-dashboard - Y - N - Y - N - N - UPDATE etl.job_execution -SET status = 'DELTA_SUCCESS' -WHERE job_name = '${JOB_NAME}' -and recruitment_id = '${IDM_ID}'; - - - - - 1408 - 64 - - - - Create job execution record - ExecSql - - Y - - 1 - - none - - - - - ervu-dashboard - N - N - Y - N - N - UPDATE etl.job_execution -SET - status = 'DELTA_PROCESSING', - execution_datetime = DEFAULT, - error_description = NULL -where job_name = '${JOB_NAME}' -and recruitment_id = '${IDM_ID}'; - - - 336 - 224 + 544 @@ -197,8 +99,8 @@ and recruitment_id = '${IDM_ID}'; - 1408 - 224 + 544 + 176 @@ -230,11 +132,11 @@ and recruitment_id = '${IDM_ID}'; - Change job status on success + inn_success_job 1104 - 64 + 0 @@ -251,12 +153,152 @@ and recruitment_id = '${IDM_ID}'; last_row - 816 - 224 + 1104 + 176 - Insert / update inn + inn_create_job + ExecSql + + Y + + 1 + + none + + + + + ervu-dashboard + N + N + Y + N + N + UPDATE etl.job_execution +SET + status = 'DELTA_PROCESSING', + execution_datetime = DEFAULT, + error_description = NULL +where job_name = '${JOB_NAME}' +and recruitment_id = '${IDM_ID}'; + + + 336 + 368 + + + + inn_failure_job + ExecSql + + Y + + 1 + + none + + + + + error_description + + + ervu-dashboard + Y + N + Y + Y + N + UPDATE etl.job_execution +SET status = 'DELTA_ERROR', + error_description = ? +WHERE job_name = '${JOB_NAME}' +and recruitment_id = '${IDM_ID}'; + + + + + 1104 + 544 + + + + inn_input + TableInput + + N + + 1 + + none + + + ervu-dashboard + N + + WITH base AS ( + SELECT + ri.recruit_id, + ri.info, + ri.info->'svedFL'->'svedBS'->'inn'->'aktINN' AS akt_inn, + ri.info->'svedFL'->'svedBS'->'inn'->'predINN' AS pred_inn, + ri.info->'svedFL'->'svedBS'->'inn'->'aktINN'->>'dataSved' AS akt_data_sved, + REGEXP_REPLACE(ri.info->'svedFL'->'svedBS'->'inn'->'aktINN'->>'inn', '[^0-9]', '', 'g') AS akt_inn_cleaned, + ri.info->'svedFL'->'svedBS'->'inn'->'predINN'->>'dataSved' AS pred_data_sved, + REGEXP_REPLACE(ri.info->'svedFL'->'svedBS'->'inn'->'predINN'->>'inn', '[^0-9]', '', 'g') AS pred_inn_cleaned + FROM ervu_dashboard.recruits_info ri + JOIN ervu_dashboard.citizen r ON r.recruit_id = ri.recruit_id + WHERE + '${IDM_ID}' != '' + AND COALESCE(ri.current_recruitment_id, ri.target_recruitment_id) = '${IDM_ID}' +), +inn_data AS ( + SELECT + recruit_id, + akt_inn->>'id' AS source_id, + to_date(akt_data_sved, 'YYYY-MM-DD') AS source_update_date, + akt_inn_cleaned AS inn, + true AS actual, + 'aktINN' AS inn_type + FROM base + WHERE + akt_data_sved IS NOT NULL + AND to_date(akt_data_sved, 'YYYY-MM-DD') > '${MAX_SOURCE_UPDATE_DATE}'::date + AND LENGTH(akt_inn_cleaned) = 12 + + UNION ALL + + SELECT + recruit_id, + pred_inn->>'id' AS source_id, + to_date(pred_data_sved, 'YYYY-MM-DD') AS source_update_date, + pred_inn_cleaned AS inn, + false AS actual, + 'predINN' AS inn_type + FROM base + WHERE + pred_data_sved IS NOT NULL + AND to_date(pred_data_sved, 'YYYY-MM-DD') > '${MAX_SOURCE_UPDATE_DATE}'::date + AND LENGTH(pred_inn_cleaned) = 12 +) +SELECT + recruit_id, + source_id, + source_update_date, + inn, + actual +FROM inn_data; + Y + + + 544 + 368 + + + + inn_insert_or_update InsertUpdate N @@ -306,94 +348,44 @@ and recruitment_id = '${IDM_ID}'; 1104 - 224 + 368 - Table input - TableInput + inn_success_job + ExecSql - N + Y 1 none + + ervu-dashboard - N - WITH base AS ( - SELECT - ri.recruit_id, - ri.info, - ri.info->'svedFL'->'svedBS'->'inn'->'aktINN' AS akt_inn, - ri.info->'svedFL'->'svedBS'->'inn'->'predINN' AS pred_inn - FROM ervu_dashboard.recruits_info ri - JOIN ervu_dashboard.citizen r ON r.recruit_id = ri.recruit_id - WHERE - '${IDM_ID}' != '' - AND (ri.current_recruitment_id = '${IDM_ID}' or (ri.current_recruitment_id is null and ri.target_recruitment_id = '${IDM_ID}')) - AND ( - ( - (ri.info->'svedFL'->'svedBS'->'inn'->'aktINN'->>'dataSved') IS NOT NULL - AND to_date(ri.info->'svedFL'->'svedBS'->'inn'->'aktINN'->>'dataSved', 'YYYY-MM-DD') > '${MAX_SOURCE_UPDATE_DATE}'::date - AND LENGTH(REGEXP_REPLACE(ri.info->'svedFL'->'svedBS'->'inn'->'aktINN'->>'inn', '[^0-9]', '', 'g')) = 12 - ) - OR - ( - (ri.info->'svedFL'->'svedBS'->'inn'->'predINN'->>'dataSved') IS NOT NULL - AND to_date(ri.info->'svedFL'->'svedBS'->'inn'->'predINN'->>'dataSved', 'YYYY-MM-DD') > '${MAX_SOURCE_UPDATE_DATE}'::date - AND LENGTH(REGEXP_REPLACE(ri.info->'svedFL'->'svedBS'->'inn'->'predINN'->>'inn', '[^0-9]', '', 'g')) = 12 - ) - ) -), -inn_data AS ( - SELECT - recruit_id, - akt_inn->>'id' AS source_id, - to_date(akt_inn->>'dataSved', 'YYYY-MM-DD') AS source_update_date, - REGEXP_REPLACE(akt_inn->>'inn', '[^0-9]', '', 'g') AS inn, - true AS actual, - 'aktINN' AS inn_type - FROM base - WHERE - (akt_inn->>'dataSved') IS NOT NULL - AND to_date(akt_inn->>'dataSved', 'YYYY-MM-DD') > '${MAX_SOURCE_UPDATE_DATE}'::date - AND LENGTH(REGEXP_REPLACE(akt_inn->>'inn', '[^0-9]', '', 'g')) = 12 - - UNION ALL - - SELECT - recruit_id, - pred_inn->>'id' AS source_id, - to_date(pred_inn->>'dataSved', 'YYYY-MM-DD') AS source_update_date, - REGEXP_REPLACE(pred_inn->>'inn', '[^0-9]', '', 'g') AS inn, - false AS actual, - 'predINN' AS inn_type - FROM base - WHERE - (pred_inn->>'dataSved') IS NOT NULL - AND to_date(pred_inn->>'dataSved', 'YYYY-MM-DD') > '${MAX_SOURCE_UPDATE_DATE}'::date - AND LENGTH(REGEXP_REPLACE(pred_inn->>'inn', '[^0-9]', '', 'g')) = 12 -) -SELECT - recruit_id, - source_id, - source_update_date, - inn, - actual -FROM inn_data; - Y + Y + N + Y + N + N + UPDATE etl.job_execution +SET status = 'DELTA_SUCCESS' +WHERE job_name = '${JOB_NAME}' +and recruitment_id = '${IDM_ID}'; + + 544 - 224 + 0 - Insert / update inn - Change job status on error + inn_insert_or_update + inn_failure_job Y error_description diff --git a/mappings/info_recruits/citizen_tables/inn/parallel/inn_flow_repeat.hpl b/mappings/info_recruits/citizen_tables/inn/parallel/inn_flow_repeat.hpl index a8cda04..c708905 100644 --- a/mappings/info_recruits/citizen_tables/inn/parallel/inn_flow_repeat.hpl +++ b/mappings/info_recruits/citizen_tables/inn/parallel/inn_flow_repeat.hpl @@ -21,48 +21,48 @@ - Create job execution record - Table input - Y - - - Table input - Identify last row in a stream + inn_create_job + inn_input Y Filter rows - Change job status on success + inn_success_job Y Detect empty stream - Change job status on success + inn_success_job Y - Change job status on error + inn_failure_job Abort Y - Identify last row in a stream - Insert / update inn + inn_insert_or_update + inn_failure_job Y - Insert / update inn - Filter rows - Y - - - Insert / update inn + inn_input Detect empty stream Y - Insert / update inn - Change job status on error + inn_input + inn_insert_or_update + Y + + + inn_insert_or_update + Identify last row in a stream + Y + + + Identify last row in a stream + Filter rows Y @@ -83,12 +83,12 @@ 1296 - 400 + 512 - Change job status on error - ExecSql + Detect empty stream + DetectEmptyStream Y @@ -97,33 +97,51 @@ none - - - error_description - - - ervu-dashboard - Y - N - Y - Y - N - UPDATE etl.job_execution -SET status = 'ERROR', - error_description = ? -WHERE job_name = '${JOB_NAME}' -and recruitment_id = '${IDM_ID}'; - - + + + 480 + 160 + + + + Filter rows + FilterRows + + Y + + 1 + + none + + + + + + + = + last_row + N + - + + N + -1 + constant + -1 + Y + Boolean + + + + inn_success_job 1008 - 400 + 16 - Change job status on success - ExecSql + Identify last row in a stream + DetectLastRow Y @@ -132,28 +150,15 @@ and recruitment_id = '${IDM_ID}'; none - - - ervu-dashboard - Y - N - Y - N - N - UPDATE etl.job_execution -SET status = 'SUCCESS' -WHERE job_name = '${JOB_NAME}' -and recruitment_id = '${IDM_ID}'; - - + last_row - 1440 - 48 + 1008 + 160 - Create job execution record + inn_create_job ExecSql Y @@ -196,12 +201,12 @@ DO UPDATE SET 272 - 208 + 320 - Detect empty stream - DetectEmptyStream + inn_failure_job + ExecSql Y @@ -210,68 +215,85 @@ DO UPDATE SET none - - - 1440 - 208 - - - - Filter rows - FilterRows - - Y - - 1 - - none - - - - - - - = - last_row - N - - - - N - -1 - constant - -1 - Y - Boolean - - - - Change job status on success + + + error_description + + + ervu-dashboard + Y + N + Y + Y + N + UPDATE etl.job_execution +SET status = 'ERROR', + error_description = ? +WHERE job_name = '${JOB_NAME}' +and recruitment_id = '${IDM_ID}'; + + 1008 - 48 + 512 - Identify last row in a stream - DetectLastRow + inn_input + TableInput - Y + N 1 none - last_row + ervu-dashboard + N + WITH +base as ( + SELECT + ri.recruit_id, + ri.info, + REGEXP_REPLACE(ri.info->'svedFL'->'svedBS'->'inn'->'aktINN'->>'inn', '[^0-9]', '', 'g') as cleaned_akt_inn, + REGEXP_REPLACE(ri.info->'svedFL'->'svedBS'->'inn'->'predINN'->>'inn', '[^0-9]', '', 'g') as cleaned_pred_inn + FROM ervu_dashboard.recruits_info ri + JOIN ervu_dashboard.citizen r ON r.recruit_id = ri.recruit_id + WHERE + '${IDM_ID}' != '' + AND COALESCE(ri.current_recruitment_id, ri.target_recruitment_id) = '${IDM_ID}' +) + +SELECT + recruit_id, + info->'svedFL'->'svedBS'->'inn'->'aktINN'->>'id' AS source_id, + to_date(info->'svedFL'->'svedBS'->'inn'->'aktINN'->>'dataSved', 'YYYY-MM-DD') AS source_update_date, + cleaned_akt_inn AS inn, + true as actual +FROM base +WHERE LENGTH(cleaned_akt_inn) = 12 + +UNION ALL + +SELECT + recruit_id, + info->'svedFL'->'svedBS'->'inn'->'predINN'->>'id' AS source_id, + to_date(info->'svedFL'->'svedBS'->'inn'->'predINN'->>'dataSved', 'YYYY-MM-DD') AS source_update_date, + cleaned_pred_inn AS inn, + false as actual +FROM base +WHERE LENGTH(cleaned_pred_inn) = 12 + Y - 672 - 208 + 480 + 320 - Insert / update inn + inn_insert_or_update InsertUpdate N @@ -288,7 +310,6 @@ DO UPDATE SET = source_id source_id - ervu_dashboard inn
@@ -322,61 +343,44 @@ DO UPDATE SET 1008 - 208 + 320
- Table input - TableInput + inn_success_job + ExecSql - N + Y 1 none + + ervu-dashboard - N - WITH -base as ( - SELECT - ri.recruit_id, - ri.info - FROM ervu_dashboard.recruits_info ri - join ervu_dashboard.citizen r ON r.recruit_id = ri.recruit_id AND '${IDM_ID}' != '' -- Проверка на пустую строку - AND (ri.current_recruitment_id = '${IDM_ID}' or (ri.current_recruitment_id is null and ri.target_recruitment_id = '${IDM_ID}')) -) -SELECT - recruit_id, - info->'svedFL'->'svedBS'->'inn'->'aktINN'->>'id' AS source_id, - to_date(info->'svedFL'->'svedBS'->'inn'->'aktINN'->>'dataSved', 'YYYY-MM-DD') AS source_update_date, - REGEXP_REPLACE(info->'svedFL'->'svedBS'->'inn'->'aktINN'->>'inn', '[^0-9]', '', 'g') AS inn, - true as actual -FROM base -WHERE LENGTH(REGEXP_REPLACE(info->'svedFL'->'svedBS'->'inn'->'aktINN'->>'inn', '[^0-9]', '', 'g')) = 12 + Y + N + Y + N + N + UPDATE etl.job_execution +SET status = 'SUCCESS' +WHERE job_name = '${JOB_NAME}' +and recruitment_id = '${IDM_ID}'; -UNION ALL - -SELECT - recruit_id, - info->'svedFL'->'svedBS'->'inn'->'predINN'->>'id' AS source_id, - to_date(info->'svedFL'->'svedBS'->'inn'->'predINN'->>'dataSved', 'YYYY-MM-DD') AS source_update_date, - REGEXP_REPLACE(info->'svedFL'->'svedBS'->'inn'->'predINN'->>'inn', '[^0-9]', '', 'g') AS inn, - false as actual -FROM base -WHERE LENGTH(REGEXP_REPLACE(info->'svedFL'->'svedBS'->'inn'->'predINN'->>'inn', '[^0-9]', '', 'g')) = 12 - Y + 480 - 208 + 16 - Insert / update inn - Change job status on error + inn_insert_or_update + inn_failure_job Y error_description diff --git a/mappings/info_recruits/citizen_tables/snils/parallel/snils_flow.hpl b/mappings/info_recruits/citizen_tables/snils/parallel/snils_flow.hpl index d0d7926..dade5bb 100644 --- a/mappings/info_recruits/citizen_tables/snils/parallel/snils_flow.hpl +++ b/mappings/info_recruits/citizen_tables/snils/parallel/snils_flow.hpl @@ -21,48 +21,48 @@ - Create job execution record - Table input + snils_create_job + snils_input Y Filter rows - Change job status on success + snils_success_job Y Detect empty stream - Change job status on success + snils_success_job Y - Change job status on error + snils_failure_job Abort Y - Table input - Identify last row in a stream + snils_output + snils_failure_job Y - Identify last row in a stream - Table output - Y - - - Table output - Filter rows - Y - - - Table output + snils_input Detect empty stream Y - Table output - Change job status on error + snils_output + Identify last row in a stream + Y + + + Identify last row in a stream + Filter rows + Y + + + snils_input + snils_output Y @@ -83,104 +83,7 @@ 1376 - 512 - - - - Change job status on error - ExecSql - - Y - - 1 - - none - - - - - error_description - - - ervu-dashboard - Y - N - Y - Y - N - UPDATE etl.job_execution -SET status = 'ERROR', - error_description = ?, - execution_end_datetime = current_timestamp -WHERE job_name = '${JOB_NAME}' -and recruitment_id = '${IDM_ID}'; - - - - - 1024 - 512 - - - - Change job status on success - ExecSql - - Y - - 1 - - none - - - - - ervu-dashboard - Y - N - Y - N - N - UPDATE etl.job_execution -SET status = 'SUCCESS', - execution_end_datetime = current_timestamp -WHERE job_name = '${JOB_NAME}' -and recruitment_id = '${IDM_ID}'; - - - - - 1632 - 160 - - - - Create job execution record - ExecSql - - Y - - 1 - - none - - - - - ervu-dashboard - N - N - Y - N - N - INSERT INTO etl.job_execution (id, job_name, status, execution_datetime, error_description, recruitment_id) -VALUES (DEFAULT, '${JOB_NAME}', 'PROCESSING', DEFAULT, null, '${IDM_ID}'); - - - - - 288 - 320 + 640 @@ -196,8 +99,8 @@ VALUES (DEFAULT, '${JOB_NAME}', 'PROCESSING', DEFAULT, null, '${IDM_ID}'); - 1632 - 320 + 496 + 240 @@ -229,15 +132,15 @@ VALUES (DEFAULT, '${JOB_NAME}', 'PROCESSING', DEFAULT, null, '${IDM_ID}'); - Change job status on success + snils_success_job 1024 - 160 + 64 - Identify last row in a stream + Identify last row in a stream DetectLastRow Y @@ -250,12 +153,77 @@ VALUES (DEFAULT, '${JOB_NAME}', 'PROCESSING', DEFAULT, null, '${IDM_ID}'); last_row - 752 - 320 + 1024 + 240 - Table input + snils_create_job + ExecSql + + Y + + 1 + + none + + + + + ervu-dashboard + N + N + Y + N + N + INSERT INTO etl.job_execution (id, job_name, status, execution_datetime, error_description, recruitment_id) +VALUES (DEFAULT, '${JOB_NAME}', 'PROCESSING', DEFAULT, null, '${IDM_ID}'); + + + + + 288 + 448 + + + + snils_failure_job + ExecSql + + Y + + 1 + + none + + + + + error_description + + + ervu-dashboard + Y + N + Y + Y + N + UPDATE etl.job_execution +SET status = 'ERROR', + error_description = ?, + execution_end_datetime = current_timestamp +WHERE job_name = '${JOB_NAME}' +and recruitment_id = '${IDM_ID}'; + + + + + 1024 + 640 + + + + snils_input TableInput N @@ -267,44 +235,52 @@ VALUES (DEFAULT, '${JOB_NAME}', 'PROCESSING', DEFAULT, null, '${IDM_ID}'); ervu-dashboard N + WITH base as ( - SELECT - ri.recruit_id, - ri.info - FROM ervu_dashboard.recruits_info ri - join ervu_dashboard.citizen r ON r.recruit_id = ri.recruit_id AND '${IDM_ID}' != '' -- Проверка на пустую строку - AND (ri.current_recruitment_id = '${IDM_ID}' or (ri.current_recruitment_id is null and ri.target_recruitment_id = '${IDM_ID}')) - AND '${M_R_CR_DATE}'::timestamp >= ri.created_at + SELECT + ri.recruit_id, + ri.info, + ri.info->'svedFL'->'svedBS'->'snils'->'aktSNILS'->>'id' AS akt_source_id, + ri.info->'svedFL'->'svedBS'->'snils'->'aktSNILS'->>'dataSved' AS akt_data_sved, + REGEXP_REPLACE(ri.info->'svedFL'->'svedBS'->'snils'->'aktSNILS'->>'snils', '[^0-9]', '', 'g') AS akt_snils_cleaned, + ri.info->'svedFL'->'svedBS'->'snils'->'predSNILS'->>'id' AS pred_source_id, + ri.info->'svedFL'->'svedBS'->'snils'->'predSNILS'->>'dataSved' AS pred_data_sved, + REGEXP_REPLACE(ri.info->'svedFL'->'svedBS'->'snils'->'predSNILS'->>'snils', '[^0-9]', '', 'g') AS pred_snils_cleaned + FROM ervu_dashboard.recruits_info ri + JOIN ervu_dashboard.citizen r ON r.recruit_id = ri.recruit_id + WHERE + '${IDM_ID}' != '' + AND COALESCE(ri.current_recruitment_id, ri.target_recruitment_id) = '${IDM_ID}' ) SELECT recruit_id, - info->'svedFL'->'svedBS'->'snils'->'aktSNILS'->>'id' AS source_id, - to_date(info->'svedFL'->'svedBS'->'snils'->'aktSNILS'->>'dataSved', 'YYYY-MM-DD') AS source_update_date, - REGEXP_REPLACE(info->'svedFL'->'svedBS'->'snils'->'aktSNILS'->>'snils', '[^0-9]', '', 'g') AS snils, + akt_source_id AS source_id, + to_date(akt_data_sved, 'YYYY-MM-DD') AS source_update_date, + akt_snils_cleaned AS snils, true AS actual FROM base -WHERE LENGTH(REGEXP_REPLACE(info->'svedFL'->'svedBS'->'snils'->'aktSNILS'->>'snils', '[^0-9]', '', 'g')) = 11 +WHERE LENGTH(akt_snils_cleaned) = 11 UNION ALL SELECT recruit_id, - info->'svedFL'->'svedBS'->'snils'->'predSNILS'->>'id' AS source_id, - to_date(info->'svedFL'->'svedBS'->'snils'->'predSNILS'->>'dataSved', 'YYYY-MM-DD') AS source_update_date, - REGEXP_REPLACE(info->'svedFL'->'svedBS'->'snils'->'predSNILS'->>'snils', '[^0-9]', '', 'g') AS snils, + pred_source_id AS source_id, + to_date(pred_data_sved, 'YYYY-MM-DD') AS source_update_date, + pred_snils_cleaned AS snils, false AS actual FROM base -WHERE LENGTH(REGEXP_REPLACE(info->'svedFL'->'svedBS'->'snils'->'predSNILS'->>'snils', '[^0-9]', '', 'g')) = 11 +WHERE LENGTH(pred_snils_cleaned) = 11 Y 496 - 320 + 448 - Table output + snils_output TableOutput N @@ -314,7 +290,7 @@ WHERE LENGTH(REGEXP_REPLACE(info->'svedFL'->'svedBS'->'snils'->'predSNILS'->>'sn none - 1000 + 10000 ervu-dashboard @@ -354,13 +330,45 @@ WHERE LENGTH(REGEXP_REPLACE(info->'svedFL'->'svedBS'->'snils'->'predSNILS'->>'sn 1024 - 320 + 448 + + + + snils_success_job + ExecSql + + Y + + 1 + + none + + + + + ervu-dashboard + Y + N + Y + N + N + UPDATE etl.job_execution +SET status = 'SUCCESS', + execution_end_datetime = current_timestamp +WHERE job_name = '${JOB_NAME}' +and recruitment_id = '${IDM_ID}'; + + + + + 496 + 64 - Table output - Change job status on error + snils_output + snils_failure_job Y error_description diff --git a/mappings/info_recruits/citizen_tables/snils/parallel/snils_flow_delta.hpl b/mappings/info_recruits/citizen_tables/snils/parallel/snils_flow_delta.hpl index 2db0854..9113b18 100644 --- a/mappings/info_recruits/citizen_tables/snils/parallel/snils_flow_delta.hpl +++ b/mappings/info_recruits/citizen_tables/snils/parallel/snils_flow_delta.hpl @@ -21,50 +21,50 @@ - Create job execution record - Table input - Y - - - Table input - Identify last row in a stream + snils_create_job + snils_input Y Detect empty stream - Change job status on success + snils_success_job Y Filter rows - Change job status on success + snils_success_job Y - Change job status on error + snils_failure_job Abort Y - Insert / update snils + snils_insert_or_update + snils_failure_job + Y + + + snils_insert_or_update + Identify last row in a stream 2 + Y + + + Identify last row in a stream 2 Filter rows Y - Insert / update snils + snils_input + snils_insert_or_update + Y + + + snils_input Detect empty stream Y - - Insert / update snils - Change job status on error - Y - - - Identify last row in a stream - Insert / update snils - Y - Abort @@ -83,11 +83,11 @@ 1328 - 400 + 608 - Change job status on error + snils_failure_job ExecSql Y @@ -103,8 +103,11 @@ ervu-dashboard + Y + N + Y Y N @@ -116,14 +119,15 @@ WHERE job_name = '${JOB_NAME}' and recruitment_id = '${IDM_ID}'; + 1104 - 400 + 608 - Change job status on success + snils_success_job ExecSql Y @@ -136,8 +140,11 @@ and recruitment_id = '${IDM_ID}'; ervu-dashboard + Y + N + Y N N @@ -148,14 +155,15 @@ WHERE job_name = '${JOB_NAME}' and recruitment_id = '${IDM_ID}'; + - 1408 + 544 64 - Create job execution record + snils_create_job ExecSql Y @@ -168,8 +176,11 @@ and recruitment_id = '${IDM_ID}'; ervu-dashboard + N + N + Y N N @@ -180,10 +191,11 @@ SET error_description = NULL where job_name = '${JOB_NAME}' and recruitment_id = '${IDM_ID}'; + 336 - 224 + 432 @@ -199,8 +211,8 @@ and recruitment_id = '${IDM_ID}'; - 1408 - 224 + 544 + 288 @@ -232,7 +244,7 @@ and recruitment_id = '${IDM_ID}'; - Change job status on success + snils_success_job 1104 @@ -240,7 +252,7 @@ and recruitment_id = '${IDM_ID}'; - Identify last row in a stream + Identify last row in a stream 2 DetectLastRow Y @@ -253,12 +265,12 @@ and recruitment_id = '${IDM_ID}'; last_row - 816 - 224 + 1104 + 240 - Insert / update snils + snils_insert_or_update InsertUpdate N @@ -275,6 +287,7 @@ and recruitment_id = '${IDM_ID}'; = source_id source_id + ervu_dashboard snils
@@ -308,11 +321,11 @@ and recruitment_id = '${IDM_ID}'; 1104 - 224 + 432
- Table input + snils_input TableInput N @@ -324,60 +337,51 @@ and recruitment_id = '${IDM_ID}'; ervu-dashboard N + WITH base AS ( SELECT ri.recruit_id, ri.info, ri.info->'svedFL'->'svedBS'->'snils'->'aktSNILS' AS akt_snils, - ri.info->'svedFL'->'svedBS'->'snils'->'predSNILS' AS pred_snils + ri.info->'svedFL'->'svedBS'->'snils'->'predSNILS' AS pred_snils, + ri.info->'svedFL'->'svedBS'->'snils'->'aktSNILS'->>'dataSved' AS akt_data_sved, + REGEXP_REPLACE(ri.info->'svedFL'->'svedBS'->'snils'->'aktSNILS'->>'snils', '[^0-9]', '', 'g') AS akt_snils_cleaned, + ri.info->'svedFL'->'svedBS'->'snils'->'predSNILS'->>'dataSved' AS pred_data_sved, + REGEXP_REPLACE(ri.info->'svedFL'->'svedBS'->'snils'->'predSNILS'->>'snils', '[^0-9]', '', 'g') AS pred_snils_cleaned FROM ervu_dashboard.recruits_info ri JOIN ervu_dashboard.citizen r ON r.recruit_id = ri.recruit_id - WHERE + WHERE '${IDM_ID}' != '' - AND (ri.current_recruitment_id = '${IDM_ID}' or (ri.current_recruitment_id is null and ri.target_recruitment_id = '${IDM_ID}')) - AND - ( - ( - (ri.info->'svedFL'->'svedBS'->'snils'->'aktSNILS'->>'dataSved') IS NOT NULL - AND to_date(ri.info->'svedFL'->'svedBS'->'snils'->'aktSNILS'->>'dataSved', 'YYYY-MM-DD') > '${MAX_SOURCE_UPDATE_DATE}'::date - AND LENGTH(REGEXP_REPLACE(ri.info->'svedFL'->'svedBS'->'snils'->'aktSNILS'->>'snils', '[^0-9]', '', 'g')) = 11 - ) - OR - ( - (ri.info->'svedFL'->'svedBS'->'snils'->'predSNILS'->>'dataSved') IS NOT NULL - AND to_date(ri.info->'svedFL'->'svedBS'->'snils'->'predSNILS'->>'dataSved', 'YYYY-MM-DD') > '${MAX_SOURCE_UPDATE_DATE}'::date - AND LENGTH(REGEXP_REPLACE(ri.info->'svedFL'->'svedBS'->'snils'->'predSNILS'->>'snils', '[^0-9]', '', 'g')) = 11 - ) - ) + AND COALESCE(ri.current_recruitment_id, ri.target_recruitment_id) = '${IDM_ID}' ), snils_data AS ( SELECT recruit_id, akt_snils->>'id' AS source_id, - to_date(akt_snils->>'dataSved', 'YYYY-MM-DD') AS source_update_date, - REGEXP_REPLACE(akt_snils->>'snils', '[^0-9]', '', 'g') AS snils, + to_date(akt_data_sved, 'YYYY-MM-DD') AS source_update_date, + akt_snils_cleaned AS snils, true AS actual, - 'aktsnils' AS snils_type + 'aktSNILS' AS snils_type FROM base WHERE - (akt_snils->>'dataSved') IS NOT NULL - AND to_date(akt_snils->>'dataSved', 'YYYY-MM-DD') > '${MAX_SOURCE_UPDATE_DATE}'::date - AND LENGTH(REGEXP_REPLACE(akt_snils->>'snils', '[^0-9]', '', 'g')) = 11 + akt_data_sved IS NOT NULL + AND to_date(akt_data_sved, 'YYYY-MM-DD') > '${MAX_SOURCE_UPDATE_DATE}'::date + AND LENGTH(akt_snils_cleaned) = 11 UNION ALL SELECT recruit_id, pred_snils->>'id' AS source_id, - to_date(pred_snils->>'dataSved', 'YYYY-MM-DD') AS source_update_date, - REGEXP_REPLACE(pred_snils->>'snils', '[^0-9]', '', 'g') AS snils, + to_date(pred_data_sved, 'YYYY-MM-DD') AS source_update_date, + pred_snils_cleaned AS snils, false AS actual, - 'predsnils' AS snils_type + 'predSNILS' AS snils_type FROM base WHERE - (pred_snils->>'dataSved') IS NOT NULL - AND to_date(pred_snils->>'dataSved', 'YYYY-MM-DD') > '${MAX_SOURCE_UPDATE_DATE}'::date - AND LENGTH(REGEXP_REPLACE(pred_snils->>'snils', '[^0-9]', '', 'g')) = 11 + pred_data_sved IS NOT NULL + AND to_date(pred_data_sved, 'YYYY-MM-DD') > '${MAX_SOURCE_UPDATE_DATE}'::date + AND LENGTH(pred_snils_cleaned) = 11 ) SELECT recruit_id, @@ -390,13 +394,13 @@ FROM snils_data; 544 - 224 + 432 - Insert / update snils - Change job status on error + snils_insert_or_update + snils_failure_job Y error_description diff --git a/mappings/info_recruits/citizen_tables/snils/parallel/snils_flow_repeat.hpl b/mappings/info_recruits/citizen_tables/snils/parallel/snils_flow_repeat.hpl index 464f27e..4432488 100644 --- a/mappings/info_recruits/citizen_tables/snils/parallel/snils_flow_repeat.hpl +++ b/mappings/info_recruits/citizen_tables/snils/parallel/snils_flow_repeat.hpl @@ -21,48 +21,48 @@ - Create job execution record - Table input - Y - - - Table input - Identify last row in a stream + snils_create_job + snils_input Y Filter rows - Change job status on success + snils_success_job Y Detect empty stream - Change job status on success + snils_success_job Y - Change job status on error + snils_failure_job Abort Y - Insert / update snils - Filter rows + snils_insert_or_update + snils_failure_job Y - Insert / update snils + snils_input + snils_insert_or_update + Y + + + snils_input Detect empty stream Y - Insert / update snils - Change job status on error + snils_insert_or_update + Identify last row in a stream Y - Identify last row in a stream - Insert / update snils + Identify last row in a stream + Filter rows Y @@ -83,11 +83,11 @@ 1296 - 400 + 592 - Change job status on error + snils_failure_job ExecSql Y @@ -103,8 +103,11 @@ ervu-dashboard + Y + N + Y Y N @@ -116,14 +119,15 @@ WHERE job_name = '${JOB_NAME}' and recruitment_id = '${IDM_ID}'; + 1008 - 400 + 592 - Change job status on success + snils_success_job ExecSql Y @@ -136,8 +140,11 @@ and recruitment_id = '${IDM_ID}'; ervu-dashboard + Y + N + Y N N @@ -148,14 +155,86 @@ WHERE job_name = '${JOB_NAME}' and recruitment_id = '${IDM_ID}'; + - 1440 + 480 48 - Create job execution record + Detect empty stream + DetectEmptyStream + + Y + + 1 + + none + + + + + 480 + 208 + + + + Filter rows + FilterRows + + Y + + 1 + + none + + + + + + + = + last_row + N + - + + N + -1 + constant + -1 + Y + Boolean + + + + snils_success_job + + + 1008 + 48 + + + + Identify last row in a stream + DetectLastRow + + Y + + 1 + + none + + + last_row + + + 1008 + 224 + + + + snils_create_job ExecSql Y @@ -198,82 +277,67 @@ DO UPDATE SET 272 - 208 + 400 - Detect empty stream - DetectEmptyStream + snils_input + TableInput - Y + N 1 none + ervu-dashboard + N + WITH +base as ( + SELECT + ri.recruit_id, + ri.info, + ri.info->'svedFL'->'svedBS'->'snils'->'aktSNILS'->>'id' AS akt_source_id, + ri.info->'svedFL'->'svedBS'->'snils'->'aktSNILS'->>'dataSved' AS akt_data_sved, + REGEXP_REPLACE(ri.info->'svedFL'->'svedBS'->'snils'->'aktSNILS'->>'snils', '[^0-9]', '', 'g') AS akt_snils_cleaned, + ri.info->'svedFL'->'svedBS'->'snils'->'predSNILS'->>'id' AS pred_source_id, + ri.info->'svedFL'->'svedBS'->'snils'->'predSNILS'->>'dataSved' AS pred_data_sved, + REGEXP_REPLACE(ri.info->'svedFL'->'svedBS'->'snils'->'predSNILS'->>'snils', '[^0-9]', '', 'g') AS pred_snils_cleaned + FROM ervu_dashboard.recruits_info ri + JOIN ervu_dashboard.citizen r ON r.recruit_id = ri.recruit_id + WHERE + '${IDM_ID}' != '' + AND COALESCE(ri.current_recruitment_id, ri.target_recruitment_id) = '${IDM_ID}' +) +SELECT + recruit_id, + akt_source_id AS source_id, + to_date(akt_data_sved, 'YYYY-MM-DD') AS source_update_date, + akt_snils_cleaned AS snils, + true AS actual +FROM base +WHERE LENGTH(akt_snils_cleaned) = 11 + +UNION ALL + +SELECT + recruit_id, + pred_source_id AS source_id, + to_date(pred_data_sved, 'YYYY-MM-DD') AS source_update_date, + pred_snils_cleaned AS snils, + false AS actual +FROM base +WHERE LENGTH(pred_snils_cleaned) = 11 + Y - 1440 - 208 + 480 + 400 - Filter rows - FilterRows - - Y - - 1 - - none - - - - - - - = - last_row - N - - - - N - -1 - constant - -1 - Y - Boolean - - - - Change job status on success - - - 1008 - 48 - - - - Identify last row in a stream - DetectLastRow - - Y - - 1 - - none - - - last_row - - - 736 - 208 - - - - Insert / update snils + snils_insert_or_update InsertUpdate N @@ -290,7 +354,6 @@ DO UPDATE SET = source_id source_id - ervu_dashboard snils
@@ -324,61 +387,13 @@ DO UPDATE SET 1008 - 208 - -
- - Table input - TableInput - - N - - 1 - - none - - - ervu-dashboard - N - WITH -base as ( - SELECT - ri.recruit_id, - ri.info - FROM ervu_dashboard.recruits_info ri - join ervu_dashboard.citizen r ON r.recruit_id = ri.recruit_id AND '${IDM_ID}' != '' -- Проверка на пустую строку - AND (ri.current_recruitment_id = '${IDM_ID}' or (ri.current_recruitment_id is null and ri.target_recruitment_id = '${IDM_ID}')) -) -SELECT - recruit_id, - info->'svedFL'->'svedBS'->'snils'->'aktSNILS'->>'id' AS source_id, - to_date(info->'svedFL'->'svedBS'->'snils'->'aktSNILS'->>'dataSved', 'YYYY-MM-DD') AS source_update_date, - REGEXP_REPLACE(info->'svedFL'->'svedBS'->'snils'->'aktSNILS'->>'snils', '[^0-9]', '', 'g') AS snils, - true AS actual -FROM base -WHERE LENGTH(REGEXP_REPLACE(info->'svedFL'->'svedBS'->'snils'->'aktSNILS'->>'snils', '[^0-9]', '', 'g')) = 11 - -UNION ALL - -SELECT - recruit_id, - info->'svedFL'->'svedBS'->'snils'->'predSNILS'->>'id' AS source_id, - to_date(info->'svedFL'->'svedBS'->'snils'->'predSNILS'->>'dataSved', 'YYYY-MM-DD') AS source_update_date, - REGEXP_REPLACE(info->'svedFL'->'svedBS'->'snils'->'predSNILS'->>'snils', '[^0-9]', '', 'g') AS snils, - false AS actual -FROM base -WHERE LENGTH(REGEXP_REPLACE(info->'svedFL'->'svedBS'->'snils'->'predSNILS'->>'snils', '[^0-9]', '', 'g')) = 11 - Y - - - 480 - 208 + 400 - Insert / update snils - Change job status on error + snils_insert_or_update + snils_failure_job Y error_description diff --git a/mappings/info_recruits/citizen_tables/snils/recruitment_five_flow_on_error.hpl b/mappings/info_recruits/citizen_tables/snils/recruitment_five_flow_on_error.hpl index 0db9bfd..2c1f326 100644 --- a/mappings/info_recruits/citizen_tables/snils/recruitment_five_flow_on_error.hpl +++ b/mappings/info_recruits/citizen_tables/snils/recruitment_five_flow_on_error.hpl @@ -59,7 +59,7 @@ ervu-dashboard N - 0 + SELECT r.idm_id as recruitment_id FROM ervu_dashboard.recruitment r diff --git a/mappings/info_recruits/citizen_tables/snils/recruitments_five_flow.hpl b/mappings/info_recruits/citizen_tables/snils/recruitments_five_flow.hpl index 2757f86..bffe733 100644 --- a/mappings/info_recruits/citizen_tables/snils/recruitments_five_flow.hpl +++ b/mappings/info_recruits/citizen_tables/snils/recruitments_five_flow.hpl @@ -59,7 +59,7 @@ ervu-dashboard N - 0 + SELECT idm_id AS recruitment FROM ervu_dashboard.recruitment