Concurrent Pearl: The Expanding Prime Pipeline

Process networks in CHP do not have to be static. In fact, some of the more interesting examples are dynamic. To illustrate a dynamic process pipeline I will use the example task of generating prime numbers.

One way to find prime numbers is as follows. Start with an empty list of primes, and the integer 2. Check if the integer is divisible by any of the numbers in the list of primes. If yes, discard it. If no, add it to the list. Then try the next integer (i.e. add one), and continue indefinitely. A very simple, if inefficient algorithm. (This is not the Sieve of Eratosthenes — see this paper.)
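Before moving to processes, the algorithm just described can be written as a plain sequential function. This is only a sketch to pin down the steps; the name `trialPrimes` is made up for this example and is not part of CHP or of the program that follows.

```haskell
-- Sequential sketch of the algorithm described above.
-- `trialPrimes` is a hypothetical name used only for illustration.

-- | Primes found by testing each candidate against every prime found so far.
trialPrimes :: [Integer]
trialPrimes = go [] [2 ..]
  where
    go found (x : xs)
      -- Divisible by a known prime: discard and try the next integer.
      | any (\p -> x `mod` p == 0) found = go found xs
      -- Not divisible by any known prime: emit it and add it to the list.
      | otherwise = x : go (found ++ [x]) xs
    go _ [] = [] -- not reached for the infinite input, kept for totality
```

In GHCi, `take 10 trialPrimes` evaluates to `[2,3,5,7,11,13,17,19,23,29]`. The `found` list plays the role that the chain of filter processes plays in the concurrent version.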

Let’s see how we can turn this algorithm into a CHP program. Every time we find a prime, we want to add it to our list of numbers to filter out in future. We can implement this list as a concurrent process pipeline — each process in the pipeline will be responsible for filtering out numbers divisible by a single prime. Such a process is quite straightforward:

filterDiv :: Integer -> Chanin Integer -> Chanout Integer -> CHP ()
filterDiv n input output
  = forever $ do x <- readChannel input
                 when (x `mod` n /= 0) $
                   writeChannel output x

Every time we find a prime, we need to add a new filterDiv process on to the end of our pipeline. This may sound difficult, but can be easily expressed with some concurrent recursion:

end :: Chanin Integer -> Chanout Integer -> CHP ()
end input output = do x <- readChannel input
                      writeChannel output x
                      (filterDiv x |->| end) input output

This process sits at the end of our pipeline and reads in a single value. Since our pipeline is filtering out all non-primes, any number that this “end” process receives must be prime. In response to receiving a prime, it sends it on (to whatever process is waiting beyond the prime pipeline to receive the primes). It then becomes the parallel composition of a new filterDiv process, connected to a new end process (the |->| operator expresses this composition). Thus every time a new prime comes out of the end of the pipeline, the pipeline grows by one more process. After the 100th prime, the pipeline will consist of 100 filterDiv processes executing concurrently, and one “end” process.

To complete our pipeline, we need a process at the beginning of the pipeline, supplying an ever-increasing stream of numbers on its output channel, starting at 2:

genStream :: Chanout Integer -> CHP ()
genStream output = mapM_ (writeChannel output) [2..]

We can now write a primes process that sends the primes, in ascending order, on its output channel. Using another helpful operator from the CHP library, this is easy:

primes :: Chanout Integer -> CHP ()
primes = genStream ->| end

Observe that there is no mention of filterDiv. The pipeline starts out with the stream generator and an end process. All the filterDiv processes are added once the primes start coming out of the pipeline, beginning with 2, as shown in the diagram below:

[Figure: Expanding Prime Pipeline]

The Expanding Prime Pipeline. Each time that the end process receives a number, it splits itself into a filterDiv process (for filtering out all numbers divisible by that prime) and a new end process. Thus the pipeline grows to filter out all numbers divisible by lower primes, and the only numbers that the end process receives are prime numbers. Here, 4 would have been discarded by filterDiv 2.

Note also that the interface of primes is simply Chanout Integer -> CHP (); all the internal details of the expanding concurrent pipeline are hidden from the outside, leaving only an opaque box:

[Figure: the primes process as an opaque box]

To make a complete program, we can compose this in parallel with a process responsible for printing the primes:

main :: IO ()
main = runCHP_ $ do c <- oneToOneChannel
                    -- Run the prime pipeline in parallel with a printer process.
                    primes (writer c) <||>
                      (forever $ readChannel (reader c) >>= liftIO . print)
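For readers without the CHP library to hand, the same expanding shape can be approximated with only `forkIO` and `MVar` from base. This is a rough analogue under stated assumptions, not how CHP works internally: an `MVar` is a one-place buffer rather than a synchronous channel, the process names are reused purely to mirror the program above, and `firstPrimes` is a hypothetical helper added so the result can be collected as a list.

```haskell
import Control.Concurrent (forkIO)
import Control.Concurrent.MVar (MVar, newEmptyMVar, putMVar, takeMVar)
import Control.Monad (forever, replicateM, when)

-- | Forward every number that is not divisible by n.
filterDiv :: Integer -> MVar Integer -> MVar Integer -> IO ()
filterDiv n input output = forever $ do
  x <- takeMVar input
  when (x `mod` n /= 0) $ putMVar output x

-- | Pass on one prime, then become a new filter followed by a new end.
end :: MVar Integer -> MVar Integer -> IO ()
end input output = do
  x <- takeMVar input
  putMVar output x
  link <- newEmptyMVar                   -- fresh link between filter and end
  _ <- forkIO (filterDiv x input link)   -- the filter takes over our old input
  end link output                        -- we carry on as the new end

-- | Supply 2, 3, 4, ... to the head of the pipeline.
genStream :: MVar Integer -> IO ()
genStream output = mapM_ (putMVar output) [2 ..]

-- | Hypothetical helper: run the pipeline and collect its first k outputs.
-- The worker threads are simply left blocked once we stop reading.
firstPrimes :: Int -> IO [Integer]
firstPrimes k = do
  source <- newEmptyMVar
  sink <- newEmptyMVar
  _ <- forkIO (genStream source)
  _ <- forkIO (end source sink)
  replicateM k (takeMVar sink)
```

Here `firstPrimes 10` returns `[2,3,5,7,11,13,17,19,23,29]`. The hand-written wiring inside `end` (create a link, fork one side, continue as the other) is the part that the CHP composition operator expresses in a single line.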

The expanding pipeline is not restricted to conceptual examples such as generating the primes, and the powerful idea of a process becoming the parallel composition of sub-processes is not restricted to pipelines — two things that I hope to explore in future.

Categories: Pearls
  1. Robert Lee
    September 24, 2009 at 6:19 pm

    Please keep up the great work. Learning CHP by example is very helpful stuff.

  2. jberryman
    September 24, 2009 at 10:07 pm

    Cool! Did you do any tests of performance?

    • September 24, 2009 at 10:19 pm

      I suspect that like many of my pearls, the concurrency is too fine-grained to add performance as it stands, let alone the fact that the trial division algorithm itself (even in standard Haskell) is rather slow. Each number will be communicated at least once, but then half will be knocked out by a simple division by two in the adjacent process! In fact, the activity of the pipeline will follow a sort of power law: half the numbers will be discarded by the first process, one sixth by the second process (the numbers divisible by three but not two), and one fifteenth by the third process (I think!). So the load is not even either. It might be interesting to try to convert the real Sieve of Eratosthenes given in the linked paper to CHP and try to get performance out of that; I’ll try to look at that in future.
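The fractions quoted in this reply can be checked with a few lines of exact arithmetic. The sketch below assumes the long-run picture in which the filter for a prime p discards 1/p of whatever reaches it; `discardShares` is a made-up name for this illustration.

```haskell
import Data.Ratio ((%))

-- | Long-run fraction of the generator's output discarded by each filter,
-- given the filters' primes in pipeline order.
discardShares :: [Integer] -> [Rational]
discardShares = go 1
  where
    go _ [] = []
    go reaching (p : ps) =
      -- 1/p of the numbers reaching this filter are discarded here;
      -- the remaining (1 - 1/p) travel on to the next filter.
      reaching / fromInteger p : go (reaching * (1 - 1 % p)) ps
```

`discardShares [2, 3, 5, 7]` evaluates to `[1 % 2, 1 % 6, 1 % 15, 4 % 105]`, which agrees with the half, one sixth and one fifteenth given above.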

  3. Felipe Lessa
    September 26, 2009 at 12:38 pm

    The sieve you say? Hmmm, what about something like

    filterDiv k input output = go k
      where
        go n = do x <- readChannel input
                  if (x == n)
                    then go (n+k) -- ignore
                    else writeChannel output x >> go n

    Closer to the sieve, probably as close as you can get with a pipeline, I’d say. 🙂

    • September 28, 2009 at 4:44 pm

      That definition doesn’t work, do you see why? 🙂

  4. Lanny Ripple
    September 29, 2009 at 5:16 pm

    The sieve gets its speed from only needing to grow its control structure at the 4th root of the dataset. To evaluate if a number N is prime we only need to see if it’s divisible by primes up to N^(1/2). To find the dividing primes we only need to check their divisors, so since the largest divisor of N cannot be > N^(1/2) we only need to have used filters of primes <= N^(1/2).

  5. October 3, 2009 at 6:37 pm

    Process networks can often be more perspicuously expressed
    in terms of lists. Then the burden of setting up the network
    and sending and receiving messages is assumed by the lazy
    evaluator. Here’s the prime pipeline written in this style,
    a well burnished pearl.

    primes = sieve [2..] where
      sieve (x:xs) = x : sieve [y | y<-xs, y `mod` x /= 0]
