diff options
author | Bartek IwaĆczuk <biwanczuk@gmail.com> | 2020-01-21 09:49:47 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-01-21 09:49:47 +0100 |
commit | 7966bf14c062a05b1606a62c996890571454ecc8 (patch) | |
tree | 65bede64b47707c3accc80d0bb18e99840c639f7 /cli | |
parent | c90036ab88bb1ae6b9c87d5e368f56d8c8afab69 (diff) |
refactor: split worker and worker host logic (#3722)
* split ops/worker.rs into ops/worker_host.rs and ops/web_worker.rs
* refactor js/workers.ts and factor out js/worker_main.ts - entry point for WebWorker runtime
* BREAKING CHANGE: remove support for blob: URL in Worker
* BREAKING CHANGE: remove Deno namespace support and noDenoNamespace option in Worker constructor
* introduce WebWorker struct which is a stripped down version of cli::Worker
Diffstat (limited to 'cli')
-rw-r--r-- | cli/compilers/ts.rs | 2 | ||||
-rw-r--r-- | cli/compilers/wasm.rs | 2 | ||||
-rw-r--r-- | cli/js/compiler.ts | 2 | ||||
-rw-r--r-- | cli/js/globals.ts | 11 | ||||
-rw-r--r-- | cli/js/lib.deno_runtime.d.ts | 35 | ||||
-rw-r--r-- | cli/js/worker_main.ts | 98 | ||||
-rw-r--r-- | cli/js/workers.ts | 128 | ||||
-rw-r--r-- | cli/lib.rs | 17 | ||||
-rw-r--r-- | cli/ops/mod.rs | 3 | ||||
-rw-r--r-- | cli/ops/web_worker.rs | 77 | ||||
-rw-r--r-- | cli/ops/worker_host.rs (renamed from cli/ops/workers.rs) | 55 | ||||
-rw-r--r-- | cli/state.rs | 14 | ||||
-rw-r--r-- | cli/tests/026_workers.ts | 4 | ||||
-rw-r--r-- | cli/tests/039_worker_deno_ns.ts | 6 | ||||
-rw-r--r-- | cli/tests/integration_tests.rs | 4 | ||||
-rw-r--r-- | cli/tests/subdir/bench_worker.ts | 1 | ||||
-rw-r--r-- | cli/tests/workers_round_robin_bench.ts | 17 | ||||
-rw-r--r-- | cli/tests/workers_startup_bench.ts | 9 | ||||
-rw-r--r-- | cli/web_worker.rs | 145 | ||||
-rw-r--r-- | cli/worker.rs | 74 |
20 files changed, 439 insertions, 265 deletions
diff --git a/cli/compilers/ts.rs b/cli/compilers/ts.rs index e8abbcd27..037043368 100644 --- a/cli/compilers/ts.rs +++ b/cli/compilers/ts.rs @@ -231,7 +231,7 @@ impl TsCompiler { fn setup_worker(global_state: ThreadSafeGlobalState) -> Worker { let (int, ext) = ThreadSafeState::create_channels(); let worker_state = - ThreadSafeState::new(global_state.clone(), None, None, true, int) + ThreadSafeState::new(global_state.clone(), None, None, int) .expect("Unable to create worker state"); // Count how many times we start the compiler worker. diff --git a/cli/compilers/wasm.rs b/cli/compilers/wasm.rs index e3297283b..ca889be1f 100644 --- a/cli/compilers/wasm.rs +++ b/cli/compilers/wasm.rs @@ -45,7 +45,7 @@ impl WasmCompiler { fn setup_worker(global_state: ThreadSafeGlobalState) -> Worker { let (int, ext) = ThreadSafeState::create_channels(); let worker_state = - ThreadSafeState::new(global_state.clone(), None, None, true, int) + ThreadSafeState::new(global_state.clone(), None, None, int) .expect("Unable to create worker state"); // Count how many times we start the compiler worker. diff --git a/cli/js/compiler.ts b/cli/js/compiler.ts index 79b9fdaf6..7addfc5ca 100644 --- a/cli/js/compiler.ts +++ b/cli/js/compiler.ts @@ -32,7 +32,7 @@ import { fromTypeScriptDiagnostic } from "./diagnostics_util.ts"; import * as os from "./os.ts"; import { assert } from "./util.ts"; import * as util from "./util.ts"; -import { postMessage, workerClose, workerMain } from "./workers.ts"; +import { postMessage, workerClose, workerMain } from "./worker_main.ts"; const self = globalThis; diff --git a/cli/js/globals.ts b/cli/js/globals.ts index 5754002c0..0f364b8e0 100644 --- a/cli/js/globals.ts +++ b/cli/js/globals.ts @@ -21,6 +21,7 @@ import * as textEncoding from "./text_encoding.ts"; import * as timers from "./timers.ts"; import * as url from "./url.ts"; import * as urlSearchParams from "./url_search_params.ts"; +import * as workerRuntime from "./worker_main.ts"; import * as workers from "./workers.ts"; import * as performanceUtil from "./performance.ts"; import * as request from "./request.ts"; @@ -194,12 +195,12 @@ const globalProperties = { Response: nonEnumerable(fetchTypes.Response), performance: writable(new performanceUtil.Performance()), - onmessage: writable(workers.onmessage), - onerror: writable(workers.onerror), + onmessage: writable(workerRuntime.onmessage), + onerror: writable(workerRuntime.onerror), - workerMain: nonEnumerable(workers.workerMain), - workerClose: nonEnumerable(workers.workerClose), - postMessage: writable(workers.postMessage), + workerMain: nonEnumerable(workerRuntime.workerMain), + workerClose: nonEnumerable(workerRuntime.workerClose), + postMessage: writable(workerRuntime.postMessage), Worker: nonEnumerable(workers.WorkerImpl), [domTypes.eventTargetHost]: nonEnumerable(null), diff --git a/cli/js/lib.deno_runtime.d.ts b/cli/js/lib.deno_runtime.d.ts index 0fa183348..05553ffb7 100644 --- a/cli/js/lib.deno_runtime.d.ts +++ b/cli/js/lib.deno_runtime.d.ts @@ -2128,9 +2128,9 @@ declare interface Window { performance: __performanceUtil.Performance; onmessage: (e: { data: any }) => void; onerror: undefined | typeof onerror; - workerMain: typeof __workers.workerMain; - workerClose: typeof __workers.workerClose; - postMessage: typeof __workers.postMessage; + workerMain: typeof __workerMain.workerMain; + workerClose: typeof __workerMain.workerClose; + postMessage: typeof __workerMain.postMessage; Worker: typeof __workers.WorkerImpl; addEventListener: ( type: string, @@ -2187,9 +2187,9 @@ declare let onerror: e: Event ) => boolean | void) | undefined; -declare const workerMain: typeof __workers.workerMain; -declare const workerClose: typeof __workers.workerClose; -declare const postMessage: typeof __workers.postMessage; +declare const workerMain: typeof __workerMain.workerMain; +declare const workerClose: typeof __workerMain.workerClose; +declare const postMessage: typeof __workerMain.postMessage; declare const Worker: typeof __workers.WorkerImpl; declare const addEventListener: ( type: string, @@ -3437,31 +3437,25 @@ declare namespace __url { }; } -declare namespace __workers { - // @url js/workers.d.ts - - export function encodeMessage(data: any): Uint8Array; - export function decodeMessage(dataIntArray: Uint8Array): any; +declare namespace __workerMain { export let onmessage: (e: { data: any }) => void; export function postMessage(data: any): void; export function getMessage(): Promise<any>; export let isClosing: boolean; export function workerClose(): void; export function workerMain(): Promise<void>; +} + +declare namespace __workers { + // @url js/workers.d.ts export interface Worker { onerror?: (e: Event) => void; onmessage?: (e: { data: any }) => void; onmessageerror?: () => void; postMessage(data: any): void; - closed: Promise<void>; } - export interface WorkerOptions {} - /** Extended Deno Worker initialization options. - * `noDenoNamespace` hides global `window.Deno` namespace for - * spawned worker and nested workers spawned by it (default: false). - */ - export interface DenoWorkerOptions extends WorkerOptions { - noDenoNamespace?: boolean; + export interface WorkerOptions { + type?: "classic" | "module"; } export class WorkerImpl implements Worker { private readonly id; @@ -3470,8 +3464,7 @@ declare namespace __workers { onerror?: (e: Event) => void; onmessage?: (data: any) => void; onmessageerror?: () => void; - constructor(specifier: string, options?: DenoWorkerOptions); - readonly closed: Promise<void>; + constructor(specifier: string, options?: WorkerOptions); postMessage(data: any): void; private run; } diff --git a/cli/js/worker_main.ts b/cli/js/worker_main.ts new file mode 100644 index 000000000..cb70057ea --- /dev/null +++ b/cli/js/worker_main.ts @@ -0,0 +1,98 @@ +// Copyright 2018-2020 the Deno authors. All rights reserved. MIT license. +/* eslint-disable @typescript-eslint/no-explicit-any */ +import { core } from "./core.ts"; +import * as dispatch from "./dispatch.ts"; +import { sendAsync, sendSync } from "./dispatch_json.ts"; +import { log } from "./util.ts"; +import { TextDecoder, TextEncoder } from "./text_encoding.ts"; + +const encoder = new TextEncoder(); +const decoder = new TextDecoder(); + +function encodeMessage(data: any): Uint8Array { + const dataJson = JSON.stringify(data); + return encoder.encode(dataJson); +} + +function decodeMessage(dataIntArray: Uint8Array): any { + const dataJson = decoder.decode(dataIntArray); + return JSON.parse(dataJson); +} + +// Stuff for workers +export const onmessage: (e: { data: any }) => void = (): void => {}; +export const onerror: (e: { data: any }) => void = (): void => {}; + +export function postMessage(data: any): void { + const dataIntArray = encodeMessage(data); + sendSync(dispatch.OP_WORKER_POST_MESSAGE, {}, dataIntArray); +} + +export async function getMessage(): Promise<any> { + log("getMessage"); + const res = await sendAsync(dispatch.OP_WORKER_GET_MESSAGE); + if (res.data != null) { + return decodeMessage(new Uint8Array(res.data)); + } else { + return null; + } +} + +export let isClosing = false; + +export function workerClose(): void { + isClosing = true; +} + +export async function workerMain(): Promise<void> { + const ops = core.ops(); + // TODO(bartlomieju): this is a prototype, we should come up with + // something a bit more sophisticated + for (const [name, opId] of Object.entries(ops)) { + const opName = `OP_${name.toUpperCase()}`; + // Assign op ids to actual variables + // TODO(ry) This type casting is gross and should be fixed. + ((dispatch as unknown) as { [key: string]: number })[opName] = opId; + core.setAsyncHandler(opId, dispatch.getAsyncHandler(opName)); + } + + log("workerMain"); + + while (!isClosing) { + const data = await getMessage(); + if (data == null) { + log("workerMain got null message. quitting."); + break; + } + + let result: void | Promise<void>; + const event = { data }; + + try { + if (!globalThis["onmessage"]) { + break; + } + result = globalThis.onmessage!(event); + if (result && "then" in result) { + await result; + } + if (!globalThis["onmessage"]) { + break; + } + } catch (e) { + if (globalThis["onerror"]) { + const result = globalThis.onerror( + e.message, + e.fileName, + e.lineNumber, + e.columnNumber, + e + ); + if (result === true) { + continue; + } + } + throw e; + } + } +} diff --git a/cli/js/workers.ts b/cli/js/workers.ts index 7e8219e19..60ef73da0 100644 --- a/cli/js/workers.ts +++ b/cli/js/workers.ts @@ -2,35 +2,35 @@ /* eslint-disable @typescript-eslint/no-explicit-any */ import * as dispatch from "./dispatch.ts"; import { sendAsync, sendSync } from "./dispatch_json.ts"; -import { log, createResolvable, Resolvable } from "./util.ts"; +import { log } from "./util.ts"; import { TextDecoder, TextEncoder } from "./text_encoding.ts"; +/* import { blobURLMap } from "./url.ts"; import { blobBytesWeakMap } from "./blob.ts"; +*/ import { Event } from "./event.ts"; import { EventTarget } from "./event_target.ts"; const encoder = new TextEncoder(); const decoder = new TextDecoder(); -export function encodeMessage(data: any): Uint8Array { +function encodeMessage(data: any): Uint8Array { const dataJson = JSON.stringify(data); return encoder.encode(dataJson); } -export function decodeMessage(dataIntArray: Uint8Array): any { +function decodeMessage(dataIntArray: Uint8Array): any { const dataJson = decoder.decode(dataIntArray); return JSON.parse(dataJson); } function createWorker( specifier: string, - includeDenoNamespace: boolean, hasSourceCode: boolean, sourceCode: Uint8Array ): { id: number; loaded: boolean } { return sendSync(dispatch.OP_CREATE_WORKER, { specifier, - includeDenoNamespace, hasSourceCode, sourceCode: new TextDecoder().decode(sourceCode) }); @@ -67,92 +67,15 @@ async function hostGetMessage(id: number): Promise<any> { } } -// Stuff for workers -export const onmessage: (e: { data: any }) => void = (): void => {}; -export const onerror: (e: { data: any }) => void = (): void => {}; - -export function postMessage(data: any): void { - const dataIntArray = encodeMessage(data); - sendSync(dispatch.OP_WORKER_POST_MESSAGE, {}, dataIntArray); -} - -export async function getMessage(): Promise<any> { - log("getMessage"); - const res = await sendAsync(dispatch.OP_WORKER_GET_MESSAGE); - if (res.data != null) { - return decodeMessage(new Uint8Array(res.data)); - } else { - return null; - } -} - -export let isClosing = false; - -export function workerClose(): void { - isClosing = true; -} - -export async function workerMain(): Promise<void> { - log("workerMain"); - - while (!isClosing) { - const data = await getMessage(); - if (data == null) { - log("workerMain got null message. quitting."); - break; - } - - let result: void | Promise<void>; - const event = { data }; - - try { - if (!globalThis["onmessage"]) { - break; - } - result = globalThis.onmessage!(event); - if (result && "then" in result) { - await result; - } - if (!globalThis["onmessage"]) { - break; - } - } catch (e) { - if (globalThis["onerror"]) { - const result = globalThis.onerror( - e.message, - e.fileName, - e.lineNumber, - e.columnNumber, - e - ); - if (result === true) { - continue; - } - } - throw e; - } - } -} - export interface Worker { onerror?: (e: any) => void; onmessage?: (e: { data: any }) => void; onmessageerror?: () => void; postMessage(data: any): void; - // TODO(bartlomieju): remove this - closed: Promise<void>; } -// TODO(kevinkassimo): Maybe implement reasonable web worker options? -// eslint-disable-next-line @typescript-eslint/no-empty-interface -export interface WorkerOptions {} - -/** Extended Deno Worker initialization options. - * `noDenoNamespace` hides global `globalThis.Deno` namespace for - * spawned worker and nested workers spawned by it (default: false). - */ -export interface DenoWorkerOptions extends WorkerOptions { - noDenoNamespace?: boolean; +export interface WorkerOptions { + type?: "classic" | "module"; } export class WorkerImpl extends EventTarget implements Worker { @@ -160,20 +83,29 @@ export class WorkerImpl extends EventTarget implements Worker { private isClosing = false; private messageBuffer: any[] = []; private ready = false; - private readonly isClosedPromise: Resolvable<void>; public onerror?: (e: any) => void; public onmessage?: (data: any) => void; public onmessageerror?: () => void; - constructor(specifier: string, options?: DenoWorkerOptions) { + constructor(specifier: string, options?: WorkerOptions) { super(); - let hasSourceCode = false; - let sourceCode = new Uint8Array(); - let includeDenoNamespace = true; - if (options && options.noDenoNamespace) { - includeDenoNamespace = false; + let type = "classic"; + + if (options?.type) { + type = options.type; + } + + if (type !== "module") { + throw new Error( + 'Not yet implemented: only "module" type workers are supported' + ); } + + const hasSourceCode = false; + const sourceCode = new Uint8Array(); + + /* TODO(bartlomieju): // Handle blob URL. if (specifier.startsWith("blob:")) { hasSourceCode = true; @@ -187,23 +119,14 @@ export class WorkerImpl extends EventTarget implements Worker { } sourceCode = blobBytes!; } + */ - const { id, loaded } = createWorker( - specifier, - includeDenoNamespace, - hasSourceCode, - sourceCode - ); + const { id, loaded } = createWorker(specifier, hasSourceCode, sourceCode); this.id = id; this.ready = loaded; - this.isClosedPromise = createResolvable(); this.poll(); } - get closed(): Promise<void> { - return this.isClosedPromise; - } - private handleError(e: any): boolean { // TODO: this is being handled in a type unsafe way, it should be type safe // eslint-disable-next-line @typescript-eslint/no-explicit-any @@ -259,7 +182,6 @@ export class WorkerImpl extends EventTarget implements Worker { } else { this.isClosing = true; hostCloseWorker(this.id); - this.isClosedPromise.resolve(); break; } } diff --git a/cli/lib.rs b/cli/lib.rs index a57f224e2..e9a62375a 100644 --- a/cli/lib.rs +++ b/cli/lib.rs @@ -51,6 +51,7 @@ pub mod state; pub mod test_util; mod tokio_util; pub mod version; +mod web_worker; pub mod worker; use crate::deno_error::js_check; @@ -120,7 +121,6 @@ fn create_worker_and_state( global_state.clone(), None, global_state.main_module.clone(), - true, int, ) .map_err(deno_error::print_err_and_exit) @@ -346,16 +346,15 @@ fn bundle_command(flags: DenoFlags) { fn run_repl(flags: DenoFlags) { let (mut worker, _state) = create_worker_and_state(flags); - // Make repl continue to function under uncaught async errors. - worker.set_error_handler(Box::new(|err| { - eprintln!("{}", err.to_string()); - Ok(()) - })); - // Setup runtime. js_check(worker.execute("denoMain()")); let main_future = async move { - let result = worker.await; - js_check(result); + loop { + let result = worker.clone().await; + if let Err(err) = result { + eprintln!("{}", err.to_string()); + worker.clear_exception(); + } + } }; tokio_util::run(main_future); } diff --git a/cli/ops/mod.rs b/cli/ops/mod.rs index f93c5a060..203d1e17e 100644 --- a/cli/ops/mod.rs +++ b/cli/ops/mod.rs @@ -23,4 +23,5 @@ pub mod repl; pub mod resources; pub mod timers; pub mod tls; -pub mod workers; +pub mod web_worker; +pub mod worker_host; diff --git a/cli/ops/web_worker.rs b/cli/ops/web_worker.rs new file mode 100644 index 000000000..300a0dfd1 --- /dev/null +++ b/cli/ops/web_worker.rs @@ -0,0 +1,77 @@ +// Copyright 2018-2020 the Deno authors. All rights reserved. MIT license. +use super::dispatch_json::{JsonOp, Value}; +use crate::deno_error::DenoError; +use crate::deno_error::ErrorKind; +use crate::ops::json_op; +use crate::state::ThreadSafeState; +use deno_core::*; +use futures; +use futures::future::FutureExt; +use futures::sink::SinkExt; +use futures::stream::StreamExt; +use std; +use std::convert::From; +use std::future::Future; +use std::pin::Pin; +use std::task::Context; +use std::task::Poll; + +pub fn init(i: &mut Isolate, s: &ThreadSafeState) { + i.register_op( + "worker_post_message", + s.core_op(json_op(s.stateful_op(op_worker_post_message))), + ); + i.register_op( + "worker_get_message", + s.core_op(json_op(s.stateful_op(op_worker_get_message))), + ); +} + +struct GetMessageFuture { + state: ThreadSafeState, +} + +impl Future for GetMessageFuture { + type Output = Option<Buf>; + + fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> { + let inner = self.get_mut(); + let mut channels = inner.state.worker_channels.lock().unwrap(); + let receiver = &mut channels.receiver; + receiver.poll_next_unpin(cx) + } +} + +/// Get message from host as guest worker +fn op_worker_get_message( + state: &ThreadSafeState, + _args: Value, + _data: Option<PinnedBuf>, +) -> Result<JsonOp, ErrBox> { + let op = GetMessageFuture { + state: state.clone(), + }; + + let op = async move { + let maybe_buf = op.await; + debug!("op_worker_get_message"); + Ok(json!({ "data": maybe_buf })) + }; + + Ok(JsonOp::Async(op.boxed())) +} + +/// Post message to host as guest worker +fn op_worker_post_message( + state: &ThreadSafeState, + _args: Value, + data: Option<PinnedBuf>, +) -> Result<JsonOp, ErrBox> { + let d = Vec::from(data.unwrap().as_ref()).into_boxed_slice(); + let mut channels = state.worker_channels.lock().unwrap(); + let sender = &mut channels.sender; + futures::executor::block_on(sender.send(d)) + .map_err(|e| DenoError::new(ErrorKind::Other, e.to_string()))?; + + Ok(JsonOp::Sync(json!({}))) +} diff --git a/cli/ops/workers.rs b/cli/ops/worker_host.rs index eeffb3930..c17dee444 100644 --- a/cli/ops/workers.rs +++ b/cli/ops/worker_host.rs @@ -9,7 +9,7 @@ use crate::fmt_errors::JSError; use crate::ops::json_op; use crate::startup_data; use crate::state::ThreadSafeState; -use crate::worker::Worker; +use crate::web_worker::WebWorker; use deno_core::*; use futures; use futures::channel::mpsc; @@ -54,15 +54,6 @@ pub fn init(i: &mut Isolate, s: &ThreadSafeState) { "host_get_message", s.core_op(json_op(s.stateful_op(op_host_get_message))), ); - // TODO: make sure these two ops are only accessible to appropriate Worker - i.register_op( - "worker_post_message", - s.core_op(json_op(s.stateful_op(op_worker_post_message))), - ); - i.register_op( - "worker_get_message", - s.core_op(json_op(s.stateful_op(op_worker_get_message))), - ); i.register_op("metrics", s.core_op(json_op(s.stateful_op(op_metrics)))); } @@ -81,45 +72,10 @@ impl Future for GetMessageFuture { } } -/// Get message from host as guest worker -fn op_worker_get_message( - state: &ThreadSafeState, - _args: Value, - _data: Option<PinnedBuf>, -) -> Result<JsonOp, ErrBox> { - let op = GetMessageFuture { - state: state.clone(), - }; - - let op = async move { - let maybe_buf = op.await; - debug!("op_worker_get_message"); - Ok(json!({ "data": maybe_buf })) - }; - - Ok(JsonOp::Async(op.boxed())) -} - -/// Post message to host as guest worker -fn op_worker_post_message( - state: &ThreadSafeState, - _args: Value, - data: Option<PinnedBuf>, -) -> Result<JsonOp, ErrBox> { - let d = Vec::from(data.unwrap().as_ref()).into_boxed_slice(); - let mut channels = state.worker_channels.lock().unwrap(); - let sender = &mut channels.sender; - futures::executor::block_on(sender.send(d)) - .map_err(|e| DenoError::new(ErrorKind::Other, e.to_string()))?; - - Ok(JsonOp::Sync(json!({}))) -} - #[derive(Deserialize)] #[serde(rename_all = "camelCase")] struct CreateWorkerArgs { specifier: String, - include_deno_namespace: bool, has_source_code: bool, source_code: String, } @@ -133,10 +89,6 @@ fn op_create_worker( let args: CreateWorkerArgs = serde_json::from_value(args)?; let specifier = args.specifier.as_ref(); - // Only include deno namespace if requested AND current worker - // has included namespace (to avoid escalation). - let include_deno_namespace = - args.include_deno_namespace && state.include_deno_namespace; let has_source_code = args.has_source_code; let source_code = args.source_code; @@ -156,16 +108,13 @@ fn op_create_worker( state.global_state.clone(), Some(parent_state.permissions.clone()), // by default share with parent Some(module_specifier.clone()), - include_deno_namespace, int, )?; // TODO: add a new option to make child worker not sharing permissions // with parent (aka .clone(), requests from child won't reflect in parent) let name = format!("USER-WORKER-{}", specifier); - let deno_main_call = format!("denoMain({})", include_deno_namespace); let mut worker = - Worker::new(name, startup_data::deno_isolate_init(), child_state, ext); - js_check(worker.execute(&deno_main_call)); + WebWorker::new(name, startup_data::deno_isolate_init(), child_state, ext); js_check(worker.execute("workerMain()")); let worker_id = parent_state.add_child_worker(worker.clone()); diff --git a/cli/state.rs b/cli/state.rs index acd661f25..4ad8241be 100644 --- a/cli/state.rs +++ b/cli/state.rs @@ -7,7 +7,7 @@ use crate::metrics::Metrics; use crate::ops::JsonOp; use crate::ops::MinimalOp; use crate::permissions::DenoPermissions; -use crate::worker::Worker; +use crate::web_worker::WebWorker; use crate::worker::WorkerChannels; use deno_core::Buf; use deno_core::CoreOp; @@ -44,7 +44,6 @@ pub struct ThreadSafeState(Arc<State>); #[cfg_attr(feature = "cargo-clippy", allow(stutter))] pub struct State { pub global_state: ThreadSafeGlobalState, - pub modules: Arc<Mutex<deno_core::Modules>>, pub permissions: Arc<Mutex<DenoPermissions>>, pub main_module: Option<ModuleSpecifier>, pub worker_channels: Mutex<WorkerChannels>, @@ -53,12 +52,11 @@ pub struct State { pub import_map: Option<ImportMap>, pub metrics: Metrics, pub global_timer: Mutex<GlobalTimer>, - pub workers: Mutex<HashMap<u32, Worker>>, + pub workers: Mutex<HashMap<u32, WebWorker>>, pub loading_workers: Mutex<HashMap<u32, mpsc::Receiver<Result<(), ErrBox>>>>, pub next_worker_id: AtomicUsize, pub start_time: Instant, pub seeded_rng: Option<Mutex<StdRng>>, - pub include_deno_namespace: bool, pub resource_table: Mutex<ResourceTable>, } @@ -219,7 +217,6 @@ impl ThreadSafeState { // If Some(perm), use perm. Else copy from global_state. shared_permissions: Option<Arc<Mutex<DenoPermissions>>>, main_module: Option<ModuleSpecifier>, - include_deno_namespace: bool, internal_channels: WorkerChannels, ) -> Result<Self, ErrBox> { let import_map: Option<ImportMap> = @@ -233,7 +230,6 @@ impl ThreadSafeState { None => None, }; - let modules = Arc::new(Mutex::new(deno_core::Modules::new())); let permissions = if let Some(perm) = shared_permissions { perm } else { @@ -242,7 +238,6 @@ impl ThreadSafeState { let state = State { global_state, - modules, main_module, permissions, import_map, @@ -254,14 +249,14 @@ impl ThreadSafeState { next_worker_id: AtomicUsize::new(0), start_time: Instant::now(), seeded_rng, - include_deno_namespace, + resource_table: Mutex::new(ResourceTable::default()), }; Ok(ThreadSafeState(Arc::new(state))) } - pub fn add_child_worker(&self, worker: Worker) -> u32 { + pub fn add_child_worker(&self, worker: WebWorker) -> u32 { let worker_id = self.next_worker_id.fetch_add(1, Ordering::Relaxed) as u32; let mut workers_tl = self.workers.lock().unwrap(); workers_tl.insert(worker_id, worker); @@ -344,7 +339,6 @@ impl ThreadSafeState { ThreadSafeGlobalState::mock(argv), None, module_specifier, - true, internal_channels, ) .unwrap() diff --git a/cli/tests/026_workers.ts b/cli/tests/026_workers.ts index 7ac1a0f32..3043cc7b9 100644 --- a/cli/tests/026_workers.ts +++ b/cli/tests/026_workers.ts @@ -1,5 +1,5 @@ -const jsWorker = new Worker("./subdir/test_worker.js"); -const tsWorker = new Worker("./subdir/test_worker.ts"); +const jsWorker = new Worker("./subdir/test_worker.js", { type: "module" }); +const tsWorker = new Worker("./subdir/test_worker.ts", { type: "module" }); tsWorker.onmessage = (e): void => { console.log("Received ts: " + e.data); diff --git a/cli/tests/039_worker_deno_ns.ts b/cli/tests/039_worker_deno_ns.ts index 80ada4343..7cb7de7fb 100644 --- a/cli/tests/039_worker_deno_ns.ts +++ b/cli/tests/039_worker_deno_ns.ts @@ -1,7 +1,5 @@ -const w1 = new Worker("./039_worker_deno_ns/has_ns.ts"); -const w2 = new Worker("./039_worker_deno_ns/no_ns.ts", { - noDenoNamespace: true -}); +const w1 = new Worker("./039_worker_deno_ns/has_ns.ts", { type: "module" }); +const w2 = new Worker("./039_worker_deno_ns/no_ns.ts", { type: "module" }); let w1MsgCount = 0; let w2MsgCount = 0; w1.onmessage = (msg): void => { diff --git a/cli/tests/integration_tests.rs b/cli/tests/integration_tests.rs index eea1dd2c9..3e5073b45 100644 --- a/cli/tests/integration_tests.rs +++ b/cli/tests/integration_tests.rs @@ -301,6 +301,7 @@ itest!(_038_checkjs { output: "038_checkjs.js.out", }); +/* TODO(bartlomieju): itest!(_039_worker_deno_ns { args: "run --reload 039_worker_deno_ns.ts", output: "039_worker_deno_ns.ts.out", @@ -310,6 +311,7 @@ itest!(_040_worker_blob { args: "run --reload 040_worker_blob.ts", output: "040_worker_blob.ts.out", }); +*/ itest!(_041_dyn_import_eval { args: "eval import('./subdir/mod4.js').then(console.log)", @@ -567,12 +569,14 @@ itest!(error_type_definitions { output: "error_type_definitions.ts.out", }); +/* TODO(bartlomieju) itest!(error_worker_dynamic { args: "run --reload error_worker_dynamic.ts", check_stderr: true, exit_code: 1, output: "error_worker_dynamic.ts.out", }); +*/ itest!(exit_error42 { exit_code: 42, diff --git a/cli/tests/subdir/bench_worker.ts b/cli/tests/subdir/bench_worker.ts index 094cefb80..696a84b9f 100644 --- a/cli/tests/subdir/bench_worker.ts +++ b/cli/tests/subdir/bench_worker.ts @@ -14,6 +14,7 @@ onmessage = function(e): void { postMessage({ cmdId }); break; case 3: // Close + postMessage({ cmdId: 3 }); workerClose(); break; } diff --git a/cli/tests/workers_round_robin_bench.ts b/cli/tests/workers_round_robin_bench.ts index 992ce38dc..e8f5b2d30 100644 --- a/cli/tests/workers_round_robin_bench.ts +++ b/cli/tests/workers_round_robin_bench.ts @@ -37,12 +37,11 @@ function handleAsyncMsgFromWorker( async function main(): Promise<void> { const workers: Array<[Map<number, Resolvable<string>>, Worker]> = []; for (let i = 1; i <= workerCount; ++i) { - const worker = new Worker("./subdir/bench_worker.ts"); - const promise = new Promise((resolve): void => { - worker.onmessage = (e): void => { - if (e.data.cmdId === 0) resolve(); - }; - }); + const worker = new Worker("./subdir/bench_worker.ts", { type: "module" }); + const promise = createResolvable<void>(); + worker.onmessage = (e): void => { + if (e.data.cmdId === 0) promise.resolve(); + }; worker.postMessage({ cmdId: 0, action: 2 }); await promise; workers.push([new Map(), worker]); @@ -66,8 +65,12 @@ async function main(): Promise<void> { } } for (const [, worker] of workers) { + const promise = createResolvable<void>(); + worker.onmessage = (e): void => { + if (e.data.cmdId === 3) promise.resolve(); + }; worker.postMessage({ action: 3 }); - await worker.closed; // Required to avoid a cmdId not in table error. + await promise; } console.log("Finished!"); } diff --git a/cli/tests/workers_startup_bench.ts b/cli/tests/workers_startup_bench.ts index 5d2c24b89..60c15a4b1 100644 --- a/cli/tests/workers_startup_bench.ts +++ b/cli/tests/workers_startup_bench.ts @@ -4,7 +4,7 @@ const workerCount = 50; async function bench(): Promise<void> { const workers: Worker[] = []; for (let i = 1; i <= workerCount; ++i) { - const worker = new Worker("./subdir/bench_worker.ts"); + const worker = new Worker("./subdir/bench_worker.ts", { type: "module" }); const promise = new Promise((resolve): void => { worker.onmessage = (e): void => { if (e.data.cmdId === 0) resolve(); @@ -16,8 +16,13 @@ async function bench(): Promise<void> { } console.log("Done creating workers closing workers!"); for (const worker of workers) { + const promise = new Promise((resolve): void => { + worker.onmessage = (e): void => { + if (e.data.cmdId === 3) resolve(); + }; + }); worker.postMessage({ action: 3 }); - await worker.closed; // Required to avoid a cmdId not in table error. + await promise; } console.log("Finished!"); } diff --git a/cli/web_worker.rs b/cli/web_worker.rs new file mode 100644 index 000000000..f933cbdc4 --- /dev/null +++ b/cli/web_worker.rs @@ -0,0 +1,145 @@ +// Copyright 2018-2020 the Deno authors. All rights reserved. MIT license. +use crate::fmt_errors::JSError; +use crate::ops; +use crate::state::ThreadSafeState; +use crate::worker::WorkerChannels; +use crate::worker::WorkerReceiver; +use deno_core; +use deno_core::Buf; +use deno_core::ErrBox; +use deno_core::ModuleSpecifier; +use deno_core::StartupData; +use futures::future::FutureExt; +use futures::future::TryFutureExt; +use futures::sink::SinkExt; +use futures::task::AtomicWaker; +use std::env; +use std::future::Future; +use std::pin::Pin; +use std::sync::Arc; +use std::sync::Mutex; +use std::task::Context; +use std::task::Poll; +use tokio::sync::Mutex as AsyncMutex; +use url::Url; + +#[derive(Clone)] +pub struct WebWorker { + pub name: String, + pub isolate: Arc<AsyncMutex<Box<deno_core::EsIsolate>>>, + pub state: ThreadSafeState, + external_channels: Arc<Mutex<WorkerChannels>>, +} + +impl WebWorker { + pub fn new( + name: String, + startup_data: StartupData, + state: ThreadSafeState, + external_channels: WorkerChannels, + ) -> Self { + let mut isolate = + deno_core::EsIsolate::new(Box::new(state.clone()), startup_data, false); + + ops::web_worker::init(&mut isolate, &state); + ops::worker_host::init(&mut isolate, &state); + + let global_state_ = state.global_state.clone(); + isolate.set_js_error_create(move |v8_exception| { + JSError::from_v8_exception(v8_exception, &global_state_.ts_compiler) + }); + + Self { + name, + isolate: Arc::new(AsyncMutex::new(isolate)), + state, + external_channels: Arc::new(Mutex::new(external_channels)), + } + } + + /// Same as execute2() but the filename defaults to "$CWD/__anonymous__". + pub fn execute(&mut self, js_source: &str) -> Result<(), ErrBox> { + let path = env::current_dir().unwrap().join("__anonymous__"); + let url = Url::from_file_path(path).unwrap(); + self.execute2(url.as_str(), js_source) + } + + /// Executes the provided JavaScript source code. The js_filename argument is + /// provided only for debugging purposes. + fn execute2( + &mut self, + js_filename: &str, + js_source: &str, + ) -> Result<(), ErrBox> { + let mut isolate = self.isolate.try_lock().unwrap(); + isolate.execute(js_filename, js_source) + } + + /// Executes the provided JavaScript module. + /// + /// Takes ownership of the isolate behind mutex. + pub async fn execute_mod_async( + &mut self, + module_specifier: &ModuleSpecifier, + maybe_code: Option<String>, + is_prefetch: bool, + ) -> Result<(), ErrBox> { + let specifier = module_specifier.to_string(); + let worker = self.clone(); + + let mut isolate = self.isolate.lock().await; + let id = isolate.load_module(&specifier, maybe_code).await?; + worker.state.global_state.progress.done(); + + if !is_prefetch { + return isolate.mod_evaluate(id); + } + + Ok(()) + } + + /// Post message to worker as a host. + /// + /// This method blocks current thread. + pub fn post_message( + &self, + buf: Buf, + ) -> impl Future<Output = Result<(), ErrBox>> { + let channels = self.external_channels.lock().unwrap(); + let mut sender = channels.sender.clone(); + async move { + let result = sender.send(buf).map_err(ErrBox::from).await; + drop(sender); + result + } + } + + /// Get message from worker as a host. + pub fn get_message(&self) -> WorkerReceiver { + WorkerReceiver { + channels: self.external_channels.clone(), + } + } + + pub fn clear_exception(&mut self) { + let mut isolate = self.isolate.try_lock().unwrap(); + isolate.clear_exception(); + } +} + +impl Future for WebWorker { + type Output = Result<(), ErrBox>; + + fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> { + let inner = self.get_mut(); + let waker = AtomicWaker::new(); + waker.register(cx.waker()); + match inner.isolate.try_lock() { + Ok(mut isolate) => isolate.poll_unpin(cx), + Err(_) => { + waker.wake(); + Poll::Pending + } + } + } +} diff --git a/cli/worker.rs b/cli/worker.rs index 7faf17e60..4601a6021 100644 --- a/cli/worker.rs +++ b/cli/worker.rs @@ -48,55 +48,42 @@ impl Worker { state: ThreadSafeState, external_channels: WorkerChannels, ) -> Self { - let isolate = Arc::new(AsyncMutex::new(deno_core::EsIsolate::new( - Box::new(state.clone()), - startup_data, - false, - ))); - { - let mut i = isolate.try_lock().unwrap(); - let op_registry = i.op_registry.clone(); - - ops::compiler::init(&mut i, &state); - ops::errors::init(&mut i, &state); - ops::fetch::init(&mut i, &state); - ops::files::init(&mut i, &state); - ops::fs::init(&mut i, &state); - ops::io::init(&mut i, &state); - ops::plugins::init(&mut i, &state, op_registry); - ops::net::init(&mut i, &state); - ops::tls::init(&mut i, &state); - ops::os::init(&mut i, &state); - ops::permissions::init(&mut i, &state); - ops::process::init(&mut i, &state); - ops::random::init(&mut i, &state); - ops::repl::init(&mut i, &state); - ops::resources::init(&mut i, &state); - ops::timers::init(&mut i, &state); - ops::workers::init(&mut i, &state); - - let global_state_ = state.global_state.clone(); - i.set_js_error_create(move |v8_exception| { - JSError::from_v8_exception(v8_exception, &global_state_.ts_compiler) - }) - } + let mut isolate = + deno_core::EsIsolate::new(Box::new(state.clone()), startup_data, false); + let op_registry = isolate.op_registry.clone(); + + ops::compiler::init(&mut isolate, &state); + ops::errors::init(&mut isolate, &state); + ops::fetch::init(&mut isolate, &state); + ops::files::init(&mut isolate, &state); + ops::fs::init(&mut isolate, &state); + ops::io::init(&mut isolate, &state); + ops::plugins::init(&mut isolate, &state, op_registry); + ops::net::init(&mut isolate, &state); + ops::tls::init(&mut isolate, &state); + ops::os::init(&mut isolate, &state); + ops::permissions::init(&mut isolate, &state); + ops::process::init(&mut isolate, &state); + ops::random::init(&mut isolate, &state); + ops::repl::init(&mut isolate, &state); + ops::resources::init(&mut isolate, &state); + ops::timers::init(&mut isolate, &state); + ops::worker_host::init(&mut isolate, &state); + ops::web_worker::init(&mut isolate, &state); + + let global_state_ = state.global_state.clone(); + isolate.set_js_error_create(move |v8_exception| { + JSError::from_v8_exception(v8_exception, &global_state_.ts_compiler) + }); Self { name, - isolate, + isolate: Arc::new(AsyncMutex::new(isolate)), state, external_channels: Arc::new(Mutex::new(external_channels)), } } - pub fn set_error_handler( - &mut self, - handler: Box<dyn FnMut(ErrBox) -> Result<(), ErrBox>>, - ) { - let mut i = self.isolate.try_lock().unwrap(); - i.set_error_handler(handler); - } - /// Same as execute2() but the filename defaults to "$CWD/__anonymous__". pub fn execute(&mut self, js_source: &str) -> Result<(), ErrBox> { let path = env::current_dir().unwrap().join("__anonymous__"); @@ -188,7 +175,7 @@ impl Future for Worker { /// that will return message received from worker or None /// if worker's channel has been closed. pub struct WorkerReceiver { - channels: Arc<Mutex<WorkerChannels>>, + pub channels: Arc<Mutex<WorkerChannels>>, } impl Future for WorkerReceiver { @@ -255,7 +242,6 @@ mod tests { global_state, None, Some(module_specifier.clone()), - true, int, ) .unwrap(); @@ -299,7 +285,6 @@ mod tests { global_state, None, Some(module_specifier.clone()), - true, int, ) .unwrap(); @@ -342,7 +327,6 @@ mod tests { global_state.clone(), None, Some(module_specifier.clone()), - true, int, ) .unwrap(); |