summaryrefslogtreecommitdiff
path: root/cli/ops/net.rs
diff options
context:
space:
mode:
Diffstat (limited to 'cli/ops/net.rs')
-rw-r--r--cli/ops/net.rs166
1 files changed, 159 insertions, 7 deletions
diff --git a/cli/ops/net.rs b/cli/ops/net.rs
index 1c5aa6edd..d603b746b 100644
--- a/cli/ops/net.rs
+++ b/cli/ops/net.rs
@@ -1,16 +1,20 @@
// Copyright 2018-2019 the Deno authors. All rights reserved. MIT license.
use super::dispatch_json::{Deserialize, JsonOp, Value};
+use crate::deno_error::bad_resource;
use crate::ops::json_op;
use crate::resolve_addr::resolve_addr;
use crate::resources;
+use crate::resources::CoreResource;
use crate::resources::Resource;
use crate::state::ThreadSafeState;
-use crate::tokio_util;
use deno::*;
+use futures::Async;
use futures::Future;
+use futures::Poll;
use std;
use std::convert::From;
use std::net::Shutdown;
+use std::net::SocketAddr;
use tokio;
use tokio::net::TcpListener;
use tokio::net::TcpStream;
@@ -22,6 +26,90 @@ pub fn init(i: &mut Isolate, s: &ThreadSafeState) {
i.register_op("listen", s.core_op(json_op(s.stateful_op(op_listen))));
}
+#[derive(Debug, PartialEq)]
+enum AcceptState {
+ Eager,
+ Pending,
+ Done,
+}
+
+/// Simply accepts a connection.
+pub fn accept(rid: ResourceId) -> Accept {
+ Accept {
+ state: AcceptState::Eager,
+ rid,
+ }
+}
+
+/// A future representing state of accepting a TCP connection.
+#[derive(Debug)]
+pub struct Accept {
+ state: AcceptState,
+ rid: ResourceId,
+}
+
+impl Future for Accept {
+ type Item = (TcpStream, SocketAddr);
+ type Error = ErrBox;
+
+ fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
+ if self.state == AcceptState::Done {
+ panic!("poll Accept after it's done");
+ }
+
+ let mut table = resources::lock_resource_table();
+ let listener_resource = table
+ .get_mut::<TcpListenerResource>(self.rid)
+ .ok_or_else(|| {
+ let e = std::io::Error::new(
+ std::io::ErrorKind::Other,
+ "Listener has been closed",
+ );
+ ErrBox::from(e)
+ })?;
+
+ let listener = &mut listener_resource.listener;
+
+ if self.state == AcceptState::Eager {
+ // Similar to try_ready!, but also track/untrack accept task
+ // in TcpListener resource.
+ // In this way, when the listener is closed, the task can be
+ // notified to error out (instead of stuck forever).
+ match listener.poll_accept().map_err(ErrBox::from) {
+ Ok(Async::Ready((stream, addr))) => {
+ self.state = AcceptState::Done;
+ return Ok((stream, addr).into());
+ }
+ Ok(Async::NotReady) => {
+ self.state = AcceptState::Pending;
+ return Ok(Async::NotReady);
+ }
+ Err(e) => {
+ self.state = AcceptState::Done;
+ return Err(e);
+ }
+ }
+ }
+
+ match listener.poll_accept().map_err(ErrBox::from) {
+ Ok(Async::Ready((stream, addr))) => {
+ listener_resource.untrack_task();
+ self.state = AcceptState::Done;
+ Ok((stream, addr).into())
+ }
+ Ok(Async::NotReady) => {
+ listener_resource.track_task()?;
+ Ok(Async::NotReady)
+ }
+ Err(e) => {
+ listener_resource.untrack_task();
+ self.state = AcceptState::Done;
+ Err(e)
+ }
+ }
+ }
+}
+
#[derive(Deserialize)]
struct AcceptArgs {
rid: i32,
@@ -33,10 +121,14 @@ fn op_accept(
_zero_copy: Option<PinnedBuf>,
) -> Result<JsonOp, ErrBox> {
let args: AcceptArgs = serde_json::from_value(args)?;
- let server_rid = args.rid as u32;
+ let rid = args.rid as u32;
+
+ let table = resources::lock_resource_table();
+ table
+ .get::<TcpListenerResource>(rid)
+ .ok_or_else(bad_resource)?;
- let server_resource = resources::lookup(server_rid)?;
- let op = tokio_util::accept(server_resource)
+ let op = accept(rid)
.and_then(move |(tcp_stream, _socket_addr)| {
let local_addr = tcp_stream.local_addr()?;
let remote_addr = tcp_stream.peer_addr()?;
@@ -129,6 +221,59 @@ struct ListenArgs {
port: u16,
}
+#[allow(dead_code)]
+struct TcpListenerResource {
+ listener: tokio::net::TcpListener,
+ task: Option<futures::task::Task>,
+ local_addr: SocketAddr,
+}
+
+impl CoreResource for TcpListenerResource {}
+
+impl Drop for TcpListenerResource {
+ fn drop(&mut self) {
+ self.notify_task();
+ }
+}
+
+impl TcpListenerResource {
+ /// Track the current task so future awaiting for connection
+ /// can be notified when listener is closed.
+ ///
+ /// Throws an error if another task is already tracked.
+ pub fn track_task(&mut self) -> Result<(), ErrBox> {
+ // Currently, we only allow tracking a single accept task for a listener.
+ // This might be changed in the future with multiple workers.
+ // Caveat: TcpListener by itself also only tracks an accept task at a time.
+ // See https://github.com/tokio-rs/tokio/issues/846#issuecomment-454208883
+ if self.task.is_some() {
+ let e = std::io::Error::new(
+ std::io::ErrorKind::Other,
+ "Another accept task is ongoing",
+ );
+ return Err(ErrBox::from(e));
+ }
+
+ self.task.replace(futures::task::current());
+ Ok(())
+ }
+
+ /// Notifies a task when listener is closed so accept future can resolve.
+ pub fn notify_task(&mut self) {
+ if let Some(task) = self.task.take() {
+ task.notify();
+ }
+ }
+
+ /// Stop tracking a task.
+ /// Happens when the task is done and thus no further tracking is needed.
+ pub fn untrack_task(&mut self) {
+ if self.task.is_some() {
+ self.task.take();
+ }
+ }
+}
+
fn op_listen(
state: &ThreadSafeState,
args: Value,
@@ -142,10 +287,17 @@ fn op_listen(
let addr = resolve_addr(&args.hostname, args.port).wait()?;
let listener = TcpListener::bind(&addr)?;
let local_addr = listener.local_addr()?;
- let resource = resources::add_tcp_listener(listener);
+ let local_addr_str = local_addr.to_string();
+ let listener_resource = TcpListenerResource {
+ listener,
+ task: None,
+ local_addr,
+ };
+ let mut table = resources::lock_resource_table();
+ let rid = table.add("tcpListener", Box::new(listener_resource));
Ok(JsonOp::Sync(json!({
- "rid": resource.rid,
- "localAddr": local_addr.to_string()
+ "rid": rid,
+ "localAddr": local_addr_str,
})))
}