Building CQRS/ES web applications
in Elixir using Phoenix

by Ben Smith

Building CQRS/ES web applications
in Elixir using Phoenix

Thursday March 23rd, 2017

Presented by Ben Smith

In this talk you will discover how to build applications following domain-driven design, using the CQRS/ES pattern with Elixir and Phoenix.

I’ll take you through a real-world case study to demonstrate how these principles can be applied.

What you'll learn

How we'll get there

  1. Concepts & building blocks:
    • Elixir and OTP.
    • CQRS/ES: aggregate roots, commands, events, event handlers, process managers, read-only projections.
  2. Implementation:
    • Event store using PostgreSQL for persistance.
    • Phoenix web application.

With plenty of code samples and demos along the way.

Before we begin …

My experience with Elixir

August 2015 Phoenix web framework 1.0 released.
Discovered Elixir, began reading about the language.
January 2016 Purchased Programming Elixir book.
Started building first Elixir & Phoenix web app.
October 2016 Full-time Elixir development.

Erlang

Joe Armstrong — "Erlang The Movie"

Why Erlang?

Erlang is well suited to systems that are:

WhatsApp ♥ Erlang

WhatsApp architecture in 2014 when acquired by Facebook for $19 billion.

We do not have one webserver handling 2 millions sessions.
We have 2 million webservers handling one session each.

Because we have one webserver per user we can easily make the system fault tolerant or scalable.

-- Joe Armstrong

Elixir is a dynamic, functional language designed for building scalable and maintainable applications.

It leverages the Erlang VM, known for running low-latency, distributed and fault-tolerant systems.

Elixir's key features

Elixir takes full advantage of OTP

Behaviours provide a module of common code — the wiring and plumbing — and a list of callbacks you must implement for customisation.

Elixir's promise

Elixir's concurrency model

Elixir code runs inside lightweight threads of execution, called processes.

Processes are also used to hold state.

A process in Elixir is not the same as an operating system process. Instead, it is extremely lightweight in terms of memory and CPU usage.

An Elixir application may have tens, or even hundreds of thousands of processes running concurrently on the same machine.

A single Elixir process is analogous to the event loop in JavaScript.

Actor model

Message passing

Processes are isolated and exchange information via messages:

current_process = self()

# spawn a process to send a message
spawn_link(fn ->
  send(current_process, {:message, "hello world"})
end)

# block current process until the message is received
receive do
  {:message, message} -> IO.puts(message)
end

# ... or flush all messages
flush()

Processes encapsulate state

State is held by a process while it runs an infinite receive loop:

defmodule Counter do
  def start(initial_count) do
    loop(initial_count)
  end

  defp loop(count) do
    new_count = receive do
      :increment -> count + 1
      :decrement -> count - 1
    end

    IO.puts(new_count)

    loop(new_count)
  end
end
iex> counter = spawn(Counter, :start, [0])
#PID<0.184.0>
iex> send(counter, :increment)
1
iex> send(counter, :increment)
2
iex> send(counter, :increment)
3
iex> send(counter, :decrement)
2

OTP behaviours

  1. GenServer
  2. Supervisor
  3. Application

GenServer

Example GenServer

defmodule ExampleServer do
  use GenServer

  def start_link do
    GenServer.start_link(ExampleServer, :ok, [])
  end

  # client API
  def put(server, key, value), do: GenServer.cast(server, {:put, key, value})
  def get(server, key), do: GenServer.call(server, {:get, key})

  # OTP GenServer behaviour callbacks
  def init(:ok) do
    {:ok, %{}}
  end

  def handle_cast({:put, key, value}, state) do
    {:noreply, Map.put(state, key, value)}
  end

  def handle_call({:get, key}, _from, state) do
    {:reply, Map.get(state, key), state}
  end
end

GenServer usage

iex> {:ok, pid} = ExampleServer.start_link()
{:ok, #PID<0.118.0>}

iex> ExampleServer.put(pid, :foo, 1)
:ok

iex> ExampleServer.get(pid, :foo)
1

iex> ExampleServer.get(pid, :foo2)
nil

iex> Process.unlink(pid)
true

iex> Process.exit(pid, :kill)
true

iex> Process.alive?(pid)
false

iex> ExampleServer.get(pid, :foo)
** (exit) exited in: GenServer.call(#PID<0.118.0>, {:get, :foo}, 5000)
    ** (EXIT) no process: the process is not alive ...

OTP behaviours

  1. GenServer
  2. Supervisor
  3. Application

Supervision

A supervisor is responsible for starting, stopping, and monitoring its child processes. The basic idea of a supervisor is that it is to keep its child processes alive by restarting them when necessary.

Supervision is key to Erlang – and Elixir's – "let it crash" philosophy.

Example Supervisor

Define child processes to monitor and restart:

import Supervisor.Spec

children = [
  worker(ExampleServer, [], [name: ExampleServer])
]

A supervisor's children can also include other supervisors.

Starting the supervisor will start its children:

{:ok, pid} = Supervisor.start_link(children, strategy: :one_for_one)


Refactor our GenServer to use its module name, not a process identifier.

defmodule ExampleServer do
  use GenServer

  def start_link do
    GenServer.start_link(ExampleServer, :ok, [name: ExampleServer])
  end

  # client API
  def put(key, value), do: GenServer.cast(ExampleServer, {:put, key, value})
  def get(key), do: GenServer.call(ExampleServer, {:get, key})

  # OTP GenServer behaviour callbacks
  def init(:ok) do
    {:ok, %{}}
  end

  def handle_cast({:put, key, value}, state) do
    {:noreply, Map.put(state, key, value)}
  end

  def handle_call({:get, key}, _from, state) do
    {:reply, Map.get(state, key), state}
  end
end

Supervisor in action

We can observe the Erlang VM, including running processes and their state:

iex> :observer.start()
:ok
iex> {:ok, pid} = Supervisor.start_link(children, strategy: :one_for_one)
{:ok, #PID<0.116.0>}
iex> ExampleServer.put(:foo, 1)
:ok
iex> ExampleServer.get(:foo)
1
iex> pid = Process.whereis(ExampleServer)
#PID<0.117.0>
iex> Process.exit(pid, :kill)
true
iex> Process.whereis(ExampleServer)
#PID<0.123.0>
iex> ExampleServer.get(:foo)
nil

Supervisor restart strategies

  1. One for one.
  2. One for all.
  3. Rest for one.

Restart strategies: one for one

When a process dies, restart only the process that failed.

Restart strategies: one for all

When a process dies, kill any remaining supervised processes and restart them all.

Used whenever processes under a single supervisor heavily depend on each other to be able to work normally.

Restart strategies: rest for one

When a process dies, kill any processes defined after it and restart them.

Used whenever you have to start processes that depend on each other in a chain

"Let it crash"

This leads to a clean separation of issues. We write code that solves problems and code that fixes problems, but the two are not intertwined.

OTP behaviours

  1. GenServer
  2. Supervisor
  3. Application

Application

def application do
  [
    mod: {PhoenixExample, []},
    applications: [:phoenix, :phoenix_html, :cowboy, :logger, :gettext, :phoenix_ecto, :postgrex]
  ]
end


Elixir's multicore advantage

Let's dive into some Elixir usage

Pattern matching using the match operator

iex> a = 1
1
iex> [a, b, c] = [1, 2, 3]
[1, 2, 3]
iex> a
1
iex> b
2
iex> c
3

Case statements

Using pattern matching within a case statement:

def buy_ticket?(age) do
  case age do
    age when age >= 18 -> true
    _ -> false
  end
end

This example could also be written by pattern matching on function arguments:

def buy_ticket?(age) when age >= 18, do: true
def buy_ticket?(_age), do: false

Pipe operator

The pipe operator |> passes the result of an expression as the first parameter of another expression.

foo(bar(baz(new_function("initial value"))))

Becomes:

"initial value"
|> new_function()
|> baz()
|> bar()
|> foo()



Macros and testing

Elixir’s built-in unit testing framework, ExUnit, takes advantage of macros to provide great error messages when test assertions fail.

defmodule ListTest do
  use ExUnit.Case, async: true

  test "can compare two lists" do
    assert [1, 2, 3] == [1, 3]
  end
end

The async: true option allows tests to run in parallel, using as many CPU cores as possible.

Unit test execution

Running the failing test produces a descriptive error:

$ mix test

1) test can compare two lists (ListTest)
     test/list_test.exs:13
     Assertion with == failed
     code:  [1, 2, 3] == [1, 3]
     left:  [1, 2, 3]
     right: [1, 3]

The equality comparison failed due to differing left and right hand side values.

Using mix to create an Elixir app

$ mix new example --sup --module Example --app example
* creating README.md
* creating .gitignore
* creating mix.exs
* creating config
* creating config/config.exs
* creating lib
* creating lib/example.ex
* creating lib/example/application.ex
* creating test
* creating test/test_helper.exs
* creating test/example_test.exs

Your Mix project was created successfully.
You can use "mix" to compile it, test it, and more:

    cd example
    mix test

Run "mix help" for more commands.

Let's have a peek inside an Elixir app …

Managing dependencies

defp deps do
  [
    {:ecto, "~> 2.1"},
    {:local_dependency, path: "~/src/local_dependency"},
  ]
end

Fetching dependencies

Once the dependencies have been listed in mix.exs, you run mix deps.get:

$ mix deps.get
Running dependency resolution...
Dependency resolution completed:
  decimal 1.3.1
  ecto 2.1.3
  poolboy 1.5.1
* Getting ecto (Hex package)
  Checking package (https://repo.hex.pm/tarballs/ecto-2.1.3.tar)
  Fetched package

Then compile them using mix deps.compile.

Elixir development

A productive web framework that does not compromise speed and maintainability.

Phoenix vs another web framework

Comparison of technologies used in two real-life web servers.

Command Query Responsibility Segregation
and event sourcing

CQRS/ES

At it’s simplest CQRS is the separation of commands from queries.

The read and write models are different logical models.
They may also be separated physically by using a different database or storage mechanism.

CQRS

Commands

Domain events

Queries

Event sourcing

f(state, event) => state

Event streams

Why choose CQRS/ES?

Benefits of using CQRS

Costs of using CQRS

Recipe for building a CQRS/ES application in Elixir

High-level application lifecycle

An aggregate in domain-driven design

An event-sourced aggregate

Must adhere to these rules:

  1. Each public function must accept a command and return any resultant domain events, or raise an error.
  2. Its internal state may only be modified by applying a domain event to its current state.
  3. Its internal state can be rebuilt from an initial empty state by replaying all domain events in the order they were raised.

Let's build an aggregate in Elixir

defmodule ExampleAggregate do
  # aggregate's state
  defstruct [
    uuid: nil,
    name: nil,
  ]

  # public command API
  def create(%ExampleAggregate{}, uuid, name) do
    %CreatedEvent{
      uuid: uuid,
      name: name,
    }
  end

  # state mutator
  def apply(%ExampleAggregate{} = aggregate, %CreatedEvent{uuid: uuid, name: name}) do
    %ExampleAggregate{aggregate |
      uuid: uuid,
      name: name,
    }
  end
end

A bank account example

This example provides three public API functions:

  1. To open an account: open_account/2.
  2. To deposit money: deposit/2.
  3. To withdraw money: withdraw/2.

A guard clause is used to prevent the account from being opened with an invalid initial balance.
This protects the aggregate from violating the business rule that an account must be opened with a positive balance.

Using the aggregate root

Initial empty account state:

account = %BankAccount{}

Opening the account returns an account opened event:

account_opened = BankAccount.open_account(account, %BankAccount.Commands.OpenAccount{
  account_number: "ACC123",
  initial_balance: 100
})

Mutate the bank account state by applying the opened event:

account = BankAccount.apply(account, account_opened)


Aggregates in Elixir

I've shown examples of aggregates implemented using pure functions.

The function always evaluates the same result value given the same argument value.

Unit testing an aggregate

defmodule BankAccountTest do
  use ExUnit.Case, async: true

  alias BankAccount.Commands.OpenAccount
  alias BankAccount.Events.BankAccountOpened

  describe "opening an account with a valid initial balance"
    test "should be opened" do
      account = %BankAccount{}
      open_account = %OpenAccount{
        account_number: "ACC123",
        initial_balance: 100,
      }

      account_opened = BankAccount.open_account(account, open_account)

      assert account_opened == %BankAccountOpened{
        account_number: "ACC123",
        initial_balance: 100,
      }
    end
  end
end

Unit testing a CQRS/ES application

ExMachina example:

defmodule Factory do
  use ExMachina

  def open_account_factory do
    %BankAccount.Commands.OpenAccount{
      account_number: "ACC123",
      initial_balance: 100,
    }
  end
end

Hosting an aggregate in a GenServer

Executing a command

  1. Rebuild an aggregate's state from its events.
  2. Execute the aggregate function, providing the state and command.
  3. Update the aggregate state by applying the returned event(s).
  4. Append the events to storage.
  5. An error will terminate the process:
    • Caller will receive an {:error, reason} tagged tuple.
    • Aggregate state rebuilt from events in storage on next command.

Command dispatch

defmodule BankRouter do
  use Commanded.Commands.Router

  dispatch OpenAccount,
    to: OpenAccountHandler,
    aggregate: BankAccount,
    identity: :account_number
end

:ok = BankRouter.dispatch(%OpenAccount{account_number: "ACC123", initial_balance: 1_000})

Process managers

Before I forget …

It is worth remembering that domain events are the contracts of our domain model.

They are recorded within the immutable event stream of the aggregate.

A recorded domain event cannot be changed; history cannot be altered.

I'll show you how to migrate, modify, and retire domain events — in effect rewriting history — later.

An Elixir CQRS/ES case study

Let's explore a real-world example of implementing these concepts in a Phoenix-based web app.

Strava

Strava

Segment Challenge

Segment Challenge

Journey to CQRS/ES in Elixir

Building an event store

Event store API

defmodule EventStore do
  @doc """
  Append one or more events to a stream atomically.
  """
  def append_to_stream(stream_uuid, expected_version, events)

  @doc """
  Reads the requested number of events from the given stream,
  in the order in which they were originally written.
  """
  def read_stream_forward(stream_uuid, start_version \\ 0, count \\ 1_000)

  @doc """
  Subscriber will be notified of each batch of events persisted to a single stream.
  """
  def subscribe_to_stream(stream_uuid, subscription_name, subscriber, start_from \\ :origin)

  @doc """
  Subscriber will be notified of every event persisted to any stream.
  """
  def subscribe_to_all_streams(subscription_name, subscriber, start_from \\ :origin)
end

Event store single writer

Architecting an Elixir application

Elixir umbrella application

Let's take a look at the implementation

Challenge aggregate root

Commands & events

Unit & integration testing

Tag individual tests to allow specific test runs:

mix test --only unit
mix test --only integration
mix test --only wip

Use mix test.watch to run tests each time you save a file:

mix test.watch --only " wip"

Stage event handler

Challenge competitor process manager

Read model

Phoenix web framework integration

Deployment

mix edeliver build release --skip-mix-clean
mix edeliver deploy release to production
ssh edeliver@segmentchallenge.com 'sudo /bin/systemctl restart segment_challenge'

Production monitoring

Domain event migration strategy

  1. Multiple versions: RegisterAccountV1, RegisterAccountV2
  2. Upcasting: single event version, upgrade old events on read.
  3. Lazy transformation: upcast the events on read, and persist the modified event.
  4. In place transformation: background job to edit events in the database.
  5. Copy & transfrom: migrate an event store into a new database, altering events as required.

Greg Young has published: Versioning in an Event Sourced System.

Copy & transform

Copy and transformation transforms every event to a new store. In this technique the old event store stays intact, and a new store is created instead.

Event store migration

Remove an event

Uses Elixir's standard Stream module to exclude a particular event:

EventStore.Migrator.migrate(fn stream ->
  Stream.reject(
    stream,
    fn (event_data) -> event_data.event_type == "UnwantedEvent" end
  )
end)

Upgrade an event

Using pattern matching to migrate a specific type of event:

defmodule OriginalEvent, do: defstruct [uuid: nil]
defmodule UpgradedEvent, do: defstruct [uuid: nil, additional: nil]

EventStore.Migrator.migrate(fn stream ->
  Stream.map(
    stream,
    fn (event) ->
      case event.data do
        %OriginalEvent{uuid: uuid} ->
          %EventStore.RecordedEvent{event |
            event_type: "UpgradedEvent",
            data: %UpgradedEvent{uuid: uuid, additional: "upgraded #{uuid}"},
          }
        _ -> event
      end
    end
  )
end)

Aggregate events

defmodule SingleEvent, do: defstruct [uuid: nil, group: nil]
defmodule AggregatedEvent, do: defstruct [uuids: [], group: nil]

# aggregate multiple single events for the same group into one aggregated event
defp aggregate([%{data: %SingleEvent{}}] = events), do: events
defp aggregate([%{data: %SingleEvent{group: group}} = source | _] = events) do
  [
    %EventStore.RecordedEvent{source |
      data: %AggregatedEvent{
        uuids: Enum.map(events, fn event -> event.data.uuid end),
        group: group,
      },
      event_type: "AggregatedEvent",
    },
  ]
end
defp aggregate(events), do: events

EventStore.Migrator.migrate(fn stream ->
  stream
  |> Stream.chunk_by(fn event -> {event.stream_id, event.event_type} end)
  |> Stream.map(fn events -> aggregate(events) end)
  |> Stream.flat_map(fn events -> events end)
end)

Live queries

Lessons learnt

Questions?

Thank you

Find out more about Ben