summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--cli/compilers/compiler_worker.rs2
-rw-r--r--cli/compilers/ts.rs84
-rw-r--r--cli/compilers/wasm.rs118
-rw-r--r--cli/js/compiler.ts7
-rw-r--r--cli/js/dispatch.ts4
-rw-r--r--cli/js/globals.ts1
-rw-r--r--cli/js/lib.deno.worker.d.ts1
-rw-r--r--cli/js/main.ts11
-rw-r--r--cli/js/runtime_worker.ts82
-rw-r--r--cli/js/unit_tests.ts1
-rw-r--r--cli/js/workers.ts80
-rw-r--r--cli/js/workers_test.ts84
-rw-r--r--cli/ops/runtime.rs19
-rw-r--r--cli/ops/web_worker.rs80
-rw-r--r--cli/ops/worker_host.rs351
-rw-r--r--cli/state.rs10
-rw-r--r--cli/tests/integration_tests.rs2
-rw-r--r--cli/tests/subdir/nested_worker.js19
-rw-r--r--cli/tests/subdir/sibling_worker.js4
-rw-r--r--cli/tests/subdir/test_worker.js2
-rw-r--r--cli/tests/subdir/test_worker.ts2
-rw-r--r--cli/tests/subdir/throwing_worker.js2
-rw-r--r--cli/tokio_util.rs27
-rw-r--r--cli/web_worker.rs135
-rw-r--r--cli/worker.rs102
-rw-r--r--core/es_isolate.rs48
26 files changed, 770 insertions, 508 deletions
diff --git a/cli/compilers/compiler_worker.rs b/cli/compilers/compiler_worker.rs
index 3252aae02..123c29abb 100644
--- a/cli/compilers/compiler_worker.rs
+++ b/cli/compilers/compiler_worker.rs
@@ -30,7 +30,7 @@ impl CompilerWorker {
let isolate = &mut worker.isolate;
ops::runtime::init(isolate, &state);
ops::compiler::init(isolate, &state);
- ops::web_worker::init(isolate, &state);
+ ops::web_worker::init(isolate, &state, &worker.internal_channels.sender);
ops::errors::init(isolate, &state);
// for compatibility with Worker scope, though unused at
// the moment
diff --git a/cli/compilers/ts.rs b/cli/compilers/ts.rs
index 91c264345..d1fb3a6ac 100644
--- a/cli/compilers/ts.rs
+++ b/cli/compilers/ts.rs
@@ -8,11 +8,15 @@ use crate::file_fetcher::SourceFile;
use crate::file_fetcher::SourceFileFetcher;
use crate::global_state::GlobalState;
use crate::msg;
+use crate::ops::worker_host::run_worker_loop;
use crate::ops::JsonResult;
use crate::source_maps::SourceMapGetter;
use crate::startup_data;
use crate::state::*;
+use crate::tokio_util::create_basic_runtime;
use crate::version;
+use crate::worker::WorkerEvent;
+use crate::worker::WorkerHandle;
use deno_core::Buf;
use deno_core::ErrBox;
use deno_core::ModuleSpecifier;
@@ -288,13 +292,11 @@ impl TsCompiler {
true,
);
- let maybe_msg = execute_in_thread(global_state.clone(), req_msg).await?;
- if let Some(ref msg) = maybe_msg {
- let json_str = std::str::from_utf8(msg).unwrap();
- debug!("Message: {}", json_str);
- if let Some(diagnostics) = Diagnostic::from_emit_result(json_str) {
- return Err(ErrBox::from(diagnostics));
- }
+ let msg = execute_in_thread(global_state.clone(), req_msg).await?;
+ let json_str = std::str::from_utf8(&msg).unwrap();
+ debug!("Message: {}", json_str);
+ if let Some(diagnostics) = Diagnostic::from_emit_result(json_str) {
+ return Err(ErrBox::from(diagnostics));
}
Ok(())
}
@@ -376,13 +378,11 @@ impl TsCompiler {
let compiling_job = global_state
.progress
.add("Compile", &module_url.to_string());
- let maybe_msg = execute_in_thread(global_state.clone(), req_msg).await?;
+ let msg = execute_in_thread(global_state.clone(), req_msg).await?;
- if let Some(ref msg) = maybe_msg {
- let json_str = std::str::from_utf8(msg).unwrap();
- if let Some(diagnostics) = Diagnostic::from_emit_result(json_str) {
- return Err(ErrBox::from(diagnostics));
- }
+ let json_str = std::str::from_utf8(&msg).unwrap();
+ if let Some(diagnostics) = Diagnostic::from_emit_result(json_str) {
+ return Err(ErrBox::from(diagnostics));
}
let compiled_module = ts_compiler.get_compiled_module(&source_file_.url)?;
drop(compiling_job);
@@ -602,45 +602,45 @@ impl TsCompiler {
}
}
+// TODO(bartlomieju): exactly same function is in `wasm.rs` - only difference
+// it created WasmCompiler instead of TsCompiler - deduplicate
async fn execute_in_thread(
global_state: GlobalState,
req: Buf,
-) -> Result<Option<Buf>, ErrBox> {
- let (load_sender, load_receiver) =
- tokio::sync::oneshot::channel::<Result<Option<Buf>, ErrBox>>();
- std::thread::spawn(move || {
- debug!(">>>>> compile_async START");
-
+) -> Result<Buf, ErrBox> {
+ let (handle_sender, handle_receiver) =
+ std::sync::mpsc::sync_channel::<Result<WorkerHandle, ErrBox>>(1);
+ let builder =
+ std::thread::Builder::new().name("deno-ts-compiler".to_string());
+ let join_handle = builder.spawn(move || {
let mut worker = TsCompiler::setup_worker(global_state.clone());
- let handle = worker.thread_safe_handle();
-
- crate::tokio_util::run_basic(
- async move {
- if let Err(err) = handle.post_message(req).await {
- load_sender.send(Err(err)).unwrap();
- return;
- }
- if let Err(err) = (&mut *worker).await {
- load_sender.send(Err(err)).unwrap();
- return;
- }
- let maybe_msg = handle.get_message().await;
- load_sender.send(Ok(maybe_msg)).unwrap();
- debug!(">>>>> compile_sync END");
- }
- .boxed_local(),
- );
- });
-
- load_receiver.await.unwrap()
+ handle_sender.send(Ok(worker.thread_safe_handle())).unwrap();
+ drop(handle_sender);
+ let mut rt = create_basic_runtime();
+ run_worker_loop(&mut rt, &mut worker).expect("Panic in event loop");
+ })?;
+ let mut handle = handle_receiver.recv().unwrap()?;
+ handle.post_message(req).await?;
+ let event = handle.get_event().await.expect("Compiler didn't respond");
+ let buf = match event {
+ WorkerEvent::Message(buf) => Ok(buf),
+ WorkerEvent::Error(error) => Err(error),
+ }?;
+ // Compiler worker finishes after one request
+ // so we should receive signal that channel was closed.
+ // Then close worker's channel and join the thread.
+ let event = handle.get_event().await;
+ assert!(event.is_none());
+ handle.sender.close_channel();
+ join_handle.join().unwrap();
+ Ok(buf)
}
async fn execute_in_thread_json(
req_msg: Buf,
global_state: GlobalState,
) -> JsonResult {
- let maybe_msg = execute_in_thread(global_state, req_msg).await?;
- let msg = maybe_msg.unwrap();
+ let msg = execute_in_thread(global_state, req_msg).await?;
let json_str = std::str::from_utf8(&msg).unwrap();
Ok(json!(json_str))
}
diff --git a/cli/compilers/wasm.rs b/cli/compilers/wasm.rs
index c1c179f62..9bc9d2ab4 100644
--- a/cli/compilers/wasm.rs
+++ b/cli/compilers/wasm.rs
@@ -3,8 +3,13 @@ use super::compiler_worker::CompilerWorker;
use crate::compilers::CompiledModule;
use crate::file_fetcher::SourceFile;
use crate::global_state::GlobalState;
+use crate::ops::worker_host::run_worker_loop;
use crate::startup_data;
use crate::state::*;
+use crate::tokio_util::create_basic_runtime;
+use crate::worker::WorkerEvent;
+use crate::worker::WorkerHandle;
+use deno_core::Buf;
use deno_core::ErrBox;
use deno_core::ModuleSpecifier;
use serde_derive::Deserialize;
@@ -83,64 +88,67 @@ impl WasmCompiler {
if let Some(m) = maybe_cached {
return Ok(m);
}
-
- let (load_sender, load_receiver) =
- tokio::sync::oneshot::channel::<Result<CompiledModule, ErrBox>>();
-
- std::thread::spawn(move || {
- debug!(">>>>> wasm_compile_async START");
- let base64_data = base64::encode(&source_file.source_code);
- let mut worker = WasmCompiler::setup_worker(global_state);
- let handle = worker.thread_safe_handle();
- let url = source_file.url.clone();
-
- let fut = async move {
- let _ = handle
- .post_message(
- serde_json::to_string(&base64_data)
- .unwrap()
- .into_boxed_str()
- .into_boxed_bytes(),
- )
- .await;
-
- if let Err(err) = (&mut *worker).await {
- load_sender.send(Err(err)).unwrap();
- return;
- }
-
- debug!("Sent message to worker");
- let json_msg = handle.get_message().await.expect("not handled");
-
- debug!("Received message from worker");
- let module_info: WasmModuleInfo =
- serde_json::from_slice(&json_msg).unwrap();
-
- debug!("WASM module info: {:#?}", &module_info);
- let code = wrap_wasm_code(
- &base64_data,
- &module_info.import_list,
- &module_info.export_list,
- );
-
- debug!("Generated code: {}", &code);
- let module = CompiledModule {
- code,
- name: url.to_string(),
- };
- {
- cache_.lock().unwrap().insert(url.clone(), module.clone());
- }
- debug!("<<<<< wasm_compile_async END");
- load_sender.send(Ok(module)).unwrap();
- };
-
- crate::tokio_util::run_basic(fut);
- });
- load_receiver.await.unwrap()
+ debug!(">>>>> wasm_compile_async START");
+ let base64_data = base64::encode(&source_file.source_code);
+ let url = source_file.url.clone();
+ let req_msg = serde_json::to_string(&base64_data)
+ .unwrap()
+ .into_boxed_str()
+ .into_boxed_bytes();
+ let msg = execute_in_thread(global_state.clone(), req_msg).await?;
+ debug!("Received message from worker");
+ let module_info: WasmModuleInfo = serde_json::from_slice(&msg).unwrap();
+ debug!("WASM module info: {:#?}", &module_info);
+ let code = wrap_wasm_code(
+ &base64_data,
+ &module_info.import_list,
+ &module_info.export_list,
+ );
+ debug!("Generated code: {}", &code);
+ let module = CompiledModule {
+ code,
+ name: url.to_string(),
+ };
+ {
+ cache_.lock().unwrap().insert(url.clone(), module.clone());
+ }
+ debug!("<<<<< wasm_compile_async END");
+ Ok(module)
}
}
+async fn execute_in_thread(
+ global_state: GlobalState,
+ req: Buf,
+) -> Result<Buf, ErrBox> {
+ let (handle_sender, handle_receiver) =
+ std::sync::mpsc::sync_channel::<Result<WorkerHandle, ErrBox>>(1);
+ let builder =
+ std::thread::Builder::new().name("deno-wasm-compiler".to_string());
+ let join_handle = builder.spawn(move || {
+ let mut worker = WasmCompiler::setup_worker(global_state);
+ handle_sender.send(Ok(worker.thread_safe_handle())).unwrap();
+ drop(handle_sender);
+ let mut rt = create_basic_runtime();
+ run_worker_loop(&mut rt, &mut worker).expect("Panic in event loop");
+ })?;
+ let mut handle = handle_receiver.recv().unwrap()?;
+ handle.post_message(req).await?;
+ let event = handle.get_event().await.expect("Compiler didn't respond");
+ let buf = match event {
+ WorkerEvent::Message(buf) => Ok(buf),
+ WorkerEvent::Error(error) => Err(error),
+ }?;
+ // Compiler worker finishes after one request
+ // so we should receive signal that channel was closed.
+ // Then close worker's channel and join the thread.
+ let event = handle.get_event().await;
+ assert!(event.is_none());
+ handle.sender.close_channel();
+ join_handle.join().unwrap();
+ Ok(buf)
+}
+
fn build_single_import(index: usize, origin: &str) -> String {
let origin_json = serde_json::to_string(origin).unwrap();
format!(
diff --git a/cli/js/compiler.ts b/cli/js/compiler.ts
index bf0287efe..4ca2887c6 100644
--- a/cli/js/compiler.ts
+++ b/cli/js/compiler.ts
@@ -40,10 +40,7 @@ import { Diagnostic } from "./diagnostics.ts";
import { fromTypeScriptDiagnostic } from "./diagnostics_util.ts";
import { assert } from "./util.ts";
import * as util from "./util.ts";
-import {
- bootstrapWorkerRuntime,
- runWorkerMessageLoop
-} from "./runtime_worker.ts";
+import { bootstrapWorkerRuntime } from "./runtime_worker.ts";
interface CompilerRequestCompile {
type: CompilerRequestType.Compile;
@@ -340,13 +337,11 @@ async function wasmCompilerOnMessage({
function bootstrapTsCompilerRuntime(): void {
bootstrapWorkerRuntime("TS");
globalThis.onmessage = tsCompilerOnMessage;
- runWorkerMessageLoop();
}
function bootstrapWasmCompilerRuntime(): void {
bootstrapWorkerRuntime("WASM");
globalThis.onmessage = wasmCompilerOnMessage;
- runWorkerMessageLoop();
}
Object.defineProperties(globalThis, {
diff --git a/cli/js/dispatch.ts b/cli/js/dispatch.ts
index 1a6b6528d..4493d3771 100644
--- a/cli/js/dispatch.ts
+++ b/cli/js/dispatch.ts
@@ -43,10 +43,10 @@ export let OP_REVOKE_PERMISSION: number;
export let OP_REQUEST_PERMISSION: number;
export let OP_CREATE_WORKER: number;
export let OP_HOST_POST_MESSAGE: number;
-export let OP_HOST_CLOSE_WORKER: number;
+export let OP_HOST_TERMINATE_WORKER: number;
export let OP_HOST_GET_MESSAGE: number;
export let OP_WORKER_POST_MESSAGE: number;
-export let OP_WORKER_GET_MESSAGE: number;
+export let OP_WORKER_CLOSE: number;
export let OP_RUN: number;
export let OP_RUN_STATUS: number;
export let OP_KILL: number;
diff --git a/cli/js/globals.ts b/cli/js/globals.ts
index 7cce739d5..53eb696ac 100644
--- a/cli/js/globals.ts
+++ b/cli/js/globals.ts
@@ -118,7 +118,6 @@ declare global {
var bootstrapWorkerRuntime:
| ((name: string) => Promise<void> | void)
| undefined;
- var runWorkerMessageLoop: (() => Promise<void> | void) | undefined;
var onerror:
| ((
msg: string,
diff --git a/cli/js/lib.deno.worker.d.ts b/cli/js/lib.deno.worker.d.ts
index 07955345c..3311d9457 100644
--- a/cli/js/lib.deno.worker.d.ts
+++ b/cli/js/lib.deno.worker.d.ts
@@ -37,7 +37,6 @@ declare const postMessage: typeof __workerMain.postMessage;
declare namespace __workerMain {
export let onmessage: (e: { data: any }) => void;
export function postMessage(data: any): void;
- export function getMessage(): Promise<any>;
export function close(): void;
export const name: string;
}
diff --git a/cli/js/main.ts b/cli/js/main.ts
index b48277960..fbebfefe4 100644
--- a/cli/js/main.ts
+++ b/cli/js/main.ts
@@ -1,9 +1,6 @@
// Copyright 2018-2020 the Deno authors. All rights reserved. MIT license.
import { bootstrapMainRuntime } from "./runtime_main.ts";
-import {
- bootstrapWorkerRuntime,
- runWorkerMessageLoop
-} from "./runtime_worker.ts";
+import { bootstrapWorkerRuntime } from "./runtime_worker.ts";
Object.defineProperties(globalThis, {
bootstrapMainRuntime: {
@@ -17,11 +14,5 @@ Object.defineProperties(globalThis, {
enumerable: false,
writable: false,
configurable: false
- },
- runWorkerMessageLoop: {
- value: runWorkerMessageLoop,
- enumerable: false,
- writable: false,
- configurable: false
}
});
diff --git a/cli/js/runtime_worker.ts b/cli/js/runtime_worker.ts
index 0dc65fdb6..a9ed8b924 100644
--- a/cli/js/runtime_worker.ts
+++ b/cli/js/runtime_worker.ts
@@ -3,12 +3,9 @@
// This module is the entry point for "worker" isolate, ie. the one
// that is created using `new Worker()` JS API.
//
-// It provides two functions that should be called by Rust:
+// It provides a single function that should be called by Rust:
// - `bootstrapWorkerRuntime` - must be called once, when Isolate is created.
// It sets up runtime by providing globals for `DedicatedWorkerScope`.
-// - `runWorkerMessageLoop` - starts receiving messages from parent worker,
-// can be called multiple times - eg. to restart worker execution after
-// exception occurred and was handled by parent worker
/* eslint-disable @typescript-eslint/no-explicit-any */
import {
@@ -20,13 +17,12 @@ import {
eventTargetProperties
} from "./globals.ts";
import * as dispatch from "./dispatch.ts";
-import { sendAsync, sendSync } from "./dispatch_json.ts";
+import { sendSync } from "./dispatch_json.ts";
import { log } from "./util.ts";
-import { TextDecoder, TextEncoder } from "./text_encoding.ts";
+import { TextEncoder } from "./text_encoding.ts";
import * as runtime from "./runtime.ts";
const encoder = new TextEncoder();
-const decoder = new TextDecoder();
// TODO(bartlomieju): remove these funtions
// Stuff for workers
@@ -39,62 +35,46 @@ export function postMessage(data: any): void {
sendSync(dispatch.OP_WORKER_POST_MESSAGE, {}, dataIntArray);
}
-export async function getMessage(): Promise<any> {
- log("getMessage");
- const res = await sendAsync(dispatch.OP_WORKER_GET_MESSAGE);
- if (res.data != null) {
- const dataIntArray = new Uint8Array(res.data);
- const dataJson = decoder.decode(dataIntArray);
- return JSON.parse(dataJson);
- } else {
- return null;
- }
-}
-
let isClosing = false;
let hasBootstrapped = false;
export function close(): void {
+ if (isClosing) {
+ return;
+ }
+
isClosing = true;
+ sendSync(dispatch.OP_WORKER_CLOSE);
}
-export async function runWorkerMessageLoop(): Promise<void> {
- while (!isClosing) {
- const data = await getMessage();
- if (data == null) {
- log("runWorkerMessageLoop got null message. quitting.");
- break;
- }
+export async function workerMessageRecvCallback(data: string): Promise<void> {
+ let result: void | Promise<void>;
+ const event = { data };
- let result: void | Promise<void>;
- const event = { data };
-
- try {
- if (!globalThis["onmessage"]) {
- break;
- }
+ try {
+ //
+ if (globalThis["onmessage"]) {
result = globalThis.onmessage!(event);
if (result && "then" in result) {
await result;
}
- if (!globalThis["onmessage"]) {
- break;
- }
- } catch (e) {
- if (globalThis["onerror"]) {
- const result = globalThis.onerror(
- e.message,
- e.fileName,
- e.lineNumber,
- e.columnNumber,
- e
- );
- if (result === true) {
- continue;
- }
+ }
+
+ // TODO: run the rest of liteners
+ } catch (e) {
+ if (globalThis["onerror"]) {
+ const result = globalThis.onerror(
+ e.message,
+ e.fileName,
+ e.lineNumber,
+ e.columnNumber,
+ e
+ );
+ if (result === true) {
+ return;
}
- throw e;
}
+ throw e;
}
}
@@ -102,8 +82,10 @@ export const workerRuntimeGlobalProperties = {
self: readOnly(globalThis),
onmessage: writable(onmessage),
onerror: writable(onerror),
+ // TODO: should be readonly?
close: nonEnumerable(close),
- postMessage: writable(postMessage)
+ postMessage: writable(postMessage),
+ workerMessageRecvCallback: nonEnumerable(workerMessageRecvCallback)
};
/**
diff --git a/cli/js/unit_tests.ts b/cli/js/unit_tests.ts
index 992169e55..a6435d183 100644
--- a/cli/js/unit_tests.ts
+++ b/cli/js/unit_tests.ts
@@ -59,6 +59,7 @@ import "./write_file_test.ts";
import "./performance_test.ts";
import "./permissions_test.ts";
import "./version_test.ts";
+import "./workers_test.ts";
import { runIfMain } from "../../std/testing/mod.ts";
diff --git a/cli/js/workers.ts b/cli/js/workers.ts
index fb63a3260..7b0c50336 100644
--- a/cli/js/workers.ts
+++ b/cli/js/workers.ts
@@ -38,19 +38,23 @@ function createWorker(
});
}
+function hostTerminateWorker(id: number): void {
+ sendSync(dispatch.OP_HOST_TERMINATE_WORKER, { id });
+}
+
function hostPostMessage(id: number, data: any): void {
const dataIntArray = encodeMessage(data);
sendSync(dispatch.OP_HOST_POST_MESSAGE, { id }, dataIntArray);
}
-async function hostGetMessage(id: number): Promise<any> {
- const res = await sendAsync(dispatch.OP_HOST_GET_MESSAGE, { id });
+interface WorkerEvent {
+ event: "error" | "msg" | "close";
+ data?: any;
+ error?: any;
+}
- if (res.data != null) {
- return decodeMessage(new Uint8Array(res.data));
- } else {
- return null;
- }
+async function hostGetMessage(id: number): Promise<any> {
+ return await sendAsync(dispatch.OP_HOST_GET_MESSAGE, { id });
}
export interface Worker {
@@ -72,6 +76,8 @@ export class WorkerImpl extends EventTarget implements Worker {
public onerror?: (e: any) => void;
public onmessage?: (data: any) => void;
public onmessageerror?: () => void;
+ private name: string;
+ private terminated = false;
constructor(specifier: string, options?: WorkerOptions) {
super();
@@ -88,6 +94,7 @@ export class WorkerImpl extends EventTarget implements Worker {
);
}
+ this.name = options?.name ?? "unknown";
const hasSourceCode = false;
const sourceCode = new Uint8Array();
@@ -139,42 +146,53 @@ export class WorkerImpl extends EventTarget implements Worker {
}
async poll(): Promise<void> {
- while (!this.isClosing) {
- const data = await hostGetMessage(this.id);
- if (data == null) {
- log("worker got null message. quitting.");
- break;
- }
- if (this.onmessage) {
- const event = { data };
- this.onmessage(event);
+ while (!this.terminated) {
+ const event = await hostGetMessage(this.id);
+
+ // If terminate was called then we ignore all messages
+ if (this.terminated) {
+ return;
}
- }
- /*
- while (true) {
- const result = await hostPollWorker(this.id);
+ const type = event.type;
- if (result.error) {
- if (!this.handleError(result.error)) {
- throw Error(result.error.message);
- } else {
- hostResumeWorker(this.id);
+ if (type === "msg") {
+ if (this.onmessage) {
+ const message = decodeMessage(new Uint8Array(event.data));
+ this.onmessage({ data: message });
}
- } else {
- this.isClosing = true;
- hostCloseWorker(this.id);
- break;
+ continue;
}
+
+ if (type === "error") {
+ if (!this.handleError(event.error)) {
+ throw Error(event.error.message);
+ }
+ continue;
+ }
+
+ if (type === "close") {
+ log(`Host got "close" message from worker: ${this.name}`);
+ this.terminated = true;
+ return;
+ }
+
+ throw new Error(`Unknown worker event: "${type}"`);
}
- */
}
postMessage(data: any): void {
+ if (this.terminated) {
+ return;
+ }
+
hostPostMessage(this.id, data);
}
terminate(): void {
- throw new Error("Not yet implemented");
+ if (!this.terminated) {
+ this.terminated = true;
+ hostTerminateWorker(this.id);
+ }
}
}
diff --git a/cli/js/workers_test.ts b/cli/js/workers_test.ts
new file mode 100644
index 000000000..9cb4f4a07
--- /dev/null
+++ b/cli/js/workers_test.ts
@@ -0,0 +1,84 @@
+// Copyright 2018-2020 the Deno authors. All rights reserved. MIT license.
+import { test, assert, assertEquals } from "./test_util.ts";
+
+export interface ResolvableMethods<T> {
+ resolve: (value?: T | PromiseLike<T>) => void;
+ // eslint-disable-next-line @typescript-eslint/no-explicit-any
+ reject: (reason?: any) => void;
+}
+
+export type Resolvable<T> = Promise<T> & ResolvableMethods<T>;
+
+export function createResolvable<T>(): Resolvable<T> {
+ let methods: ResolvableMethods<T>;
+ const promise = new Promise<T>((resolve, reject): void => {
+ methods = { resolve, reject };
+ });
+ // TypeScript doesn't know that the Promise callback occurs synchronously
+ // therefore use of not null assertion (`!`)
+ return Object.assign(promise, methods!) as Resolvable<T>;
+}
+
+test(async function workersBasic(): Promise<void> {
+ const promise = createResolvable();
+ const jsWorker = new Worker("../tests/subdir/test_worker.js", {
+ type: "module",
+ name: "jsWorker"
+ });
+ const tsWorker = new Worker("../tests/subdir/test_worker.ts", {
+ type: "module",
+ name: "tsWorker"
+ });
+
+ tsWorker.onmessage = (e): void => {
+ assertEquals(e.data, "Hello World");
+ promise.resolve();
+ };
+
+ jsWorker.onmessage = (e): void => {
+ assertEquals(e.data, "Hello World");
+ tsWorker.postMessage("Hello World");
+ };
+
+ jsWorker.onerror = (e: Event): void => {
+ e.preventDefault();
+ jsWorker.postMessage("Hello World");
+ };
+
+ jsWorker.postMessage("Hello World");
+ await promise;
+});
+
+test(async function nestedWorker(): Promise<void> {
+ const promise = createResolvable();
+
+ const nestedWorker = new Worker("../tests/subdir/nested_worker.js", {
+ type: "module",
+ name: "nested"
+ });
+
+ nestedWorker.onmessage = (e): void => {
+ assert(e.data.type !== "error");
+ promise.resolve();
+ };
+
+ nestedWorker.postMessage("Hello World");
+ await promise;
+});
+
+test(async function workerThrowsWhenExecuting(): Promise<void> {
+ const promise = createResolvable();
+
+ const throwingWorker = new Worker("../tests/subdir/throwing_worker.js", {
+ type: "module"
+ });
+
+ // eslint-disable-next-line @typescript-eslint/no-explicit-any
+ throwingWorker.onerror = (e: any): void => {
+ e.preventDefault();
+ assertEquals(e.message, "Uncaught Error: Thrown error");
+ promise.resolve();
+ };
+
+ await promise;
+});
diff --git a/cli/ops/runtime.rs b/cli/ops/runtime.rs
index a962f4e83..7773e461c 100644
--- a/cli/ops/runtime.rs
+++ b/cli/ops/runtime.rs
@@ -8,6 +8,7 @@ use crate::version;
use crate::DenoSubcommand;
use deno_core::*;
use std::env;
+use std::sync::atomic::Ordering;
/// BUILD_OS and BUILD_ARCH match the values in Deno.build. See js/build.ts.
#[cfg(target_os = "macos")]
@@ -21,6 +22,7 @@ static BUILD_ARCH: &str = "x64";
pub fn init(i: &mut Isolate, s: &State) {
i.register_op("start", s.core_op(json_op(s.stateful_op(op_start))));
+ i.register_op("metrics", s.core_op(json_op(s.stateful_op(op_metrics))));
}
fn op_start(
@@ -47,3 +49,20 @@ fn op_start(
"arch": BUILD_ARCH,
})))
}
+
+fn op_metrics(
+ state: &State,
+ _args: Value,
+ _zero_copy: Option<ZeroCopyBuf>,
+) -> Result<JsonOp, ErrBox> {
+ let state = state.borrow();
+ let m = &state.metrics;
+
+ Ok(JsonOp::Sync(json!({
+ "opsDispatched": m.ops_dispatched.load(Ordering::SeqCst) as u64,
+ "opsCompleted": m.ops_completed.load(Ordering::SeqCst) as u64,
+ "bytesSentControl": m.bytes_sent_control.load(Ordering::SeqCst) as u64,
+ "bytesSentData": m.bytes_sent_data.load(Ordering::SeqCst) as u64,
+ "bytesReceived": m.bytes_received.load(Ordering::SeqCst) as u64
+ })))
+}
diff --git a/cli/ops/web_worker.rs b/cli/ops/web_worker.rs
index ae6b10abc..e22c0f221 100644
--- a/cli/ops/web_worker.rs
+++ b/cli/ops/web_worker.rs
@@ -1,65 +1,65 @@
// Copyright 2018-2020 the Deno authors. All rights reserved. MIT license.
use super::dispatch_json::{JsonOp, Value};
-use crate::deno_error::DenoError;
-use crate::deno_error::ErrorKind;
use crate::ops::json_op;
use crate::state::State;
+use crate::worker::WorkerEvent;
use deno_core::*;
use futures;
-use futures::future::FutureExt;
+use futures::channel::mpsc;
+use futures::sink::SinkExt;
use std;
use std::convert::From;
-pub fn init(i: &mut Isolate, s: &State) {
+pub fn web_worker_op<D>(
+ sender: mpsc::Sender<WorkerEvent>,
+ dispatcher: D,
+) -> impl Fn(Value, Option<ZeroCopyBuf>) -> Result<JsonOp, ErrBox>
+where
+ D: Fn(
+ &mpsc::Sender<WorkerEvent>,
+ Value,
+ Option<ZeroCopyBuf>,
+ ) -> Result<JsonOp, ErrBox>,
+{
+ move |args: Value, zero_copy: Option<ZeroCopyBuf>| -> Result<JsonOp, ErrBox> {
+ dispatcher(&sender, args, zero_copy)
+ }
+}
+
+pub fn init(i: &mut Isolate, s: &State, sender: &mpsc::Sender<WorkerEvent>) {
i.register_op(
"worker_post_message",
- s.core_op(json_op(s.stateful_op(op_worker_post_message))),
+ s.core_op(json_op(web_worker_op(
+ sender.clone(),
+ op_worker_post_message,
+ ))),
);
i.register_op(
- "worker_get_message",
- s.core_op(json_op(s.stateful_op(op_worker_get_message))),
+ "worker_close",
+ s.core_op(json_op(web_worker_op(sender.clone(), op_worker_close))),
);
}
-/// Get message from host as guest worker
-fn op_worker_get_message(
- state: &State,
- _args: Value,
- _data: Option<ZeroCopyBuf>,
-) -> Result<JsonOp, ErrBox> {
- let state_ = state.clone();
- let op = async move {
- let fut = {
- let state = state_.borrow();
- state
- .worker_channels_internal
- .as_ref()
- .unwrap()
- .get_message()
- };
- let maybe_buf = fut.await;
- debug!("op_worker_get_message");
- Ok(json!({ "data": maybe_buf }))
- };
-
- Ok(JsonOp::Async(op.boxed_local()))
-}
-
/// Post message to host as guest worker
fn op_worker_post_message(
- state: &State,
+ sender: &mpsc::Sender<WorkerEvent>,
_args: Value,
data: Option<ZeroCopyBuf>,
) -> Result<JsonOp, ErrBox> {
let d = Vec::from(data.unwrap().as_ref()).into_boxed_slice();
- let state = state.borrow();
- let fut = state
- .worker_channels_internal
- .as_ref()
- .unwrap()
- .post_message(d);
- futures::executor::block_on(fut)
- .map_err(|e| DenoError::new(ErrorKind::Other, e.to_string()))?;
+ let mut sender = sender.clone();
+ let fut = sender.send(WorkerEvent::Message(d));
+ futures::executor::block_on(fut).expect("Failed to post message to host");
+ Ok(JsonOp::Sync(json!({})))
+}
+/// Notify host that guest worker closes
+fn op_worker_close(
+ sender: &mpsc::Sender<WorkerEvent>,
+ _args: Value,
+ _data: Option<ZeroCopyBuf>,
+) -> Result<JsonOp, ErrBox> {
+ let mut sender = sender.clone();
+ sender.close_channel();
Ok(JsonOp::Sync(json!({})))
}
diff --git a/cli/ops/worker_host.rs b/cli/ops/worker_host.rs
index fabe0b5e8..4f6f996ee 100644
--- a/cli/ops/worker_host.rs
+++ b/cli/ops/worker_host.rs
@@ -1,21 +1,29 @@
// Copyright 2018-2020 the Deno authors. All rights reserved. MIT license.
use super::dispatch_json::{Deserialize, JsonOp, Value};
-use crate::deno_error::bad_resource;
-use crate::deno_error::js_check;
use crate::deno_error::DenoError;
use crate::deno_error::ErrorKind;
+use crate::deno_error::GetErrorKind;
+use crate::fmt_errors::JSError;
+use crate::futures::SinkExt;
+use crate::global_state::GlobalState;
use crate::ops::json_op;
+use crate::permissions::DenoPermissions;
use crate::startup_data;
use crate::state::State;
+use crate::tokio_util::create_basic_runtime;
use crate::web_worker::WebWorker;
-use crate::worker::WorkerChannelsExternal;
+use crate::worker::Worker;
+use crate::worker::WorkerEvent;
+use crate::worker::WorkerHandle;
use deno_core::*;
use futures;
+use futures::future::poll_fn;
use futures::future::FutureExt;
use futures::future::TryFutureExt;
+use futures::stream::StreamExt;
use std;
use std::convert::From;
-use std::sync::atomic::Ordering;
+use std::task::Poll;
pub fn init(i: &mut Isolate, s: &State) {
i.register_op(
@@ -23,8 +31,8 @@ pub fn init(i: &mut Isolate, s: &State) {
s.core_op(json_op(s.stateful_op(op_create_worker))),
);
i.register_op(
- "host_close_worker",
- s.core_op(json_op(s.stateful_op(op_host_close_worker))),
+ "host_terminate_worker",
+ s.core_op(json_op(s.stateful_op(op_host_terminate_worker))),
);
i.register_op(
"host_post_message",
@@ -34,7 +42,159 @@ pub fn init(i: &mut Isolate, s: &State) {
"host_get_message",
s.core_op(json_op(s.stateful_op(op_host_get_message))),
);
- i.register_op("metrics", s.core_op(json_op(s.stateful_op(op_metrics))));
+}
+
+fn create_web_worker(
+ name: String,
+ global_state: GlobalState,
+ permissions: DenoPermissions,
+ specifier: ModuleSpecifier,
+) -> Result<WebWorker, ErrBox> {
+ let state =
+ State::new_for_worker(global_state, Some(permissions), specifier)?;
+
+ let mut worker =
+ WebWorker::new(name.to_string(), startup_data::deno_isolate_init(), state);
+ let script = format!("bootstrapWorkerRuntime(\"{}\")", name);
+ worker.execute(&script)?;
+
+ Ok(worker)
+}
+
+// TODO(bartlomieju): this function should probably live in `cli/web_worker.rs`
+pub fn run_worker_loop(
+ rt: &mut tokio::runtime::Runtime,
+ worker: &mut Worker,
+) -> Result<(), ErrBox> {
+ let mut worker_is_ready = false;
+
+ let fut = poll_fn(|cx| -> Poll<Result<(), ErrBox>> {
+ if !worker_is_ready {
+ match worker.poll_unpin(cx) {
+ Poll::Ready(r) => {
+ if let Err(e) = r {
+ let mut sender = worker.internal_channels.sender.clone();
+ futures::executor::block_on(sender.send(WorkerEvent::Error(e)))
+ .expect("Failed to post message to host");
+ }
+ worker_is_ready = true;
+ }
+ Poll::Pending => {}
+ }
+ }
+
+ let maybe_msg = {
+ match worker.internal_channels.receiver.poll_next_unpin(cx) {
+ Poll::Ready(r) => match r {
+ Some(msg) => {
+ let msg_str = String::from_utf8(msg.to_vec()).unwrap();
+ debug!("received message from host: {}", msg_str);
+ Some(msg_str)
+ }
+ None => {
+ debug!("channel closed by host, worker event loop shuts down");
+ return Poll::Ready(Ok(()));
+ }
+ },
+ Poll::Pending => None,
+ }
+ };
+
+ if let Some(msg) = maybe_msg {
+ // TODO: just add second value and then bind using rusty_v8
+ // to get structured clone/transfer working
+ let script = format!("workerMessageRecvCallback({})", msg);
+ worker
+ .execute(&script)
+ .expect("Failed to execute message cb");
+ // Let worker be polled again
+ worker_is_ready = false;
+ worker.waker.wake();
+ }
+
+ Poll::Pending
+ });
+
+ rt.block_on(fut)
+}
+
+// TODO(bartlomieju): this function should probably live in `cli/web_worker.rs`
+// TODO(bartlomieju): check if order of actions is aligned to Worker spec
+fn run_worker_thread(
+ name: String,
+ global_state: GlobalState,
+ permissions: DenoPermissions,
+ specifier: ModuleSpecifier,
+ has_source_code: bool,
+ source_code: String,
+) -> Result<WorkerHandle, ErrBox> {
+ let (handle_sender, handle_receiver) =
+ std::sync::mpsc::sync_channel::<Result<WorkerHandle, ErrBox>>(1);
+
+ let builder =
+ std::thread::Builder::new().name(format!("deno-worker-{}", name));
+ // TODO(bartlomieju): store JoinHandle as well
+ builder.spawn(move || {
+ // Any error inside this block is terminal:
+ // - JS worker is useless - meaning it throws an exception and can't do anything else,
+ // all action done upon it should be noops
+ // - newly spawned thread exits
+ let result =
+ create_web_worker(name, global_state, permissions, specifier.clone());
+
+ if let Err(err) = result {
+ handle_sender.send(Err(err)).unwrap();
+ return;
+ }
+
+ let mut worker = result.unwrap();
+ // Send thread safe handle to newly created worker to host thread
+ handle_sender.send(Ok(worker.thread_safe_handle())).unwrap();
+ drop(handle_sender);
+
+ // At this point the only method of communication with host
+ // is using `worker.internal_channels`.
+ //
+ // Host can already push messages and interact with worker.
+ //
+ // Next steps:
+ // - create tokio runtime
+ // - load provided module or code
+ // - start driving worker's event loop
+
+ let mut rt = create_basic_runtime();
+
+ // TODO: run with using select with terminate
+
+ // Execute provided source code immediately
+ let result = if has_source_code {
+ worker.execute(&source_code)
+ } else {
+ // TODO(bartlomieju): add "type": "classic", ie. ability to load
+ // script instead of module
+ let load_future = worker
+ .execute_mod_async(&specifier, None, false)
+ .boxed_local();
+
+ rt.block_on(load_future)
+ };
+
+ if let Err(e) = result {
+ let mut sender = worker.internal_channels.sender.clone();
+ futures::executor::block_on(sender.send(WorkerEvent::Error(e)))
+ .expect("Failed to post message to host");
+
+ // Failure to execute script is a terminal error, bye, bye.
+ return;
+ }
+
+ // TODO(bartlomieju): this thread should return result of event loop
+ // that means that we should store JoinHandle to thread to ensure
+ // that it actually terminates.
+ run_worker_loop(&mut rt, &mut worker).expect("Panic in event loop");
+ })?;
+
+ handle_receiver.recv().unwrap()
}
#[derive(Deserialize)]
@@ -61,72 +221,28 @@ fn op_create_worker(
let parent_state = state.clone();
let state = state.borrow();
let global_state = state.global_state.clone();
- let child_permissions = state.permissions.clone();
+ let permissions = state.permissions.clone();
let referrer = state.main_module.to_string();
drop(state);
- let (handle_sender, handle_receiver) =
- std::sync::mpsc::sync_channel::<Result<WorkerChannelsExternal, ErrBox>>(1);
-
- // TODO(bartlomieju): Isn't this wrong?
- let result = ModuleSpecifier::resolve_url_or_path(&specifier)?;
- let module_specifier = if !has_source_code {
- ModuleSpecifier::resolve_import(&specifier, &referrer)?
- } else {
- result
- };
-
- std::thread::spawn(move || {
- let result = State::new_for_worker(
- global_state,
- Some(child_permissions), // by default share with parent
- module_specifier.clone(),
- );
- if let Err(err) = result {
- handle_sender.send(Err(err)).unwrap();
- return;
- }
- let child_state = result.unwrap();
- let worker_name = args_name.unwrap_or_else(|| {
- // TODO(bartlomieju): change it to something more descriptive
- format!("USER-WORKER-{}", specifier)
- });
-
- // TODO: add a new option to make child worker not sharing permissions
- // with parent (aka .clone(), requests from child won't reflect in parent)
- let mut worker = WebWorker::new(
- worker_name.to_string(),
- startup_data::deno_isolate_init(),
- child_state,
- );
- let script = format!("bootstrapWorkerRuntime(\"{}\")", worker_name);
- js_check(worker.execute(&script));
- js_check(worker.execute("runWorkerMessageLoop()"));
-
- handle_sender.send(Ok(worker.thread_safe_handle())).unwrap();
-
- // Has provided source code, execute immediately.
- if has_source_code {
- js_check(worker.execute(&source_code));
- // FIXME(bartlomieju): runtime is not run in this case
- return;
- }
-
- let fut = async move {
- let r = worker
- .execute_mod_async(&module_specifier, None, false)
- .await;
- if r.is_ok() {
- let _ = (&mut *worker).await;
- }
- }
- .boxed_local();
-
- crate::tokio_util::run_basic(fut);
+ let module_specifier =
+ ModuleSpecifier::resolve_import(&specifier, &referrer)?;
+ let worker_name = args_name.unwrap_or_else(|| {
+ // TODO(bartlomieju): change it to something more descriptive
+ format!("USER-WORKER-{}", specifier)
});
- let handle = handle_receiver.recv().unwrap()?;
- let worker_id = parent_state.add_child_worker(handle);
+ let worker_handle = run_worker_thread(
+ worker_name,
+ global_state,
+ permissions,
+ module_specifier,
+ has_source_code,
+ source_code,
+ )?;
+ // At this point all interactions with worker happen using thread
+ // safe handler returned from previous function call
+ let worker_id = parent_state.add_child_worker(worker_handle);
Ok(JsonOp::Sync(json!({ "id": worker_id })))
}
@@ -136,7 +252,7 @@ struct WorkerArgs {
id: i32,
}
-fn op_host_close_worker(
+fn op_host_terminate_worker(
state: &State,
args: Value,
_data: Option<ZeroCopyBuf>,
@@ -144,23 +260,37 @@ fn op_host_close_worker(
let args: WorkerArgs = serde_json::from_value(args)?;
let id = args.id as u32;
let mut state = state.borrow_mut();
-
- let maybe_worker_handle = state.workers.remove(&id);
- if let Some(worker_handle) = maybe_worker_handle {
- let mut sender = worker_handle.sender.clone();
- sender.close_channel();
-
- let mut receiver =
- futures::executor::block_on(worker_handle.receiver.lock());
- receiver.close();
- };
-
+ let worker_handle =
+ state.workers.remove(&id).expect("No worker handle found");
+ worker_handle.terminate();
Ok(JsonOp::Sync(json!({})))
}
-#[derive(Deserialize)]
-struct HostGetMessageArgs {
- id: i32,
+fn serialize_worker_event(event: WorkerEvent) -> Value {
+ match event {
+ WorkerEvent::Message(buf) => json!({ "type": "msg", "data": buf }),
+ WorkerEvent::Error(error) => match error.kind() {
+ ErrorKind::JSError => {
+ let error = error.downcast::<JSError>().unwrap();
+ let exception: V8Exception = error.into();
+ json!({
+ "type": "error",
+ "error": {
+ "message": exception.message,
+ "fileName": exception.script_resource_name,
+ "lineNumber": exception.line_number,
+ "columnNumber": exception.start_column,
+ }
+ })
+ }
+ _ => json!({
+ "type": "error",
+ "error": {
+ "message": error.to_string(),
+ }
+ }),
+ },
+ }
}
/// Get message from guest worker as host
@@ -169,59 +299,48 @@ fn op_host_get_message(
args: Value,
_data: Option<ZeroCopyBuf>,
) -> Result<JsonOp, ErrBox> {
- let args: HostGetMessageArgs = serde_json::from_value(args)?;
+ let args: WorkerArgs = serde_json::from_value(args)?;
let id = args.id as u32;
-
- let state = state.borrow();
- // TODO: don't return bad resource anymore
- let worker_handle = state.workers.get(&id).ok_or_else(bad_resource)?;
- let fut = worker_handle.get_message();
+ let state_ = state.borrow();
+ let worker_handle = state_
+ .workers
+ .get(&id)
+ .expect("No worker handle found")
+ .clone();
+ let state_ = state.clone();
let op = async move {
- let maybe_buf = fut.await;
- Ok(json!({ "data": maybe_buf }))
+ let response = match worker_handle.get_event().await {
+ Some(event) => serialize_worker_event(event),
+ None => {
+ let mut state_ = state_.borrow_mut();
+ let mut handle =
+ state_.workers.remove(&id).expect("No worker handle found");
+ handle.sender.close_channel();
+ // TODO(bartlomieju): join thread handle here
+ json!({ "type": "close" })
+ }
+ };
+ Ok(response)
};
Ok(JsonOp::Async(op.boxed_local()))
}
-#[derive(Deserialize)]
-struct HostPostMessageArgs {
- id: i32,
-}
-
/// Post message to guest worker as host
fn op_host_post_message(
state: &State,
args: Value,
data: Option<ZeroCopyBuf>,
) -> Result<JsonOp, ErrBox> {
- let args: HostPostMessageArgs = serde_json::from_value(args)?;
+ let args: WorkerArgs = serde_json::from_value(args)?;
let id = args.id as u32;
let msg = Vec::from(data.unwrap().as_ref()).into_boxed_slice();
debug!("post message to worker {}", id);
let state = state.borrow();
- // TODO: don't return bad resource anymore
- let worker_handle = state.workers.get(&id).ok_or_else(bad_resource)?;
+ let worker_handle = state.workers.get(&id).expect("No worker handle found");
let fut = worker_handle
.post_message(msg)
.map_err(|e| DenoError::new(ErrorKind::Other, e.to_string()));
futures::executor::block_on(fut)?;
Ok(JsonOp::Sync(json!({})))
}
-
-fn op_metrics(
- state: &State,
- _args: Value,
- _zero_copy: Option<ZeroCopyBuf>,
-) -> Result<JsonOp, ErrBox> {
- let state = state.borrow();
- let m = &state.metrics;
-
- Ok(JsonOp::Sync(json!({
- "opsDispatched": m.ops_dispatched.load(Ordering::SeqCst) as u64,
- "opsCompleted": m.ops_completed.load(Ordering::SeqCst) as u64,
- "bytesSentControl": m.bytes_sent_control.load(Ordering::SeqCst) as u64,
- "bytesSentData": m.bytes_sent_data.load(Ordering::SeqCst) as u64,
- "bytesReceived": m.bytes_received.load(Ordering::SeqCst) as u64
- })))
-}
diff --git a/cli/state.rs b/cli/state.rs
index 4e2f47e62..b9ef62053 100644
--- a/cli/state.rs
+++ b/cli/state.rs
@@ -8,8 +8,7 @@ use crate::metrics::Metrics;
use crate::ops::JsonOp;
use crate::ops::MinimalOp;
use crate::permissions::DenoPermissions;
-use crate::worker::WorkerChannelsExternal;
-use crate::worker::WorkerChannelsInternal;
+use crate::worker::WorkerHandle;
use deno_core::Buf;
use deno_core::CoreOp;
use deno_core::ErrBox;
@@ -55,8 +54,7 @@ pub struct StateInner {
pub import_map: Option<ImportMap>,
pub metrics: Metrics,
pub global_timer: GlobalTimer,
- pub workers: HashMap<u32, WorkerChannelsExternal>,
- pub worker_channels_internal: Option<WorkerChannelsInternal>,
+ pub workers: HashMap<u32, WorkerHandle>,
pub next_worker_id: AtomicUsize,
pub start_time: Instant,
pub seeded_rng: Option<StdRng>,
@@ -232,7 +230,6 @@ impl State {
import_map,
metrics: Metrics::default(),
global_timer: GlobalTimer::new(),
- worker_channels_internal: None,
workers: HashMap::new(),
next_worker_id: AtomicUsize::new(0),
start_time: Instant::now(),
@@ -269,7 +266,6 @@ impl State {
import_map: None,
metrics: Metrics::default(),
global_timer: GlobalTimer::new(),
- worker_channels_internal: None,
workers: HashMap::new(),
next_worker_id: AtomicUsize::new(0),
start_time: Instant::now(),
@@ -282,7 +278,7 @@ impl State {
Ok(Self(state))
}
- pub fn add_child_worker(&self, handle: WorkerChannelsExternal) -> u32 {
+ pub fn add_child_worker(&self, handle: WorkerHandle) -> u32 {
let mut inner_state = self.borrow_mut();
let worker_id =
inner_state.next_worker_id.fetch_add(1, Ordering::Relaxed) as u32;
diff --git a/cli/tests/integration_tests.rs b/cli/tests/integration_tests.rs
index dd242a32a..a42dd439e 100644
--- a/cli/tests/integration_tests.rs
+++ b/cli/tests/integration_tests.rs
@@ -397,12 +397,10 @@ itest!(_026_redirect_javascript {
http_server: true,
});
-/* TODO(ry) Disabled to get #3844 landed faster. Re-enable.
itest!(_026_workers {
args: "run --reload 026_workers.ts",
output: "026_workers.ts.out",
});
-*/
itest!(workers_basic {
args: "run --reload workers_basic.ts",
diff --git a/cli/tests/subdir/nested_worker.js b/cli/tests/subdir/nested_worker.js
new file mode 100644
index 000000000..b0acd70d7
--- /dev/null
+++ b/cli/tests/subdir/nested_worker.js
@@ -0,0 +1,19 @@
+// Specifier should be resolved relative to current file
+const jsWorker = new Worker("./sibling_worker.js", {
+ type: "module",
+ name: "sibling"
+});
+
+jsWorker.onerror = _e => {
+ postMessage({ type: "error" });
+};
+
+jsWorker.onmessage = e => {
+ console.log("js worker on message");
+ postMessage({ type: "msg", text: e });
+ close();
+};
+
+onmessage = function(e) {
+ jsWorker.postMessage(e.data);
+};
diff --git a/cli/tests/subdir/sibling_worker.js b/cli/tests/subdir/sibling_worker.js
new file mode 100644
index 000000000..0e91141ce
--- /dev/null
+++ b/cli/tests/subdir/sibling_worker.js
@@ -0,0 +1,4 @@
+onmessage = e => {
+ postMessage(e.data);
+ close();
+};
diff --git a/cli/tests/subdir/test_worker.js b/cli/tests/subdir/test_worker.js
index f0d9fbed6..70e1d8b73 100644
--- a/cli/tests/subdir/test_worker.js
+++ b/cli/tests/subdir/test_worker.js
@@ -1,6 +1,5 @@
let thrown = false;
-// TODO(bartlomieju): add test for throwing in web worker
if (self.name !== "jsWorker") {
throw Error(`Bad worker name: ${self.name}, expected jsWorker`);
}
@@ -14,7 +13,6 @@ onmessage = function(e) {
}
postMessage(e.data);
-
close();
};
diff --git a/cli/tests/subdir/test_worker.ts b/cli/tests/subdir/test_worker.ts
index bc3f358f8..2ea8f9214 100644
--- a/cli/tests/subdir/test_worker.ts
+++ b/cli/tests/subdir/test_worker.ts
@@ -4,8 +4,6 @@ if (self.name !== "tsWorker") {
onmessage = function(e): void {
console.log(e.data);
-
postMessage(e.data);
-
close();
};
diff --git a/cli/tests/subdir/throwing_worker.js b/cli/tests/subdir/throwing_worker.js
new file mode 100644
index 000000000..56ee4ff88
--- /dev/null
+++ b/cli/tests/subdir/throwing_worker.js
@@ -0,0 +1,2 @@
+// This worker just throws error when it's being executed
+throw Error("Thrown error");
diff --git a/cli/tokio_util.rs b/cli/tokio_util.rs
index e5878cdf7..0e1257da7 100644
--- a/cli/tokio_util.rs
+++ b/cli/tokio_util.rs
@@ -1,30 +1,19 @@
// Copyright 2018-2020 the Deno authors. All rights reserved. MIT license.
-// TODO(ry) rename to run_local ?
-pub fn run_basic<F, R>(future: F) -> R
-where
- F: std::future::Future<Output = R> + 'static,
-{
- let mut rt = tokio::runtime::Builder::new()
+pub fn create_basic_runtime() -> tokio::runtime::Runtime {
+ tokio::runtime::Builder::new()
.basic_scheduler()
.enable_io()
.enable_time()
.build()
- .unwrap();
- rt.block_on(future)
+ .unwrap()
}
-// TODO(ry) maybe replace with tokio::task::spawn_blocking
-#[cfg(test)]
-pub fn spawn_thread<F, R>(f: F) -> impl std::future::Future<Output = R>
+// TODO(ry) rename to run_local ?
+pub fn run_basic<F, R>(future: F) -> R
where
- F: 'static + Send + FnOnce() -> R,
- R: 'static + Send,
+ F: std::future::Future<Output = R> + 'static,
{
- let (sender, receiver) = tokio::sync::oneshot::channel::<R>();
- std::thread::spawn(move || {
- let result = f();
- sender.send(result)
- });
- async { receiver.await.unwrap() }
+ let mut rt = create_basic_runtime();
+ rt.block_on(future)
}
diff --git a/cli/web_worker.rs b/cli/web_worker.rs
index 05e3184d9..c0a712aed 100644
--- a/cli/web_worker.rs
+++ b/cli/web_worker.rs
@@ -29,7 +29,7 @@ impl WebWorker {
{
let isolate = &mut worker.isolate;
ops::runtime::init(isolate, &state);
- ops::web_worker::init(isolate, &state);
+ ops::web_worker::init(isolate, &state, &worker.internal_channels.sender);
ops::worker_host::init(isolate, &state);
ops::errors::init(isolate, &state);
ops::timers::init(isolate, &state);
@@ -65,9 +65,12 @@ impl Future for WebWorker {
#[cfg(test)]
mod tests {
use super::*;
+ use crate::ops::worker_host::run_worker_loop;
use crate::startup_data;
use crate::state::State;
use crate::tokio_util;
+ use crate::worker::WorkerEvent;
+ use crate::worker::WorkerHandle;
fn create_test_worker() -> WebWorker {
let state = State::mock("./hello.js");
@@ -77,77 +80,95 @@ mod tests {
state,
);
worker.execute("bootstrapWorkerRuntime(\"TEST\")").unwrap();
- worker.execute("runWorkerMessageLoop()").unwrap();
worker
}
-
#[test]
fn test_worker_messages() {
- let mut worker = create_test_worker();
- let source = r#"
- onmessage = function(e) {
- console.log("msg from main script", e.data);
- if (e.data == "exit") {
- delete self.onmessage;
- return;
- } else {
- console.assert(e.data === "hi");
+ let (handle_sender, handle_receiver) =
+ std::sync::mpsc::sync_channel::<WorkerHandle>(1);
+
+ let join_handle = std::thread::spawn(move || {
+ let mut worker = create_test_worker();
+ let source = r#"
+ onmessage = function(e) {
+ console.log("msg from main script", e.data);
+ if (e.data == "exit") {
+ return close();
+ } else {
+ console.assert(e.data === "hi");
+ }
+ postMessage([1, 2, 3]);
+ console.log("after postMessage");
}
- postMessage([1, 2, 3]);
- console.log("after postMessage");
- }
- "#;
- worker.execute(source).unwrap();
-
- let handle = worker.thread_safe_handle();
- let _ = tokio_util::spawn_thread(move || {
- tokio_util::run_basic(async move {
- let msg = json!("hi").to_string().into_boxed_str().into_boxed_bytes();
- let r = handle.post_message(msg.clone()).await;
- assert!(r.is_ok());
-
- let maybe_msg = handle.get_message().await;
- assert!(maybe_msg.is_some());
-
- let r = handle.post_message(msg.clone()).await;
- assert!(r.is_ok());
-
- let maybe_msg = handle.get_message().await;
- assert!(maybe_msg.is_some());
- assert_eq!(*maybe_msg.unwrap(), *b"[1,2,3]");
-
- let msg = json!("exit")
- .to_string()
- .into_boxed_str()
- .into_boxed_bytes();
- let r = handle.post_message(msg).await;
- assert!(r.is_ok());
- })
+ "#;
+ worker.execute(source).unwrap();
+ let handle = worker.thread_safe_handle();
+ handle_sender.send(handle).unwrap();
+ let mut rt = tokio_util::create_basic_runtime();
+ let r = run_worker_loop(&mut rt, &mut worker);
+ assert!(r.is_ok())
});
- let r = tokio_util::run_basic(worker);
- assert!(r.is_ok())
- }
+ let mut handle = handle_receiver.recv().unwrap();
- #[test]
- fn removed_from_resource_table_on_close() {
- let mut worker = create_test_worker();
- let handle = worker.thread_safe_handle();
+ tokio_util::run_basic(async move {
+ let msg = json!("hi").to_string().into_boxed_str().into_boxed_bytes();
+ let r = handle.post_message(msg.clone()).await;
+ assert!(r.is_ok());
- worker
- .execute("onmessage = () => { delete self.onmessage; }")
- .unwrap();
+ let maybe_msg = handle.get_event().await;
+ assert!(maybe_msg.is_some());
- let worker_post_message_fut = tokio_util::spawn_thread(move || {
- let msg = json!("hi").to_string().into_boxed_str().into_boxed_bytes();
- let r = futures::executor::block_on(handle.post_message(msg));
+ let r = handle.post_message(msg.clone()).await;
assert!(r.is_ok());
+
+ let maybe_msg = handle.get_event().await;
+ assert!(maybe_msg.is_some());
+ match maybe_msg {
+ Some(WorkerEvent::Message(buf)) => {
+ assert_eq!(*buf, *b"[1,2,3]");
+ }
+ _ => unreachable!(),
+ }
+
+ let msg = json!("exit")
+ .to_string()
+ .into_boxed_str()
+ .into_boxed_bytes();
+ let r = handle.post_message(msg).await;
+ assert!(r.is_ok());
+ let event = handle.get_event().await;
+ assert!(event.is_none());
+ handle.sender.close_channel();
+ });
+ join_handle.join().expect("Failed to join worker thread");
+ }
+
+ #[test]
+ fn removed_from_resource_table_on_close() {
+ let (handle_sender, handle_receiver) =
+ std::sync::mpsc::sync_channel::<WorkerHandle>(1);
+
+ let join_handle = std::thread::spawn(move || {
+ let mut worker = create_test_worker();
+ worker.execute("onmessage = () => { close(); }").unwrap();
+ let handle = worker.thread_safe_handle();
+ handle_sender.send(handle).unwrap();
+ let mut rt = tokio_util::create_basic_runtime();
+ let r = run_worker_loop(&mut rt, &mut worker);
+ assert!(r.is_ok())
});
+ let mut handle = handle_receiver.recv().unwrap();
+
tokio_util::run_basic(async move {
- worker_post_message_fut.await;
- let r = worker.await;
+ let msg = json!("hi").to_string().into_boxed_str().into_boxed_bytes();
+ let r = handle.post_message(msg.clone()).await;
assert!(r.is_ok());
+ let event = handle.get_event().await;
+ assert!(event.is_none());
+ handle.sender.close_channel();
});
+ join_handle.join().expect("Failed to join worker thread");
}
}
diff --git a/cli/worker.rs b/cli/worker.rs
index 20b8b8021..b804ff449 100644
--- a/cli/worker.rs
+++ b/cli/worker.rs
@@ -24,76 +24,55 @@ use std::task::Poll;
use tokio::sync::Mutex as AsyncMutex;
use url::Url;
-/// Wraps mpsc channels so they can be referenced
-/// from ops and used to facilitate parent-child communication
-/// for workers.
-#[derive(Clone)]
-pub struct WorkerChannels {
- pub sender: mpsc::Sender<Buf>,
- pub receiver: Arc<AsyncMutex<mpsc::Receiver<Buf>>>,
+/// Events that are sent to host from child
+/// worker.
+pub enum WorkerEvent {
+ Message(Buf),
+ Error(ErrBox),
}
-impl WorkerChannels {
- /// Post message to worker as a host.
- pub async fn post_message(&self, buf: Buf) -> Result<(), ErrBox> {
- let mut sender = self.sender.clone();
- sender.send(buf).map_err(ErrBox::from).await
- }
-
- /// Get message from worker as a host.
- pub fn get_message(&self) -> Pin<Box<dyn Future<Output = Option<Buf>>>> {
- let receiver_mutex = self.receiver.clone();
-
- async move {
- let mut receiver = receiver_mutex.lock().await;
- receiver.next().await
- }
- .boxed_local()
- }
+pub struct WorkerChannelsInternal {
+ pub sender: mpsc::Sender<WorkerEvent>,
+ pub receiver: mpsc::Receiver<Buf>,
}
-pub struct WorkerChannelsInternal(WorkerChannels);
-
-impl Deref for WorkerChannelsInternal {
- type Target = WorkerChannels;
- fn deref(&self) -> &Self::Target {
- &self.0
- }
+#[derive(Clone)]
+pub struct WorkerHandle {
+ pub sender: mpsc::Sender<Buf>,
+ pub receiver: Arc<AsyncMutex<mpsc::Receiver<WorkerEvent>>>,
+ // terminate_channel
}
-impl DerefMut for WorkerChannelsInternal {
- fn deref_mut(&mut self) -> &mut Self::Target {
- &mut self.0
+impl WorkerHandle {
+ pub fn terminate(&self) {
+ todo!()
}
-}
-
-#[derive(Clone)]
-pub struct WorkerChannelsExternal(WorkerChannels);
-impl Deref for WorkerChannelsExternal {
- type Target = WorkerChannels;
- fn deref(&self) -> &Self::Target {
- &self.0
+ /// Post message to worker as a host.
+ pub async fn post_message(&self, buf: Buf) -> Result<(), ErrBox> {
+ let mut sender = self.sender.clone();
+ sender.send(buf).map_err(ErrBox::from).await
}
-}
-impl DerefMut for WorkerChannelsExternal {
- fn deref_mut(&mut self) -> &mut Self::Target {
- &mut self.0
+ // TODO: should use `try_lock` and return error if
+ // more than one listener tries to get event
+ pub async fn get_event(&self) -> Option<WorkerEvent> {
+ let mut receiver = self.receiver.lock().await;
+ receiver.next().await
}
}
-fn create_channels() -> (WorkerChannelsInternal, WorkerChannelsExternal) {
+fn create_channels() -> (WorkerChannelsInternal, WorkerHandle) {
let (in_tx, in_rx) = mpsc::channel::<Buf>(1);
- let (out_tx, out_rx) = mpsc::channel::<Buf>(1);
- let internal_channels = WorkerChannelsInternal(WorkerChannels {
+ let (out_tx, out_rx) = mpsc::channel::<WorkerEvent>(1);
+ let internal_channels = WorkerChannelsInternal {
sender: out_tx,
- receiver: Arc::new(AsyncMutex::new(in_rx)),
- });
- let external_channels = WorkerChannelsExternal(WorkerChannels {
+ receiver: in_rx,
+ };
+ let external_channels = WorkerHandle {
sender: in_tx,
receiver: Arc::new(AsyncMutex::new(out_rx)),
- });
+ };
(internal_channels, external_channels)
}
@@ -113,7 +92,9 @@ pub struct Worker {
pub name: String,
pub isolate: Box<deno_core::EsIsolate>,
pub state: State,
- external_channels: WorkerChannelsExternal,
+ pub waker: AtomicWaker,
+ pub(crate) internal_channels: WorkerChannelsInternal,
+ external_channels: WorkerHandle,
}
impl Worker {
@@ -127,15 +108,13 @@ impl Worker {
});
let (internal_channels, external_channels) = create_channels();
- {
- let mut state = state.borrow_mut();
- state.worker_channels_internal = Some(internal_channels);
- }
Self {
name,
isolate,
state,
+ waker: AtomicWaker::new(),
+ internal_channels,
external_channels,
}
}
@@ -174,7 +153,7 @@ impl Worker {
}
/// Returns a way to communicate with the Worker from other threads.
- pub fn thread_safe_handle(&self) -> WorkerChannelsExternal {
+ pub fn thread_safe_handle(&self) -> WorkerHandle {
self.external_channels.clone()
}
}
@@ -184,8 +163,7 @@ impl Future for Worker {
fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
let inner = self.get_mut();
- let waker = AtomicWaker::new();
- waker.register(cx.waker());
+ inner.waker.register(cx.waker());
inner.isolate.poll_unpin(cx)
}
}
@@ -224,7 +202,7 @@ impl MainWorker {
ops::signal::init(isolate, &state);
ops::timers::init(isolate, &state);
ops::worker_host::init(isolate, &state);
- ops::web_worker::init(isolate, &state);
+ ops::web_worker::init(isolate, &state, &worker.internal_channels.sender);
}
Self(worker)
}
diff --git a/core/es_isolate.rs b/core/es_isolate.rs
index 295fe00ed..aeb6e318a 100644
--- a/core/es_isolate.rs
+++ b/core/es_isolate.rs
@@ -235,6 +235,51 @@ impl EsIsolate {
}
}
+ /// TODO(bartlomieju): copy-pasta to avoid problem with global handle attached
+ /// to ErrBox
+ pub fn mod_evaluate_dyn_import(
+ &mut self,
+ id: ModuleId,
+ ) -> Result<(), ErrBox> {
+ let isolate = self.core_isolate.v8_isolate.as_ref().unwrap();
+ let mut locker = v8::Locker::new(isolate);
+ let mut hs = v8::HandleScope::new(locker.enter());
+ let scope = hs.enter();
+ assert!(!self.core_isolate.global_context.is_empty());
+ let context = self.core_isolate.global_context.get(scope).unwrap();
+ let mut cs = v8::ContextScope::new(scope, context);
+ let scope = cs.enter();
+
+ let info = self.modules.get_info(id).expect("ModuleInfo not found");
+ let mut module = info.handle.get(scope).expect("Empty module handle");
+ let mut status = module.get_status();
+
+ if status == v8::ModuleStatus::Instantiated {
+ let ok = module.evaluate(scope, context).is_some();
+ // Update status after evaluating.
+ status = module.get_status();
+ if ok {
+ assert!(
+ status == v8::ModuleStatus::Evaluated
+ || status == v8::ModuleStatus::Errored
+ );
+ } else {
+ assert!(status == v8::ModuleStatus::Errored);
+ }
+ }
+
+ match status {
+ v8::ModuleStatus::Evaluated => Ok(()),
+ v8::ModuleStatus::Errored => {
+ let i = &mut self.core_isolate;
+ let exception = module.get_exception();
+ i.exception_to_err_result(scope, exception)
+ .map_err(|err| i.attach_handle_to_error(scope, err, exception))
+ }
+ other => panic!("Unexpected module status {:?}", other),
+ }
+ }
+
/// Evaluates an already instantiated ES module.
///
/// ErrBox can be downcast to a type that exposes additional information about
@@ -274,7 +319,6 @@ impl EsIsolate {
let i = &mut self.core_isolate;
let exception = module.get_exception();
i.exception_to_err_result(scope, exception)
- .map_err(|err| i.attach_handle_to_error(scope, err, exception))
}
other => panic!("Unexpected module status {:?}", other),
}
@@ -425,7 +469,7 @@ impl EsIsolate {
// Load is done.
let module_id = load.root_module_id.unwrap();
self.mod_instantiate(module_id)?;
- match self.mod_evaluate(module_id) {
+ match self.mod_evaluate_dyn_import(module_id) {
Ok(()) => self.dyn_import_done(dyn_import_id, module_id)?,
Err(err) => self.dyn_import_error(dyn_import_id, err)?,
};