RxJSで副作用を扱うにはどうするか - Schedulerを交えて

Rx.Scheduler

RxにはSchedulerと呼ばれる主要概念がある. 値がpushで飛んでくるというRxのインパクトの後ろに隠れがちなSchedulerではあるが, これにより, 処理系のスレッドモデル(並行性)と時間軸にまつわるタイミングの制御を統一的に扱えるようにしている. 後続へのoperatorへの値の送出タイミングの制御, Observableの処理スレッドの指定, タイマーのモックへの差し替えなどがSchedulerによって実現されている.

さてJavaScriptの場合, 原則的には単一スレッドの世界になる. Javaや.NETの場合とは違い, RxのSchdulerの役割は回り続けるイベントループ抽象となる. 永久に回り続けるイベントループの中で, どの時点で処理をdispatachするかがSchedulerの役目だ.

JavaScriptの世界にはメモリ空間を共有する方式でのマルチスレッドは事実上存在せず、アクター的なWorkerを立てるか(browser), 別プロセスを生成するかになるので(NodeJSには Web Workerっぽい何かを導入する案はあるが未確定), それぞれはシリアライズされたメッセージで繋ぐことになり, RxJSに隠蔽される形での縦断はできない. 同一のメモリ空間を複数スレッドで共有する方式の処理系でのRxならば「特定の重い処理をコールバックとして渡し, 別のスレッドに逃がしつつ, それらを上手く元のスレッドで待ち受ける」という処理をoperatorをつなぐだけで書けるが, RxJSでは兼ねてからの処理系の制約により, そんなカジュアルにスレッドをまたぐのは無理. 自前でObservableを返すRPC Adapterを実装すれば, それっぽいものはできると思うけれども, この場合はサーバーとクライアントをJSONで結んでいるのと大差は無い.

シングルスレッドなJavaScriptの世界ではRxのSchedulerは目立たない. 従来, JavaScriptの世界で並行性の問題に目立ってぶち当たることは少なかった. しかし, そこには目立たない・少ないだけで, 非同期操作とイベントループの狭間に並行性の問題は確かに存在する. いざ敵に現れたときに厄介な並行性の問題に立ち向かうときに非常に有効になるのが, このSchedulerである.

observeOn()/subscribeOn()

先にも述べたように, RxJSにおけるSchedulerはイベントループ抽象であるため, observeOn()subscribeOn()の挙動もイベントループに対する操作となる.

observeOn()は, 後続のobservableに対して, 指定されたschedulerに基づいて値を流す. 一方, subscribeOn()は指定されたschedulerのタイミングでsubscriptionを開始する.

違いは, observeOn()はObservableがsubscriptionされている間に永続的に作用するのに対し, subscribeOn()はsubscriptionが発生するタイミングそれぞれ一回のみに作用する.

つまり, 後続への値の創出を永続的にschedulerで制御したい場合はobserveOn()を使用し, イニシャライザ内などで一気にすべてのObservableをsubscriptionした上で, 将来の任意の時点でsubscriptionを開始したい場合にはsubscribeOn()を使用する.

前提として考えるよくあるケース

データフローを全てRx.Observableに載せて書こうとする場合, Observableの中間地点で副作用を起こす必要がある場合が存在する. たとえば, こんな感じ:

var value = ...;
var sideeffect = observable.do(() => {
    value.increment();
});

このような場合, observableから伝播してくるpush 1回に対してvalue.increment()が2度走ってしまうと困る. だが, sideeffectsubscribe()した回数分だけsubscriptionのチェーンは構築されてしまうので, push 1回に対してsideeffectがsubscibeされた回数だけvalue.increment()が実行されてしまう. さて, どうするか?

基本的な指針

  • 複数回subscribeしても重複処理の無いようにHot Observableに変換する
  • アトミックな処理区間を作る
  • 状態の変化を適切に関係する値に反映させる

つまるところ, モナドっぽいものを作りましょう, みたいな話だったりする……

Hot Observableに変換し, 途中のObservableが(内部的に)複数回subscribeされることが無いようにする

RxはObservableをsubscribeした場合に , 内部的なsubscribeを繰り返し, Observableのチェーンを最初まで遡るようになっている(by design). これの内部的な経路開通が複数回別個に行われるか, 内部的subscribeの経路を共有しているかが, ObservableがColdかHotかの違いとなっている.このため, 何も考えない場合, 同一のObservableが複数回subscribeされるイコール同一の処理がsubscribeされた回数分走ってしまうので, 副作用を起こす処理が複数回走ってしまって問題になったりする.

これを避けるために, 同一のObservableが複数回subscribeしても一つの経路しか開通しないようにHot Observableに変換する. 具体的にはshare()publish() + refCount())とかする.

類似のアプローチとして, 終端であるsubscribe()メソッドに渡したObserver(Subscriber)の中にのみ, 副作用を起こす処理を書くようにするというのもあるが, これをやりすぎると, Observableのchainが薄くなる一方で, subscribeがどんどん分厚くなっていく. これじゃ, ただのobserverパターンに逆戻りだよ!この手のアプローチを取らないと上手にハンドル出来ないケースも有るのだが, subscribeを厚くしなくてもハンドルできるケースは多いので, 末端のsubscribeに逃げずにハンドル出来ないかを最初に考えつつ動かすのが良いだろう.

なお, ObservableをHotに変換すればいいというものでもない. なぜなら, Hotでは最初のsubscribe/connect()以降に流れた値というのは取り逃してしまうが, Coldであれば, 新しくsubscribeする度にrootに向かってchainが走るため, 毎回初期値を流し込む必要のある処理を綺麗にObservableの中に閉じ込めることができる. パフォーマンスや従来のEvenEmiitterによるObserver patternを考えると, 防御的にHotに変換しつつ方が理解としては簡単だろう.

一方, 全部Coldで組んでおいて最適化の仮定でHotに変換していく方が, 迅速に動くものを作るという意味では簡単だったりもする(Hotに直してsubscriptionの数を減らす過程で, subscriptionの時系列問題が発生するので, 地味に面倒くさかったりするが……). 個人的な印象としては, 値を生成するジェネレータとしてObservableを使う場合はColdの方が良くて, UIイベントなどを取り扱う場合はObservableのchainがHotになるようにしたほうが総合して楽な印象.

アトミックな処理区間を生成するようにする

従来のJavaScriptだとそんなに意識することがないdata raceに関わる話. Observableを綺麗に小分けして組んでいるケースに問題になることが多い.

var a = event.map((id) => {
    return collection1.get(id));
});
var b = a.map((item) => {
    var val  = item.barfoo();
    return val;
});
var c = b.do((val) => {
   collection2.set(val);
});

ここで, a~cの処理はアトミックに一続きで処理される必要があるとする. その場合, 間に別の処理が割り込んできて, 変更対象のcollectionが変更されるとrace conditionとなって困る. 似た問題はRxに関係なく, 複数の処理が並行して動いていると発生する問題だが, 同期コードが多く, スレッドも一つしかなかったJavaScript界ではXHRのときくらいしか目立たなかった問題. NodeJSはfs.exist()などの非同期APIで問題となっていたし, 最近だとPromiseが内部的にmicrotask queue(emca262だとjob queueだっけ)を積み上げるようになっていて, 時々目にするようになった(余談だが, これについてPaul TaylorはPromiseの設計ミスだと考えているようで, 「es-observable proposalでは, この過ちを繰り返したくない」と主張していた……).

さて, RxJSにおいて, 各operatorメソッドは必ずしも連続して実行されることが保証されているわけではない. 基本的には連続して動作するのだが, operatorが内部で使うスケジューラによって変更が可能であるし, observeOn()などが途中に挟まれていると, その時点で保証は崩れることになる.

RxJS v5以降, 原則としてビルドインのオペレータは連続して(再帰的に)呼び出されるようにはなっているが, 先述した通り, observeOn()などを間に挟んだObservableを返す関数を呼び出したりするケースでは信頼できないのは変わりない.

function barfoo(in) {
    // 次のイベントループになったら後続に値を流すような設定になったとする.
    return out.observeOn(scheduler.nextEventLoop);
}

var a = ...;
var b = barfoo(a);
var c = b.do(....); // この場合, cの実行は次のイベントループになる

各operatorに渡された各コールバックの単位のみがアトミックに実行されるのが保証されている単位であり, これ以上の単位については自分でschedulerを用意したり, そもそも操作対象のオブジェクトの操作が可能な範囲を限定することで回避するしかない. 他の言語であれば, 特定のスレッドに張り付けたり, ロックを上手に使ってアトミック操作が担保される区間を作ればいい. 要はそれとおなじことをやればよい,

一番わかりやすいのは単一のdo/map内で処理を完結させてしまう解決方法. operatorの使い方はセマンティックではなくなったり, 別のobservableを引っ張ってこようとした場合に, ちょっとややこしくなるという欠点がある.

もう一つの方法としては, クロージャやprivateメンバを用いて, 変数に触れる函数・オブジェクトを限定した上で, Schedulerを明示的に設定したり, Hot Observableとして結果を同時するなどして, 同時に操作する可能性のある外部要因を減らし, オペレータのチェーンが一続きに実行される, アトミックな区間を作成すること.

// そもそも分割しないアプローチ.
var a = event.do((id) => {
    var item = collection1.get(id));
    var val  = item.barfoo();
    collection2.set(val);
});

// 関数内にクロージャ変数として閉じ込める
function barfoo() {
    var collection1 = ...;
    var collection2 = ...;

    var a = event.map((id) => {
        return collection1.get(id));
    });
    var b = a.map((item) => {
        var val  = item.barfoo();
        return val;
    });
    var c = b.do((val) => {
        collection2.set(val);
    });

    return c.share();
}

// privateメンバとして閉じ込める
class BarFoo {
    private _collection1: ....;
    private _collection2: ...;

    result: ....;

    constructor() {
        this._collection1 = ...;
        this._collection2 = ...;

        var a = event.map((id) => {
            return this._collection1.get(id));
        });
        var b = a.map((item) => {
            var val  = item.barfoo();
            return val;
        });
        var c = b.do((val) => {
           this._collection2.set(val);
        });

        this.result = c.share();
    }
}

状態の変化を適切に通知するようにする

そもそも副作用を面倒くさい点とは何たるや?と言えば, ある要素の状態が変更されたという事実が, その要素の状態に依存している(処理の前提としている)後続の別の処理まで適切に引き継がれないために, 状態の不整合が積み重なっていくということにある. 古典的なオブザーバーパターンであれば, イベントを受け取った後に特定のフラグを変更し忘れたりするようなやつ.

同期pull型のAPIベースでプログラムを組んでいても, 現在の状態を観測するpull APIの呼び出し結果を下手にキャッシュして効率を改善しようとした場合などに, 状態の整合性の取り方を間違えるとバグる. それと同じ問題と言える.

さて, ここでバグるのは, 最初に述べた通り, 系としての状態の不整合が発生している中で無理に処理を実行していった結果, 状態がさらに崩れていくからだ. pull型と違ってpush型の場合, ある値が更新・変更された場合で後続に必要な情報を取得しなおす習慣があまりない, というよりも非同期を許容するpush型APIの場合, pushを受けた後に前提条件をpullで取りに行ったとしても, pushで飛んできた状態が生成された時点での状態である保証されていないので, 組み合わせにくいというのが表現として妥当かもしれない.

このような場合どうするべきかと言えば, 答えは単純. 状態変更が後続に伝わらないのが問題なのだから, 状態変更が後続に伝わるようにすれば良い. つまり, 後続の処理に必要となる値をpushで流せるようにすれば良いのであって

  • 操作対象を独立したObservableとして, 状態が更新されると値が後続に流れるようにする(後続ではzip()/combineLatest()で待ち受ける)
  • 値を後続にpushするときに, 関連する状態をTupleか何かに突っ込んで一緒に後続に流す

のどちらかで解決する. 値の変更を伝える方法はいくつかあるが, Rxを用いて解決する場合は

  • 状態変更をObservableとして発信するデータ型を作る(ライブラリ層として隠ぺいする)
    • Map/Set/Arrayの派生型として, 値の挿入・削除・変更に対応してイベントを発行するものを作る
    • Reactive Propertyのようにもう一段メタな型として用意する
  • Subjectを使って, 明示的にonNextするのを頑張る

のどちらかとなる.

まとめ

  • Rxが, というよりも並行して動作するプログラムの中で状態の整合性を取る話が軸になる
  • Rxだとよく発生するというだけで, 別に今までも起こっていた話にすぎない
  • Rx.Schedulerは目立たないが, これが有ることで泥臭い事例を上手いこと乗り越えていける