summaryrefslogtreecommitdiff
path: root/cli/ops/workers.rs
diff options
context:
space:
mode:
Diffstat (limited to 'cli/ops/workers.rs')
-rw-r--r--cli/ops/workers.rs87
1 files changed, 55 insertions, 32 deletions
diff --git a/cli/ops/workers.rs b/cli/ops/workers.rs
index cf7378a91..ee60c6824 100644
--- a/cli/ops/workers.rs
+++ b/cli/ops/workers.rs
@@ -7,16 +7,21 @@ use crate::deno_error::ErrorKind;
use crate::ops::json_op;
use crate::startup_data;
use crate::state::ThreadSafeState;
+use crate::tokio_util;
use crate::worker::Worker;
use deno::*;
use futures;
-use futures::Async;
-use futures::Future;
-use futures::Sink;
-use futures::Stream;
+use futures::future::FutureExt;
+use futures::future::TryFutureExt;
+use futures::sink::SinkExt;
+use futures::stream::StreamExt;
use std;
use std::convert::From;
+use std::future::Future;
+use std::pin::Pin;
use std::sync::atomic::Ordering;
+use std::task::Context;
+use std::task::Poll;
pub fn init(i: &mut Isolate, s: &ThreadSafeState) {
i.register_op(
@@ -52,13 +57,13 @@ struct GetMessageFuture {
}
impl Future for GetMessageFuture {
- type Item = Option<Buf>;
- type Error = ErrBox;
+ type Output = Option<Buf>;
- fn poll(&mut self) -> Result<Async<Self::Item>, Self::Error> {
- let mut channels = self.state.worker_channels.lock().unwrap();
+ fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
+ let inner = self.get_mut();
+ let mut channels = inner.state.worker_channels.lock().unwrap();
let receiver = &mut channels.receiver;
- receiver.poll().map_err(ErrBox::from)
+ receiver.poll_next_unpin(cx)
}
}
@@ -72,17 +77,15 @@ fn op_worker_get_message(
state: state.clone(),
};
- let op = op
- .map_err(move |_| -> ErrBox { unimplemented!() })
- .and_then(move |maybe_buf| {
- debug!("op_worker_get_message");
+ let op = op.then(move |maybe_buf| {
+ debug!("op_worker_get_message");
- futures::future::ok(json!({
- "data": maybe_buf.map(|buf| buf.to_owned())
- }))
- });
+ futures::future::ok(json!({
+ "data": maybe_buf.map(|buf| buf.to_owned())
+ }))
+ });
- Ok(JsonOp::Async(Box::new(op)))
+ Ok(JsonOp::Async(op.boxed()))
}
/// Post message to host as guest worker
@@ -94,9 +97,7 @@ fn op_worker_post_message(
let d = Vec::from(data.unwrap().as_ref()).into_boxed_slice();
let mut channels = state.worker_channels.lock().unwrap();
let sender = &mut channels.sender;
- sender
- .send(d)
- .wait()
+ futures::executor::block_on(sender.send(d))
.map_err(|e| DenoError::new(ErrorKind::Other, e.to_string()))?;
Ok(JsonOp::Sync(json!({})))
@@ -165,12 +166,35 @@ fn op_create_worker(
let op = worker
.execute_mod_async(&module_specifier, None, false)
- .and_then(move |()| Ok(exec_cb(worker)));
+ .and_then(move |()| futures::future::ok(exec_cb(worker)));
- let result = op.wait()?;
+ let result = tokio_util::block_on(op.boxed())?;
Ok(JsonOp::Sync(result))
}
+struct GetWorkerClosedFuture {
+ state: ThreadSafeState,
+ rid: ResourceId,
+}
+
+impl Future for GetWorkerClosedFuture {
+ type Output = Result<(), ErrBox>;
+
+ fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
+ let inner = self.get_mut();
+ let mut workers_table = inner.state.workers.lock().unwrap();
+ let maybe_worker = workers_table.get_mut(&inner.rid);
+ if maybe_worker.is_none() {
+ return Poll::Ready(Ok(()));
+ }
+ match maybe_worker.unwrap().poll_unpin(cx) {
+ Poll::Ready(Err(e)) => Poll::Ready(Err(e)),
+ Poll::Ready(Ok(_)) => Poll::Ready(Ok(())),
+ Poll::Pending => Poll::Pending,
+ }
+ }
+}
+
#[derive(Deserialize)]
struct HostGetWorkerClosedArgs {
id: i32,
@@ -185,18 +209,18 @@ fn op_host_get_worker_closed(
let args: HostGetWorkerClosedArgs = serde_json::from_value(args)?;
let id = args.id as u32;
let state_ = state.clone();
- let workers_table = state.workers.lock().unwrap();
- // TODO: handle bad worker id gracefully
- let worker = workers_table.get(&id).unwrap();
- let shared_worker_future = worker.clone().shared();
- let op = shared_worker_future.then(move |_result| {
+ let future = GetWorkerClosedFuture {
+ state: state.clone(),
+ rid: id,
+ };
+ let op = future.then(move |_result| {
let mut workers_table = state_.workers.lock().unwrap();
workers_table.remove(&id);
futures::future::ok(json!({}))
});
- Ok(JsonOp::Async(Box::new(op)))
+ Ok(JsonOp::Async(op.boxed()))
}
#[derive(Deserialize)]
@@ -225,7 +249,7 @@ fn op_host_get_message(
}))
});
- Ok(JsonOp::Async(Box::new(op)))
+ Ok(JsonOp::Async(op.boxed()))
}
#[derive(Deserialize)]
@@ -247,8 +271,7 @@ fn op_host_post_message(
let mut table = state.workers.lock().unwrap();
// TODO: don't return bad resource anymore
let worker = table.get_mut(&id).ok_or_else(bad_resource)?;
- worker
- .post_message(msg)
+ tokio_util::block_on(worker.post_message(msg).boxed())
.map_err(|e| DenoError::new(ErrorKind::Other, e.to_string()))?;
Ok(JsonOp::Sync(json!({})))
}