Python 3 の asyncio で http サーバーを書く

  • 6
    Like
  • 0
    Comment
1473682442 t2y
posted at

先日 JJUG CCC 2017 FALL に参加してきました。

イベントの中で 田所 駿佑 さんによるセッションでプログラミング言語の勉強のためのお題として HTTP サーバーを書こうといった発表がありました。

おおたに さんと一緒にセッションを聞いてセッションが終わってから

HTTP サーバーを書きたくなりましたよね?

と問われ、そのときはセッションを聞いてテンションも上がっていたのもあり「あー、そうですね。。。」と相槌をうっていました。

その後、イベントが終わって新宿駅へ歩いていった帰り際にも

帰ったら HTTP サーバーを書くんですよね?

と念を押されました。たぶん帰るまでに3回以上言われた気がします。

fushigina-tikara320x240.png
出典: 不思議な力で死ぬ事になるの元ネタ

じゃないかと思って HTTP サーバーを書いてみました。 1

閑話休題。

Python は私にとって新しい言語ではないのですが、今回は自分が使ったことのないライブラリの勉強のために書いてみることにしました。

これは私の経験談でもありますが、全く同じ処理でも何年か経ってから書き直してみると、以前よりライブラリをうまく使えるようになっていたり、簡潔で効率のよい実装を書けたりする場合があります。そんなとき、プログラマーとしての成長を自分自身で実感するよいチャンスだと思います。楽しいです。

HTTP サーバーの実装

発表の中でも学習のために simple http server の参照実装として Python の標準ライブラリにある http.server を参考にすればいいのではないかと説明されていました。実は私も昔そのサーバーをベースにしてちょっとだけ拡張した http server を書いたこともありました。

当初はこれを書き直そうかとも考えたのですが、本セッションの趣旨である「勉強するためのお題として書く」から離れてしまう気がしたので Python 3.4 で導入され 3.6 で API が stable となった asyncio を使ってみることにしました。

asyncio とは

詳細な説明は公式ドキュメントやリファレンス (後述) にあげたスライドを読んでもらうとして、Python でコルーチンを使った非同期プログラミングをするための仕組みです。低レベルなプリミティブも扱えるのでフレームワークを作るための部品なども提供しています。

10ヶ月ぐらい前に Python 3.6 リリースパーティー というイベントを開催しました。そのときに asyncio について調べて発表しました。業務では Python を使っていないのもあり、その後 asyncio を使う機会はありませんでした。

ちょうどよい機会なので asyncio の勉強に HTTP サーバーを書いてみます。

以下にソースコードを置いてあります。

リファレンス

asyncio のチュートリアル

asyncio を使ってコードを書き始めるとき、どこのチュートリアルを参考にすればいいのか?この質問に回答するのは難しいです。ググればたくさんの asyncio のチュートリアルはみつかると思います。私が唯一知っている優れたチュートリアルとして PyMOTW-3 の ayncio をあげておきます。

もちろん公式ドキュメントにもサンプルコード自体はたくさんありますが、チュートリアルとして丁寧に解説されているものではないため、非同期プログラミングに慣れていないと分かりにくいかもしれません。

今回私が実装した HTTP サーバーは公式ドキュメントの中の TCP Echo サーバーのサンプルコードを参考にしています。

リファレンス

イベントループとソケットと並行処理

asyncio を使うときは最初にイベントループを取得します。そのイベントループを使ってサーバーを構築します。

ここでは asyncio.start_server を使っています。その第一引数に コルーチン関数 をコールバックとして渡すことで非同期に処理できます。

    loop = asyncio.get_event_loop()
    coro = asyncio.start_server(http(args), args.host, args.port, loop=loop)
    server = loop.run_until_complete(coro)
    log.info('start simple http server, %s:%d' % (args.host, args.port))

    try:
        loop.run_forever()
    except KeyboardInterrupt:  # Serve requests until Ctrl+C is pressed
        pass
    finally:
        server.close()
        loop.run_until_complete(server.wait_closed())
        loop.close()

コマンドラインから受け取った args をコルーチン関数に渡す方法が分からなくて関数をラップして名前解決できるようにしたものが以下になります。

def http(
        args: argparse.Namespace
        ) -> Callable[[StreamReader, StreamWriter], Optional[Awaitable[None]]]:
    args.public_path = os.path.normpath(args.top_dir)
    args.bad_request_html = open('%s/400.html' % args.public_path, 'rb').read()
    args.forbidden_html = open('%s/403.html' % args.public_path, 'rb').read()
    args.not_found_html = open('%s/404.html' % args.public_path, 'rb').read()

    async def async_http(reader: StreamReader, writer: StreamWriter) -> None:
        log.info('start callback')
        request_line: bytes = await reader.readline()
        if not request_line:
            writer.close()

        data = request_line.decode()
        request = await parse_http(data)

        response = handle_request(args, request)
        response.write_to(writer)
        await writer.drain()

        log.info('end callback')
        writer.close()

    return async_http

ここで pep-0492 の New Coroutine Declaration Syntax によると async def で定義した関数を native coroutine と呼びます。

一方で @asyncio.coroutine でデコレートされたものを generator-based coroutine と呼びます。generator-based coroutine は過去のジェネレーターを使ったものをコルーチンとして扱う互換性のために用意されています。基本的に新規でコードを書くときは async def を使えばよいのではないかと私は解釈しています (ちゃんと調査していないのでも間違っていたらご指摘ください) 。

閑話休題。asyncio.start_server の第一引数にはコルーチン関数をコールバックとして渡すとあったので async def で定義した aync_http というコルーチン関数を返して渡します。その async_http は2つの引数、StreamReaderStreamWriter を取ります。これらによりソケットの読み書きが抽象化されています。

そして、コルーチン関数内に Await Expression があるのが目につきます。

    ...
    request_line: bytes = await reader.readline()
    ...
    request = await parse_http(data)
    ...
    await writer.drain()

await で扱えるのはコルーチンのみなのでここで実装している parse_http() もコルーチンとして定義します。

async def parse_http(data: str) -> Request:                                         
    ...

他の StreamReader.readline も StreamWriter.drain もコルーチンです。asyncio では await を書いたところで asyncio におけるコンテキストスイッチが発生します。つまり、CPU を使わない、例えば I/O 処理が発生することをプログラマーがあらかじめ分かっているのであれば、そういったところで CPU を有効活用するために await を使うことで他のコルーチン処理に切り替えるといったことを明示できます。

このように async def で定義したコルーチン関数と await でコンテキストスイッチを意識しながらコーディングしていくというのが asyncio における非同期プログラミングの取っかかりになると思います。

別の実装方法

ドキュメントを眺めていたらもう1つ別のサンプルコードをみつけました。

asyncio.Protocol を継承して実装する方法もあります。先ほどの例ではソケットとのやり取りが StreamReader と StreamWriter に抽象化されていましたが、こちらの例では直接扱うこともできるようにみえます。

asyncio.Protocol を使って、ほぼ同様に実装したものが以下になります。

def http(args: argparse.Namespace) -> Any:  # FIXME: forward reference?             
    args.public_path = os.path.normpath(args.top_dir)                               
    args.bad_request_html = open('%s/400.html' % args.public_path, 'rb').read()  
    args.forbidden_html = open('%s/403.html' % args.public_path, 'rb').read()       
    args.not_found_html = open('%s/404.html' % args.public_path, 'rb').read()       

    class AsyncHTTP(asyncio.Protocol):                                              

        def connection_made(self, transport: Any) -> None:                          
            peername = transport.get_extra_info('peername')                         
            if peername is None:                                                    
                return                                                              

            log.info('Connection from %s:%d' % peername)                            
            self.transport = transport                                              

        def data_received(self, data: bytes) -> None:                               
            message = data.decode()                                                 
            request = parse_http(message)                                           

            response = handle_request(args, request)                                
            response.write_to(self.transport)                                       

            log.info('Close the client socket')                                     
            self.transport.close()                                                  

    return AsyncHTTP

ベンチマークツールで並行にリクエストしてみる

とりあえず動くようになったら標準ライブラリにある http.server と自分で実装した asyncio HTTP サーバーにベンチマークをかけてみましょう。

$ python -m http.server 8080
$ ab -n 500 -c 50 http://localhost:8080/
This is ApacheBench, Version 2.3 <$Revision: 1796539 $>
Copyright 1996 Adam Twiss, Zeus Technology Ltd, http://www.zeustech.net/
Licensed to The Apache Software Foundation, http://www.apache.org/

Benchmarking localhost (be patient)
Completed 100 requests
apr_socket_recv: Connection reset by peer (54)
Total of 131 requests completed

例えば、私の環境では http.server は50コネクションでそれぞれ10リクエストするといった負荷をかけると、すべてのリクエストを処理できずにエラーになってしまいました。

これが ayncio HTTP サーバーだと以下のように処理できます。

$ simple-http-server --top-dir ./public/
$ ab -n 500 -c 50 http://localhost:8080/
This is ApacheBench, Version 2.3 <$Revision: 1796539 $>
Copyright 1996 Adam Twiss, Zeus Technology Ltd, http://www.zeustech.net/
Licensed to The Apache Software Foundation, http://www.apache.org/

Benchmarking localhost (be patient)
Completed 100 requests
Completed 200 requests
Completed 300 requests
Completed 400 requests
Completed 500 requests
Finished 500 requests


Server Software:        Simple
Server Hostname:        localhost
Server Port:            8080

Document Path:          /
Document Length:        404 bytes

Concurrency Level:      50
Time taken for tests:   0.515 seconds
Complete requests:      500
Failed requests:        0
Total transferred:      287000 bytes
HTML transferred:       202000 bytes
Requests per second:    971.61 [#/sec] (mean)
Time per request:       51.461 [ms] (mean)
Time per request:       1.029 [ms] (mean, across all concurrent requests)
Transfer rate:          544.63 [Kbytes/sec] received

Connection Times (ms)
              min  mean[+/-sd] median   max
Connect:        0    4   2.5      3      11
Processing:     9   46   8.6     45      63
Waiting:        4   35   9.9     37      54
Total:         19   50   7.8     48      65

Percentage of the requests served within a certain time (ms)
  50%     48
  66%     51
  75%     55
  80%     57
  90%     62
  95%     63
  98%     65
  99%     65
 100%     65 (longest request)

macOS の ulimit の設定方法を私はよく分かっていないのですが、asyncio HTTP サーバーもだいたい200コネクションぐらいで成功したり、失敗したりするようになりました。

$ ulimit -n 2048
$ ab -n 2000 -c 200 http://localhost:8080/

テストと型ヒント

HTTP サーバーとは関係ないのですが、Python 3.5 から 型ヒント (Type Hints) が導入され、その型チェッカーとして mypy の成熟度も上がってきているようにみえます。

このコードは勉強用途なのでどんどん新しいことを試してみましょう。型ヒントをセットするには以下のチートシートをみればほとんどのケースは大丈夫だと思います。主には関数の引数と返り値の型を書くだけでローカル変数などは mypy が (ちゃんと機能すれば) 型推論してチェックしてくれるように思います。

さらに 3.6 で導入された 変数アノテーション も使ってみましょう。これを使うためにこのコードは 3.6 でしか動きません。

$ mypy --strict simple_http_server

適当に型ヒントを書いて mypy を実行すると、例えば、以下のように型推論の結果と違うよと mypy が警告してくれたりします。

simple_http_server/protocols.py:30: error: Incompatible return value type (got "Callable[[StreamReader, StreamWriter], Awaitable[None]]", expected "Coroutine[Any, Any, Any]")
simple_http_server/main.py:55: error: Argument 1 to "start_server" has incompatible type "Coroutine[Any, Any, Any]"; expected "Callable[[StreamReader, StreamWriter], Optional[Awaitable[None]]]"

私は普段 pytest を使っているのですが、このテストも pytest に組み込みんでしまいましょう。pytest-mypy というプラグインがあったのでそれを使います。

[tox]
envlist = py36

[testenv]
deps =
    pytest
    pytest-pycodestyle
    pytest-flakes
    pytest-mypy
commands = py.test -v --mypy --pep8 --flakes simple_http_server

tox 経由だと以下のようにテストを実行します。

$ tox -e py36

リファレンス


  1. おおたにさんはこんなことを言ってますが、私は言ってません ((((;゜Д゜))) おおたにさんも
    kotlin で実装しています。