summaryrefslogtreecommitdiff
path: root/runtime/ops
diff options
context:
space:
mode:
Diffstat (limited to 'runtime/ops')
-rw-r--r--runtime/ops/web_worker.rs92
-rw-r--r--runtime/ops/worker_host.rs125
2 files changed, 120 insertions, 97 deletions
diff --git a/runtime/ops/web_worker.rs b/runtime/ops/web_worker.rs
index 1689b2587..e3ede869d 100644
--- a/runtime/ops/web_worker.rs
+++ b/runtime/ops/web_worker.rs
@@ -1,41 +1,85 @@
// Copyright 2018-2021 the Deno authors. All rights reserved. MIT license.
-use crate::web_worker::WebWorkerHandle;
+use crate::web_worker::WebWorkerInternalHandle;
use crate::web_worker::WorkerEvent;
+use deno_core::error::generic_error;
use deno_core::error::null_opbuf;
-use deno_core::futures::channel::mpsc;
+use deno_core::error::AnyError;
+use deno_core::op_async;
use deno_core::op_sync;
use deno_core::Extension;
+use deno_core::OpState;
use deno_core::ZeroCopyBuf;
+use std::cell::RefCell;
+use std::rc::Rc;
pub fn init() -> Extension {
Extension::builder()
.ops(vec![
- (
- "op_worker_post_message",
- op_sync(move |state, _args: (), buf: Option<ZeroCopyBuf>| {
- let buf = buf.ok_or_else(null_opbuf)?;
- let msg_buf: Box<[u8]> = (*buf).into();
- let mut sender = state.borrow::<mpsc::Sender<WorkerEvent>>().clone();
- sender
- .try_send(WorkerEvent::Message(msg_buf))
- .expect("Failed to post message to host");
- Ok(())
- }),
- ),
+ ("op_worker_post_message", op_sync(op_worker_post_message)),
+ ("op_worker_get_message", op_async(op_worker_get_message)),
// Notify host that guest worker closes.
+ ("op_worker_close", op_sync(op_worker_close)),
+ // Notify host that guest worker has unhandled error.
(
- "op_worker_close",
- op_sync(move |state, _: (), _: ()| {
- // Notify parent that we're finished
- let mut sender = state.borrow::<mpsc::Sender<WorkerEvent>>().clone();
- sender.close_channel();
- // Terminate execution of current worker
- let handle = state.borrow::<WebWorkerHandle>();
- handle.terminate();
- Ok(())
- }),
+ "op_worker_unhandled_error",
+ op_sync(op_worker_unhandled_error),
),
])
.build()
}
+
+fn op_worker_post_message(
+ state: &mut OpState,
+ _: (),
+ buf: Option<ZeroCopyBuf>,
+) -> Result<(), AnyError> {
+ let buf = buf.ok_or_else(null_opbuf)?;
+ let handle = state.borrow::<WebWorkerInternalHandle>().clone();
+ handle
+ .post_event(WorkerEvent::Message(buf))
+ .expect("Failed to post message to host");
+ Ok(())
+}
+
+async fn op_worker_get_message(
+ state: Rc<RefCell<OpState>>,
+ _: (),
+ _: (),
+) -> Result<ZeroCopyBuf, AnyError> {
+ let temp = {
+ let a = state.borrow();
+ a.borrow::<WebWorkerInternalHandle>().clone()
+ };
+
+ let maybe_data = temp.get_message().await;
+
+ Ok(maybe_data.unwrap_or_else(ZeroCopyBuf::empty))
+}
+
+#[allow(clippy::unnecessary_wraps)]
+fn op_worker_close(state: &mut OpState, _: (), _: ()) -> Result<(), AnyError> {
+ // Notify parent that we're finished
+ let mut handle = state.borrow_mut::<WebWorkerInternalHandle>().clone();
+
+ handle.terminate();
+ Ok(())
+}
+
+/// A worker that encounters an uncaught error will pass this error
+/// to its parent worker using this op. The parent worker will use
+/// this same op to pass the error to its own parent (in case
+/// `e.preventDefault()` was not called in `worker.onerror`). This
+/// is done until the error reaches the root/ main worker.
+#[allow(clippy::unnecessary_wraps)]
+fn op_worker_unhandled_error(
+ state: &mut OpState,
+ message: String,
+ _: (),
+) -> Result<(), AnyError> {
+ let sender = state.borrow::<WebWorkerInternalHandle>().clone();
+ sender
+ .post_event(WorkerEvent::Error(generic_error(message)))
+ .expect("Failed to propagate error event to parent worker");
+ Ok(())
+}
diff --git a/runtime/ops/worker_host.rs b/runtime/ops/worker_host.rs
index f8d03850d..a5698fa6e 100644
--- a/runtime/ops/worker_host.rs
+++ b/runtime/ops/worker_host.rs
@@ -15,12 +15,11 @@ use crate::web_worker::run_web_worker;
use crate::web_worker::WebWorker;
use crate::web_worker::WebWorkerHandle;
use crate::web_worker::WorkerEvent;
+use crate::web_worker::WorkerId;
use deno_core::error::custom_error;
-use deno_core::error::generic_error;
use deno_core::error::null_opbuf;
use deno_core::error::AnyError;
use deno_core::error::JsError;
-use deno_core::futures::channel::mpsc;
use deno_core::op_async;
use deno_core::op_sync;
use deno_core::serde::de;
@@ -28,7 +27,6 @@ use deno_core::serde::de::SeqAccess;
use deno_core::serde::Deserialize;
use deno_core::serde::Deserializer;
use deno_core::serde_json::json;
-use deno_core::serde_json::Value;
use deno_core::Extension;
use deno_core::ModuleSpecifier;
use deno_core::OpState;
@@ -46,7 +44,7 @@ use std::thread::JoinHandle;
pub struct CreateWebWorkerArgs {
pub name: String,
- pub worker_id: u32,
+ pub worker_id: WorkerId,
pub parent_permissions: Permissions,
pub permissions: Permissions,
pub main_module: ModuleSpecifier,
@@ -68,13 +66,9 @@ pub struct WorkerThread {
worker_handle: WebWorkerHandle,
}
-pub type WorkersTable = HashMap<u32, WorkerThread>;
-pub type WorkerId = u32;
+pub type WorkersTable = HashMap<WorkerId, WorkerThread>;
-pub fn init(
- is_main_worker: bool,
- create_web_worker_cb: Arc<CreateWebWorkerCb>,
-) -> Extension {
+pub fn init(create_web_worker_cb: Arc<CreateWebWorkerCb>) -> Extension {
Extension::builder()
.state(move |state| {
state.put::<WorkersTable>(WorkersTable::default());
@@ -94,20 +88,6 @@ pub fn init(
),
("op_host_post_message", op_sync(op_host_post_message)),
("op_host_get_message", op_async(op_host_get_message)),
- (
- "op_host_unhandled_error",
- op_sync(move |state, message: String, _: ()| {
- if is_main_worker {
- return Err(generic_error("Cannot be called from main worker."));
- }
-
- let mut sender = state.borrow::<mpsc::Sender<WorkerEvent>>().clone();
- sender
- .try_send(WorkerEvent::Error(generic_error(message)))
- .expect("Failed to propagate error event to parent worker");
- Ok(true)
- }),
- ),
])
.build()
}
@@ -473,7 +453,7 @@ fn op_create_worker(
let worker_id = state.take::<WorkerId>();
let create_module_loader = state.take::<CreateWebWorkerCbHolder>();
state.put::<CreateWebWorkerCbHolder>(create_module_loader.clone());
- state.put::<WorkerId>(worker_id + 1);
+ state.put::<WorkerId>(worker_id.next().unwrap());
let module_specifier = deno_core::resolve_url(&specifier)?;
let worker_name = args_name.unwrap_or_else(|| "".to_string());
@@ -483,7 +463,7 @@ fn op_create_worker(
// Setup new thread
let thread_builder =
- std::thread::Builder::new().name(format!("deno-worker-{}", worker_id));
+ std::thread::Builder::new().name(format!("{}", worker_id));
// Spawn it
let join_handle = thread_builder.spawn(move || {
@@ -501,7 +481,7 @@ fn op_create_worker(
use_deno_namespace,
});
- // Send thread safe handle to newly created worker to host thread
+ // Send thread safe handle from newly created worker to host thread
handle_sender.send(Ok(worker.thread_safe_handle())).unwrap();
drop(handle_sender);
@@ -512,6 +492,7 @@ fn op_create_worker(
run_web_worker(worker, module_specifier, maybe_source_code)
})?;
+ // Receive WebWorkerHandle from newly created worker
let worker_handle = handle_receiver.recv().unwrap()?;
let worker_thread = WorkerThread {
@@ -534,7 +515,7 @@ fn op_host_terminate_worker(
id: WorkerId,
_: (),
) -> Result<(), AnyError> {
- let worker_thread = state
+ let mut worker_thread = state
.borrow_mut::<WorkersTable>()
.remove(&id)
.expect("No worker handle found");
@@ -547,54 +528,53 @@ fn op_host_terminate_worker(
Ok(())
}
-fn serialize_worker_event(event: WorkerEvent) -> Value {
- match event {
- WorkerEvent::Message(buf) => json!({ "type": "msg", "data": buf }),
- WorkerEvent::TerminalError(error) => match error.downcast::<JsError>() {
- Ok(js_error) => json!({
- "type": "terminalError",
- "error": {
- "message": js_error.message,
- "fileName": js_error.script_resource_name,
- "lineNumber": js_error.line_number,
- "columnNumber": js_error.start_column,
- }
- }),
- Err(error) => json!({
- "type": "terminalError",
- "error": {
- "message": error.to_string(),
- }
- }),
- },
- WorkerEvent::Error(error) => match error.downcast::<JsError>() {
- Ok(js_error) => json!({
- "type": "error",
- "error": {
- "message": js_error.message,
- "fileName": js_error.script_resource_name,
- "lineNumber": js_error.line_number,
- "columnNumber": js_error.start_column,
- }
- }),
- Err(error) => json!({
- "type": "error",
- "error": {
- "message": error.to_string(),
- }
- }),
- },
+use deno_core::serde::Serialize;
+use deno_core::serde::Serializer;
+
+impl Serialize for WorkerEvent {
+ fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
+ where
+ S: Serializer,
+ {
+ let type_id = match &self {
+ WorkerEvent::Message(_) => 0_i32,
+ WorkerEvent::TerminalError(_) => 1_i32,
+ WorkerEvent::Error(_) => 2_i32,
+ WorkerEvent::Close => 3_i32,
+ };
+
+ match self {
+ WorkerEvent::Message(buf) => {
+ Serialize::serialize(&(type_id, buf), serializer)
+ }
+ WorkerEvent::TerminalError(error) | WorkerEvent::Error(error) => {
+ let value = match error.downcast_ref::<JsError>() {
+ Some(js_error) => json!({
+ "message": js_error.message,
+ "fileName": js_error.script_resource_name,
+ "lineNumber": js_error.line_number,
+ "columnNumber": js_error.start_column,
+ }),
+ None => json!({
+ "message": error.to_string(),
+ }),
+ };
+
+ Serialize::serialize(&(type_id, value), serializer)
+ }
+ _ => Serialize::serialize(&(type_id, ()), serializer),
+ }
}
}
/// Try to remove worker from workers table - NOTE: `Worker.terminate()`
/// might have been called already meaning that we won't find worker in
/// table - in that case ignore.
-fn try_remove_and_close(state: Rc<RefCell<OpState>>, id: u32) {
+fn try_remove_and_close(state: Rc<RefCell<OpState>>, id: WorkerId) {
let mut s = state.borrow_mut();
let workers = s.borrow_mut::<WorkersTable>();
if let Some(mut worker_thread) = workers.remove(&id) {
- worker_thread.worker_handle.sender.close_channel();
+ worker_thread.worker_handle.terminate();
worker_thread
.join_handle
.join()
@@ -608,7 +588,7 @@ async fn op_host_get_message(
state: Rc<RefCell<OpState>>,
id: WorkerId,
_: (),
-) -> Result<Value, AnyError> {
+) -> Result<WorkerEvent, AnyError> {
let worker_handle = {
let s = state.borrow();
let workers_table = s.borrow::<WorkersTable>();
@@ -617,7 +597,7 @@ async fn op_host_get_message(
handle.worker_handle.clone()
} else {
// If handle was not found it means worker has already shutdown
- return Ok(json!({ "type": "close" }));
+ return Ok(WorkerEvent::Close);
}
};
@@ -627,12 +607,12 @@ async fn op_host_get_message(
if let WorkerEvent::TerminalError(_) = &event {
try_remove_and_close(state, id);
}
- return Ok(serialize_worker_event(event));
+ return Ok(event);
}
// If there was no event from worker it means it has already been closed.
try_remove_and_close(state, id);
- Ok(json!({ "type": "close" }))
+ Ok(WorkerEvent::Close)
}
/// Post message to guest worker as host
@@ -641,8 +621,7 @@ fn op_host_post_message(
id: WorkerId,
data: Option<ZeroCopyBuf>,
) -> Result<(), AnyError> {
- let data = data.ok_or_else(null_opbuf)?;
- let msg = Vec::from(&*data).into_boxed_slice();
+ let msg = data.ok_or_else(null_opbuf)?;
debug!("post message to worker {}", id);
let worker_thread = state