Sunday, November 29, 2015

Parallelizing independent effectual sub-computations

Background


In production code we abstract effectual computations that may fail via Scalaz Tasks and disjunctions.  For example:

type TestTask[A] = Task[\/[TestFault,A]]

This represents a task (assumed effectual) which can result in a fault of a known type (TestFault - usually this would be a sealed trait with a bunch of case classes as fault cases), or a result of parameterized type [A].

To aid in composition we typically use EitherT, so that we can deal with fail-fast semantics on the left side of the disjunction.  So the basic type infrastructure would look something like this:

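import scala.language.implicitConversions
import scalaz._, Scalaz._
import scalaz.concurrent.Task

object TestProcesses {
  type TestTask[A] = Task[\/[TestFault,A]]
  type TestProcess[A] = EitherT[Task,TestFault,A]

  // Lift a lazily evaluated value into the success (right) side
  def toProcess[A](a: => A): TestProcess[A] = EitherT.right[Task,TestFault,A](Task.delay(a))
  // Lift a fault into the failure (left) side
  def fault[A](f: TestFault): TestProcess[A] = EitherT.left[Task,TestFault,A](Task.delay(f))
  // Let a raw TestTask be used wherever a TestProcess is expected
  implicit def liftToProcess[A](t: TestTask[A]): TestProcess[A] = EitherT[Task,TestFault,A](t)
}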

which lets us compose things monadically while still staying in a referentially transparent framework.  For example, suppose we have two tasks that compute integers effectually (say by accepting input, or querying some service over the network) and we want to compose them into a single task that results in their product:

def product(p1: TestProcess[Int], p2: TestProcess[Int]): TestProcess[Int] = {
  for {
    val1 <- p1
    val2 <- p2
  } yield val1*val2
}

The important thing here is that everything is referentially transparent (no effects are run by this call - it just produces a TestProcess that performs the entire computation when it is run later), but the idioms used to deal with the values are the natural ones for the base types (Int in this case).
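
As a quick sanity check of that claim, here is a minimal sketch that counts how many effects have run before and after the composed process is executed. It assumes scalaz 7.1.x on the classpath (where Task#run executes synchronously); the TestFault definition, the effectsRun counter and the effectfulInt helper are hypothetical stand-ins, not part of the production code.

```scala
import java.util.concurrent.atomic.AtomicInteger
import scalaz._, Scalaz._
import scalaz.concurrent.Task

object LazinessSketch {
  // Hypothetical fault type, standing in for the real TestFault.
  sealed trait TestFault
  type TestProcess[A] = EitherT[Task, TestFault, A]

  // Counts how many simulated effects have actually executed.
  val effectsRun = new AtomicInteger(0)

  // A stand-in "effect": bumps the counter, then yields n.
  def effectfulInt(n: Int): TestProcess[Int] =
    EitherT.right[Task, TestFault, Int](Task.delay { effectsRun.incrementAndGet(); n })

  def product(p1: TestProcess[Int], p2: TestProcess[Int]): TestProcess[Int] =
    for {
      val1 <- p1
      val2 <- p2
    } yield val1 * val2

  def main(args: Array[String]): Unit = {
    val composed = product(effectfulInt(6), effectfulInt(7))
    // Nothing has executed yet: composing only builds a description.
    println(s"effects run after composing: ${effectsRun.get}")
    // EitherT#run exposes the underlying Task; Task#run executes it.
    val result = composed.run.run
    println(s"result: $result, effects run: ${effectsRun.get}")
  }
}
```

The counter should stay at zero until the final run, at which point both effects execute and the product comes back on the right side of the disjunction.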

Frequently we come across cases where we have some unbounded collection of sub-computations, which are mutually independent (canonically a list, but really any traversable).  For example, suppose we have some service that given a sentence can return a numeric 'style' rating of some sort, and we want to use this to find the average rating of a document (considered as a list of sentences):

trait RatingService
{
  def rateSentence(s: Sentence):  TestProcess[Double]

  def documentStyleRating(content: List[Sentence]):
    TestProcess[Double] = {
    val subTasks = content map(rateSentence(_))

    // Average the per-sentence ratings (NaN for an empty document)
    subTasks.sequenceU map(ratings => ratings.sum / ratings.size)
  }
}

This works fine, but the sequenceU here, which converts List[TestProcess[Double]] to TestProcess[List[Double]], executes the subtasks sequentially when run.  If those subtasks are network (or possibly IO) bound, the result is poor utilization and a slow overall computation.
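
One way to see the sequential behaviour without relying on wall-clock timing is to track how many sub-tasks are in flight at once. The following is a minimal sketch assuming scalaz 7.1.x; slowRating, rateAll and the in-flight bookkeeping are hypothetical names invented for the illustration, and the sleep merely simulates a slow service call.

```scala
import scalaz._, Scalaz._
import scalaz.concurrent.Task

object SequentialSequenceSketch {
  // Hypothetical fault type, standing in for the real TestFault.
  sealed trait TestFault
  type TestProcess[A] = EitherT[Task, TestFault, A]

  // Bookkeeping: how many simulated calls overlap at any moment.
  private var inFlight = 0
  private var maxInFlight = 0
  private def enter(): Unit = synchronized {
    inFlight += 1
    maxInFlight = math.max(maxInFlight, inFlight)
  }
  private def leave(): Unit = synchronized { inFlight -= 1 }
  def peakConcurrency: Int = synchronized(maxInFlight)

  // Simulated rating call: sleeps to mimic latency, then returns the value.
  def slowRating(value: Double): TestProcess[Double] =
    EitherT.right[Task, TestFault, Double](Task.delay {
      enter()
      try { Thread.sleep(50); value } finally leave()
    })

  // Same shape as documentStyleRating: map to sub-tasks, then sequenceU.
  def rateAll(values: List[Double]): TestProcess[List[Double]] =
    values.map(slowRating).sequenceU

  def main(args: Array[String]): Unit = {
    val ratings = rateAll(List(1.0, 2.0, 3.0)).run.run
    println(s"ratings: $ratings")
    println(s"peak concurrency: $peakConcurrency")
  }
}
```

With plain sequenceU the peak never exceeds one, which is exactly the utilization problem described above.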

Options and test setup

It would be better (at least in many contexts) if we could modify things to have the results gathered concurrently, so what are the options?

I spent a few hours exploring some possibilities (quite possibly missing some obvious ones), so before I dive into the details here's a quick overview of a simple test setup to check that the implementations seem to work.

For the purposes of testing, I am just going to consider computations that result in integers (or fail), which we then want to sum (over some monoid), returning a failure if any sub-computation fails, else the summed value.  All implementations will therefore implement this simple test trait:

trait Combinator {
  protected  def sum[A](values: List[A])(implicit m: Monoid[A]) = values.foldLeft(m.zero)((x,y)=> m.append(x,y))
  def sum[A](subTasks: List[TestProcess[A]])(implicit m: Monoid[A]): TestProcess[A]
}
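
For readers less familiar with Monoid, here is a small standalone sketch of what the protected sum helper does for a couple of the standard scalaz instances. It assumes scalaz 7.1.x; the MonoidSumSketch object is a hypothetical wrapper so the fold can be run on its own.

```scala
import scalaz._, Scalaz._

object MonoidSumSketch {
  // Same fold as the protected helper in Combinator.
  def sum[A](values: List[A])(implicit m: Monoid[A]): A =
    values.foldLeft(m.zero)((x, y) => m.append(x, y))

  def main(args: Array[String]): Unit = {
    println(sum(List(1, 2, 3)))       // Int monoid is addition: prints 6
    println(sum(List("a", "b", "c"))) // String monoid is concatenation: prints abc
    println(sum(List.empty[Int]))     // empty input yields the monoid zero: prints 0
  }
}
```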

The code to test this just uses Thread sleeps to simulate some effectual processing that takes appreciable time, and in some test cases a subset of the sub-tasks will fail.

The code of the tests is a bit longer than I want to insert here, so here's a gist instead: Gist of test code

Having set this up we can consider options on how to parallelize.  The key property we want is that it be transparent to the caller, so everything needs to be tidily wrapped inside our TestProcess abstraction, and no effects should run until the final returned composition is itself run by some top-level caller.

The first option is just to rely directly on Task primitives - fork() and gatherUnordered() are enough:

object SimpleForkCombinator extends Combinator {
  override def sum[A](subTasks: List[TestProcess[A]])(implicit m: Monoid[A]): TestProcess[A] = {
    def conditionalSum(results: List[\/[TestFault,A]]): \/[TestFault,A] = {
      results.sequenceU map(sum(_)(m))
    }
    def runForked(subTask: TestProcess[A]): TestProcess[A] = {
      EitherT[Task,TestFault,A](Task.fork(subTask.run))
    }
    val forkedSubTasks = subTasks map (runForked(_))

    EitherT[Task,TestFault,A](Task.gatherUnordered(forkedSubTasks map(_.run)) map conditionalSum)
  }
}

There's quite a lot of mostly-not-generic boilerplate going on here, which is not ideal.  Also browsing around the internet suggests that Task.gatherUnordered() might have some scaling issues.
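
The conditionalSum helper in SimpleForkCombinator is where the "fail if any sub-computation fails" rule lives, so it may help to see it in isolation on already-computed results. This is a minimal sketch assuming scalaz 7.1.x, specialised to Int; the TimedOut fault case and the sample lists are hypothetical.

```scala
import scalaz._, Scalaz._

object ConditionalSumSketch {
  sealed trait TestFault
  case object TimedOut extends TestFault // hypothetical fault case

  // Int-specialised version of the helper used inside the combinator.
  def conditionalSum(results: List[TestFault \/ Int]): TestFault \/ Int =
    results.sequenceU.map(_.sum)

  private val timedOut: TestFault = TimedOut

  // Every sub-computation succeeded.
  val allGood: List[TestFault \/ Int] =
    List(1.right[TestFault], 2.right[TestFault], 3.right[TestFault])

  // One sub-computation failed.
  val oneBad: List[TestFault \/ Int] =
    List(1.right[TestFault], timedOut.left[Int], 3.right[TestFault])

  def main(args: Array[String]): Unit = {
    println(conditionalSum(allGood)) // all rights: the sum on the right side
    println(conditionalSum(oneBad))  // any left: that fault on the left side
  }
}
```

Note that this only decides the final value; by the time it is applied, gatherUnordered has already run every sub-task.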

How about using scalaz.Nondeterminism?  This doesn't help with boilerplate, but it's a valid option, and maybe it has better performance characteristics (TBD):

object NondeterminismCombinator extends Combinator {
  override def sum[A](subTasks: List[TestProcess[A]])(implicit m: Monoid[A]): TestProcess[A] = {
    def conditionalSum(results: List[\/[TestFault,A]]): \/[TestFault,A] = {
      results.sequenceU map(sum(_)(m))
    }
    def runForked(subTask: TestProcess[A]): TestProcess[A] = {
      EitherT[Task,TestFault,A](Task.fork(subTask.run))
    }
    val forkedSubTasks = subTasks map (runForked(_))
    EitherT[Task,TestFault,A](Nondeterminism[Task].gather(forkedSubTasks map(_.run)) map conditionalSum)
  }
}

How can we attack the boilerplate?  The original (sequential) implementation just involved a call to sequenceU, which is implemented on the Traverse trait, which gets imposed on List by scalaz implicit magic.  We should be able to do the same thing, and produce an instance of Applicative which does parallel composition for us.  This is actually slightly more code than the above boilerplate, but now it's totally generic (well, up to the fault type, which should be convertible to a type parameter with a bit more work), so it means we have a parallelized version of sequence() that can be used with ANY sub-computations that fit our TestProcess abstraction.  Here's the Applicative (based on one for raw Tasks by Paul Chiusano, which may be found here) and a little bit of implicit infrastructure to make it easier to use:

object ParallelApplicative {
  implicit val T = new Applicative[TestProcess] {
    def point[A](a: => A) = toProcess(a)
    def ap[A,B](a: => TestProcess[A])(f: => TestProcess[A => B]): TestProcess[B] = apply2(f,a)(_(_))
    override def apply2[A,B,C](a: => TestProcess[A], b: => TestProcess[B])(f: (A,B) => C): TestProcess[C] = {
      def disjunctf(da: \/[TestFault,A], db: \/[TestFault,B]): \/[TestFault,C] = {
        for {
          a <- da
          b <- db
        } yield f(a,b)
      }

      liftToProcess(Nondeterminism[Task].mapBoth(Task.fork(a.run),Task.fork(b.run))(disjunctf))
    }
  }

  case class ParallelApplicativeOps[A](l: List[TestProcess[A]]) {
    def sequenceP = T.sequence(l)
  }

  implicit def toParallelApplicativeOps[A](l: List[TestProcess[A]]): ParallelApplicativeOps[A] = ParallelApplicativeOps(l)
}

Given the above (which is generic), all our specific use needs is:

object ParallelApplicativeCombinator extends Combinator{
  override def sum[A](subTasks: List[TestProcess[A]])(implicit m: Monoid[A]): TestProcess[A] = {
    subTasks.sequenceP map { l => sum(l)(m) }
  }
}
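
The heart of the parallel Applicative is the mapBoth call inside apply2, so here is that one step on its own: two forked tasks are run together and their disjunctions are then combined with the usual fail-fast for-comprehension. This is a minimal sketch assuming scalaz 7.1.x; the Unreachable fault case and the both/combine helpers are hypothetical names.

```scala
import scalaz._, Scalaz._
import scalaz.concurrent.Task

object MapBothSketch {
  sealed trait TestFault
  case object Unreachable extends TestFault // hypothetical fault case

  type TestTask[A] = Task[TestFault \/ A]

  // Combine two already-computed results, failing if either side failed.
  def combine(da: TestFault \/ Int, db: TestFault \/ Int): TestFault \/ Int =
    for {
      a <- da
      b <- db
    } yield a * b

  // Fork both tasks and combine their results once both have completed.
  def both(ta: TestTask[Int], tb: TestTask[Int]): TestTask[Int] =
    Nondeterminism[Task].mapBoth(Task.fork(ta), Task.fork(tb))(combine)

  private val unreachable: TestFault = Unreachable
  val six: TestTask[Int]   = Task.delay(6.right[TestFault])
  val seven: TestTask[Int] = Task.delay(7.right[TestFault])
  val bad: TestTask[Int]   = Task.delay(unreachable.left[Int])

  def main(args: Array[String]): Unit = {
    println(both(six, seven).run) // both succeed: product on the right
    println(both(six, bad).run)   // one fails: fault on the left
  }
}
```

Unlike the monadic version, both tasks are started regardless of whether the other one fails; only the combined value is fail-fast.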

Ok, but what about Akka?  I've never used Akka, but this seems like a good time to start!  I'm probably missing all sorts of tricks, but I wound up with a relatively large amount of code in this case (and some difficulty in handling the type parameterization).  This is a slightly unfair use-case for Akka however, as we want everything hidden away inside our TestProcess abstraction, rather than using Akka throughout our application for all effectual processing (so maybe an unnatural hybrid):

case class AkkaCombinator(numWorkers: Int) extends Combinator {
  class AkkaPool[A] {

    sealed trait CombinatorMessage
    case class Run(subTasks: List[TestProcess[A]]) extends CombinatorMessage
    case class Work(task: TestTask[A]) extends CombinatorMessage
    case class WorkResult(value: \/[TestFault, A]) extends CombinatorMessage
    case class Result(value: \/[TestFault, List[A]]) extends CombinatorMessage

    class Worker extends Actor {
      def receive = {
        case Work(task) => sender ! WorkResult(task.run)
      }
    }

    class Master extends Actor {
      val routees = for(n <- 1 to numWorkers) yield context.actorOf(Props(new Worker()), s"worker.$n")
      val workerRouter = context.actorOf(
        Props.empty.withRouter(RoundRobinRouter(routees)), name = "workerRouter")

      var results = mutable.MutableList[A]()
      var numOutstandingSubTasks = 0
      var originator: ActorRef = null
      def receive = {
        case Run(processes) => {
          originator = sender
          for (process <- processes) {
            numOutstandingSubTasks += 1
            workerRouter ! Work(process.run)
          }
        }
        case WorkResult(result) => {
          def handleError(f: TestFault) = {
            originator ! Result(f.left)
            context.stop(self)
          }
          def handleResult(a: A) = {
            results += a
            numOutstandingSubTasks -= 1
            if (numOutstandingSubTasks == 0) {
              originator ! Result(results.toList.right)
              context.stop(self)
            }
          }

          result fold(handleError, handleResult)
        }
      }
    }

    def runUnordered(subTasks: List[TestProcess[A]]) = {
      val system = ActorSystem("CombinatorSystem")  //  TODO - need to disambiguate by instance?
      val master = system.actorOf(Props(new Master()), name="master")
      implicit val timeout = Timeout(50 seconds)
      val resultFuture = master ? Run(subTasks)

      val result = Await.result(resultFuture, timeout.duration).asInstanceOf[Result]
      liftToProcess(Task.now(result.value))
    }
  }

  override def sum[A](subTasks: List[TestProcess[A]])(implicit m: Monoid[A]): TestProcess[A] = {
    val workerPool = new AkkaPool[A]

    workerPool.runUnordered(subTasks) map { l => sum(l)(m) }
  }
}

Test Results

So how do they compare performance-wise in my simple test, and how do they scale?

Combinator          Num sub-tasks
                    10      50      100     500     1000    5000
Sequential          0.86    2.59    4.75    21.9    43.6    215
Simple fork         0.29    0.48    0.77    3.08    6.75    100
Nondeterminism      0.18    0.43    0.7     2.86    5.59    27
Applicative         0.18    0.42    0.68    2.88    5.59    DNF
Akka (50 workers)   0.43    0.49    0.63    1.73    2.72    11

*DNF - did not finish.  Above about 4000 sub-tasks this combinator seems to just hang!  This is suspiciously close to the default stack frame size, so I suspect scalaz is being forced into some trampolining that is interacting badly with something inside Applicative (this is somewhat disturbing, and perhaps bears further investigation!)

Conclusions/Observations

  • The test tasks here are all non-CPU-limited (they are just sleeping), so the composite is bound to scale well with the size of the underlying executor pool thrown at it.  This is likely the reason why Akka (with 50 workers) scales better than the Task-based alternatives (for which I just used the default executor, which likely allocated 8 threads given the hardware I was running on).
  • Nondeterminism scales significantly better than direct use of Task above 4000 or so sub-tasks - again I suspect a stack-frame size issue forcing badly optimized trampolining, but regardless this is consistent with reports on the Internet generally.
  • Akka suffers from overhead at small scales, though the particular use case may be a bit unfair.
  • It might be interesting to look at a tree-decomposition of the sub-tasks rather than a linear one (divide the set into 2 repeatedly and run each half as a sub-task) as a possible means of fixing the Applicative.  This needs more investigation, because idiomatically the Applicative seems to give the most usable solution from the perspective of any code making use of it.
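
To make the tree-decomposition idea a little more concrete, here is one possible reading of it as a rough sketch: split the sub-tasks in half, gather each half recursively as a forked sub-task, and join the halves with mapBoth, so the nesting depth grows with log2(n) rather than n. It assumes scalaz 7.1.x; gatherTree and sumTree are hypothetical helpers (not part of scalaz or of the project code), the sum is specialised to Int, and I have not benchmarked this or checked whether it avoids the hang seen with the Applicative.

```scala
import scalaz._, Scalaz._
import scalaz.concurrent.Task

object TreeGatherSketch {
  sealed trait TestFault // hypothetical stand-in for the real fault type
  type TestProcess[A] = EitherT[Task, TestFault, A]

  // Hypothetical helper: gather results by repeatedly halving the input.
  // Results come back in the original order, since left precedes right.
  def gatherTree[A](tasks: Vector[Task[A]]): Task[Vector[A]] =
    tasks.size match {
      case 0 => Task.now(Vector.empty[A])
      case 1 => tasks.head.map(a => Vector(a))
      case n =>
        val (left, right) = tasks.splitAt(n / 2)
        Nondeterminism[Task].mapBoth(
          Task.fork(gatherTree(left)),
          Task.fork(gatherTree(right))
        )((l: Vector[A], r: Vector[A]) => l ++ r)
    }

  // Hypothetical combinator: fail if any sub-task failed, else sum.
  def sumTree(subTasks: List[TestProcess[Int]]): TestProcess[Int] = {
    def conditionalSum(results: Vector[TestFault \/ Int]): TestFault \/ Int =
      results.toList.sequenceU.map(_.sum)
    EitherT[Task, TestFault, Int](
      gatherTree(subTasks.map(_.run).toVector).map(conditionalSum))
  }

  def main(args: Array[String]): Unit = {
    val subTasks = (1 to 100).toList.map { i =>
      EitherT.right[Task, TestFault, Int](Task.delay(i))
    }
    println(sumTree(subTasks).run.run)
  }
}
```

Like the other Task-based combinators, nothing runs until the returned process is itself run, so this would slot in behind the same Combinator interface.
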
Full project code can be found here.