summaryrefslogtreecommitdiff
path: root/runtime
diff options
context:
space:
mode:
Diffstat (limited to 'runtime')
-rw-r--r--runtime/errors.rs6
-rw-r--r--runtime/ops/fs.rs20
-rw-r--r--runtime/ops/fs_events.rs58
-rw-r--r--runtime/ops/io.rs633
-rw-r--r--runtime/ops/net.rs255
-rw-r--r--runtime/ops/net_unix.rs106
-rw-r--r--runtime/ops/plugin.rs12
-rw-r--r--runtime/ops/process.rs74
-rw-r--r--runtime/ops/signal.rs74
-rw-r--r--runtime/ops/tls.rs245
-rw-r--r--runtime/ops/tty.rs127
-rw-r--r--runtime/ops/websocket.rs173
-rw-r--r--runtime/rt/30_net.js32
-rw-r--r--runtime/rt/40_fs_events.js2
-rw-r--r--runtime/rt/40_signals.js11
-rw-r--r--runtime/web_worker.rs7
-rw-r--r--runtime/worker.rs6
17 files changed, 968 insertions, 873 deletions
diff --git a/runtime/errors.rs b/runtime/errors.rs
index f8f71a859..f82d95ed8 100644
--- a/runtime/errors.rs
+++ b/runtime/errors.rs
@@ -169,6 +169,12 @@ pub fn get_error_class_name(e: &AnyError) -> Option<&'static str> {
.map(get_dlopen_error_class)
})
.or_else(|| {
+ e.downcast_ref::<deno_core::Canceled>().map(|e| {
+ let io_err: io::Error = e.to_owned().into();
+ get_io_error_class(&io_err)
+ })
+ })
+ .or_else(|| {
e.downcast_ref::<env::VarError>()
.map(get_env_var_error_class)
})
diff --git a/runtime/ops/fs.rs b/runtime/ops/fs.rs
index 865c5bcca..d6d7d7e78 100644
--- a/runtime/ops/fs.rs
+++ b/runtime/ops/fs.rs
@@ -1,7 +1,7 @@
// Copyright 2018-2020 the Deno authors. All rights reserved. MIT license.
// Some deserializer fields are only used on Unix and Windows build fails without it
use super::io::std_file_resource;
-use super::io::{FileMetadata, StreamResource, StreamResourceHolder};
+use super::io::StreamResource;
use crate::fs_util::canonicalize_path;
use crate::permissions::Permissions;
use deno_core::error::custom_error;
@@ -185,13 +185,8 @@ fn op_open_sync(
let (path, open_options) = open_helper(state, args)?;
let std_file = open_options.open(path)?;
let tokio_file = tokio::fs::File::from_std(std_file);
- let rid = state.resource_table.add(
- "fsFile",
- Box::new(StreamResourceHolder::new(StreamResource::FsFile(Some((
- tokio_file,
- FileMetadata::default(),
- ))))),
- );
+ let resource = StreamResource::fs_file(tokio_file);
+ let rid = state.resource_table.add(resource);
Ok(json!(rid))
}
@@ -204,13 +199,8 @@ async fn op_open_async(
let tokio_file = tokio::fs::OpenOptions::from(open_options)
.open(path)
.await?;
- let rid = state.borrow_mut().resource_table.add(
- "fsFile",
- Box::new(StreamResourceHolder::new(StreamResource::FsFile(Some((
- tokio_file,
- FileMetadata::default(),
- ))))),
- );
+ let resource = StreamResource::fs_file(tokio_file);
+ let rid = state.borrow_mut().resource_table.add(resource);
Ok(json!(rid))
}
diff --git a/runtime/ops/fs_events.rs b/runtime/ops/fs_events.rs
index 4832c915c..38661e1d4 100644
--- a/runtime/ops/fs_events.rs
+++ b/runtime/ops/fs_events.rs
@@ -3,12 +3,16 @@
use crate::permissions::Permissions;
use deno_core::error::bad_resource_id;
use deno_core::error::AnyError;
-use deno_core::futures::future::poll_fn;
use deno_core::serde_json;
use deno_core::serde_json::json;
use deno_core::serde_json::Value;
+use deno_core::AsyncRefCell;
use deno_core::BufVec;
+use deno_core::CancelFuture;
+use deno_core::CancelHandle;
use deno_core::OpState;
+use deno_core::RcRef;
+use deno_core::Resource;
use deno_core::ZeroCopyBuf;
use notify::event::Event as NotifyEvent;
use notify::Error as NotifyError;
@@ -18,6 +22,7 @@ use notify::RecursiveMode;
use notify::Watcher;
use serde::Deserialize;
use serde::Serialize;
+use std::borrow::Cow;
use std::cell::RefCell;
use std::convert::From;
use std::path::PathBuf;
@@ -32,7 +37,18 @@ pub fn init(rt: &mut deno_core::JsRuntime) {
struct FsEventsResource {
#[allow(unused)]
watcher: RecommendedWatcher,
- receiver: mpsc::Receiver<Result<FsEvent, AnyError>>,
+ receiver: AsyncRefCell<mpsc::Receiver<Result<FsEvent, AnyError>>>,
+ cancel: CancelHandle,
+}
+
+impl Resource for FsEventsResource {
+ fn name(&self) -> Cow<str> {
+ "fsEvents".into()
+ }
+
+ fn close(self: Rc<Self>) {
+ self.cancel.cancel();
+ }
}
/// Represents a file system event.
@@ -99,8 +115,12 @@ fn op_fs_events_open(
.check_read(&PathBuf::from(path))?;
watcher.watch(path, recursive_mode)?;
}
- let resource = FsEventsResource { watcher, receiver };
- let rid = state.resource_table.add("fsEvents", Box::new(resource));
+ let resource = FsEventsResource {
+ watcher,
+ receiver: AsyncRefCell::new(receiver),
+ cancel: Default::default(),
+ };
+ let rid = state.resource_table.add(resource);
Ok(json!(rid))
}
@@ -114,20 +134,18 @@ async fn op_fs_events_poll(
rid: u32,
}
let PollArgs { rid } = serde_json::from_value(args)?;
- poll_fn(move |cx| {
- let mut state = state.borrow_mut();
- let watcher = state
- .resource_table
- .get_mut::<FsEventsResource>(rid)
- .ok_or_else(bad_resource_id)?;
- watcher
- .receiver
- .poll_recv(cx)
- .map(|maybe_result| match maybe_result {
- Some(Ok(value)) => Ok(json!({ "value": value, "done": false })),
- Some(Err(err)) => Err(err),
- None => Ok(json!({ "done": true })),
- })
- })
- .await
+
+ let resource = state
+ .borrow()
+ .resource_table
+ .get::<FsEventsResource>(rid)
+ .ok_or_else(bad_resource_id)?;
+ let mut receiver = RcRef::map(&resource, |r| &r.receiver).borrow_mut().await;
+ let cancel = RcRef::map(resource, |r| &r.cancel);
+ let maybe_result = receiver.recv().or_cancel(cancel).await?;
+ match maybe_result {
+ Some(Ok(value)) => Ok(json!({ "value": value, "done": false })),
+ Some(Err(err)) => Err(err),
+ None => Ok(json!({ "done": true })),
+ }
}
diff --git a/runtime/ops/io.rs b/runtime/ops/io.rs
index 0f8af905a..de56f5b55 100644
--- a/runtime/ops/io.rs
+++ b/runtime/ops/io.rs
@@ -7,26 +7,29 @@ use deno_core::error::bad_resource_id;
use deno_core::error::resource_unavailable;
use deno_core::error::type_error;
use deno_core::error::AnyError;
-use deno_core::futures;
-use deno_core::futures::future::poll_fn;
use deno_core::futures::future::FutureExt;
-use deno_core::futures::ready;
+use deno_core::AsyncMutFuture;
+use deno_core::AsyncRefCell;
use deno_core::BufVec;
+use deno_core::CancelHandle;
+use deno_core::CancelTryFuture;
use deno_core::JsRuntime;
use deno_core::OpState;
+use deno_core::RcRef;
+use deno_core::Resource;
+use std::borrow::Cow;
use std::cell::RefCell;
-use std::collections::HashMap;
-use std::pin::Pin;
use std::rc::Rc;
-use std::sync::atomic::{AtomicUsize, Ordering};
-use std::task::Context;
-use std::task::Poll;
-use tokio::io::{AsyncRead, AsyncWrite};
+use tokio::io::AsyncRead;
+use tokio::io::AsyncReadExt;
+use tokio::io::AsyncWrite;
+use tokio::io::AsyncWriteExt;
+use tokio::net::tcp;
use tokio::net::TcpStream;
use tokio_rustls::client::TlsStream as ClientTlsStream;
use tokio_rustls::server::TlsStream as ServerTlsStream;
-#[cfg(not(windows))]
+#[cfg(unix)]
use std::os::unix::io::FromRawFd;
#[cfg(windows)]
@@ -94,26 +97,28 @@ pub fn init(rt: &mut JsRuntime) {
}
pub fn get_stdio() -> (
- Option<StreamResourceHolder>,
- Option<StreamResourceHolder>,
- Option<StreamResourceHolder>,
+ Option<StreamResource>,
+ Option<StreamResource>,
+ Option<StreamResource>,
) {
- let stdin = get_stdio_stream(&STDIN_HANDLE);
- let stdout = get_stdio_stream(&STDOUT_HANDLE);
- let stderr = get_stdio_stream(&STDERR_HANDLE);
+ let stdin = get_stdio_stream(&STDIN_HANDLE, "stdin");
+ let stdout = get_stdio_stream(&STDOUT_HANDLE, "stdout");
+ let stderr = get_stdio_stream(&STDERR_HANDLE, "stderr");
(stdin, stdout, stderr)
}
fn get_stdio_stream(
handle: &Option<std::fs::File>,
-) -> Option<StreamResourceHolder> {
+ name: &str,
+) -> Option<StreamResource> {
match handle {
None => None,
Some(file_handle) => match file_handle.try_clone() {
- Ok(clone) => Some(StreamResourceHolder::new(StreamResource::FsFile(
- Some((tokio::fs::File::from_std(clone), FileMetadata::default())),
- ))),
+ Ok(clone) => {
+ let tokio_file = tokio::fs::File::from_std(clone);
+ Some(StreamResource::stdio(tokio_file, name))
+ }
Err(_e) => None,
},
}
@@ -137,100 +142,317 @@ pub struct FileMetadata {
pub tty: TTYMetadata,
}
-pub struct StreamResourceHolder {
- pub resource: StreamResource,
- waker: HashMap<usize, futures::task::AtomicWaker>,
- waker_counter: AtomicUsize,
+#[derive(Debug)]
+pub struct FullDuplexResource<R, W> {
+ rd: AsyncRefCell<R>,
+ wr: AsyncRefCell<W>,
+ // When a full-duplex resource is closed, all pending 'read' ops are
+ // canceled, while 'write' ops are allowed to complete. Therefore only
+ // 'read' futures should be attached to this cancel handle.
+ cancel_handle: CancelHandle,
}
-impl StreamResourceHolder {
- pub fn new(resource: StreamResource) -> StreamResourceHolder {
- StreamResourceHolder {
- resource,
- // Atleast one task is expecter for the resource
- waker: HashMap::with_capacity(1),
- // Tracks wakers Ids
- waker_counter: AtomicUsize::new(0),
+impl<R: 'static, W: 'static> FullDuplexResource<R, W> {
+ pub fn new((rd, wr): (R, W)) -> Self {
+ Self {
+ rd: rd.into(),
+ wr: wr.into(),
+ cancel_handle: Default::default(),
}
}
-}
-impl Drop for StreamResourceHolder {
- fn drop(&mut self) {
- self.wake_tasks();
+ pub fn into_inner(self) -> (R, W) {
+ (self.rd.into_inner(), self.wr.into_inner())
+ }
+
+ pub fn rd_borrow_mut(self: &Rc<Self>) -> AsyncMutFuture<R> {
+ RcRef::map(self, |r| &r.rd).borrow_mut()
+ }
+
+ pub fn wr_borrow_mut(self: &Rc<Self>) -> AsyncMutFuture<W> {
+ RcRef::map(self, |r| &r.wr).borrow_mut()
+ }
+
+ pub fn cancel_handle(self: &Rc<Self>) -> RcRef<CancelHandle> {
+ RcRef::map(self, |r| &r.cancel_handle)
+ }
+
+ pub fn cancel_read_ops(&self) {
+ self.cancel_handle.cancel()
}
}
-impl StreamResourceHolder {
- pub fn track_task(&mut self, cx: &Context) -> Result<usize, AnyError> {
- let waker = futures::task::AtomicWaker::new();
- waker.register(cx.waker());
- // Its OK if it overflows
- let task_waker_id = self.waker_counter.fetch_add(1, Ordering::Relaxed);
- self.waker.insert(task_waker_id, waker);
- Ok(task_waker_id)
+impl<R, W> FullDuplexResource<R, W>
+where
+ R: AsyncRead + Unpin + 'static,
+ W: AsyncWrite + Unpin + 'static,
+{
+ async fn read(self: &Rc<Self>, buf: &mut [u8]) -> Result<usize, AnyError> {
+ let mut rd = self.rd_borrow_mut().await;
+ let nread = rd.read(buf).try_or_cancel(self.cancel_handle()).await?;
+ Ok(nread)
}
- pub fn wake_tasks(&mut self) {
- for waker in self.waker.values() {
- waker.wake();
- }
+ async fn write(self: &Rc<Self>, buf: &[u8]) -> Result<usize, AnyError> {
+ let mut wr = self.wr_borrow_mut().await;
+ let nwritten = wr.write(buf).await?;
+ Ok(nwritten)
}
+}
- pub fn untrack_task(&mut self, task_waker_id: usize) {
- self.waker.remove(&task_waker_id);
+pub type TcpStreamResource =
+ FullDuplexResource<tcp::OwnedReadHalf, tcp::OwnedWriteHalf>;
+
+impl Resource for TcpStreamResource {
+ fn name(&self) -> Cow<str> {
+ "tcpStream".into()
+ }
+
+ fn close(self: Rc<Self>) {
+ self.cancel_read_ops();
}
}
-pub enum StreamResource {
- FsFile(Option<(tokio::fs::File, FileMetadata)>),
- TcpStream(Option<tokio::net::TcpStream>),
- #[cfg(not(windows))]
- UnixStream(tokio::net::UnixStream),
- ServerTlsStream(Box<ServerTlsStream<TcpStream>>),
- ClientTlsStream(Box<ClientTlsStream<TcpStream>>),
- ChildStdin(tokio::process::ChildStdin),
- ChildStdout(tokio::process::ChildStdout),
- ChildStderr(tokio::process::ChildStderr),
+#[derive(Default)]
+pub struct StreamResource {
+ pub fs_file:
+ Option<AsyncRefCell<(Option<tokio::fs::File>, Option<FileMetadata>)>>,
+
+ #[cfg(unix)]
+ pub unix_stream: Option<AsyncRefCell<tokio::net::UnixStream>>,
+
+ child_stdin: Option<AsyncRefCell<tokio::process::ChildStdin>>,
+
+ child_stdout: Option<AsyncRefCell<tokio::process::ChildStdout>>,
+
+ child_stderr: Option<AsyncRefCell<tokio::process::ChildStderr>>,
+
+ client_tls_stream: Option<AsyncRefCell<ClientTlsStream<TcpStream>>>,
+
+ server_tls_stream: Option<AsyncRefCell<ServerTlsStream<TcpStream>>>,
+
+ cancel: CancelHandle,
+ name: String,
+}
+
+impl std::fmt::Debug for StreamResource {
+ fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
+ write!(f, "StreamResource")
+ }
}
-trait UnpinAsyncRead: AsyncRead + Unpin {}
-trait UnpinAsyncWrite: AsyncWrite + Unpin {}
+impl StreamResource {
+ pub fn stdio(fs_file: tokio::fs::File, name: &str) -> Self {
+ Self {
+ fs_file: Some(AsyncRefCell::new((
+ Some(fs_file),
+ Some(FileMetadata::default()),
+ ))),
+ name: name.to_string(),
+ ..Default::default()
+ }
+ }
+
+ pub fn fs_file(fs_file: tokio::fs::File) -> Self {
+ Self {
+ fs_file: Some(AsyncRefCell::new((
+ Some(fs_file),
+ Some(FileMetadata::default()),
+ ))),
+ name: "fsFile".to_string(),
+ ..Default::default()
+ }
+ }
+
+ #[cfg(unix)]
+ pub fn unix_stream(unix_stream: tokio::net::UnixStream) -> Self {
+ Self {
+ unix_stream: Some(AsyncRefCell::new(unix_stream)),
+ name: "unixStream".to_string(),
+ ..Default::default()
+ }
+ }
+
+ pub fn child_stdout(child: tokio::process::ChildStdout) -> Self {
+ Self {
+ child_stdout: Some(AsyncRefCell::new(child)),
+ name: "childStdout".to_string(),
+ ..Default::default()
+ }
+ }
-impl<T: AsyncRead + Unpin> UnpinAsyncRead for T {}
-impl<T: AsyncWrite + Unpin> UnpinAsyncWrite for T {}
+ pub fn child_stderr(child: tokio::process::ChildStderr) -> Self {
+ Self {
+ child_stderr: Some(AsyncRefCell::new(child)),
+ name: "childStderr".to_string(),
+ ..Default::default()
+ }
+ }
+
+ pub fn child_stdin(child: tokio::process::ChildStdin) -> Self {
+ Self {
+ child_stdin: Some(AsyncRefCell::new(child)),
+ name: "childStdin".to_string(),
+ ..Default::default()
+ }
+ }
+
+ pub fn client_tls_stream(stream: ClientTlsStream<TcpStream>) -> Self {
+ Self {
+ client_tls_stream: Some(AsyncRefCell::new(stream)),
+ name: "clientTlsStream".to_string(),
+ ..Default::default()
+ }
+ }
-/// `DenoAsyncRead` is the same as the `tokio_io::AsyncRead` trait
-/// but uses an `AnyError` error instead of `std::io:Error`
-pub trait DenoAsyncRead {
- fn poll_read(
- &mut self,
- cx: &mut Context,
- buf: &mut [u8],
- ) -> Poll<Result<usize, AnyError>>;
+ pub fn server_tls_stream(stream: ServerTlsStream<TcpStream>) -> Self {
+ Self {
+ server_tls_stream: Some(AsyncRefCell::new(stream)),
+ name: "serverTlsStream".to_string(),
+ ..Default::default()
+ }
+ }
+
+ async fn read(self: Rc<Self>, buf: &mut [u8]) -> Result<usize, AnyError> {
+ // TODO(bartlomieju): in the future, it would be better for `StreamResource`
+ // to be an enum instead a struct with many `Option` fields, however I
+ // wasn't able to get it to work with `AsyncRefCell`s.
+ if self.fs_file.is_some() {
+ debug_assert!(self.child_stdin.is_none());
+ debug_assert!(self.child_stdout.is_none());
+ debug_assert!(self.child_stderr.is_none());
+ debug_assert!(self.server_tls_stream.is_none());
+ debug_assert!(self.client_tls_stream.is_none());
+ let mut fs_file = RcRef::map(&self, |r| r.fs_file.as_ref().unwrap())
+ .borrow_mut()
+ .await;
+ let nwritten = (*fs_file).0.as_mut().unwrap().read(buf).await?;
+ return Ok(nwritten);
+ } else if self.child_stdout.is_some() {
+ debug_assert!(self.child_stdin.is_none());
+ debug_assert!(self.child_stderr.is_none());
+ debug_assert!(self.server_tls_stream.is_none());
+ debug_assert!(self.client_tls_stream.is_none());
+ let mut child_stdout =
+ RcRef::map(&self, |r| r.child_stdout.as_ref().unwrap())
+ .borrow_mut()
+ .await;
+ let cancel = RcRef::map(self, |r| &r.cancel);
+ let nread = child_stdout.read(buf).try_or_cancel(cancel).await?;
+ return Ok(nread);
+ } else if self.child_stderr.is_some() {
+ debug_assert!(self.child_stdin.is_none());
+ debug_assert!(self.server_tls_stream.is_none());
+ debug_assert!(self.client_tls_stream.is_none());
+ let mut child_stderr =
+ RcRef::map(&self, |r| r.child_stderr.as_ref().unwrap())
+ .borrow_mut()
+ .await;
+ let cancel = RcRef::map(self, |r| &r.cancel);
+ let nread = child_stderr.read(buf).try_or_cancel(cancel).await?;
+ return Ok(nread);
+ } else if self.client_tls_stream.is_some() {
+ debug_assert!(self.server_tls_stream.is_none());
+ let mut client_tls_stream =
+ RcRef::map(&self, |r| r.client_tls_stream.as_ref().unwrap())
+ .borrow_mut()
+ .await;
+ let cancel = RcRef::map(self, |r| &r.cancel);
+ let nread = client_tls_stream.read(buf).try_or_cancel(cancel).await?;
+ return Ok(nread);
+ } else if self.server_tls_stream.is_some() {
+ let mut server_tls_stream =
+ RcRef::map(&self, |r| r.server_tls_stream.as_ref().unwrap())
+ .borrow_mut()
+ .await;
+ let cancel = RcRef::map(self, |r| &r.cancel);
+ let nread = server_tls_stream.read(buf).try_or_cancel(cancel).await?;
+ return Ok(nread);
+ }
+
+ #[cfg(unix)]
+ if self.unix_stream.is_some() {
+ let mut unix_stream =
+ RcRef::map(&self, |r| r.unix_stream.as_ref().unwrap())
+ .borrow_mut()
+ .await;
+ let cancel = RcRef::map(self, |r| &r.cancel);
+ let nread = unix_stream.read(buf).try_or_cancel(cancel).await?;
+ return Ok(nread);
+ }
+
+ Err(bad_resource_id())
+ }
+
+ async fn write(self: Rc<Self>, buf: &[u8]) -> Result<usize, AnyError> {
+ // TODO(bartlomieju): in the future, it would be better for `StreamResource`
+ // to be an enum instead a struct with many `Option` fields, however I
+ // wasn't able to get it to work with `AsyncRefCell`s.
+ if self.fs_file.is_some() {
+ debug_assert!(self.child_stdin.is_none());
+ debug_assert!(self.child_stdout.is_none());
+ debug_assert!(self.child_stderr.is_none());
+ debug_assert!(self.server_tls_stream.is_none());
+ debug_assert!(self.client_tls_stream.is_none());
+ let mut fs_file = RcRef::map(&self, |r| r.fs_file.as_ref().unwrap())
+ .borrow_mut()
+ .await;
+ let nwritten = (*fs_file).0.as_mut().unwrap().write(buf).await?;
+ (*fs_file).0.as_mut().unwrap().flush().await?;
+ return Ok(nwritten);
+ } else if self.child_stdin.is_some() {
+ debug_assert!(self.child_stdout.is_none());
+ debug_assert!(self.child_stderr.is_none());
+ debug_assert!(self.server_tls_stream.is_none());
+ debug_assert!(self.client_tls_stream.is_none());
+ let mut child_stdin =
+ RcRef::map(&self, |r| r.child_stdin.as_ref().unwrap())
+ .borrow_mut()
+ .await;
+ let nwritten = child_stdin.write(buf).await?;
+ child_stdin.flush().await?;
+ return Ok(nwritten);
+ } else if self.client_tls_stream.is_some() {
+ debug_assert!(self.server_tls_stream.is_none());
+ let mut client_tls_stream =
+ RcRef::map(&self, |r| r.client_tls_stream.as_ref().unwrap())
+ .borrow_mut()
+ .await;
+ let nwritten = client_tls_stream.write(buf).await?;
+ client_tls_stream.flush().await?;
+ return Ok(nwritten);
+ } else if self.server_tls_stream.is_some() {
+ let mut server_tls_stream =
+ RcRef::map(&self, |r| r.server_tls_stream.as_ref().unwrap())
+ .borrow_mut()
+ .await;
+ let nwritten = server_tls_stream.write(buf).await?;
+ server_tls_stream.flush().await?;
+ return Ok(nwritten);
+ }
+
+ #[cfg(unix)]
+ if self.unix_stream.is_some() {
+ let mut unix_stream =
+ RcRef::map(&self, |r| r.unix_stream.as_ref().unwrap())
+ .borrow_mut()
+ .await;
+ let nwritten = unix_stream.write(buf).await?;
+ unix_stream.flush().await?;
+ return Ok(nwritten);
+ }
+
+ Err(bad_resource_id())
+ }
}
-impl DenoAsyncRead for StreamResource {
- fn poll_read(
- &mut self,
- cx: &mut Context,
- buf: &mut [u8],
- ) -> Poll<Result<usize, AnyError>> {
- use StreamResource::*;
- let f: &mut dyn UnpinAsyncRead = match self {
- FsFile(Some((f, _))) => f,
- FsFile(None) => return Poll::Ready(Err(resource_unavailable())),
- TcpStream(Some(f)) => f,
- #[cfg(not(windows))]
- UnixStream(f) => f,
- ClientTlsStream(f) => f,
- ServerTlsStream(f) => f,
- ChildStdout(f) => f,
- ChildStderr(f) => f,
- _ => return Err(bad_resource_id()).into(),
- };
- let v = ready!(Pin::new(f).poll_read(cx, buf))?;
- Ok(v).into()
+impl Resource for StreamResource {
+ fn name(&self) -> Cow<str> {
+ self.name.clone().into()
+ }
+
+ fn close(self: Rc<Self>) {
+ self.cancel.cancel()
}
}
@@ -263,92 +485,26 @@ pub fn op_read(
})
} else {
let mut zero_copy = zero_copy[0].clone();
- MinimalOp::Async(
- poll_fn(move |cx| {
- let mut state = state.borrow_mut();
- let resource_holder = state
+ MinimalOp::Async({
+ async move {
+ let resource = state
+ .borrow()
.resource_table
- .get_mut::<StreamResourceHolder>(rid as u32)
+ .get_any(rid as u32)
.ok_or_else(bad_resource_id)?;
-
- let mut task_tracker_id: Option<usize> = None;
- let nread = match resource_holder.resource.poll_read(cx, &mut zero_copy)
+ let nread = if let Some(stream) =
+ resource.downcast_rc::<TcpStreamResource>()
{
- Poll::Ready(t) => {
- if let Some(id) = task_tracker_id {
- resource_holder.untrack_task(id);
- }
- t
- }
- Poll::Pending => {
- task_tracker_id.replace(resource_holder.track_task(cx)?);
- return Poll::Pending;
- }
- }?;
- Poll::Ready(Ok(nread as i32))
- })
- .boxed_local(),
- )
- }
-}
-
-/// `DenoAsyncWrite` is the same as the `tokio_io::AsyncWrite` trait
-/// but uses an `AnyError` error instead of `std::io:Error`
-pub trait DenoAsyncWrite {
- fn poll_write(
- &mut self,
- cx: &mut Context,
- buf: &[u8],
- ) -> Poll<Result<usize, AnyError>>;
-
- fn poll_close(&mut self, cx: &mut Context) -> Poll<Result<(), AnyError>>;
-
- fn poll_flush(&mut self, cx: &mut Context) -> Poll<Result<(), AnyError>>;
-}
-
-impl DenoAsyncWrite for StreamResource {
- fn poll_write(
- &mut self,
- cx: &mut Context,
- buf: &[u8],
- ) -> Poll<Result<usize, AnyError>> {
- use StreamResource::*;
- let f: &mut dyn UnpinAsyncWrite = match self {
- FsFile(Some((f, _))) => f,
- FsFile(None) => return Poll::Pending,
- TcpStream(Some(f)) => f,
- #[cfg(not(windows))]
- UnixStream(f) => f,
- ClientTlsStream(f) => f,
- ServerTlsStream(f) => f,
- ChildStdin(f) => f,
- _ => return Err(bad_resource_id()).into(),
- };
-
- let v = ready!(Pin::new(f).poll_write(cx, buf))?;
- Ok(v).into()
- }
-
- fn poll_flush(&mut self, cx: &mut Context) -> Poll<Result<(), AnyError>> {
- use StreamResource::*;
- let f: &mut dyn UnpinAsyncWrite = match self {
- FsFile(Some((f, _))) => f,
- FsFile(None) => return Poll::Pending,
- TcpStream(Some(f)) => f,
- #[cfg(not(windows))]
- UnixStream(f) => f,
- ClientTlsStream(f) => f,
- ServerTlsStream(f) => f,
- ChildStdin(f) => f,
- _ => return Err(bad_resource_id()).into(),
- };
-
- ready!(Pin::new(f).poll_flush(cx))?;
- Ok(()).into()
- }
-
- fn poll_close(&mut self, _cx: &mut Context) -> Poll<Result<(), AnyError>> {
- unimplemented!()
+ stream.read(&mut zero_copy).await?
+ } else if let Some(stream) = resource.downcast_rc::<StreamResource>() {
+ stream.clone().read(&mut zero_copy).await?
+ } else {
+ return Err(bad_resource_id());
+ };
+ Ok(nread as i32)
+ }
+ .boxed_local()
+ })
}
}
@@ -381,93 +537,76 @@ pub fn op_write(
})
} else {
let zero_copy = zero_copy[0].clone();
- MinimalOp::Async(
+ MinimalOp::Async({
async move {
- let nwritten = poll_fn(|cx| {
- let mut state = state.borrow_mut();
- let resource_holder = state
- .resource_table
- .get_mut::<StreamResourceHolder>(rid as u32)
- .ok_or_else(bad_resource_id)?;
- resource_holder.resource.poll_write(cx, &zero_copy)
- })
- .await?;
-
- // TODO(bartlomieju): this step was added during upgrade to Tokio 0.2
- // and the reasons for the need to explicitly flush are not fully known.
- // Figure out why it's needed and preferably remove it.
- // https://github.com/denoland/deno/issues/3565
- poll_fn(|cx| {
- let mut state = state.borrow_mut();
- let resource_holder = state
- .resource_table
- .get_mut::<StreamResourceHolder>(rid as u32)
- .ok_or_else(bad_resource_id)?;
- resource_holder.resource.poll_flush(cx)
- })
- .await?;
-
+ let resource = state
+ .borrow()
+ .resource_table
+ .get_any(rid as u32)
+ .ok_or_else(bad_resource_id)?;
+ let nwritten = if let Some(stream) =
+ resource.downcast_rc::<TcpStreamResource>()
+ {
+ stream.write(&zero_copy).await?
+ } else if let Some(stream) = resource.downcast_rc::<StreamResource>() {
+ stream.clone().write(&zero_copy).await?
+ } else {
+ return Err(bad_resource_id());
+ };
Ok(nwritten as i32)
}
- .boxed_local(),
- )
+ .boxed_local()
+ })
}
}
-/// Helper function for operating on a std::fs::File stored in the resource table.
-///
-/// We store file system file resources as tokio::fs::File, so this is a little
-/// utility function that gets a std::fs:File when you need to do blocking
-/// operations.
-///
-/// Returns ErrorKind::Busy if the resource is being used by another op.
pub fn std_file_resource<F, T>(
state: &mut OpState,
rid: u32,
mut f: F,
) -> Result<T, AnyError>
where
- F: FnMut(
- Result<&mut std::fs::File, &mut StreamResource>,
- ) -> Result<T, AnyError>,
+ F: FnMut(Result<&mut std::fs::File, ()>) -> Result<T, AnyError>,
{
// First we look up the rid in the resource table.
- let mut r = state.resource_table.get_mut::<StreamResourceHolder>(rid);
- if let Some(ref mut resource_holder) = r {
- // Sync write only works for FsFile. It doesn't make sense to do this
- // for non-blocking sockets. So we error out if not FsFile.
- match &mut resource_holder.resource {
- StreamResource::FsFile(option_file_metadata) => {
- // The object in the resource table is a tokio::fs::File - but in
- // order to do a blocking write on it, we must turn it into a
- // std::fs::File. Hopefully this code compiles down to nothing.
- if let Some((tokio_file, metadata)) = option_file_metadata.take() {
- match tokio_file.try_into_std() {
- Ok(mut std_file) => {
- let result = f(Ok(&mut std_file));
- // Turn the std_file handle back into a tokio file, put it back
- // in the resource table.
- let tokio_file = tokio::fs::File::from_std(std_file);
- resource_holder.resource =
- StreamResource::FsFile(Some((tokio_file, metadata)));
- // return the result.
- result
- }
- Err(tokio_file) => {
- // This function will return an error containing the file if
- // some operation is in-flight.
- resource_holder.resource =
- StreamResource::FsFile(Some((tokio_file, metadata)));
- Err(resource_unavailable())
- }
- }
- } else {
- Err(resource_unavailable())
- }
+ let resource = state
+ .resource_table
+ .get::<StreamResource>(rid)
+ .ok_or_else(bad_resource_id)?;
+
+ // Sync write only works for FsFile. It doesn't make sense to do this
+ // for non-blocking sockets. So we error out if not FsFile.
+ if resource.fs_file.is_none() {
+ return f(Err(()));
+ }
+
+ // The object in the resource table is a tokio::fs::File - but in
+ // order to do a blocking write on it, we must turn it into a
+ // std::fs::File. Hopefully this code compiles down to nothing.
+
+ let fs_file_resource =
+ RcRef::map(&resource, |r| r.fs_file.as_ref().unwrap()).try_borrow_mut();
+
+ if let Some(mut fs_file) = fs_file_resource {
+ let tokio_file = fs_file.0.take().unwrap();
+ match tokio_file.try_into_std() {
+ Ok(mut std_file) => {
+ let result = f(Ok(&mut std_file));
+ // Turn the std_file handle back into a tokio file, put it back
+ // in the resource table.
+ let tokio_file = tokio::fs::File::from_std(std_file);
+ fs_file.0 = Some(tokio_file);
+ // return the result.
+ result
+ }
+ Err(tokio_file) => {
+ // This function will return an error containing the file if
+ // some operation is in-flight.
+ fs_file.0 = Some(tokio_file);
+ Err(resource_unavailable())
}
- _ => f(Err(&mut resource_holder.resource)),
}
} else {
- Err(bad_resource_id())
+ Err(resource_unavailable())
}
}
diff --git a/runtime/ops/net.rs b/runtime/ops/net.rs
index 8770ef103..a4bda585b 100644
--- a/runtime/ops/net.rs
+++ b/runtime/ops/net.rs
@@ -1,7 +1,7 @@
// Copyright 2018-2020 the Deno authors. All rights reserved. MIT license.
-use crate::ops::io::StreamResource;
-use crate::ops::io::StreamResourceHolder;
+use crate::ops::io::FullDuplexResource;
+use crate::ops::io::TcpStreamResource;
use crate::permissions::Permissions;
use crate::resolve_addr::resolve_addr;
use crate::resolve_addr::resolve_addr_sync;
@@ -11,21 +11,24 @@ use deno_core::error::custom_error;
use deno_core::error::generic_error;
use deno_core::error::type_error;
use deno_core::error::AnyError;
-use deno_core::futures;
-use deno_core::futures::future::poll_fn;
use deno_core::serde_json;
use deno_core::serde_json::json;
use deno_core::serde_json::Value;
+use deno_core::AsyncRefCell;
use deno_core::BufVec;
+use deno_core::CancelHandle;
+use deno_core::CancelTryFuture;
use deno_core::OpState;
+use deno_core::RcRef;
+use deno_core::Resource;
use deno_core::ZeroCopyBuf;
use serde::Deserialize;
+use std::borrow::Cow;
use std::cell::RefCell;
use std::net::Shutdown;
use std::net::SocketAddr;
use std::rc::Rc;
-use std::task::Context;
-use std::task::Poll;
+use tokio::net::udp;
use tokio::net::TcpListener;
use tokio::net::TcpStream;
use tokio::net::UdpSocket;
@@ -33,12 +36,14 @@ use tokio::net::UdpSocket;
#[cfg(unix)]
use super::net_unix;
#[cfg(unix)]
+use crate::ops::io::StreamResource;
+#[cfg(unix)]
use std::path::Path;
pub fn init(rt: &mut deno_core::JsRuntime) {
super::reg_json_async(rt, "op_accept", op_accept);
super::reg_json_async(rt, "op_connect", op_connect);
- super::reg_json_sync(rt, "op_shutdown", op_shutdown);
+ super::reg_json_async(rt, "op_shutdown", op_shutdown);
super::reg_json_sync(rt, "op_listen", op_listen);
super::reg_json_async(rt, "op_datagram_receive", op_datagram_receive);
super::reg_json_async(rt, "op_datagram_send", op_datagram_send);
@@ -57,39 +62,31 @@ async fn accept_tcp(
) -> Result<Value, AnyError> {
let rid = args.rid as u32;
- let accept_fut = poll_fn(|cx| {
- let mut state = state.borrow_mut();
- let listener_resource = state
- .resource_table
- .get_mut::<TcpListenerResource>(rid)
- .ok_or_else(|| bad_resource("Listener has been closed"))?;
- let listener = &mut listener_resource.listener;
- match listener.poll_accept(cx).map_err(AnyError::from) {
- Poll::Ready(Ok((stream, addr))) => {
- listener_resource.untrack_task();
- Poll::Ready(Ok((stream, addr)))
- }
- Poll::Pending => {
- listener_resource.track_task(cx)?;
- Poll::Pending
- }
- Poll::Ready(Err(e)) => {
- listener_resource.untrack_task();
- Poll::Ready(Err(e))
+ let resource = state
+ .borrow()
+ .resource_table
+ .get::<TcpListenerResource>(rid)
+ .ok_or_else(|| bad_resource("Listener has been closed"))?;
+ let mut listener = RcRef::map(&resource, |r| &r.listener)
+ .try_borrow_mut()
+ .ok_or_else(|| custom_error("Busy", "Another accept task is ongoing"))?;
+ let cancel = RcRef::map(resource, |r| &r.cancel);
+ let (tcp_stream, _socket_addr) =
+ listener.accept().try_or_cancel(cancel).await.map_err(|e| {
+ // FIXME(bartlomieju): compatibility with current JS implementation
+ if let std::io::ErrorKind::Interrupted = e.kind() {
+ bad_resource("Listener has been closed")
+ } else {
+ e.into()
}
- }
- });
- let (tcp_stream, _socket_addr) = accept_fut.await?;
+ })?;
let local_addr = tcp_stream.local_addr()?;
let remote_addr = tcp_stream.peer_addr()?;
let mut state = state.borrow_mut();
- let rid = state.resource_table.add(
- "tcpStream",
- Box::new(StreamResourceHolder::new(StreamResource::TcpStream(Some(
- tcp_stream,
- )))),
- );
+ let rid = state
+ .resource_table
+ .add(TcpStreamResource::new(tcp_stream.into_split()));
Ok(json!({
"rid": rid,
"localAddr": {
@@ -138,18 +135,17 @@ async fn receive_udp(
let rid = args.rid as u32;
- let receive_fut = poll_fn(|cx| {
- let mut state = state.borrow_mut();
- let resource = state
- .resource_table
- .get_mut::<UdpSocketResource>(rid)
- .ok_or_else(|| bad_resource("Socket has been closed"))?;
- let socket = &mut resource.socket;
- socket
- .poll_recv_from(cx, &mut zero_copy)
- .map_err(AnyError::from)
- });
- let (size, remote_addr) = receive_fut.await?;
+ let resource = state
+ .borrow_mut()
+ .resource_table
+ .get::<UdpSocketResource>(rid)
+ .ok_or_else(|| bad_resource("Socket has been closed"))?;
+ let (size, remote_addr) = resource
+ .rd_borrow_mut()
+ .await
+ .recv_from(&mut zero_copy)
+ .try_or_cancel(resource.cancel_handle())
+ .await?;
Ok(json!({
"size": size,
"remoteAddr": {
@@ -207,19 +203,18 @@ async fn op_datagram_send(
.check_net(&args.hostname, args.port)?;
}
let addr = resolve_addr(&args.hostname, args.port).await?;
- poll_fn(move |cx| {
- let mut state = state.borrow_mut();
- let resource = state
- .resource_table
- .get_mut::<UdpSocketResource>(rid as u32)
- .ok_or_else(|| bad_resource("Socket has been closed"))?;
- resource
- .socket
- .poll_send_to(cx, &zero_copy, &addr)
- .map_ok(|byte_length| json!(byte_length))
- .map_err(AnyError::from)
- })
- .await
+
+ let resource = state
+ .borrow_mut()
+ .resource_table
+ .get::<UdpSocketResource>(rid as u32)
+ .ok_or_else(|| bad_resource("Socket has been closed"))?;
+ let byte_length = resource
+ .wr_borrow_mut()
+ .await
+ .send_to(&zero_copy, &addr)
+ .await?;
+ Ok(json!(byte_length))
}
#[cfg(unix)]
SendArgs {
@@ -232,18 +227,17 @@ async fn op_datagram_send(
let s = state.borrow();
s.borrow::<Permissions>().check_write(&address_path)?;
}
- let mut state = state.borrow_mut();
let resource = state
+ .borrow()
.resource_table
- .get_mut::<net_unix::UnixDatagramResource>(rid as u32)
+ .get::<net_unix::UnixDatagramResource>(rid as u32)
.ok_or_else(|| {
custom_error("NotConnected", "Socket has been closed")
})?;
- let socket = &mut resource.socket;
- let byte_length = socket
- .send_to(&zero_copy, &resource.local_addr.as_pathname().unwrap())
- .await?;
-
+ let mut socket = RcRef::map(&resource, |r| &r.socket)
+ .try_borrow_mut()
+ .ok_or_else(|| custom_error("Busy", "Socket already in use"))?;
+ let byte_length = socket.send_to(&zero_copy, address_path).await?;
Ok(json!(byte_length))
}
_ => Err(type_error("Wrong argument format!")),
@@ -279,12 +273,9 @@ async fn op_connect(
let remote_addr = tcp_stream.peer_addr()?;
let mut state_ = state.borrow_mut();
- let rid = state_.resource_table.add(
- "tcpStream",
- Box::new(StreamResourceHolder::new(StreamResource::TcpStream(Some(
- tcp_stream,
- )))),
- );
+ let rid = state_
+ .resource_table
+ .add(TcpStreamResource::new(tcp_stream.into_split()));
Ok(json!({
"rid": rid,
"localAddr": {
@@ -317,12 +308,8 @@ async fn op_connect(
let remote_addr = unix_stream.peer_addr()?;
let mut state_ = state.borrow_mut();
- let rid = state_.resource_table.add(
- "unixStream",
- Box::new(StreamResourceHolder::new(StreamResource::UnixStream(
- unix_stream,
- ))),
- );
+ let resource = StreamResource::unix_stream(unix_stream);
+ let rid = state_.resource_table.add(resource);
Ok(json!({
"rid": rid,
"localAddr": {
@@ -345,12 +332,12 @@ struct ShutdownArgs {
how: i32,
}
-fn op_shutdown(
- state: &mut OpState,
+async fn op_shutdown(
+ state: Rc<RefCell<OpState>>,
args: Value,
- _zero_copy: &mut [ZeroCopyBuf],
+ _zero_copy: BufVec,
) -> Result<Value, AnyError> {
- super::check_unstable(state, "Deno.shutdown");
+ super::check_unstable2(&state, "Deno.shutdown");
let args: ShutdownArgs = serde_json::from_value(args)?;
@@ -358,80 +345,61 @@ fn op_shutdown(
let how = args.how;
let shutdown_mode = match how {
- 0 => Shutdown::Read,
+ 0 => Shutdown::Read, // TODO: nonsense, remove me.
1 => Shutdown::Write,
_ => unimplemented!(),
};
- let resource_holder = state
+ let resource = state
+ .borrow()
.resource_table
- .get_mut::<StreamResourceHolder>(rid)
+ .get_any(rid)
.ok_or_else(bad_resource_id)?;
- match resource_holder.resource {
- StreamResource::TcpStream(Some(ref mut stream)) => {
- TcpStream::shutdown(stream, shutdown_mode)?;
- }
- #[cfg(unix)]
- StreamResource::UnixStream(ref mut stream) => {
- net_unix::UnixStream::shutdown(stream, shutdown_mode)?;
+ if let Some(stream) = resource.downcast_rc::<TcpStreamResource>() {
+ let wr = stream.wr_borrow_mut().await;
+ TcpStream::shutdown((*wr).as_ref(), shutdown_mode)?;
+ return Ok(json!({}));
+ }
+
+ #[cfg(unix)]
+ if let Some(stream) = resource.downcast_rc::<StreamResource>() {
+ if stream.unix_stream.is_some() {
+ let wr = RcRef::map(stream, |r| r.unix_stream.as_ref().unwrap())
+ .borrow_mut()
+ .await;
+ net_unix::UnixStream::shutdown(&*wr, shutdown_mode)?;
+ return Ok(json!({}));
}
- _ => return Err(bad_resource_id()),
}
- Ok(json!({}))
+ Err(bad_resource_id())
}
-#[allow(dead_code)]
struct TcpListenerResource {
- listener: TcpListener,
- waker: Option<futures::task::AtomicWaker>,
- local_addr: SocketAddr,
+ listener: AsyncRefCell<TcpListener>,
+ cancel: CancelHandle,
}
-impl Drop for TcpListenerResource {
- fn drop(&mut self) {
- self.wake_task();
+impl Resource for TcpListenerResource {
+ fn name(&self) -> Cow<str> {
+ "tcpListener".into()
}
-}
-
-impl TcpListenerResource {
- /// Track the current task so future awaiting for connection
- /// can be notified when listener is closed.
- ///
- /// Throws an error if another task is already tracked.
- pub fn track_task(&mut self, cx: &Context) -> Result<(), AnyError> {
- // Currently, we only allow tracking a single accept task for a listener.
- // This might be changed in the future with multiple workers.
- // Caveat: TcpListener by itself also only tracks an accept task at a time.
- // See https://github.com/tokio-rs/tokio/issues/846#issuecomment-454208883
- if self.waker.is_some() {
- return Err(custom_error("Busy", "Another accept task is ongoing"));
- }
- let waker = futures::task::AtomicWaker::new();
- waker.register(cx.waker());
- self.waker.replace(waker);
- Ok(())
+ fn close(self: Rc<Self>) {
+ self.cancel.cancel();
}
+}
- /// Notifies a task when listener is closed so accept future can resolve.
- pub fn wake_task(&mut self) {
- if let Some(waker) = self.waker.as_ref() {
- waker.wake();
- }
- }
+type UdpSocketResource = FullDuplexResource<udp::RecvHalf, udp::SendHalf>;
- /// Stop tracking a task.
- /// Happens when the task is done and thus no further tracking is needed.
- pub fn untrack_task(&mut self) {
- if self.waker.is_some() {
- self.waker.take();
- }
+impl Resource for UdpSocketResource {
+ fn name(&self) -> Cow<str> {
+ "udpSocket".into()
}
-}
-struct UdpSocketResource {
- socket: UdpSocket,
+ fn close(self: Rc<Self>) {
+ self.cancel_read_ops()
+ }
}
#[derive(Deserialize)]
@@ -463,13 +431,10 @@ fn listen_tcp(
let listener = TcpListener::from_std(std_listener)?;
let local_addr = listener.local_addr()?;
let listener_resource = TcpListenerResource {
- listener,
- waker: None,
- local_addr,
+ listener: AsyncRefCell::new(listener),
+ cancel: Default::default(),
};
- let rid = state
- .resource_table
- .add("tcpListener", Box::new(listener_resource));
+ let rid = state.resource_table.add(listener_resource);
Ok((rid, local_addr))
}
@@ -481,10 +446,8 @@ fn listen_udp(
let std_socket = std::net::UdpSocket::bind(&addr)?;
let socket = UdpSocket::from_std(std_socket)?;
let local_addr = socket.local_addr()?;
- let socket_resource = UdpSocketResource { socket };
- let rid = state
- .resource_table
- .add("udpSocket", Box::new(socket_resource));
+ let socket_resource = UdpSocketResource::new(socket.split());
+ let rid = state.resource_table.add(socket_resource);
Ok((rid, local_addr))
}
diff --git a/runtime/ops/net_unix.rs b/runtime/ops/net_unix.rs
index 4c416a5a4..23981a7f1 100644
--- a/runtime/ops/net_unix.rs
+++ b/runtime/ops/net_unix.rs
@@ -1,34 +1,59 @@
// Copyright 2018-2020 the Deno authors. All rights reserved. MIT license.
use crate::ops::io::StreamResource;
-use crate::ops::io::StreamResourceHolder;
use crate::ops::net::AcceptArgs;
use crate::ops::net::ReceiveArgs;
use deno_core::error::bad_resource;
+use deno_core::error::custom_error;
use deno_core::error::AnyError;
-use deno_core::futures::future::poll_fn;
use deno_core::serde_json::json;
use deno_core::serde_json::Value;
+use deno_core::AsyncRefCell;
use deno_core::BufVec;
+use deno_core::CancelHandle;
+use deno_core::CancelTryFuture;
use deno_core::OpState;
+use deno_core::RcRef;
+use deno_core::Resource;
use serde::Deserialize;
+use std::borrow::Cow;
use std::cell::RefCell;
use std::fs::remove_file;
use std::os::unix;
use std::path::Path;
use std::rc::Rc;
-use std::task::Poll;
use tokio::net::UnixDatagram;
use tokio::net::UnixListener;
pub use tokio::net::UnixStream;
struct UnixListenerResource {
- listener: UnixListener,
+ listener: AsyncRefCell<UnixListener>,
+ cancel: CancelHandle,
+}
+
+impl Resource for UnixListenerResource {
+ fn name(&self) -> Cow<str> {
+ "unixListener".into()
+ }
+
+ fn close(self: Rc<Self>) {
+ self.cancel.cancel();
+ }
}
pub struct UnixDatagramResource {
- pub socket: UnixDatagram,
- pub local_addr: unix::net::SocketAddr,
+ pub socket: AsyncRefCell<UnixDatagram>,
+ pub cancel: CancelHandle,
+}
+
+impl Resource for UnixDatagramResource {
+ fn name(&self) -> Cow<str> {
+ "unixDatagram".into()
+ }
+
+ fn close(self: Rc<Self>) {
+ self.cancel.cancel();
+ }
}
#[derive(Deserialize)]
@@ -43,38 +68,23 @@ pub(crate) async fn accept_unix(
) -> Result<Value, AnyError> {
let rid = args.rid as u32;
- let accept_fut = poll_fn(|cx| {
- let mut state = state.borrow_mut();
- let listener_resource = state
- .resource_table
- .get_mut::<UnixListenerResource>(rid)
- .ok_or_else(|| bad_resource("Listener has been closed"))?;
- let listener = &mut listener_resource.listener;
- use deno_core::futures::StreamExt;
- match listener.poll_next_unpin(cx) {
- Poll::Ready(Some(stream)) => {
- //listener_resource.untrack_task();
- Poll::Ready(stream)
- }
- Poll::Ready(None) => todo!(),
- Poll::Pending => {
- //listener_resource.track_task(cx)?;
- Poll::Pending
- }
- }
- .map_err(AnyError::from)
- });
- let unix_stream = accept_fut.await?;
+ let resource = state
+ .borrow()
+ .resource_table
+ .get::<UnixListenerResource>(rid)
+ .ok_or_else(|| bad_resource("Listener has been closed"))?;
+ let mut listener = RcRef::map(&resource, |r| &r.listener)
+ .try_borrow_mut()
+ .ok_or_else(|| custom_error("Busy", "Listener already in use"))?;
+ let cancel = RcRef::map(resource, |r| &r.cancel);
+ let (unix_stream, _socket_addr) =
+ listener.accept().try_or_cancel(cancel).await?;
let local_addr = unix_stream.local_addr()?;
let remote_addr = unix_stream.peer_addr()?;
+ let resource = StreamResource::unix_stream(unix_stream);
let mut state = state.borrow_mut();
- let rid = state.resource_table.add(
- "unixStream",
- Box::new(StreamResourceHolder::new(StreamResource::UnixStream(
- unix_stream,
- ))),
- );
+ let rid = state.resource_table.add(resource);
Ok(json!({
"rid": rid,
"localAddr": {
@@ -98,12 +108,17 @@ pub(crate) async fn receive_unix_packet(
let rid = args.rid as u32;
let mut buf = bufs.into_iter().next().unwrap();
- let mut state = state.borrow_mut();
let resource = state
+ .borrow()
.resource_table
- .get_mut::<UnixDatagramResource>(rid)
+ .get::<UnixDatagramResource>(rid)
.ok_or_else(|| bad_resource("Socket has been closed"))?;
- let (size, remote_addr) = resource.socket.recv_from(&mut buf).await?;
+ let mut socket = RcRef::map(&resource, |r| &r.socket)
+ .try_borrow_mut()
+ .ok_or_else(|| custom_error("Busy", "Socket already in use"))?;
+ let cancel = RcRef::map(resource, |r| &r.cancel);
+ let (size, remote_addr) =
+ socket.recv_from(&mut buf).try_or_cancel(cancel).await?;
Ok(json!({
"size": size,
"remoteAddr": {
@@ -122,10 +137,11 @@ pub fn listen_unix(
}
let listener = UnixListener::bind(&addr)?;
let local_addr = listener.local_addr()?;
- let listener_resource = UnixListenerResource { listener };
- let rid = state
- .resource_table
- .add("unixListener", Box::new(listener_resource));
+ let listener_resource = UnixListenerResource {
+ listener: AsyncRefCell::new(listener),
+ cancel: Default::default(),
+ };
+ let rid = state.resource_table.add(listener_resource);
Ok((rid, local_addr))
}
@@ -140,12 +156,10 @@ pub fn listen_unix_packet(
let socket = UnixDatagram::bind(&addr)?;
let local_addr = socket.local_addr()?;
let datagram_resource = UnixDatagramResource {
- socket,
- local_addr: local_addr.clone(),
+ socket: AsyncRefCell::new(socket),
+ cancel: Default::default(),
};
- let rid = state
- .resource_table
- .add("unixDatagram", Box::new(datagram_resource));
+ let rid = state.resource_table.add(datagram_resource);
Ok((rid, local_addr))
}
diff --git a/runtime/ops/plugin.rs b/runtime/ops/plugin.rs
index 1f3669b6f..953d6f7d2 100644
--- a/runtime/ops/plugin.rs
+++ b/runtime/ops/plugin.rs
@@ -14,9 +14,11 @@ use deno_core::Op;
use deno_core::OpAsyncFuture;
use deno_core::OpId;
use deno_core::OpState;
+use deno_core::Resource;
use deno_core::ZeroCopyBuf;
use dlopen::symbor::Library;
use serde::Deserialize;
+use std::borrow::Cow;
use std::cell::RefCell;
use std::path::PathBuf;
use std::pin::Pin;
@@ -53,9 +55,7 @@ pub fn op_open_plugin(
let rid;
let deno_plugin_init;
{
- rid = state
- .resource_table
- .add("plugin", Box::new(plugin_resource));
+ rid = state.resource_table.add(plugin_resource);
deno_plugin_init = *unsafe {
state
.resource_table
@@ -77,6 +77,12 @@ struct PluginResource {
lib: Rc<Library>,
}
+impl Resource for PluginResource {
+ fn name(&self) -> Cow<str> {
+ "plugin".into()
+ }
+}
+
impl PluginResource {
fn new(lib: &Rc<Library>) -> Self {
Self { lib: lib.clone() }
diff --git a/runtime/ops/process.rs b/runtime/ops/process.rs
index 67b3d0761..b46627e21 100644
--- a/runtime/ops/process.rs
+++ b/runtime/ops/process.rs
@@ -1,19 +1,22 @@
// Copyright 2018-2020 the Deno authors. All rights reserved. MIT license.
-use super::io::{std_file_resource, StreamResource, StreamResourceHolder};
+use super::io::{std_file_resource, StreamResource};
use crate::permissions::Permissions;
use deno_core::error::bad_resource_id;
use deno_core::error::type_error;
use deno_core::error::AnyError;
-use deno_core::futures::future::poll_fn;
-use deno_core::futures::future::FutureExt;
use deno_core::serde_json;
use deno_core::serde_json::json;
use deno_core::serde_json::Value;
+use deno_core::AsyncMutFuture;
+use deno_core::AsyncRefCell;
use deno_core::BufVec;
use deno_core::OpState;
+use deno_core::RcRef;
+use deno_core::Resource;
use deno_core::ZeroCopyBuf;
use serde::Deserialize;
+use std::borrow::Cow;
use std::cell::RefCell;
use std::rc::Rc;
use tokio::process::Command;
@@ -61,7 +64,19 @@ struct RunArgs {
}
struct ChildResource {
- child: tokio::process::Child,
+ child: AsyncRefCell<tokio::process::Child>,
+}
+
+impl Resource for ChildResource {
+ fn name(&self) -> Cow<str> {
+ "child".into()
+ }
+}
+
+impl ChildResource {
+ fn borrow_mut(self: Rc<Self>) -> AsyncMutFuture<tokio::process::Child> {
+ RcRef::map(self, |r| &r.child).borrow_mut()
+ }
}
fn op_run(
@@ -117,12 +132,9 @@ fn op_run(
let stdin_rid = match child.stdin.take() {
Some(child_stdin) => {
- let rid = state.resource_table.add(
- "childStdin",
- Box::new(StreamResourceHolder::new(StreamResource::ChildStdin(
- child_stdin,
- ))),
- );
+ let rid = state
+ .resource_table
+ .add(StreamResource::child_stdin(child_stdin));
Some(rid)
}
None => None,
@@ -130,12 +142,9 @@ fn op_run(
let stdout_rid = match child.stdout.take() {
Some(child_stdout) => {
- let rid = state.resource_table.add(
- "childStdout",
- Box::new(StreamResourceHolder::new(StreamResource::ChildStdout(
- child_stdout,
- ))),
- );
+ let rid = state
+ .resource_table
+ .add(StreamResource::child_stdout(child_stdout));
Some(rid)
}
None => None,
@@ -143,19 +152,18 @@ fn op_run(
let stderr_rid = match child.stderr.take() {
Some(child_stderr) => {
- let rid = state.resource_table.add(
- "childStderr",
- Box::new(StreamResourceHolder::new(StreamResource::ChildStderr(
- child_stderr,
- ))),
- );
+ let rid = state
+ .resource_table
+ .add(StreamResource::child_stderr(child_stderr));
Some(rid)
}
None => None,
};
- let child_resource = ChildResource { child };
- let child_rid = state.resource_table.add("child", Box::new(child_resource));
+ let child_resource = ChildResource {
+ child: AsyncRefCell::new(child),
+ };
+ let child_rid = state.resource_table.add(child_resource);
Ok(json!({
"rid": child_rid,
@@ -185,17 +193,13 @@ async fn op_run_status(
s.borrow::<Permissions>().check_run()?;
}
- let run_status = poll_fn(|cx| {
- let mut state = state.borrow_mut();
- let child_resource = state
- .resource_table
- .get_mut::<ChildResource>(rid)
- .ok_or_else(bad_resource_id)?;
- let child = &mut child_resource.child;
- child.poll_unpin(cx).map_err(AnyError::from)
- })
- .await?;
-
+ let resource = state
+ .borrow_mut()
+ .resource_table
+ .get::<ChildResource>(rid)
+ .ok_or_else(bad_resource_id)?;
+ let mut child = resource.borrow_mut().await;
+ let run_status = (&mut *child).await?;
let code = run_status.code();
#[cfg(unix)]
diff --git a/runtime/ops/signal.rs b/runtime/ops/signal.rs
index be6bc0a35..b3891792c 100644
--- a/runtime/ops/signal.rs
+++ b/runtime/ops/signal.rs
@@ -11,15 +11,23 @@ use std::rc::Rc;
#[cfg(unix)]
use deno_core::error::bad_resource_id;
#[cfg(unix)]
-use deno_core::futures::future::poll_fn;
-#[cfg(unix)]
use deno_core::serde_json;
#[cfg(unix)]
use deno_core::serde_json::json;
#[cfg(unix)]
+use deno_core::AsyncRefCell;
+#[cfg(unix)]
+use deno_core::CancelFuture;
+#[cfg(unix)]
+use deno_core::CancelHandle;
+#[cfg(unix)]
+use deno_core::RcRef;
+#[cfg(unix)]
+use deno_core::Resource;
+#[cfg(unix)]
use serde::Deserialize;
#[cfg(unix)]
-use std::task::Waker;
+use std::borrow::Cow;
#[cfg(unix)]
use tokio::signal::unix::{signal, Signal, SignalKind};
@@ -32,7 +40,21 @@ pub fn init(rt: &mut deno_core::JsRuntime) {
#[cfg(unix)]
/// The resource for signal stream.
/// The second element is the waker of polling future.
-pub struct SignalStreamResource(pub Signal, pub Option<Waker>);
+struct SignalStreamResource {
+ signal: AsyncRefCell<Signal>,
+ cancel: CancelHandle,
+}
+
+#[cfg(unix)]
+impl Resource for SignalStreamResource {
+ fn name(&self) -> Cow<str> {
+ "signal".into()
+ }
+
+ fn close(self: Rc<Self>) {
+ self.cancel.cancel();
+ }
+}
#[cfg(unix)]
#[derive(Deserialize)]
@@ -54,13 +76,13 @@ fn op_signal_bind(
) -> Result<Value, AnyError> {
super::check_unstable(state, "Deno.signal");
let args: BindSignalArgs = serde_json::from_value(args)?;
- let rid = state.resource_table.add(
- "signal",
- Box::new(SignalStreamResource(
+ let resource = SignalStreamResource {
+ signal: AsyncRefCell::new(
signal(SignalKind::from_raw(args.signo)).expect(""),
- None,
- )),
- );
+ ),
+ cancel: Default::default(),
+ };
+ let rid = state.resource_table.add(resource);
Ok(json!({
"rid": rid,
}))
@@ -76,18 +98,18 @@ async fn op_signal_poll(
let args: SignalArgs = serde_json::from_value(args)?;
let rid = args.rid as u32;
- let future = poll_fn(move |cx| {
- let mut state = state.borrow_mut();
- if let Some(mut signal) =
- state.resource_table.get_mut::<SignalStreamResource>(rid)
- {
- signal.1 = Some(cx.waker().clone());
- return signal.0.poll_recv(cx);
- }
- std::task::Poll::Ready(None)
- });
- let result = future.await;
- Ok(json!({ "done": result.is_none() }))
+ let resource = state
+ .borrow_mut()
+ .resource_table
+ .get::<SignalStreamResource>(rid)
+ .ok_or_else(bad_resource_id)?;
+ let cancel = RcRef::map(&resource, |r| &r.cancel);
+ let mut signal = RcRef::map(&resource, |r| &r.signal).borrow_mut().await;
+
+ match signal.recv().or_cancel(cancel).await {
+ Ok(result) => Ok(json!({ "done": result.is_none() })),
+ Err(_) => Ok(json!({ "done": true })),
+ }
}
#[cfg(unix)]
@@ -99,14 +121,6 @@ pub fn op_signal_unbind(
super::check_unstable(state, "Deno.signal");
let args: SignalArgs = serde_json::from_value(args)?;
let rid = args.rid as u32;
- let resource = state.resource_table.get_mut::<SignalStreamResource>(rid);
- if let Some(signal) = resource {
- if let Some(waker) = &signal.1 {
- // Wakes up the pending poll if exists.
- // This prevents the poll future from getting stuck forever.
- waker.clone().wake();
- }
- }
state
.resource_table
.close(rid)
diff --git a/runtime/ops/tls.rs b/runtime/ops/tls.rs
index b59650ab0..0630747ed 100644
--- a/runtime/ops/tls.rs
+++ b/runtime/ops/tls.rs
@@ -1,6 +1,7 @@
// Copyright 2018-2020 the Deno authors. All rights reserved. MIT license.
-use super::io::{StreamResource, StreamResourceHolder};
+use super::io::StreamResource;
+use super::io::TcpStreamResource;
use crate::permissions::Permissions;
use crate::resolve_addr::resolve_addr;
use crate::resolve_addr::resolve_addr_sync;
@@ -8,25 +9,26 @@ use deno_core::error::bad_resource;
use deno_core::error::bad_resource_id;
use deno_core::error::custom_error;
use deno_core::error::AnyError;
-use deno_core::futures;
-use deno_core::futures::future::poll_fn;
use deno_core::serde_json;
use deno_core::serde_json::json;
use deno_core::serde_json::Value;
+use deno_core::AsyncRefCell;
use deno_core::BufVec;
+use deno_core::CancelHandle;
+use deno_core::CancelTryFuture;
use deno_core::OpState;
+use deno_core::RcRef;
+use deno_core::Resource;
use deno_core::ZeroCopyBuf;
use serde::Deserialize;
+use std::borrow::Cow;
use std::cell::RefCell;
use std::convert::From;
use std::fs::File;
use std::io::BufReader;
-use std::net::SocketAddr;
use std::path::Path;
use std::rc::Rc;
use std::sync::Arc;
-use std::task::Context;
-use std::task::Poll;
use tokio::net::TcpListener;
use tokio::net::TcpStream;
use tokio_rustls::{rustls::ClientConfig, TlsConnector};
@@ -85,60 +87,53 @@ async fn op_start_tls(
permissions.check_read(Path::new(&path))?;
}
}
- let mut resource_holder = {
- let mut state_ = state.borrow_mut();
- match state_.resource_table.remove::<StreamResourceHolder>(rid) {
- Some(resource) => *resource,
- None => return Err(bad_resource_id()),
- }
- };
- if let StreamResource::TcpStream(ref mut tcp_stream) =
- resource_holder.resource
- {
- let tcp_stream = tcp_stream.take().unwrap();
- let local_addr = tcp_stream.local_addr()?;
- let remote_addr = tcp_stream.peer_addr()?;
- let mut config = ClientConfig::new();
- config
- .root_store
- .add_server_trust_anchors(&webpki_roots::TLS_SERVER_ROOTS);
- if let Some(path) = cert_file {
- let key_file = File::open(path)?;
- let reader = &mut BufReader::new(key_file);
- config.root_store.add_pem_file(reader).unwrap();
- }
+ let resource_rc = state
+ .borrow_mut()
+ .resource_table
+ .take::<TcpStreamResource>(rid)
+ .ok_or_else(bad_resource_id)?;
+ let resource = Rc::try_unwrap(resource_rc)
+ .expect("Only a single use of this resource should happen");
+ let (read_half, write_half) = resource.into_inner();
+ let tcp_stream = read_half.reunite(write_half)?;
- let tls_connector = TlsConnector::from(Arc::new(config));
- let dnsname =
- DNSNameRef::try_from_ascii_str(&domain).expect("Invalid DNS lookup");
- let tls_stream = tls_connector.connect(dnsname, tcp_stream).await?;
-
- let rid = {
- let mut state_ = state.borrow_mut();
- state_.resource_table.add(
- "clientTlsStream",
- Box::new(StreamResourceHolder::new(StreamResource::ClientTlsStream(
- Box::new(tls_stream),
- ))),
- )
- };
- Ok(json!({
- "rid": rid,
- "localAddr": {
- "hostname": local_addr.ip().to_string(),
- "port": local_addr.port(),
- "transport": "tcp",
- },
- "remoteAddr": {
- "hostname": remote_addr.ip().to_string(),
- "port": remote_addr.port(),
- "transport": "tcp",
- }
- }))
- } else {
- Err(bad_resource_id())
+ let local_addr = tcp_stream.local_addr()?;
+ let remote_addr = tcp_stream.peer_addr()?;
+ let mut config = ClientConfig::new();
+ config
+ .root_store
+ .add_server_trust_anchors(&webpki_roots::TLS_SERVER_ROOTS);
+ if let Some(path) = cert_file {
+ let key_file = File::open(path)?;
+ let reader = &mut BufReader::new(key_file);
+ config.root_store.add_pem_file(reader).unwrap();
}
+
+ let tls_connector = TlsConnector::from(Arc::new(config));
+ let dnsname =
+ DNSNameRef::try_from_ascii_str(&domain).expect("Invalid DNS lookup");
+ let tls_stream = tls_connector.connect(dnsname, tcp_stream).await?;
+
+ let rid = {
+ let mut state_ = state.borrow_mut();
+ state_
+ .resource_table
+ .add(StreamResource::client_tls_stream(tls_stream))
+ };
+ Ok(json!({
+ "rid": rid,
+ "localAddr": {
+ "hostname": local_addr.ip().to_string(),
+ "port": local_addr.port(),
+ "transport": "tcp",
+ },
+ "remoteAddr": {
+ "hostname": remote_addr.ip().to_string(),
+ "port": remote_addr.port(),
+ "transport": "tcp",
+ }
+ }))
}
async fn op_connect_tls(
@@ -180,12 +175,9 @@ async fn op_connect_tls(
let tls_stream = tls_connector.connect(dnsname, tcp_stream).await?;
let rid = {
let mut state_ = state.borrow_mut();
- state_.resource_table.add(
- "clientTlsStream",
- Box::new(StreamResourceHolder::new(StreamResource::ClientTlsStream(
- Box::new(tls_stream),
- ))),
- )
+ state_
+ .resource_table
+ .add(StreamResource::client_tls_stream(tls_stream))
};
Ok(json!({
"rid": rid,
@@ -256,51 +248,19 @@ fn load_keys(path: &str) -> Result<Vec<PrivateKey>, AnyError> {
Ok(keys)
}
-#[allow(dead_code)]
pub struct TlsListenerResource {
- listener: TcpListener,
+ listener: AsyncRefCell<TcpListener>,
tls_acceptor: TlsAcceptor,
- waker: Option<futures::task::AtomicWaker>,
- local_addr: SocketAddr,
+ cancel: CancelHandle,
}
-impl Drop for TlsListenerResource {
- fn drop(&mut self) {
- self.wake_task();
+impl Resource for TlsListenerResource {
+ fn name(&self) -> Cow<str> {
+ "tlsListener".into()
}
-}
-
-impl TlsListenerResource {
- /// Track the current task so future awaiting for connection
- /// can be notified when listener is closed.
- ///
- /// Throws an error if another task is already tracked.
- pub fn track_task(&mut self, cx: &Context) -> Result<(), AnyError> {
- // Currently, we only allow tracking a single accept task for a listener.
- // This might be changed in the future with multiple workers.
- // Caveat: TcpListener by itself also only tracks an accept task at a time.
- // See https://github.com/tokio-rs/tokio/issues/846#issuecomment-454208883
- if self.waker.is_some() {
- return Err(custom_error("Busy", "Another accept task is ongoing"));
- }
- let waker = futures::task::AtomicWaker::new();
- waker.register(cx.waker());
- self.waker.replace(waker);
- Ok(())
- }
-
- /// Notifies a task when listener is closed so accept future can resolve.
- pub fn wake_task(&mut self) {
- if let Some(waker) = self.waker.as_ref() {
- waker.wake();
- }
- }
-
- /// Stop tracking a task.
- /// Happens when the task is done and thus no further tracking is needed.
- pub fn untrack_task(&mut self) {
- self.waker.take();
+ fn close(self: Rc<Self>) {
+ self.cancel.cancel();
}
}
@@ -340,15 +300,12 @@ fn op_listen_tls(
let listener = TcpListener::from_std(std_listener)?;
let local_addr = listener.local_addr()?;
let tls_listener_resource = TlsListenerResource {
- listener,
+ listener: AsyncRefCell::new(listener),
tls_acceptor,
- waker: None,
- local_addr,
+ cancel: Default::default(),
};
- let rid = state
- .resource_table
- .add("tlsListener", Box::new(tls_listener_resource));
+ let rid = state.resource_table.add(tls_listener_resource);
Ok(json!({
"rid": rid,
@@ -372,50 +329,46 @@ async fn op_accept_tls(
) -> Result<Value, AnyError> {
let args: AcceptTlsArgs = serde_json::from_value(args)?;
let rid = args.rid as u32;
- let accept_fut = poll_fn(|cx| {
- let mut state = state.borrow_mut();
- let listener_resource = state
- .resource_table
- .get_mut::<TlsListenerResource>(rid)
- .ok_or_else(|| bad_resource("Listener has been closed"))?;
- let listener = &mut listener_resource.listener;
- match listener.poll_accept(cx).map_err(AnyError::from) {
- Poll::Ready(Ok((stream, addr))) => {
- listener_resource.untrack_task();
- Poll::Ready(Ok((stream, addr)))
- }
- Poll::Pending => {
- listener_resource.track_task(cx)?;
- Poll::Pending
- }
- Poll::Ready(Err(e)) => {
- listener_resource.untrack_task();
- Poll::Ready(Err(e))
+
+ let resource = state
+ .borrow()
+ .resource_table
+ .get::<TlsListenerResource>(rid)
+ .ok_or_else(|| bad_resource("Listener has been closed"))?;
+ let mut listener = RcRef::map(&resource, |r| &r.listener)
+ .try_borrow_mut()
+ .ok_or_else(|| custom_error("Busy", "Another accept task is ongoing"))?;
+ let cancel = RcRef::map(resource, |r| &r.cancel);
+ let (tcp_stream, _socket_addr) =
+ listener.accept().try_or_cancel(cancel).await.map_err(|e| {
+ // FIXME(bartlomieju): compatibility with current JS implementation
+ if let std::io::ErrorKind::Interrupted = e.kind() {
+ bad_resource("Listener has been closed")
+ } else {
+ e.into()
}
- }
- });
- let (tcp_stream, _socket_addr) = accept_fut.await?;
+ })?;
let local_addr = tcp_stream.local_addr()?;
let remote_addr = tcp_stream.peer_addr()?;
- let tls_acceptor = {
- let state_ = state.borrow();
- let resource = state_
- .resource_table
- .get::<TlsListenerResource>(rid)
- .ok_or_else(bad_resource_id)
- .expect("Can't find tls listener");
- resource.tls_acceptor.clone()
- };
- let tls_stream = tls_acceptor.accept(tcp_stream).await?;
+ let resource = state
+ .borrow()
+ .resource_table
+ .get::<TlsListenerResource>(rid)
+ .ok_or_else(|| bad_resource("Listener has been closed"))?;
+ let cancel = RcRef::map(&resource, |r| &r.cancel);
+ let tls_acceptor = resource.tls_acceptor.clone();
+ let tls_stream = tls_acceptor
+ .accept(tcp_stream)
+ .try_or_cancel(cancel)
+ .await?;
+
let rid = {
let mut state_ = state.borrow_mut();
- state_.resource_table.add(
- "serverTlsStream",
- Box::new(StreamResourceHolder::new(StreamResource::ServerTlsStream(
- Box::new(tls_stream),
- ))),
- )
+ state_
+ .resource_table
+ .add(StreamResource::server_tls_stream(tls_stream))
};
+
Ok(json!({
"rid": rid,
"localAddr": {
diff --git a/runtime/ops/tty.rs b/runtime/ops/tty.rs
index ad66bcf1a..05536b429 100644
--- a/runtime/ops/tty.rs
+++ b/runtime/ops/tty.rs
@@ -2,7 +2,6 @@
use super::io::std_file_resource;
use super::io::StreamResource;
-use super::io::StreamResourceHolder;
use deno_core::error::bad_resource_id;
use deno_core::error::not_supported;
use deno_core::error::resource_unavailable;
@@ -11,6 +10,7 @@ use deno_core::serde_json;
use deno_core::serde_json::json;
use deno_core::serde_json::Value;
use deno_core::OpState;
+use deno_core::RcRef;
use deno_core::ZeroCopyBuf;
use serde::Deserialize;
use serde::Serialize;
@@ -88,48 +88,47 @@ fn op_set_raw(
use winapi::shared::minwindef::FALSE;
use winapi::um::{consoleapi, handleapi};
- let resource_holder =
- state.resource_table.get_mut::<StreamResourceHolder>(rid);
- if resource_holder.is_none() {
- return Err(bad_resource_id());
- }
+ let resource = state
+ .resource_table
+ .get::<StreamResource>(rid)
+ .ok_or_else(bad_resource_id)?;
+
if cbreak {
return Err(not_supported());
}
- let resource_holder = resource_holder.unwrap();
-
- // For now, only stdin.
- let handle = match &mut resource_holder.resource {
- StreamResource::FsFile(ref mut option_file_metadata) => {
- if let Some((tokio_file, metadata)) = option_file_metadata.take() {
- match tokio_file.try_into_std() {
- Ok(std_file) => {
- let raw_handle = std_file.as_raw_handle();
- // Turn the std_file handle back into a tokio file, put it back
- // in the resource table.
- let tokio_file = tokio::fs::File::from_std(std_file);
- resource_holder.resource =
- StreamResource::FsFile(Some((tokio_file, metadata)));
- // return the result.
- raw_handle
- }
- Err(tokio_file) => {
- // This function will return an error containing the file if
- // some operation is in-flight.
- resource_holder.resource =
- StreamResource::FsFile(Some((tokio_file, metadata)));
- return Err(resource_unavailable());
- }
- }
- } else {
- return Err(resource_unavailable());
+
+ if resource.fs_file.is_none() {
+ return Err(bad_resource_id());
+ }
+
+ let fs_file_resource =
+ RcRef::map(&resource, |r| r.fs_file.as_ref().unwrap()).try_borrow_mut();
+
+ let handle_result = if let Some(mut fs_file) = fs_file_resource {
+ let tokio_file = fs_file.0.take().unwrap();
+ match tokio_file.try_into_std() {
+ Ok(std_file) => {
+ let raw_handle = std_file.as_raw_handle();
+ // Turn the std_file handle back into a tokio file, put it back
+ // in the resource table.
+ let tokio_file = tokio::fs::File::from_std(std_file);
+ fs_file.0 = Some(tokio_file);
+ // return the result.
+ Ok(raw_handle)
+ }
+ Err(tokio_file) => {
+ // This function will return an error containing the file if
+ // some operation is in-flight.
+ fs_file.0 = Some(tokio_file);
+ Err(resource_unavailable())
}
}
- _ => {
- return Err(bad_resource_id());
- }
+ } else {
+ Err(resource_unavailable())
};
+ let handle = handle_result?;
+
if handle == handleapi::INVALID_HANDLE_VALUE {
return Err(Error::last_os_error().into());
} else if handle.is_null() {
@@ -156,24 +155,31 @@ fn op_set_raw(
{
use std::os::unix::io::AsRawFd;
- let resource_holder =
- state.resource_table.get_mut::<StreamResourceHolder>(rid);
- if resource_holder.is_none() {
- return Err(bad_resource_id());
+ let resource = state
+ .resource_table
+ .get::<StreamResource>(rid)
+ .ok_or_else(bad_resource_id)?;
+
+ if resource.fs_file.is_none() {
+ return Err(not_supported());
}
- if is_raw {
- let (raw_fd, maybe_tty_mode) =
- match &mut resource_holder.unwrap().resource {
- StreamResource::FsFile(Some((f, ref mut metadata))) => {
- (f.as_raw_fd(), &mut metadata.tty.mode)
- }
- StreamResource::FsFile(None) => return Err(resource_unavailable()),
- _ => {
- return Err(not_supported());
- }
- };
+ let maybe_fs_file_resource =
+ RcRef::map(&resource, |r| r.fs_file.as_ref().unwrap()).try_borrow_mut();
+
+ if maybe_fs_file_resource.is_none() {
+ return Err(resource_unavailable());
+ }
+ let mut fs_file_resource = maybe_fs_file_resource.unwrap();
+ if fs_file_resource.0.is_none() {
+ return Err(resource_unavailable());
+ }
+
+ let raw_fd = fs_file_resource.0.as_ref().unwrap().as_raw_fd();
+ let maybe_tty_mode = &mut fs_file_resource.1.as_mut().unwrap().tty.mode;
+
+ if is_raw {
if maybe_tty_mode.is_none() {
// Save original mode.
let original_mode = termios::tcgetattr(raw_fd)?;
@@ -199,28 +205,14 @@ fn op_set_raw(
raw.control_chars[termios::SpecialCharacterIndices::VMIN as usize] = 1;
raw.control_chars[termios::SpecialCharacterIndices::VTIME as usize] = 0;
termios::tcsetattr(raw_fd, termios::SetArg::TCSADRAIN, &raw)?;
- Ok(json!({}))
} else {
// Try restore saved mode.
- let (raw_fd, maybe_tty_mode) =
- match &mut resource_holder.unwrap().resource {
- StreamResource::FsFile(Some((f, ref mut metadata))) => {
- (f.as_raw_fd(), &mut metadata.tty.mode)
- }
- StreamResource::FsFile(None) => {
- return Err(resource_unavailable());
- }
- _ => {
- return Err(bad_resource_id());
- }
- };
-
if let Some(mode) = maybe_tty_mode.take() {
termios::tcsetattr(raw_fd, termios::SetArg::TCSADRAIN, &mode)?;
}
-
- Ok(json!({}))
}
+
+ Ok(json!({}))
}
}
@@ -255,7 +247,6 @@ fn op_isatty(
Ok(unsafe { libc::isatty(raw_fd as libc::c_int) == 1 })
}
}
- Err(StreamResource::FsFile(_)) => unreachable!(),
_ => Ok(false),
})?;
Ok(json!(isatty))
diff --git a/runtime/ops/websocket.rs b/runtime/ops/websocket.rs
index a8c591a33..d805f307b 100644
--- a/runtime/ops/websocket.rs
+++ b/runtime/ops/websocket.rs
@@ -1,18 +1,23 @@
// Copyright 2018-2020 the Deno authors. All rights reserved. MIT license.
use crate::permissions::Permissions;
-use core::task::Poll;
use deno_core::error::bad_resource_id;
use deno_core::error::type_error;
use deno_core::error::AnyError;
-use deno_core::futures::future::poll_fn;
+use deno_core::futures::stream::SplitSink;
+use deno_core::futures::stream::SplitStream;
+use deno_core::futures::SinkExt;
use deno_core::futures::StreamExt;
-use deno_core::futures::{ready, SinkExt};
use deno_core::serde_json::json;
use deno_core::serde_json::Value;
use deno_core::url;
+use deno_core::AsyncRefCell;
use deno_core::BufVec;
+use deno_core::CancelFuture;
+use deno_core::CancelHandle;
use deno_core::OpState;
+use deno_core::RcRef;
+use deno_core::Resource;
use deno_core::{serde_json, ZeroCopyBuf};
use http::{Method, Request, Uri};
use serde::Deserialize;
@@ -62,6 +67,22 @@ type MaybeTlsStream =
StreamSwitcher<TcpStream, tokio_rustls::client::TlsStream<TcpStream>>;
type WsStream = WebSocketStream<MaybeTlsStream>;
+struct WsStreamResource {
+ tx: AsyncRefCell<SplitSink<WsStream, Message>>,
+ rx: AsyncRefCell<SplitStream<WsStream>>,
+ // When a `WsStreamResource` resource is closed, all pending 'read' ops are
+ // canceled, while 'write' ops are allowed to complete. Therefore only
+ // 'read' futures are attached to this cancel handle.
+ cancel: CancelHandle,
+}
+
+impl Resource for WsStreamResource {
+ fn name(&self) -> Cow<str> {
+ "webSocketStream".into()
+ }
+}
+
+impl WsStreamResource {}
#[derive(Deserialize)]
#[serde(rename_all = "camelCase")]
@@ -165,10 +186,14 @@ pub async fn op_ws_create(
))
})?;
+ let (ws_tx, ws_rx) = stream.split();
+ let resource = WsStreamResource {
+ rx: AsyncRefCell::new(ws_rx),
+ tx: AsyncRefCell::new(ws_tx),
+ cancel: Default::default(),
+ };
let mut state = state.borrow_mut();
- let rid = state
- .resource_table
- .add("webSocketStream", Box::new(stream));
+ let rid = state.resource_table.add(resource);
let protocol = match response.headers().get("Sec-WebSocket-Protocol") {
Some(header) => header.to_str().unwrap(),
@@ -202,30 +227,21 @@ pub async fn op_ws_send(
) -> Result<Value, AnyError> {
let args: SendArgs = serde_json::from_value(args)?;
- let mut maybe_msg = Some(match args.text {
+ let msg = match args.text {
Some(text) => Message::Text(text),
None => Message::Binary(bufs[0].to_vec()),
- });
+ };
let rid = args.rid;
- poll_fn(move |cx| {
- let mut state = state.borrow_mut();
- let stream = state
- .resource_table
- .get_mut::<WsStream>(rid)
- .ok_or_else(bad_resource_id)?;
-
- // TODO(ry) Handle errors below instead of unwrap.
- // Need to map `TungsteniteError` to `AnyError`.
- ready!(stream.poll_ready_unpin(cx)).unwrap();
- if let Some(msg) = maybe_msg.take() {
- stream.start_send_unpin(msg).unwrap();
- }
- ready!(stream.poll_flush_unpin(cx)).unwrap();
-
- Poll::Ready(Ok(json!({})))
- })
- .await
+ let resource = state
+ .borrow_mut()
+ .resource_table
+ .get::<WsStreamResource>(rid)
+ .ok_or_else(bad_resource_id)?;
+ let mut tx = RcRef::map(&resource, |r| &r.tx).borrow_mut().await;
+ tx.send(msg).await?;
+ eprintln!("sent!");
+ Ok(json!({}))
}
#[derive(Deserialize)]
@@ -243,33 +259,22 @@ pub async fn op_ws_close(
) -> Result<Value, AnyError> {
let args: CloseArgs = serde_json::from_value(args)?;
let rid = args.rid;
- let mut maybe_msg = Some(Message::Close(args.code.map(|c| CloseFrame {
+ let msg = Message::Close(args.code.map(|c| CloseFrame {
code: CloseCode::from(c),
reason: match args.reason {
Some(reason) => Cow::from(reason),
None => Default::default(),
},
- })));
-
- poll_fn(move |cx| {
- let mut state = state.borrow_mut();
- let stream = state
- .resource_table
- .get_mut::<WsStream>(rid)
- .ok_or_else(bad_resource_id)?;
-
- // TODO(ry) Handle errors below instead of unwrap.
- // Need to map `TungsteniteError` to `AnyError`.
- ready!(stream.poll_ready_unpin(cx)).unwrap();
- if let Some(msg) = maybe_msg.take() {
- stream.start_send_unpin(msg).unwrap();
- }
- ready!(stream.poll_flush_unpin(cx)).unwrap();
- ready!(stream.poll_close_unpin(cx)).unwrap();
+ }));
- Poll::Ready(Ok(json!({})))
- })
- .await
+ let resource = state
+ .borrow_mut()
+ .resource_table
+ .get::<WsStreamResource>(rid)
+ .ok_or_else(bad_resource_id)?;
+ let mut tx = RcRef::map(&resource, |r| &r.tx).borrow_mut().await;
+ tx.send(msg).await?;
+ Ok(json!({}))
}
#[derive(Deserialize)]
@@ -284,43 +289,41 @@ pub async fn op_ws_next_event(
_bufs: BufVec,
) -> Result<Value, AnyError> {
let args: NextEventArgs = serde_json::from_value(args)?;
- poll_fn(move |cx| {
- let mut state = state.borrow_mut();
- let stream = state
- .resource_table
- .get_mut::<WsStream>(args.rid)
- .ok_or_else(bad_resource_id)?;
- stream
- .poll_next_unpin(cx)
- .map(|val| {
- match val {
- Some(Ok(Message::Text(text))) => json!({
- "type": "string",
- "data": text
- }),
- Some(Ok(Message::Binary(data))) => {
- // TODO(ry): don't use json to send binary data.
- json!({
- "type": "binary",
- "data": data
- })
- }
- Some(Ok(Message::Close(Some(frame)))) => json!({
- "type": "close",
- "code": u16::from(frame.code),
- "reason": frame.reason.as_ref()
- }),
- Some(Ok(Message::Close(None))) => json!({ "type": "close" }),
- Some(Ok(Message::Ping(_))) => json!({"type": "ping"}),
- Some(Ok(Message::Pong(_))) => json!({"type": "pong"}),
- Some(Err(_)) => json!({"type": "error"}),
- None => {
- state.resource_table.close(args.rid).unwrap();
- json!({"type": "closed"})
- }
- }
+
+ let resource = state
+ .borrow_mut()
+ .resource_table
+ .get::<WsStreamResource>(args.rid)
+ .ok_or_else(bad_resource_id)?;
+
+ let mut rx = RcRef::map(&resource, |r| &r.rx).borrow_mut().await;
+ let cancel = RcRef::map(resource, |r| &r.cancel);
+ let val = rx.next().or_cancel(cancel).await?;
+ let res = match val {
+ Some(Ok(Message::Text(text))) => json!({
+ "type": "string",
+ "data": text
+ }),
+ Some(Ok(Message::Binary(data))) => {
+ // TODO(ry): don't use json to send binary data.
+ json!({
+ "type": "binary",
+ "data": data
})
- .map(Ok)
- })
- .await
+ }
+ Some(Ok(Message::Close(Some(frame)))) => json!({
+ "type": "close",
+ "code": u16::from(frame.code),
+ "reason": frame.reason.as_ref()
+ }),
+ Some(Ok(Message::Close(None))) => json!({ "type": "close" }),
+ Some(Ok(Message::Ping(_))) => json!({"type": "ping"}),
+ Some(Ok(Message::Pong(_))) => json!({"type": "pong"}),
+ Some(Err(_)) => json!({"type": "error"}),
+ None => {
+ state.borrow_mut().resource_table.close(args.rid).unwrap();
+ json!({"type": "closed"})
+ }
+ };
+ Ok(res)
}
diff --git a/runtime/rt/30_net.js b/runtime/rt/30_net.js
index 9a71f0693..7009f6f8d 100644
--- a/runtime/rt/30_net.js
+++ b/runtime/rt/30_net.js
@@ -11,20 +11,16 @@
0: "Read",
1: "Write",
2: "ReadWrite",
- Read: 0,
+ Read: 0, // TODO: nonsense, remove me.
Write: 1,
ReadWrite: 2, // unused
};
function shutdown(rid, how) {
- core.jsonOpSync("op_shutdown", { rid, how });
- return Promise.resolve();
+ return core.jsonOpAsync("op_shutdown", { rid, how });
}
- function opAccept(
- rid,
- transport,
- ) {
+ function opAccept(rid, transport) {
return core.jsonOpAsync("op_accept", { rid, transport });
}
@@ -36,11 +32,7 @@
return core.jsonOpAsync("op_connect", args);
}
- function opReceive(
- rid,
- transport,
- zeroCopy,
- ) {
+ function opReceive(rid, transport, zeroCopy) {
return core.jsonOpAsync(
"op_datagram_receive",
{ rid, transport },
@@ -56,11 +48,7 @@
#rid = 0;
#remoteAddr = null;
#localAddr = null;
- constructor(
- rid,
- remoteAddr,
- localAddr,
- ) {
+ constructor(rid, remoteAddr, localAddr) {
this.#rid = rid;
this.#remoteAddr = remoteAddr;
this.#localAddr = localAddr;
@@ -149,11 +137,7 @@
#rid = 0;
#addr = null;
- constructor(
- rid,
- addr,
- bufSize = 1024,
- ) {
+ constructor(rid, addr, bufSize = 1024) {
this.#rid = rid;
this.#addr = addr;
this.bufSize = bufSize;
@@ -213,9 +197,7 @@
return new Listener(res.rid, res.localAddr);
}
- async function connect(
- options,
- ) {
+ async function connect(options) {
let res;
if (options.transport === "unix") {
diff --git a/runtime/rt/40_fs_events.js b/runtime/rt/40_fs_events.js
index a36adecba..a179e8c1b 100644
--- a/runtime/rt/40_fs_events.js
+++ b/runtime/rt/40_fs_events.js
@@ -24,6 +24,8 @@
} catch (error) {
if (error instanceof errors.BadResource) {
return { value: undefined, done: true };
+ } else if (error instanceof errors.Interrupted) {
+ return { value: undefined, done: true };
}
throw error;
}
diff --git a/runtime/rt/40_signals.js b/runtime/rt/40_signals.js
index 739c963fd..091afd66a 100644
--- a/runtime/rt/40_signals.js
+++ b/runtime/rt/40_signals.js
@@ -3,6 +3,7 @@
((window) => {
const core = window.Deno.core;
const { build } = window.__bootstrap.build;
+ const { errors } = window.__bootstrap.errors;
function bindSignal(signo) {
return core.jsonOpSync("op_signal_bind", { signo });
@@ -212,7 +213,15 @@
}
#pollSignal = async () => {
- const res = await pollSignal(this.#rid);
+ let res;
+ try {
+ res = await pollSignal(this.#rid);
+ } catch (error) {
+ if (error instanceof errors.BadResource) {
+ return true;
+ }
+ throw error;
+ }
return res.done;
};
diff --git a/runtime/web_worker.rs b/runtime/web_worker.rs
index db97e3604..c1713f815 100644
--- a/runtime/web_worker.rs
+++ b/runtime/web_worker.rs
@@ -256,15 +256,16 @@ impl WebWorker {
let op_state = js_runtime.op_state();
let mut op_state = op_state.borrow_mut();
+ let t = &mut op_state.resource_table;
let (stdin, stdout, stderr) = ops::io::get_stdio();
if let Some(stream) = stdin {
- op_state.resource_table.add("stdin", Box::new(stream));
+ t.add(stream);
}
if let Some(stream) = stdout {
- op_state.resource_table.add("stdout", Box::new(stream));
+ t.add(stream);
}
if let Some(stream) = stderr {
- op_state.resource_table.add("stderr", Box::new(stream));
+ t.add(stream);
}
}
diff --git a/runtime/worker.rs b/runtime/worker.rs
index a0e63afad..adb525c4c 100644
--- a/runtime/worker.rs
+++ b/runtime/worker.rs
@@ -152,13 +152,13 @@ impl MainWorker {
let t = &mut op_state.resource_table;
let (stdin, stdout, stderr) = ops::io::get_stdio();
if let Some(stream) = stdin {
- t.add("stdin", Box::new(stream));
+ t.add(stream);
}
if let Some(stream) = stdout {
- t.add("stdout", Box::new(stream));
+ t.add(stream);
}
if let Some(stream) = stderr {
- t.add("stderr", Box::new(stream));
+ t.add(stream);
}
}