From 8b90b8e88325d28fe41d8312ea91417b6e66a12e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bartek=20Iwa=C5=84czuk?= Date: Thu, 14 Nov 2019 18:10:25 +0100 Subject: refactor: per-worker resource table, take 2 (#3342) - removes global `RESOURCE_TABLE` - resource tables are now created per `Worker` in `State` - renames `CliResource` to `StreamResource` and moves all logic related to it to `cli/ops/io.rs` - removes `cli/resources.rs` - adds `state` argument to `op_read` and `op_write` and consequently adds `stateful_minimal_op` to `State` - IMPORTANT NOTE: workers don't have access to process stdio - this is caused by fact that dropping worker would close stdout for process (because it's constructed from raw handle, which closes underlying file descriptor on drop) --- cli/ops/io.rs | 195 +++++++++++++++++++++++++++++++++++++++++++++++++--------- 1 file changed, 167 insertions(+), 28 deletions(-) (limited to 'cli/ops/io.rs') diff --git a/cli/ops/io.rs b/cli/ops/io.rs index 3ede4b411..30c933999 100644 --- a/cli/ops/io.rs +++ b/cli/ops/io.rs @@ -1,19 +1,117 @@ use super::dispatch_minimal::MinimalOp; use crate::deno_error; use crate::deno_error::bad_resource; +use crate::http_body::HttpBody; use crate::ops::minimal_op; -use crate::resources; -use crate::resources::CliResource; -use crate::resources::DenoAsyncRead; -use crate::resources::DenoAsyncWrite; use crate::state::ThreadSafeState; +use deno::ErrBox; +use deno::Resource; use deno::*; +use futures; use futures::Future; use futures::Poll; +use std; +use tokio; +use tokio::io::{AsyncRead, AsyncWrite}; +use tokio::net::TcpStream; +use tokio_process; +use tokio_rustls::client::TlsStream as ClientTlsStream; +use tokio_rustls::server::TlsStream as ServerTlsStream; + +#[cfg(not(windows))] +use std::os::unix::io::FromRawFd; + +#[cfg(windows)] +use std::os::windows::io::FromRawHandle; + +#[cfg(windows)] +extern crate winapi; + +lazy_static! { + /// Due to portability issues on Windows handle to stdout is created from raw file descriptor. + /// The caveat of that approach is fact that when this handle is dropped underlying + /// file descriptor is closed - that is highly not desirable in case of stdout. + /// That's why we store this global handle that is then cloned when obtaining stdio + /// for process. In turn when resource table is dropped storing reference to that handle, + /// the handle itself won't be closed (so Deno.core.print) will still work. + static ref STDOUT_HANDLE: std::fs::File = { + #[cfg(not(windows))] + let stdout = unsafe { std::fs::File::from_raw_fd(1) }; + #[cfg(windows)] + let stdout = unsafe { + std::fs::File::from_raw_handle(winapi::um::processenv::GetStdHandle( + winapi::um::winbase::STD_OUTPUT_HANDLE, + )) + }; + + stdout + }; +} pub fn init(i: &mut Isolate, s: &ThreadSafeState) { - i.register_op("read", s.core_op(minimal_op(op_read))); - i.register_op("write", s.core_op(minimal_op(op_write))); + i.register_op( + "read", + s.core_op(minimal_op(s.stateful_minimal_op(op_read))), + ); + i.register_op( + "write", + s.core_op(minimal_op(s.stateful_minimal_op(op_write))), + ); +} + +pub fn get_stdio() -> (StreamResource, StreamResource, StreamResource) { + let stdin = StreamResource::Stdin(tokio::io::stdin()); + let stdout = StreamResource::Stdout({ + let stdout = STDOUT_HANDLE + .try_clone() + .expect("Unable to clone stdout handle"); + tokio::fs::File::from_std(stdout) + }); + let stderr = StreamResource::Stderr(tokio::io::stderr()); + + (stdin, stdout, stderr) +} + +pub enum StreamResource { + Stdin(tokio::io::Stdin), + Stdout(tokio::fs::File), + Stderr(tokio::io::Stderr), + FsFile(tokio::fs::File), + TcpStream(tokio::net::TcpStream), + ServerTlsStream(Box>), + ClientTlsStream(Box>), + HttpBody(HttpBody), + ChildStdin(tokio_process::ChildStdin), + ChildStdout(tokio_process::ChildStdout), + ChildStderr(tokio_process::ChildStderr), +} + +impl Resource for StreamResource {} + +/// `DenoAsyncRead` is the same as the `tokio_io::AsyncRead` trait +/// but uses an `ErrBox` error instead of `std::io:Error` +pub trait DenoAsyncRead { + fn poll_read(&mut self, buf: &mut [u8]) -> Poll; +} + +impl DenoAsyncRead for StreamResource { + fn poll_read(&mut self, buf: &mut [u8]) -> Poll { + let r = match self { + StreamResource::FsFile(ref mut f) => f.poll_read(buf), + StreamResource::Stdin(ref mut f) => f.poll_read(buf), + StreamResource::TcpStream(ref mut f) => f.poll_read(buf), + StreamResource::ClientTlsStream(ref mut f) => f.poll_read(buf), + StreamResource::ServerTlsStream(ref mut f) => f.poll_read(buf), + StreamResource::HttpBody(ref mut f) => f.poll_read(buf), + StreamResource::ChildStdout(ref mut f) => f.poll_read(buf), + StreamResource::ChildStderr(ref mut f) => f.poll_read(buf), + _ => { + return Err(bad_resource()); + } + }; + + r.map_err(ErrBox::from) + } } #[derive(Debug, PartialEq)] @@ -27,14 +125,15 @@ enum IoState { /// /// The returned future will resolve to both the I/O stream and the buffer /// as well as the number of bytes read once the read operation is completed. -pub fn read(rid: ResourceId, buf: T) -> Read +pub fn read(state: &ThreadSafeState, rid: ResourceId, buf: T) -> Read where T: AsMut<[u8]>, { Read { rid, buf, - state: IoState::Pending, + io_state: IoState::Pending, + state: state.clone(), } } @@ -42,11 +141,11 @@ where /// a buffer. /// /// Created by the [`read`] function. -#[derive(Debug)] pub struct Read { rid: ResourceId, buf: T, - state: IoState, + io_state: IoState, + state: ThreadSafeState, } impl Future for Read @@ -57,42 +156,77 @@ where type Error = ErrBox; fn poll(&mut self) -> Poll { - if self.state == IoState::Done { + if self.io_state == IoState::Done { panic!("poll a Read after it's done"); } - let mut table = resources::lock_resource_table(); + let mut table = self.state.lock_resource_table(); let resource = table - .get_mut::(self.rid) + .get_mut::(self.rid) .ok_or_else(bad_resource)?; let nread = try_ready!(resource.poll_read(&mut self.buf.as_mut()[..])); - self.state = IoState::Done; + self.io_state = IoState::Done; Ok(nread.into()) } } -pub fn op_read(rid: i32, zero_copy: Option) -> Box { +pub fn op_read( + state: &ThreadSafeState, + rid: i32, + zero_copy: Option, +) -> Box { debug!("read rid={}", rid); let zero_copy = match zero_copy { None => { - return Box::new(futures::future::err(deno_error::no_buffer_specified())) + return Box::new(futures::future::err(deno_error::no_buffer_specified())); } Some(buf) => buf, }; - let fut = read(rid as u32, zero_copy) + let fut = read(state, rid as u32, zero_copy) .map_err(ErrBox::from) .and_then(move |nread| Ok(nread as i32)); Box::new(fut) } +/// `DenoAsyncWrite` is the same as the `tokio_io::AsyncWrite` trait +/// but uses an `ErrBox` error instead of `std::io:Error` +pub trait DenoAsyncWrite { + fn poll_write(&mut self, buf: &[u8]) -> Poll; + + fn shutdown(&mut self) -> Poll<(), ErrBox>; +} + +impl DenoAsyncWrite for StreamResource { + fn poll_write(&mut self, buf: &[u8]) -> Poll { + let r = match self { + StreamResource::FsFile(ref mut f) => f.poll_write(buf), + StreamResource::Stdout(ref mut f) => f.poll_write(buf), + StreamResource::Stderr(ref mut f) => f.poll_write(buf), + StreamResource::TcpStream(ref mut f) => f.poll_write(buf), + StreamResource::ClientTlsStream(ref mut f) => f.poll_write(buf), + StreamResource::ServerTlsStream(ref mut f) => f.poll_write(buf), + StreamResource::ChildStdin(ref mut f) => f.poll_write(buf), + _ => { + return Err(bad_resource()); + } + }; + + r.map_err(ErrBox::from) + } + + fn shutdown(&mut self) -> futures::Poll<(), ErrBox> { + unimplemented!() + } +} + /// A future used to write some data to a stream. -#[derive(Debug)] pub struct Write { rid: ResourceId, buf: T, - state: IoState, + io_state: IoState, + state: ThreadSafeState, } /// Creates a future that will write some of the buffer `buf` to @@ -100,14 +234,15 @@ pub struct Write { /// /// Any error which happens during writing will cause both the stream and the /// buffer to get destroyed. -pub fn write(rid: ResourceId, buf: T) -> Write +pub fn write(state: &ThreadSafeState, rid: ResourceId, buf: T) -> Write where T: AsRef<[u8]>, { Write { rid, buf, - state: IoState::Pending, + io_state: IoState::Pending, + state: state.clone(), } } @@ -121,30 +256,34 @@ where type Error = ErrBox; fn poll(&mut self) -> Poll { - if self.state == IoState::Done { + if self.io_state == IoState::Done { panic!("poll a Read after it's done"); } - let mut table = resources::lock_resource_table(); + let mut table = self.state.lock_resource_table(); let resource = table - .get_mut::(self.rid) + .get_mut::(self.rid) .ok_or_else(bad_resource)?; let nwritten = try_ready!(resource.poll_write(self.buf.as_ref())); - self.state = IoState::Done; + self.io_state = IoState::Done; Ok(nwritten.into()) } } -pub fn op_write(rid: i32, zero_copy: Option) -> Box { +pub fn op_write( + state: &ThreadSafeState, + rid: i32, + zero_copy: Option, +) -> Box { debug!("write rid={}", rid); let zero_copy = match zero_copy { None => { - return Box::new(futures::future::err(deno_error::no_buffer_specified())) + return Box::new(futures::future::err(deno_error::no_buffer_specified())); } Some(buf) => buf, }; - let fut = write(rid as u32, zero_copy) + let fut = write(state, rid as u32, zero_copy) .map_err(ErrBox::from) .and_then(move |nwritten| Ok(nwritten as i32)); -- cgit v1.2.3