Functional concurrency

ayush mittal
Oct 23, 2019

This is a write-up about how to design applications that maintain shared state and run concurrent code while still following the principles of functional programming. But first, let's understand why this is easier said than done.

The trouble is that essentially all the interesting applications of concurrency involve the deliberate and controlled mutation of shared state, such as screen real estate, the file system, or the internal data structures of the program. The right solution, therefore, is to provide mechanisms which allow the safe mutation of shared state.

— Peyton Jones, Andrew Gordon, and Sigbjorn Finne in Concurrent Haskell

The problem is that concurrent applications are, by their very nature, stateful. We have to maintain variables that are shared across threads and mutated from various places. How do we build such applications in a functional language such as Scala?

Suppose we need to design an application that keeps a running count of the errors a function has raised so far, and that this function can be called concurrently from many places. Let's start by defining a function that fails at random:

```scala
def randomlyFailing(id: Int): String = {
  if (Math.random() > 0.5)
    throw new RuntimeException("error")

  id.toString
}
```

Here is a plain, simple first attempt at maintaining an error count for such a function. We write a wrapper around the function, and the wrapper keeps the count.

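```scala
var errorCount = 0

def randomlyFailing(id: Int): String = {
  if (Math.random() > 0.5)
    throw new RuntimeException("error")

  id.toString
}

// Count the failure, then rethrow it to the caller.
def wrapper(id: Int): String = {
  try {
    randomlyFailing(id)
  } catch {
    case t: Throwable =>
      errorCount = errorCount + 1 // not atomic: concurrent callers can lose updates
      throw t
  }
}
```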

We expose only the wrapper function to the outside world. The wrapper increments a mutable variable whenever an error occurs. This solution is not pure, and it does not handle concurrency either. `errorCount = errorCount + 1` is a read, an addition and a write. When several threads call the wrapper at the same time, two of them can read the same old value, and one of the increments is lost. Let's fix that first using AtomicReference.
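The lost-update problem is easy to reproduce. The sketch below is illustrative and not from the original post. `RaceDemo` is a hypothetical name, and it assumes a scala-cli script. Four threads increment an unsynchronized `var` and an `AtomicLong` side by side. The atomic counter always reaches the expected total, but the plain `var` often ends up lower. The threaded code sits in its own `object` so that worker threads never touch a script object that is still being initialized.

```scala
import java.util.concurrent.atomic.AtomicLong

object RaceDemo {
  // Four threads each increment both counters 100,000 times.
  // Returns (plain var result, AtomicLong result).
  def run(): (Long, Long) = {
    var plainCount  = 0L                 // like errorCount: unsynchronized
    val atomicCount = new AtomicLong(0L) // atomic read-modify-write
    val threads = (1 to 4).map { _ =>
      new Thread(() =>
        (1 to 100000).foreach { _ =>
          plainCount = plainCount + 1   // read, add, write: steps can interleave
          atomicCount.incrementAndGet() // a single atomic step
        }
      )
    }
    threads.foreach(_.start())
    threads.foreach(_.join())
    (plainCount, atomicCount.get())
  }
}

val (plain, atomic) = RaceDemo.run()
println(s"plain var: $plain, AtomicLong: $atomic, expected: 400000")
```

The exact value of `plain` varies from run to run. It can only fall short of 400000, never exceed it, because each lost update simply overwrites another thread's increment.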

```scala
import java.util.concurrent.atomic.AtomicReference

val atomicErrorCount = new AtomicReference[Long](0L)

def randomlyFailing(id: Int): String = {
  if (Math.random() > 0.5)
    throw new RuntimeException("told you")

  id.toString
}

def wrapper(id: Int): String = {
  try {
    randomlyFailing(id)
  } catch {
    case t: Throwable =>
      atomicErrorCount.updateAndGet(n => n + 1) // atomic read-modify-write
      throw t
  }
}
```

This version uses an AtomicReference to hold the shared count, so concurrent increments are no longer lost. It is still not functionally acceptable, though. atomicErrorCount is mutable by design, and an expression such as `atomicErrorCount.updateAndGet(n => n + 1)` is not referentially transparent: replacing it with its result changes what the program does.
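Here is a minimal illustration of that loss of referential transparency. It was written for this explanation and is not from the original post. The variable names are hypothetical. If the expression were referentially transparent, naming it once and reusing the name would behave the same as writing the expression out twice. With an `AtomicReference` the two versions give different results.

```scala
import java.util.concurrent.atomic.AtomicReference

// Version 1: evaluate the expression once, bind it to a name, reuse the name.
val ref1 = new AtomicReference[Long](0L)
val x = ref1.updateAndGet(n => n + 1)
val reused = (x, x)

// Version 2: substitute the expression for the name, as referential transparency would allow.
val ref2 = new AtomicReference[Long](0L)
val substituted = (ref2.updateAndGet(n => n + 1), ref2.updateAndGet(n => n + 1))

println(s"reused: $reused, substituted: $substituted") // reused: (1,1), substituted: (1,2)
```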

The recipe for making our solution functionally acceptable is simple. Creating, accessing and modifying mutable state must all be suspended in IO. So what is IO?

IO values are pure, immutable values and therefore preserve referential transparency, which makes them usable in functional programming. An IO is a data structure that describes a side-effecting computation without running it. The computation runs only when the IO is executed, for example with `unsafeRunSync()`.
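To make "a data structure that describes a computation" concrete, here is a toy sketch. `MiniIO` is a hypothetical type written only for this illustration. It is *not* how `cats.effect.IO` is implemented, and it is not stack-safe. Building `MiniIO` values runs nothing. The side effects happen only when `unsafeRun()` interprets the description, and reusing the same description twice is safe because it is just a value.

```scala
// A toy IO (hypothetical, not cats.effect.IO): a description, interpreted later.
sealed trait MiniIO[A] {
  def unsafeRun(): A
  def flatMap[B](f: A => MiniIO[B]): MiniIO[B] = FlatMapped(this, f)
  def map[B](f: A => B): MiniIO[B] = flatMap(a => Pure(f(a)))
}
final case class Pure[A](value: A) extends MiniIO[A] {
  def unsafeRun(): A = value
}
final case class Delay[A](thunk: () => A) extends MiniIO[A] {
  def unsafeRun(): A = thunk() // the side effect happens here, not at construction
}
final case class FlatMapped[A, B](source: MiniIO[A], f: A => MiniIO[B]) extends MiniIO[B] {
  def unsafeRun(): B = f(source.unsafeRun()).unsafeRun()
}

var counter = 0
// Describes "increment counter, then return it"; nothing runs yet.
val increment: MiniIO[Int] = Delay(() => { counter += 1; counter })
// The same description is used twice; each use runs the effect once.
val twice: MiniIO[(Int, Int)] = increment.flatMap(a => increment.map(b => (a, b)))

val before = counter           // 0: building the descriptions ran nothing
val result = twice.unsafeRun() // (1, 2): the effects run only now
```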

Many Scala libraries define an IO type. We will use cats.effect.IO. We need to ensure that all access to mutable state happens inside this pure, referentially transparent data structure. With that in mind, we can come up with a basic algebra for our requirements:

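```scala
import java.util.concurrent.atomic.AtomicInteger
import scala.annotation.tailrec
import cats.effect.IO // assumes cats-effect 3

// AtomicInteger.compareAndSet compares by value. An AtomicReference[Int]
// compares boxed Integers by identity, so once the count leaves the default
// Integer cache range (above 127), the re-boxed old value never matches
// and the loop below would spin forever.
final case class IORef(underlying: AtomicInteger) {

  // Atomically applies f to the current value and returns f's second component.
  def modify[A](f: Int => (Int, A)): IO[A] = IO {
    @tailrec
    def loop(): A = {
      val oldValue = underlying.get()
      val (newValue, result) = f(oldValue)
      // Retry if another thread changed the value since we read it.
      if (underlying.compareAndSet(oldValue, newValue)) result
      else loop()
    }
    loop()
  }

  def get: IO[Int] = IO(underlying.get())
}

object IORef {
  // Allocating a new reference is a side effect, so it is suspended in IO too.
  def create(initialValue: Int): IO[IORef] =
    IO(IORef(new AtomicInteger(initialValue)))
}
```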

The following principles are key to this algebra:

  1. Every access to the atomic reference is wrapped in IO.
  2. modify loops until it manages to set the new value. It reads the current value, computes the new one with f, and writes it with compareAndSet only if no other thread has changed the reference in the meantime. Otherwise it retries with the fresh value. The whole loop is wrapped in IO.
  3. create returns an IO[IORef] rather than an IORef. Allocating a mutable reference is itself a side effect, because two calls must produce two distinct references. Suspending it in IO keeps the code that creates the IORef pure and safe as well.
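The compare-and-set loop from principle 2 can be exercised on its own, without IO. This is a sketch, not from the original post: `CasDemo` and `casModify` are hypothetical names, and it assumes a scala-cli script. Four threads push 40,000 increments through the loop. The final value is always exact, but `f` may be evaluated more than 40,000 times, because every failed compare-and-set runs `f` again. That is why the function passed to such a loop should be pure.

```scala
import java.util.concurrent.atomic.AtomicInteger
import scala.annotation.tailrec

object CasDemo {
  // The same compare-and-set loop as IORef.modify, minus the IO wrapper.
  def casModify[A](ref: AtomicInteger)(f: Int => (Int, A)): A = {
    @tailrec
    def loop(): A = {
      val oldValue = ref.get()
      val (newValue, result) = f(oldValue)
      // Writes only if no other thread changed ref since we read oldValue;
      // otherwise re-reads and runs f again.
      if (ref.compareAndSet(oldValue, newValue)) result else loop()
    }
    loop()
  }

  // Four threads each apply 10,000 increments through casModify.
  // Returns (final value, number of times f was evaluated).
  def run(): (Int, Int) = {
    val counter  = new AtomicInteger(0)
    val attempts = new AtomicInteger(0)
    val threads = (1 to 4).map { _ =>
      new Thread(() =>
        (1 to 10000).foreach { _ =>
          casModify(counter) { n => attempts.incrementAndGet(); (n + 1, ()) }
        }
      )
    }
    threads.foreach(_.start())
    threads.foreach(_.join())
    (counter.get(), attempts.get())
  }
}

val (finalValue, evaluations) = CasDemo.run()
println(s"final value: $finalValue, f evaluated $evaluations times")
```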

We can now use our new type to wrap randomlyFailing in a new function that is pure, safe, referentially transparent and supports concurrency.

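```scala
import scala.util.{Failure, Success, Try}
import cats.effect.IO

def wrapper(id: Int, ref: IORef): IO[String] =
  ref.modify { currentErrors =>
    // Caveat: if modify's compare-and-set retries, this function runs again,
    // and so does randomlyFailing. Keep side effects out of f in real code.
    Try(randomlyFailing(id)) match {
      case Success(value) => (currentErrors, IO(value))
      // On failure: bump the count and fall back to the error message.
      case Failure(ex)    => (currentErrors + 1, IO(ex.getMessage))
    }
  }.flatten
```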

The new wrapper performs all its operations through IORef's modify. The business logic is almost unchanged: increment the error count when the underlying function fails. What changes is the return value. Here modify's result type A is IO[String]. On success it holds the function's value, and on failure it holds the exception message as a fallback, so the remaining calls can continue instead of the whole program failing. The resulting IO[IO[String]] is then flattened into an IO[String].

Now we can use the wrapper to call our function and count how many errors were observed across all calls:

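```scala
import cats.effect.IO
import cats.syntax.all._ // provides parSequence (assumes cats-effect 3's Parallel[IO])

val program: IO[(List[String], Int)] = for {
  ref        <- IORef.create(0)
  // One IO per id, all run in parallel against the same IORef.
  result     <- List.range(0, 10).map(id => wrapper(id, ref)).parSequence
  errorCount <- ref.get
} yield (result, errorCount)
```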

The program creates an IORef, calls the wrapper for a range of ids in parallel, and finally returns the list of results together with the error count.

```scala
// cats-effect 3 needs an implicit IORuntime to run an IO
import cats.effect.unsafe.implicits.global

program.unsafeRunSync()
//(List(error, error, 2, 3, 4, 5, 6, 7, 8, error),3)
program.unsafeRunSync()
//(List(0, error, error, 3, 4, 5, error, error, error, 9),5)
program.unsafeRunSync()
//(List(0, error, error, 3, 4, 5, error, error, error, error),6)
```

As the output shows, the program behaves as expected. In each run, the error count matches the number of "error" entries in the result list, so no concurrent update to the shared error count was lost. The same principles apply to any other use case that requires safe concurrent access to, and modification of, shared state.

We created the IORef class from scratch to get a purely functional, concurrency-safe mutable reference, but only to explain the concepts. In the real world, libraries already provide types like IORef. In cats-effect 2, the `cats.effect.concurrent` package offers Ref, MVar, Deferred and others, all designed to solve concurrency problems with pure functional constructs.
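For comparison, here is a sketch of the same program using the library's `Ref` instead of the hand-written IORef. It makes several assumptions: cats-effect 3, where `Ref` is available as `cats.effect.Ref` and MVar no longer exists; a scala-cli script; and the hypothetical object name `RefDemo`. It also makes one deliberate design change. The call to `randomlyFailing` runs exactly once, outside the state update, and only the pure increment `_ + 1` goes through the Ref. A retried update therefore never re-runs the effect.

```scala
//> using dep "org.typelevel::cats-effect:3.5.0"
import cats.effect.{IO, Ref}
import cats.effect.unsafe.implicits.global
import cats.syntax.all._

object RefDemo {
  def randomlyFailing(id: Int): String = {
    if (Math.random() > 0.5) throw new RuntimeException("error")
    id.toString
  }

  // Run the call once; touch the Ref only if it failed.
  // The update function (_ + 1) is pure, so internal retries are harmless.
  def wrapper(id: Int, errors: Ref[IO, Int]): IO[String] =
    IO(randomlyFailing(id)).handleErrorWith { ex =>
      errors.update(_ + 1).as(ex.getMessage)
    }

  val program: IO[(List[String], Int)] = for {
    errors     <- Ref.of[IO, Int](0)
    results    <- List.range(0, 10).parTraverse(id => wrapper(id, errors))
    errorCount <- errors.get
  } yield (results, errorCount)
}

val (results, errorCount) = RefDemo.program.unsafeRunSync()
println((results, errorCount))
```

As in the original output, the exact results vary from run to run. The error count should always equal the number of "error" entries in the list.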

Hopefully this post has explained the problem: why shared state and concurrency are difficult in functional languages. The solution, as so often in the functional paradigm, is to create data types that capture the problem. For our use case, the IORef data type was the solution.

This write-up was inspired by Fabio's upperbound library. If you are interested in going further, I would encourage you to listen to his talks. Also check out this post by Oleg Pyzhcov if you would like to learn more. Happy reading!
