Event sourcing in practice

Using Elixir to build event-driven applications

by Ben Smith

Event sourcing in practice

Using Elixir to build event-driven applications

by Ben Smith

Photo by Amanda Jones on Unsplash

Talk outline

Let's start by introducing the application

defmodule SegmentChallenge.Challenge do
  use SegmentChallenge.Web, :model
 
  schema "challenges" do
    field(:name, :string)
    field(:description, :string)
    field(:start_date, Ecto.DateTime)
    field(:start_date_local, Ecto.DateTime)
    field(:end_date, Ecto.DateTime)
    field(:end_date_local, Ecto.DateTime)
    field(:active, :boolean)
    field(:competitor_count, :integer, default: 0)
    field(:slug, :string)
 
    belongs_to(:club, Club)
 
    has_many(:challenge_participations, ChallengeParticipation)
    has_many(:competitors, through: [:challenge_participations, :athlete])
    
    timestamps()
  end
end

defmodule SegmentChallengeWeb.API.StravaController do
  use SegmentChallengeWeb, :controller
 
  def webhook(conn, %{"object_type" => "activity", "aspect_type" => "create"} = params) do
    %{"object_id" => strava_activity_id} = params
 
    SegmentChallenge.ActivityImport.execute(strava_activity_id)
 
    json(conn, "")
  end
 
  def webhook(conn, _params) do
    json(conn, "")
  end
end
 


defmodule SegmentChallenge.ActivityImport do
  def execute(strava_activity_id) do
    {:ok, %Strava.Activity{} = activity} = get_strava_activity(strava_activity_id)
 
    for challenge <- athlete_active_challenges(activity) do
      Repo.transaction(fn ->
        :ok = Challenge.import_activity(activity)
        :ok = Leaderboard.rank_leaderboard(challenge)
      end)
    end
  end
end
 

defmodule SegmentChallenge.ActivityImport do
  def execute(strava_activity_id) do
    {:ok, %Strava.Activity{} = activity} = get_strava_activity(strava_activity_id)
 
    for challenge <- athlete_active_challenges(activity) do
      Repo.transaction(fn ->
        :ok = Challenge.import_activity(challenge, activity)
        :ok = Leaderboard.rank_leaderboard(challenge)
        :ok = ActivityFeed.record_activity(activity)
        :ok = Email.send_lost_place_notification(activity)
      end)
    end
  end
end
 
defmodule SegmentChallenge.ActivityImport do
  def execute(strava_activity_id) do
    {:ok, %Strava.Activity{} = activity} = get_strava_activity(strava_activity_id)
 
    challenges = athlete_active_challenges(activity)
 
    multi =
      Enum.reduce(challenges, Ecto.Multi.new(), fn challenge, multi ->
        multi
        |> import_challenge_activity(challenge, activity)
        |> rank_challenge_leaderboard(challenge)
        |> record_activity_in_feed(activity)
        |> send_lost_place_email_notification(challenge)
      end)
 
    case Repo.transaction(multi) do
      {:ok, _changes} -> :ok
      {:error, _operation, _failure, _changes} -> {:error, :activity_import_failed}
    end
  end
end
 

defmodule SegmentChallenge.ActivityImport do
  def execute(strava_activity_id) do
    {:ok, %Strava.Activity{} = activity} = get_strava_activity(strava_activity_id)
 
    for challenge <- athlete_active_challenges(activity do
      with {:ok, activity} <- Challenge.import_activity(challenge, activity) do
        Registry.dispatch(SegmentChallenge.PubSub, :challenge, fn entries ->
          for {pid, _} <- entries do
            send(pid, {:activity_recorded, challenge, activity})
          end
        end)
      end
    end
  end
end
 
defmodule SegmentChallenge.ChallengeActivityHandler do
  use GenServer
 
  def init(state) do
    {:ok, _} = Registry.register(SegmentChallenge.PubSub, :challenge, [])
 
    {:ok, state}
  end
 
  @doc """
  Handle challenge activity recorded message.
  """
  def handle_info({:activity_recorded, challenge, activity}, state) do
    :ok = Leaderboard.rank_leaderboard(challenge)
 
    {:noreply, state}
  end
end
 

Using domain events as the source of truth

Command function

execute(state, command) :: {:ok, [event]}
  | {:error, term}

defmodule SegmentChallenge.Challenge do
  # Command functions
  def execute(%Challenge{}, %CreateChallenge{}), do: # ...
  def execute(%Challenge{}, %JoinChallenge{}), do: # ...
  def execute(%Challenge{}, %ImportActivity{}), do: # ...
end
defmodule SegmentChallenge.Challenge do
  defstruct [:id, :start_date_local, :end_date_local, competitors: MapSet.new(), activities: []]
 
  def execute(%Challenge{id: id} = challenge, %ImportActivity{} = command) do
    %ImportActivity{
      activity_id: activity_id,
      athlete_id: athlete_id,
      start_date_local: start_date_local
    } = command
 
    with :ok <- validate_new_activity(challenge, activity_id),
         :ok <- validate_is_competitor(challenge, athlete_id),
         :ok <- validate_within_challenge_period(challenge, start_date_local) do
      event = struct(ActivityRecorded, Map.from_struct(command))
 
      {:ok, [event]}
    else
      {:error, error} -> {:error, error}
    end
  end
end
 

State mutator function

      apply(state, event) :: state
    

defmodule SegmentChallenge.Challenge do
  # State mutators  
  def apply(%Challenge{}, %ChallengeCreated{}), do: # ...
  def apply(%Challenge{}, %CompetitorJoinedChallenge{}, do: # ...
  def apply(%Challenge{}, %ActivityRecorded{}), do: # ...
end
 
defmodule SegmentChallenge.Challenge do
  defstruct [:id, :start_date_local, :end_date_local, competitors: MapSet.new(), activities: []]
 
  def apply(%Challenge{} = challenge, %CompetitorJoinedChallenge{} = event) do
    %Challenge{competitors: competitors} = challenge
    %CompetitorJoinedChallenge{athlete_id: athlete_id} = event
 
    %Challenge{challenge | competitors: MapSet.put(competitors, athlete_id)}
  end
 
  def apply(%Challenge{} = challenge, %ActivityRecorded{} = event) do
    %Challenge{activities: activities} = challenge
    %ActivityRecorded{athlete_id: athlete_id} = event
 
    %Challenge{challenge | activities: [event | activities]}
  end
end
 

Example usage

challenge = %Challenge{}
 
{:ok, events} = Challenge.execute(challenge, command)
 
challenge = Enum.reduce(events, challenge, &Challenge.apply(&2, &1))
 
defmodule SegmentChallenge.ChallengeTest do
  use SegmentChallenge.AggregateCase, aggregate: SegmentChallenge.Challenge
 
  describe "challenge aggregate" do
    test "should record activity for competitor within challenge period" do
      assert_events(
        [
          %ChallengeCreated{
            id: "1",
            start_date_local: ~N[2019-03-01 00:00:00],
            end_date_local: ~N[2019-03-31 23:59:59]
          },
          %CompetitorJoinedChallenge{id: "1", athlete_id: "2"}
        ],
        %ImportActivity{athlete_id: "2", start_date_local: ~N[2019-03-28 10:27:15]},
        [
          %ActivityRecorded{athlete_id: "2", start_date_local: ~N[2019-03-28 10:27:15]}
        ]
      )
    end
  end
end
 
defmodule SegmentChallenge.ChallengeTest do
  use SegmentChallenge.AggregateCase, aggregate: SegmentChallenge.Challenge
 
  describe "challenge aggregate" do
    test "should exclude activity for non-comptitor" do
      assert_error(
        [
          %ChallengeCreated{
            id: "1",
            start_date_local: ~N[2019-03-01 00:00:00],
            end_date_local: ~N[2019-03-31 23:59:59]
          }
        ],
        %ImportActivity{athlete_id: "2", start_date_local: ~N[2019-03-28 10:27:15]},
        {:error, :not_a_competitor}
      )
    end
  end
end
 

How do I design using events?

Challenge lifecycle

%ChallengeCreated{id: "42abbe22-a93b-411b-b0b1-c11e3ccad77b", name: "Ride 2,0189 miles in 2019"},
%ChallengeHosted{id: "42abbe22-a93b-411b-b0b1-c11e3ccad77b"},
%ChallengeStarted{id: "42abbe22-a93b-411b-b0b1-c11e3ccad77b"},
%CompetitorJoinedChallenge{id: "42abbe22-a93b-411b-b0b1-c11e3ccad77b", competitor: "athlete-1234"},
%ActivityRecorded{
  id: "42abbe22-a93b-411b-b0b1-c11e3ccad77b",
  athlete: "athlete-1234",
  start_date_local: ~N[2019-01-17 10:25:16],
  distance_in_metres: 42417.8,
  moving_time_in_seconds: 6078
},
%LeaderboardRanked{
  id: "42abbe22-a93b-411b-b0b1-c11e3ccad77b",
  rankings: [
    %LeaderboardRanked.Ranking{
      rank: 1,
      athlete_uuid: "athlete-1234",
      total_distance_in_metres: 42417.8,
      total_moving_time_in_seconds: 6078
    }
  ],
  new_positions: [%LeaderboardRanked.Position{rank: 1, athlete_uuid: "athlete-1234"}],
  positions_gained: [],
  positions_lost: []
}
 
defmodule SegmentChallenge.LostPlaceEmailNotification do
  use Commanded.Event.Handler, name: __MODULE__
 
  @doc """
  Handle leaderboard ranked events where an athlete has lost a position.
  """
  def handle(%LeaderboardRanked{} = event, _metadata) do
    %LeaderboardRanked{positions_lost: positions_lost} = event
 
    for position <- positions_lost do
      :ok = Email.send_lost_place_notification(position)
    end
 
    :ok
  end
end
 
defmodule SegmentChallenge.ChallengeProjector do
  use Commanded.Projections.Ecto, name: __MODULE__
 
  project %ActivityRecorded{} = event, fn multi ->
    projection = to_projection(Activity, event)
 
    Ecto.Multi.insert(multi, :activity, projection)
  end
 
  project %LeaderboardRanked{} = event, fn multi ->
    %LeaderboardRanked{rankings: rankings} = event
 
    Enum.reduce(rankings, multi, fn ranking, multi ->
      projection = to_projection(LeaderboardEntry, ranking)
      
      Ecto.Multi.insert(multi, :leaderboard_entry, projection)
    end)
  end
end
 

What about changing requirements?

Dealing with external events



defmodule SegmentChallenge.StravaActivityRecorder do
  def execute(strava_activity_id) do
    {:ok, %Strava.Activity{} = activity} = get_strava_activity(strava_activity_id)
 
    events = [
      struct(StravaActivityCreated, Map.from_struct(activity))
    ]
 
    :ok = EventStore.append_to_stream("strava", :any_version, events)
  end
end
 

Extending third party providers

Questions?

Want to learn more?

Get in touch

I'm available to help your company become event-driven.