понедельник, 5 апреля 2010 г.

Reactive Framework (Rx)

Меня всегда восхищают решения с одной стороны простые и элегантные, а с другой - на удивление эффективные и удобные. Примером таких решений может служить LINQ – это удивительная штука. Внеся всего несколько изменений в синтаксис языка – анонимные типы, расширяющие методы, вывод типов (var), инициализаторы и лямбды (причем все это, кроме разве что анонимных типов – чистой воды синтаксический сахар), мы получили удивительно удобную конструкцию для работы с коллекциями, причем даже с уже давно написанными, и сейчас уже сложно представить приложение без элементов LINQ-а, настолько он получился удачным.


И вот, один из основных авторов линка Эрик Мейер (Erik Mejier) снова радует нас очередным фантастическим решением, на этот раз у него дошли руки до событий и прочей асинхронности. Его подход, как мы убедимся, элегантен, прост, требует минимальных изменений в базовой библиотеке и вместе с тем уже сейчас видно, что он окажется на удивление удачным.

Суть предложенного им решения лежит на поверхности – представить события в виде последовательностей данных, поступающих из какого-либо источника. Например, по выражению самого Эрика «мышка – это огромная база данных с миллионами записей координат положения мыши и всяких кликов». А раз это коллекция данных, значит можно применить LINQ запросы по этим данным. =) Собственно, альтернативное название этого подхода LINQ 2 Events, но на мой взгляд оно не очень удачное так как сама технология много шире.

Сам LINQ2Object, работает поверх пары интерфейсов IEnumerable/IEnumerator, которые отлично справляется со своей задачей, но вот здесь-то на пути представления событий в виде набора данных, есть одна досадная помеха – идеологически эти интерфейсы являются интерактивными (interactive). Это означает, что потребитель сам управляет получением новой порции объектов – до тех пор, пока потребитель не вызвал метод MoveNext(), источник данных никак не может впихнуть в потребителя новую порцию информации. В то время как для работы с событиями нужно обратное, чтобы потребитель реагировал на появление новых данных в источнике – следовательно, для решения данной проблемы нужен некий реактивный (Reactive) аналог такого же механизма. То есть, такая конструкция потребителя данных, которая не управляла бы источником, а наоборот – реагировала на его события, или другими словами, сама управлялась им. А так как во всем остальном пара интерфейсов IEnumerable/IEnumerator вполне хороша, то, оцените изящество решения: достаточно эту пару интерфейсов вывернуть наизнанку, чтобы получить требуемый эффект. :) Так и родилась пара других интерфейсов - IObserver/IObservable.

Если интересно как это происходило в подробностях, то очень рекомендую посмотреть одну из видеозаписей Эрика или его коллег про Rx, это действительно интересно. =)

http://channel9.msdn.com/shows/Going+Deep/E2E-Erik-Meijer-and-Wes-Dyer-Reactive-Framework-Rx-Under-the-Hood-1-of-2/

http://channel9.msdn.com/shows/Going+Deep/Expert-to-Expert-Brian-Beckman-and-Erik-Meijer-Inside-the-NET-Reactive-Framework-Rx/

http://channel9.msdn.com/posts/Charles/Erik-Meijer-Rx-in-15-Minutes/

Здесь же я просто в сжатом виде опишу что куда переехало.

Так как изменилось только направление вызовов, то действовать можно чисто механически. Интерфейс IEnumerable имеет единственный метод GetEnumerator(), который возвращает IEnumerator. В случае же IObservable, будет тоже один метод Subscribe, который однако, так как связь развернута в другую сторону, возвращать не будет ничего (точнее возвращать будет IDisposable), а вот в качестве параметра будет принимать экземпляр IObserver. Тут все просто и прозрачно...

Далее, интерфейс IEnumerator оборудован методом MoveNext(), который возвращает false, если добрались до конца списка или выкидывает исключение, если случилась какая непредвиденная фигня. Значит интерфейс IObserver должен быть оборудован методами OnCompleted() и OnError(Exception e), чтобы определить, что делать, если данные в источнике закончились или случилась какая неожиданность. И, наконец, в IEnumerator-е есть свойство T Current, возвращающее текущий элемент, значит IObserver должен быть метод void OnNext(T), посредством которого источник будет поставлять очередной элемент.

Таким образом, на глазах изумленной публики, вывернув наизнанку интерфейсы IEnumerable/IEnumerator, мы получили IObservable/IObserver, что является ни чем иным, как реализацией паттерна Observer (обозреватель, он же публикатор/подписчик). Да, если кто не в курсе, то IEnumerable/IEnumerator – это паттерн Iterator. Все вышеизложенное подводит нас к мысли о том, что паттерны Iterator и Observer – близнецы-братья, один является зеркальным отражением другого, этаким альтер эго. Причем вся красота этого решения в том, что один паттерн из другого получается чисто механически, вызовы просто разворачиваются в другую сторону и никаких специальных случаев обрабатывать не надо! Эрик еще ехидничал, что не понимает, как эти ребята из GoF, описавшие еще пятнадцать лет назад оба этих паттерна, могли не заметить такого вопиющего дуализма. =))

Ну чтож, это все действительно красиво и элегантно, очевидно, что навернуть поверх этих двух интерфейсов набор линковских методов расширений – дело техники. Но какая с этого польза?

Однако практический эффект от этого решения сложно переоценить. Глобально, не считая множества приятных мелочей, есть два положительных момента.

  1. Мы получили способ комбинирования событий. В реальных приложениях довольно частно нужно иметь обработчик на комбинацию событий, причем не просто на комбинацию, а на комбинацию со сложными условиями, например, когда одно событие возникает только после другого и если этому не предшествовало третье. С помощю Rx, такая задача решается одним запросом.
  2. Но самое крутое, что мы получили единый (!) механизм работы не только с событиями, но и со всеми асинхронными операциями. То есть, все события, асинхронные паттерны, TPL и любая другая параллельно-асинхронная конструкция – отлично покрываются этой моделью и все это тоже можно комбинировать между собой линковскими запросами посредством Rx.

Причем, всего этого удалось добиться лишь внеся интерфейсы IObservable/IObserver в базовую библиотеку (.Net 4 уже будет поставляться с этими интерфейсами), ну и, естественно, предоставив рядышком еще одну библиотеку (собственно Reactive Framework) с реализацией LINQ-а и некоторыми другими хелперными методами, о которых мы поговорим ниже. Как видно, здесь даже не пришлось трогать язык, просто нет необходимости... :) Более того, когда только Эрик впервые обнародовал саму идею, тут же нашлись энтузиасты, которые реализовали свою версию этих интерфейсов и необходимых методов в своих проектах, и убедились, что все отлично работает.

Что же входит непосредственно в Reactive Framework? Кроме LINQ-овских расширений для построения запросов, там присутствует большое количество хелперных методов, помогающих построить реализацию описанных выше интерфейсов. Причем не только с нуля, но и на основе уже имеющихся событий, асинхронных вызовов BeginXxx/EndXxx (APM-паттерн), временных интервалов, обычных коллекций, генераторов и байт знает чего еще... Таким образом, как только выйдет Rx, его можно будет сразу же использовать для работы с уже имеющимся сейчас кодом.

Кроме того, в Rx входит несколько дополнительных методов для работы с коллекциями и событиями (они разнесены по разным сборкам Interactive и Reactive, соответственно), которых нет в обычном LINQ-е, но к которым давно привыкли в функциональных языках.

Самое время для примера?

Допустим, у нас есть задача - обратиться по нескольким веб-адресам (несколько раз, для надежности и естественно асинхронно, чтобы не терять время попусту) и определить, отвечают ли они. И если ни разу добиться ответа не удалось – вывести этот провинившийся url.

Тогда решение может быть таким - пусть у нас есть коллекция подозрительных url-ов.


string[] urls = { "http://dfsdfsdf.rt", "http://rsdn.ru", "http://google.com", "http://rsdn.ru/qweqwe", "http://rbc.ru" };

Для каждого url-а нам нужно создать объект выполняющий запрос


urls.Select(url => new { Url = url, WebRequest = WebRequest.Create(url) })

И подписаться на события получения такого объекта, предварительно сконвертировав это дело в Observable коллекцию


.ToObservable().Subscribe(

В подписчике же начинается самое интересное... Нам нужно три раза с интервалом в 5 секунд, асинхронно обратиться по переданному адресу. Здесь на помощь приходит метод Zip, который комбинирует две коллекции таким образом, что их элементы чередуются. Начнем с коллекции временных интервалов, здесь события случаются строго через 5 секунд.


Observable.Interval(TimeSpan.FromSeconds(5)).Zip(

Теперь к этой цепочке событий надо призиповать асинхронные обращения по адресу.


Observable.FromAsyncPattern<WebResponse>(
                            requestInfo.WebRequest.BeginGetResponse, requestInfo.WebRequest.EndGetResponse)()

Таким не хитрым способом, мы получили из классического APM-паттерна, Observable коллекцию. Только из этой коллекции может прилететь исключение, а надо бы все обработать единым образом, и здесь придет на помощь функция Materialize, которая обернет результат вызова в специальный объект, а повторить вызов трижды, что нужно по условиям задачи, поможет метод Repeat.


.Materialize().Repeat(3),

Результатом объединения будет только вторая коллекция, так как временной интервал нас не интересует. При этом все три вызова надо объеденить в один буферный объект, чтобы потом проанализировать.


(l, r) => r).BufferWithCount(3)

Теперь осталось только найти такие последовательности запросов, которые все завершились неудачей...


.Where(
                            notifications =>
                            notifications.All(notification => notification.Kind == NotificationKind.OnError))

И вывести результат на консоль.


.Subscribe(n => Console.WriteLine(requestInfo.Url)));

Полный запрос решающий поставленную задачу может выглядеть так:


string[] urls = { "http://dfsdfsdf.rt", "http://rsdn.ru", "http://google.com", "http://rsdn.ru/qweqwe", "http://rbc.ru" };
 
urls.Select(url => new {Url = url, WebRequest = WebRequest.Create(url)})
    .ToObservable().Subscribe(
    requestInfo =>
         Observable.Interval(TimeSpan.FromSeconds(5)).Zip(
             Observable.FromAsyncPattern<WebResponse>(
requestInfo.WebRequest.BeginGetResponse, 
requestInfo.WebRequest.EndGetResponse)().Materialize().Repeat(3),
         (l, r) => r).BufferWithCount(3)
    .Where(notifications =>
           notifications.All(notification => notification.Kind == NotificationKind.OnError))
    .Subscribe(n => Console.WriteLine(requestInfo.Url)));

В заключение хотелось бы заметить, что с появлением Rx мы получили возможность избавиться от мешанины разбросанных по коду обработчиков событий, подписки на них и головной боли по их освобождению, вместо этого асинхронный код теперь можно писать практически «линейно», почти так же как обычный синхронный.

Комментариев нет:

Отправить комментарий