diff options
-rw-r--r-- | cli/ops/net.rs | 122 | ||||
-rw-r--r-- | cli/ops/process.rs | 45 | ||||
-rw-r--r-- | cli/ops/tls.rs | 80 |
3 files changed, 72 insertions, 175 deletions
diff --git a/cli/ops/net.rs b/cli/ops/net.rs index 2b3638fdb..0802e232d 100644 --- a/cli/ops/net.rs +++ b/cli/ops/net.rs @@ -5,13 +5,12 @@ use crate::op_error::OpError; use crate::resolve_addr::resolve_addr; use crate::state::State; use deno_core::*; +use futures::future::poll_fn; use futures::future::FutureExt; use std; use std::convert::From; -use std::future::Future; use std::net::Shutdown; use std::net::SocketAddr; -use std::pin::Pin; use std::task::Context; use std::task::Poll; use tokio; @@ -28,55 +27,6 @@ pub fn init(i: &mut Isolate, s: &State) { i.register_op("op_send", s.stateful_json_op(op_send)); } -#[derive(Debug, PartialEq)] -enum AcceptState { - Pending, - Done, -} - -/// A future representing state of accepting a TCP connection. -pub struct Accept<'a> { - accept_state: AcceptState, - rid: ResourceId, - state: &'a State, -} - -impl Future for Accept<'_> { - type Output = Result<(TcpStream, SocketAddr), OpError>; - - fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> { - let inner = self.get_mut(); - if inner.accept_state == AcceptState::Done { - panic!("poll Accept after it's done"); - } - - let mut state = inner.state.borrow_mut(); - let listener_resource = state - .resource_table - .get_mut::<TcpListenerResource>(inner.rid) - .ok_or_else(|| OpError::other("Listener has been closed".to_string()))?; - - let listener = &mut listener_resource.listener; - - match listener.poll_accept(cx).map_err(OpError::from) { - Poll::Ready(Ok((stream, addr))) => { - listener_resource.untrack_task(); - inner.accept_state = AcceptState::Done; - Poll::Ready(Ok((stream, addr))) - } - Poll::Pending => { - listener_resource.track_task(cx)?; - Poll::Pending - } - Poll::Ready(Err(e)) => { - listener_resource.untrack_task(); - inner.accept_state = AcceptState::Done; - Poll::Ready(Err(e)) - } - } - } -} - #[derive(Deserialize)] struct AcceptArgs { rid: i32, @@ -98,12 +48,32 @@ fn op_accept( .ok_or_else(OpError::bad_resource)?; } + let state = state.clone(); + let op = async move { - let accept_fut = Accept { - accept_state: AcceptState::Pending, - rid, - state: &state_, - }; + let accept_fut = poll_fn(|cx| { + let resource_table = &mut state.borrow_mut().resource_table; + let listener_resource = resource_table + .get_mut::<TcpListenerResource>(rid) + .ok_or_else(|| { + OpError::other("Listener has been closed".to_string()) + })?; + let listener = &mut listener_resource.listener; + match listener.poll_accept(cx).map_err(OpError::from) { + Poll::Ready(Ok((stream, addr))) => { + listener_resource.untrack_task(); + Poll::Ready(Ok((stream, addr))) + } + Poll::Pending => { + listener_resource.track_task(cx)?; + Poll::Pending + } + Poll::Ready(Err(e)) => { + listener_resource.untrack_task(); + Poll::Ready(Err(e)) + } + } + }); let (tcp_stream, _socket_addr) = accept_fut.await?; let local_addr = tcp_stream.local_addr()?; let remote_addr = tcp_stream.peer_addr()?; @@ -129,31 +99,6 @@ fn op_accept( Ok(JsonOp::Async(op.boxed_local())) } -pub struct Receive<'a> { - state: &'a State, - rid: ResourceId, - buf: ZeroCopyBuf, -} - -impl Future for Receive<'_> { - type Output = Result<(usize, SocketAddr), OpError>; - - fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> { - let inner = self.get_mut(); - let mut state = inner.state.borrow_mut(); - let resource = state - .resource_table - .get_mut::<UdpSocketResource>(inner.rid) - .ok_or_else(|| OpError::other("Socket has been closed".to_string()))?; - - let socket = &mut resource.socket; - - socket - .poll_recv_from(cx, &mut inner.buf) - .map_err(OpError::from) - } -} - #[derive(Deserialize)] struct ReceiveArgs { rid: i32, @@ -165,7 +110,7 @@ fn op_receive( zero_copy: Option<ZeroCopyBuf>, ) -> Result<JsonOp, OpError> { assert!(zero_copy.is_some()); - let buf = zero_copy.unwrap(); + let mut buf = zero_copy.unwrap(); let args: ReceiveArgs = serde_json::from_value(args)?; let rid = args.rid as u32; @@ -173,11 +118,14 @@ fn op_receive( let state_ = state.clone(); let op = async move { - let receive_fut = Receive { - state: &state_, - rid, - buf, - }; + let receive_fut = poll_fn(|cx| { + let resource_table = &mut state_.borrow_mut().resource_table; + let resource = resource_table + .get_mut::<UdpSocketResource>(rid) + .ok_or_else(|| OpError::other("Socket has been closed".to_string()))?; + let socket = &mut resource.socket; + socket.poll_recv_from(cx, &mut buf).map_err(OpError::from) + }); let (size, remote_addr) = receive_fut.await?; Ok(json!({ "size": size, diff --git a/cli/ops/process.rs b/cli/ops/process.rs index 82ac25bbe..ad6a022bf 100644 --- a/cli/ops/process.rs +++ b/cli/ops/process.rs @@ -6,15 +6,11 @@ use crate::signal::kill; use crate::state::State; use deno_core::*; use futures; +use futures::future::poll_fn; use futures::future::FutureExt; -use futures::future::TryFutureExt; +use futures::TryFutureExt; use std; use std::convert::From; -use std::future::Future; -use std::pin::Pin; -use std::process::ExitStatus; -use std::task::Context; -use std::task::Poll; use tokio::process::Command; #[cfg(unix)] @@ -172,26 +168,6 @@ fn op_run( }))) } -pub struct ChildStatus { - rid: ResourceId, - state: State, -} - -impl Future for ChildStatus { - type Output = Result<ExitStatus, OpError>; - - fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> { - let inner = self.get_mut(); - let mut state = inner.state.borrow_mut(); - let child_resource = state - .resource_table - .get_mut::<ChildResource>(inner.rid) - .ok_or_else(OpError::bad_resource)?; - let child = &mut child_resource.child; - child.map_err(OpError::from).poll_unpin(cx) - } -} - #[derive(Deserialize)] #[serde(rename_all = "camelCase")] struct RunStatusArgs { @@ -207,14 +183,19 @@ fn op_run_status( let rid = args.rid as u32; state.check_run()?; - - let future = ChildStatus { - rid, - state: state.clone(), - }; + let state = state.clone(); let future = async move { - let run_status = future.await?; + let run_status = poll_fn(|cx| { + let resource_table = &mut state.borrow_mut().resource_table; + let child_resource = resource_table + .get_mut::<ChildResource>(rid) + .ok_or_else(OpError::bad_resource)?; + let child = &mut child_resource.child; + child.map_err(OpError::from).poll_unpin(cx) + }) + .await?; + let code = run_status.code(); #[cfg(unix)] diff --git a/cli/ops/tls.rs b/cli/ops/tls.rs index af507ce6a..5b316804c 100644 --- a/cli/ops/tls.rs +++ b/cli/ops/tls.rs @@ -5,15 +5,14 @@ use crate::op_error::OpError; use crate::resolve_addr::resolve_addr; use crate::state::State; use deno_core::*; +use futures::future::poll_fn; use futures::future::FutureExt; use std; use std::convert::From; use std::fs::File; -use std::future::Future; use std::io::BufReader; use std::net::SocketAddr; use std::path::Path; -use std::pin::Pin; use std::sync::Arc; use std::task::Context; use std::task::Poll; @@ -265,55 +264,6 @@ fn op_listen_tls( }))) } -#[derive(Debug, PartialEq)] -enum AcceptTlsState { - Pending, - Done, -} - -/// A future representing state of accepting a TLS connection. -pub struct AcceptTls { - accept_state: AcceptTlsState, - rid: ResourceId, - state: State, -} - -impl Future for AcceptTls { - type Output = Result<(TcpStream, SocketAddr), OpError>; - - fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> { - let inner = self.get_mut(); - if inner.accept_state == AcceptTlsState::Done { - panic!("poll AcceptTls after it's done"); - } - - let mut state = inner.state.borrow_mut(); - let listener_resource = state - .resource_table - .get_mut::<TlsListenerResource>(inner.rid) - .ok_or_else(|| OpError::other("Listener has been closed".to_string()))?; - - let listener = &mut listener_resource.listener; - - match listener.poll_accept(cx).map_err(OpError::from) { - Poll::Ready(Ok((stream, addr))) => { - listener_resource.untrack_task(); - inner.accept_state = AcceptTlsState::Done; - Poll::Ready(Ok((stream, addr))) - } - Poll::Pending => { - listener_resource.track_task(cx)?; - Poll::Pending - } - Poll::Ready(Err(e)) => { - listener_resource.untrack_task(); - inner.accept_state = AcceptTlsState::Done; - Poll::Ready(Err(e)) - } - } - } -} - #[derive(Deserialize)] struct AcceptTlsArgs { rid: i32, @@ -328,11 +278,29 @@ fn op_accept_tls( let rid = args.rid as u32; let state = state.clone(); let op = async move { - let accept_fut = AcceptTls { - accept_state: AcceptTlsState::Pending, - rid, - state: state.clone(), - }; + let accept_fut = poll_fn(|cx| { + let resource_table = &mut state.borrow_mut().resource_table; + let listener_resource = resource_table + .get_mut::<TlsListenerResource>(rid) + .ok_or_else(|| { + OpError::other("Listener has been closed".to_string()) + })?; + let listener = &mut listener_resource.listener; + match listener.poll_accept(cx).map_err(OpError::from) { + Poll::Ready(Ok((stream, addr))) => { + listener_resource.untrack_task(); + Poll::Ready(Ok((stream, addr))) + } + Poll::Pending => { + listener_resource.track_task(cx)?; + Poll::Pending + } + Poll::Ready(Err(e)) => { + listener_resource.untrack_task(); + Poll::Ready(Err(e)) + } + } + }); let (tcp_stream, _socket_addr) = accept_fut.await?; let local_addr = tcp_stream.local_addr()?; let remote_addr = tcp_stream.peer_addr()?; |