Posts Tagged ‘sharedchannels’

Force-Directed Graph Layout with Barriers and Shared Channels

November 24, 2009 10 comments

A graph is a collection of nodes, and edges joining the nodes together. Automatically drawing them cleanly is an interesting problem. Some force-based graph layout algorithms view the edges between nodes as springs, and simulate the forces acting on each node in order to move the nodes into a good position. Connected nodes will pull towards each other until the edges are of an ideal length. We can implement such a layout algorithm in CHP, with a process per node.

To implement the algorithm, the nodes need to be able to find out the current positions of their neighbours (i.e. the nodes they are connected to) and update their own position accordingly. One approach would be to have a channel pair per edge, to enable sending of position information in both directions, as in this graph of three nodes:

I’m going to take an alternative approach of having one output channel per node, on which the node can send its position. The other end (i.e. the reading end) of these position channels will be shared, and these ends will be passed around to all the nodes that connect to the node with the output end. We can also give all the reading ends to a display process. So our wired up graph now looks like this:

A shared channel is represented by a small hollow circle. Each has one writer (the three nodes), and several readers (the connected nodes and the display process). Each iteration, our nodes will offer to send out their current position (as many times as they are asked for it) while also fetching the position of all their neighbours. Then they will all calculate a new position based on their neighbours and go again. One problem with this common discover-then-act design is that if you do not clearly separate the discovery of the neighbours’ positions and the updating of the positions, you can get nodes updating based on a mix of old positions (which is what you want) and the new updated positions — a race hazard. To prevent this, we divide each simulation step into two phases (discover and act) using a phased barrier.

A phased barrier is a synchronisation primitive. It allows processes to enroll on the barrier, to resign from the barrier, and to synchronise; processes only successfully synchronise on a barrier when all currently-enrolled processes synchronise. Each synchronisation, the phase of the barrier is moved on (and typically cycles around).

We will begin with some import statements, and declaring a NodeInfo type to hold the positions of nodes. We will also include a quick Num and Fractional instance for our NodeInfo that performs plus, minus, etc element-wise (NodeInfo 1 6 * NodeInfo 3 4 == NodeInfo 3 24):

import Control.Concurrent.CHP
import Control.Monad
import Control.Monad.Trans
import Graphics.Rendering.OpenGL
import Graphics.UI.GLUT hiding (alt)

data NodeInfo = NodeInfo GLfloat GLfloat deriving (Show, Eq)

instance Num NodeInfo where ...
instance Fractional NodeInfo where ...

Then we will declare our phase data type, and a helper function to read from a list of shared channels in parallel:

data Phase = Discover | Act deriving (Eq, Show, Bounded, Ord, Enum)

readAll :: [Shared Chanin a] -> CHP [a]
readAll = runParMapM (flip claim readChannel)

Next, we will define our node process. The main body of the node process first begins the discover phase. It then acts as a sender and receiver in parallel: the receiver reads in the positions of all its neighbours, while the sender continually offers to send out its position. It finishes both of these once the phase changes. To facilitate this, we must enroll on the barrier again, and use one barrier end in the sender and one in the receiver. (If we did not enroll a second time, and tried to use the same single barrier end twice in parallel, this would be a mis-use of the library.) So here is most of the node process:

node :: NodeInfo -> [Shared Chanin NodeInfo] -> Chanout NodeInfo
     -> Enrolled PhasedBarrier Phase -> CHP ()
node start neighbourChans out bar = node' start
    node' cur
      = do Discover <- syncBarrier bar
           (_, neighbourPos) <- furtherEnroll bar $ \bar2 ->
              giveOutPosUntilBar cur <||> do pos <- readAll neighbourChans
                                             Act <- syncBarrier bar2
                                             return pos
           node' (updatePos neighbourPos cur)
    giveOutPosUntilBar cur = (writeChannel out cur >> giveOutPosUntilBar cur)
                               <-> do Act <- syncBarrier bar
                                      return ()

The sender is the giveOutPosUntilBar process, and the receiver is on the right-hand side of the parallel. By making explicit the phase that we expect to begin with each barrier synchronisation, we both make our code clear (you can see which part is in the discover phase, and which part is in the act phase) and also effectively assert correctness; if the pattern-match fails, your code will produce an error.

Updating the position of the node based on its neighbours is all pure code. This is not a very sophisticated algorithm, but it will suffice for the purposes of illustration:

    updatePos poss cur = cur + (0.05 * average
      [let v = p - cur in v - ideal * normalise v | p <- poss])
        ideal = 0.3
        normalise (NodeInfo x y) = NodeInfo (x / mag) (y / mag)
            mag = sqrt (x*x + y*y)
        average xs = sum xs / fromIntegral (length xs)

The draw process is mainly irrelevant OpenGL logic (adapted from my boids example), but the interesting part is that it must act in the discover phase, partly because that’s the only time that the nodes will send their position, and partly because it’s actually the drawing that drives the frame-rate (a pull-based architecture).

drawProcess :: [Shared Chanin NodeInfo] -> Enrolled PhasedBarrier Phase -> CHP ()
drawProcess input bar
 = do displayIO <- embedCHP_ $ do syncAndWaitForPhase Discover bar
                                  xs <- readAll input
                                  liftIO $ do startFrame
                                              mapM_ draw xs
                                              mapM_ (drawEdge xs) edges
      liftIO (do setup
                 displayCallback $= glRunAs2D displayIO
                 let addTimer = addTimerCallback 500 timer
                     timer = addTimer >> postRedisplay Nothing
    setup = do initialWindowSize $= Size 500 500
               initialDisplayMode $= [DoubleBuffered]
               createWindow "CHP Graph"

    startFrame = do clearColor $= Color4 0 0 0 0
                    clear [ColorBuffer, DepthBuffer]

    endFrame = do flush

glRunAs2D :: IO () -> IO ()
glRunAs2D draw = do
  (matrixMode $= Modelview 0) >> loadIdentity
  (matrixMode $= Projection) >> loadIdentity
  ortho 0 1 0 1 (-1000) 1000
  preservingMatrix draw

draw :: NodeInfo -> IO ()
draw (NodeInfo x y) = renderPrimitive Polygon $ sequence_
  [ vertex $ Vertex2 (x + 0.05 * cos t) (y + 0.05 * sin t)
  | t <- map ((pi/10)*) [0..19]]

drawEdge :: [NodeInfo] -> (Int, Int) -> IO ()
drawEdge nodes (s, e) = renderPrimitive Lines $
  vertex (Vertex2 x1 y1) >> vertex (Vertex2 x2 y2)
    (NodeInfo x1 y1, NodeInfo x2 y2) = (nodes !! s, nodes !! e)

Finally, we must initialise the nodes and wire up the simulation. For our barrier, we will use the enrollAll_ function that takes a barrier-creation function, a list of processes that take an enrolled barrier as a parameter, and runs them all in parallel with their own enrolled barrier ends (discarding the output). Crucially, enrollAll does the enrolling before any of the processes have begun. If you run your processes in parallel and get them to enroll themselves, you will create a race hazard in your program: one process might enroll and start synchronising by itself before the other processes have started executing. This is almost certainly not what you want. So here is the code:

startNodes :: [NodeInfo]
startNodes = [NodeInfo 0 0, NodeInfo 0 1, NodeInfo 1 0, NodeInfo 1 1]

edges :: [(Int, Int)]
edges = [(0,1), (1,2), (2,0), (1, 3)]

main :: IO ()
main = runCHP_ $
       do outChans <- replicateM numNodes oneToAnyChannel
          enrollAll_ (newPhasedBarrier Act)
           (drawProcess (readers outChans) :
            [ let edgesOut = filter ((== i) . fst) edges
                  edgesIn = filter ((== i) . snd) edges
                  connectedNodes = map fst edgesIn ++ map snd edgesOut
              in node n (readers (map (outChans !!) connectedNodes)) (writer c)
            | (n, c, i) <- zip3 startNodes outChans [0..]])
    numNodes = length startNodes

The list comprehension uses the edges list to pick out all the right channels for each node (i.e. it translates the connectivity expressed in the edges list into the channel topology). The code in this post forms a complete program. It is not completely effective as I have not added repulsion among non-connected nodes (an exercise for the reader perhaps), but here is a quick screenshot of the result:

My intention with this example was to illustrate the use of shared channels, and particularly barriers. The pattern shown here, of dividing simulations into phases, is one of their most common uses but they can be used elsewhere, sometimes in place of channels; from a more abstract perspective, channels in CHP offer synchronisation and communication, whereas barriers offer purely synchronisation. A one-to-one channel carrying the unit type is semantically equivalent to a two-party barrier with no phase information. The channel has the benefit of not needing explicit enrollment, but the disadvantage of being asymmetric in its use. For example, picking up and putting down forks in the dining philosophers example can be implemented using either two-party barriers or channels carrying the unit type.

Note: As often with my recent posts, writing them revealed that I lacked certain useful helper functions, so you will need the new CHP 1.7.0 (which also includes other changes I will discuss in future) for the above code.

Terminal Concurrency: The Printing Process

October 6, 2009 1 comment

One simple way to see some of the problems that concurrency can cause is to try printing to the terminal concurrently. Try loading up GHCi, and typing import Control.Concurrent followed by:

mapM_ (forkIO . putStr . replicate 100) ['a', 'b']

This code spawns off two threads: one that prints 100 ‘a’s, and one that prints 100 ‘b’s. Your results will likely differ from mine, and the results will likely differ each time, but here’s a sample (with line breaks added for readability):


Technically, this is non-determinism: the observable behaviour of the program is different each time, even though there is no obvious randomness in our program. The theoretical implications can wait for another day — this is also a practical issue. If we have several concurrently executing processes wanting to write to stdout, how can we stop this messy interleaving from occurring? We want to get all 100 ‘a’s together, and all 100 ‘b’s together.

In CHP, the answer is the printer process. The printer process continually reads Strings from its input channel, and prints them:

printer :: Chanin String -> CHP ()
printer input = forever $ readChannel input >>= liftIO . putStr

We need to connect this printer process to the processes that want to do the printing. A naive attempt might be:

unsafePrinting :: CHP ()
unsafePrinting = do
  c <- oneToOneChannel
  printer (reader c)
    <|*|> writeChannel (writer c) (replicate 100 'a')
    <|*|> writeChannel (writer c) (replicate 100 'b')

This is unsafe, however. The reason is implied by the name oneToOneChannel:

CHP’s standard channels are designed for use by one writer and one reader at a time. Using the output twice in parallel as shown above will sometimes give a run-time error (sometimes you may get away with it, which can make locating the bug hard). The compiler will not stop you writing this code, as encoding the safe-usage restriction in the type system would be very cumbersome. Instead, you must use a shared channel.

We want to share the writing end, but do not need to share the reading end; thus we need an anyToOneChannel, allowing anyone (of multiple writers) to send to one recipient. With this channel, we must use a claim function to grab the shared writing end for ourselves — a form of mutual exclusion. Hence we can get 100 ‘a’s together and 100 ‘b’s together (in either order) by using:

safePrinting :: CHP ()
safePrinting = do
  c <- anyToOneChannel
  printer (reader c)
    <|*|> claim (writer c) (flip writeChannel $ replicate 100 'a')
    <|*|> claim (writer c) (flip writeChannel $ replicate 100 'b')

The claim function takes a shared channel end (e.g. Shared Chanout String), and a function that takes the claimed end (which has the same type as a normal end: Chanout String) and performs some block of code. The claim function claims the channel-end for the duration of the call; releasing is automatic at the end of the block. If you try to use a shared channel-end without claiming it, you’ll get a type error. You can perform multiple communications on the channel during the claim block — we could choose to write “a” to the channel 100 times rather than 100 ‘a’s at once, and the resulting output would still have the ‘a’s uninterrupted.

You would use this printing process in your real application by running it in parallel with the whole system (probably at the very top level), and passing out the shared channel-end to any process that may want to print a message. Each process will then have to claim the channel to send messages, and it is really this claiming process that serialises the messages. Only the printing process actually prints the messages on stdout, which has the effect of stopping any concurrent use of stdout (and hence stopping this garbledness). You could make the printing process print to stderr or to a logfile, if you want to use this method to keep track of errors/warnings/debug messages in your concurrent application.

Categories: Uncategorized Tags: