diff options
Diffstat (limited to 'cli/ops/workers.rs')
-rw-r--r-- | cli/ops/workers.rs | 87 |
1 files changed, 55 insertions, 32 deletions
diff --git a/cli/ops/workers.rs b/cli/ops/workers.rs index cf7378a91..ee60c6824 100644 --- a/cli/ops/workers.rs +++ b/cli/ops/workers.rs @@ -7,16 +7,21 @@ use crate::deno_error::ErrorKind; use crate::ops::json_op; use crate::startup_data; use crate::state::ThreadSafeState; +use crate::tokio_util; use crate::worker::Worker; use deno::*; use futures; -use futures::Async; -use futures::Future; -use futures::Sink; -use futures::Stream; +use futures::future::FutureExt; +use futures::future::TryFutureExt; +use futures::sink::SinkExt; +use futures::stream::StreamExt; use std; use std::convert::From; +use std::future::Future; +use std::pin::Pin; use std::sync::atomic::Ordering; +use std::task::Context; +use std::task::Poll; pub fn init(i: &mut Isolate, s: &ThreadSafeState) { i.register_op( @@ -52,13 +57,13 @@ struct GetMessageFuture { } impl Future for GetMessageFuture { - type Item = Option<Buf>; - type Error = ErrBox; + type Output = Option<Buf>; - fn poll(&mut self) -> Result<Async<Self::Item>, Self::Error> { - let mut channels = self.state.worker_channels.lock().unwrap(); + fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> { + let inner = self.get_mut(); + let mut channels = inner.state.worker_channels.lock().unwrap(); let receiver = &mut channels.receiver; - receiver.poll().map_err(ErrBox::from) + receiver.poll_next_unpin(cx) } } @@ -72,17 +77,15 @@ fn op_worker_get_message( state: state.clone(), }; - let op = op - .map_err(move |_| -> ErrBox { unimplemented!() }) - .and_then(move |maybe_buf| { - debug!("op_worker_get_message"); + let op = op.then(move |maybe_buf| { + debug!("op_worker_get_message"); - futures::future::ok(json!({ - "data": maybe_buf.map(|buf| buf.to_owned()) - })) - }); + futures::future::ok(json!({ + "data": maybe_buf.map(|buf| buf.to_owned()) + })) + }); - Ok(JsonOp::Async(Box::new(op))) + Ok(JsonOp::Async(op.boxed())) } /// Post message to host as guest worker @@ -94,9 +97,7 @@ fn op_worker_post_message( let d = Vec::from(data.unwrap().as_ref()).into_boxed_slice(); let mut channels = state.worker_channels.lock().unwrap(); let sender = &mut channels.sender; - sender - .send(d) - .wait() + futures::executor::block_on(sender.send(d)) .map_err(|e| DenoError::new(ErrorKind::Other, e.to_string()))?; Ok(JsonOp::Sync(json!({}))) @@ -165,12 +166,35 @@ fn op_create_worker( let op = worker .execute_mod_async(&module_specifier, None, false) - .and_then(move |()| Ok(exec_cb(worker))); + .and_then(move |()| futures::future::ok(exec_cb(worker))); - let result = op.wait()?; + let result = tokio_util::block_on(op.boxed())?; Ok(JsonOp::Sync(result)) } +struct GetWorkerClosedFuture { + state: ThreadSafeState, + rid: ResourceId, +} + +impl Future for GetWorkerClosedFuture { + type Output = Result<(), ErrBox>; + + fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> { + let inner = self.get_mut(); + let mut workers_table = inner.state.workers.lock().unwrap(); + let maybe_worker = workers_table.get_mut(&inner.rid); + if maybe_worker.is_none() { + return Poll::Ready(Ok(())); + } + match maybe_worker.unwrap().poll_unpin(cx) { + Poll::Ready(Err(e)) => Poll::Ready(Err(e)), + Poll::Ready(Ok(_)) => Poll::Ready(Ok(())), + Poll::Pending => Poll::Pending, + } + } +} + #[derive(Deserialize)] struct HostGetWorkerClosedArgs { id: i32, @@ -185,18 +209,18 @@ fn op_host_get_worker_closed( let args: HostGetWorkerClosedArgs = serde_json::from_value(args)?; let id = args.id as u32; let state_ = state.clone(); - let workers_table = state.workers.lock().unwrap(); - // TODO: handle bad worker id gracefully - let worker = workers_table.get(&id).unwrap(); - let shared_worker_future = worker.clone().shared(); - let op = shared_worker_future.then(move |_result| { + let future = GetWorkerClosedFuture { + state: state.clone(), + rid: id, + }; + let op = future.then(move |_result| { let mut workers_table = state_.workers.lock().unwrap(); workers_table.remove(&id); futures::future::ok(json!({})) }); - Ok(JsonOp::Async(Box::new(op))) + Ok(JsonOp::Async(op.boxed())) } #[derive(Deserialize)] @@ -225,7 +249,7 @@ fn op_host_get_message( })) }); - Ok(JsonOp::Async(Box::new(op))) + Ok(JsonOp::Async(op.boxed())) } #[derive(Deserialize)] @@ -247,8 +271,7 @@ fn op_host_post_message( let mut table = state.workers.lock().unwrap(); // TODO: don't return bad resource anymore let worker = table.get_mut(&id).ok_or_else(bad_resource)?; - worker - .post_message(msg) + tokio_util::block_on(worker.post_message(msg).boxed()) .map_err(|e| DenoError::new(ErrorKind::Other, e.to_string()))?; Ok(JsonOp::Sync(json!({}))) } |