diff options
Diffstat (limited to 'ext/node/polyfills/internal/stream_base_commons.ts')
-rw-r--r-- | ext/node/polyfills/internal/stream_base_commons.ts | 355 |
1 files changed, 355 insertions, 0 deletions
diff --git a/ext/node/polyfills/internal/stream_base_commons.ts b/ext/node/polyfills/internal/stream_base_commons.ts new file mode 100644 index 000000000..dd1c74d0f --- /dev/null +++ b/ext/node/polyfills/internal/stream_base_commons.ts @@ -0,0 +1,355 @@ +// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license. +// Copyright Joyent, Inc. and other Node contributors. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the +// "Software"), to deal in the Software without restriction, including +// without limitation the rights to use, copy, modify, merge, publish, +// distribute, sublicense, and/or sell copies of the Software, and to permit +// persons to whom the Software is furnished to do so, subject to the +// following conditions: +// +// The above copyright notice and this permission notice shall be included +// in all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF +// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN +// NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, +// DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR +// OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE +// USE OR OTHER DEALINGS IN THE SOFTWARE. + +import { ownerSymbol } from "internal:deno_node/polyfills/internal/async_hooks.ts"; +import { + kArrayBufferOffset, + kBytesWritten, + kLastWriteWasAsync, + LibuvStreamWrap, + streamBaseState, + WriteWrap, +} from "internal:deno_node/polyfills/internal_binding/stream_wrap.ts"; +import { isUint8Array } from "internal:deno_node/polyfills/internal/util/types.ts"; +import { errnoException } from "internal:deno_node/polyfills/internal/errors.ts"; +import { + getTimerDuration, + kTimeout, +} from "internal:deno_node/polyfills/internal/timers.mjs"; +import { setUnrefTimeout } from "internal:deno_node/polyfills/timers.ts"; +import { validateFunction } from "internal:deno_node/polyfills/internal/validators.mjs"; +import { codeMap } from "internal:deno_node/polyfills/internal_binding/uv.ts"; +import { Buffer } from "internal:deno_node/polyfills/buffer.ts"; + +export const kMaybeDestroy = Symbol("kMaybeDestroy"); +export const kUpdateTimer = Symbol("kUpdateTimer"); +export const kAfterAsyncWrite = Symbol("kAfterAsyncWrite"); +export const kHandle = Symbol("kHandle"); +export const kSession = Symbol("kSession"); +export const kBuffer = Symbol("kBuffer"); +export const kBufferGen = Symbol("kBufferGen"); +export const kBufferCb = Symbol("kBufferCb"); + +// deno-lint-ignore no-explicit-any +function handleWriteReq(req: any, data: any, encoding: string) { + const { handle } = req; + + switch (encoding) { + case "buffer": { + const ret = handle.writeBuffer(req, data); + + if (streamBaseState[kLastWriteWasAsync]) { + req.buffer = data; + } + + return ret; + } + case "latin1": + case "binary": + return handle.writeLatin1String(req, data); + case "utf8": + case "utf-8": + return handle.writeUtf8String(req, data); + case "ascii": + return handle.writeAsciiString(req, data); + case "ucs2": + case "ucs-2": + case "utf16le": + case "utf-16le": + return handle.writeUcs2String(req, data); + default: { + const buffer = Buffer.from(data, encoding); + const ret = handle.writeBuffer(req, buffer); + + if (streamBaseState[kLastWriteWasAsync]) { + req.buffer = buffer; + } + + return ret; + } + } +} + +// deno-lint-ignore no-explicit-any +function onWriteComplete(this: any, status: number) { + let stream = this.handle[ownerSymbol]; + + if (stream.constructor.name === "ReusedHandle") { + stream = stream.handle; + } + + if (stream.destroyed) { + if (typeof this.callback === "function") { + this.callback(null); + } + + return; + } + + if (status < 0) { + const ex = errnoException(status, "write", this.error); + + if (typeof this.callback === "function") { + this.callback(ex); + } else { + stream.destroy(ex); + } + + return; + } + + stream[kUpdateTimer](); + stream[kAfterAsyncWrite](this); + + if (typeof this.callback === "function") { + this.callback(null); + } +} + +function createWriteWrap( + handle: LibuvStreamWrap, + callback: (err?: Error | null) => void, +) { + const req = new WriteWrap<LibuvStreamWrap>(); + + req.handle = handle; + req.oncomplete = onWriteComplete; + req.async = false; + req.bytes = 0; + req.buffer = null; + req.callback = callback; + + return req; +} + +export function writevGeneric( + // deno-lint-ignore no-explicit-any + owner: any, + // deno-lint-ignore no-explicit-any + data: any, + cb: (err?: Error | null) => void, +) { + const req = createWriteWrap(owner[kHandle], cb); + const allBuffers = data.allBuffers; + let chunks; + + if (allBuffers) { + chunks = data; + + for (let i = 0; i < data.length; i++) { + data[i] = data[i].chunk; + } + } else { + chunks = new Array(data.length << 1); + + for (let i = 0; i < data.length; i++) { + const entry = data[i]; + chunks[i * 2] = entry.chunk; + chunks[i * 2 + 1] = entry.encoding; + } + } + + const err = req.handle.writev(req, chunks, allBuffers); + + // Retain chunks + if (err === 0) { + req._chunks = chunks; + } + + afterWriteDispatched(req, err, cb); + + return req; +} + +export function writeGeneric( + // deno-lint-ignore no-explicit-any + owner: any, + // deno-lint-ignore no-explicit-any + data: any, + encoding: string, + cb: (err?: Error | null) => void, +) { + const req = createWriteWrap(owner[kHandle], cb); + const err = handleWriteReq(req, data, encoding); + + afterWriteDispatched(req, err, cb); + + return req; +} + +function afterWriteDispatched( + // deno-lint-ignore no-explicit-any + req: any, + err: number, + cb: (err?: Error | null) => void, +) { + req.bytes = streamBaseState[kBytesWritten]; + req.async = !!streamBaseState[kLastWriteWasAsync]; + + if (err !== 0) { + return cb(errnoException(err, "write", req.error)); + } + + if (!req.async && typeof req.callback === "function") { + req.callback(); + } +} + +// Here we differ from Node slightly. Node makes use of the `kReadBytesOrError` +// entry of the `streamBaseState` array from the `stream_wrap` internal binding. +// Here we pass the `nread` value directly to this method as async Deno APIs +// don't grant us the ability to rely on some mutable array entry setting. +export function onStreamRead( + // deno-lint-ignore no-explicit-any + this: any, + arrayBuffer: Uint8Array, + nread: number, +) { + // deno-lint-ignore no-this-alias + const handle = this; + + let stream = this[ownerSymbol]; + + if (stream.constructor.name === "ReusedHandle") { + stream = stream.handle; + } + + stream[kUpdateTimer](); + + if (nread > 0 && !stream.destroyed) { + let ret; + let result; + const userBuf = stream[kBuffer]; + + if (userBuf) { + result = stream[kBufferCb](nread, userBuf) !== false; + const bufGen = stream[kBufferGen]; + + if (bufGen !== null) { + const nextBuf = bufGen(); + + if (isUint8Array(nextBuf)) { + stream[kBuffer] = ret = nextBuf; + } + } + } else { + const offset = streamBaseState[kArrayBufferOffset]; + const buf = Buffer.from(arrayBuffer, offset, nread); + result = stream.push(buf); + } + + if (!result) { + handle.reading = false; + + if (!stream.destroyed) { + const err = handle.readStop(); + + if (err) { + stream.destroy(errnoException(err, "read")); + } + } + } + + return ret; + } + + if (nread === 0) { + return; + } + + if (nread !== codeMap.get("EOF")) { + // CallJSOnreadMethod expects the return value to be a buffer. + // Ref: https://github.com/nodejs/node/pull/34375 + stream.destroy(errnoException(nread, "read")); + + return; + } + + // Defer this until we actually emit end + if (stream._readableState.endEmitted) { + if (stream[kMaybeDestroy]) { + stream[kMaybeDestroy](); + } + } else { + if (stream[kMaybeDestroy]) { + stream.on("end", stream[kMaybeDestroy]); + } + + if (handle.readStop) { + const err = handle.readStop(); + + if (err) { + // CallJSOnreadMethod expects the return value to be a buffer. + // Ref: https://github.com/nodejs/node/pull/34375 + stream.destroy(errnoException(err, "read")); + + return; + } + } + + // Push a null to signal the end of data. + // Do it before `maybeDestroy` for correct order of events: + // `end` -> `close` + stream.push(null); + stream.read(0); + } +} + +export function setStreamTimeout( + // deno-lint-ignore no-explicit-any + this: any, + msecs: number, + callback?: () => void, +) { + if (this.destroyed) { + return this; + } + + this.timeout = msecs; + + // Type checking identical to timers.enroll() + msecs = getTimerDuration(msecs, "msecs"); + + // Attempt to clear an existing timer in both cases - + // even if it will be rescheduled we don't want to leak an existing timer. + clearTimeout(this[kTimeout]); + + if (msecs === 0) { + if (callback !== undefined) { + validateFunction(callback, "callback"); + this.removeListener("timeout", callback); + } + } else { + this[kTimeout] = setUnrefTimeout(this._onTimeout.bind(this), msecs); + + if (this[kSession]) { + this[kSession][kUpdateTimer](); + } + + if (callback !== undefined) { + validateFunction(callback, "callback"); + this.once("timeout", callback); + } + } + + return this; +} |