summaryrefslogtreecommitdiff
path: root/runtime/ops/io.rs
diff options
context:
space:
mode:
Diffstat (limited to 'runtime/ops/io.rs')
-rw-r--r--runtime/ops/io.rs172
1 files changed, 117 insertions, 55 deletions
diff --git a/runtime/ops/io.rs b/runtime/ops/io.rs
index 900be1034..0eeab4c24 100644
--- a/runtime/ops/io.rs
+++ b/runtime/ops/io.rs
@@ -3,6 +3,7 @@
use deno_core::error::resource_unavailable;
use deno_core::error::AnyError;
use deno_core::op;
+use deno_core::parking_lot::Mutex;
use deno_core::AsyncMutFuture;
use deno_core::AsyncRefCell;
use deno_core::AsyncResult;
@@ -22,11 +23,13 @@ use std::io::ErrorKind;
use std::io::Read;
use std::io::Write;
use std::rc::Rc;
+use std::sync::Arc;
use tokio::io::AsyncRead;
use tokio::io::AsyncReadExt;
use tokio::io::AsyncWrite;
use tokio::io::AsyncWriteExt;
use tokio::process;
+use tokio::sync::Semaphore;
#[cfg(unix)]
use std::os::unix::io::FromRawFd;
@@ -124,27 +127,30 @@ pub fn init_stdio(stdio: Stdio) -> Extension {
let t = &mut state.resource_table;
t.add(StdFileResource::stdio(
match stdio.stdin {
- StdioPipe::Inherit => {
- StdFileResourceInner::Stdin(STDIN_HANDLE.try_clone().unwrap())
- }
+ StdioPipe::Inherit => StdFileResourceInner {
+ kind: StdFileResourceKind::Stdin,
+ file: STDIN_HANDLE.try_clone().unwrap(),
+ },
StdioPipe::File(pipe) => StdFileResourceInner::file(pipe),
},
"stdin",
));
t.add(StdFileResource::stdio(
match stdio.stdout {
- StdioPipe::Inherit => {
- StdFileResourceInner::Stdout(STDOUT_HANDLE.try_clone().unwrap())
- }
+ StdioPipe::Inherit => StdFileResourceInner {
+ kind: StdFileResourceKind::Stdout,
+ file: STDOUT_HANDLE.try_clone().unwrap(),
+ },
StdioPipe::File(pipe) => StdFileResourceInner::file(pipe),
},
"stdout",
));
t.add(StdFileResource::stdio(
match stdio.stderr {
- StdioPipe::Inherit => {
- StdFileResourceInner::Stderr(STDERR_HANDLE.try_clone().unwrap())
- }
+ StdioPipe::Inherit => StdFileResourceInner {
+ kind: StdFileResourceKind::Stderr,
+ file: STDERR_HANDLE.try_clone().unwrap(),
+ },
StdioPipe::File(pipe) => StdFileResourceInner::file(pipe),
},
"stderr",
@@ -308,29 +314,40 @@ impl Resource for ChildStderrResource {
}
}
-enum StdFileResourceInner {
- File(StdFile),
- Stdin(StdFile),
+#[derive(Clone, Copy)]
+enum StdFileResourceKind {
+ File,
// For stdout and stderr, we sometimes instead use std::io::stdout() directly,
// because we get some Windows specific functionality for free by using Rust
// std's wrappers. So we take a bit of a complexity hit in order to not
// have to duplicate the functionality in Rust's std/src/sys/windows/stdio.rs
- Stdout(StdFile),
- Stderr(StdFile),
+ Stdin,
+ Stdout,
+ Stderr,
+}
+
+struct StdFileResourceInner {
+ kind: StdFileResourceKind,
+ file: StdFile,
}
impl StdFileResourceInner {
pub fn file(fs_file: StdFile) -> Self {
- StdFileResourceInner::File(fs_file)
+ StdFileResourceInner {
+ kind: StdFileResourceKind::File,
+ file: fs_file,
+ }
}
pub fn with_file<R>(&mut self, f: impl FnOnce(&mut StdFile) -> R) -> R {
- match self {
- Self::File(file)
- | Self::Stdin(file)
- | Self::Stdout(file)
- | Self::Stderr(file) => f(file),
- }
+ f(&mut self.file)
+ }
+
+ pub fn try_clone(&self) -> Result<Self, std::io::Error> {
+ Ok(Self {
+ kind: self.kind,
+ file: self.file.try_clone()?,
+ })
}
pub fn write_and_maybe_flush(
@@ -343,19 +360,19 @@ impl StdFileResourceInner {
// using the raw fds/handles, it will cause encoding issues on Windows
// that we get solved for free by using Rust's stdio wrappers (see
// std/src/sys/windows/stdio.rs in Rust's source code).
- match self {
- Self::File(file) => Ok(file.write(buf)?),
- Self::Stdin(_) => {
+ match self.kind {
+ StdFileResourceKind::File => Ok(self.file.write(buf)?),
+ StdFileResourceKind::Stdin => {
Err(Into::<std::io::Error>::into(ErrorKind::Unsupported).into())
}
- Self::Stdout(_) => {
+ StdFileResourceKind::Stdout => {
// bypass the file and use std::io::stdout()
let mut stdout = std::io::stdout().lock();
let nwritten = stdout.write(buf)?;
stdout.flush()?;
Ok(nwritten)
}
- Self::Stderr(_) => {
+ StdFileResourceKind::Stderr => {
// bypass the file and use std::io::stderr()
let mut stderr = std::io::stderr().lock();
let nwritten = stderr.write(buf)?;
@@ -371,19 +388,19 @@ impl StdFileResourceInner {
) -> Result<(), AnyError> {
// this method exists instead of using a `Write` implementation
// so that we can acquire the locks once and do both actions
- match self {
- Self::File(file) => Ok(file.write_all(buf)?),
- Self::Stdin(_) => {
+ match self.kind {
+ StdFileResourceKind::File => Ok(self.file.write_all(buf)?),
+ StdFileResourceKind::Stdin => {
Err(Into::<std::io::Error>::into(ErrorKind::Unsupported).into())
}
- Self::Stdout(_) => {
+ StdFileResourceKind::Stdout => {
// bypass the file and use std::io::stdout()
let mut stdout = std::io::stdout().lock();
stdout.write_all(buf)?;
stdout.flush()?;
Ok(())
}
- Self::Stderr(_) => {
+ StdFileResourceKind::Stderr => {
// bypass the file and use std::io::stderr()
let mut stderr = std::io::stderr().lock();
stderr.write_all(buf)?;
@@ -396,32 +413,61 @@ impl StdFileResourceInner {
impl Read for StdFileResourceInner {
fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
- match self {
- Self::File(file) | Self::Stdin(file) => file.read(buf),
- Self::Stdout(_) | Self::Stderr(_) => Err(ErrorKind::Unsupported.into()),
+ match self.kind {
+ StdFileResourceKind::File | StdFileResourceKind::Stdin => {
+ self.file.read(buf)
+ }
+ StdFileResourceKind::Stdout | StdFileResourceKind::Stderr => {
+ Err(ErrorKind::Unsupported.into())
+ }
}
}
}
+struct StdFileResourceCellValue {
+ inner: StdFileResourceInner,
+ meta_data: Arc<Mutex<FileMetadata>>,
+}
+
+impl StdFileResourceCellValue {
+ pub fn try_clone(&self) -> Result<Self, std::io::Error> {
+ Ok(Self {
+ inner: self.inner.try_clone()?,
+ meta_data: self.meta_data.clone(),
+ })
+ }
+}
+
pub struct StdFileResource {
name: String,
- cell: AsyncRefCell<Option<(StdFileResourceInner, FileMetadata)>>,
+ // We can't use an AsyncRefCell here because we need to allow
+ // access to the resource synchronously at any time and
+ // asynchronously one at a time in order
+ cell: RefCell<Option<StdFileResourceCellValue>>,
+ // Used to keep async actions in order and only allow one
+ // to occurr at a time
+ cell_async_sempahore: Semaphore,
}
impl StdFileResource {
fn stdio(inner: StdFileResourceInner, name: &str) -> Self {
Self {
- cell: AsyncRefCell::new(Some((inner, Default::default()))),
+ cell: RefCell::new(Some(StdFileResourceCellValue {
+ inner,
+ meta_data: Default::default(),
+ })),
+ cell_async_sempahore: Semaphore::new(1),
name: name.to_string(),
}
}
pub fn fs_file(fs_file: StdFile) -> Self {
Self {
- cell: AsyncRefCell::new(Some((
- StdFileResourceInner::file(fs_file),
- Default::default(),
- ))),
+ cell: RefCell::new(Some(StdFileResourceCellValue {
+ inner: StdFileResourceInner::file(fs_file),
+ meta_data: Default::default(),
+ })),
+ cell_async_sempahore: Semaphore::new(1),
name: "fsFile".to_string(),
}
}
@@ -430,17 +476,17 @@ impl StdFileResource {
self: Rc<Self>,
action: impl FnOnce(
&mut StdFileResourceInner,
- &mut FileMetadata,
+ &Arc<Mutex<FileMetadata>>,
) -> Result<TResult, AnyError>,
) -> Result<TResult, AnyError> {
- match RcRef::map(&self, |r| &r.cell).try_borrow_mut() {
- Some(mut cell) => {
+ match self.cell.try_borrow_mut() {
+ Ok(mut cell) => {
let mut file = cell.take().unwrap();
- let result = action(&mut file.0, &mut file.1);
+ let result = action(&mut file.inner, &file.meta_data);
cell.replace(file);
result
}
- None => Err(resource_unavailable()),
+ Err(_) => Err(resource_unavailable()),
}
}
@@ -451,17 +497,33 @@ impl StdFileResource {
where
F: FnOnce(&mut StdFileResourceInner) -> R + Send + 'static,
{
+ // we want to restrict this to one async action at a time
+ let _permit = self.cell_async_sempahore.acquire().await.unwrap();
// we take the value out of the cell, use it on a blocking task,
// then put it back into the cell when we're done
- let mut cell = RcRef::map(&self, |r| &r.cell).borrow_mut().await;
- let mut file = cell.take().unwrap();
- let (file, result) = tokio::task::spawn_blocking(move || {
- let result = action(&mut file.0);
- (file, result)
+ let mut did_take = false;
+ let mut cell_value = {
+ let mut cell = self.cell.borrow_mut();
+ match cell.as_mut().unwrap().try_clone() {
+ Ok(value) => value,
+ Err(_) => {
+ did_take = true;
+ cell.take().unwrap()
+ }
+ }
+ };
+ let (cell_value, result) = tokio::task::spawn_blocking(move || {
+ let result = action(&mut cell_value.inner);
+ (cell_value, result)
})
.await
.unwrap();
- cell.replace(file);
+
+ if did_take {
+ // put it back
+ self.cell.borrow_mut().replace(cell_value);
+ }
+
result
}
@@ -515,7 +577,7 @@ impl StdFileResource {
f: F,
) -> Result<R, AnyError>
where
- F: FnOnce(&mut StdFile, &mut FileMetadata) -> Result<R, AnyError>,
+ F: FnOnce(&mut StdFile, &Arc<Mutex<FileMetadata>>) -> Result<R, AnyError>,
{
Self::with_resource(state, rid, move |resource| {
resource.with_inner_and_metadata(move |inner, metadata| {
@@ -556,9 +618,9 @@ impl StdFileResource {
rid: u32,
) -> Result<std::process::Stdio, AnyError> {
Self::with_resource(state, rid, |resource| {
- resource.with_inner_and_metadata(|inner, _| match inner {
- StdFileResourceInner::File(file) => {
- let file = file.try_clone()?;
+ resource.with_inner_and_metadata(|inner, _| match inner.kind {
+ StdFileResourceKind::File => {
+ let file = inner.file.try_clone()?;
Ok(file.into())
}
_ => Ok(std::process::Stdio::inherit()),