Python
meinheld

Inside Meinheld

Inside Meinheld

先日、TCFMを聞いてたら「グリーンスレッドってなんぞ?」みたいな話が出て、少し思い出したことがあったので書いてみる。
システムコール云々に関しての部分は以下の記事を参考にして下さい。
最速最強Webサーバーアーキテクチャ

Meinheld

僕はかなり前に Meinheld という WSGI Server を書いた。
これは買収前のTestFlightでも使われていたし、MDNでは数年前から今でも使われている。
(このことはあまり知られていないのかも知れないが。DevToolでヘッダを確認するとserverのとこにmeinheldと出てるはずである)
まあ一般的にはパフォーマンスがウリのように思われている。
実際内部は非同期IOを使っていて速いんだけどこれはタダのWSGI Serverではない。
MeinheldのWSGIハンドラはグリーンスレッド上で動作している。
接続毎にグリーンスレッドをspawnし、コアのメインループで各グリーンスレッドを管理、スケジューリングして動作させているわけだ。
スケジューリングといっても特に難しいことはしていないのだがこのコア部分の事を知ってる人はあまりいない。
今回はメインループ、グリーンスレッドのスイッチイングなどコアな部分の書いてみたいと思う。

メインループ

上でも書いた通りメインループが全ての中心である。
コアを抜粋するとこんな感じである。

static PyObject *
meinheld_run_loop(PyObject *self, PyObject *args, PyObject *kwds)
{
    PyObject *watchdog_result;
    int silent = 0;
    int interrupted = 0;

    static char *kwlist[] = {"app", "silent", 0};
    if (!PyArg_ParseTupleAndKeywords(args, kwds, "O|i:run",
                                     kwlist, &wsgi_app, &silent)) {
        return NULL;
    }

...

    init_main_loop();
    loop_done = 1;

    PyOS_setsig(SIGPIPE, sigpipe_cb);
    PyOS_setsig(SIGINT, sigint_cb);
    PyOS_setsig(SIGTERM, sigint_cb);


    if (listen_all_sockets() < 0) {
        //FATAL Error
        return NULL;
    }

    /* loop */
    while (likely(loop_done == 1 && activecnt > 0)) {
        fire_pendings();
        fire_timers();
        picoev_loop_once(main_loop, 10);
        if (unlikely(catch_signal != 0)) {
            if (catch_signal == SIGINT) {
                interrupted = 1;
            }
            catch_signal = 0;
            kill_server(0);
        }

        ...

    }

...

メインループは基本的に以下の処理を行う。

  1. 次回実行する予定の関数の呼び出し
  2. タイマーで実行する予定の関数の呼び出し
  3. picoevで各socketを監視

とりあえず1. 2.に関してはなくても良いので割愛する。
クライアントの接続時には3.のpicoevのイベントでaccept_callbackが呼ばれる。

static void
accept_callback(picoev_loop* loop, int fd, int events, void* cb_arg)
{
    int client_fd, ret;
    client_t *client;
    struct sockaddr_in client_addr;
    char *remote_addr;
    uint32_t remote_port;
    int finish = 0;
    if ((events & PICOEV_TIMEOUT) != 0) {
        // time out
        // next turn or other process
        return;
    } else if ((events & PICOEV_READ) != 0) {
        int i;
        socklen_t client_len = sizeof(client_addr);
        for (i=0; i<8; ++i) {
#if linux && defined(SOCK_NONBLOCK) && defined(SOCK_CLOEXEC)
            client_fd = accept4(fd, (struct sockaddr *)&client_addr, &client_len, SOCK_NONBLOCK | SOCK_CLOEXEC);
#else
            client_fd = accept(fd, (struct sockaddr *)&client_addr, &client_len);
#endif

            if (client_fd != -1) {
                if (setup_sock(client_fd) == -1) {
                    PyErr_SetFromErrno(PyExc_IOError);
                    /* write_error_log(__FILE__, __LINE__); */
                    call_error_logger();
                    // die
                    loop_done = 0;
                    return;
                }
                remote_addr = inet_ntoa (client_addr.sin_addr);
                remote_port = ntohs(client_addr.sin_port);
                client = new_client_t(client_fd, remote_addr, remote_port);
                init_parser(client, server_name, server_port);

                finish = read_request(loop, fd, client, 1);
                if (finish == 1) {
                    if (check_status_code(client) > 0) {
                        //current request ok
                        if (prepare_call_wsgi(client) > 0) {
                            call_wsgi_handler(client);
                        }
                    }
                } else if (finish == 0) {
                    ret = picoev_add(loop, client_fd, PICOEV_READ, keep_alive_timeout, read_callback, (void *)client);
                    if (ret == 0) {
                        activecnt++;
                    }
                }
            } else {

            ...

パフォーマンス向上の関係で溜まっているfdを最大8読み込むようになっている。
acceptしたfdを元にclientを構築し、リクエストを読み取り始める。
非同期IOなのでリクエストを1回で全て読めるわけではない。
読めるだけ読んだら、次回読めるようになったら呼ばれるようread_callbackを仕込む。
仕込んだら次のfdを読みドンドン処理していく。
このあたりは非同期IOのサーバーでよくあるパターンだ。

static void
read_callback(picoev_loop* loop, int fd, int events, void* cb_arg)
{
    client_t *client = ( client_t *)(cb_arg);
    int finish = 0;

    if ((events & PICOEV_TIMEOUT) != 0) {
        finish = read_timeout(fd, client);

    } else if ((events & PICOEV_READ) != 0) {
        finish = read_request(loop, fd, client, 0);
    }
    if (finish == 1) {
        if (!picoev_del(main_loop, client->fd)) {
            activecnt--;
            DEBUG("activecnt:%d", activecnt);
        }
        if (check_status_code(client) > 0) {
            //current request ok
            if (prepare_call_wsgi(client) > 0) {
                call_wsgi_handler(client);
            }
        }
        return;
    }
}

read_callback内ではread_requestを呼び続きを読んでいく。
この時点でリクエストを読み切ったら実際WSGIハンドラを呼び出す準備をする。
prepare_call_wsgiはHTTP piplining絡みの処理なので割愛する。
次にいよいよWSGIハンドラの呼び出しである。

WSGIハンドラの呼び出しとグリーンスレッド

WSGIハンドラの呼び出しは以下のようになっている。

static void
call_wsgi_handler(client_t *client)
{
    PyObject *handler, *greenlet, *args, *res;
    ClientObject *pyclient;
    request *req = NULL;

    handler = get_app_handler();
    req = client->current_req;
    current_client = PyDict_GetItem(req->environ, client_key);
    pyclient = (ClientObject *)current_client;

    args = PyTuple_Pack(1, req->environ);
#ifdef WITH_GREENLET
    //new greenlet
    greenlet = greenlet_new(handler, NULL);
    // set_greenlet
    pyclient->greenlet = greenlet;
    Py_INCREF(pyclient->greenlet);

    res = greenlet_switch(greenlet, args, NULL);
    Py_DECREF(args);
    Py_DECREF(greenlet);

    ...

ここからグリーンスレッドをspawnするコードになる。
greenlet_newでラップされたWSGIハンドラを指定しグリーンスレッドを起こす。
グリーンスレッドは処理が終わると自動的に親のグリーンスレッドを呼び出すようになっている。
WSGIハンドラのグリーンスレッドの親のグリーンスレッドはメインループのスレッドになるようにしてある。
greenlet_switchでグリーンスレッドへスイッチし、処理を移譲する。
グリーンスレッドへ処理を移譲した時点でメインループは止まることになるわけだが、グリーンスレッド上の処理を行っている間にIO待ちが解消されることを期待するわけである。
ラップしたWSGIハンドラは以下のようになっている。

static PyObject *
app_handler(PyObject *self, PyObject *args)
{
    int ret, active;
    PyObject *wsgi_args = NULL, *start = NULL, *current = NULL, *parent = NULL, *res = NULL;
    PyObject *env = NULL;
    ClientObject *pyclient;
    client_t *client;
    request *req;
    response_status status;

...

    wsgi_args = PyTuple_Pack(2, env, start);
    res = PyObject_CallObject(wsgi_app, wsgi_args);
    Py_DECREF(wsgi_args);

    //check response & PyErr_Occurred
    if (res && res == Py_None) {
        PyErr_SetString(PyExc_Exception, "response must be a iter or sequence object");
        goto error;
    }
    //Check wsgi_app error
    if (PyErr_Occurred()) {
        goto error;
    }

    client->response = res;

    if (client->response_closed) {
        //closed
        close_client(client);
        Py_RETURN_NONE;
    }
    status = response_start(client);

#ifdef WITH_GREENLET
    while(status != STATUS_OK) {
        if (status == STATUS_ERROR) {
            // Internal Server Error
            req->bad_request_code = 500;
            goto error;
        } else {
            active = picoev_is_active(main_loop, client->fd);
            ret = picoev_add(main_loop, client->fd, PICOEV_WRITE, 300, trampoline_callback, (void *)pyclient);
            if ((ret == 0 && !active)) {
                activecnt++;
            }

            // switch to hub
            current = pyclient->greenlet;
            parent = greenlet_getparent(current);

            /* Py_INCREF(hub_switch_value); */
            res = greenlet_switch(parent, hub_switch_value, NULL);
            Py_XDECREF(res);

            // try again after event switch
            status = process_body(client);
        }
    }
    status = close_response(client);
    if (status == STATUS_ERROR) {
        //TODO logging error
    }
    // send OK
    close_client(client);

    ...

この部分は既にグリーンスレッド上で動作している。
PyObject_CallObjectがWSGIハンドラ本体の呼び出しで、レスポンスとしてfilelikeなオブジェクトをもらっている。
ここからがグリーンスレッドならではの処理になる。
response_startでレスポンスを送信しはじめるが、先ほども書いたが非同期IOなので全てが送信できるかどうかわからない。

ちなみにresponse_startはこんな感じでFileであればsendfileで送る。
start_response_xxxx, process_xxxxではそれぞれヘッダとボディを送れるだけ送る。

response_status
response_start(client_t *client)
{
    response_status ret ;
    if(client->status_code == 304){
        return write_headers(client, NULL, 0, 0);
    }

    if (CheckFileWrapper(client->response)) {
        DEBUG("use sendfile");
        //enable_cork(client);
        ret = start_response_file(client);
        if(ret == STATUS_OK){
            // sended header
            ret = process_sendfile(client);
        }
    }else{
        ret = start_response_write(client);
        DEBUG("start_response_write status_code %d ret = %d", client->status_code, ret);
        if(ret == STATUS_OK){
            // sended header
            ret = process_write(client);
        }
    }
    return ret;
}

話を戻すとレスポンス送信時以下の部分、この部分を一見見ると無限ループしそうなコードである。
これこそがグリーンスレッドなコードの肝である。

    while(status != STATUS_OK) {
        if (status == STATUS_ERROR) {
            // Internal Server Error
            req->bad_request_code = 500;
            goto error;
        } else {
            active = picoev_is_active(main_loop, client->fd);
            ret = picoev_add(main_loop, client->fd, PICOEV_WRITE, 300, trampoline_callback, (void *)pyclient);
            if ((ret == 0 && !active)) {
                activecnt++;
            }

            // switch to hub
            current = pyclient->greenlet;
            parent = greenlet_getparent(current);

            /* Py_INCREF(hub_switch_value); */
            res = greenlet_switch(parent, hub_switch_value, NULL);
            Py_XDECREF(res);

            // try again after event switch
            status = process_body(client);
        }
    }

STATUSがERRORでない場合には書き込み可能時に呼ばれるコールバックにtrampoline_callbackを仕込む。
その後、greenlet_switchでメインループへ処理をスイッチする。
この時点でコレ以下のコードは一切呼ばれない。
スイッチすると言っても具体的にどこから再開するのだろうか?
答えはWSGIハンドラのgreenlet_switchの下からである。

static void
call_wsgi_handler(client_t *client)
{
    ...

    res = greenlet_switch(greenlet, args, NULL);
    // ここから復帰

そしてこの後、メインループの続きの処理を行う。
さて、WSGIハンドラのグリーンスレッドは止まった状態である。
ではどのタイミングでどうやってスイッチして処理を戻せばいいのだろう?

ここでグリーンスレッドによるIOトランポリンというテクニックを使う。
(この手法はeventletで初めて実装され、そのような名前で呼ばれていた)
先ほど、書き込み可能になったらtrampoline_callbackを仕込んだ。
このtrampoline_callbackは書き込み可能になったfdを処理しているグリーンスレッドにスイッチする仕組みになっている。

static void
trampoline_callback(picoev_loop* loop, int fd, int events, void* cb_arg)
{
    o = (PyObject*)cb_arg;

    if (CheckClientObject(o)) {
        pyclient = (ClientObject*)cb_arg;
        client = pyclient->client;

        if ((events & PICOEV_TIMEOUT) != 0) {

            RDEBUG("** trampoline_callback timeout **");
            //timeout
            client->keep_alive = 0;
            PyErr_SetString(timeout_error, "timeout");
        }
        YDEBUG("resume_wsgi_handler");
        resume_wsgi_handler(pyclient);

    ...


static void
resume_wsgi_handler(ClientObject *pyclient)
{
    PyObject *res = NULL;
    PyObject *err_type, *err_val, *err_tb;
    client_t *old_client;
    client_t *client = pyclient->client;

    //swap bind client_t

    old_client = start_response->cli;
    start_response->cli = client;

    current_client = (PyObject *)pyclient;
    if (PyErr_Occurred()) {
        PyErr_Fetch(&err_type, &err_val, &err_tb);
        PyErr_Clear();
        //set error
        res = greenlet_throw(pyclient->greenlet, err_type, err_val, err_tb);
    } else {
        res = greenlet_switch(pyclient->greenlet, pyclient->args, pyclient->kwargs);
    }
    start_response->cli = old_client;

    Py_CLEAR(pyclient->args);
    Py_CLEAR(pyclient->kwargs);
    Py_XDECREF(res);
}

resume_wsgi_handler内部で再度保持していたデータからグリーンスレッドを取り出し、そこへスイッチする。

// try again after event switch
status = process_body(client);

そうすると先程グリーンスレッドで止まってしまっていた箇所、先程のレスポンスの処理をするwhileループの上記のとこに戻ってこれるわけである。
書き込み可能になったことがトリガーでスイッチイングしてるのでここでのprocess_body内でのレスポンスの送信はEAGAIN, EWOULDBLOCKにならない。
量は少ないかも知れないが送信は必ず成功する。
Meinheldのパフォーマンスが良いのはこのような無駄な処理、待ち時間を極力減らすようになっているためでもある。

このようにグリーンスレッドを使うと非同期IOでもコールバック地獄ではなく同期的なループで送信するかのようにコードを書くことができる。
まあグリーンスレッドを使っている真の目的はWSGIハンドラ上のネットワーク処理を全て非同期にしつつ同期的に書けるようにするためなのだが、それはまたの機会に。
以前から話そうと思っていたMeinheldのコアをざっと説明したがもう少し綺麗にしたものが以下にある。これはWSGI Serverではなくもう少し低レベルなライブラリになる。
https://github.com/mopemope/jega
もし興味がある人がいれば見てみるといいだろう。