summaryrefslogtreecommitdiff
path: root/runtime/web_worker.rs
diff options
context:
space:
mode:
Diffstat (limited to 'runtime/web_worker.rs')
-rw-r--r--runtime/web_worker.rs295
1 files changed, 101 insertions, 194 deletions
diff --git a/runtime/web_worker.rs b/runtime/web_worker.rs
index 753238052..a3a062221 100644
--- a/runtime/web_worker.rs
+++ b/runtime/web_worker.rs
@@ -8,6 +8,7 @@ use crate::permissions::Permissions;
use crate::tokio_util::create_basic_runtime;
use deno_broadcast_channel::InMemoryBroadcastChannel;
use deno_core::error::AnyError;
+use deno_core::error::JsError;
use deno_core::futures::channel::mpsc;
use deno_core::futures::future::poll_fn;
use deno_core::futures::future::FutureExt;
@@ -18,6 +19,7 @@ use deno_core::serde::Serialize;
use deno_core::serde_json;
use deno_core::serde_json::json;
use deno_core::v8;
+use deno_core::CancelHandle;
use deno_core::Extension;
use deno_core::GetErrorClassFn;
use deno_core::JsErrorCreateFn;
@@ -26,8 +28,9 @@ use deno_core::ModuleId;
use deno_core::ModuleLoader;
use deno_core::ModuleSpecifier;
use deno_core::RuntimeOptions;
-use deno_core::ZeroCopyBuf;
+use deno_web::create_entangled_message_port;
use deno_web::BlobUrlStore;
+use deno_web::MessagePort;
use log::debug;
use std::cell::RefCell;
use std::env;
@@ -38,7 +41,6 @@ use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::task::Context;
use std::task::Poll;
-use tokio::sync::Mutex as AsyncMutex;
#[derive(
Debug, Default, Copy, Clone, PartialEq, Eq, Hash, Serialize, Deserialize,
@@ -55,29 +57,62 @@ impl WorkerId {
}
}
-type WorkerMessage = ZeroCopyBuf;
-
/// Events that are sent to host from child
/// worker.
-pub enum WorkerEvent {
- Message(WorkerMessage),
+pub enum WorkerControlEvent {
Error(AnyError),
TerminalError(AnyError),
Close,
}
+use deno_core::serde::Serializer;
+
+impl Serialize for WorkerControlEvent {
+ fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
+ where
+ S: Serializer,
+ {
+ let type_id = match &self {
+ WorkerControlEvent::TerminalError(_) => 1_i32,
+ WorkerControlEvent::Error(_) => 2_i32,
+ WorkerControlEvent::Close => 3_i32,
+ };
+
+ match self {
+ WorkerControlEvent::TerminalError(error)
+ | WorkerControlEvent::Error(error) => {
+ let value = match error.downcast_ref::<JsError>() {
+ Some(js_error) => json!({
+ "message": js_error.message,
+ "fileName": js_error.script_resource_name,
+ "lineNumber": js_error.line_number,
+ "columnNumber": js_error.start_column,
+ }),
+ None => json!({
+ "message": error.to_string(),
+ }),
+ };
+
+ Serialize::serialize(&(type_id, value), serializer)
+ }
+ _ => Serialize::serialize(&(type_id, ()), serializer),
+ }
+ }
+}
+
// Channels used for communication with worker's parent
#[derive(Clone)]
pub struct WebWorkerInternalHandle {
- sender: mpsc::Sender<WorkerEvent>,
- receiver: Rc<RefCell<mpsc::Receiver<WorkerMessage>>>,
+ sender: mpsc::Sender<WorkerControlEvent>,
+ pub port: Rc<MessagePort>,
+ pub cancel: Rc<CancelHandle>,
terminated: Arc<AtomicBool>,
isolate_handle: v8::IsolateHandle,
}
impl WebWorkerInternalHandle {
/// Post WorkerEvent to parent as a worker
- pub fn post_event(&self, event: WorkerEvent) -> Result<(), AnyError> {
+ pub fn post_event(&self, event: WorkerControlEvent) -> Result<(), AnyError> {
let mut sender = self.sender.clone();
// If the channel is closed,
// the worker must have terminated but the termination message has not yet been received.
@@ -91,13 +126,6 @@ impl WebWorkerInternalHandle {
Ok(())
}
- /// Get the WorkerEvent with lock
- /// Panic if more than one listener tries to get event
- pub async fn get_message(&self) -> Option<WorkerMessage> {
- let mut receiver = self.receiver.borrow_mut();
- receiver.next().await
- }
-
/// Check if this worker is terminated or being terminated
pub fn is_terminated(&self) -> bool {
self.terminated.load(Ordering::SeqCst)
@@ -106,6 +134,8 @@ impl WebWorkerInternalHandle {
/// Terminate the worker
/// This function will set terminated to true, terminate the isolate and close the message channel
pub fn terminate(&mut self) {
+ self.cancel.cancel();
+
// This function can be called multiple times by whomever holds
// the handle. However only a single "termination" should occur so
// we need a guard here.
@@ -121,40 +151,52 @@ impl WebWorkerInternalHandle {
}
}
-#[derive(Clone)]
-pub struct WebWorkerHandle {
- sender: mpsc::Sender<WorkerMessage>,
- receiver: Arc<AsyncMutex<mpsc::Receiver<WorkerEvent>>>,
+pub struct SendableWebWorkerHandle {
+ port: MessagePort,
+ receiver: mpsc::Receiver<WorkerControlEvent>,
terminated: Arc<AtomicBool>,
isolate_handle: v8::IsolateHandle,
}
-impl WebWorkerHandle {
- /// Post WorkerMessage to worker as a host
- pub fn post_message(&self, buf: WorkerMessage) -> Result<(), AnyError> {
- let mut sender = self.sender.clone();
- // If the channel is closed,
- // the worker must have terminated but the termination message has not yet been recieved.
- //
- // Therefore just treat it as if the worker has terminated and return.
- if sender.is_closed() {
- self.terminated.store(true, Ordering::SeqCst);
- return Ok(());
+impl From<SendableWebWorkerHandle> for WebWorkerHandle {
+ fn from(handle: SendableWebWorkerHandle) -> Self {
+ WebWorkerHandle {
+ receiver: Rc::new(RefCell::new(handle.receiver)),
+ port: Rc::new(handle.port),
+ terminated: handle.terminated,
+ isolate_handle: handle.isolate_handle,
}
- sender.try_send(buf)?;
- Ok(())
}
+}
+
+/// This is the handle to the web worker that the parent thread uses to
+/// communicate with the worker. It is created from a `SendableWebWorkerHandle`
+/// which is sent to the parent thread from the worker thread where it is
+/// created. The reason for this seperation is that the handle first needs to be
+/// `Send` when transferring between threads, and then must be `Clone` when it
+/// has arrived on the parent thread. It can not be both at once without large
+/// amounts of Arc<Mutex> and other fun stuff.
+#[derive(Clone)]
+pub struct WebWorkerHandle {
+ pub port: Rc<MessagePort>,
+ receiver: Rc<RefCell<mpsc::Receiver<WorkerControlEvent>>>,
+ terminated: Arc<AtomicBool>,
+ isolate_handle: v8::IsolateHandle,
+}
+impl WebWorkerHandle {
/// Get the WorkerEvent with lock
/// Return error if more than one listener tries to get event
- pub async fn get_event(&self) -> Result<Option<WorkerEvent>, AnyError> {
- let mut receiver = self.receiver.try_lock()?;
+ pub async fn get_control_event(
+ &self,
+ ) -> Result<Option<WorkerControlEvent>, AnyError> {
+ let mut receiver = self.receiver.borrow_mut();
Ok(receiver.next().await)
}
/// Terminate the worker
/// This function will set terminated to true, terminate the isolate and close the message channel
- pub fn terminate(&mut self) {
+ pub fn terminate(self) {
// This function can be called multiple times by whomever holds
// the handle. However only a single "termination" should occur so
// we need a guard here.
@@ -165,26 +207,26 @@ impl WebWorkerHandle {
self.isolate_handle.terminate_execution();
}
- // Wake web worker by closing the channel
- self.sender.close_channel();
+ self.port.disentangle();
}
}
fn create_handles(
isolate_handle: v8::IsolateHandle,
-) -> (WebWorkerInternalHandle, WebWorkerHandle) {
- let (in_tx, in_rx) = mpsc::channel::<WorkerMessage>(1);
- let (out_tx, out_rx) = mpsc::channel::<WorkerEvent>(1);
+) -> (WebWorkerInternalHandle, SendableWebWorkerHandle) {
+ let (parent_port, worker_port) = create_entangled_message_port();
+ let (ctrl_tx, ctrl_rx) = mpsc::channel::<WorkerControlEvent>(1);
let terminated = Arc::new(AtomicBool::new(false));
let internal_handle = WebWorkerInternalHandle {
- sender: out_tx,
- receiver: Rc::new(RefCell::new(in_rx)),
+ sender: ctrl_tx,
+ port: Rc::new(parent_port),
terminated: terminated.clone(),
isolate_handle: isolate_handle.clone(),
+ cancel: CancelHandle::new_rc(),
};
- let external_handle = WebWorkerHandle {
- sender: in_tx,
- receiver: Arc::new(AsyncMutex::new(out_rx)),
+ let external_handle = SendableWebWorkerHandle {
+ receiver: ctrl_rx,
+ port: worker_port,
terminated,
isolate_handle,
};
@@ -200,7 +242,6 @@ pub struct WebWorker {
pub js_runtime: JsRuntime,
pub name: String,
internal_handle: WebWorkerInternalHandle,
- external_handle: WebWorkerHandle,
pub use_deno_namespace: bool,
pub main_module: ModuleSpecifier,
}
@@ -237,7 +278,7 @@ impl WebWorker {
main_module: ModuleSpecifier,
worker_id: WorkerId,
options: &WebWorkerOptions,
- ) -> Self {
+ ) -> (Self, SendableWebWorkerHandle) {
// Permissions: many ops depend on this
let unstable = options.unstable;
let perm_ext = Extension::builder()
@@ -333,15 +374,17 @@ impl WebWorker {
(internal_handle, external_handle)
};
- Self {
- id: worker_id,
- js_runtime,
- name,
- internal_handle,
+ (
+ Self {
+ id: worker_id,
+ js_runtime,
+ name,
+ internal_handle,
+ use_deno_namespace: options.use_deno_namespace,
+ main_module,
+ },
external_handle,
- use_deno_namespace: options.use_deno_namespace,
- main_module,
- }
+ )
}
pub fn bootstrap(&mut self, options: &WebWorkerOptions) {
@@ -419,11 +462,6 @@ impl WebWorker {
}
}
- /// Returns a way to communicate with the Worker from other threads.
- pub fn thread_safe_handle(&self) -> WebWorkerHandle {
- self.external_handle.clone()
- }
-
pub fn poll_event_loop(
&mut self,
cx: &mut Context,
@@ -446,7 +484,7 @@ impl WebWorker {
print_worker_error(e.to_string(), &self.name);
let handle = self.internal_handle.clone();
handle
- .post_event(WorkerEvent::Error(e))
+ .post_event(WorkerControlEvent::Error(e))
.expect("Failed to post message to host");
return Poll::Pending;
@@ -513,7 +551,7 @@ pub fn run_web_worker(
if let Err(e) = result {
print_worker_error(e.to_string(), &name);
internal_handle
- .post_event(WorkerEvent::TerminalError(e))
+ .post_event(WorkerControlEvent::TerminalError(e))
.expect("Failed to post message to host");
// Failure to execute script is a terminal error, bye, bye.
@@ -524,134 +562,3 @@ pub fn run_web_worker(
debug!("Worker thread shuts down {}", &name);
result
}
-
-#[cfg(test)]
-mod tests {
- use super::*;
- use crate::tokio_util;
-
- fn create_test_web_worker() -> WebWorker {
- let main_module = deno_core::resolve_url_or_path("./hello.js").unwrap();
- let module_loader = Rc::new(deno_core::NoopModuleLoader);
- let create_web_worker_cb = Arc::new(|_| unreachable!());
-
- let options = WebWorkerOptions {
- args: vec![],
- apply_source_maps: false,
- debug_flag: false,
- unstable: false,
- ca_data: None,
- user_agent: "x".to_string(),
- seed: None,
- module_loader,
- create_web_worker_cb,
- js_error_create_fn: None,
- use_deno_namespace: false,
- maybe_inspector_server: None,
- runtime_version: "x".to_string(),
- ts_version: "x".to_string(),
- no_color: true,
- get_error_class_fn: None,
- blob_url_store: BlobUrlStore::default(),
- broadcast_channel: InMemoryBroadcastChannel::default(),
- };
-
- let mut worker = WebWorker::from_options(
- "TEST".to_string(),
- Permissions::allow_all(),
- main_module,
- WorkerId(1),
- &options,
- );
- worker.bootstrap(&options);
- worker
- }
-
- #[tokio::test]
- async fn test_worker_messages() {
- let (handle_sender, handle_receiver) =
- std::sync::mpsc::sync_channel::<WebWorkerHandle>(1);
-
- let join_handle = std::thread::spawn(move || {
- let mut worker = create_test_web_worker();
- let source = r#"
- onmessage = function(e) {
- console.log("msg from main script", e.data);
- if (e.data == "exit") {
- return close();
- } else {
- console.assert(e.data === "hi");
- }
- postMessage([1, 2, 3]);
- console.log("after postMessage");
- }
- "#;
- worker.execute_script("a", source).unwrap();
- let handle = worker.thread_safe_handle();
- handle_sender.send(handle).unwrap();
- let r = tokio_util::run_basic(worker.run_event_loop(false));
- assert!(r.is_ok())
- });
-
- let mut handle = handle_receiver.recv().unwrap();
-
- // TODO(Inteon): use Deno.core.serialize() instead of hardcoded encoded value
- let msg = vec![34, 2, 104, 105].into_boxed_slice(); // "hi" encoded
- let r = handle.post_message(msg.clone().into());
- assert!(r.is_ok());
-
- let maybe_msg = handle.get_event().await.unwrap();
- assert!(maybe_msg.is_some());
-
- let r = handle.post_message(msg.clone().into());
- assert!(r.is_ok());
-
- let maybe_msg = handle.get_event().await.unwrap();
- assert!(maybe_msg.is_some());
- match maybe_msg {
- Some(WorkerEvent::Message(buf)) => {
- // TODO(Inteon): use Deno.core.serialize() instead of hardcoded encoded value
- assert_eq!(*buf, [65, 3, 73, 2, 73, 4, 73, 6, 36, 0, 3]);
- }
- _ => unreachable!(),
- }
-
- // TODO(Inteon): use Deno.core.serialize() instead of hardcoded encoded value
- let msg = vec![34, 4, 101, 120, 105, 116].into_boxed_slice(); // "exit" encoded
- let r = handle.post_message(msg.into());
- assert!(r.is_ok());
- let event = handle.get_event().await.unwrap();
- assert!(event.is_none());
- handle.sender.close_channel();
- join_handle.join().expect("Failed to join worker thread");
- }
-
- #[tokio::test]
- async fn removed_from_resource_table_on_close() {
- let (handle_sender, handle_receiver) =
- std::sync::mpsc::sync_channel::<WebWorkerHandle>(1);
-
- let join_handle = std::thread::spawn(move || {
- let mut worker = create_test_web_worker();
- worker
- .execute_script("a", "onmessage = () => { close(); }")
- .unwrap();
- let handle = worker.thread_safe_handle();
- handle_sender.send(handle).unwrap();
- let r = tokio_util::run_basic(worker.run_event_loop(false));
- assert!(r.is_ok())
- });
-
- let mut handle = handle_receiver.recv().unwrap();
-
- // TODO(Inteon): use Deno.core.serialize() instead of hardcoded encoded value
- let msg = vec![34, 2, 104, 105].into_boxed_slice(); // "hi" encoded
- let r = handle.post_message(msg.clone().into());
- assert!(r.is_ok());
- let event = handle.get_event().await.unwrap();
- assert!(event.is_none());
- handle.sender.close_channel();
-
- join_handle.join().expect("Failed to join worker thread");
- }
-}