Как я устал писать парсер под каждый прайс и сделал из этого библиотеку. etl.. etl. llm.. etl. llm. Open source.. etl. llm. Open source. pydantic.. etl. llm. Open source. pydantic. python.. etl. llm. Open source. pydantic. python. валидация данных.. etl. llm. Open source. pydantic. python. валидация данных. интеграция данных.. etl. llm. Open source. pydantic. python. валидация данных. интеграция данных. конвейер данных.. etl. llm. Open source. pydantic. python. валидация данных. интеграция данных. конвейер данных. обработка csv.. etl. llm. Open source. pydantic. python. валидация данных. интеграция данных. конвейер данных. обработка csv. парсинг данных.

С чего всё началось

У нас на проекте была до боли знакомая многим картина: десятки источников прайсингов, и каждый — в своём формате. Один вендор присылает CSV с разделителем ;, другой — выгрузку из Excel, третий дёргает наш вебхук JSON-ом. Данные по сути одни и те же — аэропорт, марка топлива, цена, дата, — но:

  • колонка цены называется то PRICE, то into-plane, то USD/gallon, то Цена (EUR/л);

  • единицы разные: у одного литры, у другого галлоны, у третьего цена в центах;

  • даты — 2026-01-15, 15/02/2026, 01.02.2026, как повезёт;

  • аэропорт — то IATA-код, то ICAO, то «JFK / John F Kennedy» одной строкой;

  • а у некоторых в одной ячейке через | перечислено сразу несколько аэропортов.

И под каждый такой источник жил отдельный парсер. Сначала их было три. Потом восемь. Потом я перестал считать. Каждый — это сто строк if-else, ручной strip(), ручной разбор дат, ручная валидация. А самое противное — они ломались молча. Вендор тихо переименовывал колонку, парсер мапил её в None, и в прод уезжали записи с пустой ценой. Узнавали мы об этом не из алерта, а из вопроса «а почему у нас топливо в Хитроу бесплатное?».

В какой-то момент я поймал себя на том, что в третий раз за месяц копирую один и тот же парсер и меняю в нём три строки. И решил, что хватит.

Главная мысль

Я сел и сформулировал, что меня бесит. Не парсинг как таковой — а то, что логика маппинга размазана по коду. Каждый парсер — это, если приглядеться, одна и та же программа с разной конфигурацией: «возьми вот эту колонку, почисти вот так, положи вот в это поле модели». Сам код одинаковый. Различается только описание соответствия — маппинг.

Значит, маппинг надо вынести из кода в данные. Парсер сделать один, универсальный и детерминированный. А «описание соответствия» — в отдельный артефакт, который можно ревьюить, версионировать и, в идеале, править, не дёргая разработчика.

Оставался один вопрос: кто будет писать эти описания? Под каждый новый источник руками расписывать «эта колонка → это поле» — почти то же самое, что писать парсер. И вот тут на помощь пришёл LLM — но не так, как обычно его суют в подобные задачи.

Ключевое решение: LLM один раз, потом — никогда

Главная ошибка, которую я хотел избежать, — гонять LLM на каждой строке. Это медленно, дорого и недетерминировано: один и тот же файл может распарситься по-разному, а в проде такое недопустимо.

Поэтому LLM в моей либе работает ровно один раз — когда система впервые встречает новую форму данных. Он смотрит на заголовки, небольшой сэмпл строк и JSON-схему моей целевой модели — и пишет спеку: человекочитаемый YAML, который говорит «их E-mail — это твой email, прогнать через strip_lower; их Reg Date — это signup_date, распарсить как %d.%m.%Y».

Эту спеку я читаю глазами, при необходимости правлю и коммичу — как обычный код. А дальше LLM больше не нужен. Совсем. Каждый следующий запуск — чистый детерминированный Python: ноль обращений к API, ноль недетерминизма, одинаковый результат на одинаковом входе.

Так появился fidelis.

Как это выглядит

Сначала я описываю данные так, как они нужны мне — одной Pydantic-моделью:

from datetime import date
from pydantic import BaseModel

class FuelPrice(BaseModel):
    airport: str
    grade: str
    price: float
    delivered: date

Дальше направляю fidelis на источник. Если спека для такой раскладки уже есть — он просто детерминированно мапит. Если нет (и сконфигурирован LLM) — генерит спеку, сохраняет, и со следующего раза работает уже без LLM:

from fidelis import Parser

parser = Parser(FuelPrice, spec_store="specs/", llm="anthropic:claude-opus-4-8")
result = parser.parse("incoming/shell_us.csv")

print(result.summary())
# valid=128 errors=2 coverage=0.98 needs_review=False drift=False generated=False
for row in result.valid_rows:
    ...  # это уже валидированные FuelPrice

А сама спека — вот такой читаемый YAML:

version: 1
generated_by: claude-opus-4-8
signature: 05cd14
mappings:
  - target: airport
    source: "Airport"
    transform: strip
  - target: price
    source: "USD/gallon"
    transform: to_float
  - target: delivered
    source: "Effective Date"
    transform: "parse_date:%m/%d/%Y"

Никаких if-else. Описание соответствия — и всё.

Трансформы: чистка значений по дороге

Сырое значение почти никогда не ложится в модель как есть: вокруг пробелы, цена строкой "1 240,50", дата в локальном формате. За это отвечают трансформы — их видно в спеке выше как transform: to_float или transform: "parse_date:%m/%d/%Y". Трансформ берёт одну ячейку и приводит её к нужному виду до валидации.

Из коробки есть всё, что нужно в 90% случаев: strip, strip_lower, to_int, to_float (понимает запятую как разделитель), to_bool, parse_date (можно перечислить несколько форматов через | — попробует по очереди) и clip для зажима числа в диапазон. Аргумент после двоеточия — часть спеки, так что один и тот же трансформ настраивается под каждый источник:

mappings:
  - target: delivered
    source: "Date"
    transform: "parse_date:%Y-%m-%d|%d/%m/%Y"   # сначала ISO, потом ДД/ММ/ГГГГ
  - target: discount_pct
    source: "Discount"
    transform: "clip:0:100"                       # зажать в [0, 100]

А если своего случая в наборе нет — регистрируешь трансформ в коде и ссылаешься на него по имени, как на встроенный. Форма — обычный вызов или декоратор:

import fidelis

@fidelis.register_transform("eu_float")
def eu_float(value, arg):
    # "1 240,50" -> 1240.5
    return float(str(value).replace(" ", "").replace(",", "."))
  - target: price
    source: "Цена (EUR/л)"
    transform: eu_float

Маленькие переиспользуемые функции вместо копипасты replace() по всем парсерам.

Где живут спеки: файлы, S3, база

По умолчанию спеки — это просто YAML-файлы рядом с проектом, и Parser(spec_store="specs/") читает их оттуда. Но в проде хранить парсинг-контракты в репозитории хочется не всегда: их может править продакт через админку, их может быть много, они могут лежать в общем сторе для нескольких сервисов. Поэтому spec_store принимает не только путь, но и любой бэкенд SpecStore:

Parser(FuelPrice, spec_store="specs/")            # файлы (по умолчанию)
Parser(FuelPrice, spec_store=S3SpecStore(bucket)) # свой бэкенд: S3, БД, конфиг-сервис

Самое приятное — реализовать такой стор почти ничего не стоит. Идентичность спеки это её сигнатура, и она же ложится прямым ключом: get(signature) — это один GET объекта или один SELECT по первичному ключу, без сканирования. Весь интерфейс — это get и save:

from fidelis import SpecStore, Spec

class S3SpecStore(SpecStore):
    def __init__(self, bucket, prefix=""):
        self.s3, self.bucket, self.prefix = boto3.client("s3"), bucket, prefix

    def get(self, signature):                       # один GET по сигнатуре
        key = f"{self.prefix}spec_{signature}.yaml"
        try:
            obj = self.s3.get_object(Bucket=self.bucket, Key=key)
        except self.s3.exceptions.NoSuchKey:
            return None
        return Spec.from_yaml(obj["Body"].read().decode())

    def save(self, spec):
        self.s3.put_object(
            Bucket=self.bucket,
            Key=f"{self.prefix}spec_{spec.signature}.yaml",
            Body=spec.dump_yaml().encode(),
        )

Тот же Parser, тот же детерминированный прогон — меняется только то, откуда приехала спека. Хочешь хранить маппинги в Postgres и редактировать их из внутренней админки — пишешь двадцать строк SpecStore поверх своей таблицы, и продакт меняет соответствие колонок без единого деплоя.

Маппинг — это ещё не всё: данные можно обогащать

Довольно быстро выяснилось, что чистый «колонка → поле» закрывает не все случаи. Иногда нужного значения в источнике просто нет — его надо вычислить или подтянуть. Поэтому помимо маппинга в спеке есть обогащение (enrichment).

Если трансформация видит одну ячейку, то обогащение видит всю смапленную запись плюс исходную строку целиком — и может из этого собрать новое поле. Логику ты пишешь в коде и регистрируешь по имени, а в спеке просто ссылаешься на неё — ровно как на трансформацию. Так YAML остаётся полным контрактом, а обогащение варьируется от источника к источнику.

Например, один вендор шлёт цену в локальной валюте, а нам в модель нужна цена в USD — добавляем поле на лету:

import fidelis

@fidelis.register_enrichment("to_usd")
def to_usd(record, source):
    rate = fx_rates[source["Currency"]]      # курс по валюте из ИСХОДНОЙ строки
    record["price_usd"] = round(record["price"] * rate, 2)
    return record
# в спеке этого источника
enrich:
  - to_usd

Так же можно дотянуться до сырых колонок, которые вообще не мапятся ни в одно поле модели, — скажем, склеить First Name и Last Name в full_name, замаскировать чувствительное значение или сходить за справочными данными во внешний источник. Обогащение прогоняется после маппинга и до валидации, так что любое поле, которое оно добавило, валидируется наравне с остальными. А если функция упадёт — строка не потеряется, а вернётся типизированной ошибкой с именем сбойнувшего обогатителя.

Отдельно я сделал батч-обогащение — для случая, когда обогащение ходит в базу или по API. Построчный обогатитель сделал бы по запросу на каждую строку; батчевый получает сразу все чистые записи и возвращает столько же — то есть один bulk-вызов на весь фид:

@fidelis.register_batch_enrichment("attach_airport_meta")
def attach_airport_meta(records):
    codes = [r["airport"] for r in records]
    meta = airport_service.bulk_lookup(codes)   # ОДИН вызов, а не N
    for r in records:
        r["country"] = meta[r["airport"]].country
    return records

А чтобы не зашивать в обогатители конфиги и подключения, есть рантайм-контекст: справочники, курсы, пороги, клиенты БД можно передать в Parser(context=...), и любой обогатитель (или трансформация), который объявит параметр context, его получит — изолированно на один прогон.

В итоге «собрать поле из двух», «подтянуть страну по коду аэропорта», «пересчитать валюту», «замаскировать PII» — это больше не разные парсеры, а несколько строк в спеке плюс маленькие переиспользуемые функции в коде.

Один источник — несколько записей

Не всегда «одна строка входа = одна запись на выходе». Классика прайсингов: вендор в одной строке шлёт и розничную, и оптовую цену — а нам в модель нужно по записи на каждую. Городить это в обогащении неудобно, поэтому в спеке есть правила (rules): из одной строки рождается по записи на каждое сработавшее правило. У каждого правила свой предикат when и свои дополнительные маппинги, которые накладываются поверх общих.

Допустим, приходит такое:

SKU,RETAIL_PRICE,WHOLESALE_PRICE
JET-A,725.00,690.00
AVGAS,,540.00

Модель — «одна цена с типом»:

class Quote(BaseModel):
    sku: str
    kind: str        # retail | wholesale
    amount: float

А спека разворачивает строку в одну-две записи — в зависимости от того, какие цены заполнены:

mappings:
  - {target: sku, source: SKU, transform: strip}     # общее для всех записей
rules:
  - when: {field: RETAIL_PRICE, op: not_empty}        # ops: not_empty/empty/eq/ne/in/gt/lt/ge/le
    mappings:
      - {target: kind, value: retail}
      - {target: amount, source: RETAIL_PRICE, transform: to_float}
  - when: {field: WHOLESALE_PRICE, op: not_empty}
    mappings:
      - {target: kind, value: wholesale}
      - {target: amount, source: WHOLESALE_PRICE, transform: to_float}

Первая строка с двумя ценами даст две записи (retail и wholesale), вторая — только одну (wholesale, ведь розничная цена пустая). И всё это по-прежнему просто декларация в спеке, а не разветвление в коде. Каждая получившаяся запись дальше точно так же валидируется, обогащается и попадает в coverage — то есть «пустая строка, не давшая ни одной записи» тоже видна, а не теряется.

Что я понял по дороге

Идентичность источника — не имя файла, а смысл полей. Спека привязана не к shell_us.csv, а к сигнатуре — хэшу нормализованного набора имён колонок. Поэтому одна спека покрывает и CSV сегодня, и JSON с теми же полями завтра. Регистр и порядок колонок не важны.

Спека — это данные, а не код. И раз так, хранить её можно где угодно: рядом с проектом в YAML, в S3, в базе, в конфиг-сервисе — через интерфейс SpecStore. А ещё — раз это читаемый маппинг, а не парсер, — править её может кто угодно. Новый источник или переименованная колонка превращаются из «заведите задачу на разработку парсера» в «отредактируйте спеку». В пределе продакт делает это прямо в админке, не дожидаясь релиза.

Ничего не должно теряться молча. Это была главная боль, и я её закрыл жёстко. Каждая строка либо становится валидной записью, либо возвращается типизированной ошибкой с исходными данными, полем и причиной. Битые строки можно выгрузить в файл, отдать человеку на исправление и зачитать обратно — round-trip через карантин. А coverage показывает одной цифрой, какая доля входных строк реально дала результат.

Дрифт схемы надо ловить в CI, а не в проде. Когда знакомый источник внезапно получает или теряет колонку, его сигнатура меняется. fidelis распознаёт это как дрифт известного источника (по пересечению полей), а не как новый формат, и применяет заданную политику. В CI это просто fidelis check-drift feed.csv --model app:FuelPrice — сборка падает в тот день, когда вендор тихо переименовал колонку. Больше никакого бесплатного топлива в Хитроу.

Чем всё закончилось

Грязные реалии прайсингов оказались не такими уж и разными — почти всё свелось к набору деклараций. Розницу и опт из одной строки мы уже развернули правилами; ровно так же одну ячейку с JFK|LAX|ORD можно размножить в три записи (fan-out), а «широкую» таблицу с Q1_PRICE/Q2_PRICE/Q3_PRICE — развернуть в строки (unpivot). Всё это — пункты в спеке, а не новый парсер; рабочие примеры на каждый случай лежат в репозитории, от самого простого к самому хитрому.

Десяток разрозненных парсеров превратился в один движок плюс папку с YAML-описаниями. Новый вендор — это не день разработки, а ревью одной сгенерированной спеки. И, что приятнее всего, когда что-то ломается — я узнаю об этом из красной сборки, а не из вопроса бизнеса.

Если у вас на проекте такая же зоопарк-ситуация с входными данными — посмотрите fidelis на GitHub, может пригодится.

Буду рад фидбэку и ⭐️

Автор: NGdust

Источник