summaryrefslogtreecommitdiff
path: root/cli/ops/workers.rs
diff options
context:
space:
mode:
Diffstat (limited to 'cli/ops/workers.rs')
-rw-r--r--cli/ops/workers.rs279
1 files changed, 279 insertions, 0 deletions
diff --git a/cli/ops/workers.rs b/cli/ops/workers.rs
new file mode 100644
index 000000000..d47aab765
--- /dev/null
+++ b/cli/ops/workers.rs
@@ -0,0 +1,279 @@
+// Copyright 2018-2019 the Deno authors. All rights reserved. MIT license.
+use crate::deno_error;
+use crate::deno_error::DenoError;
+use crate::deno_error::ErrorKind;
+use crate::msg;
+use crate::ops::ok_buf;
+use crate::ops::op_selector_std;
+use crate::ops::serialize_response;
+use crate::ops::CliOpResult;
+use crate::resources;
+use crate::startup_data;
+use crate::state::ThreadSafeState;
+use crate::worker::Worker;
+use deno::*;
+use flatbuffers::FlatBufferBuilder;
+use futures;
+use futures::Async;
+use futures::Future;
+use futures::Sink;
+use futures::Stream;
+use std;
+use std::convert::From;
+
+struct GetMessageFuture {
+ pub state: ThreadSafeState,
+}
+
+impl Future for GetMessageFuture {
+ type Item = Option<Buf>;
+ type Error = ();
+
+ fn poll(&mut self) -> Result<Async<Self::Item>, Self::Error> {
+ let mut wc = self.state.worker_channels.lock().unwrap();
+ wc.1
+ .poll()
+ .map_err(|err| panic!("worker_channel recv err {:?}", err))
+ }
+}
+
+/// Get message from host as guest worker
+pub fn op_worker_get_message(
+ state: &ThreadSafeState,
+ base: &msg::Base<'_>,
+ data: Option<PinnedBuf>,
+) -> CliOpResult {
+ if base.sync() {
+ return Err(deno_error::no_sync_support());
+ }
+ assert!(data.is_none());
+ let cmd_id = base.cmd_id();
+
+ let op = GetMessageFuture {
+ state: state.clone(),
+ };
+ let op = op.map_err(move |_| -> ErrBox { unimplemented!() });
+ let op = op.and_then(move |maybe_buf| -> Result<Buf, ErrBox> {
+ debug!("op_worker_get_message");
+ let builder = &mut FlatBufferBuilder::new();
+
+ let data = maybe_buf.as_ref().map(|buf| builder.create_vector(buf));
+ let inner = msg::WorkerGetMessageRes::create(
+ builder,
+ &msg::WorkerGetMessageResArgs { data },
+ );
+ Ok(serialize_response(
+ cmd_id,
+ builder,
+ msg::BaseArgs {
+ inner: Some(inner.as_union_value()),
+ inner_type: msg::Any::WorkerGetMessageRes,
+ ..Default::default()
+ },
+ ))
+ });
+ Ok(Op::Async(Box::new(op)))
+}
+
+/// Post message to host as guest worker
+pub fn op_worker_post_message(
+ state: &ThreadSafeState,
+ base: &msg::Base<'_>,
+ data: Option<PinnedBuf>,
+) -> CliOpResult {
+ let cmd_id = base.cmd_id();
+ let d = Vec::from(data.unwrap().as_ref()).into_boxed_slice();
+
+ let tx = {
+ let wc = state.worker_channels.lock().unwrap();
+ wc.0.clone()
+ };
+ tx.send(d)
+ .wait()
+ .map_err(|e| DenoError::new(ErrorKind::Other, e.to_string()))?;
+ let builder = &mut FlatBufferBuilder::new();
+
+ ok_buf(serialize_response(
+ cmd_id,
+ builder,
+ msg::BaseArgs {
+ ..Default::default()
+ },
+ ))
+}
+
+/// Create worker as the host
+pub fn op_create_worker(
+ state: &ThreadSafeState,
+ base: &msg::Base<'_>,
+ data: Option<PinnedBuf>,
+) -> CliOpResult {
+ assert!(data.is_none());
+ let cmd_id = base.cmd_id();
+ let inner = base.inner_as_create_worker().unwrap();
+ let specifier = inner.specifier().unwrap();
+ // Only include deno namespace if requested AND current worker
+ // has included namespace (to avoid escalation).
+ let include_deno_namespace =
+ inner.include_deno_namespace() && state.include_deno_namespace;
+ let has_source_code = inner.has_source_code();
+ let source_code = inner.source_code().unwrap();
+
+ let parent_state = state.clone();
+
+ let mut module_specifier = ModuleSpecifier::resolve_url_or_path(specifier)?;
+
+ let mut child_argv = parent_state.argv.clone();
+
+ if !has_source_code {
+ if let Some(module) = state.main_module() {
+ module_specifier =
+ ModuleSpecifier::resolve_import(specifier, &module.to_string())?;
+ child_argv[1] = module_specifier.to_string();
+ }
+ }
+
+ let child_state = ThreadSafeState::new(
+ parent_state.flags.clone(),
+ child_argv,
+ op_selector_std,
+ parent_state.progress.clone(),
+ include_deno_namespace,
+ )?;
+ let rid = child_state.resource.rid;
+ let name = format!("USER-WORKER-{}", specifier);
+ let deno_main_call = format!("denoMain({})", include_deno_namespace);
+
+ let mut worker =
+ Worker::new(name, startup_data::deno_isolate_init(), child_state);
+ worker.execute(&deno_main_call).unwrap();
+ worker.execute("workerMain()").unwrap();
+
+ let exec_cb = move |worker: Worker| {
+ let mut workers_tl = parent_state.workers.lock().unwrap();
+ workers_tl.insert(rid, worker.shared());
+ let builder = &mut FlatBufferBuilder::new();
+ let msg_inner =
+ msg::CreateWorkerRes::create(builder, &msg::CreateWorkerResArgs { rid });
+ serialize_response(
+ cmd_id,
+ builder,
+ msg::BaseArgs {
+ inner: Some(msg_inner.as_union_value()),
+ inner_type: msg::Any::CreateWorkerRes,
+ ..Default::default()
+ },
+ )
+ };
+
+ // Has provided source code, execute immediately.
+ if has_source_code {
+ worker.execute(&source_code).unwrap();
+ return ok_buf(exec_cb(worker));
+ }
+
+ let op = worker
+ .execute_mod_async(&module_specifier, false)
+ .and_then(move |()| Ok(exec_cb(worker)));
+
+ let result = op.wait()?;
+ Ok(Op::Sync(result))
+}
+
+/// Return when the worker closes
+pub fn op_host_get_worker_closed(
+ state: &ThreadSafeState,
+ base: &msg::Base<'_>,
+ data: Option<PinnedBuf>,
+) -> CliOpResult {
+ if base.sync() {
+ return Err(deno_error::no_sync_support());
+ }
+ assert!(data.is_none());
+ let cmd_id = base.cmd_id();
+ let inner = base.inner_as_host_get_worker_closed().unwrap();
+ let rid = inner.rid();
+ let state = state.clone();
+
+ let shared_worker_future = {
+ let workers_tl = state.workers.lock().unwrap();
+ let worker = workers_tl.get(&rid).unwrap();
+ worker.clone()
+ };
+
+ let op = Box::new(shared_worker_future.then(move |_result| {
+ let builder = &mut FlatBufferBuilder::new();
+
+ Ok(serialize_response(
+ cmd_id,
+ builder,
+ msg::BaseArgs {
+ ..Default::default()
+ },
+ ))
+ }));
+ Ok(Op::Async(Box::new(op)))
+}
+
+/// Get message from guest worker as host
+pub fn op_host_get_message(
+ _state: &ThreadSafeState,
+ base: &msg::Base<'_>,
+ data: Option<PinnedBuf>,
+) -> CliOpResult {
+ if base.sync() {
+ return Err(deno_error::no_sync_support());
+ }
+ assert!(data.is_none());
+ let cmd_id = base.cmd_id();
+ let inner = base.inner_as_host_get_message().unwrap();
+ let rid = inner.rid();
+
+ let op = resources::get_message_from_worker(rid);
+ let op = op.map_err(move |_| -> ErrBox { unimplemented!() });
+ let op = op.and_then(move |maybe_buf| -> Result<Buf, ErrBox> {
+ let builder = &mut FlatBufferBuilder::new();
+
+ let data = maybe_buf.as_ref().map(|buf| builder.create_vector(buf));
+ let msg_inner = msg::HostGetMessageRes::create(
+ builder,
+ &msg::HostGetMessageResArgs { data },
+ );
+ Ok(serialize_response(
+ cmd_id,
+ builder,
+ msg::BaseArgs {
+ inner: Some(msg_inner.as_union_value()),
+ inner_type: msg::Any::HostGetMessageRes,
+ ..Default::default()
+ },
+ ))
+ });
+ Ok(Op::Async(Box::new(op)))
+}
+
+/// Post message to guest worker as host
+pub fn op_host_post_message(
+ _state: &ThreadSafeState,
+ base: &msg::Base<'_>,
+ data: Option<PinnedBuf>,
+) -> CliOpResult {
+ let cmd_id = base.cmd_id();
+ let inner = base.inner_as_host_post_message().unwrap();
+ let rid = inner.rid();
+
+ let d = Vec::from(data.unwrap().as_ref()).into_boxed_slice();
+
+ resources::post_message_to_worker(rid, d)
+ .wait()
+ .map_err(|e| DenoError::new(ErrorKind::Other, e.to_string()))?;
+ let builder = &mut FlatBufferBuilder::new();
+
+ ok_buf(serialize_response(
+ cmd_id,
+ builder,
+ msg::BaseArgs {
+ ..Default::default()
+ },
+ ))
+}