Using async iteration natively in Node.js

April 02, 2018

Node.js v10 will be released on April 30, 2018. In that version, there will be experimental support for asynchronously iterating over readable streams. This blog post explains how that works.

Reading streams asynchronously

In this section, we examine two ways of reading data from a stream asynchronously: via callbacks and via asynchronous iteration.

Reading asynchronously via callbacks

To read the contents of a file asynchronously, you can use callbacks, as follows.

First, you create a readable stream:

const fs = require('fs');
const readStream = fs.createReadStream(inputFilePath,
  { encoding: 'utf8', highWaterMark: 1024 });
  • The option encoding determines how the stream delivers its content:
    • If it is null, the stream delivers buffers.
    • If it is a string such as 'utf8', it delivers strings in the specified encoding.

  • The option highWaterMark determines the maximum size (in bytes) of each delivered buffer or string.
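To see both options at work, the following sketch reads the same small file twice, once with encoding null and once with encoding 'utf8', and collects the chunks delivered via 'data' events. The file name encoding-demo.txt, the helper names collectChunks() and compareEncodings(), and the tiny highWaterMark of 4 bytes are arbitrary choices for illustration.

```javascript
const fs = require('fs');
const os = require('os');
const path = require('path');

// Collects every chunk that a file stream emits via 'data' events.
function collectChunks(filePath, options) {
  return new Promise((resolve, reject) => {
    const chunks = [];
    const readStream = fs.createReadStream(filePath, options);
    readStream.on('data', (chunk) => chunks.push(chunk));
    readStream.on('end', () => resolve(chunks));
    readStream.on('error', reject);
  });
}

async function compareEncodings() {
  // Hypothetical sample file containing 10 ASCII bytes
  const filePath = path.join(os.tmpdir(), 'encoding-demo.txt');
  fs.writeFileSync(filePath, 'abcdefghij');

  // encoding null: chunks are Buffers of at most 4 bytes each
  const buffers = await collectChunks(filePath, { encoding: null, highWaterMark: 4 });
  // encoding 'utf8': chunks are strings decoded from those bytes
  const strings = await collectChunks(filePath, { encoding: 'utf8', highWaterMark: 4 });
  return { buffers, strings };
}

const demoResult = compareEncodings();
demoResult.then(({ buffers, strings }) => {
  console.log(buffers);
  console.log(strings);
});
```

The data is the same in both cases; only the type of each chunk differs.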

Second, you receive the data by listening to the events 'data' and 'end':

readStream.on('data', (chunk) => {
  console.log('>>> '+chunk);
});
readStream.on('end', () => {
  console.log('### DONE ###');
});
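The two listeners above do not react to failures such as a missing file. A minimal sketch of wrapping the callback-based approach in a Promise, assuming you want the whole file as one string and want errors to surface as a rejection (the name readViaCallbacks() is made up for this example):

```javascript
const fs = require('fs');

// Hypothetical helper: reads a whole file via 'data' and 'end' callbacks
// and also listens to 'error', which the stream emits e.g. for a missing file.
function readViaCallbacks(inputFilePath) {
  return new Promise((resolve, reject) => {
    let text = '';
    const readStream = fs.createReadStream(inputFilePath,
      { encoding: 'utf8', highWaterMark: 1024 });
    readStream.on('data', (chunk) => { text += chunk; });
    readStream.on('end', () => resolve(text));
    readStream.on('error', (err) => reject(err));
  });
}
```

Without the 'error' listener, a failed open would be an unhandled 'error' event and crash the process instead of rejecting the Promise.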

Reading asynchronously via async iteration

Starting with Node.js v10, you can also use asynchronous iteration to read a file asynchronously. Readable streams have a property whose key is Symbol.asyncIterator, which enables the for-await-of loop to iterate over their chunks. However, this kind of loop is only available within async functions and async generators. That’s why we have to put the code inside an async function:

const fs = require('fs');
async function main(inputFilePath) {
  const readStream = fs.createReadStream(inputFilePath,
    { encoding: 'utf8', highWaterMark: 1024 });
  for await (const chunk of readStream) {
    console.log('>>> '+chunk);
  }
  console.log('### DONE ###');
}
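The for-await-of loop relies on the method stored under Symbol.asyncIterator. As a rough sketch of what the loop does behind the scenes (the real loop additionally calls return() on the iterator if you exit early), here is the same reading logic driving the iterator by hand; readChunksManually() is a name chosen for this example:

```javascript
const fs = require('fs');

// Obtains the stream's async iterator and calls next() until done is true.
// Each next() call returns a Promise for an object { value, done }.
async function readChunksManually(inputFilePath) {
  const readStream = fs.createReadStream(inputFilePath,
    { encoding: 'utf8', highWaterMark: 1024 });
  const iterator = readStream[Symbol.asyncIterator]();
  const chunks = [];
  while (true) {
    const { value, done } = await iterator.next();
    if (done) break;
    chunks.push(value);
  }
  return chunks;
}
```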

Processing async iterables via async generators

So far, we have seen how you can use async functions as sinks of async iterables. With async generators, you can go one step further: They can be the source of an async iterable, or they can transform an async iterable (acting as both sink and source). A transforming async generator:

  • Consumes an async iterable via for-await-of.
  • Returns an async iterable and feeds data into it via yield.
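The three roles can be illustrated with a small sketch that does not involve streams at all. The names countTo(), double() and collect() are invented for this example: countTo() acts as a source, double() as a transformer that consumes via for-await-of and produces via yield, and collect() as a sink.

```javascript
// Source: an async generator that yields 1..n, pausing briefly before each value
async function* countTo(n, delayMs = 10) {
  for (let i = 1; i <= n; i++) {
    await new Promise((resolve) => setTimeout(resolve, delayMs));
    yield i;
  }
}

// Transformer: consumes an async iterable and yields transformed values
async function* double(numbersAsync) {
  for await (const n of numbersAsync) {
    yield n * 2;
  }
}

// Sink: an async function that gathers all values into an array
async function collect(iterable) {
  const result = [];
  for await (const item of iterable) {
    result.push(item);
  }
  return result;
}

collect(double(countTo(3))).then((result) => console.log(result)); // [ 2, 4, 6 ]
```

Nothing runs until the sink asks for the next value, so each delay in countTo() is only incurred when collect() pulls data through double().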

As a result, chaining async generators lets you process input much like Unix piping. Let’s look at a pipeline of two async generators.

Generator #1: from chunks to lines

The following function takes an async iterable over strings and returns an async iterable over lines:

async function* chunksToLines(chunksAsync) {
  let previous = '';
  for await (const chunk of chunksAsync) {
    previous += chunk;
    let eolIndex;
    while ((eolIndex = previous.indexOf('\n')) >= 0) {
      // The yielded line includes its '\n' terminator
      const line = previous.slice(0, eolIndex+1);
      yield line;
      previous = previous.slice(eolIndex+1);
    }
  }
  // Emit a final line that has no trailing '\n'
  if (previous.length > 0) {
    yield previous;
  }
}
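To check how chunksToLines() handles a line that is split across two chunks, the following sketch feeds it hand-made chunks instead of a file stream. The helpers fromArray() and collectLines() are hypothetical stand-ins for the readable stream and the consumer.

```javascript
async function* chunksToLines(chunksAsync) {
  let previous = '';
  for await (const chunk of chunksAsync) {
    previous += chunk;
    let eolIndex;
    while ((eolIndex = previous.indexOf('\n')) >= 0) {
      yield previous.slice(0, eolIndex+1);
      previous = previous.slice(eolIndex+1);
    }
  }
  if (previous.length > 0) {
    yield previous;
  }
}

// Hypothetical source: turns an array of strings into an async iterable
async function* fromArray(items) {
  for (const item of items) {
    yield item;
  }
}

async function collectLines(chunks) {
  const lines = [];
  for await (const line of chunksToLines(fromArray(chunks))) {
    lines.push(line);
  }
  return lines;
}

// 'cd' and 'e\n' arrive in different chunks but form a single line
collectLines(['ab\ncd', 'e\nf']).then((lines) => console.log(lines));
// [ 'ab\n', 'cde\n', 'f' ]
```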

Generator #2: from lines to numbered lines

This function takes lines and numbers them:

async function* numberLines(linesAsync) {
  let counter = 1;
  for await (const line of linesAsync) {
    yield counter + ': ' + line;
    counter++;
  }
}
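A quick way to see what numberLines() produces is to feed it a couple of lines directly. As before, fromArray() and collectNumbered() are made-up helpers standing in for the real source and sink.

```javascript
async function* numberLines(linesAsync) {
  let counter = 1;
  for await (const line of linesAsync) {
    yield counter + ': ' + line;
    counter++;
  }
}

// Hypothetical source: turns an array of lines into an async iterable
async function* fromArray(items) {
  for (const item of items) {
    yield item;
  }
}

async function collectNumbered(lines) {
  const result = [];
  for await (const numbered of numberLines(fromArray(lines))) {
    result.push(numbered);
  }
  return result;
}

collectNumbered(['alpha\n', 'beta\n']).then((result) => console.log(result));
// [ '1: alpha\n', '2: beta\n' ]
```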

Connecting the generators

The main() function reads a text file via a readable stream and applies the two async generators to it, thereby numbering the lines in that file.

async function main() {
  const inputFilePath = process.argv[2];
  const readStream = fs.createReadStream(inputFilePath,
    { encoding: 'utf8', highWaterMark: 1024 });
  await printAsyncIterable(numberLines(chunksToLines(readStream)));
}
main();
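The main() function calls printAsyncIterable(), which this excerpt does not define. One plausible definition, offered as an assumption rather than the original helper, writes each item unchanged: the lines produced by chunksToLines() already end with '\n', so console.log() would add a blank line after each one. The optional write parameter and the sample() generator are additions of this sketch.

```javascript
// Assumed definition of printAsyncIterable(): consumes any async iterable
// of strings and writes each item as-is (no extra newline is appended).
async function printAsyncIterable(iterable, write = (str) => process.stdout.write(str)) {
  for await (const item of iterable) {
    write(item);
  }
}

// Hypothetical usage with a small async generator as the source
async function* sample() {
  yield '1: first line\n';
  yield '2: second line\n';
}

printAsyncIterable(sample());
```

Passing a custom write function makes it easy to capture the output in tests instead of printing it.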

One intriguing trait of processing data asynchronously is that the processing steps become interleaved: As soon as the first chunk arrives, it is split into lines and the lines are numbered. Therefore, the code can handle very large files: it processes them in chunks of 1024 bytes instead of loading the entire file into memory first.
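The interleaving can be made visible by logging inside each stage. In this sketch, the stage names readChunks() and upperCase() are invented and an array stands in for the file; every chunk travels through the whole pipeline before the next one is produced, because each async generator only does work when the next stage asks it for a value.

```javascript
// Source: records when each chunk is "read"
async function* readChunks(trace) {
  for (const chunk of ['one', 'two']) {
    trace.push('read ' + chunk);
    yield chunk;
  }
}

// Transformer: upper-cases each chunk
async function* upperCase(chunksAsync) {
  for await (const chunk of chunksAsync) {
    yield chunk.toUpperCase();
  }
}

// Sink: records when each result is printed, then returns the trace
async function runPipeline() {
  const trace = [];
  for await (const item of upperCase(readChunks(trace))) {
    trace.push('print ' + item);
  }
  return trace;
}

runPipeline().then((trace) => console.log(trace));
// [ 'read one', 'print ONE', 'read two', 'print TWO' ]
```

A real readable stream may buffer ahead by up to highWaterMark bytes, but the generators themselves stay in this lock-step pattern.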

Conclusion

Having async iterables in Node.js is great. It’s a considerable improvement over callback-based processing. Based on async iteration, we can now have combinators such as .map() and .filter() for asynchronous data.
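As a sketch of such combinators, here are two helpers written as async generators. The names mapAsync() and filterAsync() are made up for this example; they are not functions that Node.js v10 provides.

```javascript
// Yields fn(item) for each item of an async iterable
async function* mapAsync(iterable, fn) {
  for await (const item of iterable) {
    yield fn(item);
  }
}

// Yields only the items for which predicate(item) is truthy
async function* filterAsync(iterable, predicate) {
  for await (const item of iterable) {
    if (predicate(item)) {
      yield item;
    }
  }
}

// Hypothetical source of numbers
async function* numbers() {
  for (const n of [1, 2, 3, 4, 5]) {
    yield n;
  }
}

async function evensTimesTen() {
  const result = [];
  const pipeline = mapAsync(filterAsync(numbers(), (n) => n % 2 === 0), (n) => n * 10);
  for await (const n of pipeline) {
    result.push(n);
  }
  return result;
}

evensTimesTen().then((result) => console.log(result)); // [ 20, 40 ]
```

Because both helpers take and return async iterables, they compose with chunksToLines() and numberLines() in the same way.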
