Growing Sort Pump
In previous posts, I have explained the (fixed-capacity) sort pump and expanding pipelines. We can combine these ideas to form an expanding sort pump that grows as required.
The sort pump as originally described had a fixed capacity: if you sent it a list longer than its stated capacity, it would not work correctly. We could set up and tear down the pipeline of processes for each list if we wished, but there is another solution: a pipeline of processes that automatically grows to the longest required length.
To implement the growing pipeline, we pass each component process a flag that indicates whether it is the final process in the pipeline. If this flag is True (it’s the last process) and the process wants to send a non-flush message onwards (there’s no-one there!), it automatically splits itself into two component processes. By this mechanism, the pipeline grows exactly as long as it needs to be, with the last process splitting itself in two whenever it has a value to pass on:
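```haskell
highestFlushGrow :: Ord a => Bool -> Maybe a -> Chanin (FlushOrNormal a) -> Chanout (FlushOrNormal a) -> CHP ()
highestFlushGrow last initVal input output = maybe empty full initVal
  where
    -- Holding no value: pass flushes straight on; the first Normal item becomes our value.
    empty = do
      x <- readChannel input
      case x of
        Flush _ -> writeChannel output x >> empty
        Normal y -> full y

    -- Holding cur: keep the higher of cur and each new item, and pass the lower one on.
    full cur = do
      x <- readChannel input
      case x of
        -- Prepend our value: everything already in xs came from upstream processes
        -- and is at least as large, so the flushed list stays in ascending order.
        Flush xs -> writeChannel output (Flush (cur:xs)) >> empty
        Normal new ->
          let (low, high) = if cur < new then (cur, new) else (new, cur)
          in if not last
               then writeChannel output (Normal low) >> full high
               -- Last in the pipeline, so there is no-one to send low to: split into
               -- two processes, this one keeping high and a new last one holding low.
               else (highestFlushGrow False (Just high) |->| highestFlushGrow True (Just low)) input output
```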
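To make the growth concrete without the CHP runtime, here is a minimal pure model of a pipeline of highestFlushGrow processes. It is a sequential sketch under stated assumptions, not the post’s code: the names Pipeline, pushItem, flushPipeline and sortOnce are hypothetical, and the pipeline is represented as a list of slots (first process first), each holding the value that process keeps, or Nothing if it is empty.

```haskell
-- Hypothetical pure model of a growing pipeline of highestFlushGrow processes.
-- Slots are ordered first process first; Nothing means that process is empty.
type Pipeline a = [Maybe a]

-- | Send one Normal item into the front of the pipeline.
pushItem :: Ord a => a -> Pipeline a -> Pipeline a
pushItem new [] = [Just new]                  -- degenerate case: no processes yet
pushItem new (Nothing : rest) = Just new : rest
pushItem new (Just cur : rest)
  | null rest = [Just high, Just low]         -- last process: split in two
  | otherwise = Just high : pushItem low rest -- keep high, pass low onwards
  where
    (low, high) = if cur < new then (cur, new) else (new, cur)

-- | Send Flush [] into the front: each process prepends its value (if any)
-- to the list passing through, then becomes empty but stays in the pipeline.
flushPipeline :: Pipeline a -> ([a], Pipeline a)
flushPipeline procs = (foldl step [] procs, map (const Nothing) procs)
  where
    step acc = maybe acc (: acc)

-- | Sort one list with a pipeline that starts as a single empty process,
-- returning the sorted list and how many processes the pipeline grew to.
sortOnce :: Ord a => [a] -> ([a], Int)
sortOnce xs = (sorted, length grown)
  where
    grown = foldl (flip pushItem) [Nothing] xs
    (sorted, _) = flushPipeline grown
```

For example, sortOnce [3, 1, 2] evaluates to ([1,2,3], 3): feeding 3, 1 and 2 leaves the slots [Just 3, Just 2, Just 1], and the flush collects them in ascending order.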
So now our wrapper process doesn’t need to start a pipeline of these things — it just starts with a single process, and the automatic growth will take care of the rest:
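```haskell
sorterGrow :: Ord a => Chanin [a] -> Chanout [a] -> CHP ()
sorterGrow input output = do
  pumpIn <- oneToOneChannel
  pumpOut <- oneToOneChannel
  -- Start with a single process that is last in the pipeline and holds no value;
  -- it splits itself as the incoming lists demand.
  highestFlushGrow True Nothing (reader pumpIn) (writer pumpOut)
    <|*|> (forever $ do
             -- Feed each item of the list into the pump, then flush it.
             xs <- readChannel input
             mapM_ (writeChannel (writer pumpIn) . Normal) xs
             writeChannel (writer pumpIn) $ Flush []
             -- The flush comes back out of the pump carrying the sorted list.
             Flush xs' <- readChannel (reader pumpOut)
             writeChannel output xs')
```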
This is a slight variation on the expanding pipeline seen in the primes example. In that example we had an explicit “end” process that added on a new filterDiv process whenever it received an item. Here, we let the highestFlushGrow process know whether it is the last process in the pipeline, and it splits itself. So we do not need any other type of process in the pipeline besides highestFlushGrow.
This pipeline will grow to have as many processes as there are items in the longest list it has been sent (that is, the most items between two flushes). However, as it stands, it will not shrink again if later lists are shorter. That is a little more difficult, and I will return to it in a future post.
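The growth rule can be summarised by a tiny counting model (a hypothetical sketch, not part of the post): starting from the single process that sorterGrow creates, a list of n items sent to a pipeline of k processes leaves max(k, n) processes behind, because a flush empties the processes but never removes them.

```haskell
-- | Hypothetical counting model of the growing pipeline: the number of
-- highestFlushGrow processes after each list has been sorted, starting from
-- the single process that sorterGrow creates. A list only causes growth when
-- it is longer than the current pipeline; nothing ever shrinks.
pipelineSizes :: [[a]] -> [Int]
pipelineSizes = drop 1 . scanl (\n xs -> max n (length xs)) 1
```

For instance, pipelineSizes [[3,1,2],[5],[1,2,3,4],[]] gives [3,3,4,4]: the shorter second and fourth lists leave the pipeline at its previous length.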