Introduction
In a previous post I wrote about Scala OTP, an initial attempt to bring the power of Erlang OTP, in particular its supervisor hierarchies and generic server, to the Scala Actors library. OTP is one of the key parts of the Erlang success story and is, in my opinion, a requirement for Scala Actors to succeed in the real world.
Actors can simplify concurrent programming and reasoning immensely, and I believe that Scala Actors is a key piece in the future Java concurrency puzzle. However, programming with actors, explicit message passing and message dispatch loops can feel a bit unnatural and unnecessarily verbose for Java developers who are used to regular OO method invocations and synchronous control flow.
For example, if we want to be able to pass a single message from one actor to the next, we have to define two things:
A message with an optional payload.
A message dispatch matching loop (partial function) in the receiving actor.
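To make those two pieces concrete, here is a minimal sketch. It deliberately leaves out the actor itself and only shows the message definitions and the pattern-matching partial function; the names Greet, Ping and dispatch are made up for illustration.

```scala
object MessageDispatchSketch {
  // 1. The messages, with an optional payload (hypothetical names).
  case class Greet(name: String) // carries a payload
  case object Ping               // carries no payload

  // 2. The body of the dispatch loop: a partial function that is only
  //    defined for the messages the receiver understands.
  val dispatch: PartialFunction[Any, String] = {
    case Greet(name) => "hello " + name
    case Ping        => "pong"
  }
}
```

Even for one message, both the case class and the matching case have to be written and kept in sync, which is the verbosity referred to above.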
It is also complicated to express stateful control flow, e.g. send a message to A, wait for the reply, then send a message to B, wait for the reply, then do C. (This can in some ways be addressed using monadic continuations, but that is still not simple or intuitive to use.)
Don't get me wrong. Even though it has its "flaws", using actors (message-passing concurrency) for concurrent programming is still so much simpler than using threads and locks (shared-state concurrency).
Active Objects (Concurrent Asynchronous Components)
In this post I'll outline a little library that we have been using that tries to unify Scala Actors, Scala OTP (supervisor hierarchies for fault tolerance) and regular OO method dispatch into an asynchronous component framework that I think can be best described as Active Objects.
Each so-called Active Object (concurrent asynchronous component) is a GenericServer that is managed by a Supervisor, either in isolation or as part of a supervisor hierarchy/tree. That is, each component is fault-tolerant. For more information about how supervisor hierarchies work, read this post. Each component consists of an interface (trait) and a regular Scala class that implements the interface. The catch is that each component has to be instantiated through a factory. Let's first take a look at the API and how to use this thing before we dig into the actual implementation.
Usage
Here is a simple example of a component that is using default supervisor management (which among other things means that it is not part of a supervisor tree/hierarchy, but is still managed and will be restarted upon failure). We start with the component interface and implementation.
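The original listing is not reproduced here, so the following is only a sketch of what such a component could look like. Foo is the name used in the text; FooImpl and the two methods are assumptions made for illustration.

```scala
object ComponentSketch {
  // The component interface: an ordinary trait, no actor types in sight.
  trait Foo {
    def foo(msg: String): String // request/reply style method
    def bar(msg: String): Unit   // candidate for a one-way method
  }

  // The implementation: a regular Scala class (hypothetical).
  class FooImpl extends Foo {
    def foo(msg: String): String = "foo: " + msg
    def bar(msg: String): Unit = println("bar: " + msg)
  }
}
```

Nothing in the component itself refers to actors or messages; all of that is meant to be added by the factory.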
Now let's instantiate this component. The integer 1000 specifies how long an (asynchronous) invocation may take before it times out.
Now we can use this component as any regular instance of Foo. The difference is that invocations are now asynchronously dispatched in an event-based actor, with the return value wrapped in a Future; this emulates synchronous behavior but is non-blocking. The exception to this behavior is a method annotated with the @scala.actors.annotation.oneway annotation, which returns immediately. All components created through the factory are fault-tolerant GenericServers managed by a Supervisor, which means that they will be restarted upon failure using the restart scheme the supervisor has defined for them.
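The two call styles can be illustrated with a small sketch. It uses scala.concurrent from the standard library rather than Scala Actors, so it is an approximation and not the library's code: call and oneway are hypothetical helpers, and unlike an event-based actor, call blocks the calling thread while it waits.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object CallSemanticsSketch {
  // Request/reply: run the body asynchronously, then wait for the result.
  // Throws java.util.concurrent.TimeoutException after timeoutMs milliseconds.
  def call[A](timeoutMs: Long)(body: => A): A =
    Await.result(Future(body), timeoutMs.millis)

  // One-way: start the work and return immediately, ignoring any result.
  def oneway(body: => Unit): Unit = { Future(body); () }
}
```

A request/reply call looks synchronous to the caller but is bounded by the timeout, while a one-way call never waits.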
Now, if we would like to have more control over the Supervisor configuration and/or want to compose different components into supervisor hierarchies, we can use another factory method along with a method called start that allows us to pass in a Supervisor configuration (as defined by the Scala OTP Supervisor).
Let's take a look at a full example:
First create proxies (GenericServers) for our services.
Then let's configure the GenericServer(s) by creating a list of GenericServer configurations (defining lifecycles, restart strategies etc.). This configuration is passed into the ActiveObject.supervise method, which starts up the Supervisor, which in turn starts up all GenericServers according to their configurations before returning the Supervisor instance (which can be used for management of the components).
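To give a feel for the shape of this step, here is a heavily simplified, synchronous sketch of "pass in a list of configurations, start everything, get the supervisor back". None of these types are the Scala OTP API: WorkerConfig, Supervisor and supervise are hypothetical, and the only restart strategy modeled is "restart the failed worker up to a limit".

```scala
object SuperviseSketch {
  // Hypothetical per-worker configuration: a name, a restart limit and
  // a function that starts the worker (and may throw).
  case class WorkerConfig(name: String, maxRestarts: Int, start: () => Unit)

  class Supervisor(configs: List[WorkerConfig]) {
    private var restarts = Map.empty[String, Int]

    // Exposed for management: how often a worker has been restarted.
    def restartCount(name: String): Int = restarts.getOrElse(name, 0)

    // Start a worker; on failure restart it until its limit is reached,
    // after which the exception propagates to the caller.
    private def run(c: WorkerConfig): Unit =
      try c.start()
      catch {
        case _: Exception if restartCount(c.name) < c.maxRestarts =>
          restarts = restarts.updated(c.name, restartCount(c.name) + 1)
          run(c)
      }

    def startAll(): Unit = configs.foreach(run)
  }

  // Starts all workers according to their configuration and returns
  // the supervisor instance.
  def supervise(configs: List[WorkerConfig]): Supervisor = {
    val supervisor = new Supervisor(configs)
    supervisor.startAll()
    supervisor
  }
}
```

A real supervisor monitors running actors asynchronously and supports several restart strategies; the sketch only mirrors the configure, start, return flow.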
Create and use the components as in the previous example.
That pretty much sums it up. So how is this implemented?
Implementation
The first thing we need to do is to define an invocation context, holding the arguments, the method to be invoked and the target instance.
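A minimal sketch of such an invocation context, built on Java reflection, could look like this. Invocation is the name used in the text; the field names and the invoke() helper are assumptions.

```scala
import java.lang.reflect.Method

object InvocationSketch {
  // Captures everything needed to perform the call later, possibly on
  // another thread: the method, its arguments and the target instance.
  case class Invocation(method: Method, args: Array[AnyRef], target: AnyRef) {
    // Performs the reflective call and returns its (boxed) result.
    def invoke(): AnyRef = method.invoke(target, args: _*)
  }
}
```

Because it is a plain immutable value, the context can be sent around as a message like any other case class.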
The second thing we need to do is to create a dynamic proxy wrapping the asynchronous dispatch. This proxy holds an instance of an actor dressed up in a GenericServer. Now comes the little trick: we use the Invocation context as the message to our GenericServer actor.
The dispatcher is a partial function that is defined for two different messages: Invocation and 'exit. If the Invocation message is received, we invoke the invocation context, i.e. the method on the implementation instance, and return the result to the caller using reply(..). If we receive an 'exit message, we terminate the GenericServer.
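The dispatcher described above can be sketched without the actor machinery. In this sketch the reply and stop parameters are hypothetical callbacks standing in for reply(..) and for terminating the GenericServer, and a case object Exit stands in for the 'exit symbol.

```scala
import java.lang.reflect.Method

object DispatcherSketch {
  case class Invocation(method: Method, args: Array[AnyRef], target: AnyRef) {
    def invoke(): AnyRef = method.invoke(target, args: _*)
  }
  case object Exit // stand-in for the 'exit message

  // A partial function defined for exactly two messages.
  def dispatcher(reply: AnyRef => Unit, stop: () => Unit): PartialFunction[Any, Unit] = {
    case inv: Invocation => reply(inv.invoke()) // run the call, send back the result
    case Exit            => stop()              // shut the server down
  }
}
```

Any other message is simply not matched, since the partial function is undefined for it.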
The call flow for this proxy is as follows. When a regular synchronous method invocation is made on the service interface or component, it is redirected to the invoke(..) method. In this method we simply create an invocation context for this specific method invocation and send it as a message to our GenericServer (server). Here we have two options. If the target method is annotated with the @scala.actors.annotation.oneway annotation, we fire the message and forget by invoking server ! invocation; otherwise we send the message using server !!! invocation, which returns a Future that we then wait on (emulating a synchronous method call).
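The same call flow can be approximated with nothing but the JDK. This is a sketch under several assumptions and not the library's implementation: a single daemon worker thread stands in for the GenericServer, one-way methods are identified by a set of method names instead of the annotation, and ActiveObjectProxy and newInstance are hypothetical names.

```scala
import java.lang.reflect.{InvocationHandler, Method, Proxy}
import java.util.concurrent.{Callable, ExecutorService, Executors, ThreadFactory, TimeUnit}

object ProxySketch {
  trait Foo {
    def foo(msg: String): String
    def bar(msg: String): Unit
  }
  class FooImpl extends Foo {
    @volatile var last: String = ""
    def foo(msg: String): String = "foo: " + msg
    def bar(msg: String): Unit = { last = msg }
  }

  class ActiveObjectProxy(target: AnyRef, timeoutMs: Long, oneway: Set[String])
      extends InvocationHandler {
    // One daemon thread processes invocations in order, like a mailbox.
    private val server: ExecutorService =
      Executors.newSingleThreadExecutor(new ThreadFactory {
        def newThread(r: Runnable): Thread = {
          val t = new Thread(r); t.setDaemon(true); t
        }
      })

    // Every call on the proxied interface ends up here.
    def invoke(proxy: AnyRef, method: Method, args: Array[AnyRef]): AnyRef = {
      val actualArgs = if (args == null) Array.empty[AnyRef] else args
      val task = new Callable[AnyRef] {
        def call(): AnyRef = method.invoke(target, actualArgs: _*)
      }
      val future = server.submit(task)
      if (oneway(method.getName)) null                  // fire and forget
      else future.get(timeoutMs, TimeUnit.MILLISECONDS) // wait for the reply
    }
  }

  // Factory: wraps an implementation in a proxy for its interface.
  def newInstance[T](intf: Class[T], target: AnyRef, timeoutMs: Long,
                     oneway: Set[String] = Set.empty): T =
    Proxy.newProxyInstance(intf.getClassLoader, Array[Class[_]](intf),
      new ActiveObjectProxy(target, timeoutMs, oneway)).asInstanceOf[T]
}
```

From the caller's point of view the result is used like any Foo, for example `val foo = ProxySketch.newInstance(classOf[ProxySketch.Foo], new ProxySketch.FooImpl, 1000, Set("bar"))`. Supervision and restarts are not modeled here.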
Finally we create a factory which will do the Supervisor configuration, wiring and startup.
That's pretty much all there is to it. The code is available as part of the Scala OTP library:
http://github.com/jboner/scala-otp/tree/master/component
Check it out by invoking: git clone git://github.com/jboner/scala-otp.git
All ideas, improvements, patches etc. are most welcome.