summaryrefslogtreecommitdiff
path: root/cli/ops.rs
diff options
context:
space:
mode:
Diffstat (limited to 'cli/ops.rs')
-rw-r--r--cli/ops.rs160
1 files changed, 158 insertions, 2 deletions
diff --git a/cli/ops.rs b/cli/ops.rs
index a7c2e868f..c8119771d 100644
--- a/cli/ops.rs
+++ b/cli/ops.rs
@@ -18,6 +18,7 @@ use crate::resources::Resource;
use crate::tokio_util;
use crate::tokio_write;
use crate::version;
+use crate::workers;
use deno::deno_buf;
use deno::Buf;
use deno::JSError;
@@ -141,13 +142,24 @@ pub fn dispatch_all(
(base.sync(), boxed_op)
}
+/// Superset of op_selector_worker for compiler isolates
pub fn op_selector_compiler(inner_type: msg::Any) -> Option<OpCreator> {
match inner_type {
msg::Any::FetchModuleMetaData => Some(op_fetch_module_meta_data),
+ _ => op_selector_worker(inner_type),
+ }
+}
+
+/// Superset of op_selector_std for worker isolates
+pub fn op_selector_worker(inner_type: msg::Any) -> Option<OpCreator> {
+ match inner_type {
+ msg::Any::WorkerGetMessage => Some(op_worker_get_message),
+ msg::Any::WorkerPostMessage => Some(op_worker_post_message),
_ => op_selector_std(inner_type),
}
}
+/// Standard ops set for most isolates
pub fn op_selector_std(inner_type: msg::Any) -> Option<OpCreator> {
match inner_type {
msg::Any::Accept => Some(op_accept),
@@ -189,8 +201,10 @@ pub fn op_selector_std(inner_type: msg::Any) -> Option<OpCreator> {
msg::Any::Stat => Some(op_stat),
msg::Any::Symlink => Some(op_symlink),
msg::Any::Truncate => Some(op_truncate),
- msg::Any::WorkerGetMessage => Some(op_worker_get_message),
- msg::Any::WorkerPostMessage => Some(op_worker_post_message),
+ msg::Any::CreateWorker => Some(op_create_worker),
+ msg::Any::HostGetWorkerClosed => Some(op_host_get_worker_closed),
+ msg::Any::HostGetMessage => Some(op_host_get_message),
+ msg::Any::HostPostMessage => Some(op_host_post_message),
msg::Any::Write => Some(op_write),
_ => None,
}
@@ -1741,6 +1755,7 @@ impl Future for GetMessageFuture {
}
}
+/// Get message from host as guest worker
fn op_worker_get_message(
sc: &IsolateStateContainer,
base: &msg::Base<'_>,
@@ -1775,6 +1790,7 @@ fn op_worker_get_message(
Box::new(op)
}
+/// Post message to host as guest worker
fn op_worker_post_message(
sc: &IsolateStateContainer,
base: &msg::Base<'_>,
@@ -1807,3 +1823,143 @@ fn op_worker_post_message(
});
Box::new(op)
}
+
+/// Create worker as the host
+fn op_create_worker(
+ sc: &IsolateStateContainer,
+ base: &msg::Base<'_>,
+ data: deno_buf,
+) -> Box<OpWithError> {
+ assert_eq!(data.len(), 0);
+ let cmd_id = base.cmd_id();
+ let inner = base.inner_as_create_worker().unwrap();
+ let specifier = inner.specifier().unwrap();
+
+ Box::new(futures::future::result(move || -> OpResult {
+ let parent_state = sc.state().clone();
+ let behavior = workers::UserWorkerBehavior::new(
+ parent_state.flags.clone(),
+ parent_state.argv.clone(),
+ );
+ match workers::spawn(
+ behavior,
+ &format!("USER-WORKER-{}", specifier),
+ workers::WorkerInit::Module(specifier.to_string()),
+ ) {
+ Ok(worker) => {
+ let mut workers_tl = parent_state.workers.lock().unwrap();
+ let rid = worker.resource.rid.clone();
+ workers_tl.insert(rid, worker.shared());
+ let builder = &mut FlatBufferBuilder::new();
+ let msg_inner = msg::CreateWorkerRes::create(
+ builder,
+ &msg::CreateWorkerResArgs { rid },
+ );
+ Ok(serialize_response(
+ cmd_id,
+ builder,
+ msg::BaseArgs {
+ inner: Some(msg_inner.as_union_value()),
+ inner_type: msg::Any::CreateWorkerRes,
+ ..Default::default()
+ },
+ ))
+ }
+ Err(errors::RustOrJsError::Js(_)) => Err(errors::worker_init_failed()),
+ Err(errors::RustOrJsError::Rust(err)) => Err(err),
+ }
+ }()))
+}
+
+/// Return when the worker closes
+fn op_host_get_worker_closed(
+ sc: &IsolateStateContainer,
+ base: &msg::Base<'_>,
+ data: deno_buf,
+) -> Box<OpWithError> {
+ assert_eq!(data.len(), 0);
+ let cmd_id = base.cmd_id();
+ let inner = base.inner_as_host_get_worker_closed().unwrap();
+ let rid = inner.rid();
+ let state = sc.state().clone();
+
+ let shared_worker_future = {
+ let workers_tl = state.workers.lock().unwrap();
+ let worker = workers_tl.get(&rid).unwrap();
+ worker.clone()
+ };
+
+ Box::new(shared_worker_future.then(move |_result| {
+ let builder = &mut FlatBufferBuilder::new();
+
+ Ok(serialize_response(
+ cmd_id,
+ builder,
+ msg::BaseArgs {
+ ..Default::default()
+ },
+ ))
+ }))
+}
+
+/// Get message from guest worker as host
+fn op_host_get_message(
+ _sc: &IsolateStateContainer,
+ base: &msg::Base<'_>,
+ data: deno_buf,
+) -> Box<OpWithError> {
+ assert_eq!(data.len(), 0);
+ let cmd_id = base.cmd_id();
+ let inner = base.inner_as_host_get_message().unwrap();
+ let rid = inner.rid();
+
+ let op = resources::get_message_from_worker(rid);
+ let op = op.map_err(move |_| -> DenoError { unimplemented!() });
+ let op = op.and_then(move |maybe_buf| -> DenoResult<Buf> {
+ let builder = &mut FlatBufferBuilder::new();
+
+ let data = maybe_buf.as_ref().map(|buf| builder.create_vector(buf));
+ let msg_inner = msg::HostGetMessageRes::create(
+ builder,
+ &msg::HostGetMessageResArgs { data },
+ );
+ Ok(serialize_response(
+ cmd_id,
+ builder,
+ msg::BaseArgs {
+ inner: Some(msg_inner.as_union_value()),
+ inner_type: msg::Any::HostGetMessageRes,
+ ..Default::default()
+ },
+ ))
+ });
+ Box::new(op)
+}
+
+/// Post message to guest worker as host
+fn op_host_post_message(
+ _sc: &IsolateStateContainer,
+ base: &msg::Base<'_>,
+ data: deno_buf,
+) -> Box<OpWithError> {
+ let cmd_id = base.cmd_id();
+ let inner = base.inner_as_host_post_message().unwrap();
+ let rid = inner.rid();
+
+ let d = Vec::from(data.as_ref()).into_boxed_slice();
+
+ let op = resources::post_message_to_worker(rid, d);
+ let op = op.map_err(|e| errors::new(ErrorKind::Other, e.to_string()));
+ let op = op.and_then(move |_| -> DenoResult<Buf> {
+ let builder = &mut FlatBufferBuilder::new();
+
+ Ok(serialize_response(
+ cmd_id,
+ builder,
+ msg::BaseArgs {
+ ..Default::default()
+ },
+ ))
+ });
+ Box::new(op)
+}