imog

主にUnityとかの事を書いています

skipLast, takeLastオペレータはデータを流すタイミングをずらす

ちょっと面白かったのでメモ。

例えば複数のSubscriberを並行して処理したいなと思った時に、autoConnectで流すタイミングを合わせたとする。

Observable<Integer> foo = Observable.just(3, 4, 5).publish().autoConnect(2);

foo.subscribe(data -> System.out.println("A:" + String.valueOf(data)));
foo.subscribe(data -> System.out.println("B:" + String.valueOf(data)));
A:3
B:3
A:4
B:4
A:5
B:5

もちろん順番に値が流れることになる。

この時にskipLastで片方だけ流す量を調整してみる。

Observable<Integer> foo = Observable.just(3, 4, 5).publish().autoConnect(2);

foo.subscribe(data -> System.out.println("A:" + String.valueOf(data)));
// 5だけskipされる
foo.skipLast(1).subscribe(data -> System.out.println("B:" + String.valueOf(data)));

するとこんな感じになる。

A:3
A:4
B:3
A:5
B:4

ABABAと最後のBが流れないのかと思いきや、AABABと一つ目のBがなくなってしまった。 なんでだろうと思ってソース読んだらかなりシンプルだった。

github.com

指定した数だけキューに溜めて、数を満たしたら順次OnNextに流していく実装になっている。 オペレータから見た時に、どのくらいの数のデータが流れてくるのかという情報はないので、先に指定分ずらしておくようだ。 なるほどという感じ。

ちなみにtakeLastもほぼ同じ感じ。

https://github.com/ReactiveX/RxJava/blob/d3455d0c9d57d522c31b5c25af83e8f2b8df12b6/src/main/java/io/reactivex/internal/operators/observable/ObservableTakeLast.java#L59-L64

普通に使っててこの仕様にひっかかることはなさそう。