summaryrefslogtreecommitdiff
path: root/cli/web_worker.rs
diff options
context:
space:
mode:
authorBartek IwaƄczuk <biwanczuk@gmail.com>2020-09-28 12:14:11 +0200
committerGitHub <noreply@github.com>2020-09-28 12:14:11 +0200
commit45d4fd44c9444241a898d3075b99e8871fccdd65 (patch)
treec6ac6bb880023326e935f96d56ef23ba891f92d3 /cli/web_worker.rs
parent8ceb165e5d1dc0c8d417e42ffc3a26e8f5a62a03 (diff)
refactor: move op state registration to workers (#7696)
Diffstat (limited to 'cli/web_worker.rs')
-rw-r--r--cli/web_worker.rs375
1 files changed, 0 insertions, 375 deletions
diff --git a/cli/web_worker.rs b/cli/web_worker.rs
deleted file mode 100644
index cb2a8b87e..000000000
--- a/cli/web_worker.rs
+++ /dev/null
@@ -1,375 +0,0 @@
-// Copyright 2018-2020 the Deno authors. All rights reserved. MIT license.
-
-use crate::global_state::GlobalState;
-use crate::js;
-use crate::ops;
-use crate::permissions::Permissions;
-use crate::state::CliModuleLoader;
-use crate::worker::Worker;
-use crate::worker::WorkerEvent;
-use crate::worker::WorkerHandle;
-use deno_core::error::AnyError;
-use deno_core::futures::channel::mpsc;
-use deno_core::futures::future::FutureExt;
-use deno_core::futures::stream::StreamExt;
-use deno_core::v8;
-use deno_core::ModuleSpecifier;
-use std::future::Future;
-use std::ops::Deref;
-use std::ops::DerefMut;
-use std::pin::Pin;
-use std::sync::atomic::AtomicBool;
-use std::sync::atomic::Ordering;
-use std::sync::Arc;
-use std::task::Context;
-use std::task::Poll;
-
-/// Wrapper for `WorkerHandle` that adds functionality
-/// for terminating workers.
-///
-/// This struct is used by host as well as worker itself.
-///
-/// Host uses it to communicate with worker and terminate it,
-/// while worker uses it only to finish execution on `self.close()`.
-#[derive(Clone)]
-pub struct WebWorkerHandle {
- worker_handle: WorkerHandle,
- terminate_tx: mpsc::Sender<()>,
- terminated: Arc<AtomicBool>,
- isolate_handle: v8::IsolateHandle,
-}
-
-impl Deref for WebWorkerHandle {
- type Target = WorkerHandle;
- fn deref(&self) -> &Self::Target {
- &self.worker_handle
- }
-}
-
-impl DerefMut for WebWorkerHandle {
- fn deref_mut(&mut self) -> &mut Self::Target {
- &mut self.worker_handle
- }
-}
-
-impl WebWorkerHandle {
- pub fn terminate(&self) {
- // This function can be called multiple times by whomever holds
- // the handle. However only a single "termination" should occur so
- // we need a guard here.
- let already_terminated = self.terminated.swap(true, Ordering::Relaxed);
-
- if !already_terminated {
- self.isolate_handle.terminate_execution();
- let mut sender = self.terminate_tx.clone();
- // This call should be infallible hence the `expect`.
- // This might change in the future.
- sender.try_send(()).expect("Failed to terminate");
- }
- }
-}
-
-/// This worker is implementation of `Worker` Web API
-///
-/// At the moment this type of worker supports only
-/// communication with parent and creating new workers.
-///
-/// Each `WebWorker` is either a child of `MainWorker` or other
-/// `WebWorker`.
-pub struct WebWorker {
- worker: Worker,
- event_loop_idle: bool,
- terminate_rx: mpsc::Receiver<()>,
- handle: WebWorkerHandle,
- pub has_deno_namespace: bool,
-}
-
-impl WebWorker {
- pub fn new(
- name: String,
- permissions: Permissions,
- main_module: ModuleSpecifier,
- global_state: Arc<GlobalState>,
- has_deno_namespace: bool,
- ) -> Self {
- let loader = CliModuleLoader::new_for_worker();
- let mut worker = Worker::new(
- name,
- Some(js::deno_isolate_init()),
- permissions,
- main_module,
- global_state,
- loader,
- false,
- );
-
- let terminated = Arc::new(AtomicBool::new(false));
- let isolate_handle = worker.isolate.thread_safe_handle();
- let (terminate_tx, terminate_rx) = mpsc::channel::<()>(1);
-
- let handle = WebWorkerHandle {
- worker_handle: worker.thread_safe_handle(),
- terminated,
- isolate_handle,
- terminate_tx,
- };
-
- let mut web_worker = Self {
- worker,
- event_loop_idle: false,
- terminate_rx,
- handle,
- has_deno_namespace,
- };
-
- {
- ops::runtime::init(&mut web_worker.worker);
- let sender = web_worker.worker.internal_channels.sender.clone();
- let handle = web_worker.thread_safe_handle();
- ops::web_worker::init(&mut web_worker.worker, sender, handle);
- ops::worker_host::init(&mut web_worker.worker);
- ops::reg_json_sync(
- &mut web_worker.worker,
- "op_domain_to_ascii",
- deno_web::op_domain_to_ascii,
- );
- ops::io::init(&mut web_worker.worker);
- ops::reg_json_sync(
- &mut web_worker.worker,
- "op_close",
- deno_core::op_close,
- );
- ops::reg_json_sync(
- &mut web_worker.worker,
- "op_resources",
- deno_core::op_resources,
- );
- ops::errors::init(&mut web_worker.worker);
- ops::timers::init(&mut web_worker.worker);
- ops::fetch::init(&mut web_worker.worker);
- ops::websocket::init(&mut web_worker.worker);
-
- if has_deno_namespace {
- ops::runtime_compiler::init(&mut web_worker.worker);
- ops::fs::init(&mut web_worker.worker);
- ops::fs_events::init(&mut web_worker.worker);
- ops::plugin::init(&mut web_worker.worker);
- ops::net::init(&mut web_worker.worker);
- ops::tls::init(&mut web_worker.worker);
- ops::os::init(&mut web_worker.worker);
- ops::permissions::init(&mut web_worker.worker);
- ops::process::init(&mut web_worker.worker);
- ops::random::init(&mut web_worker.worker);
- ops::signal::init(&mut web_worker.worker);
- ops::tty::init(&mut web_worker.worker);
- }
- }
-
- web_worker
- }
-}
-
-impl WebWorker {
- /// Returns a way to communicate with the Worker from other threads.
- pub fn thread_safe_handle(&self) -> WebWorkerHandle {
- self.handle.clone()
- }
-}
-
-impl Deref for WebWorker {
- type Target = Worker;
- fn deref(&self) -> &Self::Target {
- &self.worker
- }
-}
-
-impl DerefMut for WebWorker {
- fn deref_mut(&mut self) -> &mut Self::Target {
- &mut self.worker
- }
-}
-
-impl Future for WebWorker {
- type Output = Result<(), AnyError>;
-
- fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
- let inner = self.get_mut();
- let worker = &mut inner.worker;
-
- let terminated = inner.handle.terminated.load(Ordering::Relaxed);
-
- if terminated {
- return Poll::Ready(Ok(()));
- }
-
- if !inner.event_loop_idle {
- match worker.poll_unpin(cx) {
- Poll::Ready(r) => {
- let terminated = inner.handle.terminated.load(Ordering::Relaxed);
- if terminated {
- return Poll::Ready(Ok(()));
- }
-
- if let Err(e) = r {
- let mut sender = worker.internal_channels.sender.clone();
- sender
- .try_send(WorkerEvent::Error(e))
- .expect("Failed to post message to host");
- }
- inner.event_loop_idle = true;
- }
- Poll::Pending => {}
- }
- }
-
- if let Poll::Ready(r) = inner.terminate_rx.poll_next_unpin(cx) {
- // terminate_rx should never be closed
- assert!(r.is_some());
- return Poll::Ready(Ok(()));
- }
-
- if let Poll::Ready(r) =
- worker.internal_channels.receiver.poll_next_unpin(cx)
- {
- match r {
- Some(msg) => {
- let msg = String::from_utf8(msg.to_vec()).unwrap();
- let script = format!("workerMessageRecvCallback({})", msg);
-
- if let Err(e) = worker.execute(&script) {
- // If execution was terminated during message callback then
- // just ignore it
- if inner.handle.terminated.load(Ordering::Relaxed) {
- return Poll::Ready(Ok(()));
- }
-
- // Otherwise forward error to host
- let mut sender = worker.internal_channels.sender.clone();
- sender
- .try_send(WorkerEvent::Error(e))
- .expect("Failed to post message to host");
- }
-
- // Let event loop be polled again
- inner.event_loop_idle = false;
- worker.waker.wake();
- }
- None => unreachable!(),
- }
- }
-
- Poll::Pending
- }
-}
-
-#[cfg(test)]
-mod tests {
- use super::*;
- use crate::tokio_util;
- use crate::worker::WorkerEvent;
- use deno_core::serde_json::json;
-
- fn create_test_worker() -> WebWorker {
- let main_module =
- ModuleSpecifier::resolve_url_or_path("./hello.js").unwrap();
- let global_state = GlobalState::mock(vec!["deno".to_string()], None);
- let mut worker = WebWorker::new(
- "TEST".to_string(),
- Permissions::allow_all(),
- main_module,
- global_state,
- false,
- );
- worker
- .execute("bootstrap.workerRuntime(\"TEST\", false)")
- .unwrap();
- worker
- }
- #[test]
- fn test_worker_messages() {
- let (handle_sender, handle_receiver) =
- std::sync::mpsc::sync_channel::<WebWorkerHandle>(1);
-
- let join_handle = std::thread::spawn(move || {
- let mut worker = create_test_worker();
- let source = r#"
- onmessage = function(e) {
- console.log("msg from main script", e.data);
- if (e.data == "exit") {
- return close();
- } else {
- console.assert(e.data === "hi");
- }
- postMessage([1, 2, 3]);
- console.log("after postMessage");
- }
- "#;
- worker.execute(source).unwrap();
- let handle = worker.thread_safe_handle();
- handle_sender.send(handle).unwrap();
- let r = tokio_util::run_basic(worker);
- assert!(r.is_ok())
- });
-
- let mut handle = handle_receiver.recv().unwrap();
-
- tokio_util::run_basic(async move {
- let msg = json!("hi").to_string().into_boxed_str().into_boxed_bytes();
- let r = handle.post_message(msg.clone());
- assert!(r.is_ok());
-
- let maybe_msg = handle.get_event().await.unwrap();
- assert!(maybe_msg.is_some());
-
- let r = handle.post_message(msg.clone());
- assert!(r.is_ok());
-
- let maybe_msg = handle.get_event().await.unwrap();
- assert!(maybe_msg.is_some());
- match maybe_msg {
- Some(WorkerEvent::Message(buf)) => {
- assert_eq!(*buf, *b"[1,2,3]");
- }
- _ => unreachable!(),
- }
-
- let msg = json!("exit")
- .to_string()
- .into_boxed_str()
- .into_boxed_bytes();
- let r = handle.post_message(msg);
- assert!(r.is_ok());
- let event = handle.get_event().await.unwrap();
- assert!(event.is_none());
- handle.sender.close_channel();
- });
- join_handle.join().expect("Failed to join worker thread");
- }
-
- #[test]
- fn removed_from_resource_table_on_close() {
- let (handle_sender, handle_receiver) =
- std::sync::mpsc::sync_channel::<WebWorkerHandle>(1);
-
- let join_handle = std::thread::spawn(move || {
- let mut worker = create_test_worker();
- worker.execute("onmessage = () => { close(); }").unwrap();
- let handle = worker.thread_safe_handle();
- handle_sender.send(handle).unwrap();
- let r = tokio_util::run_basic(worker);
- assert!(r.is_ok())
- });
-
- let mut handle = handle_receiver.recv().unwrap();
-
- tokio_util::run_basic(async move {
- let msg = json!("hi").to_string().into_boxed_str().into_boxed_bytes();
- let r = handle.post_message(msg.clone());
- assert!(r.is_ok());
- let event = handle.get_event().await.unwrap();
- assert!(event.is_none());
- handle.sender.close_channel();
- });
- join_handle.join().expect("Failed to join worker thread");
- }
-}