diff options
author | Bartek IwaĆczuk <biwanczuk@gmail.com> | 2023-03-04 20:39:48 -0400 |
---|---|---|
committer | GitHub <noreply@github.com> | 2023-03-05 00:39:48 +0000 |
commit | 7afa3aceb04e6b2c8820b7326d6f648db6b571c6 (patch) | |
tree | 951ff96a156dee83cd4fd7615e3b422ca0cda448 /ext/io/lib.rs | |
parent | 4894e500cf8c60c2971d186d6a21b994bf36e7d1 (diff) |
refactor(runtime): factor out deno_io extension crate (#18001)
This is a prerequisite to factor out FS ops to a separate crate.
Diffstat (limited to 'ext/io/lib.rs')
-rw-r--r-- | ext/io/lib.rs | 713 |
1 files changed, 713 insertions, 0 deletions
diff --git a/ext/io/lib.rs b/ext/io/lib.rs new file mode 100644 index 000000000..f8dde139f --- /dev/null +++ b/ext/io/lib.rs @@ -0,0 +1,713 @@ +// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license. + +use deno_core::error::resource_unavailable; +use deno_core::error::AnyError; +use deno_core::include_js_files; +use deno_core::op; +use deno_core::parking_lot::Mutex; +use deno_core::AsyncMutFuture; +use deno_core::AsyncRefCell; +use deno_core::AsyncResult; +use deno_core::BufMutView; +use deno_core::BufView; +use deno_core::CancelHandle; +use deno_core::CancelTryFuture; +use deno_core::Extension; +use deno_core::OpState; +use deno_core::RcRef; +use deno_core::Resource; +use deno_core::ResourceId; +use deno_core::TaskQueue; +use once_cell::sync::Lazy; +use std::borrow::Cow; +use std::cell::RefCell; +use std::fs::File as StdFile; +use std::io::ErrorKind; +use std::io::Read; +use std::io::Write; +use std::rc::Rc; +use std::sync::Arc; +use tokio::io::AsyncRead; +use tokio::io::AsyncReadExt; +use tokio::io::AsyncWrite; +use tokio::io::AsyncWriteExt; +use tokio::process; + +#[cfg(unix)] +use std::os::unix::io::FromRawFd; + +#[cfg(windows)] +use std::os::windows::io::FromRawHandle; +#[cfg(windows)] +use winapi::um::processenv::GetStdHandle; +#[cfg(windows)] +use winapi::um::winbase; + +// Store the stdio fd/handles in global statics in order to keep them +// alive for the duration of the application since the last handle/fd +// being dropped will close the corresponding pipe. +#[cfg(unix)] +pub static STDIN_HANDLE: Lazy<StdFile> = Lazy::new(|| { + // SAFETY: corresponds to OS stdin + unsafe { StdFile::from_raw_fd(0) } +}); +#[cfg(unix)] +pub static STDOUT_HANDLE: Lazy<StdFile> = Lazy::new(|| { + // SAFETY: corresponds to OS stdout + unsafe { StdFile::from_raw_fd(1) } +}); +#[cfg(unix)] +pub static STDERR_HANDLE: Lazy<StdFile> = Lazy::new(|| { + // SAFETY: corresponds to OS stderr + unsafe { StdFile::from_raw_fd(2) } +}); + +#[cfg(windows)] +pub static STDIN_HANDLE: Lazy<StdFile> = Lazy::new(|| { + // SAFETY: corresponds to OS stdin + unsafe { StdFile::from_raw_handle(GetStdHandle(winbase::STD_INPUT_HANDLE)) } +}); +#[cfg(windows)] +pub static STDOUT_HANDLE: Lazy<StdFile> = Lazy::new(|| { + // SAFETY: corresponds to OS stdout + unsafe { StdFile::from_raw_handle(GetStdHandle(winbase::STD_OUTPUT_HANDLE)) } +}); +#[cfg(windows)] +pub static STDERR_HANDLE: Lazy<StdFile> = Lazy::new(|| { + // SAFETY: corresponds to OS stderr + unsafe { StdFile::from_raw_handle(GetStdHandle(winbase::STD_ERROR_HANDLE)) } +}); + +pub fn init(stdio: Stdio) -> Extension { + // todo(dsheret): don't do this? Taking out the writers was necessary to prevent invalid handle panics + let stdio = Rc::new(RefCell::new(Some(stdio))); + + Extension::builder("deno_io") + .ops(vec![op_read_sync::decl(), op_write_sync::decl()]) + .dependencies(vec!["deno_web"]) + .esm(include_js_files!("12_io.js",)) + .middleware(|op| match op.name { + "op_print" => op_print::decl(), + _ => op, + }) + .state(move |state| { + let stdio = stdio + .borrow_mut() + .take() + .expect("Extension only supports being used once."); + let t = &mut state.resource_table; + + let rid = t.add(StdFileResource::stdio( + match stdio.stdin { + StdioPipe::Inherit => StdFileResourceInner { + kind: StdFileResourceKind::Stdin, + file: STDIN_HANDLE.try_clone().unwrap(), + }, + StdioPipe::File(pipe) => StdFileResourceInner::file(pipe), + }, + "stdin", + )); + assert_eq!(rid, 0, "stdin must have ResourceId 0"); + + let rid = t.add(StdFileResource::stdio( + match stdio.stdout { + StdioPipe::Inherit => StdFileResourceInner { + kind: StdFileResourceKind::Stdout, + file: STDOUT_HANDLE.try_clone().unwrap(), + }, + StdioPipe::File(pipe) => StdFileResourceInner::file(pipe), + }, + "stdout", + )); + assert_eq!(rid, 1, "stdout must have ResourceId 1"); + + let rid = t.add(StdFileResource::stdio( + match stdio.stderr { + StdioPipe::Inherit => StdFileResourceInner { + kind: StdFileResourceKind::Stderr, + file: STDERR_HANDLE.try_clone().unwrap(), + }, + StdioPipe::File(pipe) => StdFileResourceInner::file(pipe), + }, + "stderr", + )); + assert_eq!(rid, 2, "stderr must have ResourceId 2"); + Ok(()) + }) + .build() +} + +pub enum StdioPipe { + Inherit, + File(StdFile), +} + +impl Default for StdioPipe { + fn default() -> Self { + Self::Inherit + } +} + +impl Clone for StdioPipe { + fn clone(&self) -> Self { + match self { + StdioPipe::Inherit => StdioPipe::Inherit, + StdioPipe::File(pipe) => StdioPipe::File(pipe.try_clone().unwrap()), + } + } +} + +/// Specify how stdin, stdout, and stderr are piped. +/// By default, inherits from the process. +#[derive(Clone, Default)] +pub struct Stdio { + pub stdin: StdioPipe, + pub stdout: StdioPipe, + pub stderr: StdioPipe, +} + +#[cfg(unix)] +use nix::sys::termios; + +#[derive(Default)] +pub struct TtyMetadata { + #[cfg(unix)] + pub mode: Option<termios::Termios>, +} + +#[derive(Default)] +pub struct FileMetadata { + pub tty: TtyMetadata, +} + +#[derive(Debug)] +pub struct WriteOnlyResource<S> { + stream: AsyncRefCell<S>, +} + +impl<S: 'static> From<S> for WriteOnlyResource<S> { + fn from(stream: S) -> Self { + Self { + stream: stream.into(), + } + } +} + +impl<S> WriteOnlyResource<S> +where + S: AsyncWrite + Unpin + 'static, +{ + pub fn borrow_mut(self: &Rc<Self>) -> AsyncMutFuture<S> { + RcRef::map(self, |r| &r.stream).borrow_mut() + } + + async fn write(self: Rc<Self>, data: &[u8]) -> Result<usize, AnyError> { + let mut stream = self.borrow_mut().await; + let nwritten = stream.write(data).await?; + Ok(nwritten) + } + + async fn shutdown(self: Rc<Self>) -> Result<(), AnyError> { + let mut stream = self.borrow_mut().await; + stream.shutdown().await?; + Ok(()) + } + + pub fn into_inner(self) -> S { + self.stream.into_inner() + } +} + +#[derive(Debug)] +pub struct ReadOnlyResource<S> { + stream: AsyncRefCell<S>, + cancel_handle: CancelHandle, +} + +impl<S: 'static> From<S> for ReadOnlyResource<S> { + fn from(stream: S) -> Self { + Self { + stream: stream.into(), + cancel_handle: Default::default(), + } + } +} + +impl<S> ReadOnlyResource<S> +where + S: AsyncRead + Unpin + 'static, +{ + pub fn borrow_mut(self: &Rc<Self>) -> AsyncMutFuture<S> { + RcRef::map(self, |r| &r.stream).borrow_mut() + } + + pub fn cancel_handle(self: &Rc<Self>) -> RcRef<CancelHandle> { + RcRef::map(self, |r| &r.cancel_handle) + } + + pub fn cancel_read_ops(&self) { + self.cancel_handle.cancel() + } + + async fn read(self: Rc<Self>, data: &mut [u8]) -> Result<usize, AnyError> { + let mut rd = self.borrow_mut().await; + let nread = rd.read(data).try_or_cancel(self.cancel_handle()).await?; + Ok(nread) + } + + pub fn into_inner(self) -> S { + self.stream.into_inner() + } +} + +pub type ChildStdinResource = WriteOnlyResource<process::ChildStdin>; + +impl Resource for ChildStdinResource { + fn name(&self) -> Cow<str> { + "childStdin".into() + } + + deno_core::impl_writable!(); + + fn shutdown(self: Rc<Self>) -> AsyncResult<()> { + Box::pin(self.shutdown()) + } +} + +pub type ChildStdoutResource = ReadOnlyResource<process::ChildStdout>; + +impl Resource for ChildStdoutResource { + deno_core::impl_readable_byob!(); + + fn name(&self) -> Cow<str> { + "childStdout".into() + } + + fn close(self: Rc<Self>) { + self.cancel_read_ops(); + } +} + +pub type ChildStderrResource = ReadOnlyResource<process::ChildStderr>; + +impl Resource for ChildStderrResource { + deno_core::impl_readable_byob!(); + + fn name(&self) -> Cow<str> { + "childStderr".into() + } + + fn close(self: Rc<Self>) { + self.cancel_read_ops(); + } +} + +#[derive(Clone, Copy)] +enum StdFileResourceKind { + File, + // For stdout and stderr, we sometimes instead use std::io::stdout() directly, + // because we get some Windows specific functionality for free by using Rust + // std's wrappers. So we take a bit of a complexity hit in order to not + // have to duplicate the functionality in Rust's std/src/sys/windows/stdio.rs + Stdin, + Stdout, + Stderr, +} + +struct StdFileResourceInner { + kind: StdFileResourceKind, + file: StdFile, +} + +impl StdFileResourceInner { + pub fn file(fs_file: StdFile) -> Self { + StdFileResourceInner { + kind: StdFileResourceKind::File, + file: fs_file, + } + } + + pub fn with_file<R>(&mut self, f: impl FnOnce(&mut StdFile) -> R) -> R { + f(&mut self.file) + } + + pub fn try_clone(&self) -> Result<Self, std::io::Error> { + Ok(Self { + kind: self.kind, + file: self.file.try_clone()?, + }) + } + + pub fn write_and_maybe_flush( + &mut self, + buf: &[u8], + ) -> Result<usize, AnyError> { + // Rust will line buffer and we don't want that behavior + // (see https://github.com/denoland/deno/issues/948), so flush stdout and stderr. + // Although an alternative solution could be to bypass Rust's std by + // using the raw fds/handles, it will cause encoding issues on Windows + // that we get solved for free by using Rust's stdio wrappers (see + // std/src/sys/windows/stdio.rs in Rust's source code). + match self.kind { + StdFileResourceKind::File => Ok(self.file.write(buf)?), + StdFileResourceKind::Stdin => { + Err(Into::<std::io::Error>::into(ErrorKind::Unsupported).into()) + } + StdFileResourceKind::Stdout => { + // bypass the file and use std::io::stdout() + let mut stdout = std::io::stdout().lock(); + let nwritten = stdout.write(buf)?; + stdout.flush()?; + Ok(nwritten) + } + StdFileResourceKind::Stderr => { + // bypass the file and use std::io::stderr() + let mut stderr = std::io::stderr().lock(); + let nwritten = stderr.write(buf)?; + stderr.flush()?; + Ok(nwritten) + } + } + } + + pub fn write_all_and_maybe_flush( + &mut self, + buf: &[u8], + ) -> Result<(), AnyError> { + // this method exists instead of using a `Write` implementation + // so that we can acquire the locks once and do both actions + match self.kind { + StdFileResourceKind::File => Ok(self.file.write_all(buf)?), + StdFileResourceKind::Stdin => { + Err(Into::<std::io::Error>::into(ErrorKind::Unsupported).into()) + } + StdFileResourceKind::Stdout => { + // bypass the file and use std::io::stdout() + let mut stdout = std::io::stdout().lock(); + stdout.write_all(buf)?; + stdout.flush()?; + Ok(()) + } + StdFileResourceKind::Stderr => { + // bypass the file and use std::io::stderr() + let mut stderr = std::io::stderr().lock(); + stderr.write_all(buf)?; + stderr.flush()?; + Ok(()) + } + } + } +} + +impl Read for StdFileResourceInner { + fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> { + match self.kind { + StdFileResourceKind::File | StdFileResourceKind::Stdin => { + self.file.read(buf) + } + StdFileResourceKind::Stdout | StdFileResourceKind::Stderr => { + Err(ErrorKind::Unsupported.into()) + } + } + } +} + +struct StdFileResourceCellValue { + inner: StdFileResourceInner, + meta_data: Arc<Mutex<FileMetadata>>, +} + +impl StdFileResourceCellValue { + pub fn try_clone(&self) -> Result<Self, std::io::Error> { + Ok(Self { + inner: self.inner.try_clone()?, + meta_data: self.meta_data.clone(), + }) + } +} + +pub struct StdFileResource { + name: String, + // We can't use an AsyncRefCell here because we need to allow + // access to the resource synchronously at any time and + // asynchronously one at a time in order + cell: RefCell<Option<StdFileResourceCellValue>>, + // Used to keep async actions in order and only allow one + // to occur at a time + cell_async_task_queue: TaskQueue, +} + +impl StdFileResource { + fn stdio(inner: StdFileResourceInner, name: &str) -> Self { + Self { + cell: RefCell::new(Some(StdFileResourceCellValue { + inner, + meta_data: Default::default(), + })), + cell_async_task_queue: Default::default(), + name: name.to_string(), + } + } + + pub fn fs_file(fs_file: StdFile) -> Self { + Self { + cell: RefCell::new(Some(StdFileResourceCellValue { + inner: StdFileResourceInner::file(fs_file), + meta_data: Default::default(), + })), + cell_async_task_queue: Default::default(), + name: "fsFile".to_string(), + } + } + + fn with_inner_and_metadata<TResult>( + self: Rc<Self>, + action: impl FnOnce( + &mut StdFileResourceInner, + &Arc<Mutex<FileMetadata>>, + ) -> Result<TResult, AnyError>, + ) -> Result<TResult, AnyError> { + match self.cell.try_borrow_mut() { + Ok(mut cell) => { + let mut file = cell.take().unwrap(); + let result = action(&mut file.inner, &file.meta_data); + cell.replace(file); + result + } + Err(_) => Err(resource_unavailable()), + } + } + + async fn with_inner_blocking_task<F, R: Send + 'static>( + self: Rc<Self>, + action: F, + ) -> R + where + F: FnOnce(&mut StdFileResourceInner) -> R + Send + 'static, + { + // we want to restrict this to one async action at a time + let _permit = self.cell_async_task_queue.acquire().await; + // we take the value out of the cell, use it on a blocking task, + // then put it back into the cell when we're done + let mut did_take = false; + let mut cell_value = { + let mut cell = self.cell.borrow_mut(); + match cell.as_mut().unwrap().try_clone() { + Ok(value) => value, + Err(_) => { + did_take = true; + cell.take().unwrap() + } + } + }; + let (cell_value, result) = tokio::task::spawn_blocking(move || { + let result = action(&mut cell_value.inner); + (cell_value, result) + }) + .await + .unwrap(); + + if did_take { + // put it back + self.cell.borrow_mut().replace(cell_value); + } + + result + } + + async fn read_byob( + self: Rc<Self>, + mut buf: BufMutView, + ) -> Result<(usize, BufMutView), AnyError> { + self + .with_inner_blocking_task(move |inner| { + let nread = inner.read(&mut buf)?; + Ok((nread, buf)) + }) + .await + } + + async fn write(self: Rc<Self>, data: &[u8]) -> Result<usize, AnyError> { + let buf = data.to_owned(); + self + .with_inner_blocking_task(move |inner| inner.write_and_maybe_flush(&buf)) + .await + } + + async fn write_all(self: Rc<Self>, data: &[u8]) -> Result<(), AnyError> { + let buf = data.to_owned(); + self + .with_inner_blocking_task(move |inner| { + inner.write_all_and_maybe_flush(&buf) + }) + .await + } + + fn with_resource<F, R>( + state: &mut OpState, + rid: ResourceId, + f: F, + ) -> Result<R, AnyError> + where + F: FnOnce(Rc<StdFileResource>) -> Result<R, AnyError>, + { + let resource = state.resource_table.get::<StdFileResource>(rid)?; + f(resource) + } + + pub fn with_file<F, R>( + state: &mut OpState, + rid: ResourceId, + f: F, + ) -> Result<R, AnyError> + where + F: FnOnce(&mut StdFile) -> Result<R, AnyError>, + { + Self::with_resource(state, rid, move |resource| { + resource.with_inner_and_metadata(move |inner, _| inner.with_file(f)) + }) + } + + pub fn with_file_and_metadata<F, R>( + state: &mut OpState, + rid: ResourceId, + f: F, + ) -> Result<R, AnyError> + where + F: FnOnce(&mut StdFile, &Arc<Mutex<FileMetadata>>) -> Result<R, AnyError>, + { + Self::with_resource(state, rid, move |resource| { + resource.with_inner_and_metadata(move |inner, metadata| { + inner.with_file(move |file| f(file, metadata)) + }) + }) + } + + pub async fn with_file_blocking_task<F, R: Send + 'static>( + state: Rc<RefCell<OpState>>, + rid: ResourceId, + f: F, + ) -> Result<R, AnyError> + where + F: (FnOnce(&mut StdFile) -> Result<R, AnyError>) + Send + 'static, + { + let resource = state + .borrow_mut() + .resource_table + .get::<StdFileResource>(rid)?; + + resource + .with_inner_blocking_task(move |inner| inner.with_file(f)) + .await + } + + pub fn clone_file( + state: &mut OpState, + rid: ResourceId, + ) -> Result<StdFile, AnyError> { + Self::with_file(state, rid, move |std_file| { + std_file.try_clone().map_err(AnyError::from) + }) + } + + pub fn as_stdio( + state: &mut OpState, + rid: u32, + ) -> Result<std::process::Stdio, AnyError> { + Self::with_resource(state, rid, |resource| { + resource.with_inner_and_metadata(|inner, _| match inner.kind { + StdFileResourceKind::File => { + let file = inner.file.try_clone()?; + Ok(file.into()) + } + _ => Ok(std::process::Stdio::inherit()), + }) + }) + } +} + +impl Resource for StdFileResource { + fn name(&self) -> Cow<str> { + self.name.as_str().into() + } + + fn read(self: Rc<Self>, limit: usize) -> AsyncResult<deno_core::BufView> { + Box::pin(async move { + let vec = vec![0; limit]; + let buf = BufMutView::from(vec); + let (nread, buf) = self.read_byob(buf).await?; + let mut vec = buf.unwrap_vec(); + if vec.len() != nread { + vec.truncate(nread); + } + Ok(BufView::from(vec)) + }) + } + + fn read_byob( + self: Rc<Self>, + buf: deno_core::BufMutView, + ) -> AsyncResult<(usize, deno_core::BufMutView)> { + Box::pin(self.read_byob(buf)) + } + + deno_core::impl_writable!(with_all); + + #[cfg(unix)] + fn backing_fd(self: Rc<Self>) -> Option<std::os::unix::prelude::RawFd> { + use std::os::unix::io::AsRawFd; + self + .with_inner_and_metadata(move |std_file, _| { + Ok(std_file.with_file(|f| f.as_raw_fd())) + }) + .ok() + } +} + +// override op_print to use the stdout and stderr in the resource table +#[op] +pub fn op_print( + state: &mut OpState, + msg: &str, + is_err: bool, +) -> Result<(), AnyError> { + let rid = if is_err { 2 } else { 1 }; + StdFileResource::with_resource(state, rid, move |resource| { + resource.with_inner_and_metadata(|inner, _| { + inner.write_all_and_maybe_flush(msg.as_bytes())?; + Ok(()) + }) + }) +} + +#[op(fast)] +fn op_read_sync( + state: &mut OpState, + rid: u32, + buf: &mut [u8], +) -> Result<u32, AnyError> { + StdFileResource::with_resource(state, rid, move |resource| { + resource.with_inner_and_metadata(|inner, _| { + inner + .read(buf) + .map(|n: usize| n as u32) + .map_err(AnyError::from) + }) + }) +} + +#[op(fast)] +fn op_write_sync( + state: &mut OpState, + rid: u32, + buf: &mut [u8], +) -> Result<u32, AnyError> { + StdFileResource::with_resource(state, rid, move |resource| { + resource.with_inner_and_metadata(|inner, _| { + inner + .write_and_maybe_flush(buf) + .map(|nwritten: usize| nwritten as u32) + .map_err(AnyError::from) + }) + }) +} |