diff options
Diffstat (limited to 'cli/js/workers.ts')
-rw-r--r-- | cli/js/workers.ts | 139 |
1 files changed, 117 insertions, 22 deletions
diff --git a/cli/js/workers.ts b/cli/js/workers.ts index 27f873100..d1d8f78e2 100644 --- a/cli/js/workers.ts +++ b/cli/js/workers.ts @@ -2,11 +2,12 @@ /* eslint-disable @typescript-eslint/no-explicit-any */ import * as dispatch from "./dispatch.ts"; import { sendAsync, sendSync } from "./dispatch_json.ts"; -import { log } from "./util.ts"; +import { log, createResolvable, Resolvable } from "./util.ts"; import { TextDecoder, TextEncoder } from "./text_encoding.ts"; import { window } from "./window.ts"; import { blobURLMap } from "./url.ts"; import { blobBytesWeakMap } from "./blob.ts"; +import { EventTarget } from "./event_target.ts"; const encoder = new TextEncoder(); const decoder = new TextDecoder(); @@ -26,7 +27,7 @@ function createWorker( includeDenoNamespace: boolean, hasSourceCode: boolean, sourceCode: Uint8Array -): number { +): { id: number; loaded: boolean } { return sendSync(dispatch.OP_CREATE_WORKER, { specifier, includeDenoNamespace, @@ -35,8 +36,20 @@ function createWorker( }); } -async function hostGetWorkerClosed(id: number): Promise<void> { - await sendAsync(dispatch.OP_HOST_GET_WORKER_CLOSED, { id }); +async function hostGetWorkerLoaded(id: number): Promise<any> { + return await sendAsync(dispatch.OP_HOST_GET_WORKER_LOADED, { id }); +} + +async function hostPollWorker(id: number): Promise<any> { + return await sendAsync(dispatch.OP_HOST_POLL_WORKER, { id }); +} + +function hostCloseWorker(id: number): void { + sendSync(dispatch.OP_HOST_CLOSE_WORKER, { id }); +} + +function hostResumeWorker(id: number): void { + sendSync(dispatch.OP_HOST_RESUME_WORKER, { id }); } function hostPostMessage(id: number, data: any): void { @@ -56,6 +69,7 @@ async function hostGetMessage(id: number): Promise<any> { // Stuff for workers export const onmessage: (e: { data: any }) => void = (): void => {}; +export const onerror: (e: { data: any }) => void = (): void => {}; export function postMessage(data: any): void { const dataIntArray = encodeMessage(data); @@ -88,25 +102,41 @@ export async function workerMain(): Promise<void> { break; } - if (window["onmessage"]) { - const event = { data }; - const result: void | Promise<void> = window.onmessage(event); + let result: void | Promise<void>; + const event = { data }; + + try { + result = window.onmessage(event); if (result && "then" in result) { await result; } - } - - if (!window["onmessage"]) { - break; + if (!window["onmessage"]) { + break; + } + } catch (e) { + if (window["onerror"]) { + const result = window.onerror( + e.message, + e.fileName, + e.lineNumber, + e.columnNumber, + e + ); + if (result === true) { + continue; + } + } + throw e; } } } export interface Worker { - onerror?: () => void; + onerror?: (e: any) => void; onmessage?: (e: { data: any }) => void; onmessageerror?: () => void; postMessage(data: any): void; + // TODO(bartlomieju): remove this closed: Promise<void>; } @@ -122,15 +152,18 @@ export interface DenoWorkerOptions extends WorkerOptions { noDenoNamespace?: boolean; } -export class WorkerImpl implements Worker { +export class WorkerImpl extends EventTarget implements Worker { private readonly id: number; private isClosing = false; - private readonly isClosedPromise: Promise<void>; - public onerror?: () => void; + private messageBuffer: any[] = []; + private ready = false; + private readonly isClosedPromise: Resolvable<void>; + public onerror?: (e: any) => void; public onmessage?: (data: any) => void; public onmessageerror?: () => void; constructor(specifier: string, options?: DenoWorkerOptions) { + super(); let hasSourceCode = false; let sourceCode = new Uint8Array(); @@ -152,24 +185,87 @@ export class WorkerImpl implements Worker { sourceCode = blobBytes!; } - this.id = createWorker( + const { id, loaded } = createWorker( specifier, includeDenoNamespace, hasSourceCode, sourceCode ); - this.run(); - this.isClosedPromise = hostGetWorkerClosed(this.id); - this.isClosedPromise.then((): void => { - this.isClosing = true; - }); + this.id = id; + this.ready = loaded; + this.isClosedPromise = createResolvable(); + this.poll(); } get closed(): Promise<void> { return this.isClosedPromise; } + private handleError(e: any): boolean { + const event = new window.Event("error", { cancelable: true }); + event.message = e.message; + event.lineNumber = e.lineNumber ? e.lineNumber + 1 : null; + event.columnNumber = e.columnNumber ? e.columnNumber + 1 : null; + event.fileName = e.fileName; + event.error = null; + + let handled = false; + if (this.onerror) { + this.onerror(event); + if (event.defaultPrevented) { + handled = true; + } + } + + return handled; + } + + async poll(): Promise<void> { + // If worker has not been immediately executed + // then let's await it's readiness + if (!this.ready) { + const result = await hostGetWorkerLoaded(this.id); + + if (result.error) { + if (!this.handleError(result.error)) { + throw new Error(result.error.message); + } + return; + } + } + + // drain messages + for (const data of this.messageBuffer) { + hostPostMessage(this.id, data); + } + this.messageBuffer = []; + this.ready = true; + this.run(); + + while (true) { + const result = await hostPollWorker(this.id); + + if (result.error) { + if (!this.handleError(result.error)) { + throw Error(result.error.message); + } else { + hostResumeWorker(this.id); + } + } else { + this.isClosing = true; + hostCloseWorker(this.id); + this.isClosedPromise.resolve(); + break; + } + } + } + postMessage(data: any): void { + if (!this.ready) { + this.messageBuffer.push(data); + return; + } + hostPostMessage(this.id, data); } @@ -180,7 +276,6 @@ export class WorkerImpl implements Worker { log("worker got null message. quitting."); break; } - // TODO(afinch7) stop this from eating messages before onmessage has been assigned if (this.onmessage) { const event = { data }; this.onmessage(event); |