summaryrefslogtreecommitdiff
path: root/cli/ops
diff options
context:
space:
mode:
Diffstat (limited to 'cli/ops')
-rw-r--r--cli/ops/compiler.rs19
-rw-r--r--cli/ops/dispatch_json.rs47
-rw-r--r--cli/ops/dispatch_minimal.rs40
-rw-r--r--cli/ops/fetch.rs46
-rw-r--r--cli/ops/files.rs52
-rw-r--r--cli/ops/io.rs197
-rw-r--r--cli/ops/net.rs136
-rw-r--r--cli/ops/process.rs54
-rw-r--r--cli/ops/timers.rs4
-rw-r--r--cli/ops/tls.rs201
-rw-r--r--cli/ops/workers.rs87
11 files changed, 531 insertions, 352 deletions
diff --git a/cli/ops/compiler.rs b/cli/ops/compiler.rs
index a722db6af..fdb62ca32 100644
--- a/cli/ops/compiler.rs
+++ b/cli/ops/compiler.rs
@@ -1,7 +1,8 @@
// Copyright 2018-2019 the Deno authors. All rights reserved. MIT license.
use super::dispatch_json::{Deserialize, JsonOp, Value};
-use crate::futures::future::join_all;
-use crate::futures::Future;
+use crate::futures::future::try_join_all;
+use crate::futures::future::FutureExt;
+use crate::futures::future::TryFutureExt;
use crate::msg;
use crate::ops::json_op;
use crate::state::ThreadSafeState;
@@ -77,7 +78,7 @@ fn op_fetch_source_files(
let global_state = state.global_state.clone();
- let future = join_all(futures)
+ let future = try_join_all(futures)
.map_err(ErrBox::from)
.and_then(move |files| {
// We want to get an array of futures that resolves to
@@ -88,17 +89,19 @@ fn op_fetch_source_files(
// compile them into JS first!
// This allows TS to do correct export types.
if file.media_type == msg::MediaType::Wasm {
- return futures::future::Either::A(
+ return futures::future::Either::Left(
global_state
.wasm_compiler
.compile_async(global_state.clone(), &file)
- .and_then(|compiled_mod| Ok((file, Some(compiled_mod.code)))),
+ .and_then(|compiled_mod| {
+ futures::future::ok((file, Some(compiled_mod.code)))
+ }),
);
}
- futures::future::Either::B(futures::future::ok((file, None)))
+ futures::future::Either::Right(futures::future::ok((file, None)))
})
.collect();
- join_all(v)
+ try_join_all(v)
})
.and_then(move |files_with_code| {
let res = files_with_code
@@ -120,7 +123,7 @@ fn op_fetch_source_files(
futures::future::ok(res)
});
- Ok(JsonOp::Async(Box::new(future)))
+ Ok(JsonOp::Async(future.boxed()))
}
#[derive(Deserialize)]
diff --git a/cli/ops/dispatch_json.rs b/cli/ops/dispatch_json.rs
index 5f1caac6f..38dc7932e 100644
--- a/cli/ops/dispatch_json.rs
+++ b/cli/ops/dispatch_json.rs
@@ -1,13 +1,15 @@
// Copyright 2018-2019 the Deno authors. All rights reserved. MIT license.
-use crate::tokio_util;
use deno::*;
-use futures::Future;
-use futures::Poll;
+use futures::future::FutureExt;
+use futures::task::SpawnExt;
pub use serde_derive::Deserialize;
use serde_json::json;
pub use serde_json::Value;
+use std::future::Future;
+use std::pin::Pin;
-pub type AsyncJsonOp = Box<dyn Future<Item = Value, Error = ErrBox> + Send>;
+pub type AsyncJsonOp =
+ Pin<Box<dyn Future<Output = Result<Value, ErrBox>> + Send>>;
pub enum JsonOp {
Sync(Value),
@@ -70,48 +72,35 @@ where
}
Ok(JsonOp::Async(fut)) => {
assert!(promise_id.is_some());
- let fut2 = Box::new(fut.then(move |result| -> Result<Buf, ()> {
- Ok(serialize_result(promise_id, result))
- }));
- CoreOp::Async(fut2)
+ let fut2 = fut.then(move |result| {
+ futures::future::ok(serialize_result(promise_id, result))
+ });
+ CoreOp::Async(fut2.boxed())
}
Err(sync_err) => {
let buf = serialize_result(promise_id, Err(sync_err));
if is_sync {
CoreOp::Sync(buf)
} else {
- CoreOp::Async(Box::new(futures::future::ok(buf)))
+ CoreOp::Async(futures::future::ok(buf).boxed())
}
}
}
}
}
-// This is just type conversion. Implement From trait?
-// See https://github.com/tokio-rs/tokio/blob/ffd73a64e7ec497622b7f939e38017afe7124dc4/tokio-fs/src/lib.rs#L76-L85
-fn convert_blocking_json<F>(f: F) -> Poll<Value, ErrBox>
-where
- F: FnOnce() -> Result<Value, ErrBox>,
-{
- use futures::Async::*;
- match tokio_threadpool::blocking(f) {
- Ok(Ready(Ok(v))) => Ok(Ready(v)),
- Ok(Ready(Err(err))) => Err(err),
- Ok(NotReady) => Ok(NotReady),
- Err(err) => panic!("blocking error {}", err),
- }
-}
-
pub fn blocking_json<F>(is_sync: bool, f: F) -> Result<JsonOp, ErrBox>
where
- F: 'static + Send + FnOnce() -> Result<Value, ErrBox>,
+ F: 'static + Send + FnOnce() -> Result<Value, ErrBox> + Unpin,
{
if is_sync {
Ok(JsonOp::Sync(f()?))
} else {
- Ok(JsonOp::Async(Box::new(futures::sync::oneshot::spawn(
- tokio_util::poll_fn(move || convert_blocking_json(f)),
- &tokio_executor::DefaultExecutor::current(),
- ))))
+ //TODO(afinch7) replace this with something more efficent.
+ let pool = futures::executor::ThreadPool::new().unwrap();
+ let handle = pool
+ .spawn_with_handle(futures::future::lazy(move |_cx| f()))
+ .unwrap();
+ Ok(JsonOp::Async(handle.boxed()))
}
}
diff --git a/cli/ops/dispatch_minimal.rs b/cli/ops/dispatch_minimal.rs
index 355a24634..13738ba56 100644
--- a/cli/ops/dispatch_minimal.rs
+++ b/cli/ops/dispatch_minimal.rs
@@ -12,9 +12,11 @@ use deno::CoreOp;
use deno::ErrBox;
use deno::Op;
use deno::PinnedBuf;
-use futures::Future;
+use futures::future::FutureExt;
+use std::future::Future;
+use std::pin::Pin;
-pub type MinimalOp = dyn Future<Item = i32, Error = ErrBox> + Send;
+pub type MinimalOp = dyn Future<Output = Result<i32, ErrBox>> + Send;
#[derive(Copy, Clone, Debug, PartialEq)]
// This corresponds to RecordMinimal on the TS side.
@@ -113,7 +115,7 @@ fn test_parse_min_record() {
pub fn minimal_op<D>(d: D) -> impl Fn(&[u8], Option<PinnedBuf>) -> CoreOp
where
- D: Fn(i32, Option<PinnedBuf>) -> Box<MinimalOp>,
+ D: Fn(i32, Option<PinnedBuf>) -> Pin<Box<MinimalOp>>,
{
move |control: &[u8], zero_copy: Option<PinnedBuf>| {
let mut record = match parse_min_record(control) {
@@ -136,21 +138,19 @@ where
let min_op = d(rid, zero_copy);
// Convert to CoreOp
- let fut = Box::new(min_op.then(move |result| -> Result<Buf, ()> {
- match result {
- Ok(r) => {
- record.result = r;
- Ok(record.into())
- }
- Err(err) => {
- let error_record = ErrorRecord {
- promise_id: record.promise_id,
- arg: -1,
- error_code: err.kind() as i32,
- error_message: err.to_string().as_bytes().to_owned(),
- };
- Ok(error_record.into())
- }
+ let fut = Box::new(min_op.then(move |result| match result {
+ Ok(r) => {
+ record.result = r;
+ futures::future::ok(record.into())
+ }
+ Err(err) => {
+ let error_record = ErrorRecord {
+ promise_id: record.promise_id,
+ arg: -1,
+ error_code: err.kind() as i32,
+ error_message: err.to_string().as_bytes().to_owned(),
+ };
+ futures::future::ok(error_record.into())
}
}));
@@ -160,9 +160,9 @@ where
// tokio_util::block_on.
// This block is only exercised for readSync and writeSync, which I think
// works since they're simple polling futures.
- Op::Sync(fut.wait().unwrap())
+ Op::Sync(futures::executor::block_on(fut).unwrap())
} else {
- Op::Async(fut)
+ Op::Async(fut.boxed())
}
}
}
diff --git a/cli/ops/fetch.rs b/cli/ops/fetch.rs
index a1c0fe29c..25cf99812 100644
--- a/cli/ops/fetch.rs
+++ b/cli/ops/fetch.rs
@@ -6,11 +6,11 @@ use crate::http_util::get_client;
use crate::ops::json_op;
use crate::state::ThreadSafeState;
use deno::*;
+use futures::future::FutureExt;
+use futures::future::TryFutureExt;
use http::header::HeaderName;
use http::header::HeaderValue;
use http::Method;
-use hyper;
-use hyper::rt::Future;
use std;
use std::convert::From;
@@ -56,26 +56,32 @@ pub fn op_fetch(
}
debug!("Before fetch {}", url);
let state_ = state.clone();
- let future = request.send().map_err(ErrBox::from).and_then(move |res| {
- let status = res.status();
- let mut res_headers = Vec::new();
- for (key, val) in res.headers().iter() {
- res_headers.push((key.to_string(), val.to_str().unwrap().to_owned()));
- }
+ let future = futures::compat::Compat01As03::new(request.send())
+ .map_err(ErrBox::from)
+ .and_then(move |res| {
+ debug!("Fetch response {}", url);
+ let status = res.status();
+ let mut res_headers = Vec::new();
+ for (key, val) in res.headers().iter() {
+ res_headers.push((key.to_string(), val.to_str().unwrap().to_owned()));
+ }
- let body = HttpBody::from(res.into_body());
- let mut table = state_.lock_resource_table();
- let rid = table.add("httpBody", Box::new(StreamResource::HttpBody(body)));
+ let body = HttpBody::from(res.into_body());
+ let mut table = state_.lock_resource_table();
+ let rid = table.add(
+ "httpBody",
+ Box::new(StreamResource::HttpBody(Box::new(body))),
+ );
- let json_res = json!({
- "bodyRid": rid,
- "status": status.as_u16(),
- "statusText": status.canonical_reason().unwrap_or(""),
- "headers": res_headers
- });
+ let json_res = json!({
+ "bodyRid": rid,
+ "status": status.as_u16(),
+ "statusText": status.canonical_reason().unwrap_or(""),
+ "headers": res_headers
+ });
- futures::future::ok(json_res)
- });
+ futures::future::ok(json_res)
+ });
- Ok(JsonOp::Async(Box::new(future)))
+ Ok(JsonOp::Async(future.boxed()))
}
diff --git a/cli/ops/files.rs b/cli/ops/files.rs
index fc1b8e7d8..1c041b38d 100644
--- a/cli/ops/files.rs
+++ b/cli/ops/files.rs
@@ -8,11 +8,15 @@ use crate::fs as deno_fs;
use crate::ops::json_op;
use crate::state::ThreadSafeState;
use deno::*;
-use futures::Future;
-use futures::Poll;
+use futures::future::FutureExt;
+use futures::future::TryFutureExt;
use std;
use std::convert::From;
+use std::future::Future;
use std::io::SeekFrom;
+use std::pin::Pin;
+use std::task::Context;
+use std::task::Poll;
use tokio;
pub fn init(i: &mut Isolate, s: &ThreadSafeState) {
@@ -88,19 +92,21 @@ fn op_open(
}
let is_sync = args.promise_id.is_none();
- let op = open_options.open(filename).map_err(ErrBox::from).and_then(
- move |fs_file| {
- let mut table = state_.lock_resource_table();
- let rid = table.add("fsFile", Box::new(StreamResource::FsFile(fs_file)));
- futures::future::ok(json!(rid))
- },
- );
+ let op = futures::compat::Compat01As03::new(tokio::prelude::Future::map_err(
+ open_options.open(filename),
+ ErrBox::from,
+ ))
+ .and_then(move |fs_file| {
+ let mut table = state_.lock_resource_table();
+ let rid = table.add("fsFile", Box::new(StreamResource::FsFile(fs_file)));
+ futures::future::ok(json!(rid))
+ });
if is_sync {
- let buf = op.wait()?;
+ let buf = futures::executor::block_on(op)?;
Ok(JsonOp::Sync(buf))
} else {
- Ok(JsonOp::Async(Box::new(op)))
+ Ok(JsonOp::Async(op.boxed()))
}
}
@@ -128,21 +134,27 @@ pub struct SeekFuture {
}
impl Future for SeekFuture {
- type Item = u64;
- type Error = ErrBox;
+ type Output = Result<u64, ErrBox>;
- fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
- let mut table = self.state.lock_resource_table();
+ fn poll(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<Self::Output> {
+ let inner = self.get_mut();
+ let mut table = inner.state.lock_resource_table();
let resource = table
- .get_mut::<StreamResource>(self.rid)
+ .get_mut::<StreamResource>(inner.rid)
.ok_or_else(bad_resource)?;
let tokio_file = match resource {
StreamResource::FsFile(ref mut file) => file,
- _ => return Err(bad_resource()),
+ _ => return Poll::Ready(Err(bad_resource())),
};
- tokio_file.poll_seek(self.seek_from).map_err(ErrBox::from)
+ use tokio::prelude::Async::*;
+
+ match tokio_file.poll_seek(inner.seek_from).map_err(ErrBox::from) {
+ Ok(Ready(v)) => Poll::Ready(Ok(v)),
+ Err(err) => Poll::Ready(Err(err)),
+ Ok(NotReady) => Poll::Pending,
+ }
}
}
@@ -185,9 +197,9 @@ fn op_seek(
let op = fut.and_then(move |_| futures::future::ok(json!({})));
if args.promise_id.is_none() {
- let buf = op.wait()?;
+ let buf = futures::executor::block_on(op)?;
Ok(JsonOp::Sync(buf))
} else {
- Ok(JsonOp::Async(Box::new(op)))
+ Ok(JsonOp::Async(op.boxed()))
}
}
diff --git a/cli/ops/io.rs b/cli/ops/io.rs
index 30c933999..11afb1891 100644
--- a/cli/ops/io.rs
+++ b/cli/ops/io.rs
@@ -8,11 +8,16 @@ use deno::ErrBox;
use deno::Resource;
use deno::*;
use futures;
-use futures::Future;
-use futures::Poll;
+use futures::compat::AsyncRead01CompatExt;
+use futures::compat::AsyncWrite01CompatExt;
+use futures::future::FutureExt;
+use futures::io::{AsyncRead, AsyncWrite};
use std;
+use std::future::Future;
+use std::pin::Pin;
+use std::task::Context;
+use std::task::Poll;
use tokio;
-use tokio::io::{AsyncRead, AsyncWrite};
use tokio::net::TcpStream;
use tokio_process;
use tokio_rustls::client::TlsStream as ClientTlsStream;
@@ -80,7 +85,7 @@ pub enum StreamResource {
TcpStream(tokio::net::TcpStream),
ServerTlsStream(Box<ServerTlsStream<TcpStream>>),
ClientTlsStream(Box<ClientTlsStream<TcpStream>>),
- HttpBody(HttpBody),
+ HttpBody(Box<HttpBody>),
ChildStdin(tokio_process::ChildStdin),
ChildStdout(tokio_process::ChildStdout),
ChildStderr(tokio_process::ChildStderr),
@@ -91,26 +96,49 @@ impl Resource for StreamResource {}
/// `DenoAsyncRead` is the same as the `tokio_io::AsyncRead` trait
/// but uses an `ErrBox` error instead of `std::io:Error`
pub trait DenoAsyncRead {
- fn poll_read(&mut self, buf: &mut [u8]) -> Poll<usize, ErrBox>;
+ fn poll_read(
+ self: Pin<&mut Self>,
+ cx: &mut Context,
+ buf: &mut [u8],
+ ) -> Poll<Result<usize, ErrBox>>;
}
impl DenoAsyncRead for StreamResource {
- fn poll_read(&mut self, buf: &mut [u8]) -> Poll<usize, ErrBox> {
- let r = match self {
- StreamResource::FsFile(ref mut f) => f.poll_read(buf),
- StreamResource::Stdin(ref mut f) => f.poll_read(buf),
- StreamResource::TcpStream(ref mut f) => f.poll_read(buf),
- StreamResource::ClientTlsStream(ref mut f) => f.poll_read(buf),
- StreamResource::ServerTlsStream(ref mut f) => f.poll_read(buf),
- StreamResource::HttpBody(ref mut f) => f.poll_read(buf),
- StreamResource::ChildStdout(ref mut f) => f.poll_read(buf),
- StreamResource::ChildStderr(ref mut f) => f.poll_read(buf),
+ fn poll_read(
+ self: Pin<&mut Self>,
+ cx: &mut Context,
+ buf: &mut [u8],
+ ) -> Poll<Result<usize, ErrBox>> {
+ let inner = self.get_mut();
+ let mut f: Box<dyn AsyncRead + Unpin> = match inner {
+ StreamResource::FsFile(f) => Box::new(AsyncRead01CompatExt::compat(f)),
+ StreamResource::Stdin(f) => Box::new(AsyncRead01CompatExt::compat(f)),
+ StreamResource::TcpStream(f) => Box::new(AsyncRead01CompatExt::compat(f)),
+ StreamResource::ClientTlsStream(f) => {
+ Box::new(AsyncRead01CompatExt::compat(f))
+ }
+ StreamResource::ServerTlsStream(f) => {
+ Box::new(AsyncRead01CompatExt::compat(f))
+ }
+ StreamResource::HttpBody(f) => Box::new(f),
+ StreamResource::ChildStdout(f) => {
+ Box::new(AsyncRead01CompatExt::compat(f))
+ }
+ StreamResource::ChildStderr(f) => {
+ Box::new(AsyncRead01CompatExt::compat(f))
+ }
_ => {
- return Err(bad_resource());
+ return Poll::Ready(Err(bad_resource()));
}
};
- r.map_err(ErrBox::from)
+ let r = AsyncRead::poll_read(Pin::new(&mut f), cx, buf);
+
+ match r {
+ Poll::Ready(Err(e)) => Poll::Ready(Err(ErrBox::from(e))),
+ Poll::Ready(Ok(v)) => Poll::Ready(Ok(v)),
+ Poll::Pending => Poll::Pending,
+ }
}
}
@@ -150,23 +178,31 @@ pub struct Read<T> {
impl<T> Future for Read<T>
where
- T: AsMut<[u8]>,
+ T: AsMut<[u8]> + Unpin,
{
- type Item = usize;
- type Error = ErrBox;
+ type Output = Result<i32, ErrBox>;
- fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
- if self.io_state == IoState::Done {
+ fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
+ let inner = self.get_mut();
+ if inner.io_state == IoState::Done {
panic!("poll a Read after it's done");
}
- let mut table = self.state.lock_resource_table();
+ let mut table = inner.state.lock_resource_table();
let resource = table
- .get_mut::<StreamResource>(self.rid)
+ .get_mut::<StreamResource>(inner.rid)
.ok_or_else(bad_resource)?;
- let nread = try_ready!(resource.poll_read(&mut self.buf.as_mut()[..]));
- self.io_state = IoState::Done;
- Ok(nread.into())
+ let nread = match DenoAsyncRead::poll_read(
+ Pin::new(resource),
+ cx,
+ &mut inner.buf.as_mut()[..],
+ ) {
+ Poll::Ready(Ok(v)) => v,
+ Poll::Ready(Err(err)) => return Poll::Ready(Err(err)),
+ Poll::Pending => return Poll::Pending,
+ };
+ inner.io_state = IoState::Done;
+ Poll::Ready(Ok(nread as i32))
}
}
@@ -174,49 +210,76 @@ pub fn op_read(
state: &ThreadSafeState,
rid: i32,
zero_copy: Option<PinnedBuf>,
-) -> Box<MinimalOp> {
+) -> Pin<Box<MinimalOp>> {
debug!("read rid={}", rid);
let zero_copy = match zero_copy {
None => {
- return Box::new(futures::future::err(deno_error::no_buffer_specified()));
+ return futures::future::err(deno_error::no_buffer_specified()).boxed()
}
Some(buf) => buf,
};
- let fut = read(state, rid as u32, zero_copy)
- .map_err(ErrBox::from)
- .and_then(move |nread| Ok(nread as i32));
+ let fut = read(state, rid as u32, zero_copy);
- Box::new(fut)
+ fut.boxed()
}
/// `DenoAsyncWrite` is the same as the `tokio_io::AsyncWrite` trait
/// but uses an `ErrBox` error instead of `std::io:Error`
pub trait DenoAsyncWrite {
- fn poll_write(&mut self, buf: &[u8]) -> Poll<usize, ErrBox>;
-
- fn shutdown(&mut self) -> Poll<(), ErrBox>;
+ fn poll_write(
+ self: Pin<&mut Self>,
+ cx: &mut Context,
+ buf: &[u8],
+ ) -> Poll<Result<usize, ErrBox>>;
+
+ fn poll_close(
+ self: Pin<&mut Self>,
+ cx: &mut Context,
+ ) -> Poll<Result<(), ErrBox>>;
}
impl DenoAsyncWrite for StreamResource {
- fn poll_write(&mut self, buf: &[u8]) -> Poll<usize, ErrBox> {
- let r = match self {
- StreamResource::FsFile(ref mut f) => f.poll_write(buf),
- StreamResource::Stdout(ref mut f) => f.poll_write(buf),
- StreamResource::Stderr(ref mut f) => f.poll_write(buf),
- StreamResource::TcpStream(ref mut f) => f.poll_write(buf),
- StreamResource::ClientTlsStream(ref mut f) => f.poll_write(buf),
- StreamResource::ServerTlsStream(ref mut f) => f.poll_write(buf),
- StreamResource::ChildStdin(ref mut f) => f.poll_write(buf),
+ fn poll_write(
+ self: Pin<&mut Self>,
+ cx: &mut Context,
+ buf: &[u8],
+ ) -> Poll<Result<usize, ErrBox>> {
+ let inner = self.get_mut();
+ let mut f: Box<dyn AsyncWrite + Unpin> = match inner {
+ StreamResource::FsFile(f) => Box::new(AsyncWrite01CompatExt::compat(f)),
+ StreamResource::Stdout(f) => Box::new(AsyncWrite01CompatExt::compat(f)),
+ StreamResource::Stderr(f) => Box::new(AsyncWrite01CompatExt::compat(f)),
+ StreamResource::TcpStream(f) => {
+ Box::new(AsyncWrite01CompatExt::compat(f))
+ }
+ StreamResource::ClientTlsStream(f) => {
+ Box::new(AsyncWrite01CompatExt::compat(f))
+ }
+ StreamResource::ServerTlsStream(f) => {
+ Box::new(AsyncWrite01CompatExt::compat(f))
+ }
+ StreamResource::ChildStdin(f) => {
+ Box::new(AsyncWrite01CompatExt::compat(f))
+ }
_ => {
- return Err(bad_resource());
+ return Poll::Ready(Err(bad_resource()));
}
};
- r.map_err(ErrBox::from)
+ let r = AsyncWrite::poll_write(Pin::new(&mut f), cx, buf);
+
+ match r {
+ Poll::Ready(Err(e)) => Poll::Ready(Err(ErrBox::from(e))),
+ Poll::Ready(Ok(v)) => Poll::Ready(Ok(v)),
+ Poll::Pending => Poll::Pending,
+ }
}
- fn shutdown(&mut self) -> futures::Poll<(), ErrBox> {
+ fn poll_close(
+ self: Pin<&mut Self>,
+ _cx: &mut Context,
+ ) -> Poll<Result<(), ErrBox>> {
unimplemented!()
}
}
@@ -250,23 +313,31 @@ where
/// that error type is `ErrBox` instead of `std::io::Error`.
impl<T> Future for Write<T>
where
- T: AsRef<[u8]>,
+ T: AsRef<[u8]> + Unpin,
{
- type Item = usize;
- type Error = ErrBox;
+ type Output = Result<i32, ErrBox>;
- fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
- if self.io_state == IoState::Done {
+ fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
+ let inner = self.get_mut();
+ if inner.io_state == IoState::Done {
panic!("poll a Read after it's done");
}
- let mut table = self.state.lock_resource_table();
+ let mut table = inner.state.lock_resource_table();
let resource = table
- .get_mut::<StreamResource>(self.rid)
+ .get_mut::<StreamResource>(inner.rid)
.ok_or_else(bad_resource)?;
- let nwritten = try_ready!(resource.poll_write(self.buf.as_ref()));
- self.io_state = IoState::Done;
- Ok(nwritten.into())
+ let nwritten = match DenoAsyncWrite::poll_write(
+ Pin::new(resource),
+ cx,
+ inner.buf.as_ref(),
+ ) {
+ Poll::Ready(Ok(v)) => v,
+ Poll::Ready(Err(err)) => return Poll::Ready(Err(err)),
+ Poll::Pending => return Poll::Pending,
+ };
+ inner.io_state = IoState::Done;
+ Poll::Ready(Ok(nwritten as i32))
}
}
@@ -274,18 +345,16 @@ pub fn op_write(
state: &ThreadSafeState,
rid: i32,
zero_copy: Option<PinnedBuf>,
-) -> Box<MinimalOp> {
+) -> Pin<Box<MinimalOp>> {
debug!("write rid={}", rid);
let zero_copy = match zero_copy {
None => {
- return Box::new(futures::future::err(deno_error::no_buffer_specified()));
+ return futures::future::err(deno_error::no_buffer_specified()).boxed()
}
Some(buf) => buf,
};
- let fut = write(state, rid as u32, zero_copy)
- .map_err(ErrBox::from)
- .and_then(move |nwritten| Ok(nwritten as i32));
+ let fut = write(state, rid as u32, zero_copy);
- Box::new(fut)
+ fut.boxed()
}
diff --git a/cli/ops/net.rs b/cli/ops/net.rs
index 2fe81e140..929b87dde 100644
--- a/cli/ops/net.rs
+++ b/cli/ops/net.rs
@@ -7,14 +7,20 @@ use crate::resolve_addr::resolve_addr;
use crate::state::ThreadSafeState;
use deno::Resource;
use deno::*;
-use futures::Async;
-use futures::Future;
-use futures::Poll;
+use futures::future::FutureExt;
+use futures::future::TryFutureExt;
+use futures::stream::StreamExt;
+use futures::stream::TryStreamExt;
use std;
use std::convert::From;
+use std::future::Future;
use std::net::Shutdown;
use std::net::SocketAddr;
+use std::pin::Pin;
+use std::task::Context;
+use std::task::Poll;
use tokio;
+use tokio::net::tcp::Incoming;
use tokio::net::TcpListener;
use tokio::net::TcpStream;
@@ -49,17 +55,17 @@ pub struct Accept {
}
impl Future for Accept {
- type Item = (TcpStream, SocketAddr);
- type Error = ErrBox;
+ type Output = Result<(TcpStream, SocketAddr), ErrBox>;
- fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
- if self.accept_state == AcceptState::Done {
+ fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
+ let inner = self.get_mut();
+ if inner.accept_state == AcceptState::Done {
panic!("poll Accept after it's done");
}
- let mut table = self.state.lock_resource_table();
+ let mut table = inner.state.lock_resource_table();
let listener_resource = table
- .get_mut::<TcpListenerResource>(self.rid)
+ .get_mut::<TcpListenerResource>(inner.rid)
.ok_or_else(|| {
let e = std::io::Error::new(
std::io::ErrorKind::Other,
@@ -68,44 +74,50 @@ impl Future for Accept {
ErrBox::from(e)
})?;
- let listener = &mut listener_resource.listener;
+ let mut listener =
+ futures::compat::Compat01As03::new(&mut listener_resource.listener)
+ .map_err(ErrBox::from);
- if self.accept_state == AcceptState::Eager {
+ if inner.accept_state == AcceptState::Eager {
// Similar to try_ready!, but also track/untrack accept task
// in TcpListener resource.
// In this way, when the listener is closed, the task can be
// notified to error out (instead of stuck forever).
- match listener.poll_accept().map_err(ErrBox::from) {
- Ok(Async::Ready((stream, addr))) => {
- self.accept_state = AcceptState::Done;
- return Ok((stream, addr).into());
+ match listener.poll_next_unpin(cx) {
+ Poll::Ready(Some(Ok(stream))) => {
+ inner.accept_state = AcceptState::Done;
+ let addr = stream.peer_addr().unwrap();
+ return Poll::Ready(Ok((stream, addr)));
}
- Ok(Async::NotReady) => {
- self.accept_state = AcceptState::Pending;
- return Ok(Async::NotReady);
+ Poll::Pending => {
+ inner.accept_state = AcceptState::Pending;
+ return Poll::Pending;
}
- Err(e) => {
- self.accept_state = AcceptState::Done;
- return Err(e);
+ Poll::Ready(Some(Err(e))) => {
+ inner.accept_state = AcceptState::Done;
+ return Poll::Ready(Err(e));
}
+ _ => unreachable!(),
}
}
- match listener.poll_accept().map_err(ErrBox::from) {
- Ok(Async::Ready((stream, addr))) => {
+ match listener.poll_next_unpin(cx) {
+ Poll::Ready(Some(Ok(stream))) => {
listener_resource.untrack_task();
- self.accept_state = AcceptState::Done;
- Ok((stream, addr).into())
+ inner.accept_state = AcceptState::Done;
+ let addr = stream.peer_addr().unwrap();
+ Poll::Ready(Ok((stream, addr)))
}
- Ok(Async::NotReady) => {
- listener_resource.track_task()?;
- Ok(Async::NotReady)
+ Poll::Pending => {
+ listener_resource.track_task(cx)?;
+ Poll::Pending
}
- Err(e) => {
+ Poll::Ready(Some(Err(e))) => {
listener_resource.untrack_task();
- self.accept_state = AcceptState::Done;
- Err(e)
+ inner.accept_state = AcceptState::Done;
+ Poll::Ready(Err(e))
}
+ _ => unreachable!(),
}
}
}
@@ -130,12 +142,18 @@ fn op_accept(
let op = accept(state, rid)
.and_then(move |(tcp_stream, _socket_addr)| {
- let local_addr = tcp_stream.local_addr()?;
- let remote_addr = tcp_stream.peer_addr()?;
+ let local_addr = match tcp_stream.local_addr() {
+ Ok(v) => v,
+ Err(e) => return futures::future::err(ErrBox::from(e)),
+ };
+ let remote_addr = match tcp_stream.peer_addr() {
+ Ok(v) => v,
+ Err(e) => return futures::future::err(ErrBox::from(e)),
+ };
let mut table = state_.lock_resource_table();
let rid =
table.add("tcpStream", Box::new(StreamResource::TcpStream(tcp_stream)));
- Ok((rid, local_addr, remote_addr))
+ futures::future::ok((rid, local_addr, remote_addr))
})
.map_err(ErrBox::from)
.and_then(move |(rid, local_addr, remote_addr)| {
@@ -146,7 +164,7 @@ fn op_accept(
}))
});
- Ok(JsonOp::Async(Box::new(op)))
+ Ok(JsonOp::Async(op.boxed()))
}
#[derive(Deserialize)]
@@ -167,15 +185,21 @@ fn op_dial(
state.check_net(&args.hostname, args.port)?;
let op = resolve_addr(&args.hostname, args.port).and_then(move |addr| {
- TcpStream::connect(&addr)
+ futures::compat::Compat01As03::new(TcpStream::connect(&addr))
.map_err(ErrBox::from)
.and_then(move |tcp_stream| {
- let local_addr = tcp_stream.local_addr()?;
- let remote_addr = tcp_stream.peer_addr()?;
+ let local_addr = match tcp_stream.local_addr() {
+ Ok(v) => v,
+ Err(e) => return futures::future::err(ErrBox::from(e)),
+ };
+ let remote_addr = match tcp_stream.peer_addr() {
+ Ok(v) => v,
+ Err(e) => return futures::future::err(ErrBox::from(e)),
+ };
let mut table = state_.lock_resource_table();
let rid = table
.add("tcpStream", Box::new(StreamResource::TcpStream(tcp_stream)));
- Ok((rid, local_addr, remote_addr))
+ futures::future::ok((rid, local_addr, remote_addr))
})
.map_err(ErrBox::from)
.and_then(move |(rid, local_addr, remote_addr)| {
@@ -187,7 +211,7 @@ fn op_dial(
})
});
- Ok(JsonOp::Async(Box::new(op)))
+ Ok(JsonOp::Async(op.boxed()))
}
#[derive(Deserialize)]
@@ -235,8 +259,8 @@ struct ListenArgs {
#[allow(dead_code)]
struct TcpListenerResource {
- listener: tokio::net::TcpListener,
- task: Option<futures::task::Task>,
+ listener: Incoming,
+ waker: Option<futures::task::AtomicWaker>,
local_addr: SocketAddr,
}
@@ -244,7 +268,7 @@ impl Resource for TcpListenerResource {}
impl Drop for TcpListenerResource {
fn drop(&mut self) {
- self.notify_task();
+ self.wake_task();
}
}
@@ -253,12 +277,12 @@ impl TcpListenerResource {
/// can be notified when listener is closed.
///
/// Throws an error if another task is already tracked.
- pub fn track_task(&mut self) -> Result<(), ErrBox> {
+ pub fn track_task(&mut self, cx: &Context) -> Result<(), ErrBox> {
// Currently, we only allow tracking a single accept task for a listener.
// This might be changed in the future with multiple workers.
// Caveat: TcpListener by itself also only tracks an accept task at a time.
// See https://github.com/tokio-rs/tokio/issues/846#issuecomment-454208883
- if self.task.is_some() {
+ if self.waker.is_some() {
let e = std::io::Error::new(
std::io::ErrorKind::Other,
"Another accept task is ongoing",
@@ -266,22 +290,24 @@ impl TcpListenerResource {
return Err(ErrBox::from(e));
}
- self.task.replace(futures::task::current());
+ let waker = futures::task::AtomicWaker::new();
+ waker.register(cx.waker());
+ self.waker.replace(waker);
Ok(())
}
/// Notifies a task when listener is closed so accept future can resolve.
- pub fn notify_task(&mut self) {
- if let Some(task) = self.task.take() {
- task.notify();
+ pub fn wake_task(&mut self) {
+ if let Some(waker) = self.waker.as_ref() {
+ waker.wake();
}
}
/// Stop tracking a task.
/// Happens when the task is done and thus no further tracking is needed.
pub fn untrack_task(&mut self) {
- if self.task.is_some() {
- self.task.take();
+ if self.waker.is_some() {
+ self.waker.take();
}
}
}
@@ -296,17 +322,19 @@ fn op_listen(
state.check_net(&args.hostname, args.port)?;
- let addr = resolve_addr(&args.hostname, args.port).wait()?;
+ let addr =
+ futures::executor::block_on(resolve_addr(&args.hostname, args.port))?;
let listener = TcpListener::bind(&addr)?;
let local_addr = listener.local_addr()?;
let local_addr_str = local_addr.to_string();
let listener_resource = TcpListenerResource {
- listener,
- task: None,
+ listener: listener.incoming(),
+ waker: None,
local_addr,
};
let mut table = state.lock_resource_table();
let rid = table.add("tcpListener", Box::new(listener_resource));
+ debug!("New listener {} {}", rid, local_addr_str);
Ok(JsonOp::Sync(json!({
"rid": rid,
diff --git a/cli/ops/process.rs b/cli/ops/process.rs
index 237b02fd0..a267130ec 100644
--- a/cli/ops/process.rs
+++ b/cli/ops/process.rs
@@ -7,12 +7,18 @@ use crate::signal::kill;
use crate::state::ThreadSafeState;
use deno::*;
use futures;
-use futures::Future;
-use futures::Poll;
+use futures::future::FutureExt;
+use futures::future::TryFutureExt;
+use futures::task::SpawnExt;
use std;
use std::convert::From;
+use std::future::Future;
+use std::pin::Pin;
use std::process::Command;
use std::process::ExitStatus;
+use std::task::Context;
+use std::task::Poll;
+use tokio::prelude::Async;
use tokio_process::CommandExt;
#[cfg(unix)]
@@ -33,19 +39,23 @@ struct CloneFileFuture {
}
impl Future for CloneFileFuture {
- type Item = tokio::fs::File;
- type Error = ErrBox;
+ type Output = Result<tokio::fs::File, ErrBox>;
- fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
- let mut table = self.state.lock_resource_table();
+ fn poll(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<Self::Output> {
+ let inner = self.get_mut();
+ let mut table = inner.state.lock_resource_table();
let repr = table
- .get_mut::<StreamResource>(self.rid)
+ .get_mut::<StreamResource>(inner.rid)
.ok_or_else(bad_resource)?;
match repr {
StreamResource::FsFile(ref mut file) => {
- file.poll_try_clone().map_err(ErrBox::from)
+ match file.poll_try_clone().map_err(ErrBox::from) {
+ Err(err) => Poll::Ready(Err(err)),
+ Ok(Async::Ready(v)) => Poll::Ready(Ok(v)),
+ Ok(Async::NotReady) => Poll::Pending,
+ }
}
- _ => Err(bad_resource()),
+ _ => Poll::Ready(Err(bad_resource())),
}
}
}
@@ -54,11 +64,10 @@ fn clone_file(
rid: u32,
state: &ThreadSafeState,
) -> Result<std::fs::File, ErrBox> {
- (CloneFileFuture {
+ futures::executor::block_on(CloneFileFuture {
rid,
state: state.clone(),
})
- .wait()
.map(|f| f.into_std())
}
@@ -86,7 +95,7 @@ struct RunArgs {
}
struct ChildResource {
- child: tokio_process::Child,
+ child: futures::compat::Compat01As03<tokio_process::Child>,
}
impl Resource for ChildResource {}
@@ -179,7 +188,9 @@ fn op_run(
None => None,
};
- let child_resource = ChildResource { child };
+ let child_resource = ChildResource {
+ child: futures::compat::Compat01As03::new(child),
+ };
let child_rid = table.add("child", Box::new(child_resource));
Ok(JsonOp::Sync(json!({
@@ -197,16 +208,16 @@ pub struct ChildStatus {
}
impl Future for ChildStatus {
- type Item = ExitStatus;
- type Error = ErrBox;
+ type Output = Result<ExitStatus, ErrBox>;
- fn poll(&mut self) -> Poll<ExitStatus, ErrBox> {
- let mut table = self.state.lock_resource_table();
+ fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
+ let inner = self.get_mut();
+ let mut table = inner.state.lock_resource_table();
let child_resource = table
- .get_mut::<ChildResource>(self.rid)
+ .get_mut::<ChildResource>(inner.rid)
.ok_or_else(bad_resource)?;
let child = &mut child_resource.child;
- child.poll().map_err(ErrBox::from)
+ child.map_err(ErrBox::from).poll_unpin(cx)
}
}
@@ -251,7 +262,10 @@ fn op_run_status(
}))
});
- Ok(JsonOp::Async(Box::new(future)))
+ let pool = futures::executor::ThreadPool::new().unwrap();
+ let handle = pool.spawn_with_handle(future).unwrap();
+
+ Ok(JsonOp::Async(handle.boxed()))
}
#[derive(Deserialize)]
diff --git a/cli/ops/timers.rs b/cli/ops/timers.rs
index 9d87aaf5c..7223633f8 100644
--- a/cli/ops/timers.rs
+++ b/cli/ops/timers.rs
@@ -3,7 +3,7 @@ use super::dispatch_json::{Deserialize, JsonOp, Value};
use crate::ops::json_op;
use crate::state::ThreadSafeState;
use deno::*;
-use futures::Future;
+use futures::future::FutureExt;
use std;
use std::time::Duration;
use std::time::Instant;
@@ -51,7 +51,7 @@ fn op_global_timer(
.new_timeout(deadline)
.then(move |_| futures::future::ok(json!({})));
- Ok(JsonOp::Async(Box::new(f)))
+ Ok(JsonOp::Async(f.boxed()))
}
// Returns a milliseconds and nanoseconds subsec
diff --git a/cli/ops/tls.rs b/cli/ops/tls.rs
index 48419f76f..484b7057c 100644
--- a/cli/ops/tls.rs
+++ b/cli/ops/tls.rs
@@ -9,16 +9,22 @@ use crate::resolve_addr::resolve_addr;
use crate::state::ThreadSafeState;
use deno::Resource;
use deno::*;
-use futures::Async;
-use futures::Future;
-use futures::Poll;
+use futures::future::FutureExt;
+use futures::future::TryFutureExt;
+use futures::stream::StreamExt;
+use futures::stream::TryStreamExt;
use std;
use std::convert::From;
use std::fs::File;
+use std::future::Future;
use std::io::BufReader;
use std::net::SocketAddr;
+use std::pin::Pin;
use std::sync::Arc;
+use std::task::Context;
+use std::task::Poll;
use tokio;
+use tokio::net::tcp::Incoming;
use tokio::net::TcpListener;
use tokio::net::TcpStream;
use tokio_rustls::{rustls::ClientConfig, TlsConnector};
@@ -72,49 +78,63 @@ pub fn op_dial_tls(
}
let op = resolve_addr(&args.hostname, args.port).and_then(move |addr| {
- TcpStream::connect(&addr)
+ futures::compat::Compat01As03::new(TcpStream::connect(&addr))
.and_then(move |tcp_stream| {
- let local_addr = tcp_stream.local_addr()?;
- let remote_addr = tcp_stream.peer_addr()?;
+ let local_addr = match tcp_stream.local_addr() {
+ Ok(v) => v,
+ Err(e) => return futures::future::err(e),
+ };
+ let remote_addr = match tcp_stream.peer_addr() {
+ Ok(v) => v,
+ Err(e) => return futures::future::err(e),
+ };
let mut config = ClientConfig::new();
config
.root_store
.add_server_trust_anchors(&webpki_roots::TLS_SERVER_ROOTS);
if let Some(path) = cert_file {
- let key_file = File::open(path)?;
+ let key_file = match File::open(path) {
+ Ok(v) => v,
+ Err(e) => return futures::future::err(e),
+ };
let reader = &mut BufReader::new(key_file);
config.root_store.add_pem_file(reader).unwrap();
}
-
let tls_connector = TlsConnector::from(Arc::new(config));
- Ok((tls_connector, tcp_stream, local_addr, remote_addr))
+ futures::future::ok((
+ tls_connector,
+ tcp_stream,
+ local_addr,
+ remote_addr,
+ ))
})
.map_err(ErrBox::from)
.and_then(
move |(tls_connector, tcp_stream, local_addr, remote_addr)| {
let dnsname = DNSNameRef::try_from_ascii_str(&domain)
.expect("Invalid DNS lookup");
- tls_connector
- .connect(dnsname, tcp_stream)
- .map_err(ErrBox::from)
- .and_then(move |tls_stream| {
- let mut table = state_.lock_resource_table();
- let rid = table.add(
- "clientTlsStream",
- Box::new(StreamResource::ClientTlsStream(Box::new(tls_stream))),
- );
- futures::future::ok(json!({
- "rid": rid,
- "localAddr": local_addr.to_string(),
- "remoteAddr": remote_addr.to_string(),
- }))
- })
+ futures::compat::Compat01As03::new(
+ tls_connector.connect(dnsname, tcp_stream),
+ )
+ .map_err(ErrBox::from)
+ .and_then(move |tls_stream| {
+ let mut table = state_.lock_resource_table();
+ let rid = table.add(
+ "clientTlsStream",
+ Box::new(StreamResource::ClientTlsStream(Box::new(tls_stream))),
+ );
+ futures::future::ok(json!({
+ "rid": rid,
+ "localAddr": local_addr.to_string(),
+ "remoteAddr": remote_addr.to_string(),
+ }))
+ })
},
)
});
- Ok(JsonOp::Async(Box::new(op)))
+ Ok(JsonOp::Async(op.boxed()))
}
fn load_certs(path: &str) -> Result<Vec<Certificate>, ErrBox> {
@@ -177,9 +197,9 @@ fn load_keys(path: &str) -> Result<Vec<PrivateKey>, ErrBox> {
#[allow(dead_code)]
pub struct TlsListenerResource {
- listener: tokio::net::TcpListener,
+ listener: Incoming,
tls_acceptor: TlsAcceptor,
- task: Option<futures::task::Task>,
+ waker: Option<futures::task::AtomicWaker>,
local_addr: SocketAddr,
}
@@ -187,7 +207,7 @@ impl Resource for TlsListenerResource {}
impl Drop for TlsListenerResource {
fn drop(&mut self) {
- self.notify_task();
+ self.wake_task();
}
}
@@ -196,12 +216,12 @@ impl TlsListenerResource {
/// can be notified when listener is closed.
///
/// Throws an error if another task is already tracked.
- pub fn track_task(&mut self) -> Result<(), ErrBox> {
+ pub fn track_task(&mut self, cx: &Context) -> Result<(), ErrBox> {
// Currently, we only allow tracking a single accept task for a listener.
// This might be changed in the future with multiple workers.
// Caveat: TcpListener by itself also only tracks an accept task at a time.
// See https://github.com/tokio-rs/tokio/issues/846#issuecomment-454208883
- if self.task.is_some() {
+ if self.waker.is_some() {
let e = std::io::Error::new(
std::io::ErrorKind::Other,
"Another accept task is ongoing",
@@ -209,22 +229,24 @@ impl TlsListenerResource {
return Err(ErrBox::from(e));
}
- self.task.replace(futures::task::current());
+ let waker = futures::task::AtomicWaker::new();
+ waker.register(cx.waker());
+ self.waker.replace(waker);
Ok(())
}
/// Notifies a task when listener is closed so accept future can resolve.
- pub fn notify_task(&mut self) {
- if let Some(task) = self.task.take() {
- task.notify();
+ pub fn wake_task(&mut self) {
+ if let Some(waker) = self.waker.as_ref() {
+ waker.wake();
}
}
/// Stop tracking a task.
/// Happens when the task is done and thus no further tracking is needed.
pub fn untrack_task(&mut self) {
- if self.task.is_some() {
- self.task.take();
+ if self.waker.is_some() {
+ self.waker.take();
}
}
}
@@ -259,14 +281,15 @@ fn op_listen_tls(
.set_single_cert(load_certs(&cert_file)?, load_keys(&key_file)?.remove(0))
.expect("invalid key or certificate");
let tls_acceptor = TlsAcceptor::from(Arc::new(config));
- let addr = resolve_addr(&args.hostname, args.port).wait()?;
+ let addr =
+ futures::executor::block_on(resolve_addr(&args.hostname, args.port))?;
let listener = TcpListener::bind(&addr)?;
let local_addr = listener.local_addr()?;
let local_addr_str = local_addr.to_string();
let tls_listener_resource = TlsListenerResource {
- listener,
+ listener: listener.incoming(),
tls_acceptor,
- task: None,
+ waker: None,
local_addr,
};
let mut table = state.lock_resource_table();
@@ -302,17 +325,17 @@ pub struct AcceptTls {
}
impl Future for AcceptTls {
- type Item = (TcpStream, SocketAddr);
- type Error = ErrBox;
+ type Output = Result<(TcpStream, SocketAddr), ErrBox>;
- fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
- if self.accept_state == AcceptTlsState::Done {
+ fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
+ let inner = self.get_mut();
+ if inner.accept_state == AcceptTlsState::Done {
panic!("poll AcceptTls after it's done");
}
- let mut table = self.state.lock_resource_table();
+ let mut table = inner.state.lock_resource_table();
let listener_resource = table
- .get_mut::<TlsListenerResource>(self.rid)
+ .get_mut::<TlsListenerResource>(inner.rid)
.ok_or_else(|| {
let e = std::io::Error::new(
std::io::ErrorKind::Other,
@@ -321,44 +344,50 @@ impl Future for AcceptTls {
ErrBox::from(e)
})?;
- let listener = &mut listener_resource.listener;
+ let mut listener =
+ futures::compat::Compat01As03::new(&mut listener_resource.listener)
+ .map_err(ErrBox::from);
- if self.accept_state == AcceptTlsState::Eager {
+ if inner.accept_state == AcceptTlsState::Eager {
// Similar to try_ready!, but also track/untrack accept task
// in TcpListener resource.
// In this way, when the listener is closed, the task can be
// notified to error out (instead of stuck forever).
- match listener.poll_accept().map_err(ErrBox::from) {
- Ok(Async::Ready((stream, addr))) => {
- self.accept_state = AcceptTlsState::Done;
- return Ok((stream, addr).into());
+ match listener.poll_next_unpin(cx) {
+ Poll::Ready(Some(Ok(stream))) => {
+ inner.accept_state = AcceptTlsState::Done;
+ let addr = stream.peer_addr().unwrap();
+ return Poll::Ready(Ok((stream, addr)));
}
- Ok(Async::NotReady) => {
- self.accept_state = AcceptTlsState::Pending;
- return Ok(Async::NotReady);
+ Poll::Pending => {
+ inner.accept_state = AcceptTlsState::Pending;
+ return Poll::Pending;
}
- Err(e) => {
- self.accept_state = AcceptTlsState::Done;
- return Err(e);
+ Poll::Ready(Some(Err(e))) => {
+ inner.accept_state = AcceptTlsState::Done;
+ return Poll::Ready(Err(e));
}
+ _ => unreachable!(),
}
}
- match listener.poll_accept().map_err(ErrBox::from) {
- Ok(Async::Ready((stream, addr))) => {
+ match listener.poll_next_unpin(cx) {
+ Poll::Ready(Some(Ok(stream))) => {
listener_resource.untrack_task();
- self.accept_state = AcceptTlsState::Done;
- Ok((stream, addr).into())
+ inner.accept_state = AcceptTlsState::Done;
+ let addr = stream.peer_addr().unwrap();
+ Poll::Ready(Ok((stream, addr)))
}
- Ok(Async::NotReady) => {
- listener_resource.track_task()?;
- Ok(Async::NotReady)
+ Poll::Pending => {
+ listener_resource.track_task(cx)?;
+ Poll::Pending
}
- Err(e) => {
+ Poll::Ready(Some(Err(e))) => {
listener_resource.untrack_task();
- self.accept_state = AcceptTlsState::Done;
- Err(e)
+ inner.accept_state = AcceptTlsState::Done;
+ Poll::Ready(Err(e))
}
+ _ => unreachable!(),
}
}
}
@@ -379,9 +408,15 @@ fn op_accept_tls(
let state2 = state.clone();
let op = accept_tls(state, rid)
.and_then(move |(tcp_stream, _socket_addr)| {
- let local_addr = tcp_stream.local_addr()?;
- let remote_addr = tcp_stream.peer_addr()?;
- Ok((tcp_stream, local_addr, remote_addr))
+ let local_addr = match tcp_stream.local_addr() {
+ Ok(v) => v,
+ Err(e) => return futures::future::err(ErrBox::from(e)),
+ };
+ let remote_addr = match tcp_stream.peer_addr() {
+ Ok(v) => v,
+ Err(e) => return futures::future::err(ErrBox::from(e)),
+ };
+ futures::future::ok((tcp_stream, local_addr, remote_addr))
})
.and_then(move |(tcp_stream, local_addr, remote_addr)| {
let table = state1.lock_resource_table();
@@ -390,18 +425,18 @@ fn op_accept_tls(
.ok_or_else(bad_resource)
.expect("Can't find tls listener");
- resource
- .tls_acceptor
- .accept(tcp_stream)
- .map_err(ErrBox::from)
- .and_then(move |tls_stream| {
- let mut table = state2.lock_resource_table();
- let rid = table.add(
- "serverTlsStream",
- Box::new(StreamResource::ServerTlsStream(Box::new(tls_stream))),
- );
- Ok((rid, local_addr, remote_addr))
- })
+ futures::compat::Compat01As03::new(
+ resource.tls_acceptor.accept(tcp_stream),
+ )
+ .map_err(ErrBox::from)
+ .and_then(move |tls_stream| {
+ let mut table = state2.lock_resource_table();
+ let rid = table.add(
+ "serverTlsStream",
+ Box::new(StreamResource::ServerTlsStream(Box::new(tls_stream))),
+ );
+ futures::future::ok((rid, local_addr, remote_addr))
+ })
})
.and_then(move |(rid, local_addr, remote_addr)| {
futures::future::ok(json!({
@@ -411,5 +446,5 @@ fn op_accept_tls(
}))
});
- Ok(JsonOp::Async(Box::new(op)))
+ Ok(JsonOp::Async(op.boxed()))
}
diff --git a/cli/ops/workers.rs b/cli/ops/workers.rs
index cf7378a91..ee60c6824 100644
--- a/cli/ops/workers.rs
+++ b/cli/ops/workers.rs
@@ -7,16 +7,21 @@ use crate::deno_error::ErrorKind;
use crate::ops::json_op;
use crate::startup_data;
use crate::state::ThreadSafeState;
+use crate::tokio_util;
use crate::worker::Worker;
use deno::*;
use futures;
-use futures::Async;
-use futures::Future;
-use futures::Sink;
-use futures::Stream;
+use futures::future::FutureExt;
+use futures::future::TryFutureExt;
+use futures::sink::SinkExt;
+use futures::stream::StreamExt;
use std;
use std::convert::From;
+use std::future::Future;
+use std::pin::Pin;
use std::sync::atomic::Ordering;
+use std::task::Context;
+use std::task::Poll;
pub fn init(i: &mut Isolate, s: &ThreadSafeState) {
i.register_op(
@@ -52,13 +57,13 @@ struct GetMessageFuture {
}
impl Future for GetMessageFuture {
- type Item = Option<Buf>;
- type Error = ErrBox;
+ type Output = Option<Buf>;
- fn poll(&mut self) -> Result<Async<Self::Item>, Self::Error> {
- let mut channels = self.state.worker_channels.lock().unwrap();
+ fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
+ let inner = self.get_mut();
+ let mut channels = inner.state.worker_channels.lock().unwrap();
let receiver = &mut channels.receiver;
- receiver.poll().map_err(ErrBox::from)
+ receiver.poll_next_unpin(cx)
}
}
@@ -72,17 +77,15 @@ fn op_worker_get_message(
state: state.clone(),
};
- let op = op
- .map_err(move |_| -> ErrBox { unimplemented!() })
- .and_then(move |maybe_buf| {
- debug!("op_worker_get_message");
+ let op = op.then(move |maybe_buf| {
+ debug!("op_worker_get_message");
- futures::future::ok(json!({
- "data": maybe_buf.map(|buf| buf.to_owned())
- }))
- });
+ futures::future::ok(json!({
+ "data": maybe_buf.map(|buf| buf.to_owned())
+ }))
+ });
- Ok(JsonOp::Async(Box::new(op)))
+ Ok(JsonOp::Async(op.boxed()))
}
/// Post message to host as guest worker
@@ -94,9 +97,7 @@ fn op_worker_post_message(
let d = Vec::from(data.unwrap().as_ref()).into_boxed_slice();
let mut channels = state.worker_channels.lock().unwrap();
let sender = &mut channels.sender;
- sender
- .send(d)
- .wait()
+ futures::executor::block_on(sender.send(d))
.map_err(|e| DenoError::new(ErrorKind::Other, e.to_string()))?;
Ok(JsonOp::Sync(json!({})))
@@ -165,12 +166,35 @@ fn op_create_worker(
let op = worker
.execute_mod_async(&module_specifier, None, false)
- .and_then(move |()| Ok(exec_cb(worker)));
+ .and_then(move |()| futures::future::ok(exec_cb(worker)));
- let result = op.wait()?;
+ let result = tokio_util::block_on(op.boxed())?;
Ok(JsonOp::Sync(result))
}
+struct GetWorkerClosedFuture {
+ state: ThreadSafeState,
+ rid: ResourceId,
+}
+
+impl Future for GetWorkerClosedFuture {
+ type Output = Result<(), ErrBox>;
+
+ fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
+ let inner = self.get_mut();
+ let mut workers_table = inner.state.workers.lock().unwrap();
+ let maybe_worker = workers_table.get_mut(&inner.rid);
+ if maybe_worker.is_none() {
+ return Poll::Ready(Ok(()));
+ }
+ match maybe_worker.unwrap().poll_unpin(cx) {
+ Poll::Ready(Err(e)) => Poll::Ready(Err(e)),
+ Poll::Ready(Ok(_)) => Poll::Ready(Ok(())),
+ Poll::Pending => Poll::Pending,
+ }
+ }
+}
+
#[derive(Deserialize)]
struct HostGetWorkerClosedArgs {
id: i32,
@@ -185,18 +209,18 @@ fn op_host_get_worker_closed(
let args: HostGetWorkerClosedArgs = serde_json::from_value(args)?;
let id = args.id as u32;
let state_ = state.clone();
- let workers_table = state.workers.lock().unwrap();
- // TODO: handle bad worker id gracefully
- let worker = workers_table.get(&id).unwrap();
- let shared_worker_future = worker.clone().shared();
- let op = shared_worker_future.then(move |_result| {
+ let future = GetWorkerClosedFuture {
+ state: state.clone(),
+ rid: id,
+ };
+ let op = future.then(move |_result| {
let mut workers_table = state_.workers.lock().unwrap();
workers_table.remove(&id);
futures::future::ok(json!({}))
});
- Ok(JsonOp::Async(Box::new(op)))
+ Ok(JsonOp::Async(op.boxed()))
}
#[derive(Deserialize)]
@@ -225,7 +249,7 @@ fn op_host_get_message(
}))
});
- Ok(JsonOp::Async(Box::new(op)))
+ Ok(JsonOp::Async(op.boxed()))
}
#[derive(Deserialize)]
@@ -247,8 +271,7 @@ fn op_host_post_message(
let mut table = state.workers.lock().unwrap();
// TODO: don't return bad resource anymore
let worker = table.get_mut(&id).ok_or_else(bad_resource)?;
- worker
- .post_message(msg)
+ tokio_util::block_on(worker.post_message(msg).boxed())
.map_err(|e| DenoError::new(ErrorKind::Other, e.to_string()))?;
Ok(JsonOp::Sync(json!({})))
}