diff options
Diffstat (limited to 'ext/node/polyfills/internal/streams')
-rw-r--r-- | ext/node/polyfills/internal/streams/add-abort-signal.mjs | 48 | ||||
-rw-r--r-- | ext/node/polyfills/internal/streams/buffer_list.mjs | 188 | ||||
-rw-r--r-- | ext/node/polyfills/internal/streams/destroy.mjs | 320 | ||||
-rw-r--r-- | ext/node/polyfills/internal/streams/duplex.mjs | 9 | ||||
-rw-r--r-- | ext/node/polyfills/internal/streams/end-of-stream.mjs | 229 | ||||
-rw-r--r-- | ext/node/polyfills/internal/streams/lazy_transform.mjs | 53 | ||||
-rw-r--r-- | ext/node/polyfills/internal/streams/legacy.mjs | 113 | ||||
-rw-r--r-- | ext/node/polyfills/internal/streams/passthrough.mjs | 7 | ||||
-rw-r--r-- | ext/node/polyfills/internal/streams/readable.mjs | 9 | ||||
-rw-r--r-- | ext/node/polyfills/internal/streams/state.mjs | 10 | ||||
-rw-r--r-- | ext/node/polyfills/internal/streams/transform.mjs | 7 | ||||
-rw-r--r-- | ext/node/polyfills/internal/streams/utils.mjs | 242 | ||||
-rw-r--r-- | ext/node/polyfills/internal/streams/writable.mjs | 9 |
13 files changed, 1244 insertions, 0 deletions
diff --git a/ext/node/polyfills/internal/streams/add-abort-signal.mjs b/ext/node/polyfills/internal/streams/add-abort-signal.mjs new file mode 100644 index 000000000..5d7512f1c --- /dev/null +++ b/ext/node/polyfills/internal/streams/add-abort-signal.mjs @@ -0,0 +1,48 @@ +// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license. +// Copyright Joyent and Node contributors. All rights reserved. MIT license. +// deno-lint-ignore-file + +import { AbortError, ERR_INVALID_ARG_TYPE } from "internal:deno_node/polyfills/internal/errors.ts"; +import eos from "internal:deno_node/polyfills/internal/streams/end-of-stream.mjs"; + +// This method is inlined here for readable-stream +// It also does not allow for signal to not exist on the stream +// https://github.com/nodejs/node/pull/36061#discussion_r533718029 +const validateAbortSignal = (signal, name) => { + if ( + typeof signal !== "object" || + !("aborted" in signal) + ) { + throw new ERR_INVALID_ARG_TYPE(name, "AbortSignal", signal); + } +}; + +function isStream(obj) { + return !!(obj && typeof obj.pipe === "function"); +} + +function addAbortSignal(signal, stream) { + validateAbortSignal(signal, "signal"); + if (!isStream(stream)) { + throw new ERR_INVALID_ARG_TYPE("stream", "stream.Stream", stream); + } + return addAbortSignalNoValidate(signal, stream); +} +function addAbortSignalNoValidate(signal, stream) { + if (typeof signal !== "object" || !("aborted" in signal)) { + return stream; + } + const onAbort = () => { + stream.destroy(new AbortError()); + }; + if (signal.aborted) { + onAbort(); + } else { + signal.addEventListener("abort", onAbort); + eos(stream, () => signal.removeEventListener("abort", onAbort)); + } + return stream; +} + +export default { addAbortSignal, addAbortSignalNoValidate }; +export { addAbortSignal, addAbortSignalNoValidate }; diff --git a/ext/node/polyfills/internal/streams/buffer_list.mjs b/ext/node/polyfills/internal/streams/buffer_list.mjs new file mode 100644 index 000000000..3016ffba5 --- /dev/null +++ b/ext/node/polyfills/internal/streams/buffer_list.mjs @@ -0,0 +1,188 @@ +// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license. +// Copyright Joyent and Node contributors. All rights reserved. MIT license. +// deno-lint-ignore-file + +import { Buffer } from "internal:deno_node/polyfills/buffer.ts"; +import { inspect } from "internal:deno_node/polyfills/internal/util/inspect.mjs"; + +class BufferList { + constructor() { + this.head = null; + this.tail = null; + this.length = 0; + } + + push(v) { + const entry = { data: v, next: null }; + if (this.length > 0) { + this.tail.next = entry; + } else { + this.head = entry; + } + this.tail = entry; + ++this.length; + } + + unshift(v) { + const entry = { data: v, next: this.head }; + if (this.length === 0) { + this.tail = entry; + } + this.head = entry; + ++this.length; + } + + shift() { + if (this.length === 0) { + return; + } + const ret = this.head.data; + if (this.length === 1) { + this.head = this.tail = null; + } else { + this.head = this.head.next; + } + --this.length; + return ret; + } + + clear() { + this.head = this.tail = null; + this.length = 0; + } + + join(s) { + if (this.length === 0) { + return ""; + } + let p = this.head; + let ret = "" + p.data; + while (p = p.next) { + ret += s + p.data; + } + return ret; + } + + concat(n) { + if (this.length === 0) { + return Buffer.alloc(0); + } + const ret = Buffer.allocUnsafe(n >>> 0); + let p = this.head; + let i = 0; + while (p) { + ret.set(p.data, i); + i += p.data.length; + p = p.next; + } + return ret; + } + + // Consumes a specified amount of bytes or characters from the buffered data. + consume(n, hasStrings) { + const data = this.head.data; + if (n < data.length) { + // `slice` is the same for buffers and strings. + const slice = data.slice(0, n); + this.head.data = data.slice(n); + return slice; + } + if (n === data.length) { + // First chunk is a perfect match. + return this.shift(); + } + // Result spans more than one buffer. + return hasStrings ? this._getString(n) : this._getBuffer(n); + } + + first() { + return this.head.data; + } + + *[Symbol.iterator]() { + for (let p = this.head; p; p = p.next) { + yield p.data; + } + } + + // Consumes a specified amount of characters from the buffered data. + _getString(n) { + let ret = ""; + let p = this.head; + let c = 0; + do { + const str = p.data; + if (n > str.length) { + ret += str; + n -= str.length; + } else { + if (n === str.length) { + ret += str; + ++c; + if (p.next) { + this.head = p.next; + } else { + this.head = this.tail = null; + } + } else { + ret += str.slice(0, n); + this.head = p; + p.data = str.slice(n); + } + break; + } + ++c; + } while (p = p.next); + this.length -= c; + return ret; + } + + // Consumes a specified amount of bytes from the buffered data. + _getBuffer(n) { + const ret = Buffer.allocUnsafe(n); + const retLen = n; + let p = this.head; + let c = 0; + do { + const buf = p.data; + if (n > buf.length) { + ret.set(buf, retLen - n); + n -= buf.length; + } else { + if (n === buf.length) { + ret.set(buf, retLen - n); + ++c; + if (p.next) { + this.head = p.next; + } else { + this.head = this.tail = null; + } + } else { + ret.set( + new Uint8Array(buf.buffer, buf.byteOffset, n), + retLen - n, + ); + this.head = p; + p.data = buf.slice(n); + } + break; + } + ++c; + } while (p = p.next); + this.length -= c; + return ret; + } + + // Make sure the linked list only shows the minimal necessary information. + [inspect.custom](_, options) { + return inspect(this, { + ...options, + // Only inspect one level. + depth: 0, + // It should not recurse. + customInspect: false, + }); + } +} + +export default BufferList; diff --git a/ext/node/polyfills/internal/streams/destroy.mjs b/ext/node/polyfills/internal/streams/destroy.mjs new file mode 100644 index 000000000..b065f2119 --- /dev/null +++ b/ext/node/polyfills/internal/streams/destroy.mjs @@ -0,0 +1,320 @@ +// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license. +// Copyright Joyent and Node contributors. All rights reserved. MIT license. +// deno-lint-ignore-file + +import { aggregateTwoErrors, ERR_MULTIPLE_CALLBACK } from "internal:deno_node/polyfills/internal/errors.ts"; +import * as process from "internal:deno_node/polyfills/_process/process.ts"; + +const kDestroy = Symbol("kDestroy"); +const kConstruct = Symbol("kConstruct"); + +function checkError(err, w, r) { + if (err) { + // Avoid V8 leak, https://github.com/nodejs/node/pull/34103#issuecomment-652002364 + err.stack; // eslint-disable-line no-unused-expressions + + if (w && !w.errored) { + w.errored = err; + } + if (r && !r.errored) { + r.errored = err; + } + } +} + +// Backwards compat. cb() is undocumented and unused in core but +// unfortunately might be used by modules. +function destroy(err, cb) { + const r = this._readableState; + const w = this._writableState; + // With duplex streams we use the writable side for state. + const s = w || r; + + if ((w && w.destroyed) || (r && r.destroyed)) { + if (typeof cb === "function") { + cb(); + } + + return this; + } + + // We set destroyed to true before firing error callbacks in order + // to make it re-entrance safe in case destroy() is called within callbacks + checkError(err, w, r); + + if (w) { + w.destroyed = true; + } + if (r) { + r.destroyed = true; + } + + // If still constructing then defer calling _destroy. + if (!s.constructed) { + this.once(kDestroy, function (er) { + _destroy(this, aggregateTwoErrors(er, err), cb); + }); + } else { + _destroy(this, err, cb); + } + + return this; +} + +function _destroy(self, err, cb) { + let called = false; + + function onDestroy(err) { + if (called) { + return; + } + called = true; + + const r = self._readableState; + const w = self._writableState; + + checkError(err, w, r); + + if (w) { + w.closed = true; + } + if (r) { + r.closed = true; + } + + if (typeof cb === "function") { + cb(err); + } + + if (err) { + process.nextTick(emitErrorCloseNT, self, err); + } else { + process.nextTick(emitCloseNT, self); + } + } + try { + const result = self._destroy(err || null, onDestroy); + if (result != null) { + const then = result.then; + if (typeof then === "function") { + then.call( + result, + function () { + process.nextTick(onDestroy, null); + }, + function (err) { + process.nextTick(onDestroy, err); + }, + ); + } + } + } catch (err) { + onDestroy(err); + } +} + +function emitErrorCloseNT(self, err) { + emitErrorNT(self, err); + emitCloseNT(self); +} + +function emitCloseNT(self) { + const r = self._readableState; + const w = self._writableState; + + if (w) { + w.closeEmitted = true; + } + if (r) { + r.closeEmitted = true; + } + + if ((w && w.emitClose) || (r && r.emitClose)) { + self.emit("close"); + } +} + +function emitErrorNT(self, err) { + const r = self._readableState; + const w = self._writableState; + + if ((w && w.errorEmitted) || (r && r.errorEmitted)) { + return; + } + + if (w) { + w.errorEmitted = true; + } + if (r) { + r.errorEmitted = true; + } + + self.emit("error", err); +} + +function undestroy() { + const r = this._readableState; + const w = this._writableState; + + if (r) { + r.constructed = true; + r.closed = false; + r.closeEmitted = false; + r.destroyed = false; + r.errored = null; + r.errorEmitted = false; + r.reading = false; + r.ended = false; + r.endEmitted = false; + } + + if (w) { + w.constructed = true; + w.destroyed = false; + w.closed = false; + w.closeEmitted = false; + w.errored = null; + w.errorEmitted = false; + w.ended = false; + w.ending = false; + w.finalCalled = false; + w.prefinished = false; + w.finished = false; + } +} + +function errorOrDestroy(stream, err, sync) { + // We have tests that rely on errors being emitted + // in the same tick, so changing this is semver major. + // For now when you opt-in to autoDestroy we allow + // the error to be emitted nextTick. In a future + // semver major update we should change the default to this. + + const r = stream._readableState; + const w = stream._writableState; + + if ((w && w.destroyed) || (r && r.destroyed)) { + return this; + } + + if ((r && r.autoDestroy) || (w && w.autoDestroy)) { + stream.destroy(err); + } else if (err) { + // Avoid V8 leak, https://github.com/nodejs/node/pull/34103#issuecomment-652002364 + err.stack; // eslint-disable-line no-unused-expressions + + if (w && !w.errored) { + w.errored = err; + } + if (r && !r.errored) { + r.errored = err; + } + if (sync) { + process.nextTick(emitErrorNT, stream, err); + } else { + emitErrorNT(stream, err); + } + } +} + +function construct(stream, cb) { + if (typeof stream._construct !== "function") { + return; + } + + const r = stream._readableState; + const w = stream._writableState; + + if (r) { + r.constructed = false; + } + if (w) { + w.constructed = false; + } + + stream.once(kConstruct, cb); + + if (stream.listenerCount(kConstruct) > 1) { + // Duplex + return; + } + + process.nextTick(constructNT, stream); +} + +function constructNT(stream) { + let called = false; + + function onConstruct(err) { + if (called) { + errorOrDestroy(stream, err ?? new ERR_MULTIPLE_CALLBACK()); + return; + } + called = true; + + const r = stream._readableState; + const w = stream._writableState; + const s = w || r; + + if (r) { + r.constructed = true; + } + if (w) { + w.constructed = true; + } + + if (s.destroyed) { + stream.emit(kDestroy, err); + } else if (err) { + errorOrDestroy(stream, err, true); + } else { + process.nextTick(emitConstructNT, stream); + } + } + + try { + const result = stream._construct(onConstruct); + if (result != null) { + const then = result.then; + if (typeof then === "function") { + then.call( + result, + function () { + process.nextTick(onConstruct, null); + }, + function (err) { + process.nextTick(onConstruct, err); + }, + ); + } + } + } catch (err) { + onConstruct(err); + } +} + +function emitConstructNT(stream) { + stream.emit(kConstruct); +} + +function isRequest(stream) { + return stream && stream.setHeader && typeof stream.abort === "function"; +} + +// Normalize destroy for legacy. +function destroyer(stream, err) { + if (!stream) return; + if (isRequest(stream)) return stream.abort(); + if (isRequest(stream.req)) return stream.req.abort(); + if (typeof stream.destroy === "function") return stream.destroy(err); + if (typeof stream.close === "function") return stream.close(); +} + +export default { + construct, + destroyer, + destroy, + undestroy, + errorOrDestroy, +}; +export { construct, destroy, destroyer, errorOrDestroy, undestroy }; diff --git a/ext/node/polyfills/internal/streams/duplex.mjs b/ext/node/polyfills/internal/streams/duplex.mjs new file mode 100644 index 000000000..b2086d467 --- /dev/null +++ b/ext/node/polyfills/internal/streams/duplex.mjs @@ -0,0 +1,9 @@ +// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license. +// Copyright Joyent and Node contributors. All rights reserved. MIT license. +// deno-lint-ignore-file + +import { Duplex } from "internal:deno_node/polyfills/_stream.mjs"; +const { from, fromWeb, toWeb } = Duplex; + +export default Duplex; +export { from, fromWeb, toWeb }; diff --git a/ext/node/polyfills/internal/streams/end-of-stream.mjs b/ext/node/polyfills/internal/streams/end-of-stream.mjs new file mode 100644 index 000000000..b5c380d56 --- /dev/null +++ b/ext/node/polyfills/internal/streams/end-of-stream.mjs @@ -0,0 +1,229 @@ +// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license. +// Copyright Joyent and Node contributors. All rights reserved. MIT license. +// deno-lint-ignore-file + +import { AbortError, ERR_STREAM_PREMATURE_CLOSE } from "internal:deno_node/polyfills/internal/errors.ts"; +import { once } from "internal:deno_node/polyfills/internal/util.mjs"; +import { + validateAbortSignal, + validateFunction, + validateObject, +} from "internal:deno_node/polyfills/internal/validators.mjs"; +import * as process from "internal:deno_node/polyfills/_process/process.ts"; + +function isRequest(stream) { + return stream.setHeader && typeof stream.abort === "function"; +} + +function isServerResponse(stream) { + return ( + typeof stream._sent100 === "boolean" && + typeof stream._removedConnection === "boolean" && + typeof stream._removedContLen === "boolean" && + typeof stream._removedTE === "boolean" && + typeof stream._closed === "boolean" + ); +} + +function isReadable(stream) { + return typeof stream.readable === "boolean" || + typeof stream.readableEnded === "boolean" || + !!stream._readableState; +} + +function isWritable(stream) { + return typeof stream.writable === "boolean" || + typeof stream.writableEnded === "boolean" || + !!stream._writableState; +} + +function isWritableFinished(stream) { + if (stream.writableFinished) return true; + const wState = stream._writableState; + if (!wState || wState.errored) return false; + return wState.finished || (wState.ended && wState.length === 0); +} + +const nop = () => {}; + +function isReadableEnded(stream) { + if (stream.readableEnded) return true; + const rState = stream._readableState; + if (!rState || rState.errored) return false; + return rState.endEmitted || (rState.ended && rState.length === 0); +} + +function eos(stream, options, callback) { + if (arguments.length === 2) { + callback = options; + options = {}; + } else if (options == null) { + options = {}; + } else { + validateObject(options, "options"); + } + validateFunction(callback, "callback"); + validateAbortSignal(options.signal, "options.signal"); + + callback = once(callback); + + const readable = options.readable || + (options.readable !== false && isReadable(stream)); + const writable = options.writable || + (options.writable !== false && isWritable(stream)); + + const wState = stream._writableState; + const rState = stream._readableState; + const state = wState || rState; + + const onlegacyfinish = () => { + if (!stream.writable) onfinish(); + }; + + // TODO (ronag): Improve soft detection to include core modules and + // common ecosystem modules that do properly emit 'close' but fail + // this generic check. + let willEmitClose = isServerResponse(stream) || ( + state && + state.autoDestroy && + state.emitClose && + state.closed === false && + isReadable(stream) === readable && + isWritable(stream) === writable + ); + + let writableFinished = stream.writableFinished || + (wState && wState.finished); + const onfinish = () => { + writableFinished = true; + // Stream should not be destroyed here. If it is that + // means that user space is doing something differently and + // we cannot trust willEmitClose. + if (stream.destroyed) willEmitClose = false; + + if (willEmitClose && (!stream.readable || readable)) return; + if (!readable || readableEnded) callback.call(stream); + }; + + let readableEnded = stream.readableEnded || + (rState && rState.endEmitted); + const onend = () => { + readableEnded = true; + // Stream should not be destroyed here. If it is that + // means that user space is doing something differently and + // we cannot trust willEmitClose. + if (stream.destroyed) willEmitClose = false; + + if (willEmitClose && (!stream.writable || writable)) return; + if (!writable || writableFinished) callback.call(stream); + }; + + const onerror = (err) => { + callback.call(stream, err); + }; + + const onclose = () => { + if (readable && !readableEnded) { + if (!isReadableEnded(stream)) { + return callback.call(stream, new ERR_STREAM_PREMATURE_CLOSE()); + } + } + if (writable && !writableFinished) { + if (!isWritableFinished(stream)) { + return callback.call(stream, new ERR_STREAM_PREMATURE_CLOSE()); + } + } + callback.call(stream); + }; + + const onrequest = () => { + stream.req.on("finish", onfinish); + }; + + if (isRequest(stream)) { + stream.on("complete", onfinish); + if (!willEmitClose) { + stream.on("abort", onclose); + } + if (stream.req) onrequest(); + else stream.on("request", onrequest); + } else if (writable && !wState) { // legacy streams + stream.on("end", onlegacyfinish); + stream.on("close", onlegacyfinish); + } + + // Not all streams will emit 'close' after 'aborted'. + if (!willEmitClose && typeof stream.aborted === "boolean") { + stream.on("aborted", onclose); + } + + stream.on("end", onend); + stream.on("finish", onfinish); + if (options.error !== false) stream.on("error", onerror); + stream.on("close", onclose); + + // _closed is for OutgoingMessage which is not a proper Writable. + const closed = (!wState && !rState && stream._closed === true) || ( + (wState && wState.closed) || + (rState && rState.closed) || + (wState && wState.errorEmitted) || + (rState && rState.errorEmitted) || + (rState && stream.req && stream.aborted) || + ( + (!wState || !willEmitClose || typeof wState.closed !== "boolean") && + (!rState || !willEmitClose || typeof rState.closed !== "boolean") && + (!writable || (wState && wState.finished)) && + (!readable || (rState && rState.endEmitted)) + ) + ); + + if (closed) { + // TODO(ronag): Re-throw error if errorEmitted? + // TODO(ronag): Throw premature close as if finished was called? + // before being closed? i.e. if closed but not errored, ended or finished. + // TODO(ronag): Throw some kind of error? Does it make sense + // to call finished() on a "finished" stream? + // TODO(ronag): willEmitClose? + process.nextTick(() => { + callback(); + }); + } + + const cleanup = () => { + callback = nop; + stream.removeListener("aborted", onclose); + stream.removeListener("complete", onfinish); + stream.removeListener("abort", onclose); + stream.removeListener("request", onrequest); + if (stream.req) stream.req.removeListener("finish", onfinish); + stream.removeListener("end", onlegacyfinish); + stream.removeListener("close", onlegacyfinish); + stream.removeListener("finish", onfinish); + stream.removeListener("end", onend); + stream.removeListener("error", onerror); + stream.removeListener("close", onclose); + }; + + if (options.signal && !closed) { + const abort = () => { + // Keep it because cleanup removes it. + const endCallback = callback; + cleanup(); + endCallback.call(stream, new AbortError()); + }; + if (options.signal.aborted) { + process.nextTick(abort); + } else { + const originalCallback = callback; + callback = once((...args) => { + options.signal.removeEventListener("abort", abort); + originalCallback.apply(stream, args); + }); + options.signal.addEventListener("abort", abort); + } + } + + return cleanup; +} + +export default eos; diff --git a/ext/node/polyfills/internal/streams/lazy_transform.mjs b/ext/node/polyfills/internal/streams/lazy_transform.mjs new file mode 100644 index 000000000..2bb93bd91 --- /dev/null +++ b/ext/node/polyfills/internal/streams/lazy_transform.mjs @@ -0,0 +1,53 @@ +// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license. +// Copyright Joyent and Node contributors. All rights reserved. MIT license. +// deno-lint-ignore-file + +import { getDefaultEncoding } from "internal:deno_node/polyfills/internal/crypto/util.ts"; +import stream from "internal:deno_node/polyfills/stream.ts"; + +function LazyTransform(options) { + this._options = options; +} +Object.setPrototypeOf(LazyTransform.prototype, stream.Transform.prototype); +Object.setPrototypeOf(LazyTransform, stream.Transform); + +function makeGetter(name) { + return function () { + stream.Transform.call(this, this._options); + this._writableState.decodeStrings = false; + + if (!this._options || !this._options.defaultEncoding) { + this._writableState.defaultEncoding = getDefaultEncoding(); + } + + return this[name]; + }; +} + +function makeSetter(name) { + return function (val) { + Object.defineProperty(this, name, { + value: val, + enumerable: true, + configurable: true, + writable: true, + }); + }; +} + +Object.defineProperties(LazyTransform.prototype, { + _readableState: { + get: makeGetter("_readableState"), + set: makeSetter("_readableState"), + configurable: true, + enumerable: true, + }, + _writableState: { + get: makeGetter("_writableState"), + set: makeSetter("_writableState"), + configurable: true, + enumerable: true, + }, +}); + +export default LazyTransform; diff --git a/ext/node/polyfills/internal/streams/legacy.mjs b/ext/node/polyfills/internal/streams/legacy.mjs new file mode 100644 index 000000000..0de18956f --- /dev/null +++ b/ext/node/polyfills/internal/streams/legacy.mjs @@ -0,0 +1,113 @@ +// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license. +// Copyright Joyent and Node contributors. All rights reserved. MIT license. +// deno-lint-ignore-file + +import EE from "internal:deno_node/polyfills/events.ts"; + +function Stream(opts) { + EE.call(this, opts); +} +Object.setPrototypeOf(Stream.prototype, EE.prototype); +Object.setPrototypeOf(Stream, EE); + +Stream.prototype.pipe = function (dest, options) { + // deno-lint-ignore no-this-alias + const source = this; + + function ondata(chunk) { + if (dest.writable && dest.write(chunk) === false && source.pause) { + source.pause(); + } + } + + source.on("data", ondata); + + function ondrain() { + if (source.readable && source.resume) { + source.resume(); + } + } + + dest.on("drain", ondrain); + + // If the 'end' option is not supplied, dest.end() will be called when + // source gets the 'end' or 'close' events. Only dest.end() once. + if (!dest._isStdio && (!options || options.end !== false)) { + source.on("end", onend); + source.on("close", onclose); + } + + let didOnEnd = false; + function onend() { + if (didOnEnd) return; + didOnEnd = true; + + dest.end(); + } + + function onclose() { + if (didOnEnd) return; + didOnEnd = true; + + if (typeof dest.destroy === "function") dest.destroy(); + } + + // Don't leave dangling pipes when there are errors. + function onerror(er) { + cleanup(); + if (EE.listenerCount(this, "error") === 0) { + this.emit("error", er); + } + } + + prependListener(source, "error", onerror); + prependListener(dest, "error", onerror); + + // Remove all the event listeners that were added. + function cleanup() { + source.removeListener("data", ondata); + dest.removeListener("drain", ondrain); + + source.removeListener("end", onend); + source.removeListener("close", onclose); + + source.removeListener("error", onerror); + dest.removeListener("error", onerror); + + source.removeListener("end", cleanup); + source.removeListener("close", cleanup); + + dest.removeListener("close", cleanup); + } + + source.on("end", cleanup); + source.on("close", cleanup); + + dest.on("close", cleanup); + dest.emit("pipe", source); + + // Allow for unix-like usage: A.pipe(B).pipe(C) + return dest; +}; + +function prependListener(emitter, event, fn) { + // Sadly this is not cacheable as some libraries bundle their own + // event emitter implementation with them. + if (typeof emitter.prependListener === "function") { + return emitter.prependListener(event, fn); + } + + // This is a hack to make sure that our error handler is attached before any + // userland ones. NEVER DO THIS. This is here only because this code needs + // to continue to work with older versions of Node.js that do not include + // the prependListener() method. The goal is to eventually remove this hack. + if (!emitter._events || !emitter._events[event]) { + emitter.on(event, fn); + } else if (Array.isArray(emitter._events[event])) { + emitter._events[event].unshift(fn); + } else { + emitter._events[event] = [fn, emitter._events[event]]; + } +} + +export { prependListener, Stream }; diff --git a/ext/node/polyfills/internal/streams/passthrough.mjs b/ext/node/polyfills/internal/streams/passthrough.mjs new file mode 100644 index 000000000..136a0484a --- /dev/null +++ b/ext/node/polyfills/internal/streams/passthrough.mjs @@ -0,0 +1,7 @@ +// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license. +// Copyright Joyent and Node contributors. All rights reserved. MIT license. +// deno-lint-ignore-file + +import { PassThrough } from "internal:deno_node/polyfills/_stream.mjs"; + +export default PassThrough; diff --git a/ext/node/polyfills/internal/streams/readable.mjs b/ext/node/polyfills/internal/streams/readable.mjs new file mode 100644 index 000000000..36133d297 --- /dev/null +++ b/ext/node/polyfills/internal/streams/readable.mjs @@ -0,0 +1,9 @@ +// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license. +// Copyright Joyent and Node contributors. All rights reserved. MIT license. +// deno-lint-ignore-file + +import { Readable } from "internal:deno_node/polyfills/_stream.mjs"; +const { ReadableState, _fromList, from, fromWeb, toWeb, wrap } = Readable; + +export default Readable; +export { _fromList, from, fromWeb, ReadableState, toWeb, wrap }; diff --git a/ext/node/polyfills/internal/streams/state.mjs b/ext/node/polyfills/internal/streams/state.mjs new file mode 100644 index 000000000..93708fe9d --- /dev/null +++ b/ext/node/polyfills/internal/streams/state.mjs @@ -0,0 +1,10 @@ +// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license. +// Copyright Joyent and Node contributors. All rights reserved. MIT license. +// deno-lint-ignore-file + +function getDefaultHighWaterMark(objectMode) { + return objectMode ? 16 : 16 * 1024; +} + +export default { getDefaultHighWaterMark }; +export { getDefaultHighWaterMark }; diff --git a/ext/node/polyfills/internal/streams/transform.mjs b/ext/node/polyfills/internal/streams/transform.mjs new file mode 100644 index 000000000..3fc4fa5cd --- /dev/null +++ b/ext/node/polyfills/internal/streams/transform.mjs @@ -0,0 +1,7 @@ +// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license. +// Copyright Joyent and Node contributors. All rights reserved. MIT license. +// deno-lint-ignore-file + +import { Transform } from "internal:deno_node/polyfills/_stream.mjs"; + +export default Transform; diff --git a/ext/node/polyfills/internal/streams/utils.mjs b/ext/node/polyfills/internal/streams/utils.mjs new file mode 100644 index 000000000..a575f831d --- /dev/null +++ b/ext/node/polyfills/internal/streams/utils.mjs @@ -0,0 +1,242 @@ +// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license. +// Copyright Joyent and Node contributors. All rights reserved. MIT license. +// deno-lint-ignore-file + +const kIsDisturbed = Symbol("kIsDisturbed"); + +function isReadableNodeStream(obj) { + return !!( + obj && + typeof obj.pipe === "function" && + typeof obj.on === "function" && + (!obj._writableState || obj._readableState?.readable !== false) && // Duplex + (!obj._writableState || obj._readableState) // Writable has .pipe. + ); +} + +function isWritableNodeStream(obj) { + return !!( + obj && + typeof obj.write === "function" && + typeof obj.on === "function" && + (!obj._readableState || obj._writableState?.writable !== false) // Duplex + ); +} + +function isDuplexNodeStream(obj) { + return !!( + obj && + (typeof obj.pipe === "function" && obj._readableState) && + typeof obj.on === "function" && + typeof obj.write === "function" + ); +} + +function isNodeStream(obj) { + return ( + obj && + ( + obj._readableState || + obj._writableState || + (typeof obj.write === "function" && typeof obj.on === "function") || + (typeof obj.pipe === "function" && typeof obj.on === "function") + ) + ); +} + +function isDestroyed(stream) { + if (!isNodeStream(stream)) return null; + const wState = stream._writableState; + const rState = stream._readableState; + const state = wState || rState; + return !!(stream.destroyed || state?.destroyed); +} + +// Have been end():d. +function isWritableEnded(stream) { + if (!isWritableNodeStream(stream)) return null; + if (stream.writableEnded === true) return true; + const wState = stream._writableState; + if (wState?.errored) return false; + if (typeof wState?.ended !== "boolean") return null; + return wState.ended; +} + +// Have emitted 'finish'. +function isWritableFinished(stream, strict) { + if (!isWritableNodeStream(stream)) return null; + if (stream.writableFinished === true) return true; + const wState = stream._writableState; + if (wState?.errored) return false; + if (typeof wState?.finished !== "boolean") return null; + return !!( + wState.finished || + (strict === false && wState.ended === true && wState.length === 0) + ); +} + +// Have been push(null):d. +function isReadableEnded(stream) { + if (!isReadableNodeStream(stream)) return null; + if (stream.readableEnded === true) return true; + const rState = stream._readableState; + if (!rState || rState.errored) return false; + if (typeof rState?.ended !== "boolean") return null; + return rState.ended; +} + +// Have emitted 'end'. +function isReadableFinished(stream, strict) { + if (!isReadableNodeStream(stream)) return null; + const rState = stream._readableState; + if (rState?.errored) return false; + if (typeof rState?.endEmitted !== "boolean") return null; + return !!( + rState.endEmitted || + (strict === false && rState.ended === true && rState.length === 0) + ); +} + +function isDisturbed(stream) { + return !!(stream && ( + stream.readableDidRead || + stream.readableAborted || + stream[kIsDisturbed] + )); +} + +function isReadable(stream) { + const r = isReadableNodeStream(stream); + if (r === null || typeof stream?.readable !== "boolean") return null; + if (isDestroyed(stream)) return false; + return r && stream.readable && !isReadableFinished(stream); +} + +function isWritable(stream) { + const r = isWritableNodeStream(stream); + if (r === null || typeof stream?.writable !== "boolean") return null; + if (isDestroyed(stream)) return false; + return r && stream.writable && !isWritableEnded(stream); +} + +function isFinished(stream, opts) { + if (!isNodeStream(stream)) { + return null; + } + + if (isDestroyed(stream)) { + return true; + } + + if (opts?.readable !== false && isReadable(stream)) { + return false; + } + + if (opts?.writable !== false && isWritable(stream)) { + return false; + } + + return true; +} + +function isClosed(stream) { + if (!isNodeStream(stream)) { + return null; + } + + const wState = stream._writableState; + const rState = stream._readableState; + + if ( + typeof wState?.closed === "boolean" || + typeof rState?.closed === "boolean" + ) { + return wState?.closed || rState?.closed; + } + + if (typeof stream._closed === "boolean" && isOutgoingMessage(stream)) { + return stream._closed; + } + + return null; +} + +function isOutgoingMessage(stream) { + return ( + typeof stream._closed === "boolean" && + typeof stream._defaultKeepAlive === "boolean" && + typeof stream._removedConnection === "boolean" && + typeof stream._removedContLen === "boolean" + ); +} + +function isServerResponse(stream) { + return ( + typeof stream._sent100 === "boolean" && + isOutgoingMessage(stream) + ); +} + +function isServerRequest(stream) { + return ( + typeof stream._consuming === "boolean" && + typeof stream._dumped === "boolean" && + stream.req?.upgradeOrConnect === undefined + ); +} + +function willEmitClose(stream) { + if (!isNodeStream(stream)) return null; + + const wState = stream._writableState; + const rState = stream._readableState; + const state = wState || rState; + + return (!state && isServerResponse(stream)) || !!( + state && + state.autoDestroy && + state.emitClose && + state.closed === false + ); +} + +export default { + isDisturbed, + kIsDisturbed, + isClosed, + isDestroyed, + isDuplexNodeStream, + isFinished, + isReadable, + isReadableNodeStream, + isReadableEnded, + isReadableFinished, + isNodeStream, + isWritable, + isWritableNodeStream, + isWritableEnded, + isWritableFinished, + isServerRequest, + isServerResponse, + willEmitClose, +}; +export { + isClosed, + isDestroyed, + isDisturbed, + isDuplexNodeStream, + isFinished, + isNodeStream, + isReadable, + isReadableEnded, + isReadableFinished, + isReadableNodeStream, + isServerRequest, + isServerResponse, + isWritable, + isWritableEnded, + isWritableFinished, + isWritableNodeStream, + kIsDisturbed, + willEmitClose, +}; diff --git a/ext/node/polyfills/internal/streams/writable.mjs b/ext/node/polyfills/internal/streams/writable.mjs new file mode 100644 index 000000000..6f4d77960 --- /dev/null +++ b/ext/node/polyfills/internal/streams/writable.mjs @@ -0,0 +1,9 @@ +// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license. +// Copyright Joyent and Node contributors. All rights reserved. MIT license. +// deno-lint-ignore-file + +import { Writable } from "internal:deno_node/polyfills/_stream.mjs"; +const { WritableState, fromWeb, toWeb } = Writable; + +export default Writable; +export { fromWeb, toWeb, WritableState }; |