diff options
author | Bartek IwaĆczuk <biwanczuk@gmail.com> | 2020-12-16 17:14:12 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-12-16 17:14:12 +0100 |
commit | 6984b63f2f3c8d0819fe2dced8252a81f3400ae7 (patch) | |
tree | 5201bc962f913927409ae2770aca48ffa3aaaa34 /runtime/ops | |
parent | 9fe26f8ca189ac81d9c20c454b9dbfa5e1011c3f (diff) |
refactor: rewrite ops to use ResourceTable2 (#8512)
This commit migrates all ops to use new resource table
and "AsyncRefCell".
Old implementation of resource table was completely
removed and all code referencing it was updated to use
new system.
Diffstat (limited to 'runtime/ops')
-rw-r--r-- | runtime/ops/fs.rs | 20 | ||||
-rw-r--r-- | runtime/ops/fs_events.rs | 58 | ||||
-rw-r--r-- | runtime/ops/io.rs | 633 | ||||
-rw-r--r-- | runtime/ops/net.rs | 255 | ||||
-rw-r--r-- | runtime/ops/net_unix.rs | 106 | ||||
-rw-r--r-- | runtime/ops/plugin.rs | 12 | ||||
-rw-r--r-- | runtime/ops/process.rs | 74 | ||||
-rw-r--r-- | runtime/ops/signal.rs | 74 | ||||
-rw-r--r-- | runtime/ops/tls.rs | 245 | ||||
-rw-r--r-- | runtime/ops/tty.rs | 127 | ||||
-rw-r--r-- | runtime/ops/websocket.rs | 173 |
11 files changed, 936 insertions, 841 deletions
diff --git a/runtime/ops/fs.rs b/runtime/ops/fs.rs index 865c5bcca..d6d7d7e78 100644 --- a/runtime/ops/fs.rs +++ b/runtime/ops/fs.rs @@ -1,7 +1,7 @@ // Copyright 2018-2020 the Deno authors. All rights reserved. MIT license. // Some deserializer fields are only used on Unix and Windows build fails without it use super::io::std_file_resource; -use super::io::{FileMetadata, StreamResource, StreamResourceHolder}; +use super::io::StreamResource; use crate::fs_util::canonicalize_path; use crate::permissions::Permissions; use deno_core::error::custom_error; @@ -185,13 +185,8 @@ fn op_open_sync( let (path, open_options) = open_helper(state, args)?; let std_file = open_options.open(path)?; let tokio_file = tokio::fs::File::from_std(std_file); - let rid = state.resource_table.add( - "fsFile", - Box::new(StreamResourceHolder::new(StreamResource::FsFile(Some(( - tokio_file, - FileMetadata::default(), - ))))), - ); + let resource = StreamResource::fs_file(tokio_file); + let rid = state.resource_table.add(resource); Ok(json!(rid)) } @@ -204,13 +199,8 @@ async fn op_open_async( let tokio_file = tokio::fs::OpenOptions::from(open_options) .open(path) .await?; - let rid = state.borrow_mut().resource_table.add( - "fsFile", - Box::new(StreamResourceHolder::new(StreamResource::FsFile(Some(( - tokio_file, - FileMetadata::default(), - ))))), - ); + let resource = StreamResource::fs_file(tokio_file); + let rid = state.borrow_mut().resource_table.add(resource); Ok(json!(rid)) } diff --git a/runtime/ops/fs_events.rs b/runtime/ops/fs_events.rs index 4832c915c..38661e1d4 100644 --- a/runtime/ops/fs_events.rs +++ b/runtime/ops/fs_events.rs @@ -3,12 +3,16 @@ use crate::permissions::Permissions; use deno_core::error::bad_resource_id; use deno_core::error::AnyError; -use deno_core::futures::future::poll_fn; use deno_core::serde_json; use deno_core::serde_json::json; use deno_core::serde_json::Value; +use deno_core::AsyncRefCell; use deno_core::BufVec; +use deno_core::CancelFuture; +use deno_core::CancelHandle; use deno_core::OpState; +use deno_core::RcRef; +use deno_core::Resource; use deno_core::ZeroCopyBuf; use notify::event::Event as NotifyEvent; use notify::Error as NotifyError; @@ -18,6 +22,7 @@ use notify::RecursiveMode; use notify::Watcher; use serde::Deserialize; use serde::Serialize; +use std::borrow::Cow; use std::cell::RefCell; use std::convert::From; use std::path::PathBuf; @@ -32,7 +37,18 @@ pub fn init(rt: &mut deno_core::JsRuntime) { struct FsEventsResource { #[allow(unused)] watcher: RecommendedWatcher, - receiver: mpsc::Receiver<Result<FsEvent, AnyError>>, + receiver: AsyncRefCell<mpsc::Receiver<Result<FsEvent, AnyError>>>, + cancel: CancelHandle, +} + +impl Resource for FsEventsResource { + fn name(&self) -> Cow<str> { + "fsEvents".into() + } + + fn close(self: Rc<Self>) { + self.cancel.cancel(); + } } /// Represents a file system event. @@ -99,8 +115,12 @@ fn op_fs_events_open( .check_read(&PathBuf::from(path))?; watcher.watch(path, recursive_mode)?; } - let resource = FsEventsResource { watcher, receiver }; - let rid = state.resource_table.add("fsEvents", Box::new(resource)); + let resource = FsEventsResource { + watcher, + receiver: AsyncRefCell::new(receiver), + cancel: Default::default(), + }; + let rid = state.resource_table.add(resource); Ok(json!(rid)) } @@ -114,20 +134,18 @@ async fn op_fs_events_poll( rid: u32, } let PollArgs { rid } = serde_json::from_value(args)?; - poll_fn(move |cx| { - let mut state = state.borrow_mut(); - let watcher = state - .resource_table - .get_mut::<FsEventsResource>(rid) - .ok_or_else(bad_resource_id)?; - watcher - .receiver - .poll_recv(cx) - .map(|maybe_result| match maybe_result { - Some(Ok(value)) => Ok(json!({ "value": value, "done": false })), - Some(Err(err)) => Err(err), - None => Ok(json!({ "done": true })), - }) - }) - .await + + let resource = state + .borrow() + .resource_table + .get::<FsEventsResource>(rid) + .ok_or_else(bad_resource_id)?; + let mut receiver = RcRef::map(&resource, |r| &r.receiver).borrow_mut().await; + let cancel = RcRef::map(resource, |r| &r.cancel); + let maybe_result = receiver.recv().or_cancel(cancel).await?; + match maybe_result { + Some(Ok(value)) => Ok(json!({ "value": value, "done": false })), + Some(Err(err)) => Err(err), + None => Ok(json!({ "done": true })), + } } diff --git a/runtime/ops/io.rs b/runtime/ops/io.rs index 0f8af905a..de56f5b55 100644 --- a/runtime/ops/io.rs +++ b/runtime/ops/io.rs @@ -7,26 +7,29 @@ use deno_core::error::bad_resource_id; use deno_core::error::resource_unavailable; use deno_core::error::type_error; use deno_core::error::AnyError; -use deno_core::futures; -use deno_core::futures::future::poll_fn; use deno_core::futures::future::FutureExt; -use deno_core::futures::ready; +use deno_core::AsyncMutFuture; +use deno_core::AsyncRefCell; use deno_core::BufVec; +use deno_core::CancelHandle; +use deno_core::CancelTryFuture; use deno_core::JsRuntime; use deno_core::OpState; +use deno_core::RcRef; +use deno_core::Resource; +use std::borrow::Cow; use std::cell::RefCell; -use std::collections::HashMap; -use std::pin::Pin; use std::rc::Rc; -use std::sync::atomic::{AtomicUsize, Ordering}; -use std::task::Context; -use std::task::Poll; -use tokio::io::{AsyncRead, AsyncWrite}; +use tokio::io::AsyncRead; +use tokio::io::AsyncReadExt; +use tokio::io::AsyncWrite; +use tokio::io::AsyncWriteExt; +use tokio::net::tcp; use tokio::net::TcpStream; use tokio_rustls::client::TlsStream as ClientTlsStream; use tokio_rustls::server::TlsStream as ServerTlsStream; -#[cfg(not(windows))] +#[cfg(unix)] use std::os::unix::io::FromRawFd; #[cfg(windows)] @@ -94,26 +97,28 @@ pub fn init(rt: &mut JsRuntime) { } pub fn get_stdio() -> ( - Option<StreamResourceHolder>, - Option<StreamResourceHolder>, - Option<StreamResourceHolder>, + Option<StreamResource>, + Option<StreamResource>, + Option<StreamResource>, ) { - let stdin = get_stdio_stream(&STDIN_HANDLE); - let stdout = get_stdio_stream(&STDOUT_HANDLE); - let stderr = get_stdio_stream(&STDERR_HANDLE); + let stdin = get_stdio_stream(&STDIN_HANDLE, "stdin"); + let stdout = get_stdio_stream(&STDOUT_HANDLE, "stdout"); + let stderr = get_stdio_stream(&STDERR_HANDLE, "stderr"); (stdin, stdout, stderr) } fn get_stdio_stream( handle: &Option<std::fs::File>, -) -> Option<StreamResourceHolder> { + name: &str, +) -> Option<StreamResource> { match handle { None => None, Some(file_handle) => match file_handle.try_clone() { - Ok(clone) => Some(StreamResourceHolder::new(StreamResource::FsFile( - Some((tokio::fs::File::from_std(clone), FileMetadata::default())), - ))), + Ok(clone) => { + let tokio_file = tokio::fs::File::from_std(clone); + Some(StreamResource::stdio(tokio_file, name)) + } Err(_e) => None, }, } @@ -137,100 +142,317 @@ pub struct FileMetadata { pub tty: TTYMetadata, } -pub struct StreamResourceHolder { - pub resource: StreamResource, - waker: HashMap<usize, futures::task::AtomicWaker>, - waker_counter: AtomicUsize, +#[derive(Debug)] +pub struct FullDuplexResource<R, W> { + rd: AsyncRefCell<R>, + wr: AsyncRefCell<W>, + // When a full-duplex resource is closed, all pending 'read' ops are + // canceled, while 'write' ops are allowed to complete. Therefore only + // 'read' futures should be attached to this cancel handle. + cancel_handle: CancelHandle, } -impl StreamResourceHolder { - pub fn new(resource: StreamResource) -> StreamResourceHolder { - StreamResourceHolder { - resource, - // Atleast one task is expecter for the resource - waker: HashMap::with_capacity(1), - // Tracks wakers Ids - waker_counter: AtomicUsize::new(0), +impl<R: 'static, W: 'static> FullDuplexResource<R, W> { + pub fn new((rd, wr): (R, W)) -> Self { + Self { + rd: rd.into(), + wr: wr.into(), + cancel_handle: Default::default(), } } -} -impl Drop for StreamResourceHolder { - fn drop(&mut self) { - self.wake_tasks(); + pub fn into_inner(self) -> (R, W) { + (self.rd.into_inner(), self.wr.into_inner()) + } + + pub fn rd_borrow_mut(self: &Rc<Self>) -> AsyncMutFuture<R> { + RcRef::map(self, |r| &r.rd).borrow_mut() + } + + pub fn wr_borrow_mut(self: &Rc<Self>) -> AsyncMutFuture<W> { + RcRef::map(self, |r| &r.wr).borrow_mut() + } + + pub fn cancel_handle(self: &Rc<Self>) -> RcRef<CancelHandle> { + RcRef::map(self, |r| &r.cancel_handle) + } + + pub fn cancel_read_ops(&self) { + self.cancel_handle.cancel() } } -impl StreamResourceHolder { - pub fn track_task(&mut self, cx: &Context) -> Result<usize, AnyError> { - let waker = futures::task::AtomicWaker::new(); - waker.register(cx.waker()); - // Its OK if it overflows - let task_waker_id = self.waker_counter.fetch_add(1, Ordering::Relaxed); - self.waker.insert(task_waker_id, waker); - Ok(task_waker_id) +impl<R, W> FullDuplexResource<R, W> +where + R: AsyncRead + Unpin + 'static, + W: AsyncWrite + Unpin + 'static, +{ + async fn read(self: &Rc<Self>, buf: &mut [u8]) -> Result<usize, AnyError> { + let mut rd = self.rd_borrow_mut().await; + let nread = rd.read(buf).try_or_cancel(self.cancel_handle()).await?; + Ok(nread) } - pub fn wake_tasks(&mut self) { - for waker in self.waker.values() { - waker.wake(); - } + async fn write(self: &Rc<Self>, buf: &[u8]) -> Result<usize, AnyError> { + let mut wr = self.wr_borrow_mut().await; + let nwritten = wr.write(buf).await?; + Ok(nwritten) } +} - pub fn untrack_task(&mut self, task_waker_id: usize) { - self.waker.remove(&task_waker_id); +pub type TcpStreamResource = + FullDuplexResource<tcp::OwnedReadHalf, tcp::OwnedWriteHalf>; + +impl Resource for TcpStreamResource { + fn name(&self) -> Cow<str> { + "tcpStream".into() + } + + fn close(self: Rc<Self>) { + self.cancel_read_ops(); } } -pub enum StreamResource { - FsFile(Option<(tokio::fs::File, FileMetadata)>), - TcpStream(Option<tokio::net::TcpStream>), - #[cfg(not(windows))] - UnixStream(tokio::net::UnixStream), - ServerTlsStream(Box<ServerTlsStream<TcpStream>>), - ClientTlsStream(Box<ClientTlsStream<TcpStream>>), - ChildStdin(tokio::process::ChildStdin), - ChildStdout(tokio::process::ChildStdout), - ChildStderr(tokio::process::ChildStderr), +#[derive(Default)] +pub struct StreamResource { + pub fs_file: + Option<AsyncRefCell<(Option<tokio::fs::File>, Option<FileMetadata>)>>, + + #[cfg(unix)] + pub unix_stream: Option<AsyncRefCell<tokio::net::UnixStream>>, + + child_stdin: Option<AsyncRefCell<tokio::process::ChildStdin>>, + + child_stdout: Option<AsyncRefCell<tokio::process::ChildStdout>>, + + child_stderr: Option<AsyncRefCell<tokio::process::ChildStderr>>, + + client_tls_stream: Option<AsyncRefCell<ClientTlsStream<TcpStream>>>, + + server_tls_stream: Option<AsyncRefCell<ServerTlsStream<TcpStream>>>, + + cancel: CancelHandle, + name: String, +} + +impl std::fmt::Debug for StreamResource { + fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { + write!(f, "StreamResource") + } } -trait UnpinAsyncRead: AsyncRead + Unpin {} -trait UnpinAsyncWrite: AsyncWrite + Unpin {} +impl StreamResource { + pub fn stdio(fs_file: tokio::fs::File, name: &str) -> Self { + Self { + fs_file: Some(AsyncRefCell::new(( + Some(fs_file), + Some(FileMetadata::default()), + ))), + name: name.to_string(), + ..Default::default() + } + } + + pub fn fs_file(fs_file: tokio::fs::File) -> Self { + Self { + fs_file: Some(AsyncRefCell::new(( + Some(fs_file), + Some(FileMetadata::default()), + ))), + name: "fsFile".to_string(), + ..Default::default() + } + } + + #[cfg(unix)] + pub fn unix_stream(unix_stream: tokio::net::UnixStream) -> Self { + Self { + unix_stream: Some(AsyncRefCell::new(unix_stream)), + name: "unixStream".to_string(), + ..Default::default() + } + } + + pub fn child_stdout(child: tokio::process::ChildStdout) -> Self { + Self { + child_stdout: Some(AsyncRefCell::new(child)), + name: "childStdout".to_string(), + ..Default::default() + } + } -impl<T: AsyncRead + Unpin> UnpinAsyncRead for T {} -impl<T: AsyncWrite + Unpin> UnpinAsyncWrite for T {} + pub fn child_stderr(child: tokio::process::ChildStderr) -> Self { + Self { + child_stderr: Some(AsyncRefCell::new(child)), + name: "childStderr".to_string(), + ..Default::default() + } + } + + pub fn child_stdin(child: tokio::process::ChildStdin) -> Self { + Self { + child_stdin: Some(AsyncRefCell::new(child)), + name: "childStdin".to_string(), + ..Default::default() + } + } + + pub fn client_tls_stream(stream: ClientTlsStream<TcpStream>) -> Self { + Self { + client_tls_stream: Some(AsyncRefCell::new(stream)), + name: "clientTlsStream".to_string(), + ..Default::default() + } + } -/// `DenoAsyncRead` is the same as the `tokio_io::AsyncRead` trait -/// but uses an `AnyError` error instead of `std::io:Error` -pub trait DenoAsyncRead { - fn poll_read( - &mut self, - cx: &mut Context, - buf: &mut [u8], - ) -> Poll<Result<usize, AnyError>>; + pub fn server_tls_stream(stream: ServerTlsStream<TcpStream>) -> Self { + Self { + server_tls_stream: Some(AsyncRefCell::new(stream)), + name: "serverTlsStream".to_string(), + ..Default::default() + } + } + + async fn read(self: Rc<Self>, buf: &mut [u8]) -> Result<usize, AnyError> { + // TODO(bartlomieju): in the future, it would be better for `StreamResource` + // to be an enum instead a struct with many `Option` fields, however I + // wasn't able to get it to work with `AsyncRefCell`s. + if self.fs_file.is_some() { + debug_assert!(self.child_stdin.is_none()); + debug_assert!(self.child_stdout.is_none()); + debug_assert!(self.child_stderr.is_none()); + debug_assert!(self.server_tls_stream.is_none()); + debug_assert!(self.client_tls_stream.is_none()); + let mut fs_file = RcRef::map(&self, |r| r.fs_file.as_ref().unwrap()) + .borrow_mut() + .await; + let nwritten = (*fs_file).0.as_mut().unwrap().read(buf).await?; + return Ok(nwritten); + } else if self.child_stdout.is_some() { + debug_assert!(self.child_stdin.is_none()); + debug_assert!(self.child_stderr.is_none()); + debug_assert!(self.server_tls_stream.is_none()); + debug_assert!(self.client_tls_stream.is_none()); + let mut child_stdout = + RcRef::map(&self, |r| r.child_stdout.as_ref().unwrap()) + .borrow_mut() + .await; + let cancel = RcRef::map(self, |r| &r.cancel); + let nread = child_stdout.read(buf).try_or_cancel(cancel).await?; + return Ok(nread); + } else if self.child_stderr.is_some() { + debug_assert!(self.child_stdin.is_none()); + debug_assert!(self.server_tls_stream.is_none()); + debug_assert!(self.client_tls_stream.is_none()); + let mut child_stderr = + RcRef::map(&self, |r| r.child_stderr.as_ref().unwrap()) + .borrow_mut() + .await; + let cancel = RcRef::map(self, |r| &r.cancel); + let nread = child_stderr.read(buf).try_or_cancel(cancel).await?; + return Ok(nread); + } else if self.client_tls_stream.is_some() { + debug_assert!(self.server_tls_stream.is_none()); + let mut client_tls_stream = + RcRef::map(&self, |r| r.client_tls_stream.as_ref().unwrap()) + .borrow_mut() + .await; + let cancel = RcRef::map(self, |r| &r.cancel); + let nread = client_tls_stream.read(buf).try_or_cancel(cancel).await?; + return Ok(nread); + } else if self.server_tls_stream.is_some() { + let mut server_tls_stream = + RcRef::map(&self, |r| r.server_tls_stream.as_ref().unwrap()) + .borrow_mut() + .await; + let cancel = RcRef::map(self, |r| &r.cancel); + let nread = server_tls_stream.read(buf).try_or_cancel(cancel).await?; + return Ok(nread); + } + + #[cfg(unix)] + if self.unix_stream.is_some() { + let mut unix_stream = + RcRef::map(&self, |r| r.unix_stream.as_ref().unwrap()) + .borrow_mut() + .await; + let cancel = RcRef::map(self, |r| &r.cancel); + let nread = unix_stream.read(buf).try_or_cancel(cancel).await?; + return Ok(nread); + } + + Err(bad_resource_id()) + } + + async fn write(self: Rc<Self>, buf: &[u8]) -> Result<usize, AnyError> { + // TODO(bartlomieju): in the future, it would be better for `StreamResource` + // to be an enum instead a struct with many `Option` fields, however I + // wasn't able to get it to work with `AsyncRefCell`s. + if self.fs_file.is_some() { + debug_assert!(self.child_stdin.is_none()); + debug_assert!(self.child_stdout.is_none()); + debug_assert!(self.child_stderr.is_none()); + debug_assert!(self.server_tls_stream.is_none()); + debug_assert!(self.client_tls_stream.is_none()); + let mut fs_file = RcRef::map(&self, |r| r.fs_file.as_ref().unwrap()) + .borrow_mut() + .await; + let nwritten = (*fs_file).0.as_mut().unwrap().write(buf).await?; + (*fs_file).0.as_mut().unwrap().flush().await?; + return Ok(nwritten); + } else if self.child_stdin.is_some() { + debug_assert!(self.child_stdout.is_none()); + debug_assert!(self.child_stderr.is_none()); + debug_assert!(self.server_tls_stream.is_none()); + debug_assert!(self.client_tls_stream.is_none()); + let mut child_stdin = + RcRef::map(&self, |r| r.child_stdin.as_ref().unwrap()) + .borrow_mut() + .await; + let nwritten = child_stdin.write(buf).await?; + child_stdin.flush().await?; + return Ok(nwritten); + } else if self.client_tls_stream.is_some() { + debug_assert!(self.server_tls_stream.is_none()); + let mut client_tls_stream = + RcRef::map(&self, |r| r.client_tls_stream.as_ref().unwrap()) + .borrow_mut() + .await; + let nwritten = client_tls_stream.write(buf).await?; + client_tls_stream.flush().await?; + return Ok(nwritten); + } else if self.server_tls_stream.is_some() { + let mut server_tls_stream = + RcRef::map(&self, |r| r.server_tls_stream.as_ref().unwrap()) + .borrow_mut() + .await; + let nwritten = server_tls_stream.write(buf).await?; + server_tls_stream.flush().await?; + return Ok(nwritten); + } + + #[cfg(unix)] + if self.unix_stream.is_some() { + let mut unix_stream = + RcRef::map(&self, |r| r.unix_stream.as_ref().unwrap()) + .borrow_mut() + .await; + let nwritten = unix_stream.write(buf).await?; + unix_stream.flush().await?; + return Ok(nwritten); + } + + Err(bad_resource_id()) + } } -impl DenoAsyncRead for StreamResource { - fn poll_read( - &mut self, - cx: &mut Context, - buf: &mut [u8], - ) -> Poll<Result<usize, AnyError>> { - use StreamResource::*; - let f: &mut dyn UnpinAsyncRead = match self { - FsFile(Some((f, _))) => f, - FsFile(None) => return Poll::Ready(Err(resource_unavailable())), - TcpStream(Some(f)) => f, - #[cfg(not(windows))] - UnixStream(f) => f, - ClientTlsStream(f) => f, - ServerTlsStream(f) => f, - ChildStdout(f) => f, - ChildStderr(f) => f, - _ => return Err(bad_resource_id()).into(), - }; - let v = ready!(Pin::new(f).poll_read(cx, buf))?; - Ok(v).into() +impl Resource for StreamResource { + fn name(&self) -> Cow<str> { + self.name.clone().into() + } + + fn close(self: Rc<Self>) { + self.cancel.cancel() } } @@ -263,92 +485,26 @@ pub fn op_read( }) } else { let mut zero_copy = zero_copy[0].clone(); - MinimalOp::Async( - poll_fn(move |cx| { - let mut state = state.borrow_mut(); - let resource_holder = state + MinimalOp::Async({ + async move { + let resource = state + .borrow() .resource_table - .get_mut::<StreamResourceHolder>(rid as u32) + .get_any(rid as u32) .ok_or_else(bad_resource_id)?; - - let mut task_tracker_id: Option<usize> = None; - let nread = match resource_holder.resource.poll_read(cx, &mut zero_copy) + let nread = if let Some(stream) = + resource.downcast_rc::<TcpStreamResource>() { - Poll::Ready(t) => { - if let Some(id) = task_tracker_id { - resource_holder.untrack_task(id); - } - t - } - Poll::Pending => { - task_tracker_id.replace(resource_holder.track_task(cx)?); - return Poll::Pending; - } - }?; - Poll::Ready(Ok(nread as i32)) - }) - .boxed_local(), - ) - } -} - -/// `DenoAsyncWrite` is the same as the `tokio_io::AsyncWrite` trait -/// but uses an `AnyError` error instead of `std::io:Error` -pub trait DenoAsyncWrite { - fn poll_write( - &mut self, - cx: &mut Context, - buf: &[u8], - ) -> Poll<Result<usize, AnyError>>; - - fn poll_close(&mut self, cx: &mut Context) -> Poll<Result<(), AnyError>>; - - fn poll_flush(&mut self, cx: &mut Context) -> Poll<Result<(), AnyError>>; -} - -impl DenoAsyncWrite for StreamResource { - fn poll_write( - &mut self, - cx: &mut Context, - buf: &[u8], - ) -> Poll<Result<usize, AnyError>> { - use StreamResource::*; - let f: &mut dyn UnpinAsyncWrite = match self { - FsFile(Some((f, _))) => f, - FsFile(None) => return Poll::Pending, - TcpStream(Some(f)) => f, - #[cfg(not(windows))] - UnixStream(f) => f, - ClientTlsStream(f) => f, - ServerTlsStream(f) => f, - ChildStdin(f) => f, - _ => return Err(bad_resource_id()).into(), - }; - - let v = ready!(Pin::new(f).poll_write(cx, buf))?; - Ok(v).into() - } - - fn poll_flush(&mut self, cx: &mut Context) -> Poll<Result<(), AnyError>> { - use StreamResource::*; - let f: &mut dyn UnpinAsyncWrite = match self { - FsFile(Some((f, _))) => f, - FsFile(None) => return Poll::Pending, - TcpStream(Some(f)) => f, - #[cfg(not(windows))] - UnixStream(f) => f, - ClientTlsStream(f) => f, - ServerTlsStream(f) => f, - ChildStdin(f) => f, - _ => return Err(bad_resource_id()).into(), - }; - - ready!(Pin::new(f).poll_flush(cx))?; - Ok(()).into() - } - - fn poll_close(&mut self, _cx: &mut Context) -> Poll<Result<(), AnyError>> { - unimplemented!() + stream.read(&mut zero_copy).await? + } else if let Some(stream) = resource.downcast_rc::<StreamResource>() { + stream.clone().read(&mut zero_copy).await? + } else { + return Err(bad_resource_id()); + }; + Ok(nread as i32) + } + .boxed_local() + }) } } @@ -381,93 +537,76 @@ pub fn op_write( }) } else { let zero_copy = zero_copy[0].clone(); - MinimalOp::Async( + MinimalOp::Async({ async move { - let nwritten = poll_fn(|cx| { - let mut state = state.borrow_mut(); - let resource_holder = state - .resource_table - .get_mut::<StreamResourceHolder>(rid as u32) - .ok_or_else(bad_resource_id)?; - resource_holder.resource.poll_write(cx, &zero_copy) - }) - .await?; - - // TODO(bartlomieju): this step was added during upgrade to Tokio 0.2 - // and the reasons for the need to explicitly flush are not fully known. - // Figure out why it's needed and preferably remove it. - // https://github.com/denoland/deno/issues/3565 - poll_fn(|cx| { - let mut state = state.borrow_mut(); - let resource_holder = state - .resource_table - .get_mut::<StreamResourceHolder>(rid as u32) - .ok_or_else(bad_resource_id)?; - resource_holder.resource.poll_flush(cx) - }) - .await?; - + let resource = state + .borrow() + .resource_table + .get_any(rid as u32) + .ok_or_else(bad_resource_id)?; + let nwritten = if let Some(stream) = + resource.downcast_rc::<TcpStreamResource>() + { + stream.write(&zero_copy).await? + } else if let Some(stream) = resource.downcast_rc::<StreamResource>() { + stream.clone().write(&zero_copy).await? + } else { + return Err(bad_resource_id()); + }; Ok(nwritten as i32) } - .boxed_local(), - ) + .boxed_local() + }) } } -/// Helper function for operating on a std::fs::File stored in the resource table. -/// -/// We store file system file resources as tokio::fs::File, so this is a little -/// utility function that gets a std::fs:File when you need to do blocking -/// operations. -/// -/// Returns ErrorKind::Busy if the resource is being used by another op. pub fn std_file_resource<F, T>( state: &mut OpState, rid: u32, mut f: F, ) -> Result<T, AnyError> where - F: FnMut( - Result<&mut std::fs::File, &mut StreamResource>, - ) -> Result<T, AnyError>, + F: FnMut(Result<&mut std::fs::File, ()>) -> Result<T, AnyError>, { // First we look up the rid in the resource table. - let mut r = state.resource_table.get_mut::<StreamResourceHolder>(rid); - if let Some(ref mut resource_holder) = r { - // Sync write only works for FsFile. It doesn't make sense to do this - // for non-blocking sockets. So we error out if not FsFile. - match &mut resource_holder.resource { - StreamResource::FsFile(option_file_metadata) => { - // The object in the resource table is a tokio::fs::File - but in - // order to do a blocking write on it, we must turn it into a - // std::fs::File. Hopefully this code compiles down to nothing. - if let Some((tokio_file, metadata)) = option_file_metadata.take() { - match tokio_file.try_into_std() { - Ok(mut std_file) => { - let result = f(Ok(&mut std_file)); - // Turn the std_file handle back into a tokio file, put it back - // in the resource table. - let tokio_file = tokio::fs::File::from_std(std_file); - resource_holder.resource = - StreamResource::FsFile(Some((tokio_file, metadata))); - // return the result. - result - } - Err(tokio_file) => { - // This function will return an error containing the file if - // some operation is in-flight. - resource_holder.resource = - StreamResource::FsFile(Some((tokio_file, metadata))); - Err(resource_unavailable()) - } - } - } else { - Err(resource_unavailable()) - } + let resource = state + .resource_table + .get::<StreamResource>(rid) + .ok_or_else(bad_resource_id)?; + + // Sync write only works for FsFile. It doesn't make sense to do this + // for non-blocking sockets. So we error out if not FsFile. + if resource.fs_file.is_none() { + return f(Err(())); + } + + // The object in the resource table is a tokio::fs::File - but in + // order to do a blocking write on it, we must turn it into a + // std::fs::File. Hopefully this code compiles down to nothing. + + let fs_file_resource = + RcRef::map(&resource, |r| r.fs_file.as_ref().unwrap()).try_borrow_mut(); + + if let Some(mut fs_file) = fs_file_resource { + let tokio_file = fs_file.0.take().unwrap(); + match tokio_file.try_into_std() { + Ok(mut std_file) => { + let result = f(Ok(&mut std_file)); + // Turn the std_file handle back into a tokio file, put it back + // in the resource table. + let tokio_file = tokio::fs::File::from_std(std_file); + fs_file.0 = Some(tokio_file); + // return the result. + result + } + Err(tokio_file) => { + // This function will return an error containing the file if + // some operation is in-flight. + fs_file.0 = Some(tokio_file); + Err(resource_unavailable()) } - _ => f(Err(&mut resource_holder.resource)), } } else { - Err(bad_resource_id()) + Err(resource_unavailable()) } } diff --git a/runtime/ops/net.rs b/runtime/ops/net.rs index 8770ef103..a4bda585b 100644 --- a/runtime/ops/net.rs +++ b/runtime/ops/net.rs @@ -1,7 +1,7 @@ // Copyright 2018-2020 the Deno authors. All rights reserved. MIT license. -use crate::ops::io::StreamResource; -use crate::ops::io::StreamResourceHolder; +use crate::ops::io::FullDuplexResource; +use crate::ops::io::TcpStreamResource; use crate::permissions::Permissions; use crate::resolve_addr::resolve_addr; use crate::resolve_addr::resolve_addr_sync; @@ -11,21 +11,24 @@ use deno_core::error::custom_error; use deno_core::error::generic_error; use deno_core::error::type_error; use deno_core::error::AnyError; -use deno_core::futures; -use deno_core::futures::future::poll_fn; use deno_core::serde_json; use deno_core::serde_json::json; use deno_core::serde_json::Value; +use deno_core::AsyncRefCell; use deno_core::BufVec; +use deno_core::CancelHandle; +use deno_core::CancelTryFuture; use deno_core::OpState; +use deno_core::RcRef; +use deno_core::Resource; use deno_core::ZeroCopyBuf; use serde::Deserialize; +use std::borrow::Cow; use std::cell::RefCell; use std::net::Shutdown; use std::net::SocketAddr; use std::rc::Rc; -use std::task::Context; -use std::task::Poll; +use tokio::net::udp; use tokio::net::TcpListener; use tokio::net::TcpStream; use tokio::net::UdpSocket; @@ -33,12 +36,14 @@ use tokio::net::UdpSocket; #[cfg(unix)] use super::net_unix; #[cfg(unix)] +use crate::ops::io::StreamResource; +#[cfg(unix)] use std::path::Path; pub fn init(rt: &mut deno_core::JsRuntime) { super::reg_json_async(rt, "op_accept", op_accept); super::reg_json_async(rt, "op_connect", op_connect); - super::reg_json_sync(rt, "op_shutdown", op_shutdown); + super::reg_json_async(rt, "op_shutdown", op_shutdown); super::reg_json_sync(rt, "op_listen", op_listen); super::reg_json_async(rt, "op_datagram_receive", op_datagram_receive); super::reg_json_async(rt, "op_datagram_send", op_datagram_send); @@ -57,39 +62,31 @@ async fn accept_tcp( ) -> Result<Value, AnyError> { let rid = args.rid as u32; - let accept_fut = poll_fn(|cx| { - let mut state = state.borrow_mut(); - let listener_resource = state - .resource_table - .get_mut::<TcpListenerResource>(rid) - .ok_or_else(|| bad_resource("Listener has been closed"))?; - let listener = &mut listener_resource.listener; - match listener.poll_accept(cx).map_err(AnyError::from) { - Poll::Ready(Ok((stream, addr))) => { - listener_resource.untrack_task(); - Poll::Ready(Ok((stream, addr))) - } - Poll::Pending => { - listener_resource.track_task(cx)?; - Poll::Pending - } - Poll::Ready(Err(e)) => { - listener_resource.untrack_task(); - Poll::Ready(Err(e)) + let resource = state + .borrow() + .resource_table + .get::<TcpListenerResource>(rid) + .ok_or_else(|| bad_resource("Listener has been closed"))?; + let mut listener = RcRef::map(&resource, |r| &r.listener) + .try_borrow_mut() + .ok_or_else(|| custom_error("Busy", "Another accept task is ongoing"))?; + let cancel = RcRef::map(resource, |r| &r.cancel); + let (tcp_stream, _socket_addr) = + listener.accept().try_or_cancel(cancel).await.map_err(|e| { + // FIXME(bartlomieju): compatibility with current JS implementation + if let std::io::ErrorKind::Interrupted = e.kind() { + bad_resource("Listener has been closed") + } else { + e.into() } - } - }); - let (tcp_stream, _socket_addr) = accept_fut.await?; + })?; let local_addr = tcp_stream.local_addr()?; let remote_addr = tcp_stream.peer_addr()?; let mut state = state.borrow_mut(); - let rid = state.resource_table.add( - "tcpStream", - Box::new(StreamResourceHolder::new(StreamResource::TcpStream(Some( - tcp_stream, - )))), - ); + let rid = state + .resource_table + .add(TcpStreamResource::new(tcp_stream.into_split())); Ok(json!({ "rid": rid, "localAddr": { @@ -138,18 +135,17 @@ async fn receive_udp( let rid = args.rid as u32; - let receive_fut = poll_fn(|cx| { - let mut state = state.borrow_mut(); - let resource = state - .resource_table - .get_mut::<UdpSocketResource>(rid) - .ok_or_else(|| bad_resource("Socket has been closed"))?; - let socket = &mut resource.socket; - socket - .poll_recv_from(cx, &mut zero_copy) - .map_err(AnyError::from) - }); - let (size, remote_addr) = receive_fut.await?; + let resource = state + .borrow_mut() + .resource_table + .get::<UdpSocketResource>(rid) + .ok_or_else(|| bad_resource("Socket has been closed"))?; + let (size, remote_addr) = resource + .rd_borrow_mut() + .await + .recv_from(&mut zero_copy) + .try_or_cancel(resource.cancel_handle()) + .await?; Ok(json!({ "size": size, "remoteAddr": { @@ -207,19 +203,18 @@ async fn op_datagram_send( .check_net(&args.hostname, args.port)?; } let addr = resolve_addr(&args.hostname, args.port).await?; - poll_fn(move |cx| { - let mut state = state.borrow_mut(); - let resource = state - .resource_table - .get_mut::<UdpSocketResource>(rid as u32) - .ok_or_else(|| bad_resource("Socket has been closed"))?; - resource - .socket - .poll_send_to(cx, &zero_copy, &addr) - .map_ok(|byte_length| json!(byte_length)) - .map_err(AnyError::from) - }) - .await + + let resource = state + .borrow_mut() + .resource_table + .get::<UdpSocketResource>(rid as u32) + .ok_or_else(|| bad_resource("Socket has been closed"))?; + let byte_length = resource + .wr_borrow_mut() + .await + .send_to(&zero_copy, &addr) + .await?; + Ok(json!(byte_length)) } #[cfg(unix)] SendArgs { @@ -232,18 +227,17 @@ async fn op_datagram_send( let s = state.borrow(); s.borrow::<Permissions>().check_write(&address_path)?; } - let mut state = state.borrow_mut(); let resource = state + .borrow() .resource_table - .get_mut::<net_unix::UnixDatagramResource>(rid as u32) + .get::<net_unix::UnixDatagramResource>(rid as u32) .ok_or_else(|| { custom_error("NotConnected", "Socket has been closed") })?; - let socket = &mut resource.socket; - let byte_length = socket - .send_to(&zero_copy, &resource.local_addr.as_pathname().unwrap()) - .await?; - + let mut socket = RcRef::map(&resource, |r| &r.socket) + .try_borrow_mut() + .ok_or_else(|| custom_error("Busy", "Socket already in use"))?; + let byte_length = socket.send_to(&zero_copy, address_path).await?; Ok(json!(byte_length)) } _ => Err(type_error("Wrong argument format!")), @@ -279,12 +273,9 @@ async fn op_connect( let remote_addr = tcp_stream.peer_addr()?; let mut state_ = state.borrow_mut(); - let rid = state_.resource_table.add( - "tcpStream", - Box::new(StreamResourceHolder::new(StreamResource::TcpStream(Some( - tcp_stream, - )))), - ); + let rid = state_ + .resource_table + .add(TcpStreamResource::new(tcp_stream.into_split())); Ok(json!({ "rid": rid, "localAddr": { @@ -317,12 +308,8 @@ async fn op_connect( let remote_addr = unix_stream.peer_addr()?; let mut state_ = state.borrow_mut(); - let rid = state_.resource_table.add( - "unixStream", - Box::new(StreamResourceHolder::new(StreamResource::UnixStream( - unix_stream, - ))), - ); + let resource = StreamResource::unix_stream(unix_stream); + let rid = state_.resource_table.add(resource); Ok(json!({ "rid": rid, "localAddr": { @@ -345,12 +332,12 @@ struct ShutdownArgs { how: i32, } -fn op_shutdown( - state: &mut OpState, +async fn op_shutdown( + state: Rc<RefCell<OpState>>, args: Value, - _zero_copy: &mut [ZeroCopyBuf], + _zero_copy: BufVec, ) -> Result<Value, AnyError> { - super::check_unstable(state, "Deno.shutdown"); + super::check_unstable2(&state, "Deno.shutdown"); let args: ShutdownArgs = serde_json::from_value(args)?; @@ -358,80 +345,61 @@ fn op_shutdown( let how = args.how; let shutdown_mode = match how { - 0 => Shutdown::Read, + 0 => Shutdown::Read, // TODO: nonsense, remove me. 1 => Shutdown::Write, _ => unimplemented!(), }; - let resource_holder = state + let resource = state + .borrow() .resource_table - .get_mut::<StreamResourceHolder>(rid) + .get_any(rid) .ok_or_else(bad_resource_id)?; - match resource_holder.resource { - StreamResource::TcpStream(Some(ref mut stream)) => { - TcpStream::shutdown(stream, shutdown_mode)?; - } - #[cfg(unix)] - StreamResource::UnixStream(ref mut stream) => { - net_unix::UnixStream::shutdown(stream, shutdown_mode)?; + if let Some(stream) = resource.downcast_rc::<TcpStreamResource>() { + let wr = stream.wr_borrow_mut().await; + TcpStream::shutdown((*wr).as_ref(), shutdown_mode)?; + return Ok(json!({})); + } + + #[cfg(unix)] + if let Some(stream) = resource.downcast_rc::<StreamResource>() { + if stream.unix_stream.is_some() { + let wr = RcRef::map(stream, |r| r.unix_stream.as_ref().unwrap()) + .borrow_mut() + .await; + net_unix::UnixStream::shutdown(&*wr, shutdown_mode)?; + return Ok(json!({})); } - _ => return Err(bad_resource_id()), } - Ok(json!({})) + Err(bad_resource_id()) } -#[allow(dead_code)] struct TcpListenerResource { - listener: TcpListener, - waker: Option<futures::task::AtomicWaker>, - local_addr: SocketAddr, + listener: AsyncRefCell<TcpListener>, + cancel: CancelHandle, } -impl Drop for TcpListenerResource { - fn drop(&mut self) { - self.wake_task(); +impl Resource for TcpListenerResource { + fn name(&self) -> Cow<str> { + "tcpListener".into() } -} - -impl TcpListenerResource { - /// Track the current task so future awaiting for connection - /// can be notified when listener is closed. - /// - /// Throws an error if another task is already tracked. - pub fn track_task(&mut self, cx: &Context) -> Result<(), AnyError> { - // Currently, we only allow tracking a single accept task for a listener. - // This might be changed in the future with multiple workers. - // Caveat: TcpListener by itself also only tracks an accept task at a time. - // See https://github.com/tokio-rs/tokio/issues/846#issuecomment-454208883 - if self.waker.is_some() { - return Err(custom_error("Busy", "Another accept task is ongoing")); - } - let waker = futures::task::AtomicWaker::new(); - waker.register(cx.waker()); - self.waker.replace(waker); - Ok(()) + fn close(self: Rc<Self>) { + self.cancel.cancel(); } +} - /// Notifies a task when listener is closed so accept future can resolve. - pub fn wake_task(&mut self) { - if let Some(waker) = self.waker.as_ref() { - waker.wake(); - } - } +type UdpSocketResource = FullDuplexResource<udp::RecvHalf, udp::SendHalf>; - /// Stop tracking a task. - /// Happens when the task is done and thus no further tracking is needed. - pub fn untrack_task(&mut self) { - if self.waker.is_some() { - self.waker.take(); - } +impl Resource for UdpSocketResource { + fn name(&self) -> Cow<str> { + "udpSocket".into() } -} -struct UdpSocketResource { - socket: UdpSocket, + fn close(self: Rc<Self>) { + self.cancel_read_ops() + } } #[derive(Deserialize)] @@ -463,13 +431,10 @@ fn listen_tcp( let listener = TcpListener::from_std(std_listener)?; let local_addr = listener.local_addr()?; let listener_resource = TcpListenerResource { - listener, - waker: None, - local_addr, + listener: AsyncRefCell::new(listener), + cancel: Default::default(), }; - let rid = state - .resource_table - .add("tcpListener", Box::new(listener_resource)); + let rid = state.resource_table.add(listener_resource); Ok((rid, local_addr)) } @@ -481,10 +446,8 @@ fn listen_udp( let std_socket = std::net::UdpSocket::bind(&addr)?; let socket = UdpSocket::from_std(std_socket)?; let local_addr = socket.local_addr()?; - let socket_resource = UdpSocketResource { socket }; - let rid = state - .resource_table - .add("udpSocket", Box::new(socket_resource)); + let socket_resource = UdpSocketResource::new(socket.split()); + let rid = state.resource_table.add(socket_resource); Ok((rid, local_addr)) } diff --git a/runtime/ops/net_unix.rs b/runtime/ops/net_unix.rs index 4c416a5a4..23981a7f1 100644 --- a/runtime/ops/net_unix.rs +++ b/runtime/ops/net_unix.rs @@ -1,34 +1,59 @@ // Copyright 2018-2020 the Deno authors. All rights reserved. MIT license. use crate::ops::io::StreamResource; -use crate::ops::io::StreamResourceHolder; use crate::ops::net::AcceptArgs; use crate::ops::net::ReceiveArgs; use deno_core::error::bad_resource; +use deno_core::error::custom_error; use deno_core::error::AnyError; -use deno_core::futures::future::poll_fn; use deno_core::serde_json::json; use deno_core::serde_json::Value; +use deno_core::AsyncRefCell; use deno_core::BufVec; +use deno_core::CancelHandle; +use deno_core::CancelTryFuture; use deno_core::OpState; +use deno_core::RcRef; +use deno_core::Resource; use serde::Deserialize; +use std::borrow::Cow; use std::cell::RefCell; use std::fs::remove_file; use std::os::unix; use std::path::Path; use std::rc::Rc; -use std::task::Poll; use tokio::net::UnixDatagram; use tokio::net::UnixListener; pub use tokio::net::UnixStream; struct UnixListenerResource { - listener: UnixListener, + listener: AsyncRefCell<UnixListener>, + cancel: CancelHandle, +} + +impl Resource for UnixListenerResource { + fn name(&self) -> Cow<str> { + "unixListener".into() + } + + fn close(self: Rc<Self>) { + self.cancel.cancel(); + } } pub struct UnixDatagramResource { - pub socket: UnixDatagram, - pub local_addr: unix::net::SocketAddr, + pub socket: AsyncRefCell<UnixDatagram>, + pub cancel: CancelHandle, +} + +impl Resource for UnixDatagramResource { + fn name(&self) -> Cow<str> { + "unixDatagram".into() + } + + fn close(self: Rc<Self>) { + self.cancel.cancel(); + } } #[derive(Deserialize)] @@ -43,38 +68,23 @@ pub(crate) async fn accept_unix( ) -> Result<Value, AnyError> { let rid = args.rid as u32; - let accept_fut = poll_fn(|cx| { - let mut state = state.borrow_mut(); - let listener_resource = state - .resource_table - .get_mut::<UnixListenerResource>(rid) - .ok_or_else(|| bad_resource("Listener has been closed"))?; - let listener = &mut listener_resource.listener; - use deno_core::futures::StreamExt; - match listener.poll_next_unpin(cx) { - Poll::Ready(Some(stream)) => { - //listener_resource.untrack_task(); - Poll::Ready(stream) - } - Poll::Ready(None) => todo!(), - Poll::Pending => { - //listener_resource.track_task(cx)?; - Poll::Pending - } - } - .map_err(AnyError::from) - }); - let unix_stream = accept_fut.await?; + let resource = state + .borrow() + .resource_table + .get::<UnixListenerResource>(rid) + .ok_or_else(|| bad_resource("Listener has been closed"))?; + let mut listener = RcRef::map(&resource, |r| &r.listener) + .try_borrow_mut() + .ok_or_else(|| custom_error("Busy", "Listener already in use"))?; + let cancel = RcRef::map(resource, |r| &r.cancel); + let (unix_stream, _socket_addr) = + listener.accept().try_or_cancel(cancel).await?; let local_addr = unix_stream.local_addr()?; let remote_addr = unix_stream.peer_addr()?; + let resource = StreamResource::unix_stream(unix_stream); let mut state = state.borrow_mut(); - let rid = state.resource_table.add( - "unixStream", - Box::new(StreamResourceHolder::new(StreamResource::UnixStream( - unix_stream, - ))), - ); + let rid = state.resource_table.add(resource); Ok(json!({ "rid": rid, "localAddr": { @@ -98,12 +108,17 @@ pub(crate) async fn receive_unix_packet( let rid = args.rid as u32; let mut buf = bufs.into_iter().next().unwrap(); - let mut state = state.borrow_mut(); let resource = state + .borrow() .resource_table - .get_mut::<UnixDatagramResource>(rid) + .get::<UnixDatagramResource>(rid) .ok_or_else(|| bad_resource("Socket has been closed"))?; - let (size, remote_addr) = resource.socket.recv_from(&mut buf).await?; + let mut socket = RcRef::map(&resource, |r| &r.socket) + .try_borrow_mut() + .ok_or_else(|| custom_error("Busy", "Socket already in use"))?; + let cancel = RcRef::map(resource, |r| &r.cancel); + let (size, remote_addr) = + socket.recv_from(&mut buf).try_or_cancel(cancel).await?; Ok(json!({ "size": size, "remoteAddr": { @@ -122,10 +137,11 @@ pub fn listen_unix( } let listener = UnixListener::bind(&addr)?; let local_addr = listener.local_addr()?; - let listener_resource = UnixListenerResource { listener }; - let rid = state - .resource_table - .add("unixListener", Box::new(listener_resource)); + let listener_resource = UnixListenerResource { + listener: AsyncRefCell::new(listener), + cancel: Default::default(), + }; + let rid = state.resource_table.add(listener_resource); Ok((rid, local_addr)) } @@ -140,12 +156,10 @@ pub fn listen_unix_packet( let socket = UnixDatagram::bind(&addr)?; let local_addr = socket.local_addr()?; let datagram_resource = UnixDatagramResource { - socket, - local_addr: local_addr.clone(), + socket: AsyncRefCell::new(socket), + cancel: Default::default(), }; - let rid = state - .resource_table - .add("unixDatagram", Box::new(datagram_resource)); + let rid = state.resource_table.add(datagram_resource); Ok((rid, local_addr)) } diff --git a/runtime/ops/plugin.rs b/runtime/ops/plugin.rs index 1f3669b6f..953d6f7d2 100644 --- a/runtime/ops/plugin.rs +++ b/runtime/ops/plugin.rs @@ -14,9 +14,11 @@ use deno_core::Op; use deno_core::OpAsyncFuture; use deno_core::OpId; use deno_core::OpState; +use deno_core::Resource; use deno_core::ZeroCopyBuf; use dlopen::symbor::Library; use serde::Deserialize; +use std::borrow::Cow; use std::cell::RefCell; use std::path::PathBuf; use std::pin::Pin; @@ -53,9 +55,7 @@ pub fn op_open_plugin( let rid; let deno_plugin_init; { - rid = state - .resource_table - .add("plugin", Box::new(plugin_resource)); + rid = state.resource_table.add(plugin_resource); deno_plugin_init = *unsafe { state .resource_table @@ -77,6 +77,12 @@ struct PluginResource { lib: Rc<Library>, } +impl Resource for PluginResource { + fn name(&self) -> Cow<str> { + "plugin".into() + } +} + impl PluginResource { fn new(lib: &Rc<Library>) -> Self { Self { lib: lib.clone() } diff --git a/runtime/ops/process.rs b/runtime/ops/process.rs index 67b3d0761..b46627e21 100644 --- a/runtime/ops/process.rs +++ b/runtime/ops/process.rs @@ -1,19 +1,22 @@ // Copyright 2018-2020 the Deno authors. All rights reserved. MIT license. -use super::io::{std_file_resource, StreamResource, StreamResourceHolder}; +use super::io::{std_file_resource, StreamResource}; use crate::permissions::Permissions; use deno_core::error::bad_resource_id; use deno_core::error::type_error; use deno_core::error::AnyError; -use deno_core::futures::future::poll_fn; -use deno_core::futures::future::FutureExt; use deno_core::serde_json; use deno_core::serde_json::json; use deno_core::serde_json::Value; +use deno_core::AsyncMutFuture; +use deno_core::AsyncRefCell; use deno_core::BufVec; use deno_core::OpState; +use deno_core::RcRef; +use deno_core::Resource; use deno_core::ZeroCopyBuf; use serde::Deserialize; +use std::borrow::Cow; use std::cell::RefCell; use std::rc::Rc; use tokio::process::Command; @@ -61,7 +64,19 @@ struct RunArgs { } struct ChildResource { - child: tokio::process::Child, + child: AsyncRefCell<tokio::process::Child>, +} + +impl Resource for ChildResource { + fn name(&self) -> Cow<str> { + "child".into() + } +} + +impl ChildResource { + fn borrow_mut(self: Rc<Self>) -> AsyncMutFuture<tokio::process::Child> { + RcRef::map(self, |r| &r.child).borrow_mut() + } } fn op_run( @@ -117,12 +132,9 @@ fn op_run( let stdin_rid = match child.stdin.take() { Some(child_stdin) => { - let rid = state.resource_table.add( - "childStdin", - Box::new(StreamResourceHolder::new(StreamResource::ChildStdin( - child_stdin, - ))), - ); + let rid = state + .resource_table + .add(StreamResource::child_stdin(child_stdin)); Some(rid) } None => None, @@ -130,12 +142,9 @@ fn op_run( let stdout_rid = match child.stdout.take() { Some(child_stdout) => { - let rid = state.resource_table.add( - "childStdout", - Box::new(StreamResourceHolder::new(StreamResource::ChildStdout( - child_stdout, - ))), - ); + let rid = state + .resource_table + .add(StreamResource::child_stdout(child_stdout)); Some(rid) } None => None, @@ -143,19 +152,18 @@ fn op_run( let stderr_rid = match child.stderr.take() { Some(child_stderr) => { - let rid = state.resource_table.add( - "childStderr", - Box::new(StreamResourceHolder::new(StreamResource::ChildStderr( - child_stderr, - ))), - ); + let rid = state + .resource_table + .add(StreamResource::child_stderr(child_stderr)); Some(rid) } None => None, }; - let child_resource = ChildResource { child }; - let child_rid = state.resource_table.add("child", Box::new(child_resource)); + let child_resource = ChildResource { + child: AsyncRefCell::new(child), + }; + let child_rid = state.resource_table.add(child_resource); Ok(json!({ "rid": child_rid, @@ -185,17 +193,13 @@ async fn op_run_status( s.borrow::<Permissions>().check_run()?; } - let run_status = poll_fn(|cx| { - let mut state = state.borrow_mut(); - let child_resource = state - .resource_table - .get_mut::<ChildResource>(rid) - .ok_or_else(bad_resource_id)?; - let child = &mut child_resource.child; - child.poll_unpin(cx).map_err(AnyError::from) - }) - .await?; - + let resource = state + .borrow_mut() + .resource_table + .get::<ChildResource>(rid) + .ok_or_else(bad_resource_id)?; + let mut child = resource.borrow_mut().await; + let run_status = (&mut *child).await?; let code = run_status.code(); #[cfg(unix)] diff --git a/runtime/ops/signal.rs b/runtime/ops/signal.rs index be6bc0a35..b3891792c 100644 --- a/runtime/ops/signal.rs +++ b/runtime/ops/signal.rs @@ -11,15 +11,23 @@ use std::rc::Rc; #[cfg(unix)] use deno_core::error::bad_resource_id; #[cfg(unix)] -use deno_core::futures::future::poll_fn; -#[cfg(unix)] use deno_core::serde_json; #[cfg(unix)] use deno_core::serde_json::json; #[cfg(unix)] +use deno_core::AsyncRefCell; +#[cfg(unix)] +use deno_core::CancelFuture; +#[cfg(unix)] +use deno_core::CancelHandle; +#[cfg(unix)] +use deno_core::RcRef; +#[cfg(unix)] +use deno_core::Resource; +#[cfg(unix)] use serde::Deserialize; #[cfg(unix)] -use std::task::Waker; +use std::borrow::Cow; #[cfg(unix)] use tokio::signal::unix::{signal, Signal, SignalKind}; @@ -32,7 +40,21 @@ pub fn init(rt: &mut deno_core::JsRuntime) { #[cfg(unix)] /// The resource for signal stream. /// The second element is the waker of polling future. -pub struct SignalStreamResource(pub Signal, pub Option<Waker>); +struct SignalStreamResource { + signal: AsyncRefCell<Signal>, + cancel: CancelHandle, +} + +#[cfg(unix)] +impl Resource for SignalStreamResource { + fn name(&self) -> Cow<str> { + "signal".into() + } + + fn close(self: Rc<Self>) { + self.cancel.cancel(); + } +} #[cfg(unix)] #[derive(Deserialize)] @@ -54,13 +76,13 @@ fn op_signal_bind( ) -> Result<Value, AnyError> { super::check_unstable(state, "Deno.signal"); let args: BindSignalArgs = serde_json::from_value(args)?; - let rid = state.resource_table.add( - "signal", - Box::new(SignalStreamResource( + let resource = SignalStreamResource { + signal: AsyncRefCell::new( signal(SignalKind::from_raw(args.signo)).expect(""), - None, - )), - ); + ), + cancel: Default::default(), + }; + let rid = state.resource_table.add(resource); Ok(json!({ "rid": rid, })) @@ -76,18 +98,18 @@ async fn op_signal_poll( let args: SignalArgs = serde_json::from_value(args)?; let rid = args.rid as u32; - let future = poll_fn(move |cx| { - let mut state = state.borrow_mut(); - if let Some(mut signal) = - state.resource_table.get_mut::<SignalStreamResource>(rid) - { - signal.1 = Some(cx.waker().clone()); - return signal.0.poll_recv(cx); - } - std::task::Poll::Ready(None) - }); - let result = future.await; - Ok(json!({ "done": result.is_none() })) + let resource = state + .borrow_mut() + .resource_table + .get::<SignalStreamResource>(rid) + .ok_or_else(bad_resource_id)?; + let cancel = RcRef::map(&resource, |r| &r.cancel); + let mut signal = RcRef::map(&resource, |r| &r.signal).borrow_mut().await; + + match signal.recv().or_cancel(cancel).await { + Ok(result) => Ok(json!({ "done": result.is_none() })), + Err(_) => Ok(json!({ "done": true })), + } } #[cfg(unix)] @@ -99,14 +121,6 @@ pub fn op_signal_unbind( super::check_unstable(state, "Deno.signal"); let args: SignalArgs = serde_json::from_value(args)?; let rid = args.rid as u32; - let resource = state.resource_table.get_mut::<SignalStreamResource>(rid); - if let Some(signal) = resource { - if let Some(waker) = &signal.1 { - // Wakes up the pending poll if exists. - // This prevents the poll future from getting stuck forever. - waker.clone().wake(); - } - } state .resource_table .close(rid) diff --git a/runtime/ops/tls.rs b/runtime/ops/tls.rs index b59650ab0..0630747ed 100644 --- a/runtime/ops/tls.rs +++ b/runtime/ops/tls.rs @@ -1,6 +1,7 @@ // Copyright 2018-2020 the Deno authors. All rights reserved. MIT license. -use super::io::{StreamResource, StreamResourceHolder}; +use super::io::StreamResource; +use super::io::TcpStreamResource; use crate::permissions::Permissions; use crate::resolve_addr::resolve_addr; use crate::resolve_addr::resolve_addr_sync; @@ -8,25 +9,26 @@ use deno_core::error::bad_resource; use deno_core::error::bad_resource_id; use deno_core::error::custom_error; use deno_core::error::AnyError; -use deno_core::futures; -use deno_core::futures::future::poll_fn; use deno_core::serde_json; use deno_core::serde_json::json; use deno_core::serde_json::Value; +use deno_core::AsyncRefCell; use deno_core::BufVec; +use deno_core::CancelHandle; +use deno_core::CancelTryFuture; use deno_core::OpState; +use deno_core::RcRef; +use deno_core::Resource; use deno_core::ZeroCopyBuf; use serde::Deserialize; +use std::borrow::Cow; use std::cell::RefCell; use std::convert::From; use std::fs::File; use std::io::BufReader; -use std::net::SocketAddr; use std::path::Path; use std::rc::Rc; use std::sync::Arc; -use std::task::Context; -use std::task::Poll; use tokio::net::TcpListener; use tokio::net::TcpStream; use tokio_rustls::{rustls::ClientConfig, TlsConnector}; @@ -85,60 +87,53 @@ async fn op_start_tls( permissions.check_read(Path::new(&path))?; } } - let mut resource_holder = { - let mut state_ = state.borrow_mut(); - match state_.resource_table.remove::<StreamResourceHolder>(rid) { - Some(resource) => *resource, - None => return Err(bad_resource_id()), - } - }; - if let StreamResource::TcpStream(ref mut tcp_stream) = - resource_holder.resource - { - let tcp_stream = tcp_stream.take().unwrap(); - let local_addr = tcp_stream.local_addr()?; - let remote_addr = tcp_stream.peer_addr()?; - let mut config = ClientConfig::new(); - config - .root_store - .add_server_trust_anchors(&webpki_roots::TLS_SERVER_ROOTS); - if let Some(path) = cert_file { - let key_file = File::open(path)?; - let reader = &mut BufReader::new(key_file); - config.root_store.add_pem_file(reader).unwrap(); - } + let resource_rc = state + .borrow_mut() + .resource_table + .take::<TcpStreamResource>(rid) + .ok_or_else(bad_resource_id)?; + let resource = Rc::try_unwrap(resource_rc) + .expect("Only a single use of this resource should happen"); + let (read_half, write_half) = resource.into_inner(); + let tcp_stream = read_half.reunite(write_half)?; - let tls_connector = TlsConnector::from(Arc::new(config)); - let dnsname = - DNSNameRef::try_from_ascii_str(&domain).expect("Invalid DNS lookup"); - let tls_stream = tls_connector.connect(dnsname, tcp_stream).await?; - - let rid = { - let mut state_ = state.borrow_mut(); - state_.resource_table.add( - "clientTlsStream", - Box::new(StreamResourceHolder::new(StreamResource::ClientTlsStream( - Box::new(tls_stream), - ))), - ) - }; - Ok(json!({ - "rid": rid, - "localAddr": { - "hostname": local_addr.ip().to_string(), - "port": local_addr.port(), - "transport": "tcp", - }, - "remoteAddr": { - "hostname": remote_addr.ip().to_string(), - "port": remote_addr.port(), - "transport": "tcp", - } - })) - } else { - Err(bad_resource_id()) + let local_addr = tcp_stream.local_addr()?; + let remote_addr = tcp_stream.peer_addr()?; + let mut config = ClientConfig::new(); + config + .root_store + .add_server_trust_anchors(&webpki_roots::TLS_SERVER_ROOTS); + if let Some(path) = cert_file { + let key_file = File::open(path)?; + let reader = &mut BufReader::new(key_file); + config.root_store.add_pem_file(reader).unwrap(); } + + let tls_connector = TlsConnector::from(Arc::new(config)); + let dnsname = + DNSNameRef::try_from_ascii_str(&domain).expect("Invalid DNS lookup"); + let tls_stream = tls_connector.connect(dnsname, tcp_stream).await?; + + let rid = { + let mut state_ = state.borrow_mut(); + state_ + .resource_table + .add(StreamResource::client_tls_stream(tls_stream)) + }; + Ok(json!({ + "rid": rid, + "localAddr": { + "hostname": local_addr.ip().to_string(), + "port": local_addr.port(), + "transport": "tcp", + }, + "remoteAddr": { + "hostname": remote_addr.ip().to_string(), + "port": remote_addr.port(), + "transport": "tcp", + } + })) } async fn op_connect_tls( @@ -180,12 +175,9 @@ async fn op_connect_tls( let tls_stream = tls_connector.connect(dnsname, tcp_stream).await?; let rid = { let mut state_ = state.borrow_mut(); - state_.resource_table.add( - "clientTlsStream", - Box::new(StreamResourceHolder::new(StreamResource::ClientTlsStream( - Box::new(tls_stream), - ))), - ) + state_ + .resource_table + .add(StreamResource::client_tls_stream(tls_stream)) }; Ok(json!({ "rid": rid, @@ -256,51 +248,19 @@ fn load_keys(path: &str) -> Result<Vec<PrivateKey>, AnyError> { Ok(keys) } -#[allow(dead_code)] pub struct TlsListenerResource { - listener: TcpListener, + listener: AsyncRefCell<TcpListener>, tls_acceptor: TlsAcceptor, - waker: Option<futures::task::AtomicWaker>, - local_addr: SocketAddr, + cancel: CancelHandle, } -impl Drop for TlsListenerResource { - fn drop(&mut self) { - self.wake_task(); +impl Resource for TlsListenerResource { + fn name(&self) -> Cow<str> { + "tlsListener".into() } -} - -impl TlsListenerResource { - /// Track the current task so future awaiting for connection - /// can be notified when listener is closed. - /// - /// Throws an error if another task is already tracked. - pub fn track_task(&mut self, cx: &Context) -> Result<(), AnyError> { - // Currently, we only allow tracking a single accept task for a listener. - // This might be changed in the future with multiple workers. - // Caveat: TcpListener by itself also only tracks an accept task at a time. - // See https://github.com/tokio-rs/tokio/issues/846#issuecomment-454208883 - if self.waker.is_some() { - return Err(custom_error("Busy", "Another accept task is ongoing")); - } - let waker = futures::task::AtomicWaker::new(); - waker.register(cx.waker()); - self.waker.replace(waker); - Ok(()) - } - - /// Notifies a task when listener is closed so accept future can resolve. - pub fn wake_task(&mut self) { - if let Some(waker) = self.waker.as_ref() { - waker.wake(); - } - } - - /// Stop tracking a task. - /// Happens when the task is done and thus no further tracking is needed. - pub fn untrack_task(&mut self) { - self.waker.take(); + fn close(self: Rc<Self>) { + self.cancel.cancel(); } } @@ -340,15 +300,12 @@ fn op_listen_tls( let listener = TcpListener::from_std(std_listener)?; let local_addr = listener.local_addr()?; let tls_listener_resource = TlsListenerResource { - listener, + listener: AsyncRefCell::new(listener), tls_acceptor, - waker: None, - local_addr, + cancel: Default::default(), }; - let rid = state - .resource_table - .add("tlsListener", Box::new(tls_listener_resource)); + let rid = state.resource_table.add(tls_listener_resource); Ok(json!({ "rid": rid, @@ -372,50 +329,46 @@ async fn op_accept_tls( ) -> Result<Value, AnyError> { let args: AcceptTlsArgs = serde_json::from_value(args)?; let rid = args.rid as u32; - let accept_fut = poll_fn(|cx| { - let mut state = state.borrow_mut(); - let listener_resource = state - .resource_table - .get_mut::<TlsListenerResource>(rid) - .ok_or_else(|| bad_resource("Listener has been closed"))?; - let listener = &mut listener_resource.listener; - match listener.poll_accept(cx).map_err(AnyError::from) { - Poll::Ready(Ok((stream, addr))) => { - listener_resource.untrack_task(); - Poll::Ready(Ok((stream, addr))) - } - Poll::Pending => { - listener_resource.track_task(cx)?; - Poll::Pending - } - Poll::Ready(Err(e)) => { - listener_resource.untrack_task(); - Poll::Ready(Err(e)) + + let resource = state + .borrow() + .resource_table + .get::<TlsListenerResource>(rid) + .ok_or_else(|| bad_resource("Listener has been closed"))?; + let mut listener = RcRef::map(&resource, |r| &r.listener) + .try_borrow_mut() + .ok_or_else(|| custom_error("Busy", "Another accept task is ongoing"))?; + let cancel = RcRef::map(resource, |r| &r.cancel); + let (tcp_stream, _socket_addr) = + listener.accept().try_or_cancel(cancel).await.map_err(|e| { + // FIXME(bartlomieju): compatibility with current JS implementation + if let std::io::ErrorKind::Interrupted = e.kind() { + bad_resource("Listener has been closed") + } else { + e.into() } - } - }); - let (tcp_stream, _socket_addr) = accept_fut.await?; + })?; let local_addr = tcp_stream.local_addr()?; let remote_addr = tcp_stream.peer_addr()?; - let tls_acceptor = { - let state_ = state.borrow(); - let resource = state_ - .resource_table - .get::<TlsListenerResource>(rid) - .ok_or_else(bad_resource_id) - .expect("Can't find tls listener"); - resource.tls_acceptor.clone() - }; - let tls_stream = tls_acceptor.accept(tcp_stream).await?; + let resource = state + .borrow() + .resource_table + .get::<TlsListenerResource>(rid) + .ok_or_else(|| bad_resource("Listener has been closed"))?; + let cancel = RcRef::map(&resource, |r| &r.cancel); + let tls_acceptor = resource.tls_acceptor.clone(); + let tls_stream = tls_acceptor + .accept(tcp_stream) + .try_or_cancel(cancel) + .await?; + let rid = { let mut state_ = state.borrow_mut(); - state_.resource_table.add( - "serverTlsStream", - Box::new(StreamResourceHolder::new(StreamResource::ServerTlsStream( - Box::new(tls_stream), - ))), - ) + state_ + .resource_table + .add(StreamResource::server_tls_stream(tls_stream)) }; + Ok(json!({ "rid": rid, "localAddr": { diff --git a/runtime/ops/tty.rs b/runtime/ops/tty.rs index ad66bcf1a..05536b429 100644 --- a/runtime/ops/tty.rs +++ b/runtime/ops/tty.rs @@ -2,7 +2,6 @@ use super::io::std_file_resource; use super::io::StreamResource; -use super::io::StreamResourceHolder; use deno_core::error::bad_resource_id; use deno_core::error::not_supported; use deno_core::error::resource_unavailable; @@ -11,6 +10,7 @@ use deno_core::serde_json; use deno_core::serde_json::json; use deno_core::serde_json::Value; use deno_core::OpState; +use deno_core::RcRef; use deno_core::ZeroCopyBuf; use serde::Deserialize; use serde::Serialize; @@ -88,48 +88,47 @@ fn op_set_raw( use winapi::shared::minwindef::FALSE; use winapi::um::{consoleapi, handleapi}; - let resource_holder = - state.resource_table.get_mut::<StreamResourceHolder>(rid); - if resource_holder.is_none() { - return Err(bad_resource_id()); - } + let resource = state + .resource_table + .get::<StreamResource>(rid) + .ok_or_else(bad_resource_id)?; + if cbreak { return Err(not_supported()); } - let resource_holder = resource_holder.unwrap(); - - // For now, only stdin. - let handle = match &mut resource_holder.resource { - StreamResource::FsFile(ref mut option_file_metadata) => { - if let Some((tokio_file, metadata)) = option_file_metadata.take() { - match tokio_file.try_into_std() { - Ok(std_file) => { - let raw_handle = std_file.as_raw_handle(); - // Turn the std_file handle back into a tokio file, put it back - // in the resource table. - let tokio_file = tokio::fs::File::from_std(std_file); - resource_holder.resource = - StreamResource::FsFile(Some((tokio_file, metadata))); - // return the result. - raw_handle - } - Err(tokio_file) => { - // This function will return an error containing the file if - // some operation is in-flight. - resource_holder.resource = - StreamResource::FsFile(Some((tokio_file, metadata))); - return Err(resource_unavailable()); - } - } - } else { - return Err(resource_unavailable()); + + if resource.fs_file.is_none() { + return Err(bad_resource_id()); + } + + let fs_file_resource = + RcRef::map(&resource, |r| r.fs_file.as_ref().unwrap()).try_borrow_mut(); + + let handle_result = if let Some(mut fs_file) = fs_file_resource { + let tokio_file = fs_file.0.take().unwrap(); + match tokio_file.try_into_std() { + Ok(std_file) => { + let raw_handle = std_file.as_raw_handle(); + // Turn the std_file handle back into a tokio file, put it back + // in the resource table. + let tokio_file = tokio::fs::File::from_std(std_file); + fs_file.0 = Some(tokio_file); + // return the result. + Ok(raw_handle) + } + Err(tokio_file) => { + // This function will return an error containing the file if + // some operation is in-flight. + fs_file.0 = Some(tokio_file); + Err(resource_unavailable()) } } - _ => { - return Err(bad_resource_id()); - } + } else { + Err(resource_unavailable()) }; + let handle = handle_result?; + if handle == handleapi::INVALID_HANDLE_VALUE { return Err(Error::last_os_error().into()); } else if handle.is_null() { @@ -156,24 +155,31 @@ fn op_set_raw( { use std::os::unix::io::AsRawFd; - let resource_holder = - state.resource_table.get_mut::<StreamResourceHolder>(rid); - if resource_holder.is_none() { - return Err(bad_resource_id()); + let resource = state + .resource_table + .get::<StreamResource>(rid) + .ok_or_else(bad_resource_id)?; + + if resource.fs_file.is_none() { + return Err(not_supported()); } - if is_raw { - let (raw_fd, maybe_tty_mode) = - match &mut resource_holder.unwrap().resource { - StreamResource::FsFile(Some((f, ref mut metadata))) => { - (f.as_raw_fd(), &mut metadata.tty.mode) - } - StreamResource::FsFile(None) => return Err(resource_unavailable()), - _ => { - return Err(not_supported()); - } - }; + let maybe_fs_file_resource = + RcRef::map(&resource, |r| r.fs_file.as_ref().unwrap()).try_borrow_mut(); + + if maybe_fs_file_resource.is_none() { + return Err(resource_unavailable()); + } + let mut fs_file_resource = maybe_fs_file_resource.unwrap(); + if fs_file_resource.0.is_none() { + return Err(resource_unavailable()); + } + + let raw_fd = fs_file_resource.0.as_ref().unwrap().as_raw_fd(); + let maybe_tty_mode = &mut fs_file_resource.1.as_mut().unwrap().tty.mode; + + if is_raw { if maybe_tty_mode.is_none() { // Save original mode. let original_mode = termios::tcgetattr(raw_fd)?; @@ -199,28 +205,14 @@ fn op_set_raw( raw.control_chars[termios::SpecialCharacterIndices::VMIN as usize] = 1; raw.control_chars[termios::SpecialCharacterIndices::VTIME as usize] = 0; termios::tcsetattr(raw_fd, termios::SetArg::TCSADRAIN, &raw)?; - Ok(json!({})) } else { // Try restore saved mode. - let (raw_fd, maybe_tty_mode) = - match &mut resource_holder.unwrap().resource { - StreamResource::FsFile(Some((f, ref mut metadata))) => { - (f.as_raw_fd(), &mut metadata.tty.mode) - } - StreamResource::FsFile(None) => { - return Err(resource_unavailable()); - } - _ => { - return Err(bad_resource_id()); - } - }; - if let Some(mode) = maybe_tty_mode.take() { termios::tcsetattr(raw_fd, termios::SetArg::TCSADRAIN, &mode)?; } - - Ok(json!({})) } + + Ok(json!({})) } } @@ -255,7 +247,6 @@ fn op_isatty( Ok(unsafe { libc::isatty(raw_fd as libc::c_int) == 1 }) } } - Err(StreamResource::FsFile(_)) => unreachable!(), _ => Ok(false), })?; Ok(json!(isatty)) diff --git a/runtime/ops/websocket.rs b/runtime/ops/websocket.rs index a8c591a33..d805f307b 100644 --- a/runtime/ops/websocket.rs +++ b/runtime/ops/websocket.rs @@ -1,18 +1,23 @@ // Copyright 2018-2020 the Deno authors. All rights reserved. MIT license. use crate::permissions::Permissions; -use core::task::Poll; use deno_core::error::bad_resource_id; use deno_core::error::type_error; use deno_core::error::AnyError; -use deno_core::futures::future::poll_fn; +use deno_core::futures::stream::SplitSink; +use deno_core::futures::stream::SplitStream; +use deno_core::futures::SinkExt; use deno_core::futures::StreamExt; -use deno_core::futures::{ready, SinkExt}; use deno_core::serde_json::json; use deno_core::serde_json::Value; use deno_core::url; +use deno_core::AsyncRefCell; use deno_core::BufVec; +use deno_core::CancelFuture; +use deno_core::CancelHandle; use deno_core::OpState; +use deno_core::RcRef; +use deno_core::Resource; use deno_core::{serde_json, ZeroCopyBuf}; use http::{Method, Request, Uri}; use serde::Deserialize; @@ -62,6 +67,22 @@ type MaybeTlsStream = StreamSwitcher<TcpStream, tokio_rustls::client::TlsStream<TcpStream>>; type WsStream = WebSocketStream<MaybeTlsStream>; +struct WsStreamResource { + tx: AsyncRefCell<SplitSink<WsStream, Message>>, + rx: AsyncRefCell<SplitStream<WsStream>>, + // When a `WsStreamResource` resource is closed, all pending 'read' ops are + // canceled, while 'write' ops are allowed to complete. Therefore only + // 'read' futures are attached to this cancel handle. + cancel: CancelHandle, +} + +impl Resource for WsStreamResource { + fn name(&self) -> Cow<str> { + "webSocketStream".into() + } +} + +impl WsStreamResource {} #[derive(Deserialize)] #[serde(rename_all = "camelCase")] @@ -165,10 +186,14 @@ pub async fn op_ws_create( )) })?; + let (ws_tx, ws_rx) = stream.split(); + let resource = WsStreamResource { + rx: AsyncRefCell::new(ws_rx), + tx: AsyncRefCell::new(ws_tx), + cancel: Default::default(), + }; let mut state = state.borrow_mut(); - let rid = state - .resource_table - .add("webSocketStream", Box::new(stream)); + let rid = state.resource_table.add(resource); let protocol = match response.headers().get("Sec-WebSocket-Protocol") { Some(header) => header.to_str().unwrap(), @@ -202,30 +227,21 @@ pub async fn op_ws_send( ) -> Result<Value, AnyError> { let args: SendArgs = serde_json::from_value(args)?; - let mut maybe_msg = Some(match args.text { + let msg = match args.text { Some(text) => Message::Text(text), None => Message::Binary(bufs[0].to_vec()), - }); + }; let rid = args.rid; - poll_fn(move |cx| { - let mut state = state.borrow_mut(); - let stream = state - .resource_table - .get_mut::<WsStream>(rid) - .ok_or_else(bad_resource_id)?; - - // TODO(ry) Handle errors below instead of unwrap. - // Need to map `TungsteniteError` to `AnyError`. - ready!(stream.poll_ready_unpin(cx)).unwrap(); - if let Some(msg) = maybe_msg.take() { - stream.start_send_unpin(msg).unwrap(); - } - ready!(stream.poll_flush_unpin(cx)).unwrap(); - - Poll::Ready(Ok(json!({}))) - }) - .await + let resource = state + .borrow_mut() + .resource_table + .get::<WsStreamResource>(rid) + .ok_or_else(bad_resource_id)?; + let mut tx = RcRef::map(&resource, |r| &r.tx).borrow_mut().await; + tx.send(msg).await?; + eprintln!("sent!"); + Ok(json!({})) } #[derive(Deserialize)] @@ -243,33 +259,22 @@ pub async fn op_ws_close( ) -> Result<Value, AnyError> { let args: CloseArgs = serde_json::from_value(args)?; let rid = args.rid; - let mut maybe_msg = Some(Message::Close(args.code.map(|c| CloseFrame { + let msg = Message::Close(args.code.map(|c| CloseFrame { code: CloseCode::from(c), reason: match args.reason { Some(reason) => Cow::from(reason), None => Default::default(), }, - }))); - - poll_fn(move |cx| { - let mut state = state.borrow_mut(); - let stream = state - .resource_table - .get_mut::<WsStream>(rid) - .ok_or_else(bad_resource_id)?; - - // TODO(ry) Handle errors below instead of unwrap. - // Need to map `TungsteniteError` to `AnyError`. - ready!(stream.poll_ready_unpin(cx)).unwrap(); - if let Some(msg) = maybe_msg.take() { - stream.start_send_unpin(msg).unwrap(); - } - ready!(stream.poll_flush_unpin(cx)).unwrap(); - ready!(stream.poll_close_unpin(cx)).unwrap(); + })); - Poll::Ready(Ok(json!({}))) - }) - .await + let resource = state + .borrow_mut() + .resource_table + .get::<WsStreamResource>(rid) + .ok_or_else(bad_resource_id)?; + let mut tx = RcRef::map(&resource, |r| &r.tx).borrow_mut().await; + tx.send(msg).await?; + Ok(json!({})) } #[derive(Deserialize)] @@ -284,43 +289,41 @@ pub async fn op_ws_next_event( _bufs: BufVec, ) -> Result<Value, AnyError> { let args: NextEventArgs = serde_json::from_value(args)?; - poll_fn(move |cx| { - let mut state = state.borrow_mut(); - let stream = state - .resource_table - .get_mut::<WsStream>(args.rid) - .ok_or_else(bad_resource_id)?; - stream - .poll_next_unpin(cx) - .map(|val| { - match val { - Some(Ok(Message::Text(text))) => json!({ - "type": "string", - "data": text - }), - Some(Ok(Message::Binary(data))) => { - // TODO(ry): don't use json to send binary data. - json!({ - "type": "binary", - "data": data - }) - } - Some(Ok(Message::Close(Some(frame)))) => json!({ - "type": "close", - "code": u16::from(frame.code), - "reason": frame.reason.as_ref() - }), - Some(Ok(Message::Close(None))) => json!({ "type": "close" }), - Some(Ok(Message::Ping(_))) => json!({"type": "ping"}), - Some(Ok(Message::Pong(_))) => json!({"type": "pong"}), - Some(Err(_)) => json!({"type": "error"}), - None => { - state.resource_table.close(args.rid).unwrap(); - json!({"type": "closed"}) - } - } + + let resource = state + .borrow_mut() + .resource_table + .get::<WsStreamResource>(args.rid) + .ok_or_else(bad_resource_id)?; + + let mut rx = RcRef::map(&resource, |r| &r.rx).borrow_mut().await; + let cancel = RcRef::map(resource, |r| &r.cancel); + let val = rx.next().or_cancel(cancel).await?; + let res = match val { + Some(Ok(Message::Text(text))) => json!({ + "type": "string", + "data": text + }), + Some(Ok(Message::Binary(data))) => { + // TODO(ry): don't use json to send binary data. + json!({ + "type": "binary", + "data": data }) - .map(Ok) - }) - .await + } + Some(Ok(Message::Close(Some(frame)))) => json!({ + "type": "close", + "code": u16::from(frame.code), + "reason": frame.reason.as_ref() + }), + Some(Ok(Message::Close(None))) => json!({ "type": "close" }), + Some(Ok(Message::Ping(_))) => json!({"type": "ping"}), + Some(Ok(Message::Pong(_))) => json!({"type": "pong"}), + Some(Err(_)) => json!({"type": "error"}), + None => { + state.borrow_mut().resource_table.close(args.rid).unwrap(); + json!({"type": "closed"}) + } + }; + Ok(res) } |