summaryrefslogtreecommitdiff
path: root/cli/worker.rs
diff options
context:
space:
mode:
Diffstat (limited to 'cli/worker.rs')
-rw-r--r--cli/worker.rs76
1 files changed, 59 insertions, 17 deletions
diff --git a/cli/worker.rs b/cli/worker.rs
index b81a77a9e..1725dbe14 100644
--- a/cli/worker.rs
+++ b/cli/worker.rs
@@ -52,6 +52,51 @@ impl WorkerChannels {
}
}
+pub struct WorkerChannelsInternal(WorkerChannels);
+
+impl Deref for WorkerChannelsInternal {
+ type Target = WorkerChannels;
+ fn deref(&self) -> &Self::Target {
+ &self.0
+ }
+}
+
+impl DerefMut for WorkerChannelsInternal {
+ fn deref_mut(&mut self) -> &mut Self::Target {
+ &mut self.0
+ }
+}
+
+#[derive(Clone)]
+pub struct WorkerChannelsExternal(WorkerChannels);
+
+impl Deref for WorkerChannelsExternal {
+ type Target = WorkerChannels;
+ fn deref(&self) -> &Self::Target {
+ &self.0
+ }
+}
+
+impl DerefMut for WorkerChannelsExternal {
+ fn deref_mut(&mut self) -> &mut Self::Target {
+ &mut self.0
+ }
+}
+
+fn create_channels() -> (WorkerChannelsInternal, WorkerChannelsExternal) {
+ let (in_tx, in_rx) = mpsc::channel::<Buf>(1);
+ let (out_tx, out_rx) = mpsc::channel::<Buf>(1);
+ let internal_channels = WorkerChannelsInternal(WorkerChannels {
+ sender: out_tx,
+ receiver: Arc::new(AsyncMutex::new(in_rx)),
+ });
+ let external_channels = WorkerChannelsExternal(WorkerChannels {
+ sender: in_tx,
+ receiver: Arc::new(AsyncMutex::new(out_rx)),
+ });
+ (internal_channels, external_channels)
+}
+
/// Worker is a CLI wrapper for `deno_core::Isolate`.
///
/// It provides infrastructure to communicate with a worker and
@@ -68,7 +113,7 @@ pub struct Worker {
pub name: String,
pub isolate: Box<deno_core::EsIsolate>,
pub state: ThreadSafeState,
- external_channels: WorkerChannels,
+ external_channels: WorkerChannelsExternal,
}
impl Worker {
@@ -76,7 +121,6 @@ impl Worker {
name: String,
startup_data: StartupData,
state: ThreadSafeState,
- external_channels: WorkerChannels,
) -> Self {
let mut isolate =
deno_core::EsIsolate::new(Box::new(state.clone()), startup_data, false);
@@ -86,6 +130,12 @@ impl Worker {
JSError::from_v8_exception(v8_exception, &global_state_.ts_compiler)
});
+ let (internal_channels, external_channels) = create_channels();
+ {
+ let mut c = state.worker_channels_internal.lock().unwrap();
+ *c = Some(internal_channels);
+ }
+
Self {
name,
isolate,
@@ -128,7 +178,7 @@ impl Worker {
}
/// Returns a way to communicate with the Worker from other threads.
- pub fn thread_safe_handle(&self) -> WorkerChannels {
+ pub fn thread_safe_handle(&self) -> WorkerChannelsExternal {
self.external_channels.clone()
}
}
@@ -157,10 +207,9 @@ impl MainWorker {
name: String,
startup_data: StartupData,
state: ThreadSafeState,
- external_channels: WorkerChannels,
) -> Self {
let state_ = state.clone();
- let mut worker = Worker::new(name, startup_data, state_, external_channels);
+ let mut worker = Worker::new(name, startup_data, state_);
{
let op_registry = worker.isolate.op_registry.clone();
let isolate = &mut worker.isolate;
@@ -233,14 +282,13 @@ mod tests {
let global_state =
ThreadSafeGlobalState::new(flags::DenoFlags::default(), Progress::new())
.unwrap();
- let (int, ext) = ThreadSafeState::create_channels();
let state =
- ThreadSafeState::new(global_state, None, module_specifier.clone(), int)
+ ThreadSafeState::new(global_state, None, module_specifier.clone())
.unwrap();
let state_ = state.clone();
tokio_util::run_basic(async move {
let mut worker =
- MainWorker::new("TEST".to_string(), StartupData::None, state, ext);
+ MainWorker::new("TEST".to_string(), StartupData::None, state);
let result = worker
.execute_mod_async(&module_specifier, None, false)
.await;
@@ -269,14 +317,13 @@ mod tests {
let global_state =
ThreadSafeGlobalState::new(flags::DenoFlags::default(), Progress::new())
.unwrap();
- let (int, ext) = ThreadSafeState::create_channels();
let state =
- ThreadSafeState::new(global_state, None, module_specifier.clone(), int)
+ ThreadSafeState::new(global_state, None, module_specifier.clone())
.unwrap();
let state_ = state.clone();
tokio_util::run_basic(async move {
let mut worker =
- MainWorker::new("TEST".to_string(), StartupData::None, state, ext);
+ MainWorker::new("TEST".to_string(), StartupData::None, state);
let result = worker
.execute_mod_async(&module_specifier, None, false)
.await;
@@ -312,19 +359,16 @@ mod tests {
};
let global_state =
ThreadSafeGlobalState::new(flags, Progress::new()).unwrap();
- let (int, ext) = ThreadSafeState::create_channels();
let state = ThreadSafeState::new(
global_state.clone(),
None,
module_specifier.clone(),
- int,
)
.unwrap();
let mut worker = MainWorker::new(
"TEST".to_string(),
startup_data::deno_isolate_init(),
state.clone(),
- ext,
);
worker.execute("bootstrapMainRuntime()").unwrap();
let result = worker
@@ -346,13 +390,11 @@ mod tests {
}
fn create_test_worker() -> MainWorker {
- let (int, ext) = ThreadSafeState::create_channels();
- let state = ThreadSafeState::mock("./hello.js", int);
+ let state = ThreadSafeState::mock("./hello.js");
let mut worker = MainWorker::new(
"TEST".to_string(),
startup_data::deno_isolate_init(),
state,
- ext,
);
worker.execute("bootstrapMainRuntime()").unwrap();
worker