diff options
Diffstat (limited to 'runtime/web_worker.rs')
-rw-r--r-- | runtime/web_worker.rs | 295 |
1 files changed, 101 insertions, 194 deletions
diff --git a/runtime/web_worker.rs b/runtime/web_worker.rs index 753238052..a3a062221 100644 --- a/runtime/web_worker.rs +++ b/runtime/web_worker.rs @@ -8,6 +8,7 @@ use crate::permissions::Permissions; use crate::tokio_util::create_basic_runtime; use deno_broadcast_channel::InMemoryBroadcastChannel; use deno_core::error::AnyError; +use deno_core::error::JsError; use deno_core::futures::channel::mpsc; use deno_core::futures::future::poll_fn; use deno_core::futures::future::FutureExt; @@ -18,6 +19,7 @@ use deno_core::serde::Serialize; use deno_core::serde_json; use deno_core::serde_json::json; use deno_core::v8; +use deno_core::CancelHandle; use deno_core::Extension; use deno_core::GetErrorClassFn; use deno_core::JsErrorCreateFn; @@ -26,8 +28,9 @@ use deno_core::ModuleId; use deno_core::ModuleLoader; use deno_core::ModuleSpecifier; use deno_core::RuntimeOptions; -use deno_core::ZeroCopyBuf; +use deno_web::create_entangled_message_port; use deno_web::BlobUrlStore; +use deno_web::MessagePort; use log::debug; use std::cell::RefCell; use std::env; @@ -38,7 +41,6 @@ use std::sync::atomic::Ordering; use std::sync::Arc; use std::task::Context; use std::task::Poll; -use tokio::sync::Mutex as AsyncMutex; #[derive( Debug, Default, Copy, Clone, PartialEq, Eq, Hash, Serialize, Deserialize, @@ -55,29 +57,62 @@ impl WorkerId { } } -type WorkerMessage = ZeroCopyBuf; - /// Events that are sent to host from child /// worker. -pub enum WorkerEvent { - Message(WorkerMessage), +pub enum WorkerControlEvent { Error(AnyError), TerminalError(AnyError), Close, } +use deno_core::serde::Serializer; + +impl Serialize for WorkerControlEvent { + fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error> + where + S: Serializer, + { + let type_id = match &self { + WorkerControlEvent::TerminalError(_) => 1_i32, + WorkerControlEvent::Error(_) => 2_i32, + WorkerControlEvent::Close => 3_i32, + }; + + match self { + WorkerControlEvent::TerminalError(error) + | WorkerControlEvent::Error(error) => { + let value = match error.downcast_ref::<JsError>() { + Some(js_error) => json!({ + "message": js_error.message, + "fileName": js_error.script_resource_name, + "lineNumber": js_error.line_number, + "columnNumber": js_error.start_column, + }), + None => json!({ + "message": error.to_string(), + }), + }; + + Serialize::serialize(&(type_id, value), serializer) + } + _ => Serialize::serialize(&(type_id, ()), serializer), + } + } +} + // Channels used for communication with worker's parent #[derive(Clone)] pub struct WebWorkerInternalHandle { - sender: mpsc::Sender<WorkerEvent>, - receiver: Rc<RefCell<mpsc::Receiver<WorkerMessage>>>, + sender: mpsc::Sender<WorkerControlEvent>, + pub port: Rc<MessagePort>, + pub cancel: Rc<CancelHandle>, terminated: Arc<AtomicBool>, isolate_handle: v8::IsolateHandle, } impl WebWorkerInternalHandle { /// Post WorkerEvent to parent as a worker - pub fn post_event(&self, event: WorkerEvent) -> Result<(), AnyError> { + pub fn post_event(&self, event: WorkerControlEvent) -> Result<(), AnyError> { let mut sender = self.sender.clone(); // If the channel is closed, // the worker must have terminated but the termination message has not yet been received. @@ -91,13 +126,6 @@ impl WebWorkerInternalHandle { Ok(()) } - /// Get the WorkerEvent with lock - /// Panic if more than one listener tries to get event - pub async fn get_message(&self) -> Option<WorkerMessage> { - let mut receiver = self.receiver.borrow_mut(); - receiver.next().await - } - /// Check if this worker is terminated or being terminated pub fn is_terminated(&self) -> bool { self.terminated.load(Ordering::SeqCst) @@ -106,6 +134,8 @@ impl WebWorkerInternalHandle { /// Terminate the worker /// This function will set terminated to true, terminate the isolate and close the message channel pub fn terminate(&mut self) { + self.cancel.cancel(); + // This function can be called multiple times by whomever holds // the handle. However only a single "termination" should occur so // we need a guard here. @@ -121,40 +151,52 @@ impl WebWorkerInternalHandle { } } -#[derive(Clone)] -pub struct WebWorkerHandle { - sender: mpsc::Sender<WorkerMessage>, - receiver: Arc<AsyncMutex<mpsc::Receiver<WorkerEvent>>>, +pub struct SendableWebWorkerHandle { + port: MessagePort, + receiver: mpsc::Receiver<WorkerControlEvent>, terminated: Arc<AtomicBool>, isolate_handle: v8::IsolateHandle, } -impl WebWorkerHandle { - /// Post WorkerMessage to worker as a host - pub fn post_message(&self, buf: WorkerMessage) -> Result<(), AnyError> { - let mut sender = self.sender.clone(); - // If the channel is closed, - // the worker must have terminated but the termination message has not yet been recieved. - // - // Therefore just treat it as if the worker has terminated and return. - if sender.is_closed() { - self.terminated.store(true, Ordering::SeqCst); - return Ok(()); +impl From<SendableWebWorkerHandle> for WebWorkerHandle { + fn from(handle: SendableWebWorkerHandle) -> Self { + WebWorkerHandle { + receiver: Rc::new(RefCell::new(handle.receiver)), + port: Rc::new(handle.port), + terminated: handle.terminated, + isolate_handle: handle.isolate_handle, } - sender.try_send(buf)?; - Ok(()) } +} + +/// This is the handle to the web worker that the parent thread uses to +/// communicate with the worker. It is created from a `SendableWebWorkerHandle` +/// which is sent to the parent thread from the worker thread where it is +/// created. The reason for this seperation is that the handle first needs to be +/// `Send` when transferring between threads, and then must be `Clone` when it +/// has arrived on the parent thread. It can not be both at once without large +/// amounts of Arc<Mutex> and other fun stuff. +#[derive(Clone)] +pub struct WebWorkerHandle { + pub port: Rc<MessagePort>, + receiver: Rc<RefCell<mpsc::Receiver<WorkerControlEvent>>>, + terminated: Arc<AtomicBool>, + isolate_handle: v8::IsolateHandle, +} +impl WebWorkerHandle { /// Get the WorkerEvent with lock /// Return error if more than one listener tries to get event - pub async fn get_event(&self) -> Result<Option<WorkerEvent>, AnyError> { - let mut receiver = self.receiver.try_lock()?; + pub async fn get_control_event( + &self, + ) -> Result<Option<WorkerControlEvent>, AnyError> { + let mut receiver = self.receiver.borrow_mut(); Ok(receiver.next().await) } /// Terminate the worker /// This function will set terminated to true, terminate the isolate and close the message channel - pub fn terminate(&mut self) { + pub fn terminate(self) { // This function can be called multiple times by whomever holds // the handle. However only a single "termination" should occur so // we need a guard here. @@ -165,26 +207,26 @@ impl WebWorkerHandle { self.isolate_handle.terminate_execution(); } - // Wake web worker by closing the channel - self.sender.close_channel(); + self.port.disentangle(); } } fn create_handles( isolate_handle: v8::IsolateHandle, -) -> (WebWorkerInternalHandle, WebWorkerHandle) { - let (in_tx, in_rx) = mpsc::channel::<WorkerMessage>(1); - let (out_tx, out_rx) = mpsc::channel::<WorkerEvent>(1); +) -> (WebWorkerInternalHandle, SendableWebWorkerHandle) { + let (parent_port, worker_port) = create_entangled_message_port(); + let (ctrl_tx, ctrl_rx) = mpsc::channel::<WorkerControlEvent>(1); let terminated = Arc::new(AtomicBool::new(false)); let internal_handle = WebWorkerInternalHandle { - sender: out_tx, - receiver: Rc::new(RefCell::new(in_rx)), + sender: ctrl_tx, + port: Rc::new(parent_port), terminated: terminated.clone(), isolate_handle: isolate_handle.clone(), + cancel: CancelHandle::new_rc(), }; - let external_handle = WebWorkerHandle { - sender: in_tx, - receiver: Arc::new(AsyncMutex::new(out_rx)), + let external_handle = SendableWebWorkerHandle { + receiver: ctrl_rx, + port: worker_port, terminated, isolate_handle, }; @@ -200,7 +242,6 @@ pub struct WebWorker { pub js_runtime: JsRuntime, pub name: String, internal_handle: WebWorkerInternalHandle, - external_handle: WebWorkerHandle, pub use_deno_namespace: bool, pub main_module: ModuleSpecifier, } @@ -237,7 +278,7 @@ impl WebWorker { main_module: ModuleSpecifier, worker_id: WorkerId, options: &WebWorkerOptions, - ) -> Self { + ) -> (Self, SendableWebWorkerHandle) { // Permissions: many ops depend on this let unstable = options.unstable; let perm_ext = Extension::builder() @@ -333,15 +374,17 @@ impl WebWorker { (internal_handle, external_handle) }; - Self { - id: worker_id, - js_runtime, - name, - internal_handle, + ( + Self { + id: worker_id, + js_runtime, + name, + internal_handle, + use_deno_namespace: options.use_deno_namespace, + main_module, + }, external_handle, - use_deno_namespace: options.use_deno_namespace, - main_module, - } + ) } pub fn bootstrap(&mut self, options: &WebWorkerOptions) { @@ -419,11 +462,6 @@ impl WebWorker { } } - /// Returns a way to communicate with the Worker from other threads. - pub fn thread_safe_handle(&self) -> WebWorkerHandle { - self.external_handle.clone() - } - pub fn poll_event_loop( &mut self, cx: &mut Context, @@ -446,7 +484,7 @@ impl WebWorker { print_worker_error(e.to_string(), &self.name); let handle = self.internal_handle.clone(); handle - .post_event(WorkerEvent::Error(e)) + .post_event(WorkerControlEvent::Error(e)) .expect("Failed to post message to host"); return Poll::Pending; @@ -513,7 +551,7 @@ pub fn run_web_worker( if let Err(e) = result { print_worker_error(e.to_string(), &name); internal_handle - .post_event(WorkerEvent::TerminalError(e)) + .post_event(WorkerControlEvent::TerminalError(e)) .expect("Failed to post message to host"); // Failure to execute script is a terminal error, bye, bye. @@ -524,134 +562,3 @@ pub fn run_web_worker( debug!("Worker thread shuts down {}", &name); result } - -#[cfg(test)] -mod tests { - use super::*; - use crate::tokio_util; - - fn create_test_web_worker() -> WebWorker { - let main_module = deno_core::resolve_url_or_path("./hello.js").unwrap(); - let module_loader = Rc::new(deno_core::NoopModuleLoader); - let create_web_worker_cb = Arc::new(|_| unreachable!()); - - let options = WebWorkerOptions { - args: vec![], - apply_source_maps: false, - debug_flag: false, - unstable: false, - ca_data: None, - user_agent: "x".to_string(), - seed: None, - module_loader, - create_web_worker_cb, - js_error_create_fn: None, - use_deno_namespace: false, - maybe_inspector_server: None, - runtime_version: "x".to_string(), - ts_version: "x".to_string(), - no_color: true, - get_error_class_fn: None, - blob_url_store: BlobUrlStore::default(), - broadcast_channel: InMemoryBroadcastChannel::default(), - }; - - let mut worker = WebWorker::from_options( - "TEST".to_string(), - Permissions::allow_all(), - main_module, - WorkerId(1), - &options, - ); - worker.bootstrap(&options); - worker - } - - #[tokio::test] - async fn test_worker_messages() { - let (handle_sender, handle_receiver) = - std::sync::mpsc::sync_channel::<WebWorkerHandle>(1); - - let join_handle = std::thread::spawn(move || { - let mut worker = create_test_web_worker(); - let source = r#" - onmessage = function(e) { - console.log("msg from main script", e.data); - if (e.data == "exit") { - return close(); - } else { - console.assert(e.data === "hi"); - } - postMessage([1, 2, 3]); - console.log("after postMessage"); - } - "#; - worker.execute_script("a", source).unwrap(); - let handle = worker.thread_safe_handle(); - handle_sender.send(handle).unwrap(); - let r = tokio_util::run_basic(worker.run_event_loop(false)); - assert!(r.is_ok()) - }); - - let mut handle = handle_receiver.recv().unwrap(); - - // TODO(Inteon): use Deno.core.serialize() instead of hardcoded encoded value - let msg = vec![34, 2, 104, 105].into_boxed_slice(); // "hi" encoded - let r = handle.post_message(msg.clone().into()); - assert!(r.is_ok()); - - let maybe_msg = handle.get_event().await.unwrap(); - assert!(maybe_msg.is_some()); - - let r = handle.post_message(msg.clone().into()); - assert!(r.is_ok()); - - let maybe_msg = handle.get_event().await.unwrap(); - assert!(maybe_msg.is_some()); - match maybe_msg { - Some(WorkerEvent::Message(buf)) => { - // TODO(Inteon): use Deno.core.serialize() instead of hardcoded encoded value - assert_eq!(*buf, [65, 3, 73, 2, 73, 4, 73, 6, 36, 0, 3]); - } - _ => unreachable!(), - } - - // TODO(Inteon): use Deno.core.serialize() instead of hardcoded encoded value - let msg = vec![34, 4, 101, 120, 105, 116].into_boxed_slice(); // "exit" encoded - let r = handle.post_message(msg.into()); - assert!(r.is_ok()); - let event = handle.get_event().await.unwrap(); - assert!(event.is_none()); - handle.sender.close_channel(); - join_handle.join().expect("Failed to join worker thread"); - } - - #[tokio::test] - async fn removed_from_resource_table_on_close() { - let (handle_sender, handle_receiver) = - std::sync::mpsc::sync_channel::<WebWorkerHandle>(1); - - let join_handle = std::thread::spawn(move || { - let mut worker = create_test_web_worker(); - worker - .execute_script("a", "onmessage = () => { close(); }") - .unwrap(); - let handle = worker.thread_safe_handle(); - handle_sender.send(handle).unwrap(); - let r = tokio_util::run_basic(worker.run_event_loop(false)); - assert!(r.is_ok()) - }); - - let mut handle = handle_receiver.recv().unwrap(); - - // TODO(Inteon): use Deno.core.serialize() instead of hardcoded encoded value - let msg = vec![34, 2, 104, 105].into_boxed_slice(); // "hi" encoded - let r = handle.post_message(msg.clone().into()); - assert!(r.is_ok()); - let event = handle.get_event().await.unwrap(); - assert!(event.is_none()); - handle.sender.close_channel(); - - join_handle.join().expect("Failed to join worker thread"); - } -} |