-rw-r--r--  cli/js/errors.ts                |  10
-rw-r--r--  cli/js/lib.deno.ns.d.ts         |   1
-rw-r--r--  cli/js/tests/read_file_test.ts  |   6
-rw-r--r--  cli/op_error.rs                 |   8
-rw-r--r--  cli/ops/dispatch_minimal.rs     |  50
-rw-r--r--  cli/ops/fs.rs                   |  91
-rw-r--r--  cli/ops/io.rs                   | 263
-rw-r--r--  cli/ops/process.rs              |  16
-rw-r--r--  cli/ops/tty.rs                  |  89
-rw-r--r--  cli/state.rs                    |  13
-rw-r--r--  core/ops.rs                     |   3
11 files changed, 352 insertions(+), 198 deletions(-)
diff --git a/cli/js/errors.ts b/cli/js/errors.ts
index fc4021321..69afcf148 100644
--- a/cli/js/errors.ts
+++ b/cli/js/errors.ts
@@ -23,6 +23,7 @@ export enum ErrorKind {
URIError = 20,
TypeError = 21,
Other = 22,
+ Busy = 23,
}
export function getErrorClass(kind: ErrorKind): { new (msg: string): Error } {
@@ -67,6 +68,8 @@ export function getErrorClass(kind: ErrorKind): { new (msg: string): Error } {
return BadResource;
case ErrorKind.Http:
return Http;
+ case ErrorKind.Busy:
+ return Busy;
}
}
@@ -172,6 +175,12 @@ class Http extends Error {
this.name = "Http";
}
}
+class Busy extends Error {
+ constructor(msg: string) {
+ super(msg);
+ this.name = "Busy";
+ }
+}
export const errors = {
NotFound: NotFound,
@@ -191,4 +200,5 @@ export const errors = {
UnexpectedEof: UnexpectedEof,
BadResource: BadResource,
Http: Http,
+ Busy: Busy,
};
diff --git a/cli/js/lib.deno.ns.d.ts b/cli/js/lib.deno.ns.d.ts
index 0ec27dbce..443952c96 100644
--- a/cli/js/lib.deno.ns.d.ts
+++ b/cli/js/lib.deno.ns.d.ts
@@ -1663,6 +1663,7 @@ declare namespace Deno {
UnexpectedEof: ErrorConstructor;
BadResource: ErrorConstructor;
Http: ErrorConstructor;
+ Busy: ErrorConstructor;
};
/** **UNSTABLE**: potentially want names to overlap more with browser.
diff --git a/cli/js/tests/read_file_test.ts b/cli/js/tests/read_file_test.ts
index 1b709b1f4..0d3cdf422 100644
--- a/cli/js/tests/read_file_test.ts
+++ b/cli/js/tests/read_file_test.ts
@@ -57,3 +57,9 @@ unitTest({ perms: { read: false } }, async function readFilePerm(): Promise<
}
assert(caughtError);
});
+
+unitTest({ perms: { read: true } }, function readFileSyncLoop(): void {
+ for (let i = 0; i < 256; i++) {
+ Deno.readFileSync("cli/tests/fixture.json");
+ }
+});
diff --git a/cli/op_error.rs b/cli/op_error.rs
index 258642f8a..a687eed2b 100644
--- a/cli/op_error.rs
+++ b/cli/op_error.rs
@@ -50,6 +50,7 @@ pub enum ErrorKind {
/// if no better context is available.
/// https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/Error
Other = 22,
+ Busy = 23,
}
#[derive(Debug)]
@@ -103,6 +104,13 @@ impl OpError {
pub fn invalid_utf8() -> OpError {
Self::new(ErrorKind::InvalidData, "invalid utf8".to_string())
}
+
+ pub fn resource_unavailable() -> OpError {
+ Self::new(
+ ErrorKind::Busy,
+ "resource is unavailable because it is in use by a promise".to_string(),
+ )
+ }
}
impl Error for OpError {}
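
The new Busy kind and the resource_unavailable() constructor exist because, after this patch, a file's tokio handle can be temporarily checked out of the resource table while an operation is in flight. Ops that find the slot empty report Busy instead of a bad resource id. A minimal sketch of the match that the io.rs, process.rs and tty.rs hunks below all use:

    // Pattern used throughout this patch once StreamResource::FsFile holds an
    // Option: Some means the file is available, None means another op has it.
    match &mut resource_holder.resource {
      StreamResource::FsFile(Some((tokio_file, _metadata))) => {
        // ... use tokio_file ...
      }
      StreamResource::FsFile(None) => return Err(OpError::resource_unavailable()),
      _ => return Err(OpError::bad_resource_id()),
    }
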
diff --git a/cli/ops/dispatch_minimal.rs b/cli/ops/dispatch_minimal.rs
index 2dd4db9ef..299462ca0 100644
--- a/cli/ops/dispatch_minimal.rs
+++ b/cli/ops/dispatch_minimal.rs
@@ -14,7 +14,10 @@ use futures::future::FutureExt;
use std::future::Future;
use std::pin::Pin;
-pub type MinimalOp = dyn Future<Output = Result<i32, OpError>>;
+pub enum MinimalOp {
+ Sync(Result<i32, OpError>),
+ Async(Pin<Box<dyn Future<Output = Result<i32, OpError>>>>),
+}
#[derive(Copy, Clone, Debug, PartialEq)]
// This corresponds to RecordMinimal on the TS side.
@@ -113,7 +116,7 @@ fn test_parse_min_record() {
pub fn minimal_op<D>(d: D) -> impl Fn(&[u8], Option<ZeroCopyBuf>) -> CoreOp
where
- D: Fn(i32, Option<ZeroCopyBuf>) -> Pin<Box<MinimalOp>>,
+ D: Fn(bool, i32, Option<ZeroCopyBuf>) -> MinimalOp,
{
move |control: &[u8], zero_copy: Option<ZeroCopyBuf>| {
let mut record = match parse_min_record(control) {
@@ -131,14 +134,13 @@ where
};
let is_sync = record.promise_id == 0;
let rid = record.arg;
- let min_op = d(rid, zero_copy);
+ let min_op = d(is_sync, rid, zero_copy);
- // Convert to CoreOp
- let fut = async move {
- match min_op.await {
+ match min_op {
+ MinimalOp::Sync(sync_result) => Op::Sync(match sync_result {
Ok(r) => {
record.result = r;
- Ok(record.into())
+ record.into()
}
Err(err) => {
let error_record = ErrorRecord {
@@ -147,20 +149,30 @@ where
error_code: err.kind as i32,
error_message: err.msg.as_bytes().to_owned(),
};
- Ok(error_record.into())
+ error_record.into()
}
+ }),
+ MinimalOp::Async(min_fut) => {
+ // Convert to CoreOp
+ let core_fut = async move {
+ match min_fut.await {
+ Ok(r) => {
+ record.result = r;
+ Ok(record.into())
+ }
+ Err(err) => {
+ let error_record = ErrorRecord {
+ promise_id: record.promise_id,
+ arg: -1,
+ error_code: err.kind as i32,
+ error_message: err.msg.as_bytes().to_owned(),
+ };
+ Ok(error_record.into())
+ }
+ }
+ };
+ Op::Async(core_fut.boxed_local())
}
- };
-
- if is_sync {
- // Warning! Possible deadlocks can occur if we try to wait for a future
- // while in a future. The safe but expensive alternative is to use
- // tokio_util::block_on.
- // This block is only exercised for readSync and writeSync, which I think
- // works since they're simple polling futures.
- Op::Sync(futures::executor::block_on(fut).unwrap())
- } else {
- Op::Async(fut.boxed_local())
}
}
}
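
With MinimalOp the dispatcher no longer blocks on a future for sync calls: the op itself decides, based on the is_sync flag (promise_id == 0), whether to return a finished result or a boxed future. A minimal sketch of an op written against this shape; op_example and do_blocking_work are hypothetical names, only the signature and the enum come from this patch:

    use futures::future::FutureExt; // for boxed_local()

    // Hypothetical op; mirrors the shape of op_read/op_write below.
    fn op_example(
      state: &State,
      is_sync: bool,
      rid: i32,
      _zero_copy: Option<ZeroCopyBuf>,
    ) -> MinimalOp {
      if is_sync {
        // promise_id == 0: produce the result on the current thread.
        MinimalOp::Sync(do_blocking_work(state, rid))
      } else {
        // Otherwise hand dispatch_minimal a boxed future, as before.
        let state = state.clone();
        MinimalOp::Async(async move { do_blocking_work(&state, rid) }.boxed_local())
      }
    }

    // Hypothetical helper standing in for the real read/write logic.
    fn do_blocking_work(_state: &State, _rid: i32) -> Result<i32, OpError> {
      Ok(0)
    }
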
diff --git a/cli/ops/fs.rs b/cli/ops/fs.rs
index 288d56901..6f73b1a2d 100644
--- a/cli/ops/fs.rs
+++ b/cli/ops/fs.rs
@@ -1,16 +1,17 @@
// Copyright 2018-2020 the Deno authors. All rights reserved. MIT license.
// Some deserializer fields are only used on Unix and Windows build fails without it
use super::dispatch_json::{blocking_json, Deserialize, JsonOp, Value};
+use super::io::std_file_resource;
use super::io::{FileMetadata, StreamResource, StreamResourceHolder};
use crate::fs::resolve_from_cwd;
use crate::op_error::OpError;
use crate::ops::dispatch_json::JsonResult;
use crate::state::State;
-use deno_core::*;
+use deno_core::Isolate;
+use deno_core::ZeroCopyBuf;
use futures::future::FutureExt;
use std::convert::From;
use std::env::{current_dir, set_current_dir, temp_dir};
-use std::io::SeekFrom;
use std::path::{Path, PathBuf};
use std::time::UNIX_EPOCH;
@@ -75,22 +76,19 @@ fn op_open(
let path = resolve_from_cwd(Path::new(&args.path))?;
let state_ = state.clone();
- let mut open_options = if let Some(mode) = args.mode {
- #[allow(unused_mut)]
- let mut std_options = std::fs::OpenOptions::new();
+ let mut open_options = std::fs::OpenOptions::new();
+
+ if let Some(mode) = args.mode {
// mode only used if creating the file on Unix
// if not specified, defaults to 0o666
#[cfg(unix)]
{
use std::os::unix::fs::OpenOptionsExt;
- std_options.mode(mode & 0o777);
+ open_options.mode(mode & 0o777);
}
#[cfg(not(unix))]
let _ = mode; // avoid unused warning
- tokio::fs::OpenOptions::from(std_options)
- } else {
- tokio::fs::OpenOptions::new()
- };
+ }
if let Some(options) = args.options {
if options.read {
@@ -165,23 +163,33 @@ fn op_open(
let is_sync = args.promise_id.is_none();
- let fut = async move {
- let fs_file = open_options.open(path).await?;
+ if is_sync {
+ let std_file = open_options.open(path)?;
+ let tokio_file = tokio::fs::File::from_std(std_file);
let mut state = state_.borrow_mut();
let rid = state.resource_table.add(
"fsFile",
- Box::new(StreamResourceHolder::new(StreamResource::FsFile(
- fs_file,
+ Box::new(StreamResourceHolder::new(StreamResource::FsFile(Some((
+ tokio_file,
FileMetadata::default(),
- ))),
+ ))))),
);
- Ok(json!(rid))
- };
-
- if is_sync {
- let buf = futures::executor::block_on(fut)?;
- Ok(JsonOp::Sync(buf))
+ Ok(JsonOp::Sync(json!(rid)))
} else {
+ let fut = async move {
+ let tokio_file = tokio::fs::OpenOptions::from(open_options)
+ .open(path)
+ .await?;
+ let mut state = state_.borrow_mut();
+ let rid = state.resource_table.add(
+ "fsFile",
+ Box::new(StreamResourceHolder::new(StreamResource::FsFile(Some((
+ tokio_file,
+ FileMetadata::default(),
+ ))))),
+ );
+ Ok(json!(rid))
+ };
Ok(JsonOp::Async(fut.boxed_local()))
}
}
@@ -200,6 +208,7 @@ fn op_seek(
args: Value,
_zero_copy: Option<ZeroCopyBuf>,
) -> Result<JsonOp, OpError> {
+ use std::io::{Seek, SeekFrom};
let args: SeekArgs = serde_json::from_value(args)?;
let rid = args.rid as u32;
let offset = args.offset;
@@ -217,29 +226,31 @@ fn op_seek(
}
};
- let state = state.borrow();
- let resource_holder = state
- .resource_table
- .get::<StreamResourceHolder>(rid)
- .ok_or_else(OpError::bad_resource_id)?;
-
- let tokio_file = match resource_holder.resource {
- StreamResource::FsFile(ref file, _) => file,
- _ => return Err(OpError::bad_resource_id()),
- };
- let mut file = futures::executor::block_on(tokio_file.try_clone())?;
-
+ let state = state.clone();
let is_sync = args.promise_id.is_none();
- let fut = async move {
- debug!("op_seek {} {} {}", rid, offset, whence);
- let pos = file.seek(seek_from).await?;
- Ok(json!(pos))
- };
if is_sync {
- let buf = futures::executor::block_on(fut)?;
- Ok(JsonOp::Sync(buf))
+ let mut s = state.borrow_mut();
+ let pos = std_file_resource(&mut s.resource_table, rid, |r| match r {
+ Ok(std_file) => std_file.seek(seek_from).map_err(OpError::from),
+ Err(_) => Err(OpError::type_error(
+ "cannot seek on this type of resource".to_string(),
+ )),
+ })?;
+ Ok(JsonOp::Sync(json!(pos)))
} else {
+ // TODO(ry) This is a fake async op. We need to use poll_fn,
+ // tokio::fs::File::start_seek and tokio::fs::File::poll_complete
+ let fut = async move {
+ let mut s = state.borrow_mut();
+ let pos = std_file_resource(&mut s.resource_table, rid, |r| match r {
+ Ok(std_file) => std_file.seek(seek_from).map_err(OpError::from),
+ Err(_) => Err(OpError::type_error(
+ "cannot seek on this type of resource".to_string(),
+ )),
+ })?;
+ Ok(json!(pos))
+ };
Ok(JsonOp::Async(fut.boxed_local()))
}
}
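
op_open now builds a single std::fs::OpenOptions and only converts it into tokio's builder on the async path; the sync path opens the file directly on the current thread and wraps the handle for the resource table. In sketch form, assuming open_options and path are already prepared as in the hunk above:

    // Sync path: blocking open, then wrap the std handle for the table.
    let std_file = open_options.open(&path)?;
    let tokio_file = tokio::fs::File::from_std(std_file);

    // Async path: convert the same builder and await the open instead.
    // let tokio_file = tokio::fs::OpenOptions::from(open_options).open(&path).await?;
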
diff --git a/cli/ops/io.rs b/cli/ops/io.rs
index 0c9a83883..e045eddfb 100644
--- a/cli/ops/io.rs
+++ b/cli/ops/io.rs
@@ -27,12 +27,14 @@ use std::os::windows::io::FromRawHandle;
extern crate winapi;
lazy_static! {
- /// Due to portability issues on Windows handle to stdout is created from raw file descriptor.
- /// The caveat of that approach is fact that when this handle is dropped underlying
- /// file descriptor is closed - that is highly not desirable in case of stdout.
- /// That's why we store this global handle that is then cloned when obtaining stdio
- /// for process. In turn when resource table is dropped storing reference to that handle,
- /// the handle itself won't be closed (so Deno.core.print) will still work.
+ /// Due to portability issues on Windows, the handle to stdout is created
+ /// from a raw file descriptor. The caveat of that approach is that when the
+ /// handle is dropped, the underlying file descriptor is closed, which is
+ /// highly undesirable in the case of stdout. That's why we store this global
+ /// handle that is cloned when obtaining stdio for a process. In turn, when a
+ /// resource table holding a reference to that handle is dropped, the handle
+ /// itself won't be closed (so Deno.core.print will still work).
+ // TODO(ry) It should be possible to close stdout.
static ref STDOUT_HANDLE: std::fs::File = {
#[cfg(not(windows))]
let stdout = unsafe { std::fs::File::from_raw_fd(1) };
@@ -42,9 +44,19 @@ lazy_static! {
winapi::um::winbase::STD_OUTPUT_HANDLE,
))
};
-
stdout
};
+ static ref STDERR_HANDLE: std::fs::File = {
+ #[cfg(not(windows))]
+ let stderr = unsafe { std::fs::File::from_raw_fd(2) };
+ #[cfg(windows)]
+ let stderr = unsafe {
+ std::fs::File::from_raw_handle(winapi::um::processenv::GetStdHandle(
+ winapi::um::winbase::STD_ERROR_HANDLE,
+ ))
+ };
+ stderr
+ };
}
pub fn init(i: &mut Isolate, s: &State) {
@@ -67,14 +79,14 @@ pub fn get_stdio() -> (
tokio::io::stdin(),
TTYMetadata::default(),
));
- let stdout = StreamResourceHolder::new(StreamResource::Stdout({
- let stdout = STDOUT_HANDLE
- .try_clone()
- .expect("Unable to clone stdout handle");
- tokio::fs::File::from_std(stdout)
- }));
- let stderr =
- StreamResourceHolder::new(StreamResource::Stderr(tokio::io::stderr()));
+ let stdout = StreamResourceHolder::new(StreamResource::FsFile(Some({
+ let stdout = STDOUT_HANDLE.try_clone().unwrap();
+ (tokio::fs::File::from_std(stdout), FileMetadata::default())
+ })));
+ let stderr = StreamResourceHolder::new(StreamResource::FsFile(Some({
+ let stderr = STDERR_HANDLE.try_clone().unwrap();
+ (tokio::fs::File::from_std(stderr), FileMetadata::default())
+ })));
(stdin, stdout, stderr)
}
@@ -144,9 +156,7 @@ impl StreamResourceHolder {
pub enum StreamResource {
Stdin(tokio::io::Stdin, TTYMetadata),
- Stdout(tokio::fs::File),
- Stderr(tokio::io::Stderr),
- FsFile(tokio::fs::File, FileMetadata),
+ FsFile(Option<(tokio::fs::File, FileMetadata)>),
TcpStream(tokio::net::TcpStream),
#[cfg(not(windows))]
UnixStream(tokio::net::UnixStream),
@@ -182,7 +192,8 @@ impl DenoAsyncRead for StreamResource {
) -> Poll<Result<usize, OpError>> {
use StreamResource::*;
let f: &mut dyn UnpinAsyncRead = match self {
- FsFile(f, _) => f,
+ FsFile(Some((f, _))) => f,
+ FsFile(None) => return Poll::Ready(Err(OpError::resource_unavailable())),
Stdin(f, _) => f,
TcpStream(f) => f,
#[cfg(not(windows))]
@@ -201,43 +212,65 @@ impl DenoAsyncRead for StreamResource {
pub fn op_read(
state: &State,
+ is_sync: bool,
rid: i32,
zero_copy: Option<ZeroCopyBuf>,
-) -> Pin<Box<MinimalOp>> {
+) -> MinimalOp {
debug!("read rid={}", rid);
if zero_copy.is_none() {
- return futures::future::err(no_buffer_specified()).boxed_local();
+ return MinimalOp::Sync(Err(no_buffer_specified()));
}
let state = state.clone();
let mut buf = zero_copy.unwrap();
- poll_fn(move |cx| {
- let resource_table = &mut state.borrow_mut().resource_table;
- let resource_holder = resource_table
- .get_mut::<StreamResourceHolder>(rid as u32)
- .ok_or_else(OpError::bad_resource_id)?;
-
- let mut task_tracker_id: Option<usize> = None;
- let nread = match resource_holder
- .resource
- .poll_read(cx, &mut buf.as_mut()[..])
- .map_err(OpError::from)
- {
- Poll::Ready(t) => {
- if let Some(id) = task_tracker_id {
- resource_holder.untrack_task(id);
+ if is_sync {
+ MinimalOp::Sync({
+ // First we look up the rid in the resource table.
+ let resource_table = &mut state.borrow_mut().resource_table;
+ std_file_resource(resource_table, rid as u32, move |r| match r {
+ Ok(std_file) => {
+ use std::io::Read;
+ std_file
+ .read(&mut buf)
+ .map(|n: usize| n as i32)
+ .map_err(OpError::from)
}
- t
- }
- Poll::Pending => {
- task_tracker_id.replace(resource_holder.track_task(cx)?);
- return Poll::Pending;
- }
- }?;
- Poll::Ready(Ok(nread as i32))
- })
- .boxed_local()
+ Err(_) => Err(OpError::type_error(
+ "sync read not allowed on this resource".to_string(),
+ )),
+ })
+ })
+ } else {
+ MinimalOp::Async(
+ poll_fn(move |cx| {
+ let resource_table = &mut state.borrow_mut().resource_table;
+ let resource_holder = resource_table
+ .get_mut::<StreamResourceHolder>(rid as u32)
+ .ok_or_else(OpError::bad_resource_id)?;
+
+ let mut task_tracker_id: Option<usize> = None;
+ let nread = match resource_holder
+ .resource
+ .poll_read(cx, &mut buf.as_mut()[..])
+ .map_err(OpError::from)
+ {
+ Poll::Ready(t) => {
+ if let Some(id) = task_tracker_id {
+ resource_holder.untrack_task(id);
+ }
+ t
+ }
+ Poll::Pending => {
+ task_tracker_id.replace(resource_holder.track_task(cx)?);
+ return Poll::Pending;
+ }
+ }?;
+ Poll::Ready(Ok(nread as i32))
+ })
+ .boxed_local(),
+ )
+ }
}
/// `DenoAsyncWrite` is the same as the `tokio_io::AsyncWrite` trait
@@ -262,9 +295,8 @@ impl DenoAsyncWrite for StreamResource {
) -> Poll<Result<usize, OpError>> {
use StreamResource::*;
let f: &mut dyn UnpinAsyncWrite = match self {
- FsFile(f, _) => f,
- Stdout(f) => f,
- Stderr(f) => f,
+ FsFile(Some((f, _))) => f,
+ FsFile(None) => return Poll::Pending,
TcpStream(f) => f,
#[cfg(not(windows))]
UnixStream(f) => f,
@@ -281,9 +313,8 @@ impl DenoAsyncWrite for StreamResource {
fn poll_flush(&mut self, cx: &mut Context) -> Poll<Result<(), OpError>> {
use StreamResource::*;
let f: &mut dyn UnpinAsyncWrite = match self {
- FsFile(f, _) => f,
- Stdout(f) => f,
- Stderr(f) => f,
+ FsFile(Some((f, _))) => f,
+ FsFile(None) => return Poll::Pending,
TcpStream(f) => f,
#[cfg(not(windows))]
UnixStream(f) => f,
@@ -304,41 +335,121 @@ impl DenoAsyncWrite for StreamResource {
pub fn op_write(
state: &State,
+ is_sync: bool,
rid: i32,
zero_copy: Option<ZeroCopyBuf>,
-) -> Pin<Box<MinimalOp>> {
+) -> MinimalOp {
debug!("write rid={}", rid);
if zero_copy.is_none() {
- return futures::future::err(no_buffer_specified()).boxed_local();
+ return MinimalOp::Sync(Err(no_buffer_specified()));
}
let state = state.clone();
let buf = zero_copy.unwrap();
- async move {
- let nwritten = poll_fn(|cx| {
- let resource_table = &mut state.borrow_mut().resource_table;
- let resource_holder = resource_table
- .get_mut::<StreamResourceHolder>(rid as u32)
- .ok_or_else(OpError::bad_resource_id)?;
- resource_holder.resource.poll_write(cx, &buf.as_ref()[..])
- })
- .await?;
-
- // TODO(bartlomieju): this step was added during upgrade to Tokio 0.2
- // and the reasons for the need to explicitly flush are not fully known.
- // Figure out why it's needed and preferably remove it.
- // https://github.com/denoland/deno/issues/3565
- poll_fn(|cx| {
+ if is_sync {
+ MinimalOp::Sync({
+ // First we look up the rid in the resource table.
let resource_table = &mut state.borrow_mut().resource_table;
- let resource_holder = resource_table
- .get_mut::<StreamResourceHolder>(rid as u32)
- .ok_or_else(OpError::bad_resource_id)?;
- resource_holder.resource.poll_flush(cx)
+ std_file_resource(resource_table, rid as u32, move |r| match r {
+ Ok(std_file) => {
+ use std::io::Write;
+ std_file
+ .write(&buf)
+ .map(|nwritten: usize| nwritten as i32)
+ .map_err(OpError::from)
+ }
+ Err(_) => Err(OpError::type_error(
+ "sync read not allowed on this resource".to_string(),
+ )),
+ })
})
- .await?;
+ } else {
+ MinimalOp::Async(
+ async move {
+ let nwritten = poll_fn(|cx| {
+ let resource_table = &mut state.borrow_mut().resource_table;
+ let resource_holder = resource_table
+ .get_mut::<StreamResourceHolder>(rid as u32)
+ .ok_or_else(OpError::bad_resource_id)?;
+ resource_holder.resource.poll_write(cx, &buf.as_ref()[..])
+ })
+ .await?;
+
+ // TODO(bartlomieju): this step was added during upgrade to Tokio 0.2
+ // and the reasons for the need to explicitly flush are not fully known.
+ // Figure out why it's needed and preferably remove it.
+ // https://github.com/denoland/deno/issues/3565
+ poll_fn(|cx| {
+ let resource_table = &mut state.borrow_mut().resource_table;
+ let resource_holder = resource_table
+ .get_mut::<StreamResourceHolder>(rid as u32)
+ .ok_or_else(OpError::bad_resource_id)?;
+ resource_holder.resource.poll_flush(cx)
+ })
+ .await?;
+
+ Ok(nwritten as i32)
+ }
+ .boxed_local(),
+ )
+ }
+}
- Ok(nwritten as i32)
+/// Helper function for operating on a std::fs::File stored in the resource table.
+///
+/// We store file system file resources as tokio::fs::File, so this is a little
+/// utility function that gets a std::fs:File when you need to do blocking
+/// operations.
+///
+/// Returns ErrorKind::Busy if the resource is being used by another op.
+pub fn std_file_resource<F, T>(
+ resource_table: &mut ResourceTable,
+ rid: u32,
+ mut f: F,
+) -> Result<T, OpError>
+where
+ F: FnMut(
+ Result<&mut std::fs::File, &mut StreamResource>,
+ ) -> Result<T, OpError>,
+{
+ // First we look up the rid in the resource table.
+ let mut r = resource_table.get_mut::<StreamResourceHolder>(rid);
+ if let Some(ref mut resource_holder) = r {
+ // Sync write only works for FsFile. It doesn't make sense to do this
+ // for non-blocking sockets. So we error out if not FsFile.
+ match &mut resource_holder.resource {
+ StreamResource::FsFile(option_file_metadata) => {
+ // The object in the resource table is a tokio::fs::File - but in
+ // order to do a blocking write on it, we must turn it into a
+ // std::fs::File. Hopefully this code compiles down to nothing.
+ if let Some((tokio_file, metadata)) = option_file_metadata.take() {
+ match tokio_file.try_into_std() {
+ Ok(mut std_file) => {
+ let result = f(Ok(&mut std_file));
+ // Turn the std_file handle back into a tokio file, put it back
+ // in the resource table.
+ let tokio_file = tokio::fs::File::from_std(std_file);
+ resource_holder.resource =
+ StreamResource::FsFile(Some((tokio_file, metadata)));
+ // return the result.
+ result
+ }
+ Err(tokio_file) => {
+ // This function will return an error containing the file if
+ // some operation is in-flight.
+ resource_holder.resource =
+ StreamResource::FsFile(Some((tokio_file, metadata)));
+ Err(OpError::resource_unavailable())
+ }
+ }
+ } else {
+ Err(OpError::resource_unavailable())
+ }
+ }
+ _ => f(Err(&mut resource_holder.resource)),
+ }
+ } else {
+ Err(OpError::bad_resource_id())
}
- .boxed_local()
}
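
std_file_resource is the pivot for all of the new sync paths: it takes the tokio::fs::File out of the resource slot, converts it with try_into_std(), runs the closure on the blocking std::fs::File, and puts the handle back before returning. While the file is checked out the slot reads FsFile(None), which is why concurrent ops see Busy. A small sketch of a caller, patterned after op_seek's sync branch; sync_file_len is a hypothetical helper, not part of this patch:

    /// Hypothetical caller: blocking metadata lookup on a file resource.
    fn sync_file_len(
      resource_table: &mut ResourceTable,
      rid: u32,
    ) -> Result<u64, OpError> {
      std_file_resource(resource_table, rid, |r| match r {
        // While this closure runs, the table entry is FsFile(None), so any
        // concurrent op on the same rid gets OpError::resource_unavailable().
        Ok(std_file) => std_file.metadata().map(|m| m.len()).map_err(OpError::from),
        Err(_) => Err(OpError::type_error(
          "this resource is not a file".to_string(),
        )),
      })
    }
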
diff --git a/cli/ops/process.rs b/cli/ops/process.rs
index 096b09bd0..980147832 100644
--- a/cli/ops/process.rs
+++ b/cli/ops/process.rs
@@ -26,13 +26,15 @@ fn clone_file(rid: u32, state: &State) -> Result<std::fs::File, OpError> {
.resource_table
.get_mut::<StreamResourceHolder>(rid)
.ok_or_else(OpError::bad_resource_id)?;
- let file = match repr_holder.resource {
- StreamResource::FsFile(ref mut file, _) => file,
- _ => return Err(OpError::bad_resource_id()),
- };
- let tokio_file = futures::executor::block_on(file.try_clone())?;
- let std_file = futures::executor::block_on(tokio_file.into_std());
- Ok(std_file)
+ match repr_holder.resource {
+ StreamResource::FsFile(Some((ref mut file, _))) => {
+ let tokio_file = futures::executor::block_on(file.try_clone())?;
+ let std_file = futures::executor::block_on(tokio_file.into_std());
+ Ok(std_file)
+ }
+ StreamResource::FsFile(None) => Err(OpError::resource_unavailable()),
+ _ => Err(OpError::bad_resource_id()),
+ }
}
fn subprocess_stdio_map(s: &str) -> std::process::Stdio {
diff --git a/cli/ops/tty.rs b/cli/ops/tty.rs
index ebe6c39e3..461c1ca30 100644
--- a/cli/ops/tty.rs
+++ b/cli/ops/tty.rs
@@ -1,4 +1,5 @@
use super::dispatch_json::JsonOp;
+use super::io::std_file_resource;
use super::io::{StreamResource, StreamResourceHolder};
use crate::op_error::OpError;
use crate::ops::json_op;
@@ -73,13 +74,16 @@ pub fn op_set_raw(
// For now, only stdin.
let handle = match &resource_holder.unwrap().resource {
StreamResource::Stdin(_, _) => std::io::stdin().as_raw_handle(),
- StreamResource::FsFile(f, _) => {
+ StreamResource::FsFile(None) => {
+ return Err(OpError::resource_unavailable())
+ }
+ StreamResource::FsFile(Some((f, _))) => {
let tokio_file = futures::executor::block_on(f.try_clone())?;
let std_file = futures::executor::block_on(tokio_file.into_std());
std_file.as_raw_handle()
}
_ => {
- return Err(OpError::other("Not supported".to_owned()));
+ return Err(OpError::bad_resource_id());
}
};
@@ -122,11 +126,14 @@ pub fn op_set_raw(
StreamResource::Stdin(_, ref mut metadata) => {
(std::io::stdin().as_raw_fd(), &mut metadata.mode)
}
- StreamResource::FsFile(f, ref mut metadata) => {
+ StreamResource::FsFile(Some((f, ref mut metadata))) => {
let tokio_file = futures::executor::block_on(f.try_clone())?;
let std_file = futures::executor::block_on(tokio_file.into_std());
(std_file.as_raw_fd(), &mut metadata.tty.mode)
}
+ StreamResource::FsFile(None) => {
+ return Err(OpError::resource_unavailable())
+ }
_ => {
return Err(OpError::other("Not supported".to_owned()));
}
@@ -165,13 +172,16 @@ pub fn op_set_raw(
StreamResource::Stdin(_, ref mut metadata) => {
(std::io::stdin().as_raw_fd(), &mut metadata.mode)
}
- StreamResource::FsFile(f, ref mut metadata) => {
+ StreamResource::FsFile(Some((f, ref mut metadata))) => {
let tokio_file = futures::executor::block_on(f.try_clone())?;
let std_file = futures::executor::block_on(tokio_file.into_std());
(std_file.as_raw_fd(), &mut metadata.tty.mode)
}
+ StreamResource::FsFile(None) => {
+ return Err(OpError::resource_unavailable());
+ }
_ => {
- return Err(OpError::other("Not supported".to_owned()));
+ return Err(OpError::bad_resource_id());
}
};
@@ -190,55 +200,36 @@ struct IsattyArgs {
}
pub fn op_isatty(
- state_: &State,
+ state: &State,
args: Value,
_zero_copy: Option<ZeroCopyBuf>,
) -> Result<JsonOp, OpError> {
let args: IsattyArgs = serde_json::from_value(args)?;
let rid = args.rid;
- let state = state_.borrow_mut();
- if !state.resource_table.has(rid) {
- return Err(OpError::bad_resource_id());
- }
-
- let resource_holder = state.resource_table.get::<StreamResourceHolder>(rid);
- if resource_holder.is_none() {
- return Ok(JsonOp::Sync(json!(false)));
- }
-
- match &resource_holder.unwrap().resource {
- StreamResource::Stdin(_, _) => {
- Ok(JsonOp::Sync(json!(atty::is(atty::Stream::Stdin))))
- }
- StreamResource::Stdout(_) => {
- Ok(JsonOp::Sync(json!(atty::is(atty::Stream::Stdout))))
- }
- StreamResource::Stderr(_) => {
- Ok(JsonOp::Sync(json!(atty::is(atty::Stream::Stderr))))
- }
- StreamResource::FsFile(f, _) => {
- let tokio_file = futures::executor::block_on(f.try_clone())?;
- let std_file = futures::executor::block_on(tokio_file.into_std());
- #[cfg(windows)]
- {
- use winapi::um::consoleapi;
-
- let handle = get_windows_handle(&std_file)?;
- let mut test_mode: DWORD = 0;
- // If I cannot get mode out of console, it is not a console.
- let result =
- unsafe { consoleapi::GetConsoleMode(handle, &mut test_mode) != 0 };
- Ok(JsonOp::Sync(json!(result)))
+ let resource_table = &mut state.borrow_mut().resource_table;
+ let isatty: bool =
+ std_file_resource(resource_table, rid as u32, move |r| match r {
+ Ok(std_file) => {
+ #[cfg(windows)]
+ {
+ use winapi::um::consoleapi;
+
+ let handle = get_windows_handle(&std_file)?;
+ let mut test_mode: DWORD = 0;
+ // If I cannot get mode out of console, it is not a console.
+ Ok(unsafe { consoleapi::GetConsoleMode(handle, &mut test_mode) != 0 })
+ }
+ #[cfg(unix)]
+ {
+ use std::os::unix::io::AsRawFd;
+ let raw_fd = std_file.as_raw_fd();
+ Ok(unsafe { libc::isatty(raw_fd as libc::c_int) == 1 })
+ }
}
- #[cfg(unix)]
- {
- use std::os::unix::io::AsRawFd;
- let raw_fd = std_file.as_raw_fd();
- let result = unsafe { libc::isatty(raw_fd as libc::c_int) == 1 };
- Ok(JsonOp::Sync(json!(result)))
- }
- }
- _ => Ok(JsonOp::Sync(json!(false))),
- }
+ Err(StreamResource::FsFile(_)) => unreachable!(),
+ Err(StreamResource::Stdin(_, _)) => Ok(atty::is(atty::Stream::Stdin)),
+ _ => Ok(false),
+ })?;
+ Ok(JsonOp::Sync(json!(isatty)))
}
diff --git a/cli/state.rs b/cli/state.rs
index 228ef1200..f67d5e9a6 100644
--- a/cli/state.rs
+++ b/cli/state.rs
@@ -145,15 +145,15 @@ impl State {
pub fn stateful_minimal_op<D>(
&self,
dispatcher: D,
- ) -> impl Fn(i32, Option<ZeroCopyBuf>) -> Pin<Box<MinimalOp>>
+ ) -> impl Fn(bool, i32, Option<ZeroCopyBuf>) -> MinimalOp
where
- D: Fn(&State, i32, Option<ZeroCopyBuf>) -> Pin<Box<MinimalOp>>,
+ D: Fn(&State, bool, i32, Option<ZeroCopyBuf>) -> MinimalOp,
{
let state = self.clone();
-
- move |rid: i32, zero_copy: Option<ZeroCopyBuf>| -> Pin<Box<MinimalOp>> {
- dispatcher(&state, rid, zero_copy)
- }
+ move |is_sync: bool,
+ rid: i32,
+ zero_copy: Option<ZeroCopyBuf>|
+ -> MinimalOp { dispatcher(&state, is_sync, rid, zero_copy) }
}
/// This is a special function that provides `state` argument to dispatcher.
@@ -169,7 +169,6 @@ impl State {
D: Fn(&State, Value, Option<ZeroCopyBuf>) -> Result<JsonOp, OpError>,
{
let state = self.clone();
-
move |args: Value,
zero_copy: Option<ZeroCopyBuf>|
-> Result<JsonOp, OpError> { dispatcher(&state, args, zero_copy) }
diff --git a/core/ops.rs b/core/ops.rs
index 6b1aec6ab..16807196e 100644
--- a/core/ops.rs
+++ b/core/ops.rs
@@ -17,6 +17,9 @@ pub(crate) type PendingOpFuture =
pub type OpResult<E> = Result<Op<E>, E>;
+// TODO(ry) Op::Async should be Op::Async(Pin<Box<dyn Future<Output = Buf>>>)
+// The error should be encoded in the Buf. Notice how Sync ops do not return a
+// result. The Sync and Async should be symmetrical!
pub enum Op<E> {
Sync(Buf),
Async(OpAsyncFuture<E>),
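
The TODO above describes a more symmetric Op type in which async ops also resolve to a plain Buf and errors are encoded into the buffer, removing the error type parameter. A sketch of that hypothetical shape, not implemented in this patch:

    use std::future::Future;
    use std::pin::Pin;

    // Hypothetical symmetric form described by the TODO: Sync and Async both
    // produce a Buf, with any error already serialized into it.
    pub enum Op {
      Sync(Buf),
      Async(Pin<Box<dyn Future<Output = Buf>>>),
    }
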