Supporting multiple event stores in Commanded using an adapter-based approach for Elixir

Commanded v0.10 includes support for Greg Young’s Event Store, in addition to the existing PostgreSQL-based Elixir EventStore. This article describes how an Elixir behaviour and adapter approach was used to support multiple event stores.




Announcing the release of Commanded v0.10 with support for using Greg Young’s Event Store.


Commanded is an open-source library you can use to build Elixir applications following the Command Query Responsibility Segregation and event sourcing (CQRS/ES) pattern. The project’s README contains a useful getting started guide that covers its features and I’ve written an article on building a CQRS/ES web application in Elixir using the library.

In previous versions, Commanded used the PostgreSQL-based Elixir EventStore that I built in tandem with it. The EventStore library provided the event persistence, event handler subscriptions, and snapshot storage. Commanded was heavily dependent upon the EventStore library.

In the past few months I had received a few requests to allow Commanded to use Greg Young’s Event Store. This was a great feature request that I happily added to the development roadmap. Unfortunately, it wasn’t a priority for me to build out straight away since I was satisfied with using the PostgreSQL event store.

Two months later a comment was left on the GitHub issue by a user named sharksdontfly. He had forked Commanded and added support for multiple event stores, demonstrating one of the real benefits of open source projects. I took on the pull request containing his changes to merge into Commanded.

This article describes how an Elixir behaviour and adapter approach was used to support multiple event stores.


Adding support for multiple event stores

The high-level approach used was an adapter strategy:

  1. Create an Elixir behaviour to define the contract required for an event store.
  2. Implement a set of unit/integration tests against the behaviour, used to verify a given storage provider adheres to the contract.
  3. Build an adapter module that implements the behaviour for each supported event store as a separate package.
  4. Extend the Commanded configuration to specify the chosen event store adapter.
  5. Replace all existing event store usage with the adapter specified in config.

The end goal was to allow the consumer of Commanded to choose which event store they wanted to use. They would install the adapter package in their Elixir application, configure any specific event store settings, and use it.

For development purposes I wanted each event store adapter to be a separate package. This allows mix dependencies to be cleanly defined, with no need to specify optional: true. Having an individual Elixir application, Hex package, Git repository, and GitHub project per adapter allows releases to be made separately and any issues to be tracked against particular event stores.
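From the consumer's side, the dependencies might then look like the fragment below. This is only a sketch: the adapter's package name and version requirement are assumptions for illustration, so check Hex for the actual values.

```elixir
# mix.exs of an application that uses Commanded (illustrative sketch)
defp deps do
  [
    {:commanded, "~> 0.10"},
    # Assumed package name and version for the PostgreSQL EventStore adapter
    {:commanded_eventstore_adapter, "~> 0.1"}
  ]
end
```

Only the adapter for the chosen event store is listed, so its client library is the only storage dependency pulled into the application.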

Create an Elixir behaviour

In Elixir, behaviours provide a way to:

  • define a set of functions that have to be implemented by a module;
  • ensure that a module implements all the functions in that set.

They are similar to an interface in an object oriented language. This is exactly what we need to state, in code, the set of function signatures that an event store adapter module must implement.
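As a minimal illustration of the mechanism, here is a sketch using hypothetical Sketch.Greeter modules that are not part of Commanded. One module declares the contract with @callback and another adopts it with @behaviour:

```elixir
defmodule Sketch.Greeter do
  @moduledoc "Hypothetical behaviour: the contract every greeter must satisfy."
  @callback greet(name :: String.t()) :: String.t()
end

defmodule Sketch.EnglishGreeter do
  # Declaring the behaviour makes the compiler warn about any missing callback.
  @behaviour Sketch.Greeter

  # @impl marks this function as the implementation of a behaviour callback.
  @impl true
  def greet(name), do: "Hello, " <> name
end

# Callers depend only on the contract, so the implementing module is swappable.
greeter = Sketch.EnglishGreeter
IO.puts(greeter.greet("Commanded"))
```

Because the caller holds the module in a variable, swapping in another implementation of the same behaviour requires no change to the calling code.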

I identified all occurrences of the event store used in Commanded. The features were: appending events to a stream; reading a stream forward; subscribing to events from all streams; and recording snapshots.

This determined the public API of the event store that needed to be replicated across additional adapters:

defmodule Commanded.EventStore do
  @moduledoc """
  Defines the behaviour to be implemented by an event store adapter to be used by Commanded.
  """

  @type stream_uuid :: String.t
  @type start_from :: :origin | :current | integer
  @type stream_version :: integer
  @type subscription_name :: String.t
  @type source_uuid :: String.t
  @type snapshot :: Commanded.EventStore.SnapshotData.t
  @type reason :: term

  @doc """
  Append one or more events to a stream atomically.
  """
  @callback append_to_stream(stream_uuid, expected_version :: non_neg_integer, events :: list(Commanded.EventStore.EventData.t)) :: {:ok, stream_version} | {:error, reason}

  @doc """
  Streams events from the given stream, in the order in which they were originally written.
  """
  @callback stream_forward(stream_uuid, start_version :: non_neg_integer, read_batch_size :: non_neg_integer) :: Enumerable.t | {:error, :stream_not_found} | {:error, reason}

  @doc """
  Subscriber will be notified of every event persisted to any stream.
  """
  @callback subscribe_to_all_streams(subscription_name, subscriber :: pid, start_from) :: {:ok, subscription :: pid}
    | {:error, :subscription_already_exists}
    | {:error, reason}

  @doc """
  Acknowledge receipt and successful processing of the given event received from a subscription to an event stream.
  """
  @callback ack_event(pid, Commanded.EventStore.RecordedEvent.t) :: any

  @doc """
  Unsubscribe an existing subscriber from all event notifications.
  """
  @callback unsubscribe_from_all_streams(subscription_name) :: :ok

  @doc """
  Read a snapshot, if available, for a given source.
  """
  @callback read_snapshot(source_uuid) :: {:ok, snapshot} | {:error, :snapshot_not_found}

  @doc """
  Record a snapshot of the data and metadata for a given source
  """
  @callback record_snapshot(snapshot) :: :ok | {:error, reason}

  @doc """
  Delete a previously recorded snapshot for a given source
  """
  @callback delete_snapshot(source_uuid) :: :ok | {:error, reason}
end

Additional event stores can be supported by Commanded by writing an adapter that conforms to this behaviour.

Implement unit/integration tests against the behaviour

With the behaviour module defined, I wanted to write a set of unit and integration tests. These would be used to verify each event store adapter meets the expected behaviour. By defining the tests in Commanded – and including the files in the published Hex package – any adapter can reference them within its own tests.

Here’s an example of a test to append events:

defmodule Commanded.EventStore.Adapter.AppendEventsTest do
  use Commanded.StorageCase
  use Commanded.EventStore

  defmodule BankAccountOpened, do: defstruct [:account_number, :initial_balance]

  describe "append events to a stream" do
    test "should append events" do
      assert {:ok, 2} == @event_store.append_to_stream("stream", 0, build_events(2))
      assert {:ok, 4} == @event_store.append_to_stream("stream", 2, build_events(2))
      assert {:ok, 5} == @event_store.append_to_stream("stream", 4, build_events(1))
    end
  end

  defp build_event(account_number) do
    %Commanded.EventStore.EventData{
      correlation_id: UUID.uuid4,
      event_type: "Elixir.Commanded.EventStore.Adapter.AppendEventsTest.BankAccountOpened",
      data: %BankAccountOpened{account_number: account_number, initial_balance: 1_000},
      metadata: %{"metadata" => "value"},
    }
  end

  defp build_events(count) do
    for account_number <- 1..count, do: build_event(account_number)
  end
end

In the Commanded mix.exs file I included the test/event_store_adapter folder containing the tests:

defp package do
  [
    files: [
      "lib", "mix.exs", "README*", "LICENSE*",
      "test/helpers",
      "test/event_store_adapter",
      "test/example_domain/bank_account", "test/example_domain/money_transfer",
    ],
    # ...
  ]
end

In an adapter package, the test/test_helper.exs file configures that adapter as the event store to be used by Commanded:

ExUnit.start()

# configure this event store adapter for Commanded
Application.put_env(:commanded, :event_store_adapter, Commanded.EventStore.Adapters.EventStore)

The adapter test files from Commanded, located in the deps/commanded folder, are loaded to be executed when the tests are run:

# test/event_store_adapter_test.exs
# execute event store adapter tests from Commanded
Code.require_file "../deps/commanded/test/event_store_adapter/append_events_test.exs", __DIR__
Code.require_file "../deps/commanded/test/event_store_adapter/snapshot_test.exs", __DIR__
Code.require_file "../deps/commanded/test/event_store_adapter/subscription_test.exs", __DIR__

Tests are run using the standard mix command: mix test

This allows the same tests to be used for all adapters, with each adapter running the tests from the version of Commanded it depends on.

Build an adapter module

I decided to implement an in-memory GenServer adapter within Commanded. This provides ephemeral storage, so it is only used for testing purposes. It is used by both the adapter tests and when testing the Commanded library itself. An in-memory event store process allowed very fast test runs, since resetting the event store merely required restarting the process, and meant that no external dependencies were required.
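To give a feel for how little is needed, here is a cut-down sketch of such a process. It is not Commanded's actual in-memory adapter: the module name, the two functions shown, and the :wrong_expected_version error are assumptions chosen for illustration, and subscriptions and snapshots are left out.

```elixir
defmodule Sketch.InMemoryEventStore do
  @moduledoc """
  Hypothetical, cut-down in-memory event store. Streams live in a map held by
  a GenServer, so restarting the process discards every event.
  """
  use GenServer

  def start_link(_opts \\ []) do
    GenServer.start_link(__MODULE__, %{}, name: __MODULE__)
  end

  def append_to_stream(stream_uuid, expected_version, events) do
    GenServer.call(__MODULE__, {:append, stream_uuid, expected_version, events})
  end

  def stream_forward(stream_uuid, start_version \\ 0) do
    GenServer.call(__MODULE__, {:stream_forward, stream_uuid, start_version})
  end

  @impl true
  def init(streams), do: {:ok, streams}

  @impl true
  def handle_call({:append, stream_uuid, expected_version, events}, _from, streams) do
    existing = Map.get(streams, stream_uuid, [])

    # Optimistic concurrency: the caller must know the current stream version.
    if length(existing) == expected_version do
      updated = existing ++ events
      {:reply, {:ok, length(updated)}, Map.put(streams, stream_uuid, updated)}
    else
      {:reply, {:error, :wrong_expected_version}, streams}
    end
  end

  def handle_call({:stream_forward, stream_uuid, start_version}, _from, streams) do
    reply =
      case Map.fetch(streams, stream_uuid) do
        # Skip the first start_version events and return the rest in order.
        {:ok, events} -> Enum.drop(events, start_version)
        :error -> {:error, :stream_not_found}
      end

    {:reply, reply, streams}
  end
end

{:ok, _pid} = Sketch.InMemoryEventStore.start_link()
{:ok, 2} = Sketch.InMemoryEventStore.append_to_stream("stream", 0, [:opened, :deposited])
{:error, :wrong_expected_version} = Sketch.InMemoryEventStore.append_to_stream("stream", 0, [:closed])
[:deposited] = Sketch.InMemoryEventStore.stream_forward("stream", 1)
```

All state lives in the process, so stopping and restarting it gives every test an empty store.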

PostgreSQL EventStore adapter

The existing Elixir EventStore library, built to support Commanded, had an API that only required a very lightweight adapter (commanded-eventstore-adapter). Its main job is mapping data between the structs defined by the EventStore and Commanded libraries. An example of appending events to a stream is shown below:

defmodule Commanded.EventStore.Adapters.EventStore do
  @behaviour Commanded.EventStore

  @spec append_to_stream(String.t, non_neg_integer, list(Commanded.EventStore.EventData.t)) :: {:ok, non_neg_integer} | {:error, reason :: term}
  def append_to_stream(stream_uuid, expected_version, events) do
    case EventStore.append_to_stream(stream_uuid, expected_version, Enum.map(events, &to_event_data/1)) do
      :ok -> {:ok, expected_version + length(events)}
      err -> err
    end
  end

  # Map Commanded's event data struct onto the EventStore library's equivalent.
  defp to_event_data(%Commanded.EventStore.EventData{} = event_data) do
    struct(EventStore.EventData, Map.from_struct(event_data))
  end
end

This adapter includes Commanded as a dependency in mix.exs so that it can use the Commanded.EventStore behaviour. I had to specify runtime: false due to the change in Elixir 1.4 where dependencies are included as runtime applications by default. This allows access to the dependency modules, but doesn’t start the dependency at runtime.

defp deps do
  [
    {:commanded, "~> 0.10", runtime: false},
    {:eventstore, "~> 0.9"},
    # ...
  ]
end

Extreme event store adapter

The Extreme Elixir TCP client library is used to connect to Greg Young’s Event Store.

The adapter (commanded-extreme-adapter) required additional work as Extreme did not have support for persistent subscriptions. This is a feature of the Event Store where the server remembers the position of all subscribers to an event stream. Commanded requires an event store subscription with this facility. Event handlers acknowledge receipt and successful processing of each received event so that on restart they resume from the next event following the last one acknowledged.
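The bookkeeping behind a persistent subscription can be sketched in a few lines. The module below is a hypothetical illustration only, unrelated to how the Event Store or Extreme implement it: an Agent stands in for the server, remembering the last acknowledged event number for each named subscription.

```elixir
defmodule Sketch.PersistentSubscriptions do
  @moduledoc """
  Hypothetical illustration of persistent subscription bookkeeping: the store,
  not the subscriber, remembers the last acknowledged event number.
  """
  use Agent

  def start_link(events) do
    Agent.start_link(fn -> %{events: events, acked: %{}} end, name: __MODULE__)
  end

  # Events still to be delivered to the named subscription.
  def pending(subscription_name) do
    Agent.get(__MODULE__, fn %{events: events, acked: acked} ->
      Enum.drop(events, Map.get(acked, subscription_name, 0))
    end)
  end

  # Record that every event up to and including event_number was processed.
  def ack(subscription_name, event_number) do
    Agent.update(__MODULE__, fn state ->
      put_in(state, [:acked, subscription_name], event_number)
    end)
  end
end

{:ok, _pid} = Sketch.PersistentSubscriptions.start_link([:event_1, :event_2, :event_3])
:ok = Sketch.PersistentSubscriptions.ack("example_handler", 2)

# After a restart, the handler resumes from the event following the last ack.
[:event_3] = Sketch.PersistentSubscriptions.pending("example_handler")
```

Because the position is stored under the subscription name rather than in the handler process, a restarted handler only needs to resubscribe under the same name.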

I submitted a pull request to the Extreme library to add support for persistent subscriptions (#38).

For development and test purposes, Docker is used to host the Event Store using their official image (eventstore/eventstore). In between test runs the Docker container is recreated from that image, using the module below:

defmodule Commanded.EventStore.Adapters.Extreme.ResetStorage do
  @container_name "commanded-tests-eventstore"

  def execute do
    Application.stop(:commanded_extreme_adapter)
    Application.stop(:extreme)

    reset_extreme_storage()

    Application.ensure_all_started(:commanded_extreme_adapter)
  end

  defp reset_extreme_storage do
    {:ok, conn} = Docker.start_link(%{
      baseUrl: "http://localhost:2375",
      ssl_options: [
        {:certfile, 'docker.crt'},
        {:keyfile, 'docker.key'},
      ],
    })

    Docker.Container.kill(conn, @container_name)
    Docker.Container.delete(conn, @container_name)
    Docker.Container.create(conn, @container_name, %{
      "Image": "eventstore/eventstore",
      "ExposedPorts": %{
        "2113/tcp" => %{},
        "1113/tcp" => %{}
      },
      "PortBindings": %{
        "1113/tcp": [%{ "HostPort" => "1113" }],
        "2113/tcp": [%{ "HostPort" => "2113" }]
      },
      "Env": [
        "EVENTSTORE_DB=/tmp/db",
        "EVENTSTORE_RUN_PROJECTIONS=All",
        "EVENTSTORE_START_STANDARD_PROJECTIONS=True"
      ]
    })

    Docker.Container.start(conn, @container_name)

    wait_eventstore_ready()
  end

  defp wait_eventstore_ready do
    headers = ["Accept": "application/vnd.eventstore.atom+json"]
    options = [recv_timeout: 400]

    # A 404 for an unknown stream shows the HTTP API is up and serving requests.
    case HTTPoison.get("http://localhost:2113/streams/somestream", headers, options) do
      {:ok, %HTTPoison.Response{status_code: 404}} ->
        :timer.sleep(1_000)
        :ok

      _ ->
        :timer.sleep(1_000)
        wait_eventstore_ready()
    end
  end
end

This ensures clean test runs where no individual test can impact another.

Docker is also supported by Travis CI.

In the .travis.yml config file you include the docker service and pull the image before the build and tests:

language: elixir

elixir:
  - 1.4.2

otp_release:
  - 19.3

sudo: required

services:
  - docker

before_install:
  - docker pull eventstore/eventstore

Extend the Commanded configuration

I extended the Commanded configuration to support specifying the event store adapter to use:

config :commanded,
  event_store_adapter: Commanded.EventStore.Adapters.EventStore

For a consumer of Commanded, switching between event stores only requires changing this config setting.

Use the event store via an adapter

Finally, an Elixir macro was written to support compile-time lookup of the configured adapter module and provide this as a module attribute constant.

defmodule Commanded.EventStore do
  defmacro __using__(_) do
    adapter = Application.get_env(:commanded, :event_store_adapter, Commanded.EventStore.Adapters.InMemory)

    quote do
      @event_store unquote(adapter)
    end
  end
end

The macro is used in a consumer module and @event_store provides access to the configured store.

use Commanded.EventStore

events = @event_store.stream_forward(aggregate_uuid)
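
The compile-time lookup can be tried in isolation with the self-contained sketch below. The Sketch modules and the :sketch application name are hypothetical stand-ins, not Commanded's own modules:

```elixir
defmodule Sketch.Adapters.InMemory do
  # Hypothetical adapter with a single function, standing in for a real one.
  def name, do: "in-memory"
end

defmodule Sketch.EventStore do
  defmacro __using__(_opts) do
    # Runs while the *using* module is compiled, so the adapter is fixed then.
    adapter = Application.get_env(:sketch, :event_store_adapter, Sketch.Adapters.InMemory)

    quote do
      @event_store unquote(adapter)
    end
  end
end

defmodule Sketch.Consumer do
  use Sketch.EventStore

  # @event_store expands to the adapter module, giving a direct remote call.
  def adapter_name, do: @event_store.name()
end

IO.puts(Sketch.Consumer.adapter_name())
```

One consequence of resolving the adapter at compile time is that changing the configured adapter only takes effect once the modules using the macro are recompiled.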

The final step to allow Commanded to be storage agnostic was to replace all usages of EventStore with the macro and @event_store attribute.

The pull request containing these changes in Commanded is “Support multiple event stores” (#55).

Summary

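Commanded v0.10 now supports two event stores:

  • the PostgreSQL-based Elixir EventStore, using commanded-eventstore-adapter;
  • Greg Young’s Event Store, using commanded-extreme-adapter.

Additional event stores can be supported by writing an adapter modelled on these implementations.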

This article has demonstrated how you can add support to your Elixir library for drop-in replacements for third-party dependencies.

Please get in touch with any feedback, corrections, suggestions, and improvements you have.