Смотреть что такое "Реактивное программирование" в других словарях. Разбираемся с фреймворком ReactiveX и пишем в реактивном стиле для Android Устойчивость реактивных систем

Я хочу рассказать вам о современной дисциплине программирования, отвечающей растущим требованиям масштабируемости, отказоустойчивости и быстрого отклика, и незаменимой как в многоядерных средах так и в облачных вычислениях, а также представить открытый онлайн-курс по ней, который начнётся всего через несколько дней .

Если вы ничего не слышали про реактивное программирование, всё в порядке. Это стремительно развивающаяся дисциплина, в которой скомбинированы параллелизм (concurrency) c событийной ориентированностью и асинхроностью. Реактивность присуща любому веб-сервису и распределенной системе, и служит ядром многих выскопроизводительных систем с большой степенью параллелизма. Если коротко, то авторы курса предлагают рассматривать реактивное программирование как естественное расширение функционального программирования (с функциями высших порядков) на параллельные системы с распределенным состоянием, координируемые и оркестрируемые асинхронными потоками данных, которыми обмениваются активные субъекты, или акторы .

Более понятными словами это описывается в Реактивном манифесте , ниже я перескажу основные положения из него, а полный перевод опубликован на хабре . Как рассказывает википедия , термин реактивное программирование существует довольно давно и имеет практические применения разной степени экзотичности, но новый толчок к развитию и распространению он получил совсем недавно, благодаря усилиям авторов Реактивного манифеста — инициативной группе из Typesafe Inc . Typesafe известна в среде функционального программирования как компания, основанная авторами прекрасного языка Scala и революционной параллельной платформы Akka . Сейчас они позиционируют свою компанию как создателя первой в мире реактивной платформы, предназначенной для разработки нового поколения. Их платформа позволяет быстро разрабатывать сложные пользовательские интерфейсы и предоставляет новый уровень абстракции над параллельными вычислениями и многопоточностью, уменьшая присущие им риски благодаря гарантированно предсказуемому масштабированию. Она реализует на практике идеи Реактивного манифеста и позволяет разработчику осмыслять и создавать приложения, отвечающие современным запросам.

Вы можете познакомиться с этой платформой и реактивным программированием, приняв участие в массовом открытом онлайн-курсе «Принципы реактивного программирования» . Этот курс является продолжением курса Мартина Одерски «Принципы функционального программирования на Скала», который набрал более 100 000 участников и продемонстрировал одну из самых высоких в мире степень успешного прохождения массового открытого онлайн-курса его участниками. Вместе с создателем языка Скала новый курс читают Эрик Мейер, разрабатывавший среду Rx для реактивного программирования под.NET, и Роланд Кун, ведущий команду разработки Akka в Typesafe в настоящее время. Курс раскрывает ключевые элементы реактивного программирования и показывает, как они применяются для конструирования событийно-ориентированных систем, обладающих масштабируемостью и отказоустойчивостью. Учебный материал иллюстрируется короткими программами и сопровождается набором заданий, каждое из которых — это программный проект. В случае успешного выполнения всех заданий участники получают сертификаты (разумеется, и участие и сертификаты бесплатны). Курс продолжается 7 недель и начинается в этот понедельник, 4 ноября. Подробный план, а также вступительное видео доступны на странице курса: https://www.coursera.org/course/reactive .

Тем кто заинтересовался, либо сомневается, предлагаю сжатое изложение базовых концепций Реактивного манифеста. Его авторы отмечают значительные перемены в требованиях к приложениям за последние годы. Сегодня приложения разворачиваются в любом окружении от мобильных устройств до облачных кластеров с тысячами многоядерных процессоров. Эти окружения предъявляют новые требования к программному обеспечению и технологиям. В архитектурах предыдущего поколения акцент делался на управляемые сервера и контейнеры, а масштабирование достигалось за счет дополнительного дорогостоящего оборудования, проприетарных решений и параллельных вычислений через многопоточность. Сейчас развивается новая архитектура, в которой можно выделить четыре важнейшие черты, всё более преобладающие как в пользовательских, так и в корпоративных промышленных окружениях. Системы с такой архитектурой: событийно-ориентированы (Event-driven), масштабируемы (Scalable), отказоустойчивы (Resilient) и обладают быстрым откликом, т.е. отзывчивы (Responsive). Это обеспечивает комфортное взаимодействие с пользователем, дающее ощущение реального времени, и поддерживаемое самовосстанавливающимся масштабируемым прикладным стеком, готовым к развертыванию в многоядерных и облачных окружениях. Каждая из четырех характеристик реактивной архитектуры применяется ко всему технологическому стеку, что отличает их от звеньев в многоуровневых архитектурах. Рассмотрим их немного подробней.


Событийно-ориентированные приложения предполагают асинхронные коммуникации компонент и реализуют их слабую связанность (loosely coupled design): отправитель и получатель сообщения не нуждаются в сведениях ни друг о друге, ни о способе передачи сообщения, что позволяет им сконцентрироваться на содержании коммуникаций. Кроме того, что слабосвязанные компоненты значительно улучшают сопровождаемость, расширяемость и эволюционирование системы, асинхронность и неблокирующий характер их взаимодействия позволяют также освободить значительную часть ресурсов, снизить время оклика и обеспечить бо льшую пропускную способность по сравнению с традиционными приложениями. Именно благодаря событийно-ориентированной природе возможны остальные черты реактивной архитектуры.

Масштабируемость в контексте реактивного программирования — это реакция системы на изменение нагрузки, т.е. эластичность, достигаемая возможностью добавления или освобождения вычислительных узлов по мере необходимости. Благодаря низкой связанности, асинхронному обмену сообщениями и независимости от размещения компонент (location transparency), способ развертывания и топология приложения становятся решением времени развертывания и предметом конфигурации и адаптивных алгоритмов, реагирующих на нагрузку. Таким образом, вычислительная сеть становится частью приложения, изначально имеющего явную распределенную природу.

Отказоустойчивость реактивной архитектуры тоже становится частью дизайна, и это значительно отличает ее от традиционных подходов к обеспечению непрерывной доступности системы путем резервирования серверов и перехвата управления при отказе. Устойчивость такой системы достигается ее способностью корректно реагировать на сбои отдельных компонент, изолировать эти сбои, сохраняя их контекст в виде вызвавших их сообщений, и передавать эти сообщения другому компоненту, способному принять решения о том, как следует обрабатывать ошибку. Такой подход позволяет сохранить чистой бизнес-логику приложения, отделив от нее логику обработки сбоев, которая формулируется в явном декларативном виде для регистрации, изолирования, и обработки сбоев средствами самой системы. Для построения таких самовосстанавливающихся систем компоненты упорядочиваются иерархически, и проблема эскалируется до того уровня, который способен ее решить.

И наконец отзывчивость — это способность системы реагировать на пользовательское воздействие независимо от нагрузки и сбоев, такие приложения вовлекают пользователя во взаимодействие, создают ощущение тесной связи с системой и достаточной оснащенности для выполнения текущих задач. Отзывчивость актуальна не только в системах реального времени, но и необходима для широкого круга приложений. Более того, система, неспособная к быстрому отклику даже в момент сбоя, не может считаться отказоустойчивой. Отзывчивость достигается применением наблюдаемых моделей (observable models), потоков событий (event streams) и клиентов с состоянием (stateful clients). Наблюдаемые модели генерируют события при изменении своего состояния и обеспечивают взаимодействие реального времени между пользователями и системами, а потоки событий предоставляют абстракцию, на которой построено это взаимодействие путем неблокирующих асинхронных трансформаций и коммуникаций.

Таким образом, реактивные приложения представляют собой сбалансированный подход к решению широкого спектра задач современной разработки ПО. Построенные на событийно-ориентированном основании, они предоставляют средства, необходимые для гарантий масштабируемости и отказоустойчивости и поддерживают полнофункциональное отзывчивое пользовательское взаимодействие. Авторы ожидают, что всё большее число систем будет придерживаться принципов реактивного манифеста.

В довесок привожу план курса без перевода. Просто на случай, если вы дочитали до этого места, и вам все еще интересно.

Week 1 : Review of Principles of Functional Programming: substitution model, for-expressions and how they relate to monads. Introduces a new implementation of for-expressions: random value generators. Shows how this can be used in randomized testing and gives an overview of ScalaCheck, a tool which implements this idea.

Week 2 : Functional programming and mutable state. What makes an object mutable? How this impacts the substitution model. Extended example: Digital circuit simulation

Week 3 : Futures. Introduces futures as another monad, with for-expressions as concrete syntax. Shows how futures can be composed to avoid thread blocking. Discusses cross-thread error handling.

Week 4 : Reactive stream processing. Generalizing futures to reactive computations over streams. Stream operators.

Week 5 : Actors. Introduces the Actor Model, actors as encapsulated units of consistency, asynchronous message passing, discusses different message delivery semantics (at most once, at least once, exactly once) and eventual consistency.

Week 6 : Supervision. Introduces reification of failure, hierarchical failure handling, the Error Kernel pattern, lifecycle monitoring, discusses transient and persistent state.

Week 7 : Conversation Patterns. Discusses the management of conversational state between actors and patterns for flow control, routing of messages to pools of actors for resilience or load balancing, acknowledgement of reception to achieve reliable delivery.

Принципы реактивного программирования не новы, и их можно проследить с 70-х и 80-х годов в основополагающих работах Джима Грея и Пэта Хелланда по тандемной системе.

Эти люди намного опередили свое время. Только в последние 5-10 лет технологическая индустрия была вынуждена пересмотреть существующие «лучшие практики» для развития корпоративной системы. Она научилась применять знания о реактивных принципах сегодняшнего мира многоядерных и облачных вычислений.

Основой для реактивной системы является передача сообщений, которая создает временную границу между компонентами, позволяет их развязывать во времени, используя параллельность и пространство, что распределяет нагрузку и обеспечивает мобильность. Эта развязка является требованием полной изоляции между компонентами и составляет основу как для устойчивости, так и для эластичности систем.

Основы реактивного программирования

Это программирование направлено на потоки информации и распространение изменений данных. При использовании языков программирования легко выделить статические и динамические потоки, при этом базовая модель автоматически распространит изменения через все потоки данных. Говоря простыми словами, в программирования Rx, испускаемых одним компонентом, и базовая структура, предоставляемая библиотеками Rx, будет распространять эти изменения на другой компонент, зарегистрированный для получения этих изменений. Реактивное программирование Rx состоит из трех ключевых моментов.

Основные функции компонентов:

  1. Наблюдаемые - не что иное, как потоки данных. Наблюдаемый упаковывает данные, которые могут передаваться из одного потока в другой. Они в основном испускают данные периодически или только один раз в своем жизненном цикле на основе конфигураций. Существуют различные операторы, которые могут помочь наблюдателю отправить некоторые конкретные данные на основе определенных событий.
  2. Наблюдатели потребляют поток данных, излучаемый наблюдаемым. Наблюдатели подписываются с помощью метода реактивного программирования subscribeOn () для получения данных, передающих наблюдаемым. Всякий раз, когда наблюдаемый передаст данные, все зарегистрированные наблюдатели получают данные в обратном вызове onNext (). Здесь они могут выполнять различные операции, такие как разбор ответа JSON или обновление пользовательского интерфейса. Если есть ошибка, вызванная наблюдаемым, наблюдатель получит ее в onError ().
  3. Планировщики (расписание) - это компонент в Rx, который сообщает наблюдаемым и наблюдателям, по какому потоку они должны работать. Можно использовать метод observOn (), чтобы сообщать наблюдателям, на каком потоке они должны наблюдать. Кроме того, можно использовать schedOn (), чтобы сообщить наблюдаемому, в каком потоке они должны запускаться.

В реактивном программировании с использованием RxJava предусмотрены основные потоки по умолчанию, такие как Schedulers.newThread () создадут новый фон. Schedulers.io () выполнит код в потоке ввода-вывода.

Основными преимуществами Rx являются увеличение использования вычислительных ресурсов на многоядерном и многопроцессорном оборудовании, повышение производительности за счет сокращения точек и повышение производительности за счет сокращения точек сериализации, согласно Закону Амдаля и Универсального закона о масштабируемости Гюнтера.

Второе преимущество - высокая производительность для разработчиков, поскольку традиционные парадигмы программирования изо всех сил пытались обеспечить простой и поддерживаемый подход к работе с асинхронными и неблокирующими вычислениями и IO. С этими задачами справляется функциональное реактивное программирование, поскольку оно обычно устраняет необходимость в явной координации между активными компонентами.

Там, где возникает Rx, создается процесс создания компонентов и состав рабочих процессов. Чтобы полностью использовать асинхронное выполнение, включение противодавления имеет решающее значение, чтобы избежать чрезмерного использования или, скорее, неограниченного потребления ресурсов. Чтобы обеспечить устойчивое состояние с точки зрения потока данных, обратное давление на основе нагрузки отсылает спрос, протекающий вверх по потоку, и принимает сообщения.

Таким образом, основными преимуществами системы являются:

  1. Повышенная производительность - благодаря возможности быстро и стабильно обрабатывать огромные объемы данных.
  2. Улучшенный UX - из-за того, что приложение больше реагирует на пользователя.
  3. Упрощенные модификации и обновления - благодаря более читабельному и более легкому прогнозированию кода.

Но несмотря на то, что Reactive Programming - очень полезная штука при создании современного программного обеспечения, чтобы рассуждать о системе на более высоком уровне, нужно использовать другой инструмент - Reactive Architecture для процесса проектирования реактивных систем. Кроме того, важно помнить, что существует много парадигм программирования, и Rx - это лишь один из них, как и любой инструмент, он не предназначен для всех случаев использования.

Устойчивость реактивных систем

Устойчивость - это отзывчивость при неудаче и является неотъемлемым функциональным свойством системы. Для нее нужны разработки, а не просто добавление в систему в ретроактивном виде. Устойчивость реактивного программирования javascript выходит за пределы отказоустойчивости и это происходит не из-за деградации, а в случае неудачи она может полностью сама исправиться.

Для этого требуется изоляция компонентов и сдерживание отказов, чтобы избежать сбоев, распространяющихся на соседние компоненты, что может привести к катастрофическим сценариям с каскадными сбоями. Таким образом, ключом к созданию систем Resilient — самовосстановления — является то, что они могут быть охарактеризованными как сообщения, отправленные на другие компоненты, которые действуют как супервизоры и управляются из безопасного контекста вне отказавшего компонента.

Здесь, будучи управляемыми сообщениями, эти средства отходят от сильно связанных, хрупких, глубоко вложенных синхронных цепочек вызовов, которые в большинстве случаев игнорируются. Идея состоит в том, чтобы отделить управление отказами от цепочки вызовов, например, освобождая клиента от ответственности за обработку сбоев сервера.

Поскольку большинство систем по своей природе сложны, одним из наиболее важных аспектов является обеспечение того, чтобы системная архитектура обеспечивала минимальное снижение производительности как при разработке, так и в обслуживании компонентов, и в то же время уменьшала случайную сложность до минимума. Это важно, поскольку в течение жизненного цикла системы, если она не разработана должным образом, будет все труднее и труднее поддерживать работоспособность, и потребуется все больше времени и усилий для понимания, чтобы локализовать и устранить проблемы.

Реактивные системы представляют собой наиболее производительную системную архитектуру, в контексте многоядерных, облачных и мобильных архитектур:

  1. Изоляция отказов предлагает переборки между компонентами, предотвращая отказы от каскадирования и ограничивая объем и степень отказов.
  2. Иерархии супервайзеров предлагают несколько уровней защиты в сочетании с возможностями самовосстановления, что устраняет множество временных отказов от каких-либо эксплуатационных затрат для расследования.
  3. Пропуск передачи сообщений и прозрачность местоположения позволяют отключать и заменять компоненты, не затрагивая работу конечного пользователя. Это снижает стоимость сбоев, их относительную актуальность, а также ресурсы, необходимые для диагностики и исправления.
  4. Репликация снижает риск потери данных и уменьшает влияние сбоя на доступность поиска и хранения информации.
  5. Эластичность позволяет сохранять ресурсы по мере того, как использование колеблется, что минимизирует эксплуатационные расходы при низкой загрузке и риск сбоев или срочных инвестиций в масштабируемость по мере увеличения нагрузки.

Веб-приложения могут в значительной степени выиграть от стиля разработки с Rx, что позволяет составить рабочие процессы запроса-ответа, включающие разветвление на вызовы служб, асинхронное извлечение ресурсов и составление ответов, и последующую сортировку для клиента. Совсем недавно push-а-серверные события и веб-сокеты стали все чаще использоваться на практике, и для выполнения этого в масштабе требуется эффективный способ хранения множества открытых подключений и где IO не блокирует.

Для этого есть инструменты,например, Streams and Futures, которые делают простыми неблокирующие и асинхронные преобразования, и подталкивает их к клиентам. Реактивное программирование c уровнем доступа к данным - обновляет и запрашивает их в эффективном ресурсе, предпочтительно с использованием баз данных SQL или NoSQL с асинхронными драйверами.

Веб-приложения также выигрывают от разработки реактивной системы для таких вещей, как распределенное кэширование, согласованность данных и уведомления с несколькими узлами. Традиционные веб-приложения обычно используют стоящие узлы. Но как только программисты начинают использовать Server-Sent-Events (SSE) и WebSockets — эти узлы становятся работоспособными, поскольку, как минимум, они поддерживают состояние клиентского соединения, а push-уведомления направляются к ним соответствующим образом. Для этого требуется разработка реактивной системы, так как это область, где важна адресация получателей посредством обмена сообщениями.

Суть реактивное программирование Java

Не требуется обязательно использовать Rx в реактивных системах. Потому что Rx программирование и реактивные системы — не одно и тоже. Хотя они часто используются взаимозаменяемые термины, но не являются точными синонимами и отражают разные вещи. Системы представляют собой следующий уровень «реактивности». Этот уровень подразумевает конкретные проектные и архитектурные решения, которые позволяют создавать устойчивые и гибкие приложения.

Тем не менее очень хорошая идея — комбинация методов — приносит еще больше преимуществ приложениям, так как делает их еще более связанными, позволяет более эффективно использовать ресурсы и обеспечивает более низкую задержку. Когда речь идет об огромных объемах данных или многозадачности, часто нужна асинхронная обработка, чтобы системы были быстрыми и отзывчивыми.

В Java - представителе старого объектно-ориентированного программирования, асинхронность может стать действительно сложной и сделать код трудно понятным и поддерживаемым. Таким образом Rx особенно полезно для этой «чисто» объектно-ориентированной среды, поскольку оно упрощает работу с асинхронными потоками.

С его последними выпусками, начиная с Java 8, сама Java сделала некоторые попытки внедрить встроенную реактивность, но эти попытки сегодня не очень популярны у разработчиков. Тем не менее есть некоторые живые и регулярно обновляемые сторонние реализации для реактивного программирования Java, которые помогают сэкономить день и поэтому особенно ценятся разработчиками Java.

В обычном приложении для обычно неоднократно выполняют некоторые операции реактивного программирования с использованием RxJava, поэтому нужно сравнить скорость, использование процессора и памяти с теми же операциями, которые были реализованы как с сопрограммами Kotlin, так и с RxJava. Это начальный тест производительности.

Всякий раз, когда применяется новый инструмент, который будет широко использоваться во всем кодексе, важно понять, повлияет ли оно на общую производительность приложения, прежде чем принимать решение о том, насколько целесообразно использовать его. Практика использования дает короткий ответ: в большинстве случаев пользователям стоит подумать о замене RxJava на сопрограммы Kotlin, особенно в Android.

Реактивное программирование с применением RxJava может по-прежнему использоваться в ограниченном количестве случаев, и в этих случаях можно смешивать, как RxJava, так и сопрограммы.

Простые причины:

  1. Обеспечивают гораздо большую гибкость, чем обыкновенное Rx.
  2. Предоставляет богатый набор операторов в коллекциях, которые будут выглядеть так же, как с операторами RxJava.
  3. Реактивное программирование Kotlin могут взаимодействовать при необходимости с использованием rxjava.
  4. Они очень легкие и эффективные, учитывая более высокий уровень использования ЦП для сбора мусора со всех объектов, созданных RxJava.

Реактивные расширения

Reactive Extensions (ReactiveX или RX) - это библиотека, которая следует за принципами Rx, то есть составление асинхронных и основанных на событиях программ с использованием наблюдаемой последовательности. Эти библиотеки предоставляют множество интерфейсов и методов, которые помогают разработчикам писать чистый и простой код.

Реактивные расширения доступны на нескольких языках. Программисты особенно заинтересованы в RxJava и RxAndroid, так как андроид - это самая сфокусированная область.

Реактивное программирование с использованием RxJava - это реализация Java Reactive Extension из Netflix. В основном это библиотека, которая составляет асинхронные события, следуя шаблону наблюдателя.

Можно создавать асинхронный трафик, преобразовывать и потреблять их наблюдателем в разных потоках данных. Библиотека предлагает широкий спектр удивительных операторов, таких как карта, объединение и фильтр, которые могут быть применены к потоку данных. Когда программист начнет использовать фактические примеры кода, он узнает больше об операторах и преобразованиях.

Многопоточность в приложениях Android

Android" class="if uuid-2938324" src="/misc/i/gallery/73564/2938324.jpg" />

Android реактивное программирование (RxAndroid) специфичен для платформы Android с несколькими добавленными классами поверх RxJava. Более конкретно — планировщики представлены в RxAndroid (AndroidSchedulers.mainThread ()), который играет важную роль в поддержке концепции многопоточности в приложениях для Android.

Помимо всего прочего, специалисты советуют использовать только библиотеку RxJava. Даже благодаря большому количеству планировщиков, используемых в программировании для Android.

Ниже приведен список планировщиков и их краткое содержание:

  1. Schedulers.io () - используется для выполнения неинтенсивных операций, таких как сетевые вызовы, чтение дисков /файлов, операций с базами данных и который поддерживает пул потоков.
  2. AndroidSchedulers.mainThread () - обеспечивает доступ к основной теме Thread / UI. Обычно в этом потоке происходят операции, такие как обновление пользовательского интерфейса, взаимодействие с пользователем. Специалисты советуют пользователям, что они не должны выполнять какие-либо интенсивные операции над этим потоком, так как это может вызвать бросок приложения или диалог ANR.
  3. Schedulers.newThread () - используя это, новый поток будет создан каждый раз, когда запланирована задача. Обычно предлагается не использовать расписание для очень продолжительных работ. Нити, созданные с помощью newThread (), не будут повторно применяться.
  4. Schedulers.computation () - этот график может применяться для выполнения интенсивных операций с процессором, по обработке огромных данных центра реактивного программирования, обработка растровых изображений. Количество потоков, созданных с использованием этого планировщика, полностью зависит от количества доступных ядер ЦП.
  5. Schedulers.single () - этот планировщик выполнит все задачи в следующем порядке, что можно использовать, когда требуется необходимость последовательного выполнения.
  6. Schedulers.immediate () - этот планировщик выполняет задачу немедленно, синхронно блокируя основной поток.
  7. Schedulers.trampoline () - выполняет задачи в режиме First In-First Out. Все запланированные задачи будут выполняться одна за другой, ограничивая количество потоков фона одним.
  8. Schedulers.from () - позволяет создавать планировщик от исполнителя, ограничивая количество создаваемых потоков. Когда пул потоков занят, задачи будут поставлены в очередь.

Теперь, когда есть хорошие теоретические знания об RxJava и RxAndroid, можно перейти к некоторым примерам кода, чтобы лучше понять концепцию. Для начала работы нужно добавить зависимости RxJava и RxAndroid к проектам build.gradle и синхронизировать проект.

Программирование.

Observer подписывают на Observable, чтобы он мог начать получать данные, используя два метода:

  1. SubscribeOn (Schedulers.io ()) - говорит Observable, чтобы запустить задачу в фоновом потоке.
  2. ObservOn (AndroidSchedulers.mainThread ()) - указывает Observer получать данные в потоке пользовательского интерфейса Android.

Вот и все, таким образом программист сможет написать свою первую программу реактивного программирования с RxJava.

Предприятия и поставщики промежуточного программного обеспечения начали использовать Reactive, а в 2016 -2018 годах наблюдался огромный рост корпоративной заинтересованности в принятии этой парадигмы.

Rx предлагает производительность для разработчиков благодаря ресурсоэффективности на уровне компонентов для внутренней логики и преобразования потока данных, в то время, как реактивные системы предлагают производительность для архитекторов и DevOps, благодаря устойчивости и эластичности на уровне системы. Они применяются для создания «Cloud Native» и других широкомасштабных распределенных систем. На практике также широко используют книги о реактивном программировании Java с методами позволяющих комбинировать принципов проектирования реактивных систем.

На сегодняшний день существует целый ряд методологий программирования сложных систем реального времени. Одна из таких методологий носит название ФРП (FRP). Она впитала в себя шаблон проектирования, называемый Наблюдатель (Observer ) с одной стороны, а с другой, как не трудно догадаться, принципы функционального программирования. В этой статье мы рассмотрим функциональное реактивное программирование на примере реализации его в библиотеке Sodium для языка Haskell .

Теоретическая часть

Традиционно программы принято делить на три класса:

  • Пакетные
  • Интерактивные
  • Реактивные

Различия между этими тремя классами программ кроются в типе их взаимодействия с внешним миром.

Выполнение пакетных программ никак не синхронизировано с внешним миром. Они могут быть запущены в некоторый произвольный момент времени и завершены тогда, когда исходные данные будут обработаны и получен результат. В отличии от пакетных интерактивные и реактивные программы обмениваются данными с внешним миром непрерывно в процессе своей работы с момента запуска до момента остановки.

Реактивная программа в отличии от интерактивной жестко синхронизирована с внешней средой: от неё требуется реагировать на события внешнего мира с приемлемой задержкой в темпе возникновения этих событий. В то же время интерактивной программе позволительно заставлять внешнюю среду ждать. Например, текстовый редактор является интерактивной программой: пользователь, запустивший процесс исправления ошибок в тексте, должен дождаться его завершения, чтобы продолжить редактирование. Однако, автопилот является реактивной программой, поскольку при возникновении препятствия, она должна сразу же скорректировать курс, чтобы совершить манёвр облёта, ведь реальный мир нельзя поставить на паузу, как пользователя текстового редактора.

Эти два класса программ появились несколько позднее, тогда, когда компьютеры начали использоваться в управлении машинами, и стали развиваться пользовательские интерфейсы. С тех пор сменилось множество методик реализации, каждая следующая из которых призвана была исправить недостатки предшествующих. Сначала это были простые событийно-управляемые программы: в системе выделяли некоторое множество событий, на которые необходимо было реагировать, и создавали обработчики этих событий. Обработчики в свою очередь также могли генерировать события, которые уходили во внешний мир. Эта модель была проста и решала некоторый круг простых интерактивных задач.

Но со временем интерактивные и реактивные программы становились сложнее и событийно-ориентированное программирование превратилось в ад. Возникла потребность в более продвинутых инструментах синтеза событийно-управляемых систем. В этой методологии было два главных изъяна:

  • Неявное состояние
  • Недетерминированность

Чтобы исправить это, сначала был создан шаблон наблюдатель (observer ), трансформировавший события в меняющиеся во времени значения. Набор этих значений представлял собой явное состояние, в котором находится программа. Так на смену обработке событий пришла привычная для пакетных программ работа с данными. Разработчик мог изменять наблюдаемые значения и подписывать обработчики на изменение этих значений. Появилась возможность реализовывать зависимые значения, которые менялись по заданному алгоритму, при изменении тех значений, от которых они зависели.

Хоть события в этом подходе и ушли, но потребность в них всё же осталась. Далеко не всегда событие подразумевает изменение значения. Например, событие реального времени подразумевает увеличение счётчика времени на секунду, однако событие срабатывания будильника каждый день в определённое время вовсе не подразумевает никакого значения. Конечно, можно связать и это событие с неким значением, но это будет искусственным приёмом. Например, мы можем ввести значение: время_срабатывания_будильника == остаток_от_деления (текущее_время, 24*60*60) . Но это будет не совсем то, что нас интересует, поскольку эта переменная привязана к секунде, и на самом деле значение меняется дважды. Чтобы выяснить, что будильник сработал, подписчик должен определять, что значение стало истинным, а не наоборот. Значение будет истинным ровно одну секунду, а если мы изменим период тиков с секунды, скажем, на 100 миллисекунд, то и значение истина будет уже не секунду, а эти 100 миллисекунд.

Появление методологии функционального реактивного программирования это своеобразный ответ функциональщиков на шаблон наблюдатель . В FRP были переосмыслены выработанные подходы: события (Events) никуда не делись, однако, наблюдаемые значения также появились и были названы характеристиками (Behaviors) . Событие в этой методологии представляет собой некое дискретно генерируемое значение, а характеристика --- непрерывно генерируемое. Оба они могут быть связаны между собой: характеристики могут генерировать события, а события выступать в качестве источников для характеристик.

Проблема недетерминированности состояния гораздо более сложна. Она проистекает из того факта, что событийно-управляемые системы дефакто асинхронные. Это влечёт возникновение промежуточных состояний системы, которые могут быть по некоторым причинам недопустимы. Для решения этой проблемы появилось так называемое синхронное программирование.

Практическая часть

Библиотека Sodium появилась как проект реализации FRP с общим интерфейсом на разных языках программирования. В ней присутствуют все элементы методологии: примитивы (Event, Behavior) и шаблоны их использования.

Примитивы и взаимодействие с внешним миром

Два основных примитива, с которыми нам предстоит работать, это:

  • Event a - событие со значением типа a
  • Behavior a - характеристика (или меняющееся значение) типа a

Мы можем создавать новые события и значения функциями newEvent и newBehavior :

NewEvent:: Reactive (Event a, a -> Reactive ()) newBehavior:: a -> Reactive (Behavior a, a -> Reactive ())

Как видим, обе эти функции могут быть вызваны только в монаде Reactive , в результате возвращается собственно сам примитив, а также функция, которая должна быть вызвана для активации события или изменения значения. Функция создания характеристики принимает в качестве первого аргумента начальное значение.

Чтобы связать реальный мир с реактивной программой, существует функция sync , а чтобы связать программу с внешним миром существует функция listen :

Sync:: Reactive a -> IO a listen:: Event a -> (a -> IO ()) -> Reactive (IO ())

Первая функция, как можно понять из названия, выполняет некоторый реактивный код синхронно, она позволяет попасть в контекст Reactive из контекста IO , а вторая служит для добавления обработчиков событий, возникающих в контексте Reactive , выполняющихся в контексте IO . Функция listen возвращает функцию unlisten , которую нужно вызвать, чтобы отсоединить обработчик.

Таким образом реализуется своеобразный механизм транзакций. Когда мы делаем нечто внутри монады Reactive, код выполняется в пределах одной транзакции в момент вызова функции sync . Состояние детерминировано только вне контекста транзакции.

Это базис реактивного функционального программирования, который достаточен для написания программ. Может немного смутить тот факт, что слушать можно только события. Именно так и должно быть, как мы увидим далее между событиями и характеристиками существуют тесные взаимосвязи.

Операции над основными примитивами

Для удобства в методологию внесены дополнительные функции, преобразующие события и характеристики. Рассмотрим некоторые из них:

Событие, которое никогда не произойдёт -- (может использоваться в качестве заглушки) never:: Event a -- Объединение двух событий одинакового типа в одно -- (удобно для определения одного обработчика для класса событий) merge:: Event a -> Event a -> Event a -- Вытаскивает значения из Maybe событий -- (отделяем зёрна от плевел) filterJust:: Event (Maybe a) -> Event a -- Превращает событие в характеристику с начальным значением -- (меняем значение при возникновении событий) hold:: a -> Event a -> Reactive (Behavior a) -- Превращает характеристику в событие -- (генерируем события при изменении значения) updates:: Behavior a -> Event a -- Превращает характеристику в событие -- (также генерируем событие для начального значения) value:: Behavior a -> Event a -- При возникновении события берёт значение характеристики, -- применяет функцию и генерирует событие snapshot:: (a -> b -> c) -> Event a -> Behavior b -> Event c -- Получает текущее значение характеристики sample:: Behavior a -> Reactive a -- Сводит повторяющиеся события в одно coalesce:: (a -> a -> a) -> Event a -> Event a -- Подавляет все события, кроме первого once:: Event a -> Event a -- Разделяет событие со списком на несколько событий split:: Event [a] -> Event a

Сферические примеры в вакууме

Давайте попробуем что-нибудь написать:

Import FRP.Sodium main = do sync $ do -- создаём событие (e1, triggerE1) <- newEvent -- создаём характеристику с начальным значением 0 (v1, changeV1) <- newBehavior 0 -- определяем обработчик для события listen e1 $ \_ -> putStrLn $ "e1 triggered" -- определяем обработчик для изменения значения характеристики listen (value v1) $ \v -> putStrLn $ "v1 value: " ++ show v -- Генерируем событие без значения triggerE1 () -- Изменяем значение характеристики changeV1 13

Установим пакет Sodium с помощью Cabal и запустим пример в интерпретаторе:

# если хотим работать в отдельной песочнице # создаём её > cabal sandbox init # установим > cabal install sodium > cabal repl GHCi, version 7.6.3: http://www.haskell.org/ghc/ :? for help Loading package ghc-prim ... linking ... done. Loading package integer-gmp ... linking ... done. Loading package base ... linking ... done. # загрузим пример Prelude> :l Example.hs Compiling Main (Example.hs, interpreted) Ok, modules loaded: Main. # выполним пример *Main>

А теперь поэкспериментируем. Закомментируем строчку, где мы изменяем наше значение (changeV1 13) и перезапустим пример:

*Main> :l Example.hs Compiling Main (Example.hs, interpreted) Ok, modules loaded: Main. *Main> main e1 triggered v1 value: 0

Как видим, теперь выводится начальное значение, так происходит потому что функция value генерирует первое событие с начальным значением характеристики. Давайте заменим функцию value на updates и посмотрим, что получилось:

*Main> :l Example.hs Compiling Main (Example.hs, interpreted) Ok, modules loaded: Main. *Main> main e1 triggered

Теперь начальное значение не выводится, но если раскомментировать строчку, в которой мы изменили значение, то по-прежнему будет выведено изменённое значение. Давайте вернём всё, как было, и сгенерируем событие e1 дважды:

*Main> :l Example.hs Compiling Main (Example.hs, interpreted) Ok, modules loaded: Main. *Main> main e1 triggered e1 triggered v1 value: 13

Как видим, событие также сработало дважды. Попробуем этого избежать, для чего в функции listen заменим аргумент e1 на (once e1) , тем самым создав новое событие, срабатывающее единожды:

*Main> :l Example.hs Compiling Main (Example.hs, interpreted) Ok, modules loaded: Main. *Main> main e1 triggered v1 value: 13

Когда у события отсутствует аргумент, нам важен сам факт наличия или отсутствия события, поэтому функция once для объединения событий верный выбор. Однако, когда аргумент присутствует, это не всегда подходит. Перепишем пример следующим образом:

<- newEvent (v1, changeV1) <- newBehavior 0 listen e1 $ \v -> putStrLn $ "e1 triggered with: " ++ show v listen (value v1) $ \v -> putStrLn $ "v1 value: " ++ show v triggerE1 "a" triggerE1 "b" triggerE1 "c" changeV1 13

Получим, как и ожидали, все события со значениями в том же порядке, в котором они были сгенерированы:

*Main> :l Example.hs Compiling Main (Example.hs, interpreted) Ok, modules loaded: Main. *Main> main e1 triggered with: "a" e1 triggered with: "b" e1 triggered with: "c" v1 value: 13

Если используем функцию once с e1 , то получим только первое событие, поэтому попробуем использовать функцию coalesce , для чего заменим аргумент e1 в listen аргументом (coalesce (\_ a -> a) e1) :

*Main> :l Example.hs Compiling Main (Example.hs, interpreted) Ok, modules loaded: Main. *Main> main e1 triggered with: "c" v1 value: 13

И действительно, мы получили только последнее событие.

Ещё примеры

Давайте рассмотрим примеры посложнее:

Import FRP.Sodium main = do sync $ do (e1, triggerE1) <- newEvent -- создаём характеристику, изменяемую событием e1 v1 <- hold 0 e1 listen e1 $ \v -> putStrLn $ "e1 triggered with: " ++ show v listen (value v1) $ \v -> putStrLn $ "v1 value is: " ++ show v -- генерируем события triggerE1 1 triggerE1 2 triggerE1 3

Вот что имеем на выходе:

*Main> :l Example.hs Compiling Main (Example.hs, interpreted) Ok, modules loaded: Main. *Main> main e1 triggered with: 1 e1 triggered with: 2 e1 triggered with: 3 v1 value is: 3

Значение характеристики выводится только один раз, хотя событий было сгенерировано несколько. В этом состоит особенность синхронного программирования: характеристики синхронизированы с вызовом sync . Чтобы это продемонстрировать, слегка переделаем наш пример:

<- sync $ do (e1, triggerE1) <- newEvent v1 <- hold 0 e1 listen e1 $ \v -> putStrLn $ "e1 triggered with: " ++ show v listen (value v1) $ \v -> putStrLn $ "v1 value is: " ++ show v return triggerE1 sync $ triggerE1 1 sync $ triggerE1 2 sync $ triggerE1 3

Мы всего-лишь вынесли триггер события во внешний мир и вызываем его в разных фазах синхронизации:

*Main> :l Example.hs Compiling Main (Example.hs, interpreted) Ok, modules loaded: Main. *Main> main v1 value is: 0 e1 triggered with: 1 v1 value is: 1 e1 triggered with: 2 v1 value is: 2 e1 triggered with: 3 v1 value is: 3

Теперь при каждом событии демонстрируется новое значение.

Другие операции над примитивами

Рассмотрим следующую группу полезных функции:

Объединяет события с использованием функции mergeWith:: (a -> a -> a) -> Event a -> Event a -> Event a -- Фильтрует события, оставляя только те, -- для которых функция возвращает истину filterE:: (a -> Bool) -> Event a -> Event a -- Позволяет "выключать" события -- когда характеристика равна False gate:: Event a -> Behavior Bool -> Event a -- Организует преобразователь событий -- с внутренним состоянием collectE:: (a -> s -> (b, s)) -> s -> Event a -> Reactive (Event b) -- Организует преобразователь характеристик -- с внутренним состоянием collect:: (a -> s -> (b, s)) -> s -> Behavior a -> Reactive (Behavior b) -- Создаёт характеристику как результат накопления событий accum:: a -> Event (a -> a) -> Reactive (Behavior a)

Конечно, это ещё далеко не все функции, предоставляемые библиотекой. Есть и куда более экзотические вещи, которые выходят за рамки данной статьи.

Примеры

Давайте попробуем эти функции в действии. Начнём с последней, организуем что-то вроде калькулятора. Пусть у нас будет некое значение, к которому можно применять арифметические функции и получать результат:

Import FRP.Sodium main = do triggerE1 <- sync $ do (e1, triggerE1) <- newEvent -- пусть начальное значение будет равно 1 v1 <- accum (1:: Int) e1 listen (value v1) $ \v -> putStrLn $ "v1 value is: " ++ show v return triggerE1 -- прибавляем 1 sync $ triggerE1 (+ 1) -- умножаем на 2 sync $ triggerE1 (* 2) -- вычитаем 3 sync $ triggerE1 (+ (-3)) -- прибавляем 5 sync $ triggerE1 (+ 5) -- возводим в степень 3 sync $ triggerE1 (^ 3)

Запустим:

*Main> :l Example.hs Compiling Main (Example.hs, interpreted) Ok, modules loaded: Main. *Main> main v1 value is: 1 v1 value is: 2 v1 value is: 4 v1 value is: 1 v1 value is: 6 v1 value is: 216

Может показаться, что набор возможностей довольно скуден, но на самом деле это не так. Ведь мы имеем дело с Хаскелем, аппликативные функторы и монады никуда не делись. Мы можем выполнять над характеристиками и событиями любые операции, которые мы привыкли выполнять над чистыми значениями. В результате чего получаются новые характеристики и события. Для характеристик реализован класс функтор и аппликативный функтор, для событий же по очевидным причинам только функтор.

Например:

<$>), (<*>)) import FRP.Sodium main = do (setA, setB) <- sync $ do (a, setA) <- newBehavior 0 (b, setB) <- newBehavior 0 -- Новая характеристика a + b let a_add_b = (+) <$> a <*> b -- Новая характеристика a * b let a_mul_b = (*) <$> a <*> b listen (value a) $ \v -> putStrLn $ "a = " ++ show v listen (value b) $ \v -> putStrLn $ "b = " ++ show v listen (value a_add_b) $ \v -> putStrLn $ "a + b = " ++ show v listen (value a_mul_b) $ \v -> putStrLn $ "a * b = " ++ show v return (setA, setB) sync $ do setA 2 setB 3 sync $ setA 3 sync $ setB 7

Вот что будет выведено в интерпретаторе:

λ> main a = 0 b = 0 a + b = 0 a * b = 0 a = 2 b = 3 a + b = 5 a * b = 6 a = 3 a + b = 6 a * b = 9 b = 7 a + b = 10 a * b = 21

А теперь посмотрим как нечто подобное работает с событиями:

Import Control.Applicative ((<$>)) import FRP.Sodium main = do sigA <- sync $ do (a, sigA) <- newEvent let a_mul_2 = (* 2) <$> a let a_pow_2 = (^ 2) <$> a listen a $ \v -> putStrLn $ "a = " ++ show v listen a_mul_2 $ \v -> putStrLn $ "a * 2 = " ++ show v listen a_pow_2 $ \v -> putStrLn $ "a ^ 2 = " ++ show v return sigA sync $ do sigA 2 sync $ sigA 3 sync $ sigA 7

Вот что будет выведено:

λ> main a = 2 a * 2 = 4 a ^ 2 = 4 a = 3 a * 2 = 6 a ^ 2 = 9 a = 7 a * 2 = 14 a ^ 2 = 49

В документации имеется перечень экземпляров классов, которые реализованы для Behavior и Event , но ничто не мешает реализовать экземпляры недостающих классов.

Обратная сторона реактивности

Функциональное реактивное программирование несомненно упрощает разработку сложных систем реального времени, однако существует множество аспектов, которые необходимо учитывать при использовании этого подхода. Поэтому рассмотрим здесь проблемы, которые наиболее часто имеют место быть.

Неодновременность

Синхронное программирование подразумевает некий механизм транзакций, обеспечивающий согласованность последовательно сменяющих друг друга состояний системы, и, следовательно, отсутствие промежуточных неожиданных состояний. В Sodium за транзакции отвечают вызовы sync . Хоть состояние внутри транзакции не определено, однако нельзя считать, что всё внутри неё происходит одновременно. Значения меняются в определённом порядке, который влияет на результат. Так, например, совместное использование событий и характеристик может вызывать неожиданные эффекты. Рассмотрим пример:

Import Control.Applicative ((<$>)) import FRP.Sodium main = do setVal <- sync $ do (val, setVal) <- newBehavior 0 -- создаём булеву характеристику val > 2 let gt2 = (> 2) <$> val -- создаём событие со значениями, которые > 2 let evt = gate (value val) gt2 listen (value val) $ \v -> putStrLn $ "val = " ++ show v listen (value gt2) $ \v -> putStrLn $ "val > 2 ? " ++ show v listen evt $ \v -> putStrLn $ "val > 2: " ++ show v return setVal sync $ setVal 1 sync $ setVal 2 sync $ setVal 3 sync $ setVal 4 sync $ setVal 0

Можно ожидать вывод наподобие этого:

Val = 0 val > 2 ? False val = 1 val > 2 ? False val = 2 val > 2 ? False val = 3 val > 2 ? True val > 2: 3 val = 4 val > 2 ? True val > 2: 4 val = 0 val > 2 ? False

Однако на самом деле строка val > 2: 3 будет отсутствовать, а в конце появится строка val > 2: 0 . Так происходит потому, что событие изменения значения (value val) генерируется до того, как будет вычислена зависимая характеристика gt2 , и поэтому событие evt не возникает для установленного значения 3. В конце же, когда мы снова установили 0, вычисление характеристики gt2 запаздывает.

В общем, эффекты те же, что и в аналоговой и цифровой электронике: гонки сигналов, для борьбы с которыми используют разные приёмы. В частности, синхронизацию. Так мы и поступим, чтобы заставить этот код работать должным образом:

Import Control.Applicative ((<$>)) import FRP.Sodium main = do (sigClk, setVal) <- sync $ do -- Мы ввели новое событие clk -- сигнал синхронизации -- прям как в цифровой электронике (clk, sigClk) <- newEvent (val, setVal) <- newBehavior 0 -- Также вы создали альтернативную функцию -- получения значения по сигналу синхронизации -- и заменили все вызовы value на value" let value" = snapshot (\_ v -> v) clk let gt2 = (> 2) <$> val let evt = gate (value" val) gt2 listen (value" val) $ \v -> putStrLn $ "val = " ++ show v listen (value" gt2) $ \v -> putStrLn $ "val > 2 ? " ++ show v listen evt $ \v -> putStrLn $ "val > 2: " ++ show v return (sigClk, setVal) -- Ввели новую функцию sync" -- которая вызывает сигнал синхронизации -- в конце каждой транзакции -- И заменили ей все вызовы sync let sync" a = sync $ a >> sigClk () sync" $ setVal 1 sync" $ setVal 2 sync" $ setVal 3 sync" $ setVal 4 sync" $ setVal 0

Теперь наш вывод стал таким как и ожидалось:

λ> main val = 0 val > 2 ? False val = 1 val > 2 ? False val = 2 val > 2 ? False val = 3 val > 2 ? True val > 2: 3 val = 4 val > 2 ? True val > 2: 4 val = 0 val > 2 ? False

Ленивость

Проблемы другого рода связаны с ленивой природой вычислений в Haskell . Это приводит к тому, что при испытании кода в интерпретаторе, некоторый вывод в конце может попросту отсутствовать. Что можно предложить в этом случае, так выполнить бесполезный шаг синхронизации в конце, например sync $ return () .

Заключение

На этом пока, думаю, достаточно. В настоящий момент один из авторов библиотеки Sodium пишет книгу об ФРП . Будем надеяться, это как-то восполнит пробелы в данной области программирования и послужит популяризации прогрессивных подходов в наших закостенелых умах.

Поехали.

Реактивное программирование сперва звучит, как название зарождающейся парадигмы, но на самом деле, относится к методу программирования, в котором для работы с асинхронными потоками данных используется событийно-ориентированный подход. Основываясь на постоянно текущих данных, реактивные системы реагируют на них путем выполнения ряда событий.
Реактивное программирование следует шаблону проектирования “Наблюдатель”, который можно определить следующим образом: если в одном объекте происходит изменение состояния, то все прочие объекты оповещаются и обновляются соответствующим образом. Поэтому, вместо того, чтобы опрашивать события на предмет изменений, события пушатся асинхронно, чтобы наблюдатели могли их обработать. В этом примере, наблюдатели - функции, которые исполняются, когда событие отправлено. А упомянутый поток данных - фактический наблюдаемый.

Почти все языки и фреймворки используют этот подход в своей экосистеме, и последние версии Java - не исключение. В этой статье я объясню как можно применить реактивное программирование, используя последнюю версию JAX-RS в Java EE 8 и функционал Java 8.

Реактивный Манифест

В Реактивном Манифесте перечислены четыре фундаментальных аспекта, необходимых приложению, чтобы быть более гибким, слабо связанным и простым для масштабирования, а следовательно и способным быть реактивным. В нем говорится, что приложение должно быть отзывчивым, гибким (а значит и масштабируемым), устойчивым и message-driven.

Основополагающая цель - действительно отзывчивое приложение. Предположим, есть приложение, в котором обработкой запросов пользователей занимается один большой поток, и после выполнения работы этот поток отправляет ответы обратно оригинальным запрашивателям. Когда приложение получает больше запросов, чем может обработать, этот поток становится bottleneck’ом, и приложение теряет свою былую отзывчивость. Чтобы сохранить отзывчивость, приложение должно быть масштабируемым и устойчивым. Устойчивым можно считать приложение, в котором есть функционал для авто-восстановления. По опыту большинства разработчиков, только message-driven архитектура позволяет приложению быть масштабируемым, устойчивым и отзывчивым.

Реактивное программирование стало внедряться в версии Java 8 и Java EE 8. Язык Java представил такие понятия, как CompletionStage , и его реализацию CompletableFuture , а Java начал использовать эти функции в таких спецификациях, как Reactive Client API в JAX-RS.

JAX-RS 2.1 Reactive Client API

Посмотрим, как реактивное программирование может использоваться в приложениях Java EE 8. Чтобы разобраться в процессе, нужны базовые знания API Java EE.

JAX-RS 2.1 представил новый способ создания REST клиента с поддержкой реактивного программирования. Дефолтная реализация invoker, предлагаемая в JAX-RS - синхронная, это значит, что создаваемый клиент отправит блокирующий вызов точке назначения (endpoint) сервера. Пример реализации представлен в Listing 1.

Response response = ClientBuilder.newClient() .target("http://localhost:8080/service-url") .request() .get();
Начиная с версии 2.0, JAX-RS предоставляет поддержку создания асинхронного invoker на клиентском API с помощью простого вызова метода async() , как показано в Listing 2.

Future response = ClientBuilder.newClient() .target("http://localhost:8080/service-url") .request() .async() .get();
Использование асинхронного invoker на клиенте возвращает инстанс Future с типом javax.ws.rs.core.Response . Это может привести к опросу ответа, с вызовом future.get() , или регистрации колбека, который будет вызываться при наличии доступного HTTP ответа. Обе реализации подходят для асинхронного программирования, но все обычно усложняется, если вы хотите сгруппировать обратные вызовы или добавить условные кейсы в эти асинхронные минимумы выполнения.

JAX-RS 2.1 предоставляет реактивный способ преодоления этих проблем с новым JAX-RS Reactive Client API для сборки клиента. Это так же просто, как вызов rx() метода во время сборки клиента. В Listing 3 rx() метод возвращает реактивный invoker, который существует во время выполнения клиента, и клиент возвращает ответ с типом CompletionStage.rx() , который позволяет переход от синхронного invoker к асинхронному с помощью простого вызова.

CompletionStage response = ClientBuilder.newClient() .target("http://localhost:8080/service-url") .request() .rx() .get();
CompletionStage<Т> - новый интерфейс, введенный в Java 8. Он представляет вычисление, которое может являться этапом в рамках большего вычисления, как и следует из названия. Это единственный представитель реактивности Java 8, который попал в JAX-RS.
После получения инстанса ответа, я могу вызывать AcceptAsync() , где я могу предоставить фрагмент кода, который будет выполняться асинхронно, когда ответ станет доступным, как это показано в Listing 4.

Response.thenAcceptAsync(res -> { Temperature t = res.readEntity(Temperature.class); //do stuff with t });
Добавление реактивности в точку endpoint REST

Реактивный подход не ограничивается клиентской стороной в JAX-RS; его можно использовать и на стороне сервера. Для примера, сперва я создам простой сценарий, где смогу запросить список местоположений одной точки назначения. Для каждого положения я сделаю отдельный вызов с данными местоположения до другой точки, чтобы получить значения температуры. Взаимодействие точек назначения будет таким, как показано на Figure 1.

Figure 1. Взаимодействие между точками назначения

Сначала я просто определяю модель области определения, а затем сервисы для каждой модели. В Listing 5 показано, как определяется класс Forecast , который оборачивает классы Location и Temperature .

Public class Temperature { private Double temperature; private String scale; // getters & setters } public class Location { String name; public Location() {} public Location(String name) { this.name = name; } // getters & setters } public class Forecast { private Location location; private Temperature temperature; public Forecast(Location location) { this.location = location; } public Forecast setTemperature(final Temperature temperature) { this.temperature = temperature; return this; } // getters }
Для обертки списка прогнозов, класс ServiceResponse имплементирован в Listing 6.

Public class ServiceResponse { private long processingTime; private List forecasts = new ArrayList<>(); public void setProcessingTime(long processingTime) { this.processingTime = processingTime; } public ServiceResponse forecasts(List forecasts) { this.forecasts = forecasts; return this; } // getters }
LocationResource , показанный в Listing 7, определяет три образца местоположений, возвращаемых с путем /location .

@Path("/location") public class LocationResource { @GET @Produces(MediaType.APPLICATION_JSON) public Response getLocations() { List locations = new ArrayList<>(); locations.add(new Location("London")); locations.add(new Location("Istanbul")); locations.add(new Location("Prague")); return Response.ok(new GenericEntity>(locations){}).build(); } }
TemperatureResource , показанный в Listing 8, возвращает случайно сгенерированное значение температуры между 30 и 50 для заданной локации. Задержка в 500 мс добавлена в имплементацию для симуляции считывания датчика.

@Path("/temperature") public class TemperatureResource { @GET @Path("/{city}") @Produces(MediaType.APPLICATION_JSON) public Response getAverageTemperature(@PathParam("city") String cityName) { Temperature temperature = new Temperature(); temperature.setTemperature((double) (new Random().nextInt(20) + 30)); temperature.setScale("Celsius"); try { Thread.sleep(500); } catch (InterruptedException ignored) { ignored.printStackTrace(); } return Response.ok(temperature).build(); } }
Сначала я покажу реализацию синхронного ForecastResource (смотрите Listing 9), который выдает все местоположения. Затем, для каждого положения он вызывает температурный сервис, чтобы получить значения в градусах по Цельсию.

@Path("/forecast") public class ForecastResource { @Uri("location") private WebTarget locationTarget; @Uri("temperature/{city}") private WebTarget temperatureTarget; @GET @Produces(MediaType.APPLICATION_JSON) public Response getLocationsWithTemperature() { long startTime = System.currentTimeMillis(); ServiceResponse response = new ServiceResponse(); List locations = locationTarget .request() .get(new GenericType>(){}); locations.forEach(location -> { Temperature temperature = temperatureTarget .resolveTemplate("city", location.getName()) .request() .get(Temperature.class); response.getForecasts().add(new Forecast(location).setTemperature(temperature)); }); long endTime = System.currentTimeMillis(); response.setProcessingTime(endTime - startTime); return Response.ok(response).build(); } }
Когда точка назначения прогноза запрашивается как /forecast , вы получите вывод, похожий на тот, что указан в Listing 10. Обратите внимание, что время обработки запроса заняло 1.533 мс, что логично, так как синхронный запрос значений температуры из трех различных местоположений добавляет до 1.5 мс.

{ "forecasts": [ { "location": { "name": "London" }, "temperature": { "scale": "Celsius", "temperature": 33 } }, { "location": { "name": "Istanbul" }, "temperature": { "scale": "Celsius", "temperature": 38 } }, { "location": { "name": "Prague" }, "temperature": { "scale": "Celsius", "temperature": 46 } } ], "processingTime": 1533 }
Пока все идет по плану. Настало время ввести реактивное программирование на серверной стороне, где вызовы к каждой локации могут выполняться параллельно после получения всех местоположений. Это явно может улучшить синхронный поток, показанный ранее. Это выполняется в Listing 11, где показано определение реактивной версии сервиса прогнозов.

@Path("/reactiveForecast") public class ForecastReactiveResource { @Uri("location") private WebTarget locationTarget; @Uri("temperature/{city}") private WebTarget temperatureTarget; @GET @Produces(MediaType.APPLICATION_JSON) public void getLocationsWithTemperature(@Suspended final AsyncResponse async) { long startTime = System.currentTimeMillis(); // Создать этап (stage) для извлечения местоположений CompletionStage> locationCS = locationTarget.request() .rx() .get(new GenericType>() {}); // Создав отдельный этап на этапе местоположений, // описанном выше, собрать список прогнозов, //как в одном большом CompletionStage final CompletionStage> forecastCS = locationCS.thenCompose(locations -> { // Создать этап для получения прогнозов // как списка СompletionStage List> forecastList = // Стрим местоположений и обработка каждого // из них по отдельности locations.stream().map(location -> { // Создать этап для получения // значений температуры только одного города // по его названию final CompletionStage tempCS = temperatureTarget .resolveTemplate("city", location.getName()) .request() .rx() .get(Temperature.class); // Затем создать CompletableFuture, в котором // содержится инстанс прогноза // с местоположением и температурным значением return CompletableFuture.completedFuture(new Forecast(location)) .thenCombine(tempCS, Forecast::setTemperature); }).collect(Collectors.toList()); // Вернуть финальный инстанс CompletableFuture, // где все представленные объекты completable future // завершены return CompletableFuture.allOf(forecastList.toArray(new CompletableFuture)) .thenApply(v -> forecastList.stream() .map(CompletionStage::toCompletableFuture) .map(CompletableFuture::join) .collect(Collectors.toList())); }); // Создать инстанс ServiceResponse, // в котором содержится полный список прогнозов // вместе со временем обработки. // Создать его future и объединить с // forecastCS, чтобы получить прогнозы // и вставить в ответ сервиса CompletableFuture.completedFuture(new ServiceResponse()) .thenCombine(forecastCS, ServiceResponse::forecasts) .whenCompleteAsync((response, throwable) -> { response.setProcessingTime(System.currentTimeMillis() - startTime); async.resume(response); }); } }
Реактивная реализация может показаться сложной, на первый взгляд, но после более внимательного изучения вы заметите, что она довольно проста. В реализации ForecastReactiveResource я сначала создаю клиентский вызов на сервисы местоположений с помощью JAX-RS Reactive Client API. Как я упоминал выше, это дополнение для Java EE 8, и оно помогает создавать реактивный вызов просто с помощью метода rx() .

Теперь я создаю новый этап на основе местоположения, чтобы собрать список прогнозов. Они будут храниться, как список прогнозов, в одном большом completion stage, названном forecastCS . В конечном итоге, я создам ответ вызова сервиса, используя только forecastCS .

А теперь, соберем прогнозы в виде списка completion stage’eй, определенных в переменной forecastList . Чтобы создать completion stage для каждого прогноза, я передаю данные по местоположениям, а затем создаю переменную tempCS , снова используя JAX-RS Reactive Client API, который вызывает сервис температуры с названием города. Здесь для сборки клиента я использую метод resolveTemplate() , и это позволяет мне передавать название города сборщику в качестве параметра.

В качестве последнего шага потоковой передачи, я совершаю вызов CompletableFuture.completedFuture() , передавая новый инстанс Forecast в качестве параметра. Я объединяю этот future с tempCS этапом, чтобы у меня было значение температуры для проитерированных локаций.

Метод CompletableFuture.allOf() в Listing 11 преобразует список completion stage’ей в forecastCS . Выполнение этого шага возвращает большой инстанс completable future, когда все предоставленные объекты completable future завершены.

Ответ сервиса - инстанс класса ServiceResponse , поэтому я создаю completed future, а затем объединяю forecastCS completion stage со списком прогнозов и вычисляю время отклика сервиса.

Конечно, реактивное программирование заставляет только серверную сторону выполняться асинхронно; клиентская сторона будет заблокирована до тех пор, пока сервер не отправит ответ обратно запрашивающему. Чтобы преодолеть эту проблему, Server Sent Events (SSEs) может быть использован для частичной отправки ответа, как только он окажется доступен, чтобы температурные значения для каждой локации передавались клиенту одно за другим. Вывод ForecastReactiveResource будет похож на тот, что представлен в Listing 12. Как показано в выводе, время обработки составляет 515 мс, что является идеальным временем выполнения для получения температурных значений из одной локации.

{ "forecasts": [ { "location": { "name": "London" }, "temperature": { "scale": "Celsius", "temperature": 49 } }, { "location": { "name": "Istanbul" }, "temperature": { "scale": "Celsius", "temperature": 32 } }, { "location": { "name": "Prague" }, "temperature": { "scale": "Celsius", "temperature": 45 } } ], "processingTime": 515 }
Вывод

В примерах этой статьи, я сначала показал синхронный способ получения прогнозов с помощью сервисов местоположения и температуры. Затем, я перешел к реактивному подходу для того, чтобы асинхронная обработка выполнялась между вызовами сервиса. Когда вы используете JAX-RS Reactive Client API в Java EE 8 вместе с классами CompletionStage и CompletableFuture , доступными в Java 8, сила асинхронной обработки вырывается на волю, благодаря реактивному программированию.

Реактивное программирование - это больше, чем просто реализация асинхронной модели из синхронной; оно также упрощает работу с такими концепциями, как nesting stage. Чем больше оно используется, тем проще будет управлять сложными сценариями в параллельном программировании.

Спасибо за внимание. Как всегда ждём ваши комментарии и вопросы.

Вы можете помочь и перевести немного средств на развитие сайта

Мир ООП-разработки вообще и язык Java в частности живут очень активной жизнью. Тут есть свои модные тенденции, и сегодня разберем один из главных трендов сезона - фреймворк ReactiveX. Если ты еще в стороне от этой волны - обещаю, она тебе понравится! Это точно лучше, чем джинсы с завышенной талией:).

Реактивное программирование

Как только ООП-языки доросли до массового применения, разработчики осознали, насколько иногда не хватает возможностей С-подобных языков. Поскольку написание кода в стиле функционального программирования серьезно разрушает качество ООП-кода, а значит, и поддерживаемость проекта, был придуман гибрид - реактивное программирование .

Парадигма реактивной разработки строится на идее постоянного отслеживания изменений состояния объекта. Если такие изменения произошли, то все заинтересованные объекты должны получить уже обновленные данные и работать только с ними, забыв про старые.

Хорошим примером идеи реактивного программирования может служить Excel-таблица. Если связать несколько ячеек одной формулой, результат вычисления будет меняться каждый раз, когда изменятся данные в этих ячейках. Для бухгалтерии такое динамическое изменение данных - привычное дело, но для программистов это скорее исключение.

A=3; b=4; c=a + b; F1(c); a=1; F2(c);

В этом примере функции F1 и F2 будут работать с разными значениями переменной C. Часто требуется, чтобы у обеих функций были только самые актуальные данные, - реактивное программирование позволит без изменения логики самих функций сразу же вызвать F1 с новыми параметрами. Такое построение кода дает приложению возможность моментально реагировать на любые изменения, что сделает его быстрым, гибким и отзывчивым.

ReactiveX

Воплощать с нуля идеи реактивного программирования может быть довольно хлопотно - есть подводные камни, да и времени это займет прилично. Поэтому для многих разработчиков эта парадигма оставалась только теоретическим материалом, пока не появился ReactiveX.

Фреймворк ReactiveX - это инструмент для реактивного программирования, работающий со всеми популярными ООП-языками. Сами создатели называют его мультиплатформенным API для асинхронной разработки, основанным на паттерне «Наблюдатель» (Observer).

Если термин «реактивное программирование» - это своего рода теоретическая модель, то паттерн «Наблюдатель» - готовый механизм отслеживания изменений в программе. А отслеживать их приходится довольно часто: загрузку и обновление данных, оповещения о событиях и так далее.

Паттерн «Наблюдатель» существует примерно столько же, сколько и само ООП. Объект, состояние которого может поменяться, называется издателем (популярный перевод термина Observable). Все остальные участники, которым интересны эти изменения, - подписчики (Observer, Subscriber). Для получения уведомлений подписчики регистрируются у издателя, явно указывая свой идентификатор. Издатель время от времени генерирует уведомления, которые им же рассылаются по списку зарегистрированных подписчиков.

Собственно, создатели ReactiveX не придумали ничего революционного, они просто удобно реализовали паттерн. И хотя во многих ООП-языках, и в Java в частности, есть готовые реализации паттерна, в этом фреймворке присутствует дополнительный «тюнинг», который превращает «Наблюдатель» в очень мощный инструмент.

RxAndroid

Порт библиотеки ReactiveX для мира Android называется rxAndroid и подключается, как всегда, через Gradle.

Compile "io.reactivex:rxandroid:1.1.0"

Издатель, генерирующий уведомления, здесь задается с помощью класса Observable. У издателя может быть несколько подписчиков, для их реализации воспользуемся классом Subscriber. Стандартное поведение для Observable - выпустить одно или несколько сообщений для подписчиков, а затем завершить свою работу или выдать сообщение об ошибке. В качестве сообщений могут быть как переменные, так и целые объекты.

Rx.Observable myObserv = rx.Observable.create(new rx.Observable.OnSubscribe() { @Override public void call(Subscriber subscriber) { subscriber.onNext("Hello"); subscriber.onNext("world"); subscriber.onCompleted(); } });

В данном случае издатель myObserv сначала отправит строки hello и message, а затем сообщение об успешном завершении работы. Издатель может вызвать методы onNext() , onCompleted() и onEror() , поэтому у подписчиков они должны быть определены.

Subscriber mySub = new Subscriber() {... @Override public void onNext(String value) {Log.e("got data", " " + value);} };

Все готово для работы. Осталось связать объекты между собой - и «Hello, world!» в реактивном программировании готов!

MyObserv.subscribe(mySub);

Надо сказать, что это был очень простой пример. В ReactiveX есть множество вариантов поведения всех участников паттерна: фильтрация, группирование, обработка ошибок. Пользу от реактивного программирования можно ощутить, только попробовав его в деле. Приступим к задаче посерьезнее.

Продолжение доступно только участникам

Вариант 1. Присоединись к сообществу «сайт», чтобы читать все материалы на сайте

Членство в сообществе в течение указанного срока откроет тебе доступ ко ВСЕМ материалам «Хакера», увеличит личную накопительную скидку и позволит накапливать профессиональный рейтинг Xakep Score!