summaryrefslogtreecommitdiff
path: root/ext/node/polyfills/internal/stream_base_commons.ts
diff options
context:
space:
mode:
Diffstat (limited to 'ext/node/polyfills/internal/stream_base_commons.ts')
-rw-r--r--ext/node/polyfills/internal/stream_base_commons.ts355
1 files changed, 355 insertions, 0 deletions
diff --git a/ext/node/polyfills/internal/stream_base_commons.ts b/ext/node/polyfills/internal/stream_base_commons.ts
new file mode 100644
index 000000000..dd1c74d0f
--- /dev/null
+++ b/ext/node/polyfills/internal/stream_base_commons.ts
@@ -0,0 +1,355 @@
+// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license.
+// Copyright Joyent, Inc. and other Node contributors.
+//
+// Permission is hereby granted, free of charge, to any person obtaining a
+// copy of this software and associated documentation files (the
+// "Software"), to deal in the Software without restriction, including
+// without limitation the rights to use, copy, modify, merge, publish,
+// distribute, sublicense, and/or sell copies of the Software, and to permit
+// persons to whom the Software is furnished to do so, subject to the
+// following conditions:
+//
+// The above copyright notice and this permission notice shall be included
+// in all copies or substantial portions of the Software.
+//
+// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
+// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
+// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN
+// NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
+// DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
+// OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE
+// USE OR OTHER DEALINGS IN THE SOFTWARE.
+
+import { ownerSymbol } from "internal:deno_node/polyfills/internal/async_hooks.ts";
+import {
+ kArrayBufferOffset,
+ kBytesWritten,
+ kLastWriteWasAsync,
+ LibuvStreamWrap,
+ streamBaseState,
+ WriteWrap,
+} from "internal:deno_node/polyfills/internal_binding/stream_wrap.ts";
+import { isUint8Array } from "internal:deno_node/polyfills/internal/util/types.ts";
+import { errnoException } from "internal:deno_node/polyfills/internal/errors.ts";
+import {
+ getTimerDuration,
+ kTimeout,
+} from "internal:deno_node/polyfills/internal/timers.mjs";
+import { setUnrefTimeout } from "internal:deno_node/polyfills/timers.ts";
+import { validateFunction } from "internal:deno_node/polyfills/internal/validators.mjs";
+import { codeMap } from "internal:deno_node/polyfills/internal_binding/uv.ts";
+import { Buffer } from "internal:deno_node/polyfills/buffer.ts";
+
+export const kMaybeDestroy = Symbol("kMaybeDestroy");
+export const kUpdateTimer = Symbol("kUpdateTimer");
+export const kAfterAsyncWrite = Symbol("kAfterAsyncWrite");
+export const kHandle = Symbol("kHandle");
+export const kSession = Symbol("kSession");
+export const kBuffer = Symbol("kBuffer");
+export const kBufferGen = Symbol("kBufferGen");
+export const kBufferCb = Symbol("kBufferCb");
+
+// deno-lint-ignore no-explicit-any
+function handleWriteReq(req: any, data: any, encoding: string) {
+ const { handle } = req;
+
+ switch (encoding) {
+ case "buffer": {
+ const ret = handle.writeBuffer(req, data);
+
+ if (streamBaseState[kLastWriteWasAsync]) {
+ req.buffer = data;
+ }
+
+ return ret;
+ }
+ case "latin1":
+ case "binary":
+ return handle.writeLatin1String(req, data);
+ case "utf8":
+ case "utf-8":
+ return handle.writeUtf8String(req, data);
+ case "ascii":
+ return handle.writeAsciiString(req, data);
+ case "ucs2":
+ case "ucs-2":
+ case "utf16le":
+ case "utf-16le":
+ return handle.writeUcs2String(req, data);
+ default: {
+ const buffer = Buffer.from(data, encoding);
+ const ret = handle.writeBuffer(req, buffer);
+
+ if (streamBaseState[kLastWriteWasAsync]) {
+ req.buffer = buffer;
+ }
+
+ return ret;
+ }
+ }
+}
+
+// deno-lint-ignore no-explicit-any
+function onWriteComplete(this: any, status: number) {
+ let stream = this.handle[ownerSymbol];
+
+ if (stream.constructor.name === "ReusedHandle") {
+ stream = stream.handle;
+ }
+
+ if (stream.destroyed) {
+ if (typeof this.callback === "function") {
+ this.callback(null);
+ }
+
+ return;
+ }
+
+ if (status < 0) {
+ const ex = errnoException(status, "write", this.error);
+
+ if (typeof this.callback === "function") {
+ this.callback(ex);
+ } else {
+ stream.destroy(ex);
+ }
+
+ return;
+ }
+
+ stream[kUpdateTimer]();
+ stream[kAfterAsyncWrite](this);
+
+ if (typeof this.callback === "function") {
+ this.callback(null);
+ }
+}
+
+function createWriteWrap(
+ handle: LibuvStreamWrap,
+ callback: (err?: Error | null) => void,
+) {
+ const req = new WriteWrap<LibuvStreamWrap>();
+
+ req.handle = handle;
+ req.oncomplete = onWriteComplete;
+ req.async = false;
+ req.bytes = 0;
+ req.buffer = null;
+ req.callback = callback;
+
+ return req;
+}
+
+export function writevGeneric(
+ // deno-lint-ignore no-explicit-any
+ owner: any,
+ // deno-lint-ignore no-explicit-any
+ data: any,
+ cb: (err?: Error | null) => void,
+) {
+ const req = createWriteWrap(owner[kHandle], cb);
+ const allBuffers = data.allBuffers;
+ let chunks;
+
+ if (allBuffers) {
+ chunks = data;
+
+ for (let i = 0; i < data.length; i++) {
+ data[i] = data[i].chunk;
+ }
+ } else {
+ chunks = new Array(data.length << 1);
+
+ for (let i = 0; i < data.length; i++) {
+ const entry = data[i];
+ chunks[i * 2] = entry.chunk;
+ chunks[i * 2 + 1] = entry.encoding;
+ }
+ }
+
+ const err = req.handle.writev(req, chunks, allBuffers);
+
+ // Retain chunks
+ if (err === 0) {
+ req._chunks = chunks;
+ }
+
+ afterWriteDispatched(req, err, cb);
+
+ return req;
+}
+
+export function writeGeneric(
+ // deno-lint-ignore no-explicit-any
+ owner: any,
+ // deno-lint-ignore no-explicit-any
+ data: any,
+ encoding: string,
+ cb: (err?: Error | null) => void,
+) {
+ const req = createWriteWrap(owner[kHandle], cb);
+ const err = handleWriteReq(req, data, encoding);
+
+ afterWriteDispatched(req, err, cb);
+
+ return req;
+}
+
+function afterWriteDispatched(
+ // deno-lint-ignore no-explicit-any
+ req: any,
+ err: number,
+ cb: (err?: Error | null) => void,
+) {
+ req.bytes = streamBaseState[kBytesWritten];
+ req.async = !!streamBaseState[kLastWriteWasAsync];
+
+ if (err !== 0) {
+ return cb(errnoException(err, "write", req.error));
+ }
+
+ if (!req.async && typeof req.callback === "function") {
+ req.callback();
+ }
+}
+
+// Here we differ from Node slightly. Node makes use of the `kReadBytesOrError`
+// entry of the `streamBaseState` array from the `stream_wrap` internal binding.
+// Here we pass the `nread` value directly to this method as async Deno APIs
+// don't grant us the ability to rely on some mutable array entry setting.
+export function onStreamRead(
+ // deno-lint-ignore no-explicit-any
+ this: any,
+ arrayBuffer: Uint8Array,
+ nread: number,
+) {
+ // deno-lint-ignore no-this-alias
+ const handle = this;
+
+ let stream = this[ownerSymbol];
+
+ if (stream.constructor.name === "ReusedHandle") {
+ stream = stream.handle;
+ }
+
+ stream[kUpdateTimer]();
+
+ if (nread > 0 && !stream.destroyed) {
+ let ret;
+ let result;
+ const userBuf = stream[kBuffer];
+
+ if (userBuf) {
+ result = stream[kBufferCb](nread, userBuf) !== false;
+ const bufGen = stream[kBufferGen];
+
+ if (bufGen !== null) {
+ const nextBuf = bufGen();
+
+ if (isUint8Array(nextBuf)) {
+ stream[kBuffer] = ret = nextBuf;
+ }
+ }
+ } else {
+ const offset = streamBaseState[kArrayBufferOffset];
+ const buf = Buffer.from(arrayBuffer, offset, nread);
+ result = stream.push(buf);
+ }
+
+ if (!result) {
+ handle.reading = false;
+
+ if (!stream.destroyed) {
+ const err = handle.readStop();
+
+ if (err) {
+ stream.destroy(errnoException(err, "read"));
+ }
+ }
+ }
+
+ return ret;
+ }
+
+ if (nread === 0) {
+ return;
+ }
+
+ if (nread !== codeMap.get("EOF")) {
+ // CallJSOnreadMethod expects the return value to be a buffer.
+ // Ref: https://github.com/nodejs/node/pull/34375
+ stream.destroy(errnoException(nread, "read"));
+
+ return;
+ }
+
+ // Defer this until we actually emit end
+ if (stream._readableState.endEmitted) {
+ if (stream[kMaybeDestroy]) {
+ stream[kMaybeDestroy]();
+ }
+ } else {
+ if (stream[kMaybeDestroy]) {
+ stream.on("end", stream[kMaybeDestroy]);
+ }
+
+ if (handle.readStop) {
+ const err = handle.readStop();
+
+ if (err) {
+ // CallJSOnreadMethod expects the return value to be a buffer.
+ // Ref: https://github.com/nodejs/node/pull/34375
+ stream.destroy(errnoException(err, "read"));
+
+ return;
+ }
+ }
+
+ // Push a null to signal the end of data.
+ // Do it before `maybeDestroy` for correct order of events:
+ // `end` -> `close`
+ stream.push(null);
+ stream.read(0);
+ }
+}
+
+export function setStreamTimeout(
+ // deno-lint-ignore no-explicit-any
+ this: any,
+ msecs: number,
+ callback?: () => void,
+) {
+ if (this.destroyed) {
+ return this;
+ }
+
+ this.timeout = msecs;
+
+ // Type checking identical to timers.enroll()
+ msecs = getTimerDuration(msecs, "msecs");
+
+ // Attempt to clear an existing timer in both cases -
+ // even if it will be rescheduled we don't want to leak an existing timer.
+ clearTimeout(this[kTimeout]);
+
+ if (msecs === 0) {
+ if (callback !== undefined) {
+ validateFunction(callback, "callback");
+ this.removeListener("timeout", callback);
+ }
+ } else {
+ this[kTimeout] = setUnrefTimeout(this._onTimeout.bind(this), msecs);
+
+ if (this[kSession]) {
+ this[kSession][kUpdateTimer]();
+ }
+
+ if (callback !== undefined) {
+ validateFunction(callback, "callback");
+ this.once("timeout", callback);
+ }
+ }
+
+ return this;
+}