Jonas Bonér

12 Feb 2009

Introduction

There have been some discussions lately about Event Sourcing. For example, Greg Young recently discussed how they were using Event Sourcing and explicit state transitions together with Domain-Driven Design (DDD) to build a highly scalable and loosely coupled system.

So what is Event Sourcing? Martin Fowler wrote an excellent article about it some years ago and there is no use repeating it here, so please read (or at least skim) that article before reading further.

In this article I will show how you can implement Event Sourcing using asynchronous message-passing based on actors. Actors are generally an excellent paradigm for implementing asynchronous event-based systems, and they make it easy to get explicit state transitions working nicely together with an immutable domain model. This gives a concurrent system that scales very well, with the side-effect/feature of Eventual Consistency.

Domain model

I will reuse the example Martin Fowler used in his article but rewrite it using Scala Actors. So without further ado let’s start hacking. Martin’s example implements a simple Ship management system.

First, let’s define the simplistic domain model: Ship, Port and Country.

The Ship class is worth discussing a bit. It is an actor, which means that it is an isolated ‘lightweight process’ with its own state, which is only accessible and modifiable using messages (in our case, events). The Ship actor responds to four different events: arrival, departure, a query for the current port and, finally, a reset of its state.

import scala.actors.Actor

class Ship(val name: String, val home: Port) extends Actor {

  def act = loop(home)

  private def loop(current: Port) {
    react {
      case ArrivalEvent(time, port, _) =>
        println(toString + " ARRIVED  at port   " + port + " @ " + time)
        loop(port)

      case DepartureEvent(time, port, _) =>
        println(toString + " DEPARTED from port " + port  + " @ " + time)
        loop(Port.AT_SEA)

      case Reset =>
        println(toString + " has been reset")
        loop(home)

      case CurrentPort =>
        reply(current)
        loop(current)

      case unknown =>
        error("Unknown event: " + unknown)
    }
  }

  override def toString = "Ship(" + name + ")"
}

class Port(val city: String, val country: Country) {
  override def toString = "Port(" + city + ")"
}
object Port {
  val AT_SEA = new Port("AT SEA", Country.AT_SEA)
}

case class Country(val name: String)
object Country {
  val US = new Country("US")
  val CANADA = new Country("CANADA")
  val AT_SEA = new Country("AT_SEA")
}

Note: In this example the actors (Ship here, and the EventProcessor defined further down) manage their state by passing it along in the recursive ‘loop’ call, a form of stack confinement. This is a slick technique, but it is not possible if you need to persist the state in some way, for example using something like Terracotta or storing it in a database. In that case you would have to put the state in private field(s) in the actor, which affects neither correctness nor performance.
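To make the difference between the two styles concrete, here is a minimal, actor-free sketch. Everything in it is an assumption made for illustration: the names `Msg`, `Arrive`, `Depart`, `StateStyles` and `ShipState` are hypothetical, ports are plain strings, and a `List` stands in for the actor's mailbox.

```scala
import scala.annotation.tailrec

// Hypothetical stand-ins for the messages a Ship receives (not the article's events).
sealed trait Msg
final case class Arrive(port: String) extends Msg
case object Depart extends Msg

object StateStyles {
  // Style 1: the state exists only as the argument of a recursive loop,
  // so nothing outside the loop can observe or persist it.
  @tailrec
  def run(current: String, inbox: List[Msg]): String = inbox match {
    case Nil                  => current
    case Arrive(port) :: rest => run(port, rest)
    case Depart :: rest       => run("AT SEA", rest)
  }

  // Style 2: the state lives in a private field, which gives a persistence
  // layer something to read from and write to.
  final class ShipState(home: String) {
    private var current: String = home

    def handle(msg: Msg): Unit = msg match {
      case Arrive(port) => current = port
      case Depart       => current = "AT SEA"
    }

    def currentPort: String = current
  }
}
```

Both styles end up in the same place for the same sequence of messages; the only difference is where the current port is kept between messages.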

Events

Now let’s define our events, implementing the explicit state transitions DepartureEvent and ArrivalEvent. In Scala these are best defined as ‘case classes’, which support pattern matching and attribute destructuring. These two events encapsulate their state transition in the ‘process’ method. We also define one event for asking the Ship for its current port and one for resetting its state to its “home” port.

import java.util.Date

sealed abstract case class Event

abstract case class StateTransitionEvent(val occurred: Date)
  extends Event {
  val recorded = new Date
  def process: Unit
}

case class DepartureEvent(val time: Date, val port: Port, val ship: Ship)
  extends StateTransitionEvent(time) {
  override def process = ship ! this
}

case class ArrivalEvent(val time: Date, val port: Port, val ship: Ship)
  extends StateTransitionEvent(time) {
  override def process = ship ! this
}

case object Reset extends Event

case object CurrentPort extends Event
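For readers new to case classes, here is a small sketch of what pattern matching and destructuring look like in isolation. It is not the code above: it assumes a `sealed trait` instead of an abstract case class, plain strings for ports and `java.time.LocalDate` for dates, and the names `ShipEvent`, `Departed`, `Arrived` and `Describe` are made up for the example.

```scala
import java.time.LocalDate

// Hypothetical event hierarchy; sealed, so the compiler knows every subtype.
sealed trait ShipEvent { def occurred: LocalDate }
final case class Departed(occurred: LocalDate, port: String) extends ShipEvent
final case class Arrived(occurred: LocalDate, port: String) extends ShipEvent

object Describe {
  // Each case both selects the event type and pulls its fields apart.
  def describe(event: ShipEvent): String = event match {
    case Departed(when, port) => s"DEPARTED from $port @ $when"
    case Arrived(when, port)  => s"ARRIVED at $port @ $when"
  }
}
```

Because the hierarchy is sealed, the compiler can warn when a match forgets one of the event types.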

Event processor

Finally, let’s define the event processor. This class is an actor that responds to any event that is a subtype of StateTransitionEvent, i.e. either DepartureEvent or ArrivalEvent. It also holds a history list (‘log’) of all the events it has processed, which we will make use of later on.

class EventProcessor extends Actor {
  def act = loop(Nil)

  private def loop(log: List[StateTransitionEvent]) {
    react {
      case event: StateTransitionEvent =>
        event.process
        loop(event :: log)

      case unknown =>
        error("Unknown event: " + unknown)
    }
  }
}

Test run 1

Now we have the basis for our Ship Management Event Sourcing framework. Let’s create some tests to drive the thing. Since each event submission is processed asynchronously we have to interleave them with calls to ‘Thread.sleep(500)’ in order to see what is going on.

class ActorBasedEventSourcingTest {

  private var shipKR: Ship = _
  private var portSFO, portLA, portYYV: Port = _
  private var processor: EventProcessor = _

  def setup = {
    processor = new EventProcessor
    processor.start

    portSFO = new Port("San Francisco", Country.US)
    portLA = new Port("Los Angeles", Country.US)
    portYYV = new Port("Vancouver", Country.CANADA)

    shipKR = new Ship("King Roy", portYYV)
    shipKR.start

    this
  }

  def tearDown = {
    processor.exit
    this
  }

  def arrivalSetsShipsLocation = {
    println("\n===> arrivalSetsShipsLocation")

    processor ! DepartureEvent(new Date(2009, 2, 1), portSFO, shipKR)
    Thread.sleep(500)

    processor ! ArrivalEvent(new Date(2009, 2, 3), portSFO, shipKR)
    Thread.sleep(500)

    assert(portSFO == (shipKR !? CurrentPort))
    this
  }

  def departurePutsShipOutToSea = {
    println("\n===> departurePutsShipOutToSea")

    processor ! DepartureEvent(new Date(2009, 2, 4), portLA, shipKR)
    Thread.sleep(500)

    assert(Port.AT_SEA == (shipKR !? CurrentPort))
    this
  }

  def smallTrip = {
    println("\n===> smallTrip")

    processor ! ArrivalEvent(new Date(2009, 2, 5), portLA, shipKR)
    Thread.sleep(500)

    processor ! DepartureEvent(new Date(2009, 2, 6), portYYV, shipKR)
    Thread.sleep(500)

    processor ! ArrivalEvent(new Date(2009, 2, 8), portYYV, shipKR)
    Thread.sleep(500)

    processor ! DepartureEvent(new Date(2009, 2, 9), portSFO, shipKR)
    Thread.sleep(500)

    processor ! ArrivalEvent(new Date(2009, 2, 11), portSFO, shipKR)
    Thread.sleep(500)

    assert(portSFO == (shipKR !? CurrentPort))
    this
  }
}

(new ActorBasedEventSourcingTest)
  .setup
  .arrivalSetsShipsLocation
  .departurePutsShipOutToSea
  .smallTrip
  .tearDown

Which gives us the following output:

===> arrivalSetsShipsLocation
Ship(King Roy) DEPARTED from port Port(San Francisco) @ Mon Mar 01 00:00:00 CET 3909
Ship(King Roy) ARRIVED  at port   Port(San Francisco) @ Wed Mar 03 00:00:00 CET 3909

===> departurePutsShipOutToSea
Ship(King Roy) DEPARTED from port Port(Los Angeles) @ Thu Mar 04 00:00:00 CET 3909

===> smallTrip
Ship(King Roy) ARRIVED  at port   Port(Los Angeles) @ Fri Mar 05 00:00:00 CET 3909
Ship(King Roy) DEPARTED from port Port(Vancouver) @ Sat Mar 06 00:00:00 CET 3909
Ship(King Roy) ARRIVED  at port   Port(Vancouver) @ Mon Mar 08 00:00:00 CET 3909
Ship(King Roy) DEPARTED from port Port(San Francisco) @ Tue Mar 09 00:00:00 CET 3909
Ship(King Roy) ARRIVED  at port   Port(San Francisco) @ Thu Mar 11 00:00:00 CET 3909

Pretty nice.
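One detail in the output deserves a comment: the dates print as March 3909 rather than February 2009. That comes from the deprecated `java.util.Date(year, month, day)` constructor used in the tests, which takes the year as an offset from 1900 and a zero-based month. The sketch below shows the effect and one possible way to build the intended date; the `DateCheck` object and its helper names are made up for the example.

```scala
import java.util.{Calendar, Date, GregorianCalendar}

object DateCheck {
  // Reads one calendar field of a Date in the default time zone.
  private def field(d: Date, calendarField: Int): Int = {
    val cal = Calendar.getInstance()
    cal.setTime(d)
    cal.get(calendarField)
  }

  def yearOf(d: Date): Int = field(d, Calendar.YEAR)
  def monthOf(d: Date): Int = field(d, Calendar.MONTH)

  // As written in the tests: year = 1900 + 2009, month index 2 = March.
  val asWritten: Date = new Date(2009, 2, 1)

  // One way to express 1 February 2009 without the deprecated constructor.
  val intended: Date = new GregorianCalendar(2009, Calendar.FEBRUARY, 1).getTime
}
```

Since every test date is shifted in the same way, the ordering of the events and the assertions are unaffected.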

But now, let’s start to take advantage of the event persistence. Let’s implement event replay.

Replay

Implementing replay is actually very simple now that we have an event log. First we define a Replay event.

case object Replay extends Event

Then we need the EventProcessor to respond to this new event by first reversing the event log (each event is prepended to the list, so the log is stored newest-first) and then invoking ‘process’ on each event.

class EventProcessor extends Actor {
  def act = loop(Nil)

  private def loop(log: List[StateTransitionEvent]) {
    react {
       ...

      case Replay =>
        log.reverse.foreach(_.process)
        loop(log)
    }
  }
}

Done deal.
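The reason the reversal matters is easier to see without the actors. The following sketch treats replay as a plain fold over the log; it is an illustration under assumptions, not the code above: `Transition`, `Arrival`, `Departure` and `ReplaySketch` are hypothetical names, and ports are plain strings.

```scala
// Hypothetical, actor-free versions of the two state transitions.
sealed trait Transition
final case class Arrival(port: String) extends Transition
final case class Departure(port: String) extends Transition

object ReplaySketch {
  val AtSea = "AT SEA"

  // Applies a single state transition to the ship's current port.
  def applyEvent(current: String, event: Transition): String = event match {
    case Arrival(port) => port
    case Departure(_)  => AtSea
  }

  // The log is newest-first because events are prepended to it,
  // so it is reversed before being folded from the home port.
  def replay(home: String, log: List[Transition]): String =
    log.reverse.foldLeft(home)(applyEvent)
}
```

Folding the log without reversing it would apply the newest event first and the oldest last, so the ship would end up wherever its very first event left it.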

Test run 2

Let’s try it out by adding a new test method to our suite. Here we make use of the Reset event which resets the ship to its initial state before replaying all state transitions.

def resetAndReplayEventLog = {
  println("\n===> resetAndReplayEventLog")

  shipKR ! Reset

  processor ! Replay
  Thread.sleep(500)

  assert(portSFO == (shipKR !? CurrentPort))
  this
}

(new ActorBasedEventSourcingTest)
  .setup
  .arrivalSetsShipsLocation
  .departurePutsShipOutToSea
  .smallTrip
  .resetAndReplayEventLog // new test method
  .tearDown

This yields the following output:

===> arrivalSetsShipsLocation
Ship(King Roy) DEPARTED from port Port(San Francisco) @ Mon Mar 01 00:00:00 CET 3909
Ship(King Roy) ARRIVED  at port   Port(San Francisco) @ Wed Mar 03 00:00:00 CET 3909

===> departurePutsShipOutToSea
Ship(King Roy) DEPARTED from port Port(Los Angeles) @ Thu Mar 04 00:00:00 CET 3909

===> smallTrip
Ship(King Roy) ARRIVED  at port   Port(Los Angeles) @ Fri Mar 05 00:00:00 CET 3909
Ship(King Roy) DEPARTED from port Port(Vancouver) @ Sat Mar 06 00:00:00 CET 3909
Ship(King Roy) ARRIVED  at port   Port(Vancouver) @ Mon Mar 08 00:00:00 CET 3909
Ship(King Roy) DEPARTED from port Port(San Francisco) @ Tue Mar 09 00:00:00 CET 3909
Ship(King Roy) ARRIVED  at port   Port(San Francisco) @ Thu Mar 11 00:00:00 CET 3909

===> resetAndReplayEventLog
Ship(King Roy) has been reset
Ship(King Roy) DEPARTED from port Port(San Francisco) @ Mon Mar 01 00:00:00 CET 3909
Ship(King Roy) ARRIVED  at port   Port(San Francisco) @ Wed Mar 03 00:00:00 CET 3909
Ship(King Roy) DEPARTED from port Port(Los Angeles) @ Thu Mar 04 00:00:00 CET 3909
Ship(King Roy) ARRIVED  at port   Port(Los Angeles) @ Fri Mar 05 00:00:00 CET 3909
Ship(King Roy) DEPARTED from port Port(Vancouver) @ Sat Mar 06 00:00:00 CET 3909
Ship(King Roy) ARRIVED  at port   Port(Vancouver) @ Mon Mar 08 00:00:00 CET 3909
Ship(King Roy) DEPARTED from port Port(San Francisco) @ Tue Mar 09 00:00:00 CET 3909
Ship(King Roy) ARRIVED  at port   Port(San Francisco) @ Thu Mar 11 00:00:00 CET 3909

Replay up to a specific point in time

Finally, (my last example, I promise) let’s add the possibility of replaying the event log up to a specific date to get a snapshot of the system’s state at a particular point in time.

You know the drill by now: first define a new event, ReplayUpTo, holding the date.

case class ReplayUpTo(date: Date) extends Event

Here the event processor first reverses the log, then applies a filter that removes all events that occurred after the specified date, and finally runs ‘process’ on every event in the resulting filtered list.

class EventProcessor extends Actor {
  def act = loop(Nil)

  private def loop(log: List[StateTransitionEvent]) {
    react {
       ...

      case ReplayUpTo(date) =>
        log.reverse.filter(_.occurred.getTime <= date.getTime).foreach(_.process)
        loop(log)
    }
  }
}
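Note that the filter compares the ‘occurred’ date and that the comparison is inclusive, so events dated exactly on the cutoff are still replayed. Here is a small stand-alone sketch of just that selection step, assuming `java.time.LocalDate` instead of `Date`; the `Logged` class and the `ReplayUpToSketch` object are hypothetical and exist only for this example.

```scala
import java.time.LocalDate

// Hypothetical log entry: when the event occurred plus a label to print.
final case class Logged(occurred: LocalDate, description: String)

object ReplayUpToSketch {
  // Takes a newest-first log and returns, oldest first, the events that
  // occurred on or before the cutoff date.
  def upTo(cutoff: LocalDate, log: List[Logged]): List[Logged] =
    log.reverse.filter(e => !e.occurred.isAfter(cutoff))
}
```

Replaying the returned list in order then rebuilds the state as it was at the end of the cutoff date.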

Test run 3

So we add a last test method to our suite, one that replays all events created in earlier tests up to the date ‘2009/2/4’.

def resetAndReplayEventLogUpToDate = {
  println("\n===> resetAndReplayEventLogUpToDate")

  shipKR ! Reset

  processor ! ReplayUpTo(new Date(2009, 2, 4))
  Thread.sleep(500)

  assert(Port.AT_SEA == (shipKR !? CurrentPort))
  this
}

(new ActorBasedEventSourcingTest)
  .setup
  .arrivalSetsShipsLocation
  .departurePutsShipOutToSea
  .smallTrip
  .resetAndReplayEventLog
  .resetAndReplayEventLogUpToDate // new test method
  .tearDown

This yields the following output:

===> arrivalSetsShipsLocation
Ship(King Roy) DEPARTED from port Port(San Francisco) @ Mon Mar 01 00:00:00 CET 3909
Ship(King Roy) ARRIVED  at port   Port(San Francisco) @ Wed Mar 03 00:00:00 CET 3909

===> departurePutsShipOutToSea
Ship(King Roy) DEPARTED from port Port(Los Angeles) @ Thu Mar 04 00:00:00 CET 3909

===> smallTrip
Ship(King Roy) ARRIVED  at port   Port(Los Angeles) @ Fri Mar 05 00:00:00 CET 3909
Ship(King Roy) DEPARTED from port Port(Vancouver) @ Sat Mar 06 00:00:00 CET 3909
Ship(King Roy) ARRIVED  at port   Port(Vancouver) @ Mon Mar 08 00:00:00 CET 3909
Ship(King Roy) DEPARTED from port Port(San Francisco) @ Tue Mar 09 00:00:00 CET 3909
Ship(King Roy) ARRIVED  at port   Port(San Francisco) @ Thu Mar 11 00:00:00 CET 3909

===> resetAndReplayEventLog
Ship(King Roy) has been reset
Ship(King Roy) DEPARTED from port Port(San Francisco) @ Mon Mar 01 00:00:00 CET 3909
Ship(King Roy) ARRIVED  at port   Port(San Francisco) @ Wed Mar 03 00:00:00 CET 3909
Ship(King Roy) DEPARTED from port Port(Los Angeles) @ Thu Mar 04 00:00:00 CET 3909
Ship(King Roy) ARRIVED  at port   Port(Los Angeles) @ Fri Mar 05 00:00:00 CET 3909
Ship(King Roy) DEPARTED from port Port(Vancouver) @ Sat Mar 06 00:00:00 CET 3909
Ship(King Roy) ARRIVED  at port   Port(Vancouver) @ Mon Mar 08 00:00:00 CET 3909
Ship(King Roy) DEPARTED from port Port(San Francisco) @ Tue Mar 09 00:00:00 CET 3909
Ship(King Roy) ARRIVED  at port   Port(San Francisco) @ Thu Mar 11 00:00:00 CET 3909

===> resetAndReplayEventLogUpToDate
Ship(King Roy) has been reset
Ship(King Roy) DEPARTED from port Port(San Francisco) @ Mon Mar 01 00:00:00 CET 3909
Ship(King Roy) ARRIVED  at port   Port(San Francisco) @ Wed Mar 03 00:00:00 CET 3909
Ship(King Roy) DEPARTED from port Port(Los Angeles) @ Thu Mar 04 00:00:00 CET 3909

That’s all there is to it. We have only scratched the surface of what can be done with asynchronous Event Sourcing and, as in all articles of this kind, the example is almost too simplistic to convey the full power and flexibility of the solution. But I hope that you have understood the underlying principle well enough to be able to apply it to a real-world enterprise system.