前回はRxJavaの概要についてまとめました(記事はこちら)。今回はRxJavaの構成をまとめてみます。
1.4 RxJavaの構成
1.4.1 Flowable/Observable
FlowableおよびObservableはデータを生産し通知するクラス
FlowableとObservableの違いは、Flowableがバックプレッシャーの機能があり、Observableにはない
Flowable/Observableは基本的にReactive Streamsのルールおよびそのルールに影響を与えたObservable契約に従わないと正しくデータが通知されることが保証されない。
通知する際のルールは以下のものがある。
- nullを通知してはならない
- データの通知は行っても行わなくてもよい
- Flowable/Observableの処理を終了する際は完了かエラーの通知のどちらか一方を行わないといけない。両方を通知することはない
- 完了かエラーの通知をした後は他の通知を行ってはいけない
- 通知をする際は1つずつ順番に行い同時に行ってはいけない
※nullを通知してはならないというルールはRxJava 2.xからのルールでRxJava 1.xではnullを通知することは可能なので注意する。
※データの生成を複数スレッドから行う場合はそもそもの設計を見直すか、1つのスレッド上で処理を行うFlowable/Observableを複数用意し、それらを1つに結合するメソッドを使うなどして処理を行う。
Source:RxJavaリアクティブプログラミング
1.4.2 Subscriber/Observer
SubscriberおよびObserverは通知されたデータを受け取りそのデータを使った処理を行うインターフェース
Subscriberはバックプレッシャーの機能があり通知するデータ数をリクエストしないといけないのに対し、Observerはバックプレッシャーの機能がないため、データ数の制限なしにデータが通知される
Subscriber/Observerのメソッドが呼ばれる順は以下の通り。
- (1) onSubscribeメソッド
- (2) onNextメソッド
- (3) onCompleteメソッド
Subscriber/Observerが購読しFlowable/Observableの通知の準備ができたら、onSubscribeメソッドが呼ばれる。このonSubscribeメソッドは1つの購読で1度しか呼ばれない。
データが通知されるたびにonNextメソッドが呼ばれる。複数のデータが通知される場合はその分だけonNextメソッドが呼ばれ、1件もデータが通知されない場合は1度もonNextメソッドが呼ばれない。
onNextメソッドは安全な通知を行う。Flowable/Observableからデータを受け取っている限り、onNextメソッドが同時に実行されることはない。
全てのデータを通知し終えたら、これ以上データを通知することがないことを知らせるためにonCompleteメソッドが呼ばれ、完了時の処理を行う。
Flowable/Observableの処理中にエラーが発生したらonErrorメソッドが呼ばれ、処理を終了する。
各メソッドは1つずつ順番に実行されるが、SubscriberのonSubscribeメソッドだけは例外的に処理の途中でonNextメソッドやonCompleteメソッドが実行される。これはonSubscribeメソッドでSubscriptionのrequestメソッドを呼ぶとFlowableがデータの通知を始めてしまうから。
SubscriberのonSubscribeメソッド内でSubscriptionのrequestメソッドを呼び出すと通知処理が始まってしまうため、もしonSubscribeメソッド内で初期化処理などがある場合は、Subscriptionのrequestメソッドを呼ぶ前に行う必要がある。
1.4.3 Subscription
- SubscriptionはReactive Streamsで定義されているインターフェース
- 通知するデータの数をリクエストするrequestメソッドと処理の途中でも購読を解除するcancelメソッドを持つ。
- SubscriptionはSubscriberのonSubscribeメソッドの引数として渡されるオブジェクトで、指定した数だけのデータを通知するようにFlowableにリクエストしたり、購読を解除したりする機能を持つ。
public interface Subscription {
public void request(long n);
public void cancel();
}
1.4.4 Disposable
- Disposableは購読を解除するためのメソッドを持つインターフェース
- Disposableは、ObservableとObserver間の購読の際にObservableが購読の準備ができた時点でonSubscribeメソッド経由でObserverに渡されるオブジェクトで、購読を処理の途中でも解除する機能を持つ。
- DisposableはObservableとObserver間の購読以外でも使われることがあり、Flowable#subscribe(Subscriber)やObservable#subscribe(Observer)でない購読をするメソッド(subscribeメソッド/subscribeWithメソッド)の戻り値としてDisposableが定義されている。戻り値としてのDisposableは外部から購読の解除を可能とする。
- ただし、この購読解除の機能を外部に出すということは、外部から別スレッドで購読解除が行われるリスクが発生する。そのため、関数型インターフェースを引数に取る購読メソッドやDisposableSubscriber/DisposableObserverやResourceSubscriber/ResourceObserverによって返されるRxJavaが用意したDisposableのインスタンスは非同期で購読解除の呼び出しがされても問題ないように対応している。
- Disposableは購読解除だけではなくリソースなどの破棄を行うためのインターフェースとしても使用可能。処理が終わる際にリソースなどの破棄をしないといけない場合、その破棄する処理をDisposableのdisposeメソッドに実装する。
- FlowableEmitter/ObservableEmitterのsetDisposableメソッドにそのDisposableを設定した場合は完了やエラーおよび購読解除時に、ResourceSubscriber/ResourceObserverのaddメソッドでそのDisposableを追加した場合は、そのResourceSubscriber/ResourceObserverのdisposeメソッドを呼ばれた際にDisposableのdisposeメソッドが呼ばれる。
public interface Disposable {
void dispose();
boolean isDisposed();
}
1.4.5 FlowableProcessor/Subject
- ProcessorとはReactive Streamsで定義されている生産者(Publisher)と消費者(Subscriber)の両方の機能を持っているインターフェース
- RxJavaではこのReactive StreamsのProcessorの実装クラスとしてFlowableProcessorを持っている。
- RxJavaにはObservableとObserverの構成も存在しており、その構成でProcessorと同じ役割を果たすSubjectが用意されている。
- Processor/Subjectの種類
種類 | 説明 |
---|---|
PublishProcessor/PublishSubject | データを受け取ったタイミングでしか消費者(Subscriber/Observer)にデータを通知しないProcessor/Subject |
BehaviorProcessor/BehaviorSubject | 消費者を購読した直前のデータをバッファし、そのデータから通知するProcessor/Subject |
ReplayProcessor/ReplaySubject | 受け取った全てのデータを途中から登録した消費者にも通知するProcessor/Subject |
AsyncProcessor/AsyncSubject | データの生成が完了した際に最後に受け取ったデータしか消費者に通知しないProcessor/Subject |
UnicastProcessor/UnicastSubject | 1つの消費者からしか購読されないProcessor/Subject |
1.4.6 DisposableSubscriber/DisposableObserver
DisposableSubscriber/DisposableObserverはDisposableを実装したSubscriber/Observerの実装クラスで、外部から非同期に購読解除のメソッドを呼ばれても安全に購読解除を実行してくれるクラス
これらのクラスでは、onSubscribeメソッドがfinalメソッドとして実装されており、onSubscribeメソッドで渡されるSubscription/Disposableは隠蔽され直接アクセスできないため、次のメソッドからSubscription/Disposableのメソッドを呼び出す。
DisposableSubscriber のSubscriptionのメソッドを呼び出すメソッド
- request(long): Subscriptionのrequestメソッドを呼び出す
- dispose(): Subscriptionのcancelメソッドを呼び出す
DisposableObserver のDispose のメソッドを呼び出すメソッド
- dispose(): Disposeのdisposeメソッドを呼び出す
- isDispose(): DisposeのisDisposedメソッドを呼び出す
購読開始時の処理をオーバライドして独自の処理を実装したい場合は、onSubscribeメソッド内で呼ばれているonStartメソッドをオーバライドする。
DisposableSubscriberで初期化処理を行う場合、先に初期化処理を実行してから親のonStartメソッドを呼ぶこと。onStartメソッドを呼び出し後に、初期化処理を実行すると、初期化処理を呼ぶ前に通知処理が始ってしまうので注意が必要。
Source:RxJavaリアクティブプログラミング
1.4.7 ResourceSubscriber/ResourceObserver
- ResourceSubscriber/ResourceObserverhはDisposableを実装したSubscriber/Observerの実装クラスで、外部から非同期に購読解除のメソッドを呼ばれても安全に購読解除を実行してくれるクラス。DisposableSubscriber/DisposableObserverが持つ機能に加え、addメソッドで他のDisposableを格納することができる。
- 格納されたDisposableはResourceSubscriber/ResourceObserverのdisposeメソッドを呼ぶと、格納しているDisposableのdisposeメソッドも一緒に呼び出す。
- 完了時やエラー時に自動でdisposeメソッドが呼ばれるわけではない。
- ResourceSubscriberで初期化処理を行う場合、DisposableSubscriberと同様に、先に初期化処理を実行してから親のonStartメソッドを呼ぶこと。onStartメソッドを呼び出し後に、初期化処理を実行すると、初期化処理を呼ぶ前に通知処理が始ってしまうので注意が必要。
Source:RxJavaリアクティブプログラミング
次回は「【Programming】RxJava リアクティブプログラミング vol.3 / RxJavaの構成~後編~」についてまとめてみます。
written by tamito0201
プログラミングとのご縁結びならプロマリへ。
オンラインプログラミング学習スクールのプロマリは、プログラミングの初学者の皆様を応援しています。プログラミング講師と一緒に面白いアプリを作りませんか。
The programming school "Promari" will help you learn programming. "Promari" is supporting the first scholars of programming. Let's develop an application with our programming instructor.