Что такое реактивность в программировании
Перейти к содержимому

Что такое реактивность в программировании

  • автор:

ПРИМЕНЕНИЕ РЕАКТИВНОГО ПРОГРАММИРОВАНИЯ И МОДЕЛИ КОМПЛЕКСНОЙ ОБРАБОТКИ СОБЫТИЙ Текст научной статьи по специальности «Компьютерные и информационные науки»

Аннотация научной статьи по компьютерным и информационным наукам, автор научной работы — Алпатов Алексей Николаевич, Сороков Артём Сергеевич

Реактивное программирование становится все более актуальным в современном программировании. Это связано с тем, что сейчас все больше приложений работают в режиме реального времени и требуют быстрого и эффективного обработки данных. Данный подход предоставляет инструменты для создания таких приложений, которые могут быстро реагировать на изменения внешних условий и обрабатывать большие объемы данных. Статья рассматривает принципы и концепции реактивного программирования и модели комплексной обработки событий, а также примеры их применения. Мы также обсудим преимущества использования этих подходов вместе.

i Надоели баннеры? Вы всегда можете отключить рекламу.

Похожие темы научных работ по компьютерным и информационным наукам , автор научной работы — Алпатов Алексей Николаевич, Сороков Артём Сергеевич

РЕАЛИЗАЦИЯ ВЕБ СЕРВИСА С ПРИМЕНЕНИЕМ ПАРАДИГМЫ РЕАКТИВНОГО ПРОГРАММИРОВАНИЯ
Система выполнения моделей машинного обучения на потоке событий
Обзор состояния области потоковой обработки данных
Использование реактивного программирования при разработке мобильных приложений

Использование технологии программирования Command Query Responsibility Segregation (cqrs) для распределенных информационных сервисов

i Не можете найти то, что вам нужно? Попробуйте сервис подбора литературы.
i Надоели баннеры? Вы всегда можете отключить рекламу.

APPLICATION OF REACTIVE PROGRAMMING AND COMPLEX EVENT PROCESSING MODELS

Reactive programming is becoming more and more relevant in modern programming. This is due to the fact that now more and more applications work in real time and require fast and efficient data processing. This approach provides the tools to create such applications that can quickly respond to changes in external conditions and process large amounts of data. The article considers the principles and concepts of reactive programming and complex event processing models, as well as examples of their application. We will also discuss the benefits of using these approaches together.

Текст научной работы на тему «ПРИМЕНЕНИЕ РЕАКТИВНОГО ПРОГРАММИРОВАНИЯ И МОДЕЛИ КОМПЛЕКСНОЙ ОБРАБОТКИ СОБЫТИЙ»

Столыпинский вестник №5/2023

Научная статья Original article УДК 004.4

DOI 10.55186/27131424 2023 5 5 7

ПРИМЕНЕНИЕ РЕАКТИВНОГО ПРОГРАММИРОВАНИЯ И МОДЕЛИ КОМПЛЕКСНОЙ ОБРАБОТКИ СОБЫТИЙ

APPLICATION OF REACTIVE PROGRAMMING AND COMPLEX EVENT PROCESSING MODELS

Алпатов Алексей Николаевич, кандидат технических наук, доцент кафедры «Инструментального и прикладного программного обеспечения», РТУ МИРЭА, Россия, г. Москва

Сороков Артём Сергеевич, студент, 4 курс, институт информационных технологий, РТУ МИРЭА, Россия, г. Москва

Alpatov Alexey Nikolaevich, Candidate of Technical Sciences, Associate Professor of the Department of «Instrumental and Applied Software», RTU MIREA, Russia, Moscow, e-mail: alpatov@mirea.ru

Sorokov Artem Sergeevich, student 4th year, Institute of Information Technology, RTU MIREA, Russia, Moscow, e-mail: sorokov_a02@mail.ru

Реактивное программирование становится все более актуальным в современном программировании. Это связано с тем, что сейчас все больше

приложений работают в режиме реального времени и требуют быстрого и эффективного обработки данных. Данный подход предоставляет инструменты для создания таких приложений, которые могут быстро реагировать на изменения внешних условий и обрабатывать большие объемы данных.

Статья рассматривает принципы и концепции реактивного программирования и модели комплексной обработки событий, а также примеры их применения. Мы также обсудим преимущества использования этих подходов вместе.

Reactive programming is becoming more and more relevant in modern programming. This is due to the fact that now more and more applications work in real time and require fast and efficient data processing. This approach provides the tools to create such applications that can quickly respond to changes in external conditions and process large amounts of data.

The article considers the principles and concepts of reactive programming and complex event processing models, as well as examples of their application. We will also discuss the benefits of using these approaches together.

Ключевые слова: Реактивное программирование, комплексная обработка событий, Java.

Keywords: Reactive programming, complex event processing, Java.

В современном мире программное обеспечение стало неотъемлемой частью нашей жизни. Однако, с увеличением сложности приложений и возрастанием числа пользователей, стандартные подходы к программированию становятся недостаточно эффективными. В такой ситуации реактивное программирование и модель комплексной обработки событий могут стать решением проблемы. Также такой подход используется для разработки приложений, которые реагируют на изменения внешних

условий и событий, а не просто выполняют последовательность заданных действий.

Реактивное программирование — это подход к программированию, который использует асинхронные потоки данных и событий для создания более отзывчивых приложений. Основные принципы реактивного программирования включают в себя: реактивность, асинхронность, отзывчивость, управление ошибками и распределенность.

Реактивные приложения обычно состоят из компонентов, которые могут быть связаны друг с другом через потоки данных. Эти потоки данных могут быть как однонаправленными, так и двунаправленными. Потоки данных могут также быть преобразованы, комбинированы и агрегированы с помощью функциональных операций, таких как отображение, фильтрация, свертка и т.д.

Существует множество инструментов и библиотек, которые поддерживают реактивное программирование. Некоторые из них включают в себя ReactiveX, Akka, RxJava, Project Reactor и другие. Эти инструменты предоставляют различные способы создания и управления потоками данных, а также реализацию других принципов реактивного программирования.

Примером применения реактивного программирования может быть система мониторинга, которая собирает и анализирует данные из разных источников в реальном времени. В такой системе, данные могут поступать из различных источников, таких как датчики, логи и базы данных. Реактивное программирование может быть использовано для эффективной обработки потоков данных, фильтрации, трансформации и агрегации данных, а также для мгновенной обработки ошибок и исключений.

Модель комплексной обработки событий (Complex Event Processing, CEP) — это методология обработки и анализа потоков событий с целью выявления сложных паттернов и событий, которые не могут быть обнаружены с помощью простых правил и условий. CEP используется в различных

областях, таких как финансы, здравоохранение, телекоммуникации, транспорт и другие.

Основная идея CEP заключается в том, чтобы обработать и анализировать потоки событий в реальном времени с использованием сложных алгоритмов и правил, которые позволяют выявлять скрытые связи и зависимости между событиями. Например, CEP может быть использован для мониторинга финансовых рынков и выявления аномалий или для определения паттернов в здоровье пациентов на основе их медицинских данных.

Существует множество инструментов и библиотек, которые поддерживают модель комплексной обработки событий. Некоторые из них включают в себя Apache Flink, Esper, StreamBase и другие.

Реактивное программирование и модель комплексной обработки событий могут быть использованы вместе для создания масштабируемых, отказоустойчивых и высокопроизводительных систем обработки данных в реальном времени. Эти подходы могут быть комбинированы для обработки сложных потоков данных, выявления скрытых паттернов и аномалий, а также для эффективной обработки ошибок и исключений.

Например, система мониторинга финансовых рынков может использовать реактивное программирование для управления и обработки потоков данных, а модель комплексной обработки событий может быть использована для анализа и принятия решений на основе этих данных. Комбинируя эти подходы, можно создать систему, которая быстро и эффективно анализирует большие объемы данных в реальном времени, выявляет изменения и аномалии на рынке и предлагает соответствующие действия.

Другим примером может быть система управления транспортным потоком, где реактивное программирование используется для управления потоками данных о движении транспорта, а модель комплексной обработки событий используется для принятия решений на основе этих данных,

например, для определения оптимального маршрута для транспорта в реальном времени.

Реактивное программирование и модель комплексной обработки событий являются эффективными подходами для создания масштабируемых, отказоустойчивых и высокопроизводительных систем обработки данных в реальном времени. Комбинируя эти подходы, можно создать систему, которая быстро и эффективно обрабатывает большие объемы данных, выявляет аномалии и принимает соответствующие действия. В будущем, эти подходы будут продолжать развиваться и улучшаться, что позволит создавать еще более мощные системы обработки данных.

Однако, необходимо учитывать, что реактивное программирование и модель комплексной обработки событий не всегда являются оптимальными решениями для всех задач обработки данных. Некоторые задачи могут быть решены более эффективно с использованием других подходов и инструментов.

Тем не менее, с ростом количества данных, которые необходимо обрабатывать в реальном времени, реактивное программирование и модель комплексной обработки событий становятся все более важными и необходимыми для создания эффективных систем обработки данных. Их использование позволяет создавать системы, которые обеспечивают высокую производительность, масштабируемость и отказоустойчивость, что является ключевым требованием для многих современных приложений и систем.

Таким образом, реактивное программирование и модель комплексной обработки событий представляют собой мощные инструменты для обработки данных в реальном времени и будут продолжать использоваться в будущем для создания высокопроизводительных и эффективных систем обработки данных.

Использование этих подходов может помочь разработчикам создавать системы, которые могут быстро и эффективно обрабатывать большие объемы

данных, выявлять аномалии и принимать соответствующие действия. Это особенно важно в условиях, когда время реакции на события является критически важным, например, в системах финансового мониторинга, телекоммуникационных сетях, системах безопасности и других подобных системах.

Будущее этих подходов обещает быть светлым, так как растущее количество данных и требования к обработке данных в реальном времени будут продолжать стимулировать развитие и усовершенствование этих подходов. Ожидается, что в будущем, эти подходы будут еще более широко применяться во многих отраслях, включая финансы, здравоохранение, транспорт и другие, что приведет к созданию новых возможностей для развития и инноваций.

Реактивное программирование и модель комплексной обработки событий представляют собой мощные инструменты для обработки данных в реальном времени. Результаты их использования уже показали свою эффективность в различных отраслях, включая финансы, здравоохранение, транспорт и другие.

Одним из главных достоинств этих подходов является их способность обрабатывать большие объемы данных и выявлять скрытые паттерны и аномалии в потоках данных. Это позволяет создавать системы, которые быстро и эффективно обрабатывают данные и принимают соответствующие действия.

Однако, применение реактивного программирования и модели комплексной обработки событий требует от разработчиков некоторой экспертизы и умения мыслить реактивно. Некоторые инструменты и библиотеки могут также быть достаточно сложными для использования, что может привести к трудностям при их внедрении.

Тем не менее, будущее этих подходов обещает быть светлым, так как растущее количество данных и требования к обработке данных в реальном

времени будут продолжать стимулировать развитие и усовершенствование этих подходов. Ожидается, что в будущем, эти подходы будут еще более широко применяться во многих отраслях, что приведет к созданию новых возможностей для развития и инноваций.

Таким образом, реактивное программирование и модель комплексной обработки событий являются перспективными подходами для обработки данных в реальном времени, и их применение будет продолжать расти и развиваться в будущем

1. Введение в реактивное программирование. [Электронный ресурс]. URL: https://habr.com/ru/companies/arcadia/articles/432004/ (дата обращения: 07.04.2023).

2. Реактивное программирование на Java: как, зачем и стоит ли? Часть I [Электронный ресурс]. https://habr.com/ru/companies/oleg-bunin/articles/543386/ (дата обращения: 10.04.2023).

3. Реактивное программирование на Java: как, зачем и стоит ли? Часть II [Электронный ресурс]. https://habr.com/ru/companies/oleg-bunin/articles/545702/ ls.html (дата обращения: 17.04.2023).

4. Обработка сложных событий. [Электронный ресурс]. URL: https://ru.wikibrief.org/wiki/Complex_event_processing (дата обращения: 20.04.2023).

5. Поиск событийных цепочек в реальном времени с CEP-библиотекой Apache Flink. [Электронный ресурс]. URL: https://bigdataschool.ru/blog/cep-library-flink-for-event-streaming-analytics.html (дата обращения: 07.05.2023).

6. Комплексная обработка событий — взгляд начинающего. [Электронный ресурс]. URL: https://coderlessons.com/articles/programmirovanie/kompleksnaia-obrabotka-sobytii-vzgliad-nachinaiushchego (дата обращения: 10.05.2023).

1. Introduction to reactive programming. [Electronic resource]. URL: https://habr.com/ru/companies/arcadia/articles/432004/ (date of access: 07.04.2023)

2. Reactive programming in Java: how, why and is it worth it? Part I [Electronic resource]. https://habr.com/ru/companies/oleg-bunin/articles/543386/ (date of access: 10.04.2023).

3. Reactive programming in Java: how, why and is it worth it? Part II [Electronic resource]. https://habr.com/ru/companies/oleg-bunin/articles/545702/ls.html (accessed 17.04.2023).

4. Handling complex events. [Electronic resource]. URL: https://ru.wikibrief.org/wiki/Complex_event_processing (accessed 20.04.2023).

5. Search for real-time event chains with the Apache Flink CEP library. [Electronic resource]. URL: https://bigdataschool.ru/blog/cep-library-flink-for-event-streaming-analytics.html (date of access: 07.05.2023).

6. Complex event processing — a beginner’s view. [Electronic resource]. URL: https://coderlessons.com/articles/programmirovanie/kompleksnaia-obrabotka-sobytii-vzgliad-nachinaiushchego (accessed 10.05.2023).

© Сороков А.С., 2023 Научный сетевой журнал «Столыпинский вестник» №5/2023

Для цитирования: Сороков А.С. ПРИМЕНЕНИЕ РЕАКТИВНОГО ПРОГРАММИРОВАНИЯ И МОДЕЛИ КОМПЛЕКСНОЙ ОБРАБОТКИ СОБЫТИЙ// Научный сетевой журнал «Столыпинский вестник» №5/2023

Реактивное программирование на реальных примерах: подробное введение

Обложка поста Реактивное программирование на реальных примерах: подробное введение

Обучение реактивному подходу в программировании — достаточно непростая вещь, и недостаток обучающих материалов только усугубляет этот процесс. Большинство существующих обучающих пособий не дают глубокого обзора и не рассказывают о том, как спроектировать архитектуру проекта в целом.

Этот материал направлен на то, чтобы помочь новичкам начать думать по-настоящему “реактивно”.

Так что же такое реактивное программирование?

Есть множество не до конца верных определений и объяснений в интернете. Википедия дает слишком скупое описание. Ответы на Stack Overflow часто непонятны новичкам. Реактивный Манифест выглядит так, будто его писали для руководителей проектов или бизнесменов. Rx терминология от Microsoft, гласящая о том, что “Rx = Observables + LINQ + Schedulers”, звучит настолько тяжело и по-майкрософтовски, что большинство из нас слабо понимает, о чем идёт речь. Такие термины, как “реактивность” и “распространение изменений” не выражают ничего, что бы отличалось от обычного MV* подхода, реализованного уже на бесчисленном множестве языков. Любой фреймворк реагирует на изменения моделей. В любом фреймворке изменения распространяются. Если бы это было не так, пользователь не видел бы никаких изменений.

Дадим подробное объяснение термину “реактивное программирование”.

Реактивное программирование — программирование с асинхронными потоками данных

Впрочем, ничего нового. Event bus’ы или обычные события клика — это тоже асинхронные потоки данных, которые вы можете прослушивать, чтобы реагировать какими-либо действиями. Реактивность — это та же самая идея, возведенная в абсолют. Вы можете создавать потоки данных не только из событий наведения или кликания мышью. Потоком может быть что угодно: переменные, пользовательский ввод, свойства, кэш, структуры данных и т.п. Например, представьте, что ваша лента новостей в Твиттере — поток событий. Вы можете слушать этот поток и реагировать на события соответственно.

Кроме этого, вы получаете удивительный набор функций для комбинирования, создания и фильтрации этих потоков. Вот где проявляется вся магия этого подхода. Один или несколько потоков могут использоваться как входные данные для другого потока. Вы можете объединять два потока. Также вы можете фильтровать поток, выбирая только те события, которые вам интересны.

Так как потоки — основопологающая вещь в реактивном подходе, давайте рассмотрим их подробнее на примере пользовательского клика мышью:

Реактивное программирование на реальных примерах: подробное введение 1

Поток — это последовательность событий, упорядоченная по времени. Он может выбрасывать три типа данных: значение (определенного типа), ошибку или сигнал завершения. Сигнал завершения распространяется, когда текущее окно или окно, содержащее кнопку, закрывается.

Мы перехватываем эти события асинхронно, указывая одну функцию, которая будет вызываться, когда выброшено значение, другую для ошибок и третью для обработки сигнала завершения. В некоторых случаях можно опустить последние две и сфокусироваться на объявлении функции для перехвата значений. Прослушивание потока называется подпиской (subscribing). Функции, которые мы объявляем, называются наблюдателями (observer). Поток — это объект наших наблюдений (observable, наблюдаемый объект). Это в точности паттерн проектирования, называемый “Наблюдатель“. Подробнее о шаблонах проектирования для новичков, читайте в нашей статье.

В данном руководстве мы будем использовать альтернативный способ представления вышеупомянутой диаграммы с помощью ASCII символов:

--a---b-c---d---X---|-> a, b, c, d - генерируемые значения X - ошибка | - сигнал завершения ---> - временная ось 

Теперь давайте сгенерируем новые потоки сообщений клика, трансформированные из оригинального потока.

Для начала сделаем поток счетчиков, который определяет, сколько раз кнопка была нажата. В большинстве реактивных библиотек у каждого потока есть множество встроенных функций, таких как map, filter, scan и т.д. Когда вы вызываете одну из этих функций, например clickStream.map(f) , она возвращает новый поток, основанный на родительском. Функции не модифицируют родительский поток. Это называется неизменяемостью и является неотъемлемой частью реактивного подхода, позволяя нам вызывать цепочку функций, например clickStream.map(f).scan(g) :

clickStream: ---c----c--c----c------c--> vvv map(c становится 1) vvv ---1----1--1----1------1--> vvvvvvvvv scan(+) vvvvvvvvv counterStream: ---1----2--3----4------5--> 

Функция map(f) заменяет каждое полученное значение в соответствии с вашей реализацией функции f . В нашем случае функция map производит значение “1” после каждого клика. Функция scan(g) аггрегирует все предыдущие значения, производя значение x = g(accumulated, current) , где g в данном случае — это просто функция сложения. В конечном итоге counterStream выбрасывает общее количество кликов.

Чтобы вы поняли всю мощь реактивного подхода, давайте предположим, что вы хотите реализовать поток событий “двойной клик”. Чтобы сделать эту задачу еще интереснее новый поток должен принимать множественные нажатия за двойные. Представьте себе, как бы вы реализовывали эту задачу в императивном стиле. Потребовалось бы несколько переменных, хранящих состояние, и использование интервалов.

В реактивном же подходе все достаточно просто. Всю описанную выше логику можно реализовать четырьмя строками кода. Но давайте пока не останавливаться на коде. Лучший способ научиться понимать и проектировать потоки, вне зависимости от того, новичок вы или эксперт — это изображать диаграммы:

Реактивное программирование на реальных примерах: подробное введение 2

Серые прямоугольники — это функции, которые трансформируют один поток в другой. Сначала мы собираем клики в списки. Если прошло 250 миллисекунд без единого нажатия кнопки — мы применяем функцию map() на каждом из списков, чтобы вычислить его длину. В конце мы фильтруем списки с длиной 1, используя функцию filter(x >= 2) . Вот так, в три действия, мы получаем результат — поток событий множественных кликов. Мы можем подписаться на него и использовать, как пожелаем.

Этот пример показывает всю простоту, с которой реализовывается достаточно сложная на первый взгляд задача, если мы используем реактивный подход.

Для чего нужно реактивное программирование

Реактивный подход повышает уровень абстракции вашего кода и вы можете сконцентрироваться на взаимосвязи событий, которые определяют бизнес-логику, вместо того, чтобы постоянно поддерживать код с большим количеством деталей реализации. Код в реактивном программировании, вероятно, будет короче.

Преимущество более заметно в современных веб- и мобильных приложениях, которые работают с большим количеством разнообразных UI-событий. 10 лет назад всё взаимодействие с веб-страницей сводилось к отправке больших форм на сервер и выполнении простого рендеринга в клиентской части. Сейчас приложения более сложны: изменение одного поля может повлечь за собой автоматическое сохранение данных на сервере, информация о новом “лайке” должна отправиться другим подключенным пользователям и т.д.

Реактивное программирование очень хорошо подходит для обработки большого количества разнообразных событий.

Начинаем думать в реактивном стиле

В последующих примерах используется JavaScript и RxJS, но Rx-библиотеки доступны для многих других языков и платформ (.NET, Java, Scala, Clojure, JavaScript, Ruby, Python, C++, Objective-C/Cocoa, Groovy, и т.д.). На нашем сайте есть руководства по использованию библиотек RxSwift и ReactiveX в Python.

Реализуем виджет “На кого подписаться”

В Twitter есть такой виджет, который предлагает вам другие аккаунты, на которые вы можете подписаться:

Реактивное программирование на реальных примерах: подробное введение 3

Мы намерены реализовать его основную функциональность:

  • Загрузка из API и вывод трех аккаунтов;
  • По клику кнопки “Обновить” вывод других трех аккаунтов;
  • По клику кнопки “x” рядом с аккаунтом — удаление его из виджета и вывод другого аккаунта;
  • Отображение аватарки и ссылки на аккаунт в каждой из трех строк.

Вместо Twitter-аккаунтов, которые закрыты для неавторизованных пользователей, мы будем использовать Github API и брать аккаунты оттуда. Ссылку на Github API для получения списка пользователей вы можете найти в официальной документации. Также можете смотреть на готовый код данного примера.

Запрос и ответ

Как подойти к решению этой проблемы в Rx-стиле? Надо начать с того, что (почти) все, что угодно может быть потоком. Первое, что мы реализуем, будет “Загрузка из API и вывод трех аккаунтов”. Ничего необычного, нужно просто (1) сделать запрос, (2) получить ответ, (3) отобразить ответ. Представим запрос в качестве потока.

При инициализации мы должны сделать только один запрос, так что если мы смоделируем его, как поток данных, он будет выбрасывать только одно значение. В дальнейшем мы будем делать множество запросов, но на данный момент нам нужен только один.

--a------|-> Где а - это строка 'https://api.github.com/users' 

Когда происходит запрос, он сообщает нам две вещи: когда запрос должен быть выполнен — время генерации события, и куда мы делаем запрос — значение генерируемого событие, строка, содержащая URL.

Создать поток, содержащий одно значение, очень просто с библиотеками семейства Rx*:

var requestStream = Rx.Observable.just('https://api.github.com/users'); 

То, что мы написали — это просто поток, содержащий строку, который не делает ничего, так что мы должны как-то заставить его действовать так, как нам нужно. Это делается с помощью подписки на поток:

requestStream.subscribe(function(requestUrl) < // выполняем запрос jQuery.getJSON(requestUrl, function(responseData) < // . >); > 

Заметьте, что мы используем Ajax-коллбэк (callback) из библиотеки jQuery, чтобы управлять асинхронностью операции запроса. Если вы слабо понимаете, что такое callback’и, почитайте нашу статью об эволюции асинхронного программирования в JS. “Но подождите, Rx же работает с асинхронными потоками данных. Не может ли ответ на запрос быть потоком, содержащим данные, которые придут когда-нибудь позже?” — можете спросить вы. Что ж, на концептуальном уровне все верно, давайте попробуем это реализовать:

requestStream.subscribe(function(requestUrl) < // выполняем запрос var responseStream = Rx.Observable.create(function (observer) < jQuery.getJSON(requestUrl) .done(function(response) < observer.onNext(response); >) .fail(function(jqXHR, status, error) < observer.onError(error); >) .always(function() < observer.onCompleted(); >); >); responseStream.subscribe(function(response) < // делаем что-то с ответом >); > 

Rx.Observable.create() создает пользовательский поток данных, информируя каждого подписчика о событиях ( onNext() ) или ошибках ( onError() ). Мы обернули Ajax-промис в соответствующий коллбэк. Значит ли это, что Promise — то же самое, что Observable? Да, значит. Подробнее о Promis’ах читайте в нашей вводной статье.

Observable — это Promise++. В Rx вы можете конвертировать Promise в Observable очень простым образом:

var stream = Rx.Observable.fromPromise(promise); 

Единственное отличие между Promise и Observable в том, что Observable не совместим с Promises/A+. Promise — это, по сути, Observable с одним генерируемым значением. Потоки в Rx расширяют промисы, позволяя возвращать множество значений.

Возвращаясь к нашему примеру: вы можете заметить, что мы вызываем функцию subscribe() два раза — один внутри другого. Также создание responseStream зависит от requestStream . Как было сказано выше, в Rx есть простые механизмы, позволяющие трансформировать и создавать новые потоки из других, так что мы должны этим воспользоваться.

Одна из таких функций, с которой вы уже познакомились — map(f) — берет каждое значение из потока A, применяет на нем f() и производит значение для потока B. Если мы применим эту функцию на потоке запроса и ответа, мы можем преобразовать список URL’ов в промисы ответа.

var responseMetastream = requestStream .map(function(requestUrl) < return Rx.Observable.fromPromise(jQuery.getJSON(requestUrl)); >); 

Затем мы должны создать поток потоков, называемый так же метапоток (metastream). Не пугайтесь, все достаточно просто. Метапоток — это такой поток, в котором каждое генерируемое им значение является потоком. Можно представить их себе как указатели: каждое генерируемое значение — указатель на новый поток. В нашем примере URL каждого запроса преобразуется в указатель на поток, содержащий промис ответа.

Реактивное программирование на реальных примерах: подробное введение 4

“Но зачем же нам обернутые в потоки ответы?” — можете спросить вы. В данном случае можно преобразовать метапоток в обычный поток ответов сервера, в котором каждое генерируемое значение является JSON-объектом, а не промисом, с помощью функции flatmap() . Но, тем не менее, метапотоки — обычное явление в реактивном программировании, и в частности, они используются для обработки асинхронных запросов.

var responseStream = requestStream .flatMap(function(requestUrl) < return Rx.Observable.fromPromise(jQuery.getJSON(requestUrl)); >); 

Реактивное программирование на реальных примерах: подробное введение 5

Как и ожидалось, если у нас впоследствии будут какие-то события, генерируемые потоком запросов, поток ответов будет реагировать на них соответствующим образом:

requestStream: --a-----b--c------------|-> responseStream: -----A--------B-----C---|-> (нижним регистром обозначен запрос, верхним - ответ) 

И теперь, когда у нас есть поток ответов, мы можем отрендерить получаемые данные:

responseStream.subscribe(function(response) < // рендерим ответ в DOM >); 

Весь код целиком:

var requestStream = Rx.Observable.just('https://api.github.com/users'); var responseStream = requestStream .flatMap(function(requestUrl) < return Rx.Observable.fromPromise(jQuery.getJSON(requestUrl)); >); responseStream.subscribe(function(response) < // рендерим ответ в DOM >); 
Кнопка обновления

Нужно отметить, что список пользователей, который мы получаем по API, состоит из 100 элементов. API позволяет нам задавать смещение списка, но не его размер, так что пока мы используем только 3 объекта, игнорируя остальные. Вы научитесь кэшировать ответ чуть позже.

Каждый раз, когда пользователь нажимает кнопку обновления, поток запросов должен сгенерировать URL, чтобы мы могли получить новые данные. Для этой задачи нам потребуется сделать две вещи: реализовать поток событий нажатия на кнопку, а также изменить поток запросов так, чтобы он реагировал на события потока нажатий. К счастью, RxJS позволяет нам преобразовать обычные JavaScript-события в Observable.

var refreshButton = document.querySelector('.refresh'); var refreshClickStream = Rx.Observable.fromEvent(refreshButton, 'click'); 

Давайте поменяем поток запросов так, чтобы при нажатии кнопки обновления генерировался URL со случайным параметром смещения списка.

var requestStream = refreshClickStream .map(function() < var randomOffset = Math.floor(Math.random()*500); return 'https://api.github.com/users?since=' + randomOffset; >); 

Похоже, мы что-то сломали. Теперь запрос срабатывает только после того, как мы нажали кнопку. Но по условию задачи мы должны делать запрос и при инициализации. Давайте попробуем починить наш код.

Для начала создадим разные потоки для вышеупомянутых условий:

var requestOnRefreshStream = refreshClickStream .map(function() < var randomOffset = Math.floor(Math.random()*500); return 'https://api.github.com/users?since=' + randomOffset; >); var startupRequestStream = Rx.Observable.just('https://api.github.com/users'); 

Но как же теперь соединить события этих двух потоков в один? В этом нам поможет функция merge() . Вот визуальное представление того, что она делает:

stream A: ---a--------e-----o-----> stream B: -----B---C-----D--------> vvvvvvvvv merge vvvvvvvvv ---a-B---C--e--D--o-----> 

Ну, теперь все очень просто:

var requestOnRefreshStream = refreshClickStream .map(function() < var randomOffset = Math.floor(Math.random()*500); return 'https://api.github.com/users?since=' + randomOffset; >); var startupRequestStream = Rx.Observable.just('https://api.github.com/users'); var requestStream = Rx.Observable.merge( requestOnRefreshStream, startupRequestStream ); 

Также есть альтернативный и более чистый способ реализовать задачу без вспомогательных переменных:

var requestStream = refreshClickStream .map(function() < var randomOffset = Math.floor(Math.random()*500); return 'https://api.github.com/users?since=' + randomOffset; >) .merge(Rx.Observable.just('https://api.github.com/users')); 

А можно и еще короче!

var requestStream = refreshClickStream .map(function() < var randomOffset = Math.floor(Math.random()*500); return 'https://api.github.com/users?since=' + randomOffset; >) .startWith('https://api.github.com/users'); 

Функция startWith() делает как раз то, что нам нужно. Не важно, как вы реализовали поток: если вы вызвали функцию startWith(x) , x будет начальным значением.

Вы заметили, что у нас дублируется URL? Давайте избавимся от дубликата, передвинув startWith() поближе к refreshClickStream , чтобы эмулировать нажатие кнопки при инициализации приложения:

var requestStream = refreshClickStream.startWith('startup click') .map(function() < var randomOffset = Math.floor(Math.random()*500); return 'https://api.github.com/users?since=' + randomOffset; >); 
Моделируем 3 рекомендации с помощью потоков

Теперь, вместе с кнопкой обновления, у нас появилась проблема: при нажатии этой кнопки текущие 3 рекомендации не исчезают. Новые предложения появляются, как только с сервера пришел ответ, но для того, чтобы наш UI выглядел отзывчивым, мы должны очищать текущие предложения сразу же.

refreshClickStream.subscribe(function() < // очищаем текущие рекомендации >); 

Теперь у нас есть два подписчика, влияющих на DOM-элементы (другой подписывается на responseStream ) и это соответствует принципу “Разделяй и властвуй“. Вы еще помните мантру реактивного подхода? Напоминаем:

Реактивное программирование на реальных примерах: подробное введение 6

Давайте сделаем так, чтобы рекомендации были потоками, в которых каждое генерируемое значение — это JSON-объект, содержащий данные рекомендации. Мы сделаем по потоку на каждую из трех рекомендаций. Вот так выглядит поток для рекомендации №1:

var suggestion1Stream = responseStream .map(function(listUsers) < // берем случайную из списка return listUsers[Math.floor(Math.random()*listUsers.length)]; >); 

Скопипастим этот код для потока №2 ( suggestion2Stream ) и №3 ( suggestion3Stream ). Подумайте над тем, как можно избежать дублирования кода в данном примере. Это будет отличным упражнением. А еще и очень хорошим способом избежать эффекта последней строки.

На данный момент этот блок не поддерживается, но мы не забыли о нём! Наша команда уже занята его разработкой, он будет доступен в ближайшее время.

Реактивное программирование на Java: как, зачем и стоит ли? Часть II

Реактивное программирование — один из самых актуальных трендов современности. Обучение ему — сложный процесс, особенно если нет подходящих материалов. В качестве своеобразного дайджеста может выступить эта статья. На конференции РИТ++ 2020 эксперт и тренер Luxoft Training Владимир Сонькин рассказал о фишках управления асинхронными потоками данных и подходах к ним, а также показал на примерах, в каких ситуациях нужна реактивность, и что она может дать.

В первой части статьи рассказывалось о том, что привело к появлению реактивного программирования, где оно применяется, и что нам может дать асинхронность. Пришло время рассказать о следующем шаге, позволяющем получить максимум преимуществ от асинхронности, и это — реактивное программирование.

Reactivity

Реактивное программирование — это асинхронность, соединенная с потоковой обработкой данных. То есть если в асинхронной обработке нет блокировок потоков, но данные обрабатываются все равно порциями, то реактивность добавляет возможность обрабатывать данные потоком. Помните тот пример, когда начальник поручает задачу Васе, тот должен передать результат Диме, а Дима вернуть начальнику? Но у нас задача — это некая порция, и пока она не будет сделана, дальше передать ее нельзя. Такой подход действительно разгружает начальника, но Дима и Вася периодически простаивают, ведь Диме надо дождаться результатов работы Васи, а Васе — дождаться нового задания.

Пример
А теперь представьте, что задачу разбили на множество подзадач. И теперь они плывут непрерывным потоком:
Пример 2
Говорят, когда Генри Форд придумал свой конвейер, он повысил производительность труда в четыре раза, благодаря чему ему удалось сделать автомобили доступными. Здесь мы видим то же самое: у нас небольшие порции данных, а конвейер с потоком данных, и каждый обработчик пропускает через себя эти данные, каким-то образом их преобразовывая. В качестве Васи и Димы у нас выступают потоки выполнения (threads), обеспечивая, таким образом, многопоточную обработку данных.

Схема технологии распараллеливания

На этой схеме показаны разные технологии распараллеливания, добавлявшиеся в Java в разных версиях. Как мы видим, спецификация Reactive Streams на вершине — она не заменяет всего, что было до нее, но добавляет самый высокий уровень абстракции, а значит ее использование просто и эффективно. Попробуем в этом разобраться.

Идея реактивности построена на паттерне проектирования Observer.

Observer

Давайте вспомним, что это за паттерн. У нас есть подписчики и то, на что мы подписываемся. В качестве примера здесь рассмотрен Твиттер, но подписаться на какое-то сообщество или человека, а потом получать обновления можно в любой соцсети. После подписки, как только появляется новое сообщение, всем подписчикам приходит notify, то есть уведомление. Это базовый паттерн.

В данной схеме есть:

  • Publisher — тот, кто публикует новые сообщения;
  • Observer — тот, кто на них подписан. В реактивных потоках подписчик обычно называется Subscriber. Термины разные, но по сути это одно и то же. В большинстве сообществ более привычны термины Publisher/Subscriber.

Это базовая идея, на которой все строится.

Один из жизненных примеров реактивности — система оповещения при пожаре. Допустим, нам надо сделать систему, включающую тревогу в случае превышения задымленности и температуры.

Пример реактивности

У нас есть датчик дыма и градусник. Когда дыма становится много и/или температура растет, на соответствующих датчиках увеличивается значение. Когда значение и температура на датчике дыма оказываются выше пороговых, включается колокольчик и оповещает о тревоге.

Если бы у нас был традиционный, а не реактивный подход, мы бы писали код, который каждые пять минут опрашивает детектор дыма и датчик температуры, и включает или выключает колокольчик. Однако в реактивном подходе за нас это делает реактивный фреймворк, а мы только прописываем условия: колокольчик активен, когда детектор больше X, а температура больше Y. Это происходит каждый раз, когда приходит новое событие.

От детектора дыма идет поток данных: например, значение 10, потом 12, и т.д. Температура тоже меняется, это другой поток данных — 20, 25, 15. Каждый раз, когда появляется новое значение, результат пересчитывается, что приводит к включению или выключению системы оповещения. Нам достаточно сформулировать условие, при котором колокольчик должен включиться.

Если вернуться к паттерну Observer, у нас детектор дыма и термометр — это публикаторы сообщений, то есть источники данных (Publisher), а колокольчик на них подписан, то есть он Subscriber, или наблюдатель (Observer).

Пример реактивности

Немного разобравшись с идеей реактивности, давайте углубимся в реактивный подход. Мы поговорим об операторах реактивного программирования. Операторы позволяют каким-либо образом трансформировать потоки данных, меняя данные и создавая новые потоки. Для примера рассмотрим оператор distinctUntilChanged. Он убирает одинаковые значения, идущие друг за другом. Действительно, если значение на детекторе дыма не изменилось — зачем нам на него реагировать и что-то там пересчитывать:

Оператор distinctUntilChanged

Reactive approach

Рассмотрим еще один пример: допустим, мы разрабатываем UI, и нам нужно отслеживать двойные нажатия мышкой. Тройной клик будем считать как двойной.

Пример реактивного подхода

Клики здесь — это поток щелчков мышкой (на схеме 1, 2, 1, 3). Нам нужно их сгруппировать. Для этого мы используем оператор throttle. Говорим, что если два события (два клика) произошли в течение 250 мс, их нужно сгруппировать. На второй схеме представлены сгруппированные значения (1, 2, 1, 3). Это поток данных, но уже обработанных — в данном случае сгрупированных.

Таким образом начальный поток преобразовался в другой. Дальше нужно получить длину списка ( 1, 2, 1, 3). Фильтруем, оставляя только те значения, которые больше или равны 2. На нижней схеме осталось только два элемента (2, 3) — это и были двойные клики. Таким образом, мы преобразовали начальный поток в поток двойных кликов.

Это и есть реактивное программирование: есть потоки на входе, каким-то образом мы пропускаем их через обработчики, и получаем поток на выходе. При этом вся обработка происходит асинхронно, то есть никто никого не ждет.

Еще одна хорошая метафора — это система водопровода: есть трубы, одна подключена к другой, есть какие-то вентили, может быть, стоят очистители, нагреватели или охладители (это операторы), трубы разделяются или объединяются. Система работает, вода льется. Так и в реактивном программировании, только в водопроводе течет вода, а у нас — данные.

Можно придумать потоковое приготовление супа. Например, есть задача максимально эффективно сварить много супа. Обычно берется кастрюля, в нее наливается порция воды, овощи нарезаются и т.д. Это не потоковый, а традиционный подход, когда мы варим суп порциями. Сварили эту кастрюлю, потом нужно ставить следующую, а после — еще одну. Соответственно, надо дождаться, пока в новой кастрюле снова закипит вода, растворится соль, специи и т.д. Все это занимает время.

Представьте себе такой вариант: в трубе нужного диаметра (достаточного, чтобы заполнялась кастрюля) вода сразу подогревается до нужной температуры, есть нарезанная свекла и другие овощи. На вход они поступают целыми, а выходят уже шинкованными. В какой-то момент все смешивается, вода подсаливается и т.д. Это максимально эффективное приготовление, супоконвейер. И именно в этом идея реактивного подхода.

Observable example

Теперь посмотрим на код, в котором мы публикуем события:

Пример Observable

Observable.just позволяет положить в поток несколько значений, причем если обычные реактивные потоки содержат значения, растянутые во времени, то тут мы их кладем все сразу — то есть синхронно. В данном случае это названия городов, на которые в дальнейшем можно подписаться (тут для примера взяты города, в которых есть учебный центр Люксофт).

Девушка (Publisher) опубликовала эти значения, а Observers на них подписываются и печатают значения из потока.

Это похоже на потоки данных (Stream) в Java 8. И тут, и там синхронные потоки. И здесь, и в Java 8 список значений нам известен сразу. Но если бы использовался обычный для Java 8 поток, мы не могли бы туда что-то докладывать. В стрим ничего нельзя добавить: он синхронный. В нашем примере потоки асинхронные, то есть в любой момент времени в них могут появляться новые события — скажем, если через год откроется учебный центр в новой локации — она может добавиться в поток, и реактивные операторы правильно обработают эту ситуацию. Мы добавили события и сразу же на них подписались:

Мы можем в любой момент добавить значение, которое через какое-то время выводится. Когда появляется новое значение, мы просим его напечатать, и на выходе получаем список значений:

Список значений

При этом есть возможность не только указать, что должно происходить, когда появляются новые значения, но и дополнительно отработать такие сценарии, как возникновение ошибок в потоке данных или завершение потока данных. Да-да, хотя часто потоки данных не завершаются (например, показания термометра или датчика дыма), многие потоки могут завершаться: например, поток данных с сервера или с другого микросервиса. В какой-то момент сервер закрывает соединение, и появляется потребность на это как-то отреагировать.

Implementing and subscribing to an observer

В Java 9 нет реализации реактивных потоков — только спецификация. Но есть несколько библиотек — реализаций реактивного подхода. В этом примере используется библиотека RxJava. Мы подписываемся на поток данных, и определяем несколько обработчиков, то есть методы, которые будут запущены в начале обработки потока (onSubscribe), при получении каждого очередного сообщения (onNext), при возникновении ошибки (onError) и при завершении потока (onComplete):

Библиотека RxJava

Давайте посмотрим на последнюю строчку.

locations.map(String::length).filter(l -> l >= 5).subscribe(observer);

Мы используем операторы map и filter. Если вы работали со стримами Java 8, вам, конечно, знакомы map и filter. Здесь они работают точно так же. Разница в том, что в реактивном программировании эти значения могут появляться постепенно. Каждый раз, когда приходит новое значение, оно проходит через все преобразования. Так, String::length заменит строчки на длину в каждой из строк.

В данном случае получится 5 (Minsk), 6 (Krakow), 6 (Moscow), 4 (Kiev), 5 (Sofia). Фильтруем, оставляя только те, что больше 5. У нас получится список длин строк, которые больше 5 (Киев отсеется). Подписываемся на итоговый поток, после этого вызывается Observer и реагирует на значения в этом итоговом потоке. При каждом следующем значении он будет выводить длину:

public void onNext(Integer value) System.out.println(«Length: » + value);

То есть сначала появится Length 5, потом — Length 6. Когда наш поток завершится, будет вызван onComplete, а в конце появится надпись «Done.»:

public void onComplete() System.out.println(«Done.»);

Не все потоки могут завершаться. Но некоторые способны на это. Например, если мы читали что-то из файла, поток завершится, когда файл закончится.

Если где-то произойдет ошибка, мы можем на нее отреагировать:

public void onError(Throwable e) e.printStackTrace();

Таким образом мы можем реагировать разными способами на разные события: на следующее значение, на завершение потока и на ошибочную ситуацию.

Reactive Streams spec

Реактивные потоки вошли в Java 9 как спецификация.

Если предыдущие технологии (Completable Future, Fork/Join framework) получили свою имплементацию в JDK, то реактивные потоки имплементации не имеют. Есть только очень короткая спецификация. Там всего 4 интерфейса:

Reactive Streams spec

Если рассматривать наш пример из картинки про Твиттер, мы можем сказать, что:

Publisher — девушка, которая постит твиты;

Subscriber — подписчик. Он определяет , что делать, если:

  • Начали слушать поток (onSubscribe). Когда мы успешно подписались, вызовется эта функция;
  • Появилось очередное значение в потоке (onNext);
  • Появилось ошибочное значение (onError);
  • Поток завершился (onComplete).

Subscription — у нас есть подписка, которую можно отменить (cancel) или запросить определенное количество значений (request(long n)). Мы можем определить поведение при каждом следующем значении, а можем забирать значения вручную.

Processor — обработчик — это два в одном: он одновременно и Subscriber, и Publisher. Он принимает какие-то значения и куда-то их кладет.

Если мы хотим на что-то подписаться, вызываем Subscribe, подписываемся, и потом каждый раз будем получать обновления. Можно запросить их вручную с помощью request. А можно определить поведение при приходе нового сообщения (onNext): что делать, если появилось новое сообщение, что делать, если пришла ошибка и что делать, если Publisher завершил поток. Мы можем определить эти callbacks, или отписаться (cancel).

PUSH / PULL модели

Существует две модели потоков:

  • Push-модель — когда идет «проталкивание» значений.

Например, вы подписались на кого-то в Telegram или Instagram и получаете оповещения (они так и называются — push-сообщения, вы их не запрашиваете, они приходят сами). Это может быть, например, всплывающее сообщение. Можно определить, как реагировать на каждое новое сообщение.

  • Pull-модель — когда мы сами делаем запрос.

Например, мы не хотим подписываться, т.к. информации и так слишком много, а хотим сами заходить на сайт и узнавать новости.

Для Push-модели мы определяем callbacks, то есть функции, которые будут вызваны, когда придет очередное сообщение, а для Pull-модели можно воспользоваться методом request, когда мы захотим узнать, что новенького.

Pull-модель очень важна для Backpressure — «напирания» сзади. Что же это такое?

Вы можете быть просто заспамленными своими подписками. В этом случае прочитать их все нереально, и есть шанс потерять действительно важные данные — они просто утонут в этом потоке сообщений. Когда подписчик из-за большого потока информации не справляется со всем, что публикует Publisher, получается Backpressure.

В этом случае можно использовать Pull-модель и делать request по одному сообщению, прежде всего из тех потоков данных, которые наиболее важны для вас.

Implementations

Давайте рассмотрим существующие реализации реактивных потоков:

  • RxJava. Эта библиотека реализована для разных языков. Помимо RxJava существует Rx для C#, JS, Kotlin, Scala и т.д.
  • Reactor Core. Был создан под эгидой Spring, и вошел в Spring 5.
  • Akka-стримы от создателя Scala Мартина Одерски. Они создали фреймворк Akka (подход с Actor), а Akka-стримы — это реализация реактивных потоков, которые дружат с этим фреймворком.

Во многом эти реализации похожи, и все они реализуют спецификацию реактивных потоков из Java 9.

Посмотрим подробнее на Spring’овский Reactor.

Function may return…

Давайте обобщим, что может возвращать функция:

Что может возвращать функция

  • Single/Synchronous;

Обычная функция возвращает одно значение, и делает это синхронно.

  • Multipple/Synchronous;

Если мы используем Java 8, можем возвращать из функции поток данных Stream. Когда вернулось много значений, их можно отправлять на обработку. Но мы не можем отправить на обработку данные до того, как все они получены — ведь Stream работают только синхронно.

  • Single/Asynchronous;

Здесь уже используется асинхронный подход, но функция возвращает только одно значение:

  • либо CompletableFuture (Java), и через какое-то время приходит асинхронный ответ;
  • либо Mono, возвращающая одно значение в библиотеке Spring Reactor.
  • Multiple/Asynchronous.

А вот тут как раз — реактивные потоки. Они асинхронные, то есть возвращают значение не сразу, а через какое-то время. И именно в этом варианте можно получить поток значений, причем эти значения будут растянуты во времени Таким образом, мы комбинируем преимущества потоков Stream, позволяющих вернуть цепочку значений, и асинхронности, позволяющей отложить возврат значения.

Например, вы читаете файл, а он меняется. В случае Single/Asynchronous вы через какое-то время получаете целиком весь файл. В случае Multiple/Asynchronous вы получаете поток данных из файла, который сразу же можно начинать обрабатывать. То есть можно одновременно читать данные, обрабатывать их, и, возможно, куда-то записывать. . Реактивные асинхронные потоки называются:

  • Publisher (в спецификации Java 9);
  • Observable (в RxJava);
  • Flux (в Spring Reactor).

Netty as a non-blocking server

Рассмотрим пример использования реактивных потоков Flux вместе со Spring Reactor. В основе Reactor лежит сервер Netty. Spring Reactor — это основа технологии, которую мы будем использовать. А сама технология называется WebFlux. Чтобы WebFlux работал, нужен асинхронный неблокирующий сервер.

Схема работы сервера Netty

Схема работы сервера Netty похожа на то, как работает Node.js. Есть Selector — входной поток, который принимает запросы от клиентов и отправляет их на выполнение в освободившиеся потоки. Если в качестве синхронного сервера (Servlet-контейнера) используется Tomcat, то в качестве асинхронного используется Netty.

Давайте посмотрим, сколько вычислительных ресурсов расходуют Netty и Tomcat на выполнение одного запроса:

CPU

Throughput — это общее количество обработанных данных. При небольшой нагрузке, до первых 300 пользователей у RxNetty и Tomcat оно одинаковое, а после Netty уходит в приличный отрыв — почти в 2 фраза.

Throughput

Blocking vs Reactive

У нас есть два стека обработки запросов:

  • Традиционный блокирующий стек.
  • Неблокирующий стек — в нем все происходит асинхронно и реактивно.

Два стека обработки запросов

В блокирующем стеке все строится на Servlet API, в реактивном неблокирующем стеке — на Netty.

Сравним реактивный стек и стек Servlet.

В Reactive Stack применяется технология Spring WebFlux. Например, вместо Servlet API используются реактивные стримы.

Reactive Stack

Чтобы мы получили ощутимое преимущество в производительности, весь стек должен быть реактивным. Поэтому чтение данных тоже должно происходить из реактивного источника.

Например, если у нас используется стандартный JDBC, он является не реактивным блокирующим источником, потому что JDBC не поддерживает неблокирующий ввод/вывод. Когда мы отправляем запрос в базу данных, приходится ждать, пока результат этого запроса придет. Соответственно, получить преимущество не удается.

В Reactive Stack мы получаем преимущество за счет реактивности. Netty работает с пользователем, Reactive Streams Adapters — со Spring WebFlux, а в конце находится реактивная база: то есть весь стек получается реактивным. Давайте посмотрим на него на схеме:

Схема реактивного стека

Data Repo — репозиторий, где хранятся данные. В случае, если есть запросы, допустим, от клиента или внешнего сервера, они через Flux поступают в контроллер, обрабатываются, добавляются в репозиторий, а потом ответ идет в обратную сторону.

При этом все это делается неблокирующим способом: мы можем использовать либо Push-подход, когда мы определяем, что делать при каждой следующей операции, либо Pull-подход, если есть вероятность Backpressure, и мы хотим сами контролировать скорость обработки данных, а не получать все данные разом.

Операторы

В реактивных потоках огромное количество операторов. Многие из них похожи на те, которые есть в обычных стримах Java. Мы рассмотрим только несколько самых распространенных операторов, которые понадобятся нам для практического примера применения реактивности.

Filter operator

Скорее всего, вы уже знакомы с фильтрами из интерфейса Stream.

Filter operator

По синтаксису этот фильтр точно такой же, как обычный. Но если в стриме Java 8 все данные есть сразу, здесь они могут появляться постепенно. Стрелки вправо — это временная шкала, а в кружочках находятся появляющиеся данные. Мы видим, что фильтр оставляет в итоговом потоке только значения, превышающие 10.

Take 2

Take 2 означает, что нужно взять только первые два значения.

Map operator

Оператор Map тоже хорошо знаком:

Map operator

Это действие, происходящее с каждым значением. Здесь — умножить на десять: было 3, стало 30; было 2, стало 20 и т.д.

Delay operator

Delay operator

Задержка: все операции сдвигаются. Этот оператор может понадобиться, когда значения уже генерируются, но подготовительные процессы еще происходят, поэтому приходится отложить обработку данных из потока.

Reduce operator

Еще один всем известный оператор:

Reduce operator

Он дожидается конца работы потока (onComplete) — на схеме она представлена вертикальной чертой. После чего мы получаем результат — здесь это число 15. Оператор reduce сложил все значения, которые были в потоке.

Scan operator

Этот оператор отличается от предыдущего тем, что не дожидается конца работы потока.

Scan operator

Оператор scan рассчитывает текущее значение нарастающим итогом: сначала был 1, потом прибавил к предыдущему значению 2, стало 3, потом прибавил 3, стало 6, еще 4, стало 10 и т.д. На выходе получили 15. Дальше мы видим вертикальную черту — onComplete. Но, может быть, его никогда не произойдет: некоторые потоки не завершаются. Например, у термометра или датчика дыма нет завершения, но scan поможет рассчитать текущее суммарное значение, а при некоторой комбинации операторов — текущее среднее значение всех данных в потоке.

Merge operator

Объединяет значения двух потоков.

Merge operator

Например, есть два температурных датчика в разных местах, а нам нужно обрабатывать их единообразно, в общем потоке .

Combine latest

Получив новое значение, комбинирует его с последним значением из предыдущего потока.

Combine latest

Если в потоке возникает новое событие, мы его комбинируем с последним полученным значением из другого потока. Скажем, таким образом мы можем комбинировать значения от датчика дыма и термометра: при появлении нового значения температуры в потоке temperatureStream оно будет комбинироваться с последним полученным значением задымленности из smokeStream. И мы будем получать пару значений. А уже по этой паре можно выполнить итоговый расчет:

temperatureStream.combineLatest(smokeStream).map((x, y) -> x > X && y > Y)

В итоге на выходе у нас получается поток значений true или false — включить или выключить колокольчик. Он будет пересчитываться каждый раз, когда будет появляться новое значение в temperatureStream или в smokeStream.

FlatMap operator

Этот оператор вам, скорее всего, знаком по стримам Java 8. Элементами потока в данном случае являются другие потоки. Получается поток потоков. Работать с ними неудобно, и в этих случаях нам может понадобиться «уплостить» поток.

FlatMap operator

Можно представить такой поток как конвейер, на который ставят коробки с запчастями. До того, как мы начнем их применять, запчасти нужно достать из коробок. Именно это делает оператор flatMap.

Flatmap часто используется при обработке потока данных, полученных с сервера. Т.к. сервер возвращает поток, чтобы мы смогли обрабатывать отдельные данные, этот поток сначала надо «развернуть». Это и делает flatMap.

Buffer operator

Buffer operator

Это оператор, который помогает группировать данные. На выходе Buffer получается поток, элементами которого являются списки (List в Java). Он может пригодиться, когда мы хотим отправлять данные не по одному, а порциями.

Мы с самого начала говорили, что реактивные потоки позволяют разбить задачу на подзадачи, и обрабатывать их маленькими порциями. Но иногда лучше наоборот, собрать много маленьких частей в блоки. Скажем, продолжая пример с конвейером и запчастями, нам может понадобиться отправлять запчасти на другой завод (другой сервер). Но каждую отдельную запчасть отправлять неэффективно. Лучше их собрать в коробки, скажем по 100 штук, и отправлять более крупными партиями.

На схеме выше мы группируем отдельные значения по три элемента (так как всего их было пять, получилась «коробка» из трех, а потом из двух значений). То есть если flatMap распаковывает данные из коробок, buffer, наоборот, упаковывает их.

Всего существует более сотни операторов реактивного программирования. Здесь разобрана только небольшая часть.

Итого

Есть два подхода:

Что объединяет два подхода

  • Spring MVC — традиционная модель, в которой используется JDBC, императивная логика и т.д.
  • Spring WebFlux, в котором используется реактивный подход и сервер Netty.

Есть кое-что, что их объединяет. Tomcat, Jetty, Undertow могут работать и со Spring MVC, и со Spring WebFlux. Однако дефолтным сервером в Spring для работы с реактивным подходом является именно Netty.

Заинтересовались темой?

Новый практический online-курс Java Advanced: функциональное, асинхронное и реактивное программирование по изучению современных функциональных, асинхронных и реактивных подходов к разработке на Java. Включает изучение NIO2, CompletableFurure, RxJava, Reactor, R2DBC, SSE, Spring Data reactive, WebClient, reactive WebSocket, RSocket.

Расскажи друзьям:
Как не пропустить самое интересное?
Подписывайтесь на наш ежемесячный дайджест!

Оценка и обучение ИТ-специалистов по ключевым направлениям разработки программного обеспечения. Курсы от экспертов-практиков по языкам программирования, системному и бизнес-анализу, архитектуре ПО, ручному и автоматизированному тестированию ПО, Big Data и машинному обучению, управлению проектами и Agile. Действует скидка 10% на обучение физических лиц.

Остались вопросы?
IBS Training Center Контакты: +7 (495) 609-6967 education@ibs.ru Адрес:
127018 , Москва , ул. Складочная, д. 3, стр. 1
© 2024 IBS, all rights reserved
Пользователь только что записался на курс » »

Сайт IBS Training Center использует cookie. Это дает нам возможность следить за корректной работой сайта, а также анализировать данные, чтобы развивать наши продукты и сервисы. Посещая сайт, вы соглашаетесь с обработкой ваших персональных данных. В случае несогласия вам следует покинуть его

Архитектура и реактивное программирование

reactivity as a pattern for low models coupling

Что такое реактивное программирование? Не Rx. И даже не Excel. Это архитектурный паттерн, позволяющий абсолютно иначе писать код. В статье мы устаканим фундаментальные знания, утвердимся в том, что React.js всё же является реактивным, и подумаем о том, как и когда нужно, а когда не нужно применять паттерны реактивного программирования.

Так уж вышло, что я побывал в большом количестве огромных кодовых баз, где сталкивался с одними и теми же проблемами организации кода. Информация ниже — это результат исследований программирования в общем и реактивного программирования в частности за последние пять лет. Я уже несколько лет пишу свой менеджер состояния Reatom, и это не просто пет-проект, а серьёзный продукт. Я старался сделать его проще для входа и использования, но оставил возможность расти до энтерпрайза и решать соответствующие проблемы. В статье будет не теория из пустых рассуждений, а опыт решения реальных задач.

Вы уже могли видеть связанную статью «Что такое состояние», где я подробно разбираю этот вопрос. А сегодня поговорим о второй стороне управления состоянием и потоками данных.

Проблемы, которые реактивное программирование поможет вам решить:

  • связанность кода и его разбиение на модули;
  • ленивая подгрузка модулей;
  • автоматическая инвалидация кеша.

▍ Определение

Реактивное программирование — парадигма программирования, предполагающая передачу ответственности за инициализацию обработки информации источнику информации.

Это определение — описание того общего, что есть у Rx и MobX — таких разных, но безусловно реактивных библиотек, признанных всей индустрией. Да, они полностью отличаются внешним API, но подкапотные механизмы одни и те же — динамический список подписчиков и какое-то условие (триггер) его обхода. Это очень простая механика, и она встречается постоянно. В этом нет ничего сакрального, многие стандартные API платформы и библиотеки её реализуют. Иногда доступ к подписчикам явный — метод subscribe или effect . Иногда он скрыт — JSX тег разворачивается в React.createElement , который ведёт к подписке на внутренний стейт.

Да, React.js также использует паттерны реактивного программирования, это легко проверить. Есть ли у нас контроль над выполнением функции рендера? Нет, React определяет это. Вы могли бы попросить запланировать обновление, но оно будет запущено, когда React примет решение об этом. Он несёт ответственность за запуск вычислений.

В документации React как-то мелькало утверждение о том, что React не ФРПшный и это так. Он не использует специфичные конструкции функционально-реактивного программирования, но немного использует реактивное программирование в общем.

Паттерны реактивного программирования предполагают разворачивание направления связей в нашем коде, позволяя одним модулям зависеть от других, не меняя их код. Иначе говоря, реактивное программирование позволяет уменьшить связанность кода.

Хорошей практикой при разработке приложений является разделение их на независимые виджеты / модули / компоненты / фичи — все называют их по-разному. Но важно понимать, что эти модули не существуют сами по себе, они имеют какой-то общий смысл, и цельное приложение получается из их связанности. Сложный вопрос — как описывать эти связи и где их хранить.

▍ Пример

Здесь и ниже мы будем находиться в контексте JavaScript, но все идеи относятся к любому ЯП.

У нас есть аватарка пользователя в шапке и на странице профиля, при её обновлении с этой страницы нужно подставить новый урл в двух местах. Классический подход: в handlePictureUpdate мы пишем такой код: document.querySelector(‘.profile .ava’).src = newSrc; document.querySelector(‘.header .ava’).src = newSrc . Важно тут то, что функция handlePictureUpdate находится в коде модуля профайла, но почему-то ходит в модуль шапки — это и есть связанность. Такой кодстайл имеет свойство расти по своей сложности, его чтение может давать не те результаты, на которые рассчитываешь, — код шапки не содержит информации о связи с профайлом. Всё это ведёт к неочевидным багам — мы обновили шапку, поменяв её класс на .app-header , и querySelector в handlePictureUpdate теперь будет падать с ошибкой TypeError: Cannot set properties of undefined (setting ‘src’) . Причём ошибку после такого изменения скорее всего не выявили бы, потому что она в другом модуле. А в интеграционных тестах профайла не было бы проверки того, что происходит в шапке — классика.

Кто-то скажет, что дело в отсутствии БЭМа и предсказуемых селекторов. Кто-то возмутится неиспользованием общей константы с названием селектора шапки. Кто-то укажет на отсутствие проверки на undefined после querySelector — TypeScript бы подсказал! Такой маленький пример и уже так много проблем. Но это всё вопросы прикладного кода, которые в разных ситуациях будут разными. Возможно ли решить проблему подобного характера с архитектурной точки зрения, принципиально избавившись от необходимости перепроверять один модуль при рефакторинге другого?

▍ DDD и SSoT

Принцип Single Source of Truth (SSoT) означает, что в системе существует единственный актуальный и согласованный источник данных определённого типа, а все связанные модели работают с данными и процессами этого типа только через этот источник. Предметно-ориентированное проектирование (domain-driven design, DDD) говорит нам о том, что эти типы данных должны исходить от бизнес-сущностей — доменов: профиль пользователя, сформированный заказ товара. Модель — реализация домена в коде, страница пользователя и форма просмотра и создания, редактирования заказа.

Оперируя бизнес-доменами и не привязываясь к интерфейсу пользователя, нам становится намного проще определять, что к чему относится. Если отвязать профиль от страницы, а заказ от формы — выделить отдельную модель под домен — то станет намного проще принимать решение о том, где хранить и обрабатывать путь к аватарке пользователя.

Пример-предыстория о всё том же адресе аватарки. Когда приложение только начинали делать, выделенной страницы пользователя не было, были только основные формы для реализации бизнеса: список товаров, оформление заказа. Со временем решили повысить UX пользователя, сделать интерфейс дружественным и домашним и решили отображать на нём всегда что-то очень знакомое для пользователя — его аватарку. При клике по квадратной заглушке в правой части шапки можно было выбрать картинку и загрузить её. Логика этой загрузки лежала, соответственно, в шапке. Ещё позже решили сделать страницу редактирования профиля, чтобы адрес можно было заранее сохранить и редактировать, ну и картинку покрупнее посмотреть, заменить и покропить. У программиста встала дилемма — двигать код загрузки из файла с шапкой в файл профайла или из профайла импортировать код из шапки, что странно. В итоге код был передвинут, но это там так и осталось: document.querySelector(‘.header .ava’).src = newSrc .

Теперь у нас в коде страницы профайла есть какое-то знание о коде шапки — не хорошо. В начале статьи разобрали, где это может сломаться. Но даже если мы попытаемся применить DDD и выделим код профайла отдельно от страницы профайла, завязка на интерфейс у нас всё равно останется: document.querySelector(‘.profile .ava’).src = newSrc; document.querySelector(‘.header .ava’).src = newSrc , просто будет лежать в другой папочке. Таким образом, отделяя домен бизнесовый, мы раздробили домен системный — код страницы профайла и код шапки. Шило на мыло.

director by robert b weide

▍ Проектирование с реактивным программированием

Реактивное программирование прекрасно решает подобные проблемы за счёт простого трюка — связи между модулей переносятся из кода в рантайм. В коде мы лишь описываем, что хотели бы получить, но не описываем как. Это очень важно. Когда мы говорим о высокой связанности как плохом запахе кода, мы имеем в виду переплетение в самом коде, в тексте файлов проекта. Написание кода, чтение и дебаг происходят в подавляющем большинстве при работе с текстом программы, с кодом в самом материальном смысле этого слова. Реактивные паттерны позволяют убрать код, но оставить логическую связь, образовав её в рантайме. Как мы знаем, лучший код тот, что не написан.

Вспоминая пример с обновлением аватарки в шапке, код выглядел бы так для шапки: profile.$picture.subscribe(src => < this.ava.src = src >) , в то время как в модели профайла нужно было бы просто экспортировать синглтон profile с picture , обёрнутым в контейнер Observable — $picture . Теперь код профиля ничего не знает о шапке, но она связана с ним наглядно и типобезопасно. Мы легко можем отследить эти связи при необходимости — Find all references в IDE, но сам код профайла остался максимально чистым и ёмким. По сути ничего не меняется, но код становится обслуживать легче. Повторю КДПВ.

reactivity as a pattern for low models coupling

▍ Ленивость

Тема небольшая и понятная, но проговорить её стоит. Проектируя модели с реактивными публичными интерфейсами, мы достигаем такого сильного уровня изоляции, что другие не завис имые , а зави сящие модели могут легко подключаться и отключаться в любой момент времени, позволяя включать и настраивать dynamic imports максимально легко.

▍ Инвалидация кеша

Ещё одна тема, по которой я пройдусь лишь вскользь, но не потому, что она простая, а потому, что слишком большая и требует отдельной статьи или серии статей. Задача этой статьи — сделать архитектурный обзор, на этом и сфокусируемся.

Мы знаем, что связи в нашей системе никуда не делись, но перенеслись из кода в рантайм, и рантаймом этим управляет какой-то утилитарный код. Обычно это интерфейс, который скрывает за собой список подписчиков и логику их обхода при поступлении новой информации, но как ещё это можно использовать? Я долго копаюсь в этой теме и с уверенностью могу сказать, что сложная реактивная система походит на упрощённую реализацию виртуальной машины. Тут и автоматическая очистка мусора (garbage collection), и инлайн кеши (мемоизация), и виртуальная адресация (скоупы / контексты), и ещё небольшая пачка разнообразных фич для метапрограммирования(?). Одна из таких фич лежит на поверхности — зная все места хранения данных и связи между ними, можно легко отслеживать инвалидацию данных и их связей. Главная прелесть такой оптимизации заключается в её автоматическом применении, что делает вопрос производительности всего приложения более предсказуемым.

Тема эта очень большая, на Хабре уже были хорошие статьи. Я лишь могу привести свою последнюю версию обзора возможных алгоритмов инвалидации и реакций. Вникать в это не обязательно.

image

▍ Что такое хорошо, что такое плохо

Интереснее всего поговорить о том, какую сложность и какие проблемы привносит реактивное программирование. Две первых лежат на поверхности: производительность и дебаг.

Мы привносим больше работы в рантайм — конструируя ещё один уровень абстракции, мы забираем память на его обслуживание (в большей степени) и в некоторых случаях вычислительную производительность (в меньшей степени). К счастью, используя современные техники автоматического кеширования (и автоматической инвалидации), мы можем отыграть потраченные ресурсы на том, что уменьшим количество вычислений в доменном коде.

Большей проблемой является дополнительная ментальная нагрузка и сложность дебага дополнительных структур в рантайме. Данные уже нельзя просто увидеть в переменной, гуляя по коду с отладчиком — нужно запросить их из реактивного контейнера, написав дополнительный код в консоли, в лучшем случае раскрыть свойство-геттер, что может иметь неприятные сайд-эффекты. Дополнительные девтулзы могут помочь в этом вопросе, но всё ещё нет библиотек, где они были бы развиты достаточно хорошо, чтобы можно было в большинстве случаев отказаться от нативного дебагера.

Самой большой проблемой, на моей практике, является непонимание, когда эту реактивность применять, а когда нет. Дело в том, что внедряя реактивные интерфейсы в наш код, мы его красим. Как асинхронные функции заставляют везде писать await, чтобы получить из них данные, так и реактивные примитивы удобно использовать тогда, когда весь код на них опирается. Соответственно, когда все ключевые сущности имеют дополнительный реактивный интерфейс, программисту кажется что именно его и нужно использовать для всех задач. Но это не так.

В своём канале я опубликовал сравнение реализации очень простой задачи на двух разных реактивных библиотеках. Одна пропагандирует описывать вообще все связи реактивно, вторая подразумевает описание локальных процессов модели императивно. Размер кода отличается, но является не самым страшным моментом. Стрелочки на скрине показывают последовательность чтения кода при дебаге — попытке увидеть последовательность его выполнения (самая крайняя левая стрелка — начало операции). На этом примере очевидно, что процессы, описанные через реактивные интерфейсы, требуют больший путь для чтения — приходится больше прыгать глазами по коду.

В чём же причина, почему в этом примере реактивный подход выглядит так плохо? Давайте вспомним определение, что реактивность — способ разбиения кода. Зачем разбивать код единого процесса? В этом нет никакого смысла. Помимо связанности у нас есть и зацепленность, иногда эти понятия путают, но в английском они строго определены: сoupling и cohesion. На вики есть прекрасная иллюстрация.

Модули системы должны быть отвязаны друг от друга, но внутри, наоборот, они должны быть полностью сцеплены. Мне нравится другая формулировка — процессы должны быть максимально последовательны, линейны, императивны.

Чаще всего, контейнер доменной логики — это не данные, а процессы, которые описывают условия и правила их трансформации и изменения. Это самая сложная и важная часть кода приложения. Она должна описываться максимально просто.

Бывают задачи, когда части процессной логики должны быть реактивными — на них должны как-то реагировать другие модули системы и делать это прозрачно для основного домена. Например, когда нам нужно собирать аналитику с действий пользователя и системы, делать какие-то трассировки. В таких задачах событийная модель, конечно, будет правильным решением. Но по моей практике — такие сценарии встречаются очень редко. Чаще всего достаточно завернуть наши данные в интерфейс подписки, чтобы было проще соблюдать SSoT — это потребность большинства приложений.

▍ Выводы

ФРП и Rx помогают описывать асинхронные цепочки процессов, которые можно переиспользовать, но нужно это откровенно редко. При этом Rx очень плох в управлении состоянием — связанными данными. Команда Angular так и не смогла адаптировать rxjs для оптимального управления состоянием и недавно завезла отдельный примитив для этого — сигналы.

MobX отлично справляется с управлением связанными данными, но у него полностью отсутствуют примитивы для описания реактивных процессов, решающие редкие, но также важные задачи.

Оба имеют специфическое апи и связанные с этим проблемы.

Как вы понимаете, я автор ещё одной реактивной библиотеки (Reatom), рассказывающий вам об этом, как в своей реализации учёл обе эти потребности и сделал внутренние примитивы для описания процессов (экшены) тоже реактивными. Вы можете максимально эффективно описывать состояния, зависимые состояния, императивно описывать процессы и лениво, при необходимости, подписываться на них.

Выбирайте правильный тулинг, а самое главное — делайте это осознанно. Low coupling для модулей через публичные реактивные интерфейсы. Hight cohesion — через простой императивный код для внутренней логики — процессов.

Добавить комментарий

Ваш адрес email не будет опубликован. Обязательные поля помечены *