diff options
Diffstat (limited to 'runtime/ops')
-rw-r--r-- | runtime/ops/fs.rs | 31 | ||||
-rw-r--r-- | runtime/ops/io.rs | 714 | ||||
-rw-r--r-- | runtime/ops/net.rs | 45 | ||||
-rw-r--r-- | runtime/ops/net_unix.rs | 4 | ||||
-rw-r--r-- | runtime/ops/process.rs | 13 | ||||
-rw-r--r-- | runtime/ops/tls.rs | 9 | ||||
-rw-r--r-- | runtime/ops/tty.rs | 48 |
7 files changed, 453 insertions, 411 deletions
diff --git a/runtime/ops/fs.rs b/runtime/ops/fs.rs index d1a2489ba..d1242b116 100644 --- a/runtime/ops/fs.rs +++ b/runtime/ops/fs.rs @@ -1,7 +1,6 @@ // Copyright 2018-2021 the Deno authors. All rights reserved. MIT license. // Some deserializer fields are only used on Unix and Windows build fails without it -use super::io::std_file_resource; -use super::io::StreamResource; +use super::io::StdFileResource; use crate::fs_util::canonicalize_path; use crate::permissions::Permissions; use deno_core::error::bad_resource_id; @@ -188,7 +187,7 @@ fn op_open_sync( let (path, open_options) = open_helper(state, args)?; let std_file = open_options.open(path)?; let tokio_file = tokio::fs::File::from_std(std_file); - let resource = StreamResource::fs_file(tokio_file); + let resource = StdFileResource::fs_file(tokio_file); let rid = state.resource_table.add(resource); Ok(json!(rid)) } @@ -202,7 +201,7 @@ async fn op_open_async( let tokio_file = tokio::fs::OpenOptions::from(open_options) .open(path) .await?; - let resource = StreamResource::fs_file(tokio_file); + let resource = StdFileResource::fs_file(tokio_file); let rid = state.borrow_mut().resource_table.add(resource); Ok(json!(rid)) } @@ -239,7 +238,7 @@ fn op_seek_sync( _zero_copy: &mut [ZeroCopyBuf], ) -> Result<Value, AnyError> { let (rid, seek_from) = seek_helper(args)?; - let pos = std_file_resource(state, rid, |r| match r { + let pos = StdFileResource::with(state, rid, |r| match r { Ok(std_file) => std_file.seek(seek_from).map_err(AnyError::from), Err(_) => Err(type_error( "cannot seek on this type of resource".to_string(), @@ -258,7 +257,7 @@ async fn op_seek_async( let resource = state .borrow_mut() .resource_table - .get::<StreamResource>(rid) + .get::<StdFileResource>(rid) .ok_or_else(bad_resource_id)?; if resource.fs_file.is_none() { @@ -286,7 +285,7 @@ fn op_fdatasync_sync( ) -> Result<Value, AnyError> { let args: FdatasyncArgs = serde_json::from_value(args)?; let rid = args.rid as u32; - std_file_resource(state, rid, |r| match r { + StdFileResource::with(state, rid, |r| match r { Ok(std_file) => std_file.sync_data().map_err(AnyError::from), Err(_) => Err(type_error("cannot sync this type of resource".to_string())), })?; @@ -304,7 +303,7 @@ async fn op_fdatasync_async( let resource = state .borrow_mut() .resource_table - .get::<StreamResource>(rid) + .get::<StdFileResource>(rid) .ok_or_else(bad_resource_id)?; if resource.fs_file.is_none() { @@ -332,7 +331,7 @@ fn op_fsync_sync( ) -> Result<Value, AnyError> { let args: FsyncArgs = serde_json::from_value(args)?; let rid = args.rid as u32; - std_file_resource(state, rid, |r| match r { + StdFileResource::with(state, rid, |r| match r { Ok(std_file) => std_file.sync_all().map_err(AnyError::from), Err(_) => Err(type_error("cannot sync this type of resource".to_string())), })?; @@ -350,7 +349,7 @@ async fn op_fsync_async( let resource = state .borrow_mut() .resource_table - .get::<StreamResource>(rid) + .get::<StdFileResource>(rid) .ok_or_else(bad_resource_id)?; if resource.fs_file.is_none() { @@ -379,7 +378,7 @@ fn op_fstat_sync( super::check_unstable(state, "Deno.fstat"); let args: FstatArgs = serde_json::from_value(args)?; let rid = args.rid as u32; - let metadata = std_file_resource(state, rid, |r| match r { + let metadata = StdFileResource::with(state, rid, |r| match r { Ok(std_file) => std_file.metadata().map_err(AnyError::from), Err(_) => Err(type_error("cannot stat this type of resource".to_string())), })?; @@ -399,7 +398,7 @@ async fn op_fstat_async( let resource = state .borrow_mut() .resource_table - .get::<StreamResource>(rid) + .get::<StdFileResource>(rid) .ok_or_else(bad_resource_id)?; if resource.fs_file.is_none() { @@ -1365,7 +1364,7 @@ fn op_ftruncate_sync( let args: FtruncateArgs = serde_json::from_value(args)?; let rid = args.rid as u32; let len = args.len as u64; - std_file_resource(state, rid, |r| match r { + StdFileResource::with(state, rid, |r| match r { Ok(std_file) => std_file.set_len(len).map_err(AnyError::from), Err(_) => Err(type_error("cannot truncate this type of resource")), })?; @@ -1385,7 +1384,7 @@ async fn op_ftruncate_async( let resource = state .borrow_mut() .resource_table - .get::<StreamResource>(rid) + .get::<StdFileResource>(rid) .ok_or_else(bad_resource_id)?; if resource.fs_file.is_none() { @@ -1648,7 +1647,7 @@ fn op_futime_sync( let atime = filetime::FileTime::from_unix_time(args.atime.0, args.atime.1); let mtime = filetime::FileTime::from_unix_time(args.mtime.0, args.mtime.1); - std_file_resource(state, rid, |r| match r { + StdFileResource::with(state, rid, |r| match r { Ok(std_file) => { filetime::set_file_handle_times(std_file, Some(atime), Some(mtime)) .map_err(AnyError::from) @@ -1675,7 +1674,7 @@ async fn op_futime_async( let resource = state .borrow_mut() .resource_table - .get::<StreamResource>(rid) + .get::<StdFileResource>(rid) .ok_or_else(bad_resource_id)?; if resource.fs_file.is_none() { diff --git a/runtime/ops/io.rs b/runtime/ops/io.rs index a8654cc29..2ac8e1b78 100644 --- a/runtime/ops/io.rs +++ b/runtime/ops/io.rs @@ -3,11 +3,14 @@ use super::dispatch_minimal::minimal_op; use super::dispatch_minimal::MinimalOp; use crate::metrics::metrics_op; -use deno_core::error::bad_resource_id; use deno_core::error::resource_unavailable; use deno_core::error::type_error; use deno_core::error::AnyError; +use deno_core::error::{bad_resource_id, not_supported}; use deno_core::futures::future::FutureExt; +use deno_core::serde_json; +use deno_core::serde_json::json; +use deno_core::serde_json::Value; use deno_core::AsyncMutFuture; use deno_core::AsyncRefCell; use deno_core::BufVec; @@ -17,20 +20,30 @@ use deno_core::JsRuntime; use deno_core::OpState; use deno_core::RcRef; use deno_core::Resource; +use deno_core::ZeroCopyBuf; +use serde::Deserialize; use std::borrow::Cow; use std::cell::RefCell; +use std::convert::TryInto; +use std::io::Read; +use std::io::Write; use std::rc::Rc; +use tokio::io::split; use tokio::io::AsyncRead; use tokio::io::AsyncReadExt; use tokio::io::AsyncWrite; use tokio::io::AsyncWriteExt; +use tokio::io::ReadHalf; +use tokio::io::WriteHalf; use tokio::net::tcp; use tokio::net::TcpStream; -use tokio_rustls::client::TlsStream as ClientTlsStream; -use tokio_rustls::server::TlsStream as ServerTlsStream; +use tokio::process; +use tokio_rustls as tls; #[cfg(unix)] use std::os::unix::io::FromRawFd; +#[cfg(unix)] +use tokio::net::unix; #[cfg(windows)] use std::os::windows::io::FromRawHandle; @@ -94,12 +107,13 @@ lazy_static! { pub fn init(rt: &mut JsRuntime) { rt.register_op("op_read", metrics_op(minimal_op(op_read))); rt.register_op("op_write", metrics_op(minimal_op(op_write))); + super::reg_json_async(rt, "op_shutdown", op_shutdown); } pub fn get_stdio() -> ( - Option<StreamResource>, - Option<StreamResource>, - Option<StreamResource>, + Option<StdFileResource>, + Option<StdFileResource>, + Option<StdFileResource>, ) { let stdin = get_stdio_stream(&STDIN_HANDLE, "stdin"); let stdout = get_stdio_stream(&STDOUT_HANDLE, "stdout"); @@ -111,13 +125,13 @@ pub fn get_stdio() -> ( fn get_stdio_stream( handle: &Option<std::fs::File>, name: &str, -) -> Option<StreamResource> { +) -> Option<StdFileResource> { match handle { None => None, Some(file_handle) => match file_handle.try_clone() { Ok(clone) => { let tokio_file = tokio::fs::File::from_std(clone); - Some(StreamResource::stdio(tokio_file, name)) + Some(StdFileResource::stdio(tokio_file, name)) } Err(_e) => None, }, @@ -143,6 +157,80 @@ pub struct FileMetadata { } #[derive(Debug)] +pub struct WriteOnlyResource<S> { + stream: AsyncRefCell<S>, +} + +impl<S: 'static> From<S> for WriteOnlyResource<S> { + fn from(stream: S) -> Self { + Self { + stream: stream.into(), + } + } +} + +impl<S> WriteOnlyResource<S> +where + S: AsyncWrite + Unpin + 'static, +{ + pub fn borrow_mut(self: &Rc<Self>) -> AsyncMutFuture<S> { + RcRef::map(self, |r| &r.stream).borrow_mut() + } + + async fn write(self: &Rc<Self>, buf: &[u8]) -> Result<usize, AnyError> { + let mut stream = self.borrow_mut().await; + let nwritten = stream.write(buf).await?; + Ok(nwritten) + } + + async fn shutdown(self: &Rc<Self>) -> Result<(), AnyError> { + let mut stream = self.borrow_mut().await; + stream.shutdown().await?; + Ok(()) + } +} + +#[derive(Debug)] +pub struct ReadOnlyResource<S> { + stream: AsyncRefCell<S>, + cancel_handle: CancelHandle, +} + +impl<S: 'static> From<S> for ReadOnlyResource<S> { + fn from(stream: S) -> Self { + Self { + stream: stream.into(), + cancel_handle: Default::default(), + } + } +} + +impl<S> ReadOnlyResource<S> +where + S: AsyncRead + Unpin + 'static, +{ + pub fn borrow_mut(self: &Rc<Self>) -> AsyncMutFuture<S> { + RcRef::map(self, |r| &r.stream).borrow_mut() + } + + pub fn cancel_handle(self: &Rc<Self>) -> RcRef<CancelHandle> { + RcRef::map(self, |r| &r.cancel_handle) + } + + pub fn cancel_read_ops(&self) { + self.cancel_handle.cancel() + } + + async fn read(self: &Rc<Self>, buf: &mut [u8]) -> Result<usize, AnyError> { + let mut rd = self.borrow_mut().await; + let nread = rd.read(buf).try_or_cancel(self.cancel_handle()).await?; + Ok(nread) + } +} + +/// A full duplex resource has a read and write ends that are completely +/// independent, like TCP/Unix sockets and TLS streams. +#[derive(Debug)] pub struct FullDuplexResource<R, W> { rd: AsyncRefCell<R>, wr: AsyncRefCell<W>, @@ -152,7 +240,11 @@ pub struct FullDuplexResource<R, W> { cancel_handle: CancelHandle, } -impl<R: 'static, W: 'static> FullDuplexResource<R, W> { +impl<R, W> FullDuplexResource<R, W> +where + R: AsyncRead + Unpin + 'static, + W: AsyncWrite + Unpin + 'static, +{ pub fn new((rd, wr): (R, W)) -> Self { Self { rd: rd.into(), @@ -180,13 +272,7 @@ impl<R: 'static, W: 'static> FullDuplexResource<R, W> { pub fn cancel_read_ops(&self) { self.cancel_handle.cancel() } -} -impl<R, W> FullDuplexResource<R, W> -where - R: AsyncRead + Unpin + 'static, - W: AsyncWrite + Unpin + 'static, -{ async fn read(self: &Rc<Self>, buf: &mut [u8]) -> Result<usize, AnyError> { let mut rd = self.rd_borrow_mut().await; let nread = rd.read(buf).try_or_cancel(self.cancel_handle()).await?; @@ -198,6 +284,56 @@ where let nwritten = wr.write(buf).await?; Ok(nwritten) } + + async fn shutdown(self: &Rc<Self>) -> Result<(), AnyError> { + let mut wr = self.wr_borrow_mut().await; + wr.shutdown().await?; + Ok(()) + } +} + +pub type FullDuplexSplitResource<S> = + FullDuplexResource<ReadHalf<S>, WriteHalf<S>>; + +impl<S> From<S> for FullDuplexSplitResource<S> +where + S: AsyncRead + AsyncWrite + 'static, +{ + fn from(stream: S) -> Self { + Self::new(split(stream)) + } +} + +pub type ChildStdinResource = WriteOnlyResource<process::ChildStdin>; + +impl Resource for ChildStdinResource { + fn name(&self) -> Cow<str> { + "childStdin".into() + } +} + +pub type ChildStdoutResource = ReadOnlyResource<process::ChildStdout>; + +impl Resource for ChildStdoutResource { + fn name(&self) -> Cow<str> { + "childStdout".into() + } + + fn close(self: Rc<Self>) { + self.cancel_read_ops(); + } +} + +pub type ChildStderrResource = ReadOnlyResource<process::ChildStderr>; + +impl Resource for ChildStderrResource { + fn name(&self) -> Cow<str> { + "childStderr".into() + } + + fn close(self: Rc<Self>) { + self.cancel_read_ops(); + } } pub type TcpStreamResource = @@ -213,35 +349,74 @@ impl Resource for TcpStreamResource { } } -#[derive(Default)] -pub struct StreamResource { - pub fs_file: - Option<AsyncRefCell<(Option<tokio::fs::File>, Option<FileMetadata>)>>, +pub type TlsClientStreamResource = + FullDuplexSplitResource<tls::client::TlsStream<TcpStream>>; - #[cfg(unix)] - pub unix_stream: Option<AsyncRefCell<tokio::net::UnixStream>>, +impl Resource for TlsClientStreamResource { + fn name(&self) -> Cow<str> { + "tlsClientStream".into() + } - child_stdin: Option<AsyncRefCell<tokio::process::ChildStdin>>, + fn close(self: Rc<Self>) { + self.cancel_read_ops(); + } +} - child_stdout: Option<AsyncRefCell<tokio::process::ChildStdout>>, +pub type TlsServerStreamResource = + FullDuplexSplitResource<tls::server::TlsStream<TcpStream>>; - child_stderr: Option<AsyncRefCell<tokio::process::ChildStderr>>, +impl Resource for TlsServerStreamResource { + fn name(&self) -> Cow<str> { + "tlsServerStream".into() + } - client_tls_stream: Option<AsyncRefCell<ClientTlsStream<TcpStream>>>, + fn close(self: Rc<Self>) { + self.cancel_read_ops(); + } +} - server_tls_stream: Option<AsyncRefCell<ServerTlsStream<TcpStream>>>, +#[cfg(unix)] +pub type UnixStreamResource = + FullDuplexResource<unix::OwnedReadHalf, unix::OwnedWriteHalf>; - cancel: CancelHandle, - name: String, +#[cfg(not(unix))] +struct UnixStreamResource; + +#[cfg(not(unix))] +impl UnixStreamResource { + async fn read(self: &Rc<Self>, _buf: &mut [u8]) -> Result<usize, AnyError> { + unreachable!() + } + async fn write(self: &Rc<Self>, _buf: &[u8]) -> Result<usize, AnyError> { + unreachable!() + } + async fn shutdown(self: &Rc<Self>) -> Result<(), AnyError> { + unreachable!() + } + fn cancel_read_ops(&self) { + unreachable!() + } } -impl std::fmt::Debug for StreamResource { - fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { - write!(f, "StreamResource") +impl Resource for UnixStreamResource { + fn name(&self) -> Cow<str> { + "unixStream".into() + } + + fn close(self: Rc<Self>) { + self.cancel_read_ops(); } } -impl StreamResource { +#[derive(Debug, Default)] +pub struct StdFileResource { + pub fs_file: + Option<AsyncRefCell<(Option<tokio::fs::File>, Option<FileMetadata>)>>, + cancel: CancelHandle, + name: String, +} + +impl StdFileResource { pub fn stdio(fs_file: tokio::fs::File, name: &str) -> Self { Self { fs_file: Some(AsyncRefCell::new(( @@ -264,194 +439,89 @@ impl StreamResource { } } - #[cfg(unix)] - pub fn unix_stream(unix_stream: tokio::net::UnixStream) -> Self { - Self { - unix_stream: Some(AsyncRefCell::new(unix_stream)), - name: "unixStream".to_string(), - ..Default::default() - } - } - - pub fn child_stdout(child: tokio::process::ChildStdout) -> Self { - Self { - child_stdout: Some(AsyncRefCell::new(child)), - name: "childStdout".to_string(), - ..Default::default() - } - } - - pub fn child_stderr(child: tokio::process::ChildStderr) -> Self { - Self { - child_stderr: Some(AsyncRefCell::new(child)), - name: "childStderr".to_string(), - ..Default::default() - } - } - - pub fn child_stdin(child: tokio::process::ChildStdin) -> Self { - Self { - child_stdin: Some(AsyncRefCell::new(child)), - name: "childStdin".to_string(), - ..Default::default() - } - } - - pub fn client_tls_stream(stream: ClientTlsStream<TcpStream>) -> Self { - Self { - client_tls_stream: Some(AsyncRefCell::new(stream)), - name: "clientTlsStream".to_string(), - ..Default::default() - } - } - - pub fn server_tls_stream(stream: ServerTlsStream<TcpStream>) -> Self { - Self { - server_tls_stream: Some(AsyncRefCell::new(stream)), - name: "serverTlsStream".to_string(), - ..Default::default() - } - } - - async fn read(self: Rc<Self>, buf: &mut [u8]) -> Result<usize, AnyError> { - // TODO(bartlomieju): in the future, it would be better for `StreamResource` - // to be an enum instead a struct with many `Option` fields, however I - // wasn't able to get it to work with `AsyncRefCell`s. + async fn read(self: &Rc<Self>, buf: &mut [u8]) -> Result<usize, AnyError> { if self.fs_file.is_some() { - debug_assert!(self.child_stdin.is_none()); - debug_assert!(self.child_stdout.is_none()); - debug_assert!(self.child_stderr.is_none()); - debug_assert!(self.server_tls_stream.is_none()); - debug_assert!(self.client_tls_stream.is_none()); - let mut fs_file = RcRef::map(&self, |r| r.fs_file.as_ref().unwrap()) + let mut fs_file = RcRef::map(&*self, |r| r.fs_file.as_ref().unwrap()) .borrow_mut() .await; - let nwritten = (*fs_file).0.as_mut().unwrap().read(buf).await?; + let nwritten = fs_file.0.as_mut().unwrap().read(buf).await?; return Ok(nwritten); - } else if self.child_stdout.is_some() { - debug_assert!(self.child_stdin.is_none()); - debug_assert!(self.child_stderr.is_none()); - debug_assert!(self.server_tls_stream.is_none()); - debug_assert!(self.client_tls_stream.is_none()); - let mut child_stdout = - RcRef::map(&self, |r| r.child_stdout.as_ref().unwrap()) - .borrow_mut() - .await; - let cancel = RcRef::map(self, |r| &r.cancel); - let nread = child_stdout.read(buf).try_or_cancel(cancel).await?; - return Ok(nread); - } else if self.child_stderr.is_some() { - debug_assert!(self.child_stdin.is_none()); - debug_assert!(self.server_tls_stream.is_none()); - debug_assert!(self.client_tls_stream.is_none()); - let mut child_stderr = - RcRef::map(&self, |r| r.child_stderr.as_ref().unwrap()) - .borrow_mut() - .await; - let cancel = RcRef::map(self, |r| &r.cancel); - let nread = child_stderr.read(buf).try_or_cancel(cancel).await?; - return Ok(nread); - } else if self.client_tls_stream.is_some() { - debug_assert!(self.server_tls_stream.is_none()); - let mut client_tls_stream = - RcRef::map(&self, |r| r.client_tls_stream.as_ref().unwrap()) - .borrow_mut() - .await; - let cancel = RcRef::map(self, |r| &r.cancel); - let nread = client_tls_stream.read(buf).try_or_cancel(cancel).await?; - return Ok(nread); - } else if self.server_tls_stream.is_some() { - let mut server_tls_stream = - RcRef::map(&self, |r| r.server_tls_stream.as_ref().unwrap()) - .borrow_mut() - .await; - let cancel = RcRef::map(self, |r| &r.cancel); - let nread = server_tls_stream.read(buf).try_or_cancel(cancel).await?; - return Ok(nread); - } - - #[cfg(unix)] - if self.unix_stream.is_some() { - let mut unix_stream = - RcRef::map(&self, |r| r.unix_stream.as_ref().unwrap()) - .borrow_mut() - .await; - let cancel = RcRef::map(self, |r| &r.cancel); - let nread = unix_stream.read(buf).try_or_cancel(cancel).await?; - return Ok(nread); + } else { + Err(resource_unavailable()) } - - Err(bad_resource_id()) } - async fn write(self: Rc<Self>, buf: &[u8]) -> Result<usize, AnyError> { - // TODO(bartlomieju): in the future, it would be better for `StreamResource` - // to be an enum instead a struct with many `Option` fields, however I - // wasn't able to get it to work with `AsyncRefCell`s. + async fn write(self: &Rc<Self>, buf: &[u8]) -> Result<usize, AnyError> { if self.fs_file.is_some() { - debug_assert!(self.child_stdin.is_none()); - debug_assert!(self.child_stdout.is_none()); - debug_assert!(self.child_stderr.is_none()); - debug_assert!(self.server_tls_stream.is_none()); - debug_assert!(self.client_tls_stream.is_none()); - let mut fs_file = RcRef::map(&self, |r| r.fs_file.as_ref().unwrap()) + let mut fs_file = RcRef::map(&*self, |r| r.fs_file.as_ref().unwrap()) .borrow_mut() .await; - let nwritten = (*fs_file).0.as_mut().unwrap().write(buf).await?; - (*fs_file).0.as_mut().unwrap().flush().await?; - return Ok(nwritten); - } else if self.child_stdin.is_some() { - debug_assert!(self.child_stdout.is_none()); - debug_assert!(self.child_stderr.is_none()); - debug_assert!(self.server_tls_stream.is_none()); - debug_assert!(self.client_tls_stream.is_none()); - let mut child_stdin = - RcRef::map(&self, |r| r.child_stdin.as_ref().unwrap()) - .borrow_mut() - .await; - let nwritten = child_stdin.write(buf).await?; - child_stdin.flush().await?; - return Ok(nwritten); - } else if self.client_tls_stream.is_some() { - debug_assert!(self.server_tls_stream.is_none()); - let mut client_tls_stream = - RcRef::map(&self, |r| r.client_tls_stream.as_ref().unwrap()) - .borrow_mut() - .await; - let nwritten = client_tls_stream.write(buf).await?; - client_tls_stream.flush().await?; - return Ok(nwritten); - } else if self.server_tls_stream.is_some() { - let mut server_tls_stream = - RcRef::map(&self, |r| r.server_tls_stream.as_ref().unwrap()) - .borrow_mut() - .await; - let nwritten = server_tls_stream.write(buf).await?; - server_tls_stream.flush().await?; + let nwritten = fs_file.0.as_mut().unwrap().write(buf).await?; + fs_file.0.as_mut().unwrap().flush().await?; return Ok(nwritten); + } else { + Err(resource_unavailable()) } + } - #[cfg(unix)] - if self.unix_stream.is_some() { - let mut unix_stream = - RcRef::map(&self, |r| r.unix_stream.as_ref().unwrap()) - .borrow_mut() - .await; - let nwritten = unix_stream.write(buf).await?; - unix_stream.flush().await?; - return Ok(nwritten); + pub fn with<F, R>( + state: &mut OpState, + rid: u32, + mut f: F, + ) -> Result<R, AnyError> + where + F: FnMut(Result<&mut std::fs::File, ()>) -> Result<R, AnyError>, + { + // First we look up the rid in the resource table. + let resource = state + .resource_table + .get::<StdFileResource>(rid) + .ok_or_else(bad_resource_id)?; + + // Sync write only works for FsFile. It doesn't make sense to do this + // for non-blocking sockets. So we error out if not FsFile. + if resource.fs_file.is_none() { + return f(Err(())); } - Err(bad_resource_id()) + // The object in the resource table is a tokio::fs::File - but in + // order to do a blocking write on it, we must turn it into a + // std::fs::File. Hopefully this code compiles down to nothing. + let fs_file_resource = + RcRef::map(&resource, |r| r.fs_file.as_ref().unwrap()).try_borrow_mut(); + + if let Some(mut fs_file) = fs_file_resource { + let tokio_file = fs_file.0.take().unwrap(); + match tokio_file.try_into_std() { + Ok(mut std_file) => { + let result = f(Ok(&mut std_file)); + // Turn the std_file handle back into a tokio file, put it back + // in the resource table. + let tokio_file = tokio::fs::File::from_std(std_file); + fs_file.0 = Some(tokio_file); + // return the result. + result + } + Err(tokio_file) => { + // This function will return an error containing the file if + // some operation is in-flight. + fs_file.0 = Some(tokio_file); + Err(resource_unavailable()) + } + } + } else { + Err(resource_unavailable()) + } } } -impl Resource for StreamResource { +impl Resource for StdFileResource { fn name(&self) -> Cow<str> { - self.name.clone().into() + self.name.as_str().into() } fn close(self: Rc<Self>) { + // TODO: do not cancel file I/O when file is writable. self.cancel.cancel() } } @@ -460,153 +530,163 @@ pub fn op_read( state: Rc<RefCell<OpState>>, is_sync: bool, rid: i32, - mut zero_copy: BufVec, + bufs: BufVec, ) -> MinimalOp { - debug!("read rid={}", rid); - match zero_copy.len() { + match bufs.len() { 0 => return MinimalOp::Sync(Err(no_buffer_specified())), 1 => {} _ => panic!("Invalid number of arguments"), - } + }; + let buf = bufs.into_iter().next().unwrap(); if is_sync { - MinimalOp::Sync({ - // First we look up the rid in the resource table. - std_file_resource(&mut state.borrow_mut(), rid as u32, move |r| match r { - Ok(std_file) => { - use std::io::Read; - std_file - .read(&mut zero_copy[0]) - .map(|n: usize| n as i32) - .map_err(AnyError::from) - } - Err(_) => Err(type_error("sync read not allowed on this resource")), - }) - }) + MinimalOp::Sync(op_read_sync(state, rid, buf)) } else { - let mut zero_copy = zero_copy[0].clone(); - MinimalOp::Async({ - async move { - let resource = state - .borrow() - .resource_table - .get_any(rid as u32) - .ok_or_else(bad_resource_id)?; - let nread = if let Some(stream) = - resource.downcast_rc::<TcpStreamResource>() - { - stream.read(&mut zero_copy).await? - } else if let Some(stream) = resource.downcast_rc::<StreamResource>() { - stream.clone().read(&mut zero_copy).await? - } else { - return Err(bad_resource_id()); - }; - Ok(nread as i32) - } - .boxed_local() - }) + MinimalOp::Async(op_read_async(state, rid, buf).boxed_local()) } } +fn op_read_sync( + state: Rc<RefCell<OpState>>, + rid: i32, + mut buf: ZeroCopyBuf, +) -> Result<i32, AnyError> { + let rid = rid.try_into().map_err(|_| bad_resource_id())?; + StdFileResource::with(&mut state.borrow_mut(), rid, move |r| match r { + Ok(std_file) => std_file + .read(&mut buf) + .map(|n: usize| n as i32) + .map_err(AnyError::from), + Err(_) => Err(not_supported()), + }) +} + +async fn op_read_async( + state: Rc<RefCell<OpState>>, + rid: i32, + mut buf: ZeroCopyBuf, +) -> Result<i32, AnyError> { + let rid = rid.try_into().map_err(|_| bad_resource_id())?; + let resource = state + .borrow() + .resource_table + .get_any(rid) + .ok_or_else(bad_resource_id)?; + let nread = if let Some(s) = resource.downcast_rc::<ChildStdoutResource>() { + s.read(&mut buf).await? + } else if let Some(s) = resource.downcast_rc::<ChildStderrResource>() { + s.read(&mut buf).await? + } else if let Some(s) = resource.downcast_rc::<TcpStreamResource>() { + s.read(&mut buf).await? + } else if let Some(s) = resource.downcast_rc::<TlsClientStreamResource>() { + s.read(&mut buf).await? + } else if let Some(s) = resource.downcast_rc::<TlsServerStreamResource>() { + s.read(&mut buf).await? + } else if let Some(s) = resource.downcast_rc::<UnixStreamResource>() { + s.read(&mut buf).await? + } else if let Some(s) = resource.downcast_rc::<StdFileResource>() { + s.read(&mut buf).await? + } else { + return Err(not_supported()); + }; + Ok(nread as i32) +} + pub fn op_write( state: Rc<RefCell<OpState>>, is_sync: bool, rid: i32, - zero_copy: BufVec, + bufs: BufVec, ) -> MinimalOp { - debug!("write rid={}", rid); - match zero_copy.len() { + match bufs.len() { 0 => return MinimalOp::Sync(Err(no_buffer_specified())), 1 => {} _ => panic!("Invalid number of arguments"), - } + }; + let buf = bufs.into_iter().next().unwrap(); if is_sync { - MinimalOp::Sync({ - // First we look up the rid in the resource table. - std_file_resource(&mut state.borrow_mut(), rid as u32, move |r| match r { - Ok(std_file) => { - use std::io::Write; - std_file - .write(&zero_copy[0]) - .map(|nwritten: usize| nwritten as i32) - .map_err(AnyError::from) - } - Err(_) => Err(type_error("sync read not allowed on this resource")), - }) - }) + MinimalOp::Sync(op_write_sync(state, rid, buf)) } else { - let zero_copy = zero_copy[0].clone(); - MinimalOp::Async({ - async move { - let resource = state - .borrow() - .resource_table - .get_any(rid as u32) - .ok_or_else(bad_resource_id)?; - let nwritten = if let Some(stream) = - resource.downcast_rc::<TcpStreamResource>() - { - stream.write(&zero_copy).await? - } else if let Some(stream) = resource.downcast_rc::<StreamResource>() { - stream.clone().write(&zero_copy).await? - } else { - return Err(bad_resource_id()); - }; - Ok(nwritten as i32) - } - .boxed_local() - }) + MinimalOp::Async(op_write_async(state, rid, buf).boxed_local()) } } -pub fn std_file_resource<F, T>( - state: &mut OpState, - rid: u32, - mut f: F, -) -> Result<T, AnyError> -where - F: FnMut(Result<&mut std::fs::File, ()>) -> Result<T, AnyError>, -{ - // First we look up the rid in the resource table. +fn op_write_sync( + state: Rc<RefCell<OpState>>, + rid: i32, + buf: ZeroCopyBuf, +) -> Result<i32, AnyError> { + let rid = rid.try_into().map_err(|_| bad_resource_id())?; + StdFileResource::with(&mut state.borrow_mut(), rid, move |r| match r { + Ok(std_file) => std_file + .write(&buf) + .map(|nwritten: usize| nwritten as i32) + .map_err(AnyError::from), + Err(_) => Err(not_supported()), + }) +} + +async fn op_write_async( + state: Rc<RefCell<OpState>>, + rid: i32, + buf: ZeroCopyBuf, +) -> Result<i32, AnyError> { + let rid = rid.try_into().map_err(|_| bad_resource_id())?; let resource = state + .borrow() .resource_table - .get::<StreamResource>(rid) + .get_any(rid) .ok_or_else(bad_resource_id)?; + let nwritten = if let Some(s) = resource.downcast_rc::<ChildStdinResource>() { + s.write(&buf).await? + } else if let Some(s) = resource.downcast_rc::<TcpStreamResource>() { + s.write(&buf).await? + } else if let Some(s) = resource.downcast_rc::<TlsClientStreamResource>() { + s.write(&buf).await? + } else if let Some(s) = resource.downcast_rc::<TlsServerStreamResource>() { + s.write(&buf).await? + } else if let Some(s) = resource.downcast_rc::<UnixStreamResource>() { + s.write(&buf).await? + } else if let Some(s) = resource.downcast_rc::<StdFileResource>() { + s.write(&buf).await? + } else { + return Err(not_supported()); + }; + Ok(nwritten as i32) +} - // Sync write only works for FsFile. It doesn't make sense to do this - // for non-blocking sockets. So we error out if not FsFile. - if resource.fs_file.is_none() { - return f(Err(())); - } - - // The object in the resource table is a tokio::fs::File - but in - // order to do a blocking write on it, we must turn it into a - // std::fs::File. Hopefully this code compiles down to nothing. - - let fs_file_resource = - RcRef::map(&resource, |r| r.fs_file.as_ref().unwrap()).try_borrow_mut(); - - if let Some(mut fs_file) = fs_file_resource { - let tokio_file = fs_file.0.take().unwrap(); - match tokio_file.try_into_std() { - Ok(mut std_file) => { - let result = f(Ok(&mut std_file)); - // Turn the std_file handle back into a tokio file, put it back - // in the resource table. - let tokio_file = tokio::fs::File::from_std(std_file); - fs_file.0 = Some(tokio_file); - // return the result. - result - } - Err(tokio_file) => { - // This function will return an error containing the file if - // some operation is in-flight. - fs_file.0 = Some(tokio_file); - Err(resource_unavailable()) - } - } +#[derive(Deserialize)] +struct ShutdownArgs { + rid: i32, +} + +async fn op_shutdown( + state: Rc<RefCell<OpState>>, + args: Value, + _zero_copy: BufVec, +) -> Result<Value, AnyError> { + let rid = serde_json::from_value::<ShutdownArgs>(args)? + .rid + .try_into() + .map_err(|_| bad_resource_id())?; + let resource = state + .borrow() + .resource_table + .get_any(rid) + .ok_or_else(bad_resource_id)?; + if let Some(s) = resource.downcast_rc::<ChildStdinResource>() { + s.shutdown().await?; + } else if let Some(s) = resource.downcast_rc::<TcpStreamResource>() { + s.shutdown().await?; + } else if let Some(s) = resource.downcast_rc::<TlsClientStreamResource>() { + s.shutdown().await?; + } else if let Some(s) = resource.downcast_rc::<TlsServerStreamResource>() { + s.shutdown().await?; + } else if let Some(s) = resource.downcast_rc::<UnixStreamResource>() { + s.shutdown().await?; } else { - Err(resource_unavailable()) + return Err(not_supported()); } + Ok(json!({})) } diff --git a/runtime/ops/net.rs b/runtime/ops/net.rs index e3864b38a..7e80bb86b 100644 --- a/runtime/ops/net.rs +++ b/runtime/ops/net.rs @@ -5,7 +5,6 @@ use crate::permissions::Permissions; use crate::resolve_addr::resolve_addr; use crate::resolve_addr::resolve_addr_sync; use deno_core::error::bad_resource; -use deno_core::error::bad_resource_id; use deno_core::error::custom_error; use deno_core::error::generic_error; use deno_core::error::type_error; @@ -27,7 +26,6 @@ use std::borrow::Cow; use std::cell::RefCell; use std::net::SocketAddr; use std::rc::Rc; -use tokio::io::AsyncWriteExt; use tokio::net::TcpListener; use tokio::net::TcpStream; use tokio::net::UdpSocket; @@ -42,14 +40,13 @@ use trust_dns_resolver::AsyncResolver; #[cfg(unix)] use super::net_unix; #[cfg(unix)] -use crate::ops::io::StreamResource; +use crate::ops::io::UnixStreamResource; #[cfg(unix)] use std::path::Path; pub fn init(rt: &mut deno_core::JsRuntime) { super::reg_json_async(rt, "op_accept", op_accept); super::reg_json_async(rt, "op_connect", op_connect); - super::reg_json_async(rt, "op_shutdown", op_shutdown); super::reg_json_sync(rt, "op_listen", op_listen); super::reg_json_async(rt, "op_datagram_receive", op_datagram_receive); super::reg_json_async(rt, "op_datagram_send", op_datagram_send); @@ -318,7 +315,7 @@ async fn op_connect( let remote_addr = unix_stream.peer_addr()?; let mut state_ = state.borrow_mut(); - let resource = StreamResource::unix_stream(unix_stream); + let resource = UnixStreamResource::new(unix_stream.into_split()); let rid = state_.resource_table.add(resource); Ok(json!({ "rid": rid, @@ -336,44 +333,6 @@ async fn op_connect( } } -#[derive(Deserialize)] -struct ShutdownArgs { - rid: i32, -} - -async fn op_shutdown( - state: Rc<RefCell<OpState>>, - args: Value, - _zero_copy: BufVec, -) -> Result<Value, AnyError> { - let args: ShutdownArgs = serde_json::from_value(args)?; - let rid = args.rid as u32; - - let resource = state - .borrow() - .resource_table - .get_any(rid) - .ok_or_else(bad_resource_id)?; - if let Some(stream) = resource.downcast_rc::<TcpStreamResource>() { - let mut wr = stream.wr_borrow_mut().await; - wr.shutdown().await?; - return Ok(json!({})); - } - - #[cfg(unix)] - if let Some(stream) = resource.downcast_rc::<StreamResource>() { - if stream.unix_stream.is_some() { - let mut wr = RcRef::map(stream, |r| r.unix_stream.as_ref().unwrap()) - .borrow_mut() - .await; - wr.shutdown().await?; - return Ok(json!({})); - } - } - - Err(bad_resource_id()) -} - struct TcpListenerResource { listener: AsyncRefCell<TcpListener>, cancel: CancelHandle, diff --git a/runtime/ops/net_unix.rs b/runtime/ops/net_unix.rs index 1177d071c..c3e561568 100644 --- a/runtime/ops/net_unix.rs +++ b/runtime/ops/net_unix.rs @@ -1,6 +1,6 @@ // Copyright 2018-2021 the Deno authors. All rights reserved. MIT license. -use crate::ops::io::StreamResource; +use crate::ops::io::UnixStreamResource; use crate::ops::net::AcceptArgs; use crate::ops::net::ReceiveArgs; use deno_core::error::bad_resource; @@ -81,7 +81,7 @@ pub(crate) async fn accept_unix( let local_addr = unix_stream.local_addr()?; let remote_addr = unix_stream.peer_addr()?; - let resource = StreamResource::unix_stream(unix_stream); + let resource = UnixStreamResource::new(unix_stream.into_split()); let mut state = state.borrow_mut(); let rid = state.resource_table.add(resource); Ok(json!({ diff --git a/runtime/ops/process.rs b/runtime/ops/process.rs index 412d21ef2..89f323232 100644 --- a/runtime/ops/process.rs +++ b/runtime/ops/process.rs @@ -1,6 +1,9 @@ // Copyright 2018-2021 the Deno authors. All rights reserved. MIT license. -use super::io::{std_file_resource, StreamResource}; +use super::io::ChildStderrResource; +use super::io::ChildStdinResource; +use super::io::ChildStdoutResource; +use super::io::StdFileResource; use crate::permissions::Permissions; use deno_core::error::bad_resource_id; use deno_core::error::type_error; @@ -34,7 +37,7 @@ fn clone_file( state: &mut OpState, rid: u32, ) -> Result<std::fs::File, AnyError> { - std_file_resource(state, rid, move |r| match r { + StdFileResource::with(state, rid, move |r| match r { Ok(std_file) => std_file.try_clone().map_err(AnyError::from), Err(_) => Err(bad_resource_id()), }) @@ -134,7 +137,7 @@ fn op_run( Some(child_stdin) => { let rid = state .resource_table - .add(StreamResource::child_stdin(child_stdin)); + .add(ChildStdinResource::from(child_stdin)); Some(rid) } None => None, @@ -144,7 +147,7 @@ fn op_run( Some(child_stdout) => { let rid = state .resource_table - .add(StreamResource::child_stdout(child_stdout)); + .add(ChildStdoutResource::from(child_stdout)); Some(rid) } None => None, @@ -154,7 +157,7 @@ fn op_run( Some(child_stderr) => { let rid = state .resource_table - .add(StreamResource::child_stderr(child_stderr)); + .add(ChildStderrResource::from(child_stderr)); Some(rid) } None => None, diff --git a/runtime/ops/tls.rs b/runtime/ops/tls.rs index 2e3e34da2..b81bd5e23 100644 --- a/runtime/ops/tls.rs +++ b/runtime/ops/tls.rs @@ -1,7 +1,8 @@ // Copyright 2018-2021 the Deno authors. All rights reserved. MIT license. -use super::io::StreamResource; use super::io::TcpStreamResource; +use super::io::TlsClientStreamResource; +use super::io::TlsServerStreamResource; use crate::permissions::Permissions; use crate::resolve_addr::resolve_addr; use crate::resolve_addr::resolve_addr_sync; @@ -148,7 +149,7 @@ async fn op_start_tls( let mut state_ = state.borrow_mut(); state_ .resource_table - .add(StreamResource::client_tls_stream(tls_stream)) + .add(TlsClientStreamResource::from(tls_stream)) }; Ok(json!({ "rid": rid, @@ -210,7 +211,7 @@ async fn op_connect_tls( let mut state_ = state.borrow_mut(); state_ .resource_table - .add(StreamResource::client_tls_stream(tls_stream)) + .add(TlsClientStreamResource::from(tls_stream)) }; Ok(json!({ "rid": rid, @@ -402,7 +403,7 @@ async fn op_accept_tls( let mut state_ = state.borrow_mut(); state_ .resource_table - .add(StreamResource::server_tls_stream(tls_stream)) + .add(TlsServerStreamResource::from(tls_stream)) }; Ok(json!({ diff --git a/runtime/ops/tty.rs b/runtime/ops/tty.rs index dfde8e0d3..a8ff9938b 100644 --- a/runtime/ops/tty.rs +++ b/runtime/ops/tty.rs @@ -1,7 +1,6 @@ // Copyright 2018-2021 the Deno authors. All rights reserved. MIT license. -use super::io::std_file_resource; -use super::io::StreamResource; +use super::io::StdFileResource; use deno_core::error::bad_resource_id; use deno_core::error::not_supported; use deno_core::error::resource_unavailable; @@ -90,7 +89,7 @@ fn op_set_raw( let resource = state .resource_table - .get::<StreamResource>(rid) + .get::<StdFileResource>(rid) .ok_or_else(bad_resource_id)?; if cbreak { @@ -157,7 +156,7 @@ fn op_set_raw( let resource = state .resource_table - .get::<StreamResource>(rid) + .get::<StdFileResource>(rid) .ok_or_else(bad_resource_id)?; if resource.fs_file.is_none() { @@ -229,26 +228,27 @@ fn op_isatty( let args: IsattyArgs = serde_json::from_value(args)?; let rid = args.rid; - let isatty: bool = std_file_resource(state, rid as u32, move |r| match r { - Ok(std_file) => { - #[cfg(windows)] - { - use winapi::um::consoleapi; - - let handle = get_windows_handle(&std_file)?; - let mut test_mode: DWORD = 0; - // If I cannot get mode out of console, it is not a console. - Ok(unsafe { consoleapi::GetConsoleMode(handle, &mut test_mode) != 0 }) - } - #[cfg(unix)] - { - use std::os::unix::io::AsRawFd; - let raw_fd = std_file.as_raw_fd(); - Ok(unsafe { libc::isatty(raw_fd as libc::c_int) == 1 }) + let isatty: bool = + StdFileResource::with(state, rid as u32, move |r| match r { + Ok(std_file) => { + #[cfg(windows)] + { + use winapi::um::consoleapi; + + let handle = get_windows_handle(&std_file)?; + let mut test_mode: DWORD = 0; + // If I cannot get mode out of console, it is not a console. + Ok(unsafe { consoleapi::GetConsoleMode(handle, &mut test_mode) != 0 }) + } + #[cfg(unix)] + { + use std::os::unix::io::AsRawFd; + let raw_fd = std_file.as_raw_fd(); + Ok(unsafe { libc::isatty(raw_fd as libc::c_int) == 1 }) + } } - } - _ => Ok(false), - })?; + _ => Ok(false), + })?; Ok(json!(isatty)) } @@ -273,7 +273,7 @@ fn op_console_size( let args: ConsoleSizeArgs = serde_json::from_value(args)?; let rid = args.rid; - let size = std_file_resource(state, rid as u32, move |r| match r { + let size = StdFileResource::with(state, rid as u32, move |r| match r { Ok(std_file) => { #[cfg(windows)] { |