diff options
author | Luca Casonato <hello@lcas.dev> | 2021-06-22 16:30:16 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2021-06-22 16:30:16 +0200 |
commit | 6261c89e04b8f1a3aabc771dbc8cddad904710e9 (patch) | |
tree | 6b1814991d77b8f238e507aa2f52e93589f3c0a1 /runtime/ops/worker_host.rs | |
parent | 0a2ced57285aa0ee4b47426382c32fb53c4e07cd (diff) |
feat: transfer MessagePort between workers (#11076)
Add support for transferring `MessagePort`s between workers.
Diffstat (limited to 'runtime/ops/worker_host.rs')
-rw-r--r-- | runtime/ops/worker_host.rs | 139 |
1 files changed, 61 insertions, 78 deletions
diff --git a/runtime/ops/worker_host.rs b/runtime/ops/worker_host.rs index 57d3ac2b8..162f9f4f7 100644 --- a/runtime/ops/worker_host.rs +++ b/runtime/ops/worker_host.rs @@ -12,25 +12,23 @@ use crate::permissions::UnaryPermission; use crate::permissions::UnitPermission; use crate::permissions::WriteDescriptor; use crate::web_worker::run_web_worker; +use crate::web_worker::SendableWebWorkerHandle; use crate::web_worker::WebWorker; use crate::web_worker::WebWorkerHandle; -use crate::web_worker::WorkerEvent; +use crate::web_worker::WorkerControlEvent; use crate::web_worker::WorkerId; use deno_core::error::custom_error; -use deno_core::error::null_opbuf; use deno_core::error::AnyError; -use deno_core::error::JsError; use deno_core::op_async; use deno_core::op_sync; use deno_core::serde::de; use deno_core::serde::de::SeqAccess; use deno_core::serde::Deserialize; use deno_core::serde::Deserializer; -use deno_core::serde_json::json; use deno_core::Extension; use deno_core::ModuleSpecifier; use deno_core::OpState; -use deno_core::ZeroCopyBuf; +use deno_web::JsMessageData; use log::debug; use std::cell::RefCell; use std::collections::HashMap; @@ -51,8 +49,9 @@ pub struct CreateWebWorkerArgs { pub use_deno_namespace: bool, } -pub type CreateWebWorkerCb = - dyn Fn(CreateWebWorkerArgs) -> WebWorker + Sync + Send; +pub type CreateWebWorkerCb = dyn Fn(CreateWebWorkerArgs) -> (WebWorker, SendableWebWorkerHandle) + + Sync + + Send; /// A holder for callback that is used to create a new /// WebWorker. It's a struct instead of a type alias @@ -87,7 +86,8 @@ pub fn init(create_web_worker_cb: Arc<CreateWebWorkerCb>) -> Extension { op_sync(op_host_terminate_worker), ), ("op_host_post_message", op_sync(op_host_post_message)), - ("op_host_get_message", op_async(op_host_get_message)), + ("op_host_recv_ctrl", op_async(op_host_recv_ctrl)), + ("op_host_recv_message", op_async(op_host_recv_message)), ]) .build() } @@ -458,8 +458,9 @@ fn op_create_worker( let module_specifier = deno_core::resolve_url(&specifier)?; let worker_name = args_name.unwrap_or_else(|| "".to_string()); - let (handle_sender, handle_receiver) = - std::sync::mpsc::sync_channel::<Result<WebWorkerHandle, AnyError>>(1); + let (handle_sender, handle_receiver) = std::sync::mpsc::sync_channel::< + Result<SendableWebWorkerHandle, AnyError>, + >(1); // Setup new thread let thread_builder = @@ -472,17 +473,18 @@ fn op_create_worker( // all action done upon it should be noops // - newly spawned thread exits - let worker = (create_module_loader.0)(CreateWebWorkerArgs { - name: worker_name, - worker_id, - parent_permissions, - permissions: worker_permissions, - main_module: module_specifier.clone(), - use_deno_namespace, - }); + let (worker, external_handle) = + (create_module_loader.0)(CreateWebWorkerArgs { + name: worker_name, + worker_id, + parent_permissions, + permissions: worker_permissions, + main_module: module_specifier.clone(), + use_deno_namespace, + }); // Send thread safe handle from newly created worker to host thread - handle_sender.send(Ok(worker.thread_safe_handle())).unwrap(); + handle_sender.send(Ok(external_handle)).unwrap(); drop(handle_sender); // At this point the only method of communication with host @@ -497,7 +499,7 @@ fn op_create_worker( let worker_thread = WorkerThread { join_handle, - worker_handle, + worker_handle: worker_handle.into(), }; // At this point all interactions with worker happen using thread @@ -514,7 +516,7 @@ fn op_host_terminate_worker( id: WorkerId, _: (), ) -> Result<(), AnyError> { - let mut worker_thread = state + let worker_thread = state .borrow_mut::<WorkersTable>() .remove(&id) .expect("No worker handle found"); @@ -527,52 +529,13 @@ fn op_host_terminate_worker( Ok(()) } -use deno_core::serde::Serialize; -use deno_core::serde::Serializer; - -impl Serialize for WorkerEvent { - fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error> - where - S: Serializer, - { - let type_id = match &self { - WorkerEvent::Message(_) => 0_i32, - WorkerEvent::TerminalError(_) => 1_i32, - WorkerEvent::Error(_) => 2_i32, - WorkerEvent::Close => 3_i32, - }; - - match self { - WorkerEvent::Message(buf) => { - Serialize::serialize(&(type_id, buf), serializer) - } - WorkerEvent::TerminalError(error) | WorkerEvent::Error(error) => { - let value = match error.downcast_ref::<JsError>() { - Some(js_error) => json!({ - "message": js_error.message, - "fileName": js_error.script_resource_name, - "lineNumber": js_error.line_number, - "columnNumber": js_error.start_column, - }), - None => json!({ - "message": error.to_string(), - }), - }; - - Serialize::serialize(&(type_id, value), serializer) - } - _ => Serialize::serialize(&(type_id, ()), serializer), - } - } -} - /// Try to remove worker from workers table - NOTE: `Worker.terminate()` /// might have been called already meaning that we won't find worker in /// table - in that case ignore. fn try_remove_and_close(state: Rc<RefCell<OpState>>, id: WorkerId) { let mut s = state.borrow_mut(); let workers = s.borrow_mut::<WorkersTable>(); - if let Some(mut worker_thread) = workers.remove(&id) { + if let Some(worker_thread) = workers.remove(&id) { worker_thread.worker_handle.terminate(); worker_thread .join_handle @@ -582,28 +545,28 @@ fn try_remove_and_close(state: Rc<RefCell<OpState>>, id: WorkerId) { } } -/// Get message from guest worker as host -async fn op_host_get_message( +/// Get control event from guest worker as host +async fn op_host_recv_ctrl( state: Rc<RefCell<OpState>>, id: WorkerId, _: (), -) -> Result<WorkerEvent, AnyError> { +) -> Result<WorkerControlEvent, AnyError> { let worker_handle = { - let s = state.borrow(); - let workers_table = s.borrow::<WorkersTable>(); + let state = state.borrow(); + let workers_table = state.borrow::<WorkersTable>(); let maybe_handle = workers_table.get(&id); if let Some(handle) = maybe_handle { handle.worker_handle.clone() } else { // If handle was not found it means worker has already shutdown - return Ok(WorkerEvent::Close); + return Ok(WorkerControlEvent::Close); } }; - let maybe_event = worker_handle.get_event().await?; + let maybe_event = worker_handle.get_control_event().await?; if let Some(event) = maybe_event { // Terminal error means that worker should be removed from worker table. - if let WorkerEvent::TerminalError(_) = &event { + if let WorkerControlEvent::TerminalError(_) = &event { try_remove_and_close(state, id); } return Ok(event); @@ -611,22 +574,42 @@ async fn op_host_get_message( // If there was no event from worker it means it has already been closed. try_remove_and_close(state, id); - Ok(WorkerEvent::Close) + Ok(WorkerControlEvent::Close) +} + +async fn op_host_recv_message( + state: Rc<RefCell<OpState>>, + id: WorkerId, + _: (), +) -> Result<Option<JsMessageData>, AnyError> { + let worker_handle = { + let s = state.borrow(); + let workers_table = s.borrow::<WorkersTable>(); + let maybe_handle = workers_table.get(&id); + if let Some(handle) = maybe_handle { + handle.worker_handle.clone() + } else { + // If handle was not found it means worker has already shutdown + return Ok(None); + } + }; + worker_handle.port.recv(state).await } /// Post message to guest worker as host fn op_host_post_message( state: &mut OpState, id: WorkerId, - data: Option<ZeroCopyBuf>, + data: JsMessageData, ) -> Result<(), AnyError> { - let msg = data.ok_or_else(null_opbuf)?; - debug!("post message to worker {}", id); - let worker_thread = state - .borrow::<WorkersTable>() - .get(&id) - .expect("No worker handle found"); - worker_thread.worker_handle.post_message(msg)?; + let worker_handle = { + let worker_thread = state + .borrow::<WorkersTable>() + .get(&id) + .expect("No worker handle found"); + worker_thread.worker_handle.clone() + }; + worker_handle.port.send(state, data)?; Ok(()) } |