summaryrefslogtreecommitdiff
path: root/extensions/http
diff options
context:
space:
mode:
Diffstat (limited to 'extensions/http')
-rw-r--r--extensions/http/01_http.js374
-rw-r--r--extensions/http/Cargo.toml25
-rw-r--r--extensions/http/README.md4
-rw-r--r--extensions/http/lib.deno_http.unstable.d.ts65
-rw-r--r--extensions/http/lib.rs655
5 files changed, 1123 insertions, 0 deletions
diff --git a/extensions/http/01_http.js b/extensions/http/01_http.js
new file mode 100644
index 000000000..4bcdf1f07
--- /dev/null
+++ b/extensions/http/01_http.js
@@ -0,0 +1,374 @@
+// Copyright 2018-2021 the Deno authors. All rights reserved. MIT license.
+"use strict";
+
+((window) => {
+ const webidl = window.__bootstrap.webidl;
+ const { InnerBody } = window.__bootstrap.fetchBody;
+ const { setEventTargetData } = window.__bootstrap.eventTarget;
+ const {
+ Response,
+ fromInnerRequest,
+ toInnerResponse,
+ newInnerRequest,
+ newInnerResponse,
+ fromInnerResponse,
+ } = window.__bootstrap.fetch;
+ const core = window.Deno.core;
+ const { BadResource, Interrupted } = core;
+ const { ReadableStream } = window.__bootstrap.streams;
+ const abortSignal = window.__bootstrap.abortSignal;
+ const { WebSocket, _rid, _readyState, _eventLoop, _protocol } =
+ window.__bootstrap.webSocket;
+ const {
+ ArrayPrototypeIncludes,
+ ArrayPrototypePush,
+ Promise,
+ StringPrototypeIncludes,
+ StringPrototypeSplit,
+ Symbol,
+ SymbolAsyncIterator,
+ TypedArrayPrototypeSubarray,
+ TypeError,
+ Uint8Array,
+ } = window.__bootstrap.primordials;
+
+ const connErrorSymbol = Symbol("connError");
+
+ class HttpConn {
+ #rid = 0;
+
+ constructor(rid) {
+ this.#rid = rid;
+ }
+
+ /** @returns {number} */
+ get rid() {
+ return this.#rid;
+ }
+
+ /** @returns {Promise<ResponseEvent | null>} */
+ async nextRequest() {
+ let nextRequest;
+ try {
+ nextRequest = await core.opAsync(
+ "op_http_request_next",
+ this.#rid,
+ );
+ } catch (error) {
+ // A connection error seen here would cause disrupted responses to throw
+ // a generic `BadResource` error. Instead store this error and replace
+ // those with it.
+ this[connErrorSymbol] = error;
+ if (error instanceof BadResource) {
+ return null;
+ } else if (error instanceof Interrupted) {
+ return null;
+ } else if (
+ StringPrototypeIncludes(error.message, "connection closed")
+ ) {
+ return null;
+ }
+ throw error;
+ }
+ if (nextRequest === null) return null;
+
+ const [
+ requestRid,
+ responseSenderRid,
+ method,
+ headersList,
+ url,
+ ] = nextRequest;
+
+ /** @type {ReadableStream<Uint8Array> | undefined} */
+ let body = null;
+ if (typeof requestRid === "number") {
+ body = createRequestBodyStream(requestRid);
+ }
+
+ const innerRequest = newInnerRequest(
+ method,
+ url,
+ headersList,
+ body !== null ? new InnerBody(body) : null,
+ );
+ const signal = abortSignal.newSignal();
+ const request = fromInnerRequest(innerRequest, signal, "immutable");
+
+ const respondWith = createRespondWith(
+ this,
+ responseSenderRid,
+ requestRid,
+ );
+
+ return { request, respondWith };
+ }
+
+ /** @returns {void} */
+ close() {
+ core.close(this.#rid);
+ }
+
+ [SymbolAsyncIterator]() {
+ // deno-lint-ignore no-this-alias
+ const httpConn = this;
+ return {
+ async next() {
+ const reqEvt = await httpConn.nextRequest();
+ // Change with caution, current form avoids a v8 deopt
+ return { value: reqEvt, done: reqEvt === null };
+ },
+ };
+ }
+ }
+
+ function readRequest(requestRid, zeroCopyBuf) {
+ return core.opAsync(
+ "op_http_request_read",
+ requestRid,
+ zeroCopyBuf,
+ );
+ }
+
+ function createRespondWith(httpConn, responseSenderRid, requestRid) {
+ return async function respondWith(resp) {
+ if (resp instanceof Promise) {
+ resp = await resp;
+ }
+
+ if (!(resp instanceof Response)) {
+ throw new TypeError(
+ "First argument to respondWith must be a Response or a promise resolving to a Response.",
+ );
+ }
+
+ const innerResp = toInnerResponse(resp);
+
+ // If response body length is known, it will be sent synchronously in a
+ // single op, in other case a "response body" resource will be created and
+ // we'll be streaming it.
+ /** @type {ReadableStream<Uint8Array> | Uint8Array | null} */
+ let respBody = null;
+ if (innerResp.body !== null) {
+ if (innerResp.body.unusable()) throw new TypeError("Body is unusable.");
+ if (innerResp.body.streamOrStatic instanceof ReadableStream) {
+ if (
+ innerResp.body.length === null ||
+ innerResp.body.source instanceof Blob
+ ) {
+ respBody = innerResp.body.stream;
+ } else {
+ const reader = innerResp.body.stream.getReader();
+ const r1 = await reader.read();
+ if (r1.done) {
+ respBody = new Uint8Array(0);
+ } else {
+ respBody = r1.value;
+ const r2 = await reader.read();
+ if (!r2.done) throw new TypeError("Unreachable");
+ }
+ }
+ } else {
+ innerResp.body.streamOrStatic.consumed = true;
+ respBody = innerResp.body.streamOrStatic.body;
+ }
+ } else {
+ respBody = new Uint8Array(0);
+ }
+
+ let responseBodyRid;
+ try {
+ responseBodyRid = await core.opAsync("op_http_response", [
+ responseSenderRid,
+ innerResp.status ?? 200,
+ innerResp.headerList,
+ ], respBody instanceof Uint8Array ? respBody : null);
+ } catch (error) {
+ const connError = httpConn[connErrorSymbol];
+ if (error instanceof BadResource && connError != null) {
+ // deno-lint-ignore no-ex-assign
+ error = new connError.constructor(connError.message);
+ }
+ if (respBody !== null && respBody instanceof ReadableStream) {
+ await respBody.cancel(error);
+ }
+ throw error;
+ }
+
+ // If `respond` returns a responseBodyRid, we should stream the body
+ // to that resource.
+ if (responseBodyRid !== null) {
+ try {
+ if (respBody === null || !(respBody instanceof ReadableStream)) {
+ throw new TypeError("Unreachable");
+ }
+ const reader = respBody.getReader();
+ while (true) {
+ const { value, done } = await reader.read();
+ if (done) break;
+ if (!(value instanceof Uint8Array)) {
+ await reader.cancel(new TypeError("Value not a Uint8Array"));
+ break;
+ }
+ try {
+ await core.opAsync(
+ "op_http_response_write",
+ responseBodyRid,
+ value,
+ );
+ } catch (error) {
+ const connError = httpConn[connErrorSymbol];
+ if (error instanceof BadResource && connError != null) {
+ // deno-lint-ignore no-ex-assign
+ error = new connError.constructor(connError.message);
+ }
+ await reader.cancel(error);
+ throw error;
+ }
+ }
+ } finally {
+ // Once all chunks are sent, and the request body is closed, we can
+ // close the response body.
+ try {
+ await core.opAsync("op_http_response_close", responseBodyRid);
+ } catch { /* pass */ }
+ }
+ }
+
+ const ws = resp[_ws];
+ if (ws) {
+ if (typeof requestRid !== "number") {
+ throw new TypeError(
+ "This request can not be upgraded to a websocket connection.",
+ );
+ }
+
+ const wsRid = await core.opAsync(
+ "op_http_upgrade_websocket",
+ requestRid,
+ );
+ ws[_rid] = wsRid;
+ ws[_protocol] = resp.headers.get("sec-websocket-protocol");
+
+ if (ws[_readyState] === WebSocket.CLOSING) {
+ await core.opAsync("op_ws_close", { rid: wsRid });
+
+ ws[_readyState] = WebSocket.CLOSED;
+
+ const errEvent = new ErrorEvent("error");
+ ws.dispatchEvent(errEvent);
+
+ const event = new CloseEvent("close");
+ ws.dispatchEvent(event);
+
+ try {
+ core.close(wsRid);
+ } catch (err) {
+ // Ignore error if the socket has already been closed.
+ if (!(err instanceof Deno.errors.BadResource)) throw err;
+ }
+ } else {
+ ws[_readyState] = WebSocket.OPEN;
+ const event = new Event("open");
+ ws.dispatchEvent(event);
+
+ ws[_eventLoop]();
+ }
+ }
+ };
+ }
+
+ function createRequestBodyStream(requestRid) {
+ return new ReadableStream({
+ type: "bytes",
+ async pull(controller) {
+ try {
+ // This is the largest possible size for a single packet on a TLS
+ // stream.
+ const chunk = new Uint8Array(16 * 1024 + 256);
+ const read = await readRequest(
+ requestRid,
+ chunk,
+ );
+ if (read > 0) {
+ // We read some data. Enqueue it onto the stream.
+ controller.enqueue(TypedArrayPrototypeSubarray(chunk, 0, read));
+ } else {
+ // We have reached the end of the body, so we close the stream.
+ controller.close();
+ core.close(requestRid);
+ }
+ } catch (err) {
+ // There was an error while reading a chunk of the body, so we
+ // error.
+ controller.error(err);
+ controller.close();
+ core.close(requestRid);
+ }
+ },
+ cancel() {
+ core.close(requestRid);
+ },
+ });
+ }
+
+ const _ws = Symbol("[[associated_ws]]");
+
+ function upgradeWebSocket(request, options = {}) {
+ if (request.headers.get("upgrade") !== "websocket") {
+ throw new TypeError(
+ "Invalid Header: 'upgrade' header must be 'websocket'",
+ );
+ }
+
+ if (request.headers.get("connection") !== "Upgrade") {
+ throw new TypeError(
+ "Invalid Header: 'connection' header must be 'Upgrade'",
+ );
+ }
+
+ const websocketKey = request.headers.get("sec-websocket-key");
+ if (websocketKey === null) {
+ throw new TypeError(
+ "Invalid Header: 'sec-websocket-key' header must be set",
+ );
+ }
+
+ const accept = core.opSync("op_http_websocket_accept_header", websocketKey);
+
+ const r = newInnerResponse(101);
+ r.headerList = [
+ ["upgrade", "websocket"],
+ ["connection", "Upgrade"],
+ ["sec-websocket-accept", accept],
+ ];
+
+ const protocolsStr = request.headers.get("sec-websocket-protocol") || "";
+ const protocols = StringPrototypeSplit(protocolsStr, ", ");
+ if (protocols && options.protocol) {
+ if (ArrayPrototypeIncludes(protocols, options.protocol)) {
+ ArrayPrototypePush(r.headerList, [
+ "sec-websocket-protocol",
+ options.protocol,
+ ]);
+ } else {
+ throw new TypeError(
+ `Protocol '${options.protocol}' not in the request's protocol list (non negotiable)`,
+ );
+ }
+ }
+
+ const response = fromInnerResponse(r, "immutable");
+
+ const websocket = webidl.createBranded(WebSocket);
+ setEventTargetData(websocket);
+ response[_ws] = websocket;
+
+ return { response, websocket };
+ }
+
+ window.__bootstrap.http = {
+ HttpConn,
+ upgradeWebSocket,
+ };
+})(this);
diff --git a/extensions/http/Cargo.toml b/extensions/http/Cargo.toml
new file mode 100644
index 000000000..8909301a6
--- /dev/null
+++ b/extensions/http/Cargo.toml
@@ -0,0 +1,25 @@
+# Copyright 2018-2021 the Deno authors. All rights reserved. MIT license.
+
+[package]
+name = "deno_http"
+version = "0.1.0"
+edition = "2018"
+description = "HTTP server implementation for Deno"
+authors = ["the Deno authors"]
+license = "MIT"
+readme = "README.md"
+repository = "https://github.com/denoland/deno"
+
+[lib]
+path = "lib.rs"
+
+[dependencies]
+base64 = "0.13.0"
+bytes = "1"
+deno_core = { version = "0.92.0", path = "../../core" }
+deno_websocket = { version = "0.15.1", path = "../websocket" }
+hyper = { version = "0.14.9", features = ["server", "stream", "http1", "http2", "runtime"] }
+ring = "0.16.20"
+serde = { version = "1.0.125", features = ["derive"] }
+tokio = { version = "1.8.0", features = ["full"] }
+tokio-util = "0.6.7"
diff --git a/extensions/http/README.md b/extensions/http/README.md
new file mode 100644
index 000000000..ab557017a
--- /dev/null
+++ b/extensions/http/README.md
@@ -0,0 +1,4 @@
+# deno_http
+
+This crate implements server-side HTTP based on primitives from the
+[Fetch API](https://fetch.spec.whatwg.org/).
diff --git a/extensions/http/lib.deno_http.unstable.d.ts b/extensions/http/lib.deno_http.unstable.d.ts
new file mode 100644
index 000000000..30ffe121e
--- /dev/null
+++ b/extensions/http/lib.deno_http.unstable.d.ts
@@ -0,0 +1,65 @@
+// Copyright 2018-2021 the Deno authors. All rights reserved. MIT license.
+
+/// <reference no-default-lib="true" />
+/// <reference lib="esnext" />
+
+declare namespace Deno {
+ export interface RequestEvent {
+ readonly request: Request;
+ respondWith(r: Response | Promise<Response>): Promise<void>;
+ }
+
+ export interface HttpConn extends AsyncIterable<RequestEvent> {
+ readonly rid: number;
+
+ nextRequest(): Promise<RequestEvent | null>;
+ close(): void;
+ }
+
+ export interface WebSocketUpgrade {
+ response: Response;
+ websocket: WebSocket;
+ }
+
+ export interface UpgradeWebSocketOptions {
+ protocol?: string;
+ }
+
+ /** **UNSTABLE**: new API, yet to be vetted.
+ *
+ * Used to upgrade an incoming HTTP request to a WebSocket.
+ *
+ * Given a request, returns a pair of WebSocket and Response. The original
+ * request must be responded to with the returned response for the websocket
+ * upgrade to be successful.
+ *
+ * ```ts
+ * const conn = await Deno.connect({ port: 80, hostname: "127.0.0.1" });
+ * const httpConn = Deno.serveHttp(conn);
+ * const e = await httpConn.nextRequest();
+ * if (e) {
+ * const { websocket, response } = Deno.upgradeWebSocket(e.request);
+ * websocket.onopen = () => {
+ * websocket.send("Hello World!");
+ * };
+ * websocket.onmessage = (e) => {
+ * console.log(e.data);
+ * websocket.close();
+ * };
+ * websocket.onclose = () => console.log("WebSocket has been closed.");
+ * websocket.onerror = (e) => console.error("WebSocket error:", e.message);
+ * e.respondWith(response);
+ * }
+ * ```
+ *
+ * If the request body is disturbed (read from) before the upgrade is
+ * completed, upgrading fails.
+ *
+ * This operation does not yet consume the request or open the websocket. This
+ * only happens once the returned response has been passed to `respondWith`.
+ */
+ export function upgradeWebSocket(
+ request: Request,
+ options?: UpgradeWebSocketOptions,
+ ): WebSocketUpgrade;
+}
diff --git a/extensions/http/lib.rs b/extensions/http/lib.rs
new file mode 100644
index 000000000..a8d92ab46
--- /dev/null
+++ b/extensions/http/lib.rs
@@ -0,0 +1,655 @@
+// Copyright 2018-2021 the Deno authors. All rights reserved. MIT license.
+
+use deno_core::error::bad_resource_id;
+use deno_core::error::null_opbuf;
+use deno_core::error::type_error;
+use deno_core::error::AnyError;
+use deno_core::futures::future::poll_fn;
+use deno_core::futures::FutureExt;
+use deno_core::futures::Stream;
+use deno_core::futures::StreamExt;
+use deno_core::include_js_files;
+use deno_core::op_async;
+use deno_core::op_sync;
+use deno_core::AsyncRefCell;
+use deno_core::ByteString;
+use deno_core::CancelHandle;
+use deno_core::CancelTryFuture;
+use deno_core::Extension;
+use deno_core::OpState;
+use deno_core::RcRef;
+use deno_core::Resource;
+use deno_core::ResourceId;
+use deno_core::ZeroCopyBuf;
+use hyper::body::HttpBody;
+use hyper::http;
+use hyper::server::conn::Http;
+use hyper::service::Service as HyperService;
+use hyper::Body;
+use hyper::Request;
+use hyper::Response;
+use serde::Deserialize;
+use serde::Serialize;
+use std::borrow::Cow;
+use std::cell::RefCell;
+use std::future::Future;
+use std::net::SocketAddr;
+use std::path::PathBuf;
+use std::pin::Pin;
+use std::rc::Rc;
+use std::task::Context;
+use std::task::Poll;
+use tokio::io::AsyncRead;
+use tokio::io::AsyncReadExt;
+use tokio::io::AsyncWrite;
+use tokio::sync::oneshot;
+use tokio_util::io::StreamReader;
+
+pub fn get_unstable_declaration() -> PathBuf {
+ PathBuf::from(env!("CARGO_MANIFEST_DIR")).join("lib.deno_http.unstable.d.ts")
+}
+
+pub fn init() -> Extension {
+ Extension::builder()
+ .js(include_js_files!(
+ prefix "deno:extensions/http",
+ "01_http.js",
+ ))
+ .ops(vec![
+ ("op_http_request_next", op_async(op_http_request_next)),
+ ("op_http_request_read", op_async(op_http_request_read)),
+ ("op_http_response", op_async(op_http_response)),
+ ("op_http_response_write", op_async(op_http_response_write)),
+ ("op_http_response_close", op_async(op_http_response_close)),
+ (
+ "op_http_websocket_accept_header",
+ op_sync(op_http_websocket_accept_header),
+ ),
+ (
+ "op_http_upgrade_websocket",
+ op_async(op_http_upgrade_websocket),
+ ),
+ ])
+ .build()
+}
+
+struct ServiceInner {
+ request: Request<Body>,
+ response_tx: oneshot::Sender<Response<Body>>,
+}
+
+#[derive(Clone, Default)]
+struct Service {
+ inner: Rc<RefCell<Option<ServiceInner>>>,
+ waker: Rc<deno_core::futures::task::AtomicWaker>,
+}
+
+impl HyperService<Request<Body>> for Service {
+ type Response = Response<Body>;
+ type Error = http::Error;
+ #[allow(clippy::type_complexity)]
+ type Future =
+ Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>>>>;
+
+ fn poll_ready(
+ &mut self,
+ _cx: &mut Context<'_>,
+ ) -> Poll<Result<(), Self::Error>> {
+ if self.inner.borrow().is_some() {
+ Poll::Pending
+ } else {
+ Poll::Ready(Ok(()))
+ }
+ }
+
+ fn call(&mut self, req: Request<Body>) -> Self::Future {
+ let (resp_tx, resp_rx) = oneshot::channel();
+ self.inner.borrow_mut().replace(ServiceInner {
+ request: req,
+ response_tx: resp_tx,
+ });
+
+ async move { Ok(resp_rx.await.unwrap()) }.boxed_local()
+ }
+}
+
+type ConnFuture = Pin<Box<dyn Future<Output = hyper::Result<()>>>>;
+
+struct Conn {
+ scheme: &'static str,
+ addr: SocketAddr,
+ conn: Rc<RefCell<ConnFuture>>,
+}
+
+struct ConnResource {
+ hyper_connection: Conn,
+ deno_service: Service,
+ cancel: CancelHandle,
+}
+
+impl ConnResource {
+ // TODO(ry) impl Future for ConnResource?
+ fn poll(&self, cx: &mut Context<'_>) -> Poll<Result<(), AnyError>> {
+ self
+ .hyper_connection
+ .conn
+ .borrow_mut()
+ .poll_unpin(cx)
+ .map_err(AnyError::from)
+ }
+}
+
+impl Resource for ConnResource {
+ fn name(&self) -> Cow<str> {
+ "httpConnection".into()
+ }
+
+ fn close(self: Rc<Self>) {
+ self.cancel.cancel()
+ }
+}
+
+// We use a tuple instead of struct to avoid serialization overhead of the keys.
+#[derive(Serialize)]
+#[serde(rename_all = "camelCase")]
+struct NextRequestResponse(
+ // request_rid:
+ Option<ResourceId>,
+ // response_sender_rid:
+ ResourceId,
+ // method:
+ // This is a String rather than a ByteString because reqwest will only return
+ // the method as a str which is guaranteed to be ASCII-only.
+ String,
+ // headers:
+ Vec<(ByteString, ByteString)>,
+ // url:
+ String,
+);
+
+async fn op_http_request_next(
+ state: Rc<RefCell<OpState>>,
+ conn_rid: ResourceId,
+ _: (),
+) -> Result<Option<NextRequestResponse>, AnyError> {
+ let conn_resource = state
+ .borrow()
+ .resource_table
+ .get::<ConnResource>(conn_rid)
+ .ok_or_else(bad_resource_id)?;
+
+ let cancel = RcRef::map(conn_resource.clone(), |r| &r.cancel);
+
+ poll_fn(|cx| {
+ conn_resource.deno_service.waker.register(cx.waker());
+ let connection_closed = match conn_resource.poll(cx) {
+ Poll::Pending => false,
+ Poll::Ready(Ok(())) => {
+ // try to close ConnResource, but don't unwrap as it might
+ // already be closed
+ let _ = state
+ .borrow_mut()
+ .resource_table
+ .take::<ConnResource>(conn_rid);
+ true
+ }
+ Poll::Ready(Err(e)) => {
+ // TODO(ry) close RequestResource associated with connection
+ // TODO(ry) close ResponseBodyResource associated with connection
+ // close ConnResource
+ state
+ .borrow_mut()
+ .resource_table
+ .take::<ConnResource>(conn_rid)
+ .unwrap();
+
+ if should_ignore_error(&e) {
+ true
+ } else {
+ return Poll::Ready(Err(e));
+ }
+ }
+ };
+ if let Some(request_resource) =
+ conn_resource.deno_service.inner.borrow_mut().take()
+ {
+ let tx = request_resource.response_tx;
+ let req = request_resource.request;
+ let method = req.method().to_string();
+
+ let mut headers = Vec::with_capacity(req.headers().len());
+ for (name, value) in req.headers().iter() {
+ let name: &[u8] = name.as_ref();
+ let value = value.as_bytes();
+ headers
+ .push((ByteString(name.to_owned()), ByteString(value.to_owned())));
+ }
+
+ let url = {
+ let scheme = &conn_resource.hyper_connection.scheme;
+ let host: Cow<str> = if let Some(host) = req.uri().host() {
+ Cow::Borrowed(host)
+ } else if let Some(host) = req.headers().get("HOST") {
+ Cow::Borrowed(host.to_str()?)
+ } else {
+ Cow::Owned(conn_resource.hyper_connection.addr.to_string())
+ };
+ let path = req.uri().path_and_query().map_or("/", |p| p.as_str());
+ format!("{}://{}{}", scheme, host, path)
+ };
+
+ let is_websocket_request = req
+ .headers()
+ .get(hyper::header::CONNECTION)
+ .and_then(|v| {
+ v.to_str().ok().map(|s| "Upgrade".eq_ignore_ascii_case(s))
+ })
+ .unwrap_or(false)
+ && req
+ .headers()
+ .get(hyper::header::UPGRADE)
+ .and_then(|v| {
+ v.to_str().ok().map(|s| "websocket".eq_ignore_ascii_case(s))
+ })
+ .unwrap_or(false);
+
+ let has_body = if let Some(exact_size) = req.size_hint().exact() {
+ exact_size > 0
+ } else {
+ true
+ };
+
+ let maybe_request_rid = if is_websocket_request || has_body {
+ let mut state = state.borrow_mut();
+ let request_rid = state.resource_table.add(RequestResource {
+ conn_rid,
+ inner: AsyncRefCell::new(RequestOrStreamReader::Request(Some(req))),
+ cancel: CancelHandle::default(),
+ });
+ Some(request_rid)
+ } else {
+ None
+ };
+
+ let mut state = state.borrow_mut();
+ let response_sender_rid =
+ state.resource_table.add(ResponseSenderResource {
+ sender: tx,
+ conn_rid,
+ });
+
+ Poll::Ready(Ok(Some(NextRequestResponse(
+ maybe_request_rid,
+ response_sender_rid,
+ method,
+ headers,
+ url,
+ ))))
+ } else if connection_closed {
+ Poll::Ready(Ok(None))
+ } else {
+ Poll::Pending
+ }
+ })
+ .try_or_cancel(cancel)
+ .await
+ .map_err(AnyError::from)
+}
+
+fn should_ignore_error(e: &AnyError) -> bool {
+ if let Some(e) = e.downcast_ref::<hyper::Error>() {
+ use std::error::Error;
+ if let Some(std_err) = e.source() {
+ if let Some(io_err) = std_err.downcast_ref::<std::io::Error>() {
+ if io_err.kind() == std::io::ErrorKind::NotConnected {
+ return true;
+ }
+ }
+ }
+ }
+ false
+}
+
+pub fn start_http<IO: AsyncRead + AsyncWrite + Unpin + Send + 'static>(
+ state: &mut OpState,
+ io: IO,
+ addr: SocketAddr,
+ scheme: &'static str,
+) -> Result<ResourceId, AnyError> {
+ let deno_service = Service::default();
+
+ let hyper_connection = Http::new()
+ .with_executor(LocalExecutor)
+ .serve_connection(io, deno_service.clone())
+ .with_upgrades();
+ let conn = Pin::new(Box::new(hyper_connection));
+ let conn_resource = ConnResource {
+ hyper_connection: Conn {
+ scheme,
+ addr,
+ conn: Rc::new(RefCell::new(conn)),
+ },
+ deno_service,
+ cancel: CancelHandle::default(),
+ };
+ let rid = state.resource_table.add(conn_resource);
+ Ok(rid)
+}
+
+// We use a tuple instead of struct to avoid serialization overhead of the keys.
+#[derive(Deserialize)]
+struct RespondArgs(
+ // rid:
+ u32,
+ // status:
+ u16,
+ // headers:
+ Vec<(ByteString, ByteString)>,
+);
+
+async fn op_http_response(
+ state: Rc<RefCell<OpState>>,
+ args: RespondArgs,
+ data: Option<ZeroCopyBuf>,
+) -> Result<Option<ResourceId>, AnyError> {
+ let RespondArgs(rid, status, headers) = args;
+
+ let response_sender = state
+ .borrow_mut()
+ .resource_table
+ .take::<ResponseSenderResource>(rid)
+ .ok_or_else(bad_resource_id)?;
+ let response_sender = Rc::try_unwrap(response_sender)
+ .ok()
+ .expect("multiple op_http_respond ongoing");
+
+ let conn_rid = response_sender.conn_rid;
+
+ let conn_resource = state
+ .borrow()
+ .resource_table
+ .get::<ConnResource>(conn_rid)
+ .ok_or_else(bad_resource_id)?;
+
+ let mut builder = Response::builder().status(status);
+
+ builder.headers_mut().unwrap().reserve(headers.len());
+ for (key, value) in &headers {
+ builder = builder.header(key.as_ref(), value.as_ref());
+ }
+
+ let res;
+ let maybe_response_body_rid = if let Some(d) = data {
+ // If a body is passed, we use it, and don't return a body for streaming.
+ res = builder.body(Vec::from(&*d).into())?;
+ None
+ } else {
+ // If no body is passed, we return a writer for streaming the body.
+ let (sender, body) = Body::channel();
+ res = builder.body(body)?;
+
+ let response_body_rid =
+ state.borrow_mut().resource_table.add(ResponseBodyResource {
+ body: AsyncRefCell::new(sender),
+ conn_rid,
+ });
+
+ Some(response_body_rid)
+ };
+
+ // oneshot::Sender::send(v) returns |v| on error, not an error object.
+ // The only failure mode is the receiver already having dropped its end
+ // of the channel.
+ if response_sender.sender.send(res).is_err() {
+ return Err(type_error("internal communication error"));
+ }
+
+ poll_fn(|cx| match conn_resource.poll(cx) {
+ Poll::Ready(x) => {
+ state.borrow_mut().resource_table.close(conn_rid);
+ Poll::Ready(x)
+ }
+ Poll::Pending => Poll::Ready(Ok(())),
+ })
+ .await?;
+
+ if maybe_response_body_rid.is_none() {
+ conn_resource.deno_service.waker.wake();
+ }
+ Ok(maybe_response_body_rid)
+}
+
+async fn op_http_response_close(
+ state: Rc<RefCell<OpState>>,
+ rid: ResourceId,
+ _: (),
+) -> Result<(), AnyError> {
+ let resource = state
+ .borrow_mut()
+ .resource_table
+ .take::<ResponseBodyResource>(rid)
+ .ok_or_else(bad_resource_id)?;
+
+ let conn_resource = state
+ .borrow()
+ .resource_table
+ .get::<ConnResource>(resource.conn_rid)
+ .ok_or_else(bad_resource_id)?;
+ drop(resource);
+
+ let r = poll_fn(|cx| match conn_resource.poll(cx) {
+ Poll::Ready(x) => Poll::Ready(x),
+ Poll::Pending => Poll::Ready(Ok(())),
+ })
+ .await;
+ conn_resource.deno_service.waker.wake();
+ r
+}
+
+async fn op_http_request_read(
+ state: Rc<RefCell<OpState>>,
+ rid: ResourceId,
+ data: Option<ZeroCopyBuf>,
+) -> Result<usize, AnyError> {
+ let mut data = data.ok_or_else(null_opbuf)?;
+
+ let resource = state
+ .borrow()
+ .resource_table
+ .get::<RequestResource>(rid as u32)
+ .ok_or_else(bad_resource_id)?;
+
+ let conn_resource = state
+ .borrow()
+ .resource_table
+ .get::<ConnResource>(resource.conn_rid)
+ .ok_or_else(bad_resource_id)?;
+
+ let mut inner = RcRef::map(resource.clone(), |r| &r.inner)
+ .borrow_mut()
+ .await;
+
+ if let RequestOrStreamReader::Request(req) = &mut *inner {
+ let req = req.take().unwrap();
+ let stream: BytesStream = Box::pin(req.into_body().map(|r| {
+ r.map_err(|err| std::io::Error::new(std::io::ErrorKind::Other, err))
+ }));
+ let reader = StreamReader::new(stream);
+ *inner = RequestOrStreamReader::StreamReader(reader);
+ };
+
+ let reader = match &mut *inner {
+ RequestOrStreamReader::StreamReader(reader) => reader,
+ _ => unreachable!(),
+ };
+
+ let cancel = RcRef::map(resource, |r| &r.cancel);
+
+ let mut read_fut = reader.read(&mut data).try_or_cancel(cancel).boxed_local();
+
+ poll_fn(|cx| {
+ if let Poll::Ready(Err(e)) = conn_resource.poll(cx) {
+ // close ConnResource
+ // close RequestResource associated with connection
+ // close ResponseBodyResource associated with connection
+ return Poll::Ready(Err(e));
+ }
+
+ read_fut.poll_unpin(cx).map_err(AnyError::from)
+ })
+ .await
+}
+
+async fn op_http_response_write(
+ state: Rc<RefCell<OpState>>,
+ rid: ResourceId,
+ data: Option<ZeroCopyBuf>,
+) -> Result<(), AnyError> {
+ let buf = data.ok_or_else(null_opbuf)?;
+ let resource = state
+ .borrow()
+ .resource_table
+ .get::<ResponseBodyResource>(rid as u32)
+ .ok_or_else(bad_resource_id)?;
+
+ let conn_resource = state
+ .borrow()
+ .resource_table
+ .get::<ConnResource>(resource.conn_rid)
+ .ok_or_else(bad_resource_id)?;
+
+ let mut body = RcRef::map(&resource, |r| &r.body).borrow_mut().await;
+
+ let mut send_data_fut = body.send_data(Vec::from(&*buf).into()).boxed_local();
+
+ poll_fn(|cx| {
+ let r = send_data_fut.poll_unpin(cx).map_err(AnyError::from);
+
+ // Poll connection so the data is flushed
+ if let Poll::Ready(Err(e)) = conn_resource.poll(cx) {
+ // close ConnResource
+ // close RequestResource associated with connection
+ // close ResponseBodyResource associated with connection
+ return Poll::Ready(Err(e));
+ }
+
+ r
+ })
+ .await?;
+
+ Ok(())
+}
+
+fn op_http_websocket_accept_header(
+ _: &mut OpState,
+ key: String,
+ _: (),
+) -> Result<String, AnyError> {
+ let digest = ring::digest::digest(
+ &ring::digest::SHA1_FOR_LEGACY_USE_ONLY,
+ format!("{}258EAFA5-E914-47DA-95CA-C5AB0DC85B11", key).as_bytes(),
+ );
+ Ok(base64::encode(digest))
+}
+
+async fn op_http_upgrade_websocket(
+ state: Rc<RefCell<OpState>>,
+ rid: ResourceId,
+ _: (),
+) -> Result<ResourceId, AnyError> {
+ let req_resource = state
+ .borrow_mut()
+ .resource_table
+ .take::<RequestResource>(rid)
+ .ok_or_else(bad_resource_id)?;
+
+ let mut inner = RcRef::map(&req_resource, |r| &r.inner).borrow_mut().await;
+
+ if let RequestOrStreamReader::Request(req) = inner.as_mut() {
+ let upgraded = hyper::upgrade::on(req.as_mut().unwrap()).await?;
+ let stream =
+ deno_websocket::tokio_tungstenite::WebSocketStream::from_raw_socket(
+ upgraded,
+ deno_websocket::tokio_tungstenite::tungstenite::protocol::Role::Server,
+ None,
+ )
+ .await;
+
+ let (ws_tx, ws_rx) = stream.split();
+ let rid =
+ state
+ .borrow_mut()
+ .resource_table
+ .add(deno_websocket::WsStreamResource {
+ stream: deno_websocket::WebSocketStreamType::Server {
+ rx: AsyncRefCell::new(ws_rx),
+ tx: AsyncRefCell::new(ws_tx),
+ },
+ cancel: Default::default(),
+ });
+
+ Ok(rid)
+ } else {
+ Err(bad_resource_id())
+ }
+}
+
+type BytesStream =
+ Pin<Box<dyn Stream<Item = std::io::Result<bytes::Bytes>> + Unpin>>;
+
+enum RequestOrStreamReader {
+ Request(Option<Request<hyper::Body>>),
+ StreamReader(StreamReader<BytesStream, bytes::Bytes>),
+}
+
+struct RequestResource {
+ conn_rid: ResourceId,
+ inner: AsyncRefCell<RequestOrStreamReader>,
+ cancel: CancelHandle,
+}
+
+impl Resource for RequestResource {
+ fn name(&self) -> Cow<str> {
+ "request".into()
+ }
+
+ fn close(self: Rc<Self>) {
+ self.cancel.cancel()
+ }
+}
+
+struct ResponseSenderResource {
+ sender: oneshot::Sender<Response<Body>>,
+ conn_rid: ResourceId,
+}
+
+impl Resource for ResponseSenderResource {
+ fn name(&self) -> Cow<str> {
+ "responseSender".into()
+ }
+}
+
+struct ResponseBodyResource {
+ body: AsyncRefCell<hyper::body::Sender>,
+ conn_rid: ResourceId,
+}
+
+impl Resource for ResponseBodyResource {
+ fn name(&self) -> Cow<str> {
+ "responseBody".into()
+ }
+}
+
+// Needed so hyper can use non Send futures
+#[derive(Clone)]
+struct LocalExecutor;
+
+impl<Fut> hyper::rt::Executor<Fut> for LocalExecutor
+where
+ Fut: Future + 'static,
+ Fut::Output: 'static,
+{
+ fn execute(&self, fut: Fut) {
+ tokio::task::spawn_local(fut);
+ }
+}