diff options
author | Bartek IwaĆczuk <biwanczuk@gmail.com> | 2023-02-14 17:38:45 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2023-02-14 17:38:45 +0100 |
commit | d47147fb6ad229b1c039aff9d0959b6e281f4df5 (patch) | |
tree | 6e9e790f2b9bc71b5f0c9c7e64b95cae31579d58 /ext/node/polyfills/_http_agent.mjs | |
parent | 1d00bbe47e2ca14e2d2151518e02b2324461a065 (diff) |
feat(ext/node): embed std/node into the snapshot (#17724)
This commit moves "deno_std/node" in "ext/node" crate. The code is
transpiled and snapshotted during the build process.
During the first pass a minimal amount of work was done to create the
snapshot, a lot of code in "ext/node" depends on presence of "Deno"
global. This code will be gradually fixed in the follow up PRs to migrate
it to import relevant APIs from "internal:" modules.
Currently the code from snapshot is not used in any way, and all
Node/npm compatibility still uses code from
"https://deno.land/std/node" (or from the location specified by
"DENO_NODE_COMPAT_URL"). This will also be handled in a follow
up PRs.
---------
Co-authored-by: crowlkats <crowlkats@toaxl.com>
Co-authored-by: Divy Srivastava <dj.srivastava23@gmail.com>
Co-authored-by: Yoshiya Hinosawa <stibium121@gmail.com>
Diffstat (limited to 'ext/node/polyfills/_http_agent.mjs')
-rw-r--r-- | ext/node/polyfills/_http_agent.mjs | 526 |
1 files changed, 526 insertions, 0 deletions
diff --git a/ext/node/polyfills/_http_agent.mjs b/ext/node/polyfills/_http_agent.mjs new file mode 100644 index 000000000..20c1eaf0b --- /dev/null +++ b/ext/node/polyfills/_http_agent.mjs @@ -0,0 +1,526 @@ +// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license. +// Copyright Joyent and Node contributors. All rights reserved. MIT license. + +import * as net from "internal:deno_node/polyfills/net.ts"; +import EventEmitter from "internal:deno_node/polyfills/events.ts"; +import { debuglog } from "internal:deno_node/polyfills/internal/util/debuglog.ts"; +let debug = debuglog("http", (fn) => { + debug = fn; +}); +import { AsyncResource } from "internal:deno_node/polyfills/async_hooks.ts"; +import { symbols } from "internal:deno_node/polyfills/internal/async_hooks.ts"; +// deno-lint-ignore camelcase +const { async_id_symbol } = symbols; +import { ERR_OUT_OF_RANGE } from "internal:deno_node/polyfills/internal/errors.ts"; +import { once } from "internal:deno_node/polyfills/internal/util.mjs"; +import { + validateNumber, + validateOneOf, + validateString, +} from "internal:deno_node/polyfills/internal/validators.mjs"; + +const kOnKeylog = Symbol("onkeylog"); +const kRequestOptions = Symbol("requestOptions"); +const kRequestAsyncResource = Symbol("requestAsyncResource"); +// New Agent code. + +// The largest departure from the previous implementation is that +// an Agent instance holds connections for a variable number of host:ports. +// Surprisingly, this is still API compatible as far as third parties are +// concerned. The only code that really notices the difference is the +// request object. + +// Another departure is that all code related to HTTP parsing is in +// ClientRequest.onSocket(). The Agent is now *strictly* +// concerned with managing a connection pool. + +class ReusedHandle { + constructor(type, handle) { + this.type = type; + this.handle = handle; + } +} + +function freeSocketErrorListener(err) { + // deno-lint-ignore no-this-alias + const socket = this; + debug("SOCKET ERROR on FREE socket:", err.message, err.stack); + socket.destroy(); + socket.emit("agentRemove"); +} + +export function Agent(options) { + if (!(this instanceof Agent)) { + return new Agent(options); + } + + EventEmitter.call(this); + + this.defaultPort = 80; + this.protocol = "http:"; + + this.options = { __proto__: null, ...options }; + + // Don't confuse net and make it think that we're connecting to a pipe + this.options.path = null; + this.requests = Object.create(null); + this.sockets = Object.create(null); + this.freeSockets = Object.create(null); + this.keepAliveMsecs = this.options.keepAliveMsecs || 1000; + this.keepAlive = this.options.keepAlive || false; + this.maxSockets = this.options.maxSockets || Agent.defaultMaxSockets; + this.maxFreeSockets = this.options.maxFreeSockets || 256; + this.scheduling = this.options.scheduling || "lifo"; + this.maxTotalSockets = this.options.maxTotalSockets; + this.totalSocketCount = 0; + + validateOneOf(this.scheduling, "scheduling", ["fifo", "lifo"]); + + if (this.maxTotalSockets !== undefined) { + validateNumber(this.maxTotalSockets, "maxTotalSockets"); + if (this.maxTotalSockets <= 0 || Number.isNaN(this.maxTotalSockets)) { + throw new ERR_OUT_OF_RANGE( + "maxTotalSockets", + "> 0", + this.maxTotalSockets, + ); + } + } else { + this.maxTotalSockets = Infinity; + } + + this.on("free", (socket, options) => { + const name = this.getName(options); + debug("agent.on(free)", name); + + // TODO(ronag): socket.destroy(err) might have been called + // before coming here and have an 'error' scheduled. In the + // case of socket.destroy() below this 'error' has no handler + // and could cause unhandled exception. + + if (!socket.writable) { + socket.destroy(); + return; + } + + const requests = this.requests[name]; + if (requests && requests.length) { + const req = requests.shift(); + const reqAsyncRes = req[kRequestAsyncResource]; + if (reqAsyncRes) { + // Run request within the original async context. + reqAsyncRes.runInAsyncScope(() => { + asyncResetHandle(socket); + setRequestSocket(this, req, socket); + }); + req[kRequestAsyncResource] = null; + } else { + setRequestSocket(this, req, socket); + } + if (requests.length === 0) { + delete this.requests[name]; + } + return; + } + + // If there are no pending requests, then put it in + // the freeSockets pool, but only if we're allowed to do so. + const req = socket._httpMessage; + if (!req || !req.shouldKeepAlive || !this.keepAlive) { + socket.destroy(); + return; + } + + const freeSockets = this.freeSockets[name] || []; + const freeLen = freeSockets.length; + let count = freeLen; + if (this.sockets[name]) { + count += this.sockets[name].length; + } + + if ( + this.totalSocketCount > this.maxTotalSockets || + count > this.maxSockets || + freeLen >= this.maxFreeSockets || + !this.keepSocketAlive(socket) + ) { + socket.destroy(); + return; + } + + this.freeSockets[name] = freeSockets; + socket[async_id_symbol] = -1; + socket._httpMessage = null; + this.removeSocket(socket, options); + + socket.once("error", freeSocketErrorListener); + freeSockets.push(socket); + }); + + // Don't emit keylog events unless there is a listener for them. + this.on("newListener", maybeEnableKeylog); +} +Object.setPrototypeOf(Agent.prototype, EventEmitter.prototype); +Object.setPrototypeOf(Agent, EventEmitter); + +function maybeEnableKeylog(eventName) { + if (eventName === "keylog") { + this.removeListener("newListener", maybeEnableKeylog); + // Future sockets will listen on keylog at creation. + // deno-lint-ignore no-this-alias + const agent = this; + this[kOnKeylog] = function onkeylog(keylog) { + agent.emit("keylog", keylog, this); + }; + // Existing sockets will start listening on keylog now. + const sockets = ObjectValues(this.sockets); + for (let i = 0; i < sockets.length; i++) { + sockets[i].on("keylog", this[kOnKeylog]); + } + } +} + +Agent.defaultMaxSockets = Infinity; + +Agent.prototype.createConnection = net.createConnection; + +// Get the key for a given set of request options +Agent.prototype.getName = function getName(options = {}) { + let name = options.host || "localhost"; + + name += ":"; + if (options.port) { + name += options.port; + } + + name += ":"; + if (options.localAddress) { + name += options.localAddress; + } + + // Pacify parallel/test-http-agent-getname by only appending + // the ':' when options.family is set. + if (options.family === 4 || options.family === 6) { + name += `:${options.family}`; + } + + if (options.socketPath) { + name += `:${options.socketPath}`; + } + + return name; +}; + +Agent.prototype.addRequest = function addRequest( + req, + options, + port, /* legacy */ + localAddress, /* legacy */ +) { + // Legacy API: addRequest(req, host, port, localAddress) + if (typeof options === "string") { + options = { + __proto__: null, + host: options, + port, + localAddress, + }; + } + + options = { __proto__: null, ...options, ...this.options }; + if (options.socketPath) { + options.path = options.socketPath; + } + + if (!options.servername && options.servername !== "") { + options.servername = calculateServerName(options, req); + } + + const name = this.getName(options); + if (!this.sockets[name]) { + this.sockets[name] = []; + } + + const freeSockets = this.freeSockets[name]; + let socket; + if (freeSockets) { + while (freeSockets.length && freeSockets[0].destroyed) { + freeSockets.shift(); + } + socket = this.scheduling === "fifo" + ? freeSockets.shift() + : freeSockets.pop(); + if (!freeSockets.length) { + delete this.freeSockets[name]; + } + } + + const freeLen = freeSockets ? freeSockets.length : 0; + const sockLen = freeLen + this.sockets[name].length; + + if (socket) { + asyncResetHandle(socket); + this.reuseSocket(socket, req); + setRequestSocket(this, req, socket); + this.sockets[name].push(socket); + } else if ( + sockLen < this.maxSockets && + this.totalSocketCount < this.maxTotalSockets + ) { + debug("call onSocket", sockLen, freeLen); + // If we are under maxSockets create a new one. + this.createSocket(req, options, (err, socket) => { + if (err) { + req.onSocket(socket, err); + } else { + setRequestSocket(this, req, socket); + } + }); + } else { + debug("wait for socket"); + // We are over limit so we'll add it to the queue. + if (!this.requests[name]) { + this.requests[name] = []; + } + + // Used to create sockets for pending requests from different origin + req[kRequestOptions] = options; + // Used to capture the original async context. + req[kRequestAsyncResource] = new AsyncResource("QueuedRequest"); + + this.requests[name].push(req); + } +}; + +Agent.prototype.createSocket = function createSocket(req, options, cb) { + options = { __proto__: null, ...options, ...this.options }; + if (options.socketPath) { + options.path = options.socketPath; + } + + if (!options.servername && options.servername !== "") { + options.servername = calculateServerName(options, req); + } + + const name = this.getName(options); + options._agentKey = name; + + debug("createConnection", name, options); + options.encoding = null; + + const oncreate = once((err, s) => { + if (err) { + return cb(err); + } + if (!this.sockets[name]) { + this.sockets[name] = []; + } + this.sockets[name].push(s); + this.totalSocketCount++; + debug("sockets", name, this.sockets[name].length, this.totalSocketCount); + installListeners(this, s, options); + cb(null, s); + }); + + const newSocket = this.createConnection(options, oncreate); + if (newSocket) { + oncreate(null, newSocket); + } +}; + +function calculateServerName(options, req) { + let servername = options.host; + const hostHeader = req.getHeader("host"); + if (hostHeader) { + validateString(hostHeader, "options.headers.host"); + + // abc => abc + // abc:123 => abc + // [::1] => ::1 + // [::1]:123 => ::1 + if (hostHeader.startsWith("[")) { + const index = hostHeader.indexOf("]"); + if (index === -1) { + // Leading '[', but no ']'. Need to do something... + servername = hostHeader; + } else { + servername = hostHeader.slice(1, index); + } + } else { + servername = hostHeader.split(":", 1)[0]; + } + } + // Don't implicitly set invalid (IP) servernames. + if (net.isIP(servername)) { + servername = ""; + } + return servername; +} + +function installListeners(agent, s, options) { + function onFree() { + debug("CLIENT socket onFree"); + agent.emit("free", s, options); + } + s.on("free", onFree); + + function onClose(_err) { + debug("CLIENT socket onClose"); + // This is the only place where sockets get removed from the Agent. + // If you want to remove a socket from the pool, just close it. + // All socket errors end in a close event anyway. + agent.totalSocketCount--; + agent.removeSocket(s, options); + } + s.on("close", onClose); + + function onTimeout() { + debug("CLIENT socket onTimeout"); + + // Destroy if in free list. + // TODO(ronag): Always destroy, even if not in free list. + const sockets = agent.freeSockets; + if (Object.keys(sockets).some((name) => sockets[name].includes(s))) { + return s.destroy(); + } + } + s.on("timeout", onTimeout); + + function onRemove() { + // We need this function for cases like HTTP 'upgrade' + // (defined by WebSockets) where we need to remove a socket from the + // pool because it'll be locked up indefinitely + debug("CLIENT socket onRemove"); + agent.totalSocketCount--; + agent.removeSocket(s, options); + s.removeListener("close", onClose); + s.removeListener("free", onFree); + s.removeListener("timeout", onTimeout); + s.removeListener("agentRemove", onRemove); + } + s.on("agentRemove", onRemove); + + if (agent[kOnKeylog]) { + s.on("keylog", agent[kOnKeylog]); + } +} + +Agent.prototype.removeSocket = function removeSocket(s, options) { + const name = this.getName(options); + debug("removeSocket", name, "writable:", s.writable); + const sets = [this.sockets]; + + // If the socket was destroyed, remove it from the free buffers too. + if (!s.writable) { + sets.push(this.freeSockets); + } + + for (let sk = 0; sk < sets.length; sk++) { + const sockets = sets[sk]; + + if (sockets[name]) { + const index = sockets[name].indexOf(s); + if (index !== -1) { + sockets[name].splice(index, 1); + // Don't leak + if (sockets[name].length === 0) { + delete sockets[name]; + } + } + } + } + + let req; + if (this.requests[name] && this.requests[name].length) { + debug("removeSocket, have a request, make a socket"); + req = this.requests[name][0]; + } else { + // TODO(rickyes): this logic will not be FIFO across origins. + // There might be older requests in a different origin, but + // if the origin which releases the socket has pending requests + // that will be prioritized. + const keys = Object.keys(this.requests); + for (let i = 0; i < keys.length; i++) { + const prop = keys[i]; + // Check whether this specific origin is already at maxSockets + if (this.sockets[prop] && this.sockets[prop].length) break; + debug( + "removeSocket, have a request with different origin," + + " make a socket", + ); + req = this.requests[prop][0]; + options = req[kRequestOptions]; + break; + } + } + + if (req && options) { + req[kRequestOptions] = undefined; + // If we have pending requests and a socket gets closed make a new one + this.createSocket(req, options, (err, socket) => { + if (err) { + req.onSocket(socket, err); + } else { + socket.emit("free"); + } + }); + } +}; + +Agent.prototype.keepSocketAlive = function keepSocketAlive(socket) { + socket.setKeepAlive(true, this.keepAliveMsecs); + socket.unref(); + + const agentTimeout = this.options.timeout || 0; + if (socket.timeout !== agentTimeout) { + socket.setTimeout(agentTimeout); + } + + return true; +}; + +Agent.prototype.reuseSocket = function reuseSocket(socket, req) { + debug("have free socket"); + socket.removeListener("error", freeSocketErrorListener); + req.reusedSocket = true; + socket.ref(); +}; + +Agent.prototype.destroy = function destroy() { + const sets = [this.freeSockets, this.sockets]; + for (let s = 0; s < sets.length; s++) { + const set = sets[s]; + const keys = Object.keys(set); + for (let v = 0; v < keys.length; v++) { + const setName = set[keys[v]]; + for (let n = 0; n < setName.length; n++) { + setName[n].destroy(); + } + } + } +}; + +function setRequestSocket(agent, req, socket) { + req.onSocket(socket); + const agentTimeout = agent.options.timeout || 0; + if (req.timeout === undefined || req.timeout === agentTimeout) { + return; + } + socket.setTimeout(req.timeout); +} + +function asyncResetHandle(socket) { + // Guard against an uninitialized or user supplied Socket. + const handle = socket._handle; + if (handle && typeof handle.asyncReset === "function") { + // Assign the handle a new asyncId and run any destroy()/init() hooks. + handle.asyncReset(new ReusedHandle(handle.getProviderType(), handle)); + socket[async_id_symbol] = handle.getAsyncId(); + } +} + +export const globalAgent = new Agent(); +export default { + Agent, + globalAgent, +}; |