diff options
Diffstat (limited to 'ext/node/polyfills/internal_binding/pipe_wrap.ts')
-rw-r--r-- | ext/node/polyfills/internal_binding/pipe_wrap.ts | 397 |
1 files changed, 397 insertions, 0 deletions
diff --git a/ext/node/polyfills/internal_binding/pipe_wrap.ts b/ext/node/polyfills/internal_binding/pipe_wrap.ts new file mode 100644 index 000000000..1e0d551a4 --- /dev/null +++ b/ext/node/polyfills/internal_binding/pipe_wrap.ts @@ -0,0 +1,397 @@ +// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license. +// Copyright Joyent, Inc. and other Node contributors. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the +// "Software"), to deal in the Software without restriction, including +// without limitation the rights to use, copy, modify, merge, publish, +// distribute, sublicense, and/or sell copies of the Software, and to permit +// persons to whom the Software is furnished to do so, subject to the +// following conditions: +// +// The above copyright notice and this permission notice shall be included +// in all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF +// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN +// NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, +// DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR +// OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE +// USE OR OTHER DEALINGS IN THE SOFTWARE. + +// This module ports: +// - https://github.com/nodejs/node/blob/master/src/pipe_wrap.cc +// - https://github.com/nodejs/node/blob/master/src/pipe_wrap.h + +import { notImplemented } from "internal:deno_node/polyfills/_utils.ts"; +import { unreachable } from "internal:deno_node/polyfills/_util/asserts.ts"; +import { ConnectionWrap } from "internal:deno_node/polyfills/internal_binding/connection_wrap.ts"; +import { + AsyncWrap, + providerType, +} from "internal:deno_node/polyfills/internal_binding/async_wrap.ts"; +import { LibuvStreamWrap } from "internal:deno_node/polyfills/internal_binding/stream_wrap.ts"; +import { codeMap } from "internal:deno_node/polyfills/internal_binding/uv.ts"; +import { delay } from "internal:deno_node/polyfills/_util/async.ts"; +import { kStreamBaseField } from "internal:deno_node/polyfills/internal_binding/stream_wrap.ts"; +import { + ceilPowOf2, + INITIAL_ACCEPT_BACKOFF_DELAY, + MAX_ACCEPT_BACKOFF_DELAY, +} from "internal:deno_node/polyfills/internal_binding/_listen.ts"; +import { isWindows } from "internal:deno_node/polyfills/_util/os.ts"; +import { fs } from "internal:deno_node/polyfills/internal_binding/constants.ts"; + +export enum socketType { + SOCKET, + SERVER, + IPC, +} + +export class Pipe extends ConnectionWrap { + override reading = false; + ipc: boolean; + + // REF: https://github.com/nodejs/node/blob/master/deps/uv/src/win/pipe.c#L48 + #pendingInstances = 4; + + #address?: string; + + #backlog?: number; + #listener!: Deno.Listener; + #connections = 0; + + #closed = false; + #acceptBackoffDelay?: number; + + constructor(type: number, conn?: Deno.UnixConn) { + let provider: providerType; + let ipc: boolean; + + switch (type) { + case socketType.SOCKET: { + provider = providerType.PIPEWRAP; + ipc = false; + + break; + } + case socketType.SERVER: { + provider = providerType.PIPESERVERWRAP; + ipc = false; + + break; + } + case socketType.IPC: { + provider = providerType.PIPEWRAP; + ipc = true; + + break; + } + default: { + unreachable(); + } + } + + super(provider, conn); + + this.ipc = ipc; + + if (conn && provider === providerType.PIPEWRAP) { + const localAddr = conn.localAddr as Deno.UnixAddr; + this.#address = localAddr.path; + } + } + + open(_fd: number): number { + // REF: https://github.com/denoland/deno/issues/6529 + notImplemented("Pipe.prototype.open"); + } + + /** + * Bind to a Unix domain or Windows named pipe. + * @param name Unix domain or Windows named pipe the server should listen to. + * @return An error status code. + */ + bind(name: string) { + // Deno doesn't currently separate bind from connect. For now we noop under + // the assumption we will connect shortly. + // REF: https://doc.deno.land/deno/unstable/~/Deno.connect + + this.#address = name; + + return 0; + } + + /** + * Connect to a Unix domain or Windows named pipe. + * @param req A PipeConnectWrap instance. + * @param address Unix domain or Windows named pipe the server should connect to. + * @return An error status code. + */ + connect(req: PipeConnectWrap, address: string) { + if (isWindows) { + // REF: https://github.com/denoland/deno/issues/10244 + notImplemented("Pipe.prototype.connect - Windows"); + } + + const connectOptions: Deno.UnixConnectOptions = { + path: address, + transport: "unix", + }; + + Deno.connect(connectOptions).then( + (conn: Deno.UnixConn) => { + const localAddr = conn.localAddr as Deno.UnixAddr; + + this.#address = req.address = localAddr.path; + this[kStreamBaseField] = conn; + + try { + this.afterConnect(req, 0); + } catch { + // swallow callback errors. + } + }, + (e) => { + // TODO(cmorten): correct mapping of connection error to status code. + let code: number; + + if (e instanceof Deno.errors.NotFound) { + code = codeMap.get("ENOENT")!; + } else if (e instanceof Deno.errors.PermissionDenied) { + code = codeMap.get("EACCES")!; + } else { + code = codeMap.get("ECONNREFUSED")!; + } + + try { + this.afterConnect(req, code); + } catch { + // swallow callback errors. + } + }, + ); + + return 0; + } + + /** + * Listen for new connections. + * @param backlog The maximum length of the queue of pending connections. + * @return An error status code. + */ + listen(backlog: number): number { + if (isWindows) { + // REF: https://github.com/denoland/deno/issues/10244 + notImplemented("Pipe.prototype.listen - Windows"); + } + + this.#backlog = isWindows + ? this.#pendingInstances + : ceilPowOf2(backlog + 1); + + const listenOptions = { + path: this.#address!, + transport: "unix" as const, + }; + + let listener; + + try { + listener = Deno.listen(listenOptions); + } catch (e) { + if (e instanceof Deno.errors.AddrInUse) { + return codeMap.get("EADDRINUSE")!; + } else if (e instanceof Deno.errors.AddrNotAvailable) { + return codeMap.get("EADDRNOTAVAIL")!; + } else if (e instanceof Deno.errors.PermissionDenied) { + throw e; + } + + // TODO(cmorten): map errors to appropriate error codes. + return codeMap.get("UNKNOWN")!; + } + + const address = listener.addr as Deno.UnixAddr; + this.#address = address.path; + + this.#listener = listener; + this.#accept(); + + return 0; + } + + override ref() { + if (this.#listener) { + this.#listener.ref(); + } + } + + override unref() { + if (this.#listener) { + this.#listener.unref(); + } + } + + /** + * Set the number of pending pipe instance handles when the pipe server is + * waiting for connections. This setting applies to Windows only. + * @param instances Number of pending pipe instances. + */ + setPendingInstances(instances: number) { + this.#pendingInstances = instances; + } + + /** + * Alters pipe permissions, allowing it to be accessed from processes run by + * different users. Makes the pipe writable or readable by all users. Mode + * can be `UV_WRITABLE`, `UV_READABLE` or `UV_WRITABLE | UV_READABLE`. This + * function is blocking. + * @param mode Pipe permissions mode. + * @return An error status code. + */ + fchmod(mode: number) { + if ( + mode != constants.UV_READABLE && + mode != constants.UV_WRITABLE && + mode != (constants.UV_WRITABLE | constants.UV_READABLE) + ) { + return codeMap.get("EINVAL"); + } + + let desiredMode = 0; + + if (mode & constants.UV_READABLE) { + desiredMode |= fs.S_IRUSR | fs.S_IRGRP | fs.S_IROTH; + } + if (mode & constants.UV_WRITABLE) { + desiredMode |= fs.S_IWUSR | fs.S_IWGRP | fs.S_IWOTH; + } + + // TODO(cmorten): this will incorrectly throw on Windows + // REF: https://github.com/denoland/deno/issues/4357 + try { + Deno.chmodSync(this.#address!, desiredMode); + } catch { + // TODO(cmorten): map errors to appropriate error codes. + return codeMap.get("UNKNOWN")!; + } + + return 0; + } + + /** Handle backoff delays following an unsuccessful accept. */ + async #acceptBackoff() { + // Backoff after transient errors to allow time for the system to + // recover, and avoid blocking up the event loop with a continuously + // running loop. + if (!this.#acceptBackoffDelay) { + this.#acceptBackoffDelay = INITIAL_ACCEPT_BACKOFF_DELAY; + } else { + this.#acceptBackoffDelay *= 2; + } + + if (this.#acceptBackoffDelay >= MAX_ACCEPT_BACKOFF_DELAY) { + this.#acceptBackoffDelay = MAX_ACCEPT_BACKOFF_DELAY; + } + + await delay(this.#acceptBackoffDelay); + + this.#accept(); + } + + /** Accept new connections. */ + async #accept(): Promise<void> { + if (this.#closed) { + return; + } + + if (this.#connections > this.#backlog!) { + this.#acceptBackoff(); + + return; + } + + let connection: Deno.Conn; + + try { + connection = await this.#listener.accept(); + } catch (e) { + if (e instanceof Deno.errors.BadResource && this.#closed) { + // Listener and server has closed. + return; + } + + try { + // TODO(cmorten): map errors to appropriate error codes. + this.onconnection!(codeMap.get("UNKNOWN")!, undefined); + } catch { + // swallow callback errors. + } + + this.#acceptBackoff(); + + return; + } + + // Reset the backoff delay upon successful accept. + this.#acceptBackoffDelay = undefined; + + const connectionHandle = new Pipe(socketType.SOCKET, connection); + this.#connections++; + + try { + this.onconnection!(0, connectionHandle); + } catch { + // swallow callback errors. + } + + return this.#accept(); + } + + /** Handle server closure. */ + override _onClose(): number { + this.#closed = true; + this.reading = false; + + this.#address = undefined; + + this.#backlog = undefined; + this.#connections = 0; + this.#acceptBackoffDelay = undefined; + + if (this.provider === providerType.PIPESERVERWRAP) { + try { + this.#listener.close(); + } catch { + // listener already closed + } + } + + return LibuvStreamWrap.prototype._onClose.call(this); + } +} + +export class PipeConnectWrap extends AsyncWrap { + oncomplete!: ( + status: number, + handle: ConnectionWrap, + req: PipeConnectWrap, + readable: boolean, + writeable: boolean, + ) => void; + address!: string; + + constructor() { + super(providerType.PIPECONNECTWRAP); + } +} + +export enum constants { + SOCKET = socketType.SOCKET, + SERVER = socketType.SERVER, + IPC = socketType.IPC, + UV_READABLE = 1, + UV_WRITABLE = 2, +} |