diff options
author | Leo Kettmeir <crowlkats@toaxl.com> | 2022-04-21 00:20:33 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2022-04-21 00:20:33 +0200 |
commit | 8a7539cab36699465ec6e37455c54fa86f3c0cbe (patch) | |
tree | c3df15f3b673d1ec1a9c4ffada1a9274e3aca942 /runtime | |
parent | 8b258070542a81d217226fe832b26d81cf20113d (diff) |
feat(runtime): two-tier subprocess API (#11618)
Diffstat (limited to 'runtime')
-rw-r--r-- | runtime/js/40_spawn.js | 206 | ||||
-rw-r--r-- | runtime/js/90_deno_ns.js | 4 | ||||
-rw-r--r-- | runtime/ops/io.rs | 8 | ||||
-rw-r--r-- | runtime/ops/mod.rs | 1 | ||||
-rw-r--r-- | runtime/ops/spawn.rs | 263 | ||||
-rw-r--r-- | runtime/web_worker.rs | 1 | ||||
-rw-r--r-- | runtime/worker.rs | 1 |
7 files changed, 484 insertions, 0 deletions
diff --git a/runtime/js/40_spawn.js b/runtime/js/40_spawn.js new file mode 100644 index 000000000..c55ce657d --- /dev/null +++ b/runtime/js/40_spawn.js @@ -0,0 +1,206 @@ +// Copyright 2018-2021 the Deno authors. All rights reserved. MIT license. +"use strict"; + +((window) => { + const core = window.Deno.core; + const { pathFromURL } = window.__bootstrap.util; + const { illegalConstructorKey } = window.__bootstrap.webUtil; + const { + ArrayPrototypeMap, + ObjectEntries, + String, + TypeError, + Uint8Array, + PromiseAll, + } = window.__bootstrap.primordials; + const { readableStreamForRid, writableStreamForRid } = + window.__bootstrap.streamUtils; + + function spawnChild(command, { + args = [], + cwd = undefined, + clearEnv = false, + env = {}, + uid = undefined, + gid = undefined, + stdin = "null", + stdout = "piped", + stderr = "piped", + } = {}) { + const child = core.opSync("op_spawn_child", { + cmd: pathFromURL(command), + args: ArrayPrototypeMap(args, String), + cwd: pathFromURL(cwd), + clearEnv, + env: ObjectEntries(env), + uid, + gid, + stdin, + stdout, + stderr, + }); + return new Child(illegalConstructorKey, child); + } + + async function collectOutput(readableStream) { + if (!(readableStream instanceof ReadableStream)) { + return null; + } + + const bufs = []; + let size = 0; + for await (const chunk of readableStream) { + bufs.push(chunk); + size += chunk.byteLength; + } + + const buffer = new Uint8Array(size); + let offset = 0; + for (const chunk of bufs) { + buffer.set(chunk, offset); + offset += chunk.byteLength; + } + + return buffer; + } + + class Child { + #rid; + + #pid; + get pid() { + return this.#pid; + } + + #stdinRid; + #stdin = null; + get stdin() { + return this.#stdin; + } + + #stdoutRid; + #stdout = null; + get stdout() { + return this.#stdout; + } + + #stderrRid; + #stderr = null; + get stderr() { + return this.#stderr; + } + + constructor(key = null, { + rid, + pid, + stdinRid, + stdoutRid, + stderrRid, + } = null) { + if (key !== illegalConstructorKey) { + throw new TypeError("Illegal constructor."); + } + + this.#rid = rid; + this.#pid = pid; + + if (stdinRid !== null) { + this.#stdinRid = stdinRid; + this.#stdin = writableStreamForRid(stdinRid); + } + + if (stdoutRid !== null) { + this.#stdoutRid = stdoutRid; + this.#stdout = readableStreamForRid(stdoutRid); + } + + if (stderrRid !== null) { + this.#stderrRid = stderrRid; + this.#stderr = readableStreamForRid(stderrRid); + } + + this.#status = core.opAsync("op_spawn_wait", this.#rid).then((res) => { + this.#rid = null; + return res; + }); + } + + #status; + get status() { + return this.#status; + } + + async output() { + if (this.#rid === null) { + throw new TypeError("Child process has already terminated."); + } + if (this.#stdout?.locked) { + throw new TypeError( + "Can't collect output because stdout is locked", + ); + } + if (this.#stderr?.locked) { + throw new TypeError( + "Can't collect output because stderr is locked", + ); + } + + const [status, stdout, stderr] = await PromiseAll([ + this.#status, + collectOutput(this.#stdout), + collectOutput(this.#stderr), + ]); + + return { + status, + stdout, + stderr, + }; + } + + kill(signo) { + if (this.#rid === null) { + throw new TypeError("Child process has already terminated."); + } + core.opSync("op_kill", this.#pid, signo); + } + } + + function spawn(command, options) { // TODO(@crowlKats): more options (like input)? + return spawnChild(command, { + ...options, + stdin: "null", + stdout: "piped", + stderr: "piped", + }).output(); + } + + function spawnSync(command, { + args = [], + cwd = undefined, + clearEnv = false, + env = {}, + uid = undefined, + gid = undefined, + } = {}) { // TODO(@crowlKats): more options (like input)? + return core.opSync("op_spawn_sync", { + cmd: pathFromURL(command), + args: ArrayPrototypeMap(args, String), + cwd: pathFromURL(cwd), + clearEnv, + env: ObjectEntries(env), + uid, + gid, + stdin: "null", + stdout: "piped", + stderr: "piped", + }); + } + + window.__bootstrap.spawn = { + Child, + spawnChild, + spawn, + spawnSync, + }; +})(this); diff --git a/runtime/js/90_deno_ns.js b/runtime/js/90_deno_ns.js index ddaecd7c9..61e894f8a 100644 --- a/runtime/js/90_deno_ns.js +++ b/runtime/js/90_deno_ns.js @@ -151,5 +151,9 @@ funlockSync: __bootstrap.fs.funlockSync, refTimer: __bootstrap.timers.refTimer, unrefTimer: __bootstrap.timers.unrefTimer, + Child: __bootstrap.spawn.Child, + spawnChild: __bootstrap.spawn.spawnChild, + spawn: __bootstrap.spawn.spawn, + spawnSync: __bootstrap.spawn.spawnSync, }; })(this); diff --git a/runtime/ops/io.rs b/runtime/ops/io.rs index b8449af86..34cd541d5 100644 --- a/runtime/ops/io.rs +++ b/runtime/ops/io.rs @@ -134,6 +134,10 @@ where stream.shutdown().await?; Ok(()) } + + pub fn into_inner(self) -> S { + self.stream.into_inner() + } } #[derive(Debug)] @@ -178,6 +182,10 @@ where .await?; Ok((nread, buf)) } + + pub fn into_inner(self) -> S { + self.stream.into_inner() + } } pub type ChildStdinResource = WriteOnlyResource<process::ChildStdin>; diff --git a/runtime/ops/mod.rs b/runtime/ops/mod.rs index 750dfe0f2..526c36d63 100644 --- a/runtime/ops/mod.rs +++ b/runtime/ops/mod.rs @@ -9,6 +9,7 @@ pub mod permissions; pub mod process; pub mod runtime; pub mod signal; +pub mod spawn; pub mod tty; mod utils; pub mod web_worker; diff --git a/runtime/ops/spawn.rs b/runtime/ops/spawn.rs new file mode 100644 index 000000000..196a7eed6 --- /dev/null +++ b/runtime/ops/spawn.rs @@ -0,0 +1,263 @@ +// Copyright 2018-2021 the Deno authors. All rights reserved. MIT license. + +use super::io::ChildStderrResource; +use super::io::ChildStdinResource; +use super::io::ChildStdoutResource; +use crate::permissions::Permissions; +use deno_core::error::AnyError; +use deno_core::op; +use deno_core::Extension; +use deno_core::OpState; +use deno_core::Resource; +use deno_core::ResourceId; +use deno_core::ZeroCopyBuf; +use serde::Deserialize; +use serde::Serialize; +use std::borrow::Cow; +use std::cell::RefCell; +use std::process::ExitStatus; +use std::rc::Rc; + +#[cfg(unix)] +use std::os::unix::prelude::ExitStatusExt; +#[cfg(unix)] +use std::os::unix::process::CommandExt; + +pub fn init() -> Extension { + Extension::builder() + .ops(vec![ + op_spawn_child::decl(), + op_spawn_wait::decl(), + op_spawn_sync::decl(), + ]) + .build() +} + +struct ChildResource(tokio::process::Child); + +impl Resource for ChildResource { + fn name(&self) -> Cow<str> { + "child".into() + } +} + +#[derive(Deserialize)] +#[serde(rename_all = "camelCase")] +pub enum Stdio { + Inherit, + Piped, + Null, +} + +fn subprocess_stdio_map(s: &Stdio) -> Result<std::process::Stdio, AnyError> { + match s { + Stdio::Inherit => Ok(std::process::Stdio::inherit()), + Stdio::Piped => Ok(std::process::Stdio::piped()), + Stdio::Null => Ok(std::process::Stdio::null()), + } +} + +#[derive(Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct SpawnArgs { + cmd: String, + args: Vec<String>, + cwd: Option<String>, + clear_env: bool, + env: Vec<(String, String)>, + #[cfg(unix)] + gid: Option<u32>, + #[cfg(unix)] + uid: Option<u32>, + + #[serde(flatten)] + stdio: ChildStdio, +} + +#[derive(Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct ChildStdio { + stdin: Stdio, + stdout: Stdio, + stderr: Stdio, +} + +#[derive(Serialize)] +#[serde(rename_all = "camelCase")] +pub struct ChildStatus { + success: bool, + code: i32, + signal: Option<i32>, +} + +impl From<std::process::ExitStatus> for ChildStatus { + fn from(status: ExitStatus) -> Self { + let code = status.code(); + #[cfg(unix)] + let signal = status.signal(); + #[cfg(not(unix))] + let signal = None; + + if let Some(signal) = signal { + ChildStatus { + success: false, + code: 128 + signal, + signal: Some(signal), + } + } else { + let code = code.expect("Should have either an exit code or a signal."); + + ChildStatus { + success: code == 0, + code, + signal: None, + } + } + } +} + +#[derive(Serialize)] +#[serde(rename_all = "camelCase")] +pub struct SpawnOutput { + status: ChildStatus, + stdout: Option<ZeroCopyBuf>, + stderr: Option<ZeroCopyBuf>, +} + +fn create_command( + state: &mut OpState, + args: SpawnArgs, +) -> Result<std::process::Command, AnyError> { + super::check_unstable(state, "Deno.spawn"); + state.borrow_mut::<Permissions>().run.check(&args.cmd)?; + + let mut command = std::process::Command::new(args.cmd); + command.args(args.args); + + if let Some(cwd) = args.cwd { + command.current_dir(cwd); + } + + if args.clear_env { + command.env_clear(); + } + command.envs(args.env); + + #[cfg(unix)] + if let Some(gid) = args.gid { + super::check_unstable(state, "Deno.spawn.gid"); + command.gid(gid); + } + #[cfg(unix)] + if let Some(uid) = args.uid { + super::check_unstable(state, "Deno.spawn.uid"); + command.uid(uid); + } + #[cfg(unix)] + unsafe { + command.pre_exec(|| { + libc::setgroups(0, std::ptr::null()); + Ok(()) + }); + } + + command.stdin(subprocess_stdio_map(&args.stdio.stdin)?); + command.stdout(subprocess_stdio_map(&args.stdio.stdout)?); + command.stderr(subprocess_stdio_map(&args.stdio.stderr)?); + + Ok(command) +} + +#[derive(Serialize)] +#[serde(rename_all = "camelCase")] +struct Child { + rid: ResourceId, + pid: u32, + stdin_rid: Option<ResourceId>, + stdout_rid: Option<ResourceId>, + stderr_rid: Option<ResourceId>, +} + +#[op] +fn op_spawn_child( + state: &mut OpState, + args: SpawnArgs, +) -> Result<Child, AnyError> { + let mut command = tokio::process::Command::from(create_command(state, args)?); + // TODO(@crowlkats): allow detaching processes. + // currently deno will orphan a process when exiting with an error or Deno.exit() + // We want to kill child when it's closed + command.kill_on_drop(true); + + let mut child = command.spawn()?; + let pid = child.id().expect("Process ID should be set."); + + let stdin_rid = child + .stdin + .take() + .map(|stdin| state.resource_table.add(ChildStdinResource::from(stdin))); + + let stdout_rid = child + .stdout + .take() + .map(|stdout| state.resource_table.add(ChildStdoutResource::from(stdout))); + + let stderr_rid = child + .stderr + .take() + .map(|stderr| state.resource_table.add(ChildStderrResource::from(stderr))); + + let child_rid = state.resource_table.add(ChildResource(child)); + + Ok(Child { + rid: child_rid, + pid, + stdin_rid, + stdout_rid, + stderr_rid, + }) +} + +#[op] +async fn op_spawn_wait( + state: Rc<RefCell<OpState>>, + rid: ResourceId, +) -> Result<ChildStatus, AnyError> { + let resource = state + .borrow_mut() + .resource_table + .take::<ChildResource>(rid)?; + Ok( + Rc::try_unwrap(resource) + .ok() + .unwrap() + .0 + .wait() + .await? + .into(), + ) +} + +#[op] +fn op_spawn_sync( + state: &mut OpState, + args: SpawnArgs, +) -> Result<SpawnOutput, AnyError> { + let stdout = matches!(args.stdio.stdout, Stdio::Piped); + let stderr = matches!(args.stdio.stderr, Stdio::Piped); + let output = create_command(state, args)?.output()?; + + Ok(SpawnOutput { + status: output.status.into(), + stdout: if stdout { + Some(output.stdout.into()) + } else { + None + }, + stderr: if stderr { + Some(output.stderr.into()) + } else { + None + }, + }) +} diff --git a/runtime/web_worker.rs b/runtime/web_worker.rs index f4c040aa4..ac103adda 100644 --- a/runtime/web_worker.rs +++ b/runtime/web_worker.rs @@ -427,6 +427,7 @@ impl WebWorker { .enabled(options.use_deno_namespace), ops::permissions::init().enabled(options.use_deno_namespace), ops::process::init().enabled(options.use_deno_namespace), + ops::spawn::init().enabled(options.use_deno_namespace), ops::signal::init().enabled(options.use_deno_namespace), ops::tty::init().enabled(options.use_deno_namespace), deno_http::init().enabled(options.use_deno_namespace), diff --git a/runtime/worker.rs b/runtime/worker.rs index fa147a7e6..370475703 100644 --- a/runtime/worker.rs +++ b/runtime/worker.rs @@ -132,6 +132,7 @@ impl MainWorker { options.create_web_worker_cb.clone(), options.web_worker_preload_module_cb.clone(), ), + ops::spawn::init(), ops::fs_events::init(), ops::fs::init(), ops::io::init(), |