Archive

Posts Tagged ‘mobilechannels’

The Observer Pattern using a Message-Passing Process

November 10, 2009 3 comments

The observer code pattern refers to many observers being notified of a change in state of a particular variable, typically by use of callbacks or similar. For example, many objects in a GUI might register as observers of the colour scheme setting, and would then be notified by a callback if the user changed their preferred colours, so that they could then redraw themselves in the new colour. Recently, an implementation of the observer pattern was discussed on haskell-cafe. A lot of the OO implementations (and the initial haskell-cafe offering) are sequential, meaning that the observers are notified in a sequential fashion (and typically execute their code in the thread that does the notification), which goes against the concurrent approach of this blog.

The observer pattern is very easy to capture in CHP. There are actually several implementations — I will discuss the slightly lengthier process-based approach in this post, and show a simpler version next time. Our observer pattern here is implemented in a process. The process has two channels: one to receive new values when the state changes, and one to receive new observer actions (to be run when the state changes). For simplicity, I am not concerned with removing observers in this version.

For our observer actions, we can use the SendAction type from the Control.Concurrent.CHP.Actions module in the CHP library. Something of type SendAction a is like having type a -> CHP () (i.e. it performs a CHP action when given something of type a), but it also contains information about how to poison the action. So the outside of our observer process looks like this:

observer :: forall a. Chanin a -> Chanin (SendAction a) -> CHP ()
observer newValIn newObsIn = observer' []
  where
    observer' :: [SendAction a] -> CHP ()

The first channel takes in the new values, the second channel takes in the new observers. Note that we have no outgoing channels in this process initially — they will be passed in by sending SendActions. This process delegates to an inner process (a common idiom in CHP) that takes the current list of actions as its argument:

    observer' acts
      = ((newVal <-> newObs)
           `onPoisonRethrow` (do poison newValIn
                                 poison newObsIn
                                 poisonAll acts)
        ) >>= observer'

The inner process offers a choice between receiving a new value (newVal), or receiving a new observer (newObs). If poison occurs, the two input channels are poisoned and all the observers are poisoned. newVal and newObs both return the new list of observers, which are then fed into a recursive call. They are defined as follows:

      where
        newVal = do x <- readChannel newValIn
                    runParallel_ [sendAction a x | a <- acts]
                    return acts

        newObs = do a <- readChannel newObsIn
                    return (a : acts) 

newVal reads in a new value, and in parallel it runs the current list of observer actions (which are then returned unaltered). newObs reads in a new observer and adds it to the list (the order of the list doesn’t matter).

That’s all we need for our observer. Typically the observer actions are channel-writes, so that the information is passed out of the observer process to whatever other process we want, which then executes the real response to the state change. Here is some example code for using the observer:

useObserver :: Chanout String -> Chanout (SendAction String) -> CHP ()
useObserver newValOut newObsOut
  = do -- Send a new value when no observers are registered:
       writeChannel newValOut "This will be ignored"

       -- Create a channel to get the value back on, and register it:
       obs1 <- oneToOneChannel
       writeChannel newObsOut (makeSendAction (writer obs1))

       -- Send in an update, and read the value back from our channel:
       writeChannel newValOut "First update"
       readChannel (reader obs1) >>= liftIO . putStrLn

       -- Add a second observer:
       obs2 <- oneToOneChannel
       writeChannel newObsOut (makeSendAction (writer obs2))

       -- Now send a few more updates:
       writeChannel newValOut "Second update; should be printed twice"
       readChannel (reader obs1) >>= liftIO . putStrLn
       readChannel (reader obs2) >>= liftIO . putStrLn
       writeChannel newValOut "Third update; should be printed twice"
       readChannel (reader obs2) >>= liftIO . putStrLn
       readChannel (reader obs1) >>= liftIO . putStrLn

       -- Finally, shut the network down:
       poison newValOut >> poison newObsOut

main :: IO ()
main = runCHP_ $ do
  c <- oneToOneChannel
  d <- oneToOneChannel
  useObserver (writer c) (writer d)
    <||> observer (reader c) (reader d)

Our example code is quite artificial — the observer actions will almost certainly come from different processes running in parallel (and thus the channel reads will be done in parallel), but I didn’t want to make the code too large and complex. Note that we can read the values from the channels in either order, because the sends are done in parallel. The observer does wait for all the sends to complete before it will offer to receive a new value; we can fix this using forking, but that can again wait for another post.

One final note: constructing this example revealed a bug in CHP 1.5.0; runParallel and runParallel_ were deadlocking when passed an empty list of processes (rather than instantly returning an empty list of results). This is fixed in CHP 1.5.1, which I have just uploaded to Hackage.