Choice over Events using STM

March 4, 2010

I’m currently writing a paper on CHP’s performance for conjunction, which I have been optimising recently. The problem with a new feature like conjunction is that there is nothing else to benchmark it against! But I can benchmark the effect that supporting conjunction has on performance for simple channel communications and other things that don’t feature conjunction.

Two of my comparisons are simple synchronous channels based on MVars and STM. These don’t support choice between events — you can’t choose between writing to two synchronous channels built on MVars or STM without more machinery on top. But they are fast. Another comparison is the CML package, which does support such choice between events — the performance of CML merits its own post some time (in short: fine normally, but terrible if you use its choice operator a lot — unless I have made a mistake in the benchmark).

I also wanted to benchmark an implementation that supported choice but not conjunction, based on STM. Version 1.0.0 of CHP fulfils this criterion, but was badly designed and totally unoptimised — and I know from my recent optimisations how bad the performance might be. So I constructed an optimised version of channels with choice but no conjunction. I was surprised at how short the algorithm was, and realised that it could be explained in a blog post. So here it is.

Implementing Event Choice with STM

Let’s be clear on the problem, first. I want an Event type such that I can say “wait for event A or event B, whichever happens first, but only engage in one of them”. Then I want someone else to be able to concurrently say “wait for event B or event C or event D, whichever happens first, but only engage in one of them” and have the algorithm resolve it all. STM doesn’t achieve this by itself; you can use orElse to choose between writing to two variables, but that doesn’t suffice for multiple people engaging in events with each other.

We begin with a helper function — one of those functions that is general enough that it might almost feature in a standard library. Here it is:

-- | Executes the actions until it finds one that returns True (at which point
-- it will execute no further actions).  Returns True if an action did, False
-- if none of them did.
anyM :: Monad m => [m Bool] -> m Bool
anyM = foldM orM False
  where
    orM True _ = return True
    orM False m = m

Next we’ll declare our data-types. Our Event contains a constant enrollment count (the number of processes required to synchronise together), and a transactional variable holding a list of current offers, each with an associated STM action. An offer is in turn a ThreadId (used as a unique identifier) paired with a list of events; think of an offer as saying: I offer to engage in exactly one of the events in this list, and I’m waiting until I do:

data Offer = Offer { offerThreadId :: ThreadId, offerEvents :: [Event] }
instance Eq Offer where (==) = (==) `on` offerThreadId

data Event = Event { enrollCount :: Int, offersTV :: TVar [(STM (), Offer)] }

Adding an offer to an event is as simple as adding it to the list of offers. My modifyTVar' function has type (a -> a) -> TVar a -> STM () and applies the modification function to the contents of the TVar, but it adds a little strictness that helps performance:

recordOffer :: Offer -> (STM (), Event) -> STM ()
recordOffer o (act, e) = modifyTVar' ((act, o):) (offersTV e)
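
For completeness, here is one possible definition of such a modifyTVar'. This is my guess at a definition that fits the type above, not code taken from the library; the ($!) forces the new value before it is written, which is the little strictness mentioned:

modifyTVar' :: (a -> a) -> TVar a -> STM ()
modifyTVar' f tv = do x <- readTVar tv
                      writeTVar tv $! f x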

We also define a function for checking if an event is able to complete (when we are making an offer on it). This takes an event, and an action to perform if the event can complete. It then reads the current offers from the event’s transactional variable. If the enrollment count is equal to the number of current offers plus one (the offer now being made), it can complete. Completion involves performing all the associated STM actions, and then using a revoke function to remove the offers (which have now chosen this event, say: A) from all the events that they had offered on (e.g. event A, event B, event C):

checkComplete :: (STM (), Event) -> STM Bool
checkComplete (act, e)
  = do offers <- readTVar (offersTV e)
       if enrollCount e /= length offers + 1
         then return False
         else do sequence_ (act : map fst offers)
                 mapM_ (revoke . snd) offers
                 return True

revoke :: Offer -> STM ()
revoke off = mapM_ (modifyTVar' removeUs . offersTV) (offerEvents off)
  where
    removeUs = filter ((/= off) . snd)

We only require one further function. This function, offerAll, handles the creation of an offer, checks if any of the events in the offer can complete immediately, and otherwise records the offer with each event and then waits for one of the events to be completed by a later participant. It must use two transactions for this; one to make the offers (this transaction needs to finish for the offers to be visible to the other participants) and one to wait for an event to be completed. A crucial part of the function is not just knowing that an offer completed, but also knowing which one. For this we construct a TVar of our own into which a result can be written. This starts off as Nothing, and we later wait for it to become a Just value. We augment the user-supplied action-on-completion to also write into this TVar. The design of the algorithm as a whole ensures that this variable will only ever be written to once. Here is offerAll:

offerAll :: [(STM (), Event, a)] -> IO a
offerAll off
  = do tid <- myThreadId
       rtv <- atomically $ checkAll tid
       atomically $ readTVar rtv >>= maybe retry return    
  where
    checkAll tid
      = do rtv <- newTVar Nothing
           let offer = [(act >> writeTVar rtv (Just x), e) | (act, e, x) <- off]
           complete <- anyM (map checkComplete offer)
           unless complete $
              mapM_ (recordOffer (Offer tid [e | (_, e, _) <- off])) offer
           return rtv

This is all that is needed for events with choice at both ends. You call offerAll with a list of offers and it gives you back the value you associated with that offer.
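
As a small illustration of using offerAll directly, consider the sketch below. The newEventIO helper is an assumption of mine (it does not appear in the code above); it just builds an event with the given enrollment count and an empty offer list:

newEventIO :: Int -> IO Event
newEventIO n = do offers <- newTVarIO []
                  return (Event n offers)

demo :: IO ()
demo = do a <- newEventIO 2
          b <- newEventIO 2
          _ <- forkIO (offerAll [(return (), a, ())])  -- a partner offering only on event a
          chosen <- offerAll [(return (), a, "a"), (return (), b, "b")]
          putStrLn ("completed event " ++ chosen)  -- prints "completed event a"

Only event a can ever complete here, because event b never gains its second offer.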

The Public API

To wrap this into a communication channel with a CML-like API, we wrap it up as follows. First we declare an SEvent type (named after CML, hence the re-use of the term event for another meaning) that represents a synchronisation action; this is a list (of choices), each containing an internal event, an action to perform during the completion of the offer, and one to perform afterwards that will yield a return value (which we can use for a Functor instance):

data SEvent a = SEvent { sEvent :: [((STM (), STM a), Event)] }

instance Functor SEvent where
  fmap f (SEvent es) = SEvent [((dur, fmap f aft), e) | ((dur, aft), e) <- es]

choose :: [SEvent a] -> SEvent a
choose = SEvent . concatMap sEvent

You can see that the choose function simply joins lists of choices together. We define our synchronisation function using offerAll, which will return the corresponding afterwards-STM-action for the chosen event, which we then execute using atomically:

sync :: SEvent a -> IO a
sync (SEvent es) = offerAll [(dur,e,aft) | ((dur,aft),e) <- es] >>= atomically

Finally we can define a type for a synchronous communication channel, SChannel, that joins together an event (the internal kind) and a transactional variable for passing the value:

data SChannel a = SChannel Event (TVar a)

send :: SChannel a -> a -> SEvent ()
send (SChannel e ctv) x = SEvent [((writeTVar ctv x, return ()), e)]

recv :: SChannel a -> SEvent a
recv (SChannel e ctv) = SEvent [((return (), readTVar ctv), e)]

The send function puts the value to send into the variable during the original event completion, and then afterwards the reader takes the value out of the variable at its leisure. (The implementation assumes that the same participants will use the channel each time; an extra level of indirection could be added to make the implementation more flexible in this regard.)
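
The code above does not show how to construct a channel, so here is a guessed constructor and a small usage example; the name newSChannel and its definition are my own sketch, not part of the post's library. The enrollment count of 2 encodes the assumption of exactly one sender and one receiver per channel, and the initial undefined is never read because a send always fills the variable before the corresponding recv reads it:

newSChannel :: IO (SChannel a)
newSChannel = do offers <- newTVarIO []
                 val <- newTVarIO undefined
                 return (SChannel (Event 2 offers) val)

example :: IO ()
example = do c <- newSChannel
             d <- newSChannel
             _ <- forkIO (sync (send c "hello"))
             msg <- sync (choose [recv c, recv d])  -- only c has a waiting sender
             putStrLn msg  -- prints "hello"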

The code in this post provides nearly the same functionality as the CML library, but my tests indicate it is faster. I have now uploaded this code (with some documentation) as the sync package on Hackage. This provides a useful “lite” alternative to CHP that runs in the IO monad, and an alternative implementation of most of the features of the CML package.


CHP vs CML, Forking and Picky Receivers

January 6, 2010

CHP is a Haskell library that implements message-passing concurrency — and so is CML, a Haskell implementation of the concurrency features of Concurrent ML. Recently, someone asked on reddit what the differences are between the CHP and CML libraries. So in this post, I will perform a comparison by going through the API of the CML Haskell package and seeing how to implement the same functionality in CHP.

CML Events in CHP

CML is organised around Event types. An Event represents a sort of future synchronisation that can be engaged in using the function sync :: Event a -> IO a. CHP doesn’t have this distinction as such; all events are monadic actions, and you engage in them when you execute them in the CHP monad. So in effect, anything of type Event a in CML is just CHP a in CHP. CML has a choose :: [Event a] -> Event a function for choosing between such events, which is identical to CHP’s function alt :: [CHP a] -> CHP a.

CML has a wrap :: Event a -> (a -> IO b) -> Event b function that allows you to specify a post-synchronisation operation. Given that we have already seen that the Event type is the CHP monad and that we would execute the operation in the CHP monad rather than IO, this operation in CHP would be something like wrapCHP :: CHP a -> (a -> CHP b) -> CHP b. I’m sure many Haskell programmers will recognise this as the type of monadic bind (>>=), and indeed monadic bind has the same semantics as wrap; it can choose and synchronise on the left-hand event, then later execute the right-hand operation with the result.
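
In code, the correspondence is direct (wrapCHP is just a name for this post, not part of the CHP API):

wrapCHP :: CHP a -> (a -> CHP b) -> CHP b
wrapCHP = (>>=)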

CML has a guard :: IO (Event a) -> Event a function for specifying a pre-synchronisation operation that is run every time a synchronisation is attempted on the given event. I have not come across this particular feature before, but it would be interesting to see how to achieve a similar effect in CHP if anyone has a use case for the guard function. CML also has a wrapabort :: IO () -> Event a -> Event a function that allows you to specify an action to execute if the event is not chosen. Like guard, I am not sure what I might use this function for. It would be relatively easy to wrap up an alt to execute abort actions for non-chosen guards, though.

Forking

CML has a “fork and forget” spawning operator, spawn :: IO () -> IO ThreadId, that is really a synonym for forkIO. There are philosophical issues here. Fork and forget, which is present in many concurrency frameworks, can be very useful to set off processes that you just want to set going and not interact with, or to launch daemon processes that should run for the program’s lifetime. One problem with it is that the semantics are not always immediately clear — to what extent are the processes forgotten? For example, what do you think the output of this program will be?

import Control.Concurrent

main = forkIO (threadDelay 1000 >> putStrLn "Hello World?")

The question is: will the Haskell program wait for all the forked processes to finish before the main program finishes, i.e. will the text be printed? The answer seems to be: no; on my GHC 6.10.3 system, if I compile the above with -threaded and run it, there is no output, even if I take the delay down to zero. So if you want to wait for a particular forked process to complete before your program ends, you’ll need to add mechanisms to do so yourself. Or rather: when your main program finishes, you had better be sure that all the forked processes have finished, or be sure that you don’t care if they are still running.
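
One standard way to wait for a particular forked thread (nothing CHP- or CML-specific, just a common idiom) is an MVar handshake; a minimal sketch:

import Control.Concurrent

main = do done <- newEmptyMVar
          _ <- forkIO (threadDelay 1000 >> putStrLn "Hello World?" >> putMVar done ())
          takeMVar done  -- main blocks here until the forked thread has finished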

To help with such issues, CHP offers runParallel, which we’ve seen a lot, but also something that I call scoped forking. There are two basic operations in scoped forking, centred around a monad transformer called ForkingT. (You can use ForkingT on any monad that is a member of the MonadCHP type-class, i.e. that has CHP at the bottom. The most obvious thing is to have the ForkingT CHP monad, but you can also have ForkingT (StateT Int CHP) or similar.) You fork off a process within this transformer using fork :: MonadCHP m => CHP () -> ForkingT m (). This forks off the first parameter, and is an action in the ForkingT transformed monad. You initiate one of these blocks using forking :: MonadCHP m => ForkingT m a -> m a. The important thing is that when the ForkingT block (the first parameter) finishes its own execution, the call to forking does not return until every forked process has also finished. So you can fork off as many processes as you like in the forking block, but at the end, you must wait for them all to finish.
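
As a small sketch of the shape this takes (using only the fork and forking operations just described, nothing further from the CHP API), a process that runs a list of workers and does not proceed until all of them have terminated might be written as:

runWorkers :: [CHP ()] -> CHP ()
runWorkers ps = forking (mapM_ fork ps)
-- forking only returns once every process forked inside the block has finished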

So between runParallel and forking (the only mechanisms for running processes concurrently in CHP), there is always a defined, clear point in your program at which you will wait for any concurrent sub-processes to terminate. When your outer-most runCHP call finishes, it is not possible for there to still be any CHP processes running, so there is none of the confusion or race hazards involving termination semantics that arise with fork-and-forget functions such as forkIO.

Channels

Back to CML — the last item to cover is channels. CML has a type Channel a. It’s not clear from the documentation whether these channels support multiple readers and/or multiple writers, but let’s assume for simplicity that they don’t, and thus that this is the same as CHP’s OneToOneChannel a. There is a channel :: IO (Channel a) function for creating channels which is the same as CHP’s oneToOneChannel :: CHP (OneToOneChannel a). There is also a function to write to a channel, transmit :: Channel a -> a -> Event (), which is the same as CHP’s writeChannel :: Chanout a -> a -> CHP () except that CHP has the concept of channel-ends which CML does not. CHP uses channel-ends to make it clear that when a process is passed a Chanout end, it will be writing to that end of the channel, and cannot read from it. In this way, the directionality of the channels becomes apparent in the types, and there is some static guarantee of the patterns of usage of channels.

CML’s operation for reading from a channel is more interesting: receive :: Channel a -> (a -> Bool) -> Event a. The second parameter is a function that allows you to decide whether you want to engage in reading from the channel, based on the value being offered. Interestingly, the CSP calculus contains this feature, but it is not something I have added to CHP. I believe Erlang effectively also offers something like this (by pattern-matching on messages in a process’s message queue). I can see uses for this, and in CHP you would have to implement something like this by sending values on different channels, and choosing based on the channel the value was being communicated on rather than the value itself. I should perhaps add this to CHP in future, but for now this is a useful CML feature that CHP does not support. Without this aspect, receive is equivalent to CHP’s readChannel :: Chanin a -> CHP a.
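
To make that workaround concrete, here is a hedged sketch (it uses only alt and readChannel as described above; the function name and shape are mine). The sender routes values onto different channels according to some property, and the receiver only offers to read from the channels whose values it is currently willing to accept:

pickyRecv :: Chanin a -> Chanin a -> Bool -> CHP a
pickyRecv wanted other acceptOther
  | acceptOther = alt [readChannel wanted, readChannel other]
  | otherwise   = readChannel wanted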

Conclusions

The main difference between the features of CML and CHP is that CML is a nice small library, and CHP is a larger library with many more features (barriers, clocks, conjunction, parallel operators, poison, more channel types, traces, and the new higher-level features like the connectable type-class and behaviours). I think CHP subsumes CML for the features I want, but obviously I’m very biased. CML does have the benefit of operating in the IO monad, which can be more convenient than CHP’s eponymous monad. The reason for having the CHP monad is to support poison (more cleanly than using exceptions) and tracing, two features not present in CML.

In terms of the implementation, CHP uses STM and has a lot of logic to support some of the extra features (especially conjunction), but only uses as many threads as you would expect (one per thread of execution). CML uses MVars and has relatively simple algorithms, although those algorithms do spawn a lot of threads — from what I can understand of skimming the code, choose spawns off a new thread for every possibility being offered. The transactional events library does something similar. In contrast, CHP spawns off no threads for an alt call, and just performs one (potentially expensive) memory transaction and then blocks until it is woken up by another thread offering to synchronise with it. That said, I expect CML may be faster than CHP for common cases.

Differences aside, CML and CHP take a similar approach to concurrency (I can’t remember whether CML was influenced by the CSP calculus, but it certainly has a similar flavour), so many of the design patterns should transfer between the two libraries — especially if CML’s developers were to add barriers in a future version.
