diff options
author | andy finch <andyfinch7@gmail.com> | 2019-04-01 15:09:59 -0400 |
---|---|---|
committer | Ryan Dahl <ry@tinyclouds.org> | 2019-04-01 15:09:59 -0400 |
commit | b0a23beb8fae964be3cdd8c23c38af66257d34c7 (patch) | |
tree | 8f7875c8ca059dfb0a3ade4da7bfb94e57d6e1aa /cli/ops.rs | |
parent | 659acadf77fdbeef8579a37839a464feb408437a (diff) |
Add web worker JS API (#1993)
* Refactored the way worker polling is scheduled and errors are handled.
* Share the worker future as a Shared
Diffstat (limited to 'cli/ops.rs')
-rw-r--r-- | cli/ops.rs | 160 |
1 files changed, 158 insertions, 2 deletions
diff --git a/cli/ops.rs b/cli/ops.rs index a7c2e868f..c8119771d 100644 --- a/cli/ops.rs +++ b/cli/ops.rs @@ -18,6 +18,7 @@ use crate::resources::Resource; use crate::tokio_util; use crate::tokio_write; use crate::version; +use crate::workers; use deno::deno_buf; use deno::Buf; use deno::JSError; @@ -141,13 +142,24 @@ pub fn dispatch_all( (base.sync(), boxed_op) } +/// Superset of op_selector_worker for compiler isolates pub fn op_selector_compiler(inner_type: msg::Any) -> Option<OpCreator> { match inner_type { msg::Any::FetchModuleMetaData => Some(op_fetch_module_meta_data), + _ => op_selector_worker(inner_type), + } +} + +/// Superset of op_selector_std for worker isolates +pub fn op_selector_worker(inner_type: msg::Any) -> Option<OpCreator> { + match inner_type { + msg::Any::WorkerGetMessage => Some(op_worker_get_message), + msg::Any::WorkerPostMessage => Some(op_worker_post_message), _ => op_selector_std(inner_type), } } +/// Standard ops set for most isolates pub fn op_selector_std(inner_type: msg::Any) -> Option<OpCreator> { match inner_type { msg::Any::Accept => Some(op_accept), @@ -189,8 +201,10 @@ pub fn op_selector_std(inner_type: msg::Any) -> Option<OpCreator> { msg::Any::Stat => Some(op_stat), msg::Any::Symlink => Some(op_symlink), msg::Any::Truncate => Some(op_truncate), - msg::Any::WorkerGetMessage => Some(op_worker_get_message), - msg::Any::WorkerPostMessage => Some(op_worker_post_message), + msg::Any::CreateWorker => Some(op_create_worker), + msg::Any::HostGetWorkerClosed => Some(op_host_get_worker_closed), + msg::Any::HostGetMessage => Some(op_host_get_message), + msg::Any::HostPostMessage => Some(op_host_post_message), msg::Any::Write => Some(op_write), _ => None, } @@ -1741,6 +1755,7 @@ impl Future for GetMessageFuture { } } +/// Get message from host as guest worker fn op_worker_get_message( sc: &IsolateStateContainer, base: &msg::Base<'_>, @@ -1775,6 +1790,7 @@ fn op_worker_get_message( Box::new(op) } +/// Post message to host as guest worker fn op_worker_post_message( sc: &IsolateStateContainer, base: &msg::Base<'_>, @@ -1807,3 +1823,143 @@ fn op_worker_post_message( }); Box::new(op) } + +/// Create worker as the host +fn op_create_worker( + sc: &IsolateStateContainer, + base: &msg::Base<'_>, + data: deno_buf, +) -> Box<OpWithError> { + assert_eq!(data.len(), 0); + let cmd_id = base.cmd_id(); + let inner = base.inner_as_create_worker().unwrap(); + let specifier = inner.specifier().unwrap(); + + Box::new(futures::future::result(move || -> OpResult { + let parent_state = sc.state().clone(); + let behavior = workers::UserWorkerBehavior::new( + parent_state.flags.clone(), + parent_state.argv.clone(), + ); + match workers::spawn( + behavior, + &format!("USER-WORKER-{}", specifier), + workers::WorkerInit::Module(specifier.to_string()), + ) { + Ok(worker) => { + let mut workers_tl = parent_state.workers.lock().unwrap(); + let rid = worker.resource.rid.clone(); + workers_tl.insert(rid, worker.shared()); + let builder = &mut FlatBufferBuilder::new(); + let msg_inner = msg::CreateWorkerRes::create( + builder, + &msg::CreateWorkerResArgs { rid }, + ); + Ok(serialize_response( + cmd_id, + builder, + msg::BaseArgs { + inner: Some(msg_inner.as_union_value()), + inner_type: msg::Any::CreateWorkerRes, + ..Default::default() + }, + )) + } + Err(errors::RustOrJsError::Js(_)) => Err(errors::worker_init_failed()), + Err(errors::RustOrJsError::Rust(err)) => Err(err), + } + }())) +} + +/// Return when the worker closes +fn op_host_get_worker_closed( + sc: &IsolateStateContainer, + base: &msg::Base<'_>, + data: deno_buf, +) -> Box<OpWithError> { + assert_eq!(data.len(), 0); + let cmd_id = base.cmd_id(); + let inner = base.inner_as_host_get_worker_closed().unwrap(); + let rid = inner.rid(); + let state = sc.state().clone(); + + let shared_worker_future = { + let workers_tl = state.workers.lock().unwrap(); + let worker = workers_tl.get(&rid).unwrap(); + worker.clone() + }; + + Box::new(shared_worker_future.then(move |_result| { + let builder = &mut FlatBufferBuilder::new(); + + Ok(serialize_response( + cmd_id, + builder, + msg::BaseArgs { + ..Default::default() + }, + )) + })) +} + +/// Get message from guest worker as host +fn op_host_get_message( + _sc: &IsolateStateContainer, + base: &msg::Base<'_>, + data: deno_buf, +) -> Box<OpWithError> { + assert_eq!(data.len(), 0); + let cmd_id = base.cmd_id(); + let inner = base.inner_as_host_get_message().unwrap(); + let rid = inner.rid(); + + let op = resources::get_message_from_worker(rid); + let op = op.map_err(move |_| -> DenoError { unimplemented!() }); + let op = op.and_then(move |maybe_buf| -> DenoResult<Buf> { + let builder = &mut FlatBufferBuilder::new(); + + let data = maybe_buf.as_ref().map(|buf| builder.create_vector(buf)); + let msg_inner = msg::HostGetMessageRes::create( + builder, + &msg::HostGetMessageResArgs { data }, + ); + Ok(serialize_response( + cmd_id, + builder, + msg::BaseArgs { + inner: Some(msg_inner.as_union_value()), + inner_type: msg::Any::HostGetMessageRes, + ..Default::default() + }, + )) + }); + Box::new(op) +} + +/// Post message to guest worker as host +fn op_host_post_message( + _sc: &IsolateStateContainer, + base: &msg::Base<'_>, + data: deno_buf, +) -> Box<OpWithError> { + let cmd_id = base.cmd_id(); + let inner = base.inner_as_host_post_message().unwrap(); + let rid = inner.rid(); + + let d = Vec::from(data.as_ref()).into_boxed_slice(); + + let op = resources::post_message_to_worker(rid, d); + let op = op.map_err(|e| errors::new(ErrorKind::Other, e.to_string())); + let op = op.and_then(move |_| -> DenoResult<Buf> { + let builder = &mut FlatBufferBuilder::new(); + + Ok(serialize_response( + cmd_id, + builder, + msg::BaseArgs { + ..Default::default() + }, + )) + }); + Box::new(op) +} |