diff options
author | Tim Ramlot <42113979+inteon@users.noreply.github.com> | 2021-05-11 21:09:09 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2021-05-11 21:09:09 +0200 |
commit | 635253bd3a3895f49e6c9606beb852da22fee205 (patch) | |
tree | cec9d75354b4e985a376f888564ecb63c99f2643 /runtime/web_worker.rs | |
parent | 0d319161bc19a520df653bc0c8386f14a68efbdb (diff) |
feat(runtime/worker): Structured cloning worker message passing (#9323)
This commit upgrades the "Worker.postMessage()" implementation to use the
structured clone algorithm instead of non-spec-compliant JSON serialization.
Diffstat (limited to 'runtime/web_worker.rs')
-rw-r--r-- | runtime/web_worker.rs | 309 |
1 files changed, 164 insertions, 145 deletions
diff --git a/runtime/web_worker.rs b/runtime/web_worker.rs index 690b6fb58..5b731a0f5 100644 --- a/runtime/web_worker.rs +++ b/runtime/web_worker.rs @@ -13,7 +13,8 @@ use deno_core::futures::channel::mpsc; use deno_core::futures::future::poll_fn; use deno_core::futures::future::FutureExt; use deno_core::futures::stream::StreamExt; -use deno_core::futures::task::AtomicWaker; +use deno_core::serde::Deserialize; +use deno_core::serde::Serialize; use deno_core::serde_json; use deno_core::serde_json::json; use deno_core::url::Url; @@ -22,12 +23,16 @@ use deno_core::Extension; use deno_core::GetErrorClassFn; use deno_core::JsErrorCreateFn; use deno_core::JsRuntime; +use deno_core::ModuleId; use deno_core::ModuleLoader; use deno_core::ModuleSpecifier; use deno_core::RuntimeOptions; +use deno_core::ZeroCopyBuf; use deno_file::BlobUrlStore; use log::debug; +use std::cell::RefCell; use std::env; +use std::fmt; use std::rc::Rc; use std::sync::atomic::AtomicBool; use std::sync::atomic::Ordering; @@ -36,38 +41,98 @@ use std::task::Context; use std::task::Poll; use tokio::sync::Mutex as AsyncMutex; +#[derive( + Debug, Default, Copy, Clone, PartialEq, Eq, Hash, Serialize, Deserialize, +)] +pub struct WorkerId(u32); +impl fmt::Display for WorkerId { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "worker-{}", self.0) + } +} +impl WorkerId { + pub fn next(&self) -> Option<WorkerId> { + self.0.checked_add(1).map(WorkerId) + } +} + +type WorkerMessage = ZeroCopyBuf; + /// Events that are sent to host from child /// worker. 
pub enum WorkerEvent { - Message(Box<[u8]>), + Message(WorkerMessage), Error(AnyError), TerminalError(AnyError), + Close, } -pub struct WorkerChannelsInternal { - pub sender: mpsc::Sender<WorkerEvent>, - pub receiver: mpsc::Receiver<Box<[u8]>>, +// Channels used for communication with worker's parent +#[derive(Clone)] +pub struct WebWorkerInternalHandle { + sender: mpsc::Sender<WorkerEvent>, + receiver: Rc<RefCell<mpsc::Receiver<WorkerMessage>>>, + terminated: Arc<AtomicBool>, + isolate_handle: v8::IsolateHandle, +} + +impl WebWorkerInternalHandle { + /// Post WorkerEvent to parent as a worker + pub fn post_event(&self, event: WorkerEvent) -> Result<(), AnyError> { + let mut sender = self.sender.clone(); + // If the channel is closed, + // the worker must have terminated but the termination message has not yet been received. + // + // Therefore just treat it as if the worker has terminated and return. + if sender.is_closed() { + self.terminated.store(true, Ordering::SeqCst); + return Ok(()); + } + sender.try_send(event)?; + Ok(()) + } + + /// Get the WorkerEvent with lock + /// Panic if more than one listener tries to get event + pub async fn get_message(&self) -> Option<WorkerMessage> { + let mut receiver = self.receiver.borrow_mut(); + receiver.next().await + } + + /// Check if this worker is terminated or being terminated + pub fn is_terminated(&self) -> bool { + self.terminated.load(Ordering::SeqCst) + } + + /// Terminate the worker + /// This function will set terminated to true, terminate the isolate and close the message channel + pub fn terminate(&mut self) { + // This function can be called multiple times by whomever holds + // the handle. However only a single "termination" should occur so + // we need a guard here. 
+ let already_terminated = self.terminated.swap(true, Ordering::SeqCst); + + if !already_terminated { + // Stop javascript execution + self.isolate_handle.terminate_execution(); + } + + // Wake parent by closing the channel + self.sender.close_channel(); + } } -/// Wrapper for `WorkerHandle` that adds functionality -/// for terminating workers. -/// -/// This struct is used by host as well as worker itself. -/// -/// Host uses it to communicate with worker and terminate it, -/// while worker uses it only to finish execution on `self.close()`. #[derive(Clone)] pub struct WebWorkerHandle { - pub sender: mpsc::Sender<Box<[u8]>>, - pub receiver: Arc<AsyncMutex<mpsc::Receiver<WorkerEvent>>>, - terminate_tx: mpsc::Sender<()>, + sender: mpsc::Sender<WorkerMessage>, + receiver: Arc<AsyncMutex<mpsc::Receiver<WorkerEvent>>>, terminated: Arc<AtomicBool>, isolate_handle: v8::IsolateHandle, } impl WebWorkerHandle { - /// Post message to worker as a host. - pub fn post_message(&self, buf: Box<[u8]>) -> Result<(), AnyError> { + /// Post WorkerMessage to worker as a host + pub fn post_message(&self, buf: WorkerMessage) -> Result<(), AnyError> { let mut sender = self.sender.clone(); // If the channel is closed, // the worker must have terminated but the termination message has not yet been recieved. @@ -81,47 +146,50 @@ impl WebWorkerHandle { Ok(()) } - /// Get the event with lock. + /// Get the WorkerEvent with lock /// Return error if more than one listener tries to get event pub async fn get_event(&self) -> Result<Option<WorkerEvent>, AnyError> { let mut receiver = self.receiver.try_lock()?; Ok(receiver.next().await) } - pub fn terminate(&self) { + /// Terminate the worker + /// This function will set terminated to true, terminate the isolate and close the message channel + pub fn terminate(&mut self) { // This function can be called multiple times by whomever holds // the handle. However only a single "termination" should occur so // we need a guard here. 
let already_terminated = self.terminated.swap(true, Ordering::SeqCst); if !already_terminated { + // Stop javascript execution self.isolate_handle.terminate_execution(); - let mut sender = self.terminate_tx.clone(); - // This call should be infallible hence the `expect`. - // This might change in the future. - sender.try_send(()).expect("Failed to terminate"); } + + // Wake web worker by closing the channel + self.sender.close_channel(); } } -fn create_channels( +fn create_handles( isolate_handle: v8::IsolateHandle, - terminate_tx: mpsc::Sender<()>, -) -> (WorkerChannelsInternal, WebWorkerHandle) { - let (in_tx, in_rx) = mpsc::channel::<Box<[u8]>>(1); +) -> (WebWorkerInternalHandle, WebWorkerHandle) { + let (in_tx, in_rx) = mpsc::channel::<WorkerMessage>(1); let (out_tx, out_rx) = mpsc::channel::<WorkerEvent>(1); - let internal_channels = WorkerChannelsInternal { + let terminated = Arc::new(AtomicBool::new(false)); + let internal_handle = WebWorkerInternalHandle { sender: out_tx, - receiver: in_rx, + receiver: Rc::new(RefCell::new(in_rx)), + terminated: terminated.clone(), + isolate_handle: isolate_handle.clone(), }; - let external_channels = WebWorkerHandle { + let external_handle = WebWorkerHandle { sender: in_tx, receiver: Arc::new(AsyncMutex::new(out_rx)), - terminated: Arc::new(AtomicBool::new(false)), - terminate_tx, + terminated, isolate_handle, }; - (internal_channels, external_channels) + (internal_handle, external_handle) } /// This struct is an implementation of `Worker` Web API @@ -129,17 +197,12 @@ fn create_channels( /// Each `WebWorker` is either a child of `MainWorker` or other /// `WebWorker`. pub struct WebWorker { - id: u32, + id: WorkerId, inspector: Option<Box<DenoInspector>>, - // Following fields are pub because they are accessed - // when creating a new WebWorker instance. 
- pub(crate) internal_channels: WorkerChannelsInternal, pub js_runtime: JsRuntime, pub name: String, - waker: AtomicWaker, - event_loop_idle: bool, - terminate_rx: mpsc::Receiver<()>, - handle: WebWorkerHandle, + internal_handle: WebWorkerInternalHandle, + external_handle: WebWorkerHandle, pub use_deno_namespace: bool, pub main_module: ModuleSpecifier, } @@ -174,7 +237,7 @@ impl WebWorker { name: String, permissions: Permissions, main_module: ModuleSpecifier, - worker_id: u32, + worker_id: WorkerId, options: &WebWorkerOptions, ) -> Self { // Permissions: many ops depend on this @@ -218,7 +281,7 @@ impl WebWorker { let runtime_exts = vec![ ops::web_worker::init(), ops::runtime::init(main_module.clone()), - ops::worker_host::init(false, options.create_web_worker_cb.clone()), + ops::worker_host::init(options.create_web_worker_cb.clone()), ops::io::init(), ]; @@ -264,38 +327,24 @@ impl WebWorker { None }; - let (terminate_tx, terminate_rx) = mpsc::channel::<()>(1); - let isolate_handle = js_runtime.v8_isolate().thread_safe_handle(); - let (internal_channels, handle) = - create_channels(isolate_handle, terminate_tx); + let (internal_handle, external_handle) = { + let handle = js_runtime.v8_isolate().thread_safe_handle(); + let (internal_handle, external_handle) = create_handles(handle); + let op_state = js_runtime.op_state(); + let mut op_state = op_state.borrow_mut(); + op_state.put(internal_handle.clone()); + (internal_handle, external_handle) + }; - let mut worker = Self { + Self { id: worker_id, inspector, - internal_channels, js_runtime, name, - waker: AtomicWaker::new(), - event_loop_idle: false, - terminate_rx, - handle, + internal_handle, + external_handle, use_deno_namespace: options.use_deno_namespace, main_module, - }; - - // Setup worker-dependant OpState and return worker - { - let handle = worker.thread_safe_handle(); - let sender = worker.internal_channels.sender.clone(); - let js_runtime = &mut worker.js_runtime; - let op_state = js_runtime.op_state(); - 
let mut op_state = op_state.borrow_mut(); - - // Required by runtime::ops::worker_host/web_worker - op_state.put(handle); - op_state.put(sender); - - worker } } @@ -321,7 +370,7 @@ impl WebWorker { // Instead of using name for log we use `worker-${id}` because // WebWorkers can have empty string as name. let script = format!( - "bootstrap.workerRuntime({}, \"{}\", {}, \"worker-{}\")", + "bootstrap.workerRuntime({}, \"{}\", {}, \"{}\")", runtime_options_str, self.name, options.use_deno_namespace, self.id ); self @@ -338,12 +387,20 @@ impl WebWorker { self.js_runtime.execute(url.as_str(), js_source) } + /// Loads and instantiates specified JavaScript module. + pub async fn preload_module( + &mut self, + module_specifier: &ModuleSpecifier, + ) -> Result<ModuleId, AnyError> { + self.js_runtime.load_module(module_specifier, None).await + } + /// Loads, instantiates and executes specified JavaScript module. pub async fn execute_module( &mut self, module_specifier: &ModuleSpecifier, ) -> Result<(), AnyError> { - let id = self.js_runtime.load_module(module_specifier, None).await?; + let id = self.preload_module(module_specifier).await?; let mut receiver = self.js_runtime.mod_evaluate(id); tokio::select! { @@ -357,8 +414,8 @@ impl WebWorker { } event_loop_result = self.run_event_loop() => { - if self.has_been_terminated() { - return Ok(()); + if self.internal_handle.is_terminated() { + return Ok(()); } event_loop_result?; let maybe_result = receiver.next().await; @@ -370,82 +427,44 @@ impl WebWorker { /// Returns a way to communicate with the Worker from other threads. 
pub fn thread_safe_handle(&self) -> WebWorkerHandle { - self.handle.clone() - } - - pub fn has_been_terminated(&self) -> bool { - self.handle.terminated.load(Ordering::SeqCst) + self.external_handle.clone() } pub fn poll_event_loop( &mut self, cx: &mut Context, ) -> Poll<Result<(), AnyError>> { - if self.has_been_terminated() { + // If awakened because we are terminating, just return Ok + if self.internal_handle.is_terminated() { return Poll::Ready(Ok(())); } - if !self.event_loop_idle { - let poll_result = { - // We always poll the inspector if it exists. - let _ = self.inspector.as_mut().map(|i| i.poll_unpin(cx)); - self.waker.register(cx.waker()); - self.js_runtime.poll_event_loop(cx) - }; - - if let Poll::Ready(r) = poll_result { - if self.has_been_terminated() { + // We always poll the inspector if it exists. + let _ = self.inspector.as_mut().map(|i| i.poll_unpin(cx)); + match self.js_runtime.poll_event_loop(cx) { + Poll::Ready(r) => { + // If js ended because we are terminating, just return Ok + if self.internal_handle.is_terminated() { return Poll::Ready(Ok(())); } + // In case of an error, pass to parent without terminating worker if let Err(e) = r { print_worker_error(e.to_string(), &self.name); - let mut sender = self.internal_channels.sender.clone(); - sender - .try_send(WorkerEvent::Error(e)) + let handle = self.internal_handle.clone(); + handle + .post_event(WorkerEvent::Error(e)) .expect("Failed to post message to host"); - } - self.event_loop_idle = true; - } - } - - if let Poll::Ready(r) = self.terminate_rx.poll_next_unpin(cx) { - // terminate_rx should never be closed - assert!(r.is_some()); - return Poll::Ready(Ok(())); - } - let maybe_msg_poll_result = - self.internal_channels.receiver.poll_next_unpin(cx); - - if let Poll::Ready(maybe_msg) = maybe_msg_poll_result { - let msg = - maybe_msg.expect("Received `None` instead of message in worker"); - let msg = String::from_utf8(msg.to_vec()).unwrap(); - let script = 
format!("workerMessageRecvCallback({})", msg); - - // TODO(bartlomieju): set proper script name like "deno:runtime/web_worker.js" - // so it's dimmed in stack trace instead of using "__anonymous__" - if let Err(e) = self.execute(&script) { - // If execution was terminated during message callback then - // just ignore it - if self.has_been_terminated() { - return Poll::Ready(Ok(())); + return Poll::Pending; } - // Otherwise forward error to host - let mut sender = self.internal_channels.sender.clone(); - sender - .try_send(WorkerEvent::Error(e)) - .expect("Failed to post message to host"); + panic!( + "coding error: either js is polling or the worker is terminated" + ); } - - // Let event loop be polled again - self.event_loop_idle = false; - self.waker.wake(); + Poll::Pending => Poll::Pending, } - - Poll::Pending } pub async fn run_event_loop(&mut self) -> Result<(), AnyError> { @@ -495,18 +514,18 @@ pub fn run_web_worker( rt.block_on(load_future) }; - let mut sender = worker.internal_channels.sender.clone(); + let internal_handle = worker.internal_handle.clone(); // If sender is closed it means that worker has already been closed from // within using "globalThis.close()" - if sender.is_closed() { + if internal_handle.is_terminated() { return Ok(()); } if let Err(e) = result { print_worker_error(e.to_string(), &name); - sender - .try_send(WorkerEvent::TerminalError(e)) + internal_handle + .post_event(WorkerEvent::TerminalError(e)) .expect("Failed to post message to host"); // Failure to execute script is a terminal error, bye, bye. 
@@ -522,7 +541,6 @@ pub fn run_web_worker( mod tests { use super::*; use crate::tokio_util; - use deno_core::serde_json::json; fn create_test_web_worker() -> WebWorker { let main_module = deno_core::resolve_url_or_path("./hello.js").unwrap(); @@ -554,7 +572,7 @@ mod tests { "TEST".to_string(), Permissions::allow_all(), main_module, - 1, + WorkerId(1), &options, ); worker.bootstrap(&options); @@ -589,30 +607,30 @@ mod tests { let mut handle = handle_receiver.recv().unwrap(); - let msg = json!("hi").to_string().into_boxed_str().into_boxed_bytes(); - let r = handle.post_message(msg.clone()); + // TODO(Inteon): use Deno.core.serialize() instead of hardcoded encoded value + let msg = vec![34, 2, 104, 105].into_boxed_slice(); // "hi" encoded + let r = handle.post_message(msg.clone().into()); assert!(r.is_ok()); let maybe_msg = handle.get_event().await.unwrap(); assert!(maybe_msg.is_some()); - let r = handle.post_message(msg.clone()); + let r = handle.post_message(msg.clone().into()); assert!(r.is_ok()); let maybe_msg = handle.get_event().await.unwrap(); assert!(maybe_msg.is_some()); match maybe_msg { Some(WorkerEvent::Message(buf)) => { - assert_eq!(*buf, *b"[1,2,3]"); + // TODO(Inteon): use Deno.core.serialize() instead of hardcoded encoded value + assert_eq!(*buf, [65, 3, 73, 2, 73, 4, 73, 6, 36, 0, 3]); } _ => unreachable!(), } - let msg = json!("exit") - .to_string() - .into_boxed_str() - .into_boxed_bytes(); - let r = handle.post_message(msg); + // TODO(Inteon): use Deno.core.serialize() instead of hardcoded encoded value + let msg = vec![34, 4, 101, 120, 105, 116].into_boxed_slice(); // "exit" encoded + let r = handle.post_message(msg.into()); assert!(r.is_ok()); let event = handle.get_event().await.unwrap(); assert!(event.is_none()); @@ -636,8 +654,9 @@ mod tests { let mut handle = handle_receiver.recv().unwrap(); - let msg = json!("hi").to_string().into_boxed_str().into_boxed_bytes(); - let r = handle.post_message(msg.clone()); + // TODO(Inteon): use 
Deno.core.serialize() instead of hardcoded encoded value + let msg = vec![34, 2, 104, 105].into_boxed_slice(); // "hi" encoded + let r = handle.post_message(msg.clone().into()); assert!(r.is_ok()); let event = handle.get_event().await.unwrap(); assert!(event.is_none()); |