diff options
Diffstat (limited to 'extensions/net')
-rw-r--r-- | extensions/net/03_http.js | 379 | ||||
-rw-r--r-- | extensions/net/Cargo.toml | 9 | ||||
-rw-r--r-- | extensions/net/lib.deno_net.unstable.d.ts | 44 | ||||
-rw-r--r-- | extensions/net/lib.rs | 3 | ||||
-rw-r--r-- | extensions/net/ops_http.rs | 683 |
5 files changed, 1 insertions, 1117 deletions
diff --git a/extensions/net/03_http.js b/extensions/net/03_http.js deleted file mode 100644 index db2d0a3b1..000000000 --- a/extensions/net/03_http.js +++ /dev/null @@ -1,379 +0,0 @@ -// Copyright 2018-2021 the Deno authors. All rights reserved. MIT license. -"use strict"; - -((window) => { - const webidl = window.__bootstrap.webidl; - const { InnerBody } = window.__bootstrap.fetchBody; - const { setEventTargetData } = window.__bootstrap.eventTarget; - const { - Response, - fromInnerRequest, - toInnerResponse, - newInnerRequest, - newInnerResponse, - fromInnerResponse, - } = window.__bootstrap.fetch; - const core = window.Deno.core; - const { BadResource, Interrupted } = core; - const { ReadableStream } = window.__bootstrap.streams; - const abortSignal = window.__bootstrap.abortSignal; - const { WebSocket, _rid, _readyState, _eventLoop, _protocol } = - window.__bootstrap.webSocket; - const { - ArrayPrototypeIncludes, - ArrayPrototypePush, - Promise, - StringPrototypeIncludes, - StringPrototypeSplit, - Symbol, - SymbolAsyncIterator, - TypedArrayPrototypeSubarray, - TypeError, - Uint8Array, - } = window.__bootstrap.primordials; - - function serveHttp(conn) { - const rid = core.opSync("op_http_start", conn.rid); - return new HttpConn(rid); - } - - const connErrorSymbol = Symbol("connError"); - - class HttpConn { - #rid = 0; - - constructor(rid) { - this.#rid = rid; - } - - /** @returns {number} */ - get rid() { - return this.#rid; - } - - /** @returns {Promise<ResponseEvent | null>} */ - async nextRequest() { - let nextRequest; - try { - nextRequest = await core.opAsync( - "op_http_request_next", - this.#rid, - ); - } catch (error) { - // A connection error seen here would cause disrupted responses to throw - // a generic `BadResource` error. Instead store this error and replace - // those with it. - this[connErrorSymbol] = error; - if (error instanceof BadResource) { - return null; - } else if (error instanceof Interrupted) { - return null; - } else if ( - StringPrototypeIncludes(error.message, "connection closed") - ) { - return null; - } - throw error; - } - if (nextRequest === null) return null; - - const [ - requestRid, - responseSenderRid, - method, - headersList, - url, - ] = nextRequest; - - /** @type {ReadableStream<Uint8Array> | undefined} */ - let body = null; - if (typeof requestRid === "number") { - body = createRequestBodyStream(requestRid); - } - - const innerRequest = newInnerRequest( - method, - url, - headersList, - body !== null ? new InnerBody(body) : null, - ); - const signal = abortSignal.newSignal(); - const request = fromInnerRequest(innerRequest, signal, "immutable"); - - const respondWith = createRespondWith( - this, - responseSenderRid, - requestRid, - ); - - return { request, respondWith }; - } - - /** @returns {void} */ - close() { - core.close(this.#rid); - } - - [SymbolAsyncIterator]() { - // deno-lint-ignore no-this-alias - const httpConn = this; - return { - async next() { - const reqEvt = await httpConn.nextRequest(); - // Change with caution, current form avoids a v8 deopt - return { value: reqEvt, done: reqEvt === null }; - }, - }; - } - } - - function readRequest(requestRid, zeroCopyBuf) { - return core.opAsync( - "op_http_request_read", - requestRid, - zeroCopyBuf, - ); - } - - function createRespondWith(httpConn, responseSenderRid, requestRid) { - return async function respondWith(resp) { - if (resp instanceof Promise) { - resp = await resp; - } - - if (!(resp instanceof Response)) { - throw new TypeError( - "First argument to respondWith must be a Response or a promise resolving to a Response.", - ); - } - - const innerResp = toInnerResponse(resp); - - // If response body length is known, it will be sent synchronously in a - // single op, in other case a "response body" resource will be created and - // we'll be streaming it. - /** @type {ReadableStream<Uint8Array> | Uint8Array | null} */ - let respBody = null; - if (innerResp.body !== null) { - if (innerResp.body.unusable()) throw new TypeError("Body is unusable."); - if (innerResp.body.streamOrStatic instanceof ReadableStream) { - if ( - innerResp.body.length === null || - innerResp.body.source instanceof Blob - ) { - respBody = innerResp.body.stream; - } else { - const reader = innerResp.body.stream.getReader(); - const r1 = await reader.read(); - if (r1.done) { - respBody = new Uint8Array(0); - } else { - respBody = r1.value; - const r2 = await reader.read(); - if (!r2.done) throw new TypeError("Unreachable"); - } - } - } else { - innerResp.body.streamOrStatic.consumed = true; - respBody = innerResp.body.streamOrStatic.body; - } - } else { - respBody = new Uint8Array(0); - } - - let responseBodyRid; - try { - responseBodyRid = await core.opAsync("op_http_response", [ - responseSenderRid, - innerResp.status ?? 200, - innerResp.headerList, - ], respBody instanceof Uint8Array ? respBody : null); - } catch (error) { - const connError = httpConn[connErrorSymbol]; - if (error instanceof BadResource && connError != null) { - // deno-lint-ignore no-ex-assign - error = new connError.constructor(connError.message); - } - if (respBody !== null && respBody instanceof ReadableStream) { - await respBody.cancel(error); - } - throw error; - } - - // If `respond` returns a responseBodyRid, we should stream the body - // to that resource. - if (responseBodyRid !== null) { - try { - if (respBody === null || !(respBody instanceof ReadableStream)) { - throw new TypeError("Unreachable"); - } - const reader = respBody.getReader(); - while (true) { - const { value, done } = await reader.read(); - if (done) break; - if (!(value instanceof Uint8Array)) { - await reader.cancel(new TypeError("Value not a Uint8Array")); - break; - } - try { - await core.opAsync( - "op_http_response_write", - responseBodyRid, - value, - ); - } catch (error) { - const connError = httpConn[connErrorSymbol]; - if (error instanceof BadResource && connError != null) { - // deno-lint-ignore no-ex-assign - error = new connError.constructor(connError.message); - } - await reader.cancel(error); - throw error; - } - } - } finally { - // Once all chunks are sent, and the request body is closed, we can - // close the response body. - try { - await core.opAsync("op_http_response_close", responseBodyRid); - } catch { /* pass */ } - } - } - - const ws = resp[_ws]; - if (ws) { - if (typeof requestRid !== "number") { - throw new TypeError( - "This request can not be upgraded to a websocket connection.", - ); - } - - const wsRid = await core.opAsync( - "op_http_upgrade_websocket", - requestRid, - ); - ws[_rid] = wsRid; - ws[_protocol] = resp.headers.get("sec-websocket-protocol"); - - if (ws[_readyState] === WebSocket.CLOSING) { - await core.opAsync("op_ws_close", { rid: wsRid }); - - ws[_readyState] = WebSocket.CLOSED; - - const errEvent = new ErrorEvent("error"); - ws.dispatchEvent(errEvent); - - const event = new CloseEvent("close"); - ws.dispatchEvent(event); - - try { - core.close(wsRid); - } catch (err) { - // Ignore error if the socket has already been closed. - if (!(err instanceof Deno.errors.BadResource)) throw err; - } - } else { - ws[_readyState] = WebSocket.OPEN; - const event = new Event("open"); - ws.dispatchEvent(event); - - ws[_eventLoop](); - } - } - }; - } - - function createRequestBodyStream(requestRid) { - return new ReadableStream({ - type: "bytes", - async pull(controller) { - try { - // This is the largest possible size for a single packet on a TLS - // stream. - const chunk = new Uint8Array(16 * 1024 + 256); - const read = await readRequest( - requestRid, - chunk, - ); - if (read > 0) { - // We read some data. Enqueue it onto the stream. - controller.enqueue(TypedArrayPrototypeSubarray(chunk, 0, read)); - } else { - // We have reached the end of the body, so we close the stream. - controller.close(); - core.close(requestRid); - } - } catch (err) { - // There was an error while reading a chunk of the body, so we - // error. - controller.error(err); - controller.close(); - core.close(requestRid); - } - }, - cancel() { - core.close(requestRid); - }, - }); - } - - const _ws = Symbol("[[associated_ws]]"); - - function upgradeWebSocket(request, options = {}) { - if (request.headers.get("upgrade") !== "websocket") { - throw new TypeError( - "Invalid Header: 'upgrade' header must be 'websocket'", - ); - } - - if (request.headers.get("connection") !== "Upgrade") { - throw new TypeError( - "Invalid Header: 'connection' header must be 'Upgrade'", - ); - } - - const websocketKey = request.headers.get("sec-websocket-key"); - if (websocketKey === null) { - throw new TypeError( - "Invalid Header: 'sec-websocket-key' header must be set", - ); - } - - const accept = core.opSync("op_http_websocket_accept_header", websocketKey); - - const r = newInnerResponse(101); - r.headerList = [ - ["upgrade", "websocket"], - ["connection", "Upgrade"], - ["sec-websocket-accept", accept], - ]; - - const protocolsStr = request.headers.get("sec-websocket-protocol") || ""; - const protocols = StringPrototypeSplit(protocolsStr, ", "); - if (protocols && options.protocol) { - if (ArrayPrototypeIncludes(protocols, options.protocol)) { - ArrayPrototypePush(r.headerList, [ - "sec-websocket-protocol", - options.protocol, - ]); - } else { - throw new TypeError( - `Protocol '${options.protocol}' not in the request's protocol list (non negotiable)`, - ); - } - } - - const response = fromInnerResponse(r, "immutable"); - - const websocket = webidl.createBranded(WebSocket); - setEventTargetData(websocket); - response[_ws] = websocket; - - return { response, websocket }; - } - - window.__bootstrap.http = { - serveHttp, - upgradeWebSocket, - }; -})(this); diff --git a/extensions/net/Cargo.toml b/extensions/net/Cargo.toml index c6219cad4..352308925 100644 --- a/extensions/net/Cargo.toml +++ b/extensions/net/Cargo.toml @@ -15,19 +15,12 @@ path = "lib.rs" [dependencies] deno_core = { version = "0.92.0", path = "../../core" } -deno_websocket = { version = "0.15.0", path = "../websocket" } -base64 = "0.13.0" -bytes = "1" log = "0.4.14" lazy_static = "1.4.0" -http = "0.2.4" -hyper = { version = "0.14.10", features = ["server", "stream", "http1", "http2", "runtime"] } -ring = "0.16.20" -rustls = "0.19.1" +rustls = "0.19.0" serde = { version = "1.0.126", features = ["derive"] } tokio = { version = "1.8.1", features = ["full"] } -tokio-util = { version = "0.6", features = ["io"] } webpki = "0.21.4" webpki-roots = "0.21.1" trust-dns-proto = "0.20.3" diff --git a/extensions/net/lib.deno_net.unstable.d.ts b/extensions/net/lib.deno_net.unstable.d.ts index c47558edc..adeeb1466 100644 --- a/extensions/net/lib.deno_net.unstable.d.ts +++ b/extensions/net/lib.deno_net.unstable.d.ts @@ -229,48 +229,4 @@ declare namespace Deno { */ alpnProtocols?: string[]; } - - export interface RequestEvent { - readonly request: Request; - respondWith(r: Response | Promise<Response>): Promise<void>; - } - - export interface HttpConn extends AsyncIterable<RequestEvent> { - readonly rid: number; - - nextRequest(): Promise<RequestEvent | null>; - close(): void; - } - - /** **UNSTABLE**: new API, yet to be vetted. - * - * Services HTTP requests given a TCP or TLS socket. - * - * ```ts - * const conn = await Deno.connect({ port: 80, hostname: "127.0.0.1" }); - * const httpConn = Deno.serveHttp(conn); - * const e = await httpConn.nextRequest(); - * if (e) { - * e.respondWith(new Response("Hello World")); - * } - * ``` - * - * If `httpConn.nextRequest()` encounters an error or returns `null` - * then the underlying HttpConn resource is closed automatically. - */ - export function serveHttp(conn: Conn): HttpConn; - - export interface WebSocketUpgrade { - response: Response; - websocket: WebSocket; - } - - export interface UpgradeWebSocketOptions { - protocol?: string; - } - - export function upgradeWebSocket( - request: Request, - options?: UpgradeWebSocketOptions, - ): WebSocketUpgrade; } diff --git a/extensions/net/lib.rs b/extensions/net/lib.rs index d1e836fce..f3281a2fb 100644 --- a/extensions/net/lib.rs +++ b/extensions/net/lib.rs @@ -2,7 +2,6 @@ pub mod io; pub mod ops; -pub mod ops_http; pub mod ops_tls; #[cfg(unix)] pub mod ops_unix; @@ -94,14 +93,12 @@ pub fn init<P: NetPermissions + 'static>(unstable: bool) -> Extension { ops_to_register.extend(io::init()); ops_to_register.extend(ops::init::<P>()); ops_to_register.extend(ops_tls::init::<P>()); - ops_to_register.extend(ops_http::init()); Extension::builder() .js(include_js_files!( prefix "deno:extensions/net", "01_net.js", "02_tls.js", - "03_http.js", "04_net_unstable.js", )) .ops(ops_to_register) diff --git a/extensions/net/ops_http.rs b/extensions/net/ops_http.rs deleted file mode 100644 index 782ec91d0..000000000 --- a/extensions/net/ops_http.rs +++ /dev/null @@ -1,683 +0,0 @@ -// Copyright 2018-2021 the Deno authors. All rights reserved. MIT license. - -use crate::io::TcpStreamResource; -use crate::io::TlsStreamResource; -use deno_core::error::bad_resource_id; -use deno_core::error::null_opbuf; -use deno_core::error::type_error; -use deno_core::error::AnyError; -use deno_core::futures::future::poll_fn; -use deno_core::futures::FutureExt; -use deno_core::futures::Stream; -use deno_core::futures::StreamExt; -use deno_core::op_async; -use deno_core::op_sync; -use deno_core::AsyncRefCell; -use deno_core::ByteString; -use deno_core::CancelHandle; -use deno_core::CancelTryFuture; -use deno_core::OpPair; -use deno_core::OpState; -use deno_core::RcRef; -use deno_core::Resource; -use deno_core::ResourceId; -use deno_core::ZeroCopyBuf; -use hyper::body::HttpBody; -use hyper::http; -use hyper::server::conn::Http; -use hyper::service::Service as HyperService; -use hyper::Body; -use hyper::Request; -use hyper::Response; -use serde::Deserialize; -use serde::Serialize; -use std::borrow::Cow; -use std::cell::RefCell; -use std::future::Future; -use std::net::SocketAddr; -use std::pin::Pin; -use std::rc::Rc; -use std::task::Context; -use std::task::Poll; -use tokio::io::AsyncReadExt; -use tokio::sync::oneshot; -use tokio_util::io::StreamReader; - -pub fn init() -> Vec<OpPair> { - vec![ - ("op_http_start", op_sync(op_http_start)), - ("op_http_request_next", op_async(op_http_request_next)), - ("op_http_request_read", op_async(op_http_request_read)), - ("op_http_response", op_async(op_http_response)), - ("op_http_response_write", op_async(op_http_response_write)), - ("op_http_response_close", op_async(op_http_response_close)), - ( - "op_http_websocket_accept_header", - op_sync(op_http_websocket_accept_header), - ), - ( - "op_http_upgrade_websocket", - op_async(op_http_upgrade_websocket), - ), - ] -} - -struct ServiceInner { - request: Request<Body>, - response_tx: oneshot::Sender<Response<Body>>, -} - -#[derive(Clone, Default)] -struct Service { - inner: Rc<RefCell<Option<ServiceInner>>>, - waker: Rc<deno_core::futures::task::AtomicWaker>, -} - -impl HyperService<Request<Body>> for Service { - type Response = Response<Body>; - type Error = http::Error; - #[allow(clippy::type_complexity)] - type Future = - Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>>>>; - - fn poll_ready( - &mut self, - _cx: &mut Context<'_>, - ) -> Poll<Result<(), Self::Error>> { - if self.inner.borrow().is_some() { - Poll::Pending - } else { - Poll::Ready(Ok(())) - } - } - - fn call(&mut self, req: Request<Body>) -> Self::Future { - let (resp_tx, resp_rx) = oneshot::channel(); - self.inner.borrow_mut().replace(ServiceInner { - request: req, - response_tx: resp_tx, - }); - - async move { Ok(resp_rx.await.unwrap()) }.boxed_local() - } -} - -type ConnFuture = Pin<Box<dyn Future<Output = hyper::Result<()>>>>; - -struct Conn { - scheme: &'static str, - conn: Rc<RefCell<ConnFuture>>, -} - -struct ConnResource { - hyper_connection: Conn, - deno_service: Service, - addr: SocketAddr, - cancel: CancelHandle, -} - -impl ConnResource { - // TODO(ry) impl Future for ConnResource? - fn poll(&self, cx: &mut Context<'_>) -> Poll<Result<(), AnyError>> { - self - .hyper_connection - .conn - .borrow_mut() - .poll_unpin(cx) - .map_err(AnyError::from) - } -} - -impl Resource for ConnResource { - fn name(&self) -> Cow<str> { - "httpConnection".into() - } - - fn close(self: Rc<Self>) { - self.cancel.cancel() - } -} - -// We use a tuple instead of struct to avoid serialization overhead of the keys. -#[derive(Serialize)] -#[serde(rename_all = "camelCase")] -struct NextRequestResponse( - // request_rid: - Option<ResourceId>, - // response_sender_rid: - ResourceId, - // method: - // This is a String rather than a ByteString because reqwest will only return - // the method as a str which is guaranteed to be ASCII-only. - String, - // headers: - Vec<(ByteString, ByteString)>, - // url: - String, -); - -async fn op_http_request_next( - state: Rc<RefCell<OpState>>, - conn_rid: ResourceId, - _: (), -) -> Result<Option<NextRequestResponse>, AnyError> { - let conn_resource = state - .borrow() - .resource_table - .get::<ConnResource>(conn_rid) - .ok_or_else(bad_resource_id)?; - - let cancel = RcRef::map(conn_resource.clone(), |r| &r.cancel); - - poll_fn(|cx| { - conn_resource.deno_service.waker.register(cx.waker()); - let connection_closed = match conn_resource.poll(cx) { - Poll::Pending => false, - Poll::Ready(Ok(())) => { - // try to close ConnResource, but don't unwrap as it might - // already be closed - let _ = state - .borrow_mut() - .resource_table - .take::<ConnResource>(conn_rid); - true - } - Poll::Ready(Err(e)) => { - // TODO(ry) close RequestResource associated with connection - // TODO(ry) close ResponseBodyResource associated with connection - // close ConnResource - state - .borrow_mut() - .resource_table - .take::<ConnResource>(conn_rid) - .unwrap(); - - if should_ignore_error(&e) { - true - } else { - return Poll::Ready(Err(e)); - } - } - }; - if let Some(request_resource) = - conn_resource.deno_service.inner.borrow_mut().take() - { - let tx = request_resource.response_tx; - let req = request_resource.request; - let method = req.method().to_string(); - - let mut headers = Vec::with_capacity(req.headers().len()); - for (name, value) in req.headers().iter() { - let name: &[u8] = name.as_ref(); - let value = value.as_bytes(); - headers - .push((ByteString(name.to_owned()), ByteString(value.to_owned()))); - } - - let url = { - let scheme = &conn_resource.hyper_connection.scheme; - let host: Cow<str> = if let Some(host) = req.uri().host() { - Cow::Borrowed(host) - } else if let Some(host) = req.headers().get("HOST") { - Cow::Borrowed(host.to_str()?) - } else { - Cow::Owned(conn_resource.addr.to_string()) - }; - let path = req.uri().path_and_query().map_or("/", |p| p.as_str()); - format!("{}://{}{}", scheme, host, path) - }; - - let is_websocket_request = req - .headers() - .get(hyper::header::CONNECTION) - .and_then(|v| { - v.to_str().ok().map(|s| "Upgrade".eq_ignore_ascii_case(s)) - }) - .unwrap_or(false) - && req - .headers() - .get(hyper::header::UPGRADE) - .and_then(|v| { - v.to_str().ok().map(|s| "websocket".eq_ignore_ascii_case(s)) - }) - .unwrap_or(false); - - let has_body = if let Some(exact_size) = req.size_hint().exact() { - exact_size > 0 - } else { - true - }; - - let maybe_request_rid = if is_websocket_request || has_body { - let mut state = state.borrow_mut(); - let request_rid = state.resource_table.add(RequestResource { - conn_rid, - inner: AsyncRefCell::new(RequestOrStreamReader::Request(Some(req))), - cancel: CancelHandle::default(), - }); - Some(request_rid) - } else { - None - }; - - let mut state = state.borrow_mut(); - let response_sender_rid = - state.resource_table.add(ResponseSenderResource { - sender: tx, - conn_rid, - }); - - Poll::Ready(Ok(Some(NextRequestResponse( - maybe_request_rid, - response_sender_rid, - method, - headers, - url, - )))) - } else if connection_closed { - Poll::Ready(Ok(None)) - } else { - Poll::Pending - } - }) - .try_or_cancel(cancel) - .await - .map_err(AnyError::from) -} - -fn should_ignore_error(e: &AnyError) -> bool { - if let Some(e) = e.downcast_ref::<hyper::Error>() { - use std::error::Error; - if let Some(std_err) = e.source() { - if let Some(io_err) = std_err.downcast_ref::<std::io::Error>() { - if io_err.kind() == std::io::ErrorKind::NotConnected { - return true; - } - } - } - } - false -} - -fn op_http_start( - state: &mut OpState, - tcp_stream_rid: ResourceId, - _: (), -) -> Result<ResourceId, AnyError> { - let deno_service = Service::default(); - - if let Some(resource_rc) = state - .resource_table - .take::<TcpStreamResource>(tcp_stream_rid) - { - let resource = Rc::try_unwrap(resource_rc) - .expect("Only a single use of this resource should happen"); - let (read_half, write_half) = resource.into_inner(); - let tcp_stream = read_half.reunite(write_half)?; - let addr = tcp_stream.local_addr()?; - let hyper_connection = Http::new() - .with_executor(LocalExecutor) - .serve_connection(tcp_stream, deno_service.clone()) - .with_upgrades(); - let conn = Pin::new(Box::new(hyper_connection)); - let conn_resource = ConnResource { - hyper_connection: Conn { - conn: Rc::new(RefCell::new(conn)), - scheme: "http", - }, - deno_service, - addr, - cancel: CancelHandle::default(), - }; - let rid = state.resource_table.add(conn_resource); - return Ok(rid); - } - - if let Some(resource_rc) = state - .resource_table - .take::<TlsStreamResource>(tcp_stream_rid) - { - let resource = Rc::try_unwrap(resource_rc) - .expect("Only a single use of this resource should happen"); - let (read_half, write_half) = resource.into_inner(); - let tls_stream = read_half.reunite(write_half); - let addr = tls_stream.get_ref().0.local_addr()?; - - let hyper_connection = Http::new() - .with_executor(LocalExecutor) - .serve_connection(tls_stream, deno_service.clone()) - .with_upgrades(); - let conn = Pin::new(Box::new(hyper_connection)); - let conn_resource = ConnResource { - hyper_connection: Conn { - conn: Rc::new(RefCell::new(conn)), - scheme: "https", - }, - deno_service, - addr, - cancel: CancelHandle::default(), - }; - let rid = state.resource_table.add(conn_resource); - return Ok(rid); - } - - Err(bad_resource_id()) -} - -// We use a tuple instead of struct to avoid serialization overhead of the keys. -#[derive(Deserialize)] -struct RespondArgs( - // rid: - u32, - // status: - u16, - // headers: - Vec<(ByteString, ByteString)>, -); - -async fn op_http_response( - state: Rc<RefCell<OpState>>, - args: RespondArgs, - data: Option<ZeroCopyBuf>, -) -> Result<Option<ResourceId>, AnyError> { - let RespondArgs(rid, status, headers) = args; - - let response_sender = state - .borrow_mut() - .resource_table - .take::<ResponseSenderResource>(rid) - .ok_or_else(bad_resource_id)?; - let response_sender = Rc::try_unwrap(response_sender) - .ok() - .expect("multiple op_http_respond ongoing"); - - let conn_rid = response_sender.conn_rid; - - let conn_resource = state - .borrow() - .resource_table - .get::<ConnResource>(conn_rid) - .ok_or_else(bad_resource_id)?; - - let mut builder = Response::builder().status(status); - - builder.headers_mut().unwrap().reserve(headers.len()); - for (key, value) in &headers { - builder = builder.header(key.as_ref(), value.as_ref()); - } - - let res; - let maybe_response_body_rid = if let Some(d) = data { - // If a body is passed, we use it, and don't return a body for streaming. - res = builder.body(Vec::from(&*d).into())?; - None - } else { - // If no body is passed, we return a writer for streaming the body. - let (sender, body) = Body::channel(); - res = builder.body(body)?; - - let response_body_rid = - state.borrow_mut().resource_table.add(ResponseBodyResource { - body: AsyncRefCell::new(sender), - conn_rid, - }); - - Some(response_body_rid) - }; - - // oneshot::Sender::send(v) returns |v| on error, not an error object. - // The only failure mode is the receiver already having dropped its end - // of the channel. - if response_sender.sender.send(res).is_err() { - return Err(type_error("internal communication error")); - } - - poll_fn(|cx| match conn_resource.poll(cx) { - Poll::Ready(x) => { - state.borrow_mut().resource_table.close(conn_rid); - Poll::Ready(x) - } - Poll::Pending => Poll::Ready(Ok(())), - }) - .await?; - - if maybe_response_body_rid.is_none() { - conn_resource.deno_service.waker.wake(); - } - Ok(maybe_response_body_rid) -} - -async fn op_http_response_close( - state: Rc<RefCell<OpState>>, - rid: ResourceId, - _: (), -) -> Result<(), AnyError> { - let resource = state - .borrow_mut() - .resource_table - .take::<ResponseBodyResource>(rid) - .ok_or_else(bad_resource_id)?; - - let conn_resource = state - .borrow() - .resource_table - .get::<ConnResource>(resource.conn_rid) - .ok_or_else(bad_resource_id)?; - drop(resource); - - let r = poll_fn(|cx| match conn_resource.poll(cx) { - Poll::Ready(x) => Poll::Ready(x), - Poll::Pending => Poll::Ready(Ok(())), - }) - .await; - conn_resource.deno_service.waker.wake(); - r -} - -async fn op_http_request_read( - state: Rc<RefCell<OpState>>, - rid: ResourceId, - data: Option<ZeroCopyBuf>, -) -> Result<usize, AnyError> { - let mut data = data.ok_or_else(null_opbuf)?; - - let resource = state - .borrow() - .resource_table - .get::<RequestResource>(rid as u32) - .ok_or_else(bad_resource_id)?; - - let conn_resource = state - .borrow() - .resource_table - .get::<ConnResource>(resource.conn_rid) - .ok_or_else(bad_resource_id)?; - - let mut inner = RcRef::map(resource.clone(), |r| &r.inner) - .borrow_mut() - .await; - - if let RequestOrStreamReader::Request(req) = &mut *inner { - let req = req.take().unwrap(); - let stream: BytesStream = Box::pin(req.into_body().map(|r| { - r.map_err(|err| std::io::Error::new(std::io::ErrorKind::Other, err)) - })); - let reader = StreamReader::new(stream); - *inner = RequestOrStreamReader::StreamReader(reader); - }; - - let reader = match &mut *inner { - RequestOrStreamReader::StreamReader(reader) => reader, - _ => unreachable!(), - }; - - let cancel = RcRef::map(resource, |r| &r.cancel); - - let mut read_fut = reader.read(&mut data).try_or_cancel(cancel).boxed_local(); - - poll_fn(|cx| { - if let Poll::Ready(Err(e)) = conn_resource.poll(cx) { - // close ConnResource - // close RequestResource associated with connection - // close ResponseBodyResource associated with connection - return Poll::Ready(Err(e)); - } - - read_fut.poll_unpin(cx).map_err(AnyError::from) - }) - .await -} - -async fn op_http_response_write( - state: Rc<RefCell<OpState>>, - rid: ResourceId, - data: Option<ZeroCopyBuf>, -) -> Result<(), AnyError> { - let buf = data.ok_or_else(null_opbuf)?; - let resource = state - .borrow() - .resource_table - .get::<ResponseBodyResource>(rid as u32) - .ok_or_else(bad_resource_id)?; - - let conn_resource = state - .borrow() - .resource_table - .get::<ConnResource>(resource.conn_rid) - .ok_or_else(bad_resource_id)?; - - let mut body = RcRef::map(&resource, |r| &r.body).borrow_mut().await; - - let mut send_data_fut = body.send_data(Vec::from(&*buf).into()).boxed_local(); - - poll_fn(|cx| { - let r = send_data_fut.poll_unpin(cx).map_err(AnyError::from); - - // Poll connection so the data is flushed - if let Poll::Ready(Err(e)) = conn_resource.poll(cx) { - // close ConnResource - // close RequestResource associated with connection - // close ResponseBodyResource associated with connection - return Poll::Ready(Err(e)); - } - - r - }) - .await?; - - Ok(()) -} - -fn op_http_websocket_accept_header( - _: &mut OpState, - key: String, - _: (), -) -> Result<String, AnyError> { - let digest = ring::digest::digest( - &ring::digest::SHA1_FOR_LEGACY_USE_ONLY, - format!("{}258EAFA5-E914-47DA-95CA-C5AB0DC85B11", key).as_bytes(), - ); - Ok(base64::encode(digest)) -} - -async fn op_http_upgrade_websocket( - state: Rc<RefCell<OpState>>, - rid: ResourceId, - _: (), -) -> Result<ResourceId, AnyError> { - let req_resource = state - .borrow_mut() - .resource_table - .take::<RequestResource>(rid) - .ok_or_else(bad_resource_id)?; - - let mut inner = RcRef::map(&req_resource, |r| &r.inner).borrow_mut().await; - - if let RequestOrStreamReader::Request(req) = inner.as_mut() { - let upgraded = hyper::upgrade::on(req.as_mut().unwrap()).await?; - let stream = - deno_websocket::tokio_tungstenite::WebSocketStream::from_raw_socket( - upgraded, - deno_websocket::tokio_tungstenite::tungstenite::protocol::Role::Server, - None, - ) - .await; - - let (ws_tx, ws_rx) = stream.split(); - let rid = - state - .borrow_mut() - .resource_table - .add(deno_websocket::WsStreamResource { - stream: deno_websocket::WebSocketStreamType::Server { - rx: AsyncRefCell::new(ws_rx), - tx: AsyncRefCell::new(ws_tx), - }, - cancel: Default::default(), - }); - - Ok(rid) - } else { - Err(bad_resource_id()) - } -} - -type BytesStream = - Pin<Box<dyn Stream<Item = std::io::Result<bytes::Bytes>> + Unpin>>; - -enum RequestOrStreamReader { - Request(Option<Request<hyper::Body>>), - StreamReader(StreamReader<BytesStream, bytes::Bytes>), -} - -struct RequestResource { - conn_rid: ResourceId, - inner: AsyncRefCell<RequestOrStreamReader>, - cancel: CancelHandle, -} - -impl Resource for RequestResource { - fn name(&self) -> Cow<str> { - "request".into() - } - - fn close(self: Rc<Self>) { - self.cancel.cancel() - } -} - -struct ResponseSenderResource { - sender: oneshot::Sender<Response<Body>>, - conn_rid: ResourceId, -} - -impl Resource for ResponseSenderResource { - fn name(&self) -> Cow<str> { - "responseSender".into() - } -} - -struct ResponseBodyResource { - body: AsyncRefCell<hyper::body::Sender>, - conn_rid: ResourceId, -} - -impl Resource for ResponseBodyResource { - fn name(&self) -> Cow<str> { - "responseBody".into() - } -} - -// Needed so hyper can use non Send futures -#[derive(Clone)] -struct LocalExecutor; - -impl<Fut> hyper::rt::Executor<Fut> for LocalExecutor -where - Fut: Future + 'static, - Fut::Output: 'static, -{ - fn execute(&self, fut: Fut) { - tokio::task::spawn_local(fut); - } -} |