diff options
author | andy finch <andyfinch7@gmail.com> | 2019-04-01 15:09:59 -0400 |
---|---|---|
committer | Ryan Dahl <ry@tinyclouds.org> | 2019-04-01 15:09:59 -0400 |
commit | b0a23beb8fae964be3cdd8c23c38af66257d34c7 (patch) | |
tree | 8f7875c8ca059dfb0a3ade4da7bfb94e57d6e1aa /js/workers.ts | |
parent | 659acadf77fdbeef8579a37839a464feb408437a (diff) |
Add web worker JS API (#1993)
* Refactored the way worker polling is scheduled and errors are handled.
* Share the worker future as a Shared
Diffstat (limited to 'js/workers.ts')
-rw-r--r-- | js/workers.ts | 147 |
1 files changed, 133 insertions, 14 deletions
diff --git a/js/workers.ts b/js/workers.ts index bdfbed640..601ffa0b1 100644 --- a/js/workers.ts +++ b/js/workers.ts @@ -1,33 +1,110 @@ // Copyright 2018-2019 the Deno authors. All rights reserved. MIT license. -import * as dispatch from "./dispatch"; +/* eslint-disable @typescript-eslint/no-explicit-any */ +import { sendAsync, sendSync } from "./dispatch"; import * as msg from "gen/cli/msg_generated"; import * as flatbuffers from "./flatbuffers"; import { assert, log } from "./util"; +import { TextDecoder, TextEncoder } from "./text_encoding"; import { window } from "./window"; -export async function postMessage(data: Uint8Array): Promise<void> { +const encoder = new TextEncoder(); +const decoder = new TextDecoder(); + +export function encodeMessage(data: any): Uint8Array { + const dataJson = JSON.stringify(data); + return encoder.encode(dataJson); +} + +export function decodeMessage(dataIntArray: Uint8Array): any { + const dataJson = decoder.decode(dataIntArray); + return JSON.parse(dataJson); +} + +function createWorker(specifier: string): number { + const builder = flatbuffers.createBuilder(); + const specifier_ = builder.createString(specifier); + msg.CreateWorker.startCreateWorker(builder); + msg.CreateWorker.addSpecifier(builder, specifier_); + const inner = msg.CreateWorker.endCreateWorker(builder); + const baseRes = sendSync(builder, msg.Any.CreateWorker, inner); + assert(baseRes != null); + assert( + msg.Any.CreateWorkerRes === baseRes!.innerType(), + `base.innerType() unexpectedly is ${baseRes!.innerType()}` + ); + const res = new msg.CreateWorkerRes(); + assert(baseRes!.inner(res) != null); + return res.rid(); +} + +async function hostGetWorkerClosed(rid: number): Promise<void> { + const builder = flatbuffers.createBuilder(); + msg.HostGetWorkerClosed.startHostGetWorkerClosed(builder); + msg.HostGetWorkerClosed.addRid(builder, rid); + const inner = msg.HostGetWorkerClosed.endHostGetWorkerClosed(builder); + await sendAsync(builder, msg.Any.HostGetWorkerClosed, inner); +} + +function hostPostMessage(rid: number, data: any): void { + const dataIntArray = encodeMessage(data); + const builder = flatbuffers.createBuilder(); + msg.HostPostMessage.startHostPostMessage(builder); + msg.HostPostMessage.addRid(builder, rid); + const inner = msg.HostPostMessage.endHostPostMessage(builder); + const baseRes = sendSync( + builder, + msg.Any.HostPostMessage, + inner, + dataIntArray + ); + assert(baseRes != null); +} + +async function hostGetMessage(rid: number): Promise<any> { + const builder = flatbuffers.createBuilder(); + msg.HostGetMessage.startHostGetMessage(builder); + msg.HostGetMessage.addRid(builder, rid); + const inner = msg.HostGetMessage.endHostGetMessage(builder); + const baseRes = await sendAsync(builder, msg.Any.HostGetMessage, inner); + assert(baseRes != null); + assert( + msg.Any.HostGetMessageRes === baseRes!.innerType(), + `base.innerType() unexpectedly is ${baseRes!.innerType()}` + ); + const res = new msg.HostGetMessageRes(); + assert(baseRes!.inner(res) != null); + + const dataArray = res.dataArray(); + if (dataArray != null) { + return decodeMessage(dataArray); + } else { + return null; + } +} + +// Stuff for workers +export let onmessage: (e: { data: any }) => void = (): void => {}; + +export function postMessage(data: any): void { + const dataIntArray = encodeMessage(data); const builder = flatbuffers.createBuilder(); msg.WorkerPostMessage.startWorkerPostMessage(builder); const inner = msg.WorkerPostMessage.endWorkerPostMessage(builder); - const baseRes = await dispatch.sendAsync( + const baseRes = sendSync( builder, msg.Any.WorkerPostMessage, inner, - data + dataIntArray ); assert(baseRes != null); } -export async function getMessage(): Promise<null | Uint8Array> { +export async function getMessage(): Promise<any> { log("getMessage"); const builder = flatbuffers.createBuilder(); msg.WorkerGetMessage.startWorkerGetMessage(builder); const inner = msg.WorkerGetMessage.endWorkerGetMessage(builder); - const baseRes = await dispatch.sendAsync( - builder, - msg.Any.WorkerGetMessage, - inner - ); + const baseRes = await sendAsync(builder, msg.Any.WorkerGetMessage, inner); assert(baseRes != null); assert( msg.Any.WorkerGetMessageRes === baseRes!.innerType(), @@ -37,14 +114,14 @@ export async function getMessage(): Promise<null | Uint8Array> { assert(baseRes!.inner(res) != null); const dataArray = res.dataArray(); - if (dataArray == null) { - return null; + if (dataArray != null) { + return decodeMessage(dataArray); } else { - return new Uint8Array(dataArray!); + return null; } } -let isClosing = false; +export let isClosing = false; export function workerClose(): void { isClosing = true; @@ -67,3 +144,45 @@ export async function workerMain(): Promise<void> { } } } + +export interface Worker { + onerror?: () => void; + onmessage?: (e: { data: any }) => void; + onmessageerror?: () => void; + postMessage(data: any): void; +} + +export class WorkerImpl implements Worker { + private readonly rid: number; + private isClosing: boolean = false; + public onerror?: () => void; + public onmessage?: (data: any) => void; + public onmessageerror?: () => void; + + constructor(specifier: string) { + this.rid = createWorker(specifier); + this.run(); + hostGetWorkerClosed(this.rid).then(() => { + this.isClosing = true; + }); + } + + postMessage(data: any): void { + hostPostMessage(this.rid, data); + } + + private async run(): Promise<void> { + while (!this.isClosing) { + const data = await hostGetMessage(this.rid); + if (data == null) { + log("worker got null message. quitting."); + break; + } + // TODO(afinch7) stop this from eating messages before onmessage has been assigned + if (this.onmessage) { + const event = { data }; + this.onmessage(event); + } + } + } +} |