diff options
author | Bartek IwaĆczuk <biwanczuk@gmail.com> | 2019-11-14 04:16:57 +0100 |
---|---|---|
committer | Ry Dahl <ry@tinyclouds.org> | 2019-11-13 22:16:57 -0500 |
commit | fd62379eafde6571f126df5650b80cfda9f74229 (patch) | |
tree | 34579151043837aaae17b36179c0aa5cf6b5e5aa /cli/ops/process.rs | |
parent | af448e864c4ac7e2ec601a25d46f95861ff5ade0 (diff) |
refactor: per-worker resource table (#3306)
- removes global `RESOURCE_TABLE` - resource tables are now created per `Worker`
in `State`
- renames `CliResource` to `StreamResource` and moves all logic related
to it to `cli/ops/io.rs`
- removes `cli/resources.rs`
- adds `state` argument to `op_read` and `op_write` and consequently adds
`stateful_minimal_op` to `State`
- IMPORTANT NOTE: workers don't have access to process stdio - this is
caused by fact that dropping worker would close stdout for process
(because it's constructed from raw handle, which closes underlying file
descriptor on drop)
Diffstat (limited to 'cli/ops/process.rs')
-rw-r--r-- | cli/ops/process.rs | 98 |
1 files changed, 75 insertions, 23 deletions
diff --git a/cli/ops/process.rs b/cli/ops/process.rs index f7897ec51..237b02fd0 100644 --- a/cli/ops/process.rs +++ b/cli/ops/process.rs @@ -1,9 +1,8 @@ // Copyright 2018-2019 the Deno authors. All rights reserved. MIT license. use super::dispatch_json::{Deserialize, JsonOp, Value}; +use super::io::StreamResource; use crate::deno_error::bad_resource; use crate::ops::json_op; -use crate::resources; -use crate::resources::CloneFileFuture; use crate::signal::kill; use crate::state::ThreadSafeState; use deno::*; @@ -28,6 +27,41 @@ pub fn init(i: &mut Isolate, s: &ThreadSafeState) { i.register_op("kill", s.core_op(json_op(s.stateful_op(op_kill)))); } +struct CloneFileFuture { + rid: ResourceId, + state: ThreadSafeState, +} + +impl Future for CloneFileFuture { + type Item = tokio::fs::File; + type Error = ErrBox; + + fn poll(&mut self) -> Poll<Self::Item, Self::Error> { + let mut table = self.state.lock_resource_table(); + let repr = table + .get_mut::<StreamResource>(self.rid) + .ok_or_else(bad_resource)?; + match repr { + StreamResource::FsFile(ref mut file) => { + file.poll_try_clone().map_err(ErrBox::from) + } + _ => Err(bad_resource()), + } + } +} + +fn clone_file( + rid: u32, + state: &ThreadSafeState, +) -> Result<std::fs::File, ErrBox> { + (CloneFileFuture { + rid, + state: state.clone(), + }) + .wait() + .map(|f| f.into_std()) +} + fn subprocess_stdio_map(s: &str) -> std::process::Stdio { match s { "inherit" => std::process::Stdio::inherit(), @@ -65,6 +99,7 @@ fn op_run( let run_args: RunArgs = serde_json::from_value(args)?; state.check_run()?; + let state_ = state.clone(); let args = run_args.args; let env = run_args.env; @@ -83,7 +118,7 @@ fn op_run( // TODO: make this work with other resources, eg. sockets let stdin_rid = run_args.stdin_rid; if stdin_rid > 0 { - let file = (CloneFileFuture { rid: stdin_rid }).wait()?.into_std(); + let file = clone_file(stdin_rid, &state_)?; c.stdin(file); } else { c.stdin(subprocess_stdio_map(run_args.stdin.as_ref())); @@ -91,7 +126,7 @@ fn op_run( let stdout_rid = run_args.stdout_rid; if stdout_rid > 0 { - let file = (CloneFileFuture { rid: stdout_rid }).wait()?.into_std(); + let file = clone_file(stdout_rid, &state_)?; c.stdout(file); } else { c.stdout(subprocess_stdio_map(run_args.stdout.as_ref())); @@ -99,7 +134,7 @@ fn op_run( let stderr_rid = run_args.stderr_rid; if stderr_rid > 0 { - let file = (CloneFileFuture { rid: stderr_rid }).wait()?.into_std(); + let file = clone_file(stderr_rid, &state_)?; c.stderr(file); } else { c.stderr(subprocess_stdio_map(run_args.stderr.as_ref())); @@ -109,29 +144,42 @@ fn op_run( let mut child = c.spawn_async().map_err(ErrBox::from)?; let pid = child.id(); - let stdin_rid = if child.stdin().is_some() { - let rid = resources::add_child_stdin(child.stdin().take().unwrap()); - Some(rid) - } else { - None + let mut table = state_.lock_resource_table(); + + let stdin_rid = match child.stdin().take() { + Some(child_stdin) => { + let rid = table.add( + "childStdin", + Box::new(StreamResource::ChildStdin(child_stdin)), + ); + Some(rid) + } + None => None, }; - let stdout_rid = if child.stdout().is_some() { - let rid = resources::add_child_stdout(child.stdout().take().unwrap()); - Some(rid) - } else { - None + let stdout_rid = match child.stdout().take() { + Some(child_stdout) => { + let rid = table.add( + "childStdout", + Box::new(StreamResource::ChildStdout(child_stdout)), + ); + Some(rid) + } + None => None, }; - let stderr_rid = if child.stderr().is_some() { - let rid = resources::add_child_stderr(child.stderr().take().unwrap()); - Some(rid) - } else { - None + let stderr_rid = match child.stderr().take() { + Some(child_stderr) => { + let rid = table.add( + "childStderr", + Box::new(StreamResource::ChildStderr(child_stderr)), + ); + Some(rid) + } + None => None, }; let child_resource = ChildResource { child }; - let mut table = resources::lock_resource_table(); let child_rid = table.add("child", Box::new(child_resource)); Ok(JsonOp::Sync(json!({ @@ -145,6 +193,7 @@ fn op_run( pub struct ChildStatus { rid: ResourceId, + state: ThreadSafeState, } impl Future for ChildStatus { @@ -152,7 +201,7 @@ impl Future for ChildStatus { type Error = ErrBox; fn poll(&mut self) -> Poll<ExitStatus, ErrBox> { - let mut table = resources::lock_resource_table(); + let mut table = self.state.lock_resource_table(); let child_resource = table .get_mut::<ChildResource>(self.rid) .ok_or_else(bad_resource)?; @@ -177,7 +226,10 @@ fn op_run_status( state.check_run()?; - let future = ChildStatus { rid }; + let future = ChildStatus { + rid, + state: state.clone(), + }; let future = future.and_then(move |run_status| { let code = run_status.code(); |