summaryrefslogtreecommitdiff
path: root/cli/web_worker.rs
diff options
context:
space:
mode:
Diffstat (limited to 'cli/web_worker.rs')
-rw-r--r--cli/web_worker.rs135
1 files changed, 78 insertions, 57 deletions
diff --git a/cli/web_worker.rs b/cli/web_worker.rs
index 05e3184d9..c0a712aed 100644
--- a/cli/web_worker.rs
+++ b/cli/web_worker.rs
@@ -29,7 +29,7 @@ impl WebWorker {
{
let isolate = &mut worker.isolate;
ops::runtime::init(isolate, &state);
- ops::web_worker::init(isolate, &state);
+ ops::web_worker::init(isolate, &state, &worker.internal_channels.sender);
ops::worker_host::init(isolate, &state);
ops::errors::init(isolate, &state);
ops::timers::init(isolate, &state);
@@ -65,9 +65,12 @@ impl Future for WebWorker {
#[cfg(test)]
mod tests {
use super::*;
+ use crate::ops::worker_host::run_worker_loop;
use crate::startup_data;
use crate::state::State;
use crate::tokio_util;
+ use crate::worker::WorkerEvent;
+ use crate::worker::WorkerHandle;
fn create_test_worker() -> WebWorker {
let state = State::mock("./hello.js");
@@ -77,77 +80,95 @@ mod tests {
state,
);
worker.execute("bootstrapWorkerRuntime(\"TEST\")").unwrap();
- worker.execute("runWorkerMessageLoop()").unwrap();
worker
}
-
#[test]
fn test_worker_messages() {
- let mut worker = create_test_worker();
- let source = r#"
- onmessage = function(e) {
- console.log("msg from main script", e.data);
- if (e.data == "exit") {
- delete self.onmessage;
- return;
- } else {
- console.assert(e.data === "hi");
+ let (handle_sender, handle_receiver) =
+ std::sync::mpsc::sync_channel::<WorkerHandle>(1);
+
+ let join_handle = std::thread::spawn(move || {
+ let mut worker = create_test_worker();
+ let source = r#"
+ onmessage = function(e) {
+ console.log("msg from main script", e.data);
+ if (e.data == "exit") {
+ return close();
+ } else {
+ console.assert(e.data === "hi");
+ }
+ postMessage([1, 2, 3]);
+ console.log("after postMessage");
}
- postMessage([1, 2, 3]);
- console.log("after postMessage");
- }
- "#;
- worker.execute(source).unwrap();
-
- let handle = worker.thread_safe_handle();
- let _ = tokio_util::spawn_thread(move || {
- tokio_util::run_basic(async move {
- let msg = json!("hi").to_string().into_boxed_str().into_boxed_bytes();
- let r = handle.post_message(msg.clone()).await;
- assert!(r.is_ok());
-
- let maybe_msg = handle.get_message().await;
- assert!(maybe_msg.is_some());
-
- let r = handle.post_message(msg.clone()).await;
- assert!(r.is_ok());
-
- let maybe_msg = handle.get_message().await;
- assert!(maybe_msg.is_some());
- assert_eq!(*maybe_msg.unwrap(), *b"[1,2,3]");
-
- let msg = json!("exit")
- .to_string()
- .into_boxed_str()
- .into_boxed_bytes();
- let r = handle.post_message(msg).await;
- assert!(r.is_ok());
- })
+ "#;
+ worker.execute(source).unwrap();
+ let handle = worker.thread_safe_handle();
+ handle_sender.send(handle).unwrap();
+ let mut rt = tokio_util::create_basic_runtime();
+ let r = run_worker_loop(&mut rt, &mut worker);
+ assert!(r.is_ok())
});
- let r = tokio_util::run_basic(worker);
- assert!(r.is_ok())
- }
+ let mut handle = handle_receiver.recv().unwrap();
- #[test]
- fn removed_from_resource_table_on_close() {
- let mut worker = create_test_worker();
- let handle = worker.thread_safe_handle();
+ tokio_util::run_basic(async move {
+ let msg = json!("hi").to_string().into_boxed_str().into_boxed_bytes();
+ let r = handle.post_message(msg.clone()).await;
+ assert!(r.is_ok());
- worker
- .execute("onmessage = () => { delete self.onmessage; }")
- .unwrap();
+ let maybe_msg = handle.get_event().await;
+ assert!(maybe_msg.is_some());
- let worker_post_message_fut = tokio_util::spawn_thread(move || {
- let msg = json!("hi").to_string().into_boxed_str().into_boxed_bytes();
- let r = futures::executor::block_on(handle.post_message(msg));
+ let r = handle.post_message(msg.clone()).await;
assert!(r.is_ok());
+
+ let maybe_msg = handle.get_event().await;
+ assert!(maybe_msg.is_some());
+ match maybe_msg {
+ Some(WorkerEvent::Message(buf)) => {
+ assert_eq!(*buf, *b"[1,2,3]");
+ }
+ _ => unreachable!(),
+ }
+
+ let msg = json!("exit")
+ .to_string()
+ .into_boxed_str()
+ .into_boxed_bytes();
+ let r = handle.post_message(msg).await;
+ assert!(r.is_ok());
+ let event = handle.get_event().await;
+ assert!(event.is_none());
+ handle.sender.close_channel();
+ });
+ join_handle.join().expect("Failed to join worker thread");
+ }
+
+ #[test]
+ fn removed_from_resource_table_on_close() {
+ let (handle_sender, handle_receiver) =
+ std::sync::mpsc::sync_channel::<WorkerHandle>(1);
+
+ let join_handle = std::thread::spawn(move || {
+ let mut worker = create_test_worker();
+ worker.execute("onmessage = () => { close(); }").unwrap();
+ let handle = worker.thread_safe_handle();
+ handle_sender.send(handle).unwrap();
+ let mut rt = tokio_util::create_basic_runtime();
+ let r = run_worker_loop(&mut rt, &mut worker);
+ assert!(r.is_ok())
});
+ let mut handle = handle_receiver.recv().unwrap();
+
tokio_util::run_basic(async move {
- worker_post_message_fut.await;
- let r = worker.await;
+ let msg = json!("hi").to_string().into_boxed_str().into_boxed_bytes();
+ let r = handle.post_message(msg.clone()).await;
assert!(r.is_ok());
+ let event = handle.get_event().await;
+ assert!(event.is_none());
+ handle.sender.close_channel();
});
+ join_handle.join().expect("Failed to join worker thread");
}
}