predictive_metrics

This commit is contained in:
Timur 2025-02-27 14:09:03 +03:00
parent 71c554d36b
commit 6f41e1c6ef
4 changed files with 1029 additions and 0 deletions

View file

@ -0,0 +1,207 @@
<?xml version="1.0" encoding="UTF-8"?>
<pipeline>
<info>
<name>data_collection</name>
<name_sync_with_filename>Y</name_sync_with_filename>
<description/>
<extended_description/>
<pipeline_version/>
<pipeline_type>Normal</pipeline_type>
<parameters>
</parameters>
<capture_transform_performance>N</capture_transform_performance>
<transform_performance_capturing_delay>1000</transform_performance_capturing_delay>
<transform_performance_capturing_size_limit>100</transform_performance_capturing_size_limit>
<created_user>-</created_user>
<created_date>2025/02/27 12:45:14.184</created_date>
<modified_user>-</modified_user>
<modified_date>2025/02/27 12:45:14.184</modified_date>
</info>
<notepads>
</notepads>
<order>
<hop>
<from>Table input</from>
<to>Merge join</to>
<enabled>Y</enabled>
</hop>
<hop>
<from>Table input 2</from>
<to>Merge join</to>
<enabled>Y</enabled>
</hop>
<hop>
<from>Merge join</from>
<to>Table output</to>
<enabled>Y</enabled>
</hop>
</order>
<transform>
<name>Merge join</name>
<type>MergeJoin</type>
<description/>
<distribute>Y</distribute>
<custom_distribution/>
<copies>1</copies>
<partitioning>
<method>none</method>
<schema_name/>
</partitioning>
<join_type>INNER</join_type>
<keys_1>
<key>recruitment_id</key>
</keys_1>
<keys_2>
<key>idm_id</key>
</keys_2>
<transform1>Table input</transform1>
<transform2>Table input 2</transform2>
<attributes/>
<GUI>
<xloc>768</xloc>
<yloc>368</yloc>
</GUI>
</transform>
<transform>
<name>Table input</name>
<type>TableInput</type>
<description/>
<distribute>Y</distribute>
<custom_distribution/>
<copies>1</copies>
<partitioning>
<method>none</method>
<schema_name/>
</partitioning>
<connection>postgres.person_registry</connection>
<execute_each_row>N</execute_each_row>
<limit>0</limit>
<sql>WITH year_series AS (
SELECT generate_series(
date_trunc('year', CURRENT_DATE) - INTERVAL '54 years',
date_trunc('year', CURRENT_DATE) - INTERVAL '1 year',
INTERVAL '1 year'
)::date AS year_date
)
SELECT
COALESCE(r.target_recruitment_id, r.current_recruitment_id) as recruitment_id,
ys.year_date AS year,
COALESCE(COUNT(r.id) FILTER (
WHERE r.military_registration_date >= ys.year_date
AND r.military_registration_date &lt; ys.year_date + INTERVAL '1 year'
AND (r.vu_unset_date IS NULL OR r.vu_unset_date &lt; ys.year_date)
), 0) AS count_all,
COALESCE(COUNT(r.id) FILTER (
WHERE r.military_registration_date >= ys.year_date
AND r.military_registration_date &lt; ys.year_date + INTERVAL '1 year'
AND r.registration_reasons @> '["3"]'::jsonb
), 0) AS reaching_17_age,
COALESCE(COUNT(r.id) FILTER (
WHERE r.military_registration_date >= ys.year_date
AND r.military_registration_date &lt; ys.year_date + INTERVAL '1 year'
AND r.registration_reasons @> '["5"]'::jsonb
), 0) AS women_military_specialty,
'Department' as schema
FROM year_series ys
LEFT JOIN recruits r ON 1=1 -- Соединяем со всеми записями, фильтрация происходит в FILTER
GROUP BY ys.year_date, COALESCE(r.target_recruitment_id, r.current_recruitment_id)
ORDER BY recruitment_id,year DESC;</sql>
<variables_active>N</variables_active>
<attributes/>
<GUI>
<xloc>544</xloc>
<yloc>304</yloc>
</GUI>
</transform>
<transform>
<name>Table input 2</name>
<type>TableInput</type>
<description/>
<distribute>Y</distribute>
<custom_distribution/>
<copies>1</copies>
<partitioning>
<method>none</method>
<schema_name/>
</partitioning>
<connection>ervu-dashboard</connection>
<execute_each_row>N</execute_each_row>
<limit>0</limit>
<sql>-- Берем только записи с максимальной версией для каждого idm_id
SELECT DISTINCT ON (idm_id) idm_id
FROM ervu_dashboard.recruitment
WHERE schema = 'Department'
ORDER BY idm_id, updated_at DESC
</sql>
<variables_active>N</variables_active>
<attributes/>
<GUI>
<xloc>544</xloc>
<yloc>496</yloc>
</GUI>
</transform>
<transform>
<name>Table output</name>
<type>TableOutput</type>
<description/>
<distribute>Y</distribute>
<custom_distribution/>
<copies>1</copies>
<partitioning>
<method>none</method>
<schema_name/>
</partitioning>
<commit>1000</commit>
<connection>ervu-dashboard</connection>
<fields>
<field>
<column_name>recruitment_id</column_name>
<stream_name>recruitment_id</stream_name>
</field>
<field>
<column_name>recording_date</column_name>
<stream_name>year</stream_name>
</field>
<field>
<column_name>count_all</column_name>
<stream_name>count_all</stream_name>
</field>
<field>
<column_name>reaching_17_age</column_name>
<stream_name>reaching_17_age</stream_name>
</field>
<field>
<column_name>women_military_specialty</column_name>
<stream_name>women_military_specialty</stream_name>
</field>
<field>
<column_name>schema</column_name>
<stream_name>schema</stream_name>
</field>
</fields>
<ignore_errors>N</ignore_errors>
<only_when_have_rows>N</only_when_have_rows>
<partitioning_daily>N</partitioning_daily>
<partitioning_enabled>N</partitioning_enabled>
<partitioning_field/>
<partitioning_monthly>Y</partitioning_monthly>
<return_field/>
<return_keys>N</return_keys>
<schema>forecast</schema>
<specify_fields>Y</specify_fields>
<table>registered_citizens</table>
<tablename_field/>
<tablename_in_field>N</tablename_in_field>
<tablename_in_table>Y</tablename_in_table>
<truncate>Y</truncate>
<use_batch>Y</use_batch>
<attributes/>
<GUI>
<xloc>960</xloc>
<yloc>368</yloc>
</GUI>
</transform>
<transform_error_handling>
</transform_error_handling>
<attributes/>
</pipeline>

View file

@ -0,0 +1,177 @@
<?xml version="1.0" encoding="UTF-8"?>
<pipeline>
<info>
<name>next_lvl_org_pm</name>
<name_sync_with_filename>Y</name_sync_with_filename>
<description/>
<extended_description/>
<pipeline_version/>
<pipeline_type>Normal</pipeline_type>
<parameters>
</parameters>
<capture_transform_performance>N</capture_transform_performance>
<transform_performance_capturing_delay>1000</transform_performance_capturing_delay>
<transform_performance_capturing_size_limit>100</transform_performance_capturing_size_limit>
<created_user>-</created_user>
<created_date>2025/02/27 12:55:12.966</created_date>
<modified_user>-</modified_user>
<modified_date>2025/02/27 12:55:12.966</modified_date>
</info>
<notepads>
</notepads>
<order>
<hop>
<from>Table input</from>
<to>Table output</to>
<enabled>Y</enabled>
</hop>
</order>
<transform>
<name>Table input</name>
<type>TableInput</type>
<description/>
<distribute>Y</distribute>
<custom_distribution/>
<copies>1</copies>
<partitioning>
<method>none</method>
<schema_name/>
</partitioning>
<connection>ervu-dashboard</connection>
<execute_each_row>N</execute_each_row>
<limit>0</limit>
<sql>WITH latest_hierarchy AS (
-- Берем только записи с максимальной версией для каждого idm_id
SELECT DISTINCT ON (idm_id) *
FROM ervu_dashboard.recruitment
ORDER BY idm_id, updated_at DESC
),
hierarchy_cte AS (
-- Строим связи между уровнями (Department → Organization → Region → Ministry)
SELECT
h.idm_id AS department_id,
h.parent_id AS organization_id,
h2.parent_id AS region_id,
h3.parent_id AS ministry_id
FROM latest_hierarchy h
LEFT JOIN latest_hierarchy h2 ON h.parent_id = h2.idm_id -- Organization
LEFT JOIN latest_hierarchy h3 ON h2.parent_id = h3.idm_id -- Region
WHERE h.schema = 'Department' -- Начинаем с Department
),
aggregated_counts AS (
-- Агрегируем для Organization
SELECT
h.organization_id AS level_id,
SUM(r.reaching_17_age) AS total_reaching_17_age,
SUM(r.women_military_specialty) AS total_women_military_specialty,
SUM(r.count_all) AS total_count_all,
recording_date AS recording_date,
'Organization' AS level
FROM forecast.registered_citizens r
JOIN hierarchy_cte h ON r.recruitment_id = h.department_id
WHERE h.organization_id IS NOT NULL
GROUP BY h.organization_id, recording_date
UNION ALL
-- Агрегируем для Region
SELECT
h.region_id AS level_id,
SUM(r.reaching_17_age) AS total_reaching_17_age,
SUM(r.women_military_specialty) AS total_women_military_specialty,
SUM(r.count_all) AS total_count_all,
recording_date AS recording_date,
'Region' AS level
FROM forecast.registered_citizens r
JOIN hierarchy_cte h ON r.recruitment_id = h.department_id
WHERE h.region_id IS NOT NULL
GROUP BY h.region_id, recording_date
UNION ALL
-- Агрегируем для Ministry
SELECT
h.ministry_id AS level_id,
SUM(r.reaching_17_age) AS total_reaching_17_age,
SUM(r.women_military_specialty) AS total_women_military_specialty,
SUM(r.count_all) AS total_count_all,
recording_date AS recording_date,
'Ministry' AS level
FROM forecast.registered_citizens r
JOIN hierarchy_cte h ON r.recruitment_id = h.department_id
WHERE h.ministry_id IS NOT NULL
GROUP BY h.ministry_id, recording_date
)
SELECT * FROM aggregated_counts;</sql>
<variables_active>N</variables_active>
<attributes/>
<GUI>
<xloc>672</xloc>
<yloc>304</yloc>
</GUI>
</transform>
<transform>
<name>Table output</name>
<type>TableOutput</type>
<description/>
<distribute>Y</distribute>
<custom_distribution/>
<copies>1</copies>
<partitioning>
<method>none</method>
<schema_name/>
</partitioning>
<commit>1000</commit>
<connection>ervu-dashboard</connection>
<fields>
<field>
<column_name>recruitment_id</column_name>
<stream_name>level_id</stream_name>
</field>
<field>
<column_name>reaching_17_age</column_name>
<stream_name>total_reaching_17_age</stream_name>
</field>
<field>
<column_name>women_military_specialty</column_name>
<stream_name>total_women_military_specialty</stream_name>
</field>
<field>
<column_name>count_all</column_name>
<stream_name>total_count_all</stream_name>
</field>
<field>
<column_name>recording_date</column_name>
<stream_name>recording_date</stream_name>
</field>
<field>
<column_name>schema</column_name>
<stream_name>level</stream_name>
</field>
</fields>
<ignore_errors>N</ignore_errors>
<only_when_have_rows>N</only_when_have_rows>
<partitioning_daily>N</partitioning_daily>
<partitioning_enabled>N</partitioning_enabled>
<partitioning_field/>
<partitioning_monthly>Y</partitioning_monthly>
<return_field/>
<return_keys>N</return_keys>
<schema>forecast</schema>
<specify_fields>Y</specify_fields>
<table>registered_citizens</table>
<tablename_field/>
<tablename_in_field>N</tablename_in_field>
<tablename_in_table>Y</tablename_in_table>
<truncate>N</truncate>
<use_batch>Y</use_batch>
<attributes/>
<GUI>
<xloc>912</xloc>
<yloc>304</yloc>
</GUI>
</transform>
<transform_error_handling>
</transform_error_handling>
<attributes/>
</pipeline>

View file

@ -0,0 +1,165 @@
<?xml version="1.0" encoding="UTF-8"?>
<workflow>
<name>predictive</name>
<name_sync_with_filename>Y</name_sync_with_filename>
<description/>
<extended_description/>
<workflow_version/>
<created_user>-</created_user>
<created_date>2025/02/27 13:51:40.861</created_date>
<modified_user>-</modified_user>
<modified_date>2025/02/27 13:51:40.861</modified_date>
<parameters>
</parameters>
<actions>
<action>
<name>Start</name>
<description/>
<type>SPECIAL</type>
<attributes/>
<DayOfMonth>1</DayOfMonth>
<hour>12</hour>
<intervalMinutes>60</intervalMinutes>
<intervalSeconds>0</intervalSeconds>
<minutes>0</minutes>
<repeat>N</repeat>
<schedulerType>0</schedulerType>
<weekDay>1</weekDay>
<parallel>N</parallel>
<xloc>304</xloc>
<yloc>352</yloc>
<attributes_hac/>
</action>
<action>
<name>data_collection.hpl</name>
<description/>
<type>PIPELINE</type>
<attributes/>
<add_date>N</add_date>
<add_time>N</add_time>
<clear_files>N</clear_files>
<clear_rows>N</clear_rows>
<create_parent_folder>N</create_parent_folder>
<exec_per_row>N</exec_per_row>
<filename>${PROJECT_HOME}/predictive_metrics/data_collection.hpl</filename>
<logext/>
<logfile/>
<loglevel>Basic</loglevel>
<parameters>
<pass_all_parameters>Y</pass_all_parameters>
</parameters>
<params_from_previous>N</params_from_previous>
<run_configuration>local</run_configuration>
<set_append_logfile>N</set_append_logfile>
<set_logfile>N</set_logfile>
<wait_until_finished>Y</wait_until_finished>
<parallel>N</parallel>
<xloc>448</xloc>
<yloc>352</yloc>
<attributes_hac/>
</action>
<action>
<name>next_lvl_org_pm.hpl</name>
<description/>
<type>PIPELINE</type>
<attributes/>
<add_date>N</add_date>
<add_time>N</add_time>
<clear_files>N</clear_files>
<clear_rows>N</clear_rows>
<create_parent_folder>N</create_parent_folder>
<exec_per_row>N</exec_per_row>
<filename>${PROJECT_HOME}/predictive_metrics/next_lvl_org_pm.hpl</filename>
<logext/>
<logfile/>
<loglevel>Basic</loglevel>
<parameters>
<pass_all_parameters>Y</pass_all_parameters>
</parameters>
<params_from_previous>N</params_from_previous>
<run_configuration>local</run_configuration>
<set_append_logfile>N</set_append_logfile>
<set_logfile>N</set_logfile>
<wait_until_finished>Y</wait_until_finished>
<parallel>N</parallel>
<xloc>592</xloc>
<yloc>352</yloc>
<attributes_hac/>
</action>
<action>
<name>predictive_data.hpl</name>
<description/>
<type>PIPELINE</type>
<attributes/>
<add_date>N</add_date>
<add_time>N</add_time>
<clear_files>N</clear_files>
<clear_rows>N</clear_rows>
<create_parent_folder>N</create_parent_folder>
<exec_per_row>N</exec_per_row>
<filename>${PROJECT_HOME}/predictive_metrics/predictive_data.hpl</filename>
<logext/>
<logfile/>
<loglevel>Basic</loglevel>
<parameters>
<pass_all_parameters>Y</pass_all_parameters>
</parameters>
<params_from_previous>N</params_from_previous>
<run_configuration>local</run_configuration>
<set_append_logfile>N</set_append_logfile>
<set_logfile>N</set_logfile>
<wait_until_finished>Y</wait_until_finished>
<parallel>N</parallel>
<xloc>736</xloc>
<yloc>352</yloc>
<attributes_hac/>
</action>
<action>
<name>Success</name>
<description/>
<type>SUCCESS</type>
<attributes/>
<parallel>N</parallel>
<xloc>880</xloc>
<yloc>352</yloc>
<attributes_hac/>
</action>
</actions>
<hops>
<hop>
<from>Start</from>
<to>data_collection.hpl</to>
<enabled>Y</enabled>
<evaluation>Y</evaluation>
<unconditional>Y</unconditional>
</hop>
<hop>
<from>data_collection.hpl</from>
<to>next_lvl_org_pm.hpl</to>
<enabled>Y</enabled>
<evaluation>Y</evaluation>
<unconditional>N</unconditional>
</hop>
<hop>
<from>next_lvl_org_pm.hpl</from>
<to>predictive_data.hpl</to>
<enabled>Y</enabled>
<evaluation>Y</evaluation>
<unconditional>N</unconditional>
</hop>
<hop>
<from>predictive_data.hpl</from>
<to>Success</to>
<enabled>Y</enabled>
<evaluation>Y</evaluation>
<unconditional>N</unconditional>
</hop>
</hops>
<notepads>
</notepads>
<attributes>
<group>
<name>debug_plugin</name>
</group>
</attributes>
</workflow>

View file

@ -0,0 +1,480 @@
<?xml version="1.0" encoding="UTF-8"?>
<pipeline>
<info>
<name>predictive_data</name>
<name_sync_with_filename>Y</name_sync_with_filename>
<description/>
<extended_description/>
<pipeline_version/>
<pipeline_type>Normal</pipeline_type>
<parameters>
</parameters>
<capture_transform_performance>N</capture_transform_performance>
<transform_performance_capturing_delay>1000</transform_performance_capturing_delay>
<transform_performance_capturing_size_limit>100</transform_performance_capturing_size_limit>
<created_user>-</created_user>
<created_date>2025/02/27 13:48:40.269</created_date>
<modified_user>-</modified_user>
<modified_date>2025/02/27 13:48:40.269</modified_date>
</info>
<notepads>
</notepads>
<order>
<hop>
<from>Table input</from>
<to>Table output</to>
<enabled>Y</enabled>
</hop>
</order>
<transform>
<name>Table input</name>
<type>TableInput</type>
<description/>
<distribute>Y</distribute>
<custom_distribution/>
<copies>1</copies>
<partitioning>
<method>none</method>
<schema_name/>
</partitioning>
<connection>ervu-dashboard</connection>
<execute_each_row>N</execute_each_row>
<limit>0</limit>
<sql>-- Адаптированный запрос для таблицы forecast.registered_citizens с учетом указанной структуры
-- recording_date - год наблюдения
-- recruitment_id - идентификатор военкомата
-- count_all - всего на учете
-- reaching_17_age - поставлено на учет по достижении 17 лет
-- women_military_specialty - женщины, получившие ВУС
-- schema - уровень военкомата
-- 1. Проверяем наличие данных за весь необходимый период и создаем полную сетку лет
WITH year_range AS (
SELECT
EXTRACT(YEAR FROM CURRENT_DATE) - 54 AS min_year, -- Для первых двух показателей
EXTRACT(YEAR FROM CURRENT_DATE) - 34 AS min_year_females, -- Для женщин с ВУС
EXTRACT(YEAR FROM CURRENT_DATE) AS max_year
),
all_years AS (
SELECT generate_series(min_year, max_year) AS year
FROM year_range
),
all_years_females AS (
SELECT generate_series(min_year_females, max_year) AS year
FROM year_range
),
all_regions AS (
SELECT DISTINCT recruitment_id, schema FROM forecast.registered_citizens
),
-- 2. Создаем полную сетку регион-год для всех показателей
full_grid_males AS (
SELECT
r.recruitment_id,
r.schema,
y.year
FROM all_regions r
CROSS JOIN all_years y
),
full_grid_females AS (
SELECT
r.recruitment_id,
r.schema,
y.year
FROM all_regions r
CROSS JOIN all_years_females y
),
-- 3. Объединяем сетку с имеющимися данными, чтобы выявить пропуски
males_data AS (
SELECT
g.recruitment_id,
g.schema,
g.year,
COALESCE(h.count_all, NULL) AS count_all,
COALESCE(h.reaching_17_age, NULL) AS reaching_17_age
FROM full_grid_males g
LEFT JOIN forecast.registered_citizens h ON g.recruitment_id = h.recruitment_id AND g.year = EXTRACT(YEAR FROM h.recording_date)
),
females_data AS (
SELECT
g.recruitment_id,
g.schema,
g.year,
COALESCE(h.women_military_specialty, NULL) AS women_military_specialty
FROM full_grid_females g
LEFT JOIN forecast.registered_citizens h ON g.recruitment_id = h.recruitment_id AND g.year = EXTRACT(YEAR FROM h.recording_date)
),
-- 4. Рассчитываем статистики для выявления выбросов
males_stats AS (
SELECT
recruitment_id,
schema,
AVG(count_all) AS avg_count_all,
STDDEV(count_all) * 2 AS stddev_count_all, -- 2σ для фильтрации выбросов
AVG(reaching_17_age) AS avg_reaching_17_age,
STDDEV(reaching_17_age) * 2 AS stddev_reaching_17_age -- 2σ для фильтрации выбросов
FROM males_data
WHERE count_all IS NOT NULL AND reaching_17_age IS NOT NULL
GROUP BY recruitment_id, schema
),
females_stats AS (
SELECT
recruitment_id,
schema,
AVG(women_military_specialty) AS avg_women_military_specialty,
STDDEV(women_military_specialty) * 2 AS stddev_women_military_specialty -- 2σ для фильтрации выбросов
FROM females_data
WHERE women_military_specialty IS NOT NULL
GROUP BY recruitment_id, schema
),
-- 5. Выявляем выбросы и помечаем их для интерполяции
males_outliers AS (
SELECT
m.recruitment_id,
m.schema,
m.year,
m.count_all,
CASE
WHEN m.count_all IS NULL OR ABS(m.count_all - s.avg_count_all) > s.stddev_count_all
THEN TRUE ELSE FALSE
END AS count_all_is_outlier,
m.reaching_17_age,
CASE
WHEN m.reaching_17_age IS NULL OR ABS(m.reaching_17_age - s.avg_reaching_17_age) > s.stddev_reaching_17_age
THEN TRUE ELSE FALSE
END AS reaching_17_age_is_outlier
FROM males_data m
JOIN males_stats s ON m.recruitment_id = s.recruitment_id AND m.schema = s.schema
),
females_outliers AS (
SELECT
f.recruitment_id,
f.schema,
f.year,
f.women_military_specialty,
CASE
WHEN f.women_military_specialty IS NULL OR ABS(f.women_military_specialty - s.avg_women_military_specialty) > s.stddev_women_military_specialty
THEN TRUE ELSE FALSE
END AS women_military_specialty_is_outlier
FROM females_data f
JOIN females_stats s ON f.recruitment_id = s.recruitment_id AND f.schema = s.schema
),
-- 6. Подготовка данных для интерполяции (находим ближайшие НЕ выбросы)
males_interpolation_prep AS (
SELECT
mo.recruitment_id,
mo.schema,
mo.year,
mo.count_all,
mo.count_all_is_outlier,
mo.reaching_17_age,
mo.reaching_17_age_is_outlier,
-- Для интерполяции "Всего на учете" ищем ближайшие не-выбросы
(SELECT MAX(year) FROM males_outliers
WHERE recruitment_id = mo.recruitment_id AND schema = mo.schema AND year &lt; mo.year AND NOT count_all_is_outlier) AS prev_year_count_all,
(SELECT MIN(year) FROM males_outliers
WHERE recruitment_id = mo.recruitment_id AND schema = mo.schema AND year > mo.year AND NOT count_all_is_outlier) AS next_year_count_all,
-- Для интерполяции "Поставлено на учет по достижении 17 лет" ищем ближайшие не-выбросы
(SELECT MAX(year) FROM males_outliers
WHERE recruitment_id = mo.recruitment_id AND schema = mo.schema AND year &lt; mo.year AND NOT reaching_17_age_is_outlier) AS prev_year_reaching_17_age,
(SELECT MIN(year) FROM males_outliers
WHERE recruitment_id = mo.recruitment_id AND schema = mo.schema AND year > mo.year AND NOT reaching_17_age_is_outlier) AS next_year_reaching_17_age
FROM males_outliers mo
),
females_interpolation_prep AS (
SELECT
fo.recruitment_id,
fo.schema,
fo.year,
fo.women_military_specialty,
fo.women_military_specialty_is_outlier,
-- Для интерполяции "Женщины, получившие ВУС" ищем ближайшие не-выбросы
(SELECT MAX(year) FROM females_outliers
WHERE recruitment_id = fo.recruitment_id AND schema = fo.schema AND year &lt; fo.year AND NOT women_military_specialty_is_outlier) AS prev_year_women_military_specialty,
(SELECT MIN(year) FROM females_outliers
WHERE recruitment_id = fo.recruitment_id AND schema = fo.schema AND year > fo.year AND NOT women_military_specialty_is_outlier) AS next_year_women_military_specialty
FROM females_outliers fo
),
-- 7. Выполняем линейную интерполяцию для выбросов и отсутствующих значений
males_interpolated AS (
SELECT
m.recruitment_id,
m.schema,
m.year,
CASE
WHEN m.count_all_is_outlier AND m.prev_year_count_all IS NOT NULL AND m.next_year_count_all IS NOT NULL THEN
-- Linear interpolation formula: y = y1 + ((x - x1) / (x2 - x1)) * (y2 - y1)
(SELECT p.count_all FROM males_outliers p
WHERE p.recruitment_id = m.recruitment_id AND p.schema = m.schema AND p.year = m.prev_year_count_all) +
((m.year - m.prev_year_count_all)::float / (m.next_year_count_all - m.prev_year_count_all)) *
((SELECT n.count_all FROM males_outliers n
WHERE n.recruitment_id = m.recruitment_id AND n.schema = m.schema AND n.year = m.next_year_count_all) -
(SELECT p.count_all FROM males_outliers p
WHERE p.recruitment_id = m.recruitment_id AND p.schema = m.schema AND p.year = m.prev_year_count_all))
ELSE m.count_all
END AS count_all,
CASE
WHEN m.reaching_17_age_is_outlier AND m.prev_year_reaching_17_age IS NOT NULL AND m.next_year_reaching_17_age IS NOT NULL THEN
-- Linear interpolation formula: y = y1 + ((x - x1) / (x2 - x1)) * (y2 - y1)
(SELECT p.reaching_17_age FROM males_outliers p
WHERE p.recruitment_id = m.recruitment_id AND p.schema = m.schema AND p.year = m.prev_year_reaching_17_age) +
((m.year - m.prev_year_reaching_17_age)::float / (m.next_year_reaching_17_age - m.prev_year_reaching_17_age)) *
((SELECT n.reaching_17_age FROM males_outliers n
WHERE n.recruitment_id = m.recruitment_id AND n.schema = m.schema AND n.year = m.next_year_reaching_17_age) -
(SELECT p.reaching_17_age FROM males_outliers p
WHERE p.recruitment_id = m.recruitment_id AND p.schema = m.schema AND p.year = m.prev_year_reaching_17_age))
ELSE m.reaching_17_age
END AS reaching_17_age
FROM males_interpolation_prep m
),
females_interpolated AS (
SELECT
f.recruitment_id,
f.schema,
f.year,
CASE
WHEN f.women_military_specialty_is_outlier AND f.prev_year_women_military_specialty IS NOT NULL AND f.next_year_women_military_specialty IS NOT NULL THEN
-- Linear interpolation formula: y = y1 + ((x - x1) / (x2 - x1)) * (y2 - y1)
(SELECT p.women_military_specialty FROM females_outliers p
WHERE p.recruitment_id = f.recruitment_id AND p.schema = f.schema AND p.year = f.prev_year_women_military_specialty) +
((f.year - f.prev_year_women_military_specialty)::float / (f.next_year_women_military_specialty - f.prev_year_women_military_specialty)) *
((SELECT n.women_military_specialty FROM females_outliers n
WHERE n.recruitment_id = f.recruitment_id AND n.schema = f.schema AND n.year = f.next_year_women_military_specialty) -
(SELECT p.women_military_specialty FROM females_outliers p
WHERE p.recruitment_id = f.recruitment_id AND p.schema = f.schema AND p.year = f.prev_year_women_military_specialty))
ELSE f.women_military_specialty
END AS women_military_specialty
FROM females_interpolation_prep f
),
-- 8. Объединяем интерполированные данные
combined_data AS (
SELECT
m.recruitment_id,
m.schema,
m.year,
m.count_all,
m.reaching_17_age,
f.women_military_specialty
FROM males_interpolated m
LEFT JOIN females_interpolated f ON m.recruitment_id = f.recruitment_id AND m.schema = f.schema AND m.year = f.year
),
-- 9. Рассчитываем параметры линейной регрессии
average_values AS (
SELECT
recruitment_id,
schema,
year,
count_all,
reaching_17_age,
women_military_specialty,
-- Вычисляем средние значения отдельно
AVG(year) FILTER (WHERE count_all IS NOT NULL) OVER (PARTITION BY recruitment_id, schema) AS avg_x_count_all,
AVG(count_all) FILTER (WHERE count_all IS NOT NULL) OVER (PARTITION BY recruitment_id, schema) AS avg_y_count_all,
AVG(year) FILTER (WHERE reaching_17_age IS NOT NULL) OVER (PARTITION BY recruitment_id, schema) AS avg_x_reaching_17_age,
AVG(reaching_17_age) FILTER (WHERE reaching_17_age IS NOT NULL) OVER (PARTITION BY recruitment_id, schema) AS avg_y_reaching_17_age,
AVG(year) FILTER (WHERE women_military_specialty IS NOT NULL) OVER (PARTITION BY recruitment_id, schema) AS avg_x_women_military_specialty,
AVG(women_military_specialty) FILTER (WHERE women_military_specialty IS NOT NULL) OVER (PARTITION BY recruitment_id, schema) AS avg_y_women_military_specialty
FROM combined_data
),
regression_stats AS (
SELECT
recruitment_id,
schema,
-- count_all
COUNT(*) FILTER (WHERE count_all IS NOT NULL) AS n_count_all,
AVG(year) FILTER (WHERE count_all IS NOT NULL) AS avg_x_count_all,
AVG(count_all) FILTER (WHERE count_all IS NOT NULL) AS avg_y_count_all,
SUM((year - avg_x_count_all) * (count_all - avg_y_count_all)) FILTER (WHERE count_all IS NOT NULL) AS sum_xy_count_all,
SUM(POWER(year - avg_x_count_all, 2)) FILTER (WHERE count_all IS NOT NULL) AS sum_x_squared_count_all,
-- reaching_17_age
COUNT(*) FILTER (WHERE reaching_17_age IS NOT NULL) AS n_reaching_17_age,
AVG(year) FILTER (WHERE reaching_17_age IS NOT NULL) AS avg_x_reaching_17_age,
AVG(reaching_17_age) FILTER (WHERE reaching_17_age IS NOT NULL) AS avg_y_reaching_17_age,
SUM((year - avg_x_reaching_17_age) * (reaching_17_age - avg_y_reaching_17_age)) FILTER (WHERE reaching_17_age IS NOT NULL) AS sum_xy_reaching_17_age,
SUM(POWER(year - avg_x_reaching_17_age, 2)) FILTER (WHERE reaching_17_age IS NOT NULL) AS sum_x_squared_reaching_17_age,
-- women_military_specialty
COUNT(*) FILTER (WHERE women_military_specialty IS NOT NULL) AS n_women_military_specialty,
AVG(year) FILTER (WHERE women_military_specialty IS NOT NULL) AS avg_x_women_military_specialty,
AVG(women_military_specialty) FILTER (WHERE women_military_specialty IS NOT NULL) AS avg_y_women_military_specialty,
SUM((year - avg_x_women_military_specialty) * (women_military_specialty - avg_y_women_military_specialty)) FILTER (WHERE women_military_specialty IS NOT NULL) AS sum_xy_women_military_specialty,
SUM(POWER(year - avg_x_women_military_specialty, 2)) FILTER (WHERE women_military_specialty IS NOT NULL) AS sum_x_squared_women_military_specialty
FROM average_values
GROUP BY recruitment_id, schema
),
-- 10. Вычисляем параметры уравнения линейной регрессии
regression_params AS (
SELECT
recruitment_id,
schema,
-- Параметры для count_all
CASE
WHEN sum_x_squared_count_all &lt;&gt; 0 THEN sum_xy_count_all / sum_x_squared_count_all
ELSE 0
END AS k_count_all,
avg_y_count_all - (CASE
WHEN sum_x_squared_count_all &lt;&gt; 0 THEN sum_xy_count_all / sum_x_squared_count_all
ELSE 0
END) * avg_x_count_all AS b_count_all,
-- Параметры для reaching_17_age
CASE
WHEN sum_x_squared_reaching_17_age &lt;&gt; 0 THEN sum_xy_reaching_17_age / sum_x_squared_reaching_17_age
ELSE 0
END AS k_reaching_17_age,
avg_y_reaching_17_age - (CASE
WHEN sum_x_squared_reaching_17_age &lt;&gt; 0 THEN sum_xy_reaching_17_age / sum_x_squared_reaching_17_age
ELSE 0
END) * avg_x_reaching_17_age AS b_reaching_17_age,
-- Параметры для women_military_specialty
CASE
WHEN sum_x_squared_women_military_specialty &lt;&gt; 0 THEN sum_xy_women_military_specialty / sum_x_squared_women_military_specialty
ELSE 0
END AS k_women_military_specialty,
avg_y_women_military_specialty - (CASE
WHEN sum_x_squared_women_military_specialty &lt;&gt; 0 THEN sum_xy_women_military_specialty / sum_x_squared_women_military_specialty
ELSE 0
END) * avg_x_women_military_specialty AS b_women_military_specialty
FROM regression_stats
)
-- 11. Рассчитываем прогнозные значения на текущий и следующий годы
SELECT
recruitment_id,
schema,
MAKE_DATE(EXTRACT(YEAR FROM CURRENT_DATE)::int, 1, 1) AS current_year,
ROUND(k_count_all * EXTRACT(YEAR FROM CURRENT_DATE) + b_count_all) AS count_all,
ROUND(k_reaching_17_age * EXTRACT(YEAR FROM CURRENT_DATE) + b_reaching_17_age) AS reaching_17_age,
ROUND(k_women_military_specialty * EXTRACT(YEAR FROM CURRENT_DATE) + b_women_military_specialty) AS women_military_specialty,
'Текущий год' AS prediction_type
FROM regression_params
UNION ALL
SELECT
recruitment_id,
schema,
MAKE_DATE(EXTRACT(YEAR FROM CURRENT_DATE)::int + 1, 1, 1) AS next_year,
ROUND(k_count_all * (EXTRACT(YEAR FROM CURRENT_DATE) + 1) + b_count_all) AS count_all,
ROUND(k_reaching_17_age * (EXTRACT(YEAR FROM CURRENT_DATE) + 1) + b_reaching_17_age) AS reaching_17_age,
ROUND(k_women_military_specialty * (EXTRACT(YEAR FROM CURRENT_DATE) + 1) + b_women_military_specialty) AS women_military_specialty,
'Следующий год' AS prediction_type
FROM regression_params
UNION ALL
SELECT
recruitment_id,
schema,
MAKE_DATE(EXTRACT(YEAR FROM CURRENT_DATE)::int + 2, 1, 1) AS next_year,
ROUND(k_count_all * (EXTRACT(YEAR FROM CURRENT_DATE) + 2) + b_count_all) AS count_all,
ROUND(k_reaching_17_age * (EXTRACT(YEAR FROM CURRENT_DATE) + 2) + b_reaching_17_age) AS reaching_17_age,
ROUND(k_women_military_specialty * (EXTRACT(YEAR FROM CURRENT_DATE) + 2) + b_women_military_specialty) AS women_military_specialty,
'Следующий год' AS prediction_type
FROM regression_params
UNION ALL
SELECT
recruitment_id,
schema,
MAKE_DATE(EXTRACT(YEAR FROM CURRENT_DATE)::int + 3, 1, 1) AS next_year,
ROUND(k_count_all * (EXTRACT(YEAR FROM CURRENT_DATE) + 3) + b_count_all) AS count_all,
ROUND(k_reaching_17_age * (EXTRACT(YEAR FROM CURRENT_DATE) + 3) + b_reaching_17_age) AS reaching_17_age,
ROUND(k_women_military_specialty * (EXTRACT(YEAR FROM CURRENT_DATE) + 3) + b_women_military_specialty) AS women_military_specialty,
'Следующий год' AS prediction_type
FROM regression_params
ORDER BY schema, recruitment_id, current_year;
</sql>
<variables_active>N</variables_active>
<attributes/>
<GUI>
<xloc>720</xloc>
<yloc>304</yloc>
</GUI>
</transform>
<transform>
<name>Table output</name>
<type>TableOutput</type>
<description/>
<distribute>Y</distribute>
<custom_distribution/>
<copies>1</copies>
<partitioning>
<method>none</method>
<schema_name/>
</partitioning>
<commit>1000</commit>
<connection>ervu-dashboard</connection>
<fields>
<field>
<column_name>recruitment_id</column_name>
<stream_name>recruitment_id</stream_name>
</field>
<field>
<column_name>schema</column_name>
<stream_name>schema</stream_name>
</field>
<field>
<column_name>recording_date</column_name>
<stream_name>current_year</stream_name>
</field>
<field>
<column_name>count_all</column_name>
<stream_name>count_all</stream_name>
</field>
<field>
<column_name>reaching_17_age</column_name>
<stream_name>reaching_17_age</stream_name>
</field>
<field>
<column_name>women_military_specialty</column_name>
<stream_name>women_military_specialty</stream_name>
</field>
</fields>
<ignore_errors>N</ignore_errors>
<only_when_have_rows>N</only_when_have_rows>
<partitioning_daily>N</partitioning_daily>
<partitioning_enabled>N</partitioning_enabled>
<partitioning_monthly>Y</partitioning_monthly>
<return_keys>N</return_keys>
<schema>forecast</schema>
<specify_fields>Y</specify_fields>
<table>registered_citizens</table>
<tablename_in_field>N</tablename_in_field>
<tablename_in_table>Y</tablename_in_table>
<truncate>N</truncate>
<use_batch>Y</use_batch>
<attributes/>
<GUI>
<xloc>928</xloc>
<yloc>304</yloc>
</GUI>
</transform>
<transform_error_handling>
</transform_error_handling>
<attributes/>
</pipeline>