summaryrefslogtreecommitdiff
path: root/ext/node/polyfills/internal_binding/pipe_wrap.ts
diff options
context:
space:
mode:
Diffstat (limited to 'ext/node/polyfills/internal_binding/pipe_wrap.ts')
-rw-r--r--ext/node/polyfills/internal_binding/pipe_wrap.ts397
1 files changed, 397 insertions, 0 deletions
diff --git a/ext/node/polyfills/internal_binding/pipe_wrap.ts b/ext/node/polyfills/internal_binding/pipe_wrap.ts
new file mode 100644
index 000000000..1e0d551a4
--- /dev/null
+++ b/ext/node/polyfills/internal_binding/pipe_wrap.ts
@@ -0,0 +1,397 @@
+// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license.
+// Copyright Joyent, Inc. and other Node contributors.
+//
+// Permission is hereby granted, free of charge, to any person obtaining a
+// copy of this software and associated documentation files (the
+// "Software"), to deal in the Software without restriction, including
+// without limitation the rights to use, copy, modify, merge, publish,
+// distribute, sublicense, and/or sell copies of the Software, and to permit
+// persons to whom the Software is furnished to do so, subject to the
+// following conditions:
+//
+// The above copyright notice and this permission notice shall be included
+// in all copies or substantial portions of the Software.
+//
+// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
+// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
+// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN
+// NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
+// DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
+// OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE
+// USE OR OTHER DEALINGS IN THE SOFTWARE.
+
+// This module ports:
+// - https://github.com/nodejs/node/blob/master/src/pipe_wrap.cc
+// - https://github.com/nodejs/node/blob/master/src/pipe_wrap.h
+
+import { notImplemented } from "internal:deno_node/polyfills/_utils.ts";
+import { unreachable } from "internal:deno_node/polyfills/_util/asserts.ts";
+import { ConnectionWrap } from "internal:deno_node/polyfills/internal_binding/connection_wrap.ts";
+import {
+ AsyncWrap,
+ providerType,
+} from "internal:deno_node/polyfills/internal_binding/async_wrap.ts";
+import { LibuvStreamWrap } from "internal:deno_node/polyfills/internal_binding/stream_wrap.ts";
+import { codeMap } from "internal:deno_node/polyfills/internal_binding/uv.ts";
+import { delay } from "internal:deno_node/polyfills/_util/async.ts";
+import { kStreamBaseField } from "internal:deno_node/polyfills/internal_binding/stream_wrap.ts";
+import {
+ ceilPowOf2,
+ INITIAL_ACCEPT_BACKOFF_DELAY,
+ MAX_ACCEPT_BACKOFF_DELAY,
+} from "internal:deno_node/polyfills/internal_binding/_listen.ts";
+import { isWindows } from "internal:deno_node/polyfills/_util/os.ts";
+import { fs } from "internal:deno_node/polyfills/internal_binding/constants.ts";
+
+export enum socketType {
+ SOCKET,
+ SERVER,
+ IPC,
+}
+
+export class Pipe extends ConnectionWrap {
+ override reading = false;
+ ipc: boolean;
+
+ // REF: https://github.com/nodejs/node/blob/master/deps/uv/src/win/pipe.c#L48
+ #pendingInstances = 4;
+
+ #address?: string;
+
+ #backlog?: number;
+ #listener!: Deno.Listener;
+ #connections = 0;
+
+ #closed = false;
+ #acceptBackoffDelay?: number;
+
+ constructor(type: number, conn?: Deno.UnixConn) {
+ let provider: providerType;
+ let ipc: boolean;
+
+ switch (type) {
+ case socketType.SOCKET: {
+ provider = providerType.PIPEWRAP;
+ ipc = false;
+
+ break;
+ }
+ case socketType.SERVER: {
+ provider = providerType.PIPESERVERWRAP;
+ ipc = false;
+
+ break;
+ }
+ case socketType.IPC: {
+ provider = providerType.PIPEWRAP;
+ ipc = true;
+
+ break;
+ }
+ default: {
+ unreachable();
+ }
+ }
+
+ super(provider, conn);
+
+ this.ipc = ipc;
+
+ if (conn && provider === providerType.PIPEWRAP) {
+ const localAddr = conn.localAddr as Deno.UnixAddr;
+ this.#address = localAddr.path;
+ }
+ }
+
+ open(_fd: number): number {
+ // REF: https://github.com/denoland/deno/issues/6529
+ notImplemented("Pipe.prototype.open");
+ }
+
+ /**
+ * Bind to a Unix domain or Windows named pipe.
+ * @param name Unix domain or Windows named pipe the server should listen to.
+ * @return An error status code.
+ */
+ bind(name: string) {
+ // Deno doesn't currently separate bind from connect. For now we noop under
+ // the assumption we will connect shortly.
+ // REF: https://doc.deno.land/deno/unstable/~/Deno.connect
+
+ this.#address = name;
+
+ return 0;
+ }
+
+ /**
+ * Connect to a Unix domain or Windows named pipe.
+ * @param req A PipeConnectWrap instance.
+ * @param address Unix domain or Windows named pipe the server should connect to.
+ * @return An error status code.
+ */
+ connect(req: PipeConnectWrap, address: string) {
+ if (isWindows) {
+ // REF: https://github.com/denoland/deno/issues/10244
+ notImplemented("Pipe.prototype.connect - Windows");
+ }
+
+ const connectOptions: Deno.UnixConnectOptions = {
+ path: address,
+ transport: "unix",
+ };
+
+ Deno.connect(connectOptions).then(
+ (conn: Deno.UnixConn) => {
+ const localAddr = conn.localAddr as Deno.UnixAddr;
+
+ this.#address = req.address = localAddr.path;
+ this[kStreamBaseField] = conn;
+
+ try {
+ this.afterConnect(req, 0);
+ } catch {
+ // swallow callback errors.
+ }
+ },
+ (e) => {
+ // TODO(cmorten): correct mapping of connection error to status code.
+ let code: number;
+
+ if (e instanceof Deno.errors.NotFound) {
+ code = codeMap.get("ENOENT")!;
+ } else if (e instanceof Deno.errors.PermissionDenied) {
+ code = codeMap.get("EACCES")!;
+ } else {
+ code = codeMap.get("ECONNREFUSED")!;
+ }
+
+ try {
+ this.afterConnect(req, code);
+ } catch {
+ // swallow callback errors.
+ }
+ },
+ );
+
+ return 0;
+ }
+
+ /**
+ * Listen for new connections.
+ * @param backlog The maximum length of the queue of pending connections.
+ * @return An error status code.
+ */
+ listen(backlog: number): number {
+ if (isWindows) {
+ // REF: https://github.com/denoland/deno/issues/10244
+ notImplemented("Pipe.prototype.listen - Windows");
+ }
+
+ this.#backlog = isWindows
+ ? this.#pendingInstances
+ : ceilPowOf2(backlog + 1);
+
+ const listenOptions = {
+ path: this.#address!,
+ transport: "unix" as const,
+ };
+
+ let listener;
+
+ try {
+ listener = Deno.listen(listenOptions);
+ } catch (e) {
+ if (e instanceof Deno.errors.AddrInUse) {
+ return codeMap.get("EADDRINUSE")!;
+ } else if (e instanceof Deno.errors.AddrNotAvailable) {
+ return codeMap.get("EADDRNOTAVAIL")!;
+ } else if (e instanceof Deno.errors.PermissionDenied) {
+ throw e;
+ }
+
+ // TODO(cmorten): map errors to appropriate error codes.
+ return codeMap.get("UNKNOWN")!;
+ }
+
+ const address = listener.addr as Deno.UnixAddr;
+ this.#address = address.path;
+
+ this.#listener = listener;
+ this.#accept();
+
+ return 0;
+ }
+
+ override ref() {
+ if (this.#listener) {
+ this.#listener.ref();
+ }
+ }
+
+ override unref() {
+ if (this.#listener) {
+ this.#listener.unref();
+ }
+ }
+
+ /**
+ * Set the number of pending pipe instance handles when the pipe server is
+ * waiting for connections. This setting applies to Windows only.
+ * @param instances Number of pending pipe instances.
+ */
+ setPendingInstances(instances: number) {
+ this.#pendingInstances = instances;
+ }
+
+ /**
+ * Alters pipe permissions, allowing it to be accessed from processes run by
+ * different users. Makes the pipe writable or readable by all users. Mode
+ * can be `UV_WRITABLE`, `UV_READABLE` or `UV_WRITABLE | UV_READABLE`. This
+ * function is blocking.
+ * @param mode Pipe permissions mode.
+ * @return An error status code.
+ */
+ fchmod(mode: number) {
+ if (
+ mode != constants.UV_READABLE &&
+ mode != constants.UV_WRITABLE &&
+ mode != (constants.UV_WRITABLE | constants.UV_READABLE)
+ ) {
+ return codeMap.get("EINVAL");
+ }
+
+ let desiredMode = 0;
+
+ if (mode & constants.UV_READABLE) {
+ desiredMode |= fs.S_IRUSR | fs.S_IRGRP | fs.S_IROTH;
+ }
+ if (mode & constants.UV_WRITABLE) {
+ desiredMode |= fs.S_IWUSR | fs.S_IWGRP | fs.S_IWOTH;
+ }
+
+ // TODO(cmorten): this will incorrectly throw on Windows
+ // REF: https://github.com/denoland/deno/issues/4357
+ try {
+ Deno.chmodSync(this.#address!, desiredMode);
+ } catch {
+ // TODO(cmorten): map errors to appropriate error codes.
+ return codeMap.get("UNKNOWN")!;
+ }
+
+ return 0;
+ }
+
+ /** Handle backoff delays following an unsuccessful accept. */
+ async #acceptBackoff() {
+ // Backoff after transient errors to allow time for the system to
+ // recover, and avoid blocking up the event loop with a continuously
+ // running loop.
+ if (!this.#acceptBackoffDelay) {
+ this.#acceptBackoffDelay = INITIAL_ACCEPT_BACKOFF_DELAY;
+ } else {
+ this.#acceptBackoffDelay *= 2;
+ }
+
+ if (this.#acceptBackoffDelay >= MAX_ACCEPT_BACKOFF_DELAY) {
+ this.#acceptBackoffDelay = MAX_ACCEPT_BACKOFF_DELAY;
+ }
+
+ await delay(this.#acceptBackoffDelay);
+
+ this.#accept();
+ }
+
+ /** Accept new connections. */
+ async #accept(): Promise<void> {
+ if (this.#closed) {
+ return;
+ }
+
+ if (this.#connections > this.#backlog!) {
+ this.#acceptBackoff();
+
+ return;
+ }
+
+ let connection: Deno.Conn;
+
+ try {
+ connection = await this.#listener.accept();
+ } catch (e) {
+ if (e instanceof Deno.errors.BadResource && this.#closed) {
+ // Listener and server has closed.
+ return;
+ }
+
+ try {
+ // TODO(cmorten): map errors to appropriate error codes.
+ this.onconnection!(codeMap.get("UNKNOWN")!, undefined);
+ } catch {
+ // swallow callback errors.
+ }
+
+ this.#acceptBackoff();
+
+ return;
+ }
+
+ // Reset the backoff delay upon successful accept.
+ this.#acceptBackoffDelay = undefined;
+
+ const connectionHandle = new Pipe(socketType.SOCKET, connection);
+ this.#connections++;
+
+ try {
+ this.onconnection!(0, connectionHandle);
+ } catch {
+ // swallow callback errors.
+ }
+
+ return this.#accept();
+ }
+
+ /** Handle server closure. */
+ override _onClose(): number {
+ this.#closed = true;
+ this.reading = false;
+
+ this.#address = undefined;
+
+ this.#backlog = undefined;
+ this.#connections = 0;
+ this.#acceptBackoffDelay = undefined;
+
+ if (this.provider === providerType.PIPESERVERWRAP) {
+ try {
+ this.#listener.close();
+ } catch {
+ // listener already closed
+ }
+ }
+
+ return LibuvStreamWrap.prototype._onClose.call(this);
+ }
+}
+
+export class PipeConnectWrap extends AsyncWrap {
+ oncomplete!: (
+ status: number,
+ handle: ConnectionWrap,
+ req: PipeConnectWrap,
+ readable: boolean,
+ writeable: boolean,
+ ) => void;
+ address!: string;
+
+ constructor() {
+ super(providerType.PIPECONNECTWRAP);
+ }
+}
+
+export enum constants {
+ SOCKET = socketType.SOCKET,
+ SERVER = socketType.SERVER,
+ IPC = socketType.IPC,
+ UV_READABLE = 1,
+ UV_WRITABLE = 2,
+}