diff options
| author | Bartek IwaĆczuk <biwanczuk@gmail.com> | 2020-12-13 19:45:53 +0100 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2020-12-13 19:45:53 +0100 |
| commit | 2e74f164b6dcf0ecbf8dd38fba9fae550d784bd0 (patch) | |
| tree | 61abe8e09d5331ace5d9de529f0e2737a8e05dbb /cli/ops/worker_host.rs | |
| parent | 84ef9bd21fb48fb6b5fbc8dafc3de9f361bade3b (diff) | |
refactor: deno_runtime crate (#8640)
This commit moves Deno JS runtime, ops, permissions and
inspector implementation to new "deno_runtime" crate located
in "runtime/" directory.
Details in "runtime/README.md".
Co-authored-by: Ryan Dahl <ry@tinyclouds.org>
Diffstat (limited to 'cli/ops/worker_host.rs')
| -rw-r--r-- | cli/ops/worker_host.rs | 318 |
1 files changed, 0 insertions, 318 deletions
diff --git a/cli/ops/worker_host.rs b/cli/ops/worker_host.rs deleted file mode 100644 index 871e4b9fe..000000000 --- a/cli/ops/worker_host.rs +++ /dev/null @@ -1,318 +0,0 @@ -// Copyright 2018-2020 the Deno authors. All rights reserved. MIT license. - -use crate::permissions::Permissions; -use crate::web_worker::run_web_worker; -use crate::web_worker::WebWorker; -use crate::web_worker::WebWorkerHandle; -use crate::web_worker::WorkerEvent; -use deno_core::error::generic_error; -use deno_core::error::AnyError; -use deno_core::error::JsError; -use deno_core::futures::channel::mpsc; -use deno_core::serde_json; -use deno_core::serde_json::json; -use deno_core::serde_json::Value; -use deno_core::BufVec; -use deno_core::ModuleSpecifier; -use deno_core::OpState; -use deno_core::ZeroCopyBuf; -use serde::Deserialize; -use std::cell::RefCell; -use std::collections::HashMap; -use std::convert::From; -use std::rc::Rc; -use std::sync::Arc; -use std::thread::JoinHandle; - -pub struct CreateWebWorkerArgs { - pub name: String, - pub worker_id: u32, - pub permissions: Permissions, - pub main_module: ModuleSpecifier, - pub use_deno_namespace: bool, -} - -pub type CreateWebWorkerCb = - dyn Fn(CreateWebWorkerArgs) -> WebWorker + Sync + Send; - -/// A holder for callback that is used to create a new -/// WebWorker. It's a struct instead of a type alias -/// because `GothamState` used in `OpState` overrides -/// value if type alises have the same underlying type -#[derive(Clone)] -pub struct CreateWebWorkerCbHolder(Arc<CreateWebWorkerCb>); - -#[derive(Deserialize)] -struct HostUnhandledErrorArgs { - message: String, -} - -pub fn init( - rt: &mut deno_core::JsRuntime, - sender: Option<mpsc::Sender<WorkerEvent>>, - create_web_worker_cb: Arc<CreateWebWorkerCb>, -) { - { - let op_state = rt.op_state(); - let mut state = op_state.borrow_mut(); - state.put::<WorkersTable>(WorkersTable::default()); - state.put::<WorkerId>(WorkerId::default()); - - let create_module_loader = CreateWebWorkerCbHolder(create_web_worker_cb); - state.put::<CreateWebWorkerCbHolder>(create_module_loader); - } - super::reg_json_sync(rt, "op_create_worker", op_create_worker); - super::reg_json_sync( - rt, - "op_host_terminate_worker", - op_host_terminate_worker, - ); - super::reg_json_sync(rt, "op_host_post_message", op_host_post_message); - super::reg_json_async(rt, "op_host_get_message", op_host_get_message); - super::reg_json_sync( - rt, - "op_host_unhandled_error", - move |_state, args, _zero_copy| { - if let Some(mut sender) = sender.clone() { - let args: HostUnhandledErrorArgs = serde_json::from_value(args)?; - sender - .try_send(WorkerEvent::Error(generic_error(args.message))) - .expect("Failed to propagate error event to parent worker"); - Ok(json!(true)) - } else { - Err(generic_error("Cannot be called from main worker.")) - } - }, - ); -} - -pub struct WorkerThread { - join_handle: JoinHandle<Result<(), AnyError>>, - worker_handle: WebWorkerHandle, -} - -pub type WorkersTable = HashMap<u32, WorkerThread>; -pub type WorkerId = u32; - -#[derive(Deserialize)] -#[serde(rename_all = "camelCase")] -struct CreateWorkerArgs { - name: Option<String>, - specifier: String, - has_source_code: bool, - source_code: String, - use_deno_namespace: bool, -} - -/// Create worker as the host -fn op_create_worker( - state: &mut OpState, - args: Value, - _data: &mut [ZeroCopyBuf], -) -> Result<Value, AnyError> { - let args: CreateWorkerArgs = serde_json::from_value(args)?; - - let specifier = args.specifier.clone(); - let maybe_source_code = if args.has_source_code { - Some(args.source_code.clone()) - } else { - None - }; - let args_name = args.name; - let use_deno_namespace = args.use_deno_namespace; - if use_deno_namespace { - super::check_unstable(state, "Worker.deno"); - } - let permissions = state.borrow::<Permissions>().clone(); - let worker_id = state.take::<WorkerId>(); - let create_module_loader = state.take::<CreateWebWorkerCbHolder>(); - state.put::<CreateWebWorkerCbHolder>(create_module_loader.clone()); - state.put::<WorkerId>(worker_id + 1); - - let module_specifier = ModuleSpecifier::resolve_url(&specifier)?; - let worker_name = args_name.unwrap_or_else(|| "".to_string()); - - let (handle_sender, handle_receiver) = - std::sync::mpsc::sync_channel::<Result<WebWorkerHandle, AnyError>>(1); - - // Setup new thread - let thread_builder = - std::thread::Builder::new().name(format!("deno-worker-{}", worker_id)); - - // Spawn it - let join_handle = thread_builder.spawn(move || { - // Any error inside this block is terminal: - // - JS worker is useless - meaning it throws an exception and can't do anything else, - // all action done upon it should be noops - // - newly spawned thread exits - - let worker = (create_module_loader.0)(CreateWebWorkerArgs { - name: worker_name, - worker_id, - permissions, - main_module: module_specifier.clone(), - use_deno_namespace, - }); - - // Send thread safe handle to newly created worker to host thread - handle_sender.send(Ok(worker.thread_safe_handle())).unwrap(); - drop(handle_sender); - - // At this point the only method of communication with host - // is using `worker.internal_channels`. - // - // Host can already push messages and interact with worker. - run_web_worker(worker, module_specifier, maybe_source_code) - })?; - - let worker_handle = handle_receiver.recv().unwrap()?; - - let worker_thread = WorkerThread { - join_handle, - worker_handle, - }; - - // At this point all interactions with worker happen using thread - // safe handler returned from previous function calls - state - .borrow_mut::<WorkersTable>() - .insert(worker_id, worker_thread); - - Ok(json!({ "id": worker_id })) -} - -#[derive(Deserialize)] -struct WorkerArgs { - id: i32, -} - -fn op_host_terminate_worker( - state: &mut OpState, - args: Value, - _data: &mut [ZeroCopyBuf], -) -> Result<Value, AnyError> { - let args: WorkerArgs = serde_json::from_value(args)?; - let id = args.id as u32; - let worker_thread = state - .borrow_mut::<WorkersTable>() - .remove(&id) - .expect("No worker handle found"); - worker_thread.worker_handle.terminate(); - worker_thread - .join_handle - .join() - .expect("Panic in worker thread") - .expect("Panic in worker event loop"); - Ok(json!({})) -} - -fn serialize_worker_event(event: WorkerEvent) -> Value { - match event { - WorkerEvent::Message(buf) => json!({ "type": "msg", "data": buf }), - WorkerEvent::TerminalError(error) => match error.downcast::<JsError>() { - Ok(js_error) => json!({ - "type": "terminalError", - "error": { - "message": js_error.message, - "fileName": js_error.script_resource_name, - "lineNumber": js_error.line_number, - "columnNumber": js_error.start_column, - } - }), - Err(error) => json!({ - "type": "terminalError", - "error": { - "message": error.to_string(), - } - }), - }, - WorkerEvent::Error(error) => match error.downcast::<JsError>() { - Ok(js_error) => json!({ - "type": "error", - "error": { - "message": js_error.message, - "fileName": js_error.script_resource_name, - "lineNumber": js_error.line_number, - "columnNumber": js_error.start_column, - } - }), - Err(error) => json!({ - "type": "error", - "error": { - "message": error.to_string(), - } - }), - }, - } -} - -/// Try to remove worker from workers table - NOTE: `Worker.terminate()` -/// might have been called already meaning that we won't find worker in -/// table - in that case ignore. -fn try_remove_and_close(state: Rc<RefCell<OpState>>, id: u32) { - let mut s = state.borrow_mut(); - let workers = s.borrow_mut::<WorkersTable>(); - if let Some(mut worker_thread) = workers.remove(&id) { - worker_thread.worker_handle.sender.close_channel(); - worker_thread - .join_handle - .join() - .expect("Worker thread panicked") - .expect("Panic in worker event loop"); - } -} - -/// Get message from guest worker as host -async fn op_host_get_message( - state: Rc<RefCell<OpState>>, - args: Value, - _zero_copy: BufVec, -) -> Result<Value, AnyError> { - let args: WorkerArgs = serde_json::from_value(args)?; - let id = args.id as u32; - - let worker_handle = { - let s = state.borrow(); - let workers_table = s.borrow::<WorkersTable>(); - let maybe_handle = workers_table.get(&id); - if let Some(handle) = maybe_handle { - handle.worker_handle.clone() - } else { - // If handle was not found it means worker has already shutdown - return Ok(json!({ "type": "close" })); - } - }; - - let maybe_event = worker_handle.get_event().await?; - if let Some(event) = maybe_event { - // Terminal error means that worker should be removed from worker table. - if let WorkerEvent::TerminalError(_) = &event { - try_remove_and_close(state, id); - } - return Ok(serialize_worker_event(event)); - } - - // If there was no event from worker it means it has already been closed. - try_remove_and_close(state, id); - Ok(json!({ "type": "close" })) -} - -/// Post message to guest worker as host -fn op_host_post_message( - state: &mut OpState, - args: Value, - data: &mut [ZeroCopyBuf], -) -> Result<Value, AnyError> { - assert_eq!(data.len(), 1, "Invalid number of arguments"); - let args: WorkerArgs = serde_json::from_value(args)?; - let id = args.id as u32; - let msg = Vec::from(&*data[0]).into_boxed_slice(); - - debug!("post message to worker {}", id); - let worker_thread = state - .borrow::<WorkersTable>() - .get(&id) - .expect("No worker handle found"); - worker_thread.worker_handle.post_message(msg)?; - Ok(json!({})) -} |
