summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--ext/node/polyfills/02_init.js7
-rw-r--r--ext/node/polyfills/worker_threads.ts7
-rw-r--r--runtime/js/99_main.js2
-rw-r--r--runtime/ops/worker_host.rs4
-rw-r--r--runtime/web_worker.rs35
-rw-r--r--tests/integration/worker_tests.rs6
-rw-r--r--tests/testdata/workers/worker_ids_are_sequential.ts34
-rw-r--r--tests/testdata/workers/worker_ids_are_sequential.ts.out4
8 files changed, 78 insertions, 21 deletions
diff --git a/ext/node/polyfills/02_init.js b/ext/node/polyfills/02_init.js
index 04820b837..85f924493 100644
--- a/ext/node/polyfills/02_init.js
+++ b/ext/node/polyfills/02_init.js
@@ -14,6 +14,7 @@ function initialize(
usesLocalNodeModulesDir,
argv0,
runningOnMainThread,
+ workerId,
maybeWorkerMetadata,
) {
if (initialized) {
@@ -39,7 +40,11 @@ function initialize(
// FIXME(bartlomieju): not nice to depend on `Deno` namespace here
// but it's the only way to get `args` and `version` and this point.
internals.__bootstrapNodeProcess(argv0, Deno.args, Deno.version);
- internals.__initWorkerThreads(runningOnMainThread, maybeWorkerMetadata);
+ internals.__initWorkerThreads(
+ runningOnMainThread,
+ workerId,
+ maybeWorkerMetadata,
+ );
internals.__setupChildProcessIpcChannel();
// `Deno[Deno.internal].requireImpl` will be unreachable after this line.
delete internals.requireImpl;
diff --git a/ext/node/polyfills/worker_threads.ts b/ext/node/polyfills/worker_threads.ts
index 4563f157f..49f2f3e3e 100644
--- a/ext/node/polyfills/worker_threads.ts
+++ b/ext/node/polyfills/worker_threads.ts
@@ -134,7 +134,6 @@ function toFileUrl(path: string): URL {
: toFileUrlPosix(path);
}
-let threads = 0;
const privateWorkerRef = Symbol("privateWorkerRef");
class NodeWorker extends EventEmitter {
#id = 0;
@@ -195,12 +194,10 @@ class NodeWorker extends EventEmitter {
name = "[worker eval]";
}
this.#name = name;
- this.threadId = ++threads;
const serializedWorkerMetadata = serializeJsMessageData({
workerData: options?.workerData,
environmentData: environmentData,
- threadId: this.threadId,
}, options?.transferList ?? []);
const id = op_create_worker(
{
@@ -216,6 +213,7 @@ class NodeWorker extends EventEmitter {
serializedWorkerMetadata,
);
this.#id = id;
+ this.threadId = id;
this.#pollControl();
this.#pollMessages();
// https://nodejs.org/api/worker_threads.html#event-online
@@ -391,6 +389,7 @@ let parentPort: ParentPort = null as any;
internals.__initWorkerThreads = (
runningOnMainThread: boolean,
+ workerId,
maybeWorkerMetadata,
) => {
isMainThread = runningOnMainThread;
@@ -414,11 +413,11 @@ internals.__initWorkerThreads = (
>();
parentPort = self as ParentPort;
+ threadId = workerId;
if (maybeWorkerMetadata) {
const { 0: metadata, 1: _ } = maybeWorkerMetadata;
workerData = metadata.workerData;
environmentData = metadata.environmentData;
- threadId = metadata.threadId;
}
defaultExport.workerData = workerData;
defaultExport.parentPort = parentPort;
diff --git a/runtime/js/99_main.js b/runtime/js/99_main.js
index 585128ba8..62e7278ff 100644
--- a/runtime/js/99_main.js
+++ b/runtime/js/99_main.js
@@ -794,6 +794,7 @@ function bootstrapWorkerRuntime(
runtimeOptions,
name,
internalName,
+ workerId,
maybeWorkerMetadata,
) {
if (hasBootstrapped) {
@@ -929,6 +930,7 @@ function bootstrapWorkerRuntime(
hasNodeModulesDir,
argv0,
/* runningOnMainThread */ false,
+ workerId,
workerMetadata,
);
}
diff --git a/runtime/ops/worker_host.rs b/runtime/ops/worker_host.rs
index 3cfad5abb..242d3bcda 100644
--- a/runtime/ops/worker_host.rs
+++ b/runtime/ops/worker_host.rs
@@ -95,7 +95,6 @@ deno_core::extension!(
},
state = |state, options| {
state.put::<WorkersTable>(WorkersTable::default());
- state.put::<WorkerId>(WorkerId::default());
let create_web_worker_cb_holder =
CreateWebWorkerCbHolder(options.create_web_worker_cb);
@@ -163,10 +162,9 @@ fn op_create_worker(
parent_permissions.clone()
};
let parent_permissions = parent_permissions.clone();
- let worker_id = state.take::<WorkerId>();
let create_web_worker_cb = state.borrow::<CreateWebWorkerCbHolder>().clone();
let format_js_error_fn = state.borrow::<FormatJsErrorFnHolder>().clone();
- state.put::<WorkerId>(worker_id.next().unwrap());
+ let worker_id = WorkerId::new();
let module_specifier = deno_core::resolve_url(&specifier)?;
let worker_name = args_name.unwrap_or_default();
diff --git a/runtime/web_worker.rs b/runtime/web_worker.rs
index 82da9de9e..31930be39 100644
--- a/runtime/web_worker.rs
+++ b/runtime/web_worker.rs
@@ -55,33 +55,40 @@ use std::cell::RefCell;
use std::fmt;
use std::rc::Rc;
use std::sync::atomic::AtomicBool;
+use std::sync::atomic::AtomicU32;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::task::Context;
use std::task::Poll;
-#[derive(Debug, Copy, Clone, PartialEq, Eq, Serialize, Deserialize)]
-#[serde(rename_all = "lowercase")]
-pub enum WebWorkerType {
- Classic,
- Module,
-}
+static WORKER_ID_COUNTER: AtomicU32 = AtomicU32::new(1);
-#[derive(
- Debug, Default, Copy, Clone, PartialEq, Eq, Hash, Serialize, Deserialize,
-)]
+#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct WorkerId(u32);
+impl WorkerId {
+ pub fn new() -> WorkerId {
+ let id = WORKER_ID_COUNTER.fetch_add(1, Ordering::SeqCst);
+ WorkerId(id)
+ }
+}
impl fmt::Display for WorkerId {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "worker-{}", self.0)
}
}
-impl WorkerId {
- pub fn next(&self) -> Option<WorkerId> {
- self.0.checked_add(1).map(WorkerId)
+impl Default for WorkerId {
+ fn default() -> Self {
+ Self::new()
}
}
+#[derive(Debug, Copy, Clone, PartialEq, Eq, Serialize, Deserialize)]
+#[serde(rename_all = "lowercase")]
+pub enum WebWorkerType {
+ Classic,
+ Module,
+}
+
/// Events that are sent to host from child
/// worker.
pub enum WorkerControlEvent {
@@ -630,11 +637,13 @@ impl WebWorker {
v8::String::new(scope, &format!("{}", self.id))
.unwrap()
.into();
+ let id: v8::Local<v8::Value> =
+ v8::Integer::new(scope, self.id.0 as i32).into();
bootstrap_fn
.call(
scope,
undefined.into(),
- &[args, name_str, id_str, worker_data],
+ &[args, name_str, id_str, id, worker_data],
)
.unwrap();
}
diff --git a/tests/integration/worker_tests.rs b/tests/integration/worker_tests.rs
index 492a06e36..8fdef8b2b 100644
--- a/tests/integration/worker_tests.rs
+++ b/tests/integration/worker_tests.rs
@@ -112,6 +112,12 @@ itest!(worker_doest_stall_event_loop {
exit_code: 0,
});
+itest!(worker_ids_are_sequential {
+ args: "run --quiet -A workers/worker_ids_are_sequential.ts",
+ output: "workers/worker_ids_are_sequential.ts.out",
+ exit_code: 0,
+});
+
// Test for https://github.com/denoland/deno/issues/22629
itest!(node_worker_auto_exits {
args: "run --quiet --allow-read workers/node_worker_auto_exits.mjs",
diff --git a/tests/testdata/workers/worker_ids_are_sequential.ts b/tests/testdata/workers/worker_ids_are_sequential.ts
new file mode 100644
index 000000000..eb90f0d47
--- /dev/null
+++ b/tests/testdata/workers/worker_ids_are_sequential.ts
@@ -0,0 +1,34 @@
+import {
+ isMainThread,
+ parentPort,
+ threadId,
+ Worker,
+} from "node:worker_threads";
+
+console.log("threadId", threadId);
+
+if (isMainThread) {
+ const worker = new Worker(new URL(import.meta.url));
+ worker.on("message", (msg) => console.log("from worker:", msg));
+ worker.on("error", () => {
+ throw new Error("error");
+ });
+ worker.on("exit", (code) => {
+ if (code !== 0) {
+ reject(new Error(`Worker stopped with exit code ${code}`));
+ }
+ });
+} else if (threadId == 1) {
+ const worker = new Worker(new URL(import.meta.url));
+ worker.on("message", (msg) => console.log("from worker:", msg));
+ worker.on("error", () => {
+ throw new Error("error");
+ });
+ worker.on("exit", (code) => {
+ if (code !== 0) {
+ reject(new Error(`Worker stopped with exit code ${code}`));
+ }
+ });
+} else {
+ parentPort.postMessage("hello!");
+}
diff --git a/tests/testdata/workers/worker_ids_are_sequential.ts.out b/tests/testdata/workers/worker_ids_are_sequential.ts.out
new file mode 100644
index 000000000..1f0552864
--- /dev/null
+++ b/tests/testdata/workers/worker_ids_are_sequential.ts.out
@@ -0,0 +1,4 @@
+threadId 0
+threadId 1
+threadId 2
+from worker: hello!