diff options
Diffstat (limited to 'cli/ops/io.rs')
-rw-r--r-- | cli/ops/io.rs | 175 |
1 files changed, 26 insertions, 149 deletions
diff --git a/cli/ops/io.rs b/cli/ops/io.rs index 959147f19..3ede4b411 100644 --- a/cli/ops/io.rs +++ b/cli/ops/io.rs @@ -1,101 +1,19 @@ use super::dispatch_minimal::MinimalOp; use crate::deno_error; use crate::deno_error::bad_resource; -use crate::http_body::HttpBody; use crate::ops::minimal_op; +use crate::resources; +use crate::resources::CliResource; +use crate::resources::DenoAsyncRead; +use crate::resources::DenoAsyncWrite; use crate::state::ThreadSafeState; -use deno::ErrBox; -use deno::Resource; use deno::*; -use futures; use futures::Future; use futures::Poll; -use std; -use tokio; -use tokio::io::{AsyncRead, AsyncWrite}; -use tokio::net::TcpStream; -use tokio_process; -use tokio_rustls::client::TlsStream as ClientTlsStream; -use tokio_rustls::server::TlsStream as ServerTlsStream; - -#[cfg(not(windows))] -use std::os::unix::io::FromRawFd; - -#[cfg(windows)] -use std::os::windows::io::FromRawHandle; - -#[cfg(windows)] -extern crate winapi; pub fn init(i: &mut Isolate, s: &ThreadSafeState) { - i.register_op( - "read", - s.core_op(minimal_op(s.stateful_minimal_op(op_read))), - ); - i.register_op( - "write", - s.core_op(minimal_op(s.stateful_minimal_op(op_write))), - ); -} - -pub fn get_stdio() -> (StreamResource, StreamResource, StreamResource) { - let stdin = StreamResource::Stdin(tokio::io::stdin()); - let stdout = StreamResource::Stdout({ - #[cfg(not(windows))] - let stdout = unsafe { std::fs::File::from_raw_fd(1) }; - #[cfg(windows)] - let stdout = unsafe { - std::fs::File::from_raw_handle(winapi::um::processenv::GetStdHandle( - winapi::um::winbase::STD_OUTPUT_HANDLE, - )) - }; - tokio::fs::File::from_std(stdout) - }); - let stderr = StreamResource::Stderr(tokio::io::stderr()); - - (stdin, stdout, stderr) -} - -pub enum StreamResource { - Stdin(tokio::io::Stdin), - Stdout(tokio::fs::File), - Stderr(tokio::io::Stderr), - FsFile(tokio::fs::File), - TcpStream(tokio::net::TcpStream), - ServerTlsStream(Box<ServerTlsStream<TcpStream>>), - ClientTlsStream(Box<ClientTlsStream<TcpStream>>), - HttpBody(HttpBody), - ChildStdin(tokio_process::ChildStdin), - ChildStdout(tokio_process::ChildStdout), - ChildStderr(tokio_process::ChildStderr), -} - -impl Resource for StreamResource {} - -/// `DenoAsyncRead` is the same as the `tokio_io::AsyncRead` trait -/// but uses an `ErrBox` error instead of `std::io:Error` -pub trait DenoAsyncRead { - fn poll_read(&mut self, buf: &mut [u8]) -> Poll<usize, ErrBox>; -} - -impl DenoAsyncRead for StreamResource { - fn poll_read(&mut self, buf: &mut [u8]) -> Poll<usize, ErrBox> { - let r = match self { - StreamResource::FsFile(ref mut f) => f.poll_read(buf), - StreamResource::Stdin(ref mut f) => f.poll_read(buf), - StreamResource::TcpStream(ref mut f) => f.poll_read(buf), - StreamResource::ClientTlsStream(ref mut f) => f.poll_read(buf), - StreamResource::ServerTlsStream(ref mut f) => f.poll_read(buf), - StreamResource::HttpBody(ref mut f) => f.poll_read(buf), - StreamResource::ChildStdout(ref mut f) => f.poll_read(buf), - StreamResource::ChildStderr(ref mut f) => f.poll_read(buf), - _ => { - return Err(bad_resource()); - } - }; - - r.map_err(ErrBox::from) - } + i.register_op("read", s.core_op(minimal_op(op_read))); + i.register_op("write", s.core_op(minimal_op(op_write))); } #[derive(Debug, PartialEq)] @@ -109,15 +27,14 @@ enum IoState { /// /// The returned future will resolve to both the I/O stream and the buffer /// as well as the number of bytes read once the read operation is completed. -pub fn read<T>(state: &ThreadSafeState, rid: ResourceId, buf: T) -> Read<T> +pub fn read<T>(rid: ResourceId, buf: T) -> Read<T> where T: AsMut<[u8]>, { Read { rid, buf, - io_state: IoState::Pending, - state: state.clone(), + state: IoState::Pending, } } @@ -125,11 +42,11 @@ where /// a buffer. /// /// Created by the [`read`] function. +#[derive(Debug)] pub struct Read<T> { rid: ResourceId, buf: T, - io_state: IoState, - state: ThreadSafeState, + state: IoState, } impl<T> Future for Read<T> @@ -140,25 +57,21 @@ where type Error = ErrBox; fn poll(&mut self) -> Poll<Self::Item, Self::Error> { - if self.io_state == IoState::Done { + if self.state == IoState::Done { panic!("poll a Read after it's done"); } - let mut table = self.state.lock_resource_table(); + let mut table = resources::lock_resource_table(); let resource = table - .get_mut::<StreamResource>(self.rid) + .get_mut::<CliResource>(self.rid) .ok_or_else(bad_resource)?; let nread = try_ready!(resource.poll_read(&mut self.buf.as_mut()[..])); - self.io_state = IoState::Done; + self.state = IoState::Done; Ok(nread.into()) } } -pub fn op_read( - state: &ThreadSafeState, - rid: i32, - zero_copy: Option<PinnedBuf>, -) -> Box<MinimalOp> { +pub fn op_read(rid: i32, zero_copy: Option<PinnedBuf>) -> Box<MinimalOp> { debug!("read rid={}", rid); let zero_copy = match zero_copy { None => { @@ -167,50 +80,19 @@ pub fn op_read( Some(buf) => buf, }; - let fut = read(state, rid as u32, zero_copy) + let fut = read(rid as u32, zero_copy) .map_err(ErrBox::from) .and_then(move |nread| Ok(nread as i32)); Box::new(fut) } -/// `DenoAsyncWrite` is the same as the `tokio_io::AsyncWrite` trait -/// but uses an `ErrBox` error instead of `std::io:Error` -pub trait DenoAsyncWrite { - fn poll_write(&mut self, buf: &[u8]) -> Poll<usize, ErrBox>; - - fn shutdown(&mut self) -> Poll<(), ErrBox>; -} - -impl DenoAsyncWrite for StreamResource { - fn poll_write(&mut self, buf: &[u8]) -> Poll<usize, ErrBox> { - let r = match self { - StreamResource::FsFile(ref mut f) => f.poll_write(buf), - StreamResource::Stdout(ref mut f) => f.poll_write(buf), - StreamResource::Stderr(ref mut f) => f.poll_write(buf), - StreamResource::TcpStream(ref mut f) => f.poll_write(buf), - StreamResource::ClientTlsStream(ref mut f) => f.poll_write(buf), - StreamResource::ServerTlsStream(ref mut f) => f.poll_write(buf), - StreamResource::ChildStdin(ref mut f) => f.poll_write(buf), - _ => { - return Err(bad_resource()); - } - }; - - r.map_err(ErrBox::from) - } - - fn shutdown(&mut self) -> futures::Poll<(), ErrBox> { - unimplemented!() - } -} - /// A future used to write some data to a stream. +#[derive(Debug)] pub struct Write<T> { rid: ResourceId, buf: T, - io_state: IoState, - state: ThreadSafeState, + state: IoState, } /// Creates a future that will write some of the buffer `buf` to @@ -218,15 +100,14 @@ pub struct Write<T> { /// /// Any error which happens during writing will cause both the stream and the /// buffer to get destroyed. -pub fn write<T>(state: &ThreadSafeState, rid: ResourceId, buf: T) -> Write<T> +pub fn write<T>(rid: ResourceId, buf: T) -> Write<T> where T: AsRef<[u8]>, { Write { rid, buf, - io_state: IoState::Pending, - state: state.clone(), + state: IoState::Pending, } } @@ -240,25 +121,21 @@ where type Error = ErrBox; fn poll(&mut self) -> Poll<Self::Item, Self::Error> { - if self.io_state == IoState::Done { + if self.state == IoState::Done { panic!("poll a Read after it's done"); } - let mut table = self.state.lock_resource_table(); + let mut table = resources::lock_resource_table(); let resource = table - .get_mut::<StreamResource>(self.rid) + .get_mut::<CliResource>(self.rid) .ok_or_else(bad_resource)?; let nwritten = try_ready!(resource.poll_write(self.buf.as_ref())); - self.io_state = IoState::Done; + self.state = IoState::Done; Ok(nwritten.into()) } } -pub fn op_write( - state: &ThreadSafeState, - rid: i32, - zero_copy: Option<PinnedBuf>, -) -> Box<MinimalOp> { +pub fn op_write(rid: i32, zero_copy: Option<PinnedBuf>) -> Box<MinimalOp> { debug!("write rid={}", rid); let zero_copy = match zero_copy { None => { @@ -267,7 +144,7 @@ pub fn op_write( Some(buf) => buf, }; - let fut = write(state, rid as u32, zero_copy) + let fut = write(rid as u32, zero_copy) .map_err(ErrBox::from) .and_then(move |nwritten| Ok(nwritten as i32)); |