挙動をつかむのが難しいです。

 observableで非同期中の処理を途中で停止する方法。一般的にはdisposeを実行すればいいっぽい。
 ボタン押下でobservableを途中で停止するサンプルプログラムを記述した。
 observableの基本的な使い方や処理の流れについては、こちらを参考にしてほしい。
 disposable.dispose()を実行すると非同期処理中の処理は自動的に停止する。さらにonDisposeがコールバックされるので、そこに必要な処理を記述すればいい。なお、このときにtry~catchでエラーをハンドリングしている場合、disposeで途中で停止されるのでエラーがハンドリングされてしまい、onErrorのイベントが発生してしまう。しかし、このときdisposeが実行され、すでにobservableのインスタンスは破棄されているのでイベントが実行できずエラーになる。これを回避するには、すでにdisposedされているかどうかを確認するisDisposedプロパティのみてDisposeされていたらonErrorを実行しないように回避する。
 また、create内のラムダ式内部にsetCancellableで処理を記述しても、Dispose時に実行されるが、ここで処理を記述すると可読性が低くなりそうだ。

class MainActivity : AppCompatActivity() {

    override fun onCreate(savedInstanceState: Bundle?) {
        super.onCreate(savedInstanceState)
        setContentView(R.layout.activity_main)

        var disposable: Disposable? = null
        val observable = Observable.create<String> {

//別スレで動作している Log.v("nullpo", Thread.currentThread().name) try {

it.setCancellable { Log.v("null", "cancelable") }
//sleepを入れてわざと時間のかかる処理にしている for (i: Int in 1..100) { Thread.sleep(100) Log.v("null", i.toString())

//その都度データを送る it.onNext(i.toString()) }

//終了したらCompleteを実行 it.onComplete() } catch (e: Exception) {

//disposeするとエラーがキャッチされてonErrorが実行されてしまう
//するとdisposeされているのでイベントを実行することができずアプリが落ちる
//その回避のためisDisposedをチェックしてエラーを回避する if (it.isDisposed == false) it.onError(Throwable("take")) Log.v("null", "exception") } } .subscribeOn(Schedulers.newThread()) .observeOn(AndroidSchedulers.mainThread()) .doOnSubscribe { disposable = it } .doOnNext { Log.v("nullpo", Thread.currentThread().name) Log.v("null", "onNext") } .doOnComplete { Log.v("nullpo", Thread.currentThread().name) Log.v("null", "onComplete") } .doOnError { Log.v("nullpo", Thread.currentThread().name) Log.v("null", "onError") }

//Dispose時に呼ばれる
//doOnErrorもdoOnCompleteも呼ばれずにDisposeが呼ばれるので処理中にDisposeされたことがわかる .doOnDispose { Log.v("null", "onDispose") }
//ボタンクリックでObservableの開始 button1.setOnClickListener { observable.subscribe({}, {}) }

//ボタンクリックでDispose button2.setOnClickListener { disposable?.dispose() } } }