diff options
Diffstat (limited to 'cli/js')
-rw-r--r-- | cli/js/compiler.ts | 7 | ||||
-rw-r--r-- | cli/js/dispatch.ts | 4 | ||||
-rw-r--r-- | cli/js/globals.ts | 1 | ||||
-rw-r--r-- | cli/js/lib.deno.worker.d.ts | 1 | ||||
-rw-r--r-- | cli/js/main.ts | 11 | ||||
-rw-r--r-- | cli/js/runtime_worker.ts | 82 | ||||
-rw-r--r-- | cli/js/unit_tests.ts | 1 | ||||
-rw-r--r-- | cli/js/workers.ts | 80 | ||||
-rw-r--r-- | cli/js/workers_test.ts | 84 |
9 files changed, 170 insertions, 101 deletions
diff --git a/cli/js/compiler.ts b/cli/js/compiler.ts index bf0287efe..4ca2887c6 100644 --- a/cli/js/compiler.ts +++ b/cli/js/compiler.ts @@ -40,10 +40,7 @@ import { Diagnostic } from "./diagnostics.ts"; import { fromTypeScriptDiagnostic } from "./diagnostics_util.ts"; import { assert } from "./util.ts"; import * as util from "./util.ts"; -import { - bootstrapWorkerRuntime, - runWorkerMessageLoop -} from "./runtime_worker.ts"; +import { bootstrapWorkerRuntime } from "./runtime_worker.ts"; interface CompilerRequestCompile { type: CompilerRequestType.Compile; @@ -340,13 +337,11 @@ async function wasmCompilerOnMessage({ function bootstrapTsCompilerRuntime(): void { bootstrapWorkerRuntime("TS"); globalThis.onmessage = tsCompilerOnMessage; - runWorkerMessageLoop(); } function bootstrapWasmCompilerRuntime(): void { bootstrapWorkerRuntime("WASM"); globalThis.onmessage = wasmCompilerOnMessage; - runWorkerMessageLoop(); } Object.defineProperties(globalThis, { diff --git a/cli/js/dispatch.ts b/cli/js/dispatch.ts index 1a6b6528d..4493d3771 100644 --- a/cli/js/dispatch.ts +++ b/cli/js/dispatch.ts @@ -43,10 +43,10 @@ export let OP_REVOKE_PERMISSION: number; export let OP_REQUEST_PERMISSION: number; export let OP_CREATE_WORKER: number; export let OP_HOST_POST_MESSAGE: number; -export let OP_HOST_CLOSE_WORKER: number; +export let OP_HOST_TERMINATE_WORKER: number; export let OP_HOST_GET_MESSAGE: number; export let OP_WORKER_POST_MESSAGE: number; -export let OP_WORKER_GET_MESSAGE: number; +export let OP_WORKER_CLOSE: number; export let OP_RUN: number; export let OP_RUN_STATUS: number; export let OP_KILL: number; diff --git a/cli/js/globals.ts b/cli/js/globals.ts index 7cce739d5..53eb696ac 100644 --- a/cli/js/globals.ts +++ b/cli/js/globals.ts @@ -118,7 +118,6 @@ declare global { var bootstrapWorkerRuntime: | ((name: string) => Promise<void> | void) | undefined; - var runWorkerMessageLoop: (() => Promise<void> | void) | undefined; var onerror: | (( msg: string, diff --git a/cli/js/lib.deno.worker.d.ts b/cli/js/lib.deno.worker.d.ts index 07955345c..3311d9457 100644 --- a/cli/js/lib.deno.worker.d.ts +++ b/cli/js/lib.deno.worker.d.ts @@ -37,7 +37,6 @@ declare const postMessage: typeof __workerMain.postMessage; declare namespace __workerMain { export let onmessage: (e: { data: any }) => void; export function postMessage(data: any): void; - export function getMessage(): Promise<any>; export function close(): void; export const name: string; } diff --git a/cli/js/main.ts b/cli/js/main.ts index b48277960..fbebfefe4 100644 --- a/cli/js/main.ts +++ b/cli/js/main.ts @@ -1,9 +1,6 @@ // Copyright 2018-2020 the Deno authors. All rights reserved. MIT license. import { bootstrapMainRuntime } from "./runtime_main.ts"; -import { - bootstrapWorkerRuntime, - runWorkerMessageLoop -} from "./runtime_worker.ts"; +import { bootstrapWorkerRuntime } from "./runtime_worker.ts"; Object.defineProperties(globalThis, { bootstrapMainRuntime: { @@ -17,11 +14,5 @@ Object.defineProperties(globalThis, { enumerable: false, writable: false, configurable: false - }, - runWorkerMessageLoop: { - value: runWorkerMessageLoop, - enumerable: false, - writable: false, - configurable: false } }); diff --git a/cli/js/runtime_worker.ts b/cli/js/runtime_worker.ts index 0dc65fdb6..a9ed8b924 100644 --- a/cli/js/runtime_worker.ts +++ b/cli/js/runtime_worker.ts @@ -3,12 +3,9 @@ // This module is the entry point for "worker" isolate, ie. the one // that is created using `new Worker()` JS API. // -// It provides two functions that should be called by Rust: +// It provides a single function that should be called by Rust: // - `bootstrapWorkerRuntime` - must be called once, when Isolate is created. // It sets up runtime by providing globals for `DedicatedWorkerScope`. -// - `runWorkerMessageLoop` - starts receiving messages from parent worker, -// can be called multiple times - eg. to restart worker execution after -// exception occurred and was handled by parent worker /* eslint-disable @typescript-eslint/no-explicit-any */ import { @@ -20,13 +17,12 @@ import { eventTargetProperties } from "./globals.ts"; import * as dispatch from "./dispatch.ts"; -import { sendAsync, sendSync } from "./dispatch_json.ts"; +import { sendSync } from "./dispatch_json.ts"; import { log } from "./util.ts"; -import { TextDecoder, TextEncoder } from "./text_encoding.ts"; +import { TextEncoder } from "./text_encoding.ts"; import * as runtime from "./runtime.ts"; const encoder = new TextEncoder(); -const decoder = new TextDecoder(); // TODO(bartlomieju): remove these funtions // Stuff for workers @@ -39,62 +35,46 @@ export function postMessage(data: any): void { sendSync(dispatch.OP_WORKER_POST_MESSAGE, {}, dataIntArray); } -export async function getMessage(): Promise<any> { - log("getMessage"); - const res = await sendAsync(dispatch.OP_WORKER_GET_MESSAGE); - if (res.data != null) { - const dataIntArray = new Uint8Array(res.data); - const dataJson = decoder.decode(dataIntArray); - return JSON.parse(dataJson); - } else { - return null; - } -} - let isClosing = false; let hasBootstrapped = false; export function close(): void { + if (isClosing) { + return; + } + isClosing = true; + sendSync(dispatch.OP_WORKER_CLOSE); } -export async function runWorkerMessageLoop(): Promise<void> { - while (!isClosing) { - const data = await getMessage(); - if (data == null) { - log("runWorkerMessageLoop got null message. quitting."); - break; - } +export async function workerMessageRecvCallback(data: string): Promise<void> { + let result: void | Promise<void>; + const event = { data }; - let result: void | Promise<void>; - const event = { data }; - - try { - if (!globalThis["onmessage"]) { - break; - } + try { + // + if (globalThis["onmessage"]) { result = globalThis.onmessage!(event); if (result && "then" in result) { await result; } - if (!globalThis["onmessage"]) { - break; - } - } catch (e) { - if (globalThis["onerror"]) { - const result = globalThis.onerror( - e.message, - e.fileName, - e.lineNumber, - e.columnNumber, - e - ); - if (result === true) { - continue; - } + } + + // TODO: run the rest of liteners + } catch (e) { + if (globalThis["onerror"]) { + const result = globalThis.onerror( + e.message, + e.fileName, + e.lineNumber, + e.columnNumber, + e + ); + if (result === true) { + return; } - throw e; } + throw e; } } @@ -102,8 +82,10 @@ export const workerRuntimeGlobalProperties = { self: readOnly(globalThis), onmessage: writable(onmessage), onerror: writable(onerror), + // TODO: should be readonly? close: nonEnumerable(close), - postMessage: writable(postMessage) + postMessage: writable(postMessage), + workerMessageRecvCallback: nonEnumerable(workerMessageRecvCallback) }; /** diff --git a/cli/js/unit_tests.ts b/cli/js/unit_tests.ts index 992169e55..a6435d183 100644 --- a/cli/js/unit_tests.ts +++ b/cli/js/unit_tests.ts @@ -59,6 +59,7 @@ import "./write_file_test.ts"; import "./performance_test.ts"; import "./permissions_test.ts"; import "./version_test.ts"; +import "./workers_test.ts"; import { runIfMain } from "../../std/testing/mod.ts"; diff --git a/cli/js/workers.ts b/cli/js/workers.ts index fb63a3260..7b0c50336 100644 --- a/cli/js/workers.ts +++ b/cli/js/workers.ts @@ -38,19 +38,23 @@ function createWorker( }); } +function hostTerminateWorker(id: number): void { + sendSync(dispatch.OP_HOST_TERMINATE_WORKER, { id }); +} + function hostPostMessage(id: number, data: any): void { const dataIntArray = encodeMessage(data); sendSync(dispatch.OP_HOST_POST_MESSAGE, { id }, dataIntArray); } -async function hostGetMessage(id: number): Promise<any> { - const res = await sendAsync(dispatch.OP_HOST_GET_MESSAGE, { id }); +interface WorkerEvent { + event: "error" | "msg" | "close"; + data?: any; + error?: any; +} - if (res.data != null) { - return decodeMessage(new Uint8Array(res.data)); - } else { - return null; - } +async function hostGetMessage(id: number): Promise<any> { + return await sendAsync(dispatch.OP_HOST_GET_MESSAGE, { id }); } export interface Worker { @@ -72,6 +76,8 @@ export class WorkerImpl extends EventTarget implements Worker { public onerror?: (e: any) => void; public onmessage?: (data: any) => void; public onmessageerror?: () => void; + private name: string; + private terminated = false; constructor(specifier: string, options?: WorkerOptions) { super(); @@ -88,6 +94,7 @@ export class WorkerImpl extends EventTarget implements Worker { ); } + this.name = options?.name ?? "unknown"; const hasSourceCode = false; const sourceCode = new Uint8Array(); @@ -139,42 +146,53 @@ export class WorkerImpl extends EventTarget implements Worker { } async poll(): Promise<void> { - while (!this.isClosing) { - const data = await hostGetMessage(this.id); - if (data == null) { - log("worker got null message. quitting."); - break; - } - if (this.onmessage) { - const event = { data }; - this.onmessage(event); + while (!this.terminated) { + const event = await hostGetMessage(this.id); + + // If terminate was called then we ignore all messages + if (this.terminated) { + return; } - } - /* - while (true) { - const result = await hostPollWorker(this.id); + const type = event.type; - if (result.error) { - if (!this.handleError(result.error)) { - throw Error(result.error.message); - } else { - hostResumeWorker(this.id); + if (type === "msg") { + if (this.onmessage) { + const message = decodeMessage(new Uint8Array(event.data)); + this.onmessage({ data: message }); } - } else { - this.isClosing = true; - hostCloseWorker(this.id); - break; + continue; } + + if (type === "error") { + if (!this.handleError(event.error)) { + throw Error(event.error.message); + } + continue; + } + + if (type === "close") { + log(`Host got "close" message from worker: ${this.name}`); + this.terminated = true; + return; + } + + throw new Error(`Unknown worker event: "${type}"`); } - */ } postMessage(data: any): void { + if (this.terminated) { + return; + } + hostPostMessage(this.id, data); } terminate(): void { - throw new Error("Not yet implemented"); + if (!this.terminated) { + this.terminated = true; + hostTerminateWorker(this.id); + } } } diff --git a/cli/js/workers_test.ts b/cli/js/workers_test.ts new file mode 100644 index 000000000..9cb4f4a07 --- /dev/null +++ b/cli/js/workers_test.ts @@ -0,0 +1,84 @@ +// Copyright 2018-2020 the Deno authors. All rights reserved. MIT license. +import { test, assert, assertEquals } from "./test_util.ts"; + +export interface ResolvableMethods<T> { + resolve: (value?: T | PromiseLike<T>) => void; + // eslint-disable-next-line @typescript-eslint/no-explicit-any + reject: (reason?: any) => void; +} + +export type Resolvable<T> = Promise<T> & ResolvableMethods<T>; + +export function createResolvable<T>(): Resolvable<T> { + let methods: ResolvableMethods<T>; + const promise = new Promise<T>((resolve, reject): void => { + methods = { resolve, reject }; + }); + // TypeScript doesn't know that the Promise callback occurs synchronously + // therefore use of not null assertion (`!`) + return Object.assign(promise, methods!) as Resolvable<T>; +} + +test(async function workersBasic(): Promise<void> { + const promise = createResolvable(); + const jsWorker = new Worker("../tests/subdir/test_worker.js", { + type: "module", + name: "jsWorker" + }); + const tsWorker = new Worker("../tests/subdir/test_worker.ts", { + type: "module", + name: "tsWorker" + }); + + tsWorker.onmessage = (e): void => { + assertEquals(e.data, "Hello World"); + promise.resolve(); + }; + + jsWorker.onmessage = (e): void => { + assertEquals(e.data, "Hello World"); + tsWorker.postMessage("Hello World"); + }; + + jsWorker.onerror = (e: Event): void => { + e.preventDefault(); + jsWorker.postMessage("Hello World"); + }; + + jsWorker.postMessage("Hello World"); + await promise; +}); + +test(async function nestedWorker(): Promise<void> { + const promise = createResolvable(); + + const nestedWorker = new Worker("../tests/subdir/nested_worker.js", { + type: "module", + name: "nested" + }); + + nestedWorker.onmessage = (e): void => { + assert(e.data.type !== "error"); + promise.resolve(); + }; + + nestedWorker.postMessage("Hello World"); + await promise; +}); + +test(async function workerThrowsWhenExecuting(): Promise<void> { + const promise = createResolvable(); + + const throwingWorker = new Worker("../tests/subdir/throwing_worker.js", { + type: "module" + }); + + // eslint-disable-next-line @typescript-eslint/no-explicit-any + throwingWorker.onerror = (e: any): void => { + e.preventDefault(); + assertEquals(e.message, "Uncaught Error: Thrown error"); + promise.resolve(); + }; + + await promise; +}); |