summaryrefslogtreecommitdiff
path: root/ext/node/polyfills/internal/streams
diff options
context:
space:
mode:
Diffstat (limited to 'ext/node/polyfills/internal/streams')
-rw-r--r--ext/node/polyfills/internal/streams/add-abort-signal.mjs48
-rw-r--r--ext/node/polyfills/internal/streams/buffer_list.mjs188
-rw-r--r--ext/node/polyfills/internal/streams/destroy.mjs320
-rw-r--r--ext/node/polyfills/internal/streams/duplex.mjs9
-rw-r--r--ext/node/polyfills/internal/streams/end-of-stream.mjs229
-rw-r--r--ext/node/polyfills/internal/streams/lazy_transform.mjs53
-rw-r--r--ext/node/polyfills/internal/streams/legacy.mjs113
-rw-r--r--ext/node/polyfills/internal/streams/passthrough.mjs7
-rw-r--r--ext/node/polyfills/internal/streams/readable.mjs9
-rw-r--r--ext/node/polyfills/internal/streams/state.mjs10
-rw-r--r--ext/node/polyfills/internal/streams/transform.mjs7
-rw-r--r--ext/node/polyfills/internal/streams/utils.mjs242
-rw-r--r--ext/node/polyfills/internal/streams/writable.mjs9
13 files changed, 1244 insertions, 0 deletions
diff --git a/ext/node/polyfills/internal/streams/add-abort-signal.mjs b/ext/node/polyfills/internal/streams/add-abort-signal.mjs
new file mode 100644
index 000000000..5d7512f1c
--- /dev/null
+++ b/ext/node/polyfills/internal/streams/add-abort-signal.mjs
@@ -0,0 +1,48 @@
+// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license.
+// Copyright Joyent and Node contributors. All rights reserved. MIT license.
+// deno-lint-ignore-file
+
+import { AbortError, ERR_INVALID_ARG_TYPE } from "internal:deno_node/polyfills/internal/errors.ts";
+import eos from "internal:deno_node/polyfills/internal/streams/end-of-stream.mjs";
+
+// This method is inlined here for readable-stream
+// It also does not allow for signal to not exist on the stream
+// https://github.com/nodejs/node/pull/36061#discussion_r533718029
+const validateAbortSignal = (signal, name) => {
+ if (
+ typeof signal !== "object" ||
+ !("aborted" in signal)
+ ) {
+ throw new ERR_INVALID_ARG_TYPE(name, "AbortSignal", signal);
+ }
+};
+
+function isStream(obj) {
+ return !!(obj && typeof obj.pipe === "function");
+}
+
+function addAbortSignal(signal, stream) {
+ validateAbortSignal(signal, "signal");
+ if (!isStream(stream)) {
+ throw new ERR_INVALID_ARG_TYPE("stream", "stream.Stream", stream);
+ }
+ return addAbortSignalNoValidate(signal, stream);
+}
+function addAbortSignalNoValidate(signal, stream) {
+ if (typeof signal !== "object" || !("aborted" in signal)) {
+ return stream;
+ }
+ const onAbort = () => {
+ stream.destroy(new AbortError());
+ };
+ if (signal.aborted) {
+ onAbort();
+ } else {
+ signal.addEventListener("abort", onAbort);
+ eos(stream, () => signal.removeEventListener("abort", onAbort));
+ }
+ return stream;
+}
+
+export default { addAbortSignal, addAbortSignalNoValidate };
+export { addAbortSignal, addAbortSignalNoValidate };
diff --git a/ext/node/polyfills/internal/streams/buffer_list.mjs b/ext/node/polyfills/internal/streams/buffer_list.mjs
new file mode 100644
index 000000000..3016ffba5
--- /dev/null
+++ b/ext/node/polyfills/internal/streams/buffer_list.mjs
@@ -0,0 +1,188 @@
+// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license.
+// Copyright Joyent and Node contributors. All rights reserved. MIT license.
+// deno-lint-ignore-file
+
+import { Buffer } from "internal:deno_node/polyfills/buffer.ts";
+import { inspect } from "internal:deno_node/polyfills/internal/util/inspect.mjs";
+
+class BufferList {
+ constructor() {
+ this.head = null;
+ this.tail = null;
+ this.length = 0;
+ }
+
+ push(v) {
+ const entry = { data: v, next: null };
+ if (this.length > 0) {
+ this.tail.next = entry;
+ } else {
+ this.head = entry;
+ }
+ this.tail = entry;
+ ++this.length;
+ }
+
+ unshift(v) {
+ const entry = { data: v, next: this.head };
+ if (this.length === 0) {
+ this.tail = entry;
+ }
+ this.head = entry;
+ ++this.length;
+ }
+
+ shift() {
+ if (this.length === 0) {
+ return;
+ }
+ const ret = this.head.data;
+ if (this.length === 1) {
+ this.head = this.tail = null;
+ } else {
+ this.head = this.head.next;
+ }
+ --this.length;
+ return ret;
+ }
+
+ clear() {
+ this.head = this.tail = null;
+ this.length = 0;
+ }
+
+ join(s) {
+ if (this.length === 0) {
+ return "";
+ }
+ let p = this.head;
+ let ret = "" + p.data;
+ while (p = p.next) {
+ ret += s + p.data;
+ }
+ return ret;
+ }
+
+ concat(n) {
+ if (this.length === 0) {
+ return Buffer.alloc(0);
+ }
+ const ret = Buffer.allocUnsafe(n >>> 0);
+ let p = this.head;
+ let i = 0;
+ while (p) {
+ ret.set(p.data, i);
+ i += p.data.length;
+ p = p.next;
+ }
+ return ret;
+ }
+
+ // Consumes a specified amount of bytes or characters from the buffered data.
+ consume(n, hasStrings) {
+ const data = this.head.data;
+ if (n < data.length) {
+ // `slice` is the same for buffers and strings.
+ const slice = data.slice(0, n);
+ this.head.data = data.slice(n);
+ return slice;
+ }
+ if (n === data.length) {
+ // First chunk is a perfect match.
+ return this.shift();
+ }
+ // Result spans more than one buffer.
+ return hasStrings ? this._getString(n) : this._getBuffer(n);
+ }
+
+ first() {
+ return this.head.data;
+ }
+
+ *[Symbol.iterator]() {
+ for (let p = this.head; p; p = p.next) {
+ yield p.data;
+ }
+ }
+
+ // Consumes a specified amount of characters from the buffered data.
+ _getString(n) {
+ let ret = "";
+ let p = this.head;
+ let c = 0;
+ do {
+ const str = p.data;
+ if (n > str.length) {
+ ret += str;
+ n -= str.length;
+ } else {
+ if (n === str.length) {
+ ret += str;
+ ++c;
+ if (p.next) {
+ this.head = p.next;
+ } else {
+ this.head = this.tail = null;
+ }
+ } else {
+ ret += str.slice(0, n);
+ this.head = p;
+ p.data = str.slice(n);
+ }
+ break;
+ }
+ ++c;
+ } while (p = p.next);
+ this.length -= c;
+ return ret;
+ }
+
+ // Consumes a specified amount of bytes from the buffered data.
+ _getBuffer(n) {
+ const ret = Buffer.allocUnsafe(n);
+ const retLen = n;
+ let p = this.head;
+ let c = 0;
+ do {
+ const buf = p.data;
+ if (n > buf.length) {
+ ret.set(buf, retLen - n);
+ n -= buf.length;
+ } else {
+ if (n === buf.length) {
+ ret.set(buf, retLen - n);
+ ++c;
+ if (p.next) {
+ this.head = p.next;
+ } else {
+ this.head = this.tail = null;
+ }
+ } else {
+ ret.set(
+ new Uint8Array(buf.buffer, buf.byteOffset, n),
+ retLen - n,
+ );
+ this.head = p;
+ p.data = buf.slice(n);
+ }
+ break;
+ }
+ ++c;
+ } while (p = p.next);
+ this.length -= c;
+ return ret;
+ }
+
+ // Make sure the linked list only shows the minimal necessary information.
+ [inspect.custom](_, options) {
+ return inspect(this, {
+ ...options,
+ // Only inspect one level.
+ depth: 0,
+ // It should not recurse.
+ customInspect: false,
+ });
+ }
+}
+
+export default BufferList;
diff --git a/ext/node/polyfills/internal/streams/destroy.mjs b/ext/node/polyfills/internal/streams/destroy.mjs
new file mode 100644
index 000000000..b065f2119
--- /dev/null
+++ b/ext/node/polyfills/internal/streams/destroy.mjs
@@ -0,0 +1,320 @@
+// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license.
+// Copyright Joyent and Node contributors. All rights reserved. MIT license.
+// deno-lint-ignore-file
+
+import { aggregateTwoErrors, ERR_MULTIPLE_CALLBACK } from "internal:deno_node/polyfills/internal/errors.ts";
+import * as process from "internal:deno_node/polyfills/_process/process.ts";
+
+const kDestroy = Symbol("kDestroy");
+const kConstruct = Symbol("kConstruct");
+
+function checkError(err, w, r) {
+ if (err) {
+ // Avoid V8 leak, https://github.com/nodejs/node/pull/34103#issuecomment-652002364
+ err.stack; // eslint-disable-line no-unused-expressions
+
+ if (w && !w.errored) {
+ w.errored = err;
+ }
+ if (r && !r.errored) {
+ r.errored = err;
+ }
+ }
+}
+
+// Backwards compat. cb() is undocumented and unused in core but
+// unfortunately might be used by modules.
+function destroy(err, cb) {
+ const r = this._readableState;
+ const w = this._writableState;
+ // With duplex streams we use the writable side for state.
+ const s = w || r;
+
+ if ((w && w.destroyed) || (r && r.destroyed)) {
+ if (typeof cb === "function") {
+ cb();
+ }
+
+ return this;
+ }
+
+ // We set destroyed to true before firing error callbacks in order
+ // to make it re-entrance safe in case destroy() is called within callbacks
+ checkError(err, w, r);
+
+ if (w) {
+ w.destroyed = true;
+ }
+ if (r) {
+ r.destroyed = true;
+ }
+
+ // If still constructing then defer calling _destroy.
+ if (!s.constructed) {
+ this.once(kDestroy, function (er) {
+ _destroy(this, aggregateTwoErrors(er, err), cb);
+ });
+ } else {
+ _destroy(this, err, cb);
+ }
+
+ return this;
+}
+
+function _destroy(self, err, cb) {
+ let called = false;
+
+ function onDestroy(err) {
+ if (called) {
+ return;
+ }
+ called = true;
+
+ const r = self._readableState;
+ const w = self._writableState;
+
+ checkError(err, w, r);
+
+ if (w) {
+ w.closed = true;
+ }
+ if (r) {
+ r.closed = true;
+ }
+
+ if (typeof cb === "function") {
+ cb(err);
+ }
+
+ if (err) {
+ process.nextTick(emitErrorCloseNT, self, err);
+ } else {
+ process.nextTick(emitCloseNT, self);
+ }
+ }
+ try {
+ const result = self._destroy(err || null, onDestroy);
+ if (result != null) {
+ const then = result.then;
+ if (typeof then === "function") {
+ then.call(
+ result,
+ function () {
+ process.nextTick(onDestroy, null);
+ },
+ function (err) {
+ process.nextTick(onDestroy, err);
+ },
+ );
+ }
+ }
+ } catch (err) {
+ onDestroy(err);
+ }
+}
+
+function emitErrorCloseNT(self, err) {
+ emitErrorNT(self, err);
+ emitCloseNT(self);
+}
+
+function emitCloseNT(self) {
+ const r = self._readableState;
+ const w = self._writableState;
+
+ if (w) {
+ w.closeEmitted = true;
+ }
+ if (r) {
+ r.closeEmitted = true;
+ }
+
+ if ((w && w.emitClose) || (r && r.emitClose)) {
+ self.emit("close");
+ }
+}
+
+function emitErrorNT(self, err) {
+ const r = self._readableState;
+ const w = self._writableState;
+
+ if ((w && w.errorEmitted) || (r && r.errorEmitted)) {
+ return;
+ }
+
+ if (w) {
+ w.errorEmitted = true;
+ }
+ if (r) {
+ r.errorEmitted = true;
+ }
+
+ self.emit("error", err);
+}
+
+function undestroy() {
+ const r = this._readableState;
+ const w = this._writableState;
+
+ if (r) {
+ r.constructed = true;
+ r.closed = false;
+ r.closeEmitted = false;
+ r.destroyed = false;
+ r.errored = null;
+ r.errorEmitted = false;
+ r.reading = false;
+ r.ended = false;
+ r.endEmitted = false;
+ }
+
+ if (w) {
+ w.constructed = true;
+ w.destroyed = false;
+ w.closed = false;
+ w.closeEmitted = false;
+ w.errored = null;
+ w.errorEmitted = false;
+ w.ended = false;
+ w.ending = false;
+ w.finalCalled = false;
+ w.prefinished = false;
+ w.finished = false;
+ }
+}
+
+function errorOrDestroy(stream, err, sync) {
+ // We have tests that rely on errors being emitted
+ // in the same tick, so changing this is semver major.
+ // For now when you opt-in to autoDestroy we allow
+ // the error to be emitted nextTick. In a future
+ // semver major update we should change the default to this.
+
+ const r = stream._readableState;
+ const w = stream._writableState;
+
+ if ((w && w.destroyed) || (r && r.destroyed)) {
+ return this;
+ }
+
+ if ((r && r.autoDestroy) || (w && w.autoDestroy)) {
+ stream.destroy(err);
+ } else if (err) {
+ // Avoid V8 leak, https://github.com/nodejs/node/pull/34103#issuecomment-652002364
+ err.stack; // eslint-disable-line no-unused-expressions
+
+ if (w && !w.errored) {
+ w.errored = err;
+ }
+ if (r && !r.errored) {
+ r.errored = err;
+ }
+ if (sync) {
+ process.nextTick(emitErrorNT, stream, err);
+ } else {
+ emitErrorNT(stream, err);
+ }
+ }
+}
+
+function construct(stream, cb) {
+ if (typeof stream._construct !== "function") {
+ return;
+ }
+
+ const r = stream._readableState;
+ const w = stream._writableState;
+
+ if (r) {
+ r.constructed = false;
+ }
+ if (w) {
+ w.constructed = false;
+ }
+
+ stream.once(kConstruct, cb);
+
+ if (stream.listenerCount(kConstruct) > 1) {
+ // Duplex
+ return;
+ }
+
+ process.nextTick(constructNT, stream);
+}
+
+function constructNT(stream) {
+ let called = false;
+
+ function onConstruct(err) {
+ if (called) {
+ errorOrDestroy(stream, err ?? new ERR_MULTIPLE_CALLBACK());
+ return;
+ }
+ called = true;
+
+ const r = stream._readableState;
+ const w = stream._writableState;
+ const s = w || r;
+
+ if (r) {
+ r.constructed = true;
+ }
+ if (w) {
+ w.constructed = true;
+ }
+
+ if (s.destroyed) {
+ stream.emit(kDestroy, err);
+ } else if (err) {
+ errorOrDestroy(stream, err, true);
+ } else {
+ process.nextTick(emitConstructNT, stream);
+ }
+ }
+
+ try {
+ const result = stream._construct(onConstruct);
+ if (result != null) {
+ const then = result.then;
+ if (typeof then === "function") {
+ then.call(
+ result,
+ function () {
+ process.nextTick(onConstruct, null);
+ },
+ function (err) {
+ process.nextTick(onConstruct, err);
+ },
+ );
+ }
+ }
+ } catch (err) {
+ onConstruct(err);
+ }
+}
+
+function emitConstructNT(stream) {
+ stream.emit(kConstruct);
+}
+
+function isRequest(stream) {
+ return stream && stream.setHeader && typeof stream.abort === "function";
+}
+
+// Normalize destroy for legacy.
+function destroyer(stream, err) {
+ if (!stream) return;
+ if (isRequest(stream)) return stream.abort();
+ if (isRequest(stream.req)) return stream.req.abort();
+ if (typeof stream.destroy === "function") return stream.destroy(err);
+ if (typeof stream.close === "function") return stream.close();
+}
+
+export default {
+ construct,
+ destroyer,
+ destroy,
+ undestroy,
+ errorOrDestroy,
+};
+export { construct, destroy, destroyer, errorOrDestroy, undestroy };
diff --git a/ext/node/polyfills/internal/streams/duplex.mjs b/ext/node/polyfills/internal/streams/duplex.mjs
new file mode 100644
index 000000000..b2086d467
--- /dev/null
+++ b/ext/node/polyfills/internal/streams/duplex.mjs
@@ -0,0 +1,9 @@
+// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license.
+// Copyright Joyent and Node contributors. All rights reserved. MIT license.
+// deno-lint-ignore-file
+
+import { Duplex } from "internal:deno_node/polyfills/_stream.mjs";
+const { from, fromWeb, toWeb } = Duplex;
+
+export default Duplex;
+export { from, fromWeb, toWeb };
diff --git a/ext/node/polyfills/internal/streams/end-of-stream.mjs b/ext/node/polyfills/internal/streams/end-of-stream.mjs
new file mode 100644
index 000000000..b5c380d56
--- /dev/null
+++ b/ext/node/polyfills/internal/streams/end-of-stream.mjs
@@ -0,0 +1,229 @@
+// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license.
+// Copyright Joyent and Node contributors. All rights reserved. MIT license.
+// deno-lint-ignore-file
+
+import { AbortError, ERR_STREAM_PREMATURE_CLOSE } from "internal:deno_node/polyfills/internal/errors.ts";
+import { once } from "internal:deno_node/polyfills/internal/util.mjs";
+import {
+ validateAbortSignal,
+ validateFunction,
+ validateObject,
+} from "internal:deno_node/polyfills/internal/validators.mjs";
+import * as process from "internal:deno_node/polyfills/_process/process.ts";
+
+function isRequest(stream) {
+ return stream.setHeader && typeof stream.abort === "function";
+}
+
+function isServerResponse(stream) {
+ return (
+ typeof stream._sent100 === "boolean" &&
+ typeof stream._removedConnection === "boolean" &&
+ typeof stream._removedContLen === "boolean" &&
+ typeof stream._removedTE === "boolean" &&
+ typeof stream._closed === "boolean"
+ );
+}
+
+function isReadable(stream) {
+ return typeof stream.readable === "boolean" ||
+ typeof stream.readableEnded === "boolean" ||
+ !!stream._readableState;
+}
+
+function isWritable(stream) {
+ return typeof stream.writable === "boolean" ||
+ typeof stream.writableEnded === "boolean" ||
+ !!stream._writableState;
+}
+
+function isWritableFinished(stream) {
+ if (stream.writableFinished) return true;
+ const wState = stream._writableState;
+ if (!wState || wState.errored) return false;
+ return wState.finished || (wState.ended && wState.length === 0);
+}
+
+const nop = () => {};
+
+function isReadableEnded(stream) {
+ if (stream.readableEnded) return true;
+ const rState = stream._readableState;
+ if (!rState || rState.errored) return false;
+ return rState.endEmitted || (rState.ended && rState.length === 0);
+}
+
+function eos(stream, options, callback) {
+ if (arguments.length === 2) {
+ callback = options;
+ options = {};
+ } else if (options == null) {
+ options = {};
+ } else {
+ validateObject(options, "options");
+ }
+ validateFunction(callback, "callback");
+ validateAbortSignal(options.signal, "options.signal");
+
+ callback = once(callback);
+
+ const readable = options.readable ||
+ (options.readable !== false && isReadable(stream));
+ const writable = options.writable ||
+ (options.writable !== false && isWritable(stream));
+
+ const wState = stream._writableState;
+ const rState = stream._readableState;
+ const state = wState || rState;
+
+ const onlegacyfinish = () => {
+ if (!stream.writable) onfinish();
+ };
+
+ // TODO (ronag): Improve soft detection to include core modules and
+ // common ecosystem modules that do properly emit 'close' but fail
+ // this generic check.
+ let willEmitClose = isServerResponse(stream) || (
+ state &&
+ state.autoDestroy &&
+ state.emitClose &&
+ state.closed === false &&
+ isReadable(stream) === readable &&
+ isWritable(stream) === writable
+ );
+
+ let writableFinished = stream.writableFinished ||
+ (wState && wState.finished);
+ const onfinish = () => {
+ writableFinished = true;
+ // Stream should not be destroyed here. If it is that
+ // means that user space is doing something differently and
+ // we cannot trust willEmitClose.
+ if (stream.destroyed) willEmitClose = false;
+
+ if (willEmitClose && (!stream.readable || readable)) return;
+ if (!readable || readableEnded) callback.call(stream);
+ };
+
+ let readableEnded = stream.readableEnded ||
+ (rState && rState.endEmitted);
+ const onend = () => {
+ readableEnded = true;
+ // Stream should not be destroyed here. If it is that
+ // means that user space is doing something differently and
+ // we cannot trust willEmitClose.
+ if (stream.destroyed) willEmitClose = false;
+
+ if (willEmitClose && (!stream.writable || writable)) return;
+ if (!writable || writableFinished) callback.call(stream);
+ };
+
+ const onerror = (err) => {
+ callback.call(stream, err);
+ };
+
+ const onclose = () => {
+ if (readable && !readableEnded) {
+ if (!isReadableEnded(stream)) {
+ return callback.call(stream, new ERR_STREAM_PREMATURE_CLOSE());
+ }
+ }
+ if (writable && !writableFinished) {
+ if (!isWritableFinished(stream)) {
+ return callback.call(stream, new ERR_STREAM_PREMATURE_CLOSE());
+ }
+ }
+ callback.call(stream);
+ };
+
+ const onrequest = () => {
+ stream.req.on("finish", onfinish);
+ };
+
+ if (isRequest(stream)) {
+ stream.on("complete", onfinish);
+ if (!willEmitClose) {
+ stream.on("abort", onclose);
+ }
+ if (stream.req) onrequest();
+ else stream.on("request", onrequest);
+ } else if (writable && !wState) { // legacy streams
+ stream.on("end", onlegacyfinish);
+ stream.on("close", onlegacyfinish);
+ }
+
+ // Not all streams will emit 'close' after 'aborted'.
+ if (!willEmitClose && typeof stream.aborted === "boolean") {
+ stream.on("aborted", onclose);
+ }
+
+ stream.on("end", onend);
+ stream.on("finish", onfinish);
+ if (options.error !== false) stream.on("error", onerror);
+ stream.on("close", onclose);
+
+ // _closed is for OutgoingMessage which is not a proper Writable.
+ const closed = (!wState && !rState && stream._closed === true) || (
+ (wState && wState.closed) ||
+ (rState && rState.closed) ||
+ (wState && wState.errorEmitted) ||
+ (rState && rState.errorEmitted) ||
+ (rState && stream.req && stream.aborted) ||
+ (
+ (!wState || !willEmitClose || typeof wState.closed !== "boolean") &&
+ (!rState || !willEmitClose || typeof rState.closed !== "boolean") &&
+ (!writable || (wState && wState.finished)) &&
+ (!readable || (rState && rState.endEmitted))
+ )
+ );
+
+ if (closed) {
+ // TODO(ronag): Re-throw error if errorEmitted?
+ // TODO(ronag): Throw premature close as if finished was called?
+ // before being closed? i.e. if closed but not errored, ended or finished.
+ // TODO(ronag): Throw some kind of error? Does it make sense
+ // to call finished() on a "finished" stream?
+ // TODO(ronag): willEmitClose?
+ process.nextTick(() => {
+ callback();
+ });
+ }
+
+ const cleanup = () => {
+ callback = nop;
+ stream.removeListener("aborted", onclose);
+ stream.removeListener("complete", onfinish);
+ stream.removeListener("abort", onclose);
+ stream.removeListener("request", onrequest);
+ if (stream.req) stream.req.removeListener("finish", onfinish);
+ stream.removeListener("end", onlegacyfinish);
+ stream.removeListener("close", onlegacyfinish);
+ stream.removeListener("finish", onfinish);
+ stream.removeListener("end", onend);
+ stream.removeListener("error", onerror);
+ stream.removeListener("close", onclose);
+ };
+
+ if (options.signal && !closed) {
+ const abort = () => {
+ // Keep it because cleanup removes it.
+ const endCallback = callback;
+ cleanup();
+ endCallback.call(stream, new AbortError());
+ };
+ if (options.signal.aborted) {
+ process.nextTick(abort);
+ } else {
+ const originalCallback = callback;
+ callback = once((...args) => {
+ options.signal.removeEventListener("abort", abort);
+ originalCallback.apply(stream, args);
+ });
+ options.signal.addEventListener("abort", abort);
+ }
+ }
+
+ return cleanup;
+}
+
+export default eos;
diff --git a/ext/node/polyfills/internal/streams/lazy_transform.mjs b/ext/node/polyfills/internal/streams/lazy_transform.mjs
new file mode 100644
index 000000000..2bb93bd91
--- /dev/null
+++ b/ext/node/polyfills/internal/streams/lazy_transform.mjs
@@ -0,0 +1,53 @@
+// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license.
+// Copyright Joyent and Node contributors. All rights reserved. MIT license.
+// deno-lint-ignore-file
+
+import { getDefaultEncoding } from "internal:deno_node/polyfills/internal/crypto/util.ts";
+import stream from "internal:deno_node/polyfills/stream.ts";
+
+function LazyTransform(options) {
+ this._options = options;
+}
+Object.setPrototypeOf(LazyTransform.prototype, stream.Transform.prototype);
+Object.setPrototypeOf(LazyTransform, stream.Transform);
+
+function makeGetter(name) {
+ return function () {
+ stream.Transform.call(this, this._options);
+ this._writableState.decodeStrings = false;
+
+ if (!this._options || !this._options.defaultEncoding) {
+ this._writableState.defaultEncoding = getDefaultEncoding();
+ }
+
+ return this[name];
+ };
+}
+
+function makeSetter(name) {
+ return function (val) {
+ Object.defineProperty(this, name, {
+ value: val,
+ enumerable: true,
+ configurable: true,
+ writable: true,
+ });
+ };
+}
+
+Object.defineProperties(LazyTransform.prototype, {
+ _readableState: {
+ get: makeGetter("_readableState"),
+ set: makeSetter("_readableState"),
+ configurable: true,
+ enumerable: true,
+ },
+ _writableState: {
+ get: makeGetter("_writableState"),
+ set: makeSetter("_writableState"),
+ configurable: true,
+ enumerable: true,
+ },
+});
+
+export default LazyTransform;
diff --git a/ext/node/polyfills/internal/streams/legacy.mjs b/ext/node/polyfills/internal/streams/legacy.mjs
new file mode 100644
index 000000000..0de18956f
--- /dev/null
+++ b/ext/node/polyfills/internal/streams/legacy.mjs
@@ -0,0 +1,113 @@
+// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license.
+// Copyright Joyent and Node contributors. All rights reserved. MIT license.
+// deno-lint-ignore-file
+
+import EE from "internal:deno_node/polyfills/events.ts";
+
+function Stream(opts) {
+ EE.call(this, opts);
+}
+Object.setPrototypeOf(Stream.prototype, EE.prototype);
+Object.setPrototypeOf(Stream, EE);
+
+Stream.prototype.pipe = function (dest, options) {
+ // deno-lint-ignore no-this-alias
+ const source = this;
+
+ function ondata(chunk) {
+ if (dest.writable && dest.write(chunk) === false && source.pause) {
+ source.pause();
+ }
+ }
+
+ source.on("data", ondata);
+
+ function ondrain() {
+ if (source.readable && source.resume) {
+ source.resume();
+ }
+ }
+
+ dest.on("drain", ondrain);
+
+ // If the 'end' option is not supplied, dest.end() will be called when
+ // source gets the 'end' or 'close' events. Only dest.end() once.
+ if (!dest._isStdio && (!options || options.end !== false)) {
+ source.on("end", onend);
+ source.on("close", onclose);
+ }
+
+ let didOnEnd = false;
+ function onend() {
+ if (didOnEnd) return;
+ didOnEnd = true;
+
+ dest.end();
+ }
+
+ function onclose() {
+ if (didOnEnd) return;
+ didOnEnd = true;
+
+ if (typeof dest.destroy === "function") dest.destroy();
+ }
+
+ // Don't leave dangling pipes when there are errors.
+ function onerror(er) {
+ cleanup();
+ if (EE.listenerCount(this, "error") === 0) {
+ this.emit("error", er);
+ }
+ }
+
+ prependListener(source, "error", onerror);
+ prependListener(dest, "error", onerror);
+
+ // Remove all the event listeners that were added.
+ function cleanup() {
+ source.removeListener("data", ondata);
+ dest.removeListener("drain", ondrain);
+
+ source.removeListener("end", onend);
+ source.removeListener("close", onclose);
+
+ source.removeListener("error", onerror);
+ dest.removeListener("error", onerror);
+
+ source.removeListener("end", cleanup);
+ source.removeListener("close", cleanup);
+
+ dest.removeListener("close", cleanup);
+ }
+
+ source.on("end", cleanup);
+ source.on("close", cleanup);
+
+ dest.on("close", cleanup);
+ dest.emit("pipe", source);
+
+ // Allow for unix-like usage: A.pipe(B).pipe(C)
+ return dest;
+};
+
+function prependListener(emitter, event, fn) {
+ // Sadly this is not cacheable as some libraries bundle their own
+ // event emitter implementation with them.
+ if (typeof emitter.prependListener === "function") {
+ return emitter.prependListener(event, fn);
+ }
+
+ // This is a hack to make sure that our error handler is attached before any
+ // userland ones. NEVER DO THIS. This is here only because this code needs
+ // to continue to work with older versions of Node.js that do not include
+ // the prependListener() method. The goal is to eventually remove this hack.
+ if (!emitter._events || !emitter._events[event]) {
+ emitter.on(event, fn);
+ } else if (Array.isArray(emitter._events[event])) {
+ emitter._events[event].unshift(fn);
+ } else {
+ emitter._events[event] = [fn, emitter._events[event]];
+ }
+}
+
+export { prependListener, Stream };
diff --git a/ext/node/polyfills/internal/streams/passthrough.mjs b/ext/node/polyfills/internal/streams/passthrough.mjs
new file mode 100644
index 000000000..136a0484a
--- /dev/null
+++ b/ext/node/polyfills/internal/streams/passthrough.mjs
@@ -0,0 +1,7 @@
+// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license.
+// Copyright Joyent and Node contributors. All rights reserved. MIT license.
+// deno-lint-ignore-file
+
+import { PassThrough } from "internal:deno_node/polyfills/_stream.mjs";
+
+export default PassThrough;
diff --git a/ext/node/polyfills/internal/streams/readable.mjs b/ext/node/polyfills/internal/streams/readable.mjs
new file mode 100644
index 000000000..36133d297
--- /dev/null
+++ b/ext/node/polyfills/internal/streams/readable.mjs
@@ -0,0 +1,9 @@
+// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license.
+// Copyright Joyent and Node contributors. All rights reserved. MIT license.
+// deno-lint-ignore-file
+
+import { Readable } from "internal:deno_node/polyfills/_stream.mjs";
+const { ReadableState, _fromList, from, fromWeb, toWeb, wrap } = Readable;
+
+export default Readable;
+export { _fromList, from, fromWeb, ReadableState, toWeb, wrap };
diff --git a/ext/node/polyfills/internal/streams/state.mjs b/ext/node/polyfills/internal/streams/state.mjs
new file mode 100644
index 000000000..93708fe9d
--- /dev/null
+++ b/ext/node/polyfills/internal/streams/state.mjs
@@ -0,0 +1,10 @@
+// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license.
+// Copyright Joyent and Node contributors. All rights reserved. MIT license.
+// deno-lint-ignore-file
+
+function getDefaultHighWaterMark(objectMode) {
+ return objectMode ? 16 : 16 * 1024;
+}
+
+export default { getDefaultHighWaterMark };
+export { getDefaultHighWaterMark };
diff --git a/ext/node/polyfills/internal/streams/transform.mjs b/ext/node/polyfills/internal/streams/transform.mjs
new file mode 100644
index 000000000..3fc4fa5cd
--- /dev/null
+++ b/ext/node/polyfills/internal/streams/transform.mjs
@@ -0,0 +1,7 @@
+// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license.
+// Copyright Joyent and Node contributors. All rights reserved. MIT license.
+// deno-lint-ignore-file
+
+import { Transform } from "internal:deno_node/polyfills/_stream.mjs";
+
+export default Transform;
diff --git a/ext/node/polyfills/internal/streams/utils.mjs b/ext/node/polyfills/internal/streams/utils.mjs
new file mode 100644
index 000000000..a575f831d
--- /dev/null
+++ b/ext/node/polyfills/internal/streams/utils.mjs
@@ -0,0 +1,242 @@
+// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license.
+// Copyright Joyent and Node contributors. All rights reserved. MIT license.
+// deno-lint-ignore-file
+
+const kIsDisturbed = Symbol("kIsDisturbed");
+
+function isReadableNodeStream(obj) {
+ return !!(
+ obj &&
+ typeof obj.pipe === "function" &&
+ typeof obj.on === "function" &&
+ (!obj._writableState || obj._readableState?.readable !== false) && // Duplex
+ (!obj._writableState || obj._readableState) // Writable has .pipe.
+ );
+}
+
+function isWritableNodeStream(obj) {
+ return !!(
+ obj &&
+ typeof obj.write === "function" &&
+ typeof obj.on === "function" &&
+ (!obj._readableState || obj._writableState?.writable !== false) // Duplex
+ );
+}
+
+function isDuplexNodeStream(obj) {
+ return !!(
+ obj &&
+ (typeof obj.pipe === "function" && obj._readableState) &&
+ typeof obj.on === "function" &&
+ typeof obj.write === "function"
+ );
+}
+
+function isNodeStream(obj) {
+ return (
+ obj &&
+ (
+ obj._readableState ||
+ obj._writableState ||
+ (typeof obj.write === "function" && typeof obj.on === "function") ||
+ (typeof obj.pipe === "function" && typeof obj.on === "function")
+ )
+ );
+}
+
+function isDestroyed(stream) {
+ if (!isNodeStream(stream)) return null;
+ const wState = stream._writableState;
+ const rState = stream._readableState;
+ const state = wState || rState;
+ return !!(stream.destroyed || state?.destroyed);
+}
+
+// Have been end():d.
+function isWritableEnded(stream) {
+ if (!isWritableNodeStream(stream)) return null;
+ if (stream.writableEnded === true) return true;
+ const wState = stream._writableState;
+ if (wState?.errored) return false;
+ if (typeof wState?.ended !== "boolean") return null;
+ return wState.ended;
+}
+
+// Have emitted 'finish'.
+function isWritableFinished(stream, strict) {
+ if (!isWritableNodeStream(stream)) return null;
+ if (stream.writableFinished === true) return true;
+ const wState = stream._writableState;
+ if (wState?.errored) return false;
+ if (typeof wState?.finished !== "boolean") return null;
+ return !!(
+ wState.finished ||
+ (strict === false && wState.ended === true && wState.length === 0)
+ );
+}
+
+// Have been push(null):d.
+function isReadableEnded(stream) {
+ if (!isReadableNodeStream(stream)) return null;
+ if (stream.readableEnded === true) return true;
+ const rState = stream._readableState;
+ if (!rState || rState.errored) return false;
+ if (typeof rState?.ended !== "boolean") return null;
+ return rState.ended;
+}
+
+// Have emitted 'end'.
+function isReadableFinished(stream, strict) {
+ if (!isReadableNodeStream(stream)) return null;
+ const rState = stream._readableState;
+ if (rState?.errored) return false;
+ if (typeof rState?.endEmitted !== "boolean") return null;
+ return !!(
+ rState.endEmitted ||
+ (strict === false && rState.ended === true && rState.length === 0)
+ );
+}
+
+function isDisturbed(stream) {
+ return !!(stream && (
+ stream.readableDidRead ||
+ stream.readableAborted ||
+ stream[kIsDisturbed]
+ ));
+}
+
+function isReadable(stream) {
+ const r = isReadableNodeStream(stream);
+ if (r === null || typeof stream?.readable !== "boolean") return null;
+ if (isDestroyed(stream)) return false;
+ return r && stream.readable && !isReadableFinished(stream);
+}
+
+function isWritable(stream) {
+ const r = isWritableNodeStream(stream);
+ if (r === null || typeof stream?.writable !== "boolean") return null;
+ if (isDestroyed(stream)) return false;
+ return r && stream.writable && !isWritableEnded(stream);
+}
+
+function isFinished(stream, opts) {
+ if (!isNodeStream(stream)) {
+ return null;
+ }
+
+ if (isDestroyed(stream)) {
+ return true;
+ }
+
+ if (opts?.readable !== false && isReadable(stream)) {
+ return false;
+ }
+
+ if (opts?.writable !== false && isWritable(stream)) {
+ return false;
+ }
+
+ return true;
+}
+
+function isClosed(stream) {
+ if (!isNodeStream(stream)) {
+ return null;
+ }
+
+ const wState = stream._writableState;
+ const rState = stream._readableState;
+
+ if (
+ typeof wState?.closed === "boolean" ||
+ typeof rState?.closed === "boolean"
+ ) {
+ return wState?.closed || rState?.closed;
+ }
+
+ if (typeof stream._closed === "boolean" && isOutgoingMessage(stream)) {
+ return stream._closed;
+ }
+
+ return null;
+}
+
+function isOutgoingMessage(stream) {
+ return (
+ typeof stream._closed === "boolean" &&
+ typeof stream._defaultKeepAlive === "boolean" &&
+ typeof stream._removedConnection === "boolean" &&
+ typeof stream._removedContLen === "boolean"
+ );
+}
+
+function isServerResponse(stream) {
+ return (
+ typeof stream._sent100 === "boolean" &&
+ isOutgoingMessage(stream)
+ );
+}
+
+function isServerRequest(stream) {
+ return (
+ typeof stream._consuming === "boolean" &&
+ typeof stream._dumped === "boolean" &&
+ stream.req?.upgradeOrConnect === undefined
+ );
+}
+
+function willEmitClose(stream) {
+ if (!isNodeStream(stream)) return null;
+
+ const wState = stream._writableState;
+ const rState = stream._readableState;
+ const state = wState || rState;
+
+ return (!state && isServerResponse(stream)) || !!(
+ state &&
+ state.autoDestroy &&
+ state.emitClose &&
+ state.closed === false
+ );
+}
+
+export default {
+ isDisturbed,
+ kIsDisturbed,
+ isClosed,
+ isDestroyed,
+ isDuplexNodeStream,
+ isFinished,
+ isReadable,
+ isReadableNodeStream,
+ isReadableEnded,
+ isReadableFinished,
+ isNodeStream,
+ isWritable,
+ isWritableNodeStream,
+ isWritableEnded,
+ isWritableFinished,
+ isServerRequest,
+ isServerResponse,
+ willEmitClose,
+};
+export {
+ isClosed,
+ isDestroyed,
+ isDisturbed,
+ isDuplexNodeStream,
+ isFinished,
+ isNodeStream,
+ isReadable,
+ isReadableEnded,
+ isReadableFinished,
+ isReadableNodeStream,
+ isServerRequest,
+ isServerResponse,
+ isWritable,
+ isWritableEnded,
+ isWritableFinished,
+ isWritableNodeStream,
+ kIsDisturbed,
+ willEmitClose,
+};
diff --git a/ext/node/polyfills/internal/streams/writable.mjs b/ext/node/polyfills/internal/streams/writable.mjs
new file mode 100644
index 000000000..6f4d77960
--- /dev/null
+++ b/ext/node/polyfills/internal/streams/writable.mjs
@@ -0,0 +1,9 @@
+// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license.
+// Copyright Joyent and Node contributors. All rights reserved. MIT license.
+// deno-lint-ignore-file
+
+import { Writable } from "internal:deno_node/polyfills/_stream.mjs";
+const { WritableState, fromWeb, toWeb } = Writable;
+
+export default Writable;
+export { fromWeb, toWeb, WritableState };