Elixir Phoenix GenStage Flow
Flow allows developers to express computations on collections, similar to the Enum and Stream modules, although computations will be executed in parallel using multiple GenStages.
The canonical example of Flow, taken from the GitHub repository, demonstrates how to count words in a document in parallel:
I’ve wanted to build something using Flow since José Valim introduced the concepts behind it in his Elixir Conf 2016 keynote. I initially thought of building a product recommendation tool using crowdsourced reviews. As it would include both IO intensive stages, such as HTTP requests, and CPU intensive stages, like sentiment analysis. A good candidate for parallel processing and suitable for using Flow.
This lead me to create Start learning Elixir.
- Creating a product recommendation tool
- Crowdsourced recommendations
Start learning Elixir
Start learning Elixir answers the question: what are the best resources for learning Elixir?
Crowdsourcing is used to locate and rank relevant learning materials, including books, eBooks, screencasts, and tutorials.
The ranking algorithm uses a combination of popularity and sentiment analysis. Popular and highly rated material scores well. Scores are a combination of the positive, neutral, and negative mentions from the popular Elixir Forum.
Creating a product recommendation tool
I created an Elixir umbrella application containing three apps:
indexerused to fetch external content, using HTTP requests, and rank products.
recommendationscontaining the domain Ecto schema and queries: resources, reviews, scores, authors.
weba Phoenix front-end to display the recommended products.
Flow is used in the
indexer app; that’s where the processing is done. I jotted down the high-level steps, then built separate modules for each stage of the pipeline. Each stage was built and unit tested in isolation. Then connected together and orchestrated using the appropriate
The following end-to-end process is used to build the product recommendations:
- Manually create a list of resources to be recommended: relevant books, screencasts, websites, tutorials.
- Define a set of input keywords:
- e.g. “learn”, “book”, “books”, “eBook”, “screencast”, “tutorial”, “Programming Elixir”
- Search the Elixir Forum by keyword to return a list of topics.
- Fetch all the posts in a topic:
- Parse post content as HTML.
- Split content by
<p>tags into sentences.
- Extract text content.
- Locate mentions of resources from the text content.
- Use sentiment analysis on each sentence to determine whether it is positive, neutral, or negative.
- Aggregate mentions by the same author.
- Score and rank resources by their mentions, weighted by frequency and sentiment, using Laplace smoothing.
This translates to the following Elixir code using
You will notice the additional
Flow.partition functions included within certain stages of the pipeline. Partitioning ensures that data is mapped to the same process to minimise message passing. A hash function is used to partition the data. You can provide a function, or key tuple, to partition by. This is also necessary when you need to aggregate and reduce data. The partitioning guarantees the data in each partition won’t overlap.
The individual steps within the Flow pipeline are detailed below:
- Search topics by keyword
- Parse HTML content
- Parse sentences
- Extract mentions
- Sentiment analysis
- Score recommendations
Search topics by keyword
So a search request for “book” (
https://elixirforum.com/search.json?q=book. Individual topics can be fetched as JSON in a similar way.
To be a well behaved HTTP crawler, my access to the site is rate limited using ExRated. This was configured to only allow 1 request every second. This limit applies to all requests through this module, regardless of calling process, as ExRated runs as GenServer with its own state.
Responses were cached to disk, so repeated requests would not hit the website.
The HTTP cache module was configured in the application supervisor.
For testing the search, I again used ExVCR to record the HTTP request/response and replay from disk for subsequent test runs.
- Read more: HTTP unit tests using ExVCR
Parse HTML content
Floki is used to parse the HTML and extract text content using CSS selectors.
<p> tag is used to identify paragraphs of text, and the text content extracted using
Given a post containing paragraphs of text, I extract the individual sentences. This uses the
Essence.Chunker.sentences function from the essence Natural Language Processing library.
To extract individual product mentions:
- Split the sentence into lowercase tokens using
- “I recommend Elixir in Action” to [“i, “recommend”, “elixir”, “in”, “action”]
- Split product name into lowercase tokens:
- “Elixir in Action” to [“elixir”, “in”, “action”]
- Chunk through the tokenised sentence using
Enum.chunk, using a count of the length of the search tokens and a step of 1 to include overlaps, looking to match the given product name.
AFINN is a list of English words rated for valence with an integer between minus five (negative) and plus five (positive)
As an example, the text “I recommend Elixir in Action” is scored positive as the word “recommend” is rated +2. There are 2,477 words in the AFINN-111 word list. This simplistic algorithm provides a reasonable indicator of sentiment for a sentence.
The sentiment score of the sentence containing a product mention contributes to its overall score.
I determine a product’s overall score once every mention has been extracted from the text and assigned a sentiment score.
To balance the proportion of positive/negative ratings with the uncertainty from only finding a small number of mentions I apply Laplace smoothing:
score = (upvotes + α) / (upvotes + downvotes + β)
For example, α = 1 and β = 2 means that an item with no votes gets scored 0.5.
Each product has a mix of positive, neutral, and negative mentions. So I assign each neutral mention as +1, and positive mention as +2 to calculate the score.
There’s research to back up the usage of Laplace smoothing to “.. provide a well justified solution to the problem of user-rating based ranking of items in web applications”.
- How To Sort By Average Rating
- How to Count Thumb-Ups and Thumb-Downs: User-Rating based Ranking of Items from an Axiomatic Perspective (PDF)
Composing the individual steps into the single pipeline using Flow shown earlier, I can rank each product by its recommendations.
These are persisted into a PostgreSQL database using Ecto. I use the following Ecto query to display the products, ranked by their score. They may be optionally filtered by programming language or experience level to provide the categorised lists show on the website.
The Phoenix controller builds the query and fetches matching resources using
That’s a real-world example of using Flow to build a computational parallel pipeline. I had assumed that the simplistic approach to building product recommendations wouldn’t be valuable. Yet, looking at the final recommendations I believe does: popular and highly rated resources appear at the top of the lists. Popularity, from mentions on a forum of users, combined with sentiment can be used to rate a product.
I’ve recently added activity tracking to the site with the intention of including those analytics in scores. So more frequently viewed resources are more highly ranked. I am also considering adding a feedback mechanism for users to flag incorrect reviews and add their own.
Get in touch if you have any feedback and suggestions.
I was inspired to look at tuning my Flow pipeline by Tymon Tobolski. He authored two excellent articles on the subject.
By taking his example
Progress module and GnuPlot script I was able to visualise and optimise the Flow pipeline.
Collect progress stats
Learn.Progress module was directly taken from Tymon’s article.
I used Arjan Scherpenisse’s decorator library to apply the progress collecting stats to each stage function. By decorating each function to be recorded with
The decorator simply increments the counter for the method after execution. It increments by the number of items when the function return a list. Otherwise it increments by 1.
Learn.Indexer.rank/3 function configures the progress process with the name of each stage before executing the flow. It stops the progress process afterwards to ensure the log files are written and closed.
Visualising the flow
After running the indexer with the stage functions decorated with the
@progress tag I could plot the flow progress over time.
Initially it took around 33 seconds to run the flow. This is with a fully populated HTTP cache, so no external requests were being made.
I adjusted the
stages options for some of the Flow steps, as shown above. This reduced the running time of the overall flow down from 33 to 26 seconds.
The following GnuPlot script was used to generate the charts. It displays the progress of each stage by percentage complete. The final ranked recommendations are shown on a separate y-axis.
# plot.gp set terminal png font "Arial,10" size 700,500 set output "progress.png" set title "Elixir Flow processing progress over time" set xlabel "Time (ms)" set ylabel "Progress (%)" set y2label "Rankings" set ytics nomirror set yrange [0:100] set format y '%2.0f%%' set y2tics set key top left # put labels in top-left corner # limit x range to 35.000 ms instead of dynamic one - needed when generating graphs that will be later compared visually set xrange [0:35000] plot "trace/progress-search.log" using ($1):($2/1249*100) with steps axes x1y1 ls 1 title "Search topics",\ "trace/progress-search.log" using ($1):($2/1249*100) with points axes x1y1 ls 1 notitle,\ "trace/progress-list_posts.log" using ($1):($2/14974*100) with lines axes x1y1 ls 2 title "List posts",\ "trace/progress-parse_html_content.log" using ($1):($2/6780*100) with lines axes x1y1 ls 3 title "Parse HTML",\ "trace/progress-parse_sentences.log" using ($1):($2/6780*100) with lines axes x1y1 ls 4 title "Parse sentences",\ "trace/progress-extract_mentions.log" using ($1):($2/515*100) with lines axes x1y1 ls 5 title "Extract mentions",\ "trace/progress-sentiment_analysis.log" using ($1):($2/515*100) with lines axes x1y1 ls 6 title "Sentiment analysis",\ "trace/progress-aggregate_mentions_by_author.log" using ($1):($2/314*100) with lines axes x1y1 ls 7 title "Aggregate mentions by author",\ "trace/progress-rank_recommendations.log" with steps axes x1y2 ls 8 title "Rank",\ "trace/progress-rank_recommendations.log" with points axes x1y2 ls 8 notitle