diff options
author | Tim Ramlot <42113979+inteon@users.noreply.github.com> | 2021-05-11 21:09:09 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2021-05-11 21:09:09 +0200 |
commit | 635253bd3a3895f49e6c9606beb852da22fee205 (patch) | |
tree | cec9d75354b4e985a376f888564ecb63c99f2643 /runtime/js | |
parent | 0d319161bc19a520df653bc0c8386f14a68efbdb (diff) |
feat(runtime/worker): Structured cloning worker message passing (#9323)
This commit upgrade "Worker.postMessage()" implementation to use
structured clone algorithm instead of non-spec compliant JSON serialization.
Diffstat (limited to 'runtime/js')
-rw-r--r-- | runtime/js/11_workers.js | 102 | ||||
-rw-r--r-- | runtime/js/99_main.js | 103 |
2 files changed, 81 insertions, 124 deletions
diff --git a/runtime/js/11_workers.js b/runtime/js/11_workers.js index 508dd46d4..dca83c818 100644 --- a/runtime/js/11_workers.js +++ b/runtime/js/11_workers.js @@ -39,26 +39,8 @@ return core.opAsync("op_host_get_message", id); } - const encoder = new TextEncoder(); const decoder = new TextDecoder(); - function encodeMessage(data) { - const dataJson = JSON.stringify(data); - return encoder.encode(dataJson); - } - - function decodeMessage(dataIntArray) { - // Temporary solution until structured clone arrives in v8. - // Current clone is made by parsing json to byte array and from byte array back to json. - // In that case "undefined" transforms to empty byte array, but empty byte array does not transform back to undefined. - // Thats why this special is statement is needed. - if (dataIntArray.length == 0) { - return undefined; - } - const dataJson = decoder.decode(dataIntArray); - return JSON.parse(dataJson); - } - /** * @param {string} permission * @return {boolean} @@ -211,18 +193,7 @@ this.#poll(); } - #handleMessage = (msgData) => { - let data; - try { - data = decodeMessage(new Uint8Array(msgData)); - } catch (e) { - const msgErrorEvent = new MessageEvent("messageerror", { - cancelable: false, - data, - }); - return; - } - + #handleMessage = (data) => { const msgEvent = new MessageEvent("message", { cancelable: false, data, @@ -253,56 +224,44 @@ #poll = async () => { while (!this.#terminated) { - const event = await hostGetMessage(this.#id); + const [type, data] = await hostGetMessage(this.#id); // If terminate was called then we ignore all messages if (this.#terminated) { return; } - const type = event.type; - - if (type === "terminalError") { - this.#terminated = true; - if (!this.#handleError(event.error)) { - if (globalThis instanceof Window) { - throw new Error("Unhandled error event reached main worker."); - } else { - core.opSync( - "op_host_unhandled_error", - event.error.message, - ); - } + switch (type) { + case 0: { // Message + const msg = core.deserialize(data); + this.#handleMessage(msg); + break; } - continue; - } - - if (type === "msg") { - this.#handleMessage(event.data); - continue; - } - - if (type === "error") { - if (!this.#handleError(event.error)) { - if (globalThis instanceof Window) { - throw new Error("Unhandled error event reached main worker."); - } else { - core.opSync( - "op_host_unhandled_error", - event.error.message, - ); + case 1: { // TerminalError + this.#terminated = true; + } /* falls through */ + case 2: { // Error + if (!this.#handleError(data)) { + if (globalThis instanceof Window) { + throw new Error("Unhandled error event reached main worker."); + } else { + core.opSync( + "op_worker_unhandled_error", + data.message, + ); + } } + break; + } + case 3: { // Close + log(`Host got "close" message from worker: ${this.#name}`); + this.#terminated = true; + return; + } + default: { + throw new Error(`Unknown worker event: "${type}"`); } - continue; - } - - if (type === "close") { - log(`Host got "close" message from worker: ${this.#name}`); - this.#terminated = true; - return; } - - throw new Error(`Unknown worker event: "${type}"`); } }; @@ -317,7 +276,8 @@ return; } - hostPostMessage(this.#id, encodeMessage(message)); + const bufferMsg = core.serialize(message); + hostPostMessage(this.#id, bufferMsg); } terminate() { diff --git a/runtime/js/99_main.js b/runtime/js/99_main.js index d2926bb1f..082c83593 100644 --- a/runtime/js/99_main.js +++ b/runtime/js/99_main.js @@ -67,7 +67,7 @@ delete Object.prototype.__proto__; } isClosing = true; - opCloseWorker(); + core.opSync("op_worker_close"); } // TODO(bartlomieju): remove these functions @@ -76,68 +76,64 @@ delete Object.prototype.__proto__; const onerror = () => {}; function postMessage(data) { - const dataJson = JSON.stringify(data); - const dataIntArray = encoder.encode(dataJson); - opPostMessage(dataIntArray); + const dataIntArray = core.serialize(data); + core.opSync("op_worker_post_message", null, dataIntArray); } let isClosing = false; - async function workerMessageRecvCallback(data) { - const msgEvent = new MessageEvent("message", { - cancelable: false, - data, - }); - - try { - if (globalThis["onmessage"]) { - const result = globalThis.onmessage(msgEvent); - if (result && "then" in result) { - await result; - } - } - globalThis.dispatchEvent(msgEvent); - } catch (e) { - let handled = false; - - const errorEvent = new ErrorEvent("error", { - cancelable: true, - message: e.message, - lineno: e.lineNumber ? e.lineNumber + 1 : undefined, - colno: e.columnNumber ? e.columnNumber + 1 : undefined, - filename: e.fileName, - error: null, + async function pollForMessages() { + while (!isClosing) { + const bufferMsg = await core.opAsync("op_worker_get_message"); + const data = core.deserialize(bufferMsg); + + const msgEvent = new MessageEvent("message", { + cancelable: false, + data, }); - if (globalThis["onerror"]) { - const ret = globalThis.onerror( - e.message, - e.fileName, - e.lineNumber, - e.columnNumber, - e, - ); - handled = ret === true; - } + try { + if (globalThis.onmessage) { + await globalThis.onmessage(msgEvent); + } + globalThis.dispatchEvent(msgEvent); + } catch (e) { + let handled = false; + + const errorEvent = new ErrorEvent("error", { + cancelable: true, + message: e.message, + lineno: e.lineNumber ? e.lineNumber + 1 : undefined, + colno: e.columnNumber ? e.columnNumber + 1 : undefined, + filename: e.fileName, + error: null, + }); + + if (globalThis["onerror"]) { + const ret = globalThis.onerror( + e.message, + e.fileName, + e.lineNumber, + e.columnNumber, + e, + ); + handled = ret === true; + } - globalThis.dispatchEvent(errorEvent); - if (errorEvent.defaultPrevented) { - handled = true; - } + globalThis.dispatchEvent(errorEvent); + if (errorEvent.defaultPrevented) { + handled = true; + } - if (!handled) { - throw e; + if (!handled) { + core.opSync( + "op_worker_unhandled_error", + e.message, + ); + } } } } - function opPostMessage(data) { - core.opSync("op_worker_post_message", null, data); - } - - function opCloseWorker() { - core.opSync("op_worker_close"); - } - function opMainModule() { return core.opSync("op_main_module"); } @@ -395,7 +391,6 @@ delete Object.prototype.__proto__; // TODO(bartlomieju): should be readonly? close: util.nonEnumerable(workerClose), postMessage: util.writable(postMessage), - workerMessageRecvCallback: util.nonEnumerable(workerMessageRecvCallback), }; let hasBootstrapped = false; @@ -506,6 +501,8 @@ delete Object.prototype.__proto__; location.setLocationHref(locationHref); registerErrors(); + pollForMessages(); + const internalSymbol = Symbol("Deno.internal"); const finalDenoNs = { |