From 7afa3aceb04e6b2c8820b7326d6f648db6b571c6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bartek=20Iwa=C5=84czuk?= Date: Sat, 4 Mar 2023 20:39:48 -0400 Subject: refactor(runtime): factor out deno_io extension crate (#18001) This is a prerequisite to factor out FS ops to a separate crate. --- Cargo.lock | 12 + Cargo.toml | 2 + cli/build.rs | 1 + cli/lsp/testing/execution.rs | 4 +- cli/tools/test.rs | 4 +- cli/util/console.rs | 2 +- cli/worker.rs | 6 +- ext/io/12_io.js | 238 ++++++++++++++ ext/io/Cargo.toml | 25 ++ ext/io/README.md | 4 + ext/io/lib.rs | 713 ++++++++++++++++++++++++++++++++++++++++++ runtime/Cargo.toml | 2 + runtime/build.rs | 3 +- runtime/js/12_io.js | 238 -------------- runtime/js/40_files.js | 2 +- runtime/js/40_process.js | 2 +- runtime/js/90_deno_ns.js | 2 +- runtime/lib.rs | 1 + runtime/ops/fs.rs | 2 +- runtime/ops/io.rs | 715 ------------------------------------------- runtime/ops/mod.rs | 1 - runtime/ops/process.rs | 8 +- runtime/ops/spawn.rs | 6 +- runtime/ops/tty.rs | 2 +- runtime/web_worker.rs | 5 +- runtime/worker.rs | 5 +- 26 files changed, 1024 insertions(+), 981 deletions(-) create mode 100644 ext/io/12_io.js create mode 100644 ext/io/Cargo.toml create mode 100644 ext/io/README.md create mode 100644 ext/io/lib.rs delete mode 100644 runtime/js/12_io.js delete mode 100644 runtime/ops/io.rs diff --git a/Cargo.lock b/Cargo.lock index 180f4afb8..9c6c30f3d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1161,6 +1161,17 @@ dependencies = [ "tokio-util", ] +[[package]] +name = "deno_io" +version = "0.1.0" +dependencies = [ + "deno_core", + "nix", + "once_cell", + "tokio", + "winapi", +] + [[package]] name = "deno_lint" version = "0.41.0" @@ -1265,6 +1276,7 @@ dependencies = [ "deno_ffi", "deno_flash", "deno_http", + "deno_io", "deno_napi", "deno_net", "deno_node", diff --git a/Cargo.toml b/Cargo.toml index b4131c636..94b8ea470 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -21,6 +21,7 @@ members = [ "ext/flash", "ext/ffi", "ext/http", + "ext/io", "ext/net", "ext/node", "ext/url", @@ -62,6 +63,7 @@ deno_fetch = { version = "0.115.0", path = "./ext/fetch" } deno_ffi = { version = "0.78.0", path = "./ext/ffi" } deno_flash = { version = "0.27.0", path = "./ext/flash" } deno_http = { version = "0.86.0", path = "./ext/http" } +deno_io = { version = "0.1.0", path = "./ext/io" } deno_net = { version = "0.83.0", path = "./ext/net" } deno_node = { version = "0.28.0", path = "./ext/node" } deno_tls = { version = "0.78.0", path = "./ext/tls" } diff --git a/cli/build.rs b/cli/build.rs index 01127fded..c737c1f6d 100644 --- a/cli/build.rs +++ b/cli/build.rs @@ -351,6 +351,7 @@ fn create_cli_snapshot(snapshot_path: PathBuf) { ), deno_napi::init::(), deno_http::init(), + deno_io::init(Default::default()), deno_flash::init::(false), // No --unstable ]; diff --git a/cli/lsp/testing/execution.rs b/cli/lsp/testing/execution.rs index 28f15a7ce..af890cc0d 100644 --- a/cli/lsp/testing/execution.rs +++ b/cli/lsp/testing/execution.rs @@ -27,8 +27,8 @@ use deno_core::futures::StreamExt; use deno_core::parking_lot::Mutex; use deno_core::parking_lot::RwLock; use deno_core::ModuleSpecifier; -use deno_runtime::ops::io::Stdio; -use deno_runtime::ops::io::StdioPipe; +use deno_runtime::deno_io::Stdio; +use deno_runtime::deno_io::StdioPipe; use deno_runtime::permissions::Permissions; use deno_runtime::permissions::PermissionsContainer; use deno_runtime::tokio_util::run_local; diff --git a/cli/tools/test.rs b/cli/tools/test.rs index ae831b95b..0a0f186fb 100644 --- a/cli/tools/test.rs +++ b/cli/tools/test.rs @@ -31,9 +31,9 @@ use deno_core::futures::StreamExt; use deno_core::parking_lot::Mutex; use deno_core::url::Url; use deno_core::ModuleSpecifier; +use deno_runtime::deno_io::Stdio; +use deno_runtime::deno_io::StdioPipe; use deno_runtime::fmt_errors::format_js_error; -use deno_runtime::ops::io::Stdio; -use deno_runtime::ops::io::StdioPipe; use deno_runtime::permissions::Permissions; use deno_runtime::permissions::PermissionsContainer; use deno_runtime::tokio_util::run_local; diff --git a/cli/util/console.rs b/cli/util/console.rs index ac78ca4df..dc37138b1 100644 --- a/cli/util/console.rs +++ b/cli/util/console.rs @@ -4,6 +4,6 @@ use deno_runtime::ops::tty::ConsoleSize; /// Gets the console size. pub fn console_size() -> Option { - let stderr = &deno_runtime::ops::io::STDERR_HANDLE; + let stderr = &deno_runtime::deno_io::STDERR_HANDLE; deno_runtime::ops::tty::console_size(stderr).ok() } diff --git a/cli/worker.rs b/cli/worker.rs index a112663c9..7ef90d79f 100644 --- a/cli/worker.rs +++ b/cli/worker.rs @@ -420,7 +420,7 @@ pub async fn create_main_worker_for_test_or_bench( main_module: ModuleSpecifier, permissions: PermissionsContainer, custom_extensions: Vec, - stdio: deno_runtime::ops::io::Stdio, + stdio: deno_runtime::deno_io::Stdio, ) -> Result { create_main_worker_internal( ps, @@ -438,7 +438,7 @@ async fn create_main_worker_internal( main_module: ModuleSpecifier, permissions: PermissionsContainer, mut custom_extensions: Vec, - stdio: deno_runtime::ops::io::Stdio, + stdio: deno_runtime::deno_io::Stdio, bench_or_test: bool, ) -> Result { let (main_module, is_main_cjs) = if let Ok(package_ref) = @@ -642,7 +642,7 @@ fn create_web_worker_pre_execute_module_callback( fn create_web_worker_callback( ps: ProcState, - stdio: deno_runtime::ops::io::Stdio, + stdio: deno_runtime::deno_io::Stdio, ) -> Arc { Arc::new(move |args| { let maybe_inspector_server = ps.maybe_inspector_server.clone(); diff --git a/ext/io/12_io.js b/ext/io/12_io.js new file mode 100644 index 000000000..b9ff1190b --- /dev/null +++ b/ext/io/12_io.js @@ -0,0 +1,238 @@ +// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license. + +// Interfaces 100% copied from Go. +// Documentation liberally lifted from them too. +// Thank you! We love Go! <3 + +const core = globalThis.Deno.core; +const ops = core.ops; +const primordials = globalThis.__bootstrap.primordials; +const { + Uint8Array, + ArrayPrototypePush, + MathMin, + TypedArrayPrototypeSubarray, + TypedArrayPrototypeSet, +} = primordials; + +const DEFAULT_BUFFER_SIZE = 32 * 1024; +// Seek whence values. +// https://golang.org/pkg/io/#pkg-constants +const SeekMode = { + 0: "Start", + 1: "Current", + 2: "End", + + Start: 0, + Current: 1, + End: 2, +}; + +async function copy( + src, + dst, + options, +) { + let n = 0; + const bufSize = options?.bufSize ?? DEFAULT_BUFFER_SIZE; + const b = new Uint8Array(bufSize); + let gotEOF = false; + while (gotEOF === false) { + const result = await src.read(b); + if (result === null) { + gotEOF = true; + } else { + let nwritten = 0; + while (nwritten < result) { + nwritten += await dst.write( + TypedArrayPrototypeSubarray(b, nwritten, result), + ); + } + n += nwritten; + } + } + return n; +} + +async function* iter( + r, + options, +) { + const bufSize = options?.bufSize ?? DEFAULT_BUFFER_SIZE; + const b = new Uint8Array(bufSize); + while (true) { + const result = await r.read(b); + if (result === null) { + break; + } + + yield TypedArrayPrototypeSubarray(b, 0, result); + } +} + +function* iterSync( + r, + options, +) { + const bufSize = options?.bufSize ?? DEFAULT_BUFFER_SIZE; + const b = new Uint8Array(bufSize); + while (true) { + const result = r.readSync(b); + if (result === null) { + break; + } + + yield TypedArrayPrototypeSubarray(b, 0, result); + } +} + +function readSync(rid, buffer) { + if (buffer.length === 0) { + return 0; + } + + const nread = ops.op_read_sync(rid, buffer); + + return nread === 0 ? null : nread; +} + +async function read(rid, buffer) { + if (buffer.length === 0) { + return 0; + } + + const nread = await core.read(rid, buffer); + + return nread === 0 ? null : nread; +} + +function writeSync(rid, data) { + return ops.op_write_sync(rid, data); +} + +function write(rid, data) { + return core.write(rid, data); +} + +const READ_PER_ITER = 64 * 1024; // 64kb + +function readAll(r) { + return readAllInner(r); +} +async function readAllInner(r, options) { + const buffers = []; + const signal = options?.signal ?? null; + while (true) { + signal?.throwIfAborted(); + const buf = new Uint8Array(READ_PER_ITER); + const read = await r.read(buf); + if (typeof read == "number") { + ArrayPrototypePush(buffers, new Uint8Array(buf.buffer, 0, read)); + } else { + break; + } + } + signal?.throwIfAborted(); + + return concatBuffers(buffers); +} + +function readAllSync(r) { + const buffers = []; + + while (true) { + const buf = new Uint8Array(READ_PER_ITER); + const read = r.readSync(buf); + if (typeof read == "number") { + ArrayPrototypePush(buffers, TypedArrayPrototypeSubarray(buf, 0, read)); + } else { + break; + } + } + + return concatBuffers(buffers); +} + +function concatBuffers(buffers) { + let totalLen = 0; + for (let i = 0; i < buffers.length; ++i) { + totalLen += buffers[i].byteLength; + } + + const contents = new Uint8Array(totalLen); + + let n = 0; + for (let i = 0; i < buffers.length; ++i) { + const buf = buffers[i]; + TypedArrayPrototypeSet(contents, buf, n); + n += buf.byteLength; + } + + return contents; +} + +function readAllSyncSized(r, size) { + const buf = new Uint8Array(size + 1); // 1B to detect extended files + let cursor = 0; + + while (cursor < size) { + const sliceEnd = MathMin(size + 1, cursor + READ_PER_ITER); + const slice = TypedArrayPrototypeSubarray(buf, cursor, sliceEnd); + const read = r.readSync(slice); + if (typeof read == "number") { + cursor += read; + } else { + break; + } + } + + // Handle truncated or extended files during read + if (cursor > size) { + // Read remaining and concat + return concatBuffers([buf, readAllSync(r)]); + } else { // cursor == size + return TypedArrayPrototypeSubarray(buf, 0, cursor); + } +} + +async function readAllInnerSized(r, size, options) { + const buf = new Uint8Array(size + 1); // 1B to detect extended files + let cursor = 0; + const signal = options?.signal ?? null; + while (cursor < size) { + signal?.throwIfAborted(); + const sliceEnd = MathMin(size + 1, cursor + READ_PER_ITER); + const slice = TypedArrayPrototypeSubarray(buf, cursor, sliceEnd); + const read = await r.read(slice); + if (typeof read == "number") { + cursor += read; + } else { + break; + } + } + signal?.throwIfAborted(); + + // Handle truncated or extended files during read + if (cursor > size) { + // Read remaining and concat + return concatBuffers([buf, await readAllInner(r, options)]); + } else { + return TypedArrayPrototypeSubarray(buf, 0, cursor); + } +} + +export { + copy, + iter, + iterSync, + read, + readAll, + readAllInner, + readAllInnerSized, + readAllSync, + readAllSyncSized, + readSync, + SeekMode, + write, + writeSync, +}; diff --git a/ext/io/Cargo.toml b/ext/io/Cargo.toml new file mode 100644 index 000000000..7ade84c98 --- /dev/null +++ b/ext/io/Cargo.toml @@ -0,0 +1,25 @@ +# Copyright 2018-2023 the Deno authors. All rights reserved. MIT license. + +[package] +name = "deno_io" +version = "0.1.0" +authors.workspace = true +edition.workspace = true +license.workspace = true +readme = "README.md" +repository.workspace = true +description = "IO promitives for Deno extensions" + +[lib] +path = "lib.rs" + +[dependencies] +deno_core.workspace = true +once_cell.workspace = true +tokio.workspace = true + +[target.'cfg(unix)'.dependencies] +nix.workspace = true + +[target.'cfg(windows)'.dependencies] +winapi = { workspace = true, features = ["winbase"] } diff --git a/ext/io/README.md b/ext/io/README.md new file mode 100644 index 000000000..b66dda76e --- /dev/null +++ b/ext/io/README.md @@ -0,0 +1,4 @@ +# deno_io + +This crate provides IO primitives for other Deno extensions, this includes stdio +streams and abstraction over File System files. diff --git a/ext/io/lib.rs b/ext/io/lib.rs new file mode 100644 index 000000000..f8dde139f --- /dev/null +++ b/ext/io/lib.rs @@ -0,0 +1,713 @@ +// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license. + +use deno_core::error::resource_unavailable; +use deno_core::error::AnyError; +use deno_core::include_js_files; +use deno_core::op; +use deno_core::parking_lot::Mutex; +use deno_core::AsyncMutFuture; +use deno_core::AsyncRefCell; +use deno_core::AsyncResult; +use deno_core::BufMutView; +use deno_core::BufView; +use deno_core::CancelHandle; +use deno_core::CancelTryFuture; +use deno_core::Extension; +use deno_core::OpState; +use deno_core::RcRef; +use deno_core::Resource; +use deno_core::ResourceId; +use deno_core::TaskQueue; +use once_cell::sync::Lazy; +use std::borrow::Cow; +use std::cell::RefCell; +use std::fs::File as StdFile; +use std::io::ErrorKind; +use std::io::Read; +use std::io::Write; +use std::rc::Rc; +use std::sync::Arc; +use tokio::io::AsyncRead; +use tokio::io::AsyncReadExt; +use tokio::io::AsyncWrite; +use tokio::io::AsyncWriteExt; +use tokio::process; + +#[cfg(unix)] +use std::os::unix::io::FromRawFd; + +#[cfg(windows)] +use std::os::windows::io::FromRawHandle; +#[cfg(windows)] +use winapi::um::processenv::GetStdHandle; +#[cfg(windows)] +use winapi::um::winbase; + +// Store the stdio fd/handles in global statics in order to keep them +// alive for the duration of the application since the last handle/fd +// being dropped will close the corresponding pipe. +#[cfg(unix)] +pub static STDIN_HANDLE: Lazy = Lazy::new(|| { + // SAFETY: corresponds to OS stdin + unsafe { StdFile::from_raw_fd(0) } +}); +#[cfg(unix)] +pub static STDOUT_HANDLE: Lazy = Lazy::new(|| { + // SAFETY: corresponds to OS stdout + unsafe { StdFile::from_raw_fd(1) } +}); +#[cfg(unix)] +pub static STDERR_HANDLE: Lazy = Lazy::new(|| { + // SAFETY: corresponds to OS stderr + unsafe { StdFile::from_raw_fd(2) } +}); + +#[cfg(windows)] +pub static STDIN_HANDLE: Lazy = Lazy::new(|| { + // SAFETY: corresponds to OS stdin + unsafe { StdFile::from_raw_handle(GetStdHandle(winbase::STD_INPUT_HANDLE)) } +}); +#[cfg(windows)] +pub static STDOUT_HANDLE: Lazy = Lazy::new(|| { + // SAFETY: corresponds to OS stdout + unsafe { StdFile::from_raw_handle(GetStdHandle(winbase::STD_OUTPUT_HANDLE)) } +}); +#[cfg(windows)] +pub static STDERR_HANDLE: Lazy = Lazy::new(|| { + // SAFETY: corresponds to OS stderr + unsafe { StdFile::from_raw_handle(GetStdHandle(winbase::STD_ERROR_HANDLE)) } +}); + +pub fn init(stdio: Stdio) -> Extension { + // todo(dsheret): don't do this? Taking out the writers was necessary to prevent invalid handle panics + let stdio = Rc::new(RefCell::new(Some(stdio))); + + Extension::builder("deno_io") + .ops(vec![op_read_sync::decl(), op_write_sync::decl()]) + .dependencies(vec!["deno_web"]) + .esm(include_js_files!("12_io.js",)) + .middleware(|op| match op.name { + "op_print" => op_print::decl(), + _ => op, + }) + .state(move |state| { + let stdio = stdio + .borrow_mut() + .take() + .expect("Extension only supports being used once."); + let t = &mut state.resource_table; + + let rid = t.add(StdFileResource::stdio( + match stdio.stdin { + StdioPipe::Inherit => StdFileResourceInner { + kind: StdFileResourceKind::Stdin, + file: STDIN_HANDLE.try_clone().unwrap(), + }, + StdioPipe::File(pipe) => StdFileResourceInner::file(pipe), + }, + "stdin", + )); + assert_eq!(rid, 0, "stdin must have ResourceId 0"); + + let rid = t.add(StdFileResource::stdio( + match stdio.stdout { + StdioPipe::Inherit => StdFileResourceInner { + kind: StdFileResourceKind::Stdout, + file: STDOUT_HANDLE.try_clone().unwrap(), + }, + StdioPipe::File(pipe) => StdFileResourceInner::file(pipe), + }, + "stdout", + )); + assert_eq!(rid, 1, "stdout must have ResourceId 1"); + + let rid = t.add(StdFileResource::stdio( + match stdio.stderr { + StdioPipe::Inherit => StdFileResourceInner { + kind: StdFileResourceKind::Stderr, + file: STDERR_HANDLE.try_clone().unwrap(), + }, + StdioPipe::File(pipe) => StdFileResourceInner::file(pipe), + }, + "stderr", + )); + assert_eq!(rid, 2, "stderr must have ResourceId 2"); + Ok(()) + }) + .build() +} + +pub enum StdioPipe { + Inherit, + File(StdFile), +} + +impl Default for StdioPipe { + fn default() -> Self { + Self::Inherit + } +} + +impl Clone for StdioPipe { + fn clone(&self) -> Self { + match self { + StdioPipe::Inherit => StdioPipe::Inherit, + StdioPipe::File(pipe) => StdioPipe::File(pipe.try_clone().unwrap()), + } + } +} + +/// Specify how stdin, stdout, and stderr are piped. +/// By default, inherits from the process. +#[derive(Clone, Default)] +pub struct Stdio { + pub stdin: StdioPipe, + pub stdout: StdioPipe, + pub stderr: StdioPipe, +} + +#[cfg(unix)] +use nix::sys::termios; + +#[derive(Default)] +pub struct TtyMetadata { + #[cfg(unix)] + pub mode: Option, +} + +#[derive(Default)] +pub struct FileMetadata { + pub tty: TtyMetadata, +} + +#[derive(Debug)] +pub struct WriteOnlyResource { + stream: AsyncRefCell, +} + +impl From for WriteOnlyResource { + fn from(stream: S) -> Self { + Self { + stream: stream.into(), + } + } +} + +impl WriteOnlyResource +where + S: AsyncWrite + Unpin + 'static, +{ + pub fn borrow_mut(self: &Rc) -> AsyncMutFuture { + RcRef::map(self, |r| &r.stream).borrow_mut() + } + + async fn write(self: Rc, data: &[u8]) -> Result { + let mut stream = self.borrow_mut().await; + let nwritten = stream.write(data).await?; + Ok(nwritten) + } + + async fn shutdown(self: Rc) -> Result<(), AnyError> { + let mut stream = self.borrow_mut().await; + stream.shutdown().await?; + Ok(()) + } + + pub fn into_inner(self) -> S { + self.stream.into_inner() + } +} + +#[derive(Debug)] +pub struct ReadOnlyResource { + stream: AsyncRefCell, + cancel_handle: CancelHandle, +} + +impl From for ReadOnlyResource { + fn from(stream: S) -> Self { + Self { + stream: stream.into(), + cancel_handle: Default::default(), + } + } +} + +impl ReadOnlyResource +where + S: AsyncRead + Unpin + 'static, +{ + pub fn borrow_mut(self: &Rc) -> AsyncMutFuture { + RcRef::map(self, |r| &r.stream).borrow_mut() + } + + pub fn cancel_handle(self: &Rc) -> RcRef { + RcRef::map(self, |r| &r.cancel_handle) + } + + pub fn cancel_read_ops(&self) { + self.cancel_handle.cancel() + } + + async fn read(self: Rc, data: &mut [u8]) -> Result { + let mut rd = self.borrow_mut().await; + let nread = rd.read(data).try_or_cancel(self.cancel_handle()).await?; + Ok(nread) + } + + pub fn into_inner(self) -> S { + self.stream.into_inner() + } +} + +pub type ChildStdinResource = WriteOnlyResource; + +impl Resource for ChildStdinResource { + fn name(&self) -> Cow { + "childStdin".into() + } + + deno_core::impl_writable!(); + + fn shutdown(self: Rc) -> AsyncResult<()> { + Box::pin(self.shutdown()) + } +} + +pub type ChildStdoutResource = ReadOnlyResource; + +impl Resource for ChildStdoutResource { + deno_core::impl_readable_byob!(); + + fn name(&self) -> Cow { + "childStdout".into() + } + + fn close(self: Rc) { + self.cancel_read_ops(); + } +} + +pub type ChildStderrResource = ReadOnlyResource; + +impl Resource for ChildStderrResource { + deno_core::impl_readable_byob!(); + + fn name(&self) -> Cow { + "childStderr".into() + } + + fn close(self: Rc) { + self.cancel_read_ops(); + } +} + +#[derive(Clone, Copy)] +enum StdFileResourceKind { + File, + // For stdout and stderr, we sometimes instead use std::io::stdout() directly, + // because we get some Windows specific functionality for free by using Rust + // std's wrappers. So we take a bit of a complexity hit in order to not + // have to duplicate the functionality in Rust's std/src/sys/windows/stdio.rs + Stdin, + Stdout, + Stderr, +} + +struct StdFileResourceInner { + kind: StdFileResourceKind, + file: StdFile, +} + +impl StdFileResourceInner { + pub fn file(fs_file: StdFile) -> Self { + StdFileResourceInner { + kind: StdFileResourceKind::File, + file: fs_file, + } + } + + pub fn with_file(&mut self, f: impl FnOnce(&mut StdFile) -> R) -> R { + f(&mut self.file) + } + + pub fn try_clone(&self) -> Result { + Ok(Self { + kind: self.kind, + file: self.file.try_clone()?, + }) + } + + pub fn write_and_maybe_flush( + &mut self, + buf: &[u8], + ) -> Result { + // Rust will line buffer and we don't want that behavior + // (see https://github.com/denoland/deno/issues/948), so flush stdout and stderr. + // Although an alternative solution could be to bypass Rust's std by + // using the raw fds/handles, it will cause encoding issues on Windows + // that we get solved for free by using Rust's stdio wrappers (see + // std/src/sys/windows/stdio.rs in Rust's source code). + match self.kind { + StdFileResourceKind::File => Ok(self.file.write(buf)?), + StdFileResourceKind::Stdin => { + Err(Into::::into(ErrorKind::Unsupported).into()) + } + StdFileResourceKind::Stdout => { + // bypass the file and use std::io::stdout() + let mut stdout = std::io::stdout().lock(); + let nwritten = stdout.write(buf)?; + stdout.flush()?; + Ok(nwritten) + } + StdFileResourceKind::Stderr => { + // bypass the file and use std::io::stderr() + let mut stderr = std::io::stderr().lock(); + let nwritten = stderr.write(buf)?; + stderr.flush()?; + Ok(nwritten) + } + } + } + + pub fn write_all_and_maybe_flush( + &mut self, + buf: &[u8], + ) -> Result<(), AnyError> { + // this method exists instead of using a `Write` implementation + // so that we can acquire the locks once and do both actions + match self.kind { + StdFileResourceKind::File => Ok(self.file.write_all(buf)?), + StdFileResourceKind::Stdin => { + Err(Into::::into(ErrorKind::Unsupported).into()) + } + StdFileResourceKind::Stdout => { + // bypass the file and use std::io::stdout() + let mut stdout = std::io::stdout().lock(); + stdout.write_all(buf)?; + stdout.flush()?; + Ok(()) + } + StdFileResourceKind::Stderr => { + // bypass the file and use std::io::stderr() + let mut stderr = std::io::stderr().lock(); + stderr.write_all(buf)?; + stderr.flush()?; + Ok(()) + } + } + } +} + +impl Read for StdFileResourceInner { + fn read(&mut self, buf: &mut [u8]) -> std::io::Result { + match self.kind { + StdFileResourceKind::File | StdFileResourceKind::Stdin => { + self.file.read(buf) + } + StdFileResourceKind::Stdout | StdFileResourceKind::Stderr => { + Err(ErrorKind::Unsupported.into()) + } + } + } +} + +struct StdFileResourceCellValue { + inner: StdFileResourceInner, + meta_data: Arc>, +} + +impl StdFileResourceCellValue { + pub fn try_clone(&self) -> Result { + Ok(Self { + inner: self.inner.try_clone()?, + meta_data: self.meta_data.clone(), + }) + } +} + +pub struct StdFileResource { + name: String, + // We can't use an AsyncRefCell here because we need to allow + // access to the resource synchronously at any time and + // asynchronously one at a time in order + cell: RefCell>, + // Used to keep async actions in order and only allow one + // to occur at a time + cell_async_task_queue: TaskQueue, +} + +impl StdFileResource { + fn stdio(inner: StdFileResourceInner, name: &str) -> Self { + Self { + cell: RefCell::new(Some(StdFileResourceCellValue { + inner, + meta_data: Default::default(), + })), + cell_async_task_queue: Default::default(), + name: name.to_string(), + } + } + + pub fn fs_file(fs_file: StdFile) -> Self { + Self { + cell: RefCell::new(Some(StdFileResourceCellValue { + inner: StdFileResourceInner::file(fs_file), + meta_data: Default::default(), + })), + cell_async_task_queue: Default::default(), + name: "fsFile".to_string(), + } + } + + fn with_inner_and_metadata( + self: Rc, + action: impl FnOnce( + &mut StdFileResourceInner, + &Arc>, + ) -> Result, + ) -> Result { + match self.cell.try_borrow_mut() { + Ok(mut cell) => { + let mut file = cell.take().unwrap(); + let result = action(&mut file.inner, &file.meta_data); + cell.replace(file); + result + } + Err(_) => Err(resource_unavailable()), + } + } + + async fn with_inner_blocking_task( + self: Rc, + action: F, + ) -> R + where + F: FnOnce(&mut StdFileResourceInner) -> R + Send + 'static, + { + // we want to restrict this to one async action at a time + let _permit = self.cell_async_task_queue.acquire().await; + // we take the value out of the cell, use it on a blocking task, + // then put it back into the cell when we're done + let mut did_take = false; + let mut cell_value = { + let mut cell = self.cell.borrow_mut(); + match cell.as_mut().unwrap().try_clone() { + Ok(value) => value, + Err(_) => { + did_take = true; + cell.take().unwrap() + } + } + }; + let (cell_value, result) = tokio::task::spawn_blocking(move || { + let result = action(&mut cell_value.inner); + (cell_value, result) + }) + .await + .unwrap(); + + if did_take { + // put it back + self.cell.borrow_mut().replace(cell_value); + } + + result + } + + async fn read_byob( + self: Rc, + mut buf: BufMutView, + ) -> Result<(usize, BufMutView), AnyError> { + self + .with_inner_blocking_task(move |inner| { + let nread = inner.read(&mut buf)?; + Ok((nread, buf)) + }) + .await + } + + async fn write(self: Rc, data: &[u8]) -> Result { + let buf = data.to_owned(); + self + .with_inner_blocking_task(move |inner| inner.write_and_maybe_flush(&buf)) + .await + } + + async fn write_all(self: Rc, data: &[u8]) -> Result<(), AnyError> { + let buf = data.to_owned(); + self + .with_inner_blocking_task(move |inner| { + inner.write_all_and_maybe_flush(&buf) + }) + .await + } + + fn with_resource( + state: &mut OpState, + rid: ResourceId, + f: F, + ) -> Result + where + F: FnOnce(Rc) -> Result, + { + let resource = state.resource_table.get::(rid)?; + f(resource) + } + + pub fn with_file( + state: &mut OpState, + rid: ResourceId, + f: F, + ) -> Result + where + F: FnOnce(&mut StdFile) -> Result, + { + Self::with_resource(state, rid, move |resource| { + resource.with_inner_and_metadata(move |inner, _| inner.with_file(f)) + }) + } + + pub fn with_file_and_metadata( + state: &mut OpState, + rid: ResourceId, + f: F, + ) -> Result + where + F: FnOnce(&mut StdFile, &Arc>) -> Result, + { + Self::with_resource(state, rid, move |resource| { + resource.with_inner_and_metadata(move |inner, metadata| { + inner.with_file(move |file| f(file, metadata)) + }) + }) + } + + pub async fn with_file_blocking_task( + state: Rc>, + rid: ResourceId, + f: F, + ) -> Result + where + F: (FnOnce(&mut StdFile) -> Result) + Send + 'static, + { + let resource = state + .borrow_mut() + .resource_table + .get::(rid)?; + + resource + .with_inner_blocking_task(move |inner| inner.with_file(f)) + .await + } + + pub fn clone_file( + state: &mut OpState, + rid: ResourceId, + ) -> Result { + Self::with_file(state, rid, move |std_file| { + std_file.try_clone().map_err(AnyError::from) + }) + } + + pub fn as_stdio( + state: &mut OpState, + rid: u32, + ) -> Result { + Self::with_resource(state, rid, |resource| { + resource.with_inner_and_metadata(|inner, _| match inner.kind { + StdFileResourceKind::File => { + let file = inner.file.try_clone()?; + Ok(file.into()) + } + _ => Ok(std::process::Stdio::inherit()), + }) + }) + } +} + +impl Resource for StdFileResource { + fn name(&self) -> Cow { + self.name.as_str().into() + } + + fn read(self: Rc, limit: usize) -> AsyncResult { + Box::pin(async move { + let vec = vec![0; limit]; + let buf = BufMutView::from(vec); + let (nread, buf) = self.read_byob(buf).await?; + let mut vec = buf.unwrap_vec(); + if vec.len() != nread { + vec.truncate(nread); + } + Ok(BufView::from(vec)) + }) + } + + fn read_byob( + self: Rc, + buf: deno_core::BufMutView, + ) -> AsyncResult<(usize, deno_core::BufMutView)> { + Box::pin(self.read_byob(buf)) + } + + deno_core::impl_writable!(with_all); + + #[cfg(unix)] + fn backing_fd(self: Rc) -> Option { + use std::os::unix::io::AsRawFd; + self + .with_inner_and_metadata(move |std_file, _| { + Ok(std_file.with_file(|f| f.as_raw_fd())) + }) + .ok() + } +} + +// override op_print to use the stdout and stderr in the resource table +#[op] +pub fn op_print( + state: &mut OpState, + msg: &str, + is_err: bool, +) -> Result<(), AnyError> { + let rid = if is_err { 2 } else { 1 }; + StdFileResource::with_resource(state, rid, move |resource| { + resource.with_inner_and_metadata(|inner, _| { + inner.write_all_and_maybe_flush(msg.as_bytes())?; + Ok(()) + }) + }) +} + +#[op(fast)] +fn op_read_sync( + state: &mut OpState, + rid: u32, + buf: &mut [u8], +) -> Result { + StdFileResource::with_resource(state, rid, move |resource| { + resource.with_inner_and_metadata(|inner, _| { + inner + .read(buf) + .map(|n: usize| n as u32) + .map_err(AnyError::from) + }) + }) +} + +#[op(fast)] +fn op_write_sync( + state: &mut OpState, + rid: u32, + buf: &mut [u8], +) -> Result { + StdFileResource::with_resource(state, rid, move |resource| { + resource.with_inner_and_metadata(|inner, _| { + inner + .write_and_maybe_flush(buf) + .map(|nwritten: usize| nwritten as u32) + .map_err(AnyError::from) + }) + }) +} diff --git a/runtime/Cargo.toml b/runtime/Cargo.toml index 7fb759b0f..5a4028349 100644 --- a/runtime/Cargo.toml +++ b/runtime/Cargo.toml @@ -43,6 +43,7 @@ deno_fetch.workspace = true deno_ffi.workspace = true deno_flash.workspace = true deno_http.workspace = true +deno_io.workspace = true deno_net.workspace = true deno_node.workspace = true deno_tls.workspace = true @@ -70,6 +71,7 @@ deno_fetch.workspace = true deno_ffi.workspace = true deno_flash.workspace = true deno_http.workspace = true +deno_io.workspace = true deno_napi.workspace = true deno_net.workspace = true deno_node.workspace = true diff --git a/runtime/build.rs b/runtime/build.rs index 4678eab99..fd93b9001 100644 --- a/runtime/build.rs +++ b/runtime/build.rs @@ -190,6 +190,7 @@ mod startup_snapshot { "deno_napi", "deno_http", "deno_flash", + "deno_io", ]) .esm(include_js_files!( dir "js", @@ -199,7 +200,6 @@ mod startup_snapshot { "06_util.js", "10_permissions.js", "11_workers.js", - "12_io.js", "13_buffer.js", "30_fs.js", "30_os.js", @@ -245,6 +245,7 @@ mod startup_snapshot { ), deno_napi::init::(), deno_http::init(), + deno_io::init(Default::default()), deno_flash::init::(false), // No --unstable runtime_extension, // FIXME(bartlomieju): these extensions are specified last, because they diff --git a/runtime/js/12_io.js b/runtime/js/12_io.js deleted file mode 100644 index b9ff1190b..000000000 --- a/runtime/js/12_io.js +++ /dev/null @@ -1,238 +0,0 @@ -// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license. - -// Interfaces 100% copied from Go. -// Documentation liberally lifted from them too. -// Thank you! We love Go! <3 - -const core = globalThis.Deno.core; -const ops = core.ops; -const primordials = globalThis.__bootstrap.primordials; -const { - Uint8Array, - ArrayPrototypePush, - MathMin, - TypedArrayPrototypeSubarray, - TypedArrayPrototypeSet, -} = primordials; - -const DEFAULT_BUFFER_SIZE = 32 * 1024; -// Seek whence values. -// https://golang.org/pkg/io/#pkg-constants -const SeekMode = { - 0: "Start", - 1: "Current", - 2: "End", - - Start: 0, - Current: 1, - End: 2, -}; - -async function copy( - src, - dst, - options, -) { - let n = 0; - const bufSize = options?.bufSize ?? DEFAULT_BUFFER_SIZE; - const b = new Uint8Array(bufSize); - let gotEOF = false; - while (gotEOF === false) { - const result = await src.read(b); - if (result === null) { - gotEOF = true; - } else { - let nwritten = 0; - while (nwritten < result) { - nwritten += await dst.write( - TypedArrayPrototypeSubarray(b, nwritten, result), - ); - } - n += nwritten; - } - } - return n; -} - -async function* iter( - r, - options, -) { - const bufSize = options?.bufSize ?? DEFAULT_BUFFER_SIZE; - const b = new Uint8Array(bufSize); - while (true) { - const result = await r.read(b); - if (result === null) { - break; - } - - yield TypedArrayPrototypeSubarray(b, 0, result); - } -} - -function* iterSync( - r, - options, -) { - const bufSize = options?.bufSize ?? DEFAULT_BUFFER_SIZE; - const b = new Uint8Array(bufSize); - while (true) { - const result = r.readSync(b); - if (result === null) { - break; - } - - yield TypedArrayPrototypeSubarray(b, 0, result); - } -} - -function readSync(rid, buffer) { - if (buffer.length === 0) { - return 0; - } - - const nread = ops.op_read_sync(rid, buffer); - - return nread === 0 ? null : nread; -} - -async function read(rid, buffer) { - if (buffer.length === 0) { - return 0; - } - - const nread = await core.read(rid, buffer); - - return nread === 0 ? null : nread; -} - -function writeSync(rid, data) { - return ops.op_write_sync(rid, data); -} - -function write(rid, data) { - return core.write(rid, data); -} - -const READ_PER_ITER = 64 * 1024; // 64kb - -function readAll(r) { - return readAllInner(r); -} -async function readAllInner(r, options) { - const buffers = []; - const signal = options?.signal ?? null; - while (true) { - signal?.throwIfAborted(); - const buf = new Uint8Array(READ_PER_ITER); - const read = await r.read(buf); - if (typeof read == "number") { - ArrayPrototypePush(buffers, new Uint8Array(buf.buffer, 0, read)); - } else { - break; - } - } - signal?.throwIfAborted(); - - return concatBuffers(buffers); -} - -function readAllSync(r) { - const buffers = []; - - while (true) { - const buf = new Uint8Array(READ_PER_ITER); - const read = r.readSync(buf); - if (typeof read == "number") { - ArrayPrototypePush(buffers, TypedArrayPrototypeSubarray(buf, 0, read)); - } else { - break; - } - } - - return concatBuffers(buffers); -} - -function concatBuffers(buffers) { - let totalLen = 0; - for (let i = 0; i < buffers.length; ++i) { - totalLen += buffers[i].byteLength; - } - - const contents = new Uint8Array(totalLen); - - let n = 0; - for (let i = 0; i < buffers.length; ++i) { - const buf = buffers[i]; - TypedArrayPrototypeSet(contents, buf, n); - n += buf.byteLength; - } - - return contents; -} - -function readAllSyncSized(r, size) { - const buf = new Uint8Array(size + 1); // 1B to detect extended files - let cursor = 0; - - while (cursor < size) { - const sliceEnd = MathMin(size + 1, cursor + READ_PER_ITER); - const slice = TypedArrayPrototypeSubarray(buf, cursor, sliceEnd); - const read = r.readSync(slice); - if (typeof read == "number") { - cursor += read; - } else { - break; - } - } - - // Handle truncated or extended files during read - if (cursor > size) { - // Read remaining and concat - return concatBuffers([buf, readAllSync(r)]); - } else { // cursor == size - return TypedArrayPrototypeSubarray(buf, 0, cursor); - } -} - -async function readAllInnerSized(r, size, options) { - const buf = new Uint8Array(size + 1); // 1B to detect extended files - let cursor = 0; - const signal = options?.signal ?? null; - while (cursor < size) { - signal?.throwIfAborted(); - const sliceEnd = MathMin(size + 1, cursor + READ_PER_ITER); - const slice = TypedArrayPrototypeSubarray(buf, cursor, sliceEnd); - const read = await r.read(slice); - if (typeof read == "number") { - cursor += read; - } else { - break; - } - } - signal?.throwIfAborted(); - - // Handle truncated or extended files during read - if (cursor > size) { - // Read remaining and concat - return concatBuffers([buf, await readAllInner(r, options)]); - } else { - return TypedArrayPrototypeSubarray(buf, 0, cursor); - } -} - -export { - copy, - iter, - iterSync, - read, - readAll, - readAllInner, - readAllInnerSized, - readAllSync, - readAllSyncSized, - readSync, - SeekMode, - write, - writeSync, -}; diff --git a/runtime/js/40_files.js b/runtime/js/40_files.js index f380ca7eb..654895254 100644 --- a/runtime/js/40_files.js +++ b/runtime/js/40_files.js @@ -2,7 +2,7 @@ const core = globalThis.Deno.core; const ops = core.ops; -import { read, readSync, write, writeSync } from "internal:runtime/js/12_io.js"; +import { read, readSync, write, writeSync } from "internal:deno_io/12_io.js"; import { fstat, fstatSync, diff --git a/runtime/js/40_process.js b/runtime/js/40_process.js index a949e48ed..601c79975 100644 --- a/runtime/js/40_process.js +++ b/runtime/js/40_process.js @@ -3,7 +3,7 @@ const core = globalThis.Deno.core; const ops = core.ops; import { FsFile } from "internal:runtime/js/40_files.js"; -import { readAll } from "internal:runtime/js/12_io.js"; +import { readAll } from "internal:deno_io/12_io.js"; import { pathFromURL } from "internal:runtime/js/06_util.js"; import { assert } from "internal:deno_web/00_infra.js"; const primordials = globalThis.__bootstrap.primordials; diff --git a/runtime/js/90_deno_ns.js b/runtime/js/90_deno_ns.js index 7b9d8e6e6..8ad285e42 100644 --- a/runtime/js/90_deno_ns.js +++ b/runtime/js/90_deno_ns.js @@ -14,7 +14,7 @@ import * as build from "internal:runtime/js/01_build.js"; import * as errors from "internal:runtime/js/01_errors.js"; import * as version from "internal:runtime/js/01_version.ts"; import * as permissions from "internal:runtime/js/10_permissions.js"; -import * as io from "internal:runtime/js/12_io.js"; +import * as io from "internal:deno_io/12_io.js"; import * as buffer from "internal:runtime/js/13_buffer.js"; import * as fs from "internal:runtime/js/30_fs.js"; import * as os from "internal:runtime/js/30_os.js"; diff --git a/runtime/lib.rs b/runtime/lib.rs index 26e8d9bb4..6bb84698d 100644 --- a/runtime/lib.rs +++ b/runtime/lib.rs @@ -9,6 +9,7 @@ pub use deno_fetch; pub use deno_ffi; pub use deno_flash; pub use deno_http; +pub use deno_io; pub use deno_napi; pub use deno_net; pub use deno_node; diff --git a/runtime/ops/fs.rs b/runtime/ops/fs.rs index 3b22bb696..546d1d68a 100644 --- a/runtime/ops/fs.rs +++ b/runtime/ops/fs.rs @@ -1,6 +1,5 @@ // Copyright 2018-2023 the Deno authors. All rights reserved. MIT license. // Some deserializer fields are only used on Unix and Windows build fails without it -use super::io::StdFileResource; use super::utils::into_string; use crate::fs_util::canonicalize_path; use deno_core::error::custom_error; @@ -15,6 +14,7 @@ use deno_core::ResourceId; use deno_core::ZeroCopyBuf; use deno_crypto::rand::thread_rng; use deno_crypto::rand::Rng; +use deno_io::StdFileResource; use log::debug; use serde::Deserialize; use serde::Serialize; diff --git a/runtime/ops/io.rs b/runtime/ops/io.rs deleted file mode 100644 index d0ee116a5..000000000 --- a/runtime/ops/io.rs +++ /dev/null @@ -1,715 +0,0 @@ -// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license. - -use deno_core::error::resource_unavailable; -use deno_core::error::AnyError; -use deno_core::op; -use deno_core::parking_lot::Mutex; -use deno_core::AsyncMutFuture; -use deno_core::AsyncRefCell; -use deno_core::AsyncResult; -use deno_core::BufMutView; -use deno_core::BufView; -use deno_core::CancelHandle; -use deno_core::CancelTryFuture; -use deno_core::Extension; -use deno_core::OpState; -use deno_core::RcRef; -use deno_core::Resource; -use deno_core::ResourceId; -use deno_core::TaskQueue; -use once_cell::sync::Lazy; -use std::borrow::Cow; -use std::cell::RefCell; -use std::fs::File as StdFile; -use std::io::ErrorKind; -use std::io::Read; -use std::io::Write; -use std::rc::Rc; -use std::sync::Arc; -use tokio::io::AsyncRead; -use tokio::io::AsyncReadExt; -use tokio::io::AsyncWrite; -use tokio::io::AsyncWriteExt; -use tokio::process; - -#[cfg(unix)] -use std::os::unix::io::FromRawFd; - -#[cfg(windows)] -use std::os::windows::io::FromRawHandle; -#[cfg(windows)] -use winapi::um::processenv::GetStdHandle; -#[cfg(windows)] -use winapi::um::winbase; - -// Store the stdio fd/handles in global statics in order to keep them -// alive for the duration of the application since the last handle/fd -// being dropped will close the corresponding pipe. -#[cfg(unix)] -pub static STDIN_HANDLE: Lazy = Lazy::new(|| { - // SAFETY: corresponds to OS stdin - unsafe { StdFile::from_raw_fd(0) } -}); -#[cfg(unix)] -pub static STDOUT_HANDLE: Lazy = Lazy::new(|| { - // SAFETY: corresponds to OS stdout - unsafe { StdFile::from_raw_fd(1) } -}); -#[cfg(unix)] -pub static STDERR_HANDLE: Lazy = Lazy::new(|| { - // SAFETY: corresponds to OS stderr - unsafe { StdFile::from_raw_fd(2) } -}); - -#[cfg(windows)] -pub static STDIN_HANDLE: Lazy = Lazy::new(|| { - // SAFETY: corresponds to OS stdin - unsafe { StdFile::from_raw_handle(GetStdHandle(winbase::STD_INPUT_HANDLE)) } -}); -#[cfg(windows)] -pub static STDOUT_HANDLE: Lazy = Lazy::new(|| { - // SAFETY: corresponds to OS stdout - unsafe { StdFile::from_raw_handle(GetStdHandle(winbase::STD_OUTPUT_HANDLE)) } -}); -#[cfg(windows)] -pub static STDERR_HANDLE: Lazy = Lazy::new(|| { - // SAFETY: corresponds to OS stderr - unsafe { StdFile::from_raw_handle(GetStdHandle(winbase::STD_ERROR_HANDLE)) } -}); - -pub fn init() -> Extension { - Extension::builder("deno_io") - .ops(vec![op_read_sync::decl(), op_write_sync::decl()]) - .build() -} - -pub enum StdioPipe { - Inherit, - File(StdFile), -} - -impl Default for StdioPipe { - fn default() -> Self { - Self::Inherit - } -} - -impl Clone for StdioPipe { - fn clone(&self) -> Self { - match self { - StdioPipe::Inherit => StdioPipe::Inherit, - StdioPipe::File(pipe) => StdioPipe::File(pipe.try_clone().unwrap()), - } - } -} - -/// Specify how stdin, stdout, and stderr are piped. -/// By default, inherits from the process. -#[derive(Clone, Default)] -pub struct Stdio { - pub stdin: StdioPipe, - pub stdout: StdioPipe, - pub stderr: StdioPipe, -} - -pub fn init_stdio(stdio: Stdio) -> Extension { - // todo(dsheret): don't do this? Taking out the writers was necessary to prevent invalid handle panics - let stdio = Rc::new(RefCell::new(Some(stdio))); - - Extension::builder("deno_stdio") - .middleware(|op| match op.name { - "op_print" => op_print::decl(), - _ => op, - }) - .state(move |state| { - let stdio = stdio - .borrow_mut() - .take() - .expect("Extension only supports being used once."); - let t = &mut state.resource_table; - - let rid = t.add(StdFileResource::stdio( - match stdio.stdin { - StdioPipe::Inherit => StdFileResourceInner { - kind: StdFileResourceKind::Stdin, - file: STDIN_HANDLE.try_clone().unwrap(), - }, - StdioPipe::File(pipe) => StdFileResourceInner::file(pipe), - }, - "stdin", - )); - assert_eq!(rid, 0, "stdin must have ResourceId 0"); - - let rid = t.add(StdFileResource::stdio( - match stdio.stdout { - StdioPipe::Inherit => StdFileResourceInner { - kind: StdFileResourceKind::Stdout, - file: STDOUT_HANDLE.try_clone().unwrap(), - }, - StdioPipe::File(pipe) => StdFileResourceInner::file(pipe), - }, - "stdout", - )); - assert_eq!(rid, 1, "stdout must have ResourceId 1"); - - let rid = t.add(StdFileResource::stdio( - match stdio.stderr { - StdioPipe::Inherit => StdFileResourceInner { - kind: StdFileResourceKind::Stderr, - file: STDERR_HANDLE.try_clone().unwrap(), - }, - StdioPipe::File(pipe) => StdFileResourceInner::file(pipe), - }, - "stderr", - )); - assert_eq!(rid, 2, "stderr must have ResourceId 2"); - Ok(()) - }) - .build() -} - -#[cfg(unix)] -use nix::sys::termios; - -#[derive(Default)] -pub struct TtyMetadata { - #[cfg(unix)] - pub mode: Option, -} - -#[derive(Default)] -pub struct FileMetadata { - pub tty: TtyMetadata, -} - -#[derive(Debug)] -pub struct WriteOnlyResource { - stream: AsyncRefCell, -} - -impl From for WriteOnlyResource { - fn from(stream: S) -> Self { - Self { - stream: stream.into(), - } - } -} - -impl WriteOnlyResource -where - S: AsyncWrite + Unpin + 'static, -{ - pub fn borrow_mut(self: &Rc) -> AsyncMutFuture { - RcRef::map(self, |r| &r.stream).borrow_mut() - } - - async fn write(self: Rc, data: &[u8]) -> Result { - let mut stream = self.borrow_mut().await; - let nwritten = stream.write(data).await?; - Ok(nwritten) - } - - async fn shutdown(self: Rc) -> Result<(), AnyError> { - let mut stream = self.borrow_mut().await; - stream.shutdown().await?; - Ok(()) - } - - pub fn into_inner(self) -> S { - self.stream.into_inner() - } -} - -#[derive(Debug)] -pub struct ReadOnlyResource { - stream: AsyncRefCell, - cancel_handle: CancelHandle, -} - -impl From for ReadOnlyResource { - fn from(stream: S) -> Self { - Self { - stream: stream.into(), - cancel_handle: Default::default(), - } - } -} - -impl ReadOnlyResource -where - S: AsyncRead + Unpin + 'static, -{ - pub fn borrow_mut(self: &Rc) -> AsyncMutFuture { - RcRef::map(self, |r| &r.stream).borrow_mut() - } - - pub fn cancel_handle(self: &Rc) -> RcRef { - RcRef::map(self, |r| &r.cancel_handle) - } - - pub fn cancel_read_ops(&self) { - self.cancel_handle.cancel() - } - - async fn read(self: Rc, data: &mut [u8]) -> Result { - let mut rd = self.borrow_mut().await; - let nread = rd.read(data).try_or_cancel(self.cancel_handle()).await?; - Ok(nread) - } - - pub fn into_inner(self) -> S { - self.stream.into_inner() - } -} - -pub type ChildStdinResource = WriteOnlyResource; - -impl Resource for ChildStdinResource { - fn name(&self) -> Cow { - "childStdin".into() - } - - deno_core::impl_writable!(); - - fn shutdown(self: Rc) -> AsyncResult<()> { - Box::pin(self.shutdown()) - } -} - -pub type ChildStdoutResource = ReadOnlyResource; - -impl Resource for ChildStdoutResource { - deno_core::impl_readable_byob!(); - - fn name(&self) -> Cow { - "childStdout".into() - } - - fn close(self: Rc) { - self.cancel_read_ops(); - } -} - -pub type ChildStderrResource = ReadOnlyResource; - -impl Resource for ChildStderrResource { - deno_core::impl_readable_byob!(); - - fn name(&self) -> Cow { - "childStderr".into() - } - - fn close(self: Rc) { - self.cancel_read_ops(); - } -} - -#[derive(Clone, Copy)] -enum StdFileResourceKind { - File, - // For stdout and stderr, we sometimes instead use std::io::stdout() directly, - // because we get some Windows specific functionality for free by using Rust - // std's wrappers. So we take a bit of a complexity hit in order to not - // have to duplicate the functionality in Rust's std/src/sys/windows/stdio.rs - Stdin, - Stdout, - Stderr, -} - -struct StdFileResourceInner { - kind: StdFileResourceKind, - file: StdFile, -} - -impl StdFileResourceInner { - pub fn file(fs_file: StdFile) -> Self { - StdFileResourceInner { - kind: StdFileResourceKind::File, - file: fs_file, - } - } - - pub fn with_file(&mut self, f: impl FnOnce(&mut StdFile) -> R) -> R { - f(&mut self.file) - } - - pub fn try_clone(&self) -> Result { - Ok(Self { - kind: self.kind, - file: self.file.try_clone()?, - }) - } - - pub fn write_and_maybe_flush( - &mut self, - buf: &[u8], - ) -> Result { - // Rust will line buffer and we don't want that behavior - // (see https://github.com/denoland/deno/issues/948), so flush stdout and stderr. - // Although an alternative solution could be to bypass Rust's std by - // using the raw fds/handles, it will cause encoding issues on Windows - // that we get solved for free by using Rust's stdio wrappers (see - // std/src/sys/windows/stdio.rs in Rust's source code). - match self.kind { - StdFileResourceKind::File => Ok(self.file.write(buf)?), - StdFileResourceKind::Stdin => { - Err(Into::::into(ErrorKind::Unsupported).into()) - } - StdFileResourceKind::Stdout => { - // bypass the file and use std::io::stdout() - let mut stdout = std::io::stdout().lock(); - let nwritten = stdout.write(buf)?; - stdout.flush()?; - Ok(nwritten) - } - StdFileResourceKind::Stderr => { - // bypass the file and use std::io::stderr() - let mut stderr = std::io::stderr().lock(); - let nwritten = stderr.write(buf)?; - stderr.flush()?; - Ok(nwritten) - } - } - } - - pub fn write_all_and_maybe_flush( - &mut self, - buf: &[u8], - ) -> Result<(), AnyError> { - // this method exists instead of using a `Write` implementation - // so that we can acquire the locks once and do both actions - match self.kind { - StdFileResourceKind::File => Ok(self.file.write_all(buf)?), - StdFileResourceKind::Stdin => { - Err(Into::::into(ErrorKind::Unsupported).into()) - } - StdFileResourceKind::Stdout => { - // bypass the file and use std::io::stdout() - let mut stdout = std::io::stdout().lock(); - stdout.write_all(buf)?; - stdout.flush()?; - Ok(()) - } - StdFileResourceKind::Stderr => { - // bypass the file and use std::io::stderr() - let mut stderr = std::io::stderr().lock(); - stderr.write_all(buf)?; - stderr.flush()?; - Ok(()) - } - } - } -} - -impl Read for StdFileResourceInner { - fn read(&mut self, buf: &mut [u8]) -> std::io::Result { - match self.kind { - StdFileResourceKind::File | StdFileResourceKind::Stdin => { - self.file.read(buf) - } - StdFileResourceKind::Stdout | StdFileResourceKind::Stderr => { - Err(ErrorKind::Unsupported.into()) - } - } - } -} - -struct StdFileResourceCellValue { - inner: StdFileResourceInner, - meta_data: Arc>, -} - -impl StdFileResourceCellValue { - pub fn try_clone(&self) -> Result { - Ok(Self { - inner: self.inner.try_clone()?, - meta_data: self.meta_data.clone(), - }) - } -} - -pub struct StdFileResource { - name: String, - // We can't use an AsyncRefCell here because we need to allow - // access to the resource synchronously at any time and - // asynchronously one at a time in order - cell: RefCell>, - // Used to keep async actions in order and only allow one - // to occur at a time - cell_async_task_queue: TaskQueue, -} - -impl StdFileResource { - fn stdio(inner: StdFileResourceInner, name: &str) -> Self { - Self { - cell: RefCell::new(Some(StdFileResourceCellValue { - inner, - meta_data: Default::default(), - })), - cell_async_task_queue: Default::default(), - name: name.to_string(), - } - } - - pub fn fs_file(fs_file: StdFile) -> Self { - Self { - cell: RefCell::new(Some(StdFileResourceCellValue { - inner: StdFileResourceInner::file(fs_file), - meta_data: Default::default(), - })), - cell_async_task_queue: Default::default(), - name: "fsFile".to_string(), - } - } - - fn with_inner_and_metadata( - self: Rc, - action: impl FnOnce( - &mut StdFileResourceInner, - &Arc>, - ) -> Result, - ) -> Result { - match self.cell.try_borrow_mut() { - Ok(mut cell) => { - let mut file = cell.take().unwrap(); - let result = action(&mut file.inner, &file.meta_data); - cell.replace(file); - result - } - Err(_) => Err(resource_unavailable()), - } - } - - async fn with_inner_blocking_task( - self: Rc, - action: F, - ) -> R - where - F: FnOnce(&mut StdFileResourceInner) -> R + Send + 'static, - { - // we want to restrict this to one async action at a time - let _permit = self.cell_async_task_queue.acquire().await; - // we take the value out of the cell, use it on a blocking task, - // then put it back into the cell when we're done - let mut did_take = false; - let mut cell_value = { - let mut cell = self.cell.borrow_mut(); - match cell.as_mut().unwrap().try_clone() { - Ok(value) => value, - Err(_) => { - did_take = true; - cell.take().unwrap() - } - } - }; - let (cell_value, result) = tokio::task::spawn_blocking(move || { - let result = action(&mut cell_value.inner); - (cell_value, result) - }) - .await - .unwrap(); - - if did_take { - // put it back - self.cell.borrow_mut().replace(cell_value); - } - - result - } - - async fn read_byob( - self: Rc, - mut buf: BufMutView, - ) -> Result<(usize, BufMutView), AnyError> { - self - .with_inner_blocking_task(move |inner| { - let nread = inner.read(&mut buf)?; - Ok((nread, buf)) - }) - .await - } - - async fn write(self: Rc, data: &[u8]) -> Result { - let buf = data.to_owned(); - self - .with_inner_blocking_task(move |inner| inner.write_and_maybe_flush(&buf)) - .await - } - - async fn write_all(self: Rc, data: &[u8]) -> Result<(), AnyError> { - let buf = data.to_owned(); - self - .with_inner_blocking_task(move |inner| { - inner.write_all_and_maybe_flush(&buf) - }) - .await - } - - fn with_resource( - state: &mut OpState, - rid: ResourceId, - f: F, - ) -> Result - where - F: FnOnce(Rc) -> Result, - { - let resource = state.resource_table.get::(rid)?; - f(resource) - } - - pub fn with_file( - state: &mut OpState, - rid: ResourceId, - f: F, - ) -> Result - where - F: FnOnce(&mut StdFile) -> Result, - { - Self::with_resource(state, rid, move |resource| { - resource.with_inner_and_metadata(move |inner, _| inner.with_file(f)) - }) - } - - pub fn with_file_and_metadata( - state: &mut OpState, - rid: ResourceId, - f: F, - ) -> Result - where - F: FnOnce(&mut StdFile, &Arc>) -> Result, - { - Self::with_resource(state, rid, move |resource| { - resource.with_inner_and_metadata(move |inner, metadata| { - inner.with_file(move |file| f(file, metadata)) - }) - }) - } - - pub async fn with_file_blocking_task( - state: Rc>, - rid: ResourceId, - f: F, - ) -> Result - where - F: (FnOnce(&mut StdFile) -> Result) + Send + 'static, - { - let resource = state - .borrow_mut() - .resource_table - .get::(rid)?; - - resource - .with_inner_blocking_task(move |inner| inner.with_file(f)) - .await - } - - pub fn clone_file( - state: &mut OpState, - rid: ResourceId, - ) -> Result { - Self::with_file(state, rid, move |std_file| { - std_file.try_clone().map_err(AnyError::from) - }) - } - - pub fn as_stdio( - state: &mut OpState, - rid: u32, - ) -> Result { - Self::with_resource(state, rid, |resource| { - resource.with_inner_and_metadata(|inner, _| match inner.kind { - StdFileResourceKind::File => { - let file = inner.file.try_clone()?; - Ok(file.into()) - } - _ => Ok(std::process::Stdio::inherit()), - }) - }) - } -} - -impl Resource for StdFileResource { - fn name(&self) -> Cow { - self.name.as_str().into() - } - - fn read(self: Rc, limit: usize) -> AsyncResult { - Box::pin(async move { - let vec = vec![0; limit]; - let buf = BufMutView::from(vec); - let (nread, buf) = self.read_byob(buf).await?; - let mut vec = buf.unwrap_vec(); - if vec.len() != nread { - vec.truncate(nread); - } - Ok(BufView::from(vec)) - }) - } - - fn read_byob( - self: Rc, - buf: deno_core::BufMutView, - ) -> AsyncResult<(usize, deno_core::BufMutView)> { - Box::pin(self.read_byob(buf)) - } - - deno_core::impl_writable!(with_all); - - #[cfg(unix)] - fn backing_fd(self: Rc) -> Option { - use std::os::unix::io::AsRawFd; - self - .with_inner_and_metadata(move |std_file, _| { - Ok(std_file.with_file(|f| f.as_raw_fd())) - }) - .ok() - } -} - -// override op_print to use the stdout and stderr in the resource table -#[op] -pub fn op_print( - state: &mut OpState, - msg: &str, - is_err: bool, -) -> Result<(), AnyError> { - let rid = if is_err { 2 } else { 1 }; - StdFileResource::with_resource(state, rid, move |resource| { - resource.with_inner_and_metadata(|inner, _| { - inner.write_all_and_maybe_flush(msg.as_bytes())?; - Ok(()) - }) - }) -} - -#[op(fast)] -fn op_read_sync( - state: &mut OpState, - rid: u32, - buf: &mut [u8], -) -> Result { - StdFileResource::with_resource(state, rid, move |resource| { - resource.with_inner_and_metadata(|inner, _| { - inner - .read(buf) - .map(|n: usize| n as u32) - .map_err(AnyError::from) - }) - }) -} - -#[op(fast)] -fn op_write_sync( - state: &mut OpState, - rid: u32, - buf: &mut [u8], -) -> Result { - StdFileResource::with_resource(state, rid, move |resource| { - resource.with_inner_and_metadata(|inner, _| { - inner - .write_and_maybe_flush(buf) - .map(|nwritten: usize| nwritten as u32) - .map_err(AnyError::from) - }) - }) -} diff --git a/runtime/ops/mod.rs b/runtime/ops/mod.rs index ce7c52d64..0564474b1 100644 --- a/runtime/ops/mod.rs +++ b/runtime/ops/mod.rs @@ -3,7 +3,6 @@ pub mod fs; pub mod fs_events; pub mod http; -pub mod io; pub mod os; pub mod permissions; pub mod process; diff --git a/runtime/ops/process.rs b/runtime/ops/process.rs index 9c7a3243a..ca37c08b7 100644 --- a/runtime/ops/process.rs +++ b/runtime/ops/process.rs @@ -1,12 +1,12 @@ // Copyright 2018-2023 the Deno authors. All rights reserved. MIT license. -use super::io::ChildStderrResource; -use super::io::ChildStdinResource; -use super::io::ChildStdoutResource; -use super::io::StdFileResource; use crate::permissions::PermissionsContainer; use deno_core::error::AnyError; use deno_core::op; +use deno_io::ChildStderrResource; +use deno_io::ChildStdinResource; +use deno_io::ChildStdoutResource; +use deno_io::StdFileResource; use deno_core::serde_json; use deno_core::AsyncMutFuture; diff --git a/runtime/ops/spawn.rs b/runtime/ops/spawn.rs index 089a53c23..884c46229 100644 --- a/runtime/ops/spawn.rs +++ b/runtime/ops/spawn.rs @@ -1,8 +1,5 @@ // Copyright 2018-2023 the Deno authors. All rights reserved. MIT license. -use super::io::ChildStderrResource; -use super::io::ChildStdinResource; -use super::io::ChildStdoutResource; use super::process::Stdio; use super::process::StdioOrRid; use crate::permissions::PermissionsContainer; @@ -13,6 +10,9 @@ use deno_core::OpState; use deno_core::Resource; use deno_core::ResourceId; use deno_core::ZeroCopyBuf; +use deno_io::ChildStderrResource; +use deno_io::ChildStdinResource; +use deno_io::ChildStdoutResource; use serde::Deserialize; use serde::Serialize; use std::borrow::Cow; diff --git a/runtime/ops/tty.rs b/runtime/ops/tty.rs index 739b84ab3..bd9331d4f 100644 --- a/runtime/ops/tty.rs +++ b/runtime/ops/tty.rs @@ -1,10 +1,10 @@ // Copyright 2018-2023 the Deno authors. All rights reserved. MIT license. -use super::io::StdFileResource; use deno_core::error::AnyError; use deno_core::op; use deno_core::Extension; use deno_core::OpState; +use deno_io::StdFileResource; use std::io::Error; #[cfg(unix)] diff --git a/runtime/web_worker.rs b/runtime/web_worker.rs index d52ea337f..fbf10905f 100644 --- a/runtime/web_worker.rs +++ b/runtime/web_worker.rs @@ -2,7 +2,6 @@ use crate::colors; use crate::inspector_server::InspectorServer; use crate::ops; -use crate::ops::io::Stdio; use crate::permissions::PermissionsContainer; use crate::tokio_util::run_local; use crate::worker::FormatJsErrorFn; @@ -33,6 +32,7 @@ use deno_core::RuntimeOptions; use deno_core::SharedArrayBufferStore; use deno_core::Snapshot; use deno_core::SourceMapGetter; +use deno_io::Stdio; use deno_node::RequireNpmResolver; use deno_tls::rustls::RootCertStore; use deno_web::create_entangled_message_port; @@ -426,8 +426,7 @@ impl WebWorker { // Extensions providing Deno.* features ops::fs_events::init(), ops::fs::init::(), - ops::io::init(), - ops::io::init_stdio(options.stdio), + deno_io::init(options.stdio), deno_tls::init(), deno_net::init::( options.root_cert_store.clone(), diff --git a/runtime/worker.rs b/runtime/worker.rs index 1de7a8617..1f567837a 100644 --- a/runtime/worker.rs +++ b/runtime/worker.rs @@ -28,6 +28,7 @@ use deno_core::RuntimeOptions; use deno_core::SharedArrayBufferStore; use deno_core::Snapshot; use deno_core::SourceMapGetter; +use deno_io::Stdio; use deno_node::RequireNpmResolver; use deno_tls::rustls::RootCertStore; use deno_web::BlobStore; @@ -35,7 +36,6 @@ use log::debug; use crate::inspector_server::InspectorServer; use crate::ops; -use crate::ops::io::Stdio; use crate::permissions::PermissionsContainer; use crate::BootstrapOptions; @@ -257,8 +257,7 @@ impl MainWorker { ops::spawn::init(), ops::fs_events::init(), ops::fs::init::(), - ops::io::init(), - ops::io::init_stdio(options.stdio), + deno_io::init(options.stdio), deno_tls::init(), deno_net::init::( options.root_cert_store.clone(), -- cgit v1.2.3