In my last post, I used barriers and shared channels to write a simple program for performing force-based graph layout. There were two things lacking in that example: firstly, repulsion between unconnected nodes and secondly, some cool videos. I’ve now solved both of these shortcomings.
In order to implement repulsion between all nodes, each node must know the position of all other nodes at every time-step. I could implement this with message-passing, but that would be O(N^2) communications each phase, which isn’t very efficient, and at that point, message-passing is probably not the right design. We need some shared variables instead. Software Transactional Memory (STM) provides a nice way to do shared variable concurrency in Haskell, so we will use that.
The idea is that we will have a transactional variable (TVar) per node, holding the node’s position. Every time-step, each node will read from all the TVars to discover the positions of all the other nodes, and will then update its own position based on the other nodes. If we are not careful, we will have the same race hazard that I warned against last time: one node could update its position before all the other nodes have read its previous position. Or, if left totally unconstrained, one node could perform several updates before other nodes have even performed one. To prevent these problems, we will again use barriers to break up the simulation into two phases: discover, and act. This means that we will be combining CHP and STM: using barriers from CHP to regulate access to shared variables from STM.
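To make the read-all-then-write-own pattern concrete outside of CHP, here is a minimal standalone sketch using only the stm package. The names (`step`, the averaging update rule) are illustrative inventions, not the real program; in the actual code, the barrier phases are what keep the read half and the write half apart:

```haskell
import Control.Concurrent.STM
import Control.Monad

-- Hypothetical stripped-down illustration (no CHP): each "node" owns one
-- TVar; a step reads every other node's position, then writes its own.
-- In the real program, barrier phases separate the two halves.
step :: [TVar Double] -> TVar Double -> IO ()
step others me = do
  ps <- atomically (mapM readTVar others)      -- discover
  let new = sum ps / fromIntegral (length ps)  -- toy update rule
  atomically (writeTVar me new)                -- act

main :: IO ()
main = do
  [a, b, c] <- mapM newTVarIO [0, 3, 6]
  step [b, c] a            -- node a moves to the mean of b and c
  readTVarIO a >>= print   -- 4.5
```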
All the code before the node process is exactly the same as last time, except that I now import Control.Concurrent.STM and Control.Parallel.Strategies (for some strict application) — and I’ve made the nodes a bit smaller. Our node process no longer takes channels as its parameters, but TVars instead. It takes a list corresponding to the other nodes, of pairs of booleans (indicating if the other node is connected to this one) and transactional variables (to read the other node’s position). It also takes a single transactional variable, which it should use to write its own position to:
```haskell
node :: NodeInfo -> [(Bool, TVar NodeInfo)] -> TVar NodeInfo
     -> Enrolled PhasedBarrier Phase -> CHP ()
node start neighbours me bar = node' start
  where
    (connected, tvs) = unzip neighbours

    node' cur = do
      Discover <- syncBarrier bar
      neighbourPos <- liftSTM (mapM readTVar tvs)
      let new = updatePos (zip connected neighbourPos) cur
      Act <- syncBarrier bar
      liftSTM (writeTVar me (new `using` rnf))
      node' new
```
You can see that the node process is a bit simpler. In the Discover phase it reads from the TVars and in the Act phase it writes to its own. It is immediately apparent that the reads and writes are thus segregated. liftSTM is defined below (it is essentially the atomically function), and the `using` rnf is a strictness thing that you can ignore. The updatePos function is now more complicated as we have added repulsion (though it is all irrelevant to the concurrent logic):
```haskell
updatePos poss cur
  = sum [cur
        ,0.05 * sum [let v = p - cur in v - ideal * normalise v
                    | (True, p) <- poss]             -- connected nodes
        ,0.000002 * sum [let sc = (p `from` cur) ^^ (-2)
                         in NodeInfo sc sc * normalise (cur - p)
                        | (_, p) <- poss]            -- all nodes
        ]
  where
    ideal = 0.02
    (NodeInfo x2 y2) `from` (NodeInfo x1 y1) = sqrt $ (x2-x1)^^2 + (y2-y1)^^2

normalise (NodeInfo x y) = fmapNodeInfo (/ mag) (NodeInfo x y)
  where mag = sqrt (x*x + y*y)

instance NFData GLfloat
instance NFData NodeInfo where
  rnf (NodeInfo a b) = rnf a >| rnf b

liftSTM :: MonadIO m => STM a -> m a
liftSTM = liftIO . atomically
```
I’ve also had to add an instance of NFData to use rnf (strict evaluation) on NodeInfo, and the simple liftSTM function for executing STM transactions in the CHP monad. The draw process only has one change; instead of reading from a list of channels, we now read from a list of transactional variables:
```haskell
drawProcess :: [TVar NodeInfo] -> Enrolled PhasedBarrier Phase -> CHP ()
drawProcess tvs bar
  = do displayIO <- embedCHP_ $ do
         syncAndWaitForPhase Discover bar
         xs <- liftSTM $ mapM readTVar tvs
         liftIO $ do startFrame
                     mapM_ draw xs
                     mapM_ (drawEdge xs) edges
                     endFrame
       ...
```
Last time, I only had four nodes in my example, which was fairly boring. This time I have one hundred nodes, connected in a 10×10 grid. To make the demo interesting, all the nodes start with random positions:
```haskell
startNodes :: IO [NodeInfo]
startNodes = replicateM (10*10) $ liftM2 NodeInfo r r
  where
    r = liftM (fromRational . toRational) $ randomRIO (0 :: Float, 1)

edges :: [(Int, Int)]
edges = across ++ down
  where
    across = [(x+10*y, (x+1)+10*y) | x <- [0..8], y <- [0..9]]
    down   = [(x+10*y, x+10*(y+1)) | x <- [0..9], y <- [0..8]]
```
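As a quick sanity check on the grid wiring (runnable with base alone): a 10×10 grid should have 9×10 horizontal edges and 10×9 vertical ones, and every edge should point from a lower index to a higher, in-range one:

```haskell
-- Base-only check of the grid edge lists from the post.
across, down :: [(Int, Int)]
across = [(x+10*y, (x+1)+10*y) | x <- [0..8], y <- [0..9]]
down   = [(x+10*y, x+10*(y+1)) | x <- [0..9], y <- [0..8]]

main :: IO ()
main = do
  print (length across, length down)                         -- (90,90)
  print (all (\(a, b) -> a < b && b < 100) (across ++ down)) -- True
```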
Finally, our slightly adjusted main process:
```haskell
main :: IO ()
main = runCHP_ $ do
  nodes <- liftIO startNodes
  vars <- liftSTM $ mapM newTVar nodes
  enrollAll_ (newPhasedBarrier Act)
    (drawProcess vars :
     [ let edgesOut = filter ((== i) . fst) edges
           edgesIn = filter ((== i) . snd) edges
           connectedNodes = map fst edgesIn ++ map snd edgesOut
           conn = [(ind `elem` connectedNodes, tv)
                  | (tv, ind) <- zip vars [0..], tv /= mytv]
       in node n conn mytv
     | (n, mytv, i) <- zip3 nodes vars [0..]
     ])
```
There are not many changes from my previous shared channel-based version to use STM instead. My continued use of barriers alongside STM shows that you can mix STM and CHP quite nicely if you want to. Now, here’s what you’ve been waiting for — a video of the graph layout in action, first on my grid graph as defined above:
I also ran the algorithm on a simple loop graph:
The two videos have the algorithm running at 10 iterations per second, as that seems like a nice speed to view them at (and apologies for the slight garbage down the right-hand side of the videos). The grid example was inspired by an original example in the Java version of the Prefuse toolkit. Prefuse is an incredibly cool visualisation toolkit; some Java examples can be found online but more impressive are the more recent ActionScript examples. Try the Layouts tab, and also the dependency graph example if you have a spare minute.
A graph is a collection of nodes, and edges joining the nodes together. Automatically drawing them cleanly is an interesting problem. Some force-based graph layout algorithms view the edges between nodes as springs, and simulate the forces acting on each node in order to move the nodes into a good position. Connected nodes will pull towards each other until the edges are of an ideal length. We can implement such a layout algorithm in CHP, with a process per node.
To implement the algorithm, the nodes need to be able to find out the current positions of their neighbours (i.e. the nodes they are connected to) and update their own position accordingly. One approach would be to have a channel pair per edge, to enable sending of position information in both directions, as in this graph of three nodes:
I’m going to take an alternative approach of having one output channel per node, on which the node can send its position. The other end (i.e. the reading end) of these position channels will be shared, and these ends will be passed around to all the nodes that connect to the node with the output end. We can also give all the reading ends to a display process. So our wired up graph now looks like this:
A shared channel is represented by a small hollow circle. Each has a single writer (one of the three nodes), and several readers (the connected nodes and the display process). Each iteration, our nodes will offer to send out their current position (as many times as they are asked for it) while also fetching the position of all their neighbours. Then they will all calculate a new position based on their neighbours and go again. One problem with this common discover-then-act design is that if you do not clearly separate the discovery of the neighbours’ positions and the updating of the positions, you can get nodes updating based on a mix of old positions (which is what you want) and the new updated positions — a race hazard. To prevent this, we divide each simulation step into two phases (discover and act) using a phased barrier.
A phased barrier is a synchronisation primitive. It allows processes to enroll on the barrier, to resign from the barrier, and to synchronise; processes only successfully synchronise on a barrier when all currently-enrolled processes synchronise. Each synchronisation, the phase of the barrier is moved on (and typically cycles around).
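To see how such a primitive can behave, here is a hypothetical sketch of a phased barrier in plain STM. This is not CHP's implementation, and it omits enrolling and resigning (the member count is fixed at creation); the last process to arrive advances the phase, and everyone returns the new phase:

```haskell
import Control.Concurrent.STM
import Control.Monad

data Phase = Discover | Act deriving (Eq, Show, Enum, Bounded)

nextPhase :: Phase -> Phase
nextPhase p = if p == maxBound then minBound else succ p

-- Fixed membership count, an arrival counter, and the current phase.
data Barrier = Barrier Int (TVar Int) (TVar Phase)

newBarrier :: Int -> Phase -> IO Barrier
newBarrier n p = Barrier n <$> newTVarIO 0 <*> newTVarIO p

-- Block until all n processes have arrived; the last arrival resets the
-- counter and moves the phase on, waking the others.
syncBarrier :: Barrier -> IO Phase
syncBarrier (Barrier n arr ph) = do
  old <- readTVarIO ph
  join . atomically $ do
    a <- (+ 1) <$> readTVar arr
    if a == n
      then do writeTVar arr 0
              new <- nextPhase <$> readTVar ph
              writeTVar ph new
              return (return new)
      else do writeTVar arr a
              return . atomically $ do
                p <- readTVar ph
                when (p == old) retry  -- wait for the phase to move on
                return p

main :: IO ()
main = do
  bar <- newBarrier 1 Act    -- one member, for a deterministic demo
  syncBarrier bar >>= print  -- Discover
  syncBarrier bar >>= print  -- Act
```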
We will begin with some import statements, and declaring a NodeInfo type to hold the positions of nodes. We will also include a quick Num and Fractional instance for our NodeInfo that performs plus, minus, etc element-wise (NodeInfo 1 6 * NodeInfo 3 4 == NodeInfo 3 24):
```haskell
import Control.Concurrent.CHP
import Control.Monad
import Control.Monad.Trans
import Graphics.Rendering.OpenGL
import Graphics.UI.GLUT hiding (alt)

data NodeInfo = NodeInfo GLfloat GLfloat deriving (Show, Eq)

instance Num NodeInfo where ...
instance Fractional NodeInfo where ...
```
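One possible way to fill in the elided instances is to apply every operation element-wise, as the post describes (Float stands in for GLfloat here to keep the sketch standalone):

```haskell
-- Element-wise numeric instances for a 2D point type.
data NodeInfo = NodeInfo Float Float deriving (Show, Eq)

instance Num NodeInfo where
  NodeInfo a b + NodeInfo c d = NodeInfo (a + c) (b + d)
  NodeInfo a b - NodeInfo c d = NodeInfo (a - c) (b - d)
  NodeInfo a b * NodeInfo c d = NodeInfo (a * c) (b * d)
  abs (NodeInfo a b)    = NodeInfo (abs a) (abs b)
  signum (NodeInfo a b) = NodeInfo (signum a) (signum b)
  fromInteger n = NodeInfo (fromInteger n) (fromInteger n)

instance Fractional NodeInfo where
  NodeInfo a b / NodeInfo c d = NodeInfo (a / c) (b / d)
  fromRational r = NodeInfo (fromRational r) (fromRational r)

main :: IO ()
main = print (NodeInfo 1 6 * NodeInfo 3 4)  -- NodeInfo 3.0 24.0
```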
Then we will declare our phase data type, and a helper function to read from a list of shared channels in parallel:
```haskell
data Phase = Discover | Act
  deriving (Eq, Show, Bounded, Ord, Enum)

readAll :: [Shared Chanin a] -> CHP [a]
readAll = runParMapM (flip claim readChannel)
```
Next, we will define our node process. The main body of the node process first begins the discover phase. It then acts as a sender and receiver in parallel: the receiver reads in the positions of all its neighbours, while the sender continually offers to send out its position. It finishes both of these once the phase changes. To facilitate this, we must enroll on the barrier again, and use one barrier end in the sender and one in the receiver. (If we did not enroll a second time, and tried to use the same single barrier end twice in parallel, this would be a mis-use of the library.) So here is most of the node process:
```haskell
node :: NodeInfo -> [Shared Chanin NodeInfo] -> Chanout NodeInfo
     -> Enrolled PhasedBarrier Phase -> CHP ()
node start neighbourChans out bar = node' start
  where
    node' cur = do
      Discover <- syncBarrier bar
      (_, neighbourPos) <- furtherEnroll bar $ \bar2 ->
        giveOutPosUntilBar cur <||> do pos <- readAll neighbourChans
                                       Act <- syncBarrier bar2
                                       return pos
      node' (updatePos neighbourPos cur)

    giveOutPosUntilBar cur
      = (writeChannel out cur >> giveOutPosUntilBar cur)
        <-> do Act <- syncBarrier bar
               return ()
```
The sender is the giveOutPosUntilBar process, and the receiver is on the right-hand side of the parallel. By making explicit the phase that we expect to begin with each barrier synchronisation, we both make our code clear (you can see which part is in the discover phase, and which part is in the act phase) and also effectively assert correctness; if the pattern-match fails, your code will produce an error.
Updating the position of the node based on its neighbours is all pure code. This is not a very sophisticated algorithm, but it will suffice for the purposes of illustration:
```haskell
updatePos poss cur
  = cur + (0.05 * average [let v = p - cur in v - ideal * normalise v
                          | p <- poss])
  where
    ideal = 0.3
    normalise (NodeInfo x y) = NodeInfo (x / mag) (y / mag)
      where mag = sqrt (x*x + y*y)
    average xs = sum xs / fromIntegral (length xs)
```
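One property worth checking: a node already at the ideal distance (0.3) from its only neighbour should be a fixed point of the update. The sketch below is standalone, so it redefines NodeInfo over Float with the element-wise instances assumed earlier in the post, and copies updatePos verbatim:

```haskell
data NodeInfo = NodeInfo Float Float deriving (Show, Eq)

instance Num NodeInfo where
  NodeInfo a b + NodeInfo c d = NodeInfo (a + c) (b + d)
  NodeInfo a b - NodeInfo c d = NodeInfo (a - c) (b - d)
  NodeInfo a b * NodeInfo c d = NodeInfo (a * c) (b * d)
  abs (NodeInfo a b)    = NodeInfo (abs a) (abs b)
  signum (NodeInfo a b) = NodeInfo (signum a) (signum b)
  fromInteger n = NodeInfo (fromInteger n) (fromInteger n)

instance Fractional NodeInfo where
  NodeInfo a b / NodeInfo c d = NodeInfo (a / c) (b / d)
  fromRational r = NodeInfo (fromRational r) (fromRational r)

-- The update rule from the post: spring attraction towards the ideal edge
-- length, averaged over all neighbours.
updatePos :: [NodeInfo] -> NodeInfo -> NodeInfo
updatePos poss cur
  = cur + (0.05 * average [let v = p - cur in v - ideal * normalise v
                          | p <- poss])
  where
    ideal = 0.3
    normalise (NodeInfo x y) = NodeInfo (x / mag) (y / mag)
      where mag = sqrt (x*x + y*y)
    average xs = sum xs / fromIntegral (length xs)

main :: IO ()
main = do
  let NodeInfo dx dy = updatePos [NodeInfo 0.3 0] (NodeInfo 0 0)
  -- a node at the ideal distance from its neighbour barely moves
  print (abs dx < 1e-6 && abs dy < 1e-6)  -- True
```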
The draw process is mainly irrelevant OpenGL logic (adapted from my boids example), but the interesting part is that it must act in the discover phase, partly because that’s the only time that the nodes will send their position, and partly because it’s actually the drawing that drives the frame-rate (a pull-based architecture).
```haskell
drawProcess :: [Shared Chanin NodeInfo] -> Enrolled PhasedBarrier Phase -> CHP ()
drawProcess input bar
  = do displayIO <- embedCHP_ $ do
         syncAndWaitForPhase Discover bar
         xs <- readAll input
         liftIO $ do startFrame
                     mapM_ draw xs
                     mapM_ (drawEdge xs) edges
                     endFrame

       liftIO (do setup
                  displayCallback $= glRunAs2D displayIO
                  let addTimer = addTimerCallback 500 timer
                      timer = addTimer >> postRedisplay Nothing
                  addTimer
                  mainLoop)
  where
    setup = do initialWindowSize $= Size 500 500
               getArgsAndInitialize
               initialDisplayMode $= [DoubleBuffered]
               createWindow "CHP Graph"

    startFrame = do clearColor $= Color4 0 0 0 0
                    clear [ColorBuffer, DepthBuffer]

    endFrame = do flush
                  swapBuffers

glRunAs2D :: IO () -> IO ()
glRunAs2D draw = do
  (matrixMode $= Modelview 0) >> loadIdentity
  (matrixMode $= Projection) >> loadIdentity
  ortho 0 1 0 1 (-1000) 1000
  preservingMatrix draw

draw :: NodeInfo -> IO ()
draw (NodeInfo x y) = renderPrimitive Polygon $ sequence_
  [ vertex $ Vertex2 (x + 0.05 * cos t) (y + 0.05 * sin t)
  | t <- map ((pi/10)*) [0..19]]

drawEdge :: [NodeInfo] -> (Int, Int) -> IO ()
drawEdge nodes (s, e)
  = renderPrimitive Lines $ vertex (Vertex2 x1 y1) >> vertex (Vertex2 x2 y2)
  where (NodeInfo x1 y1, NodeInfo x2 y2) = (nodes !! s, nodes !! e)
```
Finally, we must initialise the nodes and wire up the simulation. For our barrier, we will use the enrollAll_ function that takes a barrier-creation function, a list of processes that take an enrolled barrier as a parameter, and runs them all in parallel with their own enrolled barrier ends (discarding the output). Crucially, enrollAll does the enrolling before any of the processes have begun. If you run your processes in parallel and get them to enroll themselves, you will create a race hazard in your program: one process might enroll and start synchronising by itself before the other processes have started executing. This is almost certainly not what you want. So here is the code:
```haskell
startNodes :: [NodeInfo]
startNodes = [NodeInfo 0 0, NodeInfo 0 1, NodeInfo 1 0, NodeInfo 1 1]

edges :: [(Int, Int)]
edges = [(0,1), (1,2), (2,0), (1, 3)]

main :: IO ()
main = runCHP_ $ do
  outChans <- replicateM numNodes oneToAnyChannel
  enrollAll_ (newPhasedBarrier Act)
    (drawProcess (readers outChans) :
     [ let edgesOut = filter ((== i) . fst) edges
           edgesIn = filter ((== i) . snd) edges
           connectedNodes = map fst edgesIn ++ map snd edgesOut
       in node n (readers (map (outChans !!) connectedNodes)) (writer c)
     | (n, c, i) <- zip3 startNodes outChans [0..]])
  where numNodes = length startNodes
```
The list comprehension uses the edges list to pick out all the right channels for each node (i.e. it translates the connectivity expressed in the edges list into the channel topology). The code in this post forms a complete program. It is not completely effective as I have not added repulsion among non-connected nodes (an exercise for the reader perhaps), but here is a quick screenshot of the result:
My intention with this example was to illustrate the use of shared channels, and particularly barriers. The pattern shown here, of dividing simulations into phases, is one of their most common uses but they can be used elsewhere, sometimes in place of channels; from a more abstract perspective, channels in CHP offer synchronisation and communication, whereas barriers offer purely synchronisation. A one-to-one channel carrying the unit type is semantically equivalent to a two-party barrier with no phase information. The channel has the benefit of not needing explicit enrollment, but the disadvantage of being asymmetric in its use. For example, picking up and putting down forks in the dining philosophers example can be implemented using either two-party barriers or channels carrying the unit type.
Note: As often with my recent posts, writing them revealed that I lacked certain useful helper functions, so you will need the new CHP 1.7.0 (which also includes other changes I will discuss in future) for the above code.