
## Concurrent Pearl: The Sort Pump

A common feature of many Computer Science courses is sorting: you may recall the bubble sort, the insertion sort, the merge sort and the quick sort. You may also know their time complexities: bubble sort is O(N^2), meaning the time it takes is proportional to the square of the number of items involved. In contrast, quick sort achieves O(N*log N) in the average case, which, crucially, is better. These are all sequential algorithms, though; what happens when we want to design a concurrent algorithm?

There is a parallel version of quicksort in which, as you divide and conquer your list, you fork off the two divisions into separate threads, and there are no doubt other purely parallel sorting strategies too. With CHP we can approach the problem differently, using a concurrent sort pump. I want to explain the sort pump because I like its design, not because it is blazingly fast. We'll begin with a simple component process:

```haskell
highest :: Ord a => Chanin a -> Chanout a -> CHP ()
highest input output = readChannel input >>= highest'
  where
    highest' x = do y <- readChannel input
                    let (low, high) = if x < y then (x, y) else (y, x)
                    writeChannel output low
                    highest' high
```

This process starts by reading in a value to hold. Then it repeatedly reads in a new value, keeping the highest one and sending on the lowest one. So if you feed it a stream of values, it will keep hold of the highest value it has seen, and will pass on all lower values. Now let’s consider what happens if you connect two of these processes together. The first process will keep the highest value, sending on all others. The second process will keep the highest value of these others — i.e. the second highest value of all of them. Connect three together, and the third will hold the third highest value. Connect N of these together, pass in N values, and the Mth process will hold the Mth highest value. Voila, a sort pump!
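
To build intuition for why the cascade sorts, it can help to model the pump without any concurrency. The following is my own sketch, not part of CHP: an accumulator list stands in for the chain of processes, with the head playing the role of the first process's held value.

```haskell
-- Pure, sequential model of the sort pump (a sketch for intuition only;
-- the real pump runs each cell as a CHP process communicating on channels).
-- The list of held values stands in for the chain of processes: the head
-- is the first process's held value, and so on down the pipeline.
insertPump :: Ord a => [a] -> a -> [a]
insertPump held v = go held v
  where
    go [] x     = [x]                        -- an empty cell holds the first value it sees
    go (h:hs) x = max h x : go hs (min h x)  -- keep the higher, pass on the lower

-- Feed in all the values, then read the held values back out.  The held
-- list is in descending order (head = overall highest), so reversing it
-- gives the sorted result.
pumpSort :: Ord a => [a] -> [a]
pumpSort = reverse . foldl insertPump []
```

Modelled this way, the pump is recognisably a relative of insertion sort: each new value sinks through the occupied cells until it finds its place.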

There is a problem with our sort pump though — it sorts the values, but we have no way of getting the result out! So we need to augment our component to permit the “flushing” of the pump:

```haskell
data FlushOrNormal a = Flush [a] | Normal a

highestFlush :: Ord a => Chanin (FlushOrNormal a) -> Chanout (FlushOrNormal a)
             -> CHP ()
highestFlush input output = empty
  where
    empty = do x <- readChannel input
               case x of
                 Flush _  -> writeChannel output x >> empty
                 Normal y -> full y

    full cur = do x <- readChannel input
                  case x of
                    Flush xs   -> writeChannel output (Flush (cur:xs)) >> empty
                    Normal new -> let (low, high) = if cur < new
                                                      then (cur, new)
                                                      else (new, cur)
                                  in writeChannel output (Normal low) >> full high
```

If the component receives a flush signal, it prepends its currently held value to the list and sends the flush message on. The flush visits the processes in order from the highest held value down to the lowest, and each prepends its value to the front of the list, so the list ends up in ascending order. If a normal (non-flush) value is received, the component behaves as before. We can then wrap a pipeline of these processes in a tidy process that takes in a list of items, sorts them and sends them out again:
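
Again as a sketch only (plain Haskell, no CHP), we can model one `highestFlush` component as a function from its incoming message stream to its outgoing one, chain N of them, and check that a flush fed in after N values emerges carrying them in ascending order:

```haskell
-- A pure model of highestFlush (sketch only): each component maps its
-- input message stream to its output stream, starting empty.
data FlushOrNormal a = Flush [a] | Normal a

component :: Ord a => [FlushOrNormal a] -> [FlushOrNormal a]
component = go Nothing
  where
    go _ [] = []
    go Nothing (Normal y : ms)    = go (Just y) ms      -- absorb the first value
    go Nothing (f@(Flush _) : ms) = f : go Nothing ms   -- nothing held: pass flush on
    go (Just cur) (Flush xs : ms) = Flush (cur:xs) : go Nothing ms
    go (Just cur) (Normal new : ms) =
      Normal (min cur new) : go (Just (max cur new)) ms -- keep high, send low

-- Chain one component per value and run the messages through; the Flush
-- that comes out of the far end carries the sorted values.
pumpSortFlush :: Ord a => [a] -> [a]
pumpSortFlush xs =
  case foldr (.) id (replicate (length xs) component)
         (map Normal xs ++ [Flush []]) of
    [Flush sorted] -> sorted
    _              -> error "unexpected pump output"
```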

```haskell
sorterFixedCapacity :: Ord a => Int -> Chanin [a] -> Chanout [a] -> CHP ()
sorterFixedCapacity n input output
  = do pumpIn <- oneToOneChannel
       pumpOut <- oneToOneChannel
       pipeline (replicate n highestFlush) (reader pumpIn) (writer pumpOut)
         <|*|> (forever $ do xs <- readChannel input
                             mapM_ (writeChannel (writer pumpIn) . Normal) xs
                             writeChannel (writer pumpIn) $ Flush []
                             Flush xs' <- readChannel (reader pumpOut)
                             writeChannel output xs')
```

This sort pump is fixed capacity: if you send the above process a list that is longer than its stated capacity, it won't work correctly. I will not go into the solutions in detail here, but there are several; we could set up and tear down the pipeline of processes on each request, or we could make the pump automatically expand.

Finally, let’s think about the sort pump’s time complexity. Each value will pass through half the pipeline on average (not counting the flush), so N/2 communications per item. This happens to N items, so N*N/2 communications (plus N for the flush, but that falls away) [Correction: this should be N*(N+1)/2 — see the comments below]. Each component will see on average N/2 items, and will thus perform N/2 comparisons, so N*N/2 comparisons [Correction: actually, this should be N*(N-1)/2, I think — see the comments]. Our time complexity is therefore O(N^2) overall, but importantly if you had N processors (and very fast communications), each process would run O(N) operations in parallel, and thus you would have an O(N) sorting algorithm. Although, as the corrections indicate, some of this can be a bit harder to calculate than it first appears!
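
The corrected comparison count can be checked mechanically. The sketch below (plain Haskell, not CHP, and my own illustration) threads a counter through a pure model of the pump: sorting N items performs exactly N*(N-1)/2 comparisons, matching the corrected figure above.

```haskell
-- Pure model of the pump, instrumented to count comparisons (a sketch
-- for checking the arithmetic, not part of CHP).  Each time a value
-- passes through an occupied cell, that cell performs one comparison.
insertCount :: Ord a => ([a], Int) -> a -> ([a], Int)
insertCount (held, c) v = go held v c
  where
    go [] x k     = ([x], k)                 -- empty cell: no comparison
    go (h:hs) x k = let (rest, k') = go hs (min h x) (k + 1)
                    in (max h x : rest, k')

-- Returns the sorted list together with the total number of comparisons,
-- which for a list of length n is 0 + 1 + ... + (n-1) = n*(n-1)/2.
pumpSortCount :: Ord a => [a] -> ([a], Int)
pumpSortCount xs = let (held, c) = foldl insertCount ([], 0) xs
                   in (reverse held, c)
```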

While our theoretical bound is good, in practice the sort pump only stands a chance of good performance if you have very few items that are very expensive to compare (so that the computation, i.e. the comparisons, outweighs the communications). The sort pump is really a concurrent version of the bubble sort, and shares many of its attributes: small, built on simple principles, but slow.

1. September 18, 2009 at 5:53 pm

QuickSort is O(N^2), but the average case (under certain circumstances) is O(N log N). I also think your complexity analysis is dodgy - O(N * N/2) in a decreasing series can be bounded at log N, but the crucial thing is you aren't O(N * N/2), at worst you're O(N * N-1) (if you picked the lowest element in the list as a pivot)

2. September 18, 2009 at 6:19 pm

Thanks for the quicksort correction (now fixed). However, I don’t follow the second part of your comment. I mention O(N*N/2) twice near the end, about the sort pump (which has no pivot) rather than about quick sort. I’ll try and explain the time complexity of the sort pump a little further.

First, communication. The item that ends up in the final (Nth) cell must have been sent N times (including the initial injection into the first component) to reach the final component. The item in the (N-1)th cell must have been sent N-1 times, and the item in the first cell was sent once. So overall we actually have (sum [1..n]) communications, which is N*(N+1)/2 — I’ve corrected that, but it still ends up as an O(N^2) number of communications, which is fixed (i.e. it does not depend on the data set).

Second, comparisons. We can actually cheat here, and note that each input into a component after the first is always followed by a comparison, and that is the only time a comparison is triggered. So the number of comparisons must be the number of communications minus N, i.e. N*(N-1)/2. This again is a fixed number that does not depend on the data set.

3. October 1, 2009 at 11:38 am

Nice post, although the assumptions required for this to be considered an O(n) sort do seem rather more absurd than calling a radix sort an O(n) sort, which in my opinion would more honestly be termed an O(n * log m) sort, with m being the length of the keys being sorted.

Out of curiosity, are you familiar with Occam? Occam is one of those languages I’ve been meaning to learn, although my energy for learning new languages has really tapered off. How does CHP compare to Occam?

• October 1, 2009 at 1:28 pm

I originally got into Haskell while writing an occam compiler, so yes — I’m familiar with it 🙂 CHP is part of a family of libraries (JCSP, C++CSP) that were developed at Kent based on the occam language, which we also work with (see http://www.cs.kent.ac.uk/research/groups/plas/projects.html ). occam can express some bits more concisely (the channel communications, for example) but Haskell is actually neater than occam for many things. The main difference is that occam is blazingly-fast, while CHP is not. I’m working on speed-up…

4. October 1, 2009 at 7:04 pm

Yeah, I saw occam listed on the CHP homepage that I visited shortly after commenting. 🙂

This is definitely interesting; I’ll probably browse the source a bit and if I notice anything obviously bad from a performance perspective, I will yell.
