diff options
Diffstat (limited to 'runtime/ops/worker_host.rs')
-rw-r--r-- | runtime/ops/worker_host.rs | 125 |
1 files changed, 52 insertions, 73 deletions
diff --git a/runtime/ops/worker_host.rs b/runtime/ops/worker_host.rs index f8d03850d..a5698fa6e 100644 --- a/runtime/ops/worker_host.rs +++ b/runtime/ops/worker_host.rs @@ -15,12 +15,11 @@ use crate::web_worker::run_web_worker; use crate::web_worker::WebWorker; use crate::web_worker::WebWorkerHandle; use crate::web_worker::WorkerEvent; +use crate::web_worker::WorkerId; use deno_core::error::custom_error; -use deno_core::error::generic_error; use deno_core::error::null_opbuf; use deno_core::error::AnyError; use deno_core::error::JsError; -use deno_core::futures::channel::mpsc; use deno_core::op_async; use deno_core::op_sync; use deno_core::serde::de; @@ -28,7 +27,6 @@ use deno_core::serde::de::SeqAccess; use deno_core::serde::Deserialize; use deno_core::serde::Deserializer; use deno_core::serde_json::json; -use deno_core::serde_json::Value; use deno_core::Extension; use deno_core::ModuleSpecifier; use deno_core::OpState; @@ -46,7 +44,7 @@ use std::thread::JoinHandle; pub struct CreateWebWorkerArgs { pub name: String, - pub worker_id: u32, + pub worker_id: WorkerId, pub parent_permissions: Permissions, pub permissions: Permissions, pub main_module: ModuleSpecifier, @@ -68,13 +66,9 @@ pub struct WorkerThread { worker_handle: WebWorkerHandle, } -pub type WorkersTable = HashMap<u32, WorkerThread>; -pub type WorkerId = u32; +pub type WorkersTable = HashMap<WorkerId, WorkerThread>; -pub fn init( - is_main_worker: bool, - create_web_worker_cb: Arc<CreateWebWorkerCb>, -) -> Extension { +pub fn init(create_web_worker_cb: Arc<CreateWebWorkerCb>) -> Extension { Extension::builder() .state(move |state| { state.put::<WorkersTable>(WorkersTable::default()); @@ -94,20 +88,6 @@ pub fn init( ), ("op_host_post_message", op_sync(op_host_post_message)), ("op_host_get_message", op_async(op_host_get_message)), - ( - "op_host_unhandled_error", - op_sync(move |state, message: String, _: ()| { - if is_main_worker { - return Err(generic_error("Cannot be called from main worker.")); - } - - let mut sender = state.borrow::<mpsc::Sender<WorkerEvent>>().clone(); - sender - .try_send(WorkerEvent::Error(generic_error(message))) - .expect("Failed to propagate error event to parent worker"); - Ok(true) - }), - ), ]) .build() } @@ -473,7 +453,7 @@ fn op_create_worker( let worker_id = state.take::<WorkerId>(); let create_module_loader = state.take::<CreateWebWorkerCbHolder>(); state.put::<CreateWebWorkerCbHolder>(create_module_loader.clone()); - state.put::<WorkerId>(worker_id + 1); + state.put::<WorkerId>(worker_id.next().unwrap()); let module_specifier = deno_core::resolve_url(&specifier)?; let worker_name = args_name.unwrap_or_else(|| "".to_string()); @@ -483,7 +463,7 @@ fn op_create_worker( // Setup new thread let thread_builder = - std::thread::Builder::new().name(format!("deno-worker-{}", worker_id)); + std::thread::Builder::new().name(format!("{}", worker_id)); // Spawn it let join_handle = thread_builder.spawn(move || { @@ -501,7 +481,7 @@ fn op_create_worker( use_deno_namespace, }); - // Send thread safe handle to newly created worker to host thread + // Send thread safe handle from newly created worker to host thread handle_sender.send(Ok(worker.thread_safe_handle())).unwrap(); drop(handle_sender); @@ -512,6 +492,7 @@ fn op_create_worker( run_web_worker(worker, module_specifier, maybe_source_code) })?; + // Receive WebWorkerHandle from newly created worker let worker_handle = handle_receiver.recv().unwrap()?; let worker_thread = WorkerThread { @@ -534,7 +515,7 @@ fn op_host_terminate_worker( id: WorkerId, _: (), ) -> Result<(), AnyError> { - let worker_thread = state + let mut worker_thread = state .borrow_mut::<WorkersTable>() .remove(&id) .expect("No worker handle found"); @@ -547,54 +528,53 @@ fn op_host_terminate_worker( Ok(()) } -fn serialize_worker_event(event: WorkerEvent) -> Value { - match event { - WorkerEvent::Message(buf) => json!({ "type": "msg", "data": buf }), - WorkerEvent::TerminalError(error) => match error.downcast::<JsError>() { - Ok(js_error) => json!({ - "type": "terminalError", - "error": { - "message": js_error.message, - "fileName": js_error.script_resource_name, - "lineNumber": js_error.line_number, - "columnNumber": js_error.start_column, - } - }), - Err(error) => json!({ - "type": "terminalError", - "error": { - "message": error.to_string(), - } - }), - }, - WorkerEvent::Error(error) => match error.downcast::<JsError>() { - Ok(js_error) => json!({ - "type": "error", - "error": { - "message": js_error.message, - "fileName": js_error.script_resource_name, - "lineNumber": js_error.line_number, - "columnNumber": js_error.start_column, - } - }), - Err(error) => json!({ - "type": "error", - "error": { - "message": error.to_string(), - } - }), - }, +use deno_core::serde::Serialize; +use deno_core::serde::Serializer; + +impl Serialize for WorkerEvent { + fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error> + where + S: Serializer, + { + let type_id = match &self { + WorkerEvent::Message(_) => 0_i32, + WorkerEvent::TerminalError(_) => 1_i32, + WorkerEvent::Error(_) => 2_i32, + WorkerEvent::Close => 3_i32, + }; + + match self { + WorkerEvent::Message(buf) => { + Serialize::serialize(&(type_id, buf), serializer) + } + WorkerEvent::TerminalError(error) | WorkerEvent::Error(error) => { + let value = match error.downcast_ref::<JsError>() { + Some(js_error) => json!({ + "message": js_error.message, + "fileName": js_error.script_resource_name, + "lineNumber": js_error.line_number, + "columnNumber": js_error.start_column, + }), + None => json!({ + "message": error.to_string(), + }), + }; + + Serialize::serialize(&(type_id, value), serializer) + } + _ => Serialize::serialize(&(type_id, ()), serializer), + } } } /// Try to remove worker from workers table - NOTE: `Worker.terminate()` /// might have been called already meaning that we won't find worker in /// table - in that case ignore. -fn try_remove_and_close(state: Rc<RefCell<OpState>>, id: u32) { +fn try_remove_and_close(state: Rc<RefCell<OpState>>, id: WorkerId) { let mut s = state.borrow_mut(); let workers = s.borrow_mut::<WorkersTable>(); if let Some(mut worker_thread) = workers.remove(&id) { - worker_thread.worker_handle.sender.close_channel(); + worker_thread.worker_handle.terminate(); worker_thread .join_handle .join() @@ -608,7 +588,7 @@ async fn op_host_get_message( state: Rc<RefCell<OpState>>, id: WorkerId, _: (), -) -> Result<Value, AnyError> { +) -> Result<WorkerEvent, AnyError> { let worker_handle = { let s = state.borrow(); let workers_table = s.borrow::<WorkersTable>(); @@ -617,7 +597,7 @@ async fn op_host_get_message( handle.worker_handle.clone() } else { // If handle was not found it means worker has already shutdown - return Ok(json!({ "type": "close" })); + return Ok(WorkerEvent::Close); } }; @@ -627,12 +607,12 @@ async fn op_host_get_message( if let WorkerEvent::TerminalError(_) = &event { try_remove_and_close(state, id); } - return Ok(serialize_worker_event(event)); + return Ok(event); } // If there was no event from worker it means it has already been closed. try_remove_and_close(state, id); - Ok(json!({ "type": "close" })) + Ok(WorkerEvent::Close) } /// Post message to guest worker as host @@ -641,8 +621,7 @@ fn op_host_post_message( id: WorkerId, data: Option<ZeroCopyBuf>, ) -> Result<(), AnyError> { - let data = data.ok_or_else(null_opbuf)?; - let msg = Vec::from(&*data).into_boxed_slice(); + let msg = data.ok_or_else(null_opbuf)?; debug!("post message to worker {}", id); let worker_thread = state |