Rustの並列タスクモデルのメモ
「メモリ安全性と並列実行性に注目した言語」と語られる割に、メモリ安全性はともかく、どのようにRustの並列実行モデルが成り立っているのかについて語られる事が殆どない気がしたので、Rust Tasks and Communication Tutorial を大雑把ながら読み解いていくことで、Rust 0.7での並列モデルを探っていきたいと思います。
尚、サンプルコードはチュートリアルのものを元に一部改変を加えて使用してます
Rustにおける並列実行モデル
- 実行されるRustプログラムは、それぞれがスタックと独自に所有権を持つヒープが割り振られた、タスクツリーによって構成される
- Rustにおける並列実行モデルは、軽量かつメモリ独立したタスクと、それらの相互のメッセージ通信によって成り立っている。
- Rustにおけるタスクはスレッドではない。細かく分割されたタスクを、Rustの内部スケジューラによって管理・実行している
- マルチコアな環境ではデフォルトで並列的にスケジュールされる
- VMにおけるグリーンスレッドのようなものを考えると良い
- 言語のセマンティクスで並列実行モデルを提供しているわけではない
- 構文レベルでの補佐は、メモリ安全を保証する型システム
- 標準ライブラリとの組み合わせによって提供される
- タスクは障害の分離と回復を提供するが、
catch
などを用いた例外処理は存在しない- 代わりに、タスク同士が相互に失敗をモニターする形式
タスクの生成
std::task::spawn()
を用いることでタスクが生成される。- 生成されたタスクの実行順序は保証されない
use std::task; // `spawn()`が既にインポートされているので、本来は必要無い fn hoge() {...} task::spawn(hoge); //関数を渡す spawn( || println("hoge") ); //クロージャを渡す // do構文を用いてspawnに渡す do spawn { println("hoge"); } // タスクの実行順序は保証されないため、出力順序は実行毎に変わる for int::range(0, 20) |child_task_number| { do spawn { println(fmt!("%d", child_task_number)); } }
タスクとのコミュニケーション
- タスクとのコミュニケーションにはコミュニケーションチャンネルを使う
- 複数タスク間から直接書き込みが可能な共有変数はRustにおいて許可されていないため
std::comm::stream()
によってコミュニケーションチャンネルを生成する- pipeは入口と出口を持つ
- channelが入口
- portが出口
- portが値を受け取るまで、処理はそこで停止する。
- Servoのメイン関数内にメインループが存在しないのは、これを使っているため
- portとchannelは基本的に一対のものであり、複数のタスク間でひとつのchannelを使い回す事はできない
- Rustの型システムを考えると普通ですよね
// コミュニケーションチャンネルを作る let (port, chan): (Port<int>, Chan<int>) = std::comm::stream(); println(fmt!("%d", 1)); do spawn { let result = 4; println(fmVdt!("%d", 3)); chan.send(result); } println(fmt!("%d", 2)); let result = port.recv(); //chan.send()が呼ばれるまでここで処理は止まる println(fmt!("%d", result)); // うちの環境では1234と出力されました
複数のタスクから一本のchannelを使いたい場合
- 複数のタスクから一本のchannelを使いたい場合、
std::comm::SharedChan
を用いて共有channelを生成し、タスク内でclone()
して使用する
use std::comm; let (port, chan) = comm::stream(); let shared = comm::SharedChan::new(chan); for std::int::range(0, 3) |val| { // 生成した共有channelはclone()して使う let child_chan = shared.clone(); do spawn { child_chan.send(val); } } let result = port.recv() + port.recv() + port.recv(); println(fmt!("%d", result));
バックグラウンドでの並列実行
extra::future
を用いることで、バックグランドでの並列実行を行う事が出来るextra::future::spawn()
はFuture
型を返すだけで、処理をブロックしないFuture
に実装されたget()
を呼ぶ事で結果を取得できる。- そのため、
Future
が返って来る変数はmutableである必要がある
- そのため、
let mut delayed_fib = extra::future::spawn (|| /* 時間のかかる処理 */ ); hoge(); // 処理が止まる事無く継続する let bar = delayed_fib.get(); //Futureの結果の取得
データをコピーせずにimmutableなデータを共有する
- borrowed pointer、managed pointerはタスク間で移動できないので今回は使えない
- この用途では
extra::arc::ARC
を使う- Atomically Reference CountingのARC
- これを使う事でAtomically Reference Counted wrapperが生成される
- ARC wrapperを
clone()
し、各タスクにpipe経由で送る使用方法がチュートリアルでは例示されている
extern mod extra; use std::int; use extra::arc::ARC; let number: ~int = ~1; let arc = ARC(number);// ARCを作成 for int::range(1, 10) |num| { let (port, chan) = std::comm::stream(); chan.send(arc.clone());//ARCをclone do spawn { let local: ARC<~int> = port.recv(); let task_num = local.get();// ARCでラップされた値への参照 println(fmt!("%d", **task_num + num)); } }
タスクの失敗
- Rustは組み込みで
fail!()
マクロを持っており、これを呼ぶ事で例外を発生できる - Rustにおいて、各タスク内で発生した例外の回復は不可能
- デフォルトの状態では、すべてのタスクはlinkedな状態であり、例外が発生した場合すべてに伝播する
- これがタスク同士で相互に失敗をモニターしているという意味
- タスクの失敗をどう取り扱うかについては、幾つかの手法が存在している
std::task::try()
を用いて処理の結果を判断する
std::task::try()
を用いて処理の結果を判断するエラーハンドリングstd::task::try()
は、渡したタスクの結果に応じてResult
というenum型の値を返すResult
はOk
またはErr
というどちらかの値を持っているOk
にはパターンマッチで返ってきた値を渡す事が可能Err
は有用なメッセージを持たないが、持たせることを含めて検討中
use std::task; use std::result::Result; let result: Result<int, ()> = do task::try { if some_condition() { 1 } else { fail!("oops!"); } }; assert!(result.is_ok());//タスクが成功していればtrue
タスクの失敗のモード
- タスクの失敗については、複数のモードが存在する
双方向リンク(bidirectionally linked)モード
std::task::spawn()
によって生成されるタスクにおける失敗は双方向性リンクされている(bidirectionally linked)モードである- 親子の別なく、ひとつのタスクが失敗すると関連するタスクが全部失敗するのがこのモード
do spawn { do spawn { fail!(); } hoge1();//失敗 } hoge2();//失敗
一方向リンク(unidirectionally linked)モード
- 「親は任意の子タスクを殺せるが、子タスクの失敗によって親タスクが死ぬのは避けたい」という場合に使う。
- このモードは
std::task::spawn_supervised()
によって生成される std::task::spawn()
とメッセージパッシングを用いた例がチュートリアルに載っている- スーパーバイザーという音の響きがとても良い
- これにより、親タスクによる子タスクの失敗の管理などが可能になる
- ただし、親タスクが死んだ場合、子タスクもまとめて死ぬ事になる
- このモードは
do task::spawn_supervised { fail!(); } hoge();//子タスクで発生したエラーは祖先方向に伝播しない
失敗が独立した(isolated failure)モード
- 親タスクの失敗と子タスクの失敗を完全に独立させたい場合、
std::task::spawn_unlinked()
を使用する - チュートリアルのまんまです
let (time1, time2) = (random(), random()); do task::spawn_unlinked { sleep_for(time2); // 親からのエラー伝播による中断無し fail!(); } sleep_for(time1); // 子からのエラー伝播による中断無し fail!(); // MAX(time1, time2)がプログラムの実行時間となる
タスクの親子間でのコミュニケーション
- タスクの親子間でのメッセージパッシングを行いたい場合、
extra::comm::DuplexStream()
を使用する DuplexStream
はメッセージの送受信が可能なエンドポイントを生成する
let (parent, child) = extra::comm::DuplexStream(); do spawn { let c: int = child.recv(); assert!(c == 1); child.send(2); } parent.send(1); let p: int = parent.recv(); assert!(p == 2);