- BrainTools - https://www.braintools.ru -
Apache Flink — это фреймворк и распределенный движок обработки данных, поддерживающий какпакетную (ограниченную), так и потоковую (неограниченную)обработку данных. Это значит, что с его помощью можно обрабатывать как статичные (неизменяемые) данные, так и данные, поступающие в реальном времени.
Он работает как в одной, так и в различных кластерных средах, когда задачи распределены между несколькими машинами. Подобным образом работает и MapReduce, который в отличие от Flink ограничен пакетной обработкой данных.
Как мы уже знаем, Apache Flink может обрабатывать данные двумя способами:
Пакетная обработка (Batch Processing) — обработка конечного набора данных, например файлов или баз данных [1].
Потоковая обработка (Stream Processing) — работа с бесконечным потоком данных в реальном времени, как в случае с событиями и данными, поступающими постоянно.
Система Flink состоит из следующих компонентов:
— Dispatcher. Получает описание задачи от клиента или другого компонента системы, который хочет запустить задачу в Flink. Предоставляет REST API [2] для запуска задач в Flink.
— Job Manager. Управляет выполнением задач, их планированием и распределением. Выполняет операции, такие как создание чекпоинтов и восстановление приложения [3] после сбоев.
— Resource Manager. Координирует ресурсы, взаимодействует с внешними провайдерами, масштабирует приложение при необходимости.
— Task Manager. Выполняет задачи, управляет их состоянием и сообщает метрики в Job Manager.
— JobGraph. Представляет абстрактное описание вычислительного задания в Flink, которое включает последовательность этапов обработки данных [4] и зависимости между ними. Он определяет, какие операции нужно выполнить и в какой последовательности.
— Checkpoint Coordinator. Управляет созданием чекпоинтов (автоматических точек восстановления).
— Savepoint. Точка восстановления, созданная по инициативе пользователя. Позволяет сохранить текущее состояние задачи, например перед обновлением или изменением конфигурации.
— State Backend. Хранит состояние задачи и управляет им, поддерживает различные механизмы хранения и управления состоянием. Это может быть хранение в памяти [5] или на диске, с использованием различных технологий и систем для хранения данных.
— Task Slots. Единицы, которые определяют, сколько задач может одновременно выполнять TaskManager. Каждое задание (или его часть) назначается на слот для выполнения. Количество слотов в TaskManager ограничивает параллельность — количество задач, которые могут быть обработаны одновременно этим узлом.
— Shuffle и Data Exchange. Оптимизированный обмен данными между задачами. Включает передачу данных между различными этапами обработки.
— Client. Интерфейс для отправки и мониторинга задач через CLI, [6] REST API или клиентскую библиотеку.
— Metrics и Monitoring System. Система для сбора и мониторинга метрик задач и ресурсов (интегрируется с инструментами по типу Prometheus [7] и Grafana [8]).
Flink поддерживает несколько языков программирования: [10]
Java [11] — основной язык для работы с Flink с наиболее развитым API [12].
Scala [13] — второй по популярности язык для работы с Flink.
JVM-совместимые языки (например, Kotlin [14]) —поддерживаются, но без официальной поддержки.
SQL [17] — поддерживается для декларативной обработки данных.
У Apache Flink есть несколько уровней абстракции (API) для обработки данных.
SQL API — самый высокий уровень абстракции, позволяющий выполнять SQL-запросы [18] над потоками данных. Подходит для тех, кто привык работать с SQL и не нуждается в тонкой настройке.
Table API — декларативный API для работы с таблицами, похож на SQL, но поддерживает динамическую обработку потоков данных.
DataStream API — низкоуровневый API для потоковой обработки, есть операции для манипулирования потоками данных в реальном времени.
Process Function API — самый низкоуровневый API, который дает полный контроль над обработкой событий, их порядком и состоянием.
Flink SQL API и Table API [20] — это два API для аналитической обработки данных в Flink, интегрированные в одно общее API. Они позволяют выполнять SQL-запросы или работать с таблицами в потоковых приложениях.
Table API и SQL API работают со StreamTableEnvironment [21]. Это абстрактный класс для обработки таблиц в потоках данных в Apache Flink. Он расширяет интерфейс TableEnvironment [22] и нужен для работы с таблицами и SQL-запросами в потоке.
Table API — это декларативный интерфейс для работы с таблицами. Позволяет выполнять операции, похожие на SQL (select, filter, join), но с использованием методов API. Поддерживает потоковые и пакетные данные.
SQL API — это интерфейс для работы с Flink с помощью SQL. Основан на Apache Calcite [23], который обрабатывает SQL-запросы. Семантика SQL одинакова для потоков и пакетных данных.
Пример на Java
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);
tableEnv.executeSql("CREATE TABLE MyTable (id INT, name STRING)");
Table result = tableEnv.sqlQuery("SELECT * FROM MyTable WHERE id > 10");
DataStream<Row> stream = tableEnv.toDataStream(result);
Пример на Scala
val env = StreamExecutionEnvironment.getExecutionEnvironment
val settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
val tableEnv = StreamTableEnvironment.create(env, settings)
tableEnv.executeSql("CREATE TABLE MyTable (id INT, name STRING)")
val result = tableEnv.sqlQuery("SELECT * FROM MyTable WHERE id > 10")
val stream = tableEnv.toDataStream(result)
Примечание: SQL и Table API работают в тесной связи с DataStream API, так как используют его для создания таблиц из потоков, выполнения вычислений, оптимизации и интеграции потоковой и декларативной обработки.
Datastream API [24] нужен для потоковой обработки данных. Он позволяет выполнять операции над потоками данных: фильтрацию, обновление состояний, агрегации, оконные операции и другие трансформации.
Потоки данных могут быть созданы из различных источников (очередей сообщений, сокетов, файлов) и выводиться в различные хранилища.
DataStream API работает с классом DataStream [25](Java или Scala) — коллекцией данных, которые могут быть как конечными, так и неограниченными (непрерывными потоками). Эти данные можно обрабатывать с помощью различных операторов и трансформаций, например:
map — применяет функцию к каждому элементу;
flatMap — преобразует элемент в 0, 1 или несколько новых элементов;
filter — отбирает элементы по условию;
keyBy — группирует поток по ключу;
reduce — агрегирует элементы внутри группы;
window — разделяет поток на окна (временные или по количеству элементов);
process — проводит низкоуровневую обработку с доступом к состоянию.
Пример на Java
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<String> text = env.readTextFile("input.txt");
DataStream<String> filtered = text.filter(line -> line.startsWith("Error"));
filtered.print();
Пример на Scala
val env = StreamExecutionEnvironment.getExecutionEnvironment
val text = env.readTextFile("input.txt")
val filtered = text.filter(_.startsWith("Error"))
filtered.print()
В обоих случаях создаем потоки и фильтруем данные:
readTextFile — создает поток данных из файла;
filter — фильтрует строки, которые начинаются с Error.
Process Function API [26] предоставляет низкоуровневые функции для обработки потоковых данных, в том числе управление состоянием, временем и окнами. Этот API позволяет работать с событиями, состоянием и таймерами.
Flink поддерживает обработку с учетом Event Time [27] с помощью watermarks (assignTimestampsAndWatermarks) и оконных функций (window, trigger).
Основные механизмы:
Watermarks [28] указывают границу обработанных событий (WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(5))).
Timers в KeyedProcessFunction [29](ctx.timerService().registerEventTimeTimer(timestamp)) выполняют отложенные действия при достижении временного порога.
Windows [30]
(.window(TumblingEventTimeWindows.of(Time.seconds(10)))) группирует события по времени.
Triggers [31] (EventTimeTrigger.create()) определяют момент срабатывания окна.
Также Flink поддерживает низкоуровневые джойны [32]между потоками с помощью функций CoProcessFunction и KeyedCoProcessFunction.
Джойны (CoProcessFunction, KeyedCoProcessFunction) позволяют объединять события из разных потоков.
Flink поддерживает состояния (Stateful Processing [33]), которые можно сохранить с помощью Checkpoints и Savepoints.
Состояние (например, ValueState, ListState, MapState) хранит промежуточные данные (getRuntimeContext().getState(…)).
ProcessFunction и KeyedProcessFunction позволяют работать с состоянием и таймерами для ключевых потоков. С помощью RuntimeContext [34] можно управлять состоянием и регистрировать таймеры для работы как с временем событий, так и с временем обработки.
Основные моменты:
processElement() — обработка каждого события;
onTimer() — обработка по таймеру;
ctx.timerService().registerEventTimeTimer() —регистрация таймера.
Пример на Java
stream.keyBy(value -> value)
.process(new KeyedProcessFunction<String, String, String>() {
private transient ValueState<Integer> countState;
@Override
public void open(Configuration parameters) {
countState = getRuntimeContext().getState(new ValueStateDescriptor<>("countState", Integer.class, 0));
}
@Override
public void processElement(String value, Context ctx, Collector<String> out) {
int currentCount = countState.value();
countState.update(currentCount + 1);
// Устанавливаем таймер на 1 минуту ctx.timerService().registerEventTimeTimer(ctx.timestamp() + 60000);
out.collect(value + " processed: " + (currentCount + 1));
}
@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) {
out.collect("Timer triggered for key: " + ctx.getCurrentKey());
}
})
.print();
Пример на Scala
stream.keyBy(value => value)
.process(new KeyedProcessFunction[String, String, String] {
@transient var countState: ValueState[Int] = _
override def open(parameters: Configuration): Unit = {
val descriptor = new ValueStateDescriptor[Int]("countState", classOf[Int], 0)
countState = getRuntimeContext.getState(descriptor)
}
override def processElement(value: String, ctx: KeyedProcessFunction[String, String, String]#Context, out: Collector[String]): Unit = {
val currentCount = countState.value()
countState.update(currentCount + 1)
ctx.timerService().registerEventTimeTimer(ctx.timestamp() + 60000) // Устанавливаем таймер на 1 минуту
out.collect(s"$value processed: ${currentCount + 1}")
}
override def onTimer(timestamp: Long, ctx: KeyedProcessFunction[String, String, String]#OnTimerContext, out: Collector[String]): Unit = {
out.collect(s"Timer triggered for key: ${ctx.getCurrentKey}")
}
})
.print()
Здесь:
keyBy() разбивает поток на ключи;
process() применяет обработку для каждого ключа;
ValueState хранит состояние для каждого ключа;
таймеры ожидают определенное время для срабатывания, например для расчетов или сбора данных.
Flink может работать с различными источниками данных и отправлять результаты в хранилища с помощью коннекторов [35].
Источник (Source). Функция источника получает данные и передает их в поток обработки. Пример источника — Kafka, откуда Flink может получать сообщения в реальном времени.
Синк (Sink). Функция синка принимает данные из потока и отправляет их в выбранное хранилище, например в базу данных или файл.
Часто используемые коннекторы:
Apache Kafka [36] для получения данных в реальном времени.
HDFS [37] для работы с распределенными файловыми системами.
JDBC-совместимые базы данных для работы с реляционными базами данных. Например, MySQL [38], PostgreSQL [39], H2 Database [40].
Elasticsearch [41] для индексации и поиска данных.
Flink-приложение — это программа, которая включает Flink-задачу и необходимую для ее выполнения конфигурацию.
Окружение (Environment) — это среда, в которой выполняется Flink-приложение. В окружении содержатся настройки, параметры и ресурсы для выполнения задач. Окружение для Flink может быть локальным или кластерным.
Local Environment — локальная среда для тестирования и отладки приложений на одном компьютере.
Cluster Environment — окружение, настроенное для работы в распределенном кластере (например, в YARN [42]или Kubernetes [43]), где Flink-задача выполняется на нескольких машинах.
Рассмотрим, как создать рабочее окружение Flink для работы на Java, настроить StreamTableEnvironmentдля обработки данных и подключить источники, такие как Kafka, для работы с реальными потоками данных.
Установите Java (JDK 8 или выше). Как установить Java, рассказывали в этой статье [44].
Установите Apache Flink
Скачайте Flink с официального сайта [45] и распакуйте архив в удобную директорию. Рекомендуется выбрать актуальную стабильную версию, если нет особых требований. При этом она должна быть совместима с версией Java [46].
Настройте зависимости
Убедитесь, что у вас установлен Maven [47] или Gradle [48] для работы с зависимостями.
Для Maven добавьте зависимость в файл pom.xml:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java</artifactId>
<version> версия </version>
</dependency>
Для Gradle добавьте зависимость в build.gradle:
dependencies {
implementation 'org.apache.flink:flink-streaming-java:версия'
}
В Flink для работы с потоковыми данными через таблицы используют StreamTableEnvironment. Он позволяет выполнять SQL-запросы и работать с таблицами в потоке данных.
Перед работой также нужно дополнить список зависимостей Table API и SQL API.
Для Maven:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java</artifactId>
<version> версия </version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner</artifactId>
<version> версия </version>
</dependency>
Для Gradle:
dependencies {
implementation 'org.apache.flink:flink-table-api-java:версия'
implementation 'org.apache.flink:flink-table-planner:версия'
}
Пример на Java
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.Table;
public class FlinkTableExample {
public static void main(String[] args) throws Exception {
// Создаем потоковое окружение
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Создаем StreamTableEnvironment для работы с таблицами
TableEnvironment tableEnv = TableEnvironment.create(env);
// Пример SQL-запроса
String query = "SELECT * FROM my_table WHERE my_column > 100";
// Выполняем запрос и преобразуем результат в таблицу
Table result = tableEnv.sqlQuery(query);
// Трансформируем данные
result.execute().print();
}
}
В этом примере создаем StreamTableEnvironment, с помощью которого выполняем SQL-запрос для фильтрации данных из таблицы.
Для обработки данных в Flink также необходимо подключить источники, такие как Kafka, файлы, базы данных и другие, с помощью соответствующих коннекторов.
Пример подключения Kafka
Для Maven:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka</artifactId>
<version>версия</version>
</dependency>
Для Gradle:
dependencies {
implementation 'org.apache.flink:flink-connector-kafka:версия'
}
Теперь, когда зависимости добавлены, можно подключить Kafka в вашем коде через SQL-запрос с использованием Flink SQL API.
tableEnv.executeSql(
"CREATE TABLE my_table (" +
" id INT," +
" name STRING," +
" value DOUBLE" +
") WITH (" +
" 'connector' = 'kafka'," +
" 'topic' = 'my-topic'," +
" 'properties.bootstrap.servers' = 'localhost:9092'," +
" 'format' = 'json'" +
")");
Здесь:
properties.bootstrap.servers — это адрес сервера Kafka, с которого Flink будет читать данные;
format указывает формат данных (например, JSON).
В StreamTableEnvironment можно использовать как SQL-запросы, так и операторы Table API для работы с потоками данных.
Пример обработки данных с помощью SQL API
Table result = tableEnv.sqlQuery("SELECT id, name FROM my_table WHERE value > 50");
result.execute().print();
Эта команда выведет строки данных, где каждый элемент таблицы будет в виде строки, а именно: значения столбцов id и name для всех записей, удовлетворяющих условию value > 50.
После настройки окружения и выполнения SQL-запросов или операций с Table API вы можете запустить задачу Flink.
Пример запуска задачи:
bin/flink run -c my.package.FlinkTableExample my-flink-job.jar
Обучиться работе с моделями машинного обучения [49]: от базовой математики [50] до написания собственного алгоритма — можно на совместной магистратуре [51] Skillfactory и МИФИ «Прикладной анализ данных и машинное обучение».
Автор: skillfactory_school
Источник [52]
Сайт-источник BrainTools: https://www.braintools.ru
Путь до страницы источника: https://www.braintools.ru/article/15772
URLs in this post:
[1] баз данных: https://blog.skillfactory.ru/glossary/baza-dannyh/
[2] REST API: https://blog.skillfactory.ru/glossary/rest-api/
[3] приложения: https://blog.skillfactory.ru/glossary/prilozhenie/
[4] обработки данных: https://blog.skillfactory.ru/predvaritelnaya-obrabotka-dannyh-v-mashinnom-obuchenii/
[5] памяти: http://www.braintools.ru/article/4140
[6] CLI,: https://blog.skillfactory.ru/glossary/interface/#:~:text=%D1%81%20%D0%BF%D1%80%D0%B8%D0%BB%D0%BE%D0%B6%D0%B5%D0%BD%D0%B8%D0%B5%D0%BC%20%E2%80%94%20%D0%B8%D0%BD%D1%82%D0%B5%D1%80%D1%84%D0%B5%D0%B9%D1%81.-,CLI,-%D0%AD%D1%82%D0%BE%20%D0%B0%D0%B1%D0%B1%D1%80%D0%B5%D0%B2%D0%B8%D0%B0%D1%82%D1%83%D1%80%D0%B0%20%D0%B4%D0%BB%D1%8F
[7] Prometheus: https://prometheus.io/
[8] Grafana: https://grafana.com/
[9] Источник: https://softwarefrontier.substack.com/p/understanding-apache-flink-architecture
[10] языков программирования:: https://blog.skillfactory.ru/kakie-yazyki-nuzhny-v-it-professiyah/
[11] Java: https://blog.skillfactory.ru/glossary/java/
[12] API: https://blog.skillfactory.ru/glossary/api/
[13] Scala: https://blog.skillfactory.ru/glossary/scala/
[14] Kotlin: https://blog.skillfactory.ru/glossary/kotlin/
[15] Python: https://blog.skillfactory.ru/glossary/python/
[16] PyFlink: https://pyflink.readthedocs.io/en/main/index.html
[17] SQL: https://blog.skillfactory.ru/glossary/sql/?ysclid=m726nrl4sb438960105
[18] SQL-запросы: https://blog.skillfactory.ru/polnoe-rukovodstvo-po-zaprosam-sql/
[19] Источник: https://docs.confluent.io/platform/current/flink/concepts/flink.html
[20] Flink SQL API и Table API: https://jbcodeforce.github.io/flink-studies/coding/flink-sql/
[21] StreamTableEnvironment: https://www.demo2s.com/java/java-org-apache-flink-table-api-bridge-java-streamtableenvironment.html
[22] TableEnvironment: https://www.javadoc.io/static/org.apache.flink/flink-table-api-java/1.16.1/org/apache/flink/table/api/TableEnvironment.html
[23] Apache Calcite: https://calcite.apache.org/
[24] Datastream API: https://nightlies.apache.org/flink/flink-docs-release-1.20/docs/dev/datastream/overview/
[25] DataStream: https://www.javadoc.io/static/org.apache.flink/flink-streaming-java/1.17.0/org/apache/flink/streaming/api/datastream/DataStream.html
[26] Process Function API: https://nightlies.apache.org/flink/flink-docs-release-1.20/docs/dev/datastream/operators/process_function/
[27] Event Time: https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/concepts/time/#:~:text=scheduled%2C%20or%20otherwise).-,Event%20time,-%3A%20Event%20time
[28] Watermarks: https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/event-time/generating_watermarks/
[29] KeyedProcessFunction: https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/streaming/api/functions/KeyedProcessFunction.html
[30] Windows: https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/dev/python/datastream/operators/windows/
[31] Triggers: https://nightlies.apache.org/flink/flink-docs-release-1.10/api/java/index.html?org/apache/flink/table/runtime/operators/window/triggers/Trigger.html
[32] джойны: https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/hive-compatibility/hive-dialect/queries/join/
[33] Stateful Processing: https://nightlies.apache.org/flink/flink-docs-master/docs/concepts/stateful-stream-processing/
[34] RuntimeContext: https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/fault-tolerance/state/#:~:text=accessed%20using%20the-,RuntimeContext,-%2C%20so%20it%20is
[35] коннекторов: https://nightlies.apache.org/flink/flink-docs-master/docs/dev/configuration/connector/
[36] Apache Kafka: https://blog.skillfactory.ru/glossary/kafka-apache/
[37] HDFS: https://hadoop.apache.org/docs/r1.2.1/hdfs_design.html
[38] MySQL: https://blog.skillfactory.ru/glossary/mysql/
[39] PostgreSQL: https://blog.skillfactory.ru/glossary/postgresql/
[40] H2 Database: https://www.h2database.com/html/main.html
[41] Elasticsearch: https://blog.skillfactory.ru/glossary/elasticsearch/
[42] YARN: https://hadoop.apache.org/docs/current/hadoop-yarn/hadoop-yarn-site/YARN.html
[43] Kubernetes: https://kubernetes.io/ru/docs/concepts/overview/components/
[44] этой статье: https://blog.skillfactory.ru/start-in-java/#:~:text=%D0%B8%20%D1%84%D0%B8%D0%BD%D0%B0%D0%BD%D1%81%D0%BE%D0%B2%D1%8B%D1%85%20%D0%B8%D0%BD%D1%81%D1%82%D1%80%D1%83%D0%BC%D0%B5%D0%BD%D1%82%D0%BE%D0%B2.-,%D0%9A%D0%B0%D0%BA%20%D1%83%D1%81%D1%82%D0%B0%D0%BD%D0%BE%D0%B2%D0%B8%D1%82%D1%8C%20Java%3F,-%D0%94%D0%BB%D1%8F%20%D0%BF%D1%80%D0%BE%D0%B3%D1%80%D0%B0%D0%BC%D0%BC%D0%B8%D1%80%D0%BE%D0%B2%D0%B0%D0%BD%D0%B8%D1%8F%20%D0%BD%D0%B0
[45] официального сайта: https://flink.apache.org/downloads.html
[46] совместима с версией Java: https://nightlies.apache.org/flink/flink-docs-release-1.19/zh/docs/deployment/java_compatibility/
[47] Maven: https://maven.apache.org/
[48] Gradle: https://gradle.org/
[49] обучения: http://www.braintools.ru/article/5125
[50] математики: http://www.braintools.ru/article/7620
[51] магистратуре: https://new.skillfactory.ru/machine-learning-mag-mephi?utm_source=blog&utm_medium=habr&utm_campaign=none_vo_mifiml_blog_habr_course_none_none_all_apacheflink_text&utm_content=apacheflink?utm_term=text
[52] Источник: https://habr.com/ru/articles/914836/?utm_campaign=914836&utm_source=habrahabr&utm_medium=rss
Нажмите здесь для печати.