Slow Feature Analysis. Разбор метода и реализация на Python с нуля. python.. python. Алгоритмы.. python. Алгоритмы. анализ данных.. python. Алгоритмы. анализ данных. математика.. python. Алгоритмы. анализ данных. математика. Машинное обучение.. python. Алгоритмы. анализ данных. математика. Машинное обучение. машинное+обучение.. python. Алгоритмы. анализ данных. математика. Машинное обучение. машинное+обучение. Программирование.

В этой статье я хочу рассказать про метод обучения без учителя – “Анализ медленных признаков” (Slow Feature Analysis), далее SFA. Метод был разработан в 2002 году Лоренцом Вискоттом и Терренсом Сейновски.

SFA можно использовать для выделения стабильных сигналов на фоне шума, такие как отслеживание объектов на видео, трендов цен из финансовых данных, признаков износа по вибрациям оборудования.

SFA – метод изучения инвариантных или медленно изменяющихся признаков на основе векторного входного сигнала. Эту задачу можно свести к минимизации скорости изменения проекций. В статье будет рассмотрен многомерный случай, но принцип легко обобщается на одномерный через вложение Такенса.

Что такое вложение Такенса?

Вложение Такенса – метод реконструкции фазового пространства динамической системы по временному ряду.
Чтобы воспользоваться этим методом нужно определить минимальное необходимое пространство D. Сделать это можно методом ложных ближайших соседей. По теореме Такенса если собладается

D >=2k + 1, где k истинная размерность аттрактора, то реконструкция будет топологически эквивалентна исходной системе. Далее нам нужно выбрать временную задержку tau и мы сможем построить из исходного ряда векторы размерности D:

X(t)=(x(t), x(t + τ), x(t + 2τ), ...x(t + (D − 1)τ).

Каждая точка в новом d-мерном пространстве соответствует “окну” длины D из исходного ряда.

Таким образом, даже если у вас одномерный временной ряд, вы можете сначала применить вложение Такенса для восстановления фазового пространства, а затем подать полученный многомерный сигнал на вход SFA для извлечения медленных управляющих параметров системы.

Объяснение алгоритма и реализация его на Python

Пусть у нас есть многомерный временной сигнал X(t) длины T и количеством признаков N такой, что:

X(t)=[x_1(t), x_2(t), x_3(t), ... x_N(t)],

Где x_i(t) значения временного сигнала, а t = 1, 2, …, T.

Для этой задачи будет создан искусственный пример – три различных сигнала:

  • Синус с белым шумом

  • Синус с угасающим розовым шумом

  • Синус с экстремальными значениями и броуновским блужданием.

Примечание: наличие розового шума, броуновского блуждания и выбросов сделано намеренно, чтобы подсветить границы применимости метода. Как будет показано далее на графиках, SFA успешно выделяет общий тренд, но его весовые коэффициенты для зашумленных каналов стремятся к нулю, эффективно отключая “сломанный сигнал”. Это показывает как силу, так и ограничение метода – он не восстанавливает сигнал из шума, а стремится игнорировать источник шума.

В примере будут использоваться исходные три столбца, но для более сложных задач можно создавать степенные, логарифмические и более сложные признаки и их композиции.

series_1

series_2

series_3

0

0.3249

-0.6801

0.2065

1

-0.0972

0.1738

0.7730

2

-0.0553

-0.3769

0.3526

3

-0.1391

0.0540

0.9979

4

0.2736

0.3680

0.7669

Slow Feature Analysis. Разбор метода и реализация на Python с нуля - 7
Код на Python

import numpy as np
import pandas as pd

np.random.seed(1)
pd.set_option('display.float_format', '{:.4f}'.format)

data_size = 500
# 4 периода синуса
x = np.linspace(0, 4 * np.pi, data_size) 

sin_wave = np.sin(x)
# Создание синуса с белым шумом
white_noise = np.random.normal(0, 0.2, data_size)
series_1 = sin_wave + white_noise

# Создание синуса угасающим розовым шумом
pink_noise = (lambda x: x / np.std(x))(
    np.real(
        np.fft.ifft(
            np.concatenate([
                [0],
                (1 / np.sqrt(np.abs(np.fft.fftfreq(data_size)[1:]) + 1e-10))
                * np.exp(2j * np.pi * np.random.random(data_size - 1))
            ])
        )
    )
)

decay_envelope = 2.25 * np.exp(-2.25 * x / x.max()) + 0.25

pink_noise = pink_noise * decay_envelope

series_2 = sin_wave + 0.4 * pink_noise

# Создание синуса с выбросами и броуновским движением
brownian = np.cumsum(np.random.normal(0, 0.5, data_size))
extreme_values = np.zeros(data_size)
outlier_indices = np.random.choice(data_size, size=int(0.5 * data_size), replace=False)
extreme_values[outlier_indices] = np.random.uniform(-1, 1, size=len(outlier_indices))
series_3 = sin_wave + extreme_values + 0.4 * brownian

df = pd.DataFrame({
    'series_1': series_1,
    'series_2': series_2,
    'series_3': series_3
})
  

Мы хотим найти такие функции g_j(X), чтобы выходные сигналы y_j(t)=g_j(X(t)) изменялись наиболее медленно. Это достигается минимизацией следующего функционала:

Δ_j=⟨dot{y}_j(t)^2⟩,

Где dot{y}_j производная по времени, а треугольные скобки ⟨⟩ означают усреднение по времени. Так как у нас дискретный случай, то производная по времени будет считаться следующим образом:

dot{y}_j approx y_j(t+1) - y_j(t).

Для статьи был выбран самый простой способ, но при необходимости можно использовать и центральные разности:

dot{y}_j approx frac {y_j(t + 1) - y_j(t - 1)}{2}.

Как и у многих оптимизационных задач здесь есть ограничения:

  1. Нулевое среднее. Это нужно чтобы исключить неинформативные константные признаки.

frac{1}{T} sum_{t=1}^{T} y_j(t)=0.

  1. Единичная дисперсия. Все признаки нужно привести к одному масштабу.

frac{1}{T} sum_{t=1}^{T} (y_j(t))^2=1.

  1. Некоррелированность. Это нужно чтобы каждый следующий признак не дублировал информацию.

frac{1}{T} sum_{t=1}^{T} y_i(t) cdot y_j(t)=0, quad text{при } i neq j.

В настоящий момент данные образуют следующее облако точек:

Slow Feature Analysis. Разбор метода и реализация на Python с нуля - 17

Чтобы наши данные удовлетворяли этим требованиям мы сделаем следующие действия:

Для центрирования следует вычесть из каждого элемента среднее:

tilde{x}(t)=x(t) - overline{x}.

series_1

series_2

series_3

0

0.3142

-0.6037

-1.6258

1

-0.1079

0.2502

-1.0593

2

-0.0660

-0.3006

-1.4797

3

-0.1498

0.1304

-0.8344

4

0.2630

0.4444

-1.0654

Код на Python

df_centr = pd.DataFrame()

for column in df.columns:
    df_centr[column] = df[column] - df[column].mean()
  

Вычисляем ковариационную матрицу. Ковариационная матрица признаков определяется как математическое ожидание внешнего произведения центрированного вектора этих признаков, то получается следующее:

A=⟨tilde{x}(t)tilde{x}^T(t)⟩.

series_1

series_2

series_3

series_1

0.5298

0.4110

-0.2132

series_2

0.4110

0.6339

-0.1706

series_3

-0.2132

-0.1706

0.9572

Код на Python

A = (df_centr.T @ df_centr) / (len(df_centr) - 1)
  

или



A = np.cov(df_centr.T)
  

Теперь нужно сделать сферизацию, ZCA whitening или, как это еще называют, отбеливание Махаланобиса. Для этого нужно выполнить сингулярное разложение ковариационной матрицы. Получится следующее: UDU^{T}, важно заметить, что для наших целей элементы матрицы D будут в степени -frac{1}{2}. Полученная формула будет такой:

A_s=UD^{-frac{1}{2}}U^{T}.

Далее нам нужно умножить:

tilde{X}=tilde{X}A_s.

Матрица A_s выполняет преобразование, которое приводит данные к единичной ковариационной матрице ⟨tilde{x}_stilde{x}_s^{T}⟩=I, где I – единичная матрица.

Код на Python

# Выполняем сингулярное разложение матрицы A
U, D, Vt = np.linalg.svd(A)
# Создаём диагональную матрицу из обратных квадратных корней сингулярных значений
D_inv_sqrt = np.diag(D ** (-0.5))  
# Умножение U @ D_inv_sqrt @ U.T даёт матрицу, которая будет использоваться для сферирования данных
A_s = U @ D_inv_sqrt @ U.T
# Применяем сферирование к df
df_sphered = df_centr @ A_s
  

series_1

series_2

series_3

0

0.7398

-1.2908

-1.6972

1

-0.5523

0.4318

-1.1252

2

-0.1557

-0.5195

-1.5904

3

-0.5084

0.2768

-0.9004

4

-0.0018

0.4875

-1.0604

Связаны ли между собой расстояние и отбеливание Махаланобиса?

Если кратко: ZCA-отбеливание превращает расстояние Махаланобиса в обычное Евклидово расстояние.

Расстояние Махаланобиса измеряет удалённость точки x от центра облака данных mu с учётом формы этого облака (ковариации Sigma):

D_M(x)=sqrt{(x - mu)^T Sigma^{-1} (x - mu)}.

ZCA-отбеливание (то самое, которое мы применяем в SFA) преобразует данные по формуле:

x_{text{white}}=Sigma^{-1/2} (x - mu).

Теперь посчитаем обычное Евклидово расстояние от нуля до x_{text{white}} в отбеленном пространстве:

|x_{text{white}}|^2=x_{text{white}}^T x_{text{white}}=(x - mu)^T Sigma^{-1/2} Sigma^{-1/2} (x - mu)=(x - mu)^T Sigma^{-1} (x - mu)=D_M^2(x).

Расстояние Махаланобиса в исходном пространстве равно обычному Евклидову расстоянию в отбеленном пространстве.

После отбеливания облако точек выглядит так:

Slow Feature Analysis. Разбор метода и реализация на Python с нуля - 31

Вычисляем производные сферированных данных:

dot{x}(t)=tilde{x}(t+1) - tilde{x}(t).

series_1

series_2

series_3

0

-1.2920

1.7227

0.5721

1

0.3966

-0.9514

-0.4653

2

-0.3527

0.7964

0.6900

3

0.5065

0.2106

-0.1600

4

-1.4053

1.0828

-0.0462

Примечание: усреднение ⟨⟩ идет по t от 1 до T-1, так как для последнего элемента T нельзя вычислить dot{x}(T) по формуле x(T+1) - x(T).

Код на Python

df_derivative = df_sphered.diff().dropna().reset_index(drop=True)
  

Центрируем полученные производные:

tilde{dot{x}}(t)=dot{x}(t) - overline{dot{x}}.

series_1

series_2

series_3

0

-1.2913

1.7195

0.5629

1

0.3974

-0.9545

-0.4744

2

-0.3520

0.7932

0.6809

3

0.5073

0.2075

-0.1692

4

-1.4046

1.0797

-0.0553

Код на Python

for column in df_derivative.columns:
    df_derivative[column] = df_derivative[column] - df_derivative[column].mean()
  

Создаем матрицу ковариации B из полученных данных.

B=⟨tilde{dot{x}}(t)tilde{dot{x}}^T(t)⟩.

series_1

series_2

series_3

series_1

0.3402

-0.2491

0.0657

series_2

-0.2491

0.4223

0.0531

series_3

0.0657

0.0531

0.4133

Код на Python

B = (df_derivative.T @ df_derivative) / (len(df_derivative) - 1)
  

или


B = np.cov(df_derivative.T)
  

Вспомним, что мы хотели минимизировать среднюю квадратичную скорость изменения проекции:

Δ_j=⟨dot{y}(t)^2⟩⟨dot{y}(t)^2⟩=w^{T}⟨tilde{dot{x}}(t)tilde{dot{x}}^{T}(t)⟩w.

А ⟨tilde{dot{x}}(t)tilde{dot{x}}^{T}(t)⟩ есть ни что иное как матрица ковариации B. Поэтому:

⟨dot{y}(t)^2⟩=w^{T}Bw

Примечание: для общности вывода мы сначала рассмотрим случай без сферизации, где ограничение записывается как w^{T}Aw=1. Затем подставим A=I как частный случай.

Для поиска решений используем метод множителей Лагранжа.
Чтобы найти значение j-того признака запись формулы будет иметь следующий вид:

L(w, lambda, mu_1, ... mu_{j})=w_j^{T}Bw_j - lambda (w_j^{T}Aw_j - 1) - sum_{i=1}^{j - 1} mu_i w_j^{T}Aw_i.

Берем производную по w и приравниваем к нулю:

frac{partial L}{partial w_j}=2Bw_j - 2lambda Aw_j - sum_{i=1}^{j - 1} mu_i Aw_i=0.

Умножаем слева на w_k^T A^{-1}, где k – индекс любого уже найденного решения (k < j).

У нас получается:

2w_k^{T}B_sw_j - 2lambda w_k^{T}Aw_j - sum_{i=1}^{j - 1} mu_i w_k^{T}Aw_i=0.

Из-за условия ортогональности w_k^{T}Aw_j=0 и w_k^{T}Bw_j​=0.

- sum_{i=1}^{j - 1} mu_i w_k^{T}Aw_i=0.

Так как при i=k мы получаем w_k^{T}A_sw_k=1. Из этого у нас следует то, что mu_i=0 и мы можем переписать уравнение в следующем виде:

B_sw_j=lambda_jA_sw_j.

Эта задача сводится к поиску собственных векторов матрицы (A_s^{1/2})^T B A_s^{1/2}.

Так как ранее предварительно сферировали данные, их ковариационная матрица стала единичной – A_s=I, поэтому получается Bw_j=lambda_jIw_j, что можем записать еще более просто: Bw_j=lambda_jw_j.

Где lambda_j это собственное значение, чем оно меньше, тем более медленный признак. Поэтому сортируем по возрастанию значения lambda_1 < lambda_2 <…<lambda_D

В результате вычисления собственных значений матрицы B получаем:

lambda_1=0.1055, quad lambda_2=0.4364, quad lambda_3=0.6337

Здесь видно, что первая компонента изменяется практически в 6 раз медленнее третьей.

Признаки получатся из решения y_j(t)=w^{T}_jtilde{x}, где y_1(t)=w^{T}_1tilde{x} самый медленный признак, y_2 следующий и так далее. Поэтому решение будет выглядеть как:

y(t)=[y_1, y_2,... y_D]^{T}=W^{T}tilde{x}(t).

Где W=[w_1, w_2,... w_D] матрица собственных весов.

slow_feature_1

slow_feature_2

slow_feature_3

0

0.1892

-1.7225

-1.4461

1

0.1606

-1.1158

0.6976

2

-0.0173

-1.6569

-0.2794

3

0.0368

-0.9182

0.5490

4

0.5832

-0.9358

0.3825

Код на Python

# Вычисляем собственные значения и собственные векторы марицы B
lambdas, w = np.linalg.eigh(B)
# Получаем индексы для сортировки собственных значений
sort_idx = np.argsort(lambdas)
# Сортируем собственные значения
lambdas_sorted = lambdas[sort_idx]
# Сортируем собственные векторы
w_sorted = w[:, sort_idx]
# Эвристика, которая нужна для воспроизводимости, так как иногда
 из-за особенностей вычисления график будет отражаться. 
# Например библиотека sksfa может вернуть перевёрнутый график 
# по сравнению с ручной реализацией. 
# Умножение на -1 исправит его положение.
for i in range(w_sorted.shape[1]):
    if np.sum(w_sorted[:, i]) < 0:
        w_sorted[:, i] *= -1
# Умножаем матрицу данных на матрицу отсортированных 
# собственных векторов, получая медленные признаки.
y = df_sphered @ w_sorted
  

Вот как выглядят графики медленных признаков:

Slow Feature Analysis. Разбор метода и реализация на Python с нуля - 70

Рассмотрим вклад исходных признаков в полученные решения:

Веса для признака 1

Веса для признака 2

Веса для признака 3

series_1

0.8827

0.3968

-1.7422

series_2

0.4866

0.2028

1.7019

series_3

-0.1265

1.0609

-0.0792

Slow Feature Analysis. Разбор метода и реализация на Python с нуля - 71
Код на Python

w_original = A_s @ w_sorted

weights_df = pd.DataFrame(
    w_original,
    index=df_sphered.columns,
    columns=[f'Веса для медленного признака {i+1}' for i in range(w_original.shape[1])]
)

print("Вклад исходных признаков в каждую медленную компоненту:")
display(weights_df)
  

Как видно в таблице, модель присваивает в самой медленной компоненте наибольший вес первому, самому стабильному признаку. Мы можем изменять параметры и полученные результаты подтвердят то, что SFA склонен отказываться от наиболее шумных источников.

Реализация алгоритма с помощью библиотеки sksfa

В интернете есть реализации этого алгоритма. Я рассмотрю библиотеку sksfa для описанной задачи:

Код с использованием sksfa


from sksfa import SFA
import numpy as np
import pandas as pd

sfa = SFA(n_components=3)

y_sksfa = sfa.fit_transform(df.values)


w, b = sfa.affine_parameters()

# Чтобы график компонент не был перевернутым, 
# а веса с противоположным знаком, умножу w и y_sksfa на -1
w[0, :] *= -1
w[2, :] *= -1
y_sksfa[:,0] *= -1
y_sksfa[:,2] *= -1

weights_df = pd.DataFrame(
    w.T,
    index=df.columns,
    columns=[f'Веса для медленного признака {i+1}' for i in range(w.shape[1])]
)

print("Вклад исходных признаков в каждую медленную компоненту (sksfa):")
display(weights_df)
  

Веса для признака 1

Веса для признака 2

Веса для признака 3

series_1

0.8827

0.3968

-1.7422

series_2

0.4866

0.2028

1.7019

series_3

-0.1265

1.0609

-0.0792

Собственные значения, полученные через sksfa, совпадают с ручным расчётом:

lambda_1=0.1055, quad lambda_2=0.4364, quad lambda_3=0.6337.

Сравнение показало, что ручная реализация SFA и библиотека sksfa работают идентично. Собственные значения lambda_j тоже совпадают, а после приведения знаков весов и компонент к общему виду совпадают веса и сами медленные признаки. Так что вы можете использовать любое удобное решение в своих проектах.

В работе алгоритма как и у многих моделей машинного обучения есть ограничения – он хуже работает, если в данных имеются выбросы или нарушается стацинарность сигнала. Кроме этого, если модель учится на нескольких инвариантах, ее обобщающие качества будут ухудшаться.

Полезные ссылки:

  1. Оригинальная статья: Wiskott, L. and Sejnowski, T. J. (2002). Slow feature analysis: Unsupervised learning of invariances. Neural computation, 14(4):715-770.

  2. sksfa – implementation of Slow Feature Analysis for scikit-learn.

  3. Код в Jupyter Notebook – юпитер ноутбук с кодом из статьи.

Автор: Scryphin

Источник