summaryrefslogtreecommitdiff
path: root/cli/ops/web_worker.rs
diff options
context:
space:
mode:
authorBartek IwaƄczuk <biwanczuk@gmail.com>2020-02-11 10:04:59 +0100
committerGitHub <noreply@github.com>2020-02-11 10:04:59 +0100
commit79b3bc05d6de520f1df73face1744ae3d8be0bb8 (patch)
treef4c449efa67b88c767df52dd3ecec2246dded2e5 /cli/ops/web_worker.rs
parent81905a867ea3f942619229e330840d132c57a5da (diff)
workers: basic event loop (#3828)
* establish basic event loop for workers * make "self.close()" inside worker * remove "runWorkerMessageLoop() - instead manually call global function in Rust when message arrives. This is done in preparation for structured clone * refactor "WorkerChannel" and use distinct structs for internal and external channels; "WorkerChannelsInternal" and "WorkerHandle" * move "State.worker_channels_internal" to "Worker.internal_channels" * add "WorkerEvent" enum for child->host communication; currently "Message(Buf)" and "Error(ErrBox)" variants are supported * add tests for nested workers * add tests for worker throwing error on startup
Diffstat (limited to 'cli/ops/web_worker.rs')
-rw-r--r--cli/ops/web_worker.rs80
1 files changed, 40 insertions, 40 deletions
diff --git a/cli/ops/web_worker.rs b/cli/ops/web_worker.rs
index ae6b10abc..e22c0f221 100644
--- a/cli/ops/web_worker.rs
+++ b/cli/ops/web_worker.rs
@@ -1,65 +1,65 @@
// Copyright 2018-2020 the Deno authors. All rights reserved. MIT license.
use super::dispatch_json::{JsonOp, Value};
-use crate::deno_error::DenoError;
-use crate::deno_error::ErrorKind;
use crate::ops::json_op;
use crate::state::State;
+use crate::worker::WorkerEvent;
use deno_core::*;
use futures;
-use futures::future::FutureExt;
+use futures::channel::mpsc;
+use futures::sink::SinkExt;
use std;
use std::convert::From;
-pub fn init(i: &mut Isolate, s: &State) {
+pub fn web_worker_op<D>(
+ sender: mpsc::Sender<WorkerEvent>,
+ dispatcher: D,
+) -> impl Fn(Value, Option<ZeroCopyBuf>) -> Result<JsonOp, ErrBox>
+where
+ D: Fn(
+ &mpsc::Sender<WorkerEvent>,
+ Value,
+ Option<ZeroCopyBuf>,
+ ) -> Result<JsonOp, ErrBox>,
+{
+ move |args: Value, zero_copy: Option<ZeroCopyBuf>| -> Result<JsonOp, ErrBox> {
+ dispatcher(&sender, args, zero_copy)
+ }
+}
+
+pub fn init(i: &mut Isolate, s: &State, sender: &mpsc::Sender<WorkerEvent>) {
i.register_op(
"worker_post_message",
- s.core_op(json_op(s.stateful_op(op_worker_post_message))),
+ s.core_op(json_op(web_worker_op(
+ sender.clone(),
+ op_worker_post_message,
+ ))),
);
i.register_op(
- "worker_get_message",
- s.core_op(json_op(s.stateful_op(op_worker_get_message))),
+ "worker_close",
+ s.core_op(json_op(web_worker_op(sender.clone(), op_worker_close))),
);
}
-/// Get message from host as guest worker
-fn op_worker_get_message(
- state: &State,
- _args: Value,
- _data: Option<ZeroCopyBuf>,
-) -> Result<JsonOp, ErrBox> {
- let state_ = state.clone();
- let op = async move {
- let fut = {
- let state = state_.borrow();
- state
- .worker_channels_internal
- .as_ref()
- .unwrap()
- .get_message()
- };
- let maybe_buf = fut.await;
- debug!("op_worker_get_message");
- Ok(json!({ "data": maybe_buf }))
- };
-
- Ok(JsonOp::Async(op.boxed_local()))
-}
-
/// Post message to host as guest worker
fn op_worker_post_message(
- state: &State,
+ sender: &mpsc::Sender<WorkerEvent>,
_args: Value,
data: Option<ZeroCopyBuf>,
) -> Result<JsonOp, ErrBox> {
let d = Vec::from(data.unwrap().as_ref()).into_boxed_slice();
- let state = state.borrow();
- let fut = state
- .worker_channels_internal
- .as_ref()
- .unwrap()
- .post_message(d);
- futures::executor::block_on(fut)
- .map_err(|e| DenoError::new(ErrorKind::Other, e.to_string()))?;
+ let mut sender = sender.clone();
+ let fut = sender.send(WorkerEvent::Message(d));
+ futures::executor::block_on(fut).expect("Failed to post message to host");
+ Ok(JsonOp::Sync(json!({})))
+}
+/// Notify host that guest worker closes
+fn op_worker_close(
+ sender: &mpsc::Sender<WorkerEvent>,
+ _args: Value,
+ _data: Option<ZeroCopyBuf>,
+) -> Result<JsonOp, ErrBox> {
+ let mut sender = sender.clone();
+ sender.close_channel();
Ok(JsonOp::Sync(json!({})))
}