Building a CQRS/ES web application in Elixir using Phoenix
Case study describing how Segment Challenge was built following a Command Query Responsibility Segregation and event sourcing pattern.
Elixir Phoenix CQRS/ES
Background
I’ve been interested in Command Query Responsibility Segregation and event sourcing since hearing Greg Young talk on the subject in early 2010. During the past seven years I’ve built an open-source Ruby CQRS library (rcqrs); worked professionally on .NET applications following the pattern; and more recently built an Event Store (eventstore) and CQRS library (commanded) in Elixir.
Building applications following domain-driven design and using CQRS feels really natural with the Elixir – and Erlang – actor model. Aggregate roots fit well within Elixir processes, which are driven by immutable messages through their own message mailboxes, allowing them to run concurrently and in isolation.
The web application I built to implement these ideas in Elixir was Segment Challenge. This helped me develop a resilient and robust CQRS/ES application.
Article content
- Background
- Command Query Responsibility Segregation (CQRS)
- Event sourcing
- Building a CQRS/ES application in Elixir
- Domain model
- Commands (write model)
- Querying (read model)
- Testing
- Deployment
- Conclusion
- Want to discover more CQRS/ES in Elixir?
About Segment Challenge
Compete against your friends, teammates and fellow club riders — ride a different segment each month
If you’re a keen cyclist or runner then you’ll know about Strava. It’s the social network for athletes, who record their rides and runs, and upload them to the site.
Strava users create segments from sections of their routes. As an example, a segment might cover a climb up a hill, starting at the bottom and finishing at the top. Each segment gets its own leaderboard, which displays ranked attempts by every athlete who has cycled or run through it. The fastest man is King of the Mountain (KOM), and the fastest woman is Queen of the Mountain (QOM). Athletes can compare themselves against other Strava users who cycle or run along the same routes.
Segment Challenge allows an athlete to create a competition for a cycling club and its members. A different Strava segment is used each month. Points are accumulated based on each athlete’s position at the end of the stage. The site uses Strava’s API to fetch segment attempts by the club’s members. It ranks their attempts and tallies their points at the end of each stage, replacing the tedium of manually tracking this information in a spreadsheet.
The site is entirely self-service. Any registered Strava user can create and host a challenge for a cycling club they are a member of. It was deployed at the end of 2016 and is now hosting active challenges for five local clubs. I’ll be promoting the site to clubs this year so they can host their own challenges. It’s a fantastic way for them to encourage their members to go out and ride throughout the year, helping to bring out their competitive spirit.
Command Query Responsibility Segregation (CQRS)
At its simplest, CQRS is the separation of commands from queries. Commands are used to mutate state in a write model. Queries are used to retrieve a value from a read model.
- Commands are used to instruct an application to do something. They are named in the imperative: register account; transfer funds; mark fraudulent activity.
- Domain events indicate something of importance has occurred within a domain model. They are named in the past tense: account registered; funds transferred; fraudulent activity detected.
The read and write models are different logical models. They may also be separated physically by using different database or storage mechanisms.
The read model is optimised for querying, using whatever technology is most appropriate: relational database; in-memory store; NoSQL database; full-text search index. Domain events from the write model are used to update the read model.
A specialised, append-only event store – effectively a time-ordered log – is used to event source the write model.
Event sourcing
Application state changes are modelled as domain events. They are persisted in order – as a logical stream – for each aggregate. An aggregate’s current state is built by replaying its domain events.
A typical event sourcing example is an ecommerce shopping cart. In a CRUD system only the cart’s current state would be recorded: the cart contains two items. Using event sourcing, the events that took the cart from an empty state to its current state are recorded: item added to cart, item removed from cart, item added to cart.
The event stream is the canonical source of truth. It is a perfect audit log. All other state in the system may be rebuilt from these events. You can rebuild the read model by replaying every event from the beginning of time.
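As a minimal sketch of this idea in Elixir – using an illustrative cart, not code from Segment Challenge – the current state can be rebuilt by reducing over the recorded events:

defmodule Cart do
  defstruct items: []

  # Rebuild current state by replaying every recorded event over an empty cart
  def replay(events), do: Enum.reduce(events, %Cart{}, &apply_event(&2, &1))

  defp apply_event(%Cart{items: items} = cart, {:item_added, item}),
    do: %Cart{cart | items: items ++ [item]}

  defp apply_event(%Cart{items: items} = cart, {:item_removed, item}),
    do: %Cart{cart | items: List.delete(items, item)}
end

# Item added, item removed, item added: replaying leaves one item in the cart
Cart.replay([{:item_added, "jersey"}, {:item_removed, "jersey"}, {:item_added, "bidon"}])
#=> %Cart{items: ["bidon"]}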
Benefits & costs of using CQRS
Domain events describe your system activity over time using a rich, domain-specific language. They are an immutable source of truth for the system. They support temporal queries.
A separate logical read model allows optimised and highly specialised query models to be built. You can scale the read and write models independently. The processing of commands and queries is asymmetrical, so you can dedicate the appropriate number of servers to each side as needed.
Events and their schema provide the ideal integration point for other systems. They allow migration of read-only data between persistence technologies by replaying and projecting all events.
Unfortunately, events also provide a permanent history of your poor design decisions, since events are immutable. It’s an alternative – and less common – approach to building applications than basic CRUD, and it demands a richer understanding of the domain being modelled. CQRS adds complexity and risk, and you must deal with the eventual consistency between the write and read models.
Building a CQRS/ES application in Elixir
To build the application and website we require:
- A domain model containing our aggregates, commands, and domain events.
- Hosting of an aggregate root and a way to send it commands.
- An event store to persist the domain events.
- Read model store for querying.
- Event handlers to build and update the read model.
- A web front-end UI to display read model data and dispatch commands to the write model.
Segment Challenge uses the following libraries to help fulfil these requirements.
Write model event store
EventStore is an Elixir library using PostgreSQL as the underlying storage engine. It provides an API to append events to, and read events from, a logical event stream, and to subscribe to events.
Subscriptions to an individual stream, or to all event streams, allow handlers to be notified of persisted events. A subscription guarantees at-least-once delivery of every persisted event. Each subscription may be independently paused, then later resumed from where it stopped.
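A rough sketch of using the library follows; the exact function signatures have changed between EventStore versions, so treat the event fields and calls below as illustrative rather than definitive.

alias EventStore.EventData

stream_uuid = UUID.uuid4() # one stream per aggregate instance, using the uuid package

events = [
  %EventData{
    event_type: "challenge_created",
    data: %{challenge_uuid: stream_uuid, name: "Segment of the Month"},
    metadata: %{}
  }
]

# Append to a logical stream, expecting the stream to be empty (version 0)
:ok = EventStore.append_to_stream(stream_uuid, 0, events)

# Read the stream back, and subscribe the current process to all streams
{:ok, recorded_events} = EventStore.read_stream_forward(stream_uuid)
{:ok, _subscription} = EventStore.subscribe_to_all_streams("example_subscription", self())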
Aggregate host & command dispatch
Commanded provides the building blocks for creating CQRS applications in Elixir. It has support for command registration and dispatch; hosting and delegation to aggregate roots; event handling; and long running process managers.
Read model store
Ecto and a PostgreSQL database are used to build the read model for querying.
Web front-end
The Phoenix Framework is used for the web front-end. I’ve also implemented the forms using Elm, a functional language for building web apps that compiles to JavaScript.
Using an umbrella application
For Segment Challenge I created an Elixir umbrella application.
mix new segment_challenge --module SegmentChallenge --umbrella
It contains the following apps.
- authorisation – Policies to authorise command dispatch.
- challenges – Core domain model, command router, process managers, read model projections, queries, and periodic tasks.
- commands – Modules (structs) for each command (e.g. SegmentChallenge.Commands.ApproveChallenge).
- events – Modules (structs) for each domain event (e.g. SegmentChallenge.Events.ChallengeApproved).
- infrastructure – Serialization and command middleware.
- projections – Ecto repository and database migrations to build the read model database schema.
- web – Phoenix web front-end containing a router, controllers, plugs, templates, views, and static assets.
cd segment_challenge
cd apps
mix new authorisation --module SegmentChallenge.Authorisation
mix new challenges --module SegmentChallenge.Challenges --sup
mix new commands --module SegmentChallenge.Commands
mix new events --module SegmentChallenge.Events
mix new infrastructure --module SegmentChallenge.Infrastructure
mix new projections --module SegmentChallenge.Projections --sup
mix phoenix.new web --app web --module SegmentChallenge.Web --no-brunch --no-ecto
The challenges app is an ideal candidate to be further split up by logical area.
- Athletes
- Challenges
- Clubs
- Leaderboards
- Stages
An application’s top-level structure should inform you of its intent (e.g. challenges, leaderboards), not the delivery mechanism or technology it uses (e.g. projections, queries, tasks). Uncle Bob describes this ideal architecture in his “Architecture the Lost Years” talk.
Domain model
In domain-driven design, the domain model is a conceptual model of the core business domain in an application containing behaviour and data. It includes the aggregates, commands, domain events, and process managers that comprise the business logic.
An event sourced domain model
An aggregate root must conform to the following behaviour to implement the event sourcing pattern.
- Each public function must accept a command and return any resultant domain events, or an error.
- Its internal state may only be modified by applying a domain event to its current state.
- Its internal state can be rebuilt from an initial empty state by replaying all domain events in the order they were created.
Using Elixir for the domain model
Building an event sourced aggregate in Elixir requires defining a module containing: its state; command functions; and state mutator functions. I use a struct for the aggregate root’s state.
Aggregates can be defined without external dependencies. By using event sourcing they have no persistence concerns. Relationships between aggregate roots are by identity only. Orchestration of aggregate roots is handled by process managers, which respond to events from one aggregate root and dispatch commands to another.
Example: Challenge aggregate root
The challenge aggregate root is used by Segment Challenge to track each hosted challenge.
Here’s a snippet of the Challenge module that implements the aggregate root. You can see how the public command functions (e.g. create_challenge/2) accept the challenge state (a %Challenge{} struct) and a command (e.g. a %CreateChallenge{} struct). They return zero, one, or many domain events in response.
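A trimmed-down sketch of that shape – with illustrative fields and simplified status handling, rather than the production code – looks like this:

defmodule SegmentChallenge.Challenges.Challenge do
  defstruct challenge_uuid: nil, name: nil, status: nil

  alias SegmentChallenge.Challenges.Challenge
  alias SegmentChallenge.Commands.{CreateChallenge, StartChallenge}
  alias SegmentChallenge.Events.{ChallengeCreated, ChallengeStarted}

  # Public command functions: state and command in, zero or more events (or an error) out

  def create_challenge(%Challenge{challenge_uuid: nil}, %CreateChallenge{} = create) do
    %ChallengeCreated{challenge_uuid: create.challenge_uuid, name: create.name}
  end

  def start_challenge(%Challenge{status: :approved, challenge_uuid: challenge_uuid}, %StartChallenge{}) do
    %ChallengeStarted{challenge_uuid: challenge_uuid}
  end

  # Guard the invariant: a challenge that has not been approved cannot be started
  def start_challenge(%Challenge{}, %StartChallenge{}), do: {:error, :challenge_not_approved}

  # (approve_challenge/2 and the other command functions are omitted for brevity)

  # State mutators: replayed to rebuild the aggregate, so they must never fail

  def apply(%Challenge{} = challenge, %ChallengeCreated{challenge_uuid: challenge_uuid, name: name}) do
    %Challenge{challenge | challenge_uuid: challenge_uuid, name: name, status: :pending}
  end

  def apply(%Challenge{} = challenge, %ChallengeStarted{}) do
    %Challenge{challenge | status: :started}
  end
end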
An aggregate root must protect itself against commands that would cause an invariant to be broken. As an example, attempting to start a challenge that has not been approved returns an error tagged tuple: {:error, :challenge_not_approved}. Pattern matching is used to validate the state of the aggregate. A finite state machine can be used to formalise the allowed state changes within an aggregate root.
Every domain event returned by the aggregate must have a corresponding apply/2 function. Its arguments are the aggregate root state and the domain event. It is used to mutate the aggregate’s state.
These functions are also used when the aggregate is rebuilt from an empty state by replaying its events. Apply functions must never fail. You cannot reject an event once it has occurred.
Commands & Events
In Segment Challenge I implemented each command and domain event in its own module, with the modules being located in the relevant commands or events app. You could combine the two into a single app (e.g. messages).
I use Elixir structs to define the fields, providing compile-time checks and default values.
Example: Start a challenge command
Starting a challenge is used to transition a hosted challenge into an active state. This command is dispatched from a periodic task, scheduled using Quantum. Each challenge begins at midnight. The scheduled task runs each day to locate challenges ready to start and dispatches this command for each challenge.
Vex is used to provide basic command validation. These are simple presence, formatting, and data type validation rules. Business rule validation belongs within the aggregate.
Commands are validated before being passed to the aggregate root using a command validation middleware.
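A hedged sketch of such a command module follows; the fields and validation rules in Segment Challenge will differ, but the shape is a plain struct with Vex validations attached:

defmodule SegmentChallenge.Commands.StartChallenge do
  # A plain struct with compile-time checked fields
  defstruct [:challenge_uuid]

  use Vex.Struct

  # Simple presence validation; business rule validation stays inside the aggregate
  validates :challenge_uuid, presence: true
end

# Vex.valid?(%SegmentChallenge.Commands.StartChallenge{}) #=> false (missing challenge_uuid)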
Example: Challenge started event
The challenge aggregate root returns a ChallengeStarted event in response to the start command when in a valid state to begin.
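A minimal sketch of the event module; the real event will carry more fields than shown here:

defmodule SegmentChallenge.Events.ChallengeStarted do
  # Illustrative fields only; the struct gives compile-time checks on field names
  defstruct [:challenge_uuid, :start_date]
end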
Events returned by an aggregate root are serialised to JSON using the Poison library and persisted to the Event Store. Persisted events are published out to interested subscribers: process managers; read model projections; and event handlers.
Process managers
A process manager is responsible for coordinating one or more aggregate roots. It handles events and may dispatch commands in response. Each process manager has state used to track which aggregate roots are being orchestrated. They are vital for inter-aggregate communication, coordination, and long-running business processes.
A saga can be implemented using a process manager, as an alternative to a distributed transaction for managing a long-running business process. Each step of the business process has a defined compensating action. When the business process encounters an error condition and is unable to continue, it can execute the compensating actions for the steps that have already completed. This undoes the work completed so far to maintain the consistency of the system.
Typically, you would use a process manager to route messages between aggregates within a bounded context. You would use a saga to manage a long-running business process that spans multiple bounded contexts.
Example: Include stages in a challenge
In Segment Challenge, a challenge and stage are separate aggregate roots. I use a process manager to allow a challenge to track which stages are included.
Commanded provides the building blocks for defining a process manager. It uses an interested?/1 routing function to indicate which events the process manager receives. The response is used to route the event to an existing process manager instance, or to create a new one.
In this example the process manager is started by a ChallengeCreated event. It uses the challenge_uuid to identify an instance of the process.
When a StageCreated event is received, an IncludeStageInChallenge command is sent to the corresponding challenge aggregate root. All other events are ignored by this process manager.
The state of a process manager instance is modified following the aggregate root approach. An apply/2 function must exist for each handled domain event. The process manager’s state is mutated and returned.
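The sketch below follows that behaviour – interested?/1 routing, commands returned from handle/2, and apply/2 state mutators – with an assumed module name and event fields; the exact callbacks depend on the version of Commanded in use.

defmodule SegmentChallenge.Challenges.ChallengeStageProcessManager do
  defstruct challenge_uuid: nil, stage_uuids: []

  alias SegmentChallenge.Challenges.ChallengeStageProcessManager
  alias SegmentChallenge.Commands.IncludeStageInChallenge
  alias SegmentChallenge.Events.{ChallengeCreated, StageCreated}

  # Route events to a process manager instance, identified by the challenge_uuid
  def interested?(%ChallengeCreated{challenge_uuid: challenge_uuid}), do: {:start, challenge_uuid}
  def interested?(%StageCreated{challenge_uuid: challenge_uuid}), do: {:continue, challenge_uuid}
  def interested?(_event), do: false

  # Dispatch a command to the corresponding challenge aggregate when a stage is created
  def handle(%ChallengeStageProcessManager{}, %StageCreated{challenge_uuid: challenge_uuid, stage_uuid: stage_uuid}) do
    [%IncludeStageInChallenge{challenge_uuid: challenge_uuid, stage_uuid: stage_uuid}]
  end

  def handle(%ChallengeStageProcessManager{}, _event), do: []

  # Mutate the process manager's own state
  def apply(%ChallengeStageProcessManager{} = pm, %ChallengeCreated{challenge_uuid: challenge_uuid}) do
    %ChallengeStageProcessManager{pm | challenge_uuid: challenge_uuid}
  end

  def apply(%ChallengeStageProcessManager{stage_uuids: stage_uuids} = pm, %StageCreated{stage_uuid: stage_uuid}) do
    %ChallengeStageProcessManager{pm | stage_uuids: stage_uuids ++ [stage_uuid]}
  end

  def apply(%ChallengeStageProcessManager{} = pm, _event), do: pm
end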
This is a simple process manager example. Segment Challenge contains more complex process managers following the same principles. Examples include tracking club members to include them in challenges and active stages hosted by the club, and applying scores based on stage rank at the end of the stage to update the overall challenge leaderboards.
Supervision
Each process manager must be supervised to ensure it starts with the application and restarts on failure. Commanded provides the Commanded.ProcessManagers.ProcessRouter module to host a process manager.
Commanded takes care of creating a subscription to the event store for each process manager. It serialises a snapshot of an instance’s state to the event store after every handled event to ensure its state can be rehydrated on restart.
Commands (write model)
Once the core domain model is built, it’s time to provide an external interface to allow commands to be sent in.
The Commanded library provides aggregate root hosting, command registration, and dispatch. Every command has exactly one registered handler. It can be sent to a command handler module, or directly to the target aggregate root. Attempting to dispatch an unregistered command results in an exception.
A command router is used to configure which aggregate root or command handler responds to a command. This is analogous to routing in the Phoenix web framework. However, it sends commands to handlers rather than HTTP requests to controllers.
During command dispatch, an Elixir GenServer process is started to host the aggregate root instance. It will fetch the aggregate’s event stream from the event store, and rebuild its state. Any returned domain events are appended to the event stream. The aggregate root host process remains alive, so subsequent commands routed to the same instance will not require rebuilding its state from the event store.
Command routing and dispatch
Segment Challenge defines a SegmentChallenge.Challenges.Router module that uses the Commanded.Commands.Router macro. Here I register every command available within the application.
Example: Router
The snippet below shows the commands relating to the challenge aggregate root.
These commands are sent to the aggregate via the ChallengeCommandHandler module.
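A sketch of what that registration might look like; the commands listed and the validation middleware module are an assumed subset, not the full production router:

defmodule SegmentChallenge.Challenges.Router do
  use Commanded.Commands.Router

  alias SegmentChallenge.Challenges.{Challenge, ChallengeCommandHandler}
  alias SegmentChallenge.Commands.{ApproveChallenge, CreateChallenge, HostChallenge, StartChallenge}

  # Cross-cutting middleware applied to every dispatched command (see below)
  middleware SegmentChallenge.Infrastructure.CommandValidationMiddleware

  # Route challenge commands to the command handler, targeting the Challenge aggregate
  dispatch [ApproveChallenge, CreateChallenge, HostChallenge, StartChallenge],
    to: ChallengeCommandHandler,
    aggregate: Challenge,
    identity: :challenge_uuid
end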
Middleware
The router allows configuration of middleware. All dispatched commands pass through the middleware chain, in the order defined. A middleware can choose to halt execution. This provides the integration point for cross-cutting concerns, including command auditing, logging, and validation. These are concerns applicable to all commands.
Command handler to aggregate root
A registered command handler module receives the target aggregate root state and the dispatched command. This allows additional processing to be done before delegating to the aggregate root.
Example: Challenge command handler
The example below shows how a unique URL slug is created, using the challenge name, and included in the command.
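A hedged sketch of that handler, assuming the command struct carries a url_slug field; the slug generation here is a naive stand-in for the real, uniqueness-checked implementation:

defmodule SegmentChallenge.Challenges.ChallengeCommandHandler do
  @behaviour Commanded.Commands.Handler

  alias SegmentChallenge.Challenges.Challenge
  alias SegmentChallenge.Commands.CreateChallenge

  # Assign a URL slug, built from the challenge name, before delegating to the aggregate
  def handle(%Challenge{} = challenge, %CreateChallenge{name: name} = create) do
    Challenge.create_challenge(challenge, %CreateChallenge{create | url_slug: slugify(name)})
  end

  # (handle/2 clauses for the other challenge commands are omitted)

  defp slugify(name) do
    name
    |> String.downcase()
    |> String.replace(~r/[^a-z0-9]+/, "-")
    |> String.trim("-")
  end
end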
Example: Command dispatch
A command is dispatched using the configured router module.
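For example, dispatching a create challenge command through the router (field values illustrative):

alias SegmentChallenge.Challenges.Router
alias SegmentChallenge.Commands.CreateChallenge

create_challenge = %CreateChallenge{
  challenge_uuid: UUID.uuid4(),
  name: "Segment of the Month"
}

# Returns :ok on success, or an error tuple (e.g. a validation failure)
case Router.dispatch(create_challenge) do
  :ok -> :ok
  {:error, _reason} = error -> error
end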
Command dispatching web controller
In Segment Challenge, all commands are sent to a single Phoenix controller. The Phoenix web router is configured to accept JSON requests posted to /api/commands, which are sent to the API.CommandController controller.
Example: Phoenix command controller
The controller contains a single public dispatch function, sketched below, which:
- Builds and populates the command struct defined by the command parameter, using the ExConstructor library.
- Authorises that the current user can dispatch the command, using the Canada library.
- Dispatches the command.
- Returns an appropriate HTTP response code, depending upon the outcome of the command dispatch.
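A hedged sketch of such a controller; the command name lookup, the current_athlete assign, and the error tuple shapes are assumptions rather than the exact production code:

defmodule SegmentChallenge.Web.API.CommandController do
  use SegmentChallenge.Web.Web, :controller

  alias SegmentChallenge.Challenges.Router
  alias SegmentChallenge.Commands.{CreateChallenge, HostChallenge}

  # Whitelist mapping the "command" parameter to its command module (illustrative subset)
  @commands %{"CreateChallenge" => CreateChallenge, "HostChallenge" => HostChallenge}

  def dispatch(conn, %{"command" => command_name} = params) do
    current_athlete = conn.assigns[:current_athlete] # set by an authentication plug
    command_module = Map.fetch!(@commands, command_name)

    # Build and populate the command struct from the request parameters
    command = ExConstructor.populate_struct(struct(command_module), params)

    with true <- Canada.Can.can?(current_athlete, :dispatch, command),
         :ok <- Router.dispatch(command) do
      send_resp(conn, :created, "")
    else
      false ->
        send_resp(conn, :forbidden, "")

      {:error, :validation_failure, errors} ->
        conn
        |> put_status(:unprocessable_entity)
        |> render("errors.json", errors: errors)

      {:error, _reason} ->
        send_resp(conn, :unprocessable_entity, "")
    end
  end
end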
Command validation errors are returned to the client as JSON. The CommandView view deals with formatting the data. By default, Phoenix uses Poison to serialize to JSON.
Command validation
All dispatched commands are validated before being passed onto the target aggregate root. I use the following middleware which verifies the simple validation rules defined in a command using Vex.
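A sketch of that middleware, built on the Commanded.Middleware behaviour; the {:error, :validation_failure, errors} response shape is an assumption:

defmodule SegmentChallenge.Infrastructure.CommandValidationMiddleware do
  @behaviour Commanded.Middleware

  alias Commanded.Middleware.Pipeline
  import Pipeline

  # Halt dispatch and respond with the errors when the command fails Vex validation
  def before_dispatch(%Pipeline{command: command} = pipeline) do
    case Vex.errors(command) do
      [] -> pipeline
      errors -> pipeline |> respond({:error, :validation_failure, errors}) |> halt()
    end
  end

  def after_dispatch(%Pipeline{} = pipeline), do: pipeline
  def after_failure(%Pipeline{} = pipeline), do: pipeline
end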
Middleware is registered in the command routing module using the Commanded.Commands.Router macro.
Validation failures are returned to the command dispatcher and may be shown to the end user. The example Phoenix command controller demonstrates how these errors are handled.
Command authorisation
For Segment Challenge, authentication is provided by Strava using the OAuth2 authentication protocol. The Strava library provides a strategy to generate the relevant Strava login URL and handle the authentication response.
Canada is used to implement authorisation and define permission rules.
An authenticated user is used to authorise command dispatch.
Example: Authorisation module
To use Canada, I implemented the Canada.Can protocol for each command dispatched from the web front-end. Any unconfigured commands were disallowed.
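A sketch of one such implementation; the Athlete struct used as the protocol subject and the policy module path are assumed names:

defimpl Canada.Can, for: SegmentChallenge.Athletes.Athlete do
  alias SegmentChallenge.Authorisation.Policies.ChallengePolicy
  alias SegmentChallenge.Commands.{CreateChallenge, HostChallenge}

  # Delegate challenge-related commands to the challenge policy module
  def can?(athlete, :dispatch, %CreateChallenge{} = command),
    do: ChallengePolicy.can?(athlete, :dispatch, command)

  def can?(athlete, :dispatch, %HostChallenge{} = command),
    do: ChallengePolicy.can?(athlete, :dispatch, command)

  # Any command without an explicit rule is disallowed
  def can?(_athlete, _action, _command), do: false
end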
The above Canada.Can protocol implementation simply delegates to the appropriate policy module (e.g. ChallengePolicy) containing the permission rules.
Elixir pattern matching provides a convenient way of defining rules. A catch-all can? clause, matching any other command, provides the default response of disallowing the command.
Example: Challenge policy module
In the snippet below, the host challenge command uses the read model projection to enforce the rules. The user must be the original creator of the challenge and it must be in a pending state.
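A sketch of the policy, assuming a ChallengeProjection Ecto schema keyed by challenge_uuid with created_by_athlete_uuid and status columns:

defmodule SegmentChallenge.Authorisation.Policies.ChallengePolicy do
  alias SegmentChallenge.Commands.{CreateChallenge, HostChallenge}
  alias SegmentChallenge.Projections.Challenges.ChallengeProjection
  alias SegmentChallenge.Projections.Repo

  # Any athlete may create a challenge on their own behalf
  def can?(%{athlete_uuid: athlete_uuid}, :dispatch, %CreateChallenge{created_by_athlete_uuid: athlete_uuid}), do: true

  # Only the athlete who created the challenge may host it, and only while it is pending
  def can?(athlete, :dispatch, %HostChallenge{challenge_uuid: challenge_uuid}) do
    case Repo.get(ChallengeProjection, challenge_uuid) do
      %ChallengeProjection{created_by_athlete_uuid: created_by, status: "pending"} ->
        created_by == athlete.athlete_uuid

      _ ->
        false
    end
  end

  def can?(_athlete, _action, _command), do: false
end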
Querying (read model)
Reporting and querying the state of an application is handled by building a read model. I use the Ecto library and a dedicated read store PostgreSQL database in Segment Challenge.
Ecto provides a domain specific language for writing queries and interacting with databases in Elixir. It includes a mix command line tool to create database schema migrations and execute them. This is used to migrate the development and production databases.
The read model is optimised for querying. Data is duplicated and denormalised as required. Queries with table joins are infrequent.
Example: Ecto repo
The read store uses a single Ecto repository to execute queries against the database.
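Which might look something like this, with the connection configured in the projections app’s config (values illustrative):

defmodule SegmentChallenge.Projections.Repo do
  use Ecto.Repo, otp_app: :projections
end

# config/dev.exs
config :projections, SegmentChallenge.Projections.Repo,
  adapter: Ecto.Adapters.Postgres,
  database: "segmentchallenge_readstore_dev",
  hostname: "localhost",
  pool_size: 10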
To create and migrate the database:
mix ecto.create -r SegmentChallenge.Projections.Repo
mix ecto.migrate -r SegmentChallenge.Projections.Repo
Projections
All read models are populated using projections.
A projection is an event handler that receives every persisted event from the event store. It executes queries against the database to add, update, and delete data. Event handlers run concurrently and are eventually consistent.
Example: Club projection
Each projection includes at least one Ecto schema definition and a Projector event handler module. The projector handles all events relevant to the read model it builds. In this example, this includes any event related to clubs.
I use Ecto.Multi to execute queries to insert, update, and delete data.
The SegmentChallenge.Projections.Projection macro ensures each event is only processed once. Event handlers may receive an event more than once. Each projection records its last seen event within the same database transaction as the data manipulation. Already seen events can then be ignored; the transaction containing the duplicate change gets rolled back.
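A sketch of a club projection; the ClubImported event, the schema fields, and the way the projector is wired into the Projection macro are all illustrative:

defmodule SegmentChallenge.Projections.Clubs.ClubProjection do
  use Ecto.Schema

  @primary_key {:club_uuid, :string, []}
  schema "clubs" do
    field :name, :string
    field :profile, :string

    timestamps()
  end
end

defmodule SegmentChallenge.Projections.Clubs.ClubProjector do
  alias Ecto.Multi
  alias SegmentChallenge.Events.ClubImported
  alias SegmentChallenge.Projections.Clubs.ClubProjection

  # Build an Ecto.Multi for the event; the projection machinery runs it in the same
  # transaction as the "last seen event" bookkeeping described above.
  def project(%ClubImported{club_uuid: club_uuid, name: name, profile: profile}) do
    Multi.new()
    |> Multi.insert(:club, %ClubProjection{club_uuid: club_uuid, name: name, profile: profile})
  end

  def project(_event), do: Multi.new()
end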
Example: Projection supervision
All read model projectors are supervised to ensure they start with the application and restart on failure.
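For example, using a plain supervisor in the projections app (projector module names assumed; each is expected to expose start_link/0, as a GenServer-based event handler would):

defmodule SegmentChallenge.Projections.Supervisor do
  use Supervisor

  def start_link, do: Supervisor.start_link(__MODULE__, nil, name: __MODULE__)

  def init(_) do
    children = [
      worker(SegmentChallenge.Projections.Clubs.ClubProjector, []),
      worker(SegmentChallenge.Projections.Challenges.ChallengeProjector, [])
    ]

    # Restart an individual projector on failure without affecting the others
    supervise(children, strategy: :one_for_one)
  end
end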
Queries
The read model is optimised for the queries required by the application. The read model projections contain denormalised data so every query can be fulfilled using predicates on indexed columns and without complex joins. This provides performant read querying.
Example: Challenges created by athlete query
Here the Ecto.Query domain specific query language is used to build a query to retrieve the challenges created by a given athlete. This uses an index on the created_by_athlete_uuid column.
The query is constructed and executed by the Ecto Repo module.
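A sketch of the query module and its execution, assuming a ChallengeProjection schema for the read model table:

defmodule SegmentChallenge.Projections.Queries.ChallengesCreatedByAthleteQuery do
  import Ecto.Query

  alias SegmentChallenge.Projections.Challenges.ChallengeProjection

  # Challenges created by the given athlete, served by the created_by_athlete_uuid index
  def new(athlete_uuid) do
    from(c in ChallengeProjection,
      where: c.created_by_athlete_uuid == ^athlete_uuid,
      order_by: c.name
    )
  end
end

# Construct the query, then execute it using the read store repo
alias SegmentChallenge.Projections.Queries.ChallengesCreatedByAthleteQuery
alias SegmentChallenge.Projections.Repo

athlete_uuid = UUID.uuid4()

challenges =
  athlete_uuid
  |> ChallengesCreatedByAthleteQuery.new()
  |> Repo.all()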
Rebuild the read model
In event sourced systems the event stream is the canonical source of truth. This allows the read model to be entirely rebuilt, replaced, and significantly altered, assuming that the domain events contain the relevant information. You can repurpose the read model as future needs dictate.
To rebuild, you define the new schema, modify the affected projections, and replay all events from the beginning of time through the projectors. This approach allows you to migrate the data from one storage mechanism (e.g. a relational database) to another (e.g. a document database).
A rebuild and data migration can be done while the application is online. You configure the new projection and it will replay from the first event. Once caught up, you switch to use the new read model for reads from the application.
Testing
Applications built using CQRS/ES are great for unit and integration testing.
Unit testing an aggregate root
Within the domain model, commands are the input and domain events are the output. Unit tests verify the expected events are produced.
I use ExMachina to define fixture data for tests in Segment Challenge.
This is used by calling the build function, included by importing the factory module into a test.
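A sketch of the factory module, with illustrative command fields:

defmodule SegmentChallenge.Factory do
  use ExMachina

  alias SegmentChallenge.Commands.CreateChallenge

  def create_challenge_factory do
    %CreateChallenge{
      challenge_uuid: UUID.uuid4(),
      name: "Segment of the Month",
      created_by_athlete_uuid: UUID.uuid4()
    }
  end
end

# In a test module: import SegmentChallenge.Factory, then call build(:create_challenge)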
Example: Challenge aggregate root unit tests
Two unit test examples are shown in this aggregate root test. The second test – excluding a competitor – shows how an aggregate root’s state must be mutated by applying any returned events.
I use Elixir’s with keyword to chain the command functions. The evolve/2 function is a unit test helper that mutates the aggregate root state by calling the apply/2 function for each of the given events, starting from an empty state (e.g. %Challenge{}).
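A couple of simplified unit tests in this style – not the exact tests described above – reusing the factory defined earlier:

defmodule SegmentChallenge.Challenges.ChallengeTest do
  use ExUnit.Case, async: true

  import SegmentChallenge.Factory

  alias SegmentChallenge.Challenges.Challenge
  alias SegmentChallenge.Commands.StartChallenge
  alias SegmentChallenge.Events.ChallengeCreated

  @moduletag :unit

  test "create a challenge" do
    create_challenge = build(:create_challenge)

    assert %ChallengeCreated{} = Challenge.create_challenge(%Challenge{}, create_challenge)
  end

  test "cannot start a challenge that has not been approved" do
    create_challenge = build(:create_challenge)

    with created <- Challenge.create_challenge(%Challenge{}, create_challenge),
         challenge <- evolve(%Challenge{}, created) do
      assert {:error, :challenge_not_approved} =
               Challenge.start_challenge(challenge, %StartChallenge{challenge_uuid: create_challenge.challenge_uuid})
    end
  end

  # Rebuild aggregate state from an empty struct by applying each returned event in turn
  defp evolve(%Challenge{} = challenge, events) do
    events
    |> List.wrap()
    |> Enum.reduce(challenge, &Challenge.apply(&2, &1))
  end
end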
I apply a :unit tag to all unit tests. This allows me to execute the very fast unit test suite on its own.
mix test --only unit
Integration testing the application
For integration tests I follow the same approach: use commands as input and verify the expected domain events are published.
In Segment Challenge I execute the full application during integration tests, including the event store, aggregate root and process manager hosting, read store projections, and external HTTP requests. The event and read stores are both reset between each test.
Example: Host challenge integration test
Here’s an example integration test to create a challenge. The command dispatch is hidden in the :create_challenge function call during setup, leaving the test to assert the expected domain event is received.
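A sketch of that test, assuming the suite resets the event and read stores and that the use case module (shown in the next example) dispatches the command:

defmodule SegmentChallenge.Challenges.CreateChallengeTest do
  use ExUnit.Case

  # Module and function names as described below; check the Commanded version in use
  import Commanded.Assertions.EventAssertion

  alias SegmentChallenge.Events.ChallengeCreated
  alias SegmentChallenge.UseCases.CreateChallengeUseCase

  @moduletag :integration

  describe "creating a challenge" do
    setup [:create_challenge]

    test "should publish a challenge created event", %{challenge_uuid: challenge_uuid} do
      assert_receive_event(ChallengeCreated, fn event ->
        assert event.challenge_uuid == challenge_uuid
      end)
    end
  end

  # Dispatch the create challenge command; the returned keyword list is merged into the context
  defp create_challenge(context), do: CreateChallengeUseCase.create_challenge(context)
end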
The assert_receive_event function is provided by the Commanded.Assertions.EventAssertion module. It creates a new subscription to the event store. For each received event matching the given module (e.g. ChallengeCreated), it attempts to verify it using the provided assertion function. It will wait for the expected event to arrive within a limited timeout period, failing the test if it does not.
Example: Create challenge test use case
I’ve created test use case modules: reusable functions that cover an end-user scenario. The test above creates a challenge with the CreateChallengeUseCase module. ExUnit supports chaining function calls using setup inside a describe block. Each function receives a context map and may append new values to it on return, allowing functions to build upon the work done by those before them.
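A sketch of the use case module; the cassette name, the point at which the Strava API is called, and the returned keys are assumptions:

defmodule SegmentChallenge.UseCases.CreateChallengeUseCase do
  use ExVCR.Mock, adapter: ExVCR.Adapter.Hackney

  import SegmentChallenge.Factory

  alias SegmentChallenge.Challenges.Router

  def create_challenge(_context) do
    create_challenge = build(:create_challenge)

    use_cassette "create_challenge" do
      # Any Strava API requests made while handling the command are recorded to disk
      # on the first run, then replayed from the cassette on subsequent runs.
      :ok = Router.dispatch(create_challenge)
    end

    # Merged into the test context map for later setup functions and the test itself
    [
      challenge_uuid: create_challenge.challenge_uuid,
      create_challenge: create_challenge
    ]
  end
end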
This example test use case makes an external HTTP call to the Strava API. I use ExVCR to record the initial response to disk, then replay the cached response for subsequent test runs. This guarantees my test works end-to-end, yet allows a short test feedback loop as the external request is only made when a cached request is not present.
- Read more: HTTP unit tests using ExVCR
The keyword list returned by the function is merged into the context map. This is made available to the subsequent setup functions and the test, allowing key-based access (e.g. context[:challenge_uuid]).
Example: Challenge projection integration test
Integration tests for read model projections follow a similar pattern. I reuse the test use cases, keeping the tests succinct and focused on their assertions.
The read model is eventually consistent, so I use the following Wait helper module to allow the projection to be built within a timeout period before the test fails.
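A common shape for such a helper is to retry an assertion until it passes or the timeout expires; the version in Segment Challenge may differ in detail:

defmodule SegmentChallenge.Wait do
  # Repeatedly run the assertion until it passes, or fail once the timeout expires
  def until(timeout \\ 1_000, assertion)

  def until(timeout, assertion) when timeout <= 0, do: assertion.()

  def until(timeout, assertion) do
    try do
      assertion.()
    rescue
      ExUnit.AssertionError ->
        :timer.sleep(50)
        until(timeout - 50, assertion)
    end
  end
end

# Usage in a projection test (after alias SegmentChallenge.Wait):
#
#   Wait.until(fn ->
#     assert Repo.get(ClubProjection, club_uuid) != nil
#   end)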
Integration and projection tests are tagged with :integration and :projection. This allows me to execute these slower test suites on their own.
mix test --only integration
mix test --only projection
Deployment
Segment Challenge uses Distillery to create the Elixir release. Build and deployment to the production host is handled by edeliver.
Deployment and administration of a production CQRS/ES application deserves its own full article. Subscribe to the mailing list below to be notified when new and relevant content is published.
Conclusion
Applying the Command Query Responsibility Segregation and event sourcing pattern to an Elixir and Phoenix web application is an unorthodox approach. I hope this case study has demonstrated why – and briefly how – you might do so.
The eventstore and commanded Elixir libraries provide the building blocks to help you. Event store relies upon PostgreSQL for its persistence. Commanded uses OTP behaviours and supervision to provide concurrency, reliability, and resiliency.
Please get in touch with feedback, ideas, requests for further articles, and criticism. Subscribe to the mailing list below if you’d like to stay informed.
Want to discover more CQRS/ES in Elixir?
Subscribe to the “Learn how to build CQRS/ES applications using Elixir” mailing list to receive guidance, delivered to your inbox.