summaryrefslogtreecommitdiff
path: root/cli/ops/workers.rs
diff options
context:
space:
mode:
authorBartek IwaƄczuk <biwanczuk@gmail.com>2019-11-20 01:17:05 +0100
committerRy Dahl <ry@tinyclouds.org>2019-11-19 19:17:05 -0500
commit1912ed674097588adb7b83e7b78043b2168821f3 (patch)
treef906b35ca15f031e332b6f496e50db6b59491e8f /cli/ops/workers.rs
parent6708fcc38688b80d4e052f755f02efb09a2071d1 (diff)
remove tokio_util::block_on from ops/workers.rs (#3381)
Diffstat (limited to 'cli/ops/workers.rs')
-rw-r--r--cli/ops/workers.rs35
1 files changed, 22 insertions, 13 deletions
diff --git a/cli/ops/workers.rs b/cli/ops/workers.rs
index 42f93ec57..a03290545 100644
--- a/cli/ops/workers.rs
+++ b/cli/ops/workers.rs
@@ -7,7 +7,6 @@ use crate::deno_error::ErrorKind;
use crate::ops::json_op;
use crate::startup_data;
use crate::state::ThreadSafeState;
-use crate::tokio_util;
use crate::worker::Worker;
use deno::*;
use futures;
@@ -20,6 +19,7 @@ use std::convert::From;
use std::future::Future;
use std::pin::Pin;
use std::sync::atomic::Ordering;
+use std::sync::mpsc;
use std::task::Context;
use std::task::Poll;
@@ -153,23 +153,31 @@ fn op_create_worker(
js_check(worker.execute(&deno_main_call));
js_check(worker.execute("workerMain()"));
- let exec_cb = move |worker: Worker| {
- let worker_id = parent_state.add_child_worker(worker);
- json!(worker_id)
- };
+ let worker_id = parent_state.add_child_worker(worker.clone());
+ let response = json!(worker_id);
// Has provided source code, execute immediately.
if has_source_code {
js_check(worker.execute(&source_code));
- return Ok(JsonOp::Sync(exec_cb(worker)));
+ return Ok(JsonOp::Sync(response));
}
- let op = worker
+ // TODO(bartlomieju): this should spawn mod execution on separate tokio task
+ // and block on receving message on a channel or even use sync channel /shrug
+ let (sender, receiver) = mpsc::sync_channel::<Result<(), ErrBox>>(1);
+ let fut = worker
.execute_mod_async(&module_specifier, None, false)
- .and_then(move |()| futures::future::ok(exec_cb(worker)));
-
- let result = tokio_util::block_on(op.boxed())?;
- Ok(JsonOp::Sync(result))
+ .then(move |result| {
+ sender.send(result).expect("Failed to send message");
+ futures::future::ok(())
+ })
+ .boxed()
+ .compat();
+ tokio::spawn(fut);
+
+ let result = receiver.recv().expect("Failed to receive message");
+ result?;
+ Ok(JsonOp::Sync(response))
}
struct GetWorkerClosedFuture {
@@ -271,9 +279,10 @@ fn op_host_post_message(
let mut table = state.workers.lock().unwrap();
// TODO: don't return bad resource anymore
let worker = table.get_mut(&id).ok_or_else(bad_resource)?;
- worker
+ let fut = worker
.post_message(msg)
- .map_err(|e| DenoError::new(ErrorKind::Other, e.to_string()))?;
+ .map_err(|e| DenoError::new(ErrorKind::Other, e.to_string()));
+ futures::executor::block_on(fut)?;
Ok(JsonOp::Sync(json!({})))
}