
Jonas Bonér

h1. Introduction

In this article, I will show you step by step how you can build a reusable POJO-based _Data Grid_ with nothing but standard JDK 1.5, and then make it distributed through the declarative JVM-level clustering technology provided by "Terracotta":http://terracotta.org.

The article is divided into three sections:

* In the first section I will talk about the concepts behind _Data Grids_: what they are and what they do. I will discuss some of the underlying mechanisms used in _Data Grids_ and finally how one can scale out applications on a _Data Grid_.
* The second section will walk you through how to build a naive but fully usable implementation of a POJO-based _Data Grid_ by implementing a multi-threaded _Master/Worker_ container and clustering it using "Terracotta's JVM-level clustering technologies":http://terracotta.org.
* Last, in the third section I will show you how to extend the initial implementation to handle real-world requirements such as high volumes of data, work failure, different routing algorithms, ordering of work and worker failure.

h1. Part 1: Data Grids - What's behind the buzzwords?

h2. What are Data Grids?

A _Data Grid_ is a set of servers that together create a mainframe-class processing service where data and operations can move seamlessly across the grid in order to optimize the performance and scalability of the computing tasks submitted to the grid. A _Data Grid_ scales through use of "Locality of Reference":http://en.wikipedia.org/wiki/Locality_of_reference (see the section "How do Data Grids scale?") and is highly available through effective use of data duplication. It combines data management with data processing.

h2. What are Data Grids used for?

Applications that could benefit from being run on a _Data Grid_ are usually applications that need to work on large data sets and/or need to parallelize processing of the data in order to get better throughput and performance.
Examples are financial risk analysis and other simulations, searching and aggregation on large datasets, as well as sales order pipeline processing.

h2. How do Data Grids scale?

One of the main reasons why _Data Grids_ can scale so well is that they can make intelligent choices about whether to move the data to its processing context or to move the processing context (the operations) to the data. Effective use of the latter lets the grid exploit _Locality of Reference_: data that is being processed on a specific node stays local to that node and will, in the ideal case, never have to leave it. Instead, all operations working on this specific data set are always routed to this specific node.

The immediate benefits are minimized latency and, in the ideal case, unlimited linear scalability. How well it scales is of course use-case specific, and it can take both effort and skill to partition the data and work sets and to choose suitable routing algorithms. The ultimate (and simplest) situation is when all work is what we call "Embarrassingly Parallel":http://en.wikipedia.org/wiki/Embarrassingly_parallel, which means that it has no shared state and can do its work in complete isolation. But in most cases it is acceptable to partition the work into _work sets_ in such a way that the _work sets_ have no shared state. However, the latter solution might need some intelligent routing algorithms, something that we will take a look at later.

h2. How do Data Grids deal with failure?

_Data Grids_ are resilient to failure and downtime through effective use of data duplication. _Data Grids_ can be seen as an organic system of cells (nodes) that is designed not only to handle, but to expect, failure of individual cells. This is very different from the traditional design of distributed systems, in which each node is seen as an isolated unit that must always expect the worst and protect itself accordingly.
See "this page":http://www.artima.com/forums/flat.jsp?forum=106&thread=172063 for a more thorough discussion on the subject.

h1. Part 2: Build a multi-threaded Master/Worker container and cluster it with Terracotta

h2. What is the Master/Worker pattern?

The _Master/Worker_ pattern is heavily used in _Data Grids_ and is one of the most well-known and common patterns for parallelizing work. We will now explain the characteristics of the pattern and then go through the possible approaches for a Java implementation.

So, how does it work? The _Master/Worker_ pattern consists of three logical entities: a _Master_, a _Shared Space_ and one or more instances of a _Worker_. The _Master_ initiates the computation by creating a set of tasks, puts them in some shared space and then waits for the tasks to be picked up and completed by the _Workers_. The shared space is usually some sort of _Shared Queue_, but it can also be implemented as a _Tuple Space_ (for example in Linda programming environments, such as JavaSpaces, where the pattern is used extensively).

One of the advantages of using this pattern is that the algorithm automatically balances the load. This is possible because the work set is shared and the workers continue to pull work from the set until there is no more work to be done. The algorithm usually scales well as long as the number of tasks by far exceeds the number of workers and the tasks take a fairly similar amount of time to complete.

h2. Possible approaches in Java

Let's now look at the different alternatives we have for implementing this in Java. There might be more ways of doing it, but we will focus the discussion on three different alternatives, each one at a higher abstraction level than the previous.

h3. Using Java's threading primitives

The most hard-core approach is to use the concurrency and threading primitives that we have in the _Java Language Specification_ (JLS), e.g.
_wait/notify_ and the _synchronized_ and _volatile_ keywords. The benefit is that everything is really "under your fingers", meaning that you can customize the solution without limitations. However, this is also its main problem, since it is both very hard and tedious to implement this yourself, and will most likely be even worse to maintain. These low-level abstractions are not something that you want to work with on a day-to-day basis. We need to raise the abstraction level above the core primitives in the _Java Memory Model_, and that is exactly what the data structure abstractions in the _java.util.concurrent_ library in JDK 1.5 do for us.

h3. Using the java.util.concurrent abstractions

Given that the low-level abstractions in the JLS are both tedious and hard to use, the concurrency abstractions in JDK 1.5 were a very welcome and natural addition to the Java libraries. It is a very rich API that provides everything from semaphores and barriers to implementations of the Java Collections data structure interfaces highly tuned for concurrent access. It also provides an _ExecutorService_, which is mainly a thread pool that provides direct support for the _Master/Worker_ pattern. This is very powerful, since you're basically getting support for the _Master/Worker_ pattern in a single abstraction. It is possible to cluster the _ExecutorService_ using _Terracotta_; you can read about that exercise in "this article":http://www.theserverside.com/tt/articles/article.tss?l=DistCompute.

Even though this approach would be sufficient in many situations and use cases, it has some problems:

* First, it does not separate the master from the worker (since they are both part of the same abstraction). What this means in practice is that you cannot scale out the master independently of the worker; each node will contain both a master and a worker.
* Second, and perhaps even more important, it lacks a layer of reliability and control.
There is no way of knowing whether a specific work task has been started or completed, or whether it has been rejected due to some error. This means that we cannot detect work failure and retry the work task on the same node or on another node.

So we need a way to deal with these things in a simple and, if possible, standardized way.

h3. Using the CommonJ WorkManager specification

Spawning and coordinating threads is something that has been prohibited by the EJB specification. However, there has naturally been (and still is) a common need for coordinating work in JEE (something that was partly, but not completely, solved in a very verbose way with _Message-Driven Beans_ (MDB)). This need was the main motivation for IBM and BEA to write a joint specification that solves this problem by providing a standardized way of executing concurrent tasks in a JEE environment. The specification is called _CommonJ_ and has support for the _Master/Worker_ pattern in its _WorkManager_ API.

From BEA's documentation about the specification:

??"The Work Manager provides a simple API for application-server-supported concurrent execution of work items. This enables J2EE-based applications (including Servlets and EJBs) to schedule work items for concurrent execution, which will provide greater throughput and increased response time. After an application submits work items to a Work Manager for concurrent execution, the application can gather the results. The Work Manager provides common "join" operations, such as waiting for any or all work items to complete. The Work Manager for Application Servers specification provides an application-server-supported alternative to using lower-level threading APIs, which are inappropriate for use in managed environments such as Servlets and EJBs, as well as being too difficult to use for most applications."??
What is interesting is that the specification not only defines an API for submitting work and getting the result back, but also provides functionality for tracking work status: it allows you to register a listener that will receive callback events whenever the state of the work changes. This makes it possible, for example, to detect not only work failure but also the reason why it failed, which gives us the possibility of restarting the work.

Each of the three approaches we have looked at so far builds upon the previous one and gradually raises the abstraction level. Even more importantly, each one minimizes and simplifies the code that we as users have to write and maintain.

The most natural choice is to base our implementation on the _CommonJ WorkManager_ specification. It is a simple and minimalistic specification and seems to provide everything that we need in order to build a reliable _Master/Worker_ container. Here are the interfaces in the specification:
public interface Work extends Runnable { }

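public interface WorkManager {
  // Timeout values accepted by the waitFor* methods
  long IMMEDIATE  = 0;
  long INDEFINITE = Long.MAX_VALUE;

  WorkItem schedule(Work work) throws WorkException;
  WorkItem schedule(Work work, WorkListener listener) throws WorkException;
  boolean waitForAll(Collection workItems, long timeout);
  Collection waitForAny(Collection workItems, long timeout);
}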

public interface WorkItem {
  Work getResult();
  int getStatus();
}

public interface WorkListener {
  void workAccepted(WorkEvent we);
  void workRejected(WorkEvent we);
  void workStarted(WorkEvent we);
  void workCompleted(WorkEvent we);
}

public interface WorkEvent {
  int WORK_ACCEPTED  = 1;
  int WORK_REJECTED  = 2;
  int WORK_STARTED   = 3;
  int WORK_COMPLETED = 4;
  public int getType();
  public WorkItem getWorkItem();
  public WorkException getException();
}
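The listener callbacks are what make failure handling possible. As a rough illustration, the sketch below retries work that was rejected. It uses trimmed-down, hypothetical stand-ins for the specification types (the listener here receives the work directly instead of a _WorkEvent_, and @execute@, @RetryingListener@ and @FlakyWork@ are invented names), so treat it as an outline of the idea rather than code written against the real API.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified stand-ins for the specification types,
// declared locally so that the sketch compiles on its own.
class ListenerSketch {

  interface Work extends Runnable {}

  interface WorkListener {
    void workStarted(Work work);
    void workCompleted(Work work);
    void workRejected(Work work, Exception cause);
  }

  /** Runs the work once and reports each state change to the listener. */
  static void execute(Work work, WorkListener listener) {
    listener.workStarted(work);
    try {
      work.run();
      listener.workCompleted(work);
    } catch (RuntimeException e) {
      listener.workRejected(work, e);
    }
  }

  /** Records every event and retries rejected work up to maxRetries times. */
  static class RetryingListener implements WorkListener {
    final List<String> events = new ArrayList<String>();
    private final int maxRetries;
    private int retries = 0;

    RetryingListener(int maxRetries) { this.maxRetries = maxRetries; }

    public void workStarted(Work work) { events.add("started"); }
    public void workCompleted(Work work) { events.add("completed"); }
    public void workRejected(Work work, Exception cause) {
      events.add("rejected");
      if (retries < maxRetries) {
        retries++;
        // A real container would re-schedule on a queue; here we rerun inline.
        execute(work, this);
      }
    }
  }

  /** Work that fails on its first attempt and succeeds afterwards. */
  static class FlakyWork implements Work {
    int attempts = 0;
    public void run() {
      attempts++;
      if (attempts == 1) throw new IllegalStateException("first attempt fails");
    }
  }

  static List<String> demo() {
    RetryingListener listener = new RetryingListener(2);
    execute(new FlakyWork(), listener);
    return listener.events; // started, rejected, started, completed
  }
}
```

The same idea carries over to the real interfaces: a _WorkListener_ registered through @schedule(work, listener)@ could resubmit the work from its @workRejected(..)@ callback.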
h2. The Plan

The plan is to first implement the specification as a regular multi-threaded application using the _java.util.concurrent_ abstractions (in particular the _ExecutorService_ and the _LinkedBlockingQueue_ classes), and then use _Terracotta_ to declaratively (and transparently) turn it into a distributed, multi-JVM implementation.

h2. Creating the Master (WorkManager)

We start by creating the _SingleQueueWorkManager_, which serves as our _Master_. It implements the _CommonJ WorkManager_ interface, which provides the API that the user uses to schedule _Work_ and wait for the _Work_ to be completed. It has a reference to the work queue that is shared between the _Master_ and the _Workers_, in this case represented by the _SingleWorkQueue_ abstraction. Here is how we could implement the work manager:
public class SingleQueueWorkManager implements WorkManager {

  private final SingleWorkQueue m_queue;

  public SingleQueueWorkManager(final SingleWorkQueue queue) {
    m_queue = queue;
  }

  public WorkItem schedule(final Work work) throws WorkException {
    DefaultWorkItem workItem = new DefaultWorkItem(work, null);
    m_queue.put(workItem);
    return workItem;
  }

  public WorkItem schedule(final Work work, final WorkListener listener)
      throws WorkException {
    DefaultWorkItem workItem = new DefaultWorkItem(work, listener);
    m_queue.put(workItem);
    return workItem;
  }

  public boolean waitForAll(Collection workItems, long timeout) {
    long start = System.currentTimeMillis();
    do {
      boolean isAllCompleted = true;
      for (Iterator it = workItems.iterator();
           it.hasNext() && isAllCompleted;) {
        int status = ((WorkItem) it.next()).getStatus();
        isAllCompleted =
            status == WorkEvent.WORK_COMPLETED ||
            status == WorkEvent.WORK_REJECTED;
      }
      if (isAllCompleted) { return true; }
      if (timeout == IMMEDIATE) { return false; }
      if (timeout == INDEFINITE) { continue; }
    } while ((System.currentTimeMillis() - start) < timeout);
    return false;
  }

  public Collection waitForAny(Collection workItems, long timeout) {
    long start = System.currentTimeMillis();
    do {
      synchronized (this) {
        Collection completed = new ArrayList();
        for (Iterator it = workItems.iterator(); it.hasNext();) {
          WorkItem workItem = (WorkItem) it.next();
          if (workItem.getStatus() == WorkEvent.WORK_COMPLETED ||
              workItem.getStatus() == WorkEvent.WORK_REJECTED) {
            completed.add(workItem);
          }
        }
        if (!completed.isEmpty()) { return completed; }
      }
      if (timeout == IMMEDIATE) { return Collections.EMPTY_LIST; }
      if (timeout == INDEFINITE) { continue; }
    } while ((System.currentTimeMillis() - start) < timeout);
    return Collections.EMPTY_LIST;
  }
}
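Note that both wait methods poll in a tight loop, which keeps a CPU core busy while waiting. One possible variation, sketched here under assumptions, pauses briefly between polls. The @Item@ class is a hypothetical stand-in for a _WorkItem_, the @pollMillis@ parameter is invented for the example, and the sketch assumes a finite timeout.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.concurrent.atomic.AtomicInteger;

class PollingWaitSketch {
  // Values mirror the WorkEvent constants used in the article.
  static final int WORK_REJECTED = 2;
  static final int WORK_STARTED = 3;
  static final int WORK_COMPLETED = 4;

  /** Hypothetical stand-in for a WorkItem: just a thread-safe status flag. */
  static class Item {
    final AtomicInteger status = new AtomicInteger(WORK_STARTED);
  }

  static boolean isFinished(Item item) {
    int s = item.status.get();
    return s == WORK_COMPLETED || s == WORK_REJECTED;
  }

  /**
   * Returns true if every item finished before the timeout expired.
   * Sleeps pollMillis between checks instead of spinning.
   * Assumes a finite timeout (no INDEFINITE handling).
   */
  static boolean waitForAll(Collection<Item> items, long timeoutMillis, long pollMillis) {
    long deadline = System.currentTimeMillis() + timeoutMillis;
    while (true) {
      boolean allFinished = true;
      for (Item item : items) {
        if (!isFinished(item)) { allFinished = false; break; }
      }
      if (allFinished) return true;
      if (System.currentTimeMillis() >= deadline) return false;
      try {
        Thread.sleep(pollMillis);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt(); // preserve the interrupt for the caller
        return false;
      }
    }
  }

  static boolean demo() {
    final Item a = new Item();
    final Item b = new Item();
    // A "worker" thread that finishes both items.
    new Thread(new Runnable() {
      public void run() {
        a.status.set(WORK_COMPLETED);
        b.status.set(WORK_REJECTED);
      }
    }).start();
    return waitForAll(Arrays.asList(a, b), 5000, 10);
  }
}
```

The trade-off is latency: completion is noticed up to @pollMillis@ late, in exchange for a mostly idle waiting thread.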
h2. Creating the Shared Queue

The _SingleQueueWorkManager_ schedules work by adding it to the _SingleWorkQueue_. The _SingleWorkQueue_ is the artifact whose state needs to be shared between the _Master_ and the _Workers_, since it holds the queue with all the pending _Work_. We need a single instance of this queue that is available to both the _Master_ and all its _Workers_. The work queue can be implemented like this:
public class SingleWorkQueue {

  // Terracotta Shared Root
  private final BlockingQueue<DefaultWorkItem> m_workQueue =
      new LinkedBlockingQueue<DefaultWorkItem>();

  public void put(final DefaultWorkItem workItem) throws WorkException {
    try {
      m_workQueue.put(workItem); // blocks if queue is full
    } catch (InterruptedException e) {
      // Mark the work as rejected, restore the interrupt flag and report the failure
      WorkRejectedException we = new WorkRejectedException(e.getMessage());
      workItem.setStatus(WorkEvent.WORK_REJECTED, we);
      Thread.currentThread().interrupt();
      throw we;
    }
  }

  public DefaultWorkItem peek() throws WorkException {
    return m_workQueue.peek(); // returns null if queue is empty

  }

  public DefaultWorkItem take() throws WorkException {
    try {
      return m_workQueue.take(); // blocks if queue is empty

    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new WorkException(e);
    }
  }
}
As you can see, it is a simple wrapper around a _java.util.concurrent.BlockingQueue_ and has three methods, _take()_, _put()_ and _peek()_, which simply delegate to the _BlockingQueue_ but also add error handling by setting the status of the _Work_ in case of failure.

h2. Creating the Worker

Finally, we have the _Worker_, in our case represented by the _SingleQueueWorker_ abstraction. This class uses a thread pool to spawn N worker threads that continuously grab and execute _Work_ from the _SingleWorkQueue_. During the processing of the _Work_, the status flag in the wrapping _WorkItem_ is maintained (it can be one of _ACCEPTED_, _STARTED_, _COMPLETED_ or _REJECTED_). This is needed in order for the _SingleQueueWorkManager_ to be able to continuously monitor the status of the _Work_ it has scheduled.

This is what a _Worker_ implementation can look like. As you can see, we choose to make use of the _ExecutorService_ thread pool implementation in the _java.util.concurrent_ package:
public class SingleQueueWorker implements Worker {

  protected final ExecutorService m_threadPool = Executors.newCachedThreadPool();
  protected final SingleWorkQueue m_queue;
  protected volatile boolean m_isRunning = true;

  public SingleQueueWorker(final SingleWorkQueue queue) {
    m_queue = queue;
  }

  public void start() throws WorkException {
    while (m_isRunning) {
      final DefaultWorkItem workItem = m_queue.take();
      final Work work = workItem.getResult();
      m_threadPool.execute(new Runnable() {
        public void run() {
          try {
            workItem.setStatus(WorkEvent.WORK_STARTED, null);
            work.run();
            workItem.setStatus(WorkEvent.WORK_COMPLETED, null);
          } catch (Throwable e) {
            workItem.setStatus(
                WorkEvent.WORK_REJECTED, new WorkRejectedException(e));
          }
        }
      });
    }
  }

  public void stop() {
    m_isRunning = false;
    m_threadPool.shutdown();
  }
}
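To see how the master, the shared queue and the workers interact, here is a condensed single-JVM sketch that uses only _java.util.concurrent_. It is not the article's container: the class and method names are hypothetical, the "work" is just squaring a number, and completion is tracked with a _CountDownLatch_ instead of work item status.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

class MasterWorkerSketch {

  /** Sums the squares of 1..n by publishing one task per number on a shared queue. */
  static long sumOfSquares(int n, int workerCount) {
    final BlockingQueue<Integer> queue = new LinkedBlockingQueue<Integer>();
    final AtomicLong total = new AtomicLong();
    final CountDownLatch done = new CountDownLatch(n);
    ExecutorService workers = Executors.newFixedThreadPool(workerCount);

    // Workers: keep pulling tasks from the shared queue until interrupted.
    for (int w = 0; w < workerCount; w++) {
      workers.execute(new Runnable() {
        public void run() {
          try {
            while (true) {
              int task = queue.take(); // blocks while the queue is empty
              total.addAndGet((long) task * task);
              done.countDown();
            }
          } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // shutdownNow() ends the loop
          }
        }
      });
    }

    // Master: publish the tasks, then wait for the workers to finish them.
    try {
      for (int i = 1; i <= n; i++) {
        queue.put(i);
      }
      done.await(10, TimeUnit.SECONDS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    } finally {
      workers.shutdownNow();
    }
    return total.get();
  }
}
```

Calling @MasterWorkerSketch.sumOfSquares(10, 3)@ returns 385. Because every worker pulls from the same queue, faster workers simply take more tasks, which is the automatic load balancing described earlier.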
h2. What about my Work?

Now we have the three main artifacts in place: the _Master_, the _Worker_ and the _Shared Queue_. But what about this _Work_ abstraction, and which role does the _DefaultWorkItem_ play?

Let's start with the _Work_ abstraction. It is an interface in the _CommonJ WorkManager_ specification, but one that we cannot implement generically in the _WorkManager_ "container" we are building. It should instead be implemented by the user of the container, since the actual work to be done is of course use-case specific.

The _DefaultWorkItem_ is an implementation of the _WorkItem_ interface in the specification. Its purpose is simply to wrap a _Work_ instance and provide additional information, such as status and an optional _WorkListener_. It can look something like this:
public class DefaultWorkItem implements WorkItem {

  private volatile int m_status;
  private final Work m_work;
  private final WorkListener m_workListener;

  public DefaultWorkItem(final Work work, final WorkListener workListener) {
    m_work = work;
    m_status = WorkEvent.WORK_ACCEPTED;
    m_workListener = workListener;
  }

  public Work getResult() {
    return m_work;
  }

  public synchronized void setStatus(
      final int status, final WorkException exception) {
    m_status = status;
    if (m_workListener != null) {
      // Notify the listener about the state change
      switch (status) {
        case WorkEvent.WORK_ACCEPTED:
          m_workListener.workAccepted(
              new DefaultWorkEvent(WorkEvent.WORK_ACCEPTED, this, exception));
          break;
        case WorkEvent.WORK_REJECTED:
          m_workListener.workRejected(
              new DefaultWorkEvent(WorkEvent.WORK_REJECTED, this, exception));
          break;
        case WorkEvent.WORK_STARTED:
          m_workListener.workStarted(
              new DefaultWorkEvent(WorkEvent.WORK_STARTED, this, exception));
          break;
        case WorkEvent.WORK_COMPLETED:
          m_workListener.workCompleted(
              new DefaultWorkEvent(WorkEvent.WORK_COMPLETED, this, exception));
          break;
      }
    }
  }

  public synchronized int getStatus() {
    return m_status;
  }
  ... // remaining methods omitted (toString() and compareTo())

}
A _WorkListener_ is a listener that the user can register in order to track the status of the work, and which can take proper action upon receiving a state-changed event (like retrying upon failure).

That's it; we now have a fully functional single-JVM, multi-threaded implementation of the _CommonJ WorkManager_ specification. Even though this could be useful as is, there is no way we can scale it out with the needs of the application, since we are bound by the computing power of the single box we are using. Plus, it is not the topic of this article. So, let's make it a bit more interesting.

h2. Introducing Terracotta

"Terracotta":http://terracotta.org is a product that delivers JVM-level clustering as a runtime infrastructure service. It is Open Source and available under a Mozilla-based license.

_Terracotta_ uses bytecode instrumentation to adapt the target application at class load time. In this phase it extends the application in order to ensure that the semantics of the _Java Language Specification_ (JLS) are correctly maintained across the cluster, including object references, thread coordination, garbage collection etc.

Another important thing to mention is that _Terracotta_ does not use _Java Serialization_, which means that any POJO can be shared across the cluster. It also means that _Terracotta_ does not send the whole object graph for the POJO state to all nodes, but breaks the graph down into pure data and sends only the actual "delta" over the wire, meaning the actual changes: the data that is "stale" on the other node(s).

_Terracotta_ uses an architecture known as _hub-and-spoke_, which means that it has one central L2 server and N L1 clients. (The L1 client is the Terracotta client JARs running inside the target JVM, while the L2 server is the central Terracotta server. L1 and L2 refer to first- and second-level caches.)
This might seem strange, since most clustering solutions on the market today use _peer-to-peer_, but as we will see, _hub-and-spoke_ has some advantages and makes it possible to do some very interesting optimizations.

The server plays two different roles:

* First, it serves as the coordinator ("the traffic cop") in the cluster. It keeps track of things like which thread in which node is holding which lock, which nodes are referencing which part of the shared state, which objects have not been used for a specific time period and can be paged out, etc. Keeping all this knowledge in one single place is very valuable and allows for very interesting optimizations.
* Second, it serves as a dedicated state _Service of Record_ (SoR), meaning that it stores all the shared state in the cluster. The state server does not know anything about Java, but only stores the bytes of the data that has changed plus a minimal set of meta info.

The L2 server itself is clusterable through a SAN-based failover mechanism. This means that it is possible to scale out the L2 server in the same fashion as most peer-to-peer solutions, but with the advantage of keeping the L2 separate from the L1. This separation allows us to scale out the L1 clients and the L2 servers independently of each other, which is the way that the _Internet_ scales.

One way of looking at _Terracotta_ is to see it as "_Network Attached Memory_":http://www.devx.com/Java/Article/32603/1763?supportItem=1. _Network Attached Memory_ (NAM) is similar to _Network Attached Storage_ (NAS) in the sense that JVM-level (heap-level) replication makes NAM's presence transparent, just like NAS can be transparent behind a file I/O API. Getting NAM to perform and scale is similar to any I/O platform: read-ahead buffering, read/write locking optimizations etc.
Even though it is in clustering (meaning scalability and high availability) of Web and enterprise applications that _Terracotta_ can bring its most immediate value, it is really a platform for solving generic distributed computing and shared memory problems in plain Java code. This makes it applicable to a wide range of problem domains, for example building a POJO-based _Data Grid_.

h2. Let's cluster it using Terracotta!

I know what you are thinking:

> "Clustering, hmmm... Now comes the hard part right?"

Well... no. It turns out that in order to cluster our current _WorkManager_ implementation, all we have to do is create a _Terracotta_ configuration file in which we define three things:

* The fully qualified name of the top-level object in the object graph that we want to share. In our case we want to share the work queue. This means that the most natural root would be the _LinkedBlockingQueue_ in the _SingleWorkQueue_ class, i.e. the m_workQueue field.
* The classes that we want to include for instrumentation. We can include all classes for instrumentation, i.e. use a "match-all" pattern, but it is usually better to narrow down the scope of the classes that _Terracotta_ needs to introspect.
* The potential lock boundaries that we want Terracotta to introspect. These are called _auto-locks_ and are simply a hint that Terracotta should treat the synchronized blocks in these places as transaction boundaries. (You can also define explicit locks called _named-locks_.) In our case we will define a "match-all" pattern for the locking, something that works fine in most cases and should be treated as the default.
...
<roots>
  <root>
    <field-name>
      org.terracotta.datagrid.workmanager.singlequeue.SingleWorkQueue.m_workQueue
    </field-name>
  </root>
</roots>
<instrumented-classes>
  <include>
    <class-expression>
      org.terracotta.datagrid.workmanager..*
    </class-expression>
    <honor-transient>true</honor-transient>
  </include>
</instrumented-classes>

<locks>
  <autolock>
    <method-expression>* *..*.*(..)</method-expression>
    <lock-level>write</lock-level>
  </autolock>
</locks>
...
Done! Now we have a distributed, multi-JVM _CommonJ WorkManager_ ready for use. But in order to call it a POJO-based _Data Grid_, we need to extend it a bit and address some challenges that are likely to arise if we were to deploy this implementation into production.

h1. Part 3: Getting serious - Handling real-world challenges

Now we have learned the basics of _Data Grids_, the _Master/Worker_ pattern and what the _CommonJ WorkManager_ specification is all about. We also walked through how to implement a distributed _CommonJ WorkManager_ by first creating a single-JVM implementation that we then clustered into a distributed multi-JVM implementation with _Terracotta_.

However, it was a fairly simple and in some sense naïve implementation. This was a good exercise in terms of education and understanding, but in order to use the implementation in the real world we need to know how to address some of the challenges that might come up. We will now discuss some of these challenges and how we can extend the initial implementation to address them.

The challenges that we will look at, one by one, are how to handle:

* Very high volumes of data
* Work failure
* Routing
* Ordering
* Worker failure

h2. Dealing with very high volumes of data

Our current implementation has one single queue that is shared by the master(s) and all the workers. This usually gives acceptable performance and scalability when used with a moderate work load. However, if we need to deal with very high volumes of data, it is likely that we will bottleneck on the single queue. So how can we address this in the simplest fashion?

Perhaps the simplest solution is to create one queue per worker and have the master do some more or less intelligent load-balancing.
If we are able to do a good partitioning of the work and data, as discussed in the section "How do Data Grids scale?", then this solution will:

* **Maximize the use of _Locality of Reference_** - since all work operating on the same data set will be routed to the same queue, with the same worker working on it
* **Minimize contention** - since there will only be one reader and one writer per queue

If we take a look at the current code, what needs to be changed is to first change the _SingleWorkQueue_ class to a _WorkQueueManager_ class and swap the single _LinkedBlockingQueue_ for a _ConcurrentHashMap_ with entries mapping a _routing ID_ to a _LinkedBlockingQueue_.

Swap the wrapped _LinkedBlockingQueue_:
public class SingleWorkQueue {
  BlockingQueue<WorkItem> m_workQueue = new LinkedBlockingQueue<WorkItem>();
}
To a _ConcurrentHashMap_ of _LinkedBlockingQueue_ instances:
public class DefaultWorkQueueManager<ID> implements WorkQueueManager<ID> {
  Map<ID, BlockingQueue<WorkItem>> m_workQueues =
      new ConcurrentHashMap<ID, BlockingQueue<WorkItem>>();
}
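A queue manager of this shape needs a way to look up, or lazily create, the queue for a given routing ID; the router shown later calls a method named _getOrCreateQueueFor(..)_ for this. The article does not show that method, so the following is only one way it might be written, using _ConcurrentMap.putIfAbsent_ so that two threads racing on the same routing ID end up with the same queue. The class name and the generic element type @T@ are assumptions made for the example.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical sketch; not the article's DefaultWorkQueueManager.
class QueueRegistrySketch<ID, T> {

  private final ConcurrentMap<ID, BlockingQueue<T>> m_workQueues =
      new ConcurrentHashMap<ID, BlockingQueue<T>>();

  /** Returns the queue for routingID, creating it atomically on first use. */
  BlockingQueue<T> getOrCreateQueueFor(ID routingID) {
    BlockingQueue<T> queue = m_workQueues.get(routingID);
    if (queue == null) {
      BlockingQueue<T> fresh = new LinkedBlockingQueue<T>();
      // putIfAbsent returns the existing queue if another thread won the race
      BlockingQueue<T> existing = m_workQueues.putIfAbsent(routingID, fresh);
      queue = (existing != null) ? existing : fresh;
    }
    return queue;
  }

  /** Number of queues created so far. */
  int queueCount() {
    return m_workQueues.size();
  }
}
```

With one queue per routing ID, each worker can call @getOrCreateQueueFor(..)@ with its own ID and block on @take()@, while the master only writes to the queue chosen by the routing scheme.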
We also need to change the _WorkManager_ implementation and rewrite the _schedule(..)_ methods to use a _Router_ abstraction instead of putting the work directly into the queue.

Change the _schedule(..)_ method from:
public class DefaultWorkManager implements WorkManager {

  public WorkItem schedule(final Work work) {
    WorkItem workItem = new DefaultWorkItem(work, null);
    m_queue.put(workItem);
    return workItem;
  }
  ...
}
To redirect to the _Router_ abstraction (more on the _Router_ below):
public class RoutingAwareWorkManager<ID> implements WorkManager {

  public WorkItem schedule(final Work work) {
    return m_router.route(work);
  }
  ...
}
In the last code block we saw that we have introduced a new abstraction, the _Router_, which leads us to the topic of how to deal with different routing schemes.

h2. Dealing with different routing schemes

By splitting up the single work queue into multiple queues, each with a unique routing ID, we open up the possibility of providing different routing schemes that can be customized to address specific use cases. As in the previous section, we have to make some changes to the initial single-queue implementation. First we need to add a routing ID to the _WorkItem_ abstraction. We do that by adding the generic type ID and letting the _WorkItem_ implement the _Routable_ interface:
public class RoutableWorkItem<ID> extends DefaultWorkItem implements Routable<ID> {

  protected ID m_routingID;

  public ID getRoutingID() {
    return m_routingID;
  }
  ...
}
As we saw in the previous section, we also introduce a new abstraction called _Router_:
public interface Router<ID> {
  RoutableWorkItem<ID> route(Work work);
  RoutableWorkItem<ID> route(Work work, WorkListener listener);
  RoutableWorkItem<ID> route(RoutableWorkItem<ID> workItem);
}
This abstraction can be used to implement various load-balancing algorithms, for example:

* **Round-robin** - the _Router_ cycles through all queues one by one
* **Work load sensitive balancing** - the _Router_ looks at queue depth and always sends the next pending work to the shortest queue
* **Data affinity** - "sticky routing", meaning that the _Router_ sends all pending work of a specific type to a specific queue
* **Roll your own** - to maximize _Locality of Reference_ for your specific requirements

In our _Router_ implementation we have three default algorithms: _round-robin_, _load-balancing_ and _single queue_. You can find them as static inner classes in the _Router_ interface. Here is an example of the load-balancing router implementation, which takes an array of the registered routing IDs and always sends the next pending work to the shortest queue:
public class LoadBalancingRouter<ID> implements Router<ID> {

  private final WorkQueueManager<ID> m_queueManager;

  private static class WorkQueueLength<ID> {
    public ID routingID;
    public BlockingQueue<RoutableWorkItem<ID>> workQueue;
    public int queueLength = Integer.MAX_VALUE;
  }

  public LoadBalancingRouter(final WorkQueueManager<ID> queueManager, final ID[] routingIDs) {
    m_queueManager = queueManager;
    // create all queues upfront

    for (int i = 0; i < routingIDs.length; i++) {
      m_queueManager.getOrCreateQueueFor(routingIDs[i]);
    }
  }

  public RoutableWorkItem<ID> route(final Work work) {
    return route(work, null);
  }

  public RoutableWorkItem<ID> route(final Work work, final WorkListener listener) {
    WorkQueueLength<ID> shortestQueue = getShortestWorkQueue();
    RoutableWorkItem<ID> workItem = new RoutableWorkItem<ID>(work, listener, shortestQueue.routingID);
    try {
      shortestQueue.workQueue.put(workItem);
    } catch (InterruptedException e) {
      workItem.setStatus(WorkEvent.WORK_REJECTED, new WorkException(e));
      Thread.currentThread().interrupt();
    }
    return workItem;
  }

  public RoutableWorkItem<ID> route(final RoutableWorkItem<ID> workItem) {
    WorkQueueLength<ID> shortestQueue = getShortestWorkQueue();
    synchronized (workItem) {
      workItem.setRoutingID(shortestQueue.routingID);
    }
    try {
      shortestQueue.workQueue.put(workItem);
    } catch (InterruptedException e) {
      workItem.setStatus(WorkEvent.WORK_REJECTED, new WorkException(e));
      Thread.currentThread().interrupt();
    }
    return workItem;
  }

  private WorkQueueLength<ID> getShortestWorkQueue() {
    WorkQueueLength<ID> shortestQueue = new WorkQueueLength<ID>();

    int queueLengthForShortestQueue = Integer.MAX_VALUE;
    ID routingIDForShortestQueue = null;
    Map<ID, BlockingQueue<RoutableWorkItem<ID>>> queues = m_queueManager.getQueues();
    for (Map.Entry<ID, BlockingQueue<RoutableWorkItem<ID>>> entry: queues.entrySet()) {
      ID routingID = entry.getKey();
      BlockingQueue<RoutableWorkItem<ID>> queue = entry.getValue();
      int queueSize = queue.size();
      if (queueSize <= queueLengthForShortestQueue) {
        queueLengthForShortestQueue = queueSize;
        routingIDForShortestQueue = routingID;
      }
    }
    shortestQueue.workQueue = m_queueManager.getOrCreateQueueFor(routingIDForShortestQueue);
    shortestQueue.routingID = routingIDForShortestQueue;
    return shortestQueue;
  }
}
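For comparison, the round-robin scheme from the list above could be sketched roughly as follows. This is a minimal, self-contained illustration and not the _RoundRobinRouter_ that ships with the implementation: the class name _RoundRobinRouterSketch_ is hypothetical, and it routes plain payloads of type W into local queues instead of wrapping _Work_ in a _RoutableWorkItem_.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical, simplified stand-in for a router: it routes plain payloads
// of type W instead of the article's Work/RoutableWorkItem instances.
class RoundRobinRouterSketch<ID, W> {

  private final List<ID> routingIDs = new ArrayList<ID>();
  private final Map<ID, BlockingQueue<W>> queues = new LinkedHashMap<ID, BlockingQueue<W>>();
  private final AtomicInteger next = new AtomicInteger();

  RoundRobinRouterSketch(List<ID> ids) {
    if (ids.isEmpty()) {
      throw new IllegalArgumentException("at least one routing ID is required");
    }
    // create all queues upfront, one unbounded queue per routing ID
    for (ID id : ids) {
      routingIDs.add(id);
      queues.put(id, new LinkedBlockingQueue<W>());
    }
  }

  // Adds the work to the next queue in the cycle and returns the chosen routing ID.
  ID route(W work) {
    // floorMod keeps the index non-negative even if the counter overflows
    int index = Math.floorMod(next.getAndIncrement(), routingIDs.size());
    ID id = routingIDs.get(index);
    queues.get(id).add(work); // never blocks: the queues are unbounded
    return id;
  }

  int queueSize(ID id) {
    return queues.get(id).size();
  }
}
```

With the routing IDs "A", "B" and "C", four consecutive calls to _route(..)_ pick A, B, C and then A again. Unlike the load-balancing router, this scheme never inspects queue depth, so it is cheap but can pile work onto a slow worker.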
h2. Dealing with work failure

As we discussed earlier in this article, the _CommonJ WorkManager_ specification provides APIs for event-based failure reporting and tracking of work status. Each _Work_ instance is wrapped in a _WorkItem_ instance which provides status information. It also gives us the possibility of defining an optional _WorkListener_ through which we will get callback events whenever the status of the work changes. The _WorkListener_ interface looks like this:
public interface WorkListener {
  void workAccepted(WorkEvent we);
  void workRejected(WorkEvent we);
  void workStarted(WorkEvent we);
  void workCompleted(WorkEvent we);
}
As you can see, we can implement callback methods that subscribe to events triggered by work being _accepted_, _rejected_, _started_ and _completed_. In this particular case we are mainly interested in doing something when receiving the _work rejected_ event. In order to do that, we simply need to create an implementation of the _WorkListener_ interface and add some code in the _workRejected_ method:
public class RetryingWorkListener implements WorkListener {

  public void workRejected(WorkEvent we) {
    Exception cause = we.getException();
    WorkItem wi = we.getWorkItem();
    Work work = wi.getResult(); // the rejected work

    ... // reroute the work onto queue X
  }

  public void workAccepted(WorkEvent event) {}
  public void workCompleted(WorkEvent event) {}
  public void workStarted(WorkEvent event) {}
}
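What "reroute the work" means depends on your application. One possible policy, sketched below under simplifying assumptions, is to retry a rejected piece of work a bounded number of times and then give up. The class _RetryOnRejectSketch_ and its methods are hypothetical: it uses _Runnable_ as a stand-in for _Work_ and plain local queues instead of the _Router_, so it only illustrates the bookkeeping, not the actual rerouting.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical sketch of a bounded-retry policy for rejected work.
// Runnable stands in for the article's Work type.
class RetryOnRejectSketch {

  private final int maxRetries;
  private final Queue<Runnable> retryQueue = new ConcurrentLinkedQueue<Runnable>();
  private final Queue<Runnable> abandoned = new ConcurrentLinkedQueue<Runnable>();
  private final Map<Runnable, Integer> attempts = new HashMap<Runnable, Integer>();

  RetryOnRejectSketch(int maxRetries) {
    this.maxRetries = maxRetries;
  }

  // Plays the role of WorkListener.workRejected(..): requeue the work until
  // the retry budget is used up, then move it to the abandoned queue.
  synchronized void workRejected(Runnable work, Exception cause) {
    int soFar = attempts.containsKey(work) ? attempts.get(work) : 0;
    if (soFar < maxRetries) {
      attempts.put(work, soFar + 1);
      retryQueue.add(work); // in the real grid this would go through a Router
    } else {
      attempts.remove(work);
      abandoned.add(work);
    }
  }

  int pendingRetries() {
    return retryQueue.size();
  }

  int abandonedCount() {
    return abandoned.size();
  }
}
```

Bounding the number of retries matters: work that is rejected for a deterministic reason would otherwise circulate through the queues forever.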
h2. Dealing with ordering of work

In some situations it might be important to define and maintain ordering of the work. This doesn't seem to be a problem if we have a single instance of the master, but imagine scaling out the master; then it immediately gets worse. So, how can we maintain ordering of the work? Since the internal implementation is based on POJOs and everything is open and customizable, it is actually very easy to implement support for ordering. All we need to do is to swap our map of _LinkedBlockingQueue_(s) for a map of _PriorityBlockingQueue_(s). Then you can let your _Work_ implement _Comparable_ and create a custom _Comparator_ that you pass into the constructor of the _PriorityBlockingQueue_:
Comparator<RoutableWorkItem<ID>> c = new Comparator<RoutableWorkItem<ID>>() {
    public int compare(
        RoutableWorkItem<ID> workItem1,
        RoutableWorkItem<ID> workItem2) {
      // the wrapped Work instances are expected to implement Comparable
      Comparable work1 =
         (Comparable)workItem1.getResult();
      Comparable work2 =
         (Comparable)workItem2.getResult();
      return work1.compareTo(work2);
    }};
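To see the effect of the queue swap in isolation, here is a small self-contained sketch. The _SequencedWork_ type and the _OrderedQueueSketch_ helper are hypothetical and only illustrate the idea: work that carries a sequence number comes out of a _PriorityBlockingQueue_ in sequence order regardless of insertion order.

```java
import java.util.Comparator;
import java.util.concurrent.PriorityBlockingQueue;

// Hypothetical work type whose ordering is defined by a sequence number.
class SequencedWork implements Comparable<SequencedWork> {
  final long sequence;
  final String name;

  SequencedWork(long sequence, String name) {
    this.sequence = sequence;
    this.name = name;
  }

  public int compareTo(SequencedWork other) {
    return Long.compare(sequence, other.sequence);
  }
}

class OrderedQueueSketch {
  // Creates a queue that hands out work in ascending sequence order.
  static PriorityBlockingQueue<SequencedWork> newQueue() {
    Comparator<SequencedWork> bySequence = new Comparator<SequencedWork>() {
      public int compare(SequencedWork a, SequencedWork b) {
        return a.compareTo(b);
      }
    };
    // 11 is the default initial capacity; the queue grows as needed
    return new PriorityBlockingQueue<SequencedWork>(11, bySequence);
  }
}
```

Note that _PriorityBlockingQueue_ makes no guarantee about the relative order of elements that compare as equal, so if ties are possible the comparison needs a tie-breaker such as a unique sequence number.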
If you need to maintain ordering of the result then you have to do something similar with the references to the _WorkItem_(s) that you get when invoking _schedule(..)_ on the _WorkManager_ instance.

h2. Dealing with worker failure

In a reliable distributed system it is important to be able to detect if a worker goes down. (In order to simplify the discussion, let's define a worker as being a JVM.) There are several ways this could be implemented, and we will now discuss some of the strategies that we can take.

* **Heartbeat mechanism** - In this strategy each worker has an open connection to the master through which it periodically sends a heartbeat (some sort of "I'm-alive" event) to the master. The master can then detect if the heartbeat for a specific worker stops and can after a certain time period consider the node to be dead. This is for example the way that "Google's MapReduce implementation":http://www.cs.wisc.edu/~dusseau/Classes/CS739/Writeups/mapreduce.pdf detects worker failure.
* **Work timestamp** - Here we add a timestamp to each pending work item. The master can then periodically peek at the head of each work queue, read the timestamp and compare it to a predefined timeout interval. If it detects that a work item has timed out, it can consider the worker(s) polling from that specific queue to be dead.
* **Worker "is-alive-lock"** - This strategy utilizes the cross-JVM thread coordination that _Terracotta_ enables. The first thing that each worker does when it starts up is to spawn a thread that takes a worker-specific lock:
synchronized(workerLock) {
  while (isRunning) {}; // hold the lock until worker is shut down or dies

}
Then the worker connects to the master, which spawns a thread that tries to take the exact same lock. Here comes the trick - the master thread will block until the lock is released, something that will only happen if the worker dies:
synchronized(workerLock) { // will block here until worker dies

  ... // worker dead - take proper action

}
* **Terracotta Server notifications** - Since detection of a node failure is such a common problem, an implementation in which the _Terracotta_ clients can subscribe to notifications of node death from the _Terracotta Server_ is underway and will be released in an upcoming version of _Terracotta_.

In all of the strategies we need to take proper action when worker failure has been detected. In our case it would (among other things) mean rerouting all non-completed work to another queue.

h2. Rewrite the Terracotta config

As you perhaps remember from the section "Very high volumes of data?", in order to make the implementation more scalable we had to change the _SingleWorkQueue_ class to a _WorkQueueManager_ class and swap the single _LinkedBlockingQueue_ for a _ConcurrentHashMap_ with entries containing a _routing ID_ mapped to a _LinkedBlockingQueue_. When we did that we also changed the name of the field and the name of the class holding this field. This needs to be reflected in the _Terracotta_ configuration file:
...
<roots>
  <root>
    <field-name>org.terracotta.datagrid.workmanager.routing.DefaultWorkQueueManager.m_workQueues</field-name>
  </root>
</roots>
...
h2. How to use the POJO Grid?

Here is an example of the usage of the _WorkManager_, _WorkQueueManager_, _Router_ and _WorkListener_ abstractions:
// Create the work queue manager with 'String' as routing ID type. The DefaultWorkQueueManager
// is usually sufficient, but you can easily provide your own implementation of the
// WorkQueueManager interface
WorkQueueManager<String> workQueueManager = new DefaultWorkQueueManager<String>();

// create the router - there are a bunch of default routers in the Router interface,
// such as SingleQueueRouter, RoundRobinRouter and LoadBalancingRouter, but
// it is probably here that you need to plug in your custom implementation
// (the routing IDs are example values; they must match the IDs the workers are started with)
Router<String> router = new Router.LoadBalancingRouter<String>(
    workQueueManager, new String[] {"ROUTING_ID_1", "ROUTING_ID_2"});

// create the work manager - this implementation is very generic and most likely enough
// for your needs
WorkManager workManager = new RoutingAwareWorkManager<String>(router);

// optionally (null is ok) create a work listener that will get a callback each time
// the status of a WorkEvent (Work) has changed - allows you to for example take proper
// action upon failure and retry etc.
WorkListener workListener = null;

Set<Work> workSet = ... // create a set with the work to be done

// loop over the work set
for (Work work: workSet) {
  RoutableWorkItem<String> workItem; // the work item, wrapping the work (result) and holding the status
  try {
    // schedule the work, i.e. add it to the work queue and get the work item in return that can
    // be used to keep track of the pending work
    workItem = workManager.schedule(work, workListener);
  } catch (WorkException e) {
    // handle failure
  }
}
To start up a _Worker_ you simply have to create an instance of the _Worker_, pass in a reference to the _DefaultWorkQueueManager_ and the routing ID to use, and finally invoke _start()_:
try {
  // create a work queue manager with 'String' as routing ID type
  WorkQueueManager<String> workQueueManager = new DefaultWorkQueueManager<String>();

  // create and start up a Worker implementation - the provided RoutingAwareWorker
  // implementation is usually enough
  // we pass in the work queue manager and the routing ID for this worker
  Worker worker = new RoutingAwareWorker(workQueueManager, "ROUTING_ID_1");
  worker.start();
} catch (WorkException e) {
  // handle failure
}
h2. Don't we need a result queue?

There is no need for a result queue, since the _Master_ is holding on to the _WorkItem_, i.e. the pending work (the work that is in progress) including its result. Since _Terracotta_ maintains Java's pass-by-reference semantics, the regular (local) reference to the _WorkItem_ that the _Master_ holds will work transparently across the cluster. This means that a _Worker_ can update the exact same _WorkItem_ and the _Master_ would know about it immediately.

h2. Enabling Terracotta

The client usage would roughly be to start up one _WorkManager_ and N _Workers_, each one on a different JVM. Before you start up the _WorkManager_ and _Workers_ you have to enable the _Terracotta_ runtime. There are two ways you can do this:

h3. Use Terracotta wrapper script

_Terracotta_ ships with a script that should be used as a drop-in replacement for the regular _java_ command. The name of the script is _dso-java_ and it can be found in the _bin_ directory in the distribution. To use the recommended _Terracotta_ startup script:

* Find the _dso-java_ script in the _dso/bin_ directory.
* Replace the regular invocation of _java_ with an invocation of _dso-java_.

h3. Use JVM options

Alternatively, you can enable _Terracotta_ through additional JVM options, which is useful for example for applications running inside a web container. To do so:

* Prepend the _Terracotta Boot Jar_ to the _Java bootclasspath_.
* Define the path to the _Terracotta_ configuration file.

Here is an example (written for a Unix-like shell; on _Windows_, put the command on a single line or replace each trailing backslash with a caret):
java -Xbootclasspath/p:<path to terracotta boot jar> \
     -Dtc.config=path/to/your/tc-config.xml \
     -Dtc.install-root=<path to terracotta install dir> \
      ... <your regular options> ...
h2. Run it

Now we are almost done, but before you start the _Master_ and the _Workers_ we must first start the _Terracotta Server_. This is done by invoking the _start-tc-server_ script in the _dso/bin_ directory. After you have done that, you can just start up the _Master_ and the _Workers_ (in any order you like). That is all there is to it.

h2. Benefits of implementing a POJO-based Data Grid

So let's wrap up with a brief discussion of the most immediate benefits of building and using a POJO-based _Data Grid_. Here are some of the benefits that we value:

* **Work with POJOs** - You get to work with POJOs, meaning simple and plain Java code. Any POJO can be shared across the cluster and migrated between worker nodes. There is no need for implementing _Serializable_ or any other interface. It is as simple as Java can be.
* **Event-driven development** - The _Master/Worker_ (and _CommonJ WorkManager_) pattern lends itself to event-driven development with no need for explicit threading and guarding. This can simplify the user code immensely compared to the use of explicit thread coordination.
* **Simpler development and testing** - We have talked about how _Terracotta_ allows you to develop an application for a single JVM and then simply cluster it in order to run it on multiple JVMs. This means that you can simplify the development and testing of your Data Grid application by doing the development and testing on your workstation, utilizing multiple threads instead of JVMs, and then deploy the application onto multiple nodes at a later stage.
* **White Box container implementation** - The whole grid implementation is "under your fingers"; nothing is really abstracted away from you as a developer. This gives you the freedom to design _Master_, _Worker_, routing algorithms, fail-over schemes etc. the way you need. You can customize everything as much as you want.

h2. Resources

* Check out the distribution for the POJO-based Data Grid. It is a Maven 2 based project that includes a sample implementation of a distributed web spider. Read the readme.html for instructions on how to build and run the sample: "http://svn.terracotta.org/svn/forge":http://svn.terracotta.org/svn/forge - (module projects/labs/opendatagrid)
* Download Terracotta's JVM-clustering technology - Open Source: "http://terracotta.org":http://terracotta.org
* Tutorial that uses the implementation outlined in this article to build a parallel web spider (that is part of the pojo-grid distribution): "http://terracotta.org/confluence/display/orgsite/TutorialTerracottaDsoSpider":http://terracotta.org/confluence/display/orgsite/TutorialTerracottaDsoSpider
* Article - Distributed Computing Made Easy: "http://www.theserverside.com/tt/articles/article.tss?l=DistCompute":http://www.theserverside.com/tt/articles/article.tss?l=DistCompute
* Article - Stateful Session Clustering: Have Your Availability and Scale It Too: "http://www.devx.com/Java/Article/32603/":http://www.devx.com/Java/Article/32603/
* Documentation for Terracotta's JVM-clustering technology: "http://terracotta.org/confluence/display/orgsite/Documentation":http://terracotta.org/confluence/display/orgsite/Documentation
* Author's weblog: "http://jonasboner.com":http://jonasboner.com