diff options
| author | Ryan Dahl <ry@tinyclouds.org> | 2020-02-03 18:08:44 -0500 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2020-02-03 18:08:44 -0500 |
| commit | 161cf7cdfd44ace8937fb7940727984990742d18 (patch) | |
| tree | 1ef88b3cd6427353366d930ea9be5ae494504255 /cli/web_worker.rs | |
| parent | 0471243334ac1aeb76dcaadbc3f0b8114d188fb8 (diff) | |
refactor: Use Tokio's single-threaded runtime (#3844)
This change simplifies how we execute V8. Previously V8 Isolates jumped
around threads every time they were woken up. This was overly complex and
potentially hurting performance in a myriad ways. Now isolates run on
their own dedicated thread and never move.
- blocking_json spawns a thread and does not use a thread pool
- op_host_poll_worker and op_host_resume_worker are non-operational
- removes Worker::get_message and Worker::post_message
- ThreadSafeState::workers table contains WorkerChannel entries instead
of actual Worker instances.
- MainWorker and CompilerWorker are no longer Futures.
- The multi-threaded version of deno_core_http_bench was removed.
- AyncOps no longer need to be Send + Sync
This PR is very large and several tests were disabled to speed
integration:
- installer_test_local_module_run
- installer_test_remote_module_run
- _015_duplicate_parallel_import
- _026_workers
Diffstat (limited to 'cli/web_worker.rs')
| -rw-r--r-- | cli/web_worker.rs | 87 |
1 files changed, 34 insertions, 53 deletions
diff --git a/cli/web_worker.rs b/cli/web_worker.rs index a3f7eb685..575910cfa 100644 --- a/cli/web_worker.rs +++ b/cli/web_worker.rs @@ -21,7 +21,6 @@ use std::task::Poll; /// /// Each `WebWorker` is either a child of `MainWorker` or other /// `WebWorker`. -#[derive(Clone)] pub struct WebWorker(Worker); impl WebWorker { @@ -32,15 +31,15 @@ impl WebWorker { external_channels: WorkerChannels, ) -> Self { let state_ = state.clone(); - let worker = Worker::new(name, startup_data, state_, external_channels); + let mut worker = Worker::new(name, startup_data, state_, external_channels); { - let mut isolate = worker.isolate.try_lock().unwrap(); - ops::runtime::init(&mut isolate, &state); - ops::web_worker::init(&mut isolate, &state); - ops::worker_host::init(&mut isolate, &state); - ops::errors::init(&mut isolate, &state); - ops::timers::init(&mut isolate, &state); - ops::fetch::init(&mut isolate, &state); + let isolate = &mut worker.isolate; + ops::runtime::init(isolate, &state); + ops::web_worker::init(isolate, &state); + ops::worker_host::init(isolate, &state); + ops::errors::init(isolate, &state); + ops::timers::init(isolate, &state); + ops::fetch::init(isolate, &state); } Self(worker) @@ -75,15 +74,6 @@ mod tests { use crate::startup_data; use crate::state::ThreadSafeState; use crate::tokio_util; - use futures::executor::block_on; - - pub fn run_in_task<F>(f: F) - where - F: FnOnce() + Send + 'static, - { - let fut = futures::future::lazy(move |_cx| f()); - tokio_util::run(fut) - } fn create_test_worker() -> WebWorker { let (int, ext) = ThreadSafeState::create_channels(); @@ -104,9 +94,8 @@ mod tests { #[test] fn test_worker_messages() { - run_in_task(|| { - let mut worker = create_test_worker(); - let source = r#" + let mut worker = create_test_worker(); + let source = r#" onmessage = function(e) { console.log("msg from main script", e.data); if (e.data == "exit") { @@ -119,60 +108,52 @@ mod tests { console.log("after postMessage"); } "#; - worker.execute(source).unwrap(); - - let worker_ = worker.clone(); - - let fut = async move { - let r = worker.await; - r.unwrap(); - }; + worker.execute(source).unwrap(); - tokio::spawn(fut); + let handle = worker.thread_safe_handle(); + let _ = tokio_util::spawn_thread(move || tokio_util::run_basic(worker)); + tokio_util::run_basic(async move { let msg = json!("hi").to_string().into_boxed_str().into_boxed_bytes(); + let r = handle.post_message(msg.clone()).await; + assert!(r.is_ok()); + + let maybe_msg = handle.get_message().await; + assert!(maybe_msg.is_some()); - let r = block_on(worker_.post_message(msg)); + let r = handle.post_message(msg.clone()).await; assert!(r.is_ok()); - let maybe_msg = block_on(worker_.get_message()); + let maybe_msg = handle.get_message().await; assert!(maybe_msg.is_some()); - // Check if message received is [1, 2, 3] in json assert_eq!(*maybe_msg.unwrap(), *b"[1,2,3]"); let msg = json!("exit") .to_string() .into_boxed_str() .into_boxed_bytes(); - let r = block_on(worker_.post_message(msg)); + let r = handle.post_message(msg).await; assert!(r.is_ok()); - }) + }); } #[test] fn removed_from_resource_table_on_close() { - run_in_task(|| { - let mut worker = create_test_worker(); + let mut worker = create_test_worker(); + let handle = worker.thread_safe_handle(); + let worker_complete_fut = tokio_util::spawn_thread(move || { worker .execute("onmessage = () => { delete self.onmessage; }") .unwrap(); + tokio_util::run_basic(worker) + }); - let worker_ = worker.clone(); - let worker_future = async move { - let result = worker_.await; - println!("workers.rs after resource close"); - result.unwrap(); - } - .shared(); - - let worker_future_ = worker_future.clone(); - tokio::spawn(worker_future_); - - let msg = json!("hi").to_string().into_boxed_str().into_boxed_bytes(); - let r = block_on(worker.post_message(msg)); + let msg = json!("hi").to_string().into_boxed_str().into_boxed_bytes(); + tokio_util::run_basic(async move { + let r = handle.post_message(msg).await; assert!(r.is_ok()); - - block_on(worker_future) - }) + let r = worker_complete_fut.await; + assert!(r.is_ok()); + }); } } |
