diff options
Diffstat (limited to 'cli')
-rw-r--r-- | cli/compilers/ts.rs | 7 | ||||
-rw-r--r-- | cli/js/lib.deno_runtime.d.ts | 2 | ||||
-rw-r--r-- | cli/js/workers.ts | 22 | ||||
-rw-r--r-- | cli/lib.rs | 10 | ||||
-rw-r--r-- | cli/ops/workers.rs | 73 | ||||
-rw-r--r-- | cli/state.rs | 70 | ||||
-rw-r--r-- | cli/worker.rs | 115 |
7 files changed, 155 insertions, 144 deletions
diff --git a/cli/compilers/ts.rs b/cli/compilers/ts.rs index 854963eaa..327b3fbeb 100644 --- a/cli/compilers/ts.rs +++ b/cli/compilers/ts.rs @@ -223,8 +223,10 @@ impl TsCompiler { /// Create a new V8 worker with snapshot of TS compiler and setup compiler's runtime. fn setup_worker(global_state: ThreadSafeGlobalState) -> Worker { - let worker_state = ThreadSafeState::new(global_state.clone(), None, true) - .expect("Unable to create worker state"); + let (int, ext) = ThreadSafeState::create_channels(); + let worker_state = + ThreadSafeState::new(global_state.clone(), None, true, int) + .expect("Unable to create worker state"); // Count how many times we start the compiler worker. global_state @@ -236,6 +238,7 @@ impl TsCompiler { "TS".to_string(), startup_data::compiler_isolate_init(), worker_state, + ext, ); worker.execute("denoMain()").unwrap(); worker.execute("workerMain()").unwrap(); diff --git a/cli/js/lib.deno_runtime.d.ts b/cli/js/lib.deno_runtime.d.ts index 4331319bf..1f01f1384 100644 --- a/cli/js/lib.deno_runtime.d.ts +++ b/cli/js/lib.deno_runtime.d.ts @@ -2647,7 +2647,7 @@ declare namespace workers { noDenoNamespace?: boolean; } export class WorkerImpl implements Worker { - private readonly rid; + private readonly id; private isClosing; private readonly isClosedPromise; onerror?: () => void; diff --git a/cli/js/workers.ts b/cli/js/workers.ts index 37061063f..028817573 100644 --- a/cli/js/workers.ts +++ b/cli/js/workers.ts @@ -35,17 +35,17 @@ function createWorker( }); } -async function hostGetWorkerClosed(rid: number): Promise<void> { - await sendAsync(dispatch.OP_HOST_GET_WORKER_CLOSED, { rid }); +async function hostGetWorkerClosed(id: number): Promise<void> { + await sendAsync(dispatch.OP_HOST_GET_WORKER_CLOSED, { id }); } -function hostPostMessage(rid: number, data: any): void { +function hostPostMessage(id: number, data: any): void { const dataIntArray = encodeMessage(data); - sendSync(dispatch.OP_HOST_POST_MESSAGE, { rid }, dataIntArray); + sendSync(dispatch.OP_HOST_POST_MESSAGE, { id }, dataIntArray); } -async function hostGetMessage(rid: number): Promise<any> { - const res = await sendAsync(dispatch.OP_HOST_GET_MESSAGE, { rid }); +async function hostGetMessage(id: number): Promise<any> { + const res = await sendAsync(dispatch.OP_HOST_GET_MESSAGE, { id }); if (res.data != null) { return decodeMessage(new Uint8Array(res.data)); @@ -123,7 +123,7 @@ export interface DenoWorkerOptions extends WorkerOptions { } export class WorkerImpl implements Worker { - private readonly rid: number; + private readonly id: number; private isClosing = false; private readonly isClosedPromise: Promise<void>; public onerror?: () => void; @@ -152,14 +152,14 @@ export class WorkerImpl implements Worker { sourceCode = blobBytes!; } - this.rid = createWorker( + this.id = createWorker( specifier, includeDenoNamespace, hasSourceCode, sourceCode ); this.run(); - this.isClosedPromise = hostGetWorkerClosed(this.rid); + this.isClosedPromise = hostGetWorkerClosed(this.id); this.isClosedPromise.then( (): void => { this.isClosing = true; @@ -172,12 +172,12 @@ export class WorkerImpl implements Worker { } postMessage(data: any): void { - hostPostMessage(this.rid, data); + hostPostMessage(this.id, data); } private async run(): Promise<void> { while (!this.isClosing) { - const data = await hostGetMessage(this.rid); + const data = await hostGetMessage(this.id); if (data == null) { log("worker got null message. quitting."); break; diff --git a/cli/lib.rs b/cli/lib.rs index a22f61799..637986f9f 100644 --- a/cli/lib.rs +++ b/cli/lib.rs @@ -118,16 +118,22 @@ fn create_worker_and_state( .map_err(deno_error::print_err_and_exit) .unwrap(); + let (int, ext) = ThreadSafeState::create_channels(); let state = ThreadSafeState::new( global_state.clone(), global_state.main_module.clone(), true, + int, ) .map_err(deno_error::print_err_and_exit) .unwrap(); - let worker = - Worker::new("main".to_string(), startup_data::deno_isolate_init(), state); + let worker = Worker::new( + "main".to_string(), + startup_data::deno_isolate_init(), + state, + ext, + ); (worker, global_state) } diff --git a/cli/ops/workers.rs b/cli/ops/workers.rs index f6dcf8042..cf7378a91 100644 --- a/cli/ops/workers.rs +++ b/cli/ops/workers.rs @@ -1,5 +1,6 @@ // Copyright 2018-2019 the Deno authors. All rights reserved. MIT license. use super::dispatch_json::{Deserialize, JsonOp, Value}; +use crate::deno_error::bad_resource; use crate::deno_error::js_check; use crate::deno_error::DenoError; use crate::deno_error::ErrorKind; @@ -11,7 +12,6 @@ use deno::*; use futures; use futures::Async; use futures::Future; -use futures::IntoFuture; use futures::Sink; use futures::Stream; use std; @@ -138,23 +138,23 @@ fn op_create_worker( } } + let (int, ext) = ThreadSafeState::create_channels(); let child_state = ThreadSafeState::new( state.global_state.clone(), Some(module_specifier.clone()), include_deno_namespace, + int, )?; - let rid = child_state.rid; let name = format!("USER-WORKER-{}", specifier); let deno_main_call = format!("denoMain({})", include_deno_namespace); let mut worker = - Worker::new(name, startup_data::deno_isolate_init(), child_state); + Worker::new(name, startup_data::deno_isolate_init(), child_state, ext); js_check(worker.execute(&deno_main_call)); js_check(worker.execute("workerMain()")); let exec_cb = move |worker: Worker| { - let mut workers_tl = parent_state.workers.lock().unwrap(); - workers_tl.insert(rid, worker.shared()); - json!(rid) + let worker_id = parent_state.add_child_worker(worker); + json!(worker_id) }; // Has provided source code, execute immediately. @@ -173,7 +173,7 @@ fn op_create_worker( #[derive(Deserialize)] struct HostGetWorkerClosedArgs { - rid: i32, + id: i32, } /// Return when the worker closes @@ -183,37 +183,41 @@ fn op_host_get_worker_closed( _data: Option<PinnedBuf>, ) -> Result<JsonOp, ErrBox> { let args: HostGetWorkerClosedArgs = serde_json::from_value(args)?; - - let rid = args.rid as u32; - let state = state.clone(); - - let shared_worker_future = { - let workers_tl = state.workers.lock().unwrap(); - let worker = workers_tl.get(&rid).unwrap(); - worker.clone() - }; - - let op = - shared_worker_future.then(move |_result| futures::future::ok(json!({}))); + let id = args.id as u32; + let state_ = state.clone(); + let workers_table = state.workers.lock().unwrap(); + // TODO: handle bad worker id gracefully + let worker = workers_table.get(&id).unwrap(); + let shared_worker_future = worker.clone().shared(); + + let op = shared_worker_future.then(move |_result| { + let mut workers_table = state_.workers.lock().unwrap(); + workers_table.remove(&id); + futures::future::ok(json!({})) + }); Ok(JsonOp::Async(Box::new(op))) } #[derive(Deserialize)] struct HostGetMessageArgs { - rid: i32, + id: i32, } /// Get message from guest worker as host fn op_host_get_message( - _state: &ThreadSafeState, + state: &ThreadSafeState, args: Value, _data: Option<PinnedBuf>, ) -> Result<JsonOp, ErrBox> { let args: HostGetMessageArgs = serde_json::from_value(args)?; - let rid = args.rid as u32; - let op = Worker::get_message_from_resource(rid) + let id = args.id as u32; + let mut table = state.workers.lock().unwrap(); + // TODO: don't return bad resource anymore + let worker = table.get_mut(&id).ok_or_else(bad_resource)?; + let op = worker + .get_message() .map_err(move |_| -> ErrBox { unimplemented!() }) .and_then(move |maybe_buf| { futures::future::ok(json!({ @@ -226,27 +230,26 @@ fn op_host_get_message( #[derive(Deserialize)] struct HostPostMessageArgs { - rid: i32, + id: i32, } /// Post message to guest worker as host fn op_host_post_message( - _state: &ThreadSafeState, + state: &ThreadSafeState, args: Value, data: Option<PinnedBuf>, ) -> Result<JsonOp, ErrBox> { let args: HostPostMessageArgs = serde_json::from_value(args)?; - - let rid = args.rid as u32; - - let d = Vec::from(data.unwrap().as_ref()).into_boxed_slice(); - - // TODO: rename to post_message_to_child(rid, d) - Worker::post_message_to_resource(rid, d) - .into_future() - .wait() + let id = args.id as u32; + let msg = Vec::from(data.unwrap().as_ref()).into_boxed_slice(); + + debug!("post message to worker {}", id); + let mut table = state.workers.lock().unwrap(); + // TODO: don't return bad resource anymore + let worker = table.get_mut(&id).ok_or_else(bad_resource)?; + worker + .post_message(msg) .map_err(|e| DenoError::new(ErrorKind::Other, e.to_string()))?; - Ok(JsonOp::Sync(json!({}))) } diff --git a/cli/state.rs b/cli/state.rs index 8d14042aa..edfac72c0 100644 --- a/cli/state.rs +++ b/cli/state.rs @@ -6,8 +6,6 @@ use crate::import_map::ImportMap; use crate::metrics::Metrics; use crate::ops::JsonOp; use crate::permissions::DenoPermissions; -use crate::resources; -use crate::resources::ResourceId; use crate::worker::Worker; use crate::worker::WorkerChannels; use deno::Buf; @@ -17,7 +15,6 @@ use deno::Loader; use deno::ModuleSpecifier; use deno::Op; use deno::PinnedBuf; -use futures::future::Shared; use futures::Future; use rand::rngs::StdRng; use rand::SeedableRng; @@ -26,15 +23,12 @@ use std; use std::collections::HashMap; use std::ops::Deref; use std::str; +use std::sync::atomic::AtomicUsize; use std::sync::atomic::Ordering; use std::sync::Arc; use std::sync::Mutex; use std::time::Instant; -use tokio::sync::mpsc as async_mpsc; - -// TODO: hold references to concrete Workers instead of shared futures of -// those workers? -pub type UserWorkerTable = HashMap<ResourceId, Shared<Worker>>; +use tokio::sync::mpsc; /// Isolate cannot be passed between threads but ThreadSafeState can. /// ThreadSafeState satisfies Send and Sync. So any state that needs to be @@ -53,10 +47,9 @@ pub struct State { pub import_map: Option<ImportMap>, pub metrics: Metrics, pub global_timer: Mutex<GlobalTimer>, - pub workers: Mutex<UserWorkerTable>, + pub workers: Mutex<HashMap<u32, Worker>>, + pub next_worker_id: AtomicUsize, pub start_time: Instant, - /// A reference to this worker's resource. - pub rid: ResourceId, pub seeded_rng: Option<Mutex<StdRng>>, pub include_deno_namespace: bool, } @@ -179,25 +172,26 @@ impl Loader for ThreadSafeState { } impl ThreadSafeState { - pub fn new( - global_state: ThreadSafeGlobalState, - main_module: Option<ModuleSpecifier>, - include_deno_namespace: bool, - ) -> Result<Self, ErrBox> { - let (worker_in_tx, worker_in_rx) = async_mpsc::channel::<Buf>(1); - let (worker_out_tx, worker_out_rx) = async_mpsc::channel::<Buf>(1); + pub fn create_channels() -> (WorkerChannels, WorkerChannels) { + let (in_tx, in_rx) = mpsc::channel::<Buf>(1); + let (out_tx, out_rx) = mpsc::channel::<Buf>(1); let internal_channels = WorkerChannels { - sender: worker_out_tx, - receiver: worker_in_rx, + sender: out_tx, + receiver: in_rx, }; let external_channels = WorkerChannels { - sender: worker_in_tx, - receiver: worker_out_rx, + sender: in_tx, + receiver: out_rx, }; + (internal_channels, external_channels) + } - let mut table = resources::lock_resource_table(); - let rid = table.add("worker", Box::new(external_channels)); - + pub fn new( + global_state: ThreadSafeGlobalState, + main_module: Option<ModuleSpecifier>, + include_deno_namespace: bool, + internal_channels: WorkerChannels, + ) -> Result<Self, ErrBox> { let import_map: Option<ImportMap> = match global_state.flags.import_map_path.as_ref() { None => None, @@ -221,9 +215,9 @@ impl ThreadSafeState { worker_channels: Mutex::new(internal_channels), metrics: Metrics::default(), global_timer: Mutex::new(GlobalTimer::new()), - workers: Mutex::new(UserWorkerTable::new()), + workers: Mutex::new(HashMap::new()), + next_worker_id: AtomicUsize::new(0), start_time: Instant::now(), - rid, seeded_rng, include_deno_namespace, }; @@ -231,6 +225,13 @@ impl ThreadSafeState { Ok(ThreadSafeState(Arc::new(state))) } + pub fn add_child_worker(&self, worker: Worker) -> u32 { + let worker_id = self.next_worker_id.fetch_add(1, Ordering::Relaxed) as u32; + let mut workers_tl = self.workers.lock().unwrap(); + workers_tl.insert(worker_id, worker); + worker_id + } + #[inline] pub fn check_read(&self, filename: &str) -> Result<(), ErrBox> { self.permissions.check_read(filename) @@ -286,7 +287,10 @@ impl ThreadSafeState { } #[cfg(test)] - pub fn mock(argv: Vec<String>) -> ThreadSafeState { + pub fn mock( + argv: Vec<String>, + internal_channels: WorkerChannels, + ) -> ThreadSafeState { let module_specifier = if argv.is_empty() { None } else { @@ -299,6 +303,7 @@ impl ThreadSafeState { ThreadSafeGlobalState::mock(argv), module_specifier, true, + internal_channels, ) .unwrap() } @@ -331,8 +336,9 @@ impl ThreadSafeState { #[test] fn thread_safe() { fn f<S: Send + Sync>(_: S) {} - f(ThreadSafeState::mock(vec![ - String::from("./deno"), - String::from("hello.js"), - ])); + let (int, _) = ThreadSafeState::create_channels(); + f(ThreadSafeState::mock( + vec![String::from("./deno"), String::from("hello.js")], + int, + )); } diff --git a/cli/worker.rs b/cli/worker.rs index eeda364c9..5d4675d00 100644 --- a/cli/worker.rs +++ b/cli/worker.rs @@ -1,10 +1,6 @@ // Copyright 2018-2019 the Deno authors. All rights reserved. MIT license. -use crate::deno_error::bad_resource; use crate::fmt_errors::JSError; use crate::ops; -use crate::resources; -use crate::resources::Resource; -use crate::resources::ResourceId; use crate::state::ThreadSafeState; use deno; use deno::Buf; @@ -23,7 +19,7 @@ use std::sync::Mutex; use tokio::sync::mpsc; use url::Url; -/// Wraps mpsc channels into a generic resource so they can be referenced +/// Wraps mpsc channels so they can be referenced /// from ops and used to facilitate parent-child communication /// for workers. pub struct WorkerChannels { @@ -31,8 +27,6 @@ pub struct WorkerChannels { pub receiver: mpsc::Receiver<Buf>, } -impl Resource for WorkerChannels {} - /// Wraps deno::Isolate to provide source maps, ops for the CLI, and /// high-level module loading. #[derive(Clone)] @@ -40,6 +34,7 @@ pub struct Worker { pub name: String, isolate: Arc<Mutex<deno::Isolate>>, pub state: ThreadSafeState, + external_channels: Arc<Mutex<WorkerChannels>>, } impl Worker { @@ -47,6 +42,7 @@ impl Worker { name: String, startup_data: StartupData, state: ThreadSafeState, + external_channels: WorkerChannels, ) -> Self { let isolate = Arc::new(Mutex::new(deno::Isolate::new(startup_data, false))); { @@ -86,10 +82,12 @@ impl Worker { JSError::from_v8_exception(v8_exception, &global_state_.ts_compiler) }) } + Self { name, isolate, state, + external_channels: Arc::new(Mutex::new(external_channels)), } } @@ -140,35 +138,20 @@ impl Worker { }) } - /// Post message to worker as a host or privileged overlord - pub fn post_message(self: &Self, buf: Buf) -> Result<Async<()>, ErrBox> { - Worker::post_message_to_resource(self.state.rid, buf) - } - - pub fn post_message_to_resource( - rid: resources::ResourceId, - buf: Buf, - ) -> Result<Async<()>, ErrBox> { - debug!("post message to resource {}", rid); - let mut table = resources::lock_resource_table(); - let worker = table - .get_mut::<WorkerChannels>(rid) - .ok_or_else(bad_resource)?; - let sender = &mut worker.sender; - sender - .send(buf) - .poll() - .map(|_| Async::Ready(())) - .map_err(ErrBox::from) + /// Post message to worker as a host. + /// + /// This method blocks current thread. + pub fn post_message(self: &Self, buf: Buf) -> Result<(), ErrBox> { + let mut channels = self.external_channels.lock().unwrap(); + let sender = &mut channels.sender; + sender.send(buf).wait().map(|_| ()).map_err(ErrBox::from) } + /// Get message from worker as a host. pub fn get_message(self: &Self) -> WorkerReceiver { - Worker::get_message_from_resource(self.state.rid) - } - - pub fn get_message_from_resource(rid: ResourceId) -> WorkerReceiver { - debug!("get message from resource {}", rid); - WorkerReceiver { rid } + WorkerReceiver { + channels: self.external_channels.clone(), + } } } @@ -186,7 +169,7 @@ impl Future for Worker { /// that will return message received from worker or None /// if worker's channel has been closed. pub struct WorkerReceiver { - rid: ResourceId, + channels: Arc<Mutex<WorkerChannels>>, } impl Future for WorkerReceiver { @@ -194,12 +177,8 @@ impl Future for WorkerReceiver { type Error = ErrBox; fn poll(&mut self) -> Poll<Option<Buf>, ErrBox> { - let mut table = resources::lock_resource_table(); - let worker = table - .get_mut::<WorkerChannels>(self.rid) - .ok_or_else(bad_resource)?; - let receiver = &mut worker.receiver; - receiver.poll().map_err(ErrBox::from) + let mut channels = self.channels.lock().unwrap(); + channels.receiver.poll().map_err(ErrBox::from) } } @@ -214,7 +193,6 @@ mod tests { use crate::state::ThreadSafeState; use crate::tokio_util; use futures::future::lazy; - use futures::IntoFuture; use std::sync::atomic::Ordering; #[test] @@ -233,13 +211,18 @@ mod tests { Progress::new(), ) .unwrap(); - let state = - ThreadSafeState::new(global_state, Some(module_specifier.clone()), true) - .unwrap(); + let (int, ext) = ThreadSafeState::create_channels(); + let state = ThreadSafeState::new( + global_state, + Some(module_specifier.clone()), + true, + int, + ) + .unwrap(); let state_ = state.clone(); tokio_util::run(lazy(move || { let mut worker = - Worker::new("TEST".to_string(), StartupData::None, state); + Worker::new("TEST".to_string(), StartupData::None, state, ext); worker .execute_mod_async(&module_specifier, None, false) .then(|result| { @@ -269,13 +252,18 @@ mod tests { let global_state = ThreadSafeGlobalState::new(DenoFlags::default(), argv, Progress::new()) .unwrap(); - let state = - ThreadSafeState::new(global_state, Some(module_specifier.clone()), true) - .unwrap(); + let (int, ext) = ThreadSafeState::create_channels(); + let state = ThreadSafeState::new( + global_state, + Some(module_specifier.clone()), + true, + int, + ) + .unwrap(); let state_ = state.clone(); tokio_util::run(lazy(move || { let mut worker = - Worker::new("TEST".to_string(), StartupData::None, state); + Worker::new("TEST".to_string(), StartupData::None, state, ext); worker .execute_mod_async(&module_specifier, None, false) .then(|result| { @@ -308,10 +296,12 @@ mod tests { flags.reload = true; let global_state = ThreadSafeGlobalState::new(flags, argv, Progress::new()).unwrap(); + let (int, ext) = ThreadSafeState::create_channels(); let state = ThreadSafeState::new( global_state.clone(), Some(module_specifier.clone()), true, + int, ) .unwrap(); let global_state_ = global_state.clone(); @@ -321,6 +311,7 @@ mod tests { "TEST".to_string(), startup_data::deno_isolate_init(), state, + ext, ); worker.execute("denoMain()").unwrap(); worker @@ -343,12 +334,17 @@ mod tests { } fn create_test_worker() -> Worker { - let state = ThreadSafeState::mock(vec![ - String::from("./deno"), - String::from("hello.js"), - ]); - let mut worker = - Worker::new("TEST".to_string(), startup_data::deno_isolate_init(), state); + let (int, ext) = ThreadSafeState::create_channels(); + let state = ThreadSafeState::mock( + vec![String::from("./deno"), String::from("hello.js")], + int, + ); + let mut worker = Worker::new( + "TEST".to_string(), + startup_data::deno_isolate_init(), + state, + ext, + ); worker.execute("denoMain()").unwrap(); worker.execute("workerMain()").unwrap(); worker @@ -384,7 +380,7 @@ mod tests { let msg = json!("hi").to_string().into_boxed_str().into_boxed_bytes(); - let r = worker_.post_message(msg).into_future().wait(); + let r = worker_.post_message(msg); assert!(r.is_ok()); let maybe_msg = worker_.get_message().wait().unwrap(); @@ -396,7 +392,7 @@ mod tests { .to_string() .into_boxed_str() .into_boxed_bytes(); - let r = worker_.post_message(msg).into_future().wait(); + let r = worker_.post_message(msg); assert!(r.is_ok()); }) } @@ -409,9 +405,7 @@ mod tests { .execute("onmessage = () => { delete window.onmessage; }") .unwrap(); - let rid = worker.state.rid; let worker_ = worker.clone(); - let worker_future = worker .then(move |r| -> Result<(), ()> { println!("workers.rs after resource close"); @@ -424,9 +418,8 @@ mod tests { tokio::spawn(lazy(move || worker_future_.then(|_| Ok(())))); let msg = json!("hi").to_string().into_boxed_str().into_boxed_bytes(); - let r = worker_.post_message(msg).into_future().wait(); + let r = worker_.post_message(msg); assert!(r.is_ok()); - debug!("rid {:?}", rid); worker_future.wait().unwrap(); }) |