Saturday, August 6, 2016

Functional programming at CognitiveScale

One of the things that attracted me to CognitiveScale (where I currently work as a senior platform engineer) was the opportunity to expand my skill-set in a variety of ways relative to my previous background.

Of these, the one I want to talk about here is the switch from an imperative to a functional programming paradigm.  All my previous work had been in imperative languages, from C through C++ and C#, with side-trips into Java along the way.  The core services at CognitiveScale are developed using Scala as our language of choice, and with a very functional style.

Scala is something of a hybrid, which bridges the OO and functional worlds, and probably the main reasons it was chosen were:

  • Flexibility in style - it can be used in a variety of ways, from simply 'a better Java', through to a much more Haskell-like pure functional style.  This allows for quick on-boarding of Java programmers (or those with similar language backgrounds, such as C#) while also supporting much more abstract styles, with the benefits accrued from the functional world (as I'll discuss later).  The choice can be made by individual teams, or components as appropriate to them.
  • Brevity and expressiveness - very little boilerplate is required, and a lot can be achieved in a small number of lines of code
  • Good community - Scala is a sufficiently popular language to have a good online community that seems to be generally friendly and open.
  • JVM ecosystem - because Scala is a JVM language, with strong Java inter-operability, there is a vast resource of open-source libraries available for almost any purpose you can imagine.  Notable open-source building blocks in use at CognitiveScale include:
    • Apache Kafka for asynchronous queuing
    • Argonaut for JSON serialization
    • Zookeeper as a foundation for distributed cache coordination
    • HTTP4S as the HTTP layer for REST services and clients
    • ScalaZ, of which more later, which is a categorical functional library for Scala
  • Strong type system.  This is perhaps a personal choice and there is room to differ on the pros and cons of strong vs weak typing in a programming language.  However, the group of engineers we have at CognitiveScale would all fall somewhere on the stronger-typing end of the spectrum (some more extremely so than others!)
Within the core development team at CognitiveScale, we hew very much to the functional end of the style spectrum, strongly influenced by trends that largely began in the Haskell community.

For me, as a new developer at CognitiveScale (almost a year ago now), this was a huge paradigm shift, and it has an admittedly steep learning curve.  For someone with an imperative background, the idea of writing complex code that eschews mutable data structures ([very] loosely speaking, no modifiable variables) and doesn't permit side-effecting functions/methods (the ONLY thing a function does is return a value) is initially mind-boggling!

Fortunately the general atmosphere at CognitiveScale is one that promotes openness and continued learning, and one manifestation of this is a number of ad-hoc skill-sharing groups on various subjects, with semi-regular presentation and teaching sessions.  One of these teaches functional Scala (another interesting one is a series of semi-regular machine-learning talks from some of our machine-learning experts).  In the Scala case, we are fortunate to have on the staff a senior engineer with deep experience of the pure functional style, from both the Scala and Haskell worlds, who is also motivated to promote functional techniques and help develop deeper skills in the wider engineering organization.

My experience has been that it takes a while to get up to speed on such a different paradigm, but that once a certain threshold is reached (which was probably about 3 months in for me, though I'm still learning now), significant gains become apparent.  These manifest in a few ways:
  • Often pieces of code seem to 'write themselves' - so much compile-time checking arises from the combination of strong typing, and powerful parametrically typed building-block abstractions, that there can be very limited choice in the details of implementation once you know what the types involved are, and what it's going to be assembled from.  This mostly occurs at the level of individual functions, but the overall experience is almost like 'discovering' the solution as you follow a chain of compositional reasoning, rather than having to invent it from a blank slate.
  • Things frequently 'just work' - the experience of (at least unit) tests that just sail through the first time you run them becomes much more common than I have known it to be in previous environments
  • Reasoning becomes local rather than involving factors of more global context.  This keeps the 'cognitive frame' small and easy to hold in your head.  Code becomes much more understandable in smaller pieces.  This also promotes reuse through composition.
  • Concurrency is much less error-prone and easier to reason about, because coupling via side effects is not present
  • Productivity and velocity increase - once you learn how to leverage higher-level abstractions, and to reason about the constraints and laws they obey, complex operations turn into relatively simple compositions of pre-existing building blocks.  This is enabled by the functional nature of the language, but amplified considerably by the adoption of a pure-functional style and the categorical framework that ScalaZ provides us.
Overall, programming and debugging start to feel more like algebraic proof (which suits me since I'm a mathematician by educational background), and rigorous reasoning about the code is possible, at least locally.  Getting up to speed on this approach to developing software is a considerable investment, but one that has a big payoff.  Working for a company that is prepared to make investments in its employees like this, and give them the space for continued development, is something that makes going in to work each day something to look forward to.

I would therefore recommend that any serious software development organization that values its employees as long-term investments explore this approach to creating software.  Try it first on a relatively isolated project, with a small group of developers who are keen to explore new techniques, and let them promote the results more widely as they learn to leverage the benefits.

It's paying off in big ways for CognitiveScale, and for those of us that work there as individuals.
I'm sure it can work in many other organizations also if they are prepared to try.

Sunday, November 29, 2015

Parallelizing independent effectual sub-computations

Background


In production code we abstract effectual computations that may fail via Scalaz Tasks and disjunctions.  For example:

type TestTask[A] = Task[\/[TestFault,A]]

This represents a task (assumed effectual) which can result in a fault of a known type (TestFault - usually this would be a sealed trait with a bunch of case classes as fault cases), or a result of parameterized type [A].
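As a rough illustration of that shape, here is a minimal sketch that uses only the standard library. Either stands in for the scalaz disjunction, and the fault cases (ServiceUnavailable, InvalidInput) and the parsePositive helper are hypothetical names invented for this example rather than anything from the real code base.

```scala
object FaultSketch {
  // Hypothetical fault hierarchy; the real TestFault cases are not shown in this post.
  sealed trait TestFault
  final case class ServiceUnavailable(service: String) extends TestFault
  final case class InvalidInput(reason: String) extends TestFault

  // Standard-library Either as a stand-in for scalaz's \/ (left = fault, right = result).
  type Result[A] = Either[TestFault, A]

  // A computation that either produces a value or reports a typed fault.
  def parsePositive(s: String): Result[Int] =
    try {
      val n = s.trim.toInt
      if (n > 0) Right(n) else Left(InvalidInput(s"not positive: $n"))
    } catch {
      case _: NumberFormatException => Left(InvalidInput(s"not a number: $s"))
    }

  // Because the trait is sealed, the compiler can check that this match covers every fault.
  def describe(f: TestFault): String = f match {
    case ServiceUnavailable(s) => s"service $s is unavailable"
    case InvalidInput(r)       => s"invalid input: $r"
  }
}
```

Sealing the trait is what makes the set of fault cases "known": callers can handle every case explicitly rather than catching arbitrary exceptions.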

To aid in composition we typically use EitherT, so that we can deal with fail-fast semantics on the left side of the disjunction.  So the basic type infrastructure would look something like this:

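import scala.language.implicitConversions
import scalaz.{EitherT, \/}
import scalaz.concurrent.Task

object TestProcesses {
  type TestTask[A] = Task[\/[TestFault,A]]
  type TestProcess[A] = EitherT[Task,TestFault,A]

  // Lift a lazily evaluated value into a successful process
  def toProcess[A](a: => A): TestProcess[A] = EitherT.right[Task,TestFault,A](Task.delay(a))
  // Build a process that fails with the given fault
  def fault[A](f: TestFault): TestProcess[A] = EitherT.left[Task,TestFault,A](Task.delay(f))
  // Allow a raw TestTask to be used wherever a TestProcess is expected
  implicit def liftToProcess[A](t: TestTask[A]): TestProcess[A] = EitherT[Task,TestFault,A](t)
}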

which lets us compose things monadically while still staying in a referentially transparent framework.  For example, suppose we have two tasks that compute integers effectually (say by accepting input, or querying some service over the network) and we want to compose them into a single task that results in their product:

def product(p1: TestProcess[Int], p2: TestProcess[Int]): TestProcess[Int] = {
  for {
    val1 <- p1
    val2 <- p2
  } yield val1*val2
}

The important thing here is that everything is referentially transparent (no effects are run by this call - it just produces a TestProcess that performs the entire computation when it is run later), but the idioms used to deal with the values are the natural ones for the base types (Int in this case).
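To make the "nothing runs until later" point concrete, here is a minimal sketch with a hand-rolled stand-in for TestProcess. Proc, effect, failing and the log are all hypothetical names for this example, the fault type is simplified to String, and the mutable log exists only so that we can observe when effects actually happen.

```scala
object LazyProcessSketch {
  // Stand-in for TestProcess: a description of a computation that may fail.
  // Nothing happens until run() is called.
  final case class Proc[A](run: () => Either[String, A]) {
    def flatMap[B](f: A => Proc[B]): Proc[B] =
      Proc[B](() => run() match {
        case Right(a) => f(a).run()
        case Left(e)  => Left(e) // fail fast: later steps are never run
      })
    def map[B](f: A => B): Proc[B] =
      flatMap[B](a => Proc[B](() => Right(f(a))))
  }

  // Records effects in the order they run (only here to make them observable).
  var log: Vector[String] = Vector.empty

  def effect(name: String, value: Int): Proc[Int] =
    Proc(() => { log = log :+ name; Right(value) })

  def failing(name: String): Proc[Int] =
    Proc(() => { log = log :+ name; Left(s"$name failed") })

  // Same shape as the product function above, over the stand-in type.
  def product(p1: Proc[Int], p2: Proc[Int]): Proc[Int] =
    for {
      val1 <- p1
      val2 <- p2
    } yield val1 * val2
}
```

Calling product only builds a new description; the log stays empty until run() is invoked on the result, and a failure in the first step means the second is never executed.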

Frequently we come across cases where we have some unbounded collection of sub-computations, which are mutually independent (canonically a list, but really any traversable).  For example, suppose we have some service that given a sentence can return a numeric 'style' rating of some sort, and we want to use this to find the average rating of a document (considered as a list of sentences):

trait RatingService
{
  def rateSentence(s: Sentence):  TestProcess[Double]

  def documentStyleRating(content: List[Sentence]):
    TestProcess[Double] = {
    val subTasks = content map(rateSentence(_))

    // Average the individual ratings (NaN for an empty document)
    subTasks.sequenceU map(ratings => ratings.sum / ratings.size)
  }
}

This works fine, but the sequenceU here, which converts List[TestProcess[Double]] to TestProcess[List[Double]], will execute the subtasks sequentially when run.  If those subtasks are network-bound (or possibly IO-bound), the result will be poor utilization and a slow overall computation.
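The "inside-out" conversion that sequenceU performs is perhaps easier to see on plain, already-computed values. The following is a minimal sketch using only the standard library, with Either standing in for the disjunction and String as a placeholder fault type; it says nothing about how scalaz implements the operation, and the choice to treat an empty list as a fault is an assumption made for the example.

```scala
object SequenceSketch {
  // Placeholder fault type for this sketch.
  type Fault = String

  // Turn a list of results into a result of a list, keeping the first fault encountered.
  def sequence[A](xs: List[Either[Fault, A]]): Either[Fault, List[A]] = {
    val folded = xs.foldLeft(Right(Nil): Either[Fault, List[A]]) {
      case (Left(e), _)           => Left(e) // already failed: keep the first fault
      case (Right(_), Left(e))    => Left(e) // first fault seen
      case (Right(acc), Right(a)) => Right(a :: acc)
    }
    folded match {
      case Right(reversed) => Right(reversed.reverse)
      case Left(e)         => Left(e)
    }
  }

  // Average a list of ratings, failing if any rating failed or if there are none.
  def average(ratings: List[Either[Fault, Double]]): Either[Fault, Double] =
    sequence(ratings) match {
      case Right(rs) if rs.nonEmpty => Right(rs.sum / rs.size)
      case Right(_)                 => Left("no ratings to average")
      case Left(e)                  => Left(e)
    }
}
```

The fold visits the elements strictly one after another, which is harmless for plain values but is exactly the behaviour that becomes a bottleneck once each element is a slow effect.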

Options and test setup

It would be better (at least in many contexts) if we could modify things to have the results gathered concurrently, so what are the options?

I spent a few hours exploring some possibilities (quite possibly missing some obvious ones), so before I dive into the details, here's a quick overview of a simple test setup to check that the implementations seem to work.
For the purposes of testing, I am just going to consider computations that result in integers (or fail), which we then want to sum (over some monoid), returning a failure if any sub-computation fails, else the summed value.  All implementations will therefore implement this simple test trait:

trait Combinator {
  protected  def sum[A](values: List[A])(implicit m: Monoid[A]) = values.foldLeft(m.zero)((x,y)=> m.append(x,y))
  def sum[A](subTasks: List[TestProcess[A]])(implicit m: Monoid[A]): TestProcess[A]
}
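For readers who have not met monoids before, the protected sum helper can be sketched without scalaz. The Monoid trait below is a minimal stand-in written for this example (the scalaz version takes the second argument of append by name, which this one does not), and the two instances are just illustrative.

```scala
object MonoidSumSketch {
  // Minimal stand-in for scalaz.Monoid: an identity element plus an associative combine.
  trait Monoid[A] {
    def zero: A
    def append(x: A, y: A): A
  }

  object Monoid {
    implicit val intAddition: Monoid[Int] = new Monoid[Int] {
      def zero: Int = 0
      def append(x: Int, y: Int): Int = x + y
    }
    implicit val stringConcat: Monoid[String] = new Monoid[String] {
      def zero: String = ""
      def append(x: String, y: String): String = x + y
    }
  }

  // Same shape as the helper in Combinator: fold from zero using append.
  def sum[A](values: List[A])(implicit m: Monoid[A]): A =
    values.foldLeft(m.zero)((x, y) => m.append(x, y))
}
```

Abstracting over the monoid means the same combinator can add integers, concatenate strings, or merge any other type with a sensible "empty" value, without the parallelization code knowing which.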

The code to test this just uses Thread sleeps to simulate some effectual processing that takes appreciable time, and in some test cases a subset of the sub-tasks will fail.

The code of the tests is a bit longer than I want to insert here, so here's a gist instead: Gist of test code

Having set this up we can consider options on how to parallelize.  The key property we want is that it be transparent to the caller, so everything needs to be tidily wrapped inside our TestProcess abstraction, and no effects should run until the final returned composition is itself run by some top-level caller.

The first option is just to rely directly on Task primitives - fork() and gatherUnordered() are enough:

object SimpleForkCombinator extends Combinator {
  override def sum[A](subTasks: List[TestProcess[A]])(implicit m: Monoid[A]): TestProcess[A] = {
    def conditionalSum(results: List[\/[TestFault,A]]): \/[TestFault,A] = {
      results.sequenceU map(sum(_)(m))
    }
    def runForked(subTask: TestProcess[A]): TestProcess[A] = {
      EitherT[Task,TestFault,A](Task.fork(subTask.run))
    }
    val forkedSubTasks = subTasks map (runForked(_))

    EitherT[Task,TestFault,A](Task.gatherUnordered(forkedSubTasks map(_.run)) map conditionalSum)
  }
}

There's quite a lot of mostly-not-generic boilerplate going on here, which is not ideal.  Also browsing around the internet suggests that Task.gatherUnordered() might have some scaling issues.
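Stripped of the scalaz types, the fork-then-gather idea looks roughly like the sketch below. It uses standard-library Futures as a stand-in for Task, which is an assumption of this example rather than what the combinator above does, and SubTask, subTask, gatherSum and the started counter are hypothetical names. Since a Future starts running as soon as it is created, each sub-task is kept as a function so that nothing runs until the combined computation is applied.

```scala
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object ForkGatherSketch {
  type Fault = String
  // A lazily described sub-task: nothing runs until the function is applied.
  type SubTask[A] = () => Either[Fault, A]

  // Counts how many sub-tasks have actually been executed (to observe laziness).
  val started = new AtomicInteger(0)

  def subTask(value: Int, fail: Boolean = false): SubTask[Int] = () => {
    started.incrementAndGet()
    Thread.sleep(20) // simulate a slow, non-CPU-bound call
    if (fail) Left(s"sub-task $value failed") else Right(value)
  }

  // Returns a description of the whole computation; sub-tasks start only when it is applied.
  def gatherSum(subTasks: List[SubTask[Int]])(implicit ec: ExecutionContext): SubTask[Int] = () => {
    // "Fork": submit every sub-task to the executor before waiting on any of them.
    val running: List[Future[Either[Fault, Int]]] = subTasks.map(t => Future(t()))
    // "Gather": wait for all of them (the one-minute timeout is an arbitrary choice).
    val results = Await.result(Future.sequence(running), 1.minute)
    // Combine: the first fault in list order wins, otherwise add up the values.
    results.foldLeft(Right(0): Either[Fault, Int]) {
      case (Left(e), _)           => Left(e)
      case (Right(_), Left(e))    => Left(e)
      case (Right(acc), Right(a)) => Right(acc + a)
    }
  }
}
```

Note that this sketch waits for every sub-task to finish before reporting a fault, so it is fail-fast only in how it combines results, not in how early it returns.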

How about using scalaz.Nondeterminism?  This doesn't help with boilerplate, but it's a valid option, and maybe it has better performance characteristics (TBD):

object NondeterminismCombinator extends Combinator {
  override def sum[A](subTasks: List[TestProcess[A]])(implicit m: Monoid[A]): TestProcess[A] = {
    def conditionalSum(results: List[\/[TestFault,A]]): \/[TestFault,A] = {
      results.sequenceU map(sum(_)(m))
    }
    def runForked(subTask: TestProcess[A]): TestProcess[A] = {
      EitherT[Task,TestFault,A](Task.fork(subTask.run))
    }
    val forkedSubTasks = subTasks map (runForked(_))
    EitherT[Task,TestFault,A](Nondeterminism[Task].gather(forkedSubTasks map(_.run)) map conditionalSum)
  }
}

How can we attack the boilerplate?  The original (sequential) implementation just involved a call to sequenceU, which is implemented on the Traverse trait, which gets imposed on List by scalaz implicit magic.  We should be able to do the same thing, and produce an instance of Applicative which does parallel composition for us.  This is actually slightly more code than the above boilerplate, but now it's totally generic (well, up to the fault type, which should be convertible to a type parameter with a bit more work), so it means we have a parallelized version of sequence() that can be used with ANY sub-computations that fit our TestProcess abstraction.  Here's the Applicative (based on one for raw Tasks by Paul Chiusano, which may be found here) and a little bit of implicit infrastructure to make it easier to use:

object ParallelApplicative {
  implicit val T = new Applicative[TestProcess] {
    def point[A](a: => A) = toProcess(a)
    def ap[A,B](a: => TestProcess[A])(f: => TestProcess[A => B]): TestProcess[B] = apply2(f,a)(_(_))
    override def apply2[A,B,C](a: => TestProcess[A], b: => TestProcess[B])(f: (A,B) => C): TestProcess[C] = {
      def disjunctf(da: \/[TestFault,A], db: \/[TestFault,B]): \/[TestFault,C] = {
        for {
          a <- da
          b <- db
        } yield f(a,b)
      }

      liftToProcess(Nondeterminism[Task].mapBoth(Task.fork(a.run),Task.fork(b.run))(disjunctf))
    }
  }

  case class ParallelApplicativeOps[A](l: List[TestProcess[A]]) {
    def sequenceP = T.sequence(l)
  }

  implicit def toParallelApplicativeOps[A](l: List[TestProcess[A]]): ParallelApplicativeOps[A] = ParallelApplicativeOps(l)
}

Given the above (which is generic), all our specific use needs is:

object ParallelApplicativeCombinator extends Combinator{
  override def sum[A](subTasks: List[TestProcess[A]])(implicit m: Monoid[A]): TestProcess[A] = {
    subTasks.sequenceP map { l => sum(l)(m) }
  }
}
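The reason this approach is generic is that sequence can be derived from nothing more than point and a two-argument combine. Here is a minimal sketch of that derivation using standard-library Futures in place of Task; Par, delay, map2 and runSync are names made up for the example, the fault type is simplified to String, and this is not how scalaz implements Applicative.sequence.

```scala
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object ParallelMap2Sketch {
  type Fault = String
  // A lazily described, possibly failing, asynchronous computation.
  type Par[A] = () => Future[Either[Fault, A]]

  def point[A](a: => A): Par[A] = () => Future.successful(Right(a))

  def delay[A](a: => Either[Fault, A])(implicit ec: ExecutionContext): Par[A] =
    () => Future(a)

  // The one place concurrency lives: start both sides, then combine their results.
  def map2[A, B, C](pa: Par[A], pb: Par[B])(f: (A, B) => C)(implicit ec: ExecutionContext): Par[C] =
    () => {
      val fa = pa() // both futures are started here...
      val fb = pb() // ...before either result is inspected
      fa.zip(fb).map[Either[Fault, C]] {
        case (Right(a), Right(b)) => Right(f(a, b))
        case (Left(e), _)         => Left(e)
        case (_, Left(e))         => Left(e)
      }
    }

  // sequence needs only point and map2, so it works for any element type.
  def sequence[A](ps: List[Par[A]])(implicit ec: ExecutionContext): Par[List[A]] =
    ps.foldRight(point(List.empty[A]))((p, acc) => map2(p, acc)(_ :: _))

  // Run a description and block for its result (the timeout is an arbitrary choice).
  def runSync[A](p: Par[A]): Either[Fault, A] = Await.result(p(), 1.minute)
}
```

One thing this naive version makes visible: the fold builds a chain of map2 calls nested one level per list element, so running it recurses to a depth equal to the list length and the sketch is not stack-safe for very long lists.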

Ok, but what about Akka?  I've never used Akka, but this seems like a good time to start!  I'm probably missing all sorts of tricks, but I wound up with a relatively large amount of code in this case (and some difficulty in handling the type parameterization).  This is a slightly unfair use-case for Akka however, as we want everything hidden away inside our TestProcess abstraction, rather than using Akka throughout our application for all effectual processing (so maybe an unnatural hybrid):

case class AkkaCombinator(numWorkers: Int) extends Combinator {
  class AkkaPool[A] {

    sealed trait CombinatorMessage
    case class Run(subTasks: List[TestProcess[A]]) extends CombinatorMessage
    case class Work(task: TestTask[A]) extends CombinatorMessage
    case class WorkResult(value: \/[TestFault, A]) extends CombinatorMessage
    case class Result(value: \/[TestFault, List[A]]) extends CombinatorMessage

    class Worker extends Actor {
      def receive = {
        case Work(task) => sender ! WorkResult(task.run)
      }
    }

    class Master extends Actor {
      val routees = for(n <- 1 to numWorkers) yield context.actorOf(Props(new Worker()), s"worker.$n")
      val workerRouter = context.actorOf(
        Props.empty.withRouter(RoundRobinRouter(routees)), name = "workerRouter")

      var results = mutable.MutableList[A]()
      var numOutstandingSubTasks = 0
      var originator: ActorRef = null
      def receive = {
        case Run(processes) => {
          originator = sender
          for (process <- processes) {
            numOutstandingSubTasks += 1
            workerRouter ! Work(process.run)
          }
        }
        case WorkResult(result) => {
          def handleError(f: TestFault) = {
            originator ! Result(f.left)
            context.stop(self)
          }
          def handleResult(a: A) = {
            results += a
            numOutstandingSubTasks -= 1
            if (numOutstandingSubTasks == 0) {
              originator ! Result(results.toList.right)
              context.stop(self)
            }
          }

          result fold(handleError, handleResult)
        }
      }
    }

    def runUnordered(subTasks: List[TestProcess[A]]) = {
      val system = ActorSystem("CombinatorSystem")  //  TODO - need to disambiguate by instance?
      val master = system.actorOf(Props(new Master()), name="master")
      implicit val timeout = Timeout(50 seconds)
      val resultFuture = master ? Run(subTasks)

      val result = Await.result(resultFuture, timeout.duration).asInstanceOf[Result]
      liftToProcess(Task.now(result.value))
    }
  }

  override def sum[A](subTasks: List[TestProcess[A]])(implicit m: Monoid[A]): TestProcess[A] = {
    val workerPool = new AkkaPool[A]

    workerPool.runUnordered(subTasks) map { l => sum(l)(m) }
  }
}

Test Results

So how do they compare performance-wise in my simple test, and how do they scale?

Combinator           Num sub-tasks
                     10      50      100     500     1000    5000
Sequential           0.86    2.59    4.75    21.9    43.6    215
Simple fork          0.29    0.48    0.77    3.08    6.75    100
Nondeterminism       0.18    0.43    0.7     2.86    5.59    27
Applicative          0.18    0.42    0.68    2.88    5.59    DNF
Akka (50 workers)    0.43    0.49    0.63    1.73    2.72    11

*DNF - did not finish - above about 4000 this combinator seems to just hang!  This is suspiciously close to the default stack frame size, so I suspect scalaz is being forced into some trampolining that is interacting badly with something inside Applicative (this is somewhat disturbing, and perhaps bears further investigation!)

Conclusions/Observations

  • The test tasks here are all non-CPU-limited (they are just sleeping), so the composite is bound to scale well with the size of the underlying executor pool thrown at it.  This is likely the reason why Akka (with 50 workers) scales better than the Task-based alternatives (for which I just used the default executor, which likely allocated 8 threads given the hardware I was running on).
  • Nondeterminism scales significantly better than direct use of Task above 4000 or so sub-tasks - again I suspect a stack-frame size issue forcing badly optimized trampolining, but regardless this is consistent with reports on the Internet generally.
  • Akka suffers from overhead at small scales, though the particular use case may be a bit unfair
  • It might be interesting to look at a tree-decomposition of the sub-tasks rather than a linear one (divide the set into 2 repeatedly and run each half as a sub-task) as a possible means of fixing the Applicative.  This needs more investigation, because idiomatically the Applicative seems to give the most usable solution from the perspective of any code making use of it.
Full project code can be found here.
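The tree-decomposition idea from the last observation could look something like the following sketch. It again uses standard-library Futures as a stand-in for Task, omits fault handling to stay short, and uses invented names (Par, delay, sequenceBalanced); whether the same restructuring would cure the hang seen with the scalaz Applicative is untested.

```scala
import scala.concurrent.{ExecutionContext, Future}

object TreeSequenceSketch {
  // Lazily described asynchronous computation (no fault channel in this sketch).
  type Par[A] = () => Future[A]

  def point[A](a: A): Par[A] = () => Future.successful(a)

  def delay[A](a: => A)(implicit ec: ExecutionContext): Par[A] = () => Future(a)

  def map[A, B](pa: Par[A])(f: A => B)(implicit ec: ExecutionContext): Par[B] =
    () => pa().map(f)

  // Start both sides before combining, so the two halves run concurrently.
  def map2[A, B, C](pa: Par[A], pb: Par[B])(f: (A, B) => C)(implicit ec: ExecutionContext): Par[C] =
    () => {
      val fa = pa()
      val fb = pb()
      fa.zip(fb).map { case (a, b) => f(a, b) }
    }

  // Split the sub-tasks in half repeatedly, so the combination forms a balanced
  // tree of depth about log2(n) rather than a chain of depth n.
  def sequenceBalanced[A](ps: Vector[Par[A]])(implicit ec: ExecutionContext): Par[Vector[A]] =
    if (ps.isEmpty) point(Vector.empty[A])
    else if (ps.size == 1) map(ps.head)(a => Vector(a))
    else {
      val (left, right) = ps.splitAt(ps.size / 2)
      map2(sequenceBalanced(left), sequenceBalanced(right))(_ ++ _)
    }
}
```

With 5000 sub-tasks the nesting depth here is around 13 levels instead of 5000, which is the property that makes the approach worth trying against the stack-related failure.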