私が覚えている限り、非常に重い(または非同期の)タスク処理に関して、PHPは常に厳しい評価をされていました。これまではずっと、長いタスクを並列化したければpcntl_fork
を通してフォークするという方法を取らなければいけなかったので、タスクの結果を適切に処理することができませんでした。
そこで私たちは、キューイング(どちらかと言えばタスクを遅くするだけ)やReactPHP、または他の言語を一緒に使うといった、より複雑なソリューションへと向かっていきましたが、PHPでもスレッド処理は可能なのです。そしてより重要なのは、その方法はあなたが思っているよりもはるかに簡単だということです。
この記事では、pthreads拡張(POSIX Threadsの略)について説明します。2012年ごろから広く使われていますが、多くの人がその存在を忘れているか、使うのが苦痛だと考えると思います。その主な理由は、公式ドキュメントの内容が少ないからです。
スレッドに詳しくない方のために、pthreadsを作成した人物による素晴らしい要約をご紹介します。
スレッド処理とは、命令を実行ユニットに分割すること。それらのユニットをプロセッサとコア、またはプロセッサかコアに分散させて、アプリケーションのスループットを最大化させる。
スレッド処理はあらゆるケースに適しているわけではなく、PHPでもそれ以外でも、ただ全てを並列化するだけで速度を上げることはできません。特殊なケースに備えて用意されているものです。その特殊な状況に立たされた時に、既存のドメインロジックでPHPを活用できるようになるでしょう。
pthreadsを知ろう
セットアップ
まず必要なのは、拡張機能をインストールすることです。PHPのHomebrewバージョンを使用しているなら、非常に簡単です。他の拡張機能と同じ要領でインストールしてください。
brew install php56-pthreads
メモ:pthreadsはPHPでコンパイルされ、--with-thread-safety
フラグを要求します。インストールが完了したら、ZTS(Zend Thread Safety)と呼ばれるPHPの別バージョンを実行します。php --version
を実行した際に下記のような警告が出た場合、ほとんどの拡張機能では気にすることはありません。
エラー:mcryptは異なるバージョンのPHPでコンパイルされました
brew reinstall php56-mcrypt
のように、拡張機能を再インストールすれば、うまく先に進めるでしょう。もしBlackfire拡張を有効にしているなら、それを無効にすることをお勧めします。ZTSサポートはいまだ実験的なので、何か問題が発生するかもしれないからです。
用語
pthreads拡張には様々な概念が含まれていますが、それらが詳しく説明されているとは限らないので、最初は混乱するでしょう。拡張機能から提供される次の3つのクラスを理解しておけば十分です。
- Threaded objectは、スレッド化できるタスク。非同期で実行するジョブ。
- Workerはジョブを実行して結果を同期するためのクラス。
- Poolは複数のワーカーを管理して、それらの間でジョブをディスパッチするためのもの。
これだけです。この拡張には少しのクラスしかありませんが、単純に次の図のように派生しています。
この図を見て分かるように、根本的に全てのクラスはThreaded
の子クラスです。Threaded
は継承の基本クラスとなりますが、これ自体はめったに使いません。子クラスがより便利なメソッドを提供し、通常はそちらの処理の方が簡単だからです。
簡単な非同期の例
メモ:簡潔に説明するため、例の中ではクラスでプロパティを宣言することもdocblockを追加することもしませんが、これらは必ず行うべきことです。もしそうしなければ、あなたを見つけ出して、ひどい目に遭わせますよ。
では簡単に、単純なWebクローリングの例から始めましょう。まずジョブを作成します。これはThreaded
を継承しているThread
をさらに継承したクラスになります。Threaded
を継承する全てのクラスには、スレッド対象のタスクを定義するrunメソッドがなくてはいけません。今回は単にGoogle検索のfile_get_contents
にします。
<?php class SearchGoogle extends Thread { public function __construct($query) { $this->query = $query; } public function run() { $this->html = file_get_contents('http://google.fr?q='.$this->query); } }
“cats”と検索するインスタンスを作成してみましょう。ジョブを開始するために、start
メソッドを呼び出します。これは何も返してきませんが、ジョブは別のスレッドで開始されます。
<?php $job = new SearchGoogle('cats'); $job->start();
ジョブが開始されると、$job->isRunning()
のようなメソッドを呼び出してジョブをチェックすることができますが、必要なのはjoin
メソッドです。それを使って親メソッドに処理対象のタスクを待機させ、メインスレッドに”再び結合”します。基本的に次のようなことが起こっています。
注釈(左上から):メインスレッド、メインスレッド、メインスレッド、子スレッド
join
を呼び出せば、クラスが結果を保持していることを確認できます。
<?php $job = new SearchGoogle('cats'); $job->start(); // Wait for the job to be finished and print results $job->join(); echo $job->html;
これを踏まえ、例ごとに複数の検索を同時に開始して結果を取得することができます。
<?php $searches = ['cats', 'dogs', 'birds']; foreach ($searches as &$search) { $search = new SearchGoogle($search); $search->start(); } foreach ($searches as $search) { $search->join(); echo substr($search->html, 0, 20); }
jobクラス内でタイムスタンプを出力する場合は以下のようになります。
public function run() { echo microtime(true).PHP_EOL; $this->html = file_get_contents('http://google.fr?q='.$this->query); }
ファイルを実行すれば、3つのジョブ全てが確かに同時に開始していることを確認できるでしょう。
$ php multiple.php 1446987102.4479 1446987102.4503 1446987102.4525 <!doctype html><html <!doctype html><html <!doctype html><html
ワーカーによるジョブ管理
ここまでは比較的簡単でしたが、ジョブを自分で管理せずに済むのが理想でしょう。ジョブをそれぞれ個別に開始し、1つずつ結合するようなことはしたくないと思います。ジョブをどこかへ投げさえすれば、勝手に処理され、処理が全て完了した時に結果を取得できたらいいと思っていることでしょう。そこでワーカーの出番です。
Workerは、その上にジョブをスタックすることができるクラスで、全ジョブを一度に開始し、結合します。
<?php class Searcher extends Worker { public function run() { echo 'Running '.$this->getStacked().' jobs'.PHP_EOL; } } // Stack our jobs on our worker $worker = new Searcher(); $searches = ['dogs', 'cats', 'birds']; foreach ($searches as &$search) { $search = new SearchGoogle($search); $worker->stack($search); } // Start all jobs $worker->start(); // Join all jobs and close worker $worker->shutdown();
この出力結果は次のようになります。
Running 3 jobs 1446989108.5938 1446989108.6898 1446989108.9086
結果の収集
これで、ワーカーにメソッドを追加するだけでジョブからそのメソッドを呼び出して、各ジョブの結果を追跡できるようになりました。ジョブをワーカーにスタックすると、ジョブがそのワーカーを認識し、$this->worker
を通してワーカーにアクセスできるようになります。それを行い、ジョブによってフェッチされたHTMLを収集してみましょう。
<?php class Searcher extends Worker { public $data = []; public function run() { echo 'Running '.$this->getStacked().' jobs'.PHP_EOL; } /** * To avoid corrupting the array * we use array_merge here instead of just * $this->data[] = $html */ public function addData($data) { $this->data = array_merge($this->data, [$data]); } } class SearchGoogle extends Threaded { public function __construct($query) { $this->query = $query; } public function run() { echo microtime(true).PHP_EOL; $this->worker->addData( file_get_contents('http://google.fr?q='.$this->query) ); } } // Stack our jobs on our worker $worker = new Searcher(); $searches = ['dogs', 'cats', 'birds']; foreach ($searches as &$search) { $search = new SearchGoogle($search); $worker->stack($search); } // Start all jobs $worker->start(); // Join all jobs and close worker $worker->shutdown(); foreach ($worker->data as $html) { echo substr($html, 0, 20).PHP_EOL; }
これを実行すれば、以下のような適切な結果を取得できます。
Running 3 jobs 1446989506.0509 1446989507.938 1446989510.4684 <!doctype html><html <!doctype html><html <!doctype html><html
ここで、$worker->stack(new SearchGoogle($search))
を行っていないことに気づくでしょう。これは、ワーカーにそのジョブの参照を追跡させる必要があるためです。つまり、ワーカーを開始すると、ワーカーにスタックされたジョブは全てメインスレッドで何かを参照しなければなりません。これはかなり面倒なので、このためにPoolクラスと呼ばれるクラスが作成されました。見てみましょう。
ジョブのプーリング
Poolはクラスで、その目的は、ジョブを1つ以上のワーカーにディスパッチして、それらのジョブを管理することです。ドキュメントでは次のように説明されています。
プーリングによって、pthreadsに必要な方法での参照の管理を含む、ワーカー機能のより高いレベルの抽象化を行うことができます。
前の例を、プーリングで書き換えてみましょう。プールを作成する際、以下の3つの引数を渡す必要があります。
- プールが同時に使用できるワーカーの数
- プールが(クラス名の文字列を)使用するワーカーの種類
- 作成する際にワーカーを渡す任意の引数
ネイティブのpoolとworkerクラスを使う今回の例では、新しいインスタンスは以下のようになります。
<?php $pool = new Pool(5, Worker::class);
それから$pool->submit(<Threaded>)
を呼び出せば、ジョブをプールにサブミットできます。ワーカーとの主な違いは、ジョブをプールにサブミットするとすぐにジョブが開始するということです。もう参照に対処する必要はありません。
<?php // Create a pool and submit jobs to it $pool = new Pool(5, Worker::class); $pool->submit(new SearchGoogle('cats')); $pool->submit(new SearchGoogle('dogs')); $pool->submit(new SearchGoogle('birds')); // Close the pool once done $pool->shutdown();
前と同じように行えば、全てのジョブが同時に開始するのが見られるでしょう。全てが正常に機能します。それでは、結果を収集してみましょう。ワーカーなどを通してループしなくてもプール内のジョブとやり取りできるようにするため、プールには便利なcollect
メソッドがあり、一種のフィルタとして機能します。それをクロージャに渡し、ジョブがプールにあるかどうかを返すことになります。通常はジョブが完了したかどうかを返します。
このために、pthreadsはCollectable
クラスを提供します。そのクラスはThreaded
を拡張するもので、2つの追加メソッドを持っています。setGarbage
とisGarbage
によって、ジョブは完了したものとして記録され、ガベージコレクタに収集される準備ができます。Poolクラスを書いて、ジョブから結果を収集し、それらを廃棄してみましょう。
<?php class SearchPool extends Pool { public $data = []; public function process() { // Run this loop as long as we have // jobs in the pool while (count($this->work)) { $this->collect(function (SearchGoogle $job) { // If a job was marked as done // collect its results if ($job->isGarbage()) { $this->data[$job->query] = $job->html; } return $job->isGarbage(); }); } // All jobs are done // we can shutdown the pool $this->shutdown(); return $this->data; } }
また、これはつまり、SearchGoogle
ジョブを編集してCollectable
を拡張し、setGarbage
を実行されたメソッドの最後で呼び出す必要があるということです。
<?php class SearchGoogle extends Collectable { public function __construct($query) { $this->query = $query; } public function run() { echo microtime(true).PHP_EOL; $this->html = file_get_contents('http://google.fr?q='.$this->query); $this->setGarbage(); } }
これで次のようなことが行えます。
<?php // Create a pool and submit jobs to it $pool = new SearchPool(5, Worker::class); $pool->submit(new SearchGoogle('cats')); $pool->submit(new SearchGoogle('dogs')); $pool->submit(new SearchGoogle('birds')); $pool->submit(new SearchGoogle('planes')); $pool->submit(new SearchGoogle('cars')); $data = $pool->process(); var_dump($data);
これを実行すれば、5つのジョブが同時に実行されたことだけでなく、それらのジョブが同時に完了し、簡単にジョブの結果が得られたことも分かります。
$ time php pooling.php 1446990493.7183 1446990493.7205 1446990493.7227 1446990493.7247 1446990493.7266 array(5) { 'birds' => (53230) "<!doctype html>"... 'cats' => (53211) "<!doctype html>"... 'cars' => (53250) "<!doctype html>"... 'dogs' => (53244) "<!doctype html>"... 'planes' => (53267) "<!doctype html>"... } php pooling.php 0.51s user 0.03s system 100% cpu 0.940 total