summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--cli/tests/unit/net_test.ts60
-rw-r--r--core/async_cell.rs15
-rw-r--r--core/examples/http_bench_bin_ops.rs12
-rw-r--r--core/examples/http_bench_json_ops.rs12
-rw-r--r--core/lib.rs6
-rw-r--r--core/ops.rs15
-rw-r--r--core/resources.rs253
-rw-r--r--core/resources2.rs146
-rw-r--r--op_crates/fetch/lib.rs68
-rw-r--r--runtime/errors.rs6
-rw-r--r--runtime/ops/fs.rs20
-rw-r--r--runtime/ops/fs_events.rs58
-rw-r--r--runtime/ops/io.rs633
-rw-r--r--runtime/ops/net.rs255
-rw-r--r--runtime/ops/net_unix.rs106
-rw-r--r--runtime/ops/plugin.rs12
-rw-r--r--runtime/ops/process.rs74
-rw-r--r--runtime/ops/signal.rs74
-rw-r--r--runtime/ops/tls.rs245
-rw-r--r--runtime/ops/tty.rs127
-rw-r--r--runtime/ops/websocket.rs173
-rw-r--r--runtime/rt/30_net.js32
-rw-r--r--runtime/rt/40_fs_events.js2
-rw-r--r--runtime/rt/40_signals.js11
-rw-r--r--runtime/web_worker.rs7
-rw-r--r--runtime/worker.rs6
26 files changed, 1222 insertions, 1206 deletions
diff --git a/cli/tests/unit/net_test.ts b/cli/tests/unit/net_test.ts
index abbb23b33..62b00e43c 100644
--- a/cli/tests/unit/net_test.ts
+++ b/cli/tests/unit/net_test.ts
@@ -21,8 +21,6 @@ unitTest({ perms: { net: true } }, function netTcpListenClose(): void {
unitTest(
{
perms: { net: true },
- // TODO:
- ignore: Deno.build.os === "windows",
},
function netUdpListenClose(): void {
const socket = Deno.listenDatagram({
@@ -257,7 +255,7 @@ unitTest(
);
unitTest(
- { ignore: Deno.build.os === "windows", perms: { net: true } },
+ { perms: { net: true } },
async function netUdpSendReceive(): Promise<void> {
const alice = Deno.listenDatagram({ port: 3500, transport: "udp" });
assert(alice.addr.transport === "udp");
@@ -287,7 +285,31 @@ unitTest(
);
unitTest(
- { ignore: Deno.build.os === "windows", perms: { net: true } },
+ { perms: { net: true } },
+ async function netUdpConcurrentSendReceive(): Promise<void> {
+ const socket = Deno.listenDatagram({ port: 3500, transport: "udp" });
+ assert(socket.addr.transport === "udp");
+ assertEquals(socket.addr.port, 3500);
+ assertEquals(socket.addr.hostname, "127.0.0.1");
+
+ const recvPromise = socket.receive();
+
+ const sendBuf = new Uint8Array([1, 2, 3]);
+ const sendLen = await socket.send(sendBuf, socket.addr);
+ assertEquals(sendLen, 3);
+
+ const [recvBuf, recvAddr] = await recvPromise;
+ assertEquals(recvBuf.length, 3);
+ assertEquals(1, recvBuf[0]);
+ assertEquals(2, recvBuf[1]);
+ assertEquals(3, recvBuf[2]);
+
+ socket.close();
+ },
+);
+
+unitTest(
+ { perms: { net: true } },
async function netUdpBorrowMutError(): Promise<void> {
const socket = Deno.listenDatagram({
port: 4501,
@@ -335,6 +357,34 @@ unitTest(
},
);
+// TODO(piscisaureus): Enable after Tokio v0.3/v1.0 upgrade.
+unitTest(
+ { ignore: true, perms: { read: true, write: true } },
+ async function netUnixPacketConcurrentSendReceive(): Promise<void> {
+ const filePath = await Deno.makeTempFile();
+ const socket = Deno.listenDatagram({
+ path: filePath,
+ transport: "unixpacket",
+ });
+ assert(socket.addr.transport === "unixpacket");
+ assertEquals(socket.addr.path, filePath);
+
+ const recvPromise = socket.receive();
+
+ const sendBuf = new Uint8Array([1, 2, 3]);
+ const sendLen = await socket.send(sendBuf, socket.addr);
+ assertEquals(sendLen, 3);
+
+ const [recvBuf, recvAddr] = await recvPromise;
+ assertEquals(recvBuf.length, 3);
+ assertEquals(1, recvBuf[0]);
+ assertEquals(2, recvBuf[1]);
+ assertEquals(3, recvBuf[2]);
+
+ socket.close();
+ },
+);
+
unitTest(
{ perms: { net: true } },
async function netTcpListenIteratorBreakClosesResource(): Promise<void> {
@@ -385,7 +435,7 @@ unitTest(
);
unitTest(
- { ignore: Deno.build.os === "windows", perms: { net: true } },
+ { perms: { net: true } },
async function netUdpListenCloseWhileIterating(): Promise<void> {
const socket = Deno.listenDatagram({ port: 8000, transport: "udp" });
const nextWhileClosing = socket[Symbol.asyncIterator]().next();
diff --git a/core/async_cell.rs b/core/async_cell.rs
index d11b83932..cd6c51682 100644
--- a/core/async_cell.rs
+++ b/core/async_cell.rs
@@ -1,10 +1,14 @@
// Copyright 2018-2020 the Deno authors. All rights reserved. MIT license.
+use std::any::type_name;
use std::any::Any;
use std::borrow::Borrow;
use std::cell::Cell;
use std::cell::UnsafeCell;
use std::collections::VecDeque;
+use std::fmt;
+use std::fmt::Debug;
+use std::fmt::Formatter;
use std::ops::Deref;
use std::rc::Rc;
@@ -45,6 +49,17 @@ impl<T: 'static> AsyncRefCell<T> {
pub fn as_ptr(&self) -> *mut T {
self.value.get()
}
+
+ pub fn into_inner(self) -> T {
+ assert!(self.borrow_count.get().is_empty());
+ self.value.into_inner()
+ }
+}
+
+impl<T> Debug for AsyncRefCell<T> {
+ fn fmt(&self, f: &mut Formatter) -> fmt::Result {
+ write!(f, "AsyncRefCell<{}>", type_name::<T>())
+ }
}
impl<T: Default + 'static> Default for AsyncRefCell<T> {
diff --git a/core/examples/http_bench_bin_ops.rs b/core/examples/http_bench_bin_ops.rs
index 1d7a76c3d..32529652b 100644
--- a/core/examples/http_bench_bin_ops.rs
+++ b/core/examples/http_bench_bin_ops.rs
@@ -170,7 +170,7 @@ fn op_listen(
let std_listener = std::net::TcpListener::bind(&addr)?;
std_listener.set_nonblocking(true)?;
let listener = TcpListener::try_from(std_listener)?;
- let rid = state.resource_table_2.add(listener);
+ let rid = state.resource_table.add(listener);
Ok(rid)
}
@@ -181,7 +181,7 @@ fn op_close(
) -> Result<u32, Error> {
debug!("close rid={}", rid);
state
- .resource_table_2
+ .resource_table
.close(rid)
.map(|_| 0)
.ok_or_else(bad_resource_id)
@@ -196,11 +196,11 @@ async fn op_accept(
let listener = state
.borrow()
- .resource_table_2
+ .resource_table
.get::<TcpListener>(rid)
.ok_or_else(bad_resource_id)?;
let stream = listener.accept().await?;
- let rid = state.borrow_mut().resource_table_2.add(stream);
+ let rid = state.borrow_mut().resource_table.add(stream);
Ok(rid)
}
@@ -214,7 +214,7 @@ async fn op_read(
let stream = state
.borrow()
- .resource_table_2
+ .resource_table
.get::<TcpStream>(rid)
.ok_or_else(bad_resource_id)?;
stream.read(&mut bufs[0]).await
@@ -230,7 +230,7 @@ async fn op_write(
let stream = state
.borrow()
- .resource_table_2
+ .resource_table
.get::<TcpStream>(rid)
.ok_or_else(bad_resource_id)?;
stream.write(&bufs[0]).await
diff --git a/core/examples/http_bench_json_ops.rs b/core/examples/http_bench_json_ops.rs
index c4fcd6363..8cf4061cc 100644
--- a/core/examples/http_bench_json_ops.rs
+++ b/core/examples/http_bench_json_ops.rs
@@ -134,7 +134,7 @@ fn op_listen(
let std_listener = std::net::TcpListener::bind(&addr)?;
std_listener.set_nonblocking(true)?;
let listener = TcpListener::try_from(std_listener)?;
- let rid = state.resource_table_2.add(listener);
+ let rid = state.resource_table.add(listener);
Ok(serde_json::json!({ "rid": rid }))
}
@@ -152,7 +152,7 @@ fn op_close(
.unwrap();
debug!("close rid={}", rid);
state
- .resource_table_2
+ .resource_table
.close(rid)
.map(|_| serde_json::json!(()))
.ok_or_else(bad_resource_id)
@@ -174,11 +174,11 @@ async fn op_accept(
let listener = state
.borrow()
- .resource_table_2
+ .resource_table
.get::<TcpListener>(rid)
.ok_or_else(bad_resource_id)?;
let stream = listener.accept().await?;
- let rid = state.borrow_mut().resource_table_2.add(stream);
+ let rid = state.borrow_mut().resource_table.add(stream);
Ok(serde_json::json!({ "rid": rid }))
}
@@ -199,7 +199,7 @@ async fn op_read(
let stream = state
.borrow()
- .resource_table_2
+ .resource_table
.get::<TcpStream>(rid)
.ok_or_else(bad_resource_id)?;
let nread = stream.read(&mut bufs[0]).await?;
@@ -223,7 +223,7 @@ async fn op_write(
let stream = state
.borrow()
- .resource_table_2
+ .resource_table
.get::<TcpStream>(rid)
.ok_or_else(bad_resource_id)?;
let nwritten = stream.write(&bufs[0]).await?;
diff --git a/core/lib.rs b/core/lib.rs
index 5846ad99d..6cecd4d75 100644
--- a/core/lib.rs
+++ b/core/lib.rs
@@ -17,7 +17,6 @@ mod normalize_path;
mod ops;
pub mod plugin_api;
mod resources;
-mod resources2;
mod runtime;
mod shared_queue;
mod zero_copy_buf;
@@ -64,10 +63,9 @@ pub use crate::ops::OpFn;
pub use crate::ops::OpId;
pub use crate::ops::OpState;
pub use crate::ops::OpTable;
+pub use crate::resources::Resource;
+pub use crate::resources::ResourceId;
pub use crate::resources::ResourceTable;
-pub use crate::resources2::Resource;
-pub use crate::resources2::ResourceId;
-pub use crate::resources2::ResourceTable2;
pub use crate::runtime::GetErrorClassFn;
pub use crate::runtime::JsErrorCreateFn;
pub use crate::runtime::JsRuntime;
diff --git a/core/ops.rs b/core/ops.rs
index bf10d3d86..2907d2552 100644
--- a/core/ops.rs
+++ b/core/ops.rs
@@ -4,6 +4,8 @@ use crate::error::bad_resource_id;
use crate::error::type_error;
use crate::error::AnyError;
use crate::gotham_state::GothamState;
+use crate::resources::ResourceTable;
+use crate::runtime::GetErrorClassFn;
use crate::BufVec;
use crate::ZeroCopyBuf;
use futures::Future;
@@ -33,10 +35,9 @@ pub enum Op {
/// Maintains the resources and ops inside a JS runtime.
pub struct OpState {
- pub resource_table: crate::ResourceTable,
- pub resource_table_2: crate::resources2::ResourceTable,
+ pub resource_table: ResourceTable,
pub op_table: OpTable,
- pub get_error_class_fn: crate::runtime::GetErrorClassFn,
+ pub get_error_class_fn: GetErrorClassFn,
gotham_state: GothamState,
}
@@ -47,7 +48,6 @@ impl Default for OpState {
fn default() -> OpState {
OpState {
resource_table: Default::default(),
- resource_table_2: Default::default(),
op_table: OpTable::default(),
get_error_class_fn: &|_| "Error",
gotham_state: Default::default(),
@@ -279,7 +279,11 @@ pub fn op_resources(
_args: Value,
_zero_copy: &mut [ZeroCopyBuf],
) -> Result<Value, AnyError> {
- let serialized_resources = state.resource_table.entries();
+ let serialized_resources: HashMap<u32, String> = state
+ .resource_table
+ .names()
+ .map(|(rid, name)| (rid, name.to_string()))
+ .collect();
Ok(json!(serialized_resources))
}
@@ -300,5 +304,6 @@ pub fn op_close(
.resource_table
.close(rid as u32)
.ok_or_else(bad_resource_id)?;
+
Ok(json!({}))
}
diff --git a/core/resources.rs b/core/resources.rs
index 753fa9713..da3b634fc 100644
--- a/core/resources.rs
+++ b/core/resources.rs
@@ -1,20 +1,63 @@
// Copyright 2018-2020 the Deno authors. All rights reserved. MIT license.
-// Think of Resources as File Descriptors. They are integers that are allocated by
-// the privileged side of Deno to refer to various rust objects that need to be
-// referenced between multiple ops. For example, network sockets are resources.
-// Resources may or may not correspond to a real operating system file
-// descriptor (hence the different name).
+// Think of Resources as File Descriptors. They are integers that are allocated
+// by the privileged side of Deno which refer to various rust objects that need
+// to be persisted between various ops. For example, network sockets are
+// resources. Resources may or may not correspond to a real operating system
+// file descriptor (hence the different name).
-use crate::resources2::ResourceId;
+use std::any::type_name;
use std::any::Any;
+use std::any::TypeId;
+use std::borrow::Cow;
use std::collections::HashMap;
+use std::iter::Iterator;
+use std::rc::Rc;
+
+/// All objects that can be store in the resource table should implement the
+/// `Resource` trait.
+pub trait Resource: Any + 'static {
+ /// Returns a string representation of the resource which is made available
+ /// to JavaScript code through `op_resources`. The default implementation
+ /// returns the Rust type name, but specific resource types may override this
+ /// trait method.
+ fn name(&self) -> Cow<str> {
+ type_name::<Self>().into()
+ }
+
+ /// Resources may implement the `close()` trait method if they need to do
+ /// resource specific clean-ups, such as cancelling pending futures, after a
+ /// resource has been removed from the resource table.
+ fn close(self: Rc<Self>) {}
+}
+
+impl dyn Resource {
+ #[inline(always)]
+ fn is<T: Resource>(&self) -> bool {
+ self.type_id() == TypeId::of::<T>()
+ }
-/// These store Deno's file descriptors. These are not necessarily the operating
-/// system ones.
-type ResourceMap = HashMap<ResourceId, (String, Box<dyn Any>)>;
+ #[inline(always)]
+ #[allow(clippy::needless_lifetimes)]
+ pub fn downcast_rc<'a, T: Resource>(self: &'a Rc<Self>) -> Option<&'a Rc<T>> {
+ if self.is::<T>() {
+ let ptr = self as *const Rc<_> as *const Rc<T>;
+ Some(unsafe { &*ptr })
+ } else {
+ None
+ }
+ }
+}
-/// Map-like data structure storing Deno's resources (equivalent to file descriptors).
+/// A `ResourceId` is an integer value referencing a resource. It could be
+/// considered to be the Deno equivalent of a `file descriptor` in POSIX like
+/// operating systems. Elsewhere in the code base it is commonly abbreviated
+/// to `rid`.
+// TODO: use `u64` instead?
+pub type ResourceId = u32;
+
+/// Map-like data structure storing Deno's resources (equivalent to file
+/// descriptors).
///
/// Provides basic methods for element access. A resource can be of any type.
/// Different types of resources can be stored in the same map, and provided
@@ -24,156 +67,98 @@ type ResourceMap = HashMap<ResourceId, (String, Box<dyn Any>)>;
/// the key in the map.
#[derive(Default)]
pub struct ResourceTable {
- map: ResourceMap,
- next_id: u32,
+ index: HashMap<ResourceId, Rc<dyn Resource>>,
+ next_rid: ResourceId,
}
impl ResourceTable {
- /// Checks if the given resource ID is contained.
- pub fn has(&self, rid: ResourceId) -> bool {
- self.map.contains_key(&rid)
- }
-
- /// Returns a shared reference to a resource.
+ /// Inserts resource into the resource table, which takes ownership of it.
///
- /// Returns `None`, if `rid` is not stored or has a type different from `T`.
- pub fn get<T: Any>(&self, rid: ResourceId) -> Option<&T> {
- let (_, resource) = self.map.get(&rid)?;
- resource.downcast_ref::<T>()
- }
-
- /// Returns a mutable reference to a resource.
+ /// The resource type is erased at runtime and must be statically known
+ /// when retrieving it through `get()`.
///
- /// Returns `None`, if `rid` is not stored or has a type different from `T`.
- pub fn get_mut<T: Any>(&mut self, rid: ResourceId) -> Option<&mut T> {
- let (_, resource) = self.map.get_mut(&rid)?;
- resource.downcast_mut::<T>()
- }
-
- // TODO: resource id allocation should probably be randomized for security.
- fn next_rid(&mut self) -> ResourceId {
- let next_rid = self.next_id;
- self.next_id += 1;
- next_rid as ResourceId
+ /// Returns a unique resource ID, which acts as a key for this resource.
+ pub fn add<T: Resource>(&mut self, resource: T) -> ResourceId {
+ self.add_rc(Rc::new(resource))
}
- /// Inserts a resource, taking ownership of it.
+ /// Inserts a `Rc`-wrapped resource into the resource table.
///
/// The resource type is erased at runtime and must be statically known
/// when retrieving it through `get()`.
///
/// Returns a unique resource ID, which acts as a key for this resource.
- pub fn add(&mut self, name: &str, resource: Box<dyn Any>) -> ResourceId {
- let rid = self.next_rid();
- let r = self.map.insert(rid, (name.to_string(), resource));
- assert!(r.is_none());
+ pub fn add_rc<T: Resource>(&mut self, resource: Rc<T>) -> ResourceId {
+ let resource = resource as Rc<dyn Resource>;
+ let rid = self.next_rid;
+ let removed_resource = self.index.insert(rid, resource);
+ assert!(removed_resource.is_none());
+ self.next_rid += 1;
rid
}
- /// Returns a map of resource IDs to names.
- ///
- /// The name is the one specified during `add()`. To access resources themselves,
- /// use the `get()` or `get_mut()` functions.
- pub fn entries(&self) -> HashMap<ResourceId, String> {
- self
- .map
- .iter()
- .map(|(key, (name, _resource))| (*key, name.clone()))
- .collect()
- }
-
- // close(2) is done by dropping the value. Therefore we just need to remove
- // the resource from the resource table.
- pub fn close(&mut self, rid: ResourceId) -> Option<()> {
- self.map.remove(&rid).map(|(_name, _resource)| ())
- }
-
- /// Removes the resource identified by `rid` and returns it.
- ///
- /// When the provided `rid` is stored, the associated resource will be removed.
- /// Otherwise, nothing happens and `None` is returned.
- ///
- /// If the type `T` matches the resource's type, the resource will be returned.
- /// If the type mismatches, `None` is returned, but the resource is still removed.
- pub fn remove<T: Any>(&mut self, rid: ResourceId) -> Option<Box<T>> {
- if let Some((_name, resource)) = self.map.remove(&rid) {
- let res = match resource.downcast::<T>() {
- Ok(res) => Some(res),
- Err(_e) => None,
- };
- return res;
- }
- None
- }
-}
-
-#[cfg(test)]
-mod tests {
- use super::*;
-
- struct FakeResource {
- not_empty: u128,
- }
-
- impl FakeResource {
- fn new(value: u128) -> FakeResource {
- FakeResource { not_empty: value }
- }
+ /// Returns true if any resource with the given `rid` exists.
+ pub fn has(&self, rid: ResourceId) -> bool {
+ self.index.contains_key(&rid)
}
- #[test]
- fn test_create_resource_table_default() {
- let table = ResourceTable::default();
- assert_eq!(table.map.len(), 0);
+ /// Returns a reference counted pointer to the resource of type `T` with the
+ /// given `rid`. If `rid` is not present or has a type different than `T`,
+ /// this function returns `None`.
+ pub fn get<T: Resource>(&self, rid: ResourceId) -> Option<Rc<T>> {
+ self
+ .index
+ .get(&rid)
+ .and_then(|rc| rc.downcast_rc::<T>())
+ .map(Clone::clone)
}
- #[test]
- fn test_add_to_resource_table_not_empty() {
- let mut table = ResourceTable::default();
- table.add("fake1", Box::new(FakeResource::new(1)));
- table.add("fake2", Box::new(FakeResource::new(2)));
- assert_eq!(table.map.len(), 2);
+ pub fn get_any(&self, rid: ResourceId) -> Option<Rc<dyn Resource>> {
+ self.index.get(&rid).map(Clone::clone)
}
- #[test]
- fn test_add_to_resource_table_are_contiguous() {
- let mut table = ResourceTable::default();
- let rid1 = table.add("fake1", Box::new(FakeResource::new(1)));
- let rid2 = table.add("fake2", Box::new(FakeResource::new(2)));
- assert_eq!(rid1 + 1, rid2);
+ /// Removes a resource of type `T` from the resource table and returns it.
+ /// If a resource with the given `rid` exists but its type does not match `T`,
+ /// it is not removed from the resource table. Note that the resource's
+ /// `close()` method is *not* called.
+ pub fn take<T: Resource>(&mut self, rid: ResourceId) -> Option<Rc<T>> {
+ let resource = self.get::<T>(rid)?;
+ self.index.remove(&rid);
+ Some(resource)
}
- #[test]
- fn test_get_from_resource_table_is_what_was_given() {
- let mut table = ResourceTable::default();
- let rid = table.add("fake", Box::new(FakeResource::new(7)));
- let resource = table.get::<FakeResource>(rid);
- assert_eq!(resource.unwrap().not_empty, 7);
+ /// Removes a resource from the resource table and returns it. Note that the
+ /// resource's `close()` method is *not* called.
+ pub fn take_any(&mut self, rid: ResourceId) -> Option<Rc<dyn Resource>> {
+ self.index.remove(&rid)
}
- #[test]
- fn test_remove_from_resource_table() {
- let mut table = ResourceTable::default();
- let rid1 = table.add("fake1", Box::new(FakeResource::new(1)));
- let rid2 = table.add("fake2", Box::new(FakeResource::new(2)));
- assert_eq!(table.map.len(), 2);
- table.close(rid1);
- assert_eq!(table.map.len(), 1);
- table.close(rid2);
- assert_eq!(table.map.len(), 0);
+ /// Removes the resource with the given `rid` from the resource table. If the
+ /// only reference to this resource existed in the resource table, this will
+ /// cause the resource to be dropped. However, since resources are reference
+ /// counted, therefore pending ops are not automatically cancelled. A resource
+ /// may implement the `close()` method to perform clean-ups such as canceling
+ /// ops.
+ pub fn close(&mut self, rid: ResourceId) -> Option<()> {
+ self.index.remove(&rid).map(|resource| resource.close())
}
- #[test]
- fn test_take_from_resource_table() {
- let mut table = ResourceTable::default();
- let rid1 = table.add("fake1", Box::new(FakeResource::new(1)));
- let rid2 = table.add("fake2", Box::new(FakeResource::new(2)));
- assert_eq!(table.map.len(), 2);
- let res1 = table.remove::<FakeResource>(rid1);
- assert_eq!(table.map.len(), 1);
- assert!(res1.is_some());
- let res2 = table.remove::<FakeResource>(rid2);
- assert_eq!(table.map.len(), 0);
- assert!(res2.is_some());
+ /// Returns an iterator that yields a `(id, name)` pair for every resource
+ /// that's currently in the resource table. This can be used for debugging
+ /// purposes or to implement the `op_resources` op. Note that the order in
+ /// which items appear is not specified.
+ ///
+ /// # Example
+ ///
+ /// ```
+ /// # use deno_core::ResourceTable;
+ /// # let resource_table = ResourceTable::default();
+ /// let resource_names = resource_table.names().collect::<Vec<_>>();
+ /// ```
+ pub fn names(&self) -> impl Iterator<Item = (ResourceId, Cow<str>)> {
+ self
+ .index
+ .iter()
+ .map(|(&id, resource)| (id, resource.name()))
}
}
diff --git a/core/resources2.rs b/core/resources2.rs
deleted file mode 100644
index 989ea8328..000000000
--- a/core/resources2.rs
+++ /dev/null
@@ -1,146 +0,0 @@
-// Copyright 2018-2020 the Deno authors. All rights reserved. MIT license.
-
-// Think of Resources as File Descriptors. They are integers that are allocated
-// by the privileged side of Deno which refer to various rust objects that need
-// to be persisted between various ops. For example, network sockets are
-// resources. Resources may or may not correspond to a real operating system
-// file descriptor (hence the different name).
-
-use std::any::type_name;
-use std::any::Any;
-use std::any::TypeId;
-use std::borrow::Cow;
-use std::collections::HashMap;
-use std::iter::Iterator;
-use std::rc::Rc;
-
-/// All objects that can be store in the resource table should implement the
-/// `Resource` trait.
-pub trait Resource: Any + 'static {
- /// Returns a string representation of the resource which is made available
- /// to JavaScript code through `op_resources`. The default implementation
- /// returns the Rust type name, but specific resource types may override this
- /// trait method.
- fn name(&self) -> Cow<str> {
- type_name::<Self>().into()
- }
-
- /// Resources may implement the `close()` trait method if they need to do
- /// resource specific clean-ups, such as cancelling pending futures, after a
- /// resource has been removed from the resource table.
- fn close(self: Rc<Self>) {}
-}
-
-impl dyn Resource {
- #[inline(always)]
- fn is<T: Resource>(&self) -> bool {
- self.type_id() == TypeId::of::<T>()
- }
-
- #[inline(always)]
- #[allow(clippy::needless_lifetimes)]
- fn downcast_rc<'a, T: Resource>(self: &'a Rc<Self>) -> Option<&'a Rc<T>> {
- if self.is::<T>() {
- let ptr = self as *const Rc<_> as *const Rc<T>;
- Some(unsafe { &*ptr })
- } else {
- None
- }
- }
-}
-
-/// A `ResourceId` is an integer value referencing a resource. It could be
-/// considered to be the Deno equivalent of a `file descriptor` in POSIX like
-/// operating systems. Elsewhere in the code base it is commonly abbreviated
-/// to `rid`.
-// TODO: use `u64` instead?
-pub type ResourceId = u32;
-
-/// Temporary alias for `crate::resources2::ResourceTable`.
-// TODO: remove this when the old `ResourceTable` is obsolete.
-pub type ResourceTable2 = ResourceTable;
-
-/// Map-like data structure storing Deno's resources (equivalent to file
-/// descriptors).
-///
-/// Provides basic methods for element access. A resource can be of any type.
-/// Different types of resources can be stored in the same map, and provided
-/// with a name for description.
-///
-/// Each resource is identified through a _resource ID (rid)_, which acts as
-/// the key in the map.
-#[derive(Default)]
-pub struct ResourceTable {
- index: HashMap<ResourceId, Rc<dyn Resource>>,
- next_rid: ResourceId,
-}
-
-impl ResourceTable {
- /// Returns true if any resource with the given `rid` is exists.
- pub fn has(&self, rid: ResourceId) -> bool {
- self.index.contains_key(&rid)
- }
-
- /// Returns a reference counted pointer to the resource of type `T` with the
- /// given `rid`. If `rid` is not present or has a type different than `T`,
- /// this function returns `None`.
- pub fn get<T: Resource>(&self, rid: ResourceId) -> Option<Rc<T>> {
- self
- .index
- .get(&rid)
- .and_then(|resource| resource.downcast_rc::<T>())
- .map(Clone::clone)
- }
-
- /// Inserts resource into the resource table, which takes ownership of it.
- ///
- /// The resource type is erased at runtime and must be statically known
- /// when retrieving it through `get()`.
- ///
- /// Returns a unique resource ID, which acts as a key for this resource.
- pub fn add<T: Resource>(&mut self, resource: T) -> ResourceId {
- self.add_rc(Rc::new(resource))
- }
-
- /// Inserts a `Rc`-wrapped resource into the resource table.
- ///
- /// The resource type is erased at runtime and must be statically known
- /// when retrieving it through `get()`.
- ///
- /// Returns a unique resource ID, which acts as a key for this resource.
- pub fn add_rc<T: Resource>(&mut self, resource: Rc<T>) -> ResourceId {
- let resource = resource as Rc<dyn Resource>;
- let rid = self.next_rid;
- let removed_resource = self.index.insert(rid, resource);
- assert!(removed_resource.is_none());
- self.next_rid += 1;
- rid
- }
-
- /// Removes the resource with the given `rid` from the resource table. If the
- /// only reference to this resource existed in the resource table, this will
- /// cause the resource to be dropped. However, since resources are reference
- /// counted, therefore pending ops are not automatically cancelled.
- pub fn close(&mut self, rid: ResourceId) -> Option<()> {
- self.index.remove(&rid).map(|resource| resource.close())
- }
-
- /// Returns an iterator that yields a `(id, name)` pair for every resource
- /// that's currently in the resource table. This can be used for debugging
- /// purposes or to implement the `op_resources` op. Note that the order in
- /// which items appear is not specified.
- ///
- /// # Example
- ///
- /// ```
- /// # use deno_core::ResourceTable2;
- /// # let resource_table = ResourceTable2::default();
- /// let resource_names = resource_table.names().collect::<Vec<_>>();
- /// ```
- pub fn names(&self) -> impl Iterator<Item = (ResourceId, Cow<str>)> {
- self
- .index
- .iter()
- .map(|(&id, resource)| (id, resource.name()))
- }
-}
diff --git a/op_crates/fetch/lib.rs b/op_crates/fetch/lib.rs
index 8a4c1ee16..c2c08d2cf 100644
--- a/op_crates/fetch/lib.rs
+++ b/op_crates/fetch/lib.rs
@@ -5,15 +5,19 @@
use deno_core::error::bad_resource_id;
use deno_core::error::type_error;
use deno_core::error::AnyError;
-use deno_core::futures;
use deno_core::serde_json;
use deno_core::serde_json::json;
use deno_core::serde_json::Value;
use deno_core::url;
use deno_core::url::Url;
+use deno_core::AsyncRefCell;
use deno_core::BufVec;
+use deno_core::CancelFuture;
+use deno_core::CancelHandle;
use deno_core::JsRuntime;
use deno_core::OpState;
+use deno_core::RcRef;
+use deno_core::Resource;
use deno_core::ZeroCopyBuf;
use reqwest::header::HeaderName;
@@ -23,6 +27,7 @@ use reqwest::Client;
use reqwest::Method;
use reqwest::Response;
use serde::Deserialize;
+use std::borrow::Cow;
use std::cell::RefCell;
use std::convert::From;
use std::fs::File;
@@ -172,10 +177,10 @@ where
}
}
- let rid = state
- .borrow_mut()
- .resource_table
- .add("httpBody", Box::new(res));
+ let rid = state.borrow_mut().resource_table.add(HttpBodyResource {
+ response: AsyncRefCell::new(res),
+ cancel: Default::default(),
+ });
Ok(json!({
"bodyRid": rid,
@@ -199,32 +204,43 @@ pub async fn op_fetch_read(
let args: Args = serde_json::from_value(args)?;
let rid = args.rid;
- use futures::future::poll_fn;
- use futures::ready;
- use futures::FutureExt;
- let f = poll_fn(move |cx| {
- let mut state = state.borrow_mut();
- let response = state
- .resource_table
- .get_mut::<Response>(rid as u32)
- .ok_or_else(bad_resource_id)?;
+ let resource = state
+ .borrow()
+ .resource_table
+ .get::<HttpBodyResource>(rid as u32)
+ .ok_or_else(bad_resource_id)?;
+ let mut response = RcRef::map(&resource, |r| &r.response).borrow_mut().await;
+ let cancel = RcRef::map(resource, |r| &r.cancel);
+ let maybe_chunk = response.chunk().or_cancel(cancel).await??;
+ if let Some(chunk) = maybe_chunk {
+ // TODO(ry) This is terribly inefficient. Make this zero-copy.
+ Ok(json!({ "chunk": &*chunk }))
+ } else {
+ Ok(json!({ "chunk": null }))
+ }
+}
- let mut chunk_fut = response.chunk().boxed_local();
- let r = ready!(chunk_fut.poll_unpin(cx))?;
- if let Some(chunk) = r {
- // TODO(ry) This is terribly inefficient. Make this zero-copy.
- Ok(json!({ "chunk": &*chunk })).into()
- } else {
- Ok(json!({ "chunk": null })).into()
- }
- });
- f.await
+struct HttpBodyResource {
+ response: AsyncRefCell<Response>,
+ cancel: CancelHandle,
+}
+
+impl Resource for HttpBodyResource {
+ fn name(&self) -> Cow<str> {
+ "httpBody".into()
+ }
}
struct HttpClientResource {
client: Client,
}
+impl Resource for HttpClientResource {
+ fn name(&self) -> Cow<str> {
+ "httpClient".into()
+ }
+}
+
impl HttpClientResource {
fn new(client: Client) -> Self {
Self { client }
@@ -255,9 +271,7 @@ where
let client = create_http_client(args.ca_file.as_deref()).unwrap();
- let rid = state
- .resource_table
- .add("httpClient", Box::new(HttpClientResource::new(client)));
+ let rid = state.resource_table.add(HttpClientResource::new(client));
Ok(json!(rid))
}
diff --git a/runtime/errors.rs b/runtime/errors.rs
index f8f71a859..f82d95ed8 100644
--- a/runtime/errors.rs
+++ b/runtime/errors.rs
@@ -169,6 +169,12 @@ pub fn get_error_class_name(e: &AnyError) -> Option<&'static str> {
.map(get_dlopen_error_class)
})
.or_else(|| {
+ e.downcast_ref::<deno_core::Canceled>().map(|e| {
+ let io_err: io::Error = e.to_owned().into();
+ get_io_error_class(&io_err)
+ })
+ })
+ .or_else(|| {
e.downcast_ref::<env::VarError>()
.map(get_env_var_error_class)
})
diff --git a/runtime/ops/fs.rs b/runtime/ops/fs.rs
index 865c5bcca..d6d7d7e78 100644
--- a/runtime/ops/fs.rs
+++ b/runtime/ops/fs.rs
@@ -1,7 +1,7 @@
// Copyright 2018-2020 the Deno authors. All rights reserved. MIT license.
// Some deserializer fields are only used on Unix and Windows build fails without it
use super::io::std_file_resource;
-use super::io::{FileMetadata, StreamResource, StreamResourceHolder};
+use super::io::StreamResource;
use crate::fs_util::canonicalize_path;
use crate::permissions::Permissions;
use deno_core::error::custom_error;
@@ -185,13 +185,8 @@ fn op_open_sync(
let (path, open_options) = open_helper(state, args)?;
let std_file = open_options.open(path)?;
let tokio_file = tokio::fs::File::from_std(std_file);
- let rid = state.resource_table.add(
- "fsFile",
- Box::new(StreamResourceHolder::new(StreamResource::FsFile(Some((
- tokio_file,
- FileMetadata::default(),
- ))))),
- );
+ let resource = StreamResource::fs_file(tokio_file);
+ let rid = state.resource_table.add(resource);
Ok(json!(rid))
}
@@ -204,13 +199,8 @@ async fn op_open_async(
let tokio_file = tokio::fs::OpenOptions::from(open_options)
.open(path)
.await?;
- let rid = state.borrow_mut().resource_table.add(
- "fsFile",
- Box::new(StreamResourceHolder::new(StreamResource::FsFile(Some((
- tokio_file,
- FileMetadata::default(),
- ))))),
- );
+ let resource = StreamResource::fs_file(tokio_file);
+ let rid = state.borrow_mut().resource_table.add(resource);
Ok(json!(rid))
}
diff --git a/runtime/ops/fs_events.rs b/runtime/ops/fs_events.rs
index 4832c915c..38661e1d4 100644
--- a/runtime/ops/fs_events.rs
+++ b/runtime/ops/fs_events.rs
@@ -3,12 +3,16 @@
use crate::permissions::Permissions;
use deno_core::error::bad_resource_id;
use deno_core::error::AnyError;
-use deno_core::futures::future::poll_fn;
use deno_core::serde_json;
use deno_core::serde_json::json;
use deno_core::serde_json::Value;
+use deno_core::AsyncRefCell;
use deno_core::BufVec;
+use deno_core::CancelFuture;
+use deno_core::CancelHandle;
use deno_core::OpState;
+use deno_core::RcRef;
+use deno_core::Resource;
use deno_core::ZeroCopyBuf;
use notify::event::Event as NotifyEvent;
use notify::Error as NotifyError;
@@ -18,6 +22,7 @@ use notify::RecursiveMode;
use notify::Watcher;
use serde::Deserialize;
use serde::Serialize;
+use std::borrow::Cow;
use std::cell::RefCell;
use std::convert::From;
use std::path::PathBuf;
@@ -32,7 +37,18 @@ pub fn init(rt: &mut deno_core::JsRuntime) {
struct FsEventsResource {
#[allow(unused)]
watcher: RecommendedWatcher,
- receiver: mpsc::Receiver<Result<FsEvent, AnyError>>,
+ receiver: AsyncRefCell<mpsc::Receiver<Result<FsEvent, AnyError>>>,
+ cancel: CancelHandle,
+}
+
+impl Resource for FsEventsResource {
+ fn name(&self) -> Cow<str> {
+ "fsEvents".into()
+ }
+
+ fn close(self: Rc<Self>) {
+ self.cancel.cancel();
+ }
}
/// Represents a file system event.
@@ -99,8 +115,12 @@ fn op_fs_events_open(
.check_read(&PathBuf::from(path))?;
watcher.watch(path, recursive_mode)?;
}
- let resource = FsEventsResource { watcher, receiver };
- let rid = state.resource_table.add("fsEvents", Box::new(resource));
+ let resource = FsEventsResource {
+ watcher,
+ receiver: AsyncRefCell::new(receiver),
+ cancel: Default::default(),
+ };
+ let rid = state.resource_table.add(resource);
Ok(json!(rid))
}
@@ -114,20 +134,18 @@ async fn op_fs_events_poll(
rid: u32,
}
let PollArgs { rid } = serde_json::from_value(args)?;
- poll_fn(move |cx| {
- let mut state = state.borrow_mut();
- let watcher = state
- .resource_table
- .get_mut::<FsEventsResource>(rid)
- .ok_or_else(bad_resource_id)?;
- watcher
- .receiver
- .poll_recv(cx)
- .map(|maybe_result| match maybe_result {
- Some(Ok(value)) => Ok(json!({ "value": value, "done": false })),
- Some(Err(err)) => Err(err),
- None => Ok(json!({ "done": true })),
- })
- })
- .await
+
+ let resource = state
+ .borrow()
+ .resource_table
+ .get::<FsEventsResource>(rid)
+ .ok_or_else(bad_resource_id)?;
+ let mut receiver = RcRef::map(&resource, |r| &r.receiver).borrow_mut().await;
+ let cancel = RcRef::map(resource, |r| &r.cancel);
+ let maybe_result = receiver.recv().or_cancel(cancel).await?;
+ match maybe_result {
+ Some(Ok(value)) => Ok(json!({ "value": value, "done": false })),
+ Some(Err(err)) => Err(err),
+ None => Ok(json!({ "done": true })),
+ }
}
diff --git a/runtime/ops/io.rs b/runtime/ops/io.rs
index 0f8af905a..de56f5b55 100644
--- a/runtime/ops/io.rs
+++ b/runtime/ops/io.rs
@@ -7,26 +7,29 @@ use deno_core::error::bad_resource_id;
use deno_core::error::resource_unavailable;
use deno_core::error::type_error;
use deno_core::error::AnyError;
-use deno_core::futures;
-use deno_core::futures::future::poll_fn;
use deno_core::futures::future::FutureExt;
-use deno_core::futures::ready;
+use deno_core::AsyncMutFuture;
+use deno_core::AsyncRefCell;
use deno_core::BufVec;
+use deno_core::CancelHandle;
+use deno_core::CancelTryFuture;
use deno_core::JsRuntime;
use deno_core::OpState;
+use deno_core::RcRef;
+use deno_core::Resource;
+use std::borrow::Cow;
use std::cell::RefCell;
-use std::collections::HashMap;
-use std::pin::Pin;
use std::rc::Rc;
-use std::sync::atomic::{AtomicUsize, Ordering};
-use std::task::Context;
-use std::task::Poll;
-use tokio::io::{AsyncRead, AsyncWrite};
+use tokio::io::AsyncRead;
+use tokio::io::AsyncReadExt;
+use tokio::io::AsyncWrite;
+use tokio::io::AsyncWriteExt;
+use tokio::net::tcp;
use tokio::net::TcpStream;
use tokio_rustls::client::TlsStream as ClientTlsStream;
use tokio_rustls::server::TlsStream as ServerTlsStream;
-#[cfg(not(windows))]
+#[cfg(unix)]
use std::os::unix::io::FromRawFd;
#[cfg(windows)]
@@ -94,26 +97,28 @@ pub fn init(rt: &mut JsRuntime) {
}
pub fn get_stdio() -> (
- Option<StreamResourceHolder>,
- Option<StreamResourceHolder>,
- Option<StreamResourceHolder>,
+ Option<StreamResource>,
+ Option<StreamResource>,
+ Option<StreamResource>,
) {
- let stdin = get_stdio_stream(&STDIN_HANDLE);
- let stdout = get_stdio_stream(&STDOUT_HANDLE);
- let stderr = get_stdio_stream(&STDERR_HANDLE);
+ let stdin = get_stdio_stream(&STDIN_HANDLE, "stdin");
+ let stdout = get_stdio_stream(&STDOUT_HANDLE, "stdout");
+ let stderr = get_stdio_stream(&STDERR_HANDLE, "stderr");
(stdin, stdout, stderr)
}
fn get_stdio_stream(
handle: &Option<std::fs::File>,
-) -> Option<StreamResourceHolder> {
+ name: &str,
+) -> Option<StreamResource> {
match handle {
None => None,
Some(file_handle) => match file_handle.try_clone() {
- Ok(clone) => Some(StreamResourceHolder::new(StreamResource::FsFile(
- Some((tokio::fs::File::from_std(clone), FileMetadata::default())),
- ))),
+ Ok(clone) => {
+ let tokio_file = tokio::fs::File::from_std(clone);
+ Some(StreamResource::stdio(tokio_file, name))
+ }
Err(_e) => None,
},
}
@@ -137,100 +142,317 @@ pub struct FileMetadata {
pub tty: TTYMetadata,
}
-pub struct StreamResourceHolder {
- pub resource: StreamResource,
- waker: HashMap<usize, futures::task::AtomicWaker>,
- waker_counter: AtomicUsize,
+#[derive(Debug)]
+pub struct FullDuplexResource<R, W> {
+ rd: AsyncRefCell<R>,
+ wr: AsyncRefCell<W>,
+ // When a full-duplex resource is closed, all pending 'read' ops are
+ // canceled, while 'write' ops are allowed to complete. Therefore only
+ // 'read' futures should be attached to this cancel handle.
+ cancel_handle: CancelHandle,
}
-impl StreamResourceHolder {
- pub fn new(resource: StreamResource) -> StreamResourceHolder {
- StreamResourceHolder {
- resource,
- // Atleast one task is expecter for the resource
- waker: HashMap::with_capacity(1),
- // Tracks wakers Ids
- waker_counter: AtomicUsize::new(0),
+impl<R: 'static, W: 'static> FullDuplexResource<R, W> {
+ pub fn new((rd, wr): (R, W)) -> Self {
+ Self {
+ rd: rd.into(),
+ wr: wr.into(),
+ cancel_handle: Default::default(),
}
}
-}
-impl Drop for StreamResourceHolder {
- fn drop(&mut self) {
- self.wake_tasks();
+ pub fn into_inner(self) -> (R, W) {
+ (self.rd.into_inner(), self.wr.into_inner())
+ }
+
+ pub fn rd_borrow_mut(self: &Rc<Self>) -> AsyncMutFuture<R> {
+ RcRef::map(self, |r| &r.rd).borrow_mut()
+ }
+
+ pub fn wr_borrow_mut(self: &Rc<Self>) -> AsyncMutFuture<W> {
+ RcRef::map(self, |r| &r.wr).borrow_mut()
+ }
+
+ pub fn cancel_handle(self: &Rc<Self>) -> RcRef<CancelHandle> {
+ RcRef::map(self, |r| &r.cancel_handle)
+ }
+
+ pub fn cancel_read_ops(&self) {
+ self.cancel_handle.cancel()
}
}
-impl StreamResourceHolder {
- pub fn track_task(&mut self, cx: &Context) -> Result<usize, AnyError> {
- let waker = futures::task::AtomicWaker::new();
- waker.register(cx.waker());
- // Its OK if it overflows
- let task_waker_id = self.waker_counter.fetch_add(1, Ordering::Relaxed);
- self.waker.insert(task_waker_id, waker);
- Ok(task_waker_id)
+impl<R, W> FullDuplexResource<R, W>
+where
+ R: AsyncRead + Unpin + 'static,
+ W: AsyncWrite + Unpin + 'static,
+{
+ async fn read(self: &Rc<Self>, buf: &mut [u8]) -> Result<usize, AnyError> {
+ let mut rd = self.rd_borrow_mut().await;
+ let nread = rd.read(buf).try_or_cancel(self.cancel_handle()).await?;
+ Ok(nread)
}
- pub fn wake_tasks(&mut self) {
- for waker in self.waker.values() {
- waker.wake();
- }
+ async fn write(self: &Rc<Self>, buf: &[u8]) -> Result<usize, AnyError> {
+ let mut wr = self.wr_borrow_mut().await;
+ let nwritten = wr.write(buf).await?;
+ Ok(nwritten)
}
+}
- pub fn untrack_task(&mut self, task_waker_id: usize) {
- self.waker.remove(&task_waker_id);
+pub type TcpStreamResource =
+ FullDuplexResource<tcp::OwnedReadHalf, tcp::OwnedWriteHalf>;
+
+impl Resource for TcpStreamResource {
+ fn name(&self) -> Cow<str> {
+ "tcpStream".into()
+ }
+
+ fn close(self: Rc<Self>) {
+ self.cancel_read_ops();
}
}
-pub enum StreamResource {
- FsFile(Option<(tokio::fs::File, FileMetadata)>),
- TcpStream(Option<tokio::net::TcpStream>),
- #[cfg(not(windows))]
- UnixStream(tokio::net::UnixStream),
- ServerTlsStream(Box<ServerTlsStream<TcpStream>>),
- ClientTlsStream(Box<ClientTlsStream<TcpStream>>),
- ChildStdin(tokio::process::ChildStdin),
- ChildStdout(tokio::process::ChildStdout),
- ChildStderr(tokio::process::ChildStderr),
+#[derive(Default)]
+pub struct StreamResource {
+ pub fs_file:
+ Option<AsyncRefCell<(Option<tokio::fs::File>, Option<FileMetadata>)>>,
+
+ #[cfg(unix)]
+ pub unix_stream: Option<AsyncRefCell<tokio::net::UnixStream>>,
+
+ child_stdin: Option<AsyncRefCell<tokio::process::ChildStdin>>,
+
+ child_stdout: Option<AsyncRefCell<tokio::process::ChildStdout>>,
+
+ child_stderr: Option<AsyncRefCell<tokio::process::ChildStderr>>,
+
+ client_tls_stream: Option<AsyncRefCell<ClientTlsStream<TcpStream>>>,
+
+ server_tls_stream: Option<AsyncRefCell<ServerTlsStream<TcpStream>>>,
+
+ cancel: CancelHandle,
+ name: String,
+}
+
+impl std::fmt::Debug for StreamResource {
+ fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
+ write!(f, "StreamResource")
+ }
}
-trait UnpinAsyncRead: AsyncRead + Unpin {}
-trait UnpinAsyncWrite: AsyncWrite + Unpin {}
+impl StreamResource {
+ pub fn stdio(fs_file: tokio::fs::File, name: &str) -> Self {
+ Self {
+ fs_file: Some(AsyncRefCell::new((
+ Some(fs_file),
+ Some(FileMetadata::default()),
+ ))),
+ name: name.to_string(),
+ ..Default::default()
+ }
+ }
+
+ pub fn fs_file(fs_file: tokio::fs::File) -> Self {
+ Self {
+ fs_file: Some(AsyncRefCell::new((
+ Some(fs_file),
+ Some(FileMetadata::default()),
+ ))),
+ name: "fsFile".to_string(),
+ ..Default::default()
+ }
+ }
+
+ #[cfg(unix)]
+ pub fn unix_stream(unix_stream: tokio::net::UnixStream) -> Self {
+ Self {
+ unix_stream: Some(AsyncRefCell::new(unix_stream)),
+ name: "unixStream".to_string(),
+ ..Default::default()
+ }
+ }
+
+ pub fn child_stdout(child: tokio::process::ChildStdout) -> Self {
+ Self {
+ child_stdout: Some(AsyncRefCell::new(child)),
+ name: "childStdout".to_string(),
+ ..Default::default()
+ }
+ }
-impl<T: AsyncRead + Unpin> UnpinAsyncRead for T {}
-impl<T: AsyncWrite + Unpin> UnpinAsyncWrite for T {}
+ pub fn child_stderr(child: tokio::process::ChildStderr) -> Self {
+ Self {
+ child_stderr: Some(AsyncRefCell::new(child)),
+ name: "childStderr".to_string(),
+ ..Default::default()
+ }
+ }
+
+ pub fn child_stdin(child: tokio::process::ChildStdin) -> Self {
+ Self {
+ child_stdin: Some(AsyncRefCell::new(child)),
+ name: "childStdin".to_string(),
+ ..Default::default()
+ }
+ }
+
+ pub fn client_tls_stream(stream: ClientTlsStream<TcpStream>) -> Self {
+ Self {
+ client_tls_stream: Some(AsyncRefCell::new(stream)),
+ name: "clientTlsStream".to_string(),
+ ..Default::default()
+ }
+ }
-/// `DenoAsyncRead` is the same as the `tokio_io::AsyncRead` trait
-/// but uses an `AnyError` error instead of `std::io:Error`
-pub trait DenoAsyncRead {
- fn poll_read(
- &mut self,
- cx: &mut Context,
- buf: &mut [u8],
- ) -> Poll<Result<usize, AnyError>>;
+ pub fn server_tls_stream(stream: ServerTlsStream<TcpStream>) -> Self {
+ Self {
+ server_tls_stream: Some(AsyncRefCell::new(stream)),
+ name: "serverTlsStream".to_string(),
+ ..Default::default()
+ }
+ }
+
+ async fn read(self: Rc<Self>, buf: &mut [u8]) -> Result<usize, AnyError> {
+ // TODO(bartlomieju): in the future, it would be better for `StreamResource`
+ // to be an enum instead a struct with many `Option` fields, however I
+ // wasn't able to get it to work with `AsyncRefCell`s.
+ if self.fs_file.is_some() {
+ debug_assert!(self.child_stdin.is_none());
+ debug_assert!(self.child_stdout.is_none());
+ debug_assert!(self.child_stderr.is_none());
+ debug_assert!(self.server_tls_stream.is_none());
+ debug_assert!(self.client_tls_stream.is_none());
+ let mut fs_file = RcRef::map(&self, |r| r.fs_file.as_ref().unwrap())
+ .borrow_mut()
+ .await;
+ let nwritten = (*fs_file).0.as_mut().unwrap().read(buf).await?;
+ return Ok(nwritten);
+ } else if self.child_stdout.is_some() {
+ debug_assert!(self.child_stdin.is_none());
+ debug_assert!(self.child_stderr.is_none());
+ debug_assert!(self.server_tls_stream.is_none());
+ debug_assert!(self.client_tls_stream.is_none());
+ let mut child_stdout =
+ RcRef::map(&self, |r| r.child_stdout.as_ref().unwrap())
+ .borrow_mut()
+ .await;
+ let cancel = RcRef::map(self, |r| &r.cancel);
+ let nread = child_stdout.read(buf).try_or_cancel(cancel).await?;
+ return Ok(nread);
+ } else if self.child_stderr.is_some() {
+ debug_assert!(self.child_stdin.is_none());
+ debug_assert!(self.server_tls_stream.is_none());
+ debug_assert!(self.client_tls_stream.is_none());
+ let mut child_stderr =
+ RcRef::map(&self, |r| r.child_stderr.as_ref().unwrap())
+ .borrow_mut()
+ .await;
+ let cancel = RcRef::map(self, |r| &r.cancel);
+ let nread = child_stderr.read(buf).try_or_cancel(cancel).await?;
+ return Ok(nread);
+ } else if self.client_tls_stream.is_some() {
+ debug_assert!(self.server_tls_stream.is_none());
+ let mut client_tls_stream =
+ RcRef::map(&self, |r| r.client_tls_stream.as_ref().unwrap())
+ .borrow_mut()
+ .await;
+ let cancel = RcRef::map(self, |r| &r.cancel);
+ let nread = client_tls_stream.read(buf).try_or_cancel(cancel).await?;
+ return Ok(nread);
+ } else if self.server_tls_stream.is_some() {
+ let mut server_tls_stream =
+ RcRef::map(&self, |r| r.server_tls_stream.as_ref().unwrap())
+ .borrow_mut()
+ .await;
+ let cancel = RcRef::map(self, |r| &r.cancel);
+ let nread = server_tls_stream.read(buf).try_or_cancel(cancel).await?;
+ return Ok(nread);
+ }
+
+ #[cfg(unix)]
+ if self.unix_stream.is_some() {
+ let mut unix_stream =
+ RcRef::map(&self, |r| r.unix_stream.as_ref().unwrap())
+ .borrow_mut()
+ .await;
+ let cancel = RcRef::map(self, |r| &r.cancel);
+ let nread = unix_stream.read(buf).try_or_cancel(cancel).await?;
+ return Ok(nread);
+ }
+
+ Err(bad_resource_id())
+ }
+
+ async fn write(self: Rc<Self>, buf: &[u8]) -> Result<usize, AnyError> {
+ // TODO(bartlomieju): in the future, it would be better for `StreamResource`
+ // to be an enum instead a struct with many `Option` fields, however I
+ // wasn't able to get it to work with `AsyncRefCell`s.
+ if self.fs_file.is_some() {
+ debug_assert!(self.child_stdin.is_none());
+ debug_assert!(self.child_stdout.is_none());
+ debug_assert!(self.child_stderr.is_none());
+ debug_assert!(self.server_tls_stream.is_none());
+ debug_assert!(self.client_tls_stream.is_none());
+ let mut fs_file = RcRef::map(&self, |r| r.fs_file.as_ref().unwrap())
+ .borrow_mut()
+ .await;
+ let nwritten = (*fs_file).0.as_mut().unwrap().write(buf).await?;
+ (*fs_file).0.as_mut().unwrap().flush().await?;
+ return Ok(nwritten);
+ } else if self.child_stdin.is_some() {
+ debug_assert!(self.child_stdout.is_none());
+ debug_assert!(self.child_stderr.is_none());
+ debug_assert!(self.server_tls_stream.is_none());
+ debug_assert!(self.client_tls_stream.is_none());
+ let mut child_stdin =
+ RcRef::map(&self, |r| r.child_stdin.as_ref().unwrap())
+ .borrow_mut()
+ .await;
+ let nwritten = child_stdin.write(buf).await?;
+ child_stdin.flush().await?;
+ return Ok(nwritten);
+ } else if self.client_tls_stream.is_some() {
+ debug_assert!(self.server_tls_stream.is_none());
+ let mut client_tls_stream =
+ RcRef::map(&self, |r| r.client_tls_stream.as_ref().unwrap())
+ .borrow_mut()
+ .await;
+ let nwritten = client_tls_stream.write(buf).await?;
+ client_tls_stream.flush().await?;
+ return Ok(nwritten);
+ } else if self.server_tls_stream.is_some() {
+ let mut server_tls_stream =
+ RcRef::map(&self, |r| r.server_tls_stream.as_ref().unwrap())
+ .borrow_mut()
+ .await;
+ let nwritten = server_tls_stream.write(buf).await?;
+ server_tls_stream.flush().await?;
+ return Ok(nwritten);
+ }
+
+ #[cfg(unix)]
+ if self.unix_stream.is_some() {
+ let mut unix_stream =
+ RcRef::map(&self, |r| r.unix_stream.as_ref().unwrap())
+ .borrow_mut()
+ .await;
+ let nwritten = unix_stream.write(buf).await?;
+ unix_stream.flush().await?;
+ return Ok(nwritten);
+ }
+
+ Err(bad_resource_id())
+ }
}
-impl DenoAsyncRead for StreamResource {
- fn poll_read(
- &mut self,
- cx: &mut Context,
- buf: &mut [u8],
- ) -> Poll<Result<usize, AnyError>> {
- use StreamResource::*;
- let f: &mut dyn UnpinAsyncRead = match self {
- FsFile(Some((f, _))) => f,
- FsFile(None) => return Poll::Ready(Err(resource_unavailable())),
- TcpStream(Some(f)) => f,
- #[cfg(not(windows))]
- UnixStream(f) => f,
- ClientTlsStream(f) => f,
- ServerTlsStream(f) => f,
- ChildStdout(f) => f,
- ChildStderr(f) => f,
- _ => return Err(bad_resource_id()).into(),
- };
- let v = ready!(Pin::new(f).poll_read(cx, buf))?;
- Ok(v).into()
+impl Resource for StreamResource {
+ fn name(&self) -> Cow<str> {
+ self.name.clone().into()
+ }
+
+ fn close(self: Rc<Self>) {
+ self.cancel.cancel()
}
}
@@ -263,92 +485,26 @@ pub fn op_read(
})
} else {
let mut zero_copy = zero_copy[0].clone();
- MinimalOp::Async(
- poll_fn(move |cx| {
- let mut state = state.borrow_mut();
- let resource_holder = state
+ MinimalOp::Async({
+ async move {
+ let resource = state
+ .borrow()
.resource_table
- .get_mut::<StreamResourceHolder>(rid as u32)
+ .get_any(rid as u32)
.ok_or_else(bad_resource_id)?;
-
- let mut task_tracker_id: Option<usize> = None;
- let nread = match resource_holder.resource.poll_read(cx, &mut zero_copy)
+ let nread = if let Some(stream) =
+ resource.downcast_rc::<TcpStreamResource>()
{
- Poll::Ready(t) => {
- if let Some(id) = task_tracker_id {
- resource_holder.untrack_task(id);
- }
- t
- }
- Poll::Pending => {
- task_tracker_id.replace(resource_holder.track_task(cx)?);
- return Poll::Pending;
- }
- }?;
- Poll::Ready(Ok(nread as i32))
- })
- .boxed_local(),
- )
- }
-}
-
-/// `DenoAsyncWrite` is the same as the `tokio_io::AsyncWrite` trait
-/// but uses an `AnyError` error instead of `std::io:Error`
-pub trait DenoAsyncWrite {
- fn poll_write(
- &mut self,
- cx: &mut Context,
- buf: &[u8],
- ) -> Poll<Result<usize, AnyError>>;
-
- fn poll_close(&mut self, cx: &mut Context) -> Poll<Result<(), AnyError>>;
-
- fn poll_flush(&mut self, cx: &mut Context) -> Poll<Result<(), AnyError>>;
-}
-
-impl DenoAsyncWrite for StreamResource {
- fn poll_write(
- &mut self,
- cx: &mut Context,
- buf: &[u8],
- ) -> Poll<Result<usize, AnyError>> {
- use StreamResource::*;
- let f: &mut dyn UnpinAsyncWrite = match self {
- FsFile(Some((f, _))) => f,
- FsFile(None) => return Poll::Pending,
- TcpStream(Some(f)) => f,
- #[cfg(not(windows))]
- UnixStream(f) => f,
- ClientTlsStream(f) => f,
- ServerTlsStream(f) => f,
- ChildStdin(f) => f,
- _ => return Err(bad_resource_id()).into(),
- };
-
- let v = ready!(Pin::new(f).poll_write(cx, buf))?;
- Ok(v).into()
- }
-
- fn poll_flush(&mut self, cx: &mut Context) -> Poll<Result<(), AnyError>> {
- use StreamResource::*;
- let f: &mut dyn UnpinAsyncWrite = match self {
- FsFile(Some((f, _))) => f,
- FsFile(None) => return Poll::Pending,
- TcpStream(Some(f)) => f,
- #[cfg(not(windows))]
- UnixStream(f) => f,
- ClientTlsStream(f) => f,
- ServerTlsStream(f) => f,
- ChildStdin(f) => f,
- _ => return Err(bad_resource_id()).into(),
- };
-
- ready!(Pin::new(f).poll_flush(cx))?;
- Ok(()).into()
- }
-
- fn poll_close(&mut self, _cx: &mut Context) -> Poll<Result<(), AnyError>> {
- unimplemented!()
+ stream.read(&mut zero_copy).await?
+ } else if let Some(stream) = resource.downcast_rc::<StreamResource>() {
+ stream.clone().read(&mut zero_copy).await?
+ } else {
+ return Err(bad_resource_id());
+ };
+ Ok(nread as i32)
+ }
+ .boxed_local()
+ })
}
}
@@ -381,93 +537,76 @@ pub fn op_write(
})
} else {
let zero_copy = zero_copy[0].clone();
- MinimalOp::Async(
+ MinimalOp::Async({
async move {
- let nwritten = poll_fn(|cx| {
- let mut state = state.borrow_mut();
- let resource_holder = state
- .resource_table
- .get_mut::<StreamResourceHolder>(rid as u32)
- .ok_or_else(bad_resource_id)?;
- resource_holder.resource.poll_write(cx, &zero_copy)
- })
- .await?;
-
- // TODO(bartlomieju): this step was added during upgrade to Tokio 0.2
- // and the reasons for the need to explicitly flush are not fully known.
- // Figure out why it's needed and preferably remove it.
- // https://github.com/denoland/deno/issues/3565
- poll_fn(|cx| {
- let mut state = state.borrow_mut();
- let resource_holder = state
- .resource_table
- .get_mut::<StreamResourceHolder>(rid as u32)
- .ok_or_else(bad_resource_id)?;
- resource_holder.resource.poll_flush(cx)
- })
- .await?;
-
+ let resource = state
+ .borrow()
+ .resource_table
+ .get_any(rid as u32)
+ .ok_or_else(bad_resource_id)?;
+ let nwritten = if let Some(stream) =
+ resource.downcast_rc::<TcpStreamResource>()
+ {
+ stream.write(&zero_copy).await?
+ } else if let Some(stream) = resource.downcast_rc::<StreamResource>() {
+ stream.clone().write(&zero_copy).await?
+ } else {
+ return Err(bad_resource_id());
+ };
Ok(nwritten as i32)
}
- .boxed_local(),
- )
+ .boxed_local()
+ })
}
}
-/// Helper function for operating on a std::fs::File stored in the resource table.
-///
-/// We store file system file resources as tokio::fs::File, so this is a little
-/// utility function that gets a std::fs:File when you need to do blocking
-/// operations.
-///
-/// Returns ErrorKind::Busy if the resource is being used by another op.
pub fn std_file_resource<F, T>(
state: &mut OpState,
rid: u32,
mut f: F,
) -> Result<T, AnyError>
where
- F: FnMut(
- Result<&mut std::fs::File, &mut StreamResource>,
- ) -> Result<T, AnyError>,
+ F: FnMut(Result<&mut std::fs::File, ()>) -> Result<T, AnyError>,
{
// First we look up the rid in the resource table.
- let mut r = state.resource_table.get_mut::<StreamResourceHolder>(rid);
- if let Some(ref mut resource_holder) = r {
- // Sync write only works for FsFile. It doesn't make sense to do this
- // for non-blocking sockets. So we error out if not FsFile.
- match &mut resource_holder.resource {
- StreamResource::FsFile(option_file_metadata) => {
- // The object in the resource table is a tokio::fs::File - but in
- // order to do a blocking write on it, we must turn it into a
- // std::fs::File. Hopefully this code compiles down to nothing.
- if let Some((tokio_file, metadata)) = option_file_metadata.take() {
- match tokio_file.try_into_std() {
- Ok(mut std_file) => {
- let result = f(Ok(&mut std_file));
- // Turn the std_file handle back into a tokio file, put it back
- // in the resource table.
- let tokio_file = tokio::fs::File::from_std(std_file);
- resource_holder.resource =
- StreamResource::FsFile(Some((tokio_file, metadata)));
- // return the result.
- result
- }
- Err(tokio_file) => {
- // This function will return an error containing the file if
- // some operation is in-flight.
- resource_holder.resource =
- StreamResource::FsFile(Some((tokio_file, metadata)));
- Err(resource_unavailable())
- }
- }
- } else {
- Err(resource_unavailable())
- }
+ let resource = state
+ .resource_table
+ .get::<StreamResource>(rid)
+ .ok_or_else(bad_resource_id)?;
+
+ // Sync write only works for FsFile. It doesn't make sense to do this
+ // for non-blocking sockets. So we error out if not FsFile.
+ if resource.fs_file.is_none() {
+ return f(Err(()));
+ }
+
+ // The object in the resource table is a tokio::fs::File - but in
+ // order to do a blocking write on it, we must turn it into a
+ // std::fs::File. Hopefully this code compiles down to nothing.
+
+ let fs_file_resource =
+ RcRef::map(&resource, |r| r.fs_file.as_ref().unwrap()).try_borrow_mut();
+
+ if let Some(mut fs_file) = fs_file_resource {
+ let tokio_file = fs_file.0.take().unwrap();
+ match tokio_file.try_into_std() {
+ Ok(mut std_file) => {
+ let result = f(Ok(&mut std_file));
+ // Turn the std_file handle back into a tokio file, put it back
+ // in the resource table.
+ let tokio_file = tokio::fs::File::from_std(std_file);
+ fs_file.0 = Some(tokio_file);
+ // return the result.
+ result
+ }
+ Err(tokio_file) => {
+ // This function will return an error containing the file if
+ // some operation is in-flight.
+ fs_file.0 = Some(tokio_file);
+ Err(resource_unavailable())
}
- _ => f(Err(&mut resource_holder.resource)),
}
} else {
- Err(bad_resource_id())
+ Err(resource_unavailable())
}
}
diff --git a/runtime/ops/net.rs b/runtime/ops/net.rs
index 8770ef103..a4bda585b 100644
--- a/runtime/ops/net.rs
+++ b/runtime/ops/net.rs
@@ -1,7 +1,7 @@
// Copyright 2018-2020 the Deno authors. All rights reserved. MIT license.
-use crate::ops::io::StreamResource;
-use crate::ops::io::StreamResourceHolder;
+use crate::ops::io::FullDuplexResource;
+use crate::ops::io::TcpStreamResource;
use crate::permissions::Permissions;
use crate::resolve_addr::resolve_addr;
use crate::resolve_addr::resolve_addr_sync;
@@ -11,21 +11,24 @@ use deno_core::error::custom_error;
use deno_core::error::generic_error;
use deno_core::error::type_error;
use deno_core::error::AnyError;
-use deno_core::futures;
-use deno_core::futures::future::poll_fn;
use deno_core::serde_json;
use deno_core::serde_json::json;
use deno_core::serde_json::Value;
+use deno_core::AsyncRefCell;
use deno_core::BufVec;
+use deno_core::CancelHandle;
+use deno_core::CancelTryFuture;
use deno_core::OpState;
+use deno_core::RcRef;
+use deno_core::Resource;
use deno_core::ZeroCopyBuf;
use serde::Deserialize;
+use std::borrow::Cow;
use std::cell::RefCell;
use std::net::Shutdown;
use std::net::SocketAddr;
use std::rc::Rc;
-use std::task::Context;
-use std::task::Poll;
+use tokio::net::udp;
use tokio::net::TcpListener;
use tokio::net::TcpStream;
use tokio::net::UdpSocket;
@@ -33,12 +36,14 @@ use tokio::net::UdpSocket;
#[cfg(unix)]
use super::net_unix;
#[cfg(unix)]
+use crate::ops::io::StreamResource;
+#[cfg(unix)]
use std::path::Path;
pub fn init(rt: &mut deno_core::JsRuntime) {
super::reg_json_async(rt, "op_accept", op_accept);
super::reg_json_async(rt, "op_connect", op_connect);
- super::reg_json_sync(rt, "op_shutdown", op_shutdown);
+ super::reg_json_async(rt, "op_shutdown", op_shutdown);
super::reg_json_sync(rt, "op_listen", op_listen);
super::reg_json_async(rt, "op_datagram_receive", op_datagram_receive);
super::reg_json_async(rt, "op_datagram_send", op_datagram_send);
@@ -57,39 +62,31 @@ async fn accept_tcp(
) -> Result<Value, AnyError> {
let rid = args.rid as u32;
- let accept_fut = poll_fn(|cx| {
- let mut state = state.borrow_mut();
- let listener_resource = state
- .resource_table
- .get_mut::<TcpListenerResource>(rid)
- .ok_or_else(|| bad_resource("Listener has been closed"))?;
- let listener = &mut listener_resource.listener;
- match listener.poll_accept(cx).map_err(AnyError::from) {
- Poll::Ready(Ok((stream, addr))) => {
- listener_resource.untrack_task();
- Poll::Ready(Ok((stream, addr)))
- }
- Poll::Pending => {
- listener_resource.track_task(cx)?;
- Poll::Pending
- }
- Poll::Ready(Err(e)) => {
- listener_resource.untrack_task();
- Poll::Ready(Err(e))
+ let resource = state
+ .borrow()
+ .resource_table
+ .get::<TcpListenerResource>(rid)
+ .ok_or_else(|| bad_resource("Listener has been closed"))?;
+ let mut listener = RcRef::map(&resource, |r| &r.listener)
+ .try_borrow_mut()
+ .ok_or_else(|| custom_error("Busy", "Another accept task is ongoing"))?;
+ let cancel = RcRef::map(resource, |r| &r.cancel);
+ let (tcp_stream, _socket_addr) =
+ listener.accept().try_or_cancel(cancel).await.map_err(|e| {
+ // FIXME(bartlomieju): compatibility with current JS implementation
+ if let std::io::ErrorKind::Interrupted = e.kind() {
+ bad_resource("Listener has been closed")
+ } else {
+ e.into()
}
- }
- });
- let (tcp_stream, _socket_addr) = accept_fut.await?;
+ })?;
let local_addr = tcp_stream.local_addr()?;
let remote_addr = tcp_stream.peer_addr()?;
let mut state = state.borrow_mut();
- let rid = state.resource_table.add(
- "tcpStream",
- Box::new(StreamResourceHolder::new(StreamResource::TcpStream(Some(
- tcp_stream,
- )))),
- );
+ let rid = state
+ .resource_table
+ .add(TcpStreamResource::new(tcp_stream.into_split()));
Ok(json!({
"rid": rid,
"localAddr": {
@@ -138,18 +135,17 @@ async fn receive_udp(
let rid = args.rid as u32;
- let receive_fut = poll_fn(|cx| {
- let mut state = state.borrow_mut();
- let resource = state
- .resource_table
- .get_mut::<UdpSocketResource>(rid)
- .ok_or_else(|| bad_resource("Socket has been closed"))?;
- let socket = &mut resource.socket;
- socket
- .poll_recv_from(cx, &mut zero_copy)
- .map_err(AnyError::from)
- });
- let (size, remote_addr) = receive_fut.await?;
+ let resource = state
+ .borrow_mut()
+ .resource_table
+ .get::<UdpSocketResource>(rid)
+ .ok_or_else(|| bad_resource("Socket has been closed"))?;
+ let (size, remote_addr) = resource
+ .rd_borrow_mut()
+ .await
+ .recv_from(&mut zero_copy)
+ .try_or_cancel(resource.cancel_handle())
+ .await?;
Ok(json!({
"size": size,
"remoteAddr": {
@@ -207,19 +203,18 @@ async fn op_datagram_send(
.check_net(&args.hostname, args.port)?;
}
let addr = resolve_addr(&args.hostname, args.port).await?;
- poll_fn(move |cx| {
- let mut state = state.borrow_mut();
- let resource = state
- .resource_table
- .get_mut::<UdpSocketResource>(rid as u32)
- .ok_or_else(|| bad_resource("Socket has been closed"))?;
- resource
- .socket
- .poll_send_to(cx, &zero_copy, &addr)
- .map_ok(|byte_length| json!(byte_length))
- .map_err(AnyError::from)
- })
- .await
+
+ let resource = state
+ .borrow_mut()
+ .resource_table
+ .get::<UdpSocketResource>(rid as u32)
+ .ok_or_else(|| bad_resource("Socket has been closed"))?;
+ let byte_length = resource
+ .wr_borrow_mut()
+ .await
+ .send_to(&zero_copy, &addr)
+ .await?;
+ Ok(json!(byte_length))
}
#[cfg(unix)]
SendArgs {
@@ -232,18 +227,17 @@ async fn op_datagram_send(
let s = state.borrow();
s.borrow::<Permissions>().check_write(&address_path)?;
}
- let mut state = state.borrow_mut();
let resource = state
+ .borrow()
.resource_table
- .get_mut::<net_unix::UnixDatagramResource>(rid as u32)
+ .get::<net_unix::UnixDatagramResource>(rid as u32)
.ok_or_else(|| {
custom_error("NotConnected", "Socket has been closed")
})?;
- let socket = &mut resource.socket;
- let byte_length = socket
- .send_to(&zero_copy, &resource.local_addr.as_pathname().unwrap())
- .await?;
-
+ let mut socket = RcRef::map(&resource, |r| &r.socket)
+ .try_borrow_mut()
+ .ok_or_else(|| custom_error("Busy", "Socket already in use"))?;
+ let byte_length = socket.send_to(&zero_copy, address_path).await?;
Ok(json!(byte_length))
}
_ => Err(type_error("Wrong argument format!")),
@@ -279,12 +273,9 @@ async fn op_connect(
let remote_addr = tcp_stream.peer_addr()?;
let mut state_ = state.borrow_mut();
- let rid = state_.resource_table.add(
- "tcpStream",
- Box::new(StreamResourceHolder::new(StreamResource::TcpStream(Some(
- tcp_stream,
- )))),
- );
+ let rid = state_
+ .resource_table
+ .add(TcpStreamResource::new(tcp_stream.into_split()));
Ok(json!({
"rid": rid,
"localAddr": {
@@ -317,12 +308,8 @@ async fn op_connect(
let remote_addr = unix_stream.peer_addr()?;
let mut state_ = state.borrow_mut();
- let rid = state_.resource_table.add(
- "unixStream",
- Box::new(StreamResourceHolder::new(StreamResource::UnixStream(
- unix_stream,
- ))),
- );
+ let resource = StreamResource::unix_stream(unix_stream);
+ let rid = state_.resource_table.add(resource);
Ok(json!({
"rid": rid,
"localAddr": {
@@ -345,12 +332,12 @@ struct ShutdownArgs {
how: i32,
}
-fn op_shutdown(
- state: &mut OpState,
+async fn op_shutdown(
+ state: Rc<RefCell<OpState>>,
args: Value,
- _zero_copy: &mut [ZeroCopyBuf],
+ _zero_copy: BufVec,
) -> Result<Value, AnyError> {
- super::check_unstable(state, "Deno.shutdown");
+ super::check_unstable2(&state, "Deno.shutdown");
let args: ShutdownArgs = serde_json::from_value(args)?;
@@ -358,80 +345,61 @@ fn op_shutdown(
let how = args.how;
let shutdown_mode = match how {
- 0 => Shutdown::Read,
+ 0 => Shutdown::Read, // TODO: nonsense, remove me.
1 => Shutdown::Write,
_ => unimplemented!(),
};
- let resource_holder = state
+ let resource = state
+ .borrow()
.resource_table
- .get_mut::<StreamResourceHolder>(rid)
+ .get_any(rid)
.ok_or_else(bad_resource_id)?;
- match resource_holder.resource {
- StreamResource::TcpStream(Some(ref mut stream)) => {
- TcpStream::shutdown(stream, shutdown_mode)?;
- }
- #[cfg(unix)]
- StreamResource::UnixStream(ref mut stream) => {
- net_unix::UnixStream::shutdown(stream, shutdown_mode)?;
+ if let Some(stream) = resource.downcast_rc::<TcpStreamResource>() {
+ let wr = stream.wr_borrow_mut().await;
+ TcpStream::shutdown((*wr).as_ref(), shutdown_mode)?;
+ return Ok(json!({}));
+ }
+
+ #[cfg(unix)]
+ if let Some(stream) = resource.downcast_rc::<StreamResource>() {
+ if stream.unix_stream.is_some() {
+ let wr = RcRef::map(stream, |r| r.unix_stream.as_ref().unwrap())
+ .borrow_mut()
+ .await;
+ net_unix::UnixStream::shutdown(&*wr, shutdown_mode)?;
+ return Ok(json!({}));
}
- _ => return Err(bad_resource_id()),
}
- Ok(json!({}))
+ Err(bad_resource_id())
}
-#[allow(dead_code)]
struct TcpListenerResource {
- listener: TcpListener,
- waker: Option<futures::task::AtomicWaker>,
- local_addr: SocketAddr,
+ listener: AsyncRefCell<TcpListener>,
+ cancel: CancelHandle,
}
-impl Drop for TcpListenerResource {
- fn drop(&mut self) {
- self.wake_task();
+impl Resource for TcpListenerResource {
+ fn name(&self) -> Cow<str> {
+ "tcpListener".into()
}
-}
-
-impl TcpListenerResource {
- /// Track the current task so future awaiting for connection
- /// can be notified when listener is closed.
- ///
- /// Throws an error if another task is already tracked.
- pub fn track_task(&mut self, cx: &Context) -> Result<(), AnyError> {
- // Currently, we only allow tracking a single accept task for a listener.
- // This might be changed in the future with multiple workers.
- // Caveat: TcpListener by itself also only tracks an accept task at a time.
- // See https://github.com/tokio-rs/tokio/issues/846#issuecomment-454208883
- if self.waker.is_some() {
- return Err(custom_error("Busy", "Another accept task is ongoing"));
- }
- let waker = futures::task::AtomicWaker::new();
- waker.register(cx.waker());
- self.waker.replace(waker);
- Ok(())
+ fn close(self: Rc<Self>) {
+ self.cancel.cancel();
}
+}
- /// Notifies a task when listener is closed so accept future can resolve.
- pub fn wake_task(&mut self) {
- if let Some(waker) = self.waker.as_ref() {
- waker.wake();
- }
- }
+type UdpSocketResource = FullDuplexResource<udp::RecvHalf, udp::SendHalf>;
- /// Stop tracking a task.
- /// Happens when the task is done and thus no further tracking is needed.
- pub fn untrack_task(&mut self) {
- if self.waker.is_some() {
- self.waker.take();
- }
+impl Resource for UdpSocketResource {
+ fn name(&self) -> Cow<str> {
+ "udpSocket".into()
}
-}
-struct UdpSocketResource {
- socket: UdpSocket,
+ fn close(self: Rc<Self>) {
+ self.cancel_read_ops()
+ }
}
#[derive(Deserialize)]
@@ -463,13 +431,10 @@ fn listen_tcp(
let listener = TcpListener::from_std(std_listener)?;
let local_addr = listener.local_addr()?;
let listener_resource = TcpListenerResource {
- listener,
- waker: None,
- local_addr,
+ listener: AsyncRefCell::new(listener),
+ cancel: Default::default(),
};
- let rid = state
- .resource_table
- .add("tcpListener", Box::new(listener_resource));
+ let rid = state.resource_table.add(listener_resource);
Ok((rid, local_addr))
}
@@ -481,10 +446,8 @@ fn listen_udp(
let std_socket = std::net::UdpSocket::bind(&addr)?;
let socket = UdpSocket::from_std(std_socket)?;
let local_addr = socket.local_addr()?;
- let socket_resource = UdpSocketResource { socket };
- let rid = state
- .resource_table
- .add("udpSocket", Box::new(socket_resource));
+ let socket_resource = UdpSocketResource::new(socket.split());
+ let rid = state.resource_table.add(socket_resource);
Ok((rid, local_addr))
}
diff --git a/runtime/ops/net_unix.rs b/runtime/ops/net_unix.rs
index 4c416a5a4..23981a7f1 100644
--- a/runtime/ops/net_unix.rs
+++ b/runtime/ops/net_unix.rs
@@ -1,34 +1,59 @@
// Copyright 2018-2020 the Deno authors. All rights reserved. MIT license.
use crate::ops::io::StreamResource;
-use crate::ops::io::StreamResourceHolder;
use crate::ops::net::AcceptArgs;
use crate::ops::net::ReceiveArgs;
use deno_core::error::bad_resource;
+use deno_core::error::custom_error;
use deno_core::error::AnyError;
-use deno_core::futures::future::poll_fn;
use deno_core::serde_json::json;
use deno_core::serde_json::Value;
+use deno_core::AsyncRefCell;
use deno_core::BufVec;
+use deno_core::CancelHandle;
+use deno_core::CancelTryFuture;
use deno_core::OpState;
+use deno_core::RcRef;
+use deno_core::Resource;
use serde::Deserialize;
+use std::borrow::Cow;
use std::cell::RefCell;
use std::fs::remove_file;
use std::os::unix;
use std::path::Path;
use std::rc::Rc;
-use std::task::Poll;
use tokio::net::UnixDatagram;
use tokio::net::UnixListener;
pub use tokio::net::UnixStream;
struct UnixListenerResource {
- listener: UnixListener,
+ listener: AsyncRefCell<UnixListener>,
+ cancel: CancelHandle,
+}
+
+impl Resource for UnixListenerResource {
+ fn name(&self) -> Cow<str> {
+ "unixListener".into()
+ }
+
+ fn close(self: Rc<Self>) {
+ self.cancel.cancel();
+ }
}
pub struct UnixDatagramResource {
- pub socket: UnixDatagram,
- pub local_addr: unix::net::SocketAddr,
+ pub socket: AsyncRefCell<UnixDatagram>,
+ pub cancel: CancelHandle,
+}
+
+impl Resource for UnixDatagramResource {
+ fn name(&self) -> Cow<str> {
+ "unixDatagram".into()
+ }
+
+ fn close(self: Rc<Self>) {
+ self.cancel.cancel();
+ }
}
#[derive(Deserialize)]
@@ -43,38 +68,23 @@ pub(crate) async fn accept_unix(
) -> Result<Value, AnyError> {
let rid = args.rid as u32;
- let accept_fut = poll_fn(|cx| {
- let mut state = state.borrow_mut();
- let listener_resource = state
- .resource_table
- .get_mut::<UnixListenerResource>(rid)
- .ok_or_else(|| bad_resource("Listener has been closed"))?;
- let listener = &mut listener_resource.listener;
- use deno_core::futures::StreamExt;
- match listener.poll_next_unpin(cx) {
- Poll::Ready(Some(stream)) => {
- //listener_resource.untrack_task();
- Poll::Ready(stream)
- }
- Poll::Ready(None) => todo!(),
- Poll::Pending => {
- //listener_resource.track_task(cx)?;
- Poll::Pending
- }
- }
- .map_err(AnyError::from)
- });
- let unix_stream = accept_fut.await?;
+ let resource = state
+ .borrow()
+ .resource_table
+ .get::<UnixListenerResource>(rid)
+ .ok_or_else(|| bad_resource("Listener has been closed"))?;
+ let mut listener = RcRef::map(&resource, |r| &r.listener)
+ .try_borrow_mut()
+ .ok_or_else(|| custom_error("Busy", "Listener already in use"))?;
+ let cancel = RcRef::map(resource, |r| &r.cancel);
+ let (unix_stream, _socket_addr) =
+ listener.accept().try_or_cancel(cancel).await?;
let local_addr = unix_stream.local_addr()?;
let remote_addr = unix_stream.peer_addr()?;
+ let resource = StreamResource::unix_stream(unix_stream);
let mut state = state.borrow_mut();
- let rid = state.resource_table.add(
- "unixStream",
- Box::new(StreamResourceHolder::new(StreamResource::UnixStream(
- unix_stream,
- ))),
- );
+ let rid = state.resource_table.add(resource);
Ok(json!({
"rid": rid,
"localAddr": {
@@ -98,12 +108,17 @@ pub(crate) async fn receive_unix_packet(
let rid = args.rid as u32;
let mut buf = bufs.into_iter().next().unwrap();
- let mut state = state.borrow_mut();
let resource = state
+ .borrow()
.resource_table
- .get_mut::<UnixDatagramResource>(rid)
+ .get::<UnixDatagramResource>(rid)
.ok_or_else(|| bad_resource("Socket has been closed"))?;
- let (size, remote_addr) = resource.socket.recv_from(&mut buf).await?;
+ let mut socket = RcRef::map(&resource, |r| &r.socket)
+ .try_borrow_mut()
+ .ok_or_else(|| custom_error("Busy", "Socket already in use"))?;
+ let cancel = RcRef::map(resource, |r| &r.cancel);
+ let (size, remote_addr) =
+ socket.recv_from(&mut buf).try_or_cancel(cancel).await?;
Ok(json!({
"size": size,
"remoteAddr": {
@@ -122,10 +137,11 @@ pub fn listen_unix(
}
let listener = UnixListener::bind(&addr)?;
let local_addr = listener.local_addr()?;
- let listener_resource = UnixListenerResource { listener };
- let rid = state
- .resource_table
- .add("unixListener", Box::new(listener_resource));
+ let listener_resource = UnixListenerResource {
+ listener: AsyncRefCell::new(listener),
+ cancel: Default::default(),
+ };
+ let rid = state.resource_table.add(listener_resource);
Ok((rid, local_addr))
}
@@ -140,12 +156,10 @@ pub fn listen_unix_packet(
let socket = UnixDatagram::bind(&addr)?;
let local_addr = socket.local_addr()?;
let datagram_resource = UnixDatagramResource {
- socket,
- local_addr: local_addr.clone(),
+ socket: AsyncRefCell::new(socket),
+ cancel: Default::default(),
};
- let rid = state
- .resource_table
- .add("unixDatagram", Box::new(datagram_resource));
+ let rid = state.resource_table.add(datagram_resource);
Ok((rid, local_addr))
}
diff --git a/runtime/ops/plugin.rs b/runtime/ops/plugin.rs
index 1f3669b6f..953d6f7d2 100644
--- a/runtime/ops/plugin.rs
+++ b/runtime/ops/plugin.rs
@@ -14,9 +14,11 @@ use deno_core::Op;
use deno_core::OpAsyncFuture;
use deno_core::OpId;
use deno_core::OpState;
+use deno_core::Resource;
use deno_core::ZeroCopyBuf;
use dlopen::symbor::Library;
use serde::Deserialize;
+use std::borrow::Cow;
use std::cell::RefCell;
use std::path::PathBuf;
use std::pin::Pin;
@@ -53,9 +55,7 @@ pub fn op_open_plugin(
let rid;
let deno_plugin_init;
{
- rid = state
- .resource_table
- .add("plugin", Box::new(plugin_resource));
+ rid = state.resource_table.add(plugin_resource);
deno_plugin_init = *unsafe {
state
.resource_table
@@ -77,6 +77,12 @@ struct PluginResource {
lib: Rc<Library>,
}
+impl Resource for PluginResource {
+ fn name(&self) -> Cow<str> {
+ "plugin".into()
+ }
+}
+
impl PluginResource {
fn new(lib: &Rc<Library>) -> Self {
Self { lib: lib.clone() }
diff --git a/runtime/ops/process.rs b/runtime/ops/process.rs
index 67b3d0761..b46627e21 100644
--- a/runtime/ops/process.rs
+++ b/runtime/ops/process.rs
@@ -1,19 +1,22 @@
// Copyright 2018-2020 the Deno authors. All rights reserved. MIT license.
-use super::io::{std_file_resource, StreamResource, StreamResourceHolder};
+use super::io::{std_file_resource, StreamResource};
use crate::permissions::Permissions;
use deno_core::error::bad_resource_id;
use deno_core::error::type_error;
use deno_core::error::AnyError;
-use deno_core::futures::future::poll_fn;
-use deno_core::futures::future::FutureExt;
use deno_core::serde_json;
use deno_core::serde_json::json;
use deno_core::serde_json::Value;
+use deno_core::AsyncMutFuture;
+use deno_core::AsyncRefCell;
use deno_core::BufVec;
use deno_core::OpState;
+use deno_core::RcRef;
+use deno_core::Resource;
use deno_core::ZeroCopyBuf;
use serde::Deserialize;
+use std::borrow::Cow;
use std::cell::RefCell;
use std::rc::Rc;
use tokio::process::Command;
@@ -61,7 +64,19 @@ struct RunArgs {
}
struct ChildResource {
- child: tokio::process::Child,
+ child: AsyncRefCell<tokio::process::Child>,
+}
+
+impl Resource for ChildResource {
+ fn name(&self) -> Cow<str> {
+ "child".into()
+ }
+}
+
+impl ChildResource {
+ fn borrow_mut(self: Rc<Self>) -> AsyncMutFuture<tokio::process::Child> {
+ RcRef::map(self, |r| &r.child).borrow_mut()
+ }
}
fn op_run(
@@ -117,12 +132,9 @@ fn op_run(
let stdin_rid = match child.stdin.take() {
Some(child_stdin) => {
- let rid = state.resource_table.add(
- "childStdin",
- Box::new(StreamResourceHolder::new(StreamResource::ChildStdin(
- child_stdin,
- ))),
- );
+ let rid = state
+ .resource_table
+ .add(StreamResource::child_stdin(child_stdin));
Some(rid)
}
None => None,
@@ -130,12 +142,9 @@ fn op_run(
let stdout_rid = match child.stdout.take() {
Some(child_stdout) => {
- let rid = state.resource_table.add(
- "childStdout",
- Box::new(StreamResourceHolder::new(StreamResource::ChildStdout(
- child_stdout,
- ))),
- );
+ let rid = state
+ .resource_table
+ .add(StreamResource::child_stdout(child_stdout));
Some(rid)
}
None => None,
@@ -143,19 +152,18 @@ fn op_run(
let stderr_rid = match child.stderr.take() {
Some(child_stderr) => {
- let rid = state.resource_table.add(
- "childStderr",
- Box::new(StreamResourceHolder::new(StreamResource::ChildStderr(
- child_stderr,
- ))),
- );
+ let rid = state
+ .resource_table
+ .add(StreamResource::child_stderr(child_stderr));
Some(rid)
}
None => None,
};
- let child_resource = ChildResource { child };
- let child_rid = state.resource_table.add("child", Box::new(child_resource));
+ let child_resource = ChildResource {
+ child: AsyncRefCell::new(child),
+ };
+ let child_rid = state.resource_table.add(child_resource);
Ok(json!({
"rid": child_rid,
@@ -185,17 +193,13 @@ async fn op_run_status(
s.borrow::<Permissions>().check_run()?;
}
- let run_status = poll_fn(|cx| {
- let mut state = state.borrow_mut();
- let child_resource = state
- .resource_table
- .get_mut::<ChildResource>(rid)
- .ok_or_else(bad_resource_id)?;
- let child = &mut child_resource.child;
- child.poll_unpin(cx).map_err(AnyError::from)
- })
- .await?;
-
+ let resource = state
+ .borrow_mut()
+ .resource_table
+ .get::<ChildResource>(rid)
+ .ok_or_else(bad_resource_id)?;
+ let mut child = resource.borrow_mut().await;
+ let run_status = (&mut *child).await?;
let code = run_status.code();
#[cfg(unix)]
diff --git a/runtime/ops/signal.rs b/runtime/ops/signal.rs
index be6bc0a35..b3891792c 100644
--- a/runtime/ops/signal.rs
+++ b/runtime/ops/signal.rs
@@ -11,15 +11,23 @@ use std::rc::Rc;
#[cfg(unix)]
use deno_core::error::bad_resource_id;
#[cfg(unix)]
-use deno_core::futures::future::poll_fn;
-#[cfg(unix)]
use deno_core::serde_json;
#[cfg(unix)]
use deno_core::serde_json::json;
#[cfg(unix)]
+use deno_core::AsyncRefCell;
+#[cfg(unix)]
+use deno_core::CancelFuture;
+#[cfg(unix)]
+use deno_core::CancelHandle;
+#[cfg(unix)]
+use deno_core::RcRef;
+#[cfg(unix)]
+use deno_core::Resource;
+#[cfg(unix)]
use serde::Deserialize;
#[cfg(unix)]
-use std::task::Waker;
+use std::borrow::Cow;
#[cfg(unix)]
use tokio::signal::unix::{signal, Signal, SignalKind};
@@ -32,7 +40,21 @@ pub fn init(rt: &mut deno_core::JsRuntime) {
#[cfg(unix)]
/// The resource for signal stream.
/// The second element is the waker of polling future.
-pub struct SignalStreamResource(pub Signal, pub Option<Waker>);
+struct SignalStreamResource {
+ signal: AsyncRefCell<Signal>,
+ cancel: CancelHandle,
+}
+
+#[cfg(unix)]
+impl Resource for SignalStreamResource {
+ fn name(&self) -> Cow<str> {
+ "signal".into()
+ }
+
+ fn close(self: Rc<Self>) {
+ self.cancel.cancel();
+ }
+}
#[cfg(unix)]
#[derive(Deserialize)]
@@ -54,13 +76,13 @@ fn op_signal_bind(
) -> Result<Value, AnyError> {
super::check_unstable(state, "Deno.signal");
let args: BindSignalArgs = serde_json::from_value(args)?;
- let rid = state.resource_table.add(
- "signal",
- Box::new(SignalStreamResource(
+ let resource = SignalStreamResource {
+ signal: AsyncRefCell::new(
signal(SignalKind::from_raw(args.signo)).expect(""),
- None,
- )),
- );
+ ),
+ cancel: Default::default(),
+ };
+ let rid = state.resource_table.add(resource);
Ok(json!({
"rid": rid,
}))
@@ -76,18 +98,18 @@ async fn op_signal_poll(
let args: SignalArgs = serde_json::from_value(args)?;
let rid = args.rid as u32;
- let future = poll_fn(move |cx| {
- let mut state = state.borrow_mut();
- if let Some(mut signal) =
- state.resource_table.get_mut::<SignalStreamResource>(rid)
- {
- signal.1 = Some(cx.waker().clone());
- return signal.0.poll_recv(cx);
- }
- std::task::Poll::Ready(None)
- });
- let result = future.await;
- Ok(json!({ "done": result.is_none() }))
+ let resource = state
+ .borrow_mut()
+ .resource_table
+ .get::<SignalStreamResource>(rid)
+ .ok_or_else(bad_resource_id)?;
+ let cancel = RcRef::map(&resource, |r| &r.cancel);
+ let mut signal = RcRef::map(&resource, |r| &r.signal).borrow_mut().await;
+
+ match signal.recv().or_cancel(cancel).await {
+ Ok(result) => Ok(json!({ "done": result.is_none() })),
+ Err(_) => Ok(json!({ "done": true })),
+ }
}
#[cfg(unix)]
@@ -99,14 +121,6 @@ pub fn op_signal_unbind(
super::check_unstable(state, "Deno.signal");
let args: SignalArgs = serde_json::from_value(args)?;
let rid = args.rid as u32;
- let resource = state.resource_table.get_mut::<SignalStreamResource>(rid);
- if let Some(signal) = resource {
- if let Some(waker) = &signal.1 {
- // Wakes up the pending poll if exists.
- // This prevents the poll future from getting stuck forever.
- waker.clone().wake();
- }
- }
state
.resource_table
.close(rid)
diff --git a/runtime/ops/tls.rs b/runtime/ops/tls.rs
index b59650ab0..0630747ed 100644
--- a/runtime/ops/tls.rs
+++ b/runtime/ops/tls.rs
@@ -1,6 +1,7 @@
// Copyright 2018-2020 the Deno authors. All rights reserved. MIT license.
-use super::io::{StreamResource, StreamResourceHolder};
+use super::io::StreamResource;
+use super::io::TcpStreamResource;
use crate::permissions::Permissions;
use crate::resolve_addr::resolve_addr;
use crate::resolve_addr::resolve_addr_sync;
@@ -8,25 +9,26 @@ use deno_core::error::bad_resource;
use deno_core::error::bad_resource_id;
use deno_core::error::custom_error;
use deno_core::error::AnyError;
-use deno_core::futures;
-use deno_core::futures::future::poll_fn;
use deno_core::serde_json;
use deno_core::serde_json::json;
use deno_core::serde_json::Value;
+use deno_core::AsyncRefCell;
use deno_core::BufVec;
+use deno_core::CancelHandle;
+use deno_core::CancelTryFuture;
use deno_core::OpState;
+use deno_core::RcRef;
+use deno_core::Resource;
use deno_core::ZeroCopyBuf;
use serde::Deserialize;
+use std::borrow::Cow;
use std::cell::RefCell;
use std::convert::From;
use std::fs::File;
use std::io::BufReader;
-use std::net::SocketAddr;
use std::path::Path;
use std::rc::Rc;
use std::sync::Arc;
-use std::task::Context;
-use std::task::Poll;
use tokio::net::TcpListener;
use tokio::net::TcpStream;
use tokio_rustls::{rustls::ClientConfig, TlsConnector};
@@ -85,60 +87,53 @@ async fn op_start_tls(
permissions.check_read(Path::new(&path))?;
}
}
- let mut resource_holder = {
- let mut state_ = state.borrow_mut();
- match state_.resource_table.remove::<StreamResourceHolder>(rid) {
- Some(resource) => *resource,
- None => return Err(bad_resource_id()),
- }
- };
- if let StreamResource::TcpStream(ref mut tcp_stream) =
- resource_holder.resource
- {
- let tcp_stream = tcp_stream.take().unwrap();
- let local_addr = tcp_stream.local_addr()?;
- let remote_addr = tcp_stream.peer_addr()?;
- let mut config = ClientConfig::new();
- config
- .root_store
- .add_server_trust_anchors(&webpki_roots::TLS_SERVER_ROOTS);
- if let Some(path) = cert_file {
- let key_file = File::open(path)?;
- let reader = &mut BufReader::new(key_file);
- config.root_store.add_pem_file(reader).unwrap();
- }
+ let resource_rc = state
+ .borrow_mut()
+ .resource_table
+ .take::<TcpStreamResource>(rid)
+ .ok_or_else(bad_resource_id)?;
+ let resource = Rc::try_unwrap(resource_rc)
+ .expect("Only a single use of this resource should happen");
+ let (read_half, write_half) = resource.into_inner();
+ let tcp_stream = read_half.reunite(write_half)?;
- let tls_connector = TlsConnector::from(Arc::new(config));
- let dnsname =
- DNSNameRef::try_from_ascii_str(&domain).expect("Invalid DNS lookup");
- let tls_stream = tls_connector.connect(dnsname, tcp_stream).await?;
-
- let rid = {
- let mut state_ = state.borrow_mut();
- state_.resource_table.add(
- "clientTlsStream",
- Box::new(StreamResourceHolder::new(StreamResource::ClientTlsStream(
- Box::new(tls_stream),
- ))),
- )
- };
- Ok(json!({
- "rid": rid,
- "localAddr": {
- "hostname": local_addr.ip().to_string(),
- "port": local_addr.port(),
- "transport": "tcp",
- },
- "remoteAddr": {
- "hostname": remote_addr.ip().to_string(),
- "port": remote_addr.port(),
- "transport": "tcp",
- }
- }))
- } else {
- Err(bad_resource_id())
+ let local_addr = tcp_stream.local_addr()?;
+ let remote_addr = tcp_stream.peer_addr()?;
+ let mut config = ClientConfig::new();
+ config
+ .root_store
+ .add_server_trust_anchors(&webpki_roots::TLS_SERVER_ROOTS);
+ if let Some(path) = cert_file {
+ let key_file = File::open(path)?;
+ let reader = &mut BufReader::new(key_file);
+ config.root_store.add_pem_file(reader).unwrap();
}
+
+ let tls_connector = TlsConnector::from(Arc::new(config));
+ let dnsname =
+ DNSNameRef::try_from_ascii_str(&domain).expect("Invalid DNS lookup");
+ let tls_stream = tls_connector.connect(dnsname, tcp_stream).await?;
+
+ let rid = {
+ let mut state_ = state.borrow_mut();
+ state_
+ .resource_table
+ .add(StreamResource::client_tls_stream(tls_stream))
+ };
+ Ok(json!({
+ "rid": rid,
+ "localAddr": {
+ "hostname": local_addr.ip().to_string(),
+ "port": local_addr.port(),
+ "transport": "tcp",
+ },
+ "remoteAddr": {
+ "hostname": remote_addr.ip().to_string(),
+ "port": remote_addr.port(),
+ "transport": "tcp",
+ }
+ }))
}
async fn op_connect_tls(
@@ -180,12 +175,9 @@ async fn op_connect_tls(
let tls_stream = tls_connector.connect(dnsname, tcp_stream).await?;
let rid = {
let mut state_ = state.borrow_mut();
- state_.resource_table.add(
- "clientTlsStream",
- Box::new(StreamResourceHolder::new(StreamResource::ClientTlsStream(
- Box::new(tls_stream),
- ))),
- )
+ state_
+ .resource_table
+ .add(StreamResource::client_tls_stream(tls_stream))
};
Ok(json!({
"rid": rid,
@@ -256,51 +248,19 @@ fn load_keys(path: &str) -> Result<Vec<PrivateKey>, AnyError> {
Ok(keys)
}
-#[allow(dead_code)]
pub struct TlsListenerResource {
- listener: TcpListener,
+ listener: AsyncRefCell<TcpListener>,
tls_acceptor: TlsAcceptor,
- waker: Option<futures::task::AtomicWaker>,
- local_addr: SocketAddr,
+ cancel: CancelHandle,
}
-impl Drop for TlsListenerResource {
- fn drop(&mut self) {
- self.wake_task();
+impl Resource for TlsListenerResource {
+ fn name(&self) -> Cow<str> {
+ "tlsListener".into()
}
-}
-
-impl TlsListenerResource {
- /// Track the current task so future awaiting for connection
- /// can be notified when listener is closed.
- ///
- /// Throws an error if another task is already tracked.
- pub fn track_task(&mut self, cx: &Context) -> Result<(), AnyError> {
- // Currently, we only allow tracking a single accept task for a listener.
- // This might be changed in the future with multiple workers.
- // Caveat: TcpListener by itself also only tracks an accept task at a time.
- // See https://github.com/tokio-rs/tokio/issues/846#issuecomment-454208883
- if self.waker.is_some() {
- return Err(custom_error("Busy", "Another accept task is ongoing"));
- }
- let waker = futures::task::AtomicWaker::new();
- waker.register(cx.waker());
- self.waker.replace(waker);
- Ok(())
- }
-
- /// Notifies a task when listener is closed so accept future can resolve.
- pub fn wake_task(&mut self) {
- if let Some(waker) = self.waker.as_ref() {
- waker.wake();
- }
- }
-
- /// Stop tracking a task.
- /// Happens when the task is done and thus no further tracking is needed.
- pub fn untrack_task(&mut self) {
- self.waker.take();
+ fn close(self: Rc<Self>) {
+ self.cancel.cancel();
}
}
@@ -340,15 +300,12 @@ fn op_listen_tls(
let listener = TcpListener::from_std(std_listener)?;
let local_addr = listener.local_addr()?;
let tls_listener_resource = TlsListenerResource {
- listener,
+ listener: AsyncRefCell::new(listener),
tls_acceptor,
- waker: None,
- local_addr,
+ cancel: Default::default(),
};
- let rid = state
- .resource_table
- .add("tlsListener", Box::new(tls_listener_resource));
+ let rid = state.resource_table.add(tls_listener_resource);
Ok(json!({
"rid": rid,
@@ -372,50 +329,46 @@ async fn op_accept_tls(
) -> Result<Value, AnyError> {
let args: AcceptTlsArgs = serde_json::from_value(args)?;
let rid = args.rid as u32;
- let accept_fut = poll_fn(|cx| {
- let mut state = state.borrow_mut();
- let listener_resource = state
- .resource_table
- .get_mut::<TlsListenerResource>(rid)
- .ok_or_else(|| bad_resource("Listener has been closed"))?;
- let listener = &mut listener_resource.listener;
- match listener.poll_accept(cx).map_err(AnyError::from) {
- Poll::Ready(Ok((stream, addr))) => {
- listener_resource.untrack_task();
- Poll::Ready(Ok((stream, addr)))
- }
- Poll::Pending => {
- listener_resource.track_task(cx)?;
- Poll::Pending
- }
- Poll::Ready(Err(e)) => {
- listener_resource.untrack_task();
- Poll::Ready(Err(e))
+
+ let resource = state
+ .borrow()
+ .resource_table
+ .get::<TlsListenerResource>(rid)
+ .ok_or_else(|| bad_resource("Listener has been closed"))?;
+ let mut listener = RcRef::map(&resource, |r| &r.listener)
+ .try_borrow_mut()
+ .ok_or_else(|| custom_error("Busy", "Another accept task is ongoing"))?;
+ let cancel = RcRef::map(resource, |r| &r.cancel);
+ let (tcp_stream, _socket_addr) =
+ listener.accept().try_or_cancel(cancel).await.map_err(|e| {
+ // FIXME(bartlomieju): compatibility with current JS implementation
+ if let std::io::ErrorKind::Interrupted = e.kind() {
+ bad_resource("Listener has been closed")
+ } else {
+ e.into()
}
- }
- });
- let (tcp_stream, _socket_addr) = accept_fut.await?;
+ })?;
let local_addr = tcp_stream.local_addr()?;
let remote_addr = tcp_stream.peer_addr()?;
- let tls_acceptor = {
- let state_ = state.borrow();
- let resource = state_
- .resource_table
- .get::<TlsListenerResource>(rid)
- .ok_or_else(bad_resource_id)
- .expect("Can't find tls listener");
- resource.tls_acceptor.clone()
- };
- let tls_stream = tls_acceptor.accept(tcp_stream).await?;
+ let resource = state
+ .borrow()
+ .resource_table
+ .get::<TlsListenerResource>(rid)
+ .ok_or_else(|| bad_resource("Listener has been closed"))?;
+ let cancel = RcRef::map(&resource, |r| &r.cancel);
+ let tls_acceptor = resource.tls_acceptor.clone();
+ let tls_stream = tls_acceptor
+ .accept(tcp_stream)
+ .try_or_cancel(cancel)
+ .await?;
+
let rid = {
let mut state_ = state.borrow_mut();
- state_.resource_table.add(
- "serverTlsStream",
- Box::new(StreamResourceHolder::new(StreamResource::ServerTlsStream(
- Box::new(tls_stream),
- ))),
- )
+ state_
+ .resource_table
+ .add(StreamResource::server_tls_stream(tls_stream))
};
+
Ok(json!({
"rid": rid,
"localAddr": {
diff --git a/runtime/ops/tty.rs b/runtime/ops/tty.rs
index ad66bcf1a..05536b429 100644
--- a/runtime/ops/tty.rs
+++ b/runtime/ops/tty.rs
@@ -2,7 +2,6 @@
use super::io::std_file_resource;
use super::io::StreamResource;
-use super::io::StreamResourceHolder;
use deno_core::error::bad_resource_id;
use deno_core::error::not_supported;
use deno_core::error::resource_unavailable;
@@ -11,6 +10,7 @@ use deno_core::serde_json;
use deno_core::serde_json::json;
use deno_core::serde_json::Value;
use deno_core::OpState;
+use deno_core::RcRef;
use deno_core::ZeroCopyBuf;
use serde::Deserialize;
use serde::Serialize;
@@ -88,48 +88,47 @@ fn op_set_raw(
use winapi::shared::minwindef::FALSE;
use winapi::um::{consoleapi, handleapi};
- let resource_holder =
- state.resource_table.get_mut::<StreamResourceHolder>(rid);
- if resource_holder.is_none() {
- return Err(bad_resource_id());
- }
+ let resource = state
+ .resource_table
+ .get::<StreamResource>(rid)
+ .ok_or_else(bad_resource_id)?;
+
if cbreak {
return Err(not_supported());
}
- let resource_holder = resource_holder.unwrap();
-
- // For now, only stdin.
- let handle = match &mut resource_holder.resource {
- StreamResource::FsFile(ref mut option_file_metadata) => {
- if let Some((tokio_file, metadata)) = option_file_metadata.take() {
- match tokio_file.try_into_std() {
- Ok(std_file) => {
- let raw_handle = std_file.as_raw_handle();
- // Turn the std_file handle back into a tokio file, put it back
- // in the resource table.
- let tokio_file = tokio::fs::File::from_std(std_file);
- resource_holder.resource =
- StreamResource::FsFile(Some((tokio_file, metadata)));
- // return the result.
- raw_handle
- }
- Err(tokio_file) => {
- // This function will return an error containing the file if
- // some operation is in-flight.
- resource_holder.resource =
- StreamResource::FsFile(Some((tokio_file, metadata)));
- return Err(resource_unavailable());
- }
- }
- } else {
- return Err(resource_unavailable());
+
+ if resource.fs_file.is_none() {
+ return Err(bad_resource_id());
+ }
+
+ let fs_file_resource =
+ RcRef::map(&resource, |r| r.fs_file.as_ref().unwrap()).try_borrow_mut();
+
+ let handle_result = if let Some(mut fs_file) = fs_file_resource {
+ let tokio_file = fs_file.0.take().unwrap();
+ match tokio_file.try_into_std() {
+ Ok(std_file) => {
+ let raw_handle = std_file.as_raw_handle();
+ // Turn the std_file handle back into a tokio file, put it back
+ // in the resource table.
+ let tokio_file = tokio::fs::File::from_std(std_file);
+ fs_file.0 = Some(tokio_file);
+ // return the result.
+ Ok(raw_handle)
+ }
+ Err(tokio_file) => {
+ // This function will return an error containing the file if
+ // some operation is in-flight.
+ fs_file.0 = Some(tokio_file);
+ Err(resource_unavailable())
}
}
- _ => {
- return Err(bad_resource_id());
- }
+ } else {
+ Err(resource_unavailable())
};
+ let handle = handle_result?;
+
if handle == handleapi::INVALID_HANDLE_VALUE {
return Err(Error::last_os_error().into());
} else if handle.is_null() {
@@ -156,24 +155,31 @@ fn op_set_raw(
{
use std::os::unix::io::AsRawFd;
- let resource_holder =
- state.resource_table.get_mut::<StreamResourceHolder>(rid);
- if resource_holder.is_none() {
- return Err(bad_resource_id());
+ let resource = state
+ .resource_table
+ .get::<StreamResource>(rid)
+ .ok_or_else(bad_resource_id)?;
+
+ if resource.fs_file.is_none() {
+ return Err(not_supported());
}
- if is_raw {
- let (raw_fd, maybe_tty_mode) =
- match &mut resource_holder.unwrap().resource {
- StreamResource::FsFile(Some((f, ref mut metadata))) => {
- (f.as_raw_fd(), &mut metadata.tty.mode)
- }
- StreamResource::FsFile(None) => return Err(resource_unavailable()),
- _ => {
- return Err(not_supported());
- }
- };
+ let maybe_fs_file_resource =
+ RcRef::map(&resource, |r| r.fs_file.as_ref().unwrap()).try_borrow_mut();
+
+ if maybe_fs_file_resource.is_none() {
+ return Err(resource_unavailable());
+ }
+ let mut fs_file_resource = maybe_fs_file_resource.unwrap();
+ if fs_file_resource.0.is_none() {
+ return Err(resource_unavailable());
+ }
+
+ let raw_fd = fs_file_resource.0.as_ref().unwrap().as_raw_fd();
+ let maybe_tty_mode = &mut fs_file_resource.1.as_mut().unwrap().tty.mode;
+
+ if is_raw {
if maybe_tty_mode.is_none() {
// Save original mode.
let original_mode = termios::tcgetattr(raw_fd)?;
@@ -199,28 +205,14 @@ fn op_set_raw(
raw.control_chars[termios::SpecialCharacterIndices::VMIN as usize] = 1;
raw.control_chars[termios::SpecialCharacterIndices::VTIME as usize] = 0;
termios::tcsetattr(raw_fd, termios::SetArg::TCSADRAIN, &raw)?;
- Ok(json!({}))
} else {
// Try restore saved mode.
- let (raw_fd, maybe_tty_mode) =
- match &mut resource_holder.unwrap().resource {
- StreamResource::FsFile(Some((f, ref mut metadata))) => {
- (f.as_raw_fd(), &mut metadata.tty.mode)
- }
- StreamResource::FsFile(None) => {
- return Err(resource_unavailable());
- }
- _ => {
- return Err(bad_resource_id());
- }
- };
-
if let Some(mode) = maybe_tty_mode.take() {
termios::tcsetattr(raw_fd, termios::SetArg::TCSADRAIN, &mode)?;
}
-
- Ok(json!({}))
}
+
+ Ok(json!({}))
}
}
@@ -255,7 +247,6 @@ fn op_isatty(
Ok(unsafe { libc::isatty(raw_fd as libc::c_int) == 1 })
}
}
- Err(StreamResource::FsFile(_)) => unreachable!(),
_ => Ok(false),
})?;
Ok(json!(isatty))
diff --git a/runtime/ops/websocket.rs b/runtime/ops/websocket.rs
index a8c591a33..d805f307b 100644
--- a/runtime/ops/websocket.rs
+++ b/runtime/ops/websocket.rs
@@ -1,18 +1,23 @@
// Copyright 2018-2020 the Deno authors. All rights reserved. MIT license.
use crate::permissions::Permissions;
-use core::task::Poll;
use deno_core::error::bad_resource_id;
use deno_core::error::type_error;
use deno_core::error::AnyError;
-use deno_core::futures::future::poll_fn;
+use deno_core::futures::stream::SplitSink;
+use deno_core::futures::stream::SplitStream;
+use deno_core::futures::SinkExt;
use deno_core::futures::StreamExt;
-use deno_core::futures::{ready, SinkExt};
use deno_core::serde_json::json;
use deno_core::serde_json::Value;
use deno_core::url;
+use deno_core::AsyncRefCell;
use deno_core::BufVec;
+use deno_core::CancelFuture;
+use deno_core::CancelHandle;
use deno_core::OpState;
+use deno_core::RcRef;
+use deno_core::Resource;
use deno_core::{serde_json, ZeroCopyBuf};
use http::{Method, Request, Uri};
use serde::Deserialize;
@@ -62,6 +67,22 @@ type MaybeTlsStream =
StreamSwitcher<TcpStream, tokio_rustls::client::TlsStream<TcpStream>>;
type WsStream = WebSocketStream<MaybeTlsStream>;
+struct WsStreamResource {
+ tx: AsyncRefCell<SplitSink<WsStream, Message>>,
+ rx: AsyncRefCell<SplitStream<WsStream>>,
+ // When a `WsStreamResource` resource is closed, all pending 'read' ops are
+ // canceled, while 'write' ops are allowed to complete. Therefore only
+ // 'read' futures are attached to this cancel handle.
+ cancel: CancelHandle,
+}
+
+impl Resource for WsStreamResource {
+ fn name(&self) -> Cow<str> {
+ "webSocketStream".into()
+ }
+}
+
+impl WsStreamResource {}
#[derive(Deserialize)]
#[serde(rename_all = "camelCase")]
@@ -165,10 +186,14 @@ pub async fn op_ws_create(
))
})?;
+ let (ws_tx, ws_rx) = stream.split();
+ let resource = WsStreamResource {
+ rx: AsyncRefCell::new(ws_rx),
+ tx: AsyncRefCell::new(ws_tx),
+ cancel: Default::default(),
+ };
let mut state = state.borrow_mut();
- let rid = state
- .resource_table
- .add("webSocketStream", Box::new(stream));
+ let rid = state.resource_table.add(resource);
let protocol = match response.headers().get("Sec-WebSocket-Protocol") {
Some(header) => header.to_str().unwrap(),
@@ -202,30 +227,21 @@ pub async fn op_ws_send(
) -> Result<Value, AnyError> {
let args: SendArgs = serde_json::from_value(args)?;
- let mut maybe_msg = Some(match args.text {
+ let msg = match args.text {
Some(text) => Message::Text(text),
None => Message::Binary(bufs[0].to_vec()),
- });
+ };
let rid = args.rid;
- poll_fn(move |cx| {
- let mut state = state.borrow_mut();
- let stream = state
- .resource_table
- .get_mut::<WsStream>(rid)
- .ok_or_else(bad_resource_id)?;
-
- // TODO(ry) Handle errors below instead of unwrap.
- // Need to map `TungsteniteError` to `AnyError`.
- ready!(stream.poll_ready_unpin(cx)).unwrap();
- if let Some(msg) = maybe_msg.take() {
- stream.start_send_unpin(msg).unwrap();
- }
- ready!(stream.poll_flush_unpin(cx)).unwrap();
-
- Poll::Ready(Ok(json!({})))
- })
- .await
+ let resource = state
+ .borrow_mut()
+ .resource_table
+ .get::<WsStreamResource>(rid)
+ .ok_or_else(bad_resource_id)?;
+ let mut tx = RcRef::map(&resource, |r| &r.tx).borrow_mut().await;
+ tx.send(msg).await?;
+ eprintln!("sent!");
+ Ok(json!({}))
}
#[derive(Deserialize)]
@@ -243,33 +259,22 @@ pub async fn op_ws_close(
) -> Result<Value, AnyError> {
let args: CloseArgs = serde_json::from_value(args)?;
let rid = args.rid;
- let mut maybe_msg = Some(Message::Close(args.code.map(|c| CloseFrame {
+ let msg = Message::Close(args.code.map(|c| CloseFrame {
code: CloseCode::from(c),
reason: match args.reason {
Some(reason) => Cow::from(reason),
None => Default::default(),
},
- })));
-
- poll_fn(move |cx| {
- let mut state = state.borrow_mut();
- let stream = state
- .resource_table
- .get_mut::<WsStream>(rid)
- .ok_or_else(bad_resource_id)?;
-
- // TODO(ry) Handle errors below instead of unwrap.
- // Need to map `TungsteniteError` to `AnyError`.
- ready!(stream.poll_ready_unpin(cx)).unwrap();
- if let Some(msg) = maybe_msg.take() {
- stream.start_send_unpin(msg).unwrap();
- }
- ready!(stream.poll_flush_unpin(cx)).unwrap();
- ready!(stream.poll_close_unpin(cx)).unwrap();
+ }));
- Poll::Ready(Ok(json!({})))
- })
- .await
+ let resource = state
+ .borrow_mut()
+ .resource_table
+ .get::<WsStreamResource>(rid)
+ .ok_or_else(bad_resource_id)?;
+ let mut tx = RcRef::map(&resource, |r| &r.tx).borrow_mut().await;
+ tx.send(msg).await?;
+ Ok(json!({}))
}
#[derive(Deserialize)]
@@ -284,43 +289,41 @@ pub async fn op_ws_next_event(
_bufs: BufVec,
) -> Result<Value, AnyError> {
let args: NextEventArgs = serde_json::from_value(args)?;
- poll_fn(move |cx| {
- let mut state = state.borrow_mut();
- let stream = state
- .resource_table
- .get_mut::<WsStream>(args.rid)
- .ok_or_else(bad_resource_id)?;
- stream
- .poll_next_unpin(cx)
- .map(|val| {
- match val {
- Some(Ok(Message::Text(text))) => json!({
- "type": "string",
- "data": text
- }),
- Some(Ok(Message::Binary(data))) => {
- // TODO(ry): don't use json to send binary data.
- json!({
- "type": "binary",
- "data": data
- })
- }
- Some(Ok(Message::Close(Some(frame)))) => json!({
- "type": "close",
- "code": u16::from(frame.code),
- "reason": frame.reason.as_ref()
- }),
- Some(Ok(Message::Close(None))) => json!({ "type": "close" }),
- Some(Ok(Message::Ping(_))) => json!({"type": "ping"}),
- Some(Ok(Message::Pong(_))) => json!({"type": "pong"}),
- Some(Err(_)) => json!({"type": "error"}),
- None => {
- state.resource_table.close(args.rid).unwrap();
- json!({"type": "closed"})
- }
- }
+
+ let resource = state
+ .borrow_mut()
+ .resource_table
+ .get::<WsStreamResource>(args.rid)
+ .ok_or_else(bad_resource_id)?;
+
+ let mut rx = RcRef::map(&resource, |r| &r.rx).borrow_mut().await;
+ let cancel = RcRef::map(resource, |r| &r.cancel);
+ let val = rx.next().or_cancel(cancel).await?;
+ let res = match val {
+ Some(Ok(Message::Text(text))) => json!({
+ "type": "string",
+ "data": text
+ }),
+ Some(Ok(Message::Binary(data))) => {
+ // TODO(ry): don't use json to send binary data.
+ json!({
+ "type": "binary",
+ "data": data
})
- .map(Ok)
- })
- .await
+ }
+ Some(Ok(Message::Close(Some(frame)))) => json!({
+ "type": "close",
+ "code": u16::from(frame.code),
+ "reason": frame.reason.as_ref()
+ }),
+ Some(Ok(Message::Close(None))) => json!({ "type": "close" }),
+ Some(Ok(Message::Ping(_))) => json!({"type": "ping"}),
+ Some(Ok(Message::Pong(_))) => json!({"type": "pong"}),
+ Some(Err(_)) => json!({"type": "error"}),
+ None => {
+ state.borrow_mut().resource_table.close(args.rid).unwrap();
+ json!({"type": "closed"})
+ }
+ };
+ Ok(res)
}
diff --git a/runtime/rt/30_net.js b/runtime/rt/30_net.js
index 9a71f0693..7009f6f8d 100644
--- a/runtime/rt/30_net.js
+++ b/runtime/rt/30_net.js
@@ -11,20 +11,16 @@
0: "Read",
1: "Write",
2: "ReadWrite",
- Read: 0,
+ Read: 0, // TODO: nonsense, remove me.
Write: 1,
ReadWrite: 2, // unused
};
function shutdown(rid, how) {
- core.jsonOpSync("op_shutdown", { rid, how });
- return Promise.resolve();
+ return core.jsonOpAsync("op_shutdown", { rid, how });
}
- function opAccept(
- rid,
- transport,
- ) {
+ function opAccept(rid, transport) {
return core.jsonOpAsync("op_accept", { rid, transport });
}
@@ -36,11 +32,7 @@
return core.jsonOpAsync("op_connect", args);
}
- function opReceive(
- rid,
- transport,
- zeroCopy,
- ) {
+ function opReceive(rid, transport, zeroCopy) {
return core.jsonOpAsync(
"op_datagram_receive",
{ rid, transport },
@@ -56,11 +48,7 @@
#rid = 0;
#remoteAddr = null;
#localAddr = null;
- constructor(
- rid,
- remoteAddr,
- localAddr,
- ) {
+ constructor(rid, remoteAddr, localAddr) {
this.#rid = rid;
this.#remoteAddr = remoteAddr;
this.#localAddr = localAddr;
@@ -149,11 +137,7 @@
#rid = 0;
#addr = null;
- constructor(
- rid,
- addr,
- bufSize = 1024,
- ) {
+ constructor(rid, addr, bufSize = 1024) {
this.#rid = rid;
this.#addr = addr;
this.bufSize = bufSize;
@@ -213,9 +197,7 @@
return new Listener(res.rid, res.localAddr);
}
- async function connect(
- options,
- ) {
+ async function connect(options) {
let res;
if (options.transport === "unix") {
diff --git a/runtime/rt/40_fs_events.js b/runtime/rt/40_fs_events.js
index a36adecba..a179e8c1b 100644
--- a/runtime/rt/40_fs_events.js
+++ b/runtime/rt/40_fs_events.js
@@ -24,6 +24,8 @@
} catch (error) {
if (error instanceof errors.BadResource) {
return { value: undefined, done: true };
+ } else if (error instanceof errors.Interrupted) {
+ return { value: undefined, done: true };
}
throw error;
}
diff --git a/runtime/rt/40_signals.js b/runtime/rt/40_signals.js
index 739c963fd..091afd66a 100644
--- a/runtime/rt/40_signals.js
+++ b/runtime/rt/40_signals.js
@@ -3,6 +3,7 @@
((window) => {
const core = window.Deno.core;
const { build } = window.__bootstrap.build;
+ const { errors } = window.__bootstrap.errors;
function bindSignal(signo) {
return core.jsonOpSync("op_signal_bind", { signo });
@@ -212,7 +213,15 @@
}
#pollSignal = async () => {
- const res = await pollSignal(this.#rid);
+ let res;
+ try {
+ res = await pollSignal(this.#rid);
+ } catch (error) {
+ if (error instanceof errors.BadResource) {
+ return true;
+ }
+ throw error;
+ }
return res.done;
};
diff --git a/runtime/web_worker.rs b/runtime/web_worker.rs
index db97e3604..c1713f815 100644
--- a/runtime/web_worker.rs
+++ b/runtime/web_worker.rs
@@ -256,15 +256,16 @@ impl WebWorker {
let op_state = js_runtime.op_state();
let mut op_state = op_state.borrow_mut();
+ let t = &mut op_state.resource_table;
let (stdin, stdout, stderr) = ops::io::get_stdio();
if let Some(stream) = stdin {
- op_state.resource_table.add("stdin", Box::new(stream));
+ t.add(stream);
}
if let Some(stream) = stdout {
- op_state.resource_table.add("stdout", Box::new(stream));
+ t.add(stream);
}
if let Some(stream) = stderr {
- op_state.resource_table.add("stderr", Box::new(stream));
+ t.add(stream);
}
}
diff --git a/runtime/worker.rs b/runtime/worker.rs
index a0e63afad..adb525c4c 100644
--- a/runtime/worker.rs
+++ b/runtime/worker.rs
@@ -152,13 +152,13 @@ impl MainWorker {
let t = &mut op_state.resource_table;
let (stdin, stdout, stderr) = ops::io::get_stdio();
if let Some(stream) = stdin {
- t.add("stdin", Box::new(stream));
+ t.add(stream);
}
if let Some(stream) = stdout {
- t.add("stdout", Box::new(stream));
+ t.add(stream);
}
if let Some(stream) = stderr {
- t.add("stderr", Box::new(stream));
+ t.add(stream);
}
}