diff options
author | Bartek IwaĆczuk <biwanczuk@gmail.com> | 2020-02-27 21:08:21 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-02-27 21:08:21 +0100 |
commit | fa5f3aa600311000cee0a4c794f85d48e6397362 (patch) | |
tree | c7219b232716c5bd850c4e19708d89d0d9739e41 /cli/ops/net.rs | |
parent | ff4b7b0921760f97e83ec34435f875e074f3d069 (diff) |
replace impl Future with poll_fn for net.rs, process.rs, tls.rs (#4158)
Diffstat (limited to 'cli/ops/net.rs')
-rw-r--r-- | cli/ops/net.rs | 122 |
1 files changed, 35 insertions, 87 deletions
diff --git a/cli/ops/net.rs b/cli/ops/net.rs index 2b3638fdb..0802e232d 100644 --- a/cli/ops/net.rs +++ b/cli/ops/net.rs @@ -5,13 +5,12 @@ use crate::op_error::OpError; use crate::resolve_addr::resolve_addr; use crate::state::State; use deno_core::*; +use futures::future::poll_fn; use futures::future::FutureExt; use std; use std::convert::From; -use std::future::Future; use std::net::Shutdown; use std::net::SocketAddr; -use std::pin::Pin; use std::task::Context; use std::task::Poll; use tokio; @@ -28,55 +27,6 @@ pub fn init(i: &mut Isolate, s: &State) { i.register_op("op_send", s.stateful_json_op(op_send)); } -#[derive(Debug, PartialEq)] -enum AcceptState { - Pending, - Done, -} - -/// A future representing state of accepting a TCP connection. -pub struct Accept<'a> { - accept_state: AcceptState, - rid: ResourceId, - state: &'a State, -} - -impl Future for Accept<'_> { - type Output = Result<(TcpStream, SocketAddr), OpError>; - - fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> { - let inner = self.get_mut(); - if inner.accept_state == AcceptState::Done { - panic!("poll Accept after it's done"); - } - - let mut state = inner.state.borrow_mut(); - let listener_resource = state - .resource_table - .get_mut::<TcpListenerResource>(inner.rid) - .ok_or_else(|| OpError::other("Listener has been closed".to_string()))?; - - let listener = &mut listener_resource.listener; - - match listener.poll_accept(cx).map_err(OpError::from) { - Poll::Ready(Ok((stream, addr))) => { - listener_resource.untrack_task(); - inner.accept_state = AcceptState::Done; - Poll::Ready(Ok((stream, addr))) - } - Poll::Pending => { - listener_resource.track_task(cx)?; - Poll::Pending - } - Poll::Ready(Err(e)) => { - listener_resource.untrack_task(); - inner.accept_state = AcceptState::Done; - Poll::Ready(Err(e)) - } - } - } -} - #[derive(Deserialize)] struct AcceptArgs { rid: i32, @@ -98,12 +48,32 @@ fn op_accept( .ok_or_else(OpError::bad_resource)?; } + let state = state.clone(); + let op = async move { - let accept_fut = Accept { - accept_state: AcceptState::Pending, - rid, - state: &state_, - }; + let accept_fut = poll_fn(|cx| { + let resource_table = &mut state.borrow_mut().resource_table; + let listener_resource = resource_table + .get_mut::<TcpListenerResource>(rid) + .ok_or_else(|| { + OpError::other("Listener has been closed".to_string()) + })?; + let listener = &mut listener_resource.listener; + match listener.poll_accept(cx).map_err(OpError::from) { + Poll::Ready(Ok((stream, addr))) => { + listener_resource.untrack_task(); + Poll::Ready(Ok((stream, addr))) + } + Poll::Pending => { + listener_resource.track_task(cx)?; + Poll::Pending + } + Poll::Ready(Err(e)) => { + listener_resource.untrack_task(); + Poll::Ready(Err(e)) + } + } + }); let (tcp_stream, _socket_addr) = accept_fut.await?; let local_addr = tcp_stream.local_addr()?; let remote_addr = tcp_stream.peer_addr()?; @@ -129,31 +99,6 @@ fn op_accept( Ok(JsonOp::Async(op.boxed_local())) } -pub struct Receive<'a> { - state: &'a State, - rid: ResourceId, - buf: ZeroCopyBuf, -} - -impl Future for Receive<'_> { - type Output = Result<(usize, SocketAddr), OpError>; - - fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> { - let inner = self.get_mut(); - let mut state = inner.state.borrow_mut(); - let resource = state - .resource_table - .get_mut::<UdpSocketResource>(inner.rid) - .ok_or_else(|| OpError::other("Socket has been closed".to_string()))?; - - let socket = &mut resource.socket; - - socket - .poll_recv_from(cx, &mut inner.buf) - .map_err(OpError::from) - } -} - #[derive(Deserialize)] struct ReceiveArgs { rid: i32, @@ -165,7 +110,7 @@ fn op_receive( zero_copy: Option<ZeroCopyBuf>, ) -> Result<JsonOp, OpError> { assert!(zero_copy.is_some()); - let buf = zero_copy.unwrap(); + let mut buf = zero_copy.unwrap(); let args: ReceiveArgs = serde_json::from_value(args)?; let rid = args.rid as u32; @@ -173,11 +118,14 @@ fn op_receive( let state_ = state.clone(); let op = async move { - let receive_fut = Receive { - state: &state_, - rid, - buf, - }; + let receive_fut = poll_fn(|cx| { + let resource_table = &mut state_.borrow_mut().resource_table; + let resource = resource_table + .get_mut::<UdpSocketResource>(rid) + .ok_or_else(|| OpError::other("Socket has been closed".to_string()))?; + let socket = &mut resource.socket; + socket.poll_recv_from(cx, &mut buf).map_err(OpError::from) + }); let (size, remote_addr) = receive_fut.await?; Ok(json!({ "size": size, |