diff options
author | Bartek IwaĆczuk <biwanczuk@gmail.com> | 2019-11-20 01:17:05 +0100 |
---|---|---|
committer | Ry Dahl <ry@tinyclouds.org> | 2019-11-19 19:17:05 -0500 |
commit | 1912ed674097588adb7b83e7b78043b2168821f3 (patch) | |
tree | f906b35ca15f031e332b6f496e50db6b59491e8f /cli/ops/workers.rs | |
parent | 6708fcc38688b80d4e052f755f02efb09a2071d1 (diff) |
remove tokio_util::block_on from ops/workers.rs (#3381)
Diffstat (limited to 'cli/ops/workers.rs')
-rw-r--r-- | cli/ops/workers.rs | 35 |
1 files changed, 22 insertions, 13 deletions
diff --git a/cli/ops/workers.rs b/cli/ops/workers.rs index 42f93ec57..a03290545 100644 --- a/cli/ops/workers.rs +++ b/cli/ops/workers.rs @@ -7,7 +7,6 @@ use crate::deno_error::ErrorKind; use crate::ops::json_op; use crate::startup_data; use crate::state::ThreadSafeState; -use crate::tokio_util; use crate::worker::Worker; use deno::*; use futures; @@ -20,6 +19,7 @@ use std::convert::From; use std::future::Future; use std::pin::Pin; use std::sync::atomic::Ordering; +use std::sync::mpsc; use std::task::Context; use std::task::Poll; @@ -153,23 +153,31 @@ fn op_create_worker( js_check(worker.execute(&deno_main_call)); js_check(worker.execute("workerMain()")); - let exec_cb = move |worker: Worker| { - let worker_id = parent_state.add_child_worker(worker); - json!(worker_id) - }; + let worker_id = parent_state.add_child_worker(worker.clone()); + let response = json!(worker_id); // Has provided source code, execute immediately. if has_source_code { js_check(worker.execute(&source_code)); - return Ok(JsonOp::Sync(exec_cb(worker))); + return Ok(JsonOp::Sync(response)); } - let op = worker + // TODO(bartlomieju): this should spawn mod execution on separate tokio task + // and block on receving message on a channel or even use sync channel /shrug + let (sender, receiver) = mpsc::sync_channel::<Result<(), ErrBox>>(1); + let fut = worker .execute_mod_async(&module_specifier, None, false) - .and_then(move |()| futures::future::ok(exec_cb(worker))); - - let result = tokio_util::block_on(op.boxed())?; - Ok(JsonOp::Sync(result)) + .then(move |result| { + sender.send(result).expect("Failed to send message"); + futures::future::ok(()) + }) + .boxed() + .compat(); + tokio::spawn(fut); + + let result = receiver.recv().expect("Failed to receive message"); + result?; + Ok(JsonOp::Sync(response)) } struct GetWorkerClosedFuture { @@ -271,9 +279,10 @@ fn op_host_post_message( let mut table = state.workers.lock().unwrap(); // TODO: don't return bad resource anymore let worker = table.get_mut(&id).ok_or_else(bad_resource)?; - worker + let fut = worker .post_message(msg) - .map_err(|e| DenoError::new(ErrorKind::Other, e.to_string()))?; + .map_err(|e| DenoError::new(ErrorKind::Other, e.to_string())); + futures::executor::block_on(fut)?; Ok(JsonOp::Sync(json!({}))) } |