summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--cli/js/tests/net_test.ts43
-rw-r--r--cli/ops/fetch.rs6
-rw-r--r--cli/ops/fs.rs13
-rw-r--r--cli/ops/io.rs100
-rw-r--r--cli/ops/net.rs26
-rw-r--r--cli/ops/process.rs20
-rw-r--r--cli/ops/tls.rs10
-rw-r--r--cli/ops/tty.rs73
-rw-r--r--cli/tests/integration_tests.rs6
9 files changed, 219 insertions, 78 deletions
diff --git a/cli/js/tests/net_test.ts b/cli/js/tests/net_test.ts
index 1a58c3531..fccd62f38 100644
--- a/cli/js/tests/net_test.ts
+++ b/cli/js/tests/net_test.ts
@@ -336,3 +336,46 @@ unitTest(
conn.close();
}
);
+
+unitTest(
+ {
+ perms: { net: true }
+ },
+ async function netHangsOnClose() {
+ let acceptedConn: Deno.Conn;
+ const resolvable = createResolvable();
+
+ async function iteratorReq(listener: Deno.Listener): Promise<void> {
+ const p = new Uint8Array(10);
+ const conn = await listener.accept();
+ acceptedConn = conn;
+
+ try {
+ while (true) {
+ const nread = await conn.read(p);
+ if (nread === Deno.EOF) {
+ break;
+ }
+ await conn.write(new Uint8Array([1, 2, 3]));
+ }
+ } catch (err) {
+ assert(!!err);
+ assert(err instanceof Deno.errors.BadResource);
+ }
+
+ resolvable.resolve();
+ }
+
+ const addr = { hostname: "127.0.0.1", port: 4500 };
+ const listener = Deno.listen(addr);
+ iteratorReq(listener);
+ const conn = await Deno.connect(addr);
+ await conn.write(new Uint8Array([1, 2, 3, 4]));
+ const buf = new Uint8Array(10);
+ await conn.read(buf);
+ conn!.close();
+ acceptedConn!.close();
+ listener.close();
+ await resolvable;
+ }
+);
diff --git a/cli/ops/fetch.rs b/cli/ops/fetch.rs
index 9f36ad5fd..d222787a6 100644
--- a/cli/ops/fetch.rs
+++ b/cli/ops/fetch.rs
@@ -1,6 +1,6 @@
// Copyright 2018-2020 the Deno authors. All rights reserved. MIT license.
use super::dispatch_json::{Deserialize, JsonOp, Value};
-use super::io::StreamResource;
+use super::io::{StreamResource, StreamResourceHolder};
use crate::http_util::{create_http_client, HttpBody};
use crate::op_error::OpError;
use crate::state::State;
@@ -80,7 +80,9 @@ pub fn op_fetch(
let mut state = state_.borrow_mut();
let rid = state.resource_table.add(
"httpBody",
- Box::new(StreamResource::HttpBody(Box::new(body))),
+ Box::new(StreamResourceHolder::new(StreamResource::HttpBody(
+ Box::new(body),
+ ))),
);
let json_res = json!({
diff --git a/cli/ops/fs.rs b/cli/ops/fs.rs
index 4ef59e8e7..01ce548ba 100644
--- a/cli/ops/fs.rs
+++ b/cli/ops/fs.rs
@@ -1,7 +1,7 @@
// Copyright 2018-2020 the Deno authors. All rights reserved. MIT license.
// Some deserializer fields are only used on Unix and Windows build fails without it
use super::dispatch_json::{blocking_json, Deserialize, JsonOp, Value};
-use super::io::{FileMetadata, StreamResource};
+use super::io::{FileMetadata, StreamResource, StreamResourceHolder};
use crate::fs as deno_fs;
use crate::op_error::OpError;
use crate::ops::dispatch_json::JsonResult;
@@ -153,7 +153,10 @@ fn op_open(
let mut state = state_.borrow_mut();
let rid = state.resource_table.add(
"fsFile",
- Box::new(StreamResource::FsFile(fs_file, FileMetadata::default())),
+ Box::new(StreamResourceHolder::new(StreamResource::FsFile(
+ fs_file,
+ FileMetadata::default(),
+ ))),
);
Ok(json!(rid))
};
@@ -198,12 +201,12 @@ fn op_seek(
};
let state = state.borrow();
- let resource = state
+ let resource_holder = state
.resource_table
- .get::<StreamResource>(rid)
+ .get::<StreamResourceHolder>(rid)
.ok_or_else(OpError::bad_resource_id)?;
- let tokio_file = match resource {
+ let tokio_file = match resource_holder.resource {
StreamResource::FsFile(ref file, _) => file,
_ => return Err(OpError::bad_resource_id()),
};
diff --git a/cli/ops/io.rs b/cli/ops/io.rs
index 2562b4c55..b7f67cea4 100644
--- a/cli/ops/io.rs
+++ b/cli/ops/io.rs
@@ -7,7 +7,9 @@ use deno_core::*;
use futures::future::poll_fn;
use futures::future::FutureExt;
use futures::ready;
+use std::collections::HashMap;
use std::pin::Pin;
+use std::sync::atomic::{AtomicUsize, Ordering};
use std::task::Context;
use std::task::Poll;
use tokio::io::{AsyncRead, AsyncWrite};
@@ -56,15 +58,23 @@ pub fn init(i: &mut Isolate, s: &State) {
);
}
-pub fn get_stdio() -> (StreamResource, StreamResource, StreamResource) {
- let stdin = StreamResource::Stdin(tokio::io::stdin(), TTYMetadata::default());
- let stdout = StreamResource::Stdout({
+pub fn get_stdio() -> (
+ StreamResourceHolder,
+ StreamResourceHolder,
+ StreamResourceHolder,
+) {
+ let stdin = StreamResourceHolder::new(StreamResource::Stdin(
+ tokio::io::stdin(),
+ TTYMetadata::default(),
+ ));
+ let stdout = StreamResourceHolder::new(StreamResource::Stdout({
let stdout = STDOUT_HANDLE
.try_clone()
.expect("Unable to clone stdout handle");
tokio::fs::File::from_std(stdout)
- });
- let stderr = StreamResource::Stderr(tokio::io::stderr());
+ }));
+ let stderr =
+ StreamResourceHolder::new(StreamResource::Stderr(tokio::io::stderr()));
(stdin, stdout, stderr)
}
@@ -87,6 +97,51 @@ pub struct FileMetadata {
pub tty: TTYMetadata,
}
+pub struct StreamResourceHolder {
+ pub resource: StreamResource,
+ waker: HashMap<usize, futures::task::AtomicWaker>,
+ waker_counter: AtomicUsize,
+}
+
+impl StreamResourceHolder {
+ pub fn new(resource: StreamResource) -> StreamResourceHolder {
+ StreamResourceHolder {
+ resource,
+ // Atleast one task is expecter for the resource
+ waker: HashMap::with_capacity(1),
+ // Tracks wakers Ids
+ waker_counter: AtomicUsize::new(0),
+ }
+ }
+}
+
+impl Drop for StreamResourceHolder {
+ fn drop(&mut self) {
+ self.wake_tasks();
+ }
+}
+
+impl StreamResourceHolder {
+ pub fn track_task(&mut self, cx: &Context) -> Result<usize, OpError> {
+ let waker = futures::task::AtomicWaker::new();
+ waker.register(cx.waker());
+ // Its OK if it overflows
+ let task_waker_id = self.waker_counter.fetch_add(1, Ordering::Relaxed);
+ self.waker.insert(task_waker_id, waker);
+ Ok(task_waker_id)
+ }
+
+ pub fn wake_tasks(&mut self) {
+ for waker in self.waker.values() {
+ waker.wake();
+ }
+ }
+
+ pub fn untrack_task(&mut self, task_waker_id: usize) {
+ self.waker.remove(&task_waker_id);
+ }
+}
+
pub enum StreamResource {
Stdin(tokio::io::Stdin, TTYMetadata),
Stdout(tokio::fs::File),
@@ -150,10 +205,27 @@ pub fn op_read(
poll_fn(move |cx| {
let resource_table = &mut state.borrow_mut().resource_table;
- let resource = resource_table
- .get_mut::<StreamResource>(rid as u32)
+ let resource_holder = resource_table
+ .get_mut::<StreamResourceHolder>(rid as u32)
.ok_or_else(OpError::bad_resource_id)?;
- let nread = ready!(resource.poll_read(cx, &mut buf.as_mut()[..]))?;
+
+ let mut task_tracker_id: Option<usize> = None;
+ let nread = match resource_holder
+ .resource
+ .poll_read(cx, &mut buf.as_mut()[..])
+ .map_err(OpError::from)
+ {
+ Poll::Ready(t) => {
+ if let Some(id) = task_tracker_id {
+ resource_holder.untrack_task(id);
+ }
+ t
+ }
+ Poll::Pending => {
+ task_tracker_id.replace(resource_holder.track_task(cx)?);
+ return Poll::Pending;
+ }
+ }?;
Poll::Ready(Ok(nread as i32))
})
.boxed_local()
@@ -233,10 +305,10 @@ pub fn op_write(
async move {
let nwritten = poll_fn(|cx| {
let resource_table = &mut state.borrow_mut().resource_table;
- let resource = resource_table
- .get_mut::<StreamResource>(rid as u32)
+ let resource_holder = resource_table
+ .get_mut::<StreamResourceHolder>(rid as u32)
.ok_or_else(OpError::bad_resource_id)?;
- resource.poll_write(cx, &buf.as_ref()[..])
+ resource_holder.resource.poll_write(cx, &buf.as_ref()[..])
})
.await?;
@@ -246,10 +318,10 @@ pub fn op_write(
// https://github.com/denoland/deno/issues/3565
poll_fn(|cx| {
let resource_table = &mut state.borrow_mut().resource_table;
- let resource = resource_table
- .get_mut::<StreamResource>(rid as u32)
+ let resource_holder = resource_table
+ .get_mut::<StreamResourceHolder>(rid as u32)
.ok_or_else(OpError::bad_resource_id)?;
- resource.poll_flush(cx)
+ resource_holder.resource.poll_flush(cx)
})
.await?;
diff --git a/cli/ops/net.rs b/cli/ops/net.rs
index d8dd9b3c9..3987e94c1 100644
--- a/cli/ops/net.rs
+++ b/cli/ops/net.rs
@@ -1,6 +1,6 @@
// Copyright 2018-2020 the Deno authors. All rights reserved. MIT license.
use super::dispatch_json::{Deserialize, JsonOp, Value};
-use super::io::StreamResource;
+use super::io::{StreamResource, StreamResourceHolder};
use crate::op_error::OpError;
use crate::resolve_addr::resolve_addr;
use crate::state::State;
@@ -78,9 +78,12 @@ fn op_accept(
let local_addr = tcp_stream.local_addr()?;
let remote_addr = tcp_stream.peer_addr()?;
let mut state = state_.borrow_mut();
- let rid = state
- .resource_table
- .add("tcpStream", Box::new(StreamResource::TcpStream(tcp_stream)));
+ let rid = state.resource_table.add(
+ "tcpStream",
+ Box::new(StreamResourceHolder::new(StreamResource::TcpStream(
+ tcp_stream,
+ ))),
+ );
Ok(json!({
"rid": rid,
"localAddr": {
@@ -207,9 +210,12 @@ fn op_connect(
let local_addr = tcp_stream.local_addr()?;
let remote_addr = tcp_stream.peer_addr()?;
let mut state = state_.borrow_mut();
- let rid = state
- .resource_table
- .add("tcpStream", Box::new(StreamResource::TcpStream(tcp_stream)));
+ let rid = state.resource_table.add(
+ "tcpStream",
+ Box::new(StreamResourceHolder::new(StreamResource::TcpStream(
+ tcp_stream,
+ ))),
+ );
Ok(json!({
"rid": rid,
"localAddr": {
@@ -251,11 +257,11 @@ fn op_shutdown(
};
let mut state = state.borrow_mut();
- let resource = state
+ let resource_holder = state
.resource_table
- .get_mut::<StreamResource>(rid)
+ .get_mut::<StreamResourceHolder>(rid)
.ok_or_else(OpError::bad_resource_id)?;
- match resource {
+ match resource_holder.resource {
StreamResource::TcpStream(ref mut stream) => {
TcpStream::shutdown(stream, shutdown_mode).map_err(OpError::from)?;
}
diff --git a/cli/ops/process.rs b/cli/ops/process.rs
index 743ffa22b..55080fc2d 100644
--- a/cli/ops/process.rs
+++ b/cli/ops/process.rs
@@ -1,6 +1,6 @@
// Copyright 2018-2020 the Deno authors. All rights reserved. MIT license.
use super::dispatch_json::{Deserialize, JsonOp, Value};
-use super::io::StreamResource;
+use super::io::{StreamResource, StreamResourceHolder};
use crate::op_error::OpError;
use crate::signal::kill;
use crate::state::State;
@@ -24,11 +24,11 @@ pub fn init(i: &mut Isolate, s: &State) {
fn clone_file(rid: u32, state: &State) -> Result<std::fs::File, OpError> {
let mut state = state.borrow_mut();
- let repr = state
+ let repr_holder = state
.resource_table
- .get_mut::<StreamResource>(rid)
+ .get_mut::<StreamResourceHolder>(rid)
.ok_or_else(OpError::bad_resource_id)?;
- let file = match repr {
+ let file = match repr_holder.resource {
StreamResource::FsFile(ref mut file, _) => file,
_ => return Err(OpError::bad_resource_id()),
};
@@ -127,7 +127,9 @@ fn op_run(
Some(child_stdin) => {
let rid = table.add(
"childStdin",
- Box::new(StreamResource::ChildStdin(child_stdin)),
+ Box::new(StreamResourceHolder::new(StreamResource::ChildStdin(
+ child_stdin,
+ ))),
);
Some(rid)
}
@@ -138,7 +140,9 @@ fn op_run(
Some(child_stdout) => {
let rid = table.add(
"childStdout",
- Box::new(StreamResource::ChildStdout(child_stdout)),
+ Box::new(StreamResourceHolder::new(StreamResource::ChildStdout(
+ child_stdout,
+ ))),
);
Some(rid)
}
@@ -149,7 +153,9 @@ fn op_run(
Some(child_stderr) => {
let rid = table.add(
"childStderr",
- Box::new(StreamResource::ChildStderr(child_stderr)),
+ Box::new(StreamResourceHolder::new(StreamResource::ChildStderr(
+ child_stderr,
+ ))),
);
Some(rid)
}
diff --git a/cli/ops/tls.rs b/cli/ops/tls.rs
index e64bc4745..642284ea2 100644
--- a/cli/ops/tls.rs
+++ b/cli/ops/tls.rs
@@ -1,6 +1,6 @@
// Copyright 2018-2020 the Deno authors. All rights reserved. MIT license.
use super::dispatch_json::{Deserialize, JsonOp, Value};
-use super::io::StreamResource;
+use super::io::{StreamResource, StreamResourceHolder};
use crate::op_error::OpError;
use crate::resolve_addr::resolve_addr;
use crate::state::State;
@@ -85,7 +85,9 @@ pub fn op_connect_tls(
let mut state = state_.borrow_mut();
let rid = state.resource_table.add(
"clientTlsStream",
- Box::new(StreamResource::ClientTlsStream(Box::new(tls_stream))),
+ Box::new(StreamResourceHolder::new(StreamResource::ClientTlsStream(
+ Box::new(tls_stream),
+ ))),
);
Ok(json!({
"rid": rid,
@@ -318,7 +320,9 @@ fn op_accept_tls(
let mut state = state.borrow_mut();
state.resource_table.add(
"serverTlsStream",
- Box::new(StreamResource::ServerTlsStream(Box::new(tls_stream))),
+ Box::new(StreamResourceHolder::new(StreamResource::ServerTlsStream(
+ Box::new(tls_stream),
+ ))),
)
};
Ok(json!({
diff --git a/cli/ops/tty.rs b/cli/ops/tty.rs
index 69ac66688..c44ab946f 100644
--- a/cli/ops/tty.rs
+++ b/cli/ops/tty.rs
@@ -1,5 +1,5 @@
use super::dispatch_json::JsonOp;
-use super::io::StreamResource;
+use super::io::{StreamResource, StreamResourceHolder};
use crate::op_error::OpError;
use crate::ops::json_op;
use crate::state::State;
@@ -66,13 +66,13 @@ pub fn op_set_raw(
use winapi::um::{consoleapi, handleapi};
let state = state_.borrow_mut();
- let resource = state.resource_table.get::<StreamResource>(rid);
- if resource.is_none() {
+ let resource_holder = state.resource_table.get::<StreamResourceHolder>(rid);
+ if resource_holder.is_none() {
return Err(OpError::bad_resource_id());
}
// For now, only stdin.
- let handle = match resource.unwrap() {
+ let handle = match &resource_holder.unwrap().resource {
StreamResource::Stdin(_, _) => std::io::stdin().as_raw_handle(),
StreamResource::FsFile(f, _) => {
let tokio_file = futures::executor::block_on(f.try_clone())?;
@@ -111,25 +111,27 @@ pub fn op_set_raw(
use std::os::unix::io::AsRawFd;
let mut state = state_.borrow_mut();
- let resource = state.resource_table.get_mut::<StreamResource>(rid);
- if resource.is_none() {
+ let resource_holder =
+ state.resource_table.get_mut::<StreamResourceHolder>(rid);
+ if resource_holder.is_none() {
return Err(OpError::bad_resource_id());
}
if is_raw {
- let (raw_fd, maybe_tty_mode) = match resource.unwrap() {
- StreamResource::Stdin(_, ref mut metadata) => {
- (std::io::stdin().as_raw_fd(), &mut metadata.mode)
- }
- StreamResource::FsFile(f, ref mut metadata) => {
- let tokio_file = futures::executor::block_on(f.try_clone())?;
- let std_file = futures::executor::block_on(tokio_file.into_std());
- (std_file.as_raw_fd(), &mut metadata.tty.mode)
- }
- _ => {
- return Err(OpError::other("Not supported".to_owned()));
- }
- };
+ let (raw_fd, maybe_tty_mode) =
+ match &mut resource_holder.unwrap().resource {
+ StreamResource::Stdin(_, ref mut metadata) => {
+ (std::io::stdin().as_raw_fd(), &mut metadata.mode)
+ }
+ StreamResource::FsFile(f, ref mut metadata) => {
+ let tokio_file = futures::executor::block_on(f.try_clone())?;
+ let std_file = futures::executor::block_on(tokio_file.into_std());
+ (std_file.as_raw_fd(), &mut metadata.tty.mode)
+ }
+ _ => {
+ return Err(OpError::other("Not supported".to_owned()));
+ }
+ };
if maybe_tty_mode.is_some() {
// Already raw. Skip.
@@ -159,19 +161,20 @@ pub fn op_set_raw(
Ok(JsonOp::Sync(json!({})))
} else {
// Try restore saved mode.
- let (raw_fd, maybe_tty_mode) = match resource.unwrap() {
- StreamResource::Stdin(_, ref mut metadata) => {
- (std::io::stdin().as_raw_fd(), &mut metadata.mode)
- }
- StreamResource::FsFile(f, ref mut metadata) => {
- let tokio_file = futures::executor::block_on(f.try_clone())?;
- let std_file = futures::executor::block_on(tokio_file.into_std());
- (std_file.as_raw_fd(), &mut metadata.tty.mode)
- }
- _ => {
- return Err(OpError::other("Not supported".to_owned()));
- }
- };
+ let (raw_fd, maybe_tty_mode) =
+ match &mut resource_holder.unwrap().resource {
+ StreamResource::Stdin(_, ref mut metadata) => {
+ (std::io::stdin().as_raw_fd(), &mut metadata.mode)
+ }
+ StreamResource::FsFile(f, ref mut metadata) => {
+ let tokio_file = futures::executor::block_on(f.try_clone())?;
+ let std_file = futures::executor::block_on(tokio_file.into_std());
+ (std_file.as_raw_fd(), &mut metadata.tty.mode)
+ }
+ _ => {
+ return Err(OpError::other("Not supported".to_owned()));
+ }
+ };
if let Some(mode) = maybe_tty_mode.take() {
termios::tcsetattr(raw_fd, termios::SetArg::TCSADRAIN, &mode)?;
@@ -200,12 +203,12 @@ pub fn op_isatty(
return Err(OpError::bad_resource_id());
}
- let resource = state.resource_table.get::<StreamResource>(rid);
- if resource.is_none() {
+ let resource_holder = state.resource_table.get::<StreamResourceHolder>(rid);
+ if resource_holder.is_none() {
return Ok(JsonOp::Sync(json!(false)));
}
- match resource.unwrap() {
+ match &resource_holder.unwrap().resource {
StreamResource::Stdin(_, _) => {
Ok(JsonOp::Sync(json!(atty::is(atty::Stream::Stdin))))
}
diff --git a/cli/tests/integration_tests.rs b/cli/tests/integration_tests.rs
index d5a3fb8c8..ce08c6b61 100644
--- a/cli/tests/integration_tests.rs
+++ b/cli/tests/integration_tests.rs
@@ -1785,7 +1785,8 @@ fn test_permissions_net_fetch_allow_localhost_4545() {
true,
"run --allow-net=localhost:4545 complex_permissions_test.ts netFetch http://localhost:4545/",
None,
- None,true,
+ None,
+ true,
);
assert!(!err.contains(util::PERMISSION_DENIED_PATTERN));
}
@@ -1918,7 +1919,7 @@ fn test_permissions_net_listen_allow_localhost() {
"run --allow-net=localhost complex_permissions_test.ts netListen localhost:4600",
None,
None,
- false,
+ false,
);
assert!(!err.contains(util::PERMISSION_DENIED_PATTERN));
}
@@ -1932,6 +1933,7 @@ mod util {
use std::process::Command;
use std::process::Output;
use std::process::Stdio;
+
use tempfile::TempDir;
pub const PERMISSION_VARIANTS: [&str; 5] =