summaryrefslogtreecommitdiff
path: root/extensions/net/ops_http.rs
diff options
context:
space:
mode:
Diffstat (limited to 'extensions/net/ops_http.rs')
-rw-r--r--extensions/net/ops_http.rs188
1 files changed, 147 insertions, 41 deletions
diff --git a/extensions/net/ops_http.rs b/extensions/net/ops_http.rs
index 54e06c3a7..782ec91d0 100644
--- a/extensions/net/ops_http.rs
+++ b/extensions/net/ops_http.rs
@@ -2,7 +2,6 @@
use crate::io::TcpStreamResource;
use crate::io::TlsStreamResource;
-use crate::ops_tls::TlsStream;
use deno_core::error::bad_resource_id;
use deno_core::error::null_opbuf;
use deno_core::error::type_error;
@@ -25,7 +24,6 @@ use deno_core::ResourceId;
use deno_core::ZeroCopyBuf;
use hyper::body::HttpBody;
use hyper::http;
-use hyper::server::conn::Connection;
use hyper::server::conn::Http;
use hyper::service::Service as HyperService;
use hyper::Body;
@@ -42,7 +40,6 @@ use std::rc::Rc;
use std::task::Context;
use std::task::Poll;
use tokio::io::AsyncReadExt;
-use tokio::net::TcpStream;
use tokio::sync::oneshot;
use tokio_util::io::StreamReader;
@@ -54,6 +51,14 @@ pub fn init() -> Vec<OpPair> {
("op_http_response", op_async(op_http_response)),
("op_http_response_write", op_async(op_http_response_write)),
("op_http_response_close", op_async(op_http_response_close)),
+ (
+ "op_http_websocket_accept_header",
+ op_sync(op_http_websocket_accept_header),
+ ),
+ (
+ "op_http_upgrade_websocket",
+ op_async(op_http_upgrade_websocket),
+ ),
]
}
@@ -97,13 +102,15 @@ impl HyperService<Request<Body>> for Service {
}
}
-enum ConnType {
- Tcp(Rc<RefCell<Connection<TcpStream, Service, LocalExecutor>>>),
- Tls(Rc<RefCell<Connection<TlsStream, Service, LocalExecutor>>>),
+type ConnFuture = Pin<Box<dyn Future<Output = hyper::Result<()>>>>;
+
+struct Conn {
+ scheme: &'static str,
+ conn: Rc<RefCell<ConnFuture>>,
}
struct ConnResource {
- hyper_connection: ConnType,
+ hyper_connection: Conn,
deno_service: Service,
addr: SocketAddr,
cancel: CancelHandle,
@@ -112,11 +119,12 @@ struct ConnResource {
impl ConnResource {
// TODO(ry) impl Future for ConnResource?
fn poll(&self, cx: &mut Context<'_>) -> Poll<Result<(), AnyError>> {
- match &self.hyper_connection {
- ConnType::Tcp(c) => c.borrow_mut().poll_unpin(cx),
- ConnType::Tls(c) => c.borrow_mut().poll_unpin(cx),
- }
- .map_err(AnyError::from)
+ self
+ .hyper_connection
+ .conn
+ .borrow_mut()
+ .poll_unpin(cx)
+ .map_err(AnyError::from)
}
}
@@ -134,7 +142,7 @@ impl Resource for ConnResource {
#[derive(Serialize)]
#[serde(rename_all = "camelCase")]
struct NextRequestResponse(
- // request_body_rid:
+ // request_rid:
Option<ResourceId>,
// response_sender_rid:
ResourceId,
@@ -207,12 +215,7 @@ async fn op_http_request_next(
}
let url = {
- let scheme = {
- match conn_resource.hyper_connection {
- ConnType::Tcp(_) => "http",
- ConnType::Tls(_) => "https",
- }
- };
+ let scheme = &conn_resource.hyper_connection.scheme;
let host: Cow<str> = if let Some(host) = req.uri().host() {
Cow::Borrowed(host)
} else if let Some(host) = req.headers().get("HOST") {
@@ -224,24 +227,35 @@ async fn op_http_request_next(
format!("{}://{}{}", scheme, host, path)
};
+ let is_websocket_request = req
+ .headers()
+ .get(hyper::header::CONNECTION)
+ .and_then(|v| {
+ v.to_str().ok().map(|s| "Upgrade".eq_ignore_ascii_case(s))
+ })
+ .unwrap_or(false)
+ && req
+ .headers()
+ .get(hyper::header::UPGRADE)
+ .and_then(|v| {
+ v.to_str().ok().map(|s| "websocket".eq_ignore_ascii_case(s))
+ })
+ .unwrap_or(false);
+
let has_body = if let Some(exact_size) = req.size_hint().exact() {
exact_size > 0
} else {
true
};
- let maybe_request_body_rid = if has_body {
- let stream: BytesStream = Box::pin(req.into_body().map(|r| {
- r.map_err(|err| std::io::Error::new(std::io::ErrorKind::Other, err))
- }));
- let stream_reader = StreamReader::new(stream);
+ let maybe_request_rid = if is_websocket_request || has_body {
let mut state = state.borrow_mut();
- let request_body_rid = state.resource_table.add(RequestBodyResource {
+ let request_rid = state.resource_table.add(RequestResource {
conn_rid,
- reader: AsyncRefCell::new(stream_reader),
+ inner: AsyncRefCell::new(RequestOrStreamReader::Request(Some(req))),
cancel: CancelHandle::default(),
});
- Some(request_body_rid)
+ Some(request_rid)
} else {
None
};
@@ -254,7 +268,7 @@ async fn op_http_request_next(
});
Poll::Ready(Ok(Some(NextRequestResponse(
- maybe_request_body_rid,
+ maybe_request_rid,
response_sender_rid,
method,
headers,
@@ -303,9 +317,14 @@ fn op_http_start(
let addr = tcp_stream.local_addr()?;
let hyper_connection = Http::new()
.with_executor(LocalExecutor)
- .serve_connection(tcp_stream, deno_service.clone());
+ .serve_connection(tcp_stream, deno_service.clone())
+ .with_upgrades();
+ let conn = Pin::new(Box::new(hyper_connection));
let conn_resource = ConnResource {
- hyper_connection: ConnType::Tcp(Rc::new(RefCell::new(hyper_connection))),
+ hyper_connection: Conn {
+ conn: Rc::new(RefCell::new(conn)),
+ scheme: "http",
+ },
deno_service,
addr,
cancel: CancelHandle::default(),
@@ -326,9 +345,14 @@ fn op_http_start(
let hyper_connection = Http::new()
.with_executor(LocalExecutor)
- .serve_connection(tls_stream, deno_service.clone());
+ .serve_connection(tls_stream, deno_service.clone())
+ .with_upgrades();
+ let conn = Pin::new(Box::new(hyper_connection));
let conn_resource = ConnResource {
- hyper_connection: ConnType::Tls(Rc::new(RefCell::new(hyper_connection))),
+ hyper_connection: Conn {
+ conn: Rc::new(RefCell::new(conn)),
+ scheme: "https",
+ },
deno_service,
addr,
cancel: CancelHandle::default(),
@@ -367,10 +391,12 @@ async fn op_http_response(
.ok()
.expect("multiple op_http_respond ongoing");
+ let conn_rid = response_sender.conn_rid;
+
let conn_resource = state
.borrow()
.resource_table
- .get::<ConnResource>(response_sender.conn_rid)
+ .get::<ConnResource>(conn_rid)
.ok_or_else(bad_resource_id)?;
let mut builder = Response::builder().status(status);
@@ -393,7 +419,7 @@ async fn op_http_response(
let response_body_rid =
state.borrow_mut().resource_table.add(ResponseBodyResource {
body: AsyncRefCell::new(sender),
- conn_rid: response_sender.conn_rid,
+ conn_rid,
});
Some(response_body_rid)
@@ -407,7 +433,10 @@ async fn op_http_response(
}
poll_fn(|cx| match conn_resource.poll(cx) {
- Poll::Ready(x) => Poll::Ready(x),
+ Poll::Ready(x) => {
+ state.borrow_mut().resource_table.close(conn_rid);
+ Poll::Ready(x)
+ }
Poll::Pending => Poll::Ready(Ok(())),
})
.await?;
@@ -455,7 +484,7 @@ async fn op_http_request_read(
let resource = state
.borrow()
.resource_table
- .get::<RequestBodyResource>(rid as u32)
+ .get::<RequestResource>(rid as u32)
.ok_or_else(bad_resource_id)?;
let conn_resource = state
@@ -464,8 +493,26 @@ async fn op_http_request_read(
.get::<ConnResource>(resource.conn_rid)
.ok_or_else(bad_resource_id)?;
- let mut reader = RcRef::map(&resource, |r| &r.reader).borrow_mut().await;
+ let mut inner = RcRef::map(resource.clone(), |r| &r.inner)
+ .borrow_mut()
+ .await;
+
+ if let RequestOrStreamReader::Request(req) = &mut *inner {
+ let req = req.take().unwrap();
+ let stream: BytesStream = Box::pin(req.into_body().map(|r| {
+ r.map_err(|err| std::io::Error::new(std::io::ErrorKind::Other, err))
+ }));
+ let reader = StreamReader::new(stream);
+ *inner = RequestOrStreamReader::StreamReader(reader);
+ };
+
+ let reader = match &mut *inner {
+ RequestOrStreamReader::StreamReader(reader) => reader,
+ _ => unreachable!(),
+ };
+
let cancel = RcRef::map(resource, |r| &r.cancel);
+
let mut read_fut = reader.read(&mut data).try_or_cancel(cancel).boxed_local();
poll_fn(|cx| {
@@ -521,18 +568,77 @@ async fn op_http_response_write(
Ok(())
}
+fn op_http_websocket_accept_header(
+ _: &mut OpState,
+ key: String,
+ _: (),
+) -> Result<String, AnyError> {
+ let digest = ring::digest::digest(
+ &ring::digest::SHA1_FOR_LEGACY_USE_ONLY,
+ format!("{}258EAFA5-E914-47DA-95CA-C5AB0DC85B11", key).as_bytes(),
+ );
+ Ok(base64::encode(digest))
+}
+
+async fn op_http_upgrade_websocket(
+ state: Rc<RefCell<OpState>>,
+ rid: ResourceId,
+ _: (),
+) -> Result<ResourceId, AnyError> {
+ let req_resource = state
+ .borrow_mut()
+ .resource_table
+ .take::<RequestResource>(rid)
+ .ok_or_else(bad_resource_id)?;
+
+ let mut inner = RcRef::map(&req_resource, |r| &r.inner).borrow_mut().await;
+
+ if let RequestOrStreamReader::Request(req) = inner.as_mut() {
+ let upgraded = hyper::upgrade::on(req.as_mut().unwrap()).await?;
+ let stream =
+ deno_websocket::tokio_tungstenite::WebSocketStream::from_raw_socket(
+ upgraded,
+ deno_websocket::tokio_tungstenite::tungstenite::protocol::Role::Server,
+ None,
+ )
+ .await;
+
+ let (ws_tx, ws_rx) = stream.split();
+ let rid =
+ state
+ .borrow_mut()
+ .resource_table
+ .add(deno_websocket::WsStreamResource {
+ stream: deno_websocket::WebSocketStreamType::Server {
+ rx: AsyncRefCell::new(ws_rx),
+ tx: AsyncRefCell::new(ws_tx),
+ },
+ cancel: Default::default(),
+ });
+
+ Ok(rid)
+ } else {
+ Err(bad_resource_id())
+ }
+}
+
type BytesStream =
Pin<Box<dyn Stream<Item = std::io::Result<bytes::Bytes>> + Unpin>>;
-struct RequestBodyResource {
+enum RequestOrStreamReader {
+ Request(Option<Request<hyper::Body>>),
+ StreamReader(StreamReader<BytesStream, bytes::Bytes>),
+}
+
+struct RequestResource {
conn_rid: ResourceId,
- reader: AsyncRefCell<StreamReader<BytesStream, bytes::Bytes>>,
+ inner: AsyncRefCell<RequestOrStreamReader>,
cancel: CancelHandle,
}
-impl Resource for RequestBodyResource {
+impl Resource for RequestResource {
fn name(&self) -> Cow<str> {
- "requestBody".into()
+ "request".into()
}
fn close(self: Rc<Self>) {