- BrainTools - https://www.braintools.ru -

Покоряем гору временных рядов: делаем прогноз для 200+ рядов с библиотекой Etna

Я работаю дата-сайентистом 5 лет и до сих пор испытываю боль [1], когда нужно сделать MVP по временным рядам. Начиная с того, как построить несколько графиков одновременно без «слипшихся» меток по осям, заканчивая поиском подходящего метода очистки ряда от аномалий. И всё это венчает цикл по каждому ряду с бесконечным жонглированием данными между numpy, pandas, sklearn, yet_another_library.

Если вы DS, и тоже, как и я, устали от вот этого всего, добро пожаловать под кат. Я покажу, как написать production-ready код для прогноза 200+ временных рядов от EDA до результата. Разберем на практике, как бороться с аномалиями, ловить смены тренда и в итоге – получить масштабируемое решение, а не очередной «велосипед».

Покоряем гору временных рядов: делаем прогноз для 200+ рядов с библиотекой Etna - 1

Какую вершину нам предстоит покорить?

Привет, Habr! Меня зовут Саша Солин, я ML-инженер в MAGNIT TECH. Наша команда занимается развитием и поддержкой моделей промо-прогноза [2]. В первом приближении прогнозирование промо может показаться задачей временных рядов, но по ряду причин, мы подходим к ней как к задаче классической регрессии. За счет своей простоты, некоторые из моделей получалось успешно переиспользовать в других проектах, не связанных с промо-прогнозом, например в ценообразовании [3] или доступности товара на полке [4].

Однако этому пришел конец, когда к нам пришли с задачей… прогнозирования роста журнала изменений БД. По предоставленным временным рядам 200+ сервисов необходимо сделать понедельный прогноз на несколько лет вперед для планирования инфраструктуры БД.

Достаем наш альпинистский набор Data Scientist’a и отправляемся покорять гору рядов!

Базовый лагерь: готовим снаряжение и планируем маршрут

Спойлер: данные представляют собой нестационарные временные ряды без выраженной сезонности, поэтому сразу отмели Prophet & ARIMA-like модели и решили смотреть сторону time-series библиотек с менее требовательными к рядам моделями. Выбор пал на фреймворк Etna [5] – он предоставляет набор удобных инструментов для работы с множественными временными рядами.

Перейдем от слов к делу, посмотрим на часть данных, с которыми будем работать:

import pandas as pd
from etna.datasets import TSDataset

# Читаем данные и преобразуем в "широкий формат Этна"
df = pd.read_csv('data.csv', parse_dates=['timestamp'])
ts = TSDataset(df, freq='W-MON')

ts
Покоряем гору временных рядов: делаем прогноз для 200+ рядов с библиотекой Etna - 2

Данных минимум – четыре временных ряда с временными метками и таргетом (размер журнала изменений). Давайте взглянем на графики:

ts.plot()
Настоящий джекпот: нестационарные временные ряды со сменой тренда (1, 2), вкраплениями аномалий (3) и резкими изменениями уровня таргета (4).

Настоящий джекпот: нестационарные временные ряды со сменой тренда (1, 2), вкраплениями аномалий (3) и резкими изменениями уровня таргета (4).
Покоряем гору временных рядов: делаем прогноз для 200+ рядов с библиотекой Etna - 4

Для начала построим бейзлайн – наивный прогноз, где прогнозное значение равно последнему известному наблюдению. Он послужит отправной точкой для оценки более сложных моделей.

Перед этим разобьем наш датасет на обучающую и тестовую выборки и создадим Pipeline. Он предназначен для последовательного выполнения цепочки создания /преобразования признаков и работы с моделью.

В данном примере использование пайплайна может показаться overkill’ом, но это позволит в дальнейшем проводить все эксперименты по единой схеме и поскорее освоиться с API фреймворка:

from etna.models import NaiveModel
from etna.pipeline import Pipeline
from etna.analysis import plot_forecast

# Формируем тестовую выборку из 20 последних наблюдений
HORIZON = 20
train_ts, test_ts = ts.train_test_split(test_size=HORIZON)

# Объявляем модель
naive_model = NaiveModel()

# Объявляем пайплайн
naive_pipeline = Pipeline(model=naive_model, horizon=HORIZON)

# Обучаем модель и делаем прогноз
naive_pipeline.fit(train_ts)
naive_forecast_ts = naive_pipeline.forecast()

# Визуализируем прогноз
plot_forecast(forecast_ts=naive_forecast_ts, test_ts=test_ts, train_ts=train_ts)
Покоряем гору временных рядов: делаем прогноз для 200+ рядов с библиотекой Etna - 5

Для оценки прогноза воспользуемся метрикой взвешенной абсолютной процентной ошибки [6] WAPE:

text{WAPE}=frac{sumlimits_{i=1}^{n} |y_i - hat{y}_i|}{sumlimits_{i=1}^{n} |y_i|}

from etna.metrics import WAPE

wape = WAPE()
naive_wape = wape(y_true=test_ts, y_pred=naive_forecast_ts)
df_metric = pd.DataFrame.from_dict(
    data=naive_wape, orient='index', columns=['naive']
) 
df_metric.round(2)
Покоряем гору временных рядов: делаем прогноз для 200+ рядов с библиотекой Etna - 7

Бейзлайн готов, ошибка WAPE по каждому из рядов посчитана. Теперь попробуем с наскока снизить ошибку, заменив модель на линейную регрессию.

Покоряем гору временных рядов: делаем прогноз для 200+ рядов с библиотекой Etna - 8

Первый и самый простой признак, который мы создадим для неё — линейный тренд. Это разница в днях между датой каждого наблюдения и началом временного ряда. Для этого нам потребуется преобразование, которое переведет разность дат в число:

from etna.transforms import LambdaTransform

def date_to_int(column: pd.DataFrame) -> pd.DataFrame:
    """
    Получаем дату из индекса и вычисляем
    разницу в днях от константной даты
    """
    
    mix_date = column.index.min()
    column = column 
        .apply(lambda x: (x.index - mix_date).days) 
        .div(7) 
        .add(1) 
        .astype('float')
    return column

# Объявим лямбда преобразование, применяющее пользовательскую функцию к столбцу
date_to_int_transform = LambdaTransform(
    in_column='target', # столбец, к которому применяется преобразование
    inplace=False, # результат преобразования не перезаписывает in_column
    out_column='step', # столбец с результатом выполнения функции
    transform_func=date_to_int, # функция преобразования
    inverse_transform_func=lambda x: x, # обратное преобразование не требуется
)
date_to_int_transform.fit_transform(train_ts)
train_ts.head(3)
Покоряем гору временных рядов: делаем прогноз для 200+ рядов с библиотекой Etna - 9

Убедившись, что преобразование работает корректно, добавим его в пайплайн для линейной модели и сделаем новый прогноз. Здесь пайплайн наконец-то раскрывает свой потенциал: он создает новый признак для тренировочной и тестовой выборок, выполняет его стандартизацию и передает в модель:

from etna.models import LinearPerSegmentModel
from etna.transforms import StandardScalerTransform

# Объявляем линейную модель обучаемую посегментно
model = LinearPerSegmentModel()

# Объявляем последовательность преобразований
base_transforms = [
    date_to_int_transform,
    StandardScalerTransform()
]
base_pipeline = Pipeline(
    model=model,
    transforms=base_transforms,
    horizon=HORIZON
)

# Явно указываем, что столбец step это регрессор,
# Иначе модель его "не увидит"
train_ts.regressors.append('step')

base_pipeline.fit(train_ts)
base_forecast_ts = base_pipeline.forecast()

plot_forecast(
    train_ts=train_ts,
    test_ts=test_ts,
    forecast_ts={
        'naive':naive_forecast_ts,
        'linear_base':base_forecast_ts
    }
)
Покоряем гору временных рядов: делаем прогноз для 200+ рядов с библиотекой Etna - 10

Проверим метрики:

linear_base_wape = wape(y_true=test_ts, y_pred=base_forecast_ts)
df_metric['linear_base'] =  pd.Series(linear_base_wape)
df_metric.round(2)
Покоряем гору временных рядов: делаем прогноз для 200+ рядов с библиотекой Etna - 11

Линейной регрессии с простым трендом удалось снизить ошибку относительно бейзлайна лишь в половине случаев. В остальных случаях ошибка, наоборот, выросла — в два и в четыре раза.

Просчитались, но где?

Покоряем гору временных рядов: делаем прогноз для 200+ рядов с библиотекой Etna - 12

Первый штурм не увенчался успехом. Посмотрим, какое еще снаряжение есть в нашем альпинистском наборе дата-сайентиста.

Обходим трещины и лавины: обнаруживаем и исключаем аномалии

Etna из коробки предлагает готовые методы для детектирования аномалий вместе с удобными средствами визуализации. Ниже приведены примеры некоторых из них.

Метод гистограмм – данные аппроксимируются с помощью гистограммы, где точки, удаление которых снижает ошибку аппроксимации, считаются аномалиями:

from etna.analysis import get_anomalies_hist
from etna.analysis import plot_anomalies

anomaly_dict = get_anomalies_hist(train_ts, bins_number=4)
plot_anomalies(train_ts, anomaly_dict)
Покоряем гору временных рядов: делаем прогноз для 200+ рядов с библиотекой Etna - 13

Вишенкой на торте является интерактивная визуализация, которая позволяет подбирать параметры детектора и сразу видеть, какие точки будут отмечены как аномальные.

Рассмотрим это на примере метода медианы. Его логика [7] проста: для каждой точки вычисляется медиана и стандартное отклонение (std) внутри скользящего окна заданного размера (window_size). Точка считается аномалией, если её значение отклоняется от медианы больше, чем на alpha * std. Параметр alpha регулирует чувствительность, а window_size задаёт локальность оценки:

from etna.analysis import get_anomalies_median
from etna.analysis import plot_anomalies_interactive

plot_anomalies_interactive(
    ts=train_ts,
    segment='db_service_2',
    method=get_anomalies_median,
    params_bounds={'window_size': (10, 30, 10), 'alpha':(2, 3, 0.5)}
)
Покоряем гору временных рядов: делаем прогноз для 200+ рядов с библиотекой Etna - 14

Универсального рецепта для выбора метода детекции аномалий не существует – всё зависит от природы временного ряда и решаемой задачи.

В Etna работа с аномалиями выстроена в два логических этапа:

Первый этап – обнаружение и разметка. С помощью соответствующего преобразования заменим аномальные значения на NaN:

import copy
from etna.transforms import MedianOutliersTransform

anomalies_median_best_params = {'window_size': 20, 'alpha': 2.5}
outliers_remover_transform = MedianOutliersTransform(
    in_column="target",
    **anomalies_median_best_params
)

train_ts_wo_anomaly = copy.deepcopy(train_ts)
train_ts_wo_anomaly.fit_transform([outliers_remover_transform])
train_ts_wo_anomaly.plot()
Покоряем гору временных рядов: делаем прогноз для 200+ рядов с библиотекой Etna - 15

Второй этап – заполнение пропусков. Образовавшиеся пропуски заполним с помощью TimeSeriesImputerTransform. После этого визуализируем ряды, чтобы убедиться в адекватности замены:

from etna.transforms import TimeSeriesImputerTransform
from etna.analysis import plot_imputation


outliers_imputer_transform = TimeSeriesImputerTransform(in_column="target", strategy="forward_fill")
plot_imputation(imputer=outliers_imputer_transform, ts=train_ts_wo_anomaly)
train_ts_wo_anomaly.fit_transform([outliers_imputer_transform])
Покоряем гору временных рядов: делаем прогноз для 200+ рядов с библиотекой Etna - 16

Теперь соберем новый пайплайн, включив в него этапы обработки аномалий, и проверим, удалось ли улучшить прогноз:

anomaly_transforms = [
    outliers_remover_transform,
    outliers_imputer_transform,
    date_to_int_transform,
    StandardScalerTransform()
]
anomaly_pipeline = Pipeline(
    model=model,
    transforms=anomaly_transforms,
    horizon=HORIZON
)
anomaly_pipeline.fit(train_ts)
anomaly_forecast_ts = anomaly_pipeline.forecast()

plot_forecast(
    train_ts=train_ts_wo_anomaly,
    test_ts=test_ts,
    forecast_ts={
        'naive':naive_forecast_ts,
        'linear_base':base_forecast_ts,
        'linear_anomaly':anomaly_forecast_ts,
    }
)
Покоряем гору временных рядов: делаем прогноз для 200+ рядов с библиотекой Etna - 17

С аномалиями на ряде db_service_3 мы справились, однако прогноз для рядов db_service_1 и  db_service_4 по-прежнему страдает от исторических изменений тренда и сдвигов таргета.

Первая мысль — отбросить «нестабильную» историю и оставить для прогноза только последний период с более-менее постоянным уровнем тренда / таргета. Но как его точно найти? И что, если этот период окажется слишком зашумлённым или коротким, чтобы на нем можно было обнаружить потенциальную аномалию?

Что, если не отбрасывать данные, а, наоборот, обогатить их, разметив все участки с изменениями? Идея в следующем: выделить отрезки между точками смены тренда или уровня и передать эти метки модели в качестве признаков. Тогда у неё появится шанс научиться обобщать закономерности внутри каждого участка.

Находим перевалы: детектируем точки изменения тренда

Для обнаружения точек изменения тренда в Etna есть готовый функционал, включая инструменты для их визуализации. Под капотом используется библиотека Ruptures, специализирующаяся на обнаружении точек разладки (changepoints) в нестационарных временных рядах.

Покоряем гору временных рядов: делаем прогноз для 200+ рядов с библиотекой Etna - 18

Работа строится по знакомой схеме, похожей на обработку аномалий:

import ruptures as rpt

from etna.analysis import find_change_points
from etna.analysis import plot_time_series_with_change_points

change_points = find_change_points(
    ts=train_ts_wo_anomaly, in_column="target",
    change_point_model=rpt.Pelt(model='normal'), pen=10
)
plot_time_series_with_change_points(
    ts=train_ts_wo_anomaly,
    change_points=change_points
)
Покоряем гору временных рядов: делаем прогноз для 200+ рядов с библиотекой Etna - 19

И, конечно, здесь тоже есть возможность интерактивного подбора параметров детектора точек разладки:

from etna.analysis import plot_change_points_interactive

params_bounds = {
    "min_size": [0, 20, 10], # мин. размер сегмента
    "pen": [0, 40, 10], # размер штрафа
    "jump": [1, 10, 1], # шаг поиска сегментов
}
plot_change_points_interactive(
    ts=train_ts_wo_anomaly,
    change_point_model=rpt.Pelt,
    model='normal',
    params_bounds=params_bounds,
    model_params=["min_size" ,"jump"],
    predict_params=["pen"],
    figsize=(6, 3)
)
Покоряем гору временных рядов: делаем прогноз для 200+ рядов с библиотекой Etna - 20

Как и в случае с детектированием аномалий, универсального решения для поиска точек разладки не существует. В библиотеке Ruptures реализовано несколько алгоритмов поиска, что позволяет выбрать наиболее подходящий вариант для решаемой задачи.

Встраивание детектора точек разладки в пайплайн состоит из двух этапов:

Первый этап – создание Etna-совместимой обёртки. Инициализириуем класс-адаптер для модели Ruptures, чтобы она могла работать внутри экосистемы Etna:

from etna.transforms.decomposition import RupturesChangePointsModel

change_points_model_best_params = {'min_size': 10, 'jump':1}
change_points_predict_best_params = {'pen': 30}

change_points_model = RupturesChangePointsModel(
    rpt.Pelt(model='normal', **change_points_model_best_params),
    **change_points_predict_best_params
)

Второй этап – применение преобразования ChangePointsSegmentationTransform. Оно использует ранее созданную обёртку для разметки ряда на сегменты и создания категориального признака:

from etna.transforms import ChangePointsSegmentationTransform

change_points_transform = ChangePointsSegmentationTransform(
    in_column='target', 
    change_points_model=change_points_model,
    out_column='subsegment_id'
)

Так как линейная модель интерпретирует категориальный признак subsegment_id как обычное число (что бессмысленно), его необходимо преобразовать с помощью one-hot encoding (OHE):

from etna.transforms import OneHotEncoderTransform

ohe_transform = OneHotEncoderTransform(
    in_column='subsegment_id',
    out_column='subsegment',
    return_type='numeric'
)

train_ts_wo_trend = copy.deepcopy(train_ts_wo_anomaly)
train_ts_wo_trend.fit_transform([change_points_transform, ohe_transform])
train_ts_wo_trend[: ,'db_service_2', :]
Покоряем гору временных рядов: делаем прогноз для 200+ рядов с библиотекой Etna - 21

Важный нюанс: для прогнозного периода это преобразование будет проставлять флаги, соответствующие последней известной дате, поскольку мы не можем предсказать будущие точки изменения.

На вершине: сводим всё воедино и любуемся панорамой прогноза

Наконец-то мы готовы собрать наш финальный пайплайн!

Рейнджеры Преобразования, объединяемся!

Рейнджеры Преобразования, объединяемся!
from etna.transforms import FilterFeaturesTransform

subsegment_transforms = [
    outliers_remover_transform,
    outliers_imputer_transform,
    change_points_transform,
    ohe_transform,
    FilterFeaturesTransform(exclude=['subsegment_id']),
    date_to_int_transform,
    StandardScalerTransform(in_column=['step'])
]
subsegment_pipeline = Pipeline(
    model=model,
    transforms=subsegment_transforms,
    horizon=HORIZON
)
subsegment_pipeline.fit(train_ts)

subsegment_forecast_ts = subsegment_pipeline.forecast()

plot_forecast(
    train_ts=train_ts_wo_anomaly,
    test_ts=test_ts,
    forecast_ts={
        'naive':naive_forecast_ts,
        'linear_base':base_forecast_ts,
        'linear_anomaly':anomaly_forecast_ts,
        'linear_subsegment':subsegment_forecast_ts,
    }
)
Покоряем гору временных рядов: делаем прогноз для 200+ рядов с библиотекой Etna - 23
subsegment_wape = wape(y_true=test_ts, y_pred=subsegment_forecast_ts)
df_metric['linear_subsegment'] =  pd.Series(subsegment_wape)
df_metric.round(2)
Покоряем гору временных рядов: делаем прогноз для 200+ рядов с библиотекой Etna - 24

Временной ряд db_service_1 покорен! Но настоящий вызов ждет впереди – проверка на оставшихся 200+ рядах!

Покоряем гору временных рядов: делаем прогноз для 200+ рядов с библиотекой Etna - 25

Справится ли наш подход с таким масштабом?

Теперь проявляется вся мощь пайплайна: для масштабирования достаточно передать новые данные в уже настроенные цепочки преобразований:

df_all = pd.read_csv('data_all.csv', parse_dates=['timestamp'])
ts_all = TSDataset(df_all, freq='W-MON')

train_all_ts, test_all_ts = ts_all.train_test_split(test_size=HORIZON)
train_all_ts.regressors.append('step')
num_segments = test_all_ts.describe()['num_segments'][0]

pipelines = [
    ['naive', naive_pipeline],
    ['linear_base', base_pipeline],
    ['linear_anomaly', anomaly_pipeline],
    ['linear_subsegment', subsegment_pipeline]
]
wape_macro = WAPE(mode='macro')
metrics = {}
for alias, test_pipeline in pipelines:
    test_pipeline.fit(train_all_ts)
    test_forecast_ts = test_pipeline.forecast()
    metrics[alias] = wape_macro(
        y_true=test_all_ts,
        y_pred=test_forecast_ts
    )

pd.Series(metrics, name=f'WAPE of {num_segments} segments').round(2)
Покоряем гору временных рядов: делаем прогноз для 200+ рядов с библиотекой Etna - 26

Финальный пайплайн уверенно берёт вершину — он снижает ошибку WAPE на 4 процентных пункта относительно наивного бейзлайн-прогноза.

Заключение: с вершины к новым горизонтам

Наше восхождение на гору временных рядов подошло к концу. Мы разбили базовый лагерь (подготовка данных, наивная модель), осторожно обошли трещины аномалий, отыскали ключевые перевалы – точки смены трендов. Наконец, мы взошли на вершину, отмасштабировав решение на 200+ рядов, добившись улучшения метрик.

Библиотека Etna в этом путешествии стала надежным шерпой. Она не только помогла взглянуть на данные под новым углом благодаря удобной визуализации, но и автоматизировала рутину – подготовку данных и прогноз для 200+ рядов без единого цикла for.

Однако стоит упомянуть, что Etna не всегда будет лучшим выбором, ведь её основное предназначение – прогноз множества бизнес-рядов средней длины. За этими рамками её сильные стороны могут нивелироваться:

  • Прогноз для одного-двух рядов. Весь пайплайновый аппарат Etna будет похож на стрельбу из пушки по воробьям. Проще использовать, например, Prophet.

  • Миллионы рядов или последовательности длиной в сотни тысяч наблюдений. Производительность может не удовлетворить SLA, здесь стоит смотреть в сторону оптимизированных под такой масштаб решений, например PySpark.

  • Эксперименты с экзотическими моделями. Если ваш фокус — research и тестирование хайповых нейросетевых архитектур, то фреймворки вроде PyTorch Forecasting или Darts дадут больше свободы.

На какие еще инструменты прогноза временных рядов также стоит обратить внимание [8]:

  • Prophet – мощный инструмент для обработки сезонности и трендов с учетом праздников.

  • Sktime – фреймворк с унифицированным интерфейсом для всестороннего анализа временных рядов, включая прогнозирование, классификацию, кластеризацию и регрессию, с поддержкой современных алгоритмов.

  • Greykite – библиотека от LinkedIn, способная учитывать сложные паттерны, такие как множественные сезонности и эффекты взаимодействий, с помощью собственного алгоритма прогноза Silverkite.

Теперь пора ставить новые цели и использовать добытые знания. Удачи в покорении ваших данных!

 При подготовке данной статьи использовались материалы из документации Etna [9].

P.S. С какими неожиданными «горными препятствиями» сталкивались вы в своих временных рядах? Делитесь в комментариях.

Автор: rounder

Источник [10]


Сайт-источник BrainTools: https://www.braintools.ru

Путь до страницы источника: https://www.braintools.ru/article/24455

URLs in this post:

[1] боль: http://www.braintools.ru/article/9901

[2] поддержкой моделей промо-прогноза: https://habr.com/ru/companies/magnit/articles/748680/

[3] ценообразовании: https://habr.com/ru/companies/magnit/articles/954224/

[4] доступности товара на полке: https://habr.com/ru/companies/magnit/articles/889658/

[5] Etna: https://github.com/etna-team/etna/

[6] ошибки: http://www.braintools.ru/article/4192

[7] логика: http://www.braintools.ru/article/7640

[8] внимание: http://www.braintools.ru/article/7595

[9] документации Etna: https://docs.etna.ai/stable/index.html

[10] Источник: https://habr.com/ru/companies/magnit/articles/985864/?utm_campaign=985864&utm_source=habrahabr&utm_medium=rss

www.BrainTools.ru

Rambler's Top100