サンタとConduit (Haskell Advent Calendar 23日目)
Tag: Haskell※これはHaskell Advent Calendar 2012の23日目の記事です。
もうすぐクリスマスです。サンタクロースの皆さんはそろそろ子供達にプレゼントを配る用意をしなければいけません。ところで、最近のサンタクロースはIT化が進み、どうやらHaskellでプレゼントを配るシステムを使っているそうです。新米サンタさんはクリスマスまでの残り2日で、このシステム通りにプレゼントを配る方法を覚えなければいけません。新米サンタさんはどうやらHaskellの基本的な使い方は覚えたようですが、このプレゼント配送システムに"Conduit"という聞きなれない物が使われているので、うまく扱えるか心配です。
Conduitは主にストリーム処理用に作られたライブラリで、Source, Sink, そしてConduitをうまく繋いで処理を行います。Sourceは上流からストリームを流し、Sinkは最終的に流れてきたストリームを消費して何か処理をします。ConduitはSourceとSinkの間に置き、Sourceから流れてきたストリームに何らかの処理を施し、それをSinkに向けて再び流します。
では、例のプレゼント配送システムを見てみましょう。
christmas :: Monad m => m ()
christmas = sourceRequest
$$ santaClaus
=$ sinkChildren
sourceRequestは何だかよく分からないけど子供達からのプレゼントのリクエストが流れてくる物で、sinkChildrenは何だかよく分からないけど渡したプレゼントが子供達に届く物です。Sourceの中身もSinkの中身もよくわからないのに間のConduitを書けるのも、Conduitの利点の1つです。
上のソースをもう少し見てみましょう。christmasではSourceであるsourceRequest、SinkであるsinkChildren、ConduitであるsantaClausの他に、何やら見慣れない$$とか=$とかいう演算子が混ざっています。
この2つの演算子のうち、$$
演算子のほうはSourceとSinkを繋いで処理を実行します。
ghci> runResourceT $ sourceFile "input.txt" $$ sinkFile "output.txt"
この例の場合、sourceFileはinput.txtの中身を読み取ってSinkに送り、sinkFileではSourceから受け取った内容をoutput.txtに書き込みます。
もう1つの=$
演算子はConduitとSinkを繋ぎ、1つのSinkに合成します。例えば、上の例でinput.txtの各行の先頭に"hoge"と付け足したい場合は、
ghci> runResourceT $ sourceFile "input.txt" -- input.txtを…
$$ Data.Conduit.Binary.lines -- 行ごとに分けて…
=$ Data.Conduit.List.map -- 各行の先頭に"hoge"を足して…
(\s -> "hoge" `append` s `append` "\n")
=$ sinkFile "output.txt" -- output.txtに出力
というようにします。この場合、4行目のSinkと3行目のConduitが合成されて1つのSinkになり、さらにそれと2行目のConduitが合成されて1つのSinkになり、そのSinkと1行目のSourceが合成され、runResourceTによって実行されます。この他にも、SourceとConduitを合成して新たなSourceを作る$=
演算子や、ConduitとConduitを合成する=$=
演算子などもあります。
先程の例をもう一度見てみましょう。
christmas :: Monad m => m ()
christmas = sourceRequest
$$ santaClaus
=$ sinkChildren
この例では、sinkChildrenとsantaClausが合成されて1つのSinkとなり、それがSourceであるsourceRequestと合成されています。
では、間にいるサンタは一体何をやればよいでしょう?
santaClaus = do
...
まずはSourceとSinkの型を見てみましょう。
ghci> :t sourceRequest
sourceRequest :: Monad m => Source m Request
ghci> :t sinkChildren
sinkChildren :: Monad m => Sink Present m ()
soureRequestやsinkChildrenがどのような実装になっているかは分かりませんが、とりあえずコードを動かすために、以下のように暫定的な定義をしておきましょう。実際のSourceやSinkと違っても、上流から受けとるデータの型と下流へ流すデータの型さえ合っていれば、ConduitはどのようなSourceやSinkにも使うことができます。
import Data.Conduit
import qualified Data.Conduit.List as CL
data Request = Request {
reqPresent :: String
, reqFrom :: String
} deriving (Show, Eq)
data Present = Present {
presentName :: String
} deriving (Show, Eq)
sourceRequest :: Monad m => Source m Request
sourceRequest = CL.sourceList [
...
]
sinkChildren :: Monad m => Sink Present m ()
sinkChildren = CL.consume >> return ()
先程の型情報によると、どうやら渡されたRequestに対応するPresentをsinkChildrenに送ってやればいいみたいです。これでsantaClausの型は決まりました。
santaClaus :: Monad m => Conduit Request m Present
santaClaus = do
...
とにかくプレゼントのリクエストを見ないと何も始まらないので、まずは上流のSourceから1つリクエストを受け取ってみましょう。
mreq <- await
awaitはSourceからデータを1つ受け取って返す関数です。Sourceから受け取ることのできるデータが1つも無い場合はNothingを返します。Sourceから渡されたのは以下のようなプレゼントのリクエストでした。
Just (Request {
reqFrom = "葉月"
, reqPresent = "絵本"
})
葉月ちゃんという子から、絵本のリクエストが来ました。早速プレゼントを用意しましょう。
yield $ Present "絵本"
yieldは引数に取った値を下流のSinkやらConduitに渡す関数です。これで渡したプレゼントはsinkChildrenが25日に子供に届けてくれるでしょう。ともかく、これでまず一人分のプレゼントの用意が終わりました。同じようにもう1度やってみましょう。
mreq <- await -- => Just (Request { reqFrom = "恋", reqPresent = "牛乳" })
yield $ Present "牛乳"
今度は恋(れん)ちゃんという子のために牛乳を用意してあげました。この調子で全員分のプレゼントを用意しましょう。実は、このようなパターンはawaitForever関数を使って楽に書くことができます。
santaClaus = awaitForever $ \req -> yield $ reqPresent req
awaitForeverは、「上流から流れてくるデータを1つ受け取り、引数の関数の引数に渡して実行する」という動作を上流からデータが流れてこなくなるまで実行し続けます。今回のように上から流れてくるデータを1つずつ変換して下に流すだけのコードを書く時には便利です。
おっと、まだ2人分のプレゼントしか用意し終えていないのに、サンタさんは疲れて休憩に入ってしまいました。
return ()
クリスマスまであと2日しかありません。世界中の子供達の分のプレゼントを用意しなければならないので、こんな所で休んでいる場合ではありません。
Conduitでは、Source等が簡単に自分が生産するデータが下流で使い果たされたかどうかを知ることができます。Sourceがまだデータを生産できる状態で下流の動作が終了してしまった場合、Sourceでは終了処理が呼ばれます。Sourceに終了処理を追加する一番簡単な方法はaddCleanup
関数を使うことです。試しに、以下のような2人のサンタクロースを例にこの関数を使ってみましょう。
lazySanta, diligentSanta :: Monad m => Conduit Request m Present
-- 1つ処理して終了("遅延評価"という意味のlazyではない)
lazySanta = do
mreq <- await
case mreq of
Nothing -> return ()
Just req -> do
yield $ reqPresent req
-- 全部処理する
diligentSanta = awaitForever $ \req -> yield $ reqPresent req
addCleanup
を用いて終了処理に追加するのは以下のような関数です。
check :: Bool -> IO ()
check finished = putStrLn $ if finished
then "お疲れ様!"
else "まだ残ってるよ!!!"
addCleanup
により追加される終了処理の関数に渡される引数は、上流がもうストリームに流せるデータを持っていない場合はTrue、そうでない場合はFalseとなります。これを使って挙動を確かめてみましょう。
ghci> addCleanup check sourceRequest $$ diligentSanta =$ sinkChildren
お疲れ様!
ghci> addCleanup check sourceRequest $$ lazySanta =$ sinkChildren
まだ残ってるよ!!!
しかし、Conduitの終了処理には1つ罠があります。以下のようなサンタで同じ事をやってみましょう。
-- 何もしない
laziestSanta :: Monad m => Conduit Request m Present
laziestSanta = return ()
結果は以下のようになります。
ghci> addCleanup check sourceRequest $$ laziestSanta =$ sinkChildren
ghci>
何も表示されませんでした! 一体どういうことでしょう!?
実は、この原因はSource, Conduit, Sinkの実行の順番にあります。
Conduitは、まずストリームの最下流を実行し、最下流のSinkが何かデータを要求することで初めて上流の実行をします。そのため、SinkがSourceに対して1つのデータも要求しなかった場合、Sinkはそもそも開始すらされないので、終了処理が呼ばれる事もありません。
また、サンタがストリームの処理をやめてしまった場合、別のサンタに処理の続きを引き継いでもらうこともできます。
ghci> (next, _) <- addCleanup check sourceRequest $$+ santa1 =$ sinkChildren
ghci> (next', _) <- next $$++ santa2 =$ sinkChildren
ghci> next' $$+- santa3 =$ sinkChildren
お疲れ様!
$$+
やら$$++
やら$$+-
やら、続々と見慣れない演算子が登場してきました。しかし心配はいりません。1つずつ機能を見ていきましょう。$$+
演算子はSourceとConduitを結合し、実行結果とSourceの続きを返します。
($$+) :: Monad m => Source m a -> Sink a m b -> m (ResumableSource m a, b)
返ってきたSourceの続きはResumableSourceという、Sourceとは別な型になっています。これをSinkと結合するには、今までの$$
演算子ではなく$$++
演算子か$$+-
演算子を使います。$$++
演算子はResumableSourceとSinkを結合し、実行結果と残りのResumableSourceを返すのに対し、$$+-
演算子は実行結果のみを返し、ResumableSourceの終了動作を行います。
これだけのことを覚えれば、あとは基本的なConduitなら使いこなせるはずです。サンタさんもこれで安心してプレゼントを配達することができます。