summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--cli/tests/integration/run_tests.rs5
-rw-r--r--cli/tests/testdata/worker_drop_handle_race_terminate.js40
-rw-r--r--cli/tests/testdata/worker_drop_handle_race_terminate.js.out4
-rw-r--r--runtime/ops/worker_host.rs92
4 files changed, 100 insertions, 41 deletions
diff --git a/cli/tests/integration/run_tests.rs b/cli/tests/integration/run_tests.rs
index e6004ba49..4c4b42142 100644
--- a/cli/tests/integration/run_tests.rs
+++ b/cli/tests/integration/run_tests.rs
@@ -1610,6 +1610,11 @@ itest!(worker_drop_handle_race {
exit_code: 1,
});
+itest!(worker_drop_handle_race_terminate {
+ args: "run --unstable worker_drop_handle_race_terminate.js",
+ output: "worker_drop_handle_race_terminate.js.out",
+});
+
itest!(worker_close_nested {
args: "run --quiet --reload --allow-read worker_close_nested.js",
output: "worker_close_nested.js.out",
diff --git a/cli/tests/testdata/worker_drop_handle_race_terminate.js b/cli/tests/testdata/worker_drop_handle_race_terminate.js
new file mode 100644
index 000000000..dfdd9c561
--- /dev/null
+++ b/cli/tests/testdata/worker_drop_handle_race_terminate.js
@@ -0,0 +1,40 @@
+// Copyright 2018-2022 the Deno authors. All rights reserved. MIT license.
+
+// Test that the panic in https://github.com/denoland/deno/issues/11342 does not
+// happen when calling worker.terminate() after fixing
+// https://github.com/denoland/deno/issues/13705
+
+function getCodeBlobUrl(code) {
+ const blob = new Blob([code], { type: "text/javascript" });
+ return URL.createObjectURL(blob);
+}
+
+const WORKER2 = getCodeBlobUrl(`
+ console.log("Worker 2");
+ self.postMessage(undefined);
+
+ // We sleep for slightly under 2 seconds in order to make sure that worker 1
+ // has closed, and that this worker's thread finishes normally rather than
+ // being killed (which happens 2 seconds after calling terminate).
+ Deno.sleepSync(1800);
+ console.log("Finished sleeping in worker 2");
+`);
+
+const WORKER1 = getCodeBlobUrl(`
+ console.log("Worker 1");
+ const worker = new Worker(
+ ${JSON.stringify(WORKER2)},
+ { type: "module", deno: { namespace: true } }
+ );
+
+ worker.addEventListener("message", () => {
+ console.log("Terminating");
+ worker.terminate();
+ self.close();
+ });
+`);
+
+new Worker(WORKER1, { type: "module", deno: { namespace: true } });
+
+// Don't kill the process before worker 2 is finished.
+setTimeout(() => {}, 3000);
diff --git a/cli/tests/testdata/worker_drop_handle_race_terminate.js.out b/cli/tests/testdata/worker_drop_handle_race_terminate.js.out
new file mode 100644
index 000000000..5ec1e7ff8
--- /dev/null
+++ b/cli/tests/testdata/worker_drop_handle_race_terminate.js.out
@@ -0,0 +1,4 @@
+Worker 1
+Worker 2
+Terminating
+Finished sleeping in worker 2
diff --git a/runtime/ops/worker_host.rs b/runtime/ops/worker_host.rs
index 23ffcd8b1..bd1f1e3f5 100644
--- a/runtime/ops/worker_host.rs
+++ b/runtime/ops/worker_host.rs
@@ -17,6 +17,8 @@ use deno_core::futures::future::LocalFutureObj;
use deno_core::op;
use deno_core::serde::Deserialize;
+use deno_core::CancelFuture;
+use deno_core::CancelHandle;
use deno_core::Extension;
use deno_core::ModuleSpecifier;
use deno_core::OpState;
@@ -27,7 +29,6 @@ use std::collections::HashMap;
use std::rc::Rc;
use std::sync::atomic::AtomicI32;
use std::sync::Arc;
-use std::thread::JoinHandle;
pub struct CreateWebWorkerArgs {
pub name: String,
@@ -66,9 +67,8 @@ pub struct FormatJsErrorFnHolder(Option<Arc<FormatJsErrorFn>>);
pub struct PreloadModuleCbHolder(Arc<PreloadModuleCb>);
pub struct WorkerThread {
- // It's an Option so we can take the value before dropping the WorkerThread.
- join_handle: Option<JoinHandle<Result<(), AnyError>>>,
worker_handle: WebWorkerHandle,
+ cancel_handle: Rc<CancelHandle>,
// A WorkerThread that hasn't been explicitly terminated can only be removed
// from the WorkersTable once close messages have been received for both the
@@ -78,30 +78,16 @@ pub struct WorkerThread {
}
impl WorkerThread {
- fn terminate(mut self) {
- self.worker_handle.clone().terminate();
- self
- .join_handle
- .take()
- .unwrap()
- .join()
- .expect("Worker thread panicked")
- .expect("Panic in worker event loop");
-
- // Optimization so the Drop impl doesn't try to terminate the worker handle
- // again.
- self.ctrl_closed = true;
- self.message_closed = true;
+ fn terminate(self) {
+ // Cancel recv ops when terminating the worker, so they don't show up as
+ // pending ops.
+ self.cancel_handle.cancel();
}
}
impl Drop for WorkerThread {
fn drop(&mut self) {
- // If either of the channels is closed, the worker thread has at least
- // started closing, and its event loop won't start another run.
- if !(self.ctrl_closed || self.message_closed) {
- self.worker_handle.clone().terminate();
- }
+ self.worker_handle.clone().terminate();
}
}
@@ -217,7 +203,7 @@ fn op_create_worker(
std::thread::Builder::new().name(format!("{}", worker_id));
// Spawn it
- let join_handle = thread_builder.spawn(move || {
+ thread_builder.spawn(move || {
// Any error inside this block is terminal:
// - JS worker is useless - meaning it throws an exception and can't do anything else,
// all action done upon it should be noops
@@ -256,8 +242,8 @@ fn op_create_worker(
let worker_handle = handle_receiver.recv().unwrap()?;
let worker_thread = WorkerThread {
- join_handle: Some(join_handle),
worker_handle: worker_handle.into(),
+ cancel_handle: CancelHandle::new_rc(),
ctrl_closed: false,
message_closed: false,
};
@@ -330,30 +316,41 @@ async fn op_host_recv_ctrl(
state: Rc<RefCell<OpState>>,
id: WorkerId,
) -> Result<WorkerControlEvent, AnyError> {
- let worker_handle = {
+ let (worker_handle, cancel_handle) = {
let state = state.borrow();
let workers_table = state.borrow::<WorkersTable>();
let maybe_handle = workers_table.get(&id);
if let Some(handle) = maybe_handle {
- handle.worker_handle.clone()
+ (handle.worker_handle.clone(), handle.cancel_handle.clone())
} else {
// If handle was not found it means worker has already shutdown
return Ok(WorkerControlEvent::Close);
}
};
- let maybe_event = worker_handle.get_control_event().await?;
- if let Some(event) = maybe_event {
- // Terminal error means that worker should be removed from worker table.
- if let WorkerControlEvent::TerminalError(_) = &event {
+ let maybe_event = worker_handle
+ .get_control_event()
+ .or_cancel(cancel_handle)
+ .await;
+ match maybe_event {
+ Ok(Ok(Some(event))) => {
+ // Terminal error means that worker should be removed from worker table.
+ if let WorkerControlEvent::TerminalError(_) = &event {
+ close_channel(state, id, WorkerChannel::Ctrl);
+ }
+ Ok(event)
+ }
+ Ok(Ok(None)) => {
+ // If there was no event from worker it means it has already been closed.
close_channel(state, id, WorkerChannel::Ctrl);
+ Ok(WorkerControlEvent::Close)
+ }
+ Ok(Err(err)) => Err(err),
+ Err(_) => {
+ // The worker was terminated.
+ Ok(WorkerControlEvent::Close)
}
- return Ok(event);
}
-
- // If there was no event from worker it means it has already been closed.
- close_channel(state, id, WorkerChannel::Ctrl);
- Ok(WorkerControlEvent::Close)
}
#[op]
@@ -361,23 +358,36 @@ async fn op_host_recv_message(
state: Rc<RefCell<OpState>>,
id: WorkerId,
) -> Result<Option<JsMessageData>, AnyError> {
- let worker_handle = {
+ let (worker_handle, cancel_handle) = {
let s = state.borrow();
let workers_table = s.borrow::<WorkersTable>();
let maybe_handle = workers_table.get(&id);
if let Some(handle) = maybe_handle {
- handle.worker_handle.clone()
+ (handle.worker_handle.clone(), handle.cancel_handle.clone())
} else {
// If handle was not found it means worker has already shutdown
return Ok(None);
}
};
- let ret = worker_handle.port.recv(state.clone()).await?;
- if ret.is_none() {
- close_channel(state, id, WorkerChannel::Messages);
+ let ret = worker_handle
+ .port
+ .recv(state.clone())
+ .or_cancel(cancel_handle)
+ .await;
+ match ret {
+ Ok(Ok(ret)) => {
+ if ret.is_none() {
+ close_channel(state, id, WorkerChannel::Messages);
+ }
+ Ok(ret)
+ }
+ Ok(Err(err)) => Err(err),
+ Err(_) => {
+ // The worker was terminated.
+ Ok(None)
+ }
}
- Ok(ret)
}
/// Post message to guest worker as host