summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--cli/compilers/compiler_worker.rs4
-rw-r--r--cli/compilers/ts.rs4
-rw-r--r--cli/compilers/wasm.rs4
-rw-r--r--cli/lib.rs10
-rw-r--r--cli/ops/web_worker.rs11
-rw-r--r--cli/ops/worker_host.rs3
-rw-r--r--cli/state.rs38
-rw-r--r--cli/web_worker.rs8
-rw-r--r--cli/worker.rs76
9 files changed, 79 insertions, 79 deletions
diff --git a/cli/compilers/compiler_worker.rs b/cli/compilers/compiler_worker.rs
index 87144c1d0..f76395e75 100644
--- a/cli/compilers/compiler_worker.rs
+++ b/cli/compilers/compiler_worker.rs
@@ -2,7 +2,6 @@
use crate::ops;
use crate::state::ThreadSafeState;
use crate::worker::Worker;
-use crate::worker::WorkerChannels;
use deno_core;
use deno_core::StartupData;
use std::ops::Deref;
@@ -28,10 +27,9 @@ impl CompilerWorker {
name: String,
startup_data: StartupData,
state: ThreadSafeState,
- external_channels: WorkerChannels,
) -> Self {
let state_ = state.clone();
- let mut worker = Worker::new(name, startup_data, state_, external_channels);
+ let mut worker = Worker::new(name, startup_data, state_);
{
let isolate = &mut worker.isolate;
ops::runtime::init(isolate, &state);
diff --git a/cli/compilers/ts.rs b/cli/compilers/ts.rs
index 8517c319b..c7896ec61 100644
--- a/cli/compilers/ts.rs
+++ b/cli/compilers/ts.rs
@@ -240,11 +240,10 @@ impl TsCompiler {
/// Create a new V8 worker with snapshot of TS compiler and setup compiler's
/// runtime.
fn setup_worker(global_state: ThreadSafeGlobalState) -> CompilerWorker {
- let (int, ext) = ThreadSafeState::create_channels();
let entry_point =
ModuleSpecifier::resolve_url_or_path("./__$deno$ts_compiler.ts").unwrap();
let worker_state =
- ThreadSafeState::new(global_state.clone(), None, entry_point, int)
+ ThreadSafeState::new(global_state.clone(), None, entry_point)
.expect("Unable to create worker state");
// Count how many times we start the compiler worker.
@@ -257,7 +256,6 @@ impl TsCompiler {
"TS".to_string(),
startup_data::compiler_isolate_init(),
worker_state,
- ext,
);
worker.execute("bootstrapTsCompilerRuntime()").unwrap();
worker
diff --git a/cli/compilers/wasm.rs b/cli/compilers/wasm.rs
index 0cba8118e..a3e578f01 100644
--- a/cli/compilers/wasm.rs
+++ b/cli/compilers/wasm.rs
@@ -45,12 +45,11 @@ pub struct WasmCompiler {
impl WasmCompiler {
/// Create a new V8 worker with snapshot of WASM compiler and setup compiler's runtime.
fn setup_worker(global_state: ThreadSafeGlobalState) -> CompilerWorker {
- let (int, ext) = ThreadSafeState::create_channels();
let entry_point =
ModuleSpecifier::resolve_url_or_path("./__$deno$wasm_compiler.ts")
.unwrap();
let worker_state =
- ThreadSafeState::new(global_state.clone(), None, entry_point, int)
+ ThreadSafeState::new(global_state.clone(), None, entry_point)
.expect("Unable to create worker state");
// Count how many times we start the compiler worker.
@@ -63,7 +62,6 @@ impl WasmCompiler {
"WASM".to_string(),
startup_data::compiler_isolate_init(),
worker_state,
- ext,
);
worker.execute("bootstrapWasmCompilerRuntime()").unwrap();
worker
diff --git a/cli/lib.rs b/cli/lib.rs
index a7390b515..fee9e079f 100644
--- a/cli/lib.rs
+++ b/cli/lib.rs
@@ -122,8 +122,7 @@ fn create_main_worker(
global_state: ThreadSafeGlobalState,
main_module: ModuleSpecifier,
) -> MainWorker {
- let (int, ext) = ThreadSafeState::create_channels();
- let state = ThreadSafeState::new(global_state, None, main_module, int)
+ let state = ThreadSafeState::new(global_state, None, main_module)
.map_err(deno_error::print_err_and_exit)
.unwrap();
@@ -136,12 +135,7 @@ fn create_main_worker(
resource_table.add("stderr", Box::new(stderr));
}
- MainWorker::new(
- "main".to_string(),
- startup_data::deno_isolate_init(),
- state,
- ext,
- )
+ MainWorker::new("main".to_string(), startup_data::deno_isolate_init(), state)
}
fn types_command() {
diff --git a/cli/ops/web_worker.rs b/cli/ops/web_worker.rs
index db7086c59..7bfc70a84 100644
--- a/cli/ops/web_worker.rs
+++ b/cli/ops/web_worker.rs
@@ -7,8 +7,6 @@ use crate::state::ThreadSafeState;
use deno_core::*;
use futures;
use futures::future::FutureExt;
-use futures::sink::SinkExt;
-use futures::stream::StreamExt;
use std;
use std::convert::From;
@@ -31,8 +29,8 @@ fn op_worker_get_message(
) -> Result<JsonOp, ErrBox> {
let state_ = state.clone();
let op = async move {
- let mut receiver = state_.worker_channels.receiver.lock().await;
- let maybe_buf = receiver.next().await;
+ let c = state_.worker_channels_internal.lock().unwrap();
+ let maybe_buf = c.as_ref().unwrap().get_message().await;
debug!("op_worker_get_message");
Ok(json!({ "data": maybe_buf }))
};
@@ -47,8 +45,9 @@ fn op_worker_post_message(
data: Option<ZeroCopyBuf>,
) -> Result<JsonOp, ErrBox> {
let d = Vec::from(data.unwrap().as_ref()).into_boxed_slice();
- let mut sender = state.worker_channels.sender.clone();
- futures::executor::block_on(sender.send(d))
+ let c = state.worker_channels_internal.lock().unwrap();
+ let fut = c.as_ref().unwrap().post_message(d);
+ futures::executor::block_on(fut)
.map_err(|e| DenoError::new(ErrorKind::Other, e.to_string()))?;
Ok(JsonOp::Sync(json!({})))
diff --git a/cli/ops/worker_host.rs b/cli/ops/worker_host.rs
index b033b9ed9..c1dcd6aaa 100644
--- a/cli/ops/worker_host.rs
+++ b/cli/ops/worker_host.rs
@@ -99,12 +99,10 @@ fn op_create_worker(
result.unwrap()
};
- let (int, ext) = ThreadSafeState::create_channels();
let result = ThreadSafeState::new_for_worker(
parent_state.global_state.clone(),
Some(parent_state.permissions.clone()), // by default share with parent
module_specifier.clone(),
- int,
);
if let Err(err) = result {
load_sender.send(Err(err)).unwrap();
@@ -122,7 +120,6 @@ fn op_create_worker(
worker_name.to_string(),
startup_data::deno_isolate_init(),
child_state,
- ext,
);
let script = format!("bootstrapWorkerRuntime(\"{}\")", worker_name);
js_check(worker.execute(&script));
diff --git a/cli/state.rs b/cli/state.rs
index 903c5871b..f09191f11 100644
--- a/cli/state.rs
+++ b/cli/state.rs
@@ -9,7 +9,8 @@ use crate::ops::JsonOp;
use crate::ops::MinimalOp;
use crate::permissions::DenoPermissions;
use crate::web_worker::WebWorker;
-use crate::worker::WorkerChannels;
+use crate::worker::WorkerChannelsExternal;
+use crate::worker::WorkerChannelsInternal;
use deno_core::Buf;
use deno_core::CoreOp;
use deno_core::ErrBox;
@@ -36,7 +37,6 @@ use std::sync::Arc;
use std::sync::Mutex;
use std::sync::MutexGuard;
use std::time::Instant;
-use tokio::sync::Mutex as AsyncMutex;
/// Isolate cannot be passed between threads but ThreadSafeState can.
/// ThreadSafeState satisfies Send and Sync. So any state that needs to be
@@ -48,14 +48,13 @@ pub struct State {
pub global_state: ThreadSafeGlobalState,
pub permissions: Arc<Mutex<DenoPermissions>>,
pub main_module: ModuleSpecifier,
- // TODO(ry) rename to worker_channels_internal
- pub worker_channels: WorkerChannels,
/// When flags contains a `.import_map_path` option, the content of the
/// import map file will be resolved and set.
pub import_map: Option<ImportMap>,
pub metrics: Metrics,
pub global_timer: Mutex<GlobalTimer>,
- pub workers: Mutex<HashMap<u32, WorkerChannels>>,
+ pub workers: Mutex<HashMap<u32, WorkerChannelsExternal>>,
+ pub worker_channels_internal: Mutex<Option<WorkerChannelsInternal>>,
pub loading_workers: Mutex<HashMap<u32, mpsc::Receiver<Result<(), ErrBox>>>>,
pub next_worker_id: AtomicUsize,
pub start_time: Instant,
@@ -222,26 +221,11 @@ impl Loader for ThreadSafeState {
}
impl ThreadSafeState {
- pub fn create_channels() -> (WorkerChannels, WorkerChannels) {
- let (in_tx, in_rx) = mpsc::channel::<Buf>(1);
- let (out_tx, out_rx) = mpsc::channel::<Buf>(1);
- let internal_channels = WorkerChannels {
- sender: out_tx,
- receiver: Arc::new(AsyncMutex::new(in_rx)),
- };
- let external_channels = WorkerChannels {
- sender: in_tx,
- receiver: Arc::new(AsyncMutex::new(out_rx)),
- };
- (internal_channels, external_channels)
- }
-
/// If `shared_permission` is None then permissions from globa state are used.
pub fn new(
global_state: ThreadSafeGlobalState,
shared_permissions: Option<Arc<Mutex<DenoPermissions>>>,
main_module: ModuleSpecifier,
- internal_channels: WorkerChannels,
) -> Result<Self, ErrBox> {
let import_map: Option<ImportMap> =
match global_state.flags.import_map_path.as_ref() {
@@ -265,9 +249,9 @@ impl ThreadSafeState {
main_module,
permissions,
import_map,
- worker_channels: internal_channels,
metrics: Metrics::default(),
global_timer: Mutex::new(GlobalTimer::new()),
+ worker_channels_internal: Mutex::new(None),
workers: Mutex::new(HashMap::new()),
loading_workers: Mutex::new(HashMap::new()),
next_worker_id: AtomicUsize::new(0),
@@ -286,7 +270,6 @@ impl ThreadSafeState {
global_state: ThreadSafeGlobalState,
shared_permissions: Option<Arc<Mutex<DenoPermissions>>>,
main_module: ModuleSpecifier,
- internal_channels: WorkerChannels,
) -> Result<Self, ErrBox> {
let seeded_rng = match global_state.flags.seed {
Some(seed) => Some(Mutex::new(StdRng::seed_from_u64(seed))),
@@ -304,9 +287,9 @@ impl ThreadSafeState {
main_module,
permissions,
import_map: None,
- worker_channels: internal_channels,
metrics: Metrics::default(),
global_timer: Mutex::new(GlobalTimer::new()),
+ worker_channels_internal: Mutex::new(None),
workers: Mutex::new(HashMap::new()),
loading_workers: Mutex::new(HashMap::new()),
next_worker_id: AtomicUsize::new(0),
@@ -388,17 +371,13 @@ impl ThreadSafeState {
}
#[cfg(test)]
- pub fn mock(
- main_module: &str,
- internal_channels: WorkerChannels,
- ) -> ThreadSafeState {
+ pub fn mock(main_module: &str) -> ThreadSafeState {
let module_specifier = ModuleSpecifier::resolve_url_or_path(main_module)
.expect("Invalid entry module");
ThreadSafeState::new(
ThreadSafeGlobalState::mock(vec!["deno".to_string()]),
None,
module_specifier,
- internal_channels,
)
.unwrap()
}
@@ -431,6 +410,5 @@ impl ThreadSafeState {
#[test]
fn thread_safe() {
fn f<S: Send + Sync>(_: S) {}
- let (int, _) = ThreadSafeState::create_channels();
- f(ThreadSafeState::mock("./hello.js", int));
+ f(ThreadSafeState::mock("./hello.js"));
}
diff --git a/cli/web_worker.rs b/cli/web_worker.rs
index d984c47b6..7115b9027 100644
--- a/cli/web_worker.rs
+++ b/cli/web_worker.rs
@@ -2,7 +2,6 @@
use crate::ops;
use crate::state::ThreadSafeState;
use crate::worker::Worker;
-use crate::worker::WorkerChannels;
use deno_core;
use deno_core::ErrBox;
use deno_core::StartupData;
@@ -28,10 +27,9 @@ impl WebWorker {
name: String,
startup_data: StartupData,
state: ThreadSafeState,
- external_channels: WorkerChannels,
) -> Self {
let state_ = state.clone();
- let mut worker = Worker::new(name, startup_data, state_, external_channels);
+ let mut worker = Worker::new(name, startup_data, state_);
{
let isolate = &mut worker.isolate;
ops::runtime::init(isolate, &state);
@@ -76,13 +74,11 @@ mod tests {
use crate::tokio_util;
fn create_test_worker() -> WebWorker {
- let (int, ext) = ThreadSafeState::create_channels();
- let state = ThreadSafeState::mock("./hello.js", int);
+ let state = ThreadSafeState::mock("./hello.js");
let mut worker = WebWorker::new(
"TEST".to_string(),
startup_data::deno_isolate_init(),
state,
- ext,
);
worker.execute("bootstrapWorkerRuntime(\"TEST\")").unwrap();
worker.execute("runWorkerMessageLoop()").unwrap();
diff --git a/cli/worker.rs b/cli/worker.rs
index b81a77a9e..1725dbe14 100644
--- a/cli/worker.rs
+++ b/cli/worker.rs
@@ -52,6 +52,51 @@ impl WorkerChannels {
}
}
+pub struct WorkerChannelsInternal(WorkerChannels);
+
+impl Deref for WorkerChannelsInternal {
+ type Target = WorkerChannels;
+ fn deref(&self) -> &Self::Target {
+ &self.0
+ }
+}
+
+impl DerefMut for WorkerChannelsInternal {
+ fn deref_mut(&mut self) -> &mut Self::Target {
+ &mut self.0
+ }
+}
+
+#[derive(Clone)]
+pub struct WorkerChannelsExternal(WorkerChannels);
+
+impl Deref for WorkerChannelsExternal {
+ type Target = WorkerChannels;
+ fn deref(&self) -> &Self::Target {
+ &self.0
+ }
+}
+
+impl DerefMut for WorkerChannelsExternal {
+ fn deref_mut(&mut self) -> &mut Self::Target {
+ &mut self.0
+ }
+}
+
+fn create_channels() -> (WorkerChannelsInternal, WorkerChannelsExternal) {
+ let (in_tx, in_rx) = mpsc::channel::<Buf>(1);
+ let (out_tx, out_rx) = mpsc::channel::<Buf>(1);
+ let internal_channels = WorkerChannelsInternal(WorkerChannels {
+ sender: out_tx,
+ receiver: Arc::new(AsyncMutex::new(in_rx)),
+ });
+ let external_channels = WorkerChannelsExternal(WorkerChannels {
+ sender: in_tx,
+ receiver: Arc::new(AsyncMutex::new(out_rx)),
+ });
+ (internal_channels, external_channels)
+}
+
/// Worker is a CLI wrapper for `deno_core::Isolate`.
///
/// It provides infrastructure to communicate with a worker and
@@ -68,7 +113,7 @@ pub struct Worker {
pub name: String,
pub isolate: Box<deno_core::EsIsolate>,
pub state: ThreadSafeState,
- external_channels: WorkerChannels,
+ external_channels: WorkerChannelsExternal,
}
impl Worker {
@@ -76,7 +121,6 @@ impl Worker {
name: String,
startup_data: StartupData,
state: ThreadSafeState,
- external_channels: WorkerChannels,
) -> Self {
let mut isolate =
deno_core::EsIsolate::new(Box::new(state.clone()), startup_data, false);
@@ -86,6 +130,12 @@ impl Worker {
JSError::from_v8_exception(v8_exception, &global_state_.ts_compiler)
});
+ let (internal_channels, external_channels) = create_channels();
+ {
+ let mut c = state.worker_channels_internal.lock().unwrap();
+ *c = Some(internal_channels);
+ }
+
Self {
name,
isolate,
@@ -128,7 +178,7 @@ impl Worker {
}
/// Returns a way to communicate with the Worker from other threads.
- pub fn thread_safe_handle(&self) -> WorkerChannels {
+ pub fn thread_safe_handle(&self) -> WorkerChannelsExternal {
self.external_channels.clone()
}
}
@@ -157,10 +207,9 @@ impl MainWorker {
name: String,
startup_data: StartupData,
state: ThreadSafeState,
- external_channels: WorkerChannels,
) -> Self {
let state_ = state.clone();
- let mut worker = Worker::new(name, startup_data, state_, external_channels);
+ let mut worker = Worker::new(name, startup_data, state_);
{
let op_registry = worker.isolate.op_registry.clone();
let isolate = &mut worker.isolate;
@@ -233,14 +282,13 @@ mod tests {
let global_state =
ThreadSafeGlobalState::new(flags::DenoFlags::default(), Progress::new())
.unwrap();
- let (int, ext) = ThreadSafeState::create_channels();
let state =
- ThreadSafeState::new(global_state, None, module_specifier.clone(), int)
+ ThreadSafeState::new(global_state, None, module_specifier.clone())
.unwrap();
let state_ = state.clone();
tokio_util::run_basic(async move {
let mut worker =
- MainWorker::new("TEST".to_string(), StartupData::None, state, ext);
+ MainWorker::new("TEST".to_string(), StartupData::None, state);
let result = worker
.execute_mod_async(&module_specifier, None, false)
.await;
@@ -269,14 +317,13 @@ mod tests {
let global_state =
ThreadSafeGlobalState::new(flags::DenoFlags::default(), Progress::new())
.unwrap();
- let (int, ext) = ThreadSafeState::create_channels();
let state =
- ThreadSafeState::new(global_state, None, module_specifier.clone(), int)
+ ThreadSafeState::new(global_state, None, module_specifier.clone())
.unwrap();
let state_ = state.clone();
tokio_util::run_basic(async move {
let mut worker =
- MainWorker::new("TEST".to_string(), StartupData::None, state, ext);
+ MainWorker::new("TEST".to_string(), StartupData::None, state);
let result = worker
.execute_mod_async(&module_specifier, None, false)
.await;
@@ -312,19 +359,16 @@ mod tests {
};
let global_state =
ThreadSafeGlobalState::new(flags, Progress::new()).unwrap();
- let (int, ext) = ThreadSafeState::create_channels();
let state = ThreadSafeState::new(
global_state.clone(),
None,
module_specifier.clone(),
- int,
)
.unwrap();
let mut worker = MainWorker::new(
"TEST".to_string(),
startup_data::deno_isolate_init(),
state.clone(),
- ext,
);
worker.execute("bootstrapMainRuntime()").unwrap();
let result = worker
@@ -346,13 +390,11 @@ mod tests {
}
fn create_test_worker() -> MainWorker {
- let (int, ext) = ThreadSafeState::create_channels();
- let state = ThreadSafeState::mock("./hello.js", int);
+ let state = ThreadSafeState::mock("./hello.js");
let mut worker = MainWorker::new(
"TEST".to_string(),
startup_data::deno_isolate_init(),
state,
- ext,
);
worker.execute("bootstrapMainRuntime()").unwrap();
worker