diff options
author | Bartek IwaĆczuk <biwanczuk@gmail.com> | 2020-09-28 12:14:11 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-09-28 12:14:11 +0200 |
commit | 45d4fd44c9444241a898d3075b99e8871fccdd65 (patch) | |
tree | c6ac6bb880023326e935f96d56ef23ba891f92d3 /cli/web_worker.rs | |
parent | 8ceb165e5d1dc0c8d417e42ffc3a26e8f5a62a03 (diff) |
refactor: move op state registration to workers (#7696)
Diffstat (limited to 'cli/web_worker.rs')
-rw-r--r-- | cli/web_worker.rs | 375 |
1 files changed, 0 insertions, 375 deletions
diff --git a/cli/web_worker.rs b/cli/web_worker.rs deleted file mode 100644 index cb2a8b87e..000000000 --- a/cli/web_worker.rs +++ /dev/null @@ -1,375 +0,0 @@ -// Copyright 2018-2020 the Deno authors. All rights reserved. MIT license. - -use crate::global_state::GlobalState; -use crate::js; -use crate::ops; -use crate::permissions::Permissions; -use crate::state::CliModuleLoader; -use crate::worker::Worker; -use crate::worker::WorkerEvent; -use crate::worker::WorkerHandle; -use deno_core::error::AnyError; -use deno_core::futures::channel::mpsc; -use deno_core::futures::future::FutureExt; -use deno_core::futures::stream::StreamExt; -use deno_core::v8; -use deno_core::ModuleSpecifier; -use std::future::Future; -use std::ops::Deref; -use std::ops::DerefMut; -use std::pin::Pin; -use std::sync::atomic::AtomicBool; -use std::sync::atomic::Ordering; -use std::sync::Arc; -use std::task::Context; -use std::task::Poll; - -/// Wrapper for `WorkerHandle` that adds functionality -/// for terminating workers. -/// -/// This struct is used by host as well as worker itself. -/// -/// Host uses it to communicate with worker and terminate it, -/// while worker uses it only to finish execution on `self.close()`. -#[derive(Clone)] -pub struct WebWorkerHandle { - worker_handle: WorkerHandle, - terminate_tx: mpsc::Sender<()>, - terminated: Arc<AtomicBool>, - isolate_handle: v8::IsolateHandle, -} - -impl Deref for WebWorkerHandle { - type Target = WorkerHandle; - fn deref(&self) -> &Self::Target { - &self.worker_handle - } -} - -impl DerefMut for WebWorkerHandle { - fn deref_mut(&mut self) -> &mut Self::Target { - &mut self.worker_handle - } -} - -impl WebWorkerHandle { - pub fn terminate(&self) { - // This function can be called multiple times by whomever holds - // the handle. However only a single "termination" should occur so - // we need a guard here. - let already_terminated = self.terminated.swap(true, Ordering::Relaxed); - - if !already_terminated { - self.isolate_handle.terminate_execution(); - let mut sender = self.terminate_tx.clone(); - // This call should be infallible hence the `expect`. - // This might change in the future. - sender.try_send(()).expect("Failed to terminate"); - } - } -} - -/// This worker is implementation of `Worker` Web API -/// -/// At the moment this type of worker supports only -/// communication with parent and creating new workers. -/// -/// Each `WebWorker` is either a child of `MainWorker` or other -/// `WebWorker`. -pub struct WebWorker { - worker: Worker, - event_loop_idle: bool, - terminate_rx: mpsc::Receiver<()>, - handle: WebWorkerHandle, - pub has_deno_namespace: bool, -} - -impl WebWorker { - pub fn new( - name: String, - permissions: Permissions, - main_module: ModuleSpecifier, - global_state: Arc<GlobalState>, - has_deno_namespace: bool, - ) -> Self { - let loader = CliModuleLoader::new_for_worker(); - let mut worker = Worker::new( - name, - Some(js::deno_isolate_init()), - permissions, - main_module, - global_state, - loader, - false, - ); - - let terminated = Arc::new(AtomicBool::new(false)); - let isolate_handle = worker.isolate.thread_safe_handle(); - let (terminate_tx, terminate_rx) = mpsc::channel::<()>(1); - - let handle = WebWorkerHandle { - worker_handle: worker.thread_safe_handle(), - terminated, - isolate_handle, - terminate_tx, - }; - - let mut web_worker = Self { - worker, - event_loop_idle: false, - terminate_rx, - handle, - has_deno_namespace, - }; - - { - ops::runtime::init(&mut web_worker.worker); - let sender = web_worker.worker.internal_channels.sender.clone(); - let handle = web_worker.thread_safe_handle(); - ops::web_worker::init(&mut web_worker.worker, sender, handle); - ops::worker_host::init(&mut web_worker.worker); - ops::reg_json_sync( - &mut web_worker.worker, - "op_domain_to_ascii", - deno_web::op_domain_to_ascii, - ); - ops::io::init(&mut web_worker.worker); - ops::reg_json_sync( - &mut web_worker.worker, - "op_close", - deno_core::op_close, - ); - ops::reg_json_sync( - &mut web_worker.worker, - "op_resources", - deno_core::op_resources, - ); - ops::errors::init(&mut web_worker.worker); - ops::timers::init(&mut web_worker.worker); - ops::fetch::init(&mut web_worker.worker); - ops::websocket::init(&mut web_worker.worker); - - if has_deno_namespace { - ops::runtime_compiler::init(&mut web_worker.worker); - ops::fs::init(&mut web_worker.worker); - ops::fs_events::init(&mut web_worker.worker); - ops::plugin::init(&mut web_worker.worker); - ops::net::init(&mut web_worker.worker); - ops::tls::init(&mut web_worker.worker); - ops::os::init(&mut web_worker.worker); - ops::permissions::init(&mut web_worker.worker); - ops::process::init(&mut web_worker.worker); - ops::random::init(&mut web_worker.worker); - ops::signal::init(&mut web_worker.worker); - ops::tty::init(&mut web_worker.worker); - } - } - - web_worker - } -} - -impl WebWorker { - /// Returns a way to communicate with the Worker from other threads. - pub fn thread_safe_handle(&self) -> WebWorkerHandle { - self.handle.clone() - } -} - -impl Deref for WebWorker { - type Target = Worker; - fn deref(&self) -> &Self::Target { - &self.worker - } -} - -impl DerefMut for WebWorker { - fn deref_mut(&mut self) -> &mut Self::Target { - &mut self.worker - } -} - -impl Future for WebWorker { - type Output = Result<(), AnyError>; - - fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> { - let inner = self.get_mut(); - let worker = &mut inner.worker; - - let terminated = inner.handle.terminated.load(Ordering::Relaxed); - - if terminated { - return Poll::Ready(Ok(())); - } - - if !inner.event_loop_idle { - match worker.poll_unpin(cx) { - Poll::Ready(r) => { - let terminated = inner.handle.terminated.load(Ordering::Relaxed); - if terminated { - return Poll::Ready(Ok(())); - } - - if let Err(e) = r { - let mut sender = worker.internal_channels.sender.clone(); - sender - .try_send(WorkerEvent::Error(e)) - .expect("Failed to post message to host"); - } - inner.event_loop_idle = true; - } - Poll::Pending => {} - } - } - - if let Poll::Ready(r) = inner.terminate_rx.poll_next_unpin(cx) { - // terminate_rx should never be closed - assert!(r.is_some()); - return Poll::Ready(Ok(())); - } - - if let Poll::Ready(r) = - worker.internal_channels.receiver.poll_next_unpin(cx) - { - match r { - Some(msg) => { - let msg = String::from_utf8(msg.to_vec()).unwrap(); - let script = format!("workerMessageRecvCallback({})", msg); - - if let Err(e) = worker.execute(&script) { - // If execution was terminated during message callback then - // just ignore it - if inner.handle.terminated.load(Ordering::Relaxed) { - return Poll::Ready(Ok(())); - } - - // Otherwise forward error to host - let mut sender = worker.internal_channels.sender.clone(); - sender - .try_send(WorkerEvent::Error(e)) - .expect("Failed to post message to host"); - } - - // Let event loop be polled again - inner.event_loop_idle = false; - worker.waker.wake(); - } - None => unreachable!(), - } - } - - Poll::Pending - } -} - -#[cfg(test)] -mod tests { - use super::*; - use crate::tokio_util; - use crate::worker::WorkerEvent; - use deno_core::serde_json::json; - - fn create_test_worker() -> WebWorker { - let main_module = - ModuleSpecifier::resolve_url_or_path("./hello.js").unwrap(); - let global_state = GlobalState::mock(vec!["deno".to_string()], None); - let mut worker = WebWorker::new( - "TEST".to_string(), - Permissions::allow_all(), - main_module, - global_state, - false, - ); - worker - .execute("bootstrap.workerRuntime(\"TEST\", false)") - .unwrap(); - worker - } - #[test] - fn test_worker_messages() { - let (handle_sender, handle_receiver) = - std::sync::mpsc::sync_channel::<WebWorkerHandle>(1); - - let join_handle = std::thread::spawn(move || { - let mut worker = create_test_worker(); - let source = r#" - onmessage = function(e) { - console.log("msg from main script", e.data); - if (e.data == "exit") { - return close(); - } else { - console.assert(e.data === "hi"); - } - postMessage([1, 2, 3]); - console.log("after postMessage"); - } - "#; - worker.execute(source).unwrap(); - let handle = worker.thread_safe_handle(); - handle_sender.send(handle).unwrap(); - let r = tokio_util::run_basic(worker); - assert!(r.is_ok()) - }); - - let mut handle = handle_receiver.recv().unwrap(); - - tokio_util::run_basic(async move { - let msg = json!("hi").to_string().into_boxed_str().into_boxed_bytes(); - let r = handle.post_message(msg.clone()); - assert!(r.is_ok()); - - let maybe_msg = handle.get_event().await.unwrap(); - assert!(maybe_msg.is_some()); - - let r = handle.post_message(msg.clone()); - assert!(r.is_ok()); - - let maybe_msg = handle.get_event().await.unwrap(); - assert!(maybe_msg.is_some()); - match maybe_msg { - Some(WorkerEvent::Message(buf)) => { - assert_eq!(*buf, *b"[1,2,3]"); - } - _ => unreachable!(), - } - - let msg = json!("exit") - .to_string() - .into_boxed_str() - .into_boxed_bytes(); - let r = handle.post_message(msg); - assert!(r.is_ok()); - let event = handle.get_event().await.unwrap(); - assert!(event.is_none()); - handle.sender.close_channel(); - }); - join_handle.join().expect("Failed to join worker thread"); - } - - #[test] - fn removed_from_resource_table_on_close() { - let (handle_sender, handle_receiver) = - std::sync::mpsc::sync_channel::<WebWorkerHandle>(1); - - let join_handle = std::thread::spawn(move || { - let mut worker = create_test_worker(); - worker.execute("onmessage = () => { close(); }").unwrap(); - let handle = worker.thread_safe_handle(); - handle_sender.send(handle).unwrap(); - let r = tokio_util::run_basic(worker); - assert!(r.is_ok()) - }); - - let mut handle = handle_receiver.recv().unwrap(); - - tokio_util::run_basic(async move { - let msg = json!("hi").to_string().into_boxed_str().into_boxed_bytes(); - let r = handle.post_message(msg.clone()); - assert!(r.is_ok()); - let event = handle.get_event().await.unwrap(); - assert!(event.is_none()); - handle.sender.close_channel(); - }); - join_handle.join().expect("Failed to join worker thread"); - } -} |