diff options
Diffstat (limited to 'cli/ops/net.rs')
-rw-r--r-- | cli/ops/net.rs | 166 |
1 files changed, 159 insertions, 7 deletions
diff --git a/cli/ops/net.rs b/cli/ops/net.rs index 1c5aa6edd..d603b746b 100644 --- a/cli/ops/net.rs +++ b/cli/ops/net.rs @@ -1,16 +1,20 @@ // Copyright 2018-2019 the Deno authors. All rights reserved. MIT license. use super::dispatch_json::{Deserialize, JsonOp, Value}; +use crate::deno_error::bad_resource; use crate::ops::json_op; use crate::resolve_addr::resolve_addr; use crate::resources; +use crate::resources::CoreResource; use crate::resources::Resource; use crate::state::ThreadSafeState; -use crate::tokio_util; use deno::*; +use futures::Async; use futures::Future; +use futures::Poll; use std; use std::convert::From; use std::net::Shutdown; +use std::net::SocketAddr; use tokio; use tokio::net::TcpListener; use tokio::net::TcpStream; @@ -22,6 +26,90 @@ pub fn init(i: &mut Isolate, s: &ThreadSafeState) { i.register_op("listen", s.core_op(json_op(s.stateful_op(op_listen)))); } +#[derive(Debug, PartialEq)] +enum AcceptState { + Eager, + Pending, + Done, +} + +/// Simply accepts a connection. +pub fn accept(rid: ResourceId) -> Accept { + Accept { + state: AcceptState::Eager, + rid, + } +} + +/// A future representing state of accepting a TCP connection. +#[derive(Debug)] +pub struct Accept { + state: AcceptState, + rid: ResourceId, +} + +impl Future for Accept { + type Item = (TcpStream, SocketAddr); + type Error = ErrBox; + + fn poll(&mut self) -> Poll<Self::Item, Self::Error> { + if self.state == AcceptState::Done { + panic!("poll Accept after it's done"); + } + + let mut table = resources::lock_resource_table(); + let listener_resource = table + .get_mut::<TcpListenerResource>(self.rid) + .ok_or_else(|| { + let e = std::io::Error::new( + std::io::ErrorKind::Other, + "Listener has been closed", + ); + ErrBox::from(e) + })?; + + let listener = &mut listener_resource.listener; + + if self.state == AcceptState::Eager { + // Similar to try_ready!, but also track/untrack accept task + // in TcpListener resource. + // In this way, when the listener is closed, the task can be + // notified to error out (instead of stuck forever). + match listener.poll_accept().map_err(ErrBox::from) { + Ok(Async::Ready((stream, addr))) => { + self.state = AcceptState::Done; + return Ok((stream, addr).into()); + } + Ok(Async::NotReady) => { + self.state = AcceptState::Pending; + return Ok(Async::NotReady); + } + Err(e) => { + self.state = AcceptState::Done; + return Err(e); + } + } + } + + match listener.poll_accept().map_err(ErrBox::from) { + Ok(Async::Ready((stream, addr))) => { + listener_resource.untrack_task(); + self.state = AcceptState::Done; + Ok((stream, addr).into()) + } + Ok(Async::NotReady) => { + listener_resource.track_task()?; + Ok(Async::NotReady) + } + Err(e) => { + listener_resource.untrack_task(); + self.state = AcceptState::Done; + Err(e) + } + } + } +} + #[derive(Deserialize)] struct AcceptArgs { rid: i32, @@ -33,10 +121,14 @@ fn op_accept( _zero_copy: Option<PinnedBuf>, ) -> Result<JsonOp, ErrBox> { let args: AcceptArgs = serde_json::from_value(args)?; - let server_rid = args.rid as u32; + let rid = args.rid as u32; + + let table = resources::lock_resource_table(); + table + .get::<TcpListenerResource>(rid) + .ok_or_else(bad_resource)?; - let server_resource = resources::lookup(server_rid)?; - let op = tokio_util::accept(server_resource) + let op = accept(rid) .and_then(move |(tcp_stream, _socket_addr)| { let local_addr = tcp_stream.local_addr()?; let remote_addr = tcp_stream.peer_addr()?; @@ -129,6 +221,59 @@ struct ListenArgs { port: u16, } +#[allow(dead_code)] +struct TcpListenerResource { + listener: tokio::net::TcpListener, + task: Option<futures::task::Task>, + local_addr: SocketAddr, +} + +impl CoreResource for TcpListenerResource {} + +impl Drop for TcpListenerResource { + fn drop(&mut self) { + self.notify_task(); + } +} + +impl TcpListenerResource { + /// Track the current task so future awaiting for connection + /// can be notified when listener is closed. + /// + /// Throws an error if another task is already tracked. + pub fn track_task(&mut self) -> Result<(), ErrBox> { + // Currently, we only allow tracking a single accept task for a listener. + // This might be changed in the future with multiple workers. + // Caveat: TcpListener by itself also only tracks an accept task at a time. + // See https://github.com/tokio-rs/tokio/issues/846#issuecomment-454208883 + if self.task.is_some() { + let e = std::io::Error::new( + std::io::ErrorKind::Other, + "Another accept task is ongoing", + ); + return Err(ErrBox::from(e)); + } + + self.task.replace(futures::task::current()); + Ok(()) + } + + /// Notifies a task when listener is closed so accept future can resolve. + pub fn notify_task(&mut self) { + if let Some(task) = self.task.take() { + task.notify(); + } + } + + /// Stop tracking a task. + /// Happens when the task is done and thus no further tracking is needed. + pub fn untrack_task(&mut self) { + if self.task.is_some() { + self.task.take(); + } + } +} + fn op_listen( state: &ThreadSafeState, args: Value, @@ -142,10 +287,17 @@ fn op_listen( let addr = resolve_addr(&args.hostname, args.port).wait()?; let listener = TcpListener::bind(&addr)?; let local_addr = listener.local_addr()?; - let resource = resources::add_tcp_listener(listener); + let local_addr_str = local_addr.to_string(); + let listener_resource = TcpListenerResource { + listener, + task: None, + local_addr, + }; + let mut table = resources::lock_resource_table(); + let rid = table.add("tcpListener", Box::new(listener_resource)); Ok(JsonOp::Sync(json!({ - "rid": resource.rid, - "localAddr": local_addr.to_string() + "rid": rid, + "localAddr": local_addr_str, }))) } |