スレッド処理は慎重に – PHPでのスレッド処理 : 前編

私が覚えている限り、非常に重い(または非同期の)タスク処理に関して、PHPは常に厳しい評価をされていました。これまではずっと、長いタスクを並列化したければpcntl_forkを通してフォークするという方法を取らなければいけなかったので、タスクの結果を適切に処理することができませんでした。

そこで私たちは、キューイング(どちらかと言えばタスクを遅くするだけ)やReactPHP、または他の言語を一緒に使うといった、より複雑なソリューションへと向かっていきましたが、PHPでもスレッド処理は可能なのです。そしてより重要なのは、その方法はあなたが思っているよりもはるかに簡単だということです。

この記事では、pthreads拡張(POSIX Threadsの略)について説明します。2012年ごろから広く使われていますが、多くの人がその存在を忘れているか、使うのが苦痛だと考えると思います。その主な理由は、公式ドキュメントの内容が少ないからです。

スレッドに詳しくない方のために、pthreadsを作成した人物による素晴らしい要約をご紹介します。

スレッド処理とは、命令を実行ユニットに分割すること。それらのユニットをプロセッサとコア、またはプロセッサかコアに分散させて、アプリケーションのスループットを最大化させる。

スレッド処理はあらゆるケースに適しているわけではなく、PHPでもそれ以外でも、ただ全てを並列化するだけで速度を上げることはできません。特殊なケースに備えて用意されているものです。その特殊な状況に立たされた時に、既存のドメインロジックでPHPを活用できるようになるでしょう。

pthreadsを知ろう

セットアップ

まず必要なのは、拡張機能をインストールすることです。PHPのHomebrewバージョンを使用しているなら、非常に簡単です。他の拡張機能と同じ要領でインストールしてください。

brew install php56-pthreads

メモ:pthreadsはPHPでコンパイルされ、--with-thread-safetyフラグを要求します。インストールが完了したら、ZTS(Zend Thread Safety)と呼ばれるPHPのバージョンを実行します。php --versionを実行した際に下記のような警告が出た場合、ほとんどの拡張機能では気にすることはありません。

エラー:mcryptは異なるバージョンのPHPでコンパイルされました

brew reinstall php56-mcryptのように、拡張機能を再インストールすれば、うまく先に進めるでしょう。もしBlackfire拡張を有効にしているなら、それを無効にすることをお勧めします。ZTSサポートはいまだ実験的なので、何か問題が発生するかもしれないからです。

用語

pthreads拡張には様々な概念が含まれていますが、それらが詳しく説明されているとは限らないので、最初は混乱するでしょう。拡張機能から提供される次の3つのクラスを理解しておけば十分です。

  • Threaded objectは、スレッド化できるタスク。非同期で実行するジョブ。
  • Workerはジョブを実行して結果を同期するためのクラス。
  • Poolは複数のワーカーを管理して、それらの間でジョブをディスパッチするためのもの。

これだけです。この拡張には少しのクラスしかありませんが、単純に次の図のように派生しています。

この図を見て分かるように、根本的に全てのクラスはThreadedの子クラスです。Threadedは継承の基本クラスとなりますが、これ自体はめったに使いません。子クラスがより便利なメソッドを提供し、通常はそちらの処理の方が簡単だからです。

簡単な非同期の例

メモ:簡潔に説明するため、例の中ではクラスでプロパティを宣言することもdocblockを追加することもしませんが、これらは必ず行うべきことです。もしそうしなければ、あなたを見つけ出して、ひどい目に遭わせますよ。

では簡単に、単純なWebクローリングの例から始めましょう。まずジョブを作成します。これはThreadedを継承しているThreadをさらに継承したクラスになります。Threadedを継承する全てのクラスには、スレッド対象のタスクを定義するrunメソッドがなくてはいけません。今回は単にGoogle検索のfile_get_contentsにします。

<?php
class SearchGoogle extends Thread
{
    public function __construct($query)
    {
        $this->query = $query;
    }

    public function run()
    {
        $this->html = file_get_contents('http://google.fr?q='.$this->query);
    }
}

“cats”と検索するインスタンスを作成してみましょう。ジョブを開始するために、startメソッドを呼び出します。これは何も返してきませんが、ジョブは別のスレッドで開始されます。

<?php
$job = new SearchGoogle('cats');
$job->start();

ジョブが開始されると、$job->isRunning()のようなメソッドを呼び出してジョブをチェックすることができますが、必要なのはjoinメソッドです。それを使って親メソッドに処理対象のタスクを待機させ、メインスレッドに”再び結合”します。基本的に次のようなことが起こっています。


注釈(左上から):メインスレッド、メインスレッド、メインスレッド、子スレッド

joinを呼び出せば、クラスが結果を保持していることを確認できます。

<?php
$job = new SearchGoogle('cats');
$job->start();

// Wait for the job to be finished and print results
$job->join();
echo $job->html;

これを踏まえ、例ごとに複数の検索を同時に開始して結果を取得することができます。

<?php
$searches = ['cats', 'dogs', 'birds'];
foreach ($searches as &$search) {
    $search = new SearchGoogle($search);
    $search->start();
}

foreach ($searches as $search) {
    $search->join();
    echo substr($search->html, 0, 20);
}

jobクラス内でタイムスタンプを出力する場合は以下のようになります。

public function run()
{
    echo microtime(true).PHP_EOL;

    $this->html = file_get_contents('http://google.fr?q='.$this->query);
}

ファイルを実行すれば、3つのジョブ全てが確かに同時に開始していることを確認できるでしょう。

$ php multiple.php
1446987102.4479
1446987102.4503
1446987102.4525

<!doctype html><html
<!doctype html><html
<!doctype html><html

ワーカーによるジョブ管理

ここまでは比較的簡単でしたが、ジョブを自分で管理せずに済むのが理想でしょう。ジョブをそれぞれ個別に開始し、1つずつ結合するようなことはしたくないと思います。ジョブをどこかへ投げさえすれば、勝手に処理され、処理が全て完了した時に結果を取得できたらいいと思っていることでしょう。そこでワーカーの出番です。

Workerは、その上にジョブをスタックすることができるクラスで、全ジョブを一度に開始し、結合します。

<?php
class Searcher extends Worker
{
    public function run()
    {
        echo 'Running '.$this->getStacked().' jobs'.PHP_EOL;
    }
}

// Stack our jobs on our worker
$worker   = new Searcher();
$searches = ['dogs', 'cats', 'birds'];
foreach ($searches as &$search) {
    $search = new SearchGoogle($search);
    $worker->stack($search);
}

// Start all jobs
$worker->start();

// Join all jobs and close worker
$worker->shutdown();

この出力結果は次のようになります。

Running 3 jobs
1446989108.5938
1446989108.6898
1446989108.9086

結果の収集

これで、ワーカーにメソッドを追加するだけでジョブからそのメソッドを呼び出して、各ジョブの結果を追跡できるようになりました。ジョブをワーカーにスタックすると、ジョブがそのワーカーを認識し、$this->workerを通してワーカーにアクセスできるようになります。それを行い、ジョブによってフェッチされたHTMLを収集してみましょう。

<?php
class Searcher extends Worker
{
    public $data = [];

    public function run()
    {
        echo 'Running '.$this->getStacked().' jobs'.PHP_EOL;
    }

    /**
     * To avoid corrupting the array
     * we use array_merge here instead of just
     * $this->data[] = $html
     */
    public function addData($data)
    {
        $this->data = array_merge($this->data, [$data]);
    }
}

class SearchGoogle extends Threaded
{
    public function __construct($query)
    {
        $this->query = $query;
    }

    public function run()
    {
        echo microtime(true).PHP_EOL;

        $this->worker->addData(
            file_get_contents('http://google.fr?q='.$this->query)
        );
    }
}

// Stack our jobs on our worker
$worker   = new Searcher();
$searches = ['dogs', 'cats', 'birds'];
foreach ($searches as &$search) {
    $search = new SearchGoogle($search);
    $worker->stack($search);
}

// Start all jobs
$worker->start();

// Join all jobs and close worker
$worker->shutdown();
foreach ($worker->data as $html) {
    echo substr($html, 0, 20).PHP_EOL;
}

これを実行すれば、以下のような適切な結果を取得できます。

Running 3 jobs
1446989506.0509
1446989507.938
1446989510.4684
<!doctype html><html
<!doctype html><html
<!doctype html><html

ここで、$worker->stack(new SearchGoogle($search))を行っていないことに気づくでしょう。これは、ワーカーにそのジョブの参照を追跡させる必要があるためです。つまり、ワーカーを開始すると、ワーカーにスタックされたジョブは全てメインスレッドで何かを参照しなければなりません。これはかなり面倒なので、このためにPoolクラスと呼ばれるクラスが作成されました。見てみましょう。

ジョブのプーリング

Poolはクラスで、その目的は、ジョブを1つ以上のワーカーにディスパッチして、それらのジョブを管理することです。ドキュメントでは次のように説明されています。

プーリングによって、pthreadsに必要な方法での参照の管理を含む、ワーカー機能のより高いレベルの抽象化を行うことができます。

前の例を、プーリングで書き換えてみましょう。プールを作成する際、以下の3つの引数を渡す必要があります。

  • プールが同時に使用できるワーカーの数
  • プールが(クラス名の文字列を)使用するワーカーの種類
  • 作成する際にワーカーを渡す任意の引数

ネイティブのpoolとworkerクラスを使う今回の例では、新しいインスタンスは以下のようになります。

<?php
$pool = new Pool(5, Worker::class);

それから$pool->submit(<Threaded>)を呼び出せば、ジョブをプールにサブミットできます。ワーカーとの主な違いは、ジョブをプールにサブミットするとすぐにジョブが開始するということです。もう参照に対処する必要はありません。

<?php
// Create a pool and submit jobs to it
$pool = new Pool(5, Worker::class);
$pool->submit(new SearchGoogle('cats'));
$pool->submit(new SearchGoogle('dogs'));
$pool->submit(new SearchGoogle('birds'));

// Close the pool once done
$pool->shutdown();

前と同じように行えば、全てのジョブが同時に開始するのが見られるでしょう。全てが正常に機能します。それでは、結果を収集してみましょう。ワーカーなどを通してループしなくてもプール内のジョブとやり取りできるようにするため、プールには便利なcollectメソッドがあり、一種のフィルタとして機能します。それをクロージャに渡し、ジョブがプールにあるかどうかを返すことになります。通常はジョブが完了したかどうかを返します。

このために、pthreadsはCollectableクラスを提供します。そのクラスはThreadedを拡張するもので、2つの追加メソッドを持っています。setGarbageisGarbageによって、ジョブは完了したものとして記録され、ガベージコレクタに収集される準備ができます。Poolクラスを書いて、ジョブから結果を収集し、それらを廃棄してみましょう。

<?php
class SearchPool extends Pool
{
    public $data = [];

    public function process()
    {
        // Run this loop as long as we have
        // jobs in the pool
        while (count($this->work)) {
            $this->collect(function (SearchGoogle $job) {
                // If a job was marked as done
                // collect its results
                if ($job->isGarbage()) {
                    $this->data[$job->query] = $job->html;
                }

                return $job->isGarbage();
            });
        }

        // All jobs are done
        // we can shutdown the pool
        $this->shutdown();

        return $this->data;
    }
}

また、これはつまり、SearchGoogleジョブを編集してCollectableを拡張し、setGarbageを実行されたメソッドの最後で呼び出す必要があるということです。

<?php
class SearchGoogle extends Collectable
{
    public function __construct($query)
    {
        $this->query = $query;
    }

    public function run()
    {
        echo microtime(true).PHP_EOL;

        $this->html = file_get_contents('http://google.fr?q='.$this->query);
        $this->setGarbage();
    }
}

これで次のようなことが行えます。

<?php
// Create a pool and submit jobs to it
$pool = new SearchPool(5, Worker::class);
$pool->submit(new SearchGoogle('cats'));
$pool->submit(new SearchGoogle('dogs'));
$pool->submit(new SearchGoogle('birds'));
$pool->submit(new SearchGoogle('planes'));
$pool->submit(new SearchGoogle('cars'));

$data = $pool->process();
var_dump($data);

これを実行すれば、5つのジョブが同時に実行されたことだけでなく、それらのジョブが同時に完了し、簡単にジョブの結果が得られたことも分かります。

$ time php pooling.php
1446990493.7183
1446990493.7205
1446990493.7227
1446990493.7247
1446990493.7266
array(5) {
  'birds' => (53230) "<!doctype html>"...
  'cats' => (53211) "<!doctype html>"...
  'cars' => (53250) "<!doctype html>"...
  'dogs' => (53244) "<!doctype html>"...
  'planes' => (53267) "<!doctype html>"...
}
php pooling.php  0.51s user 0.03s system 100% cpu 0.940 total