diff options
-rw-r--r-- | std/node/_errors.ts | 221 | ||||
-rw-r--r-- | std/node/_stream/async_iterator.ts | 260 | ||||
-rw-r--r-- | std/node/_stream/async_iterator_test.ts | 249 | ||||
-rw-r--r-- | std/node/_stream/buffer_list.ts | 183 | ||||
-rw-r--r-- | std/node/_stream/duplex.ts | 3 | ||||
-rw-r--r-- | std/node/_stream/end-of-stream.ts | 240 | ||||
-rw-r--r-- | std/node/_stream/from.ts | 102 | ||||
-rw-r--r-- | std/node/_stream/readable.ts | 1248 | ||||
-rw-r--r-- | std/node/_stream/readable_test.ts | 489 | ||||
-rw-r--r-- | std/node/_stream/stream.ts | 79 | ||||
-rw-r--r-- | std/node/_stream/symbols.ts | 4 | ||||
-rw-r--r-- | std/node/_stream/writable.ts | 952 | ||||
-rw-r--r-- | std/node/_stream/writable_test.ts | 209 | ||||
-rw-r--r-- | std/node/_util.ts | 6 | ||||
-rw-r--r-- | std/node/_utils.ts | 12 | ||||
-rw-r--r-- | std/node/assertion_error.ts | 3 |
16 files changed, 4212 insertions, 48 deletions
diff --git a/std/node/_errors.ts b/std/node/_errors.ts index 864f1252b..1fcd4a2fa 100644 --- a/std/node/_errors.ts +++ b/std/node/_errors.ts @@ -1,52 +1,99 @@ -// Copyright 2018-2020 the Deno authors. All rights reserved. MIT license. +// Copyright Node.js contributors. All rights reserved. MIT License. +/************ NOT IMPLEMENTED +* ERR_INVALID_ARG_VALUE +* ERR_INVALID_MODULE_SPECIFIER +* ERR_INVALID_PACKAGE_TARGET +* ERR_INVALID_URL_SCHEME +* ERR_MANIFEST_ASSERT_INTEGRITY +* ERR_MISSING_ARGS +* ERR_MODULE_NOT_FOUND +* ERR_PACKAGE_PATH_NOT_EXPORTED +* ERR_QUICSESSION_VERSION_NEGOTIATION +* ERR_REQUIRE_ESM +* ERR_SOCKET_BAD_PORT +* ERR_TLS_CERT_ALTNAME_INVALID +* ERR_UNHANDLED_ERROR +* ERR_WORKER_INVALID_EXEC_ARGV +* ERR_WORKER_PATH +* ERR_QUIC_ERROR +* ERR_SOCKET_BUFFER_SIZE //System error, shouldn't ever happen inside Deno +* ERR_SYSTEM_ERROR //System error, shouldn't ever happen inside Deno +* ERR_TTY_INIT_FAILED //System error, shouldn't ever happen inside Deno +* ERR_INVALID_PACKAGE_CONFIG // package.json stuff, probably useless +*************/ -// Adapted from Node.js. Copyright Joyent, Inc. and other Node contributors. +import { unreachable } from "../testing/asserts.ts"; -// Permission is hereby granted, free of charge, to any person obtaining a -// copy of this software and associated documentation files (the -// "Software"), to deal in the Software without restriction, including -// without limitation the rights to use, copy, modify, merge, publish, -// distribute, sublicense, and/or sell copies of the Software, and to permit -// persons to whom the Software is furnished to do so, subject to the -// following conditions: +/** + * All error instances in Node have additional methods and properties + * This export class is meant to be extended by these instances abstracting native JS error instances + */ +export class NodeErrorAbstraction extends Error { + code: string; -// The above copyright notice and this permission notice shall be included -// in all copies or substantial portions of the Software. + constructor(name: string, code: string, message: string) { + super(message); + this.code = code; + this.name = name; + //This number changes dependending on the name of this class + //20 characters as of now + this.stack = this.stack && `${name} [${this.code}]${this.stack.slice(20)}`; + } -// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS -// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF -// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN -// NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, -// DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR -// OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE -// USE OR OTHER DEALINGS IN THE SOFTWARE. + toString() { + return `${this.name} [${this.code}]: ${this.message}`; + } +} -import { unreachable } from "../testing/asserts.ts"; +export class NodeError extends NodeErrorAbstraction { + constructor(code: string, message: string) { + super(Error.prototype.name, code, message); + } +} -// It will do so until we'll have Node errors completely ported (#5944): +export class NodeSyntaxError extends NodeErrorAbstraction + implements SyntaxError { + constructor(code: string, message: string) { + super(SyntaxError.prototype.name, code, message); + Object.setPrototypeOf(this, SyntaxError.prototype); + } +} -// Ref: https://github.com/nodejs/node/blob/50d28d4b3a616b04537feff014aa70437f064e30/lib/internal/errors.js#L251 -// Ref: https://github.com/nodejs/node/blob/50d28d4b3a616b04537feff014aa70437f064e30/lib/internal/errors.js#L299 -// Ref: https://github.com/nodejs/node/blob/50d28d4b3a616b04537feff014aa70437f064e30/lib/internal/errors.js#L325 -// Ref: https://github.com/nodejs/node/blob/50d28d4b3a616b04537feff014aa70437f064e30/lib/internal/errors.js#L943 -class ERR_INVALID_ARG_TYPE extends TypeError { - code = "ERR_INVALID_ARG_TYPE"; +export class NodeRangeError extends NodeErrorAbstraction { + constructor(code: string, message: string) { + super(RangeError.prototype.name, code, message); + Object.setPrototypeOf(this, RangeError.prototype); + } +} + +export class NodeTypeError extends NodeErrorAbstraction implements TypeError { + constructor(code: string, message: string) { + super(TypeError.prototype.name, code, message); + Object.setPrototypeOf(this, TypeError.prototype); + } +} - constructor(a1: string, a2: string, a3: unknown) { +export class NodeURIError extends NodeErrorAbstraction implements URIError { + constructor(code: string, message: string) { + super(URIError.prototype.name, code, message); + Object.setPrototypeOf(this, URIError.prototype); + } +} + +export class ERR_INVALID_ARG_TYPE extends NodeTypeError { + constructor(a1: string, a2: string | string[], a3: unknown) { super( - `The "${a1}" argument must be of type ${a2.toLocaleLowerCase()}. Received ${typeof a3} (${a3})`, + "ERR_INVALID_ARG_TYPE", + `The "${a1}" argument must be of type ${ + typeof a2 === "string" + ? a2.toLocaleLowerCase() + : a2.map((x) => x.toLocaleLowerCase()).join(", ") + }. Received ${typeof a3} (${a3})`, ); - const { name } = this; - // Add the error code to the name to include it in the stack trace. - this.name = `${name} [${this.code}]`; - // Access the stack to generate the error message including the error code from the name. - this.stack; - // Reset the name to the actual name. - this.name = name; } } -class ERR_OUT_OF_RANGE extends RangeError { +export class ERR_OUT_OF_RANGE extends RangeError { code = "ERR_OUT_OF_RANGE"; constructor(str: string, range: string, received: unknown) { @@ -64,11 +111,6 @@ class ERR_OUT_OF_RANGE extends RangeError { } } -export const codes = { - ERR_INVALID_ARG_TYPE, - ERR_OUT_OF_RANGE, -}; - // In Node these values are coming from libuv: // Ref: https://github.com/libuv/libuv/blob/v1.x/include/uv/errno.h // Ref: https://github.com/nodejs/node/blob/524123fbf064ff64bb6fcd83485cfc27db932f68/lib/internal/errors.js#L383 @@ -342,3 +384,100 @@ export const errorMap = new Map<number, [string, string]>( ? linux : unreachable(), ); +export class ERR_METHOD_NOT_IMPLEMENTED extends NodeError { + constructor(x: string) { + super( + "ERR_METHOD_NOT_IMPLEMENTED", + `The ${x} method is not implemented`, + ); + } +} +export class ERR_MULTIPLE_CALLBACK extends NodeError { + constructor() { + super( + "ERR_MULTIPLE_CALLBACK", + `Callback called multiple times`, + ); + } +} +export class ERR_STREAM_ALREADY_FINISHED extends NodeError { + constructor(x: string) { + super( + "ERR_STREAM_ALREADY_FINISHED", + `Cannot call ${x} after a stream was finished`, + ); + } +} +export class ERR_STREAM_CANNOT_PIPE extends NodeError { + constructor() { + super( + "ERR_STREAM_CANNOT_PIPE", + `Cannot pipe, not readable`, + ); + } +} +export class ERR_STREAM_DESTROYED extends NodeError { + constructor(x: string) { + super( + "ERR_STREAM_DESTROYED", + `Cannot call ${x} after a stream was destroyed`, + ); + } +} +export class ERR_STREAM_NULL_VALUES extends NodeTypeError { + constructor() { + super( + "ERR_STREAM_NULL_VALUES", + `May not write null values to stream`, + ); + } +} +export class ERR_STREAM_PREMATURE_CLOSE extends NodeError { + constructor() { + super( + "ERR_STREAM_PREMATURE_CLOSE", + `Premature close`, + ); + } +} +export class ERR_STREAM_PUSH_AFTER_EOF extends NodeError { + constructor() { + super( + "ERR_STREAM_PUSH_AFTER_EOF", + `stream.push() after EOF`, + ); + } +} +export class ERR_STREAM_UNSHIFT_AFTER_END_EVENT extends NodeError { + constructor() { + super( + "ERR_STREAM_UNSHIFT_AFTER_END_EVENT", + `stream.unshift() after end event`, + ); + } +} +export class ERR_STREAM_WRITE_AFTER_END extends NodeError { + constructor() { + super( + "ERR_STREAM_WRITE_AFTER_END", + `write after end`, + ); + } +} +export class ERR_UNKNOWN_ENCODING extends NodeTypeError { + constructor(x: string) { + super( + "ERR_UNKNOWN_ENCODING", + `Unknown encoding: ${x}`, + ); + } +} + +export class ERR_INVALID_OPT_VALUE extends NodeTypeError { + constructor(name: string, value: unknown) { + super( + "ERR_INVALID_OPT_VALUE", + `The value "${value}" is invalid for option "${name}"`, + ); + } +} diff --git a/std/node/_stream/async_iterator.ts b/std/node/_stream/async_iterator.ts new file mode 100644 index 000000000..cd1b6db3c --- /dev/null +++ b/std/node/_stream/async_iterator.ts @@ -0,0 +1,260 @@ +// Copyright Node.js contributors. All rights reserved. MIT License. +import type { Buffer } from "../buffer.ts"; +import finished from "./end-of-stream.ts"; +import Readable from "./readable.ts"; +import type Stream from "./stream.ts"; + +const kLastResolve = Symbol("lastResolve"); +const kLastReject = Symbol("lastReject"); +const kError = Symbol("error"); +const kEnded = Symbol("ended"); +const kLastPromise = Symbol("lastPromise"); +const kHandlePromise = Symbol("handlePromise"); +const kStream = Symbol("stream"); + +// TODO(Soremwar) +// Add Duplex streams +type IterableStreams = Stream | Readable; + +type IterableItem = Buffer | string | Uint8Array | undefined; +type ReadableIteratorResult = IteratorResult<IterableItem>; + +function initIteratorSymbols( + o: ReadableStreamAsyncIterator, + symbols: symbol[], +) { + const properties: PropertyDescriptorMap = {}; + for (const sym in symbols) { + properties[sym] = { + configurable: false, + enumerable: false, + writable: true, + }; + } + Object.defineProperties(o, properties); +} + +// TODO(Soremwar) +// Bring back once requests are implemented +// function isRequest(stream: any) { +// return stream && stream.setHeader && typeof stream.abort === "function"; +// } + +//TODO(Soremwar) +//Should be any implementation of stream +// deno-lint-ignore no-explicit-any +function destroyer(stream: any, err?: Error | null) { + // TODO(Soremwar) + // Bring back once requests are implemented + // if (isRequest(stream)) return stream.abort(); + // if (isRequest(stream.req)) return stream.req.abort(); + if (typeof stream.destroy === "function") return stream.destroy(err); + if (typeof stream.close === "function") return stream.close(); +} + +function createIterResult( + value: IterableItem, + done: boolean, +): ReadableIteratorResult { + return { value, done }; +} + +function readAndResolve(iter: ReadableStreamAsyncIterator) { + const resolve = iter[kLastResolve]; + if (resolve !== null) { + const data = iter[kStream].read(); + if (data !== null) { + iter[kLastPromise] = null; + iter[kLastResolve] = null; + iter[kLastReject] = null; + resolve(createIterResult(data, false)); + } + } +} + +function onReadable(iter: ReadableStreamAsyncIterator) { + queueMicrotask(() => readAndResolve(iter)); +} + +function wrapForNext( + lastPromise: Promise<ReadableIteratorResult>, + iter: ReadableStreamAsyncIterator, +) { + return ( + resolve: (value: ReadableIteratorResult) => void, + reject: (error: Error) => void, + ) => { + lastPromise.then(() => { + if (iter[kEnded]) { + resolve(createIterResult(undefined, true)); + return; + } + + iter[kHandlePromise](resolve, reject); + }, reject); + }; +} + +function finish(self: ReadableStreamAsyncIterator, err?: Error) { + return new Promise( + ( + resolve: (result: ReadableIteratorResult) => void, + reject: (error: Error) => void, + ) => { + const stream = self[kStream]; + + finished(stream, (err) => { + if (err && err.code !== "ERR_STREAM_PREMATURE_CLOSE") { + reject(err); + } else { + resolve(createIterResult(undefined, true)); + } + }); + destroyer(stream, err); + }, + ); +} + +const AsyncIteratorPrototype = Object.getPrototypeOf( + Object.getPrototypeOf(async function* () {}).prototype, +); + +class ReadableStreamAsyncIterator + implements AsyncIterableIterator<IterableItem> { + [kEnded]: boolean; + [kError]: Error | null = null; + [kHandlePromise] = ( + resolve: (value: ReadableIteratorResult) => void, + reject: (value: Error) => void, + ) => { + const data = this[kStream].read(); + if (data) { + this[kLastPromise] = null; + this[kLastResolve] = null; + this[kLastReject] = null; + resolve(createIterResult(data, false)); + } else { + this[kLastResolve] = resolve; + this[kLastReject] = reject; + } + }; + [kLastPromise]: null | Promise<ReadableIteratorResult>; + [kLastReject]: null | ((value: Error) => void) = null; + [kLastResolve]: null | ((value: ReadableIteratorResult) => void) = null; + [kStream]: Readable; + [Symbol.asyncIterator] = AsyncIteratorPrototype[Symbol.asyncIterator]; + + constructor(stream: Readable) { + this[kEnded] = stream.readableEnded || stream._readableState.endEmitted; + this[kStream] = stream; + initIteratorSymbols(this, [ + kEnded, + kError, + kHandlePromise, + kLastPromise, + kLastReject, + kLastResolve, + kStream, + ]); + } + + get stream() { + return this[kStream]; + } + + next(): Promise<ReadableIteratorResult> { + const error = this[kError]; + if (error !== null) { + return Promise.reject(error); + } + + if (this[kEnded]) { + return Promise.resolve(createIterResult(undefined, true)); + } + + if (this[kStream].destroyed) { + return new Promise((resolve, reject) => { + if (this[kError]) { + reject(this[kError]); + } else if (this[kEnded]) { + resolve(createIterResult(undefined, true)); + } else { + finished(this[kStream], (err) => { + if (err && err.code !== "ERR_STREAM_PREMATURE_CLOSE") { + reject(err); + } else { + resolve(createIterResult(undefined, true)); + } + }); + } + }); + } + + const lastPromise = this[kLastPromise]; + let promise; + + if (lastPromise) { + promise = new Promise(wrapForNext(lastPromise, this)); + } else { + const data = this[kStream].read(); + if (data !== null) { + return Promise.resolve(createIterResult(data, false)); + } + + promise = new Promise(this[kHandlePromise]); + } + + this[kLastPromise] = promise; + + return promise; + } + + return(): Promise<ReadableIteratorResult> { + return finish(this); + } + + throw(err: Error): Promise<ReadableIteratorResult> { + return finish(this, err); + } +} + +const createReadableStreamAsyncIterator = (stream: IterableStreams) => { + // deno-lint-ignore no-explicit-any + if (typeof (stream as any).read !== "function") { + const src = stream; + stream = new Readable({ objectMode: true }).wrap(src); + finished(stream, (err) => destroyer(src, err)); + } + + const iterator = new ReadableStreamAsyncIterator(stream as Readable); + iterator[kLastPromise] = null; + + finished(stream, { writable: false }, (err) => { + if (err && err.code !== "ERR_STREAM_PREMATURE_CLOSE") { + const reject = iterator[kLastReject]; + if (reject !== null) { + iterator[kLastPromise] = null; + iterator[kLastResolve] = null; + iterator[kLastReject] = null; + reject(err); + } + iterator[kError] = err; + return; + } + + const resolve = iterator[kLastResolve]; + if (resolve !== null) { + iterator[kLastPromise] = null; + iterator[kLastResolve] = null; + iterator[kLastReject] = null; + resolve(createIterResult(undefined, true)); + } + iterator[kEnded] = true; + }); + + stream.on("readable", onReadable.bind(null, iterator)); + + return iterator; +}; + +export default createReadableStreamAsyncIterator; diff --git a/std/node/_stream/async_iterator_test.ts b/std/node/_stream/async_iterator_test.ts new file mode 100644 index 000000000..17698e0fd --- /dev/null +++ b/std/node/_stream/async_iterator_test.ts @@ -0,0 +1,249 @@ +// Copyright Node.js contributors. All rights reserved. MIT License. +import Readable from "./readable.ts"; +import Stream from "./stream.ts"; +import toReadableAsyncIterator from "./async_iterator.ts"; +import { deferred } from "../../async/mod.ts"; +import { assertEquals, assertThrowsAsync } from "../../testing/asserts.ts"; + +Deno.test("Stream to async iterator", async () => { + let destroyExecuted = 0; + const destroyExecutedExpected = 1; + const destroyExpectedExecutions = deferred(); + + class AsyncIteratorStream extends Stream { + constructor() { + super(); + } + + destroy() { + destroyExecuted++; + if (destroyExecuted == destroyExecutedExpected) { + destroyExpectedExecutions.resolve(); + } + } + + [Symbol.asyncIterator] = Readable.prototype[Symbol.asyncIterator]; + } + + const stream = new AsyncIteratorStream(); + + queueMicrotask(() => { + stream.emit("data", "hello"); + stream.emit("data", "world"); + stream.emit("end"); + }); + + let res = ""; + + for await (const d of stream) { + res += d; + } + assertEquals(res, "helloworld"); + + const destroyTimeout = setTimeout( + () => destroyExpectedExecutions.reject(), + 1000, + ); + await destroyExpectedExecutions; + clearTimeout(destroyTimeout); + assertEquals(destroyExecuted, destroyExecutedExpected); +}); + +Deno.test("Stream to async iterator throws on 'error' emitted", async () => { + let closeExecuted = 0; + const closeExecutedExpected = 1; + const closeExpectedExecutions = deferred(); + + let errorExecuted = 0; + const errorExecutedExpected = 1; + const errorExpectedExecutions = deferred(); + + class StreamImplementation extends Stream { + close() { + closeExecuted++; + if (closeExecuted == closeExecutedExpected) { + closeExpectedExecutions.resolve(); + } + } + } + + const stream = new StreamImplementation(); + queueMicrotask(() => { + stream.emit("data", 0); + stream.emit("data", 1); + stream.emit("error", new Error("asd")); + }); + + toReadableAsyncIterator(stream) + .next() + .catch((err) => { + errorExecuted++; + if (errorExecuted == errorExecutedExpected) { + errorExpectedExecutions.resolve(); + } + assertEquals(err.message, "asd"); + }); + + const closeTimeout = setTimeout( + () => closeExpectedExecutions.reject(), + 1000, + ); + const errorTimeout = setTimeout( + () => errorExpectedExecutions.reject(), + 1000, + ); + await closeExpectedExecutions; + await errorExpectedExecutions; + clearTimeout(closeTimeout); + clearTimeout(errorTimeout); + assertEquals(closeExecuted, closeExecutedExpected); + assertEquals(errorExecuted, errorExecutedExpected); +}); + +Deno.test("Async iterator matches values of Readable", async () => { + const readable = new Readable({ + objectMode: true, + read() {}, + }); + readable.push(0); + readable.push(1); + readable.push(null); + + const iter = readable[Symbol.asyncIterator](); + + assertEquals( + await iter.next().then(({ value }) => value), + 0, + ); + for await (const d of iter) { + assertEquals(d, 1); + } +}); + +Deno.test("Async iterator throws on Readable destroyed sync", async () => { + const message = "kaboom from read"; + + const readable = new Readable({ + objectMode: true, + read() { + this.destroy(new Error(message)); + }, + }); + + await assertThrowsAsync( + async () => { + // deno-lint-ignore no-empty + for await (const k of readable) {} + }, + Error, + message, + ); +}); + +Deno.test("Async iterator throws on Readable destroyed async", async () => { + const message = "kaboom"; + const readable = new Readable({ + read() {}, + }); + const iterator = readable[Symbol.asyncIterator](); + + readable.destroy(new Error(message)); + + await assertThrowsAsync( + iterator.next.bind(iterator), + Error, + message, + ); +}); + +Deno.test("Async iterator finishes the iterator when Readable destroyed", async () => { + const readable = new Readable({ + read() {}, + }); + + readable.destroy(); + + const { done } = await readable[Symbol.asyncIterator]().next(); + assertEquals(done, true); +}); + +Deno.test("Async iterator finishes all item promises when Readable destroyed", async () => { + const r = new Readable({ + objectMode: true, + read() { + }, + }); + + const b = r[Symbol.asyncIterator](); + const c = b.next(); + const d = b.next(); + r.destroy(); + assertEquals(await c, { done: true, value: undefined }); + assertEquals(await d, { done: true, value: undefined }); +}); + +Deno.test("Async iterator: 'next' is triggered by Readable push", async () => { + const max = 42; + let readed = 0; + let received = 0; + const readable = new Readable({ + objectMode: true, + read() { + this.push("hello"); + if (++readed === max) { + this.push(null); + } + }, + }); + + for await (const k of readable) { + received++; + assertEquals(k, "hello"); + } + + assertEquals(readed, received); +}); + +Deno.test("Async iterator: 'close' called on forced iteration end", async () => { + let closeExecuted = 0; + const closeExecutedExpected = 1; + const closeExpectedExecutions = deferred(); + + class IndestructibleReadable extends Readable { + constructor() { + super({ + autoDestroy: false, + read() {}, + }); + } + + close() { + closeExecuted++; + if (closeExecuted == closeExecutedExpected) { + closeExpectedExecutions.resolve(); + } + readable.emit("close"); + } + + // deno-lint-ignore ban-ts-comment + //@ts-ignore + destroy = null; + } + + const readable = new IndestructibleReadable(); + readable.push("asd"); + readable.push("asd"); + + // eslint-disable-next-line @typescript-eslint/no-unused-vars + for await (const d of readable) { + break; + } + + const closeTimeout = setTimeout( + () => closeExpectedExecutions.reject(), + 1000, + ); + await closeExpectedExecutions; + clearTimeout(closeTimeout); + assertEquals(closeExecuted, closeExecutedExpected); +}); diff --git a/std/node/_stream/buffer_list.ts b/std/node/_stream/buffer_list.ts new file mode 100644 index 000000000..fe1a693c0 --- /dev/null +++ b/std/node/_stream/buffer_list.ts @@ -0,0 +1,183 @@ +// Copyright Node.js contributors. All rights reserved. MIT License. +import { Buffer } from "../buffer.ts"; + +type BufferListItem = { + data: Buffer | string | Uint8Array; + next: BufferListItem | null; +}; + +export default class BufferList { + head: BufferListItem | null = null; + tail: BufferListItem | null = null; + length: number; + + constructor() { + this.head = null; + this.tail = null; + this.length = 0; + } + + push(v: Buffer | string | Uint8Array) { + const entry = { data: v, next: null }; + if (this.length > 0) { + (this.tail as BufferListItem).next = entry; + } else { + this.head = entry; + } + this.tail = entry; + ++this.length; + } + + unshift(v: Buffer | string | Uint8Array) { + const entry = { data: v, next: this.head }; + if (this.length === 0) { + this.tail = entry; + } + this.head = entry; + ++this.length; + } + + shift() { + if (this.length === 0) { + return; + } + const ret = (this.head as BufferListItem).data; + if (this.length === 1) { + this.head = this.tail = null; + } else { + this.head = (this.head as BufferListItem).next; + } + --this.length; + return ret; + } + + clear() { + this.head = this.tail = null; + this.length = 0; + } + + join(s: string) { + if (this.length === 0) { + return ""; + } + let p: BufferListItem | null = (this.head as BufferListItem); + let ret = "" + p.data; + p = p.next; + while (p) { + ret += s + p.data; + p = p.next; + } + return ret; + } + + concat(n: number) { + if (this.length === 0) { + return Buffer.alloc(0); + } + const ret = Buffer.allocUnsafe(n >>> 0); + let p = this.head; + let i = 0; + while (p) { + ret.set(p.data as Buffer, i); + i += p.data.length; + p = p.next; + } + return ret; + } + + // Consumes a specified amount of bytes or characters from the buffered data. + consume(n: number, hasStrings: boolean) { + const data = (this.head as BufferListItem).data; + if (n < data.length) { + // `slice` is the same for buffers and strings. + const slice = data.slice(0, n); + (this.head as BufferListItem).data = data.slice(n); + return slice; + } + if (n === data.length) { + // First chunk is a perfect match. + return this.shift(); + } + // Result spans more than one buffer. + return hasStrings ? this._getString(n) : this._getBuffer(n); + } + + first() { + return (this.head as BufferListItem).data; + } + + *[Symbol.iterator]() { + for (let p = this.head; p; p = p.next) { + yield p.data; + } + } + + // Consumes a specified amount of characters from the buffered data. + _getString(n: number) { + let ret = ""; + let p: BufferListItem | null = (this.head as BufferListItem); + let c = 0; + p = p.next as BufferListItem; + do { + const str = p.data; + if (n > str.length) { + ret += str; + n -= str.length; + } else { + if (n === str.length) { + ret += str; + ++c; + if (p.next) { + this.head = p.next; + } else { + this.head = this.tail = null; + } + } else { + ret += str.slice(0, n); + this.head = p; + p.data = str.slice(n); + } + break; + } + ++c; + p = p.next; + } while (p); + this.length -= c; + return ret; + } + + // Consumes a specified amount of bytes from the buffered data. + _getBuffer(n: number) { + const ret = Buffer.allocUnsafe(n); + const retLen = n; + let p: BufferListItem | null = (this.head as BufferListItem); + let c = 0; + p = p.next as BufferListItem; + do { + const buf = p.data as Buffer; + if (n > buf.length) { + ret.set(buf, retLen - n); + n -= buf.length; + } else { + if (n === buf.length) { + ret.set(buf, retLen - n); + ++c; + if (p.next) { + this.head = p.next; + } else { + this.head = this.tail = null; + } + } else { + ret.set(new Uint8Array(buf.buffer, buf.byteOffset, n), retLen - n); + this.head = p; + p.data = buf.slice(n); + } + break; + } + ++c; + p = p.next; + } while (p); + this.length -= c; + return ret; + } +} diff --git a/std/node/_stream/duplex.ts b/std/node/_stream/duplex.ts new file mode 100644 index 000000000..c5faed6f8 --- /dev/null +++ b/std/node/_stream/duplex.ts @@ -0,0 +1,3 @@ +// Copyright Node.js contributors. All rights reserved. MIT License. +// deno-lint-ignore no-explicit-any +export const errorOrDestroy = (...args: any[]) => {}; diff --git a/std/node/_stream/end-of-stream.ts b/std/node/_stream/end-of-stream.ts new file mode 100644 index 000000000..c42bb0e1c --- /dev/null +++ b/std/node/_stream/end-of-stream.ts @@ -0,0 +1,240 @@ +// Copyright Node.js contributors. All rights reserved. MIT License. +import { once } from "../_utils.ts"; +import type Readable from "./readable.ts"; +import type Stream from "./stream.ts"; +import type { ReadableState } from "./readable.ts"; +import type Writable from "./writable.ts"; +import type { WritableState } from "./writable.ts"; +import { + ERR_INVALID_ARG_TYPE, + ERR_STREAM_PREMATURE_CLOSE, + NodeErrorAbstraction, +} from "../_errors.ts"; + +type StreamImplementations = Readable | Stream | Writable; + +// TODO(Soremwar) +// Bring back once requests are implemented +// function isRequest(stream: Stream) { +// return stream.setHeader && typeof stream.abort === "function"; +// } + +// deno-lint-ignore no-explicit-any +function isReadable(stream: any) { + return typeof stream.readable === "boolean" || + typeof stream.readableEnded === "boolean" || + !!stream._readableState; +} + +// deno-lint-ignore no-explicit-any +function isWritable(stream: any) { + return typeof stream.writable === "boolean" || + typeof stream.writableEnded === "boolean" || + !!stream._writableState; +} + +function isWritableFinished(stream: Writable) { + if (stream.writableFinished) return true; + const wState = stream._writableState; + if (!wState || wState.errored) return false; + return wState.finished || (wState.ended && wState.length === 0); +} + +function nop() {} + +function isReadableEnded(stream: Readable) { + if (stream.readableEnded) return true; + const rState = stream._readableState; + if (!rState || rState.errored) return false; + return rState.endEmitted || (rState.ended && rState.length === 0); +} + +interface FinishedOptions { + error?: boolean; + readable?: boolean; + writable?: boolean; +} + +/** + * Appends an ending callback triggered when a stream is no longer readable, + * writable or has experienced an error or a premature close event +*/ +export default function eos( + stream: StreamImplementations, + options: FinishedOptions | null, + callback: (err?: NodeErrorAbstraction | null) => void, +): () => void; +export default function eos( + stream: StreamImplementations, + callback: (err?: NodeErrorAbstraction | null) => void, +): () => void; +export default function eos( + stream: StreamImplementations, + x: FinishedOptions | ((err?: NodeErrorAbstraction | null) => void) | null, + y?: (err?: NodeErrorAbstraction | null) => void, +) { + let opts: FinishedOptions; + let callback: (err?: NodeErrorAbstraction | null) => void; + + if (!y) { + if (typeof x !== "function") { + throw new ERR_INVALID_ARG_TYPE("callback", "function", x); + } + opts = {}; + callback = x; + } else { + if (!x || Array.isArray(x) || typeof x !== "object") { + throw new ERR_INVALID_ARG_TYPE("opts", "object", x); + } + opts = x; + + if (typeof y !== "function") { + throw new ERR_INVALID_ARG_TYPE("callback", "function", y); + } + callback = y; + } + + callback = once(callback); + + const readable = opts.readable ?? isReadable(stream); + const writable = opts.writable ?? isWritable(stream); + + // deno-lint-ignore no-explicit-any + const wState: WritableState | undefined = (stream as any)._writableState; + // deno-lint-ignore no-explicit-any + const rState: ReadableState | undefined = (stream as any)._readableState; + const validState = wState || rState; + + const onlegacyfinish = () => { + if (!(stream as Writable).writable) { + onfinish(); + } + }; + + let willEmitClose = ( + validState?.autoDestroy && + validState?.emitClose && + validState?.closed === false && + isReadable(stream) === readable && + isWritable(stream) === writable + ); + + let writableFinished = (stream as Writable).writableFinished || + wState?.finished; + const onfinish = () => { + writableFinished = true; + // deno-lint-ignore no-explicit-any + if ((stream as any).destroyed) { + willEmitClose = false; + } + + if (willEmitClose && (!(stream as Readable).readable || readable)) { + return; + } + if (!readable || readableEnded) { + callback.call(stream); + } + }; + + let readableEnded = (stream as Readable).readableEnded || rState?.endEmitted; + const onend = () => { + readableEnded = true; + // deno-lint-ignore no-explicit-any + if ((stream as any).destroyed) { + willEmitClose = false; + } + + if (willEmitClose && (!(stream as Writable).writable || writable)) { + return; + } + if (!writable || writableFinished) { + callback.call(stream); + } + }; + + const onerror = (err: NodeErrorAbstraction) => { + callback.call(stream, err); + }; + + const onclose = () => { + if (readable && !readableEnded) { + if (!isReadableEnded(stream as Readable)) { + return callback.call(stream, new ERR_STREAM_PREMATURE_CLOSE()); + } + } + if (writable && !writableFinished) { + if (!isWritableFinished(stream as Writable)) { + return callback.call(stream, new ERR_STREAM_PREMATURE_CLOSE()); + } + } + callback.call(stream); + }; + + // TODO(Soremwar) + // Bring back once requests are implemented + // const onrequest = () => { + // stream.req.on("finish", onfinish); + // }; + + // TODO(Soremwar) + // Bring back once requests are implemented + // if (isRequest(stream)) { + // stream.on("complete", onfinish); + // stream.on("abort", onclose); + // if (stream.req) { + // onrequest(); + // } else { + // stream.on("request", onrequest); + // } + // } else + if (writable && !wState) { + stream.on("end", onlegacyfinish); + stream.on("close", onlegacyfinish); + } + + // TODO(Soremwar) + // Bring back once requests are implemented + // if (typeof stream.aborted === "boolean") { + // stream.on("aborted", onclose); + // } + + stream.on("end", onend); + stream.on("finish", onfinish); + if (opts.error !== false) stream.on("error", onerror); + stream.on("close", onclose); + + const closed = ( + wState?.closed || + rState?.closed || + wState?.errorEmitted || + rState?.errorEmitted || + // TODO(Soremwar) + // Bring back once requests are implemented + // (rState && stream.req && stream.aborted) || + ( + (!writable || wState?.finished) && + (!readable || rState?.endEmitted) + ) + ); + + if (closed) { + queueMicrotask(callback); + } + + return function () { + callback = nop; + stream.removeListener("aborted", onclose); + stream.removeListener("complete", onfinish); + stream.removeListener("abort", onclose); + // TODO(Soremwar) + // Bring back once requests are implemented + // stream.removeListener("request", onrequest); + // if (stream.req) stream.req.removeListener("finish", onfinish); + stream.removeListener("end", onlegacyfinish); + stream.removeListener("close", onlegacyfinish); + stream.removeListener("finish", onfinish); + stream.removeListener("end", onend); + stream.removeListener("error", onerror); + stream.removeListener("close", onclose); + }; +} diff --git a/std/node/_stream/from.ts b/std/node/_stream/from.ts new file mode 100644 index 000000000..652c17715 --- /dev/null +++ b/std/node/_stream/from.ts @@ -0,0 +1,102 @@ +// Copyright Node.js contributors. All rights reserved. MIT License. +import { Buffer } from "../buffer.ts"; +import Readable from "./readable.ts"; +import type { ReadableOptions } from "./readable.ts"; +import { ERR_INVALID_ARG_TYPE, ERR_STREAM_NULL_VALUES } from "../_errors.ts"; + +export default function from( + // deno-lint-ignore no-explicit-any + iterable: Iterable<any> | AsyncIterable<any>, + opts?: ReadableOptions, +) { + let iterator: + // deno-lint-ignore no-explicit-any + | Iterator<any, any, undefined> + // deno-lint-ignore no-explicit-any + | AsyncIterator<any, any, undefined>; + if (typeof iterable === "string" || iterable instanceof Buffer) { + return new Readable({ + objectMode: true, + ...opts, + read() { + this.push(iterable); + this.push(null); + }, + }); + } + + if (Symbol.asyncIterator in iterable) { + // deno-lint-ignore no-explicit-any + iterator = (iterable as AsyncIterable<any>)[Symbol.asyncIterator](); + } else if (Symbol.iterator in iterable) { + // deno-lint-ignore no-explicit-any + iterator = (iterable as Iterable<any>)[Symbol.iterator](); + } else { + throw new ERR_INVALID_ARG_TYPE("iterable", ["Iterable"], iterable); + } + + const readable = new Readable({ + objectMode: true, + highWaterMark: 1, + ...opts, + }); + + // Reading boolean to protect against _read + // being called before last iteration completion. + let reading = false; + + // needToClose boolean if iterator needs to be explicitly closed + let needToClose = false; + + readable._read = function () { + if (!reading) { + reading = true; + next(); + } + }; + + readable._destroy = function (error, cb) { + if (needToClose) { + needToClose = false; + close().then( + () => queueMicrotask(() => cb(error)), + (e) => queueMicrotask(() => cb(error || e)), + ); + } else { + cb(error); + } + }; + + async function close() { + if (typeof iterator.return === "function") { + const { value } = await iterator.return(); + await value; + } + } + + async function next() { + try { + needToClose = false; + const { value, done } = await iterator.next(); + needToClose = !done; + if (done) { + readable.push(null); + } else if (readable.destroyed) { + await close(); + } else { + const res = await value; + if (res === null) { + reading = false; + throw new ERR_STREAM_NULL_VALUES(); + } else if (readable.push(res)) { + next(); + } else { + reading = false; + } + } + } catch (err) { + readable.destroy(err); + } + } + return readable; +} diff --git a/std/node/_stream/readable.ts b/std/node/_stream/readable.ts new file mode 100644 index 000000000..72e61dff7 --- /dev/null +++ b/std/node/_stream/readable.ts @@ -0,0 +1,1248 @@ +// Copyright Node.js contributors. All rights reserved. MIT License. +import EventEmitter, { captureRejectionSymbol } from "../events.ts"; +import Stream from "./stream.ts"; +import { Buffer } from "../buffer.ts"; +import BufferList from "./buffer_list.ts"; +import { + ERR_INVALID_OPT_VALUE, + ERR_METHOD_NOT_IMPLEMENTED, + ERR_MULTIPLE_CALLBACK, + ERR_STREAM_PUSH_AFTER_EOF, + ERR_STREAM_UNSHIFT_AFTER_END_EVENT, +} from "../_errors.ts"; +import { StringDecoder } from "../string_decoder.ts"; +import createReadableStreamAsyncIterator from "./async_iterator.ts"; +import streamFrom from "./from.ts"; +import { kConstruct, kDestroy, kPaused } from "./symbols.ts"; +import type Writable from "./writable.ts"; +import { errorOrDestroy as errorOrDestroyDuplex } from "./duplex.ts"; + +function construct(stream: Readable, cb: () => void) { + const r = stream._readableState; + + if (!stream._construct) { + return; + } + + stream.once(kConstruct, cb); + + r.constructed = false; + + queueMicrotask(() => { + let called = false; + stream._construct?.((err?: Error) => { + r.constructed = true; + + if (called) { + err = new ERR_MULTIPLE_CALLBACK(); + } else { + called = true; + } + + if (r.destroyed) { + stream.emit(kDestroy, err); + } else if (err) { + errorOrDestroy(stream, err, true); + } else { + queueMicrotask(() => stream.emit(kConstruct)); + } + }); + }); +} + +function _destroy( + self: Readable, + err?: Error, + cb?: (error?: Error | null) => void, +) { + self._destroy(err || null, (err) => { + const r = (self as Readable)._readableState; + + if (err) { + // Avoid V8 leak, https://github.com/nodejs/node/pull/34103#issuecomment-652002364 + err.stack; + + if (!r.errored) { + r.errored = err; + } + } + + r.closed = true; + + if (typeof cb === "function") { + cb(err); + } + + if (err) { + queueMicrotask(() => { + if (!r.errorEmitted) { + r.errorEmitted = true; + self.emit("error", err); + } + r.closeEmitted = true; + if (r.emitClose) { + self.emit("close"); + } + }); + } else { + queueMicrotask(() => { + r.closeEmitted = true; + if (r.emitClose) { + self.emit("close"); + } + }); + } + }); +} + +function errorOrDestroy(stream: Readable, err: Error, sync = false) { + const r = stream._readableState; + + if (r.destroyed) { + return stream; + } + + if (r.autoDestroy) { + stream.destroy(err); + } else if (err) { + // Avoid V8 leak, https://github.com/nodejs/node/pull/34103#issuecomment-652002364 + err.stack; + + if (!r.errored) { + r.errored = err; + } + if (sync) { + queueMicrotask(() => { + if (!r.errorEmitted) { + r.errorEmitted = true; + stream.emit("error", err); + } + }); + } else if (!r.errorEmitted) { + r.errorEmitted = true; + stream.emit("error", err); + } + } +} + +function flow(stream: Readable) { + const state = stream._readableState; + while (state.flowing && stream.read() !== null); +} + +function pipeOnDrain(src: Readable, dest: Writable) { + return function pipeOnDrainFunctionResult() { + const state = src._readableState; + + if (state.awaitDrainWriters === dest) { + state.awaitDrainWriters = null; + } else if (state.multiAwaitDrain) { + (state.awaitDrainWriters as Set<Writable>).delete(dest); + } + + if ( + (!state.awaitDrainWriters || + (state.awaitDrainWriters as Set<Writable>).size === 0) && + src.listenerCount("data") + ) { + state.flowing = true; + flow(src); + } + }; +} + +function updateReadableListening(self: Readable) { + const state = self._readableState; + state.readableListening = self.listenerCount("readable") > 0; + + if (state.resumeScheduled && state[kPaused] === false) { + // Flowing needs to be set to true now, otherwise + // the upcoming resume will not flow. + state.flowing = true; + + // Crude way to check if we should resume. + } else if (self.listenerCount("data") > 0) { + self.resume(); + } else if (!state.readableListening) { + state.flowing = null; + } +} + +function nReadingNextTick(self: Readable) { + self.read(0); +} + +function resume(stream: Readable, state: ReadableState) { + if (!state.resumeScheduled) { + state.resumeScheduled = true; + queueMicrotask(() => resume_(stream, state)); + } +} + +function resume_(stream: Readable, state: ReadableState) { + if (!state.reading) { + stream.read(0); + } + + state.resumeScheduled = false; + stream.emit("resume"); + flow(stream); + if (state.flowing && !state.reading) { + stream.read(0); + } +} + +function readableAddChunk( + stream: Readable, + chunk: string | Buffer | Uint8Array | null, + encoding: undefined | string = undefined, + addToFront: boolean, +) { + const state = stream._readableState; + let usedEncoding = encoding; + + let err; + if (!state.objectMode) { + if (typeof chunk === "string") { + usedEncoding = encoding || state.defaultEncoding; + if (state.encoding !== usedEncoding) { + if (addToFront && state.encoding) { + chunk = Buffer.from(chunk, usedEncoding).toString(state.encoding); + } else { + chunk = Buffer.from(chunk, usedEncoding); + usedEncoding = ""; + } + } + } else if (chunk instanceof Uint8Array) { + chunk = Buffer.from(chunk); + } + } + + if (err) { + errorOrDestroy(stream, err); + } else if (chunk === null) { + state.reading = false; + onEofChunk(stream, state); + } else if (state.objectMode || (chunk.length > 0)) { + if (addToFront) { + if (state.endEmitted) { + errorOrDestroy(stream, new ERR_STREAM_UNSHIFT_AFTER_END_EVENT()); + } else { + addChunk(stream, state, chunk, true); + } + } else if (state.ended) { + errorOrDestroy(stream, new ERR_STREAM_PUSH_AFTER_EOF()); + } else if (state.destroyed || state.errored) { + return false; + } else { + state.reading = false; + if (state.decoder && !usedEncoding) { + //TODO(Soremwar) + //I don't think this cast is right + chunk = state.decoder.write(Buffer.from(chunk as Uint8Array)); + if (state.objectMode || chunk.length !== 0) { + addChunk(stream, state, chunk, false); + } else { + maybeReadMore(stream, state); + } + } else { + addChunk(stream, state, chunk, false); + } + } + } else if (!addToFront) { + state.reading = false; + maybeReadMore(stream, state); + } + + return !state.ended && + (state.length < state.highWaterMark || state.length === 0); +} + +function addChunk( + stream: Readable, + state: ReadableState, + chunk: string | Buffer | Uint8Array, + addToFront: boolean, +) { + if (state.flowing && state.length === 0 && !state.sync) { + if (state.multiAwaitDrain) { + (state.awaitDrainWriters as Set<Writable>).clear(); + } else { + state.awaitDrainWriters = null; + } + stream.emit("data", chunk); + } else { + // Update the buffer info. + state.length += state.objectMode ? 1 : chunk.length; + if (addToFront) { + state.buffer.unshift(chunk); + } else { + state.buffer.push(chunk); + } + + if (state.needReadable) { + emitReadable(stream); + } + } + maybeReadMore(stream, state); +} + +function prependListener( + emitter: EventEmitter, + event: string, + // deno-lint-ignore no-explicit-any + fn: (...args: any[]) => any, +) { + if (typeof emitter.prependListener === "function") { + return emitter.prependListener(event, fn); + } + + // This is a hack to make sure that our error handler is attached before any + // userland ones. NEVER DO THIS. This is here only because this code needs + // to continue to work with older versions of Node.js that do not include + //the prependListener() method. The goal is to eventually remove this hack. + // TODO(Soremwar) + // Burn it with fire + // deno-lint-ignore ban-ts-comment + //@ts-ignore + if (emitter._events.get(event)?.length) { + // deno-lint-ignore ban-ts-comment + //@ts-ignore + const listeners = [fn, ...emitter._events.get(event)]; + // deno-lint-ignore ban-ts-comment + //@ts-ignore + emitter._events.set(event, listeners); + } else { + emitter.on(event, fn); + } +} + +/** Pluck off n bytes from an array of buffers. +* Length is the combined lengths of all the buffers in the list. +* This function is designed to be inlinable, so please take care when making +* changes to the function body. +*/ +function fromList(n: number, state: ReadableState) { + // nothing buffered. + if (state.length === 0) { + return null; + } + + let ret; + if (state.objectMode) { + ret = state.buffer.shift(); + } else if (!n || n >= state.length) { + if (state.decoder) { + ret = state.buffer.join(""); + } else if (state.buffer.length === 1) { + ret = state.buffer.first(); + } else { + ret = state.buffer.concat(state.length); + } + state.buffer.clear(); + } else { + ret = state.buffer.consume(n, !!state.decoder); + } + + return ret; +} + +function endReadable(stream: Readable) { + const state = stream._readableState; + + if (!state.endEmitted) { + state.ended = true; + queueMicrotask(() => endReadableNT(state, stream)); + } +} + +function endReadableNT(state: ReadableState, stream: Readable) { + if ( + !state.errorEmitted && !state.closeEmitted && + !state.endEmitted && state.length === 0 + ) { + state.endEmitted = true; + stream.emit("end"); + + if (state.autoDestroy) { + stream.destroy(); + } + } +} + +// Don't raise the hwm > 1GB. +const MAX_HWM = 0x40000000; +function computeNewHighWaterMark(n: number) { + if (n >= MAX_HWM) { + n = MAX_HWM; + } else { + n--; + n |= n >>> 1; + n |= n >>> 2; + n |= n >>> 4; + n |= n >>> 8; + n |= n >>> 16; + n++; + } + return n; +} + +function howMuchToRead(n: number, state: ReadableState) { + if (n <= 0 || (state.length === 0 && state.ended)) { + return 0; + } + if (state.objectMode) { + return 1; + } + if (Number.isNaN(n)) { + // Only flow one buffer at a time. + if (state.flowing && state.length) { + return state.buffer.first().length; + } + return state.length; + } + if (n <= state.length) { + return n; + } + return state.ended ? state.length : 0; +} + +function onEofChunk(stream: Readable, state: ReadableState) { + if (state.ended) return; + if (state.decoder) { + const chunk = state.decoder.end(); + if (chunk && chunk.length) { + state.buffer.push(chunk); + state.length += state.objectMode ? 1 : chunk.length; + } + } + state.ended = true; + + if (state.sync) { + emitReadable(stream); + } else { + state.needReadable = false; + state.emittedReadable = true; + emitReadable_(stream); + } +} + +function emitReadable(stream: Readable) { + const state = stream._readableState; + state.needReadable = false; + if (!state.emittedReadable) { + state.emittedReadable = true; + queueMicrotask(() => emitReadable_(stream)); + } +} + +function emitReadable_(stream: Readable) { + const state = stream._readableState; + if (!state.destroyed && !state.errored && (state.length || state.ended)) { + stream.emit("readable"); + state.emittedReadable = false; + } + + state.needReadable = !state.flowing && + !state.ended && + state.length <= state.highWaterMark; + flow(stream); +} + +function maybeReadMore(stream: Readable, state: ReadableState) { + if (!state.readingMore && state.constructed) { + state.readingMore = true; + queueMicrotask(() => maybeReadMore_(stream, state)); + } +} + +function maybeReadMore_(stream: Readable, state: ReadableState) { + while ( + !state.reading && !state.ended && + (state.length < state.highWaterMark || + (state.flowing && state.length === 0)) + ) { + const len = state.length; + stream.read(0); + if (len === state.length) { + // Didn't get any data, stop spinning. + break; + } + } + state.readingMore = false; +} + +export interface ReadableOptions { + autoDestroy?: boolean; + construct?: () => void; + //TODO(Soremwar) + //Import available encodings + defaultEncoding?: string; + destroy?( + this: Readable, + error: Error | null, + callback: (error: Error | null) => void, + ): void; + emitClose?: boolean; + //TODO(Soremwar) + //Import available encodings + encoding?: string; + highWaterMark?: number; + objectMode?: boolean; + read?(this: Readable): void; +} + +export class ReadableState { + [kPaused]: boolean | null = null; + awaitDrainWriters: Writable | Set<Writable> | null = null; + buffer = new BufferList(); + closed = false; + closeEmitted = false; + constructed: boolean; + decoder: StringDecoder | null = null; + destroyed = false; + emittedReadable = false; + //TODO(Soremwar) + //Import available encodings + encoding: string | null = null; + ended = false; + endEmitted = false; + errored: Error | null = null; + errorEmitted = false; + flowing: boolean | null = null; + highWaterMark: number; + length = 0; + multiAwaitDrain = false; + needReadable = false; + objectMode: boolean; + pipes: Writable[] = []; + readable = true; + readableListening = false; + reading = false; + readingMore = false; + resumeScheduled = false; + sync = true; + emitClose: boolean; + autoDestroy: boolean; + defaultEncoding: string; + + constructor(options?: ReadableOptions) { + this.objectMode = !!options?.objectMode; + + this.highWaterMark = options?.highWaterMark ?? + (this.objectMode ? 16 : 16 * 1024); + if (Number.isInteger(this.highWaterMark) && this.highWaterMark >= 0) { + this.highWaterMark = Math.floor(this.highWaterMark); + } else { + throw new ERR_INVALID_OPT_VALUE("highWaterMark", this.highWaterMark); + } + + this.emitClose = options?.emitClose ?? true; + this.autoDestroy = options?.autoDestroy ?? true; + this.defaultEncoding = options?.defaultEncoding || "utf8"; + + if (options?.encoding) { + this.decoder = new StringDecoder(options.encoding); + this.encoding = options.encoding; + } + + this.constructed = true; + } +} + +class Readable extends Stream { + _construct?: (cb: (error?: Error) => void) => void; + _readableState: ReadableState; + + constructor(options?: ReadableOptions) { + super(); + if (options) { + if (typeof options.read === "function") { + this._read = options.read; + } + if (typeof options.destroy === "function") { + this._destroy = options.destroy; + } + if (typeof options.construct === "function") { + this._construct = options.construct; + } + } + this._readableState = new ReadableState(options); + + construct(this, () => { + maybeReadMore(this, this._readableState); + }); + } + + static from( + // deno-lint-ignore no-explicit-any + iterable: Iterable<any> | AsyncIterable<any>, + opts?: ReadableOptions, + ): Readable { + return streamFrom(iterable, opts); + } + + static ReadableState = ReadableState; + + static _fromList = fromList; + + // You can override either this method, or the async _read(n) below. + read(n?: number) { + // Same as parseInt(undefined, 10), however V8 7.3 performance regressed + // in this scenario, so we are doing it manually. + if (n === undefined) { + n = NaN; + } + const state = this._readableState; + const nOrig = n; + + if (n > state.highWaterMark) { + state.highWaterMark = computeNewHighWaterMark(n); + } + + if (n !== 0) { + state.emittedReadable = false; + } + + if ( + n === 0 && + state.needReadable && + ((state.highWaterMark !== 0 + ? state.length >= state.highWaterMark + : state.length > 0) || + state.ended) + ) { + if (state.length === 0 && state.ended) { + endReadable(this); + } else { + emitReadable(this); + } + return null; + } + + n = howMuchToRead(n, state); + + if (n === 0 && state.ended) { + if (state.length === 0) { + endReadable(this); + } + return null; + } + + let doRead = state.needReadable; + if ( + state.length === 0 || state.length - (n as number) < state.highWaterMark + ) { + doRead = true; + } + + if ( + state.ended || state.reading || state.destroyed || state.errored || + !state.constructed + ) { + doRead = false; + } else if (doRead) { + state.reading = true; + state.sync = true; + if (state.length === 0) { + state.needReadable = true; + } + this._read(); + state.sync = false; + if (!state.reading) { + n = howMuchToRead(nOrig, state); + } + } + + let ret; + if ((n as number) > 0) { + ret = fromList((n as number), state); + } else { + ret = null; + } + + if (ret === null) { + state.needReadable = state.length <= state.highWaterMark; + n = 0; + } else { + state.length -= n as number; + if (state.multiAwaitDrain) { + (state.awaitDrainWriters as Set<Writable>).clear(); + } else { + state.awaitDrainWriters = null; + } + } + + if (state.length === 0) { + if (!state.ended) { + state.needReadable = true; + } + + if (nOrig !== n && state.ended) { + endReadable(this); + } + } + + if (ret !== null) { + this.emit("data", ret); + } + + return ret; + } + + _read() { + throw new ERR_METHOD_NOT_IMPLEMENTED("_read()"); + } + + //TODO(Soremwar) + //Should be duplex + pipe<T extends Writable>(dest: T, pipeOpts?: { end?: boolean }): T { + // deno-lint-ignore no-this-alias + const src = this; + const state = this._readableState; + + if (state.pipes.length === 1) { + if (!state.multiAwaitDrain) { + state.multiAwaitDrain = true; + state.awaitDrainWriters = new Set( + state.awaitDrainWriters ? [state.awaitDrainWriters as Writable] : [], + ); + } + } + + state.pipes.push(dest); + + const doEnd = (!pipeOpts || pipeOpts.end !== false); + + //TODO(Soremwar) + //Part of doEnd condition + //In node, output/inout are a duplex Stream + // && + // dest !== stdout && + // dest !== stderr + + const endFn = doEnd ? onend : unpipe; + if (state.endEmitted) { + queueMicrotask(endFn); + } else { + this.once("end", endFn); + } + + dest.on("unpipe", onunpipe); + function onunpipe(readable: Readable, unpipeInfo: { hasUnpiped: boolean }) { + if (readable === src) { + if (unpipeInfo && unpipeInfo.hasUnpiped === false) { + unpipeInfo.hasUnpiped = true; + cleanup(); + } + } + } + + function onend() { + dest.end(); + } + + let ondrain: () => void; + + let cleanedUp = false; + function cleanup() { + dest.removeListener("close", onclose); + dest.removeListener("finish", onfinish); + if (ondrain) { + dest.removeListener("drain", ondrain); + } + dest.removeListener("error", onerror); + dest.removeListener("unpipe", onunpipe); + src.removeListener("end", onend); + src.removeListener("end", unpipe); + src.removeListener("data", ondata); + + cleanedUp = true; + if ( + ondrain && state.awaitDrainWriters && + (!dest._writableState || dest._writableState.needDrain) + ) { + ondrain(); + } + } + + this.on("data", ondata); + // deno-lint-ignore no-explicit-any + function ondata(chunk: any) { + const ret = dest.write(chunk); + if (ret === false) { + if (!cleanedUp) { + if (state.pipes.length === 1 && state.pipes[0] === dest) { + state.awaitDrainWriters = dest; + state.multiAwaitDrain = false; + } else if (state.pipes.length > 1 && state.pipes.includes(dest)) { + (state.awaitDrainWriters as Set<Writable>).add(dest); + } + src.pause(); + } + if (!ondrain) { + ondrain = pipeOnDrain(src, dest); + dest.on("drain", ondrain); + } + } + } + + function onerror(er: Error) { + unpipe(); + dest.removeListener("error", onerror); + if (dest.listenerCount("error") === 0) { + //TODO(Soremwar) + //Should be const s = dest._writableState || dest._readableState; + const s = dest._writableState; + if (s && !s.errorEmitted) { + // User incorrectly emitted 'error' directly on the stream. + errorOrDestroyDuplex(dest, er); + } else { + dest.emit("error", er); + } + } + } + + prependListener(dest, "error", onerror); + + function onclose() { + dest.removeListener("finish", onfinish); + unpipe(); + } + dest.once("close", onclose); + function onfinish() { + dest.removeListener("close", onclose); + unpipe(); + } + dest.once("finish", onfinish); + + function unpipe() { + src.unpipe(dest); + } + + dest.emit("pipe", this); + + if (!state.flowing) { + this.resume(); + } + + return dest; + } + + isPaused() { + return this._readableState[kPaused] === true || + this._readableState.flowing === false; + } + + //TODO(Soremwar) + //Replace string with encoding types + setEncoding(enc: string) { + const decoder = new StringDecoder(enc); + this._readableState.decoder = decoder; + this._readableState.encoding = this._readableState.decoder.encoding; + + const buffer = this._readableState.buffer; + let content = ""; + for (const data of buffer) { + content += decoder.write(data as Buffer); + } + buffer.clear(); + if (content !== "") { + buffer.push(content); + } + this._readableState.length = content.length; + return this; + } + + on( + event: "close" | "end" | "pause" | "readable" | "resume", + listener: () => void, + ): this; + // deno-lint-ignore no-explicit-any + on(event: "data", listener: (chunk: any) => void): this; + on(event: "error", listener: (err: Error) => void): this; + // deno-lint-ignore no-explicit-any + on(event: string | symbol, listener: (...args: any[]) => void): this; + on( + ev: string | symbol, + fn: + | (() => void) + // deno-lint-ignore no-explicit-any + | ((chunk: any) => void) + | ((err: Error) => void) + // deno-lint-ignore no-explicit-any + | ((...args: any[]) => void), + ) { + const res = super.on.call(this, ev, fn); + const state = this._readableState; + + if (ev === "data") { + state.readableListening = this.listenerCount("readable") > 0; + + if (state.flowing !== false) { + this.resume(); + } + } else if (ev === "readable") { + if (!state.endEmitted && !state.readableListening) { + state.readableListening = state.needReadable = true; + state.flowing = false; + state.emittedReadable = false; + if (state.length) { + emitReadable(this); + } else if (!state.reading) { + queueMicrotask(() => nReadingNextTick(this)); + } + } + } + + return res; + } + + removeListener( + event: "close" | "end" | "pause" | "readable" | "resume", + listener: () => void, + ): this; + // deno-lint-ignore no-explicit-any + removeListener(event: "data", listener: (chunk: any) => void): this; + removeListener(event: "error", listener: (err: Error) => void): this; + removeListener( + event: string | symbol, + // deno-lint-ignore no-explicit-any + listener: (...args: any[]) => void, + ): this; + removeListener( + ev: string | symbol, + fn: + | (() => void) + // deno-lint-ignore no-explicit-any + | ((chunk: any) => void) + | ((err: Error) => void) + // deno-lint-ignore no-explicit-any + | ((...args: any[]) => void), + ) { + const res = super.removeListener.call(this, ev, fn); + + if (ev === "readable") { + queueMicrotask(() => updateReadableListening(this)); + } + + return res; + } + + off = this.removeListener; + + destroy(err?: Error, cb?: () => void) { + const r = this._readableState; + + if (r.destroyed) { + if (typeof cb === "function") { + cb(); + } + + return this; + } + + if (err) { + // Avoid V8 leak, https://github.com/nodejs/node/pull/34103#issuecomment-652002364 + err.stack; + + if (!r.errored) { + r.errored = err; + } + } + + r.destroyed = true; + + // If still constructing then defer calling _destroy. + if (!r.constructed) { + this.once(kDestroy, (er: Error) => { + _destroy(this, err || er, cb); + }); + } else { + _destroy(this, err, cb); + } + + return this; + } + + _undestroy() { + const r = this._readableState; + r.constructed = true; + r.closed = false; + r.closeEmitted = false; + r.destroyed = false; + r.errored = null; + r.errorEmitted = false; + r.reading = false; + r.ended = false; + r.endEmitted = false; + } + + _destroy( + error: Error | null, + callback: (error?: Error | null) => void, + ): void { + callback(error); + } + + [captureRejectionSymbol](err: Error) { + this.destroy(err); + } + + //TODO(Soremwar) + //Same deal, string => encodings + // deno-lint-ignore no-explicit-any + push(chunk: any, encoding?: string): boolean { + return readableAddChunk(this, chunk, encoding, false); + } + + // deno-lint-ignore no-explicit-any + unshift(chunk: any, encoding?: string): boolean { + return readableAddChunk(this, chunk, encoding, true); + } + + unpipe(dest?: Writable): this { + const state = this._readableState; + const unpipeInfo = { hasUnpiped: false }; + + if (state.pipes.length === 0) { + return this; + } + + if (!dest) { + // remove all. + const dests = state.pipes; + state.pipes = []; + this.pause(); + + for (const dest of dests) { + dest.emit("unpipe", this, { hasUnpiped: false }); + } + return this; + } + + const index = state.pipes.indexOf(dest); + if (index === -1) { + return this; + } + + state.pipes.splice(index, 1); + if (state.pipes.length === 0) { + this.pause(); + } + + dest.emit("unpipe", this, unpipeInfo); + + return this; + } + + removeAllListeners( + ev: + | "close" + | "data" + | "end" + | "error" + | "pause" + | "readable" + | "resume" + | symbol + | undefined, + ) { + const res = super.removeAllListeners(ev); + + if (ev === "readable" || ev === undefined) { + queueMicrotask(() => updateReadableListening(this)); + } + + return res; + } + + resume() { + const state = this._readableState; + if (!state.flowing) { + // We flow only if there is no one listening + // for readable, but we still have to call + // resume(). + state.flowing = !state.readableListening; + resume(this, state); + } + state[kPaused] = false; + return this; + } + + pause() { + if (this._readableState.flowing !== false) { + this._readableState.flowing = false; + this.emit("pause"); + } + this._readableState[kPaused] = true; + return this; + } + + /** Wrap an old-style stream as the async data source. */ + wrap(stream: Stream): this { + const state = this._readableState; + let paused = false; + + stream.on("end", () => { + if (state.decoder && !state.ended) { + const chunk = state.decoder.end(); + if (chunk && chunk.length) { + this.push(chunk); + } + } + + this.push(null); + }); + + stream.on("data", (chunk) => { + if (state.decoder) { + chunk = state.decoder.write(chunk); + } + + if (state.objectMode && (chunk === null || chunk === undefined)) { + return; + } else if (!state.objectMode && (!chunk || !chunk.length)) { + return; + } + + const ret = this.push(chunk); + if (!ret) { + paused = true; + // By the time this is triggered, stream will be a readable stream + // deno-lint-ignore ban-ts-comment + // @ts-ignore + stream.pause(); + } + }); + + // TODO(Soremwar) + // There must be a clean way to implement this on TypeScript + // Proxy all the other methods. Important when wrapping filters and duplexes. + for (const i in stream) { + // deno-lint-ignore ban-ts-comment + //@ts-ignore + if (this[i] === undefined && typeof stream[i] === "function") { + // deno-lint-ignore ban-ts-comment + //@ts-ignore + this[i] = function methodWrap(method) { + return function methodWrapReturnFunction() { + // deno-lint-ignore ban-ts-comment + //@ts-ignore + return stream[method].apply(stream); + }; + }(i); + } + } + + stream.on("error", (err) => { + errorOrDestroy(this, err); + }); + + stream.on("close", () => { + this.emit("close"); + }); + + stream.on("destroy", () => { + this.emit("destroy"); + }); + + stream.on("pause", () => { + this.emit("pause"); + }); + + stream.on("resume", () => { + this.emit("resume"); + }); + + this._read = () => { + if (paused) { + paused = false; + // By the time this is triggered, stream will be a readable stream + // deno-lint-ignore ban-ts-comment + //@ts-ignore + stream.resume(); + } + }; + + return this; + } + + [Symbol.asyncIterator]() { + return createReadableStreamAsyncIterator(this); + } + + get readable(): boolean { + return this._readableState?.readable && + !this._readableState?.destroyed && + !this._readableState?.errorEmitted && + !this._readableState?.endEmitted; + } + set readable(val: boolean) { + if (this._readableState) { + this._readableState.readable = val; + } + } + + get readableHighWaterMark(): number { + return this._readableState.highWaterMark; + } + + get readableBuffer() { + return this._readableState && this._readableState.buffer; + } + + get readableFlowing(): boolean | null { + return this._readableState.flowing; + } + + set readableFlowing(state: boolean | null) { + if (this._readableState) { + this._readableState.flowing = state; + } + } + + get readableLength() { + return this._readableState.length; + } + + get readableObjectMode() { + return this._readableState ? this._readableState.objectMode : false; + } + + get readableEncoding() { + return this._readableState ? this._readableState.encoding : null; + } + + get destroyed() { + if (this._readableState === undefined) { + return false; + } + return this._readableState.destroyed; + } + + set destroyed(value: boolean) { + if (!this._readableState) { + return; + } + this._readableState.destroyed = value; + } + + get readableEnded() { + return this._readableState ? this._readableState.endEmitted : false; + } +} + +Object.defineProperties(Stream, { + _readableState: { enumerable: false }, + destroyed: { enumerable: false }, + readableBuffer: { enumerable: false }, + readableEncoding: { enumerable: false }, + readableEnded: { enumerable: false }, + readableFlowing: { enumerable: false }, + readableHighWaterMark: { enumerable: false }, + readableLength: { enumerable: false }, + readableObjectMode: { enumerable: false }, +}); + +export default Readable; diff --git a/std/node/_stream/readable_test.ts b/std/node/_stream/readable_test.ts new file mode 100644 index 000000000..72767e28f --- /dev/null +++ b/std/node/_stream/readable_test.ts @@ -0,0 +1,489 @@ +// Copyright Node.js contributors. All rights reserved. MIT License. +import { Buffer } from "../buffer.ts"; +import Readable from "../_stream/readable.ts"; +import { once } from "../events.ts"; +import { deferred } from "../../async/mod.ts"; +import { + assert, + assertEquals, + assertStrictEquals, +} from "../../testing/asserts.ts"; + +Deno.test("Readable stream from iterator", async () => { + function* generate() { + yield "a"; + yield "b"; + yield "c"; + } + + const stream = Readable.from(generate()); + + const expected = ["a", "b", "c"]; + + for await (const chunk of stream) { + assertStrictEquals(chunk, expected.shift()); + } +}); + +Deno.test("Readable stream from async iterator", async () => { + async function* generate() { + yield "a"; + yield "b"; + yield "c"; + } + + const stream = Readable.from(generate()); + + const expected = ["a", "b", "c"]; + + for await (const chunk of stream) { + assertStrictEquals(chunk, expected.shift()); + } +}); + +Deno.test("Readable stream from promise", async () => { + const promises = [ + Promise.resolve("a"), + Promise.resolve("b"), + Promise.resolve("c"), + ]; + + const stream = Readable.from(promises); + + const expected = ["a", "b", "c"]; + + for await (const chunk of stream) { + assertStrictEquals(chunk, expected.shift()); + } +}); + +Deno.test("Readable stream from string", async () => { + const string = "abc"; + const stream = Readable.from(string); + + for await (const chunk of stream) { + assertStrictEquals(chunk, string); + } +}); + +Deno.test("Readable stream from Buffer", async () => { + const string = "abc"; + const stream = Readable.from(Buffer.from(string)); + + for await (const chunk of stream) { + assertStrictEquals((chunk as Buffer).toString(), string); + } +}); + +Deno.test("Readable stream gets destroyed on error", async () => { + // deno-lint-ignore require-yield + async function* generate() { + throw new Error("kaboom"); + } + + const stream = Readable.from(generate()); + + stream.read(); + + const [err] = await once(stream, "error"); + assertStrictEquals(err.message, "kaboom"); + assertStrictEquals(stream.destroyed, true); +}); + +Deno.test("Readable stream works as Transform stream", async () => { + async function* generate(stream: Readable) { + for await (const chunk of stream) { + yield (chunk as string).toUpperCase(); + } + } + + const source = new Readable({ + objectMode: true, + read() { + this.push("a"); + this.push("b"); + this.push("c"); + this.push(null); + }, + }); + + const stream = Readable.from(generate(source)); + + const expected = ["A", "B", "C"]; + + for await (const chunk of stream) { + assertStrictEquals(chunk, expected.shift()); + } +}); + +Deno.test("Readable stream can be paused", () => { + const readable = new Readable(); + + // _read is a noop, here. + readable._read = () => {}; + + // Default state of a stream is not "paused" + assert(!readable.isPaused()); + + // Make the stream start flowing... + readable.on("data", () => {}); + + // still not paused. + assert(!readable.isPaused()); + + readable.pause(); + assert(readable.isPaused()); + readable.resume(); + assert(!readable.isPaused()); +}); + +Deno.test("Readable stream sets enconding correctly", () => { + const readable = new Readable({ + read() {}, + }); + + readable.setEncoding("utf8"); + + readable.push(new TextEncoder().encode("DEF")); + readable.unshift(new TextEncoder().encode("ABC")); + + assertStrictEquals(readable.read(), "ABCDEF"); +}); + +Deno.test("Readable stream sets encoding correctly", () => { + const readable = new Readable({ + read() {}, + }); + + readable.setEncoding("utf8"); + + readable.push(new TextEncoder().encode("DEF")); + readable.unshift(new TextEncoder().encode("ABC")); + + assertStrictEquals(readable.read(), "ABCDEF"); +}); + +Deno.test("Readable stream holds up a big push", async () => { + let readExecuted = 0; + const readExecutedExpected = 3; + const readExpectedExecutions = deferred(); + + let endExecuted = 0; + const endExecutedExpected = 1; + const endExpectedExecutions = deferred(); + + const str = "asdfasdfasdfasdfasdf"; + + const r = new Readable({ + highWaterMark: 5, + encoding: "utf8", + }); + + let reads = 0; + + function _read() { + if (reads === 0) { + setTimeout(() => { + r.push(str); + }, 1); + reads++; + } else if (reads === 1) { + const ret = r.push(str); + assertEquals(ret, false); + reads++; + } else { + r.push(null); + } + } + + r._read = () => { + readExecuted++; + if (readExecuted == readExecutedExpected) { + readExpectedExecutions.resolve(); + } + _read(); + }; + + r.on("end", () => { + endExecuted++; + if (endExecuted == endExecutedExpected) { + endExpectedExecutions.resolve(); + } + }); + + // Push some data in to start. + // We've never gotten any read event at this point. + const ret = r.push(str); + assert(!ret); + let chunk = r.read(); + assertEquals(chunk, str); + chunk = r.read(); + assertEquals(chunk, null); + + r.once("readable", () => { + // This time, we'll get *all* the remaining data, because + // it's been added synchronously, as the read WOULD take + // us below the hwm, and so it triggered a _read() again, + // which synchronously added more, which we then return. + chunk = r.read(); + assertEquals(chunk, str + str); + + chunk = r.read(); + assertEquals(chunk, null); + }); + + const readTimeout = setTimeout( + () => readExpectedExecutions.reject(), + 1000, + ); + const endTimeout = setTimeout( + () => endExpectedExecutions.reject(), + 1000, + ); + await readExpectedExecutions; + await endExpectedExecutions; + clearTimeout(readTimeout); + clearTimeout(endTimeout); + assertEquals(readExecuted, readExecutedExpected); + assertEquals(endExecuted, endExecutedExpected); +}); + +Deno.test("Readable stream: 'on' event", async () => { + async function* generate() { + yield "a"; + yield "b"; + yield "c"; + } + + const stream = Readable.from(generate()); + + let iterations = 0; + const expected = ["a", "b", "c"]; + + stream.on("data", (chunk) => { + iterations++; + assertStrictEquals(chunk, expected.shift()); + }); + + await once(stream, "end"); + + assertStrictEquals(iterations, 3); +}); + +Deno.test("Readable stream: 'data' event", async () => { + async function* generate() { + yield "a"; + yield "b"; + yield "c"; + } + + const stream = Readable.from(generate(), { objectMode: false }); + + let iterations = 0; + const expected = ["a", "b", "c"]; + + stream.on("data", (chunk) => { + iterations++; + assertStrictEquals(chunk instanceof Buffer, true); + assertStrictEquals(chunk.toString(), expected.shift()); + }); + + await once(stream, "end"); + + assertStrictEquals(iterations, 3); +}); + +Deno.test("Readable stream: 'data' event on non-object", async () => { + async function* generate() { + yield "a"; + yield "b"; + yield "c"; + } + + const stream = Readable.from(generate(), { objectMode: false }); + + let iterations = 0; + const expected = ["a", "b", "c"]; + + stream.on("data", (chunk) => { + iterations++; + assertStrictEquals(chunk instanceof Buffer, true); + assertStrictEquals(chunk.toString(), expected.shift()); + }); + + await once(stream, "end"); + + assertStrictEquals(iterations, 3); +}); + +Deno.test("Readable stream: 'readable' event is emitted but 'read' is not on highWaterMark length exceeded", async () => { + let readableExecuted = 0; + const readableExecutedExpected = 1; + const readableExpectedExecutions = deferred(); + + const r = new Readable({ + highWaterMark: 3, + }); + + r._read = () => { + throw new Error("_read must not be called"); + }; + r.push(Buffer.from("blerg")); + + setTimeout(function () { + assert(!r._readableState.reading); + r.on("readable", () => { + readableExecuted++; + if (readableExecuted == readableExecutedExpected) { + readableExpectedExecutions.resolve(); + } + }); + }, 1); + + const readableTimeout = setTimeout( + () => readableExpectedExecutions.reject(), + 1000, + ); + await readableExpectedExecutions; + clearTimeout(readableTimeout); + assertEquals(readableExecuted, readableExecutedExpected); +}); + +Deno.test("Readable stream: 'readable' and 'read' events are emitted on highWaterMark length not reached", async () => { + let readableExecuted = 0; + const readableExecutedExpected = 1; + const readableExpectedExecutions = deferred(); + + let readExecuted = 0; + const readExecutedExpected = 1; + const readExpectedExecutions = deferred(); + + const r = new Readable({ + highWaterMark: 3, + }); + + r._read = () => { + readExecuted++; + if (readExecuted == readExecutedExpected) { + readExpectedExecutions.resolve(); + } + }; + + r.push(Buffer.from("bl")); + + setTimeout(function () { + assert(r._readableState.reading); + r.on("readable", () => { + readableExecuted++; + if (readableExecuted == readableExecutedExpected) { + readableExpectedExecutions.resolve(); + } + }); + }, 1); + + const readableTimeout = setTimeout( + () => readableExpectedExecutions.reject(), + 1000, + ); + const readTimeout = setTimeout( + () => readExpectedExecutions.reject(), + 1000, + ); + await readableExpectedExecutions; + await readExpectedExecutions; + clearTimeout(readableTimeout); + clearTimeout(readTimeout); + assertEquals(readableExecuted, readableExecutedExpected); + assertEquals(readExecuted, readExecutedExpected); +}); + +Deno.test("Readable stream: 'readable' event is emitted but 'read' is not on highWaterMark length not reached and stream ended", async () => { + let readableExecuted = 0; + const readableExecutedExpected = 1; + const readableExpectedExecutions = deferred(); + + const r = new Readable({ + highWaterMark: 30, + }); + + r._read = () => { + throw new Error("Must not be executed"); + }; + + r.push(Buffer.from("blerg")); + //This ends the stream and triggers end + r.push(null); + + setTimeout(function () { + // Assert we're testing what we think we are + assert(!r._readableState.reading); + r.on("readable", () => { + readableExecuted++; + if (readableExecuted == readableExecutedExpected) { + readableExpectedExecutions.resolve(); + } + }); + }, 1); + + const readableTimeout = setTimeout( + () => readableExpectedExecutions.reject(), + 1000, + ); + await readableExpectedExecutions; + clearTimeout(readableTimeout); + assertEquals(readableExecuted, readableExecutedExpected); +}); + +Deno.test("Readable stream: 'read' is emitted on empty string pushed in non-object mode", async () => { + let endExecuted = 0; + const endExecutedExpected = 1; + const endExpectedExecutions = deferred(); + + const underlyingData = ["", "x", "y", "", "z"]; + const expected = underlyingData.filter((data) => data); + const result: unknown[] = []; + + const r = new Readable({ + encoding: "utf8", + }); + r._read = function () { + queueMicrotask(() => { + if (!underlyingData.length) { + this.push(null); + } else { + this.push(underlyingData.shift()); + } + }); + }; + + r.on("readable", () => { + const data = r.read(); + if (data !== null) result.push(data); + }); + + r.on("end", () => { + endExecuted++; + if (endExecuted == endExecutedExpected) { + endExpectedExecutions.resolve(); + } + assertEquals(result, expected); + }); + + const endTimeout = setTimeout( + () => endExpectedExecutions.reject(), + 1000, + ); + await endExpectedExecutions; + clearTimeout(endTimeout); + assertEquals(endExecuted, endExecutedExpected); +}); + +Deno.test("Readable stream: listeners can be removed", () => { + const r = new Readable(); + r._read = () => {}; + r.on("data", () => {}); + + r.removeAllListeners("data"); + + assertEquals(r.eventNames().length, 0); +}); diff --git a/std/node/_stream/stream.ts b/std/node/_stream/stream.ts new file mode 100644 index 000000000..708b8bcd3 --- /dev/null +++ b/std/node/_stream/stream.ts @@ -0,0 +1,79 @@ +// Copyright Node.js contributors. All rights reserved. MIT License. +import { Buffer } from "../buffer.ts"; +import EventEmitter from "../events.ts"; +import type Writable from "./writable.ts"; +import { types } from "../util.ts"; + +class Stream extends EventEmitter { + constructor() { + super(); + } + + static _isUint8Array = types.isUint8Array; + static _uint8ArrayToBuffer = (chunk: Uint8Array) => Buffer.from(chunk); + + pipe(dest: Writable, options: { end: boolean }) { + // deno-lint-ignore no-this-alias + const source = this; + + //TODO(Soremwar) + //isStdio exist on stdin || stdout only, which extend from Duplex + //if (!dest._isStdio && (options?.end ?? true)) { + //Find an alternative to be able to pipe streams to stdin & stdout + //Port them as well? + if (options?.end ?? true) { + source.on("end", onend); + source.on("close", onclose); + } + + let didOnEnd = false; + function onend() { + if (didOnEnd) return; + didOnEnd = true; + + dest.end(); + } + + function onclose() { + if (didOnEnd) return; + didOnEnd = true; + + if (typeof dest.destroy === "function") dest.destroy(); + } + + // Don't leave dangling pipes when there are errors. + function onerror(this: Stream, er: Error) { + cleanup(); + if (this.listenerCount("error") === 0) { + throw er; // Unhandled stream error in pipe. + } + } + + source.on("error", onerror); + dest.on("error", onerror); + + // Remove all the event listeners that were added. + function cleanup() { + source.removeListener("end", onend); + source.removeListener("close", onclose); + + source.removeListener("error", onerror); + dest.removeListener("error", onerror); + + source.removeListener("end", cleanup); + source.removeListener("close", cleanup); + + dest.removeListener("close", cleanup); + } + + source.on("end", cleanup); + source.on("close", cleanup); + + dest.on("close", cleanup); + dest.emit("pipe", source); + + return dest; + } +} + +export default Stream; diff --git a/std/node/_stream/symbols.ts b/std/node/_stream/symbols.ts new file mode 100644 index 000000000..addb969d3 --- /dev/null +++ b/std/node/_stream/symbols.ts @@ -0,0 +1,4 @@ +// Copyright Node.js contributors. All rights reserved. MIT License. +export const kConstruct = Symbol("kConstruct"); +export const kDestroy = Symbol("kDestroy"); +export const kPaused = Symbol("kPaused"); diff --git a/std/node/_stream/writable.ts b/std/node/_stream/writable.ts new file mode 100644 index 000000000..158af8325 --- /dev/null +++ b/std/node/_stream/writable.ts @@ -0,0 +1,952 @@ +// Copyright Node.js contributors. All rights reserved. MIT License. +import { Buffer } from "../buffer.ts"; +import Stream from "./stream.ts"; +import { captureRejectionSymbol } from "../events.ts"; +import { kConstruct, kDestroy } from "./symbols.ts"; +import { + ERR_INVALID_ARG_TYPE, + ERR_INVALID_OPT_VALUE, + ERR_METHOD_NOT_IMPLEMENTED, + ERR_MULTIPLE_CALLBACK, + ERR_STREAM_ALREADY_FINISHED, + ERR_STREAM_CANNOT_PIPE, + ERR_STREAM_DESTROYED, + ERR_STREAM_NULL_VALUES, + ERR_STREAM_WRITE_AFTER_END, + ERR_UNKNOWN_ENCODING, +} from "../_errors.ts"; + +function nop() {} + +//TODO(Soremwar) +//Bring in encodings +type write_v = ( + // deno-lint-ignore no-explicit-any + chunks: Array<{ chunk: any; encoding: string }>, + callback: (error?: Error | null) => void, +) => void; + +type AfterWriteTick = { + cb: (error?: Error) => void; + count: number; + state: WritableState; + stream: Writable; +}; + +const kOnFinished = Symbol("kOnFinished"); + +function destroy(this: Writable, err?: Error, cb?: () => void) { + const w = this._writableState; + + if (w.destroyed) { + if (typeof cb === "function") { + cb(); + } + + return this; + } + + if (err) { + // Avoid V8 leak, https://github.com/nodejs/node/pull/34103#issuecomment-652002364 + err.stack; + + if (!w.errored) { + w.errored = err; + } + } + + w.destroyed = true; + + if (!w.constructed) { + this.once(kDestroy, (er) => { + _destroy(this, err || er, cb); + }); + } else { + _destroy(this, err, cb); + } + + return this; +} + +function _destroy( + self: Writable, + err?: Error, + cb?: (error?: Error | null) => void, +) { + self._destroy(err || null, (err) => { + const w = self._writableState; + + if (err) { + // Avoid V8 leak, https://github.com/nodejs/node/pull/34103#issuecomment-652002364 + err.stack; + + if (!w.errored) { + w.errored = err; + } + } + + w.closed = true; + + if (typeof cb === "function") { + cb(err); + } + + if (err) { + queueMicrotask(() => { + if (!w.errorEmitted) { + w.errorEmitted = true; + self.emit("error", err); + } + w.closeEmitted = true; + if (w.emitClose) { + self.emit("close"); + } + }); + } else { + queueMicrotask(() => { + w.closeEmitted = true; + if (w.emitClose) { + self.emit("close"); + } + }); + } + }); +} + +function errorOrDestroy(stream: Writable, err: Error, sync = false) { + const w = stream._writableState; + + if (w.destroyed) { + return stream; + } + + if (w.autoDestroy) { + stream.destroy(err); + } else if (err) { + // Avoid V8 leak, https://github.com/nodejs/node/pull/34103#issuecomment-652002364 + err.stack; + + if (!w.errored) { + w.errored = err; + } + if (sync) { + queueMicrotask(() => { + if (w.errorEmitted) { + return; + } + w.errorEmitted = true; + stream.emit("error", err); + }); + } else { + if (w.errorEmitted) { + return; + } + w.errorEmitted = true; + stream.emit("error", err); + } + } +} + +function construct(stream: Writable, cb: (error: Error) => void) { + if (!stream._construct) { + return; + } + + stream.once(kConstruct, cb); + const w = stream._writableState; + + w.constructed = false; + + queueMicrotask(() => { + let called = false; + stream._construct?.((err) => { + w.constructed = true; + + if (called) { + err = new ERR_MULTIPLE_CALLBACK(); + } else { + called = true; + } + + if (w.destroyed) { + stream.emit(kDestroy, err); + } else if (err) { + errorOrDestroy(stream, err, true); + } else { + queueMicrotask(() => { + stream.emit(kConstruct); + }); + } + }); + }); +} + +//TODO(Soremwar) +//Bring encodings in +function writeOrBuffer( + stream: Writable, + state: WritableState, + // deno-lint-ignore no-explicit-any + chunk: any, + encoding: string, + callback: (error: Error) => void, +) { + const len = state.objectMode ? 1 : chunk.length; + + state.length += len; + + if (state.writing || state.corked || state.errored || !state.constructed) { + state.buffered.push({ chunk, encoding, callback }); + if (state.allBuffers && encoding !== "buffer") { + state.allBuffers = false; + } + if (state.allNoop && callback !== nop) { + state.allNoop = false; + } + } else { + state.writelen = len; + state.writecb = callback; + state.writing = true; + state.sync = true; + stream._write(chunk, encoding, state.onwrite); + state.sync = false; + } + + const ret = state.length < state.highWaterMark; + + if (!ret) { + state.needDrain = true; + } + + return ret && !state.errored && !state.destroyed; +} + +//TODO(Soremwar) +//Bring encodings in +function doWrite( + stream: Writable, + state: WritableState, + writev: boolean, + len: number, + // deno-lint-ignore no-explicit-any + chunk: any, + encoding: string, + cb: (error: Error) => void, +) { + state.writelen = len; + state.writecb = cb; + state.writing = true; + state.sync = true; + if (state.destroyed) { + state.onwrite(new ERR_STREAM_DESTROYED("write")); + } else if (writev) { + (stream._writev as unknown as write_v)(chunk, state.onwrite); + } else { + stream._write(chunk, encoding, state.onwrite); + } + state.sync = false; +} + +function onwriteError( + stream: Writable, + state: WritableState, + er: Error, + cb: (error: Error) => void, +) { + --state.pendingcb; + + cb(er); + errorBuffer(state); + errorOrDestroy(stream, er); +} + +function onwrite(stream: Writable, er?: Error | null) { + const state = stream._writableState; + const sync = state.sync; + const cb = state.writecb; + + if (typeof cb !== "function") { + errorOrDestroy(stream, new ERR_MULTIPLE_CALLBACK()); + return; + } + + state.writing = false; + state.writecb = null; + state.length -= state.writelen; + state.writelen = 0; + + if (er) { + // Avoid V8 leak, https://github.com/nodejs/node/pull/34103#issuecomment-652002364 + er.stack; + + if (!state.errored) { + state.errored = er; + } + + if (sync) { + queueMicrotask(() => onwriteError(stream, state, er, cb)); + } else { + onwriteError(stream, state, er, cb); + } + } else { + if (state.buffered.length > state.bufferedIndex) { + clearBuffer(stream, state); + } + + if (sync) { + if ( + state.afterWriteTickInfo !== null && + state.afterWriteTickInfo.cb === cb + ) { + state.afterWriteTickInfo.count++; + } else { + state.afterWriteTickInfo = { + count: 1, + cb: (cb as (error?: Error) => void), + stream, + state, + }; + queueMicrotask(() => + afterWriteTick(state.afterWriteTickInfo as AfterWriteTick) + ); + } + } else { + afterWrite(stream, state, 1, cb as (error?: Error) => void); + } + } +} + +function afterWriteTick({ + cb, + count, + state, + stream, +}: AfterWriteTick) { + state.afterWriteTickInfo = null; + return afterWrite(stream, state, count, cb); +} + +function afterWrite( + stream: Writable, + state: WritableState, + count: number, + cb: (error?: Error) => void, +) { + const needDrain = !state.ending && !stream.destroyed && state.length === 0 && + state.needDrain; + if (needDrain) { + state.needDrain = false; + stream.emit("drain"); + } + + while (count-- > 0) { + state.pendingcb--; + cb(); + } + + if (state.destroyed) { + errorBuffer(state); + } + + finishMaybe(stream, state); +} + +/** If there's something in the buffer waiting, then invoke callbacks.*/ +function errorBuffer(state: WritableState) { + if (state.writing) { + return; + } + + for (let n = state.bufferedIndex; n < state.buffered.length; ++n) { + const { chunk, callback } = state.buffered[n]; + const len = state.objectMode ? 1 : chunk.length; + state.length -= len; + callback(new ERR_STREAM_DESTROYED("write")); + } + + for (const callback of state[kOnFinished].splice(0)) { + callback(new ERR_STREAM_DESTROYED("end")); + } + + resetBuffer(state); +} + +/** If there's something in the buffer waiting, then process it.*/ +function clearBuffer(stream: Writable, state: WritableState) { + if ( + state.corked || + state.bufferProcessing || + state.destroyed || + !state.constructed + ) { + return; + } + + const { buffered, bufferedIndex, objectMode } = state; + const bufferedLength = buffered.length - bufferedIndex; + + if (!bufferedLength) { + return; + } + + const i = bufferedIndex; + + state.bufferProcessing = true; + if (bufferedLength > 1 && stream._writev) { + state.pendingcb -= bufferedLength - 1; + + const callback = state.allNoop ? nop : (err: Error) => { + for (let n = i; n < buffered.length; ++n) { + buffered[n].callback(err); + } + }; + const chunks = state.allNoop && i === 0 ? buffered : buffered.slice(i); + + doWrite(stream, state, true, state.length, chunks, "", callback); + + resetBuffer(state); + } else { + do { + const { chunk, encoding, callback } = buffered[i]; + const len = objectMode ? 1 : chunk.length; + doWrite(stream, state, false, len, chunk, encoding, callback); + } while (i < buffered.length && !state.writing); + + if (i === buffered.length) { + resetBuffer(state); + } else if (i > 256) { + buffered.splice(0, i); + state.bufferedIndex = 0; + } else { + state.bufferedIndex = i; + } + } + state.bufferProcessing = false; +} + +function finish(stream: Writable, state: WritableState) { + state.pendingcb--; + if (state.errorEmitted || state.closeEmitted) { + return; + } + + state.finished = true; + + for (const callback of state[kOnFinished].splice(0)) { + callback(); + } + + stream.emit("finish"); + + if (state.autoDestroy) { + stream.destroy(); + } +} + +function finishMaybe(stream: Writable, state: WritableState, sync?: boolean) { + if (needFinish(state)) { + prefinish(stream, state); + if (state.pendingcb === 0 && needFinish(state)) { + state.pendingcb++; + if (sync) { + queueMicrotask(() => finish(stream, state)); + } else { + finish(stream, state); + } + } + } +} + +function prefinish(stream: Writable, state: WritableState) { + if (!state.prefinished && !state.finalCalled) { + if (typeof stream._final === "function" && !state.destroyed) { + state.finalCalled = true; + + state.sync = true; + state.pendingcb++; + stream._final((err) => { + state.pendingcb--; + if (err) { + for (const callback of state[kOnFinished].splice(0)) { + callback(err); + } + errorOrDestroy(stream, err, state.sync); + } else if (needFinish(state)) { + state.prefinished = true; + stream.emit("prefinish"); + state.pendingcb++; + queueMicrotask(() => finish(stream, state)); + } + }); + state.sync = false; + } else { + state.prefinished = true; + stream.emit("prefinish"); + } + } +} + +function needFinish(state: WritableState) { + return (state.ending && + state.constructed && + state.length === 0 && + !state.errored && + state.buffered.length === 0 && + !state.finished && + !state.writing); +} + +interface WritableOptions { + autoDestroy?: boolean; + decodeStrings?: boolean; + //TODO(Soremwar) + //Bring encodings in + defaultEncoding?: string; + destroy?( + this: Writable, + error: Error | null, + callback: (error: Error | null) => void, + ): void; + emitClose?: boolean; + final?(this: Writable, callback: (error?: Error | null) => void): void; + highWaterMark?: number; + objectMode?: boolean; + //TODO(Soremwar) + //Bring encodings in + write?( + this: Writable, + // deno-lint-ignore no-explicit-any + chunk: any, + encoding: string, + callback: (error?: Error | null) => void, + ): void; + //TODO(Soremwar) + //Bring encodings in + writev?( + this: Writable, + // deno-lint-ignore no-explicit-any + chunks: Array<{ chunk: any; encoding: string }>, + callback: (error?: Error | null) => void, + ): void; +} + +class WritableState { + [kOnFinished]: Array<(error?: Error) => void> = []; + afterWriteTickInfo: null | AfterWriteTick = null; + allBuffers = true; + allNoop = true; + autoDestroy: boolean; + //TODO(Soremwar) + //Bring in encodings + buffered: Array<{ + allBuffers?: boolean; + // deno-lint-ignore no-explicit-any + chunk: any; + encoding: string; + callback: (error: Error) => void; + }> = []; + bufferedIndex = 0; + bufferProcessing = false; + closed = false; + closeEmitted = false; + constructed: boolean; + corked = 0; + decodeStrings: boolean; + defaultEncoding: string; + destroyed = false; + emitClose: boolean; + ended = false; + ending = false; + errored: Error | null = null; + errorEmitted = false; + finalCalled = false; + finished = false; + highWaterMark: number; + length = 0; + needDrain = false; + objectMode: boolean; + onwrite: (error?: Error | null) => void; + pendingcb = 0; + prefinished = false; + sync = true; + writecb: null | ((error: Error) => void) = null; + writable = true; + writelen = 0; + writing = false; + + constructor(options: WritableOptions | undefined, stream: Writable) { + this.objectMode = !!options?.objectMode; + + this.highWaterMark = options?.highWaterMark ?? + (this.objectMode ? 16 : 16 * 1024); + + if (Number.isInteger(this.highWaterMark) && this.highWaterMark >= 0) { + this.highWaterMark = Math.floor(this.highWaterMark); + } else { + throw new ERR_INVALID_OPT_VALUE("highWaterMark", this.highWaterMark); + } + + this.decodeStrings = !options?.decodeStrings === false; + + this.defaultEncoding = options?.defaultEncoding || "utf8"; + + this.onwrite = onwrite.bind(undefined, stream); + + resetBuffer(this); + + this.emitClose = options?.emitClose ?? true; + this.autoDestroy = options?.autoDestroy ?? true; + this.constructed = true; + } + + getBuffer() { + return this.buffered.slice(this.bufferedIndex); + } + + get bufferedRequestCount() { + return this.buffered.length - this.bufferedIndex; + } +} + +function resetBuffer(state: WritableState) { + state.buffered = []; + state.bufferedIndex = 0; + state.allBuffers = true; + state.allNoop = true; +} + +/** A bit simpler than readable streams. +* Implement an async `._write(chunk, encoding, cb)`, and it'll handle all +* the drain event emission and buffering. +*/ +class Writable extends Stream { + _construct?: (cb: (error?: Error) => void) => void; + _final?: ( + this: Writable, + callback: (error?: Error | null | undefined) => void, + ) => void; + _writableState: WritableState; + _writev?: write_v | null = null; + + constructor(options?: WritableOptions) { + super(); + this._writableState = new WritableState(options, this); + + if (options) { + if (typeof options.write === "function") { + this._write = options.write; + } + + if (typeof options.writev === "function") { + this._writev = options.writev; + } + + if (typeof options.destroy === "function") { + this._destroy = options.destroy; + } + + if (typeof options.final === "function") { + this._final = options.final; + } + } + + construct(this, () => { + const state = this._writableState; + + if (!state.writing) { + clearBuffer(this, state); + } + + finishMaybe(this, state); + }); + } + + [captureRejectionSymbol](err?: Error) { + this.destroy(err); + } + + static WritableState = WritableState; + + get destroyed() { + return this._writableState ? this._writableState.destroyed : false; + } + + set destroyed(value) { + if (this._writableState) { + this._writableState.destroyed = value; + } + } + + get writable() { + const w = this._writableState; + return !w.destroyed && !w.errored && !w.ending && !w.ended; + } + + set writable(val) { + if (this._writableState) { + this._writableState.writable = !!val; + } + } + + get writableFinished() { + return this._writableState ? this._writableState.finished : false; + } + + get writableObjectMode() { + return this._writableState ? this._writableState.objectMode : false; + } + + get writableBuffer() { + return this._writableState && this._writableState.getBuffer(); + } + + get writableEnded() { + return this._writableState ? this._writableState.ending : false; + } + + get writableHighWaterMark() { + return this._writableState && this._writableState.highWaterMark; + } + + get writableCorked() { + return this._writableState ? this._writableState.corked : 0; + } + + get writableLength() { + return this._writableState && this._writableState.length; + } + + _undestroy() { + const w = this._writableState; + w.constructed = true; + w.destroyed = false; + w.closed = false; + w.closeEmitted = false; + w.errored = null; + w.errorEmitted = false; + w.ended = false; + w.ending = false; + w.finalCalled = false; + w.prefinished = false; + w.finished = false; + } + + _destroy(err: Error | null, cb: (error?: Error | null) => void) { + cb(err); + } + + destroy(err?: Error, cb?: () => void) { + const state = this._writableState; + if (!state.destroyed) { + queueMicrotask(() => errorBuffer(state)); + } + destroy.call(this, err, cb); + return this; + } + + end(cb?: () => void): void; + // deno-lint-ignore no-explicit-any + end(chunk: any, cb?: () => void): void; + //TODO(Soremwar) + //Bring in encodings + // deno-lint-ignore no-explicit-any + end(chunk: any, encoding: string, cb?: () => void): void; + + end( + // deno-lint-ignore no-explicit-any + x?: any | (() => void), + //TODO(Soremwar) + //Bring in encodings + y?: string | (() => void), + z?: () => void, + ) { + const state = this._writableState; + // deno-lint-ignore no-explicit-any + let chunk: any | null; + //TODO(Soremwar) + //Bring in encodings + let encoding: string | null; + let cb: undefined | ((error?: Error) => void); + + if (typeof x === "function") { + chunk = null; + encoding = null; + cb = x; + } else if (typeof y === "function") { + chunk = x; + encoding = null; + cb = y; + } else { + chunk = x; + encoding = y as string; + cb = z; + } + + if (chunk !== null && chunk !== undefined) { + this.write(chunk, encoding); + } + + if (state.corked) { + state.corked = 1; + this.uncork(); + } + + let err: Error | undefined; + if (!state.errored && !state.ending) { + state.ending = true; + finishMaybe(this, state, true); + state.ended = true; + } else if (state.finished) { + err = new ERR_STREAM_ALREADY_FINISHED("end"); + } else if (state.destroyed) { + err = new ERR_STREAM_DESTROYED("end"); + } + + if (typeof cb === "function") { + if (err || state.finished) { + queueMicrotask(() => { + (cb as (error?: Error | undefined) => void)(err); + }); + } else { + state[kOnFinished].push(cb); + } + } + + return this; + } + + //TODO(Soremwar) + //Bring in encodings + _write( + // deno-lint-ignore no-explicit-any + chunk: any, + encoding: string, + cb: (error?: Error | null) => void, + ): void { + if (this._writev) { + this._writev([{ chunk, encoding }], cb); + } else { + throw new ERR_METHOD_NOT_IMPLEMENTED("_write()"); + } + } + + //This signature was changed to keep inheritance coherent + pipe(dest: Writable): Writable { + errorOrDestroy(this, new ERR_STREAM_CANNOT_PIPE()); + return dest; + } + + // deno-lint-ignore no-explicit-any + write(chunk: any, cb?: (error: Error | null | undefined) => void): boolean; + //TODO(Soremwar) + //Bring in encodings + write( + // deno-lint-ignore no-explicit-any + chunk: any, + encoding: string | null, + cb?: (error: Error | null | undefined) => void, + ): boolean; + + //TODO(Soremwar) + //Bring in encodings + write( + // deno-lint-ignore no-explicit-any + chunk: any, + x?: string | null | ((error: Error | null | undefined) => void), + y?: ((error: Error | null | undefined) => void), + ) { + const state = this._writableState; + //TODO(Soremwar) + //Bring in encodings + let encoding: string; + let cb: (error?: Error | null) => void; + + if (typeof x === "function") { + cb = x; + encoding = state.defaultEncoding; + } else { + if (!x) { + encoding = state.defaultEncoding; + } else if (x !== "buffer" && !Buffer.isEncoding(x)) { + throw new ERR_UNKNOWN_ENCODING(x); + } else { + encoding = x; + } + if (typeof y !== "function") { + cb = nop; + } else { + cb = y; + } + } + + if (chunk === null) { + throw new ERR_STREAM_NULL_VALUES(); + } else if (!state.objectMode) { + if (typeof chunk === "string") { + if (state.decodeStrings !== false) { + chunk = Buffer.from(chunk, encoding); + encoding = "buffer"; + } + } else if (chunk instanceof Buffer) { + encoding = "buffer"; + } else if (Stream._isUint8Array(chunk)) { + chunk = Stream._uint8ArrayToBuffer(chunk); + encoding = "buffer"; + } else { + throw new ERR_INVALID_ARG_TYPE( + "chunk", + ["string", "Buffer", "Uint8Array"], + chunk, + ); + } + } + + let err: Error | undefined; + if (state.ending) { + err = new ERR_STREAM_WRITE_AFTER_END(); + } else if (state.destroyed) { + err = new ERR_STREAM_DESTROYED("write"); + } + + if (err) { + queueMicrotask(() => cb(err)); + errorOrDestroy(this, err, true); + return false; + } + state.pendingcb++; + return writeOrBuffer(this, state, chunk, encoding, cb); + } + + cork() { + this._writableState.corked++; + } + + uncork() { + const state = this._writableState; + + if (state.corked) { + state.corked--; + + if (!state.writing) { + clearBuffer(this, state); + } + } + } + + //TODO(Soremwar) + //Bring allowed encodings + setDefaultEncoding(encoding: string) { + // node::ParseEncoding() requires lower case. + if (typeof encoding === "string") { + encoding = encoding.toLowerCase(); + } + if (!Buffer.isEncoding(encoding)) { + throw new ERR_UNKNOWN_ENCODING(encoding); + } + this._writableState.defaultEncoding = encoding; + return this; + } +} + +export default Writable; +export { WritableState }; diff --git a/std/node/_stream/writable_test.ts b/std/node/_stream/writable_test.ts new file mode 100644 index 000000000..d0650c49e --- /dev/null +++ b/std/node/_stream/writable_test.ts @@ -0,0 +1,209 @@ +// Copyright Node.js contributors. All rights reserved. MIT License. +import { Buffer } from "../buffer.ts"; +import finished from "./end-of-stream.ts"; +import Writable from "../_stream/writable.ts"; +import { deferred } from "../../async/mod.ts"; +import { + assert, + assertEquals, + assertStrictEquals, + assertThrows, +} from "../../testing/asserts.ts"; + +Deno.test("Writable stream writes correctly", async () => { + let callback: undefined | ((error?: Error | null | undefined) => void); + + let writeExecuted = 0; + const writeExecutedExpected = 1; + const writeExpectedExecutions = deferred(); + + let writevExecuted = 0; + const writevExecutedExpected = 1; + const writevExpectedExecutions = deferred(); + + const writable = new Writable({ + write: (chunk, encoding, cb) => { + writeExecuted++; + if (writeExecuted == writeExecutedExpected) { + writeExpectedExecutions.resolve(); + } + assert(chunk instanceof Buffer); + assertStrictEquals(encoding, "buffer"); + assertStrictEquals(String(chunk), "ABC"); + callback = cb; + }, + writev: (chunks) => { + writevExecuted++; + if (writevExecuted == writevExecutedExpected) { + writevExpectedExecutions.resolve(); + } + assertStrictEquals(chunks.length, 2); + assertStrictEquals(chunks[0].encoding, "buffer"); + assertStrictEquals(chunks[1].encoding, "buffer"); + assertStrictEquals(chunks[0].chunk + chunks[1].chunk, "DEFGHI"); + }, + }); + + writable.write(new TextEncoder().encode("ABC")); + writable.write(new TextEncoder().encode("DEF")); + writable.end(new TextEncoder().encode("GHI")); + callback?.(); + + const writeTimeout = setTimeout( + () => writeExpectedExecutions.reject(), + 1000, + ); + const writevTimeout = setTimeout( + () => writevExpectedExecutions.reject(), + 1000, + ); + await writeExpectedExecutions; + await writevExpectedExecutions; + clearTimeout(writeTimeout); + clearTimeout(writevTimeout); + assertEquals(writeExecuted, writeExecutedExpected); + assertEquals(writevExecuted, writevExecutedExpected); +}); + +Deno.test("Writable stream writes Uint8Array in object mode", async () => { + let writeExecuted = 0; + const writeExecutedExpected = 1; + const writeExpectedExecutions = deferred(); + + const ABC = new TextEncoder().encode("ABC"); + + const writable = new Writable({ + objectMode: true, + write: (chunk, encoding, cb) => { + writeExecuted++; + if (writeExecuted == writeExecutedExpected) { + writeExpectedExecutions.resolve(); + } + assert(!(chunk instanceof Buffer)); + assert(chunk instanceof Uint8Array); + assertEquals(chunk, ABC); + assertEquals(encoding, "utf8"); + cb(); + }, + }); + + writable.end(ABC); + + const writeTimeout = setTimeout( + () => writeExpectedExecutions.reject(), + 1000, + ); + await writeExpectedExecutions; + clearTimeout(writeTimeout); + assertEquals(writeExecuted, writeExecutedExpected); +}); + +Deno.test("Writable stream throws on unexpected close", async () => { + let finishedExecuted = 0; + const finishedExecutedExpected = 1; + const finishedExpectedExecutions = deferred(); + + const writable = new Writable({ + write: () => {}, + }); + writable.writable = false; + writable.destroy(); + + finished(writable, (err) => { + finishedExecuted++; + if (finishedExecuted == finishedExecutedExpected) { + finishedExpectedExecutions.resolve(); + } + assertEquals(err?.code, "ERR_STREAM_PREMATURE_CLOSE"); + }); + + const finishedTimeout = setTimeout( + () => finishedExpectedExecutions.reject(), + 1000, + ); + await finishedExpectedExecutions; + clearTimeout(finishedTimeout); + assertEquals(finishedExecuted, finishedExecutedExpected); +}); + +Deno.test("Writable stream finishes correctly", async () => { + let finishedExecuted = 0; + const finishedExecutedExpected = 1; + const finishedExpectedExecutions = deferred(); + + const w = new Writable({ + write(_chunk, _encoding, cb) { + cb(); + }, + autoDestroy: false, + }); + + w.end("asd"); + + queueMicrotask(() => { + finished(w, () => { + finishedExecuted++; + if (finishedExecuted == finishedExecutedExpected) { + finishedExpectedExecutions.resolve(); + } + }); + }); + + const finishedTimeout = setTimeout( + () => finishedExpectedExecutions.reject(), + 1000, + ); + await finishedExpectedExecutions; + clearTimeout(finishedTimeout); + assertEquals(finishedExecuted, finishedExecutedExpected); +}); + +Deno.test("Writable stream finishes correctly after error", async () => { + let errorExecuted = 0; + const errorExecutedExpected = 1; + const errorExpectedExecutions = deferred(); + + let finishedExecuted = 0; + const finishedExecutedExpected = 1; + const finishedExpectedExecutions = deferred(); + + const w = new Writable({ + write(_chunk, _encoding, cb) { + cb(new Error()); + }, + autoDestroy: false, + }); + w.write("asd"); + w.on("error", () => { + errorExecuted++; + if (errorExecuted == errorExecutedExpected) { + errorExpectedExecutions.resolve(); + } + finished(w, () => { + finishedExecuted++; + if (finishedExecuted == finishedExecutedExpected) { + finishedExpectedExecutions.resolve(); + } + }); + }); + + const errorTimeout = setTimeout( + () => errorExpectedExecutions.reject(), + 1000, + ); + const finishedTimeout = setTimeout( + () => finishedExpectedExecutions.reject(), + 1000, + ); + await finishedExpectedExecutions; + await errorExpectedExecutions; + clearTimeout(finishedTimeout); + clearTimeout(errorTimeout); + assertEquals(finishedExecuted, finishedExecutedExpected); + assertEquals(errorExecuted, errorExecutedExpected); +}); + +Deno.test("Writable stream fails on 'write' null value", () => { + const writable = new Writable(); + assertThrows(() => writable.write(null)); +}); diff --git a/std/node/_util.ts b/std/node/_util.ts index b21743541..78a9e1d8b 100644 --- a/std/node/_util.ts +++ b/std/node/_util.ts @@ -1,15 +1,11 @@ // Copyright 2018-2020 the Deno authors. All rights reserved. MIT license. export { promisify } from "./_util/_util_promisify.ts"; export { callbackify } from "./_util/_util_callbackify.ts"; -import { codes, errorMap } from "./_errors.ts"; +import { ERR_INVALID_ARG_TYPE, ERR_OUT_OF_RANGE, errorMap } from "./_errors.ts"; import * as types from "./_util/_util_types.ts"; export { types }; const NumberIsSafeInteger = Number.isSafeInteger; -const { - ERR_OUT_OF_RANGE, - ERR_INVALID_ARG_TYPE, -} = codes; const DEFAULT_INSPECT_OPTIONS = { showHidden: false, diff --git a/std/node/_utils.ts b/std/node/_utils.ts index b2745bf5a..cb91fac27 100644 --- a/std/node/_utils.ts +++ b/std/node/_utils.ts @@ -131,3 +131,15 @@ export function validateIntegerRange( ); } } + +type OptionalSpread<T> = T extends undefined ? [] + : [T]; + +export function once(callback: (...args: OptionalSpread<undefined>) => void) { + let called = false; + return function (this: unknown, ...args: OptionalSpread<undefined>) { + if (called) return; + called = true; + callback.apply(this, args); + }; +} diff --git a/std/node/assertion_error.ts b/std/node/assertion_error.ts index 09e882f01..14b55156a 100644 --- a/std/node/assertion_error.ts +++ b/std/node/assertion_error.ts @@ -44,8 +44,7 @@ const { keys: ObjectKeys, } = Object; -import { codes } from "./_errors.ts"; -const { ERR_INVALID_ARG_TYPE } = codes; +import { ERR_INVALID_ARG_TYPE } from "./_errors.ts"; let blue = ""; let green = ""; |