summaryrefslogtreecommitdiff
path: root/cli/ops/io.rs
diff options
context:
space:
mode:
Diffstat (limited to 'cli/ops/io.rs')
-rw-r--r--cli/ops/io.rs263
1 files changed, 187 insertions, 76 deletions
diff --git a/cli/ops/io.rs b/cli/ops/io.rs
index 0c9a83883..e045eddfb 100644
--- a/cli/ops/io.rs
+++ b/cli/ops/io.rs
@@ -27,12 +27,14 @@ use std::os::windows::io::FromRawHandle;
extern crate winapi;
lazy_static! {
- /// Due to portability issues on Windows handle to stdout is created from raw file descriptor.
- /// The caveat of that approach is fact that when this handle is dropped underlying
- /// file descriptor is closed - that is highly not desirable in case of stdout.
- /// That's why we store this global handle that is then cloned when obtaining stdio
- /// for process. In turn when resource table is dropped storing reference to that handle,
- /// the handle itself won't be closed (so Deno.core.print) will still work.
+ /// Due to portability issues on Windows handle to stdout is created from raw
+ /// file descriptor. The caveat of that approach is fact that when this
+ /// handle is dropped underlying file descriptor is closed - that is highly
+ /// not desirable in case of stdout. That's why we store this global handle
+ /// that is then cloned when obtaining stdio for process. In turn when
+ /// resource table is dropped storing reference to that handle, the handle
+ /// itself won't be closed (so Deno.core.print) will still work.
+ // TODO(ry) It should be possible to close stdout.
static ref STDOUT_HANDLE: std::fs::File = {
#[cfg(not(windows))]
let stdout = unsafe { std::fs::File::from_raw_fd(1) };
@@ -42,9 +44,19 @@ lazy_static! {
winapi::um::winbase::STD_OUTPUT_HANDLE,
))
};
-
stdout
};
+ static ref STDERR_HANDLE: std::fs::File = {
+ #[cfg(not(windows))]
+ let stderr = unsafe { std::fs::File::from_raw_fd(2) };
+ #[cfg(windows)]
+ let stderr = unsafe {
+ std::fs::File::from_raw_handle(winapi::um::processenv::GetStdHandle(
+ winapi::um::winbase::STD_ERROR_HANDLE,
+ ))
+ };
+ stderr
+ };
}
pub fn init(i: &mut Isolate, s: &State) {
@@ -67,14 +79,14 @@ pub fn get_stdio() -> (
tokio::io::stdin(),
TTYMetadata::default(),
));
- let stdout = StreamResourceHolder::new(StreamResource::Stdout({
- let stdout = STDOUT_HANDLE
- .try_clone()
- .expect("Unable to clone stdout handle");
- tokio::fs::File::from_std(stdout)
- }));
- let stderr =
- StreamResourceHolder::new(StreamResource::Stderr(tokio::io::stderr()));
+ let stdout = StreamResourceHolder::new(StreamResource::FsFile(Some({
+ let stdout = STDOUT_HANDLE.try_clone().unwrap();
+ (tokio::fs::File::from_std(stdout), FileMetadata::default())
+ })));
+ let stderr = StreamResourceHolder::new(StreamResource::FsFile(Some({
+ let stderr = STDERR_HANDLE.try_clone().unwrap();
+ (tokio::fs::File::from_std(stderr), FileMetadata::default())
+ })));
(stdin, stdout, stderr)
}
@@ -144,9 +156,7 @@ impl StreamResourceHolder {
pub enum StreamResource {
Stdin(tokio::io::Stdin, TTYMetadata),
- Stdout(tokio::fs::File),
- Stderr(tokio::io::Stderr),
- FsFile(tokio::fs::File, FileMetadata),
+ FsFile(Option<(tokio::fs::File, FileMetadata)>),
TcpStream(tokio::net::TcpStream),
#[cfg(not(windows))]
UnixStream(tokio::net::UnixStream),
@@ -182,7 +192,8 @@ impl DenoAsyncRead for StreamResource {
) -> Poll<Result<usize, OpError>> {
use StreamResource::*;
let f: &mut dyn UnpinAsyncRead = match self {
- FsFile(f, _) => f,
+ FsFile(Some((f, _))) => f,
+ FsFile(None) => return Poll::Ready(Err(OpError::resource_unavailable())),
Stdin(f, _) => f,
TcpStream(f) => f,
#[cfg(not(windows))]
@@ -201,43 +212,65 @@ impl DenoAsyncRead for StreamResource {
pub fn op_read(
state: &State,
+ is_sync: bool,
rid: i32,
zero_copy: Option<ZeroCopyBuf>,
-) -> Pin<Box<MinimalOp>> {
+) -> MinimalOp {
debug!("read rid={}", rid);
if zero_copy.is_none() {
- return futures::future::err(no_buffer_specified()).boxed_local();
+ return MinimalOp::Sync(Err(no_buffer_specified()));
}
let state = state.clone();
let mut buf = zero_copy.unwrap();
- poll_fn(move |cx| {
- let resource_table = &mut state.borrow_mut().resource_table;
- let resource_holder = resource_table
- .get_mut::<StreamResourceHolder>(rid as u32)
- .ok_or_else(OpError::bad_resource_id)?;
-
- let mut task_tracker_id: Option<usize> = None;
- let nread = match resource_holder
- .resource
- .poll_read(cx, &mut buf.as_mut()[..])
- .map_err(OpError::from)
- {
- Poll::Ready(t) => {
- if let Some(id) = task_tracker_id {
- resource_holder.untrack_task(id);
+ if is_sync {
+ MinimalOp::Sync({
+ // First we look up the rid in the resource table.
+ let resource_table = &mut state.borrow_mut().resource_table;
+ std_file_resource(resource_table, rid as u32, move |r| match r {
+ Ok(std_file) => {
+ use std::io::Read;
+ std_file
+ .read(&mut buf)
+ .map(|n: usize| n as i32)
+ .map_err(OpError::from)
}
- t
- }
- Poll::Pending => {
- task_tracker_id.replace(resource_holder.track_task(cx)?);
- return Poll::Pending;
- }
- }?;
- Poll::Ready(Ok(nread as i32))
- })
- .boxed_local()
+ Err(_) => Err(OpError::type_error(
+ "sync read not allowed on this resource".to_string(),
+ )),
+ })
+ })
+ } else {
+ MinimalOp::Async(
+ poll_fn(move |cx| {
+ let resource_table = &mut state.borrow_mut().resource_table;
+ let resource_holder = resource_table
+ .get_mut::<StreamResourceHolder>(rid as u32)
+ .ok_or_else(OpError::bad_resource_id)?;
+
+ let mut task_tracker_id: Option<usize> = None;
+ let nread = match resource_holder
+ .resource
+ .poll_read(cx, &mut buf.as_mut()[..])
+ .map_err(OpError::from)
+ {
+ Poll::Ready(t) => {
+ if let Some(id) = task_tracker_id {
+ resource_holder.untrack_task(id);
+ }
+ t
+ }
+ Poll::Pending => {
+ task_tracker_id.replace(resource_holder.track_task(cx)?);
+ return Poll::Pending;
+ }
+ }?;
+ Poll::Ready(Ok(nread as i32))
+ })
+ .boxed_local(),
+ )
+ }
}
/// `DenoAsyncWrite` is the same as the `tokio_io::AsyncWrite` trait
@@ -262,9 +295,8 @@ impl DenoAsyncWrite for StreamResource {
) -> Poll<Result<usize, OpError>> {
use StreamResource::*;
let f: &mut dyn UnpinAsyncWrite = match self {
- FsFile(f, _) => f,
- Stdout(f) => f,
- Stderr(f) => f,
+ FsFile(Some((f, _))) => f,
+ FsFile(None) => return Poll::Pending,
TcpStream(f) => f,
#[cfg(not(windows))]
UnixStream(f) => f,
@@ -281,9 +313,8 @@ impl DenoAsyncWrite for StreamResource {
fn poll_flush(&mut self, cx: &mut Context) -> Poll<Result<(), OpError>> {
use StreamResource::*;
let f: &mut dyn UnpinAsyncWrite = match self {
- FsFile(f, _) => f,
- Stdout(f) => f,
- Stderr(f) => f,
+ FsFile(Some((f, _))) => f,
+ FsFile(None) => return Poll::Pending,
TcpStream(f) => f,
#[cfg(not(windows))]
UnixStream(f) => f,
@@ -304,41 +335,121 @@ impl DenoAsyncWrite for StreamResource {
pub fn op_write(
state: &State,
+ is_sync: bool,
rid: i32,
zero_copy: Option<ZeroCopyBuf>,
-) -> Pin<Box<MinimalOp>> {
+) -> MinimalOp {
debug!("write rid={}", rid);
if zero_copy.is_none() {
- return futures::future::err(no_buffer_specified()).boxed_local();
+ return MinimalOp::Sync(Err(no_buffer_specified()));
}
let state = state.clone();
let buf = zero_copy.unwrap();
- async move {
- let nwritten = poll_fn(|cx| {
- let resource_table = &mut state.borrow_mut().resource_table;
- let resource_holder = resource_table
- .get_mut::<StreamResourceHolder>(rid as u32)
- .ok_or_else(OpError::bad_resource_id)?;
- resource_holder.resource.poll_write(cx, &buf.as_ref()[..])
- })
- .await?;
-
- // TODO(bartlomieju): this step was added during upgrade to Tokio 0.2
- // and the reasons for the need to explicitly flush are not fully known.
- // Figure out why it's needed and preferably remove it.
- // https://github.com/denoland/deno/issues/3565
- poll_fn(|cx| {
+ if is_sync {
+ MinimalOp::Sync({
+ // First we look up the rid in the resource table.
let resource_table = &mut state.borrow_mut().resource_table;
- let resource_holder = resource_table
- .get_mut::<StreamResourceHolder>(rid as u32)
- .ok_or_else(OpError::bad_resource_id)?;
- resource_holder.resource.poll_flush(cx)
+ std_file_resource(resource_table, rid as u32, move |r| match r {
+ Ok(std_file) => {
+ use std::io::Write;
+ std_file
+ .write(&buf)
+ .map(|nwritten: usize| nwritten as i32)
+ .map_err(OpError::from)
+ }
+ Err(_) => Err(OpError::type_error(
+ "sync read not allowed on this resource".to_string(),
+ )),
+ })
})
- .await?;
+ } else {
+ MinimalOp::Async(
+ async move {
+ let nwritten = poll_fn(|cx| {
+ let resource_table = &mut state.borrow_mut().resource_table;
+ let resource_holder = resource_table
+ .get_mut::<StreamResourceHolder>(rid as u32)
+ .ok_or_else(OpError::bad_resource_id)?;
+ resource_holder.resource.poll_write(cx, &buf.as_ref()[..])
+ })
+ .await?;
+
+ // TODO(bartlomieju): this step was added during upgrade to Tokio 0.2
+ // and the reasons for the need to explicitly flush are not fully known.
+ // Figure out why it's needed and preferably remove it.
+ // https://github.com/denoland/deno/issues/3565
+ poll_fn(|cx| {
+ let resource_table = &mut state.borrow_mut().resource_table;
+ let resource_holder = resource_table
+ .get_mut::<StreamResourceHolder>(rid as u32)
+ .ok_or_else(OpError::bad_resource_id)?;
+ resource_holder.resource.poll_flush(cx)
+ })
+ .await?;
+
+ Ok(nwritten as i32)
+ }
+ .boxed_local(),
+ )
+ }
+}
- Ok(nwritten as i32)
+/// Helper function for operating on a std::fs::File stored in the resource table.
+///
+/// We store file system file resources as tokio::fs::File, so this is a little
+/// utility function that gets a std::fs:File when you need to do blocking
+/// operations.
+///
+/// Returns ErrorKind::Busy if the resource is being used by another op.
+pub fn std_file_resource<F, T>(
+ resource_table: &mut ResourceTable,
+ rid: u32,
+ mut f: F,
+) -> Result<T, OpError>
+where
+ F: FnMut(
+ Result<&mut std::fs::File, &mut StreamResource>,
+ ) -> Result<T, OpError>,
+{
+ // First we look up the rid in the resource table.
+ let mut r = resource_table.get_mut::<StreamResourceHolder>(rid);
+ if let Some(ref mut resource_holder) = r {
+ // Sync write only works for FsFile. It doesn't make sense to do this
+ // for non-blocking sockets. So we error out if not FsFile.
+ match &mut resource_holder.resource {
+ StreamResource::FsFile(option_file_metadata) => {
+ // The object in the resource table is a tokio::fs::File - but in
+ // order to do a blocking write on it, we must turn it into a
+ // std::fs::File. Hopefully this code compiles down to nothing.
+ if let Some((tokio_file, metadata)) = option_file_metadata.take() {
+ match tokio_file.try_into_std() {
+ Ok(mut std_file) => {
+ let result = f(Ok(&mut std_file));
+ // Turn the std_file handle back into a tokio file, put it back
+ // in the resource table.
+ let tokio_file = tokio::fs::File::from_std(std_file);
+ resource_holder.resource =
+ StreamResource::FsFile(Some((tokio_file, metadata)));
+ // return the result.
+ result
+ }
+ Err(tokio_file) => {
+ // This function will return an error containing the file if
+ // some operation is in-flight.
+ resource_holder.resource =
+ StreamResource::FsFile(Some((tokio_file, metadata)));
+ Err(OpError::resource_unavailable())
+ }
+ }
+ } else {
+ Err(OpError::resource_unavailable())
+ }
+ }
+ _ => f(Err(&mut resource_holder.resource)),
+ }
+ } else {
+ Err(OpError::bad_resource_id())
}
- .boxed_local()
}