ERVU-484 : fix delta for employer and IP

This commit is contained in:
Fusionshh 2025-09-30 16:56:34 +03:00
parent 1ee22f36ef
commit dd058f9d7f
2 changed files with 1142 additions and 404 deletions

View file

@ -7,7 +7,18 @@
<extended_description/>
<pipeline_version/>
<pipeline_type>Normal</pipeline_type>
<pipeline_status>0</pipeline_status>
<parameters>
<parameter>
<name>IDM_ID</name>
<default_value>d229c06e-06d8-4409-82a2-101e088f1649</default_value>
<description/>
</parameter>
<parameter>
<name>MAX_SOURCE_UPDATE_DATE</name>
<default_value>2020-09-10</default_value>
<description/>
</parameter>
</parameters>
<capture_transform_performance>N</capture_transform_performance>
<transform_performance_capturing_delay>1000</transform_performance_capturing_delay>
@ -21,8 +32,13 @@
</notepads>
<order>
<hop>
<from>Change job status on error</from>
<to>Abort</to>
<from>delete_akt_become_null</from>
<to>update_akt_flag_on_delete</to>
<enabled>Y</enabled>
</hop>
<hop>
<from>upsert_akt</from>
<to>update_akt_flag</to>
<enabled>Y</enabled>
</hop>
<hop>
@ -31,53 +47,103 @@
<enabled>Y</enabled>
</hop>
<hop>
<from>Identify last row in a stream</from>
<to>employer_upsert</to>
<enabled>Y</enabled>
</hop>
<hop>
<from>employer_upsert</from>
<to>Change job status on error</to>
<from>employer_input</from>
<to>filter_akt_rows_for_delete</to>
<enabled>Y</enabled>
</hop>
<hop>
<from>employer_input</from>
<to>Identify last row in a stream</to>
<to>filter_akt_rows_for_update</to>
<enabled>Y</enabled>
</hop>
<hop>
<from>Detect empty stream</from>
<to>Change job status on success</to>
<from>employer_input</from>
<to>filter_pred_rows_for_update</to>
<enabled>Y</enabled>
</hop>
<hop>
<from>Identify last row in a stream</from>
<from>filter_akt_rows_for_delete</from>
<to>Detect empty stream</to>
<enabled>Y</enabled>
</hop>
<hop>
<from>Detect empty stream</from>
<to>delete_akt_become_null</to>
<enabled>Y</enabled>
</hop>
<hop>
<from>filter_akt_rows_for_update</from>
<to>Detect empty stream 2</to>
<enabled>Y</enabled>
</hop>
<hop>
<from>Detect empty stream 2</from>
<to>upsert_akt</to>
<enabled>Y</enabled>
</hop>
<hop>
<from>filter_pred_rows_for_update</from>
<to>Detect empty stream 3</to>
<enabled>Y</enabled>
</hop>
<hop>
<from>Detect empty stream 3</from>
<to>upsert_pred</to>
<enabled>Y</enabled>
</hop>
<hop>
<from>update_akt_flag_on_delete</from>
<to>Filter rows</to>
<enabled>Y</enabled>
</hop>
<hop>
<from>update_akt_flag</from>
<to>Filter rows</to>
<enabled>Y</enabled>
</hop>
<hop>
<from>upsert_pred</from>
<to>Filter rows</to>
<enabled>Y</enabled>
</hop>
<hop>
<from>Filter rows</from>
<to>Block until transforms finish 2</to>
<enabled>Y</enabled>
</hop>
<hop>
<from>Block until transforms finish 2</from>
<to>Change job status on success</to>
<enabled>Y</enabled>
</hop>
<hop>
<from>Identify last row in a stream</from>
<to>Update</to>
<from>Change job status on error</from>
<to>Abort</to>
<enabled>Y</enabled>
</hop>
<hop>
<from>Update</from>
<from>delete_akt_become_null</from>
<to>Change job status on error</to>
<enabled>Y</enabled>
</hop>
<hop>
<from>employer_upsert</from>
<to>Block until transforms finish</to>
<from>update_akt_flag_on_delete</from>
<to>Change job status on error</to>
<enabled>Y</enabled>
</hop>
<hop>
<from>Block until transforms finish</from>
<to>Filter rows</to>
<from>upsert_akt</from>
<to>Change job status on error</to>
<enabled>Y</enabled>
</hop>
<hop>
<from>update_akt_flag</from>
<to>Change job status on error</to>
<enabled>Y</enabled>
</hop>
<hop>
<from>upsert_pred</from>
<to>Change job status on error</to>
<enabled>Y</enabled>
</hop>
</order>
@ -97,8 +163,36 @@
<row_threshold>0</row_threshold>
<attributes/>
<GUI>
<xloc>1200</xloc>
<yloc>544</yloc>
<xloc>944</xloc>
<yloc>1264</yloc>
</GUI>
</transform>
<transform>
<name>Block until transforms finish 2</name>
<type>BlockUntilTransformsFinish</type>
<description/>
<distribute>Y</distribute>
<custom_distribution/>
<copies>1</copies>
<partitioning>
<method>none</method>
<schema_name/>
</partitioning>
<transforms>
<transform>
<name>update_akt_flag_on_delete</name>
</transform>
<transform>
<name>update_akt_flag</name>
</transform>
<transform>
<name>upsert_pred</name>
</transform>
</transforms>
<attributes/>
<GUI>
<xloc>1616</xloc>
<yloc>912</yloc>
</GUI>
</transform>
<transform>
@ -132,8 +226,8 @@ and recruitment_id = '${IDM_ID}';
</sql>
<attributes/>
<GUI>
<xloc>864</xloc>
<yloc>544</yloc>
<xloc>768</xloc>
<yloc>1264</yloc>
</GUI>
</transform>
<transform>
@ -163,8 +257,8 @@ and recruitment_id = '${IDM_ID}';
</sql>
<attributes/>
<GUI>
<xloc>1200</xloc>
<yloc>240</yloc>
<xloc>1840</xloc>
<yloc>912</yloc>
</GUI>
</transform>
<transform>
@ -195,8 +289,8 @@ where job_name = '${JOB_NAME}'
and recruitment_id = '${IDM_ID}';</sql>
<attributes/>
<GUI>
<xloc>320</xloc>
<yloc>384</yloc>
<xloc>128</xloc>
<yloc>944</yloc>
</GUI>
</transform>
<transform>
@ -212,8 +306,42 @@ and recruitment_id = '${IDM_ID}';</sql>
</partitioning>
<attributes/>
<GUI>
<xloc>672</xloc>
<yloc>240</yloc>
<xloc>768</xloc>
<yloc>832</yloc>
</GUI>
</transform>
<transform>
<name>Detect empty stream 2</name>
<type>DetectEmptyStream</type>
<description/>
<distribute>Y</distribute>
<custom_distribution/>
<copies>1</copies>
<partitioning>
<method>none</method>
<schema_name/>
</partitioning>
<attributes/>
<GUI>
<xloc>768</xloc>
<yloc>944</yloc>
</GUI>
</transform>
<transform>
<name>Detect empty stream 3</name>
<type>DetectEmptyStream</type>
<description/>
<distribute>Y</distribute>
<custom_distribution/>
<copies>1</copies>
<partitioning>
<method>none</method>
<schema_name/>
</partitioning>
<attributes/>
<GUI>
<xloc>768</xloc>
<yloc>1056</yloc>
</GUI>
</transform>
<transform>
@ -245,16 +373,49 @@ and recruitment_id = '${IDM_ID}';</sql>
</value>
</condition>
</compare>
<send_true_to>Change job status on success</send_true_to>
<send_true_to>Block until transforms finish 2</send_true_to>
<attributes/>
<GUI>
<xloc>1200</xloc>
<yloc>384</yloc>
<xloc>1440</xloc>
<yloc>912</yloc>
</GUI>
</transform>
<transform>
<name>Identify last row in a stream</name>
<type>DetectLastRow</type>
<name>delete_akt_become_null</name>
<type>Delete</type>
<description/>
<distribute>Y</distribute>
<custom_distribution/>
<copies>1</copies>
<partitioning>
<method>none</method>
<schema_name/>
</partitioning>
<commit>10000</commit>
<connection>ervu-dashboard</connection>
<lookup>
<key>
<condition>=</condition>
<field>source_id</field>
<name>source_id</name>
</key>
<key>
<condition>=</condition>
<field>actual_employer</field>
<name>akt_actual_employer</name>
</key>
<schema>ervu_dashboard</schema>
<table>employer</table>
</lookup>
<attributes/>
<GUI>
<xloc>944</xloc>
<yloc>832</yloc>
</GUI>
</transform>
<transform>
<name>employer_input</name>
<type>TableInput</type>
<description/>
<distribute>N</distribute>
<custom_distribution/>
@ -263,15 +424,260 @@ and recruitment_id = '${IDM_ID}';</sql>
<method>none</method>
<schema_name/>
</partitioning>
<resultfieldname>last_row</resultfieldname>
<connection>ervu-dashboard</connection>
<execute_each_row>N</execute_each_row>
<limit>1</limit>
<sql>WITH fd AS (SELECT ri.recruit_id,
(ri.info -> 'svedFL' -> 'svedRabotodat' -> 'rabotodat' ->> 'id') AS source_id,
ri.info -> 'svedFL' -> 'svedRabotodat' -> 'rabotodat' -> 'aktRabotodat' AS akt_rabotodat,
ri.info -> 'svedFL' -> 'svedRabotodat' -> 'rabotodat' -> 'predRabotodat' AS pred_rabotodat
FROM recruits_info ri
WHERE ri.info -> 'svedFL' -> 'svedRabotodat' -> 'rabotodat' -> 'aktRabotodat' IS NOT NULL
AND (
to_date(ri.info -> 'svedFL' -> 'svedRabotodat' -> 'rabotodat' -> 'aktRabotodat' ->> 'dataSved',
'YYYY-MM-DD') > '${MAX_SOURCE_UPDATE_DATE}'
OR to_date(ri.info -> 'svedFL' -> 'svedRabotodat' -> 'rabotodat' -> 'predRabotodat' ->> 'dataSved',
'YYYY-MM-DD') > '${MAX_SOURCE_UPDATE_DATE}'
)
AND COALESCE(ri.current_recruitment_id, ri.target_recruitment_id) = '${IDM_ID}')
SELECT fd.recruit_id,
fd.source_id,
to_date(pred_rabotodat ->> 'dataSved', 'YYYY-MM-DD') as pred_source_update_date,
COALESCE(pred_rabotodat -> 'extend' ->> 'statusRabotodat' = '1', false) as pred_status,
pred_rabotodat -> 'extend' ->> 'adresRabotodat' as pred_address,
pred_rabotodat -> 'extend' ->> 'tipPodrazdRabotodat' as pred_separate_unit_type,
pred_rabotodat -> 'extend' ->> 'adresPodrazdRabotodat' as pred_separate_unit_address,
case
when pred_rabotodat ->> 'svedIP' is not null and pred_rabotodat ->> 'svedIP' != 'null'
then 'Индивидуальный предприниматель'
when pred_rabotodat ->> 'svedYUL' is not null and pred_rabotodat ->> 'svedYUL' != 'null'
then 'Юридическое лицо'
when pred_rabotodat ->> 'svedGlKFH' is not null and pred_rabotodat ->> 'svedGlKFH' != 'null' then 'Глава КФХ'
when pred_rabotodat ->> 'svedPlatFL' is not null and pred_rabotodat ->> 'svedPlatFL' != 'null'
then 'Физическое лицо'
end as pred_employer_category_name,
pred_rabotodat -> 'svedYUL' -> 'naimOrg' as pred_name,
case
when pred_rabotodat ->> 'svedIP' is not null and pred_rabotodat ->> 'svedIP' != 'null'
then pred_rabotodat -> 'svedIP' ->> 'innfl'
when pred_rabotodat ->> 'svedYUL' is not null and pred_rabotodat ->> 'svedYUL' != 'null'
then pred_rabotodat -> 'svedYUL' ->> 'innyul'
when pred_rabotodat ->> 'svedGlKFH' is not null and pred_rabotodat ->> 'svedGlKFH' != 'null'
then pred_rabotodat -> 'svedGlKFH' ->> 'innfl'
when pred_rabotodat ->> 'svedPlatFL' is not null and pred_rabotodat ->> 'svedPlatFL' != 'null'
then pred_rabotodat -> 'svedPlatFL' ->> 'innfl'
end as pred_inn,
case
when pred_rabotodat ->> 'svedIP' is not null and pred_rabotodat ->> 'svedIP' != 'null'
then pred_rabotodat -> 'extend' -> 'svedYUL' ->> 'ogrn'
when pred_rabotodat ->> 'svedYUL' is not null and pred_rabotodat ->> 'svedYUL' != 'null'
then pred_rabotodat -> 'svedIP' ->> 'ogrnip'
end as pred_ogrn,
case
when pred_rabotodat ->> 'svedYUL' is not null and pred_rabotodat ->> 'svedYUL' != 'null'
then pred_rabotodat -> 'extend' -> 'svedYUL' ->> 'kpp'
end as pred_kpp,
coalesce(pred_rabotodat -> 'svedYUL' -> 'extend' ->> 'priznakOpk' = '1', false) as pred_is_opk_org,
to_date(fd.pred_rabotodat ->> 'dataSved', 'YYYY-MM-DD') > '${MAX_SOURCE_UPDATE_DATE}' AS pred_need_update,
to_date(akt_rabotodat ->> 'dataSved', 'YYYY-MM-DD') as akt_source_update_date,
COALESCE(akt_rabotodat -> 'extend' ->> 'statusRabotodat' = '1', false) as akt_status,
akt_rabotodat -> 'extend' ->> 'adresRabotodat' as akt_address,
akt_rabotodat -> 'extend' ->> 'tipPodrazdRabotodat' as akt_separate_unit_type,
akt_rabotodat -> 'extend' ->> 'adresPodrazdRabotodat' as akt_separate_unit_address,
case
when akt_rabotodat ->> 'svedIP' is not null and akt_rabotodat ->> 'svedIP' != 'null'
then 'Индивидуальный предприниматель'
when akt_rabotodat ->> 'svedYUL' is not null and akt_rabotodat ->> 'svedYUL' != 'null'
then 'Юридическое лицо'
when akt_rabotodat ->> 'svedGlKFH' is not null and akt_rabotodat ->> 'svedGlKFH' != 'null' then 'Глава КФХ'
when akt_rabotodat ->> 'svedPlatFL' is not null and akt_rabotodat ->> 'svedPlatFL' != 'null'
then 'Физическое лицо'
end as akt_employer_category_name,
akt_rabotodat -> 'svedYUL' -> 'naimOrg' as akt_name,
case
when akt_rabotodat ->> 'svedIP' is not null and akt_rabotodat ->> 'svedIP' != 'null'
then akt_rabotodat -> 'svedIP' ->> 'innfl'
when akt_rabotodat ->> 'svedYUL' is not null and akt_rabotodat ->> 'svedYUL' != 'null'
then akt_rabotodat -> 'svedYUL' ->> 'innyul'
when akt_rabotodat ->> 'svedGlKFH' is not null and akt_rabotodat ->> 'svedGlKFH' != 'null'
then akt_rabotodat -> 'svedGlKFH' ->> 'innfl'
when akt_rabotodat ->> 'svedPlatFL' is not null and akt_rabotodat ->> 'svedPlatFL' != 'null'
then akt_rabotodat -> 'svedPlatFL' ->> 'innfl'
end as akt_inn,
case
when akt_rabotodat ->> 'svedIP' is not null and akt_rabotodat ->> 'svedIP' != 'null'
then akt_rabotodat -> 'extend' -> 'svedYUL' ->> 'ogrn'
when akt_rabotodat ->> 'svedYUL' is not null and akt_rabotodat ->> 'svedYUL' != 'null'
then akt_rabotodat -> 'svedIP' ->> 'ogrnip'
end as akt_ogrn,
case
when akt_rabotodat ->> 'svedYUL' is not null and akt_rabotodat ->> 'svedYUL' != 'null'
then akt_rabotodat -> 'extend' -> 'svedYUL' ->> 'kpp'
end as akt_kpp,
coalesce(akt_rabotodat -> 'svedYUL' -> 'extend' ->> 'priznakOpk', '0') = '1' as akt_is_opk_org,
akt_rabotodat is not null as akt_actual_employer,
jsonb_typeof(akt_rabotodat) IS NULL as delete_akt,
to_date(fd.akt_rabotodat ->> 'dataSved', 'YYYY-MM-DD') > '${MAX_SOURCE_UPDATE_DATE}' as akt_need_update
FROM fd;</sql>
<variables_active>Y</variables_active>
<attributes/>
<GUI>
<xloc>672</xloc>
<yloc>384</yloc>
<xloc>336</xloc>
<yloc>944</yloc>
</GUI>
</transform>
<transform>
<name>Update</name>
<name>filter_akt_rows_for_delete</name>
<type>FilterRows</type>
<description/>
<distribute>Y</distribute>
<custom_distribution/>
<copies>1</copies>
<partitioning>
<method>none</method>
<schema_name/>
</partitioning>
<compare>
<condition>
<conditions>
</conditions>
<function>=</function>
<leftvalue>delete_akt</leftvalue>
<negated>N</negated>
<operator>-</operator>
<value>
<isnull>N</isnull>
<length>-1</length>
<name>constant</name>
<precision>-1</precision>
<text>Y</text>
<type>Boolean</type>
</value>
</condition>
</compare>
<send_true_to>Detect empty stream</send_true_to>
<attributes/>
<GUI>
<xloc>592</xloc>
<yloc>832</yloc>
</GUI>
</transform>
<transform>
<name>filter_akt_rows_for_update</name>
<type>FilterRows</type>
<description/>
<distribute>Y</distribute>
<custom_distribution/>
<copies>1</copies>
<partitioning>
<method>none</method>
<schema_name/>
</partitioning>
<compare>
<condition>
<conditions>
</conditions>
<function>=</function>
<leftvalue>akt_need_update</leftvalue>
<negated>N</negated>
<operator>-</operator>
<value>
<isnull>N</isnull>
<length>-1</length>
<name>constant</name>
<precision>-1</precision>
<text>Y</text>
<type>Boolean</type>
</value>
</condition>
</compare>
<send_true_to>Detect empty stream 2</send_true_to>
<attributes/>
<GUI>
<xloc>592</xloc>
<yloc>944</yloc>
</GUI>
</transform>
<transform>
<name>filter_pred_rows_for_update</name>
<type>FilterRows</type>
<description/>
<distribute>Y</distribute>
<custom_distribution/>
<copies>1</copies>
<partitioning>
<method>none</method>
<schema_name/>
</partitioning>
<compare>
<condition>
<conditions>
</conditions>
<function>=</function>
<leftvalue>pred_need_update</leftvalue>
<negated>N</negated>
<operator>-</operator>
<value>
<isnull>N</isnull>
<length>-1</length>
<name>constant</name>
<precision>-1</precision>
<text>Y</text>
<type>Boolean</type>
</value>
</condition>
</compare>
<send_true_to>Detect empty stream 3</send_true_to>
<attributes/>
<GUI>
<xloc>592</xloc>
<yloc>1056</yloc>
</GUI>
</transform>
<transform>
<name>update_akt_flag</name>
<type>Update</type>
<description/>
<distribute>Y</distribute>
<custom_distribution/>
<copies>1</copies>
<partitioning>
<method>none</method>
<schema_name/>
</partitioning>
<commit>10000</commit>
<connection>ervu-dashboard</connection>
<error_ignored>N</error_ignored>
<lookup>
<key>
<condition>&lt;></condition>
<field>employed</field>
<name>akt_actual_employer</name>
</key>
<key>
<condition>=</condition>
<field>recruit_id</field>
<name>recruit_id</name>
</key>
<schema>ervu_dashboard</schema>
<table>citizen</table>
<value>
<name>employed</name>
<rename>akt_actual_employer</rename>
</value>
</lookup>
<skip_lookup>N</skip_lookup>
<use_batch>Y</use_batch>
<attributes/>
<GUI>
<xloc>1136</xloc>
<yloc>944</yloc>
</GUI>
</transform>
<transform>
<name>update_akt_flag_on_delete</name>
<type>Update</type>
<description/>
<distribute>Y</distribute>
@ -287,32 +693,32 @@ and recruitment_id = '${IDM_ID}';</sql>
<lookup>
<key>
<condition>=</condition>
<field>recruit_id</field>
<name>recruit_id</name>
<field>employed</field>
<name>delete_akt</name>
</key>
<key>
<condition>&lt;></condition>
<field>employed</field>
<name>actual_employer</name>
<condition>=</condition>
<field>recruit_id</field>
<name>recruit_id</name>
</key>
<schema>ervu_dashboard</schema>
<table>citizen</table>
<value>
<name>employed</name>
<rename>actual_employer</rename>
<rename>akt_actual_employer</rename>
</value>
</lookup>
<skip_lookup>N</skip_lookup>
<use_batch>Y</use_batch>
<attributes/>
<GUI>
<xloc>672</xloc>
<yloc>544</yloc>
<xloc>1136</xloc>
<yloc>832</yloc>
</GUI>
</transform>
<transform>
<name>employer_input</name>
<type>TableInput</type>
<name>upsert_akt</name>
<type>InsertUpdate</type>
<description/>
<distribute>Y</distribute>
<custom_distribution/>
@ -321,78 +727,101 @@ and recruitment_id = '${IDM_ID}';</sql>
<method>none</method>
<schema_name/>
</partitioning>
<commit>10000</commit>
<connection>ervu-dashboard</connection>
<execute_each_row>N</execute_each_row>
<limit>0</limit>
<sql>with filteredData as (select ri.recruit_id,
(ri.info -> 'svedFL' -> 'svedRabotodat' -> 'rabotodat' ->> 'id') as source_id,
ri.info -> 'svedFL' -> 'svedRabotodat' -> 'rabotodat' -> 'aktRabotodat' as rabotodat,
true as actual_employer
from recruits_info ri
where ri.info -> 'svedFL' -> 'svedRabotodat' -> 'rabotodat' ->> 'aktRabotodat' != 'null'
and to_date(ri.info -> 'svedFL' -> 'svedRabotodat' -> 'rabotodat' -> 'aktRabotodat' ->>
'dataSved', 'YYYY-MM-DD') > '${MAX_SOURCE_UPDATE_DATE}'
and COALESCE(ri.current_recruitment_id, ri.target_recruitment_id) = '${IDM_ID}'
union all
select ri.recruit_id,
(ri.info -> 'svedFL' -> 'svedRabotodat' -> 'rabotodat' ->> 'id') as source_id,
ri.info -> 'svedFL' -> 'svedRabotodat' -> 'rabotodat' -> 'predRabotodat' as rabotodat,
false as actual_employer
from recruits_info ri
where ri.info -> 'svedFL' -> 'svedRabotodat' -> 'rabotodat' ->> 'predRabotodat' != 'null'
and to_date(ri.info -> 'svedFL' -> 'svedRabotodat' -> 'rabotodat' -> 'predRabotodat' ->>
'dataSved', 'YYYY-MM-DD') > '${MAX_SOURCE_UPDATE_DATE}'
and COALESCE(ri.current_recruitment_id, ri.target_recruitment_id) = '${IDM_ID}')
select fd.source_id,
fd.recruit_id,
to_date(rabotodat ->> 'dataSved', 'YYYY-MM-DD') as source_update_date,
COALESCE(rabotodat -> 'extend' ->> 'status', '0') = '1' AS status,
rabotodat -> 'extend' ->> 'adresRabotodat' as address,
rabotodat -> 'extend' ->> 'tipPodrazdRabotodat' as separate_unit_type,
rabotodat -> 'extend' ->> 'adresPodrazdRabotodat' as separate_unit_address,
case
when rabotodat ->> 'svedIP' is not null and rabotodat ->> 'svedIP' != 'null'
then 'Индивидуальный предприниматель'
when rabotodat ->> 'svedYUL' is not null and rabotodat ->> 'svedYUL' != 'null' then 'Юридическое лицо'
when rabotodat ->> 'svedGlKFH' is not null and rabotodat ->> 'svedGlKFH' != 'null' then 'Глава КФХ'
when rabotodat ->> 'svedPlatFL' is not null and rabotodat ->> 'svedPlatFL' != 'null' then 'Физическое лицо'
end as employer_category_name,
rabotodat -> 'svedYUL' -> 'naimOrg' as name,
case
when rabotodat ->> 'svedIP' is not null and rabotodat ->> 'svedIP' != 'null'
then rabotodat -> 'svedIP' ->> 'innfl'
when rabotodat ->> 'svedYUL' is not null and rabotodat ->> 'svedYUL' != 'null'
then rabotodat -> 'svedYUL' ->> 'innyul'
when rabotodat ->> 'svedGlKFH' is not null and rabotodat ->> 'svedGlKFH' != 'null'
then rabotodat -> 'svedGlKFH' ->> 'innfl'
when rabotodat ->> 'svedPlatFL' is not null and rabotodat ->> 'svedPlatFL' != 'null'
then rabotodat -> 'svedPlatFL' ->> 'innfl'
end as inn,
case
when rabotodat ->> 'svedIP' is not null and rabotodat ->> 'svedIP' != 'null'
then rabotodat -> 'extend' -> 'svedYUL' ->> 'ogrn'
when rabotodat ->> 'svedYUL' is not null and rabotodat ->> 'svedYUL' != 'null'
then rabotodat -> 'svedIP' ->> 'ogrnip'
end as ogrn,
case
when rabotodat ->> 'svedYUL' is not null and rabotodat ->> 'svedYUL' != 'null'
then rabotodat -> 'extend' -> 'svedYUL' ->> 'kpp'
end as kpp,
coalesce(rabotodat -> 'svedYUL' -> 'extend' ->> 'priznakOpk', '0') = '1' as is_opk_org,
actual_employer,
coalesce(rabotodat -> 'extend' ->> 'statusRabotodat', '0') = '1' as status
from filteredData fd;</sql>
<variables_active>Y</variables_active>
<lookup>
<key>
<condition>=</condition>
<field>source_id</field>
<name>source_id</name>
</key>
<key>
<condition>=</condition>
<field>actual_employer</field>
<name>akt_actual_employer</name>
</key>
<schema>ervu_dashboard</schema>
<table>employer</table>
<value>
<name>recruit_id</name>
<rename>recruit_id</rename>
<update>N</update>
</value>
<value>
<name>source_id</name>
<rename>source_id</rename>
<update>N</update>
</value>
<value>
<name>source_update_date</name>
<rename>akt_source_update_date</rename>
<update>Y</update>
</value>
<value>
<name>status</name>
<rename>akt_status</rename>
<update>Y</update>
</value>
<value>
<name>address</name>
<rename>akt_address</rename>
<update>Y</update>
</value>
<value>
<name>separate_unit_type</name>
<rename>akt_separate_unit_type</rename>
<update>Y</update>
</value>
<value>
<name>separate_unit_address</name>
<rename>akt_separate_unit_address</rename>
<update>Y</update>
</value>
<value>
<name>employer_category_name</name>
<rename>akt_employer_category_name</rename>
<update>Y</update>
</value>
<value>
<name>name</name>
<rename>akt_name</rename>
<update>Y</update>
</value>
<value>
<name>inn</name>
<rename>akt_inn</rename>
<update>Y</update>
</value>
<value>
<name>ogrn</name>
<rename>akt_ogrn</rename>
<update>Y</update>
</value>
<value>
<name>kpp</name>
<rename>akt_kpp</rename>
<update>Y</update>
</value>
<value>
<name>is_opk_org</name>
<rename>akt_is_opk_org</rename>
<update>Y</update>
</value>
<value>
<name>actual_employer</name>
<rename>akt_actual_employer</rename>
<update>Y</update>
</value>
</lookup>
<update_bypassed>N</update_bypassed>
<attributes/>
<GUI>
<xloc>496</xloc>
<yloc>384</yloc>
<xloc>944</xloc>
<yloc>944</yloc>
</GUI>
</transform>
<transform>
<name>employer_upsert</name>
<name>upsert_pred</name>
<type>InsertUpdate</type>
<description/>
<distribute>Y</distribute>
@ -413,117 +842,90 @@ from filteredData fd;</sql>
<key>
<condition>&lt;></condition>
<field>actual_employer</field>
<name>actual_employer</name>
<name>pred_need_update</name>
</key>
<schema>ervu_dashboard</schema>
<table>employer</table>
<value>
<name>source_id</name>
<rename>source_id</rename>
<update>N</update>
</value>
<value>
<name>recruit_id</name>
<rename>recruit_id</rename>
<update>N</update>
</value>
<value>
<name>source_update_date</name>
<rename>source_update_date</rename>
<update>Y</update>
<name>source_id</name>
<rename>source_id</rename>
<update>N</update>
</value>
<value>
<name>address</name>
<rename>address</rename>
<name>source_update_date</name>
<rename>pred_source_update_date</rename>
<update>Y</update>
</value>
<value>
<name>separate_unit_type_code</name>
<rename>separate_unit_type</rename>
<rename>pred_separate_unit_type</rename>
<update>Y</update>
</value>
<value>
<name>separate_unit_address</name>
<rename>separate_unit_address</rename>
<rename>pred_separate_unit_address</rename>
<update>Y</update>
</value>
<value>
<name>address</name>
<rename>pred_address</rename>
<update>Y</update>
</value>
<value>
<name>employer_category_name</name>
<rename>employer_category_name</rename>
<rename>pred_employer_category_name</rename>
<update>Y</update>
</value>
<value>
<name>name</name>
<rename>name</rename>
<rename>pred_name</rename>
<update>Y</update>
</value>
<value>
<name>inn</name>
<rename>inn</rename>
<rename>pred_inn</rename>
<update>Y</update>
</value>
<value>
<name>ogrn</name>
<rename>ogrn</rename>
<rename>pred_ogrn</rename>
<update>Y</update>
</value>
<value>
<name>kpp</name>
<rename>kpp</rename>
<update>Y</update>
</value>
<value>
<name>actual_employer</name>
<rename>actual_employer</rename>
<rename>pred_kpp</rename>
<update>Y</update>
</value>
<value>
<name>is_opk_org</name>
<rename>is_opk_org</rename>
<rename>pred_is_opk_org</rename>
<update>Y</update>
</value>
<value>
<name>status</name>
<rename>status</rename>
<rename>pred_status</rename>
<update>Y</update>
</value>
</lookup>
<update_bypassed>N</update_bypassed>
<attributes/>
<GUI>
<xloc>864</xloc>
<yloc>384</yloc>
</GUI>
</transform>
<transform>
<name>Block until transforms finish</name>
<type>BlockUntilTransformsFinish</type>
<description/>
<distribute>Y</distribute>
<custom_distribution/>
<copies>1</copies>
<partitioning>
<method>none</method>
<schema_name/>
</partitioning>
<transforms>
<transform>
<name>Update</name>
</transform>
</transforms>
<attributes/>
<GUI>
<xloc>1040</xloc>
<yloc>384</yloc>
<xloc>1136</xloc>
<yloc>1056</yloc>
</GUI>
</transform>
<transform_error_handling>
<error>
<source_transform>Update</source_transform>
<source_transform>delete_akt_become_null</source_transform>
<target_transform>Change job status on error</target_transform>
<is_enabled>Y</is_enabled>
<nr_valuename/>
<descriptions_valuename/>
<descriptions_valuename>error_description</descriptions_valuename>
<fields_valuename/>
<codes_valuename/>
<max_errors/>
@ -531,7 +933,43 @@ from filteredData fd;</sql>
<min_pct_rows/>
</error>
<error>
<source_transform>employer_upsert</source_transform>
<source_transform>update_akt_flag</source_transform>
<target_transform>Change job status on error</target_transform>
<is_enabled>Y</is_enabled>
<nr_valuename/>
<descriptions_valuename>error_description</descriptions_valuename>
<fields_valuename/>
<codes_valuename/>
<max_errors/>
<max_pct_errors/>
<min_pct_rows/>
</error>
<error>
<source_transform>update_akt_flag_on_delete</source_transform>
<target_transform>Change job status on error</target_transform>
<is_enabled>Y</is_enabled>
<nr_valuename/>
<descriptions_valuename>error_description</descriptions_valuename>
<fields_valuename/>
<codes_valuename/>
<max_errors/>
<max_pct_errors/>
<min_pct_rows/>
</error>
<error>
<source_transform>upsert_akt</source_transform>
<target_transform>Change job status on error</target_transform>
<is_enabled>Y</is_enabled>
<nr_valuename/>
<descriptions_valuename>error_description</descriptions_valuename>
<fields_valuename/>
<codes_valuename/>
<max_errors/>
<max_pct_errors/>
<min_pct_rows/>
</error>
<error>
<source_transform>upsert_pred</source_transform>
<target_transform>Change job status on error</target_transform>
<is_enabled>Y</is_enabled>
<nr_valuename/>

View file

@ -20,6 +20,11 @@
<notepads>
</notepads>
<order>
<hop>
<from>Block until transforms finish 2</from>
<to>Change job status on success</to>
<enabled>Y</enabled>
</hop>
<hop>
<from>Change job status on error</from>
<to>Abort</to>
@ -27,22 +32,107 @@
</hop>
<hop>
<from>Create job execution record</from>
<to>individual_entrepreneur_input</to>
<to>ip_input</to>
<enabled>Y</enabled>
</hop>
<hop>
<from>Detect empty stream</from>
<to>delete_akt_become_null</to>
<enabled>Y</enabled>
</hop>
<hop>
<from>Detect empty stream 2</from>
<to>upsert_akt</to>
<enabled>Y</enabled>
</hop>
<hop>
<from>Detect empty stream 3</from>
<to>upsert_pred</to>
<enabled>Y</enabled>
</hop>
<hop>
<from>Filter rows</from>
<to>Change job status on success</to>
<to>Block until transforms finish 2</to>
<enabled>Y</enabled>
</hop>
<hop>
<from>Identify last row in a stream</from>
<to>individual_entrepreneur_upsert</to>
<from>delete_akt_become_null</from>
<to>Change job status on error</to>
<enabled>Y</enabled>
</hop>
<hop>
<from>individual_entrepreneur_input</from>
<to>Identify last row in a stream</to>
<from>delete_akt_become_null</from>
<to>update_akt_flag_on_delete</to>
<enabled>Y</enabled>
</hop>
<hop>
<from>filter_akt_rows_for_delete</from>
<to>Detect empty stream</to>
<enabled>Y</enabled>
</hop>
<hop>
<from>filter_akt_rows_for_update</from>
<to>Detect empty stream 2</to>
<enabled>Y</enabled>
</hop>
<hop>
<from>filter_pred_rows_for_update</from>
<to>Detect empty stream 3</to>
<enabled>Y</enabled>
</hop>
<hop>
<from>ip_input</from>
<to>filter_akt_rows_for_delete</to>
<enabled>Y</enabled>
</hop>
<hop>
<from>ip_input</from>
<to>filter_akt_rows_for_update</to>
<enabled>Y</enabled>
</hop>
<hop>
<from>ip_input</from>
<to>filter_pred_rows_for_update</to>
<enabled>Y</enabled>
</hop>
<hop>
<from>update_akt_flag</from>
<to>Change job status on error</to>
<enabled>Y</enabled>
</hop>
<hop>
<from>update_akt_flag</from>
<to>Filter rows</to>
<enabled>Y</enabled>
</hop>
<hop>
<from>update_akt_flag_on_delete</from>
<to>Change job status on error</to>
<enabled>Y</enabled>
</hop>
<hop>
<from>update_akt_flag_on_delete</from>
<to>Filter rows</to>
<enabled>Y</enabled>
</hop>
<hop>
<from>upsert_akt</from>
<to>Change job status on error</to>
<enabled>Y</enabled>
</hop>
<hop>
<from>upsert_akt</from>
<to>update_akt_flag</to>
<enabled>Y</enabled>
</hop>
<hop>
<from>upsert_pred</from>
<to>Change job status on error</to>
<enabled>Y</enabled>
</hop>
<hop>
<from>upsert_pred</from>
<to>Filter rows</to>
<enabled>Y</enabled>
</hop>
<hop>
@ -50,61 +140,21 @@
<to>unique_by_rec_id</to>
<enabled>Y</enabled>
</hop>
<hop>
<from>ip_input</from>
<to>filter_null_npd_dates</to>
<enabled>Y</enabled>
</hop>
<hop>
<from>unique_by_rec_id</from>
<to>individual_entrepreneur_npd_upsert</to>
<enabled>Y</enabled>
</hop>
<hop>
<from>Identify last row in a stream</from>
<to>Detect empty stream</to>
<enabled>Y</enabled>
</hop>
<hop>
<from>Identify last row in a stream</from>
<to>filter_null_npd_dates</to>
<enabled>Y</enabled>
</hop>
<hop>
<from>individual_entrepreneur_upsert</from>
<to>Change job status on error</to>
<enabled>Y</enabled>
</hop>
<hop>
<from>individual_entrepreneur_npd_upsert</from>
<to>Change job status on error</to>
<enabled>Y</enabled>
</hop>
<hop>
<from>Detect empty stream</from>
<to>Change job status on success</to>
<enabled>Y</enabled>
</hop>
<hop>
<from>filter_non_actual_ip</from>
<to>Update</to>
<enabled>Y</enabled>
</hop>
<hop>
<from>Identify last row in a stream</from>
<to>filter_non_actual_ip</to>
<enabled>Y</enabled>
</hop>
<hop>
<from>Update</from>
<to>Change job status on error</to>
<enabled>Y</enabled>
</hop>
<hop>
<from>individual_entrepreneur_upsert</from>
<to>Block until transforms finish</to>
<enabled>Y</enabled>
</hop>
<hop>
<from>Block until transforms finish</from>
<to>Filter rows</to>
<enabled>Y</enabled>
</hop>
</order>
<transform>
<name>Abort</name>
@ -122,8 +172,36 @@
<row_threshold>0</row_threshold>
<attributes/>
<GUI>
<xloc>1808</xloc>
<yloc>752</yloc>
<xloc>1872</xloc>
<yloc>576</yloc>
</GUI>
</transform>
<transform>
<name>Block until transforms finish 2</name>
<type>BlockUntilTransformsFinish</type>
<description/>
<distribute>Y</distribute>
<custom_distribution/>
<copies>1</copies>
<partitioning>
<method>none</method>
<schema_name/>
</partitioning>
<transforms>
<transform>
<name>update_akt_flag_on_delete</name>
</transform>
<transform>
<name>update_akt_flag</name>
</transform>
<transform>
<name>upsert_pred</name>
</transform>
</transforms>
<attributes/>
<GUI>
<xloc>1872</xloc>
<yloc>400</yloc>
</GUI>
</transform>
<transform>
@ -149,7 +227,7 @@
<set_params>Y</set_params>
<single_statement>N</single_statement>
<sql>UPDATE etl.job_execution
SET status = 'DELTA_ERROR',
SET status = 'ERROR',
error_description = ?
WHERE job_name = '${JOB_NAME}'
and recruitment_id = '${IDM_ID}';
@ -157,8 +235,8 @@ and recruitment_id = '${IDM_ID}';
</sql>
<attributes/>
<GUI>
<xloc>1328</xloc>
<yloc>752</yloc>
<xloc>1696</xloc>
<yloc>576</yloc>
</GUI>
</transform>
<transform>
@ -188,8 +266,8 @@ and recruitment_id = '${IDM_ID}';
</sql>
<attributes/>
<GUI>
<xloc>1808</xloc>
<yloc>272</yloc>
<xloc>2096</xloc>
<yloc>400</yloc>
</GUI>
</transform>
<transform>
@ -220,8 +298,8 @@ where job_name = '${JOB_NAME}'
and recruitment_id = '${IDM_ID}';</sql>
<attributes/>
<GUI>
<xloc>272</xloc>
<yloc>448</yloc>
<xloc>384</xloc>
<yloc>432</yloc>
</GUI>
</transform>
<transform>
@ -237,8 +315,42 @@ and recruitment_id = '${IDM_ID}';</sql>
</partitioning>
<attributes/>
<GUI>
<xloc>736</xloc>
<yloc>272</yloc>
<xloc>1024</xloc>
<yloc>320</yloc>
</GUI>
</transform>
<transform>
<name>Detect empty stream 2</name>
<type>DetectEmptyStream</type>
<description/>
<distribute>Y</distribute>
<custom_distribution/>
<copies>1</copies>
<partitioning>
<method>none</method>
<schema_name/>
</partitioning>
<attributes/>
<GUI>
<xloc>1024</xloc>
<yloc>432</yloc>
</GUI>
</transform>
<transform>
<name>Detect empty stream 3</name>
<type>DetectEmptyStream</type>
<description/>
<distribute>Y</distribute>
<custom_distribution/>
<copies>1</copies>
<partitioning>
<method>none</method>
<schema_name/>
</partitioning>
<attributes/>
<GUI>
<xloc>1024</xloc>
<yloc>544</yloc>
</GUI>
</transform>
<transform>
@ -270,34 +382,16 @@ and recruitment_id = '${IDM_ID}';</sql>
</value>
</condition>
</compare>
<send_true_to>Change job status on success</send_true_to>
<send_true_to>Block until transforms finish 2</send_true_to>
<attributes/>
<GUI>
<xloc>1808</xloc>
<yloc>448</yloc>
<xloc>1696</xloc>
<yloc>400</yloc>
</GUI>
</transform>
<transform>
<name>Identify last row in a stream</name>
<type>DetectLastRow</type>
<description/>
<distribute>N</distribute>
<custom_distribution/>
<copies>1</copies>
<partitioning>
<method>none</method>
<schema_name/>
</partitioning>
<resultfieldname>last_row</resultfieldname>
<attributes/>
<GUI>
<xloc>736</xloc>
<yloc>448</yloc>
</GUI>
</transform>
<transform>
<name>Update</name>
<type>Update</type>
<name>delete_akt_become_null</name>
<type>Delete</type>
<description/>
<distribute>Y</distribute>
<custom_distribution/>
@ -308,35 +402,28 @@ and recruitment_id = '${IDM_ID}';</sql>
</partitioning>
<commit>10000</commit>
<connection>ervu-dashboard</connection>
<error_ignored>N</error_ignored>
<lookup>
<key>
<condition>=</condition>
<field>recruit_id</field>
<name>recruit_id</name>
<field>source_id</field>
<name>source_id</name>
</key>
<key>
<condition>&lt;></condition>
<field>individual_entrepreneur</field>
<name>actual</name>
<condition>=</condition>
<field>actual</field>
<name>delete_akt</name>
</key>
<schema>ervu_dashboard</schema>
<table>citizen</table>
<value>
<name>individual_entrepreneur</name>
<rename>actual</rename>
</value>
<table>individual_entrepreneur</table>
</lookup>
<skip_lookup>N</skip_lookup>
<use_batch>Y</use_batch>
<attributes/>
<GUI>
<xloc>1104</xloc>
<yloc>592</yloc>
<xloc>1200</xloc>
<yloc>320</yloc>
</GUI>
</transform>
<transform>
<name>filter_non_actual_ip</name>
<name>filter_akt_rows_for_delete</name>
<type>FilterRows</type>
<description/>
<distribute>Y</distribute>
@ -351,7 +438,7 @@ and recruitment_id = '${IDM_ID}';</sql>
<conditions>
</conditions>
<function>=</function>
<leftvalue>actual</leftvalue>
<leftvalue>delete_akt</leftvalue>
<negated>N</negated>
<operator>-</operator>
<value>
@ -364,11 +451,47 @@ and recruitment_id = '${IDM_ID}';</sql>
</value>
</condition>
</compare>
<send_true_to>Update</send_true_to>
<send_true_to>Detect empty stream</send_true_to>
<attributes/>
<GUI>
<xloc>912</xloc>
<yloc>592</yloc>
<xloc>848</xloc>
<yloc>320</yloc>
</GUI>
</transform>
<transform>
<name>filter_akt_rows_for_update</name>
<type>FilterRows</type>
<description/>
<distribute>Y</distribute>
<custom_distribution/>
<copies>1</copies>
<partitioning>
<method>none</method>
<schema_name/>
</partitioning>
<compare>
<condition>
<conditions>
</conditions>
<function>=</function>
<leftvalue>akt_need_update</leftvalue>
<negated>N</negated>
<operator>-</operator>
<value>
<isnull>N</isnull>
<length>-1</length>
<name>constant</name>
<precision>-1</precision>
<text>Y</text>
<type>Boolean</type>
</value>
</condition>
</compare>
<send_true_to>Detect empty stream 2</send_true_to>
<attributes/>
<GUI>
<xloc>848</xloc>
<yloc>432</yloc>
</GUI>
</transform>
<transform>
@ -395,13 +518,13 @@ and recruitment_id = '${IDM_ID}';</sql>
<send_true_to>unique_by_rec_id</send_true_to>
<attributes/>
<GUI>
<xloc>736</xloc>
<yloc>752</yloc>
<xloc>848</xloc>
<yloc>656</yloc>
</GUI>
</transform>
<transform>
<name>individual_entrepreneur_input</name>
<type>TableInput</type>
<name>filter_pred_rows_for_update</name>
<type>FilterRows</type>
<description/>
<distribute>Y</distribute>
<custom_distribution/>
@ -410,52 +533,29 @@ and recruitment_id = '${IDM_ID}';</sql>
<method>none</method>
<schema_name/>
</partitioning>
<connection>ervu-dashboard</connection>
<execute_each_row>N</execute_each_row>
<limit>0</limit>
<sql>with filtered_data as (select ri.recruit_id,
ri.info -> 'svedFL' -> 'svedRegIP' -> 'regIP' ->> 'id' as source_id,
ri.info -> 'svedFL' -> 'svedRegIP' -> 'regIP' -> 'aktRegIP' -> 0 as ip_elem,
to_date(ri.info -> 'svedFL' -> 'svedRegIP' -> 'regIP' -> 'extend' ->> 'dataUchNPD',
'YYYY-MM-DD') as reg_npd_date,
to_date(ri.info -> 'svedFL' -> 'svedRegIP' -> 'regIP' -> 'extend' ->> 'dataSnUchNPD',
'YYYY-MM-DD') as dereg_npd_date,
true as actual
from recruits_info ri
where ri.info -> 'svedFL' -> 'svedRegIP' -> 'regIP' ->> 'aktRegIP' != 'null'
and to_date(ri.info -> 'svedFL' -> 'svedRegIP' -> 'regIP' -> 'aktRegIP' ->> 'dataSved',
'YYYY-MM-DD') > '${MAX_SOURCE_UPDATE_DATE}'
and COALESCE(ri.current_recruitment_id, ri.target_recruitment_id) = '${IDM_ID}'
union all
select ri.recruit_id,
ri.info -> 'svedFL' -> 'svedRegIP' -> 'regIP' ->> 'id' as source_id,
ri.info -> 'svedFL' -> 'svedRegIP' -> 'regIP' -> 'predRegIP' as ip_elem,
to_date(ri.info -> 'svedFL' -> 'svedRegIP' -> 'regIP' -> 'extend' ->> 'dataUchNPD',
'YYYY-MM-DD') as reg_npd_date,
to_date(ri.info -> 'svedFL' -> 'svedRegIP' -> 'regIP' -> 'extend' ->> 'dataSnUchNPD',
'YYYY-MM-DD') as dereg_npd_date,
false as actual
from recruits_info ri
where ri.info -> 'svedFL' -> 'svedRegIP' -> 'regIP' ->> 'predRegIP' != 'null'
and to_date(ri.info -> 'svedFL' -> 'svedRegIP' -> 'regIP' -> 'predRegIP' ->> 'dataSved',
'YYYY-MM-DD') > '${MAX_SOURCE_UPDATE_DATE}'
and COALESCE(ri.current_recruitment_id, ri.target_recruitment_id) = '${IDM_ID}')
select fd.recruit_id,
fd.source_id,
ip_elem ->> 'ogrnip' as ogrnip,
to_date(ip_elem ->> 'dataSved', 'YYYY-MM-DD') as source_update_date,
to_date(ip_elem ->> 'dataRegIP', 'YYYY-MM-DD') as registration_date,
to_date(ip_elem ->> 'dataINNNed', 'YYYY-MM-DD') as deregistration_date,
reg_npd_date,
dereg_npd_date,
fd.actual as actual
from filtered_data fd;</sql>
<variables_active>Y</variables_active>
<compare>
<condition>
<conditions>
</conditions>
<function>=</function>
<leftvalue>pred_need_update</leftvalue>
<negated>N</negated>
<operator>-</operator>
<value>
<isnull>N</isnull>
<length>-1</length>
<name>constant</name>
<precision>-1</precision>
<text>Y</text>
<type>Boolean</type>
</value>
</condition>
</compare>
<send_true_to>Detect empty stream 3</send_true_to>
<attributes/>
<GUI>
<xloc>496</xloc>
<yloc>448</yloc>
<xloc>848</xloc>
<yloc>544</yloc>
</GUI>
</transform>
<transform>
@ -503,72 +603,60 @@ from filtered_data fd;</sql>
<update_bypassed>N</update_bypassed>
<attributes/>
<GUI>
<xloc>1104</xloc>
<yloc>752</yloc>
<xloc>1216</xloc>
<yloc>656</yloc>
</GUI>
</transform>
<transform>
<name>individual_entrepreneur_upsert</name>
<type>InsertUpdate</type>
<name>ip_input</name>
<type>TableInput</type>
<description/>
<distribute>Y</distribute>
<distribute>N</distribute>
<custom_distribution/>
<copies>1</copies>
<partitioning>
<method>none</method>
<schema_name/>
</partitioning>
<commit>10000</commit>
<connection>ervu-dashboard-test</connection>
<lookup>
<key>
<condition>=</condition>
<field>source_id</field>
<name>source_id</name>
</key>
<schema>ervu_dashboard</schema>
<table>individual_entrepreneur</table>
<value>
<name>recruit_id</name>
<rename>recruit_id</rename>
<update>N</update>
</value>
<value>
<name>source_id</name>
<rename>source_id</rename>
<update>N</update>
</value>
<value>
<name>ogrnip</name>
<rename>ogrnip</rename>
<update>Y</update>
</value>
<value>
<name>source_update_date</name>
<rename>source_update_date</rename>
<update>Y</update>
</value>
<value>
<name>registration_date</name>
<rename>registration_date</rename>
<update>Y</update>
</value>
<value>
<name>deregistration_date</name>
<rename>dereg_npd_date</rename>
<update>Y</update>
</value>
<value>
<name>actual</name>
<rename>actual</rename>
<update>Y</update>
</value>
</lookup>
<update_bypassed>N</update_bypassed>
<connection>ervu-dashboard</connection>
<execute_each_row>N</execute_each_row>
<limit>1</limit>
<sql>with filtered_data as (select ri.recruit_id,
ri.info -> 'svedFL' -> 'svedRegIP' -> 'regIP' ->> 'id' as source_id,
ri.info -> 'svedFL' -> 'svedRegIP' -> 'regIP' -> 'aktRegIP' -> 0 as akt_ip_elem,
ri.info -> 'svedFL' -> 'svedRegIP' -> 'regIP' -> 'predRegIP' -> 0 as pred_ip_elem,
to_date(ri.info -> 'svedFL' -> 'svedRegIP' -> 'regIP' -> 'extend' ->> 'dataUchNPD',
'YYYY-MM-DD') as reg_npd_date,
to_date(ri.info -> 'svedFL' -> 'svedRegIP' -> 'regIP' -> 'extend' ->> 'dataSnUchNPD',
'YYYY-MM-DD') as dereg_npd_date
from recruits_info ri
where ri.info -> 'svedFL' -> 'svedRegIP' -> 'regIP' ->> 'aktRegIP' != 'null'
and (to_date(ri.info -> 'svedFL' -> 'svedRegIP' -> 'regIP' -> 'aktRegIP' ->> 'dataSved',
'YYYY-MM-DD') > '${MAX_SOURCE_UPDATE_DATE}'
or to_date(ri.info -> 'svedFL' -> 'svedRegIP' -> 'regIP' -> 'predRegIP' ->> 'dataSved',
'YYYY-MM-DD') > '${MAX_SOURCE_UPDATE_DATE}')
and COALESCE(ri.current_recruitment_id, ri.target_recruitment_id) = '${IDM_ID}')
select fd.recruit_id,
fd.source_id,
reg_npd_date,
dereg_npd_date,
akt_ip_elem ->> 'ogrnip' as akt_ogrnip,
to_date(akt_ip_elem ->> 'dataSved', 'YYYY-MM-DD') as akt_source_update_date,
to_date(akt_ip_elem ->> 'dataRegIP', 'YYYY-MM-DD') as akt_registration_date,
jsonb_typeof(akt_ip_elem) IS NULL as delete_akt,
to_date(fd.akt_ip_elem ->> 'dataSved', 'YYYY-MM-DD') > '${MAX_SOURCE_UPDATE_DATE}' as akt_need_update,
pred_ip_elem ->> 'ogrnip' as pred_ogrnip,
to_date(pred_ip_elem ->> 'dataSved', 'YYYY-MM-DD') as pred_source_update_date,
to_date(pred_ip_elem ->> 'dataRegIP', 'YYYY-MM-DD') as pred_registration_date,
to_date(pred_ip_elem ->> 'dataINNNed', 'YYYY-MM-DD') as pred_deregistration_date,
to_date(fd.pred_ip_elem ->> 'dataSved', 'YYYY-MM-DD') > '${MAX_SOURCE_UPDATE_DATE}' as pred_need_update
from filtered_data fd;
</sql>
<variables_active>Y</variables_active>
<attributes/>
<GUI>
<xloc>1328</xloc>
<yloc>448</yloc>
<xloc>592</xloc>
<yloc>432</yloc>
</GUI>
</transform>
<transform>
@ -592,13 +680,13 @@ from filtered_data fd;</sql>
</fields>
<attributes/>
<GUI>
<xloc>912</xloc>
<yloc>752</yloc>
<xloc>1024</xloc>
<yloc>656</yloc>
</GUI>
</transform>
<transform>
<name>Block until transforms finish</name>
<type>BlockUntilTransformsFinish</type>
<name>update_akt_flag</name>
<type>Update</type>
<description/>
<distribute>Y</distribute>
<custom_distribution/>
@ -607,37 +695,201 @@ from filtered_data fd;</sql>
<method>none</method>
<schema_name/>
</partitioning>
<transforms>
<transform>
<CopyNr/>
<name>Update</name>
</transform>
<transform>
<CopyNr/>
<name>individual_entrepreneur_npd_upsert</name>
</transform>
</transforms>
<commit>10000</commit>
<connection>ervu-dashboard</connection>
<error_ignored>N</error_ignored>
<lookup>
<key>
<condition>&lt;></condition>
<field>individual_entrepreneur</field>
<name>akt_actual_employer</name>
</key>
<key>
<condition>=</condition>
<field>recruit_id</field>
<name>recruit_id</name>
</key>
<schema>ervu_dashboard</schema>
<table>citizen</table>
<value>
<name>individual_entrepreneur</name>
<rename>delete_akt</rename>
</value>
</lookup>
<skip_lookup>N</skip_lookup>
<use_batch>Y</use_batch>
<attributes/>
<GUI>
<xloc>1568</xloc>
<yloc>448</yloc>
<xloc>1392</xloc>
<yloc>432</yloc>
</GUI>
</transform>
<transform>
<name>update_akt_flag_on_delete</name>
<type>Update</type>
<description/>
<distribute>Y</distribute>
<custom_distribution/>
<copies>1</copies>
<partitioning>
<method>none</method>
<schema_name/>
</partitioning>
<commit>10000</commit>
<connection>ervu-dashboard</connection>
<error_ignored>N</error_ignored>
<lookup>
<key>
<condition>=</condition>
<field>employed</field>
<name>delete_akt</name>
</key>
<key>
<condition>=</condition>
<field>recruit_id</field>
<name>recruit_id</name>
</key>
<schema>ervu_dashboard</schema>
<table>citizen</table>
<value>
<name>individual_entrepreneur</name>
<rename>akt_need_update</rename>
</value>
</lookup>
<skip_lookup>N</skip_lookup>
<use_batch>Y</use_batch>
<attributes/>
<GUI>
<xloc>1392</xloc>
<yloc>320</yloc>
</GUI>
</transform>
<transform>
<name>upsert_akt</name>
<type>InsertUpdate</type>
<description/>
<distribute>Y</distribute>
<custom_distribution/>
<copies>1</copies>
<partitioning>
<method>none</method>
<schema_name/>
</partitioning>
<commit>10000</commit>
<connection>ervu-dashboard</connection>
<lookup>
<key>
<condition>=</condition>
<field>source_id</field>
<name>source_id</name>
</key>
<key>
<condition>=</condition>
<field>actual</field>
<name>akt_need_update</name>
</key>
<schema>ervu_dashboard</schema>
<table>individual_entrepreneur</table>
<value>
<name>recruit_id</name>
<rename>recruit_id</rename>
<update>N</update>
</value>
<value>
<name>source_id</name>
<rename>source_id</rename>
<update>N</update>
</value>
<value>
<name>ogrnip</name>
<rename>akt_ogrnip</rename>
<update>Y</update>
</value>
<value>
<name>source_update_date</name>
<rename>akt_source_update_date</rename>
<update>Y</update>
</value>
<value>
<name>registration_date</name>
<rename>akt_registration_date</rename>
<update>Y</update>
</value>
</lookup>
<update_bypassed>N</update_bypassed>
<attributes/>
<GUI>
<xloc>1200</xloc>
<yloc>432</yloc>
</GUI>
</transform>
<transform>
<name>upsert_pred</name>
<type>InsertUpdate</type>
<description/>
<distribute>Y</distribute>
<custom_distribution/>
<copies>1</copies>
<partitioning>
<method>none</method>
<schema_name/>
</partitioning>
<commit>10000</commit>
<connection>ervu-dashboard</connection>
<lookup>
<key>
<condition>=</condition>
<field>source_id</field>
<name>source_id</name>
</key>
<key>
<condition>&lt;></condition>
<field>actual</field>
<name>pred_need_update</name>
</key>
<schema>ervu_dashboard</schema>
<table>individual_entrepreneur</table>
<value>
<name>recruit_id</name>
<rename>recruit_id</rename>
<update>N</update>
</value>
<value>
<name>source_id</name>
<rename>source_id</rename>
<update>N</update>
</value>
<value>
<name>ogrnip</name>
<rename>pred_ogrnip</rename>
<update>Y</update>
</value>
<value>
<name>source_update_date</name>
<rename>pred_source_update_date</rename>
<update>Y</update>
</value>
<value>
<name>registration_date</name>
<rename>pred_registration_date</rename>
<update>Y</update>
</value>
<value>
<name>deregistration_date</name>
<rename>pred_deregistration_date</rename>
<update>Y</update>
</value>
</lookup>
<update_bypassed>N</update_bypassed>
<attributes/>
<GUI>
<xloc>1392</xloc>
<yloc>544</yloc>
</GUI>
</transform>
<transform_error_handling>
<error>
<source_transform>Update</source_transform>
<target_transform>Change job status on error</target_transform>
<is_enabled>Y</is_enabled>
<nr_valuename/>
<descriptions_valuename/>
<fields_valuename/>
<codes_valuename/>
<max_errors/>
<max_pct_errors/>
<min_pct_rows/>
</error>
<error>
<source_transform>individual_entrepreneur_npd_upsert</source_transform>
<source_transform>delete_akt_become_null</source_transform>
<target_transform>Change job status on error</target_transform>
<is_enabled>Y</is_enabled>
<nr_valuename/>
@ -649,7 +901,55 @@ from filtered_data fd;</sql>
<min_pct_rows/>
</error>
<error>
<source_transform>individual_entrepreneur_upsert</source_transform>
<source_transform>individual_entrepreneur_npd_upsert</source_transform>
<target_transform>Change job status on error</target_transform>
<is_enabled>Y</is_enabled>
<nr_valuename/>
<descriptions_valuename/>
<fields_valuename/>
<codes_valuename/>
<max_errors/>
<max_pct_errors/>
<min_pct_rows/>
</error>
<error>
<source_transform>update_akt_flag</source_transform>
<target_transform>Change job status on error</target_transform>
<is_enabled>Y</is_enabled>
<nr_valuename/>
<descriptions_valuename>error_description</descriptions_valuename>
<fields_valuename/>
<codes_valuename/>
<max_errors/>
<max_pct_errors/>
<min_pct_rows/>
</error>
<error>
<source_transform>update_akt_flag_on_delete</source_transform>
<target_transform>Change job status on error</target_transform>
<is_enabled>Y</is_enabled>
<nr_valuename/>
<descriptions_valuename>error_description</descriptions_valuename>
<fields_valuename/>
<codes_valuename/>
<max_errors/>
<max_pct_errors/>
<min_pct_rows/>
</error>
<error>
<source_transform>upsert_akt</source_transform>
<target_transform>Change job status on error</target_transform>
<is_enabled>Y</is_enabled>
<nr_valuename/>
<descriptions_valuename>error_description</descriptions_valuename>
<fields_valuename/>
<codes_valuename/>
<max_errors/>
<max_pct_errors/>
<min_pct_rows/>
</error>
<error>
<source_transform>upsert_pred</source_transform>
<target_transform>Change job status on error</target_transform>
<is_enabled>Y</is_enabled>
<nr_valuename/>