summaryrefslogtreecommitdiff
path: root/cli/web_worker.rs
diff options
context:
space:
mode:
Diffstat (limited to 'cli/web_worker.rs')
-rw-r--r--cli/web_worker.rs72
1 files changed, 61 insertions, 11 deletions
diff --git a/cli/web_worker.rs b/cli/web_worker.rs
index c0a712aed..7efec476c 100644
--- a/cli/web_worker.rs
+++ b/cli/web_worker.rs
@@ -2,17 +2,19 @@
use crate::ops;
use crate::state::State;
use crate::worker::Worker;
+use crate::worker::WorkerEvent;
use deno_core;
use deno_core::ErrBox;
use deno_core::StartupData;
use futures::future::FutureExt;
+use futures::stream::StreamExt;
+use futures::SinkExt;
use std::future::Future;
use std::ops::Deref;
use std::ops::DerefMut;
use std::pin::Pin;
use std::task::Context;
use std::task::Poll;
-
/// This worker is implementation of `Worker` Web API
///
/// At the moment this type of worker supports only
@@ -20,7 +22,10 @@ use std::task::Poll;
///
/// Each `WebWorker` is either a child of `MainWorker` or other
/// `WebWorker`.
-pub struct WebWorker(Worker);
+pub struct WebWorker {
+ worker: Worker,
+ is_ready: bool,
+}
impl WebWorker {
pub fn new(name: String, startup_data: StartupData, state: State) -> Self {
@@ -36,20 +41,23 @@ impl WebWorker {
ops::fetch::init(isolate, &state);
}
- Self(worker)
+ Self {
+ worker,
+ is_ready: false,
+ }
}
}
impl Deref for WebWorker {
type Target = Worker;
fn deref(&self) -> &Self::Target {
- &self.0
+ &self.worker
}
}
impl DerefMut for WebWorker {
fn deref_mut(&mut self) -> &mut Self::Target {
- &mut self.0
+ &mut self.worker
}
}
@@ -58,14 +66,58 @@ impl Future for WebWorker {
fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
let inner = self.get_mut();
- inner.0.poll_unpin(cx)
+ let worker = &mut inner.worker;
+
+ if !inner.is_ready {
+ match worker.poll_unpin(cx) {
+ Poll::Ready(r) => {
+ if let Err(e) = r {
+ let mut sender = worker.internal_channels.sender.clone();
+ futures::executor::block_on(sender.send(WorkerEvent::Error(e)))
+ .expect("Failed to post message to host");
+ }
+ inner.is_ready = true;
+ }
+ Poll::Pending => {}
+ }
+ }
+
+ let maybe_msg = {
+ match worker.internal_channels.receiver.poll_next_unpin(cx) {
+ Poll::Ready(r) => match r {
+ Some(msg) => {
+ let msg_str = String::from_utf8(msg.to_vec()).unwrap();
+ debug!("received message from host: {}", msg_str);
+ Some(msg_str)
+ }
+ None => {
+ debug!("channel closed by host, worker event loop shuts down");
+ return Poll::Ready(Ok(()));
+ }
+ },
+ Poll::Pending => None,
+ }
+ };
+
+ if let Some(msg) = maybe_msg {
+ // TODO: just add second value and then bind using rusty_v8
+ // to get structured clone/transfer working
+ let script = format!("workerMessageRecvCallback({})", msg);
+ worker
+ .execute(&script)
+ .expect("Failed to execute message cb");
+ // Let worker be polled again
+ inner.is_ready = false;
+ worker.waker.wake();
+ }
+
+ Poll::Pending
}
}
#[cfg(test)]
mod tests {
use super::*;
- use crate::ops::worker_host::run_worker_loop;
use crate::startup_data;
use crate::state::State;
use crate::tokio_util;
@@ -104,8 +156,7 @@ mod tests {
worker.execute(source).unwrap();
let handle = worker.thread_safe_handle();
handle_sender.send(handle).unwrap();
- let mut rt = tokio_util::create_basic_runtime();
- let r = run_worker_loop(&mut rt, &mut worker);
+ let r = tokio_util::run_basic(worker);
assert!(r.is_ok())
});
@@ -154,8 +205,7 @@ mod tests {
worker.execute("onmessage = () => { close(); }").unwrap();
let handle = worker.thread_safe_handle();
handle_sender.send(handle).unwrap();
- let mut rt = tokio_util::create_basic_runtime();
- let r = run_worker_loop(&mut rt, &mut worker);
+ let r = tokio_util::run_basic(worker);
assert!(r.is_ok())
});