diff options
Diffstat (limited to 'extensions/http')
-rw-r--r-- | extensions/http/01_http.js | 374 | ||||
-rw-r--r-- | extensions/http/Cargo.toml | 25 | ||||
-rw-r--r-- | extensions/http/README.md | 4 | ||||
-rw-r--r-- | extensions/http/lib.deno_http.unstable.d.ts | 65 | ||||
-rw-r--r-- | extensions/http/lib.rs | 655 |
5 files changed, 1123 insertions, 0 deletions
diff --git a/extensions/http/01_http.js b/extensions/http/01_http.js new file mode 100644 index 000000000..4bcdf1f07 --- /dev/null +++ b/extensions/http/01_http.js @@ -0,0 +1,374 @@ +// Copyright 2018-2021 the Deno authors. All rights reserved. MIT license. +"use strict"; + +((window) => { + const webidl = window.__bootstrap.webidl; + const { InnerBody } = window.__bootstrap.fetchBody; + const { setEventTargetData } = window.__bootstrap.eventTarget; + const { + Response, + fromInnerRequest, + toInnerResponse, + newInnerRequest, + newInnerResponse, + fromInnerResponse, + } = window.__bootstrap.fetch; + const core = window.Deno.core; + const { BadResource, Interrupted } = core; + const { ReadableStream } = window.__bootstrap.streams; + const abortSignal = window.__bootstrap.abortSignal; + const { WebSocket, _rid, _readyState, _eventLoop, _protocol } = + window.__bootstrap.webSocket; + const { + ArrayPrototypeIncludes, + ArrayPrototypePush, + Promise, + StringPrototypeIncludes, + StringPrototypeSplit, + Symbol, + SymbolAsyncIterator, + TypedArrayPrototypeSubarray, + TypeError, + Uint8Array, + } = window.__bootstrap.primordials; + + const connErrorSymbol = Symbol("connError"); + + class HttpConn { + #rid = 0; + + constructor(rid) { + this.#rid = rid; + } + + /** @returns {number} */ + get rid() { + return this.#rid; + } + + /** @returns {Promise<ResponseEvent | null>} */ + async nextRequest() { + let nextRequest; + try { + nextRequest = await core.opAsync( + "op_http_request_next", + this.#rid, + ); + } catch (error) { + // A connection error seen here would cause disrupted responses to throw + // a generic `BadResource` error. Instead store this error and replace + // those with it. + this[connErrorSymbol] = error; + if (error instanceof BadResource) { + return null; + } else if (error instanceof Interrupted) { + return null; + } else if ( + StringPrototypeIncludes(error.message, "connection closed") + ) { + return null; + } + throw error; + } + if (nextRequest === null) return null; + + const [ + requestRid, + responseSenderRid, + method, + headersList, + url, + ] = nextRequest; + + /** @type {ReadableStream<Uint8Array> | undefined} */ + let body = null; + if (typeof requestRid === "number") { + body = createRequestBodyStream(requestRid); + } + + const innerRequest = newInnerRequest( + method, + url, + headersList, + body !== null ? new InnerBody(body) : null, + ); + const signal = abortSignal.newSignal(); + const request = fromInnerRequest(innerRequest, signal, "immutable"); + + const respondWith = createRespondWith( + this, + responseSenderRid, + requestRid, + ); + + return { request, respondWith }; + } + + /** @returns {void} */ + close() { + core.close(this.#rid); + } + + [SymbolAsyncIterator]() { + // deno-lint-ignore no-this-alias + const httpConn = this; + return { + async next() { + const reqEvt = await httpConn.nextRequest(); + // Change with caution, current form avoids a v8 deopt + return { value: reqEvt, done: reqEvt === null }; + }, + }; + } + } + + function readRequest(requestRid, zeroCopyBuf) { + return core.opAsync( + "op_http_request_read", + requestRid, + zeroCopyBuf, + ); + } + + function createRespondWith(httpConn, responseSenderRid, requestRid) { + return async function respondWith(resp) { + if (resp instanceof Promise) { + resp = await resp; + } + + if (!(resp instanceof Response)) { + throw new TypeError( + "First argument to respondWith must be a Response or a promise resolving to a Response.", + ); + } + + const innerResp = toInnerResponse(resp); + + // If response body length is known, it will be sent synchronously in a + // single op, in other case a "response body" resource will be created and + // we'll be streaming it. + /** @type {ReadableStream<Uint8Array> | Uint8Array | null} */ + let respBody = null; + if (innerResp.body !== null) { + if (innerResp.body.unusable()) throw new TypeError("Body is unusable."); + if (innerResp.body.streamOrStatic instanceof ReadableStream) { + if ( + innerResp.body.length === null || + innerResp.body.source instanceof Blob + ) { + respBody = innerResp.body.stream; + } else { + const reader = innerResp.body.stream.getReader(); + const r1 = await reader.read(); + if (r1.done) { + respBody = new Uint8Array(0); + } else { + respBody = r1.value; + const r2 = await reader.read(); + if (!r2.done) throw new TypeError("Unreachable"); + } + } + } else { + innerResp.body.streamOrStatic.consumed = true; + respBody = innerResp.body.streamOrStatic.body; + } + } else { + respBody = new Uint8Array(0); + } + + let responseBodyRid; + try { + responseBodyRid = await core.opAsync("op_http_response", [ + responseSenderRid, + innerResp.status ?? 200, + innerResp.headerList, + ], respBody instanceof Uint8Array ? respBody : null); + } catch (error) { + const connError = httpConn[connErrorSymbol]; + if (error instanceof BadResource && connError != null) { + // deno-lint-ignore no-ex-assign + error = new connError.constructor(connError.message); + } + if (respBody !== null && respBody instanceof ReadableStream) { + await respBody.cancel(error); + } + throw error; + } + + // If `respond` returns a responseBodyRid, we should stream the body + // to that resource. + if (responseBodyRid !== null) { + try { + if (respBody === null || !(respBody instanceof ReadableStream)) { + throw new TypeError("Unreachable"); + } + const reader = respBody.getReader(); + while (true) { + const { value, done } = await reader.read(); + if (done) break; + if (!(value instanceof Uint8Array)) { + await reader.cancel(new TypeError("Value not a Uint8Array")); + break; + } + try { + await core.opAsync( + "op_http_response_write", + responseBodyRid, + value, + ); + } catch (error) { + const connError = httpConn[connErrorSymbol]; + if (error instanceof BadResource && connError != null) { + // deno-lint-ignore no-ex-assign + error = new connError.constructor(connError.message); + } + await reader.cancel(error); + throw error; + } + } + } finally { + // Once all chunks are sent, and the request body is closed, we can + // close the response body. + try { + await core.opAsync("op_http_response_close", responseBodyRid); + } catch { /* pass */ } + } + } + + const ws = resp[_ws]; + if (ws) { + if (typeof requestRid !== "number") { + throw new TypeError( + "This request can not be upgraded to a websocket connection.", + ); + } + + const wsRid = await core.opAsync( + "op_http_upgrade_websocket", + requestRid, + ); + ws[_rid] = wsRid; + ws[_protocol] = resp.headers.get("sec-websocket-protocol"); + + if (ws[_readyState] === WebSocket.CLOSING) { + await core.opAsync("op_ws_close", { rid: wsRid }); + + ws[_readyState] = WebSocket.CLOSED; + + const errEvent = new ErrorEvent("error"); + ws.dispatchEvent(errEvent); + + const event = new CloseEvent("close"); + ws.dispatchEvent(event); + + try { + core.close(wsRid); + } catch (err) { + // Ignore error if the socket has already been closed. + if (!(err instanceof Deno.errors.BadResource)) throw err; + } + } else { + ws[_readyState] = WebSocket.OPEN; + const event = new Event("open"); + ws.dispatchEvent(event); + + ws[_eventLoop](); + } + } + }; + } + + function createRequestBodyStream(requestRid) { + return new ReadableStream({ + type: "bytes", + async pull(controller) { + try { + // This is the largest possible size for a single packet on a TLS + // stream. + const chunk = new Uint8Array(16 * 1024 + 256); + const read = await readRequest( + requestRid, + chunk, + ); + if (read > 0) { + // We read some data. Enqueue it onto the stream. + controller.enqueue(TypedArrayPrototypeSubarray(chunk, 0, read)); + } else { + // We have reached the end of the body, so we close the stream. + controller.close(); + core.close(requestRid); + } + } catch (err) { + // There was an error while reading a chunk of the body, so we + // error. + controller.error(err); + controller.close(); + core.close(requestRid); + } + }, + cancel() { + core.close(requestRid); + }, + }); + } + + const _ws = Symbol("[[associated_ws]]"); + + function upgradeWebSocket(request, options = {}) { + if (request.headers.get("upgrade") !== "websocket") { + throw new TypeError( + "Invalid Header: 'upgrade' header must be 'websocket'", + ); + } + + if (request.headers.get("connection") !== "Upgrade") { + throw new TypeError( + "Invalid Header: 'connection' header must be 'Upgrade'", + ); + } + + const websocketKey = request.headers.get("sec-websocket-key"); + if (websocketKey === null) { + throw new TypeError( + "Invalid Header: 'sec-websocket-key' header must be set", + ); + } + + const accept = core.opSync("op_http_websocket_accept_header", websocketKey); + + const r = newInnerResponse(101); + r.headerList = [ + ["upgrade", "websocket"], + ["connection", "Upgrade"], + ["sec-websocket-accept", accept], + ]; + + const protocolsStr = request.headers.get("sec-websocket-protocol") || ""; + const protocols = StringPrototypeSplit(protocolsStr, ", "); + if (protocols && options.protocol) { + if (ArrayPrototypeIncludes(protocols, options.protocol)) { + ArrayPrototypePush(r.headerList, [ + "sec-websocket-protocol", + options.protocol, + ]); + } else { + throw new TypeError( + `Protocol '${options.protocol}' not in the request's protocol list (non negotiable)`, + ); + } + } + + const response = fromInnerResponse(r, "immutable"); + + const websocket = webidl.createBranded(WebSocket); + setEventTargetData(websocket); + response[_ws] = websocket; + + return { response, websocket }; + } + + window.__bootstrap.http = { + HttpConn, + upgradeWebSocket, + }; +})(this); diff --git a/extensions/http/Cargo.toml b/extensions/http/Cargo.toml new file mode 100644 index 000000000..8909301a6 --- /dev/null +++ b/extensions/http/Cargo.toml @@ -0,0 +1,25 @@ +# Copyright 2018-2021 the Deno authors. All rights reserved. MIT license. + +[package] +name = "deno_http" +version = "0.1.0" +edition = "2018" +description = "HTTP server implementation for Deno" +authors = ["the Deno authors"] +license = "MIT" +readme = "README.md" +repository = "https://github.com/denoland/deno" + +[lib] +path = "lib.rs" + +[dependencies] +base64 = "0.13.0" +bytes = "1" +deno_core = { version = "0.92.0", path = "../../core" } +deno_websocket = { version = "0.15.1", path = "../websocket" } +hyper = { version = "0.14.9", features = ["server", "stream", "http1", "http2", "runtime"] } +ring = "0.16.20" +serde = { version = "1.0.125", features = ["derive"] } +tokio = { version = "1.8.0", features = ["full"] } +tokio-util = "0.6.7" diff --git a/extensions/http/README.md b/extensions/http/README.md new file mode 100644 index 000000000..ab557017a --- /dev/null +++ b/extensions/http/README.md @@ -0,0 +1,4 @@ +# deno_http + +This crate implements server-side HTTP based on primitives from the +[Fetch API](https://fetch.spec.whatwg.org/). diff --git a/extensions/http/lib.deno_http.unstable.d.ts b/extensions/http/lib.deno_http.unstable.d.ts new file mode 100644 index 000000000..30ffe121e --- /dev/null +++ b/extensions/http/lib.deno_http.unstable.d.ts @@ -0,0 +1,65 @@ +// Copyright 2018-2021 the Deno authors. All rights reserved. MIT license. + +/// <reference no-default-lib="true" /> +/// <reference lib="esnext" /> + +declare namespace Deno { + export interface RequestEvent { + readonly request: Request; + respondWith(r: Response | Promise<Response>): Promise<void>; + } + + export interface HttpConn extends AsyncIterable<RequestEvent> { + readonly rid: number; + + nextRequest(): Promise<RequestEvent | null>; + close(): void; + } + + export interface WebSocketUpgrade { + response: Response; + websocket: WebSocket; + } + + export interface UpgradeWebSocketOptions { + protocol?: string; + } + + /** **UNSTABLE**: new API, yet to be vetted. + * + * Used to upgrade an incoming HTTP request to a WebSocket. + * + * Given a request, returns a pair of WebSocket and Response. The original + * request must be responded to with the returned response for the websocket + * upgrade to be successful. + * + * ```ts + * const conn = await Deno.connect({ port: 80, hostname: "127.0.0.1" }); + * const httpConn = Deno.serveHttp(conn); + * const e = await httpConn.nextRequest(); + * if (e) { + * const { websocket, response } = Deno.upgradeWebSocket(e.request); + * websocket.onopen = () => { + * websocket.send("Hello World!"); + * }; + * websocket.onmessage = (e) => { + * console.log(e.data); + * websocket.close(); + * }; + * websocket.onclose = () => console.log("WebSocket has been closed."); + * websocket.onerror = (e) => console.error("WebSocket error:", e.message); + * e.respondWith(response); + * } + * ``` + * + * If the request body is disturbed (read from) before the upgrade is + * completed, upgrading fails. + * + * This operation does not yet consume the request or open the websocket. This + * only happens once the returned response has been passed to `respondWith`. + */ + export function upgradeWebSocket( + request: Request, + options?: UpgradeWebSocketOptions, + ): WebSocketUpgrade; +} diff --git a/extensions/http/lib.rs b/extensions/http/lib.rs new file mode 100644 index 000000000..a8d92ab46 --- /dev/null +++ b/extensions/http/lib.rs @@ -0,0 +1,655 @@ +// Copyright 2018-2021 the Deno authors. All rights reserved. MIT license. + +use deno_core::error::bad_resource_id; +use deno_core::error::null_opbuf; +use deno_core::error::type_error; +use deno_core::error::AnyError; +use deno_core::futures::future::poll_fn; +use deno_core::futures::FutureExt; +use deno_core::futures::Stream; +use deno_core::futures::StreamExt; +use deno_core::include_js_files; +use deno_core::op_async; +use deno_core::op_sync; +use deno_core::AsyncRefCell; +use deno_core::ByteString; +use deno_core::CancelHandle; +use deno_core::CancelTryFuture; +use deno_core::Extension; +use deno_core::OpState; +use deno_core::RcRef; +use deno_core::Resource; +use deno_core::ResourceId; +use deno_core::ZeroCopyBuf; +use hyper::body::HttpBody; +use hyper::http; +use hyper::server::conn::Http; +use hyper::service::Service as HyperService; +use hyper::Body; +use hyper::Request; +use hyper::Response; +use serde::Deserialize; +use serde::Serialize; +use std::borrow::Cow; +use std::cell::RefCell; +use std::future::Future; +use std::net::SocketAddr; +use std::path::PathBuf; +use std::pin::Pin; +use std::rc::Rc; +use std::task::Context; +use std::task::Poll; +use tokio::io::AsyncRead; +use tokio::io::AsyncReadExt; +use tokio::io::AsyncWrite; +use tokio::sync::oneshot; +use tokio_util::io::StreamReader; + +pub fn get_unstable_declaration() -> PathBuf { + PathBuf::from(env!("CARGO_MANIFEST_DIR")).join("lib.deno_http.unstable.d.ts") +} + +pub fn init() -> Extension { + Extension::builder() + .js(include_js_files!( + prefix "deno:extensions/http", + "01_http.js", + )) + .ops(vec![ + ("op_http_request_next", op_async(op_http_request_next)), + ("op_http_request_read", op_async(op_http_request_read)), + ("op_http_response", op_async(op_http_response)), + ("op_http_response_write", op_async(op_http_response_write)), + ("op_http_response_close", op_async(op_http_response_close)), + ( + "op_http_websocket_accept_header", + op_sync(op_http_websocket_accept_header), + ), + ( + "op_http_upgrade_websocket", + op_async(op_http_upgrade_websocket), + ), + ]) + .build() +} + +struct ServiceInner { + request: Request<Body>, + response_tx: oneshot::Sender<Response<Body>>, +} + +#[derive(Clone, Default)] +struct Service { + inner: Rc<RefCell<Option<ServiceInner>>>, + waker: Rc<deno_core::futures::task::AtomicWaker>, +} + +impl HyperService<Request<Body>> for Service { + type Response = Response<Body>; + type Error = http::Error; + #[allow(clippy::type_complexity)] + type Future = + Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>>>>; + + fn poll_ready( + &mut self, + _cx: &mut Context<'_>, + ) -> Poll<Result<(), Self::Error>> { + if self.inner.borrow().is_some() { + Poll::Pending + } else { + Poll::Ready(Ok(())) + } + } + + fn call(&mut self, req: Request<Body>) -> Self::Future { + let (resp_tx, resp_rx) = oneshot::channel(); + self.inner.borrow_mut().replace(ServiceInner { + request: req, + response_tx: resp_tx, + }); + + async move { Ok(resp_rx.await.unwrap()) }.boxed_local() + } +} + +type ConnFuture = Pin<Box<dyn Future<Output = hyper::Result<()>>>>; + +struct Conn { + scheme: &'static str, + addr: SocketAddr, + conn: Rc<RefCell<ConnFuture>>, +} + +struct ConnResource { + hyper_connection: Conn, + deno_service: Service, + cancel: CancelHandle, +} + +impl ConnResource { + // TODO(ry) impl Future for ConnResource? + fn poll(&self, cx: &mut Context<'_>) -> Poll<Result<(), AnyError>> { + self + .hyper_connection + .conn + .borrow_mut() + .poll_unpin(cx) + .map_err(AnyError::from) + } +} + +impl Resource for ConnResource { + fn name(&self) -> Cow<str> { + "httpConnection".into() + } + + fn close(self: Rc<Self>) { + self.cancel.cancel() + } +} + +// We use a tuple instead of struct to avoid serialization overhead of the keys. +#[derive(Serialize)] +#[serde(rename_all = "camelCase")] +struct NextRequestResponse( + // request_rid: + Option<ResourceId>, + // response_sender_rid: + ResourceId, + // method: + // This is a String rather than a ByteString because reqwest will only return + // the method as a str which is guaranteed to be ASCII-only. + String, + // headers: + Vec<(ByteString, ByteString)>, + // url: + String, +); + +async fn op_http_request_next( + state: Rc<RefCell<OpState>>, + conn_rid: ResourceId, + _: (), +) -> Result<Option<NextRequestResponse>, AnyError> { + let conn_resource = state + .borrow() + .resource_table + .get::<ConnResource>(conn_rid) + .ok_or_else(bad_resource_id)?; + + let cancel = RcRef::map(conn_resource.clone(), |r| &r.cancel); + + poll_fn(|cx| { + conn_resource.deno_service.waker.register(cx.waker()); + let connection_closed = match conn_resource.poll(cx) { + Poll::Pending => false, + Poll::Ready(Ok(())) => { + // try to close ConnResource, but don't unwrap as it might + // already be closed + let _ = state + .borrow_mut() + .resource_table + .take::<ConnResource>(conn_rid); + true + } + Poll::Ready(Err(e)) => { + // TODO(ry) close RequestResource associated with connection + // TODO(ry) close ResponseBodyResource associated with connection + // close ConnResource + state + .borrow_mut() + .resource_table + .take::<ConnResource>(conn_rid) + .unwrap(); + + if should_ignore_error(&e) { + true + } else { + return Poll::Ready(Err(e)); + } + } + }; + if let Some(request_resource) = + conn_resource.deno_service.inner.borrow_mut().take() + { + let tx = request_resource.response_tx; + let req = request_resource.request; + let method = req.method().to_string(); + + let mut headers = Vec::with_capacity(req.headers().len()); + for (name, value) in req.headers().iter() { + let name: &[u8] = name.as_ref(); + let value = value.as_bytes(); + headers + .push((ByteString(name.to_owned()), ByteString(value.to_owned()))); + } + + let url = { + let scheme = &conn_resource.hyper_connection.scheme; + let host: Cow<str> = if let Some(host) = req.uri().host() { + Cow::Borrowed(host) + } else if let Some(host) = req.headers().get("HOST") { + Cow::Borrowed(host.to_str()?) + } else { + Cow::Owned(conn_resource.hyper_connection.addr.to_string()) + }; + let path = req.uri().path_and_query().map_or("/", |p| p.as_str()); + format!("{}://{}{}", scheme, host, path) + }; + + let is_websocket_request = req + .headers() + .get(hyper::header::CONNECTION) + .and_then(|v| { + v.to_str().ok().map(|s| "Upgrade".eq_ignore_ascii_case(s)) + }) + .unwrap_or(false) + && req + .headers() + .get(hyper::header::UPGRADE) + .and_then(|v| { + v.to_str().ok().map(|s| "websocket".eq_ignore_ascii_case(s)) + }) + .unwrap_or(false); + + let has_body = if let Some(exact_size) = req.size_hint().exact() { + exact_size > 0 + } else { + true + }; + + let maybe_request_rid = if is_websocket_request || has_body { + let mut state = state.borrow_mut(); + let request_rid = state.resource_table.add(RequestResource { + conn_rid, + inner: AsyncRefCell::new(RequestOrStreamReader::Request(Some(req))), + cancel: CancelHandle::default(), + }); + Some(request_rid) + } else { + None + }; + + let mut state = state.borrow_mut(); + let response_sender_rid = + state.resource_table.add(ResponseSenderResource { + sender: tx, + conn_rid, + }); + + Poll::Ready(Ok(Some(NextRequestResponse( + maybe_request_rid, + response_sender_rid, + method, + headers, + url, + )))) + } else if connection_closed { + Poll::Ready(Ok(None)) + } else { + Poll::Pending + } + }) + .try_or_cancel(cancel) + .await + .map_err(AnyError::from) +} + +fn should_ignore_error(e: &AnyError) -> bool { + if let Some(e) = e.downcast_ref::<hyper::Error>() { + use std::error::Error; + if let Some(std_err) = e.source() { + if let Some(io_err) = std_err.downcast_ref::<std::io::Error>() { + if io_err.kind() == std::io::ErrorKind::NotConnected { + return true; + } + } + } + } + false +} + +pub fn start_http<IO: AsyncRead + AsyncWrite + Unpin + Send + 'static>( + state: &mut OpState, + io: IO, + addr: SocketAddr, + scheme: &'static str, +) -> Result<ResourceId, AnyError> { + let deno_service = Service::default(); + + let hyper_connection = Http::new() + .with_executor(LocalExecutor) + .serve_connection(io, deno_service.clone()) + .with_upgrades(); + let conn = Pin::new(Box::new(hyper_connection)); + let conn_resource = ConnResource { + hyper_connection: Conn { + scheme, + addr, + conn: Rc::new(RefCell::new(conn)), + }, + deno_service, + cancel: CancelHandle::default(), + }; + let rid = state.resource_table.add(conn_resource); + Ok(rid) +} + +// We use a tuple instead of struct to avoid serialization overhead of the keys. +#[derive(Deserialize)] +struct RespondArgs( + // rid: + u32, + // status: + u16, + // headers: + Vec<(ByteString, ByteString)>, +); + +async fn op_http_response( + state: Rc<RefCell<OpState>>, + args: RespondArgs, + data: Option<ZeroCopyBuf>, +) -> Result<Option<ResourceId>, AnyError> { + let RespondArgs(rid, status, headers) = args; + + let response_sender = state + .borrow_mut() + .resource_table + .take::<ResponseSenderResource>(rid) + .ok_or_else(bad_resource_id)?; + let response_sender = Rc::try_unwrap(response_sender) + .ok() + .expect("multiple op_http_respond ongoing"); + + let conn_rid = response_sender.conn_rid; + + let conn_resource = state + .borrow() + .resource_table + .get::<ConnResource>(conn_rid) + .ok_or_else(bad_resource_id)?; + + let mut builder = Response::builder().status(status); + + builder.headers_mut().unwrap().reserve(headers.len()); + for (key, value) in &headers { + builder = builder.header(key.as_ref(), value.as_ref()); + } + + let res; + let maybe_response_body_rid = if let Some(d) = data { + // If a body is passed, we use it, and don't return a body for streaming. + res = builder.body(Vec::from(&*d).into())?; + None + } else { + // If no body is passed, we return a writer for streaming the body. + let (sender, body) = Body::channel(); + res = builder.body(body)?; + + let response_body_rid = + state.borrow_mut().resource_table.add(ResponseBodyResource { + body: AsyncRefCell::new(sender), + conn_rid, + }); + + Some(response_body_rid) + }; + + // oneshot::Sender::send(v) returns |v| on error, not an error object. + // The only failure mode is the receiver already having dropped its end + // of the channel. + if response_sender.sender.send(res).is_err() { + return Err(type_error("internal communication error")); + } + + poll_fn(|cx| match conn_resource.poll(cx) { + Poll::Ready(x) => { + state.borrow_mut().resource_table.close(conn_rid); + Poll::Ready(x) + } + Poll::Pending => Poll::Ready(Ok(())), + }) + .await?; + + if maybe_response_body_rid.is_none() { + conn_resource.deno_service.waker.wake(); + } + Ok(maybe_response_body_rid) +} + +async fn op_http_response_close( + state: Rc<RefCell<OpState>>, + rid: ResourceId, + _: (), +) -> Result<(), AnyError> { + let resource = state + .borrow_mut() + .resource_table + .take::<ResponseBodyResource>(rid) + .ok_or_else(bad_resource_id)?; + + let conn_resource = state + .borrow() + .resource_table + .get::<ConnResource>(resource.conn_rid) + .ok_or_else(bad_resource_id)?; + drop(resource); + + let r = poll_fn(|cx| match conn_resource.poll(cx) { + Poll::Ready(x) => Poll::Ready(x), + Poll::Pending => Poll::Ready(Ok(())), + }) + .await; + conn_resource.deno_service.waker.wake(); + r +} + +async fn op_http_request_read( + state: Rc<RefCell<OpState>>, + rid: ResourceId, + data: Option<ZeroCopyBuf>, +) -> Result<usize, AnyError> { + let mut data = data.ok_or_else(null_opbuf)?; + + let resource = state + .borrow() + .resource_table + .get::<RequestResource>(rid as u32) + .ok_or_else(bad_resource_id)?; + + let conn_resource = state + .borrow() + .resource_table + .get::<ConnResource>(resource.conn_rid) + .ok_or_else(bad_resource_id)?; + + let mut inner = RcRef::map(resource.clone(), |r| &r.inner) + .borrow_mut() + .await; + + if let RequestOrStreamReader::Request(req) = &mut *inner { + let req = req.take().unwrap(); + let stream: BytesStream = Box::pin(req.into_body().map(|r| { + r.map_err(|err| std::io::Error::new(std::io::ErrorKind::Other, err)) + })); + let reader = StreamReader::new(stream); + *inner = RequestOrStreamReader::StreamReader(reader); + }; + + let reader = match &mut *inner { + RequestOrStreamReader::StreamReader(reader) => reader, + _ => unreachable!(), + }; + + let cancel = RcRef::map(resource, |r| &r.cancel); + + let mut read_fut = reader.read(&mut data).try_or_cancel(cancel).boxed_local(); + + poll_fn(|cx| { + if let Poll::Ready(Err(e)) = conn_resource.poll(cx) { + // close ConnResource + // close RequestResource associated with connection + // close ResponseBodyResource associated with connection + return Poll::Ready(Err(e)); + } + + read_fut.poll_unpin(cx).map_err(AnyError::from) + }) + .await +} + +async fn op_http_response_write( + state: Rc<RefCell<OpState>>, + rid: ResourceId, + data: Option<ZeroCopyBuf>, +) -> Result<(), AnyError> { + let buf = data.ok_or_else(null_opbuf)?; + let resource = state + .borrow() + .resource_table + .get::<ResponseBodyResource>(rid as u32) + .ok_or_else(bad_resource_id)?; + + let conn_resource = state + .borrow() + .resource_table + .get::<ConnResource>(resource.conn_rid) + .ok_or_else(bad_resource_id)?; + + let mut body = RcRef::map(&resource, |r| &r.body).borrow_mut().await; + + let mut send_data_fut = body.send_data(Vec::from(&*buf).into()).boxed_local(); + + poll_fn(|cx| { + let r = send_data_fut.poll_unpin(cx).map_err(AnyError::from); + + // Poll connection so the data is flushed + if let Poll::Ready(Err(e)) = conn_resource.poll(cx) { + // close ConnResource + // close RequestResource associated with connection + // close ResponseBodyResource associated with connection + return Poll::Ready(Err(e)); + } + + r + }) + .await?; + + Ok(()) +} + +fn op_http_websocket_accept_header( + _: &mut OpState, + key: String, + _: (), +) -> Result<String, AnyError> { + let digest = ring::digest::digest( + &ring::digest::SHA1_FOR_LEGACY_USE_ONLY, + format!("{}258EAFA5-E914-47DA-95CA-C5AB0DC85B11", key).as_bytes(), + ); + Ok(base64::encode(digest)) +} + +async fn op_http_upgrade_websocket( + state: Rc<RefCell<OpState>>, + rid: ResourceId, + _: (), +) -> Result<ResourceId, AnyError> { + let req_resource = state + .borrow_mut() + .resource_table + .take::<RequestResource>(rid) + .ok_or_else(bad_resource_id)?; + + let mut inner = RcRef::map(&req_resource, |r| &r.inner).borrow_mut().await; + + if let RequestOrStreamReader::Request(req) = inner.as_mut() { + let upgraded = hyper::upgrade::on(req.as_mut().unwrap()).await?; + let stream = + deno_websocket::tokio_tungstenite::WebSocketStream::from_raw_socket( + upgraded, + deno_websocket::tokio_tungstenite::tungstenite::protocol::Role::Server, + None, + ) + .await; + + let (ws_tx, ws_rx) = stream.split(); + let rid = + state + .borrow_mut() + .resource_table + .add(deno_websocket::WsStreamResource { + stream: deno_websocket::WebSocketStreamType::Server { + rx: AsyncRefCell::new(ws_rx), + tx: AsyncRefCell::new(ws_tx), + }, + cancel: Default::default(), + }); + + Ok(rid) + } else { + Err(bad_resource_id()) + } +} + +type BytesStream = + Pin<Box<dyn Stream<Item = std::io::Result<bytes::Bytes>> + Unpin>>; + +enum RequestOrStreamReader { + Request(Option<Request<hyper::Body>>), + StreamReader(StreamReader<BytesStream, bytes::Bytes>), +} + +struct RequestResource { + conn_rid: ResourceId, + inner: AsyncRefCell<RequestOrStreamReader>, + cancel: CancelHandle, +} + +impl Resource for RequestResource { + fn name(&self) -> Cow<str> { + "request".into() + } + + fn close(self: Rc<Self>) { + self.cancel.cancel() + } +} + +struct ResponseSenderResource { + sender: oneshot::Sender<Response<Body>>, + conn_rid: ResourceId, +} + +impl Resource for ResponseSenderResource { + fn name(&self) -> Cow<str> { + "responseSender".into() + } +} + +struct ResponseBodyResource { + body: AsyncRefCell<hyper::body::Sender>, + conn_rid: ResourceId, +} + +impl Resource for ResponseBodyResource { + fn name(&self) -> Cow<str> { + "responseBody".into() + } +} + +// Needed so hyper can use non Send futures +#[derive(Clone)] +struct LocalExecutor; + +impl<Fut> hyper::rt::Executor<Fut> for LocalExecutor +where + Fut: Future + 'static, + Fut::Output: 'static, +{ + fn execute(&self, fut: Fut) { + tokio::task::spawn_local(fut); + } +} |