Pipes and Filters to cure Node.js async woes

Using the Pipes and Filters architectural pattern for complex processing within Node.js applications.


How can we perform complex processing on an input data structure, while maintaining independence and flexibility?

The Pipes and Filters pattern provides one possible solution. The Enterprise Integration Patterns book describes it as a way to “divide a larger processing task into a sequence of smaller, independent processing steps (Filters) that are connected by channels (Pipes)”.

Each filter exposes a very simple interface: it receives messages on the inbound pipe, processes each message, and publishes the result to the outbound pipe. Each pipe connects one filter to the next, carrying the output messages of one filter to the input of its successor.

The canonical example used to illustrate the pattern is an order processing pipeline. An incoming order passes through a series of steps, or filters, in which it is decrypted, authenticated and finally de-duplicated, resulting in a clean order.

The pipes and filters pipeline provides the functionality to transform a stream of possibly duplicated, encrypted messages containing extra authentication data into a stream of unique, simple plain-text order messages.

Pipes and Filters example

Independence and flexibility

Each of the processing steps in the pipeline is independent of the others. As such, filters can be developed and tested in isolation. Each filter should have a single responsibility, so testing it becomes straightforward.
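
For instance, a single filter can be exercised directly with Node's built-in assert module, no pipeline required. A sketch, assuming a hypothetical ./filters/decrypt module and a body property on the message:

// exercise the decrypt filter directly; the module path and message
// shape are illustrative assumptions
var assert = require('assert');
var decrypt = require('./filters/decrypt');

decrypt({ body: 'encrypted payload' }, function(err, output) {
  assert.ifError(err);     // the filter completed without error
  assert.ok(output.body);  // and produced a decrypted body
});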

Since they all follow the same simple interface, they can be composed into different solutions. Individual filters may be added, omitted or rearranged into a new sequence, within reason, without any modification to the filters themselves.
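
For example, the same filters could be recombined into a shorter pipeline for a hypothetical trusted internal feed where authentication is unnecessary:

// a second pipeline reusing the existing filters, minus authentication
var internalPipeline = Pipeline.create('internal order processing');
internalPipeline.use(decrypt);
internalPipeline.use(deDuplicate);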

A consistent interface allows us to create decorators around each filter to add cross-cutting concerns. Examples include metrics (timing the duration of each filter), logging, transactions and raising events or notifications to observers for progress feedback.
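
As a sketch, a timing decorator needs no knowledge of the pipeline internals; it simply wraps the (input, next) interface. The timed helper below is illustrative, not part of the package:

// wrap any filter to log how long it takes to complete
function timed(name, filter) {
  return function(input, next) {
    var start = Date.now();
    filter(input, function(err, output) {
      console.log('%s took %d ms', name, Date.now() - start);
      next(err, output);
    });
  };
}

// register the decorated filter in place of the original
pipeline.use(timed('decrypt', decrypt));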

Finally, separating a complex chain of behaviour into smaller units that each process a message opens up the possibility of asynchronous messaging between filters. With this technique, multiple messages can be processed in parallel and the work split across many nodes.

Pipelines in Node.js

One further benefit of this approach in the world of Node.js and asynchronous callbacks is that it replaces the Pyramid of Doom with much clearer code. The pipeline approach allows many small, single-purpose functions to be combined into an easy-to-follow flow. Each filter may be synchronous or asynchronous, but the orchestration remains consistent.
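
For contrast, the same three order processing steps written as directly nested callbacks might look like the sketch below, assuming each step follows the familiar (input, callback) convention:

// the "Pyramid of Doom": each step nests inside the previous callback
decrypt(message, function(err, decrypted) {
  if (err) return console.error(err);
  authenticate(decrypted, function(err, authenticated) {
    if (err) return console.error(err);
    deDuplicate(authenticated, function(err, order) {
      if (err) return console.error(err);
      console.log('completed', order);
    });
  });
});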

Using the pipes-and-filters npm package I have authored, the order processing example becomes:

// import the Pipeline class
var Pipeline = require('pipes-and-filters');

// create a pipeline for processing individual orders
var pipeline = Pipeline.create('order processing');

// register the filters
pipeline.use(decrypt);
pipeline.use(authenticate);
pipeline.use(deDuplicate);

// exit the pipeline if the message is a duplicate
pipeline.breakIf(function(input) { return input.exists; });

// execute the pipeline for the given input message
pipeline.execute(message, function completed(err, result) {
  if (err) {
    // error handling
    return console.error(err);
  }

  // completed, successfully
  console.log('completed', result);
});

Or, using the fluent API, this can be written more succinctly:

Pipeline.create('order processing')
  .use(decrypt)
  .use(authenticate)
  .use(deDuplicate)
  .breakIf(function(input) { return input.exists; })
  .execute(message, function completed(err, result) {
    // error or success handler
  });

Filters

Each filter follows exactly the same interface, receiving two arguments:

  • input is the message or object to be processed. Passed in via the pipeline.execute call, or from the preceding filter’s output.
  • next is a Node-style callback to indicate the filter has completed or encountered an error. An error will halt the pipeline and no further filters will be executed.

var filter = function(input, next) {
  // ... process input

  // error handling
  if (err) {
    return next(new Error('Failed to parse foo'));  // return an error and stop the pipeline processing
  }

  // modify input (or clone object)
  input.foo = 'bar';

  next(null, input);  // success, pass updated input object to next filter
};
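
To make this concrete, a de-duplication filter for the order example might look like the sketch below. The in-memory seen store and the input.id property are assumptions for illustration; the filter sets input.exists, which the breakIf predicate shown earlier inspects.

// flag duplicate orders using a simple in-memory store
// (the `seen` object and `input.id` are illustrative assumptions)
var seen = {};

var deDuplicate = function(input, next) {
  input.exists = seen.hasOwnProperty(input.id);  // inspected by breakIf
  seen[input.id] = true;
  next(null, input);  // pass through, flagged as duplicate or not
};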

Getting started

Follow the steps below if you are interested in using this approach within your own Node.js application.

Install the pipes-and-filters package using npm.

npm install --save pipes-and-filters

Create, configure and execute a pipeline.

1. Import the Pipeline class by requiring the pipes-and-filters package.

var Pipeline = require('pipes-and-filters');

2. Create a pipeline, with an optional name.

var pipeline = Pipeline.create('order processing');

3. Create a filter function that takes an input object and a Node-style next callback to indicate the filter has completed or errored.

var filter = function(input, next) {
  // continue to next filter
  next(null, input);
};

4. Register one or more filters.

pipeline.use(decrypt);
pipeline.use(authenticate);
pipeline.use(deDuplicate);

You may optionally provide the context (the this value) to be used when the filter function is called.

pipeline.use(foo.filter, foo);
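
Here, foo could be an object whose filter method relies on this; for example:

// a hypothetical filter object whose method depends on `this`
var foo = {
  key: 'secret',
  filter: function(input, next) {
    input.key = this.key;  // `this` is bound to foo by the pipeline
    next(null, input);
  }
};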

5. Add an error handler.

pipeline.once('error', function(err) {
  console.error(err);
});

The pipeline will stop processing on any filter error.

6. Add an end handler to be notified when the pipeline has completed.

pipeline.once('end', function(result) {
  console.log('completed', result);
});

7a. Execute the pipeline for a given input.

pipeline.execute(input);

With this style, an error event handler is required. Otherwise the default action on any filter error is to print a stack trace and exit the program.

7b. Execute the pipeline with a Node-style error/result callback.

pipeline.execute(input, function(err, result) {
  if (err) {
    console.error(err);
    return;
  }

  console.log('completed', result);
});

With this style, the error and end event handlers are not required.

Further reading