RxJava2でオペレータを自作してみる
Android アドベントカレンダー 17日目の記事です。
Rxは非常に便利なライブラリです。特にオペレータ群はとても強力で、filterやmapやflatmapなど使っておけば割となんとかなります。 が、それ故に雰囲気で書けるとこもあり、「オペレータって具体的にどんな感じで動いてるの?」となることもあります。僕も雰囲気で書いてます。
今回は、実際にオリジナルのオペレータを作ることで、中で何が起きているのかをざっくりと見てみましょう。
事前準備
RxJavaリポジトリをcloneしておきましょう。
今回はforkして手元に持ってきました。
作るオペレータ
なんでも良いですが、とりあえず2種類作ってみましょう。
本来はFlowable、Single、Maybe、CompletableなどそれぞれのPublisherに応じたオペレータを作る必要がありますが、数が多いので一旦Flowableに対応したものだけにします。
Imo ファクトリメソッド
justやtimerなど、ストリームの最上流にいるメソッドです。 彼らは実際にデータを生成してSubscriberに流すのがお仕事です。
今回は、imoオペレータを作ります。役割は以下です
- "imo" という文字列を生成し流す
需要は高そうですね。
Println オペレータ
こちらはfilterやmapなど、最上流ではなく上から流れてきたデータをごにょごにょするオペレータです。
役割は以下。
- 上流から流れてきたデータをSystem.out.printlnで出力する
これら二つを作成すると、最終的に以下のようなコードが書けるようになります。
Flowable.imo().println().subscribe();
// => imo とログが吐かれるだけ
Imoオペレータを作る
FlowableImo class
RxJava2において、全てのオペレータは独立したクラスになっています。 例えばFlowable.justの中身は以下です。
コンストラクタで値をもらって、購読時はSubscriptionにデータを包んでSubscriberに渡します。
そのままだと利用者側はメソッドチェーンで呼びづらいので、Flowableのメソッドで呼べるようにいい感じにラップしてるようです。
なので、新規にオペレータクラスを作るなら2つの実装が必要です。
- オペレータクラスの作成
- Flowableにstaticなメソッドを定義する
なのでまずはjustに習ってFlowableImoクラスを定義します。
public final class FlowableImo extends Flowable<String> { public FlowableImo() { } @Override protected void subscribeActual(Subscriber<? super String> s) { s.onSubscribe(new ScalarSubscription<String>(s, "imo")); } }
imo
の決め打ちになるので引数はありません。subscribeActualでは購読処理のためにSubscriptionをSubscriberに投げています。
継承元であるFlowableがSubscriberインタフェースを実装しているので、subscribeメソッドは実装済みですが、こちらは分岐させたりなどして実際にonSubscribeするところまで至ってないのでActualという意味で分けてるっぽいです。
subscribeActualが呼ばれるのは、Flowable内で適切なSubscriberを生成した後のようですね。
実際Subscribe呼ぶときはonNextだけのConsumerを渡したり、Subscriberを直渡ししたりだとバリエーションが多いので、それらを吸収するための措置っぽい。
定義し終わったら、Flowableから呼べるようにstaticなメソッドを追加します。
public abstract class Flowable<T> implements Publisher<T> { ~~~~~~~~~~~~~~~~~~~ @CheckReturnValue @BackpressureSupport(BackpressureKind.FULL) @SchedulerSupport(SchedulerSupport.NONE) public static Flowable<String> imo() { return RxJavaPlugins.onAssembly(new FlowableImo()); } }
back pressure対応やスケジューラなどの設定アノテーションが付いていますが、長くなりそうなので今回は気にしないことにします。
やっていることは、FlowableImoを生成して返すだけです。その際に、RxJavaPluginsでFlowable生成時のフック処理を書いていた場合、それも引っ付けて返すために RxJavaPlugins.onAssembly
を呼んでいます。
Printlnオペレータを作る
次は流れてきた値をprintlnするオペレータを定義しましょう。
public final class FlowablePrintln<T> extends AbstractFlowableWithUpstream<T, T> { public FlowablePrintln(Flowable<T> source) { super(source); } @Override protected void subscribeActual(Subscriber<? super T> s) { source.subscribe(new PrintlnSubscriber<T>(s)); } static final class PrintlnSubscriber<T> implements FlowableSubscriber<T>, Subscription { final Subscriber<? super T> actual; Subscription s; PrintlnSubscriber(Subscriber<? super T> actual) { this.actual = actual; } @Override public void onSubscribe(Subscription s) { if (SubscriptionHelper.validate(this.s, s)) { this.s = s; actual.onSubscribe(this); } } @Override public void onNext(T t) { System.out.println(t); // ここがplintlnオペレータのやりたいこと actual.onNext(t); } @Override public void onError(Throwable t) { actual.onError(t); } @Override public void onComplete() { actual.onComplete(); } @Override public void request(long n) { s.request(n); } @Override public void cancel() { s.cancel(); } } }
imoオペレータといくつか違う点があります。
AbstractFlowableWithUpstream
は上流のFlowableを扱うための抽象クラスです。
HasUpstreamPublisher
インタフェースを持つことで、自分より上流にPublisherがいることを保証しています。
この実装により、一個前のFlowableを保持させて、メソッドチェーンでオペレータを繋げられるようになっています。 FlowableImoはファクトリメソッドのため上流がありませんでしたが、printlnオペレータは必ず上流が存在するので継承が必要です。
また、インナークラスとして PrintlnSubscriber
が存在しています。
このSubscriberがオペレータのコアの機能で、onNextでデータを送るときにごにょごにょする部分です。
今回はonNextでprintlnして、データには変更を加えずそのまま流しています。
あとはファクトリメソッドと同じようにFlowableにメソッドを定義します。
public abstract class Flowable<T> implements Publisher<T> { ~~~~~~~~~~~~~~~~~~~ @CheckReturnValue @BackpressureSupport(BackpressureKind.FULL) @SchedulerSupport(SchedulerSupport.NONE) public final Flowable<T> println() { return RxJavaPlugins.onAssembly(new FlowablePrintln<T>(this)); } }
以上で、自作オペレータが実装できるようになりました。
Flowable.imo().println().subscribe();
// => imo とログが吐かれるだけ
できあがりはこちらです
見やすいようにPRにしてみました
所感
かなり端折りましたが、オペレータがどんな挙動をしているのかはざっくりわかりました。
ただ、SubscriberがSubscriberとSubscriptionの両方の役割を担っているので、そこのコードを追うのがめちゃめちゃ大変でした。 この辺りは別のタイミングで書こうかと思います。