diff options
Diffstat (limited to 'ext/node/polyfills/dgram.ts')
-rw-r--r-- | ext/node/polyfills/dgram.ts | 1558 |
1 files changed, 1558 insertions, 0 deletions
diff --git a/ext/node/polyfills/dgram.ts b/ext/node/polyfills/dgram.ts new file mode 100644 index 000000000..222421eb0 --- /dev/null +++ b/ext/node/polyfills/dgram.ts @@ -0,0 +1,1558 @@ +// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license. +// Copyright Joyent, Inc. and other Node contributors. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the +// "Software"), to deal in the Software without restriction, including +// without limitation the rights to use, copy, modify, merge, publish, +// distribute, sublicense, and/or sell copies of the Software, and to permit +// persons to whom the Software is furnished to do so, subject to the +// following conditions: +// +// The above copyright notice and this permission notice shall be included +// in all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF +// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN +// NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, +// DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR +// OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE +// USE OR OTHER DEALINGS IN THE SOFTWARE. + +import { Buffer } from "internal:deno_node/polyfills/buffer.ts"; +import { EventEmitter } from "internal:deno_node/polyfills/events.ts"; +import { lookup as defaultLookup } from "internal:deno_node/polyfills/dns.ts"; +import type { + ErrnoException, + NodeSystemErrorCtx, +} from "internal:deno_node/polyfills/internal/errors.ts"; +import { + ERR_BUFFER_OUT_OF_BOUNDS, + ERR_INVALID_ARG_TYPE, + ERR_INVALID_FD_TYPE, + ERR_MISSING_ARGS, + ERR_SOCKET_ALREADY_BOUND, + ERR_SOCKET_BAD_BUFFER_SIZE, + ERR_SOCKET_BUFFER_SIZE, + ERR_SOCKET_DGRAM_IS_CONNECTED, + ERR_SOCKET_DGRAM_NOT_CONNECTED, + ERR_SOCKET_DGRAM_NOT_RUNNING, + errnoException, + exceptionWithHostPort, +} from "internal:deno_node/polyfills/internal/errors.ts"; +import type { Abortable } from "internal:deno_node/polyfills/_events.d.ts"; +import { + kStateSymbol, + newHandle, +} from "internal:deno_node/polyfills/internal/dgram.ts"; +import type { SocketType } from "internal:deno_node/polyfills/internal/dgram.ts"; +import { + asyncIdSymbol, + defaultTriggerAsyncIdScope, + ownerSymbol, +} from "internal:deno_node/polyfills/internal/async_hooks.ts"; +import { + SendWrap, + UDP, +} from "internal:deno_node/polyfills/internal_binding/udp_wrap.ts"; +import { + isInt32, + validateAbortSignal, + validateNumber, + validatePort, + validateString, +} from "internal:deno_node/polyfills/internal/validators.mjs"; +import { guessHandleType } from "internal:deno_node/polyfills/internal_binding/util.ts"; +import { os } from "internal:deno_node/polyfills/internal_binding/constants.ts"; +import { nextTick } from "internal:deno_node/polyfills/process.ts"; +import { channel } from "internal:deno_node/polyfills/diagnostics_channel.ts"; +import { isArrayBufferView } from "internal:deno_node/polyfills/internal/util/types.ts"; + +const { UV_UDP_REUSEADDR, UV_UDP_IPV6ONLY } = os; + +const udpSocketChannel = channel("udp.socket"); + +const BIND_STATE_UNBOUND = 0; +const BIND_STATE_BINDING = 1; +const BIND_STATE_BOUND = 2; + +const CONNECT_STATE_DISCONNECTED = 0; +const CONNECT_STATE_CONNECTING = 1; +const CONNECT_STATE_CONNECTED = 2; + +const RECV_BUFFER = true; +const SEND_BUFFER = false; + +export interface AddressInfo { + address: string; + family: number; + port: number; +} + +export type MessageType = string | Uint8Array | Buffer | DataView; + +export type RemoteInfo = { + address: string; + family: "IPv4" | "IPv6"; + port: number; + size?: number; +}; + +export interface BindOptions { + port?: number; + address?: string; + exclusive?: boolean; + fd?: number; +} + +export interface SocketOptions extends Abortable { + type: SocketType; + reuseAddr?: boolean; + /** + * @default false + */ + ipv6Only?: boolean; + recvBufferSize?: number; + sendBufferSize?: number; + lookup?: typeof defaultLookup; +} + +interface SocketInternalState { + handle: UDP | null; + receiving: boolean; + bindState: + | typeof BIND_STATE_UNBOUND + | typeof BIND_STATE_BINDING + | typeof BIND_STATE_BOUND; + connectState: + | typeof CONNECT_STATE_DISCONNECTED + | typeof CONNECT_STATE_CONNECTING + | typeof CONNECT_STATE_CONNECTED; + queue?: Array<() => void>; + reuseAddr?: boolean; + ipv6Only?: boolean; + recvBufferSize?: number; + sendBufferSize?: number; +} + +const isSocketOptions = ( + socketOption: unknown, +): socketOption is SocketOptions => + socketOption !== null && typeof socketOption === "object"; + +const isUdpHandle = (handle: unknown): handle is UDP => + handle !== null && + typeof handle === "object" && + typeof (handle as UDP).recvStart === "function"; + +const isBindOptions = (options: unknown): options is BindOptions => + options !== null && typeof options === "object"; + +/** + * Encapsulates the datagram functionality. + * + * New instances of `dgram.Socket` are created using `createSocket`. + * The `new` keyword is not to be used to create `dgram.Socket` instances. + */ +export class Socket extends EventEmitter { + [asyncIdSymbol]!: number; + [kStateSymbol]!: SocketInternalState; + + type!: SocketType; + + constructor( + type: SocketType | SocketOptions, + listener?: (msg: Buffer, rinfo: RemoteInfo) => void, + ) { + super(); + + let lookup; + let recvBufferSize; + let sendBufferSize; + + let options: SocketOptions | undefined; + + if (isSocketOptions(type)) { + options = type; + type = options.type; + lookup = options.lookup; + recvBufferSize = options.recvBufferSize; + sendBufferSize = options.sendBufferSize; + } + + const handle = newHandle(type, lookup); + handle[ownerSymbol] = this; + + this[asyncIdSymbol] = handle.getAsyncId(); + this.type = type; + + if (typeof listener === "function") { + this.on("message", listener); + } + + this[kStateSymbol] = { + handle, + receiving: false, + bindState: BIND_STATE_UNBOUND, + connectState: CONNECT_STATE_DISCONNECTED, + queue: undefined, + reuseAddr: options && options.reuseAddr, // Use UV_UDP_REUSEADDR if true. + ipv6Only: options && options.ipv6Only, + recvBufferSize, + sendBufferSize, + }; + + if (options?.signal !== undefined) { + const { signal } = options; + + validateAbortSignal(signal, "options.signal"); + + const onAborted = () => { + this.close(); + }; + + if (signal.aborted) { + onAborted(); + } else { + signal.addEventListener("abort", onAborted); + + this.once( + "close", + () => signal.removeEventListener("abort", onAborted), + ); + } + } + + if (udpSocketChannel.hasSubscribers) { + udpSocketChannel.publish({ + socket: this, + }); + } + } + + /** + * Tells the kernel to join a multicast group at the given `multicastAddress` + * and `multicastInterface` using the `IP_ADD_MEMBERSHIP` socket option. If + * the`multicastInterface` argument is not specified, the operating system + * will choose one interface and will add membership to it. To add membership + * to every available interface, call `addMembership` multiple times, once + * per interface. + * + * When called on an unbound socket, this method will implicitly bind to a + * random port, listening on all interfaces. + * + * When sharing a UDP socket across multiple `cluster` workers, the + * `socket.addMembership()` function must be called only once or an + * `EADDRINUSE` error will occur: + * + * ```js + * import cluster from "internal:deno_node/polyfills/cluster"; + * import dgram from "internal:deno_node/polyfills/dgram"; + * + * if (cluster.isPrimary) { + * cluster.fork(); // Works ok. + * cluster.fork(); // Fails with EADDRINUSE. + * } else { + * const s = dgram.createSocket('udp4'); + * s.bind(1234, () => { + * s.addMembership('224.0.0.114'); + * }); + * } + * ``` + */ + addMembership(multicastAddress: string, interfaceAddress?: string) { + healthCheck(this); + + if (!multicastAddress) { + throw new ERR_MISSING_ARGS("multicastAddress"); + } + + const { handle } = this[kStateSymbol]; + const err = handle!.addMembership(multicastAddress, interfaceAddress); + + if (err) { + throw errnoException(err, "addMembership"); + } + } + + /** + * Tells the kernel to join a source-specific multicast channel at the given + * `sourceAddress` and `groupAddress`, using the `multicastInterface` with + * the `IP_ADD_SOURCE_MEMBERSHIP` socket option. If the `multicastInterface` + * argument is not specified, the operating system will choose one interface + * and will add membership to it. To add membership to every available + * interface, call `socket.addSourceSpecificMembership()` multiple times, + * once per interface. + * + * When called on an unbound socket, this method will implicitly bind to a + * random port, listening on all interfaces. + */ + addSourceSpecificMembership( + sourceAddress: string, + groupAddress: string, + interfaceAddress?: string, + ) { + healthCheck(this); + + validateString(sourceAddress, "sourceAddress"); + validateString(groupAddress, "groupAddress"); + + const err = this[kStateSymbol].handle!.addSourceSpecificMembership( + sourceAddress, + groupAddress, + interfaceAddress, + ); + + if (err) { + throw errnoException(err, "addSourceSpecificMembership"); + } + } + + /** + * Returns an object containing the address information for a socket. + * For UDP sockets, this object will contain `address`, `family` and `port`properties. + * + * This method throws `EBADF` if called on an unbound socket. + */ + address(): AddressInfo { + healthCheck(this); + + const out = {}; + const err = this[kStateSymbol].handle!.getsockname(out); + + if (err) { + throw errnoException(err, "getsockname"); + } + + return out as AddressInfo; + } + + /** + * For UDP sockets, causes the `dgram.Socket` to listen for datagram + * messages on a named `port` and optional `address`. If `port` is not + * specified or is `0`, the operating system will attempt to bind to a + * random port. If `address` is not specified, the operating system will + * attempt to listen on all addresses. Once binding is complete, a + * `'listening'` event is emitted and the optional `callback` function is + * called. + * + * Specifying both a `'listening'` event listener and passing a `callback` to + * the `socket.bind()` method is not harmful but not very useful. + * + * A bound datagram socket keeps the Node.js process running to receive + * datagram messages. + * + * If binding fails, an `'error'` event is generated. In rare case (e.g. + * attempting to bind with a closed socket), an `Error` may be thrown. + * + * Example of a UDP server listening on port 41234: + * + * ```js + * import dgram from "internal:deno_node/polyfills/dgram"; + * + * const server = dgram.createSocket('udp4'); + * + * server.on('error', (err) => { + * console.log(`server error:\n${err.stack}`); + * server.close(); + * }); + * + * server.on('message', (msg, rinfo) => { + * console.log(`server got: ${msg} from ${rinfo.address}:${rinfo.port}`); + * }); + * + * server.on('listening', () => { + * const address = server.address(); + * console.log(`server listening ${address.address}:${address.port}`); + * }); + * + * server.bind(41234); + * // Prints: server listening 0.0.0.0:41234 + * ``` + * + * @param callback with no parameters. Called when binding is complete. + */ + bind(port?: number, address?: string, callback?: () => void): this; + bind(port: number, callback?: () => void): this; + bind(callback: () => void): this; + bind(options: BindOptions, callback?: () => void): this; + bind(port_?: unknown, address_?: unknown /* callback */): this { + let port = typeof port_ === "function" ? null : port_; + + healthCheck(this); + + const state = this[kStateSymbol]; + + if (state.bindState !== BIND_STATE_UNBOUND) { + throw new ERR_SOCKET_ALREADY_BOUND(); + } + + state.bindState = BIND_STATE_BINDING; + + const cb = arguments.length && arguments[arguments.length - 1]; + + if (typeof cb === "function") { + // deno-lint-ignore no-inner-declarations + function removeListeners(this: Socket) { + this.removeListener("error", removeListeners); + this.removeListener("listening", onListening); + } + + // deno-lint-ignore no-inner-declarations + function onListening(this: Socket) { + removeListeners.call(this); + cb.call(this); + } + + this.on("error", removeListeners); + this.on("listening", onListening); + } + + if (isUdpHandle(port)) { + replaceHandle(this, port); + startListening(this); + + return this; + } + + // Open an existing fd instead of creating a new one. + if (isBindOptions(port) && isInt32(port.fd!) && port.fd! > 0) { + const fd = port.fd!; + const state = this[kStateSymbol]; + + // TODO(cmorten): here we deviate somewhat from the Node implementation which + // makes use of the https://nodejs.org/api/cluster.html module to run servers + // across a "cluster" of Node processes to take advantage of multi-core + // systems. + // + // Though Deno has has a Worker capability from which we could simulate this, + // for now we assert that we are _always_ on the primary process. + + const type = guessHandleType(fd); + + if (type !== "UDP") { + throw new ERR_INVALID_FD_TYPE(type); + } + + const err = state.handle!.open(fd); + + if (err) { + throw errnoException(err, "open"); + } + + startListening(this); + + return this; + } + + let address: string; + + if (isBindOptions(port)) { + address = port.address || ""; + port = port.port; + } else { + address = typeof address_ === "function" ? "" : (address_ as string); + } + + // Defaulting address for bind to all interfaces + if (!address) { + if (this.type === "udp4") { + address = "0.0.0.0"; + } else { + address = "::"; + } + } + + // Resolve address first + state.handle!.lookup(address, (lookupError, ip) => { + if (lookupError) { + state.bindState = BIND_STATE_UNBOUND; + this.emit("error", lookupError); + + return; + } + + let flags: number | undefined = 0; + + if (state.reuseAddr) { + flags |= UV_UDP_REUSEADDR; + } + if (state.ipv6Only) { + flags |= UV_UDP_IPV6ONLY!; + } + + // TODO(cmorten): here we deviate somewhat from the Node implementation which + // makes use of the https://nodejs.org/api/cluster.html module to run servers + // across a "cluster" of Node processes to take advantage of multi-core + // systems. + // + // Though Deno has has a Worker capability from which we could simulate this, + // for now we assert that we are _always_ on the primary process. + + if (!state.handle) { + return; // Handle has been closed in the mean time + } + + const err = state.handle.bind(ip, port as number || 0, flags); + + if (err) { + const ex = exceptionWithHostPort(err, "bind", ip, port as number); + state.bindState = BIND_STATE_UNBOUND; + this.emit("error", ex); + + // Todo(@bartlomieju): close? + return; + } + + startListening(this); + }); + + return this; + } + + /** + * Close the underlying socket and stop listening for data on it. If a + * callback is provided, it is added as a listener for the `'close'` event. + * + * @param callback Called when the socket has been closed. + */ + close(callback?: () => void): this { + const state = this[kStateSymbol]; + const queue = state.queue; + + if (typeof callback === "function") { + this.on("close", callback); + } + + if (queue !== undefined) { + queue.push(this.close.bind(this)); + + return this; + } + + healthCheck(this); + stopReceiving(this); + + state.handle!.close(); + state.handle = null; + + defaultTriggerAsyncIdScope( + this[asyncIdSymbol], + nextTick, + socketCloseNT, + this, + ); + + return this; + } + + /** + * Associates the `dgram.Socket` to a remote address and port. Every + * message sent by this handle is automatically sent to that destination. + * Also, the socket will only receive messages from that remote peer. + * Trying to call `connect()` on an already connected socket will result + * in an `ERR_SOCKET_DGRAM_IS_CONNECTED` exception. If `address` is not + * provided, `'127.0.0.1'` (for `udp4` sockets) or `'::1'` (for `udp6` sockets) + * will be used by default. Once the connection is complete, a `'connect'` event + * is emitted and the optional `callback` function is called. In case of failure, + * the `callback` is called or, failing this, an `'error'` event is emitted. + * + * @param callback Called when the connection is completed or on error. + */ + connect( + port: number, + address?: string, + callback?: (err?: ErrnoException) => void, + ): void; + connect(port: number, callback: (err?: ErrnoException) => void): void; + connect(port: number, address?: unknown, callback?: unknown) { + port = validatePort(port, "Port", false); + + if (typeof address === "function") { + callback = address; + address = ""; + } else if (address === undefined) { + address = ""; + } + + validateString(address, "address"); + + const state = this[kStateSymbol]; + + if (state.connectState !== CONNECT_STATE_DISCONNECTED) { + throw new ERR_SOCKET_DGRAM_IS_CONNECTED(); + } + + state.connectState = CONNECT_STATE_CONNECTING; + + if (state.bindState === BIND_STATE_UNBOUND) { + this.bind({ port: 0, exclusive: true }); + } + + if (state.bindState !== BIND_STATE_BOUND) { + enqueue( + this, + _connect.bind( + this, + port, + address as string, + callback as (err?: ErrnoException) => void, + ), + ); + + return; + } + + Reflect.apply(_connect, this, [port, address, callback]); + } + + /** + * A synchronous function that disassociates a connected `dgram.Socket` from + * its remote address. Trying to call `disconnect()` on an unbound or already + * disconnected socket will result in an `ERR_SOCKET_DGRAM_NOT_CONNECTED` + * exception. + */ + disconnect() { + const state = this[kStateSymbol]; + + if (state.connectState !== CONNECT_STATE_CONNECTED) { + throw new ERR_SOCKET_DGRAM_NOT_CONNECTED(); + } + + const err = state.handle!.disconnect(); + + if (err) { + throw errnoException(err, "connect"); + } else { + state.connectState = CONNECT_STATE_DISCONNECTED; + } + } + + /** + * Instructs the kernel to leave a multicast group at `multicastAddress` + * using the `IP_DROP_MEMBERSHIP` socket option. This method is automatically + * called by the kernel when the socket is closed or the process terminates, + * so most apps will never have reason to call this. + * + * If `multicastInterface` is not specified, the operating system will + * attempt to drop membership on all valid interfaces. + */ + dropMembership(multicastAddress: string, interfaceAddress?: string) { + healthCheck(this); + + if (!multicastAddress) { + throw new ERR_MISSING_ARGS("multicastAddress"); + } + + const err = this[kStateSymbol].handle!.dropMembership( + multicastAddress, + interfaceAddress, + ); + + if (err) { + throw errnoException(err, "dropMembership"); + } + } + + /** + * Instructs the kernel to leave a source-specific multicast channel at the + * given `sourceAddress` and `groupAddress` using the + * `IP_DROP_SOURCE_MEMBERSHIP` socket option. This method is automatically + * called by the kernel when the socket is closed or the process terminates, + * so most apps will never have reason to call this. + * + * If `multicastInterface` is not specified, the operating system will + * attempt to drop membership on all valid interfaces. + */ + dropSourceSpecificMembership( + sourceAddress: string, + groupAddress: string, + interfaceAddress?: string, + ) { + healthCheck(this); + + validateString(sourceAddress, "sourceAddress"); + validateString(groupAddress, "groupAddress"); + + const err = this[kStateSymbol].handle!.dropSourceSpecificMembership( + sourceAddress, + groupAddress, + interfaceAddress, + ); + + if (err) { + throw errnoException(err, "dropSourceSpecificMembership"); + } + } + + /** + * This method throws `ERR_SOCKET_BUFFER_SIZE` if called on an unbound + * socket. + * + * @return the `SO_RCVBUF` socket receive buffer size in bytes. + */ + getRecvBufferSize(): number { + return bufferSize(this, 0, RECV_BUFFER); + } + + /** + * This method throws `ERR_SOCKET_BUFFER_SIZE` if called on an unbound + * socket. + * + * @return the `SO_SNDBUF` socket send buffer size in bytes. + */ + getSendBufferSize(): number { + return bufferSize(this, 0, SEND_BUFFER); + } + + /** + * By default, binding a socket will cause it to block the Node.js process + * from exiting as long as the socket is open. The `socket.unref()` method + * can be used to exclude the socket from the reference counting that keeps + * the Node.js process active. The `socket.ref()` method adds the socket back + * to the reference counting and restores the default behavior. + * + * Calling `socket.ref()` multiples times will have no additional effect. + * + * The `socket.ref()` method returns a reference to the socket so calls can + * be chained. + */ + ref(): this { + const handle = this[kStateSymbol].handle; + + if (handle) { + handle.ref(); + } + + return this; + } + + /** + * Returns an object containing the `address`, `family`, and `port` of the + * remote endpoint. This method throws an `ERR_SOCKET_DGRAM_NOT_CONNECTED` + * exception if the socket is not connected. + */ + remoteAddress(): AddressInfo { + healthCheck(this); + + const state = this[kStateSymbol]; + + if (state.connectState !== CONNECT_STATE_CONNECTED) { + throw new ERR_SOCKET_DGRAM_NOT_CONNECTED(); + } + + const out = {}; + const err = state.handle!.getpeername(out); + + if (err) { + throw errnoException(err, "getpeername"); + } + + return out as AddressInfo; + } + + /** + * Broadcasts a datagram on the socket. + * For connectionless sockets, the destination `port` and `address` must be + * specified. Connected sockets, on the other hand, will use their associated + * remote endpoint, so the `port` and `address` arguments must not be set. + * + * The `msg` argument contains the message to be sent. + * Depending on its type, different behavior can apply. If `msg` is a + * `Buffer`, any `TypedArray` or a `DataView`, + * the `offset` and `length` specify the offset within the `Buffer` where the + * message begins and the number of bytes in the message, respectively. + * If `msg` is a `String`, then it is automatically converted to a `Buffer` + * with `'utf8'` encoding. With messages that contain multi-byte characters, + * `offset` and `length` will be calculated with respect to `byte length` and + * not the character position. If `msg` is an array, `offset` and `length` + * must not be specified. + * + * The `address` argument is a string. If the value of `address` is a host + * name, DNS will be used to resolve the address of the host. If `address` + * is not provided or otherwise nullish, `'127.0.0.1'` (for `udp4` sockets) + * or `'::1'`(for `udp6` sockets) will be used by default. + * + * If the socket has not been previously bound with a call to `bind`, the + * socket is assigned a random port number and is bound to the "all + * interfaces" address (`'0.0.0.0'` for `udp4` sockets, `'::0'` for `udp6` + * sockets.) + * + * An optional `callback` function may be specified to as a way of + * reporting DNS errors or for determining when it is safe to reuse the `buf` + * object. DNS lookups delay the time to send for at least one tick of the + * Node.js event loop. + * + * The only way to know for sure that the datagram has been sent is by using + * a `callback`. If an error occurs and a `callback` is given, the error will + * be passed as the first argument to the `callback`. If a `callback` is not + * given, the error is emitted as an `'error'` event on the `socket` object. + * + * Offset and length are optional but both _must_ be set if either are used. + * They are supported only when the first argument is a `Buffer`, a + * `TypedArray`, or a `DataView`. + * + * This method throws `ERR_SOCKET_BAD_PORT` if called on an unbound socket. + * + * Example of sending a UDP packet to a port on `localhost`; + * + * ```js + * import dgram from "internal:deno_node/polyfills/dgram"; + * import { Buffer } from "internal:deno_node/polyfills/buffer"; + * + * const message = Buffer.from('Some bytes'); + * const client = dgram.createSocket('udp4'); + * client.send(message, 41234, 'localhost', (err) => { + * client.close(); + * }); + * ``` + * + * Example of sending a UDP packet composed of multiple buffers to a port on + * `127.0.0.1`; + * + * ```js + * import dgram from "internal:deno_node/polyfills/dgram"; + * import { Buffer } from "internal:deno_node/polyfills/buffer"; + * + * const buf1 = Buffer.from('Some '); + * const buf2 = Buffer.from('bytes'); + * const client = dgram.createSocket('udp4'); + * client.send([buf1, buf2], 41234, (err) => { + * client.close(); + * }); + * ``` + * + * Sending multiple buffers might be faster or slower depending on the + * application and operating system. Run benchmarks to + * determine the optimal strategy on a case-by-case basis. Generally + * speaking, however, sending multiple buffers is faster. + * + * Example of sending a UDP packet using a socket connected to a port on + * `localhost`: + * + * ```js + * import dgram from "internal:deno_node/polyfills/dgram"; + * import { Buffer } from "internal:deno_node/polyfills/buffer"; + * + * const message = Buffer.from('Some bytes'); + * const client = dgram.createSocket('udp4'); + * client.connect(41234, 'localhost', (err) => { + * client.send(message, (err) => { + * client.close(); + * }); + * }); + * ``` + * + * @param msg Message to be sent. + * @param offset Offset in the buffer where the message starts. + * @param length Number of bytes in the message. + * @param port Destination port. + * @param address Destination host name or IP address. + * @param callback Called when the message has been sent. + */ + send( + msg: MessageType | ReadonlyArray<MessageType>, + port?: number, + address?: string, + callback?: (error: ErrnoException | null, bytes?: number) => void, + ): void; + send( + msg: MessageType | ReadonlyArray<MessageType>, + port?: number, + callback?: (error: ErrnoException | null, bytes?: number) => void, + ): void; + send( + msg: MessageType | ReadonlyArray<MessageType>, + callback?: (error: ErrnoException | null, bytes?: number) => void, + ): void; + send( + msg: MessageType, + offset: number, + length: number, + port?: number, + address?: string, + callback?: (error: ErrnoException | null, bytes?: number) => void, + ): void; + send( + msg: MessageType, + offset: number, + length: number, + port?: number, + callback?: (error: ErrnoException | null, bytes?: number) => void, + ): void; + send( + msg: MessageType, + offset: number, + length: number, + callback?: (error: ErrnoException | null, bytes?: number) => void, + ): void; + send( + buffer: unknown, + offset?: unknown, + length?: unknown, + port?: unknown, + address?: unknown, + callback?: unknown, + ) { + let list: MessageType[] | null; + + const state = this[kStateSymbol]; + const connected = state.connectState === CONNECT_STATE_CONNECTED; + + if (!connected) { + if (address || (port && typeof port !== "function")) { + buffer = sliceBuffer( + buffer as MessageType, + offset as number, + length as number, + ); + } else { + callback = port; + port = offset; + address = length; + } + } else { + if (typeof length === "number") { + buffer = sliceBuffer(buffer as MessageType, offset as number, length); + + if (typeof port === "function") { + callback = port; + port = null; + } + } else { + callback = offset; + } + + if (port || address) { + throw new ERR_SOCKET_DGRAM_IS_CONNECTED(); + } + } + + if (!Array.isArray(buffer)) { + if (typeof buffer === "string") { + list = [Buffer.from(buffer)]; + } else if (!isArrayBufferView(buffer)) { + throw new ERR_INVALID_ARG_TYPE( + "buffer", + ["Buffer", "TypedArray", "DataView", "string"], + buffer, + ); + } else { + list = [buffer as MessageType]; + } + } else if (!(list = fixBufferList(buffer))) { + throw new ERR_INVALID_ARG_TYPE( + "buffer list arguments", + ["Buffer", "TypedArray", "DataView", "string"], + buffer, + ); + } + + if (!connected) { + port = validatePort(port, "Port", false); + } + + // Normalize callback so it's either a function or undefined but not anything + // else. + if (typeof callback !== "function") { + callback = undefined; + } + + if (typeof address === "function") { + callback = address; + address = undefined; + } else if (address && typeof address !== "string") { + throw new ERR_INVALID_ARG_TYPE("address", ["string", "falsy"], address); + } + + healthCheck(this); + + if (state.bindState === BIND_STATE_UNBOUND) { + this.bind({ port: 0, exclusive: true }); + } + + if (list.length === 0) { + list.push(Buffer.alloc(0)); + } + + // If the socket hasn't been bound yet, push the outbound packet onto the + // send queue and send after binding is complete. + if (state.bindState !== BIND_STATE_BOUND) { + // @ts-ignore mapping unknowns back onto themselves doesn't type nicely + enqueue(this, this.send.bind(this, list, port, address, callback)); + + return; + } + + const afterDns = (ex: ErrnoException | null, ip: string) => { + defaultTriggerAsyncIdScope( + this[asyncIdSymbol], + doSend, + ex, + this, + ip, + list, + address, + port, + callback, + ); + }; + + if (!connected) { + state.handle!.lookup(address as string, afterDns); + } else { + afterDns(null, ""); + } + } + + /** + * Sets or clears the `SO_BROADCAST` socket option. When set to `true`, UDP + * packets may be sent to a local interface's broadcast address. + * + * This method throws `EBADF` if called on an unbound socket. + */ + setBroadcast(arg: boolean) { + const err = this[kStateSymbol].handle!.setBroadcast(arg ? 1 : 0); + + if (err) { + throw errnoException(err, "setBroadcast"); + } + } + + /** + * _All references to scope in this section are referring to [IPv6 Zone Indices](https://en.wikipedia.org/wiki/IPv6_address#Scoped_literal_IPv6_addresses), which are defined by [RFC + * 4007](https://tools.ietf.org/html/rfc4007). In string form, an IP_ + * _with a scope index is written as `'IP%scope'` where scope is an interface name_ + * _or interface number._ + * + * Sets the default outgoing multicast interface of the socket to a chosen + * interface or back to system interface selection. The `multicastInterface` must + * be a valid string representation of an IP from the socket's family. + * + * For IPv4 sockets, this should be the IP configured for the desired physical + * interface. All packets sent to multicast on the socket will be sent on the + * interface determined by the most recent successful use of this call. + * + * For IPv6 sockets, `multicastInterface` should include a scope to indicate the + * interface as in the examples that follow. In IPv6, individual `send` calls can + * also use explicit scope in addresses, so only packets sent to a multicast + * address without specifying an explicit scope are affected by the most recent + * successful use of this call. + * + * This method throws `EBADF` if called on an unbound socket. + * + * #### Example: IPv6 outgoing multicast interface + * + * On most systems, where scope format uses the interface name: + * + * ```js + * const socket = dgram.createSocket('udp6'); + * + * socket.bind(1234, () => { + * socket.setMulticastInterface('::%eth1'); + * }); + * ``` + * + * On Windows, where scope format uses an interface number: + * + * ```js + * const socket = dgram.createSocket('udp6'); + * + * socket.bind(1234, () => { + * socket.setMulticastInterface('::%2'); + * }); + * ``` + * + * #### Example: IPv4 outgoing multicast interface + * + * All systems use an IP of the host on the desired physical interface: + * + * ```js + * const socket = dgram.createSocket('udp4'); + * + * socket.bind(1234, () => { + * socket.setMulticastInterface('10.0.0.2'); + * }); + * ``` + */ + setMulticastInterface(interfaceAddress: string) { + healthCheck(this); + validateString(interfaceAddress, "interfaceAddress"); + + const err = this[kStateSymbol].handle!.setMulticastInterface( + interfaceAddress, + ); + + if (err) { + throw errnoException(err, "setMulticastInterface"); + } + } + + /** + * Sets or clears the `IP_MULTICAST_LOOP` socket option. When set to `true`, + * multicast packets will also be received on the local interface. + * + * This method throws `EBADF` if called on an unbound socket. + */ + setMulticastLoopback(arg: boolean): typeof arg { + const err = this[kStateSymbol].handle!.setMulticastLoopback(arg ? 1 : 0); + + if (err) { + throw errnoException(err, "setMulticastLoopback"); + } + + return arg; // 0.4 compatibility + } + + /** + * Sets the `IP_MULTICAST_TTL` socket option. While TTL generally stands for + * "Time to Live", in this context it specifies the number of IP hops that a + * packet is allowed to travel through, specifically for multicast traffic. Each + * router or gateway that forwards a packet decrements the TTL. If the TTL is + * decremented to 0 by a router, it will not be forwarded. + * + * The `ttl` argument may be between 0 and 255\. The default on most systems is `1`. + * + * This method throws `EBADF` if called on an unbound socket. + */ + setMulticastTTL(ttl: number): typeof ttl { + validateNumber(ttl, "ttl"); + + const err = this[kStateSymbol].handle!.setMulticastTTL(ttl); + + if (err) { + throw errnoException(err, "setMulticastTTL"); + } + + return ttl; + } + + /** + * Sets the `SO_RCVBUF` socket option. Sets the maximum socket receive buffer + * in bytes. + * + * This method throws `ERR_SOCKET_BUFFER_SIZE` if called on an unbound socket. + */ + setRecvBufferSize(size: number) { + bufferSize(this, size, RECV_BUFFER); + } + + /** + * Sets the `SO_SNDBUF` socket option. Sets the maximum socket send buffer + * in bytes. + * + * This method throws `ERR_SOCKET_BUFFER_SIZE` if called on an unbound socket. + */ + setSendBufferSize(size: number) { + bufferSize(this, size, SEND_BUFFER); + } + + /** + * Sets the `IP_TTL` socket option. While TTL generally stands for "Time to Live", + * in this context it specifies the number of IP hops that a packet is allowed to + * travel through. Each router or gateway that forwards a packet decrements the + * TTL. If the TTL is decremented to 0 by a router, it will not be forwarded. + * Changing TTL values is typically done for network probes or when multicasting. + * + * The `ttl` argument may be between between 1 and 255\. The default on most systems + * is 64. + * + * This method throws `EBADF` if called on an unbound socket. + */ + setTTL(ttl: number): typeof ttl { + validateNumber(ttl, "ttl"); + + const err = this[kStateSymbol].handle!.setTTL(ttl); + + if (err) { + throw errnoException(err, "setTTL"); + } + + return ttl; + } + + /** + * By default, binding a socket will cause it to block the Node.js process from + * exiting as long as the socket is open. The `socket.unref()` method can be used + * to exclude the socket from the reference counting that keeps the Node.js + * process active, allowing the process to exit even if the socket is still + * listening. + * + * Calling `socket.unref()` multiple times will have no addition effect. + * + * The `socket.unref()` method returns a reference to the socket so calls can be + * chained. + */ + unref(): this { + const handle = this[kStateSymbol].handle; + + if (handle) { + handle.unref(); + } + + return this; + } +} + +/** + * Creates a `dgram.Socket` object. Once the socket is created, calling + * `socket.bind()` will instruct the socket to begin listening for datagram + * messages. When `address` and `port` are not passed to `socket.bind()` the + * method will bind the socket to the "all interfaces" address on a random port + * (it does the right thing for both `udp4` and `udp6` sockets). The bound + * address and port can be retrieved using `socket.address().address` and + * `socket.address().port`. + * + * If the `signal` option is enabled, calling `.abort()` on the corresponding + * `AbortController` is similar to calling `.close()` on the socket: + * + * ```js + * const controller = new AbortController(); + * const { signal } = controller; + * const server = dgram.createSocket({ type: 'udp4', signal }); + * server.on('message', (msg, rinfo) => { + * console.log(`server got: ${msg} from ${rinfo.address}:${rinfo.port}`); + * }); + * // Later, when you want to close the server. + * controller.abort(); + * ``` + * + * @param options + * @param callback Attached as a listener for `'message'` events. Optional. + */ +export function createSocket( + type: SocketType, + listener?: (msg: Buffer, rinfo: RemoteInfo) => void, +): Socket; +export function createSocket( + type: SocketOptions, + listener?: (msg: Buffer, rinfo: RemoteInfo) => void, +): Socket; +export function createSocket( + type: SocketType | SocketOptions, + listener?: (msg: Buffer, rinfo: RemoteInfo) => void, +): Socket { + return new Socket(type, listener); +} + +function startListening(socket: Socket) { + const state = socket[kStateSymbol]; + + state.handle!.onmessage = onMessage; + // Todo(@bartlomieju): handle errors + state.handle!.recvStart(); + state.receiving = true; + state.bindState = BIND_STATE_BOUND; + + if (state.recvBufferSize) { + bufferSize(socket, state.recvBufferSize, RECV_BUFFER); + } + + if (state.sendBufferSize) { + bufferSize(socket, state.sendBufferSize, SEND_BUFFER); + } + + socket.emit("listening"); +} + +function replaceHandle(self: Socket, newHandle: UDP) { + const state = self[kStateSymbol]; + const oldHandle = state.handle!; + + // Set up the handle that we got from primary. + newHandle.lookup = oldHandle.lookup; + newHandle.bind = oldHandle.bind; + newHandle.send = oldHandle.send; + newHandle[ownerSymbol] = self; + + // Replace the existing handle by the handle we got from primary. + oldHandle.close(); + state.handle = newHandle; +} + +function bufferSize(self: Socket, size: number, buffer: boolean): number { + if (size >>> 0 !== size) { + throw new ERR_SOCKET_BAD_BUFFER_SIZE(); + } + + const ctx = {}; + const ret = self[kStateSymbol].handle!.bufferSize(size, buffer, ctx); + + if (ret === undefined) { + throw new ERR_SOCKET_BUFFER_SIZE(ctx as NodeSystemErrorCtx); + } + + return ret; +} + +function socketCloseNT(self: Socket) { + self.emit("close"); +} + +function healthCheck(socket: Socket) { + if (!socket[kStateSymbol].handle) { + // Error message from dgram_legacy.js. + throw new ERR_SOCKET_DGRAM_NOT_RUNNING(); + } +} + +function stopReceiving(socket: Socket) { + const state = socket[kStateSymbol]; + + if (!state.receiving) { + return; + } + + state.handle!.recvStop(); + state.receiving = false; +} + +function onMessage( + nread: number, + handle: UDP, + buf?: Buffer, + rinfo?: RemoteInfo, +) { + const self = handle[ownerSymbol] as Socket; + + if (nread < 0) { + self.emit("error", errnoException(nread, "recvmsg")); + + return; + } + + rinfo!.size = buf!.length; // compatibility + + self.emit("message", buf, rinfo); +} + +function sliceBuffer(buffer: MessageType, offset: number, length: number) { + if (typeof buffer === "string") { + buffer = Buffer.from(buffer); + } else if (!isArrayBufferView(buffer)) { + throw new ERR_INVALID_ARG_TYPE( + "buffer", + ["Buffer", "TypedArray", "DataView", "string"], + buffer, + ); + } + + offset = offset >>> 0; + length = length >>> 0; + + if (offset > buffer.byteLength) { + throw new ERR_BUFFER_OUT_OF_BOUNDS("offset"); + } + + if (offset + length > buffer.byteLength) { + throw new ERR_BUFFER_OUT_OF_BOUNDS("length"); + } + + return Buffer.from(buffer.buffer, buffer.byteOffset + offset, length); +} + +function fixBufferList( + list: ReadonlyArray<MessageType>, +): Array<MessageType> | null { + const newList = new Array(list.length); + + for (let i = 0, l = list.length; i < l; i++) { + const buf = list[i]; + + if (typeof buf === "string") { + newList[i] = Buffer.from(buf); + } else if (!isArrayBufferView(buf)) { + return null; + } else { + newList[i] = Buffer.from(buf.buffer, buf.byteOffset, buf.byteLength); + } + } + + return newList; +} + +function enqueue(self: Socket, toEnqueue: () => void) { + const state = self[kStateSymbol]; + + // If the send queue hasn't been initialized yet, do it, and install an + // event handler that flushes the send queue after binding is done. + if (state.queue === undefined) { + state.queue = []; + + self.once(EventEmitter.errorMonitor, onListenError); + self.once("listening", onListenSuccess); + } + + state.queue.push(toEnqueue); +} + +function onListenSuccess(this: Socket) { + this.removeListener(EventEmitter.errorMonitor, onListenError); + clearQueue.call(this); +} + +function onListenError(this: Socket) { + this.removeListener("listening", onListenSuccess); + this[kStateSymbol].queue = undefined; +} + +function clearQueue(this: Socket) { + const state = this[kStateSymbol]; + const queue = state.queue; + state.queue = undefined; + + // Flush the send queue. + for (const queueEntry of queue!) { + queueEntry(); + } +} + +function _connect( + this: Socket, + port: number, + address: string, + callback: (err?: ErrnoException) => void, +) { + const state = this[kStateSymbol]; + + if (callback) { + this.once("connect", callback); + } + + const afterDns = (ex: ErrnoException | null, ip: string) => { + defaultTriggerAsyncIdScope( + this[asyncIdSymbol], + doConnect, + ex, + this, + ip, + address, + port, + callback, + ); + }; + + state.handle!.lookup(address, afterDns); +} + +function doConnect( + ex: ErrnoException | null, + self: Socket, + ip: string, + address: string, + port: number, + callback: (err?: ErrnoException) => void, +) { + const state = self[kStateSymbol]; + + if (!state.handle) { + return; + } + + if (!ex) { + const err = state.handle.connect(ip, port); + + if (err) { + ex = exceptionWithHostPort(err, "connect", address, port); + } + } + + if (ex) { + state.connectState = CONNECT_STATE_DISCONNECTED; + + return nextTick(() => { + if (callback) { + self.removeListener("connect", callback); + + callback(ex!); + } else { + self.emit("error", ex); + } + }); + } + + state.connectState = CONNECT_STATE_CONNECTED; + + nextTick(() => self.emit("connect")); +} + +function doSend( + ex: ErrnoException | null, + self: Socket, + ip: string, + list: MessageType[], + address: string, + port: number, + callback?: (error: ErrnoException | null, bytes?: number) => void, +) { + const state = self[kStateSymbol]; + + if (ex) { + if (typeof callback === "function") { + nextTick(callback, ex); + + return; + } + + nextTick(() => self.emit("error", ex)); + + return; + } else if (!state.handle) { + return; + } + + const req = new SendWrap(); + req.list = list; // Keep reference alive. + req.address = address; + req.port = port; + + if (callback) { + req.callback = callback; + req.oncomplete = afterSend; + } + + let err; + + if (port) { + err = state.handle.send(req, list, list.length, port, ip, !!callback); + } else { + err = state.handle.send(req, list, list.length, !!callback); + } + + if (err >= 1) { + // Synchronous finish. The return code is msg_length + 1 so that we can + // distinguish between synchronous success and asynchronous success. + if (callback) { + nextTick(callback, null, err - 1); + } + + return; + } + + if (err && callback) { + // Don't emit as error, dgram_legacy.js compatibility + const ex = exceptionWithHostPort(err, "send", address, port); + + nextTick(callback, ex); + } +} + +function afterSend(this: SendWrap, err: number | null, sent?: number) { + let ex: ErrnoException | null; + + if (err) { + ex = exceptionWithHostPort(err, "send", this.address, this.port); + } else { + ex = null; + } + + this.callback(ex, sent); +} + +export type { SocketType }; + +export default { + createSocket, + Socket, +}; |