Простой Python, автоматический Spark: минус Kubernetes, плюс продуктивность. apache.. apache. Apache Livy.. apache. Apache Livy. k8s.. apache. Apache Livy. k8s. Kubernetes.. apache. Apache Livy. k8s. Kubernetes. ml platform.. apache. Apache Livy. k8s. Kubernetes. ml platform. python.. apache. Apache Livy. k8s. Kubernetes. ml platform. python. spark.. apache. Apache Livy. k8s. Kubernetes. ml platform. python. spark. vk cloud.. apache. Apache Livy. k8s. Kubernetes. ml platform. python. spark. vk cloud. vk tech.. apache. Apache Livy. k8s. Kubernetes. ml platform. python. spark. vk cloud. vk tech. машинное обучениe.
Простой Python, автоматический Spark: минус Kubernetes, плюс продуктивность - 1

Установка и эксплуатация приложений Spark в облаке зачастую становятся препятствием для дата-инженеров (Data Engineer, DE): сложная работа с Helm-конфигурациями отвлекает внимание от анализа данных и замедляет подготовку среды. Но полностью отказываться от Spark зачастую нерационально, поэтому многие команды стремятся найти свое решение для обхода существующих сложностей.

Меня зовут Юрий Орлов. Я руководитель команды разработки ML Platform в VK Tech. В этой статье я расскажу о том, как мы автоматизировали развертывание Spark в облаке и создали клиент на Python, который снижает требования к знаниям в области DevOps и Kubernetes, необходимым для начала работы со Spark.

О Spark и существующих сложностях

Apache Spark — фреймворк для быстрой и эффективной обработки больших объемов данных (Big Data). 

Со Spark работают разные специалисты: разработчики, аналитики данных, ML-специалисты и дата-инженеры. При этом дата-инженеры решают с помощью Spark большой пул задач, среди которых:

  • Сбор и обработка данных (ETL). Извлечение, трансформация и загрузка данных из множества источников в централизованное хранилище.

  • Анализ и агрегирование данных. Вычисление статистики, создание отчетов и визуализаций на основе большого объема данных.

  • Поточная обработка данных (Streaming). Быстрое реагирование на события путем обработки данных в реальном времени.

  • Оптимизация вычислительных процессов. Повышение скорости и эффективности обработки данных за счет параллельных и распределенных вычислений.

  • Развертывание и мониторинг инфраструктуры. Установка, конфигурирование и поддержание стабильности работы Spark-кластеров.

Вместе с тем для дата-инженеров работа со Spark не всегда проста. Так, запуск Spark в Kubernetes требует создания YAML и компетенций в DevOps. Из-за этого DE вынуждены погружаться в особенности реализации YAML, kubernetes, взаимодействия с компонентами Spark и прочих ненужных инфраструктурных нюансов. Это повышает порог входа в работу с инструментом и значительно сказывается на производительности команд. 

Наше решение 

Чтобы преодолеть существующие сложности работы со Spark, мы с командой разработали Python-клиент, который скрывает «под капотом» всю механику работы с kubernetes и конфигами. Таким образом, при использовании данного клиента spark job запускается прямо из python и airflow, а инженеры могут писать пайплайны, а не YAML.

Сам Python-клиент устанавливается в Notebook Jupyterhub и хранит в себе множество настроек для интеграции с Kubernetes-кластером, на котором установлен Apache Spark с Spark-оператором.

Простой Python, автоматический Spark: минус Kubernetes, плюс продуктивность - 2

Таким образом мы избавляем дата-инженеров от необходимости выполнения множества низкоуровневых операций: от подключения к Kubernetes до указания дополнительных переменных в окружениях. 

Сравнение методов запуска Spark 

Чтобы понять, насколько удобной получилась наша реализация, сравним ее с другими способами запуска Spark.

Запуск Spark c YARN

Для начала рассмотрим небольшой фрагмент кода, касающийся YARN.

from pyspark.sql import SparkSession

# Создаем SparkSession с YARN как cluster manager
spark = SparkSession.builder 
   .appName("WordCountOnYARN") 
   .master("yarn") 
   .getOrCreate()
lines = spark.read.text("hdfs:///data/input.txt").rdd.map(lambda r: r[0])
words = lines.flatMap(lambda line: line.split(" "))
word_counts = words.map(lambda word: (word, 1)) 
                  .reduceByKey(lambda a, b: a + b)
word_counts.saveAsTextFile("hdfs:///data/output")
spark.stop()

Здесь, кажется, все просто:

  • создаем экземпляр сессии;

  • пишем нужный код;

  • подключаемся к YARN, после чего начинается обработка исходных данных и их преобразование в нужный вид.

Но в действительности у такой реализации есть несколько нюансов.

  • Необходимость HADOOP-кластера с файловой системой HDFS. Это большой комбайн, который нужно не только запустить, но и поддерживать.

  • Сложность поддержки YARN-кластера. Необходима поддержка YARN — отдельного кластерного компонента, который также разворачивается в Kubernetes и имеет собственные тонкости, многочисленные настройки и потенциальные трудности. 

  • Недостаточная гибкость настроек. В настройках есть определенные ограничения. 

Таким образом, для полноценной работы с такой реализацией потребуется привлечение как минимум одного DevOps-специалиста. В противном случае дата-инженеры будут вынуждены тратить время на настройку работы с внешними клиентами (например, airflow).

Запуск spark в k8s

В случае Kubernetes сразу появляется зависимость от элементов k8s, в первую очередь — от манифестов и правил их составления. 

Причем настройка взаимосвязей между компонентами и внесение правок в манифесты — дело непростое. Для наглядности разберем небольшой фрагмент манифеста:

{ "apiVersion":"sparkoperator.k8s.io/v1beta2",

   "kind":"SparkApplication",
   "metadata":{
      "name":"job-37666432",
      "namespace":"sparkcluster-tnz2g7qn"
   },
   "spec":{
      "driver":{
         "affinity":{
            "nodeAffinity":{
               "requiredDuringSchedulingIgnoredDuringExecution":{
                  "nodeSelectorTerms":[
                     {
                        "matchExpressions":[
                           {
                              "key":"data.vkcs.cloud/nodegroup",
                              "operator":"In",
                              "values":[
                                 "sparkjobs-tnz2g7qn"
                              ]}]}]}}},
         "annotations":{
            "data.vkcs.cloud/log-filter":"sparkjob"
         },
         "coreLimit":"1200m",
         "cores":1,
         "env":[{
               "name":"AWS_ACCESS_KEY_ID","valueFrom":{

При изучении манифеста становится очевидно, что даже для запуска маленькой простой джобы в Spark нужно:

  • описывать YAML-манифесты;

  • знать детали kubernetes, понимать, где и какие разделы располагаются.

Все это приводит к тому, что дата-инженеры вынуждены сначала изучать и помнить детали инфраструктуры, а уже потом приступать к написанию запросов и созданию пайплайнов. При этом даже незначительные изменения в пайплайнах занимают дополнительное время, поскольку каждое обновление конфигурации требует соответствующих исправлений в манифестах Kubernetes.

Запуск spark job через наш Python-клиент

Теперь к тому, как запускается spark job в нашей реализации. Для примера рассмотрим задачу конвертации набора данных из формата CSV в Parquet посредством Spark.

Здесь все просто:

import os
from pyspark.sql import SparkSession
from pyspark.sql.types import StructField, IntegerType, DecimalType, StringType
from pyspark.sql.functions import StructType

def check_files_exists(spark: SparkSession, input_s3_path: str):
   print(f"Checking input path: {input_s3_path}/customer.tbl")
   try:
       temp_df = spark.read.text(f"{input_s3_path}/customer.tbl").limit(1)
       count = temp_df.count()
       print(f"File accessible, found {count} line(s)")
   except Exception as e:
       print(f"Cannot access file: {e}")
       raise
def convert_customer():
   input_s3_path = os.environ.get('INPUT_S3_PATH')
   output_s3_path = os.environ.get('OUTPUT_S3_PATH')
   scale_factor = os.environ.get('SCALE_FACTOR')

    customer_schema = StructType([
       StructField("c_custkey", IntegerType(), True),
       StructField("c_name", StringType(), True),
       StructField("c_address", StringType(), True),
       StructField("c_nationkey", IntegerType(), True),
       StructField("c_phone", StringType(), True),
       StructField("c_acctbal", DecimalType(15, 2), True),
       StructField("c_mktsegment", StringType(), True),
       StructField("c_comment", StringType(), True)
   ])

   spark = SparkSession.builder 
       .appName(f"Convert_Customer_SF{scale_factor}") 
       .getOrCreate()

Помимо этой джобы, мы создаем еще дополнительный Python-файл, который будет ей управлять. 

import logging
import uuid
from mlplatform_client.v2 import BasicAuth, SparkCluster
from mlplatform_client.v2.utils import wait_job_running, wait_job_succeeded
from mlplatform_client.v2.clients.spark import SparkClient

log = logging.getLogger()

ML_PLATFORM_HOST = "https://spark.tnz2g7qn.data.bizmrg.com"
KEYSTONE_USERNAME="uniq_user"
KEYSTONE_PASSWORD="pass"

def run():

   cluster = SparkCluster(
       SparkClient(
           host=ML_PLATFORM_HOST,
           auth=BasicAuth(username=KEYSTONE_USERNAME, password=KEYSTONE_PASSWORD),
           skip_tls_verify=False
       )
   )

   job_name = f"job-{str(uuid.uuid4()).encode().hex()[:8]}"
   manifest = cluster.jobs.get_default_manifest(job_name)
   manifest.set_executor_settings({"instances": 1, "cores": 1})
   job = cluster.jobs.submit_pyjob(manifest, pyfile="jobs/test_job.py")
   wait_job_running(job, delay=8)
   log.info(job.info())
   wait_job_succeeded(job, delay=8)
   log.info(job.logs())

if name == "__main__":
   run()

После этого остается только запустить Python-файл, и джоба начнет выполняться — ничего больше от дата-инженера не требуется.

Причем удобство метода не ограничивается простыми пайплайнами. Так, если нужно изменить какие-либо настройки (например, число экземпляров, количество выделяемых ядер CPU, размер оперативной памяти, число partition’ов), достаточно добавить в код еще один метод.

Например, возьмем уже рассмотренный ранее код:

cluster = SparkCluster(
   SparkClient(
       host=ML_PLATFORM_HOST,
       auth=BasicAuth(username=KEYSTONE_USERNAME, password=KEYSTONE_PASSWORD),
       skip_tls_verify=False
   )
)

job_name = f"job-{str(uuid.uuid4()).encode().hex()[:8]}"
manifest = cluster.jobs.get_default_manifest(job_name)
manifest.set_executor_settings({"instances": 1, "cores": 1})
job = cluster.jobs.submit_pyjob(manifest, pyfile="jobs/test_job.py")
wait_job_running(job, delay=8)
log.info(job.info())
wait_job_succeeded(job, delay=8)
log.info(job.logs())

Чтобы внести в него такой параметр, как число партишенов, достаточно добавить еще один метод:

manifest.set_spark_conf({
   "spark.sql.shuffle.partitions": "600"
})

То есть не надо вручную вносить изменения в манифесты, следить за правильностью отступов и другими нюансами — все сводится к указанию одного дополнительного параметра.

Таким образом, в нашей реализации дата-инженеры получают:

  • простой запуск из python (YAML и знание kubernetes не нужно);

  • понятный интерфейс для работы — все управление можно осуществлять с помощью одной библиотеки.

Пример более сложных кейсов

Мы рассмотрели простой пример, когда запускается одна джоба. Но на практике такие простые задачи встречаются редко. Поэтому немного усложним и посмотрим, как наш Python-клиент будет работать в кейсах, приближенных к реальным.

Для начала примем условие, что:

  • джобу необходимо запускать определенное количество раз;

  • требуется автоматизированное расписание запуска джобы и контроль зависимостей;

  • важно иметь возможность легко править код и параметры;

  • нужен прозрачный контроль выполнения задач.

Саму задачу оставляем прежней — преобразовать CSV в parquet с помощью Spark.

Для автоматизации воспользуемся Airflow.

Что ж, перейдем к обзору алгоритма.

Примечание: В рамках примера будем использовать Spark 3.5.1 и Airflow 2.7.1.

Итак, рассмотрим пример DAG в Airflow. Первым делом берем уже упомянутую задачу, которая конвертирует CSV в parquet.

import os
from pyspark.sql import SparkSession
from pyspark.sql.types import StructField, IntegerType, DecimalType, StringType
from pyspark.sql.functions import StructType
def check_files_exists(spark: SparkSession, input_s3_path: str):

   print(f"Checking input path: {input_s3_path}/customer.tbl")
   try:
       temp_df = spark.read.text(f"{input_s3_path}/customer.tbl").limit(1)
       count = temp_df.count()
       print(f"File accessible, found {count} line(s)")
   except Exception as e:
       print(f"Cannot access file: {e}")
       raise
        

def convert_customer():
   input_s3_path = os.environ.get('INPUT_S3_PATH')
   output_s3_path = os.environ.get('OUTPUT_S3_PATH')
   scale_factor = os.environ.get('SCALE_FACTOR')

   customer_schema = StructType([
       StructField("c_custkey", IntegerType(), True),
       StructField("c_name", StringType(), True),
       StructField("c_address", StringType(), True),
       StructField("c_nationkey", IntegerType(), True),
       StructField("c_phone", StringType(), True),
       StructField("c_acctbal", DecimalType(15, 2), True),
       StructField("c_mktsegment", StringType(), True),
       StructField("c_comment", StringType(), True)
   ])

   spark = SparkSession.builder 
       .appName(f"Convert_Customer_SF{scale_factor}") 
       .getOrCreate()

Далее мы на Python составляем типовой DAG для Airflow.

with DAG("csv-to-parquet",
        default_args=DEFAULT_ARGS,
        schedule=None,
        catchup=False,
        dagrun_timeout=timedelta(hours=16),
        tags=[DEFAULT_ARGS["owner"]]
        ) as dag:

   def check_environ_params(env_vars, **context):
       os.environ.update(env_vars)

       required_vars = ["ML_PLATFORM_HOST", "KEYSTONE_USERNAME",
                        "KEYSTONE_PASSWORD", "S3_ENDPOINT",
                        "AWS_ACCESS_KEY_ID", "AWS_SECRET_ACCESS_KEY",
                        "AWS_REGION", "BUCKET_NAME"]

       for var in required_vars:
           if var not in os.environ:
               raise ValueError(f"Required environment variable {var} is missing")

           if not os.environ[var]:
               raise ValueError(f"Environment variable {var} is empty")

       for name, value in os.environ.items():
           print(f"{name}: {value}")

Следом обе джобы складываем в S3, к которому подключен этот DAG. 

Простой Python, автоматический Spark: минус Kubernetes, плюс продуктивность - 3

Туда же загружаем Python-файл с DAG. 

Простой Python, автоматический Spark: минус Kubernetes, плюс продуктивность - 4

Далее просто переходим в Airflow, где уже будет доступен загруженный DAG. Находим его и нажимаем Play.

Простой Python, автоматический Spark: минус Kubernetes, плюс продуктивность - 5

На этом подготовка завершена: сразу после в Airflow можно отслеживать процесс выполнения задач и статус их завершения («успешно» или «неудачно»).

Простой Python, автоматический Spark: минус Kubernetes, плюс продуктивность - 6

Помимо этого, здесь же через интерфейс Airflow можно получить подробный отчет о выполнении каждой задачи: чтобы увидеть всю информацию о задаче, достаточно перейти во вкладку с логами.

Простой Python, автоматический Spark: минус Kubernetes, плюс продуктивность - 7

К выводам

Безусловно, мы не первооткрыватели этого решения — подобная схема несколько раньше уже была реализована в сервисе Apache Livy. Вместе с тем в Livy есть несколько значительных недостатков — например:

  • надо поднимать и поддерживать отдельный микросервис Livy;

  • Livy взаимодействует непосредственно с мастер-нодой и воркер-нодами Spark, обходя Spark-оператор.

Реализовав свою схему, мы не просто исключили эти требования — мы сделали решение, с которым дата-инженеры могут запускать код PySpark напрямую из скриптов Python или создавать автоматизированные конвейеры без погружения в инфраструктурные детали. Это сокращает время от старта написания кода до выполнения задач и повышает продуктивность команд, работающих с большими данными в облачной среде.

А как со Spark работают дата-инженеры в вашей команде? Делитесь опытом.

Автор: YO_N

Источник

Rambler's Top100