In my first piece about node.js streams, written around three years ago, I analyzed the whole stream module and tried to describe how all of its APIs can be implemented and consumed. You can find that blog post here on this blog.

The conclusion was that working with node streams is really hard and confusing. Several new APIs have been added in the years since, though. Here I will describe my way of working with streaming data in node.js in a sane and practical manner.

This blog post describes advanced techniques for dealing with node streams. If you are new to the topic, start here - Working With Node.js Streams.

What streams actually represent

The Node.js stream module simply implements several interfaces for consuming temporal data - data that becomes available over time, chunk by chunk. So streams are just one way to model that kind of data, and there are other ways to go about it.

Another important thing to notice is that this temporal data is consumed via a pull based mechanism (some older stream APIs can be push based, but those aren't used much). 'Pull based' means that the consumer dictates the speed at which the data is sent/consumed. The opposite way of consumption is, as you may have guessed, the push based mechanism. 'Push based' means that the producer is in charge of the speed at which data is sent to the consumer. A good and very well known example would be observables.
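
To make the distinction concrete, here is a tiny sketch of the two mechanisms side by side (not streams themselves, just an illustration), using a plain generator for pull and an EventEmitter for push:

import { EventEmitter } from 'events';

// pull based: the consumer asks for every value and therefore controls the pace
function* pullSource() {
  yield 1;
  yield 2;
  yield 3;
}

const pullIterator = pullSource();
console.log(pullIterator.next().value); // the consumer decides when to pull: 1
console.log(pullIterator.next().value); // ...and when to pull again: 2

// push based: the producer decides when (and how fast) the consumer gets called
const pushSource = new EventEmitter();
pushSource.on('data', (value) => console.log(value)); // the consumer just reacts
pushSource.emit('data', 1); // the producer sets the pace
pushSource.emit('data', 2);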

Ok. So, we now know that streams are just a way to deal with the temporal data in a pull based manner. What do we do with that fact?

Well, there's another, more modern and more practical way to handle this kind of data modeling. We can use async iterators!

Async iterators

This is not a tutorial about async iterators, but I will try to explain them briefly. Async iterators are used to iterate over a set of data.

Iteration, in computer science, describes a process where an iterator consumes the available data. The iterator is in charge of the speed of consumption. Sound familiar? Iteration is just another way to model pull based consumption of data.

Besides async iterators, JavaScript already has plain iterators (they appeared in ES6). The two are very similar in nature, so it will help if you are already familiar with the plain ones. Async iterators just add asynchronicity into the mix, via promises.

Both iterators and async iterators must follow the iteration protocols (the iterable and the iterator protocol). The interfaces are very similar, but in the case of async iterators each value returned by the iterator methods is wrapped in a promise. Also, an async iterable needs to implement the [Symbol.asyncIterator] method, as opposed to the [Symbol.iterator] method for plain iterables.
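
To make the protocol part concrete, here is a minimal hand-rolled async iterable - just a sketch to show the shape of the protocol, not something you would normally write by hand:

// an object implementing the async iterable protocol: it counts to 3
const countToThree = {
  [Symbol.asyncIterator]() {
    let current = 0;
    return {
      // next() resolves to the familiar { value, done } pair, wrapped in a promise
      next() {
        current += 1;
        return Promise.resolve(
          current <= 3
            ? { value: current, done: false }
            : { value: undefined, done: true },
        );
      },
    };
  },
};

// calling the protocol by hand - every call to next() returns a promise
const iterator = countToThree[Symbol.asyncIterator]();
iterator.next().then(console.log); // { value: 1, done: false }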

Just like plain iterators, async iterators can be consumed manually, but that's usually not the way to go. To automatically consume a plain iterator, there is the for...of statement. For async iterators, a new statement was added to support automatic iteration: for await...of. Here is a brief example (note: it uses ES modules and top-level await):

import { EventEmitter, on } from 'events';

const asyncCounter = new EventEmitter();

let counter = 0;
setInterval(() => {
  asyncCounter.emit('increment', counter++);
}, 1000);

for await (const [arg] of on(asyncCounter, 'increment')) {
  console.log(arg);
}

// Prints an incrementing integer every second:
// 0
// 1
// 2
// ...

The on(asyncCounter, 'increment') function call actually returns an async iterable, and that's why this for await statement works.

We covered automatic consumption of async iterators, but what about generating them? There are both manual and automatic paths here as well. The manual way of creating an async iterator is rarely needed, so I'm going to skip it.

For the automatic creation/generation of async iterables there are, again, similarities with plain iterators. To generate plain iterators, we can use a generator function - function* (). For async iterators there is a new function subtype, the async generator function - async function* (). In these functions both the yield and await keywords can be used. Here's an example:

import { setTimeout } from 'timers/promises';

const myAsyncGenerator = async function* () {
  let counter = 0;

  while (true) {
    // simulate some async process
    await setTimeout(1_000);
    yield counter++;
  }
};

for await (const currentCounter of myAsyncGenerator()) {
  console.log(currentCounter);
}

// Prints an incrementing integer every second:
// 0
// 1
// 2
// ...

Alternatives for consuming streams

When we talk about the streams that node.js itself provides, the best way to deal with them is - just use them. I know this is probably not what you expected to hear after all this talk about async iterators, so let me explain.

I think async iterators shine in a different use case, which I'm going to describe in the next section - as an alternative to implementing new streams. As for the node provided streams - most of the time it's hard to replace them with async iterators (examples: fs.createReadStream(), fs.createWriteStream()). But that's totally fine in my opinion, because consuming streams is no longer that difficult since the pipeline() method arrived in node v10. That method is great because it handles errors for us in a simple way - which wasn't the case before (looking at you, readable.pipe()).

My point is: if we encounter already defined streams while coding in node, that's totally fine - we don't have to bother converting them to async iterators. The pipeline() method is the key, and it works great with both streams and async generators.

Let's see an example:

import * as Fs from 'fs';
import * as Zlib from 'zlib';
import { pipeline } from 'stream/promises';

const nodeProvidedFileReadable = Fs.createReadStream('./file.txt');
const nodeProvidedFileWritable = Fs.createWriteStream('./gziped-file.gz');
const nodeProvidedCompressTransformable = Zlib.createGzip();

// compresses the sibling file.txt and saves it as gziped-file.gz
await pipeline(
  nodeProvidedFileReadable,
  nodeProvidedCompressTransformable,
  nodeProvidedFileWritable,
);
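
And since the stream/promises flavor of pipeline() used above returns a promise, an error in any of the stages simply surfaces as a rejection - a try/catch is all the error handling we need. A minimal sketch, reusing the imports and the (made up) file names from the snippet above:

try {
  await pipeline(
    Fs.createReadStream('./file.txt'),
    Zlib.createGzip(),
    Fs.createWriteStream('./gziped-file.gz'),
  );
} catch (error) {
  // a failure in any stage (missing file, write error, ...) ends up here,
  // and pipeline() takes care of destroying the remaining streams
  console.error('pipeline failed', error);
}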

Alternatives for implementing streams

Where streams really become hard to work with and reason about is when we want to implement a custom readable, writable or transform stream. That is where I think async generators provide a really good alternative.

Let's try to implement each type of stream using async generators. The following examples may look a little contrived - keep in mind they are here to serve as an example.

A lot of the stream APIs that deal with async iterators are in the experimental stage in the latest node version (i.e. node v19.2.0 as of this writing).

Replacing a writable stream

One possible use case for a writable stream is directing a stream of data to a custom logger. Let's see how we can do that with a plain async function instead.

import { pipeline } from 'stream/promises';
import { setTimeout } from 'timers/promises';

// async function acting like a writable stream
const chunkLogger = async function (source) {
  for await (const chunk of source) {
    // simulate latency
    await setTimeout(1_000);
    // log to stdout for demonstration purposes
    console.log({
      message: chunk.toString(),
      timestamp: new Date().toISOString(),
    });
  }
};

// everything typed in the terminal will be forwarded to the logger (stdout)
await pipeline(process.stdin, chunkLogger);

To make this snippet easier to test, process.stdin is piped into the logger, so everything you type in your terminal is forwarded to the async function that acts as the writable.

As we can see, this is just a simple function - nothing else. We don't need to dig through the docs to see which methods a Stream.Writable class must implement. And everything works seamlessly because pipeline() handles async functions and generators just fine.
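
For comparison, here is a rough sketch of what a comparable hand-rolled Stream.Writable might look like (just my approximation, for contrast):

import { Writable } from 'stream';
import { pipeline } from 'stream/promises';
import { setTimeout } from 'timers/promises';

// a classic writable doing roughly the same job as the chunkLogger above
const chunkLoggerWritable = new Writable({
  write(chunk, _encoding, callback) {
    // simulate latency, then log and signal readiness for the next chunk
    setTimeout(1_000)
      .then(() => {
        console.log({
          message: chunk.toString(),
          timestamp: new Date().toISOString(),
        });
        callback();
      })
      .catch(callback);
  },
});

await pipeline(process.stdin, chunkLoggerWritable);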

Replacing a readable stream

To demonstrate a readable stream alternative, let's create an infinite counter.

import { pipeline } from 'stream/promises';
import { setTimeout } from 'timers/promises';

// async generator function acting like a readable stream
const secondsGenerator = async function* () {
  let counter = 0;
  while (true) {
    await setTimeout(1_000);
    yield `${counter.toString()}\n`;
    counter++;
  }
};

await pipeline(secondsGenerator, process.stdout);

// prints a new number every second:
// 0
// 1
// 2
// ...

In order to replace a readable, we can't use an ordinary async function like in the previous example. We need to use an async generator function (marked by the * in async function*). Again, it looks way cleaner than if we were to implement a standard Stream.Readable.

Replacing a transform stream

The place where I find async generators most useful is in replacing various transform streams!

In the majority of use cases, readable and writable streams just represent files (see the code snippet in the earlier section about consuming streams). Custom transform streams, however, are often needed, and people struggle to implement them properly.

That shouldn't be a problem anymore though. Let's take a look at how we can implement and combine several transform streams via async generator functions:

import { pipeline } from 'stream/promises';

const colorsGenerator = async function* () {
  yield 'yellow';
  yield 'orange';
  yield 'red';
  yield 'blue';
  yield 'purple';
  yield 'green';
};

const wordUpperCaser = async function* (source) {
  for await (const chunk of source) {
    yield chunk.toUpperCase();
  }
};

const letterUnderscorer = async function* (source) {
  for await (const chunk of source) {
    yield chunk.split('').join('_');
  }
};

const chunkLogger = async function (source) {
  for await (const chunk of source) {
    console.log(chunk);
  }
};

await pipeline(colorsGenerator, wordUpperCaser, letterUnderscorer, chunkLogger);

// prints:
// Y_E_L_L_O_W
// O_R_A_N_G_E
// R_E_D
// B_L_U_E
// P_U_R_P_L_E
// G_R_E_E_N
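
A nice side effect of this approach: since each of these transforms is just an async generator function that accepts an async iterable as its source, they also compose directly, without pipeline(). A small sketch reusing the functions defined above:

// the generator functions nest like ordinary function calls
for await (const chunk of letterUnderscorer(wordUpperCaser(colorsGenerator()))) {
  console.log(chunk); // prints the same underscored, uppercased colors
}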

Summary

In this article, we discovered an alternative way to model stream-like data and avoid the difficulties in dealing with the node.js stream API.

Personally, I think that async iterators combined with top-level await make for really modern and readable code.