summaryrefslogtreecommitdiff
path: root/cli/js
diff options
context:
space:
mode:
authorBartek IwaƄczuk <biwanczuk@gmail.com>2020-02-11 10:04:59 +0100
committerGitHub <noreply@github.com>2020-02-11 10:04:59 +0100
commit79b3bc05d6de520f1df73face1744ae3d8be0bb8 (patch)
treef4c449efa67b88c767df52dd3ecec2246dded2e5 /cli/js
parent81905a867ea3f942619229e330840d132c57a5da (diff)
workers: basic event loop (#3828)
* establish basic event loop for workers * make "self.close()" inside worker * remove "runWorkerMessageLoop() - instead manually call global function in Rust when message arrives. This is done in preparation for structured clone * refactor "WorkerChannel" and use distinct structs for internal and external channels; "WorkerChannelsInternal" and "WorkerHandle" * move "State.worker_channels_internal" to "Worker.internal_channels" * add "WorkerEvent" enum for child->host communication; currently "Message(Buf)" and "Error(ErrBox)" variants are supported * add tests for nested workers * add tests for worker throwing error on startup
Diffstat (limited to 'cli/js')
-rw-r--r--cli/js/compiler.ts7
-rw-r--r--cli/js/dispatch.ts4
-rw-r--r--cli/js/globals.ts1
-rw-r--r--cli/js/lib.deno.worker.d.ts1
-rw-r--r--cli/js/main.ts11
-rw-r--r--cli/js/runtime_worker.ts82
-rw-r--r--cli/js/unit_tests.ts1
-rw-r--r--cli/js/workers.ts80
-rw-r--r--cli/js/workers_test.ts84
9 files changed, 170 insertions, 101 deletions
diff --git a/cli/js/compiler.ts b/cli/js/compiler.ts
index bf0287efe..4ca2887c6 100644
--- a/cli/js/compiler.ts
+++ b/cli/js/compiler.ts
@@ -40,10 +40,7 @@ import { Diagnostic } from "./diagnostics.ts";
import { fromTypeScriptDiagnostic } from "./diagnostics_util.ts";
import { assert } from "./util.ts";
import * as util from "./util.ts";
-import {
- bootstrapWorkerRuntime,
- runWorkerMessageLoop
-} from "./runtime_worker.ts";
+import { bootstrapWorkerRuntime } from "./runtime_worker.ts";
interface CompilerRequestCompile {
type: CompilerRequestType.Compile;
@@ -340,13 +337,11 @@ async function wasmCompilerOnMessage({
function bootstrapTsCompilerRuntime(): void {
bootstrapWorkerRuntime("TS");
globalThis.onmessage = tsCompilerOnMessage;
- runWorkerMessageLoop();
}
function bootstrapWasmCompilerRuntime(): void {
bootstrapWorkerRuntime("WASM");
globalThis.onmessage = wasmCompilerOnMessage;
- runWorkerMessageLoop();
}
Object.defineProperties(globalThis, {
diff --git a/cli/js/dispatch.ts b/cli/js/dispatch.ts
index 1a6b6528d..4493d3771 100644
--- a/cli/js/dispatch.ts
+++ b/cli/js/dispatch.ts
@@ -43,10 +43,10 @@ export let OP_REVOKE_PERMISSION: number;
export let OP_REQUEST_PERMISSION: number;
export let OP_CREATE_WORKER: number;
export let OP_HOST_POST_MESSAGE: number;
-export let OP_HOST_CLOSE_WORKER: number;
+export let OP_HOST_TERMINATE_WORKER: number;
export let OP_HOST_GET_MESSAGE: number;
export let OP_WORKER_POST_MESSAGE: number;
-export let OP_WORKER_GET_MESSAGE: number;
+export let OP_WORKER_CLOSE: number;
export let OP_RUN: number;
export let OP_RUN_STATUS: number;
export let OP_KILL: number;
diff --git a/cli/js/globals.ts b/cli/js/globals.ts
index 7cce739d5..53eb696ac 100644
--- a/cli/js/globals.ts
+++ b/cli/js/globals.ts
@@ -118,7 +118,6 @@ declare global {
var bootstrapWorkerRuntime:
| ((name: string) => Promise<void> | void)
| undefined;
- var runWorkerMessageLoop: (() => Promise<void> | void) | undefined;
var onerror:
| ((
msg: string,
diff --git a/cli/js/lib.deno.worker.d.ts b/cli/js/lib.deno.worker.d.ts
index 07955345c..3311d9457 100644
--- a/cli/js/lib.deno.worker.d.ts
+++ b/cli/js/lib.deno.worker.d.ts
@@ -37,7 +37,6 @@ declare const postMessage: typeof __workerMain.postMessage;
declare namespace __workerMain {
export let onmessage: (e: { data: any }) => void;
export function postMessage(data: any): void;
- export function getMessage(): Promise<any>;
export function close(): void;
export const name: string;
}
diff --git a/cli/js/main.ts b/cli/js/main.ts
index b48277960..fbebfefe4 100644
--- a/cli/js/main.ts
+++ b/cli/js/main.ts
@@ -1,9 +1,6 @@
// Copyright 2018-2020 the Deno authors. All rights reserved. MIT license.
import { bootstrapMainRuntime } from "./runtime_main.ts";
-import {
- bootstrapWorkerRuntime,
- runWorkerMessageLoop
-} from "./runtime_worker.ts";
+import { bootstrapWorkerRuntime } from "./runtime_worker.ts";
Object.defineProperties(globalThis, {
bootstrapMainRuntime: {
@@ -17,11 +14,5 @@ Object.defineProperties(globalThis, {
enumerable: false,
writable: false,
configurable: false
- },
- runWorkerMessageLoop: {
- value: runWorkerMessageLoop,
- enumerable: false,
- writable: false,
- configurable: false
}
});
diff --git a/cli/js/runtime_worker.ts b/cli/js/runtime_worker.ts
index 0dc65fdb6..a9ed8b924 100644
--- a/cli/js/runtime_worker.ts
+++ b/cli/js/runtime_worker.ts
@@ -3,12 +3,9 @@
// This module is the entry point for "worker" isolate, ie. the one
// that is created using `new Worker()` JS API.
//
-// It provides two functions that should be called by Rust:
+// It provides a single function that should be called by Rust:
// - `bootstrapWorkerRuntime` - must be called once, when Isolate is created.
// It sets up runtime by providing globals for `DedicatedWorkerScope`.
-// - `runWorkerMessageLoop` - starts receiving messages from parent worker,
-// can be called multiple times - eg. to restart worker execution after
-// exception occurred and was handled by parent worker
/* eslint-disable @typescript-eslint/no-explicit-any */
import {
@@ -20,13 +17,12 @@ import {
eventTargetProperties
} from "./globals.ts";
import * as dispatch from "./dispatch.ts";
-import { sendAsync, sendSync } from "./dispatch_json.ts";
+import { sendSync } from "./dispatch_json.ts";
import { log } from "./util.ts";
-import { TextDecoder, TextEncoder } from "./text_encoding.ts";
+import { TextEncoder } from "./text_encoding.ts";
import * as runtime from "./runtime.ts";
const encoder = new TextEncoder();
-const decoder = new TextDecoder();
// TODO(bartlomieju): remove these funtions
// Stuff for workers
@@ -39,62 +35,46 @@ export function postMessage(data: any): void {
sendSync(dispatch.OP_WORKER_POST_MESSAGE, {}, dataIntArray);
}
-export async function getMessage(): Promise<any> {
- log("getMessage");
- const res = await sendAsync(dispatch.OP_WORKER_GET_MESSAGE);
- if (res.data != null) {
- const dataIntArray = new Uint8Array(res.data);
- const dataJson = decoder.decode(dataIntArray);
- return JSON.parse(dataJson);
- } else {
- return null;
- }
-}
-
let isClosing = false;
let hasBootstrapped = false;
export function close(): void {
+ if (isClosing) {
+ return;
+ }
+
isClosing = true;
+ sendSync(dispatch.OP_WORKER_CLOSE);
}
-export async function runWorkerMessageLoop(): Promise<void> {
- while (!isClosing) {
- const data = await getMessage();
- if (data == null) {
- log("runWorkerMessageLoop got null message. quitting.");
- break;
- }
+export async function workerMessageRecvCallback(data: string): Promise<void> {
+ let result: void | Promise<void>;
+ const event = { data };
- let result: void | Promise<void>;
- const event = { data };
-
- try {
- if (!globalThis["onmessage"]) {
- break;
- }
+ try {
+ //
+ if (globalThis["onmessage"]) {
result = globalThis.onmessage!(event);
if (result && "then" in result) {
await result;
}
- if (!globalThis["onmessage"]) {
- break;
- }
- } catch (e) {
- if (globalThis["onerror"]) {
- const result = globalThis.onerror(
- e.message,
- e.fileName,
- e.lineNumber,
- e.columnNumber,
- e
- );
- if (result === true) {
- continue;
- }
+ }
+
+ // TODO: run the rest of liteners
+ } catch (e) {
+ if (globalThis["onerror"]) {
+ const result = globalThis.onerror(
+ e.message,
+ e.fileName,
+ e.lineNumber,
+ e.columnNumber,
+ e
+ );
+ if (result === true) {
+ return;
}
- throw e;
}
+ throw e;
}
}
@@ -102,8 +82,10 @@ export const workerRuntimeGlobalProperties = {
self: readOnly(globalThis),
onmessage: writable(onmessage),
onerror: writable(onerror),
+ // TODO: should be readonly?
close: nonEnumerable(close),
- postMessage: writable(postMessage)
+ postMessage: writable(postMessage),
+ workerMessageRecvCallback: nonEnumerable(workerMessageRecvCallback)
};
/**
diff --git a/cli/js/unit_tests.ts b/cli/js/unit_tests.ts
index 992169e55..a6435d183 100644
--- a/cli/js/unit_tests.ts
+++ b/cli/js/unit_tests.ts
@@ -59,6 +59,7 @@ import "./write_file_test.ts";
import "./performance_test.ts";
import "./permissions_test.ts";
import "./version_test.ts";
+import "./workers_test.ts";
import { runIfMain } from "../../std/testing/mod.ts";
diff --git a/cli/js/workers.ts b/cli/js/workers.ts
index fb63a3260..7b0c50336 100644
--- a/cli/js/workers.ts
+++ b/cli/js/workers.ts
@@ -38,19 +38,23 @@ function createWorker(
});
}
+function hostTerminateWorker(id: number): void {
+ sendSync(dispatch.OP_HOST_TERMINATE_WORKER, { id });
+}
+
function hostPostMessage(id: number, data: any): void {
const dataIntArray = encodeMessage(data);
sendSync(dispatch.OP_HOST_POST_MESSAGE, { id }, dataIntArray);
}
-async function hostGetMessage(id: number): Promise<any> {
- const res = await sendAsync(dispatch.OP_HOST_GET_MESSAGE, { id });
+interface WorkerEvent {
+ event: "error" | "msg" | "close";
+ data?: any;
+ error?: any;
+}
- if (res.data != null) {
- return decodeMessage(new Uint8Array(res.data));
- } else {
- return null;
- }
+async function hostGetMessage(id: number): Promise<any> {
+ return await sendAsync(dispatch.OP_HOST_GET_MESSAGE, { id });
}
export interface Worker {
@@ -72,6 +76,8 @@ export class WorkerImpl extends EventTarget implements Worker {
public onerror?: (e: any) => void;
public onmessage?: (data: any) => void;
public onmessageerror?: () => void;
+ private name: string;
+ private terminated = false;
constructor(specifier: string, options?: WorkerOptions) {
super();
@@ -88,6 +94,7 @@ export class WorkerImpl extends EventTarget implements Worker {
);
}
+ this.name = options?.name ?? "unknown";
const hasSourceCode = false;
const sourceCode = new Uint8Array();
@@ -139,42 +146,53 @@ export class WorkerImpl extends EventTarget implements Worker {
}
async poll(): Promise<void> {
- while (!this.isClosing) {
- const data = await hostGetMessage(this.id);
- if (data == null) {
- log("worker got null message. quitting.");
- break;
- }
- if (this.onmessage) {
- const event = { data };
- this.onmessage(event);
+ while (!this.terminated) {
+ const event = await hostGetMessage(this.id);
+
+ // If terminate was called then we ignore all messages
+ if (this.terminated) {
+ return;
}
- }
- /*
- while (true) {
- const result = await hostPollWorker(this.id);
+ const type = event.type;
- if (result.error) {
- if (!this.handleError(result.error)) {
- throw Error(result.error.message);
- } else {
- hostResumeWorker(this.id);
+ if (type === "msg") {
+ if (this.onmessage) {
+ const message = decodeMessage(new Uint8Array(event.data));
+ this.onmessage({ data: message });
}
- } else {
- this.isClosing = true;
- hostCloseWorker(this.id);
- break;
+ continue;
}
+
+ if (type === "error") {
+ if (!this.handleError(event.error)) {
+ throw Error(event.error.message);
+ }
+ continue;
+ }
+
+ if (type === "close") {
+ log(`Host got "close" message from worker: ${this.name}`);
+ this.terminated = true;
+ return;
+ }
+
+ throw new Error(`Unknown worker event: "${type}"`);
}
- */
}
postMessage(data: any): void {
+ if (this.terminated) {
+ return;
+ }
+
hostPostMessage(this.id, data);
}
terminate(): void {
- throw new Error("Not yet implemented");
+ if (!this.terminated) {
+ this.terminated = true;
+ hostTerminateWorker(this.id);
+ }
}
}
diff --git a/cli/js/workers_test.ts b/cli/js/workers_test.ts
new file mode 100644
index 000000000..9cb4f4a07
--- /dev/null
+++ b/cli/js/workers_test.ts
@@ -0,0 +1,84 @@
+// Copyright 2018-2020 the Deno authors. All rights reserved. MIT license.
+import { test, assert, assertEquals } from "./test_util.ts";
+
+export interface ResolvableMethods<T> {
+ resolve: (value?: T | PromiseLike<T>) => void;
+ // eslint-disable-next-line @typescript-eslint/no-explicit-any
+ reject: (reason?: any) => void;
+}
+
+export type Resolvable<T> = Promise<T> & ResolvableMethods<T>;
+
+export function createResolvable<T>(): Resolvable<T> {
+ let methods: ResolvableMethods<T>;
+ const promise = new Promise<T>((resolve, reject): void => {
+ methods = { resolve, reject };
+ });
+ // TypeScript doesn't know that the Promise callback occurs synchronously
+ // therefore use of not null assertion (`!`)
+ return Object.assign(promise, methods!) as Resolvable<T>;
+}
+
+test(async function workersBasic(): Promise<void> {
+ const promise = createResolvable();
+ const jsWorker = new Worker("../tests/subdir/test_worker.js", {
+ type: "module",
+ name: "jsWorker"
+ });
+ const tsWorker = new Worker("../tests/subdir/test_worker.ts", {
+ type: "module",
+ name: "tsWorker"
+ });
+
+ tsWorker.onmessage = (e): void => {
+ assertEquals(e.data, "Hello World");
+ promise.resolve();
+ };
+
+ jsWorker.onmessage = (e): void => {
+ assertEquals(e.data, "Hello World");
+ tsWorker.postMessage("Hello World");
+ };
+
+ jsWorker.onerror = (e: Event): void => {
+ e.preventDefault();
+ jsWorker.postMessage("Hello World");
+ };
+
+ jsWorker.postMessage("Hello World");
+ await promise;
+});
+
+test(async function nestedWorker(): Promise<void> {
+ const promise = createResolvable();
+
+ const nestedWorker = new Worker("../tests/subdir/nested_worker.js", {
+ type: "module",
+ name: "nested"
+ });
+
+ nestedWorker.onmessage = (e): void => {
+ assert(e.data.type !== "error");
+ promise.resolve();
+ };
+
+ nestedWorker.postMessage("Hello World");
+ await promise;
+});
+
+test(async function workerThrowsWhenExecuting(): Promise<void> {
+ const promise = createResolvable();
+
+ const throwingWorker = new Worker("../tests/subdir/throwing_worker.js", {
+ type: "module"
+ });
+
+ // eslint-disable-next-line @typescript-eslint/no-explicit-any
+ throwingWorker.onerror = (e: any): void => {
+ e.preventDefault();
+ assertEquals(e.message, "Uncaught Error: Thrown error");
+ promise.resolve();
+ };
+
+ await promise;
+});