diff options
Diffstat (limited to 'cli/ops/worker_host.rs')
-rw-r--r-- | cli/ops/worker_host.rs | 28 |
1 files changed, 8 insertions, 20 deletions
diff --git a/cli/ops/worker_host.rs b/cli/ops/worker_host.rs index 6ac48228d..c64e86c1c 100644 --- a/cli/ops/worker_host.rs +++ b/cli/ops/worker_host.rs @@ -57,21 +57,6 @@ pub fn init(i: &mut Isolate, s: &ThreadSafeState) { i.register_op("metrics", s.core_op(json_op(s.stateful_op(op_metrics)))); } -struct GetMessageFuture { - state: ThreadSafeState, -} - -impl Future for GetMessageFuture { - type Output = Option<Buf>; - - fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> { - let inner = self.get_mut(); - let mut channels = inner.state.worker_channels.lock().unwrap(); - let receiver = &mut channels.receiver; - receiver.poll_next_unpin(cx) - } -} - #[derive(Deserialize)] #[serde(rename_all = "camelCase")] struct CreateWorkerArgs { @@ -250,9 +235,12 @@ fn op_host_close_worker( let mut workers_table = state_.workers.lock().unwrap(); let maybe_worker = workers_table.remove(&id); if let Some(worker) = maybe_worker { - let mut channels = worker.state.worker_channels.lock().unwrap(); - channels.sender.close_channel(); - channels.receiver.close(); + let channels = worker.state.worker_channels.clone(); + let mut sender = channels.sender.clone(); + sender.close_channel(); + + let mut receiver = futures::executor::block_on(channels.receiver.lock()); + receiver.close(); }; Ok(JsonOp::Sync(json!({}))) @@ -285,9 +273,9 @@ fn op_host_get_message( _data: Option<PinnedBuf>, ) -> Result<JsonOp, ErrBox> { let args: HostGetMessageArgs = serde_json::from_value(args)?; - + let state_ = state.clone(); let id = args.id as u32; - let mut table = state.workers.lock().unwrap(); + let mut table = state_.workers.lock().unwrap(); // TODO: don't return bad resource anymore let worker = table.get_mut(&id).ok_or_else(bad_resource)?; let fut = worker.get_message(); |