処理並列は、必ず処理速度が向上するのか
3つのアルゴリズムを並列化して、並列化によってどのように処理効率が向上するのか、検証します。
はじめに
CPU は、コンピューターの頭脳です。単純に考えると、頭脳が2つあれば、1つの時と比べて、同じ時間でたくさんのことを考えることが出来そうです。10年ほど前、Intel 製プロセッサー Pentium II の頃から、1枚のボードに複数の CPU を載せて高速化する試みがなされました。Intel 製 CPU では2ユニットまででしたが、SUN Microsystems 製のコンピューターでは、もっとたくさんのユニットを載せることが出来ていました。今では、CPU 自体に複数の「コア」を載せ、1ユニットで同時に別々のことを実行できるようになっています。
ハードウェアが、同時に複数のことを実行できるようになったため、ソフトウェアもそれに対応する必要が出てきました。本記事では、昨今あちらこちらで声高に繰り返される、「並列処理化すれば速くなる」に疑問を持ったため、それを検証することにします。
対象読者
なんといっても、並列処理化に興味のある方が対象です。特に、「並列化すれば速くなる」と、単純に思い込んでいる方に、読んでいただきたいと思います。
本記事では、Visual C++ 2008 を元に、基本的に C 言語によるコードにより、検証を行います。よって、ある程度の C 言語の知識があるとよいでしょう。しかし、処理は日本語で説明をするようにしますので、C 言語の文法を知っている必要はありません。
必要な環境
本記事を、読む分には、特別な環境を必要としません。
本記事で紹介するコード例を試すには、Visual Studio C++ 2008 を使用します。また、実際に差異を見るためには、タスクマネージャーで複数の CPU 使用履歴が表示される PC が必要です。
並列処理化の有用性を検証する
どうやって検証するか
まず、検証方法を設計します。どういう仮説があって、その仮説が正しいことを証明するためには、どの様な実験から、どの様な結果が出ればよいのか。これらを明らかにしておかなければ、「検証」ではなく、「観察」になってしまいます。
本記事は、「並列化すれば速くなる」という仮定を検証します。このために、いくつかのアルゴリズムについてコードを作成します。そのコードを、Open MP によってマルチ スレッド化します。シングル、マルチ スレッドで実行完了までにかかる時間を計測し、それぞれマルチ スレッドによってどれくらい実行時間が短縮できたかを算出します。短縮時間を比べることで、マルチ スレッド化がどの様な時にも有効なのか、調査することとします。
アルゴリズム1:素数を求める
素数とは、なんでしょうか。1と、その数以外の数では割り切ることが出来ない数です。では、どうやってこれを求めましょうか。2からその数の手前までの数で順番に割ってみて、割りきれる数があるかどうか、調べます。つまり、11が素数かどうか調べるには、2,3,4,5,6,7,8,9,10 で、割り切れるかどうかを調べます。コードで示します。
素数を判別するコード例:
// 素数かどうか、判別する
// 戻り値:0…素数ではない / 0以外…素数
int IsPrimeNumber(const int 調べる数)
{
for (int i = 2; i < 調べる数; ++i) {
if (調べる数 % i == 0) {
return 0;
}
}
return 1;
}
素数を求めたい範囲について、for 文でループします。それぞれの数についてこの判別関数を通すことで、素数かどうかを調べます。とりあえず、この関数では「素数の数」だけ返すようにします。処理を並列化することの効率を測定するので、コンソールへの出力は行いません。コンソールへの出力がシリアル化される、コンソール出力は遅い処理である、というのが理由です。求めた素数は配列へ格納します。配列を動的に拡張すると、メモリを確保するという並列化できない要素が出てくるため、呼び出し側で確保しておくこととします。
ある数までの素数を求めるコード例:
int GetPrimeNumbers(const int この数まで, int *素数配列 = NULL, const int 配列数 = 0)
{
int index = 0;
int count = 0;
for (int i = 2; i <= この数まで; ++i) {
if (IsPrimeNumber(i) != 0) {
++count;
if (素数配列 != NULL && index < 配列数) {
素数配列[index++] = i;
}
}
}
return count;
}
これで[ある数までの素数の数を求めるコード」が出来ました。では、このコードが正しいかどうか、検証します。コードによって、1~20までの素数の数を求めさせます。これくらいなら、自分で計算も出来るでしょう。その結果とつきあわせます。
素数(の数)を求めるコードを検証するコード:
#include "stdafx.h"
#include <iostream>
#include <omp.h>
int IsPrimeNumber(const int 調べる数) をここに
int GetPrimeNumbers(const int この数まで, int *素数配列 = NULL, const int 配列数 = 0) をここに
int _tmain(int argc, _TCHAR* argv[])
{
std::cout << GetPrimeNumbers(2) << "個/2…1\n";
std::cout << GetPrimeNumbers(3) << "個/3…2\n";
std::cout << GetPrimeNumbers(4) << "個/4…2\n";
std::cout << GetPrimeNumbers(5) << "個/5…3\n";
std::cout << GetPrimeNumbers(6) << "個/6…3\n";
std::cout << GetPrimeNumbers(7) << "個/7…4\n";
std::cout << GetPrimeNumbers(8) << "個/8…4\n";
std::cout << GetPrimeNumbers(9) << "個/9…4\n";
std::cout << GetPrimeNumbers(10) << "個/10…4\n";
std::cout << GetPrimeNumbers(11) << "個/11…5\n";
std::cout << GetPrimeNumbers(12) << "個/12…5\n";
std::cout << GetPrimeNumbers(13) << "個/13…6\n";
std::cout << GetPrimeNumbers(14) << "個/14…6\n";
std::cout << GetPrimeNumbers(15) << "個/15…6\n";
std::cout << GetPrimeNumbers(16) << "個/16…6\n";
std::cout << GetPrimeNumbers(17) << "個/17…7\n";
std::cout << GetPrimeNumbers(18) << "個/18…7\n";
std::cout << GetPrimeNumbers(19) << "個/19…8\n";
std::cout << GetPrimeNumbers(20) << "個/20…8\n";
return 0;
}
実行結果:
1個/2…1
2個/3…2
2個/4…2
3個/5…3
3個/6…3
4個/7…4
4個/8…4
4個/9…4
4個/10…4
5個/11…5
5個/12…5
6個/13…6
6個/14…6
6個/15…6
6個/16…6
7個/17…7
7個/18…7
8個/19…8
8個/20…8
コードが正しいことが検証できました。これを元に、マルチ スレッド化のためのコードを追加します。
GetPrimeNumbers を、マルチ スレッド化する:
int GetPrimeNumbersParallel(const int この数まで, int *素数配列 = NULL, const int 配列数 = 0)
{
int index = 0;
int count = 0;
#pragma omp parallel for
for (int i = 2; i <= この数まで; ++i) {
if (IsPrimeNumber(i) != 0) {
#pragma omp atomic
++count;
if (素数配列 != NULL) {
#pragma omp critical
if (index < 配列数) {
素数配列[index++] = i;
}
}
}
}
return count;
}
これも、検証しておきます。GetPrimeNumbers がシングル スレッドで動作すること、GetPrimeNumbersParallel がマルチ スレッドで動作しているか、確認です。確認を行う土台が、確認を行う目的にとって正しいことを確認しておかないと、何を確認しているのか、わからなくなります。
次のように _tmain 関数を書き換え、実行します。実行中、タスクマネージャーの[プロセス]タブで実行しているプロセスのスレッド数と、CPU 使用率を見ます。スレッド数は、初期状態では表示されていないので、[表示]メニューから「列の選択」を選び、一覧から「スレッド数」を探してチェックします。シングル スレッドの方は、スレッド数が“1”、CPU 使用率は“(100/CPU数)%”である事を確認します。また、マルチ スレッドの方は、スレッド数が“CPU数”、CPU 使用率が“100%”であることを確認します。もし、マルチ スレッド側のスレッド数が“1”の場合、プロジェクトのプロパティで、「構成プロパティ」→「C/C++」→「言語」を開き、「OpenMP サポート」が「はい(/openmp)」になっていることを確認してください。
シングル スレッドであることを確認する:
int _tmain(int argc, _TCHAR* argv[])
{
GetPrimeNumbers(300000);
return 0;
}
マルチ スレッドであることを確認する:
int _tmain(int argc, _TCHAR* argv[])
{
GetPrimeNumbersParallel(300000);
return 0;
}
アルゴリズム2:FizzBuzz 問題
FizzBuzz 問題は、ご存知ですね。1から任意の数までについて出力します。しかし、3の倍数の時は「Fizz」、5の倍数の時は「Buzz」と出力します。これを単純にコード化すると、次のようになります。
FizzBuzz 問題のコード:
void FizzBuzz(const int この数まで)
{
for (int i = 1; i <= この数まで; ++i) {
if (i % 15 == 0) {
std::cout << "FizzBuzz ";
} else if (i % 3 == 0) {
std::cout << "Fizz ";
} else if (i % 5 == 0) {
std::cout << "Buzz ";
} else {
std::cout << i << " ";
}
}
}
しかし、こうすると、出力結果でコンソールが大変なことになります。また、このように出力すると並列化したときに「実行順」が問題になります。なので、ここでは文字列領域を確保し、そこに入れていくことにします。これも動作の検証をして、並列化したコードも作ります。ここでは動作検証は省きますが、実際にやってみる場合は、しっかり検証してください。
FizzBuzz 問題のコード(文字列格納型):
#define FIZZBUZZMAX 5000000
char FB結果[FIZZBUZZMAX][12];
void FizzBuzz(const int この数まで)
{
for (int i = 1; i <= この数まで; ++i) {
if (i % 15 == 0) {
strcpy_s(FB結果[i], sizeof(FB結果[i]), "FizzBuzz");
} else if (i % 3 == 0) {
strcpy_s(FB結果[i], sizeof(FB結果[i]), "Fizz");
} else if (i % 5 == 0) {
strcpy_s(FB結果[i], sizeof(FB結果[i]), "Buzz");
} else {
sprintf_s(FB結果[i], sizeof(FB結果[i]), "%d", i);
}
}
}
void FizzBuzzParallel(const int この数まで)
{
#pragma omp parallel for
for (int i = 1; i <= この数まで; ++i) {
if (i % 15 == 0) {
strcpy_s(FB結果[i], sizeof(FB結果[i]), "FizzBuzz");
} else if (i % 3 == 0) {
strcpy_s(FB結果[i], sizeof(FB結果[i]), "Fizz");
} else if (i % 5 == 0) {
strcpy_s(FB結果[i], sizeof(FB結果[i]), "Buzz");
} else {
sprintf_s(FB結果[i], sizeof(FB結果[i]), "%d", i);
}
}
}
アルゴリズム3:バブル ソート
3つ目のアルゴリズムは、バブル ソートです。
バブル ソートのアルゴリズムをおさらいしましょう。配列に数値が並んでいます。この数値を、小さい順に並べるとします。まず、配列の一番後ろと、そのひとつ手前を比べます。後ろにある方が大きければ、入れ替えます。次に、一番後ろ-1番目と、そのひとつ手前を比べます。そして、ひとつ手前の方が大きければ入れ替えます。これを、最初の要素まで繰り返します。最初までくると、「一番小さい数値」が決定します。そこで今度は、「二番目に小さい数値」を決定すべく、また一番後ろから比較を繰り返します。
バブル ソートのコード:
#define BSORTNUM 100000
int BS配列[BSORTNUM];
void BubbleSort(int* 対象配列, const int 配列数)
{
for (int 決定済み数 = 0; 決定済み数 < 配列数; ++決定済み数) {
for (int y = 配列数 - 1; y > 決定済み数; --y) {
if (対象配列[y] < 対象配列[y - 1]) {
int tmp = 対象配列[y];
対象配列[y] = 対象配列[y - 1];
対象配列[y - 1] = tmp;
}
}
}
}
void BubbleSortParallel(int* 対象配列, const int 配列数)
{
#pragma omp parallel for
for (int 決定済み数 = 0; 決定済み数 < 配列数; ++決定済み数) {
for (int y = 配列数 - 1; y > 決定済み数; --y) {
if (対象配列[y] < 対象配列[y - 1]) {
int tmp = 対象配列[y];
対象配列[y] = 対象配列[y - 1];
対象配列[y - 1] = tmp;
}
}
}
}
さて、これも、検証しておきます。すると、並列化した方で、結果がおかしいことがわかります。以下の結果は、BSORTNUM を 100 として検証したときの結果です。「BS配列」は、1~BSORTNUM で埋め、任意の2つを入れ替えてシャッフルした後、実行しました。
バブル ソートのための、配列をシャッフルするコード:
void Shuffle(int *対象配列, const int 配列数)
{
for (int i = 0; i < 配列数; ++i) {
対象配列[i] = i + 1;
}
int half = 配列数 / 2;
int r = rand() % half;
// 配列数が奇数だと最後の数はシャッフルされない
for (int i = 0; i < (half + r); ++i) {
int b = rand() % half + half;
int s = rand() % half;
int tmp = 対象配列[b];
対象配列[b] = 対象配列[s];
対象配列[s] = tmp;
}
}
int _tmain(int argc, _TCHAR* argv[])
{
srand((unsigned int) time(0));
for (int i = 0; i < 1; ++i) {
Shuffle(BS配列, BSORTNUM);
std::cout << "ソート前\n";
for (int i = 0; i < BSORTNUM; ++i) {
std::cout << BS配列[i] << "\t";
}
std::cout << "\n----------\nソート後\n";
BubbleSortParallel(BS配列, BSORTNUM);
}
for (int i = 0; i < BSORTNUM; ++i) {
std::cout << BS配列[i] << "\t";
}
return 0;
}
実行結果(一例):
ソート前
88 25 18 83 54 90 79 2 49 73
94 92 13 14 97 52 96 75 27 38
100 39 61 40 59 24 7 26 29 30
42 31 23 55 53 36 37 91 11 98
62 74 43 66 71 46 47 95 58 16
51 45 76 6 99 17 19 12 82 60
33 41 35 64 84 5 67 22 68 70
50 72 10 65 80 86 77 44 9 78
81 85 93 32 87 69 8 1 89 3
20 15 4 63 48 56 28 21 34 57
----------
ソート後
1 2 3 4 4 5 6 7 8 9
10 11 12 13 14 15 16 17 18 19
20 21 21 22 23 24 25 26 27 28
29 30 31 33 34 35 36 37 38 39
40 41 42 43 44 45 46 47 48 49
88 50 51 83 54 90 79 52 53 73
94 92 55 56 97 57 96 75 58 59
100 60 61 62 63 64 65 66 67 68
69 70 71 72 74 76 77 91 78 98
80 81 82 84 86 87 89 95 93 99
検証するクセを付けておいてよかったでしょ?
実は、バブル ソートのアルゴリズムは、並列化に向いていません。複数の並列に行われる処理が、同じ領域を書き換えようとするためです。上の実行結果では、「4」が2回現れています。代わりに、32がなくなっています。「4」の事を除くと50までは正しいように思われますが、51以上は並び方がおかしいです。これは、「対象配列[y]」と「対象配列[y-1]」の比較入れ替えで、複数のスレッドが、y の値が同じところを操作しようとしたためです。
これを防ぐには、この範囲をクリティカル エリアとしてマークします。クリティカルとしてマークされた範囲は、必ず1つのスレッドでしか実行されません。バブル ソートの場合、アルゴリズムの大部分が、クリティカルとしてマークされてしまうことになります。
バブル ソートのコード(クリティカルとマーク):
void BubbleSortParallel(int* 対象配列, const int 配列数)
{
#pragma omp parallel for ordered
for (int 決定済み数 = 0; 決定済み数 < 配列数; ++決定済み数) {
for (int y = 配列数 - 1; y > 決定済み数; --y) {
#pragma omp critical
if (対象配列[y] < 対象配列[y - 1]) {
int tmp = 対象配列[y];
対象配列[y] = 対象配列[y - 1];
対象配列[y - 1] = tmp;
}
}
}
}
実行結果(一例):
ソート前
63 2 3 100 5 81 16 82 86 53
11 1 13 14 34 35 58 74 19 40
8 22 23 24 73 18 27 28 9 30
55 56 33 76 99 12 98 4 26 90
68 37 57 44 67 95 47 48 32 69
51 52 78 54 80 38 94 17 59 60
72 49 43 64 65 66 45 41 50 70
88 7 25 84 77 15 62 10 36 92
6 21 83 61 85 29 87 42 89 96
91 20 93 79 46 31 97 71 39 75
----------
ソート後
1 2 3 4 5 6 7 8 9 10
11 12 13 14 15 16 17 18 19 20
21 22 23 24 25 26 27 28 29 30
31 32 33 34 35 36 37 38 39 40
41 42 43 44 45 46 47 48 49 50
51 52 53 54 55 56 57 58 59 60
61 62 63 64 65 66 67 68 69 70
71 72 73 74 75 76 77 78 79 80
81 82 83 84 85 86 87 88 89 90
91 92 93 94 95 96 97 98 99 100
さて、一通り、出来ました。ここまでのコードを、もう一度掲示しておきます。
検証コード(全体):
#include "stdafx.h"
#include <iostream>
#include <math.h>
#include <omp.h>
#include <sys/types.h>
#include <sys/timeb.h>
#include <time.h>
#include <stdlib.h>
#define PRIMENUMBERS 500000
#define FIZZBUZZMAX 30000000
#define BSORTNUM 30000
#define ROUNDCOUNT 5
char FB結果[FIZZBUZZMAX][12];
int BS配列[BSORTNUM];
#define TmToSec(X) ((X).time + (X).millitm / 1000.0)
#define DiffTmSec(START, STOP) (TmToSec(STOP) - TmToSec(START))
// 素数かどうか、判別する
// 戻り値:0…素数ではない / 0以外…素数
int IsPrimeNumber(const int 調べる数)
{
for (int i = 2; i < 調べる数; ++i) {
if (調べる数 % i == 0) {
return 0;
}
}
return 1;
}
int IsPrimeNumberFast(const int 調べる数)
{
if (調べる数 == 2) { return 1;}
if (調べる数 % 2 == 0) { return 0;}
int limit = (int) sqrt((double)調べる数);
for (int i = 3; i <= limit; i += 2) {
if (調べる数 % i == 0) {
return 0;
}
}
return 1;
}
int GetPrimeNumbers(const int この数まで, int *素数配列 = NULL, const int 配列数 = 0)
{
int index = 0;
int count = 0;
for (int i = 2; i <= この数まで; ++i) {
if (IsPrimeNumber(i) != 0) {
++count;
if (素数配列 != NULL && index < 配列数) {
素数配列[index++] = i;
}
}
}
return count;
}
int GetPrimeNumbersParallel(const int この数まで, int *素数配列 = NULL, const int 配列数 = 0)
{
int index = 0;
int count = 0;
#pragma omp parallel for schedule(static)
for (int i = 2; i <= この数まで; ++i) {
if (IsPrimeNumber(i) != 0) {
#pragma omp atomic
++count;
if (素数配列 != NULL) {
#pragma omp critical
if (index < 配列数) {
素数配列[index++] = i;
}
}
}
}
return count;
}
void FizzBuzz(const int この数まで)
{
for (int i = 1; i <= この数まで; ++i) {
if (i % 15 == 0) {
strcpy_s(FB結果[i], sizeof(FB結果[i]), "FizzBuzz");
} else if (i % 3 == 0) {
strcpy_s(FB結果[i], sizeof(FB結果[i]), "Fizz");
} else if (i % 5 == 0) {
strcpy_s(FB結果[i], sizeof(FB結果[i]), "Buzz");
} else {
sprintf_s(FB結果[i], sizeof(FB結果[i]), "%d", i);
}
}
}
void FizzBuzzParallel(const int この数まで)
{
#pragma omp parallel for
for (int i = 1; i <= この数まで; ++i) {
if (i % 15 == 0) {
strcpy_s(FB結果[i], sizeof(FB結果[i]), "FizzBuzz");
} else if (i % 3 == 0) {
strcpy_s(FB結果[i], sizeof(FB結果[i]), "Fizz");
} else if (i % 5 == 0) {
strcpy_s(FB結果[i], sizeof(FB結果[i]), "Buzz");
} else {
sprintf_s(FB結果[i], sizeof(FB結果[i]), "%d", i);
}
}
}
void BubbleSort(int* 対象配列, const int 配列数)
{
for (int 決定済み数 = 0; 決定済み数 < 配列数; ++決定済み数) {
for (int y = 配列数 - 1; y > 決定済み数; --y) {
if (対象配列[y] < 対象配列[y - 1]) {
int tmp = 対象配列[y];
対象配列[y] = 対象配列[y - 1];
対象配列[y - 1] = tmp;
}
}
}
}
void BubbleSortParallel(int* 対象配列, const int 配列数)
{
#pragma omp parallel for ordered
for (int 決定済み数 = 0; 決定済み数 < 配列数; ++決定済み数) {
for (int y = 配列数 - 1; y > 決定済み数; --y) {
#pragma omp critical
if (対象配列[y] < 対象配列[y - 1]) {
int tmp = 対象配列[y];
対象配列[y] = 対象配列[y - 1];
対象配列[y - 1] = tmp;
}
}
}
}
void Shuffle(int *対象配列, const int 配列数)
{
for (int i = 0; i < 配列数; ++i) {
対象配列[i] = i + 1;
}
int half = 配列数 / 2;
int r = rand() % half;
for (int i = 0; i < (half + r); ++i) {
int b = rand() % half + half;
int s = rand() % half;
int tmp = 対象配列[b];
対象配列[b] = 対象配列[s];
対象配列[s] = tmp;
}
}
int _tmain(int argc, _TCHAR* argv[])
{
struct _timeb startTm, stopTm; // ストップウォッチ用
srand((unsigned int) time(0));
int cnt;
double bsSeri = 0.0, bsPall = 0.0;
double fbSeri = 0.0, fbPall = 0.0;
double pnSeri = 0.0, pnPall = 0.0;
std::cout << "素数の数を求める範囲\t" << PRIMENUMBERS << "\n";
std::cout << "FizzBuzz を求める範囲\t" << FIZZBUZZMAX << "\n";
std::cout << "Bubble Sort の要素数\t" << BSORTNUM << "\n";
std::cout << "繰り返し回数\t" << ROUNDCOUNT << "\n";
std::cout << "\n";
/**/
for (int i = 0; i < ROUNDCOUNT; ++i) {
Shuffle(BS配列, BSORTNUM);
_ftime_s(&startTm);
BubbleSort(BS配列, BSORTNUM);
_ftime_s(&stopTm);
std::cout << "BubbleSort シリアル\t" << DiffTmSec(startTm, stopTm) << " sec.\n";
bsSeri += DiffTmSec(startTm, stopTm);
}
/**/
for (int i = 0; i < ROUNDCOUNT; ++i) {
_ftime_s(&startTm);
FizzBuzz(FIZZBUZZMAX);
_ftime_s(&stopTm);
std::cout << "FizzBuzz シリアル\t" << DiffTmSec(startTm, stopTm) << " sec.\n";
fbSeri += DiffTmSec(startTm, stopTm);
}
/**/
for (int i = 0; i < ROUNDCOUNT; ++i) {
_ftime_s(&startTm);
cnt = GetPrimeNumbers(PRIMENUMBERS);
_ftime_s(&stopTm);
std::cout << "素数 シリアル " << cnt << "個\t" << DiffTmSec(startTm, stopTm) << " sec.\n";
pnSeri += DiffTmSec(startTm, stopTm);
}
/**/
for (int i = 0; i < ROUNDCOUNT; ++i) {
Shuffle(BS配列, BSORTNUM);
_ftime_s(&startTm);
BubbleSortParallel(BS配列, BSORTNUM);
_ftime_s(&stopTm);
std::cout << "BubbleSort パラレル\t" << DiffTmSec(startTm, stopTm) << " sec.\n";
bsPall += DiffTmSec(startTm, stopTm);
}
/**/
for (int i = 0; i < ROUNDCOUNT; ++i) {
_ftime_s(&startTm);
FizzBuzzParallel(FIZZBUZZMAX);
_ftime_s(&stopTm);
std::cout << "FizzBuzz パラレル\t" << DiffTmSec(startTm, stopTm) << " sec.\n";
fbPall += DiffTmSec(startTm, stopTm);
}
/**/
for (int i = 0; i < ROUNDCOUNT; ++i) {
_ftime_s(&startTm);
cnt = GetPrimeNumbersParallel(PRIMENUMBERS);
_ftime_s(&stopTm);
std::cout << "素数 パラレル " << cnt << "個\t" << DiffTmSec(startTm, stopTm) << " sec.\n";
pnPall += DiffTmSec(startTm, stopTm);
}
/**/
std::cout << "\n";
std::cout << "BubbleSort シリアル平均 " << (bsSeri / (float) ROUNDCOUNT) << " sec.\n";
std::cout << "BubbleSort パラレル平均 " << (bsPall / (float) ROUNDCOUNT) << " sec.\n";
std::cout << "FizzBuzz シリアル平均 " << (fbSeri / (float) ROUNDCOUNT) << " sec.\n";
std::cout << "FizzBuzz パラレル平均 " << (fbPall / (float) ROUNDCOUNT) << " sec.\n";
std::cout << "素数 シリアル平均 " << (pnSeri / (float) ROUNDCOUNT) << " sec.\n";
std::cout << "素数 パラレル平均 " << (pnPall / (float) ROUNDCOUNT) << " sec.\n";
std::cout << "\n";
std::cout << "BubbleSort 処理効率 " << (bsSeri / bsPall) << "\n";
std::cout << "FizzBuzz 処理効率 " << (fbSeri / fbPall) << "\n";
std::cout << "素数 処理効率 " << (pnSeri / pnPall) << "\n";
return 0;
}
実行結果(一例):
素数の数を求める範囲 500000
FizzBuzz を求める範囲 30000000
Bubble Sort の要素数 30000
繰り返し回数 5
BubbleSort シリアル 3.651 sec.
BubbleSort シリアル 3.853 sec.
BubbleSort シリアル 3.9 sec.
BubbleSort シリアル 3.682 sec.
BubbleSort シリアル 3.807 sec.
FizzBuzz シリアル 11.384 sec.
FizzBuzz シリアル 10.92 sec.
FizzBuzz シリアル 10.921 sec.
FizzBuzz シリアル 10.874 sec.
FizzBuzz シリアル 10.921 sec.
素数 シリアル 41538個 52.481 sec.
素数 シリアル 41538個 52.497 sec.
素数 シリアル 41538個 52.606 sec.
素数 シリアル 41538個 52.45 sec.
素数 シリアル 41538個 52.543 sec.
BubbleSort パラレル 54.057 sec.
BubbleSort パラレル 56.287 sec.
BubbleSort パラレル 56.474 sec.
BubbleSort パラレル 56.443 sec.
BubbleSort パラレル 56.459 sec.
FizzBuzz パラレル 5.943 sec.
FizzBuzz パラレル 5.882 sec.
FizzBuzz パラレル 5.819 sec.
FizzBuzz パラレル 5.819 sec.
FizzBuzz パラレル 5.804 sec.
素数 パラレル 41538個 38.533 sec.
素数 パラレル 41538個 38.767 sec.
素数 パラレル 41538個 39.001 sec.
素数 パラレル 41538個 38.924 sec.
素数 パラレル 41538個 38.814 sec.
BubbleSort シリアル平均 3.7786 sec.
BubbleSort パラレル平均 55.944 sec.
FizzBuzz シリアル平均 11.004 sec.
FizzBuzz パラレル平均 5.8534 sec.
素数 シリアル平均 52.5154 sec.
素数 パラレル平均 38.8078 sec.
BubbleSort 処理効率 0.0675425
FizzBuzz 処理効率 1.87993
素数 処理効率 1.35322
検証
さあ、結果を検証しましょう。それぞれのアルゴリズムの主要な要素数を変更して、実行しました。結果の一例を、表形式で表します。
Bubble Sort の計測結果
要素数 |
10000 |
20000 |
30000 |
|
シリアル |
パラレル |
シリアル |
パラレル |
シリアル |
パラレル |
1回目 |
0.4060 |
6.1150 |
1.6380 |
24.0090 |
3.6510 |
54.057 |
2回目 |
0.4210 |
6.2410 |
1.6530 |
25.3200 |
3.8530 |
56.287 |
3回目 |
0.4060 |
6.2560 |
1.7010 |
25.3660 |
3.9000 |
56.474 |
4回目 |
0.4050 |
6.2250 |
1.6380 |
25.1940 |
3.6820 |
56.443 |
5回目 |
0.4210 |
6.3180 |
1.6690 |
25.3040 |
3.8070 |
56.459 |
平均 |
0.4118 |
6.2310 |
1.6598 |
25.0386 |
3.7786 |
55.944 |
処理効率 |
0.06608891 |
0.066289649 |
0.067542543 |
FizzBuzz の計測結果
要素数 |
10000000 |
20000000 |
30000000 |
|
シリアル |
パラレル |
シリアル |
パラレル |
シリアル |
パラレル |
1回目 |
3.4790 |
1.9350 |
7.7220 |
3.8070 |
11.384 |
5.9430 |
2回目 |
3.4330 |
1.8870 |
7.9250 |
3.8370 |
10.920 |
5.8820 |
3回目 |
3.4320 |
1.8570 |
7.3910 |
3.8380 |
10.921 |
5.8190 |
4回目 |
3.4320 |
1.9190 |
7.3950 |
3.8220 |
10.874 |
5.8190 |
5回目 |
3.4480 |
1.9030 |
7.5290 |
3.8220 |
10.921 |
5.8040 |
平均 |
3.4448 |
1.9002 |
7.5924 |
3.8252 |
11.004 |
5.8534 |
処理効率 |
1.812861804 |
1.984837394 |
1.87993303 |
素数(の数)を求めるの計算結果
要素数 |
100000 |
300000 |
500000 |
|
シリアル |
パラレル |
シリアル |
パラレル |
シリアル |
パラレル |
1回目 |
2.6990 |
2.3400 |
22.7450 |
15.5390 |
52.4810 |
38.5330 |
2回目 |
2.6990 |
2.3240 |
21.2480 |
15.4930 |
52.4970 |
38.7670 |
3回目 |
2.7140 |
2.3250 |
20.9510 |
15.4150 |
52.6060 |
39.0010 |
4回目 |
2.7150 |
2.3870 |
21.0290 |
15.4920 |
52.4500 |
38.9240 |
5回目 |
2.6980 |
2.4960 |
20.9980 |
15.8210 |
52.5430 |
38.8140 |
平均 |
2.7050 |
2.3744 |
21.3942 |
15.5520 |
52.5154 |
38.8078 |
処理効率 |
1.139235175 |
1.375655864 |
1.353217652 |
バブル ソートについては、並列化をする方が遅い結果となりました。これは、並列化して実行する処理のほとんどをクリティカルとしたため、実際には並列化が行われないことと、クリティカル エリアを実行するために待ち合わせが発生するためです。並列化をするためには、まず、クリティカルなエリアをまとめ、できるだけ短い時間に抜けるような工夫が必要です。
素数(の数)を求めると、FizzBuzz で、並列化による効果が異なります。FizzBuzz では1.8倍の効果が出ていますが、素数(の数)を求めるでは、1.3倍にとどまっています。これは、素数の数をカウントするためにクリティカル エリア(アトミックな計算)があることと、並列化で実行されるループの中の処理が、ループの後ろほど数が多いことが原因です。
素数(の数)を求める処理では、「2以下の素数の数」「3以下の素数の数」…「100以下の素数の数」と数が進むにつれて、だんだんと調べる数が多くなります。Open MP の for ディレクティブでは、1つのスレッドが実行するループの回数を制御します。たとえば、全体で千回のループであれば、百回のループを10回繰り返すような動作をします。この10回の繰り返しを、並列に処理するわけです。今、50万以下の素数を求めるわけですが、これを「25万未満」と、「25万以上50万以下」の2つに分けて実行したとします。すると、「25万未満」を担当するスレッドは、「25万以上50万以下」を担当するスレッドよりも、処理の時間が圧倒的に短くなります。すなわち、いずれかのスレッドの負荷だけが上がり、負荷の平準化がされないために、効率が上がらないのです。これを、「100ずつ」に区切って5千回のタスクに分けると、1つめのタスクと5千個目のタスクでは処理にかかる時間が大きく異なりますが、1つめのタスクと2つめのタスクでは、わずかな差しかありません。2つのスレッドに負荷を平準化して掛けることにより、処理効率を上げることが出来ます。今回のコードでは、こういう処理をしていないため、1.3倍の効率しか出ていません。コードを少し変更すれば、処理効率はもう少し向上します。
まとめ
この結果から、何でもかんでも並列化すれば、等しく速くなるわけではない、ということがわかります。並列化にむいているアルゴリズム、むいていないアルゴリズムがあります。また、並列化にむいているアルゴリズムであっても、十分に検討しなければ思わぬバグを作り込むことになります。事前にしっかりと調査し、コードの設計をすることが必要です。
並列化する前に:
- パフォーマンスを計測し、高速化しなければならないか、検討する。
- 10ミリ秒で終わる処理を高速化したところで、どれほどの意味があるでしょうか。1分かかるところを20秒で処理できるようにするのは、意味があるでしょう。
- より効率化された書き方がないか、調査する。
- 例示した「素数か判定する」ルーチンは、判定に使う数は判定したい数の平方根を超えない整数まででかまいません(IsPrimeNumberFast 関数を参照)。そうすると、32ビット程度の数では、並列化をする必要がなくなります。
- ここでは、「並列化に向かないアルゴリズム」としてバブル ソートを選びました。ソートをするなら、クイック ソートやマージ ソートなど、より効率的なアルゴリズムがあります。
- 処理を、より単純化する。
- 並列化できる箇所を明確にする。
- 本文では、いくつかの並列化できない箇所を予め並列化しない箇所へ退避させています。コンソール出力をやめる、メモリの動的確保、等です。
- 並列に動作すると不具合を引き起こす箇所がないか、検証する。
- 処理順序が一定であることに依存しないか、検証します。
- 複数のスレッドで、同じ領域を書き換えることがないか、検証します。(「生産者と消費者」問題)
- 不具合を起こす箇所を一カ所にまとめ、クリティカル ブロックとする。
- デッドロックしないように、慎重に設計します。(「哲学者の食事」問題)
- クリティカルブロックの処理時間と、並列化している箇所の処理時間を比較する。
- クリティカルブロックの処理時間が長い場合、並列化をあきらめる。
謝辞
本記事は、一旦ブログにて公開し、コミュニティの皆様よりご意見をいただいて、より読みやすいように修正しました。ご協力くださった方々に、この場を借りてお礼申し上げます。
投稿日時 : 2010年2月5日 23:42