summaryrefslogtreecommitdiff
path: root/cli/worker.rs
diff options
context:
space:
mode:
authorBartek IwaƄczuk <biwanczuk@gmail.com>2019-11-09 21:07:14 +0100
committerBert Belder <bertbelder@gmail.com>2019-11-09 12:07:14 -0800
commit335e8bd33cd22891f12b58c4d9489d19c886ae4c (patch)
tree29d2c7098d922d325ea7ddf26a935eb498062f9d /cli/worker.rs
parentd586f119fa588a590a4ba2b74c8c210de710e3e7 (diff)
refactor: worker is no longer a resource (#3290)
Diffstat (limited to 'cli/worker.rs')
-rw-r--r--cli/worker.rs115
1 files changed, 54 insertions, 61 deletions
diff --git a/cli/worker.rs b/cli/worker.rs
index eeda364c9..5d4675d00 100644
--- a/cli/worker.rs
+++ b/cli/worker.rs
@@ -1,10 +1,6 @@
// Copyright 2018-2019 the Deno authors. All rights reserved. MIT license.
-use crate::deno_error::bad_resource;
use crate::fmt_errors::JSError;
use crate::ops;
-use crate::resources;
-use crate::resources::Resource;
-use crate::resources::ResourceId;
use crate::state::ThreadSafeState;
use deno;
use deno::Buf;
@@ -23,7 +19,7 @@ use std::sync::Mutex;
use tokio::sync::mpsc;
use url::Url;
-/// Wraps mpsc channels into a generic resource so they can be referenced
+/// Wraps mpsc channels so they can be referenced
/// from ops and used to facilitate parent-child communication
/// for workers.
pub struct WorkerChannels {
@@ -31,8 +27,6 @@ pub struct WorkerChannels {
pub receiver: mpsc::Receiver<Buf>,
}
-impl Resource for WorkerChannels {}
-
/// Wraps deno::Isolate to provide source maps, ops for the CLI, and
/// high-level module loading.
#[derive(Clone)]
@@ -40,6 +34,7 @@ pub struct Worker {
pub name: String,
isolate: Arc<Mutex<deno::Isolate>>,
pub state: ThreadSafeState,
+ external_channels: Arc<Mutex<WorkerChannels>>,
}
impl Worker {
@@ -47,6 +42,7 @@ impl Worker {
name: String,
startup_data: StartupData,
state: ThreadSafeState,
+ external_channels: WorkerChannels,
) -> Self {
let isolate = Arc::new(Mutex::new(deno::Isolate::new(startup_data, false)));
{
@@ -86,10 +82,12 @@ impl Worker {
JSError::from_v8_exception(v8_exception, &global_state_.ts_compiler)
})
}
+
Self {
name,
isolate,
state,
+ external_channels: Arc::new(Mutex::new(external_channels)),
}
}
@@ -140,35 +138,20 @@ impl Worker {
})
}
- /// Post message to worker as a host or privileged overlord
- pub fn post_message(self: &Self, buf: Buf) -> Result<Async<()>, ErrBox> {
- Worker::post_message_to_resource(self.state.rid, buf)
- }
-
- pub fn post_message_to_resource(
- rid: resources::ResourceId,
- buf: Buf,
- ) -> Result<Async<()>, ErrBox> {
- debug!("post message to resource {}", rid);
- let mut table = resources::lock_resource_table();
- let worker = table
- .get_mut::<WorkerChannels>(rid)
- .ok_or_else(bad_resource)?;
- let sender = &mut worker.sender;
- sender
- .send(buf)
- .poll()
- .map(|_| Async::Ready(()))
- .map_err(ErrBox::from)
+ /// Post message to worker as a host.
+ ///
+ /// This method blocks current thread.
+ pub fn post_message(self: &Self, buf: Buf) -> Result<(), ErrBox> {
+ let mut channels = self.external_channels.lock().unwrap();
+ let sender = &mut channels.sender;
+ sender.send(buf).wait().map(|_| ()).map_err(ErrBox::from)
}
+ /// Get message from worker as a host.
pub fn get_message(self: &Self) -> WorkerReceiver {
- Worker::get_message_from_resource(self.state.rid)
- }
-
- pub fn get_message_from_resource(rid: ResourceId) -> WorkerReceiver {
- debug!("get message from resource {}", rid);
- WorkerReceiver { rid }
+ WorkerReceiver {
+ channels: self.external_channels.clone(),
+ }
}
}
@@ -186,7 +169,7 @@ impl Future for Worker {
/// that will return message received from worker or None
/// if worker's channel has been closed.
pub struct WorkerReceiver {
- rid: ResourceId,
+ channels: Arc<Mutex<WorkerChannels>>,
}
impl Future for WorkerReceiver {
@@ -194,12 +177,8 @@ impl Future for WorkerReceiver {
type Error = ErrBox;
fn poll(&mut self) -> Poll<Option<Buf>, ErrBox> {
- let mut table = resources::lock_resource_table();
- let worker = table
- .get_mut::<WorkerChannels>(self.rid)
- .ok_or_else(bad_resource)?;
- let receiver = &mut worker.receiver;
- receiver.poll().map_err(ErrBox::from)
+ let mut channels = self.channels.lock().unwrap();
+ channels.receiver.poll().map_err(ErrBox::from)
}
}
@@ -214,7 +193,6 @@ mod tests {
use crate::state::ThreadSafeState;
use crate::tokio_util;
use futures::future::lazy;
- use futures::IntoFuture;
use std::sync::atomic::Ordering;
#[test]
@@ -233,13 +211,18 @@ mod tests {
Progress::new(),
)
.unwrap();
- let state =
- ThreadSafeState::new(global_state, Some(module_specifier.clone()), true)
- .unwrap();
+ let (int, ext) = ThreadSafeState::create_channels();
+ let state = ThreadSafeState::new(
+ global_state,
+ Some(module_specifier.clone()),
+ true,
+ int,
+ )
+ .unwrap();
let state_ = state.clone();
tokio_util::run(lazy(move || {
let mut worker =
- Worker::new("TEST".to_string(), StartupData::None, state);
+ Worker::new("TEST".to_string(), StartupData::None, state, ext);
worker
.execute_mod_async(&module_specifier, None, false)
.then(|result| {
@@ -269,13 +252,18 @@ mod tests {
let global_state =
ThreadSafeGlobalState::new(DenoFlags::default(), argv, Progress::new())
.unwrap();
- let state =
- ThreadSafeState::new(global_state, Some(module_specifier.clone()), true)
- .unwrap();
+ let (int, ext) = ThreadSafeState::create_channels();
+ let state = ThreadSafeState::new(
+ global_state,
+ Some(module_specifier.clone()),
+ true,
+ int,
+ )
+ .unwrap();
let state_ = state.clone();
tokio_util::run(lazy(move || {
let mut worker =
- Worker::new("TEST".to_string(), StartupData::None, state);
+ Worker::new("TEST".to_string(), StartupData::None, state, ext);
worker
.execute_mod_async(&module_specifier, None, false)
.then(|result| {
@@ -308,10 +296,12 @@ mod tests {
flags.reload = true;
let global_state =
ThreadSafeGlobalState::new(flags, argv, Progress::new()).unwrap();
+ let (int, ext) = ThreadSafeState::create_channels();
let state = ThreadSafeState::new(
global_state.clone(),
Some(module_specifier.clone()),
true,
+ int,
)
.unwrap();
let global_state_ = global_state.clone();
@@ -321,6 +311,7 @@ mod tests {
"TEST".to_string(),
startup_data::deno_isolate_init(),
state,
+ ext,
);
worker.execute("denoMain()").unwrap();
worker
@@ -343,12 +334,17 @@ mod tests {
}
fn create_test_worker() -> Worker {
- let state = ThreadSafeState::mock(vec![
- String::from("./deno"),
- String::from("hello.js"),
- ]);
- let mut worker =
- Worker::new("TEST".to_string(), startup_data::deno_isolate_init(), state);
+ let (int, ext) = ThreadSafeState::create_channels();
+ let state = ThreadSafeState::mock(
+ vec![String::from("./deno"), String::from("hello.js")],
+ int,
+ );
+ let mut worker = Worker::new(
+ "TEST".to_string(),
+ startup_data::deno_isolate_init(),
+ state,
+ ext,
+ );
worker.execute("denoMain()").unwrap();
worker.execute("workerMain()").unwrap();
worker
@@ -384,7 +380,7 @@ mod tests {
let msg = json!("hi").to_string().into_boxed_str().into_boxed_bytes();
- let r = worker_.post_message(msg).into_future().wait();
+ let r = worker_.post_message(msg);
assert!(r.is_ok());
let maybe_msg = worker_.get_message().wait().unwrap();
@@ -396,7 +392,7 @@ mod tests {
.to_string()
.into_boxed_str()
.into_boxed_bytes();
- let r = worker_.post_message(msg).into_future().wait();
+ let r = worker_.post_message(msg);
assert!(r.is_ok());
})
}
@@ -409,9 +405,7 @@ mod tests {
.execute("onmessage = () => { delete window.onmessage; }")
.unwrap();
- let rid = worker.state.rid;
let worker_ = worker.clone();
-
let worker_future = worker
.then(move |r| -> Result<(), ()> {
println!("workers.rs after resource close");
@@ -424,9 +418,8 @@ mod tests {
tokio::spawn(lazy(move || worker_future_.then(|_| Ok(()))));
let msg = json!("hi").to_string().into_boxed_str().into_boxed_bytes();
- let r = worker_.post_message(msg).into_future().wait();
+ let r = worker_.post_message(msg);
assert!(r.is_ok());
- debug!("rid {:?}", rid);
worker_future.wait().unwrap();
})