diff options
author | Bartek IwaĆczuk <biwanczuk@gmail.com> | 2020-02-18 14:47:11 -0500 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-02-18 14:47:11 -0500 |
commit | 3d5bed35e032ee20e4fe34cad925202c6f0c0d3e (patch) | |
tree | 2f9ad905c9e55bd80832055ac7ef41c94bf419bc | |
parent | 08dcf6bff73bbe579769ccd0f135ed4af919ea48 (diff) |
refactor: remove run_worker_loop (#4028)
* remove run_worker_loop, impl poll for WebWorker
* store JoinHandle to worker thread
-rw-r--r-- | cli/compilers/compiler_worker.rs | 30 | ||||
-rw-r--r-- | cli/compilers/ts.rs | 8 | ||||
-rw-r--r-- | cli/compilers/wasm.rs | 8 | ||||
-rw-r--r-- | cli/ops/worker_host.rs | 106 | ||||
-rw-r--r-- | cli/state.rs | 19 | ||||
-rw-r--r-- | cli/web_worker.rs | 72 |
6 files changed, 117 insertions, 126 deletions
diff --git a/cli/compilers/compiler_worker.rs b/cli/compilers/compiler_worker.rs index 123c29abb..f0489e641 100644 --- a/cli/compilers/compiler_worker.rs +++ b/cli/compilers/compiler_worker.rs @@ -1,11 +1,17 @@ // Copyright 2018-2020 the Deno authors. All rights reserved. MIT license. use crate::ops; use crate::state::State; -use crate::worker::Worker; +use crate::web_worker::WebWorker; +use core::task::Context; use deno_core; +use deno_core::ErrBox; use deno_core::StartupData; +use futures::future::Future; +use futures::future::FutureExt; use std::ops::Deref; use std::ops::DerefMut; +use std::pin::Pin; +use std::task::Poll; /// This worker is used to host TypeScript and WASM compilers. /// @@ -20,22 +26,15 @@ use std::ops::DerefMut; /// /// TODO(bartlomieju): add support to reuse the worker - or in other /// words support stateful TS compiler -pub struct CompilerWorker(Worker); +pub struct CompilerWorker(WebWorker); impl CompilerWorker { pub fn new(name: String, startup_data: StartupData, state: State) -> Self { let state_ = state.clone(); - let mut worker = Worker::new(name, startup_data, state_); + let mut worker = WebWorker::new(name, startup_data, state_); { let isolate = &mut worker.isolate; - ops::runtime::init(isolate, &state); ops::compiler::init(isolate, &state); - ops::web_worker::init(isolate, &state, &worker.internal_channels.sender); - ops::errors::init(isolate, &state); - // for compatibility with Worker scope, though unused at - // the moment - ops::timers::init(isolate, &state); - ops::fetch::init(isolate, &state); // TODO(bartlomieju): CompilerWorker should not // depend on those ops ops::os::init(isolate, &state); @@ -48,7 +47,7 @@ impl CompilerWorker { } impl Deref for CompilerWorker { - type Target = Worker; + type Target = WebWorker; fn deref(&self) -> &Self::Target { &self.0 } @@ -59,3 +58,12 @@ impl DerefMut for CompilerWorker { &mut self.0 } } + +impl Future for CompilerWorker { + type Output = Result<(), ErrBox>; + + fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> { + let inner = self.get_mut(); + inner.0.poll_unpin(cx) + } +} diff --git a/cli/compilers/ts.rs b/cli/compilers/ts.rs index 9bfd93eeb..dec74c256 100644 --- a/cli/compilers/ts.rs +++ b/cli/compilers/ts.rs @@ -9,12 +9,11 @@ use crate::file_fetcher::SourceFile; use crate::file_fetcher::SourceFileFetcher; use crate::global_state::GlobalState; use crate::msg; -use crate::ops::worker_host::run_worker_loop; use crate::ops::JsonResult; use crate::source_maps::SourceMapGetter; use crate::startup_data; use crate::state::*; -use crate::tokio_util::create_basic_runtime; +use crate::tokio_util; use crate::version; use crate::worker::WorkerEvent; use crate::worker::WorkerHandle; @@ -611,11 +610,10 @@ async fn execute_in_thread( let builder = std::thread::Builder::new().name("deno-ts-compiler".to_string()); let join_handle = builder.spawn(move || { - let mut worker = TsCompiler::setup_worker(global_state.clone()); + let worker = TsCompiler::setup_worker(global_state.clone()); handle_sender.send(Ok(worker.thread_safe_handle())).unwrap(); drop(handle_sender); - let mut rt = create_basic_runtime(); - run_worker_loop(&mut rt, &mut worker).expect("Panic in event loop"); + tokio_util::run_basic(worker).expect("Panic in event loop"); })?; let mut handle = handle_receiver.recv().unwrap()?; handle.post_message(req).await?; diff --git a/cli/compilers/wasm.rs b/cli/compilers/wasm.rs index d6f0d2f4a..bc056d4f9 100644 --- a/cli/compilers/wasm.rs +++ b/cli/compilers/wasm.rs @@ -3,10 +3,9 @@ use super::compiler_worker::CompilerWorker; use crate::compilers::CompiledModule; use crate::file_fetcher::SourceFile; use crate::global_state::GlobalState; -use crate::ops::worker_host::run_worker_loop; use crate::startup_data; use crate::state::*; -use crate::tokio_util::create_basic_runtime; +use crate::tokio_util; use crate::worker::WorkerEvent; use crate::worker::WorkerHandle; use deno_core::Buf; @@ -123,11 +122,10 @@ async fn execute_in_thread( let builder = std::thread::Builder::new().name("deno-wasm-compiler".to_string()); let join_handle = builder.spawn(move || { - let mut worker = WasmCompiler::setup_worker(global_state); + let worker = WasmCompiler::setup_worker(global_state); handle_sender.send(Ok(worker.thread_safe_handle())).unwrap(); drop(handle_sender); - let mut rt = create_basic_runtime(); - run_worker_loop(&mut rt, &mut worker).expect("Panic in event loop"); + tokio_util::run_basic(worker).expect("Panic in event loop"); })?; let mut handle = handle_receiver.recv().unwrap()?; handle.post_message(req).await?; diff --git a/cli/ops/worker_host.rs b/cli/ops/worker_host.rs index 81e8f76da..13d4fffff 100644 --- a/cli/ops/worker_host.rs +++ b/cli/ops/worker_host.rs @@ -12,18 +12,15 @@ use crate::startup_data; use crate::state::State; use crate::tokio_util::create_basic_runtime; use crate::web_worker::WebWorker; -use crate::worker::Worker; use crate::worker::WorkerEvent; use crate::worker::WorkerHandle; use deno_core::*; use futures; -use futures::future::poll_fn; use futures::future::FutureExt; use futures::future::TryFutureExt; -use futures::stream::StreamExt; use std; use std::convert::From; -use std::task::Poll; +use std::thread::JoinHandle; pub fn init(i: &mut Isolate, s: &State) { i.register_op( @@ -61,64 +58,6 @@ fn create_web_worker( Ok(worker) } -// TODO(bartlomieju): this function should probably live in `cli/web_worker.rs` -pub fn run_worker_loop( - rt: &mut tokio::runtime::Runtime, - worker: &mut Worker, -) -> Result<(), ErrBox> { - let mut worker_is_ready = false; - - let fut = poll_fn(|cx| -> Poll<Result<(), ErrBox>> { - if !worker_is_ready { - match worker.poll_unpin(cx) { - Poll::Ready(r) => { - if let Err(e) = r { - let mut sender = worker.internal_channels.sender.clone(); - futures::executor::block_on(sender.send(WorkerEvent::Error(e))) - .expect("Failed to post message to host"); - } - worker_is_ready = true; - } - Poll::Pending => {} - } - } - - let maybe_msg = { - match worker.internal_channels.receiver.poll_next_unpin(cx) { - Poll::Ready(r) => match r { - Some(msg) => { - let msg_str = String::from_utf8(msg.to_vec()).unwrap(); - debug!("received message from host: {}", msg_str); - Some(msg_str) - } - None => { - debug!("channel closed by host, worker event loop shuts down"); - return Poll::Ready(Ok(())); - } - }, - Poll::Pending => None, - } - }; - - if let Some(msg) = maybe_msg { - // TODO: just add second value and then bind using rusty_v8 - // to get structured clone/transfer working - let script = format!("workerMessageRecvCallback({})", msg); - worker - .execute(&script) - .expect("Failed to execute message cb"); - // Let worker be polled again - worker_is_ready = false; - worker.waker.wake(); - } - - Poll::Pending - }); - - rt.block_on(fut) -} - -// TODO(bartlomieju): this function should probably live in `cli/web_worker.rs` // TODO(bartlomieju): check if order of actions is aligned to Worker spec fn run_worker_thread( name: String, @@ -127,14 +66,13 @@ fn run_worker_thread( specifier: ModuleSpecifier, has_source_code: bool, source_code: String, -) -> Result<WorkerHandle, ErrBox> { +) -> Result<(JoinHandle<()>, WorkerHandle), ErrBox> { let (handle_sender, handle_receiver) = std::sync::mpsc::sync_channel::<Result<WorkerHandle, ErrBox>>(1); let builder = std::thread::Builder::new().name(format!("deno-worker-{}", name)); - // TODO(bartlomieju): store JoinHandle as well - builder.spawn(move || { + let join_handle = builder.spawn(move || { // Any error inside this block is terminal: // - JS worker is useless - meaning it throws an exception and can't do anything else, // all action done upon it should be noops @@ -189,10 +127,11 @@ fn run_worker_thread( // TODO(bartlomieju): this thread should return result of event loop // that means that we should store JoinHandle to thread to ensure // that it actually terminates. - run_worker_loop(&mut rt, &mut worker).expect("Panic in event loop"); + rt.block_on(worker).expect("Panic in event loop"); })?; - handle_receiver.recv().unwrap() + let worker_handle = handle_receiver.recv().unwrap()?; + Ok((join_handle, worker_handle)) } #[derive(Deserialize)] @@ -230,7 +169,7 @@ fn op_create_worker( format!("USER-WORKER-{}", specifier) }); - let worker_handle = run_worker_thread( + let (join_handle, worker_handle) = run_worker_thread( worker_name, global_state, permissions, @@ -240,7 +179,12 @@ fn op_create_worker( )?; // At this point all interactions with worker happen using thread // safe handler returned from previous function call - let worker_id = parent_state.add_child_worker(worker_handle); + let mut parent_state = parent_state.borrow_mut(); + let worker_id = parent_state.next_worker_id; + parent_state.next_worker_id += 1; + parent_state + .workers + .insert(worker_id, (join_handle, worker_handle)); Ok(JsonOp::Sync(json!({ "id": worker_id }))) } @@ -258,9 +202,10 @@ fn op_host_terminate_worker( let args: WorkerArgs = serde_json::from_value(args)?; let id = args.id as u32; let mut state = state.borrow_mut(); - let worker_handle = + let (join_handle, worker_handle) = state.workers.remove(&id).expect("No worker handle found"); worker_handle.terminate(); + join_handle.join().expect("Panic in worker thread"); Ok(JsonOp::Sync(json!({}))) } @@ -299,22 +244,22 @@ fn op_host_get_message( ) -> Result<JsonOp, ErrBox> { let args: WorkerArgs = serde_json::from_value(args)?; let id = args.id as u32; - let state_ = state.borrow(); - let worker_handle = state_ - .workers - .get(&id) - .expect("No worker handle found") - .clone(); + let worker_handle = { + let state_ = state.borrow(); + let (_join_handle, worker_handle) = + state_.workers.get(&id).expect("No worker handle found"); + worker_handle.clone() + }; let state_ = state.clone(); let op = async move { let response = match worker_handle.get_event().await { Some(event) => serialize_worker_event(event), None => { let mut state_ = state_.borrow_mut(); - let mut handle = + let (join_handle, mut worker_handle) = state_.workers.remove(&id).expect("No worker handle found"); - handle.sender.close_channel(); - // TODO(bartlomieju): join thread handle here + worker_handle.sender.close_channel(); + join_handle.join().expect("Worker thread panicked"); json!({ "type": "close" }) } }; @@ -335,7 +280,8 @@ fn op_host_post_message( debug!("post message to worker {}", id); let state = state.borrow(); - let worker_handle = state.workers.get(&id).expect("No worker handle found"); + let (_, worker_handle) = + state.workers.get(&id).expect("No worker handle found"); let fut = worker_handle .post_message(msg) .map_err(|e| DenoError::new(ErrorKind::Other, e.to_string())); diff --git a/cli/state.rs b/cli/state.rs index 93eaaab43..7f342a9b1 100644 --- a/cli/state.rs +++ b/cli/state.rs @@ -30,8 +30,7 @@ use std::path::Path; use std::pin::Pin; use std::rc::Rc; use std::str; -use std::sync::atomic::AtomicUsize; -use std::sync::atomic::Ordering; +use std::thread::JoinHandle; use std::time::Instant; #[derive(Clone)] @@ -54,8 +53,8 @@ pub struct StateInner { pub import_map: Option<ImportMap>, pub metrics: Metrics, pub global_timer: GlobalTimer, - pub workers: HashMap<u32, WorkerHandle>, - pub next_worker_id: AtomicUsize, + pub workers: HashMap<u32, (JoinHandle<()>, WorkerHandle)>, + pub next_worker_id: u32, pub start_time: Instant, pub seeded_rng: Option<StdRng>, pub resource_table: ResourceTable, @@ -231,7 +230,7 @@ impl State { metrics: Metrics::default(), global_timer: GlobalTimer::new(), workers: HashMap::new(), - next_worker_id: AtomicUsize::new(0), + next_worker_id: 0, start_time: Instant::now(), seeded_rng, @@ -267,7 +266,7 @@ impl State { metrics: Metrics::default(), global_timer: GlobalTimer::new(), workers: HashMap::new(), - next_worker_id: AtomicUsize::new(0), + next_worker_id: 0, start_time: Instant::now(), seeded_rng, @@ -278,14 +277,6 @@ impl State { Ok(Self(state)) } - pub fn add_child_worker(&self, handle: WorkerHandle) -> u32 { - let mut inner_state = self.borrow_mut(); - let worker_id = - inner_state.next_worker_id.fetch_add(1, Ordering::Relaxed) as u32; - inner_state.workers.insert(worker_id, handle); - worker_id - } - #[inline] pub fn check_read(&self, path: &Path) -> Result<(), ErrBox> { self.borrow().permissions.check_read(path) diff --git a/cli/web_worker.rs b/cli/web_worker.rs index c0a712aed..7efec476c 100644 --- a/cli/web_worker.rs +++ b/cli/web_worker.rs @@ -2,17 +2,19 @@ use crate::ops; use crate::state::State; use crate::worker::Worker; +use crate::worker::WorkerEvent; use deno_core; use deno_core::ErrBox; use deno_core::StartupData; use futures::future::FutureExt; +use futures::stream::StreamExt; +use futures::SinkExt; use std::future::Future; use std::ops::Deref; use std::ops::DerefMut; use std::pin::Pin; use std::task::Context; use std::task::Poll; - /// This worker is implementation of `Worker` Web API /// /// At the moment this type of worker supports only @@ -20,7 +22,10 @@ use std::task::Poll; /// /// Each `WebWorker` is either a child of `MainWorker` or other /// `WebWorker`. -pub struct WebWorker(Worker); +pub struct WebWorker { + worker: Worker, + is_ready: bool, +} impl WebWorker { pub fn new(name: String, startup_data: StartupData, state: State) -> Self { @@ -36,20 +41,23 @@ impl WebWorker { ops::fetch::init(isolate, &state); } - Self(worker) + Self { + worker, + is_ready: false, + } } } impl Deref for WebWorker { type Target = Worker; fn deref(&self) -> &Self::Target { - &self.0 + &self.worker } } impl DerefMut for WebWorker { fn deref_mut(&mut self) -> &mut Self::Target { - &mut self.0 + &mut self.worker } } @@ -58,14 +66,58 @@ impl Future for WebWorker { fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> { let inner = self.get_mut(); - inner.0.poll_unpin(cx) + let worker = &mut inner.worker; + + if !inner.is_ready { + match worker.poll_unpin(cx) { + Poll::Ready(r) => { + if let Err(e) = r { + let mut sender = worker.internal_channels.sender.clone(); + futures::executor::block_on(sender.send(WorkerEvent::Error(e))) + .expect("Failed to post message to host"); + } + inner.is_ready = true; + } + Poll::Pending => {} + } + } + + let maybe_msg = { + match worker.internal_channels.receiver.poll_next_unpin(cx) { + Poll::Ready(r) => match r { + Some(msg) => { + let msg_str = String::from_utf8(msg.to_vec()).unwrap(); + debug!("received message from host: {}", msg_str); + Some(msg_str) + } + None => { + debug!("channel closed by host, worker event loop shuts down"); + return Poll::Ready(Ok(())); + } + }, + Poll::Pending => None, + } + }; + + if let Some(msg) = maybe_msg { + // TODO: just add second value and then bind using rusty_v8 + // to get structured clone/transfer working + let script = format!("workerMessageRecvCallback({})", msg); + worker + .execute(&script) + .expect("Failed to execute message cb"); + // Let worker be polled again + inner.is_ready = false; + worker.waker.wake(); + } + + Poll::Pending } } #[cfg(test)] mod tests { use super::*; - use crate::ops::worker_host::run_worker_loop; use crate::startup_data; use crate::state::State; use crate::tokio_util; @@ -104,8 +156,7 @@ mod tests { worker.execute(source).unwrap(); let handle = worker.thread_safe_handle(); handle_sender.send(handle).unwrap(); - let mut rt = tokio_util::create_basic_runtime(); - let r = run_worker_loop(&mut rt, &mut worker); + let r = tokio_util::run_basic(worker); assert!(r.is_ok()) }); @@ -154,8 +205,7 @@ mod tests { worker.execute("onmessage = () => { close(); }").unwrap(); let handle = worker.thread_safe_handle(); handle_sender.send(handle).unwrap(); - let mut rt = tokio_util::create_basic_runtime(); - let r = run_worker_loop(&mut rt, &mut worker); + let r = tokio_util::run_basic(worker); assert!(r.is_ok()) }); |