diff options
Diffstat (limited to 'cli/ops/dispatch_json.rs')
-rw-r--r-- | cli/ops/dispatch_json.rs | 47 |
1 files changed, 18 insertions, 29 deletions
diff --git a/cli/ops/dispatch_json.rs b/cli/ops/dispatch_json.rs index 5f1caac6f..38dc7932e 100644 --- a/cli/ops/dispatch_json.rs +++ b/cli/ops/dispatch_json.rs @@ -1,13 +1,15 @@ // Copyright 2018-2019 the Deno authors. All rights reserved. MIT license. -use crate::tokio_util; use deno::*; -use futures::Future; -use futures::Poll; +use futures::future::FutureExt; +use futures::task::SpawnExt; pub use serde_derive::Deserialize; use serde_json::json; pub use serde_json::Value; +use std::future::Future; +use std::pin::Pin; -pub type AsyncJsonOp = Box<dyn Future<Item = Value, Error = ErrBox> + Send>; +pub type AsyncJsonOp = + Pin<Box<dyn Future<Output = Result<Value, ErrBox>> + Send>>; pub enum JsonOp { Sync(Value), @@ -70,48 +72,35 @@ where } Ok(JsonOp::Async(fut)) => { assert!(promise_id.is_some()); - let fut2 = Box::new(fut.then(move |result| -> Result<Buf, ()> { - Ok(serialize_result(promise_id, result)) - })); - CoreOp::Async(fut2) + let fut2 = fut.then(move |result| { + futures::future::ok(serialize_result(promise_id, result)) + }); + CoreOp::Async(fut2.boxed()) } Err(sync_err) => { let buf = serialize_result(promise_id, Err(sync_err)); if is_sync { CoreOp::Sync(buf) } else { - CoreOp::Async(Box::new(futures::future::ok(buf))) + CoreOp::Async(futures::future::ok(buf).boxed()) } } } } } -// This is just type conversion. Implement From trait? -// See https://github.com/tokio-rs/tokio/blob/ffd73a64e7ec497622b7f939e38017afe7124dc4/tokio-fs/src/lib.rs#L76-L85 -fn convert_blocking_json<F>(f: F) -> Poll<Value, ErrBox> -where - F: FnOnce() -> Result<Value, ErrBox>, -{ - use futures::Async::*; - match tokio_threadpool::blocking(f) { - Ok(Ready(Ok(v))) => Ok(Ready(v)), - Ok(Ready(Err(err))) => Err(err), - Ok(NotReady) => Ok(NotReady), - Err(err) => panic!("blocking error {}", err), - } -} - pub fn blocking_json<F>(is_sync: bool, f: F) -> Result<JsonOp, ErrBox> where - F: 'static + Send + FnOnce() -> Result<Value, ErrBox>, + F: 'static + Send + FnOnce() -> Result<Value, ErrBox> + Unpin, { if is_sync { Ok(JsonOp::Sync(f()?)) } else { - Ok(JsonOp::Async(Box::new(futures::sync::oneshot::spawn( - tokio_util::poll_fn(move || convert_blocking_json(f)), - &tokio_executor::DefaultExecutor::current(), - )))) + //TODO(afinch7) replace this with something more efficent. + let pool = futures::executor::ThreadPool::new().unwrap(); + let handle = pool + .spawn_with_handle(futures::future::lazy(move |_cx| f())) + .unwrap(); + Ok(JsonOp::Async(handle.boxed())) } } |