summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorBartek IwaƄczuk <biwanczuk@gmail.com>2019-11-09 21:07:14 +0100
committerBert Belder <bertbelder@gmail.com>2019-11-09 12:07:14 -0800
commit335e8bd33cd22891f12b58c4d9489d19c886ae4c (patch)
tree29d2c7098d922d325ea7ddf26a935eb498062f9d
parentd586f119fa588a590a4ba2b74c8c210de710e3e7 (diff)
refactor: worker is no longer a resource (#3290)
-rw-r--r--cli/compilers/ts.rs7
-rw-r--r--cli/js/lib.deno_runtime.d.ts2
-rw-r--r--cli/js/workers.ts22
-rw-r--r--cli/lib.rs10
-rw-r--r--cli/ops/workers.rs73
-rw-r--r--cli/state.rs70
-rw-r--r--cli/worker.rs115
7 files changed, 155 insertions, 144 deletions
diff --git a/cli/compilers/ts.rs b/cli/compilers/ts.rs
index 854963eaa..327b3fbeb 100644
--- a/cli/compilers/ts.rs
+++ b/cli/compilers/ts.rs
@@ -223,8 +223,10 @@ impl TsCompiler {
/// Create a new V8 worker with snapshot of TS compiler and setup compiler's runtime.
fn setup_worker(global_state: ThreadSafeGlobalState) -> Worker {
- let worker_state = ThreadSafeState::new(global_state.clone(), None, true)
- .expect("Unable to create worker state");
+ let (int, ext) = ThreadSafeState::create_channels();
+ let worker_state =
+ ThreadSafeState::new(global_state.clone(), None, true, int)
+ .expect("Unable to create worker state");
// Count how many times we start the compiler worker.
global_state
@@ -236,6 +238,7 @@ impl TsCompiler {
"TS".to_string(),
startup_data::compiler_isolate_init(),
worker_state,
+ ext,
);
worker.execute("denoMain()").unwrap();
worker.execute("workerMain()").unwrap();
diff --git a/cli/js/lib.deno_runtime.d.ts b/cli/js/lib.deno_runtime.d.ts
index 4331319bf..1f01f1384 100644
--- a/cli/js/lib.deno_runtime.d.ts
+++ b/cli/js/lib.deno_runtime.d.ts
@@ -2647,7 +2647,7 @@ declare namespace workers {
noDenoNamespace?: boolean;
}
export class WorkerImpl implements Worker {
- private readonly rid;
+ private readonly id;
private isClosing;
private readonly isClosedPromise;
onerror?: () => void;
diff --git a/cli/js/workers.ts b/cli/js/workers.ts
index 37061063f..028817573 100644
--- a/cli/js/workers.ts
+++ b/cli/js/workers.ts
@@ -35,17 +35,17 @@ function createWorker(
});
}
-async function hostGetWorkerClosed(rid: number): Promise<void> {
- await sendAsync(dispatch.OP_HOST_GET_WORKER_CLOSED, { rid });
+async function hostGetWorkerClosed(id: number): Promise<void> {
+ await sendAsync(dispatch.OP_HOST_GET_WORKER_CLOSED, { id });
}
-function hostPostMessage(rid: number, data: any): void {
+function hostPostMessage(id: number, data: any): void {
const dataIntArray = encodeMessage(data);
- sendSync(dispatch.OP_HOST_POST_MESSAGE, { rid }, dataIntArray);
+ sendSync(dispatch.OP_HOST_POST_MESSAGE, { id }, dataIntArray);
}
-async function hostGetMessage(rid: number): Promise<any> {
- const res = await sendAsync(dispatch.OP_HOST_GET_MESSAGE, { rid });
+async function hostGetMessage(id: number): Promise<any> {
+ const res = await sendAsync(dispatch.OP_HOST_GET_MESSAGE, { id });
if (res.data != null) {
return decodeMessage(new Uint8Array(res.data));
@@ -123,7 +123,7 @@ export interface DenoWorkerOptions extends WorkerOptions {
}
export class WorkerImpl implements Worker {
- private readonly rid: number;
+ private readonly id: number;
private isClosing = false;
private readonly isClosedPromise: Promise<void>;
public onerror?: () => void;
@@ -152,14 +152,14 @@ export class WorkerImpl implements Worker {
sourceCode = blobBytes!;
}
- this.rid = createWorker(
+ this.id = createWorker(
specifier,
includeDenoNamespace,
hasSourceCode,
sourceCode
);
this.run();
- this.isClosedPromise = hostGetWorkerClosed(this.rid);
+ this.isClosedPromise = hostGetWorkerClosed(this.id);
this.isClosedPromise.then(
(): void => {
this.isClosing = true;
@@ -172,12 +172,12 @@ export class WorkerImpl implements Worker {
}
postMessage(data: any): void {
- hostPostMessage(this.rid, data);
+ hostPostMessage(this.id, data);
}
private async run(): Promise<void> {
while (!this.isClosing) {
- const data = await hostGetMessage(this.rid);
+ const data = await hostGetMessage(this.id);
if (data == null) {
log("worker got null message. quitting.");
break;
diff --git a/cli/lib.rs b/cli/lib.rs
index a22f61799..637986f9f 100644
--- a/cli/lib.rs
+++ b/cli/lib.rs
@@ -118,16 +118,22 @@ fn create_worker_and_state(
.map_err(deno_error::print_err_and_exit)
.unwrap();
+ let (int, ext) = ThreadSafeState::create_channels();
let state = ThreadSafeState::new(
global_state.clone(),
global_state.main_module.clone(),
true,
+ int,
)
.map_err(deno_error::print_err_and_exit)
.unwrap();
- let worker =
- Worker::new("main".to_string(), startup_data::deno_isolate_init(), state);
+ let worker = Worker::new(
+ "main".to_string(),
+ startup_data::deno_isolate_init(),
+ state,
+ ext,
+ );
(worker, global_state)
}
diff --git a/cli/ops/workers.rs b/cli/ops/workers.rs
index f6dcf8042..cf7378a91 100644
--- a/cli/ops/workers.rs
+++ b/cli/ops/workers.rs
@@ -1,5 +1,6 @@
// Copyright 2018-2019 the Deno authors. All rights reserved. MIT license.
use super::dispatch_json::{Deserialize, JsonOp, Value};
+use crate::deno_error::bad_resource;
use crate::deno_error::js_check;
use crate::deno_error::DenoError;
use crate::deno_error::ErrorKind;
@@ -11,7 +12,6 @@ use deno::*;
use futures;
use futures::Async;
use futures::Future;
-use futures::IntoFuture;
use futures::Sink;
use futures::Stream;
use std;
@@ -138,23 +138,23 @@ fn op_create_worker(
}
}
+ let (int, ext) = ThreadSafeState::create_channels();
let child_state = ThreadSafeState::new(
state.global_state.clone(),
Some(module_specifier.clone()),
include_deno_namespace,
+ int,
)?;
- let rid = child_state.rid;
let name = format!("USER-WORKER-{}", specifier);
let deno_main_call = format!("denoMain({})", include_deno_namespace);
let mut worker =
- Worker::new(name, startup_data::deno_isolate_init(), child_state);
+ Worker::new(name, startup_data::deno_isolate_init(), child_state, ext);
js_check(worker.execute(&deno_main_call));
js_check(worker.execute("workerMain()"));
let exec_cb = move |worker: Worker| {
- let mut workers_tl = parent_state.workers.lock().unwrap();
- workers_tl.insert(rid, worker.shared());
- json!(rid)
+ let worker_id = parent_state.add_child_worker(worker);
+ json!(worker_id)
};
// Has provided source code, execute immediately.
@@ -173,7 +173,7 @@ fn op_create_worker(
#[derive(Deserialize)]
struct HostGetWorkerClosedArgs {
- rid: i32,
+ id: i32,
}
/// Return when the worker closes
@@ -183,37 +183,41 @@ fn op_host_get_worker_closed(
_data: Option<PinnedBuf>,
) -> Result<JsonOp, ErrBox> {
let args: HostGetWorkerClosedArgs = serde_json::from_value(args)?;
-
- let rid = args.rid as u32;
- let state = state.clone();
-
- let shared_worker_future = {
- let workers_tl = state.workers.lock().unwrap();
- let worker = workers_tl.get(&rid).unwrap();
- worker.clone()
- };
-
- let op =
- shared_worker_future.then(move |_result| futures::future::ok(json!({})));
+ let id = args.id as u32;
+ let state_ = state.clone();
+ let workers_table = state.workers.lock().unwrap();
+ // TODO: handle bad worker id gracefully
+ let worker = workers_table.get(&id).unwrap();
+ let shared_worker_future = worker.clone().shared();
+
+ let op = shared_worker_future.then(move |_result| {
+ let mut workers_table = state_.workers.lock().unwrap();
+ workers_table.remove(&id);
+ futures::future::ok(json!({}))
+ });
Ok(JsonOp::Async(Box::new(op)))
}
#[derive(Deserialize)]
struct HostGetMessageArgs {
- rid: i32,
+ id: i32,
}
/// Get message from guest worker as host
fn op_host_get_message(
- _state: &ThreadSafeState,
+ state: &ThreadSafeState,
args: Value,
_data: Option<PinnedBuf>,
) -> Result<JsonOp, ErrBox> {
let args: HostGetMessageArgs = serde_json::from_value(args)?;
- let rid = args.rid as u32;
- let op = Worker::get_message_from_resource(rid)
+ let id = args.id as u32;
+ let mut table = state.workers.lock().unwrap();
+ // TODO: don't return bad resource anymore
+ let worker = table.get_mut(&id).ok_or_else(bad_resource)?;
+ let op = worker
+ .get_message()
.map_err(move |_| -> ErrBox { unimplemented!() })
.and_then(move |maybe_buf| {
futures::future::ok(json!({
@@ -226,27 +230,26 @@ fn op_host_get_message(
#[derive(Deserialize)]
struct HostPostMessageArgs {
- rid: i32,
+ id: i32,
}
/// Post message to guest worker as host
fn op_host_post_message(
- _state: &ThreadSafeState,
+ state: &ThreadSafeState,
args: Value,
data: Option<PinnedBuf>,
) -> Result<JsonOp, ErrBox> {
let args: HostPostMessageArgs = serde_json::from_value(args)?;
-
- let rid = args.rid as u32;
-
- let d = Vec::from(data.unwrap().as_ref()).into_boxed_slice();
-
- // TODO: rename to post_message_to_child(rid, d)
- Worker::post_message_to_resource(rid, d)
- .into_future()
- .wait()
+ let id = args.id as u32;
+ let msg = Vec::from(data.unwrap().as_ref()).into_boxed_slice();
+
+ debug!("post message to worker {}", id);
+ let mut table = state.workers.lock().unwrap();
+ // TODO: don't return bad resource anymore
+ let worker = table.get_mut(&id).ok_or_else(bad_resource)?;
+ worker
+ .post_message(msg)
.map_err(|e| DenoError::new(ErrorKind::Other, e.to_string()))?;
-
Ok(JsonOp::Sync(json!({})))
}
diff --git a/cli/state.rs b/cli/state.rs
index 8d14042aa..edfac72c0 100644
--- a/cli/state.rs
+++ b/cli/state.rs
@@ -6,8 +6,6 @@ use crate::import_map::ImportMap;
use crate::metrics::Metrics;
use crate::ops::JsonOp;
use crate::permissions::DenoPermissions;
-use crate::resources;
-use crate::resources::ResourceId;
use crate::worker::Worker;
use crate::worker::WorkerChannels;
use deno::Buf;
@@ -17,7 +15,6 @@ use deno::Loader;
use deno::ModuleSpecifier;
use deno::Op;
use deno::PinnedBuf;
-use futures::future::Shared;
use futures::Future;
use rand::rngs::StdRng;
use rand::SeedableRng;
@@ -26,15 +23,12 @@ use std;
use std::collections::HashMap;
use std::ops::Deref;
use std::str;
+use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::sync::Mutex;
use std::time::Instant;
-use tokio::sync::mpsc as async_mpsc;
-
-// TODO: hold references to concrete Workers instead of shared futures of
-// those workers?
-pub type UserWorkerTable = HashMap<ResourceId, Shared<Worker>>;
+use tokio::sync::mpsc;
/// Isolate cannot be passed between threads but ThreadSafeState can.
/// ThreadSafeState satisfies Send and Sync. So any state that needs to be
@@ -53,10 +47,9 @@ pub struct State {
pub import_map: Option<ImportMap>,
pub metrics: Metrics,
pub global_timer: Mutex<GlobalTimer>,
- pub workers: Mutex<UserWorkerTable>,
+ pub workers: Mutex<HashMap<u32, Worker>>,
+ pub next_worker_id: AtomicUsize,
pub start_time: Instant,
- /// A reference to this worker's resource.
- pub rid: ResourceId,
pub seeded_rng: Option<Mutex<StdRng>>,
pub include_deno_namespace: bool,
}
@@ -179,25 +172,26 @@ impl Loader for ThreadSafeState {
}
impl ThreadSafeState {
- pub fn new(
- global_state: ThreadSafeGlobalState,
- main_module: Option<ModuleSpecifier>,
- include_deno_namespace: bool,
- ) -> Result<Self, ErrBox> {
- let (worker_in_tx, worker_in_rx) = async_mpsc::channel::<Buf>(1);
- let (worker_out_tx, worker_out_rx) = async_mpsc::channel::<Buf>(1);
+ pub fn create_channels() -> (WorkerChannels, WorkerChannels) {
+ let (in_tx, in_rx) = mpsc::channel::<Buf>(1);
+ let (out_tx, out_rx) = mpsc::channel::<Buf>(1);
let internal_channels = WorkerChannels {
- sender: worker_out_tx,
- receiver: worker_in_rx,
+ sender: out_tx,
+ receiver: in_rx,
};
let external_channels = WorkerChannels {
- sender: worker_in_tx,
- receiver: worker_out_rx,
+ sender: in_tx,
+ receiver: out_rx,
};
+ (internal_channels, external_channels)
+ }
- let mut table = resources::lock_resource_table();
- let rid = table.add("worker", Box::new(external_channels));
-
+ pub fn new(
+ global_state: ThreadSafeGlobalState,
+ main_module: Option<ModuleSpecifier>,
+ include_deno_namespace: bool,
+ internal_channels: WorkerChannels,
+ ) -> Result<Self, ErrBox> {
let import_map: Option<ImportMap> =
match global_state.flags.import_map_path.as_ref() {
None => None,
@@ -221,9 +215,9 @@ impl ThreadSafeState {
worker_channels: Mutex::new(internal_channels),
metrics: Metrics::default(),
global_timer: Mutex::new(GlobalTimer::new()),
- workers: Mutex::new(UserWorkerTable::new()),
+ workers: Mutex::new(HashMap::new()),
+ next_worker_id: AtomicUsize::new(0),
start_time: Instant::now(),
- rid,
seeded_rng,
include_deno_namespace,
};
@@ -231,6 +225,13 @@ impl ThreadSafeState {
Ok(ThreadSafeState(Arc::new(state)))
}
+ pub fn add_child_worker(&self, worker: Worker) -> u32 {
+ let worker_id = self.next_worker_id.fetch_add(1, Ordering::Relaxed) as u32;
+ let mut workers_tl = self.workers.lock().unwrap();
+ workers_tl.insert(worker_id, worker);
+ worker_id
+ }
+
#[inline]
pub fn check_read(&self, filename: &str) -> Result<(), ErrBox> {
self.permissions.check_read(filename)
@@ -286,7 +287,10 @@ impl ThreadSafeState {
}
#[cfg(test)]
- pub fn mock(argv: Vec<String>) -> ThreadSafeState {
+ pub fn mock(
+ argv: Vec<String>,
+ internal_channels: WorkerChannels,
+ ) -> ThreadSafeState {
let module_specifier = if argv.is_empty() {
None
} else {
@@ -299,6 +303,7 @@ impl ThreadSafeState {
ThreadSafeGlobalState::mock(argv),
module_specifier,
true,
+ internal_channels,
)
.unwrap()
}
@@ -331,8 +336,9 @@ impl ThreadSafeState {
#[test]
fn thread_safe() {
fn f<S: Send + Sync>(_: S) {}
- f(ThreadSafeState::mock(vec![
- String::from("./deno"),
- String::from("hello.js"),
- ]));
+ let (int, _) = ThreadSafeState::create_channels();
+ f(ThreadSafeState::mock(
+ vec![String::from("./deno"), String::from("hello.js")],
+ int,
+ ));
}
diff --git a/cli/worker.rs b/cli/worker.rs
index eeda364c9..5d4675d00 100644
--- a/cli/worker.rs
+++ b/cli/worker.rs
@@ -1,10 +1,6 @@
// Copyright 2018-2019 the Deno authors. All rights reserved. MIT license.
-use crate::deno_error::bad_resource;
use crate::fmt_errors::JSError;
use crate::ops;
-use crate::resources;
-use crate::resources::Resource;
-use crate::resources::ResourceId;
use crate::state::ThreadSafeState;
use deno;
use deno::Buf;
@@ -23,7 +19,7 @@ use std::sync::Mutex;
use tokio::sync::mpsc;
use url::Url;
-/// Wraps mpsc channels into a generic resource so they can be referenced
+/// Wraps mpsc channels so they can be referenced
/// from ops and used to facilitate parent-child communication
/// for workers.
pub struct WorkerChannels {
@@ -31,8 +27,6 @@ pub struct WorkerChannels {
pub receiver: mpsc::Receiver<Buf>,
}
-impl Resource for WorkerChannels {}
-
/// Wraps deno::Isolate to provide source maps, ops for the CLI, and
/// high-level module loading.
#[derive(Clone)]
@@ -40,6 +34,7 @@ pub struct Worker {
pub name: String,
isolate: Arc<Mutex<deno::Isolate>>,
pub state: ThreadSafeState,
+ external_channels: Arc<Mutex<WorkerChannels>>,
}
impl Worker {
@@ -47,6 +42,7 @@ impl Worker {
name: String,
startup_data: StartupData,
state: ThreadSafeState,
+ external_channels: WorkerChannels,
) -> Self {
let isolate = Arc::new(Mutex::new(deno::Isolate::new(startup_data, false)));
{
@@ -86,10 +82,12 @@ impl Worker {
JSError::from_v8_exception(v8_exception, &global_state_.ts_compiler)
})
}
+
Self {
name,
isolate,
state,
+ external_channels: Arc::new(Mutex::new(external_channels)),
}
}
@@ -140,35 +138,20 @@ impl Worker {
})
}
- /// Post message to worker as a host or privileged overlord
- pub fn post_message(self: &Self, buf: Buf) -> Result<Async<()>, ErrBox> {
- Worker::post_message_to_resource(self.state.rid, buf)
- }
-
- pub fn post_message_to_resource(
- rid: resources::ResourceId,
- buf: Buf,
- ) -> Result<Async<()>, ErrBox> {
- debug!("post message to resource {}", rid);
- let mut table = resources::lock_resource_table();
- let worker = table
- .get_mut::<WorkerChannels>(rid)
- .ok_or_else(bad_resource)?;
- let sender = &mut worker.sender;
- sender
- .send(buf)
- .poll()
- .map(|_| Async::Ready(()))
- .map_err(ErrBox::from)
+ /// Post message to worker as a host.
+ ///
+ /// This method blocks current thread.
+ pub fn post_message(self: &Self, buf: Buf) -> Result<(), ErrBox> {
+ let mut channels = self.external_channels.lock().unwrap();
+ let sender = &mut channels.sender;
+ sender.send(buf).wait().map(|_| ()).map_err(ErrBox::from)
}
+ /// Get message from worker as a host.
pub fn get_message(self: &Self) -> WorkerReceiver {
- Worker::get_message_from_resource(self.state.rid)
- }
-
- pub fn get_message_from_resource(rid: ResourceId) -> WorkerReceiver {
- debug!("get message from resource {}", rid);
- WorkerReceiver { rid }
+ WorkerReceiver {
+ channels: self.external_channels.clone(),
+ }
}
}
@@ -186,7 +169,7 @@ impl Future for Worker {
/// that will return message received from worker or None
/// if worker's channel has been closed.
pub struct WorkerReceiver {
- rid: ResourceId,
+ channels: Arc<Mutex<WorkerChannels>>,
}
impl Future for WorkerReceiver {
@@ -194,12 +177,8 @@ impl Future for WorkerReceiver {
type Error = ErrBox;
fn poll(&mut self) -> Poll<Option<Buf>, ErrBox> {
- let mut table = resources::lock_resource_table();
- let worker = table
- .get_mut::<WorkerChannels>(self.rid)
- .ok_or_else(bad_resource)?;
- let receiver = &mut worker.receiver;
- receiver.poll().map_err(ErrBox::from)
+ let mut channels = self.channels.lock().unwrap();
+ channels.receiver.poll().map_err(ErrBox::from)
}
}
@@ -214,7 +193,6 @@ mod tests {
use crate::state::ThreadSafeState;
use crate::tokio_util;
use futures::future::lazy;
- use futures::IntoFuture;
use std::sync::atomic::Ordering;
#[test]
@@ -233,13 +211,18 @@ mod tests {
Progress::new(),
)
.unwrap();
- let state =
- ThreadSafeState::new(global_state, Some(module_specifier.clone()), true)
- .unwrap();
+ let (int, ext) = ThreadSafeState::create_channels();
+ let state = ThreadSafeState::new(
+ global_state,
+ Some(module_specifier.clone()),
+ true,
+ int,
+ )
+ .unwrap();
let state_ = state.clone();
tokio_util::run(lazy(move || {
let mut worker =
- Worker::new("TEST".to_string(), StartupData::None, state);
+ Worker::new("TEST".to_string(), StartupData::None, state, ext);
worker
.execute_mod_async(&module_specifier, None, false)
.then(|result| {
@@ -269,13 +252,18 @@ mod tests {
let global_state =
ThreadSafeGlobalState::new(DenoFlags::default(), argv, Progress::new())
.unwrap();
- let state =
- ThreadSafeState::new(global_state, Some(module_specifier.clone()), true)
- .unwrap();
+ let (int, ext) = ThreadSafeState::create_channels();
+ let state = ThreadSafeState::new(
+ global_state,
+ Some(module_specifier.clone()),
+ true,
+ int,
+ )
+ .unwrap();
let state_ = state.clone();
tokio_util::run(lazy(move || {
let mut worker =
- Worker::new("TEST".to_string(), StartupData::None, state);
+ Worker::new("TEST".to_string(), StartupData::None, state, ext);
worker
.execute_mod_async(&module_specifier, None, false)
.then(|result| {
@@ -308,10 +296,12 @@ mod tests {
flags.reload = true;
let global_state =
ThreadSafeGlobalState::new(flags, argv, Progress::new()).unwrap();
+ let (int, ext) = ThreadSafeState::create_channels();
let state = ThreadSafeState::new(
global_state.clone(),
Some(module_specifier.clone()),
true,
+ int,
)
.unwrap();
let global_state_ = global_state.clone();
@@ -321,6 +311,7 @@ mod tests {
"TEST".to_string(),
startup_data::deno_isolate_init(),
state,
+ ext,
);
worker.execute("denoMain()").unwrap();
worker
@@ -343,12 +334,17 @@ mod tests {
}
fn create_test_worker() -> Worker {
- let state = ThreadSafeState::mock(vec![
- String::from("./deno"),
- String::from("hello.js"),
- ]);
- let mut worker =
- Worker::new("TEST".to_string(), startup_data::deno_isolate_init(), state);
+ let (int, ext) = ThreadSafeState::create_channels();
+ let state = ThreadSafeState::mock(
+ vec![String::from("./deno"), String::from("hello.js")],
+ int,
+ );
+ let mut worker = Worker::new(
+ "TEST".to_string(),
+ startup_data::deno_isolate_init(),
+ state,
+ ext,
+ );
worker.execute("denoMain()").unwrap();
worker.execute("workerMain()").unwrap();
worker
@@ -384,7 +380,7 @@ mod tests {
let msg = json!("hi").to_string().into_boxed_str().into_boxed_bytes();
- let r = worker_.post_message(msg).into_future().wait();
+ let r = worker_.post_message(msg);
assert!(r.is_ok());
let maybe_msg = worker_.get_message().wait().unwrap();
@@ -396,7 +392,7 @@ mod tests {
.to_string()
.into_boxed_str()
.into_boxed_bytes();
- let r = worker_.post_message(msg).into_future().wait();
+ let r = worker_.post_message(msg);
assert!(r.is_ok());
})
}
@@ -409,9 +405,7 @@ mod tests {
.execute("onmessage = () => { delete window.onmessage; }")
.unwrap();
- let rid = worker.state.rid;
let worker_ = worker.clone();
-
let worker_future = worker
.then(move |r| -> Result<(), ()> {
println!("workers.rs after resource close");
@@ -424,9 +418,8 @@ mod tests {
tokio::spawn(lazy(move || worker_future_.then(|_| Ok(()))));
let msg = json!("hi").to_string().into_boxed_str().into_boxed_bytes();
- let r = worker_.post_message(msg).into_future().wait();
+ let r = worker_.post_message(msg);
assert!(r.is_ok());
- debug!("rid {:?}", rid);
worker_future.wait().unwrap();
})