summaryrefslogtreecommitdiff
path: root/ext/node/polyfills/worker_threads.ts
diff options
context:
space:
mode:
authorBartek IwaƄczuk <biwanczuk@gmail.com>2023-02-14 17:38:45 +0100
committerGitHub <noreply@github.com>2023-02-14 17:38:45 +0100
commitd47147fb6ad229b1c039aff9d0959b6e281f4df5 (patch)
tree6e9e790f2b9bc71b5f0c9c7e64b95cae31579d58 /ext/node/polyfills/worker_threads.ts
parent1d00bbe47e2ca14e2d2151518e02b2324461a065 (diff)
feat(ext/node): embed std/node into the snapshot (#17724)
This commit moves "deno_std/node" in "ext/node" crate. The code is transpiled and snapshotted during the build process. During the first pass a minimal amount of work was done to create the snapshot, a lot of code in "ext/node" depends on presence of "Deno" global. This code will be gradually fixed in the follow up PRs to migrate it to import relevant APIs from "internal:" modules. Currently the code from snapshot is not used in any way, and all Node/npm compatibility still uses code from "https://deno.land/std/node" (or from the location specified by "DENO_NODE_COMPAT_URL"). This will also be handled in a follow up PRs. --------- Co-authored-by: crowlkats <crowlkats@toaxl.com> Co-authored-by: Divy Srivastava <dj.srivastava23@gmail.com> Co-authored-by: Yoshiya Hinosawa <stibium121@gmail.com>
Diffstat (limited to 'ext/node/polyfills/worker_threads.ts')
-rw-r--r--ext/node/polyfills/worker_threads.ts248
1 files changed, 248 insertions, 0 deletions
diff --git a/ext/node/polyfills/worker_threads.ts b/ext/node/polyfills/worker_threads.ts
new file mode 100644
index 000000000..7bca5fc4e
--- /dev/null
+++ b/ext/node/polyfills/worker_threads.ts
@@ -0,0 +1,248 @@
+// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license.
+// Copyright Joyent and Node contributors. All rights reserved. MIT license.
+
+import { resolve, toFileUrl } from "internal:deno_node/polyfills/path.ts";
+import { notImplemented } from "internal:deno_node/polyfills/_utils.ts";
+import { EventEmitter } from "internal:deno_node/polyfills/events.ts";
+
+const environmentData = new Map();
+let threads = 0;
+
+export interface WorkerOptions {
+ // only for typings
+ argv?: unknown[];
+ env?: Record<string, unknown>;
+ execArgv?: string[];
+ stdin?: boolean;
+ stdout?: boolean;
+ stderr?: boolean;
+ trackUnmanagedFds?: boolean;
+ resourceLimits?: {
+ maxYoungGenerationSizeMb?: number;
+ maxOldGenerationSizeMb?: number;
+ codeRangeSizeMb?: number;
+ stackSizeMb?: number;
+ };
+
+ eval?: boolean;
+ transferList?: Transferable[];
+ workerData?: unknown;
+}
+
+const kHandle = Symbol("kHandle");
+const PRIVATE_WORKER_THREAD_NAME = "$DENO_STD_NODE_WORKER_THREAD";
+class _Worker extends EventEmitter {
+ readonly threadId: number;
+ readonly resourceLimits: Required<
+ NonNullable<WorkerOptions["resourceLimits"]>
+ > = {
+ maxYoungGenerationSizeMb: -1,
+ maxOldGenerationSizeMb: -1,
+ codeRangeSizeMb: -1,
+ stackSizeMb: 4,
+ };
+ private readonly [kHandle]: Worker;
+
+ postMessage: Worker["postMessage"];
+
+ constructor(specifier: URL | string, options?: WorkerOptions) {
+ notImplemented("Worker");
+ super();
+ if (options?.eval === true) {
+ specifier = `data:text/javascript,${specifier}`;
+ } else if (typeof specifier === "string") {
+ // @ts-ignore This API is temporarily disabled
+ specifier = toFileUrl(resolve(specifier));
+ }
+ const handle = this[kHandle] = new Worker(
+ specifier,
+ {
+ name: PRIVATE_WORKER_THREAD_NAME,
+ type: "module",
+ } as globalThis.WorkerOptions, // bypass unstable type error
+ );
+ handle.addEventListener(
+ "error",
+ (event) => this.emit("error", event.error || event.message),
+ );
+ handle.addEventListener(
+ "messageerror",
+ (event) => this.emit("messageerror", event.data),
+ );
+ handle.addEventListener(
+ "message",
+ (event) => this.emit("message", event.data),
+ );
+ handle.postMessage({
+ environmentData,
+ threadId: (this.threadId = ++threads),
+ workerData: options?.workerData,
+ }, options?.transferList || []);
+ this.postMessage = handle.postMessage.bind(handle);
+ this.emit("online");
+ }
+
+ terminate() {
+ this[kHandle].terminate();
+ this.emit("exit", 0);
+ }
+
+ readonly getHeapSnapshot = () =>
+ notImplemented("Worker.prototype.getHeapSnapshot");
+ // fake performance
+ readonly performance = globalThis.performance;
+}
+
+export const isMainThread =
+ // deno-lint-ignore no-explicit-any
+ (globalThis as any).name !== PRIVATE_WORKER_THREAD_NAME;
+
+// fake resourceLimits
+export const resourceLimits = isMainThread ? {} : {
+ maxYoungGenerationSizeMb: 48,
+ maxOldGenerationSizeMb: 2048,
+ codeRangeSizeMb: 0,
+ stackSizeMb: 4,
+};
+
+const threadId = 0;
+const workerData: unknown = null;
+
+// Like https://github.com/nodejs/node/blob/48655e17e1d84ba5021d7a94b4b88823f7c9c6cf/lib/internal/event_target.js#L611
+interface NodeEventTarget extends
+ Pick<
+ EventEmitter,
+ "eventNames" | "listenerCount" | "emit" | "removeAllListeners"
+ > {
+ setMaxListeners(n: number): void;
+ getMaxListeners(): number;
+ // deno-lint-ignore no-explicit-any
+ off(eventName: string, listener: (...args: any[]) => void): NodeEventTarget;
+ // deno-lint-ignore no-explicit-any
+ on(eventName: string, listener: (...args: any[]) => void): NodeEventTarget;
+ // deno-lint-ignore no-explicit-any
+ once(eventName: string, listener: (...args: any[]) => void): NodeEventTarget;
+ addListener: NodeEventTarget["on"];
+ removeListener: NodeEventTarget["off"];
+}
+
+type ParentPort = typeof self & NodeEventTarget;
+
+// deno-lint-ignore no-explicit-any
+const parentPort: ParentPort = null as any;
+
+/*
+if (!isMainThread) {
+ // deno-lint-ignore no-explicit-any
+ delete (globalThis as any).name;
+ // deno-lint-ignore no-explicit-any
+ const listeners = new WeakMap<(...args: any[]) => void, (ev: any) => any>();
+
+ parentPort = self as ParentPort;
+ parentPort.off = parentPort.removeListener = function (
+ this: ParentPort,
+ name,
+ listener,
+ ) {
+ this.removeEventListener(name, listeners.get(listener)!);
+ listeners.delete(listener);
+ return this;
+ };
+ parentPort.on = parentPort.addListener = function (
+ this: ParentPort,
+ name,
+ listener,
+ ) {
+ // deno-lint-ignore no-explicit-any
+ const _listener = (ev: any) => listener(ev.data);
+ listeners.set(listener, _listener);
+ this.addEventListener(name, _listener);
+ return this;
+ };
+ parentPort.once = function (this: ParentPort, name, listener) {
+ // deno-lint-ignore no-explicit-any
+ const _listener = (ev: any) => listener(ev.data);
+ listeners.set(listener, _listener);
+ this.addEventListener(name, _listener);
+ return this;
+ };
+
+ // mocks
+ parentPort.setMaxListeners = () => {};
+ parentPort.getMaxListeners = () => Infinity;
+ parentPort.eventNames = () => [""];
+ parentPort.listenerCount = () => 0;
+
+ parentPort.emit = () => notImplemented("parentPort.emit");
+ parentPort.removeAllListeners = () =>
+ notImplemented("parentPort.removeAllListeners");
+
+ // Receive startup message
+ [{ threadId, workerData, environmentData }] = await once(
+ parentPort,
+ "message",
+ );
+
+ // alias
+ parentPort.addEventListener("offline", () => {
+ parentPort.emit("close");
+ });
+}
+*/
+
+export function getEnvironmentData(key: unknown) {
+ notImplemented("getEnvironmentData");
+ return environmentData.get(key);
+}
+
+export function setEnvironmentData(key: unknown, value?: unknown) {
+ notImplemented("setEnvironmentData");
+ if (value === undefined) {
+ environmentData.delete(key);
+ } else {
+ environmentData.set(key, value);
+ }
+}
+
+// deno-lint-ignore no-explicit-any
+const _MessagePort: typeof MessagePort = (globalThis as any).MessagePort;
+const _MessageChannel: typeof MessageChannel =
+ // deno-lint-ignore no-explicit-any
+ (globalThis as any).MessageChannel;
+export const BroadcastChannel = globalThis.BroadcastChannel;
+export const SHARE_ENV = Symbol.for("nodejs.worker_threads.SHARE_ENV");
+export function markAsUntransferable() {
+ notImplemented("markAsUntransferable");
+}
+export function moveMessagePortToContext() {
+ notImplemented("moveMessagePortToContext");
+}
+export function receiveMessageOnPort() {
+ notImplemented("receiveMessageOnPort");
+}
+export {
+ _MessageChannel as MessageChannel,
+ _MessagePort as MessagePort,
+ _Worker as Worker,
+ parentPort,
+ threadId,
+ workerData,
+};
+
+export default {
+ markAsUntransferable,
+ moveMessagePortToContext,
+ receiveMessageOnPort,
+ MessagePort: _MessagePort,
+ MessageChannel: _MessageChannel,
+ BroadcastChannel,
+ Worker: _Worker,
+ getEnvironmentData,
+ setEnvironmentData,
+ SHARE_ENV,
+ threadId,
+ workerData,
+ resourceLimits,
+ parentPort,
+ isMainThread,
+};