Skip to content
154 changes: 93 additions & 61 deletions lib/internal/streams/readable.js
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,75 @@ const nop = () => {};

const { errorOrDestroy } = destroyImpl;

const kObjectMode = 1 << 0;
const kEnded = 1 << 1;
const kEndEmitted = 1 << 2;
const kReading = 1 << 3;
const kConstructed = 1 << 4;
const kSync = 1 << 5;
const kNeedReadable = 1 << 6;
const kEmittedReadable = 1 << 7;
const kReadableListening = 1 << 8;
const kResumeScheduled = 1 << 9;
const kErrorEmitted = 1 << 10;
const kEmitClose = 1 << 11;
const kAutoDestroy = 1 << 12;
const kDestroyed = 1 << 13;
const kClosed = 1 << 14;
const kCloseEmitted = 1 << 15;
const kMultiAwaitDrain = 1 << 16;
const kReadingMore = 1 << 17;
const kDataEmitted = 1 << 18;

// TODO(benjamingr) it is likely slower to do it this way than with free functions
function makeBitMapDescriptor(bit) {
return {
enumerable: false,
get() { return (this.state & bit) !== 0; },
set(value) {
if (value) this.state |= bit;
else this.state &= ~bit;
},
};
}
Comment on lines +107 to +116
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You save some booleans but now you need more memory to store these functions and a more complicated property descriptor.

I think this will use more memory than the older version

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@H4ad ideally we wouldn't need the functions but note these functions are stored once per all streams vs. once per stream on the prototype.

My next step is to diff with the systems analyzer

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The functions are on the prototype so they don't take extra memory. But I don't know how well V8 will inline these calls. Needs a benchmark run.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ideally I wouldn't want to use these functions and would prefer a static which is more obviously inlinable

ObjectDefineProperties(ReadableState.prototype, {
objectMode: makeBitMapDescriptor(kObjectMode),
ended: makeBitMapDescriptor(kEnded),
endEmitted: makeBitMapDescriptor(kEndEmitted),
reading: makeBitMapDescriptor(kReading),
// Stream is still being constructed and cannot be
// destroyed until construction finished or failed.
// Async construction is opt in, therefore we start as
// constructed.
constructed: makeBitMapDescriptor(kConstructed),
// A flag to be able to tell if the event 'readable'/'data' is emitted
// immediately, or on a later tick. We set this to true at first, because
// any actions that shouldn't happen until "later" should generally also
// not happen before the first read call.
sync: makeBitMapDescriptor(kSync),
// Whenever we return null, then we set a flag to say
// that we're awaiting a 'readable' event emission.
needReadable: makeBitMapDescriptor(kNeedReadable),
emittedReadable: makeBitMapDescriptor(kEmittedReadable),
readableListening: makeBitMapDescriptor(kReadableListening),
resumeScheduled: makeBitMapDescriptor(kResumeScheduled),
// True if the error was already emitted and should not be thrown again.
errorEmitted: makeBitMapDescriptor(kErrorEmitted),
emitClose: makeBitMapDescriptor(kEmitClose),
autoDestroy: makeBitMapDescriptor(kAutoDestroy),
// Has it been destroyed.
destroyed: makeBitMapDescriptor(kDestroyed),
// Indicates whether the stream has finished destroying.
closed: makeBitMapDescriptor(kClosed),
// True if close has been emitted or would have been emitted
// depending on emitClose.
closeEmitted: makeBitMapDescriptor(kCloseEmitted),
multiAwaitDrain: makeBitMapDescriptor(kMultiAwaitDrain),
// If true, a maybeReadMore has been scheduled.
readingMore: makeBitMapDescriptor(kReadingMore),
dataEmitted: makeBitMapDescriptor(kDataEmitted),
});

function ReadableState(options, stream, isDuplex) {
// Duplex streams are both readable and writable, but share
// the same options object.
Expand All @@ -92,13 +161,15 @@ function ReadableState(options, stream, isDuplex) {
if (typeof isDuplex !== 'boolean')
isDuplex = stream instanceof Stream.Duplex;

// Bit map field to store ReadableState more effciently with 1 bit per field
// instead of a V8 slot per field.
this.state = kEmitClose | kAutoDestroy | kConstructed | kSync;
// Object stream flag. Used to make read(n) ignore n and to
// make all the buffer merging and length checks go away.
this.objectMode = !!(options && options.objectMode);
if (options && options.objectMode) this.state |= kObjectMode;
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
if (options && options.objectMode) this.state |= kObjectMode;
if (options?.objectMode) this.state |= kObjectMode;

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I thought that was slower so I followed the pattern, do you know which of the following is faster?

 if (options && options.objectMode) this.state |= kObjectMode;
 if (options?.objectMode) this.state |= kObjectMode;
this.state |= kObjectMode & +options?.objectMode;


if (isDuplex)
this.objectMode = this.objectMode ||
!!(options && options.readableObjectMode);
if (isDuplex && options && options.readableObjectMode)
this.state |= options.readableObjectMode;
Comment thread
benjamingr marked this conversation as resolved.
Outdated

// The point at which it stops calling _read() to fill the buffer
// Note: 0 is a valid value, means "don't call _read preemptively ever"
Expand All @@ -113,54 +184,22 @@ function ReadableState(options, stream, isDuplex) {
this.length = 0;
this.pipes = [];
this.flowing = null;
this.ended = false;
this.endEmitted = false;
this.reading = false;

// Stream is still being constructed and cannot be
// destroyed until construction finished or failed.
// Async construction is opt in, therefore we start as
// constructed.
this.constructed = true;

// A flag to be able to tell if the event 'readable'/'data' is emitted
// immediately, or on a later tick. We set this to true at first, because
// any actions that shouldn't happen until "later" should generally also
// not happen before the first read call.
this.sync = true;

// Whenever we return null, then we set a flag to say
// that we're awaiting a 'readable' event emission.
this.needReadable = false;
this.emittedReadable = false;
this.readableListening = false;
this.resumeScheduled = false;
this[kPaused] = null;

// True if the error was already emitted and should not be thrown again.
this.errorEmitted = false;

// Should close be emitted on destroy. Defaults to true.
this.emitClose = !options || options.emitClose !== false;
if (options && options.emitClose === false) this.state &= ~kEmitClose;

// Should .destroy() be called after 'end' (and potentially 'finish').
this.autoDestroy = !options || options.autoDestroy !== false;
if (options && options.autoDestroy === false) this.state &= ~kAutoDestroy;

// Has it been destroyed.
this.destroyed = false;

// Indicates whether the stream has errored. When true no further
// _read calls, 'data' or 'readable' events should occur. This is needed
// since when autoDestroy is disabled we need a way to tell whether the
// stream has failed.
this.errored = null;

// Indicates whether the stream has finished destroying.
this.closed = false;

// True if close has been emitted or would have been emitted
// depending on emitClose.
this.closeEmitted = false;

// Crypto is kind of old and crusty. Historically, its default string
// encoding is 'binary' so we have to make this configurable.
Expand All @@ -177,12 +216,6 @@ function ReadableState(options, stream, isDuplex) {
// Ref the piped dest which we need a drain event on it
// type: null | Writable | Set<Writable>.
this.awaitDrainWriters = null;
this.multiAwaitDrain = false;

// If true, a maybeReadMore has been scheduled.
this.readingMore = false;

this.dataEmitted = false;

this.decoder = null;
this.encoding = null;
Expand Down Expand Up @@ -263,7 +296,7 @@ function readableAddChunk(stream, chunk, encoding, addToFront) {
const state = stream._readableState;

let err;
if (!state.objectMode) {
if ((state.state & kObjectMode) !== 0) {
if (typeof chunk === 'string') {
encoding = encoding || state.defaultEncoding;
if (state.encoding !== encoding) {
Expand All @@ -290,11 +323,11 @@ function readableAddChunk(stream, chunk, encoding, addToFront) {
if (err) {
errorOrDestroy(stream, err);
} else if (chunk === null) {
state.reading = false;
state.state &= ~kReading;
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if you keep this line and the rest of the assignment as is, wouldn't v8 just inline the setter?

I'm asking as it's not straightforward

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When you do the changes for Writable we'll go over the system analyzer together and see I basically did the changes based on generated IR/maps

onEofChunk(stream, state);
} else if (state.objectMode || (chunk && chunk.length > 0)) {
} else if (((state.state & kObjectMode) !== 0) || (chunk && chunk.length > 0)) {
if (addToFront) {
if (state.endEmitted)
if ((state.state & kEndEmitted) !== 0)
errorOrDestroy(stream, new ERR_STREAM_UNSHIFT_AFTER_END_EVENT());
else if (state.destroyed || state.errored)
return false;
Expand All @@ -305,7 +338,7 @@ function readableAddChunk(stream, chunk, encoding, addToFront) {
} else if (state.destroyed || state.errored) {
return false;
} else {
state.reading = false;
state.state &= ~kReading;
if (state.decoder && !encoding) {
chunk = state.decoder.write(chunk);
if (state.objectMode || chunk.length !== 0)
Expand All @@ -317,7 +350,7 @@ function readableAddChunk(stream, chunk, encoding, addToFront) {
}
}
} else if (!addToFront) {
state.reading = false;
state.state &= ~kReading;
maybeReadMore(stream, state);
}

Expand All @@ -333,7 +366,7 @@ function addChunk(stream, state, chunk, addToFront) {
stream.listenerCount('data') > 0) {
// Use the guard to avoid creating `Set()` repeatedly
// when we have multiple pipes.
if (state.multiAwaitDrain) {
if ((state.state & kMultiAwaitDrain) !== 0) {
state.awaitDrainWriters.clear();
} else {
state.awaitDrainWriters = null;
Expand All @@ -349,7 +382,7 @@ function addChunk(stream, state, chunk, addToFront) {
else
state.buffer.push(chunk);

if (state.needReadable)
if ((state.state & kNeedReadable) !== 0)
emitReadable(stream);
}
maybeReadMore(stream, state);
Expand Down Expand Up @@ -404,7 +437,7 @@ function computeNewHighWaterMark(n) {
function howMuchToRead(n, state) {
if (n <= 0 || (state.length === 0 && state.ended))
Comment thread
rluvaton marked this conversation as resolved.
return 0;
if (state.objectMode)
if ((state.state & kObjectMode !== 0))
Comment thread
ronag marked this conversation as resolved.
Outdated
Comment thread
benjamingr marked this conversation as resolved.
Outdated
return 1;
if (NumberIsNaN(n)) {
// Only flow one buffer at a time.
Expand Down Expand Up @@ -435,9 +468,9 @@ Readable.prototype.read = function(n) {
state.highWaterMark = computeNewHighWaterMark(n);

if (n !== 0)
state.emittedReadable = false;

// If we're doing read(0) to trigger a readable event, but we
state.state &= ~kEmittedReadable;
// If we're doing read(0) to trigger a readable event, but we
// already have a bunch of data in the buffer, then just trigger
// the 'readable' event and move on.
if (n === 0 &&
Expand Down Expand Up @@ -486,7 +519,7 @@ Readable.prototype.read = function(n) {
// 3. Actually pull the requested chunks out of the buffer and return.

// if we need a readable event, then we need to do some reading.
let doRead = state.needReadable;
let doRead = (state.state & kNeedReadable) !== 0;
Comment thread
ronag marked this conversation as resolved.
debug('need readable', doRead);

// If we currently have less than the highWaterMark, then also read some.
Expand All @@ -504,20 +537,19 @@ Readable.prototype.read = function(n) {
debug('reading, ended or constructing', doRead);
} else if (doRead) {
debug('do read');
state.reading = true;
state.sync = true;
state.state |= kReading | kSync;
// If the length is currently zero, then we *need* a readable event.
if (state.length === 0)
state.needReadable = true;
state.state |= kNeedReadable;

// Call internal read method
try {
this._read(state.highWaterMark);
} catch (err) {
errorOrDestroy(this, err);
}
state.state &= ~kSync;

state.sync = false;
// If _read pushed data synchronously, then `reading` will be false,
// and we need to re-evaluate how much data we can return to the user.
if (!state.reading)
Comment thread
rluvaton marked this conversation as resolved.
Expand Down