diff options
Diffstat (limited to 'cli/web_worker.rs')
-rw-r--r-- | cli/web_worker.rs | 135 |
1 files changed, 78 insertions, 57 deletions
diff --git a/cli/web_worker.rs b/cli/web_worker.rs index 05e3184d9..c0a712aed 100644 --- a/cli/web_worker.rs +++ b/cli/web_worker.rs @@ -29,7 +29,7 @@ impl WebWorker { { let isolate = &mut worker.isolate; ops::runtime::init(isolate, &state); - ops::web_worker::init(isolate, &state); + ops::web_worker::init(isolate, &state, &worker.internal_channels.sender); ops::worker_host::init(isolate, &state); ops::errors::init(isolate, &state); ops::timers::init(isolate, &state); @@ -65,9 +65,12 @@ impl Future for WebWorker { #[cfg(test)] mod tests { use super::*; + use crate::ops::worker_host::run_worker_loop; use crate::startup_data; use crate::state::State; use crate::tokio_util; + use crate::worker::WorkerEvent; + use crate::worker::WorkerHandle; fn create_test_worker() -> WebWorker { let state = State::mock("./hello.js"); @@ -77,77 +80,95 @@ mod tests { state, ); worker.execute("bootstrapWorkerRuntime(\"TEST\")").unwrap(); - worker.execute("runWorkerMessageLoop()").unwrap(); worker } - #[test] fn test_worker_messages() { - let mut worker = create_test_worker(); - let source = r#" - onmessage = function(e) { - console.log("msg from main script", e.data); - if (e.data == "exit") { - delete self.onmessage; - return; - } else { - console.assert(e.data === "hi"); + let (handle_sender, handle_receiver) = + std::sync::mpsc::sync_channel::<WorkerHandle>(1); + + let join_handle = std::thread::spawn(move || { + let mut worker = create_test_worker(); + let source = r#" + onmessage = function(e) { + console.log("msg from main script", e.data); + if (e.data == "exit") { + return close(); + } else { + console.assert(e.data === "hi"); + } + postMessage([1, 2, 3]); + console.log("after postMessage"); } - postMessage([1, 2, 3]); - console.log("after postMessage"); - } - "#; - worker.execute(source).unwrap(); - - let handle = worker.thread_safe_handle(); - let _ = tokio_util::spawn_thread(move || { - tokio_util::run_basic(async move { - let msg = json!("hi").to_string().into_boxed_str().into_boxed_bytes(); - let r = handle.post_message(msg.clone()).await; - assert!(r.is_ok()); - - let maybe_msg = handle.get_message().await; - assert!(maybe_msg.is_some()); - - let r = handle.post_message(msg.clone()).await; - assert!(r.is_ok()); - - let maybe_msg = handle.get_message().await; - assert!(maybe_msg.is_some()); - assert_eq!(*maybe_msg.unwrap(), *b"[1,2,3]"); - - let msg = json!("exit") - .to_string() - .into_boxed_str() - .into_boxed_bytes(); - let r = handle.post_message(msg).await; - assert!(r.is_ok()); - }) + "#; + worker.execute(source).unwrap(); + let handle = worker.thread_safe_handle(); + handle_sender.send(handle).unwrap(); + let mut rt = tokio_util::create_basic_runtime(); + let r = run_worker_loop(&mut rt, &mut worker); + assert!(r.is_ok()) }); - let r = tokio_util::run_basic(worker); - assert!(r.is_ok()) - } + let mut handle = handle_receiver.recv().unwrap(); - #[test] - fn removed_from_resource_table_on_close() { - let mut worker = create_test_worker(); - let handle = worker.thread_safe_handle(); + tokio_util::run_basic(async move { + let msg = json!("hi").to_string().into_boxed_str().into_boxed_bytes(); + let r = handle.post_message(msg.clone()).await; + assert!(r.is_ok()); - worker - .execute("onmessage = () => { delete self.onmessage; }") - .unwrap(); + let maybe_msg = handle.get_event().await; + assert!(maybe_msg.is_some()); - let worker_post_message_fut = tokio_util::spawn_thread(move || { - let msg = json!("hi").to_string().into_boxed_str().into_boxed_bytes(); - let r = futures::executor::block_on(handle.post_message(msg)); + let r = handle.post_message(msg.clone()).await; assert!(r.is_ok()); + + let maybe_msg = handle.get_event().await; + assert!(maybe_msg.is_some()); + match maybe_msg { + Some(WorkerEvent::Message(buf)) => { + assert_eq!(*buf, *b"[1,2,3]"); + } + _ => unreachable!(), + } + + let msg = json!("exit") + .to_string() + .into_boxed_str() + .into_boxed_bytes(); + let r = handle.post_message(msg).await; + assert!(r.is_ok()); + let event = handle.get_event().await; + assert!(event.is_none()); + handle.sender.close_channel(); + }); + join_handle.join().expect("Failed to join worker thread"); + } + + #[test] + fn removed_from_resource_table_on_close() { + let (handle_sender, handle_receiver) = + std::sync::mpsc::sync_channel::<WorkerHandle>(1); + + let join_handle = std::thread::spawn(move || { + let mut worker = create_test_worker(); + worker.execute("onmessage = () => { close(); }").unwrap(); + let handle = worker.thread_safe_handle(); + handle_sender.send(handle).unwrap(); + let mut rt = tokio_util::create_basic_runtime(); + let r = run_worker_loop(&mut rt, &mut worker); + assert!(r.is_ok()) }); + let mut handle = handle_receiver.recv().unwrap(); + tokio_util::run_basic(async move { - worker_post_message_fut.await; - let r = worker.await; + let msg = json!("hi").to_string().into_boxed_str().into_boxed_bytes(); + let r = handle.post_message(msg.clone()).await; assert!(r.is_ok()); + let event = handle.get_event().await; + assert!(event.is_none()); + handle.sender.close_channel(); }); + join_handle.join().expect("Failed to join worker thread"); } } |