summaryrefslogtreecommitdiff
path: root/std/node/_stream/readable.ts
diff options
context:
space:
mode:
Diffstat (limited to 'std/node/_stream/readable.ts')
-rw-r--r--std/node/_stream/readable.ts546
1 files changed, 43 insertions, 503 deletions
diff --git a/std/node/_stream/readable.ts b/std/node/_stream/readable.ts
index 72e61dff7..c8ed29953 100644
--- a/std/node/_stream/readable.ts
+++ b/std/node/_stream/readable.ts
@@ -1,492 +1,46 @@
// Copyright Node.js contributors. All rights reserved. MIT License.
-import EventEmitter, { captureRejectionSymbol } from "../events.ts";
+import { captureRejectionSymbol } from "../events.ts";
import Stream from "./stream.ts";
-import { Buffer } from "../buffer.ts";
+import type { Buffer } from "../buffer.ts";
import BufferList from "./buffer_list.ts";
import {
ERR_INVALID_OPT_VALUE,
ERR_METHOD_NOT_IMPLEMENTED,
- ERR_MULTIPLE_CALLBACK,
- ERR_STREAM_PUSH_AFTER_EOF,
- ERR_STREAM_UNSHIFT_AFTER_END_EVENT,
} from "../_errors.ts";
+import type { Encodings } from "../_utils.ts";
import { StringDecoder } from "../string_decoder.ts";
import createReadableStreamAsyncIterator from "./async_iterator.ts";
import streamFrom from "./from.ts";
-import { kConstruct, kDestroy, kPaused } from "./symbols.ts";
-import type Writable from "./writable.ts";
-import { errorOrDestroy as errorOrDestroyDuplex } from "./duplex.ts";
-
-function construct(stream: Readable, cb: () => void) {
- const r = stream._readableState;
-
- if (!stream._construct) {
- return;
- }
-
- stream.once(kConstruct, cb);
-
- r.constructed = false;
-
- queueMicrotask(() => {
- let called = false;
- stream._construct?.((err?: Error) => {
- r.constructed = true;
-
- if (called) {
- err = new ERR_MULTIPLE_CALLBACK();
- } else {
- called = true;
- }
-
- if (r.destroyed) {
- stream.emit(kDestroy, err);
- } else if (err) {
- errorOrDestroy(stream, err, true);
- } else {
- queueMicrotask(() => stream.emit(kConstruct));
- }
- });
- });
-}
-
-function _destroy(
- self: Readable,
- err?: Error,
- cb?: (error?: Error | null) => void,
-) {
- self._destroy(err || null, (err) => {
- const r = (self as Readable)._readableState;
-
- if (err) {
- // Avoid V8 leak, https://github.com/nodejs/node/pull/34103#issuecomment-652002364
- err.stack;
-
- if (!r.errored) {
- r.errored = err;
- }
- }
-
- r.closed = true;
-
- if (typeof cb === "function") {
- cb(err);
- }
-
- if (err) {
- queueMicrotask(() => {
- if (!r.errorEmitted) {
- r.errorEmitted = true;
- self.emit("error", err);
- }
- r.closeEmitted = true;
- if (r.emitClose) {
- self.emit("close");
- }
- });
- } else {
- queueMicrotask(() => {
- r.closeEmitted = true;
- if (r.emitClose) {
- self.emit("close");
- }
- });
- }
- });
-}
-
-function errorOrDestroy(stream: Readable, err: Error, sync = false) {
- const r = stream._readableState;
-
- if (r.destroyed) {
- return stream;
- }
-
- if (r.autoDestroy) {
- stream.destroy(err);
- } else if (err) {
- // Avoid V8 leak, https://github.com/nodejs/node/pull/34103#issuecomment-652002364
- err.stack;
-
- if (!r.errored) {
- r.errored = err;
- }
- if (sync) {
- queueMicrotask(() => {
- if (!r.errorEmitted) {
- r.errorEmitted = true;
- stream.emit("error", err);
- }
- });
- } else if (!r.errorEmitted) {
- r.errorEmitted = true;
- stream.emit("error", err);
- }
- }
-}
-
-function flow(stream: Readable) {
- const state = stream._readableState;
- while (state.flowing && stream.read() !== null);
-}
-
-function pipeOnDrain(src: Readable, dest: Writable) {
- return function pipeOnDrainFunctionResult() {
- const state = src._readableState;
-
- if (state.awaitDrainWriters === dest) {
- state.awaitDrainWriters = null;
- } else if (state.multiAwaitDrain) {
- (state.awaitDrainWriters as Set<Writable>).delete(dest);
- }
-
- if (
- (!state.awaitDrainWriters ||
- (state.awaitDrainWriters as Set<Writable>).size === 0) &&
- src.listenerCount("data")
- ) {
- state.flowing = true;
- flow(src);
- }
- };
-}
-
-function updateReadableListening(self: Readable) {
- const state = self._readableState;
- state.readableListening = self.listenerCount("readable") > 0;
-
- if (state.resumeScheduled && state[kPaused] === false) {
- // Flowing needs to be set to true now, otherwise
- // the upcoming resume will not flow.
- state.flowing = true;
-
- // Crude way to check if we should resume.
- } else if (self.listenerCount("data") > 0) {
- self.resume();
- } else if (!state.readableListening) {
- state.flowing = null;
- }
-}
-
-function nReadingNextTick(self: Readable) {
- self.read(0);
-}
-
-function resume(stream: Readable, state: ReadableState) {
- if (!state.resumeScheduled) {
- state.resumeScheduled = true;
- queueMicrotask(() => resume_(stream, state));
- }
-}
-
-function resume_(stream: Readable, state: ReadableState) {
- if (!state.reading) {
- stream.read(0);
- }
-
- state.resumeScheduled = false;
- stream.emit("resume");
- flow(stream);
- if (state.flowing && !state.reading) {
- stream.read(0);
- }
-}
-
-function readableAddChunk(
- stream: Readable,
- chunk: string | Buffer | Uint8Array | null,
- encoding: undefined | string = undefined,
- addToFront: boolean,
-) {
- const state = stream._readableState;
- let usedEncoding = encoding;
-
- let err;
- if (!state.objectMode) {
- if (typeof chunk === "string") {
- usedEncoding = encoding || state.defaultEncoding;
- if (state.encoding !== usedEncoding) {
- if (addToFront && state.encoding) {
- chunk = Buffer.from(chunk, usedEncoding).toString(state.encoding);
- } else {
- chunk = Buffer.from(chunk, usedEncoding);
- usedEncoding = "";
- }
- }
- } else if (chunk instanceof Uint8Array) {
- chunk = Buffer.from(chunk);
- }
- }
-
- if (err) {
- errorOrDestroy(stream, err);
- } else if (chunk === null) {
- state.reading = false;
- onEofChunk(stream, state);
- } else if (state.objectMode || (chunk.length > 0)) {
- if (addToFront) {
- if (state.endEmitted) {
- errorOrDestroy(stream, new ERR_STREAM_UNSHIFT_AFTER_END_EVENT());
- } else {
- addChunk(stream, state, chunk, true);
- }
- } else if (state.ended) {
- errorOrDestroy(stream, new ERR_STREAM_PUSH_AFTER_EOF());
- } else if (state.destroyed || state.errored) {
- return false;
- } else {
- state.reading = false;
- if (state.decoder && !usedEncoding) {
- //TODO(Soremwar)
- //I don't think this cast is right
- chunk = state.decoder.write(Buffer.from(chunk as Uint8Array));
- if (state.objectMode || chunk.length !== 0) {
- addChunk(stream, state, chunk, false);
- } else {
- maybeReadMore(stream, state);
- }
- } else {
- addChunk(stream, state, chunk, false);
- }
- }
- } else if (!addToFront) {
- state.reading = false;
- maybeReadMore(stream, state);
- }
-
- return !state.ended &&
- (state.length < state.highWaterMark || state.length === 0);
-}
-
-function addChunk(
- stream: Readable,
- state: ReadableState,
- chunk: string | Buffer | Uint8Array,
- addToFront: boolean,
-) {
- if (state.flowing && state.length === 0 && !state.sync) {
- if (state.multiAwaitDrain) {
- (state.awaitDrainWriters as Set<Writable>).clear();
- } else {
- state.awaitDrainWriters = null;
- }
- stream.emit("data", chunk);
- } else {
- // Update the buffer info.
- state.length += state.objectMode ? 1 : chunk.length;
- if (addToFront) {
- state.buffer.unshift(chunk);
- } else {
- state.buffer.push(chunk);
- }
-
- if (state.needReadable) {
- emitReadable(stream);
- }
- }
- maybeReadMore(stream, state);
-}
-
-function prependListener(
- emitter: EventEmitter,
- event: string,
- // deno-lint-ignore no-explicit-any
- fn: (...args: any[]) => any,
-) {
- if (typeof emitter.prependListener === "function") {
- return emitter.prependListener(event, fn);
- }
-
- // This is a hack to make sure that our error handler is attached before any
- // userland ones. NEVER DO THIS. This is here only because this code needs
- // to continue to work with older versions of Node.js that do not include
- //the prependListener() method. The goal is to eventually remove this hack.
- // TODO(Soremwar)
- // Burn it with fire
- // deno-lint-ignore ban-ts-comment
- //@ts-ignore
- if (emitter._events.get(event)?.length) {
- // deno-lint-ignore ban-ts-comment
- //@ts-ignore
- const listeners = [fn, ...emitter._events.get(event)];
- // deno-lint-ignore ban-ts-comment
- //@ts-ignore
- emitter._events.set(event, listeners);
- } else {
- emitter.on(event, fn);
- }
-}
-
-/** Pluck off n bytes from an array of buffers.
-* Length is the combined lengths of all the buffers in the list.
-* This function is designed to be inlinable, so please take care when making
-* changes to the function body.
-*/
-function fromList(n: number, state: ReadableState) {
- // nothing buffered.
- if (state.length === 0) {
- return null;
- }
-
- let ret;
- if (state.objectMode) {
- ret = state.buffer.shift();
- } else if (!n || n >= state.length) {
- if (state.decoder) {
- ret = state.buffer.join("");
- } else if (state.buffer.length === 1) {
- ret = state.buffer.first();
- } else {
- ret = state.buffer.concat(state.length);
- }
- state.buffer.clear();
- } else {
- ret = state.buffer.consume(n, !!state.decoder);
- }
-
- return ret;
-}
-
-function endReadable(stream: Readable) {
- const state = stream._readableState;
-
- if (!state.endEmitted) {
- state.ended = true;
- queueMicrotask(() => endReadableNT(state, stream));
- }
-}
-
-function endReadableNT(state: ReadableState, stream: Readable) {
- if (
- !state.errorEmitted && !state.closeEmitted &&
- !state.endEmitted && state.length === 0
- ) {
- state.endEmitted = true;
- stream.emit("end");
-
- if (state.autoDestroy) {
- stream.destroy();
- }
- }
-}
-
-// Don't raise the hwm > 1GB.
-const MAX_HWM = 0x40000000;
-function computeNewHighWaterMark(n: number) {
- if (n >= MAX_HWM) {
- n = MAX_HWM;
- } else {
- n--;
- n |= n >>> 1;
- n |= n >>> 2;
- n |= n >>> 4;
- n |= n >>> 8;
- n |= n >>> 16;
- n++;
- }
- return n;
-}
-
-function howMuchToRead(n: number, state: ReadableState) {
- if (n <= 0 || (state.length === 0 && state.ended)) {
- return 0;
- }
- if (state.objectMode) {
- return 1;
- }
- if (Number.isNaN(n)) {
- // Only flow one buffer at a time.
- if (state.flowing && state.length) {
- return state.buffer.first().length;
- }
- return state.length;
- }
- if (n <= state.length) {
- return n;
- }
- return state.ended ? state.length : 0;
-}
-
-function onEofChunk(stream: Readable, state: ReadableState) {
- if (state.ended) return;
- if (state.decoder) {
- const chunk = state.decoder.end();
- if (chunk && chunk.length) {
- state.buffer.push(chunk);
- state.length += state.objectMode ? 1 : chunk.length;
- }
- }
- state.ended = true;
-
- if (state.sync) {
- emitReadable(stream);
- } else {
- state.needReadable = false;
- state.emittedReadable = true;
- emitReadable_(stream);
- }
-}
-
-function emitReadable(stream: Readable) {
- const state = stream._readableState;
- state.needReadable = false;
- if (!state.emittedReadable) {
- state.emittedReadable = true;
- queueMicrotask(() => emitReadable_(stream));
- }
-}
-
-function emitReadable_(stream: Readable) {
- const state = stream._readableState;
- if (!state.destroyed && !state.errored && (state.length || state.ended)) {
- stream.emit("readable");
- state.emittedReadable = false;
- }
-
- state.needReadable = !state.flowing &&
- !state.ended &&
- state.length <= state.highWaterMark;
- flow(stream);
-}
-
-function maybeReadMore(stream: Readable, state: ReadableState) {
- if (!state.readingMore && state.constructed) {
- state.readingMore = true;
- queueMicrotask(() => maybeReadMore_(stream, state));
- }
-}
-
-function maybeReadMore_(stream: Readable, state: ReadableState) {
- while (
- !state.reading && !state.ended &&
- (state.length < state.highWaterMark ||
- (state.flowing && state.length === 0))
- ) {
- const len = state.length;
- stream.read(0);
- if (len === state.length) {
- // Didn't get any data, stop spinning.
- break;
- }
- }
- state.readingMore = false;
-}
+import { kDestroy, kPaused } from "./symbols.ts";
+import {
+ _destroy,
+ computeNewHighWaterMark,
+ emitReadable,
+ endReadable,
+ errorOrDestroy,
+ fromList,
+ howMuchToRead,
+ nReadingNextTick,
+ pipeOnDrain,
+ prependListener,
+ readableAddChunk,
+ resume,
+ updateReadableListening,
+} from "./readable_internal.ts";
+import Writable from "./writable.ts";
+import { errorOrDestroy as errorOrDestroyWritable } from "./writable_internal.ts";
+import Duplex, { errorOrDestroy as errorOrDestroyDuplex } from "./duplex.ts";
export interface ReadableOptions {
autoDestroy?: boolean;
- construct?: () => void;
- //TODO(Soremwar)
- //Import available encodings
- defaultEncoding?: string;
+ defaultEncoding?: Encodings;
destroy?(
this: Readable,
error: Error | null,
callback: (error: Error | null) => void,
): void;
emitClose?: boolean;
- //TODO(Soremwar)
- //Import available encodings
- encoding?: string;
+ encoding?: Encodings;
highWaterMark?: number;
objectMode?: boolean;
read?(this: Readable): void;
@@ -494,7 +48,7 @@ export interface ReadableOptions {
export class ReadableState {
[kPaused]: boolean | null = null;
- awaitDrainWriters: Writable | Set<Writable> | null = null;
+ awaitDrainWriters: Duplex | Writable | Set<Duplex | Writable> | null = null;
buffer = new BufferList();
closed = false;
closeEmitted = false;
@@ -502,9 +56,7 @@ export class ReadableState {
decoder: StringDecoder | null = null;
destroyed = false;
emittedReadable = false;
- //TODO(Soremwar)
- //Import available encodings
- encoding: string | null = null;
+ encoding: Encodings | null = null;
ended = false;
endEmitted = false;
errored: Error | null = null;
@@ -515,7 +67,7 @@ export class ReadableState {
multiAwaitDrain = false;
needReadable = false;
objectMode: boolean;
- pipes: Writable[] = [];
+ pipes: Array<Duplex | Writable> = [];
readable = true;
readableListening = false;
reading = false;
@@ -551,7 +103,6 @@ export class ReadableState {
}
class Readable extends Stream {
- _construct?: (cb: (error?: Error) => void) => void;
_readableState: ReadableState;
constructor(options?: ReadableOptions) {
@@ -563,15 +114,8 @@ class Readable extends Stream {
if (typeof options.destroy === "function") {
this._destroy = options.destroy;
}
- if (typeof options.construct === "function") {
- this._construct = options.construct;
- }
}
this._readableState = new ReadableState(options);
-
- construct(this, () => {
- maybeReadMore(this, this._readableState);
- });
}
static from(
@@ -690,13 +234,11 @@ class Readable extends Stream {
return ret;
}
- _read() {
+ _read(_size?: number) {
throw new ERR_METHOD_NOT_IMPLEMENTED("_read()");
}
- //TODO(Soremwar)
- //Should be duplex
- pipe<T extends Writable>(dest: T, pipeOpts?: { end?: boolean }): T {
+ pipe<T extends Duplex | Writable>(dest: T, pipeOpts?: { end?: boolean }): T {
// deno-lint-ignore no-this-alias
const src = this;
const state = this._readableState;
@@ -776,7 +318,7 @@ class Readable extends Stream {
state.awaitDrainWriters = dest;
state.multiAwaitDrain = false;
} else if (state.pipes.length > 1 && state.pipes.includes(dest)) {
- (state.awaitDrainWriters as Set<Writable>).add(dest);
+ (state.awaitDrainWriters as Set<Duplex | Writable>).add(dest);
}
src.pause();
}
@@ -791,12 +333,13 @@ class Readable extends Stream {
unpipe();
dest.removeListener("error", onerror);
if (dest.listenerCount("error") === 0) {
- //TODO(Soremwar)
- //Should be const s = dest._writableState || dest._readableState;
- const s = dest._writableState;
+ const s = dest._writableState || (dest as Duplex)._readableState;
if (s && !s.errorEmitted) {
- // User incorrectly emitted 'error' directly on the stream.
- errorOrDestroyDuplex(dest, er);
+ if (dest instanceof Duplex) {
+ errorOrDestroyDuplex(dest as unknown as Duplex, er);
+ } else {
+ errorOrDestroyWritable(dest as Writable, er);
+ }
} else {
dest.emit("error", er);
}
@@ -817,7 +360,7 @@ class Readable extends Stream {
dest.once("finish", onfinish);
function unpipe() {
- src.unpipe(dest);
+ src.unpipe(dest as Writable);
}
dest.emit("pipe", this);
@@ -834,12 +377,11 @@ class Readable extends Stream {
this._readableState.flowing === false;
}
- //TODO(Soremwar)
- //Replace string with encoding types
- setEncoding(enc: string) {
+ setEncoding(enc: Encodings) {
const decoder = new StringDecoder(enc);
this._readableState.decoder = decoder;
- this._readableState.encoding = this._readableState.decoder.encoding;
+ this._readableState.encoding = this._readableState.decoder
+ .encoding as Encodings;
const buffer = this._readableState.buffer;
let content = "";
@@ -931,7 +473,7 @@ class Readable extends Stream {
off = this.removeListener;
- destroy(err?: Error, cb?: () => void) {
+ destroy(err?: Error | null, cb?: () => void) {
const r = this._readableState;
if (r.destroyed) {
@@ -989,10 +531,8 @@ class Readable extends Stream {
this.destroy(err);
}
- //TODO(Soremwar)
- //Same deal, string => encodings
// deno-lint-ignore no-explicit-any
- push(chunk: any, encoding?: string): boolean {
+ push(chunk: any, encoding?: Encodings): boolean {
return readableAddChunk(this, chunk, encoding, false);
}
@@ -1233,7 +773,7 @@ class Readable extends Stream {
}
}
-Object.defineProperties(Stream, {
+Object.defineProperties(Readable, {
_readableState: { enumerable: false },
destroyed: { enumerable: false },
readableBuffer: { enumerable: false },