summaryrefslogtreecommitdiff
path: root/cli
diff options
context:
space:
mode:
authorBartek IwaƄczuk <biwanczuk@gmail.com>2020-01-21 09:49:47 +0100
committerGitHub <noreply@github.com>2020-01-21 09:49:47 +0100
commit7966bf14c062a05b1606a62c996890571454ecc8 (patch)
tree65bede64b47707c3accc80d0bb18e99840c639f7 /cli
parentc90036ab88bb1ae6b9c87d5e368f56d8c8afab69 (diff)
refactor: split worker and worker host logic (#3722)
* split ops/worker.rs into ops/worker_host.rs and ops/web_worker.rs * refactor js/workers.ts and factor out js/worker_main.ts - entry point for WebWorker runtime * BREAKING CHANGE: remove support for blob: URL in Worker * BREAKING CHANGE: remove Deno namespace support and noDenoNamespace option in Worker constructor * introduce WebWorker struct which is a stripped down version of cli::Worker
Diffstat (limited to 'cli')
-rw-r--r--cli/compilers/ts.rs2
-rw-r--r--cli/compilers/wasm.rs2
-rw-r--r--cli/js/compiler.ts2
-rw-r--r--cli/js/globals.ts11
-rw-r--r--cli/js/lib.deno_runtime.d.ts35
-rw-r--r--cli/js/worker_main.ts98
-rw-r--r--cli/js/workers.ts128
-rw-r--r--cli/lib.rs17
-rw-r--r--cli/ops/mod.rs3
-rw-r--r--cli/ops/web_worker.rs77
-rw-r--r--cli/ops/worker_host.rs (renamed from cli/ops/workers.rs)55
-rw-r--r--cli/state.rs14
-rw-r--r--cli/tests/026_workers.ts4
-rw-r--r--cli/tests/039_worker_deno_ns.ts6
-rw-r--r--cli/tests/integration_tests.rs4
-rw-r--r--cli/tests/subdir/bench_worker.ts1
-rw-r--r--cli/tests/workers_round_robin_bench.ts17
-rw-r--r--cli/tests/workers_startup_bench.ts9
-rw-r--r--cli/web_worker.rs145
-rw-r--r--cli/worker.rs74
20 files changed, 439 insertions, 265 deletions
diff --git a/cli/compilers/ts.rs b/cli/compilers/ts.rs
index e8abbcd27..037043368 100644
--- a/cli/compilers/ts.rs
+++ b/cli/compilers/ts.rs
@@ -231,7 +231,7 @@ impl TsCompiler {
fn setup_worker(global_state: ThreadSafeGlobalState) -> Worker {
let (int, ext) = ThreadSafeState::create_channels();
let worker_state =
- ThreadSafeState::new(global_state.clone(), None, None, true, int)
+ ThreadSafeState::new(global_state.clone(), None, None, int)
.expect("Unable to create worker state");
// Count how many times we start the compiler worker.
diff --git a/cli/compilers/wasm.rs b/cli/compilers/wasm.rs
index e3297283b..ca889be1f 100644
--- a/cli/compilers/wasm.rs
+++ b/cli/compilers/wasm.rs
@@ -45,7 +45,7 @@ impl WasmCompiler {
fn setup_worker(global_state: ThreadSafeGlobalState) -> Worker {
let (int, ext) = ThreadSafeState::create_channels();
let worker_state =
- ThreadSafeState::new(global_state.clone(), None, None, true, int)
+ ThreadSafeState::new(global_state.clone(), None, None, int)
.expect("Unable to create worker state");
// Count how many times we start the compiler worker.
diff --git a/cli/js/compiler.ts b/cli/js/compiler.ts
index 79b9fdaf6..7addfc5ca 100644
--- a/cli/js/compiler.ts
+++ b/cli/js/compiler.ts
@@ -32,7 +32,7 @@ import { fromTypeScriptDiagnostic } from "./diagnostics_util.ts";
import * as os from "./os.ts";
import { assert } from "./util.ts";
import * as util from "./util.ts";
-import { postMessage, workerClose, workerMain } from "./workers.ts";
+import { postMessage, workerClose, workerMain } from "./worker_main.ts";
const self = globalThis;
diff --git a/cli/js/globals.ts b/cli/js/globals.ts
index 5754002c0..0f364b8e0 100644
--- a/cli/js/globals.ts
+++ b/cli/js/globals.ts
@@ -21,6 +21,7 @@ import * as textEncoding from "./text_encoding.ts";
import * as timers from "./timers.ts";
import * as url from "./url.ts";
import * as urlSearchParams from "./url_search_params.ts";
+import * as workerRuntime from "./worker_main.ts";
import * as workers from "./workers.ts";
import * as performanceUtil from "./performance.ts";
import * as request from "./request.ts";
@@ -194,12 +195,12 @@ const globalProperties = {
Response: nonEnumerable(fetchTypes.Response),
performance: writable(new performanceUtil.Performance()),
- onmessage: writable(workers.onmessage),
- onerror: writable(workers.onerror),
+ onmessage: writable(workerRuntime.onmessage),
+ onerror: writable(workerRuntime.onerror),
- workerMain: nonEnumerable(workers.workerMain),
- workerClose: nonEnumerable(workers.workerClose),
- postMessage: writable(workers.postMessage),
+ workerMain: nonEnumerable(workerRuntime.workerMain),
+ workerClose: nonEnumerable(workerRuntime.workerClose),
+ postMessage: writable(workerRuntime.postMessage),
Worker: nonEnumerable(workers.WorkerImpl),
[domTypes.eventTargetHost]: nonEnumerable(null),
diff --git a/cli/js/lib.deno_runtime.d.ts b/cli/js/lib.deno_runtime.d.ts
index 0fa183348..05553ffb7 100644
--- a/cli/js/lib.deno_runtime.d.ts
+++ b/cli/js/lib.deno_runtime.d.ts
@@ -2128,9 +2128,9 @@ declare interface Window {
performance: __performanceUtil.Performance;
onmessage: (e: { data: any }) => void;
onerror: undefined | typeof onerror;
- workerMain: typeof __workers.workerMain;
- workerClose: typeof __workers.workerClose;
- postMessage: typeof __workers.postMessage;
+ workerMain: typeof __workerMain.workerMain;
+ workerClose: typeof __workerMain.workerClose;
+ postMessage: typeof __workerMain.postMessage;
Worker: typeof __workers.WorkerImpl;
addEventListener: (
type: string,
@@ -2187,9 +2187,9 @@ declare let onerror:
e: Event
) => boolean | void)
| undefined;
-declare const workerMain: typeof __workers.workerMain;
-declare const workerClose: typeof __workers.workerClose;
-declare const postMessage: typeof __workers.postMessage;
+declare const workerMain: typeof __workerMain.workerMain;
+declare const workerClose: typeof __workerMain.workerClose;
+declare const postMessage: typeof __workerMain.postMessage;
declare const Worker: typeof __workers.WorkerImpl;
declare const addEventListener: (
type: string,
@@ -3437,31 +3437,25 @@ declare namespace __url {
};
}
-declare namespace __workers {
- // @url js/workers.d.ts
-
- export function encodeMessage(data: any): Uint8Array;
- export function decodeMessage(dataIntArray: Uint8Array): any;
+declare namespace __workerMain {
export let onmessage: (e: { data: any }) => void;
export function postMessage(data: any): void;
export function getMessage(): Promise<any>;
export let isClosing: boolean;
export function workerClose(): void;
export function workerMain(): Promise<void>;
+}
+
+declare namespace __workers {
+ // @url js/workers.d.ts
export interface Worker {
onerror?: (e: Event) => void;
onmessage?: (e: { data: any }) => void;
onmessageerror?: () => void;
postMessage(data: any): void;
- closed: Promise<void>;
}
- export interface WorkerOptions {}
- /** Extended Deno Worker initialization options.
- * `noDenoNamespace` hides global `window.Deno` namespace for
- * spawned worker and nested workers spawned by it (default: false).
- */
- export interface DenoWorkerOptions extends WorkerOptions {
- noDenoNamespace?: boolean;
+ export interface WorkerOptions {
+ type?: "classic" | "module";
}
export class WorkerImpl implements Worker {
private readonly id;
@@ -3470,8 +3464,7 @@ declare namespace __workers {
onerror?: (e: Event) => void;
onmessage?: (data: any) => void;
onmessageerror?: () => void;
- constructor(specifier: string, options?: DenoWorkerOptions);
- readonly closed: Promise<void>;
+ constructor(specifier: string, options?: WorkerOptions);
postMessage(data: any): void;
private run;
}
diff --git a/cli/js/worker_main.ts b/cli/js/worker_main.ts
new file mode 100644
index 000000000..cb70057ea
--- /dev/null
+++ b/cli/js/worker_main.ts
@@ -0,0 +1,98 @@
+// Copyright 2018-2020 the Deno authors. All rights reserved. MIT license.
+/* eslint-disable @typescript-eslint/no-explicit-any */
+import { core } from "./core.ts";
+import * as dispatch from "./dispatch.ts";
+import { sendAsync, sendSync } from "./dispatch_json.ts";
+import { log } from "./util.ts";
+import { TextDecoder, TextEncoder } from "./text_encoding.ts";
+
+const encoder = new TextEncoder();
+const decoder = new TextDecoder();
+
+function encodeMessage(data: any): Uint8Array {
+ const dataJson = JSON.stringify(data);
+ return encoder.encode(dataJson);
+}
+
+function decodeMessage(dataIntArray: Uint8Array): any {
+ const dataJson = decoder.decode(dataIntArray);
+ return JSON.parse(dataJson);
+}
+
+// Stuff for workers
+export const onmessage: (e: { data: any }) => void = (): void => {};
+export const onerror: (e: { data: any }) => void = (): void => {};
+
+export function postMessage(data: any): void {
+ const dataIntArray = encodeMessage(data);
+ sendSync(dispatch.OP_WORKER_POST_MESSAGE, {}, dataIntArray);
+}
+
+export async function getMessage(): Promise<any> {
+ log("getMessage");
+ const res = await sendAsync(dispatch.OP_WORKER_GET_MESSAGE);
+ if (res.data != null) {
+ return decodeMessage(new Uint8Array(res.data));
+ } else {
+ return null;
+ }
+}
+
+export let isClosing = false;
+
+export function workerClose(): void {
+ isClosing = true;
+}
+
+export async function workerMain(): Promise<void> {
+ const ops = core.ops();
+ // TODO(bartlomieju): this is a prototype, we should come up with
+ // something a bit more sophisticated
+ for (const [name, opId] of Object.entries(ops)) {
+ const opName = `OP_${name.toUpperCase()}`;
+ // Assign op ids to actual variables
+ // TODO(ry) This type casting is gross and should be fixed.
+ ((dispatch as unknown) as { [key: string]: number })[opName] = opId;
+ core.setAsyncHandler(opId, dispatch.getAsyncHandler(opName));
+ }
+
+ log("workerMain");
+
+ while (!isClosing) {
+ const data = await getMessage();
+ if (data == null) {
+ log("workerMain got null message. quitting.");
+ break;
+ }
+
+ let result: void | Promise<void>;
+ const event = { data };
+
+ try {
+ if (!globalThis["onmessage"]) {
+ break;
+ }
+ result = globalThis.onmessage!(event);
+ if (result && "then" in result) {
+ await result;
+ }
+ if (!globalThis["onmessage"]) {
+ break;
+ }
+ } catch (e) {
+ if (globalThis["onerror"]) {
+ const result = globalThis.onerror(
+ e.message,
+ e.fileName,
+ e.lineNumber,
+ e.columnNumber,
+ e
+ );
+ if (result === true) {
+ continue;
+ }
+ }
+ throw e;
+ }
+ }
+}
diff --git a/cli/js/workers.ts b/cli/js/workers.ts
index 7e8219e19..60ef73da0 100644
--- a/cli/js/workers.ts
+++ b/cli/js/workers.ts
@@ -2,35 +2,35 @@
/* eslint-disable @typescript-eslint/no-explicit-any */
import * as dispatch from "./dispatch.ts";
import { sendAsync, sendSync } from "./dispatch_json.ts";
-import { log, createResolvable, Resolvable } from "./util.ts";
+import { log } from "./util.ts";
import { TextDecoder, TextEncoder } from "./text_encoding.ts";
+/*
import { blobURLMap } from "./url.ts";
import { blobBytesWeakMap } from "./blob.ts";
+*/
import { Event } from "./event.ts";
import { EventTarget } from "./event_target.ts";
const encoder = new TextEncoder();
const decoder = new TextDecoder();
-export function encodeMessage(data: any): Uint8Array {
+function encodeMessage(data: any): Uint8Array {
const dataJson = JSON.stringify(data);
return encoder.encode(dataJson);
}
-export function decodeMessage(dataIntArray: Uint8Array): any {
+function decodeMessage(dataIntArray: Uint8Array): any {
const dataJson = decoder.decode(dataIntArray);
return JSON.parse(dataJson);
}
function createWorker(
specifier: string,
- includeDenoNamespace: boolean,
hasSourceCode: boolean,
sourceCode: Uint8Array
): { id: number; loaded: boolean } {
return sendSync(dispatch.OP_CREATE_WORKER, {
specifier,
- includeDenoNamespace,
hasSourceCode,
sourceCode: new TextDecoder().decode(sourceCode)
});
@@ -67,92 +67,15 @@ async function hostGetMessage(id: number): Promise<any> {
}
}
-// Stuff for workers
-export const onmessage: (e: { data: any }) => void = (): void => {};
-export const onerror: (e: { data: any }) => void = (): void => {};
-
-export function postMessage(data: any): void {
- const dataIntArray = encodeMessage(data);
- sendSync(dispatch.OP_WORKER_POST_MESSAGE, {}, dataIntArray);
-}
-
-export async function getMessage(): Promise<any> {
- log("getMessage");
- const res = await sendAsync(dispatch.OP_WORKER_GET_MESSAGE);
- if (res.data != null) {
- return decodeMessage(new Uint8Array(res.data));
- } else {
- return null;
- }
-}
-
-export let isClosing = false;
-
-export function workerClose(): void {
- isClosing = true;
-}
-
-export async function workerMain(): Promise<void> {
- log("workerMain");
-
- while (!isClosing) {
- const data = await getMessage();
- if (data == null) {
- log("workerMain got null message. quitting.");
- break;
- }
-
- let result: void | Promise<void>;
- const event = { data };
-
- try {
- if (!globalThis["onmessage"]) {
- break;
- }
- result = globalThis.onmessage!(event);
- if (result && "then" in result) {
- await result;
- }
- if (!globalThis["onmessage"]) {
- break;
- }
- } catch (e) {
- if (globalThis["onerror"]) {
- const result = globalThis.onerror(
- e.message,
- e.fileName,
- e.lineNumber,
- e.columnNumber,
- e
- );
- if (result === true) {
- continue;
- }
- }
- throw e;
- }
- }
-}
-
export interface Worker {
onerror?: (e: any) => void;
onmessage?: (e: { data: any }) => void;
onmessageerror?: () => void;
postMessage(data: any): void;
- // TODO(bartlomieju): remove this
- closed: Promise<void>;
}
-// TODO(kevinkassimo): Maybe implement reasonable web worker options?
-// eslint-disable-next-line @typescript-eslint/no-empty-interface
-export interface WorkerOptions {}
-
-/** Extended Deno Worker initialization options.
- * `noDenoNamespace` hides global `globalThis.Deno` namespace for
- * spawned worker and nested workers spawned by it (default: false).
- */
-export interface DenoWorkerOptions extends WorkerOptions {
- noDenoNamespace?: boolean;
+export interface WorkerOptions {
+ type?: "classic" | "module";
}
export class WorkerImpl extends EventTarget implements Worker {
@@ -160,20 +83,29 @@ export class WorkerImpl extends EventTarget implements Worker {
private isClosing = false;
private messageBuffer: any[] = [];
private ready = false;
- private readonly isClosedPromise: Resolvable<void>;
public onerror?: (e: any) => void;
public onmessage?: (data: any) => void;
public onmessageerror?: () => void;
- constructor(specifier: string, options?: DenoWorkerOptions) {
+ constructor(specifier: string, options?: WorkerOptions) {
super();
- let hasSourceCode = false;
- let sourceCode = new Uint8Array();
- let includeDenoNamespace = true;
- if (options && options.noDenoNamespace) {
- includeDenoNamespace = false;
+ let type = "classic";
+
+ if (options?.type) {
+ type = options.type;
+ }
+
+ if (type !== "module") {
+ throw new Error(
+ 'Not yet implemented: only "module" type workers are supported'
+ );
}
+
+ const hasSourceCode = false;
+ const sourceCode = new Uint8Array();
+
+ /* TODO(bartlomieju):
// Handle blob URL.
if (specifier.startsWith("blob:")) {
hasSourceCode = true;
@@ -187,23 +119,14 @@ export class WorkerImpl extends EventTarget implements Worker {
}
sourceCode = blobBytes!;
}
+ */
- const { id, loaded } = createWorker(
- specifier,
- includeDenoNamespace,
- hasSourceCode,
- sourceCode
- );
+ const { id, loaded } = createWorker(specifier, hasSourceCode, sourceCode);
this.id = id;
this.ready = loaded;
- this.isClosedPromise = createResolvable();
this.poll();
}
- get closed(): Promise<void> {
- return this.isClosedPromise;
- }
-
private handleError(e: any): boolean {
// TODO: this is being handled in a type unsafe way, it should be type safe
// eslint-disable-next-line @typescript-eslint/no-explicit-any
@@ -259,7 +182,6 @@ export class WorkerImpl extends EventTarget implements Worker {
} else {
this.isClosing = true;
hostCloseWorker(this.id);
- this.isClosedPromise.resolve();
break;
}
}
diff --git a/cli/lib.rs b/cli/lib.rs
index a57f224e2..e9a62375a 100644
--- a/cli/lib.rs
+++ b/cli/lib.rs
@@ -51,6 +51,7 @@ pub mod state;
pub mod test_util;
mod tokio_util;
pub mod version;
+mod web_worker;
pub mod worker;
use crate::deno_error::js_check;
@@ -120,7 +121,6 @@ fn create_worker_and_state(
global_state.clone(),
None,
global_state.main_module.clone(),
- true,
int,
)
.map_err(deno_error::print_err_and_exit)
@@ -346,16 +346,15 @@ fn bundle_command(flags: DenoFlags) {
fn run_repl(flags: DenoFlags) {
let (mut worker, _state) = create_worker_and_state(flags);
- // Make repl continue to function under uncaught async errors.
- worker.set_error_handler(Box::new(|err| {
- eprintln!("{}", err.to_string());
- Ok(())
- }));
- // Setup runtime.
js_check(worker.execute("denoMain()"));
let main_future = async move {
- let result = worker.await;
- js_check(result);
+ loop {
+ let result = worker.clone().await;
+ if let Err(err) = result {
+ eprintln!("{}", err.to_string());
+ worker.clear_exception();
+ }
+ }
};
tokio_util::run(main_future);
}
diff --git a/cli/ops/mod.rs b/cli/ops/mod.rs
index f93c5a060..203d1e17e 100644
--- a/cli/ops/mod.rs
+++ b/cli/ops/mod.rs
@@ -23,4 +23,5 @@ pub mod repl;
pub mod resources;
pub mod timers;
pub mod tls;
-pub mod workers;
+pub mod web_worker;
+pub mod worker_host;
diff --git a/cli/ops/web_worker.rs b/cli/ops/web_worker.rs
new file mode 100644
index 000000000..300a0dfd1
--- /dev/null
+++ b/cli/ops/web_worker.rs
@@ -0,0 +1,77 @@
+// Copyright 2018-2020 the Deno authors. All rights reserved. MIT license.
+use super::dispatch_json::{JsonOp, Value};
+use crate::deno_error::DenoError;
+use crate::deno_error::ErrorKind;
+use crate::ops::json_op;
+use crate::state::ThreadSafeState;
+use deno_core::*;
+use futures;
+use futures::future::FutureExt;
+use futures::sink::SinkExt;
+use futures::stream::StreamExt;
+use std;
+use std::convert::From;
+use std::future::Future;
+use std::pin::Pin;
+use std::task::Context;
+use std::task::Poll;
+
+pub fn init(i: &mut Isolate, s: &ThreadSafeState) {
+ i.register_op(
+ "worker_post_message",
+ s.core_op(json_op(s.stateful_op(op_worker_post_message))),
+ );
+ i.register_op(
+ "worker_get_message",
+ s.core_op(json_op(s.stateful_op(op_worker_get_message))),
+ );
+}
+
+struct GetMessageFuture {
+ state: ThreadSafeState,
+}
+
+impl Future for GetMessageFuture {
+ type Output = Option<Buf>;
+
+ fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
+ let inner = self.get_mut();
+ let mut channels = inner.state.worker_channels.lock().unwrap();
+ let receiver = &mut channels.receiver;
+ receiver.poll_next_unpin(cx)
+ }
+}
+
+/// Get message from host as guest worker
+fn op_worker_get_message(
+ state: &ThreadSafeState,
+ _args: Value,
+ _data: Option<PinnedBuf>,
+) -> Result<JsonOp, ErrBox> {
+ let op = GetMessageFuture {
+ state: state.clone(),
+ };
+
+ let op = async move {
+ let maybe_buf = op.await;
+ debug!("op_worker_get_message");
+ Ok(json!({ "data": maybe_buf }))
+ };
+
+ Ok(JsonOp::Async(op.boxed()))
+}
+
+/// Post message to host as guest worker
+fn op_worker_post_message(
+ state: &ThreadSafeState,
+ _args: Value,
+ data: Option<PinnedBuf>,
+) -> Result<JsonOp, ErrBox> {
+ let d = Vec::from(data.unwrap().as_ref()).into_boxed_slice();
+ let mut channels = state.worker_channels.lock().unwrap();
+ let sender = &mut channels.sender;
+ futures::executor::block_on(sender.send(d))
+ .map_err(|e| DenoError::new(ErrorKind::Other, e.to_string()))?;
+
+ Ok(JsonOp::Sync(json!({})))
+}
diff --git a/cli/ops/workers.rs b/cli/ops/worker_host.rs
index eeffb3930..c17dee444 100644
--- a/cli/ops/workers.rs
+++ b/cli/ops/worker_host.rs
@@ -9,7 +9,7 @@ use crate::fmt_errors::JSError;
use crate::ops::json_op;
use crate::startup_data;
use crate::state::ThreadSafeState;
-use crate::worker::Worker;
+use crate::web_worker::WebWorker;
use deno_core::*;
use futures;
use futures::channel::mpsc;
@@ -54,15 +54,6 @@ pub fn init(i: &mut Isolate, s: &ThreadSafeState) {
"host_get_message",
s.core_op(json_op(s.stateful_op(op_host_get_message))),
);
- // TODO: make sure these two ops are only accessible to appropriate Worker
- i.register_op(
- "worker_post_message",
- s.core_op(json_op(s.stateful_op(op_worker_post_message))),
- );
- i.register_op(
- "worker_get_message",
- s.core_op(json_op(s.stateful_op(op_worker_get_message))),
- );
i.register_op("metrics", s.core_op(json_op(s.stateful_op(op_metrics))));
}
@@ -81,45 +72,10 @@ impl Future for GetMessageFuture {
}
}
-/// Get message from host as guest worker
-fn op_worker_get_message(
- state: &ThreadSafeState,
- _args: Value,
- _data: Option<PinnedBuf>,
-) -> Result<JsonOp, ErrBox> {
- let op = GetMessageFuture {
- state: state.clone(),
- };
-
- let op = async move {
- let maybe_buf = op.await;
- debug!("op_worker_get_message");
- Ok(json!({ "data": maybe_buf }))
- };
-
- Ok(JsonOp::Async(op.boxed()))
-}
-
-/// Post message to host as guest worker
-fn op_worker_post_message(
- state: &ThreadSafeState,
- _args: Value,
- data: Option<PinnedBuf>,
-) -> Result<JsonOp, ErrBox> {
- let d = Vec::from(data.unwrap().as_ref()).into_boxed_slice();
- let mut channels = state.worker_channels.lock().unwrap();
- let sender = &mut channels.sender;
- futures::executor::block_on(sender.send(d))
- .map_err(|e| DenoError::new(ErrorKind::Other, e.to_string()))?;
-
- Ok(JsonOp::Sync(json!({})))
-}
-
#[derive(Deserialize)]
#[serde(rename_all = "camelCase")]
struct CreateWorkerArgs {
specifier: String,
- include_deno_namespace: bool,
has_source_code: bool,
source_code: String,
}
@@ -133,10 +89,6 @@ fn op_create_worker(
let args: CreateWorkerArgs = serde_json::from_value(args)?;
let specifier = args.specifier.as_ref();
- // Only include deno namespace if requested AND current worker
- // has included namespace (to avoid escalation).
- let include_deno_namespace =
- args.include_deno_namespace && state.include_deno_namespace;
let has_source_code = args.has_source_code;
let source_code = args.source_code;
@@ -156,16 +108,13 @@ fn op_create_worker(
state.global_state.clone(),
Some(parent_state.permissions.clone()), // by default share with parent
Some(module_specifier.clone()),
- include_deno_namespace,
int,
)?;
// TODO: add a new option to make child worker not sharing permissions
// with parent (aka .clone(), requests from child won't reflect in parent)
let name = format!("USER-WORKER-{}", specifier);
- let deno_main_call = format!("denoMain({})", include_deno_namespace);
let mut worker =
- Worker::new(name, startup_data::deno_isolate_init(), child_state, ext);
- js_check(worker.execute(&deno_main_call));
+ WebWorker::new(name, startup_data::deno_isolate_init(), child_state, ext);
js_check(worker.execute("workerMain()"));
let worker_id = parent_state.add_child_worker(worker.clone());
diff --git a/cli/state.rs b/cli/state.rs
index acd661f25..4ad8241be 100644
--- a/cli/state.rs
+++ b/cli/state.rs
@@ -7,7 +7,7 @@ use crate::metrics::Metrics;
use crate::ops::JsonOp;
use crate::ops::MinimalOp;
use crate::permissions::DenoPermissions;
-use crate::worker::Worker;
+use crate::web_worker::WebWorker;
use crate::worker::WorkerChannels;
use deno_core::Buf;
use deno_core::CoreOp;
@@ -44,7 +44,6 @@ pub struct ThreadSafeState(Arc<State>);
#[cfg_attr(feature = "cargo-clippy", allow(stutter))]
pub struct State {
pub global_state: ThreadSafeGlobalState,
- pub modules: Arc<Mutex<deno_core::Modules>>,
pub permissions: Arc<Mutex<DenoPermissions>>,
pub main_module: Option<ModuleSpecifier>,
pub worker_channels: Mutex<WorkerChannels>,
@@ -53,12 +52,11 @@ pub struct State {
pub import_map: Option<ImportMap>,
pub metrics: Metrics,
pub global_timer: Mutex<GlobalTimer>,
- pub workers: Mutex<HashMap<u32, Worker>>,
+ pub workers: Mutex<HashMap<u32, WebWorker>>,
pub loading_workers: Mutex<HashMap<u32, mpsc::Receiver<Result<(), ErrBox>>>>,
pub next_worker_id: AtomicUsize,
pub start_time: Instant,
pub seeded_rng: Option<Mutex<StdRng>>,
- pub include_deno_namespace: bool,
pub resource_table: Mutex<ResourceTable>,
}
@@ -219,7 +217,6 @@ impl ThreadSafeState {
// If Some(perm), use perm. Else copy from global_state.
shared_permissions: Option<Arc<Mutex<DenoPermissions>>>,
main_module: Option<ModuleSpecifier>,
- include_deno_namespace: bool,
internal_channels: WorkerChannels,
) -> Result<Self, ErrBox> {
let import_map: Option<ImportMap> =
@@ -233,7 +230,6 @@ impl ThreadSafeState {
None => None,
};
- let modules = Arc::new(Mutex::new(deno_core::Modules::new()));
let permissions = if let Some(perm) = shared_permissions {
perm
} else {
@@ -242,7 +238,6 @@ impl ThreadSafeState {
let state = State {
global_state,
- modules,
main_module,
permissions,
import_map,
@@ -254,14 +249,14 @@ impl ThreadSafeState {
next_worker_id: AtomicUsize::new(0),
start_time: Instant::now(),
seeded_rng,
- include_deno_namespace,
+
resource_table: Mutex::new(ResourceTable::default()),
};
Ok(ThreadSafeState(Arc::new(state)))
}
- pub fn add_child_worker(&self, worker: Worker) -> u32 {
+ pub fn add_child_worker(&self, worker: WebWorker) -> u32 {
let worker_id = self.next_worker_id.fetch_add(1, Ordering::Relaxed) as u32;
let mut workers_tl = self.workers.lock().unwrap();
workers_tl.insert(worker_id, worker);
@@ -344,7 +339,6 @@ impl ThreadSafeState {
ThreadSafeGlobalState::mock(argv),
None,
module_specifier,
- true,
internal_channels,
)
.unwrap()
diff --git a/cli/tests/026_workers.ts b/cli/tests/026_workers.ts
index 7ac1a0f32..3043cc7b9 100644
--- a/cli/tests/026_workers.ts
+++ b/cli/tests/026_workers.ts
@@ -1,5 +1,5 @@
-const jsWorker = new Worker("./subdir/test_worker.js");
-const tsWorker = new Worker("./subdir/test_worker.ts");
+const jsWorker = new Worker("./subdir/test_worker.js", { type: "module" });
+const tsWorker = new Worker("./subdir/test_worker.ts", { type: "module" });
tsWorker.onmessage = (e): void => {
console.log("Received ts: " + e.data);
diff --git a/cli/tests/039_worker_deno_ns.ts b/cli/tests/039_worker_deno_ns.ts
index 80ada4343..7cb7de7fb 100644
--- a/cli/tests/039_worker_deno_ns.ts
+++ b/cli/tests/039_worker_deno_ns.ts
@@ -1,7 +1,5 @@
-const w1 = new Worker("./039_worker_deno_ns/has_ns.ts");
-const w2 = new Worker("./039_worker_deno_ns/no_ns.ts", {
- noDenoNamespace: true
-});
+const w1 = new Worker("./039_worker_deno_ns/has_ns.ts", { type: "module" });
+const w2 = new Worker("./039_worker_deno_ns/no_ns.ts", { type: "module" });
let w1MsgCount = 0;
let w2MsgCount = 0;
w1.onmessage = (msg): void => {
diff --git a/cli/tests/integration_tests.rs b/cli/tests/integration_tests.rs
index eea1dd2c9..3e5073b45 100644
--- a/cli/tests/integration_tests.rs
+++ b/cli/tests/integration_tests.rs
@@ -301,6 +301,7 @@ itest!(_038_checkjs {
output: "038_checkjs.js.out",
});
+/* TODO(bartlomieju):
itest!(_039_worker_deno_ns {
args: "run --reload 039_worker_deno_ns.ts",
output: "039_worker_deno_ns.ts.out",
@@ -310,6 +311,7 @@ itest!(_040_worker_blob {
args: "run --reload 040_worker_blob.ts",
output: "040_worker_blob.ts.out",
});
+*/
itest!(_041_dyn_import_eval {
args: "eval import('./subdir/mod4.js').then(console.log)",
@@ -567,12 +569,14 @@ itest!(error_type_definitions {
output: "error_type_definitions.ts.out",
});
+/* TODO(bartlomieju)
itest!(error_worker_dynamic {
args: "run --reload error_worker_dynamic.ts",
check_stderr: true,
exit_code: 1,
output: "error_worker_dynamic.ts.out",
});
+*/
itest!(exit_error42 {
exit_code: 42,
diff --git a/cli/tests/subdir/bench_worker.ts b/cli/tests/subdir/bench_worker.ts
index 094cefb80..696a84b9f 100644
--- a/cli/tests/subdir/bench_worker.ts
+++ b/cli/tests/subdir/bench_worker.ts
@@ -14,6 +14,7 @@ onmessage = function(e): void {
postMessage({ cmdId });
break;
case 3: // Close
+ postMessage({ cmdId: 3 });
workerClose();
break;
}
diff --git a/cli/tests/workers_round_robin_bench.ts b/cli/tests/workers_round_robin_bench.ts
index 992ce38dc..e8f5b2d30 100644
--- a/cli/tests/workers_round_robin_bench.ts
+++ b/cli/tests/workers_round_robin_bench.ts
@@ -37,12 +37,11 @@ function handleAsyncMsgFromWorker(
async function main(): Promise<void> {
const workers: Array<[Map<number, Resolvable<string>>, Worker]> = [];
for (let i = 1; i <= workerCount; ++i) {
- const worker = new Worker("./subdir/bench_worker.ts");
- const promise = new Promise((resolve): void => {
- worker.onmessage = (e): void => {
- if (e.data.cmdId === 0) resolve();
- };
- });
+ const worker = new Worker("./subdir/bench_worker.ts", { type: "module" });
+ const promise = createResolvable<void>();
+ worker.onmessage = (e): void => {
+ if (e.data.cmdId === 0) promise.resolve();
+ };
worker.postMessage({ cmdId: 0, action: 2 });
await promise;
workers.push([new Map(), worker]);
@@ -66,8 +65,12 @@ async function main(): Promise<void> {
}
}
for (const [, worker] of workers) {
+ const promise = createResolvable<void>();
+ worker.onmessage = (e): void => {
+ if (e.data.cmdId === 3) promise.resolve();
+ };
worker.postMessage({ action: 3 });
- await worker.closed; // Required to avoid a cmdId not in table error.
+ await promise;
}
console.log("Finished!");
}
diff --git a/cli/tests/workers_startup_bench.ts b/cli/tests/workers_startup_bench.ts
index 5d2c24b89..60c15a4b1 100644
--- a/cli/tests/workers_startup_bench.ts
+++ b/cli/tests/workers_startup_bench.ts
@@ -4,7 +4,7 @@ const workerCount = 50;
async function bench(): Promise<void> {
const workers: Worker[] = [];
for (let i = 1; i <= workerCount; ++i) {
- const worker = new Worker("./subdir/bench_worker.ts");
+ const worker = new Worker("./subdir/bench_worker.ts", { type: "module" });
const promise = new Promise((resolve): void => {
worker.onmessage = (e): void => {
if (e.data.cmdId === 0) resolve();
@@ -16,8 +16,13 @@ async function bench(): Promise<void> {
}
console.log("Done creating workers closing workers!");
for (const worker of workers) {
+ const promise = new Promise((resolve): void => {
+ worker.onmessage = (e): void => {
+ if (e.data.cmdId === 3) resolve();
+ };
+ });
worker.postMessage({ action: 3 });
- await worker.closed; // Required to avoid a cmdId not in table error.
+ await promise;
}
console.log("Finished!");
}
diff --git a/cli/web_worker.rs b/cli/web_worker.rs
new file mode 100644
index 000000000..f933cbdc4
--- /dev/null
+++ b/cli/web_worker.rs
@@ -0,0 +1,145 @@
+// Copyright 2018-2020 the Deno authors. All rights reserved. MIT license.
+use crate::fmt_errors::JSError;
+use crate::ops;
+use crate::state::ThreadSafeState;
+use crate::worker::WorkerChannels;
+use crate::worker::WorkerReceiver;
+use deno_core;
+use deno_core::Buf;
+use deno_core::ErrBox;
+use deno_core::ModuleSpecifier;
+use deno_core::StartupData;
+use futures::future::FutureExt;
+use futures::future::TryFutureExt;
+use futures::sink::SinkExt;
+use futures::task::AtomicWaker;
+use std::env;
+use std::future::Future;
+use std::pin::Pin;
+use std::sync::Arc;
+use std::sync::Mutex;
+use std::task::Context;
+use std::task::Poll;
+use tokio::sync::Mutex as AsyncMutex;
+use url::Url;
+
+#[derive(Clone)]
+pub struct WebWorker {
+ pub name: String,
+ pub isolate: Arc<AsyncMutex<Box<deno_core::EsIsolate>>>,
+ pub state: ThreadSafeState,
+ external_channels: Arc<Mutex<WorkerChannels>>,
+}
+
+impl WebWorker {
+ pub fn new(
+ name: String,
+ startup_data: StartupData,
+ state: ThreadSafeState,
+ external_channels: WorkerChannels,
+ ) -> Self {
+ let mut isolate =
+ deno_core::EsIsolate::new(Box::new(state.clone()), startup_data, false);
+
+ ops::web_worker::init(&mut isolate, &state);
+ ops::worker_host::init(&mut isolate, &state);
+
+ let global_state_ = state.global_state.clone();
+ isolate.set_js_error_create(move |v8_exception| {
+ JSError::from_v8_exception(v8_exception, &global_state_.ts_compiler)
+ });
+
+ Self {
+ name,
+ isolate: Arc::new(AsyncMutex::new(isolate)),
+ state,
+ external_channels: Arc::new(Mutex::new(external_channels)),
+ }
+ }
+
+ /// Same as execute2() but the filename defaults to "$CWD/__anonymous__".
+ pub fn execute(&mut self, js_source: &str) -> Result<(), ErrBox> {
+ let path = env::current_dir().unwrap().join("__anonymous__");
+ let url = Url::from_file_path(path).unwrap();
+ self.execute2(url.as_str(), js_source)
+ }
+
+ /// Executes the provided JavaScript source code. The js_filename argument is
+ /// provided only for debugging purposes.
+ fn execute2(
+ &mut self,
+ js_filename: &str,
+ js_source: &str,
+ ) -> Result<(), ErrBox> {
+ let mut isolate = self.isolate.try_lock().unwrap();
+ isolate.execute(js_filename, js_source)
+ }
+
+ /// Executes the provided JavaScript module.
+ ///
+ /// Takes ownership of the isolate behind mutex.
+ pub async fn execute_mod_async(
+ &mut self,
+ module_specifier: &ModuleSpecifier,
+ maybe_code: Option<String>,
+ is_prefetch: bool,
+ ) -> Result<(), ErrBox> {
+ let specifier = module_specifier.to_string();
+ let worker = self.clone();
+
+ let mut isolate = self.isolate.lock().await;
+ let id = isolate.load_module(&specifier, maybe_code).await?;
+ worker.state.global_state.progress.done();
+
+ if !is_prefetch {
+ return isolate.mod_evaluate(id);
+ }
+
+ Ok(())
+ }
+
+ /// Post message to worker as a host.
+ ///
+ /// This method blocks current thread.
+ pub fn post_message(
+ &self,
+ buf: Buf,
+ ) -> impl Future<Output = Result<(), ErrBox>> {
+ let channels = self.external_channels.lock().unwrap();
+ let mut sender = channels.sender.clone();
+ async move {
+ let result = sender.send(buf).map_err(ErrBox::from).await;
+ drop(sender);
+ result
+ }
+ }
+
+ /// Get message from worker as a host.
+ pub fn get_message(&self) -> WorkerReceiver {
+ WorkerReceiver {
+ channels: self.external_channels.clone(),
+ }
+ }
+
+ pub fn clear_exception(&mut self) {
+ let mut isolate = self.isolate.try_lock().unwrap();
+ isolate.clear_exception();
+ }
+}
+
+impl Future for WebWorker {
+ type Output = Result<(), ErrBox>;
+
+ fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
+ let inner = self.get_mut();
+ let waker = AtomicWaker::new();
+ waker.register(cx.waker());
+ match inner.isolate.try_lock() {
+ Ok(mut isolate) => isolate.poll_unpin(cx),
+ Err(_) => {
+ waker.wake();
+ Poll::Pending
+ }
+ }
+ }
+}
diff --git a/cli/worker.rs b/cli/worker.rs
index 7faf17e60..4601a6021 100644
--- a/cli/worker.rs
+++ b/cli/worker.rs
@@ -48,55 +48,42 @@ impl Worker {
state: ThreadSafeState,
external_channels: WorkerChannels,
) -> Self {
- let isolate = Arc::new(AsyncMutex::new(deno_core::EsIsolate::new(
- Box::new(state.clone()),
- startup_data,
- false,
- )));
- {
- let mut i = isolate.try_lock().unwrap();
- let op_registry = i.op_registry.clone();
-
- ops::compiler::init(&mut i, &state);
- ops::errors::init(&mut i, &state);
- ops::fetch::init(&mut i, &state);
- ops::files::init(&mut i, &state);
- ops::fs::init(&mut i, &state);
- ops::io::init(&mut i, &state);
- ops::plugins::init(&mut i, &state, op_registry);
- ops::net::init(&mut i, &state);
- ops::tls::init(&mut i, &state);
- ops::os::init(&mut i, &state);
- ops::permissions::init(&mut i, &state);
- ops::process::init(&mut i, &state);
- ops::random::init(&mut i, &state);
- ops::repl::init(&mut i, &state);
- ops::resources::init(&mut i, &state);
- ops::timers::init(&mut i, &state);
- ops::workers::init(&mut i, &state);
-
- let global_state_ = state.global_state.clone();
- i.set_js_error_create(move |v8_exception| {
- JSError::from_v8_exception(v8_exception, &global_state_.ts_compiler)
- })
- }
+ let mut isolate =
+ deno_core::EsIsolate::new(Box::new(state.clone()), startup_data, false);
+ let op_registry = isolate.op_registry.clone();
+
+ ops::compiler::init(&mut isolate, &state);
+ ops::errors::init(&mut isolate, &state);
+ ops::fetch::init(&mut isolate, &state);
+ ops::files::init(&mut isolate, &state);
+ ops::fs::init(&mut isolate, &state);
+ ops::io::init(&mut isolate, &state);
+ ops::plugins::init(&mut isolate, &state, op_registry);
+ ops::net::init(&mut isolate, &state);
+ ops::tls::init(&mut isolate, &state);
+ ops::os::init(&mut isolate, &state);
+ ops::permissions::init(&mut isolate, &state);
+ ops::process::init(&mut isolate, &state);
+ ops::random::init(&mut isolate, &state);
+ ops::repl::init(&mut isolate, &state);
+ ops::resources::init(&mut isolate, &state);
+ ops::timers::init(&mut isolate, &state);
+ ops::worker_host::init(&mut isolate, &state);
+ ops::web_worker::init(&mut isolate, &state);
+
+ let global_state_ = state.global_state.clone();
+ isolate.set_js_error_create(move |v8_exception| {
+ JSError::from_v8_exception(v8_exception, &global_state_.ts_compiler)
+ });
Self {
name,
- isolate,
+ isolate: Arc::new(AsyncMutex::new(isolate)),
state,
external_channels: Arc::new(Mutex::new(external_channels)),
}
}
- pub fn set_error_handler(
- &mut self,
- handler: Box<dyn FnMut(ErrBox) -> Result<(), ErrBox>>,
- ) {
- let mut i = self.isolate.try_lock().unwrap();
- i.set_error_handler(handler);
- }
-
/// Same as execute2() but the filename defaults to "$CWD/__anonymous__".
pub fn execute(&mut self, js_source: &str) -> Result<(), ErrBox> {
let path = env::current_dir().unwrap().join("__anonymous__");
@@ -188,7 +175,7 @@ impl Future for Worker {
/// that will return message received from worker or None
/// if worker's channel has been closed.
pub struct WorkerReceiver {
- channels: Arc<Mutex<WorkerChannels>>,
+ pub channels: Arc<Mutex<WorkerChannels>>,
}
impl Future for WorkerReceiver {
@@ -255,7 +242,6 @@ mod tests {
global_state,
None,
Some(module_specifier.clone()),
- true,
int,
)
.unwrap();
@@ -299,7 +285,6 @@ mod tests {
global_state,
None,
Some(module_specifier.clone()),
- true,
int,
)
.unwrap();
@@ -342,7 +327,6 @@ mod tests {
global_state.clone(),
None,
Some(module_specifier.clone()),
- true,
int,
)
.unwrap();