diff options
author | Bartek IwaĆczuk <biwanczuk@gmail.com> | 2020-02-11 10:04:59 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-02-11 10:04:59 +0100 |
commit | 79b3bc05d6de520f1df73face1744ae3d8be0bb8 (patch) | |
tree | f4c449efa67b88c767df52dd3ecec2246dded2e5 /cli/ops/web_worker.rs | |
parent | 81905a867ea3f942619229e330840d132c57a5da (diff) |
workers: basic event loop (#3828)
* establish basic event loop for workers
* make "self.close()" inside worker
* remove "runWorkerMessageLoop() - instead manually call global function
in Rust when message arrives. This is done in preparation for structured clone
* refactor "WorkerChannel" and use distinct structs for internal
and external channels; "WorkerChannelsInternal" and "WorkerHandle"
* move "State.worker_channels_internal" to "Worker.internal_channels"
* add "WorkerEvent" enum for child->host communication;
currently "Message(Buf)" and "Error(ErrBox)" variants are supported
* add tests for nested workers
* add tests for worker throwing error on startup
Diffstat (limited to 'cli/ops/web_worker.rs')
-rw-r--r-- | cli/ops/web_worker.rs | 80 |
1 files changed, 40 insertions, 40 deletions
diff --git a/cli/ops/web_worker.rs b/cli/ops/web_worker.rs index ae6b10abc..e22c0f221 100644 --- a/cli/ops/web_worker.rs +++ b/cli/ops/web_worker.rs @@ -1,65 +1,65 @@ // Copyright 2018-2020 the Deno authors. All rights reserved. MIT license. use super::dispatch_json::{JsonOp, Value}; -use crate::deno_error::DenoError; -use crate::deno_error::ErrorKind; use crate::ops::json_op; use crate::state::State; +use crate::worker::WorkerEvent; use deno_core::*; use futures; -use futures::future::FutureExt; +use futures::channel::mpsc; +use futures::sink::SinkExt; use std; use std::convert::From; -pub fn init(i: &mut Isolate, s: &State) { +pub fn web_worker_op<D>( + sender: mpsc::Sender<WorkerEvent>, + dispatcher: D, +) -> impl Fn(Value, Option<ZeroCopyBuf>) -> Result<JsonOp, ErrBox> +where + D: Fn( + &mpsc::Sender<WorkerEvent>, + Value, + Option<ZeroCopyBuf>, + ) -> Result<JsonOp, ErrBox>, +{ + move |args: Value, zero_copy: Option<ZeroCopyBuf>| -> Result<JsonOp, ErrBox> { + dispatcher(&sender, args, zero_copy) + } +} + +pub fn init(i: &mut Isolate, s: &State, sender: &mpsc::Sender<WorkerEvent>) { i.register_op( "worker_post_message", - s.core_op(json_op(s.stateful_op(op_worker_post_message))), + s.core_op(json_op(web_worker_op( + sender.clone(), + op_worker_post_message, + ))), ); i.register_op( - "worker_get_message", - s.core_op(json_op(s.stateful_op(op_worker_get_message))), + "worker_close", + s.core_op(json_op(web_worker_op(sender.clone(), op_worker_close))), ); } -/// Get message from host as guest worker -fn op_worker_get_message( - state: &State, - _args: Value, - _data: Option<ZeroCopyBuf>, -) -> Result<JsonOp, ErrBox> { - let state_ = state.clone(); - let op = async move { - let fut = { - let state = state_.borrow(); - state - .worker_channels_internal - .as_ref() - .unwrap() - .get_message() - }; - let maybe_buf = fut.await; - debug!("op_worker_get_message"); - Ok(json!({ "data": maybe_buf })) - }; - - Ok(JsonOp::Async(op.boxed_local())) -} - /// Post message to host as guest worker fn op_worker_post_message( - state: &State, + sender: &mpsc::Sender<WorkerEvent>, _args: Value, data: Option<ZeroCopyBuf>, ) -> Result<JsonOp, ErrBox> { let d = Vec::from(data.unwrap().as_ref()).into_boxed_slice(); - let state = state.borrow(); - let fut = state - .worker_channels_internal - .as_ref() - .unwrap() - .post_message(d); - futures::executor::block_on(fut) - .map_err(|e| DenoError::new(ErrorKind::Other, e.to_string()))?; + let mut sender = sender.clone(); + let fut = sender.send(WorkerEvent::Message(d)); + futures::executor::block_on(fut).expect("Failed to post message to host"); + Ok(JsonOp::Sync(json!({}))) +} +/// Notify host that guest worker closes +fn op_worker_close( + sender: &mpsc::Sender<WorkerEvent>, + _args: Value, + _data: Option<ZeroCopyBuf>, +) -> Result<JsonOp, ErrBox> { + let mut sender = sender.clone(); + sender.close_channel(); Ok(JsonOp::Sync(json!({}))) } |