Pipes and Filters to cure Node.js async woes
Using the Pipes and Filters architectural pattern for complex processing within Node.js applications.
How can we perform complex processing on an input data structure, while maintaining independence and flexibility?
The Pipes and Filters pattern provides one possible solution. The Enterprise Integration Patterns book describes it as a way to “divide a larger processing task into a sequence of smaller, independent processing steps (Filters) that are connected by channels (Pipes).”
Each filter exposes a very simple interface: it receives messages on the inbound pipe, processes them, and publishes the results to the outbound pipe. The pipes connect one filter to the next, carrying the output of one filter to the input of the next.
The canonical example given to illustrate the pattern covers an order processing pipeline. An incoming order follows a set of steps, or filters, whereby it is decrypted, authenticated and finally de-duplicated to result in a clean order.
The pipes and filters pipeline provides the functionality to transform a stream of possibly duplicated, encrypted messages containing extra authentication data into a stream of unique, simple plain-text order messages.
Independence and flexibility
Each of the processing steps in the pipeline is independent of the others. As such, they can be developed and tested in isolation. Filters should have a single responsibility, so testing each one becomes straightforward.
Since they all follow the same simple interface, they can be composed into different solutions. Individual filters may be added, omitted or rearranged into a new sequence, within reason, without any modification to the filters themselves.
A consistent interface allows us to create decorators around each filter to add cross-cutting concerns. Examples include metrics (timing the duration of each filter), logging, transactions and raising events or notifications to observers for progress feedback.
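For example, a timing decorator can wrap any filter without changing its interface. The timed helper below is a hypothetical sketch, not part of the package; the (input, next) signature matches the filter interface described later.

// Wraps a filter and logs how long it took before passing the result on.
function timed(name, filter) {
  return function (input, next) {
    var start = Date.now();
    filter(input, function (err, output) {
      console.log(name + ' took ' + (Date.now() - start) + 'ms');
      next(err, output);
    });
  };
}

// Registered exactly like the undecorated filter:
// pipeline.use(timed('decrypt', decryptOrder));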
Finally, separating a complex chain of behaviour into smaller message-processing units opens up the possibility of asynchronous messaging between filters. With this technique, multiple messages could be processed in parallel and the work split across many nodes.
Pipelines in Node.js
One further benefit of this approach in the world of Node.js and asynchronous callbacks is that it replaces the Pyramid of Doom with much clearer code. The pipeline approach allows many smaller, single-purpose functions to be combined into an easy-to-follow flow. Each filter may be synchronous or asynchronous, but the orchestration remains consistent.
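For illustration, this is the kind of deeply nested callback code the pipeline replaces (the function names here are hypothetical):

decrypt(order, function (err, decrypted) {
  if (err) return handleError(err);
  authenticate(decrypted, function (err, authenticated) {
    if (err) return handleError(err);
    removeDuplicates(authenticated, function (err, cleanOrder) {
      if (err) return handleError(err);
      dispatch(cleanOrder);
    });
  });
});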
Using the pipes-and-filters npm package I have authored, the order processing example can now be written as a simple pipeline.
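A minimal sketch of that pipeline; the three filter functions are hypothetical stand-ins for real decryption, authentication and de-duplication logic.

var Pipeline = require('pipes-and-filters');

// Hypothetical filters -- each follows the (input, next) interface.
function decryptOrder(order, next) {
  next(null, { id: order.id, payload: 'decrypted payload' });
}

function authenticateOrder(order, next) {
  next(null, { id: order.id, payload: order.payload });
}

var seenOrderIds = {};
function removeDuplicates(order, next) {
  if (seenOrderIds[order.id]) {
    return next(new Error('duplicate order ' + order.id));
  }
  seenOrderIds[order.id] = true;
  next(null, order);
}

var pipeline = Pipeline.create('order processing');
pipeline.use(decryptOrder);
pipeline.use(authenticateOrder);
pipeline.use(removeDuplicates);

pipeline.once('error', console.error);
pipeline.once('end', function (order) {
  console.log('clean order', order);
});

pipeline.execute({ id: 42, payload: 'encrypted payload' });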
Or, using the fluent API, this can be written more succinctly.
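A sketch of the same pipeline in the fluent style, assuming that create, use and once each return the pipeline so the calls can be chained:

Pipeline.create('order processing')
  .use(decryptOrder)
  .use(authenticateOrder)
  .use(removeDuplicates)
  .once('error', console.error)
  .once('end', function (order) {
    console.log('clean order', order);
  })
  .execute({ id: 42, payload: 'encrypted payload' });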
Filters
Each filter follows exactly the same interface, receiving two arguments:

- input is the message or object to be processed, passed in via the pipeline.execute call or from the preceding filter's output.
- next is a Node-style callback to indicate the filter has completed or encountered an error. An error will halt the pipeline and no further filters will be executed.
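For example, two hypothetical filters, one showing the success path and one the error path:

// Transforms the message asynchronously, then hands it to the next filter.
function shout(message, next) {
  setImmediate(function () {
    next(null, message.toUpperCase());
  });
}

// Signals an error, which halts the pipeline.
function rejectEmpty(message, next) {
  if (message.length === 0) {
    return next(new Error('empty message'));
  }
  next(null, message);
}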
Getting started
Follow the steps below if you are interested in using this approach within your own Node.js application.
Install the pipes-and-filters package using npm.
npm install --save pipes-and-filters
Create, configure and execute a pipeline.
1. Import the Pipeline class by requiring the pipes-and-filters package.
var Pipeline = require('pipes-and-filters');
2. Create a pipeline, with an optional name.
var pipeline = Pipeline.create('order processing');
3. Create a filter function that takes an input object and a Node-style next callback to indicate the filter has completed or errored, for example:
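A hypothetical validation filter, not part of the package:

function validateOrder(order, next) {
  if (!order.id) {
    // passing an error halts the pipeline
    return next(new Error('order is missing an id'));
  }
  // pass the order on to the next filter
  next(null, order);
}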
4. Register one or more filters.
You may optionally provide the context (this value) used when the filter function is called.
pipeline.use(foo.filter, foo)
5. Add an error handler.
pipeline.once('error', function(err) {
console.error(err);
});
The pipeline will stop processing on any filter error.
6. Add an end handler to be notified when the pipeline has completed.
pipeline.once('end', function(result) {
console.log('completed', result);
});
7a. Execute the pipeline for a given input.
pipeline.execute(input);
With this style, an error event handler is required; otherwise the default action on any filter error is to print a stack trace and exit the program.
7b. Execute the pipeline with a Node-style error/result callback.
With this style, error and end event handlers are not required; the callback receives either the error or the final result, as sketched below.
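A minimal sketch of this style:

pipeline.execute(input, function (err, result) {
  if (err) {
    return console.error('pipeline failed', err);
  }
  console.log('completed', result);
});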
Further reading
- pipes-and-filters npm package (npmjs.org)
- node-pipes-and-filters source code on GitHub (github.com)
- Messaging as a programming model