diff options
Diffstat (limited to 'cli/worker.rs')
-rw-r--r-- | cli/worker.rs | 115 |
1 files changed, 54 insertions, 61 deletions
diff --git a/cli/worker.rs b/cli/worker.rs index eeda364c9..5d4675d00 100644 --- a/cli/worker.rs +++ b/cli/worker.rs @@ -1,10 +1,6 @@ // Copyright 2018-2019 the Deno authors. All rights reserved. MIT license. -use crate::deno_error::bad_resource; use crate::fmt_errors::JSError; use crate::ops; -use crate::resources; -use crate::resources::Resource; -use crate::resources::ResourceId; use crate::state::ThreadSafeState; use deno; use deno::Buf; @@ -23,7 +19,7 @@ use std::sync::Mutex; use tokio::sync::mpsc; use url::Url; -/// Wraps mpsc channels into a generic resource so they can be referenced +/// Wraps mpsc channels so they can be referenced /// from ops and used to facilitate parent-child communication /// for workers. pub struct WorkerChannels { @@ -31,8 +27,6 @@ pub struct WorkerChannels { pub receiver: mpsc::Receiver<Buf>, } -impl Resource for WorkerChannels {} - /// Wraps deno::Isolate to provide source maps, ops for the CLI, and /// high-level module loading. #[derive(Clone)] @@ -40,6 +34,7 @@ pub struct Worker { pub name: String, isolate: Arc<Mutex<deno::Isolate>>, pub state: ThreadSafeState, + external_channels: Arc<Mutex<WorkerChannels>>, } impl Worker { @@ -47,6 +42,7 @@ impl Worker { name: String, startup_data: StartupData, state: ThreadSafeState, + external_channels: WorkerChannels, ) -> Self { let isolate = Arc::new(Mutex::new(deno::Isolate::new(startup_data, false))); { @@ -86,10 +82,12 @@ impl Worker { JSError::from_v8_exception(v8_exception, &global_state_.ts_compiler) }) } + Self { name, isolate, state, + external_channels: Arc::new(Mutex::new(external_channels)), } } @@ -140,35 +138,20 @@ impl Worker { }) } - /// Post message to worker as a host or privileged overlord - pub fn post_message(self: &Self, buf: Buf) -> Result<Async<()>, ErrBox> { - Worker::post_message_to_resource(self.state.rid, buf) - } - - pub fn post_message_to_resource( - rid: resources::ResourceId, - buf: Buf, - ) -> Result<Async<()>, ErrBox> { - debug!("post message to resource {}", rid); - let mut table = resources::lock_resource_table(); - let worker = table - .get_mut::<WorkerChannels>(rid) - .ok_or_else(bad_resource)?; - let sender = &mut worker.sender; - sender - .send(buf) - .poll() - .map(|_| Async::Ready(())) - .map_err(ErrBox::from) + /// Post message to worker as a host. + /// + /// This method blocks current thread. + pub fn post_message(self: &Self, buf: Buf) -> Result<(), ErrBox> { + let mut channels = self.external_channels.lock().unwrap(); + let sender = &mut channels.sender; + sender.send(buf).wait().map(|_| ()).map_err(ErrBox::from) } + /// Get message from worker as a host. pub fn get_message(self: &Self) -> WorkerReceiver { - Worker::get_message_from_resource(self.state.rid) - } - - pub fn get_message_from_resource(rid: ResourceId) -> WorkerReceiver { - debug!("get message from resource {}", rid); - WorkerReceiver { rid } + WorkerReceiver { + channels: self.external_channels.clone(), + } } } @@ -186,7 +169,7 @@ impl Future for Worker { /// that will return message received from worker or None /// if worker's channel has been closed. pub struct WorkerReceiver { - rid: ResourceId, + channels: Arc<Mutex<WorkerChannels>>, } impl Future for WorkerReceiver { @@ -194,12 +177,8 @@ impl Future for WorkerReceiver { type Error = ErrBox; fn poll(&mut self) -> Poll<Option<Buf>, ErrBox> { - let mut table = resources::lock_resource_table(); - let worker = table - .get_mut::<WorkerChannels>(self.rid) - .ok_or_else(bad_resource)?; - let receiver = &mut worker.receiver; - receiver.poll().map_err(ErrBox::from) + let mut channels = self.channels.lock().unwrap(); + channels.receiver.poll().map_err(ErrBox::from) } } @@ -214,7 +193,6 @@ mod tests { use crate::state::ThreadSafeState; use crate::tokio_util; use futures::future::lazy; - use futures::IntoFuture; use std::sync::atomic::Ordering; #[test] @@ -233,13 +211,18 @@ mod tests { Progress::new(), ) .unwrap(); - let state = - ThreadSafeState::new(global_state, Some(module_specifier.clone()), true) - .unwrap(); + let (int, ext) = ThreadSafeState::create_channels(); + let state = ThreadSafeState::new( + global_state, + Some(module_specifier.clone()), + true, + int, + ) + .unwrap(); let state_ = state.clone(); tokio_util::run(lazy(move || { let mut worker = - Worker::new("TEST".to_string(), StartupData::None, state); + Worker::new("TEST".to_string(), StartupData::None, state, ext); worker .execute_mod_async(&module_specifier, None, false) .then(|result| { @@ -269,13 +252,18 @@ mod tests { let global_state = ThreadSafeGlobalState::new(DenoFlags::default(), argv, Progress::new()) .unwrap(); - let state = - ThreadSafeState::new(global_state, Some(module_specifier.clone()), true) - .unwrap(); + let (int, ext) = ThreadSafeState::create_channels(); + let state = ThreadSafeState::new( + global_state, + Some(module_specifier.clone()), + true, + int, + ) + .unwrap(); let state_ = state.clone(); tokio_util::run(lazy(move || { let mut worker = - Worker::new("TEST".to_string(), StartupData::None, state); + Worker::new("TEST".to_string(), StartupData::None, state, ext); worker .execute_mod_async(&module_specifier, None, false) .then(|result| { @@ -308,10 +296,12 @@ mod tests { flags.reload = true; let global_state = ThreadSafeGlobalState::new(flags, argv, Progress::new()).unwrap(); + let (int, ext) = ThreadSafeState::create_channels(); let state = ThreadSafeState::new( global_state.clone(), Some(module_specifier.clone()), true, + int, ) .unwrap(); let global_state_ = global_state.clone(); @@ -321,6 +311,7 @@ mod tests { "TEST".to_string(), startup_data::deno_isolate_init(), state, + ext, ); worker.execute("denoMain()").unwrap(); worker @@ -343,12 +334,17 @@ mod tests { } fn create_test_worker() -> Worker { - let state = ThreadSafeState::mock(vec![ - String::from("./deno"), - String::from("hello.js"), - ]); - let mut worker = - Worker::new("TEST".to_string(), startup_data::deno_isolate_init(), state); + let (int, ext) = ThreadSafeState::create_channels(); + let state = ThreadSafeState::mock( + vec![String::from("./deno"), String::from("hello.js")], + int, + ); + let mut worker = Worker::new( + "TEST".to_string(), + startup_data::deno_isolate_init(), + state, + ext, + ); worker.execute("denoMain()").unwrap(); worker.execute("workerMain()").unwrap(); worker @@ -384,7 +380,7 @@ mod tests { let msg = json!("hi").to_string().into_boxed_str().into_boxed_bytes(); - let r = worker_.post_message(msg).into_future().wait(); + let r = worker_.post_message(msg); assert!(r.is_ok()); let maybe_msg = worker_.get_message().wait().unwrap(); @@ -396,7 +392,7 @@ mod tests { .to_string() .into_boxed_str() .into_boxed_bytes(); - let r = worker_.post_message(msg).into_future().wait(); + let r = worker_.post_message(msg); assert!(r.is_ok()); }) } @@ -409,9 +405,7 @@ mod tests { .execute("onmessage = () => { delete window.onmessage; }") .unwrap(); - let rid = worker.state.rid; let worker_ = worker.clone(); - let worker_future = worker .then(move |r| -> Result<(), ()> { println!("workers.rs after resource close"); @@ -424,9 +418,8 @@ mod tests { tokio::spawn(lazy(move || worker_future_.then(|_| Ok(())))); let msg = json!("hi").to_string().into_boxed_str().into_boxed_bytes(); - let r = worker_.post_message(msg).into_future().wait(); + let r = worker_.post_message(msg); assert!(r.is_ok()); - debug!("rid {:?}", rid); worker_future.wait().unwrap(); }) |