diff options
Diffstat (limited to 'ext')
-rw-r--r-- | ext/fetch/22_body.js | 16 | ||||
-rw-r--r-- | ext/fetch/23_request.js | 77 | ||||
-rw-r--r-- | ext/fetch/23_response.js | 20 | ||||
-rw-r--r-- | ext/flash/01_http.js | 569 | ||||
-rw-r--r-- | ext/flash/Cargo.toml | 29 | ||||
-rw-r--r-- | ext/flash/README.md | 7 | ||||
-rw-r--r-- | ext/flash/chunked.rs | 272 | ||||
-rw-r--r-- | ext/flash/lib.rs | 1567 | ||||
-rw-r--r-- | ext/flash/sendfile.rs | 81 | ||||
-rw-r--r-- | ext/http/01_http.js | 16 | ||||
-rw-r--r-- | ext/http/lib.rs | 39 | ||||
-rw-r--r-- | ext/web/06_streams.js | 1 | ||||
-rw-r--r-- | ext/websocket/lib.rs | 23 |
13 files changed, 2691 insertions, 26 deletions
diff --git a/ext/fetch/22_body.js b/ext/fetch/22_body.js index 10ddb7603..a51cdc184 100644 --- a/ext/fetch/22_body.js +++ b/ext/fetch/22_body.js @@ -388,7 +388,10 @@ let source = null; let length = null; let contentType = null; - if (ObjectPrototypeIsPrototypeOf(BlobPrototype, object)) { + if (typeof object === "string") { + source = object; + contentType = "text/plain;charset=UTF-8"; + } else if (ObjectPrototypeIsPrototypeOf(BlobPrototype, object)) { stream = object.stream(); source = object; length = object.size; @@ -424,24 +427,21 @@ // TODO(@satyarohith): not sure what primordial here. source = object.toString(); contentType = "application/x-www-form-urlencoded;charset=UTF-8"; - } else if (typeof object === "string") { - source = object; - contentType = "text/plain;charset=UTF-8"; } else if (ObjectPrototypeIsPrototypeOf(ReadableStreamPrototype, object)) { stream = object; if (object.locked || isReadableStreamDisturbed(object)) { throw new TypeError("ReadableStream is locked or disturbed"); } } - if (ObjectPrototypeIsPrototypeOf(Uint8ArrayPrototype, source)) { - stream = { body: source, consumed: false }; - length = source.byteLength; - } else if (typeof source === "string") { + if (typeof source === "string") { // WARNING: this deviates from spec (expects length to be set) // https://fetch.spec.whatwg.org/#bodyinit > 7. // no observable side-effect for users so far, but could change stream = { body: source, consumed: false }; length = null; // NOTE: string length != byte length + } else if (ObjectPrototypeIsPrototypeOf(Uint8ArrayPrototype, source)) { + stream = { body: source, consumed: false }; + length = source.byteLength; } const body = new InnerBody(stream); body.source = source; diff --git a/ext/fetch/23_request.js b/ext/fetch/23_request.js index 63fc6a26e..5221d5ca9 100644 --- a/ext/fetch/23_request.js +++ b/ext/fetch/23_request.js @@ -16,7 +16,7 @@ const { HTTP_TOKEN_CODE_POINT_RE, byteUpperCase } = window.__bootstrap.infra; const { URL } = window.__bootstrap.url; const { guardFromHeaders } = window.__bootstrap.headers; - const { mixinBody, extractBody } = window.__bootstrap.fetchBody; + const { mixinBody, extractBody, InnerBody } = window.__bootstrap.fetchBody; const { getLocationHref } = window.__bootstrap.location; const { extractMimeType } = window.__bootstrap.mimesniff; const { blobFromObjectUrl } = window.__bootstrap.file; @@ -48,6 +48,9 @@ const _signal = Symbol("signal"); const _mimeType = Symbol("mime type"); const _body = Symbol("body"); + const _flash = Symbol("flash"); + const _url = Symbol("url"); + const _method = Symbol("method"); /** * @param {(() => string)[]} urlList @@ -266,7 +269,11 @@ return extractMimeType(values); } get [_body]() { - return this[_request].body; + if (this[_flash]) { + return this[_flash].body; + } else { + return this[_request].body; + } } /** @@ -427,12 +434,31 @@ get method() { webidl.assertBranded(this, RequestPrototype); - return this[_request].method; + if (this[_method]) { + return this[_method]; + } + if (this[_flash]) { + this[_method] = this[_flash].methodCb(); + return this[_method]; + } else { + this[_method] = this[_request].method; + return this[_method]; + } } get url() { webidl.assertBranded(this, RequestPrototype); - return this[_request].url(); + if (this[_url]) { + return this[_url]; + } + + if (this[_flash]) { + this[_url] = this[_flash].urlCb(); + return this[_url]; + } else { + this[_url] = this[_request].url(); + return this[_url]; + } } get headers() { @@ -442,6 +468,9 @@ get redirect() { webidl.assertBranded(this, RequestPrototype); + if (this[_flash]) { + return this[_flash].redirectMode; + } return this[_request].redirectMode; } @@ -455,7 +484,12 @@ if (this[_body] && this[_body].unusable()) { throw new TypeError("Body is unusable."); } - const newReq = cloneInnerRequest(this[_request]); + let newReq; + if (this[_flash]) { + newReq = cloneInnerRequest(this[_flash]); + } else { + newReq = cloneInnerRequest(this[_request]); + } const newSignal = abortSignal.newSignal(); abortSignal.follow(newSignal, this[_signal]); return fromInnerRequest( @@ -549,10 +583,43 @@ return request; } + /** + * @param {number} serverId + * @param {number} streamRid + * @param {ReadableStream} body + * @param {() => string} methodCb + * @param {() => string} urlCb + * @param {() => [string, string][]} headersCb + * @returns {Request} + */ + function fromFlashRequest( + serverId, + streamRid, + body, + methodCb, + urlCb, + headersCb, + ) { + const request = webidl.createBranded(Request); + request[_flash] = { + body: body !== null ? new InnerBody(body) : null, + methodCb, + urlCb, + streamRid, + serverId, + redirectMode: "follow", + redirectCount: 0, + }; + request[_getHeaders] = () => headersFromHeaderList(headersCb(), "request"); + return request; + } + window.__bootstrap.fetch ??= {}; window.__bootstrap.fetch.Request = Request; window.__bootstrap.fetch.toInnerRequest = toInnerRequest; + window.__bootstrap.fetch.fromFlashRequest = fromFlashRequest; window.__bootstrap.fetch.fromInnerRequest = fromInnerRequest; window.__bootstrap.fetch.newInnerRequest = newInnerRequest; window.__bootstrap.fetch.processUrlList = processUrlList; + window.__bootstrap.fetch._flash = _flash; })(globalThis); diff --git a/ext/fetch/23_response.js b/ext/fetch/23_response.js index 226a751bd..3c19f963a 100644 --- a/ext/fetch/23_response.js +++ b/ext/fetch/23_response.js @@ -15,6 +15,9 @@ const { isProxy } = Deno.core; const webidl = window.__bootstrap.webidl; const consoleInternal = window.__bootstrap.console; + const { + byteLowerCase, + } = window.__bootstrap.infra; const { HTTP_TAB_OR_SPACE, regexMatcher, serializeJSValueToJSONString } = window.__bootstrap.infra; const { extractBody, mixinBody } = window.__bootstrap.fetchBody; @@ -185,7 +188,6 @@ // 4. response[_response].statusMessage = init.statusText; - // 5. /** @type {__bootstrap.headers.Headers} */ const headers = response[_headers]; @@ -200,10 +202,22 @@ "Response with null body status cannot have body", ); } + const { body, contentType } = bodyWithType; response[_response].body = body; - if (contentType !== null && !headers.has("content-type")) { - headers.append("Content-Type", contentType); + + if (contentType !== null) { + let hasContentType = false; + const list = headerListFromHeaders(headers); + for (let i = 0; i < list.length; i++) { + if (byteLowerCase(list[i][0]) === "content-type") { + hasContentType = true; + break; + } + } + if (!hasContentType) { + ArrayPrototypePush(list, ["Content-Type", contentType]); + } } } } diff --git a/ext/flash/01_http.js b/ext/flash/01_http.js new file mode 100644 index 000000000..fd817219e --- /dev/null +++ b/ext/flash/01_http.js @@ -0,0 +1,569 @@ +// Copyright 2018-2022 the Deno authors. All rights reserved. MIT license. +"use strict"; + +((window) => { + const { BlobPrototype } = window.__bootstrap.file; + const { fromFlashRequest, toInnerResponse } = window.__bootstrap.fetch; + const core = window.Deno.core; + const { + ReadableStream, + ReadableStreamPrototype, + getReadableStreamRid, + readableStreamClose, + _state, + } = window.__bootstrap.streams; + const { + WebSocket, + _rid, + _readyState, + _eventLoop, + _protocol, + _server, + _idleTimeoutDuration, + _idleTimeoutTimeout, + _serverHandleIdleTimeout, + } = window.__bootstrap.webSocket; + const { _ws } = window.__bootstrap.http; + const { + ObjectPrototypeIsPrototypeOf, + TypedArrayPrototypeSubarray, + TypeError, + Uint8Array, + Uint8ArrayPrototype, + } = window.__bootstrap.primordials; + + const statusCodes = { + 100: "Continue", + 101: "Switching Protocols", + 102: "Processing", + 200: "OK", + 201: "Created", + 202: "Accepted", + 203: "Non Authoritative Information", + 204: "No Content", + 205: "Reset Content", + 206: "Partial Content", + 207: "Multi-Status", + 208: "Already Reported", + 226: "IM Used", + 300: "Multiple Choices", + 301: "Moved Permanently", + 302: "Found", + 303: "See Other", + 304: "Not Modified", + 305: "Use Proxy", + 307: "Temporary Redirect", + 308: "Permanent Redirect", + 400: "Bad Request", + 401: "Unauthorized", + 402: "Payment Required", + 403: "Forbidden", + 404: "Not Found", + 405: "Method Not Allowed", + 406: "Not Acceptable", + 407: "Proxy Authentication Required", + 408: "Request Timeout", + 409: "Conflict", + 410: "Gone", + 411: "Length Required", + 412: "Precondition Failed", + 413: "Payload Too Large", + 414: "URI Too Long", + 415: "Unsupported Media Type", + 416: "Range Not Satisfiable", + 418: "I'm a teapot", + 421: "Misdirected Request", + 422: "Unprocessable Entity", + 423: "Locked", + 424: "Failed Dependency", + 426: "Upgrade Required", + 428: "Precondition Required", + 429: "Too Many Requests", + 431: "Request Header Fields Too Large", + 451: "Unavailable For Legal Reasons", + 500: "Internal Server Error", + 501: "Not Implemented", + 502: "Bad Gateway", + 503: "Service Unavailable", + 504: "Gateway Timeout", + 505: "HTTP Version Not Supported", + 506: "Variant Also Negotiates", + 507: "Insufficient Storage", + 508: "Loop Detected", + 510: "Not Extended", + 511: "Network Authentication Required", + }; + + const methods = { + 0: "GET", + 1: "HEAD", + 2: "CONNECT", + 3: "PUT", + 4: "DELETE", + 5: "OPTIONS", + 6: "TRACE", + 7: "POST", + 8: "PATCH", + }; + + let dateInterval; + let date; + let stringResources = {}; + + // Construct an HTTP response message. + // All HTTP/1.1 messages consist of a start-line followed by a sequence + // of octets. + // + // HTTP-message = start-line + // *( header-field CRLF ) + // CRLF + // [ message-body ] + // + function http1Response(method, status, headerList, body, earlyEnd = false) { + // HTTP uses a "<major>.<minor>" numbering scheme + // HTTP-version = HTTP-name "/" DIGIT "." DIGIT + // HTTP-name = %x48.54.54.50 ; "HTTP", case-sensitive + // + // status-line = HTTP-version SP status-code SP reason-phrase CRLF + // Date header: https://datatracker.ietf.org/doc/html/rfc7231#section-7.1.1.2 + let str = `HTTP/1.1 ${status} ${statusCodes[status]}\r\nDate: ${date}\r\n`; + for (const [name, value] of headerList) { + // header-field = field-name ":" OWS field-value OWS + str += `${name}: ${value}\r\n`; + } + + // https://datatracker.ietf.org/doc/html/rfc7231#section-6.3.6 + if (status === 205 || status === 304) { + // MUST NOT generate a payload in a 205 response. + // indicate a zero-length body for the response by + // including a Content-Length header field with a value of 0. + str += "Content-Length: 0\r\n"; + return str; + } + + // MUST NOT send Content-Length or Transfer-Encoding if status code is 1xx or 204. + if (status == 204 && status <= 100) { + return str; + } + + if (earlyEnd === true) { + return str; + } + + // null body status is validated by inititalizeAResponse in ext/fetch + if (body !== null && body !== undefined) { + str += `Content-Length: ${body.length}\r\n\r\n`; + } else { + str += "Transfer-Encoding: chunked\r\n\r\n"; + return str; + } + + // A HEAD request. + if (method === 1) return str; + + if (typeof body === "string") { + str += body ?? ""; + } else { + const head = core.encode(str); + const response = new Uint8Array(head.byteLength + body.byteLength); + response.set(head, 0); + response.set(body, head.byteLength); + return response; + } + + return str; + } + + function prepareFastCalls() { + return core.opSync("op_flash_make_request"); + } + + function hostnameForDisplay(hostname) { + // If the hostname is "0.0.0.0", we display "localhost" in console + // because browsers in Windows don't resolve "0.0.0.0". + // See the discussion in https://github.com/denoland/deno_std/issues/1165 + return hostname === "0.0.0.0" ? "localhost" : hostname; + } + + function serve(handler, opts = {}) { + delete opts.key; + delete opts.cert; + return serveInner(handler, opts, false); + } + + function serveTls(handler, opts = {}) { + return serveInner(handler, opts, true); + } + + function serveInner(handler, opts, useTls) { + opts = { hostname: "127.0.0.1", port: 9000, useTls, ...opts }; + const signal = opts.signal; + delete opts.signal; + const onError = opts.onError ?? function (error) { + console.error(error); + return new Response("Internal Server Error", { status: 500 }); + }; + delete opts.onError; + const onListen = opts.onListen ?? function () { + console.log( + `Listening on http://${ + hostnameForDisplay(opts.hostname) + }:${opts.port}/`, + ); + }; + delete opts.onListen; + const serverId = core.ops.op_flash_serve(opts); + const serverPromise = core.opAsync("op_flash_drive_server", serverId); + + core.opAsync("op_flash_wait_for_listening", serverId).then(() => { + onListen({ hostname: opts.hostname, port: opts.port }); + }); + + const server = { + id: serverId, + transport: opts.cert && opts.key ? "https" : "http", + hostname: opts.hostname, + port: opts.port, + closed: false, + finished: (async () => { + return await serverPromise; + })(), + async close() { + if (server.closed) { + return; + } + server.closed = true; + await core.opAsync("op_flash_close_server", serverId); + await server.finished; + }, + async serve() { + while (true) { + if (server.closed) { + break; + } + + let token = nextRequestSync(); + if (token === 0) { + token = await core.opAsync("op_flash_next_async", serverId); + if (server.closed) { + break; + } + } + + for (let i = 0; i < token; i++) { + let body = null; + // There might be a body, but we don't expose it for GET/HEAD requests. + // It will be closed automatically once the request has been handled and + // the response has been sent. + const method = getMethodSync(i); + let hasBody = method > 2; // Not GET/HEAD/CONNECT + if (hasBody) { + body = createRequestBodyStream(serverId, i); + if (body === null) { + hasBody = false; + } + } + + const req = fromFlashRequest( + serverId, + /* streamRid */ + i, + body, + /* methodCb */ + () => methods[method], + /* urlCb */ + () => { + const path = core.ops.op_flash_path(serverId, i); + return `${server.transport}://${server.hostname}:${server.port}${path}`; + }, + /* headersCb */ + () => core.ops.op_flash_headers(serverId, i), + ); + + let resp; + try { + resp = await handler(req); + } catch (e) { + resp = await onError(e); + } + // there might've been an HTTP upgrade. + if (resp === undefined) { + continue; + } + + const ws = resp[_ws]; + if (!ws) { + if (hasBody && body[_state] !== "closed") { + // TODO(@littledivy): Optimize by draining in a single op. + try { + await req.arrayBuffer(); + } catch { /* pass */ } + } + } + + const innerResp = toInnerResponse(resp); + + // If response body length is known, it will be sent synchronously in a + // single op, in other case a "response body" resource will be created and + // we'll be streaming it. + /** @type {ReadableStream<Uint8Array> | Uint8Array | null} */ + let respBody = null; + let isStreamingResponseBody = false; + if (innerResp.body !== null) { + if (typeof innerResp.body.streamOrStatic?.body === "string") { + if (innerResp.body.streamOrStatic.consumed === true) { + throw new TypeError("Body is unusable."); + } + innerResp.body.streamOrStatic.consumed = true; + respBody = innerResp.body.streamOrStatic.body; + isStreamingResponseBody = false; + } else if ( + ObjectPrototypeIsPrototypeOf( + ReadableStreamPrototype, + innerResp.body.streamOrStatic, + ) + ) { + if (innerResp.body.unusable()) { + throw new TypeError("Body is unusable."); + } + if ( + innerResp.body.length === null || + ObjectPrototypeIsPrototypeOf( + BlobPrototype, + innerResp.body.source, + ) + ) { + respBody = innerResp.body.stream; + } else { + const reader = innerResp.body.stream.getReader(); + const r1 = await reader.read(); + if (r1.done) { + respBody = new Uint8Array(0); + } else { + respBody = r1.value; + const r2 = await reader.read(); + if (!r2.done) throw new TypeError("Unreachable"); + } + } + isStreamingResponseBody = !( + typeof respBody === "string" || + ObjectPrototypeIsPrototypeOf(Uint8ArrayPrototype, respBody) + ); + } else { + if (innerResp.body.streamOrStatic.consumed === true) { + throw new TypeError("Body is unusable."); + } + innerResp.body.streamOrStatic.consumed = true; + respBody = innerResp.body.streamOrStatic.body; + } + } else { + respBody = new Uint8Array(0); + } + + if (isStreamingResponseBody === true) { + const resourceRid = getReadableStreamRid(respBody); + if (resourceRid) { + if (respBody.locked) { + throw new TypeError("ReadableStream is locked."); + } + const reader = respBody.getReader(); // Aquire JS lock. + try { + core.opAsync( + "op_flash_write_resource", + http1Response( + method, + innerResp.status ?? 200, + innerResp.headerList, + null, + true, + ), + serverId, + i, + resourceRid, + ).then(() => { + // Release JS lock. + readableStreamClose(respBody); + }); + } catch (error) { + await reader.cancel(error); + throw error; + } + } else { + const reader = respBody.getReader(); + let first = true; + a: + while (true) { + const { value, done } = await reader.read(); + if (first) { + first = false; + core.ops.op_flash_respond( + serverId, + i, + http1Response( + method, + innerResp.status ?? 200, + innerResp.headerList, + null, + ), + value ?? new Uint8Array(), + false, + ); + } else { + if (value === undefined) { + core.ops.op_flash_respond_chuncked( + serverId, + i, + undefined, + done, + ); + } else { + respondChunked( + i, + value, + done, + ); + } + } + if (done) break a; + } + } + } else { + const responseStr = http1Response( + method, + innerResp.status ?? 200, + innerResp.headerList, + respBody, + ); + + // TypedArray + if (typeof responseStr !== "string") { + respondFast(i, responseStr, !ws); + } else { + // string + const maybeResponse = stringResources[responseStr]; + if (maybeResponse === undefined) { + stringResources[responseStr] = core.encode(responseStr); + core.ops.op_flash_respond( + serverId, + i, + stringResources[responseStr], + null, + !ws, // Don't close socket if there is a deferred websocket upgrade. + ); + } else { + respondFast(i, maybeResponse, !ws); + } + } + } + + if (ws) { + const wsRid = await core.opAsync( + "op_flash_upgrade_websocket", + serverId, + i, + ); + ws[_rid] = wsRid; + ws[_protocol] = resp.headers.get("sec-websocket-protocol"); + + ws[_readyState] = WebSocket.OPEN; + const event = new Event("open"); + ws.dispatchEvent(event); + + ws[_eventLoop](); + if (ws[_idleTimeoutDuration]) { + ws.addEventListener( + "close", + () => clearTimeout(ws[_idleTimeoutTimeout]), + ); + } + ws[_serverHandleIdleTimeout](); + } + } + } + await server.finished; + }, + }; + + signal?.addEventListener("abort", () => { + clearInterval(dateInterval); + server.close().then(() => {}, () => {}); + }, { + once: true, + }); + + const fastOp = prepareFastCalls(); + let nextRequestSync = () => fastOp.nextRequest(); + let getMethodSync = (token) => fastOp.getMethod(token); + let respondChunked = (token, chunk, shutdown) => + fastOp.respondChunked(token, chunk, shutdown); + let respondFast = (token, response, shutdown) => + fastOp.respond(token, response, shutdown); + if (serverId > 0) { + nextRequestSync = () => core.ops.op_flash_next_server(serverId); + getMethodSync = (token) => core.ops.op_flash_method(serverId, token); + respondChunked = (token, chunk, shutdown) => + core.ops.op_flash_respond_chuncked(serverId, token, chunk, shutdown); + respondFast = (token, response, shutdown) => + core.ops.op_flash_respond(serverId, token, response, null, shutdown); + } + + if (!dateInterval) { + date = new Date().toUTCString(); + dateInterval = setInterval(() => { + date = new Date().toUTCString(); + stringResources = {}; + }, 1000); + } + + return server.serve().catch(console.error); + } + + function createRequestBodyStream(serverId, token) { + // The first packet is left over bytes after parsing the request + const firstRead = core.ops.op_flash_first_packet( + serverId, + token, + ); + if (!firstRead) return null; + let firstEnqueued = firstRead.byteLength == 0; + + return new ReadableStream({ + type: "bytes", + async pull(controller) { + try { + if (firstEnqueued === false) { + controller.enqueue(firstRead); + firstEnqueued = true; + return; + } + // This is the largest possible size for a single packet on a TLS + // stream. + const chunk = new Uint8Array(16 * 1024 + 256); + const read = await core.opAsync( + "op_flash_read_body", + serverId, + token, + chunk, + ); + if (read > 0) { + // We read some data. Enqueue it onto the stream. + controller.enqueue(TypedArrayPrototypeSubarray(chunk, 0, read)); + } else { + // We have reached the end of the body, so we close the stream. + controller.close(); + } + } catch (err) { + // There was an error while reading a chunk of the body, so we + // error. + controller.error(err); + controller.close(); + } + }, + }); + } + + window.__bootstrap.flash = { + serve, + serveTls, + }; +})(this); diff --git a/ext/flash/Cargo.toml b/ext/flash/Cargo.toml new file mode 100644 index 000000000..f61bc025d --- /dev/null +++ b/ext/flash/Cargo.toml @@ -0,0 +1,29 @@ +# Copyright 2018-2022 the Deno authors. All rights reserved. MIT license. + +[package] +name = "deno_flash" +version = "0.1.0" +authors = ["the Deno authors"] +edition = "2021" +license = "MIT" +readme = "README.md" +repository = "https://github.com/denoland/deno" +description = "Fast HTTP/1 server implementation for Deno" + +[lib] +path = "lib.rs" + +[dependencies] +deno_core = { path = "../../core", version = "0.147.0" } +deno_tls = { version = "0.52.0", path = "../tls" } +# For HTTP/2 and websocket upgrades +deno_websocket = { version = "0.70.0", path = "../websocket" } +http = "0.2.6" +httparse = "1.7" +libc = "0.2" +log = "0.4.17" +mio = { version = "0.8.1", features = ["os-poll", "net"] } +rustls = { version = "0.20" } +rustls-pemfile = { version = "0.2.1" } +serde = { version = "1.0.136", features = ["derive"] } +tokio = { version = "1.19", features = ["full"] } diff --git a/ext/flash/README.md b/ext/flash/README.md new file mode 100644 index 000000000..465c60d47 --- /dev/null +++ b/ext/flash/README.md @@ -0,0 +1,7 @@ +# flash + +Flash is a fast HTTP/1.1 server implementation for Deno. + +```js +serve((req) => new Response("Hello World")); +``` diff --git a/ext/flash/chunked.rs b/ext/flash/chunked.rs new file mode 100644 index 000000000..86417807d --- /dev/null +++ b/ext/flash/chunked.rs @@ -0,0 +1,272 @@ +// Based on https://github.com/frewsxcv/rust-chunked-transfer/blob/5c08614458580f9e7a85124021006d83ce1ed6e9/src/decoder.rs +// Copyright 2015 The tiny-http Contributors +// Copyright 2015 The rust-chunked-transfer Contributors +// Copyright 2018-2022 the Deno authors. All rights reserved. MIT license. + +use std::error::Error; +use std::fmt; +use std::io::Error as IoError; +use std::io::ErrorKind; +use std::io::Read; +use std::io::Result as IoResult; + +pub struct Decoder<R> { + pub source: R, + + // remaining size of the chunk being read + // none if we are not in a chunk + pub remaining_chunks_size: Option<usize>, + pub end: bool, +} + +impl<R> Decoder<R> +where + R: Read, +{ + pub fn new(source: R, remaining_chunks_size: Option<usize>) -> Decoder<R> { + Decoder { + source, + remaining_chunks_size, + end: false, + } + } + + fn read_chunk_size(&mut self) -> IoResult<usize> { + let mut chunk_size_bytes = Vec::new(); + let mut has_ext = false; + + loop { + let byte = match self.source.by_ref().bytes().next() { + Some(b) => b?, + None => { + return Err(IoError::new(ErrorKind::InvalidInput, DecoderError)) + } + }; + + if byte == b'\r' { + break; + } + + if byte == b';' { + has_ext = true; + break; + } + + chunk_size_bytes.push(byte); + } + + // Ignore extensions for now + if has_ext { + loop { + let byte = match self.source.by_ref().bytes().next() { + Some(b) => b?, + None => { + return Err(IoError::new(ErrorKind::InvalidInput, DecoderError)) + } + }; + if byte == b'\r' { + break; + } + } + } + + self.read_line_feed()?; + + let chunk_size = String::from_utf8(chunk_size_bytes) + .ok() + .and_then(|c| usize::from_str_radix(c.trim(), 16).ok()) + .ok_or_else(|| IoError::new(ErrorKind::InvalidInput, DecoderError))?; + + Ok(chunk_size) + } + + fn read_carriage_return(&mut self) -> IoResult<()> { + match self.source.by_ref().bytes().next() { + Some(Ok(b'\r')) => Ok(()), + _ => Err(IoError::new(ErrorKind::InvalidInput, DecoderError)), + } + } + + fn read_line_feed(&mut self) -> IoResult<()> { + match self.source.by_ref().bytes().next() { + Some(Ok(b'\n')) => Ok(()), + _ => Err(IoError::new(ErrorKind::InvalidInput, DecoderError)), + } + } +} + +impl<R> Read for Decoder<R> +where + R: Read, +{ + fn read(&mut self, buf: &mut [u8]) -> IoResult<usize> { + let remaining_chunks_size = match self.remaining_chunks_size { + Some(c) => c, + None => { + // first possibility: we are not in a chunk, so we'll attempt to determine + // the chunks size + let chunk_size = self.read_chunk_size()?; + + // if the chunk size is 0, we are at EOF + if chunk_size == 0 { + self.read_carriage_return()?; + self.read_line_feed()?; + self.end = true; + return Ok(0); + } + + chunk_size + } + }; + + // second possibility: we continue reading from a chunk + if buf.len() < remaining_chunks_size { + let read = self.source.read(buf)?; + self.remaining_chunks_size = Some(remaining_chunks_size - read); + return Ok(read); + } + + // third possibility: the read request goes further than the current chunk + // we simply read until the end of the chunk and return + let buf = &mut buf[..remaining_chunks_size]; + let read = self.source.read(buf)?; + self.remaining_chunks_size = if read == remaining_chunks_size { + self.read_carriage_return()?; + self.read_line_feed()?; + None + } else { + Some(remaining_chunks_size - read) + }; + + Ok(read) + } +} + +#[derive(Debug, Copy, Clone)] +struct DecoderError; + +impl fmt::Display for DecoderError { + fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> { + write!(fmt, "Error while decoding chunks") + } +} + +impl Error for DecoderError { + fn description(&self) -> &str { + "Error while decoding chunks" + } +} + +#[cfg(test)] +mod test { + use super::Decoder; + use std::io; + use std::io::Read; + + /// This unit test is taken from from Hyper + /// https://github.com/hyperium/hyper + /// Copyright (c) 2014 Sean McArthur + #[test] + fn test_read_chunk_size() { + fn read(s: &str, expected: usize) { + let mut decoded = Decoder::new(s.as_bytes(), None); + let actual = decoded.read_chunk_size().unwrap(); + assert_eq!(expected, actual); + } + + fn read_err(s: &str) { + let mut decoded = Decoder::new(s.as_bytes(), None); + let err_kind = decoded.read_chunk_size().unwrap_err().kind(); + assert_eq!(err_kind, io::ErrorKind::InvalidInput); + } + + read("1\r\n", 1); + read("01\r\n", 1); + read("0\r\n", 0); + read("00\r\n", 0); + read("A\r\n", 10); + read("a\r\n", 10); + read("Ff\r\n", 255); + read("Ff \r\n", 255); + // Missing LF or CRLF + read_err("F\rF"); + read_err("F"); + // Invalid hex digit + read_err("X\r\n"); + read_err("1X\r\n"); + read_err("-\r\n"); + read_err("-1\r\n"); + // Acceptable (if not fully valid) extensions do not influence the size + read("1;extension\r\n", 1); + read("a;ext name=value\r\n", 10); + read("1;extension;extension2\r\n", 1); + read("1;;; ;\r\n", 1); + read("2; extension...\r\n", 2); + read("3 ; extension=123\r\n", 3); + read("3 ;\r\n", 3); + read("3 ; \r\n", 3); + // Invalid extensions cause an error + read_err("1 invalid extension\r\n"); + read_err("1 A\r\n"); + read_err("1;no CRLF"); + } + + #[test] + fn test_valid_chunk_decode() { + let source = io::Cursor::new( + "3\r\nhel\r\nb\r\nlo world!!!\r\n0\r\n\r\n" + .to_string() + .into_bytes(), + ); + let mut decoded = Decoder::new(source, None); + + let mut string = String::new(); + decoded.read_to_string(&mut string).unwrap(); + + assert_eq!(string, "hello world!!!"); + } + + #[test] + fn test_decode_zero_length() { + let mut decoder = Decoder::new(b"0\r\n\r\n" as &[u8], None); + + let mut decoded = String::new(); + decoder.read_to_string(&mut decoded).unwrap(); + + assert_eq!(decoded, ""); + } + + #[test] + fn test_decode_invalid_chunk_length() { + let mut decoder = Decoder::new(b"m\r\n\r\n" as &[u8], None); + + let mut decoded = String::new(); + assert!(decoder.read_to_string(&mut decoded).is_err()); + } + + #[test] + fn invalid_input1() { + let source = io::Cursor::new( + "2\r\nhel\r\nb\r\nlo world!!!\r\n0\r\n" + .to_string() + .into_bytes(), + ); + let mut decoded = Decoder::new(source, None); + + let mut string = String::new(); + assert!(decoded.read_to_string(&mut string).is_err()); + } + + #[test] + fn invalid_input2() { + let source = io::Cursor::new( + "3\rhel\r\nb\r\nlo world!!!\r\n0\r\n" + .to_string() + .into_bytes(), + ); + let mut decoded = Decoder::new(source, None); + + let mut string = String::new(); + assert!(decoded.read_to_string(&mut string).is_err()); + } +} diff --git a/ext/flash/lib.rs b/ext/flash/lib.rs new file mode 100644 index 000000000..2c0cc548a --- /dev/null +++ b/ext/flash/lib.rs @@ -0,0 +1,1567 @@ +// Copyright 2018-2022 the Deno authors. All rights reserved. MIT license. + +// False positive lint for explicit drops. +// https://github.com/rust-lang/rust-clippy/issues/6446 +#![allow(clippy::await_holding_lock)] + +use deno_core::error::type_error; +use deno_core::error::AnyError; +use deno_core::op; +use deno_core::serde_v8; +use deno_core::v8; +use deno_core::v8::fast_api; +use deno_core::ByteString; +use deno_core::CancelFuture; +use deno_core::CancelHandle; +use deno_core::Extension; +use deno_core::OpState; +use deno_core::StringOrBuffer; +use deno_core::ZeroCopyBuf; +use deno_core::V8_WRAPPER_OBJECT_INDEX; +use deno_tls::load_certs; +use deno_tls::load_private_keys; +use http::header::HeaderName; +use http::header::CONNECTION; +use http::header::CONTENT_LENGTH; +use http::header::EXPECT; +use http::header::TRANSFER_ENCODING; +use http::header::UPGRADE; +use http::HeaderValue; +use log::trace; +use mio::net::TcpListener; +use mio::net::TcpStream; +use mio::Events; +use mio::Interest; +use mio::Poll; +use mio::Token; +use serde::Deserialize; +use serde::Serialize; +use std::cell::RefCell; +use std::cell::UnsafeCell; +use std::collections::HashMap; +use std::ffi::c_void; +use std::future::Future; +use std::intrinsics::transmute; +use std::io::BufReader; +use std::io::Read; +use std::io::Write; +use std::marker::PhantomPinned; +use std::mem::replace; +use std::net::SocketAddr; +use std::pin::Pin; +use std::rc::Rc; +use std::sync::Arc; +use std::sync::Mutex; +use std::task::Context; +use std::time::Duration; +use tokio::sync::mpsc; +use tokio::task::JoinHandle; + +mod chunked; + +#[cfg(unix)] +mod sendfile; + +pub struct FlashContext { + next_server_id: u32, + join_handles: HashMap<u32, JoinHandle<Result<(), AnyError>>>, + pub servers: HashMap<u32, ServerContext>, +} + +pub struct ServerContext { + _addr: SocketAddr, + tx: mpsc::Sender<NextRequest>, + rx: mpsc::Receiver<NextRequest>, + response: HashMap<u32, NextRequest>, + listening_rx: Option<mpsc::Receiver<()>>, + close_tx: mpsc::Sender<()>, + cancel_handle: Rc<CancelHandle>, +} + +struct InnerRequest { + _headers: Vec<httparse::Header<'static>>, + req: httparse::Request<'static, 'static>, + body_offset: usize, + body_len: usize, + buffer: Pin<Box<[u8]>>, +} + +#[derive(Debug, PartialEq)] +enum ParseStatus { + None, + Ongoing(usize), +} + +type TlsTcpStream = rustls::StreamOwned<rustls::ServerConnection, TcpStream>; + +enum InnerStream { + Tcp(TcpStream), + Tls(Box<TlsTcpStream>), +} + +struct Stream { + inner: InnerStream, + detached: bool, + read_rx: Option<mpsc::Receiver<()>>, + read_tx: Option<mpsc::Sender<()>>, + parse_done: ParseStatus, + buffer: UnsafeCell<Vec<u8>>, + read_lock: Arc<Mutex<()>>, + _pin: PhantomPinned, +} + +impl Stream { + pub fn detach_ownership(&mut self) { + self.detached = true; + } + + fn reattach_ownership(&mut self) { + self.detached = false; + } +} + +impl Write for Stream { + #[inline] + fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> { + match self.inner { + InnerStream::Tcp(ref mut stream) => stream.write(buf), + InnerStream::Tls(ref mut stream) => stream.write(buf), + } + } + #[inline] + fn flush(&mut self) -> std::io::Result<()> { + match self.inner { + InnerStream::Tcp(ref mut stream) => stream.flush(), + InnerStream::Tls(ref mut stream) => stream.flush(), + } + } +} + +impl Read for Stream { + #[inline] + fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> { + match self.inner { + InnerStream::Tcp(ref mut stream) => stream.read(buf), + InnerStream::Tls(ref mut stream) => stream.read(buf), + } + } +} + +struct NextRequest { + // Pointer to stream owned by the server loop thread. + // + // Why not Arc<Mutex<Stream>>? Performance. The stream + // is never written to by the server loop thread. + // + // Dereferencing is safe until server thread finishes and + // op_flash_serve resolves or websocket upgrade is performed. + socket: *mut Stream, + inner: InnerRequest, + keep_alive: bool, + #[allow(dead_code)] + upgrade: bool, + content_read: usize, + content_length: Option<u64>, + remaining_chunk_size: Option<usize>, + te_chunked: bool, + expect_continue: bool, +} + +// SAFETY: Sent from server thread to JS thread. +// See comment above for `socket`. +unsafe impl Send for NextRequest {} + +impl NextRequest { + #[inline(always)] + pub fn socket<'a>(&self) -> &'a mut Stream { + // SAFETY: Dereferencing is safe until server thread detaches socket or finishes. + unsafe { &mut *self.socket } + } +} + +#[op] +fn op_flash_respond( + op_state: &mut OpState, + server_id: u32, + token: u32, + response: StringOrBuffer, + maybe_body: Option<ZeroCopyBuf>, + shutdown: bool, +) { + let flash_ctx = op_state.borrow_mut::<FlashContext>(); + let ctx = flash_ctx.servers.get_mut(&server_id).unwrap(); + + let mut close = false; + let sock = match shutdown { + true => { + let tx = ctx.response.remove(&token).unwrap(); + close = !tx.keep_alive; + tx.socket() + } + // In case of a websocket upgrade or streaming response. + false => { + let tx = ctx.response.get(&token).unwrap(); + tx.socket() + } + }; + + sock.read_tx.take(); + sock.read_rx.take(); + + let _ = sock.write(&response); + if let Some(response) = maybe_body { + let _ = sock.write(format!("{:x}", response.len()).as_bytes()); + let _ = sock.write(b"\r\n"); + let _ = sock.write(&response); + let _ = sock.write(b"\r\n"); + } + + // server is done writing and request doesn't want to kept alive. + if shutdown && close { + match &mut sock.inner { + InnerStream::Tcp(stream) => { + // Typically shutdown shouldn't fail. + let _ = stream.shutdown(std::net::Shutdown::Both); + } + InnerStream::Tls(stream) => { + let _ = stream.sock.shutdown(std::net::Shutdown::Both); + } + } + } +} + +#[op] +fn op_flash_respond_chuncked( + op_state: &mut OpState, + server_id: u32, + token: u32, + response: Option<ZeroCopyBuf>, + shutdown: bool, +) { + let flash_ctx = op_state.borrow_mut::<FlashContext>(); + let ctx = flash_ctx.servers.get_mut(&server_id).unwrap(); + match response { + Some(response) => { + respond_chunked(ctx, token, shutdown, Some(&response)); + } + None => { + respond_chunked(ctx, token, shutdown, None); + } + } +} + +#[op] +async fn op_flash_write_resource( + op_state: Rc<RefCell<OpState>>, + response: StringOrBuffer, + server_id: u32, + token: u32, + resource_id: deno_core::ResourceId, +) -> Result<(), AnyError> { + let resource = op_state.borrow_mut().resource_table.take_any(resource_id)?; + let sock = { + let op_state = &mut op_state.borrow_mut(); + let flash_ctx = op_state.borrow_mut::<FlashContext>(); + let ctx = flash_ctx.servers.get_mut(&server_id).unwrap(); + ctx.response.remove(&token).unwrap().socket() + }; + + drop(op_state); + let _ = sock.write(&response); + + #[cfg(unix)] + { + use std::os::unix::io::AsRawFd; + if let InnerStream::Tcp(stream_handle) = &sock.inner { + let stream_handle = stream_handle.as_raw_fd(); + if let Some(fd) = resource.clone().backing_fd() { + // SAFETY: all-zero byte-pattern is a valid value for libc::stat. + let mut stat: libc::stat = unsafe { std::mem::zeroed() }; + // SAFETY: call to libc::fstat. + if unsafe { libc::fstat(fd, &mut stat) } >= 0 { + let _ = sock.write( + format!("Content-Length: {}\r\n\r\n", stat.st_size).as_bytes(), + ); + let tx = sendfile::SendFile { + io: (fd, stream_handle), + written: 0, + }; + tx.await?; + return Ok(()); + } + } + } + } + + let _ = sock.write(b"Transfer-Encoding: chunked\r\n\r\n"); + loop { + let vec = vec![0u8; 64 * 1024]; // 64KB + let buf = ZeroCopyBuf::new_temp(vec); + let (nread, buf) = resource.clone().read_return(buf).await?; + if nread == 0 { + let _ = sock.write(b"0\r\n\r\n"); + break; + } + let response = &buf[..nread]; + + let _ = sock.write(format!("{:x}", response.len()).as_bytes()); + let _ = sock.write(b"\r\n"); + let _ = sock.write(response); + let _ = sock.write(b"\r\n"); + } + + resource.close(); + Ok(()) +} + +pub struct RespondFast; + +impl fast_api::FastFunction for RespondFast { + fn function(&self) -> *const c_void { + op_flash_respond_fast as *const c_void + } + + fn args(&self) -> &'static [fast_api::Type] { + &[ + fast_api::Type::V8Value, + fast_api::Type::Uint32, + fast_api::Type::TypedArray(fast_api::CType::Uint8), + fast_api::Type::Bool, + ] + } + + fn return_type(&self) -> fast_api::CType { + fast_api::CType::Void + } +} + +fn flash_respond( + ctx: &mut ServerContext, + token: u32, + shutdown: bool, + response: &[u8], +) { + let mut close = false; + let sock = match shutdown { + true => { + let tx = ctx.response.remove(&token).unwrap(); + close = !tx.keep_alive; + tx.socket() + } + // In case of a websocket upgrade or streaming response. + false => { + let tx = ctx.response.get(&token).unwrap(); + tx.socket() + } + }; + + sock.read_tx.take(); + sock.read_rx.take(); + + let _ = sock.write(response); + // server is done writing and request doesn't want to kept alive. + if shutdown && close { + match &mut sock.inner { + InnerStream::Tcp(stream) => { + // Typically shutdown shouldn't fail. + let _ = stream.shutdown(std::net::Shutdown::Both); + } + InnerStream::Tls(stream) => { + let _ = stream.sock.shutdown(std::net::Shutdown::Both); + } + } + } +} + +unsafe fn op_flash_respond_fast( + recv: v8::Local<v8::Object>, + token: u32, + response: *const fast_api::FastApiTypedArray<u8>, + shutdown: bool, +) { + let ptr = + recv.get_aligned_pointer_from_internal_field(V8_WRAPPER_OBJECT_INDEX); + let ctx = &mut *(ptr as *mut ServerContext); + + let response = &*response; + if let Some(response) = response.get_storage_if_aligned() { + flash_respond(ctx, token, shutdown, response); + } else { + todo!(); + } +} + +pub struct RespondChunkedFast; + +impl fast_api::FastFunction for RespondChunkedFast { + fn function(&self) -> *const c_void { + op_flash_respond_chunked_fast as *const c_void + } + + fn args(&self) -> &'static [fast_api::Type] { + &[ + fast_api::Type::V8Value, + fast_api::Type::Uint32, + fast_api::Type::TypedArray(fast_api::CType::Uint8), + fast_api::Type::Bool, + ] + } + + fn return_type(&self) -> fast_api::CType { + fast_api::CType::Void + } +} + +unsafe fn op_flash_respond_chunked_fast( + recv: v8::Local<v8::Object>, + token: u32, + response: *const fast_api::FastApiTypedArray<u8>, + shutdown: bool, +) { + let ptr = + recv.get_aligned_pointer_from_internal_field(V8_WRAPPER_OBJECT_INDEX); + let ctx = &mut *(ptr as *mut ServerContext); + + let response = &*response; + if let Some(response) = response.get_storage_if_aligned() { + respond_chunked(ctx, token, shutdown, Some(response)); + } else { + todo!(); + } +} + +fn respond_chunked( + ctx: &mut ServerContext, + token: u32, + shutdown: bool, + response: Option<&[u8]>, +) { + let sock = match shutdown { + true => { + let tx = ctx.response.remove(&token).unwrap(); + tx.socket() + } + // In case of a websocket upgrade or streaming response. + false => { + let tx = ctx.response.get(&token).unwrap(); + tx.socket() + } + }; + + if let Some(response) = response { + let _ = sock.write(format!("{:x}", response.len()).as_bytes()); + let _ = sock.write(b"\r\n"); + let _ = sock.write(response); + let _ = sock.write(b"\r\n"); + } + + // The last chunk + if shutdown { + let _ = sock.write(b"0\r\n\r\n"); + } + sock.reattach_ownership(); +} + +macro_rules! get_request { + ($op_state: ident, $token: ident) => { + get_request!($op_state, 0, $token) + }; + ($op_state: ident, $server_id: expr, $token: ident) => {{ + let flash_ctx = $op_state.borrow_mut::<FlashContext>(); + let ctx = flash_ctx.servers.get_mut(&$server_id).unwrap(); + ctx.response.get_mut(&$token).unwrap() + }}; +} + +#[repr(u32)] +pub enum Method { + GET = 0, + HEAD, + CONNECT, + PUT, + DELETE, + OPTIONS, + TRACE, + POST, + PATCH, +} + +#[inline] +fn get_method(req: &mut NextRequest) -> u32 { + let method = match req.inner.req.method.unwrap() { + "GET" => Method::GET, + "POST" => Method::POST, + "PUT" => Method::PUT, + "DELETE" => Method::DELETE, + "OPTIONS" => Method::OPTIONS, + "HEAD" => Method::HEAD, + "PATCH" => Method::PATCH, + "TRACE" => Method::TRACE, + "CONNECT" => Method::CONNECT, + _ => Method::GET, + }; + method as u32 +} + +#[op] +fn op_flash_method(state: &mut OpState, server_id: u32, token: u32) -> u32 { + let req = get_request!(state, server_id, token); + get_method(req) +} + +#[op] +async fn op_flash_close_server(state: Rc<RefCell<OpState>>, server_id: u32) { + let close_tx = { + let mut op_state = state.borrow_mut(); + let flash_ctx = op_state.borrow_mut::<FlashContext>(); + let ctx = flash_ctx.servers.get_mut(&server_id).unwrap(); + ctx.cancel_handle.cancel(); + ctx.close_tx.clone() + }; + let _ = close_tx.send(()).await; +} + +#[op] +fn op_flash_path( + state: Rc<RefCell<OpState>>, + server_id: u32, + token: u32, +) -> String { + let mut op_state = state.borrow_mut(); + let flash_ctx = op_state.borrow_mut::<FlashContext>(); + let ctx = flash_ctx.servers.get_mut(&server_id).unwrap(); + ctx + .response + .get(&token) + .unwrap() + .inner + .req + .path + .unwrap() + .to_string() +} + +#[inline] +fn next_request_sync(ctx: &mut ServerContext) -> u32 { + let mut tokens = 0; + while let Ok(token) = ctx.rx.try_recv() { + ctx.response.insert(tokens, token); + tokens += 1; + } + tokens +} + +pub struct NextRequestFast; + +impl fast_api::FastFunction for NextRequestFast { + fn function(&self) -> *const c_void { + op_flash_next_fast as *const c_void + } + + fn args(&self) -> &'static [fast_api::Type] { + &[fast_api::Type::V8Value] + } + + fn return_type(&self) -> fast_api::CType { + fast_api::CType::Uint32 + } +} + +unsafe fn op_flash_next_fast(recv: v8::Local<v8::Object>) -> u32 { + let ptr = + recv.get_aligned_pointer_from_internal_field(V8_WRAPPER_OBJECT_INDEX); + let ctx = &mut *(ptr as *mut ServerContext); + next_request_sync(ctx) +} + +pub struct GetMethodFast; + +impl fast_api::FastFunction for GetMethodFast { + fn function(&self) -> *const c_void { + op_flash_get_method_fast as *const c_void + } + + fn args(&self) -> &'static [fast_api::Type] { + &[fast_api::Type::V8Value, fast_api::Type::Uint32] + } + + fn return_type(&self) -> fast_api::CType { + fast_api::CType::Uint32 + } +} + +unsafe fn op_flash_get_method_fast( + recv: v8::Local<v8::Object>, + token: u32, +) -> u32 { + let ptr = + recv.get_aligned_pointer_from_internal_field(V8_WRAPPER_OBJECT_INDEX); + let ctx = &mut *(ptr as *mut ServerContext); + let req = ctx.response.get_mut(&token).unwrap(); + get_method(req) +} + +// Fast calls +#[op(v8)] +fn op_flash_make_request<'scope>( + scope: &mut v8::HandleScope<'scope>, + state: &mut OpState, +) -> serde_v8::Value<'scope> { + let object_template = v8::ObjectTemplate::new(scope); + assert!(object_template + .set_internal_field_count((V8_WRAPPER_OBJECT_INDEX + 1) as usize)); + let obj = object_template.new_instance(scope).unwrap(); + let ctx = { + let flash_ctx = state.borrow_mut::<FlashContext>(); + let ctx = flash_ctx.servers.get_mut(&0).unwrap(); + ctx as *mut ServerContext + }; + obj.set_aligned_pointer_in_internal_field(V8_WRAPPER_OBJECT_INDEX, ctx as _); + + // nextRequest + { + let builder = v8::FunctionTemplate::builder( + |_: &mut v8::HandleScope, + args: v8::FunctionCallbackArguments, + mut rv: v8::ReturnValue| { + let external: v8::Local<v8::External> = + args.data().unwrap().try_into().unwrap(); + // SAFETY: This external is guaranteed to be a pointer to a ServerContext + let ctx = unsafe { &mut *(external.value() as *mut ServerContext) }; + rv.set_uint32(next_request_sync(ctx)); + }, + ) + .data(v8::External::new(scope, ctx as *mut _).into()); + + let func = builder.build_fast(scope, &NextRequestFast, None); + let func: v8::Local<v8::Value> = func.get_function(scope).unwrap().into(); + + let key = v8::String::new(scope, "nextRequest").unwrap(); + obj.set(scope, key.into(), func).unwrap(); + } + + // getMethod + { + let builder = v8::FunctionTemplate::builder( + |scope: &mut v8::HandleScope, + args: v8::FunctionCallbackArguments, + mut rv: v8::ReturnValue| { + let external: v8::Local<v8::External> = + args.data().unwrap().try_into().unwrap(); + // SAFETY: This external is guaranteed to be a pointer to a ServerContext + let ctx = unsafe { &mut *(external.value() as *mut ServerContext) }; + let token = args.get(0).uint32_value(scope).unwrap(); + let req = ctx.response.get_mut(&token).unwrap(); + rv.set_uint32(get_method(req)); + }, + ) + .data(v8::External::new(scope, ctx as *mut _).into()); + + let func = builder.build_fast(scope, &GetMethodFast, None); + let func: v8::Local<v8::Value> = func.get_function(scope).unwrap().into(); + + let key = v8::String::new(scope, "getMethod").unwrap(); + obj.set(scope, key.into(), func).unwrap(); + } + + // respondChunked + { + let builder = v8::FunctionTemplate::builder( + |scope: &mut v8::HandleScope, + args: v8::FunctionCallbackArguments, + _: v8::ReturnValue| { + let external: v8::Local<v8::External> = + args.data().unwrap().try_into().unwrap(); + // SAFETY: This external is guaranteed to be a pointer to a ServerContext + let ctx = unsafe { &mut *(external.value() as *mut ServerContext) }; + + let token = args.get(0).uint32_value(scope).unwrap(); + + let response: v8::Local<v8::ArrayBufferView> = + args.get(1).try_into().unwrap(); + let ab = response.buffer(scope).unwrap(); + let store = ab.get_backing_store(); + let (offset, len) = (response.byte_offset(), response.byte_length()); + // SAFETY: v8::SharedRef<v8::BackingStore> is similar to Arc<[u8]>, + // it points to a fixed continuous slice of bytes on the heap. + // We assume it's initialized and thus safe to read (though may not contain meaningful data) + let response = unsafe { + &*(&store[offset..offset + len] as *const _ as *const [u8]) + }; + + let shutdown = args.get(2).boolean_value(scope); + + respond_chunked(ctx, token, shutdown, Some(response)); + }, + ) + .data(v8::External::new(scope, ctx as *mut _).into()); + + let func = builder.build_fast(scope, &RespondChunkedFast, None); + let func: v8::Local<v8::Value> = func.get_function(scope).unwrap().into(); + + let key = v8::String::new(scope, "respondChunked").unwrap(); + obj.set(scope, key.into(), func).unwrap(); + } + + // respond + { + let builder = v8::FunctionTemplate::builder( + |scope: &mut v8::HandleScope, + args: v8::FunctionCallbackArguments, + _: v8::ReturnValue| { + let external: v8::Local<v8::External> = + args.data().unwrap().try_into().unwrap(); + // SAFETY: This external is guaranteed to be a pointer to a ServerContext + let ctx = unsafe { &mut *(external.value() as *mut ServerContext) }; + + let token = args.get(0).uint32_value(scope).unwrap(); + + let response: v8::Local<v8::ArrayBufferView> = + args.get(1).try_into().unwrap(); + let ab = response.buffer(scope).unwrap(); + let store = ab.get_backing_store(); + let (offset, len) = (response.byte_offset(), response.byte_length()); + // SAFETY: v8::SharedRef<v8::BackingStore> is similar to Arc<[u8]>, + // it points to a fixed continuous slice of bytes on the heap. + // We assume it's initialized and thus safe to read (though may not contain meaningful data) + let response = unsafe { + &*(&store[offset..offset + len] as *const _ as *const [u8]) + }; + + let shutdown = args.get(2).boolean_value(scope); + + flash_respond(ctx, token, shutdown, response); + }, + ) + .data(v8::External::new(scope, ctx as *mut _).into()); + + let func = builder.build_fast(scope, &RespondFast, None); + let func: v8::Local<v8::Value> = func.get_function(scope).unwrap().into(); + + let key = v8::String::new(scope, "respond").unwrap(); + obj.set(scope, key.into(), func).unwrap(); + } + + let value: v8::Local<v8::Value> = obj.into(); + value.into() +} + +#[inline] +fn has_body_stream(req: &NextRequest) -> bool { + let sock = req.socket(); + sock.read_rx.is_some() +} + +#[op] +fn op_flash_has_body_stream( + op_state: &mut OpState, + server_id: u32, + token: u32, +) -> bool { + let req = get_request!(op_state, server_id, token); + has_body_stream(req) +} + +#[op] +fn op_flash_headers( + state: Rc<RefCell<OpState>>, + server_id: u32, + token: u32, +) -> Result<Vec<(ByteString, ByteString)>, AnyError> { + let mut op_state = state.borrow_mut(); + let flash_ctx = op_state.borrow_mut::<FlashContext>(); + let ctx = flash_ctx + .servers + .get_mut(&server_id) + .ok_or_else(|| type_error("server closed"))?; + let inner_req = &ctx + .response + .get(&token) + .ok_or_else(|| type_error("request closed"))? + .inner + .req; + Ok( + inner_req + .headers + .iter() + .map(|h| (h.name.as_bytes().into(), h.value.into())) + .collect(), + ) +} + +// Remember the first packet we read? It probably also has some body data. This op quickly copies it into +// a buffer and sets up channels for streaming the rest. +#[op] +fn op_flash_first_packet( + op_state: &mut OpState, + server_id: u32, + token: u32, +) -> Result<Option<ZeroCopyBuf>, AnyError> { + let tx = get_request!(op_state, server_id, token); + let sock = tx.socket(); + + if !tx.te_chunked && tx.content_length.is_none() { + return Ok(None); + } + + if tx.expect_continue { + let _ = sock.write(b"HTTP/1.1 100 Continue\r\n\r\n"); + tx.expect_continue = false; + } + + let buffer = &tx.inner.buffer[tx.inner.body_offset..tx.inner.body_len]; + // Oh there is nothing here. + if buffer.is_empty() { + return Ok(Some(ZeroCopyBuf::empty())); + } + + if tx.te_chunked { + let mut buf = vec![0; 1024]; + let mut offset = 0; + let mut decoder = chunked::Decoder::new( + std::io::Cursor::new(buffer), + tx.remaining_chunk_size, + ); + + loop { + match decoder.read(&mut buf[offset..]) { + Ok(n) => { + tx.remaining_chunk_size = decoder.remaining_chunks_size; + offset += n; + + if n == 0 { + tx.te_chunked = false; + buf.truncate(offset); + return Ok(Some(buf.into())); + } + + if offset < buf.len() + && decoder.source.position() < buffer.len() as u64 + { + continue; + } + + buf.truncate(offset); + return Ok(Some(buf.into())); + } + Err(e) => { + return Err(type_error(format!("{}", e))); + } + } + } + } + + tx.content_length + .ok_or_else(|| type_error("no content-length"))?; + tx.content_read += buffer.len(); + + Ok(Some(buffer.to_vec().into())) +} + +#[op] +async fn op_flash_read_body( + state: Rc<RefCell<OpState>>, + server_id: u32, + token: u32, + mut buf: ZeroCopyBuf, +) -> usize { + // SAFETY: we cannot hold op_state borrow across the await point. The JS caller + // is responsible for ensuring this is not called concurrently. + let ctx = unsafe { + { + let op_state = &mut state.borrow_mut(); + let flash_ctx = op_state.borrow_mut::<FlashContext>(); + flash_ctx.servers.get_mut(&server_id).unwrap() as *mut ServerContext + } + .as_mut() + .unwrap() + }; + let tx = ctx.response.get_mut(&token).unwrap(); + + if tx.te_chunked { + let mut decoder = + chunked::Decoder::new(tx.socket(), tx.remaining_chunk_size); + loop { + let sock = tx.socket(); + + let _lock = sock.read_lock.lock().unwrap(); + match decoder.read(&mut buf) { + Ok(n) => { + tx.remaining_chunk_size = decoder.remaining_chunks_size; + return n; + } + Err(e) if e.kind() == std::io::ErrorKind::InvalidInput => { + panic!("chunked read error: {}", e); + } + Err(_) => { + drop(_lock); + sock.read_rx.as_mut().unwrap().recv().await.unwrap(); + } + } + } + } + + if let Some(content_length) = tx.content_length { + let sock = tx.socket(); + let l = sock.read_lock.clone(); + + loop { + let _lock = l.lock().unwrap(); + if tx.content_read >= content_length as usize { + return 0; + } + match sock.read(&mut buf) { + Ok(n) => { + tx.content_read += n; + return n; + } + _ => { + drop(_lock); + sock.read_rx.as_mut().unwrap().recv().await.unwrap(); + } + } + } + } + + 0 +} + +// https://github.com/hyperium/hyper/blob/0c8ee93d7f557afc63ca2a5686d19071813ab2b7/src/headers.rs#L67 +#[inline] +fn from_digits(bytes: &[u8]) -> Option<u64> { + // cannot use FromStr for u64, since it allows a signed prefix + let mut result = 0u64; + const RADIX: u64 = 10; + if bytes.is_empty() { + return None; + } + for &b in bytes { + // can't use char::to_digit, since we haven't verified these bytes + // are utf-8. + match b { + b'0'..=b'9' => { + result = result.checked_mul(RADIX)?; + result = result.checked_add((b - b'0') as u64)?; + } + _ => { + return None; + } + } + } + Some(result) +} + +#[inline] +fn connection_has(value: &HeaderValue, needle: &str) -> bool { + if let Ok(s) = value.to_str() { + for val in s.split(',') { + if val.trim().eq_ignore_ascii_case(needle) { + return true; + } + } + } + false +} + +#[derive(Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct ListenOpts { + cert: Option<String>, + key: Option<String>, + hostname: String, + port: u16, + use_tls: bool, +} + +fn run_server( + tx: mpsc::Sender<NextRequest>, + listening_tx: mpsc::Sender<()>, + mut close_rx: mpsc::Receiver<()>, + addr: SocketAddr, + maybe_cert: Option<String>, + maybe_key: Option<String>, +) -> Result<(), AnyError> { + let mut listener = TcpListener::bind(addr)?; + let mut poll = Poll::new()?; + let token = Token(0); + poll + .registry() + .register(&mut listener, token, Interest::READABLE) + .unwrap(); + + let tls_context: Option<Arc<rustls::ServerConfig>> = { + if let Some(cert) = maybe_cert { + let key = maybe_key.unwrap(); + let certificate_chain: Vec<rustls::Certificate> = + load_certs(&mut BufReader::new(cert.as_bytes()))?; + let private_key = load_private_keys(key.as_bytes())?.remove(0); + + let config = rustls::ServerConfig::builder() + .with_safe_defaults() + .with_no_client_auth() + .with_single_cert(certificate_chain, private_key) + .expect("invalid key or certificate"); + Some(Arc::new(config)) + } else { + None + } + }; + + listening_tx.blocking_send(()).unwrap(); + let mut sockets = HashMap::with_capacity(1000); + let mut counter: usize = 1; + let mut events = Events::with_capacity(1024); + 'outer: loop { + let result = close_rx.try_recv(); + if result.is_ok() { + break 'outer; + } + // FIXME(bartlomieju): how does Tokio handle it? I just put random 100ms + // timeout here to handle close signal. + match poll.poll(&mut events, Some(Duration::from_millis(100))) { + Err(ref e) if e.kind() == std::io::ErrorKind::Interrupted => continue, + Err(e) => panic!("{}", e), + Ok(()) => (), + } + 'events: for event in &events { + if close_rx.try_recv().is_ok() { + break 'outer; + } + let token = event.token(); + match token { + Token(0) => loop { + match listener.accept() { + Ok((mut socket, _)) => { + counter += 1; + let token = Token(counter); + poll + .registry() + .register(&mut socket, token, Interest::READABLE) + .unwrap(); + let socket = match tls_context { + Some(ref tls_conf) => { + let connection = + rustls::ServerConnection::new(tls_conf.clone()).unwrap(); + InnerStream::Tls(Box::new(rustls::StreamOwned::new( + connection, socket, + ))) + } + None => InnerStream::Tcp(socket), + }; + let stream = Box::pin(Stream { + inner: socket, + detached: false, + read_rx: None, + read_tx: None, + read_lock: Arc::new(Mutex::new(())), + parse_done: ParseStatus::None, + buffer: UnsafeCell::new(vec![0_u8; 1024]), + _pin: PhantomPinned, + }); + + trace!("New connection: {}", token.0); + sockets.insert(token, stream); + } + Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => break, + Err(_) => break, + } + }, + token => { + let socket = sockets.get_mut(&token).unwrap(); + // SAFETY: guarantee that we will never move the data out of the mutable reference. + let socket = unsafe { + let mut_ref: Pin<&mut Stream> = Pin::as_mut(socket); + Pin::get_unchecked_mut(mut_ref) + }; + let sock_ptr = socket as *mut _; + + if socket.detached { + match &mut socket.inner { + InnerStream::Tcp(ref mut socket) => { + poll.registry().deregister(socket).unwrap(); + } + InnerStream::Tls(_) => { + todo!("upgrade tls not implemented"); + } + } + + let boxed = sockets.remove(&token).unwrap(); + std::mem::forget(boxed); + trace!("Socket detached: {}", token.0); + continue; + } + + debug_assert!(event.is_readable()); + + trace!("Socket readable: {}", token.0); + if let Some(tx) = &socket.read_tx { + { + let _l = socket.read_lock.lock().unwrap(); + } + trace!("Sending readiness notification: {}", token.0); + let _ = tx.blocking_send(()); + + continue; + } + + let mut headers = vec![httparse::EMPTY_HEADER; 40]; + let mut req = httparse::Request::new(&mut headers); + let body_offset; + let body_len; + loop { + // SAFETY: It is safe for the read buf to be mutable here. + let buffer = unsafe { &mut *socket.buffer.get() }; + let offset = match socket.parse_done { + ParseStatus::None => 0, + ParseStatus::Ongoing(offset) => offset, + }; + if offset >= buffer.len() { + buffer.resize(offset * 2, 0); + } + let nread = socket.read(&mut buffer[offset..]); + + match nread { + Ok(0) => { + trace!("Socket closed: {}", token.0); + // FIXME: don't remove while JS is writing! + // sockets.remove(&token); + continue 'events; + } + Ok(read) => match req.parse(&buffer[..offset + read]) { + Ok(httparse::Status::Complete(n)) => { + body_offset = n; + body_len = offset + read; + socket.parse_done = ParseStatus::None; + break; + } + Ok(httparse::Status::Partial) => { + socket.parse_done = ParseStatus::Ongoing(offset + read); + continue; + } + Err(_) => { + let _ = socket.write(b"HTTP/1.1 400 Bad Request\r\n\r\n"); + continue 'events; + } + }, + Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => { + break 'events + } + Err(_) => break 'events, + } + } + + debug_assert_eq!(socket.parse_done, ParseStatus::None); + if let Some(method) = &req.method { + if method == &"POST" || method == &"PUT" { + let (tx, rx) = mpsc::channel(100); + socket.read_tx = Some(tx); + socket.read_rx = Some(rx); + } + } + + // SAFETY: It is safe for the read buf to be mutable here. + let buffer = unsafe { &mut *socket.buffer.get() }; + let inner_req = InnerRequest { + // SAFETY: backing buffer is pinned and lives as long as the request. + req: unsafe { transmute::<httparse::Request<'_, '_>, _>(req) }, + // SAFETY: backing buffer is pinned and lives as long as the request. + _headers: unsafe { + transmute::<Vec<httparse::Header<'_>>, _>(headers) + }, + buffer: Pin::new( + replace(buffer, vec![0_u8; 1024]).into_boxed_slice(), + ), + body_offset, + body_len, + }; + // h1 + // https://github.com/tiny-http/tiny-http/blob/master/src/client.rs#L177 + // https://github.com/hyperium/hyper/blob/4545c3ef191ce9b5f5d250ee27c4c96f9b71d2c6/src/proto/h1/role.rs#L127 + let mut keep_alive = inner_req.req.version.unwrap() == 1; + let mut upgrade = false; + let mut expect_continue = false; + let mut te = false; + let mut te_chunked = false; + let mut content_length = None; + for header in inner_req.req.headers.iter() { + match HeaderName::from_bytes(header.name.as_bytes()) { + Ok(CONNECTION) => { + // SAFETY: illegal bytes are validated by httparse. + let value = unsafe { + HeaderValue::from_maybe_shared_unchecked(header.value) + }; + if keep_alive { + // 1.1 + keep_alive = !connection_has(&value, "close"); + } else { + // 1.0 + keep_alive = connection_has(&value, "keep-alive"); + } + } + Ok(UPGRADE) => { + upgrade = inner_req.req.version.unwrap() == 1; + } + Ok(TRANSFER_ENCODING) => { + // https://tools.ietf.org/html/rfc7230#section-3.3.3 + debug_assert!(inner_req.req.version.unwrap() == 1); + // Two states for Transfer-Encoding because we want to make sure Content-Length handling knows it. + te = true; + content_length = None; + // SAFETY: illegal bytes are validated by httparse. + let value = unsafe { + HeaderValue::from_maybe_shared_unchecked(header.value) + }; + if let Ok(Some(encoding)) = + value.to_str().map(|s| s.rsplit(',').next()) + { + // Chunked must always be the last encoding + if encoding.trim().eq_ignore_ascii_case("chunked") { + te_chunked = true; + } + } + } + // Transfer-Encoding overrides the Content-Length. + Ok(CONTENT_LENGTH) if !te => { + if let Some(len) = from_digits(header.value) { + if let Some(prev) = content_length { + if prev != len { + let _ = socket.write(b"HTTP/1.1 400 Bad Request\r\n\r\n"); + continue 'events; + } + continue; + } + content_length = Some(len); + } else { + let _ = socket.write(b"HTTP/1.1 400 Bad Request\r\n\r\n"); + continue 'events; + } + } + Ok(EXPECT) if inner_req.req.version.unwrap() != 0 => { + expect_continue = + header.value.eq_ignore_ascii_case(b"100-continue"); + } + _ => {} + } + } + + // There is Transfer-Encoding but its not chunked. + if te && !te_chunked { + let _ = socket.write(b"HTTP/1.1 400 Bad Request\r\n\r\n"); + continue 'events; + } + + tx.blocking_send(NextRequest { + socket: sock_ptr, + // SAFETY: headers backing buffer outlives the mio event loop ('static) + inner: inner_req, + keep_alive, + upgrade, + te_chunked, + remaining_chunk_size: None, + content_read: 0, + content_length, + expect_continue, + }) + .ok(); + } + } + } + } + + Ok(()) +} + +#[op] +fn op_flash_serve<P>( + state: &mut OpState, + opts: ListenOpts, +) -> Result<u32, AnyError> +where + P: FlashPermissions + 'static, +{ + if opts.use_tls { + check_unstable(state, "Deno.serveTls"); + } else { + check_unstable(state, "Deno.serve"); + } + state + .borrow_mut::<P>() + .check_net(&(&opts.hostname, Some(opts.port)))?; + let addr = SocketAddr::new(opts.hostname.parse()?, opts.port); + let (tx, rx) = mpsc::channel(100); + let (close_tx, close_rx) = mpsc::channel(1); + let (listening_tx, listening_rx) = mpsc::channel(1); + let ctx = ServerContext { + _addr: addr, + tx, + rx, + response: HashMap::with_capacity(1000), + close_tx, + listening_rx: Some(listening_rx), + cancel_handle: CancelHandle::new_rc(), + }; + let tx = ctx.tx.clone(); + let maybe_cert = opts.cert; + let maybe_key = opts.key; + let join_handle = tokio::task::spawn_blocking(move || { + run_server(tx, listening_tx, close_rx, addr, maybe_cert, maybe_key) + }); + let flash_ctx = state.borrow_mut::<FlashContext>(); + let server_id = flash_ctx.next_server_id; + flash_ctx.next_server_id += 1; + flash_ctx.join_handles.insert(server_id, join_handle); + flash_ctx.servers.insert(server_id, ctx); + Ok(server_id) +} + +#[op] +fn op_flash_wait_for_listening( + state: &mut OpState, + server_id: u32, +) -> Result<impl Future<Output = Result<(), AnyError>> + 'static, AnyError> { + let mut listening_rx = { + let flash_ctx = state.borrow_mut::<FlashContext>(); + let server_ctx = flash_ctx + .servers + .get_mut(&server_id) + .ok_or_else(|| type_error("server not found"))?; + server_ctx.listening_rx.take().unwrap() + }; + Ok(async move { + listening_rx.recv().await; + Ok(()) + }) +} + +#[op] +fn op_flash_drive_server( + state: &mut OpState, + server_id: u32, +) -> Result<impl Future<Output = Result<(), AnyError>> + 'static, AnyError> { + let join_handle = { + let flash_ctx = state.borrow_mut::<FlashContext>(); + flash_ctx + .join_handles + .remove(&server_id) + .ok_or_else(|| type_error("server not found"))? + }; + Ok(async move { + join_handle + .await + .map_err(|_| type_error("server join error"))??; + Ok(()) + }) +} + +// Asychronous version of op_flash_next. This can be a bottleneck under +// heavy load, it should be used as a fallback if there are no buffered +// requests i.e `op_flash_next() == 0`. +#[op] +async fn op_flash_next_async( + op_state: Rc<RefCell<OpState>>, + server_id: u32, +) -> u32 { + let ctx = { + let mut op_state = op_state.borrow_mut(); + let flash_ctx = op_state.borrow_mut::<FlashContext>(); + let ctx = flash_ctx.servers.get_mut(&server_id).unwrap(); + ctx as *mut ServerContext + }; + // SAFETY: we cannot hold op_state borrow across the await point. The JS caller + // is responsible for ensuring this is not called concurrently. + let ctx = unsafe { &mut *ctx }; + let cancel_handle = &ctx.cancel_handle; + let mut tokens = 0; + while let Ok(token) = ctx.rx.try_recv() { + ctx.response.insert(tokens, token); + tokens += 1; + } + if tokens == 0 { + if let Ok(Some(req)) = ctx.rx.recv().or_cancel(cancel_handle).await { + ctx.response.insert(tokens, req); + tokens += 1; + } + } + tokens +} + +// Syncrhonous version of op_flash_next_async. Under heavy load, +// this can collect buffered requests from rx channel and return tokens in a single batch. +// +// perf: please do not add any arguments to this op. With optimizations enabled, +// the ContextScope creation is optimized away and the op is as simple as: +// f(info: *const v8::FunctionCallbackInfo) { let rv = ...; rv.set_uint32(op_flash_next()); } +#[op] +fn op_flash_next(op_state: &mut OpState) -> u32 { + let flash_ctx = op_state.borrow_mut::<FlashContext>(); + let ctx = flash_ctx.servers.get_mut(&0).unwrap(); + next_request_sync(ctx) +} + +// Syncrhonous version of op_flash_next_async. Under heavy load, +// this can collect buffered requests from rx channel and return tokens in a single batch. +#[op] +fn op_flash_next_server(op_state: &mut OpState, server_id: u32) -> u32 { + let flash_ctx = op_state.borrow_mut::<FlashContext>(); + let ctx = flash_ctx.servers.get_mut(&server_id).unwrap(); + next_request_sync(ctx) +} + +// Wrapper type for tokio::net::TcpStream that implements +// deno_websocket::UpgradedStream +struct UpgradedStream(tokio::net::TcpStream); +impl tokio::io::AsyncRead for UpgradedStream { + fn poll_read( + self: Pin<&mut Self>, + cx: &mut Context, + buf: &mut tokio::io::ReadBuf, + ) -> std::task::Poll<std::result::Result<(), std::io::Error>> { + Pin::new(&mut self.get_mut().0).poll_read(cx, buf) + } +} + +impl tokio::io::AsyncWrite for UpgradedStream { + fn poll_write( + self: Pin<&mut Self>, + cx: &mut Context, + buf: &[u8], + ) -> std::task::Poll<Result<usize, std::io::Error>> { + Pin::new(&mut self.get_mut().0).poll_write(cx, buf) + } + fn poll_flush( + self: Pin<&mut Self>, + cx: &mut Context, + ) -> std::task::Poll<Result<(), std::io::Error>> { + Pin::new(&mut self.get_mut().0).poll_flush(cx) + } + fn poll_shutdown( + self: Pin<&mut Self>, + cx: &mut Context, + ) -> std::task::Poll<Result<(), std::io::Error>> { + Pin::new(&mut self.get_mut().0).poll_shutdown(cx) + } +} + +impl deno_websocket::Upgraded for UpgradedStream {} + +#[inline] +pub fn detach_socket( + ctx: &mut ServerContext, + token: u32, +) -> Result<tokio::net::TcpStream, AnyError> { + // Two main 'hacks' to get this working: + // * make server thread forget about the socket. `detach_ownership` prevents the socket from being + // dropped on the server thread. + // * conversion from mio::net::TcpStream -> tokio::net::TcpStream. There is no public API so we + // use raw fds. + let tx = ctx + .response + .remove(&token) + .ok_or_else(|| type_error("request closed"))?; + let stream = tx.socket(); + // prevent socket from being dropped on server thread. + // TODO(@littledivy): Box-ify, since there is no overhead. + stream.detach_ownership(); + + #[cfg(unix)] + let std_stream = { + use std::os::unix::prelude::AsRawFd; + use std::os::unix::prelude::FromRawFd; + let fd = match stream.inner { + InnerStream::Tcp(ref tcp) => tcp.as_raw_fd(), + _ => todo!(), + }; + // SAFETY: `fd` is a valid file descriptor. + unsafe { std::net::TcpStream::from_raw_fd(fd) } + }; + #[cfg(windows)] + let std_stream = { + use std::os::windows::prelude::AsRawSocket; + use std::os::windows::prelude::FromRawSocket; + let fd = match stream.inner { + InnerStream::Tcp(ref tcp) => tcp.as_raw_socket(), + _ => todo!(), + }; + // SAFETY: `fd` is a valid file descriptor. + unsafe { std::net::TcpStream::from_raw_socket(fd) } + }; + let stream = tokio::net::TcpStream::from_std(std_stream)?; + Ok(stream) +} + +#[op] +async fn op_flash_upgrade_websocket( + state: Rc<RefCell<OpState>>, + server_id: u32, + token: u32, +) -> Result<deno_core::ResourceId, AnyError> { + let stream = { + let op_state = &mut state.borrow_mut(); + let flash_ctx = op_state.borrow_mut::<FlashContext>(); + detach_socket(flash_ctx.servers.get_mut(&server_id).unwrap(), token)? + }; + deno_websocket::ws_create_server_stream( + &state, + Box::pin(UpgradedStream(stream)), + ) + .await +} + +pub struct Unstable(pub bool); + +fn check_unstable(state: &OpState, api_name: &str) { + let unstable = state.borrow::<Unstable>(); + + if !unstable.0 { + eprintln!( + "Unstable API '{}'. The --unstable flag must be provided.", + api_name + ); + std::process::exit(70); + } +} + +pub trait FlashPermissions { + fn check_net<T: AsRef<str>>( + &mut self, + _host: &(T, Option<u16>), + ) -> Result<(), AnyError>; +} + +pub fn init<P: FlashPermissions + 'static>(unstable: bool) -> Extension { + Extension::builder() + .js(deno_core::include_js_files!( + prefix "deno:ext/flash", + "01_http.js", + )) + .ops(vec![ + op_flash_serve::decl::<P>(), + op_flash_respond::decl(), + op_flash_respond_chuncked::decl(), + op_flash_method::decl(), + op_flash_path::decl(), + op_flash_headers::decl(), + op_flash_next::decl(), + op_flash_next_server::decl(), + op_flash_next_async::decl(), + op_flash_read_body::decl(), + op_flash_upgrade_websocket::decl(), + op_flash_drive_server::decl(), + op_flash_wait_for_listening::decl(), + op_flash_first_packet::decl(), + op_flash_has_body_stream::decl(), + op_flash_close_server::decl(), + op_flash_make_request::decl(), + op_flash_write_resource::decl(), + ]) + .state(move |op_state| { + op_state.put(Unstable(unstable)); + op_state.put(FlashContext { + next_server_id: 0, + join_handles: HashMap::default(), + servers: HashMap::default(), + }); + Ok(()) + }) + .build() +} diff --git a/ext/flash/sendfile.rs b/ext/flash/sendfile.rs new file mode 100644 index 000000000..4efa7bc35 --- /dev/null +++ b/ext/flash/sendfile.rs @@ -0,0 +1,81 @@ +// Forked from https://github.com/Thomasdezeeuw/sendfile/blob/024f82cd4dede9048392a5bd6d8afcd4d5aa83d5/src/lib.rs +// Copyright 2018-2022 the Deno authors. All rights reserved. MIT license. + +use std::future::Future; +use std::io; +use std::os::unix::io::RawFd; +use std::pin::Pin; +use std::task::{self, Poll}; + +pub struct SendFile { + pub io: (RawFd, RawFd), + pub written: usize, +} + +impl SendFile { + #[inline] + pub fn try_send(&mut self) -> Result<usize, std::io::Error> { + #[cfg(target_os = "linux")] + { + // This is the maximum the Linux kernel will write in a single call. + let count = 0x7ffff000; + let mut offset = self.written as libc::off_t; + + // SAFETY: call to libc::sendfile() + let res = + unsafe { libc::sendfile(self.io.1, self.io.0, &mut offset, count) }; + if res == -1 { + Err(io::Error::last_os_error()) + } else { + self.written = offset as usize; + Ok(res as usize) + } + } + #[cfg(target_os = "macos")] + { + // Send all bytes. + let mut length = 0; + // On macOS `length` is value-result parameter. It determines the number + // of bytes to write and returns the number of bytes written also in + // case of `EAGAIN` errors. + // SAFETY: call to libc::sendfile() + let res = unsafe { + libc::sendfile( + self.io.0, + self.io.1, + self.written as libc::off_t, + &mut length, + std::ptr::null_mut(), + 0, + ) + }; + self.written += length as usize; + if res == -1 { + Err(io::Error::last_os_error()) + } else { + Ok(length as usize) + } + } + } +} + +impl Future for SendFile { + type Output = Result<(), std::io::Error>; + + fn poll( + mut self: Pin<&mut Self>, + _: &mut task::Context<'_>, + ) -> Poll<Self::Output> { + loop { + match self.try_send() { + Ok(0) => break Poll::Ready(Ok(())), + Ok(_) => continue, + Err(ref err) if err.kind() == io::ErrorKind::WouldBlock => { + break Poll::Pending + } + Err(ref err) if err.kind() == io::ErrorKind::Interrupted => continue, // Try again. + Err(err) => break Poll::Ready(Err(err)), + } + } + } +} diff --git a/ext/http/01_http.js b/ext/http/01_http.js index 7dfe86a75..6df26d09f 100644 --- a/ext/http/01_http.js +++ b/ext/http/01_http.js @@ -13,6 +13,7 @@ newInnerRequest, newInnerResponse, fromInnerResponse, + _flash, } = window.__bootstrap.fetch; const core = window.Deno.core; const { BadResourcePrototype, InterruptedPrototype, ops } = core; @@ -475,6 +476,20 @@ } function upgradeHttp(req) { + if (req[_flash]) { + // NOTE(bartlomieju): + // Access these fields so they are cached on `req` object, otherwise + // they wouldn't be available after the connection gets upgraded. + req.url; + req.method; + req.headers; + + const { serverId, streamRid } = req[_flash]; + const connRid = core.ops.op_flash_upgrade_http(streamRid, serverId); + // TODO(@littledivy): return already read first packet too. + return [new TcpConn(connRid), new Uint8Array()]; + } + req[_deferred] = new Deferred(); return req[_deferred].promise; } @@ -483,5 +498,6 @@ HttpConn, upgradeWebSocket, upgradeHttp, + _ws, }; })(this); diff --git a/ext/http/lib.rs b/ext/http/lib.rs index 27d277654..d1b38fb42 100644 --- a/ext/http/lib.rs +++ b/ext/http/lib.rs @@ -874,6 +874,41 @@ fn op_http_websocket_accept_header(key: String) -> Result<String, AnyError> { Ok(base64::encode(digest)) } +struct UpgradedStream(hyper::upgrade::Upgraded); +impl tokio::io::AsyncRead for UpgradedStream { + fn poll_read( + self: Pin<&mut Self>, + cx: &mut Context, + buf: &mut tokio::io::ReadBuf, + ) -> std::task::Poll<std::result::Result<(), std::io::Error>> { + Pin::new(&mut self.get_mut().0).poll_read(cx, buf) + } +} + +impl tokio::io::AsyncWrite for UpgradedStream { + fn poll_write( + self: Pin<&mut Self>, + cx: &mut Context, + buf: &[u8], + ) -> std::task::Poll<Result<usize, std::io::Error>> { + Pin::new(&mut self.get_mut().0).poll_write(cx, buf) + } + fn poll_flush( + self: Pin<&mut Self>, + cx: &mut Context, + ) -> std::task::Poll<Result<(), std::io::Error>> { + Pin::new(&mut self.get_mut().0).poll_flush(cx) + } + fn poll_shutdown( + self: Pin<&mut Self>, + cx: &mut Context, + ) -> std::task::Poll<Result<(), std::io::Error>> { + Pin::new(&mut self.get_mut().0).poll_shutdown(cx) + } +} + +impl deno_websocket::Upgraded for UpgradedStream {} + #[op] async fn op_http_upgrade_websocket( state: Rc<RefCell<OpState>>, @@ -893,7 +928,9 @@ async fn op_http_upgrade_websocket( }; let transport = hyper::upgrade::on(request).await?; - let ws_rid = ws_create_server_stream(&state, transport).await?; + let ws_rid = + ws_create_server_stream(&state, Box::pin(UpgradedStream(transport))) + .await?; Ok(ws_rid) } diff --git a/ext/web/06_streams.js b/ext/web/06_streams.js index 7f67e81ed..1b753572b 100644 --- a/ext/web/06_streams.js +++ b/ext/web/06_streams.js @@ -5897,6 +5897,7 @@ window.__bootstrap.streams = { // Non-Public + _state, isReadableStreamDisturbed, errorReadableStream, createProxy, diff --git a/ext/websocket/lib.rs b/ext/websocket/lib.rs index 0e642be3f..515f798ac 100644 --- a/ext/websocket/lib.rs +++ b/ext/websocket/lib.rs @@ -34,8 +34,11 @@ use std::cell::RefCell; use std::convert::TryFrom; use std::fmt; use std::path::PathBuf; +use std::pin::Pin; use std::rc::Rc; use std::sync::Arc; +use tokio::io::AsyncRead; +use tokio::io::AsyncWrite; use tokio::net::TcpStream; use tokio_rustls::rustls::RootCertStore; use tokio_rustls::rustls::ServerName; @@ -67,23 +70,25 @@ pub trait WebSocketPermissions { /// would override previously used alias. pub struct UnsafelyIgnoreCertificateErrors(Option<Vec<String>>); -type WsStream = WebSocketStream<MaybeTlsStream<TcpStream>>; +type ClientWsStream = WebSocketStream<MaybeTlsStream<TcpStream>>; +type ServerWsStream = WebSocketStream<Pin<Box<dyn Upgraded>>>; + pub enum WebSocketStreamType { Client { - tx: AsyncRefCell<SplitSink<WsStream, Message>>, - rx: AsyncRefCell<SplitStream<WsStream>>, + tx: AsyncRefCell<SplitSink<ClientWsStream, Message>>, + rx: AsyncRefCell<SplitStream<ClientWsStream>>, }, Server { - tx: AsyncRefCell< - SplitSink<WebSocketStream<hyper::upgrade::Upgraded>, Message>, - >, - rx: AsyncRefCell<SplitStream<WebSocketStream<hyper::upgrade::Upgraded>>>, + tx: AsyncRefCell<SplitSink<ServerWsStream, Message>>, + rx: AsyncRefCell<SplitStream<ServerWsStream>>, }, } +pub trait Upgraded: AsyncRead + AsyncWrite + Unpin {} + pub async fn ws_create_server_stream( state: &Rc<RefCell<OpState>>, - transport: hyper::upgrade::Upgraded, + transport: Pin<Box<dyn Upgraded>>, ) -> Result<ResourceId, AnyError> { let ws_stream = WebSocketStream::from_raw_socket( transport, @@ -340,7 +345,7 @@ where ..Default::default() }), ); - let (stream, response): (WsStream, Response) = + let (stream, response): (ClientWsStream, Response) = if let Some(cancel_resource) = cancel_resource { client.or_cancel(cancel_resource.0.to_owned()).await? } else { |