summaryrefslogtreecommitdiff
path: root/runtime/ops/http.rs
diff options
context:
space:
mode:
authorBartek IwaƄczuk <biwanczuk@gmail.com>2021-06-29 01:43:03 +0200
committerGitHub <noreply@github.com>2021-06-29 01:43:03 +0200
commit38a7128cdd6f3308ba3c13cfb0b0d4ef925a44c3 (patch)
tree8f0c86028d9ba0266f1846e7f3611f7049cb43a8 /runtime/ops/http.rs
parent30cba2484815f712502ae8937a25afa13aba0818 (diff)
feat: Add "deno_net" extension (#11150)
This commits moves implementation of net related APIs available on "Deno" namespace to "deno_net" extension. Following APIs were moved: - Deno.listen() - Deno.connect() - Deno.listenTls() - Deno.serveHttp() - Deno.shutdown() - Deno.resolveDns() - Deno.listenDatagram() - Deno.startTls() - Deno.Conn - Deno.Listener - Deno.DatagramConn
Diffstat (limited to 'runtime/ops/http.rs')
-rw-r--r--runtime/ops/http.rs579
1 files changed, 0 insertions, 579 deletions
diff --git a/runtime/ops/http.rs b/runtime/ops/http.rs
deleted file mode 100644
index 01658c802..000000000
--- a/runtime/ops/http.rs
+++ /dev/null
@@ -1,579 +0,0 @@
-// Copyright 2018-2021 the Deno authors. All rights reserved. MIT license.
-
-use crate::ops::io::TcpStreamResource;
-use crate::ops::io::TlsStreamResource;
-use crate::ops::tls::TlsStream;
-use deno_core::error::bad_resource_id;
-use deno_core::error::null_opbuf;
-use deno_core::error::type_error;
-use deno_core::error::AnyError;
-use deno_core::futures::future::poll_fn;
-use deno_core::futures::FutureExt;
-use deno_core::futures::Stream;
-use deno_core::futures::StreamExt;
-use deno_core::op_async;
-use deno_core::op_sync;
-use deno_core::AsyncRefCell;
-use deno_core::ByteString;
-use deno_core::CancelHandle;
-use deno_core::CancelTryFuture;
-use deno_core::Extension;
-use deno_core::OpState;
-use deno_core::RcRef;
-use deno_core::Resource;
-use deno_core::ResourceId;
-use deno_core::ZeroCopyBuf;
-use hyper::body::HttpBody;
-use hyper::http;
-use hyper::server::conn::Connection;
-use hyper::server::conn::Http;
-use hyper::service::Service as HyperService;
-use hyper::Body;
-use hyper::Request;
-use hyper::Response;
-use serde::Deserialize;
-use serde::Serialize;
-use std::borrow::Cow;
-use std::cell::RefCell;
-use std::future::Future;
-use std::net::SocketAddr;
-use std::pin::Pin;
-use std::rc::Rc;
-use std::task::Context;
-use std::task::Poll;
-use tokio::io::AsyncReadExt;
-use tokio::net::TcpStream;
-use tokio::sync::oneshot;
-use tokio_util::io::StreamReader;
-
-pub fn init() -> Extension {
- Extension::builder()
- .ops(vec![
- ("op_http_start", op_sync(op_http_start)),
- ("op_http_request_next", op_async(op_http_request_next)),
- ("op_http_request_read", op_async(op_http_request_read)),
- ("op_http_response", op_async(op_http_response)),
- ("op_http_response_write", op_async(op_http_response_write)),
- ("op_http_response_close", op_async(op_http_response_close)),
- ])
- .build()
-}
-
-struct ServiceInner {
- request: Request<Body>,
- response_tx: oneshot::Sender<Response<Body>>,
-}
-
-#[derive(Clone, Default)]
-struct Service {
- inner: Rc<RefCell<Option<ServiceInner>>>,
- waker: Rc<deno_core::futures::task::AtomicWaker>,
-}
-
-impl HyperService<Request<Body>> for Service {
- type Response = Response<Body>;
- type Error = http::Error;
- #[allow(clippy::type_complexity)]
- type Future =
- Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>>>>;
-
- fn poll_ready(
- &mut self,
- _cx: &mut Context<'_>,
- ) -> Poll<Result<(), Self::Error>> {
- if self.inner.borrow().is_some() {
- Poll::Pending
- } else {
- Poll::Ready(Ok(()))
- }
- }
-
- fn call(&mut self, req: Request<Body>) -> Self::Future {
- let (resp_tx, resp_rx) = oneshot::channel();
- self.inner.borrow_mut().replace(ServiceInner {
- request: req,
- response_tx: resp_tx,
- });
-
- async move { Ok(resp_rx.await.unwrap()) }.boxed_local()
- }
-}
-
-enum ConnType {
- Tcp(Rc<RefCell<Connection<TcpStream, Service, LocalExecutor>>>),
- Tls(Rc<RefCell<Connection<TlsStream, Service, LocalExecutor>>>),
-}
-
-struct ConnResource {
- hyper_connection: ConnType,
- deno_service: Service,
- addr: SocketAddr,
- cancel: CancelHandle,
-}
-
-impl ConnResource {
- // TODO(ry) impl Future for ConnResource?
- fn poll(&self, cx: &mut Context<'_>) -> Poll<Result<(), AnyError>> {
- match &self.hyper_connection {
- ConnType::Tcp(c) => c.borrow_mut().poll_unpin(cx),
- ConnType::Tls(c) => c.borrow_mut().poll_unpin(cx),
- }
- .map_err(AnyError::from)
- }
-}
-
-impl Resource for ConnResource {
- fn name(&self) -> Cow<str> {
- "httpConnection".into()
- }
-
- fn close(self: Rc<Self>) {
- self.cancel.cancel()
- }
-}
-
-// We use a tuple instead of struct to avoid serialization overhead of the keys.
-#[derive(Serialize)]
-#[serde(rename_all = "camelCase")]
-struct NextRequestResponse(
- // request_body_rid:
- Option<ResourceId>,
- // response_sender_rid:
- ResourceId,
- // method:
- // This is a String rather than a ByteString because reqwest will only return
- // the method as a str which is guaranteed to be ASCII-only.
- String,
- // headers:
- Vec<(ByteString, ByteString)>,
- // url:
- String,
-);
-
-async fn op_http_request_next(
- state: Rc<RefCell<OpState>>,
- conn_rid: ResourceId,
- _: (),
-) -> Result<Option<NextRequestResponse>, AnyError> {
- let conn_resource = state
- .borrow()
- .resource_table
- .get::<ConnResource>(conn_rid)
- .ok_or_else(bad_resource_id)?;
-
- let cancel = RcRef::map(conn_resource.clone(), |r| &r.cancel);
-
- poll_fn(|cx| {
- conn_resource.deno_service.waker.register(cx.waker());
- let connection_closed = match conn_resource.poll(cx) {
- Poll::Pending => false,
- Poll::Ready(Ok(())) => {
- // try to close ConnResource, but don't unwrap as it might
- // already be closed
- let _ = state
- .borrow_mut()
- .resource_table
- .take::<ConnResource>(conn_rid);
- true
- }
- Poll::Ready(Err(e)) => {
- // TODO(ry) close RequestResource associated with connection
- // TODO(ry) close ResponseBodyResource associated with connection
- // close ConnResource
- state
- .borrow_mut()
- .resource_table
- .take::<ConnResource>(conn_rid)
- .unwrap();
-
- if should_ignore_error(&e) {
- true
- } else {
- return Poll::Ready(Err(e));
- }
- }
- };
- if let Some(request_resource) =
- conn_resource.deno_service.inner.borrow_mut().take()
- {
- let tx = request_resource.response_tx;
- let req = request_resource.request;
- let method = req.method().to_string();
-
- let mut headers = Vec::with_capacity(req.headers().len());
- for (name, value) in req.headers().iter() {
- let name: &[u8] = name.as_ref();
- let value = value.as_bytes();
- headers
- .push((ByteString(name.to_owned()), ByteString(value.to_owned())));
- }
-
- let url = {
- let scheme = {
- match conn_resource.hyper_connection {
- ConnType::Tcp(_) => "http",
- ConnType::Tls(_) => "https",
- }
- };
- let host: Cow<str> = if let Some(host) = req.uri().host() {
- Cow::Borrowed(host)
- } else if let Some(host) = req.headers().get("HOST") {
- Cow::Borrowed(host.to_str()?)
- } else {
- Cow::Owned(conn_resource.addr.to_string())
- };
- let path = req.uri().path_and_query().map_or("/", |p| p.as_str());
- format!("{}://{}{}", scheme, host, path)
- };
-
- let has_body = if let Some(exact_size) = req.size_hint().exact() {
- exact_size > 0
- } else {
- true
- };
-
- let maybe_request_body_rid = if has_body {
- let stream: BytesStream = Box::pin(req.into_body().map(|r| {
- r.map_err(|err| std::io::Error::new(std::io::ErrorKind::Other, err))
- }));
- let stream_reader = StreamReader::new(stream);
- let mut state = state.borrow_mut();
- let request_body_rid = state.resource_table.add(RequestBodyResource {
- conn_rid,
- reader: AsyncRefCell::new(stream_reader),
- cancel: CancelHandle::default(),
- });
- Some(request_body_rid)
- } else {
- None
- };
-
- let mut state = state.borrow_mut();
- let response_sender_rid =
- state.resource_table.add(ResponseSenderResource {
- sender: tx,
- conn_rid,
- });
-
- Poll::Ready(Ok(Some(NextRequestResponse(
- maybe_request_body_rid,
- response_sender_rid,
- method,
- headers,
- url,
- ))))
- } else if connection_closed {
- Poll::Ready(Ok(None))
- } else {
- Poll::Pending
- }
- })
- .try_or_cancel(cancel)
- .await
- .map_err(AnyError::from)
-}
-
-fn should_ignore_error(e: &AnyError) -> bool {
- if let Some(e) = e.downcast_ref::<hyper::Error>() {
- use std::error::Error;
- if let Some(std_err) = e.source() {
- if let Some(io_err) = std_err.downcast_ref::<std::io::Error>() {
- if io_err.kind() == std::io::ErrorKind::NotConnected {
- return true;
- }
- }
- }
- }
- false
-}
-
-fn op_http_start(
- state: &mut OpState,
- tcp_stream_rid: ResourceId,
- _: (),
-) -> Result<ResourceId, AnyError> {
- let deno_service = Service::default();
-
- if let Some(resource_rc) = state
- .resource_table
- .take::<TcpStreamResource>(tcp_stream_rid)
- {
- let resource = Rc::try_unwrap(resource_rc)
- .expect("Only a single use of this resource should happen");
- let (read_half, write_half) = resource.into_inner();
- let tcp_stream = read_half.reunite(write_half)?;
- let addr = tcp_stream.local_addr()?;
- let hyper_connection = Http::new()
- .with_executor(LocalExecutor)
- .serve_connection(tcp_stream, deno_service.clone());
- let conn_resource = ConnResource {
- hyper_connection: ConnType::Tcp(Rc::new(RefCell::new(hyper_connection))),
- deno_service,
- addr,
- cancel: CancelHandle::default(),
- };
- let rid = state.resource_table.add(conn_resource);
- return Ok(rid);
- }
-
- if let Some(resource_rc) = state
- .resource_table
- .take::<TlsStreamResource>(tcp_stream_rid)
- {
- let resource = Rc::try_unwrap(resource_rc)
- .expect("Only a single use of this resource should happen");
- let (read_half, write_half) = resource.into_inner();
- let tls_stream = read_half.reunite(write_half);
- let addr = tls_stream.get_ref().0.local_addr()?;
-
- let hyper_connection = Http::new()
- .with_executor(LocalExecutor)
- .serve_connection(tls_stream, deno_service.clone());
- let conn_resource = ConnResource {
- hyper_connection: ConnType::Tls(Rc::new(RefCell::new(hyper_connection))),
- deno_service,
- addr,
- cancel: CancelHandle::default(),
- };
- let rid = state.resource_table.add(conn_resource);
- return Ok(rid);
- }
-
- Err(bad_resource_id())
-}
-
-// We use a tuple instead of struct to avoid serialization overhead of the keys.
-#[derive(Deserialize)]
-struct RespondArgs(
- // rid:
- u32,
- // status:
- u16,
- // headers:
- Vec<(ByteString, ByteString)>,
-);
-
-async fn op_http_response(
- state: Rc<RefCell<OpState>>,
- args: RespondArgs,
- data: Option<ZeroCopyBuf>,
-) -> Result<Option<ResourceId>, AnyError> {
- let RespondArgs(rid, status, headers) = args;
-
- let response_sender = state
- .borrow_mut()
- .resource_table
- .take::<ResponseSenderResource>(rid)
- .ok_or_else(bad_resource_id)?;
- let response_sender = Rc::try_unwrap(response_sender)
- .ok()
- .expect("multiple op_http_respond ongoing");
-
- let conn_resource = state
- .borrow()
- .resource_table
- .get::<ConnResource>(response_sender.conn_rid)
- .ok_or_else(bad_resource_id)?;
-
- let mut builder = Response::builder().status(status);
-
- builder.headers_mut().unwrap().reserve(headers.len());
- for (key, value) in &headers {
- builder = builder.header(key.as_ref(), value.as_ref());
- }
-
- let res;
- let maybe_response_body_rid = if let Some(d) = data {
- // If a body is passed, we use it, and don't return a body for streaming.
- res = builder.body(Vec::from(&*d).into())?;
- None
- } else {
- // If no body is passed, we return a writer for streaming the body.
- let (sender, body) = Body::channel();
- res = builder.body(body)?;
-
- let response_body_rid =
- state.borrow_mut().resource_table.add(ResponseBodyResource {
- body: AsyncRefCell::new(sender),
- conn_rid: response_sender.conn_rid,
- });
-
- Some(response_body_rid)
- };
-
- // oneshot::Sender::send(v) returns |v| on error, not an error object.
- // The only failure mode is the receiver already having dropped its end
- // of the channel.
- if response_sender.sender.send(res).is_err() {
- return Err(type_error("internal communication error"));
- }
-
- poll_fn(|cx| match conn_resource.poll(cx) {
- Poll::Ready(x) => Poll::Ready(x),
- Poll::Pending => Poll::Ready(Ok(())),
- })
- .await?;
-
- if maybe_response_body_rid.is_none() {
- conn_resource.deno_service.waker.wake();
- }
- Ok(maybe_response_body_rid)
-}
-
-async fn op_http_response_close(
- state: Rc<RefCell<OpState>>,
- rid: ResourceId,
- _: (),
-) -> Result<(), AnyError> {
- let resource = state
- .borrow_mut()
- .resource_table
- .take::<ResponseBodyResource>(rid)
- .ok_or_else(bad_resource_id)?;
-
- let conn_resource = state
- .borrow()
- .resource_table
- .get::<ConnResource>(resource.conn_rid)
- .ok_or_else(bad_resource_id)?;
- drop(resource);
-
- let r = poll_fn(|cx| match conn_resource.poll(cx) {
- Poll::Ready(x) => Poll::Ready(x),
- Poll::Pending => Poll::Ready(Ok(())),
- })
- .await;
- conn_resource.deno_service.waker.wake();
- r
-}
-
-async fn op_http_request_read(
- state: Rc<RefCell<OpState>>,
- rid: ResourceId,
- data: Option<ZeroCopyBuf>,
-) -> Result<usize, AnyError> {
- let mut data = data.ok_or_else(null_opbuf)?;
-
- let resource = state
- .borrow()
- .resource_table
- .get::<RequestBodyResource>(rid as u32)
- .ok_or_else(bad_resource_id)?;
-
- let conn_resource = state
- .borrow()
- .resource_table
- .get::<ConnResource>(resource.conn_rid)
- .ok_or_else(bad_resource_id)?;
-
- let mut reader = RcRef::map(&resource, |r| &r.reader).borrow_mut().await;
- let cancel = RcRef::map(resource, |r| &r.cancel);
- let mut read_fut = reader.read(&mut data).try_or_cancel(cancel).boxed_local();
-
- poll_fn(|cx| {
- if let Poll::Ready(Err(e)) = conn_resource.poll(cx) {
- // close ConnResource
- // close RequestResource associated with connection
- // close ResponseBodyResource associated with connection
- return Poll::Ready(Err(e));
- }
-
- read_fut.poll_unpin(cx).map_err(AnyError::from)
- })
- .await
-}
-
-async fn op_http_response_write(
- state: Rc<RefCell<OpState>>,
- rid: ResourceId,
- data: Option<ZeroCopyBuf>,
-) -> Result<(), AnyError> {
- let buf = data.ok_or_else(null_opbuf)?;
- let resource = state
- .borrow()
- .resource_table
- .get::<ResponseBodyResource>(rid as u32)
- .ok_or_else(bad_resource_id)?;
-
- let conn_resource = state
- .borrow()
- .resource_table
- .get::<ConnResource>(resource.conn_rid)
- .ok_or_else(bad_resource_id)?;
-
- let mut body = RcRef::map(&resource, |r| &r.body).borrow_mut().await;
-
- let mut send_data_fut = body.send_data(Vec::from(&*buf).into()).boxed_local();
-
- poll_fn(|cx| {
- let r = send_data_fut.poll_unpin(cx).map_err(AnyError::from);
-
- // Poll connection so the data is flushed
- if let Poll::Ready(Err(e)) = conn_resource.poll(cx) {
- // close ConnResource
- // close RequestResource associated with connection
- // close ResponseBodyResource associated with connection
- return Poll::Ready(Err(e));
- }
-
- r
- })
- .await?;
-
- Ok(())
-}
-
-type BytesStream =
- Pin<Box<dyn Stream<Item = std::io::Result<bytes::Bytes>> + Unpin>>;
-
-struct RequestBodyResource {
- conn_rid: ResourceId,
- reader: AsyncRefCell<StreamReader<BytesStream, bytes::Bytes>>,
- cancel: CancelHandle,
-}
-
-impl Resource for RequestBodyResource {
- fn name(&self) -> Cow<str> {
- "requestBody".into()
- }
-
- fn close(self: Rc<Self>) {
- self.cancel.cancel()
- }
-}
-
-struct ResponseSenderResource {
- sender: oneshot::Sender<Response<Body>>,
- conn_rid: ResourceId,
-}
-
-impl Resource for ResponseSenderResource {
- fn name(&self) -> Cow<str> {
- "responseSender".into()
- }
-}
-
-struct ResponseBodyResource {
- body: AsyncRefCell<hyper::body::Sender>,
- conn_rid: ResourceId,
-}
-
-impl Resource for ResponseBodyResource {
- fn name(&self) -> Cow<str> {
- "responseBody".into()
- }
-}
-
-// Needed so hyper can use non Send futures
-#[derive(Clone)]
-struct LocalExecutor;
-
-impl<Fut> hyper::rt::Executor<Fut> for LocalExecutor
-where
- Fut: Future + 'static,
- Fut::Output: 'static,
-{
- fn execute(&self, fut: Fut) {
- tokio::task::spawn_local(fut);
- }
-}