Event sourcing is the idea that, rather than saving the current state of a system, you save events. The state of the system at any point in time can then be rebuilt by replaying these stored events. You often see event sourcing in conjunction with some other key supporting ideas: command query responsibility segregation (CQRS), domain-driven design (DDD) with its aggregates, and read side projections.
My intention is not to explain all these concepts in detail, but rather to show, with code written in Ruby[2], one way that they can all come together. I hope to be able to do this in a concise and readable manner. As such, the code is simple by design. Most classes would need further refinements before being suitable for real world usage. I hope that you find this article to be a nice companion to other, more in-depth, sources.
This text is divided into two parts. The first part sets up the infrastructure, the building blocks. In the second part I will illustrate how this infrastructure can be used to make a tiny example application for a tiny example domain.
You can download the entire source code from this gist.
Before we can begin for real, we need to define some basic classes and methods, which the rest of the code in this text depends on:
You will find the implementation here, but I encourage you not to look at it now; rather, read on and come back to it later. It is not necessary to know this code to understand the rest of this article.
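Still, to give you a feel for what this support code provides, here follows a minimal sketch of it. This is only a stand-in written for this text, not the linked implementation: it assumes a String#snake_case helper, a UUID generator, the event store error classes, a simple global registry, and the BaseObject/ValueObject/Entity base classes used throughout.

require "securerandom"

# A monkey patch kept short for the article; footnote [3] ("I would use
# refinements...") most likely refers to this kind of patching.
class String
  # "ReleaseCreated" => "release_created"
  def snake_case
    gsub(/([a-z\d])([A-Z])/, '\1_\2').downcase
  end
end

module UUID
  def self.generate
    SecureRandom.uuid
  end
end

class EventStoreError < StandardError; end
class EventStoreConcurrencyError < EventStoreError; end

# A deliberately naive application-wide registry, giving access to the
# event store, the repositories and the command handlers.
class Registry
  def self.instance
    @instance ||= new
  end

  attr_accessor :event_store

  def repositories
    @repositories ||= {}
  end

  def repository_for(type)
    repositories.fetch(type)
  end

  def command_handlers
    @command_handlers ||= {}
  end

  def command_handler_for(type)
    command_handlers.fetch(type)
  end
end

class BaseObject
  def self.registry
    Registry.instance
  end

  def registry
    self.class.registry
  end

  def logg(message)
    puts "LOGG: #{message}"
  end
end

class ValueObject < BaseObject
  # Declares readers for the given attributes and records their names,
  # so that to_h can serialize the object.
  def self.attributes(*names)
    attr_reader(*names)
    define_method(:attribute_names) { names }
  end

  def attribute_names
    []
  end

  def initialize(attrs = {})
    attrs.each { |name, value| instance_variable_set("@#{name}", value) }
  end

  def to_h
    attribute_names.map { |name| [name, send(name)] }.to_h
  end
end

class Entity < ValueObject
  def set_attributes(attrs)
    attrs.each { |name, value| instance_variable_set("@#{name}", value) }
  end
end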
At the root of event sourcing is the event store. The event store holds event streams: one event stream per persisted aggregate. The store has no knowledge of the aggregates themselves apart from their IDs.[4]
class EventStore < BaseObject
def initialize
@streams = {}
end
def create(id)
raise EventStoreError, "Stream exists for #{id}" if @streams.key? id
@streams[id] = EventStream.new
end
def append(id, *events)
@streams.fetch(id).append(*events)
end
def event_stream_for(id)
@streams[id]&.clone
end
def event_stream_version_for(id)
@streams[id]&.version || 0
end
end
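For illustration, here is how the bare store could be exercised. The store does not care what the events are, so plain symbols stand in for events here:

store = EventStore.new
id = UUID.generate

store.create(id)
store.append(id, :got_created, :got_renamed)

store.event_stream_version_for(id)  # => 2
store.event_stream_for(id).to_a     # => [:got_created, :got_renamed]
store.create(id)                    # raises EventStoreError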
Event streams are append-only data structures holding events.
class EventStream < BaseObject
def initialize
@event_sequence = []
end
def version
@event_sequence.length
end
def append(*events)
@event_sequence.push(*events)
end
def to_a
@event_sequence.clone
end
end
Events are simple value objects.
class Event < ValueObject
end
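A concrete event is just a named bundle of attributes. For example (a hypothetical event, not part of the example domain later in this text):

class SomethingHappened < Event
  attributes :id, :description
end

SomethingHappened.new(id: "42", description: "it happened").to_h
# => {:id=>"42", :description=>"it happened"}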
The event store is accessed through event store repositories, one repository per aggregate type. The repository knows which aggregate type it is responsible for, how to rebuild an aggregate of that type by replaying its event stream, and how to hand out units of work for making changes.
The reason for the unit of work will be explained in the section on concurrency.
class EventStoreRepository < BaseObject
module InstanceMethods
def find(id)
stream = registry.event_store.event_stream_for(id)
return if stream.nil?
build stream.to_a
end
def unit_of_work(id)
yield UnitOfWork.new(registry.event_store, id)
end
private
def build(stream)
obj = type.new stream.first.to_h
stream[1..-1].each do |event|
message = "apply_" + event.class.name.snake_case
send message.to_sym, obj, event
end
obj
end
end
include InstanceMethods
end
The purpose of the InstanceMethods module above is to allow users of this class to choose whether they want to inherit from the class or include it as a mixin. This technique will be used again, and its usefulness will be demonstrated later.
We need some more auxiliary functionality from the event store, so the store we actually use is augmented. I have chosen the decorator pattern for the augmentation. This gives us the ability to configure at runtime which augmentations we add. The decorators follow below, together with a sketch of their runtime configuration.
To prevent the corruption of an event stream by concurrent writes, we use optimistic locking. That is: all changes must be done through a unit of work, which keeps track of the expected version of the event stream. The expected version is compared to the actual version before any changes are applied.
class EventStoreOptimisticLockDecorator < DelegateClass(EventStore)
def initialize(obj)
super
@locks = {}
end
def create(id)
@locks[id] = Mutex.new
super
end
def append(id, expected_version, *events)
@locks[id].synchronize do
event_stream_version_for(id) == expected_version or
raise EventStoreConcurrencyError
super id, *events
end
end
end
class UnitOfWork < BaseObject
def initialize(event_store, id)
@id = id
@event_store = event_store
@expected_version = event_store.event_stream_version_for(id)
end
def create
@event_store.create @id
end
def append(*events)
@event_store.append @id, @expected_version, *events
end
end
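Here is a sketch of two units of work racing on the same stream, again with symbols standing in for events:

store = EventStoreOptimisticLockDecorator.new(EventStore.new)
id = UUID.generate

uow1 = UnitOfWork.new(store, id)   # expected version: 0
uow2 = UnitOfWork.new(store, id)   # expected version: 0

uow1.create
uow1.append :something_happened    # versions match; the stream is now at 1
uow2.append :something_happened    # raises EventStoreConcurrencyError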
To allow projections (read side data structures) to keep track of the changes done to the system, we publish the events to registered subscribers.
class EventStorePubSubDecorator < DelegateClass(EventStore)
def initialize(obj)
super
@subscribers = []
end
def add_subscriber(subscriber)
@subscribers << subscriber
end
def append(id, *events)
super
publish(*events)
end
def publish(*events)
events.each do |e|
@subscribers.each do |sub|
sub.apply e
end
end
end
end
class EventStoreLoggDecorator < DelegateClass(EventStore)
def append(id, *events)
super
logg "New events: #{events}"
end
end
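The figures showing the decorators and their runtime configuration are not reproduced here, but one possible wiring (using the sketched registry from earlier) looks like this. The optimistic lock decorator must be outermost, since the unit of work hands it an expected version that the inner stores never see:

Registry.instance.event_store =
  EventStoreOptimisticLockDecorator.new(
    EventStorePubSubDecorator.new(
      EventStoreLoggDecorator.new(
        EventStore.new)))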
The public interface for all changes to the system is commands, which are given to command handlers, which in turn do the work on an aggregate.
The handling of a command can result in one of two things: acceptance or rejection. On acceptance, nothing is returned. On rejection, an error is raised.
Since nothing is returned from an accepted command, the client needs to include an ID even for commands that create an aggregate. This is done by having the client generate GUIDs for the IDs.
Here we define the base Command Handler.
class CommandHandler < BaseObject
module InstanceMethods
def handle(command)
process(command)
return
end
def process(command)
message = "process_" + command.class.name.snake_case
send message.to_sym, command
end
end
include InstanceMethods
end
The purpose of the handle/process split above is to ensure that nothing is ever returned by accident as the result of the command handling. The actual handling is delegated to methods defined in command handler subclasses.
Next is a class that adds logging to CommandHandlers by decoration. Logging of commands is most likely an important aspect of a system, but the event store should not be used for this.
class CommandHandlerLoggDecorator < DelegateClass(CommandHandler)
def initialize(obj)
super obj
end
def handle(command)
logg "Start handling: #{command.inspect}"
super
ensure
logg "Done handling: #{command.class.name}"
end
end
Command objects encode mutation requests to the system, and have the following characteristics: they are immutable value objects, they are named in the imperative, and they carry all the data needed to perform the requested change.
Command objects are also good places to validate data coming into the system. I have added some rudimentary validation rules to illustrate this.
module Validations
def required(*values)
values.none?(&:nil?) or
raise ArgumentError
end
def non_blank_string(obj)
return unless obj
obj.is_a?(String) && !obj.strip.empty? or
raise ArgumentError
end
def positive_integer(obj)
return unless obj
obj.is_a?(Integer) && obj > 0 or
raise ArgumentError
end
end
class Command < ValueObject
include Validations
def initialize(*args)
super
validate
end
def validate
raise "Implement in subclass! #{self.class.name}"
end
end
I am not adding any coercions of the data given to the command. I believe this responsibility belongs more appropriately to the command creator.
On the read side you are free to keep things really simple. The idea here is to set up projections that derive the current state from the event streams.
We have two options: we can serve reads straight from the event store repositories, rebuilding aggregates from their streams on every read, or we can maintain dedicated read side projections that are kept up to date by subscribing to the published events.
When the first option is good enough, I suggest that you do not use the repositories directly but set up read side versions that forward to the event store repositories. In this way you can enforce the read only nature and you make it easier to change to a real projection at a later stage. To further hide this fact as an implementation detail, I also suggest naming these in the same manner as the real projections.
Here is a base class for fake projections.
class RepositoryProjection < BaseObject
def initialize
@repository = registry.repository_for type
end
def find(id)
@repository.find(id).to_h
end
def apply(*_args); end
private
def type
raise "Implement in subclass! #{self.class.name}"
end
end
And a base class for real projections.
class SubscriberProjection < BaseObject
def initialize
@store = {}
registry.event_store.add_subscriber(self)
end
def find(id)
@store[id]&.clone
end
def apply(event)
handler_name = "when_#{event.class.name.snake_case}".to_sym
send handler_name, event if respond_to?(handler_name)
end
end
The domain model in this article is the super simple domain of releases of recorded music (a.k.a. albums).
Note that the class diagram above is a conceptual diagram. The actual implementation uses CQRS and aggregates, and thus diverges quite a bit.
Let us start with the commands. In this domain we only have commands for creating and updating the aggregates. Note also that we follow a convention here where updates are required to include all attributes[5] (more on this later), so the validations for updates and creates are the same. First, the commands for releases:
RELEASE_ATTRIBUTES = %I(id title tracks)
class ReleaseCommand < Command
private
def validate
required(*RELEASE_ATTRIBUTES.map {|m| send m})
non_blank_string(title)
end
end
class CreateRelease < ReleaseCommand
attributes *RELEASE_ATTRIBUTES
end
class UpdateRelease < ReleaseCommand
attributes *RELEASE_ATTRIBUTES
end
And then for recordings:
RECORDING_ATTRIBUTES = %I(id title artist duration)
class RecordingCommand < Command
private
def validate
required(*RECORDING_ATTRIBUTES.map {|m| send m})
non_blank_string(title)
non_blank_string(artist)
positive_integer(duration)
end
end
class CreateRecording < RecordingCommand
attributes *RECORDING_ATTRIBUTES
end
class UpdateRecording < RecordingCommand
attributes *RECORDING_ATTRIBUTES
end
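As an illustration (assuming, as in the sketch earlier, that value objects are constructed from a hash of attributes), a command validates itself on construction:

CreateRecording.new(id: UUID.generate, title: "Sledgehammer",
                    artist: "Peter Gabriel", duration: 313)   # => a valid command

CreateRecording.new(id: UUID.generate, title: "  ",
                    artist: "Peter Gabriel", duration: 313)   # raises ArgumentError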
Before I show how to handle these commands, I need to take a detour to discuss CRUD.
Even in a richly modeled domain, the need might arise for simple entities that only need CRUD operations. By using the principle of convention over configuration, this can be handled with a very small amount of code. The code below encodes a convention for CRUD aggregates. In short, the convention for an aggregate class Foo is: the commands are named CreateFoo and UpdateFoo and are dispatched to process_create_foo and process_update_foo; the events are named FooCreated and FooUpdated; and update events are applied by apply_foo_updated.
Here follows a CRUD command handler base class that is capable of handling create and update commands for any aggregate that follows these conventions.[6]
class CrudCommandHandler < CommandHandler
module InstanceMethods
private
def validator(obj)
raise "Implement in subclass!"
end
def repository
raise "Implement in subclass!"
end
def type
raise "Implement in subclass!"
end
def process_create(command)
repository.unit_of_work(command.id) do |uow|
obj = type.new(command.to_h)
validator(obj).assert_validity
event = self.class.const_get("#{type}Created").new(command.to_h)
uow.create
uow.append event
end
end
def process_update(command)
repository.unit_of_work(command.id) do |uow|
obj = repository.find command.id
raise ArgumentError if obj.nil?
obj.set_attributes command.to_h
validator(obj).assert_validity
event = self.class.const_get("#{type}Updated").new(command.to_h)
uow.append event
end
end
end
include InstanceMethods
end
Let us use this and implement the rest of the domain for the recording aggregate.
class Recording < Entity
attributes *RECORDING_ATTRIBUTES
end
class RecordingCreated < Event
attributes *RECORDING_ATTRIBUTES
end
class RecordingUpdated < Event
attributes *RECORDING_ATTRIBUTES
end
class RecordingRepository < EventStoreRepository
def type
Recording
end
def apply_recording_updated(recording, event)
recording.set_attributes(event.to_h)
end
end
class RecordingValidator < BaseObject
def initialize(obj)
@obj = obj
end
def assert_validity
# Do something with @obj here
end
end
class RecordingCommandHandler < CrudCommandHandler
private
def type; Recording; end
def repository
@repository ||= registry.repository_for(Recording)
end
def validator(obj)
RecordingValidator.new(obj)
end
def process_create_recording(command)
process_create(command)
end
def process_update_recording(command)
process_update(command)
end
end
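By the CommandHandler naming convention, handling a CreateRecording ends up in process_create_recording, which delegates to the generic process_create. A usage sketch, assuming the decorated event store from the earlier sketch has been wired into the registry (the repository registration shown here is part of that assumed wiring):

Registry.instance.repositories[Recording] = RecordingRepository.new

handler = RecordingCommandHandler.new
handler.handle CreateRecording.new(id: UUID.generate, title: "Sledgehammer",
                                   artist: "Peter Gabriel", duration: 313)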
A note on validations: I suggest that you validate incoming data in the commands themselves, and that you keep domain level rules, which may need to look at the whole aggregate, in dedicated validator classes like RecordingValidator above.
In the implementation of recordings, we have made separate classes for all the different concerns. This gives great flexibility. But for trivial CRUD aggregates, we can take it a bit further. What I will show here is a way to roll all the different concerns into one class, just by including a module.
First the module definition.
module CrudAggregate
module ClassMethods
def repository
self
end
def validator(obj)
obj
end
end
def assert_validity
end
def self.included(othermod)
othermod.extend CommandHandler::InstanceMethods
othermod.extend CrudCommandHandler::InstanceMethods
othermod.extend EventStoreRepository::InstanceMethods
othermod.extend ClassMethods
othermod_name = othermod.name.snake_case
othermod.define_singleton_method("type") { othermod }
othermod.define_singleton_method "process_create_#{othermod_name}" do |command|
process_create command
end
othermod.define_singleton_method "process_update_#{othermod_name}" do |command|
process_update command
end
othermod.define_singleton_method("apply_#{othermod_name}_updated") do |obj, event|
obj.set_attributes(event.to_h)
end
end
end
Finally, the InstanceMethods pattern pays off :-)
Let us now use this to implement the rest of the domain for release aggregates.
class Release < Entity
attributes *RELEASE_ATTRIBUTES
include CrudAggregate
def assert_validity
# Do something here
end
end
class ReleaseCreated < Event
attributes *RELEASE_ATTRIBUTES
end
class ReleaseUpdated < Event
attributes *RELEASE_ATTRIBUTES
end
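With this in place, Release is simultaneously aggregate, repository and command handler. A sketch of how it could be exercised directly, again assuming the decorated event store has been wired into the registry as shown earlier:

id = UUID.generate
Release.handle CreateRelease.new(id: id, title: "So", tracks: [])
Release.handle UpdateRelease.new(id: id, title: "So (Remastered)", tracks: [])
Release.find(id).to_h   # => {:id=>"...", :title=>"So (Remastered)", :tracks=>[]}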
Remember that I suggested that for the simplest cases we could use the event store repositories as backends for fake projections. I have chosen to show this strategy using Recordings.
class RecordingProjection < RepositoryProjection
def type
Recording
end
end
For releases, I have chosen to maintain the current state using a real projection. This is done as I have described earlier by subscribing to domain events published by the event store.
In this projection we also handle recording events so that we can include all recordings associated with a given release. We also use them to derive an artist for the whole release.[7]
class ReleaseProjection < SubscriberProjection
def initialize(recordings)
super()
@recordings = recordings
end
def when_release_created(event)
release = build_release_from_event_data event
@store[event.id] = release
end
def when_release_updated(event)
release = build_release_from_event_data event
@store[event.id].merge! release
end
def when_recording_updated(_event)
refresh_all_tracks
end
private
def build_release_from_event_data(event)
release = event.to_h
release[:tracks] = track_id_to_data release.fetch(:tracks)
derive_artist_from_tracks(release)
release
end
def track_id_to_data(track_ids)
track_ids.map { |id| @recordings.find(id).to_h }
end
def refresh_all_tracks
@store.values.each do |r|
r.fetch(:tracks).map! {|track| track.fetch(:id)}
r[:tracks] = track_id_to_data r.fetch(:tracks)
end
end
def derive_artist_from_tracks(release)
artists = release[:tracks].map {|rec| rec[:artist]}.uniq
release[:artist] = artists.length == 1 ? artists.first : "Various artists"
end
end
This strategy allows for all sorts of read optimized projections to be maintained. Here is an example projection that keeps track of the total number of releases and recordings stored by the system.
class TotalsProjection < SubscriberProjection
def initialize
super
@totals = Hash.new(0)
end
def when_recording_created(event)
handle_create_event event
end
def when_release_created(event)
handle_create_event event
end
attr_reader :totals
private
def handle_create_event(event)
@totals[event.class] += 1
end
end
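For instance, after the example run in the final section below, which creates one recording and two releases, the totals come out as:

totals = TotalsProjection.new
# ... one CreateRecording and two CreateRelease commands are handled ...
totals.totals   # => {RecordingCreated=>1, ReleaseCreated=>2}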
Tying it all together
class Application < BaseObject
def initialize
@recording_id = UUID.generate
@release_id = UUID.generate
initialize_projections
end
def main
puts "LOGG ---------------------------------------------------------"
run_commands
puts
puts "EVENT STORE ------------------------------------------------"
pp registry.event_store
puts
puts "PROJECTIONS ------------------------------------------------"
peek_at_projections
end
private
def initialize_projections
@the_recording_projection = RecordingProjection.new
@the_release_projection = ReleaseProjection.new(@the_recording_projection)
@the_totals_projection = TotalsProjection.new
@projections = [
@the_release_projection,
@the_recording_projection,
@the_totals_projection,
]
end
def peek_at_projections
p @the_release_projection.find @release_id
p @the_recording_projection.find @recording_id
p @the_totals_projection.totals
end
def run_commands
recording_data = {id: @recording_id, title: "Sledge Hammer",
artist: "Peter Gabriel", duration: 313}
run(recording_data, CreateRecording, Recording)
run({id: @release_id, title: "So", tracks: []},
CreateRelease, Release)
run({id: UUID.generate, title: "Shaking The Tree",
tracks: [@recording_id]},
CreateRelease, Release)
run({id: @release_id, title: "So", tracks: [@recording_id]},
UpdateRelease, Release)
run(recording_data.merge({ title: "Sledgehammer" }),
UpdateRecording, Recording)
# Some failing commands, look in log for verification of failure
run({id: "Non-existing ID", title: "Foobar"},
UpdateRecording, Recording)
end
def run(request_data, command_class, aggregate)
logg "Incoming request with data: #{request_data.inspect}"
command_handler = registry.command_handler_for(aggregate)
command = command_class.new(request_data)
command_handler.handle command
rescue StandardError => e
logg "ERROR: Command #{command} failed because of: #{e}"
end
end
DDD patterns that are particularly relevant to event sourcing are: aggregates, repositories, entities, value objects, and domain events.
[2] I have chosen Ruby here since it is the language in which I feel I can express object oriented code most cleanly. And I hope that Ruby's clean and friendly syntax will make it easy to see how these ideas could be implemented in another programming language.
[3] I would use refinements for this in a real project.
[4] Note that the event store I have implemented here holds the streams purely in memory, but I hope that it is easy to imagine how it can be turned into a store that uses files or a proper database as a backend.
[5] The reason we insist on the rule that updates must carry data for the complete aggregate is that it simplifies the implementation a lot. I feel that supporting patch updates would only add clutter to the code, and only distract from helping you understand the overall picture.
[6] I have left deletes as an exercise for the reader. Hint: we never actually delete anything from the event store, so a delete must be handled by a delete event appended to the event stream.
[7] Again, I ask you to note that I have made an in-memory-only database. And again I hope that it will be easy for you to see how this could be changed to use something like a relational database or a search engine.