hp12c このページをアンテナに追加 RSSフィード Twitter

2011-09-29

irbから学ぶRubyの並列処理 ~ forkからWebSocketまで

世の中は並列化花ざかりだよ

人間はシングルタスクのままなのに

プログラミングするときは

マルチタスクが要求されるなんて

世知辛い世の中になったものだね


でも情報革命は始まったばかりだから

愚痴ってばかりもいられないよ

自分がその波にうまく乗れないとしても

うまく乗ってる人の様を

間近で見てみたいと思うんだ



そんなわけで..


Rubyのfork Thread Reactor EventMachine

WebSocketなどの並列化について少し学んだので

自分の理解をここにまとめておくよ


REPL

irbはRubyにおける対話型の実行環境だよ

これは一般にはREPLと呼ばれてるんだ

REPLはユーザの入力を

読み取り(Read)

評価し(Eval)

出力する(Print) 処理を

繰り返すよ(Loop)


irbのコードは5000行にものぼるらしいけど

その核心は次のように1行で書けるよ

 loop{ puts eval gets }

getsでユーザ入力を読み取り

evalで評価し

putsで出力する処理を

loopで繰り返す

これじゃGEPLだけどね:)


このコードを保存して(gepl.rb)

実行してみよう

$ ruby gepl.rb
%w(ruby lisp haskell).map(&:upcase)
RUBY
LISP
HASKELL       

"hello, repl!".gsub('r','g')
hello, gepl!  

ちゃんと動いてるね

Ctrl+Cで終了するよ


通常loopは無限ループを生成するけど

先のコードではgetsのところで処理が止まり

ユーザからの入力を待ち受ける

ここがポイントだよ


ちなみにこのコードは

その入出力を明示的にして

次のようにも書けるね

loop do
  input = $stdin.gets
  output = eval(input)
  $stdout.puts output
end

デフォルトグローバル変数$stdinと$stdoutには

標準入力 標準出力がセットされてるから

キーボードからの入力が読み取られ

ディスプレイに出力がなされるんだ


マルチユーザーREPL

REPLは1ユーザに対する対話環境だよ

でも複数ユーザで使えたらもっとうれしいよね

どうすればいい?


そうだよ

入出力と評価(eval)を切り離せばいいんだよ

いわゆるクライアント・サーバー方式だね

クライアントからの入力をサーバーに渡して評価し

結果をクライアントに出力する


じゃあ早速REPLサーバーを書いてみるよ

#repl_server.rb
require "socket"

server = TCPServer.new(60000)
loop do
  client = server.accept   # clientからの接続を待つ
  
  begin
    loop { client.puts eval client.gets }
  rescue
  ensure
    client.close
  end
end

Rubyならこんなに簡単に書けちゃうんだ

TCPServer.newでサーバーインスタンスを生成し

acceptメソッドでクライアントからの接続を待ち受けるよ

クライアントが接続したら

getsでユーザからの入力を評価し結果をユーザに返す

接続したクライアント(これをソケットと呼ぶよ)からgetsし

ソケットにputsしてるところがポイントだよ


$stdin $stdoutの参照先を

クライアントのソケットに切り替えるやり方にすると

最初のコードとの違いがはっきりするかもね

#repl_server.rb
require "socket"

server = TCPServer.new(60000)
loop do
  client = server.accept
  begin
    $stdin, $stdout = client, client
    loop { puts eval gets }
  rescue
  ensure
    client.close
    $stdin = STDIN
    $stdout = STDOUT
  end
end

ensure節ではこれらの後処理をしているよ


サーバーを立ち上げて

クライアントから接続してみようよ

telnetを使うね

$ telnet localhost 60000
Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.
3.times { puts "Hello, friend" }
Hello, friend
Hello, friend
Hello, friend
3

いい感じだね

Ctrl+]に続きquitでtelnetの接続を切るよ


並列化REPL

でも先のサーバーには大きな問題があるよ

誰か一人が接続していると

他の人が接続できないつまり

複数の人が同時に使えないんだよ

先のコードで1つの接続がacceptされると

loop内のgetsは

その接続先ユーザからの入力を待ち続けることになる

でそのユーザの接続が切れてはじめて

処理はループされacceptで

別の接続を待ち受けられるようになるんだ

これは大問題だね

複数のターミナルから接続して試してみればわかるよ


さあここで並列化の出番だよ


Rubyで並列化を実現するにはいくつかの方法があるよ

ちょっとどんなやり方があるか考えてみてくれる?


個別接続による並列化

もっとも単純な方法は処理が終わるたびに

ユーザからの接続を毎回切る方法だよ

前のユーザの接続が切れれば

サーバーは別の接続を待てるからね

まあ

これを並列化と呼ぶのはどうかとも思うけど..


コードは次のような感じになるかな

require "socket"

server = TCPServer.new(60000)
loop do
  client = server.accept
  client.puts eval client.gets
  client.close
end

evalしたものをクライアントに返したら

そのソケットを閉じる

これによってそのクライアントの接続は切れるから

別のクライアントからの接続を待ち受けられるようになる


構成がシンプルでいいんだけど

ユーザにとってはちょっと面倒だよね

使うたびに接続し直さなきゃならないからね

なんかWebサーバーみたいだよね..


forkによる並列化

2つ目は複数のプロセスを起動する方法だよ

Rubyでプロセスを並列化するにはKernel#forkを使うよ


forkのブロックで囲まれたコードは別プロセスで起動されるから

loop{}のところをforkのブロックに投げればよさそうだね

やってみるよ

require "socket"

server = TCPServer.new(60000)
loop do
  client = server.accept
  fork do    # 別プロセスで起動
    begin
      loop { client.puts eval client.gets }
    rescue
    ensure
      client.close
    end
  end
end

acceptでクライアントが接続すると

forkで別プロセスが起動されて

その中でgetsの待ち受けがされるけど

メインプロセスは外側のループで先頭に戻り

これでacceptで別のクライアントの接続を待てるね


じゃあ複数のtelnetから接続して試してみよう

$ telnet localhost 60000
Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.
1 + 2
3
---------------------------------------
$ telnet localhost 60000
Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.
[2011,10,1].join '-'
2011-10-1

うまくいったね


念のためこの状況で

複数のプロセスが立ってるか確認してみるよ

$ ps aux | grep repl_server
keyes     1335   0.3  0.1  2448356   1176 s001  R+    3:01PM   0:00.90 ruby repl_server.rb
keyes     1303   0.3  0.0  2448356   1040 s001  S+    2:59PM   0:01.30 ruby repl_server.rb
keyes     1301   0.3  0.2  2448356   3756 s001  R+    2:59PM   0:01.34 ruby repl_server.rb

3つのプロセスが立ってるのがわかるね


ただプロセスは個々に独立したメモリ空間を専有するから

接続ユーザ数が多くなるとちょっと心配だよね

またチャットサーバーのように

ユーザ間での情報のやり取りが必要な場合

プロセス間で通信させなきゃならないから

そんなときはちょっと厄介そうだよね


Threadによる並列化

3つ目はスレッドを使う方法だよ

スレッドは1つのプロセス内で処理を並走させる仕組みだよ

並走する処理は同じプロセス内にあるから

その間でのデータ共有が容易という利点があるんだ


じゃあThreadクラスを使ったサーバーを書いてみるよ

require "socket"

server = TCPServer.new(60000)
loop do
  client = server.accept
  
  Thread.new(client) do |cl|
    begin
      loop { cl.puts eval cl.gets }
    rescue
    ensure
      cl.close
    end
  end
end

forkをThread.newに変えればいいだけだから簡単だね

ただスレッドは同じプロセス内で並走するから

acceptしたclientをブロック引数を通して

ちゃんと渡さないと問題が生じるよ


同じように複数のtelnetから接続してみるよ

Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.
[*1..10].select{|i| i.even? }
2
4
6
8
10
-------------------------------
Connected to localhost.
Escape character is '^]'.
Array.ancestors
Array
Enumerable
Object
Kernel
BasicObject
-------------------------------
$ ps aux |grep repl_server
keyes     1712   0.3  0.2  2451964   3808 s001  S+    7:03PM   0:00.24 ruby repl_server.rb

プロセスは1つのままだってこと確認できるよね


前でスレッドがマルチプロセスよりも

データ共有が容易って書いたから

そのサンプルも書いてみるよ

サーバーからの出力を

接続しているすべてのクライアントに出力する例だよ

require "socket"

server = TCPServer.new(60000)
clients = []   # 接続クライアントの管理用配列
loop do
  client = server.accept
  clients << client  # 接続クライアントを登録
  
  Thread.new(client) do |cl|
    begin
      loop do
        output = eval cl.gets
        clients.each { |c| c.puts output }  # 結果を全クライアントに配信
      end
    rescue
    ensure
      cl.close
      clients.delete(cl) # 切断したクライアントを管理対象外に
    end
  end
end

接続クライアント管理用の配列を用意して

結果を全員にブロードキャストすればいいね

簡単だね


ただスレッドモデルはスレッド間で共有するデータを

書き換えるような場合の取り扱いがちょっと厄介だよ

それとやっぱり各スレッドごとに

いつ来るかわからないデータを待っている

というのが無駄といえば無駄だよね



Reactorパターンによる並列化

4つ目はReactorパターンを使って並列化する方法だよ

Reactorパターンというのは簡単に言うと

一箇所でいろいろなイベントを待ち受けて

イベントが来たらこれに反応(リアクト)して

その種類に応じた処理を実行するモデルのことだよ

RubyでReactorパターンを実現するには

IO.selectメソッド(またはKernel#select)を使うよ


早速Reactor版REPLサーバーを書いてみるよ

REPLサーバーにおけるイベントには

サーバーに対するものつまりクライアントの接続と

クライアントに対するものつまりソケットへのデータ入力があるよ

これらをsocketsという配列で管理しよう

require "socket"

server = TCPServer.new(60000)
sockets = [server]
loop do
  r_sockets = IO.select(sockets)[0] # すべてのイベントを待ち受ける
  r_sockets.each do |socket|
    case socket
    when TCPServer     # サーバーに対するクライアントの接続があったとき
      client = socket.accept
      sockets << client
    when TCPSocket     # クライアントに対するデータ入力があったとき
      unless socket.eof?
        socket.puts eval socket.gets
      else
        socket.close
        sockets.delete(socket)
      end
    end
  end
end

IO.selectは登録したソケットに対する

入力/出力/例外のイベントを待ち受け

そのイベントが発生したソケットを返すけど

返り値はイベント別のソケットの配列になっているよ

ここでは入力イベントだけに関心があるから

配列の第一要素のみ取り出してるよ


そしてcase式でイベントのあった

ソケットの種類に応じて処理を切り分けてるよ

つまりサーバーがクライアントからの接続を受けたときは

TCPServerの節に入って

sockets配列に接続のあったクライアントが登録され

ループでselectに戻って次のイベントを待つよ

最初のクライアントの接続時にはsockets配列には

serverしか登録されていないから

処理は必ずここに来ることになるよ


一方クライアントにデータの入力があったときは

TCPSocketの節に入って入力データの処理をするよ

入力データがあるときはそれを評価し結果を返し

無いときはソケットを閉じてその接続を解放するよ

そしてループでまたselectに戻って次のイベントを待つよ


Reactorパターンでは

すべてのクライアントの接続は維持されたままなのに

処理が並走しないつまり単一プロセス単一スレッドで

複数クライアントからの要求に応じることができる

という点がユニークだよ

このモデルなら処理が並走することはないので

共有データを書き換えるようなことも簡単にできるよね


EventMachineによる並列化

ただwhen式でのソケットの切り分け処理が

面倒といえば面倒だよね

でも安心していいよ

EventMachineというライブラリを使えば

これが驚異的に簡単にできちゃうんだよ

require "eventmachine"

EM.run do
  EM.start_server('localhost', 60000) do |c|
    def c.receive_data(data)
      send_data eval(data)
    end
  end
end

EventMachineはRreactorパターンによる

イベント駆動型のI/Oインタフェースを提供するライブラリだよ

JavaScriptのNode.jsみたいなものなんだろうね


もうコードを見れば分かると思うけど

EM.runでイベントループが開始されて

クライアントからのデータ入力があると

receive_dataメソッドが呼び出されるので

ここでsend_dataを呼んでevalした入力を返せばいいんだ


EventMachineを使えば

チャットサーバーだって簡単に書けちゃうんだ

require "eventmachine"

module Chat
  @@channel = EM::Channel.new
  def post_init
    puts "-- someone connected"
    @sid = @@channel.subscribe { |data| send_data ">> #{data}" }
  end

  def receive_data(data)
    @@channel.push data
  end

  def unbind
    puts "-- someone disconnected from the server"
    @@channel.unsubscribe(@sid)
  end
end

EM.run do
  EM.start_server('localhost', 60000, Chat)
end

EMサーバーはクライアントの接続があるたびに

その引数でセットしたChatモジュールをインスタンス化して

その監視対象として登録するよ*1

各インスタンスのpost_initメソッドはその接続時に

unbindメソッドはその切断時に呼び出され

receive_dataは先の例と同様にデータ受信時に呼び出されるよ


データをブロードキャストするにはEM::Channelを使うよ

subscribeでそのクライアントに対する処理を登録して

pushで呼び出せばいいんだ


gem install eventmachineして

telnetから試してみてね

WebSocket

ここまでくるとWebサーバー上でも

この並列化技術を使いたいと考えるのが人情だよね

そう

これこそがWebSocketなんだよ


そしてうれしいことにEventMachineには

そのためのプラグインem-websocketがあるんだ

gem install em-websocketして使うよ

サーバー側のコードは次のような感じだよ

require 'em-websocket'

EM.run {
  @channel = EM::Channel.new

  EM::WebSocket.start(:host => "localhost", :port => 60000) do |ws|
    sid = nil
    ws.onopen { sid = @channel.subscribe { |msg| ws.send msg } }
    ws.onmessage { |msg| @channel.push "#{sid}: #{msg}" }
    ws.onclose { @channel.unsubscribe(sid) }
  end
}

EM::WebSocket.startでサーバーインスタンスを立ち上げて

クライアントからの接続を待ち受けるよ

クライアントからの接続があるとonopenが呼ばれるから

ここでchannelに

メッセージをブロードキャストする処理を登録するよ

クライアントがテキストを送信するとonmessageが呼ばれるから

それをchannelにpushして登録した処理を呼ぶよ


次にクライアントサイドのコードだよ

<html>
  <head>
    <script src='http://ajax.googleapis.com/ajax/libs/jquery/1.3.2/jquery.min.js'></script>
    <script>
      $(document).ready(function(){
        function debug (str) { $("#debug").append("<p>"+str+"</p>") };
        
        ws = new WebSocket("ws://localhost:60000/");
        ws.onopen = function() { debug("Welcome to Chattata!") };
        ws.onmessage = function(evt) { $("#msglist").append("<p>"+evt.data+"</p>") };
        ws.onclose = function() { debug("socket closed") };

        $("form").submit(function(){
          var msg = $("input#msg");
          ws.send(msg.val());
          msg.val('');
          return false;
        });
      });
    </script>
  </head>
  <body>
    <div id="debug"></div>
    <form>
      <input id="msg" type="text"></input>
    </form>
    <div id="msglist"></div>
  </body>
</html>

クライアント側では

サーバーに接続するソケットをインスタンス化するよ

ユーザがテキストを送信すると$("form").submitが呼ばれ

その内容はws.sendでソケットに送り出されるよ

これによってサーバー側のchannelに登録された処理が呼ばれ

接続されているクライアントに

テキストがブロードキャストされるよ

クライアント側ではこれをonmessageで受けて

テキストをwindow上に表示するよ


じゃあ試してみるよ

f:id:keyesberry:20110929223805p:image


良い感じだね!

WebSocketはHTML5の新しい規格だから

対応ブラウザか確認してね


ちょっと長い投稿になっちゃったけど

最後まで付き合ってくれてありがとう

*1:モジュールをインスタンス化って変だよね..

スパム対策のためのダミーです。もし見えても何も入力しないでください
ゲスト


画像認証

トラックバック - http://d.hatena.ne.jp/keyesberry/20110929/p1