We Are Reactive

Erlang-style Supervisor Module for Scala Actors

In this post I will explain how you can build fault-tolerant systems using Scala Actors by arranging them in Supervisor hierarchies using a library for Scala Supervisors that I just released.

But first, let’s recap what Actors are and what makes them useful.

An actor is an abstraction that implements Message-Passing Concurrency. Actors share no state and communicate by sending and receiving messages. This paradigm provides a very different, and much simpler, concurrency model than Shared-State Concurrency (the scheme adopted by C, Java, C# etc.), making it easier to avoid problems like deadlocks, livelocks and thread starvation. It also makes it possible to write code that is deterministic and side-effect-free, which is easier to write, test, understand and reason about.

Each actor has a mailbox in which it receives incoming messages, and it can use pattern matching on those messages to decide whether a message is interesting and which action to take. The best-known and most successful implementation of actors is found in the Erlang language (and the OTP platform), where it has been used to implement extremely fault-tolerant (99.9999999% reliability, i.e. nine nines) and massively concurrent systems (with hundreds of thousands of simultaneous actors).

So what are Supervisor hierarchies? Let’s go to the source: http://www.erlang.org/doc/design_principles/sup_princ.html#5.

A supervisor is responsible for starting, stopping and monitoring its child processes. The basic idea of a supervisor is that it should keep its child processes alive by restarting them when necessary.

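A supervisor has two different restart strategies: One-For-One and All-For-One. With One-For-One, if a child process terminates, only that process is restarted. With All-For-One, if a child process terminates, all the other child processes are terminated and then all of them, including the one that failed, are restarted. This is best explained using some pictures (referenced from erlang.org):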

[Figure: One-For-One restart strategy]

[Figure: All-For-One restart strategy]
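To make the difference between the two strategies concrete, here is a minimal sketch in plain Scala (not the library's code) of which children a supervisor would restart when one of them fails. The RestartSketch object, its OneForOne/AllForOne objects and childrenToRestart are hypothetical names for illustration, and children are identified simply by name.

```scala
// Hypothetical sketch, not the library's implementation: decides which
// children to restart when one of them fails, per Erlang's two strategies.
object RestartSketch {
  sealed trait Strategy
  case object OneForOne extends Strategy
  case object AllForOne extends Strategy

  // children: all supervised children in start order; failed: the one that died.
  def childrenToRestart(strategy: Strategy, children: List[String], failed: String): List[String] =
    strategy match {
      case OneForOne => children.filter(_ == failed) // only the failed child
      case AllForOne => children                     // every child, including the failed one
    }
}

import RestartSketch._

val oneForOne = childrenToRestart(OneForOne, List("sample1", "sample2"), "sample1")
// == List("sample1")
val allForOne = childrenToRestart(AllForOne, List("sample1", "sample2"), "sample1")
// == List("sample1", "sample2")
```

With All-For-One, a real supervisor also stops the surviving children before restarting them; the step 9 log further down shows exactly that happening to sample2.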

Naturally, the library I have written for Scala is by no means as complete and hardened as Erlang’s, but it seems to do a decent job of providing the core functionality.

The implementation consists of two main abstractions: Supervisor and GenericServer.

The GenericServer is wrapped in a GenericServerContainer, which provides the indirection needed to manage the full life-cycle of the GenericServer in an easy way.
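One way to picture that indirection is the following plain-Scala sketch. CounterServer and ContainerSketch are hypothetical names, not the library's classes. Callers keep a stable reference to the container, while the container can replace the server instance behind it with a fresh one built from a factory function. That is presumably why the containers in step 3 are constructed with a function such as () => new SampleServer rather than with an instance.

```scala
// Hypothetical sketch: a container forwards messages to the current server
// instance and, on restart, swaps in a fresh instance from its factory.
object ContainerDemo {
  class CounterServer {
    private var handled = 0 // internal state a crash could leave inconsistent
    def handle(msg: String): Int = { handled += 1; handled }
  }

  class ContainerSketch(val name: String, factory: () => CounterServer) {
    private var current: CounterServer = factory()

    // Callers only ever talk to the container, never to the instance directly.
    def send(msg: String): Int = current.handle(msg)

    // Throw the old instance away and start over with clean state.
    def restart(): Unit = { current = factory() }
  }
}

val container = new ContainerDemo.ContainerSketch("sample1", () => new ContainerDemo.CounterServer)
val first = container.send("a")        // 1
val second = container.send("b")       // 2
container.restart()
val afterRestart = container.send("c") // 1: the new instance starts from scratch
```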

So, let’s try it out by writing a small example in which we create a couple of servers, configure them, use them in various ways, kill one of them, see it recover, hotswap its implementation etc.

(Sidenote: I have written about hotswapping actors before; however, this library takes that approach a bit further and provides a more flexible and powerful way of achieving it. Thanks DPP.)

This walk-through only covers part of the API; for more details, look at the code or the tests.

1. Create our server messages

import scala.actors._
import scala.actors.Actor._

import com.jonasboner.supervisor._
import com.jonasboner.supervisor.Helpers._

sealed abstract class SampleMessage
case object Ping extends SampleMessage
case object Pong extends SampleMessage
case object OneWay extends SampleMessage
case object Die extends SampleMessage

2. Create a GenericServer
We do that by extending the GenericServer trait and overriding the body method.

class SampleServer extends GenericServer {

  // This method implements the core server logic and naturally has to be overridden
  override def body: PartialFunction[Any, Unit] = {
    case Ping => 
      println("Received Ping"); reply(Pong)

    case OneWay => 
      println("Received OneWay")

    case Die => 
      println("Received Die..dying...") 
      throw new RuntimeException("Received Die message")
  }

}

GenericServer also has some callback life-cycle methods, such as init(..) and shutdown(..).
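Because body is just a PartialFunction[Any, Unit], the dispatch idea can be sketched in plain Scala without any actors: a dispatcher asks the partial function whether it is defined for a message before applying it. DispatchSketch, its message types and dispatch are hypothetical names; this post doesn’t say what GenericServer itself does with messages that body does not match.

```scala
// Hypothetical sketch: dispatching messages to a PartialFunction "body".
object DispatchSketch {
  sealed abstract class SampleMessage
  case object Ping extends SampleMessage
  case object OneWay extends SampleMessage

  val received = scala.collection.mutable.ListBuffer.empty[String]

  // Same shape as SampleServer.body: pattern matching decides which messages are handled.
  val body: PartialFunction[Any, Unit] = {
    case Ping   => received += "Received Ping"
    case OneWay => received += "Received OneWay"
  }

  // Apply body if it matches the message; report whether it was handled.
  def dispatch(msg: Any): Boolean =
    if (body.isDefinedAt(msg)) { body(msg); true } else false
}

val handledPing = DispatchSketch.dispatch(DispatchSketch.Ping) // true
val handledOther = DispatchSketch.dispatch("something else")   // false: no case matches
```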

3. Wrap our SampleServer in a GenericServerContainer
Here we also give each container a name so that we can refer to it later. We create two instances of the same server implementation in order to try out restarting multiple servers on failure.

object sampleServer1 extends GenericServerContainer("sample1", () => new SampleServer)
object sampleServer2 extends GenericServerContainer("sample2", () => new SampleServer)

4. Create a Supervisor configuration
Here we create a SupervisorFactory that configures our servers. The configuration mimics Erlang’s and defines a general restart strategy for our Supervisor, as well as a list of workers (servers), for each of which we define a specific life-cycle.

object factory extends SupervisorFactory {
  override protected def getSupervisorConfig: SupervisorConfig = {
    SupervisorConfig(
      RestartStrategy(AllForOne, 3, 10000),
       Worker(
        sampleServer1,
        LifeCycle(Permanent, 1000)) ::
       Worker(
        sampleServer2,
        LifeCycle(Permanent, 1000)) ::
      Nil)
  }
}
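The numeric arguments aren’t explained here. The step 9 log ("Waiting 1000 milliseconds for the server to shut down before killing it") suggests that the 1000 in LifeCycle(Permanent, 1000) is a shutdown time-out. RestartStrategy(AllForOne, 3, 10000) presumably mirrors Erlang’s restart intensity (MaxR restarts within MaxT), read here as at most 3 restarts within 10000 ms; that reading is an assumption. This sketch shows how such a limit can be enforced with a sliding window; IntensitySketch, RestartBudget and tryRestart are hypothetical names.

```scala
// Hypothetical sketch of Erlang-style restart intensity: at most maxRestarts
// restarts within any window of withinMs milliseconds. In Erlang, exceeding the
// limit makes the supervisor terminate its children and then itself.
object IntensitySketch {
  final class RestartBudget(maxRestarts: Int, withinMs: Long) {
    private var recent = List.empty[Long] // timestamps of restarts still inside the window

    // Returns true (and records the restart) if a restart at nowMs is allowed,
    // false if the limit has been reached.
    def tryRestart(nowMs: Long): Boolean = {
      recent = recent.filter(t => nowMs - t < withinMs)
      if (recent.size < maxRestarts) { recent = nowMs :: recent; true }
      else false
    }
  }
}

val budget = new IntensitySketch.RestartBudget(maxRestarts = 3, withinMs = 10000L)
val decisions = List(0L, 1000L, 2000L, 3000L, 12500L).map(budget.tryRestart)
// == List(true, true, true, false, true): the 4th restart within 10 s is refused,
// and by 12500 ms the earlier restarts have left the window.
```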

5. Create a new Supervisor

val supervisor = factory.newSupervisor

Output:


12:25:30.031 [Thread-2] DEBUG com.jonasboner.supervisor.Supervisor – Configuring supervisor:com.jonasboner.supervisor.Supervisor@860d49
12:25:30.046 [Thread-2] DEBUG com.jonasboner.supervisor.Supervisor – Linking actor [Main$SampleServer$1@1b9240e] to supervisor [com.jonasboner.supervisor.Supervisor@860d49]
12:25:30.062 [Thread-2] DEBUG com.jonasboner.supervisor.Supervisor – Linking actor [Main$SampleServer$1@1808199] to supervisor [com.jonasboner.supervisor.Supervisor@860d49]
12:25:30.062 [main] DEBUG Main$factory$2$ – Supervisor successfully configured

6. Start the Supervisor
This also starts the servers.

supervisor ! Start

Output:


12:25:30.078 [Thread-8] INFO com.jonasboner.supervisor.Supervisor – Starting server: Main$sampleServer2$2$1479feb
12:25:30.078 [Thread-8] INFO com.jonasboner.supervisor.Supervisor – Starting server: Main$sampleServer1$2$97a560

7. Try to communicate with the servers.
Here we send a couple of one-way asynchronous messages to our servers.


sampleServer1 ! OneWay

Try to get a reference to our sampleServer2 (by name) from the Supervisor before sending a message.

supervisor.getServer("sample2") match {
  case Some(server2) => server2 ! OneWay
  case None => println("server [sample2] could not be found")
}

Output:


Received OneWay
Received OneWay

8. Send a message using a future
Send an asynchronous message, receive a future, and wait up to 100 ms (the time-out) for the reply.

val future = sampleServer1 !! Ping
val reply1 = future.receiveWithin(100) match {
  case Some(reply) => 
    println("Received reply: " + reply)
  case None => 
    println("Did not get a reply within 100 ms")
}

Output:


Received Ping
Received reply: Pong
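The same request/reply-with-time-out idea can be sketched with the standard library’s scala.concurrent.Future instead of scala.actors. This is a swapped-in technique for illustration, not how the supervisor library implements !! or receiveWithin; askWithin is a hypothetical helper.

```scala
import scala.concurrent.{Await, Future, Promise, TimeoutException}
import scala.concurrent.duration._

// Hypothetical sketch: wait a bounded time for a reply and model a time-out as None.
object AskSketch {
  def askWithin[T](reply: Future[T], timeoutMs: Long): Option[T] =
    try Some(Await.result(reply, timeoutMs.millis))
    catch { case _: TimeoutException => None }
}

val answered = Future.successful("Pong")    // stands in for a server that replies
val silent = Promise[String]().future       // stands in for a server that never replies

val reply1 = AskSketch.askWithin(answered, 100) // Some("Pong")
val reply2 = AskSketch.askWithin(silent, 100)   // None, after roughly 100 ms
```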

9. Kill one of the servers
Try to send a message (Die) telling the server to kill itself (by throwing an exception).

sampleServer1 ! Die

Output:


Received Die..dying...
12:25:30.093 [Thread-8] ERROR c.j.supervisor.AllForOneStrategy – Server [Main$SampleServer$1@1b9240e] has failed due to [java.lang.RuntimeException: Received Die message] – scheduling restart – scheme: ALL_FOR_ONE.
12:25:30.093 [Thread-8] DEBUG Main$sampleServer2$2$ – Waiting 1000 milliseconds for the server to shut down before killing it.
12:25:30.093 [Thread-8] DEBUG Main$sampleServer2$2$ – Server [sample2] has been shut down cleanly.
12:25:30.093 [Thread-8] DEBUG c.j.supervisor.AllForOneStrategy – Restarting server [sample2] configured as PERMANENT.
12:25:30.093 [Thread-8] DEBUG com.jonasboner.supervisor.Supervisor – Linking actor [Main$SampleServer$1@166aa18] to supervisor [com.jonasboner.supervisor.Supervisor@860d49]
12:25:30.093 [Thread-8] DEBUG Main$sampleServer1$2$ – Waiting 1000 milliseconds for the server to shut down before killing it.
12:25:30.093 [main] DEBUG com.jonasboner.supervisor.Helpers$ – Future timed out while waiting for actor: Main$SampleServer$1@1b9240e
Expected exception: java.lang.RuntimeException: Time-out
12:25:31.093 [Thread-8] DEBUG c.j.supervisor.AllForOneStrategy – Restarting server [sample1] configured as PERMANENT.
12:25:31.093 [Thread-8] DEBUG com.jonasboner.supervisor.Supervisor – Linking actor [Main$SampleServer$1@1968e23] to supervisor [com.jonasboner.supervisor.Supervisor@860d49]

10. Send an asynchronous message and wait on a future.

If the call times out, the error handler we pass in is invoked; in this case it throws an exception. The call is likely to time out, since the server is in the middle of recovering from failure and we deliberately define a very short time-out to trigger this behavior.

val reply2 = try {
  sampleServer1 !!! (Ping, throw new RuntimeException("Time-out"), 10) 
} catch { case e: Exception => println("Expected exception: " + e.toString); Pong }

Due to the asynchronous nature of actors, the output of this call is interleaved with the logging for the restart of the servers; the two lines below can be found in the middle of the restart output above.


12:25:30.093 [main] DEBUG com.jonasboner.supervisor.Helpers$ – Future timed out while waiting for actor: Main$SampleServer$1@1b9240e
Expected exception: java.lang.RuntimeException: Time-out
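Why doesn’t throw new RuntimeException("Time-out") fire as soon as the arguments are evaluated? Presumably because !!! takes its error handler as a by-name parameter, which is only evaluated when the time-out actually happens. Here is a minimal sketch of that mechanism using scala.concurrent instead of actors; askOrElse is a hypothetical stand-in for !!!, not the library’s code.

```scala
import scala.concurrent.{Await, Future, Promise, TimeoutException}
import scala.concurrent.duration._

// Hypothetical stand-in for !!!: onTimeout is by-name (=> T), so the expression
// passed for it is only evaluated if the wait times out.
object ByNameSketch {
  def askOrElse[T](reply: Future[T], onTimeout: => T, timeoutMs: Long): T =
    try Await.result(reply, timeoutMs.millis)
    catch { case _: TimeoutException => onTimeout }
}

// Reply already available: the throwing handler is never evaluated.
val fast = ByNameSketch.askOrElse(Future.successful("Pong"), throw new RuntimeException("Time-out"), 10)

// No reply within 10 ms: the handler runs and its exception reaches the caller.
val slow =
  try ByNameSketch.askOrElse(Promise[String]().future, throw new RuntimeException("Time-out"), 10)
  catch { case e: RuntimeException => "Expected exception: " + e.getMessage }
```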

The server should be up again by now. Try the same call again, this time with a time-out of 1000 instead of 10.

val reply3 = try {
  sampleServer1 !!! (Ping, throw new RuntimeException("Time-out"), 1000) 
} catch { case e: Exception => println("Expected exception: " + e.toString); Pong }

Output:


Received Ping

Also check that server number 2 is up and healthy.

sampleServer2 ! Ping 

Output:


Received Ping

11. Try to hotswap the server implementation
Here we pass a completely new implementation of the server logic to the server’s hotswap method. It doesn’t look that different, though; it can be any piece of Scala pattern-matching code.

sampleServer1.hotswap(Some({
  case Ping => 
    println("Hotswapped Ping")
}))
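The Option-based hotswap API can be mimicked in a few lines of plain Scala: Some(handler) installs a replacement for the original body, and None (as step 15 shows) restores it. HotswapSketch and its members are hypothetical names; this illustrates the idea and is not the library’s actor-based implementation.

```scala
// Hypothetical sketch of Option-based hotswapping (not thread-safe).
object HotswapSketch {
  case object Ping

  class Server {
    val output = scala.collection.mutable.ListBuffer.empty[String]

    // The original implementation, like SampleServer.body.
    private val body: PartialFunction[Any, Unit] = {
      case Ping => output += "Received Ping"
    }
    private var swapped: Option[PartialFunction[Any, Unit]] = None

    // Some(pf) installs new logic; None falls back to the original body.
    def hotswap(code: Option[PartialFunction[Any, Unit]]): Unit = { swapped = code }

    def !(msg: Any): Unit = {
      val handler = swapped.getOrElse(body)
      if (handler.isDefinedAt(msg)) handler(msg)
    }
  }
}

import HotswapSketch._

val server = new Server
server ! Ping
val replacement: PartialFunction[Any, Unit] = { case Ping => server.output += "Hotswapped Ping" }
server.hotswap(Some(replacement))
server ! Ping
server.hotswap(None)
server ! Ping
// server.output: Received Ping, Hotswapped Ping, Received Ping
```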

12. Try the hotswapped server out

sampleServer1 ! Ping

Output:


Hotswapped Ping

13. Hotswap again

sampleServer1.hotswap(Some({
  case Pong => 
    println("Hotswapped again, now doing Pong")
    reply(Ping)
}))

14. Send an asynchronous message that will wait on a future (using a different syntax/method).
The method returns an Option[T], which is either Some(result) or None. If we receive Some(result), we return the result; if we receive None, we invoke the error handler that we pass to the getOrElse method. In this case it prints an info message (but you could throw an exception or do whatever you like) and returns a default value (Ping).

val reply4 = (sampleServer1 !!! Pong).getOrElse({
  println("Time out when sending Pong")
  Ping
})

Output:


Hotswapped again, now doing Pong

Same invocation with pattern matching syntax.

val reply5 = sampleServer1 !!! Pong match {
  case Some(result) => result
  case None => println("Time out when sending Pong"); Ping  
}

Output:


Hotswapped again, now doing Pong
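Both styles consume the same Option. One detail that makes the getOrElse version work: its argument is passed by name, so the fallback block, including its println, runs only when the result is None. Here is a minimal plain-Scala comparison; OptionSketch and its methods are hypothetical, and Strings stand in for the real messages.

```scala
// Hypothetical sketch: two equivalent ways to turn an Option reply into a value.
object OptionSketch {
  val timeouts = scala.collection.mutable.ListBuffer.empty[String]

  // getOrElse evaluates its by-name default only when reply is None.
  def viaGetOrElse(reply: Option[String]): String =
    reply.getOrElse {
      timeouts += "Time out when sending Pong"
      "Ping"
    }

  // The same logic spelled out with pattern matching.
  def viaMatch(reply: Option[String]): String = reply match {
    case Some(result) => result
    case None =>
      timeouts += "Time out when sending Pong"
      "Ping"
  }
}

val a = OptionSketch.viaGetOrElse(Some("reply")) // "reply", nothing recorded
val b = OptionSketch.viaMatch(None)              // "Ping", one time-out recorded
```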

15. Hotswap back to original implementation.
This is done by passing in None to the hotswap method.

sampleServer1.hotswap(None)

16. Test the final hotswap

sampleServer1 ! Ping

Output:


Received Ping

17. Shut down the supervisor and its server(s)

supervisor ! Stop

Output:


12:25:31.093 [Thread-6] INFO com.jonasboner.supervisor.Supervisor – Stopping server: Main$sampleServer2$2$1479feb
12:25:31.093 [Thread-6] INFO com.jonasboner.supervisor.Supervisor – Stopping server: Main$sampleServer1$2$97a560
12:25:31.093 [Thread-6] INFO com.jonasboner.supervisor.Supervisor – Stopping supervisor: com.jonasboner.supervisor.Supervisor@860d49

You can find this code in the sample.scala file in the root directory of the distribution. Run it by invoking:


scala -cp target/supervisor-0.3.jar:[dependency jars: slf4j and logback] sample.scala

Check out
The SCM system used is Git.

1. Download and install Git
2. Invoke git clone git@github.com:jboner/scala-supervisor.git.

Build it
The build system used is Maven 2.

1. Download and install Maven 2.
2. Change into the root directory, scala-supervisor.
3. Invoke mvn install

This will build the project, run all tests, create a jar and upload it to your local Maven repository ready for use.

Runtime dependencies
Automatically downloaded by Maven.

1. Scala 2.7.1-final
2. SLF4J 1.5.2
3. LogBack Classic 0.9.9

That’s all there is to it.

Have fun.

Jonas Bonér 16 June 2008