We Are Reactive

Clustering Scala Actors with Terracotta

Clustering Scala Actors with Terracotta

Introduction

A month ago I wrote an introductory post about Scala Actors: HotSwap Code using Scala and Actors. For you who don’t know what it is I don’t want to start by reading the previous post let’s briefly recap what Actors are and what you can use them for.

An Actor is an abstraction that implements Message-Passing Concurrency. Actors have no shared state and are communicating by sending and receiving messages. This is a paradigm that provides a very different and much simple concurrency model than Shared-State Concurrency (the scheme adopted by C, Java, C# etc.) and is avoiding most of the latter one’s complexity and problems. This makes it possible to write code that is deterministic and side-effect-free, something that makes it easier to write, test, understand and reason about. Each Actor has a mailbox in which it receives incoming messages and normally uses pattern matching on the messages to decide if a message is interesting if action is needed.

Scala’s Actors are based on Doug Lea’s Fork/Join library and have for example been used very effectively in the excellent lift web framework to among other things to enable Comet style (push/streaming ajax) development. Actors allows us to, in a simple and uniformed way, parallelize applications using multiple threads, something that helps us take advantage of all the new dual/quad/… core or SMP machines that we are starting to get now days. But this also poses challenges; how can we make applications build on this “new” programming model highly available and how can we scale them out, if necessary. Would it not be cool if we could not only parallelize our application onto multiple threads but also onto multiple machines?

Note: Erlang, the most successful implementation of Actors to date, solves the challenges in building fault-tolerant and highly available systems in an elegant way using supervisor hierarchies. Nothing prevents an implement of this strategy in Scala Actors, all the primitives (like link, trap_exit etc.) already exists.

I have spent some time last weeks looking into if would make sense to utilize Terracotta to cluster the Scala Actors library to give a platform on which we can both scale Actors out in a distributed fashion and ensure full fault tolerance and high-availability. The result of this exercise have been successful and I’m happy to announce that they work very nice together. I will now spend the remainder of this post on walking you through a simple example in how to cluster a Scala Actor using Terracotta.

Check out the code from SVN

But before we do anything, let my point you to the SVN repository where you can fetch the Terracotta Integration Module (TIM) that I have implemented for Scala Actors. You can check it out anonymously by invoking:

svn co http://svn.terracotta.org/svn/forge/projects/labs/tim-scala-actors-2.6.1/

When that is done, step into tim-scala-actors-2.6.1/trunk and invoke mvn install (the last command requires that you have Maven installed). it’ll take a while for Maven to download all its dependencies but after a while you will have a shiny new TIM for Scala Actors installed in your local Maven repository (usually in ~/.m2). The sample that we will discuss in the next sections is available in the src/samples/scala directory with the sample configuration in the src/samples/resources directory.

Write an Actor

Now, let’s write a little Cart actor. This actor response to two different messages AddItem(item) and Tick. The former one adds an item to the Cart while the latter one triggers the Cart to print out its content (I’ll let you know why it’s called Tick in a second):

// Messages
case object Tick
case class AddItem(item: String)
  
class Cart extends Actor {
  private[this] var items: List[String] = Nil 

  def act() {
    loop {
      react {
        case AddItem(item) =>
          items = item :: items
        case Tick =>
          println("Items: " + items)
      }
    }
  }
  
  def ping = ActorPing.scheduleAtFixedRate(this, Tick, 0L, 5000L)
}

As you see the state is held by a Scala var, which holds onto a List (immutable). In react we wait for the next incoming message and if it is of type AddItem then we grab the item and append it to the list with all our items, but if the message is of type Tick we simply print the list of items out. Simple enough. But what is this method ping doing? It uses an object called ActorPing to schedule that a Tick should be sent to the Cart every 5 seconds (ActorPing is shamelessly stolen from Dave Pollak’s lift).

Configuration

In order to cluster the Cart actor we have to write two things. First a hack, a simple configuration file in which declare which actors we want to cluster. This is something that later should be put into the regular tc-config.xml file, but for now we have to live with it. So let’s create a file with one single line, stating the fully qualified name of the Cart actor;

 samples.Cart 
We can either name this file clustered-scala-actors.conf and put it in root directory of the application or name it whatever we want and feed it to Terracotta using the -Dclustered.scala.actors.config=[path to the file] JVM property. Second, we have to write the regular Terracotta configuration file (tc-config.xml). Here we essentially have to define three things; the TIM for Scala Actors, locks to guard our mutable state and finally which classes should be included for bytecode instrumentation.

Starting with the TIM for Scala Actors. Here we define the version on the module as well as the URL to our Maven repository (in a short while we will put this jar in the Terracotta Maven repository and then you would not have to point out a local one).

<modules>
 <repository>file:///Users/jonas/.m2/repository</repository>
 <module name="clustered-scala-actors-2.6.1" version="2.6.0.SNAPSHOT"/>
</modules>

Now we have to define the locks, which in Terracotta, also marks the transaction boundaries. The Cart has one mutable field (the var named items) that we need to ensure is guarded correctly and has transactional semantics. For each var Scala generates a setter and a getter. The getter is named the same as the field while the getter has the name suffixed with _$eq. That gives us the following lock definition:

<locks>
  <named-lock>
    <lock-name>cart_items_write</lock-name>
    <lock-level>write</lock-level>
    <method-expression>* samples.Cart.items_$eq(..)</method-expression>
  </named-lock>
  <named-lock>
    <lock-name>cart_items_read</lock-name>
    <lock-level>read</lock-level>
    <method-expression>* samples.Cart.items()</method-expression>
  </named-lock>
</locks>

We have to define a pair like this for each mutable user defined field in a clustered actor (not the standard one’s that are common for all Scala Actors, those are automatically defined).

It important to understand the TIM automatically clusters the Actor’s mailbox, which means that no messages will ever be lost – providing full fault-tolerance.

Finally we have to define the classes that we need to include for instrumentation. This naturally includes our application classes, e.g. the classes that are using our Cart actor in one way or the other. Those are picked out by the pattern like: 'samples.*'. We also have to include all the Scala runtime and library classes that we are referencing from the message is that we send. In our case that means the classes that are used to implement the List abstraction in Scala. Here is the full listing:

<instrumented-classes>
 <include>
   <class-expression>samples.*</class-expression>
 </include>
 <include>
   <class-expression>scala.List</class-expression>
 </include>
 <include>
   <class-expression>scala.$colon$colon</class-expression>
 </include>
 <include>
   <class-expression>scala.Nil$</class-expression>
 </include>
</instrumented-classes>

I could have included these (and many more) classes in the TIM, but since Terracotta adds a tiny bit of overhead to each class that it instruments I took the decision that it would be better to let the user explicitly define the classes that needs to be instrumented and leave the other ones alone. Since you can pretty much put any valid Scala data or abstraction in an actor message, it is very likely that you will have to declare some includes, else Terracotta will throw an exception (which is expected) with a message listing the XML snippet that you have to put in the tc-config.xml file. So don’t panic if things blow up.

Enable Terracotta

Last but not least, we need to enable Terracotta in the Scala runtime (if you are planning to run the application in a Terracotta enabled application server, then you can skip this section – however I think it might still be useful to be able to try the application out in the Scala REPL). The simplest way of doing that is to do some minor changes to the scala command. First, let’s step down into the scala/bin directory and make a copy of the scala command called tc-scala, then scroll down all the way to the bottom. As you can see it is just a wrapper around the regular java command, which makes things pretty easy for us. We start by defining some environmental variables (here showing my local settings):

TC_SCALA_ACTORS_CONFIG_FILE=/Users/jonas/src/java/tc-forge/projects/tim-scala-actors-2.6.1/trunk/src/samples/resources/clustered-scala-actors.conf
TC_CONFIG_FILE=/Users/jonas/src/java/tc-forge/projects/tim-scala-actors-2.6.1/trunk/src/samples/resources/tc-config.xml
TC_INSTALL_DIR=/Users/jonas/src/java/tc/code/base/build/dist/terracotta-trunk-rev6814
TC_BOOT_JAR="$TC_INSTALL_DIR"/lib/dso-boot/dso-boot-hotspot_osx_150_13.jar
TC_TIM_SCALA_ACTORS_JAR=/Users/jonas/.m2/repository/org/terracotta/modules/clustered-scala-actors-2.6.1/2.6.0-SNAPSHOT/clustered-scala-actors-2.6.1-2.6.0-SNAPSHOT.jar

When these variables have been defined we can replace the existing invocation of java with the following:

${JAVACMD:=java} ${JAVA_OPTS:=-Xmx256M -Xms256M} \
 -Xbootclasspath/p:"$TC_BOOT_JAR" \
 -Dtc.install-root="$TC_INSTALL_DIR" \
 -Dtc.config="$TC_CONFIG_FILE" \
 -Dclustered.scala.actors.config="$TC_SCALA_ACTORS_CONFIG_FILE" \
 -Dscala.home="$SCALA_HOME" \
 -Denv.classpath="$CLASSPATH" \
 -Denv.emacs="$EMACS" \
 -cp "$BOOT_CLASSPATH":"$EXTENSION_CLASSPATH":"$TC_TIM_SCALA_ACTORS_JAR" \
 scala.tools.nsc.MainGenericRunner  "$@"

Let’s run it

Enough hacking. Now let’s try it out. I think that the best way of learning new things in Scala is to use its REPL, so let’s start that up, this time with Terracotta enabled. But before we do that we have to start up the Terracotta server by stepping into the bin directory in the Terracotta installation and invoke:

$ ./start-tc-server.sh

Note: you need to grab Terracotta from SVN trunk to get the bits that work with the Scala TIM. See instructions on how to check out the sources and how to build it.

Now, we can start up the Terracotta enabled Scala REPL:

$ tc-scala
2008-01-25 07:42:11,643 INFO - Terracotta trunk-rev6814, as of 20080124-140101 (Revision 6814 by jonas@homer from trunk)
2008-01-25 07:42:12,136 INFO - Configuration loaded from the file at '/Users/jonas/src/java/tc-forge/projects/tim-scala-actors-2.6.1/trunk/src/samples/resources/tc-config.xml'.
2008-01-25 07:42:12,325 INFO - Log file: '/Users/jonas/terracotta/client-logs/scala/actors/20080125074212303/terracotta-client.log'.
Parsing scala actors config file: /Users/jonas/src/java/tc-forge/projects/tim-scala-actors-2.6.1/trunk/src/samples/resources/clustered-scala-actors.conf
Configuring clustering for Scala Actor: samples.Cart
Welcome to Scala version 2.6.0-final.
Type in expressions to have them evaluated.
Type :help for more information.

scala>

Here we can see that it has found and connected to the Terracotta server, found our clustered-scala-actors.conf config file and configured clustering for one Scala Actor; samples.Cart.

Let’s have some fun and start up another REPL in another terminal window. In each of these we do the following; import our classes, create a new Cart (Actor) and start up the Actor.

scala> import samples._              
import samples._

scala> val cart = new Cart
cart: samples.Cart = samples.Cart@81af82

scala> cart.start
res0: scala.actors.Actor = samples.Cart@81af82

scala> 

Now we have a distributed Actor just waiting to be fed with some messages. We don’t want to make it disappointed so let’s now add a bunch of bananas and apples to the Cart, and then feed it with a Tick message to make it print out the result:

scala> cart ! AddItem("bananas")

scala> cart ! AddItem("apples")

scala> cart ! Tick

scala> Items: List(apples, bananas)

scala> 

Ok, so far no news. But comes the moment of truth, let’s take the other REPL and fire of a Tick:

scala> cart ! Tick

scala> Items: List(apples, bananas)

scala> 

Yippee, it works. Now we can invoke the ping method to schedule a Tick (to print out status) every 5 seconds.

scala> cart.ping
res2: java.util.concurrent.ScheduledFuture = java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask@3a4388

scala> Items: List(apples, bananas)

scala> Items: List(apples, bananas)

scala> cart ! AddItem("football")

scala> Items: List(football, apples, bananas)

...

How to define scope of the clustered Actor?

The Scala Actors TIM currently supports three different scopes; instance, class and custom. The scope is defined by appending a colon ‘:’ and the type of scope after the FQN of the Actor in the clustered-scala-actors.conf. If no scope is defined then the Actor is assumed to have scope instance. For example:

com.biz.Foo:custom
com.biz.Bar:class
com.biz.Baz:instance  
com.biz.Bam

The default scope named instance means that the Scala TIM is transparently intercepting the instantiation (f.e. new Cart) of all the Actors that you declare in the clustered-scala-actors.conf file. Each clustered Actor instance will have a unique identity across the cluster and each time this specific instance is created (f.e. when a new node joins the cluster) then the clustered instance with this specific identity will be handed out. The TIM distinguishes between actors of the same type but instantiated in different code paths. To take an example, let’s create one object ActorFactory with one single method create:

object ActorFactory {
  def create: Actor = new MyActor
}

If we now have two classes Foo and Bar as follows:

class Foo {
  val actor = ActorFactory.create
}

class Bar {
  val actor = ActorFactory.create
}

Then Foo and Bar will have two distinct clustered Actors each with a unique but cluster-wide identity.

The class scope lets all Actors of a the same type share Actor instance, so each time an Actor of a specific type is created the same clustered one will be handed out.

Finally we have the custom scope. Which, as it sounds, allows custom user defined scoping.

How to define custom scoped Actors?

If you want more control over scope and life-cycle of a specific Actor then you can define it to have custom scope in the clustered-scala-actors.conf file and create a factory in which you bind each Actor to whatever scope you wish. But now you have to create some data structure that is holding on to your Actors in the factory and explicitly define it to be a root in the tc-config.xml file. The factory might look something like this:

// Cart factory, allows mapping an instance to any ID
object Cart {
  
  // This instance is the custom Terracotta root
  private[this] val instances: Map[Any, Cart] = new HashMap

  def newInstance(id: Any): Cart = {
    instances.get(id) match {
      case Some(cart) => cart
      case None => 
        val cart = new Cart
        instances += (id -> cart)
        cart
    }
  }
}

This means that we have to add some more configuration elements to our Terracotta configuration. First we need to add the root samples.Cart$.instances (Cart$ is the name of Scala’s compiled Cart companion object, all companion objects compiles to a class with the name of the original class suffixed with $):

<roots>
  <root>
    <field-name>samples.Cart$.instances</field-name>
  </root>
</roots>

Then we have to add locking for the Cart.newInstance(..) method and finally a whole bunch of new include statements for all the Scala types that are referenced by the scala.collection.mutable.HashMap that we used as root:

...  
<named-lock>
  <lock-name>Cart_newInstance</lock-name>
  <lock-level>write</lock-level>
  <method-expression>* samples.Cart$.newInstance(..)</method-expression>
</named-lock>

...
          
<include>
  <class-expression>scala.collection.mutable.HashMap</class-expression>
</include>
<include>
  <class-expression>scala.runtime.BoxedAnyArray</class-expression>
</include>
<include>
  <class-expression>scala.runtime.BoxedArray</class-expression>
</include>
<include>
  <class-expression>scala.collection.mutable.DefaultEntry</class-expression>
</include>
  
...

That’s pretty much all there’s to it. Check out the code, play with it and come back with feedback, bug reports, patches etc.

Enjoy.

Jonas Bonér 25 January 2008
blog comments powered by Disqus