ervu-dashboard-etl/mappings_new/predictive_metrics/predictive_data.hpl

462 lines
20 KiB
Text
Raw Normal View History

2025-06-10 16:27:45 +03:00
<?xml version="1.0" encoding="UTF-8"?>
<pipeline>
<info>
<name>predictive_data</name>
<name_sync_with_filename>Y</name_sync_with_filename>
<description/>
<extended_description/>
<pipeline_version/>
<pipeline_type>Normal</pipeline_type>
<parameters>
</parameters>
<capture_transform_performance>N</capture_transform_performance>
<transform_performance_capturing_delay>1000</transform_performance_capturing_delay>
<transform_performance_capturing_size_limit>100</transform_performance_capturing_size_limit>
<created_user>-</created_user>
<created_date>2025/02/27 13:48:40.269</created_date>
<modified_user>-</modified_user>
<modified_date>2025/02/27 13:48:40.269</modified_date>
</info>
<notepads>
</notepads>
<order>
<!-- Single enabled hop: rows produced by the "Table input" transform are passed to the "Table output" transform. -->
<hop>
<from>Table input</from>
<to>Table output</to>
<enabled>Y</enabled>
</hop>
</order>
<transform>
<name>Table input</name>
<type>TableInput</type>
<description/>
<distribute>Y</distribute>
<custom_distribution/>
<copies>1</copies>
<partitioning>
<method>none</method>
<schema_name/>
</partitioning>
<connection>ervu-dashboard</connection>
<execute_each_row>N</execute_each_row>
<limit>0</limit>
<sql>-- Linear-trend forecast built from forecast.registered_citizens.
--
-- Source columns used:
--   recording_date            - observation date (only its calendar year is used)
--   recruitment_id            - military commissariat identifier
--   count_all                 - total number of registered citizens
--   reaching_17_age           - citizens registered upon reaching age 17
--   women_military_specialty  - women who received a military occupational specialty
--   schema                    - commissariat level
--
-- Result: for every (recruitment_id, schema) pair that has usable history, one row per
-- year from the current year through current year + 49 with the rounded value of a
-- least-squares line fitted to each metric.
--
-- NOTE(review): the "Table output" transform of this pipeline appends the result into
-- forecast.registered_citizens itself (truncate = N), so earlier forecasts can be read
-- back as history on later runs. The yearly_source step below keeps a single row per
-- key and year so duplicates cannot break the query, but it cannot tell forecasts from
-- observations - confirm whether the target table is intended.
-- NOTE(review): a declining linear trend extrapolated 50 years ahead can drop below
-- zero; no clamping is applied here.

-- 1. Forecast horizon: the current year plus the 49 following years.
WITH RECURSIVE years AS (
    SELECT
        EXTRACT(YEAR FROM CURRENT_DATE)::int AS year,
        'Текущий год' AS prediction_type
    UNION ALL
    SELECT
        year + 1,
        'Прогнозный год'
    FROM years
    WHERE year &lt; EXTRACT(YEAR FROM CURRENT_DATE)::int + 49 -- +49 because the first year is already included
),
-- History window boundaries. Cast to int so generate_series resolves regardless of the
-- type EXTRACT returns, and so year columns have the same type as in the years CTE.
year_range AS (
    SELECT
        EXTRACT(YEAR FROM CURRENT_DATE)::int - 54 AS min_year,         -- history depth for count_all and reaching_17_age
        EXTRACT(YEAR FROM CURRENT_DATE)::int - 34 AS min_year_females, -- history depth for women_military_specialty
        EXTRACT(YEAR FROM CURRENT_DATE)::int      AS max_year
),
all_years AS (
    SELECT generate_series(min_year, max_year) AS year
    FROM year_range
),
all_years_females AS (
    SELECT generate_series(min_year_females, max_year) AS year
    FROM year_range
),
all_regions AS (
    SELECT DISTINCT recruitment_id, schema
    FROM forecast.registered_citizens
),
-- One source row per (recruitment_id, schema, year): the record with the latest
-- recording_date in that year. Unique keys are required by the window frames and the
-- keyed joins used for interpolation below, and stop a year from being counted twice
-- in the regression. Ties on recording_date are resolved arbitrarily.
yearly_source AS (
    SELECT DISTINCT ON (recruitment_id, schema, EXTRACT(YEAR FROM recording_date))
        recruitment_id,
        schema,
        EXTRACT(YEAR FROM recording_date)::int AS year,
        count_all,
        reaching_17_age,
        women_military_specialty
    FROM forecast.registered_citizens
    ORDER BY
        recruitment_id,
        schema,
        EXTRACT(YEAR FROM recording_date),
        recording_date DESC
),
-- 2. Full region-by-year grids, so that missing years show up as NULL metrics.
full_grid_males AS (
    SELECT
        r.recruitment_id,
        r.schema,
        y.year
    FROM all_regions r
    CROSS JOIN all_years y
),
full_grid_females AS (
    SELECT
        r.recruitment_id,
        r.schema,
        y.year
    FROM all_regions r
    CROSS JOIN all_years_females y
),
-- 3. Attach the available data to the grids. The join includes schema because every
-- later step treats (recruitment_id, schema) as the series key.
males_data AS (
    SELECT
        g.recruitment_id,
        g.schema,
        g.year,
        h.count_all,
        h.reaching_17_age
    FROM full_grid_males g
    LEFT JOIN yearly_source h
        ON h.recruitment_id = g.recruitment_id
        AND h.schema = g.schema
        AND h.year = g.year
),
females_data AS (
    SELECT
        g.recruitment_id,
        g.schema,
        g.year,
        h.women_military_specialty
    FROM full_grid_females g
    LEFT JOIN yearly_source h
        ON h.recruitment_id = g.recruitment_id
        AND h.schema = g.schema
        AND h.year = g.year
),
-- 4. Per-series mean and standard deviation used for outlier detection.
males_stats AS (
    SELECT
        recruitment_id,
        schema,
        AVG(count_all) AS avg_count_all,
        STDDEV(count_all) AS stddev_count_all,
        AVG(reaching_17_age) AS avg_reaching_17_age,
        STDDEV(reaching_17_age) AS stddev_reaching_17_age
    FROM males_data
    WHERE count_all IS NOT NULL AND reaching_17_age IS NOT NULL
    GROUP BY recruitment_id, schema
),
females_stats AS (
    SELECT
        recruitment_id,
        schema,
        AVG(women_military_specialty) AS avg_women_military_specialty,
        STDDEV(women_military_specialty) AS stddev_women_military_specialty
    FROM females_data
    WHERE women_military_specialty IS NOT NULL
    GROUP BY recruitment_id, schema
),
-- 5. Flag values to be interpolated: missing values and values further than one
-- standard deviation from the series mean. When the deviation is NULL (a single data
-- point) the comparison is NULL and the value is not flagged.
males_outliers AS (
    SELECT
        m.recruitment_id,
        m.schema,
        m.year,
        m.count_all,
        CASE
            WHEN m.count_all IS NULL OR ABS(m.count_all - s.avg_count_all) &gt; s.stddev_count_all
            THEN TRUE ELSE FALSE
        END AS count_all_is_outlier,
        m.reaching_17_age,
        CASE
            WHEN m.reaching_17_age IS NULL OR ABS(m.reaching_17_age - s.avg_reaching_17_age) &gt; s.stddev_reaching_17_age
            THEN TRUE ELSE FALSE
        END AS reaching_17_age_is_outlier
    FROM males_data m
    JOIN males_stats s ON m.recruitment_id = s.recruitment_id AND m.schema = s.schema
),
females_outliers AS (
    SELECT
        f.recruitment_id,
        f.schema,
        f.year,
        f.women_military_specialty,
        CASE
            WHEN f.women_military_specialty IS NULL OR ABS(f.women_military_specialty - s.avg_women_military_specialty) &gt; s.stddev_women_military_specialty
            THEN TRUE ELSE FALSE
        END AS women_military_specialty_is_outlier
    FROM females_data f
    JOIN females_stats s ON f.recruitment_id = s.recruitment_id AND f.schema = s.schema
),
-- 6. For every row find the nearest earlier and later year whose value is not flagged.
-- Window frames that exclude the current row replace per-row correlated subqueries;
-- years are unique within a series, so a ROWS frame is exactly "strictly earlier" or
-- "strictly later".
males_interpolation_prep AS (
    SELECT
        mo.recruitment_id,
        mo.schema,
        mo.year,
        mo.count_all,
        mo.count_all_is_outlier,
        mo.reaching_17_age,
        mo.reaching_17_age_is_outlier,
        MAX(mo.year) FILTER (WHERE NOT mo.count_all_is_outlier) OVER earlier_years AS prev_year_count_all,
        MIN(mo.year) FILTER (WHERE NOT mo.count_all_is_outlier) OVER later_years AS next_year_count_all,
        MAX(mo.year) FILTER (WHERE NOT mo.reaching_17_age_is_outlier) OVER earlier_years AS prev_year_reaching_17_age,
        MIN(mo.year) FILTER (WHERE NOT mo.reaching_17_age_is_outlier) OVER later_years AS next_year_reaching_17_age
    FROM males_outliers mo
    WINDOW
        earlier_years AS (
            PARTITION BY mo.recruitment_id, mo.schema
            ORDER BY mo.year
            ROWS BETWEEN UNBOUNDED PRECEDING AND 1 PRECEDING
        ),
        later_years AS (
            PARTITION BY mo.recruitment_id, mo.schema
            ORDER BY mo.year
            ROWS BETWEEN 1 FOLLOWING AND UNBOUNDED FOLLOWING
        )
),
females_interpolation_prep AS (
    SELECT
        fo.recruitment_id,
        fo.schema,
        fo.year,
        fo.women_military_specialty,
        fo.women_military_specialty_is_outlier,
        MAX(fo.year) FILTER (WHERE NOT fo.women_military_specialty_is_outlier) OVER earlier_years AS prev_year_women_military_specialty,
        MIN(fo.year) FILTER (WHERE NOT fo.women_military_specialty_is_outlier) OVER later_years AS next_year_women_military_specialty
    FROM females_outliers fo
    WINDOW
        earlier_years AS (
            PARTITION BY fo.recruitment_id, fo.schema
            ORDER BY fo.year
            ROWS BETWEEN UNBOUNDED PRECEDING AND 1 PRECEDING
        ),
        later_years AS (
            PARTITION BY fo.recruitment_id, fo.schema
            ORDER BY fo.year
            ROWS BETWEEN 1 FOLLOWING AND UNBOUNDED FOLLOWING
        )
),
-- 7. Linear interpolation of flagged values that have an unflagged neighbour on both
-- sides: y = y1 + ((x - x1) / (x2 - x1)) * (y2 - y1).
-- Flagged values at the edge of a series (no neighbour on one side) are kept as they are.
-- The neighbour rows are fetched with LEFT JOINs on the unique series key and year, so
-- they never multiply rows.
males_interpolated AS (
    SELECT
        m.recruitment_id,
        m.schema,
        m.year,
        CASE
            WHEN m.count_all_is_outlier AND m.prev_year_count_all IS NOT NULL AND m.next_year_count_all IS NOT NULL THEN
                prev_c.count_all +
                ((m.year - m.prev_year_count_all)::float / (m.next_year_count_all - m.prev_year_count_all)) *
                (next_c.count_all - prev_c.count_all)
            ELSE m.count_all
        END AS count_all,
        CASE
            WHEN m.reaching_17_age_is_outlier AND m.prev_year_reaching_17_age IS NOT NULL AND m.next_year_reaching_17_age IS NOT NULL THEN
                prev_r.reaching_17_age +
                ((m.year - m.prev_year_reaching_17_age)::float / (m.next_year_reaching_17_age - m.prev_year_reaching_17_age)) *
                (next_r.reaching_17_age - prev_r.reaching_17_age)
            ELSE m.reaching_17_age
        END AS reaching_17_age
    FROM males_interpolation_prep m
    LEFT JOIN males_outliers prev_c
        ON prev_c.recruitment_id = m.recruitment_id
        AND prev_c.schema = m.schema
        AND prev_c.year = m.prev_year_count_all
    LEFT JOIN males_outliers next_c
        ON next_c.recruitment_id = m.recruitment_id
        AND next_c.schema = m.schema
        AND next_c.year = m.next_year_count_all
    LEFT JOIN males_outliers prev_r
        ON prev_r.recruitment_id = m.recruitment_id
        AND prev_r.schema = m.schema
        AND prev_r.year = m.prev_year_reaching_17_age
    LEFT JOIN males_outliers next_r
        ON next_r.recruitment_id = m.recruitment_id
        AND next_r.schema = m.schema
        AND next_r.year = m.next_year_reaching_17_age
),
females_interpolated AS (
    SELECT
        f.recruitment_id,
        f.schema,
        f.year,
        CASE
            WHEN f.women_military_specialty_is_outlier AND f.prev_year_women_military_specialty IS NOT NULL AND f.next_year_women_military_specialty IS NOT NULL THEN
                prev_w.women_military_specialty +
                ((f.year - f.prev_year_women_military_specialty)::float / (f.next_year_women_military_specialty - f.prev_year_women_military_specialty)) *
                (next_w.women_military_specialty - prev_w.women_military_specialty)
            ELSE f.women_military_specialty
        END AS women_military_specialty
    FROM females_interpolation_prep f
    LEFT JOIN females_outliers prev_w
        ON prev_w.recruitment_id = f.recruitment_id
        AND prev_w.schema = f.schema
        AND prev_w.year = f.prev_year_women_military_specialty
    LEFT JOIN females_outliers next_w
        ON next_w.recruitment_id = f.recruitment_id
        AND next_w.schema = f.schema
        AND next_w.year = f.next_year_women_military_specialty
),
-- 8. Combine the interpolated series. Years outside the shorter female window get a
-- NULL women_military_specialty.
combined_data AS (
    SELECT
        m.recruitment_id,
        m.schema,
        m.year,
        m.count_all,
        m.reaching_17_age,
        f.women_military_specialty
    FROM males_interpolated m
    LEFT JOIN females_interpolated f ON m.recruitment_id = f.recruitment_id AND m.schema = f.schema AND m.year = f.year
),
-- 9. Linear regression inputs. The per-series means are attached to every row first so
-- that the sums of deviations can be computed in a plain GROUP BY afterwards.
average_values AS (
    SELECT
        recruitment_id,
        schema,
        year,
        count_all,
        reaching_17_age,
        women_military_specialty,
        AVG(year) FILTER (WHERE count_all IS NOT NULL) OVER (PARTITION BY recruitment_id, schema) AS avg_x_count_all,
        AVG(count_all) FILTER (WHERE count_all IS NOT NULL) OVER (PARTITION BY recruitment_id, schema) AS avg_y_count_all,
        AVG(year) FILTER (WHERE reaching_17_age IS NOT NULL) OVER (PARTITION BY recruitment_id, schema) AS avg_x_reaching_17_age,
        AVG(reaching_17_age) FILTER (WHERE reaching_17_age IS NOT NULL) OVER (PARTITION BY recruitment_id, schema) AS avg_y_reaching_17_age,
        AVG(year) FILTER (WHERE women_military_specialty IS NOT NULL) OVER (PARTITION BY recruitment_id, schema) AS avg_x_women_military_specialty,
        AVG(women_military_specialty) FILTER (WHERE women_military_specialty IS NOT NULL) OVER (PARTITION BY recruitment_id, schema) AS avg_y_women_military_specialty
    FROM combined_data
),
-- Inside the SUM expressions the avg_x_* / avg_y_* names refer to the input columns of
-- average_values, not to the output aliases of the same name declared in this SELECT.
regression_stats AS (
    SELECT
        recruitment_id,
        schema,
        -- count_all
        COUNT(*) FILTER (WHERE count_all IS NOT NULL) AS n_count_all,
        AVG(year) FILTER (WHERE count_all IS NOT NULL) AS avg_x_count_all,
        AVG(count_all) FILTER (WHERE count_all IS NOT NULL) AS avg_y_count_all,
        SUM((year - avg_x_count_all) * (count_all - avg_y_count_all)) FILTER (WHERE count_all IS NOT NULL) AS sum_xy_count_all,
        SUM(POWER(year - avg_x_count_all, 2)) FILTER (WHERE count_all IS NOT NULL) AS sum_x_squared_count_all,
        -- reaching_17_age
        COUNT(*) FILTER (WHERE reaching_17_age IS NOT NULL) AS n_reaching_17_age,
        AVG(year) FILTER (WHERE reaching_17_age IS NOT NULL) AS avg_x_reaching_17_age,
        AVG(reaching_17_age) FILTER (WHERE reaching_17_age IS NOT NULL) AS avg_y_reaching_17_age,
        SUM((year - avg_x_reaching_17_age) * (reaching_17_age - avg_y_reaching_17_age)) FILTER (WHERE reaching_17_age IS NOT NULL) AS sum_xy_reaching_17_age,
        SUM(POWER(year - avg_x_reaching_17_age, 2)) FILTER (WHERE reaching_17_age IS NOT NULL) AS sum_x_squared_reaching_17_age,
        -- women_military_specialty
        COUNT(*) FILTER (WHERE women_military_specialty IS NOT NULL) AS n_women_military_specialty,
        AVG(year) FILTER (WHERE women_military_specialty IS NOT NULL) AS avg_x_women_military_specialty,
        AVG(women_military_specialty) FILTER (WHERE women_military_specialty IS NOT NULL) AS avg_y_women_military_specialty,
        SUM((year - avg_x_women_military_specialty) * (women_military_specialty - avg_y_women_military_specialty)) FILTER (WHERE women_military_specialty IS NOT NULL) AS sum_xy_women_military_specialty,
        SUM(POWER(year - avg_x_women_military_specialty, 2)) FILTER (WHERE women_military_specialty IS NOT NULL) AS sum_x_squared_women_military_specialty
    FROM average_values
    GROUP BY recruitment_id, schema
),
-- 10. Slope (k) and intercept (b) of y = k * x + b for every metric. The slope falls
-- back to 0 when the sum of squared deviations is zero or NULL.
regression_params AS (
    SELECT
        recruitment_id,
        schema,
        -- count_all
        CASE
            WHEN sum_x_squared_count_all &lt;&gt; 0 THEN sum_xy_count_all / sum_x_squared_count_all
            ELSE 0
        END AS k_count_all,
        avg_y_count_all - (CASE
            WHEN sum_x_squared_count_all &lt;&gt; 0 THEN sum_xy_count_all / sum_x_squared_count_all
            ELSE 0
        END) * avg_x_count_all AS b_count_all,
        -- reaching_17_age
        CASE
            WHEN sum_x_squared_reaching_17_age &lt;&gt; 0 THEN sum_xy_reaching_17_age / sum_x_squared_reaching_17_age
            ELSE 0
        END AS k_reaching_17_age,
        avg_y_reaching_17_age - (CASE
            WHEN sum_x_squared_reaching_17_age &lt;&gt; 0 THEN sum_xy_reaching_17_age / sum_x_squared_reaching_17_age
            ELSE 0
        END) * avg_x_reaching_17_age AS b_reaching_17_age,
        -- women_military_specialty
        CASE
            WHEN sum_x_squared_women_military_specialty &lt;&gt; 0 THEN sum_xy_women_military_specialty / sum_x_squared_women_military_specialty
            ELSE 0
        END AS k_women_military_specialty,
        avg_y_women_military_specialty - (CASE
            WHEN sum_x_squared_women_military_specialty &lt;&gt; 0 THEN sum_xy_women_military_specialty / sum_x_squared_women_military_specialty
            ELSE 0
        END) * avg_x_women_military_specialty AS b_women_military_specialty
    FROM regression_stats
)
-- 11. Evaluate the fitted lines for the current year and every forecast year.
SELECT
    r.recruitment_id,
    r.schema,
    MAKE_DATE(y.year, 1, 1) AS prediction_year,
    ROUND(r.k_count_all * y.year + r.b_count_all) AS count_all,
    ROUND(r.k_reaching_17_age * y.year + r.b_reaching_17_age) AS reaching_17_age,
    ROUND(r.k_women_military_specialty * y.year + r.b_women_military_specialty) AS women_military_specialty,
    y.prediction_type
FROM years y
CROSS JOIN regression_params r
ORDER BY r.schema, r.recruitment_id, prediction_year;
</sql>
<variables_active>N</variables_active>
<attributes/>
<GUI>
<xloc>720</xloc>
<yloc>304</yloc>
</GUI>
</transform>
<!-- "Table output" transform: inserts the rows received from "Table input" into forecast.registered_citizens
     through the ervu-dashboard connection. -->
<!-- NOTE(review): this is the same table the "Table input" query reads from, and truncate is N, so every run
     appends forecast rows next to the source rows. Confirm that this is intended and that repeated runs are
     expected to add further rows. -->
<transform>
<name>Table output</name>
<type>TableOutput</type>
<description/>
<distribute>Y</distribute>
<custom_distribution/>
<copies>1</copies>
<partitioning>
<method>none</method>
<schema_name/>
</partitioning>
<!-- Commit size setting; presumably rows per commit, verify against the Hop Table Output documentation. -->
<commit>1000</commit>
<connection>ervu-dashboard</connection>
<!-- Explicit stream-to-column mapping (specify_fields is Y below). The stream field prediction_type produced by
     the input query has no mapping here, so it is not written. -->
<fields>
<field>
<column_name>recruitment_id</column_name>
<stream_name>recruitment_id</stream_name>
</field>
<field>
<column_name>schema</column_name>
<stream_name>schema</stream_name>
</field>
<!-- The forecast date (stream field prediction_year) is stored in the recording_date column. -->
<field>
<column_name>recording_date</column_name>
<stream_name>prediction_year</stream_name>
</field>
<field>
<column_name>count_all</column_name>
<stream_name>count_all</stream_name>
</field>
<field>
<column_name>reaching_17_age</column_name>
<stream_name>reaching_17_age</stream_name>
</field>
<field>
<column_name>women_military_specialty</column_name>
<stream_name>women_military_specialty</stream_name>
</field>
</fields>
<ignore_errors>N</ignore_errors>
<only_when_have_rows>N</only_when_have_rows>
<partitioning_daily>N</partitioning_daily>
<!-- partitioning_enabled is N; the monthly flag below presumably has no effect while it stays disabled. -->
<partitioning_enabled>N</partitioning_enabled>
<partitioning_field/>
<partitioning_monthly>Y</partitioning_monthly>
<return_field/>
<return_keys>N</return_keys>
<!-- Target: schema forecast, table registered_citizens. -->
<schema>forecast</schema>
<specify_fields>Y</specify_fields>
<table>registered_citizens</table>
<tablename_field/>
<tablename_in_field>N</tablename_in_field>
<tablename_in_table>Y</tablename_in_table>
<!-- The target table is not truncated before loading. -->
<truncate>N</truncate>
<use_batch>Y</use_batch>
<attributes/>
<GUI>
<xloc>928</xloc>
<yloc>304</yloc>
</GUI>
</transform>
<transform_error_handling>
</transform_error_handling>
<attributes/>
</pipeline>