From 79b3bc05d6de520f1df73face1744ae3d8be0bb8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bartek=20Iwa=C5=84czuk?= Date: Tue, 11 Feb 2020 10:04:59 +0100 Subject: workers: basic event loop (#3828) * establish basic event loop for workers * make "self.close()" inside worker * remove "runWorkerMessageLoop() - instead manually call global function in Rust when message arrives. This is done in preparation for structured clone * refactor "WorkerChannel" and use distinct structs for internal and external channels; "WorkerChannelsInternal" and "WorkerHandle" * move "State.worker_channels_internal" to "Worker.internal_channels" * add "WorkerEvent" enum for child->host communication; currently "Message(Buf)" and "Error(ErrBox)" variants are supported * add tests for nested workers * add tests for worker throwing error on startup --- cli/js/workers.ts | 80 ++++++++++++++++++++++++++++++++++--------------------- 1 file changed, 49 insertions(+), 31 deletions(-) (limited to 'cli/js/workers.ts') diff --git a/cli/js/workers.ts b/cli/js/workers.ts index fb63a3260..7b0c50336 100644 --- a/cli/js/workers.ts +++ b/cli/js/workers.ts @@ -38,19 +38,23 @@ function createWorker( }); } +function hostTerminateWorker(id: number): void { + sendSync(dispatch.OP_HOST_TERMINATE_WORKER, { id }); +} + function hostPostMessage(id: number, data: any): void { const dataIntArray = encodeMessage(data); sendSync(dispatch.OP_HOST_POST_MESSAGE, { id }, dataIntArray); } -async function hostGetMessage(id: number): Promise { - const res = await sendAsync(dispatch.OP_HOST_GET_MESSAGE, { id }); +interface WorkerEvent { + event: "error" | "msg" | "close"; + data?: any; + error?: any; +} - if (res.data != null) { - return decodeMessage(new Uint8Array(res.data)); - } else { - return null; - } +async function hostGetMessage(id: number): Promise { + return await sendAsync(dispatch.OP_HOST_GET_MESSAGE, { id }); } export interface Worker { @@ -72,6 +76,8 @@ export class WorkerImpl extends EventTarget implements Worker { public onerror?: (e: any) => void; public onmessage?: (data: any) => void; public onmessageerror?: () => void; + private name: string; + private terminated = false; constructor(specifier: string, options?: WorkerOptions) { super(); @@ -88,6 +94,7 @@ export class WorkerImpl extends EventTarget implements Worker { ); } + this.name = options?.name ?? "unknown"; const hasSourceCode = false; const sourceCode = new Uint8Array(); @@ -139,42 +146,53 @@ export class WorkerImpl extends EventTarget implements Worker { } async poll(): Promise { - while (!this.isClosing) { - const data = await hostGetMessage(this.id); - if (data == null) { - log("worker got null message. quitting."); - break; - } - if (this.onmessage) { - const event = { data }; - this.onmessage(event); + while (!this.terminated) { + const event = await hostGetMessage(this.id); + + // If terminate was called then we ignore all messages + if (this.terminated) { + return; } - } - /* - while (true) { - const result = await hostPollWorker(this.id); + const type = event.type; - if (result.error) { - if (!this.handleError(result.error)) { - throw Error(result.error.message); - } else { - hostResumeWorker(this.id); + if (type === "msg") { + if (this.onmessage) { + const message = decodeMessage(new Uint8Array(event.data)); + this.onmessage({ data: message }); } - } else { - this.isClosing = true; - hostCloseWorker(this.id); - break; + continue; } + + if (type === "error") { + if (!this.handleError(event.error)) { + throw Error(event.error.message); + } + continue; + } + + if (type === "close") { + log(`Host got "close" message from worker: ${this.name}`); + this.terminated = true; + return; + } + + throw new Error(`Unknown worker event: "${type}"`); } - */ } postMessage(data: any): void { + if (this.terminated) { + return; + } + hostPostMessage(this.id, data); } terminate(): void { - throw new Error("Not yet implemented"); + if (!this.terminated) { + this.terminated = true; + hostTerminateWorker(this.id); + } } } -- cgit v1.2.3