Poison: Concurrent Termination
All programs must come to an end. At some point, a program will need to terminate, either because it has finished its task, or because the user has told it to terminate — or perhaps because the program has encountered an unrecoverable error. Especially in the latter two cases (where the termination is unanticipated), the program may have files or sockets open and so we need to terminate it gracefully, tidying up all such externalities.
In Haskell, we differentiate between pure computations, and side-effecting computations. Pure computations, used by many parallel mechanisms, can be terminated at any time without needing to tidy up — or rather, all that needs tidying up is immediately apparent to the run-time. Side-effecting computations, which are used with concurrency mechanisms such as MVars, STM and our own CHP, need extra code to deal with termination.
Sequential imperative programs often deal with this via exception handlers — the termination becomes an exception that unwinds the call stack, tidying up all current work (call-stack entries) until it reaches the top of the program. Haskell’s error monads can provide similar functionality.
Concurrency introduces two issues that can make termination much more difficult. Firstly, there is the issue of notifying all concurrent threads about the termination. The termination usually originates in one concurrent thread — the thread that handled the user request, the thread that encountered the error, or the thread that was collecting the results of the work being done. This thread needs to tell all the other concurrent threads in the system that it is time to terminate — bearing in mind that the other threads might be busy doing something else (e.g. computation, reading from a file, waiting to communicate with other threads, and so on). Secondly, there is the problem that the program no longer consists of a single call stack that can be unwound. It instead consists of lots of different call stacks.
Haskell now has asynchronous exceptions, which can be used for terminating a concurrent system. Asynchronous exceptions are quite a neat idea, but they do require thought over the use of block and unblock to get the right semantics, as well as it being a little overwhelming as to when exceptions might occur (i.e. at any time!). Asynchronous exceptions also introduce a book-keeping problem: which thread is responsible for throwing to which thread? You could keep a registry of threads and make any thread wishing to terminate throw to all of them (a machine-gun approach to terminating the system), but then you may end up with race hazards if new threads are created by a still-running thread while you are attempting to kill off all the threads and so on. Furthermore, some threads may only be able to tidy up once their children threads have tidied up — for example, a thread may have initialised an external library and spawned children to work with the library, but may need to call a finalising function in the external library once all the children have tidied up — and not before.
CHP introduces poison as a way to terminate concurrent systems. The idea is that the communication/synchronisation network in your program already provides a way to link your program together. Using the channels to send termination messages is an approach fraught with race hazards (see the classic paper on the matter, although poison improves on the solution offered there) — poison is not about sending messages. In CHP you can poison a channel, barrier and clock (I’ll talk about channels here, but it applies equally to barriers and clocks) . This sets the channel into a poisoned state. Forever after, any attempt to use that channel will cause a poison exception to be thrown in the thread that made the attempt. If a process is waiting on a channel when the other end is poisoned, the waiting process is woken and the exception is thrown. Thus poison is really a system of synchronous exceptions, that can occur only when you attempt to communicate on a channel. This notifies your immediate neighbours (i.e. those with whom you share a channel) that they should tidy up and terminate. The key is that on discovering the poison, a process should poison all its channels (repeated poisonings of the same channel are benign, and will not throw a poison exception), thus spreading the poison around the network.
An example is shown in a sequence of diagrams below. The network begins unpoisoned (1), with boxes for processes and arrows for channels connecting them. The mid-bottom process introduces poison into its channels and terminates (2) — poisoned channels are shown green, and terminated processes with a red cross. Any process that notices the poison on any of its channels terminates and poisons its other channels (3,4) until the whole network has terminated (5). The poison may not happen to spread in such a lock-step fashion as shown here, but the ordering of termination using poison does not matter.
Poison should not deadlock a process network, because the ordering of poison does not matter, and once all the channels are poisoned, there is nothing further that a process can wait on without being woken up. There are some tricky corner cases with poison, but I will discuss those in another post. To show how poison is used in the code, I will use my recent prime number sieve example. Here was the code before:
import Control.Concurrent.CHP import Control.Concurrent.CHP.Utils import Control.Monad filterDiv :: Integer -> Chanin Integer -> Chanout Integer -> CHP () filterDiv n input output = forever $ do x <- readChannel input when (x `mod` n /= 0) $ writeChannel output x end :: Chanin Integer -> Chanout Integer -> CHP () end input output = do x <- readChannel input writeChannel output x (filterDiv x |->| end) input output genStream :: Chanout Integer -> CHP () genStream output = mapM_ (writeChannel output) [2..] primes :: Chanout Integer -> CHP () primes = genStream ->| end main :: IO () main = runCHP_ $ do c <- oneToOneChannel primes (writer c) <||> (forever $ readChannel (reader c) >>= (liftIO . putStrLn . show))
This is a complete example program that runs forever spitting out primes. Let’s imagine that we want to stop the program after a time — for example, we may want only the first 100 primes. First, we must add a poison handler to each process. The filterDiv process shows the typical poison handler:
filterDiv :: Integer -> Chanin Integer -> Chanout Integer -> CHP () filterDiv n input output = (forever $ do x <- readChannel input when (x `mod` n /= 0) $ writeChannel output x ) `onPoisonRethrow` (poison input >> poison output)
That’s usually all that is involved — tacking an onPoisonRethrow block on the end that poisons all the channels that the filterDiv process is aware of. Here, the onPoisonRethrow block can either be outside the forever (which is broken out of by the poison exception) or inside the forever (rethrowing the poison exception would similarly break out of the forever). Our end process is more interesting, as it contains a parallel composition (via the |->| operator). These are the rules for parallel composition:
A parallel composition returns once all of its children have terminated (successfully, or through poison). Once they are all terminated, if any of them terminated because of poison, a poison exception is also thrown in the parent process.
So, back to our end process. There are two ways we could add a poison handler. This is the simple way, tacking on a poison handler as before:
end :: Chanin Integer -> Chanout Integer -> CHP () end input output = (do x <- readChannel input writeChannel output x (filterDiv x |->| end) input output ) `onPoisonRethrow` (poison input >> poison output)
This will give the correct effect; if any poison is encountered during the input or output on the first line of the do block, the handler will poison both channels. If any poison is discovered in the filterDiv or end sub-processes, the channels will similarly be poisoned. However, if any poison is discovered in the filterDiv or end sub-processes, they will already have poisoned the input and output channels. Again, this multiple poisoning is harmless.
We do not need to add handlers to genStream or primes; genStream only has one channel, so if poison is encountered there, all of its (one) channels must already be poisoned, and primes is merely a parallel composition of two processes that will deal with any poison encountered. So our final change is to main, to shut the program down after 100 primes have come out:
main :: IO () main = runCHP_ $ do c <- oneToOneChannel primes (writer c) <||> (do replicateM_ 100 $ readChannel (reader c) >>= (liftIO . putStrLn . show) poison (reader c))
That poison command is what introduces poison into our pipeline. Thereafter, our poison handlers will take care of spreading the poison all the way along the pipeline and shutting down our whole process network without fear of deadlock. So primes exits from being poisoned, our prime-printing mini-process exits successfully, and thus the parallel composition (and hence runCHP_) also exits with poison. This is quite typical of a CHP program, and is not (necessarily) indicative of a failure.
Poison is a fitting way to terminate CHP programs, and poison handlers are fairly simple to write. It is a good idea to add them to all your CHP programs, but since they are very formulaic I will often leave them out of my blog examples so that they don’t get in the way of what I’m trying to explain.