- BrainTools - https://www.braintools.ru -
Я работаю дата-сайентистом 5 лет и до сих пор испытываю боль [1], когда нужно сделать MVP по временным рядам. Начиная с того, как построить несколько графиков одновременно без «слипшихся» меток по осям, заканчивая поиском подходящего метода очистки ряда от аномалий. И всё это венчает цикл по каждому ряду с бесконечным жонглированием данными между numpy, pandas, sklearn, yet_another_library.
Если вы DS, и тоже, как и я, устали от вот этого всего, добро пожаловать под кат. Я покажу, как написать production-ready код для прогноза 200+ временных рядов от EDA до результата. Разберем на практике, как бороться с аномалиями, ловить смены тренда и в итоге – получить масштабируемое решение, а не очередной «велосипед».

Привет, Habr! Меня зовут Саша Солин, я ML-инженер в MAGNIT TECH. Наша команда занимается развитием и поддержкой моделей промо-прогноза [2]. В первом приближении прогнозирование промо может показаться задачей временных рядов, но по ряду причин, мы подходим к ней как к задаче классической регрессии. За счет своей простоты, некоторые из моделей получалось успешно переиспользовать в других проектах, не связанных с промо-прогнозом, например в ценообразовании [3] или доступности товара на полке [4].
Однако этому пришел конец, когда к нам пришли с задачей… прогнозирования роста журнала изменений БД. По предоставленным временным рядам 200+ сервисов необходимо сделать понедельный прогноз на несколько лет вперед для планирования инфраструктуры БД.
Достаем наш альпинистский набор Data Scientist’a и отправляемся покорять гору рядов!
Спойлер: данные представляют собой нестационарные временные ряды без выраженной сезонности, поэтому сразу отмели Prophet & ARIMA-like модели и решили смотреть сторону time-series библиотек с менее требовательными к рядам моделями. Выбор пал на фреймворк Etna [5] – он предоставляет набор удобных инструментов для работы с множественными временными рядами.
Перейдем от слов к делу, посмотрим на часть данных, с которыми будем работать:
import pandas as pd
from etna.datasets import TSDataset
# Читаем данные и преобразуем в "широкий формат Этна"
df = pd.read_csv('data.csv', parse_dates=['timestamp'])
ts = TSDataset(df, freq='W-MON')
ts

Данных минимум – четыре временных ряда с временными метками и таргетом (размер журнала изменений). Давайте взглянем на графики:
ts.plot()

Для начала построим бейзлайн – наивный прогноз, где прогнозное значение равно последнему известному наблюдению. Он послужит отправной точкой для оценки более сложных моделей.
Перед этим разобьем наш датасет на обучающую и тестовую выборки и создадим Pipeline. Он предназначен для последовательного выполнения цепочки создания /преобразования признаков и работы с моделью.
В данном примере использование пайплайна может показаться overkill’ом, но это позволит в дальнейшем проводить все эксперименты по единой схеме и поскорее освоиться с API фреймворка:
from etna.models import NaiveModel
from etna.pipeline import Pipeline
from etna.analysis import plot_forecast
# Формируем тестовую выборку из 20 последних наблюдений
HORIZON = 20
train_ts, test_ts = ts.train_test_split(test_size=HORIZON)
# Объявляем модель
naive_model = NaiveModel()
# Объявляем пайплайн
naive_pipeline = Pipeline(model=naive_model, horizon=HORIZON)
# Обучаем модель и делаем прогноз
naive_pipeline.fit(train_ts)
naive_forecast_ts = naive_pipeline.forecast()
# Визуализируем прогноз
plot_forecast(forecast_ts=naive_forecast_ts, test_ts=test_ts, train_ts=train_ts)

Для оценки прогноза воспользуемся метрикой взвешенной абсолютной процентной ошибки [6] WAPE:
from etna.metrics import WAPE
wape = WAPE()
naive_wape = wape(y_true=test_ts, y_pred=naive_forecast_ts)
df_metric = pd.DataFrame.from_dict(
data=naive_wape, orient='index', columns=['naive']
)
df_metric.round(2)

Бейзлайн готов, ошибка WAPE по каждому из рядов посчитана. Теперь попробуем с наскока снизить ошибку, заменив модель на линейную регрессию.

Первый и самый простой признак, который мы создадим для неё — линейный тренд. Это разница в днях между датой каждого наблюдения и началом временного ряда. Для этого нам потребуется преобразование, которое переведет разность дат в число:
from etna.transforms import LambdaTransform
def date_to_int(column: pd.DataFrame) -> pd.DataFrame:
"""
Получаем дату из индекса и вычисляем
разницу в днях от константной даты
"""
mix_date = column.index.min()
column = column
.apply(lambda x: (x.index - mix_date).days)
.div(7)
.add(1)
.astype('float')
return column
# Объявим лямбда преобразование, применяющее пользовательскую функцию к столбцу
date_to_int_transform = LambdaTransform(
in_column='target', # столбец, к которому применяется преобразование
inplace=False, # результат преобразования не перезаписывает in_column
out_column='step', # столбец с результатом выполнения функции
transform_func=date_to_int, # функция преобразования
inverse_transform_func=lambda x: x, # обратное преобразование не требуется
)
date_to_int_transform.fit_transform(train_ts)
train_ts.head(3)

Убедившись, что преобразование работает корректно, добавим его в пайплайн для линейной модели и сделаем новый прогноз. Здесь пайплайн наконец-то раскрывает свой потенциал: он создает новый признак для тренировочной и тестовой выборок, выполняет его стандартизацию и передает в модель:
from etna.models import LinearPerSegmentModel
from etna.transforms import StandardScalerTransform
# Объявляем линейную модель обучаемую посегментно
model = LinearPerSegmentModel()
# Объявляем последовательность преобразований
base_transforms = [
date_to_int_transform,
StandardScalerTransform()
]
base_pipeline = Pipeline(
model=model,
transforms=base_transforms,
horizon=HORIZON
)
# Явно указываем, что столбец step это регрессор,
# Иначе модель его "не увидит"
train_ts.regressors.append('step')
base_pipeline.fit(train_ts)
base_forecast_ts = base_pipeline.forecast()
plot_forecast(
train_ts=train_ts,
test_ts=test_ts,
forecast_ts={
'naive':naive_forecast_ts,
'linear_base':base_forecast_ts
}
)

Проверим метрики:
linear_base_wape = wape(y_true=test_ts, y_pred=base_forecast_ts)
df_metric['linear_base'] = pd.Series(linear_base_wape)
df_metric.round(2)

Линейной регрессии с простым трендом удалось снизить ошибку относительно бейзлайна лишь в половине случаев. В остальных случаях ошибка, наоборот, выросла — в два и в четыре раза.
Просчитались, но где?

Первый штурм не увенчался успехом. Посмотрим, какое еще снаряжение есть в нашем альпинистском наборе дата-сайентиста.
Etna из коробки предлагает готовые методы для детектирования аномалий вместе с удобными средствами визуализации. Ниже приведены примеры некоторых из них.
Метод гистограмм – данные аппроксимируются с помощью гистограммы, где точки, удаление которых снижает ошибку аппроксимации, считаются аномалиями:
from etna.analysis import get_anomalies_hist
from etna.analysis import plot_anomalies
anomaly_dict = get_anomalies_hist(train_ts, bins_number=4)
plot_anomalies(train_ts, anomaly_dict)

Вишенкой на торте является интерактивная визуализация, которая позволяет подбирать параметры детектора и сразу видеть, какие точки будут отмечены как аномальные.
Рассмотрим это на примере метода медианы. Его логика [7] проста: для каждой точки вычисляется медиана и стандартное отклонение (std) внутри скользящего окна заданного размера (window_size). Точка считается аномалией, если её значение отклоняется от медианы больше, чем на alpha * std. Параметр alpha регулирует чувствительность, а window_size задаёт локальность оценки:
from etna.analysis import get_anomalies_median
from etna.analysis import plot_anomalies_interactive
plot_anomalies_interactive(
ts=train_ts,
segment='db_service_2',
method=get_anomalies_median,
params_bounds={'window_size': (10, 30, 10), 'alpha':(2, 3, 0.5)}
)

Универсального рецепта для выбора метода детекции аномалий не существует – всё зависит от природы временного ряда и решаемой задачи.
В Etna работа с аномалиями выстроена в два логических этапа:
Первый этап – обнаружение и разметка. С помощью соответствующего преобразования заменим аномальные значения на NaN:
import copy
from etna.transforms import MedianOutliersTransform
anomalies_median_best_params = {'window_size': 20, 'alpha': 2.5}
outliers_remover_transform = MedianOutliersTransform(
in_column="target",
**anomalies_median_best_params
)
train_ts_wo_anomaly = copy.deepcopy(train_ts)
train_ts_wo_anomaly.fit_transform([outliers_remover_transform])
train_ts_wo_anomaly.plot()

Второй этап – заполнение пропусков. Образовавшиеся пропуски заполним с помощью TimeSeriesImputerTransform. После этого визуализируем ряды, чтобы убедиться в адекватности замены:
from etna.transforms import TimeSeriesImputerTransform
from etna.analysis import plot_imputation
outliers_imputer_transform = TimeSeriesImputerTransform(in_column="target", strategy="forward_fill")
plot_imputation(imputer=outliers_imputer_transform, ts=train_ts_wo_anomaly)
train_ts_wo_anomaly.fit_transform([outliers_imputer_transform])

Теперь соберем новый пайплайн, включив в него этапы обработки аномалий, и проверим, удалось ли улучшить прогноз:
anomaly_transforms = [
outliers_remover_transform,
outliers_imputer_transform,
date_to_int_transform,
StandardScalerTransform()
]
anomaly_pipeline = Pipeline(
model=model,
transforms=anomaly_transforms,
horizon=HORIZON
)
anomaly_pipeline.fit(train_ts)
anomaly_forecast_ts = anomaly_pipeline.forecast()
plot_forecast(
train_ts=train_ts_wo_anomaly,
test_ts=test_ts,
forecast_ts={
'naive':naive_forecast_ts,
'linear_base':base_forecast_ts,
'linear_anomaly':anomaly_forecast_ts,
}
)

С аномалиями на ряде db_service_3 мы справились, однако прогноз для рядов db_service_1 и db_service_4 по-прежнему страдает от исторических изменений тренда и сдвигов таргета.
Первая мысль — отбросить «нестабильную» историю и оставить для прогноза только последний период с более-менее постоянным уровнем тренда / таргета. Но как его точно найти? И что, если этот период окажется слишком зашумлённым или коротким, чтобы на нем можно было обнаружить потенциальную аномалию?
Что, если не отбрасывать данные, а, наоборот, обогатить их, разметив все участки с изменениями? Идея в следующем: выделить отрезки между точками смены тренда или уровня и передать эти метки модели в качестве признаков. Тогда у неё появится шанс научиться обобщать закономерности внутри каждого участка.
Для обнаружения точек изменения тренда в Etna есть готовый функционал, включая инструменты для их визуализации. Под капотом используется библиотека Ruptures, специализирующаяся на обнаружении точек разладки (changepoints) в нестационарных временных рядах.

Работа строится по знакомой схеме, похожей на обработку аномалий:
import ruptures as rpt
from etna.analysis import find_change_points
from etna.analysis import plot_time_series_with_change_points
change_points = find_change_points(
ts=train_ts_wo_anomaly, in_column="target",
change_point_model=rpt.Pelt(model='normal'), pen=10
)
plot_time_series_with_change_points(
ts=train_ts_wo_anomaly,
change_points=change_points
)

И, конечно, здесь тоже есть возможность интерактивного подбора параметров детектора точек разладки:
from etna.analysis import plot_change_points_interactive
params_bounds = {
"min_size": [0, 20, 10], # мин. размер сегмента
"pen": [0, 40, 10], # размер штрафа
"jump": [1, 10, 1], # шаг поиска сегментов
}
plot_change_points_interactive(
ts=train_ts_wo_anomaly,
change_point_model=rpt.Pelt,
model='normal',
params_bounds=params_bounds,
model_params=["min_size" ,"jump"],
predict_params=["pen"],
figsize=(6, 3)
)

Как и в случае с детектированием аномалий, универсального решения для поиска точек разладки не существует. В библиотеке Ruptures реализовано несколько алгоритмов поиска, что позволяет выбрать наиболее подходящий вариант для решаемой задачи.
Встраивание детектора точек разладки в пайплайн состоит из двух этапов:
Первый этап – создание Etna-совместимой обёртки. Инициализириуем класс-адаптер для модели Ruptures, чтобы она могла работать внутри экосистемы Etna:
from etna.transforms.decomposition import RupturesChangePointsModel
change_points_model_best_params = {'min_size': 10, 'jump':1}
change_points_predict_best_params = {'pen': 30}
change_points_model = RupturesChangePointsModel(
rpt.Pelt(model='normal', **change_points_model_best_params),
**change_points_predict_best_params
)
Второй этап – применение преобразования ChangePointsSegmentationTransform. Оно использует ранее созданную обёртку для разметки ряда на сегменты и создания категориального признака:
from etna.transforms import ChangePointsSegmentationTransform
change_points_transform = ChangePointsSegmentationTransform(
in_column='target',
change_points_model=change_points_model,
out_column='subsegment_id'
)
Так как линейная модель интерпретирует категориальный признак subsegment_id как обычное число (что бессмысленно), его необходимо преобразовать с помощью one-hot encoding (OHE):
from etna.transforms import OneHotEncoderTransform
ohe_transform = OneHotEncoderTransform(
in_column='subsegment_id',
out_column='subsegment',
return_type='numeric'
)
train_ts_wo_trend = copy.deepcopy(train_ts_wo_anomaly)
train_ts_wo_trend.fit_transform([change_points_transform, ohe_transform])
train_ts_wo_trend[: ,'db_service_2', :]

Важный нюанс: для прогнозного периода это преобразование будет проставлять флаги, соответствующие последней известной дате, поскольку мы не можем предсказать будущие точки изменения.
Наконец-то мы готовы собрать наш финальный пайплайн!
from etna.transforms import FilterFeaturesTransform
subsegment_transforms = [
outliers_remover_transform,
outliers_imputer_transform,
change_points_transform,
ohe_transform,
FilterFeaturesTransform(exclude=['subsegment_id']),
date_to_int_transform,
StandardScalerTransform(in_column=['step'])
]
subsegment_pipeline = Pipeline(
model=model,
transforms=subsegment_transforms,
horizon=HORIZON
)
subsegment_pipeline.fit(train_ts)
subsegment_forecast_ts = subsegment_pipeline.forecast()
plot_forecast(
train_ts=train_ts_wo_anomaly,
test_ts=test_ts,
forecast_ts={
'naive':naive_forecast_ts,
'linear_base':base_forecast_ts,
'linear_anomaly':anomaly_forecast_ts,
'linear_subsegment':subsegment_forecast_ts,
}
)

subsegment_wape = wape(y_true=test_ts, y_pred=subsegment_forecast_ts)
df_metric['linear_subsegment'] = pd.Series(subsegment_wape)
df_metric.round(2)

Временной ряд db_service_1 покорен! Но настоящий вызов ждет впереди – проверка на оставшихся 200+ рядах!

Справится ли наш подход с таким масштабом?
Теперь проявляется вся мощь пайплайна: для масштабирования достаточно передать новые данные в уже настроенные цепочки преобразований:
df_all = pd.read_csv('data_all.csv', parse_dates=['timestamp'])
ts_all = TSDataset(df_all, freq='W-MON')
train_all_ts, test_all_ts = ts_all.train_test_split(test_size=HORIZON)
train_all_ts.regressors.append('step')
num_segments = test_all_ts.describe()['num_segments'][0]
pipelines = [
['naive', naive_pipeline],
['linear_base', base_pipeline],
['linear_anomaly', anomaly_pipeline],
['linear_subsegment', subsegment_pipeline]
]
wape_macro = WAPE(mode='macro')
metrics = {}
for alias, test_pipeline in pipelines:
test_pipeline.fit(train_all_ts)
test_forecast_ts = test_pipeline.forecast()
metrics[alias] = wape_macro(
y_true=test_all_ts,
y_pred=test_forecast_ts
)
pd.Series(metrics, name=f'WAPE of {num_segments} segments').round(2)

Финальный пайплайн уверенно берёт вершину — он снижает ошибку WAPE на 4 процентных пункта относительно наивного бейзлайн-прогноза.
Наше восхождение на гору временных рядов подошло к концу. Мы разбили базовый лагерь (подготовка данных, наивная модель), осторожно обошли трещины аномалий, отыскали ключевые перевалы – точки смены трендов. Наконец, мы взошли на вершину, отмасштабировав решение на 200+ рядов, добившись улучшения метрик.
Библиотека Etna в этом путешествии стала надежным шерпой. Она не только помогла взглянуть на данные под новым углом благодаря удобной визуализации, но и автоматизировала рутину – подготовку данных и прогноз для 200+ рядов без единого цикла for.
Однако стоит упомянуть, что Etna не всегда будет лучшим выбором, ведь её основное предназначение – прогноз множества бизнес-рядов средней длины. За этими рамками её сильные стороны могут нивелироваться:
Прогноз для одного-двух рядов. Весь пайплайновый аппарат Etna будет похож на стрельбу из пушки по воробьям. Проще использовать, например, Prophet.
Миллионы рядов или последовательности длиной в сотни тысяч наблюдений. Производительность может не удовлетворить SLA, здесь стоит смотреть в сторону оптимизированных под такой масштаб решений, например PySpark.
Эксперименты с экзотическими моделями. Если ваш фокус — research и тестирование хайповых нейросетевых архитектур, то фреймворки вроде PyTorch Forecasting или Darts дадут больше свободы.
На какие еще инструменты прогноза временных рядов также стоит обратить внимание [8]:
Prophet – мощный инструмент для обработки сезонности и трендов с учетом праздников.
Sktime – фреймворк с унифицированным интерфейсом для всестороннего анализа временных рядов, включая прогнозирование, классификацию, кластеризацию и регрессию, с поддержкой современных алгоритмов.
Greykite – библиотека от LinkedIn, способная учитывать сложные паттерны, такие как множественные сезонности и эффекты взаимодействий, с помощью собственного алгоритма прогноза Silverkite.
Теперь пора ставить новые цели и использовать добытые знания. Удачи в покорении ваших данных!
При подготовке данной статьи использовались материалы из документации Etna [9].
P.S. С какими неожиданными «горными препятствиями» сталкивались вы в своих временных рядах? Делитесь в комментариях.
Автор: rounder
Источник [10]
Сайт-источник BrainTools: https://www.braintools.ru
Путь до страницы источника: https://www.braintools.ru/article/24455
URLs in this post:
[1] боль: http://www.braintools.ru/article/9901
[2] поддержкой моделей промо-прогноза: https://habr.com/ru/companies/magnit/articles/748680/
[3] ценообразовании: https://habr.com/ru/companies/magnit/articles/954224/
[4] доступности товара на полке: https://habr.com/ru/companies/magnit/articles/889658/
[5] Etna: https://github.com/etna-team/etna/
[6] ошибки: http://www.braintools.ru/article/4192
[7] логика: http://www.braintools.ru/article/7640
[8] внимание: http://www.braintools.ru/article/7595
[9] документации Etna: https://docs.etna.ai/stable/index.html
[10] Источник: https://habr.com/ru/companies/magnit/articles/985864/?utm_campaign=985864&utm_source=habrahabr&utm_medium=rss
Нажмите здесь для печати.