Home > Uncategorized > Parallel Composition in Haskell

Parallel Composition in Haskell

CHP, my Haskell concurrency library, allows you to run processes in parallel. One way of doing so is this binary operator:

(<||>) :: CHP a -> CHP b -> CHP (a, b)

Informally, the behaviour is as follows: this starts both processes running (using Haskell's forkIO under the hood) and waits for them both to terminate and return their results. They can terminate at different times, but the parent process (the one running that composition) will wait for both to terminate before it returns. This parallel composition is not uncommon -- for example, the parallel-io package provides a similar operation. In this post I'll talk about the properties we would expect from this composition, and how to make them hold in CHP -- but this easily generalises to making them hold for a parallel composition operator in the IO monad (CHP being a thin layer on top of IO).

Properties of Parallel Composition

I intuitively expect that parallel composition should be commutative: p <||> q should be the same as q <||> p. I'd also expect associativity; this:

do (x, (y, z) <- p <||> (q <||> r)

should be equivalent to:

do ((x, y), z) <- (p <||> q) <||> r

Another property (which I'll call the unit law) is that composing something in parallel with a null (or more generally, a short-running) computation should have no effect, that is p should be equivalent to:

p <||> return ()

Finally, I'd expect independence; if I compose p in parallel with q, I would not expect p to behave any differently (or have its environment behave any differently) than if it was not part of a parallel composition. So there are our four properties: commutativity, associativity, the unit law and independence. Let's consider the behaviour of our parallel composition, to try to make sure it satisifes all of these.

Normal Execution

For those, like me, who like concrete code to look at, a basic implementation of parallel composition that works with normal execution (we'll add to it as we go along) is:

(<||>) :: CHP a -> CHP b -> CHP (a, b)
(<||>) p q = liftIO_CHP $ do
  pv <- newEmptyMVar
  qv <- newEmptyMVar
  forkIO $ executeCHP p >>= putMVar pv
  forkIO $ executeCHP q >>= putMVar qv
  (,) <$> takeMVar pv <*> takeMVar qv

(We're using liftIO_CHP :: IO a -> CHP a and executeCHP :: CHP a -> IO a to go between the monads.)
If all the processes being composed execute and terminate successfully, all the properties easily hold. All the processes start up, they all run, and when they all complete we get the results back. If one of the processes doesn't terminate, the properties also all hold -- the composition waits for them all to terminate, so if p doesn't terminate, the whole composition will not terminate, no matter which order the processes are in, or how we've associated the composition.

Poison

(This is the only CHP-specific aspect of our parallel composition.) CHP processes don't have to terminate successfully; they can also terminate with poison. The semantics with respect to poison are simple, and preserve all our properties; if either process terminates with poison, the composition will terminate with poison once both child processes have finished. We can update our implementation, based around the adjusted type of executeCHP that now indicates whether a process terminated with poison:

data WithPoison a = Poison | NoPoison a
executeCHP :: CHP a -> IO (WithPoison a)

(<||>) :: CHP a -> CHP b -> CHP (a, b)
(<||>) p q = merge =<< liftIO_CHP (do
  pv <- newEmptyMVar
  qv <- newEmptyMVar
  forkIO $ executeCHP p >>= putMVar pv
  forkIO $ executeCHP q >>= putMVar qv
  (,) <$> takeMVar pv <*> takeMVar qv)
  where
    merge (NoPoison x, NoPoison y) = return (x, y)
    merge _ = throwPoison

This is very similar to our first definition, but it merges the values afterwards; they are only returned if neither side threw poison (otherwise poison is thrown). This again preserves all the properties, although associativity may require a little explanation. Consider p <||> (q <||> r). If p terminates with poison, the outer composition will wait for the inner composition to finish (which means waiting for q and r), then throw poison. If r terminates with poison, the inner composition will wait for q, then throw poison; the outer composition will wait for p to also finish, then throw poison. So the behaviour is the same no matter which part of the nested composition throws poison.

(Synchronous) Exceptions

The termination rules given for poison extend easily to exceptions (poison is really just a kind of exception anyway). If either process terminates with an exception (because it wasn't trapped, or it was rethrown), the parent process will rethrow that exception. If both processes throw an exception, an arbitrary one of the two is thrown by the parent process. We make exceptions "beat" poison: if one process throws an exception and the other exits with poison, the exception is rethrown and the poison is dropped. Our slightly adjusted implementation is now:

(<||>) :: CHP a -> CHP b -> CHP (a, b)
(<||>) p q = merge =<< liftIO_CHP (do
  pv <- newEmptyMVar
  qv <- newEmptyMVar
  forkIO $ wrap p >>= putMVar pv
  forkIO $ wrap q >>= putMVar qv
  (,) <$> takeMVar pv <*> takeMVar qv)
  where
    wrap proc = (Right <$> executeCHP proc) `catch`
                         (\(e :: SomeException) -> return (Left e))
    
    merge (Left e, _) = throw e
    merge (_, Left e) = throw e
    merge (Right Poison, _) = throwPoison
    merge (_, Right Poison) = throwPoison
    merge (Right (NoPoison x), Right (NoPoison y)) = return (x, y)

The wrap function catches any untrapped exceptions from the process, and sends them back to the parent process instead of a result. The merge function prefers exceptions to poison, and only returns a result when neither side had an exception or poison.

(Another possibility for exceptions would have been ignoring the exceptions from the child processes; this would maintain commutativity, associativity and independence, but would have broken the unit law, because when p threw an exception, it would not be propagate if p was wrapped inside a dummy parallel composition.)

Asynchronous Exceptions

Asynchronous exceptions can be received by a thread at any time (unless you mask them), without that thread doing anything to cause them. CHP is a concurrency library with its own mechanisms for communicating between threads and terminating, so I don't usually use asynchronous exceptions in CHP programs. But that doesn't stop them existing, and even if you don't use them yourself (via killThread or throwTo) it doesn't stop them occurring; asynchronous exceptions include stack overflow and user interrupts (i.e. Ctrl-C). So we need to take account of them, and preferably do it in such a way that all our expected properties of parallel composition are preserved.

When we execute a nested parallel composition, such as p <||> (q <||> r), we actually have five threads running: one each for p, q, and r, one for the outer composition and one for the inner composition:

(Each circle is a thread.) We must assume, in the general case, that any one of these threads could potentially receive an asynchronous exception (e.g. the system can kill any thread it likes with an async exception, according to the docs).

Child Process Receives an Asynchronous Exception

Let's start by discussing what happens if a child process such as q gets an asynchronous exception directly. One possibility is that q may trap the exception and deal with it. In that case, we're fine -- the parallel composition never sees the exception, and since it's been trapped by q, it shouldn't escape from q anyway. But q may not handle it (or may handle and rethrow, which is the same from the point of view of our operator). So what should happen when q terminates with this exception? One possibility would be to immediately kill its sibling (or throw the exception to them). This would completely break independence; r would now be vulnerable to being killed or receiving an asynchronous exception initially targeted at another process, just because it happens to be composed with q. The only way to preserve independence is to treat the asynchronous exception in a child process as we do synchronous exceptions; we wait for both processes to terminate, and then the parent exits by rethrowing that asynchronous exception. This preserves commutativity, associativity (no matter which of p, q and r get the exception, all will be allowed to finish, and then the outer composition will end up rethrowing it, potentially via the inner composition doing the same) and independence. Our unit law is preserved if p receives the exception (see the next section for more discussion on the unit law).

Parent Process Receives an Asynchronous Exception

Now we move on to consider what happens if the parent process in a composition receives an asynchronous exception. This is actually one of the most likely cases, because if your program consists of:

main = runCHP (p <||> q)

If the user hits Ctrl-C, it's the parent process that will get the exception, not p or q. We mustn't terminate only this thread and leave p and q dangling. The whole point of the parallel composition is that you scope the lifetime of the processes, and we don't want to break that. We could catch the exception, wait for p and q to terminate and then rethrow it. But if the exception is something like a user-interrupt then it seems wrong to ignore it while we wait for the oblivious child processes to terminate. So the semantics are this: when the parent process receives an asynchronous exception, it rethrows it to all the child processes. This trivially preserves commutativity. Independence is also satisfied; if the processes weren't in the parallel composition, they would receive the exception directly, so rethrowing it to them is the same effect as if they weren't composed together.

The unit law and associativity are in danger, though. Consider our unit law example: p <||> return (). If p does not trap the asynchronous exception that the parent throws to it, the unit law is preserved; p will get the exception, and then exit with it (regardless of the other side), which will cause the parent to exit with the same exception, so it is as if the composition was not there. If p does trap the exception, the behaviour becomes a race hazard with one of two behaviours (the difference is bolded):

  1. The parent receives the exception, and throws it to its children. The return () process has not yet terminated, this then terminates with an uncaught exception. p traps the exception and deals with it, but when the parallel composition exits, the exception is rethrown from the return () branch; this exception would not have been thrown if p was executing alone, because p would have caught it.
  2. The parent receives the exception, and throws it to its children. The return () process has already terminated succesfully. The exception is thus only thrown to p. p traps the exception and deals with it, so when the parallel composition exits, the exception is not visible, just as if p was executing alone.

There are two ways to "fix" this, by adjusting our unit law: one is to say that the unit law only holds if child processes do not trap exceptions. The other is to say that the unit of parallel composition is not return (), but rather:

return () `catch` (\(e :: SomeException) -> return ()

The problem with that is that catch is in the IO monad, and to be part of the parallel composition it needs to be in the CHP monad, which requires a lot of type-fiddling. I'm still thinking about this one for CHP, but if you were dealing with IO, changing the unit to the above would probably make most sense.

Associativity also has some caveats. Here's our diagram again, with the outer composition at the top, and the inner composition its right-hand child:

If the outer composition receives an exception, all is well; the exception is thrown to the children, which includes the inner composition -- and the inner composition throws to its children, so all the processes get it. If any of them don't trap the exception, the exception will be the result of the whole composition no matter which way it was associated -- and if all trap it, associativity is still preserved. However, if the inner composition receives an exception, associativity is not preserved; the inner composition will throw the exception to its children but the outer composition will not know about the exception immediately, so only the processes in the inner composition will see the exception. Now it matters which processes are in the inner composition and which is in the outer composition. But this seems morally fair: if an exception is thrown to an inner part of the whole composition, that already depends on the associativity, so it's unreasonable to expect that the composition can function the same regardless of associativity.

Masking Asynchronous Exceptions

When defining our parallel composition operator, we also need to be careful about precisely when asynchronous exceptions might occur. One major difference between poison and asynchronous exceptions is that poison can only occur when you try to use a channel or barrier, whereas asynchronous exceptions can occur any time. This is a plus and minus point for both sides; it means poison is easier to reason about, but asynchronous exceptions can interrupt processes which are not using channels or barriers (e.g. that are blocked on an external call). To make sure we don't receive an asynchronous exception at an awkward moment, such as inbetween forking off p and forking off q (which would really mess with our semantics!), we must mask against asynchronous exceptions, and only restore them inside the forked processes. You can read more about masking in the asynchronous exception docs. So, the final adjusted definition of parallel composition that I will give here is as follows (we don't need a restore call around takeMVar because the blocking nature implicitly unmasks exceptions):

(<||>) :: CHP a -> CHP b -> CHP (a, b)
(<||>) p q = merge =<< liftIO_CHP (mask $ \restore -> do
  pv <- newEmptyMVar
  qv <- newEmptyMVar
  let wrap proc = restore (Right <$> executeCHP proc) `catch`
                    (\(e :: SomeException) -> return (Left e))
  pid <- forkIO $ wrap p >>= putMVar pv
  qid <- forkIO $ wrap q >>= putMVar qv
  let waitMVar v = takeMVar v `catch` (\(e :: SomeException) ->
                    mapM_ (flip throwTo e) [pid, qid] >> waitMVar v)
  (,) <$> waitMVar pv <*> waitMVar qv)
  where
    merge (Left e, _) = throw e
    merge (_, Left e) = throw e
    merge (Right Poison, _) = throwPoison
    merge (_, Right Poison) = throwPoison
    merge (Right (NoPoison x), Right (NoPoison y)) = return (x, y)

(In fact, this isn't quite enough. I'm currently adding a finallyCHP function that acts like its IO equivalent, and to support that I must push the restore call inside executeCHP to avoid asynchronous exceptions being acknowledged before the finallyCHP call is executed, but that's a bit further into CHP's internals than I want to go in this post.)

Hopefully that was a useful tour of parallel composition semantics in Haskell. Synchronous exceptions are easily dealt with, but asynchronous exceptions (which were perhaps designed more for the forking model of thread creation than this style of parallel composition) make things a fair bit trickier.

About these ads
Categories: Uncategorized
  1. Sebastian
    June 2, 2011 at 7:16 pm

    Did you think about providing a parallel

    () :: CHP (a -> b) -> CHP a -> CHP b

    and how the Applicative laws relate to your desired laws?

    • June 2, 2011 at 7:24 pm

      I don’t have a type wrapper for that at the moment. CHP is an Applicative, but only in the classic based-on-Monad way:

      pure = return
      () = ap

      You can accomplish the parallel operator with:

      uncurry ($) (p q)

      It’s not something I’ve needed in the past (CHP is focused on concurrency rather than parallelism, and so the parallel applicative has never come up, for me at least), but it might be interesting to look at the laws.

      • Sebastian
        June 2, 2011 at 7:29 pm

        Both versions are inter-definable. I think commutativity corresponds to the Applicative “interchange” law and, maybe, associativity corresponds to the “composition” law somehow.

  2. Eric
    June 12, 2011 at 3:16 pm

    Hello Neil,

    I am trying to get familiar with your library but the “Communicating Haskell Processes tutorial” is no longer up to date. Is there more current documentation that I could read instead?

    Regards,

    • Eric
      June 12, 2011 at 3:26 pm

      Scratch that — found your thesis. Looks like there some good examples there.

      Thx,

  3. Josef
    June 28, 2011 at 4:26 pm

    The types in the unit law don’t match. I would expect that you would say that ‘p’ is equivalent to ‘fmap fst $ p <||> return ()’.

    • June 28, 2011 at 7:47 pm

      It appears that I forgot to add my standard disclaimer: I’m consider semantic equivalence here, and as you rightly point out, I’ve omitted the small pure rearrangements needed on some of the laws. So for example, I say that p q is the same as q <||> p, but the full rule would be that p <||> q is the same as (\(x, y) -> (y, x)) <$> (q <||> p). And as you say, p should be equivalent to fst <$> p <||> return ()

  1. No trackbacks yet.

Leave a Reply

Fill in your details below or click an icon to log in:

WordPress.com Logo

You are commenting using your WordPress.com account. Log Out / Change )

Twitter picture

You are commenting using your Twitter account. Log Out / Change )

Facebook photo

You are commenting using your Facebook account. Log Out / Change )

Google+ photo

You are commenting using your Google+ account. Log Out / Change )

Connecting to %s

Follow

Get every new post delivered to your Inbox.

%d bloggers like this: