Jonas Bonér bio photo

Jonas Bonér

Present.
Entrepreneur.
Hacker.
Public Speaker.
Powder Skier.
Perpetual Learner.
Jazz Fanatic.
Wannabee Musician.

Twitter LinkedIn Github
In this post I will explain how you can build fault-tolerant systems using Scala Actors by arranging them in Supervisor hierarchies using a library for Scala Supervisors that I just released. But first, let's recap what Actors are and what makes them useful. An actor is an abstraction that implements Message-Passing Concurrency. Actors have no shared state and are communicating by sending and receiving messages. This is a paradigm that provides a very different and much simpler concurrency model than Shared-State Concurrency (the scheme adopted by C, Java, C# etc.) and making it easier to avoid problems like deadlocks, live locks, thread starvation etc. This makes it possible to write code that is deterministic and side-effect-free, something that makes easier to write, test, understand and reason about. Each actor has a mailbox in which it receives incoming messages and can use pattern matching on the messages to decide if a message is interesting and which action to take. The most well known and successful implementation of actors can be found in the Erlang language (and the OTP platform) where it has been used to implement extremely fault tolerant (99.9999999% reliability - 9 nines) and massively concurrent systems (with hundreds of thousand simultaneous actors). So what are Supervisor hierarchies? Let's go to the source; http://www.erlang.org/doc/design_principles/sup_princ.html#5.
A supervisor is responsible for starting, stopping and monitoring its child processes. The basic idea of a supervisor is that it should keep its child processes alive by restarting them when necessary.
It has two different restart strategies; All-For-One and One-For-One. Best explained using some pictures (referenced from erlang.org): OneForOne OneForOne AllForOne AllForOne Naturally, the library I have written for Scala is by no means as complete and hardened as Erlang's, but it seems to do a decent job in providing the core functionality. The implementation consists of two main abstractions; Supervisor and GenericServer. * The Supervisor manages hierarchies of Scala actors and provides fault-tolerance in terms of different restart semantics. The configuration and semantics is almost a 1-1 port of the Erlang Supervisor implementation, explained in the erlang.org doc referenced above. Read this document in order to understand how to configure the Supervisor properly. * The GenericServer (which subclasses the Actor class) is a trait that forms the base for a server to be managed by a Supervisor. The GenericServer is wrapped by a GenericServerContainer instance providing a necessary indirection needed to be able to fully manage the life-cycle of the GenericServer in an easy way. So, let's try it out by writing a small example in which we create a couple of servers, configure them, use them in various ways, kill one of them, see it recover, hotswap its implementation etc. (Sidenote: I have written about hotswapping actors before, however this library has taken this approach a but further and provides a more flexible and powerful way of achieving this. Thanks DPP.) This walk-through will only cover some of the API, for more details look at the code or the tests. 1. Create our server messages
import scala.actors._
import scala.actors.Actor._

import com.jonasboner.supervisor._
import com.jonasboner.supervisor.Helpers._

sealed abstract class SampleMessage
case object Ping extends SampleMessage
case object Pong extends SampleMessage
case object OneWay extends SampleMessage
case object Die extends SampleMessage
2. Create a GenericServer We do that by extending the GenericServer trait and override the body method.
class SampleServer extends GenericServer {

  // This method implements the core server logic and naturally has to be overridden

  override def body: PartialFunction[Any, Unit] = {
    case Ping =>
      println("Received Ping"); reply(Pong)

    case OneWay =>
      println("Received OneWay")

    case Die =>
      println("Received Die..dying...")
      throw new RuntimeException("Received Die message")
  }

}
GenericServer also has some callback life-cycle methods, such as init(..) and shutdown(..). 3. Wrap our SampleServer in a GenericServerContainer Here we also give it a name to be able to refer to it later. We are creating two instances of the same server impl in order to try out multiple server restart in case of failure.
object sampleServer1 extends GenericServerContainer("sample1", () => new SampleServer)
object sampleServer2 extends GenericServerContainer("sample2", () => new SampleServer)
4. Create a Supervisor configuration Here we create a SupervisorFactory that is configuring our servers. The configuration mimics the Erlang configuration and defines a general restart strategy for our Supervisor as well as a list of workers (servers) which for each we define a specific life-cycle.
object factory extends SupervisorFactory {
  override protected def getSupervisorConfig: SupervisorConfig = {
    SupervisorConfig(
      RestartStrategy(AllForOne, 3, 10000),
       Worker(
        sampleServer1,
        LifeCycle(Permanent, 1000)) ::
       Worker(
        sampleServer2,
        LifeCycle(Permanent, 1000)) ::
      Nil)
  }
}
5. Create a new Supervisor
val supervisor = factory.newSupervisor
Output:
12:25:30.031 [Thread-2] DEBUG com.jonasboner.supervisor.Supervisor - Configuring supervisor:com.jonasboner.supervisor.Supervisor@860d49
12:25:30.046 [Thread-2] DEBUG com.jonasboner.supervisor.Supervisor - Linking actor [Main$SampleServer$1@1b9240e] to supervisor [com.jonasboner.supervisor.Supervisor@860d49]
12:25:30.062 [Thread-2] DEBUG com.jonasboner.supervisor.Supervisor - Linking actor [Main$SampleServer$1@1808199] to supervisor [com.jonasboner.supervisor.Supervisor@860d49]
12:25:30.062 [main] DEBUG Main$factory$2$ - Supervisor successfully configured
6. Start the Supervisor This also starts the servers.
supervisor ! Start
Output:
12:25:30.078 [Thread-8] INFO  com.jonasboner.supervisor.Supervisor - Starting server: Main$sampleServer2$2$@1479feb
12:25:30.078 [Thread-8] INFO  com.jonasboner.supervisor.Supervisor - Starting server: Main$sampleServer1$2$@97a560
7. Try to communicate with the servers. Here we try to send a couple one way asynchronous messages to our servers.
sampleServer1 ! OneWay
Try to get a reference to our sampleServer2 (by name) from the Supervisor before sending a message.
supervisor.getServer("sample2") match {
  case Some(server2) => server2 ! OneWay
  case None => println("server [sample2] could not be found")
}
Output:
Received OneWay
Received OneWay
8. Send a message using a future Try to send an asynchronous message - receive a future - and wait 100 ms (time-out) for the reply.
val future = sampleServer1 !! Ping
val reply1 = future.receiveWithin(100) match {
  case Some(reply) =>
    println("Received reply: " + reply)
  case None =>
    println("Did not get a reply witin 100 ms")
}
Output:
Received Ping
Received reply: Pong
9. Kill one of the servers Try to send a message (Die) telling the server to kill itself (by throwing an exception).
sampleServer1 ! Die
Output:
Received Die..dying...
12:25:30.093 [Thread-8] ERROR c.j.supervisor.AllForOneStrategy - Server [Main$SampleServer$1@1b9240e] has failed due to [java.lang.RuntimeException: Received Die message] - scheduling restart - scheme: ALL_FOR_ONE.
12:25:30.093 [Thread-8] DEBUG Main$sampleServer2$2$ - Waiting 1000 milliseconds for the server to shut down before killing it.
12:25:30.093 [Thread-8] DEBUG Main$sampleServer2$2$ - Server [sample2] has been shut down cleanly.
12:25:30.093 [Thread-8] DEBUG c.j.supervisor.AllForOneStrategy - Restarting server [sample2] configured as PERMANENT.
12:25:30.093 [Thread-8] DEBUG com.jonasboner.supervisor.Supervisor - Linking actor [Main$SampleServer$1@166aa18] to supervisor [com.jonasboner.supervisor.Supervisor@860d49]
12:25:30.093 [Thread-8] DEBUG Main$sampleServer1$2$ - Waiting 1000 milliseconds for the server to shut down before killing it.
12:25:30.093 [main] DEBUG com.jonasboner.supervisor.Helpers$ - Future timed out while waiting for actor: Main$SampleServer$1@1b9240e
Expected exception: java.lang.RuntimeException: Time-out
12:25:31.093 [Thread-8] DEBUG c.j.supervisor.AllForOneStrategy - Restarting server [sample1] configured as PERMANENT.
12:25:31.093 [Thread-8] DEBUG com.jonasboner.supervisor.Supervisor - Linking actor [Main$SampleServer$1@1968e23] to supervisor [com.jonasboner.supervisor.Supervisor@860d49]
10. Send an asyncronous message and wait on a future. If this call times out, the error handler we define will be invoked - in this case throw an exception. It is likely that this call will time out since the server is in the middle of recovering from failure and we are on purpose defining a very short time-out to trigger this behavior.
val reply2 = try {
  sampleServer1 !!! (Ping, throw new RuntimeException("Time-out"), 10)
} catch { case e => println("Expected exception: " + e.toString); Pong }
The output of this call (due to the async nature of actors) is interleaved with the logging for the restart of the servers. As you can see the log below can be found in the middle of the restart output.
12:25:30.093 [main] DEBUG com.jonasboner.supervisor.Helpers$ - Future timed out while waiting for actor: Main$SampleServer$1@1b9240e
Expected exception: java.lang.RuntimeException: Time-out
Server should be up again. Try the same call again
val reply3 = try {
  sampleServer1 !!! (Ping, throw new RuntimeException("Time-out"), 1000)
} catch { case e => println("Expected exception: " + e.toString); Pong }
Output:
Received Ping
Also check that server number 2 is up and healthy.
sampleServer2 ! Ping
Output:
Received Ping
11. Try to hotswap the server implementation Here we are passing in a completely new implementation of the server logic (doesn't look that different tough, but it can be any piece of scala pattern matching code) to the server's hotswap method.
sampleServer1.hotswap(Some({
  case Ping =>
    println("Hotswapped Ping")
}))
12. Try the hotswapped server out
sampleServer1 ! Ping
Output:
Hotswapped Ping
13. Hotswap again
sampleServer1.hotswap(Some({
  case Pong =>
    println("Hotswapped again, now doing Pong")
    reply(Ping)
}))
14. Send an asyncronous message that will wait on a future (using a different syntax/method). Method returns an Option[T] which can be of two different types; Some(result) or None. If we receive Some(result) then we return the result, but if None is received then we invoke the error handler that we define in the getOrElse method. In this case print out an info message (but you could throw an exception or do whatever you like...) and return a default value (Ping).
val reply4 = (sampleServer1 !!! Pong).getOrElse({
  println("Time out when sending Pong")
  Ping
})
Output:
Hotswapped again, now doing Pong
Same invocation with pattern matching syntax.
val reply5 = sampleServer1 !!! Pong match {
  case Some(result) => result
  case None => println("Time out when sending Pong"); Ping
}
Output:
Hotswapped again, now doing Pong
15. Hotswap back to original implementation. This is done by passing in None to the hotswap method.
sampleServer1.hotswap(None)
16. Test the final hotswap
sampleServer1 !  Ping
Output:
Received Ping
17. Shut down the supervisor and its server(s)
supervisor ! Stop
Output:
12:25:31.093 [Thread-6] INFO  com.jonasboner.supervisor.Supervisor - Stopping server: Main$sampleServer2$2$@1479feb
12:25:31.093 [Thread-6] INFO  com.jonasboner.supervisor.Supervisor - Stopping server: Main$sampleServer1$2$@97a560
12:25:31.093 [Thread-6] INFO  com.jonasboner.supervisor.Supervisor - Stopping supervisor: com.jonasboner.supervisor.Supervisor@860d49
You can find this code in the sample.scala file in the root directory of the distribution. Run it by invoking:
scala -cp target/supervisor-0.3.jar:[dependency jars: slf4j and logback] sample.scala
Check out The SCM system used is Git. 1. Download and install Git 2. Invoke git clone [email protected]:jboner/scala-supervisor.git. Build it The build system used is Maven 2. 1. Download and install Maven 2. 2. Step into the root dir scala-supervisor. 3. Invoke mvn install This will build the project, run all tests, create a jar and upload it to your local Maven repository ready for use. Runtime dependencies Automatically downloaded my Maven. 1. Scala 2.7.1-final 2. SLF4J 1.5.2 3. LogBack Classic 0.9.9 That's all to it. Have fun.