imog

主にUnityとかの事を書いています

RxJavaリアクティブプログラミング読んでる 1

3月からAndroidやることになったので、先週くらいからRxJavaリアクティブプログラミングを電車の中でちまちま読んでいる

https://www.amazon.co.jp/dp/4798149519

ReactiveExtensionsはUniRxで触ってたのでめちゃくちゃ詰まるということはないが、知らないことが色々書いてたのでログを残しとこうと思った。 間違いも結構あると思いますがご容赦ください。ツッコミはいただけると嬉しいです。

ReactiveStream

そもそもReactiveExtensionsではなく、ReactiveStreamの話をメインにしている。

http://www.reactive-streams.org/

.NETのReactiveExtensionsが非常に便利で様々な言語に移植されていき、Rxをベースとしたさまざまなフレームワークやライブラリが生まれてきたけど、 これらは内部実装が様々で、データストリームを扱うという目的は同じながら、使うライブラリで実装方法を変えねばならぬという問題が起きていたらしい。 そこで、データストリームの非同期通信の仕組みの実装方法をある程度共通化しましょうということでReactiveStreamというインタフェースを定めたらしい。

そして、ReactiveStreamのJava版がReactiveStreamJVM

https://github.com/reactive-streams/reactive-streams-jvm

4ファイルしかなくて、本当に最低限のインタフェースを設けてるだけだった。

RxJava 2.xは上記のインタフェースを実装している。

FlowableとSubscriber

この本ではストリーム流す側と購読する側を「生産者」「消費者」と読んでる。

RxJava1.xやUniRxの時は Observable が生産者で Observer が消費者。

RxJava2.xだと FlowableSubscriber がそれにあたる。

基本的には同じだったけど、大きな違いとして消費者側が購読のハンドリングができるかどうかと感じた。 前者だと、subscribeしたときにどのタイミングで止めるかというのは消費者側は操作しにくかった。

Take(n)などのオペレータで管理はできるけど、それは生産者側の処理なのでちょっと違う。 Disposeを呼び出して止めるはできたけど、それはストリーム外からの操作になるのでちょっと怖いという感じだった。 なので、購読を始めると生産者が流すのを終えるまで購読をやめない!となる。

しかし、ReactiveStreamはSubscriptionインタフェースを提供している。

https://github.com/reactive-streams/reactive-streams-jvm/blob/master/api/src/main/java/org/reactivestreams/Subscription.java

Subscriptionは生産者と消費者の間に立つインタフェースで、生産者にデータをリクエストする request と購読を解除する cancel を持っている。

これらをSubScriberが呼び出すことで、途中でも消費者側の都合で中止できる。これは良いものだと思う。ストリーム内で完結させやすくなりそう。

そういった理由か、2.x以降に実装されたsubscribeはIDisposableを返さなくなってる。 だけど、1.x時代のIDisposableを返すsubScribeは引き続き残っているみたい。

バックプレッシャー

本書で頻繁に出てくる言葉。かなり理解が怪しい。

消費者が通知を受け取れない状況のときにデータをどうするかという設定をバックプレッシャーと呼ぶみたい。 通知ができるまで貯めておくか、通知ができるまで破棄していくかなど。

受け取れない状態というのは例えば非同期で生産者が怒涛の勢いでデータを流してきてさばけないなど。

そういう意味だと非同期の時以外気にしない設定じゃないかなあと思ってる。

この辺、はじめの方からカジュアルに単語が出てきてるけど詳細はChapter3なのでまた後に書くことになりそう。

ちなみにやっとChapter1が終わりました。先は長い。