Using Elixir's GenStage and Flow to build product recommendations

Crowdsourcing product mentions, applying sentiment analysis, and scoring to find the best resources to Start learning Elixir. A real-world example of building a computational parallel pipeline using Flow.


  Elixir Phoenix GenStage Flow


Background

Flow allows developers to express computations on collections, similar to the Enum and Stream modules, although computations will be executed in parallel using multiple GenStages.

The canonical example of Flow, taken from the GitHub repository, demonstrates how to count words in a document in parallel:

File.stream!("path/to/some/file")
|> Flow.from_enumerable()
|> Flow.flat_map(&String.split(&1, " "))
|> Flow.partition()
|> Flow.reduce(fn -> %{} end, fn word, acc ->
  Map.update(acc, word, 1, & &1 + 1)
end)
|> Enum.to_list()

I’ve wanted to build something using Flow since José Valim introduced the concepts behind it in his ElixirConf 2016 keynote. I initially thought of building a product recommendation tool using crowdsourced reviews, as it would include both IO-intensive stages, such as HTTP requests, and CPU-intensive stages, such as sentiment analysis: a good candidate for parallel processing and well suited to Flow.

This led me to create Start learning Elixir.



Start learning Elixir

Start learning Elixir answers the question: what are the best resources for learning Elixir?


Crowdsourcing is used to locate and rank relevant learning materials, including books, eBooks, screencasts, and tutorials.

The ranking algorithm uses a combination of popularity and sentiment analysis. Popular and highly rated material scores well. Scores are a combination of the positive, neutral, and negative mentions from the popular Elixir Forum.

Creating a product recommendation tool

I created an Elixir umbrella application containing three apps:

  • indexer: fetches external content, via HTTP requests, and ranks products.
  • recommendations: contains the domain Ecto schemas and queries (resources, reviews, scores, authors).
  • web: a Phoenix front-end to display the recommended products.

Flow is used in the indexer app; that’s where the processing is done. I jotted down the high-level steps, then built separate modules for each stage of the pipeline. Each stage was built and unit tested in isolation, then connected together and orchestrated using the appropriate Flow functions.

High-level flow

The following end-to-end process is used to build the product recommendations:

  • Manually create a list of resources to be recommended: relevant books, screencasts, websites, tutorials.
  • Define a set of input keywords:
    • e.g. “learn”, “book”, “books”, “eBook”, “screencast”, “tutorial”, “Programming Elixir”
  • Search the Elixir Forum by keyword to return a list of topics.
  • Fetch all the posts in a topic:
    • Parse post content as HTML.
    • Strip <aside/> and <pre/> tags (quoted replies and code blocks).
    • Split content by <p> tags into sentences.
    • Extract text content.
  • Locate mentions of resources from the text content.
  • Use sentiment analysis on each sentence to determine whether it is positive, neutral, or negative.
  • Aggregate mentions by the same author.
  • Score and rank resources by their mentions, weighted by frequency and sentiment, using Laplace smoothing.

This translates to the following Elixir code using Flow.

defmodule Learn.Indexer do
  @moduledoc """
  Index mentions of resources from an authoritative source.
  """

  alias Learn.Indexer

  alias Learn.Indexer.{
    Mention,
    Post,
    Ranking,
    Resource,
  }

  @doc """
  Rank the given resources based on their mentions in topics matching the given keywords
  """
  @spec rank(list(String.t), list(Resource.t), keyword) :: list(Ranking.t)
  def rank(keywords, resources, opts \\ []) do
    keywords
    |> Flow.from_enumerable()
    |> Flow.flat_map(&Indexer.search(&1, opts))
    |> Flow.uniq()
    |> Flow.partition()
    |> Flow.flat_map(&Indexer.list_posts(&1, opts))
    |> Flow.partition(key: {:key, :id})
    |> Flow.uniq_by(&(&1.id))
    |> Flow.map(&Indexer.parse_html_content/1)
    |> Flow.map(&Indexer.parse_sentences/1)
    |> Flow.flat_map(&Indexer.extract_mentions(&1, resources))
    |> Flow.map(&Indexer.sentiment_analysis/1)
    |> Flow.partition(key: {:key, :resource})
    |> Flow.group_by(&(&1.resource))
    |> Flow.map(fn {resource, mentions} -> {resource, Indexer.aggregate_mentions_by_author(mentions)} end)
    |> Flow.map(fn {resource, recommendations} -> Indexer.rank_recommendations(resource, recommendations) end)
    |> Enum.to_list()
    |> Enum.sort_by(&(&1.score), &>=/2)
  end
end

You will notice the additional Flow.partition calls between certain stages of the pipeline. Partitioning uses a hash function to route events, so events with the same key are always delivered to the same process, minimising message passing. You can provide a function, or a key tuple, to partition by. Partitioning is also necessary when you need to aggregate and reduce data, as it guarantees the data in each partition won’t overlap.
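For example, partitioning mentions by their :resource field guarantees that every mention of the same resource ends up in the same stage before grouping; supplying a hash function achieves the same thing. A minimal sketch with in-memory data (the maps stand in for the real mention structs):

mentions = [
  %{resource: "Elixir in Action", sentiment_score: 2},
  %{resource: "Programming Elixir", sentiment_score: 1},
  %{resource: "Elixir in Action", sentiment_score: 0}
]

mentions
|> Flow.from_enumerable()
|> Flow.partition(key: {:key, :resource})   # or, equivalently: key: &(&1.resource)
|> Flow.group_by(& &1.resource)
|> Enum.to_list()
# => a list of {resource, mentions} tuples, each resource grouped within a single partition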

The individual steps within the Flow pipeline are detailed below:

Search topics by keyword

Elixir Forum uses Discourse as its forum platform. Discourse provides a public JSON API: simply append .json to a request path to receive JSON-encoded data.

So a search request for “book” (https://elixirforum.com/search?q=book) becomes: https://elixirforum.com/search.json?q=book. Individual topics can be fetched as JSON in a similar way.
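For example, fetching and decoding the search results directly looks something like this (a minimal sketch; the ElixirForum module below adds rate limiting and caching on top):

url = "https://elixirforum.com/search.json?" <> URI.encode_query(q: "book")

url
|> HTTPoison.get!()
|> Map.get(:body)
|> Poison.decode!()
|> Map.take(["posts", "topics"])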

I wrote a simple ElixirForum module that uses HTTPoison to send HTTP requests and Poison to decode the response body.

defmodule Learn.Indexer.Sources.ElixirForum do
  use HTTPoison.Base

  alias Learn.Indexer.Cache.HttpRequestCache
  alias Learn.Indexer.Sources.ElixirForum

  @endpoint "https://elixirforum.com"

  defp process_url(path) do
    @endpoint <> path
  end

  def process_response_body(body) do
    body
    |> Poison.decode!
  end

  def cached_request!(url, opts) do
    HttpRequestCache.cached("elixirforum.com/" <> url, fn ->
      rate_limit_access(fn ->
        ElixirForum.get!(url).body
      end, opts)
    end)
  end

  defp rate_limit_access(request, opts \\ []) do
    scale = Keyword.get(opts, :scale, 1_000)
    limit = Keyword.get(opts, :limit, 1)

    case ExRated.check_rate(@endpoint, scale, limit) do
      {:ok, _} ->
        request.()

      {:error, _} ->
        :timer.sleep(1_000)
        rate_limit_access(request, opts)
    end
  end

  defmodule Search do
    @expected_fields ~w(posts topics)

    def query(q, opts) do
      "/search.json?" <> URI.encode_query(q: q)
      |> ElixirForum.cached_request!(opts)
      |> Map.take(@expected_fields)
      |> Enum.map(fn({k, v}) -> {String.to_existing_atom(k), v} end)
    end
  end
end

To be a well-behaved HTTP crawler, my access to the site is rate limited using ExRated, configured to allow only one request per second. This limit applies to all requests made through the module, regardless of the calling process, as ExRated runs as a GenServer with its own state.

Usage:

ElixirForum.Search.query("books", [scale: 1_000, limit: 1])
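The rate limiting itself is a single call to ExRated.check_rate/3: with a scale of 1,000 ms and a limit of 1, only the first call within any one-second window succeeds. A quick sketch of the behaviour (the bucket name is just an identifier):

ExRated.check_rate("elixirforum.com", 1_000, 1)
# => {:ok, 1}

# a second call inside the same one-second window is rejected,
# so rate_limit_access/2 sleeps and retries
ExRated.check_rate("elixirforum.com", 1_000, 1)
# => {:error, 1}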

Responses were cached to disk, so repeated requests would not hit the website.

defmodule Learn.Indexer.Cache.HttpRequestCache do
  use GenServer
  require Logger

  defmodule State do
    defstruct [
      cache_dir: nil
    ]
  end

  def start_link(cache_dir) do
    GenServer.start_link(__MODULE__, %State{cache_dir: cache_dir}, name: __MODULE__)
  end

  def init(%State{cache_dir: cache_dir} = state) do
    File.mkdir_p!(cache_dir)
    {:ok, state}
  end

  def cached(key, operation) do
    case read(key) do
      {:ok, value} -> value
      {:error, :not_found} ->
        value = operation.()
        :ok = cache(key, value)
        value
    end
  end

  def read(key) do
    GenServer.call(__MODULE__, {:read, key})
  end

  def cache(key, value) do
    GenServer.call(__MODULE__, {:cache, key, value})
  end

  def handle_call({:read, key}, _from, %State{} = state) do
    path = cached_path(key, state)

    reply = case File.read(path) do
      {:ok, data} -> {:ok, Poison.decode!(data)}
      {:error, :enoent} -> {:error, :not_found}
      {:error, _} = reply -> reply
    end

    {:reply, reply, state}
  end

  def handle_call({:cache, key, value}, _from, %State{} = state) do
    path = cached_path(key, state)

    File.write!(path, Poison.encode!(value), [:write])

    {:reply, :ok, state}
  end

  defp cached_path(key, %State{cache_dir: cache_dir}) do
    key = String.slice(key, 0, 255)
    path = Path.join(cache_dir, key)
    ensure_dir(path)
    path
  end

  defp ensure_dir(path) do
    path
    |> Path.dirname()
    |> File.mkdir_p!()
  end
end
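With the cache process running, any expensive operation can be wrapped in cached/2: the result is computed once, written to disk as JSON, and read back on subsequent calls. A small sketch (the key and value are illustrative, and the value must be JSON-encodable):

alias Learn.Indexer.Cache.HttpRequestCache

{:ok, _pid} = HttpRequestCache.start_link("fixture/http_cache")

# the first call executes the function and persists the result;
# repeated calls with the same key are served from the cached file
HttpRequestCache.cached("example.com/slow-lookup", fn ->
  %{"status" => "expensive result"}
end)
# => %{"status" => "expensive result"}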

The HTTP cache is started under the application supervisor.

defmodule Learn.Indexer.Application do
  @moduledoc false

  use Application

  def start(_type, _args) do
    import Supervisor.Spec, warn: false

    children = [
      worker(Learn.Indexer.Cache.HttpRequestCache, ["fixture/http_cache"])
    ]

    opts = [strategy: :one_for_one, name: Learn.Indexer.Supervisor]
    Supervisor.start_link(children, opts)
  end
end

For testing the search, I again used ExVCR to record the HTTP request/response and replay from disk for subsequent test runs.

defmodule Learn.Indexer.Sources.ElixirForumTest do
  use ExUnit.Case, async: false
  use ExVCR.Mock, adapter: ExVCR.Adapter.Hackney

  alias Learn.Indexer.Sources.ElixirForum

  setup_all do
    HTTPoison.start
    :ok
  end

  describe "search" do
    test "for \"books\"" do
      use_cassette "elixirforum.com/search.json?&q=book", match_requests_on: [:query] do
        response = ElixirForum.Search.query("book", [])

        assert length(response[:posts]) > 0
        assert length(response[:topics]) > 0
      end
    end
  end
end

Parse HTML content

Floki is used to parse the HTML and extract text content using CSS selectors.

defmodule Learn.Indexer.Stages.ParseHtmlContent do
  @moduledoc """
  Parse the HTML post content into paragraphs of text.
  """

  alias Learn.Indexer.{
    Content,
    Post,
  }

  def execute(%Post{content: content} = post) do
    %Post{post |
      content: parse_content_html(content),
    }
  end

  defp parse_content_html(%Content{html: html} = content) do
    paragraphs =
      html
      |> Floki.filter_out("aside")
      |> Floki.filter_out("pre")
      |> Floki.find("p")
      |> Enum.map(&Floki.text/1)
      |> Enum.flat_map(fn p -> String.split(p, "\n") end)
      |> Enum.reject(fn p -> p == "" end)

    %Content{content |
      paragraphs: paragraphs,
    }
  end
end

Each <p> tag is used to identify a paragraph of text, and the text content is extracted using Floki.text/1.
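As a quick illustration (a sketch, assuming the Post and Content structs can be built from just the fields used above):

alias Learn.Indexer.{Content, Post}
alias Learn.Indexer.Stages.ParseHtmlContent

html = """
<aside class="quote">quoted reply</aside>
<p>I recommend Elixir in Action.</p>
<pre><code>IO.puts "code samples are ignored"</code></pre>
"""

post = %Post{content: %Content{html: html}}

ParseHtmlContent.execute(post)
# => %Post{content: %Content{html: html, paragraphs: ["I recommend Elixir in Action."]}}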

Parse sentences

Given a post containing paragraphs of text, I extract the individual sentences. This uses the Essence.Chunker.sentences function from the essence Natural Language Processing library.

defmodule Learn.Indexer.Stages.ParseSentences do
  @moduledoc """
  Parse the paragraphs of text into sentences.
  """

  alias Learn.Indexer.{
    Content,
    Post,
  }

  def execute(%Post{content: content} = post) do
    %Post{post |
      content: parse_sentences(content),
    }
  end

  # chunks the given paragraphs into sentences.
  defp parse_sentences(%Content{paragraphs: paragraphs} = content) do
    sentences =
      paragraphs
      |> Enum.map(&Essence.Chunker.sentences/1)
      |> Enum.flat_map(fn sentences -> sentences end)

    %Content{content |
      sentences: sentences,
    }
  end
end
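A quick sketch of what the chunker produces for a single paragraph (the exact splitting depends on essence’s sentence detection):

Essence.Chunker.sentences(
  "I started with Programming Elixir. Elixir in Action was also recommended to me."
)
# => ["I started with Programming Elixir.", "Elixir in Action was also recommended to me."]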

Extract mentions

To extract individual product mentions:

  1. Split the sentence into lowercase tokens using Essence.Tokenizer.tokenize:
    • “I recommend Elixir in Action” to ["i", "recommend", "elixir", "in", "action"]
  2. Split the product name into lowercase tokens:
    • “Elixir in Action” to ["elixir", "in", "action"]
  3. Chunk through the tokenised sentence using Stream.chunk, with a count equal to the length of the search tokens and a step of 1 to include overlaps, looking for a chunk that matches the product name tokens:

defp mentioned?(sentence, %Resource{name: name}) do
  contains?(tokenize_downcase(sentence), tokenize_downcase(name))
end

def tokenize_downcase(text), do: text |> String.downcase |> Essence.Tokenizer.tokenize

defp contains?(source_tokens, search_tokens) do
  source_tokens
  |> Stream.chunk(length(search_tokens), 1)
  |> Enum.any?(fn chunk -> chunk == search_tokens end)
end
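Walking the helpers through the example above as a quick iex sketch (the tokenised output assumes essence’s default tokenisation):

source_tokens =
  "I recommend Elixir in Action" |> String.downcase() |> Essence.Tokenizer.tokenize()
# => ["i", "recommend", "elixir", "in", "action"]

search_tokens =
  "Elixir in Action" |> String.downcase() |> Essence.Tokenizer.tokenize()
# => ["elixir", "in", "action"]

# overlapping three-token chunks of the sentence; one of them matches the product name
source_tokens
|> Stream.chunk(length(search_tokens), 1)
|> Enum.any?(fn chunk -> chunk == search_tokens end)
# => true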

Sentiment analysis

I use Sentient for simple sentiment analysis. It uses the AFINN-111 word list to score a sentence as positive, neutral, or negative based on the presence and score of any words from the list.

AFINN is a list of English words rated for valence with an integer between minus five (negative) and plus five (positive)

As an example, the text “I recommend Elixir in Action” is scored positive as the word “recommend” is rated +2. There are 2,477 words in the AFINN-111 word list. This simplistic algorithm provides a reasonable indicator of sentiment for a sentence.

defmodule Learn.Indexer.Stages.SentimentAnalysis do
  @moduledoc """
  Analyse the mention sentence for its sentiment (positive, neutral, or negative).

  Uses the AFINN-111 word list.
  """

  alias Learn.Indexer.{
    Mention,
  }

  @spec execute(Mention.t) :: Mention.t
  def execute(%Mention{} = mention) do
    %Mention{mention |
      sentiment_score: sentiment(mention),
    }
  end

  # override the score of "free" to neutral; in forum posts it typically
  # refers to cost (free books, free courses) rather than sentiment
  @override_words %{"free" => 0}

  defp sentiment(%Mention{sentence: sentence}) do
    Sentient.analyze(sentence, @override_words)
  end
end

The sentiment score of the sentence containing a product mention contributes to its overall score.
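Underneath, this is a single call to Sentient.analyze/2 with the override map. Indicative results (the exact return value depends on the library version; the ratings quoted are from AFINN-111):

Sentient.analyze("I recommend Elixir in Action", %{"free" => 0})
# => a positive score ("recommend" is rated +2)

Sentient.analyze("This book is bad", %{"free" => 0})
# => a negative score ("bad" is rated -3)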

Score recommendations

I determine a product’s overall score once every mention has been extracted from the text and assigned a sentiment score.

To balance the proportion of positive and negative ratings against the uncertainty of finding only a small number of mentions, I apply Laplace smoothing:

score = (upvotes + α) / (upvotes + downvotes + β)

For example, α = 1 and β = 2 means that an item with no votes gets scored 0.5.

Each product has a mix of positive, neutral, and negative mentions, so to calculate the score each neutral mention counts as one upvote, each positive mention counts as two upvotes, and each negative mention counts as a downvote.
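Plugging a couple of hypothetical mention counts into the formula shows how the smoothing tempers small samples:

# Laplace-smoothed score with α = 1, β = 2
score = fn upvotes, downvotes -> (upvotes + 1) / (upvotes + downvotes + 2) end

# a single positive mention: upvotes = 2, downvotes = 0
score.(2, 0)
# => 0.75

# ten positive and one negative mention: upvotes = 20, downvotes = 1
score.(20, 1)
# => 0.9130434782608695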

defmodule Learn.Indexer.Stages.RankRecommendations do
  @moduledoc """
  Combine recommendations for the same resource into a single ranking and score
  """

  alias Learn.Indexer.{
    Ranking,
    Recommendation,
    Resource,
  }

  @spec execute(Resource.t, list(Recommendation.t)) :: Ranking.t
  def execute(resource, recommendations) do
    %Ranking{
      resource: resource,
      recommendations: recommendations,
      score: calculate_score(recommendations),
    }
  end

  # calculate score by using Laplace smoothing on positive, neutral and negative mentions
  defp calculate_score([]), do: 0
  defp calculate_score(recommendations) do
    recommendations
    |> Enum.reduce({0, 0, 0}, &Recommendation.score/2)
    |> score
  end

  defp score({negative, neutral, positive}) do
    upvotes = neutral + (positive * 2)
    downvotes = negative

    (upvotes + 1) / (upvotes + downvotes + 2)
  end
end

There’s research to back up the use of Laplace smoothing to “provide a well justified solution to the problem of user-rating based ranking of items in web applications”.

Crowdsourced recommendations

By composing the individual steps into the single Flow pipeline shown earlier, I can rank each product by its recommendations.

def index_mentions(keywords, resources, opts \\ []) do
  keywords
  |> Learn.Indexer.rank(resources, opts)
  |> record_rankings(resources)
end

These are persisted to a PostgreSQL database using Ecto. I use the following Ecto query to display the products, ranked by their score. They can optionally be filtered by programming language or experience level to provide the categorised lists shown on the website.

defmodule Learn.Recommendations.Queries.RecommendedResources do
  import Ecto.Query

  alias Learn.Recommendations.{
    Resource,
    Score,
  }

  def new do
    from r in Resource,
    left_join: s in assoc(r, :score),
    order_by: [asc: s.rank, asc: r.title],
    preload: [:score]
  end

  def by_experience(query, level) do
    from [r, s] in query,
    where: r.experience_level == ^level
  end

  def by_language(query, language) do
    from [r, s] in query,
    where: r.programming_language == ^language
  end
end

The Phoenix controller builds the query and fetches matching resources using Repo.all/2:

defmodule Learn.Web.ResourceController do
  use Learn.Web.Web, :controller

  alias Learn.Recommendations.Repo
  alias Learn.Recommendations.Queries.RecommendedResources

  def index(conn, _params) do
    resources =
      RecommendedResources.new
      |> RecommendedResources.by_experience("beginner")
      |> RecommendedResources.by_language("Elixir")
      |> Repo.all()

    render conn, "index.html", resources: resources
  end
end

Summary

That’s a real-world example of using Flow to build a parallel computational pipeline. I had assumed that the simplistic approach to building product recommendations wouldn’t be valuable. Yet, looking at the final recommendations, I believe it does work: popular and highly rated resources appear at the top of the lists. Popularity, taken from mentions by forum users and combined with sentiment, can be used to rate a product.

I’ve recently added activity tracking to the site, with the intention of including those analytics in the scores so that more frequently viewed resources rank more highly. I am also considering adding a feedback mechanism for users to flag incorrect reviews and add their own.

Get in touch if you have any feedback or suggestions.


Tuning

I was inspired to look at tuning my Flow pipeline by Tymon Tobolski, who has written two excellent articles on the subject.

By taking his example Progress module and GnuPlot script I was able to visualise and optimise the Flow pipeline.

Collect progress stats

The Learn.Progress module was taken directly from Tymon’s article.

defmodule Learn.Progress do
  @moduledoc """
  Progress stats collector, courtesy of http://teamon.eu/2016/measuring-visualizing-genstage-flow-with-gnuplot/
  """

  use GenServer

  @timeres :millisecond

  # Progress.start_link [:a, :b, :c]
  def start_link(scopes \\ []) do
    GenServer.start_link(__MODULE__, scopes, name: __MODULE__)
  end

  def stop do
    GenServer.stop(__MODULE__)
  end

  # increment counter for given scope by `n`
  #     Progress.incr(:my_scope)
  #     Progress.incr(:my_scope, 10)
  def incr(scope, n \\ 1) do
    GenServer.cast __MODULE__, {:incr, scope, n}
  end

  def init(scopes) do
    File.mkdir_p!("fixture/trace")

    # open "progress-{scope}.log" file for every scope
    files = Enum.map(scopes, fn scope ->
      {scope, File.open!("fixture/trace/progress-#{scope}.log", [:write])}
    end)

    # keep current counter for every scope
    counts = Enum.map(scopes, fn scope -> {scope, 0} end)

    # save current time
    time = :os.system_time(@timeres)

    # write first data point for every scope with current time and value 0
    # this helps to keep the graph starting nicely at (0,0) point
    Enum.each(files, fn {_, io} -> write(io, time, 0) end)

    {:ok, {time, files, counts}}
  end

  def handle_cast({:incr, scope, n}, {time, files, counts}) do
    # update counter
    {value, counts} = Keyword.get_and_update!(counts, scope, &({&1+n, &1+n}))

    # write new data point
    write(files[scope], time, value)

    {:noreply, {time, files, counts}}
  end

  defp write(file, time, value) do
    time = :os.system_time(@timeres) - time
    IO.write(file, "#{time}\t#{value}\n")
  end
end

I used Arjan Scherpenisse’s decorator library to collect progress stats for each stage function, by decorating each function to be recorded with @decorate progress.

defmodule Learn.Indexer do
  @moduledoc """
  Index mentions of resources from an authoritative source.
  """

  use Learn.ProgressDecorator

  @doc """
  Search for topics matching a given query
  """
  @decorate progress
  def search(query, opts \\ []), do: SearchKeyword.execute(query, opts)
end

The decorator simply increments the counter for the decorated function after execution: by the number of items when the function returns a list, otherwise by 1.

defmodule Learn.ProgressDecorator do
  use Decorator.Define, [progress: 0]

  alias Learn.Progress

  def progress(body, context) do
    quote do
      reply = unquote(body)

      case reply do
        list when is_list(list) -> Progress.incr(unquote(context.name), length(list))
        _ -> Progress.incr(unquote(context.name), 1)
      end

      reply
    end
  end
end
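Conceptually, the decorated Learn.Indexer.search/2 shown above expands to something like this (a hand-expanded sketch of what the decorator produces):

def search(query, opts \\ []) do
  reply = SearchKeyword.execute(query, opts)

  # added by the progress decorator: count one event, or one per item for lists
  case reply do
    list when is_list(list) -> Progress.incr(:search, length(list))
    _ -> Progress.incr(:search, 1)
  end

  reply
end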

The Learn.Indexer.rank/3 function configures the progress process with the name of each stage before executing the flow. It stops the progress process afterwards to ensure the log files are written and closed.

defmodule Learn.Indexer do
  @moduledoc """
  Index mentions of resources from an authoritative source.
  """

  @doc """
  Rank the given resources based on their mentions in topics matching the given keywords
  """
  @spec rank(list(String.t), list(Resource.t), keyword) :: list(Ranking.t)
  def rank(keywords, resources, opts \\ []) do
    Progress.start_link([
      :search,
      :list_posts,
      :parse_html_content,
      :parse_sentences,
      :extract_mentions,
      :sentiment_analysis,
      :aggregate_mentions_by_author,
      :rank_recommendations,
    ])

    rankings =
      keywords
      |> Flow.from_enumerable(max_demand: 1, stages: 1)
      |> Flow.flat_map(&Indexer.search(&1, opts))
      |> Flow.uniq()
      |> Flow.partition(max_demand: 5)
      |> Flow.flat_map(&Indexer.list_posts(&1, opts))
      |> Flow.partition(key: {:key, :id}, max_demand: 5)
      |> Flow.uniq_by(&(&1.id))
      |> Flow.map(&Indexer.parse_html_content/1)
      |> Flow.map(&Indexer.parse_sentences/1)
      |> Flow.flat_map(&Indexer.extract_mentions(&1, resources))
      |> Flow.map(&Indexer.sentiment_analysis/1)
      |> Flow.partition(key: {:key, :resource}, max_demand: 5)
      |> Flow.group_by(&(&1.resource))
      |> Flow.map(fn {resource, mentions} -> {resource, Indexer.aggregate_mentions_by_author(mentions)} end)
      |> Flow.map(fn {resource, recommendations} -> Indexer.rank_recommendations(resource, recommendations) end)
      |> Enum.to_list()
      |> Enum.sort_by(&(&1.score), &>=/2)

    Progress.stop()

    rankings
  end
end

Visualising the flow

After running the indexer with the stage functions decorated with @decorate progress, I could plot the flow progress over time.

Initial flow

Initially it took around 33 seconds to run the flow. This is with a fully populated HTTP cache, so no external requests were being made.


Optimised flow

I adjusted the max_demand and stages options for some of the Flow steps, as shown above. This reduced the overall running time of the flow from around 33 seconds to 26 seconds.


The following GnuPlot script was used to generate the charts. It displays the progress of each stage by percentage complete. The final ranked recommendations are shown on a separate y-axis.

# plot.gp
set terminal png font "Arial,10" size 700,500
set output "progress.png"

set title "Elixir Flow processing progress over time"
set xlabel "Time (ms)"

set ylabel "Progress (%)"
set y2label "Rankings"
set ytics nomirror
set yrange [0:100]
set format y '%2.0f%%'
set y2tics

set key top left # put labels in top-left corner

# limit x range to 35.000 ms instead of dynamic one - needed when generating graphs that will be later compared visually
set xrange [0:35000]

plot  "trace/progress-search.log"                         using ($1):($2/1249*100)  with steps  axes x1y1 ls 1 title "Search topics",\
      "trace/progress-search.log"                         using ($1):($2/1249*100)  with points axes x1y1 ls 1 notitle,\
      "trace/progress-list_posts.log"                     using ($1):($2/14974*100) with lines  axes x1y1 ls 2 title "List posts",\
      "trace/progress-parse_html_content.log"             using ($1):($2/6780*100)  with lines  axes x1y1 ls 3 title "Parse HTML",\
      "trace/progress-parse_sentences.log"                using ($1):($2/6780*100)  with lines  axes x1y1 ls 4 title "Parse sentences",\
      "trace/progress-extract_mentions.log"               using ($1):($2/515*100)   with lines  axes x1y1 ls 5 title "Extract mentions",\
      "trace/progress-sentiment_analysis.log"             using ($1):($2/515*100)   with lines  axes x1y1 ls 6 title "Sentiment analysis",\
      "trace/progress-aggregate_mentions_by_author.log"   using ($1):($2/314*100)   with lines  axes x1y1 ls 7 title "Aggregate mentions by author",\
      "trace/progress-rank_recommendations.log"                                     with steps  axes x1y2 ls 8 title "Rank",\
      "trace/progress-rank_recommendations.log"                                     with points axes x1y2 ls 8 notitle