前回はリアクティブプログラムミングの概要についてまとめました(記事はこちら)。今回はRxJavaの概要をまとめてみます。
1.3 RxJava
1.3.1 概要
- Javaでリアクティブプログラミングを行うためのライブラリ
- RxJavaは、もともとは2009年にMicrosoftで .NET Frameworkの実験的なライブラリ「Reactive Extensions」(略して「Rx」)として公開され、2012年にオープンソース化されたものを、後にNetflixがJavaに移植しオープンソースとして公開した。
- Reactive Extensionsを扱うライブラリはReactiveXとしてオープンソースプロジェクト化し、Javaや.NETだけでなくJavaScriptやSwiftなど様々なプログラミング言語に対応したライブラリを提供している
- バージョン2.0よりReactive Streamsの仕様を実装している。2.0よりReactive StreamsのAPIに依存。
- デザインパターンの1つであるObserverパターンをうまく拡張している。Observerパターンは、監視対象のオブジェクトの状態変化に対するデザインパターンで、状態が変化するとそれを観察しているオブジェクトに対し変化があったことを知らせる構成。このパターンの特徴を生かし、特にデータを生産する側とデータを消費する側に分けることで、無理なくデータストリームを扱うことができる作りになっている。
1.3.2 バージョンの違い
- バージョン1からバージョン2に移行する際に単にパッケージ名やクラス名を変えるだけではなく、それらのAPI周りの変更も必要に
なる
バージョン | パッケージ |
---|---|
1.x | rx |
2.x | io.reactivex |
1.3.3 RxJavaの仕組み
- RxJavaの仕組みはデータを生産し通知する生産者と通知されたデータを受け取り処理を行う消費者の構成で成り立つ。
- RxJavaではこの生産者と消費者の関係が大きく分けて2つあり、1つはReactive Streamsを対応しているFlowableとSubscriber、もう1つはReactive Streamsの対応をしておらずバックプレッシャー(過去のメッセージも取得できること)の機能がないObservableとObserverの構成で成り立つ
- FlowableはObservableの進化系。BackPressureを調整して、onNestででてくるデータのスピード調整などができる。
Reactive Streams | 生産者(-able; 購読される) | 消費者(-er; 購読する) | 生産者&消費者(-erであり、-ableでもある) |
---|---|---|---|
Reactive Streams対応あり(バックプレッシャー機能あり) | Flowable | Subscriber | Processor |
Reactive Streams対応なし(バックプレッシャー機能なし) | Observable | Observer | Subject |
- Flowable/Subscriber
- 生産者であるFlowableによる購読開始(onSubscribe)、データ通知(onNext)、エラー(onError)、完了(onComplete)の4つの通知を行い、消費者であるSubscriberによって各通知を受け取った際の処理を行う。
- Subscriptionを通してデータ数のリクエストや購読の解除を行う。
- Observable/Observer
- FlowableとSubscriberの構成とほぼ同じで、生産者であるObservableからの購読開始(onSubscribe)、データ通知(onNext)、エラー(onError)、完了(onComplete)の4つの通知をObserverで受け取る
- 通知するデータ数の制御を行うバックプレッシャーの機能がないため、データ数のリクエストを行わない。そのため、Subscriptionは使わず、Disposableという購読解除のメソッドを持つインターフェースを用いる。
- Disposableは購読解除を行うため、購読を解除するdispose()と、購読を解除している場合はtrueを、解除していない場合はfalseを返すisDisposed()メソッドを持つ
- ObservableとObserver間でデータをやり取りをする場合は、FlowableとSubscriber 間のようなデータ数のリクエストは行わず、データが生成されるとすぐにObserverに通知される
Source:Reactive Streams And Microservices - A Case Study
1.3.4 オペレータ
- RxJavaでは、Publisher(Flowable/Observable)からSubscriber(Subscriber/Observerにデータが通知される間にSubscriberがデータを利用しやすい形に整形、変換することができる。
- 整形されたデータは再びFlowable/Observableなデータとして返されるため、メソッドチェインする形で段階的にデータを整形することが可能。
- こうしたデータを生成したり、変換したり、フィルターをかけたりできるメソッドのことをRxJavaではオペレータと呼ぶ。
Source:RxJavaリアクティブプログラミング
1.3.5 RxJava Examples
RxJavaの大まかな流れ
- 1.Observableを作る
- 2.filterやmapなどのOperatorを使って値を加工する
- 3.Observerを使ってObservableをsubscribeする
Flowable(Reactive Streams 対応)を使ったサンプル
- Sample1 文字列を配信するObservableプログラム
public void basicExample() {
Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> e) throws Exception {
e.onNext("Hello");
e.onNext("Welcome");
e.onNext("This is your first RxJava example");
e.onComplete();
}
});
Observer<String> observer = new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
LOGGER.info("observer subscribed to observable - on subscribe");
}
@Override
public void onNext(String value) {
LOGGER.info("observer - onNext " + value);
}
@Override
public void onError(Throwable e) {
LOGGER.info("observer - onError " + e.toString());
}
@Override
public void onComplete() {
LOGGER.info("observer - on complete");
}
};
observable.subscribe(observer);
}
- Sample2 Operatorを使って値を加工するプログラム
Observable.from(new Integer[] {1, 2, 3, 4, 5, 6, 7, 8, 9, 10})
.filter(new Func1<Integer, Boolean>() {
@Override
public Boolean call(Integer i) {
return (i % 2) == 0;
}
})
.map(new Func1<Integer, Integer>() {
@Override
public Integer call(Integer i) {
return i * 10;
}
})
.subscribe(new Observer<Integer>() {
@Override
public void onNext(Integer integer) {
Log.d("Hoge", integer.toString());
}
@Override
public void onCompleted() {
Log.d("Hoge", "completed");
}
@Override
public void onError(Throwable e) {}
});
- Sample3 あいさつの言葉を通知するFlowableプログラム
public static void main(String[] args) throws Exception {
// あいさつの言葉を通知するFlowableの生成
Flowable < String > flowable =
Flowable.create(new FlowableOnSubscribe<String>() {
@Override
public void subscribe(FlowableEmitter<String> emitter)
throws Exception {
String[] datas = {
"Hello, World!",
"こんにちは、世界!"
};
for (String data: datas) {
// 購読解除されている場合は処理をやめる
if (emitter.isCancelled()) {
return;
}
// データを通知する
emitter.onNext(data);
}
// 完了したことを通知する
emitter.onComplete();
}
}, BackpressureStrategy.BUFFER); // 超過したデータはバッファする
flowable
// Subscriberの処理を別スレッドで行うようにする
.observeOn(Schedulers.computation())
// 購読する
.subscribe(new Subscriber<String>() {
// データ数のリクエストおよび購読の解除を行うオブジェクト
private Subscription subscription;
// 購読が開始された際の処理
@Override
public void onSubscribe(Subscription subscription) {
// SubscriptionをSubscriber内で保持する
this.subscription = subscription;
// 受け取るデータ数をリクエストする
this.subscription.request(1 L);
}
// データを受け取った際の処理
@Override
public void onNext(String data) {
// 実行しているスレッド名の取得
String threadName = Thread.currentThread().getName();
// 受け取ったデータを出力する
System.out.println(threadName + ": " + data);
// 次に受け取るデータ数をリクエストする
this.subscription.request(1 L);
}
// 完了を通知された際の処理
@Override
public void onComplete() {
// 実行しているスレッド名の取得
String threadName = Thread.currentThread().getName();
System.out.println(threadName + ": 完了しました");
}
// エラーを通知された際の処理
@Override
public void onError(Throwable error) {
error.printStackTrace();
}
});
// しばらく待つ
Thread.sleep(500 L);
}
1.3.6 副作用を発生させる処理
- 副作用を発生させる処理とは、オブジェクトの状態を変更するなどして、処理の外部からでも参照可能なオブジェクトに対して何らかの変化を加えることや、ファイルやデータベースの中身を変えるようなことを指す。
- 副作用を発生させないことは、複数スレッドから共有されるオブジェクトがないことになり、スレッドセーフを担保することができる
- データを通知してから消費者に受け取られるまでの間は副作用の発生を避ける作りとする。
- RxJavaでは基本的に副作用を発生させるような処理を行うのは、メソッドチェインの途中ではなく、最終的にデータを受け取り処理を行う消費者側で行うこと。
次回は「【Programming】RxJava リアクティブプログラミング vol.3 / RxJavaの構成~前編~」についてまとめてみます。
written by tamito0201
プログラミングとのご縁結びならプロマリへ。
オンラインプログラミング学習スクールのプロマリは、プログラミングの初学者の皆様を応援しています。プログラミング講師と一緒に面白いアプリを作りませんか。
The programming school "Promari" will help you learn programming. "Promari" is supporting the first scholars of programming. Let's develop an application with our programming instructor.