From 7966bf14c062a05b1606a62c996890571454ecc8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bartek=20Iwa=C5=84czuk?= Date: Tue, 21 Jan 2020 09:49:47 +0100 Subject: refactor: split worker and worker host logic (#3722) * split ops/worker.rs into ops/worker_host.rs and ops/web_worker.rs * refactor js/workers.ts and factor out js/worker_main.ts - entry point for WebWorker runtime * BREAKING CHANGE: remove support for blob: URL in Worker * BREAKING CHANGE: remove Deno namespace support and noDenoNamespace option in Worker constructor * introduce WebWorker struct which is a stripped down version of cli::Worker --- cli/compilers/ts.rs | 2 +- cli/compilers/wasm.rs | 2 +- cli/js/compiler.ts | 2 +- cli/js/globals.ts | 11 +- cli/js/lib.deno_runtime.d.ts | 35 ++- cli/js/worker_main.ts | 98 ++++++++ cli/js/workers.ts | 128 +++-------- cli/lib.rs | 17 +- cli/ops/mod.rs | 3 +- cli/ops/web_worker.rs | 77 +++++++ cli/ops/worker_host.rs | 343 ++++++++++++++++++++++++++++ cli/ops/workers.rs | 394 --------------------------------- cli/state.rs | 14 +- cli/tests/026_workers.ts | 4 +- cli/tests/039_worker_deno_ns.ts | 6 +- cli/tests/integration_tests.rs | 4 + cli/tests/subdir/bench_worker.ts | 1 + cli/tests/workers_round_robin_bench.ts | 17 +- cli/tests/workers_startup_bench.ts | 9 +- cli/web_worker.rs | 145 ++++++++++++ cli/worker.rs | 74 +++---- core/isolate.rs | 2 + 22 files changed, 782 insertions(+), 606 deletions(-) create mode 100644 cli/js/worker_main.ts create mode 100644 cli/ops/web_worker.rs create mode 100644 cli/ops/worker_host.rs delete mode 100644 cli/ops/workers.rs create mode 100644 cli/web_worker.rs diff --git a/cli/compilers/ts.rs b/cli/compilers/ts.rs index e8abbcd27..037043368 100644 --- a/cli/compilers/ts.rs +++ b/cli/compilers/ts.rs @@ -231,7 +231,7 @@ impl TsCompiler { fn setup_worker(global_state: ThreadSafeGlobalState) -> Worker { let (int, ext) = ThreadSafeState::create_channels(); let worker_state = - ThreadSafeState::new(global_state.clone(), None, None, true, int) + ThreadSafeState::new(global_state.clone(), None, None, int) .expect("Unable to create worker state"); // Count how many times we start the compiler worker. diff --git a/cli/compilers/wasm.rs b/cli/compilers/wasm.rs index e3297283b..ca889be1f 100644 --- a/cli/compilers/wasm.rs +++ b/cli/compilers/wasm.rs @@ -45,7 +45,7 @@ impl WasmCompiler { fn setup_worker(global_state: ThreadSafeGlobalState) -> Worker { let (int, ext) = ThreadSafeState::create_channels(); let worker_state = - ThreadSafeState::new(global_state.clone(), None, None, true, int) + ThreadSafeState::new(global_state.clone(), None, None, int) .expect("Unable to create worker state"); // Count how many times we start the compiler worker. diff --git a/cli/js/compiler.ts b/cli/js/compiler.ts index 79b9fdaf6..7addfc5ca 100644 --- a/cli/js/compiler.ts +++ b/cli/js/compiler.ts @@ -32,7 +32,7 @@ import { fromTypeScriptDiagnostic } from "./diagnostics_util.ts"; import * as os from "./os.ts"; import { assert } from "./util.ts"; import * as util from "./util.ts"; -import { postMessage, workerClose, workerMain } from "./workers.ts"; +import { postMessage, workerClose, workerMain } from "./worker_main.ts"; const self = globalThis; diff --git a/cli/js/globals.ts b/cli/js/globals.ts index 5754002c0..0f364b8e0 100644 --- a/cli/js/globals.ts +++ b/cli/js/globals.ts @@ -21,6 +21,7 @@ import * as textEncoding from "./text_encoding.ts"; import * as timers from "./timers.ts"; import * as url from "./url.ts"; import * as urlSearchParams from "./url_search_params.ts"; +import * as workerRuntime from "./worker_main.ts"; import * as workers from "./workers.ts"; import * as performanceUtil from "./performance.ts"; import * as request from "./request.ts"; @@ -194,12 +195,12 @@ const globalProperties = { Response: nonEnumerable(fetchTypes.Response), performance: writable(new performanceUtil.Performance()), - onmessage: writable(workers.onmessage), - onerror: writable(workers.onerror), + onmessage: writable(workerRuntime.onmessage), + onerror: writable(workerRuntime.onerror), - workerMain: nonEnumerable(workers.workerMain), - workerClose: nonEnumerable(workers.workerClose), - postMessage: writable(workers.postMessage), + workerMain: nonEnumerable(workerRuntime.workerMain), + workerClose: nonEnumerable(workerRuntime.workerClose), + postMessage: writable(workerRuntime.postMessage), Worker: nonEnumerable(workers.WorkerImpl), [domTypes.eventTargetHost]: nonEnumerable(null), diff --git a/cli/js/lib.deno_runtime.d.ts b/cli/js/lib.deno_runtime.d.ts index 0fa183348..05553ffb7 100644 --- a/cli/js/lib.deno_runtime.d.ts +++ b/cli/js/lib.deno_runtime.d.ts @@ -2128,9 +2128,9 @@ declare interface Window { performance: __performanceUtil.Performance; onmessage: (e: { data: any }) => void; onerror: undefined | typeof onerror; - workerMain: typeof __workers.workerMain; - workerClose: typeof __workers.workerClose; - postMessage: typeof __workers.postMessage; + workerMain: typeof __workerMain.workerMain; + workerClose: typeof __workerMain.workerClose; + postMessage: typeof __workerMain.postMessage; Worker: typeof __workers.WorkerImpl; addEventListener: ( type: string, @@ -2187,9 +2187,9 @@ declare let onerror: e: Event ) => boolean | void) | undefined; -declare const workerMain: typeof __workers.workerMain; -declare const workerClose: typeof __workers.workerClose; -declare const postMessage: typeof __workers.postMessage; +declare const workerMain: typeof __workerMain.workerMain; +declare const workerClose: typeof __workerMain.workerClose; +declare const postMessage: typeof __workerMain.postMessage; declare const Worker: typeof __workers.WorkerImpl; declare const addEventListener: ( type: string, @@ -3437,31 +3437,25 @@ declare namespace __url { }; } -declare namespace __workers { - // @url js/workers.d.ts - - export function encodeMessage(data: any): Uint8Array; - export function decodeMessage(dataIntArray: Uint8Array): any; +declare namespace __workerMain { export let onmessage: (e: { data: any }) => void; export function postMessage(data: any): void; export function getMessage(): Promise; export let isClosing: boolean; export function workerClose(): void; export function workerMain(): Promise; +} + +declare namespace __workers { + // @url js/workers.d.ts export interface Worker { onerror?: (e: Event) => void; onmessage?: (e: { data: any }) => void; onmessageerror?: () => void; postMessage(data: any): void; - closed: Promise; } - export interface WorkerOptions {} - /** Extended Deno Worker initialization options. - * `noDenoNamespace` hides global `window.Deno` namespace for - * spawned worker and nested workers spawned by it (default: false). - */ - export interface DenoWorkerOptions extends WorkerOptions { - noDenoNamespace?: boolean; + export interface WorkerOptions { + type?: "classic" | "module"; } export class WorkerImpl implements Worker { private readonly id; @@ -3470,8 +3464,7 @@ declare namespace __workers { onerror?: (e: Event) => void; onmessage?: (data: any) => void; onmessageerror?: () => void; - constructor(specifier: string, options?: DenoWorkerOptions); - readonly closed: Promise; + constructor(specifier: string, options?: WorkerOptions); postMessage(data: any): void; private run; } diff --git a/cli/js/worker_main.ts b/cli/js/worker_main.ts new file mode 100644 index 000000000..cb70057ea --- /dev/null +++ b/cli/js/worker_main.ts @@ -0,0 +1,98 @@ +// Copyright 2018-2020 the Deno authors. All rights reserved. MIT license. +/* eslint-disable @typescript-eslint/no-explicit-any */ +import { core } from "./core.ts"; +import * as dispatch from "./dispatch.ts"; +import { sendAsync, sendSync } from "./dispatch_json.ts"; +import { log } from "./util.ts"; +import { TextDecoder, TextEncoder } from "./text_encoding.ts"; + +const encoder = new TextEncoder(); +const decoder = new TextDecoder(); + +function encodeMessage(data: any): Uint8Array { + const dataJson = JSON.stringify(data); + return encoder.encode(dataJson); +} + +function decodeMessage(dataIntArray: Uint8Array): any { + const dataJson = decoder.decode(dataIntArray); + return JSON.parse(dataJson); +} + +// Stuff for workers +export const onmessage: (e: { data: any }) => void = (): void => {}; +export const onerror: (e: { data: any }) => void = (): void => {}; + +export function postMessage(data: any): void { + const dataIntArray = encodeMessage(data); + sendSync(dispatch.OP_WORKER_POST_MESSAGE, {}, dataIntArray); +} + +export async function getMessage(): Promise { + log("getMessage"); + const res = await sendAsync(dispatch.OP_WORKER_GET_MESSAGE); + if (res.data != null) { + return decodeMessage(new Uint8Array(res.data)); + } else { + return null; + } +} + +export let isClosing = false; + +export function workerClose(): void { + isClosing = true; +} + +export async function workerMain(): Promise { + const ops = core.ops(); + // TODO(bartlomieju): this is a prototype, we should come up with + // something a bit more sophisticated + for (const [name, opId] of Object.entries(ops)) { + const opName = `OP_${name.toUpperCase()}`; + // Assign op ids to actual variables + // TODO(ry) This type casting is gross and should be fixed. + ((dispatch as unknown) as { [key: string]: number })[opName] = opId; + core.setAsyncHandler(opId, dispatch.getAsyncHandler(opName)); + } + + log("workerMain"); + + while (!isClosing) { + const data = await getMessage(); + if (data == null) { + log("workerMain got null message. quitting."); + break; + } + + let result: void | Promise; + const event = { data }; + + try { + if (!globalThis["onmessage"]) { + break; + } + result = globalThis.onmessage!(event); + if (result && "then" in result) { + await result; + } + if (!globalThis["onmessage"]) { + break; + } + } catch (e) { + if (globalThis["onerror"]) { + const result = globalThis.onerror( + e.message, + e.fileName, + e.lineNumber, + e.columnNumber, + e + ); + if (result === true) { + continue; + } + } + throw e; + } + } +} diff --git a/cli/js/workers.ts b/cli/js/workers.ts index 7e8219e19..60ef73da0 100644 --- a/cli/js/workers.ts +++ b/cli/js/workers.ts @@ -2,35 +2,35 @@ /* eslint-disable @typescript-eslint/no-explicit-any */ import * as dispatch from "./dispatch.ts"; import { sendAsync, sendSync } from "./dispatch_json.ts"; -import { log, createResolvable, Resolvable } from "./util.ts"; +import { log } from "./util.ts"; import { TextDecoder, TextEncoder } from "./text_encoding.ts"; +/* import { blobURLMap } from "./url.ts"; import { blobBytesWeakMap } from "./blob.ts"; +*/ import { Event } from "./event.ts"; import { EventTarget } from "./event_target.ts"; const encoder = new TextEncoder(); const decoder = new TextDecoder(); -export function encodeMessage(data: any): Uint8Array { +function encodeMessage(data: any): Uint8Array { const dataJson = JSON.stringify(data); return encoder.encode(dataJson); } -export function decodeMessage(dataIntArray: Uint8Array): any { +function decodeMessage(dataIntArray: Uint8Array): any { const dataJson = decoder.decode(dataIntArray); return JSON.parse(dataJson); } function createWorker( specifier: string, - includeDenoNamespace: boolean, hasSourceCode: boolean, sourceCode: Uint8Array ): { id: number; loaded: boolean } { return sendSync(dispatch.OP_CREATE_WORKER, { specifier, - includeDenoNamespace, hasSourceCode, sourceCode: new TextDecoder().decode(sourceCode) }); @@ -67,92 +67,15 @@ async function hostGetMessage(id: number): Promise { } } -// Stuff for workers -export const onmessage: (e: { data: any }) => void = (): void => {}; -export const onerror: (e: { data: any }) => void = (): void => {}; - -export function postMessage(data: any): void { - const dataIntArray = encodeMessage(data); - sendSync(dispatch.OP_WORKER_POST_MESSAGE, {}, dataIntArray); -} - -export async function getMessage(): Promise { - log("getMessage"); - const res = await sendAsync(dispatch.OP_WORKER_GET_MESSAGE); - if (res.data != null) { - return decodeMessage(new Uint8Array(res.data)); - } else { - return null; - } -} - -export let isClosing = false; - -export function workerClose(): void { - isClosing = true; -} - -export async function workerMain(): Promise { - log("workerMain"); - - while (!isClosing) { - const data = await getMessage(); - if (data == null) { - log("workerMain got null message. quitting."); - break; - } - - let result: void | Promise; - const event = { data }; - - try { - if (!globalThis["onmessage"]) { - break; - } - result = globalThis.onmessage!(event); - if (result && "then" in result) { - await result; - } - if (!globalThis["onmessage"]) { - break; - } - } catch (e) { - if (globalThis["onerror"]) { - const result = globalThis.onerror( - e.message, - e.fileName, - e.lineNumber, - e.columnNumber, - e - ); - if (result === true) { - continue; - } - } - throw e; - } - } -} - export interface Worker { onerror?: (e: any) => void; onmessage?: (e: { data: any }) => void; onmessageerror?: () => void; postMessage(data: any): void; - // TODO(bartlomieju): remove this - closed: Promise; } -// TODO(kevinkassimo): Maybe implement reasonable web worker options? -// eslint-disable-next-line @typescript-eslint/no-empty-interface -export interface WorkerOptions {} - -/** Extended Deno Worker initialization options. - * `noDenoNamespace` hides global `globalThis.Deno` namespace for - * spawned worker and nested workers spawned by it (default: false). - */ -export interface DenoWorkerOptions extends WorkerOptions { - noDenoNamespace?: boolean; +export interface WorkerOptions { + type?: "classic" | "module"; } export class WorkerImpl extends EventTarget implements Worker { @@ -160,20 +83,29 @@ export class WorkerImpl extends EventTarget implements Worker { private isClosing = false; private messageBuffer: any[] = []; private ready = false; - private readonly isClosedPromise: Resolvable; public onerror?: (e: any) => void; public onmessage?: (data: any) => void; public onmessageerror?: () => void; - constructor(specifier: string, options?: DenoWorkerOptions) { + constructor(specifier: string, options?: WorkerOptions) { super(); - let hasSourceCode = false; - let sourceCode = new Uint8Array(); - let includeDenoNamespace = true; - if (options && options.noDenoNamespace) { - includeDenoNamespace = false; + let type = "classic"; + + if (options?.type) { + type = options.type; + } + + if (type !== "module") { + throw new Error( + 'Not yet implemented: only "module" type workers are supported' + ); } + + const hasSourceCode = false; + const sourceCode = new Uint8Array(); + + /* TODO(bartlomieju): // Handle blob URL. if (specifier.startsWith("blob:")) { hasSourceCode = true; @@ -187,23 +119,14 @@ export class WorkerImpl extends EventTarget implements Worker { } sourceCode = blobBytes!; } + */ - const { id, loaded } = createWorker( - specifier, - includeDenoNamespace, - hasSourceCode, - sourceCode - ); + const { id, loaded } = createWorker(specifier, hasSourceCode, sourceCode); this.id = id; this.ready = loaded; - this.isClosedPromise = createResolvable(); this.poll(); } - get closed(): Promise { - return this.isClosedPromise; - } - private handleError(e: any): boolean { // TODO: this is being handled in a type unsafe way, it should be type safe // eslint-disable-next-line @typescript-eslint/no-explicit-any @@ -259,7 +182,6 @@ export class WorkerImpl extends EventTarget implements Worker { } else { this.isClosing = true; hostCloseWorker(this.id); - this.isClosedPromise.resolve(); break; } } diff --git a/cli/lib.rs b/cli/lib.rs index a57f224e2..e9a62375a 100644 --- a/cli/lib.rs +++ b/cli/lib.rs @@ -51,6 +51,7 @@ pub mod state; pub mod test_util; mod tokio_util; pub mod version; +mod web_worker; pub mod worker; use crate::deno_error::js_check; @@ -120,7 +121,6 @@ fn create_worker_and_state( global_state.clone(), None, global_state.main_module.clone(), - true, int, ) .map_err(deno_error::print_err_and_exit) @@ -346,16 +346,15 @@ fn bundle_command(flags: DenoFlags) { fn run_repl(flags: DenoFlags) { let (mut worker, _state) = create_worker_and_state(flags); - // Make repl continue to function under uncaught async errors. - worker.set_error_handler(Box::new(|err| { - eprintln!("{}", err.to_string()); - Ok(()) - })); - // Setup runtime. js_check(worker.execute("denoMain()")); let main_future = async move { - let result = worker.await; - js_check(result); + loop { + let result = worker.clone().await; + if let Err(err) = result { + eprintln!("{}", err.to_string()); + worker.clear_exception(); + } + } }; tokio_util::run(main_future); } diff --git a/cli/ops/mod.rs b/cli/ops/mod.rs index f93c5a060..203d1e17e 100644 --- a/cli/ops/mod.rs +++ b/cli/ops/mod.rs @@ -23,4 +23,5 @@ pub mod repl; pub mod resources; pub mod timers; pub mod tls; -pub mod workers; +pub mod web_worker; +pub mod worker_host; diff --git a/cli/ops/web_worker.rs b/cli/ops/web_worker.rs new file mode 100644 index 000000000..300a0dfd1 --- /dev/null +++ b/cli/ops/web_worker.rs @@ -0,0 +1,77 @@ +// Copyright 2018-2020 the Deno authors. All rights reserved. MIT license. +use super::dispatch_json::{JsonOp, Value}; +use crate::deno_error::DenoError; +use crate::deno_error::ErrorKind; +use crate::ops::json_op; +use crate::state::ThreadSafeState; +use deno_core::*; +use futures; +use futures::future::FutureExt; +use futures::sink::SinkExt; +use futures::stream::StreamExt; +use std; +use std::convert::From; +use std::future::Future; +use std::pin::Pin; +use std::task::Context; +use std::task::Poll; + +pub fn init(i: &mut Isolate, s: &ThreadSafeState) { + i.register_op( + "worker_post_message", + s.core_op(json_op(s.stateful_op(op_worker_post_message))), + ); + i.register_op( + "worker_get_message", + s.core_op(json_op(s.stateful_op(op_worker_get_message))), + ); +} + +struct GetMessageFuture { + state: ThreadSafeState, +} + +impl Future for GetMessageFuture { + type Output = Option; + + fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll { + let inner = self.get_mut(); + let mut channels = inner.state.worker_channels.lock().unwrap(); + let receiver = &mut channels.receiver; + receiver.poll_next_unpin(cx) + } +} + +/// Get message from host as guest worker +fn op_worker_get_message( + state: &ThreadSafeState, + _args: Value, + _data: Option, +) -> Result { + let op = GetMessageFuture { + state: state.clone(), + }; + + let op = async move { + let maybe_buf = op.await; + debug!("op_worker_get_message"); + Ok(json!({ "data": maybe_buf })) + }; + + Ok(JsonOp::Async(op.boxed())) +} + +/// Post message to host as guest worker +fn op_worker_post_message( + state: &ThreadSafeState, + _args: Value, + data: Option, +) -> Result { + let d = Vec::from(data.unwrap().as_ref()).into_boxed_slice(); + let mut channels = state.worker_channels.lock().unwrap(); + let sender = &mut channels.sender; + futures::executor::block_on(sender.send(d)) + .map_err(|e| DenoError::new(ErrorKind::Other, e.to_string()))?; + + Ok(JsonOp::Sync(json!({}))) +} diff --git a/cli/ops/worker_host.rs b/cli/ops/worker_host.rs new file mode 100644 index 000000000..c17dee444 --- /dev/null +++ b/cli/ops/worker_host.rs @@ -0,0 +1,343 @@ +// Copyright 2018-2020 the Deno authors. All rights reserved. MIT license. +use super::dispatch_json::{Deserialize, JsonOp, Value}; +use crate::deno_error::bad_resource; +use crate::deno_error::js_check; +use crate::deno_error::DenoError; +use crate::deno_error::ErrorKind; +use crate::deno_error::GetErrorKind; +use crate::fmt_errors::JSError; +use crate::ops::json_op; +use crate::startup_data; +use crate::state::ThreadSafeState; +use crate::web_worker::WebWorker; +use deno_core::*; +use futures; +use futures::channel::mpsc; +use futures::future::FutureExt; +use futures::future::TryFutureExt; +use futures::sink::SinkExt; +use futures::stream::StreamExt; +use std; +use std::convert::From; +use std::future::Future; +use std::pin::Pin; +use std::sync::atomic::Ordering; +use std::task::Context; +use std::task::Poll; + +pub fn init(i: &mut Isolate, s: &ThreadSafeState) { + i.register_op( + "create_worker", + s.core_op(json_op(s.stateful_op(op_create_worker))), + ); + i.register_op( + "host_get_worker_loaded", + s.core_op(json_op(s.stateful_op(op_host_get_worker_loaded))), + ); + i.register_op( + "host_poll_worker", + s.core_op(json_op(s.stateful_op(op_host_poll_worker))), + ); + i.register_op( + "host_close_worker", + s.core_op(json_op(s.stateful_op(op_host_close_worker))), + ); + i.register_op( + "host_resume_worker", + s.core_op(json_op(s.stateful_op(op_host_resume_worker))), + ); + i.register_op( + "host_post_message", + s.core_op(json_op(s.stateful_op(op_host_post_message))), + ); + i.register_op( + "host_get_message", + s.core_op(json_op(s.stateful_op(op_host_get_message))), + ); + i.register_op("metrics", s.core_op(json_op(s.stateful_op(op_metrics)))); +} + +struct GetMessageFuture { + state: ThreadSafeState, +} + +impl Future for GetMessageFuture { + type Output = Option; + + fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll { + let inner = self.get_mut(); + let mut channels = inner.state.worker_channels.lock().unwrap(); + let receiver = &mut channels.receiver; + receiver.poll_next_unpin(cx) + } +} + +#[derive(Deserialize)] +#[serde(rename_all = "camelCase")] +struct CreateWorkerArgs { + specifier: String, + has_source_code: bool, + source_code: String, +} + +/// Create worker as the host +fn op_create_worker( + state: &ThreadSafeState, + args: Value, + _data: Option, +) -> Result { + let args: CreateWorkerArgs = serde_json::from_value(args)?; + + let specifier = args.specifier.as_ref(); + let has_source_code = args.has_source_code; + let source_code = args.source_code; + + let parent_state = state.clone(); + + // TODO(bartlomieju): Isn't this wrong? + let mut module_specifier = ModuleSpecifier::resolve_url_or_path(specifier)?; + if !has_source_code { + if let Some(referrer) = parent_state.main_module.as_ref() { + let referrer = referrer.clone().to_string(); + module_specifier = ModuleSpecifier::resolve_import(specifier, &referrer)?; + } + } + + let (int, ext) = ThreadSafeState::create_channels(); + let child_state = ThreadSafeState::new( + state.global_state.clone(), + Some(parent_state.permissions.clone()), // by default share with parent + Some(module_specifier.clone()), + int, + )?; + // TODO: add a new option to make child worker not sharing permissions + // with parent (aka .clone(), requests from child won't reflect in parent) + let name = format!("USER-WORKER-{}", specifier); + let mut worker = + WebWorker::new(name, startup_data::deno_isolate_init(), child_state, ext); + js_check(worker.execute("workerMain()")); + + let worker_id = parent_state.add_child_worker(worker.clone()); + + // Has provided source code, execute immediately. + if has_source_code { + js_check(worker.execute(&source_code)); + return Ok(JsonOp::Sync(json!({"id": worker_id, "loaded": true}))); + } + + let (mut sender, receiver) = mpsc::channel::>(1); + + // TODO(bartlomieju): this future should be spawned on the separate thread, + // dedicated to that worker + let fut = async move { + let result = worker + .execute_mod_async(&module_specifier, None, false) + .await; + sender.send(result).await.expect("Failed to send message"); + } + .boxed(); + tokio::spawn(fut); + let mut table = state.loading_workers.lock().unwrap(); + table.insert(worker_id, receiver); + Ok(JsonOp::Sync(json!({"id": worker_id, "loaded": false}))) +} + +struct WorkerPollFuture { + state: ThreadSafeState, + rid: ResourceId, +} + +impl Future for WorkerPollFuture { + type Output = Result<(), ErrBox>; + + fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll { + let inner = self.get_mut(); + let mut workers_table = inner.state.workers.lock().unwrap(); + let maybe_worker = workers_table.get_mut(&inner.rid); + if maybe_worker.is_none() { + return Poll::Ready(Ok(())); + } + match maybe_worker.unwrap().poll_unpin(cx) { + Poll::Ready(Err(e)) => Poll::Ready(Err(e)), + Poll::Ready(Ok(_)) => Poll::Ready(Ok(())), + Poll::Pending => Poll::Pending, + } + } +} + +fn serialize_worker_result(result: Result<(), ErrBox>) -> Value { + if let Err(error) = result { + match error.kind() { + ErrorKind::JSError => { + let error = error.downcast::().unwrap(); + let exception: V8Exception = error.into(); + json!({"error": { + "message": exception.message, + "fileName": exception.script_resource_name, + "lineNumber": exception.line_number, + "columnNumber": exception.start_column, + }}) + } + _ => json!({"error": { + "message": error.to_string(), + }}), + } + } else { + json!({"ok": true}) + } +} + +#[derive(Deserialize)] +struct WorkerArgs { + id: i32, +} + +fn op_host_get_worker_loaded( + state: &ThreadSafeState, + args: Value, + _data: Option, +) -> Result { + let args: WorkerArgs = serde_json::from_value(args)?; + let id = args.id as u32; + let mut table = state.loading_workers.lock().unwrap(); + let mut receiver = table.remove(&id).unwrap(); + + let op = async move { + let result = receiver.next().await.unwrap(); + Ok(serialize_worker_result(result)) + }; + + Ok(JsonOp::Async(op.boxed())) +} + +fn op_host_poll_worker( + state: &ThreadSafeState, + args: Value, + _data: Option, +) -> Result { + let args: WorkerArgs = serde_json::from_value(args)?; + let id = args.id as u32; + let state_ = state.clone(); + + let future = WorkerPollFuture { + state: state.clone(), + rid: id, + }; + + let op = async move { + let result = future.await; + + if result.is_err() { + let mut workers_table = state_.workers.lock().unwrap(); + let worker = workers_table.get_mut(&id).unwrap(); + worker.clear_exception(); + } + + Ok(serialize_worker_result(result)) + }; + Ok(JsonOp::Async(op.boxed())) +} + +fn op_host_close_worker( + state: &ThreadSafeState, + args: Value, + _data: Option, +) -> Result { + let args: WorkerArgs = serde_json::from_value(args)?; + let id = args.id as u32; + let state_ = state.clone(); + + let mut workers_table = state_.workers.lock().unwrap(); + let maybe_worker = workers_table.remove(&id); + if let Some(worker) = maybe_worker { + let mut channels = worker.state.worker_channels.lock().unwrap(); + channels.sender.close_channel(); + channels.receiver.close(); + }; + + Ok(JsonOp::Sync(json!({}))) +} + +fn op_host_resume_worker( + state: &ThreadSafeState, + args: Value, + _data: Option, +) -> Result { + let args: WorkerArgs = serde_json::from_value(args)?; + let id = args.id as u32; + let state_ = state.clone(); + + let mut workers_table = state_.workers.lock().unwrap(); + let worker = workers_table.get_mut(&id).unwrap(); + js_check(worker.execute("workerMain()")); + Ok(JsonOp::Sync(json!({}))) +} + +#[derive(Deserialize)] +struct HostGetMessageArgs { + id: i32, +} + +/// Get message from guest worker as host +fn op_host_get_message( + state: &ThreadSafeState, + args: Value, + _data: Option, +) -> Result { + let args: HostGetMessageArgs = serde_json::from_value(args)?; + + let id = args.id as u32; + let mut table = state.workers.lock().unwrap(); + // TODO: don't return bad resource anymore + let worker = table.get_mut(&id).ok_or_else(bad_resource)?; + let fut = worker.get_message(); + + let op = async move { + let maybe_buf = fut.await.unwrap(); + Ok(json!({ "data": maybe_buf })) + }; + + Ok(JsonOp::Async(op.boxed())) +} + +#[derive(Deserialize)] +struct HostPostMessageArgs { + id: i32, +} + +/// Post message to guest worker as host +fn op_host_post_message( + state: &ThreadSafeState, + args: Value, + data: Option, +) -> Result { + let args: HostPostMessageArgs = serde_json::from_value(args)?; + let id = args.id as u32; + let msg = Vec::from(data.unwrap().as_ref()).into_boxed_slice(); + + debug!("post message to worker {}", id); + let mut table = state.workers.lock().unwrap(); + // TODO: don't return bad resource anymore + let worker = table.get_mut(&id).ok_or_else(bad_resource)?; + let fut = worker + .post_message(msg) + .map_err(|e| DenoError::new(ErrorKind::Other, e.to_string())); + futures::executor::block_on(fut)?; + Ok(JsonOp::Sync(json!({}))) +} + +fn op_metrics( + state: &ThreadSafeState, + _args: Value, + _zero_copy: Option, +) -> Result { + let m = &state.metrics; + + Ok(JsonOp::Sync(json!({ + "opsDispatched": m.ops_dispatched.load(Ordering::SeqCst) as u64, + "opsCompleted": m.ops_completed.load(Ordering::SeqCst) as u64, + "bytesSentControl": m.bytes_sent_control.load(Ordering::SeqCst) as u64, + "bytesSentData": m.bytes_sent_data.load(Ordering::SeqCst) as u64, + "bytesReceived": m.bytes_received.load(Ordering::SeqCst) as u64 + }))) +} diff --git a/cli/ops/workers.rs b/cli/ops/workers.rs deleted file mode 100644 index eeffb3930..000000000 --- a/cli/ops/workers.rs +++ /dev/null @@ -1,394 +0,0 @@ -// Copyright 2018-2020 the Deno authors. All rights reserved. MIT license. -use super::dispatch_json::{Deserialize, JsonOp, Value}; -use crate::deno_error::bad_resource; -use crate::deno_error::js_check; -use crate::deno_error::DenoError; -use crate::deno_error::ErrorKind; -use crate::deno_error::GetErrorKind; -use crate::fmt_errors::JSError; -use crate::ops::json_op; -use crate::startup_data; -use crate::state::ThreadSafeState; -use crate::worker::Worker; -use deno_core::*; -use futures; -use futures::channel::mpsc; -use futures::future::FutureExt; -use futures::future::TryFutureExt; -use futures::sink::SinkExt; -use futures::stream::StreamExt; -use std; -use std::convert::From; -use std::future::Future; -use std::pin::Pin; -use std::sync::atomic::Ordering; -use std::task::Context; -use std::task::Poll; - -pub fn init(i: &mut Isolate, s: &ThreadSafeState) { - i.register_op( - "create_worker", - s.core_op(json_op(s.stateful_op(op_create_worker))), - ); - i.register_op( - "host_get_worker_loaded", - s.core_op(json_op(s.stateful_op(op_host_get_worker_loaded))), - ); - i.register_op( - "host_poll_worker", - s.core_op(json_op(s.stateful_op(op_host_poll_worker))), - ); - i.register_op( - "host_close_worker", - s.core_op(json_op(s.stateful_op(op_host_close_worker))), - ); - i.register_op( - "host_resume_worker", - s.core_op(json_op(s.stateful_op(op_host_resume_worker))), - ); - i.register_op( - "host_post_message", - s.core_op(json_op(s.stateful_op(op_host_post_message))), - ); - i.register_op( - "host_get_message", - s.core_op(json_op(s.stateful_op(op_host_get_message))), - ); - // TODO: make sure these two ops are only accessible to appropriate Worker - i.register_op( - "worker_post_message", - s.core_op(json_op(s.stateful_op(op_worker_post_message))), - ); - i.register_op( - "worker_get_message", - s.core_op(json_op(s.stateful_op(op_worker_get_message))), - ); - i.register_op("metrics", s.core_op(json_op(s.stateful_op(op_metrics)))); -} - -struct GetMessageFuture { - state: ThreadSafeState, -} - -impl Future for GetMessageFuture { - type Output = Option; - - fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll { - let inner = self.get_mut(); - let mut channels = inner.state.worker_channels.lock().unwrap(); - let receiver = &mut channels.receiver; - receiver.poll_next_unpin(cx) - } -} - -/// Get message from host as guest worker -fn op_worker_get_message( - state: &ThreadSafeState, - _args: Value, - _data: Option, -) -> Result { - let op = GetMessageFuture { - state: state.clone(), - }; - - let op = async move { - let maybe_buf = op.await; - debug!("op_worker_get_message"); - Ok(json!({ "data": maybe_buf })) - }; - - Ok(JsonOp::Async(op.boxed())) -} - -/// Post message to host as guest worker -fn op_worker_post_message( - state: &ThreadSafeState, - _args: Value, - data: Option, -) -> Result { - let d = Vec::from(data.unwrap().as_ref()).into_boxed_slice(); - let mut channels = state.worker_channels.lock().unwrap(); - let sender = &mut channels.sender; - futures::executor::block_on(sender.send(d)) - .map_err(|e| DenoError::new(ErrorKind::Other, e.to_string()))?; - - Ok(JsonOp::Sync(json!({}))) -} - -#[derive(Deserialize)] -#[serde(rename_all = "camelCase")] -struct CreateWorkerArgs { - specifier: String, - include_deno_namespace: bool, - has_source_code: bool, - source_code: String, -} - -/// Create worker as the host -fn op_create_worker( - state: &ThreadSafeState, - args: Value, - _data: Option, -) -> Result { - let args: CreateWorkerArgs = serde_json::from_value(args)?; - - let specifier = args.specifier.as_ref(); - // Only include deno namespace if requested AND current worker - // has included namespace (to avoid escalation). - let include_deno_namespace = - args.include_deno_namespace && state.include_deno_namespace; - let has_source_code = args.has_source_code; - let source_code = args.source_code; - - let parent_state = state.clone(); - - // TODO(bartlomieju): Isn't this wrong? - let mut module_specifier = ModuleSpecifier::resolve_url_or_path(specifier)?; - if !has_source_code { - if let Some(referrer) = parent_state.main_module.as_ref() { - let referrer = referrer.clone().to_string(); - module_specifier = ModuleSpecifier::resolve_import(specifier, &referrer)?; - } - } - - let (int, ext) = ThreadSafeState::create_channels(); - let child_state = ThreadSafeState::new( - state.global_state.clone(), - Some(parent_state.permissions.clone()), // by default share with parent - Some(module_specifier.clone()), - include_deno_namespace, - int, - )?; - // TODO: add a new option to make child worker not sharing permissions - // with parent (aka .clone(), requests from child won't reflect in parent) - let name = format!("USER-WORKER-{}", specifier); - let deno_main_call = format!("denoMain({})", include_deno_namespace); - let mut worker = - Worker::new(name, startup_data::deno_isolate_init(), child_state, ext); - js_check(worker.execute(&deno_main_call)); - js_check(worker.execute("workerMain()")); - - let worker_id = parent_state.add_child_worker(worker.clone()); - - // Has provided source code, execute immediately. - if has_source_code { - js_check(worker.execute(&source_code)); - return Ok(JsonOp::Sync(json!({"id": worker_id, "loaded": true}))); - } - - let (mut sender, receiver) = mpsc::channel::>(1); - - // TODO(bartlomieju): this future should be spawned on the separate thread, - // dedicated to that worker - let fut = async move { - let result = worker - .execute_mod_async(&module_specifier, None, false) - .await; - sender.send(result).await.expect("Failed to send message"); - } - .boxed(); - tokio::spawn(fut); - let mut table = state.loading_workers.lock().unwrap(); - table.insert(worker_id, receiver); - Ok(JsonOp::Sync(json!({"id": worker_id, "loaded": false}))) -} - -struct WorkerPollFuture { - state: ThreadSafeState, - rid: ResourceId, -} - -impl Future for WorkerPollFuture { - type Output = Result<(), ErrBox>; - - fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll { - let inner = self.get_mut(); - let mut workers_table = inner.state.workers.lock().unwrap(); - let maybe_worker = workers_table.get_mut(&inner.rid); - if maybe_worker.is_none() { - return Poll::Ready(Ok(())); - } - match maybe_worker.unwrap().poll_unpin(cx) { - Poll::Ready(Err(e)) => Poll::Ready(Err(e)), - Poll::Ready(Ok(_)) => Poll::Ready(Ok(())), - Poll::Pending => Poll::Pending, - } - } -} - -fn serialize_worker_result(result: Result<(), ErrBox>) -> Value { - if let Err(error) = result { - match error.kind() { - ErrorKind::JSError => { - let error = error.downcast::().unwrap(); - let exception: V8Exception = error.into(); - json!({"error": { - "message": exception.message, - "fileName": exception.script_resource_name, - "lineNumber": exception.line_number, - "columnNumber": exception.start_column, - }}) - } - _ => json!({"error": { - "message": error.to_string(), - }}), - } - } else { - json!({"ok": true}) - } -} - -#[derive(Deserialize)] -struct WorkerArgs { - id: i32, -} - -fn op_host_get_worker_loaded( - state: &ThreadSafeState, - args: Value, - _data: Option, -) -> Result { - let args: WorkerArgs = serde_json::from_value(args)?; - let id = args.id as u32; - let mut table = state.loading_workers.lock().unwrap(); - let mut receiver = table.remove(&id).unwrap(); - - let op = async move { - let result = receiver.next().await.unwrap(); - Ok(serialize_worker_result(result)) - }; - - Ok(JsonOp::Async(op.boxed())) -} - -fn op_host_poll_worker( - state: &ThreadSafeState, - args: Value, - _data: Option, -) -> Result { - let args: WorkerArgs = serde_json::from_value(args)?; - let id = args.id as u32; - let state_ = state.clone(); - - let future = WorkerPollFuture { - state: state.clone(), - rid: id, - }; - - let op = async move { - let result = future.await; - - if result.is_err() { - let mut workers_table = state_.workers.lock().unwrap(); - let worker = workers_table.get_mut(&id).unwrap(); - worker.clear_exception(); - } - - Ok(serialize_worker_result(result)) - }; - Ok(JsonOp::Async(op.boxed())) -} - -fn op_host_close_worker( - state: &ThreadSafeState, - args: Value, - _data: Option, -) -> Result { - let args: WorkerArgs = serde_json::from_value(args)?; - let id = args.id as u32; - let state_ = state.clone(); - - let mut workers_table = state_.workers.lock().unwrap(); - let maybe_worker = workers_table.remove(&id); - if let Some(worker) = maybe_worker { - let mut channels = worker.state.worker_channels.lock().unwrap(); - channels.sender.close_channel(); - channels.receiver.close(); - }; - - Ok(JsonOp::Sync(json!({}))) -} - -fn op_host_resume_worker( - state: &ThreadSafeState, - args: Value, - _data: Option, -) -> Result { - let args: WorkerArgs = serde_json::from_value(args)?; - let id = args.id as u32; - let state_ = state.clone(); - - let mut workers_table = state_.workers.lock().unwrap(); - let worker = workers_table.get_mut(&id).unwrap(); - js_check(worker.execute("workerMain()")); - Ok(JsonOp::Sync(json!({}))) -} - -#[derive(Deserialize)] -struct HostGetMessageArgs { - id: i32, -} - -/// Get message from guest worker as host -fn op_host_get_message( - state: &ThreadSafeState, - args: Value, - _data: Option, -) -> Result { - let args: HostGetMessageArgs = serde_json::from_value(args)?; - - let id = args.id as u32; - let mut table = state.workers.lock().unwrap(); - // TODO: don't return bad resource anymore - let worker = table.get_mut(&id).ok_or_else(bad_resource)?; - let fut = worker.get_message(); - - let op = async move { - let maybe_buf = fut.await.unwrap(); - Ok(json!({ "data": maybe_buf })) - }; - - Ok(JsonOp::Async(op.boxed())) -} - -#[derive(Deserialize)] -struct HostPostMessageArgs { - id: i32, -} - -/// Post message to guest worker as host -fn op_host_post_message( - state: &ThreadSafeState, - args: Value, - data: Option, -) -> Result { - let args: HostPostMessageArgs = serde_json::from_value(args)?; - let id = args.id as u32; - let msg = Vec::from(data.unwrap().as_ref()).into_boxed_slice(); - - debug!("post message to worker {}", id); - let mut table = state.workers.lock().unwrap(); - // TODO: don't return bad resource anymore - let worker = table.get_mut(&id).ok_or_else(bad_resource)?; - let fut = worker - .post_message(msg) - .map_err(|e| DenoError::new(ErrorKind::Other, e.to_string())); - futures::executor::block_on(fut)?; - Ok(JsonOp::Sync(json!({}))) -} - -fn op_metrics( - state: &ThreadSafeState, - _args: Value, - _zero_copy: Option, -) -> Result { - let m = &state.metrics; - - Ok(JsonOp::Sync(json!({ - "opsDispatched": m.ops_dispatched.load(Ordering::SeqCst) as u64, - "opsCompleted": m.ops_completed.load(Ordering::SeqCst) as u64, - "bytesSentControl": m.bytes_sent_control.load(Ordering::SeqCst) as u64, - "bytesSentData": m.bytes_sent_data.load(Ordering::SeqCst) as u64, - "bytesReceived": m.bytes_received.load(Ordering::SeqCst) as u64 - }))) -} diff --git a/cli/state.rs b/cli/state.rs index acd661f25..4ad8241be 100644 --- a/cli/state.rs +++ b/cli/state.rs @@ -7,7 +7,7 @@ use crate::metrics::Metrics; use crate::ops::JsonOp; use crate::ops::MinimalOp; use crate::permissions::DenoPermissions; -use crate::worker::Worker; +use crate::web_worker::WebWorker; use crate::worker::WorkerChannels; use deno_core::Buf; use deno_core::CoreOp; @@ -44,7 +44,6 @@ pub struct ThreadSafeState(Arc); #[cfg_attr(feature = "cargo-clippy", allow(stutter))] pub struct State { pub global_state: ThreadSafeGlobalState, - pub modules: Arc>, pub permissions: Arc>, pub main_module: Option, pub worker_channels: Mutex, @@ -53,12 +52,11 @@ pub struct State { pub import_map: Option, pub metrics: Metrics, pub global_timer: Mutex, - pub workers: Mutex>, + pub workers: Mutex>, pub loading_workers: Mutex>>>, pub next_worker_id: AtomicUsize, pub start_time: Instant, pub seeded_rng: Option>, - pub include_deno_namespace: bool, pub resource_table: Mutex, } @@ -219,7 +217,6 @@ impl ThreadSafeState { // If Some(perm), use perm. Else copy from global_state. shared_permissions: Option>>, main_module: Option, - include_deno_namespace: bool, internal_channels: WorkerChannels, ) -> Result { let import_map: Option = @@ -233,7 +230,6 @@ impl ThreadSafeState { None => None, }; - let modules = Arc::new(Mutex::new(deno_core::Modules::new())); let permissions = if let Some(perm) = shared_permissions { perm } else { @@ -242,7 +238,6 @@ impl ThreadSafeState { let state = State { global_state, - modules, main_module, permissions, import_map, @@ -254,14 +249,14 @@ impl ThreadSafeState { next_worker_id: AtomicUsize::new(0), start_time: Instant::now(), seeded_rng, - include_deno_namespace, + resource_table: Mutex::new(ResourceTable::default()), }; Ok(ThreadSafeState(Arc::new(state))) } - pub fn add_child_worker(&self, worker: Worker) -> u32 { + pub fn add_child_worker(&self, worker: WebWorker) -> u32 { let worker_id = self.next_worker_id.fetch_add(1, Ordering::Relaxed) as u32; let mut workers_tl = self.workers.lock().unwrap(); workers_tl.insert(worker_id, worker); @@ -344,7 +339,6 @@ impl ThreadSafeState { ThreadSafeGlobalState::mock(argv), None, module_specifier, - true, internal_channels, ) .unwrap() diff --git a/cli/tests/026_workers.ts b/cli/tests/026_workers.ts index 7ac1a0f32..3043cc7b9 100644 --- a/cli/tests/026_workers.ts +++ b/cli/tests/026_workers.ts @@ -1,5 +1,5 @@ -const jsWorker = new Worker("./subdir/test_worker.js"); -const tsWorker = new Worker("./subdir/test_worker.ts"); +const jsWorker = new Worker("./subdir/test_worker.js", { type: "module" }); +const tsWorker = new Worker("./subdir/test_worker.ts", { type: "module" }); tsWorker.onmessage = (e): void => { console.log("Received ts: " + e.data); diff --git a/cli/tests/039_worker_deno_ns.ts b/cli/tests/039_worker_deno_ns.ts index 80ada4343..7cb7de7fb 100644 --- a/cli/tests/039_worker_deno_ns.ts +++ b/cli/tests/039_worker_deno_ns.ts @@ -1,7 +1,5 @@ -const w1 = new Worker("./039_worker_deno_ns/has_ns.ts"); -const w2 = new Worker("./039_worker_deno_ns/no_ns.ts", { - noDenoNamespace: true -}); +const w1 = new Worker("./039_worker_deno_ns/has_ns.ts", { type: "module" }); +const w2 = new Worker("./039_worker_deno_ns/no_ns.ts", { type: "module" }); let w1MsgCount = 0; let w2MsgCount = 0; w1.onmessage = (msg): void => { diff --git a/cli/tests/integration_tests.rs b/cli/tests/integration_tests.rs index eea1dd2c9..3e5073b45 100644 --- a/cli/tests/integration_tests.rs +++ b/cli/tests/integration_tests.rs @@ -301,6 +301,7 @@ itest!(_038_checkjs { output: "038_checkjs.js.out", }); +/* TODO(bartlomieju): itest!(_039_worker_deno_ns { args: "run --reload 039_worker_deno_ns.ts", output: "039_worker_deno_ns.ts.out", @@ -310,6 +311,7 @@ itest!(_040_worker_blob { args: "run --reload 040_worker_blob.ts", output: "040_worker_blob.ts.out", }); +*/ itest!(_041_dyn_import_eval { args: "eval import('./subdir/mod4.js').then(console.log)", @@ -567,12 +569,14 @@ itest!(error_type_definitions { output: "error_type_definitions.ts.out", }); +/* TODO(bartlomieju) itest!(error_worker_dynamic { args: "run --reload error_worker_dynamic.ts", check_stderr: true, exit_code: 1, output: "error_worker_dynamic.ts.out", }); +*/ itest!(exit_error42 { exit_code: 42, diff --git a/cli/tests/subdir/bench_worker.ts b/cli/tests/subdir/bench_worker.ts index 094cefb80..696a84b9f 100644 --- a/cli/tests/subdir/bench_worker.ts +++ b/cli/tests/subdir/bench_worker.ts @@ -14,6 +14,7 @@ onmessage = function(e): void { postMessage({ cmdId }); break; case 3: // Close + postMessage({ cmdId: 3 }); workerClose(); break; } diff --git a/cli/tests/workers_round_robin_bench.ts b/cli/tests/workers_round_robin_bench.ts index 992ce38dc..e8f5b2d30 100644 --- a/cli/tests/workers_round_robin_bench.ts +++ b/cli/tests/workers_round_robin_bench.ts @@ -37,12 +37,11 @@ function handleAsyncMsgFromWorker( async function main(): Promise { const workers: Array<[Map>, Worker]> = []; for (let i = 1; i <= workerCount; ++i) { - const worker = new Worker("./subdir/bench_worker.ts"); - const promise = new Promise((resolve): void => { - worker.onmessage = (e): void => { - if (e.data.cmdId === 0) resolve(); - }; - }); + const worker = new Worker("./subdir/bench_worker.ts", { type: "module" }); + const promise = createResolvable(); + worker.onmessage = (e): void => { + if (e.data.cmdId === 0) promise.resolve(); + }; worker.postMessage({ cmdId: 0, action: 2 }); await promise; workers.push([new Map(), worker]); @@ -66,8 +65,12 @@ async function main(): Promise { } } for (const [, worker] of workers) { + const promise = createResolvable(); + worker.onmessage = (e): void => { + if (e.data.cmdId === 3) promise.resolve(); + }; worker.postMessage({ action: 3 }); - await worker.closed; // Required to avoid a cmdId not in table error. + await promise; } console.log("Finished!"); } diff --git a/cli/tests/workers_startup_bench.ts b/cli/tests/workers_startup_bench.ts index 5d2c24b89..60c15a4b1 100644 --- a/cli/tests/workers_startup_bench.ts +++ b/cli/tests/workers_startup_bench.ts @@ -4,7 +4,7 @@ const workerCount = 50; async function bench(): Promise { const workers: Worker[] = []; for (let i = 1; i <= workerCount; ++i) { - const worker = new Worker("./subdir/bench_worker.ts"); + const worker = new Worker("./subdir/bench_worker.ts", { type: "module" }); const promise = new Promise((resolve): void => { worker.onmessage = (e): void => { if (e.data.cmdId === 0) resolve(); @@ -16,8 +16,13 @@ async function bench(): Promise { } console.log("Done creating workers closing workers!"); for (const worker of workers) { + const promise = new Promise((resolve): void => { + worker.onmessage = (e): void => { + if (e.data.cmdId === 3) resolve(); + }; + }); worker.postMessage({ action: 3 }); - await worker.closed; // Required to avoid a cmdId not in table error. + await promise; } console.log("Finished!"); } diff --git a/cli/web_worker.rs b/cli/web_worker.rs new file mode 100644 index 000000000..f933cbdc4 --- /dev/null +++ b/cli/web_worker.rs @@ -0,0 +1,145 @@ +// Copyright 2018-2020 the Deno authors. All rights reserved. MIT license. +use crate::fmt_errors::JSError; +use crate::ops; +use crate::state::ThreadSafeState; +use crate::worker::WorkerChannels; +use crate::worker::WorkerReceiver; +use deno_core; +use deno_core::Buf; +use deno_core::ErrBox; +use deno_core::ModuleSpecifier; +use deno_core::StartupData; +use futures::future::FutureExt; +use futures::future::TryFutureExt; +use futures::sink::SinkExt; +use futures::task::AtomicWaker; +use std::env; +use std::future::Future; +use std::pin::Pin; +use std::sync::Arc; +use std::sync::Mutex; +use std::task::Context; +use std::task::Poll; +use tokio::sync::Mutex as AsyncMutex; +use url::Url; + +#[derive(Clone)] +pub struct WebWorker { + pub name: String, + pub isolate: Arc>>, + pub state: ThreadSafeState, + external_channels: Arc>, +} + +impl WebWorker { + pub fn new( + name: String, + startup_data: StartupData, + state: ThreadSafeState, + external_channels: WorkerChannels, + ) -> Self { + let mut isolate = + deno_core::EsIsolate::new(Box::new(state.clone()), startup_data, false); + + ops::web_worker::init(&mut isolate, &state); + ops::worker_host::init(&mut isolate, &state); + + let global_state_ = state.global_state.clone(); + isolate.set_js_error_create(move |v8_exception| { + JSError::from_v8_exception(v8_exception, &global_state_.ts_compiler) + }); + + Self { + name, + isolate: Arc::new(AsyncMutex::new(isolate)), + state, + external_channels: Arc::new(Mutex::new(external_channels)), + } + } + + /// Same as execute2() but the filename defaults to "$CWD/__anonymous__". + pub fn execute(&mut self, js_source: &str) -> Result<(), ErrBox> { + let path = env::current_dir().unwrap().join("__anonymous__"); + let url = Url::from_file_path(path).unwrap(); + self.execute2(url.as_str(), js_source) + } + + /// Executes the provided JavaScript source code. The js_filename argument is + /// provided only for debugging purposes. + fn execute2( + &mut self, + js_filename: &str, + js_source: &str, + ) -> Result<(), ErrBox> { + let mut isolate = self.isolate.try_lock().unwrap(); + isolate.execute(js_filename, js_source) + } + + /// Executes the provided JavaScript module. + /// + /// Takes ownership of the isolate behind mutex. + pub async fn execute_mod_async( + &mut self, + module_specifier: &ModuleSpecifier, + maybe_code: Option, + is_prefetch: bool, + ) -> Result<(), ErrBox> { + let specifier = module_specifier.to_string(); + let worker = self.clone(); + + let mut isolate = self.isolate.lock().await; + let id = isolate.load_module(&specifier, maybe_code).await?; + worker.state.global_state.progress.done(); + + if !is_prefetch { + return isolate.mod_evaluate(id); + } + + Ok(()) + } + + /// Post message to worker as a host. + /// + /// This method blocks current thread. + pub fn post_message( + &self, + buf: Buf, + ) -> impl Future> { + let channels = self.external_channels.lock().unwrap(); + let mut sender = channels.sender.clone(); + async move { + let result = sender.send(buf).map_err(ErrBox::from).await; + drop(sender); + result + } + } + + /// Get message from worker as a host. + pub fn get_message(&self) -> WorkerReceiver { + WorkerReceiver { + channels: self.external_channels.clone(), + } + } + + pub fn clear_exception(&mut self) { + let mut isolate = self.isolate.try_lock().unwrap(); + isolate.clear_exception(); + } +} + +impl Future for WebWorker { + type Output = Result<(), ErrBox>; + + fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll { + let inner = self.get_mut(); + let waker = AtomicWaker::new(); + waker.register(cx.waker()); + match inner.isolate.try_lock() { + Ok(mut isolate) => isolate.poll_unpin(cx), + Err(_) => { + waker.wake(); + Poll::Pending + } + } + } +} diff --git a/cli/worker.rs b/cli/worker.rs index 7faf17e60..4601a6021 100644 --- a/cli/worker.rs +++ b/cli/worker.rs @@ -48,55 +48,42 @@ impl Worker { state: ThreadSafeState, external_channels: WorkerChannels, ) -> Self { - let isolate = Arc::new(AsyncMutex::new(deno_core::EsIsolate::new( - Box::new(state.clone()), - startup_data, - false, - ))); - { - let mut i = isolate.try_lock().unwrap(); - let op_registry = i.op_registry.clone(); - - ops::compiler::init(&mut i, &state); - ops::errors::init(&mut i, &state); - ops::fetch::init(&mut i, &state); - ops::files::init(&mut i, &state); - ops::fs::init(&mut i, &state); - ops::io::init(&mut i, &state); - ops::plugins::init(&mut i, &state, op_registry); - ops::net::init(&mut i, &state); - ops::tls::init(&mut i, &state); - ops::os::init(&mut i, &state); - ops::permissions::init(&mut i, &state); - ops::process::init(&mut i, &state); - ops::random::init(&mut i, &state); - ops::repl::init(&mut i, &state); - ops::resources::init(&mut i, &state); - ops::timers::init(&mut i, &state); - ops::workers::init(&mut i, &state); - - let global_state_ = state.global_state.clone(); - i.set_js_error_create(move |v8_exception| { - JSError::from_v8_exception(v8_exception, &global_state_.ts_compiler) - }) - } + let mut isolate = + deno_core::EsIsolate::new(Box::new(state.clone()), startup_data, false); + let op_registry = isolate.op_registry.clone(); + + ops::compiler::init(&mut isolate, &state); + ops::errors::init(&mut isolate, &state); + ops::fetch::init(&mut isolate, &state); + ops::files::init(&mut isolate, &state); + ops::fs::init(&mut isolate, &state); + ops::io::init(&mut isolate, &state); + ops::plugins::init(&mut isolate, &state, op_registry); + ops::net::init(&mut isolate, &state); + ops::tls::init(&mut isolate, &state); + ops::os::init(&mut isolate, &state); + ops::permissions::init(&mut isolate, &state); + ops::process::init(&mut isolate, &state); + ops::random::init(&mut isolate, &state); + ops::repl::init(&mut isolate, &state); + ops::resources::init(&mut isolate, &state); + ops::timers::init(&mut isolate, &state); + ops::worker_host::init(&mut isolate, &state); + ops::web_worker::init(&mut isolate, &state); + + let global_state_ = state.global_state.clone(); + isolate.set_js_error_create(move |v8_exception| { + JSError::from_v8_exception(v8_exception, &global_state_.ts_compiler) + }); Self { name, - isolate, + isolate: Arc::new(AsyncMutex::new(isolate)), state, external_channels: Arc::new(Mutex::new(external_channels)), } } - pub fn set_error_handler( - &mut self, - handler: Box Result<(), ErrBox>>, - ) { - let mut i = self.isolate.try_lock().unwrap(); - i.set_error_handler(handler); - } - /// Same as execute2() but the filename defaults to "$CWD/__anonymous__". pub fn execute(&mut self, js_source: &str) -> Result<(), ErrBox> { let path = env::current_dir().unwrap().join("__anonymous__"); @@ -188,7 +175,7 @@ impl Future for Worker { /// that will return message received from worker or None /// if worker's channel has been closed. pub struct WorkerReceiver { - channels: Arc>, + pub channels: Arc>, } impl Future for WorkerReceiver { @@ -255,7 +242,6 @@ mod tests { global_state, None, Some(module_specifier.clone()), - true, int, ) .unwrap(); @@ -299,7 +285,6 @@ mod tests { global_state, None, Some(module_specifier.clone()), - true, int, ) .unwrap(); @@ -342,7 +327,6 @@ mod tests { global_state.clone(), None, Some(module_specifier.clone()), - true, int, ) .unwrap(); diff --git a/core/isolate.rs b/core/isolate.rs index 5617caa86..f734f687c 100644 --- a/core/isolate.rs +++ b/core/isolate.rs @@ -440,6 +440,8 @@ impl Isolate { isolate.exit(); } + // TODO(bartlomieju): `error_handler` should be removed + #[allow(dead_code)] pub fn set_error_handler(&mut self, handler: Box) { self.error_handler = Some(handler); } -- cgit v1.2.3