
The Observer Pattern using a Broadcast Channel

November 12, 2009

In the last post, I explained how to implement the observer pattern using an active process. The process approach is fairly flexible, but there is an even simpler way to implement the observer pattern in CHP: use a broadcast channel.

A broadcast channel has one writer, but potentially many readers. When the writer sends one piece of data, all the readers receive that same data; we call this one-to-many. Readers may join and leave the channel dynamically — we refer to this as enrolling on the channel, and resigning from the channel respectively. The broadcast channel fits the observer pattern perfectly: the process that changes the value is the writer, sending out updates. The observers enroll on the channel and subsequently listen for updates on the channel. They can easily resign from the channel at any time when they want to stop receiving updates — unlike our process-oriented approach, where I decided to omit stopping observation for simplicity.
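To make the one-to-many delivery concrete, here is a minimal sketch using only the base library's Control.Concurrent.Chan, not CHP. It is an analogue under stated assumptions: dupChan gives each reader its own view of everything written after it joined, but Chan is buffered and asynchronous, so the writer never waits for its readers and there is no notion of resigning. The name broadcastDemo is hypothetical.

```haskell
import Control.Concurrent.Chan (newChan, dupChan, writeChan, readChan)
import Control.Monad (replicateM)

-- | One writer, two readers: each reader sees every value written
-- after its duplicate of the channel was created.
-- NOTE: an asynchronous analogue built on base, not CHP's
-- synchronous broadcast channel.
broadcastDemo :: IO ([Int], [Int])
broadcastDemo = do
  out <- newChan                 -- the writer's end
  readerA <- dupChan out         -- first reader joins
  writeChan out 1                -- only readerA sees this value
  readerB <- dupChan out         -- second reader joins late
  mapM_ (writeChan out) [2, 3]   -- both readers see these values
  as <- replicateM 3 (readChan readerA)
  bs <- replicateM 2 (readChan readerB)
  return (as, bs)

main :: IO ()
main = broadcastDemo >>= print   -- prints ([1,2,3],[2,3])
```

The late reader misses the first value, much as an observer that enrolls late misses earlier updates.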

To demonstrate the use of a broadcast channel, I’ve put together a little example. It deals with one observer joining and leaving the channel. You control it from the console; pressing space toggles between observing and not observing. Meanwhile (in parallel), a writer will send updates every 2 seconds. Here’s the first bit of the code:

{-# LANGUAGE NamedFieldPuns #-}
import Control.Concurrent.CHP
import Control.Concurrent.CHP.Console

consoleMain :: ConsoleChans -> CHP ()
consoleMain (ConsoleChans {cStdin, cStdout})
  = do c <- oneToManyChannel
       waitSend (writer c) 0 <|*|> nonObserver (reader c)
  where
    waitSend :: BroadcastChanout Int -> Int -> CHP ()
    waitSend out n = do writeChannel out n
                        waitFor 2000000 -- 2 seconds
                        waitSend out (n+1)

This creates the broadcast channel (using oneToManyChannel) and runs in parallel the writer and the non-observer process (the <|*|> is parallel composition that discards the results). The nonObserver process is one that is not currently observing — there is also an observer process that does observe the changes. Here is the nonObserver:

    nonObserver :: BroadcastChanin Int -> CHP ()
    nonObserver obsIn
      = do c <- readChannel cStdin
           case c of
             ' ' -> do mapM_ (writeChannel cStdout) "Beginning observation\n"
                       enroll obsIn observer
             _ -> return ()
           nonObserver obsIn

This waits for input from stdin (i.e., you). When you press space, it prints the message about beginning observation and then calls enroll: this function enrolls on its first argument, and passes the enrolled item to the second argument (the inner process). It stays enrolled for the duration of the inner process, and automatically resigns when the inner process finishes. If you press any other key, or once the observer has finished, nonObserver recurses. We will now look at the observer process, which takes the enrolled version of the BroadcastChanin as its argument:
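The scoping idea behind enroll (join, run the inner process, leave automatically) resembles the bracket idiom from Control.Exception. The following is only a sketch of that idea in plain IO, assuming a hypothetical enrollment counter in place of a real channel; it does not show how CHP implements enroll.

```haskell
import Control.Exception (bracket_)
import Data.IORef (IORef, newIORef, modifyIORef', readIORef)

-- | Hypothetical stand-in for a channel's count of enrolled readers.
type Enrollment = IORef Int

-- | Run an action while "enrolled". The count is decremented again
-- when the action finishes, even if it throws an exception.
withEnrollment :: Enrollment -> IO a -> IO a
withEnrollment ref =
  bracket_ (modifyIORef' ref (+ 1))         -- enroll
           (modifyIORef' ref (subtract 1))  -- resign

main :: IO ()
main = do
  ref <- newIORef 0
  during <- withEnrollment ref (readIORef ref)
  after <- readIORef ref
  print (during, after)   -- prints (1,0)
```

Because the resignation is tied to the end of the inner action, there is no separate call that the caller could forget.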

    observer :: Enrolled BroadcastChanin Int -> CHP ()
    observer obsIn
      = (do x <- readChannel obsIn
            mapM_ (writeChannel cStdout) $ "Observed: " ++ show x ++ "\n"
            observer obsIn
        ) <-> (do c <- readChannel cStdin
                  case c of
                    ' ' -> do mapM_ (writeChannel cStdout) "Ceasing observation\n"
                              return () -- Stop being an observer
                    _ -> observer obsIn
              )

This process offers a choice. It offers to read from the observer input channel, in which case it prints out the observed value and recurses. It also offers to read in a key from stdin (that’s you hitting space again), in which case it prints out that it is ceasing observation, and returns (which falls out of the enroll call in nonObserver and thus resigns from the channel). If you press a key other than space, it continues to observe.
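The control flow of this loop can be sketched without CHP by merging both event sources into a single queue. This is a simplification and an assumption of the sketch: CHP's choice waits on two separate channels and takes whichever is ready, whereas here a hypothetical Event type tags each item and the loop reads them in arrival order.

```haskell
import Control.Concurrent.Chan (Chan, newChan, writeChan, readChan)

-- | Hypothetical event type: either an update or a key press.
data Event = Update Int | Key Char

-- | Keep observing until a space arrives, collecting the lines that
-- the CHP version would send to stdout.
observeLoop :: Chan Event -> IO [String]
observeLoop events = do
  ev <- readChan events
  case ev of
    Update x -> (("Observed: " ++ show x) :) <$> observeLoop events
    Key ' '  -> return ["Ceasing observation"]
    Key _    -> observeLoop events   -- any other key: keep observing

main :: IO ()
main = do
  events <- newChan
  mapM_ (writeChan events) [Update 0, Key 'x', Update 1, Key ' ', Update 2]
  observeLoop events >>= mapM_ putStrLn
```

The final Update 2 is never read, which mirrors updates being missed once observation has stopped.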

Note that the type of the enrolled channel-end (Enrolled BroadcastChanin Int) is different to the non-enrolled end (BroadcastChanin Int). If you try to read from the non-enrolled version, you will get a compile-time error. This uses Haskell’s type system to prevent any confusion as to whether you are currently enrolled, and we use the natural scoping of the process passed to enroll to prevent forgetting to resign from the channel (when the process finishes, the resignation happens automatically).
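The type-level distinction can be illustrated with a small wrapper type. The sketch below is not CHP's definition; ChanEnd, readEnrolled and withEnrolled are hypothetical names, and an IORef stands in for the channel. The point is only that the read function accepts the wrapped type and nothing else.

```haskell
import Data.IORef (IORef, newIORef, readIORef)

-- | Hypothetical channel end that offers no read operation of its own.
newtype ChanEnd a = ChanEnd (IORef a)

-- | Wrapper marking an end as enrolled. In a real module the
-- constructor would stay private, so withEnrolled is the only source.
newtype Enrolled c a = Enrolled (c a)

-- | Only enrolled ends can be read.
readEnrolled :: Enrolled ChanEnd a -> IO a
readEnrolled (Enrolled (ChanEnd ref)) = readIORef ref

-- | Hand the enrolled end to the inner action for its duration only.
withEnrolled :: ChanEnd a -> (Enrolled ChanEnd a -> IO b) -> IO b
withEnrolled end inner = inner (Enrolled end)

main :: IO ()
main = do
  end <- ChanEnd <$> newIORef (42 :: Int)
  withEnrolled end readEnrolled >>= print   -- prints 42
  -- readEnrolled end   -- rejected by the type checker: not enrolled
```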

When you first run this program, nothing happens. The writer is generating values but no-one is observing. When you press space for the first time, the observer starts watching (i.e. it enrolls on the broadcast channel) and printing out the values. You’ll see a stream of new values being printed. If you hit space again, the observer stops watching (it resigns from the broadcast channel), and the values are discarded. Hit space and you are back to observing. You can do this as you like — ceasing and beginning observation again in quick succession between two updates (thus not missing one), or ceasing observation for a long time. The sequence number helps show how many updates you have missed.

This example demonstrates some concurrency between the process changing the value, and the observers. I find this version much simpler than the process approach, although it does have the drawback that all the observers must agree to receive from the modifying process before the modifying process is free to proceed.

(Note: you will need to compile the above program with -threaded to get the proper concurrency, and you will also need the imminent new CHP 1.6.0 release; writing this blog post prompted me to tighten up the semantics of broadcast channels with no readers enrolled.) Also, there is now a blog post describing the Haskell observer library that prompted my posts.

The Observer Pattern using a Message-Passing Process

November 10, 2009

The observer pattern refers to many observers being notified of a change in state of a particular variable, typically by use of callbacks or similar. For example, many objects in a GUI might register as observers of the colour scheme setting, and would then be notified by a callback if the user changed their preferred colours, so that they could then redraw themselves in the new colour. Recently, an implementation of the observer pattern was discussed on haskell-cafe. A lot of the OO implementations (and the initial haskell-cafe offering) are sequential: the observers are notified one after another, and typically execute their code in the thread that does the notification. That goes against the concurrent approach of this blog.

The observer pattern is very easy to capture in CHP. There are actually several implementations — I will discuss the slightly lengthier process-based approach in this post, and show a simpler version next time. Our observer pattern here is implemented in a process. The process has two channels: one to receive new values when the state changes, and one to receive new observer actions (to be run when the state changes). For simplicity, I am not concerned with removing observers in this version.

For our observer actions, we can use the SendAction type from the Control.Concurrent.CHP.Actions module in the CHP library. Something of type SendAction a is like having type a -> CHP () (i.e. it performs a CHP action when given something of type a), but it also contains information about how to poison the action. So the outside of our observer process looks like this:
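One way to picture "a function plus information about how to poison it" is a record with two fields. This is a hypothetical model in plain IO for illustration only, not the definition of SendAction in the CHP library; the names Action, runAction, poisonAction and loggingAction are all assumptions of the sketch.

```haskell
import Data.IORef (IORef, newIORef, modifyIORef', readIORef)

-- | Hypothetical model of a send action: how to deliver a value,
-- plus how to shut the receiving side down.
data Action a = Action
  { runAction    :: a -> IO ()
  , poisonAction :: IO ()
  }

-- | An action that records everything it is asked to do.
loggingAction :: IORef [String] -> Action String
loggingAction ref = Action
  { runAction    = \x -> modifyIORef' ref (x :)
  , poisonAction = modifyIORef' ref ("<poisoned>" :)
  }

main :: IO ()
main = do
  ref <- newIORef []
  let act = loggingAction ref
  runAction act "update"
  poisonAction act
  readIORef ref >>= print . reverse   -- prints ["update","<poisoned>"]
```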

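{-# LANGUAGE ScopedTypeVariables #-}  -- lets the 'a' below scope over observer'
import Control.Concurrent.CHP
import Control.Concurrent.CHP.Actions

observer :: forall a. Chanin a -> Chanin (SendAction a) -> CHP ()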
observer newValIn newObsIn = observer' []
  where
    observer' :: [SendAction a] -> CHP ()

The first channel takes in the new values, the second channel takes in the new observers. Note that we have no outgoing channels in this process initially — they will be passed in by sending SendActions. This process delegates to an inner process (a common idiom in CHP) that takes the current list of actions as its argument:

    observer' acts
      = ((newVal <-> newObs)
           `onPoisonRethrow` (do poison newValIn
                                 poison newObsIn
                                 poisonAll acts)
        ) >>= observer'

The inner process offers a choice between receiving a new value (newVal), or receiving a new observer (newObs). If poison occurs, the two input channels are poisoned and all the observers are poisoned. newVal and newObs both return the new list of observers, which are then fed into a recursive call. They are defined as follows:

      where
        newVal = do x <- readChannel newValIn
                    runParallel_ [sendAction a x | a <- acts]
                    return acts

        newObs = do a <- readChannel newObsIn
                    return (a : acts) 

newVal reads in a new value, and in parallel it runs the current list of observer actions (which are then returned unaltered). newObs reads in a new observer and adds it to the list (the order of the list doesn’t matter).
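The same loop structure can be sketched with base-library threads, for readers without CHP installed. The sketch makes several assumptions: the two input channels are merged into one hypothetical Msg type, a Stop message replaces poison, and observers are plain functions. As in the CHP version, the loop waits for every observer to finish before reading the next message, and an empty observer list returns immediately.

```haskell
import Control.Concurrent (forkFinally)
import Control.Concurrent.Chan (Chan, newChan, writeChan, readChan)
import Control.Concurrent.MVar (newEmptyMVar, putMVar, takeMVar)
import Data.IORef (newIORef, atomicModifyIORef', readIORef)

-- | Hypothetical message type merging the two input channels.
data Msg a = NewValue a | NewObserver (a -> IO ()) | Stop

-- | Run all actions in parallel and wait for every one to finish.
-- An empty list returns immediately.
runAll :: [IO ()] -> IO ()
runAll acts = do
  dones <- mapM start acts
  mapM_ takeMVar dones
  where
    start act = do
      done <- newEmptyMVar
      -- Signal completion whether the action succeeds or throws.
      _ <- forkFinally act (\_ -> putMVar done ())
      return done

-- | The observer loop: the list of observers is the loop state.
observerLoop :: Chan (Msg a) -> [a -> IO ()] -> IO ()
observerLoop inbox obs = do
  msg <- readChan inbox
  case msg of
    NewValue x    -> runAll [o x | o <- obs] >> observerLoop inbox obs
    NewObserver o -> observerLoop inbox (o : obs)
    Stop          -> return ()

main :: IO ()
main = do
  inbox <- newChan
  seen <- newIORef (0 :: Int)
  let add x = atomicModifyIORef' seen (\n -> (n + x, ()))
  mapM_ (writeChan inbox)
    [NewValue 1, NewObserver add, NewValue 10, NewObserver add, NewValue 100, Stop]
  observerLoop inbox []
  readIORef seen >>= print   -- prints 210
```

The first value arrives before any observer is registered and is dropped; the second reaches one observer and the third reaches two, giving 10 + 100 + 100.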

That’s all we need for our observer. Typically the observer actions are channel-writes, so that the information is passed out of the observer process to whatever other process we want, which then executes the real response to the state change. Here is some example code for using the observer:

useObserver :: Chanout String -> Chanout (SendAction String) -> CHP ()
useObserver newValOut newObsOut
  = do -- Send a new value when no observers are registered:
       writeChannel newValOut "This will be ignored"

       -- Create a channel to get the value back on, and register it:
       obs1 <- oneToOneChannel
       writeChannel newObsOut (makeSendAction (writer obs1))

       -- Send in an update, and read the value back from our channel:
       writeChannel newValOut "First update"
       readChannel (reader obs1) >>= liftIO . putStrLn

       -- Add a second observer:
       obs2 <- oneToOneChannel
       writeChannel newObsOut (makeSendAction (writer obs2))

       -- Now send a few more updates:
       writeChannel newValOut "Second update; should be printed twice"
       readChannel (reader obs1) >>= liftIO . putStrLn
       readChannel (reader obs2) >>= liftIO . putStrLn
       writeChannel newValOut "Third update; should be printed twice"
       readChannel (reader obs2) >>= liftIO . putStrLn
       readChannel (reader obs1) >>= liftIO . putStrLn

       -- Finally, shut the network down:
       poison newValOut >> poison newObsOut

main :: IO ()
main = runCHP_ $ do
  c <- oneToOneChannel
  d <- oneToOneChannel
  useObserver (writer c) (writer d)
    <||> observer (reader c) (reader d)

Our example code is quite artificial — the observer actions will almost certainly come from different processes running in parallel (and thus the channel reads will be done in parallel), but I didn’t want to make the code too large and complex. Note that we can read the values from the channels in either order, because the sends are done in parallel. The observer does wait for all the sends to complete before it will offer to receive a new value; we can fix this using forking, but that can again wait for another post.

One final note: constructing this example revealed a bug in CHP 1.5.0; runParallel and runParallel_ were deadlocking when passed an empty list of processes (rather than instantly returning an empty list of results). This is fixed in CHP 1.5.1, which I have just uploaded to Hackage.
