(cache) Gallonworks 応用 C++

応用 C++

非同期タスク

以下のコードは、30個のタスクを同時に実行します。各タスクは、一秒間 sleep_for するだけです。

// async1.cpp #include <algorithm> #include <chrono> #include <future> #include <iostream> #include <iterator> #include <thread> #include <vector> using namespace std; int main(int, char *[]) { vector<future<void>> futures; for(auto i = 0; i < 30; i++) { auto src = async([]{this_thread::sleep_for(chrono::seconds(1));}); futures.push_back(move(src)); } for_each(begin(futures), end(futures), [](future<void> & I){ I.wait(); }); cout << "done." << endl; return 0; }

実行すると、一秒で制御を戻します。要約すると、future<void>vectorasync のラムダを30個格納して、wait で各 async の終了を待つだけです。

async について、デフォルト動作であるこのケースでは、生成された時点でタスクが開始されています。async は、future のラッパーなので、future<void> に代入しています。なぜここで void なのかというと、ラムダの返値が void だからです。返値が int なら future<int> になります。その場合の例を以下のコードに示します。

// async2.cpp #include <future> #include <iostream> using namespace std; int ref1() { return 1; } int ref2() { return 2; } int main(int, char *[]) { future<int> f1 = async(ref1); future<int> f2 = async(ref2); int result = f1.get() + f2.get(); cout << "result = " << result << endl; return 0; }

async には関数、ラムダ、ファンクタを渡すことができます。

スレッドの返値

以下のコードは、各スレッドから計算結果を取得、表示します。結果を取得するための future は、promise に関連づけられているので、それを使います。

// promise.cpp #include <functional> #include <future> #include <iostream> #include <thread> using namespace std; int int1() { return 1; } int int2() { return 2; } void to_promise(int(*calc)(), promise<int> & p) { p.set_value(calc()); } int main(int, char *[]) { promise<int> p1, p2; future<int> f1 = p1.get_future(), f2 = p2.get_future(); thread t1(&to_promise, int1, ref(p1)), t2(&to_promise, int2, ref(p2)); int result = f1.get() + f2.get(); t1.join(); t2.join(); cout << "result = " << result << endl; return 0; }

promise はコピー禁止なので ref を使う必要があります。

非同期例外

以下のコードは、別のスレッドで発生した例外を処理します。

// exception1.cpp #include <functional> #include <exception> #include <iostream> #include <stdexcept> #include <thread> using namespace std; void thrower(exception_ptr & p) { try { throw runtime_error("throwed"); } catch(...) { p = current_exception(); } } int main(int, char *[]) { exception_ptr e; thread t(thrower, ref(e)); t.join(); try { if(e != exception_ptr()) rethrow_exception(e); } catch(const exception & x) { cout << x.what() << endl; } return 0; }

current_exception で初期化された exception_ptr の内容は唯一なので、rethrow_exception を通して元スレッド内で再スローします。

アトミック型

スレッドの競合状態から保護される組み込み型を提供します。型名の最初に atomic_ がついている型、たとえば atomic_intatomic_uint などがそうです。

// atomic.cpp #include <algorithm> #include <atomic> #include <iostream> #include <thread> #include <vector> using namespace std; atomic_int total_iterations; vector<int> vectors; void finder(int element) { auto iterations = 0; find_if(begin(vectors), end(vectors), [=, &iterations](int I) -> bool { iterations++; return I == element; }); total_iterations += iterations; cout << "Thread #" << this_thread::get_id() << ": "; cout << "element " << element << " "; cout << "found after " << iterations << " iterations" << endl; } int main(int, char *[]) { for(int i = 0; i < 1000; i++) vectors.push_back(i); random_shuffle(begin(vectors), end(vectors)); thread t(finder, 0); finder(500); t.join(); cout << total_iterations << " total iterations" << endl; }

意味合いとしてのサンプルです。total_iterationsatomic_int として競合状態から保護されています。

排他処理

std::mutex はマルチスレッディングでのアクセス制御を行います。lockunlock が提供されています。

std::mutex m; int n; // 共有データ m.lock(); n++; // 共有データの操作 m.unlock();

共有データを操作するコード領域(クリティカルセクション/リージョン)を lock()unlock で囲むと、ただ一つのスレッドが共有データにアクセスすることを保証します。ある lock 中のコード領域に他のスレッドが lock を試行すると、unlock されるまで他のスレッドは実行をブロックされます。

これとは別に、コード領域の lock をブロックされることなく取得する try_lock が提供されています。

std::mutex m; int n; // 共有データ if(m.try_lock()) { n++; // 共有データの操作 m.unlock(); } else { // 他の操作 }

std::recursive_mutex は複数回取得できる lock 操作と、取得した回数だけ呼び出さないと unlock しない unlock 操作を提供しています。

std::recursive_mutex m; int n; // 共有データ void f(int count) { m.lock(); n++; // 共有データの操作 if(--count > 0) f(count); m.unlock(); }

指定の時間だけ lock する操作は、std::timed_mutex が提供します。

std::timed_mutex m; int n; // 共有データ if(m.try_lock_for(std::chrono::seconds(5))) { // 五秒間 n++; // 共有データの操作 m.unlock(); } else { // 他の操作 }

ロック

ロックは、接尾辞 _lock を持つオブジェクトで、ロックの破棄時に、事前に与えた mutexunlock します。これは、例外安全なふるまいを実現します。

std::mutex m; int n; // 共有データ void f() { std::unique_lock<mutex> l(m); // ここで lock している n++; // 共有データの操作 } // ここで unlock している

複数のロックを取得する際は、以下のコードのようにすべきです。

std::mutex m1, m2, m3; void f() { std::unique_lock<mutex> l1(m1, std::defer_lock); std::unique_lock<mutex> l2(m2, std::defer_lock); std::unique_lock<mutex> l3(m3, std::defer_lock); lock(l1, l2, l3); } // ここで unlock している

defer_lock ポリシーは、渡した mutex をロックせずに unique_lock の管理下へ置きます。その後、前コードのように、破棄前にどこかで lock しなければなりません。

std::mutex m1, m2, m3; void f() { lock(l1, l2, l3); std::unique_lock<mutex> l1(m1, std::adopt_lock); std::unique_lock<mutex> l2(m2, std::adopt_lock); std::unique_lock<mutex> l3(m3, std::adopt_lock); } // ここで unlock している

adopt_lock ポリシーでは、ロック済みの mutex を渡し、それを unique_lock の管理下へ置きます。

以下のコードは、std::mutex を使った排他処理のサンプルです。

// timelock.cpp #include <chrono> #include <iostream> #include <mutex> #include <thread> using namespace std; mutex m; void locker() { // 定期的にロックをかけたりはずしたり for(int i = 0; i < 3; i++) { { lock_guard<mutex> l(m); cout << "locking with second" << endl; this_thread::sleep_for(chrono::seconds(1)); cout << "releasing with second" << endl; } this_thread::sleep_for(chrono::seconds(1)); } } void acquisition() { // 定期的に locker() のロックを取得しに行く for(int i = 0; i < 10; i++) { { unique_lock<mutex> l(m, try_to_lock); if(l) { cout << "lock successful" << endl; cout << "releasing successful" << endl; } else { cout << "lock failed, hibernating" << endl; } } this_thread::sleep_for(chrono::milliseconds(500)); } } int main(int, char *[]) { thread t(locker); this_thread::sleep_for(chrono::milliseconds(500)); acquisition(); t.join(); return 0; }

locker() は一秒おきに mutex のロック/非ロックを繰り返します。lock_guard は、実体化したスコープにロックをかける構文です。

acquisition() は 0.5秒おきにロックを取得しようと試みます。try_to_lock ポリシーを設定した unique_lock は、ロックの取得を試み、可能なら取得する構文です。

条件変数

以下のコードは、スレッド間でのイベント処理のサンプルです。

// condition_variable.cpp #include <chrono> #include <condition_variable> #include <iostream> #include <mutex> #include <queue> #include <thread> using namespace std; mutex m; condition_variable cond; queue<int> produced; bool done = false; void produce() { for(int i = 0; i < 5; i++) { this_thread::sleep_for(chrono::seconds(1)); unique_lock<mutex> l(m); cout << "producing " << i << endl << flush; produced.push(i); cond.notify_all(); } done = true; cond.notify_all(); } void consume() { unique_lock<mutex> l(m); while(!done) { cond.wait(l); while(!produced.empty()) { cout << "consuming " << produced.front() << endl << flush; produced.pop(); } } } int main(int, char *[]) { thread producer(produce), consumer(consume); producer.join(); consumer.join(); return 0; }

producer スレッドは、一秒ごとにキューへ値をプッシュします。consumer スレッドは、値がプッシュされると、wait() で停止していた状態から、notify_all() の実行によって再開されます。これを使って、consumer スレッドが過度のスリープ状態にならないようにします。

© 2012 Gallonworks