diff options
-rw-r--r-- | cli/tests/unit/http_test.ts | 78 | ||||
-rw-r--r-- | ext/http/01_http.js | 272 | ||||
-rw-r--r-- | ext/http/lib.rs | 891 | ||||
-rw-r--r-- | ext/websocket/lib.rs | 26 | ||||
-rw-r--r-- | runtime/errors.rs | 5 | ||||
-rw-r--r-- | runtime/ops/http.rs | 5 |
6 files changed, 661 insertions, 616 deletions
diff --git a/cli/tests/unit/http_test.ts b/cli/tests/unit/http_test.ts index a6f80eb2c..c57b85441 100644 --- a/cli/tests/unit/http_test.ts +++ b/cli/tests/unit/http_test.ts @@ -7,6 +7,7 @@ import { assert, assertEquals, assertRejects, + assertStrictEquals, assertThrows, deferred, delay, @@ -386,7 +387,7 @@ unitTest( Deno.errors.Http, "connection closed", ); - // The error from `op_http_request_next` reroutes to `respondWith()`. + // The error from `op_http_accept` reroutes to `respondWith()`. assertEquals(await nextRequestPromise, null); listener.close(); })(); @@ -865,6 +866,7 @@ unitTest( const writer = writable.getWriter(); async function writeResponse() { + await delay(50); await writer.write( new TextEncoder().encode( "written to the writable side of a TransformStream", @@ -1000,6 +1002,80 @@ unitTest( }, ); +// https://github.com/denoland/deno/issues/12193 +unitTest( + { permissions: { net: true } }, + async function httpConnConcurrentNextRequestCalls() { + const hostname = "localhost"; + const port = 4501; + + async function server() { + const listener = Deno.listen({ hostname, port }); + const tcpConn = await listener.accept(); + const httpConn = Deno.serveHttp(tcpConn); + const promises = new Array(10).fill(null).map(async (_, i) => { + const event = await httpConn.nextRequest(); + assert(event); + const { pathname } = new URL(event.request.url); + assertStrictEquals(pathname, `/${i}`); + const response = new Response(`Response #${i}`); + await event.respondWith(response); + }); + await Promise.all(promises); + httpConn.close(); + listener.close(); + } + + async function client() { + for (let i = 0; i < 10; i++) { + const response = await fetch(`http://${hostname}:${port}/${i}`); + const body = await response.text(); + assertStrictEquals(body, `Response #${i}`); + } + } + + await Promise.all([server(), delay(100).then(client)]); + }, +); + +// https://github.com/denoland/deno/pull/12704 +// https://github.com/denoland/deno/pull/12732 +unitTest( + { permissions: { net: true } }, + async function httpConnAutoCloseDelayedOnUpgrade() { + const hostname = "localhost"; + const port = 4501; + + async function server() { + const listener = Deno.listen({ hostname, port }); + const tcpConn = await listener.accept(); + const httpConn = Deno.serveHttp(tcpConn); + + const event1 = await httpConn.nextRequest() as Deno.RequestEvent; + const event2Promise = httpConn.nextRequest(); + + const { socket, response } = Deno.upgradeWebSocket(event1.request); + socket.onmessage = (event) => socket.send(event.data); + event1.respondWith(response); + + const event2 = await event2Promise; + assertStrictEquals(event2, null); + + listener.close(); + } + + async function client() { + const socket = new WebSocket(`ws://${hostname}:${port}/`); + socket.onopen = () => socket.send("bla bla"); + const { data } = await new Promise((res) => socket.onmessage = res); + assertStrictEquals(data, "bla bla"); + socket.close(); + } + + await Promise.all([server(), client()]); + }, +); + unitTest( { permissions: { net: true } }, async function httpServerRespondNonAsciiUint8Array() { diff --git a/ext/http/01_http.js b/ext/http/01_http.js index 9f05809f5..94f1a1051 100644 --- a/ext/http/01_http.js +++ b/ext/http/01_http.js @@ -27,6 +27,7 @@ Set, SetPrototypeAdd, SetPrototypeDelete, + SetPrototypeHas, SetPrototypeValues, StringPrototypeIncludes, StringPrototypeToLowerCase, @@ -42,6 +43,8 @@ class HttpConn { #rid = 0; + #closed = false; + // This set holds resource ids of resources // that were created during lifecycle of this request. // When the connection is closed these resources should be closed @@ -62,10 +65,11 @@ let nextRequest; try { nextRequest = await core.opAsync( - "op_http_request_next", + "op_http_accept", this.#rid, ); } catch (error) { + this.close(); // A connection error seen here would cause disrupted responses to throw // a generic `BadResource` error. Instead store this error and replace // those with it. @@ -79,26 +83,31 @@ } throw error; } - if (nextRequest === null) return null; + if (nextRequest == null) { + // Work-around for servers (deno_std/http in particular) that call + // `nextRequest()` before upgrading a previous request which has a + // `connection: upgrade` header. + await null; + + this.close(); + return null; + } const [ - requestRid, - responseSenderRid, + streamRid, method, headersList, url, ] = nextRequest; + SetPrototypeAdd(this.managedResources, streamRid); /** @type {ReadableStream<Uint8Array> | undefined} */ let body = null; - if (typeof requestRid === "number") { - SetPrototypeAdd(this.managedResources, requestRid); - // There might be a body, but we don't expose it for GET/HEAD requests. - // It will be closed automatically once the request has been handled and - // the response has been sent. - if (method !== "GET" && method !== "HEAD") { - body = createRequestBodyStream(this, requestRid); - } + // There might be a body, but we don't expose it for GET/HEAD requests. + // It will be closed automatically once the request has been handled and + // the response has been sent. + if (method !== "GET" && method !== "HEAD") { + body = createRequestBodyStream(streamRid); } const innerRequest = newInnerRequest( @@ -111,22 +120,21 @@ const signal = abortSignal.newSignal(); const request = fromInnerRequest(innerRequest, signal, "immutable"); - SetPrototypeAdd(this.managedResources, responseSenderRid); - const respondWith = createRespondWith( - this, - responseSenderRid, - requestRid, - ); + const respondWith = createRespondWith(this, streamRid); return { request, respondWith }; } /** @returns {void} */ close() { - for (const rid of SetPrototypeValues(this.managedResources)) { - core.tryClose(rid); + if (!this.#closed) { + this.#closed = true; + core.close(this.#rid); + for (const rid of SetPrototypeValues(this.managedResources)) { + SetPrototypeDelete(this.managedResources, rid); + core.close(rid); + } } - core.close(this.#rid); } [SymbolAsyncIterator]() { @@ -136,97 +144,86 @@ async next() { const reqEvt = await httpConn.nextRequest(); // Change with caution, current form avoids a v8 deopt - return { value: reqEvt, done: reqEvt === null }; + return { value: reqEvt ?? undefined, done: reqEvt === null }; }, }; } } - function readRequest(requestRid, zeroCopyBuf) { - return core.opAsync( - "op_http_request_read", - requestRid, - zeroCopyBuf, - ); + function readRequest(streamRid, buf) { + return core.opAsync("op_http_read", streamRid, buf); } - function createRespondWith(httpConn, responseSenderRid, requestRid) { + function createRespondWith(httpConn, streamRid) { return async function respondWith(resp) { - if (resp instanceof Promise) { - resp = await resp; - } + try { + if (resp instanceof Promise) { + resp = await resp; + } - if (!(resp instanceof Response)) { - throw new TypeError( - "First argument to respondWith must be a Response or a promise resolving to a Response.", - ); - } + if (!(resp instanceof Response)) { + throw new TypeError( + "First argument to respondWith must be a Response or a promise resolving to a Response.", + ); + } - const innerResp = toInnerResponse(resp); - - // If response body length is known, it will be sent synchronously in a - // single op, in other case a "response body" resource will be created and - // we'll be streaming it. - /** @type {ReadableStream<Uint8Array> | Uint8Array | null} */ - let respBody = null; - if (innerResp.body !== null) { - if (innerResp.body.unusable()) throw new TypeError("Body is unusable."); - if (innerResp.body.streamOrStatic instanceof ReadableStream) { - if ( - innerResp.body.length === null || - innerResp.body.source instanceof Blob - ) { - respBody = innerResp.body.stream; - } else { - const reader = innerResp.body.stream.getReader(); - const r1 = await reader.read(); - if (r1.done) { - respBody = new Uint8Array(0); + const innerResp = toInnerResponse(resp); + + // If response body length is known, it will be sent synchronously in a + // single op, in other case a "response body" resource will be created and + // we'll be streaming it. + /** @type {ReadableStream<Uint8Array> | Uint8Array | null} */ + let respBody = null; + if (innerResp.body !== null) { + if (innerResp.body.unusable()) { + throw new TypeError("Body is unusable."); + } + if (innerResp.body.streamOrStatic instanceof ReadableStream) { + if ( + innerResp.body.length === null || + innerResp.body.source instanceof Blob + ) { + respBody = innerResp.body.stream; } else { - respBody = r1.value; - const r2 = await reader.read(); - if (!r2.done) throw new TypeError("Unreachable"); + const reader = innerResp.body.stream.getReader(); + const r1 = await reader.read(); + if (r1.done) { + respBody = new Uint8Array(0); + } else { + respBody = r1.value; + const r2 = await reader.read(); + if (!r2.done) throw new TypeError("Unreachable"); + } } + } else { + innerResp.body.streamOrStatic.consumed = true; + respBody = innerResp.body.streamOrStatic.body; } } else { - innerResp.body.streamOrStatic.consumed = true; - respBody = innerResp.body.streamOrStatic.body; + respBody = new Uint8Array(0); } - } else { - respBody = new Uint8Array(0); - } + const isStreamingResponseBody = + !(typeof respBody === "string" || respBody instanceof Uint8Array); - SetPrototypeDelete(httpConn.managedResources, responseSenderRid); - let responseBodyRid; - try { - responseBodyRid = await core.opAsync( - "op_http_response", - [ - responseSenderRid, + try { + await core.opAsync("op_http_write_headers", [ + streamRid, innerResp.status ?? 200, innerResp.headerList, - ], - (respBody instanceof Uint8Array || typeof respBody === "string") - ? respBody - : null, - ); - } catch (error) { - const connError = httpConn[connErrorSymbol]; - if (error instanceof BadResource && connError != null) { - // deno-lint-ignore no-ex-assign - error = new connError.constructor(connError.message); - } - if (respBody !== null && respBody instanceof ReadableStream) { - await respBody.cancel(error); + ], isStreamingResponseBody ? null : respBody); + } catch (error) { + const connError = httpConn[connErrorSymbol]; + if (error instanceof BadResource && connError != null) { + // deno-lint-ignore no-ex-assign + error = new connError.constructor(connError.message); + } + if (respBody !== null && respBody instanceof ReadableStream) { + await respBody.cancel(error); + } + throw error; } - throw error; - } - // If `respond` returns a responseBodyRid, we should stream the body - // to that resource. - if (responseBodyRid !== null) { - SetPrototypeAdd(httpConn.managedResources, responseBodyRid); - try { + if (isStreamingResponseBody) { if (respBody === null || !(respBody instanceof ReadableStream)) { throw new TypeError("Unreachable"); } @@ -239,11 +236,7 @@ break; } try { - await core.opAsync( - "op_http_response_write", - responseBodyRid, - value, - ); + await core.opAsync("op_http_write", streamRid, value); } catch (error) { const connError = httpConn[connErrorSymbol]; if (error instanceof BadResource && connError != null) { @@ -254,61 +247,55 @@ throw error; } } - } finally { - // Once all chunks are sent, and the request body is closed, we can - // close the response body. - SetPrototypeDelete(httpConn.managedResources, responseBodyRid); try { - await core.opAsync("op_http_response_close", responseBodyRid); - } catch { /* pass */ } + await core.opAsync("op_http_shutdown", streamRid); + } catch (error) { + await reader.cancel(error); + throw error; + } } - } - const ws = resp[_ws]; - if (ws) { - if (typeof requestRid !== "number") { - throw new TypeError( - "This request can not be upgraded to a websocket connection.", + const ws = resp[_ws]; + if (ws) { + const wsRid = await core.opAsync( + "op_http_upgrade_websocket", + streamRid, ); - } + ws[_rid] = wsRid; + ws[_protocol] = resp.headers.get("sec-websocket-protocol"); - const wsRid = await core.opAsync( - "op_http_upgrade_websocket", - requestRid, - ); - ws[_rid] = wsRid; - ws[_protocol] = resp.headers.get("sec-websocket-protocol"); + httpConn.close(); - if (ws[_readyState] === WebSocket.CLOSING) { - await core.opAsync("op_ws_close", { rid: wsRid }); + if (ws[_readyState] === WebSocket.CLOSING) { + await core.opAsync("op_ws_close", { rid: wsRid }); - ws[_readyState] = WebSocket.CLOSED; + ws[_readyState] = WebSocket.CLOSED; - const errEvent = new ErrorEvent("error"); - ws.dispatchEvent(errEvent); + const errEvent = new ErrorEvent("error"); + ws.dispatchEvent(errEvent); - const event = new CloseEvent("close"); - ws.dispatchEvent(event); + const event = new CloseEvent("close"); + ws.dispatchEvent(event); - core.tryClose(wsRid); - } else { - ws[_readyState] = WebSocket.OPEN; - const event = new Event("open"); - ws.dispatchEvent(event); + core.tryClose(wsRid); + } else { + ws[_readyState] = WebSocket.OPEN; + const event = new Event("open"); + ws.dispatchEvent(event); - ws[_eventLoop](); + ws[_eventLoop](); + } + } + } finally { + if (SetPrototypeHas(httpConn.managedResources, streamRid)) { + SetPrototypeDelete(httpConn.managedResources, streamRid); + core.close(streamRid); } - } else if (typeof requestRid === "number") { - // Try to close "request" resource. It might have been already consumed, - // but if it hasn't been we need to close it here to avoid resource - // leak. - SetPrototypeDelete(httpConn.managedResources, requestRid); - core.tryClose(requestRid); } }; } - function createRequestBodyStream(httpConn, requestRid) { + function createRequestBodyStream(streamRid) { return new ReadableStream({ type: "bytes", async pull(controller) { @@ -316,32 +303,21 @@ // This is the largest possible size for a single packet on a TLS // stream. const chunk = new Uint8Array(16 * 1024 + 256); - const read = await readRequest( - requestRid, - chunk, - ); + const read = await readRequest(streamRid, chunk); if (read > 0) { // We read some data. Enqueue it onto the stream. controller.enqueue(TypedArrayPrototypeSubarray(chunk, 0, read)); } else { // We have reached the end of the body, so we close the stream. controller.close(); - SetPrototypeDelete(httpConn.managedResources, requestRid); - core.close(requestRid); } } catch (err) { // There was an error while reading a chunk of the body, so we // error. controller.error(err); controller.close(); - SetPrototypeDelete(httpConn.managedResources, requestRid); - core.close(requestRid); } }, - cancel() { - SetPrototypeDelete(httpConn.managedResources, requestRid); - core.close(requestRid); - }, }); } diff --git a/ext/http/lib.rs b/ext/http/lib.rs index aae6415cb..5a14f845f 100644 --- a/ext/http/lib.rs +++ b/ext/http/lib.rs @@ -1,17 +1,29 @@ // Copyright 2018-2021 the Deno authors. All rights reserved. MIT license. -use deno_core::error::bad_resource_id; -use deno_core::error::type_error; +use bytes::Bytes; +use deno_core::error::custom_error; use deno_core::error::AnyError; -use deno_core::futures::future::poll_fn; +use deno_core::futures::channel::mpsc; +use deno_core::futures::channel::oneshot; +use deno_core::futures::future::pending; +use deno_core::futures::future::select; +use deno_core::futures::future::Either; +use deno_core::futures::future::Pending; +use deno_core::futures::future::RemoteHandle; +use deno_core::futures::future::Shared; +use deno_core::futures::never::Never; +use deno_core::futures::pin_mut; +use deno_core::futures::ready; +use deno_core::futures::stream::Peekable; use deno_core::futures::FutureExt; -use deno_core::futures::Stream; use deno_core::futures::StreamExt; +use deno_core::futures::TryFutureExt; use deno_core::include_js_files; use deno_core::op_async; use deno_core::op_sync; use deno_core::AsyncRefCell; use deno_core::ByteString; +use deno_core::CancelFuture; use deno_core::CancelHandle; use deno_core::CancelTryFuture; use deno_core::Extension; @@ -21,33 +33,31 @@ use deno_core::Resource; use deno_core::ResourceId; use deno_core::StringOrBuffer; use deno_core::ZeroCopyBuf; -use hyper::body::HttpBody; -use hyper::header::CONNECTION; -use hyper::header::SEC_WEBSOCKET_KEY; -use hyper::header::SEC_WEBSOCKET_VERSION; -use hyper::header::UPGRADE; -use hyper::http; +use deno_websocket::ws_create_server_stream; use hyper::server::conn::Http; -use hyper::service::Service as HyperService; +use hyper::service::Service; use hyper::Body; -use hyper::Method; use hyper::Request; use hyper::Response; use serde::Deserialize; use serde::Serialize; use std::borrow::Cow; use std::cell::RefCell; +use std::cmp::min; +use std::error::Error; use std::future::Future; +use std::io; +use std::mem::replace; +use std::mem::take; use std::net::SocketAddr; use std::pin::Pin; use std::rc::Rc; +use std::sync::Arc; use std::task::Context; use std::task::Poll; use tokio::io::AsyncRead; -use tokio::io::AsyncReadExt; use tokio::io::AsyncWrite; -use tokio::sync::oneshot; -use tokio_util::io::StreamReader; +use tokio::task::spawn_local; pub fn init() -> Extension { Extension::builder() @@ -56,11 +66,11 @@ pub fn init() -> Extension { "01_http.js", )) .ops(vec![ - ("op_http_request_next", op_async(op_http_request_next)), - ("op_http_request_read", op_async(op_http_request_read)), - ("op_http_response", op_async(op_http_response)), - ("op_http_response_write", op_async(op_http_response_write)), - ("op_http_response_close", op_async(op_http_response_close)), + ("op_http_accept", op_async(op_http_accept)), + ("op_http_read", op_async(op_http_read)), + ("op_http_write_headers", op_async(op_http_write_headers)), + ("op_http_write", op_async(op_http_write)), + ("op_http_shutdown", op_async(op_http_shutdown)), ( "op_http_websocket_accept_header", op_sync(op_http_websocket_accept_header), @@ -73,86 +83,247 @@ pub fn init() -> Extension { .build() } -struct ServiceInner { - request: Request<Body>, - response_tx: oneshot::Sender<Response<Body>>, +struct HttpConnResource { + addr: SocketAddr, + scheme: &'static str, + acceptors_tx: mpsc::UnboundedSender<HttpAcceptor>, + closed_fut: Shared<RemoteHandle<Result<(), Arc<hyper::Error>>>>, + cancel_handle: Rc<CancelHandle>, // Closes gracefully and cancels accept ops. } -#[derive(Clone, Default)] -struct Service { - inner: Rc<RefCell<Option<ServiceInner>>>, - waker: Rc<deno_core::futures::task::AtomicWaker>, +impl HttpConnResource { + fn new<S>(io: S, scheme: &'static str, addr: SocketAddr) -> Self + where + S: AsyncRead + AsyncWrite + Unpin + Send + 'static, + { + let (acceptors_tx, acceptors_rx) = mpsc::unbounded::<HttpAcceptor>(); + let service = HttpService::new(acceptors_rx); + + let conn_fut = Http::new() + .with_executor(LocalExecutor) + .serve_connection(io, service) + .with_upgrades(); + + // When the cancel handle is used, the connection shuts down gracefully. + // No new HTTP streams will be accepted, but existing streams will be able + // to continue operating and eventually shut down cleanly. + let cancel_handle = CancelHandle::new_rc(); + let shutdown_fut = never().or_cancel(&cancel_handle).fuse(); + + // A local task that polls the hyper connection future to completion. + let task_fut = async move { + pin_mut!(shutdown_fut); + pin_mut!(conn_fut); + let result = match select(conn_fut, shutdown_fut).await { + Either::Left((result, _)) => result, + Either::Right((_, mut conn_fut)) => { + conn_fut.as_mut().graceful_shutdown(); + conn_fut.await + } + }; + filter_enotconn(result).map_err(Arc::from) + }; + let (task_fut, closed_fut) = task_fut.remote_handle(); + let closed_fut = closed_fut.shared(); + spawn_local(task_fut); + + Self { + addr, + scheme, + acceptors_tx, + closed_fut, + cancel_handle, + } + } + + // Accepts a new incoming HTTP request. + async fn accept( + self: &Rc<Self>, + ) -> Result<Option<HttpStreamResource>, AnyError> { + let fut = async { + let (request_tx, request_rx) = oneshot::channel(); + let (response_tx, response_rx) = oneshot::channel(); + + let acceptor = HttpAcceptor::new(request_tx, response_rx); + self.acceptors_tx.unbounded_send(acceptor).ok()?; + + let request = request_rx.await.ok()?; + let stream = HttpStreamResource::new(self, request, response_tx); + Some(stream) + }; + + async { + match fut.await { + Some(stream) => Ok(Some(stream)), + // Return the connection error, if any. + None => self.closed().map_ok(|_| None).await, + } + } + .try_or_cancel(&self.cancel_handle) + .await + } + + /// A future that completes when this HTTP connection is closed or errors. + async fn closed(&self) -> Result<(), AnyError> { + self.closed_fut.clone().map_err(AnyError::from).await + } + + fn scheme(&self) -> &'static str { + self.scheme + } + + fn addr(&self) -> SocketAddr { + self.addr + } } -impl HyperService<Request<Body>> for Service { +impl Resource for HttpConnResource { + fn name(&self) -> Cow<str> { + "httpConn".into() + } + + fn close(self: Rc<Self>) { + self.cancel_handle.cancel(); + } +} + +/// Creates a new HttpConn resource which uses `io` as its transport. +pub fn http_create_conn_resource<S>( + state: &mut OpState, + io: S, + addr: SocketAddr, + scheme: &'static str, +) -> Result<ResourceId, AnyError> +where + S: AsyncRead + AsyncWrite + Unpin + Send + 'static, +{ + let conn = HttpConnResource::new(io, scheme, addr); + let rid = state.resource_table.add(conn); + Ok(rid) +} + +/// An object that implements the `hyper::Service` trait, through which Hyper +/// delivers incoming HTTP requests. +struct HttpService { + acceptors_rx: Peekable<mpsc::UnboundedReceiver<HttpAcceptor>>, +} + +impl HttpService { + fn new(acceptors_rx: mpsc::UnboundedReceiver<HttpAcceptor>) -> Self { + let acceptors_rx = acceptors_rx.peekable(); + Self { acceptors_rx } + } +} + +impl Service<Request<Body>> for HttpService { type Response = Response<Body>; - type Error = http::Error; - #[allow(clippy::type_complexity)] - type Future = - Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>>>>; + type Error = oneshot::Canceled; + type Future = oneshot::Receiver<Response<Body>>; fn poll_ready( &mut self, - _cx: &mut Context<'_>, + cx: &mut Context<'_>, ) -> Poll<Result<(), Self::Error>> { - if self.inner.borrow().is_some() { - Poll::Pending - } else { - Poll::Ready(Ok(())) - } + let acceptors_rx = Pin::new(&mut self.acceptors_rx); + let result = ready!(acceptors_rx.poll_peek(cx)) + .map(|_| ()) + .ok_or(oneshot::Canceled); + Poll::Ready(result) } - fn call(&mut self, req: Request<Body>) -> Self::Future { - let (resp_tx, resp_rx) = oneshot::channel(); - self.inner.borrow_mut().replace(ServiceInner { - request: req, - response_tx: resp_tx, - }); - - async move { - resp_rx.await.or_else(|_| - // Fallback dummy response in case sender was dropped due to closed conn - Response::builder() - .status(hyper::StatusCode::INTERNAL_SERVER_ERROR) - .body(vec![].into())) - } - .boxed_local() + fn call(&mut self, request: Request<Body>) -> Self::Future { + let acceptor = self.acceptors_rx.next().now_or_never().flatten().unwrap(); + acceptor.call(request) } } -type ConnFuture = Pin<Box<dyn Future<Output = hyper::Result<()>>>>; +/// A pair of one-shot channels which first transfer a HTTP request from the +/// Hyper service to the HttpConn resource, and then take the Response back to +/// the service. +struct HttpAcceptor { + request_tx: oneshot::Sender<Request<Body>>, + response_rx: oneshot::Receiver<Response<Body>>, +} -struct Conn { - scheme: &'static str, - addr: SocketAddr, - conn: Rc<RefCell<ConnFuture>>, +impl HttpAcceptor { + fn new( + request_tx: oneshot::Sender<Request<Body>>, + response_rx: oneshot::Receiver<Response<Body>>, + ) -> Self { + Self { + request_tx, + response_rx, + } + } + + fn call(self, request: Request<Body>) -> oneshot::Receiver<Response<Body>> { + let Self { + request_tx, + response_rx, + } = self; + request_tx + .send(request) + .map(|_| response_rx) + .unwrap_or_else(|_| oneshot::channel().1) // Make new canceled receiver. + } } -struct ConnResource { - hyper_connection: Conn, - deno_service: Service, - cancel: CancelHandle, +/// A resource representing a single HTTP request/response stream. +struct HttpStreamResource { + conn: Rc<HttpConnResource>, + rd: AsyncRefCell<HttpRequestReader>, + wr: AsyncRefCell<HttpResponseWriter>, + cancel_handle: CancelHandle, } -impl ConnResource { - // TODO(ry) impl Future for ConnResource? - fn poll(&self, cx: &mut Context<'_>) -> Poll<Result<(), AnyError>> { - self - .hyper_connection - .conn - .borrow_mut() - .poll_unpin(cx) - .map_err(AnyError::from) +impl HttpStreamResource { + fn new( + conn: &Rc<HttpConnResource>, + request: Request<Body>, + response_tx: oneshot::Sender<Response<Body>>, + ) -> Self { + Self { + conn: conn.clone(), + rd: HttpRequestReader::Headers(request).into(), + wr: HttpResponseWriter::Headers(response_tx).into(), + cancel_handle: CancelHandle::new(), + } } } -impl Resource for ConnResource { +impl Resource for HttpStreamResource { fn name(&self) -> Cow<str> { - "httpConnection".into() + "httpStream".into() } fn close(self: Rc<Self>) { - self.cancel.cancel() + self.cancel_handle.cancel(); + } +} + +/// The read half of an HTTP stream. +enum HttpRequestReader { + Headers(Request<Body>), + Body(Peekable<Body>), + Closed, +} + +impl Default for HttpRequestReader { + fn default() -> Self { + Self::Closed + } +} + +/// The write half of an HTTP stream. +enum HttpResponseWriter { + Headers(oneshot::Sender<Response<Body>>), + Body(hyper::body::Sender), + Closed, +} + +impl Default for HttpResponseWriter { + fn default() -> Self { + Self::Closed } } @@ -160,9 +331,7 @@ impl Resource for ConnResource { #[derive(Serialize)] #[serde(rename_all = "camelCase")] struct NextRequestResponse( - // request_rid: - Option<ResourceId>, - // response_sender_rid: + // stream_rid: ResourceId, // method: // This is a String rather than a ByteString because reqwest will only return @@ -174,111 +343,40 @@ struct NextRequestResponse( String, ); -async fn op_http_request_next( +async fn op_http_accept( state: Rc<RefCell<OpState>>, - conn_rid: ResourceId, + rid: ResourceId, _: (), ) -> Result<Option<NextRequestResponse>, AnyError> { - let conn_resource = state - .borrow() - .resource_table - .get::<ConnResource>(conn_rid)?; - - let cancel = RcRef::map(conn_resource.clone(), |r| &r.cancel); - - poll_fn(|cx| { - conn_resource.deno_service.waker.register(cx.waker()); - - // Check if conn is open/close/errored - let (conn_closed, conn_result) = match conn_resource.poll(cx) { - Poll::Pending => (false, Ok(())), - Poll::Ready(Ok(())) => (true, Ok(())), - Poll::Ready(Err(e)) => { - if should_ignore_error(&e) { - (true, Ok(())) - } else { - (true, Err(e)) - } - } - }; - // Drop conn resource if closed - if conn_closed { - // TODO(ry) close RequestResource associated with connection - // TODO(ry) close ResponseBodyResource associated with connection - // try to close ConnResource, but don't unwrap as it might - // already be closed - let _ = state - .borrow_mut() - .resource_table - .take::<ConnResource>(conn_rid); - - // Fail with err if unexpected conn error, early return None otherwise - return Poll::Ready(conn_result.map(|_| None)); - } + let conn = state.borrow().resource_table.get::<HttpConnResource>(rid)?; - if let Some(inner) = conn_resource.deno_service.inner.borrow_mut().take() { - let Conn { scheme, addr, .. } = conn_resource.hyper_connection; - let mut state = state.borrow_mut(); - let next = - prepare_next_request(&mut state, conn_rid, inner, scheme, addr)?; - Poll::Ready(Ok(Some(next))) - } else { - Poll::Pending - } - }) - .try_or_cancel(cancel) - .await - .map_err(AnyError::from) -} + let stream = match conn.accept().await { + Ok(Some(stream)) => Rc::new(stream), + Ok(None) => return Ok(None), + Err(err) => return Err(err), + }; -fn prepare_next_request( - state: &mut OpState, - conn_rid: ResourceId, - request_resource: ServiceInner, - scheme: &'static str, - addr: SocketAddr, -) -> Result<NextRequestResponse, AnyError> { - let tx = request_resource.response_tx; - let req = request_resource.request; - let method = req.method().to_string(); - let headers = req_headers(&req); - let url = req_url(&req, scheme, addr)?; - - let is_websocket = is_websocket_request(&req); - let can_have_body = !matches!(*req.method(), Method::GET | Method::HEAD); - let has_body = - is_websocket || (can_have_body && req.size_hint().exact() != Some(0)); - - let maybe_request_rid = if has_body { - let request_rid = state.resource_table.add(RequestResource { - conn_rid, - inner: AsyncRefCell::new(RequestOrStreamReader::Request(Some(req))), - cancel: CancelHandle::default(), - }); - Some(request_rid) - } else { - None + let rd = RcRef::map(&stream, |r| &r.rd).borrow().await; + let request = match &*rd { + HttpRequestReader::Headers(request) => request, + _ => unreachable!(), }; - let response_sender_rid = state.resource_table.add(ResponseSenderResource { - sender: tx, - conn_rid, - }); - - Ok(NextRequestResponse( - maybe_request_rid, - response_sender_rid, - method, - headers, - url, - )) + let method = request.method().to_string(); + let headers = req_headers(request); + let url = req_url(request, conn.scheme(), conn.addr()); + + let stream_rid = state.borrow_mut().resource_table.add_rc(stream); + + let r = NextRequestResponse(stream_rid, method, headers, url); + Ok(Some(r)) } fn req_url( req: &hyper::Request<hyper::Body>, scheme: &'static str, addr: SocketAddr, -) -> Result<String, AnyError> { +) -> String { let host: Cow<str> = if let Some(auth) = req.uri().authority() { match addr.port() { 443 if scheme == "https" => Cow::Borrowed(auth.host()), @@ -288,12 +386,22 @@ fn req_url( } else if let Some(host) = req.uri().host() { Cow::Borrowed(host) } else if let Some(host) = req.headers().get("HOST") { - Cow::Borrowed(host.to_str()?) + match host.to_str() { + Ok(host) => Cow::Borrowed(host), + Err(_) => Cow::Owned( + host + .as_bytes() + .iter() + .cloned() + .map(char::from) + .collect::<String>(), + ), + } } else { Cow::Owned(addr.to_string()) }; let path = req.uri().path_and_query().map_or("/", |p| p.as_str()); - Ok([scheme, "://", &host, path].concat()) + [scheme, "://", &host, path].concat() } fn req_headers( @@ -327,68 +435,6 @@ fn req_headers( headers } -fn is_websocket_request(req: &hyper::Request<hyper::Body>) -> bool { - req.version() == hyper::Version::HTTP_11 - && req.method() == hyper::Method::GET - && req.headers().contains_key(&SEC_WEBSOCKET_KEY) - && header(req.headers(), &SEC_WEBSOCKET_VERSION) == b"13" - && header(req.headers(), &UPGRADE) - .split(|c| *c == b' ' || *c == b',') - .any(|token| token.eq_ignore_ascii_case(b"websocket")) - && header(req.headers(), &CONNECTION) - .split(|c| *c == b' ' || *c == b',') - .any(|token| token.eq_ignore_ascii_case(b"upgrade")) -} - -fn header<'a>( - h: &'a hyper::http::HeaderMap, - name: &hyper::header::HeaderName, -) -> &'a [u8] { - h.get(name) - .map(hyper::header::HeaderValue::as_bytes) - .unwrap_or_default() -} - -fn should_ignore_error(e: &AnyError) -> bool { - if let Some(e) = e.downcast_ref::<hyper::Error>() { - use std::error::Error; - if let Some(std_err) = e.source() { - if let Some(io_err) = std_err.downcast_ref::<std::io::Error>() { - if io_err.kind() == std::io::ErrorKind::NotConnected { - return true; - } - } - } - } - false -} - -pub fn start_http<IO: AsyncRead + AsyncWrite + Unpin + Send + 'static>( - state: &mut OpState, - io: IO, - addr: SocketAddr, - scheme: &'static str, -) -> Result<ResourceId, AnyError> { - let deno_service = Service::default(); - - let hyper_connection = Http::new() - .with_executor(LocalExecutor) - .serve_connection(io, deno_service.clone()) - .with_upgrades(); - let conn = Pin::new(Box::new(hyper_connection)); - let conn_resource = ConnResource { - hyper_connection: Conn { - scheme, - addr, - conn: Rc::new(RefCell::new(conn)), - }, - deno_service, - cancel: CancelHandle::default(), - }; - let rid = state.resource_table.add(conn_resource); - Ok(rid) -} - // We use a tuple instead of struct to avoid serialization overhead of the keys. #[derive(Deserialize)] struct RespondArgs( @@ -400,27 +446,16 @@ struct RespondArgs( Vec<(ByteString, ByteString)>, ); -async fn op_http_response( +async fn op_http_write_headers( state: Rc<RefCell<OpState>>, args: RespondArgs, data: Option<StringOrBuffer>, -) -> Result<Option<ResourceId>, AnyError> { +) -> Result<(), AnyError> { let RespondArgs(rid, status, headers) = args; - - let response_sender = state + let stream = state .borrow_mut() .resource_table - .take::<ResponseSenderResource>(rid)?; - let response_sender = Rc::try_unwrap(response_sender) - .ok() - .expect("multiple op_http_respond ongoing"); - - let conn_rid = response_sender.conn_rid; - - let conn_resource = state - .borrow() - .resource_table - .get::<ConnResource>(conn_rid)?; + .get::<HttpStreamResource>(rid)?; let mut builder = Response::builder().status(status); @@ -429,171 +464,138 @@ async fn op_http_response( builder = builder.header(key.as_ref(), value.as_ref()); } - let (maybe_response_body_rid, res) = if let Some(d) = data { - // If a body is passed, we use it, and don't return a body for streaming. - (None, builder.body(d.into_bytes().into())?) - } else { - // If no body is passed, we return a writer for streaming the body. - let (sender, body) = Body::channel(); - let res = builder.body(body)?; - - let response_body_rid = - state.borrow_mut().resource_table.add(ResponseBodyResource { - body: AsyncRefCell::new(sender), - conn_rid, - }); - - (Some(response_body_rid), res) - }; - - // oneshot::Sender::send(v) returns |v| on error, not an error object. - // The only failure mode is the receiver already having dropped its end - // of the channel. - if response_sender.sender.send(res).is_err() { - if let Some(rid) = maybe_response_body_rid { - let _ = state - .borrow_mut() - .resource_table - .take::<ResponseBodyResource>(rid); - } - return Err(type_error("internal communication error")); - } + let body: Response<Body>; + let new_wr: HttpResponseWriter; - let result = poll_fn(|cx| match conn_resource.poll(cx) { - Poll::Ready(x) => { - state.borrow_mut().resource_table.close(conn_rid).ok(); - Poll::Ready(x) + match data { + Some(data) => { + // If a buffer was passed, we use it to construct a response body. + body = builder.body(data.into_bytes().into())?; + new_wr = HttpResponseWriter::Closed; } - Poll::Pending => Poll::Ready(Ok(())), - }) - .await; - - if let Err(e) = result { - if let Some(rid) = maybe_response_body_rid { - let _ = state - .borrow_mut() - .resource_table - .take::<ResponseBodyResource>(rid); + None => { + // If no buffer was passed, the caller will stream the response body. + let (body_tx, body_rx) = Body::channel(); + body = builder.body(body_rx)?; + new_wr = HttpResponseWriter::Body(body_tx); } - return Err(e); } - if maybe_response_body_rid.is_none() { - conn_resource.deno_service.waker.wake(); + let mut old_wr = RcRef::map(&stream, |r| &r.wr).borrow_mut().await; + let response_tx = match replace(&mut *old_wr, new_wr) { + HttpResponseWriter::Headers(response_tx) => response_tx, + _ => return Err(http_error("response headers already sent")), + }; + + match response_tx.send(body) { + Ok(_) => Ok(()), + Err(_) => { + stream.conn.closed().await?; + Err(http_error("connection closed while sending response")) + } } - Ok(maybe_response_body_rid) } -async fn op_http_response_close( +async fn op_http_write( state: Rc<RefCell<OpState>>, rid: ResourceId, - _: (), + buf: ZeroCopyBuf, ) -> Result<(), AnyError> { - let resource = state - .borrow_mut() - .resource_table - .take::<ResponseBodyResource>(rid)?; - - let conn_resource = state + let stream = state .borrow() .resource_table - .get::<ConnResource>(resource.conn_rid)?; - drop(resource); - - let r = poll_fn(|cx| match conn_resource.poll(cx) { - Poll::Ready(x) => Poll::Ready(x), - Poll::Pending => Poll::Ready(Ok(())), - }) - .await; - conn_resource.deno_service.waker.wake(); - r -} - -async fn op_http_request_read( - state: Rc<RefCell<OpState>>, - rid: ResourceId, - mut data: ZeroCopyBuf, -) -> Result<usize, AnyError> { - let resource = state - .borrow() - .resource_table - .get::<RequestResource>(rid as u32)?; - - let conn_resource = state - .borrow() - .resource_table - .get::<ConnResource>(resource.conn_rid)?; - - let mut inner = RcRef::map(resource.clone(), |r| &r.inner) - .borrow_mut() - .await; - - if let RequestOrStreamReader::Request(req) = &mut *inner { - let req = req.take().unwrap(); - let stream: BytesStream = Box::pin(req.into_body().map(|r| { - r.map_err(|err| std::io::Error::new(std::io::ErrorKind::Other, err)) - })); - let reader = StreamReader::new(stream); - *inner = RequestOrStreamReader::StreamReader(reader); - }; - - let reader = match &mut *inner { - RequestOrStreamReader::StreamReader(reader) => reader, - _ => unreachable!(), - }; - - let cancel = RcRef::map(resource, |r| &r.cancel); - - let mut read_fut = reader.read(&mut data).try_or_cancel(cancel).boxed_local(); + .get::<HttpStreamResource>(rid)?; + let mut wr = RcRef::map(&stream, |r| &r.wr).borrow_mut().await; + + loop { + let body_tx = match &mut *wr { + HttpResponseWriter::Body(body_tx) => body_tx, + HttpResponseWriter::Headers(_) => { + break Err(http_error("no response headers")) + } + HttpResponseWriter::Closed => { + break Err(http_error("response already completed")) + } + }; - poll_fn(|cx| { - if let Poll::Ready(Err(e)) = conn_resource.poll(cx) { - // close ConnResource - // close RequestResource associated with connection - // close ResponseBodyResource associated with connection - return Poll::Ready(Err(e)); + let bytes = Bytes::copy_from_slice(&buf[..]); + match body_tx.send_data(bytes).await { + Ok(_) => break Ok(()), + Err(err) => { + // Don't return "channel closed", that's an implementation detail. + // Pull up the failure associated with the transport connection instead. + assert!(err.is_closed()); + stream.conn.closed().await?; + // If there was no connection error, drop body_tx. + *wr = HttpResponseWriter::Closed; + } } - - read_fut.poll_unpin(cx).map_err(AnyError::from) - }) - .await + } } -async fn op_http_response_write( +/// Gracefully closes the write half of the HTTP stream. Note that this does not +/// remove the HTTP stream resource from the resource table; it still has to be +/// closed with `Deno.core.close()`. +async fn op_http_shutdown( state: Rc<RefCell<OpState>>, rid: ResourceId, - data: ZeroCopyBuf, + _: (), ) -> Result<(), AnyError> { - let resource = state + let stream = state .borrow() .resource_table - .get::<ResponseBodyResource>(rid as u32)?; + .get::<HttpStreamResource>(rid)?; + let mut wr = RcRef::map(&stream, |r| &r.wr).borrow_mut().await; + take(&mut *wr); + Ok(()) +} - let conn_resource = state - .borrow() +async fn op_http_read( + state: Rc<RefCell<OpState>>, + rid: ResourceId, + mut buf: ZeroCopyBuf, +) -> Result<usize, AnyError> { + let stream = state + .borrow_mut() .resource_table - .get::<ConnResource>(resource.conn_rid)?; - - let mut body = RcRef::map(&resource, |r| &r.body).borrow_mut().await; - - let mut send_data_fut = body.send_data(data.to_vec().into()).boxed_local(); - - poll_fn(|cx| { - let r = send_data_fut.poll_unpin(cx).map_err(AnyError::from); - - // Poll connection so the data is flushed - if let Poll::Ready(Err(e)) = conn_resource.poll(cx) { - // close ConnResource - // close RequestResource associated with connection - // close ResponseBodyResource associated with connection - return Poll::Ready(Err(e)); + .get::<HttpStreamResource>(rid)?; + let mut rd = RcRef::map(&stream, |r| &r.rd).borrow_mut().await; + + let body = loop { + match &mut *rd { + HttpRequestReader::Headers(_) => {} + HttpRequestReader::Body(body) => break body, + HttpRequestReader::Closed => return Ok(0), } + match take(&mut *rd) { + HttpRequestReader::Headers(request) => { + let body = request.into_body().peekable(); + *rd = HttpRequestReader::Body(body); + } + _ => unreachable!(), + }; + }; - r - }) - .await?; + let fut = async { + let mut body = Pin::new(body); + loop { + match body.as_mut().peek_mut().await { + Some(Ok(chunk)) if !chunk.is_empty() => { + let len = min(buf.len(), chunk.len()); + buf[..len].copy_from_slice(&chunk.split_to(len)); + break Ok(len); + } + Some(_) => match body.as_mut().next().await.unwrap() { + Ok(chunk) => assert!(chunk.is_empty()), + Err(err) => break Err(AnyError::from(err)), + }, + None => break Ok(0), + } + } + }; - Ok(()) + let cancel_handle = RcRef::map(&stream, |r| &r.cancel_handle); + fut.try_or_cancel(cancel_handle).await } fn op_http_websocket_accept_header( @@ -613,86 +615,22 @@ async fn op_http_upgrade_websocket( rid: ResourceId, _: (), ) -> Result<ResourceId, AnyError> { - let req_resource = state + let stream = state .borrow_mut() .resource_table - .take::<RequestResource>(rid)?; - - let mut inner = RcRef::map(&req_resource, |r| &r.inner).borrow_mut().await; - - if let RequestOrStreamReader::Request(req) = inner.as_mut() { - let upgraded = hyper::upgrade::on(req.as_mut().unwrap()).await?; - let stream = - deno_websocket::tokio_tungstenite::WebSocketStream::from_raw_socket( - upgraded, - deno_websocket::tokio_tungstenite::tungstenite::protocol::Role::Server, - None, - ) - .await; - - let (ws_tx, ws_rx) = stream.split(); - let rid = - state - .borrow_mut() - .resource_table - .add(deno_websocket::WsStreamResource { - stream: deno_websocket::WebSocketStreamType::Server { - rx: AsyncRefCell::new(ws_rx), - tx: AsyncRefCell::new(ws_tx), - }, - cancel: Default::default(), - }); - - Ok(rid) - } else { - Err(bad_resource_id()) - } -} - -type BytesStream = - Pin<Box<dyn Stream<Item = std::io::Result<bytes::Bytes>> + Unpin>>; - -enum RequestOrStreamReader { - Request(Option<Request<hyper::Body>>), - StreamReader(StreamReader<BytesStream, bytes::Bytes>), -} - -struct RequestResource { - conn_rid: ResourceId, - inner: AsyncRefCell<RequestOrStreamReader>, - cancel: CancelHandle, -} - -impl Resource for RequestResource { - fn name(&self) -> Cow<str> { - "request".into() - } + .get::<HttpStreamResource>(rid)?; + let mut rd = RcRef::map(&stream, |r| &r.rd).borrow_mut().await; - fn close(self: Rc<Self>) { - self.cancel.cancel() - } -} - -struct ResponseSenderResource { - sender: oneshot::Sender<Response<Body>>, - conn_rid: ResourceId, -} - -impl Resource for ResponseSenderResource { - fn name(&self) -> Cow<str> { - "responseSender".into() - } -} - -struct ResponseBodyResource { - body: AsyncRefCell<hyper::body::Sender>, - conn_rid: ResourceId, -} + let request = match &mut *rd { + HttpRequestReader::Headers(request) => request, + _ => { + return Err(http_error("cannot upgrade because request body was used")) + } + }; -impl Resource for ResponseBodyResource { - fn name(&self) -> Cow<str> { - "responseBody".into() - } + let transport = hyper::upgrade::on(request).await?; + let ws_rid = ws_create_server_stream(&state, transport).await?; + Ok(ws_rid) } // Needed so hyper can use non Send futures @@ -705,6 +643,33 @@ where Fut::Output: 'static, { fn execute(&self, fut: Fut) { - tokio::task::spawn_local(fut); + spawn_local(fut); + } +} + +fn http_error(message: &'static str) -> AnyError { + custom_error("Http", message) +} + +/// Filters out the ever-surprising 'shutdown ENOTCONN' errors. +fn filter_enotconn( + result: Result<(), hyper::Error>, +) -> Result<(), hyper::Error> { + if result + .as_ref() + .err() + .and_then(|err| err.source()) + .and_then(|err| err.downcast_ref::<io::Error>()) + .filter(|err| err.kind() == io::ErrorKind::NotConnected) + .is_some() + { + Ok(()) + } else { + result } } + +/// Create a future that is forever pending. +fn never() -> Pending<Never> { + pending() +} diff --git a/ext/websocket/lib.rs b/ext/websocket/lib.rs index d469b5aaf..ba626a45a 100644 --- a/ext/websocket/lib.rs +++ b/ext/websocket/lib.rs @@ -34,12 +34,13 @@ use std::sync::Arc; use tokio::net::TcpStream; use tokio_rustls::rustls::RootCertStore; use tokio_rustls::TlsConnector; +use tokio_tungstenite::client_async; use tokio_tungstenite::tungstenite::{ handshake::client::Response, protocol::frame::coding::CloseCode, - protocol::CloseFrame, Message, + protocol::CloseFrame, protocol::Role, Message, }; use tokio_tungstenite::MaybeTlsStream; -use tokio_tungstenite::{client_async, WebSocketStream}; +use tokio_tungstenite::WebSocketStream; pub use tokio_tungstenite; // Re-export tokio_tungstenite @@ -72,6 +73,27 @@ pub enum WebSocketStreamType { }, } +pub async fn ws_create_server_stream( + state: &Rc<RefCell<OpState>>, + transport: hyper::upgrade::Upgraded, +) -> Result<ResourceId, AnyError> { + let ws_stream = + WebSocketStream::from_raw_socket(transport, Role::Server, None).await; + let (ws_tx, ws_rx) = ws_stream.split(); + + let ws_resource = WsStreamResource { + stream: WebSocketStreamType::Server { + tx: AsyncRefCell::new(ws_tx), + rx: AsyncRefCell::new(ws_rx), + }, + cancel: Default::default(), + }; + + let resource_table = &mut state.borrow_mut().resource_table; + let rid = resource_table.add(ws_resource); + Ok(rid) +} + pub struct WsStreamResource { pub stream: WebSocketStreamType, // When a `WsStreamResource` resource is closed, all pending 'read' ops are diff --git a/runtime/errors.rs b/runtime/errors.rs index fe6e71193..1491161d3 100644 --- a/runtime/errors.rs +++ b/runtime/errors.rs @@ -17,6 +17,7 @@ use deno_fetch::reqwest; use std::env; use std::error::Error; use std::io; +use std::sync::Arc; fn get_dlopen_error_class(error: &dlopen::Error) -> &'static str { use dlopen::Error::*; @@ -164,6 +165,10 @@ pub fn get_error_class_name(e: &AnyError) -> Option<&'static str> { }) .or_else(|| e.downcast_ref::<hyper::Error>().map(get_hyper_error_class)) .or_else(|| { + e.downcast_ref::<Arc<hyper::Error>>() + .map(|e| get_hyper_error_class(&**e)) + }) + .or_else(|| { e.downcast_ref::<deno_core::Canceled>().map(|e| { let io_err: io::Error = e.to_owned().into(); get_io_error_class(&io_err) diff --git a/runtime/ops/http.rs b/runtime/ops/http.rs index 683dc1a57..fddac9261 100644 --- a/runtime/ops/http.rs +++ b/runtime/ops/http.rs @@ -6,6 +6,7 @@ use deno_core::op_sync; use deno_core::Extension; use deno_core::OpState; use deno_core::ResourceId; +use deno_http::http_create_conn_resource; use deno_net::io::TcpStreamResource; use deno_net::ops_tls::TlsStreamResource; @@ -29,7 +30,7 @@ fn op_http_start( let (read_half, write_half) = resource.into_inner(); let tcp_stream = read_half.reunite(write_half)?; let addr = tcp_stream.local_addr()?; - return deno_http::start_http(state, tcp_stream, addr, "http"); + return http_create_conn_resource(state, tcp_stream, addr, "http"); } if let Ok(resource_rc) = state @@ -41,7 +42,7 @@ fn op_http_start( let (read_half, write_half) = resource.into_inner(); let tls_stream = read_half.reunite(write_half); let addr = tls_stream.get_ref().0.local_addr()?; - return deno_http::start_http(state, tls_stream, addr, "https"); + return http_create_conn_resource(state, tls_stream, addr, "https"); } Err(bad_resource_id()) |