by Ben Smith
by Ben Smith
Photo by Amanda Jones on Unsplash
defmodule SegmentChallenge.Challenge do
use SegmentChallenge.Web, :model
schema "challenges" do
field(:name, :string)
field(:description, :string)
field(:start_date, Ecto.DateTime)
field(:start_date_local, Ecto.DateTime)
field(:end_date, Ecto.DateTime)
field(:end_date_local, Ecto.DateTime)
field(:active, :boolean)
field(:competitor_count, :integer, default: 0)
field(:slug, :string)
belongs_to(:club, Club)
has_many(:challenge_participations, ChallengeParticipation)
has_many(:competitors, through: [:challenge_participations, :athlete])
timestamps()
end
end
defmodule SegmentChallengeWeb.API.StravaController do
use SegmentChallengeWeb, :controller
def webhook(conn, %{"object_type" => "activity", "aspect_type" => "create"} = params) do
%{"object_id" => strava_activity_id} = params
SegmentChallenge.ActivityImport.execute(strava_activity_id)
json(conn, "")
end
def webhook(conn, _params) do
json(conn, "")
end
end
defmodule SegmentChallenge.ActivityImport do
def execute(strava_activity_id) do
{:ok, %Strava.Activity{} = activity} = get_strava_activity(strava_activity_id)
for challenge <- athlete_active_challenges(activity) do
Repo.transaction(fn ->
:ok = Challenge.import_activity(activity)
:ok = Leaderboard.rank_leaderboard(challenge)
end)
end
end
end
But …
defmodule SegmentChallenge.ActivityImport do
def execute(strava_activity_id) do
{:ok, %Strava.Activity{} = activity} = get_strava_activity(strava_activity_id)
for challenge <- athlete_active_challenges(activity) do
Repo.transaction(fn ->
:ok = Challenge.import_activity(challenge, activity)
:ok = Leaderboard.rank_leaderboard(challenge)
:ok = ActivityFeed.record_activity(activity)
:ok = Email.send_lost_place_notification(activity)
end)
end
end
end
defmodule SegmentChallenge.ActivityImport do
def execute(strava_activity_id) do
{:ok, %Strava.Activity{} = activity} = get_strava_activity(strava_activity_id)
challenges = athlete_active_challenges(activity)
multi =
Enum.reduce(challenges, Ecto.Multi.new(), fn challenge, multi ->
multi
|> import_challenge_activity(challenge, activity)
|> rank_challenge_leaderboard(challenge)
|> record_activity_in_feed(activity)
|> send_lost_place_email_notification(challenge)
end)
case Repo.transaction(multi) do
{:ok, _changes} -> :ok
{:error, _operation, _failure, _changes} -> {:error, :activity_import_failed}
end
end
end
No real improvement
defmodule SegmentChallenge.ActivityImport do
def execute(strava_activity_id) do
{:ok, %Strava.Activity{} = activity} = get_strava_activity(strava_activity_id)
for challenge <- athlete_active_challenges(activity do
with {:ok, activity} <- Challenge.import_activity(challenge, activity) do
Registry.dispatch(SegmentChallenge.PubSub, :challenge, fn entries ->
for {pid, _} <- entries do
send(pid, {:activity_recorded, challenge, activity})
end
end)
end
end
end
end
defmodule SegmentChallenge.ChallengeActivityHandler do
use GenServer
def init(state) do
{:ok, _} = Registry.register(SegmentChallenge.PubSub, :challenge, [])
{:ok, state}
end
@doc """
Handle challenge activity recorded message.
"""
def handle_info({:activity_recorded, challenge, activity}, state) do
:ok = Leaderboard.rank_leaderboard(challenge)
{:noreply, state}
end
end
But …
execute(state, command) :: {:ok, [event]}
| {:error, term}
defmodule SegmentChallenge.Challenge do
# Command functions
def execute(%Challenge{}, %CreateChallenge{}), do: # ...
def execute(%Challenge{}, %JoinChallenge{}), do: # ...
def execute(%Challenge{}, %ImportActivity{}), do: # ...
end
defmodule SegmentChallenge.Challenge do
defstruct [:id, :start_date_local, :end_date_local, competitors: MapSet.new(), activities: []]
def execute(%Challenge{id: id} = challenge, %ImportActivity{} = command) do
%ImportActivity{
activity_id: activity_id,
athlete_id: athlete_id,
start_date_local: start_date_local
} = command
with :ok <- validate_new_activity(challenge, activity_id),
:ok <- validate_is_competitor(challenge, athlete_id),
:ok <- validate_within_challenge_period(challenge, start_date_local) do
event = struct(ActivityRecorded, Map.from_struct(command))
{:ok, [event]}
else
{:error, error} -> {:error, error}
end
end
end
apply(state, event) :: state
defmodule SegmentChallenge.Challenge do
# State mutators
def apply(%Challenge{}, %ChallengeCreated{}), do: # ...
def apply(%Challenge{}, %CompetitorJoinedChallenge{}, do: # ...
def apply(%Challenge{}, %ActivityRecorded{}), do: # ...
end
defmodule SegmentChallenge.Challenge do
defstruct [:id, :start_date_local, :end_date_local, competitors: MapSet.new(), activities: []]
def apply(%Challenge{} = challenge, %CompetitorJoinedChallenge{} = event) do
%Challenge{competitors: competitors} = challenge
%CompetitorJoinedChallenge{athlete_id: athlete_id} = event
%Challenge{challenge | competitors: MapSet.put(competitors, athlete_id)}
end
def apply(%Challenge{} = challenge, %ActivityRecorded{} = event) do
%Challenge{activities: activities} = challenge
%ActivityRecorded{athlete_id: athlete_id} = event
%Challenge{challenge | activities: [event | activities]}
end
end
challenge = %Challenge{}
{:ok, events} = Challenge.execute(challenge, command)
challenge = Enum.reduce(events, challenge, &Challenge.apply(&2, &1))
GenServer
processdefmodule SegmentChallenge.ChallengeTest do
use SegmentChallenge.AggregateCase, aggregate: SegmentChallenge.Challenge
describe "challenge aggregate" do
test "should record activity for competitor within challenge period" do
assert_events(
[
%ChallengeCreated{
id: "1",
start_date_local: ~N[2019-03-01 00:00:00],
end_date_local: ~N[2019-03-31 23:59:59]
},
%CompetitorJoinedChallenge{id: "1", athlete_id: "2"}
],
%ImportActivity{athlete_id: "2", start_date_local: ~N[2019-03-28 10:27:15]},
[
%ActivityRecorded{athlete_id: "2", start_date_local: ~N[2019-03-28 10:27:15]}
]
)
end
end
end
defmodule SegmentChallenge.ChallengeTest do
use SegmentChallenge.AggregateCase, aggregate: SegmentChallenge.Challenge
describe "challenge aggregate" do
test "should exclude activity for non-comptitor" do
assert_error(
[
%ChallengeCreated{
id: "1",
start_date_local: ~N[2019-03-01 00:00:00],
end_date_local: ~N[2019-03-31 23:59:59]
}
],
%ImportActivity{athlete_id: "2", start_date_local: ~N[2019-03-28 10:27:15]},
{:error, :not_a_competitor}
)
end
end
end
async: true
Command and state mutation functions are pure — IO and side-affect free
%ChallengeCreated{id: "42abbe22-a93b-411b-b0b1-c11e3ccad77b", name: "Ride 2,0189 miles in 2019"},
%ChallengeHosted{id: "42abbe22-a93b-411b-b0b1-c11e3ccad77b"},
%ChallengeStarted{id: "42abbe22-a93b-411b-b0b1-c11e3ccad77b"},
%CompetitorJoinedChallenge{id: "42abbe22-a93b-411b-b0b1-c11e3ccad77b", competitor: "athlete-1234"},
%ActivityRecorded{
id: "42abbe22-a93b-411b-b0b1-c11e3ccad77b",
athlete: "athlete-1234",
start_date_local: ~N[2019-01-17 10:25:16],
distance_in_metres: 42417.8,
moving_time_in_seconds: 6078
},
%LeaderboardRanked{
id: "42abbe22-a93b-411b-b0b1-c11e3ccad77b",
rankings: [
%LeaderboardRanked.Ranking{
rank: 1,
athlete_uuid: "athlete-1234",
total_distance_in_metres: 42417.8,
total_moving_time_in_seconds: 6078
}
],
new_positions: [%LeaderboardRanked.Position{rank: 1, athlete_uuid: "athlete-1234"}],
positions_gained: [],
positions_lost: []
}
defmodule SegmentChallenge.LostPlaceEmailNotification do
use Commanded.Event.Handler, name: __MODULE__
@doc """
Handle leaderboard ranked events where an athlete has lost a position.
"""
def handle(%LeaderboardRanked{} = event, _metadata) do
%LeaderboardRanked{positions_lost: positions_lost} = event
for position <- positions_lost do
:ok = Email.send_lost_place_notification(position)
end
:ok
end
end
defmodule SegmentChallenge.ChallengeProjector do
use Commanded.Projections.Ecto, name: __MODULE__
project %ActivityRecorded{} = event, fn multi ->
projection = to_projection(Activity, event)
Ecto.Multi.insert(multi, :activity, projection)
end
project %LeaderboardRanked{} = event, fn multi ->
%LeaderboardRanked{rankings: rankings} = event
Enum.reduce(rankings, multi, fn ranking, multi ->
projection = to_projection(LeaderboardEntry, ranking)
Ecto.Multi.insert(multi, :leaderboard_entry, projection)
end)
end
end
defmodule SegmentChallenge.StravaActivityRecorder do
def execute(strava_activity_id) do
{:ok, %Strava.Activity{} = activity} = get_strava_activity(strava_activity_id)
events = [
struct(StravaActivityCreated, Map.from_struct(activity))
]
:ok = EventStore.append_to_stream("strava", :any_version, events)
end
end
I'm available to help your company become event-driven.