Jonas Bonér bio photo

Jonas Bonér

Present.
Entrepreneur.
Hacker.
Public Speaker.
Powder Skier.
Perpetual Learner.
Jazz Fanatic.
Wannabee Musician.

Twitter LinkedIn Github
In this article I will show you how to implement a distributed version of the CommonJ WorkManager specification using Terracotta for Spring. This article is a variation of an article that I wrote for TheServerSide.com titled Distributed Computing Made Easy, an article that can be found here.

What is CommonJ WorkManager?

CommonJ is a BEA and IBM joint specification that provides a standard for executing concurrent tasks in a JEE environment. It for example has support for the Master/Worker pattern in its WorkManager API. From BEA's documentation about the specification:
"The Work Manager provides a simple API for application-server-supported concurrent execution of work items. This enables J2EE-based applications (including Servlets and EJBs) to schedule work items for concurrent execution, which will provide greater throughput and increased response time. After an application submits work items to a Work Manager for concurrent execution, the application can gather the results. The Work Manager provides common "join" operations, such as waiting for any or all work items to complete. The Work Manager for Application Servers specification provides an application-server-supported alternative to using lower-level threading APIs, which are inappropriate for use in managed environments such as Servlets and EJBs, as well as being too difficult to use for most applications."
What we are going to do is to first implement a the specification as a regular single node multi-threaded application, based on the Master/Worker pattern. We are also going to use the Spring Framework and implement the Master, Worker and Shared Queue entities as three different Spring beans; MyWorkManager, Worker and WorkQueue. We will then use Terracotta for Spring* to transparently and declaratively, turn this implementation into a multi-node, distributed WorkManager.

Master (WorkManager)

MyWorkManager bean implements the CommonJ WorkManager interface which has the API that the user uses to schedule Work and wait for all Work to be completed. The MyWorkManager bean does not have any state, and can therefore be configured as a Prototype in the Spring bean config XML file. Here is how we could implement the work manager bean:
public class MyWorkManager implements WorkManager {

  // The Work Queue bean, is injected by Spring

  private final WorkQueue m_queue;

  public MyWorkManager(WorkQueue queue) {
    m_queue = queue;
  }

  public WorkItem schedule(final Work work) throws WorkException {
    WorkItem workItem = new MyWorkItem(work, null);
    m_queue.addWork(workItem);
    return workItem;
  }

  public WorkItem schedule(Work work, WorkListener listener)
    throws WorkException {
    WorkItem workItem = new MyWorkItem(work, listener);
    m_queue.addWork(workItem); // adds work to the shared queue

    return workItem;
  }

  public boolean waitForAll(Collection workItems, long timeout) {
    long start = System.currentTimeMillis();
    do {
      boolean isAllCompleted = true;
      for (Iterator it = workItems.iterator();
           it.hasNext() && isAllCompleted;) {
        int status = ((WorkItem) it.next()).getStatus();
        isAllCompleted =
            status == WorkEvent.WORK_COMPLETED ||
            status == WorkEvent.WORK_REJECTED;
      }
      if (isAllCompleted) { return true; }
      if (timeout == IMMEDIATE) { return false; }
      if (timeout == INDEFINITE) { continue; }
    } while ((System.currentTimeMillis() - start) < timeout);
    return false;
  }

  public Collection waitForAny(Collection workItems, long timeout) {
    long start = System.currentTimeMillis();
    do {
      synchronized (this) {
        Collection completed = new ArrayList();
        for (Iterator it = workItems.iterator(); it.hasNext();) {
          WorkItem workItem = (WorkItem) it.next();
          if (workItem.getStatus() == WorkEvent.WORK_COMPLETED ||
              workItem.getStatus() == WorkEvent.WORK_REJECTED) {
            completed.add(workItem);
          }
        }
        if (!completed.isEmpty()) { return completed; }
      }
      if (timeout == IMMEDIATE) { return Collections.EMPTY_LIST; }
      if (timeout == INDEFINITE) { continue; }
    } while ((System.currentTimeMillis() - start) < timeout);
    return Collections.EMPTY_LIST;
  }
}

Shared Queue

The MyWorkManager bean schedules work by adding work to the WorkQueue bean, which is a simple wrapper around a java.util.concurrent.BlockingQueue queue. The WorkQueue bean is the bean that has state, since it holds the queue with all the pending Work. We need to have a single instance of this queue that can be available to all workers, and we therefore define it as Singleton in the bean config XML file. The work queue can be implemented like this:
public class WorkQueue {
  private final BlockingQueue m_workQueue;

  public WorkQueue() {
    m_workQueue = new LinkedBlockingQueue();
  }

  public WorkQueue(int capacity) {
    m_workQueue = new LinkedBlockingQueue(capacity);
  }

  public MyWorkItem getWork() throws WorkException {
    try {
      return (MyWorkItem) m_workQueue.take(); // blocks if empty

    } catch (InterruptedException e) {
      throw new WorkException(e);
    }
  }

  public void addWork(WorkItem workItem) throws WorkException {
    try {
      m_workQueue.put(workItem);
    } catch (InterruptedException e) {
      WorkRejectedException we =
          new WorkRejectedException(e.getMessage());
      ((MyWorkItem)workItem).setStatus(WorkEvent.WORK_REJECTED, we);
      throw we;
    }
  }
}

Worker

Finally, we have the Worker bean. This bean uses a thread pool to spawn up N number of worker threads that continuously grabs and executes Work from the WorkQueue. During the processing of the Work, its status flag is maintained (can be one of either Accepted, Started, Completed or Rejected), this is needed in order for the MyWorkManager bean to be able to continuously monitor the status of the Work it has scheduled. The Worker bean does not have any shared state and is configured as a Prototype in the bean config XML file. This is what a worker bean implementation can look like. As you can see we choose to make use of the Executor thread pool implementation in the java.util.concurrent package:
public class Worker {

  private transient final WorkQueue  m_queue;
  private transient final ExecutorService m_threadPool =
      Executors.newCachedThreadPool();
  private volatile boolean m_isRunning  = true;

  public Worker(WorkQueue queue) {
    m_queue = queue;
  }

  public void start() throws WorkException {
    while (m_isRunning) {
      final MyWorkItem workItem = m_queue.getWork();
      m_threadPool.execute(new Runnable() {
        public void run() {
          try {
            Work work = workItem.getResult();
            workItem.setStatus(WorkEvent.WORK_STARTED, null);
            work.run();
            workItem.setStatus(WorkEvent.WORK_COMPLETED, null);
          } catch (Throwable e) {
            workItem.setStatus(
                WorkEvent.WORK_REJECTED,
                new WorkRejectedException(e.getMessage()));
          }
        });
      }
    }
  }
}

Assembly

These three beans can now be wired up by the Spring bean config file:
&lt;beans&gt;
  &lt;!-- workManager is prototype - not shared --&gt;
  &lt;bean id="workManager"
        class="com.jonasboner.commonj.workmanager.MyWorkManager"
        singleton="false"&gt;
    &lt;constructor-arg ref="queue"/&gt;
  &lt;/bean&gt;

   &lt;!-- worker is prototype - not shared --&gt;
  &lt;bean id="worker"
        class="com.jonasboner.commonj.workmanager.Worker"
        singleton="false"&gt;
    &lt;constructor-arg ref="queue"/&gt;
  &lt;/bean&gt;

  &lt;!-- the work queue is singleton - can be made shared by Terracotta --&gt;
  &lt;bean id="queue"
        class="com.jonasboner.commonj.workmanager.WorkQueue"/&gt;

&lt;/beans&gt;
We now have a fully functional local, multi-threaded, implementation of the CommonJ WorkManager specification.

Making the WorkManager distributed

Now comes the hard part right? Well...no. It turns out that in order to turn this implementation into a distributed WorkManager, all we have to do is to create a Terracotta configuration file in which we declare the Spring beans that we want to share across the cluster:
...
&lt;spring&gt;
  &lt;jee-application name="webAppName"&gt;
    &lt;application-contexts&gt;
      &lt;application-context&gt;
        &lt;paths&gt;
          &lt;path&gt;*/work-manager.xml&lt;/path&gt;
        &lt;/paths&gt;
        &lt;beans&gt;
          &lt;bean name="queue"/&gt;
        &lt;/beans&gt;
      &lt;/application-context&gt;
    &lt;/application-contexts&gt;
  &lt;/jee-application&gt;
&lt;/spring&gt;
...
Done! Now we have a fully distributed, multi-JVM CommonJ WorkManager.

Client usage

Using the distributed work manager is now simply a matter of getting the bean from the application context and invoke schedule(..):
ApplicationContext ctx =
    new ClassPathXmlApplicationContext("*/work-manager.xml");

// get the work manager from the application context

WorkManager workManager = (WorkManager) ctx.getBean("workManager");

Set pendingWork = new HashSet();
for (int i = 0; i < nrOfWork; i++) {

  // schedule work

  WorkItem workItem = workManager.schedule(new Work() {
      public void run() {
        ... // do work

      }
  });

  // collect the pending work

  pendingWork.add(workItem);
}

// wait for all work to be completed

workManager.waitForAll(pendingWork, WorkManager.INDEFINITE);
To start up a Worker you simply have to get the Worker bean from the application context and invoke start():
ApplicationContext ctx =
    new ClassPathXmlApplicationContext("*/work-manager.xml");

// get the worker from the application context

Worker worker = (Worker) ctx.getBean("worker");

// starting worker

worker.start();
The usage of the distributed version would roughly be to start up one WorkManager bean and N number of Worker beans, each one on a different JVM. That is all there is to it. Now we have a simple, distributed, reliable, high-performant and scalable CommonJ WorkManager ready for use. Enjoy. * RC 1 of Terracotta for Spring was released some days ago (9/12/2006) and is free for production use for up to two nodes.