- BrainTools - https://www.braintools.ru -
“Когда пару лет назад я впервые столкнулась с реактивным программированием, – рассказывает моя коллега Екатерина, – казалось, что это что-то слишком сложное и академическое. Но чем больше работаешь с современными высоконагруженными системами, тем яснее становится, что без реактивного подхода сложно обеспечить высокую отзывчивость и масштабируемость”.
разработчик Java в Programming Store
Сегодня реактивные технологии уже перестали быть экзотикой. Netflix, Uber, Alibaba — все они активно используют реактивные стеки, чтобы выдерживать миллионы одновременных подключений. И если вы Java-разработчик, то знание WebFlux, Reactor или R2DBC становится не просто преимуществом, а необходимостью.
Представьте обычное современное веб-приложение: пользователи лайкают, отправляют сообщения, загружают файлы, получают пуши — всё одновременно и в реальном времени. На классических синхронных потоках такое приложение быстро начнёт «задыхаться». Именно здесь реактивное программирование раскрывает себя во всей красе.
Если покопаться в истоках, то идеи реактивности появились ещё в 90-х, когда разработчики заговорили о потоках данных и наблюдаемых последовательностях. Но настоящий прорыв случился ближе к 2014 году. Netflix столкнулся с тем, что традиционные синхронные архитектуры просто не тянут — миллионы пользователей, постоянные запросы, гигантская нагрузка на сеть. Решение родилось внутри компании: библиотека RxJava, которая позже была открыта миру и получила широкое распространение в Java-сообществе.
Примерно в то же время Microsoft активно развивал Reactive Extensions (Rx) под .NET. Это подтвердило, что концепция универсальна и применима в любых экосистемах.
Реактивное программирование в Java — это не абстракция, а вполне конкретные технологии и стандарты:
Reactive Streams API сначала появился как отдельная спецификация (org.reactivestreams), а затем вошел в Java 9 в 2017 году как java.util.concurrent.Flow. Он определяет:
Publisher — источник данных,
Subscriber — потребитель данных,
Subscription — управление потоком,
Processor — преобразователь данных.
Project Reactor — флагманская реализация от Pivotal (создателей Spring), включающая:
Mono — решает задачи с единственным исходом,
Flux — управляет непрерывными потоками данных.
Spring WebFlux — реактивный веб-фреймворк, выпущенный в Spring 5 (2017):
работает на Netty вместо традиционных сервлет-контейнеров,
может обрабатывать значительно больше одновременных соединений при I/O-bound нагрузке по сравнению с традиционным Spring MVC.
Таким образом, Reactive Streams — это спецификация, то есть набор интерфейсов, которые описывают, как именно должны взаимодействовать компоненты в реактивной системе. Project Reactor — это уже конкретная реализация этих интерфейсов. Его классы Flux и Mono — это, по сути, Publisher, но с мощным набором операторов (map, flatMap, filter, merge и т. д.), которые позволяют легко описывать асинхронные цепочки обработки данных. А Spring WebFlux — это надстройка над Reactor, которая применяет эти принципы в веб-контексте. Она позволяет строить неблокирующие REST-контроллеры, маршруты и обработчики запросов, используя Mono и Flux как стандартные типы возвращаемых значений.
С появлением Spring Boot 3 и Java 17+ реактивное программирование стало еще доступнее. Virtual threads из Project Loom не заменяют реактивный подход, а дополняют его. Они больше подходят для блокирующего кода с высокой конкуренцией, в то время как реактивные потоки оптимальны для I/O-bound сценариев с асинхронной обработкой.
Современная реактивная экосистема Java включает:
R2DBC — реактивный доступ к реляционным БД;
Reactive MongoDB/Cassandra драйверы;
RSocket — реактивный протокол для микросервисов;
Micrometer — продвинутые метрики для мониторинга.
Если сравнивать, то традиционная модель похожа на службу такси: одна машина — один пассажир, и если где-то пробка, всё стоит. Реактивный подход ближе к метро: один состав перевозит тысячи людей одновременно, движение не останавливается, ресурсы используются эффективно.
Реактивное программирование не волшебная таблетка. Это просто естественный шаг вперёд, когда классические блокирующие подходы перестают справляться. И самое приятное — начать можно постепенно. Вы можете внедрять реактивные компоненты точечно, не переписывая всё приложение с нуля.
Команда Spring не раз показывала тесты, где WebFlux выдаёт, в среднем, в 3–4 раза больше запросов в секунду и при этом потребляет на 40% меньше памяти [1], чем классический Spring MVC. На практике это чувствуется сразу: сервер становится заметно отзывчивее даже под серьёзной нагрузкой.
Дальше я покажу, как шаг за шагом перейти к реактивному подходу, не потеряв устойчивости и простоты сопровождения кода.
Reactive Manifesto — это не сухая спецификация, а, скорее, набор идей о том, как строить живые, адаптивные системы.
Представьте, что вы создаёте не просто приложение, а организм, который должен спокойно переносить стресс [2], меняться под давлением и оставаться в форме. Именно к этому и сводятся четыре базовых принципа реактивного подхода.
Любая система должна отвечать быстро и предсказуемо, независимо от того, что происходит внутри. Пользователь не должен «зависать» в ожидании ответа и гадать, живо ли приложение. Даже под нагрузкой система должна сохранять ощущение плавности и контроля.
// Традиционный подход может "зависнуть"
public String loadUserDataBlocking(String userId) {
// Может блокировать поток на неопределенное время
return database.query("SELECT * FROM users WHERE id = ?", userId);
}
// Реактивный подход гарантирует ответ
public Mono<String> loadUserDataReactive(String userId) {
return userRepository.findById(userId)
.timeout(Duration.ofSeconds(3)) // Гарантия максимального времени ответа
.onErrorReturn("User not available"); // Всегда возвращаем результат
}
Даже лучшие системы иногда ломаются, и это нормально. Главное, чтобы поломка в одном месте не тянула за собой всё остальное.
Реактивная архитектура как раз и помогает локализовать сбои, изолировать проблемы и восстанавливаться без вмешательства человека.
public Mono<Order> processOrder(Order order) {
return inventoryService.reserveItems(order)
.transformDeferred(circuitBreaker::run) // Защита от повторяющихся сбоев
.retryWhen(Retry.backoff(3, Duration.ofSeconds(1))) // 3 попытки с растущей задержкой
.timeout(Duration.ofSeconds(30))
.onErrorResume(TimeoutException.class, e -> {
// Переход на упрощенную логику при таймауте
return processOrderWithLimitedFunctionality(order);
})
.onErrorResume(ServiceUnavailableException.class, e -> {
// Работа без резервирования товара
return processOrderWithoutReservation(order);
})
.onErrorReturn(createFallbackOrder(order));
}
Если проводить аналогию, то это как торговый центр: отключился эскалатор — включились лифты; пропало электричество — зажглось аварийное освещение. Система не останавливается — просто переходит в другой режим.
Нагрузки растут, трафик скачет, и система должна реагировать [3] на это сама, без паники. Эластичность — это способность приложения масштабироваться туда, где нужно, и не держать лишние ресурсы, когда всё спокойно.
@Bean
public Scheduler elasticScheduler() {
// Динамический пул потоков подстраивается под нагрузку
return Schedulers.newBoundedElastic(
10, // Максимальное количество потоков
1000, // Вместимость очереди задач
"elastic-pool" // Идентификатор пула
);
}
public Flux<String> processBatchReactive(List<String> items) {
return Flux.fromIterable(items)
.parallel() // Активация параллельной обработки
.runOn(Schedulers.parallel()) // Распределение по ядрам процессора
.flatMap(this::processItem) // Конкурентная обработка элементов
.sequential(); // Возврат к последовательному потоку
}
Эластичность работает на всех уровнях: Kubernetes поднимает больше подов при пике трафика, а Reactor эффективно распределяет нагрузку уже внутри каждого экземпляра.
В реактивной архитектуре компоненты не тянут друг друга за руку, а общаются через сообщения. Это снижает связанность и делает систему гибкой: один сервис может временно отвалиться, а остальные спокойно продолжат работу.
// Реактивная отправка сообщений
public class ReactiveOrderService {
private final StreamBridge streamBridge; // Мост для отправки в брокер сообщений
public Mono<Void> processOrder(Order order) {
return Mono.fromRunnable(() ->
// Отправка заказа в очередь
streamBridge.send("orders-out-0", order)
).doOnSuccess(() ->
// Логируем факт отправки (выполнится только при успешной отправке)
log.info("Order sent: {}", order.id())
);
}
}
// Реактивный обработчик сообщений
@Component
public class OrderMessageHandler {
@Bean
public Consumer<Flux<Order>> orderProcessor() {
// Создаем реактивный потребитель, который будет автоматически вызываться при поступлении сообщений
return flux -> flux
// Преобразуем поток сообщений: каждое сообщение обрабатываем асинхронно
.flatMap(order ->
// Обрабатываем заказ в отдельном потоке для параллелизации
orderService.process(order)
.subscribeOn(Schedulers.boundedElastic())
)
// Активируем поток и начинаем обработку входящих сообщений
.subscribe();
}
}
Вместо хрупкой сети прямых вызовов получается устойчивая экосистема, где компоненты общаются через чётко определённые каналы.
Эти принципы не работают по отдельности:
Message-Driven даёт основу для Elastic масштабирования;
Resilient помогает системе оставаться Responsive даже при сбоях;
а Elastic характер поддерживает устойчивость, когда нагрузка скачет.
В итоге система не просто работает: она живёт и предсказуемо себя ведёт в условиях постоянного стресса [4]. Это и есть суть реактивного подхода: не бороться с хаосом, а научиться с ним сосуществовать.
|
Приложение |
Почему подходит |
Пример |
|
Чат, мессенджер |
Тысячи сообщений в реальном времени |
Telegram |
|
Торговая платформа |
Мгновенные обновления цен |
Биржевые терминалы |
|
Стриминговый сервис |
Потоковая передача видео |
Netflix |
|
Игровой сервис |
Многопользовательские игры онлайн |
Игровые серверы |
|
Мониторинг систем |
Постоянный поток метрик |
Grafana, дашборды |
|
Приложение |
Почему подходит |
Пример |
|
Интернет-магазин |
Синхронные транзакции с гарантированной согласованностью данных |
Amazon, OZON |
|
Банковское приложение |
Сложные транзакции со строгой согласованностью |
Мобильный банк |
|
Корпоративный портал |
Документооборот, CRM |
1С |
|
Аналитические отчёты |
Сложные расчёты, статистика |
Excel |
масштабирование: >10,000 пользователей онлайн;
реальное время: данные обновляются каждую секунду;
потоковые данные: видео, аудио, события IoT;
позволяет эффективнее использовать серверные ресурсы и масштабироваться при высокой нагрузке.
простота: команда из 1-3 разработчиков;
сложная логика [5]: много вычислений и проверок;
стандартный CRUD: формы, таблицы, отчёты;
сжатые сроки: нужно быстро выпустить MVP.
Когда использовать MONO (один результат).
public class MonoUseCases {
// поиск по ID - всегда один объект или null
Mono<User> findUserById(String userId) {
return userRepository.findById(userId);
}
// создание ресурса - возвращаем созданный объект
Mono<Order> createOrder(OrderRequest request) {
return orderRepository.save(request.toOrder());
}
// аутентификация - возвращаем токен или ошибку
Mono<AuthToken> login(String email, String password) {
return authService.authenticate(email, password);
}
// валидация - успех или ошибка
Mono<Void> validateEmail(String email) {
return emailValidator.isValid(email)
? Mono.empty()
: Mono.error(new InvalidEmailException());
}
}
Когда использовать FLUX (поток результатов).
public class FluxUseCases {
// список элементов - много объектов
Flux<Product> getAllProducts() {
return productRepository.findAll();
}
// поиск с фильтрацией - несколько результатов
Flux<User> findUsersByCity(String city) {
return userRepository.findByCity(city);
}
// реальное время - поток событий
@GetMapping(value = "/notifications",
produces = MediaType.TEXT_EVENT_STREAM_VALUE)
Flux<Notification> streamNotifications(String userId) {
return notificationService.getUserNotifications(userId)
.doOnCancel(() -> log.info("Client disconnected")); // Обработка отключения
}
// обработка файлов - построчное чтение
Flux<String> readLargeFile(FilePart file) {
return file.content()
.map(buffer -> buffer.toString(StandardCharsets.UTF_8))
.filter(line -> !line.trim().isEmpty()); //Фильтрация пустых строк
}
// IoT данные - непрерывный поток с датчиков
Flux<SensorData> streamSensorData(String deviceId) {
return sensorService.subscribeToDevice(deviceId)
.sample(Duration.ofSeconds(1)) // Дросселирование - 1 значение в секунду
.onBackpressureLatest() // Только последнее значение при перегрузке
.doOnSubscribe(sub -> log.info("Subscribed to device: {}", deviceId))
.doOnComplete(() -> log.info("Device {} stream completed", deviceId));
}
// аудит и логи - поток событий системы
Flux<AuditEvent> getAuditLog(LocalDateTime from, LocalDateTime to) {
return auditRepository.findByTimestampBetween(from, to)
.sort(Comparator.comparing(AuditEvent::getTimestamp));
}
}
Традиционный подход (проблемы).
// блокирующий подход
@GetMapping("/users/{id}")
public User findUserById(String userId) {
// Каждый запрос занимает один поток на всё время выполнения
User user = userRepository.findById(userId); // Поток БЛОКИРОВАН на 200ms
return user;
// Поток освобождается только после полного выполнения
}
// ПРИ 1000 одновременных запросов:
// 1000 потоков × 200ms = 200 секунд блокировки!
// Сервер "захлёбывается" - кончаются потоки
Реактивный подход (решение).
// неблокирующий подход
@GetMapping("/users/{id}")
public Mono<User> findUserById(String userId) {
return userRepository.findById(userId);
// Поток НЕ блокируется - сразу освобождается!
// Запрос "подписывается" на результат и ждёт его асинхронно
}
// ПРИ 1000 одновременных запросов:
// 1 поток может обработать 10000+ таких запросов!
// Сервер использует ресурсы эффективно
Традиционный подход (проблемы).
// Вся коллекция загружается в память сразу
@GetMapping("/products")
public List<Product> getAllProducts() {
// Все продукты загружаются в память одновременно
List<Product> products = productRepository.findAll();
// Блокируем поток до полной загрузки всех данных
return products;
// При 1,000,000 товаров: 1GB памяти + блокировка на 2+ секунды
}
// Данные стримятся постепенно
@GetMapping(value = "/products", produces = MediaType.APPLICATION_NDJSON_VALUE)
public Flux<Product> getAllProductsReactive() {
return productRepository.findAll();
// Данные отправляются по мере готовности
// Поток освобождается сразу, клиент получает данные постепенно
// При 1,000,000 товаров: 1MB буфер + неблокирующая обработка
}
На практике новички часто совершают ошибки [6], которые сводят все преимущества на нет. Далее опишу три самых частых случая, когда код вроде бы «реактивный», но фактически работает синхронно, блокирует потоки или теряет данные.
1. Использование .block() и .subscribe() не там, где нужно.
Вызовы .block() или .subscribe() ломают асинхронность, если использовать их в контроллерах или сервисах.
@GetMapping("/users/{id}")
public User getUser(String id) {
// Ошибка: блокируем поток до получения результата
return userService.findById(id).block();
}
Что не так:
.block() заставляет поток ждать результат: теряется вся неблокирующая модель;
под нагрузкой сервер быстро «захлебывается», ведь каждый запрос теперь «застрял».
Как правильно:
фреймворк WebFlux сам подписывается на поток. Нужно просто вернуть Mono или Flux.
@GetMapping("/users/{id}")
public Mono<User> getUser(String id) {
// Реактивно: поток сразу освобождается
return userService.findById(id);
}
.subscribe() — не блокирует поток, но запускает конвейер. Его нельзя вызывать внутри контроллеров и сервисов — только на границах системы, где реактивность нужно «включить» вручную. Например, при интеграции с брокером сообщений, RSocket или планировщиками задач (@Scheduled).
Ошибка: запускаем поток внутри контроллера.
@GetMapping("/orders")
public void process() {
orderService.getOrders()
.subscribe(order -> log.info("Order: {}", order));
}
Хорошо:
@Bean
public Consumer<Flux<Order>> orderProcessor() {
// Здесь subscribe() нужен, чтобы запустить поток входящих сообщений
return flux -> flux
.flatMap(order ->
orderService.process(order)
.subscribeOn(Schedulers.boundedElastic())
)
.subscribe();
}
В контроллерах и сервисах — никаких .subscribe() или .block().
На границах системы (Kafka, RSocket) — можно и нужно.
2. Блокирующие вызовы внутри реактивных потоков.
Иногда разработчик вроде бы пишет на Flux и Mono, но внутри всё равно вызывает блокирующий код — базы, API, файловые операции.
public Flux<User> findAllUsers() {
return Flux.fromIterable(userRepository.findAll()); // блокирующий JPA вызов
}
Такой код формально реактивный, но по факту «тормозной». Поток Reactor ожидает завершения findAll(), пока другие операции простаивают.
Если блокирующий вызов избежать нельзя, нужно вынести его на отдельный пул потоков с помощью Schedulers.boundedElastic():
public Flux<User> findAllUsers() {
return Mono.fromCallable(() -> userRepository.findAll()) // Безопасный вызов
.subscribeOn(Schedulers.boundedElastic()) // Вынос в отдельный пул потоков
.flatMapMany(Flux::fromIterable); // Преобразование в поток
}
boundedElastic — это динамический пул потоков с ограничением по количеству активных задач, оптимальный для кратковременных блокирующих операций (I/O, файловые операции, JDBC). Но он не предназначен для тяжёлых вычислений: для этого лучше использовать Schedulers.parallel().
Проверяйте библиотеки, если они не поддерживают Reactive Streams (например, JPA или старые HTTP-клиенты), не вызывайте их напрямую в реактивных цепочках.
3. Потеря управления backpressure (перегрузка потока).
Многие новички даже не подозревают о существовании backpressure — механизма контроля скорости потока. Без него реактивное приложение может «утонуть» в собственных данных: производитель шлёт миллионы событий, а потребитель не успевает обрабатывать.
Flux.interval(Duration.ofMillis(1))
.map(this::process)
.subscribe(); // Поток быстро переполнится
На первый взгляд, Flux.interval() безопасен, но, если внутри цепочки тяжёлая обработка (например, flatMap без ограничения), поток событий может накапливаться быстрее, чем обрабатываться. Отсюда и необходимость onBackpressure.
Используйте встроенные стратегии Reactor:
.onBackpressureBuffer() — временно хранить элементы в буфере,
.onBackpressureDrop() — отбрасывать лишние,
.onBackpressureLatest() — оставлять только последние.
Flux.interval(Duration.ofMillis(1))
.onBackpressureLatest() // ограничиваем поток
.flatMap(this::process)
.subscribe();
Backpressure — это как предохранитель на конвейере. Если рабочий не успевает, система притормаживает подачу деталей, а не засыпает его тысячами.
Посмотрим на реальном примере одного метода получения заказов клиента, как будет выглядеть код, если мы перейдём от традиционного подхода к реактивному.
Традиционный код:
Такой код работает, но блокирует потоки: каждый запрос ждёт завершения обращения к БД. Под нагрузкой это быстро становится узким местом.
@RestController
@RequiredArgsConstructor
@Slf4j
public class OrderController {
private final ClientOrderFacade clientOrderFacade;
@GetMapping
public List<Order> getAllOrders() {
return clientOrderFacade.getAllOrders();
}
}
@Component
@RequiredArgsConstructor
@Slf4j
public class ClientOrderFacade {
private final OrderService orderService;
private final PersonService personService;
@Transactional
public List<Order> getAllOrders() {
Person person = personService.findByUser(UserContextHolder.getUser()).orElseThrow(() -> new RuntimeException("User not found"));
return orderService.findAllByPerson(person)
.stream()
.sorted(Comparator.comparing(Order::getCreated).reversed())
.toList();
}
}
@Service
@RequiredArgsConstructor
@Slf4j
public class OrderServiceImpl implements OrderService {
private final OrderRepository orderRepository;
@Override
public Set<Order> findAllByPerson(Person person) {
return orderRepository.findAllByPerson(person);
}
}
@Repository
public interface OrderRepository extends JpaRepository<Order, Long> {
Set<Order> findAllByPerson(Person person);
}
Реактивная версия (Spring WebFlux + Reactor):
В pom.xml нужно добавить:
<!-- Spring WebFlux -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
<!-- Для реактивного доступа к БД -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-r2dbc</artifactId>
</dependency>
<!-- Драйвер для PostgreSQL -->
<dependency>
<groupId>io.r2dbc</groupId>
<artifactId>r2dbc-postgresql</artifactId>
</dependency>
@RestController
@RequiredArgsConstructor
@Slf4j
public class OrderController {
private final ClientOrderFacade clientOrderFacade;
@GetMapping
public Flux<Order> getAllOrders() {
log.info("Поиск заказов пользователя {}", UserContextHolder.getUser().getLogin());
return clientOrderFacade.getAllOrders();
}
}
List<Order> заменён на Flux<Order>. Контроллер теперь возвращает поток данных, который WebFlux отдаёт клиенту по мере готовности — без блокировки.
@Component
@RequiredArgsConstructor
@Slf4j
public class ClientOrderFacade {
private final OrderService orderService;
private final PersonService personService;
public Flux<Order> getAllOrders() {
return personService.findByUser(UserContextHolder.getUser())
.switchIfEmpty(Mono.error(new RuntimeException("User not found")))
.flatMapMany(person -> orderService.findAllByPerson(person.getId()));
}
}
@Service
@RequiredArgsConstructor
@Slf4j
public class OrderService {
private final OrderRepository orderRepository;
public Flux<Order> findAllByPerson(Long personId) {
return orderRepository.findAllByPersonId(personId);
}
}
@Repository
public interface OrderRepository extends R2dbcRepository<Order, Long> {
@Query("SELECT * FROM orders WHERE person_id = :personId ORDER BY created DESC")
Flux<Order> findAllByPersonId(Long personId);
}
Реактивное программирование — это не модный тренд, а естественная эволюция [7] архитектур, где важны отзывчивость, устойчивость и масштабируемость.
Оно не требует переписывать всё с нуля: начните с одного контроллера, одного реактивного потока, одного Flux. Постепенно вы почувствуете, как приложение становится живым: не ждёт, не блокируется, а реагирует.
И в этот момент вы поймёте, что система действительно работает вместе с вами, а не против вас.
Автор: bae_prosto
Источник [8]
Сайт-источник BrainTools: https://www.braintools.ru
Путь до страницы источника: https://www.braintools.ru/article/21989
URLs in this post:
[1] памяти: http://www.braintools.ru/article/4140
[2] стресс: http://www.braintools.ru/article/9548
[3] реагировать: http://www.braintools.ru/article/1549
[4] стресса: http://www.braintools.ru/article/9041
[5] логика: http://www.braintools.ru/article/7640
[6] ошибки: http://www.braintools.ru/article/4192
[7] эволюция: http://www.braintools.ru/article/7702
[8] Источник: https://habr.com/ru/articles/966502/?utm_source=habrahabr&utm_medium=rss&utm_campaign=966502
Нажмите здесь для печати.