Android アドベントカレンダー 17日目の記事です。
qiita.com
Rxは非常に便利なライブラリです。特にオペレータ群はとても強力で、filterやmapやflatmapなど使っておけば割となんとかなります。
が、それ故に雰囲気で書けるとこもあり、「オペレータって具体的にどんな感じで動いてるの?」となることもあります。僕も雰囲気で書いてます。
今回は、実際にオリジナルのオペレータを作ることで、中で何が起きているのかをざっくりと見てみましょう。
事前準備
RxJavaリポジトリをcloneしておきましょう。
今回はforkして手元に持ってきました。
github.com
作るオペレータ
なんでも良いですが、とりあえず2種類作ってみましょう。
本来はFlowable、Single、Maybe、CompletableなどそれぞれのPublisherに応じたオペレータを作る必要がありますが、数が多いので一旦Flowableに対応したものだけにします。
Imo ファクトリメソッド
justやtimerなど、ストリームの最上流にいるメソッドです。
彼らは実際にデータを生成してSubscriberに流すのがお仕事です。
今回は、imoオペレータを作ります。役割は以下です
需要は高そうですね。
Println オペレータ
こちらはfilterやmapなど、最上流ではなく上から流れてきたデータをごにょごにょするオペレータです。
役割は以下。
- 上流から流れてきたデータをSystem.out.printlnで出力する
これら二つを作成すると、最終的に以下のようなコードが書けるようになります。
Flowable.imo().println().subscribe();
Imoオペレータを作る
FlowableImo class
RxJava2において、全てのオペレータは独立したクラスになっています。
例えばFlowable.justの中身は以下です。
https://github.com/adarapata/RxJava/blob/d3455d0c9d57d522c31b5c25af83e8f2b8df12b6/src/main/java/io/reactivex/internal/operators/flowable/FlowableJust.java#L26
コンストラクタで値をもらって、購読時はSubscriptionにデータを包んでSubscriberに渡します。
そのままだと利用者側はメソッドチェーンで呼びづらいので、Flowableのメソッドで呼べるようにいい感じにラップしてるようです。
https://github.com/adarapata/RxJava/blob/6a44e5d0543a48f1c378dc833a155f3f71333bc2/src/main/java/io/reactivex/Flowable.java#L2489
なので、新規にオペレータクラスを作るなら2つの実装が必要です。
- オペレータクラスの作成
- Flowableにstaticなメソッドを定義する
なのでまずはjustに習ってFlowableImoクラスを定義します。
public final class FlowableImo extends Flowable<String> {
public FlowableImo() {
}
@Override
protected void subscribeActual(Subscriber<? super String> s) {
s.onSubscribe(new ScalarSubscription<String>(s, "imo"));
}
}
imo
の決め打ちになるので引数はありません。subscribeActualでは購読処理のためにSubscriptionをSubscriberに投げています。
継承元であるFlowableがSubscriberインタフェースを実装しているので、subscribeメソッドは実装済みですが、こちらは分岐させたりなどして実際にonSubscribeするところまで至ってないのでActualという意味で分けてるっぽいです。
https://github.com/adarapata/RxJava/blob/6a44e5d0543a48f1c378dc833a155f3f71333bc2/src/main/java/io/reactivex/Flowable.java#L13026
subscribeActualが呼ばれるのは、Flowable内で適切なSubscriberを生成した後のようですね。
https://github.com/adarapata/RxJava/blob/6a44e5d0543a48f1c378dc833a155f3f71333bc2/src/main/java/io/reactivex/Flowable.java#L13082
実際Subscribe呼ぶときはonNextだけのConsumerを渡したり、Subscriberを直渡ししたりだとバリエーションが多いので、それらを吸収するための措置っぽい。
定義し終わったら、Flowableから呼べるようにstaticなメソッドを追加します。
public abstract class Flowable<T> implements Publisher<T> {
~~~~~~~~~~~~~~~~~~~
@CheckReturnValue
@BackpressureSupport(BackpressureKind.FULL)
@SchedulerSupport(SchedulerSupport.NONE)
public static Flowable<String> imo() {
return RxJavaPlugins.onAssembly(new FlowableImo());
}
}
back pressure対応やスケジューラなどの設定アノテーションが付いていますが、長くなりそうなので今回は気にしないことにします。
やっていることは、FlowableImoを生成して返すだけです。その際に、RxJavaPluginsでFlowable生成時のフック処理を書いていた場合、それも引っ付けて返すために RxJavaPlugins.onAssembly
を呼んでいます。
Printlnオペレータを作る
次は流れてきた値をprintlnするオペレータを定義しましょう。
public final class FlowablePrintln<T> extends AbstractFlowableWithUpstream<T, T> {
public FlowablePrintln(Flowable<T> source) {
super(source);
}
@Override
protected void subscribeActual(Subscriber<? super T> s) {
source.subscribe(new PrintlnSubscriber<T>(s));
}
static final class PrintlnSubscriber<T> implements FlowableSubscriber<T>, Subscription {
final Subscriber<? super T> actual;
Subscription s;
PrintlnSubscriber(Subscriber<? super T> actual) {
this.actual = actual;
}
@Override
public void onSubscribe(Subscription s) {
if (SubscriptionHelper.validate(this.s, s)) {
this.s = s;
actual.onSubscribe(this);
}
}
@Override
public void onNext(T t) {
System.out.println(t);
actual.onNext(t);
}
@Override
public void onError(Throwable t) {
actual.onError(t);
}
@Override
public void onComplete() {
actual.onComplete();
}
@Override
public void request(long n) {
s.request(n);
}
@Override
public void cancel() {
s.cancel();
}
}
}
imoオペレータといくつか違う点があります。
AbstractFlowableWithUpstream
は上流のFlowableを扱うための抽象クラスです。
https://github.com/adarapata/RxJava/blob/a00ea07a4d2ce409e8dbea66ddbca9c0a77ddab6/src/main/java/io/reactivex/internal/operators/flowable/AbstractFlowableWithUpstream.java#L29
HasUpstreamPublisher
インタフェースを持つことで、自分より上流にPublisherがいることを保証しています。
この実装により、一個前のFlowableを保持させて、メソッドチェーンでオペレータを繋げられるようになっています。
FlowableImoはファクトリメソッドのため上流がありませんでしたが、printlnオペレータは必ず上流が存在するので継承が必要です。
また、インナークラスとして PrintlnSubscriber
が存在しています。
このSubscriberがオペレータのコアの機能で、onNextでデータを送るときにごにょごにょする部分です。
今回はonNextでprintlnして、データには変更を加えずそのまま流しています。
あとはファクトリメソッドと同じようにFlowableにメソッドを定義します。
public abstract class Flowable<T> implements Publisher<T> {
~~~~~~~~~~~~~~~~~~~
@CheckReturnValue
@BackpressureSupport(BackpressureKind.FULL)
@SchedulerSupport(SchedulerSupport.NONE)
public final Flowable<T> println() {
return RxJavaPlugins.onAssembly(new FlowablePrintln<T>(this));
}
}
以上で、自作オペレータが実装できるようになりました。
Flowable.imo().println().subscribe();
できあがりはこちらです
見やすいようにPRにしてみました
github.com
所感
かなり端折りましたが、オペレータがどんな挙動をしているのかはざっくりわかりました。
ただ、SubscriberがSubscriberとSubscriptionの両方の役割を担っているので、そこのコードを追うのがめちゃめちゃ大変でした。
この辺りは別のタイミングで書こうかと思います。