How to Build a POJO-based Data Grid using Terracotta

Introduction

In this article, I will show you, step by step, how to build a reusable POJO-based Data Grid with nothing but the standard JDK 1.5, and then make it distributed through the declarative JVM-level clustering technology provided by [Terracotta](http://terracotta.org).

The article is divided into three parts:

- Part 1: Data Grids – What’s behind the buzzwords?
- Part 2: Build a multi-threaded Master/Worker container and cluster it with Terracotta
- Part 3: Getting serious – Handling real-world challenges

Part 1: Data Grids – What’s behind the buzzwords?

What are Data Grids?

A Data Grid is a set of servers that together create a mainframe-class processing service, in which data and operations can move seamlessly across the grid in order to optimize the performance and scalability of the computing tasks submitted to it. A Data Grid scales through the use of Locality of Reference (see the section “How do Data Grids scale?”) and achieves high availability through effective use of data duplication. It combines data management with data processing.

What are Data Grids used for?

Applications that could benefit from being run on a Data Grid are usually applications that need to work on large data sets and/or need to parallelize processing of the data in order to get better throughput and performance. Examples include financial risk analysis and other simulations, searching and aggregation over large datasets, and sales order pipeline processing.

How do Data Grids scale?

One of the main reasons why Data Grids can scale so well is that they can make an intelligent choice between moving data to its processing context and moving the processing context (the operations) to the data. Effective use of the latter means that the grid can exploit Locality of Reference: data that is being processed on a specific node stays local to that node and, in the ideal case, never has to leave it. Instead, all operations that work on this specific data set are always routed to that node.

The immediate benefit is minimized latency, and in the ideal case this can give close to unlimited, linear scalability. How well it scales is of course use-case specific, and it can take both effort and skill to partition the data and work sets, and potentially to design the routing algorithms. The ultimate (and simplest) situation is when all work is what we call Embarrassingly Parallel, which means that it has no shared state and can do its work in complete isolation. But in most cases it is still possible to partition the work into work sets in such a way that the work sets share no state. The latter solution may, however, need some intelligent routing algorithms, something that we will take a look at later.
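
To make this concrete, here is a minimal, hypothetical sketch of key-based routing (all names are made up for illustration): work that shares a partition key is always routed to the same worker, which is what preserves Locality of Reference.

public final class KeyBasedRouting {

  // Work that shares a partition key always maps to the same worker,
  // so the data it operates on can stay local to that worker's node.
  public static int workerIndexFor(final Object partitionKey, final int workerCount) {
    // mod first, then abs: the result is always within [0, workerCount)
    return Math.abs(partitionKey.hashCode() % workerCount);
  }
}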

How do Data Grids deal with failure?

Data Grids are resilient to failure and downtime through effective use of data duplication. A Data Grid can be seen as an organic system of cells (nodes) that is designed not only to handle, but to expect, failure of individual cells. This is very different from the traditional design of distributed systems, in which each node is seen as an isolated unit that must always expect the worst and protect itself accordingly.

Part 2: Build a multi-threaded Master/Worker container and cluster it with Terracotta

Master/Worker algorithm

What is the Master/Worker pattern?

The Master/Worker pattern is heavily used in Data Grids and is one of the most well-known and common patterns for parallelizing work. We will now explain the characteristics of the pattern and then go through the possible approaches for a Java implementation.

So, how does it work?

The Master/Worker pattern consists of three logical entities: a Master, a Shared Space and one or more instances of a Worker. The Master initiates the computation by creating a set of tasks, puts them in some shared space and then waits for the tasks to be picked up and completed by the Workers. The shared space is usually some sort of Shared Queue, but it can also be implemented as a Tuple Space (for example in Linda programming environments, such as JavaSpaces, where the pattern is used extensively). One of the advantages of using this pattern is that the algorithm automatically balances the load. This is possible due to the simple fact that the work set is shared and the workers continue to pull work from it until there is no more work to be done. The algorithm usually scales well as long as the number of tasks by far exceeds the number of workers and the tasks take a fairly similar amount of time to complete.

Possible approaches in Java

Let’s now look at the different alternatives we have for implementing this in Java. There may be more ways of doing it, but we will focus the discussion on three alternatives, each at a higher abstraction level than the previous one.

Using Java’s threading primitives

The most hard-core approach is to use the concurrency and threading primitives defined by the Java Language Specification (JLS), e.g. wait/notify and the synchronized and volatile keywords. The benefit is that everything is really “under your fingers”, meaning that you can customize the solution without limitations. However, this is also its main problem: it is both hard and tedious to implement yourself, and will most likely be even worse to maintain. These low-level abstractions are not something you want to work with on a day-to-day basis. We need to raise the abstraction level above the core primitives in the Java Memory Model, and that is exactly what the data structure abstractions in the java.util.concurrent library in JDK 1.5 do for us.

Using the java.util.concurrent abstractions

Given that the low-level abstractions in the JLS are both tedious and hard to use, the concurrency abstractions in JDK 1.5 were a very welcome and natural addition to the Java libraries. It is a very rich API that provides everything from semaphores and barriers to implementations of the Java Collections data structure interfaces highly tuned for concurrent access. It also provides the ExecutorService, essentially a thread pool that gives you the Master/Worker pattern in a single abstraction, which is very powerful.
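
As a minimal sketch (separate from the WorkManager container we are about to build), this is roughly what the Master/Worker pattern looks like with a plain ExecutorService: the pool’s threads act as Workers pulling tasks off the pool’s internal queue, while the Master collects the results through Futures.

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class ExecutorServiceMasterWorker {

  public static void main(String[] args) throws Exception {
    ExecutorService workers = Executors.newFixedThreadPool(4); // 4 worker threads

    // the Master creates the tasks and submits them to the pool's internal queue
    List<Future<Integer>> results = new ArrayList<Future<Integer>>();
    for (int i = 0; i < 100; i++) {
      final int n = i;
      results.add(workers.submit(new Callable<Integer>() {
        public Integer call() {
          return n * n; // the use-case specific "work"
        }
      }));
    }

    // the Master waits for all tasks to complete
    for (Future<Integer> result : results) {
      result.get(); // blocks until the corresponding task is done
    }
    workers.shutdown();
  }
}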

It is possible to cluster the ExecutorService using Terracotta; you can read about that exercise in this article. Even though this approach would be sufficient in many situations and use cases, it has some problems, which is why we will move up one more abstraction level.

Using the CommonJ WorkManager specification

Spawning and coordinating threads is something that has always been prohibited by the EJB specification. However, there has naturally been (and still is) a common need for coordinating work in JEE (something that was partly, but not completely, solved in a very verbose way with Message-Driven Beans (MDB)). This need was the main motivation for IBM and BEA to create a joint specification that solves the problem by providing a standardized way of executing concurrent tasks in a JEE environment. The specification is called CommonJ and supports the Master/Worker pattern through its WorkManager API.

From BEA’s documentation about the specification:

“The Work Manager provides a simple API for application-server-supported concurrent execution of work items. This enables J2EE-based applications (including Servlets and EJBs) to schedule work items for concurrent execution, which will provide greater throughput and increased response time. After an application submits work items to a Work Manager for concurrent execution, the application can gather the results. The Work Manager provides common “join” operations, such as waiting for any or all work items to complete. The Work Manager for Application Servers specification provides an application-server-supported alternative to using lower-level threading APIs, which are inappropriate for use in managed environments such as Servlets and EJBs, as well as being too difficult to use for most applications."

What is interesting is that the specification not only defines an API for submitting work and getting the result back, but also provides functionality for tracking work status: it allows you to register a listener that receives callback events whenever the status of the work has changed. This makes it possible, for example, to detect not only a work failure but also the reason why it failed, which gives us the possibility of restarting the work.

Each of the three approaches we have looked at builds upon the previous one and gradually raises the abstraction level. Even more importantly, each step minimizes and simplifies the code that we as users have to write and maintain.

The most natural choice is to base our implementation on the CommonJ WorkManager specification. It is a simple and minimalistic specification and seems to provide everything that we need in order to build a reliable Master/Worker container.

Here are the interfaces in the specification:

public interface Work extends Runnable { }

public interface WorkManager {
  long IMMEDIATE = 0;
  long INDEFINITE = Long.MAX_VALUE;
  WorkItem schedule(Work work) throws WorkException;
  WorkItem schedule(Work work, WorkListener listener) throws WorkException;
  boolean waitForAll(Collection workItems, long timeout);
  Collection waitForAny(Collection workItems, long timeout);
}

public interface WorkItem {
  Work getResult();
  int getStatus();
}

public interface WorkListener {
  void workAccepted(WorkEvent we);
  void workRejected(WorkEvent we);
  void workStarted(WorkEvent we);
  void workCompleted(WorkEvent we);
}

public interface WorkEvent {
  int WORK_ACCEPTED  = 1;
  int WORK_REJECTED  = 2;
  int WORK_STARTED   = 3;
  int WORK_COMPLETED = 4;
  public int getType();
  public WorkItem getWorkItem();
  public WorkException getException();
}

The Plan

The plan is to first implement the specification as a regular multi-threaded application using the java.util.concurrent abstractions (in particular the ExecutorService and the LinkedBlockingQueue classes), and then use Terracotta to declaratively (and transparently) turn it into a multi-JVM, distributed implementation.

Creating the Master (WorkManager)

We start by creating the SingleQueueWorkManager, which serves as our Master. It implements the CommonJ WorkManager interface, which provides the API that the user uses to schedule Work and wait for it to complete. It holds a reference to the work queue that is shared between the Master and the Workers, in this case represented by the SingleWorkQueue abstraction.

Here is how we could implement the work manager:

public class SingleQueueWorkManager implements WorkManager {

  private final SingleWorkQueue m_queue;

  public SingleQueueWorkManager(final SingleWorkQueue queue) {
    m_queue = queue;
  }

  public WorkItem schedule(final Work work) throws WorkException {
    DefaultWorkItem workItem = new DefaultWorkItem(work, null);
    m_queue.put(workItem);
    return workItem;
  }

  public WorkItem schedule(final Work work, final WorkListener listener)
      throws WorkException {
    DefaultWorkItem workItem = new DefaultWorkItem(work, listener);
    m_queue.put(workItem);
    return workItem;
  }

  public boolean waitForAll(Collection workItems, long timeout) {
    long start = System.currentTimeMillis();
    do {
      boolean isAllCompleted = true;
      for (Iterator it = workItems.iterator();
           it.hasNext() && isAllCompleted;) {
        int status = ((WorkItem) it.next()).getStatus();
        isAllCompleted =
            status == WorkEvent.WORK_COMPLETED ||
            status == WorkEvent.WORK_REJECTED;
      }
      if (isAllCompleted) { return true; }
      if (timeout == IMMEDIATE) { return false; }
      if (timeout == INDEFINITE) { continue; }
    } while ((System.currentTimeMillis() - start) < timeout);
    return false;
  }

  public Collection waitForAny(Collection workItems, long timeout) {
    long start = System.currentTimeMillis();
    do {
      synchronized (this) {
        Collection completed = new ArrayList();
        for (Iterator it = workItems.iterator(); it.hasNext();) {
          WorkItem workItem = (WorkItem) it.next();
          if (workItem.getStatus() == WorkEvent.WORK_COMPLETED ||
              workItem.getStatus() == WorkEvent.WORK_REJECTED) {
            completed.add(workItem);
          }
        }
        if (!completed.isEmpty()) { return completed; }
      }
      if (timeout == IMMEDIATE) { return Collections.EMPTY_LIST; }
      if (timeout == INDEFINITE) { continue; }
    } while ((System.currentTimeMillis() - start) < timeout);
    return Collections.EMPTY_LIST;
  }
}

Creating the Shared Queue

The SingleQueueWorkManager schedules work by adding it to the SingleWorkQueue. The SingleWorkQueue is the one artifact whose state needs to be shared between the Master and the Workers, since it holds the queue with all the pending Work. We need a single instance of this queue that is available to both the Master and all of its Workers.

The work queue can be implemented like this:

public class SingleWorkQueue {

  // Terracotta Shared Root
  private final BlockingQueue<DefaultWorkItem> m_workQueue =
      new LinkedBlockingQueue<DefaultWorkItem>();

  public void put(final DefaultWorkItem workItem) throws WorkException {
    try {
      m_workQueue.put(workItem); // blocks if queue is full
    } catch (InterruptedException e) {
      WorkRejectedException we = new WorkRejectedException(e.getMessage());
      workItem.setStatus(WorkEvent.WORK_REJECTED, we);
      Thread.currentThread().interrupt();
      throw we;
    }
  }

  public DefaultWorkItem peek() throws WorkException {
    return m_workQueue.peek(); // returns null if queue is empty
  }

  public DefaultWorkItem take() throws WorkException {
    try {
      return m_workQueue.take(); // blocks if queue is empty
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new WorkException(e);
    }
  }
}

As you can see, it is a simple wrapper around a java.util.concurrent.BlockingQueue and has three methods, take(), put() and peek(), which simply delegate to the BlockingQueue but also add error handling by setting the status of the Work in case of failure.

Creating the Worker

Finally, we have the Worker, in our case represented by the SingleQueueWorker abstraction. This class uses a thread pool to spawn N worker threads that continuously grab and execute Work from the SingleWorkQueue. During the processing of the Work, the status flag in the wrapping WorkItem is maintained (one of ACCEPTED, STARTED, COMPLETED or REJECTED). This is needed for the SingleQueueWorkManager to be able to continuously monitor the status of the Work it has scheduled.

This is what a Worker implementation can look like. As you can see, we chose to make use of the ExecutorService thread pool implementation in the java.util.concurrent package:

public class SingleQueueWorker implements Worker {

  protected final ExecutorService m_threadPool = Executors.newCachedThreadPool();
  protected final SingleWorkQueue m_queue;
  protected volatile boolean m_isRunning = true;

  public SingleQueueWorker(final SingleWorkQueue queue) {
    m_queue = queue;
  }

  public void start() throws WorkException {
    while (m_isRunning) {
      final DefaultWorkItem workItem = m_queue.take();
      final Work work = workItem.getResult();
      m_threadPool.execute(new Runnable() {
        public void run() {
          try {
            workItem.setStatus(WorkEvent.WORK_STARTED, null);
            work.run();
            workItem.setStatus(WorkEvent.WORK_COMPLETED, null);
          } catch (Throwable e) {
            workItem.setStatus(
                WorkEvent.WORK_REJECTED, new WorkRejectedException(e));
          }
        }
      });
    }
  }

  public void stop() {
    m_isRunning = false;
    m_threadPool.shutdown();
  }
}

What about my Work?

Now we have the three main artifacts in place: the Master, the Worker and the Shared Queue. But what about this Work abstraction, and what role does the DefaultWorkItem play?

Let’s start with the Work abstraction. It is an interface in the CommonJ WorkManager specification, but one that we cannot implement generically in the WorkManager “container” we are building; it has to be implemented by the user of the container, since the actual work to be done is of course use-case specific.
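
For example, a user-defined Work implementation could look something like this (the task itself is made up for illustration; the container only cares that the class implements Work and is therefore Runnable):

public class SquareRootWork implements Work {

  private final double m_input;
  private volatile double m_result; // read by the master once the work is COMPLETED

  public SquareRootWork(final double input) {
    m_input = input;
  }

  public void run() {
    m_result = Math.sqrt(m_input); // the use-case specific work
  }

  public double getResult() {
    return m_result;
  }
}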

The DefaultWorkItem is an implementation of the WorkItem interface in the specification. Its purpose is simply to wrap a Work instance and provide additional information, such as the status and an optional WorkListener. It can look something like this:

public class DefaultWorkItem implements WorkItem {

  private volatile int m_status;
  private final Work m_work;
  private final WorkListener m_workListener;

  public DefaultWorkItem(final Work work, final WorkListener workListener) {
    m_work = work;
    m_status = WorkEvent.WORK_ACCEPTED;
    m_workListener = workListener;
  }

  public Work getResult() {
    return m_work;
  }

  public synchronized void setStatus(
      final int status, final WorkException exception) {
    m_status = status;
    if (m_workListener != null) {
      switch (status) {
        case WorkEvent.WORK_ACCEPTED:
          m_workListener.workAccepted(
              new DefaultWorkEvent(WorkEvent.WORK_ACCEPTED, this, exception));
          break;
        case WorkEvent.WORK_REJECTED:
          m_workListener.workRejected(
              new DefaultWorkEvent(WorkEvent.WORK_REJECTED, this, exception));
          break;
        case WorkEvent.WORK_STARTED:
          m_workListener.workStarted(
              new DefaultWorkEvent(WorkEvent.WORK_STARTED, this, exception));
          break;
        case WorkEvent.WORK_COMPLETED:
          m_workListener.workCompleted(
              new DefaultWorkEvent(WorkEvent.WORK_COMPLETED, this, exception));
          break;
      }
    }
  }

  public synchronized int getStatus() {
    return m_status;
  }
  ... // remaining methods omitted (toString() and compareTo())
}

A WorkListener is a listener that the user can register in order to track the status of the work; upon receiving a state-changed event it can take the proper action (such as retrying upon failure).

That’s it; we now have a fully functional single-JVM, multi-threaded implementation of the CommonJ WorkManager specification. Even though this could be useful as-is, there is no way we can scale it out with the needs of the application, since we are bound by the computing power of the single box we are running on. Plus, it is not the topic of this article. So, let’s make it a bit more interesting.

Introducing Terracotta

Terracotta is a product that delivers JVM-level clustering as a runtime infrastructure service. It is Open Source and available under a Mozilla-based license.

Terracotta uses bytecode instrumentation to adapt the target application at class load time. In this phase it extends the application in order to ensure that the semantics of the Java Language Specification (JLS) are correctly maintained across the cluster, including object references, thread coordination, garbage collection etc.

Another important thing to mention is that Terracotta does not use Java Serialization, which means that any POJO can be shared across the cluster. It also means that Terracotta does not send the whole object graph of the POJO state to all nodes; it breaks the graph down into pure data and sends only the actual “delta” over the wire, that is, the actual changes: the data that is “stale” on the other node(s).

Terracotta uses an architecture known as hub-and-spoke, which means that it has one central L2 server and N L1 clients. (The L1 client is the Terracotta client JAR running inside the target JVM, while the L2 server is the central Terracotta server. L1 and L2 refer to first- and second-level caches.)

This might seem strange, since most clustering solutions on the market today use peer-to-peer, but as we will see, hub-and-spoke has some advantages and makes some very interesting optimizations possible. The server plays two different roles: it coordinates threads and locks across the cluster, and it stores the shared object data so that it can be paged in and out of the client nodes on demand.

One way of looking at Terracotta is to see it as Network Attached Memory (NAM). NAM is similar to Network Attached Storage (NAS) in the sense that JVM-level (heap-level) replication makes NAM’s presence transparent, just like NAS can be transparent behind a file I/O API. Getting NAM to perform and scale is similar to any I/O platform: read-ahead buffering, read/write locking optimizations, etc.

Even though Terracotta brings its most immediate value in the clustering (meaning scalability and high availability) of Web and enterprise applications, it is really a platform for solving generic distributed computing and shared memory problems in plain Java code. This makes it applicable to a wide range of problem domains, for example building a POJO-based Data Grid.

Let’s cluster it using Terracotta!

I know what you are thinking:
> “Clustering, hmmm… Now comes the hard part right?”

Well…no.

It turns out that in order to cluster our current WorkManager implementation, all we have to do is create a Terracotta configuration file in which we define three things: the shared root (the object graph to be shared across the cluster), the classes to instrument, and the lock policy:

...
<roots>
  <root>
    <field-name>
      org.terracotta.datagrid.workmanager.singlequeue.SingleWorkQueue.m_workQueue
    </field-name>
  </root>
</roots>
<instrumented-classes>
  <include>
    <class-expression>
      org.terracotta.datagrid.workmanager..*
    </class-expression>
    <honor-transient>true</honor-transient>
  </include>
</instrumented-classes>

<locks>
  <autolock>
    <method-expression>* ...*(..)</method-expression>
    <lock-level>write</lock-level>
  </autolock>
</locks>
...

Done!

Now we have a distributed, multi-JVM CommonJ WorkManager ready for use. But in order to call it a POJO-based Data Grid, we need to extend it a bit and address some challenges that are likely to arise if we were to deploy this implementation into production.

Part 3: Getting serious – Handling real-world challenges

We have now learned the basics of Data Grids, the Master/Worker pattern and what the CommonJ WorkManager specification is all about. We have also walked through how to implement a distributed CommonJ WorkManager by first creating a single-JVM implementation and then clustering it into a distributed multi-JVM implementation with Terracotta.

However, it was a fairly simple and in some sense naïve implementation. This was a good exercise in terms of education and understanding, but in order to use the implementation in the real world we need to know how to address some of the challenges that might come up. We will now discuss some of these challenges and how we can extend the initial implementation to address them.

The challenges that we will look at, one by one, are how to handle:

- very high volumes of data
- different routing schemes
- work failure
- ordering of work
- worker failure

Dealing with very high volumes of data

Our current implementation has one single queue that is shared by the master(s) and all the workers. This usually gives acceptable performance and scalability under a moderate work load. However, if we need to deal with very high volumes of data, it is likely that the single queue will become a bottleneck. So how can we address this in the simplest possible fashion?

Perhaps the simplest solution is to create one queue per worker and have the master do some more or less intelligent load-balancing. If we manage to partition the work and data well, as discussed in the section “How do Data Grids scale?”, then this solution both removes the single-queue bottleneck and lets us exploit Locality of Reference.

If we take a look at the current code, the first thing we need to do is change the SingleWorkQueue class into a WorkQueueManager class and swap the single LinkedBlockingQueue for a ConcurrentHashMap whose entries map a routing ID to a LinkedBlockingQueue.

Swap the wrapped LinkedBlockingQueue:

public class SingleWorkQueue {
  BlockingQueue<WorkItem> m_workQueue = new LinkedBlockingQueue<WorkItem>();
}

To a ConcurrentHashMap of LinkedBlockingQueues:

public class DefaultWorkQueueManager<ID> implements WorkQueueManager {
  Map<ID, BlockingQueue<WorkItem>> m_workQueues =
      new ConcurrentHashMap<ID, BlockingQueue<WorkItem>>();
}
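
The routers that we will look at below need a way to look up, or lazily create, the queue for a given routing ID, via a getOrCreateQueueFor(..) method on the WorkQueueManager. Here is a sketch of how that method might be implemented on top of the ConcurrentHashMap (an assumption for illustration, not necessarily the actual implementation):

public class DefaultWorkQueueManager<ID> implements WorkQueueManager {

  private final ConcurrentMap<ID, BlockingQueue<WorkItem>> m_workQueues =
      new ConcurrentHashMap<ID, BlockingQueue<WorkItem>>();

  public BlockingQueue<WorkItem> getOrCreateQueueFor(final ID routingID) {
    BlockingQueue<WorkItem> queue = m_workQueues.get(routingID);
    if (queue == null) {
      BlockingQueue<WorkItem> newQueue = new LinkedBlockingQueue<WorkItem>();
      // putIfAbsent guarantees that concurrent callers agree on a single queue
      BlockingQueue<WorkItem> existing = m_workQueues.putIfAbsent(routingID, newQueue);
      queue = (existing == null) ? newQueue : existing;
    }
    return queue;
  }

  public Map<ID, BlockingQueue<WorkItem>> getQueues() {
    return m_workQueues;
  }
}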

We also need to change the WorkManager implementation and rewrite the schedule(..) methods to use a Router abstraction instead of putting the work directly onto the queue.

Change the schedule(..) method from:

public class DefaultWorkManager implements WorkManager {

  public WorkItem schedule(final Work work) {
    WorkItem workItem = new DefaultWorkItem(work, null);
    m_queue.put(workItem);
    return workItem;
  }
  ...
}

To redirect to the Router abstraction (more on the Router below):

public class RoutingAwareWorkManager<ID> implements WorkManager {

  public WorkItem schedule(final Work work) {
    return m_router.route(work);
  }
  ...
}

In the last code block we introduced a new abstraction, the Router, which leads us to the topic of how to deal with different routing schemes.

Dealing with different routing schemes

By splitting the single work queue up into multiple queues, each with a unique routing ID, we open up the possibility of providing different routing schemes, customized to address specific use cases.

As in the previous section, we have to make some changes to the initial single-queue implementation. First we need to add a routing ID to the WorkItem abstraction; we do that by adding the generic type ID and letting the WorkItem implement the Routable interface:

public class RoutableWorkItem<ID> extends DefaultWorkItem implements Routable<ID> {

  protected ID m_routingID;

  public ID getRoutingID() {
    return m_routingID;
  }
  ...
}

As we saw in the previous section, we also introduce a new abstraction called Router:


public interface Router<ID> {
  RoutableWorkItem<ID> route(Work work);
  RoutableWorkItem<ID> route(Work work, WorkListener listener);
  RoutableWorkItem<ID> route(RoutableWorkItem<ID> workItem);
}

This abstraction can be used to implement various load-balancing algorithms. In our implementation we provide three default algorithms: round-robin, load-balancing and single queue. You can find them as static inner classes in the Router interface.

Here is an example of the load-balancing router implementation. It takes an array of the registered routing IDs and always sends the next pending work to the shortest queue:

public class LoadBalancingRouter<ID> implements Router<ID> {
    
  private final WorkQueueManager<ID> m_queueManager;

  private static class WorkQueueInfo<ID> {
    public ID routingID;
    public BlockingQueue<RoutableWorkItem<ID>> workQueue;
    public int queueLength = Integer.MAX_VALUE;
  }

  public LoadBalancingRouter(final WorkQueueManager<ID> queueManager, final ID[] routingIDs) {
    m_queueManager = queueManager;
    // create all queues upfront
    for (int i = 0; i < routingIDs.length; i++) {
      m_queueManager.getOrCreateQueueFor(routingIDs[i]);        
    }
  }
    
  public RoutableWorkItem<ID> route(final Work work) {
    return route(work, null);
  }

  public RoutableWorkItem<ID> route(final Work work, final WorkListener listener) {
    WorkQueueInfo<ID> shortestQueue = getShortestWorkQueue();
    RoutableWorkItem<ID> workItem = new RoutableWorkItem<ID>(work, listener, shortestQueue.routingID);
    try {
      shortestQueue.workQueue.put(workItem);
    } catch (InterruptedException e) {
      workItem.setStatus(WorkEvent.WORK_REJECTED, new WorkException(e));
      Thread.currentThread().interrupt();
    }
    return workItem;
  }

  public RoutableWorkItem<ID> route(final RoutableWorkItem<ID> workItem) {
    WorkQueueInfo<ID> shortestQueue = getShortestWorkQueue();
    synchronized (workItem) {
      workItem.setRoutingID(shortestQueue.routingID);        
    }
    try {
      shortestQueue.workQueue.put(workItem);
    } catch (InterruptedException e) {
      workItem.setStatus(WorkEvent.WORK_REJECTED, new WorkException(e));
      Thread.currentThread().interrupt();
    }
    return workItem;
  }

  private WorkQueueInfo<ID> getShortestWorkQueue() {
    WorkQueueInfo<ID> shortestQueue = new WorkQueueInfo<ID>();
    
    int queueLengthForShortestQueue = Integer.MAX_VALUE;
    ID routingIDForShortestQueue = null;
    Map<ID, BlockingQueue<RoutableWorkItem<ID>>> queues = m_queueManager.getQueues();
    for (Map.Entry<ID, BlockingQueue<RoutableWorkItem<ID>>> entry: queues.entrySet()) {
      ID routingID = entry.getKey();
      BlockingQueue<RoutableWorkItem<ID>> queue = entry.getValue();
      int queueSize = queue.size();
      if (queueSize <= queueLengthForShortestQueue) {
        queueLengthForShortestQueue = queueSize;
        routingIDForShortestQueue = routingID;
      } 
    }
    shortestQueue.workQueue = m_queueManager.getOrCreateQueueFor(routingIDForShortestQueue);         
    shortestQueue.routingID = routingIDForShortestQueue;
    return shortestQueue;
  }
}
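
For comparison, here is a sketch of what the round-robin router might look like (the RoundRobinRouter that ships as a static inner class of Router may differ; this standalone version, which uses java.util.concurrent.atomic.AtomicInteger, is an assumption for illustration):

public class RoundRobinRouter<ID> implements Router<ID> {

  private final WorkQueueManager<ID> m_queueManager;
  private final ID[] m_routingIDs;
  private final AtomicInteger m_counter = new AtomicInteger(0);

  public RoundRobinRouter(final WorkQueueManager<ID> queueManager, final ID[] routingIDs) {
    m_queueManager = queueManager;
    m_routingIDs = routingIDs;
    for (int i = 0; i < routingIDs.length; i++) {
      m_queueManager.getOrCreateQueueFor(routingIDs[i]); // create all queues upfront
    }
  }

  public RoutableWorkItem<ID> route(final Work work) {
    return route(work, null);
  }

  public RoutableWorkItem<ID> route(final Work work, final WorkListener listener) {
    ID routingID = nextRoutingID();
    return enqueue(new RoutableWorkItem<ID>(work, listener, routingID), routingID);
  }

  public RoutableWorkItem<ID> route(final RoutableWorkItem<ID> workItem) {
    ID routingID = nextRoutingID();
    synchronized (workItem) {
      workItem.setRoutingID(routingID);
    }
    return enqueue(workItem, routingID);
  }

  private ID nextRoutingID() {
    // mod first, then abs: stays within bounds even if the counter wraps around
    return m_routingIDs[Math.abs(m_counter.getAndIncrement() % m_routingIDs.length)];
  }

  private RoutableWorkItem<ID> enqueue(
      final RoutableWorkItem<ID> workItem, final ID routingID) {
    try {
      m_queueManager.getOrCreateQueueFor(routingID).put(workItem);
    } catch (InterruptedException e) {
      workItem.setStatus(WorkEvent.WORK_REJECTED, new WorkException(e));
      Thread.currentThread().interrupt();
    }
    return workItem;
  }
}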

Dealing with work failure

As we discussed earlier in this article, the CommonJ WorkManager specification provides APIs for event-based failure reporting and tracking of work status. Each Work instance is wrapped in a WorkItem instance, which provides status information. It also gives us the possibility of registering an optional WorkListener through which we get callback events whenever the status of the work has changed.

The WorkListener interface looks like this:

public interface WorkListener {
  void workAccepted(WorkEvent we);
  void workRejected(WorkEvent we);
  void workStarted(WorkEvent we);
  void workCompleted(WorkEvent we);
}

As you can see, we can implement callback methods that subscribe to events triggered by work being accepted, rejected, started and completed. In this particular case we are mainly interested in doing something when we receive the work-rejected event. To do that, we simply create an implementation of the WorkListener interface and add some code to the workRejected method:

public class RetryingWorkListener implements WorkListener {

  public void workRejected(WorkEvent we) {
    WorkException cause = we.getException();
    WorkItem wi = we.getWorkItem();
    Work work = wi.getResult(); // the rejected work

    ... // reroute the work onto queue X
  }

  public void workAccepted(WorkEvent event) {}
  public void workCompleted(WorkEvent event) {}
  public void workStarted(WorkEvent event) {}
}

Dealing with ordering of work

In some situations it might be important to define and maintain an ordering of the work. This is not much of a problem as long as we have a single instance of the master, but imagine scaling out the master; then it immediately gets harder.

So, how can we maintain ordering of the work? Since the internal implementation is based on POJOs and everything is open and customizable, it is actually very easy to implement support for ordering. All we need to do is swap our map of LinkedBlockingQueues for a map of PriorityBlockingQueues.

Then you let your Work implement Comparable and create a custom Comparator that you pass into the constructor of the PriorityBlockingQueue:

Comparator<RoutableWorkItem<ID>> c = new Comparator<RoutableWorkItem<ID>>() {
    public int compare(
        RoutableWorkItem<ID> workItem1,
        RoutableWorkItem<ID> workItem2) {
      Comparable work1 =
         (Comparable) workItem1.getResult();
      Comparable work2 =
         (Comparable) workItem2.getResult();
      return work1.compareTo(work2);
    }};
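
The comparator is then handed to each queue at construction time, for example like this (the initial capacity of 11 is simply the default that this constructor requires):

BlockingQueue<RoutableWorkItem<ID>> queue =
    new PriorityBlockingQueue<RoutableWorkItem<ID>>(11, c);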

If you need to maintain ordering of the results, then you have to do something similar with the references to the WorkItems that you get back when invoking schedule(..) on the WorkManager instance.

Dealing with worker failure

In a reliable distributed system it is important to be able to detect when a worker goes down. (To simplify the discussion, let’s define a worker as being a JVM.) There are several ways this could be implemented; here we will look at one strategy that builds on clustered locking. When a worker starts up, it grabs a lock on a shared object and holds on to it for as long as it is running:

synchronized (workerLock) {
  while (isRunning) {} // hold the lock until the worker is shut down or dies
}

Then the worker connects to the master, which spawns a thread that tries to take the exact same lock. Here comes the trick: the master thread will block until the lock is released, something that will only happen when the worker dies or shuts down:


synchronized (workerLock) { // will block here until the worker dies
  ... // worker dead - take proper action
}
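
Putting the two halves together, here is a sketch of what this strategy could look like in one class (the class and method names are hypothetical; the workerLock object is assumed to be declared as a Terracotta shared root so that the synchronized blocks become cluster-wide locks):

public class WorkerAlivenessDetector {

  private final Object m_workerLock; // assumed Terracotta shared root, one per worker
  private volatile boolean m_isRunning = true;

  public WorkerAlivenessDetector(final Object workerLock) {
    m_workerLock = workerLock;
  }

  // runs on the worker: grab the lock at startup and hold it while alive
  public void holdLockWhileAlive() {
    synchronized (m_workerLock) {
      while (m_isRunning) {
        try {
          Thread.sleep(1000); // sleep does NOT release the monitor (unlike wait())
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
          return; // leaving the block releases the lock, signaling "worker gone"
        }
      }
    }
  }

  // runs on the master, in a thread of its own: blocks until the worker dies
  public void awaitWorkerDeath(final Runnable onWorkerDeath) {
    synchronized (m_workerLock) { // blocks until the worker releases the lock
      onWorkerDeath.run(); // worker dead - e.g. reroute its non-completed work
    }
  }

  public void shutdown() {
    m_isRunning = false; // lets the worker exit its loop and release the lock
  }
}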

Whichever strategy we choose, we need to take proper action once worker failure has been detected. In our case that would (among other things) mean rerouting all non-completed work to another queue.

Rewrite the Terracotta config

As you perhaps remember from the section “Dealing with very high volumes of data”, in order to make the implementation more scalable we changed the SingleWorkQueue class into a WorkQueueManager class and swapped the single LinkedBlockingQueue for a ConcurrentHashMap mapping routing IDs to LinkedBlockingQueues. When we did that, we also changed the name of the field and the name of the class holding it, and this needs to be reflected in the Terracotta configuration file:

...
<roots>
  <root>
    <field-name>org.terracotta.datagrid.workmanager.routing.DefaultWorkQueueManager.m_workQueues</field-name>
  </root>
</roots>
...

How to use the POJO Grid?

Here is an example of the usage of the WorkManager, WorkQueueManager, Router and WorkListener abstractions:

// Create the work queue manager with 'String' as routing ID type. The DefaultWorkQueueManager 
// is usually sufficient, but you can easily provide your own implementation of the 
// WorkQueueManager interface
WorkQueueManager workQueueManager = new DefaultWorkQueueManager();

// create the router - there are a bunch of default routers in the Router interface, 
// such as SingleQueueRouter, RoundRobinRouter and LoadBalancingRouter, but
// it is probably here that you need to plug in your custom implementation
Router router = new Router.LoadBalancingRouter(workQueueManager);   
    
// create the work manager - this implementation is very generic and most likely enough
// for your needs
WorkManager workManager = new RoutingAwareWorkManager(router);

// optionally (null is ok) create a work listener that will get a call back each time 
// the status of a WorkEvent (Work) has changed - allows you to for example take proper 
// action upon failure and retry etc. 
WorkListener workListener = null;

Set workSet = ... // create a set with the work to be done

// loop over the work set
for (Work work: workSet) {
  RoutableWorkItem workItem; // the work item, wrapping the work (result) and holding its status
  try {
    // schedule the work, e.g. add it to the work queue and get the work item in return that can
    // be used to keep track of the pending work
    workItem = workManager.schedule(work, workListener); 
  } catch (WorkException e) {
    // handle failure
  }
}

To start up a Worker, you simply create an instance of the Worker, pass in a reference to the DefaultWorkQueueManager and the routing ID to use, and finally invoke start():

try {
  // create a work queue manager with 'String' as routing ID type
  WorkQueueManager workQueueManager = new DefaultWorkQueueManager();
     
  // create and start up a Worker implementation - the provided RoutingAwareWorker 
  // implementation is usually enough
  // we pass in the work queue manager and the routing ID for this worker
  Worker worker = new RoutingAwareWorker(workQueueManager, "ROUTING_ID_1");
  worker.start();
  
} catch (WorkException e) {
  // handle failure
}

Don’t we need a result queue?

There is no need for a result queue, since the Master is holding on to the WorkItem, i.e. the pending work (the work that is in progress), including its result. Terracotta maintains Java’s pass-by-reference semantics, so the regular (local) reference to the WorkItem that the Master holds will work transparently across the cluster. This means that a Worker can update the exact same WorkItem and the Master will know about it immediately.

Enabling Terracotta

The typical usage would roughly be to start up one WorkManager and N Workers, each on a different JVM. Before you start up the WorkManager and the Workers, you have to enable the Terracotta runtime.

There are two ways you can do this:

Use Terracotta wrapper script

Terracotta ships with a script that can be used as a drop-in replacement for the regular java command. The script is called dso-java and can be found in the bin directory of the distribution; to use it, simply substitute dso-java for java when starting up the Master and the Workers.
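
For example (the main class name here is just a placeholder):

dso-java -Dtc.config=path/to/your/tc-config.xml com.example.MasterMain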

Use JVM options

You can also enable the Terracotta runtime yourself by passing additional JVM options when starting your application.

Here is an example:

java -Xbootclasspath/p:<path to terracotta boot jar> \
     -Dtc.config=path/to/your/tc-config.xml \
     -Dtc.install-root=<path to terracotta install dir> \
      ... <your regular options> ...

Run it

Now we are almost done, but before you spawn the Master and the Workers, we must first start the Terracotta server. This is done by invoking the start-tc-server script in the dso/bin directory. After that, you can start up the Master and the Workers in any order you like.

That is all there is to it.

Benefits of implementing a POJO-based Data Grid

So let’s wrap up with a brief look at the most immediate benefits of building and using a POJO-based Data Grid: you get to work with plain POJOs and standard Java APIs, the clustering is declarative and transparent (no custom distribution code to write and maintain), and the grid gives you scalability and high availability through partitioned work queues and clustered shared state.

Jonas Bonér 29 January 2007