Только Сигма выбирают Delta Lake. datalake.. datalake. deltalake.. datalake. deltalake. flink.. datalake. deltalake. flink. hadoop.. datalake. deltalake. flink. hadoop. iceberg.. datalake. deltalake. flink. hadoop. iceberg. Kubernetes.. datalake. deltalake. flink. hadoop. iceberg. Kubernetes. lakehouse.. datalake. deltalake. flink. hadoop. iceberg. Kubernetes. lakehouse. ml.. datalake. deltalake. flink. hadoop. iceberg. Kubernetes. lakehouse. ml. spark.
Только Сигма выбирают Delta Lake - 1

Меня зовут Дмитрий Кравчук, я занимаюсь всем, что связано с данными в блоке AI&ML MAGNIT TECH. Расскажу про фундамент прибыльных проектов, которыми мы занимаемся в департаменте. Это начало цикла статей о наших достижениях за 5 лет и планах на будущее.

В 2020 году мы выбирали формат для больших ML-хранилищ: Delta Lake, Iceberg или Hudi. В итоге остановились на Delta Lake. За 5 лет вокруг него у нас выросла полноценная платформа данных для ML-нагрузок: Spark, Kubernetes, S3, YDB, Airflow, Flink, OpenMetadata, Trino, StarRocks и собственные фреймворки.

Я не буду пересказывать документацию Delta Lake, а расскажу, как это работает у нас в промышленном контуре: доставка данных, слои хранения, оркестрация, хранилище метаданных, lineage, data quality, observability и governance, а также доступ к данным.

Отдельно расскажу, в чём Delta Lake оказался силён, а где пришлось достраивать инфраструктуру: объектное хранилище вместо блочного, координировать коммиты для приближения к ACID, обеспечивать потоковую загрузку, оптимизировать хранение и эксплуатировать большие таблицы.

Материал будет полезен инженерам данных, ML инженерам, Data scientist’ам, системным аналитикам, MLOps/DataOps/DevOps и всем, кто создаёт промышленную платформу данных под реальные нагрузки.

История

Только Сигма выбирают Delta Lake - 2

В уже кажущемся далёким 2020 году нам не хватало Spark 2.x и Parquet. Хотелось чего-то большего (как всегда). Мы пробовали многие платформы on prem и в облаке (спойлер: все), и, оглядываясь назад, могу сказать, что выбор был сложный. Нужно было выбрать фреймворк для работы с Большими Данными. Они большие не только потому, что их много, но и потому, что из них можно извлечь много информации, а в перспективе — и денег. На тот момент мы рассматривали фреймворки с открытым исходным кодом, чтобы к преимуществам Parquet добавить удобство ACID. Список кандидатов с тех пор не сильно пополнился: Delta Lake, Iceberg и Hudi. Потоковой нагрузки было не так много (мы ещё вернемся к Paimon), и выбор был непростым.

Hudi отмели почти сразу из-за малого количества референсов и недостаточного уровня интеграции с нашим стеком на тот момент. А пользовались мы Spark 2.x, Hadoop, Airflow и Hive. Вспоминать это больно, потому что состояние не было отделено от приложений. Дальше выбирали между Iceberg и Delta Lake. Тогда мы ещё не знали, что основной разработчик Delta Lake в 2024 году приобретёт основного разработчика Iceberg. Также мы не знали, что будет выпущен Unity Catalog вместо Hive Metastore, который получил аж четвёртую мажорную версию. Выбрали Delta Lake из-за его зрелости на тот момент, и не прогадали.

Какие проекты реализовали

Только Сигма выбирают Delta Lake - 3

За пять лет реализовали много проектов и on prem, и в облаке. С развитием контейнеризации и подхода IaC мы меняли HDFS на бесплатные Ozone и MinIO. Пробовали Ceph и переходили полностью на S3 в платных версиях. Писали на Python и Scala, а потом только на Python. Отделяли состояние от приложений, переходили на Trino с Alluxio с Hive и Drill. Проводили мажорные обновления Spark, и наконец-то уже делали свои сборки приложений с кучей зависимостей, добавляя ко всему прочему Spline и Data Flint, и задумываясь, а не перейти ли нам на Ray и Daft. Всё это время неизменной оставалась скорость работы связки Spark и Delta. Мы также постоянно сравнивали со скоростью работы наших конвейеров в связке Spark и Iceberg, но всегда было не лучше.

После появления в Delta Lake режима UniFormat необходимость думать об Iceberg отпала, но всё равно остаются проекты, где используется этот инструмент. Многим нравится Iceberg за очень гибкий выбор секционирования, ведь не требуется перезаливать данные при изменении секций. Также в Iceberg по-другому выстроена работа с metastore (состояние не хранится возле данных, а сразу пишется в таблицу). Эта точка зрения часто откликается в проектах с созданием DWH, Data Lake, Lakehouse и прочих вариаций на тему пользовательского хранилища под разные цели. Наш конечный потребитель данных — в основном ML-команды, поэтому Delta для нас так и остаётся в приоритете. Мы долго не могли выбрать между архитектурами Лямбда и Каппа, а в итоге пришли к Дельте.

Так где же мы по сей день используем Delta Lake для хранилищ данных под цели ML и не только?

Не во многих статьях упоминается фундамент работы с данными, поэтому мы решили закрыть этот гештальт. На данный момент в наших ML-проектах:

  • более 400 источников данных;

  • 400+ конвейеров Spark и Airflow;

  • прирост данных более 1 Тб в день;

  • четыре команды инженеров данных;

  • пять крупных прибыльных ML-продуктов (доходность измеряется девятизначным числом).

Базовый набор

Только Сигма выбирают Delta Lake - 4

Давайте перейдем к фреймворкам на примере ML-проекта внутри компании.

Значительно бОльшая часть данных у нас попадает в хранилище для ML-проектов в пакетном режиме, и гораздо меньше транслируется в режиме, близком к реальному времени. Соотношение 80 на 20 будет близким к истине. Данные, которые появляются в своих собственных системах, могут стать корпоративными и также транслироваться в общее хранилище, а могут и не стать.

И те, и другие данные мы практически всегда загружаем в своё хранилище с помощью Spark. В идеале загрузка происходит сразу в формате Delta Lake в сырой слой, но не в виде managed-таблицы, а файлов внутри каталога. Если же это потоковые данные, то мы используем тот же стек для загрузки в сырой слой, но уже микропакетно с помощью Spark Structured Streaming. Как раз сейчас мы находимся на этапе синергии Spark и Flink для трансляции данных из внешних хранилищ в своё.

Касательно Flink требуется отступление, без которого не обойтись. С тех пор, как всех покусала Akka, флюгер потоковой обработки данных сместился в сторону Flink, а флюгер трансформации данных — в сторону Ray. Нельзя не замечать этого движения, если вы работаете с данными. Отношение к Flink разное: кто-то рассматривает его как серебряную пулю, а кто-то очень не любит хранение состояния в K8s. Мы же считаем Flink идеальным кандидатом для транспортировки и обработки данных, если их источником служит RabbitMQ/Kafka. С приёмником данных сейчас посложнее: можно легко выбрать Iceberg или даже замахнуться на Paimon, но мы сейчас также пишем в Delta Lake. Всё благодаря четвёртому мажорному релизу Delta Lake, где коннекторы выехали в отдельный репозиторий и даже получили поддержку на Rust! Решение задачи выноса состояния куда угодно из K8s раньше выглядело сложнее: немногие решались использовать RocksDB, но страдали от неполноценного режима высокой доступности. Второй мажорный релиз Flink открывает больше возможностей для работы с состоянием вне вычислений с помощью фреймворка Fluss. Его мы и развиваем сейчас. После внедрения сделаем отдельную статью.

Данные из сырого слоя следуют дальше только в режиме managed-таблиц, но реализацию никогда не делают на чистом Spark SQL. Поэтому нам пришлось выдумывать свой аналог DBT, о чём расскажем отдельно. Благодаря богатому логу change data feed из файлов Delta мы легко управляем версиями и стремимся к change data capture в своём представлении. Нужно это было для того, чтобы в любой момент иметь возможность разобраться, из какого конкретно инкремента появилась та или иная строка. Нам так и не понадобилось включать row tracking, но CDF мы используем и по сей день. О том, как транзакционный лог таблиц мешает применению lite vacuum в Delta Lake, мы тоже расскажем в отдельной статье.

С выходом обновлений Spark и Delta становится проще управлять физическим распределением данных на дисках. Это неприменимо к Iceberg, потому что данные в нём физически не меняются, а для составления корректного плана используются метаданные. Чтобы не выполнять optimize, compaction и ZOrder во время технических окон, мы используем оптимизацию при записи в небольшие таблицы и автоматическое уплотнение данных после записи для больших. По сути, эти операции инкрементально упорядочивают файлы во время каждого изменения, поэтому оптимизация не длится ночью долго.

Конечно, объектное хранилище не такое удобное, как блочное для Delta Lake, поэтому мы используем сторонний инструмент для фиксации транзакций и формирования очереди на запись данных в каталоги. Для этого выбрали реализацию YDB, поддерживающую протоколы DynamoDB. Благодаря этому разработчики не чувствуют большой разницы при смене типа хранилища, за исключением нескольких моментов. Например, при удалении managed-таблицы в Delta приходится удалять и запись из метаданных YDB. Чтобы было проще, мы добавили эту функциональность в свой фреймворк и периодически чистим по расписанию.

Вопрос экономии средств на хранение данных и их преобразование стоит всегда очень остро, поэтому долгий путь к полноценной и стабильной работе Spark в K8s был тернистым. Начиналось всё с выбора между Yunikorn и Volcano для управления ресурсами. В итоге выбрали первое из-за большей зрелости. Во время миграции стремились достигнуть такой же скорости работы конвейеров, особенно при взаимодействии с S3, и нашли очень много интересного, о чём будет отдельный рассказ с кодом и примерами. Основная причина миграции в K8s заключалась в переходе на выполнение расчётов «по запросу». Использование кластеров с полной загрузкой на 5-8 часов в течение суток обходится дорого и в облаке, и в on prem. В случае облака и использования PaaS-решений приходилось держать в активном состоянии кластер, который не всегда находится в нагрузке 100% 24/7 из-за сложностей масштабирования . А в on prem мы всегда стремимся использовать серверы полностью из-за их стоимости и длительности поставок. Купить кластер и использовать его целиком лишь часть времени — большая финансовая ошибка. Отсюда и появилась задача миграции с PaaS в K8s, чтобы не зависеть ни от какого провайдера. Дополнительным стимулом была возможность самостоятельно управлять сборками версий фреймворков и приложений.

В результате миграции мы добрались до динамического выделения ресурсов, фонового удаления данных и использования таблиц liquid clustering, но это сработало не везде. Например, с динамическим выделением мы наткнулись на очень интересную проблему: все приложения начинают работу, получив в своё распоряжение определенную долю ресурсов, но так и не достигают 100% из-за того, что вся очередь кластера забивается другими приложениями. В итоге всё работает параллельно, но не на полной мощности. Мы эмпирически сдвигали долю нужных для запуска исполнителей с 20% до 50% в зависимости от очереди, группы нод и времени, но всё-таки смогли отыграть в потреблении в пользу экономии. Использование liquid clustering нам пригодилось не везде из-за того, что при загрузке данных в сырой слой мы для каждой таблицы самостоятельно генерируем свои идентификаторы полей составного первичного ключа. Начинали мы с классических криптографических хеш-функций, а окончили на самых быстрых некриптографических, чтобы генерировать ключи в формате BIGINT. Конечно, такая длина нужна не везде, поэтому мы планируем усекать ключи в небольших таблицах для ускорения операций join. Поскольку наши собственные ключи распределяли данные достаточно случайно, переход на liquid clustering почти нигде не давал прироста. Об этом также можем подробно рассказать уже в статье про ML Feature Store.

Благодаря решению создать свой фреймворк для управления конвейерами данных и облегчения жизни разработчикам, мы смогли превратить Airflow только в средство запуска задач посредством изменения параметров в DAG’ах (логика в них теперь отсутствует). Airflow использует наш собственный вариант Airflow-оператора для Kubernetes — MagnitK8SOperator. Для облегчения жизни ребятам из технической поддержки мы типизировали конвейеры в Airflow с помощью набора меток и описаний, чтобы появилась возможность транслировать эту информацию в каталог данных внутри OpenMetadata, а также всегда обновлять Confluence автоматически. Использование OpenMetadata оказалось непростой задачей по ряду причин:

  • нам не подошла реализация проверки качества данных и пришлось реализовывать трансляцию уже выполненных проверок самостоятельно (об этом также расскажем отдельно).

  • индексирование и профилирование всех данных на наших объёмах хранилища, которое доходило до 10 Пб, длится часами из-за работы API не в многопоточном режиме.

  • lineage данных также пришлось подгружать извне с помощью Spline (и об этом мы расскажем отдельно и подробно).

Конечно, всё получилось, но заняло много времени.

Итогом прохождения данных по всем слоям хранилища является слой datamart, к которому потребители обращаются через Trino, StarRocks либо Spark. Это легко сделать благодаря наличию metastore. В Trino у нас выполняется стандартная работа с данными по запросу — этот движок данных у нас очень популярен благодаря нативному кешированию данных. StarRocks ещё не перешёл в режим Adopt, потому что его мы планируем использовать для BI-нагрузки, а это требует трансляции данных в движки СУБД (отдельная большая задача). Обращение через Spark уже удобно ребятам из ML, чтобы строить свой Feature Store.

Конечно, ещё есть вопрос безопасности, мониторинга и ИИ. Благодаря миграции в K8s мы получили возможность:

  • управлять секретами с помощью Vault;

  • безопасно делиться данными с помощью Delta Sharing Protocol;

  • транслировать логи через Alloy в Mimir;

  • строить привычные со времен Yarn дашборды в Grafana;

  • использовать ИИ агентов для анализа логов в Spark History с помощью Data Flint;

  • применять LLM для суммаризации трейс-логов Spark/Airflow в KeepHQ.

Пример инфры

пример архитектуры данных без потребителей и IaC

пример архитектуры данных без потребителей и IaC

Разберем её по частям:

  • слева от основного блока находятся внешние источники, которые у нас, как говорилось выше, в основном пакетные, нежели потоковые.

  • загрузка данных из источников осуществляется либо Spark’ом, либо Flink’ом (первое требует Airflow для запуска по расписанию, а второе живёт вечно).

  • ресурсами в Spark и Flink в K8s управляет Yunikorn, а помогает в обслуживании блок повыше, где находятся БД NoSQL и управление очередями, хранилище секретов, сервисные БД и metastore.

  • данные сначала попадают в формате Delta как инкременты и снимки (красное ведёрко S3), а далее перемещаются по слоям (количество слоёв намеренно сделано избыточное, как пример одной из самых сложных задач, что у нас была).

  • собирает логи Alloy, хранит Mimir, а демонстрация — в привычной Grafana.

  • остаётся блок с каталогом, BI, продвинутой аналитикой логов и рабочее место в браузере (как мы развиваем Kubeflow Notebooks, расскажем отдельно).

  • на выходе Trino либо StarRocks, а для особо безопасных данных есть даже доверенный протокол Delta Sharing.

  • теперь данные могут забирать все желающие.

Собственные фреймворки

Идея с собственным фреймворком для управления конвейерами данных появилась после первой попытки передать экспертизу от одной команды другой для решения подобной задачи с реализацией хранилища данных под ML-проект. Вместо обмена файлами конфигураций и ссылками на документацию проще предоставить полноценную библиотеку с красивой страницей описания методов. Это получилось не сразу, потому что в процессе фреймворк получил много дополнительных функциональных требований. Краткий список выполняемых функций:

  • установка доступна из доверенного источника по аналогии с PyPI.

  • есть обширная документация (не требуется привлекать авторов для демонстрации).

  • можно работать не только с Delta Lake, но и с Iceberg.

  • запуск потоков данных должен происходить легко в любой среде, не должно быть избыточного кода с логикой: в сторонке пишем YAML/JSON с параметрами, а фреймворк использует его.

По сути, мы получили возможность создавать правила и рекомендации для обработки данных без перегибания палки. Общий канал уведомления об обновлениях, сбор запросов на улучшения — намного удобнее, чем беспорядочный контроль кусков кода в разных репозиториях. Самое главное, что мы смогли кратно сократить время разработки любого конвейера данных с помощью единого подхода к любым конвейерам Spark. Сейчас разработчику нужно лишь «скормить» свой код фреймворку и указать по необходимости настройки Spark-сессии (распараллеливание, процессы и память), а указания списка .jar, нужных классов, подсказок для работы с JVM и прочее произойдёт автоматически. Это очень удобно контролировать из одного места в виде общего шаблона для любого конвейера, который может быть исправлен или дополнен, как конструктор. Вариантов ошибиться стало меньше из-за единой точки входа для любого потока данных, а гибкость зашкаливает, ведь разработчик может не просто запускать любой код, а делать это в любой версии Spark с любыми обвесами. Про наш простой фреймворк Flava будет отдельная статья.

Текущие недостатки

Только Сигма выбирают Delta Lake - 6

Если посмотреть широким взглядом на используемые нами технологии, то список недостатков Delta Lake всё ещё можно свести к:

  • необходимости использовать YDB при работе с объектным хранилищем данных;

  • ненативной интеграции с потоковыми фреймворками по типу Flink.

Четвёртый мажорный релиз Delta Lake позволил для нивелирования первого недостатка использовать отдельный координатор. Суть доработки заключается в отделении метаданных об обновлении таблицы от самих данных. Это будет очень знакомо всем разработчикам, использующим Iceberg. Также можно предоставлять каталогу данных полный контроль над таблицами.

Со вторым недостатком ситуация подобная, потому что в том же релизе уже выделены в отдельные репозитории и Delta Connect, и Delta Kernel. Это расширяет экосистему такого формата хранения данных. Необходимость выделения связана со слишком большим количеством подобных типов данных и потребностью в их совместимости. Если вы уже пробовали фреймворки, написанные на Rust, то делитесь впечатлениями в комментариях.

интеграция Delta Lake с другими фреймворками

интеграция Delta Lake с другими фреймворками

С точки зрения развития связки Spark и Delta четвёртая версия обоих фреймворков позволяет полноценно отделить IDE разработчика от вычислительных мощностей кластера где-то там, за горизонтом. Суть Spark и Delta Connect заключается в возможности удалённо подключаться к работающему кластеру и отправлять задачи.

архитектура Spark Connect

архитектура Spark Connect

Задел на будущее

Только Сигма выбирают Delta Lake - 9

Наши планы на ближайшее время будут реализованы после масштабной миграции на четвёртую версию Spark/Delta. Теперь это можно сделать, просто заменив версию образа приложения, но всё становится не таким простым после наслаивания на ванильные фреймворки требований безопасности и пожеланий разработчиков. Давайте по порядку.

Unity Catalog вместо Hive позволит получить большой набор преимуществ:

  • появится уровень каталогов, а это значит, что можно будет не ограничиваться схемами и таблицами внутри одного пространства;

  • появится опция хранения нереляционных данных и даже ML-моделей в одном месте;

  • выйдем на другой уровень доступности и контроля прав и ролей, потому что один каталог могут использовать многие команды, а не несколько.

UniFormat вместо вечного спора между Delta и Iceberg позволит нам — даже уже позволяет — записывать данные в формате, который могут прочитать все без исключения. Если вы пользовались XTable до выхода UniFormat в режиме релиза, то поделитесь впечатлениями в комментариях.

Благодаря Commit Coordinator мы сможем полноценно разделить метаданные и данные. Это очередной шаг на пути к сближению с Iceberg. Вместе с включением опции, которая делегирует управление таблицами каталогу данных, Delta Lake более не зависит от метаданных, хранящихся вместе с данными. Это очень удобно с точки зрения смены типа хранилища с блочного на объектное и наоборот.

Delta Connect поможет полностью избавиться от Zeppelin и перейти на связку Kubeflow Notebooks и VS Code Server. То есть мы сделаем рабочую и, главное, привычную среду разработки для всех, но без необходимости что-то устанавливать на рабочем устройстве. Kubeflow Notebooks позволяют создавать привычную среду разработки с помощью открытой версии VS Code, где любой специалист по машинному обучению может удалённо заниматься привычными делами в Spark-кластере с поддержкой Delta Lake формата.

Что мы поняли за пять лет

Главный вывод простой: сам по себе Delta Lake не решает все проблемы с данными. Он даёт хороший фундамент — приближение к ACID, версионирование данных, change data feed, понятную связку со Spark и нормальную основу для ML-нагрузок. Но дальше начинается самое интересное: на этом фундаменте нужно построить доставку данных, оркестрирование, управление ресурсами, metastore, каталог, lineage, качество данных, наблюдаемость, соблюдение требований регулятора, безопасность и удобный доступ для потребителей.

Хотя сообщество Open Source Delta Lake очень большое и живое (по всем нашим issues в GitHub главные разработчики отвечали и решали проблемы по коду), главным остаётся адаптация этого фреймворка внутри компании и интеграция с другими решениями. Именно поэтому в статье так много рассказано не только про Delta Lake, но и про Spark, Flink, Airflow, Kubernetes, YDB, OpenMetadata, Trino, StarRocks и собственные фреймворки. В реальной платформе данных формат хранения не живёт отдельно от всего остального. Можно выбрать хороший табличный формат, но без инженерного контура вокруг него он так и останется сырой технологией из документации.

Для нас Delta Lake стала не серебряной пулей, а хорошей инженерной ставкой. Где-то она закрыла задачи почти из коробки, где-то заставила достраивать свои решения, а где-то мы до сих пор ждём упрощения жизни благодаря новым возможностям четвёртой версии. Но главное уже понятно: эта ставка выдержала промышленную эксплуатацию, миграции, рост объёмов, смену инфраструктуры и несколько поколений проектов. А значит, было не зря.

Если вам интересно

Только Сигма выбирают Delta Lake - 10

Надеюсь, вам понравилось. В будущем мы подробнее раскроем некоторые темы:

  • как у нас работает IaC и почему мы выбрали GitOps bridge.

  • как долго и сложно переносили Spark в K8s с учётом наших вводных.

  • как нам помогли политики Airflow при работе с тех поддержкой и проверке кода.

  • как мы доставляем информацию о качестве данных в OpenMetadata.

  • как у нас устроены процессы Data Governance, Data Quality и Data Lineage при работе с OpenMetadata.

  • как устроен наш фреймворк Flava.

P.S. эту статью писал человек

Автор: dishkakrauch

Источник