Dealing with eventual consistency in a CQRS/ES application

Eventual consistency can cause headaches when building a CQRS/ES application. You’ve successfully dispatched a command, but has the read model been updated with the latest changes? Commanded v0.14 supports command dispatch with strong consistency guarantees. This article describes how to use it to alleviate eventual consistency woes.




Announcing the release of Commanded v0.14.0-rc.0 with support for strongly consistent event handlers.

Commanded is an open-source library you can use to build Elixir applications following the Command Query Responsibility Segregation and event sourcing (CQRS/ES) pattern. The project’s README contains a useful getting started guide that covers its features, and I’ve written an article on building a CQRS/ES web application in Elixir using the library.


What’s the problem with eventual consistency?

At its most basic, a CQRS application involves building two models rather than one: a write model and a read model. The write model is strongly consistent; you are guaranteed that the data is up-to-date when handling a command. The read model, however, is usually updated asynchronously by event handlers, which makes it eventually consistent. This enables scalability, since multiple event handlers can update individual read models concurrently, and it allows read models to be rebuilt independently. The cost is that a read model may lag behind the write model, so querying it immediately after a command can return stale data.

Many web apps use the POST/Redirect/GET pattern, which is affected by this problem. As an example, after publishing a blog post the author should be redirected to view their new post. In an eventually consistent system the post may not exist in the read model immediately after publishing, resulting in a redirect to a 404 error page for the author. This is not a good experience, and may lead the author to believe they have lost their finely crafted prose.
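To make the problem concrete, here is a toy model in plain Elixir with no Commanded involved. ToyReadModel is a hypothetical module, and an Agent stands in for the read model database. The projector applies the event in a background task after a short, simulated delay, so a query issued straight after the write finds nothing, just like the redirect that lands on a 404.

```elixir
# Hypothetical toy model of an eventually consistent read model (not Commanded).
defmodule ToyReadModel do
  # Start an empty read model: a map of uuid => projected post.
  def start_link, do: Agent.start_link(fn -> %{} end)

  # Project a published post asynchronously, after a simulated delay.
  def project_async(read_model, %{uuid: uuid} = post, delay_ms) do
    Task.start(fn ->
      Process.sleep(delay_ms)
      Agent.update(read_model, &Map.put(&1, uuid, post))
    end)
  end

  # Query the read model, as the GET request after the redirect would.
  def get(read_model, uuid), do: Agent.get(read_model, &Map.get(&1, uuid))
end

{:ok, read_model} = ToyReadModel.start_link()
post = %{uuid: "post-1", title: "Hello"}

# POST: the command succeeded, but the projection runs in the background.
{:ok, _pid} = ToyReadModel.project_async(read_model, post, 50)

# Redirect + GET straight away: the post has not been projected yet.
immediately = ToyReadModel.get(read_model, "post-1")

# Some time later the projector has caught up.
Process.sleep(200)
later = ToyReadModel.get(read_model, "post-1")

IO.inspect({immediately, later}, label: "immediately, later")
```

The first lookup returns nil even though the "command" succeeded; only the later lookup sees the post.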

Dealing with eventual consistency

You can use one of the following strategies to work around the issue of stale read models. The examples are taken from Building Conduit - Applying CQRS/ES to an Elixir and Phoenix web app.

“Fake it, until you make it”

In certain situations it’s possible to construct the resultant read model data from information contained within the command being dispatched. We can return the data, built from the command, back to the user once the command has been successfully dispatched.

Here’s an example where an author is followed by another person. We simply set the following field to true and return the amended read model to the caller:

defmodule Conduit.Blog do
  @doc """
  Follow an author
  """
  def follow_author(%Author{uuid: author_uuid} = author, %Author{uuid: follower_uuid}) do
    with :ok <- Router.dispatch(FollowAuthor.new(author_uuid: author_uuid, follower_uuid: follower_uuid)) do
      {:ok, %Author{author | following: true}}
    else
      reply -> reply
    end
  end
end

This approach only works when the command contains all the data needed. It also requires you to write additional, duplicated code each time you need to construct the read model returned from the input command.

Poll the read model

After dispatching a command we can poll the read model until it has been updated with our expected changes.

In this example we publish a new article and then repeatedly query the database until the created article exists:

defmodule Conduit.Blog do
  @doc """
  Publishes an article by the given author.
  """
  def publish_article(%Author{} = author, attrs \\ %{}) do
    article_uuid = UUID.uuid4()
    publish_article =
      attrs
      |> PublishArticle.new()
      |> PublishArticle.assign_uuid(article_uuid)
      |> PublishArticle.assign_author(author)
      |> PublishArticle.generate_url_slug()

    with :ok <- Router.dispatch(publish_article) do
      # Poll the read model until the projected article exists
      Wait.until(fn -> Repo.get(Article, article_uuid) end)
    else
      reply -> reply
    end
  end
end

This approach is relatively straightforward; the downside is the additional latency caused by the polling interval. Poll too frequently and you add extra load to the database; poll too infrequently and you add latency to the request.
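The Wait.until helper used above is not shown in the article. A minimal sketch of one might look like this, assuming it should call the given function until it returns a non-nil value and then return {:ok, value}. The :interval and :timeout options are hypothetical, and they are exactly the knobs behind the latency versus database load trade-off just described.

```elixir
defmodule Wait do
  # Hypothetical polling helper matching the Wait.until/1 call above.
  @default_interval_ms 50
  @default_timeout_ms 5_000

  # Call `fun` until it returns a non-nil value, or give up after the timeout.
  def until(fun, opts \\ []) when is_function(fun, 0) do
    interval = Keyword.get(opts, :interval, @default_interval_ms)
    timeout = Keyword.get(opts, :timeout, @default_timeout_ms)
    deadline = System.monotonic_time(:millisecond) + timeout
    poll(fun, interval, deadline)
  end

  defp poll(fun, interval, deadline) do
    case fun.() do
      nil ->
        if System.monotonic_time(:millisecond) >= deadline do
          {:error, :timeout}
        else
          # Each sleep adds up to `interval` ms of latency; each retry adds a query.
          Process.sleep(interval)
          poll(fun, interval, deadline)
        end

      result ->
        {:ok, result}
    end
  end
end

# Usage: a stand-in "read model" that is populated after roughly 120 ms.
{:ok, store} = Agent.start_link(fn -> nil end)

Task.start(fn ->
  Process.sleep(120)
  Agent.update(store, fn _ -> %{uuid: "article-1"} end)
end)

found = Wait.until(fn -> Agent.get(store, & &1) end, interval: 20, timeout: 1_000)
missing = Wait.until(fn -> nil end, interval: 20, timeout: 100)
IO.inspect({found, missing})
```

Returning {:error, :timeout} rather than raising keeps the helper usable inside the with expression shown above, since a non-matching value simply falls through to the caller.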

Use pub/sub for read model changes

You can use pub/sub in your read model to notify subscribers after processing each event. This allows you to dispatch a command and then wait until the created domain events have all been handled by the read model projector.

Commanded v0.13.0 added support for returning the aggregate’s stream version after successfully dispatching a command, which makes this strategy possible:

{:ok, version} = Router.dispatch(command, include_aggregate_version: true)

In this example we dispatch the command, then wait for a notification that the User read model for the given user has reached the returned aggregate version:

defmodule Conduit.Accounts do
  @doc """
  Register a new user.
  """
  def register_user(attrs \\ %{}) do
    user_uuid = UUID.uuid4()
    register_user =
      attrs
      |> RegisterUser.new()
      |> RegisterUser.assign_uuid(user_uuid)
      |> RegisterUser.downcase_username()
      |> RegisterUser.downcase_email()
      |> RegisterUser.hash_password()

    with {:ok, version} <- Router.dispatch(register_user, include_aggregate_version: true) do
      Notifications.wait_for(User, user_uuid, version)
    else
      reply -> reply
    end
  end
end

The User read model projector stores the aggregate’s version alongside the user and publishes each change to the model:

defmodule Conduit.Accounts.Projectors.User do
  use Commanded.Projections.Ecto, name: "Accounts.Projectors.User"

  project %UserRegistered{} = registered, %{stream_version: version} do
    Ecto.Multi.insert(multi, :user, %User{
      uuid: registered.uuid,
      version: version,
      username: registered.username,
      email: registered.email,
      hashed_password: registered.hashed_password,
    })
  end

  # Invoked after the projection's database transaction has been committed
  def after_update(_event, _metadata, changes), do: Notifications.publish_changes(changes)
end

Latency is minimised because you are notified as soon as the read model has been updated. However, you must write the extra infrastructure and boilerplate code required to publish changes and wait for notifications.
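The Notifications module is part of that extra infrastructure. Here is one possible sketch using Elixir’s built-in Registry as the pub/sub mechanism. The module, its message format and the extra current_version argument (a function returning the read model’s stored version, or 0 when the row doesn’t exist yet) are assumptions, not Conduit’s actual implementation. The waiter subscribes before checking the current version, so a change published in between cannot be missed.

```elixir
defmodule Notifications do
  # Hypothetical sketch: Registry-based pub/sub keyed by {schema, uuid}.
  @registry Notifications.Registry

  def start_link, do: Registry.start_link(keys: :duplicate, name: @registry)

  # Publish each changed projection (e.g. the results of an Ecto.Multi) to waiters.
  def publish_changes(changes) when is_map(changes) do
    for {_name, %{__struct__: schema, uuid: uuid, version: version}} <- changes do
      Registry.dispatch(@registry, {schema, uuid}, fn entries ->
        for {pid, _value} <- entries, do: send(pid, {:projected, schema, uuid, version})
      end)
    end

    :ok
  end

  # Block until the projection for {schema, uuid} reaches at least `version`.
  def wait_for(schema, uuid, version, current_version, timeout \\ 5_000) do
    key = {schema, uuid}
    # Subscribe first, then check, so an update published in between is not missed.
    {:ok, _owner} = Registry.register(@registry, key, nil)

    try do
      case current_version.() do
        current when is_integer(current) and current >= version -> :ok
        _not_yet -> await(schema, uuid, version, timeout)
      end
    after
      Registry.unregister(@registry, key)
    end
  end

  defp await(schema, uuid, version, timeout) do
    receive do
      {:projected, ^schema, ^uuid, v} when v >= version -> :ok
      {:projected, ^schema, ^uuid, _older} -> await(schema, uuid, version, timeout)
    after
      timeout -> {:error, :timeout}
    end
  end
end

# Hypothetical read model struct for the usage example.
defmodule User do
  defstruct [:uuid, :version, :username]
end

{:ok, _} = Notifications.start_link()
{:ok, read_model} = Agent.start_link(fn -> %{} end)

# Simulated projector: update the read model, then publish the change.
Task.start(fn ->
  Process.sleep(50)
  user = %User{uuid: "user-1", version: 1, username: "jake"}
  Agent.update(read_model, &Map.put(&1, user.uuid, user))
  Notifications.publish_changes(%{user: user})
end)

current_version = fn ->
  case Agent.get(read_model, &Map.get(&1, "user-1")) do
    nil -> 0
    %User{version: v} -> v
  end
end

result = Notifications.wait_for(User, "user-1", 1, current_version)
IO.inspect(result)
```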

Strongly consistent command dispatch

The preferred approach is to simulate strong consistency for particular read model event handlers when dispatching a command. Once the command has been successfully dispatched, the dispatch call blocks until each of the strongly consistent handlers has processed all of the created domain events.

Commanded v0.14.0 provides a consistency option for command dispatch:

:ok = Router.dispatch(command, consistency: :strong)

The available options are :eventual (default) and :strong.

Dispatching a command using consistency: :strong will block until all events created by the command have been processed by the handlers configured to use strong consistency. This guarantees that when you receive the :ok response from dispatch, your read model will have been updated and can safely be queried.
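To illustrate these semantics (only the semantics; this is not how Commanded implements it), here is a toy simulation in plain Elixir. ToyHandler, ToyDispatcher and the handler names are hypothetical. Every handler receives the events, but only handlers configured as :strong are asked to acknowledge the last event, and a :strong dispatch blocks until all of those acknowledgements arrive. Slower :eventual handlers are never waited for.

```elixir
defmodule ToyHandler do
  # Spawn a handler process that takes `delay_ms` to process each event.
  def start(name, consistency, delay_ms) do
    pid = spawn(fn -> loop(name, delay_ms) end)
    %{name: name, consistency: consistency, pid: pid}
  end

  defp loop(name, delay_ms) do
    receive do
      {:event, _event, version, reply_to} ->
        # Simulate updating a read model.
        Process.sleep(delay_ms)
        # Acknowledge only when the dispatcher asked for it.
        if is_pid(reply_to), do: send(reply_to, {:ack, name, version})
        loop(name, delay_ms)
    end
  end
end

defmodule ToyDispatcher do
  # "Persist" events (numbered 1..n), fan them out to every handler and, for
  # consistency: :strong, block until each strong handler acks the last event.
  def dispatch(events, handlers, opts \\ []) do
    consistency = Keyword.get(opts, :consistency, :eventual)
    timeout = Keyword.get(opts, :timeout, 5_000)
    last_version = length(events)

    for handler <- handlers, {event, version} <- Enum.with_index(events, 1) do
      wants_ack =
        consistency == :strong and handler.consistency == :strong and
          version == last_version

      reply_to = if wants_ack, do: self()
      send(handler.pid, {:event, event, version, reply_to})
    end

    if consistency == :strong do
      pending = for %{consistency: :strong, name: name} <- handlers, into: MapSet.new(), do: name
      await_acks(pending, last_version, timeout)
    else
      :ok
    end
  end

  defp await_acks(pending, last_version, timeout) do
    if MapSet.size(pending) == 0 do
      :ok
    else
      receive do
        {:ack, name, ^last_version} -> await_acks(MapSet.delete(pending, name), last_version, timeout)
      after
        timeout -> {:error, :consistency_timeout}
      end
    end
  end
end

handlers = [
  ToyHandler.start("Blog.Projectors.Article", :strong, 100),
  ToyHandler.start("Blog.SlowNotifier", :eventual, 1_000)
]

events = [%{type: "ArticlePublished", uuid: "article-1"}]

{strong_us, :ok} = :timer.tc(fn -> ToyDispatcher.dispatch(events, handlers, consistency: :strong) end)
{eventual_us, :ok} = :timer.tc(fn -> ToyDispatcher.dispatch(events, handlers) end)
IO.puts("strong: #{div(strong_us, 1000)} ms, eventual: #{div(eventual_us, 1000)} ms")
```

The strong dispatch takes roughly as long as the slowest strongly consistent handler (about 100 ms here) and ignores the slow eventual handler, while the eventual dispatch returns almost immediately. That is the latency trade-off the article describes.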

In this example we dispatch with :strong consistency and can then immediately query the read model, safe in the knowledge that it is up-to-date:

defmodule Conduit.Blog do
  @doc """
  Publishes an article by the given author.
  """
  def publish_article(%Author{} = author, attrs \\ %{}) do
    article_uuid = UUID.uuid4()
    publish_article =
      attrs
      |> PublishArticle.new()
      |> PublishArticle.assign_uuid(article_uuid)
      |> PublishArticle.assign_author(author)
      |> PublishArticle.generate_url_slug()

    with :ok <- Router.dispatch(publish_article, consistency: :strong) do
      get(Article, article_uuid)
    else
      reply -> reply
    end
  end

  defp get(schema, uuid) do
    case Repo.get(schema, uuid) do
      nil -> {:error, :not_found}
      projection -> {:ok, projection}
    end
  end
end

To support this, the Article read model projector has been configured to use :strong consistency:

defmodule Conduit.Blog.Projectors.Article do
  use Commanded.Projections.Ecto,
    name: "Blog.Projectors.Article",
    consistency: :strong

  project %ArticlePublished{} = published, %{created_at: published_at} do
    Ecto.Multi.insert(multi, :article, %Article{
      uuid: published.uuid,
      slug: published.slug,
      title: published.title,
      # ...
    })
  end
end

Summary

By opting in to strongly consistent command dispatch you alleviate the issue of stale read model data. There’s a trade-off involved: the cost is additional latency while waiting for the read models to be updated.

For each command dispatch you can choose:

  • Strong consistency - up-to-date data, but at the cost of higher latency.
  • Eventual consistency - low latency, but read models may respond to queries with stale data because they may not yet have processed the persisted events.