これも結構難しいです。
FlowableはRxJava2.xから追加された機能だ。
機能的にはObservableに近い機能があり、ほぼ同じように利用できる。Observableに関しての使い方については、こちらの記事からどうぞ。
ObservableとFlowableは、どちらも例えばスマホを持って移動したとき常に変化する位置情報を連続的に取り扱い操作したり、現在のCPUの使用率を1秒ごとに取得して何か処理するというような、変化するデータが次々とあらわれる逐次的に発生するデータを取り扱う。
ObservableとFlowableの違いは、実際に試してみたところによると、Observableがすべてのデータをキャッシュして、最初から一つずつデータをとばすことなく処理していくような処理に向いているようだ。試しに多くのデータを流してみたが、データがなくなるということはなかった。なのでObservableでは、取得したデータを最初から一つずつ順番に処理していくような、データの更新間隔が少なく、データに関する処理も十分少なく、貯まるデータよりも処理できるデータが多いような処理に向いているようだ。(ただしObservableでもデータを間引きして処理する仕組みはある)。
それに対してFlowableは保存しておくデータの量を決定したり、キャッシュできる容量をオーバーしたときのどのような処理をするかといったことや、さらにデータを何個ずつ読み取るかなどというデータの流れ(フロー)に関して、よりきめ細やかな処理ができる。
ではデータのフローの設定と、キャッシュから一つずつデータを取り出すソースを示しながら、それぞれそこに仕組みを記述してみるのでソースのコメントを読んでみて欲しい
一つはキャッシュするデータの数を指定できるところ。
二つめはデータのキャッシュ方法を指定できるところ。
三つ目はデータの読み出し数を指定できるところだ。
キャッシュできるデータ数の指定は、そのままの意味なので割愛。
キャッシュ方法については、代表的に以下のようなキャッシュ方法が指定できる。
データ読み出し数については、subscription.requestで指定できる。これはなぜかdoOnSubscribeに記述しても反映されないので、subscribeメソッドで指定する必要がある。
データがキャッシュに保存されている場合、requestで指定した数だけデータを読み出すことができる。よくサンプルコードではdoOnNextにrequest(1)のように記述されているが、たぶん、それは本来の使い方ではなく、状況によって1つ、場合によっては3つなどというように数値を可変して読み込むようにするのが本来の方法だと思われる。
なので今回のサンプルではボタンクリックでその都度1つずつデータを読み出すようにしている。処理の具合などによって、2個読み込んだり、3つ読み込んだり、処理はしばらくやめるというような場合に利用するのが本来の使い方だと思われる。
なお、こうすると1秒に1回とか100ミリセカンドに1回とかの情報を読み取ることもできる。しかし、FlowableとObservableには、ある間隔ごとのデータの読み出しをするメソッドが用意されており、実際にはそちらを使ったほうが楽だと思われる。Observableにもなぜか同様のメソッドがあるので、特段、特別な操作をしないのであれば、Observableにある機能だけ利用すれば様々な処理を行うことができそうだ。
以下に、その方法の具体的なサンプルを示す
FlowableはRxJava2.xから追加された機能だ。
機能的にはObservableに近い機能があり、ほぼ同じように利用できる。Observableに関しての使い方については、こちらの記事からどうぞ。
ObservableとFlowableは、どちらも例えばスマホを持って移動したとき常に変化する位置情報を連続的に取り扱い操作したり、現在のCPUの使用率を1秒ごとに取得して何か処理するというような、変化するデータが次々とあらわれる逐次的に発生するデータを取り扱う。
ObservableとFlowableの違いは、実際に試してみたところによると、Observableがすべてのデータをキャッシュして、最初から一つずつデータをとばすことなく処理していくような処理に向いているようだ。試しに多くのデータを流してみたが、データがなくなるということはなかった。なのでObservableでは、取得したデータを最初から一つずつ順番に処理していくような、データの更新間隔が少なく、データに関する処理も十分少なく、貯まるデータよりも処理できるデータが多いような処理に向いているようだ。(ただしObservableでもデータを間引きして処理する仕組みはある)。
それに対してFlowableは保存しておくデータの量を決定したり、キャッシュできる容量をオーバーしたときのどのような処理をするかといったことや、さらにデータを何個ずつ読み取るかなどというデータの流れ(フロー)に関して、よりきめ細やかな処理ができる。
ではデータのフローの設定と、キャッシュから一つずつデータを取り出すソースを示しながら、それぞれそこに仕組みを記述してみるのでソースのコメントを読んでみて欲しい
class MainActivity : AppCompatActivity() {
override fun onCreate(savedInstanceState: Bundle?) {
super.onCreate(savedInstanceState)
setContentView(R.layout.activity_main)
//フローを制御するためのクラスを格納する変数を宣言しておく
var subscription: Subscription? = null
val flowable = Flowable.create<String>({
//ここのラムダ式の部分が別スレッドで動作する
Log.v("nullpo", Thread.currentThread().name)
//試しに10個のデータを連続して流す
for (i: Int in 1..10) {
Log.v("null", i.toString())
it.onNext(i.toString())
}
//ここでキャッシュからあふれた場合の処理を記述する
//LATESTはキャッシュを削除し直近のデータを保存するように指定する
}, BackpressureStrategy.LATEST)
.subscribeOn(Schedulers.newThread())
//第三引数でキャッシュするデータの数を指定する
//ここでは5としているので、5回分のonNextのデータを保存できる
.observeOn(AndroidSchedulers.mainThread(), false, 5)
.doOnSubscribe {
Log.v("null", "onSubscribe")
//フロー制御用のクラスのインスタンスを保存しておく
subscription = it
}
.doOnNext {
Log.v("null", "onNext")
Log.v("null", it)
}
button1.setOnClickListener {
//ボタンクリックで非同期処理開始
//第四引数がsubscribeが呼ばれたときに実行されるコールバック
//メソッドチェーンで記述してあるdoOnSubscribeと同じタイミングで呼び出される
//しかしrequestのメソッドはここに記述しないとなぜか反映されない
//ここではデータを1個だけ読み出すように設定している
flowable.subscribe({}, {}, {}, { subscription?.request(1) })
}
button2.setOnClickListener {
//ボタンでクリックをするとさらに1個のデータを読み出すように指定できる
//request(2)とすると2個のデータを読み出し、doOnNextが2回実行される
subscription?.request(1)
}
}
}
ポイントは3つ。一つはキャッシュするデータの数を指定できるところ。
二つめはデータのキャッシュ方法を指定できるところ。
三つ目はデータの読み出し数を指定できるところだ。
キャッシュできるデータ数の指定は、そのままの意味なので割愛。
キャッシュ方法については、代表的に以下のようなキャッシュ方法が指定できる。
BackpressureStrategy.BUFFER
無制限にデータをキャッシュする
BackpressureStrategy.LATEST
直近の最近のデータをキャッシュする
BackpressureStrategy.DROP
これまでのキャッシュデータは削除して次の最新のデータをキャッシュする
BackpressureStrategy.ERROR
バッファの容量を超えてデータがくるとエラーとする
無制限にデータをキャッシュする
BackpressureStrategy.LATEST
直近の最近のデータをキャッシュする
BackpressureStrategy.DROP
これまでのキャッシュデータは削除して次の最新のデータをキャッシュする
BackpressureStrategy.ERROR
バッファの容量を超えてデータがくるとエラーとする
データ読み出し数については、subscription.requestで指定できる。これはなぜかdoOnSubscribeに記述しても反映されないので、subscribeメソッドで指定する必要がある。
データがキャッシュに保存されている場合、requestで指定した数だけデータを読み出すことができる。よくサンプルコードではdoOnNextにrequest(1)のように記述されているが、たぶん、それは本来の使い方ではなく、状況によって1つ、場合によっては3つなどというように数値を可変して読み込むようにするのが本来の方法だと思われる。
なので今回のサンプルではボタンクリックでその都度1つずつデータを読み出すようにしている。処理の具合などによって、2個読み込んだり、3つ読み込んだり、処理はしばらくやめるというような場合に利用するのが本来の使い方だと思われる。
なお、こうすると1秒に1回とか100ミリセカンドに1回とかの情報を読み取ることもできる。しかし、FlowableとObservableには、ある間隔ごとのデータの読み出しをするメソッドが用意されており、実際にはそちらを使ったほうが楽だと思われる。Observableにもなぜか同様のメソッドがあるので、特段、特別な操作をしないのであれば、Observableにある機能だけ利用すれば様々な処理を行うことができそうだ。
以下に、その方法の具体的なサンプルを示す
class MainActivity : AppCompatActivity() {
override fun onCreate(savedInstanceState: Bundle?) {
super.onCreate(savedInstanceState)
setContentView(R.layout.activity_main)
val flowable = Flowable.create<String>({
Log.v("nullpo", Thread.currentThread().name)
for (i: Int in 1..100000) {
it.onNext(i.toString())
}
}, BackpressureStrategy.BUFFER)
//throttleから始まるメソッド等を利用すると、ある間隔でデータを取得することができる
//下記の場合は100ミリセカンドごとに、その直近のデータをdoOnNextに送るように指示をする
.throttleLast(100, TimeUnit.MILLISECONDS)
.subscribeOn(Schedulers.newThread())
.observeOn(AndroidSchedulers.mainThread())
.doOnNext {
Log.v("null", "onNext")
Log.v("null", it)
}
.subscribe({ }, { }, { })
}
}