summaryrefslogtreecommitdiff
path: root/runtime/ops/worker_host.rs
diff options
context:
space:
mode:
authorLuca Casonato <hello@lcas.dev>2021-06-22 16:30:16 +0200
committerGitHub <noreply@github.com>2021-06-22 16:30:16 +0200
commit6261c89e04b8f1a3aabc771dbc8cddad904710e9 (patch)
tree6b1814991d77b8f238e507aa2f52e93589f3c0a1 /runtime/ops/worker_host.rs
parent0a2ced57285aa0ee4b47426382c32fb53c4e07cd (diff)
feat: transfer MessagePort between workers (#11076)
Add support for transferring `MessagePort`s between workers.
Diffstat (limited to 'runtime/ops/worker_host.rs')
-rw-r--r--runtime/ops/worker_host.rs139
1 files changed, 61 insertions, 78 deletions
diff --git a/runtime/ops/worker_host.rs b/runtime/ops/worker_host.rs
index 57d3ac2b8..162f9f4f7 100644
--- a/runtime/ops/worker_host.rs
+++ b/runtime/ops/worker_host.rs
@@ -12,25 +12,23 @@ use crate::permissions::UnaryPermission;
use crate::permissions::UnitPermission;
use crate::permissions::WriteDescriptor;
use crate::web_worker::run_web_worker;
+use crate::web_worker::SendableWebWorkerHandle;
use crate::web_worker::WebWorker;
use crate::web_worker::WebWorkerHandle;
-use crate::web_worker::WorkerEvent;
+use crate::web_worker::WorkerControlEvent;
use crate::web_worker::WorkerId;
use deno_core::error::custom_error;
-use deno_core::error::null_opbuf;
use deno_core::error::AnyError;
-use deno_core::error::JsError;
use deno_core::op_async;
use deno_core::op_sync;
use deno_core::serde::de;
use deno_core::serde::de::SeqAccess;
use deno_core::serde::Deserialize;
use deno_core::serde::Deserializer;
-use deno_core::serde_json::json;
use deno_core::Extension;
use deno_core::ModuleSpecifier;
use deno_core::OpState;
-use deno_core::ZeroCopyBuf;
+use deno_web::JsMessageData;
use log::debug;
use std::cell::RefCell;
use std::collections::HashMap;
@@ -51,8 +49,9 @@ pub struct CreateWebWorkerArgs {
pub use_deno_namespace: bool,
}
-pub type CreateWebWorkerCb =
- dyn Fn(CreateWebWorkerArgs) -> WebWorker + Sync + Send;
+pub type CreateWebWorkerCb = dyn Fn(CreateWebWorkerArgs) -> (WebWorker, SendableWebWorkerHandle)
+ + Sync
+ + Send;
/// A holder for callback that is used to create a new
/// WebWorker. It's a struct instead of a type alias
@@ -87,7 +86,8 @@ pub fn init(create_web_worker_cb: Arc<CreateWebWorkerCb>) -> Extension {
op_sync(op_host_terminate_worker),
),
("op_host_post_message", op_sync(op_host_post_message)),
- ("op_host_get_message", op_async(op_host_get_message)),
+ ("op_host_recv_ctrl", op_async(op_host_recv_ctrl)),
+ ("op_host_recv_message", op_async(op_host_recv_message)),
])
.build()
}
@@ -458,8 +458,9 @@ fn op_create_worker(
let module_specifier = deno_core::resolve_url(&specifier)?;
let worker_name = args_name.unwrap_or_else(|| "".to_string());
- let (handle_sender, handle_receiver) =
- std::sync::mpsc::sync_channel::<Result<WebWorkerHandle, AnyError>>(1);
+ let (handle_sender, handle_receiver) = std::sync::mpsc::sync_channel::<
+ Result<SendableWebWorkerHandle, AnyError>,
+ >(1);
// Setup new thread
let thread_builder =
@@ -472,17 +473,18 @@ fn op_create_worker(
// all action done upon it should be noops
// - newly spawned thread exits
- let worker = (create_module_loader.0)(CreateWebWorkerArgs {
- name: worker_name,
- worker_id,
- parent_permissions,
- permissions: worker_permissions,
- main_module: module_specifier.clone(),
- use_deno_namespace,
- });
+ let (worker, external_handle) =
+ (create_module_loader.0)(CreateWebWorkerArgs {
+ name: worker_name,
+ worker_id,
+ parent_permissions,
+ permissions: worker_permissions,
+ main_module: module_specifier.clone(),
+ use_deno_namespace,
+ });
// Send thread safe handle from newly created worker to host thread
- handle_sender.send(Ok(worker.thread_safe_handle())).unwrap();
+ handle_sender.send(Ok(external_handle)).unwrap();
drop(handle_sender);
// At this point the only method of communication with host
@@ -497,7 +499,7 @@ fn op_create_worker(
let worker_thread = WorkerThread {
join_handle,
- worker_handle,
+ worker_handle: worker_handle.into(),
};
// At this point all interactions with worker happen using thread
@@ -514,7 +516,7 @@ fn op_host_terminate_worker(
id: WorkerId,
_: (),
) -> Result<(), AnyError> {
- let mut worker_thread = state
+ let worker_thread = state
.borrow_mut::<WorkersTable>()
.remove(&id)
.expect("No worker handle found");
@@ -527,52 +529,13 @@ fn op_host_terminate_worker(
Ok(())
}
-use deno_core::serde::Serialize;
-use deno_core::serde::Serializer;
-
-impl Serialize for WorkerEvent {
- fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
- where
- S: Serializer,
- {
- let type_id = match &self {
- WorkerEvent::Message(_) => 0_i32,
- WorkerEvent::TerminalError(_) => 1_i32,
- WorkerEvent::Error(_) => 2_i32,
- WorkerEvent::Close => 3_i32,
- };
-
- match self {
- WorkerEvent::Message(buf) => {
- Serialize::serialize(&(type_id, buf), serializer)
- }
- WorkerEvent::TerminalError(error) | WorkerEvent::Error(error) => {
- let value = match error.downcast_ref::<JsError>() {
- Some(js_error) => json!({
- "message": js_error.message,
- "fileName": js_error.script_resource_name,
- "lineNumber": js_error.line_number,
- "columnNumber": js_error.start_column,
- }),
- None => json!({
- "message": error.to_string(),
- }),
- };
-
- Serialize::serialize(&(type_id, value), serializer)
- }
- _ => Serialize::serialize(&(type_id, ()), serializer),
- }
- }
-}
-
/// Try to remove worker from workers table - NOTE: `Worker.terminate()`
/// might have been called already meaning that we won't find worker in
/// table - in that case ignore.
fn try_remove_and_close(state: Rc<RefCell<OpState>>, id: WorkerId) {
let mut s = state.borrow_mut();
let workers = s.borrow_mut::<WorkersTable>();
- if let Some(mut worker_thread) = workers.remove(&id) {
+ if let Some(worker_thread) = workers.remove(&id) {
worker_thread.worker_handle.terminate();
worker_thread
.join_handle
@@ -582,28 +545,28 @@ fn try_remove_and_close(state: Rc<RefCell<OpState>>, id: WorkerId) {
}
}
-/// Get message from guest worker as host
-async fn op_host_get_message(
+/// Get control event from guest worker as host
+async fn op_host_recv_ctrl(
state: Rc<RefCell<OpState>>,
id: WorkerId,
_: (),
-) -> Result<WorkerEvent, AnyError> {
+) -> Result<WorkerControlEvent, AnyError> {
let worker_handle = {
- let s = state.borrow();
- let workers_table = s.borrow::<WorkersTable>();
+ let state = state.borrow();
+ let workers_table = state.borrow::<WorkersTable>();
let maybe_handle = workers_table.get(&id);
if let Some(handle) = maybe_handle {
handle.worker_handle.clone()
} else {
// If handle was not found it means worker has already shutdown
- return Ok(WorkerEvent::Close);
+ return Ok(WorkerControlEvent::Close);
}
};
- let maybe_event = worker_handle.get_event().await?;
+ let maybe_event = worker_handle.get_control_event().await?;
if let Some(event) = maybe_event {
// Terminal error means that worker should be removed from worker table.
- if let WorkerEvent::TerminalError(_) = &event {
+ if let WorkerControlEvent::TerminalError(_) = &event {
try_remove_and_close(state, id);
}
return Ok(event);
@@ -611,22 +574,42 @@ async fn op_host_get_message(
// If there was no event from worker it means it has already been closed.
try_remove_and_close(state, id);
- Ok(WorkerEvent::Close)
+ Ok(WorkerControlEvent::Close)
+}
+
+async fn op_host_recv_message(
+ state: Rc<RefCell<OpState>>,
+ id: WorkerId,
+ _: (),
+) -> Result<Option<JsMessageData>, AnyError> {
+ let worker_handle = {
+ let s = state.borrow();
+ let workers_table = s.borrow::<WorkersTable>();
+ let maybe_handle = workers_table.get(&id);
+ if let Some(handle) = maybe_handle {
+ handle.worker_handle.clone()
+ } else {
+ // If handle was not found it means worker has already shutdown
+ return Ok(None);
+ }
+ };
+ worker_handle.port.recv(state).await
}
/// Post message to guest worker as host
fn op_host_post_message(
state: &mut OpState,
id: WorkerId,
- data: Option<ZeroCopyBuf>,
+ data: JsMessageData,
) -> Result<(), AnyError> {
- let msg = data.ok_or_else(null_opbuf)?;
-
debug!("post message to worker {}", id);
- let worker_thread = state
- .borrow::<WorkersTable>()
- .get(&id)
- .expect("No worker handle found");
- worker_thread.worker_handle.post_message(msg)?;
+ let worker_handle = {
+ let worker_thread = state
+ .borrow::<WorkersTable>()
+ .get(&id)
+ .expect("No worker handle found");
+ worker_thread.worker_handle.clone()
+ };
+ worker_handle.port.send(state, data)?;
Ok(())
}