Sticky Platelets in a Pipeline (Part 2)

December 23, 2009

In my last post I showed the infrastructure and logic of a simple simulation of some sticky blood platelets moving through a blood vessel. I added the video to the end of that post; in this post I will explain how the visualisation was implemented.

In most CHP simulations there is a barrier (typically named tick) that is used to keep all the simulation entities synchronised, so that they all proceed through the discrete time-steps together. When it comes to visualising simulations, we could add a separate channel through which to report the status of all the simulation entities to a central drawing process, but this feels like a waste when all the processes are already synchronising together. CHP has reduce channels (many-to-one) that are the opposite of (one-to-many) broadcast channels; in a reduce channel, all the writers send in their data to a single reader, and all must send together before the communication is complete. So a simple way to achieve visualisation is to transform the tick barrier into a reduce channel, which has the same synchronisation behaviour, but that also allows data to be sent to some drawing process.

The simulation entities don’t need to know the details of all this, so we hide it using a send action that I’ve called a tick action:

type TickAction = SendAction (Maybe (Platelet, Bool))

data Platelet = Platelet GLfloat

Note that I’ve also had to tweak the type inside the platelet from last time (Float has become GLfloat) to make working with the OpenGL library a bit easier. The only change to the site process (and to the plateletGenerator and plateletPrinter processes) is that the tick event has become a tick action. Here is the site process in full:

site :: Chanin (Maybe Platelet) -> Chanout (Maybe Platelet)
     -> TickAction -> CHP ()
site prevIn nextOut act = foreverFeed (maybe empty full) Nothing
  where
    tickWith = sendAction act

    empty = join . fst <$> offer
              (            (once $ readChannel prevIn)
               `alongside` (once $ writeChannel nextOut Nothing)
               `alongside` (endWhen $ tickWith Nothing)
              )

    full platelet
      = do r <- liftIO $ randomRIO (0, (1::Double))
           let moving = r >= 0.05
               tick = tickWith (Just (platelet, moving))
               mv = readChannel prevIn <&> writeChannel nextOut (Just platelet)
               probablyMove = if moving then fst <$> mv else stop
               
           fromMaybe (Just platelet) . fst <$>
             offer ((once probablyMove) `alongside` endWhen tick)

Synchronising on the tick barrier has become performing the tick action, to which we must pass that optional pair of type Maybe (Platelet, Bool); the first item is the platelet currently occupying the site, and the boolean indicates whether the platelet was willing to move on this time-step. If the site is empty, the Nothing value is sent.

The status values end up passed to the draw function, which is fairly boring (it draws a vertical bar) — we are only interested in its type:

data ScreenLocation = ScreenLocation Int
  deriving (Eq, Ord)

draw :: ScreenLocation -> Maybe (Platelet, Bool) -> IO ()

This is called from the drawProcess:

drawProcess :: ReduceChanin (Map.Map ScreenLocation (Maybe (Platelet, Bool)))
            -> CHP ()
drawProcess input
 = do displayIO <- embedCHP_ $
        do x <- readChannel input
           liftIO $ do startFrame
                       mapM_ (uncurry draw) (Map.toList x)
                       endFrame

      ...

Note that drawProcess takes the input end of a reduce channel which carries a map from ScreenLocation (a one-dimensional location in our simple example) to the Maybe (Platelet, Bool) values we saw. Reduce channels in CHP must carry a monoid type, because the monoid instance is used to join all the many values from the writers into a single value (using mappend/mconcat, but in a non-deterministic order — so make sure the monoid is commutative).

The Map type (from Data.Map) is a monoid that has union as its mappend operation. This is exactly what we want; each site process will send in a singleton Map with their specific screen location mapped to their current status, and using the monoid instance these maps will all be joined (quite safely, since each sender will have a different location, and hence a distinct key entry in its Map) into one big map, that can then be fed into the draw function as we saw above.
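
As a quick aside (this is plain Data.Map, nothing CHP-specific), here is the union behaviour we are relying on: singleton maps with distinct keys combine losslessly, whatever order they are joined in:

import qualified Data.Map as Map
import Data.Monoid (mconcat)

-- Three "senders" each contribute a singleton map; mconcat unions them
-- into one map, and with distinct keys there are no collisions to resolve:
combined :: Map.Map Int String
combined = mconcat [Map.singleton 2 "b", Map.singleton 1 "a", Map.singleton 3 "c"]
-- Map.toList combined == [(1,"a"),(2,"b"),(3,"c")]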

We don’t trouble the site process with knowing its location; instead, we wrap up the location in the send action. It is easy to construct send actions that apply a function to a value before it is sent, so we apply a function that takes a status value, and turns it into the singleton Map just discussed. This is all done as part of the wiring up of the process network, based on the version from last time:

main :: IO ()
main = runCHP_ $
       do status <- manyToOneChannel
          pipelineConnectCompleteT
            (enrollAll (return $ writer status) . zipWith withSend locationList)
            plateletGenerator (replicate numSites site) plateletPrinter
           <|*|> drawProcess (reader status)
  where
    numSites = screenSize - 2
    locationList = map ScreenLocation [0..(screenSize-1)]
    withSend k p c = p $ makeSendAction' c (Map.singleton k)

The withSend function wraps the modification function around the send action. Each process in the pipeline is given a location from the list, including the generator and printer processes; otherwise this wiring is the same as the version from part 1.

I’ve omitted the OpenGL code, which is much the same as my previous examples. But here, again, is the video showing the results of the visualisation:


Choose from WMV format (suitable for Windows) or MPEG format (suitable for Linux, etc).

Turning the tick barrier into a reduce channel is often an easy way to visualise a simulation, and doesn’t require too much change to the code. As I said last time, the video is nothing compared to the TUNA videos which are very impressive, and some of which were generated by distributing the code over a cluster — a topic I hope to come to in CHP at some point in the future.

Sticky Platelets in a Pipeline (Part 1)

December 21, 2009

This post brings together concepts from several recent posts, including behaviours, conjunction and channel wiring. It is based on the work in the completed TUNA project, which has some stunning videos of blood clotting (and some images if you prefer).

I’m going to cut down the TUNA example greatly, to blog-post size. What we are going to be simulating is sticky objects in a pipeline, for example sticky blood platelets moving through a blood vessel. Our platelets will be represented as data that is passed between active “site” processes; our blood vessel will be a one-dimensional pipeline of site processes. Each process will be connected to its neighbours with a channel that can pass platelets. All the processes will be enrolled on the tick barrier, as is customary in CHP simulations.

We’ll begin the code with a helper function (one that I would like to see in the standard library) that iterates forever with a state value being passed through, and our platelet data type:

foreverFeed :: Monad m => (a -> m a) -> a -> m b
foreverFeed f x = f x >>= foreverFeed f

data Platelet = Platelet Float
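
As a quick illustration of foreverFeed by itself, here is a hypothetical (non-CHP) example in IO; the state value is threaded from one iteration to the next:

-- Prints 0, 1, 2, ... forever, passing the counter through each step:
countForever :: IO a
countForever = foreverFeed (\n -> print n >> return (n + 1)) (0 :: Int)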

The data in the platelet type is a colour value that I will use for visualisation in the next part of this guide. This is set randomly by the platelet generator at the beginning of the pipeline:

plateletGenerator :: Chanout (Maybe Platelet)
                  -> EnrolledBarrier
                  -> CHP ()
plateletGenerator out tick = forever $ on >> off >> off
  where
    on = do platelet <- Just . Platelet <$> liftIO (randomRIO (0, 0.5))
            offerAll [ once (writeChannel out platelet)
                     , endWhen (syncBarrier tick) ]
    off = offerAll [once (writeChannel out Nothing), endWhen (syncBarrier tick)]

The generator produces platelets regularly, one every three time-steps (this is coded as the simple on-off-off sequence). In an “on” time-step, it generates a platelet with a random shade, then uses behaviours to offer (at most once) to send the platelet, until the tick happens (i.e. the frame is over). The next site in the pipeline may not take the new platelet if that site is full and not moving this time-step, so the platelet may be discarded. In the “off” state, the generator waits for the tick to end the frame, but also offers to tell the site ahead that the generator is empty (signified by sending Nothing rather than Just a platelet).

The main logic is in the site process, which also has two states, empty and full:

site :: Chanin (Maybe Platelet)
     -> Chanout (Maybe Platelet)
     -> EnrolledBarrier
     -> CHP ()
site prevIn nextOut tick = foreverFeed (maybe empty full) Nothing
  where
    empty :: CHP (Maybe Platelet)
    full :: Platelet -> CHP (Maybe Platelet)

Each time, if there is Just a platelet returned by the function, the next state will be full, otherwise it will be empty. The initial state is empty (the Nothing value). The empty state consists of three possible behaviours:

    empty = join . fst <$> offer
              (            (once $ readChannel prevIn)
               `alongside` (once $ writeChannel nextOut Nothing)
               `alongside` (endWhen $ syncBarrier tick)
              )

In an empty state, a site will read in a new platelet from the previous site in the pipeline (if one is available), it will offer to tell the next site in the pipeline that it is empty, and it will finish this behaviour when the tick event happens. The value returned is the result of reading from the channel: Nothing if no read occurred or if we read in a Nothing value (in either case the site remains empty), or Just a platelet if the read did happen and carried one (in which case the site becomes full). It would be possible to reduce the amount of communication between empty processes, but I want to keep this example simple if I can.

The full state is as follows:

    full platelet
      = do r <- liftIO $ randomRIO (0, (1::Double))
           let
             move = readChannel prevIn <&> writeChannel nextOut (Just platelet)
             probablyMove = if r < 0.05 then stop else fst <$> move

           fromMaybe (Just platelet) . fst <$>
             (offer $
               once probablyMove
               `alongside` endWhen (syncBarrier tick)
             )

We will pick this code apart, bit by bit. It is primarily an offer between the tick to end the frame and another behaviour, called probablyMove. When the site is full, it has a 5% chance of refusing to do anything, meaning that a single platelet will not move in 5% of time-steps. So it starts by generating a random number between 0 and 1. If this is under 0.05 (i.e. a 5% chance), the probablyMove behaviour is stop, meaning it will not move — the site will just wait for the end of the frame in these 5% of cases.

In the other 95% of the time-steps, a move is offered, using conjunction. The site offers to read a value from the previous channel (which may be Just a platelet, or a Nothing value signifying the site was empty) and send on its current platelet, shuffling the platelets along the pipeline. So its overall behaviour is that it will send on its current platelet, if and only if: the previous site is empty, or the previous site is full and willing to send its platelet on (it won’t be willing 5% of the time). So a platelet can only move if there is no-one behind it, or if the platelet behind it moves too.

The implications of this behaviour are that once platelets are in adjoining cells, they only move on together. Thus any platelets that bump together form a notional clot that stays together forever after. This clot is not explicitly programmed and has no representation in the program. It is emergent behaviour that arises out of the local rules of the site process; each site only communicates with the site either side of it, and yet the program logic means that clots that are tens of platelets long could form, and would be unbreakable.

The other neat thing that arises out of the logic comes from the 5% chance. In 5% of time-steps a platelet will not move. (This is what allows the platelets to bump together in the first place.) Since a clot can only move when all its platelets move, a two-platelet clot has a roughly 10% chance of not moving (really: 1 – 0.95^2), and a three-platelet clot has about a 14% chance of not moving (1 – 0.95^3). So big clots will move slower, which means that the platelets behind become more likely to join on. Despite only having a local probability of not moving, we get the behaviour that larger clots are less likely to be able to move.
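
A quick sanity check of those numbers (stallChance is just a hypothetical helper for this calculation, not part of the simulation):

-- Chance that a clot of n platelets fails to move in a given time-step,
-- when each platelet independently refuses with probability 0.05:
stallChance :: Int -> Double
stallChance n = 1 - 0.95 ^ n

-- stallChance 1 == 0.05
-- stallChance 2 ~= 0.0975   (roughly 10%)
-- stallChance 3 ~= 0.1426   (roughly 14%)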

Enough on the site process; at the end of the pipeline is a platelet printer that swallows up any platelets and prints out the size of each clot (and of each blank gap between clots) that reaches the end of the pipeline:

plateletPrinter :: Chanin (Maybe Platelet) -> EnrolledBarrier -> CHP ()
plateletPrinter input tick
  = foreverFeed plateletPrinter' []
  where
    plateletPrinter' ps
      = do mp <- join . fst <$> offer (once (readChannel input)
                                       `alongside` endWhen (syncBarrier tick))
           let ps' = ps ++ [mp]
               (test, text) = maybe (isNothing, "Blank")
                                    (const (isJust, "Clot"))
                                    (head ps')
               (chunk, rest) = span test ps'
           if null chunk || null rest
             then return ps'
             else do let status = text ++ ", size: " ++ show (length chunk)
                     liftIO $ putStrLn status
                     return rest

And finally we must wire up all of this. Thankfully, our new connectable helper functions make this quite easy, and short:

main :: IO ()
main = runCHP_ $
         pipelineConnectCompleteT (enrollAll newBarrier)
           plateletGenerator (replicate numSites site) plateletPrinter
  where
    numSites = 100

If we compile and run this program, we get a print-out of clot sizes:

Blank, size: 103
Clot, size: 1
Blank, size: 51
Clot, size: 4
Blank, size: 2
Clot, size: 1

That is terribly unexciting, so I’m going to give a sneak video preview of a visualisation that I will go through in my next post. The 1D pipeline of sites is visualised left-to-right, with each tall bar being a platelet (or black for empty). When the bar flashes white, this is in the 5% of cases where the platelet is refusing to move. Hence you can see that when the larger clots form, the white flashes of the various platelets prevent the clot from moving:


Choose from WMV format (suitable for Windows) or MPEG format (suitable for Linux, etc).

Solving the Santa Claus Problem with Conjunction

December 10, 2009

The Santa Claus problem is a concurrency problem that, like the Dining Philosophers problem, can be used to demonstrate your particular concurrency framework of choice. December seems like the right time of year to post a CHP solution that I hacked up at the CPA 2008 conference last year. It’s short but rather primitive. Here’s the statement of the problem, via Simon Peyton Jones (who made an STM solution in a chapter of Beautiful Code):

Santa repeatedly sleeps until wakened by either all of his nine reindeer, back from their holidays, or by a group of three of his ten elves. If awakened by the reindeer, he harnesses each of them to his sleigh, delivers toys with them and finally unharnesses them (allowing them to go off on holiday). If awakened by a group of elves, he shows each of the group into his study, consults with them on toy R&D and finally shows them each out (allowing them to go back to work). Santa should give priority to the reindeer in the case that there is both a group of elves and a group of reindeer waiting.

The particularly tricky aspects of this problem are:

  1. Santa must make a choice between the reindeer and elves.
  2. Santa must give priority to the reindeer.
  3. The elves must come in groups of exactly three, even though there are ten of them.

Choice is easy in CHP, so the first item is not a problem. We saw in the last post how to emulate priority, so we can use that to solve the second point. The third item is difficult. Our barriers require all enrolled participants to synchronise, so we cannot use one barrier for all ten elves. We could introduce some intermediary agents to coordinate the elves, but that is a bit long-winded. Instead we can take a slightly brute-force approach combined with a CHP feature called conjunction.

Conjunction

The principle behind conjunction is straightforward. Usually you offer a choice such as read from channel c or read from channel d: readChannel c <-> readChannel d. This waits for the first of the two events, and executes it (it can never execute both options). A conjunction is when you want to read from channel c and read from channel d: readChannel c <&> readChannel d. Crucially, this will only read from both when both are available. If c is not ready, it will not read from d, and vice versa. It is both or none: an atomic item, if you like. Conjunction also has a list form, every.
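
To make the shapes concrete, here is a minimal sketch (firstOf and bothOrNeither are hypothetical helpers; the pair returned by <&> matches its use later in this post):

-- Choice: behaves as whichever of the two reads becomes ready first.
firstOf :: Chanin a -> Chanin a -> CHP a
firstOf c d = readChannel c <-> readChannel d

-- Conjunction: reads from both channels, or from neither.
bothOrNeither :: Chanin a -> Chanin b -> CHP (a, b)
bothOrNeither c d = readChannel c <&> readChannel d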

One example of where this is useful is the aforementioned Dining Philosophers problem: a philosopher wishing to pick up his forks should execute syncBarrier leftFork <&> syncBarrier rightFork. That way he will either pick up both his forks or neither, thus eliminating the deadlock wherein each philosopher ends up holding one fork.
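
A sketch of that fork pick-up, assuming each fork is modelled as a two-party barrier shared with the neighbouring philosopher (pickUpForks is a hypothetical helper):

pickUpForks :: EnrolledBarrier -> EnrolledBarrier -> CHP ()
pickUpForks leftFork rightFork
  = do _ <- syncBarrier leftFork <&> syncBarrier rightFork
       return ()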

Conjunction Specifics

It is valid to offer conjunction as part of a choice; for example, (readChannel c <&> readChannel d) <-> readChannel e will wait to read from channels (c AND d) OR e. These choices can be overlapping, e.g. (readChannel c <&> readChannel d) <-> (readChannel d <&> readChannel e) waits for (c AND d) OR (d AND e).

The choices should be in disjunctive normal form (DNF): OR on the outside, AND inside the brackets, so c AND (d OR e) is not valid. But AND does distribute over OR here, so it is equivalent to the DNF (c AND d) OR (c AND e). (The operators do not distribute in the opposite direction: c OR (d AND e) is not equivalent to (c OR d) AND (c OR e), because the latter allows c and e to both happen, whereas the former does not.)
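
So to offer the intent of c AND (d OR e), you would write out its distributed form; a sketch (cWithDOrE is a hypothetical name):

-- (c AND d) OR (c AND e): the DNF equivalent of c AND (d OR e).
cWithDOrE :: Chanin a -> Chanin b -> Chanin b -> CHP (a, b)
cWithDOrE c d e = (readChannel c <&> readChannel d)
              <-> (readChannel c <&> readChannel e)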

Conjunction and Barriers

Importantly for today’s post, there is a correspondence between a barrier (N participants, all N of which must synchronise together) and a conjunction of two-party barriers. Let’s imagine for a moment that Santa wanted to meet with all ten of his elves. We could create a single barrier, enroll Santa and the ten elves, and get them all to synchronise on it; the barrier would only complete when all eleven participants synchronised. But alternatively, we could create ten two-party barriers, enroll each elf on a different barrier, and enroll Santa on all of them. When they want to meet, the elves would all synchronise on their own barrier (meaning their code is unchanged), while Santa would wait for the conjunction of all ten barriers. Because Santa waits for the conjunction, each barrier can only complete when all of them complete, so we have the same semantics as with the single barrier: the ten two-party barriers are dynamically fused into one.
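
In code, Santa’s side of that all-ten meeting would be a conjunction over the whole list; a minimal sketch using every_ (the list form of conjunction, which also appears in the solution below):

-- Completes only when every one of the two-party barriers completes:
meetAllTenElves :: [EnrolledBarrier] -> CHP ()
meetAllTenElves = every_ . map syncBarrier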

The Solution

Santa does not want to wait for all ten elves; he only wants to wait for three. We can implement this with a little brute force. Labelling the elves A through J, we can make Santa wait for (A AND B AND C) OR (A AND B AND D) OR … (H AND I AND J). That is every group of three chosen from the ten elves: 10-choose-3 = 120 possibilities. Not scalable, but the problem said ten elves so that’s all we have to manage.

Let’s finally get to some code. A reindeer waits for a random time, then synchronises on its barrier (i.e. tries to meet with Santa). An elf waits for a random time, then synchronises on its barrier (i.e. tries to meet with Santa). So, perhaps unexpectedly, a reindeer has exactly the same code as an elf. (This is also the case in Peyton Jones’ solution). Here it is:

syncDelay :: EnrolledBarrier -> CHP ()
syncDelay b = forever (randomDelay >> syncBarrier b)
  where
    randomDelay = (liftIO $ randomRIO (500000, 1000000)) >>= waitFor

reindeer :: EnrolledBarrier -> CHP ()
reindeer = syncDelay

elf :: EnrolledBarrier -> CHP ()
elf = syncDelay

Our next job is to write the Santa process. Santa will take one barrier for the reindeer, and multiple barriers for the elves (one each). He will loop forever, first polling the reindeer (to give them priority) and otherwise choosing between the reindeer and the elves:

santa :: [EnrolledBarrier] -> EnrolledBarrier -> CHP ()
santa elfBars reindeerBar
  = forever (deliverToys </> (skip >> (deliverToys <-> meetThreeElves)))
  where

Now we need to define the helper processes. Delivering toys is straightforward; we just synchronise with the reindeer. The rest of the code deals with picking all groups of three elves, and making a choice between synchronising with each group:

    deliverToys = syncBarrier reindeerBar

    meetThreeElves = alt [meetGroup g | g <- allGroupsThreeElves]
    meetGroup bars = every_ [syncBarrier bar | bar <- bars]

    allGroupsThreeElves = allNFrom 3 elfBars
    allNFrom n = filter ((== n) . length) . filterM (const [True, False])

The allNFrom function is not important here, so I’ve used a concise (but probably inefficient) definition. Now that we have santa, our elves and our reindeer, all that remains is to put them together. To do this we use two wiring helper functions: enrollAll (which enrolls all of the processes on the given barrier) and enrollOneMany (which enrolls one process on all the barriers, and each of the other processes on one):

main :: IO ()
main = runCHP_VCRTraceAndPrint $
  enrollOneMany
    (\elfBars ->
        enrollAll (newBarrierWithLabel "reindeer")
                  (santa elfBars : replicate 9 reindeer)
    )
    [(newBarrierWithLabel ("elf" ++ show n), elf) | n <- [0..9]]

That’s it. Part of the reason that this code is quite short is that I’ve omitted all the print statements detailing what is going on (for example in Peyton Jones’ version). These print statements were only there to observe the behaviour of the computation, and we can do that with CHP’s built-in traces mechanism, just by using the runCHP_VCRTraceAndPrint function.

View-Centric Reasoning (VCR) traces, developed by Marc L. Smith and subsequently tweaked a little bit by me, display an ordered list of multisets, where each multiset holds independent events (events that did not have a sequential dependency on each other). This style makes it very easy to see from the output that our Santa Claus solution has the right sort of behaviour, viz:

< {elf3, elf4, elf5}, {elf0, elf6, elf7}, {reindeer}, {elf1, elf8, elf9}, {elf2, elf4, elf7}, {elf0, elf1, elf3}, {elf5, elf8, elf9}, {reindeer}, {elf3, elf6, elf7}, {elf0, elf1, elf9}, {elf2, elf4, elf8}, {reindeer}, {elf3, elf5, elf6}, {elf0, elf4, elf9}, {elf1, elf2, elf7}, {elf5, elf6, elf8}, {reindeer}, {elf0, elf1, elf3}, {elf4, elf7, elf9}, {elf2, elf5, elf8}, {elf0, elf1, elf6}, {reindeer}, {elf3, elf7, elf9}, {elf0, elf1, elf4}, {elf2, elf6, elf8}, {elf5, elf7, elf9}, {elf0, elf3, elf4}, {reindeer}, {elf1, elf2, elf6}, {elf5, elf7, elf8}, ...


Concise Solution

For those who like a bit of code golf (finding the shortest version of a program), I came up with this concise version of the whole solution:

import Control.Concurrent.CHP
import Control.Concurrent.CHP.Traces
import Control.Monad
import Control.Monad.Trans
import System.Random

santa elves reindeer = forever $
  syncBarrier reindeer </> (alt $ map (every_ . map syncBarrier) groups)
  where groups = filter ((== 3) . length) $ filterM (const [True, False]) elves

main = runCHP_VCRTraceAndPrint $ enrollOneMany (enrollAll
  (newBarrierWithLabel "reindeer") . (: replicate 9 syncDelay) . santa)
  [(newBarrierWithLabel ("elf" ++ show n), syncDelay) | n <- [0..9]]
  where syncDelay = forever . (randomDelay >>) . syncBarrier
        randomDelay = (liftIO $ randomRIO (500000, 1000000)) >>= waitFor

Excluding the import statements, that’s a solution to the Santa Claus problem in eight lines of Haskell. Rather difficult-to-follow Haskell, but that’s usually the result of code golf.

Graph Layout with Software Transactional Memory and Barriers (plus video!)

November 26, 2009

In my last post, I used barriers and shared channels to write a simple program for performing force-based graph layout. There were two things lacking in that example: firstly, repulsion between unconnected nodes and secondly, some cool videos. I’ve now solved both of these shortcomings.

In order to implement repulsion between all nodes, each node must know the position of all other nodes at every time-step. I could implement this with message-passing, but that would be O(N^2) communications each phase, which isn’t very efficient, and at that point, message-passing is probably not the right design. We need some shared variables instead. Software Transactional Memory (STM) provides a nice way to do shared variable concurrency in Haskell, so we will use that.

The idea is that we will have a transactional variable (TVar) per node, holding the node’s position. Every time-step, each node will read from all the TVars to discover the positions of all the other nodes, and will then update its own position based on the other nodes. If we are not careful, we will have the same race hazard that I warned against last time: one node could update its position before all the other nodes have read its previous position. Or, if left totally unconstrained, one node could perform several updates before other nodes have even performed one. To prevent these problems, we will again use barriers to break up the simulation into two phases: discover, and act. This means that we will be combining CHP and STM: using barriers from CHP to regulate access to shared variables from STM.

All the code before the node process is exactly the same as last time, except that I now import Control.Concurrent.STM and Control.Parallel.Strategies (for some strict application) — and I’ve made the nodes a bit smaller. Our node process no longer takes channels as its parameters, but TVars instead. It takes a list corresponding to the other nodes, of pairs of booleans (indicating if the other node is connected to this one) and transactional variables (to read the other node’s position). It also takes a single transactional variable, which it should use to write its own position to:

node :: NodeInfo -> [(Bool, TVar NodeInfo)] -> TVar NodeInfo
     -> Enrolled PhasedBarrier Phase -> CHP ()
node start neighbours me bar = node' start
  where
    (connected, tvs) = unzip neighbours
    
    node' cur
      = do Discover <- syncBarrier bar
           neighbourPos <- liftSTM (mapM readTVar tvs)
           let new = updatePos (zip connected neighbourPos) cur
           Act <- syncBarrier bar
           liftSTM (writeTVar me (new `using` rnf))
           node' new

You can see that the node process is a bit simpler. In the Discover phase it reads from the TVars and in the Act phase it writes to its own. It is immediately apparent that the reads and writes are thus segregated. liftSTM is defined below (it is essentially the atomically function), and the `using` rnf is a strictness thing that you can ignore. The updatePos function is now more complicated as we have added repulsion (though it is all irrelevant to the concurrent logic):

    updatePos poss cur
      = sum [cur
            ,0.05 * sum [ let v = p - cur in v - ideal * normalise v
                        | (True, p) <- poss] -- connected nodes
            ,0.000002 * sum [let sc = (p `from` cur) ^^ (-2)
                             in NodeInfo sc sc * normalise (cur - p)
                            | (_, p) <- poss] -- all nodes
            ]
      where
        ideal = 0.02
        (NodeInfo x2 y2) `from` (NodeInfo x1 y1)
          = sqrt $ (x2-x1)^^2 + (y2-y1)^^2
        normalise (NodeInfo x y) = fmapNodeInfo (/ mag) (NodeInfo x y)
          where
            mag = sqrt (x*x + y*y)

instance NFData GLfloat
instance NFData NodeInfo where
  rnf (NodeInfo a b) = rnf a >| rnf b

liftSTM :: MonadIO m => STM a -> m a
liftSTM = liftIO . atomically

I’ve also had to add an instance of NFData to use rnf (strict evaluation) on NodeInfo, and the simple liftSTM function for executing STM transactions in the CHP monad. The draw process only has one change; instead of reading from a list of channels, we now read from a list of transactional variables:

drawProcess :: [TVar NodeInfo] -> Enrolled PhasedBarrier Phase -> CHP ()
drawProcess tvs bar
 = do displayIO <- embedCHP_ $ do syncAndWaitForPhase Discover bar
                                  xs <- liftSTM $ mapM readTVar tvs
                                  liftIO $ do startFrame
                                              mapM_ draw xs
                                              mapM_ (drawEdge xs) edges
                                              endFrame
...

Last time, I only had four nodes in my example, which was fairly boring. This time I have one hundred nodes, connected in a 10×10 grid. To make the demo interesting, all the nodes start with random positions:

startNodes :: IO [NodeInfo]
startNodes = replicateM (10*10) $ liftM2 NodeInfo r r
  where
    r = liftM (fromRational . toRational) $ randomRIO (0 :: Float, 1)

edges :: [(Int, Int)]
edges = across ++ down
  where
    across = [(x+10*y, (x+1)+10*y) | x <- [0..8], y <- [0..9]]
    down = [(x+10*y, x+10*(y+1)) | x <- [0..9], y <- [0..8]]

Finally, our slightly adjusted main process:

main :: IO ()
main = runCHP_ $
       do nodes <- liftIO startNodes
          vars <- liftSTM $ mapM newTVar nodes
          enrollAll_ (newPhasedBarrier Act)
           (drawProcess vars :
            [ let edgesOut = filter ((== i) . fst) edges
                  edgesIn = filter ((== i) . snd) edges
                  connectedNodes = map fst edgesIn ++ map snd edgesOut
                  conn = [(ind `elem` connectedNodes, tv)
                         | (tv, ind) <- zip vars [0..], tv /= mytv]
              in node n conn mytv
            | (n, mytv, i) <- zip3 nodes vars [0..]
            ])

There are not many changes from my previous shared channel-based version to use STM instead. My continued use of barriers alongside STM shows that you can mix STM and CHP quite nicely if you want to. Now, here’s what you’ve been waiting for — a video of the graph layout in action, first on my grid graph as defined above:


Choose from WMV format (suitable for Windows) or MPEG format (suitable for Linux, etc).

I also ran the algorithm on a simple loop graph:


Choose from WMV format (suitable for Windows) or MPEG format (suitable for Linux, etc).

The two videos have the algorithm running at 10 iterations per second, as that seems like a nice speed to view them at (and apologies for the slight garbage down the right-hand side of the videos). The grid example was inspired by an original example in the Java version of the Prefuse toolkit. Prefuse is an incredibly cool visualisation toolkit; some Java examples can be found online but more impressive are the more recent ActionScript examples. Try the Layouts tab, and also the dependency graph example if you have a spare minute.


Force-Directed Graph Layout with Barriers and Shared Channels

November 24, 2009

A graph is a collection of nodes, and edges joining the nodes together. Automatically drawing them cleanly is an interesting problem. Some force-based graph layout algorithms view the edges between nodes as springs, and simulate the forces acting on each node in order to move the nodes into a good position. Connected nodes will pull towards each other until the edges are of an ideal length. We can implement such a layout algorithm in CHP, with a process per node.

To implement the algorithm, the nodes need to be able to find out the current positions of their neighbours (i.e. the nodes they are connected to) and update their own position accordingly. One approach would be to have a channel pair per edge, to enable sending of position information in both directions, as in this graph of three nodes:

I’m going to take an alternative approach of having one output channel per node, on which the node can send its position. The other end (i.e. the reading end) of these position channels will be shared, and these ends will be passed around to all the nodes that connect to the node with the output end. We can also give all the reading ends to a display process. So our wired up graph now looks like this:

A shared channel is represented by a small hollow circle. Each has one writer (the three nodes), and several readers (the connected nodes and the display process). Each iteration, our nodes will offer to send out their current position (as many times as they are asked for it) while also fetching the positions of all their neighbours. Then they will all calculate a new position based on their neighbours and go again. One problem with this common discover-then-act design is that if you do not clearly separate the discovery of the neighbours’ positions from the updating of the positions, you can get nodes updating based on a mix of the old positions (which are what you want) and the new updated positions — a race hazard. To prevent this, we divide each simulation step into two phases (discover and act) using a phased barrier.

A phased barrier is a synchronisation primitive. It allows processes to enroll on the barrier, to resign from the barrier, and to synchronise; processes only successfully synchronise on a barrier when all currently-enrolled processes synchronise. Each synchronisation, the phase of the barrier is moved on (and typically cycles around).
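
A minimal sketch of those mechanics, assuming the two-phase Phase type declared below (the barrier is created in the Act phase, so the first synchronisation moves everyone into Discover):

demoPhases :: CHP ()
demoPhases = enrollAll_ (newPhasedBarrier Act) (replicate 2 worker)
  where
    worker bar = do Discover <- syncBarrier bar  -- all parties enter Discover together
                    Act <- syncBarrier bar       -- ...and then Act
                    return ()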

We will begin with some import statements, and declaring a NodeInfo type to hold the positions of nodes. We will also include quick Num and Fractional instances for our NodeInfo that perform plus, minus, and so on element-wise (NodeInfo 1 6 * NodeInfo 3 4 == NodeInfo 3 24):

import Control.Concurrent.CHP
import Control.Monad
import Control.Monad.Trans
import Graphics.Rendering.OpenGL
import Graphics.UI.GLUT hiding (alt)

data NodeInfo = NodeInfo GLfloat GLfloat deriving (Show, Eq)

instance Num NodeInfo where ...
instance Fractional NodeInfo where ...
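
The instance bodies are elided above; a sketch of how the element-wise definitions might look, consistent with the NodeInfo 1 6 * NodeInfo 3 4 == NodeInfo 3 24 example (fmapNodeInfo is an assumed helper, used again in the follow-up post):

fmapNodeInfo :: (GLfloat -> GLfloat) -> NodeInfo -> NodeInfo
fmapNodeInfo f (NodeInfo x y) = NodeInfo (f x) (f y)

instance Num NodeInfo where
  NodeInfo x1 y1 + NodeInfo x2 y2 = NodeInfo (x1 + x2) (y1 + y2)
  NodeInfo x1 y1 - NodeInfo x2 y2 = NodeInfo (x1 - x2) (y1 - y2)
  NodeInfo x1 y1 * NodeInfo x2 y2 = NodeInfo (x1 * x2) (y1 * y2)
  abs = fmapNodeInfo abs
  signum = fmapNodeInfo signum
  fromInteger n = NodeInfo (fromInteger n) (fromInteger n)

instance Fractional NodeInfo where
  NodeInfo x1 y1 / NodeInfo x2 y2 = NodeInfo (x1 / x2) (y1 / y2)
  fromRational r = NodeInfo (fromRational r) (fromRational r)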

Then we will declare our phase data type, and a helper function to read from a list of shared channels in parallel:

data Phase = Discover | Act deriving (Eq, Show, Bounded, Ord, Enum)

readAll :: [Shared Chanin a] -> CHP [a]
readAll = runParMapM (flip claim readChannel)

Next, we will define our node process. The main body of the node process first begins the discover phase. It then acts as a sender and receiver in parallel: the receiver reads in the positions of all its neighbours, while the sender continually offers to send out its position. It finishes both of these once the phase changes. To facilitate this, we must enroll on the barrier again, and use one barrier end in the sender and one in the receiver. (If we did not enroll a second time, and tried to use the same single barrier end twice in parallel, this would be a mis-use of the library.) So here is most of the node process:

node :: NodeInfo -> [Shared Chanin NodeInfo] -> Chanout NodeInfo
     -> Enrolled PhasedBarrier Phase -> CHP ()
node start neighbourChans out bar = node' start
  where
    node' cur
      = do Discover <- syncBarrier bar
           (_, neighbourPos) <- furtherEnroll bar $ \bar2 ->
              giveOutPosUntilBar cur <||> do pos <- readAll neighbourChans
                                             Act <- syncBarrier bar2
                                             return pos
           node' (updatePos neighbourPos cur)
       
    giveOutPosUntilBar cur = (writeChannel out cur >> giveOutPosUntilBar cur)
                               <-> do Act <- syncBarrier bar
                                      return ()

The sender is the giveOutPosUntilBar process, and the receiver is on the right-hand side of the parallel. By making explicit the phase that we expect to begin with each barrier synchronisation, we both make our code clear (you can see which part is in the discover phase, and which part is in the act phase) and also effectively assert correctness; if the pattern-match fails, your code will produce an error.

Updating the position of the node based on its neighbours is all pure code. This is not a very sophisticated algorithm, but it will suffice for the purposes of illustration:

    updatePos poss cur = cur + (0.05 * average
      [let v = p - cur in v - ideal * normalise v | p <- poss])
      where
        ideal = 0.3
        normalise (NodeInfo x y) = NodeInfo (x / mag) (y / mag)
          where
            mag = sqrt (x*x + y*y)
        average xs = sum xs / fromIntegral (length xs)

The draw process is mainly irrelevant OpenGL logic (adapted from my boids example), but the interesting part is that it must act in the discover phase, partly because that’s the only time that the nodes will send their position, and partly because it’s actually the drawing that drives the frame-rate (a pull-based architecture).

drawProcess :: [Shared Chanin NodeInfo] -> Enrolled PhasedBarrier Phase -> CHP ()
drawProcess input bar
 = do displayIO <- embedCHP_ $ do syncAndWaitForPhase Discover bar
                                  xs <- readAll input
                                  liftIO $ do startFrame
                                              mapM_ draw xs
                                              mapM_ (drawEdge xs) edges
                                              endFrame
      liftIO (do setup
                 displayCallback $= glRunAs2D displayIO
                 let addTimer = addTimerCallback 500 timer
                     timer = addTimer >> postRedisplay Nothing
                 addTimer
                 mainLoop)
  where
    setup = do initialWindowSize $= Size 500 500
               getArgsAndInitialize
               initialDisplayMode $= [DoubleBuffered]
               createWindow "CHP Graph"

    startFrame = do clearColor $= Color4 0 0 0 0
                    clear [ColorBuffer, DepthBuffer]

    endFrame = do flush
                  swapBuffers

glRunAs2D :: IO () -> IO ()
glRunAs2D draw = do
  (matrixMode $= Modelview 0) >> loadIdentity
  (matrixMode $= Projection) >> loadIdentity
  ortho 0 1 0 1 (-1000) 1000
  preservingMatrix draw

draw :: NodeInfo -> IO ()
draw (NodeInfo x y) = renderPrimitive Polygon $ sequence_
  [ vertex $ Vertex2 (x + 0.05 * cos t) (y + 0.05 * sin t)
  | t <- map ((pi/10)*) [0..19]]

drawEdge :: [NodeInfo] -> (Int, Int) -> IO ()
drawEdge nodes (s, e) = renderPrimitive Lines $
  vertex (Vertex2 x1 y1) >> vertex (Vertex2 x2 y2)
  where
    (NodeInfo x1 y1, NodeInfo x2 y2) = (nodes !! s, nodes !! e)

Finally, we must initialise the nodes and wire up the simulation. For our barrier, we will use the enrollAll_ function that takes a barrier-creation function, a list of processes that take an enrolled barrier as a parameter, and runs them all in parallel with their own enrolled barrier ends (discarding the output). Crucially, enrollAll does the enrolling before any of the processes have begun. If you run your processes in parallel and get them to enroll themselves, you will create a race hazard in your program: one process might enroll and start synchronising by itself before the other processes have started executing. This is almost certainly not what you want. So here is the code:

startNodes :: [NodeInfo]
startNodes = [NodeInfo 0 0, NodeInfo 0 1, NodeInfo 1 0, NodeInfo 1 1]

edges :: [(Int, Int)]
edges = [(0,1), (1,2), (2,0), (1, 3)]

main :: IO ()
main = runCHP_ $
       do outChans <- replicateM numNodes oneToAnyChannel
          enrollAll_ (newPhasedBarrier Act)
           (drawProcess (readers outChans) :
            [ let edgesOut = filter ((== i) . fst) edges
                  edgesIn = filter ((== i) . snd) edges
                  connectedNodes = map fst edgesIn ++ map snd edgesOut
              in node n (readers (map (outChans !!) connectedNodes)) (writer c)
            | (n, c, i) <- zip3 startNodes outChans [0..]])
  where
    numNodes = length startNodes

The list comprehension uses the edges list to pick out all the right channels for each node (i.e. it translates the connectivity expressed in the edges list into the channel topology). The code in this post forms a complete program. It is not completely effective as I have not added repulsion among non-connected nodes (an exercise for the reader perhaps), but here is a quick screenshot of the result:

My intention with this example was to illustrate the use of shared channels, and particularly barriers. The pattern shown here, of dividing simulations into phases, is one of their most common uses but they can be used elsewhere, sometimes in place of channels; from a more abstract perspective, channels in CHP offer synchronisation and communication, whereas barriers offer purely synchronisation. A one-to-one channel carrying the unit type is semantically equivalent to a two-party barrier with no phase information. The channel has the benefit of not needing explicit enrollment, but the disadvantage of being asymmetric in its use. For example, picking up and putting down forks in the dining philosophers example can be implemented using either two-party barriers or channels carrying the unit type.
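
As a small sketch of that last equivalence (syncLeft and syncRight are hypothetical names for the two parties):

-- Each side blocks until the other arrives; the unit value carries no
-- data, so the communication is pure synchronisation:
syncLeft :: Chanout () -> CHP ()
syncLeft c = writeChannel c ()

syncRight :: Chanin () -> CHP ()
syncRight c = readChannel c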

Note: As often with my recent posts, writing them revealed that I lacked certain useful helper functions, so you will need the new CHP 1.7.0 (which also includes other changes I will discuss in future) for the above code.
