Functions into processes, using arrows
Pointfree notation is often the most elegant way to write a function in Haskell. Put simply, any time you write code such as:
foo x = f (g (h x))
Or, if you are a dollar fan:
foo x = f $ g $ h x
You can rewrite it as:
foo = f . g . h
Consider this example of a function composition from Neil Mitchell that finds the mode (most common element) of a list:
mostCommon :: Ord a => [a] -> a mostCommon = head . maximumBy (comparing length) . group . sort
A nice composition of four functions. Each function in that pipeline of functions is taking a single input and producing a single output, which is then fed into the next function. This pipeline of pure functions is analogous to a pipeline of communicating processes — each taking a single input and sending on a single output to the next process. So is there an easy way of converting such function pipelines into process pipelines? The answer is yes — by using arrows.

Even if you are a Haskell programmer, you may not be familiar with arrows. They can be used to express these input and output compositions. We can convert our function pipeline to use arrow notation by just changing the composition operator:
mostCommonArr1 :: Ord a => [a] -> a mostCommonArr1 = head <<< maximumBy (comparing length) <<< group <<< sort
This is because by good design/happy accident, a function does not need any special annotation to become part of an arrow. If we want to be more general, we must use the arr function to convert pure functions into arrows:
mostCommonArr2 :: Ord a => [a] -> a mostCommonArr2 = arr head <<< arr (maximumBy (comparing length)) <<< arr group <<< arr sort
A bit more cumbersome perhaps, but we haven’t changed the original too much — the pipeline of the four functions is still visibly there. Now that we have our function pipeline expressed in terms of arrows, changing to a process pipeline is a relatively simple matter. You’ll need to import the Control.Concurrent.CHP.Arrow module, and then re-type the pipeline to be a CHP process with channels, and stick runPipeline on the front:
mostCommonArrProc :: Ord a => Chanin [a] -> Chanout a -> CHP () mostCommonArrProc = runPipeline $ arr head <<< arr (maximumBy (comparing length)) <<< arr group <<< arr sort
And that’s it. The function here is now a communicating pipeline of four functions, wrapped up into one. mostCommonArrProc will sit there waiting to be sent a list of items, and once it has been, it will output the most common element of the list. So we’ve re-used our simple pure-function pipeline as a pipeline of communicating processes, with only a little change in notation.
Note: Since base-4, Arrow has become based on Category, which means you can actually express the function using the original dot composition, like so:
mostCommonCatProc :: Ord a => Chanin [a] -> Chanout a -> CHP () mostCommonCatProc = runPipeline $ arr head . arr (maximumBy (comparing length)) . arr group . arr sort
I prefer the <<< notation of the arrows, as I think it better expresses a pipeline of processes — and it doesn’t require base-4.
OK, cool. But I don’t understand what is gained.
It’s a way to compose process pipelines that’s not too far from creating function pipelines. So if you can express your program as a function pipeline, you can easily make that concurrent. I should probably a provide a strict-output equivalent of arr, as then if you use that version you may be able to achieve some parallel speed-up of your process pipeline, while also easily slotting it into your process network. It is also quite easy to vary the granularity when looking for parallel speed-up — you could have all four functions in one process, or two functions in each process, or one function in each process.
I think I should clarify: when I say process pipeline, I mean a pipeline of processes that are all operating concurrently with each other. So by changing to the process arrow from the function arrow, you get four concurrent processes in a pipeline, so that four items of data can be in the pipeline at once and operated on concurrently. However, without the strict output mentioned in my other comment, you’re not going to get speed-up — just thunk-building!
CHP is a really cool project. The concurrent library I’ve always wished to have, but never had time to build, big thanks
Arrows are a nice fit for building and connecting processes/agents together and CHP lets us do run our agents concurrently. So thanks for providing an arrow interface too
I think to that the (<<>>) operators fit better then (.) from Category , so one has finer control about parallelization.
Btw. do you have a strict version of “runPipeline” which creates channels evaluating their input to rnf?
So, when will we see an “efficient” concurrent FRP library based on CHP?
I’ve just uploaded CHP 1.3.2 to Hackage that contains an arrStrict function that evaluates values to rnf before sending them, so you can build the strict pipelines discussed above. So it’s not runPipeline that needs to be altered, it’s the individual arr functions. As to FRP — I haven’t quite wrapped my head around it fully yet, but I would like to explore its relation to CHP if I can.
Did you mean to change the type of mostCommonArr2? As it’s written, pirate-ifying the pipeline could be more general than mostCommonArr1, but it isn’t because the type signature constrains the arrow instance to functions.
Are you suggesting that I could have made the type:
mostCommonArr2 :: (Arrow a, Ord b) => a [b] b
? At which point I could change the definition of mostCommonArrProc to:
mostCommonArrProc = runPipeline mostCommonArr2
That would be quite neat — and in fact I could probably just do away with mostCommonArrProc at that point.