The Observer Pattern using a Broadcast Channel

November 12, 2009

In the last post, I explained how to implement the observer pattern using an active process. The process approach is fairly flexible, but there is an even simpler way to implement the observer pattern in CHP: use a broadcast channel.

A broadcast channel has one writer, but potentially many readers. When the writer sends one piece of data, all the readers receive that same data; we call this one-to-many. Readers may join and leave the channel dynamically — we refer to this as enrolling on the channel, and resigning from the channel respectively. The broadcast channel fits the observer pattern perfectly: the process that changes the value is the writer, sending out updates. The observers enroll on the channel and subsequently listen for updates on the channel. They can easily resign from the channel at any time when they want to stop receiving updates — unlike our process-oriented approach, where I decided to omit stopping observation for simplicity.

To demonstrate the use of a broadcast channel, I’ve put together a little example. It deals with one observer joining and leaving the channel. You control it from the console; pressing space toggles between observing and not observing. Meanwhile (in parallel), a writer will send updates every 2 seconds. Here’s the first bit of the code:

{-# LANGUAGE NamedFieldPuns #-}
import Control.Concurrent.CHP
import Control.Concurrent.CHP.Console

consoleMain :: ConsoleChans -> CHP ()
consoleMain (ConsoleChans {cStdin, cStdout})
  = do c <- oneToManyChannel
       waitSend (writer c) 0 <|*|> nonObserver (reader c)
  where
    waitSend :: BroadcastChanout Int -> Int -> CHP ()
    waitSend out n = do writeChannel out n
                        waitFor 2000000 -- 2 seconds
                        waitSend out (n+1)

This creates the broadcast channel (using oneToManyChannel) and runs the writer and the nonObserver process in parallel (<|*|> is parallel composition that discards the results). The nonObserver process represents the state of not currently observing; its counterpart, the observer process, is the one that receives the updates. Here is the nonObserver:

    nonObserver :: BroadcastChanin Int -> CHP ()
    nonObserver obsIn
      = do c <- readChannel cStdin
           case c of
             ' ' -> do mapM_ (writeChannel cStdout) "Beginning observation\n"
                       enroll obsIn observer
             _ -> return ()
           nonObserver obsIn

This waits for input from stdin (i.e., you). When you press space, it prints the message about beginning observation. It then calls enroll: this function enrolls on its first argument, and passes the enrolled item to the second argument (the inner process). It stays enrolled for the duration of the inner process, and automatically resigns when the inner process finishes. If you press any other key, or once the observer has finished, nonObserver recurses. We will now look at the observer process that takes the enrolled version of the BroadcastChanin as its argument:

    observer :: Enrolled BroadcastChanin Int -> CHP ()
    observer obsIn
      = (do x <- readChannel obsIn
            mapM_ (writeChannel cStdout) $ "Observed: " ++ show x ++ "\n"
            observer obsIn
        ) <-> (do c <- readChannel cStdin
                  case c of
                    ' ' -> do mapM_ (writeChannel cStdout) "Ceasing observation\n"
                              return () -- Stop being an observer
                    _ -> observer obsIn
              )

This process offers a choice. It offers to read from the observer input channel, in which case it prints out the observed value and recurses. It also offers to read in a key from stdin (that’s you hitting space again), in which case it prints out that it is ceasing observation, and returns (which falls out of the enroll call in nonObserver and thus resigns from the channel). If you press a key other than space, it continues to observe.
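The "resign when the inner process returns" behaviour of enroll has the same shape as a bracket. The following is only a minimal sketch of that idea in plain IO, not how CHP implements enroll: Channel, withEnrolment and enrolledCount are hypothetical names, and the "channel" is just a counter of enrolled readers.

```haskell
import Control.Exception (bracket_)
import Data.IORef

-- Hypothetical stand-in for a channel: it only counts enrolled readers.
newtype Channel = Channel (IORef Int)

-- Enroll, run the inner action, and resign afterwards, even if the
-- inner action throws an exception.
withEnrolment :: Channel -> IO a -> IO a
withEnrolment (Channel count) inner =
  bracket_ (modifyIORef' count (+ 1))          -- enroll
           (modifyIORef' count (subtract 1))   -- resign
           inner

enrolledCount :: Channel -> IO Int
enrolledCount (Channel count) = readIORef count

main :: IO ()
main = do
  chan   <- Channel <$> newIORef 0
  during <- withEnrolment chan (enrolledCount chan)
  after  <- enrolledCount chan
  print (during, after)   -- prints (1,0)
```

Because the resignation is tied to the scope of the inner action, there is no code path on which the caller can forget it.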

Note that the type of the enrolled channel-end (Enrolled BroadcastChanin Int) is different to the non-enrolled end (BroadcastChanin Int). If you try to read from the non-enrolled version, you will get a compile-time error. This uses Haskell’s type system to prevent any confusion as to whether you are currently enrolled, and we use the natural scoping of the process passed to enroll to prevent forgetting to resign from the channel (when the process finishes, the resignation happens automatically).
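To see why reading from the wrong end is a type error rather than a run-time failure, here is a small stand-alone sketch of the wrapper idea. It is an illustration under my own assumptions, not CHP's actual definitions: ChanEnd, enrollOn and readNext are hypothetical, and a list stands in for the values waiting on the channel.

```haskell
-- Hypothetical un-enrolled channel end; the list stands in for pending values.
newtype ChanEnd a = ChanEnd [a]

-- Wrapper marking an end as enrolled. In a real module the constructor
-- would not be exported, so enrollOn would be the only way to obtain one.
newtype Enrolled end a = Enrolled (end a)

-- Hand the enrolled end to the inner function; it is only usable in there.
enrollOn :: ChanEnd a -> (Enrolled ChanEnd a -> r) -> r
enrollOn end inner = inner (Enrolled end)

-- Reading is only defined for the enrolled version of the end.
readNext :: Enrolled ChanEnd a -> Maybe a
readNext (Enrolled (ChanEnd xs)) =
  case xs of
    []      -> Nothing
    (x : _) -> Just x

main :: IO ()
main = do
  let end = ChanEnd [42 :: Int]
  print (enrollOn end readNext)   -- prints Just 42
  -- print (readNext end)         -- rejected by the type checker: end is not Enrolled
```

Uncommenting the last line makes the program fail to compile, which is the same kind of protection the Enrolled type gives in the example above.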

When you first run this program, nothing appears to happen: the writer is generating values but no one is observing. When you press space for the first time, the observer starts watching (i.e. it enrolls on the broadcast channel) and prints each value as it arrives. If you hit space again, the observer stops watching (it resigns from the broadcast channel), and subsequent values are discarded. Hit space once more and you are back to observing. You can toggle as often as you like: cease and begin observation again in quick succession between two updates (thus not missing one), or cease observation for a long time. The sequence number shows how many updates you have missed.
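The toggling behaviour can be checked on paper with a tiny pure model. This is just a sketch for reasoning about which sequence numbers get printed, with no concurrency and no CHP involved: Event and observed are names I made up for the illustration.

```haskell
-- A timeline of what happens: the writer sends a value, or you press space.
data Event = Update Int | Toggle
  deriving (Show, Eq)

-- The values printed by a single observer that starts out not enrolled.
observed :: [Event] -> [Int]
observed = go False
  where
    go _        []              = []
    go enrolled (Toggle   : es) = go (not enrolled) es   -- enroll or resign
    go True     (Update n : es) = n : go True es         -- enrolled: value is seen
    go False    (Update _ : es) = go False es            -- not enrolled: value is missed

main :: IO ()
main = do
  -- Miss update 0, watch 1 and 2, miss 3, watch 4.
  print (observed [Update 0, Toggle, Update 1, Update 2, Toggle, Update 3, Toggle, Update 4])
  -- prints [1,2,4]

  -- Resigning and re-enrolling between two updates misses nothing.
  print (observed [Toggle, Update 0, Toggle, Toggle, Update 1])
  -- prints [0,1]
```

The gaps in the output list correspond to the gaps you see in the sequence numbers on the console.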

This example demonstrates some concurrency between the process changing the value and the observers. I find this version much simpler than the process approach, although it does have a drawback: all the enrolled observers must agree to receive from the modifying process before the modifying process is free to proceed.
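To make that drawback concrete, here is a rough sketch of a synchronous one-to-many send built from MVars in plain IO. It is a stand-in I wrote for illustration, not CHP's broadcast channel: Reader, syncBroadcast, runReader and logLine are all hypothetical. The point is only that the writer's next step cannot happen until the slowest reader has taken the value.

```haskell
import Control.Concurrent (forkIO, threadDelay)
import Control.Concurrent.MVar
import Data.IORef

-- Hypothetical reader endpoint: one slot for the value, one for the acknowledgement.
data Reader a = Reader { slot :: MVar a, ack :: MVar () }

newReader :: IO (Reader a)
newReader = Reader <$> newEmptyMVar <*> newEmptyMVar

-- Offer the value to every reader, then wait until each one has acknowledged it.
syncBroadcast :: [Reader a] -> a -> IO ()
syncBroadcast readers x = do
  mapM_ (\r -> putMVar (slot r) x) readers
  mapM_ (takeMVar . ack) readers

-- Append a line to a shared log.
logLine :: IORef [String] -> String -> IO ()
logLine ref s = atomicModifyIORef' ref (\ls -> (ls ++ [s], ()))

-- A reader that waits delayMicros before accepting, then logs and acknowledges.
runReader :: IORef [String] -> String -> Int -> Reader Int -> IO ()
runReader logRef name delayMicros r = do
  threadDelay delayMicros
  x <- takeMVar (slot r)
  logLine logRef (name ++ " received " ++ show x)
  putMVar (ack r) ()

demo :: IO [String]
demo = do
  logRef <- newIORef []
  fast   <- newReader
  slow   <- newReader
  _ <- forkIO (runReader logRef "fast" 0 fast)
  _ <- forkIO (runReader logRef "slow" 200000 slow)   -- 0.2 seconds late
  syncBroadcast [fast, slow] 1
  logLine logRef "writer proceeds"
  readIORef logRef

main :: IO ()
main = demo >>= mapM_ putStrLn
```

However the two readers are scheduled, "writer proceeds" is always the last line in the log, because the writer is held up until the slow reader has acknowledged.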

Note: you will need to compile the above program with -threaded to get the proper concurrency, and you will also need the imminent CHP 1.6.0 release; writing this blog post prompted me to tighten up the semantics of broadcast channels with no readers enrolled. Also, there is now a blog post describing the Haskell observer library that prompted my posts.
