Growing Sort Pump

In previous posts, I have explained the (fixed capacity) sort pump, and expanding pipelines. We can combine these ideas to form an expanding sort pump that grows as required.

The sort pump as originally described had a fixed capacity: if you send it a list that is longer than its stated capacity, it will not sort correctly. We could set up and tear down the list of processes for each list if we wished, but there is another solution: a pipeline of processes that automatically grows to the longest required length.

To implement the growing pipeline, we pass each component process a flag that indicates whether it is the final process in the pipeline. If this flag is True and the process wants to send a non-flush message onwards (where there is no process waiting to take it!), it splits itself into two component processes: the first keeps the higher value, and the second takes the lower value and becomes the new final process. By this mechanism, the pipeline grows as long as needed:

highestFlushGrow :: Ord a => Bool -> Maybe a -> Chanin (FlushOrNormal a) -> Chanout (FlushOrNormal a) -> CHP ()
highestFlushGrow last initVal input output = maybe empty full initVal
  where
    -- Holding no value: forward flushes unchanged, store the first normal value.
    empty = do x <- readChannel input
               case x of
                 Flush _ -> writeChannel output x >> empty
                 Normal y -> full y

    -- Holding cur: keep the higher value, pass the lower one on.
    full cur = do x <- readChannel input
                  case x of
                    Flush xs -> writeChannel output (Flush (cur:xs)) >> empty
                    Normal new -> let (low, high) = if cur < new then (cur, new) else (new, cur)
                      in if not last
                           then writeChannel output (Normal low) >> full high
                           -- Last process: split into two, the second becoming the new last.
                           else (highestFlushGrow False (Just high) |->| highestFlushGrow True (Just low)) input output
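To see the splitting rule without any concurrency, here is a minimal pure sketch that models each process by the value it currently holds. It is not CHP code: the names Pipeline, feed, flush and sortOnce are hypothetical, and the list's final element stands in for the process whose flag is True.

```haskell
-- Pure model of the growing sort pump (an illustration, not CHP code).
-- All names here are hypothetical; none come from the CHP library.
import Data.Maybe (catMaybes)

-- | One entry per process: Nothing = empty, Just x = full (holding x).
--   The final entry models the process that was told it is last.
type Pipeline a = [Maybe a]

-- | Send one Normal value into the front of the pipeline.
feed :: Ord a => a -> Pipeline a -> Pipeline a
feed x []                = [Just x]               -- assumption: no process yet, start one
feed x (Nothing : rest)  = Just x : rest          -- empty process stores the value
feed x [Just cur]        = [Just (max cur x), Just (min cur x)]
                                                  -- full last process splits in two
feed x (Just cur : rest) = Just (max cur x) : feed (min cur x) rest
                                                  -- keep the higher, pass the lower on

-- | Send a Flush through: each full process prepends its value, so the
--   last process's (lowest) value ends up first. Every process becomes empty.
flush :: Pipeline a -> ([a], Pipeline a)
flush stages = (reverse (catMaybes stages), map (const Nothing) stages)

-- | Sort one list: feed every item, then flush.
sortOnce :: Ord a => [a] -> Pipeline a -> ([a], Pipeline a)
sortOnce xs stages = flush (foldl (flip feed) stages xs)
```

Starting from a single empty process, sortOnce [3,1,2] [Nothing] gives ([1,2,3], [Nothing,Nothing,Nothing]): the one process has become three, all empty again after the flush.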

So now our wrapper process doesn’t need to start a pipeline of these things — it just starts with a single process, and the automatic growth will take care of the rest:

sorterGrow :: Ord a => Chanin [a] -> Chanout [a] -> CHP ()
sorterGrow input output
  = do pumpIn <- oneToOneChannel
       pumpOut <- oneToOneChannel
       highestFlushGrow True Nothing (reader pumpIn) (writer pumpOut)
         <|*|> (forever $ do xs <- readChannel input
                             mapM_ (writeChannel (writer pumpIn) . Normal) xs
                             writeChannel (writer pumpIn) $ Flush []
                             Flush xs' <- readChannel (reader pumpOut)
                             writeChannel output xs')

This is a slight variation on the expanding pipeline seen in the primes example. In that example we had an explicit “end” process that added on a new filterDiv process whenever it received an item. Here, we let the highestFlushGrow process know whether it is the last process in the pipeline, and it splits itself. So we do not need any other type of process in the pipeline besides highestFlushGrow.

This pipeline will grow to have as many processes as there are items in the longest list it has been sent so far. However, as it stands, it will not shrink again if future lists are shorter. That is a little more difficult, and I will return to it in a future post.
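As a small worked example of that growth, the number of processes after each list can be computed as a running maximum of the list lengths. This is only a sketch: pipelineSizes is a hypothetical helper, and it assumes the pipeline starts as a single process and that no process ever terminates.

```haskell
-- Hypothetical helper, not part of the sort pump itself.
-- Returns the pipeline size before the first list and after each list,
-- assuming one initial process and no shrinking.
pipelineSizes :: [[a]] -> [Int]
pipelineSizes = scanl max 1 . map length
```

For lists of lengths 3, 2, 5 and 0, sent in that order, this gives [1,3,3,5,5]: the pipeline only grows when a list is longer than any seen before.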
