Emulating Shared Mutable Variables with Message-Passing Processes
CHP supports a message-passing style of concurrency. Shared mutable variables are not intended to be used, which is quite easy to stick to in a functional language like Haskell, where mutable variables need special types such as IORef, MVar or TVar. (Contrast this with C, where mutating a variable from several threads is dangerously easy!) But sometimes you might find that you need some shared mutable data with safe updates, perhaps a global setting that most processes need access to. Instead of planting an MVar (or TVar) in the middle of your CHP program, you can use CHP’s message-passing model to implement shared mutable variables.
The first challenge of shared mutable data is that the reads should not overlap the writes. You don’t want a situation where two processes try to overwrite the shared variable and you end up with half of each value. We can avoid this by representing a shared variable with an active process. The process has an incoming channel (for sending in new values of the variable) and an outgoing channel (for reading out the current value of the variable). These channels can be shared amongst many readers and writers of the variable, using anyToOneChannel and oneToAnyChannel (with claim, as touched upon in a previous post). Our first attempt at the shared variable process is simple:
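import Control.Concurrent.CHP

sharedRW :: a -> Chanin a -> Chanout a -> CHP ()
sharedRW start input output = sharedRW' start
  where
    -- Either accept a new value on the input channel, or offer the
    -- current value on the output channel (keeping it for next time).
    sharedRW' x = (readChannel input <-> (writeChannel output x >> return x))
                  >>= sharedRW'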
The process sits and waits for one of two things (using <->, CHP’s choice operator): either a new value arrives on its input channel, or the current value is communicated on the output channel (in which case it is retained for the next iteration). The following test (which uses some forthcoming CHP testing additions) reads and writes the variable 1000 times, adding one each time. If we begin with value 1, we expect (and get) 1001 at the end:
succer :: Int -> Chanin Int -> Chanout Int -> CHP ()
succer n input output = replicateM_ n $
  readChannel input >>= writeChannel output . succ

testRW1 :: CHP CHPTestResult
testRW1 = do
  toV <- oneToOneChannel
  fromV <- oneToOneChannel
  sharedRW 1 (reader toV) (writer fromV) `withCheck` do
    liftCHP $ succer 1000 (reader fromV) (writer toV)
    final <- liftCHP $ readChannel $ reader fromV
    poison (reader fromV) >> poison (writer toV)
    assertCHPEqual' "Final value" 1001 final
However, there is a problem that is not revealed by the above test. We have implemented a shared mutable variable, but we’ve also inherited the disadvantages! If you change the test so that it has three such processes in parallel, all accessing the shared variable (with shared channels), reading then writing back (adding one), a problem occurs:
succer' :: Int -> Shared Chanin Int -> Shared Chanout Int -> CHP ()
succer' n input output = replicateM_ n $ do
  x <- claim input readChannel
  claim output (flip writeChannel (succ x))

testRW2 :: CHP CHPTestResult
testRW2 = do
  toV <- anyToOneChannel
  fromV <- oneToAnyChannel
  sharedRW 1 (reader toV) (writer fromV) `withCheck` do
    let p = succer' 1000 (reader fromV) (writer toV)
    liftCHP $ p <||> p <||> p
    final <- liftCHP $ claim (reader fromV) readChannel
    liftCHP $ claim (reader fromV) poison >> claim (writer toV) poison
    assertCHPEqual' "Final value" 3001 final
The failing output is:
### Failure in: 1:RW2 Final value; expected: 3001; actual: 1659
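The lost updates can be reproduced deterministically in a small pure model. This is a sketch in plain Haskell, not CHP code: the Step type, the integer process ids and runRW are hypothetical names for illustration. A schedule lists the order in which each process’s read and write reach the variable process, and each write stores the successor of the value that process last read.

```haskell
-- A pure model of the read-then-write race in testRW2 (not CHP code).
-- Process ids and the Step type are hypothetical, for illustration only.
data Step
  = Read Int   -- process p reads the current value
  | Write Int  -- process p writes back succ of the value it last read

-- Run a schedule against a shared variable that starts at 'start'.
-- 'locals' records each read as (process, value); new entries are
-- prepended, so 'lookup' finds the most recent read by that process.
runRW :: Int -> [Step] -> Int
runRW start = go start []
  where
    go shared _ [] = shared
    go shared locals (Read p : rest) = go shared ((p, shared) : locals) rest
    go _ locals (Write p : rest) =
      case lookup p locals of
        Just x  -> go (succ x) locals rest
        Nothing -> error ("process " ++ show p ++ " wrote before reading")

-- Repeat one round of steps 1000 times (one round per succer' iteration).
rounds :: [Step] -> [Step]
rounds = concat . replicate 1000

main :: IO ()
main = do
  -- Each process finishes its read and write before the next one reads.
  print (runRW 1 [Read 1, Write 1, Read 2, Write 2])  -- 3
  -- Both processes read 1 before either writes: one increment is lost.
  print (runRW 1 [Read 1, Read 2, Write 1, Write 2])  -- 2
  -- Three processes taking strict turns: every increment survives.
  print (runRW 1 (rounds [Read 1, Write 1, Read 2, Write 2, Read 3, Write 3]))  -- 3001
  -- Three processes in lockstep: only one increment per round survives.
  print (runRW 1 (rounds [Read 1, Read 2, Read 3, Write 1, Write 2, Write 3]))  -- 1001
```

The actual value in a real run (1659 above) depends on whichever interleaving the scheduler happens to produce, so it may differ from run to run.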
The problem is that if two processes both read the variable and then both write back to it, only one increment survives, whichever order the writes happen in. This is just the sort of problem that can occur with atomic reads and writes (rather than mutexes) on shared variables in languages such as C. We can fix this type of update problem neatly by redefining our shared variable process to accept a modification function rather than a new value (this is Haskell, so sending a function down a channel is no problem). Each modification is then applied atomically with respect to any other modifications. The modification can be conditional on the previous value, or just const someNewValue. Here’s the revised process:
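import Control.DeepSeq (NFData, ($!!))

sharedVar :: NFData a => a -> Chanin (a -> a) -> Chanout a -> CHP ()
sharedVar start input output = sharedVar' start
  where
    -- Either accept a modification and apply it to the current value,
    -- or offer the current value on the output channel.
    sharedVar' x = ( (readChannel input >>= forceApplyTo x)
                     <-> (writeChannel output x >> return x) )
                   >>= sharedVar'
    -- Fully evaluate the new value before it is stored.
    forceApplyTo x f = return $!! f x

Note that because this process applies lots of consecutive functions to the value without ever examining it, it could lead to a space leak: a long chain of unevaluated applications. To prevent this, each new value is fully evaluated with $!! from Control.DeepSeq before it is passed to the next iteration. The forcing has to happen inside the CHP action, as it does here in the function bound with >>=; mapping a strict function over the result of readChannel with <$> would only build another thunk that nothing demands.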
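Ignoring the output side, the update half of sharedVar behaves like a strict left fold over the modification functions it receives. The sketch below models that in plain Haskell rather than CHP; applyMods is a hypothetical name, and the code assumes the deepseq package, which ships with GHC.

```haskell
-- A pure model of the update side of sharedVar (not CHP code).
-- applyMods is a hypothetical name used only for illustration.
import Control.DeepSeq (NFData, force)
import Data.List (foldl')

-- Apply each modification in arrival order. foldl' on its own only
-- evaluates the accumulator to weak head normal form; 'force' evaluates
-- each new value fully, so no chain of pending applications builds up.
applyMods :: NFData a => a -> [a -> a] -> a
applyMods = foldl' (\x f -> force (f x))

main :: IO ()
main = do
  -- An increment, an unconditional overwrite (const 10) and a doubling.
  print (applyMods (1 :: Int) [succ, const 10, (* 2)])  -- 20
  -- Three processes each sending succ 1000 times: nothing is lost.
  print (applyMods (1 :: Int) (replicate 3000 succ))  -- 3001
  -- A pair-valued variable: without 'force', the first component would
  -- grow into a long thunk even though foldl' evaluates the pair itself.
  print (applyMods (0 :: Int, 0 :: Int)
                   (replicate 1000000 (\(a, b) -> (a + 1, b))))  -- (1000000,0)
```

Because each modification is applied to whatever value the variable holds when it arrives, the arrival order can change the intermediate values, but no modification is ever lost.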
Our example uses the pattern of waiting to either read a new value or to engage in an output of the old value — a pattern that is likely to crop up repeatedly in future posts. It is something that is not directly supported by MVars: you cannot wait for a choice of several actions involving MVars. Nor is this pattern easily supported by TVars: having both sides in a synchronisation wait for a choice of synchronisations cannot be done without ending up with something like the STM algorithms that already underpin CHP.
If you need anything more complicated than the shared variable shown here, such as updating two values atomically, you should look to STM and TVars after all. But for simple cases where you need a single shared variable, CHP can fulfil that need.