summaryrefslogtreecommitdiff
path: root/runtime/ops/worker_host.rs
diff options
context:
space:
mode:
Diffstat (limited to 'runtime/ops/worker_host.rs')
-rw-r--r--runtime/ops/worker_host.rs125
1 files changed, 52 insertions, 73 deletions
diff --git a/runtime/ops/worker_host.rs b/runtime/ops/worker_host.rs
index f8d03850d..a5698fa6e 100644
--- a/runtime/ops/worker_host.rs
+++ b/runtime/ops/worker_host.rs
@@ -15,12 +15,11 @@ use crate::web_worker::run_web_worker;
use crate::web_worker::WebWorker;
use crate::web_worker::WebWorkerHandle;
use crate::web_worker::WorkerEvent;
+use crate::web_worker::WorkerId;
use deno_core::error::custom_error;
-use deno_core::error::generic_error;
use deno_core::error::null_opbuf;
use deno_core::error::AnyError;
use deno_core::error::JsError;
-use deno_core::futures::channel::mpsc;
use deno_core::op_async;
use deno_core::op_sync;
use deno_core::serde::de;
@@ -28,7 +27,6 @@ use deno_core::serde::de::SeqAccess;
use deno_core::serde::Deserialize;
use deno_core::serde::Deserializer;
use deno_core::serde_json::json;
-use deno_core::serde_json::Value;
use deno_core::Extension;
use deno_core::ModuleSpecifier;
use deno_core::OpState;
@@ -46,7 +44,7 @@ use std::thread::JoinHandle;
pub struct CreateWebWorkerArgs {
pub name: String,
- pub worker_id: u32,
+ pub worker_id: WorkerId,
pub parent_permissions: Permissions,
pub permissions: Permissions,
pub main_module: ModuleSpecifier,
@@ -68,13 +66,9 @@ pub struct WorkerThread {
worker_handle: WebWorkerHandle,
}
-pub type WorkersTable = HashMap<u32, WorkerThread>;
-pub type WorkerId = u32;
+pub type WorkersTable = HashMap<WorkerId, WorkerThread>;
-pub fn init(
- is_main_worker: bool,
- create_web_worker_cb: Arc<CreateWebWorkerCb>,
-) -> Extension {
+pub fn init(create_web_worker_cb: Arc<CreateWebWorkerCb>) -> Extension {
Extension::builder()
.state(move |state| {
state.put::<WorkersTable>(WorkersTable::default());
@@ -94,20 +88,6 @@ pub fn init(
),
("op_host_post_message", op_sync(op_host_post_message)),
("op_host_get_message", op_async(op_host_get_message)),
- (
- "op_host_unhandled_error",
- op_sync(move |state, message: String, _: ()| {
- if is_main_worker {
- return Err(generic_error("Cannot be called from main worker."));
- }
-
- let mut sender = state.borrow::<mpsc::Sender<WorkerEvent>>().clone();
- sender
- .try_send(WorkerEvent::Error(generic_error(message)))
- .expect("Failed to propagate error event to parent worker");
- Ok(true)
- }),
- ),
])
.build()
}
@@ -473,7 +453,7 @@ fn op_create_worker(
let worker_id = state.take::<WorkerId>();
let create_module_loader = state.take::<CreateWebWorkerCbHolder>();
state.put::<CreateWebWorkerCbHolder>(create_module_loader.clone());
- state.put::<WorkerId>(worker_id + 1);
+ state.put::<WorkerId>(worker_id.next().unwrap());
let module_specifier = deno_core::resolve_url(&specifier)?;
let worker_name = args_name.unwrap_or_else(|| "".to_string());
@@ -483,7 +463,7 @@ fn op_create_worker(
// Setup new thread
let thread_builder =
- std::thread::Builder::new().name(format!("deno-worker-{}", worker_id));
+ std::thread::Builder::new().name(format!("{}", worker_id));
// Spawn it
let join_handle = thread_builder.spawn(move || {
@@ -501,7 +481,7 @@ fn op_create_worker(
use_deno_namespace,
});
- // Send thread safe handle to newly created worker to host thread
+ // Send thread safe handle from newly created worker to host thread
handle_sender.send(Ok(worker.thread_safe_handle())).unwrap();
drop(handle_sender);
@@ -512,6 +492,7 @@ fn op_create_worker(
run_web_worker(worker, module_specifier, maybe_source_code)
})?;
+ // Receive WebWorkerHandle from newly created worker
let worker_handle = handle_receiver.recv().unwrap()?;
let worker_thread = WorkerThread {
@@ -534,7 +515,7 @@ fn op_host_terminate_worker(
id: WorkerId,
_: (),
) -> Result<(), AnyError> {
- let worker_thread = state
+ let mut worker_thread = state
.borrow_mut::<WorkersTable>()
.remove(&id)
.expect("No worker handle found");
@@ -547,54 +528,53 @@ fn op_host_terminate_worker(
Ok(())
}
-fn serialize_worker_event(event: WorkerEvent) -> Value {
- match event {
- WorkerEvent::Message(buf) => json!({ "type": "msg", "data": buf }),
- WorkerEvent::TerminalError(error) => match error.downcast::<JsError>() {
- Ok(js_error) => json!({
- "type": "terminalError",
- "error": {
- "message": js_error.message,
- "fileName": js_error.script_resource_name,
- "lineNumber": js_error.line_number,
- "columnNumber": js_error.start_column,
- }
- }),
- Err(error) => json!({
- "type": "terminalError",
- "error": {
- "message": error.to_string(),
- }
- }),
- },
- WorkerEvent::Error(error) => match error.downcast::<JsError>() {
- Ok(js_error) => json!({
- "type": "error",
- "error": {
- "message": js_error.message,
- "fileName": js_error.script_resource_name,
- "lineNumber": js_error.line_number,
- "columnNumber": js_error.start_column,
- }
- }),
- Err(error) => json!({
- "type": "error",
- "error": {
- "message": error.to_string(),
- }
- }),
- },
+use deno_core::serde::Serialize;
+use deno_core::serde::Serializer;
+
+impl Serialize for WorkerEvent {
+ fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
+ where
+ S: Serializer,
+ {
+ let type_id = match &self {
+ WorkerEvent::Message(_) => 0_i32,
+ WorkerEvent::TerminalError(_) => 1_i32,
+ WorkerEvent::Error(_) => 2_i32,
+ WorkerEvent::Close => 3_i32,
+ };
+
+ match self {
+ WorkerEvent::Message(buf) => {
+ Serialize::serialize(&(type_id, buf), serializer)
+ }
+ WorkerEvent::TerminalError(error) | WorkerEvent::Error(error) => {
+ let value = match error.downcast_ref::<JsError>() {
+ Some(js_error) => json!({
+ "message": js_error.message,
+ "fileName": js_error.script_resource_name,
+ "lineNumber": js_error.line_number,
+ "columnNumber": js_error.start_column,
+ }),
+ None => json!({
+ "message": error.to_string(),
+ }),
+ };
+
+ Serialize::serialize(&(type_id, value), serializer)
+ }
+ _ => Serialize::serialize(&(type_id, ()), serializer),
+ }
}
}
/// Try to remove worker from workers table - NOTE: `Worker.terminate()`
/// might have been called already meaning that we won't find worker in
/// table - in that case ignore.
-fn try_remove_and_close(state: Rc<RefCell<OpState>>, id: u32) {
+fn try_remove_and_close(state: Rc<RefCell<OpState>>, id: WorkerId) {
let mut s = state.borrow_mut();
let workers = s.borrow_mut::<WorkersTable>();
if let Some(mut worker_thread) = workers.remove(&id) {
- worker_thread.worker_handle.sender.close_channel();
+ worker_thread.worker_handle.terminate();
worker_thread
.join_handle
.join()
@@ -608,7 +588,7 @@ async fn op_host_get_message(
state: Rc<RefCell<OpState>>,
id: WorkerId,
_: (),
-) -> Result<Value, AnyError> {
+) -> Result<WorkerEvent, AnyError> {
let worker_handle = {
let s = state.borrow();
let workers_table = s.borrow::<WorkersTable>();
@@ -617,7 +597,7 @@ async fn op_host_get_message(
handle.worker_handle.clone()
} else {
// If handle was not found it means worker has already shutdown
- return Ok(json!({ "type": "close" }));
+ return Ok(WorkerEvent::Close);
}
};
@@ -627,12 +607,12 @@ async fn op_host_get_message(
if let WorkerEvent::TerminalError(_) = &event {
try_remove_and_close(state, id);
}
- return Ok(serialize_worker_event(event));
+ return Ok(event);
}
// If there was no event from worker it means it has already been closed.
try_remove_and_close(state, id);
- Ok(json!({ "type": "close" }))
+ Ok(WorkerEvent::Close)
}
/// Post message to guest worker as host
@@ -641,8 +621,7 @@ fn op_host_post_message(
id: WorkerId,
data: Option<ZeroCopyBuf>,
) -> Result<(), AnyError> {
- let data = data.ok_or_else(null_opbuf)?;
- let msg = Vec::from(&*data).into_boxed_slice();
+ let msg = data.ok_or_else(null_opbuf)?;
debug!("post message to worker {}", id);
let worker_thread = state