imog

主にUnityとかの事を書いています

RxJava2でオペレータを自作してみる

Android アドベントカレンダー 17日目の記事です。

qiita.com

Rxは非常に便利なライブラリです。特にオペレータ群はとても強力で、filterやmapやflatmapなど使っておけば割となんとかなります。 が、それ故に雰囲気で書けるとこもあり、「オペレータって具体的にどんな感じで動いてるの?」となることもあります。僕も雰囲気で書いてます。

今回は、実際にオリジナルのオペレータを作ることで、中で何が起きているのかをざっくりと見てみましょう。

事前準備

RxJavaリポジトリをcloneしておきましょう。

今回はforkして手元に持ってきました。

github.com

作るオペレータ

なんでも良いですが、とりあえず2種類作ってみましょう。

本来はFlowable、Single、Maybe、CompletableなどそれぞれのPublisherに応じたオペレータを作る必要がありますが、数が多いので一旦Flowableに対応したものだけにします。

Imo ファクトリメソッド

justやtimerなど、ストリームの最上流にいるメソッドです。 彼らは実際にデータを生成してSubscriberに流すのがお仕事です。

今回は、imoオペレータを作ります。役割は以下です

  • "imo" という文字列を生成し流す

需要は高そうですね。

Println オペレータ

こちらはfilterやmapなど、最上流ではなく上から流れてきたデータをごにょごにょするオペレータです。

役割は以下。

  • 上流から流れてきたデータをSystem.out.printlnで出力する

これら二つを作成すると、最終的に以下のようなコードが書けるようになります。

Flowable.imo().println().subscribe();
// => imo とログが吐かれるだけ

Imoオペレータを作る

FlowableImo class

RxJava2において、全てのオペレータは独立したクラスになっています。 例えばFlowable.justの中身は以下です。

https://github.com/adarapata/RxJava/blob/d3455d0c9d57d522c31b5c25af83e8f2b8df12b6/src/main/java/io/reactivex/internal/operators/flowable/FlowableJust.java#L26

コンストラクタで値をもらって、購読時はSubscriptionにデータを包んでSubscriberに渡します。

そのままだと利用者側はメソッドチェーンで呼びづらいので、Flowableのメソッドで呼べるようにいい感じにラップしてるようです。

https://github.com/adarapata/RxJava/blob/6a44e5d0543a48f1c378dc833a155f3f71333bc2/src/main/java/io/reactivex/Flowable.java#L2489

なので、新規にオペレータクラスを作るなら2つの実装が必要です。

  • オペレータクラスの作成
  • Flowableにstaticなメソッドを定義する

なのでまずはjustに習ってFlowableImoクラスを定義します。

public final class FlowableImo extends Flowable<String> {
    public FlowableImo() {
    }

    @Override
    protected void subscribeActual(Subscriber<? super String> s) {
        s.onSubscribe(new ScalarSubscription<String>(s, "imo"));
    }
}

imo の決め打ちになるので引数はありません。subscribeActualでは購読処理のためにSubscriptionをSubscriberに投げています。

継承元であるFlowableがSubscriberインタフェースを実装しているので、subscribeメソッドは実装済みですが、こちらは分岐させたりなどして実際にonSubscribeするところまで至ってないのでActualという意味で分けてるっぽいです。

https://github.com/adarapata/RxJava/blob/6a44e5d0543a48f1c378dc833a155f3f71333bc2/src/main/java/io/reactivex/Flowable.java#L13026

subscribeActualが呼ばれるのは、Flowable内で適切なSubscriberを生成した後のようですね。

https://github.com/adarapata/RxJava/blob/6a44e5d0543a48f1c378dc833a155f3f71333bc2/src/main/java/io/reactivex/Flowable.java#L13082

実際Subscribe呼ぶときはonNextだけのConsumerを渡したり、Subscriberを直渡ししたりだとバリエーションが多いので、それらを吸収するための措置っぽい。

定義し終わったら、Flowableから呼べるようにstaticなメソッドを追加します。

public abstract class Flowable<T> implements Publisher<T> {

~~~~~~~~~~~~~~~~~~~

    @CheckReturnValue
    @BackpressureSupport(BackpressureKind.FULL)
    @SchedulerSupport(SchedulerSupport.NONE)
    public static Flowable<String> imo() {
        return RxJavaPlugins.onAssembly(new FlowableImo());
    }
}

back pressure対応やスケジューラなどの設定アノテーションが付いていますが、長くなりそうなので今回は気にしないことにします。

やっていることは、FlowableImoを生成して返すだけです。その際に、RxJavaPluginsでFlowable生成時のフック処理を書いていた場合、それも引っ付けて返すために RxJavaPlugins.onAssembly を呼んでいます。

Printlnオペレータを作る

次は流れてきた値をprintlnするオペレータを定義しましょう。

public final class FlowablePrintln<T> extends AbstractFlowableWithUpstream<T, T> {
    public FlowablePrintln(Flowable<T> source) {
        super(source);
    }

    @Override
    protected void subscribeActual(Subscriber<? super T> s) {
        source.subscribe(new PrintlnSubscriber<T>(s));
    }

    static final class PrintlnSubscriber<T> implements FlowableSubscriber<T>, Subscription {
        final Subscriber<? super T> actual;

        Subscription s;

        PrintlnSubscriber(Subscriber<? super T> actual) {
            this.actual = actual;
        }

        @Override
        public void onSubscribe(Subscription s) {
            if (SubscriptionHelper.validate(this.s, s)) {
                this.s = s;
                actual.onSubscribe(this);
            }
        }

        @Override
        public void onNext(T t) {
            System.out.println(t); // ここがplintlnオペレータのやりたいこと
            actual.onNext(t);
        }

        @Override
        public void onError(Throwable t) {
            actual.onError(t);
        }

        @Override
        public void onComplete() {
            actual.onComplete();
        }

        @Override
        public void request(long n) {
            s.request(n);
        }

        @Override
        public void cancel() {
            s.cancel();
        }
    }
}

imoオペレータといくつか違う点があります。

AbstractFlowableWithUpstream は上流のFlowableを扱うための抽象クラスです。

https://github.com/adarapata/RxJava/blob/a00ea07a4d2ce409e8dbea66ddbca9c0a77ddab6/src/main/java/io/reactivex/internal/operators/flowable/AbstractFlowableWithUpstream.java#L29

HasUpstreamPublisher インタフェースを持つことで、自分より上流にPublisherがいることを保証しています。

この実装により、一個前のFlowableを保持させて、メソッドチェーンでオペレータを繋げられるようになっています。 FlowableImoはファクトリメソッドのため上流がありませんでしたが、printlnオペレータは必ず上流が存在するので継承が必要です。

また、インナークラスとして PrintlnSubscriber が存在しています。 このSubscriberがオペレータのコアの機能で、onNextでデータを送るときにごにょごにょする部分です。

今回はonNextでprintlnして、データには変更を加えずそのまま流しています。

あとはファクトリメソッドと同じようにFlowableにメソッドを定義します。

public abstract class Flowable<T> implements Publisher<T> {

~~~~~~~~~~~~~~~~~~~

    @CheckReturnValue
    @BackpressureSupport(BackpressureKind.FULL)
    @SchedulerSupport(SchedulerSupport.NONE)
    public final Flowable<T> println() {
        return RxJavaPlugins.onAssembly(new FlowablePrintln<T>(this));
    }
}

以上で、自作オペレータが実装できるようになりました。

Flowable.imo().println().subscribe();
// => imo とログが吐かれるだけ

できあがりはこちらです

見やすいようにPRにしてみました

github.com

所感

かなり端折りましたが、オペレータがどんな挙動をしているのかはざっくりわかりました。

ただ、SubscriberがSubscriberとSubscriptionの両方の役割を担っているので、そこのコードを追うのがめちゃめちゃ大変でした。 この辺りは別のタイミングで書こうかと思います。