summaryrefslogtreecommitdiff
path: root/cli
diff options
context:
space:
mode:
authorBartek IwaƄczuk <biwanczuk@gmail.com>2020-02-18 14:47:11 -0500
committerGitHub <noreply@github.com>2020-02-18 14:47:11 -0500
commit3d5bed35e032ee20e4fe34cad925202c6f0c0d3e (patch)
tree2f9ad905c9e55bd80832055ac7ef41c94bf419bc /cli
parent08dcf6bff73bbe579769ccd0f135ed4af919ea48 (diff)
refactor: remove run_worker_loop (#4028)
* remove run_worker_loop, impl poll for WebWorker * store JoinHandle to worker thread
Diffstat (limited to 'cli')
-rw-r--r--cli/compilers/compiler_worker.rs30
-rw-r--r--cli/compilers/ts.rs8
-rw-r--r--cli/compilers/wasm.rs8
-rw-r--r--cli/ops/worker_host.rs106
-rw-r--r--cli/state.rs19
-rw-r--r--cli/web_worker.rs72
6 files changed, 117 insertions, 126 deletions
diff --git a/cli/compilers/compiler_worker.rs b/cli/compilers/compiler_worker.rs
index 123c29abb..f0489e641 100644
--- a/cli/compilers/compiler_worker.rs
+++ b/cli/compilers/compiler_worker.rs
@@ -1,11 +1,17 @@
// Copyright 2018-2020 the Deno authors. All rights reserved. MIT license.
use crate::ops;
use crate::state::State;
-use crate::worker::Worker;
+use crate::web_worker::WebWorker;
+use core::task::Context;
use deno_core;
+use deno_core::ErrBox;
use deno_core::StartupData;
+use futures::future::Future;
+use futures::future::FutureExt;
use std::ops::Deref;
use std::ops::DerefMut;
+use std::pin::Pin;
+use std::task::Poll;
/// This worker is used to host TypeScript and WASM compilers.
///
@@ -20,22 +26,15 @@ use std::ops::DerefMut;
///
/// TODO(bartlomieju): add support to reuse the worker - or in other
/// words support stateful TS compiler
-pub struct CompilerWorker(Worker);
+pub struct CompilerWorker(WebWorker);
impl CompilerWorker {
pub fn new(name: String, startup_data: StartupData, state: State) -> Self {
let state_ = state.clone();
- let mut worker = Worker::new(name, startup_data, state_);
+ let mut worker = WebWorker::new(name, startup_data, state_);
{
let isolate = &mut worker.isolate;
- ops::runtime::init(isolate, &state);
ops::compiler::init(isolate, &state);
- ops::web_worker::init(isolate, &state, &worker.internal_channels.sender);
- ops::errors::init(isolate, &state);
- // for compatibility with Worker scope, though unused at
- // the moment
- ops::timers::init(isolate, &state);
- ops::fetch::init(isolate, &state);
// TODO(bartlomieju): CompilerWorker should not
// depend on those ops
ops::os::init(isolate, &state);
@@ -48,7 +47,7 @@ impl CompilerWorker {
}
impl Deref for CompilerWorker {
- type Target = Worker;
+ type Target = WebWorker;
fn deref(&self) -> &Self::Target {
&self.0
}
@@ -59,3 +58,12 @@ impl DerefMut for CompilerWorker {
&mut self.0
}
}
+
+impl Future for CompilerWorker {
+ type Output = Result<(), ErrBox>;
+
+ fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
+ let inner = self.get_mut();
+ inner.0.poll_unpin(cx)
+ }
+}
diff --git a/cli/compilers/ts.rs b/cli/compilers/ts.rs
index 9bfd93eeb..dec74c256 100644
--- a/cli/compilers/ts.rs
+++ b/cli/compilers/ts.rs
@@ -9,12 +9,11 @@ use crate::file_fetcher::SourceFile;
use crate::file_fetcher::SourceFileFetcher;
use crate::global_state::GlobalState;
use crate::msg;
-use crate::ops::worker_host::run_worker_loop;
use crate::ops::JsonResult;
use crate::source_maps::SourceMapGetter;
use crate::startup_data;
use crate::state::*;
-use crate::tokio_util::create_basic_runtime;
+use crate::tokio_util;
use crate::version;
use crate::worker::WorkerEvent;
use crate::worker::WorkerHandle;
@@ -611,11 +610,10 @@ async fn execute_in_thread(
let builder =
std::thread::Builder::new().name("deno-ts-compiler".to_string());
let join_handle = builder.spawn(move || {
- let mut worker = TsCompiler::setup_worker(global_state.clone());
+ let worker = TsCompiler::setup_worker(global_state.clone());
handle_sender.send(Ok(worker.thread_safe_handle())).unwrap();
drop(handle_sender);
- let mut rt = create_basic_runtime();
- run_worker_loop(&mut rt, &mut worker).expect("Panic in event loop");
+ tokio_util::run_basic(worker).expect("Panic in event loop");
})?;
let mut handle = handle_receiver.recv().unwrap()?;
handle.post_message(req).await?;
diff --git a/cli/compilers/wasm.rs b/cli/compilers/wasm.rs
index d6f0d2f4a..bc056d4f9 100644
--- a/cli/compilers/wasm.rs
+++ b/cli/compilers/wasm.rs
@@ -3,10 +3,9 @@ use super::compiler_worker::CompilerWorker;
use crate::compilers::CompiledModule;
use crate::file_fetcher::SourceFile;
use crate::global_state::GlobalState;
-use crate::ops::worker_host::run_worker_loop;
use crate::startup_data;
use crate::state::*;
-use crate::tokio_util::create_basic_runtime;
+use crate::tokio_util;
use crate::worker::WorkerEvent;
use crate::worker::WorkerHandle;
use deno_core::Buf;
@@ -123,11 +122,10 @@ async fn execute_in_thread(
let builder =
std::thread::Builder::new().name("deno-wasm-compiler".to_string());
let join_handle = builder.spawn(move || {
- let mut worker = WasmCompiler::setup_worker(global_state);
+ let worker = WasmCompiler::setup_worker(global_state);
handle_sender.send(Ok(worker.thread_safe_handle())).unwrap();
drop(handle_sender);
- let mut rt = create_basic_runtime();
- run_worker_loop(&mut rt, &mut worker).expect("Panic in event loop");
+ tokio_util::run_basic(worker).expect("Panic in event loop");
})?;
let mut handle = handle_receiver.recv().unwrap()?;
handle.post_message(req).await?;
diff --git a/cli/ops/worker_host.rs b/cli/ops/worker_host.rs
index 81e8f76da..13d4fffff 100644
--- a/cli/ops/worker_host.rs
+++ b/cli/ops/worker_host.rs
@@ -12,18 +12,15 @@ use crate::startup_data;
use crate::state::State;
use crate::tokio_util::create_basic_runtime;
use crate::web_worker::WebWorker;
-use crate::worker::Worker;
use crate::worker::WorkerEvent;
use crate::worker::WorkerHandle;
use deno_core::*;
use futures;
-use futures::future::poll_fn;
use futures::future::FutureExt;
use futures::future::TryFutureExt;
-use futures::stream::StreamExt;
use std;
use std::convert::From;
-use std::task::Poll;
+use std::thread::JoinHandle;
pub fn init(i: &mut Isolate, s: &State) {
i.register_op(
@@ -61,64 +58,6 @@ fn create_web_worker(
Ok(worker)
}
-// TODO(bartlomieju): this function should probably live in `cli/web_worker.rs`
-pub fn run_worker_loop(
- rt: &mut tokio::runtime::Runtime,
- worker: &mut Worker,
-) -> Result<(), ErrBox> {
- let mut worker_is_ready = false;
-
- let fut = poll_fn(|cx| -> Poll<Result<(), ErrBox>> {
- if !worker_is_ready {
- match worker.poll_unpin(cx) {
- Poll::Ready(r) => {
- if let Err(e) = r {
- let mut sender = worker.internal_channels.sender.clone();
- futures::executor::block_on(sender.send(WorkerEvent::Error(e)))
- .expect("Failed to post message to host");
- }
- worker_is_ready = true;
- }
- Poll::Pending => {}
- }
- }
-
- let maybe_msg = {
- match worker.internal_channels.receiver.poll_next_unpin(cx) {
- Poll::Ready(r) => match r {
- Some(msg) => {
- let msg_str = String::from_utf8(msg.to_vec()).unwrap();
- debug!("received message from host: {}", msg_str);
- Some(msg_str)
- }
- None => {
- debug!("channel closed by host, worker event loop shuts down");
- return Poll::Ready(Ok(()));
- }
- },
- Poll::Pending => None,
- }
- };
-
- if let Some(msg) = maybe_msg {
- // TODO: just add second value and then bind using rusty_v8
- // to get structured clone/transfer working
- let script = format!("workerMessageRecvCallback({})", msg);
- worker
- .execute(&script)
- .expect("Failed to execute message cb");
- // Let worker be polled again
- worker_is_ready = false;
- worker.waker.wake();
- }
-
- Poll::Pending
- });
-
- rt.block_on(fut)
-}
-
-// TODO(bartlomieju): this function should probably live in `cli/web_worker.rs`
// TODO(bartlomieju): check if order of actions is aligned to Worker spec
fn run_worker_thread(
name: String,
@@ -127,14 +66,13 @@ fn run_worker_thread(
specifier: ModuleSpecifier,
has_source_code: bool,
source_code: String,
-) -> Result<WorkerHandle, ErrBox> {
+) -> Result<(JoinHandle<()>, WorkerHandle), ErrBox> {
let (handle_sender, handle_receiver) =
std::sync::mpsc::sync_channel::<Result<WorkerHandle, ErrBox>>(1);
let builder =
std::thread::Builder::new().name(format!("deno-worker-{}", name));
- // TODO(bartlomieju): store JoinHandle as well
- builder.spawn(move || {
+ let join_handle = builder.spawn(move || {
// Any error inside this block is terminal:
// - JS worker is useless - meaning it throws an exception and can't do anything else,
// all action done upon it should be noops
@@ -189,10 +127,11 @@ fn run_worker_thread(
// TODO(bartlomieju): this thread should return result of event loop
// that means that we should store JoinHandle to thread to ensure
// that it actually terminates.
- run_worker_loop(&mut rt, &mut worker).expect("Panic in event loop");
+ rt.block_on(worker).expect("Panic in event loop");
})?;
- handle_receiver.recv().unwrap()
+ let worker_handle = handle_receiver.recv().unwrap()?;
+ Ok((join_handle, worker_handle))
}
#[derive(Deserialize)]
@@ -230,7 +169,7 @@ fn op_create_worker(
format!("USER-WORKER-{}", specifier)
});
- let worker_handle = run_worker_thread(
+ let (join_handle, worker_handle) = run_worker_thread(
worker_name,
global_state,
permissions,
@@ -240,7 +179,12 @@ fn op_create_worker(
)?;
// At this point all interactions with worker happen using thread
// safe handler returned from previous function call
- let worker_id = parent_state.add_child_worker(worker_handle);
+ let mut parent_state = parent_state.borrow_mut();
+ let worker_id = parent_state.next_worker_id;
+ parent_state.next_worker_id += 1;
+ parent_state
+ .workers
+ .insert(worker_id, (join_handle, worker_handle));
Ok(JsonOp::Sync(json!({ "id": worker_id })))
}
@@ -258,9 +202,10 @@ fn op_host_terminate_worker(
let args: WorkerArgs = serde_json::from_value(args)?;
let id = args.id as u32;
let mut state = state.borrow_mut();
- let worker_handle =
+ let (join_handle, worker_handle) =
state.workers.remove(&id).expect("No worker handle found");
worker_handle.terminate();
+ join_handle.join().expect("Panic in worker thread");
Ok(JsonOp::Sync(json!({})))
}
@@ -299,22 +244,22 @@ fn op_host_get_message(
) -> Result<JsonOp, ErrBox> {
let args: WorkerArgs = serde_json::from_value(args)?;
let id = args.id as u32;
- let state_ = state.borrow();
- let worker_handle = state_
- .workers
- .get(&id)
- .expect("No worker handle found")
- .clone();
+ let worker_handle = {
+ let state_ = state.borrow();
+ let (_join_handle, worker_handle) =
+ state_.workers.get(&id).expect("No worker handle found");
+ worker_handle.clone()
+ };
let state_ = state.clone();
let op = async move {
let response = match worker_handle.get_event().await {
Some(event) => serialize_worker_event(event),
None => {
let mut state_ = state_.borrow_mut();
- let mut handle =
+ let (join_handle, mut worker_handle) =
state_.workers.remove(&id).expect("No worker handle found");
- handle.sender.close_channel();
- // TODO(bartlomieju): join thread handle here
+ worker_handle.sender.close_channel();
+ join_handle.join().expect("Worker thread panicked");
json!({ "type": "close" })
}
};
@@ -335,7 +280,8 @@ fn op_host_post_message(
debug!("post message to worker {}", id);
let state = state.borrow();
- let worker_handle = state.workers.get(&id).expect("No worker handle found");
+ let (_, worker_handle) =
+ state.workers.get(&id).expect("No worker handle found");
let fut = worker_handle
.post_message(msg)
.map_err(|e| DenoError::new(ErrorKind::Other, e.to_string()));
diff --git a/cli/state.rs b/cli/state.rs
index 93eaaab43..7f342a9b1 100644
--- a/cli/state.rs
+++ b/cli/state.rs
@@ -30,8 +30,7 @@ use std::path::Path;
use std::pin::Pin;
use std::rc::Rc;
use std::str;
-use std::sync::atomic::AtomicUsize;
-use std::sync::atomic::Ordering;
+use std::thread::JoinHandle;
use std::time::Instant;
#[derive(Clone)]
@@ -54,8 +53,8 @@ pub struct StateInner {
pub import_map: Option<ImportMap>,
pub metrics: Metrics,
pub global_timer: GlobalTimer,
- pub workers: HashMap<u32, WorkerHandle>,
- pub next_worker_id: AtomicUsize,
+ pub workers: HashMap<u32, (JoinHandle<()>, WorkerHandle)>,
+ pub next_worker_id: u32,
pub start_time: Instant,
pub seeded_rng: Option<StdRng>,
pub resource_table: ResourceTable,
@@ -231,7 +230,7 @@ impl State {
metrics: Metrics::default(),
global_timer: GlobalTimer::new(),
workers: HashMap::new(),
- next_worker_id: AtomicUsize::new(0),
+ next_worker_id: 0,
start_time: Instant::now(),
seeded_rng,
@@ -267,7 +266,7 @@ impl State {
metrics: Metrics::default(),
global_timer: GlobalTimer::new(),
workers: HashMap::new(),
- next_worker_id: AtomicUsize::new(0),
+ next_worker_id: 0,
start_time: Instant::now(),
seeded_rng,
@@ -278,14 +277,6 @@ impl State {
Ok(Self(state))
}
- pub fn add_child_worker(&self, handle: WorkerHandle) -> u32 {
- let mut inner_state = self.borrow_mut();
- let worker_id =
- inner_state.next_worker_id.fetch_add(1, Ordering::Relaxed) as u32;
- inner_state.workers.insert(worker_id, handle);
- worker_id
- }
-
#[inline]
pub fn check_read(&self, path: &Path) -> Result<(), ErrBox> {
self.borrow().permissions.check_read(path)
diff --git a/cli/web_worker.rs b/cli/web_worker.rs
index c0a712aed..7efec476c 100644
--- a/cli/web_worker.rs
+++ b/cli/web_worker.rs
@@ -2,17 +2,19 @@
use crate::ops;
use crate::state::State;
use crate::worker::Worker;
+use crate::worker::WorkerEvent;
use deno_core;
use deno_core::ErrBox;
use deno_core::StartupData;
use futures::future::FutureExt;
+use futures::stream::StreamExt;
+use futures::SinkExt;
use std::future::Future;
use std::ops::Deref;
use std::ops::DerefMut;
use std::pin::Pin;
use std::task::Context;
use std::task::Poll;
-
/// This worker is implementation of `Worker` Web API
///
/// At the moment this type of worker supports only
@@ -20,7 +22,10 @@ use std::task::Poll;
///
/// Each `WebWorker` is either a child of `MainWorker` or other
/// `WebWorker`.
-pub struct WebWorker(Worker);
+pub struct WebWorker {
+ worker: Worker,
+ is_ready: bool,
+}
impl WebWorker {
pub fn new(name: String, startup_data: StartupData, state: State) -> Self {
@@ -36,20 +41,23 @@ impl WebWorker {
ops::fetch::init(isolate, &state);
}
- Self(worker)
+ Self {
+ worker,
+ is_ready: false,
+ }
}
}
impl Deref for WebWorker {
type Target = Worker;
fn deref(&self) -> &Self::Target {
- &self.0
+ &self.worker
}
}
impl DerefMut for WebWorker {
fn deref_mut(&mut self) -> &mut Self::Target {
- &mut self.0
+ &mut self.worker
}
}
@@ -58,14 +66,58 @@ impl Future for WebWorker {
fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
let inner = self.get_mut();
- inner.0.poll_unpin(cx)
+ let worker = &mut inner.worker;
+
+ if !inner.is_ready {
+ match worker.poll_unpin(cx) {
+ Poll::Ready(r) => {
+ if let Err(e) = r {
+ let mut sender = worker.internal_channels.sender.clone();
+ futures::executor::block_on(sender.send(WorkerEvent::Error(e)))
+ .expect("Failed to post message to host");
+ }
+ inner.is_ready = true;
+ }
+ Poll::Pending => {}
+ }
+ }
+
+ let maybe_msg = {
+ match worker.internal_channels.receiver.poll_next_unpin(cx) {
+ Poll::Ready(r) => match r {
+ Some(msg) => {
+ let msg_str = String::from_utf8(msg.to_vec()).unwrap();
+ debug!("received message from host: {}", msg_str);
+ Some(msg_str)
+ }
+ None => {
+ debug!("channel closed by host, worker event loop shuts down");
+ return Poll::Ready(Ok(()));
+ }
+ },
+ Poll::Pending => None,
+ }
+ };
+
+ if let Some(msg) = maybe_msg {
+ // TODO: just add second value and then bind using rusty_v8
+ // to get structured clone/transfer working
+ let script = format!("workerMessageRecvCallback({})", msg);
+ worker
+ .execute(&script)
+ .expect("Failed to execute message cb");
+ // Let worker be polled again
+ inner.is_ready = false;
+ worker.waker.wake();
+ }
+
+ Poll::Pending
}
}
#[cfg(test)]
mod tests {
use super::*;
- use crate::ops::worker_host::run_worker_loop;
use crate::startup_data;
use crate::state::State;
use crate::tokio_util;
@@ -104,8 +156,7 @@ mod tests {
worker.execute(source).unwrap();
let handle = worker.thread_safe_handle();
handle_sender.send(handle).unwrap();
- let mut rt = tokio_util::create_basic_runtime();
- let r = run_worker_loop(&mut rt, &mut worker);
+ let r = tokio_util::run_basic(worker);
assert!(r.is_ok())
});
@@ -154,8 +205,7 @@ mod tests {
worker.execute("onmessage = () => { close(); }").unwrap();
let handle = worker.thread_safe_handle();
handle_sender.send(handle).unwrap();
- let mut rt = tokio_util::create_basic_runtime();
- let r = run_worker_loop(&mut rt, &mut worker);
+ let r = tokio_util::run_basic(worker);
assert!(r.is_ok())
});