diff options
Diffstat (limited to 'cli/web_worker.rs')
-rw-r--r-- | cli/web_worker.rs | 72 |
1 files changed, 61 insertions, 11 deletions
diff --git a/cli/web_worker.rs b/cli/web_worker.rs index c0a712aed..7efec476c 100644 --- a/cli/web_worker.rs +++ b/cli/web_worker.rs @@ -2,17 +2,19 @@ use crate::ops; use crate::state::State; use crate::worker::Worker; +use crate::worker::WorkerEvent; use deno_core; use deno_core::ErrBox; use deno_core::StartupData; use futures::future::FutureExt; +use futures::stream::StreamExt; +use futures::SinkExt; use std::future::Future; use std::ops::Deref; use std::ops::DerefMut; use std::pin::Pin; use std::task::Context; use std::task::Poll; - /// This worker is implementation of `Worker` Web API /// /// At the moment this type of worker supports only @@ -20,7 +22,10 @@ use std::task::Poll; /// /// Each `WebWorker` is either a child of `MainWorker` or other /// `WebWorker`. -pub struct WebWorker(Worker); +pub struct WebWorker { + worker: Worker, + is_ready: bool, +} impl WebWorker { pub fn new(name: String, startup_data: StartupData, state: State) -> Self { @@ -36,20 +41,23 @@ impl WebWorker { ops::fetch::init(isolate, &state); } - Self(worker) + Self { + worker, + is_ready: false, + } } } impl Deref for WebWorker { type Target = Worker; fn deref(&self) -> &Self::Target { - &self.0 + &self.worker } } impl DerefMut for WebWorker { fn deref_mut(&mut self) -> &mut Self::Target { - &mut self.0 + &mut self.worker } } @@ -58,14 +66,58 @@ impl Future for WebWorker { fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> { let inner = self.get_mut(); - inner.0.poll_unpin(cx) + let worker = &mut inner.worker; + + if !inner.is_ready { + match worker.poll_unpin(cx) { + Poll::Ready(r) => { + if let Err(e) = r { + let mut sender = worker.internal_channels.sender.clone(); + futures::executor::block_on(sender.send(WorkerEvent::Error(e))) + .expect("Failed to post message to host"); + } + inner.is_ready = true; + } + Poll::Pending => {} + } + } + + let maybe_msg = { + match worker.internal_channels.receiver.poll_next_unpin(cx) { + Poll::Ready(r) => match r { + Some(msg) => { + let msg_str = String::from_utf8(msg.to_vec()).unwrap(); + debug!("received message from host: {}", msg_str); + Some(msg_str) + } + None => { + debug!("channel closed by host, worker event loop shuts down"); + return Poll::Ready(Ok(())); + } + }, + Poll::Pending => None, + } + }; + + if let Some(msg) = maybe_msg { + // TODO: just add second value and then bind using rusty_v8 + // to get structured clone/transfer working + let script = format!("workerMessageRecvCallback({})", msg); + worker + .execute(&script) + .expect("Failed to execute message cb"); + // Let worker be polled again + inner.is_ready = false; + worker.waker.wake(); + } + + Poll::Pending } } #[cfg(test)] mod tests { use super::*; - use crate::ops::worker_host::run_worker_loop; use crate::startup_data; use crate::state::State; use crate::tokio_util; @@ -104,8 +156,7 @@ mod tests { worker.execute(source).unwrap(); let handle = worker.thread_safe_handle(); handle_sender.send(handle).unwrap(); - let mut rt = tokio_util::create_basic_runtime(); - let r = run_worker_loop(&mut rt, &mut worker); + let r = tokio_util::run_basic(worker); assert!(r.is_ok()) }); @@ -154,8 +205,7 @@ mod tests { worker.execute("onmessage = () => { close(); }").unwrap(); let handle = worker.thread_safe_handle(); handle_sender.send(handle).unwrap(); - let mut rt = tokio_util::create_basic_runtime(); - let r = run_worker_loop(&mut rt, &mut worker); + let r = tokio_util::run_basic(worker); assert!(r.is_ok()) }); |