summaryrefslogtreecommitdiff
path: root/extensions/http
diff options
context:
space:
mode:
authorRyan Dahl <ry@tinyclouds.org>2021-08-11 12:27:05 +0200
committerGitHub <noreply@github.com>2021-08-11 12:27:05 +0200
commita0285e2eb88f6254f6494b0ecd1878db3a3b2a58 (patch)
tree90671b004537e20f9493fd3277ffd21d30b39a0e /extensions/http
parent3a6994115176781b3a93d70794b1b81bc95e42b4 (diff)
Rename extensions/ directory to ext/ (#11643)
Diffstat (limited to 'extensions/http')
-rw-r--r--extensions/http/01_http.js383
-rw-r--r--extensions/http/Cargo.toml25
-rw-r--r--extensions/http/README.md4
-rw-r--r--extensions/http/lib.deno_http.unstable.d.ts53
-rw-r--r--extensions/http/lib.rs684
5 files changed, 0 insertions, 1149 deletions
diff --git a/extensions/http/01_http.js b/extensions/http/01_http.js
deleted file mode 100644
index 3f8bcb3a8..000000000
--- a/extensions/http/01_http.js
+++ /dev/null
@@ -1,383 +0,0 @@
-// Copyright 2018-2021 the Deno authors. All rights reserved. MIT license.
-"use strict";
-
-((window) => {
- const webidl = window.__bootstrap.webidl;
- const { InnerBody } = window.__bootstrap.fetchBody;
- const { setEventTargetData } = window.__bootstrap.eventTarget;
- const {
- Response,
- fromInnerRequest,
- toInnerResponse,
- newInnerRequest,
- newInnerResponse,
- fromInnerResponse,
- } = window.__bootstrap.fetch;
- const core = window.Deno.core;
- const { BadResource, Interrupted } = core;
- const { ReadableStream } = window.__bootstrap.streams;
- const abortSignal = window.__bootstrap.abortSignal;
- const { WebSocket, _rid, _readyState, _eventLoop, _protocol, _server } =
- window.__bootstrap.webSocket;
- const {
- ArrayPrototypeIncludes,
- ArrayPrototypePush,
- ArrayPrototypeSome,
- Promise,
- StringPrototypeIncludes,
- StringPrototypeToLowerCase,
- StringPrototypeSplit,
- Symbol,
- SymbolAsyncIterator,
- TypedArrayPrototypeSubarray,
- TypeError,
- Uint8Array,
- } = window.__bootstrap.primordials;
-
- const connErrorSymbol = Symbol("connError");
-
- class HttpConn {
- #rid = 0;
-
- constructor(rid) {
- this.#rid = rid;
- }
-
- /** @returns {number} */
- get rid() {
- return this.#rid;
- }
-
- /** @returns {Promise<ResponseEvent | null>} */
- async nextRequest() {
- let nextRequest;
- try {
- nextRequest = await core.opAsync(
- "op_http_request_next",
- this.#rid,
- );
- } catch (error) {
- // A connection error seen here would cause disrupted responses to throw
- // a generic `BadResource` error. Instead store this error and replace
- // those with it.
- this[connErrorSymbol] = error;
- if (error instanceof BadResource) {
- return null;
- } else if (error instanceof Interrupted) {
- return null;
- } else if (
- StringPrototypeIncludes(error.message, "connection closed")
- ) {
- return null;
- }
- throw error;
- }
- if (nextRequest === null) return null;
-
- const [
- requestRid,
- responseSenderRid,
- method,
- headersList,
- url,
- ] = nextRequest;
-
- /** @type {ReadableStream<Uint8Array> | undefined} */
- let body = null;
- if (typeof requestRid === "number") {
- body = createRequestBodyStream(requestRid);
- }
-
- const innerRequest = newInnerRequest(
- method,
- url,
- headersList,
- body !== null ? new InnerBody(body) : null,
- );
- const signal = abortSignal.newSignal();
- const request = fromInnerRequest(innerRequest, signal, "immutable");
-
- const respondWith = createRespondWith(
- this,
- responseSenderRid,
- requestRid,
- );
-
- return { request, respondWith };
- }
-
- /** @returns {void} */
- close() {
- core.close(this.#rid);
- }
-
- [SymbolAsyncIterator]() {
- // deno-lint-ignore no-this-alias
- const httpConn = this;
- return {
- async next() {
- const reqEvt = await httpConn.nextRequest();
- // Change with caution, current form avoids a v8 deopt
- return { value: reqEvt, done: reqEvt === null };
- },
- };
- }
- }
-
- function readRequest(requestRid, zeroCopyBuf) {
- return core.opAsync(
- "op_http_request_read",
- requestRid,
- zeroCopyBuf,
- );
- }
-
- function createRespondWith(httpConn, responseSenderRid, requestRid) {
- return async function respondWith(resp) {
- if (resp instanceof Promise) {
- resp = await resp;
- }
-
- if (!(resp instanceof Response)) {
- throw new TypeError(
- "First argument to respondWith must be a Response or a promise resolving to a Response.",
- );
- }
-
- const innerResp = toInnerResponse(resp);
-
- // If response body length is known, it will be sent synchronously in a
- // single op, in other case a "response body" resource will be created and
- // we'll be streaming it.
- /** @type {ReadableStream<Uint8Array> | Uint8Array | null} */
- let respBody = null;
- if (innerResp.body !== null) {
- if (innerResp.body.unusable()) throw new TypeError("Body is unusable.");
- if (innerResp.body.streamOrStatic instanceof ReadableStream) {
- if (
- innerResp.body.length === null ||
- innerResp.body.source instanceof Blob
- ) {
- respBody = innerResp.body.stream;
- } else {
- const reader = innerResp.body.stream.getReader();
- const r1 = await reader.read();
- if (r1.done) {
- respBody = new Uint8Array(0);
- } else {
- respBody = r1.value;
- const r2 = await reader.read();
- if (!r2.done) throw new TypeError("Unreachable");
- }
- }
- } else {
- innerResp.body.streamOrStatic.consumed = true;
- respBody = innerResp.body.streamOrStatic.body;
- }
- } else {
- respBody = new Uint8Array(0);
- }
-
- let responseBodyRid;
- try {
- responseBodyRid = await core.opAsync("op_http_response", [
- responseSenderRid,
- innerResp.status ?? 200,
- innerResp.headerList,
- ], respBody instanceof Uint8Array ? respBody : null);
- } catch (error) {
- const connError = httpConn[connErrorSymbol];
- if (error instanceof BadResource && connError != null) {
- // deno-lint-ignore no-ex-assign
- error = new connError.constructor(connError.message);
- }
- if (respBody !== null && respBody instanceof ReadableStream) {
- await respBody.cancel(error);
- }
- throw error;
- }
-
- // If `respond` returns a responseBodyRid, we should stream the body
- // to that resource.
- if (responseBodyRid !== null) {
- try {
- if (respBody === null || !(respBody instanceof ReadableStream)) {
- throw new TypeError("Unreachable");
- }
- const reader = respBody.getReader();
- while (true) {
- const { value, done } = await reader.read();
- if (done) break;
- if (!(value instanceof Uint8Array)) {
- await reader.cancel(new TypeError("Value not a Uint8Array"));
- break;
- }
- try {
- await core.opAsync(
- "op_http_response_write",
- responseBodyRid,
- value,
- );
- } catch (error) {
- const connError = httpConn[connErrorSymbol];
- if (error instanceof BadResource && connError != null) {
- // deno-lint-ignore no-ex-assign
- error = new connError.constructor(connError.message);
- }
- await reader.cancel(error);
- throw error;
- }
- }
- } finally {
- // Once all chunks are sent, and the request body is closed, we can
- // close the response body.
- try {
- await core.opAsync("op_http_response_close", responseBodyRid);
- } catch { /* pass */ }
- }
- }
-
- const ws = resp[_ws];
- if (ws) {
- if (typeof requestRid !== "number") {
- throw new TypeError(
- "This request can not be upgraded to a websocket connection.",
- );
- }
-
- const wsRid = await core.opAsync(
- "op_http_upgrade_websocket",
- requestRid,
- );
- ws[_rid] = wsRid;
- ws[_protocol] = resp.headers.get("sec-websocket-protocol");
-
- if (ws[_readyState] === WebSocket.CLOSING) {
- await core.opAsync("op_ws_close", { rid: wsRid });
-
- ws[_readyState] = WebSocket.CLOSED;
-
- const errEvent = new ErrorEvent("error");
- ws.dispatchEvent(errEvent);
-
- const event = new CloseEvent("close");
- ws.dispatchEvent(event);
-
- try {
- core.close(wsRid);
- } catch (err) {
- // Ignore error if the socket has already been closed.
- if (!(err instanceof Deno.errors.BadResource)) throw err;
- }
- } else {
- ws[_readyState] = WebSocket.OPEN;
- const event = new Event("open");
- ws.dispatchEvent(event);
-
- ws[_eventLoop]();
- }
- }
- };
- }
-
- function createRequestBodyStream(requestRid) {
- return new ReadableStream({
- type: "bytes",
- async pull(controller) {
- try {
- // This is the largest possible size for a single packet on a TLS
- // stream.
- const chunk = new Uint8Array(16 * 1024 + 256);
- const read = await readRequest(
- requestRid,
- chunk,
- );
- if (read > 0) {
- // We read some data. Enqueue it onto the stream.
- controller.enqueue(TypedArrayPrototypeSubarray(chunk, 0, read));
- } else {
- // We have reached the end of the body, so we close the stream.
- controller.close();
- core.close(requestRid);
- }
- } catch (err) {
- // There was an error while reading a chunk of the body, so we
- // error.
- controller.error(err);
- controller.close();
- core.close(requestRid);
- }
- },
- cancel() {
- core.close(requestRid);
- },
- });
- }
-
- const _ws = Symbol("[[associated_ws]]");
-
- function upgradeWebSocket(request, options = {}) {
- if (request.headers.get("upgrade") !== "websocket") {
- throw new TypeError(
- "Invalid Header: 'upgrade' header must be 'websocket'",
- );
- }
-
- const connection = request.headers.get("connection");
- const connectionHasUpgradeOption = connection !== null &&
- ArrayPrototypeSome(
- StringPrototypeSplit(connection, /\s*,\s*/),
- (option) => StringPrototypeToLowerCase(option) === "upgrade",
- );
- if (!connectionHasUpgradeOption) {
- throw new TypeError(
- "Invalid Header: 'connection' header must be 'Upgrade'",
- );
- }
-
- const websocketKey = request.headers.get("sec-websocket-key");
- if (websocketKey === null) {
- throw new TypeError(
- "Invalid Header: 'sec-websocket-key' header must be set",
- );
- }
-
- const accept = core.opSync("op_http_websocket_accept_header", websocketKey);
-
- const r = newInnerResponse(101);
- r.headerList = [
- ["upgrade", "websocket"],
- ["connection", "Upgrade"],
- ["sec-websocket-accept", accept],
- ];
-
- const protocolsStr = request.headers.get("sec-websocket-protocol") || "";
- const protocols = StringPrototypeSplit(protocolsStr, ", ");
- if (protocols && options.protocol) {
- if (ArrayPrototypeIncludes(protocols, options.protocol)) {
- ArrayPrototypePush(r.headerList, [
- "sec-websocket-protocol",
- options.protocol,
- ]);
- } else {
- throw new TypeError(
- `Protocol '${options.protocol}' not in the request's protocol list (non negotiable)`,
- );
- }
- }
-
- const response = fromInnerResponse(r, "immutable");
-
- const socket = webidl.createBranded(WebSocket);
- setEventTargetData(socket);
- socket[_server] = true;
- response[_ws] = socket;
-
- return { response, socket };
- }
-
- window.__bootstrap.http = {
- HttpConn,
- upgradeWebSocket,
- };
-})(this);
diff --git a/extensions/http/Cargo.toml b/extensions/http/Cargo.toml
deleted file mode 100644
index 3463735c5..000000000
--- a/extensions/http/Cargo.toml
+++ /dev/null
@@ -1,25 +0,0 @@
-# Copyright 2018-2021 the Deno authors. All rights reserved. MIT license.
-
-[package]
-name = "deno_http"
-version = "0.5.0"
-authors = ["the Deno authors"]
-edition = "2018"
-license = "MIT"
-readme = "README.md"
-repository = "https://github.com/denoland/deno"
-description = "HTTP server implementation for Deno"
-
-[lib]
-path = "lib.rs"
-
-[dependencies]
-base64 = "0.13.0"
-bytes = "1"
-deno_core = { version = "0.96.0", path = "../../core" }
-deno_websocket = { version = "0.19.0", path = "../websocket" }
-hyper = { version = "0.14.9", features = ["server", "stream", "http1", "http2", "runtime"] }
-ring = "0.16.20"
-serde = { version = "1.0.125", features = ["derive"] }
-tokio = { version = "1.8.0", features = ["full"] }
-tokio-util = { version = "0.6.7", features = ["io"] }
diff --git a/extensions/http/README.md b/extensions/http/README.md
deleted file mode 100644
index ab557017a..000000000
--- a/extensions/http/README.md
+++ /dev/null
@@ -1,4 +0,0 @@
-# deno_http
-
-This crate implements server-side HTTP based on primitives from the
-[Fetch API](https://fetch.spec.whatwg.org/).
diff --git a/extensions/http/lib.deno_http.unstable.d.ts b/extensions/http/lib.deno_http.unstable.d.ts
deleted file mode 100644
index 5c5bf78df..000000000
--- a/extensions/http/lib.deno_http.unstable.d.ts
+++ /dev/null
@@ -1,53 +0,0 @@
-// Copyright 2018-2021 the Deno authors. All rights reserved. MIT license.
-
-/// <reference no-default-lib="true" />
-/// <reference lib="esnext" />
-
-declare namespace Deno {
- export interface WebSocketUpgrade {
- response: Response;
- socket: WebSocket;
- }
-
- export interface UpgradeWebSocketOptions {
- protocol?: string;
- }
-
- /** **UNSTABLE**: new API, yet to be vetted.
- *
- * Used to upgrade an incoming HTTP request to a WebSocket.
- *
- * Given a request, returns a pair of WebSocket and Response. The original
- * request must be responded to with the returned response for the websocket
- * upgrade to be successful.
- *
- * ```ts
- * const conn = await Deno.connect({ port: 80, hostname: "127.0.0.1" });
- * const httpConn = Deno.serveHttp(conn);
- * const e = await httpConn.nextRequest();
- * if (e) {
- * const { socket, response } = Deno.upgradeWebSocket(e.request);
- * socket.onopen = () => {
- * socket.send("Hello World!");
- * };
- * socket.onmessage = (e) => {
- * console.log(e.data);
- * socket.close();
- * };
- * socket.onclose = () => console.log("WebSocket has been closed.");
- * socket.onerror = (e) => console.error("WebSocket error:", e.message);
- * e.respondWith(response);
- * }
- * ```
- *
- * If the request body is disturbed (read from) before the upgrade is
- * completed, upgrading fails.
- *
- * This operation does not yet consume the request or open the websocket. This
- * only happens once the returned response has been passed to `respondWith`.
- */
- export function upgradeWebSocket(
- request: Request,
- options?: UpgradeWebSocketOptions,
- ): WebSocketUpgrade;
-}
diff --git a/extensions/http/lib.rs b/extensions/http/lib.rs
deleted file mode 100644
index a20e74c03..000000000
--- a/extensions/http/lib.rs
+++ /dev/null
@@ -1,684 +0,0 @@
-// Copyright 2018-2021 the Deno authors. All rights reserved. MIT license.
-
-use deno_core::error::bad_resource_id;
-use deno_core::error::null_opbuf;
-use deno_core::error::type_error;
-use deno_core::error::AnyError;
-use deno_core::futures::future::poll_fn;
-use deno_core::futures::FutureExt;
-use deno_core::futures::Stream;
-use deno_core::futures::StreamExt;
-use deno_core::include_js_files;
-use deno_core::op_async;
-use deno_core::op_sync;
-use deno_core::AsyncRefCell;
-use deno_core::ByteString;
-use deno_core::CancelHandle;
-use deno_core::CancelTryFuture;
-use deno_core::Extension;
-use deno_core::OpState;
-use deno_core::RcRef;
-use deno_core::Resource;
-use deno_core::ResourceId;
-use deno_core::ZeroCopyBuf;
-use hyper::body::HttpBody;
-use hyper::http;
-use hyper::server::conn::Http;
-use hyper::service::Service as HyperService;
-use hyper::Body;
-use hyper::Request;
-use hyper::Response;
-use serde::Deserialize;
-use serde::Serialize;
-use std::borrow::Cow;
-use std::cell::RefCell;
-use std::future::Future;
-use std::net::SocketAddr;
-use std::path::PathBuf;
-use std::pin::Pin;
-use std::rc::Rc;
-use std::task::Context;
-use std::task::Poll;
-use tokio::io::AsyncRead;
-use tokio::io::AsyncReadExt;
-use tokio::io::AsyncWrite;
-use tokio::sync::oneshot;
-use tokio_util::io::StreamReader;
-
-pub fn get_unstable_declaration() -> PathBuf {
- PathBuf::from(env!("CARGO_MANIFEST_DIR")).join("lib.deno_http.unstable.d.ts")
-}
-
-pub fn init() -> Extension {
- Extension::builder()
- .js(include_js_files!(
- prefix "deno:extensions/http",
- "01_http.js",
- ))
- .ops(vec![
- ("op_http_request_next", op_async(op_http_request_next)),
- ("op_http_request_read", op_async(op_http_request_read)),
- ("op_http_response", op_async(op_http_response)),
- ("op_http_response_write", op_async(op_http_response_write)),
- ("op_http_response_close", op_async(op_http_response_close)),
- (
- "op_http_websocket_accept_header",
- op_sync(op_http_websocket_accept_header),
- ),
- (
- "op_http_upgrade_websocket",
- op_async(op_http_upgrade_websocket),
- ),
- ])
- .build()
-}
-
-struct ServiceInner {
- request: Request<Body>,
- response_tx: oneshot::Sender<Response<Body>>,
-}
-
-#[derive(Clone, Default)]
-struct Service {
- inner: Rc<RefCell<Option<ServiceInner>>>,
- waker: Rc<deno_core::futures::task::AtomicWaker>,
-}
-
-impl HyperService<Request<Body>> for Service {
- type Response = Response<Body>;
- type Error = http::Error;
- #[allow(clippy::type_complexity)]
- type Future =
- Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>>>>;
-
- fn poll_ready(
- &mut self,
- _cx: &mut Context<'_>,
- ) -> Poll<Result<(), Self::Error>> {
- if self.inner.borrow().is_some() {
- Poll::Pending
- } else {
- Poll::Ready(Ok(()))
- }
- }
-
- fn call(&mut self, req: Request<Body>) -> Self::Future {
- let (resp_tx, resp_rx) = oneshot::channel();
- self.inner.borrow_mut().replace(ServiceInner {
- request: req,
- response_tx: resp_tx,
- });
-
- async move { Ok(resp_rx.await.unwrap()) }.boxed_local()
- }
-}
-
-type ConnFuture = Pin<Box<dyn Future<Output = hyper::Result<()>>>>;
-
-struct Conn {
- scheme: &'static str,
- addr: SocketAddr,
- conn: Rc<RefCell<ConnFuture>>,
-}
-
-struct ConnResource {
- hyper_connection: Conn,
- deno_service: Service,
- cancel: CancelHandle,
-}
-
-impl ConnResource {
- // TODO(ry) impl Future for ConnResource?
- fn poll(&self, cx: &mut Context<'_>) -> Poll<Result<(), AnyError>> {
- self
- .hyper_connection
- .conn
- .borrow_mut()
- .poll_unpin(cx)
- .map_err(AnyError::from)
- }
-}
-
-impl Resource for ConnResource {
- fn name(&self) -> Cow<str> {
- "httpConnection".into()
- }
-
- fn close(self: Rc<Self>) {
- self.cancel.cancel()
- }
-}
-
-// We use a tuple instead of struct to avoid serialization overhead of the keys.
-#[derive(Serialize)]
-#[serde(rename_all = "camelCase")]
-struct NextRequestResponse(
- // request_rid:
- Option<ResourceId>,
- // response_sender_rid:
- ResourceId,
- // method:
- // This is a String rather than a ByteString because reqwest will only return
- // the method as a str which is guaranteed to be ASCII-only.
- String,
- // headers:
- Vec<(ByteString, ByteString)>,
- // url:
- String,
-);
-
-async fn op_http_request_next(
- state: Rc<RefCell<OpState>>,
- conn_rid: ResourceId,
- _: (),
-) -> Result<Option<NextRequestResponse>, AnyError> {
- let conn_resource = state
- .borrow()
- .resource_table
- .get::<ConnResource>(conn_rid)
- .ok_or_else(bad_resource_id)?;
-
- let cancel = RcRef::map(conn_resource.clone(), |r| &r.cancel);
-
- poll_fn(|cx| {
- conn_resource.deno_service.waker.register(cx.waker());
- let connection_closed = match conn_resource.poll(cx) {
- Poll::Pending => false,
- Poll::Ready(Ok(())) => {
- // try to close ConnResource, but don't unwrap as it might
- // already be closed
- let _ = state
- .borrow_mut()
- .resource_table
- .take::<ConnResource>(conn_rid);
- true
- }
- Poll::Ready(Err(e)) => {
- // TODO(ry) close RequestResource associated with connection
- // TODO(ry) close ResponseBodyResource associated with connection
- // close ConnResource
- state
- .borrow_mut()
- .resource_table
- .take::<ConnResource>(conn_rid)
- .unwrap();
-
- if should_ignore_error(&e) {
- true
- } else {
- return Poll::Ready(Err(e));
- }
- }
- };
- if let Some(request_resource) =
- conn_resource.deno_service.inner.borrow_mut().take()
- {
- let tx = request_resource.response_tx;
- let req = request_resource.request;
- let method = req.method().to_string();
-
- // We treat cookies specially, because we don't want them to get them
- // mangled by the `Headers` object in JS. What we do is take all cookie
- // headers and concat them into a single cookie header, seperated by
- // semicolons.
- let mut total_cookie_length = 0;
- let mut cookies = vec![];
-
- let mut headers = Vec::with_capacity(req.headers().len());
- for (name, value) in req.headers().iter() {
- if name == hyper::header::COOKIE {
- let bytes = value.as_bytes();
- total_cookie_length += bytes.len();
- cookies.push(bytes);
- } else {
- let name: &[u8] = name.as_ref();
- let value = value.as_bytes();
- headers
- .push((ByteString(name.to_owned()), ByteString(value.to_owned())));
- }
- }
-
- if !cookies.is_empty() {
- let cookie_count = cookies.len();
- total_cookie_length += (cookie_count * 2) - 2;
- let mut bytes = Vec::with_capacity(total_cookie_length);
- for (i, cookie) in cookies.into_iter().enumerate() {
- bytes.extend(cookie);
- if i != cookie_count - 1 {
- bytes.extend("; ".as_bytes());
- }
- }
- headers.push((
- ByteString("cookie".as_bytes().to_owned()),
- ByteString(bytes),
- ));
- }
-
- let url = {
- let scheme = &conn_resource.hyper_connection.scheme;
- let host: Cow<str> = if let Some(host) = req.uri().host() {
- Cow::Borrowed(host)
- } else if let Some(host) = req.headers().get("HOST") {
- Cow::Borrowed(host.to_str()?)
- } else {
- Cow::Owned(conn_resource.hyper_connection.addr.to_string())
- };
- let path = req.uri().path_and_query().map_or("/", |p| p.as_str());
- format!("{}://{}{}", scheme, host, path)
- };
-
- let is_websocket_request = req
- .headers()
- .get(hyper::header::CONNECTION)
- .and_then(|v| {
- v.to_str().ok().map(|s| "Upgrade".eq_ignore_ascii_case(s))
- })
- .unwrap_or(false)
- && req
- .headers()
- .get(hyper::header::UPGRADE)
- .and_then(|v| {
- v.to_str().ok().map(|s| "websocket".eq_ignore_ascii_case(s))
- })
- .unwrap_or(false);
-
- let has_body = if let Some(exact_size) = req.size_hint().exact() {
- exact_size > 0
- } else {
- true
- };
-
- let maybe_request_rid = if is_websocket_request || has_body {
- let mut state = state.borrow_mut();
- let request_rid = state.resource_table.add(RequestResource {
- conn_rid,
- inner: AsyncRefCell::new(RequestOrStreamReader::Request(Some(req))),
- cancel: CancelHandle::default(),
- });
- Some(request_rid)
- } else {
- None
- };
-
- let mut state = state.borrow_mut();
- let response_sender_rid =
- state.resource_table.add(ResponseSenderResource {
- sender: tx,
- conn_rid,
- });
-
- Poll::Ready(Ok(Some(NextRequestResponse(
- maybe_request_rid,
- response_sender_rid,
- method,
- headers,
- url,
- ))))
- } else if connection_closed {
- Poll::Ready(Ok(None))
- } else {
- Poll::Pending
- }
- })
- .try_or_cancel(cancel)
- .await
- .map_err(AnyError::from)
-}
-
-fn should_ignore_error(e: &AnyError) -> bool {
- if let Some(e) = e.downcast_ref::<hyper::Error>() {
- use std::error::Error;
- if let Some(std_err) = e.source() {
- if let Some(io_err) = std_err.downcast_ref::<std::io::Error>() {
- if io_err.kind() == std::io::ErrorKind::NotConnected {
- return true;
- }
- }
- }
- }
- false
-}
-
-pub fn start_http<IO: AsyncRead + AsyncWrite + Unpin + Send + 'static>(
- state: &mut OpState,
- io: IO,
- addr: SocketAddr,
- scheme: &'static str,
-) -> Result<ResourceId, AnyError> {
- let deno_service = Service::default();
-
- let hyper_connection = Http::new()
- .with_executor(LocalExecutor)
- .serve_connection(io, deno_service.clone())
- .with_upgrades();
- let conn = Pin::new(Box::new(hyper_connection));
- let conn_resource = ConnResource {
- hyper_connection: Conn {
- scheme,
- addr,
- conn: Rc::new(RefCell::new(conn)),
- },
- deno_service,
- cancel: CancelHandle::default(),
- };
- let rid = state.resource_table.add(conn_resource);
- Ok(rid)
-}
-
-// We use a tuple instead of struct to avoid serialization overhead of the keys.
-#[derive(Deserialize)]
-struct RespondArgs(
- // rid:
- u32,
- // status:
- u16,
- // headers:
- Vec<(ByteString, ByteString)>,
-);
-
-async fn op_http_response(
- state: Rc<RefCell<OpState>>,
- args: RespondArgs,
- data: Option<ZeroCopyBuf>,
-) -> Result<Option<ResourceId>, AnyError> {
- let RespondArgs(rid, status, headers) = args;
-
- let response_sender = state
- .borrow_mut()
- .resource_table
- .take::<ResponseSenderResource>(rid)
- .ok_or_else(bad_resource_id)?;
- let response_sender = Rc::try_unwrap(response_sender)
- .ok()
- .expect("multiple op_http_respond ongoing");
-
- let conn_rid = response_sender.conn_rid;
-
- let conn_resource = state
- .borrow()
- .resource_table
- .get::<ConnResource>(conn_rid)
- .ok_or_else(bad_resource_id)?;
-
- let mut builder = Response::builder().status(status);
-
- builder.headers_mut().unwrap().reserve(headers.len());
- for (key, value) in &headers {
- builder = builder.header(key.as_ref(), value.as_ref());
- }
-
- let res;
- let maybe_response_body_rid = if let Some(d) = data {
- // If a body is passed, we use it, and don't return a body for streaming.
- res = builder.body(Vec::from(&*d).into())?;
- None
- } else {
- // If no body is passed, we return a writer for streaming the body.
- let (sender, body) = Body::channel();
- res = builder.body(body)?;
-
- let response_body_rid =
- state.borrow_mut().resource_table.add(ResponseBodyResource {
- body: AsyncRefCell::new(sender),
- conn_rid,
- });
-
- Some(response_body_rid)
- };
-
- // oneshot::Sender::send(v) returns |v| on error, not an error object.
- // The only failure mode is the receiver already having dropped its end
- // of the channel.
- if response_sender.sender.send(res).is_err() {
- return Err(type_error("internal communication error"));
- }
-
- poll_fn(|cx| match conn_resource.poll(cx) {
- Poll::Ready(x) => {
- state.borrow_mut().resource_table.close(conn_rid);
- Poll::Ready(x)
- }
- Poll::Pending => Poll::Ready(Ok(())),
- })
- .await?;
-
- if maybe_response_body_rid.is_none() {
- conn_resource.deno_service.waker.wake();
- }
- Ok(maybe_response_body_rid)
-}
-
-async fn op_http_response_close(
- state: Rc<RefCell<OpState>>,
- rid: ResourceId,
- _: (),
-) -> Result<(), AnyError> {
- let resource = state
- .borrow_mut()
- .resource_table
- .take::<ResponseBodyResource>(rid)
- .ok_or_else(bad_resource_id)?;
-
- let conn_resource = state
- .borrow()
- .resource_table
- .get::<ConnResource>(resource.conn_rid)
- .ok_or_else(bad_resource_id)?;
- drop(resource);
-
- let r = poll_fn(|cx| match conn_resource.poll(cx) {
- Poll::Ready(x) => Poll::Ready(x),
- Poll::Pending => Poll::Ready(Ok(())),
- })
- .await;
- conn_resource.deno_service.waker.wake();
- r
-}
-
-async fn op_http_request_read(
- state: Rc<RefCell<OpState>>,
- rid: ResourceId,
- data: Option<ZeroCopyBuf>,
-) -> Result<usize, AnyError> {
- let mut data = data.ok_or_else(null_opbuf)?;
-
- let resource = state
- .borrow()
- .resource_table
- .get::<RequestResource>(rid as u32)
- .ok_or_else(bad_resource_id)?;
-
- let conn_resource = state
- .borrow()
- .resource_table
- .get::<ConnResource>(resource.conn_rid)
- .ok_or_else(bad_resource_id)?;
-
- let mut inner = RcRef::map(resource.clone(), |r| &r.inner)
- .borrow_mut()
- .await;
-
- if let RequestOrStreamReader::Request(req) = &mut *inner {
- let req = req.take().unwrap();
- let stream: BytesStream = Box::pin(req.into_body().map(|r| {
- r.map_err(|err| std::io::Error::new(std::io::ErrorKind::Other, err))
- }));
- let reader = StreamReader::new(stream);
- *inner = RequestOrStreamReader::StreamReader(reader);
- };
-
- let reader = match &mut *inner {
- RequestOrStreamReader::StreamReader(reader) => reader,
- _ => unreachable!(),
- };
-
- let cancel = RcRef::map(resource, |r| &r.cancel);
-
- let mut read_fut = reader.read(&mut data).try_or_cancel(cancel).boxed_local();
-
- poll_fn(|cx| {
- if let Poll::Ready(Err(e)) = conn_resource.poll(cx) {
- // close ConnResource
- // close RequestResource associated with connection
- // close ResponseBodyResource associated with connection
- return Poll::Ready(Err(e));
- }
-
- read_fut.poll_unpin(cx).map_err(AnyError::from)
- })
- .await
-}
-
-async fn op_http_response_write(
- state: Rc<RefCell<OpState>>,
- rid: ResourceId,
- data: Option<ZeroCopyBuf>,
-) -> Result<(), AnyError> {
- let buf = data.ok_or_else(null_opbuf)?;
- let resource = state
- .borrow()
- .resource_table
- .get::<ResponseBodyResource>(rid as u32)
- .ok_or_else(bad_resource_id)?;
-
- let conn_resource = state
- .borrow()
- .resource_table
- .get::<ConnResource>(resource.conn_rid)
- .ok_or_else(bad_resource_id)?;
-
- let mut body = RcRef::map(&resource, |r| &r.body).borrow_mut().await;
-
- let mut send_data_fut = body.send_data(Vec::from(&*buf).into()).boxed_local();
-
- poll_fn(|cx| {
- let r = send_data_fut.poll_unpin(cx).map_err(AnyError::from);
-
- // Poll connection so the data is flushed
- if let Poll::Ready(Err(e)) = conn_resource.poll(cx) {
- // close ConnResource
- // close RequestResource associated with connection
- // close ResponseBodyResource associated with connection
- return Poll::Ready(Err(e));
- }
-
- r
- })
- .await?;
-
- Ok(())
-}
-
-fn op_http_websocket_accept_header(
- _: &mut OpState,
- key: String,
- _: (),
-) -> Result<String, AnyError> {
- let digest = ring::digest::digest(
- &ring::digest::SHA1_FOR_LEGACY_USE_ONLY,
- format!("{}258EAFA5-E914-47DA-95CA-C5AB0DC85B11", key).as_bytes(),
- );
- Ok(base64::encode(digest))
-}
-
-async fn op_http_upgrade_websocket(
- state: Rc<RefCell<OpState>>,
- rid: ResourceId,
- _: (),
-) -> Result<ResourceId, AnyError> {
- let req_resource = state
- .borrow_mut()
- .resource_table
- .take::<RequestResource>(rid)
- .ok_or_else(bad_resource_id)?;
-
- let mut inner = RcRef::map(&req_resource, |r| &r.inner).borrow_mut().await;
-
- if let RequestOrStreamReader::Request(req) = inner.as_mut() {
- let upgraded = hyper::upgrade::on(req.as_mut().unwrap()).await?;
- let stream =
- deno_websocket::tokio_tungstenite::WebSocketStream::from_raw_socket(
- upgraded,
- deno_websocket::tokio_tungstenite::tungstenite::protocol::Role::Server,
- None,
- )
- .await;
-
- let (ws_tx, ws_rx) = stream.split();
- let rid =
- state
- .borrow_mut()
- .resource_table
- .add(deno_websocket::WsStreamResource {
- stream: deno_websocket::WebSocketStreamType::Server {
- rx: AsyncRefCell::new(ws_rx),
- tx: AsyncRefCell::new(ws_tx),
- },
- cancel: Default::default(),
- });
-
- Ok(rid)
- } else {
- Err(bad_resource_id())
- }
-}
-
-type BytesStream =
- Pin<Box<dyn Stream<Item = std::io::Result<bytes::Bytes>> + Unpin>>;
-
-enum RequestOrStreamReader {
- Request(Option<Request<hyper::Body>>),
- StreamReader(StreamReader<BytesStream, bytes::Bytes>),
-}
-
-struct RequestResource {
- conn_rid: ResourceId,
- inner: AsyncRefCell<RequestOrStreamReader>,
- cancel: CancelHandle,
-}
-
-impl Resource for RequestResource {
- fn name(&self) -> Cow<str> {
- "request".into()
- }
-
- fn close(self: Rc<Self>) {
- self.cancel.cancel()
- }
-}
-
-struct ResponseSenderResource {
- sender: oneshot::Sender<Response<Body>>,
- conn_rid: ResourceId,
-}
-
-impl Resource for ResponseSenderResource {
- fn name(&self) -> Cow<str> {
- "responseSender".into()
- }
-}
-
-struct ResponseBodyResource {
- body: AsyncRefCell<hyper::body::Sender>,
- conn_rid: ResourceId,
-}
-
-impl Resource for ResponseBodyResource {
- fn name(&self) -> Cow<str> {
- "responseBody".into()
- }
-}
-
-// Needed so hyper can use non Send futures
-#[derive(Clone)]
-struct LocalExecutor;
-
-impl<Fut> hyper::rt::Executor<Fut> for LocalExecutor
-where
- Fut: Future + 'static,
- Fut::Output: 'static,
-{
- fn execute(&self, fut: Fut) {
- tokio::task::spawn_local(fut);
- }
-}