ReactiveX: コレクションに内包された複数のObservableの変更を受け取りたい場合にどうするか

まあサンプルコードはRxJSベースで書いてるわけですが.

Reactのようなone-way data flowなテンプレートエンジンとObservableを用いて, フォーム形式のアプリケーションを作っている場合, Observableの入れ子で表したいデータ構造をつくりたいことがある. たとえば, テーブルの各セルをそれぞれ別に編集できるような, 擬似的なエクセルもどきのフォーム. Observableを使って表そうとすると, 以下のような定義をすることになるだろう.

// テーブルの各行に相当. 中身が変更されうるのでObservableで表したい.
class Row implements Observable;

// テーブルを表すRowの集合. Array<T>やSet<T>でも良いが, どれが編集されたかを伝える都合上, 
// 各行のIDとなるキー_K_で逆引きできるMapとする
type Table = Map<K, Row>;

// テーブルそのものを示すObservable.
// これをsubscribeしてReactに渡してやれば, テーブル形式のViewが出来上がる…ということにしておこう
type ViewModel = Observable<Table>; // Observable<Map<K, Observable>>

このようにObservableが入れ子になるデータ構造において, Map(Collection)の中に包まれたObservableの変更を, 最上位のObservableにどうやって伝えるか?ということについて書いてみる. 実用するには必ず困る事案なのに, この手の内容について, あまり先行する情報がない気がする. Proof of Conceptレベルなので最適化が済んでない場合はご容赦を. そもそものアプローチとして, データ構造などを変更して, もう少しうまくやる方法もあるかもしれない点には注意.

まあ結論を先に述べると:

  • 基本的な指針は, 内包されている側の変更を内包している側に伝える事
  • Observableの数が静的に決まる場合はそんなに困らない
  • 困ったら「Observableをつないでいくスタイル」にこだわらずに, 古典的なObserverパターンに則ってみる(なぜならRxはObserverパターンを下敷きにおいているから)

に要約されると思う.

collectionの中に内包されるObservableが静的に決定される(要素数が固定)場合

これは全く難しくない. 内包される変更をがあったら, collection全体を流しなおすようにするだけだ. だいたいこんな感じ.

import {Observable} from 'rx';

class FixedLengthTableRow {
    a: Observable<A>;
    b: Observable<B>;

    constructor() {
        this.a = ...;
        this.b = ...;
    }

    asObservable: Observable<void> {
        return this.a.merge(this.b);
    }
}

class FixedLengthTable {
    private _collection: Observable<Map<K, FixedLengthTableRow>;

    constructor() {
        let updater = Observable.never();
        const collection = new Map();
        for (let i = 0; i < 10; ++i) {
            const r = new FixedLengthTableRow();
            collection.set(i, r);
            // コレクションが内包する全ての要素の変更を受け取るチャンネルを開通させ, 巻き込んでいく
            updater = updater.merge(r.asObservable());
        }
        
        // 内包するいずれかの要素の変更があったら, 状態が変わったものとしてコレクション自体を後続に流しなおす.
        this._collection = updater.map(() => collection);
    }

    asObservable(): Observable<Map<K, FixedLengthTableRow> {
        return this._collection;
    }
}

const table = new FixedLengthTable();
table.asObservable().subscribe((collection) => {
    // collectionの中身が更新されると勝手に流れてくる
});

collectionの中に内包されるObservableが動的に決定される(要素数が可変)場合

これはちょっと面倒臭いように思える. 一度構築したObservableのチェーンは動的に繋ぐものを増やしたり減らしたりできないためだ. Rx入門の多くで語られる「FRP way」に思考を奪われると本当に難しく思えてしまう.

が, これはSubjectを用いて, 古典的なobserverパターン的に解いてやれば良い. こんな感じ.

class DynamicLengthTableRow {
    a: Observable<A>;
    b: Observable<B>;

    constructor(notifier: Subject<void>) {
        this.a = ...;
        this.b = ...;

        const updating = this.a.merge(this.b);
        updating.subscribe(() => {
            // 状態変更が起きたら変更を通知する.
            notifier.onNext();
        });
    }
}

class DynamicLengthTable {
    private _receiver: Subject<void>;
    private _collection: Observable<Map<K, DynamicLengthTableRow>>;

    constructor() {
        // コレクションが内包する全ての要素の変更を受け取るチャンネルを用意する
        this._receiver = new Subject();

        const collection = new Map();
        for (let i = 0; i < 10; ++i) {
            const r = new DynamicLengthTableRow(this._receiver);
            collection.set(i, r);
        }
        // 内包するいずれかの要素の変更があったら, 状態が変わったものとしてコレクション自体を後続に流しなおす.
        this._collection = this._receiver.map(() => collection);
    }

    receiver(): Subject<void> {
        return this._receiver;
    }

    asObservable(): Observable<Map<K, DynamicLengthTableRow>> {
        return this._collection;
    }

    add(row: DynamicLengthTableRow): void {
        this.add(newKey, row);
    }
}

const table = new DynamicLengthTable();
table.asObservable().subscribe((collection) => {
    // collectionの中身が更新されると勝手に流れてくる
});

// こんな風に増減できる
const row1 = new DynamicLengthTableRow(table.receiver());
table.add(row1);