How ReadStream responds to back pressure

Sep 19, 2021
Photo of a brick house in England by a stream by Shelley Linenbach

Consider a scenario in which a Node.js program writes a large amount of data to a file. Node streams are used for efficiency and convenience, since many types of output streams are available. In our scenario, the data isn’t immediately available as a stream, so we need to wrap the data source in a Readable stream. For example:

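const { Readable } = require('stream');
const source = new SomeDataProvider(); // hypothetical async data source
const dest = require('fs').createWriteStream('out.txt');
const input = new Readable({ read() {} }); // no-op read(): data is pushed from outside
input.pipe(dest);
(async () => {
  while (await source.hasData()) { input.push( await source.next() ); } // return value of push() is ignored
  input.push(null); // signal end of stream
})();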

The problem with the above program is that input.push() is called as soon as the Promise returned by source.next() resolves, regardless of whether the destination is keeping up. If data arrives faster than it can be written to the file, the chunks passed to push() pile up in the stream’s internal queue, consuming memory until the program crashes with an out-of-memory error.

This is where highWaterMark comes in.

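const input = new Readable({ objectMode: true, highWaterMark: 50, read() {} });

In object mode, this sets the stream’s buffer threshold to fifty objects (without objectMode, highWaterMark counts bytes). But it is only a threshold, not a hard limit: nothing stops the program from calling input.push(). We need back pressure.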

Back pressure informs the caller of push() when it takes longer to write to ‘out.txt’ than it does to read from ‘source.’ When the high water mark is reached, the program must wait instead of calling source.next().

Specifying highWaterMark isn’t enough to add back pressure to programs that call push().

When the number of queued objects reaches highWaterMark, push() returns false. The chunk is still queued, but the return value signals that the queue is full: whatever is calling push() must wait before pushing more.
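To make the return value concrete, here is a minimal sketch, assuming an object-mode Readable with a deliberately small highWaterMark of 3. The no-op read() and the variable names are illustrative and not part of the original program. It records what push() returns and how many objects end up queued:

```javascript
const { Readable } = require('stream');

// Object-mode stream with a small threshold. read() is a no-op because
// the data is pushed from outside the stream.
const input = new Readable({ objectMode: true, highWaterMark: 3, read() {} });

const results = [];
for (let i = 0; i < 5; i++) {
  // push() always queues the object; its return value only reports
  // whether the queue is still below highWaterMark.
  results.push(input.push({ n: i }));
}

console.log(results);              // [ true, true, false, false, false ]
console.log(input.readableLength); // 5
```

The queue holds five objects even though the threshold is three, which is why the caller has to act on the false return value itself.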

Readable._read() is called to request more data once the consumer has removed objects from the queue and there is room again. _read() can therefore be used to signal when to resume calling push().
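The interplay between push() and _read() can be observed with a small sketch. It assumes a paused object-mode Readable whose read() option (which Node installs as _read()) only counts how often it is invoked; the counter and variable names are illustrative:

```javascript
const { Readable } = require('stream');

let readCalls = 0;
const input = new Readable({
  objectMode: true,
  highWaterMark: 2,
  read() { readCalls += 1; }, // the stream is asking for more data
});

input.push('a');
const canPushMore = input.push('b'); // false: the queue has reached highWaterMark
const callsBefore = readCalls;       // pushing alone has not triggered read()

const item = input.read();           // the consumer removes one object ...
const callsAfter = readCalls;        // ... and the stream asks for more data

console.log({ canPushMore, callsBefore, item, callsAfter });
```

Once the consumer takes an object and the queue drops below the threshold, the stream invokes read(), which is the moment a paused producer can continue.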

Alternatively, the program could wait for the drain event to fire on the writable stream. This approach, however, is less efficient as it causes long pauses and reduces throughput. It’s better to call push() when the queue has an available slot instead of waiting for the entire queue to be processed.

See also: How to use drain event of stream.Writable in Node.js.
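For comparison, the drain-based alternative can be sketched as follows. This is an illustration under stated assumptions rather than code from the original program: writeAll() and slowDest are hypothetical names, and the in-memory Writable stands in for the file stream.

```javascript
const { once } = require('events');
const { Writable } = require('stream');

// Write every chunk to dest, pausing whenever write() reports a full buffer.
async function writeAll(dest, chunks) {
  for (const chunk of chunks) {
    // write() returns false once the writable's buffer reaches its
    // highWaterMark; 'drain' fires after the whole buffer has been flushed.
    if (!dest.write(chunk)) await once(dest, 'drain');
  }
  const finished = once(dest, 'finish'); // subscribe before calling end()
  dest.end();
  await finished;
}

// Hypothetical slow destination that records what it receives.
const received = [];
const slowDest = new Writable({
  highWaterMark: 2, // bytes, because this writable is not in object mode
  write(chunk, _encoding, callback) {
    received.push(chunk.toString());
    setImmediate(callback); // simulate slow I/O
  },
});

const done = writeAll(slowDest, ['a', 'b', 'c', 'd']);
done.then(() => console.log(received));
```

Here the producer resumes only after the writable buffer has emptied completely, which is the pause described above.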

The while loop in the first example should pause when push() returns false, and resume later when _read() is called.

A Promise can be used to wait for _read() to be called after push() returns false. _read() would invoke the Promise’s resolve function.

const stream = require('stream');

/**
 * A custom PassThrough stream for piping any sort of data in a
 * back-pressure-friendly way using Promises.
 *
 * Derives from PassThrough instead of Readable; otherwise,
 * piping to files etc. doesn't work -- for example, push(null)
 * doesn't cause finished events to fire.
 *
 * Usage:
 * - Call push(obj) to write obj to the stream
 * - When push() returns false, call wait() to avoid exceeding the buffer
 * - Call push(null) to indicate EOF
 */
class ResumePush extends stream.PassThrough {
  /**
   * Specify at least {highWaterMark} (number of objects)
   */
  constructor(options) {
    super({ ...options, objectMode: true });
  }

  // Called by the stream when it wants more data
  // eslint-disable-next-line no-underscore-dangle
  _read() {
    const { _resume } = this;
    if (!_resume) {
      // Nobody is waiting yet; remember that push() may proceed
      this.ready = true;
      return;
    }
    this._resume = undefined;
    _resume();
  }

  /**
   * Returns a Promise that is settled when data has been flushed and push()
   * can be called again
   * @returns {Promise<undefined>}
   */
  async wait() {
    if (this._resume) {
      // This is a bug. If it can happen in the wild (e.g. lost
      // network connection), multiplexing needs to be added.
      throw new Error('not ready');
    }
    if (this.ready) this.ready = false;
    else return new Promise((resume) => { this._resume = resume; });
  }
}

You might be able to do better than this code by reading Creating your own custom readable stream. Here is some sample code that uses ResumePush:

const dataSource = new ResumePush({ highWaterMark: 50 });
const dest = require('fs').createWriteStream('out.txt');
dataSource.pipe(dest);

(async () => {
  for (const line of ['one', 'two']) {
    if (!dataSource.push(line)) await dataSource.wait();
  }
  // EOF. Don't wait() after this: _read() is not called once the stream has ended.
  dataSource.push(null);
})();
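As one possible way to do better, assuming the data source can be expressed as an async generator, Node's built-in Readable.from() can take over the bookkeeping: it asks the generator for the next value only when the stream requests more data. The sketch below is illustrative. produce() and slowDest are hypothetical stand-ins for the real data source and for the file stream.

```javascript
const { once } = require('events');
const { Readable, Writable } = require('stream');

// Hypothetical data source expressed as an async generator.
let produced = 0;
async function* produce() {
  for (let i = 0; i < 10; i++) {
    produced += 1;
    yield `line ${i}\n`;
  }
}

// Hypothetical slow destination; fs.createWriteStream('out.txt') could be
// piped to in the same way.
const received = [];
const slowDest = new Writable({
  write(chunk, _encoding, callback) {
    received.push(chunk.toString());
    setImmediate(callback); // simulate slow I/O
  },
});

// Readable.from() creates an object-mode stream that pulls from the
// generator on demand, so no manual wait()/resume logic is needed.
const input = Readable.from(produce(), { highWaterMark: 2 });

const done = once(slowDest, 'finish'); // resolves once everything is written
input.pipe(slowDest);
done.then(() => console.log(`wrote ${received.length} lines`));
```

The trade-off is that the source has to be pull-based. When data is pushed by an external event emitter instead, an explicit mechanism such as ResumePush is still needed.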

Written by Terris Linenbach

Coder since 1980. Always seeking the Best Way. CV: https://terris.com/cv
