## Concurrent Pearl: The Expanding Prime Pipeline

Process networks in CHP do not have to be static. In fact, some of the more interesting examples are dynamic. To illustrate a dynamic process pipeline I will use the example task of generating prime numbers.

One way to find prime numbers is as follows. Start with an empty list of primes and the integer 2. Check whether the integer is divisible by any number in the list of primes. If it is, discard it; if not, it is prime, so add it to the list. Then move on to the next integer (i.e. add one) and continue indefinitely. This is a very simple, if inefficient, algorithm. (It is not the Sieve of Eratosthenes; see this paper.)
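Before turning to CHP, the steps above can be sketched sequentially in plain Haskell. This is only an illustration of the algorithm as described: the names `primesUpTo` and `step` are hypothetical, and the search is bounded by a limit rather than running indefinitely.

```haskell
-- Sequential sketch of the algorithm described above (illustrative names).
-- Keeps a list of primes found so far and tests each candidate against it.
primesUpTo :: Integer -> [Integer]
primesUpTo limit = reverse (foldl step [] [2 .. limit])
  where
    -- Discard n if any known prime divides it; otherwise record it as prime.
    step found n
      | any (\p -> n `mod` p == 0) found = found
      | otherwise                        = n : found

main :: IO ()
main = print (primesUpTo 30)  -- prints [2,3,5,7,11,13,17,19,23,29]
```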

Let’s see how we can turn this algorithm into a CHP program. Every time we find a prime, we want to add it to our list of numbers to filter out in future. We can implement this list as a concurrent process pipeline — each process in the pipeline will be responsible for filtering out numbers divisible by a single prime. Such a process is quite straightforward:

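```
import Control.Concurrent.CHP
import Control.Monad (forever, when)

-- Pass on only those values that are not divisible by n.
filterDiv :: Integer -> Chanin Integer -> Chanout Integer -> CHP ()
filterDiv n input output
  = forever $ do x <- readChannel input
                 when (x `mod` n /= 0) $
                   writeChannel output x
```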

Every time we find a prime, we need to add a new filterDiv process on to the end of our pipeline. This may sound difficult, but can be easily expressed with some concurrent recursion:

```
end :: Chanin Integer -> Chanout Integer -> CHP ()
end input output = do x <- readChannel input
                      writeChannel output x
                      (filterDiv x |->| end) input output
```

This process sits at the end of our pipeline and reads in a single value. Since our pipeline is filtering out all non-primes, any number that this “end” process receives must be prime. In response to receiving a prime, it sends it on (to whatever process is waiting beyond the prime pipeline to receive the primes). It then becomes the parallel composition of a new filterDiv process, connected to a new end process (the |->| operator expresses this composition). Thus every time a new prime comes out of the end of the pipeline, the pipeline grows by one more process. After the 100th prime, the pipeline will consist of 100 filterDiv processes executing concurrently, and one “end” process.
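The way the end process replaces itself with a filter followed by a fresh end process has a close analogue on lazy lists, which may help in seeing why only primes ever reach the end. The sketch below is not CHP code and involves no concurrency; `grow` and `primesList` are hypothetical names, and the list `[2..]` stands in for the stream of candidates fed into the pipeline.

```haskell
-- Lazy-list analogue of the expanding pipeline (illustrative; no CHP involved).
-- grow plays the role of the end process: it emits the first value it sees,
-- then continues behind a new filter for that value.
grow :: [Integer] -> [Integer]
grow []       = []
grow (p : xs) = p : grow (filter (\x -> x `mod` p /= 0) xs)

primesList :: [Integer]
primesList = grow [2 ..]

main :: IO ()
main = print (take 10 primesList)  -- prints [2,3,5,7,11,13,17,19,23,29]
```

Each recursive call wraps one more `filter` around the remaining input, just as each received prime adds one more filterDiv process to the pipeline.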

To complete our pipeline, we need a process at the beginning of the pipeline, supplying an ever-increasing stream of numbers on its output channel, starting at 2:

```
genStream :: Chanout Integer -> CHP ()
genStream output = mapM_ (writeChannel output) [2..]
```

We can now write a primes process that emits the prime numbers, in order, on its output channel. Using another helpful operator from the CHP library, this is easy:

```
primes :: Chanout Integer -> CHP ()
primes = genStream ->| end
```

Observe that there is no mention of filterDiv. The pipeline starts out with the stream generator and an end process. All the filterDiv processes are added once the primes start coming out of the pipeline, beginning with 2, as shown in the diagram below:

Figure: The Expanding Prime Pipeline. Each time that the end process receives a number, it splits itself into a filterDiv process (for filtering out all numbers divisible by that prime) and a new end process. Thus the pipeline grows to filter out all numbers divisible by lower primes, and the only numbers that the end process receives are prime numbers. Here, 4 would have been discarded by filterDiv 2.

Note also that the interface of primes is simply Chanout Integer -> CHP (); all the internal details of the expanding concurrent pipeline are hidden from the outside, leaving only an opaque box.

To make a complete program, we can compose this in parallel with a process responsible for printing the primes:

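```
main :: IO ()
main = runCHP_ $ do c <- oneToOneChannel
                    -- The printing process after <||> is an assumed completion of a line cut off in the source;
                    -- liftIO_CHP is the lifting function in later CHP releases (older ones used liftIO).
                    primes (writer c) <||> (forever $ readChannel (reader c) >>= liftIO_CHP . print)
```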
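For readers without CHP installed, the same expanding structure can be approximated using only the base library. This is a rough sketch under stated assumptions, not a translation of the CHP semantics: an `MVar` is a one-place buffer rather than a synchronous channel, the explicit `forkIO` stands in for parallel composition, and all the names (`filterDivIO`, `endIO`, `firstPrimes`) are hypothetical.

```haskell
import Control.Concurrent (forkIO)
import Control.Concurrent.MVar (MVar, newEmptyMVar, putMVar, takeMVar)
import Control.Monad (forever, replicateM, when)

-- Forward only the values not divisible by n (an MVar stands in for a channel).
filterDivIO :: Integer -> MVar Integer -> MVar Integer -> IO ()
filterDivIO n input output = forever $ do
  x <- takeMVar input
  when (x `mod` n /= 0) $ putMVar output x

-- Read one prime, pass it on, then splice a new filter in front of ourselves.
endIO :: MVar Integer -> MVar Integer -> IO ()
endIO input output = do
  x <- takeMVar input
  putMVar output x
  link <- newEmptyMVar
  _ <- forkIO (filterDivIO x input link)
  endIO link output

-- Run the pipeline just long enough to collect the first k primes.
firstPrimes :: Int -> IO [Integer]
firstPrimes k = do
  source <- newEmptyMVar
  out    <- newEmptyMVar
  _ <- forkIO (mapM_ (putMVar source) [2 ..])
  _ <- forkIO (endIO source out)
  replicateM k (takeMVar out)

main :: IO ()
main = firstPrimes 10 >>= print  -- prints [2,3,5,7,11,13,17,19,23,29]
```

Each `MVar` has exactly one writer and one reader at any time, so values stay in order as they move down the pipeline. The forked threads are simply abandoned when `main` returns, which is acceptable for a demonstration but not for a long-running program.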

The expanding pipeline is not restricted to conceptual examples such as generating the primes, and the powerful idea of a process becoming the parallel composition of sub-processes is not restricted to pipelines — two things that I hope to explore in future.


## Concurrent Pearl: The Sort Pump

A common feature of many Computer Science courses is the issue of sorting: you may recall the bubble sort, the insertion sort, the merge sort and the quick sort. You may also know their time complexity: bubble sort is O(N^2), meaning the time it takes is proportional to the square of the number of items involved. In contrast, quick sort is O(N*log N) on average (though O(N^2) in the worst case), which, crucially, is better. These are all sequential algorithms, though. What happens when we want to make a concurrent algorithm?

There is a parallel version of quicksort, where as you divide and conquer your list, you fork off the two divisions into separate threads, and there are probably other pure parallel sorting strategies too. With CHP we can approach the problem differently, using a concurrent sort pump. I want to explain the sort pump because I like its design, not because it is blazingly fast. We’ll begin with a simple component process:

```
highest :: Ord a => Chanin a -> Chanout a -> CHP ()
highest input output = readChannel input >>= highest'
  where
    highest' x = do y <- readChannel input
                    let (low, high) = if x < y then (x, y) else (y, x)
                    writeChannel output low
                    highest' high
```

This process starts by reading in a value to hold. Then it repeatedly reads in a new value, keeping the highest one and sending on the lowest one. So if you feed it a stream of values, it will keep hold of the highest value it has seen, and will pass on all lower values. Now let’s consider what happens if you connect two of these processes together. The first process will keep the highest value, sending on all others. The second process will keep the highest value of these others — i.e. the second highest value of all of them. Connect three together, and the third will hold the third highest value. Connect N of these together, pass in N values, and the Mth process will hold the Mth highest value. Voila, a sort pump!
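To check the claim that the Mth process ends up holding the Mth highest value, the behaviour of a chain of these processes can be modelled on finite lists. The sketch below is a pure model rather than CHP code; `stage` and `heldValues` are hypothetical names, and a list stands in for the stream of values on a channel.

```haskell
-- Pure model of one "highest" stage on a finite input (illustrative names).
-- Returns the values passed on, plus the value still held at the end.
stage :: Ord a => [a] -> ([a], Maybe a)
stage []       = ([], Nothing)
stage (x : xs) = go x xs
  where
    go held []       = ([], Just held)
    go held (y : ys) =
      let (low, high)   = if held < y then (held, y) else (y, held)
          (rest, final) = go high ys
      in  (low : rest, final)

-- Chain stages until nothing is passed on; list the held values in stage order.
heldValues :: Ord a => [a] -> [a]
heldValues xs = case stage xs of
  (_, Nothing)        -> []
  (passed, Just held) -> held : heldValues passed

main :: IO ()
main = print (heldValues [3, 1, 4, 1, 5, 9, 2, 6 :: Int])
-- prints [9,6,5,4,3,2,1,1]
```

The held values come out in descending order by stage, which is exactly the property the sort pump relies on.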

There is a problem with our sort pump though — it sorts the values, but we have no way of getting the result out! So we need to augment our component to permit the “flushing” of the pump:

```
data FlushOrNormal a = Flush [a] | Normal a

highestFlush :: Ord a => Chanin (FlushOrNormal a) -> Chanout (FlushOrNormal a) -> CHP ()
highestFlush input output = empty
  where
    empty = do x <- readChannel input
               case x of
                 Flush _ -> writeChannel output x >> empty
                 Normal y -> full y

    full cur = do x <- readChannel input
                  case x of
                    Flush xs -> writeChannel output (Flush (cur:xs)) >> empty
                    Normal new -> let (low, high) = if cur < new then (cur, new) else (new, cur)
                                  in writeChannel output (Normal low) >> full high
```

If the component receives a flush signal, it will prepend its currently held value to the list and send on the flush message. Because the first process has the highest value, and all processes prepend, the list ends up in ascending order. If a normal (non-flush) value is received, the component behaves as before. We can then wrap a pipeline of these processes in a tidy process that takes in a list of items, sorts them and sends them out again:
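The flush behaviour can be traced with a pure model in which each stage is a function from its input messages to its output messages. This is a sketch for illustration only: `Msg`, `stageFlush` and `pump` are hypothetical names (the `Msg` type mirrors FlushOrNormal), and lists stand in for channels.

```haskell
-- Pure model of the flushing stage (illustrative; mirrors FlushOrNormal).
data Msg a = Flush [a] | Normal a
  deriving (Eq, Show)

-- One stage as a function from its input messages to its output messages.
stageFlush :: Ord a => [Msg a] -> [Msg a]
stageFlush = empty
  where
    empty []                = []
    empty (Flush xs : rest) = Flush xs : empty rest
    empty (Normal y : rest) = full y rest

    full _   []                  = []
    full cur (Flush xs : rest)   = Flush (cur : xs) : empty rest
    full cur (Normal new : rest) =
      let (low, high) = if cur < new then (cur, new) else (new, cur)
      in  Normal low : full high rest

-- Feed the values and then a flush through n chained stages.
pump :: Ord a => Int -> [a] -> [Msg a]
pump n xs = iterate stageFlush (map Normal xs ++ [Flush []]) !! n

main :: IO ()
main = print (pump 3 [2, 3, 1 :: Int])  -- prints [Flush [1,2,3]]
```

With fewer stages than values, the surplus values in this model emerge as `Normal` messages ahead of the flush, which is why the pipeline needs at least as many stages as items to sort.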

```
sorterFixedCapacity :: Ord a => Int -> Chanin [a] -> Chanout [a] -> CHP ()
sorterFixedCapacity n input output
  = do pumpIn <- oneToOneChannel
       pumpOut <- oneToOneChannel
       pipeline (replicate n highestFlush) (reader pumpIn) (writer pumpOut)
         <|*|> (forever $ do xs <- readChannel input
                             mapM_ (writeChannel (writer pumpIn) . Normal) xs
                             writeChannel (writer pumpIn) $ Flush []