summaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
Diffstat (limited to 'core')
-rw-r--r--core/Cargo.toml2
-rw-r--r--core/examples/http_bench.rs140
-rw-r--r--core/isolate.rs314
-rw-r--r--core/modules.rs198
-rw-r--r--core/ops.rs6
5 files changed, 385 insertions, 275 deletions
diff --git a/core/Cargo.toml b/core/Cargo.toml
index b1f08d234..ee15308ba 100644
--- a/core/Cargo.toml
+++ b/core/Cargo.toml
@@ -15,7 +15,7 @@ path = "lib.rs"
[dependencies]
downcast-rs = "1.1.1"
-futures = "0.1.29"
+futures = { version = "0.3", features = [ "thread-pool", "compat" ] }
lazy_static = "1.4.0"
libc = "0.2.65"
log = "0.4.8"
diff --git a/core/examples/http_bench.rs b/core/examples/http_bench.rs
index 8635d4f23..6a9213cbe 100644
--- a/core/examples/http_bench.rs
+++ b/core/examples/http_bench.rs
@@ -13,14 +13,20 @@ extern crate log;
extern crate lazy_static;
use deno::*;
-use futures::future::lazy;
+use futures::future::FutureExt;
+use futures::future::TryFutureExt;
use std::env;
+use std::future::Future;
use std::io::Error;
use std::io::ErrorKind;
use std::net::SocketAddr;
+use std::pin::Pin;
use std::sync::Mutex;
use std::sync::MutexGuard;
-use tokio::prelude::*;
+use std::task::Poll;
+use tokio::prelude::Async;
+use tokio::prelude::AsyncRead;
+use tokio::prelude::AsyncWrite;
static LOGGER: Logger = Logger;
struct Logger;
@@ -98,10 +104,10 @@ fn test_record_from() {
// TODO test From<&[u8]> for Record
}
-pub type HttpOp = dyn Future<Item = i32, Error = std::io::Error> + Send;
+pub type HttpOp = dyn Future<Output = Result<i32, std::io::Error>> + Send;
pub type HttpOpHandler =
- fn(record: Record, zero_copy_buf: Option<PinnedBuf>) -> Box<HttpOp>;
+ fn(record: Record, zero_copy_buf: Option<PinnedBuf>) -> Pin<Box<HttpOp>>;
fn http_op(
handler: HttpOpHandler,
@@ -117,53 +123,28 @@ fn http_op(
let fut = Box::new(
op.and_then(move |result| {
record_a.result = result;
- Ok(record_a)
+ futures::future::ok(record_a)
})
- .or_else(|err| -> Result<Record, ()> {
+ .or_else(|err| {
eprintln!("unexpected err {}", err);
record_b.result = -1;
- Ok(record_b)
+ futures::future::ok(record_b)
})
- .then(|result| -> Result<Buf, ()> {
+ .then(|result: Result<Record, ()>| {
let record = result.unwrap();
- Ok(record.into())
+ futures::future::ok(record.into())
}),
);
if is_sync {
- Op::Sync(fut.wait().unwrap())
+ Op::Sync(futures::executor::block_on(fut).unwrap())
} else {
- Op::Async(fut)
+ Op::Async(fut.boxed())
}
}
}
fn main() {
- let main_future = lazy(move || {
- // TODO currently isolate.execute() must be run inside tokio, hence the
- // lazy(). It would be nice to not have that contraint. Probably requires
- // using v8::MicrotasksPolicy::kExplicit
-
- let js_source = include_str!("http_bench.js");
-
- let startup_data = StartupData::Script(Script {
- source: js_source,
- filename: "http_bench.js",
- });
-
- let mut isolate = deno::Isolate::new(startup_data, false);
- isolate.register_op("listen", http_op(op_listen));
- isolate.register_op("accept", http_op(op_accept));
- isolate.register_op("read", http_op(op_read));
- isolate.register_op("write", http_op(op_write));
- isolate.register_op("close", http_op(op_close));
-
- isolate.then(|r| {
- js_check(r);
- Ok(())
- })
- });
-
let args: Vec<String> = env::args().collect();
// NOTE: `--help` arg will display V8 help and exit
let args = deno::v8_set_flags(args);
@@ -175,12 +156,33 @@ fn main() {
log::LevelFilter::Warn
});
+ let js_source = include_str!("http_bench.js");
+
+ let startup_data = StartupData::Script(Script {
+ source: js_source,
+ filename: "http_bench.js",
+ });
+
+ let mut isolate = deno::Isolate::new(startup_data, false);
+ isolate.register_op("listen", http_op(op_listen));
+ isolate.register_op("accept", http_op(op_accept));
+ isolate.register_op("read", http_op(op_read));
+ isolate.register_op("write", http_op(op_write));
+ isolate.register_op("close", http_op(op_close));
+
+ let main_future = isolate
+ .then(|r| {
+ js_check(r);
+ futures::future::ok(())
+ })
+ .boxed();
+
if args.iter().any(|a| a == "--multi-thread") {
println!("multi-thread");
- tokio::run(main_future);
+ tokio::run(main_future.compat());
} else {
println!("single-thread");
- tokio::runtime::current_thread::run(main_future);
+ tokio::runtime::current_thread::run(main_future.compat());
}
}
@@ -205,37 +207,47 @@ fn lock_resource_table<'a>() -> MutexGuard<'a, ResourceTable> {
RESOURCE_TABLE.lock().unwrap()
}
-fn op_accept(record: Record, _zero_copy_buf: Option<PinnedBuf>) -> Box<HttpOp> {
+fn op_accept(
+ record: Record,
+ _zero_copy_buf: Option<PinnedBuf>,
+) -> Pin<Box<HttpOp>> {
let rid = record.arg as u32;
debug!("accept {}", rid);
- let fut = futures::future::poll_fn(move || {
+ let fut = futures::future::poll_fn(move |_cx| {
let mut table = lock_resource_table();
let listener =
table.get_mut::<TcpListener>(rid).ok_or_else(bad_resource)?;
- listener.0.poll_accept()
+ match listener.0.poll_accept() {
+ Err(e) => Poll::Ready(Err(e)),
+ Ok(Async::Ready(v)) => Poll::Ready(Ok(v)),
+ Ok(Async::NotReady) => Poll::Pending,
+ }
})
.and_then(move |(stream, addr)| {
debug!("accept success {}", addr);
let mut table = lock_resource_table();
let rid = table.add("tcpStream", Box::new(TcpStream(stream)));
- Ok(rid as i32)
+ futures::future::ok(rid as i32)
});
- Box::new(fut)
+ fut.boxed()
}
fn op_listen(
_record: Record,
_zero_copy_buf: Option<PinnedBuf>,
-) -> Box<HttpOp> {
+) -> Pin<Box<HttpOp>> {
debug!("listen");
let addr = "127.0.0.1:4544".parse::<SocketAddr>().unwrap();
let listener = tokio::net::TcpListener::bind(&addr).unwrap();
let mut table = lock_resource_table();
let rid = table.add("tcpListener", Box::new(TcpListener(listener)));
- Box::new(futures::future::ok(rid as i32))
+ futures::future::ok(rid as i32).boxed()
}
-fn op_close(record: Record, _zero_copy_buf: Option<PinnedBuf>) -> Box<HttpOp> {
+fn op_close(
+ record: Record,
+ _zero_copy_buf: Option<PinnedBuf>,
+) -> Pin<Box<HttpOp>> {
debug!("close");
let rid = record.arg as u32;
let mut table = lock_resource_table();
@@ -243,39 +255,53 @@ fn op_close(record: Record, _zero_copy_buf: Option<PinnedBuf>) -> Box<HttpOp> {
Some(_) => futures::future::ok(0),
None => futures::future::err(bad_resource()),
};
- Box::new(fut)
+ fut.boxed()
}
-fn op_read(record: Record, zero_copy_buf: Option<PinnedBuf>) -> Box<HttpOp> {
+fn op_read(
+ record: Record,
+ zero_copy_buf: Option<PinnedBuf>,
+) -> Pin<Box<HttpOp>> {
let rid = record.arg as u32;
debug!("read rid={}", rid);
let mut zero_copy_buf = zero_copy_buf.unwrap();
- let fut = futures::future::poll_fn(move || {
+ let fut = futures::future::poll_fn(move |_cx| {
let mut table = lock_resource_table();
let stream = table.get_mut::<TcpStream>(rid).ok_or_else(bad_resource)?;
- stream.0.poll_read(&mut zero_copy_buf)
+ match stream.0.poll_read(&mut zero_copy_buf) {
+ Err(e) => Poll::Ready(Err(e)),
+ Ok(Async::Ready(v)) => Poll::Ready(Ok(v)),
+ Ok(Async::NotReady) => Poll::Pending,
+ }
})
.and_then(move |nread| {
debug!("read success {}", nread);
- Ok(nread as i32)
+ futures::future::ok(nread as i32)
});
- Box::new(fut)
+ fut.boxed()
}
-fn op_write(record: Record, zero_copy_buf: Option<PinnedBuf>) -> Box<HttpOp> {
+fn op_write(
+ record: Record,
+ zero_copy_buf: Option<PinnedBuf>,
+) -> Pin<Box<HttpOp>> {
let rid = record.arg as u32;
debug!("write rid={}", rid);
let zero_copy_buf = zero_copy_buf.unwrap();
- let fut = futures::future::poll_fn(move || {
+ let fut = futures::future::poll_fn(move |_cx| {
let mut table = lock_resource_table();
let stream = table.get_mut::<TcpStream>(rid).ok_or_else(bad_resource)?;
- stream.0.poll_write(&zero_copy_buf)
+ match stream.0.poll_write(&zero_copy_buf) {
+ Err(e) => Poll::Ready(Err(e)),
+ Ok(Async::Ready(v)) => Poll::Ready(Ok(v)),
+ Ok(Async::NotReady) => Poll::Pending,
+ }
})
.and_then(move |nwritten| {
debug!("write success {}", nwritten);
- Ok(nwritten as i32)
+ futures::future::ok(nwritten as i32)
});
- Box::new(fut)
+ fut.boxed()
}
fn js_check(r: Result<(), ErrBox>) {
diff --git a/core/isolate.rs b/core/isolate.rs
index 8b15befa3..079ab5dcf 100644
--- a/core/isolate.rs
+++ b/core/isolate.rs
@@ -19,20 +19,27 @@ use crate::libdeno::Snapshot2;
use crate::ops::*;
use crate::shared_queue::SharedQueue;
use crate::shared_queue::RECOMMENDED_SIZE;
+use futures::future::FutureExt;
+use futures::future::TryFutureExt;
use futures::stream::FuturesUnordered;
+use futures::stream::IntoStream;
use futures::stream::Stream;
+use futures::stream::StreamExt;
use futures::stream::StreamFuture;
-use futures::task;
-use futures::Async::*;
-use futures::Future;
-use futures::Poll;
+use futures::stream::TryStream;
+use futures::stream::TryStreamExt;
+use futures::task::AtomicWaker;
use libc::c_char;
use libc::c_void;
use std::ffi::CStr;
use std::ffi::CString;
use std::fmt;
+use std::future::Future;
+use std::pin::Pin;
use std::ptr::null;
use std::sync::{Arc, Mutex, Once};
+use std::task::Context;
+use std::task::Poll;
/// Stores a script used to initalize a Isolate
pub struct Script<'a> {
@@ -59,7 +66,7 @@ pub enum RecursiveLoadEvent {
Instantiate(deno_mod),
}
-pub trait ImportStream: Stream {
+pub trait ImportStream: TryStream {
fn register(
&mut self,
source_code_info: SourceCodeInfo,
@@ -67,8 +74,14 @@ pub trait ImportStream: Stream {
) -> Result<(), ErrBox>;
}
-type DynImportStream =
- Box<dyn ImportStream<Item = RecursiveLoadEvent, Error = ErrBox> + Send>;
+type DynImportStream = Box<
+ dyn ImportStream<
+ Ok = RecursiveLoadEvent,
+ Error = ErrBox,
+ Item = Result<RecursiveLoadEvent, ErrBox>,
+ > + Send
+ + Unpin,
+>;
type DynImportFn = dyn Fn(deno_dyn_import_id, &str, &str) -> DynImportStream;
@@ -87,15 +100,23 @@ impl fmt::Debug for DynImportStream {
}
impl Stream for DynImport {
- type Item = (deno_dyn_import_id, RecursiveLoadEvent);
- type Error = (deno_dyn_import_id, ErrBox);
-
- fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
- match self.inner.poll() {
- Ok(Ready(Some(event))) => Ok(Ready(Some((self.id, event)))),
- Ok(Ready(None)) => unreachable!(),
- Err(e) => Err((self.id, e)),
- Ok(NotReady) => Ok(NotReady),
+ type Item = Result<
+ (deno_dyn_import_id, RecursiveLoadEvent),
+ (deno_dyn_import_id, ErrBox),
+ >;
+
+ fn poll_next(
+ self: Pin<&mut Self>,
+ cx: &mut Context,
+ ) -> Poll<Option<Self::Item>> {
+ let self_inner = self.get_mut();
+ match self_inner.inner.try_poll_next_unpin(cx) {
+ Poll::Ready(Some(Ok(event))) => {
+ Poll::Ready(Some(Ok((self_inner.id, event))))
+ }
+ Poll::Ready(None) => unreachable!(),
+ Poll::Ready(Some(Err(e))) => Poll::Ready(Some(Err((self_inner.id, e)))),
+ Poll::Pending => Poll::Pending,
}
}
}
@@ -154,11 +175,12 @@ pub struct Isolate {
needs_init: bool,
shared: SharedQueue,
pending_ops: FuturesUnordered<PendingOpFuture>,
- pending_dyn_imports: FuturesUnordered<StreamFuture<DynImport>>,
+ pending_dyn_imports: FuturesUnordered<StreamFuture<IntoStream<DynImport>>>,
have_unpolled_ops: bool,
startup_script: Option<OwnedScript>,
op_registry: OpRegistry,
eager_poll_count: u32,
+ waker: AtomicWaker,
}
unsafe impl Send for Isolate {}
@@ -225,6 +247,7 @@ impl Isolate {
startup_script,
op_registry: OpRegistry::new(),
eager_poll_count: 0,
+ waker: AtomicWaker::new(),
}
}
@@ -296,8 +319,10 @@ impl Isolate {
if let Some(ref f) = isolate.dyn_import {
let inner = f(id, specifier, referrer);
let stream = DynImport { inner, id };
- task::current().notify();
- isolate.pending_dyn_imports.push(stream.into_future());
+ isolate.waker.wake();
+ isolate
+ .pending_dyn_imports
+ .push(stream.into_stream().into_future());
} else {
panic!("dyn_import callback not set")
}
@@ -334,10 +359,11 @@ impl Isolate {
// which case they can be turned into a sync op before we return to V8. This
// can save a boundary crossing.
#[allow(clippy::match_wild_err_arm)]
- match fut.poll() {
- Err(_) => panic!("unexpected op error"),
- Ok(Ready(buf)) => Op::Sync(buf),
- Ok(NotReady) => Op::Async(fut),
+ let mut cx = Context::from_waker(futures::task::noop_waker_ref());
+ match fut.poll_unpin(&mut cx) {
+ Poll::Ready(Err(_)) => panic!("unexpected op error"),
+ Poll::Ready(Ok(buf)) => Op::Sync(buf),
+ Poll::Pending => Op::Async(fut),
}
}
Op::Sync(buf) => Op::Sync(buf),
@@ -359,8 +385,8 @@ impl Isolate {
.expect("unexpected error");
}
Op::Async(fut) => {
- let fut2 = fut.map(move |buf| (op_id, buf));
- isolate.pending_ops.push(Box::new(fut2));
+ let fut2 = fut.map_ok(move |buf| (op_id, buf));
+ isolate.pending_ops.push(fut2.boxed());
isolate.have_unpolled_ops = true;
}
}
@@ -522,42 +548,45 @@ impl Isolate {
self.check_last_exception()
}
- fn poll_dyn_imports(&mut self) -> Poll<(), ErrBox> {
+ fn poll_dyn_imports(&mut self, cx: &mut Context) -> Poll<Result<(), ErrBox>> {
use RecursiveLoadEvent::*;
loop {
- match self.pending_dyn_imports.poll() {
- Ok(NotReady) | Ok(Ready(None)) => {
+ match self.pending_dyn_imports.poll_next_unpin(cx) {
+ Poll::Pending | Poll::Ready(None) => {
// There are no active dynamic import loaders, or none are ready.
- return Ok(futures::Async::Ready(()));
+ return Poll::Ready(Ok(()));
}
- Ok(Ready(Some((
- Some((dyn_import_id, Fetch(source_code_info))),
+ Poll::Ready(Some((
+ Some(Ok((dyn_import_id, Fetch(source_code_info)))),
mut stream,
- )))) => {
+ ))) => {
// A module (not necessarily the one dynamically imported) has been
// fetched. Create and register it, and if successful, poll for the
// next recursive-load event related to this dynamic import.
- match stream.register(source_code_info, self) {
+ match stream.get_mut().register(source_code_info, self) {
Ok(()) => self.pending_dyn_imports.push(stream.into_future()),
Err(err) => {
self.dyn_import_done(dyn_import_id, Err(Some(err.to_string())))?
}
}
}
- Ok(Ready(Some((Some((dyn_import_id, Instantiate(module_id))), _)))) => {
+ Poll::Ready(Some((
+ Some(Ok((dyn_import_id, Instantiate(module_id)))),
+ _,
+ ))) => {
// The top-level module from a dynamic import has been instantiated.
match self.mod_evaluate(module_id) {
Ok(()) => self.dyn_import_done(dyn_import_id, Ok(module_id))?,
Err(..) => self.dyn_import_done(dyn_import_id, Err(None))?,
}
}
- Err(((dyn_import_id, err), _)) => {
+ Poll::Ready(Some((Some(Err((dyn_import_id, err))), _))) => {
// A non-javascript error occurred; this could be due to a an invalid
// module specifier, or a problem with the source map, or a failure
// to fetch the module source code.
self.dyn_import_done(dyn_import_id, Err(Some(err.to_string())))?
}
- Ok(Ready(Some((None, _)))) => unreachable!(),
+ Poll::Ready(Some((None, _))) => unreachable!(),
}
}
}
@@ -654,30 +683,34 @@ impl Drop for LockerScope {
}
impl Future for Isolate {
- type Item = ();
- type Error = ErrBox;
+ type Output = Result<(), ErrBox>;
- fn poll(&mut self) -> Poll<(), ErrBox> {
- self.shared_init();
+ fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
+ let inner = self.get_mut();
+
+ inner.waker.register(cx.waker());
+
+ inner.shared_init();
let mut overflow_response: Option<(OpId, Buf)> = None;
loop {
// If there are any pending dyn_import futures, do those first.
- if !self.pending_dyn_imports.is_empty() {
- self.poll_dyn_imports()?;
+ if !inner.pending_dyn_imports.is_empty() {
+ let poll_imports = inner.poll_dyn_imports(cx)?;
+ assert!(poll_imports.is_ready());
}
// Now handle actual ops.
- self.have_unpolled_ops = false;
- self.eager_poll_count = 0;
+ inner.have_unpolled_ops = false;
+ inner.eager_poll_count = 0;
#[allow(clippy::match_wild_err_arm)]
- match self.pending_ops.poll() {
- Err(_) => panic!("unexpected op error"),
- Ok(Ready(None)) => break,
- Ok(NotReady) => break,
- Ok(Ready(Some((op_id, buf)))) => {
- let successful_push = self.shared.push(op_id, &buf);
+ match inner.pending_ops.poll_next_unpin(cx) {
+ Poll::Ready(Some(Err(_))) => panic!("unexpected op error"),
+ Poll::Ready(None) => break,
+ Poll::Pending => break,
+ Poll::Ready(Some(Ok((op_id, buf)))) => {
+ let successful_push = inner.shared.push(op_id, &buf);
if !successful_push {
// If we couldn't push the response to the shared queue, because
// there wasn't enough size, we will return the buffer via the
@@ -689,34 +722,34 @@ impl Future for Isolate {
}
}
- if self.shared.size() > 0 {
+ if inner.shared.size() > 0 {
// Lock the current thread for V8.
- let locker = LockerScope::new(self.libdeno_isolate);
- self.respond(None)?;
+ let locker = LockerScope::new(inner.libdeno_isolate);
+ inner.respond(None)?;
// The other side should have shifted off all the messages.
- assert_eq!(self.shared.size(), 0);
+ assert_eq!(inner.shared.size(), 0);
drop(locker);
}
if overflow_response.is_some() {
// Lock the current thread for V8.
- let locker = LockerScope::new(self.libdeno_isolate);
+ let locker = LockerScope::new(inner.libdeno_isolate);
let (op_id, buf) = overflow_response.take().unwrap();
- self.respond(Some((op_id, &buf)))?;
+ inner.respond(Some((op_id, &buf)))?;
drop(locker);
}
- self.check_promise_errors();
- self.check_last_exception()?;
+ inner.check_promise_errors();
+ inner.check_last_exception()?;
// We're idle if pending_ops is empty.
- if self.pending_ops.is_empty() && self.pending_dyn_imports.is_empty() {
- Ok(futures::Async::Ready(()))
+ if inner.pending_ops.is_empty() && inner.pending_dyn_imports.is_empty() {
+ Poll::Ready(Ok(()))
} else {
- if self.have_unpolled_ops {
- task::current().notify();
+ if inner.have_unpolled_ops {
+ inner.waker.wake();
}
- Ok(futures::Async::NotReady)
+ Poll::Pending
}
}
}
@@ -752,33 +785,29 @@ pub fn js_check<T>(r: Result<T, ErrBox>) -> T {
#[cfg(test)]
pub mod tests {
use super::*;
- use futures::executor::spawn;
+ use futures::executor::ThreadPool;
use futures::future::lazy;
- use futures::future::ok;
- use futures::Async;
use std::io;
use std::ops::FnOnce;
use std::sync::atomic::{AtomicUsize, Ordering};
- pub fn run_in_task<F, R>(f: F) -> R
+ pub fn run_in_task<F>(f: F)
where
- F: FnOnce() -> R,
+ F: FnOnce(&mut Context) + Send + 'static,
{
- spawn(lazy(move || ok::<R, ()>(f()))).wait_future().unwrap()
+ let poll = ThreadPool::new().unwrap();
+ poll.spawn_ok(lazy(move |cx| f(cx)));
}
- fn poll_until_ready<F>(
- future: &mut F,
- max_poll_count: usize,
- ) -> Result<F::Item, F::Error>
+ fn poll_until_ready<F>(future: &mut F, max_poll_count: usize) -> F::Output
where
- F: Future,
+ F: Future + Unpin,
{
+ let mut cx = Context::from_waker(futures::task::noop_waker_ref());
for _ in 0..max_poll_count {
- match future.poll() {
- Ok(NotReady) => continue,
- Ok(Ready(val)) => return Ok(val),
- Err(err) => return Err(err),
+ match future.poll_unpin(&mut cx) {
+ Poll::Pending => continue,
+ Poll::Ready(val) => return val,
}
}
panic!(
@@ -799,16 +828,16 @@ pub mod tests {
}
impl Future for DelayedFuture {
- type Item = Box<[u8]>;
- type Error = ();
+ type Output = Result<Box<[u8]>, ()>;
- fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
- if self.counter > 0 {
- return Ok(Async::Ready(self.buf.clone()));
+ fn poll(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<Self::Output> {
+ let inner = self.get_mut();
+ if inner.counter > 0 {
+ return Poll::Ready(Ok(inner.buf.clone()));
}
- self.counter += 1;
- Ok(Async::NotReady)
+ inner.counter += 1;
+ Poll::Pending
}
}
@@ -835,13 +864,13 @@ pub mod tests {
assert_eq!(control.len(), 1);
assert_eq!(control[0], 42);
let buf = vec![43u8, 0, 0, 0].into_boxed_slice();
- Op::Async(Box::new(futures::future::ok(buf)))
+ Op::Async(futures::future::ok(buf).boxed())
}
Mode::AsyncDelayed => {
assert_eq!(control.len(), 1);
assert_eq!(control[0], 42);
let buf = vec![43u8, 0, 0, 0].into_boxed_slice();
- Op::Async(Box::new(DelayedFuture::new(buf)))
+ Op::Async(DelayedFuture::new(buf).boxed())
}
Mode::OverflowReqSync => {
assert_eq!(control.len(), 100 * 1024 * 1024);
@@ -860,7 +889,7 @@ pub mod tests {
Mode::OverflowReqAsync => {
assert_eq!(control.len(), 100 * 1024 * 1024);
let buf = vec![43u8, 0, 0, 0].into_boxed_slice();
- Op::Async(Box::new(DelayedFuture::new(buf)))
+ Op::Async(DelayedFuture::new(buf).boxed())
}
Mode::OverflowResAsync => {
assert_eq!(control.len(), 1);
@@ -869,7 +898,7 @@ pub mod tests {
vec.resize(100 * 1024 * 1024, 0);
vec[0] = 4;
let buf = vec.into_boxed_slice();
- Op::Async(Box::new(DelayedFuture::new(buf)))
+ Op::Async(DelayedFuture::new(buf).boxed())
}
}
};
@@ -957,7 +986,7 @@ pub mod tests {
#[test]
fn test_poll_async_immediate_ops() {
- run_in_task(|| {
+ run_in_task(|cx| {
let (mut isolate, dispatch_count) = setup(Mode::AsyncImmediate);
js_check(isolate.execute(
@@ -992,16 +1021,22 @@ pub mod tests {
));
assert_eq!(dispatch_count.load(Ordering::Relaxed), 2);
assert_eq!(dispatch_count.load(Ordering::Relaxed), 2);
- assert_eq!(Async::Ready(()), isolate.poll().unwrap());
+ assert!(match isolate.poll_unpin(cx) {
+ Poll::Ready(Ok(_)) => true,
+ _ => false,
+ });
js_check(isolate.execute("check3.js", "assert(nrecv == 0)"));
// We are idle, so the next poll should be the last.
- assert_eq!(Async::Ready(()), isolate.poll().unwrap());
+ assert!(match isolate.poll_unpin(cx) {
+ Poll::Ready(Ok(_)) => true,
+ _ => false,
+ });
});
}
#[test]
fn test_poll_async_delayed_ops() {
- run_in_task(|| {
+ run_in_task(|cx| {
let (mut isolate, dispatch_count) = setup(Mode::AsyncDelayed);
js_check(isolate.execute(
@@ -1024,7 +1059,10 @@ pub mod tests {
"#,
));
assert_eq!(dispatch_count.load(Ordering::Relaxed), 1);
- assert_eq!(Async::Ready(()), isolate.poll().unwrap());
+ assert!(match isolate.poll_unpin(cx) {
+ Poll::Ready(Ok(_)) => true,
+ _ => false,
+ });
assert_eq!(dispatch_count.load(Ordering::Relaxed), 1);
js_check(isolate.execute(
"check2.js",
@@ -1035,26 +1073,36 @@ pub mod tests {
"#,
));
assert_eq!(dispatch_count.load(Ordering::Relaxed), 2);
- assert_eq!(Async::Ready(()), isolate.poll().unwrap());
+ assert!(match isolate.poll_unpin(cx) {
+ Poll::Ready(Ok(_)) => true,
+ _ => false,
+ });
js_check(isolate.execute("check3.js", "assert(nrecv == 2)"));
assert_eq!(dispatch_count.load(Ordering::Relaxed), 2);
// We are idle, so the next poll should be the last.
- assert_eq!(Async::Ready(()), isolate.poll().unwrap());
+ assert!(match isolate.poll_unpin(cx) {
+ Poll::Ready(Ok(_)) => true,
+ _ => false,
+ });
});
}
struct MockImportStream(Vec<Result<RecursiveLoadEvent, ErrBox>>);
impl Stream for MockImportStream {
- type Item = RecursiveLoadEvent;
- type Error = ErrBox;
- fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
- let event = if self.0.is_empty() {
+ type Item = Result<RecursiveLoadEvent, ErrBox>;
+
+ fn poll_next(
+ self: Pin<&mut Self>,
+ _cx: &mut Context,
+ ) -> Poll<Option<Self::Item>> {
+ let inner = self.get_mut();
+ let event = if inner.0.is_empty() {
None
} else {
- Some(self.0.remove(0)?)
+ Some(inner.0.remove(0))
};
- Ok(Ready(event))
+ Poll::Ready(event)
}
}
@@ -1080,7 +1128,7 @@ pub mod tests {
#[test]
fn dyn_import_err() {
// Test an erroneous dynamic import where the specified module isn't found.
- run_in_task(|| {
+ run_in_task(|cx| {
let count = Arc::new(AtomicUsize::new(0));
let count_ = count.clone();
let mut isolate = Isolate::new(StartupData::None, false);
@@ -1103,8 +1151,10 @@ pub mod tests {
assert_eq!(count.load(Ordering::Relaxed), 1);
// We should get an error here.
- let result = isolate.poll();
- assert!(result.is_err());
+ let result = isolate.poll_unpin(cx);
+ if let Poll::Ready(Ok(_)) = result {
+ unreachable!();
+ }
})
}
@@ -1113,7 +1163,7 @@ pub mod tests {
use std::convert::TryInto;
// Import multiple modules to demonstrate that after failed dynamic import
// another dynamic import can still be run
- run_in_task(|| {
+ run_in_task(|cx| {
let count = Arc::new(AtomicUsize::new(0));
let count_ = count.clone();
let mut isolate = Isolate::new(StartupData::None, false);
@@ -1156,15 +1206,24 @@ pub mod tests {
assert_eq!(count.load(Ordering::Relaxed), 3);
// Now each poll should return error
- assert!(isolate.poll().is_err());
- assert!(isolate.poll().is_err());
- assert!(isolate.poll().is_err());
+ assert!(match isolate.poll_unpin(cx) {
+ Poll::Ready(Err(_)) => true,
+ _ => false,
+ });
+ assert!(match isolate.poll_unpin(cx) {
+ Poll::Ready(Err(_)) => true,
+ _ => false,
+ });
+ assert!(match isolate.poll_unpin(cx) {
+ Poll::Ready(Err(_)) => true,
+ _ => false,
+ });
})
}
#[test]
fn dyn_import_ok() {
- run_in_task(|| {
+ run_in_task(|cx| {
let count = Arc::new(AtomicUsize::new(0));
let count_ = count.clone();
@@ -1224,9 +1283,15 @@ pub mod tests {
));
assert_eq!(count.load(Ordering::Relaxed), 1);
- assert_eq!(Ready(()), isolate.poll().unwrap());
+ assert!(match isolate.poll_unpin(cx) {
+ Poll::Ready(Ok(_)) => true,
+ _ => false,
+ });
assert_eq!(count.load(Ordering::Relaxed), 2);
- assert_eq!(Ready(()), isolate.poll().unwrap());
+ assert!(match isolate.poll_unpin(cx) {
+ Poll::Ready(Ok(_)) => true,
+ _ => false,
+ });
assert_eq!(count.load(Ordering::Relaxed), 2);
})
}
@@ -1247,7 +1312,7 @@ pub mod tests {
shared.terminate_execution();
// allow shutdown
- std::thread::sleep(std::time::Duration::from_millis(100));
+ std::thread::sleep(std::time::Duration::from_millis(200));
// unless reported otherwise the test should fail after this point
tx_clone.send(false).ok();
@@ -1345,7 +1410,7 @@ pub mod tests {
#[test]
fn overflow_req_async() {
- run_in_task(|| {
+ run_in_task(|cx| {
let (mut isolate, dispatch_count) = setup(Mode::OverflowReqAsync);
js_check(isolate.execute(
"overflow_req_async.js",
@@ -1366,14 +1431,17 @@ pub mod tests {
"#,
));
assert_eq!(dispatch_count.load(Ordering::Relaxed), 1);
- assert_eq!(Async::Ready(()), js_check(isolate.poll()));
+ assert!(match isolate.poll_unpin(cx) {
+ Poll::Ready(Ok(_)) => true,
+ _ => false,
+ });
js_check(isolate.execute("check.js", "assert(asyncRecv == 1);"));
});
}
#[test]
fn overflow_res_async() {
- run_in_task(|| {
+ run_in_task(|_cx| {
// TODO(ry) This test is quite slow due to memcpy-ing 100MB into JS. We
// should optimize this.
let (mut isolate, dispatch_count) = setup(Mode::OverflowResAsync);
@@ -1404,7 +1472,7 @@ pub mod tests {
fn overflow_res_multiple_dispatch_async() {
// TODO(ry) This test is quite slow due to memcpy-ing 100MB into JS. We
// should optimize this.
- run_in_task(|| {
+ run_in_task(|_cx| {
let (mut isolate, dispatch_count) = setup(Mode::OverflowResAsync);
js_check(isolate.execute(
"overflow_res_multiple_dispatch_async.js",
@@ -1434,7 +1502,7 @@ pub mod tests {
#[test]
fn test_pre_dispatch() {
- run_in_task(|| {
+ run_in_task(|mut cx| {
let (mut isolate, _dispatch_count) = setup(Mode::OverflowResAsync);
js_check(isolate.execute(
"bad_op_id.js",
@@ -1448,13 +1516,15 @@ pub mod tests {
assert(thrown == "Unknown op id: 100");
"#,
));
- assert_eq!(Async::Ready(()), isolate.poll().unwrap());
+ if let Poll::Ready(Err(_)) = isolate.poll_unpin(&mut cx) {
+ unreachable!();
+ }
});
}
#[test]
fn test_js() {
- run_in_task(|| {
+ run_in_task(|mut cx| {
let (mut isolate, _dispatch_count) = setup(Mode::AsyncImmediate);
js_check(
isolate.execute(
@@ -1462,7 +1532,9 @@ pub mod tests {
include_str!("shared_queue_test.js"),
),
);
- assert_eq!(Async::Ready(()), isolate.poll().unwrap());
+ if let Poll::Ready(Err(_)) = isolate.poll_unpin(&mut cx) {
+ unreachable!();
+ }
});
}
diff --git a/core/modules.rs b/core/modules.rs
index 85de79cca..9f3434a4f 100644
--- a/core/modules.rs
+++ b/core/modules.rs
@@ -14,21 +14,22 @@ use crate::isolate::SourceCodeInfo;
use crate::libdeno::deno_dyn_import_id;
use crate::libdeno::deno_mod;
use crate::module_specifier::ModuleSpecifier;
-use futures::future::loop_fn;
-use futures::future::Loop;
+use futures::future::FutureExt;
use futures::stream::FuturesUnordered;
use futures::stream::Stream;
-use futures::Async::*;
-use futures::Future;
-use futures::Poll;
+use futures::stream::TryStreamExt;
use std::collections::HashMap;
use std::collections::HashSet;
use std::fmt;
+use std::future::Future;
+use std::pin::Pin;
use std::sync::Arc;
use std::sync::Mutex;
+use std::task::Context;
+use std::task::Poll;
pub type SourceCodeInfoFuture =
- dyn Future<Item = SourceCodeInfo, Error = ErrBox> + Send;
+ dyn Future<Output = Result<SourceCodeInfo, ErrBox>> + Send;
pub trait Loader: Send + Sync {
/// Returns an absolute URL.
@@ -47,7 +48,7 @@ pub trait Loader: Send + Sync {
fn load(
&self,
module_specifier: &ModuleSpecifier,
- ) -> Box<SourceCodeInfoFuture>;
+ ) -> Pin<Box<SourceCodeInfoFuture>>;
}
#[derive(Debug, Eq, PartialEq)]
@@ -68,16 +69,16 @@ enum State {
/// This future is used to implement parallel async module loading without
/// complicating the Isolate API.
/// TODO: RecursiveLoad desperately needs to be merged with Modules.
-pub struct RecursiveLoad<L: Loader> {
+pub struct RecursiveLoad<L: Loader + Unpin> {
kind: Kind,
state: State,
loader: L,
modules: Arc<Mutex<Modules>>,
- pending: FuturesUnordered<Box<SourceCodeInfoFuture>>,
+ pending: FuturesUnordered<Pin<Box<SourceCodeInfoFuture>>>,
is_pending: HashSet<ModuleSpecifier>,
}
-impl<L: Loader> RecursiveLoad<L> {
+impl<L: Loader + Unpin> RecursiveLoad<L> {
/// Starts a new parallel load of the given URL of the main module.
pub fn main(
specifier: &str,
@@ -153,7 +154,7 @@ impl<L: Loader> RecursiveLoad<L> {
// integrated into one thing.
self
.pending
- .push(Box::new(self.loader.load(&module_specifier)));
+ .push(self.loader.load(&module_specifier).boxed());
self.state = State::LoadingRoot;
Ok(())
@@ -182,7 +183,7 @@ impl<L: Loader> RecursiveLoad<L> {
{
self
.pending
- .push(Box::new(self.loader.load(&module_specifier)));
+ .push(self.loader.load(&module_specifier).boxed());
self.is_pending.insert(module_specifier);
}
@@ -194,26 +195,24 @@ impl<L: Loader> RecursiveLoad<L> {
pub fn get_future(
self,
isolate: Arc<Mutex<Isolate>>,
- ) -> impl Future<Item = deno_mod, Error = ErrBox> {
- loop_fn(self, move |load| {
- let isolate = isolate.clone();
- load.into_future().map_err(|(e, _)| e).and_then(
- move |(event, mut load)| {
- Ok(match event.unwrap() {
- Event::Fetch(info) => {
- let mut isolate = isolate.lock().unwrap();
- load.register(info, &mut isolate)?;
- Loop::Continue(load)
- }
- Event::Instantiate(id) => Loop::Break(id),
- })
- },
- )
- })
+ ) -> impl Future<Output = Result<deno_mod, ErrBox>> {
+ async move {
+ let mut load = self;
+ loop {
+ let event = load.try_next().await?;
+ match event.unwrap() {
+ Event::Fetch(info) => {
+ let mut isolate = isolate.lock().unwrap();
+ load.register(info, &mut isolate)?;
+ }
+ Event::Instantiate(id) => return Ok(id),
+ }
+ }
+ }
}
}
-impl<L: Loader> ImportStream for RecursiveLoad<L> {
+impl<L: Loader + Unpin> ImportStream for RecursiveLoad<L> {
// TODO: this should not be part of RecursiveLoad.
fn register(
&mut self,
@@ -308,40 +307,45 @@ impl<L: Loader> ImportStream for RecursiveLoad<L> {
}
}
-impl<L: Loader> Stream for RecursiveLoad<L> {
- type Item = Event;
- type Error = ErrBox;
+impl<L: Loader + Unpin> Stream for RecursiveLoad<L> {
+ type Item = Result<Event, ErrBox>;
- fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
- Ok(match self.state {
+ fn poll_next(
+ self: Pin<&mut Self>,
+ cx: &mut Context,
+ ) -> Poll<Option<Self::Item>> {
+ let inner = self.get_mut();
+ match inner.state {
State::ResolveMain(ref specifier, Some(ref code)) => {
- let module_specifier = self.loader.resolve(
+ let module_specifier = inner.loader.resolve(
specifier,
".",
true,
- self.dyn_import_id().is_some(),
+ inner.dyn_import_id().is_some(),
)?;
let info = SourceCodeInfo {
code: code.to_owned(),
module_url_specified: module_specifier.to_string(),
module_url_found: module_specifier.to_string(),
};
- self.state = State::LoadingRoot;
- Ready(Some(Event::Fetch(info)))
+ inner.state = State::LoadingRoot;
+ Poll::Ready(Some(Ok(Event::Fetch(info))))
}
State::ResolveMain(..) | State::ResolveImport(..) => {
- self.add_root()?;
- self.poll()?
+ if let Err(e) = inner.add_root() {
+ return Poll::Ready(Some(Err(e)));
+ }
+ inner.try_poll_next_unpin(cx)
}
State::LoadingRoot | State::LoadingImports(..) => {
- match self.pending.poll()? {
- Ready(None) => unreachable!(),
- Ready(Some(info)) => Ready(Some(Event::Fetch(info))),
- NotReady => NotReady,
+ match inner.pending.try_poll_next_unpin(cx)? {
+ Poll::Ready(None) => unreachable!(),
+ Poll::Ready(Some(info)) => Poll::Ready(Some(Ok(Event::Fetch(info)))),
+ Poll::Pending => Poll::Pending,
}
}
- State::Instantiated(id) => Ready(Some(Event::Instantiate(id))),
- })
+ State::Instantiated(id) => Poll::Ready(Some(Ok(Event::Instantiate(id)))),
+ }
}
}
@@ -603,9 +607,11 @@ mod tests {
use super::*;
use crate::isolate::js_check;
use crate::isolate::tests::*;
- use futures::Async;
+ use futures::future::FutureExt;
+ use futures::stream::StreamExt;
use std::error::Error;
use std::fmt;
+ use std::future::Future;
struct MockLoader {
pub loads: Arc<Mutex<Vec<String>>>,
@@ -676,27 +682,27 @@ mod tests {
}
impl Future for DelayedSourceCodeFuture {
- type Item = SourceCodeInfo;
- type Error = ErrBox;
+ type Output = Result<SourceCodeInfo, ErrBox>;
- fn poll(&mut self) -> Poll<Self::Item, ErrBox> {
- self.counter += 1;
- if self.url == "file:///never_ready.js" {
- return Ok(Async::NotReady);
+ fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
+ let inner = self.get_mut();
+ inner.counter += 1;
+ if inner.url == "file:///never_ready.js" {
+ return Poll::Pending;
}
- if self.url == "file:///slow.js" && self.counter < 2 {
+ if inner.url == "file:///slow.js" && inner.counter < 2 {
// TODO(ry) Hopefully in the future we can remove current task
// notification. See comment above run_in_task.
- futures::task::current().notify();
- return Ok(Async::NotReady);
+ cx.waker().wake_by_ref();
+ return Poll::Pending;
}
- match mock_source_code(&self.url) {
- Some(src) => Ok(Async::Ready(SourceCodeInfo {
+ match mock_source_code(&inner.url) {
+ Some(src) => Poll::Ready(Ok(SourceCodeInfo {
code: src.0.to_owned(),
- module_url_specified: self.url.clone(),
+ module_url_specified: inner.url.clone(),
module_url_found: src.1.to_owned(),
})),
- None => Err(MockError::LoadErr.into()),
+ None => Poll::Ready(Err(MockError::LoadErr.into())),
}
}
}
@@ -733,11 +739,11 @@ mod tests {
fn load(
&self,
module_specifier: &ModuleSpecifier,
- ) -> Box<SourceCodeInfoFuture> {
+ ) -> Pin<Box<SourceCodeInfoFuture>> {
let mut loads = self.loads.lock().unwrap();
loads.push(module_specifier.to_string());
let url = module_specifier.to_string();
- Box::new(DelayedSourceCodeFuture { url, counter: 0 })
+ DelayedSourceCodeFuture { url, counter: 0 }.boxed()
}
}
@@ -780,7 +786,7 @@ mod tests {
#[test]
fn test_recursive_load() {
- run_in_task(|| {
+ run_in_task(|mut cx| {
let loader = MockLoader::new();
let modules = loader.modules.clone();
let modules_ = modules.clone();
@@ -791,12 +797,12 @@ mod tests {
RecursiveLoad::main("/a.js", None, loader, modules);
let a_id = loop {
- match recursive_load.poll() {
- Ok(Ready(Some(Event::Fetch(info)))) => {
+ match recursive_load.try_poll_next_unpin(&mut cx) {
+ Poll::Ready(Some(Ok(Event::Fetch(info)))) => {
let mut isolate = isolate.lock().unwrap();
recursive_load.register(info, &mut isolate).unwrap();
}
- Ok(Ready(Some(Event::Instantiate(id)))) => break id,
+ Poll::Ready(Some(Ok(Event::Instantiate(id)))) => break id,
_ => panic!("unexpected result"),
};
};
@@ -859,7 +865,7 @@ mod tests {
#[test]
fn test_circular_load() {
- run_in_task(|| {
+ run_in_task(|mut cx| {
let loader = MockLoader::new();
let isolate = loader.isolate.clone();
let isolate_ = isolate.clone();
@@ -868,9 +874,10 @@ mod tests {
let loads = loader.loads.clone();
let recursive_load =
RecursiveLoad::main("/circular1.js", None, loader, modules);
- let result = recursive_load.get_future(isolate.clone()).poll();
- assert!(result.is_ok());
- if let Async::Ready(circular1_id) = result.ok().unwrap() {
+ let mut load_fut = recursive_load.get_future(isolate.clone()).boxed();
+ let result = Pin::new(&mut load_fut).poll(&mut cx);
+ assert!(result.is_ready());
+ if let Poll::Ready(Ok(circular1_id)) = result {
let mut isolate = isolate_.lock().unwrap();
js_check(isolate.mod_evaluate(circular1_id));
@@ -930,7 +937,7 @@ mod tests {
#[test]
fn test_redirect_load() {
- run_in_task(|| {
+ run_in_task(|mut cx| {
let loader = MockLoader::new();
let isolate = loader.isolate.clone();
let isolate_ = isolate.clone();
@@ -939,10 +946,11 @@ mod tests {
let loads = loader.loads.clone();
let recursive_load =
RecursiveLoad::main("/redirect1.js", None, loader, modules);
- let result = recursive_load.get_future(isolate.clone()).poll();
+ let mut load_fut = recursive_load.get_future(isolate.clone()).boxed();
+ let result = Pin::new(&mut load_fut).poll(&mut cx);
println!(">> result {:?}", result);
- assert!(result.is_ok());
- if let Async::Ready(redirect1_id) = result.ok().unwrap() {
+ assert!(result.is_ready());
+ if let Poll::Ready(Ok(redirect1_id)) = result {
let mut isolate = isolate_.lock().unwrap();
js_check(isolate.mod_evaluate(redirect1_id));
let l = loads.lock().unwrap();
@@ -995,18 +1003,18 @@ mod tests {
#[test]
fn slow_never_ready_modules() {
- run_in_task(|| {
+ run_in_task(|mut cx| {
let loader = MockLoader::new();
let isolate = loader.isolate.clone();
let modules = loader.modules.clone();
let loads = loader.loads.clone();
let mut recursive_load =
RecursiveLoad::main("/main.js", None, loader, modules)
- .get_future(isolate);
+ .get_future(isolate)
+ .boxed();
- let result = recursive_load.poll();
- assert!(result.is_ok());
- assert!(result.ok().unwrap().is_not_ready());
+ let result = recursive_load.poll_unpin(&mut cx);
+ assert!(result.is_pending());
// TODO(ry) Arguably the first time we poll only the following modules
// should be loaded:
@@ -1018,9 +1026,8 @@ mod tests {
// run_in_task.
for _ in 0..10 {
- let result = recursive_load.poll();
- assert!(result.is_ok());
- assert!(result.ok().unwrap().is_not_ready());
+ let result = recursive_load.poll_unpin(&mut cx);
+ assert!(result.is_pending());
let l = loads.lock().unwrap();
assert_eq!(
l.to_vec(),
@@ -1045,19 +1052,22 @@ mod tests {
#[test]
fn loader_disappears_after_error() {
- run_in_task(|| {
+ run_in_task(|mut cx| {
let loader = MockLoader::new();
let isolate = loader.isolate.clone();
let modules = loader.modules.clone();
let recursive_load =
RecursiveLoad::main("/bad_import.js", None, loader, modules);
- let result = recursive_load.get_future(isolate).poll();
- assert!(result.is_err());
- let err = result.err().unwrap();
- assert_eq!(
- err.downcast_ref::<MockError>().unwrap(),
- &MockError::ResolveErr
- );
+ let mut load_fut = recursive_load.get_future(isolate).boxed();
+ let result = load_fut.poll_unpin(&mut cx);
+ if let Poll::Ready(Err(err)) = result {
+ assert_eq!(
+ err.downcast_ref::<MockError>().unwrap(),
+ &MockError::ResolveErr
+ );
+ } else {
+ unreachable!();
+ }
})
}
@@ -1072,7 +1082,7 @@ mod tests {
#[test]
fn recursive_load_main_with_code() {
- run_in_task(|| {
+ run_in_task(|mut cx| {
let loader = MockLoader::new();
let modules = loader.modules.clone();
let modules_ = modules.clone();
@@ -1090,12 +1100,12 @@ mod tests {
);
let main_id = loop {
- match recursive_load.poll() {
- Ok(Ready(Some(Event::Fetch(info)))) => {
+ match recursive_load.poll_next_unpin(&mut cx) {
+ Poll::Ready(Some(Ok(Event::Fetch(info)))) => {
let mut isolate = isolate.lock().unwrap();
recursive_load.register(info, &mut isolate).unwrap();
}
- Ok(Ready(Some(Event::Instantiate(id)))) => break id,
+ Poll::Ready(Some(Ok(Event::Instantiate(id)))) => break id,
_ => panic!("unexpected result"),
};
};
diff --git a/core/ops.rs b/core/ops.rs
index cce454348..3a4f51b83 100644
--- a/core/ops.rs
+++ b/core/ops.rs
@@ -3,13 +3,15 @@ pub use crate::libdeno::OpId;
use crate::PinnedBuf;
use futures::Future;
use std::collections::HashMap;
+use std::pin::Pin;
pub type Buf = Box<[u8]>;
-pub type OpAsyncFuture<E> = Box<dyn Future<Item = Buf, Error = E> + Send>;
+pub type OpAsyncFuture<E> =
+ Pin<Box<dyn Future<Output = Result<Buf, E>> + Send>>;
pub(crate) type PendingOpFuture =
- Box<dyn Future<Item = (OpId, Buf), Error = CoreError> + Send>;
+ Pin<Box<dyn Future<Output = Result<(OpId, Buf), CoreError>> + Send>>;
pub type OpResult<E> = Result<Op<E>, E>;