diff options
Diffstat (limited to 'cli/web_worker.rs')
-rw-r--r-- | cli/web_worker.rs | 193 |
1 files changed, 150 insertions, 43 deletions
diff --git a/cli/web_worker.rs b/cli/web_worker.rs index 683ba9ef4..795409175 100644 --- a/cli/web_worker.rs +++ b/cli/web_worker.rs @@ -3,17 +3,68 @@ use crate::ops; use crate::state::State; use crate::worker::Worker; use crate::worker::WorkerEvent; +use crate::worker::WorkerHandle; +use deno_core::v8; use deno_core::ErrBox; use deno_core::StartupData; +use futures::channel::mpsc; use futures::future::FutureExt; use futures::stream::StreamExt; -use futures::SinkExt; use std::future::Future; use std::ops::Deref; use std::ops::DerefMut; use std::pin::Pin; +use std::sync::atomic::AtomicBool; +use std::sync::atomic::Ordering; +use std::sync::Arc; use std::task::Context; use std::task::Poll; + +/// Wrapper for `WorkerHandle` that adds functionality +/// for terminating workers. +/// +/// This struct is used by host as well as worker itself. +/// +/// Host uses it to communicate with worker and terminate it, +/// while worker uses it only to finish execution on `self.close()`. +#[derive(Clone)] +pub struct WebWorkerHandle { + worker_handle: WorkerHandle, + terminate_tx: mpsc::Sender<()>, + terminated: Arc<AtomicBool>, + isolate_handle: v8::IsolateHandle, +} + +impl Deref for WebWorkerHandle { + type Target = WorkerHandle; + fn deref(&self) -> &Self::Target { + &self.worker_handle + } +} + +impl DerefMut for WebWorkerHandle { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.worker_handle + } +} + +impl WebWorkerHandle { + pub fn terminate(&self) { + // This function can be called multiple times by whomever holds + // the handle. However only a single "termination" should occur so + // we need a guard here. + let already_terminated = self.terminated.swap(true, Ordering::Relaxed); + + if !already_terminated { + self.isolate_handle.terminate_execution(); + let mut sender = self.terminate_tx.clone(); + // This call should be infallible hence the `expect`. + // This might change in the future. + sender.try_send(()).expect("Failed to terminate"); + } + } +} + /// This worker is implementation of `Worker` Web API /// /// At the moment this type of worker supports only @@ -23,17 +74,50 @@ use std::task::Poll; /// `WebWorker`. pub struct WebWorker { worker: Worker, - is_ready: bool, + event_loop_idle: bool, + terminate_rx: mpsc::Receiver<()>, + handle: WebWorkerHandle, } impl WebWorker { pub fn new(name: String, startup_data: StartupData, state: State) -> Self { let state_ = state.clone(); let mut worker = Worker::new(name, startup_data, state_); + + let terminated = Arc::new(AtomicBool::new(false)); + let isolate_handle = worker + .isolate + .v8_isolate + .as_mut() + .unwrap() + .thread_safe_handle(); + let (terminate_tx, terminate_rx) = mpsc::channel::<()>(1); + + let handle = WebWorkerHandle { + worker_handle: worker.thread_safe_handle(), + terminated, + isolate_handle, + terminate_tx, + }; + + let mut web_worker = Self { + worker, + event_loop_idle: false, + terminate_rx, + handle, + }; + + let handle = web_worker.thread_safe_handle(); + { - let isolate = &mut worker.isolate; + let isolate = &mut web_worker.worker.isolate; ops::runtime::init(isolate, &state); - ops::web_worker::init(isolate, &state, &worker.internal_channels.sender); + ops::web_worker::init( + isolate, + &state, + &web_worker.worker.internal_channels.sender, + handle, + ); ops::worker_host::init(isolate, &state); ops::io::init(isolate, &state); ops::resources::init(isolate, &state); @@ -42,10 +126,14 @@ impl WebWorker { ops::fetch::init(isolate, &state); } - Self { - worker, - is_ready: false, - } + web_worker + } +} + +impl WebWorker { + /// Returns a way to communicate with the Worker from other threads. + pub fn thread_safe_handle(&self) -> WebWorkerHandle { + self.handle.clone() } } @@ -69,47 +157,67 @@ impl Future for WebWorker { let inner = self.get_mut(); let worker = &mut inner.worker; - if !inner.is_ready { + let terminated = inner.handle.terminated.load(Ordering::Relaxed); + + if terminated { + return Poll::Ready(Ok(())); + } + + if !inner.event_loop_idle { match worker.poll_unpin(cx) { Poll::Ready(r) => { + let terminated = inner.handle.terminated.load(Ordering::Relaxed); + if terminated { + return Poll::Ready(Ok(())); + } + if let Err(e) = r { let mut sender = worker.internal_channels.sender.clone(); - futures::executor::block_on(sender.send(WorkerEvent::Error(e))) + sender + .try_send(WorkerEvent::Error(e)) .expect("Failed to post message to host"); } - inner.is_ready = true; + inner.event_loop_idle = true; } Poll::Pending => {} } } - let maybe_msg = { - match worker.internal_channels.receiver.poll_next_unpin(cx) { - Poll::Ready(r) => match r { - Some(msg) => { - let msg_str = String::from_utf8(msg.to_vec()).unwrap(); - debug!("received message from host: {}", msg_str); - Some(msg_str) - } - None => { - debug!("channel closed by host, worker event loop shuts down"); - return Poll::Ready(Ok(())); + if let Poll::Ready(r) = inner.terminate_rx.poll_next_unpin(cx) { + // terminate_rx should never be closed + assert!(r.is_some()); + return Poll::Ready(Ok(())); + } + + if let Poll::Ready(r) = + worker.internal_channels.receiver.poll_next_unpin(cx) + { + match r { + Some(msg) => { + let msg = String::from_utf8(msg.to_vec()).unwrap(); + debug!("received message from host: {}", msg); + let script = format!("workerMessageRecvCallback({})", msg); + + if let Err(e) = worker.execute(&script) { + // If execution was terminated during message callback then + // just ignore it + if inner.handle.terminated.load(Ordering::Relaxed) { + return Poll::Ready(Ok(())); + } + + // Otherwise forward error to host + let mut sender = worker.internal_channels.sender.clone(); + sender + .try_send(WorkerEvent::Error(e)) + .expect("Failed to post message to host"); } - }, - Poll::Pending => None, - } - }; - if let Some(msg) = maybe_msg { - // TODO: just add second value and then bind using rusty_v8 - // to get structured clone/transfer working - let script = format!("workerMessageRecvCallback({})", msg); - worker - .execute(&script) - .expect("Failed to execute message cb"); - // Let worker be polled again - inner.is_ready = false; - worker.waker.wake(); + // Let event loop be polled again + inner.event_loop_idle = false; + worker.waker.wake(); + } + None => unreachable!(), + } } Poll::Pending @@ -123,7 +231,6 @@ mod tests { use crate::state::State; use crate::tokio_util; use crate::worker::WorkerEvent; - use crate::worker::WorkerHandle; fn create_test_worker() -> WebWorker { let state = State::mock("./hello.js"); @@ -138,7 +245,7 @@ mod tests { #[test] fn test_worker_messages() { let (handle_sender, handle_receiver) = - std::sync::mpsc::sync_channel::<WorkerHandle>(1); + std::sync::mpsc::sync_channel::<WebWorkerHandle>(1); let join_handle = std::thread::spawn(move || { let mut worker = create_test_worker(); @@ -165,13 +272,13 @@ mod tests { tokio_util::run_basic(async move { let msg = json!("hi").to_string().into_boxed_str().into_boxed_bytes(); - let r = handle.post_message(msg.clone()).await; + let r = handle.post_message(msg.clone()); assert!(r.is_ok()); let maybe_msg = handle.get_event().await; assert!(maybe_msg.is_some()); - let r = handle.post_message(msg.clone()).await; + let r = handle.post_message(msg.clone()); assert!(r.is_ok()); let maybe_msg = handle.get_event().await; @@ -187,7 +294,7 @@ mod tests { .to_string() .into_boxed_str() .into_boxed_bytes(); - let r = handle.post_message(msg).await; + let r = handle.post_message(msg); assert!(r.is_ok()); let event = handle.get_event().await; assert!(event.is_none()); @@ -199,7 +306,7 @@ mod tests { #[test] fn removed_from_resource_table_on_close() { let (handle_sender, handle_receiver) = - std::sync::mpsc::sync_channel::<WorkerHandle>(1); + std::sync::mpsc::sync_channel::<WebWorkerHandle>(1); let join_handle = std::thread::spawn(move || { let mut worker = create_test_worker(); @@ -214,7 +321,7 @@ mod tests { tokio_util::run_basic(async move { let msg = json!("hi").to_string().into_boxed_str().into_boxed_bytes(); - let r = handle.post_message(msg.clone()).await; + let r = handle.post_message(msg.clone()); assert!(r.is_ok()); let event = handle.get_event().await; assert!(event.is_none()); |