Tuesday, December 27, 2011

Programmatic translation to iteratees from pull-based code

When working with iteratees, one notices a sort of duality between producer and consumer: for any given stream processing task (which pairs a producer of values with some consumer of these values), operations can often be expressed on either the producer side or the consumer side. For instance, if we want to square each value in a stream of integers, we can map over the stream (the producer), or we can map (contravariantly) over the values as they enter the stream's consumer (the iteratee). Here is an intriguing idea: what if we could programmatically translate stream processing tasks from one mode to the other? That is, write pull-based code, but translate it to push-based code for purposes of execution. It turns out this is actually possible, and I'll show code in this post for doing the conversion.
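To make the duality concrete, here is a small sketch. It models a consumer as a plain left fold, which is much simpler than a real iteratee. The Consumer type and the contramapC, feedList and sumC names are hypothetical and do not come from the post. Squaring can be applied to the producer with map, or to the consumer by contramapping over its input, and both routes give the same total.

```haskell
-- Hypothetical minimal consumer: a left fold over pushed values
-- (a stand-in for an iteratee, used only to illustrate the duality).
data Consumer a r = Consumer (r -> a -> r) r

-- Contravariant map: adapt a consumer of b into a consumer of a
-- by transforming each value just before the consumer sees it.
contramapC :: (a -> b) -> Consumer b r -> Consumer a r
contramapC g (Consumer step z) = Consumer (\acc a -> step acc (g a)) z

-- Push every element of a list into a consumer.
feedList :: Consumer a r -> [a] -> r
feedList (Consumer step z) = foldl step z

sumC :: Consumer Int Int
sumC = Consumer (+) 0

square :: Int -> Int
square x = x * x

-- Producer side: transform the stream, then consume it unchanged.
producerSide :: Int
producerSide = feedList sumC (map square [1 .. 4])

-- Consumer side: leave the stream alone and transform the consumer.
consumerSide :: Int
consumerSide = feedList (contramapC square sumC) [1 .. 4]
```

Both definitions evaluate to 30 (1 + 4 + 9 + 16).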

Why is this desirable? Push-based APIs like iteratees are generally preferable for execution: data is produced and sent to its consumers, after which any resources associated with that data can be freed. This model is simple and easy to reason about in terms of memory and resource usage. Pull-based execution is more problematic: resource lifetimes (and therefore space usage) are not deterministic, since at any point in time a function may choose to request "historical" data from the producer. The consequences of this have plagued some FRP implementations (though there are ad hoc workarounds that involve limiting access to history in various ways, or only exposing certain "safe" combinators rather than the full pull-based API that forms the underlying model), and they have led many to reject lazy IO in favor of iteratees.

While iteratees address these problems for streaming IO, programming with them directly requires inverting your thinking, which is arguably somewhat unnatural. Likewise for other push-oriented APIs like arrows, which have been used as the basis for FRP implementations that aren't as susceptible to space leaks. Honestly, I'm not sure what I think about the claims that push-based APIs are less "natural". What's most important is that whatever API you use composes, avoids code duplication, and gives you some reasoning tools. With those ingredients, I suspect you can get comfortable with any API. (I remember that when I first started writing iteratee code, which we have a lot of at work, my brain nearly exploded when writing even the simplest of iteratees and HOFs for composing them... but after some practice it came much more naturally.)

Moving on, the key to converting pull-based code to push-based code is to represent streams of data as a ListT, universally quantified over the underlying monad. I'll explain this in a minute. First, a ListT is just a list in which each uncons performs a monadic effect. Here is a ListT implementation:

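-- Pragma and imports for all of the code in this post
-- (RankNTypes is needed by invert; Reader is used by prompts and invert)
{-# LANGUAGE RankNTypes #-}
import Control.Monad (MonadPlus (..), ap, liftM)
import qualified Control.Applicative as A
import Control.Monad.Trans.Class (MonadTrans (..))
import Control.Monad.Trans.Reader (Reader, reader, runReader)

-- Skip nodes let us do filtering without lookahead
-- uncons is then a simple loop that removes any Skip nodes
data Step a s = Empty | Yield a s | Skip s

newtype ListT f a = 
  ListT { runListT :: f (Step a (ListT f a)) }

-- Modern GHC also requires Functor, Applicative and (for MonadPlus)
-- Alternative instances alongside Monad.
instance Monad f => Functor (ListT f) where
  fmap = liftM

instance Monad f => Applicative (ListT f) where
  pure a = ListT (return (Yield a empty))
  (<*>) = ap

instance Monad f => Monad (ListT f) where
  (ListT s) >>= f = ListT $
    s >>= \step -> case step of  
      Yield h t -> return $ 
        (Skip $ f h `mplus` (t >>= f))
      Skip t -> return $ Skip (t >>= f)
      Empty -> return Empty

instance Monad f => A.Alternative (ListT f) where
  empty = mzero
  (<|>) = mplus

instance (Monad f) => MonadPlus (ListT f) where
  mzero = ListT (return Empty)

  (ListT s) `mplus` b = ListT $ 
    s >>= \step -> case step of  
      Yield h t -> return $ Yield h (t `mplus` b)
      Skip t -> return $ Skip (t `mplus` b)
      Empty -> return $ Skip b

instance MonadTrans ListT where
  lift fa = ListT $ liftM (\a -> Yield a empty) fa

empty :: Monad f => ListT f a
empty = ListT (return Empty)

cons :: Monad f => a -> ListT f a -> ListT f a
cons h t = ListT . return $ Yield h t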
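The comments about Skip nodes can be illustrated with a few helpers: fromList, filterL, unconsL and toListM. These names are hypothetical and do not come from the post. The sketch repeats only the Step and ListT declarations so that it stands alone. In it, filterL turns each rejected element into a Skip instead of searching ahead for the next match, and unconsL is the loop that strips Skip nodes away.

```haskell
import Data.Functor.Identity (Identity (..))

data Step a s = Empty | Yield a s | Skip s

newtype ListT f a = ListT { runListT :: f (Step a (ListT f a)) }

-- Hypothetical helper: a ListT whose effects are all trivial.
fromList :: Monad f => [a] -> ListT f a
fromList []       = ListT (return Empty)
fromList (x : xs) = ListT (return (Yield x (fromList xs)))

-- Each input step produces exactly one output step: rejected elements
-- become Skip nodes, so filtering never has to look ahead.
filterL :: Monad f => (a -> Bool) -> ListT f a -> ListT f a
filterL p (ListT s) = ListT $ s >>= \step -> return $ case step of
  Empty -> Empty
  Yield a t
    | p a       -> Yield a (filterL p t)
    | otherwise -> Skip (filterL p t)
  Skip t -> Skip (filterL p t)

-- The "simple loop that removes any Skip nodes".
unconsL :: Monad f => ListT f a -> f (Maybe (a, ListT f a))
unconsL (ListT s) = s >>= \step -> case step of
  Empty     -> return Nothing
  Yield a t -> return (Just (a, t))
  Skip t    -> unconsL t

-- Run a finite stream to completion, collecting its elements.
toListM :: Monad f => ListT f a -> f [a]
toListM l = unconsL l >>= \r -> case r of
  Nothing     -> return []
  Just (a, t) -> fmap (a :) (toListM t)

evens :: [Int]
evens = runIdentity (toListM (filterL even (fromList [1 .. 6])))
```

Here evens is [2,4,6]. Each of the odd elements passed through the stream as a Skip node, and unconsL discarded it.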

Now, consider a function of type forall f . Monad f => ListT f a -> ListT f b. Parametricity guarantees that such a function cannot assume anything about f other than that it is a monad. Substitute the identity monad for f: ListT Identity a is just a lazy list of a, and any list function can be implemented with the same kind of pull-based recursion over a ListT for an arbitrary monad f. So any function [a] -> [b] could be written as a forall f . Monad f => ListT f a -> ListT f b. In other words, this API is just as expressive as the usual lazy-IO-style pull-based API.
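The specialisation to Identity can be sketched directly. The conversions fromList and toListI, the transformation mapL, and the wrapper asListFunction are hypothetical names used only for illustration. The wrapper takes any function that is polymorphic in the monad, instantiates it at Identity, and gets back an ordinary list function. Because Identity is lazy, the result even works on infinite lists.

```haskell
{-# LANGUAGE RankNTypes #-}
import Data.Functor.Identity (Identity (..))

data Step a s = Empty | Yield a s | Skip s

newtype ListT f a = ListT { runListT :: f (Step a (ListT f a)) }

-- Hypothetical conversions between ordinary lists and ListT Identity.
fromList :: Monad f => [a] -> ListT f a
fromList []       = ListT (return Empty)
fromList (x : xs) = ListT (return (Yield x (fromList xs)))

toListI :: ListT Identity a -> [a]
toListI (ListT (Identity step)) = case step of
  Empty     -> []
  Yield a t -> a : toListI t
  Skip t    -> toListI t

-- A pull-based transformation that only knows that f is a monad.
mapL :: Monad f => (a -> b) -> ListT f a -> ListT f b
mapL g (ListT s) = ListT $ s >>= \step -> return $ case step of
  Empty     -> Empty
  Yield a t -> Yield (g a) (mapL g t)
  Skip t    -> Skip (mapL g t)

-- Choosing f = Identity turns any such function into a list function.
asListFunction :: (forall f . Monad f => ListT f a -> ListT f b) -> [a] -> [b]
asListFunction h = toListI . h . fromList
```

For example, asListFunction (mapL (* 2)) [1, 2, 3] gives [2, 4, 6].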

The trick is that the caller of such a function can substitute another monad in for f, namely the reader monad. This lets us push values into the ListT.

data Input a = Done | Await | Element a 

prompts :: ListT (Reader (Input a)) a 
prompts = ListT . reader $ \input -> case input of
  Done -> Empty
  Await -> Skip prompts
  Element a -> Yield a prompts

prompts is a potentially infinite stream of a, but each of its steps is a Reader computation, i.e. essentially a function Input a -> Step a (ListT (Reader (Input a)) a). On the one hand, we can feed prompts to a forall f . Monad f => ListT f a -> ListT f b, and from that function's perspective it is pulling values from the stream. On the other hand, at each step we get to push a value into the stream (or signal termination, or whatever "instruction set" we wish to support).
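Both views can be seen in a short sketch. It uses a hypothetical driver, pushInputs, along with the assumed helper mapL; neither is from the post. The driver supplies one Input to each step by running that step's Reader. It collects whatever is yielded and stops at the first Empty. The pulling function (mapL here) never sees the Reader; it simply unconses.

```haskell
import Control.Monad.Trans.Reader (Reader, reader, runReader)

data Step a s = Empty | Yield a s | Skip s

newtype ListT f a = ListT { runListT :: f (Step a (ListT f a)) }

data Input a = Done | Await | Element a

prompts :: ListT (Reader (Input a)) a
prompts = ListT . reader $ \input -> case input of
  Done      -> Empty
  Await     -> Skip prompts
  Element a -> Yield a prompts

-- A pull-based transformation, polymorphic in the monad.
mapL :: Monad f => (a -> b) -> ListT f a -> ListT f b
mapL g (ListT s) = ListT $ s >>= \step -> return $ case step of
  Empty     -> Empty
  Yield a t -> Yield (g a) (mapL g t)
  Skip t    -> Skip (mapL g t)

-- Hypothetical driver: push one Input into each step and collect the
-- elements that come out, stopping at the first Empty.
pushInputs :: [Input a] -> ListT (Reader (Input a)) b -> [b]
pushInputs [] _ = []
pushInputs (i : is) l = case runReader (runListT l) i of
  Empty     -> []
  Yield b t -> b : pushInputs is t
  Skip t    -> pushInputs is t

sampleInputs :: [Input Int]
sampleInputs = [Element 1, Await, Element 2, Done, Element 3]
```

Pushing sampleInputs through prompts yields [1,2]. Pushing them through mapL (* 10) prompts yields [10,20]. In both cases the Await step becomes a Skip, and Done ends the stream before Element 3 is seen.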

We need one more ingredient: a slightly reformulated version of iteratees. A well-behaved iteratee will always yield a result (rather than a Cont) when fed EOF. We can make this more explicit in the type:

type IsEOF = Bool

data Moore a b = 
    Feed b (Input a -> Moore a b) 
  | Stop b IsEOF

I've renamed this Moore since it is essentially just a Moore machine supporting early termination (the Stop case just encodes the fact that the state transition function always leads back to the same state from that point). With iteratees, it is really just a convention that intermediate b values are not inspected (and hence not computed in a lazy language) until either EOF is hit or the iteratee signals termination early.
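Two small machines show both ways a Moore can end. A running sum stops with IsEOF set to True when it sees Done. A first-element machine terminates early with IsEOF set to False. The driver runToEnd and both machines are hypothetical examples, not code from the post. The driver returns the last output together with the EOF flag, or Nothing if the inputs ran out first.

```haskell
data Input a = Done | Await | Element a

type IsEOF = Bool

data Moore a b =
    Feed b (Input a -> Moore a b)
  | Stop b IsEOF

-- Hypothetical driver: feed inputs until the machine stops or the inputs
-- run out; report the last output and, if it stopped, its EOF flag.
runToEnd :: Moore a b -> [Input a] -> (b, Maybe IsEOF)
runToEnd (Stop b eof) _        = (b, Just eof)
runToEnd (Feed b _) []         = (b, Nothing)
runToEnd (Feed _ k) (i : is)   = runToEnd (k i) is

-- Running total; stops (with IsEOF = True) only when the input ends.
summer :: Int -> Moore Int Int
summer acc = Feed acc $ \i -> case i of
  Done      -> Stop acc True
  Await     -> summer acc
  Element n -> summer (acc + n)

-- Early termination: stop (with IsEOF = False) at the first element.
firstElem :: Moore a (Maybe a)
firstElem = Feed Nothing $ \i -> case i of
  Done      -> Stop Nothing True
  Await     -> firstElem
  Element a -> Stop (Just a) False
```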

We can now use this to perform the inversion of control programmatically. Note the signature for invert!

isDone :: Input a -> Bool
isDone Done = True 
isDone _ = False

invert :: (forall f . Monad f => ListT f a -> ListT f b) 
       -> Moore a (Maybe b)
invert f = go Nothing (f prompts) where
  go cur res = Feed cur $ \a -> 
    case runReader (runListT res) a of
      Empty -> Stop cur (isDone a)
      Yield h t -> go (Just h) t
      Skip s -> go cur s
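To watch invert work, the following standalone sketch repeats the definitions above. It drives the resulting Moore machine with a hypothetical outputs helper that records the output of every state it passes through. The helpers outputs, mapL and filterL are assumptions, not code from the post. Note how a Skip step keeps the previous output via go cur s; filterL produces such a step for each rejected element.

```haskell
{-# LANGUAGE RankNTypes #-}
import Control.Monad.Trans.Reader (Reader, reader, runReader)

data Step a s = Empty | Yield a s | Skip s
newtype ListT f a = ListT { runListT :: f (Step a (ListT f a)) }
data Input a = Done | Await | Element a
type IsEOF = Bool
data Moore a b = Feed b (Input a -> Moore a b) | Stop b IsEOF

prompts :: ListT (Reader (Input a)) a
prompts = ListT . reader $ \input -> case input of
  Done      -> Empty
  Await     -> Skip prompts
  Element a -> Yield a prompts

isDone :: Input a -> Bool
isDone Done = True
isDone _    = False

invert :: (forall f . Monad f => ListT f a -> ListT f b) -> Moore a (Maybe b)
invert f = go Nothing (f prompts) where
  go cur res = Feed cur $ \a ->
    case runReader (runListT res) a of
      Empty     -> Stop cur (isDone a)
      Yield h t -> go (Just h) t
      Skip s    -> go cur s

-- Hypothetical pull-based transformations to invert.
mapL :: Monad f => (a -> b) -> ListT f a -> ListT f b
mapL g (ListT s) = ListT $ s >>= \step -> return $ case step of
  Empty     -> Empty
  Yield a t -> Yield (g a) (mapL g t)
  Skip t    -> Skip (mapL g t)

filterL :: Monad f => (a -> Bool) -> ListT f a -> ListT f a
filterL p (ListT s) = ListT $ s >>= \step -> return $ case step of
  Empty -> Empty
  Yield a t
    | p a       -> Yield a (filterL p t)
    | otherwise -> Skip (filterL p t)
  Skip t -> Skip (filterL p t)

-- Hypothetical driver: the output attached to every state visited.
outputs :: Moore a b -> [Input a] -> [b]
outputs (Stop b _) _        = [b]
outputs (Feed b _) []       = [b]
outputs (Feed b k) (i : is) = b : outputs (k i) is

doubled :: [Maybe Int]
doubled = outputs (invert (mapL (* 2))) [Element 1, Element 2, Done]

evensSeen :: [Maybe Int]
evensSeen = outputs (invert (filterL even)) [Element 1, Element 2, Element 3, Done]
```

Tracing by hand, doubled is [Nothing, Just 2, Just 4, Just 4]: the initial Feed Nothing, one output per element, then the final Stop. evensSeen is [Nothing, Nothing, Just 2, Just 2, Just 2].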

We can write very similar code for monadic iteratees:

data MooreT f a b = 
    FeedT b (Input a -> f (MooreT f a b)) 
  | StopT b IsEOF

invertT :: Monad f 
        => (forall t . (MonadTrans t, Monad (t f))
          => ListT (t f) a -> ListT (t f) b) 
        -> MooreT f a (Maybe b) 
invertT = ...
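The post elides the body of invertT. One possible way to fill it in is sketched below; this is our own hypothetical sketch, not the author's code. It mirrors invert with t instantiated to ReaderT (Input a), so the base monad f stays underneath and its effects run at each step. The names promptsT, mapL and runMooreT are assumptions.

```haskell
{-# LANGUAGE RankNTypes #-}
import Control.Monad.Trans.Class (MonadTrans)
import Control.Monad.Trans.Reader (ReaderT (..))
import Data.Functor.Identity (Identity (..))

data Step a s = Empty | Yield a s | Skip s
newtype ListT f a = ListT { runListT :: f (Step a (ListT f a)) }
data Input a = Done | Await | Element a
type IsEOF = Bool
data MooreT f a b = FeedT b (Input a -> f (MooreT f a b)) | StopT b IsEOF

isDone :: Input a -> Bool
isDone Done = True
isDone _    = False

-- Like prompts, but over ReaderT so that f remains the base monad.
promptsT :: Monad f => ListT (ReaderT (Input a) f) a
promptsT = ListT . ReaderT $ \input -> return $ case input of
  Done      -> Empty
  Await     -> Skip promptsT
  Element a -> Yield a promptsT

-- Hypothetical fill-in: pick t = ReaderT (Input a), run one step per input.
invertT :: Monad f
        => (forall t . (MonadTrans t, Monad (t f))
          => ListT (t f) a -> ListT (t f) b)
        -> MooreT f a (Maybe b)
invertT g = go Nothing (g promptsT)
  where
    go cur res = FeedT cur $ \a -> do
      step <- runReaderT (runListT res) a
      return $ case step of
        Empty     -> StopT cur (isDone a)
        Yield h t -> go (Just h) t
        Skip s    -> go cur s

mapL :: Monad f => (a -> b) -> ListT f a -> ListT f b
mapL g (ListT s) = ListT $ s >>= \step -> return $ case step of
  Empty     -> Empty
  Yield a t -> Yield (g a) (mapL g t)
  Skip t    -> Skip (mapL g t)

-- Hypothetical driver: collect the output of every state visited.
runMooreT :: Monad f => MooreT f a b -> [Input a] -> f [b]
runMooreT (StopT b _) _        = return [b]
runMooreT (FeedT b _) []       = return [b]
runMooreT (FeedT b k) (i : is) = k i >>= \m -> fmap (b :) (runMooreT m is)

doubledT :: [Maybe Int]
doubledT = runIdentity (runMooreT (invertT (mapL (* 2))) [Element 1, Element 2, Done])
```

With f = Identity, this behaves like the pure invert: doubledT is [Nothing, Just 2, Just 4, Just 4].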

Here is the full code. I haven't played with it much, but isn't it fascinating that this translation is possible? (I suspect there is some categorical connection here, though I'm not quite sure what it is yet.) It means we can write code using a pull-based API, where functions have access to the "full history" of the stream they are transforming, but translate it to a push-based API for purposes of execution. The translation will discover and retain exactly the portion of the history required to express the transformation, and it will retain this history only for as long as needed. I find it interesting to think about how different operations on ListT get mapped to the equivalent Moore machine. For instance, a function that conses onto a ListT results in a Moore machine that "delays" the input stream by one step. And so on...

This trick of writing code that is parametric in the choice of monad is not really specific to stream processing, and I suspect it has other uses. (Ed Kmett uses a similar technique in his recent searching infinity post, and it has probably been used elsewhere.) For any given function that is parametric in the choice of monad, it's fun to consider what sort of interesting structures can be built by substituting different monads.
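As a small, hypothetical illustration of swapping monads, the pipeline below is written once against any monad. Run with Identity, it is an ordinary list function. Run with State Int over a source that increments a counter on each uncons, it also reports how many elements were actually pulled from the source. The names countingSource, takeL and pipeline are assumptions, not from the post.

```haskell
import Control.Monad.Trans.State (State, modify, runState)
import Data.Functor.Identity (Identity (..))

data Step a s = Empty | Yield a s | Skip s
newtype ListT f a = ListT { runListT :: f (Step a (ListT f a)) }

fromList :: Monad f => [a] -> ListT f a
fromList []       = ListT (return Empty)
fromList (x : xs) = ListT (return (Yield x (fromList xs)))

mapL :: Monad f => (a -> b) -> ListT f a -> ListT f b
mapL g (ListT s) = ListT $ s >>= \step -> return $ case step of
  Empty     -> Empty
  Yield a t -> Yield (g a) (mapL g t)
  Skip t    -> Skip (mapL g t)

-- Stops without running any further steps of the input once n is reached.
takeL :: Monad f => Int -> ListT f a -> ListT f a
takeL n (ListT s)
  | n <= 0    = ListT (return Empty)
  | otherwise = ListT $ s >>= \step -> return $ case step of
      Empty     -> Empty
      Yield a t -> Yield a (takeL (n - 1) t)
      Skip t    -> Skip (takeL n t)

toListM :: Monad f => ListT f a -> f [a]
toListM (ListT s) = s >>= \step -> case step of
  Empty     -> return []
  Yield a t -> fmap (a :) (toListM t)
  Skip t    -> toListM t

-- Written once, for every monad.
pipeline :: Monad f => ListT f Int -> ListT f Int
pipeline = takeL 2 . mapL (+ 1)

-- A source whose every step bumps a pull counter in State.
countingSource :: [a] -> ListT (State Int) a
countingSource xs = ListT $ do
  modify (+ 1)
  return $ case xs of
    []       -> Empty
    (y : ys) -> Yield y (countingSource ys)

plainResult :: [Int]
plainResult = runIdentity (toListM (pipeline (fromList [10, 20, 30, 40])))

countedResult :: ([Int], Int)
countedResult = runState (toListM (pipeline (countingSource [10, 20, 30, 40]))) 0
```

Here plainResult is [11,21], and countedResult is ([11,21], 2). Only two of the four source elements were ever pulled.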

8 comments:

Peter Davis said...

I love Haskell, but isn't this a ridiculously complicated way to write a for-loop? What practical advantage exists here vs. C#'s "yield"?

Mitch Skinner said...

I've manually converted between pull-based and push-based before, and I'm intimidated by the idea of writing iteratees, so it's pretty cool to see an automated translation.

Paul Chiusano said...

Note - I made a slight update to the signature for invertT. Ed Kmett pointed out that the implementor of that function will want to have access to a Monad for (t f), otherwise it can't call many ListT functions at all!

@Peter - You can write regular imperative code for doing IO in Haskell too. Just stick everything in the IO monad and pretend it's C#. Haskell programmers have been doing this for years, but it isn't very composable, resulting in a lot of code duplication and motivating the search for something better. Lazy IO composes nicely in the sense that you can use all your usual list processing functions, but has the problems I alluded to (Oleg has said "Lazy IO in serious, server-side programming is unprofessional"). This motivated Oleg to invent iteratees, which compose well but invert control.

As for how this is an improvement over C#'s yield, well, yield is used for creating iterators, which are inherently stateful. And furthermore, if you are using yield to perform IO, this is the moral equivalent of lazy IO in C#, subject to the same problems that many others have discussed.

jlh276 said...

Is scalaz StreamT the equivalent of the ListT in your example?

Paul Chiusano said...

@jlh276 - Yes, that's right.

Carlton Mills said...

(corrected) Write in the mode that expresses your algorithm most effectively. Let the compiler sort it out; if it can't, then fix the compiler.

Yair said...

Btw, such a ListT is available on Hackage in the List package.

Nils said...

Very interesting stuff. It just left me wondering how this works, so I made up a small example to see what is going on:

-- First on lists and then defined for the ListT
streamer init xs = x:x:x:(process xs)
  where x = [init]
        process (x:xs) = if mod x 2 == 0
                           then [x, xs!!1] : process xs
                           else [x, xs!!0, xs!!2] : process xs

exL = take 10 $ streamer 0 [1,2 ..]

-- Same thing on ListT streams ... but first we need !! operator
nthT 0 (ListT m) = m >>= \step -> case step of
  Yield h t -> return h
  Skip s -> nthT 0 s
nthT n (ListT m) = m >>= \step -> case step of
  Yield _ t -> nthT (n-1) t
  Skip s -> nthT n s

streamerT init xs = cons x $ cons x $ cons x $ processT xs
  where x = [init]
        processT (ListT m) = ListT $ m >>= \step -> case step of
          Yield x t -> if mod x 2 == 0
            then (nthT 1 t) >>= \y -> return $ Yield [x,y] (processT t)
            else (nthT 0 t) >>= \y ->
                 (nthT 2 t) >>= \z -> return $ Yield [x,y,z] (processT t)
          Skip s -> return $ Skip $ processT s

iterT f x0 = ListT . return $ Yield x0 (iterT f (f x0))

uncons (ListT s) = do
  step <- s
  case step of
    Yield h t -> do
      res <- uncons t
      return (h:res)
    Skip t -> uncons t
    Empty -> return []

exT = let s = streamerT 0 $ iterT (+1) 1
      in take 10 $ runIdentity (uncons s)

exTR = let s = streamerT 0 $ iterT (+1) 1
       in take 10 $ runReader (uncons s) $ Element 255

-- now turn this stream transformer into a Moore automaton

runMoore auto [] = case auto of
    Stop o _ -> []
    Feed o inp -> o : runMoore (inp Done) []
runMoore auto (x:xs) = case auto of
    Stop o _ -> [o]
    Feed o inp -> o : (runMoore (inp xi) xs)
  where xi = case x of
          Just x' -> Element x'
          Nothing -> Await

moore = invert (streamerT 0)

exM = take 10 $ runMoore moore $ map Just [1,2 ..]

Running this on Hugs gives:

exL = [[0],[0],[0],[1,2,4],[2,4],[3,4,6],[4,6],[5,6,8],[6,8],[7,8,10]]

and the same for exT and exTR.
But the Moore automaton outputs:
exM = [Nothing,Just [0],Just [0],Just [0],Just [4,4],Just [5,5,5],Just [6,6],Just [7,7,7],Just [8,8],Just [9,9,9]]

Looks like dependencies on values at different positions in the stream are not tracked properly, or am I missing something here?