In my last post I showed the infrastructure and logic of a simple simulation of some sticky blood platelets moving through a blood vessel. I added the video to the end of that post; in this post I will explain how the visualisation was implemented.
In most CHP simulations there is a barrier (typically named tick) that is used to keep all the simulation entities synchronised, so that they all proceed through the discrete time-steps together. When it comes to visualising simulations, we could add a separate channel through which to report the status of all the simulation entities to a central drawing process, but this feels like a waste when all the processes are already synchronising together. CHP has reduce channels (many-to-one) that are the opposite of (one-to-many) broadcast channels; in a reduce channel, all the writers send in their data to a single reader, and all must send together before the communication is complete. So a simple way to achieve visualisation is to transform the tick barrier into a reduce channel, which has the same synchronisation behaviour, but that also allows data to be sent to some drawing process.
The simulation entities don’t need to know the details of all this, so we hide it using a send action that I’ve called a tick action:
type TickAction = SendAction (Maybe (Platelet, Bool))

data Platelet = Platelet GLfloat
Note that I’ve also had to tweak the type in the platelet from last time, to make working with the OpenGL library a bit easier. The only change to the site process (and the plateletGenerator and plateletPrinter processes) relates to the tick event becoming an action. Here is the site process in full:
site :: Chanin (Maybe Platelet) -> Chanout (Maybe Platelet) -> TickAction -> CHP ()
site prevIn nextOut act = foreverFeed (maybe empty full) Nothing
  where
    tickWith = sendAction act

    empty = join . fst <$> offer
              ((once $ readChannel prevIn)
               `alongside` (once $ writeChannel nextOut Nothing)
               `alongside` (endWhen $ tickWith Nothing))

    full platelet
      = do r <- liftIO $ randomRIO (0, 1 :: Double)
           let moving = r >= 0.05
               tick = tickWith (Just (platelet, moving))
               mv = readChannel prevIn <&> writeChannel nextOut (Just platelet)
               probablyMove = if moving then fst <$> mv else stop
           fromMaybe (Just platelet) . fst <$>
             offer (once probablyMove `alongside` endWhen tick)
Synchronising on the tick barrier has become engaging in the tick event, where we must pass in that optional pair Maybe (Platelet, Bool); the first item is the platelet currently occupying the site, and the boolean indicates whether the platelet was willing to move on this time-step. If the site is empty, the Nothing value is sent.
The status values end up passed to the draw function, which is fairly boring (it draws a vertical bar) — we are only interested in its type:
data ScreenLocation = ScreenLocation Int
  deriving (Eq, Ord)

draw :: ScreenLocation -> Maybe (Platelet, Bool) -> IO ()
This is called from the drawProcess:
drawProcess :: ReduceChanin (Map.Map ScreenLocation (Maybe (Platelet, Bool))) -> CHP ()
drawProcess input
  = do displayIO <- embedCHP_ $
         do x <- readChannel input
            liftIO $ do startFrame
                        mapM_ (uncurry draw) (Map.toList x)
                        endFrame
       ...
Note that drawProcess takes the input end of a reduce channel which carries a map from ScreenLocation (a one-dimensional location in our simple example) to the Maybe (Platelet, Bool) values we saw. Reduce channels in CHP must carry a monoid type, because the monoid instance is used to join all the many values from the writers into a single value (using mappend/mconcat, but in a non-deterministic order — so make sure the monoid is commutative).
The Map type (from Data.Map) is a monoid that has union as its mappend operation. This is exactly what we want; each site process will send in a singleton Map with their specific screen location mapped to their current status, and using the monoid instance these maps will all be joined (quite safely, since each sender will have a different location, and hence a distinct key entry in its Map) into one big map, that can then be fed into the draw function as we saw above.
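The combining step can be tried outside CHP with ordinary Data.Map values. In this sketch the Int keys and String statuses merely stand in for ScreenLocation and the real status type, but the mconcat call is exactly how the monoid instance joins the writers' contributions:

```haskell
import qualified Data.Map as Map

main :: IO ()
main = do
  -- Each writer contributes a singleton map keyed by its location
  -- (Int and String stand in for the real types in this sketch):
  let contributions = [ Map.singleton (0 :: Int) "empty"
                      , Map.singleton 1 "full, moving"
                      , Map.singleton 2 "full, stuck"
                      ]
      -- The reduce channel joins these with mconcat; every key is
      -- distinct, so the union loses nothing and the non-deterministic
      -- combination order does not matter:
      combined = mconcat contributions
  print (Map.toList combined)
```

Since each sender uses a different key, the result is one map with all three entries, regardless of the order in which the singletons were combined.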
We don’t trouble the site process with knowing its location; instead, we wrap up the location in the send action. It is easy to construct send actions that apply a function to a value before it is sent, so we apply a function that takes a status value, and turns it into the singleton Map just discussed. This is all done as part of the wiring up of the process network, based on the version from last time:
main :: IO ()
main = runCHP_ $ do
  status <- manyToOneChannel
  pipelineConnectCompleteT
    (enrollAll (return $ writer status) . zipWith withSend locationList)
    plateletGenerator (replicate numSites site) plateletPrinter
    <|*|> drawProcess (reader status)
  where
    numSites = screenSize - 2
    locationList = map ScreenLocation [0 .. screenSize - 1]
    withSend k p c = p $ makeSendAction' c (Map.singleton k)
The withSend function does the wrapping of the modification function with the send action. Each site is given a location from the list, including the generator and printer processes; otherwise this function is the same as the version from part 1.
I’ve omitted the OpenGL code, which is much the same as my previous examples. But here, again, is the video showing the results of the visualisation:
Turning the tick barrier into a reduce channel is often an easy way to visualise a simulation, and doesn’t require too much change to the code. As I said last time, the video is nothing compared to the TUNA videos which are very impressive, and some of which were generated by distributing the code over a cluster — a topic I hope to come to in CHP at some point in the future.
This post brings together concepts from several recent posts, including behaviours, conjunction and channel wiring. It is based on the work in the completed TUNA project, which has some stunning videos of blood clotting (and some images if you prefer):
I’m going to cut down the TUNA example greatly, to blog-post size. What we are going to be simulating is sticky objects in a pipeline, for example sticky blood platelets moving through a blood vessel. Our platelets will be represented as data that is passed between active “site” processes; our blood vessel will be a one-dimensional pipeline of site processes. Each process will be connected to its neighbours with a channel that can pass platelets. All the processes will be enrolled on the tick barrier, as is customary in CHP simulations.
We’ll begin the code with a helper function (one that I would like to see in the standard library) that iterates forever with a state value being passed through, and our platelet data type:
foreverFeed :: Monad m => (a -> m a) -> a -> m b
foreverFeed f x = f x >>= foreverFeed f

data Platelet = Platelet Float
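Although foreverFeed is aimed at CHP processes, it works in any monad, which gives a cheap way to see the state-threading in action. In the Maybe monad a Nothing from the step function cuts the otherwise-infinite loop short (a toy example of mine, not part of the simulation):

```haskell
foreverFeed :: Monad m => (a -> m a) -> a -> m b
foreverFeed f x = f x >>= foreverFeed f

-- A step function that gives up once the state reaches 5; in Maybe,
-- that Nothing terminates the whole loop:
step :: Int -> Maybe Int
step n = if n >= 5 then Nothing else Just (n + 1)

main :: IO ()
main = print (foreverFeed step 0 :: Maybe ())  -- prints Nothing
```

The state is threaded 0, 1, 2, … through successive calls until the step fails; in CHP the same shuttling carries each site's occupancy from one time-step to the next.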
The data in the platelet type is a colour value that I will use for visualisation in the next part of this guide. This is set randomly by the platelet generator at the beginning of the pipeline:
plateletGenerator :: Chanout (Maybe Platelet) -> EnrolledBarrier -> CHP ()
plateletGenerator out tick = forever $ on >> off >> off
  where
    on = do platelet <- Just . Platelet <$> liftIO (randomRIO (0, 0.5))
            offerAll [ once (writeChannel out platelet)
                     , endWhen (syncBarrier tick) ]
    off = offerAll [ once (writeChannel out Nothing)
                   , endWhen (syncBarrier tick) ]
The pipeline generates platelets regularly, one every three time-steps (coded as the simple on-off-off sequence). When performing an “on” time-step, it generates a platelet with a random shade, then uses behaviours to offer (once) to send the platelet, until the tick happens (i.e. the frame is over). The next site in the pipeline may not take the new platelet if that site is full and not moving this time-step, in which case the platelet is discarded. In the off state, the generator waits for the tick to end the frame, but also offers to tell the site ahead of it that the generator is empty (signified by sending Nothing rather than Just a platelet).
The main logic is in the site process, which also has two states, empty and full:
site :: Chanin (Maybe Platelet) -> Chanout (Maybe Platelet) -> EnrolledBarrier -> CHP ()
site prevIn nextOut tick = foreverFeed (maybe empty full) Nothing
  where
    empty :: CHP (Maybe Platelet)
    full :: Platelet -> CHP (Maybe Platelet)
Each time, if there is Just a platelet returned by the function, the next state will be full, otherwise it will be empty. The initial state is empty (the Nothing value). The empty state consists of three possible behaviours:
empty = join . fst <$> offer
          ((once $ readChannel prevIn)
           `alongside` (once $ writeChannel nextOut Nothing)
           `alongside` (endWhen $ syncBarrier tick))
In an empty state, a site will read in a new platelet from the previous site in the pipeline (if available), it will offer to communicate to the next site in the pipeline that it is empty, and it will finish this behaviour when the tick event happens. The value returned is the result of reading from the channel, which will be Nothing if no read occurred or if we read in a Nothing value (and hence the site remains empty) or Just the result of the read if it did happen and was a platelet (in which case the site will become full). It is possible to reduce the amount of communications happening with empty processes, but I want to keep this example simple if I can.
The full state is as follows:
full platelet
  = do r <- liftIO $ randomRIO (0, 1 :: Double)
       let move = readChannel prevIn <&> writeChannel nextOut (Just platelet)
           probablyMove = if r < 0.05 then stop else fst <$> move
       fromMaybe (Just platelet) . fst <$>
         offer (once probablyMove `alongside` endWhen (syncBarrier tick))
We will pick this code apart, bit by bit. It is primarily an offer between the tick to end the frame and another behaviour, called probablyMove. When the site is full, it has a 5% chance of refusing to do anything, meaning that a single platelet will not move in 5% of time-steps. So it starts by generating a random number between 0 and 1. If this is under 0.05 (i.e. a 5% chance), the probablyMove behaviour is stop, meaning it will not move — the site will just wait for the end of the frame in these 5% of cases.
In the other 95% of the time-steps, a move is offered, using conjunction. The site offers to read a value from the previous channel (which may be Just a platelet, or a Nothing value signifying the site was empty) and send on its current platelet, shuffling the platelets along the pipeline. So its overall behaviour is that it will send on its current platelet, if and only if: the previous site is empty, or the previous site is full and willing to send its platelet on (it won’t be willing 5% of the time). So a platelet can only move if there is no-one behind it, or if the platelet behind it moves too.
The implications of this behaviour are that once platelets are in adjoining cells, they only move on together. Thus any platelets that bump together form a notional clot that stays together forever after. This clot is not explicitly programmed and has no representation in the program. It is emergent behaviour that arises out of the local rules of the site process; each site only communicates with the site either side of it, and yet the program logic means that clots that are tens of platelets long could form, and would be unbreakable.
The other neat thing that arises out of the logic comes from the 5% chance. In 5% of time-steps a platelet will not move. (This is what allows the platelets to bump together in the first place.) Since a clot can only move when all its platelets move, a two-platelet clot has a roughly 10% chance of not moving (really: 1 - 0.95^2), and a three-platelet clot has about a 14% chance of not moving (1 - 0.95^3). So big clots will move slower, which means that the platelets behind become more likely to join on. Despite only having a local probability of not moving, we get the behaviour that larger clots are less likely to be able to move.
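The arithmetic generalises: if each platelet independently moves with probability 0.95, an n-platelet clot stalls with probability 1 - 0.95^n. A quick sketch to check the figures quoted above (stallChance is my name, not from the simulation code):

```haskell
-- Chance that a clot of n platelets fails to move in one time-step:
-- the clot moves only if every platelet moves, each independently
-- with probability 0.95.
stallChance :: Int -> Double
stallChance n = 1 - 0.95 ^ n

main :: IO ()
main = mapM_ report [1, 2, 3]
  where report n = putStrLn (show n ++ " platelet(s): " ++ show (stallChance n))
```

For n = 2 and n = 3 this gives roughly 0.0975 and 0.1426, matching the 10% and 14% mentioned above.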
Enough on the site process; at the end of the pipeline is a platelet printer, that swallows up any platelets and prints out how large each clot was that reached the end of the pipeline:
plateletPrinter :: Chanin (Maybe Platelet) -> EnrolledBarrier -> CHP ()
plateletPrinter input tick = foreverFeed plateletPrinter' []
  where
    plateletPrinter' ps
      = do mp <- join . fst <$> offer
                   (once (readChannel input) `alongside` endWhen (syncBarrier tick))
           let ps' = ps ++ [mp]
               (test, text) = maybe (isNothing, "Blank") (const (isJust, "Clot")) (head ps')
               (chunk, rest) = span test ps'
           if null chunk || null rest
             then return ps'
             else do let status = text ++ ", size: " ++ show (length chunk)
                     liftIO $ putStrLn status
                     return rest
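The chunk-counting in plateletPrinter' is pure apart from the channel read, so it can be exercised on an ordinary list. Here it is pulled out as a standalone function (chunkStatus is my extraction for this sketch, not a name from the code above), applied to a buffer with a three-platelet clot at its head:

```haskell
import Data.Maybe (isJust, isNothing)

-- The printer's chunking step: look at the head of the buffer to decide
-- whether we are counting a clot (Just values) or a blank stretch
-- (Nothing values), peel off that leading run with span, and report it
-- along with the unconsumed remainder.
chunkStatus :: [Maybe Char] -> (String, [Maybe Char])
chunkStatus ps = (text ++ ", size: " ++ show (length chunk), rest)
  where
    (test, text) = maybe (isNothing, "Blank") (const (isJust, "Clot")) (head ps)
    (chunk, rest) = span test ps

main :: IO ()
main = print (chunkStatus [Just 'a', Just 'b', Just 'c', Nothing, Just 'd'])
```

The three leading platelets are reported as one clot of size 3, and the rest of the buffer is kept for the next time-step, just as the real printer returns rest as its new state.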
And finally we must wire up all of this. Thankfully, our new connectable helper functions make this quite easy, and short:
main :: IO ()
main = runCHP_ $
  pipelineConnectCompleteT (enrollAll newBarrier)
    plateletGenerator (replicate numSites site) plateletPrinter
  where
    numSites = 100
If we compile and run this program, we get a print-out of clot sizes:
Blank, size: 103
Clot, size: 1
Blank, size: 51
Clot, size: 4
Blank, size: 2
Clot, size: 1
That is terribly unexciting, so I’m going to give a sneak video preview of a visualisation that I will go through in my next post. The 1D pipeline of sites is visualised left-to-right, with each tall bar being a platelet (or black for empty). When the bar flashes white, this is in the 5% of cases where the platelet is refusing to move. Hence you can see that when the larger clots form, the white flashes of the various platelets prevent the clot from moving:
In my last post, I used barriers and shared channels to write a simple program for performing force-based graph layout. There were two things lacking in that example: firstly, repulsion between unconnected nodes and secondly, some cool videos. I’ve now solved both of these shortcomings.
In order to implement repulsion between all nodes, each node must know the position of all other nodes at every time-step. I could implement this with message-passing, but that would be O(N^2) communications each phase, which isn’t very efficient, and at that point, message-passing is probably not the right design. We need some shared variables instead. Software Transactional Memory (STM) provides a nice way to do shared variable concurrency in Haskell, so we will use that.
The idea is that we will have a transactional variable (TVar) per node, holding the node’s position. Every time-step, each node will read from all the TVars to discover the positions of all the other nodes, and will then update its own position based on the other nodes. If we are not careful, we will have the same race hazard that I warned against last time: one node could update its position before all the other nodes have read its previous position. Or, if left totally unconstrained, one node could perform several updates before other nodes have even performed one. To prevent these problems, we will again use barriers to break up the simulation into two phases: discover, and act. This means that we will be combining CHP and STM: using barriers from CHP to regulate access to shared variables from STM.
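Before mixing in the barriers, the STM side on its own looks like this. The sketch below is sequential (a single "node" run once, with no CHP at all), purely to show the read-everything-then-write shape of the discover and act phases; the one-dimensional positions and the averaging rule are invented for the example:

```haskell
import Control.Concurrent.STM

main :: IO ()
main = do
  -- One TVar per node, each holding a (here, one-dimensional) position:
  tvs <- mapM newTVarIO [0.0, 1.0, 2.0 :: Double]
  -- Discover phase: read every node's current position.
  positions <- atomically (mapM readTVar tvs)
  -- Act phase: write the first node's new position (for this sketch,
  -- simply the mean of the other nodes' positions):
  let others = tail positions
      new = sum others / fromIntegral (length others)
  atomically (writeTVar (head tvs) new)
  readTVarIO (head tvs) >>= print  -- prints 1.5
```

In the real program the reads and the writes happen in separate barrier-delimited phases across all nodes concurrently, which is what stops any node writing its new position before every other node has read its old one.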
All the code before the node process is exactly the same as last time, except that I now import Control.Concurrent.STM and Control.Parallel.Strategies (for some strict application) — and I’ve made the nodes a bit smaller. Our node process no longer takes channels as its parameters, but TVars instead. It takes a list corresponding to the other nodes, of pairs of booleans (indicating if the other node is connected to this one) and transactional variables (to read the other node’s position). It also takes a single transactional variable, which it should use to write its own position to:
node :: NodeInfo -> [(Bool, TVar NodeInfo)] -> TVar NodeInfo -> Enrolled PhasedBarrier Phase -> CHP ()
node start neighbours me bar = node' start
  where
    (connected, tvs) = unzip neighbours

    node' cur
      = do Discover <- syncBarrier bar
           neighbourPos <- liftSTM (mapM readTVar tvs)
           let new = updatePos (zip connected neighbourPos) cur
           Act <- syncBarrier bar
           liftSTM (writeTVar me (new `using` rnf))
           node' new
You can see that the node process is a bit simpler. In the Discover phase it reads from the TVars and in the Act phase it writes to its own. It is immediately apparent that the reads and writes are thus segregated. liftSTM is defined below (it is essentially the atomically function), and the `using` rnf is a strictness thing that you can ignore. The updatePos function is now more complicated as we have added repulsion (though it is all irrelevant to the concurrent logic):
updatePos poss cur
  = sum [ cur
        , 0.05 * sum [ let v = p - cur in v - ideal * normalise v
                     | (True, p) <- poss ]                   -- connected nodes
        , 0.000002 * sum [ let sc = (p `from` cur) ^^ (-2)
                           in NodeInfo sc sc * normalise (cur - p)
                         | (_, p) <- poss ]                  -- all nodes
        ]
  where
    ideal = 0.02
    (NodeInfo x2 y2) `from` (NodeInfo x1 y1) = sqrt $ (x2-x1)^^2 + (y2-y1)^^2

normalise (NodeInfo x y) = fmapNodeInfo (/ mag) (NodeInfo x y)
  where mag = sqrt (x*x + y*y)

instance NFData GLfloat
instance NFData NodeInfo where
  rnf (NodeInfo a b) = rnf a >| rnf b

liftSTM :: MonadIO m => STM a -> m a
liftSTM = liftIO . atomically
I’ve also had to add an instance of NFData to use rnf (strict evaluation) on NodeInfo, and the simple liftSTM function for executing STM transactions in the CHP monad. The draw process only has one change; instead of reading from a list of channels, we now read from a list of transactional variables:
drawProcess :: [TVar NodeInfo] -> Enrolled PhasedBarrier Phase -> CHP ()
drawProcess tvs bar
  = do displayIO <- embedCHP_ $
         do syncAndWaitForPhase Discover bar
            xs <- liftSTM $ mapM readTVar tvs
            liftIO $ do startFrame
                        mapM_ draw xs
                        mapM_ (drawEdge xs) edges
                        endFrame
       ...
Last time, I only had four nodes in my example, which was fairly boring. This time I have one hundred nodes, connected in a 10×10 grid. To make the demo interesting, all the nodes start with random positions:
startNodes :: IO [NodeInfo]
startNodes = replicateM (10*10) $ liftM2 NodeInfo r r
  where r = liftM (fromRational . toRational) $ randomRIO (0 :: Float, 1)

edges :: [(Int, Int)]
edges = across ++ down
  where across = [ (x + 10*y, (x+1) + 10*y) | x <- [0..8], y <- [0..9] ]
        down   = [ (x + 10*y, x + 10*(y+1)) | x <- [0..9], y <- [0..8] ]
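A quick sanity check on that wiring (my own check, not from the original code): a 10×10 grid should have 90 horizontal and 90 vertical edges, all between valid node indices:

```haskell
edges :: [(Int, Int)]
edges = across ++ down
  where across = [ (x + 10*y, (x+1) + 10*y) | x <- [0..8], y <- [0..9] ]
        down   = [ (x + 10*y, x + 10*(y+1)) | x <- [0..9], y <- [0..8] ]

main :: IO ()
main = do
  print (length edges)       -- 90 across + 90 down = 180
  print (all inRange edges)  -- every endpoint is a node index in 0..99
  where inRange (a, b) = a >= 0 && a < 100 && b >= 0 && b < 100
```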
Finally, our slightly adjusted main process:
main :: IO ()
main = runCHP_ $ do
  nodes <- liftIO startNodes
  vars <- liftSTM $ mapM newTVar nodes
  enrollAll_ (newPhasedBarrier Act)
    (drawProcess vars
     : [ let edgesOut = filter ((== i) . fst) edges
             edgesIn  = filter ((== i) . snd) edges
             connectedNodes = map fst edgesIn ++ map snd edgesOut
             conn = [ (ind `elem` connectedNodes, tv)
                    | (tv, ind) <- zip vars [0..], tv /= mytv ]
         in node n conn mytv
       | (n, mytv, i) <- zip3 nodes vars [0..] ])
There are not many changes from my previous shared channel-based version to use STM instead. My continued use of barriers alongside STM shows that you can mix STM and CHP quite nicely if you want to. Now, here’s what you’ve been waiting for — a video of the graph layout in action, first on my grid graph as defined above:
I also ran the algorithm on a simple loop graph:
The two videos have the algorithm running at 10 iterations per second, as that seems like a nice speed to view them at (and apologies for the slight garbage down the right-hand side of the videos). The grid example was inspired by an original example in the Java version of the Prefuse toolkit. Prefuse is an incredibly cool visualisation toolkit; some Java examples can be found online but more impressive are the more recent ActionScript examples. Try the Layouts tab, and also the dependency graph example if you have a spare minute.