ervu-dashboard-etl/map_v1/predictive_metrics/predictive_data.hpl
2025-08-12 16:42:25 +03:00

461 lines
20 KiB
XML
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

<?xml version="1.0" encoding="UTF-8"?>
<pipeline>
<info>
<name>predictive_data</name>
<name_sync_with_filename>Y</name_sync_with_filename>
<description/>
<extended_description/>
<pipeline_version/>
<pipeline_type>Normal</pipeline_type>
<parameters>
</parameters>
<capture_transform_performance>N</capture_transform_performance>
<transform_performance_capturing_delay>1000</transform_performance_capturing_delay>
<transform_performance_capturing_size_limit>100</transform_performance_capturing_size_limit>
<created_user>-</created_user>
<created_date>2025/02/27 13:48:40.269</created_date>
<modified_user>-</modified_user>
<modified_date>2025/02/27 13:48:40.269</modified_date>
</info>
<notepads>
</notepads>
<order>
<hop>
<from>Table input</from>
<to>Table output</to>
<enabled>Y</enabled>
</hop>
</order>
<transform>
<name>Table input</name>
<type>TableInput</type>
<description/>
<distribute>Y</distribute>
<custom_distribution/>
<copies>1</copies>
<partitioning>
<method>none</method>
<schema_name/>
</partitioning>
<connection>ervu-dashboard</connection>
<execute_each_row>N</execute_each_row>
<limit>0</limit>
<sql>-- Адаптированный запрос для таблицы forecast.registered_citizens с учетом указанной структуры
-- recording_date - год наблюдения
-- recruitment_id - идентификатор военкомата
-- count_all - всего на учете
-- reaching_17_age - поставлено на учет по достижении 17 лет
-- women_military_specialty - женщины, получившие ВУС
-- schema - уровень военкомата
-- 1. Проверяем наличие данных за весь необходимый период и создаем полную сетку лет
WITH RECURSIVE years AS ( -- Теперь синтаксис правильный
SELECT
EXTRACT(YEAR FROM CURRENT_DATE)::int AS year,
'Текущий год' AS prediction_type
UNION ALL
SELECT
year + 1,
'Прогнозный год'
FROM years
WHERE year &lt; EXTRACT(YEAR FROM CURRENT_DATE)::int + 49 -- +49, т.к. первый год уже включён
),
year_range AS (
SELECT
EXTRACT(YEAR FROM CURRENT_DATE) - 54 AS min_year, -- Для первых двух показателей
EXTRACT(YEAR FROM CURRENT_DATE) - 34 AS min_year_females, -- Для женщин с ВУС
EXTRACT(YEAR FROM CURRENT_DATE) AS max_year
),
all_years AS (
SELECT generate_series(min_year, max_year) AS year
FROM year_range
),
all_years_females AS (
SELECT generate_series(min_year_females, max_year) AS year
FROM year_range
),
all_regions AS (
SELECT DISTINCT recruitment_id, schema FROM forecast.registered_citizens
),
-- 2. Создаем полную сетку регион-год для всех показателей
full_grid_males AS (
SELECT
r.recruitment_id,
r.schema,
y.year
FROM all_regions r
CROSS JOIN all_years y
),
full_grid_females AS (
SELECT
r.recruitment_id,
r.schema,
y.year
FROM all_regions r
CROSS JOIN all_years_females y
),
-- 3. Объединяем сетку с имеющимися данными, чтобы выявить пропуски
males_data AS (
SELECT
g.recruitment_id,
g.schema,
g.year,
COALESCE(h.count_all, NULL) AS count_all,
COALESCE(h.reaching_17_age, NULL) AS reaching_17_age
FROM full_grid_males g
LEFT JOIN forecast.registered_citizens h ON g.recruitment_id = h.recruitment_id AND g.year = EXTRACT(YEAR FROM h.recording_date)
),
females_data AS (
SELECT
g.recruitment_id,
g.schema,
g.year,
COALESCE(h.women_military_specialty, NULL) AS women_military_specialty
FROM full_grid_females g
LEFT JOIN forecast.registered_citizens h ON g.recruitment_id = h.recruitment_id AND g.year = EXTRACT(YEAR FROM h.recording_date)
),
-- 4. Рассчитываем статистики для выявления выбросов
males_stats AS (
SELECT
recruitment_id,
schema,
AVG(count_all) AS avg_count_all,
STDDEV(count_all) AS stddev_count_all, -- σ для фильтрации выбросов
AVG(reaching_17_age) AS avg_reaching_17_age,
STDDEV(reaching_17_age) AS stddev_reaching_17_age -- σ для фильтрации выбросов
FROM males_data
WHERE count_all IS NOT NULL AND reaching_17_age IS NOT NULL
GROUP BY recruitment_id, schema
),
females_stats AS (
SELECT
recruitment_id,
schema,
AVG(women_military_specialty) AS avg_women_military_specialty,
STDDEV(women_military_specialty) AS stddev_women_military_specialty -- σ для фильтрации выбросов
FROM females_data
WHERE women_military_specialty IS NOT NULL
GROUP BY recruitment_id, schema
),
-- 5. Выявляем выбросы и помечаем их для интерполяции
males_outliers AS (
SELECT
m.recruitment_id,
m.schema,
m.year,
m.count_all,
CASE
WHEN m.count_all IS NULL OR ABS(m.count_all - s.avg_count_all) > s.stddev_count_all
THEN TRUE ELSE FALSE
END AS count_all_is_outlier,
m.reaching_17_age,
CASE
WHEN m.reaching_17_age IS NULL OR ABS(m.reaching_17_age - s.avg_reaching_17_age) > s.stddev_reaching_17_age
THEN TRUE ELSE FALSE
END AS reaching_17_age_is_outlier
FROM males_data m
JOIN males_stats s ON m.recruitment_id = s.recruitment_id AND m.schema = s.schema
),
females_outliers AS (
SELECT
f.recruitment_id,
f.schema,
f.year,
f.women_military_specialty,
CASE
WHEN f.women_military_specialty IS NULL OR ABS(f.women_military_specialty - s.avg_women_military_specialty) > s.stddev_women_military_specialty
THEN TRUE ELSE FALSE
END AS women_military_specialty_is_outlier
FROM females_data f
JOIN females_stats s ON f.recruitment_id = s.recruitment_id AND f.schema = s.schema
),
-- 6. Подготовка данных для интерполяции (находим ближайшие НЕ выбросы)
males_interpolation_prep AS (
SELECT
mo.recruitment_id,
mo.schema,
mo.year,
mo.count_all,
mo.count_all_is_outlier,
mo.reaching_17_age,
mo.reaching_17_age_is_outlier,
-- Для интерполяции "Всего на учете" ищем ближайшие не-выбросы
(SELECT MAX(year) FROM males_outliers
WHERE recruitment_id = mo.recruitment_id AND schema = mo.schema AND year &lt; mo.year AND NOT count_all_is_outlier) AS prev_year_count_all,
(SELECT MIN(year) FROM males_outliers
WHERE recruitment_id = mo.recruitment_id AND schema = mo.schema AND year > mo.year AND NOT count_all_is_outlier) AS next_year_count_all,
-- Для интерполяции "Поставлено на учет по достижении 17 лет" ищем ближайшие не-выбросы
(SELECT MAX(year) FROM males_outliers
WHERE recruitment_id = mo.recruitment_id AND schema = mo.schema AND year &lt; mo.year AND NOT reaching_17_age_is_outlier) AS prev_year_reaching_17_age,
(SELECT MIN(year) FROM males_outliers
WHERE recruitment_id = mo.recruitment_id AND schema = mo.schema AND year > mo.year AND NOT reaching_17_age_is_outlier) AS next_year_reaching_17_age
FROM males_outliers mo
),
females_interpolation_prep AS (
SELECT
fo.recruitment_id,
fo.schema,
fo.year,
fo.women_military_specialty,
fo.women_military_specialty_is_outlier,
-- Для интерполяции "Женщины, получившие ВУС" ищем ближайшие не-выбросы
(SELECT MAX(year) FROM females_outliers
WHERE recruitment_id = fo.recruitment_id AND schema = fo.schema AND year &lt; fo.year AND NOT women_military_specialty_is_outlier) AS prev_year_women_military_specialty,
(SELECT MIN(year) FROM females_outliers
WHERE recruitment_id = fo.recruitment_id AND schema = fo.schema AND year > fo.year AND NOT women_military_specialty_is_outlier) AS next_year_women_military_specialty
FROM females_outliers fo
),
-- 7. Выполняем линейную интерполяцию для выбросов и отсутствующих значений
males_interpolated AS (
SELECT
m.recruitment_id,
m.schema,
m.year,
CASE
WHEN m.count_all_is_outlier AND m.prev_year_count_all IS NOT NULL AND m.next_year_count_all IS NOT NULL THEN
-- Linear interpolation formula: y = y1 + ((x - x1) / (x2 - x1)) * (y2 - y1)
(SELECT p.count_all FROM males_outliers p
WHERE p.recruitment_id = m.recruitment_id AND p.schema = m.schema AND p.year = m.prev_year_count_all) +
((m.year - m.prev_year_count_all)::float / (m.next_year_count_all - m.prev_year_count_all)) *
((SELECT n.count_all FROM males_outliers n
WHERE n.recruitment_id = m.recruitment_id AND n.schema = m.schema AND n.year = m.next_year_count_all) -
(SELECT p.count_all FROM males_outliers p
WHERE p.recruitment_id = m.recruitment_id AND p.schema = m.schema AND p.year = m.prev_year_count_all))
ELSE m.count_all
END AS count_all,
CASE
WHEN m.reaching_17_age_is_outlier AND m.prev_year_reaching_17_age IS NOT NULL AND m.next_year_reaching_17_age IS NOT NULL THEN
-- Linear interpolation formula: y = y1 + ((x - x1) / (x2 - x1)) * (y2 - y1)
(SELECT p.reaching_17_age FROM males_outliers p
WHERE p.recruitment_id = m.recruitment_id AND p.schema = m.schema AND p.year = m.prev_year_reaching_17_age) +
((m.year - m.prev_year_reaching_17_age)::float / (m.next_year_reaching_17_age - m.prev_year_reaching_17_age)) *
((SELECT n.reaching_17_age FROM males_outliers n
WHERE n.recruitment_id = m.recruitment_id AND n.schema = m.schema AND n.year = m.next_year_reaching_17_age) -
(SELECT p.reaching_17_age FROM males_outliers p
WHERE p.recruitment_id = m.recruitment_id AND p.schema = m.schema AND p.year = m.prev_year_reaching_17_age))
ELSE m.reaching_17_age
END AS reaching_17_age
FROM males_interpolation_prep m
),
females_interpolated AS (
SELECT
f.recruitment_id,
f.schema,
f.year,
CASE
WHEN f.women_military_specialty_is_outlier AND f.prev_year_women_military_specialty IS NOT NULL AND f.next_year_women_military_specialty IS NOT NULL THEN
-- Linear interpolation formula: y = y1 + ((x - x1) / (x2 - x1)) * (y2 - y1)
(SELECT p.women_military_specialty FROM females_outliers p
WHERE p.recruitment_id = f.recruitment_id AND p.schema = f.schema AND p.year = f.prev_year_women_military_specialty) +
((f.year - f.prev_year_women_military_specialty)::float / (f.next_year_women_military_specialty - f.prev_year_women_military_specialty)) *
((SELECT n.women_military_specialty FROM females_outliers n
WHERE n.recruitment_id = f.recruitment_id AND n.schema = f.schema AND n.year = f.next_year_women_military_specialty) -
(SELECT p.women_military_specialty FROM females_outliers p
WHERE p.recruitment_id = f.recruitment_id AND p.schema = f.schema AND p.year = f.prev_year_women_military_specialty))
ELSE f.women_military_specialty
END AS women_military_specialty
FROM females_interpolation_prep f
),
-- 8. Объединяем интерполированные данные
combined_data AS (
SELECT
m.recruitment_id,
m.schema,
m.year,
m.count_all,
m.reaching_17_age,
f.women_military_specialty
FROM males_interpolated m
LEFT JOIN females_interpolated f ON m.recruitment_id = f.recruitment_id AND m.schema = f.schema AND m.year = f.year
),
-- 9. Рассчитываем параметры линейной регрессии
average_values AS (
SELECT
recruitment_id,
schema,
year,
count_all,
reaching_17_age,
women_military_specialty,
-- Вычисляем средние значения отдельно
AVG(year) FILTER (WHERE count_all IS NOT NULL) OVER (PARTITION BY recruitment_id, schema) AS avg_x_count_all,
AVG(count_all) FILTER (WHERE count_all IS NOT NULL) OVER (PARTITION BY recruitment_id, schema) AS avg_y_count_all,
AVG(year) FILTER (WHERE reaching_17_age IS NOT NULL) OVER (PARTITION BY recruitment_id, schema) AS avg_x_reaching_17_age,
AVG(reaching_17_age) FILTER (WHERE reaching_17_age IS NOT NULL) OVER (PARTITION BY recruitment_id, schema) AS avg_y_reaching_17_age,
AVG(year) FILTER (WHERE women_military_specialty IS NOT NULL) OVER (PARTITION BY recruitment_id, schema) AS avg_x_women_military_specialty,
AVG(women_military_specialty) FILTER (WHERE women_military_specialty IS NOT NULL) OVER (PARTITION BY recruitment_id, schema) AS avg_y_women_military_specialty
FROM combined_data
),
regression_stats AS (
SELECT
recruitment_id,
schema,
-- count_all
COUNT(*) FILTER (WHERE count_all IS NOT NULL) AS n_count_all,
AVG(year) FILTER (WHERE count_all IS NOT NULL) AS avg_x_count_all,
AVG(count_all) FILTER (WHERE count_all IS NOT NULL) AS avg_y_count_all,
SUM((year - avg_x_count_all) * (count_all - avg_y_count_all)) FILTER (WHERE count_all IS NOT NULL) AS sum_xy_count_all,
SUM(POWER(year - avg_x_count_all, 2)) FILTER (WHERE count_all IS NOT NULL) AS sum_x_squared_count_all,
-- reaching_17_age
COUNT(*) FILTER (WHERE reaching_17_age IS NOT NULL) AS n_reaching_17_age,
AVG(year) FILTER (WHERE reaching_17_age IS NOT NULL) AS avg_x_reaching_17_age,
AVG(reaching_17_age) FILTER (WHERE reaching_17_age IS NOT NULL) AS avg_y_reaching_17_age,
SUM((year - avg_x_reaching_17_age) * (reaching_17_age - avg_y_reaching_17_age)) FILTER (WHERE reaching_17_age IS NOT NULL) AS sum_xy_reaching_17_age,
SUM(POWER(year - avg_x_reaching_17_age, 2)) FILTER (WHERE reaching_17_age IS NOT NULL) AS sum_x_squared_reaching_17_age,
-- women_military_specialty
COUNT(*) FILTER (WHERE women_military_specialty IS NOT NULL) AS n_women_military_specialty,
AVG(year) FILTER (WHERE women_military_specialty IS NOT NULL) AS avg_x_women_military_specialty,
AVG(women_military_specialty) FILTER (WHERE women_military_specialty IS NOT NULL) AS avg_y_women_military_specialty,
SUM((year - avg_x_women_military_specialty) * (women_military_specialty - avg_y_women_military_specialty)) FILTER (WHERE women_military_specialty IS NOT NULL) AS sum_xy_women_military_specialty,
SUM(POWER(year - avg_x_women_military_specialty, 2)) FILTER (WHERE women_military_specialty IS NOT NULL) AS sum_x_squared_women_military_specialty
FROM average_values
GROUP BY recruitment_id, schema
),
-- 10. Вычисляем параметры уравнения линейной регрессии
regression_params AS (
SELECT
recruitment_id,
schema,
-- Параметры для count_all
CASE
WHEN sum_x_squared_count_all &lt;&gt; 0 THEN sum_xy_count_all / sum_x_squared_count_all
ELSE 0
END AS k_count_all,
avg_y_count_all - (CASE
WHEN sum_x_squared_count_all &lt;&gt; 0 THEN sum_xy_count_all / sum_x_squared_count_all
ELSE 0
END) * avg_x_count_all AS b_count_all,
-- Параметры для reaching_17_age
CASE
WHEN sum_x_squared_reaching_17_age &lt;&gt; 0 THEN sum_xy_reaching_17_age / sum_x_squared_reaching_17_age
ELSE 0
END AS k_reaching_17_age,
avg_y_reaching_17_age - (CASE
WHEN sum_x_squared_reaching_17_age &lt;&gt; 0 THEN sum_xy_reaching_17_age / sum_x_squared_reaching_17_age
ELSE 0
END) * avg_x_reaching_17_age AS b_reaching_17_age,
-- Параметры для women_military_specialty
CASE
WHEN sum_x_squared_women_military_specialty &lt;&gt; 0 THEN sum_xy_women_military_specialty / sum_x_squared_women_military_specialty
ELSE 0
END AS k_women_military_specialty,
avg_y_women_military_specialty - (CASE
WHEN sum_x_squared_women_military_specialty &lt;&gt; 0 THEN sum_xy_women_military_specialty / sum_x_squared_women_military_specialty
ELSE 0
END) * avg_x_women_military_specialty AS b_women_military_specialty
FROM regression_stats
)
-- 11. Рассчитываем прогнозные значения на текущий и следующий годы
SELECT
r.recruitment_id,
r.schema,
MAKE_DATE(y.year, 1, 1) AS prediction_year,
ROUND(r.k_count_all * y.year + r.b_count_all) AS count_all,
ROUND(r.k_reaching_17_age * y.year + r.b_reaching_17_age) AS reaching_17_age,
ROUND(r.k_women_military_specialty * y.year + r.b_women_military_specialty) AS women_military_specialty,
y.prediction_type
FROM years y
CROSS JOIN regression_params r
ORDER BY r.schema, r.recruitment_id, prediction_year;
</sql>
<variables_active>N</variables_active>
<attributes/>
<GUI>
<xloc>720</xloc>
<yloc>304</yloc>
</GUI>
</transform>
<transform>
<name>Table output</name>
<type>TableOutput</type>
<description/>
<distribute>Y</distribute>
<custom_distribution/>
<copies>1</copies>
<partitioning>
<method>none</method>
<schema_name/>
</partitioning>
<commit>1000</commit>
<connection>ervu-dashboard</connection>
<fields>
<field>
<column_name>recruitment_id</column_name>
<stream_name>recruitment_id</stream_name>
</field>
<field>
<column_name>schema</column_name>
<stream_name>schema</stream_name>
</field>
<field>
<column_name>recording_date</column_name>
<stream_name>prediction_year</stream_name>
</field>
<field>
<column_name>count_all</column_name>
<stream_name>count_all</stream_name>
</field>
<field>
<column_name>reaching_17_age</column_name>
<stream_name>reaching_17_age</stream_name>
</field>
<field>
<column_name>women_military_specialty</column_name>
<stream_name>women_military_specialty</stream_name>
</field>
</fields>
<ignore_errors>N</ignore_errors>
<only_when_have_rows>N</only_when_have_rows>
<partitioning_daily>N</partitioning_daily>
<partitioning_enabled>N</partitioning_enabled>
<partitioning_field/>
<partitioning_monthly>Y</partitioning_monthly>
<return_field/>
<return_keys>N</return_keys>
<schema>forecast</schema>
<specify_fields>Y</specify_fields>
<table>registered_citizens</table>
<tablename_field/>
<tablename_in_field>N</tablename_in_field>
<tablename_in_table>Y</tablename_in_table>
<truncate>N</truncate>
<use_batch>Y</use_batch>
<attributes/>
<GUI>
<xloc>928</xloc>
<yloc>304</yloc>
</GUI>
</transform>
<transform_error_handling>
</transform_error_handling>
<attributes/>
</pipeline>