ERVU-480 : work flow

This commit is contained in:
Fusionshh 2025-09-23 16:06:14 +03:00
parent ed01961f0b
commit 911e448446
22 changed files with 1772 additions and 457 deletions

View file

@ -26,12 +26,12 @@
<schedulerType>0</schedulerType>
<weekDay>1</weekDay>
<parallel>N</parallel>
<xloc>48</xloc>
<xloc>0</xloc>
<yloc>48</yloc>
<attributes_hac/>
</action>
<action>
<name>check_if_employer_job_execution_exists.hpl</name>
<name>check_if_job_execution_exists.hpl</name>
<description/>
<type>PIPELINE</type>
<attributes/>
@ -41,7 +41,9 @@
<clear_rows>N</clear_rows>
<create_parent_folder>N</create_parent_folder>
<exec_per_row>N</exec_per_row>
<filename>${PROJECT_HOME}/info_recruits/citizen_tables/work/employer/support/check_if_employer_job_execution_exists.hpl</filename>
<filename>${PROJECT_HOME}/info_recruits/citizen_tables/support/check_if_job_execution_exists.hpl</filename>
<logext/>
<logfile/>
<loglevel>Basic</loglevel>
<parameters>
<pass_all_parameters>Y</pass_all_parameters>
@ -52,7 +54,7 @@
<set_logfile>N</set_logfile>
<wait_until_finished>Y</wait_until_finished>
<parallel>N</parallel>
<xloc>272</xloc>
<xloc>352</xloc>
<yloc>48</yloc>
<attributes_hac/>
</action>
@ -69,7 +71,7 @@
<valuetype>variable</valuetype>
<variablename>JOB_EXECUTED_FLAG</variablename>
<parallel>N</parallel>
<xloc>576</xloc>
<xloc>656</xloc>
<yloc>48</yloc>
<attributes_hac/>
</action>
@ -95,7 +97,7 @@
<set_logfile>N</set_logfile>
<wait_until_finished>Y</wait_until_finished>
<parallel>Y</parallel>
<xloc>896</xloc>
<xloc>976</xloc>
<yloc>48</yloc>
<attributes_hac/>
</action>
@ -121,7 +123,7 @@
<set_logfile>N</set_logfile>
<wait_until_finished>Y</wait_until_finished>
<parallel>N</parallel>
<xloc>816</xloc>
<xloc>896</xloc>
<yloc>352</yloc>
<attributes_hac/>
</action>
@ -136,7 +138,9 @@
<clear_rows>N</clear_rows>
<create_parent_folder>N</create_parent_folder>
<exec_per_row>N</exec_per_row>
<filename>${PROJECT_HOME}/info_recruits/citizen_tables/work/employer/support/check_if_need_to_repeat.hpl</filename>
<filename>${PROJECT_HOME}/info_recruits/citizen_tables/support/check_if_need_to_repeat.hpl</filename>
<logext/>
<logfile/>
<loglevel>Basic</loglevel>
<parameters>
<pass_all_parameters>Y</pass_all_parameters>
@ -147,7 +151,7 @@
<set_logfile>N</set_logfile>
<wait_until_finished>Y</wait_until_finished>
<parallel>N</parallel>
<xloc>576</xloc>
<xloc>656</xloc>
<yloc>192</yloc>
<attributes_hac/>
</action>
@ -164,7 +168,7 @@
<valuetype>variable</valuetype>
<variablename>NEED_TO_REPEAT_JOB</variablename>
<parallel>N</parallel>
<xloc>576</xloc>
<xloc>656</xloc>
<yloc>352</yloc>
<attributes_hac/>
</action>
@ -190,14 +194,34 @@
<set_logfile>N</set_logfile>
<wait_until_finished>Y</wait_until_finished>
<parallel>N</parallel>
<xloc>576</xloc>
<xloc>656</xloc>
<yloc>544</yloc>
<attributes_hac/>
</action>
<action>
<name>init_job_name</name>
<description/>
<type>SET_VARIABLES</type>
<attributes/>
<fields>
<field>
<variable_name>JOB_NAME</variable_name>
<variable_type>CURRENT_WORKFLOW</variable_type>
<variable_value>employer_job</variable_value>
</field>
</fields>
<file_variable_type>CURRENT_WORKFLOW</file_variable_type>
<filename/>
<replacevars>N</replacevars>
<parallel>N</parallel>
<xloc>160</xloc>
<yloc>48</yloc>
<attributes_hac/>
</action>
</actions>
<hops>
<hop>
<from>check_if_employer_job_execution_exists.hpl</from>
<from>check_if_job_execution_exists.hpl</from>
<to>employer_job_execution_exists_check</to>
<enabled>Y</enabled>
<evaluation>Y</evaluation>
@ -240,11 +264,18 @@
</hop>
<hop>
<from>Start</from>
<to>check_if_employer_job_execution_exists.hpl</to>
<to>init_job_name</to>
<enabled>Y</enabled>
<evaluation>Y</evaluation>
<unconditional>Y</unconditional>
</hop>
<hop>
<from>init_job_name</from>
<to>check_if_job_execution_exists.hpl</to>
<enabled>Y</enabled>
<evaluation>Y</evaluation>
<unconditional>N</unconditional>
</hop>
</hops>
<notepads>
</notepads>

View file

@ -65,6 +65,21 @@
<to>Detect empty stream</to>
<enabled>Y</enabled>
</hop>
<hop>
<from>Identify last row in a stream</from>
<to>filter_non_actual_employers</to>
<enabled>Y</enabled>
</hop>
<hop>
<from>filter_non_actual_employers</from>
<to>Update</to>
<enabled>Y</enabled>
</hop>
<hop>
<from>Update</from>
<to>Change job status on error</to>
<enabled>Y</enabled>
</hop>
</order>
<transform>
<name>Abort</name>
@ -82,7 +97,7 @@
<row_threshold>0</row_threshold>
<attributes/>
<GUI>
<xloc>944</xloc>
<xloc>1168</xloc>
<yloc>448</yloc>
</GUI>
</transform>
@ -117,7 +132,7 @@ and recruitment_id = '${IDM_ID}';
</sql>
<attributes/>
<GUI>
<xloc>720</xloc>
<xloc>944</xloc>
<yloc>448</yloc>
</GUI>
</transform>
@ -148,7 +163,7 @@ and recruitment_id = '${IDM_ID}';
</sql>
<attributes/>
<GUI>
<xloc>944</xloc>
<xloc>1168</xloc>
<yloc>112</yloc>
</GUI>
</transform>
@ -230,7 +245,7 @@ VALUES (DEFAULT, '${JOB_NAME}', 'PROCESSING', DEFAULT, null, '${IDM_ID}');
<send_true_to>Change job status on success</send_true_to>
<attributes/>
<GUI>
<xloc>944</xloc>
<xloc>1168</xloc>
<yloc>288</yloc>
</GUI>
</transform>
@ -252,6 +267,43 @@ VALUES (DEFAULT, '${JOB_NAME}', 'PROCESSING', DEFAULT, null, '${IDM_ID}');
<yloc>288</yloc>
</GUI>
</transform>
<transform>
<name>Update</name>
<type>Update</type>
<description/>
<distribute>Y</distribute>
<custom_distribution/>
<copies>1</copies>
<partitioning>
<method>none</method>
<schema_name/>
</partitioning>
<commit>10000</commit>
<connection>ervu-dashboard</connection>
<error_ignored>N</error_ignored>
<ignore_flag_field/>
<lookup>
<key>
<condition>=</condition>
<field>recruit_id</field>
<name>recruit_id</name>
<name2/>
</key>
<schema>ervu_dashboard</schema>
<table>citizen</table>
<value>
<name>employed</name>
<rename>actual_employer</rename>
</value>
</lookup>
<skip_lookup>Y</skip_lookup>
<use_batch>Y</use_batch>
<attributes/>
<GUI>
<xloc>720</xloc>
<yloc>448</yloc>
</GUI>
</transform>
<transform>
<name>employer_input</name>
<type>TableInput</type>
@ -282,7 +334,8 @@ VALUES (DEFAULT, '${JOB_NAME}', 'PROCESSING', DEFAULT, null, '${IDM_ID}');
false as actual_employer
from recruits_info ri
where ri.info -> 'svedFL' -> 'svedRabotodat' -> 'rabotodat' ->> 'predRabotodat' != 'null'
and COALESCE(ri.current_recruitment_id, ri.target_recruitment_id) = '${IDM_ID}')
and COALESCE(ri.current_recruitment_id, ri.target_recruitment_id) = '${IDM_ID}'
)
select fd.source_id,
fd.recruit_id,
to_date(rabotodat ->> 'dataSved', 'YYYY-MM-DD') as source_update_date,
@ -319,7 +372,8 @@ select fd.source_id,
then rabotodat -> 'extend' -> 'svedYUL' ->> 'kpp'
end as kpp,
coalesce(rabotodat -> 'svedYUL' -> 'extend' ->> 'priznakOpk', '0') = '1' as is_opk_org,
actual_employer
actual_employer,
coalesce(rabotodat -> 'extend' ->> 'statusRabotodat', '0') = '1' as status
from filteredData fd;</sql>
<variables_active>Y</variables_active>
<attributes/>
@ -394,6 +448,10 @@ from filteredData fd;</sql>
<column_name>is_opk_org</column_name>
<stream_name>is_opk_org</stream_name>
</field>
<field>
<column_name>status</column_name>
<stream_name>status</stream_name>
</field>
</fields>
<ignore_errors>N</ignore_errors>
<only_when_have_rows>N</only_when_have_rows>
@ -410,11 +468,59 @@ from filteredData fd;</sql>
<use_batch>Y</use_batch>
<attributes/>
<GUI>
<xloc>720</xloc>
<xloc>944</xloc>
<yloc>288</yloc>
</GUI>
</transform>
<transform>
<name>filter_non_actual_employers</name>
<type>FilterRows</type>
<description/>
<distribute>Y</distribute>
<custom_distribution/>
<copies>1</copies>
<partitioning>
<method>none</method>
<schema_name/>
</partitioning>
<compare>
<condition>
<conditions>
</conditions>
<function>=</function>
<leftvalue>actual_employer</leftvalue>
<negated>N</negated>
<operator>-</operator>
<value>
<isnull>N</isnull>
<length>-1</length>
<name>constant</name>
<precision>-1</precision>
<text>Y</text>
<type>Boolean</type>
</value>
</condition>
</compare>
<send_true_to>Update</send_true_to>
<attributes/>
<GUI>
<xloc>528</xloc>
<yloc>448</yloc>
</GUI>
</transform>
<transform_error_handling>
<error>
<source_transform>Update</source_transform>
<target_transform>Change job status on error</target_transform>
<is_enabled>Y</is_enabled>
<nr_valuename/>
<descriptions_valuename>error_description</descriptions_valuename>
<fields_valuename/>
<codes_valuename/>
<max_errors/>
<max_pct_errors/>
<min_pct_rows/>
</error>
<error>
<source_transform>employer_output</source_transform>
<target_transform>Change job status on error</target_transform>

View file

@ -65,6 +65,16 @@
<to>Change job status on success</to>
<enabled>Y</enabled>
</hop>
<hop>
<from>Identify last row in a stream</from>
<to>Update</to>
<enabled>Y</enabled>
</hop>
<hop>
<from>Update</from>
<to>Change job status on error</to>
<enabled>Y</enabled>
</hop>
</order>
<transform>
<name>Abort</name>
@ -255,6 +265,46 @@ and recruitment_id = '${IDM_ID}';</sql>
<yloc>384</yloc>
</GUI>
</transform>
<transform>
<name>Update</name>
<type>Update</type>
<description/>
<distribute>Y</distribute>
<custom_distribution/>
<copies>1</copies>
<partitioning>
<method>none</method>
<schema_name/>
</partitioning>
<commit>10000</commit>
<connection>ervu-dashboard</connection>
<error_ignored>N</error_ignored>
<lookup>
<key>
<condition>=</condition>
<field>recruit_id</field>
<name>recruit_id</name>
</key>
<key>
<condition>&lt;></condition>
<field>employed</field>
<name>actual_employer</name>
</key>
<schema>ervu_dashboard</schema>
<table>citizen</table>
<value>
<name>employed</name>
<rename>actual_employer</rename>
</value>
</lookup>
<skip_lookup>N</skip_lookup>
<use_batch>Y</use_batch>
<attributes/>
<GUI>
<xloc>672</xloc>
<yloc>544</yloc>
</GUI>
</transform>
<transform>
<name>employer_input</name>
<type>TableInput</type>
@ -326,7 +376,8 @@ select fd.source_id,
then rabotodat -> 'extend' -> 'svedYUL' ->> 'kpp'
end as kpp,
coalesce(rabotodat -> 'svedYUL' -> 'extend' ->> 'priznakOpk', '0') = '1' as is_opk_org,
actual_employer
actual_employer,
coalesce(rabotodat -> 'extend' ->> 'statusRabotodat', '0') = '1' as status
from filteredData fd;</sql>
<variables_active>Y</variables_active>
<attributes/>
@ -353,11 +404,13 @@ from filteredData fd;</sql>
<condition>=</condition>
<field>source_id</field>
<name>source_id</name>
<name2/>
</key>
<key>
<condition>=</condition>
<condition>&lt;></condition>
<field>actual_employer</field>
<name>actual_employer</name>
<name2/>
</key>
<schema>ervu_dashboard</schema>
<table>employer</table>
@ -426,6 +479,11 @@ from filteredData fd;</sql>
<rename>is_opk_org</rename>
<update>Y</update>
</value>
<value>
<name>status</name>
<rename>status</rename>
<update>Y</update>
</value>
</lookup>
<update_bypassed>N</update_bypassed>
<attributes/>
@ -435,6 +493,18 @@ from filteredData fd;</sql>
</GUI>
</transform>
<transform_error_handling>
<error>
<source_transform>Update</source_transform>
<target_transform>Change job status on error</target_transform>
<is_enabled>Y</is_enabled>
<nr_valuename/>
<descriptions_valuename/>
<fields_valuename/>
<codes_valuename/>
<max_errors/>
<max_pct_errors/>
<min_pct_rows/>
</error>
<error>
<source_transform>employer_upsert</source_transform>
<target_transform>Change job status on error</target_transform>

View file

@ -65,6 +65,21 @@
<to>Change job status on success</to>
<enabled>Y</enabled>
</hop>
<hop>
<from>filter_non_actual_employers</from>
<to>Update</to>
<enabled>Y</enabled>
</hop>
<hop>
<from>Identify last row in a stream</from>
<to>filter_non_actual_employers</to>
<enabled>Y</enabled>
</hop>
<hop>
<from>Update</from>
<to>Change job status on error</to>
<enabled>Y</enabled>
</hop>
</order>
<transform>
<name>Abort</name>
@ -82,7 +97,7 @@
<row_threshold>0</row_threshold>
<attributes/>
<GUI>
<xloc>1072</xloc>
<xloc>1312</xloc>
<yloc>560</yloc>
</GUI>
</transform>
@ -117,7 +132,7 @@ and recruitment_id = '${IDM_ID}';
</sql>
<attributes/>
<GUI>
<xloc>896</xloc>
<xloc>1136</xloc>
<yloc>560</yloc>
</GUI>
</transform>
@ -148,7 +163,7 @@ and recruitment_id = '${IDM_ID}';
</sql>
<attributes/>
<GUI>
<xloc>1072</xloc>
<xloc>1312</xloc>
<yloc>256</yloc>
</GUI>
</transform>
@ -166,11 +181,8 @@ and recruitment_id = '${IDM_ID}';
<arguments>
</arguments>
<connection>ervu-dashboard</connection>
<delete_field/>
<execute_each_row>N</execute_each_row>
<insert_field/>
<quoteString>N</quoteString>
<read_field/>
<replace_variables>Y</replace_variables>
<set_params>N</set_params>
<single_statement>N</single_statement>
@ -181,7 +193,6 @@ DO UPDATE
SET status = 'PROCESSING',
error_description = null,
execution_datetime = current_timestamp;</sql>
<update_field/>
<attributes/>
<GUI>
<xloc>352</xloc>
@ -238,7 +249,7 @@ SET status = 'PROCESSING',
<send_true_to>Change job status on success</send_true_to>
<attributes/>
<GUI>
<xloc>1072</xloc>
<xloc>1312</xloc>
<yloc>400</yloc>
</GUI>
</transform>
@ -260,6 +271,41 @@ SET status = 'PROCESSING',
<yloc>400</yloc>
</GUI>
</transform>
<transform>
<name>Update</name>
<type>Update</type>
<description/>
<distribute>Y</distribute>
<custom_distribution/>
<copies>1</copies>
<partitioning>
<method>none</method>
<schema_name/>
</partitioning>
<commit>10000</commit>
<connection>ervu-dashboard</connection>
<error_ignored>N</error_ignored>
<lookup>
<key>
<condition>=</condition>
<field>recruit_id</field>
<name>recruit_id</name>
</key>
<schema>ervu_dashboard</schema>
<table>citizen</table>
<value>
<name>employed</name>
<rename>actual_employer</rename>
</value>
</lookup>
<skip_lookup>Y</skip_lookup>
<use_batch>Y</use_batch>
<attributes/>
<GUI>
<xloc>896</xloc>
<yloc>560</yloc>
</GUI>
</transform>
<transform>
<name>employer_input</name>
<type>TableInput</type>
@ -327,7 +373,8 @@ select fd.source_id,
then rabotodat -> 'extend' -> 'svedYUL' ->> 'kpp'
end as kpp,
coalesce(rabotodat -> 'svedYUL' -> 'extend' ->> 'priznakOpk', '0') = '1' as is_opk_org,
actual_employer
actual_employer,
coalesce(rabotodat -> 'extend' ->> 'statusRabotodat', '0') = '1' as status
from filteredData fd;</sql>
<variables_active>Y</variables_active>
<attributes/>
@ -354,11 +401,7 @@ from filteredData fd;</sql>
<condition>=</condition>
<field>source_id</field>
<name>source_id</name>
</key>
<key>
<condition>=</condition>
<field>actual_employer</field>
<name>actual_employer</name>
<name2/>
</key>
<schema>ervu_dashboard</schema>
<table>employer</table>
@ -427,15 +470,68 @@ from filteredData fd;</sql>
<rename>is_opk_org</rename>
<update>Y</update>
</value>
<value>
<name>status</name>
<rename>status</rename>
<update>Y</update>
</value>
</lookup>
<update_bypassed>N</update_bypassed>
<attributes/>
<GUI>
<xloc>896</xloc>
<xloc>1136</xloc>
<yloc>400</yloc>
</GUI>
</transform>
<transform>
<name>filter_non_actual_employers</name>
<type>FilterRows</type>
<description/>
<distribute>Y</distribute>
<custom_distribution/>
<copies>1</copies>
<partitioning>
<method>none</method>
<schema_name/>
</partitioning>
<compare>
<condition>
<conditions>
</conditions>
<function>=</function>
<leftvalue>actual_employer</leftvalue>
<negated>N</negated>
<operator>-</operator>
<value>
<isnull>N</isnull>
<length>-1</length>
<name>constant</name>
<precision>-1</precision>
<text>Y</text>
<type>Boolean</type>
</value>
</condition>
</compare>
<send_true_to>Update</send_true_to>
<attributes/>
<GUI>
<xloc>704</xloc>
<yloc>560</yloc>
</GUI>
</transform>
<transform_error_handling>
<error>
<source_transform>Update</source_transform>
<target_transform>Change job status on error</target_transform>
<is_enabled>Y</is_enabled>
<nr_valuename/>
<descriptions_valuename>error_description</descriptions_valuename>
<fields_valuename/>
<codes_valuename/>
<max_errors/>
<max_pct_errors/>
<min_pct_rows/>
</error>
<error>
<source_transform>employer_upsert</source_transform>
<target_transform>Change job status on error</target_transform>

View file

@ -21,33 +21,33 @@
</notepads>
<order>
<hop>
<from>Get all recruitments ordered by created_date</from>
<from>Get all recruitments</from>
<to>employer_flow.hpl</to>
<enabled>Y</enabled>
</hop>
<hop>
<from>Get all recruitments ordered by created_date</from>
<from>Get all recruitments</from>
<to>employer_flow.hpl 2</to>
<enabled>Y</enabled>
</hop>
<hop>
<from>Get all recruitments ordered by created_date</from>
<from>Get all recruitments</from>
<to>employer_flow.hpl 3</to>
<enabled>Y</enabled>
</hop>
<hop>
<from>Get all recruitments ordered by created_date</from>
<from>Get all recruitments</from>
<to>employer_flow.hpl 4</to>
<enabled>Y</enabled>
</hop>
<hop>
<from>Get all recruitments ordered by created_date</from>
<from>Get all recruitments</from>
<to>employer_flow.hpl 5</to>
<enabled>Y</enabled>
</hop>
</order>
<transform>
<name>Get all recruitments ordered by created_date</name>
<name>Get all recruitments</name>
<type>TableInput</type>
<description/>
<distribute>Y</distribute>
@ -59,6 +59,7 @@
</partitioning>
<connection>ervu-dashboard-test</connection>
<execute_each_row>N</execute_each_row>
<limit/>
<sql>SELECT
idm_id
FROM ervu_dashboard.recruitment;</sql>

View file

@ -66,22 +66,23 @@
<execute_each_row>N</execute_each_row>
<limit>0</limit>
<lookup>get_max_source_update_date</lookup>
<sql>SELECT
<sql>WITH mud AS (
SELECT
recruitment_id,
MAX(execution_datetime) AS max_upd_date
FROM etl.job_execution
WHERE job_name = '${JOB_NAME}'
AND status IN ('SUCCESS','DELTA_ERROR','DELTA_SUCCESS','DELTA_PROCESSING')
GROUP BY recruitment_id
)
SELECT
r.idm_id,
? max_source_update_date
FROM ervu_dashboard.recruitment r
LEFT JOIN etl.job_execution je
ON r.idm_id = je.recruitment_id
and job_name = 'employer_job'
JOIN recruits_info ri
JOIN mud ON mud.recruitment_id = r.idm_id
JOIN recruits_info ri
ON COALESCE(ri.current_recruitment_id, ri.target_recruitment_id) = r.idm_id
AND ri.updated_at > (
SELECT MAX(execution_datetime)
FROM etl.job_execution
WHERE job_name = 'employer_job'
AND recruitment_id = r.idm_id
)
where je.status in ('SUCCESS', 'DELTA_ERROR', 'DELTA_SUCCESS', 'DELTA_PROCESSING');</sql>
AND ri.updated_at > mud.max_upd_date;</sql>
<variables_active>N</variables_active>
<attributes/>
<GUI>
@ -89,29 +90,6 @@ where je.status in ('SUCCESS', 'DELTA_ERROR', 'DELTA_SUCCESS', 'DELTA_PROCESSING
<yloc>336</yloc>
</GUI>
</transform>
<transform>
<name>get_max_source_update_date</name>
<type>TableInput</type>
<description/>
<distribute>Y</distribute>
<custom_distribution/>
<copies>1</copies>
<partitioning>
<method>none</method>
<schema_name/>
</partitioning>
<connection>ervu-dashboard</connection>
<execute_each_row>N</execute_each_row>
<limit>0</limit>
<sql>select max(source_update_date)
from employer;</sql>
<variables_active>N</variables_active>
<attributes/>
<GUI>
<xloc>400</xloc>
<yloc>336</yloc>
</GUI>
</transform>
<transform>
<name>employer_flow_delta.hpl</name>
<type>PipelineExecutor</type>
@ -417,6 +395,29 @@ from employer;</sql>
<yloc>496</yloc>
</GUI>
</transform>
<transform>
<name>get_max_source_update_date</name>
<type>TableInput</type>
<description/>
<distribute>Y</distribute>
<custom_distribution/>
<copies>1</copies>
<partitioning>
<method>none</method>
<schema_name/>
</partitioning>
<connection>ervu-dashboard</connection>
<execute_each_row>N</execute_each_row>
<limit>0</limit>
<sql>select max(source_update_date)
from employer;</sql>
<variables_active>N</variables_active>
<attributes/>
<GUI>
<xloc>400</xloc>
<yloc>336</yloc>
</GUI>
</transform>
<transform_error_handling>
</transform_error_handling>
<attributes/>

View file

@ -98,6 +98,11 @@ WHERE je.id IS NULL
<field>recruitment_id</field>
<input/>
</variable_mapping>
<variable_mapping>
<variable>JOB_NAME</variable>
<field/>
<input>employer_job</input>
</variable_mapping>
<inherit_all_vars>Y</inherit_all_vars>
</parameters>
<execution_result_target_transform/>
@ -149,6 +154,11 @@ WHERE je.id IS NULL
<field>recruitment_id</field>
<input/>
</variable_mapping>
<variable_mapping>
<variable>JOB_NAME</variable>
<field/>
<input>employer_job</input>
</variable_mapping>
<inherit_all_vars>Y</inherit_all_vars>
</parameters>
<execution_result_target_transform/>
@ -200,6 +210,11 @@ WHERE je.id IS NULL
<field>recruitment_id</field>
<input/>
</variable_mapping>
<variable_mapping>
<variable>JOB_NAME</variable>
<field/>
<input>employer_job</input>
</variable_mapping>
<inherit_all_vars>Y</inherit_all_vars>
</parameters>
<execution_result_target_transform/>
@ -251,6 +266,11 @@ WHERE je.id IS NULL
<field>recruitment_id</field>
<input/>
</variable_mapping>
<variable_mapping>
<variable>JOB_NAME</variable>
<field/>
<input>employer_job</input>
</variable_mapping>
<inherit_all_vars>Y</inherit_all_vars>
</parameters>
<execution_result_target_transform/>
@ -302,6 +322,11 @@ WHERE je.id IS NULL
<field>recruitment_id</field>
<input/>
</variable_mapping>
<variable_mapping>
<variable>JOB_NAME</variable>
<field/>
<input>employer_job</input>
</variable_mapping>
<inherit_all_vars>Y</inherit_all_vars>
</parameters>
<execution_result_target_transform/>

View file

@ -31,7 +31,7 @@
<attributes_hac/>
</action>
<action>
<name>check_if_individual_entrepreneur_execution_exists.hpl</name>
<name>check_if_job_execution_exists.hpl</name>
<description/>
<type>PIPELINE</type>
<attributes/>
@ -41,7 +41,7 @@
<clear_rows>N</clear_rows>
<create_parent_folder>N</create_parent_folder>
<exec_per_row>N</exec_per_row>
<filename>${PROJECT_HOME}/info_recruits/citizen_tables/work/individual_entrepreneur/support/check_if_individual_entrepreneur_execution_exists.hpl</filename>
<filename>${PROJECT_HOME}/info_recruits/citizen_tables/support/check_if_job_execution_exists.hpl</filename>
<logext/>
<logfile/>
<loglevel>Basic</loglevel>
@ -54,7 +54,7 @@
<set_logfile>N</set_logfile>
<wait_until_finished>Y</wait_until_finished>
<parallel>N</parallel>
<xloc>256</xloc>
<xloc>384</xloc>
<yloc>224</yloc>
<attributes_hac/>
</action>
@ -71,7 +71,7 @@
<valuetype>variable</valuetype>
<variablename>JOB_EXECUTED_FLAG</variablename>
<parallel>N</parallel>
<xloc>592</xloc>
<xloc>720</xloc>
<yloc>224</yloc>
<attributes_hac/>
</action>
@ -87,8 +87,6 @@
<create_parent_folder>N</create_parent_folder>
<exec_per_row>N</exec_per_row>
<filename>${PROJECT_HOME}/info_recruits/citizen_tables/work/individual_entrepreneur/recruitment_five_flow.hpl</filename>
<logext/>
<logfile/>
<loglevel>Basic</loglevel>
<parameters>
<pass_all_parameters>Y</pass_all_parameters>
@ -99,7 +97,7 @@
<set_logfile>N</set_logfile>
<wait_until_finished>Y</wait_until_finished>
<parallel>Y</parallel>
<xloc>912</xloc>
<xloc>1040</xloc>
<yloc>224</yloc>
<attributes_hac/>
</action>
@ -115,8 +113,6 @@
<create_parent_folder>N</create_parent_folder>
<exec_per_row>N</exec_per_row>
<filename>${PROJECT_HOME}/info_recruits/citizen_tables/work/individual_entrepreneur/recruitment_five_flow_repeat.hpl</filename>
<logext/>
<logfile/>
<loglevel>Basic</loglevel>
<parameters>
<pass_all_parameters>Y</pass_all_parameters>
@ -127,7 +123,7 @@
<set_logfile>N</set_logfile>
<wait_until_finished>Y</wait_until_finished>
<parallel>N</parallel>
<xloc>832</xloc>
<xloc>960</xloc>
<yloc>528</yloc>
<attributes_hac/>
</action>
@ -142,7 +138,7 @@
<clear_rows>N</clear_rows>
<create_parent_folder>N</create_parent_folder>
<exec_per_row>N</exec_per_row>
<filename>${PROJECT_HOME}/info_recruits/citizen_tables/work/individual_entrepreneur/support/check_if_need_to_repeat.hpl</filename>
<filename>${PROJECT_HOME}/info_recruits/citizen_tables/support/check_if_need_to_repeat.hpl</filename>
<logext/>
<logfile/>
<loglevel>Basic</loglevel>
@ -155,7 +151,7 @@
<set_logfile>N</set_logfile>
<wait_until_finished>Y</wait_until_finished>
<parallel>N</parallel>
<xloc>592</xloc>
<xloc>720</xloc>
<yloc>384</yloc>
<attributes_hac/>
</action>
@ -172,7 +168,7 @@
<valuetype>variable</valuetype>
<variablename>NEED_TO_REPEAT_JOB</variablename>
<parallel>N</parallel>
<xloc>592</xloc>
<xloc>720</xloc>
<yloc>528</yloc>
<attributes_hac/>
</action>
@ -188,8 +184,6 @@
<create_parent_folder>N</create_parent_folder>
<exec_per_row>N</exec_per_row>
<filename>${PROJECT_HOME}/info_recruits/citizen_tables/work/individual_entrepreneur/recruitment_five_flow_delta.hpl</filename>
<logext/>
<logfile/>
<loglevel>Basic</loglevel>
<parameters>
<pass_all_parameters>Y</pass_all_parameters>
@ -200,21 +194,34 @@
<set_logfile>N</set_logfile>
<wait_until_finished>Y</wait_until_finished>
<parallel>N</parallel>
<xloc>592</xloc>
<xloc>720</xloc>
<yloc>720</yloc>
<attributes_hac/>
</action>
<action>
<name>init_job_name</name>
<description/>
<type>SET_VARIABLES</type>
<attributes/>
<fields>
<field>
<variable_name>JOB_NAME</variable_name>
<variable_type>CURRENT_WORKFLOW</variable_type>
<variable_value>individual_entrepreneur_job</variable_value>
</field>
</fields>
<file_variable_type>CURRENT_WORKFLOW</file_variable_type>
<filename/>
<replacevars>N</replacevars>
<parallel>N</parallel>
<xloc>144</xloc>
<yloc>224</yloc>
<attributes_hac/>
</action>
</actions>
<hops>
<hop>
<from>Start</from>
<to>check_if_individual_entrepreneur_execution_exists.hpl</to>
<enabled>Y</enabled>
<evaluation>Y</evaluation>
<unconditional>Y</unconditional>
</hop>
<hop>
<from>check_if_individual_entrepreneur_execution_exists.hpl</from>
<from>check_if_job_execution_exists.hpl</from>
<to>employer_job_execution_exists_check</to>
<enabled>Y</enabled>
<evaluation>Y</evaluation>
@ -255,6 +262,20 @@
<evaluation>N</evaluation>
<unconditional>N</unconditional>
</hop>
<hop>
<from>Start</from>
<to>init_job_name</to>
<enabled>Y</enabled>
<evaluation>Y</evaluation>
<unconditional>Y</unconditional>
</hop>
<hop>
<from>init_job_name</from>
<to>check_if_job_execution_exists.hpl</to>
<enabled>Y</enabled>
<evaluation>Y</evaluation>
<unconditional>N</unconditional>
</hop>
</hops>
<notepads>
</notepads>

View file

@ -85,6 +85,21 @@
<to>filter_null_npd_dates</to>
<enabled>Y</enabled>
</hop>
<hop>
<from>Identify last row in a stream</from>
<to>filter_non_actual_ip</to>
<enabled>Y</enabled>
</hop>
<hop>
<from>filter_non_actual_ip</from>
<to>Update</to>
<enabled>Y</enabled>
</hop>
<hop>
<from>Update</from>
<to>Change job status on error</to>
<enabled>Y</enabled>
</hop>
</order>
<transform>
<name>Abort</name>
@ -103,7 +118,7 @@
<attributes/>
<GUI>
<xloc>1648</xloc>
<yloc>576</yloc>
<yloc>704</yloc>
</GUI>
</transform>
<transform>
@ -138,7 +153,7 @@ and recruitment_id = '${IDM_ID}';
<attributes/>
<GUI>
<xloc>1440</xloc>
<yloc>576</yloc>
<yloc>704</yloc>
</GUI>
</transform>
<transform>
@ -272,6 +287,77 @@ VALUES (DEFAULT, '${JOB_NAME}', 'PROCESSING', DEFAULT, null, '${IDM_ID}');
<yloc>448</yloc>
</GUI>
</transform>
<transform>
<name>Update</name>
<type>Update</type>
<description/>
<distribute>Y</distribute>
<custom_distribution/>
<copies>1</copies>
<partitioning>
<method>none</method>
<schema_name/>
</partitioning>
<commit>10000</commit>
<connection>ervu-dashboard</connection>
<error_ignored>N</error_ignored>
<lookup>
<key>
<condition>=</condition>
<field>recruit_id</field>
<name>recruit_id</name>
</key>
<schema>ervu_dashboard</schema>
<table>citizen</table>
<value>
<name>individual_entrepreneur</name>
<rename>actual</rename>
</value>
</lookup>
<skip_lookup>Y</skip_lookup>
<use_batch>Y</use_batch>
<attributes/>
<GUI>
<xloc>1184</xloc>
<yloc>576</yloc>
</GUI>
</transform>
<transform>
<name>filter_non_actual_ip</name>
<type>FilterRows</type>
<description/>
<distribute>Y</distribute>
<custom_distribution/>
<copies>1</copies>
<partitioning>
<method>none</method>
<schema_name/>
</partitioning>
<compare>
<condition>
<conditions>
</conditions>
<function>=</function>
<leftvalue>actual</leftvalue>
<negated>N</negated>
<operator>-</operator>
<value>
<isnull>N</isnull>
<length>-1</length>
<name>constant</name>
<precision>-1</precision>
<text>Y</text>
<type>Boolean</type>
</value>
</condition>
</compare>
<send_true_to>Update</send_true_to>
<attributes/>
<GUI>
<xloc>976</xloc>
<yloc>576</yloc>
</GUI>
</transform>
<transform>
<name>filter_null_npd_dates</name>
<type>FilterRows</type>
@ -297,7 +383,7 @@ VALUES (DEFAULT, '${JOB_NAME}', 'PROCESSING', DEFAULT, null, '${IDM_ID}');
<attributes/>
<GUI>
<xloc>800</xloc>
<yloc>576</yloc>
<yloc>704</yloc>
</GUI>
</transform>
<transform>
@ -337,7 +423,8 @@ VALUES (DEFAULT, '${JOB_NAME}', 'PROCESSING', DEFAULT, null, '${IDM_ID}');
false as actual
from recruits_info ri
where ri.info -> 'svedFL' -> 'svedRegIP' -> 'regIP' ->> 'predRegIP' != 'null'
and COALESCE(ri.current_recruitment_id, ri.target_recruitment_id) = '${IDM_ID}')
and COALESCE(ri.current_recruitment_id, ri.target_recruitment_id) = '${IDM_ID}'
)
select fd.recruit_id,
fd.source_id,
ip_elem ->> 'ogrnip' as ogrnip,
@ -402,7 +489,7 @@ from filtered_data fd;</sql>
<attributes/>
<GUI>
<xloc>1184</xloc>
<yloc>576</yloc>
<yloc>704</yloc>
</GUI>
</transform>
<transform>
@ -489,10 +576,22 @@ from filtered_data fd;</sql>
<attributes/>
<GUI>
<xloc>976</xloc>
<yloc>576</yloc>
<yloc>704</yloc>
</GUI>
</transform>
<transform_error_handling>
<error>
<source_transform>Update</source_transform>
<target_transform>Change job status on error</target_transform>
<is_enabled>Y</is_enabled>
<nr_valuename/>
<descriptions_valuename>error_description</descriptions_valuename>
<fields_valuename/>
<codes_valuename/>
<max_errors/>
<max_pct_errors/>
<min_pct_rows/>
</error>
<error>
<source_transform>individual_entrepreneur_npd_output</source_transform>
<target_transform>Change job status on error</target_transform>

View file

@ -85,6 +85,21 @@
<to>Change job status on success</to>
<enabled>Y</enabled>
</hop>
<hop>
<from>filter_non_actual_ip</from>
<to>Update</to>
<enabled>Y</enabled>
</hop>
<hop>
<from>Identify last row in a stream</from>
<to>filter_non_actual_ip</to>
<enabled>Y</enabled>
</hop>
<hop>
<from>Update</from>
<to>Change job status on error</to>
<enabled>Y</enabled>
</hop>
</order>
<transform>
<name>Abort</name>
@ -103,7 +118,7 @@
<attributes/>
<GUI>
<xloc>1616</xloc>
<yloc>592</yloc>
<yloc>752</yloc>
</GUI>
</transform>
<transform>
@ -138,7 +153,7 @@ and recruitment_id = '${IDM_ID}';
<attributes/>
<GUI>
<xloc>1328</xloc>
<yloc>592</yloc>
<yloc>752</yloc>
</GUI>
</transform>
<transform>
@ -300,7 +315,7 @@ and recruitment_id = '${IDM_ID}';</sql>
<attributes/>
<GUI>
<xloc>736</xloc>
<yloc>592</yloc>
<yloc>752</yloc>
</GUI>
</transform>
<transform>
@ -408,7 +423,7 @@ from filtered_data fd;</sql>
<attributes/>
<GUI>
<xloc>1104</xloc>
<yloc>592</yloc>
<yloc>752</yloc>
</GUI>
</transform>
<transform>
@ -495,6 +510,85 @@ from filtered_data fd;</sql>
</field>
</fields>
<attributes/>
<GUI>
<xloc>912</xloc>
<yloc>752</yloc>
</GUI>
</transform>
<transform>
<name>Update</name>
<type>Update</type>
<description/>
<distribute>Y</distribute>
<custom_distribution/>
<copies>1</copies>
<partitioning>
<method>none</method>
<schema_name/>
</partitioning>
<commit>10000</commit>
<connection>ervu-dashboard</connection>
<error_ignored>N</error_ignored>
<ignore_flag_field/>
<lookup>
<key>
<condition>=</condition>
<field>recruit_id</field>
<name>recruit_id</name>
<name2/>
</key>
<key>
<condition>&lt;></condition>
<field>individual_entrepreneur</field>
<name>actual</name>
<name2/>
</key>
<schema>ervu_dashboard</schema>
<table>citizen</table>
<value>
<name>individual_entrepreneur</name>
<rename>actual</rename>
</value>
</lookup>
<skip_lookup>N</skip_lookup>
<use_batch>Y</use_batch>
<attributes/>
<GUI>
<xloc>1104</xloc>
<yloc>592</yloc>
</GUI>
</transform>
<transform>
<name>filter_non_actual_ip</name>
<type>FilterRows</type>
<description/>
<distribute>Y</distribute>
<custom_distribution/>
<copies>1</copies>
<partitioning>
<method>none</method>
<schema_name/>
</partitioning>
<compare>
<condition>
<conditions>
</conditions>
<function>=</function>
<leftvalue>actual</leftvalue>
<negated>N</negated>
<operator>-</operator>
<value>
<isnull>N</isnull>
<length>-1</length>
<name>constant</name>
<precision>-1</precision>
<text>Y</text>
<type>Boolean</type>
</value>
</condition>
</compare>
<send_true_to>Update</send_true_to>
<attributes/>
<GUI>
<xloc>912</xloc>
<yloc>592</yloc>
@ -525,6 +619,18 @@ from filtered_data fd;</sql>
<max_pct_errors/>
<min_pct_rows/>
</error>
<error>
<source_transform>Update</source_transform>
<target_transform>Change job status on error</target_transform>
<is_enabled>Y</is_enabled>
<nr_valuename/>
<descriptions_valuename/>
<fields_valuename/>
<codes_valuename/>
<max_errors/>
<max_pct_errors/>
<min_pct_rows/>
</error>
</transform_error_handling>
<attributes/>
</pipeline>

View file

@ -85,6 +85,21 @@
<to>Change job status on error</to>
<enabled>Y</enabled>
</hop>
<hop>
<from>filter_non_actual_ip</from>
<to>Update</to>
<enabled>Y</enabled>
</hop>
<hop>
<from>Identify last row in a stream</from>
<to>filter_non_actual_ip</to>
<enabled>Y</enabled>
</hop>
<hop>
<from>Update</from>
<to>Change job status on error</to>
<enabled>Y</enabled>
</hop>
</order>
<transform>
<name>Abort</name>
@ -103,7 +118,7 @@
<attributes/>
<GUI>
<xloc>1856</xloc>
<yloc>624</yloc>
<yloc>736</yloc>
</GUI>
</transform>
<transform>
@ -138,7 +153,7 @@ and recruitment_id = '${IDM_ID}';
<attributes/>
<GUI>
<xloc>1568</xloc>
<yloc>624</yloc>
<yloc>736</yloc>
</GUI>
</transform>
<transform>
@ -186,11 +201,8 @@ and recruitment_id = '${IDM_ID}';
<arguments>
</arguments>
<connection>ervu-dashboard</connection>
<delete_field/>
<execute_each_row>N</execute_each_row>
<insert_field/>
<quoteString>N</quoteString>
<read_field/>
<replace_variables>Y</replace_variables>
<set_params>N</set_params>
<single_statement>N</single_statement>
@ -201,7 +213,6 @@ DO UPDATE
SET status = 'PROCESSING',
error_description = null,
execution_datetime = current_timestamp;</sql>
<update_field/>
<attributes/>
<GUI>
<xloc>512</xloc>
@ -304,7 +315,7 @@ SET status = 'PROCESSING',
<attributes/>
<GUI>
<xloc>976</xloc>
<yloc>624</yloc>
<yloc>736</yloc>
</GUI>
</transform>
<transform>
@ -408,7 +419,7 @@ from filtered_data fd;</sql>
<attributes/>
<GUI>
<xloc>1328</xloc>
<yloc>624</yloc>
<yloc>736</yloc>
</GUI>
</transform>
<transform>
@ -497,7 +508,80 @@ from filtered_data fd;</sql>
<attributes/>
<GUI>
<xloc>1136</xloc>
<yloc>624</yloc>
<yloc>736</yloc>
</GUI>
</transform>
<transform>
<name>Update</name>
<type>Update</type>
<description/>
<distribute>Y</distribute>
<custom_distribution/>
<copies>1</copies>
<partitioning>
<method>none</method>
<schema_name/>
</partitioning>
<commit>10000</commit>
<connection>ervu-dashboard</connection>
<error_ignored>N</error_ignored>
<ignore_flag_field/>
<lookup>
<key>
<condition>=</condition>
<field>recruit_id</field>
<name>recruit_id</name>
<name2/>
</key>
<schema>ervu_dashboard</schema>
<table>citizen</table>
<value>
<name>individual_entrepreneur</name>
<rename>actual</rename>
</value>
</lookup>
<skip_lookup>Y</skip_lookup>
<use_batch>Y</use_batch>
<attributes/>
<GUI>
<xloc>1328</xloc>
<yloc>592</yloc>
</GUI>
</transform>
<transform>
<name>filter_non_actual_ip</name>
<type>FilterRows</type>
<description/>
<distribute>Y</distribute>
<custom_distribution/>
<copies>1</copies>
<partitioning>
<method>none</method>
<schema_name/>
</partitioning>
<compare>
<condition>
<conditions>
</conditions>
<function>=</function>
<leftvalue>actual</leftvalue>
<negated>N</negated>
<operator>-</operator>
<value>
<isnull>N</isnull>
<length>-1</length>
<name>constant</name>
<precision>-1</precision>
<text>Y</text>
<type>Boolean</type>
</value>
</condition>
</compare>
<send_true_to>Update</send_true_to>
<attributes/>
<GUI>
<xloc>1136</xloc>
<yloc>592</yloc>
</GUI>
</transform>
<transform_error_handling>
@ -525,6 +609,18 @@ from filtered_data fd;</sql>
<max_pct_errors/>
<min_pct_rows/>
</error>
<error>
<source_transform>Update</source_transform>
<target_transform>Change job status on error</target_transform>
<is_enabled>Y</is_enabled>
<nr_valuename/>
<descriptions_valuename/>
<fields_valuename/>
<codes_valuename/>
<max_errors/>
<max_pct_errors/>
<min_pct_rows/>
</error>
</transform_error_handling>
<attributes/>
</pipeline>

View file

@ -89,22 +89,23 @@
<execute_each_row>N</execute_each_row>
<limit>0</limit>
<lookup>Get_max_source_update_date</lookup>
<sql>SELECT
r.idm_id as recruitment_id,
? as max_source_update_date
<sql>WITH mud AS (
SELECT
recruitment_id,
MAX(execution_datetime) AS max_upd_date
FROM etl.job_execution
WHERE job_name = '${JOB_NAME}'
AND status IN ('SUCCESS','DELTA_ERROR','DELTA_SUCCESS','DELTA_PROCESSING')
GROUP BY recruitment_id
)
SELECT
r.idm_id,
? max_source_update_date
FROM ervu_dashboard.recruitment r
LEFT JOIN etl.job_execution je
ON r.idm_id = je.recruitment_id
and job_name = 'individual_entrepreneur_job'
JOIN recruits_info ri
JOIN mud ON mud.recruitment_id = r.idm_id
JOIN recruits_info ri
ON COALESCE(ri.current_recruitment_id, ri.target_recruitment_id) = r.idm_id
AND ri.updated_at > (
SELECT MAX(execution_datetime)
FROM etl.job_execution
WHERE job_name = 'individual_entrepreneur_job'
AND recruitment_id = r.idm_id
)
where je.status in ('SUCCESS', 'DELTA_ERROR', 'DELTA_SUCCESS', 'DELTA_PROCESSING');</sql>
AND ri.updated_at > mud.max_upd_date;</sql>
<variables_active>N</variables_active>
<attributes/>
<GUI>

View file

@ -65,6 +65,21 @@
<to>Change job status on success</to>
<enabled>Y</enabled>
</hop>
<hop>
<from>Identify last row in a stream</from>
<to>filter_non_actual_self_employed</to>
<enabled>Y</enabled>
</hop>
<hop>
<from>filter_non_actual_self_employed</from>
<to>Update</to>
<enabled>Y</enabled>
</hop>
<hop>
<from>Update</from>
<to>Change job status on error</to>
<enabled>Y</enabled>
</hop>
</order>
<transform>
<name>Abort</name>
@ -82,8 +97,8 @@
<row_threshold>0</row_threshold>
<attributes/>
<GUI>
<xloc>1040</xloc>
<yloc>384</yloc>
<xloc>1104</xloc>
<yloc>496</yloc>
</GUI>
</transform>
<transform>
@ -103,11 +118,8 @@
</argument>
</arguments>
<connection>ervu-dashboard</connection>
<delete_field/>
<execute_each_row>Y</execute_each_row>
<insert_field/>
<quoteString>N</quoteString>
<read_field/>
<replace_variables>Y</replace_variables>
<set_params>Y</set_params>
<single_statement>N</single_statement>
@ -118,11 +130,10 @@ WHERE job_name = '${JOB_NAME}'
and recruitment_id = '${IDM_ID}';
</sql>
<update_field/>
<attributes/>
<GUI>
<xloc>832</xloc>
<yloc>384</yloc>
<xloc>896</xloc>
<yloc>496</yloc>
</GUI>
</transform>
<transform>
@ -139,11 +150,8 @@ and recruitment_id = '${IDM_ID}';
<arguments>
</arguments>
<connection>ervu-dashboard</connection>
<delete_field/>
<execute_each_row>Y</execute_each_row>
<insert_field/>
<quoteString>N</quoteString>
<read_field/>
<replace_variables>Y</replace_variables>
<set_params>N</set_params>
<single_statement>N</single_statement>
@ -153,10 +161,9 @@ WHERE job_name = '${JOB_NAME}'
and recruitment_id = '${IDM_ID}';
</sql>
<update_field/>
<attributes/>
<GUI>
<xloc>1040</xloc>
<xloc>1104</xloc>
<yloc>48</yloc>
</GUI>
</transform>
@ -174,11 +181,8 @@ and recruitment_id = '${IDM_ID}';
<arguments>
</arguments>
<connection>ervu-dashboard</connection>
<delete_field/>
<execute_each_row>N</execute_each_row>
<insert_field/>
<quoteString>N</quoteString>
<read_field/>
<replace_variables>Y</replace_variables>
<set_params>N</set_params>
<single_statement>N</single_statement>
@ -186,7 +190,6 @@ and recruitment_id = '${IDM_ID}';
VALUES (DEFAULT, '${JOB_NAME}', 'PROCESSING', DEFAULT, null, '${IDM_ID}');
</sql>
<update_field/>
<attributes/>
<GUI>
<xloc>224</xloc>
@ -242,7 +245,7 @@ VALUES (DEFAULT, '${JOB_NAME}', 'PROCESSING', DEFAULT, null, '${IDM_ID}');
<send_true_to>Change job status on success</send_true_to>
<attributes/>
<GUI>
<xloc>1040</xloc>
<xloc>1104</xloc>
<yloc>224</yloc>
</GUI>
</transform>
@ -264,6 +267,77 @@ VALUES (DEFAULT, '${JOB_NAME}', 'PROCESSING', DEFAULT, null, '${IDM_ID}');
<yloc>224</yloc>
</GUI>
</transform>
<transform>
<name>Update</name>
<type>Update</type>
<description/>
<distribute>Y</distribute>
<custom_distribution/>
<copies>1</copies>
<partitioning>
<method>none</method>
<schema_name/>
</partitioning>
<commit>10000</commit>
<connection>ervu-dashboard</connection>
<error_ignored>N</error_ignored>
<lookup>
<key>
<condition>=</condition>
<field>recruit_id</field>
<name>recruit_id</name>
</key>
<schema>ervu_dashboard</schema>
<table>citizen</table>
<value>
<name>self_employed</name>
<rename>actual</rename>
</value>
</lookup>
<skip_lookup>Y</skip_lookup>
<use_batch>Y</use_batch>
<attributes/>
<GUI>
<xloc>624</xloc>
<yloc>496</yloc>
</GUI>
</transform>
<transform>
<name>filter_non_actual_self_employed</name>
<type>FilterRows</type>
<description/>
<distribute>Y</distribute>
<custom_distribution/>
<copies>1</copies>
<partitioning>
<method>none</method>
<schema_name/>
</partitioning>
<compare>
<condition>
<conditions>
</conditions>
<function>=</function>
<leftvalue>actual</leftvalue>
<negated>N</negated>
<operator>-</operator>
<value>
<isnull>N</isnull>
<length>-1</length>
<name>constant</name>
<precision>-1</precision>
<text>Y</text>
<type>Boolean</type>
</value>
</condition>
</compare>
<send_true_to>Update</send_true_to>
<attributes/>
<GUI>
<xloc>624</xloc>
<yloc>368</yloc>
</GUI>
</transform>
<transform>
<name>self_employed_input</name>
<type>TableInput</type>
@ -292,7 +366,8 @@ VALUES (DEFAULT, '${JOB_NAME}', 'PROCESSING', DEFAULT, null, '${IDM_ID}');
false as actual
from recruits_info ri
where ri.info -> 'svedFL' -> 'svedNPD' ->> 'predNPD' != 'null'
and COALESCE(ri.current_recruitment_id, ri.target_recruitment_id) = '${IDM_ID}')
and COALESCE(ri.current_recruitment_id, ri.target_recruitment_id) = '${IDM_ID}'
)
select fd.recruit_id,
npd_elem ->> 'id' as source_id,
to_date(npd_elem ->> 'dataSved', 'YYYY-MM-DD') as source_update_date,
@ -350,25 +425,34 @@ from filtered_data fd;</sql>
<only_when_have_rows>N</only_when_have_rows>
<partitioning_daily>N</partitioning_daily>
<partitioning_enabled>N</partitioning_enabled>
<partitioning_field/>
<partitioning_monthly>Y</partitioning_monthly>
<return_field/>
<return_keys>N</return_keys>
<schema>ervu_dashboard</schema>
<specify_fields>Y</specify_fields>
<table>self_employed</table>
<tablename_field/>
<tablename_in_field>N</tablename_in_field>
<tablename_in_table>Y</tablename_in_table>
<truncate>N</truncate>
<use_batch>Y</use_batch>
<attributes/>
<GUI>
<xloc>832</xloc>
<xloc>896</xloc>
<yloc>224</yloc>
</GUI>
</transform>
<transform_error_handling>
<error>
<source_transform>Update</source_transform>
<target_transform>Change job status on error</target_transform>
<is_enabled>Y</is_enabled>
<nr_valuename/>
<descriptions_valuename>error_description</descriptions_valuename>
<fields_valuename/>
<codes_valuename/>
<max_errors/>
<max_pct_errors/>
<min_pct_rows/>
</error>
<error>
<source_transform>self_employed_output</source_transform>
<target_transform>Change job status on error</target_transform>

View file

@ -65,6 +65,21 @@
<to>Detect empty stream</to>
<enabled>Y</enabled>
</hop>
<hop>
<from>filter_non_actual_self_employed</from>
<to>Update</to>
<enabled>Y</enabled>
</hop>
<hop>
<from>Identify last row in a stream</from>
<to>filter_non_actual_self_employed</to>
<enabled>Y</enabled>
</hop>
<hop>
<from>Update</from>
<to>Change job status on error</to>
<enabled>Y</enabled>
</hop>
</order>
<transform>
<name>Abort</name>
@ -83,7 +98,7 @@
<attributes/>
<GUI>
<xloc>1040</xloc>
<yloc>416</yloc>
<yloc>464</yloc>
</GUI>
</transform>
<transform>
@ -118,7 +133,7 @@ and recruitment_id = '${IDM_ID}';
<attributes/>
<GUI>
<xloc>848</xloc>
<yloc>416</yloc>
<yloc>464</yloc>
</GUI>
</transform>
<transform>
@ -255,6 +270,85 @@ and recruitment_id = '${IDM_ID}';</sql>
<yloc>224</yloc>
</GUI>
</transform>
<transform>
<name>Update</name>
<type>Update</type>
<description/>
<distribute>Y</distribute>
<custom_distribution/>
<copies>1</copies>
<partitioning>
<method>none</method>
<schema_name/>
</partitioning>
<commit>10000</commit>
<connection>ervu-dashboard</connection>
<error_ignored>N</error_ignored>
<ignore_flag_field/>
<lookup>
<key>
<condition>=</condition>
<field>recruit_id</field>
<name>recruit_id</name>
<name2/>
</key>
<key>
<condition>&lt;></condition>
<field>self_employed</field>
<name>actual</name>
<name2/>
</key>
<schema>ervu_dashboard</schema>
<table>citizen</table>
<value>
<name>self_employed</name>
<rename>actual</rename>
</value>
</lookup>
<skip_lookup>N</skip_lookup>
<use_batch>Y</use_batch>
<attributes/>
<GUI>
<xloc>624</xloc>
<yloc>464</yloc>
</GUI>
</transform>
<transform>
<name>filter_non_actual_self_employed</name>
<type>FilterRows</type>
<description/>
<distribute>Y</distribute>
<custom_distribution/>
<copies>1</copies>
<partitioning>
<method>none</method>
<schema_name/>
</partitioning>
<compare>
<condition>
<conditions>
</conditions>
<function>=</function>
<leftvalue>actual</leftvalue>
<negated>N</negated>
<operator>-</operator>
<value>
<isnull>N</isnull>
<length>-1</length>
<name>constant</name>
<precision>-1</precision>
<text>Y</text>
<type>Boolean</type>
</value>
</condition>
</compare>
<send_true_to>Update</send_true_to>
<attributes/>
<GUI>
<xloc>624</xloc>
<yloc>336</yloc>
</GUI>
</transform>
<transform>
<name>self_employed_input</name>
<type>TableInput</type>
@ -356,6 +450,18 @@ from filtered_data fd;</sql>
</GUI>
</transform>
<transform_error_handling>
<error>
<source_transform>Update</source_transform>
<target_transform>Change job status on error</target_transform>
<is_enabled>Y</is_enabled>
<nr_valuename/>
<descriptions_valuename/>
<fields_valuename/>
<codes_valuename/>
<max_errors/>
<max_pct_errors/>
<min_pct_rows/>
</error>
<error>
<source_transform>self_employed_upsert</source_transform>
<target_transform>Change job status on error</target_transform>

View file

@ -65,6 +65,21 @@
<to>Change job status on success</to>
<enabled>Y</enabled>
</hop>
<hop>
<from>filter_non_actual_self_employed</from>
<to>Update</to>
<enabled>Y</enabled>
</hop>
<hop>
<from>Identify last row in a stream</from>
<to>filter_non_actual_self_employed</to>
<enabled>Y</enabled>
</hop>
<hop>
<from>Update</from>
<to>Change job status on error</to>
<enabled>Y</enabled>
</hop>
</order>
<transform>
<name>Abort</name>
@ -83,7 +98,7 @@
<attributes/>
<GUI>
<xloc>1072</xloc>
<yloc>576</yloc>
<yloc>656</yloc>
</GUI>
</transform>
<transform>
@ -118,7 +133,7 @@ and recruitment_id = '${IDM_ID}';
<attributes/>
<GUI>
<xloc>864</xloc>
<yloc>576</yloc>
<yloc>656</yloc>
</GUI>
</transform>
<transform>
@ -255,6 +270,79 @@ SET status = 'PROCESSING',
<yloc>416</yloc>
</GUI>
</transform>
<transform>
<name>Update</name>
<type>Update</type>
<description/>
<distribute>Y</distribute>
<custom_distribution/>
<copies>1</copies>
<partitioning>
<method>none</method>
<schema_name/>
</partitioning>
<commit>10000</commit>
<connection>ervu-dashboard</connection>
<error_ignored>N</error_ignored>
<ignore_flag_field/>
<lookup>
<key>
<condition>=</condition>
<field>recruit_id</field>
<name>recruit_id</name>
<name2/>
</key>
<schema>ervu_dashboard</schema>
<table>citizen</table>
<value>
<name>self_employed</name>
<rename>actual</rename>
</value>
</lookup>
<skip_lookup>Y</skip_lookup>
<use_batch>Y</use_batch>
<attributes/>
<GUI>
<xloc>656</xloc>
<yloc>656</yloc>
</GUI>
</transform>
<transform>
<name>filter_non_actual_self_employed</name>
<type>FilterRows</type>
<description/>
<distribute>Y</distribute>
<custom_distribution/>
<copies>1</copies>
<partitioning>
<method>none</method>
<schema_name/>
</partitioning>
<compare>
<condition>
<conditions>
</conditions>
<function>=</function>
<leftvalue>actual</leftvalue>
<negated>N</negated>
<operator>-</operator>
<value>
<isnull>N</isnull>
<length>-1</length>
<name>constant</name>
<precision>-1</precision>
<text>Y</text>
<type>Boolean</type>
</value>
</condition>
</compare>
<send_true_to>Update</send_true_to>
<attributes/>
<GUI>
<xloc>656</xloc>
<yloc>544</yloc>
</GUI>
</transform>
<transform>
<name>self_employed_input</name>
<type>TableInput</type>
@ -353,6 +441,18 @@ from filtered_data fd;</sql>
</GUI>
</transform>
<transform_error_handling>
<error>
<source_transform>Update</source_transform>
<target_transform>Change job status on error</target_transform>
<is_enabled>Y</is_enabled>
<nr_valuename/>
<descriptions_valuename/>
<fields_valuename/>
<codes_valuename/>
<max_errors/>
<max_pct_errors/>
<min_pct_rows/>
</error>
<error>
<source_transform>self_employed_output</source_transform>
<target_transform>Change job status on error</target_transform>

View file

@ -89,22 +89,23 @@
<execute_each_row>N</execute_each_row>
<limit>0</limit>
<lookup>Get_max_source_update_date</lookup>
<sql>SELECT
r.idm_id AS recruitment_id,
? as max_source_update_date
<sql>WITH mud AS (
SELECT
recruitment_id,
MAX(execution_datetime) AS max_upd_date
FROM etl.job_execution
WHERE job_name = '${JOB_NAME}'
AND status IN ('SUCCESS','DELTA_ERROR','DELTA_SUCCESS','DELTA_PROCESSING')
GROUP BY recruitment_id
)
SELECT
r.idm_id,
? max_source_update_date
FROM ervu_dashboard.recruitment r
LEFT JOIN etl.job_execution je
ON r.idm_id = je.recruitment_id
AND je.job_name = 'self_employed_job'
JOIN mud ON mud.recruitment_id = r.idm_id
JOIN recruits_info ri
ON COALESCE(ri.current_recruitment_id, ri.target_recruitment_id) = r.idm_id
AND ri.updated_at > (
SELECT MAX(execution_datetime)
FROM etl.job_execution
WHERE job_name = 'self_employed_job'
AND recruitment_id = r.idm_id
)
WHERE je.status IN ('SUCCESS', 'DELTA_ERROR', 'DELTA_SUCCESS', 'DELTA_PROCESSING');</sql>
AND ri.updated_at > mud.max_upd_date;</sql>
<variables_active>N</variables_active>
<attributes/>
<GUI>

View file

@ -31,7 +31,7 @@
<attributes_hac/>
</action>
<action>
<name>check_if_self_employed_job_execution_exists.hpl</name>
<name>check_if_job_execution_exists.hpl</name>
<description/>
<type>PIPELINE</type>
<attributes/>
@ -41,7 +41,9 @@
<clear_rows>N</clear_rows>
<create_parent_folder>N</create_parent_folder>
<exec_per_row>N</exec_per_row>
<filename>${PROJECT_HOME}/info_recruits/citizen_tables/work/self_employed/support/check_if_self_employed_job_execution_exists.hpl</filename>
<filename>${PROJECT_HOME}/info_recruits/citizen_tables/support/check_if_job_execution_exists.hpl</filename>
<logext/>
<logfile/>
<loglevel>Basic</loglevel>
<parameters>
<pass_all_parameters>Y</pass_all_parameters>
@ -52,7 +54,7 @@
<set_logfile>N</set_logfile>
<wait_until_finished>Y</wait_until_finished>
<parallel>N</parallel>
<xloc>704</xloc>
<xloc>912</xloc>
<yloc>224</yloc>
<attributes_hac/>
</action>
@ -69,7 +71,7 @@
<valuetype>variable</valuetype>
<variablename>JOB_EXECUTED_FLAG</variablename>
<parallel>N</parallel>
<xloc>1008</xloc>
<xloc>1216</xloc>
<yloc>224</yloc>
<attributes_hac/>
</action>
@ -95,7 +97,7 @@
<set_logfile>N</set_logfile>
<wait_until_finished>Y</wait_until_finished>
<parallel>Y</parallel>
<xloc>1328</xloc>
<xloc>1536</xloc>
<yloc>224</yloc>
<attributes_hac/>
</action>
@ -111,8 +113,6 @@
<create_parent_folder>N</create_parent_folder>
<exec_per_row>N</exec_per_row>
<filename>${PROJECT_HOME}/info_recruits/citizen_tables/work/self_employed/recruitments_five_flow_repeat.hpl</filename>
<logext/>
<logfile/>
<loglevel>Basic</loglevel>
<parameters>
<pass_all_parameters>Y</pass_all_parameters>
@ -123,7 +123,7 @@
<set_logfile>N</set_logfile>
<wait_until_finished>Y</wait_until_finished>
<parallel>N</parallel>
<xloc>1248</xloc>
<xloc>1456</xloc>
<yloc>528</yloc>
<attributes_hac/>
</action>
@ -138,7 +138,9 @@
<clear_rows>N</clear_rows>
<create_parent_folder>N</create_parent_folder>
<exec_per_row>N</exec_per_row>
<filename>${PROJECT_HOME}/info_recruits/citizen_tables/work/self_employed/support/check_if_need_to_repeat.hpl</filename>
<filename>${PROJECT_HOME}/info_recruits/citizen_tables/support/check_if_need_to_repeat.hpl</filename>
<logext/>
<logfile/>
<loglevel>Basic</loglevel>
<parameters>
<pass_all_parameters>Y</pass_all_parameters>
@ -149,7 +151,7 @@
<set_logfile>N</set_logfile>
<wait_until_finished>Y</wait_until_finished>
<parallel>N</parallel>
<xloc>1008</xloc>
<xloc>1216</xloc>
<yloc>384</yloc>
<attributes_hac/>
</action>
@ -166,7 +168,7 @@
<valuetype>variable</valuetype>
<variablename>NEED_TO_REPEAT_JOB</variablename>
<parallel>N</parallel>
<xloc>1008</xloc>
<xloc>1216</xloc>
<yloc>528</yloc>
<attributes_hac/>
</action>
@ -182,8 +184,6 @@
<create_parent_folder>N</create_parent_folder>
<exec_per_row>N</exec_per_row>
<filename>${PROJECT_HOME}/info_recruits/citizen_tables/work/self_employed/recruitment_five_flow_delta.hpl</filename>
<logext/>
<logfile/>
<loglevel>Basic</loglevel>
<parameters>
<pass_all_parameters>Y</pass_all_parameters>
@ -194,14 +194,34 @@
<set_logfile>N</set_logfile>
<wait_until_finished>Y</wait_until_finished>
<parallel>N</parallel>
<xloc>1008</xloc>
<xloc>1216</xloc>
<yloc>720</yloc>
<attributes_hac/>
</action>
<action>
<name>init_job_name</name>
<description/>
<type>SET_VARIABLES</type>
<attributes/>
<fields>
<field>
<variable_name>JOB_NAME</variable_name>
<variable_type>CURRENT_WORKFLOW</variable_type>
<variable_value>self_employed_job</variable_value>
</field>
</fields>
<file_variable_type>CURRENT_WORKFLOW</file_variable_type>
<filename/>
<replacevars>N</replacevars>
<parallel>N</parallel>
<xloc>640</xloc>
<yloc>224</yloc>
<attributes_hac/>
</action>
</actions>
<hops>
<hop>
<from>check_if_self_employed_job_execution_exists.hpl</from>
<from>check_if_job_execution_exists.hpl</from>
<to>employer_job_execution_exists_check</to>
<enabled>Y</enabled>
<evaluation>Y</evaluation>
@ -244,11 +264,18 @@
</hop>
<hop>
<from>Start</from>
<to>check_if_self_employed_job_execution_exists.hpl</to>
<to>init_job_name</to>
<enabled>Y</enabled>
<evaluation>Y</evaluation>
<unconditional>Y</unconditional>
</hop>
<hop>
<from>init_job_name</from>
<to>check_if_job_execution_exists.hpl</to>
<enabled>Y</enabled>
<evaluation>Y</evaluation>
<unconditional>N</unconditional>
</hop>
</hops>
<notepads>
</notepads>

View file

@ -26,7 +26,7 @@
<enabled>Y</enabled>
</hop>
<hop>
<from>citizen_work_activity_output</from>
<from>work_activity_output</from>
<to>Change job status on error</to>
<enabled>Y</enabled>
</hop>
@ -40,16 +40,6 @@
<to>Identify last row in a stream</to>
<enabled>Y</enabled>
</hop>
<hop>
<from>Identify last row in a stream</from>
<to>citizen_work_activity_output</to>
<enabled>Y</enabled>
</hop>
<hop>
<from>citizen_work_activity_output</from>
<to>Filter rows</to>
<enabled>Y</enabled>
</hop>
<hop>
<from>Filter rows</from>
<to>Change job status on success</to>
@ -60,9 +50,24 @@
<to>Detect empty stream</to>
<enabled>Y</enabled>
</hop>
<hop>
<from>employee_parental_leave_output</from>
<to>Change job status on error</to>
<enabled>Y</enabled>
</hop>
<hop>
<from>Detect empty stream</from>
<to>Filter rows</to>
<to>Change job status on success</to>
<enabled>Y</enabled>
</hop>
<hop>
<from>Identify last row in a stream</from>
<to>Group by</to>
<enabled>Y</enabled>
</hop>
<hop>
<from>Group by</from>
<to>work_activity_output</to>
<enabled>Y</enabled>
</hop>
<hop>
@ -72,12 +77,17 @@
</hop>
<hop>
<from>Filter_null_parental_leave_dates</from>
<to>Block until transforms finish</to>
<enabled>Y</enabled>
</hop>
<hop>
<from>Block until transforms finish</from>
<to>employee_parental_leave_output</to>
<enabled>Y</enabled>
</hop>
<hop>
<from>employee_parental_leave_output</from>
<to>Change job status on error</to>
<from>work_activity_output</from>
<to>Filter rows</to>
<enabled>Y</enabled>
</hop>
</order>
@ -97,8 +107,30 @@
<row_threshold>0</row_threshold>
<attributes/>
<GUI>
<xloc>1424</xloc>
<yloc>528</yloc>
<xloc>1728</xloc>
<yloc>624</yloc>
</GUI>
</transform>
<transform>
<name>Block until transforms finish</name>
<type>BlockUntilTransformsFinish</type>
<description/>
<distribute>Y</distribute>
<custom_distribution/>
<copies>1</copies>
<partitioning>
<method>none</method>
<schema_name/>
</partitioning>
<transforms>
<transform>
<name>citizen_work_activity_output</name>
</transform>
</transforms>
<attributes/>
<GUI>
<xloc>928</xloc>
<yloc>624</yloc>
</GUI>
</transform>
<transform>
@ -132,8 +164,8 @@ and recruitment_id = '${IDM_ID}';
</sql>
<attributes/>
<GUI>
<xloc>1200</xloc>
<yloc>528</yloc>
<xloc>1504</xloc>
<yloc>624</yloc>
</GUI>
</transform>
<transform>
@ -163,7 +195,7 @@ and recruitment_id = '${IDM_ID}';
</sql>
<attributes/>
<GUI>
<xloc>1424</xloc>
<xloc>1504</xloc>
<yloc>144</yloc>
</GUI>
</transform>
@ -245,8 +277,8 @@ VALUES (DEFAULT, '${JOB_NAME}', 'PROCESSING', DEFAULT, null, '${IDM_ID}');
<send_true_to>Change job status on success</send_true_to>
<attributes/>
<GUI>
<xloc>1200</xloc>
<yloc>144</yloc>
<xloc>1504</xloc>
<yloc>320</yloc>
</GUI>
</transform>
<transform>
@ -270,11 +302,92 @@ VALUES (DEFAULT, '${JOB_NAME}', 'PROCESSING', DEFAULT, null, '${IDM_ID}');
<operator>-</operator>
</condition>
</compare>
<send_true_to>employee_parental_leave_output</send_true_to>
<send_false_to>Change job status on success</send_false_to>
<send_true_to>Block until transforms finish</send_true_to>
<attributes/>
<GUI>
<xloc>704</xloc>
<yloc>528</yloc>
<yloc>624</yloc>
</GUI>
</transform>
<transform>
<name>Group by</name>
<type>GroupBy</type>
<description/>
<distribute>Y</distribute>
<custom_distribution/>
<copies>1</copies>
<partitioning>
<method>none</method>
<schema_name/>
</partitioning>
<add_linenr>N</add_linenr>
<all_rows>N</all_rows>
<directory>${java.io.tmpdir}</directory>
<fields>
<field>
<aggregate>recruit_id</aggregate>
<subject>recruit_id</subject>
<type>FIRST_INCL_NULL</type>
</field>
<field>
<aggregate>employer_name</aggregate>
<subject>employer_name</subject>
<type>FIRST_INCL_NULL</type>
</field>
<field>
<aggregate>position</aggregate>
<subject>position</subject>
<type>FIRST_INCL_NULL</type>
</field>
<field>
<aggregate>personnel_event_date</aggregate>
<subject>personnel_event_date</subject>
<type>FIRST_INCL_NULL</type>
</field>
<field>
<aggregate>personnel_event_type</aggregate>
<subject>personnel_event_type</subject>
<type>FIRST_INCL_NULL</type>
</field>
<field>
<aggregate>source_update_date</aggregate>
<subject>source_update_date</subject>
<type>FIRST_INCL_NULL</type>
</field>
<field>
<aggregate>actual_employer</aggregate>
<subject>actual_employer</subject>
<type>FIRST_INCL_NULL</type>
</field>
<field>
<aggregate>parental_leave_start_date</aggregate>
<subject>parental_leave_start_date</subject>
<type>FIRST_INCL_NULL</type>
</field>
<field>
<aggregate>parental_leave_end_date</aggregate>
<subject>parental_leave_end_date</subject>
<type>FIRST_INCL_NULL</type>
</field>
<field>
<aggregate>last_row</aggregate>
<subject>last_row</subject>
<type>LAST_INCL_NULL</type>
</field>
</fields>
<give_back_row>N</give_back_row>
<group>
<field>
<name>source_id</name>
</field>
</group>
<ignore_aggregate>N</ignore_aggregate>
<prefix>grp</prefix>
<attributes/>
<GUI>
<xloc>944</xloc>
<yloc>320</yloc>
</GUI>
</transform>
<transform>
@ -308,22 +421,13 @@ VALUES (DEFAULT, '${JOB_NAME}', 'PROCESSING', DEFAULT, null, '${IDM_ID}');
</partitioning>
<connection>ervu-dashboard</connection>
<execute_each_row>N</execute_each_row>
<limit/>
<sql>WITH filteredData AS (SELECT ri.recruit_id,
trud_elem,
uhod_elem
ri.info -> 'svedFL' -> 'svedTrud' -> 'trudDeyat' as trud_arr
FROM recruits_info ri
CROSS JOIN LATERAL jsonb_array_elements(
ri.info -> 'svedFL' -> 'svedTrud' -> 'trudDeyat'
) AS trud_elem
LEFT JOIN LATERAL jsonb_array_elements(
CASE
WHEN jsonb_typeof(trud_elem -> 'svedUhodReb') = 'array'
THEN trud_elem -> 'svedUhodReb'
ELSE '[]'::jsonb
END
) AS uhod_elem ON true
WHERE jsonb_typeof(ri.info -> 'svedFL' -> 'svedTrud' -> 'trudDeyat') = 'array'
and COALESCE(ri.current_recruitment_id, ri.target_recruitment_id) = '${IDM_ID}')
and COALESCE(ri.current_recruitment_id, ri.target_recruitment_id) = '${IDM_ID}'
)
SELECT fd.recruit_id,
trud_elem ->> 'id' AS source_id,
trud_elem -> 'svedOrg' ->> 'naimOrg' AS employer_name,
@ -334,17 +438,66 @@ SELECT fd.recruit_id,
(trud_elem ->> 'prAktMestRab') = '1' AS actual_employer,
to_date(uhod_elem ->> 'dataNachUhodReb', 'YYYY-MM-DD') AS parental_leave_start_date,
to_date(uhod_elem ->> 'dataKonUhodReb', 'YYYY-MM-DD') AS parental_leave_end_date
FROM filteredData fd;
FROM filteredData fd
CROSS JOIN LATERAL jsonb_array_elements(trud_arr) AS trud_elem
LEFT JOIN LATERAL jsonb_array_elements(COALESCE(trud_elem -> 'svedUhodReb', '[]'::jsonb)) AS uhod_elem
ON true;
</sql>
<variables_active>Y</variables_active>
<attributes/>
<GUI>
<xloc>512</xloc>
<xloc>496</xloc>
<yloc>320</yloc>
</GUI>
</transform>
<transform>
<name>citizen_work_activity_output</name>
<name>employee_parental_leave_output</name>
<type>TableOutput</type>
<description/>
<distribute>Y</distribute>
<custom_distribution/>
<copies>1</copies>
<partitioning>
<method>none</method>
<schema_name/>
</partitioning>
<commit>10000</commit>
<connection>ervu-dashboard</connection>
<fields>
<field>
<column_name>citizen_work_activity_source_id</column_name>
<stream_name>source_id</stream_name>
</field>
<field>
<column_name>start_date</column_name>
<stream_name>parental_leave_start_date</stream_name>
</field>
<field>
<column_name>end_date</column_name>
<stream_name>parental_leave_end_date</stream_name>
</field>
</fields>
<ignore_errors>N</ignore_errors>
<only_when_have_rows>N</only_when_have_rows>
<partitioning_daily>N</partitioning_daily>
<partitioning_enabled>N</partitioning_enabled>
<partitioning_monthly>Y</partitioning_monthly>
<return_keys>N</return_keys>
<schema>ervu_dashboard</schema>
<specify_fields>Y</specify_fields>
<table>employee_parental_leave</table>
<tablename_in_field>N</tablename_in_field>
<tablename_in_table>Y</tablename_in_table>
<truncate>N</truncate>
<use_batch>Y</use_batch>
<attributes/>
<GUI>
<xloc>1200</xloc>
<yloc>624</yloc>
</GUI>
</transform>
<transform>
<name>work_activity_output</name>
<type>TableOutput</type>
<description/>
<distribute>Y</distribute>
@ -389,6 +542,10 @@ FROM filteredData fd;
<column_name>actual_employer</column_name>
<stream_name>actual_employer</stream_name>
</field>
<field>
<column_name>source_update_date</column_name>
<stream_name>source_update_date</stream_name>
</field>
</fields>
<ignore_errors>N</ignore_errors>
<only_when_have_rows>N</only_when_have_rows>
@ -398,7 +555,7 @@ FROM filteredData fd;
<return_keys>N</return_keys>
<schema>ervu_dashboard</schema>
<specify_fields>Y</specify_fields>
<table>citizen_work_activity</table>
<table>work_activity</table>
<tablename_in_field>N</tablename_in_field>
<tablename_in_table>Y</tablename_in_table>
<truncate>N</truncate>
@ -409,55 +566,9 @@ FROM filteredData fd;
<yloc>320</yloc>
</GUI>
</transform>
<transform>
<name>employee_parental_leave_output</name>
<type>TableOutput</type>
<description/>
<distribute>Y</distribute>
<custom_distribution/>
<copies>1</copies>
<partitioning>
<method>none</method>
<schema_name/>
</partitioning>
<commit>10000</commit>
<connection>ervu-dashboard</connection>
<fields>
<field>
<column_name>citizen_work_activity_source_id</column_name>
<stream_name>source_id</stream_name>
</field>
<field>
<column_name>parental_leave_start_date</column_name>
<stream_name>parental_leave_start_date</stream_name>
</field>
<field>
<column_name>parental_leave_end_date</column_name>
<stream_name>parental_leave_end_date</stream_name>
</field>
</fields>
<ignore_errors>N</ignore_errors>
<only_when_have_rows>N</only_when_have_rows>
<partitioning_daily>N</partitioning_daily>
<partitioning_enabled>N</partitioning_enabled>
<partitioning_monthly>Y</partitioning_monthly>
<return_keys>N</return_keys>
<schema>ervu_dashboard</schema>
<specify_fields>Y</specify_fields>
<table>employee_parental_leave</table>
<tablename_in_field>N</tablename_in_field>
<tablename_in_table>Y</tablename_in_table>
<truncate>N</truncate>
<use_batch>Y</use_batch>
<attributes/>
<GUI>
<xloc>960</xloc>
<yloc>528</yloc>
</GUI>
</transform>
<transform_error_handling>
<error>
<source_transform>citizen_work_activity_output</source_transform>
<source_transform>employee_parental_leave_output</source_transform>
<target_transform>Change job status on error</target_transform>
<is_enabled>Y</is_enabled>
<nr_valuename/>
@ -469,7 +580,7 @@ FROM filteredData fd;
<min_pct_rows/>
</error>
<error>
<source_transform>employee_parental_leave_output</source_transform>
<source_transform>work_activity_output</source_transform>
<target_transform>Change job status on error</target_transform>
<is_enabled>Y</is_enabled>
<nr_valuename/>

View file

@ -20,51 +20,11 @@
<notepads>
</notepads>
<order>
<hop>
<from>Create job execution record</from>
<to>citizen_work_activity_input</to>
<enabled>Y</enabled>
</hop>
<hop>
<from>Identify last row in a stream</from>
<to>Detect empty stream</to>
<enabled>Y</enabled>
</hop>
<hop>
<from>Identify last row in a stream</from>
<to>Filter_null_parental_leave_dates</to>
<enabled>Y</enabled>
</hop>
<hop>
<from>citizen_work_activity_input</from>
<to>Identify last row in a stream</to>
<enabled>Y</enabled>
</hop>
<hop>
<from>Detect empty stream</from>
<to>Filter rows</to>
<enabled>Y</enabled>
</hop>
<hop>
<from>Filter rows</from>
<to>Change job status on success</to>
<enabled>Y</enabled>
</hop>
<hop>
<from>Identify last row in a stream</from>
<to>citizen_work_activity_upsert</to>
<enabled>Y</enabled>
</hop>
<hop>
<from>Filter_null_parental_leave_dates</from>
<to>parental_leave_upsert</to>
<enabled>Y</enabled>
</hop>
<hop>
<from>citizen_work_activity_upsert</from>
<to>Change job status on success</to>
<enabled>Y</enabled>
</hop>
<hop>
<from>citizen_work_activity_upsert</from>
<to>Change job status on error</to>
@ -75,7 +35,79 @@
<to>Change job status on error</to>
<enabled>Y</enabled>
</hop>
<hop>
<from>Create job execution record</from>
<to>citizen_work_activity_input</to>
<enabled>Y</enabled>
</hop>
<hop>
<from>Filter_null_parental_leave_dates</from>
<to>Block until transforms finish</to>
<enabled>Y</enabled>
</hop>
<hop>
<from>Identify last row in a stream</from>
<to>Detect empty stream</to>
<enabled>Y</enabled>
</hop>
<hop>
<from>Identify last row in a stream</from>
<to>Filter_null_parental_leave_dates</to>
<enabled>Y</enabled>
</hop>
<hop>
<from>Identify last row in a stream</from>
<to>Group by</to>
<enabled>Y</enabled>
</hop>
<hop>
<from>citizen_work_activity_input</from>
<to>Identify last row in a stream</to>
<enabled>Y</enabled>
</hop>
<hop>
<from>Group by</from>
<to>citizen_work_activity_upsert</to>
<enabled>Y</enabled>
</hop>
<hop>
<from>Detect empty stream</from>
<to>Change job status on success</to>
<enabled>Y</enabled>
</hop>
<hop>
<from>citizen_work_activity_upsert</from>
<to>Filter rows</to>
<enabled>Y</enabled>
</hop>
<hop>
<from>Block until transforms finish</from>
<to>parental_leave_upsert</to>
<enabled>Y</enabled>
</hop>
</order>
<transform>
<name>Block until transforms finish</name>
<type>BlockUntilTransformsFinish</type>
<description/>
<distribute>Y</distribute>
<custom_distribution/>
<copies>1</copies>
<partitioning>
<method>none</method>
<schema_name/>
</partitioning>
<transforms>
<transform>
<name>citizen_work_activity_upsert</name>
</transform>
</transforms>
<attributes/>
<GUI>
<xloc>880</xloc>
<yloc>432</yloc>
</GUI>
</transform>
<transform>
<name>Change job status on error</name>
<type>ExecSql</type>
@ -111,7 +143,7 @@ and recruitment_id = '${IDM_ID}';
</sql>
<attributes/>
<GUI>
<xloc>1296</xloc>
<xloc>1552</xloc>
<yloc>432</yloc>
</GUI>
</transform>
@ -142,7 +174,7 @@ and recruitment_id = '${IDM_ID}';
</sql>
<attributes/>
<GUI>
<xloc>1296</xloc>
<xloc>1552</xloc>
<yloc>48</yloc>
</GUI>
</transform>
@ -165,16 +197,13 @@ and recruitment_id = '${IDM_ID}';
<replace_variables>Y</replace_variables>
<set_params>N</set_params>
<single_statement>N</single_statement>
<sql>UPDATE etl.job_execution
SET
status = 'DELTA_PROCESSING',
execution_datetime = current_timestamp,
error_description = NULL
where job_name = '${JOB_NAME}'
and recruitment_id = '${IDM_ID}';</sql>
<sql>INSERT INTO etl.job_execution (id, job_name, status, execution_datetime, error_description, recruitment_id)
VALUES (DEFAULT, '${JOB_NAME}', 'PROCESSING', DEFAULT, null, '${IDM_ID}');
</sql>
<attributes/>
<GUI>
<xloc>336</xloc>
<xloc>224</xloc>
<yloc>224</yloc>
</GUI>
</transform>
@ -191,7 +220,7 @@ and recruitment_id = '${IDM_ID}';</sql>
</partitioning>
<attributes/>
<GUI>
<xloc>752</xloc>
<xloc>640</xloc>
<yloc>48</yloc>
</GUI>
</transform>
@ -227,8 +256,8 @@ and recruitment_id = '${IDM_ID}';</sql>
<send_true_to>Change job status on success</send_true_to>
<attributes/>
<GUI>
<xloc>1024</xloc>
<yloc>48</yloc>
<xloc>1552</xloc>
<yloc>224</yloc>
</GUI>
</transform>
<transform>
@ -252,13 +281,94 @@ and recruitment_id = '${IDM_ID}';</sql>
<operator>-</operator>
</condition>
</compare>
<send_true_to>parental_leave_upsert</send_true_to>
<send_false_to>Change job status on success</send_false_to>
<send_true_to>Block until transforms finish</send_true_to>
<attributes/>
<GUI>
<xloc>752</xloc>
<xloc>640</xloc>
<yloc>432</yloc>
</GUI>
</transform>
<transform>
<name>Group by</name>
<type>GroupBy</type>
<description/>
<distribute>Y</distribute>
<custom_distribution/>
<copies>1</copies>
<partitioning>
<method>none</method>
<schema_name/>
</partitioning>
<add_linenr>N</add_linenr>
<all_rows>N</all_rows>
<directory>${java.io.tmpdir}</directory>
<fields>
<field>
<aggregate>recruit_id</aggregate>
<subject>recruit_id</subject>
<type>FIRST_INCL_NULL</type>
</field>
<field>
<aggregate>employer_name</aggregate>
<subject>employer_name</subject>
<type>FIRST_INCL_NULL</type>
</field>
<field>
<aggregate>position</aggregate>
<subject>position</subject>
<type>FIRST_INCL_NULL</type>
</field>
<field>
<aggregate>personnel_event_date</aggregate>
<subject>personnel_event_date</subject>
<type>FIRST_INCL_NULL</type>
</field>
<field>
<aggregate>personnel_event_type</aggregate>
<subject>personnel_event_type</subject>
<type>FIRST_INCL_NULL</type>
</field>
<field>
<aggregate>source_update_date</aggregate>
<subject>source_update_date</subject>
<type>FIRST_INCL_NULL</type>
</field>
<field>
<aggregate>actual_employer</aggregate>
<subject>actual_employer</subject>
<type>FIRST_INCL_NULL</type>
</field>
<field>
<aggregate>parental_leave_start_date</aggregate>
<subject>parental_leave_start_date</subject>
<type>FIRST_INCL_NULL</type>
</field>
<field>
<aggregate>parental_leave_end_date</aggregate>
<subject>parental_leave_end_date</subject>
<type>FIRST_INCL_NULL</type>
</field>
<field>
<aggregate>last_row</aggregate>
<subject>last_row</subject>
<type>LAST_INCL_NULL</type>
</field>
</fields>
<give_back_row>N</give_back_row>
<group>
<field>
<name>source_id</name>
</field>
</group>
<ignore_aggregate>N</ignore_aggregate>
<prefix>grp</prefix>
<attributes/>
<GUI>
<xloc>880</xloc>
<yloc>224</yloc>
</GUI>
</transform>
<transform>
<name>Identify last row in a stream</name>
<type>DetectLastRow</type>
@ -273,7 +383,7 @@ and recruitment_id = '${IDM_ID}';</sql>
<resultfieldname>last_row</resultfieldname>
<attributes/>
<GUI>
<xloc>752</xloc>
<xloc>640</xloc>
<yloc>224</yloc>
</GUI>
</transform>
@ -290,23 +400,12 @@ and recruitment_id = '${IDM_ID}';</sql>
</partitioning>
<connection>ervu-dashboard</connection>
<execute_each_row>N</execute_each_row>
<limit/>
<sql>WITH filteredData AS (SELECT ri.recruit_id,
trud_elem,
uhod_elem
ri.info -> 'svedFL' -> 'svedTrud' -> 'trudDeyat' as trud_arr
FROM recruits_info ri
CROSS JOIN LATERAL jsonb_array_elements(
ri.info -> 'svedFL' -> 'svedTrud' -> 'trudDeyat'
) AS trud_elem
LEFT JOIN LATERAL jsonb_array_elements(
CASE
WHEN jsonb_typeof(trud_elem -> 'svedUhodReb') = 'array'
THEN trud_elem -> 'svedUhodReb'
ELSE '[]'::jsonb
END
) AS uhod_elem ON true
WHERE jsonb_typeof(ri.info -> 'svedFL' -> 'svedTrud' -> 'trudDeyat') = 'array'
--AND to_date(trud_elem ->> 'dataSved', 'YYYY-MM-DD') > '${MAX_SOURCE_UPDATE_DATE}'
--AND COALESCE(ri.current_recruitment_id, ri.target_recruitment_id) = '${IDM_ID}'
and COALESCE(ri.current_recruitment_id, ri.target_recruitment_id) = '${IDM_ID}'
)
SELECT fd.recruit_id,
trud_elem ->> 'id' AS source_id,
@ -318,12 +417,16 @@ SELECT fd.recruit_id,
(trud_elem ->> 'prAktMestRab') = '1' AS actual_employer,
to_date(uhod_elem ->> 'dataNachUhodReb', 'YYYY-MM-DD') AS parental_leave_start_date,
to_date(uhod_elem ->> 'dataKonUhodReb', 'YYYY-MM-DD') AS parental_leave_end_date
FROM filteredData fd;
FROM filteredData fd
CROSS JOIN LATERAL jsonb_array_elements(trud_arr) AS trud_elem
LEFT JOIN LATERAL jsonb_array_elements(COALESCE(trud_elem -> 'svedUhodReb', '[]'::jsonb)) AS uhod_elem
ON true
where to_date(trud_elem ->> 'dataSved', 'YYYY-MM-DD') > '${MAX_SOURCE_UPDATE_DATE}';
</sql>
<variables_active>Y</variables_active>
<attributes/>
<GUI>
<xloc>544</xloc>
<xloc>432</xloc>
<yloc>224</yloc>
</GUI>
</transform>
@ -347,7 +450,7 @@ FROM filteredData fd;
<name>source_id</name>
</key>
<schema>ervu_dashboard</schema>
<table>citizen_work_activity</table>
<table>work_activity</table>
<value>
<name>recruit_id</name>
<rename>recruit_id</rename>
@ -392,7 +495,7 @@ FROM filteredData fd;
<update_bypassed>N</update_bypassed>
<attributes/>
<GUI>
<xloc>1040</xloc>
<xloc>1136</xloc>
<yloc>224</yloc>
</GUI>
</transform>
@ -436,7 +539,7 @@ FROM filteredData fd;
<update_bypassed>N</update_bypassed>
<attributes/>
<GUI>
<xloc>1040</xloc>
<xloc>1136</xloc>
<yloc>432</yloc>
</GUI>
</transform>

View file

@ -20,46 +20,11 @@
<notepads>
</notepads>
<order>
<hop>
<from>Detect empty stream</from>
<to>Filter rows</to>
<enabled>Y</enabled>
</hop>
<hop>
<from>Filter rows</from>
<to>Change job status on success</to>
<enabled>Y</enabled>
</hop>
<hop>
<from>Identify last row in a stream</from>
<to>Detect empty stream</to>
<enabled>Y</enabled>
</hop>
<hop>
<from>Identify last row in a stream</from>
<to>Filter_null_parental_leave_dates</to>
<enabled>Y</enabled>
</hop>
<hop>
<from>citizen_work_activity_input</from>
<to>Identify last row in a stream</to>
<enabled>Y</enabled>
</hop>
<hop>
<from>Create job execution record</from>
<to>citizen_work_activity_input</to>
<enabled>Y</enabled>
</hop>
<hop>
<from>Identify last row in a stream</from>
<to>citizen_work_activity_upsert</to>
<enabled>Y</enabled>
</hop>
<hop>
<from>Filter_null_parental_leave_dates</from>
<to>parental_leave_upsert</to>
<enabled>Y</enabled>
</hop>
<hop>
<from>parental_leave_upsert</from>
<to>Change job status on error</to>
@ -72,10 +37,77 @@
</hop>
<hop>
<from>citizen_work_activity_upsert</from>
<to>Filter rows</to>
<enabled>Y</enabled>
</hop>
<hop>
<from>Create job execution record</from>
<to>citizen_work_activity_input</to>
<enabled>Y</enabled>
</hop>
<hop>
<from>Filter_null_parental_leave_dates</from>
<to>Block until transforms finish</to>
<enabled>Y</enabled>
</hop>
<hop>
<from>Identify last row in a stream</from>
<to>Detect empty stream</to>
<enabled>Y</enabled>
</hop>
<hop>
<from>Identify last row in a stream</from>
<to>Filter_null_parental_leave_dates</to>
<enabled>Y</enabled>
</hop>
<hop>
<from>Identify last row in a stream</from>
<to>Group by</to>
<enabled>Y</enabled>
</hop>
<hop>
<from>citizen_work_activity_input</from>
<to>Identify last row in a stream</to>
<enabled>Y</enabled>
</hop>
<hop>
<from>Group by</from>
<to>citizen_work_activity_upsert</to>
<enabled>Y</enabled>
</hop>
<hop>
<from>Block until transforms finish</from>
<to>parental_leave_upsert</to>
<enabled>Y</enabled>
</hop>
<hop>
<from>Detect empty stream</from>
<to>Change job status on success</to>
<enabled>Y</enabled>
</hop>
</order>
<transform>
<name>Block until transforms finish</name>
<type>BlockUntilTransformsFinish</type>
<description/>
<distribute>Y</distribute>
<custom_distribution/>
<copies>1</copies>
<partitioning>
<method>none</method>
<schema_name/>
</partitioning>
<transforms>
<transform>
<name>citizen_work_activity_upsert</name>
</transform>
</transforms>
<attributes/>
<GUI>
<xloc>1088</xloc>
<yloc>704</yloc>
</GUI>
</transform>
<transform>
<name>Change job status on error</name>
<type>ExecSql</type>
@ -111,7 +143,7 @@ and recruitment_id = '${IDM_ID}';
</sql>
<attributes/>
<GUI>
<xloc>1312</xloc>
<xloc>1632</xloc>
<yloc>704</yloc>
</GUI>
</transform>
@ -142,7 +174,7 @@ and recruitment_id = '${IDM_ID}';
</sql>
<attributes/>
<GUI>
<xloc>1312</xloc>
<xloc>1632</xloc>
<yloc>320</yloc>
</GUI>
</transform>
@ -166,15 +198,12 @@ and recruitment_id = '${IDM_ID}';
<set_params>N</set_params>
<single_statement>N</single_statement>
<sql>INSERT INTO etl.job_execution (id, job_name, status, execution_datetime, error_description, recruitment_id)
VALUES (DEFAULT, '${JOB_NAME}', 'PROCESSING', DEFAULT, null, '${IDM_ID}')
ON CONFLICT (job_name, recruitment_id)
DO UPDATE
SET status = 'PROCESSING',
error_description = null,
execution_datetime = current_timestamp;</sql>
VALUES (DEFAULT, '${JOB_NAME}', 'PROCESSING', DEFAULT, null, '${IDM_ID}');
</sql>
<attributes/>
<GUI>
<xloc>304</xloc>
<xloc>432</xloc>
<yloc>496</yloc>
</GUI>
</transform>
@ -191,7 +220,7 @@ SET status = 'PROCESSING',
</partitioning>
<attributes/>
<GUI>
<xloc>768</xloc>
<xloc>848</xloc>
<yloc>320</yloc>
</GUI>
</transform>
@ -227,8 +256,8 @@ SET status = 'PROCESSING',
<send_true_to>Change job status on success</send_true_to>
<attributes/>
<GUI>
<xloc>1040</xloc>
<yloc>320</yloc>
<xloc>1632</xloc>
<yloc>496</yloc>
</GUI>
</transform>
<transform>
@ -252,13 +281,94 @@ SET status = 'PROCESSING',
<operator>-</operator>
</condition>
</compare>
<send_true_to>parental_leave_upsert</send_true_to>
<send_false_to>Change job status on success</send_false_to>
<send_true_to>Block until transforms finish</send_true_to>
<attributes/>
<GUI>
<xloc>768</xloc>
<xloc>848</xloc>
<yloc>704</yloc>
</GUI>
</transform>
<transform>
<name>Group by</name>
<type>GroupBy</type>
<description/>
<distribute>Y</distribute>
<custom_distribution/>
<copies>1</copies>
<partitioning>
<method>none</method>
<schema_name/>
</partitioning>
<add_linenr>N</add_linenr>
<all_rows>N</all_rows>
<directory>${java.io.tmpdir}</directory>
<fields>
<field>
<aggregate>recruit_id</aggregate>
<subject>recruit_id</subject>
<type>FIRST_INCL_NULL</type>
</field>
<field>
<aggregate>employer_name</aggregate>
<subject>employer_name</subject>
<type>FIRST_INCL_NULL</type>
</field>
<field>
<aggregate>position</aggregate>
<subject>position</subject>
<type>FIRST_INCL_NULL</type>
</field>
<field>
<aggregate>personnel_event_date</aggregate>
<subject>personnel_event_date</subject>
<type>FIRST_INCL_NULL</type>
</field>
<field>
<aggregate>personnel_event_type</aggregate>
<subject>personnel_event_type</subject>
<type>FIRST_INCL_NULL</type>
</field>
<field>
<aggregate>source_update_date</aggregate>
<subject>source_update_date</subject>
<type>FIRST_INCL_NULL</type>
</field>
<field>
<aggregate>actual_employer</aggregate>
<subject>actual_employer</subject>
<type>FIRST_INCL_NULL</type>
</field>
<field>
<aggregate>parental_leave_start_date</aggregate>
<subject>parental_leave_start_date</subject>
<type>FIRST_INCL_NULL</type>
</field>
<field>
<aggregate>parental_leave_end_date</aggregate>
<subject>parental_leave_end_date</subject>
<type>FIRST_INCL_NULL</type>
</field>
<field>
<aggregate>last_row</aggregate>
<subject>last_row</subject>
<type>LAST_INCL_NULL</type>
</field>
</fields>
<give_back_row>N</give_back_row>
<group>
<field>
<name>source_id</name>
</field>
</group>
<ignore_aggregate>N</ignore_aggregate>
<prefix>grp</prefix>
<attributes/>
<GUI>
<xloc>1088</xloc>
<yloc>496</yloc>
</GUI>
</transform>
<transform>
<name>Identify last row in a stream</name>
<type>DetectLastRow</type>
@ -273,7 +383,7 @@ SET status = 'PROCESSING',
<resultfieldname>last_row</resultfieldname>
<attributes/>
<GUI>
<xloc>768</xloc>
<xloc>848</xloc>
<yloc>496</yloc>
</GUI>
</transform>
@ -290,22 +400,13 @@ SET status = 'PROCESSING',
</partitioning>
<connection>ervu-dashboard</connection>
<execute_each_row>N</execute_each_row>
<limit/>
<sql>WITH filteredData AS (SELECT ri.recruit_id,
trud_elem,
uhod_elem
ri.info -> 'svedFL' -> 'svedTrud' -> 'trudDeyat' as trud_arr
FROM recruits_info ri
CROSS JOIN LATERAL jsonb_array_elements(
ri.info -> 'svedFL' -> 'svedTrud' -> 'trudDeyat'
) AS trud_elem
LEFT JOIN LATERAL jsonb_array_elements(
CASE
WHEN jsonb_typeof(trud_elem -> 'svedUhodReb') = 'array'
THEN trud_elem -> 'svedUhodReb'
ELSE '[]'::jsonb
END
) AS uhod_elem ON true
WHERE jsonb_typeof(ri.info -> 'svedFL' -> 'svedTrud' -> 'trudDeyat') = 'array'
and COALESCE(ri.current_recruitment_id, ri.target_recruitment_id) = '${IDM_ID}')
and COALESCE(ri.current_recruitment_id, ri.target_recruitment_id) = '${IDM_ID}'
)
SELECT fd.recruit_id,
trud_elem ->> 'id' AS source_id,
trud_elem -> 'svedOrg' ->> 'naimOrg' AS employer_name,
@ -316,12 +417,15 @@ SELECT fd.recruit_id,
(trud_elem ->> 'prAktMestRab') = '1' AS actual_employer,
to_date(uhod_elem ->> 'dataNachUhodReb', 'YYYY-MM-DD') AS parental_leave_start_date,
to_date(uhod_elem ->> 'dataKonUhodReb', 'YYYY-MM-DD') AS parental_leave_end_date
FROM filteredData fd;
FROM filteredData fd
CROSS JOIN LATERAL jsonb_array_elements(trud_arr) AS trud_elem
LEFT JOIN LATERAL jsonb_array_elements(COALESCE(trud_elem -> 'svedUhodReb', '[]'::jsonb)) AS uhod_elem
ON true;
</sql>
<variables_active>Y</variables_active>
<attributes/>
<GUI>
<xloc>544</xloc>
<xloc>640</xloc>
<yloc>496</yloc>
</GUI>
</transform>
@ -345,7 +449,7 @@ FROM filteredData fd;
<name>source_id</name>
</key>
<schema>ervu_dashboard</schema>
<table>citizen_work_activity</table>
<table>work_activity</table>
<value>
<name>recruit_id</name>
<rename>recruit_id</rename>
@ -390,7 +494,7 @@ FROM filteredData fd;
<update_bypassed>N</update_bypassed>
<attributes/>
<GUI>
<xloc>1040</xloc>
<xloc>1360</xloc>
<yloc>496</yloc>
</GUI>
</transform>
@ -434,7 +538,7 @@ FROM filteredData fd;
<update_bypassed>N</update_bypassed>
<attributes/>
<GUI>
<xloc>1040</xloc>
<xloc>1360</xloc>
<yloc>704</yloc>
</GUI>
</transform>

View file

@ -66,22 +66,23 @@
<execute_each_row>N</execute_each_row>
<limit>0</limit>
<lookup>get_max_source_update_date</lookup>
<sql>SELECT
r.idm_id as recruitment_id,
? as max_source_update_date
<sql>WITH mud AS (
SELECT
recruitment_id,
MAX(execution_datetime) AS max_upd_date
FROM etl.job_execution
WHERE job_name = '${JOB_NAME}'
AND status IN ('SUCCESS','DELTA_ERROR','DELTA_SUCCESS','DELTA_PROCESSING')
GROUP BY recruitment_id
)
SELECT
r.idm_id,
? max_source_update_date
FROM ervu_dashboard.recruitment r
LEFT JOIN etl.job_execution je
ON r.idm_id = je.recruitment_id
and job_name = 'work_activity_job'
JOIN recruits_info ri
JOIN mud ON mud.recruitment_id = r.idm_id
JOIN recruits_info ri
ON COALESCE(ri.current_recruitment_id, ri.target_recruitment_id) = r.idm_id
AND ri.updated_at > (
SELECT MAX(execution_datetime)
FROM etl.job_execution
WHERE job_name = 'work_activity_job'
AND recruitment_id = r.idm_id
)
where je.status in ('SUCCESS', 'DELTA_ERROR', 'DELTA_SUCCESS', 'DELTA_PROCESSING');</sql>
AND ri.updated_at > mud.max_upd_date;</sql>
<variables_active>N</variables_active>
<attributes/>
<GUI>

View file

@ -52,7 +52,7 @@
<set_logfile>N</set_logfile>
<wait_until_finished>Y</wait_until_finished>
<parallel>N</parallel>
<xloc>416</xloc>
<xloc>592</xloc>
<yloc>304</yloc>
<attributes_hac/>
</action>
@ -69,7 +69,7 @@
<valuetype>variable</valuetype>
<variablename>JOB_EXECUTED_FLAG</variablename>
<parallel>N</parallel>
<xloc>688</xloc>
<xloc>864</xloc>
<yloc>304</yloc>
<attributes_hac/>
</action>
@ -95,7 +95,7 @@
<set_logfile>N</set_logfile>
<wait_until_finished>Y</wait_until_finished>
<parallel>Y</parallel>
<xloc>1040</xloc>
<xloc>1216</xloc>
<yloc>304</yloc>
<attributes_hac/>
</action>
@ -111,8 +111,6 @@
<create_parent_folder>N</create_parent_folder>
<exec_per_row>N</exec_per_row>
<filename>${PROJECT_HOME}/info_recruits/citizen_tables/work/work_activity/recruitment_five_flow_repeat.hpl</filename>
<logext/>
<logfile/>
<loglevel>Basic</loglevel>
<parameters>
<pass_all_parameters>Y</pass_all_parameters>
@ -123,7 +121,7 @@
<set_logfile>N</set_logfile>
<wait_until_finished>Y</wait_until_finished>
<parallel>N</parallel>
<xloc>928</xloc>
<xloc>1104</xloc>
<yloc>608</yloc>
<attributes_hac/>
</action>
@ -149,7 +147,7 @@
<set_logfile>N</set_logfile>
<wait_until_finished>Y</wait_until_finished>
<parallel>N</parallel>
<xloc>688</xloc>
<xloc>864</xloc>
<yloc>464</yloc>
<attributes_hac/>
</action>
@ -166,7 +164,7 @@
<valuetype>variable</valuetype>
<variablename>NEED_TO_REPEAT_JOB</variablename>
<parallel>N</parallel>
<xloc>688</xloc>
<xloc>864</xloc>
<yloc>608</yloc>
<attributes_hac/>
</action>
@ -192,19 +190,32 @@
<set_logfile>N</set_logfile>
<wait_until_finished>Y</wait_until_finished>
<parallel>N</parallel>
<xloc>688</xloc>
<xloc>864</xloc>
<yloc>800</yloc>
<attributes_hac/>
</action>
<action>
<name>init_job_name</name>
<description/>
<type>SET_VARIABLES</type>
<attributes/>
<fields>
<field>
<variable_name>JOB_NAME</variable_name>
<variable_type>CURRENT_WORKFLOW</variable_type>
<variable_value>work_activity_job</variable_value>
</field>
</fields>
<file_variable_type>CURRENT_WORKFLOW</file_variable_type>
<filename/>
<replacevars>N</replacevars>
<parallel>N</parallel>
<xloc>384</xloc>
<yloc>304</yloc>
<attributes_hac/>
</action>
</actions>
<hops>
<hop>
<from>Start</from>
<to>check_if_work_activity_job_exists.hpl</to>
<enabled>Y</enabled>
<evaluation>Y</evaluation>
<unconditional>Y</unconditional>
</hop>
<hop>
<from>check_if_work_activity_job_exists.hpl</from>
<to> work_activity_job_exists_check</to>
@ -247,6 +258,20 @@
<evaluation>N</evaluation>
<unconditional>N</unconditional>
</hop>
<hop>
<from>Start</from>
<to>init_job_name</to>
<enabled>Y</enabled>
<evaluation>Y</evaluation>
<unconditional>Y</unconditional>
</hop>
<hop>
<from>init_job_name</from>
<to>check_if_work_activity_job_exists.hpl</to>
<enabled>Y</enabled>
<evaluation>Y</evaluation>
<unconditional>N</unconditional>
</hop>
</hops>
<notepads>
<notepad>
@ -263,8 +288,8 @@
<fontitalic>N</fontitalic>
<fontsize>-1</fontsize>
<height>26</height>
<xloc>880</xloc>
<yloc>224</yloc>
<xloc>1200</xloc>
<yloc>256</yloc>
<note>Первичка</note>
<width>65</width>
</notepad>
@ -282,7 +307,7 @@
<fontitalic>N</fontitalic>
<fontsize>-1</fontsize>
<height>26</height>
<xloc>800</xloc>
<xloc>1088</xloc>
<yloc>560</yloc>
<note>Повторка</note>
<width>64</width>
@ -301,8 +326,8 @@
<fontitalic>N</fontitalic>
<fontsize>-1</fontsize>
<height>26</height>
<xloc>608</xloc>
<yloc>720</yloc>
<xloc>848</xloc>
<yloc>864</yloc>
<note>Дельта</note>
<width>49</width>
</notepad>