summaryrefslogtreecommitdiff
path: root/cli
diff options
context:
space:
mode:
Diffstat (limited to 'cli')
-rw-r--r--cli/fmt_errors.rs6
-rw-r--r--cli/js/dispatch.ts5
-rw-r--r--cli/js/globals.ts1
-rw-r--r--cli/js/lib.deno_runtime.d.ts6
-rw-r--r--cli/js/workers.ts139
-rw-r--r--cli/ops/workers.rs148
-rw-r--r--cli/state.rs2
-rw-r--r--cli/tests/026_workers.ts6
-rw-r--r--cli/tests/026_workers.ts.out3
-rw-r--r--cli/tests/subdir/test_worker.js12
-rw-r--r--cli/worker.rs23
11 files changed, 291 insertions, 60 deletions
diff --git a/cli/fmt_errors.rs b/cli/fmt_errors.rs
index 83e417fe2..9979eeb29 100644
--- a/cli/fmt_errors.rs
+++ b/cli/fmt_errors.rs
@@ -151,6 +151,12 @@ impl JSError {
}
}
+impl Into<V8Exception> for JSError {
+ fn into(self) -> V8Exception {
+ self.0
+ }
+}
+
impl DisplayFormatter for JSError {
fn format_category_and_code(&self) -> String {
"".to_string()
diff --git a/cli/js/dispatch.ts b/cli/js/dispatch.ts
index d658c5de7..7e6709dc6 100644
--- a/cli/js/dispatch.ts
+++ b/cli/js/dispatch.ts
@@ -42,8 +42,11 @@ export let OP_QUERY_PERMISSION: number;
export let OP_REVOKE_PERMISSION: number;
export let OP_REQUEST_PERMISSION: number;
export let OP_CREATE_WORKER: number;
-export let OP_HOST_GET_WORKER_CLOSED: number;
+export let OP_HOST_GET_WORKER_LOADED: number;
export let OP_HOST_POST_MESSAGE: number;
+export let OP_HOST_POLL_WORKER: number;
+export let OP_HOST_CLOSE_WORKER: number;
+export let OP_HOST_RESUME_WORKER: number;
export let OP_HOST_GET_MESSAGE: number;
export let OP_WORKER_POST_MESSAGE: number;
export let OP_WORKER_GET_MESSAGE: number;
diff --git a/cli/js/globals.ts b/cli/js/globals.ts
index c7f3b23f2..f090afcd4 100644
--- a/cli/js/globals.ts
+++ b/cli/js/globals.ts
@@ -150,6 +150,7 @@ window.performance = new performanceUtil.Performance();
// This variable functioning correctly depends on `declareAsLet`
// in //tools/ts_library_builder/main.ts
window.onmessage = workers.onmessage;
+window.onerror = workers.onerror;
window.workerMain = workers.workerMain;
window.workerClose = workers.workerClose;
diff --git a/cli/js/lib.deno_runtime.d.ts b/cli/js/lib.deno_runtime.d.ts
index 8740af062..109c0367d 100644
--- a/cli/js/lib.deno_runtime.d.ts
+++ b/cli/js/lib.deno_runtime.d.ts
@@ -1986,6 +1986,7 @@ declare interface Window {
Response: typeof __fetch.Response;
performance: __performanceUtil.Performance;
onmessage: (e: { data: any }) => void;
+ onerror: undefined | typeof onerror;
workerMain: typeof __workers.workerMain;
workerClose: typeof __workers.workerClose;
postMessage: typeof __workers.postMessage;
@@ -2036,6 +2037,7 @@ declare const Request: __domTypes.RequestConstructor;
declare const Response: typeof __fetch.Response;
declare const performance: __performanceUtil.Performance;
declare let onmessage: (e: { data: any }) => void;
+declare let onerror: (e: Event) => void;
declare const workerMain: typeof __workers.workerMain;
declare const workerClose: typeof __workers.workerClose;
declare const postMessage: typeof __workers.postMessage;
@@ -3293,7 +3295,7 @@ declare namespace __workers {
export function workerClose(): void;
export function workerMain(): Promise<void>;
export interface Worker {
- onerror?: () => void;
+ onerror?: (e: Event) => void;
onmessage?: (e: { data: any }) => void;
onmessageerror?: () => void;
postMessage(data: any): void;
@@ -3311,7 +3313,7 @@ declare namespace __workers {
private readonly id;
private isClosing;
private readonly isClosedPromise;
- onerror?: () => void;
+ onerror?: (e: Event) => void;
onmessage?: (data: any) => void;
onmessageerror?: () => void;
constructor(specifier: string, options?: DenoWorkerOptions);
diff --git a/cli/js/workers.ts b/cli/js/workers.ts
index 27f873100..d1d8f78e2 100644
--- a/cli/js/workers.ts
+++ b/cli/js/workers.ts
@@ -2,11 +2,12 @@
/* eslint-disable @typescript-eslint/no-explicit-any */
import * as dispatch from "./dispatch.ts";
import { sendAsync, sendSync } from "./dispatch_json.ts";
-import { log } from "./util.ts";
+import { log, createResolvable, Resolvable } from "./util.ts";
import { TextDecoder, TextEncoder } from "./text_encoding.ts";
import { window } from "./window.ts";
import { blobURLMap } from "./url.ts";
import { blobBytesWeakMap } from "./blob.ts";
+import { EventTarget } from "./event_target.ts";
const encoder = new TextEncoder();
const decoder = new TextDecoder();
@@ -26,7 +27,7 @@ function createWorker(
includeDenoNamespace: boolean,
hasSourceCode: boolean,
sourceCode: Uint8Array
-): number {
+): { id: number; loaded: boolean } {
return sendSync(dispatch.OP_CREATE_WORKER, {
specifier,
includeDenoNamespace,
@@ -35,8 +36,20 @@ function createWorker(
});
}
-async function hostGetWorkerClosed(id: number): Promise<void> {
- await sendAsync(dispatch.OP_HOST_GET_WORKER_CLOSED, { id });
+async function hostGetWorkerLoaded(id: number): Promise<any> {
+ return await sendAsync(dispatch.OP_HOST_GET_WORKER_LOADED, { id });
+}
+
+async function hostPollWorker(id: number): Promise<any> {
+ return await sendAsync(dispatch.OP_HOST_POLL_WORKER, { id });
+}
+
+function hostCloseWorker(id: number): void {
+ sendSync(dispatch.OP_HOST_CLOSE_WORKER, { id });
+}
+
+function hostResumeWorker(id: number): void {
+ sendSync(dispatch.OP_HOST_RESUME_WORKER, { id });
}
function hostPostMessage(id: number, data: any): void {
@@ -56,6 +69,7 @@ async function hostGetMessage(id: number): Promise<any> {
// Stuff for workers
export const onmessage: (e: { data: any }) => void = (): void => {};
+export const onerror: (e: { data: any }) => void = (): void => {};
export function postMessage(data: any): void {
const dataIntArray = encodeMessage(data);
@@ -88,25 +102,41 @@ export async function workerMain(): Promise<void> {
break;
}
- if (window["onmessage"]) {
- const event = { data };
- const result: void | Promise<void> = window.onmessage(event);
+ let result: void | Promise<void>;
+ const event = { data };
+
+ try {
+ result = window.onmessage(event);
if (result && "then" in result) {
await result;
}
- }
-
- if (!window["onmessage"]) {
- break;
+ if (!window["onmessage"]) {
+ break;
+ }
+ } catch (e) {
+ if (window["onerror"]) {
+ const result = window.onerror(
+ e.message,
+ e.fileName,
+ e.lineNumber,
+ e.columnNumber,
+ e
+ );
+ if (result === true) {
+ continue;
+ }
+ }
+ throw e;
}
}
}
export interface Worker {
- onerror?: () => void;
+ onerror?: (e: any) => void;
onmessage?: (e: { data: any }) => void;
onmessageerror?: () => void;
postMessage(data: any): void;
+ // TODO(bartlomieju): remove this
closed: Promise<void>;
}
@@ -122,15 +152,18 @@ export interface DenoWorkerOptions extends WorkerOptions {
noDenoNamespace?: boolean;
}
-export class WorkerImpl implements Worker {
+export class WorkerImpl extends EventTarget implements Worker {
private readonly id: number;
private isClosing = false;
- private readonly isClosedPromise: Promise<void>;
- public onerror?: () => void;
+ private messageBuffer: any[] = [];
+ private ready = false;
+ private readonly isClosedPromise: Resolvable<void>;
+ public onerror?: (e: any) => void;
public onmessage?: (data: any) => void;
public onmessageerror?: () => void;
constructor(specifier: string, options?: DenoWorkerOptions) {
+ super();
let hasSourceCode = false;
let sourceCode = new Uint8Array();
@@ -152,24 +185,87 @@ export class WorkerImpl implements Worker {
sourceCode = blobBytes!;
}
- this.id = createWorker(
+ const { id, loaded } = createWorker(
specifier,
includeDenoNamespace,
hasSourceCode,
sourceCode
);
- this.run();
- this.isClosedPromise = hostGetWorkerClosed(this.id);
- this.isClosedPromise.then((): void => {
- this.isClosing = true;
- });
+ this.id = id;
+ this.ready = loaded;
+ this.isClosedPromise = createResolvable();
+ this.poll();
}
get closed(): Promise<void> {
return this.isClosedPromise;
}
+ private handleError(e: any): boolean {
+ const event = new window.Event("error", { cancelable: true });
+ event.message = e.message;
+ event.lineNumber = e.lineNumber ? e.lineNumber + 1 : null;
+ event.columnNumber = e.columnNumber ? e.columnNumber + 1 : null;
+ event.fileName = e.fileName;
+ event.error = null;
+
+ let handled = false;
+ if (this.onerror) {
+ this.onerror(event);
+ if (event.defaultPrevented) {
+ handled = true;
+ }
+ }
+
+ return handled;
+ }
+
+ async poll(): Promise<void> {
+ // If worker has not been immediately executed
+ // then let's await it's readiness
+ if (!this.ready) {
+ const result = await hostGetWorkerLoaded(this.id);
+
+ if (result.error) {
+ if (!this.handleError(result.error)) {
+ throw new Error(result.error.message);
+ }
+ return;
+ }
+ }
+
+ // drain messages
+ for (const data of this.messageBuffer) {
+ hostPostMessage(this.id, data);
+ }
+ this.messageBuffer = [];
+ this.ready = true;
+ this.run();
+
+ while (true) {
+ const result = await hostPollWorker(this.id);
+
+ if (result.error) {
+ if (!this.handleError(result.error)) {
+ throw Error(result.error.message);
+ } else {
+ hostResumeWorker(this.id);
+ }
+ } else {
+ this.isClosing = true;
+ hostCloseWorker(this.id);
+ this.isClosedPromise.resolve();
+ break;
+ }
+ }
+ }
+
postMessage(data: any): void {
+ if (!this.ready) {
+ this.messageBuffer.push(data);
+ return;
+ }
+
hostPostMessage(this.id, data);
}
@@ -180,7 +276,6 @@ export class WorkerImpl implements Worker {
log("worker got null message. quitting.");
break;
}
- // TODO(afinch7) stop this from eating messages before onmessage has been assigned
if (this.onmessage) {
const event = { data };
this.onmessage(event);
diff --git a/cli/ops/workers.rs b/cli/ops/workers.rs
index 6ebaa141f..eeffb3930 100644
--- a/cli/ops/workers.rs
+++ b/cli/ops/workers.rs
@@ -4,12 +4,15 @@ use crate::deno_error::bad_resource;
use crate::deno_error::js_check;
use crate::deno_error::DenoError;
use crate::deno_error::ErrorKind;
+use crate::deno_error::GetErrorKind;
+use crate::fmt_errors::JSError;
use crate::ops::json_op;
use crate::startup_data;
use crate::state::ThreadSafeState;
use crate::worker::Worker;
use deno_core::*;
use futures;
+use futures::channel::mpsc;
use futures::future::FutureExt;
use futures::future::TryFutureExt;
use futures::sink::SinkExt;
@@ -19,7 +22,6 @@ use std::convert::From;
use std::future::Future;
use std::pin::Pin;
use std::sync::atomic::Ordering;
-use std::sync::mpsc;
use std::task::Context;
use std::task::Poll;
@@ -29,8 +31,20 @@ pub fn init(i: &mut Isolate, s: &ThreadSafeState) {
s.core_op(json_op(s.stateful_op(op_create_worker))),
);
i.register_op(
- "host_get_worker_closed",
- s.core_op(json_op(s.stateful_op(op_host_get_worker_closed))),
+ "host_get_worker_loaded",
+ s.core_op(json_op(s.stateful_op(op_host_get_worker_loaded))),
+ );
+ i.register_op(
+ "host_poll_worker",
+ s.core_op(json_op(s.stateful_op(op_host_poll_worker))),
+ );
+ i.register_op(
+ "host_close_worker",
+ s.core_op(json_op(s.stateful_op(op_host_close_worker))),
+ );
+ i.register_op(
+ "host_resume_worker",
+ s.core_op(json_op(s.stateful_op(op_host_resume_worker))),
);
i.register_op(
"host_post_message",
@@ -155,37 +169,36 @@ fn op_create_worker(
js_check(worker.execute("workerMain()"));
let worker_id = parent_state.add_child_worker(worker.clone());
- let response = json!(worker_id);
// Has provided source code, execute immediately.
if has_source_code {
js_check(worker.execute(&source_code));
- return Ok(JsonOp::Sync(response));
+ return Ok(JsonOp::Sync(json!({"id": worker_id, "loaded": true})));
}
- // TODO(bartlomieju): this should spawn mod execution on separate tokio task
- // and block on receving message on a channel or even use sync channel /shrug
- let (sender, receiver) = mpsc::sync_channel::<Result<(), ErrBox>>(1);
+ let (mut sender, receiver) = mpsc::channel::<Result<(), ErrBox>>(1);
+
+ // TODO(bartlomieju): this future should be spawned on the separate thread,
+ // dedicated to that worker
let fut = async move {
let result = worker
.execute_mod_async(&module_specifier, None, false)
.await;
- sender.send(result).expect("Failed to send message");
+ sender.send(result).await.expect("Failed to send message");
}
.boxed();
tokio::spawn(fut);
-
- let result = receiver.recv().expect("Failed to receive message");
- result?;
- Ok(JsonOp::Sync(response))
+ let mut table = state.loading_workers.lock().unwrap();
+ table.insert(worker_id, receiver);
+ Ok(JsonOp::Sync(json!({"id": worker_id, "loaded": false})))
}
-struct GetWorkerClosedFuture {
+struct WorkerPollFuture {
state: ThreadSafeState,
rid: ResourceId,
}
-impl Future for GetWorkerClosedFuture {
+impl Future for WorkerPollFuture {
type Output = Result<(), ErrBox>;
fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
@@ -203,39 +216,114 @@ impl Future for GetWorkerClosedFuture {
}
}
+fn serialize_worker_result(result: Result<(), ErrBox>) -> Value {
+ if let Err(error) = result {
+ match error.kind() {
+ ErrorKind::JSError => {
+ let error = error.downcast::<JSError>().unwrap();
+ let exception: V8Exception = error.into();
+ json!({"error": {
+ "message": exception.message,
+ "fileName": exception.script_resource_name,
+ "lineNumber": exception.line_number,
+ "columnNumber": exception.start_column,
+ }})
+ }
+ _ => json!({"error": {
+ "message": error.to_string(),
+ }}),
+ }
+ } else {
+ json!({"ok": true})
+ }
+}
+
#[derive(Deserialize)]
-struct HostGetWorkerClosedArgs {
+struct WorkerArgs {
id: i32,
}
-/// Return when the worker closes
-fn op_host_get_worker_closed(
+fn op_host_get_worker_loaded(
+ state: &ThreadSafeState,
+ args: Value,
+ _data: Option<PinnedBuf>,
+) -> Result<JsonOp, ErrBox> {
+ let args: WorkerArgs = serde_json::from_value(args)?;
+ let id = args.id as u32;
+ let mut table = state.loading_workers.lock().unwrap();
+ let mut receiver = table.remove(&id).unwrap();
+
+ let op = async move {
+ let result = receiver.next().await.unwrap();
+ Ok(serialize_worker_result(result))
+ };
+
+ Ok(JsonOp::Async(op.boxed()))
+}
+
+fn op_host_poll_worker(
state: &ThreadSafeState,
args: Value,
_data: Option<PinnedBuf>,
) -> Result<JsonOp, ErrBox> {
- let args: HostGetWorkerClosedArgs = serde_json::from_value(args)?;
+ let args: WorkerArgs = serde_json::from_value(args)?;
let id = args.id as u32;
let state_ = state.clone();
- let future = GetWorkerClosedFuture {
+ let future = WorkerPollFuture {
state: state.clone(),
rid: id,
};
- let op = future.then(move |_result| {
- let mut workers_table = state_.workers.lock().unwrap();
- let maybe_worker = workers_table.remove(&id);
- if let Some(worker) = maybe_worker {
- let mut channels = worker.state.worker_channels.lock().unwrap();
- channels.sender.close_channel();
- channels.receiver.close();
- };
- futures::future::ok(json!({}))
- });
+ let op = async move {
+ let result = future.await;
+
+ if result.is_err() {
+ let mut workers_table = state_.workers.lock().unwrap();
+ let worker = workers_table.get_mut(&id).unwrap();
+ worker.clear_exception();
+ }
+
+ Ok(serialize_worker_result(result))
+ };
Ok(JsonOp::Async(op.boxed()))
}
+fn op_host_close_worker(
+ state: &ThreadSafeState,
+ args: Value,
+ _data: Option<PinnedBuf>,
+) -> Result<JsonOp, ErrBox> {
+ let args: WorkerArgs = serde_json::from_value(args)?;
+ let id = args.id as u32;
+ let state_ = state.clone();
+
+ let mut workers_table = state_.workers.lock().unwrap();
+ let maybe_worker = workers_table.remove(&id);
+ if let Some(worker) = maybe_worker {
+ let mut channels = worker.state.worker_channels.lock().unwrap();
+ channels.sender.close_channel();
+ channels.receiver.close();
+ };
+
+ Ok(JsonOp::Sync(json!({})))
+}
+
+fn op_host_resume_worker(
+ state: &ThreadSafeState,
+ args: Value,
+ _data: Option<PinnedBuf>,
+) -> Result<JsonOp, ErrBox> {
+ let args: WorkerArgs = serde_json::from_value(args)?;
+ let id = args.id as u32;
+ let state_ = state.clone();
+
+ let mut workers_table = state_.workers.lock().unwrap();
+ let worker = workers_table.get_mut(&id).unwrap();
+ js_check(worker.execute("workerMain()"));
+ Ok(JsonOp::Sync(json!({})))
+}
+
#[derive(Deserialize)]
struct HostGetMessageArgs {
id: i32,
diff --git a/cli/state.rs b/cli/state.rs
index a4974958b..ed7b8e438 100644
--- a/cli/state.rs
+++ b/cli/state.rs
@@ -53,6 +53,7 @@ pub struct State {
pub metrics: Metrics,
pub global_timer: Mutex<GlobalTimer>,
pub workers: Mutex<HashMap<u32, Worker>>,
+ pub loading_workers: Mutex<HashMap<u32, mpsc::Receiver<Result<(), ErrBox>>>>,
pub next_worker_id: AtomicUsize,
pub start_time: Instant,
pub seeded_rng: Option<Mutex<StdRng>>,
@@ -248,6 +249,7 @@ impl ThreadSafeState {
metrics: Metrics::default(),
global_timer: Mutex::new(GlobalTimer::new()),
workers: Mutex::new(HashMap::new()),
+ loading_workers: Mutex::new(HashMap::new()),
next_worker_id: AtomicUsize::new(0),
start_time: Instant::now(),
seeded_rng,
diff --git a/cli/tests/026_workers.ts b/cli/tests/026_workers.ts
index f45fc4b77..7ac1a0f32 100644
--- a/cli/tests/026_workers.ts
+++ b/cli/tests/026_workers.ts
@@ -11,4 +11,10 @@ jsWorker.onmessage = (e): void => {
tsWorker.postMessage("Hello World");
};
+jsWorker.onerror = (e: Event): void => {
+ e.preventDefault();
+ console.log("called onerror in script");
+ jsWorker.postMessage("Hello World");
+};
+
jsWorker.postMessage("Hello World");
diff --git a/cli/tests/026_workers.ts.out b/cli/tests/026_workers.ts.out
index 7538cc867..92f7550ad 100644
--- a/cli/tests/026_workers.ts.out
+++ b/cli/tests/026_workers.ts.out
@@ -1,4 +1,7 @@
Hello World
+called onerror in worker
+called onerror in script
+Hello World
Received js: Hello World
Hello World
Received ts: Hello World
diff --git a/cli/tests/subdir/test_worker.js b/cli/tests/subdir/test_worker.js
index 53d38ba96..cec5bdf9b 100644
--- a/cli/tests/subdir/test_worker.js
+++ b/cli/tests/subdir/test_worker.js
@@ -1,7 +1,19 @@
+let thrown = false;
+
onmessage = function(e) {
console.log(e.data);
+ if (thrown === false) {
+ thrown = true;
+ throw new SyntaxError("[test error]");
+ }
+
postMessage(e.data);
workerClose();
};
+
+onerror = function() {
+ console.log("called onerror in worker");
+ return false;
+};
diff --git a/cli/worker.rs b/cli/worker.rs
index 2b335127f..7faf17e60 100644
--- a/cli/worker.rs
+++ b/cli/worker.rs
@@ -12,6 +12,7 @@ use futures::future::FutureExt;
use futures::future::TryFutureExt;
use futures::sink::SinkExt;
use futures::stream::StreamExt;
+use futures::task::AtomicWaker;
use std::env;
use std::future::Future;
use std::pin::Pin;
@@ -159,6 +160,11 @@ impl Worker {
channels: self.external_channels.clone(),
}
}
+
+ pub fn clear_exception(&mut self) {
+ let mut isolate = self.isolate.try_lock().unwrap();
+ isolate.clear_exception();
+ }
}
impl Future for Worker {
@@ -166,8 +172,15 @@ impl Future for Worker {
fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
let inner = self.get_mut();
- let mut isolate = inner.isolate.try_lock().unwrap();
- isolate.poll_unpin(cx)
+ let waker = AtomicWaker::new();
+ waker.register(cx.waker());
+ match inner.isolate.try_lock() {
+ Ok(mut isolate) => isolate.poll_unpin(cx),
+ Err(_) => {
+ waker.wake();
+ Poll::Pending
+ }
+ }
}
}
@@ -436,7 +449,7 @@ mod tests {
let worker_ = worker.clone();
let worker_future = async move {
- let result = worker.await;
+ let result = worker_.await;
println!("workers.rs after resource close");
result.unwrap();
}
@@ -446,10 +459,10 @@ mod tests {
tokio::spawn(worker_future_);
let msg = json!("hi").to_string().into_boxed_str().into_boxed_bytes();
- let r = block_on(worker_.post_message(msg));
+ let r = block_on(worker.post_message(msg));
assert!(r.is_ok());
- block_on(worker_future);
+ block_on(worker_future)
})
}