Ранжируем треки с помощью TRIBE и RBF. rbf.. rbf. TRIBE.. rbf. TRIBE. нейросети.. rbf. TRIBE. нейросети. Ранжирование.

Ощущение — нравится трек или нет, хочется ли его переслушать возникает во время обработки звука мозгом. Поэтому вместо того, чтобы напрямую предсказывать «качество» музыки по спектрограммам или эмбеддингам, можно построить промежуточное представление: сначала оценить, какие паттерны активности коры вызывает аудио, а затем уже по этим паттернам предсказывать относительную популярность треков. Для предсказания активности коры использовалась нейросеть TRIBE.

TRIBE — это модель brain encoding: она получает стимул и предсказывает, какой отклик он вызовет в коре головного мозга. Изначально TRIBE работает с видео и объединяет три потока признаков — текст, изображение и звук. В этой статье используется только аудио: аудио файл превращается в последовательность векторов, описывающих на предсказанную активность коры.

Практически это означает следующее. На вход подаётся аудио файл, на выходе – матрица:

X^{mathrm{TRIBE}} in mathbb{R}^{T times D},

где T — число временных фрагментов, а D — число признаков корковой активности. D составляет порядка 20 тысяч, где каждое значение соответствует активности определенного участка коры. Таким образом, один трек превращается в динамику предсказанной реакции мозга по мере звучания музыки.

В качестве исходных данных используется Free Music Archive (далее FMA): это открытый датасет для задач Music Information Retrieval: классификации жанров, рекомендаций, поиска похожей музыки, анализа метаданных. Полная версия FMA содержит больше 100 тысяч треков, но в эксперименте использовался вариант small: 8000 mp3-фрагментов по 30 секунд, 8 сбалансированных жанров. Для этой задачи важен не жанр, а поля из tracks.csv: идентификатор трека, идентификатор альбома и число прослушиваний.

Идея эксперимента такая: если TRIBE действительно сохраняет в своих выходах часть информации о том, как звук обрабатывается мозгом, то в этих признаках может быть слабый сигнал, связанный с тем, какой трек слушатели выбирают чаще. Поставим задачу так: взять два трека из одного альбома и предсказать, какой из них набрал больше прослушиваний. Заметим, что сравнение происходит именно внутри одного альбома — прослушивания плохо сравниваются между разными артистами и релизами: у одного исполнителя 10 тысяч прослушиваний могут быть провалом, у другого — верхней границей аудитории.

Итоговый пайплайн получился такой:

  1. взять подмножество FMA small (не весь датасет, так как TRIBE требовательна к ресурсам);

  2. сгруппировать треки по альбомам;

  3. для каждого трека сохранить число прослушиваний из метаданных;

  4. прогнать аудио через TRIBE и получить матрицу признаков;

  5. сжать выход TRIBE через StandardScaler -> PCA -> StandardScaler;

  6. построить пары сравнений внутри каждого альбома;

  7. обучить небольшую RBF-сеть, которая выдаёт скалярную оценку трека;

  8. проверить accuracy на парах: сколько раз модель поставила более прослушиваемый трек выше.

Результат: около 85% pairwise accuracy на train и около 58% на test. Это слабый, но достоверный сигнал. Ниже — как именно он был получен.

Что именно предсказывается

На входе есть трек. После TRIBE он превращается в последовательность векторов X^{mathrm{TRIBE}}.

Обучаемая модель считает скаляр f(X) для каждого трека. Для пары треков из одного альбома требуется:

f(X_{mathrm{popular}}) > f(X_{mathrm{less popular}})

Это важное упрощение. Оно делает задачу устойчивее к разным масштабам популярности между альбомами, жанрами и артистами.

Подготовка FMA

В FMA метаданные лежат в tracks.csv с многоуровневым заголовком. Для эксперимента нужны три поля:

  • track_id – чтобы найти mp3-файл;

  • album_id – чтобы группировать сравнения;

  • track/listens – число прослушиваний, используемое как рейтинг.

Фрагмент подготовки метаданных:

from pathlib import Path
import pandas as pd
META_DIR = Path("data/fma_metadata")
TRACKS_CSV = META_DIR / "tracks.csv"

tracks = pd.read_csv(    
  TRACKS_CSV,  
  index_col=0,   
  header=[0, 1],   
  engine="python",    
  on_bad_lines="skip",
)

tracks = tracks[tracks[("set", "subset")] == "small"].copy()

tracks["track_id"] = tracks.index.astype(int)
tracks["album_id"] = tracks[("album", "id")]
tracks["popularity"] = tracks[("track", "listens")]

tracks = tracks[["track_id", "album_id", "popularity"]].dropna()

Альбомы с 1-2 треками бесполезны: внутри них нечего сравнивать. Поэтому я оставлял только альбомы, где есть минимум 3 трека:

MIN_TRACKS_IN_ALBUM = 3

album_sizes = tracks.groupby("album_id").size()
valid_album_ids = album_sizes[album_sizes >= MIN_TRACKS_IN_ALBUM].index

tracks = tracks[tracks["album_id"].isin(valid_album_ids)].copy()

Разбиение train/test тоже делается по альбомам.

import random

SEED = 42
TEST_FRAC = 0.2

rng = random.Random(SEED)
album_ids = list(tracks["album_id"].unique())
rng.shuffle(album_ids)

n_test = int(len(album_ids) * TEST_FRAC)
test_albums = set(album_ids[:n_test])
train_albums = set(album_ids[n_test:])

tracks["split"] = tracks["album_id"].apply(    lambda album_id: "test" if album_id in test_albums else "train")

Файлы удобно разложить так:

data/fma_dataset/
  train/        
    album_123/       
      000001.mp3          
      000002.mp3         
      ratings.json   
  test/       
    album_456/     
      000101.mp3         
      000102.mp3         
      ratings.json

ratings.json хранит только локальную таблицу популярности внутри папки альбома:

{    
  "000001": 1534,   
  "000002": 847,    
  "000003": 2681
}

На этапе построения пар каждая папка альбома становится самостоятельным объектом: список матриц треков плюс список пар (better_idx, worse_idx).

Прогон аудио через TRIBE

TRIBE используется как фиксированный экстрактор признаков. Код ниже показывает сам инференс без деталей.

from pathlib import Path
import numpy as np

from tribev2.demo_utils import TribeModel

DATASET_PATH = Path("data/fma_dataset")
CACHE_DIR = Path("cache/tribe")

model = TribeModel.from_pretrained(   
          "facebook/tribev2",    
          cache_folder=str(CACHE_DIR)
)
for audio_path in sorted(DATASET_PATH.rglob("*.mp3")): 
  events = model.get_events_dataframe(audio_path=str(audio_path))  
  events = events[events["type"] == "Audio"]   
  preds, segments = model.predict(events=events)   
  preds = np.asarray(preds, dtype=np.float32)   
  np.save(audio_path.with_suffix(".npy"), preds)

После этого рядом с каждым mp3 появляется npy:

000001.mp3
000001.npy
ratings.json

T зависит от длительности трека и от того, как TRIBE режет аудио на события. D велико, поэтому сразу обучать на исходном выходе неудобно из-за большой размерности.

Сжатие выхода TRIBE

Соседние или функционально близкие зоны мозга в таком представлении могут давать коррелированные значения. Поэтому перед RBF-моделью использовалось сжатие:

mathrm{StandardScaler}_1 rightarrow mathrm{PCA} rightarrow mathrm{StandardScaler}_2

PCA оставляет 16 компонент (explained variance ~ 95%):

from pathlib import Path
import joblib
import numpy as np

from sklearn.pipeline import Pipeline
from sklearn.decomposition import PCA
from sklearn.preprocessing import StandardScaler

DATASET_PATH = Path("data/fma_dataset")
PREPROCESSOR_PATH = Path("models/preprocessor.pkl")
N_COMPONENTS = 16

npy_files = sorted((DATASET_PATH / "train").rglob("*.npy"))

blocks = []
for path in npy_files:    
  x = np.load(path)    
  blocks.append(x.astype(np.float32))
X_train_frames = np.concatenate(blocks, axis=0)
preprocessor = Pipeline([    
                         ("scaler_1", StandardScaler()),    
  ("pca", PCA( n_components=N_COMPONENTS, svd_solver="randomized", random_state=42, )),    
  ("scaler_2", StandardScaler()),
])

X_debug = preprocessor.fit_transform(X_train_frames)

pca = preprocessor.named_steps["pca"]
PREPROCESSOR_PATH.parent.mkdir(parents=True, exist_ok=True)
joblib.dump(preprocessor, PREPROCESSOR_PATH)

После этого каждый трек описывается матрицей:

X in mathbb{R}^{T times 16}

Формирование пар для сравнения

Число прослушиваний зависит от релиза, аудитории, попадания в подборки, времени публикации, названия артиста, обложки и ещё десятков факторов. Если обучать регрессию audio -> listens, модель будет пытаться объяснить аудиосигналом то, чего в аудио нет.

Поэтому используем pairwise ranking. Для каждого альбома берутся пары треков. Если один трек имеет больше прослушиваний, он считается better, второй – worse. Но сравнивать слишком близкие значения тоже плохо: разница между 1000 и 1020 прослушиваниями может быть шумом. Поэтому рейтинг переводится в логарифм, а пары с маленькой разницей отбрасываются.

Для альбома с рейтингами r_i:

s_i=log_{10}(r_i + varepsilon)

Пара используется только если:

|s_i - s_j| geq sigma_{mathrm{album}},

где sigma_{mathrm{album}} – стандартное отклонение логарифмических рейтингов внутри альбома.

Код построения пар:

from itertools import combinations
import numpy as np
MIN_SIGMA_TO_COMPARE = 1.0
EPS = 1e-8
def build_album_pairs(ratings):    
  scores = np.asarray(ratings, dtype=np.float32)    
  log_scores = np.log10(scores + EPS)    
  sigma = np.std(log_scores)    
  threshold = MIN_SIGMA_TO_COMPARE * sigma    
  pairs = []    
  for i, j in combinations(range(len(log_scores)), 2):        
    diff = abs(log_scores[i] - log_scores[j])        
    if diff < threshold:            
      continue        
    if log_scores[i] > log_scores[j]:
      pairs.append((i, j))        
    else:            
      pairs.append((j, i))    
  return pairs

Здесь (i, j) означает: трек i должен получить оценку выше, чем трек j.

Сборка датасета для обучения

На этом этапе каждая папка альбома превращается в структуру:

dataset = [    
           {        
            "tracks": [track_matrix_0, track_matrix_1, ...],       
            "pairs": [(better_idx, worse_idx), ...],    
           },    
          ...,
]

Код:

from pathlib import Path
import json
import joblib
import numpy as np

DATASET_PATH = Path("data/fma_dataset")
PREPROCESSOR_PATH = Path("models/preprocessor.pkl")

preprocessor = joblib.load(PREPROCESSOR_PATH)

def load_split(split_name):    
  split_dir = DATASET_PATH / split_name    
  albums = []    
  for album_dir in sorted(p for p in split_dir.iterdir() if p.is_dir()):
    with open(album_dir / "ratings.json", "r", encoding="utf-8") as f:
      ratings_dict = json.load(f)
      tracks = []
      ratings = []
      for npy_path in sorted(album_dir.glob("*.npy")):
        track_name = npy_path.stem
        if track_name not in ratings_dict:
          continue           
        x = np.load(npy_path) 
        x = preprocessor.transform(x).astype(np.float32)
        
        tracks.append(x)
        ratings.append(float(ratings_dict[track_name])) 
        
      if len(tracks) < 2:
        continue     
        
      pairs = build_album_pairs(ratings) 
      
      if not pairs: 
        continue       
      albums.append({ 
                     "tracks": tracks, 
                      "pairs": pairs,        
      })  
  return albums
train_dataset = load_split("train")
test_dataset = load_split("test")

Важный момент: pairs содержат индексы внутри альбома. Это не глобальные индексы файлов. Поэтому при обучении пара хранится как (album_idx, better_idx, worse_idx):

def flatten_pairs(dataset):    
  result = []    
  for album_idx, album in enumerate(dataset):        
    for better_idx, worse_idx in album["pairs"]:            
      result.append((album_idx, better_idx, worse_idx))    
  return result
train_pairs = flatten_pairs(train_dataset)
test_pairs = flatten_pairs(test_dataset)

Модель: RBF поверх временной последовательности

После препроцессора трек — это последовательность векторов:

mathbf{b}_j in mathbb{R}^{16}, qquad j=1, ldots, N_{mathrm{bases}}

Для предсказания популярности используем RBF-сеть. RBF-сеть (Radial Basis Function network) – это модель, которая сравнивает входные данные с набором “эталонных” паттернов. Каждый RBF-узел отвечает за некоторую область в пространстве признаков: если вход похож на соответствующий паттерн, отклик узла большой; если далёк — отклик б ыстро падает.
В эксперименте RBF-модель содержит N_{mathrm{bases}} = 8 центров:

mathbf{b}_j in mathbb{R}^{16}, qquad j=1, ldots, N_{mathrm{bases}}

Для каждого кадра mathbf{x}_t считается взвешенное расстояние до каждого центра.

d_{tj}^{2}=sum_{k=1}^{16} w_k (x_{tk} - b_{jk})^2

w_k > 0 – обучаемая важность k-й компоненты. Чтобы вес всегда был положительным, в коде хранится сырой параметр w_raw, а реальный вес считается через softplus:

w_k=operatorname{softplus}(w_{mathrm{raw}, k})

Отклик:

K_{tj}=exp(-gamma_j d_{tj}^{2}),

где gamma_j > 0 – обучаемая ширина j-го базиса. Большой gamma_j делает базис узким: он реагирует только на близкие к центру паттерны.

Скалярная оценка кадра:

z_t=sum_{j=1}^{N_{mathrm{bases}}} v_j K_{tj}

Скалярная оценка трека – среднее по времени:

f(X)=frac{1}{T} sum_{t=1}^{T} z_t

Полная модель на PyTorch:

import torch
import torch.nn as nn

def inv_softplus(x):    
  x = torch.as_tensor(x, dtype=torch.float32)    
  return torch.log(torch.expm1(x))

class RBFSequenceModel(nn.Module):    
  def __init__(self, n_bases, dim, gamma_init=0.01, w_init=1.0):
  super().__init__()     
  
  self.n_bases = n_bases    
  self.dim = dim      
  self.b = nn.Parameter(torch.randn(n_bases, dim))    
  self.gamma_raw = nn.Parameter(inv_softplus(gamma_init).repeat(n_bases))    
  self.v = nn.Parameter(torch.randn(n_bases) * 0.01)       
  self.w_raw = nn.Parameter(inv_softplus(w_init).repeat(dim))     
  self.softplus = nn.Softplus()   
  
  def w(self):      
    return self.softplus(self.w_raw)
    
  def gamma(self):       
    return self.softplus(self.gamma_raw) 
  
  def forward(self, x):      
    diff = x[:, None, :] - self.b[None, :, :]    
    dist2 = (diff ** 2 * self.w()[None, None, :]).sum(dim=2)    
    k = torch.exp(-self.gamma()[None, :] * dist2)    
    z = (k * self.v[None, :]).sum(dim=1)      
    return z.mean()

Лосс: softplus от отрицательного margin

Для пары (better, worse) модель считает:

y_{mathrm{better}}=f(X_{mathrm{better}}), qquad y_{mathrm{worse}}=f(X_{mathrm{worse}})

Margin:

m=y_{mathrm{better}} - y_{mathrm{worse}}

Если m > 0, порядок правильный. Если m < 0, модель ошиблась. Лосс:

mathcal{L}=log(1 + exp(-m))

В PyTorch это F.softplus(-margin):

import torch.nn.functional as F

def pairwise_batch_loss(model, dataset, batch):   
  losses = []   
  
  for album_idx, better_idx, worse_idx in batch:     
    album = dataset[album_idx]    
    
    x_better = album["tracks"][better_idx]   
    x_worse = album["tracks"][worse_idx]  
    
    y_better = model(x_better)       
    y_worse = model(x_worse)  
    
    margin = y_better - y_worse      
    losses.append(F.softplus(-margin))  
  return torch.stack(losses).mean()

Метрика

Метрика выбираем, исходя из постановки задачи: доля правильно упорядоченных пар.

operatorname{accuracy}=frac{#{(i, j): f(X_i) > f(X_j)}} {#{(i, j)}}

import random
import torch

@torch.no_grad()
def evaluate_accuracy(model, dataset, pairs, max_pairs=None): 
  model.eval()   
  if max_pairs is not None and len(pairs) > max_pairs:      
    eval_pairs = random.sample(pairs, max_pairs)   
  else:        
    eval_pairs = pairs   
  correct = 0   
  for album_idx, better_idx, worse_idx in eval_pairs:    
    album = dataset[album_idx] 
    
    y_better = model(album["tracks"][better_idx])      
    y_worse = model(album["tracks"][worse_idx])    
    
    correct += int(y_better.item() > y_worse.item())   
    return correct / len(eval_pairs)

У этой метрики есть понятная случайная база: 50%. Если модель ничего не выучила, она будет примерно угадывать знак пары.

Обучение

Параметры эксперимента:

N_BASES = 8
N_EPOCHS = 250
BATCH_SIZE = 16
LR = 5e-3

Минимальный цикл обучения:

import random
import numpy as np
import torch

def batchify(items, batch_size, shuffle=True):    
  indices = list(range(len(items)))   
  if shuffle:        
    random.shuffle(indices)   
  for start in range(0, len(indices), batch_size):    
    batch_indices = indices[start:start + batch_size]      
    yield [items[i] for i in batch_indices]
    
def convert_dataset_to_torch(dataset, device):  
  converted = []    
  for album in dataset:  
    tracks = [    
              torch.tensor(x, dtype=torch.float32, device=device)  
        for x in album["tracks"]        
    ]       
    converted.append({ 
                      "tracks": tracks,  
                      "pairs": album["pairs"],
    })   
  return converted

random.seed(SEED)
np.random.seed(SEED)
torch.manual_seed(SEED)

DEVICE = "cuda" if torch.cuda.is_available() else "cpu"

train_dataset_torch = convert_dataset_to_torch(train_dataset, DEVICE)
test_dataset_torch = convert_dataset_to_torch(test_dataset, DEVICE)

train_pairs = flatten_pairs(train_dataset_torch)
test_pairs = flatten_pairs(test_dataset_torch)

DIM = train_dataset_torch[0]["tracks"][0].shape[1]

model = RBFSequenceModel(n_bases=N_BASES, dim=DIM).to(DEVICE)
optimizer = torch.optim.AdamW(    model.parameters(),    lr=LR
)

train_losses = []
train_accs = []
test_accs = []

for epoch in range(1, N_EPOCHS + 1):   
  model.train()   
  epoch_losses = []  
  for batch in batchify(train_pairs, BATCH_SIZE, shuffle=True):    
    optimizer.zero_grad()       
    loss = pairwise_batch_loss(model, train_dataset_torch, batch)   
    loss.backward()       
  
    optimizer.step()        
    epoch_losses.append(loss.item())  
    
  train_loss = float(np.mean(epoch_losses))   
  train_acc = evaluate_accuracy(model, train_dataset_torch, train_pairs)  
  test_acc = evaluate_accuracy(model, test_dataset_torch, test_pairs)  
  
  train_losses.append(train_loss)   
  train_accs.append(train_acc)    
  test_accs.append(test_acc)    
  
  print(        f"epoch={epoch:03d} "   
              f"loss={train_loss:.6f} "  
              f"train_acc={train_acc * 100:.2f}% "  
              f"test_acc={test_acc * 100:.2f}%"   
     )

Результаты

На текущем запуске модель дала примерно такие значения:

train accuracy: около 85%
test accuracy:  около 58%
Рис. 1 Динамика accuracy

Рис. 1 Динамика accuracy

Это похоже на переобучение, и оно ожидаемо: данных мало, выход TRIBE сжат до 16 компонент, RBF-центров всего 8, но число факторов популярности намного больше, чем акустический сигнал.

При этом 58% на test всё равно выше случайных 50%. Я бы не называл это сильным результатом. Корректнее сказать так: в TRIBE-представлении есть слабый сигнал, связанный с популярностью треков .

Интерпретация параметров модели

Так как PCA удаляет признаки с высокой корреляцией (т. е. близкие участки коры), можно предположить, что каждая компонента вектора x описывает активность какого-то определенного участка мозга, а соответствующая компонента вектора w – важность этого участка для конечного предсказания. Очевидно, не вся кора отвечает за реакцию на музыку, что видно из графика ниже.

Рис. 2 Распределение компонент w

Рис. 2 Распределение компонент w

Каждый вектор b описывает некоторое фиксированное распределение активности коры, а скаляр v — то, насколько «ок/не ок» это состояние ощущается.

Вывод

Эксперимент не доказывает, что популярность музыки можно предсказывать по «мозговым» признакам. Он показывает более узкую вещь: если взять TRIBE как фиксированный экстрактор, сжать его выход до 16 компонент и обучить RBF‑модель на попарном ранжировании треков внутри альбомов, получается test accuracy около 58% против случайных 50%.

Для практической рекомендательной системы этого мало. Для технического baseline — уже достаточно интересно: пайплайн воспроизводим, модель маленькая, лосс соответствует задаче, а результат можно сравнивать с более простыми и более сильными аудиопризнаками.

Главный вывод для меня: такую задачу нельзя честно формулировать как «предсказание популярности музыки». Но её можно формулировать как проверку слабого ранжирующего сигнала в нейрофизиологически мотивированном представлении аудио. И в этой формулировке результат не выглядит случайным, хотя до убедительной модели ещё далеко.

Примечание: существует концептуально аналогичная работа: Virality Predictor. Статья была написана за день до выхода этого продукта, и её можно рассматривать, как попытку сделать локально‑запускаемый self‑made аналог.

Автор: gCapybara

Источник