Using Elixir's GenStage and Flow to build product recommendations
Crowdsourcing product mentions, applying sentiment analysis, and scoring to find the best resources to Start learning Elixir. A real-world example of building a parallel computation pipeline using Flow.
Elixir Phoenix GenStage Flow
Background
Flow allows developers to express computations on collections, similar to the Enum and Stream modules, although computations will be executed in parallel using multiple GenStages.
The canonical example of Flow, taken from the GitHub repository, demonstrates how to count words in a document in parallel:
File.stream!("path/to/some/file")
|> Flow.from_enumerable()
|> Flow.flat_map(&String.split(&1, " "))
|> Flow.partition()
|> Flow.reduce(fn -> %{} end, fn word, acc ->
Map.update(acc, word, 1, & &1 + 1)
end)
|> Enum.to_list()
I’ve wanted to build something using Flow since José Valim introduced the concepts behind it in his ElixirConf 2016 keynote. I initially thought of building a product recommendation tool using crowdsourced reviews, since it would include both IO-intensive stages, such as HTTP requests, and CPU-intensive stages, like sentiment analysis: a good candidate for parallel processing and well suited to Flow.
This led me to create Start learning Elixir.
Start learning Elixir
Start learning Elixir answers the question: what are the best resources for learning Elixir?
Crowdsourcing is used to locate and rank relevant learning materials, including books, eBooks, screencasts, and tutorials.
The ranking algorithm uses a combination of popularity and sentiment analysis. Popular and highly rated material scores well. Scores are a combination of the positive, neutral, and negative mentions from the popular Elixir Forum.
Creating a product recommendation tool
I created an Elixir umbrella application containing three apps:
- indexer: used to fetch external content, using HTTP requests, and rank products.
- recommendations: containing the domain Ecto schemas and queries: resources, reviews, scores, authors.
- web: a Phoenix front-end to display the recommended products.
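The umbrella layout looks roughly like this (the directory names are assumed from the app list above):

learn/
├── apps/
│   ├── indexer/           # Flow pipeline: fetching, parsing, scoring
│   ├── recommendations/   # Ecto schemas and queries
│   └── web/               # Phoenix front-end
├── config/
└── mix.exs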
Flow is used in the indexer app; that's where the processing is done. I jotted down the high-level steps, then built separate modules for each stage of the pipeline. Each stage was built and unit tested in isolation, then connected together and orchestrated using the appropriate Flow function.
High-level flow
The following end-to-end process is used to build the product recommendations:
- Manually create a list of resources to be recommended: relevant books, screencasts, websites, tutorials.
- Define a set of input keywords:
- e.g. “learn”, “book”, “books”, “eBook”, “screencast”, “tutorial”, “Programming Elixir”
- Search the Elixir Forum by keyword to return a list of topics.
- Fetch all the posts in a topic:
  - Parse post content as HTML.
  - Strip <aside/> and <code/> tags.
  - Split content by <p> tags into sentences.
  - Extract text content.
- Locate mentions of resources from the text content.
- Use sentiment analysis on each sentence to determine whether it is positive, neutral, or negative.
- Aggregate mentions by the same author.
- Score and rank resources by their mentions, weighted by frequency and sentiment, using Laplace smoothing.
This translates to the following Elixir code using Flow:
defmodule Learn.Indexer do
@moduledoc """
Index mentions of resources from an authoritative source.
"""
alias Learn.Indexer
alias Learn.Indexer.{
Mention,
Post,
Ranking,
Resource,
}
@doc """
Rank the given resources based on their mentions in topics matching the given keywords
"""
@spec rank(list(String.t), list(Resource.t)) :: list(Ranking.t)
def rank(keywords, resources, opts \\ []) do
keywords
|> Flow.from_enumerable()
|> Flow.flat_map(&Indexer.search(&1, opts))
|> Flow.uniq()
|> Flow.partition()
|> Flow.flat_map(&Indexer.list_posts(&1, opts))
|> Flow.partition(key: {:key, :id})
|> Flow.uniq_by(&(&1.id))
|> Flow.map(&Indexer.parse_html_content/1)
|> Flow.map(&Indexer.parse_sentences/1)
|> Flow.flat_map(&Indexer.extract_mentions(&1, resources))
|> Flow.map(&Indexer.sentiment_analysis/1)
|> Flow.partition(key: {:key, :resource})
|> Flow.group_by(&(&1.resource))
|> Flow.map(fn {resource, mentions} -> {resource, Indexer.aggregate_mentions_by_author(mentions)} end)
|> Flow.map(fn {resource, recommendations} -> Indexer.rank_recommendations(resource, recommendations) end)
|> Enum.to_list()
|> Enum.sort_by(&(&1.score), &>=/2)
end
end
You will notice the additional Flow.partition functions included within certain stages of the pipeline. Partitioning ensures that related data is routed to the same process, minimising message passing. A hash function is used to partition the data, and you can provide a function, or a key tuple, to partition by. Partitioning is also necessary when you need to aggregate and reduce data, because it guarantees the data in each partition won't overlap.
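As a minimal sketch (not taken from the indexer) of why partitioning matters before grouping: without the partition, events with the same key could be reduced in different stages and produce duplicate groups.

[
  %{id: 1, resource: "elixir-in-action"},
  %{id: 2, resource: "programming-elixir"},
  %{id: 3, resource: "elixir-in-action"}
]
|> Flow.from_enumerable()
|> Flow.partition(key: {:key, :resource}) # events with the same :resource go to the same stage
|> Flow.group_by(& &1.resource)
|> Enum.to_list()
#=> one {resource, events} tuple per resource, with both "elixir-in-action" events grouped together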
The individual steps within the Flow pipeline are detailed below:
- Search topics by keyword
- Parse HTML content
- Parse sentences
- Extract mentions
- Sentiment analysis
- Score recommendations
Search topics by keyword
Elixir Forum uses Discourse as its forum platform, which provides a public API that serves up JSON: you simply append .json to the request to receive JSON-encoded data. So a search request for "book" (https://elixirforum.com/search?q=book) becomes https://elixirforum.com/search.json?q=book. Individual topics can be fetched as JSON in a similar way.
I wrote a simple ElixirForum module using HTTPoison to send HTTP requests and parse the response body using Poison.
defmodule Learn.Indexer.Sources.ElixirForum do
use HTTPoison.Base
alias Learn.Indexer.Cache.HttpRequestCache
alias Learn.Indexer.Sources.ElixirForum
@endpoint "https://elixirforum.com"
defp process_url(path) do
@endpoint <> path
end
def process_response_body(body) do
body
|> Poison.decode!
end
def cached_request!(url, opts) do
HttpRequestCache.cached("elixirforum.com/" <> url, fn ->
rate_limit_access(fn ->
ElixirForum.get!(url).body
end, opts)
end)
end
defp rate_limit_access(request, opts \\ []) do
scale = Keyword.get(opts, :scale, 1_000)
limit = Keyword.get(opts, :limit, 1)
case ExRated.check_rate(@endpoint, scale, limit) do
{:ok, _} ->
request.()
{:error, _} ->
:timer.sleep(1_000)
rate_limit_access(request, opts)
end
end
defmodule Search do
@expected_fields ~w(posts topics)
def query(q, opts) do
"/search.json?" <> URI.encode_query(q: q)
|> ElixirForum.cached_request!(opts)
|> Map.take(@expected_fields)
|> Enum.map(fn({k, v}) -> {String.to_existing_atom(k), v} end)
end
end
end
To be a well-behaved HTTP crawler, access to the site is rate limited using ExRated, configured to allow only one request per second. This limit applies to all requests through this module, regardless of the calling process, as ExRated runs as a GenServer with its own state.
Usage:
ElixirForum.Search.query("books", [scale: 1_000, limit: 1])
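ExRated's check_rate/3 drives the retry loop above: it returns {:ok, count} while the bucket is under its limit within the time window, and {:error, limit} once the limit has been reached. A quick illustration (bucket name arbitrary):

ExRated.check_rate("elixirforum.com", 1_000, 1)
#=> {:ok, 1}
ExRated.check_rate("elixirforum.com", 1_000, 1)
#=> {:error, 1}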
Responses were cached to disk, so repeated requests would not hit the website.
defmodule Learn.Indexer.Cache.HttpRequestCache do
use GenServer
require Logger
defmodule State do
defstruct [
cache_dir: nil
]
end
def start_link(cache_dir) do
GenServer.start_link(__MODULE__, %State{cache_dir: cache_dir}, name: __MODULE__)
end
def init(%State{cache_dir: cache_dir} = state) do
File.mkdir_p!(cache_dir)
{:ok, state}
end
def cached(key, operation) do
case read(key) do
{:ok, value} -> value
{:error, :not_found} ->
value = operation.()
:ok = cache(key, value)
value
end
end
def read(key) do
GenServer.call(__MODULE__, {:read, key})
end
def cache(key, value) do
GenServer.call(__MODULE__, {:cache, key, value})
end
def handle_call({:read, key}, _from, %State{} = state) do
path = cached_path(key, state)
reply = case File.read(path) do
{:ok, data} -> {:ok, Poison.decode!(data)}
{:error, :enoent} -> {:error, :not_found}
{:error, _} = reply -> reply
end
{:reply, reply, state}
end
def handle_call({:cache, key, value}, _from, %State{} = state) do
path = cached_path(key, state)
File.write!(path, Poison.encode!(value), [:write])
{:reply, :ok, state}
end
defp cached_path(key, %State{cache_dir: cache_dir}) do
key = String.slice(key, 0, 255)
path = Path.join(cache_dir, key)
ensure_dir(path)
path
end
defp ensure_dir(path) do
path
|> Path.dirname()
|> File.mkdir_p!()
end
end
The HTTP cache module was configured in the application supervisor.
defmodule Learn.Indexer.Application do
@moduledoc false
use Application
def start(_type, _args) do
import Supervisor.Spec, warn: false
children = [
worker(Learn.Indexer.Cache.HttpRequestCache, ["fixture/http_cache"])
]
opts = [strategy: :one_for_one, name: Learn.Indexer.Supervisor]
Supervisor.start_link(children, opts)
end
end
For testing the search, I again used ExVCR to record the HTTP request/response and replay from disk for subsequent test runs.
- Read more: HTTP unit tests using ExVCR
defmodule Learn.Indexer.Sources.ElixirForumTest do
use ExUnit.Case, async: false
use ExVCR.Mock, adapter: ExVCR.Adapter.Hackney
alias Learn.Indexer.Sources.ElixirForum
setup_all do
HTTPoison.start
:ok
end
describe "search" do
test "for \"books\"" do
use_cassette "elixirforum.com/search.json?&q=book", match_requests_on: [:query] do
response = ElixirForum.Search.query("book", [])
assert length(response[:posts]) > 0
assert length(response[:topics]) > 0
end
end
end
end
Parse HTML content
Floki is used to parse the HTML and extract text content using CSS selectors.
defmodule Learn.Indexer.Stages.ParseHtmlContent do
@moduledoc """
Parse the HTML post content into paragraphs of text.
"""
alias Learn.Indexer.{
Content,
Post,
}
def execute(%Post{content: content} = post) do
%Post{post |
content: parse_content_html(content),
}
end
defp parse_content_html(%Content{html: html} = content) do
paragraphs =
html
|> Floki.filter_out("aside")
|> Floki.filter_out("pre")
|> Floki.find("p")
|> Enum.map(&Floki.text/1)
|> Enum.flat_map(fn p -> String.split(p, "\n") end)
|> Enum.reject(fn p -> p == "" end)
%Content{content |
paragraphs: paragraphs,
}
end
end
Each <p> tag is used to identify paragraphs of text, and the text content is extracted using Floki.text/1.
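As a rough illustration of those Floki calls (the HTML snippet is invented, and this relies on the older Floki API used here, which accepts a raw HTML string):

html = "<aside>quoted reply</aside><p>I recommend Elixir in Action.</p><pre>some code</pre>"

html
|> Floki.filter_out("aside")
|> Floki.filter_out("pre")
|> Floki.find("p")
|> Enum.map(&Floki.text/1)
#=> ["I recommend Elixir in Action."]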
Parse sentences
Given a post containing paragraphs of text, I extract the individual sentences. This uses the Essence.Chunker.sentences function from the essence Natural Language Processing library.
defmodule Learn.Indexer.Stages.ParseSentences do
@moduledoc """
Parse the paragraphs of text into sentences.
"""
alias Learn.Indexer.{
Content,
Post,
}
def execute(%Post{content: content} = post) do
%Post{post |
content: parse_sentences(content),
}
end
# chunks the given paragraphs into sentences.
defp parse_sentences(%Content{paragraphs: paragraphs} = content) do
sentences =
paragraphs
|> Enum.map(&Essence.Chunker.sentences/1)
|> Enum.flat_map(fn sentences -> sentences end)
%Content{content |
sentences: sentences,
}
end
end
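For reference, Essence.Chunker.sentences/1 splits a paragraph into a list of sentence strings, roughly like this (output shape assumed):

Essence.Chunker.sentences("Elixir is great. I recommend Elixir in Action.")
#=> ["Elixir is great.", "I recommend Elixir in Action."]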
Extract mentions
To extract individual product mentions:
- Split the sentence into lowercase tokens using Essence.Tokenizer.tokenize:
  - "I recommend Elixir in Action" becomes ["i", "recommend", "elixir", "in", "action"]
- Split the product name into lowercase tokens:
  - "Elixir in Action" becomes ["elixir", "in", "action"]
- Chunk through the tokenised sentence using Stream.chunk, with a count equal to the length of the search tokens and a step of 1 to include overlaps, looking for a chunk that matches the product name tokens.
defp mentioned?(sentence, %Resource{name: name}) do
contains?(tokenize_downcase(sentence), tokenize_downcase(name))
end
def tokenize_downcase(text), do: text |> String.downcase |> Essence.Tokenizer.tokenize
defp contains?(source_tokens, search_tokens) do
source_tokens
|> Stream.chunk(length(search_tokens), 1)
|> Enum.any?(fn chunk -> chunk == search_tokens end)
end
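Walking through the "Elixir in Action" example, the sliding window produced by Stream.chunk looks like this:

["i", "recommend", "elixir", "in", "action"]
|> Stream.chunk(3, 1) # chunks: ["i", "recommend", "elixir"], ["recommend", "elixir", "in"], ["elixir", "in", "action"]
|> Enum.any?(fn chunk -> chunk == ["elixir", "in", "action"] end)
#=> true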
Sentiment analysis
I use Sentient for simple sentiment analysis. It uses the AFINN-111 wordlist to score a sentence as positive, neutral, or negative based upon the presence, and score, of any words from the list.
AFINN is a list of English words rated for valence with an integer between minus five (negative) and plus five (positive)
As an example, the text “I recommend Elixir in Action” is scored positive as the word “recommend” is rated +2. There are 2,477 words in the AFINN-111 word list. This simplistic algorithm provides a reasonable indicator of sentiment for a sentence.
defmodule Learn.Indexer.Stages.SentimentAnalysis do
@moduledoc """
Analyse the mention sentence for its sentiment (positive, neutral, or negative).
Uses the AFINN-111 word list.
"""
alias Learn.Indexer.{
Mention,
}
@spec execute(Mention.t) :: Mention.t
def execute(%Mention{} = mention) do
%Mention{mention |
sentiment_score: sentiment(mention),
}
end
@override_words %{"free" => 0} # score the word "free" as neutral
defp sentiment(%Mention{sentence: sentence}) do
Sentient.analyze(sentence, @override_words)
end
end
The sentiment score of the sentence containing a product mention contributes to its overall score.
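For illustration, this is roughly what the Sentient call looks like for the earlier example sentence (the exact return value depends on the AFINN-111 ratings):

Sentient.analyze("I recommend Elixir in Action", %{"free" => 0})
#=> 2, as "recommend" is rated +2 (per the example above)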
Score recommendations
I determine a product’s overall score once every mention has been extracted from the text and assigned a sentiment score.
To balance the proportion of positive/negative ratings with the uncertainty from only finding a small number of mentions I apply Laplace smoothing:
score = (upvotes + α) / (upvotes + downvotes + β)
For example, α = 1 and β = 2 means that an item with no votes gets scored 0.5.
Each product has a mix of positive, neutral, and negative mentions, so I count each neutral mention as +1 and each positive mention as +2 when calculating the score.
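For instance, a resource with three positive, one neutral, and one negative mention (counts chosen purely for illustration) would score:

upvotes   = neutral + 2 × positive = 1 + 2 × 3 = 7
downvotes = negative               = 1
score     = (7 + 1) / (7 + 1 + 2)  = 0.8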
defmodule Learn.Indexer.Stages.RankRecommendations do
@moduledoc """
Combine recommendations for the same resource into a single ranking and score
"""
alias Learn.Indexer.{
Ranking,
Recommendation,
Resource,
}
@spec execute(Resource.t, list(Recommendation.t)) :: Ranking.t
def execute(resource, recommendations) do
%Ranking{
resource: resource,
recommendations: recommendations,
score: calculate_score(recommendations),
}
end
# calculate score by using Laplace smoothing on positive, neutral and negative mentions
defp calculate_score([]), do: 0
defp calculate_score(recommendations) do
recommendations
|> Enum.reduce({0, 0, 0}, &Recommendation.score/2)
|> score
end
defp score({negative, neutral, positive}) do
upvotes = neutral + (positive * 2)
downvotes = negative
(upvotes + 1) / (upvotes + downvotes + 2)
end
end
There’s research to back up the usage of Laplace smoothing to “.. provide a well justified solution to the problem of user-rating based ranking of items in web applications”.
- How To Sort By Average Rating
- How to Count Thumb-Ups and Thumb-Downs: User-Rating based Ranking of Items from an Axiomatic Perspective (PDF)
Crowdsourced recommendations
By composing the individual steps into the single Flow pipeline shown earlier, I can rank each product by its recommendations.
def index_mentions(keywords, resources, opts \\ []) do
keywords
|> Learn.Indexer.rank(resources, opts)
|> record_rankings(resources)
end
These are persisted into a PostgreSQL database using Ecto. I use the following Ecto query to display the products, ranked by their score. They may optionally be filtered by programming language or experience level to provide the categorised lists shown on the website.
defmodule Learn.Recommendations.Queries.RecommendedResources do
import Ecto.Query
alias Learn.Recommendations.{
Resource,
Score,
}
def new do
from r in Resource,
left_join: s in assoc(r, :score),
order_by: [asc: s.rank, asc: r.title],
preload: [:score]
end
def by_experience(query, level) do
from [r, s] in query,
where: r.experience_level == ^level
end
def by_language(query, language) do
from [r, s] in query,
where: r.programming_language == ^language
end
end
The Phoenix controller builds the query and fetches matching resources using Repo.all/2:
defmodule Learn.Web.ResourceController do
use Learn.Web.Web, :controller
alias Learn.Recommendations.Repo
alias Learn.Recommendations.Queries.RecommendedResources
def index(conn, _params) do
resources =
RecommendedResources.new
|> RecommendedResources.by_experience("beginner")
|> RecommendedResources.by_language("Elixir")
|> Repo.all()
render conn, "index.html", resources: resources
end
end
Summary
That's a real-world example of using Flow to build a parallel computation pipeline. I had assumed that this simplistic approach to building product recommendations wouldn't be valuable; yet, looking at the final recommendations, I believe it is: popular and highly rated resources appear at the top of the lists. Popularity, from mentions on a forum of users, combined with sentiment, can be used to rate a product.
I've recently added activity tracking to the site with the intention of including those analytics in the scores, so that more frequently viewed resources are ranked more highly. I am also considering adding a feedback mechanism for users to flag incorrect reviews and add their own.
Get in touch if you have any feedback and suggestions.
Tuning
I was inspired to look at tuning my Flow pipeline by Tymon Tobolski. He authored two excellent articles on the subject.
By taking his example Progress module and GnuPlot script I was able to visualise and optimise the Flow pipeline.
Collect progress stats
The Learn.Progress module was taken directly from Tymon's article.
defmodule Learn.Progress do
@moduledoc """
Progress stats collector, courtesy of http://teamon.eu/2016/measuring-visualizing-genstage-flow-with-gnuplot/
"""
use GenServer
@timeres :millisecond
# Progress.start_link [:a, :b, :c]
def start_link(scopes \\ []) do
GenServer.start_link(__MODULE__, scopes, name: __MODULE__)
end
def stop do
GenServer.stop(__MODULE__)
end
# increment counter for given scope by `n`
# Progress.incr(:my_scope)
# Progress.incr(:my_scope, 10)
def incr(scope, n \\ 1) do
GenServer.cast __MODULE__, {:incr, scope, n}
end
def init(scopes) do
File.mkdir_p!("fixture/trace")
# open "progress-{scope}.log" file for every scope
files = Enum.map(scopes, fn scope ->
{scope, File.open!("fixture/trace/progress-#{scope}.log", [:write])}
end)
# keep current counter for every scope
counts = Enum.map(scopes, fn scope -> {scope, 0} end)
# save current time
time = :os.system_time(@timeres)
# write first data point for every scope with current time and value 0
# this helps to keep the graph starting nicely at (0,0) point
Enum.each(files, fn {_, io} -> write(io, time, 0) end)
{:ok, {time, files, counts}}
end
def handle_cast({:incr, scope, n}, {time, files, counts}) do
# update counter
{value, counts} = Keyword.get_and_update!(counts, scope, &({&1+n, &1+n}))
# write new data point
write(files[scope], time, value)
{:noreply, {time, files, counts}}
end
defp write(file, time, value) do
time = :os.system_time(@timeres) - time
IO.write(file, "#{time}\t#{value}\n")
end
end
I used Arjan Scherpenisse's decorator library to apply the progress-collecting stats to each stage function, by decorating each function to be recorded with @decorate progress.
defmodule Learn.Indexer do
@moduledoc """
Index mentions of resources from an authoritative source.
"""
use Learn.ProgressDecorator
@doc """
Search for topics matching a given query
"""
@decorate progress
def search(query, opts \\ []), do: SearchKeyword.execute(query, opts)
end
The decorator simply increments the counter for the function after execution: by the number of items when the function returns a list, otherwise by 1.
defmodule Learn.ProgressDecorator do
use Decorator.Define, [progress: 0]
alias Learn.Progress
def progress(body, context) do
quote do
reply = unquote(body)
case reply do
list when is_list(list) -> Progress.incr(unquote(context.name), length(list))
_ -> Progress.incr(unquote(context.name), 1)
end
reply
end
end
end
The Learn.Indexer.rank/3 function configures the progress process with the name of each stage before executing the flow. It stops the progress process afterwards to ensure the log files are written and closed.
defmodule Learn.Indexer do
@moduledoc """
Index mentions of resources from an authoritative source.
"""
@doc """
Rank the given resources based on their mentions in topics matching the given keywords
"""
@spec rank(list(String.t), list(Resource.t)) :: list(Ranking.t)
def rank(keywords, resources, opts \\ []) do
Progress.start_link([
:search,
:list_posts,
:parse_html_content,
:parse_sentences,
:extract_mentions,
:sentiment_analysis,
:aggregate_mentions_by_author,
:rank_recommendations,
])
rankings =
keywords
|> Flow.from_enumerable(max_demand: 1, stages: 1)
|> Flow.flat_map(&Indexer.search(&1, opts))
|> Flow.uniq()
|> Flow.partition(max_demand: 5)
|> Flow.flat_map(&Indexer.list_posts(&1, opts))
|> Flow.partition(key: {:key, :id}, max_demand: 5)
|> Flow.uniq_by(&(&1.id))
|> Flow.map(&Indexer.parse_html_content/1)
|> Flow.map(&Indexer.parse_sentences/1)
|> Flow.flat_map(&Indexer.extract_mentions(&1, resources))
|> Flow.map(&Indexer.sentiment_analysis/1)
|> Flow.partition(key: {:key, :resource}, max_demand: 5)
|> Flow.group_by(&(&1.resource))
|> Flow.map(fn {resource, mentions} -> {resource, Indexer.aggregate_mentions_by_author(mentions)} end)
|> Flow.map(fn {resource, recommendations} -> Indexer.rank_recommendations(resource, recommendations) end)
|> Enum.to_list()
|> Enum.sort_by(&(&1.score), &>=/2)
Progress.stop()
rankings
end
end
Visualising the flow
After running the indexer with the stage functions decorated with @decorate progress, I could plot the flow's progress over time.
Initial flow
Initially it took around 33 seconds to run the flow. This is with a fully populated HTTP cache, so no external requests were being made.
Optimised flow
I adjusted the max_demand and stages options for some of the Flow steps, as shown above. This reduced the running time of the overall flow from 33 seconds to 26 seconds.
The following GnuPlot script was used to generate the charts. It displays the progress of each stage by percentage complete. The final ranked recommendations are shown on a separate y-axis.
# plot.gp
set terminal png font "Arial,10" size 700,500
set output "progress.png"
set title "Elixir Flow processing progress over time"
set xlabel "Time (ms)"
set ylabel "Progress (%)"
set y2label "Rankings"
set ytics nomirror
set yrange [0:100]
set format y '%2.0f%%'
set y2tics
set key top left # put labels in top-left corner
# limit x range to 35.000 ms instead of dynamic one - needed when generating graphs that will be later compared visually
set xrange [0:35000]
plot "trace/progress-search.log" using ($1):($2/1249*100) with steps axes x1y1 ls 1 title "Search topics",\
"trace/progress-search.log" using ($1):($2/1249*100) with points axes x1y1 ls 1 notitle,\
"trace/progress-list_posts.log" using ($1):($2/14974*100) with lines axes x1y1 ls 2 title "List posts",\
"trace/progress-parse_html_content.log" using ($1):($2/6780*100) with lines axes x1y1 ls 3 title "Parse HTML",\
"trace/progress-parse_sentences.log" using ($1):($2/6780*100) with lines axes x1y1 ls 4 title "Parse sentences",\
"trace/progress-extract_mentions.log" using ($1):($2/515*100) with lines axes x1y1 ls 5 title "Extract mentions",\
"trace/progress-sentiment_analysis.log" using ($1):($2/515*100) with lines axes x1y1 ls 6 title "Sentiment analysis",\
"trace/progress-aggregate_mentions_by_author.log" using ($1):($2/314*100) with lines axes x1y1 ls 7 title "Aggregate mentions by author",\
"trace/progress-rank_recommendations.log" with steps axes x1y2 ls 8 title "Rank",\
"trace/progress-rank_recommendations.log" with points axes x1y2 ls 8 notitle