summaryrefslogtreecommitdiff
path: root/cli/ops/io.rs
diff options
context:
space:
mode:
authorBartek IwaƄczuk <biwanczuk@gmail.com>2019-11-14 04:16:57 +0100
committerRy Dahl <ry@tinyclouds.org>2019-11-13 22:16:57 -0500
commitfd62379eafde6571f126df5650b80cfda9f74229 (patch)
tree34579151043837aaae17b36179c0aa5cf6b5e5aa /cli/ops/io.rs
parentaf448e864c4ac7e2ec601a25d46f95861ff5ade0 (diff)
refactor: per-worker resource table (#3306)
- removes global `RESOURCE_TABLE` - resource tables are now created per `Worker` in `State` - renames `CliResource` to `StreamResource` and moves all logic related to it to `cli/ops/io.rs` - removes `cli/resources.rs` - adds `state` argument to `op_read` and `op_write` and consequently adds `stateful_minimal_op` to `State` - IMPORTANT NOTE: workers don't have access to process stdio - this is caused by fact that dropping worker would close stdout for process (because it's constructed from raw handle, which closes underlying file descriptor on drop)
Diffstat (limited to 'cli/ops/io.rs')
-rw-r--r--cli/ops/io.rs175
1 files changed, 149 insertions, 26 deletions
diff --git a/cli/ops/io.rs b/cli/ops/io.rs
index 3ede4b411..959147f19 100644
--- a/cli/ops/io.rs
+++ b/cli/ops/io.rs
@@ -1,19 +1,101 @@
use super::dispatch_minimal::MinimalOp;
use crate::deno_error;
use crate::deno_error::bad_resource;
+use crate::http_body::HttpBody;
use crate::ops::minimal_op;
-use crate::resources;
-use crate::resources::CliResource;
-use crate::resources::DenoAsyncRead;
-use crate::resources::DenoAsyncWrite;
use crate::state::ThreadSafeState;
+use deno::ErrBox;
+use deno::Resource;
use deno::*;
+use futures;
use futures::Future;
use futures::Poll;
+use std;
+use tokio;
+use tokio::io::{AsyncRead, AsyncWrite};
+use tokio::net::TcpStream;
+use tokio_process;
+use tokio_rustls::client::TlsStream as ClientTlsStream;
+use tokio_rustls::server::TlsStream as ServerTlsStream;
+
+#[cfg(not(windows))]
+use std::os::unix::io::FromRawFd;
+
+#[cfg(windows)]
+use std::os::windows::io::FromRawHandle;
+
+#[cfg(windows)]
+extern crate winapi;
pub fn init(i: &mut Isolate, s: &ThreadSafeState) {
- i.register_op("read", s.core_op(minimal_op(op_read)));
- i.register_op("write", s.core_op(minimal_op(op_write)));
+ i.register_op(
+ "read",
+ s.core_op(minimal_op(s.stateful_minimal_op(op_read))),
+ );
+ i.register_op(
+ "write",
+ s.core_op(minimal_op(s.stateful_minimal_op(op_write))),
+ );
+}
+
+pub fn get_stdio() -> (StreamResource, StreamResource, StreamResource) {
+ let stdin = StreamResource::Stdin(tokio::io::stdin());
+ let stdout = StreamResource::Stdout({
+ #[cfg(not(windows))]
+ let stdout = unsafe { std::fs::File::from_raw_fd(1) };
+ #[cfg(windows)]
+ let stdout = unsafe {
+ std::fs::File::from_raw_handle(winapi::um::processenv::GetStdHandle(
+ winapi::um::winbase::STD_OUTPUT_HANDLE,
+ ))
+ };
+ tokio::fs::File::from_std(stdout)
+ });
+ let stderr = StreamResource::Stderr(tokio::io::stderr());
+
+ (stdin, stdout, stderr)
+}
+
+pub enum StreamResource {
+ Stdin(tokio::io::Stdin),
+ Stdout(tokio::fs::File),
+ Stderr(tokio::io::Stderr),
+ FsFile(tokio::fs::File),
+ TcpStream(tokio::net::TcpStream),
+ ServerTlsStream(Box<ServerTlsStream<TcpStream>>),
+ ClientTlsStream(Box<ClientTlsStream<TcpStream>>),
+ HttpBody(HttpBody),
+ ChildStdin(tokio_process::ChildStdin),
+ ChildStdout(tokio_process::ChildStdout),
+ ChildStderr(tokio_process::ChildStderr),
+}
+
+impl Resource for StreamResource {}
+
+/// `DenoAsyncRead` is the same as the `tokio_io::AsyncRead` trait
+/// but uses an `ErrBox` error instead of `std::io:Error`
+pub trait DenoAsyncRead {
+ fn poll_read(&mut self, buf: &mut [u8]) -> Poll<usize, ErrBox>;
+}
+
+impl DenoAsyncRead for StreamResource {
+ fn poll_read(&mut self, buf: &mut [u8]) -> Poll<usize, ErrBox> {
+ let r = match self {
+ StreamResource::FsFile(ref mut f) => f.poll_read(buf),
+ StreamResource::Stdin(ref mut f) => f.poll_read(buf),
+ StreamResource::TcpStream(ref mut f) => f.poll_read(buf),
+ StreamResource::ClientTlsStream(ref mut f) => f.poll_read(buf),
+ StreamResource::ServerTlsStream(ref mut f) => f.poll_read(buf),
+ StreamResource::HttpBody(ref mut f) => f.poll_read(buf),
+ StreamResource::ChildStdout(ref mut f) => f.poll_read(buf),
+ StreamResource::ChildStderr(ref mut f) => f.poll_read(buf),
+ _ => {
+ return Err(bad_resource());
+ }
+ };
+
+ r.map_err(ErrBox::from)
+ }
}
#[derive(Debug, PartialEq)]
@@ -27,14 +109,15 @@ enum IoState {
///
/// The returned future will resolve to both the I/O stream and the buffer
/// as well as the number of bytes read once the read operation is completed.
-pub fn read<T>(rid: ResourceId, buf: T) -> Read<T>
+pub fn read<T>(state: &ThreadSafeState, rid: ResourceId, buf: T) -> Read<T>
where
T: AsMut<[u8]>,
{
Read {
rid,
buf,
- state: IoState::Pending,
+ io_state: IoState::Pending,
+ state: state.clone(),
}
}
@@ -42,11 +125,11 @@ where
/// a buffer.
///
/// Created by the [`read`] function.
-#[derive(Debug)]
pub struct Read<T> {
rid: ResourceId,
buf: T,
- state: IoState,
+ io_state: IoState,
+ state: ThreadSafeState,
}
impl<T> Future for Read<T>
@@ -57,21 +140,25 @@ where
type Error = ErrBox;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
- if self.state == IoState::Done {
+ if self.io_state == IoState::Done {
panic!("poll a Read after it's done");
}
- let mut table = resources::lock_resource_table();
+ let mut table = self.state.lock_resource_table();
let resource = table
- .get_mut::<CliResource>(self.rid)
+ .get_mut::<StreamResource>(self.rid)
.ok_or_else(bad_resource)?;
let nread = try_ready!(resource.poll_read(&mut self.buf.as_mut()[..]));
- self.state = IoState::Done;
+ self.io_state = IoState::Done;
Ok(nread.into())
}
}
-pub fn op_read(rid: i32, zero_copy: Option<PinnedBuf>) -> Box<MinimalOp> {
+pub fn op_read(
+ state: &ThreadSafeState,
+ rid: i32,
+ zero_copy: Option<PinnedBuf>,
+) -> Box<MinimalOp> {
debug!("read rid={}", rid);
let zero_copy = match zero_copy {
None => {
@@ -80,19 +167,50 @@ pub fn op_read(rid: i32, zero_copy: Option<PinnedBuf>) -> Box<MinimalOp> {
Some(buf) => buf,
};
- let fut = read(rid as u32, zero_copy)
+ let fut = read(state, rid as u32, zero_copy)
.map_err(ErrBox::from)
.and_then(move |nread| Ok(nread as i32));
Box::new(fut)
}
+/// `DenoAsyncWrite` is the same as the `tokio_io::AsyncWrite` trait
+/// but uses an `ErrBox` error instead of `std::io:Error`
+pub trait DenoAsyncWrite {
+ fn poll_write(&mut self, buf: &[u8]) -> Poll<usize, ErrBox>;
+
+ fn shutdown(&mut self) -> Poll<(), ErrBox>;
+}
+
+impl DenoAsyncWrite for StreamResource {
+ fn poll_write(&mut self, buf: &[u8]) -> Poll<usize, ErrBox> {
+ let r = match self {
+ StreamResource::FsFile(ref mut f) => f.poll_write(buf),
+ StreamResource::Stdout(ref mut f) => f.poll_write(buf),
+ StreamResource::Stderr(ref mut f) => f.poll_write(buf),
+ StreamResource::TcpStream(ref mut f) => f.poll_write(buf),
+ StreamResource::ClientTlsStream(ref mut f) => f.poll_write(buf),
+ StreamResource::ServerTlsStream(ref mut f) => f.poll_write(buf),
+ StreamResource::ChildStdin(ref mut f) => f.poll_write(buf),
+ _ => {
+ return Err(bad_resource());
+ }
+ };
+
+ r.map_err(ErrBox::from)
+ }
+
+ fn shutdown(&mut self) -> futures::Poll<(), ErrBox> {
+ unimplemented!()
+ }
+}
+
/// A future used to write some data to a stream.
-#[derive(Debug)]
pub struct Write<T> {
rid: ResourceId,
buf: T,
- state: IoState,
+ io_state: IoState,
+ state: ThreadSafeState,
}
/// Creates a future that will write some of the buffer `buf` to
@@ -100,14 +218,15 @@ pub struct Write<T> {
///
/// Any error which happens during writing will cause both the stream and the
/// buffer to get destroyed.
-pub fn write<T>(rid: ResourceId, buf: T) -> Write<T>
+pub fn write<T>(state: &ThreadSafeState, rid: ResourceId, buf: T) -> Write<T>
where
T: AsRef<[u8]>,
{
Write {
rid,
buf,
- state: IoState::Pending,
+ io_state: IoState::Pending,
+ state: state.clone(),
}
}
@@ -121,21 +240,25 @@ where
type Error = ErrBox;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
- if self.state == IoState::Done {
+ if self.io_state == IoState::Done {
panic!("poll a Read after it's done");
}
- let mut table = resources::lock_resource_table();
+ let mut table = self.state.lock_resource_table();
let resource = table
- .get_mut::<CliResource>(self.rid)
+ .get_mut::<StreamResource>(self.rid)
.ok_or_else(bad_resource)?;
let nwritten = try_ready!(resource.poll_write(self.buf.as_ref()));
- self.state = IoState::Done;
+ self.io_state = IoState::Done;
Ok(nwritten.into())
}
}
-pub fn op_write(rid: i32, zero_copy: Option<PinnedBuf>) -> Box<MinimalOp> {
+pub fn op_write(
+ state: &ThreadSafeState,
+ rid: i32,
+ zero_copy: Option<PinnedBuf>,
+) -> Box<MinimalOp> {
debug!("write rid={}", rid);
let zero_copy = match zero_copy {
None => {
@@ -144,7 +267,7 @@ pub fn op_write(rid: i32, zero_copy: Option<PinnedBuf>) -> Box<MinimalOp> {
Some(buf) => buf,
};
- let fut = write(rid as u32, zero_copy)
+ let fut = write(state, rid as u32, zero_copy)
.map_err(ErrBox::from)
.and_then(move |nwritten| Ok(nwritten as i32));