はじめに
初めまして。新人ツチノコの大山裕泰です。私もツチノコの一人ですが、比較的容易に発見できる部類のツチノコだと思います。勉強会等で見つけてくださった際には、是非お声掛けください。
最近 Erlang を触り始め、ちょうど良いネタが見つかったので書いてみました。
余談ですが、最近の Erlang の人気の高まりは恐らく Riak などのが注目されているためだと思いますが、Erlang エンジニアの人口が少ない ためか、Erlang について扱った記事やブログなどはあまり多くありません。
とかく今回取り上げる prim_inet モジュールは、Erlang の標準ライブラリに組み込まれていながら、本稿執筆時点での ユーザガイド にも リファレンスマニュアル にも載っていない隠し機能としか言いようがないにも関わらず、RabbitMQ とかではフツーに使われていたりします。
そんなわけで今回は prim_inet モジュールの一部機能の使い方について、簡単なサンプルと共に紹介したいと思います。Erlang を知らない人でも何となく理解してもらえるように頑張ってみました。
Erlang ってどんな言語?
Erlang は超並列なシステムをプログラムするのに適しており、全てのプロセス間において共有メモリを無くすことで安全な並列性を実現しています。また、言語システム自体に分散システムをサポートする機能を提供しており、システム全体で数千から数十万単位のプロセスによる超並列・分散システムを構築する用途で広く利用されています。
そして Erlang の最も象徴的な特徴は、OTP (Open Telecom Platform) と呼ばれるプロセスの汎用的な挙動を規定したテンプレートにあります。OTP のような
他の言語の例で言い換えると、Erlang 言語自体が Ruby で、OTP が Rails といった感じです。ただ OTP のスキームは Rails よりも遥かに厳格です。Erlang と OTP のようなプロセスの挙動をガチガチに規定した仕組みを用いる事で、ある意味簡単に超並列で堅牢な分散システムを構築できるようになります。
本題
prim_inet は ERTS (Erlang Run-Time System Application) に組み込まれているライブラリの一つで、TCP, UDP, SCTP プロトコル通信に関する様々な機能を提供しています。ERTS とは Erlang のランタイムシステムの実行において必須機能を提供する Kernel アプリケーション と、OTP 機能をはじめとする 標準ライブラリ から構成されるアプリケーション群です。
今回は、簡単な Echo Server の実装を通して prim_inet モジュールによるノンブロッキング通信システムの実装方法について見て行きます。Echo Server とは RFC862 で規定されており、クライアントからの接続後に送られたデータを、そのクライアントに対して送るサーバになります。
今回実装するサーバのプロセスモデルを以下に示します。サーバ側では一つの “Server” プロセスを生成し、ユーザ側からの TCP 接続毎に Echo Serverの実装である “Worker” プロセスを生成し、以降のデータ通信は “Worker” で行います。
prim_inet での実装について見る前に、一般的に TCP 通信を行う際に利用する gen_tcp モジュール を利用した場合の実装について見て行きます。
gen_tcp, prim_inet それぞれを利用して実装した Echo Serverを以下に用意しました。
https://github.com/userlocalhost2000/echo-server
まずは gen_tcp 側の実装 “generic_server” を見て行きます。ビルド方法、及び使い方はリポジトリの README を参照してください。
generic_server では、OTP によるクライアント・サーバモデルのプロセスを一つ起動させる。プロセスの起動後、以下に示す init 関数が呼ばれる。
init([Port]) -> case gen_tcp:listen(Port, []) of {ok, Socket} -> wait_connect(Socket, 0) end. wait_connect(ListenSocket, Count) -> {ok, Socket} = gen_tcp:accept(ListenSocket), % delegate Socket control to new process gen_tcp:controlling_process(Socket, spawn(?MODULE, recv, [Socket])), wait_connect(ListenSocket, Count + 1).
ここではクライアントからの TCP の接続を待ち受ける wait_connect/2 関数を呼び出します。この短い関数内に Erlang の幾つかの特徴的な処理がみられます。
まず関数の評価に対して、パターンマッチ と呼ばれる仕組みが利用されます。この仕組みでは、関数の呼び出しに対して、呼び出し時に指定された実引数と、定義された関数の仮引数の数、及び各仮引数の型・値の比較を行い、これにマッチした関数の評価を行います。なので、同名の関数でも引数の数が異なる別関数が存在することがあるため、関数名の表記には仮引数の数を表す数字を末尾につけ wait_connect/2 のように表現します。
パターンマッチは関数評価以外にもあらゆる場面で利用されるとても強力な仕組みです(wait_connect の一行目でもこれを利用し、返礼値のチェックとエラーハンドリングを同時に行っています)。言語機能としてパターンマッチをサポートしている言語に Scala などがあります。
wait_connect 関数では、クライアントからの接続が来た場合に、Echo Serverのワーカプロセスを生成し、再びユーザからの接続要求を待ち受ける処理に戻ります。C や PHP などの構造化プログラミング言語で言うところの無限ループです。しかし Erlang では Haskell などと同様に、変数は単一代入 (値を持っている変数に別の値を持たせられない) の原則があるためループ処理が言語として実装できません。そのため、ループ相当の処理を再帰などを用いて行います。
ここで wait_connect 関数内部の処理について見て行きます。内部では、一行目でユーザからの接続を待ち受け、接続が来た場合にはワーカプロセス (Echo Server) を生成し、以降のクライアントとの通信処理をワーカプロセス側で行わせるように設定し、自分自信 (wait_connect/2) を呼び出して再びユーザからの接続を待ちます。
wait_connect の一行目で実行している gen_tcp:accept/1 が generic_server の特徴になります。これによりユーザからの接続が来るまで処理をブロック(他の処理を行わないで、ひたすた接続が来るまで待っている状態になる)してしまいます。
では prim_inet を利用して、ユーザからの接続を待ち受ける処理をノンブロッキングで行うようにします。nonblocking_server では wait_connect の代わりに prim_inet:async_accept/2 を呼び出します。この関数呼び出しはノンブロッキングで、呼び出し元に処理が返ってきます。
ユーザからの TCP 接続が来た場合には、handle_info/2 関数でハンドリングします。内部では、generic_server と同じように、prim_inet:assync_accept を呼び出して再びユーザからの接続を待ち受ける処理と、ワーカプロセスの起動処理を行っています。一点、接続してきたクライアントのソケットとサーバ側のソケットのソケットオプションを合わせる設定をするための関数 (set_sockopt/2) 呼び出しだけ追加されています。
handle_info({inet_async, ListSock, _Ref, {ok, Socket}}, _State) -> prim_inet:async_accept(ListSock, -1), % delegate Socket control to new process gen_tcp:controlling_process(Socket, spawn(?MODULE, recv, [Socket])), % sync Client Socket options with Server Socket set_sockopt(ListSock, Socket), {noreply, 0}.
最後にワーカプロセス (Echo Server) の実装を以下に示します。ワーカプロセスは generic_server, nonblocking_server それぞれにおいて同じになります。基本的にユーザから送られたデータを返し、”quit” というメッセージが来た場合にだけはコネクションを閉じる処理を行います。
recv(Socket) -> receive {tcp, Socket, Data} -> io:format("receive data: ~p~n", [Data]), case string:substr(Data, 1, 4) of "quit" -> close_socket(Socket); _ -> gen_tcp:send(Socket, "(Response) >> " ++ Data), recv(Socket) end; {tcp_closed, Socket} -> close_socket(Socket) end.
おわりに
OTP についてはごくごく一部しか触れられませんでしたが、OTP とその背後にあるプロセスの監視機能について理解できれば Erlang のコードが読めるようになると思います。
今回の内容が Erlang についての興味や理解の助けになれたなら幸いです。最後まで読んでくださり、ありがとうございました。