(cache) Gallonworks 応用 C++
応用 C++
非同期タスク
以下のコードは、30個のタスクを同時に実行します。各タスクは、一秒間 sleep_for するだけです。
// async1.cpp
#include <algorithm>
#include <chrono>
#include <future>
#include <iostream>
#include <iterator>
#include <thread>
#include <vector>
using namespace std;
int main(int, char *[]) {
vector<future<void>> futures;
for(auto i = 0; i < 30; i++) {
auto src = async([]{this_thread::sleep_for(chrono::seconds(1));});
futures.push_back(move(src));
}
for_each(begin(futures), end(futures), [](future<void> & I){
I.wait();
});
cout << "done." << endl;
return 0;
}
実行すると、一秒で制御を戻します。要約すると、future<void> の vector に async のラムダを30個格納して、wait で各 async の終了を待つだけです。
各 async について、デフォルト動作であるこのケースでは、生成された時点でタスクが開始されています。async は、future のラッパーなので、future<void> に代入しています。なぜここで void なのかというと、ラムダの返値が void だからです。返値が int なら future<int> になります。その場合の例を以下のコードに示します。
// async2.cpp
#include <future>
#include <iostream>
using namespace std;
int ref1() {
return 1;
}
int ref2() {
return 2;
}
int main(int, char *[]) {
future<int> f1 = async(ref1);
future<int> f2 = async(ref2);
int result = f1.get() + f2.get();
cout << "result = " << result << endl;
return 0;
}
async には関数、ラムダ、ファンクタを渡すことができます。
スレッドの返値
以下のコードは、各スレッドから計算結果を取得、表示します。結果を取得するための future は、promise に関連づけられているので、それを使います。
// promise.cpp
#include <functional>
#include <future>
#include <iostream>
#include <thread>
using namespace std;
int int1() {
return 1;
}
int int2() {
return 2;
}
void to_promise(int(*calc)(), promise<int> & p) {
p.set_value(calc());
}
int main(int, char *[]) {
promise<int> p1, p2;
future<int> f1 = p1.get_future(), f2 = p2.get_future();
thread t1(&to_promise, int1, ref(p1)), t2(&to_promise, int2, ref(p2));
int result = f1.get() + f2.get();
t1.join();
t2.join();
cout << "result = " << result << endl;
return 0;
}
promise はコピー禁止なので ref を使う必要があります。
非同期例外
以下のコードは、別のスレッドで発生した例外を処理します。
// exception1.cpp
#include <functional>
#include <exception>
#include <iostream>
#include <stdexcept>
#include <thread>
using namespace std;
void thrower(exception_ptr & p) {
try {
throw runtime_error("throwed");
}
catch(...) {
p = current_exception();
}
}
int main(int, char *[]) {
exception_ptr e;
thread t(thrower, ref(e));
t.join();
try {
if(e != exception_ptr()) rethrow_exception(e);
} catch(const exception & x) {
cout << x.what() << endl;
}
return 0;
}
current_exception で初期化された exception_ptr の内容は唯一なので、rethrow_exception を通して元スレッド内で再スローします。
アトミック型
スレッドの競合状態から保護される組み込み型を提供します。型名の最初に atomic_ がついている型、たとえば atomic_int や atomic_uint などがそうです。
// atomic.cpp
#include <algorithm>
#include <atomic>
#include <iostream>
#include <thread>
#include <vector>
using namespace std;
atomic_int total_iterations;
vector<int> vectors;
void finder(int element) {
auto iterations = 0;
find_if(begin(vectors), end(vectors), [=, &iterations](int I) -> bool {
iterations++;
return I == element;
});
total_iterations += iterations;
cout << "Thread #" << this_thread::get_id() << ": ";
cout << "element " << element << " ";
cout << "found after " << iterations << " iterations" << endl;
}
int main(int, char *[]) {
for(int i = 0; i < 1000; i++) vectors.push_back(i);
random_shuffle(begin(vectors), end(vectors));
thread t(finder, 0);
finder(500);
t.join();
cout << total_iterations << " total iterations" << endl;
}
意味合いとしてのサンプルです。total_iterations が atomic_int として競合状態から保護されています。
排他処理
std::mutex はマルチスレッディングでのアクセス制御を行います。lock と unlock が提供されています。
std::mutex m;
int n; // 共有データ
m.lock();
n++; // 共有データの操作
m.unlock();
共有データを操作するコード領域(クリティカルセクション/リージョン)を lock() と unlock で囲むと、ただ一つのスレッドが共有データにアクセスすることを保証します。ある lock 中のコード領域に他のスレッドが lock を試行すると、unlock されるまで他のスレッドは実行をブロックされます。
これとは別に、コード領域の lock をブロックされることなく取得する try_lock が提供されています。
std::mutex m;
int n; // 共有データ
if(m.try_lock()) {
n++; // 共有データの操作
m.unlock();
} else {
// 他の操作
}
std::recursive_mutex は複数回取得できる lock 操作と、取得した回数だけ呼び出さないと unlock しない unlock 操作を提供しています。
std::recursive_mutex m;
int n; // 共有データ
void f(int count) {
m.lock();
n++; // 共有データの操作
if(--count > 0) f(count);
m.unlock();
}
指定の時間だけ lock する操作は、std::timed_mutex が提供します。
std::timed_mutex m;
int n; // 共有データ
if(m.try_lock_for(std::chrono::seconds(5))) { // 五秒間
n++; // 共有データの操作
m.unlock();
} else {
// 他の操作
}
ロック
ロックは、接尾辞 _lock を持つオブジェクトで、ロックの破棄時に、事前に与えた mutex を unlock します。これは、例外安全なふるまいを実現します。
std::mutex m;
int n; // 共有データ
void f() {
std::unique_lock<mutex> l(m); // ここで lock している
n++; // 共有データの操作
} // ここで unlock している
複数のロックを取得する際は、以下のコードのようにすべきです。
std::mutex m1, m2, m3;
void f() {
std::unique_lock<mutex> l1(m1, std::defer_lock);
std::unique_lock<mutex> l2(m2, std::defer_lock);
std::unique_lock<mutex> l3(m3, std::defer_lock);
lock(l1, l2, l3);
} // ここで unlock している
defer_lock ポリシーは、渡した mutex をロックせずに unique_lock の管理下へ置きます。その後、前コードのように、破棄前にどこかで lock しなければなりません。
std::mutex m1, m2, m3;
void f() {
lock(l1, l2, l3);
std::unique_lock<mutex> l1(m1, std::adopt_lock);
std::unique_lock<mutex> l2(m2, std::adopt_lock);
std::unique_lock<mutex> l3(m3, std::adopt_lock);
} // ここで unlock している
adopt_lock ポリシーでは、ロック済みの mutex を渡し、それを unique_lock の管理下へ置きます。
以下のコードは、std::mutex を使った排他処理のサンプルです。
// timelock.cpp
#include <chrono>
#include <iostream>
#include <mutex>
#include <thread>
using namespace std;
mutex m;
void locker() { // 定期的にロックをかけたりはずしたり
for(int i = 0; i < 3; i++) {
{
lock_guard<mutex> l(m);
cout << "locking with second" << endl;
this_thread::sleep_for(chrono::seconds(1));
cout << "releasing with second" << endl;
}
this_thread::sleep_for(chrono::seconds(1));
}
}
void acquisition() { // 定期的に locker() のロックを取得しに行く
for(int i = 0; i < 10; i++) {
{
unique_lock<mutex> l(m, try_to_lock);
if(l) {
cout << "lock successful" << endl;
cout << "releasing successful" << endl;
} else {
cout << "lock failed, hibernating" << endl;
}
}
this_thread::sleep_for(chrono::milliseconds(500));
}
}
int main(int, char *[]) {
thread t(locker);
this_thread::sleep_for(chrono::milliseconds(500));
acquisition();
t.join();
return 0;
}
locker() は一秒おきに mutex のロック/非ロックを繰り返します。lock_guard は、実体化したスコープにロックをかける構文です。
acquisition() は 0.5秒おきにロックを取得しようと試みます。try_to_lock ポリシーを設定した unique_lock は、ロックの取得を試み、可能なら取得する構文です。
条件変数
以下のコードは、スレッド間でのイベント処理のサンプルです。
// condition_variable.cpp
#include <chrono>
#include <condition_variable>
#include <iostream>
#include <mutex>
#include <queue>
#include <thread>
using namespace std;
mutex m;
condition_variable cond;
queue<int> produced;
bool done = false;
void produce() {
for(int i = 0; i < 5; i++) {
this_thread::sleep_for(chrono::seconds(1));
unique_lock<mutex> l(m);
cout << "producing " << i << endl << flush;
produced.push(i);
cond.notify_all();
}
done = true;
cond.notify_all();
}
void consume() {
unique_lock<mutex> l(m);
while(!done) {
cond.wait(l);
while(!produced.empty()) {
cout << "consuming " << produced.front() << endl << flush;
produced.pop();
}
}
}
int main(int, char *[]) {
thread producer(produce), consumer(consume);
producer.join();
consumer.join();
return 0;
}
producer スレッドは、一秒ごとにキューへ値をプッシュします。consumer スレッドは、値がプッシュされると、wait() で停止していた状態から、notify_all() の実行によって再開されます。これを使って、consumer スレッドが過度のスリープ状態にならないようにします。
© 2012 Gallonworks