diff options
Diffstat (limited to 'extensions/net')
-rw-r--r-- | extensions/net/03_http.js | 144 | ||||
-rw-r--r-- | extensions/net/Cargo.toml | 3 | ||||
-rw-r--r-- | extensions/net/lib.deno_net.unstable.d.ts | 14 | ||||
-rw-r--r-- | extensions/net/ops_http.rs | 188 |
4 files changed, 293 insertions, 56 deletions
diff --git a/extensions/net/03_http.js b/extensions/net/03_http.js index 343b305a6..db2d0a3b1 100644 --- a/extensions/net/03_http.js +++ b/extensions/net/03_http.js @@ -2,21 +2,34 @@ "use strict"; ((window) => { + const webidl = window.__bootstrap.webidl; const { InnerBody } = window.__bootstrap.fetchBody; - const { Response, fromInnerRequest, toInnerResponse, newInnerRequest } = - window.__bootstrap.fetch; + const { setEventTargetData } = window.__bootstrap.eventTarget; + const { + Response, + fromInnerRequest, + toInnerResponse, + newInnerRequest, + newInnerResponse, + fromInnerResponse, + } = window.__bootstrap.fetch; const core = window.Deno.core; const { BadResource, Interrupted } = core; const { ReadableStream } = window.__bootstrap.streams; const abortSignal = window.__bootstrap.abortSignal; + const { WebSocket, _rid, _readyState, _eventLoop, _protocol } = + window.__bootstrap.webSocket; const { - Symbol, - Uint8Array, + ArrayPrototypeIncludes, + ArrayPrototypePush, Promise, StringPrototypeIncludes, + StringPrototypeSplit, + Symbol, SymbolAsyncIterator, - TypeError, TypedArrayPrototypeSubarray, + TypeError, + Uint8Array, } = window.__bootstrap.primordials; function serveHttp(conn) { @@ -65,7 +78,7 @@ if (nextRequest === null) return null; const [ - requestBodyRid, + requestRid, responseSenderRid, method, headersList, @@ -74,8 +87,8 @@ /** @type {ReadableStream<Uint8Array> | undefined} */ let body = null; - if (typeof requestBodyRid === "number") { - body = createRequestBodyStream(requestBodyRid); + if (typeof requestRid === "number") { + body = createRequestBodyStream(requestRid); } const innerRequest = newInnerRequest( @@ -87,7 +100,11 @@ const signal = abortSignal.newSignal(); const request = fromInnerRequest(innerRequest, signal, "immutable"); - const respondWith = createRespondWith(this, responseSenderRid); + const respondWith = createRespondWith( + this, + responseSenderRid, + requestRid, + ); return { request, respondWith }; } @@ -118,7 +135,7 @@ ); } - function createRespondWith(httpConn, responseSenderRid) { + function createRespondWith(httpConn, responseSenderRid, requestRid) { return async function respondWith(resp) { if (resp instanceof Promise) { resp = await resp; @@ -222,10 +239,51 @@ } catch { /* pass */ } } } + + const ws = resp[_ws]; + if (ws) { + if (typeof requestRid !== "number") { + throw new TypeError( + "This request can not be upgraded to a websocket connection.", + ); + } + + const wsRid = await core.opAsync( + "op_http_upgrade_websocket", + requestRid, + ); + ws[_rid] = wsRid; + ws[_protocol] = resp.headers.get("sec-websocket-protocol"); + + if (ws[_readyState] === WebSocket.CLOSING) { + await core.opAsync("op_ws_close", { rid: wsRid }); + + ws[_readyState] = WebSocket.CLOSED; + + const errEvent = new ErrorEvent("error"); + ws.dispatchEvent(errEvent); + + const event = new CloseEvent("close"); + ws.dispatchEvent(event); + + try { + core.close(wsRid); + } catch (err) { + // Ignore error if the socket has already been closed. + if (!(err instanceof Deno.errors.BadResource)) throw err; + } + } else { + ws[_readyState] = WebSocket.OPEN; + const event = new Event("open"); + ws.dispatchEvent(event); + + ws[_eventLoop](); + } + } }; } - function createRequestBodyStream(requestBodyRid) { + function createRequestBodyStream(requestRid) { return new ReadableStream({ type: "bytes", async pull(controller) { @@ -234,7 +292,7 @@ // stream. const chunk = new Uint8Array(16 * 1024 + 256); const read = await readRequest( - requestBodyRid, + requestRid, chunk, ); if (read > 0) { @@ -243,23 +301,79 @@ } else { // We have reached the end of the body, so we close the stream. controller.close(); - core.close(requestBodyRid); + core.close(requestRid); } } catch (err) { // There was an error while reading a chunk of the body, so we // error. controller.error(err); controller.close(); - core.close(requestBodyRid); + core.close(requestRid); } }, cancel() { - core.close(requestBodyRid); + core.close(requestRid); }, }); } + const _ws = Symbol("[[associated_ws]]"); + + function upgradeWebSocket(request, options = {}) { + if (request.headers.get("upgrade") !== "websocket") { + throw new TypeError( + "Invalid Header: 'upgrade' header must be 'websocket'", + ); + } + + if (request.headers.get("connection") !== "Upgrade") { + throw new TypeError( + "Invalid Header: 'connection' header must be 'Upgrade'", + ); + } + + const websocketKey = request.headers.get("sec-websocket-key"); + if (websocketKey === null) { + throw new TypeError( + "Invalid Header: 'sec-websocket-key' header must be set", + ); + } + + const accept = core.opSync("op_http_websocket_accept_header", websocketKey); + + const r = newInnerResponse(101); + r.headerList = [ + ["upgrade", "websocket"], + ["connection", "Upgrade"], + ["sec-websocket-accept", accept], + ]; + + const protocolsStr = request.headers.get("sec-websocket-protocol") || ""; + const protocols = StringPrototypeSplit(protocolsStr, ", "); + if (protocols && options.protocol) { + if (ArrayPrototypeIncludes(protocols, options.protocol)) { + ArrayPrototypePush(r.headerList, [ + "sec-websocket-protocol", + options.protocol, + ]); + } else { + throw new TypeError( + `Protocol '${options.protocol}' not in the request's protocol list (non negotiable)`, + ); + } + } + + const response = fromInnerResponse(r, "immutable"); + + const websocket = webidl.createBranded(WebSocket); + setEventTargetData(websocket); + response[_ws] = websocket; + + return { response, websocket }; + } + window.__bootstrap.http = { serveHttp, + upgradeWebSocket, }; })(this); diff --git a/extensions/net/Cargo.toml b/extensions/net/Cargo.toml index e39c5c488..de84dd851 100644 --- a/extensions/net/Cargo.toml +++ b/extensions/net/Cargo.toml @@ -15,12 +15,15 @@ path = "lib.rs" [dependencies] deno_core = { version = "0.92.0", path = "../../core" } +deno_websocket = { version = "0.15.0", path = "../websocket" } +base64 = "0.13.0" bytes = "1" log = "0.4.14" lazy_static = "1.4.0" http = "0.2.3" hyper = { version = "0.14.9", features = ["server", "stream", "http1", "http2", "runtime"] } +ring = "0.16.20" rustls = "0.19.0" serde = { version = "1.0.125", features = ["derive"] } tokio = { version = "1.8.0", features = ["full"] } diff --git a/extensions/net/lib.deno_net.unstable.d.ts b/extensions/net/lib.deno_net.unstable.d.ts index 905a7acc1..c47558edc 100644 --- a/extensions/net/lib.deno_net.unstable.d.ts +++ b/extensions/net/lib.deno_net.unstable.d.ts @@ -259,4 +259,18 @@ declare namespace Deno { * then the underlying HttpConn resource is closed automatically. */ export function serveHttp(conn: Conn): HttpConn; + + export interface WebSocketUpgrade { + response: Response; + websocket: WebSocket; + } + + export interface UpgradeWebSocketOptions { + protocol?: string; + } + + export function upgradeWebSocket( + request: Request, + options?: UpgradeWebSocketOptions, + ): WebSocketUpgrade; } diff --git a/extensions/net/ops_http.rs b/extensions/net/ops_http.rs index 54e06c3a7..782ec91d0 100644 --- a/extensions/net/ops_http.rs +++ b/extensions/net/ops_http.rs @@ -2,7 +2,6 @@ use crate::io::TcpStreamResource; use crate::io::TlsStreamResource; -use crate::ops_tls::TlsStream; use deno_core::error::bad_resource_id; use deno_core::error::null_opbuf; use deno_core::error::type_error; @@ -25,7 +24,6 @@ use deno_core::ResourceId; use deno_core::ZeroCopyBuf; use hyper::body::HttpBody; use hyper::http; -use hyper::server::conn::Connection; use hyper::server::conn::Http; use hyper::service::Service as HyperService; use hyper::Body; @@ -42,7 +40,6 @@ use std::rc::Rc; use std::task::Context; use std::task::Poll; use tokio::io::AsyncReadExt; -use tokio::net::TcpStream; use tokio::sync::oneshot; use tokio_util::io::StreamReader; @@ -54,6 +51,14 @@ pub fn init() -> Vec<OpPair> { ("op_http_response", op_async(op_http_response)), ("op_http_response_write", op_async(op_http_response_write)), ("op_http_response_close", op_async(op_http_response_close)), + ( + "op_http_websocket_accept_header", + op_sync(op_http_websocket_accept_header), + ), + ( + "op_http_upgrade_websocket", + op_async(op_http_upgrade_websocket), + ), ] } @@ -97,13 +102,15 @@ impl HyperService<Request<Body>> for Service { } } -enum ConnType { - Tcp(Rc<RefCell<Connection<TcpStream, Service, LocalExecutor>>>), - Tls(Rc<RefCell<Connection<TlsStream, Service, LocalExecutor>>>), +type ConnFuture = Pin<Box<dyn Future<Output = hyper::Result<()>>>>; + +struct Conn { + scheme: &'static str, + conn: Rc<RefCell<ConnFuture>>, } struct ConnResource { - hyper_connection: ConnType, + hyper_connection: Conn, deno_service: Service, addr: SocketAddr, cancel: CancelHandle, @@ -112,11 +119,12 @@ struct ConnResource { impl ConnResource { // TODO(ry) impl Future for ConnResource? fn poll(&self, cx: &mut Context<'_>) -> Poll<Result<(), AnyError>> { - match &self.hyper_connection { - ConnType::Tcp(c) => c.borrow_mut().poll_unpin(cx), - ConnType::Tls(c) => c.borrow_mut().poll_unpin(cx), - } - .map_err(AnyError::from) + self + .hyper_connection + .conn + .borrow_mut() + .poll_unpin(cx) + .map_err(AnyError::from) } } @@ -134,7 +142,7 @@ impl Resource for ConnResource { #[derive(Serialize)] #[serde(rename_all = "camelCase")] struct NextRequestResponse( - // request_body_rid: + // request_rid: Option<ResourceId>, // response_sender_rid: ResourceId, @@ -207,12 +215,7 @@ async fn op_http_request_next( } let url = { - let scheme = { - match conn_resource.hyper_connection { - ConnType::Tcp(_) => "http", - ConnType::Tls(_) => "https", - } - }; + let scheme = &conn_resource.hyper_connection.scheme; let host: Cow<str> = if let Some(host) = req.uri().host() { Cow::Borrowed(host) } else if let Some(host) = req.headers().get("HOST") { @@ -224,24 +227,35 @@ async fn op_http_request_next( format!("{}://{}{}", scheme, host, path) }; + let is_websocket_request = req + .headers() + .get(hyper::header::CONNECTION) + .and_then(|v| { + v.to_str().ok().map(|s| "Upgrade".eq_ignore_ascii_case(s)) + }) + .unwrap_or(false) + && req + .headers() + .get(hyper::header::UPGRADE) + .and_then(|v| { + v.to_str().ok().map(|s| "websocket".eq_ignore_ascii_case(s)) + }) + .unwrap_or(false); + let has_body = if let Some(exact_size) = req.size_hint().exact() { exact_size > 0 } else { true }; - let maybe_request_body_rid = if has_body { - let stream: BytesStream = Box::pin(req.into_body().map(|r| { - r.map_err(|err| std::io::Error::new(std::io::ErrorKind::Other, err)) - })); - let stream_reader = StreamReader::new(stream); + let maybe_request_rid = if is_websocket_request || has_body { let mut state = state.borrow_mut(); - let request_body_rid = state.resource_table.add(RequestBodyResource { + let request_rid = state.resource_table.add(RequestResource { conn_rid, - reader: AsyncRefCell::new(stream_reader), + inner: AsyncRefCell::new(RequestOrStreamReader::Request(Some(req))), cancel: CancelHandle::default(), }); - Some(request_body_rid) + Some(request_rid) } else { None }; @@ -254,7 +268,7 @@ async fn op_http_request_next( }); Poll::Ready(Ok(Some(NextRequestResponse( - maybe_request_body_rid, + maybe_request_rid, response_sender_rid, method, headers, @@ -303,9 +317,14 @@ fn op_http_start( let addr = tcp_stream.local_addr()?; let hyper_connection = Http::new() .with_executor(LocalExecutor) - .serve_connection(tcp_stream, deno_service.clone()); + .serve_connection(tcp_stream, deno_service.clone()) + .with_upgrades(); + let conn = Pin::new(Box::new(hyper_connection)); let conn_resource = ConnResource { - hyper_connection: ConnType::Tcp(Rc::new(RefCell::new(hyper_connection))), + hyper_connection: Conn { + conn: Rc::new(RefCell::new(conn)), + scheme: "http", + }, deno_service, addr, cancel: CancelHandle::default(), @@ -326,9 +345,14 @@ fn op_http_start( let hyper_connection = Http::new() .with_executor(LocalExecutor) - .serve_connection(tls_stream, deno_service.clone()); + .serve_connection(tls_stream, deno_service.clone()) + .with_upgrades(); + let conn = Pin::new(Box::new(hyper_connection)); let conn_resource = ConnResource { - hyper_connection: ConnType::Tls(Rc::new(RefCell::new(hyper_connection))), + hyper_connection: Conn { + conn: Rc::new(RefCell::new(conn)), + scheme: "https", + }, deno_service, addr, cancel: CancelHandle::default(), @@ -367,10 +391,12 @@ async fn op_http_response( .ok() .expect("multiple op_http_respond ongoing"); + let conn_rid = response_sender.conn_rid; + let conn_resource = state .borrow() .resource_table - .get::<ConnResource>(response_sender.conn_rid) + .get::<ConnResource>(conn_rid) .ok_or_else(bad_resource_id)?; let mut builder = Response::builder().status(status); @@ -393,7 +419,7 @@ async fn op_http_response( let response_body_rid = state.borrow_mut().resource_table.add(ResponseBodyResource { body: AsyncRefCell::new(sender), - conn_rid: response_sender.conn_rid, + conn_rid, }); Some(response_body_rid) @@ -407,7 +433,10 @@ async fn op_http_response( } poll_fn(|cx| match conn_resource.poll(cx) { - Poll::Ready(x) => Poll::Ready(x), + Poll::Ready(x) => { + state.borrow_mut().resource_table.close(conn_rid); + Poll::Ready(x) + } Poll::Pending => Poll::Ready(Ok(())), }) .await?; @@ -455,7 +484,7 @@ async fn op_http_request_read( let resource = state .borrow() .resource_table - .get::<RequestBodyResource>(rid as u32) + .get::<RequestResource>(rid as u32) .ok_or_else(bad_resource_id)?; let conn_resource = state @@ -464,8 +493,26 @@ async fn op_http_request_read( .get::<ConnResource>(resource.conn_rid) .ok_or_else(bad_resource_id)?; - let mut reader = RcRef::map(&resource, |r| &r.reader).borrow_mut().await; + let mut inner = RcRef::map(resource.clone(), |r| &r.inner) + .borrow_mut() + .await; + + if let RequestOrStreamReader::Request(req) = &mut *inner { + let req = req.take().unwrap(); + let stream: BytesStream = Box::pin(req.into_body().map(|r| { + r.map_err(|err| std::io::Error::new(std::io::ErrorKind::Other, err)) + })); + let reader = StreamReader::new(stream); + *inner = RequestOrStreamReader::StreamReader(reader); + }; + + let reader = match &mut *inner { + RequestOrStreamReader::StreamReader(reader) => reader, + _ => unreachable!(), + }; + let cancel = RcRef::map(resource, |r| &r.cancel); + let mut read_fut = reader.read(&mut data).try_or_cancel(cancel).boxed_local(); poll_fn(|cx| { @@ -521,18 +568,77 @@ async fn op_http_response_write( Ok(()) } +fn op_http_websocket_accept_header( + _: &mut OpState, + key: String, + _: (), +) -> Result<String, AnyError> { + let digest = ring::digest::digest( + &ring::digest::SHA1_FOR_LEGACY_USE_ONLY, + format!("{}258EAFA5-E914-47DA-95CA-C5AB0DC85B11", key).as_bytes(), + ); + Ok(base64::encode(digest)) +} + +async fn op_http_upgrade_websocket( + state: Rc<RefCell<OpState>>, + rid: ResourceId, + _: (), +) -> Result<ResourceId, AnyError> { + let req_resource = state + .borrow_mut() + .resource_table + .take::<RequestResource>(rid) + .ok_or_else(bad_resource_id)?; + + let mut inner = RcRef::map(&req_resource, |r| &r.inner).borrow_mut().await; + + if let RequestOrStreamReader::Request(req) = inner.as_mut() { + let upgraded = hyper::upgrade::on(req.as_mut().unwrap()).await?; + let stream = + deno_websocket::tokio_tungstenite::WebSocketStream::from_raw_socket( + upgraded, + deno_websocket::tokio_tungstenite::tungstenite::protocol::Role::Server, + None, + ) + .await; + + let (ws_tx, ws_rx) = stream.split(); + let rid = + state + .borrow_mut() + .resource_table + .add(deno_websocket::WsStreamResource { + stream: deno_websocket::WebSocketStreamType::Server { + rx: AsyncRefCell::new(ws_rx), + tx: AsyncRefCell::new(ws_tx), + }, + cancel: Default::default(), + }); + + Ok(rid) + } else { + Err(bad_resource_id()) + } +} + type BytesStream = Pin<Box<dyn Stream<Item = std::io::Result<bytes::Bytes>> + Unpin>>; -struct RequestBodyResource { +enum RequestOrStreamReader { + Request(Option<Request<hyper::Body>>), + StreamReader(StreamReader<BytesStream, bytes::Bytes>), +} + +struct RequestResource { conn_rid: ResourceId, - reader: AsyncRefCell<StreamReader<BytesStream, bytes::Bytes>>, + inner: AsyncRefCell<RequestOrStreamReader>, cancel: CancelHandle, } -impl Resource for RequestBodyResource { +impl Resource for RequestResource { fn name(&self) -> Cow<str> { - "requestBody".into() + "request".into() } fn close(self: Rc<Self>) { |