ECMA262のIteration protocolsで遅延評価するIteratorを作る

これは何?

ECMA262 6th以降にはiteration protocols(と呼ぶべきもの)が導入されている、というのは皆さんもちろんご存知のとおりだと思います。これを使ってIterator<T>.next()が呼ばれるまでmap()などを実行しない(lazy evaluationする)Iteratorを作ってみようという話です。Rustのstd::iter::IteratorとかC#LINQとかが似たような挙動をしますよね。

今後、巨大なオブジェクトやリストに対してのIteratorが提供・出現した場合、毎回のように即時評価でfor-ofしたりArray.from()するなどは現実的ではないでしょう。以下のように書くことも出来ますが、

/**
 *  @template T, U
 *  @param {Iterable<T>} src
 *  @param {function(T):U} mapfn
 *  @return {Iterable<U>}
 */
function map(src, mapfn) {
  const iter = src[Symbol.iterator]();
  const next = function () {
    const { done, value } = iter.next();
    if (done) {
      return {
        done,
      };
    }
    const result = mapfn(value);
    return {
      done: false,
      value: result,
    };
  };

  return {
    [Symbol.iterator]() {
      return {
        next,
      }
    }
  };
}

const inifinityList = [...];
map(inifinityList);

bind operator proposalがacceptされないことにはa( b ( c() ) )のように書いていくケースの方が多くてダルい。俺はbind operatorの行く末を気にせずにArray.prototype.map()のようにメソッドチェーンで書いていきたいんだ、その気持ち、わかります。

というわけで、ものは試しにiterator protocolに則る形で実装してみましょう.

Proof Of Concept

型がややこしいのでTypeScriptで書いてみました。だいたいこんな感じ。どうせES6に型アノテーションつけた程度に書いてるので、適当にアノテーション落とせば動きます。

ちなみに基本設計はRxJS v5を元にしています。IterableとObservableでDualityだ! だからというわけではないですが、評価結果のキャッシュはopt-inになっています(cache())。安易にキャッシュしてしまうと、mapやfilterのコールバック関数がDate.now()などの副作用に依存している場合にバグってしまうので、明示的に使用する前提にしてます。

実用したい場合

大抵の場合は自分で再実装するか、Interactive Extensions (for JavaScript)あたりを待つか、新しい何かが出てくるのを待てば良いのではないでしょうか。

他にも、

などがありますが、2016年3月3日現在の安定版ではiteration protocolには対応してない模様ですが、今のところはES6~のIteratorが必須になるようなケースはそこまで多くはないと思っているので、これらを使うなどで凌ぐというのも現実的だと思います。

RxJSで副作用を扱うにはどうするか - Schedulerを交えて

Rx.Scheduler

RxにはSchedulerと呼ばれる主要概念がある. 値がpushで飛んでくるというRxのインパクトの後ろに隠れがちなSchedulerではあるが, これにより, 処理系のスレッドモデル(並行性)と時間軸にまつわるタイミングの制御を統一的に扱えるようにしている. 後続へのoperatorへの値の送出タイミングの制御, Observableの処理スレッドの指定, タイマーのモックへの差し替えなどがSchedulerによって実現されている.

さてJavaScriptの場合, 原則的には単一スレッドの世界になる. Javaや.NETの場合とは違い, RxのSchdulerの役割は回り続けるイベントループ抽象となる. 永久に回り続けるイベントループの中で, どの時点で処理をdispatachするかがSchedulerの役目だ.

JavaScriptの世界にはメモリ空間を共有する方式でのマルチスレッドは事実上存在せず、アクター的なWorkerを立てるか(browser), 別プロセスを生成するかになるので(NodeJSには Web Workerっぽい何かを導入する案はあるが未確定), それぞれはシリアライズされたメッセージで繋ぐことになり, RxJSに隠蔽される形での縦断はできない. 同一のメモリ空間を複数スレッドで共有する方式の処理系でのRxならば「特定の重い処理をコールバックとして渡し, 別のスレッドに逃がしつつ, それらを上手く元のスレッドで待ち受ける」という処理をoperatorをつなぐだけで書けるが, RxJSでは兼ねてからの処理系の制約により, そんなカジュアルにスレッドをまたぐのは無理. 自前でObservableを返すRPC Adapterを実装すれば, それっぽいものはできると思うけれども, この場合はサーバーとクライアントをJSONで結んでいるのと大差は無い.

シングルスレッドなJavaScriptの世界ではRxのSchedulerは目立たない. 従来, JavaScriptの世界で並行性の問題に目立ってぶち当たることは少なかった. しかし, そこには目立たない・少ないだけで, 非同期操作とイベントループの狭間に並行性の問題は確かに存在する. いざ敵に現れたときに厄介な並行性の問題に立ち向かうときに非常に有効になるのが, このSchedulerである.

observeOn()/subscribeOn()

先にも述べたように, RxJSにおけるSchedulerはイベントループ抽象であるため, observeOn()subscribeOn()の挙動もイベントループに対する操作となる.

observeOn()は, 後続のobservableに対して, 指定されたschedulerに基づいて値を流す. 一方, subscribeOn()は指定されたschedulerのタイミングでsubscriptionを開始する.

違いは, observeOn()はObservableがsubscriptionされている間に永続的に作用するのに対し, subscribeOn()はsubscriptionが発生するタイミングそれぞれ一回のみに作用する.

つまり, 後続への値の創出を永続的にschedulerで制御したい場合はobserveOn()を使用し, イニシャライザ内などで一気にすべてのObservableをsubscriptionした上で, 将来の任意の時点でsubscriptionを開始したい場合にはsubscribeOn()を使用する.

前提として考えるよくあるケース

データフローを全てRx.Observableに載せて書こうとする場合, Observableの中間地点で副作用を起こす必要がある場合が存在する. たとえば, こんな感じ:

var value = ...;
var sideeffect = observable.do(() => {
    value.increment();
});

このような場合, observableから伝播してくるpush 1回に対してvalue.increment()が2度走ってしまうと困る. だが, sideeffectsubscribe()した回数分だけsubscriptionのチェーンは構築されてしまうので, push 1回に対してsideeffectがsubscibeされた回数だけvalue.increment()が実行されてしまう. さて, どうするか?

基本的な指針

  • 複数回subscribeしても重複処理の無いようにHot Observableに変換する
  • アトミックな処理区間を作る
  • 状態の変化を適切に関係する値に反映させる

つまるところ, モナドっぽいものを作りましょう, みたいな話だったりする……

Hot Observableに変換し, 途中のObservableが(内部的に)複数回subscribeされることが無いようにする

RxはObservableをsubscribeした場合に , 内部的なsubscribeを繰り返し, Observableのチェーンを最初まで遡るようになっている(by design). これの内部的な経路開通が複数回別個に行われるか, 内部的subscribeの経路を共有しているかが, ObservableがColdかHotかの違いとなっている.このため, 何も考えない場合, 同一のObservableが複数回subscribeされるイコール同一の処理がsubscribeされた回数分走ってしまうので, 副作用を起こす処理が複数回走ってしまって問題になったりする.

これを避けるために, 同一のObservableが複数回subscribeしても一つの経路しか開通しないようにHot Observableに変換する. 具体的にはshare()publish() + refCount())とかする.

類似のアプローチとして, 終端であるsubscribe()メソッドに渡したObserver(Subscriber)の中にのみ, 副作用を起こす処理を書くようにするというのもあるが, これをやりすぎると, Observableのchainが薄くなる一方で, subscribeがどんどん分厚くなっていく. これじゃ, ただのobserverパターンに逆戻りだよ!この手のアプローチを取らないと上手にハンドル出来ないケースも有るのだが, subscribeを厚くしなくてもハンドルできるケースは多いので, 末端のsubscribeに逃げずにハンドル出来ないかを最初に考えつつ動かすのが良いだろう.

なお, ObservableをHotに変換すればいいというものでもない. なぜなら, Hotでは最初のsubscribe/connect()以降に流れた値というのは取り逃してしまうが, Coldであれば, 新しくsubscribeする度にrootに向かってchainが走るため, 毎回初期値を流し込む必要のある処理を綺麗にObservableの中に閉じ込めることができる. パフォーマンスや従来のEvenEmiitterによるObserver patternを考えると, 防御的にHotに変換しつつ方が理解としては簡単だろう.

一方, 全部Coldで組んでおいて最適化の仮定でHotに変換していく方が, 迅速に動くものを作るという意味では簡単だったりもする(Hotに直してsubscriptionの数を減らす過程で, subscriptionの時系列問題が発生するので, 地味に面倒くさかったりするが……). 個人的な印象としては, 値を生成するジェネレータとしてObservableを使う場合はColdの方が良くて, UIイベントなどを取り扱う場合はObservableのchainがHotになるようにしたほうが総合して楽な印象.

アトミックな処理区間を生成するようにする

従来のJavaScriptだとそんなに意識することがないdata raceに関わる話. Observableを綺麗に小分けして組んでいるケースに問題になることが多い.

var a = event.map((id) => {
    return collection1.get(id));
});
var b = a.map((item) => {
    var val  = item.barfoo();
    return val;
});
var c = b.do((val) => {
   collection2.set(val);
});

ここで, a~cの処理はアトミックに一続きで処理される必要があるとする. その場合, 間に別の処理が割り込んできて, 変更対象のcollectionが変更されるとrace conditionとなって困る. 似た問題はRxに関係なく, 複数の処理が並行して動いていると発生する問題だが, 同期コードが多く, スレッドも一つしかなかったJavaScript界ではXHRのときくらいしか目立たなかった問題. NodeJSはfs.exist()などの非同期APIで問題となっていたし, 最近だとPromiseが内部的にmicrotask queue(emca262だとjob queueだっけ)を積み上げるようになっていて, 時々目にするようになった(余談だが, これについてPaul TaylorはPromiseの設計ミスだと考えているようで, 「es-observable proposalでは, この過ちを繰り返したくない」と主張していた……).

さて, RxJSにおいて, 各operatorメソッドは必ずしも連続して実行されることが保証されているわけではない. 基本的には連続して動作するのだが, operatorが内部で使うスケジューラによって変更が可能であるし, observeOn()などが途中に挟まれていると, その時点で保証は崩れることになる.

RxJS v5以降, 原則としてビルドインのオペレータは連続して(再帰的に)呼び出されるようにはなっているが, 先述した通り, observeOn()などを間に挟んだObservableを返す関数を呼び出したりするケースでは信頼できないのは変わりない.

function barfoo(in) {
    // 次のイベントループになったら後続に値を流すような設定になったとする.
    return out.observeOn(scheduler.nextEventLoop);
}

var a = ...;
var b = barfoo(a);
var c = b.do(....); // この場合, cの実行は次のイベントループになる

各operatorに渡された各コールバックの単位のみがアトミックに実行されるのが保証されている単位であり, これ以上の単位については自分でschedulerを用意したり, そもそも操作対象のオブジェクトの操作が可能な範囲を限定することで回避するしかない. 他の言語であれば, 特定のスレッドに張り付けたり, ロックを上手に使ってアトミック操作が担保される区間を作ればいい. 要はそれとおなじことをやればよい,

一番わかりやすいのは単一のdo/map内で処理を完結させてしまう解決方法. operatorの使い方はセマンティックではなくなったり, 別のobservableを引っ張ってこようとした場合に, ちょっとややこしくなるという欠点がある.

もう一つの方法としては, クロージャやprivateメンバを用いて, 変数に触れる函数・オブジェクトを限定した上で, Schedulerを明示的に設定したり, Hot Observableとして結果を同時するなどして, 同時に操作する可能性のある外部要因を減らし, オペレータのチェーンが一続きに実行される, アトミックな区間を作成すること.

// そもそも分割しないアプローチ.
var a = event.do((id) => {
    var item = collection1.get(id));
    var val  = item.barfoo();
    collection2.set(val);
});

// 関数内にクロージャ変数として閉じ込める
function barfoo() {
    var collection1 = ...;
    var collection2 = ...;

    var a = event.map((id) => {
        return collection1.get(id));
    });
    var b = a.map((item) => {
        var val  = item.barfoo();
        return val;
    });
    var c = b.do((val) => {
        collection2.set(val);
    });

    return c.share();
}

// privateメンバとして閉じ込める
class BarFoo {
    private _collection1: ....;
    private _collection2: ...;

    result: ....;

    constructor() {
        this._collection1 = ...;
        this._collection2 = ...;

        var a = event.map((id) => {
            return this._collection1.get(id));
        });
        var b = a.map((item) => {
            var val  = item.barfoo();
            return val;
        });
        var c = b.do((val) => {
           this._collection2.set(val);
        });

        this.result = c.share();
    }
}

状態の変化を適切に通知するようにする

そもそも副作用を面倒くさい点とは何たるや?と言えば, ある要素の状態が変更されたという事実が, その要素の状態に依存している(処理の前提としている)後続の別の処理まで適切に引き継がれないために, 状態の不整合が積み重なっていくということにある. 古典的なオブザーバーパターンであれば, イベントを受け取った後に特定のフラグを変更し忘れたりするようなやつ.

同期pull型のAPIベースでプログラムを組んでいても, 現在の状態を観測するpull APIの呼び出し結果を下手にキャッシュして効率を改善しようとした場合などに, 状態の整合性の取り方を間違えるとバグる. それと同じ問題と言える.

さて, ここでバグるのは, 最初に述べた通り, 系としての状態の不整合が発生している中で無理に処理を実行していった結果, 状態がさらに崩れていくからだ. pull型と違ってpush型の場合, ある値が更新・変更された場合で後続に必要な情報を取得しなおす習慣があまりない, というよりも非同期を許容するpush型APIの場合, pushを受けた後に前提条件をpullで取りに行ったとしても, pushで飛んできた状態が生成された時点での状態である保証されていないので, 組み合わせにくいというのが表現として妥当かもしれない.

このような場合どうするべきかと言えば, 答えは単純. 状態変更が後続に伝わらないのが問題なのだから, 状態変更が後続に伝わるようにすれば良い. つまり, 後続の処理に必要となる値をpushで流せるようにすれば良いのであって

  • 操作対象を独立したObservableとして, 状態が更新されると値が後続に流れるようにする(後続ではzip()/combineLatest()で待ち受ける)
  • 値を後続にpushするときに, 関連する状態をTupleか何かに突っ込んで一緒に後続に流す

のどちらかで解決する. 値の変更を伝える方法はいくつかあるが, Rxを用いて解決する場合は

  • 状態変更をObservableとして発信するデータ型を作る(ライブラリ層として隠ぺいする)
    • Map/Set/Arrayの派生型として, 値の挿入・削除・変更に対応してイベントを発行するものを作る
    • Reactive Propertyのようにもう一段メタな型として用意する
  • Subjectを使って, 明示的にonNextするのを頑張る

のどちらかとなる.

まとめ

  • Rxが, というよりも並行して動作するプログラムの中で状態の整合性を取る話が軸になる
  • Rxだとよく発生するというだけで, 別に今までも起こっていた話にすぎない
  • Rx.Schedulerは目立たないが, これが有ることで泥臭い事例を上手いこと乗り越えていける

Summary of the layered architecture for karen-irc’s client-side

This is the summary overview of karen-irc's client-side architecture. The history of my thought are wrote in the issue. I may talk about more details in somewhere.

And I deployed the simplified design of this to my main work code. Perhaps I or my co-worker may write about it in someplace.

CAUTION: karen-irc's code is refactoring still now, so there are some legacy part

Basic Principle

  • I wrote the summary as slide
  • Use many terminologies from Flux, because we started this design from the thought of Flux pattern.
    • But Flux pattern is just a design pattern, not an architecture. There are some missing peaces.

Key Components

The concept tree will be:

  • adapter
    • includes driver & adapter roles
  • domain
    • includes domain models (entity, value object, and etc)
  • intent
    • includes action & dispatcher roles.
      • this 'action' naming is derived from Flux, we may rename this role.
    • I feel this role is very similar to WPF (Windows Presentation Foundation)'s command. Should we change this name to command?
  • context
    • This is a unit of view content area and has a lifecycle itself. This terminology is inspired by Android's Activity.
    • only provides lifecycle.
  • output
    • viewmodel
      • This role has a responsibility of the state of view.
      • This transforms a data to other data structure which is more suitable for a corresponded view.
      • This name may confuse me with MVVM pattern's View Model... I thought it provides very similarity role, but I don't have concrete confidence...
    • view
      • This role has a responsibility of the representation of view.
      • We mixed composite this layer with using React.js and raw dom manipulation.
  • application
    • This is a base layer of an application.
    • This layer provides a base routing context, managing dependencies, initializing flow, and etc.
    • This layer should be rewritten to fulfill the requirement for each application.

Data Flow

A data flow should be represented as Rx's Observable, except a part of perf sensitive code.

Some of My Thought

Intent

Flux's ActionCreator and Dispatcher makes some misleading. By naming & grouping, we feel they are separated materials. But It's mistake. ActionCreator provides an abstraction to call Dispatcher without pass an event type enum. Thus ActionCreator is a function which is named as materialized intent. ActionCreator and Dispatcher should be tightly coupled.

Context

context is a unit of a view content area and has a lifecycle itself. An active context ensure that you have an ownership of the view content area under the given mount point, so you can render something to the mount point if you are called. This terminology is inspired by Android's Activity and Windows' UWP app lifecycle. This interface is here:

trait ViewContext {
  fn on_activated(&self, mountpoint: Element);
  fn on_destroy(&self, mountpoint: Element);

  fn on_resume(&self, mountpoint: Element);
  fn on_suspend(&self, mountpoint: Element);
} 

Each lifecycle method should return a Promise/Future if we need implement moving in/out transition. karen-irc does not need it yet. Therefore the return value is still simplified.

This only provide a lifecycle of view context, does not manage any dependencies. Dependencies are managed by an application layer for each application as most suitable style. So this does not provide any magic system like other react.js meta frameworks because their choice should be freely from our architecture. Each contexts are separated. Theoretically, you can use a raw dom manipulation in the context A, use React.js in context the B, and use Angular2 in the context C.

This does not correspond to a html:body element. context is a unit of view content area which is divided-and-ruled.

Adapter for RPC

I write my roughly thought about RPC adapter.

We can express this layer by some functions and some classes, as same as the relationship of between action and dispatcher in intent.

The key points is that We can materialize a RPC call concretely with giving a named function returning a Promise/Future/Observable

All RPC call is composited from:

  • protocol
  • address of endpoint
  • type definition of input/output

These 3 key components are very similar to "ABC" of WCF (Windows Communication Foundation) which is called as "Indigo" in Longhorn era.

And RPC client/service should manage connections sessions dispatched by self. Then they can create a returned response value for each requests. This means that: you can get a result as a Promise/Future/Observable even if the request is one way to a service, and you can transform a event stream by flatmapping the result value.

Why don't split out a generic framework part as a new package?

I don't like it :)

I agree it would be valuable thing to split out a generic data structure to reuse it. Because almost data structure represents data structure and there are not any architectural design. They are usually "non-political" code.

As a generalization, it will be very harder than imagine that to create a generic framework. I doubt the value to create a generic framework which can be used for all situations from this project. Even if we achieve to create some framework which looks like “generic”, it would need to spend a more times to make it a truly “generic. It comes to nothing!

The most important thing is reuse the philosophy. It is meaningless to reuse the code base to other applications which has significantly different requirements.

open souce関係の2015年まとめ

本業がマクロでそれなりに忙しかったんで、その関係もあって、興味関心がエンタープライズアプリケーションアーキテクチャとか、GUIアプリケーションアーキテクチャに向いていたので、なんか2015年はServoとかRustよりも専らECMA262かTypeScriptばかり書いてたような気がする。

とりあえずパッと思い出せる範囲で書きます.

Servo

  • issue/pull request/meeting noteは、年間通して概ね全部見てる
  • 前半はそこそこやってたけど、後半はそこまででもない
  • そこまで時間見れるpull requestは、手があいてればreviewしたりしてる
  • 来年はServoだけじゃなくて、GeckoでもWebKitでもBlinkでも、他のエンジンも触ってみたい

option-t

  • https://github.com/karen-irc/option-t てのを作りました
  • Rustのstd::optionをベースに, ECMA262 5thの範囲でOption<T>を作りました
    • CommonJSで書いてるけど、別にこだわりがあるわけではなく、NodeJS+browserifyしか自分の用途が無いだけ
    • まあ、TypeScript推奨ではあります
  • std::result相当のResult<T, E>(Either<A,B>)も作ったけど, 特に入れてない
  • 来年はperf optimizationするかなー

karen-irc

  • コンストせんせと「好き勝手できるircクライアント欲しくね?」という話になって、一緒にshoutをforkした
  • 元のコードがjQueryベースでメンテきついので、個人的な実験場も兼ねて、TypeScript + babelにしたり、設計全面rewriteしたり、ReactぶっこんだりRxぶっこんだりして色々遊んだ
    • まあ、リファクタリングは、まだ終わってないんだけど……
    • 一部知見が本業の方にフィードバックされた
  • 来年は、新機能追加とかそもそもの処理フローの最適化に手を付けようとおもいます

ReactiveX/RxJS

幾つか気になる点があったので、pull req送ったりissue立てたりした

標準化とか

w3c/requestidlecallback

whatwg/html

es-observable proposal

issue立てたりは他にも結構やった気がしますが、よく覚えてないので、まあいいや

まとめ

  • 思ったよりも小粒なことばっかりやってた一年だったなー
  • ちょっとkaren-ircに力入れすぎたね……
  • もうちょいガッツリとコード書かないとダメですねー

コミュニティの受容の印象いろいろ:Rx編

2015年を通して、Rxに関して色々調べてみた結果、(国内の)コミュニティごとに受容の温度感がだいぶ違ったので、ちょっとおもしろかった。

.NET(C#)界隈

2009年くらいからRxに付き合ってるだけあって、なんというか熱狂期を過ぎた感じ。冷静に受け止めて粛々と使う人は使うという印象。LINQがあり、LINQ to Eventsという売り文句でコミュニティに向けて発信されたのもあってか、リアクティブとかFRPとか、そういう単語で盛大に踊った痕跡もあんまり残ってない。Channel9とかinfoQとかにアップロードされてるBart de Smetの解説動画とかも、思想の源流としてリアクティブの話はしつつも、IEnumerableとのdualityを話す。

Android界隈

リアクティブという言葉のマジックを第一印象として大切にしつつも、Promise/Future相当の統合兵器として、粛々と実践利用している印象。cookpadが使っているというのが国内における普及の一押しのようには感じている。QiitaにはArrayの拡張メソッドの代わりに使おうと試みる記事も上がっているものの、その用途の道具じゃないので、その一派は割りと早期に廃れた模様。2014年のJavaScriptコミュニティがReactで盛り上がっている一方でAndroid界隈はRxJavaで盛り上がっていた、という感じだろうか。

Java界隈

Reactive Streamsからの流れで、Java/Scala関係なくサーバーサイドアーキテクチャでpush型サービスモデルを組むための道具の一つとして受け止められているように見受けられる。日本のJavaコミュニティといえばScalaも混ざっているので、当然ScalaのFunctionalでMonadicな側面と合わせて論じられていたりもする、という印象を受ける。

JavaScript界隈

たぶんいちばん反応が若い、というか初々しい。2014~15年と、コミュニティのメインストリームはReact~ES6と来ているので、Rx一派は未だ傍流なのと(消化が進んでいないとも言う)、Cycle.jsとその作者であるAndré Staltzが大のFunctional好きなのもあってか、「リアクティブ」「Functional Reactive Programming」といった単語が魔法の言葉として長らく扱われている。その魔法のベールにこそ、Reactの次のコミュニティマウンティングとメイクマネーチャンスの鍵が眠っているかのようにも扱われているが、その神秘のベールがコミュニティの合意として明かされる日までは、もう少しかかりそうな感じ。

番外: Netflix界隈

2013年1月のReactive Programming at Netflix以来、会社のカラーとしてReactiveを押している。が、あくまでも枕詞として会社のカラーを押すためのキャッチーな単語を選んだという感じで、彼らのtechconf動画を見ると、RxJavaでpush型の世界をどうハンドリングするかの話やobserver patternやIterableに対する双対としてのRxの話が多く、地に足の着いた話しか出てこない。

ReactiveX: コレクションに内包された複数のObservableの変更を受け取りたい場合にどうするか

まあサンプルコードはRxJSベースで書いてるわけですが.

Reactのようなone-way data flowなテンプレートエンジンとObservableを用いて, フォーム形式のアプリケーションを作っている場合, Observableの入れ子で表したいデータ構造をつくりたいことがある. たとえば, テーブルの各セルをそれぞれ別に編集できるような, 擬似的なエクセルもどきのフォーム. Observableを使って表そうとすると, 以下のような定義をすることになるだろう.

// テーブルの各行に相当. 中身が変更されうるのでObservableで表したい.
class Row implements Observable;

// テーブルを表すRowの集合. Array<T>やSet<T>でも良いが, どれが編集されたかを伝える都合上, 
// 各行のIDとなるキー_K_で逆引きできるMapとする
type Table = Map<K, Row>;

// テーブルそのものを示すObservable.
// これをsubscribeしてReactに渡してやれば, テーブル形式のViewが出来上がる…ということにしておこう
type ViewModel = Observable<Table>; // Observable<Map<K, Observable>>

このようにObservableが入れ子になるデータ構造において, Map(Collection)の中に包まれたObservableの変更を, 最上位のObservableにどうやって伝えるか?ということについて書いてみる. 実用するには必ず困る事案なのに, この手の内容について, あまり先行する情報がない気がする. Proof of Conceptレベルなので最適化が済んでない場合はご容赦を. そもそものアプローチとして, データ構造などを変更して, もう少しうまくやる方法もあるかもしれない点には注意.

まあ結論を先に述べると:

  • 基本的な指針は, 内包されている側の変更を内包している側に伝える事
  • Observableの数が静的に決まる場合はそんなに困らない
  • 困ったら「Observableをつないでいくスタイル」にこだわらずに, 古典的なObserverパターンに則ってみる(なぜならRxはObserverパターンを下敷きにおいているから)

に要約されると思う.

collectionの中に内包されるObservableが静的に決定される(要素数が固定)場合

これは全く難しくない. 内包される変更をがあったら, collection全体を流しなおすようにするだけだ. だいたいこんな感じ.

import {Observable} from 'rx';

class FixedLengthTableRow {
    a: Observable<A>;
    b: Observable<B>;

    constructor() {
        this.a = ...;
        this.b = ...;
    }

    asObservable: Observable<void> {
        return this.a.merge(this.b);
    }
}

class FixedLengthTable {
    private _collection: Observable<Map<K, FixedLengthTableRow>;

    constructor() {
        let updater = Observable.never();
        const collection = new Map();
        for (let i = 0; i < 10; ++i) {
            const r = new FixedLengthTableRow();
            collection.set(i, r);
            // コレクションが内包する全ての要素の変更を受け取るチャンネルを開通させ, 巻き込んでいく
            updater = updater.merge(r.asObservable());
        }
        
        // 内包するいずれかの要素の変更があったら, 状態が変わったものとしてコレクション自体を後続に流しなおす.
        this._collection = updater.map(() => collection);
    }

    asObservable(): Observable<Map<K, FixedLengthTableRow> {
        return this._collection;
    }
}

const table = new FixedLengthTable();
table.asObservable().subscribe((collection) => {
    // collectionの中身が更新されると勝手に流れてくる
});

collectionの中に内包されるObservableが動的に決定される(要素数が可変)場合

これはちょっと面倒臭いように思える. 一度構築したObservableのチェーンは動的に繋ぐものを増やしたり減らしたりできないためだ. Rx入門の多くで語られる「FRP way」に思考を奪われると本当に難しく思えてしまう.

が, これはSubjectを用いて, 古典的なobserverパターン的に解いてやれば良い. こんな感じ.

class DynamicLengthTableRow {
    a: Observable<A>;
    b: Observable<B>;

    constructor(notifier: Subject<void>) {
        this.a = ...;
        this.b = ...;

        const updating = this.a.merge(this.b);
        updating.subscribe(() => {
            // 状態変更が起きたら変更を通知する.
            notifier.onNext();
        });
    }
}

class DynamicLengthTable {
    private _receiver: Subject<void>;
    private _collection: Observable<Map<K, DynamicLengthTableRow>>;

    constructor() {
        // コレクションが内包する全ての要素の変更を受け取るチャンネルを用意する
        this._receiver = new Subject();

        const collection = new Map();
        for (let i = 0; i < 10; ++i) {
            const r = new DynamicLengthTableRow(this._receiver);
            collection.set(i, r);
        }
        // 内包するいずれかの要素の変更があったら, 状態が変わったものとしてコレクション自体を後続に流しなおす.
        this._collection = this._receiver.map(() => collection);
    }

    receiver(): Subject<void> {
        return this._receiver;
    }

    asObservable(): Observable<Map<K, DynamicLengthTableRow>> {
        return this._collection;
    }

    add(row: DynamicLengthTableRow): void {
        this.add(newKey, row);
    }
}

const table = new DynamicLengthTable();
table.asObservable().subscribe((collection) => {
    // collectionの中身が更新されると勝手に流れてくる
});

// こんな風に増減できる
const row1 = new DynamicLengthTableRow(table.receiver());
table.add(row1);

リアクティブとかファンクショナルとか言わないReactiveX入門

しばし見かけるReactive Extensions(ReactiveX, Rx)に関する説明の多くはファンクショナルだのリアクティブだのモナドといったキャッチーなフレーズを使っている。けれども、そういう方面に馴染みのない人を相手にして、そもそもの概念的に何を解決したかったものなのかという説明があんまりない気がした。ユースケースを伴った説明も局所解すぎて現実における使い道がわかりにくい、というか誰も彼もデータバインディングしか例に出さないのはどうなんだ。これはストリームの川?それは表現形態であって実態を表しているとは言い難いだろう。

放置していても誰も書かない気がするし、神秘的な霊験と共に語られても全く役に立たないし、自分の思考の整理と(幾らかは同僚への説明も兼ねて)書いてみることにする。

もしかすると.NET界隈あたりでは過去にやりつくしたネタの再生産かもしれないし、ラジオなどの非文章媒体で既にあるのかもしれないけれど、見つからなかったので、まあいいや。


プログラミングの手法としてイベント駆動でコールバックを呼び出すパターンが存在する、というのは周知の事実。これは「何かが変化したら、それが変化したことを任意の対象に対して通知する」のに役立つ。典型的なのはGUIプログラミング。ユーザーがクリックしたことがイベントとして通知され、それを受けて処理を行う。ネットワークリクエストの結果を受け取るコールバックも、レスポンスが帰ってきたという局所的なイベントを受けて動作する点で変わりはない。

プログラミング的にもイベントが飛んでくるという抽象化がなされている方がやりやすいことは、それなりにある。もしイベントがないと延々とポーリングをして状態の変更を確認し続けなければならない。面倒臭い。

ところが、この古典的なイベント処理には問題がある。イベントというのは本質的に非同期に突如としてやってくるものであり、それぞれがバラバラに飛んでくる(離散的に、と言い換えても良いかもしれない)ものであるのが殆どだからだ。ネットワークレスポンスの終端イベントは開始イベントの後にやってくる、という程度の順序はあるけれど、個々のイベント自体は、自信がどういう順序関係で発行されたのかを情報としては持ち得ていない。

イベントが全部バラバラに飛んでくる以上、「画面がリサイズされた後にクリックが起こった場合」というような状況を適切に処理したい場合、各イベントのユーザーであるプログラマは、自分でフラグを立てるなどをしてイベントの順序を管理するしかない。イベントAに対応するフラグA’を立てて、その状態でイベントBが来たらB’を立てて、A’とB’の両方がtrueであれば後続に処理をつないでいきetc。フラグが増え続け、破綻する日は近い。

イベントAとBが両方とも来たことを示すCというイベントを発行する、という手法もあるが、これはイベント粒度を細くすればするほどやることが増えていくし、各イベントが取り扱う変化の範囲で悩み始める。

このイベントが多すぎる・関係性を持ち始める問題は、わかりやすく例えれば、GUIがリッチになるにつれ問題となっていく。GUIの変更・ユーザーの操作それぞれが条件となり、次に起こすべき変更もさらに条件となっていく。しかも、これがモバイル端末の上ともなると、ネットワークの状態、バッテリーの具合、環境光の変化などが出てくる。クライアントではなくサーバー側であれば、クラスタ内のノードの死活状態やログの送受、広くはクライアントからのリクエストもイベントとみなすことができるだろう。混沌としてくる。

こうした状況に対してLINQ to Eventsとも呼称されるReactiveXが提示した解決策は、イベントは離散的なデータの集合であり、それらから任意のイベントの関係性を取り出す方法があれば良いということ。度々例示してきたように、一見バラバラで独立している個々のイベントにも、実際に利用する際には関係性が存在している。その関係性をコードとして明示しクエリとして処理することで、コードを通して人間にも読める形式で、機械的に結果を取り出すことを可能にした。この結果、複数のイベントの関係性から別のイベントを生成することが容易になり、イベントに紐づく変化の範囲についてコードを書く側が悩む必要もなくなった(必要に応じてイベントを細かく割りやすくなった)。

これと同時にスケジューラという概念を導入することで、取り出したイベントを後続に伝えるタイミングや、イベントを発行する対象のスレッドなども一連の流れの中で制御できるようにした。

それと、イベントを文字列やsymbolのような固定値ではなくオブジェクトとし、構文ないしメソッドの形式でイベントの関係性を表現したことで、コード解析技術を通してイベントの関係性の解析・図示や入力補完といったことが可能になった。これにより、アプリケーション規模が巨大になっていってもIDEなどを通してコードを掌握しきることができるので、ある種のgoto jumpであったイベントハンドリングがコード上に静的な形式で落とし込まれることになった。

私のブログを読んでいる人はJavaScriptの読み書きをする人が多いと思うので、JavaScriptのPromiseとの関連で説明しておくと、Promiseは原則として1回しか自身の状態を変更できない = 1回限りのイベントにしか使えない(なので、実質的にはpull型で動き始める)。何回でも発生しうるイベント全般に対する適用ができないから、Promiseだけではイベントの関係性は表現し得ない。

さて、先述のような抽象化を実施するために、Rxの実装の裏側では多くの一時オブジェクトを生成するなどしているため、実行時のコストがかかるように思える。だが、ハードウェアの進歩とコンパイラVMの進化、及び抽象化に伴うプログラミング時の生産性の向上と合わせると、アプリケーションのレイヤーではコストとして無視するという選択が可能になった。コストが無視できない環境では従来通り頑張って書けばいいだけで、事態が悪化したわけではないし、言語そのものを乗り換える(JS -> Java/C# -> Cpp)ことでも案外解決できたりする。

また、イベントを発行することに伴うプログラミング時の負担が減った結果、状態の変更をイベントとして気軽に通知しやすくなった。これにより、ある意味では副作用を起こしやすくなった。副作用による問題の一つは、ある状態に依存している(ある状態であることを仮定している)後続の処理群が、前提条件の変更を適切にハンドリングできない点にある。故に、気軽に状態が変更された事実をイベントとして伝え、再度計算し直すことで、全体としての整合性を保てるようになる(だからと言って、気軽に手当たり次第に副作用を起こしていいかというとそれは別)(再計算によるコストも、前述の通り実行環境の進化で無視できるという前提に立っているので、無視できない変更が起こるケースは再計算せずに済ませるようにした方がいい)。

ReactiveX.ioのイントロに「俺たちはFunctional Reactive Programmingじゃない」って書いてあるのは、各自読んでもらうとして、だいたいこんな感じ?