Choice over Events using STM

March 4, 2010

I’m currently writing a paper on CHP’s performance for conjunction, which I have been optimising recently. The problem with a new feature like conjunction is that there is nothing else to benchmark it against! But I can benchmark the effect that supporting conjunction has on performance for simple channel communications and other things that don’t feature conjunction.

Two of my comparisons are simple synchronous channels based on MVars and STM. These don’t support choice between events — you can’t choose between writing to two synchronous channels built on MVars or STM without more machinery on top. But they are fast. Another comparison is the CML package, which does support such choice between events — the performance of CML merits its own post some time (in short: fine normally, but terrible if you use its choice operator a lot — unless I have made a mistake in the benchmark).

I also wanted to benchmark an implementation that supported choice but not conjunction, based on STM. Version 1.0.0 of CHP fulfils this criterion, but it was badly designed and totally unoptimised — and I know from my recent optimisations how bad the performance might be. So I constructed an optimised version of channels with choice but no conjunction. I was surprised at how short the algorithm was, and realised that it could be explained in a blog post. So here it is.

Implementing Event Choice with STM

Let’s be clear on the problem, first. I want an Event type such that I can say “wait for event A or event B, whichever happens first, but only engage in one of them”. Then I want someone else to be able to concurrently say “wait for event B or event C or event D, whichever happens first, but only engage in one of them” and have the algorithm resolve it all. STM doesn’t achieve this by itself; you can use orElse to choose between writing to two variables, but that doesn’t suffice for multiple people engaging in events with each other.

We begin with a helper function — one of those functions that is general enough that it might almost feature in a standard library. Here it is:

-- | Executes the actions until it finds one that returns True (at which point
-- it will execute no further actions).  Returns True if an action did, False
-- if none of them did.
anyM :: Monad m => [m Bool] -> m Bool
anyM = foldM orM False
  where
    orM True _ = return True
    orM False m = m
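
As a quick illustration (demoAnyM is my own example, not from the post), anyM runs the actions left to right and stops as soon as one returns True, so later actions are never executed (anyM itself relies on foldM from Control.Monad):

-- Illustration only: prints "first" and "second" but never "third",
-- because the second action returns True and anyM short-circuits.
demoAnyM :: IO ()
demoAnyM = do r <- anyM [ putStrLn "first"  >> return False
                        , putStrLn "second" >> return True
                        , putStrLn "third"  >> return False ]
              print r  -- prints True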

Next we’ll declare our data types. Our Event contains a constant enrollment count (the number of processes required to synchronise together), and a transactional variable holding a list of current offers, each with an associated STM action. An Offer in turn holds a ThreadId (used purely as a unique identifier) and the list of events being offered on; think of an offer as saying: I offer to engage in exactly one of the events in this list, and I’m waiting until I do:

data Offer = Offer { offerThreadId :: ThreadId, offerEvents :: [Event] }
instance Eq Offer where (==) = (==) `on` offerThreadId

data Event = Event { enrollCount :: Int, offersTV :: TVar [(STM (), Offer)] }

Adding an offer to an event is as simple as adding it to the list of offers. My modifyTVar' function has type (a -> a) -> TVar a -> STM () and applies the modification function to the contents of the TVar, but it adds a little strictness that helps performance:

recordOffer :: Offer -> (STM (), Event) -> STM ()
recordOffer o (act, e) = modifyTVar' ((act, o):) (offersTV e)
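
The modifyTVar' helper itself is not shown in the post; a minimal sketch matching the description above might look like this (the argument order follows the post, and differs from the modifyTVar' that later appeared in the stm package):

-- Assumed sketch, not from the post: apply a function to a TVar's contents,
-- forcing the new value to weak head normal form before writing it back,
-- which avoids building up a chain of thunks in the variable.
modifyTVar' :: (a -> a) -> TVar a -> STM ()
modifyTVar' f tv = do x <- readTVar tv
                      writeTVar tv $! f x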

We also define a function for checking if an event is able to complete (when we are making an offer on it). This takes an event, and an action to perform if the event can complete. It then reads the current offers from the event’s transactional variable. If the enrollment count is equal to the number of current offers plus one (the offer now being made), it can complete. Completion involves performing all the associated STM actions, and then using a revoke function to remove the offers (which have now chosen this event, say: A) from all the events that they had offered on (e.g. event A, event B, event C):

checkComplete :: (STM (), Event) -> STM Bool
checkComplete (act, e)
  = do offers <- readTVar (offersTV e)
       if enrollCount e /= length offers + 1
         then return False
         else do sequence_ (act : map fst offers)
                 mapM_ (revoke . snd) offers
                 return True

revoke :: Offer -> STM ()
revoke off = mapM_ (modifyTVar' removeUs . offersTV) (offerEvents off)
  where
    removeUs = filter ((/= off) . snd)

We only require one further function. This function, offerAll, handles the creation of an offer, checks if any of the events in the offer can complete immediately, and otherwise records the offer with each event and then waits for one of them to be completed by a later participant. It must use two transactions for this: one to make the offers (this transaction needs to finish for the offers to be visible to the other participants) and one to wait for an event to be completed. A crucial part of the function is not just knowing that an offer completed, but also knowing which one. For this we construct a TVar of our own into which a result can be written. This starts off as Nothing, and we later wait for it to become a Just value. We augment the user-supplied action-on-completion to also write into this TVar. The design of the algorithm as a whole ensures that this variable will only ever be written to once. Here is offerAll:

offerAll :: [(STM (), Event, a)] -> IO a
offerAll off
  = do tid <- myThreadId
       rtv <- atomically $ checkAll tid
       atomically $ readTVar rtv >>= maybe retry return    
  where
    checkAll tid
      = do rtv <- newTVar Nothing
           let offer = [(act >> writeTVar rtv (Just x), e) | (act, e, x) <- off]
           complete <- anyM (map checkComplete offer)
           unless complete $
              mapM_ (recordOffer (Offer tid [e | (_, e, _) <- off])) offer
           return rtv

This is all that is needed for events with choice at both ends. You call offerAll with a list of offers, and it gives you back the value you associated with whichever offer was chosen.
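
As a hypothetical illustration (demoOfferAll is not part of the post), two threads can synchronise on a single two-party event by each calling offerAll with just that event; each call blocks until both offers are present, and each gets back the value it attached to its own offer:

-- Illustration only; needs forkIO from Control.Concurrent and MVars from
-- Control.Concurrent.MVar to collect the forked thread's result.
demoOfferAll :: IO ()
demoOfferAll
  = do ev <- atomically (fmap (Event 2) (newTVar []))  -- event needing two parties
       done <- newEmptyMVar
       _ <- forkIO $ offerAll [(return (), ev, "child")] >>= putMVar done
       r <- offerAll [(return (), ev, "main")]
       r' <- takeMVar done
       putStrLn (r ++ " and " ++ r' ++ " synchronised")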

The Public API

To wrap this into a communication channel with a CML-like API, we wrap it up as follows. First we declare an SEvent type (named after CML, hence the re-use of the term event for another meaning) that represents a synchronisation action; this is a list (of choices), each containing an internal event, an action to perform during the completion of the offer, and one to perform afterwards that will yield a return value (which we can use for a Functor instance):

data SEvent a = SEvent { sEvent :: [((STM (), STM a), Event)] }

instance Functor SEvent where
  fmap f (SEvent es) = SEvent [((dur, fmap f aft), e) | ((dur, aft), e) <- es]

choose :: [SEvent a] -> SEvent a
choose = SEvent . concatMap sEvent

You can see that the choose function simply joins lists of choices together. We define our synchronisation function using offerAll, which will return the corresponding afterwards-STM-action for the chosen event, which we then execute using atomically:

sync :: SEvent a -> IO a
sync (SEvent es) = offerAll [(dur,e,aft) | ((dur,aft),e) <- es] >>= atomically

Finally we can define a type for a synchronous communication channel, SChannel, that joins together an event (the internal kind) and a transactional variable for passing the value:

data SChannel a = SChannel Event (TVar a)

send :: SChannel a -> a -> SEvent ()
send (SChannel e ctv) x = SEvent [((writeTVar ctv x, return ()), e)]

recv :: SChannel a -> SEvent a
recv (SChannel e ctv) = SEvent [((return (), readTVar ctv), e)]

The send function puts the value to send into the variable during the original event completion, and then afterwards the reader takes the value out of the variable at its leisure. (The implementation assumes that the same participants will use the channel each time; an extra level of indirection could be added to make the implementation more flexible in this regard.)
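
Here is a hypothetical end-to-end example. The newSChannel constructor below is assumed rather than taken from the post: it simply pairs a two-party Event with a TVar holding a placeholder that is never forced, because recv only reads the variable after a send has written to it:

-- Assumed constructor, not from the post: a channel is a two-party event
-- plus a transactional variable to carry the communicated value.
newSChannel :: IO (SChannel a)
newSChannel = atomically $
  do offers <- newTVar []
     val <- newTVar (error "SChannel: no value communicated yet")
     return (SChannel (Event 2 offers) val)

-- Hypothetical usage: the forked thread offers to send on either channel,
-- while the main thread offers only to receive on channel c, so the event
-- belonging to c is the one that completes.
demoChannels :: IO ()
demoChannels
  = do c <- newSChannel
       d <- newSChannel
       _ <- forkIO $ sync (choose [send c "via c", send d "via d"])
       msg <- sync (recv c)
       putStrLn msg   -- prints "via c"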

The code in this post provides nearly the same functionality as the CML library, but my tests indicate it is faster. I have now uploaded this code (with some documentation) as the sync package on Hackage. This provides a useful “lite” alternative to CHP that runs in the IO monad, and an alternative implementation of most of the features of the CML package.


Graph Layout with Software Transactional Memory and Barriers (plus video!)

November 26, 2009

In my last post, I used barriers and shared channels to write a simple program for performing force-based graph layout. There were two things lacking in that example: firstly, repulsion between unconnected nodes, and secondly, some cool videos. I’ve now addressed both of these shortcomings.

In order to implement repulsion between all nodes, each node must know the position of all other nodes at every time-step. I could implement this with message-passing, but that would be O(N^2) communications each phase, which isn’t very efficient, and at that point, message-passing is probably not the right design. We need some shared variables instead. Software Transactional Memory (STM) provides a nice way to do shared variable concurrency in Haskell, so we will use that.

The idea is that we will have a transactional variable (TVar) per node, holding the node’s position. Every time-step, each node will read from all the TVars to discover the positions of all the other nodes, and will then update its own position based on the other nodes. If we are not careful, we will have the same race hazard that I warned against last time: one node could update its position before all the other nodes have read its previous position. Or, if left totally unconstrained, one node could perform several updates before other nodes have even performed one. To prevent these problems, we will again use barriers to break up the simulation into two phases: discover, and act. This means that we will be combining CHP and STM: using barriers from CHP to regulate access to shared variables from STM.

All the code before the node process is exactly the same as last time, except that I now import Control.Concurrent.STM and Control.Parallel.Strategies (for some strict application) — and I’ve made the nodes a bit smaller. Our node process no longer takes channels as its parameters, but TVars instead. It takes a list corresponding to the other nodes, of pairs of booleans (indicating whether the other node is connected to this one) and transactional variables (for reading the other node’s position). It also takes a single transactional variable into which it should write its own position:

node :: NodeInfo -> [(Bool, TVar NodeInfo)] -> TVar NodeInfo
     -> Enrolled PhasedBarrier Phase -> CHP ()
node start neighbours me bar = node' start
  where
    (connected, tvs) = unzip neighbours
    
    node' cur
      = do Discover <- syncBarrier bar
           neighbourPos <- liftSTM (mapM readTVar tvs)
           let new = updatePos (zip connected neighbourPos) cur
           Act <- syncBarrier bar
           liftSTM (writeTVar me (new `using` rnf))
           node' new

You can see that the node process is a bit simpler. In the Discover phase it reads from the TVars and in the Act phase it writes to its own. It is immediately apparent that the reads and writes are thus segregated. liftSTM is defined below (it is essentially the atomically function), and the `using` rnf is a strictness thing that you can ignore. The updatePos function is now more complicated as we have added repulsion (though it is all irrelevant to the concurrent logic):

      
    updatePos poss cur
      = sum [cur
            ,0.05 * sum [ let v = p - cur in v - ideal * normalise v
                        | (True, p) <- poss] -- connected nodes
            ,0.000002 * sum [let sc = (p `from` cur) ^^ (-2)
                             in NodeInfo sc sc * normalise (cur - p)
                            | (_, p) <- poss] -- all nodes
            ]
      where
        ideal = 0.02
        (NodeInfo x2 y2) `from` (NodeInfo x1 y1)
          = sqrt $ (x2-x1)^^2 + (y2-y1)^^2
        normalise (NodeInfo x y) = fmapNodeInfo (/ mag) (NodeInfo x y)
          where
            mag = sqrt (x*x + y*y)

instance NFData GLfloat
instance NFData NodeInfo where
  rnf (NodeInfo a b) = rnf a >| rnf b

liftSTM :: MonadIO m => STM a -> m a
liftSTM = liftIO . atomically

I’ve also had to add an instance of NFData to use rnf (strict evaluation) on NodeInfo, and the simple liftSTM function for executing STM transactions in the CHP monad. The draw process only has one change; instead of reading from a list of channels, we now read from a list of transactional variables:

drawProcess :: [TVar NodeInfo] -> Enrolled PhasedBarrier Phase -> CHP ()
drawProcess tvs bar
 = do displayIO <- embedCHP_ $ do syncAndWaitForPhase Discover bar
                                  xs <- liftSTM $ mapM readTVar tvs
                                  liftIO $ do startFrame
                                              mapM_ draw xs
                                              mapM_ (drawEdge xs) edges
                                              endFrame
...

Last time, I only had four nodes in my example, which was fairly boring. This time I have one hundred nodes, connected in a 10×10 grid. To make the demo interesting, all the nodes start with random positions:

startNodes :: IO [NodeInfo]
startNodes = replicateM (10*10) $ liftM2 NodeInfo r r
  where
    r = liftM (fromRational . toRational) $ randomRIO (0 :: Float, 1)

edges :: [(Int, Int)]
edges = across ++ down
  where
    across = [(x+10*y, (x+1)+10*y) | x <- [0..8], y <- [0..9]]
    down = [(x+10*y, x+10*(y+1)) | x <- [0..9], y <- [0..8]]

Finally, our slightly adjusted main process:

main :: IO ()
main = runCHP_ $
       do nodes <- liftIO startNodes
          vars <- liftSTM $ mapM newTVar nodes
          enrollAll_ (newPhasedBarrier Act)
           (drawProcess vars :
            [ let edgesOut = filter ((== i) . fst) edges
                  edgesIn = filter ((== i) . snd) edges
                  connectedNodes = map fst edgesIn ++ map snd edgesOut
                  conn = [(ind `elem` connectedNodes, tv)
                         | (tv, ind) <- zip vars [0..], tv /= mytv]
              in node n conn mytv
            | (n, mytv, i) <- zip3 nodes vars [0..]
            ])

There are not many changes from my previous shared channel-based version to use STM instead. My continued use of barriers alongside STM shows that you can mix STM and CHP quite nicely if you want to. Now, here’s what you’ve been waiting for — a video of the graph layout in action, first on my grid graph as defined above:


Choose from WMV format (suitable for Windows) or MPEG format (suitable for Linux, etc).

I also ran the algorithm on a simple loop graph:


Choose from WMV format (suitable for Windows) or MPEG format (suitable for Linux, etc).

The two videos have the algorithm running at 10 iterations per second, as that seems like a nice speed to view them at (and apologies for the slight garbage down the right-hand side of the videos). The grid example was inspired by an original example in the Java version of the Prefuse toolkit. Prefuse is an incredibly cool visualisation toolkit; some Java examples can be found online but more impressive are the more recent ActionScript examples. Try the Layouts tab, and also the dependency graph example if you have a spare minute.
