In a previous post I wrote about Scala OTP, an initial attempt to bring the power or Erlang OTP, in particular its supervisor hierarchies and generic server to the Scala Actors library. OTP is one of the key parts in the Erlang success story and is in my opinion a requirement for Scala Actors to succeed in the real world.
Actors can simplify concurrent programming and reasoning immensely and I believe that Scala Actors is a key piece in the future Java concurrency puzzle. However, programming with actors and with explicit message passing and message dispatch loops can feel a bit unnatural and unnecessary verbose for Java developers that are used to regular OO method invocations and synchronous control flow.
For example, if we want to be able to pass a single message from one actor to the next we have to define two things.
A message with optional payload.
A message dispatch matching loop (partial function) in the receiving actor.
It is also complicated to do things like stateful control flow, e.g. send message to A, wait for reply, then send message to B, wait for reply, then do C. (this can in some ways be addressed using monad continuations, but is still not simple and intuitive to use).
Don’t get me wrong. Even though it has its “flaws”, using actors (message-passing concurrency) for concurrent programming is still so much simpler than using threads and locks (shared-state concurrency).
Active Objects (Concurrent Asynchronous Components)
In this post I’ll outline a little library that we have been using that tries to unify Scala Actors, Scala OTP (supervisor hierarchies for fault tolerance) and regular OO method dispatch into an asynchronous component framework that I think can be best described as Active Objects.
Each so-called Active Object (concurrent asynchronous component) is a
GenericServer that is managed by a
Supervisor, either in isolation or as part of a supervisor hierarchy/tree. E.g. each component is fully fault-tolerant. For more information about how supervisor hierarchies work, read this post. Each component consists of an interface (trait) and a regular Scala class as implementation for the interface. The catch is that each component has to be instantiate through a factory. Let’s first take a look at the API and how to use this thing before we dig into the actual implementation.
Here is a simple example of a component that is using default supervisor management (which among other things mean that it is not part of a supervisor tree/hierarchy, but is still managed and will be restarted upon failure). We start with the component interface and implementation.
Now let’s instantiate this component. The integer 1000 specifies the time interval an (asynchronous) invocation should have before timing out.
Now we can use this component as any regular instance of
Foo. The difference is that invocations are now asynchronously dispatched in an event-based actor with its return value is wrapped in a
Future, this emulates synchronous behavior but is non-blocking. The exception to this behavior is if a method is annotated with the
@scala.actors.annotation.oneway annotation, then it returns immediately. All components created through the factory are fault-tolerant
GenericServers managed by a
Supervisor, which means that they will be restarted upon failure using the restart scheme the supervisor has defined it under.
Now, if I would like to have more control over the
Supervisor configuration and/or want to compose different components into supervisor hierarchies, then we can use another factory method along with a method called
start that allows us to pass in a
Supervisor configuration (as defined by the Scala OTP Supervisor).
Let’s take a look at a full example:
First create proxies (
GenericServers) for our services.
Then let’s configure the
GenericServer(s) by creating a list of
GenericServer configurations (defining lifecycles, restart strategies etc.). This configuration is passed into the
ActiveObject.supervise method which starts up the
Supervisor which starts up all
GenericServer’s according to their configurations before returning the
Supervisor instance (which can be used for management of the components).
Create and use the components as in the previous example..
That pretty much sums it up. So how is this implemented?
The first thing we need to do is to define an invocation context, holding arguments, method to be invoked as well as the target instance.
The second thing we need to do is to create a dynamic proxy wrapping the asynchronous dispatch. This proxy holds an instance of an actor dressed up in a
GenericServer. Now comes the little trick; we will now use the
Invocation context as the message to our
As you can see in the code for the
dispatcher we are defining a partial function that is defined for two different messages;
'exit. If the
Invocation message is received, then we invoke the invocation context, e.g. the method on the implementation instance and are returning the result to the caller using
reply(..). If we receive an
'exit message and we terminate the
The call flow for this proxy is as follows. When a regular synchronous method invocation is made on the service interface or component is redirected to the
invoke(..) method. In this method we simply create an invocation context for this specific method invocation and sends it as a message to our
GenericServer (server). Here we have two options. If the target method is annotated with the
@scala.actors.annotation.oneway annotation then we fire the message and forget by invoking
server ! invocation, else we are sending the message using the
server !!! invocation operator which returns a
Future which we then wait on (emulating a synchronous method call).
Finally we create a factory which will do the
Supervisor configuration, wiring and startup.
That’s pretty much all there is to it. The code is available as part of the Scala OTP library:
Check it out by invoking:
git clone git://github.com/jboner/scala-otp.git
All ideas, improvements, patches etc. are most welcome.