Jonas Bonér bio photo

Jonas Bonér

Specialist at Large.
Entrepreneur.
Hacker.
Public Speaker.
Powder Skier.
Obsessive Learner.
Jazz Addict.
Wannabee Musician.

Twitter LinkedIn Github
Introduction In a previous post I wrote about Scala OTP, an initial attempt to bring the power or Erlang OTP, in particular its supervisor hierarchies and generic server to the Scala Actors library. OTP is one of the key parts in the Erlang success story and is in my opinion a requirement for Scala Actors to succeed in the real world. Actors can simplify concurrent programming and reasoning immensely and I believe that Scala Actors is a key piece in the future Java concurrency puzzle. However, programming with actors and with explicit message passing and message dispatch loops can feel a bit unnatural and unnecessary verbose for Java developers that are used to regular OO method invocations and synchronous control flow. For example, if we want to be able to pass a single message from one actor to the next we have to define two things. A message with optional payload.
case class MyMessage(payload: AnyRef)
A message dispatch matching loop (partial function) in the receiving actor.
def act = {
  loop {
    react {
      case MyMessage(payload) =>
        ...  // do something with payload
      case _ =>
        ... // default case
    }
  }
}
It is also complicated to do things like stateful control flow, e.g. send message to A, wait for reply, then send message to B, wait for reply, then do C. (this can in some ways be addressed using monad continuations, but is still not simple and intuitive to use). Don't get me wrong. Even though it has its "flaws", using actors (message-passing concurrency) for concurrent programming is still so much simpler than using threads and locks (shared-state concurrency). Active Objects (Concurrent Asynchronous Components) In this post I'll outline a little library that we have been using that tries to unify Scala Actors, Scala OTP (supervisor hierarchies for fault tolerance) and regular OO method dispatch into an asynchronous component framework that I think can be best described as Active Objects. Each so-called Active Object (concurrent asynchronous component) is a GenericServer that is managed by a Supervisor, either in isolation or as part of a supervisor hierarchy/tree. E.g. each component is fully fault-tolerant. For more information about how supervisor hierarchies work, read this post. Each component consists of an interface (trait) and a regular Scala class as implementation for the interface. The catch is that each component has to be instantiate through a factory. Let's first take a look at the API and how to use this thing before we dig into the actual implementation. Usage Here is a simple example of a component that is using default supervisor management (which among other things mean that it is not part of a supervisor tree/hierarchy, but is still managed and will be restarted upon failure). We start with the component interface and implementation.
  trait Foo {
    def foo(msg: String): String
    @oneway def bar(msg: String)
  }

  class FooImpl extends Foo {
    def foo(msg: String): String = { println("foo: " + msg); msg }
    def bar(msg: String) = println("bar: " + msg)
  }
Now let's instantiate this component. The integer 1000 specifies the time interval an (asynchronous) invocation should have before timing out.
  val foo = ActiveObject.newInstance[Foo](classOf[Foo], new FooImpl, 1000)
Now we can use this component as any regular instance of Foo. The difference is that invocations are now asynchronously dispatched in an event-based actor with its return value is wrapped in a Future, this emulates synchronous behavior but is non-blocking. The exception to this behavior is if a method is annotated with the @scala.actors.annotation.oneway annotation, then it returns immediately. All components created through the factory are fault-tolerant GenericServers managed by a Supervisor, which means that they will be restarted upon failure using the restart scheme the supervisor has defined it under.
   // use as usual
   foo.foo("foo")
   foo.bar("bar") // returns immediately since annotated with @oneway
Now, if I would like to have more control over the Supervisor configuration and/or want to compose different components into supervisor hierarchies, then we can use another factory method along with a method called start that allows us to pass in a Supervisor configuration (as defined by the Scala OTP Supervisor). Let's take a look at a full example:
  trait Foo {
    def foo(msg: String): String
    @oneway def bar(msg: String)
  }

  class FooImpl extends Foo {
    val bar: Bar = new BarImpl
    def foo(msg: String): String = { println("foo: " + msg); msg }
    def bar(msg: String) = bar.bar(msg)
  }

  trait Bar {
    def bar(msg: String)
  }

  class BarImpl extends Bar {
    def bar(msg: String) = println("bar: " + msg)
  }
First create proxies (GenericServers) for our services.
  val fooProxy = new ActiveObjectProxy(new FooImpl, 1000)
  val barProxy = new ActiveObjectProxy(new BarImpl, 1000)
Then let's configure the GenericServer(s) by creating a list of GenericServer configurations (defining lifecycles, restart strategies etc.). This configuration is passed into the ActiveObject.supervise method which starts up the Supervisor which starts up all GenericServer's according to their configurations before returning the Supervisor instance (which can be used for management of the components).
val supervisor =
  ActiveObject.supervise(
    RestartStrategy(AllForOne, 3, 100),
    Component(
      fooProxy,
      LifeCycle(Permanent, 100)) ::
    Component(
      barProxy,
      LifeCycle(Permanent, 100))
    :: Nil)
Create and use the components as in the previous example..
  val foo = ActiveObject.newInstance[Foo](classOf[Foo], fooProxy)
  val bar = ActiveObject.newInstance[Bar](classOf[Bar], barProxy)

  foo.foo("foo ")
  bar.bar("bar ")
That pretty much sums it up. So how is this implemented? Implementation The first thing we need to do is to define an invocation context, holding arguments, method to be invoked as well as the target instance.
  case class Invocation(val method: Method, val args: Array[AnyRef], val target: AnyRef) {
    def invoke: AnyRef = method.invoke(target, args:_*)
    override def toString: String = "Invocation [method: " + method.getName + ", args: " + args + ", target: " + target + "]"
    override def hashCode(): Int = { ... }
    override def equals(that: Any): Boolean = { ... }
  }
The second thing we need to do is to create a dynamic proxy wrapping the asynchronous dispatch. This proxy holds an instance of an actor dressed up in a GenericServer. Now comes the little trick; we will now use the Invocation context as the message to our GenericServer actor. As you can see in the code for the dispatcher we are defining a partial function that is defined for two different messages; Invocation and 'exit. If the Invocation message is received, then we invoke the invocation context, e.g. the method on the implementation instance and are returning the result to the caller using reply(..). If we receive an 'exit message and we terminate the GenericServer. The call flow for this proxy is as follows. When a regular synchronous method invocation is made on the service interface or component is redirected to the invoke(..) method. In this method we simply create an invocation context for this specific method invocation and sends it as a message to our GenericServer (server). Here we have two options. If the target method is annotated with the @scala.actors.annotation.oneway annotation then we fire the message and forget by invoking server ! invocation, else we are sending the message using the server !!! invocation operator which returns a Future which we then wait on (emulating a synchronous method call).
class ActiveObjectProxy(val target: AnyRef, val timeout: Int) extends InvocationHandler {
  private val oneway = classOf[scala.actors.annotation.oneway]

  private[ActiveObjectProxy] object dispatcher extends GenericServer {
    override def body: PartialFunction[Any, Unit] = {
      case invocation: Invocation =>
        try {
          reply(ErrRef(invocation.invoke))
        } catch {
          case e: InvocationTargetException => reply(ErrRef({ throw e.getTargetException }))
          case e => reply(ErrRef({ throw e }))
        }
      case 'exit => exit; reply()
      case unexpected => throw new ActiveObjectException("Unexpected message to actor proxy: " + unexpected)
    }
  }

  private[component] val server = new GenericServerContainer(target.getClass.getName, () => dispatcher)
  server.setTimeout(timeout)

  def invoke(proxy: AnyRef, m: Method, args: Array[AnyRef]): AnyRef = invoke(Invocation(m, args, target))

  def invoke(invocation: Invocation): AnyRef = {
    if (invocation.method.isAnnotationPresent(oneway)) server ! invocation // fire and forget
    else {
      val result: ErrRef[AnyRef] = server !!! (invocation, ErrRef({ throw new ActiveObjectInvocationTimeoutException("proxy invocation timed out after " + timeout + " milliseconds") }))
      result() // wait on future for result
    }
  }
}
Finally we create a factory which will do the Supervisor configuration, wiring and startup.
object ActiveObject {

  def newInstance[T](intf: Class[T] forSome {type T}, target: AnyRef, timeout: Int): T = {
    val proxy = new ActiveObjectProxy(target, timeout)
    supervise(proxy)
    newInstance(intf, proxy)
  }

  def newInstance[T](intf: Class[T] forSome {type T}, proxy: ActiveObjectProxy): T = {
    Proxy.newProxyInstance(
      proxy.target.getClass.getClassLoader,
      Array(intf),
      proxy).asInstanceOf[T]
  }

  def supervise(restartStrategy: RestartStrategy, components: List[Component]): Supervisor = {
    object factory extends SupervisorFactory {
      override def getSupervisorConfig: SupervisorConfig = {
        SupervisorConfig(restartStrategy, components.map(c => Worker(c.component.server, c.lifeCycle)))
      }
    }
    val supervisor = factory.newSupervisor
    supervisor ! scala.actors.behavior.Start
    supervisor
  }

  private def supervise(proxy: ActiveObjectProxy): Supervisor =
    supervise(
      RestartStrategy(OneForOne, 5, 1000),
      Component(
        proxy,
        LifeCycle(Permanent, 100))
      :: Nil)
}
 
That's pretty much all there is to it. The code is available as part of the Scala OTP library: http://github.com/jboner/scala-otp/tree/master/component Check it out by invoking: git clone git://github.com/jboner/scala-otp.git All ideas, improvements, patches etc. are most welcome.