
CHP vs CML, Forking and Picky Receivers

January 6, 2010

CHP is a Haskell library that implements message-passing concurrency — and so is CML, a Haskell implementation of the concurrency features of Concurrent ML. Recently, someone asked on reddit what the differences are between the CHP and CML libraries. So in this post, I will perform a comparison by going through the API of the CML Haskell package and seeing how to implement the same functionality in CHP.

CML Events in CHP

CML is organised around Event types. An Event represents a sort of future synchronisation, that can be engaged in using the function sync :: Event a -> IO a. CHP doesn’t have this distinction as such; all events are monadic actions, and you engage in them when you execute them in the CHP monad. So in effect, anything of type Event a in CML is just CHP a in CHP. CML has a choose :: [Event a] -> Event a function for choosing between such events, which is identical to CHP’s function alt :: [CHP a] -> CHP a.

CML has a wrap :: Event a -> (a -> IO b) -> Event b function that allows you to specify a post-synchronisation operation. Given that we have already seen that the Event type is the CHP monad, and that we would execute the operation in the CHP monad rather than IO, this operation in CHP would be something like wrapCHP :: CHP a -> (a -> CHP b) -> CHP b. I’m sure many Haskell programmers will recognise this as the type of monadic bind (>>=), and indeed monadic bind has the same semantics as wrap: it can choose and synchronise on the left-hand event, then later execute the right-hand operation with the result.
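
Spelled out as (trivial) code:

-- CML's wrap, transplanted into CHP, is just bind:
wrapCHP :: CHP a -> (a -> CHP b) -> CHP b
wrapCHP = (>>=)

-- e.g. readChannel c `wrapCHP` (writeChannel d . (+ 1))
-- (for some channel ends c and d)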

CML has a guard :: IO (Event a) -> Event a function for specifying a pre-synchronisation operation that is run every time a synchronisation is attempted on the given event. I have not come across this particular feature before, but it would be interesting to see how to achieve a similar effect in CHP if anyone has a use case for the guard function. CML also has a wrapabort :: IO () -> Event a -> Event a function that allows you to specify an action to execute if the event is not chosen. Like guard, I am not sure what I might use this function for. It would be relatively easy to wrap up an alt to execute abort actions for non-chosen guards, though.
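
For example, such a wrapper might look like this (a sketch; altWithAborts is a name I have made up, not part of the CHP API): tag each branch with its index, and after the choice run the abort actions of all the branches that were not chosen:

altWithAborts :: [(CHP a, CHP ())] -> CHP a
altWithAborts gs
  = do -- tag each guard with its position so we know which one happened
       (chosen, x) <- alt [fmap ((,) i) g | (i, (g, _)) <- zip [0 ..] gs]
       -- run the abort action of every branch that was not chosen
       sequence_ [ab | (i, (_, ab)) <- zip [0 ..] gs, i /= chosen]
       return x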

Forking

CML has a “fork and forget” spawning operator, spawn :: IO () -> IO ThreadId, that is really a synonym for forkIO. There are philosophical issues here. Fork and forget, which is present in many concurrency frameworks, can be very useful for setting off processes that you do not need to interact with, or for launching daemon processes that should run for the program’s lifetime. One problem with it is that the semantics are not always immediately clear — to what extent are the processes forgotten? For example, what do you think the output of this program will be?

import Control.Concurrent

main = forkIO (threadDelay 1000 >> putStrLn "Hello World?")

The question is: will the Haskell program wait for all the forked processes to finish before the main program finishes, i.e. will the text be printed? The answer seems to be no; on my GHC 6.10.3 system, if I compile the above with -threaded and run it, there is no output, even if I take the delay down to zero. So if you want to wait for a particular forked process to complete before your program ends, you’ll need to add a mechanism to do so yourself. Or rather: when your main program finishes, you had better be sure that all the forked processes have finished, or be sure that you don’t care if they are still running.
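
One standard way to do that in plain Haskell (nothing CHP-specific; just a sketch) is an MVar handshake:

import Control.Concurrent

main :: IO ()
main = do done <- newEmptyMVar
          _ <- forkIO (threadDelay 1000 >> putStrLn "Hello World?"
                       >> putMVar done ())
          takeMVar done  -- blocks until the forked thread has finished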

To help with such issues, CHP offers runParallel, which we’ve seen a lot, but also something that I call scoped forking. There are two basic operations in scoped forking, centred around a monad transformer called ForkingT. (You can use ForkingT on any monad that is a member of the MonadCHP type-class, i.e. that has CHP at the bottom. The most obvious thing is to have the ForkingT CHP monad, but you can also have ForkingT (StateT Int CHP) or similar.) You fork off a process within this transformer using fork :: MonadCHP m => CHP () -> ForkingT m (). This forks off the first parameter, and is an action in the ForkingT transformed monad. You initiate one of these blocks using forking :: MonadCHP m => ForkingT m a -> m a. The important thing is that when the ForkingT block (the first parameter) finishes its own execution, the call to forking does not return until every forked process has also finished. So you can fork off as many processes as you like in the forking block, but at the end, you must wait for them all to finish.
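
A small sketch of how this looks in code (worker1 and worker2 stand for any processes of type CHP ()):

example :: CHP ()
example = forking $ do fork worker1
                       fork worker2
-- forking does not return until worker1 and worker2 have both finished,
-- as well as the body of the forking block itself.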

So between runParallel and forking (the only mechanisms for running processes concurrently in CHP), there is always a defined, clear point in your program at which you will wait for any concurrent sub-processes to terminate. When your outer-most runCHP call finishes, it is not possible for there to still be any CHP processes running, so there is no confusion and there are no race hazards involving termination semantics, as there are with fork-and-forget functions such as forkIO.

Channels

Back to CML — the last item to cover is channels. CML has a type Channel a. It’s not clear from the documentation whether these channels support multiple readers and/or multiple writers, but let’s assume for simplicity that they don’t, and thus that this is the same as CHP’s OneToOneChannel a. There is a channel :: IO (Channel a) function for creating channels which is the same as CHP’s oneToOneChannel :: CHP (OneToOneChannel a). There is also a function to write to a channel, transmit :: Channel a -> a -> Event (), which is the same as CHP’s writeChannel :: Chanout a -> a -> CHP () except that CHP has the concept of channel-ends which CML does not. CHP uses channel-ends to make it clear that when a process is passed a Chanout end, it will be writing to that end of the channel, and cannot read from it. In this way, the directionality of the channels becomes apparent in the types, and there is some static guarantee of the patterns of usage of channels.

CML’s operation for reading from a channel is more interesting: receive :: Channel a -> (a -> Bool) -> Event a. The second parameter is a function that allows you to decide whether you want to engage in reading from the channel, based on the value being offered. Interestingly, the CSP calculus contains this feature, but it is not something I have added to CHP. I believe Erlang effectively also offers something like this (by pattern-matching on messages in a process’s message queue). I can see uses for this, and in CHP you would have to implement something like this by sending values on different channels, and choosing based on the channel the value was being communicated on rather than the value itself. I should perhaps add this to CHP in future, but for now this is a useful CML feature that CHP does not support. Without this aspect, receive is equivalent to CHP’s readChannel :: Chanin a -> CHP a.
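
To illustrate that workaround (a sketch; the channels and the even/odd split are made up for this example), the sender routes values onto different channels according to a predicate, and a picky receiver simply declines to read from the channel carrying values it does not want:

sendByParity :: Chanout Int -> Chanout Int -> Int -> CHP ()
sendByParity evens odds x
  | even x    = writeChannel evens x
  | otherwise = writeChannel odds x

-- A receiver that will take anything chooses between the two channels:
receiveAny :: Chanin Int -> Chanin Int -> CHP Int
receiveAny evens odds = alt [readChannel evens, readChannel odds]

-- A receiver that only wants even values just ignores the odds channel:
receiveEvens :: Chanin Int -> Chanin Int -> CHP Int
receiveEvens evens _ = readChannel evens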

Conclusions

The main difference between CML and CHP is that CML is a nice small library, while CHP is a larger library with many more features (barriers, clocks, conjunction, parallel operators, poison, more channel types, traces, and the new higher-level features like the Connectable type-class and behaviours). I think CHP subsumes CML for the features I want, but obviously I’m very biased. CML does have the benefit of operating in the IO monad, which can be more convenient than CHP’s eponymous monad. The reason for having the CHP monad is to support poison (more cleanly than using exceptions) and tracing, two features not present in CML.

In terms of the implementation, CHP uses STM and has a lot of logic to support some of the extra features (especially conjunction), but only uses as many threads as you would expect (one per thread of execution). CML uses MVars and has relatively simple algorithms, although it does spawn a lot of threads — from what I can understand from skimming the code, choose spawns a new thread for every possibility being offered. The transactional events library does something similar. In contrast, CHP spawns no threads for an alt call, and just performs one (potentially expensive) memory transaction, then blocks until it is woken up by another thread offering to synchronise with it. That said, I expect CML may be faster than CHP for common cases.

Differences aside, CML and CHP are similar in their approach to concurrency (I can’t remember whether CML was influenced by the CSP calculus, but it certainly feels like it), so many of the design patterns should transfer between the two libraries — especially if CML’s developers were to add barriers in a future version.


Sticky Platelets in a Pipeline (Part 2)

December 23, 2009

In my last post I showed the infrastructure and logic of a simple simulation of some sticky blood platelets moving through a blood vessel. I added the video to the end of that post; in this post I will explain how the visualisation was implemented.

In most CHP simulations there is a barrier (typically named tick) that is used to keep all the simulation entities synchronised, so that they all proceed through the discrete time-steps together. When it comes to visualising simulations, we could add a separate channel through which to report the status of all the simulation entities to a central drawing process, but this feels like a waste when all the processes are already synchronising together. CHP has reduce channels (many-to-one) that are the opposite of (one-to-many) broadcast channels; in a reduce channel, all the writers send in their data to a single reader, and all must send together before the communication is complete. So a simple way to achieve visualisation is to transform the tick barrier into a reduce channel, which has the same synchronisation behaviour, but that also allows data to be sent to some drawing process.

The simulation entities don’t need to know the details of all this, so we hide it using a send action that I’ve called a tick action:

type TickAction = SendAction (Maybe (Platelet, Bool))

data Platelet = Platelet GLfloat

Note that I’ve also had to tweak the type in the platelet from last time, to make working with the OpenGL library a bit easier. The only change to the site process (and to the plateletGenerator and plateletPrinter processes) is that the tick barrier has become a tick action. Here is the site process in full:

site :: Chanin (Maybe Platelet) -> Chanout (Maybe Platelet)
     -> TickAction -> CHP ()
site prevIn nextOut act = foreverFeed (maybe empty full) Nothing
  where
    tickWith = sendAction act

    empty = join . fst <$> offer
              (            (once $ readChannel prevIn)
               `alongside` (once $ writeChannel nextOut Nothing)
               `alongside` (endWhen $ tickWith Nothing)
              )

    full platelet
      = do r <- liftIO $ randomRIO (0, (1::Double))
           let moving = r >= 0.05
               tick = tickWith (Just (platelet, moving))
               mv = readChannel prevIn <&> writeChannel nextOut (Just platelet)
               probablyMove = if moving then fst <$> mv else stop
               
           fromMaybe (Just platelet) . fst <$>
             offer ((once probablyMove) `alongside` endWhen tick)

Synchronising on the tick barrier has become engaging in the tick event, where we must pass in that optional pair Maybe (Platelet, Bool); the first item is the platelet currently occupying the site, and the boolean indicates whether the platelet was willing to move on this time-step. If the site is empty, the Nothing value is sent.

The status values end up passed to the draw function, which is fairly boring (it draws a vertical bar) — we are only interested in its type:

data ScreenLocation = ScreenLocation Int
  deriving (Eq, Ord)

draw :: ScreenLocation -> Maybe (Platelet, Bool) -> IO ()

This is called from the drawProcess:

drawProcess :: ReduceChanin (Map.Map ScreenLocation (Maybe (Platelet, Bool)))
            -> CHP ()
drawProcess input
 = do displayIO <- embedCHP_ $
        do x <- readChannel input
           liftIO $ do startFrame
                       mapM_ (uncurry draw) (Map.toList x)
                       endFrame

      ...

Note that drawProcess takes the input end of a reduce channel which carries a map from ScreenLocation (a one-dimensional location in our simple example) to the Maybe (Platelet, Bool) values we saw. Reduce channels in CHP must carry a monoid type, because the monoid instance is used to join all the many values from the writers into a single value (using mappend/mconcat, but in a non-deterministic order — so make sure the monoid is commutative).

The Map type (from Data.Map) is a monoid that has union as its mappend operation. This is exactly what we want; each site process will send in a singleton Map with their specific screen location mapped to their current status, and using the monoid instance these maps will all be joined (quite safely, since each sender will have a different location, and hence a distinct key entry in its Map) into one big map, that can then be fed into the draw function as we saw above.
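
To make this concrete (loc1, loc2, s1 and s2 are placeholder values):

-- Two singleton maps with distinct keys merge without interference:
Map.singleton loc1 s1 `mappend` Map.singleton loc2 s2
  == Map.fromList [(loc1, s1), (loc2, s2)]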

We don’t trouble the site process with knowing its location; instead, we wrap up the location in the send action. It is easy to construct send actions that apply a function to a value before it is sent, so we apply a function that takes a status value, and turns it into the singleton Map just discussed. This is all done as part of the wiring up of the process network, based on the version from last time:

main :: IO ()
main = runCHP_ $
       do status <- manyToOneChannel
          pipelineConnectCompleteT
            (enrollAll (return $ writer status) . zipWith withSend locationList)
            plateletGenerator (replicate numSites site) plateletPrinter
           <|*|> drawProcess (reader status)
  where
    numSites = screenSize - 2
    locationList = map ScreenLocation [0..(screenSize-1)]
    withSend k p c = p $ makeSendAction' c (Map.singleton k)

The withSend function does the wrapping of the modification function with the send action. Each site is given a location from the list, including the generator and printer processes; otherwise this function is the same as the version from part 1.

I’ve omitted the OpenGL code, which is much the same as my previous examples. But here, again, is the video showing the results of the visualisation:


Choose from WMV format (suitable for Windows) or MPEG format (suitable for Linux, etc).

Turning the tick barrier into a reduce channel is often an easy way to visualise a simulation, and doesn’t require too much change to the code. As I said last time, the video is nothing compared to the TUNA videos which are very impressive, and some of which were generated by distributing the code over a cluster — a topic I hope to come to in CHP at some point in the future.

Sticky Platelets in a Pipeline (Part 1)

December 21, 2009

This post brings together concepts from several recent posts, including behaviours, conjunction and channel wiring. It is based on the work in the completed TUNA project, which has some stunning videos of blood clotting (and some images if you prefer):

I’m going to cut down the TUNA example greatly, to blog-post size. What we are going to be simulating is sticky objects in a pipeline, for example sticky blood platelets moving through a blood vessel. Our platelets will be represented as data that is passed between active “site” processes; our blood vessel will be a one-dimensional pipeline of site processes. Each process will be connected to its neighbours with a channel that can pass platelets. All the processes will be enrolled on the tick barrier, as is customary in CHP simulations.

We’ll begin the code with a helper function (one that I would like to see in the standard library) that iterates forever with a state value being passed through, and our platelet data type:

foreverFeed :: Monad m => (a -> m a) -> a -> m b
foreverFeed f x = f x >>= foreverFeed f

data Platelet = Platelet Float

The data in the platelet type is a colour value that I will use for visualisation in the next part of this guide. This is set randomly by the platelet generator at the beginning of the pipeline:

plateletGenerator :: Chanout (Maybe Platelet)
                  -> EnrolledBarrier
                  -> CHP ()
plateletGenerator out tick = forever $ on >> off >> off
  where
    on = do platelet <- Just . Platelet <$> liftIO (randomRIO (0, 0.5))
            offerAll [ once (writeChannel out platelet)
                     , endWhen (syncBarrier tick) ]
    off = offerAll [once (writeChannel out Nothing), endWhen (syncBarrier tick)]

The pipeline generates platelets regularly, one every three time-steps (this is coded as the simple on-off-off sequence). When it is performing an “on” time-step, it generates a platelet with a random shade, then uses behaviours to offer (at most once) to send the platelet, until the tick happens (i.e. until the frame is over). The next site in the pipeline may not take the new platelet if that site is full and not moving this time-step, so the platelet may get discarded in that case. In the “off” state, the generator waits for the tick to end the frame, but also offers to tell the site ahead of it that the generator is empty (signified by sending Nothing rather than Just a platelet).

The main logic is in the site process, which also has two states, empty and full:

site :: Chanin (Maybe Platelet)
     -> Chanout (Maybe Platelet)
     -> EnrolledBarrier
     -> CHP ()
site prevIn nextOut tick = foreverFeed (maybe empty full) Nothing
  where
    empty :: CHP (Maybe Platelet)
    full :: Platelet -> CHP (Maybe Platelet)

Each time, if there is Just a platelet returned by the function, the next state will be full, otherwise it will be empty. The initial state is empty (the Nothing value). The empty state consists of three possible behaviours:

    empty = join . fst <$> offer
              (            (once $ readChannel prevIn)
               `alongside` (once $ writeChannel nextOut Nothing)
               `alongside` (endWhen $ syncBarrier tick)
              )

In an empty state, a site will read in a new platelet from the previous site in the pipeline (if available), it will offer to communicate to the next site in the pipeline that it is empty, and it will finish this behaviour when the tick event happens. The value returned is the result of reading from the channel, which will be Nothing if no read occurred or if we read in a Nothing value (and hence the site remains empty) or Just the result of the read if it did happen and was a platelet (in which case the site will become full). It is possible to reduce the amount of communications happening with empty processes, but I want to keep this example simple if I can.

The full state is as follows:

    full platelet
      = do r <- liftIO $ randomRIO (0, (1::Double))
           let
             move = readChannel prevIn <&> writeChannel nextOut (Just platelet)
             probablyMove = if r < 0.05 then stop else fst <$> move

           fromMaybe (Just platelet) . fst <$>
             (offer $
               once probablyMove
               `alongside` endWhen (syncBarrier tick)
             )

We will pick this code apart, bit by bit. It is primarily an offer between the tick to end the frame and another behaviour, called probablyMove. When the site is full, it has a 5% chance of refusing to do anything, meaning that a single platelet will not move in 5% of time-steps. So it starts by generating a random number between 0 and 1. If this is under 0.05 (i.e. a 5% chance), the probablyMove behaviour is stop, meaning it will not move — the site will just wait for the end of the frame in these 5% of cases.

In the other 95% of the time-steps, a move is offered, using conjunction. The site offers to read a value from the previous channel (which may be Just a platelet, or a Nothing value signifying the site was empty) and send on its current platelet, shuffling the platelets along the pipeline. So its overall behaviour is that it will send on its current platelet, if and only if: the previous site is empty, or the previous site is full and willing to send its platelet on (it won’t be willing 5% of the time). So a platelet can only move if there is no-one behind it, or if the platelet behind it moves too.

The implications of this behaviour are that once platelets are in adjoining cells, they only move on together. Thus any platelets that bump together form a notional clot that stays together forever after. This clot is not explicitly programmed and has no representation in the program. It is emergent behaviour that arises out of the local rules of the site process; each site only communicates with the site either side of it, and yet the program logic means that clots that are tens of platelets long could form, and would be unbreakable.

The other neat thing that arises out of the logic comes from the 5% chance. In 5% of time-steps a platelet will not move. (This is what allows the platelets to bump together in the first place.) Since a clot can only move when all its platelets move, a two-platelet clot has roughly a 10% chance of not moving (really: 1 - 0.95^2), and a three-platelet clot has about a 14% chance of not moving (1 - 0.95^3). So big clots will move more slowly, which means that the platelets behind become more likely to join on. Despite only having a local probability of not moving, we get the behaviour that larger clots are less likely to be able to move.

Enough on the site process; at the end of the pipeline is a platelet printer, that swallows up any platelets and prints out how large each clot was that reached the end of the pipeline:

plateletPrinter :: Chanin (Maybe Platelet) -> EnrolledBarrier -> CHP ()
plateletPrinter input tick
  = foreverFeed plateletPrinter' []
  where
    plateletPrinter' ps
      = do mp <- join . fst <$> offer (once (readChannel input)
                                       `alongside` endWhen (syncBarrier tick))
           let ps' = ps ++ [mp]
               (test, text) = maybe (isNothing, "Blank")
                                    (const (isJust, "Clot"))
                                    (head ps')
               (chunk, rest) = span test ps'
           if null chunk || null rest
             then return ps'
             else do let status = text ++ ", size: " ++ show (length chunk)
                     liftIO $ putStrLn status
                     return rest

And finally we must wire up all of this. Thankfully, our new connectable helper functions make this quite easy, and short:

main :: IO ()
main = runCHP_ $
         pipelineConnectCompleteT (enrollAll newBarrier)
           plateletGenerator (replicate numSites site) plateletPrinter
  where
    numSites = 100

If we compile and run this program, we get a print-out of clot sizes:

Blank, size: 103
Clot, size: 1
Blank, size: 51
Clot, size: 4
Blank, size: 2
Clot, size: 1

That is terribly unexciting, so I’m going to give a sneak video preview of a visualisation that I will go through in my next post. The 1D pipeline of sites is visualised left-to-right, with each tall bar being a platelet (or black for empty). When the bar flashes white, this is in the 5% of cases where the platelet is refusing to move. Hence you can see that when the larger clots form, the white flashes of the various platelets prevent the clot from moving:


Choose from WMV format (suitable for Windows) or MPEG format (suitable for Linux, etc).

Solving the Santa Claus Problem with Conjunction

December 10, 2009

The Santa Claus problem is a concurrency problem that, like the Dining Philosophers problem, can be used to demonstrate your particular concurrency framework of choice. December seems like the right time of year to post a CHP solution that I hacked up at the CPA 2008 conference last year. It’s short but rather primitive. Here’s the statement of the problem, via Simon Peyton Jones (who made an STM solution in a chapter of Beautiful Code):

Santa repeatedly sleeps until wakened by either all of his nine reindeer, back from their holidays, or by a group of three of his ten elves. If awakened by the reindeer, he harnesses each of them to his sleigh, delivers toys with them and finally unharnesses them (allowing them to go off on holiday). If awakened by a group of elves, he shows each of the group into his study, consults with them on toy R&D and finally shows them each out (allowing them to go back to work). Santa should give priority to the reindeer in the case that there is both a group of elves and a group of reindeer waiting.

The particularly tricky aspects of this problem are:

  1. Santa must make a choice between the reindeer and elves.
  2. Santa must give priority to the reindeer.
  3. The elves must come in groups of exactly three, even though there are ten of them.

Choice is easy in CHP, so the first item is not a problem. We saw in the last post how to emulate priority, so we can use that to solve the second point. The third item is difficult. Our barriers require all enrolled participants to synchronise, so we cannot use one barrier for all ten elves. We could introduce some intermediary agents to coordinate the elves, but that is a bit long-winded. Instead we can take a slightly brute-force approach combined with a CHP feature called conjunction.

Conjunction

The principle behind conjunction is straightforward. Usually you offer a choice such as read from channel c or read from channel d: readChannel c <-> readChannel d. This waits for the first of the two events, and executes it (it can never execute both options). A conjunction is when you want to read from channel c and read from channel d: readChannel c <&> readChannel d. Crucially, this will only read from both when both are available. If c is not ready, it will not read from d, and vice versa. It is both or none: an atomic item, if you like. Conjunction also has a list form, every.

One example of where this is useful is the aforementioned Dining Philosophers problem: a philosopher wishing to pick up his forks should execute syncBarrier leftFork <&> syncBarrier rightFork. That way he will either pick up both his forks or neither, thus eliminating the deadlock wherein each philosopher ends up holding one fork.

Conjunction Specifics

It is valid to offer conjunction as part of a choice; for example, (readChannel c <&> readChannel d) <-> readChannel e will wait to read from channels (c AND d) OR e. These choices can be overlapping, e.g. (readChannel c <&> readChannel d) <-> (readChannel d <&> readChannel e) waits for (c AND d) OR (d AND e).

The choices should be in disjunctive normal form (DNF): OR on the outside, AND in the brackets, so c AND (d OR e) is not valid. But AND does distribute over OR here, so this is equivalent to the DNF: (c AND d) OR (c AND e). (The operators do not distribute in the opposite direction, so c OR (d AND e) is not equivalent to (c OR d) AND (c OR e), because the latter allows c and e to both happen, whereas the former does not.)

Conjunction and Barriers

Importantly for today’s post, there is a correspondence between barriers (N participants, all N must synchronise together) and a conjunction of two-party barriers. Let’s imagine for a moment that Santa wanted to meet with all ten of his elves. We could create a single barrier, enroll Santa and the ten elves, and get them all to synchronise on the barrier. The barrier would only complete when all eleven participants synchronised. But alternatively, we could create ten two-party barriers. We would enroll each elf on a different barrier, and enroll Santa on all of them. When they want to meet, the elves would all synchronise on their barriers (meaning their code is unchanged), while Santa would wait for the conjunction of all ten barriers. Each barrier would only complete when all of them complete, because Santa is waiting for the conjunction of all of them, so we have the same semantics as when we had one barrier. These ten barriers would be dynamically fused into one barrier.
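
In code, Santa’s side of that fusing is a one-liner (a sketch, using every_, the unit-returning list form of conjunction that also appears in the solution below):

meetAllTenElves :: [EnrolledBarrier] -> CHP ()
meetAllTenElves = every_ . map syncBarrier
-- Each elf just calls syncBarrier on its own two-party barrier; because
-- Santa offers the conjunction of all ten, the ten barriers complete
-- together or not at all.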

The Solution

Santa does not want to wait for all ten elves; he only wants to wait for three. We can implement this with a little brute force. Labelling the elves A through J, we can make Santa wait for (A AND B AND C) OR (A AND B AND D) OR… (H AND I AND J). That’s 120 possibilities. Not scalable, but the problem said ten elves so that’s all we have to manage.

Let’s finally get to some code. A reindeer waits for a random time, then synchronises on its barrier (i.e. tries to meet with Santa). An elf waits for a random time, then synchronises on its barrier (i.e. tries to meet with Santa). So, perhaps unexpectedly, a reindeer has exactly the same code as an elf. (This is also the case in Peyton Jones’ solution). Here it is:

syncDelay :: EnrolledBarrier -> CHP ()
syncDelay b = forever (randomDelay >> syncBarrier b)
  where
    randomDelay = (liftIO $ randomRIO (500000, 1000000)) >>= waitFor

reindeer :: EnrolledBarrier -> CHP ()
reindeer = syncDelay

elf :: EnrolledBarrier -> CHP ()
elf = syncDelay

Our next job is to write the Santa process. Santa will take one barrier for the reindeer, and multiple barriers for the elves (one each). He will loop forever, first polling the reindeer (to give them priority) and otherwise choosing between the reindeer and the elves:

santa :: [EnrolledBarrier] -> EnrolledBarrier -> CHP ()
santa elfBars reindeerBar
  = forever (deliverToys </> (skip >> (deliverToys <-> meetThreeElves)))
  where

Now we need to define the helper processes. Delivering toys is straightforward; we just synchronise with the reindeer. The rest of the code deals with picking all groups of three elves, and making a choice between synchronising with each group:

    deliverToys = syncBarrier reindeerBar

    meetThreeElves = alt [meetGroup g | g <- allGroupsThreeElves]
    meetGroup bars = every_ [syncBarrier bar | bar <- bars]

    allGroupsThreeElves = allNFrom 3 elfBars
    allNFrom n = filter ((== n) . length) . filterM (const [True, False])
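    -- (filterM (const [True, False]) enumerates every sublist of its input,
    -- so allNFrom 3 keeps just the 3-element groups: the 120 groups of
    -- elves mentioned above.  For example,
    --   allNFrom 2 "abc" == ["ab", "ac", "bc"]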

The allNFrom is not important here, so I’ve used a concise (but probably inefficient) definition. Now that we have santa, our elves and our reindeer, all that remains is to put them together. To do this we use two wiring helper functions, enrollAll (that enrolls all of the processes on the given barrier) and enrollOneMany (that enrolls one process on all the barriers, and each of the other processes on one):

main :: IO ()
main = runCHP_VCRTraceAndPrint $
  enrollOneMany
    (\elfBars ->
        enrollAll (newBarrierWithLabel "reindeer")
                  (santa elfBars : replicate 9 reindeer)
    )
    [(newBarrierWithLabel ("elf" ++ show n), elf) | n <- [0..9]]

That’s it. Part of the reason that this code is quite short is that I’ve omitted all the print statements detailing what is going on (for example in Peyton Jones’ version). These print statements were only there to observe the behaviour of the computation, and we can do that with CHP’s built-in traces mechanism, just by using the runCHP_VCRTraceAndPrint function.

View-Centric Reasoning (VCR) traces, developed by Marc L. Smith and subsequently tweaked a little bit by me, display an ordered list of multisets, where each multiset holds independent events (events that did not have a sequential dependency on each other). This style makes it very easy to see from the output that our Santa Claus solution has the right sort of behaviour, viz:

< {elf3, elf4, elf5}, {elf0, elf6, elf7}, {reindeer}, {elf1, elf8, elf9}, {elf2, elf4, elf7}, {elf0, elf1, elf3}, {elf5, elf8, elf9}, {reindeer}, {elf3, elf6, elf7}, {elf0, elf1, elf9}, {elf2, elf4, elf8}, {reindeer}, {elf3, elf5, elf6}, {elf0, elf4, elf9}, {elf1, elf2, elf7}, {elf5, elf6, elf8}, {reindeer}, {elf0, elf1, elf3}, {elf4, elf7, elf9}, {elf2, elf5, elf8}, {elf0, elf1, elf6}, {reindeer}, {elf3, elf7, elf9}, {elf0, elf1, elf4}, {elf2, elf6, elf8}, {elf5, elf7, elf9}, {elf0, elf3, elf4}, {reindeer}, {elf1, elf2, elf6}, {elf5, elf7, elf8}, ...


Concise Solution

For those who like a bit of code golf (finding the shortest version of a program), I came up with this concise version of the whole solution:

import Control.Concurrent.CHP
import Control.Concurrent.CHP.Traces
import Control.Monad
import Control.Monad.Trans
import System.Random

santa elves reindeer = forever $
  syncBarrier reindeer </> (alt $ map (every_ . map syncBarrier) groups)
  where groups = filter ((== 3) . length) $ filterM (const [True, False]) elves

main = runCHP_VCRTraceAndPrint $ enrollOneMany (enrollAll
  (newBarrierWithLabel "reindeer") . (: replicate 9 syncDelay) . santa)
  [(newBarrierWithLabel ("elf" ++ show n), syncDelay) | n <- [0..9]]
  where syncDelay = forever . (randomDelay >>) . syncBarrier
        randomDelay = (liftIO $ randomRIO (500000, 1000000)) >>= waitFor

Excluding the import statements, that’s a solution to the Santa Claus problem in eight lines of Haskell. Rather difficult-to-follow Haskell, but that’s usually the result of code golf.

The Problem with Parallel Participants Professing Priority

December 8, 2009

Priority

There are two kinds of priority in the CHP style of concurrent programming: priority on processes and priority on events. Priority on processes is about specifying that a high-priority process P should run whenever possible, at the expense of a low-priority process Q. This is difficult to co-ordinate across multiple cores (especially if lightweight threads are used, as in Haskell) and isn’t offered by all run-times. The priority I am interested in discussing in this post is that of events: specifying that if two events A and B are ready to complete, A should happen in preference to B.

There is an immediate problem with local priorities over events, where each process separately specifies its priorities for the events it is offering. Imagine that you offer to either go to the cinema or go bowling, and you prefer (i.e. give priority to) the cinema. Your friend also offers to go to the cinema or to go bowling, but they prefer (give priority to) bowling. For a one-off choice of doing one thing, there is no amount of algorithmic cleverness that can resolve such situations to the satisfaction of both parties. So local priorities, where both sides can specify their own priorities, are fairly meaningless, because in general they cannot be resolved correctly.

One way to solve this is to only allow one side to specify a priority. The occam language did this; only processes reading from channels were allowed to specify priority, not the writers. (In fact, only processes reading from channels were allowed to offer a choice!) This means that the priorities can always be satisfied because you only have one set of priorities to resolve in each choice. This falls down with barriers — it becomes difficult to specify which synchronising process of many is allowed to offer priorities.

Another solution is to have global priorities instead. If we specify up-front that the cinema is always better than bowling, there can be no dispute when we make our offers for activities for the evening. This could be implemented, for example, by assigning a global integer priority to all events (perhaps with 0 as the default). I gather that global priorities make things difficult for formal reasoning in CSP, but that does not mean we cannot use it.

CHP and Prioritised Choice

So what does CHP do? Events do not currently have global priority (although I would like to implement it at some point). There is an unprioritised choice operator, <-> (with a list form: alt), which is commutative and associative. But there is also a prioritised choice operator, </> (with a list form: priAlt), which is associative but not, of course, commutative. Its existence is partly a historical hangover from the first version of CHP (which was a more direct conversion from occam), and it has some slightly intricate semantics, which I’ll describe here in terms of the list form.

The relative positions in the list of any guards involving reading from channels, writing to channels, or synchronising on barriers are discounted. So priAlt [readChannel c, syncBarrier b] is the same as priAlt [syncBarrier b, readChannel c]. The position of any stop guards is irrelevant because they will never trigger. The position of any skip guards is important in relation to all the other guards. priAlt (skip : others) is guaranteed to choose the first guard, regardless of what comes after. Similarly, priAlt (initialGuards ++ [skip] ++ otherGuards) will never choose any of the otherGuards, but if any of the initialGuards are ready, they will be chosen in preference to the skip. Effectively, skip is like an early terminator for the list of guards passed to priAlt (but don’t go overboard — I don’t think passing an infinite list of guards will work, even if skip is early on). In contrast, the presence of skip guards in an unprioritised choice is generally wrong; the outcome of alt [readChannel c, skip] is non-deterministic, even if c is ready.
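
Restating those rules as code (a sketch; c and d are hypothetical channel ends):

ex1 :: Chanin Int -> CHP Int
ex1 c = priAlt [skip >> return 0, readChannel c]
-- ex1 always takes the skip and returns 0, even if c is ready.

ex2 :: Chanin Int -> Chanin Int -> CHP Int
ex2 c d = priAlt [readChannel c, skip >> return 0, readChannel d]
-- ex2 reads from c if it is ready, and otherwise takes the skip;
-- the readChannel d guard after the skip can never be chosen.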

Polling

Generally in my examples on the blog, I have always avoided the use of priAlt and </> in favour of alt and <-> because the former is only really different to the latter when skip guards are present, and thus the latter form, being more clearly an unprioritised choice, is better. There is one, slightly inelegant, use for prioritised choice though: polling. Imagine that you want to poll to see if a channel is ready. If it is, you are happy to read from it, but if it’s not ready yet, you want to continue on and do something else. That is easy to capture: readChannel c </> skip. In fact, it is possible to capture this as a helper function:

poll :: CHP a -> CHP (Maybe a)
poll c = (Just <$> c) </> (skip >> return Nothing)

You can even nest these things; this code will check channels c and d for readiness (if both are ready, either might be taken), and return Nothing only if neither is ready:

poll (alt [readChannel c, readChannel d])

It is also important to be aware that this polling is only a snapshot of the current state. If you poll channel c, you have no guarantee that the result of the poll will still hold by the time you get the result. So if you poll channel c, and find it is not ready, it may have turned ready by the time you examine the result and make a subsequent decision. A particularly bad use would be to have both ends polling: if one process continually polls to read from c, and the other process continually polls to write to c, depending on timing, it is quite possible that no communication will ever take place. It is only really a good idea to use polling if you know the other end will stay committed to the action once offered (i.e. that it is not offering a choice of events).
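
With that caveat in mind, here is one plausible use of poll (a sketch): collect everything currently waiting on a channel without blocking, which behaves sensibly when the writers stay committed to their sends:

drainReady :: Chanin a -> CHP [a]
drainReady c = do mx <- poll (readChannel c)
                  case mx of
                    Nothing -> return []
                    Just x  -> do xs <- drainReady c
                                  return (x : xs)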

Emulating Priority

This pattern can also be used to give one event a form of priority over another. This code:

readChannel c </> (skip >> alt [readChannel c, readChannel d])

It first checks to see whether c is ready. If so, it takes it; otherwise it waits for the next event of c and d. So it gives a form of priority to c. This is not foolproof priority; if another process later offers c and d, there is no guarantee that c will be chosen, so it only provides real priority if different processes are offering the events involved.

Concisely Expressing Behaviours with Process Combinators

December 6, 2009

The Problem

In recent posts we saw the phased-barrier pattern, wherein simulation entities perform various optional activities until the phase moves on. For example, our graph nodes were willing to send out their position to anyone that attempted to read it, until the phase ended. In a different simulation, space cells might be willing to send out information to anyone that attempted to read it and/or to accept a single new agent into the space and/or to send an agent away, until the phase ends. Programming these sorts of behaviours can become difficult. Let’s begin with coding up a single optional move action before the frame ends (commonly called a tick):

myproc1 = alt [tick, move >> tick]

This begins by offering move or tick; if tick happens we’re done, but if move happens then we wait for tick. Then we can add on repeatedly offering to send out our status:

myproc2 = alt [sendStatus >> myproc2, tick, move >> myprocMoved]
  where
    myprocMoved = alt [sendStatus >> myprocMoved, tick]

We need two processes now to represent our two states (the optional move has or has not happened). By the time we change to an optional moveIn and optional moveOut, this approach has become unmanageable:

myproc3 = alt [ sendStatus >> myproc3
              , tick
              , moveIn >> myprocMovedIn
              , moveOut >> myprocMovedOut]
  where
    myprocMovedIn = alt [ sendStatus >> myprocMovedIn
                        , tick
                        , moveOut >> myprocMovedInOut]
    myprocMovedOut = alt [ sendStatus >> myprocMovedOut
                         , tick
                         , moveIn >> myprocMovedInOut]
    myprocMovedInOut = alt [ sendStatus >> myprocMovedInOut
                           , tick]

There is probably a half-way nice solution involving zippers, but I decided to clean up this mess with a set of process combinators that I have called behaviours. Here is how I express those three pieces of code with behaviours:

myproc1 = offer (once move `alongside` endWhen tick)

myproc2 = offer (repeatedly sendStatus
                 `alongside` once move
                 `alongside` endWhen tick)

myproc3 = offer (repeatedly sendStatus
                 `alongside` once moveIn
                 `alongside` once moveOut
                 `alongside` endWhen tick)

Rather than the combinatorial explosion of states, we now have a nice readable description of the process behaviours that scales up nicely.

Behaviour Combinators

In this section we’ll explore the full set of combinators, with their types:

offer :: CHPBehaviour a -> CHP a
alongside :: CHPBehaviour a -> CHPBehaviour b -> CHPBehaviour (a, b)
alongside_ :: CHPBehaviour a -> CHPBehaviour b -> CHPBehaviour ()

The offer function executes the given description of behaviours. The alongside operator joins together two behaviours, pairing up their results; alongside_ does the same but discards the results, and is commutative and associative.

Terminal Behaviours

The behaviours can be constructed with two fundamentally different types of behaviour. The first is a terminal behaviour, i.e. one that ends the current set of behaviours. The endWhen combinator returns from the outer call to offer when it happens:

endWhen :: CHP a -> CHPBehaviour a

Multiple endWhen behaviours are allowed in one offer, but only one will ever happen (because the call to offer will return before a second could happen). This gives rise to the law (ignoring return types):

endWhen p `alongside` endWhen q == endWhen (p <-> q)

(This, in combination with the commutativity and associativity of alongside, means that all behaviours with multiple endWhen calls can be reduced to a behaviour with a single endWhen call.)

Repeated Behaviours

The second class of behaviour is one that offers an event zero or more times. The once combinator has an upper limit of one occurrence; it is really a special case of upTo, which takes a specified upper bound. There is also repeatedly, which has no upper bound (it offers the event endlessly, until an endWhen event happens and ends the offer):

once :: CHP a -> CHPBehaviour (Maybe a)
upTo :: Int -> CHP a -> CHPBehaviour [a]
repeatedly :: CHP a -> CHPBehaviour [a]

Repeatedly has a similar law to endWhen:

repeatedly p `alongside` repeatedly q == repeatedly (p <-> q)

(Which again means that any behaviour with multiple repeatedly calls can be reduced to a behaviour with a single repeatedly call.)

All of these combinators assume that you do not need to preserve any state between occurrences of the event. However, particularly for repeatedly, you may need to pass state around, so there is an extra repeatedlyRecurse function:

repeatedlyRecurse :: (a -> CHP (b, a)) -> a -> CHPBehaviour [b]

Here, a is the type of the state, and b is the type of the results. The arguments to this function are arranged in an order that allows you to write:

repeatedlyRecurse (runStateT myStateAction) initialState

Which is useful to run a behaviour in the StateT s CHP monad.
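
To see how the results fit together: alongside pairs up the results of its two behaviours (nesting to the left when chained), so a process can inspect what actually happened during the frame. A sketch, reusing the hypothetical sendStatus, move and tick events from the opening examples:

example :: CHP ()
example
  = do ((statuses, moved), _) <- offer (repeatedly sendStatus
                                        `alongside` once move
                                        `alongside` endWhen tick)
       -- repeatedly yielded a list (one entry per occurrence), once a Maybe
       liftIO (putStrLn (show (length statuses) ++ " status sends; "
                         ++ maybe "did not move" (const "moved") moved))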

This behaviours feature was added to CHP recently, and will crop up in several future posts. I’m beginning to develop several of these higher levels of abstraction on top of CHP, and I’m interested to see where it can take me. The key thing that enables the process combinators seen here and in the recent post on connecting processes, besides the powerful type system, is the ability to pass processes as arguments to the combinators; both processes that are ready to run (i.e. of type CHP a) and processes that still require some arguments (e.g. of the form a -> b -> CHP c). In Haskell this is as easy as passing around functions is, but in other languages it is either horribly clunky (e.g. C++) or impossible (e.g. occam).

Theory — Grammars

I realised when developing this system of behaviours that my combinators looked a little familiar. Users of Parsec may recognise the concepts behind repeatedly and once as being similar to many1 and optional. My library is not a parser, but this similarity suggests that what I am really doing with these combinators is expressing a form of grammar: a grammar on the CSP trace of the program (the list of events that the program performed).

My grammars are a little different to standard grammars in that order barely matters: the only ordering that really matters is that the endWhen events must come last. This is in contrast to most other grammars where order is all-important (e.g. function name then double colon then type). This means that not many grammar systems are suited to expressing my desired grammar, e.g. upTo 10 a `alongside` repeatedly b `alongside` endWhen c describes any sequence that ends in a c, and has as many bs as you like, with at most ten as, which can occur anywhere in that sequence before the c.

If you look at the clunky definitions at the beginning of this post (that did not use behaviours) you can see that this is really a Context-Free Grammar (CFG) version of my required behaviour. Clearly, though, a CFG is not the ideal way to express myself, as it becomes combinatorially larger as you add more options (in contrast to my behaviours, where it just involved adding one item on the end). So I wondered if there was any grammar system that more closely corresponded to my behaviour combinators.

One promising answer to that question seems to be two-level grammars: intuitively, a grammar that generates a grammar (think meta-programming, where you use a program to generate a program). Google does not provide much accessible information on the subject, and the wikipedia page is not too helpful. Thankfully, my university library holds a copy of “Grammars for Programming Languages” by Cleaveland and Uzgalis (1977, Elsevier), which turns out to be a very nice little book on the matter. Based only on a few hours reading the middle of the book, I have had a crack at representing my behaviours with a two-level grammar:

Metaproductions

END :: tick.
REPEATABLE :: sendStatus.
OPTIONAL :: moveIn; moveOut.

EVENT :: END; OPTIONAL; REPEATABLE.
EMPTY :: .
SEQ :: EVENT ; SEQ EVENT.

Hyper-rules

g: g’, END.
g’: EMPTY; SEQ check.

OPTIONAL SEQ check: OPTIONAL symbol, SEQ check, OPTIONAL notin SEQ.
REPEATABLE SEQ check: REPEATABLE symbol, SEQ check.
EVENT check: EVENT symbol.

This may seem long, but the idea is that you only need to change the top three lines of the meta-productions to suit your program; the rest of it is like a set of helper functions (including notin, which is defined in the book, but I’ve omitted the definition here). Adding the upTo combinator should not be too much harder. If anyone reading this is familiar with two-level grammars, I’d appreciate knowing if what I have written above is actually correct! Alternatively, if you know of more suitable ways to express the grammar, please let me know — for example, would attribute grammars be suitable?


Automated Wiring of Message-Passing Processes

December 2, 2009

A CHP program consists of many concurrent processes, wired together. They can be wired together with channels of any type (here, colour indicates the type the channel carries) that may be connected in one direction or the other:

They can also be joined with non-phased barriers (which come in one colour, black):

Connection Example 1

Now consider this pair of processes, p that requires an incoming purple channel and outgoing red channel, and q that requires an incoming red channel and outgoing green channel:

It is incredibly obvious what is needed to wire them up: a red channel from p to q. Once the two processes are connected in this way, they will become a new process, r, that requires an incoming purple channel (which will be passed to p) and an outgoing green channel (which will be passed to q):

Connection Example 2

It remains obvious even for this slightly more complicated example, where p requires an incoming purple channel and a pair (outgoing red channel, incoming blue channel), and q requires a pair (incoming red channel, outgoing blue channel) and a pair (outgoing green channel, incoming beige channel):

Their composition into r is:

Connection Example 3

Finally, there is an example where p takes one argument — a pair (outgoing red channel, enrolled barrier) — and q takes two arguments — a pair (incoming red channel, enrolled barrier), and an Int:

They compose into a process that only requires one argument, the Int:

Connectable Operators

It is child’s play to work out how to connect the processes — and the user code should be just as easy. I recently added the Connectable type-class to CHP to facilitate this kind of simple wiring. I’ll come to the implementation, but first here are the examples again, along with the code to wire them up (the type for r can be inferred from the type of p and q, but I give all the types anyway):

p :: Chanin Purple -> Chanout Red -> CHP ()
q :: Chanin Red -> Chanout Green -> CHP ()
r :: Chanin Purple -> Chanout Green -> CHP ()
r = p <=> q

p :: Chanin Purple -> (Chanout Red, Chanin Blue) -> CHP ()
q :: (Chanin Red, Chanout Blue) -> (Chanout Green, Chanin Beige) -> CHP ()
r :: Chanin Purple -> (Chanout Green, Chanin Beige) -> CHP ()
r = p <=> q

p :: (Chanout Red, EnrolledBarrier) -> CHP ()
q :: (Chanin Red, EnrolledBarrier) -> Int -> CHP ()
r :: Int -> CHP ()
r = p |<=> q

Note that the <=> operator is for when both processes take two parameters, and there are slight variants for other situations; for example, |<=> for when the left-hand side only takes one parameter (the bar indicates that this could be at the left-hand end of a pipeline of such processes).

Connectable Implementation

The implementation is simple enough that I will show it here. We have our Connectable type-class:

class Connectable l r where
  connect :: ((l, r) -> CHP a) -> CHP a

Given a CHP process requiring the two things connected, it will connect them for the duration of the process and run it. So we have simple instances for channels and barriers:

instance Connectable (Chanout a) (Chanin a) where
  connect p = newChannelWR >>= p
instance Connectable (Chanin a) (Chanout a) where
  connect p = newChannelRW >>= p
instance Connectable EnrolledBarrier EnrolledBarrier where
  connect p = do b <- newBarrier
                 enroll b $ \b0 -> enroll b $ \b1 -> p (b0, b1)

Then the real power comes from the instances for pairs, triples and so on:

instance (Connectable al ar, Connectable bl br) =>
    Connectable (al, bl) (ar, br) where
  connect p = connect (\(ax, ay) -> connect (\(bx, by) ->
                p ((ax, bx), (ay, by))))

This instance means that any two corresponding pairs of connectable things are also connectable. This means that we can define our useful process composition operator and use it everywhere on all sorts of types:

(<=>) :: Connectable l r =>
          (a -> l -> CHP ()) -> (r -> b -> CHP ()) -> a -> b -> CHP ()
(<=>) p q x y = p x |<=>| flip q y

(|<=>|) :: Connectable l r => (l -> CHP ()) -> (r -> CHP ()) -> CHP ()
(|<=>|) p q = connect (\(x, y) -> p x <|*|> q y)
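
Here is a tiny usage sketch (sender and receiver are made up for illustration): because Chanout Int and Chanin Int are Connectable, |<=>| creates the channel and runs the two processes in parallel for its duration:

sender :: Chanout Int -> CHP ()
sender c = writeChannel c 42

receiver :: Chanin Int -> CHP ()
receiver c = readChannel c >>= liftIO . print

both :: CHP ()
both = sender |<=>| receiver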

Now that we have this notion of connectable things, we can define further functions to connect a list of processes in a pipeline and so on (these are provided for you in CHP). My favourite function, which is required very often if you build 2D simulations, is the wrappedGrid operator, forthcoming in CHP 1.8.0. Given a data-type to represent 4-way connectivity:

data FourWay above below left right
  = FourWay { above :: above, below :: below, left :: left, right :: right }

And provided above can connect to below, and left to right, we can wire up a list of lists of processes (i.e. a 2D grid in waiting) into a torus, that is a grid where the top edge connects to the bottom edge and the left edge to the right edge (as is often the case in old arcade games):

wrappedGrid :: (Connectable above below, Connectable left right) =>
  [[FourWay above below left right -> CHP a]] -> CHP [[a]]

The implementation is a little fiddly — but I can define it once in the CHP library, and you can use it with whatever channels and barriers you use to connect up your four-way wrapped-round 2D grid. I’m also going to define a similar operator for 8-way connectivity. By capturing the notion of two things being connectable, we are able to build operators and helper functions for such common topologies without caring whether the individual components are connected by channels of Ints, Strings, MyCustomDataType or by barriers.


Graph Layout with Software Transactional Memory and Barriers (plus video!)

November 26, 2009

In my last post, I used barriers and shared channels to write a simple program for performing force-based graph layout. There were two things lacking in that example: firstly, repulsion between unconnected nodes and secondly, some cool videos. I’ve now solved both of these shortcomings.

In order to implement repulsion between all nodes, each node must know the position of all other nodes at every time-step. I could implement this with message-passing, but that would be O(N^2) communications each phase, which isn’t very efficient, and at that point, message-passing is probably not the right design. We need some shared variables instead. Software Transactional Memory (STM) provides a nice way to do shared variable concurrency in Haskell, so we will use that.

The idea is that we will have a transactional variable (TVar) per node, holding the node’s position. Every time-step, each node will read from all the TVars to discover the positions of all the other nodes, and will then update its own position based on the other nodes. If we are not careful, we will have the same race hazard that I warned against last time: one node could update its position before all the other nodes have read its previous position. Or, if left totally unconstrained, one node could perform several updates before other nodes have even performed one. To prevent these problems, we will again use barriers to break up the simulation into two phases: discover, and act. This means that we will be combining CHP and STM: using barriers from CHP to regulate access to shared variables from STM.

All the code before the node process is exactly the same as last time, except that I now import Control.Concurrent.STM and Control.Parallel.Strategies (for some strict application) — and I’ve made the nodes a bit smaller. Our node process no longer takes channels as its parameters, but TVars instead. It takes a list corresponding to the other nodes, of pairs of booleans (indicating if the other node is connected to this one) and transactional variables (to read the other node’s position). It also takes a single transactional variable, which it should use to write its own position to:

node :: NodeInfo -> [(Bool, TVar NodeInfo)] -> TVar NodeInfo
     -> Enrolled PhasedBarrier Phase -> CHP ()
node start neighbours me bar = node' start
  where
    (connected, tvs) = unzip neighbours
    
    node' cur
      = do Discover <- syncBarrier bar
           neighbourPos <- liftSTM (mapM readTVar tvs)
           let new = updatePos (zip connected neighbourPos) cur
           Act <- syncBarrier bar
           liftSTM (writeTVar me (new `using` rnf))
           node' new

You can see that the node process is a bit simpler. In the Discover phase it reads from the TVars and in the Act phase it writes to its own. It is immediately apparent that the reads and writes are thus segregated. liftSTM is defined below (it is essentially the atomically function), and the `using` rnf is a strictness thing that you can ignore. The updatePos function is now more complicated as we have added repulsion (though it is all irrelevant to the concurrent logic):

      
    updatePos poss cur
      = sum [cur
            ,0.05 * sum [ let v = p - cur in v - ideal * normalise v
                        | (True, p) <- poss] -- connected nodes
            ,0.000002 * sum [let sc = (p `from` cur) ^^ (-2)
                             in NodeInfo sc sc * normalise (cur - p)
                            | (_, p) <- poss] -- all nodes
            ]
      where
        ideal = 0.02
        (NodeInfo x2 y2) `from` (NodeInfo x1 y1)
          = sqrt $ (x2-x1)^^2 + (y2-y1)^^2
        normalise (NodeInfo x y) = fmapNodeInfo (/ mag) (NodeInfo x y)
          where
            mag = sqrt (x*x + y*y)

instance NFData GLfloat
instance NFData NodeInfo where
  rnf (NodeInfo a b) = rnf a >| rnf b

liftSTM :: MonadIO m => STM a -> m a
liftSTM = liftIO . atomically

I’ve also had to add an NFData instance so that rnf (strict evaluation) can be used on NodeInfo, and define the simple liftSTM function for executing STM transactions in the CHP monad. The draw process has only one change: instead of reading from a list of channels, we now read from a list of transactional variables:

drawProcess :: [TVar NodeInfo] -> Enrolled PhasedBarrier Phase -> CHP ()
drawProcess tvs bar
 = do displayIO <- embedCHP_ $ do syncAndWaitForPhase Discover bar
                                  xs <- liftSTM $ mapM readTVar tvs
                                  liftIO $ do startFrame
                                              mapM_ draw xs
                                              mapM_ (drawEdge xs) edges
                                              endFrame
...

Last time, I only had four nodes in my example, which was fairly boring. This time I have one hundred nodes, connected in a 10×10 grid. To make the demo interesting, all the nodes start with random positions:

startNodes :: IO [NodeInfo]
startNodes = replicateM (10*10) $ liftM2 NodeInfo r r
  where
    r = liftM (fromRational . toRational) $ randomRIO (0 :: Float, 1)

edges :: [(Int, Int)]
edges = across ++ down
  where
    across = [(x+10*y, (x+1)+10*y) | x <- [0..8], y <- [0..9]]
    down = [(x+10*y, x+10*(y+1)) | x <- [0..9], y <- [0..8]]

Finally, our slightly adjusted main process:

main :: IO ()
main = runCHP_ $
       do nodes <- liftIO startNodes
          vars <- liftSTM $ mapM newTVar nodes
          enrollAll_ (newPhasedBarrier Act)
           (drawProcess vars :
            [ let edgesOut = filter ((== i) . fst) edges
                  edgesIn = filter ((== i) . snd) edges
                  connectedNodes = map fst edgesIn ++ map snd edgesOut
                  conn = [(ind `elem` connectedNodes, tv)
                         | (tv, ind) <- zip vars [0..], tv /= mytv]
              in node n conn mytv
            | (n, mytv, i) <- zip3 nodes vars [0..]
            ])

Not many changes were needed to move from my previous shared-channel version to STM. My continued use of barriers alongside STM shows that you can mix STM and CHP quite nicely if you want to. Now, here’s what you’ve been waiting for: a video of the graph layout in action, first on my grid graph as defined above:


Choose from WMV format (suitable for Windows) or MPEG format (suitable for Linux, etc).

I also ran the algorithm on a simple loop graph:


Choose from WMV format (suitable for Windows) or MPEG format (suitable for Linux, etc).

The two videos have the algorithm running at 10 iterations per second, as that seems like a nice speed to view them at (and apologies for the slight garbage down the right-hand side of the videos). The grid example was inspired by an original example in the Java version of the Prefuse toolkit. Prefuse is an incredibly cool visualisation toolkit; some Java examples can be found online, but more impressive are the more recent ActionScript examples. Try the Layouts tab, and also the dependency graph example if you have a spare minute.


Force-Directed Graph Layout with Barriers and Shared Channels

November 24, 2009 10 comments

A graph is a collection of nodes, and edges joining the nodes together. Automatically drawing them cleanly is an interesting problem. Some force-based graph layout algorithms view the edges between nodes as springs, and simulate the forces acting on each node in order to move the nodes into a good position. Connected nodes will pull towards each other until the edges are of an ideal length. We can implement such a layout algorithm in CHP, with a process per node.

To implement the algorithm, the nodes need to be able to find out the current positions of their neighbours (i.e. the nodes they are connected to) and update their own position accordingly. One approach would be to have a channel pair per edge, to enable sending of position information in both directions, as in this graph of three nodes:

I’m going to take an alternative approach of having one output channel per node, on which the node can send its position. The other end (i.e. the reading end) of these position channels will be shared, and these ends will be passed around to all the nodes that connect to the node with the output end. We can also give all the reading ends to a display process. So our wired up graph now looks like this:

A shared channel is represented by a small hollow circle. Each has one writer (one of the three nodes) and several readers (the connected nodes and the display process). Each iteration, our nodes will offer to send out their current position (as many times as they are asked for it) while also fetching the positions of all their neighbours. Then they will all calculate a new position based on their neighbours and go again. One problem with this common discover-then-act design is that if you do not clearly separate the discovery of the neighbours’ positions from the updating of the positions, you can get nodes updating based on a mix of old positions (which are what you want) and new, already-updated positions: a race hazard. To prevent this, we divide each simulation step into two phases (discover and act) using a phased barrier.

A phased barrier is a synchronisation primitive. It allows processes to enroll on the barrier, to resign from the barrier, and to synchronise; processes only successfully synchronise on a barrier when all currently-enrolled processes synchronise. Each synchronisation, the phase of the barrier is moved on (and typically cycles around).
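
In code terms, a process enrolled on such a barrier typically alternates one synchronisation per phase. Here is a minimal sketch (using the Phase type declared below, and forever from Control.Monad):

twoPhase :: Enrolled PhasedBarrier Phase -> CHP ()
twoPhase bar = forever $
  do Discover <- syncBarrier bar  -- everyone is now in the discover phase
     -- ... read the neighbours' positions here ...
     Act <- syncBarrier bar       -- everyone is now in the act phase
     -- ... write the new position here ...
     return ()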

We will begin with some import statements, and declaring a NodeInfo type to hold the positions of nodes. We will also include a quick Num and Fractional instance for our NodeInfo that performs plus, minus, etc element-wise (NodeInfo 1 6 * NodeInfo 3 4 == NodeInfo 3 24):

import Control.Concurrent.CHP
import Control.Monad
import Control.Monad.Trans
import Graphics.Rendering.OpenGL
import Graphics.UI.GLUT hiding (alt)

data NodeInfo = NodeInfo GLfloat GLfloat deriving (Show, Eq)

instance Num NodeInfo where ...
instance Fractional NodeInfo where ...
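
The instance bodies are elided above; a plausible element-wise definition (a sketch, not necessarily the original code) would be:

instance Num NodeInfo where
  NodeInfo a b + NodeInfo c d = NodeInfo (a + c) (b + d)
  NodeInfo a b - NodeInfo c d = NodeInfo (a - c) (b - d)
  NodeInfo a b * NodeInfo c d = NodeInfo (a * c) (b * d)
  abs (NodeInfo a b) = NodeInfo (abs a) (abs b)
  signum (NodeInfo a b) = NodeInfo (signum a) (signum b)
  fromInteger n = NodeInfo (fromInteger n) (fromInteger n)

instance Fractional NodeInfo where
  NodeInfo a b / NodeInfo c d = NodeInfo (a / c) (b / d)
  fromRational r = NodeInfo (fromRational r) (fromRational r)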

Then we will declare our phase data type, and a helper function to read from a list of shared channels in parallel:

data Phase = Discover | Act deriving (Eq, Show, Bounded, Ord, Enum)

readAll :: [Shared Chanin a] -> CHP [a]
readAll = runParMapM (flip claim readChannel)

Next, we will define our node process. The main body of the node process first begins the discover phase. It then acts as a sender and receiver in parallel: the receiver reads in the positions of all its neighbours, while the sender continually offers to send out its position. It finishes both of these once the phase changes. To facilitate this, we must enroll on the barrier again, and use one barrier end in the sender and one in the receiver. (If we did not enroll a second time, and tried to use the same single barrier end twice in parallel, this would be a misuse of the library.) So here is most of the node process:

node :: NodeInfo -> [Shared Chanin NodeInfo] -> Chanout NodeInfo
     -> Enrolled PhasedBarrier Phase -> CHP ()
node start neighbourChans out bar = node' start
  where
    node' cur
      = do Discover <- syncBarrier bar
           (_, neighbourPos) <- furtherEnroll bar $ \bar2 ->
              giveOutPosUntilBar cur <||> do pos <- readAll neighbourChans
                                             Act <- syncBarrier bar2
                                             return pos
           node' (updatePos neighbourPos cur)
       
    giveOutPosUntilBar cur = (writeChannel out cur >> giveOutPosUntilBar cur)
                               <-> do Act <- syncBarrier bar
                                      return ()

The sender is the giveOutPosUntilBar process, and the receiver is on the right-hand side of the parallel. By making explicit the phase that we expect to begin with each barrier synchronisation, we both make our code clear (you can see which part is in the discover phase, and which part is in the act phase) and also effectively assert correctness; if the pattern-match fails, your code will produce an error.

Updating the position of the node based on its neighbours is all pure code. This is not a very sophisticated algorithm, but it will suffice for the purposes of illustration:

    updatePos poss cur = cur + (0.05 * average
      [let v = p - cur in v - ideal * normalise v | p <- poss])
      where
        ideal = 0.3
        normalise (NodeInfo x y) = NodeInfo (x / mag) (y / mag)
          where
            mag = sqrt (x*x + y*y)
        average xs = sum xs / fromIntegral (length xs)

The draw process is mainly irrelevant OpenGL logic (adapted from my boids example), but the interesting part is that it must act in the discover phase, partly because that’s the only time that the nodes will send their position, and partly because it’s actually the drawing that drives the frame-rate (a pull-based architecture).

drawProcess :: [Shared Chanin NodeInfo] -> Enrolled PhasedBarrier Phase -> CHP ()
drawProcess input bar
 = do displayIO <- embedCHP_ $ do syncAndWaitForPhase Discover bar
                                  xs <- readAll input
                                  liftIO $ do startFrame
                                              mapM_ draw xs
                                              mapM_ (drawEdge xs) edges
                                              endFrame
      liftIO (do setup
                 displayCallback $= glRunAs2D displayIO
                 let addTimer = addTimerCallback 500 timer
                     timer = addTimer >> postRedisplay Nothing
                 addTimer
                 mainLoop)
  where
    setup = do initialWindowSize $= Size 500 500
               getArgsAndInitialize
               initialDisplayMode $= [DoubleBuffered]
               createWindow "CHP Graph"

    startFrame = do clearColor $= Color4 0 0 0 0
                    clear [ColorBuffer, DepthBuffer]

    endFrame = do flush
                  swapBuffers

glRunAs2D :: IO () -> IO ()
glRunAs2D draw = do
  (matrixMode $= Modelview 0) >> loadIdentity
  (matrixMode $= Projection) >> loadIdentity
  ortho 0 1 0 1 (-1000) 1000
  preservingMatrix draw

draw :: NodeInfo -> IO ()
draw (NodeInfo x y) = renderPrimitive Polygon $ sequence_
  [ vertex $ Vertex2 (x + 0.05 * cos t) (y + 0.05 * sin t)
  | t <- map ((pi/10)*) [0..19]]

drawEdge :: [NodeInfo] -> (Int, Int) -> IO ()
drawEdge nodes (s, e) = renderPrimitive Lines $
  vertex (Vertex2 x1 y1) >> vertex (Vertex2 x2 y2)
  where
    (NodeInfo x1 y1, NodeInfo x2 y2) = (nodes !! s, nodes !! e)

Finally, we must initialise the nodes and wire up the simulation. For our barrier, we will use the enrollAll_ function that takes a barrier-creation function, a list of processes that take an enrolled barrier as a parameter, and runs them all in parallel with their own enrolled barrier ends (discarding the output). Crucially, enrollAll does the enrolling before any of the processes have begun. If you run your processes in parallel and get them to enroll themselves, you will create a race hazard in your program: one process might enroll and start synchronising by itself before the other processes have started executing. This is almost certainly not what you want. So here is the code:

startNodes :: [NodeInfo]
startNodes = [NodeInfo 0 0, NodeInfo 0 1, NodeInfo 1 0, NodeInfo 1 1]

edges :: [(Int, Int)]
edges = [(0,1), (1,2), (2,0), (1, 3)]

main :: IO ()
main = runCHP_ $
       do outChans <- replicateM numNodes oneToAnyChannel
          enrollAll_ (newPhasedBarrier Act)
           (drawProcess (readers outChans) :
            [ let edgesOut = filter ((== i) . fst) edges
                  edgesIn = filter ((== i) . snd) edges
                  connectedNodes = map fst edgesIn ++ map snd edgesOut
              in node n (readers (map (outChans !!) connectedNodes)) (writer c)
            | (n, c, i) <- zip3 startNodes outChans [0..]])
  where
    numNodes = length startNodes

The list comprehension uses the edges list to pick out all the right channels for each node (i.e. it translates the connectivity expressed in the edges list into the channel topology). The code in this post forms a complete program. It is not completely effective as I have not added repulsion among non-connected nodes (an exercise for the reader perhaps), but here is a quick screenshot of the result:

My intention with this example was to illustrate the use of shared channels, and particularly barriers. The pattern shown here, of dividing simulations into phases, is one of their most common uses, but they can be used elsewhere, sometimes in place of channels; from a more abstract perspective, channels in CHP offer synchronisation and communication, whereas barriers offer purely synchronisation. A one-to-one channel carrying the unit type is semantically equivalent to a two-party barrier with no phase information. The channel has the benefit of not needing explicit enrollment, but the disadvantage of being asymmetric in its use. For example, picking up and putting down forks in the dining philosophers example can be implemented using either two-party barriers or channels carrying the unit type.
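
To make that last equivalence concrete, here is a sketch of the two encodings of a single “pick up the fork” signal (the function names are mine):

-- Two semantically equivalent signals: a unit-carrying channel write,
-- and a two-party barrier synchronisation with no phase information.
pickUpViaChannel :: Chanout () -> CHP ()
pickUpViaChannel out = writeChannel out ()

pickUpViaBarrier :: Enrolled PhasedBarrier () -> CHP ()
pickUpViaBarrier bar = do () <- syncBarrier bar
                          return ()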

Note: As often with my recent posts, writing them revealed that I lacked certain useful helper functions, so you will need the new CHP 1.7.0 (which also includes other changes I will discuss in future) for the above code.

The Operators and Monoids of CHP

November 20, 2009 7 comments

When we create binary operators, in mathematics or in programming, they often have certain common identifiable properties:

  • If you can re-order the arguments, e.g. 1 + 2 is the same as 2 + 1, we say that it is commutative — in contrast, division is not commutative.
  • If you have two applications of the operator and the order of evaluation/bracketing doesn’t matter, e.g. (1 + 2) + 3 is the same as 1 + (2 + 3), we say that it is associative — in contrast, subtraction is not associative.
  • If one particular operand always leaves the other side unchanged, we can say that this is the unit of an operator, e.g. 1 * x is the same as x, so 1 is the unit of multiplication.
  • If one particular operand always ignores/overrides the other, we can say that this is the zero of an operator, e.g. 0 * x is the same as 0, so 0 is the zero of multiplication.
  • If an operator has a unit or zero that only works on one side of the operator, we name it accordingly. For example, we say that division has a right-unit of 1 (because x / 1 is the same as x), but it does not have a left-unit; there is no value k such that for all x, k / x is the same as x.

We can find these properties all over maths and programming. Set union is commutative, associative, and has a unit of the empty set, but no zero. The boolean AND operator is commutative, associative, has the unit “true” and the zero “false”. STM’s orElse combinator is associative, has retry as its unit, and has any return statement as a left-zero. Any operator that is associative and has a unit forms a monoid, which can be put into Haskell as an instance of the Monoid type-class (more on that below).
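
Boolean AND, for example, is already available as a ready-made monoid in Haskell via Data.Monoid’s All wrapper:

import Data.Monoid (All(..))

-- Boolean AND as a monoid: the unit is All True. All False acts as a
-- zero, although the Monoid class has no way to express zeros.
allTrue :: [Bool] -> Bool
allTrue = getAll . mconcat . map All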

The operators in CHP also have some of the aforementioned properties. A full list is buried at the back of the tutorial, but I should probably pull them into the API documentation. (Note that the laws I discuss here are concerned with the behavioural semantics of the operators; the types of the expressions may differ trivially.) The parallel operator <||> is commutative and associative, with a unit of skip, the process that does nothing and returns successfully. The unprioritised choice operator <-> is commutative and associative, with a unit of stop, the process that is never ready in a choice. The implication of choice and parallelism being associative and commutative is that the order of the items in a call to alt or runParallel doesn’t make any difference to the behaviour. The operators for wiring up a pipeline in the Utils module are associative but lack the other properties.
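
Written out as semantic equations (=~= meaning “behaves identically”), the laws in that paragraph are:

-- p <||> q           =~=  q <||> p           -- parallelism commutes
-- (p <||> q) <||> r  =~=  p <||> (q <||> r)  -- ...and associates
-- skip <||> p        =~=  p                  -- skip is its unit
-- p <-> q            =~=  q <-> p            -- choice commutes
-- (p <-> q) <-> r    =~=  p <-> (q <-> r)    -- ...and associates
-- stop <-> p         =~=  p                  -- stop is its unit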

Poison Handler Properties

We can view the poison handlers `onPoisonTrap` and `onPoisonRethrow` as binary operators. To recap: `onPoisonTrap` runs the left-hand side, but if a poison exception occurs then the right-hand side is run. `onPoisonRethrow` does the same, but after the right-hand side has finished, the poison exception is rethrown. They are not commutative — in exception terminology, the first argument is the try and the second the catch; they cannot be swapped freely!

To my surprise, `onPoisonTrap` is associative. Abbreviating it to `oPT`, consider p `oPT` q `oPT` r. If you bracket the first two items, (p `oPT` q) `oPT` r, q will only execute if p throws poison, and r will only execute if q then throws poison (because p’s poison is trapped, so the only poison that can escape the first bracket is from q). If you bracket the latter two, p `oPT` (q `oPT` r), the brackets will only execute if p throws poison, which will pass control to q, which will only pass control to r if poison is thrown by q. So the semantics are associative.

In contrast, `onPoisonRethrow` is not associative. Abbreviating it to `oPR`, consider: p `oPR` skip `oPR` r. If bracketed (p `oPR` skip) `oPR` r, r will be executed if p poisons, but if bracketed p `oPR` (skip `oPR` r), r will never be executed (because skip won’t throw poison).

`onPoisonTrap` has a left-unit of throwPoison (because throwing poison automatically transfers control to the other side, the handler), and a right-unit of throwPoison (because trapping poison then throwing poison has a null effect on the original code). `onPoisonRethrow` has no left-unit but has two right-units: throwPoison and the return statement. Any code that cannot throw poison (e.g. a return statement) is a left-zero of both `onPoisonTrap` and `onPoisonRethrow` because it will never trigger the handler. Neither operator has a right-zero; there is no handler that can cause the original code to always be ignored.
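
The same laws as semantic equations:

-- throwPoison `onPoisonTrap` h     =~=  h         -- left-unit
-- p `onPoisonTrap` throwPoison     =~=  p         -- right-unit
-- p `onPoisonRethrow` throwPoison  =~=  p         -- right-unit
-- p `onPoisonRethrow` return ()    =~=  p         -- right-unit
-- return x `onPoisonTrap` h        =~=  return x  -- left-zero
-- return x `onPoisonRethrow` h     =~=  return x  -- left-zero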

Monoids

The fact that some of the operators mentioned here are associative and have units means that they could form a monoid. In fact, CHP blocks of code could form several monoids. In Haskell, there is the problem that the monoid instance must be uniquely identified by its type, even though it is really its operator that is distinctive. All the standard number types can form a Monoid in addition (unit: 0, operator: +) or multiplication (unit: 1, operator: *). Defining a Monoid instance for, say, Int would thus be ambiguous: when you say 4 `mappend` 3, would you expect 7 or 12? To solve this, the Data.Monoid module defines newtype-wrappers around types to identify the monoid. Sum Int is a monoid in addition, whereas Product Int is a monoid in multiplication.
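
Concretely:

import Data.Monoid (Sum(..), Product(..))

seven, twelve :: Int
seven  = getSum     (Sum 4     `mappend` Sum 3)      -- 4 + 3 = 7
twelve = getProduct (Product 4 `mappend` Product 3)  -- 4 * 3 = 12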

I could use the same trick for CHP and define several monoid instances. Here is a monoid that allows blocks of code (with no useful return value) to be joined in parallel:

newtype Par = Par {runPar :: CHP ()}

instance Monoid Par where
  mempty = Par skip
  mappend p q = Par (runPar p <|*|> runPar q)
  mconcat = Par . runParallel_ . map runPar
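
A quick usage sketch, where p, q and r stand for any CHP () blocks:

-- Run three blocks in parallel via the monoid interface.
runThree :: CHP () -> CHP () -> CHP () -> CHP ()
runThree p q r = runPar (mconcat [Par p, Par q, Par r])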

This could be made a little more useful by making a parallel monoid out of blocks of code that return a type that is itself a monoid; when the parallel blocks of code have all finished, their results are combined using the monoid instance:

newtype ParMonoid a = ParMonoid {runParMonoid :: CHP a}

instance Monoid a => Monoid (ParMonoid a) where
  mempty = ParMonoid (return mempty)
  mappend p q = ParMonoid
    (liftM (uncurry mappend) $ runParMonoid p <||> runParMonoid q)
  mconcat = ParMonoid . liftM mconcat . runParallel . map runParMonoid

There is also a straightforward monoid instance for choice between blocks:

newtype Alt a = Alt {runAlt :: CHP a}

instance Monoid (Alt a) where
  mempty = Alt stop
  mappend a b = Alt (runAlt a <-> runAlt b)
  mconcat = Alt . alt . map runAlt
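
For example, this instance lets us offer a choice over a list of channel reads (equivalent to alt (map readChannel ins)):

-- Read from whichever of the given channels is ready first; a sketch.
readAny :: [Chanin a] -> CHP a
readAny = runAlt . mconcat . map (Alt . readChannel)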

Finally, there is a monoid built around `onPoisonTrap`:

newtype PoisonTrap a = PoisonTrap {runPoisonTrap :: CHP a}

instance Monoid (PoisonTrap a) where
  mempty = PoisonTrap throwPoison
  mappend a b = PoisonTrap (runPoisonTrap a `onPoisonTrap` runPoisonTrap b)

Consider the meaning of mconcat (map PoisonTrap [p,q,r,s]). It says run p; if no poison is thrown, that’s done. If poison is thrown, run q. If q throws poison, run r, and if that throws a poison, run s. Obviously this is quite excessive, but I had never thought of constructing such a function until I realised that `onPoisonTrap` was associative and thus could form a monoid.
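
As a usage sketch, that chained-handler behaviour can be packaged up directly:

-- Try p; on poison, fall back through each of ps in turn; a sketch.
-- If every block poisons, the implicit mempty (throwPoison) rethrows.
fallback :: CHP a -> [CHP a] -> CHP a
fallback p ps = runPoisonTrap (mconcat (map PoisonTrap (p : ps)))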

I can’t recall seeing monoid instances like these (involving monadic actions), so perhaps these sorts of monoid instances on monads don’t end up being very useful (if you know of a particular use, please add a comment below). I find it interesting to see how CHP code can form several different monoids just as an exercise.
