diff options
Diffstat (limited to 'cli/ops')
-rw-r--r-- | cli/ops/compiler.rs | 19 | ||||
-rw-r--r-- | cli/ops/dispatch_json.rs | 47 | ||||
-rw-r--r-- | cli/ops/dispatch_minimal.rs | 40 | ||||
-rw-r--r-- | cli/ops/fetch.rs | 46 | ||||
-rw-r--r-- | cli/ops/files.rs | 52 | ||||
-rw-r--r-- | cli/ops/io.rs | 197 | ||||
-rw-r--r-- | cli/ops/net.rs | 136 | ||||
-rw-r--r-- | cli/ops/process.rs | 54 | ||||
-rw-r--r-- | cli/ops/timers.rs | 4 | ||||
-rw-r--r-- | cli/ops/tls.rs | 201 | ||||
-rw-r--r-- | cli/ops/workers.rs | 87 |
11 files changed, 531 insertions, 352 deletions
diff --git a/cli/ops/compiler.rs b/cli/ops/compiler.rs index a722db6af..fdb62ca32 100644 --- a/cli/ops/compiler.rs +++ b/cli/ops/compiler.rs @@ -1,7 +1,8 @@ // Copyright 2018-2019 the Deno authors. All rights reserved. MIT license. use super::dispatch_json::{Deserialize, JsonOp, Value}; -use crate::futures::future::join_all; -use crate::futures::Future; +use crate::futures::future::try_join_all; +use crate::futures::future::FutureExt; +use crate::futures::future::TryFutureExt; use crate::msg; use crate::ops::json_op; use crate::state::ThreadSafeState; @@ -77,7 +78,7 @@ fn op_fetch_source_files( let global_state = state.global_state.clone(); - let future = join_all(futures) + let future = try_join_all(futures) .map_err(ErrBox::from) .and_then(move |files| { // We want to get an array of futures that resolves to @@ -88,17 +89,19 @@ fn op_fetch_source_files( // compile them into JS first! // This allows TS to do correct export types. if file.media_type == msg::MediaType::Wasm { - return futures::future::Either::A( + return futures::future::Either::Left( global_state .wasm_compiler .compile_async(global_state.clone(), &file) - .and_then(|compiled_mod| Ok((file, Some(compiled_mod.code)))), + .and_then(|compiled_mod| { + futures::future::ok((file, Some(compiled_mod.code))) + }), ); } - futures::future::Either::B(futures::future::ok((file, None))) + futures::future::Either::Right(futures::future::ok((file, None))) }) .collect(); - join_all(v) + try_join_all(v) }) .and_then(move |files_with_code| { let res = files_with_code @@ -120,7 +123,7 @@ fn op_fetch_source_files( futures::future::ok(res) }); - Ok(JsonOp::Async(Box::new(future))) + Ok(JsonOp::Async(future.boxed())) } #[derive(Deserialize)] diff --git a/cli/ops/dispatch_json.rs b/cli/ops/dispatch_json.rs index 5f1caac6f..38dc7932e 100644 --- a/cli/ops/dispatch_json.rs +++ b/cli/ops/dispatch_json.rs @@ -1,13 +1,15 @@ // Copyright 2018-2019 the Deno authors. All rights reserved. MIT license. -use crate::tokio_util; use deno::*; -use futures::Future; -use futures::Poll; +use futures::future::FutureExt; +use futures::task::SpawnExt; pub use serde_derive::Deserialize; use serde_json::json; pub use serde_json::Value; +use std::future::Future; +use std::pin::Pin; -pub type AsyncJsonOp = Box<dyn Future<Item = Value, Error = ErrBox> + Send>; +pub type AsyncJsonOp = + Pin<Box<dyn Future<Output = Result<Value, ErrBox>> + Send>>; pub enum JsonOp { Sync(Value), @@ -70,48 +72,35 @@ where } Ok(JsonOp::Async(fut)) => { assert!(promise_id.is_some()); - let fut2 = Box::new(fut.then(move |result| -> Result<Buf, ()> { - Ok(serialize_result(promise_id, result)) - })); - CoreOp::Async(fut2) + let fut2 = fut.then(move |result| { + futures::future::ok(serialize_result(promise_id, result)) + }); + CoreOp::Async(fut2.boxed()) } Err(sync_err) => { let buf = serialize_result(promise_id, Err(sync_err)); if is_sync { CoreOp::Sync(buf) } else { - CoreOp::Async(Box::new(futures::future::ok(buf))) + CoreOp::Async(futures::future::ok(buf).boxed()) } } } } } -// This is just type conversion. Implement From trait? -// See https://github.com/tokio-rs/tokio/blob/ffd73a64e7ec497622b7f939e38017afe7124dc4/tokio-fs/src/lib.rs#L76-L85 -fn convert_blocking_json<F>(f: F) -> Poll<Value, ErrBox> -where - F: FnOnce() -> Result<Value, ErrBox>, -{ - use futures::Async::*; - match tokio_threadpool::blocking(f) { - Ok(Ready(Ok(v))) => Ok(Ready(v)), - Ok(Ready(Err(err))) => Err(err), - Ok(NotReady) => Ok(NotReady), - Err(err) => panic!("blocking error {}", err), - } -} - pub fn blocking_json<F>(is_sync: bool, f: F) -> Result<JsonOp, ErrBox> where - F: 'static + Send + FnOnce() -> Result<Value, ErrBox>, + F: 'static + Send + FnOnce() -> Result<Value, ErrBox> + Unpin, { if is_sync { Ok(JsonOp::Sync(f()?)) } else { - Ok(JsonOp::Async(Box::new(futures::sync::oneshot::spawn( - tokio_util::poll_fn(move || convert_blocking_json(f)), - &tokio_executor::DefaultExecutor::current(), - )))) + //TODO(afinch7) replace this with something more efficent. + let pool = futures::executor::ThreadPool::new().unwrap(); + let handle = pool + .spawn_with_handle(futures::future::lazy(move |_cx| f())) + .unwrap(); + Ok(JsonOp::Async(handle.boxed())) } } diff --git a/cli/ops/dispatch_minimal.rs b/cli/ops/dispatch_minimal.rs index 355a24634..13738ba56 100644 --- a/cli/ops/dispatch_minimal.rs +++ b/cli/ops/dispatch_minimal.rs @@ -12,9 +12,11 @@ use deno::CoreOp; use deno::ErrBox; use deno::Op; use deno::PinnedBuf; -use futures::Future; +use futures::future::FutureExt; +use std::future::Future; +use std::pin::Pin; -pub type MinimalOp = dyn Future<Item = i32, Error = ErrBox> + Send; +pub type MinimalOp = dyn Future<Output = Result<i32, ErrBox>> + Send; #[derive(Copy, Clone, Debug, PartialEq)] // This corresponds to RecordMinimal on the TS side. @@ -113,7 +115,7 @@ fn test_parse_min_record() { pub fn minimal_op<D>(d: D) -> impl Fn(&[u8], Option<PinnedBuf>) -> CoreOp where - D: Fn(i32, Option<PinnedBuf>) -> Box<MinimalOp>, + D: Fn(i32, Option<PinnedBuf>) -> Pin<Box<MinimalOp>>, { move |control: &[u8], zero_copy: Option<PinnedBuf>| { let mut record = match parse_min_record(control) { @@ -136,21 +138,19 @@ where let min_op = d(rid, zero_copy); // Convert to CoreOp - let fut = Box::new(min_op.then(move |result| -> Result<Buf, ()> { - match result { - Ok(r) => { - record.result = r; - Ok(record.into()) - } - Err(err) => { - let error_record = ErrorRecord { - promise_id: record.promise_id, - arg: -1, - error_code: err.kind() as i32, - error_message: err.to_string().as_bytes().to_owned(), - }; - Ok(error_record.into()) - } + let fut = Box::new(min_op.then(move |result| match result { + Ok(r) => { + record.result = r; + futures::future::ok(record.into()) + } + Err(err) => { + let error_record = ErrorRecord { + promise_id: record.promise_id, + arg: -1, + error_code: err.kind() as i32, + error_message: err.to_string().as_bytes().to_owned(), + }; + futures::future::ok(error_record.into()) } })); @@ -160,9 +160,9 @@ where // tokio_util::block_on. // This block is only exercised for readSync and writeSync, which I think // works since they're simple polling futures. - Op::Sync(fut.wait().unwrap()) + Op::Sync(futures::executor::block_on(fut).unwrap()) } else { - Op::Async(fut) + Op::Async(fut.boxed()) } } } diff --git a/cli/ops/fetch.rs b/cli/ops/fetch.rs index a1c0fe29c..25cf99812 100644 --- a/cli/ops/fetch.rs +++ b/cli/ops/fetch.rs @@ -6,11 +6,11 @@ use crate::http_util::get_client; use crate::ops::json_op; use crate::state::ThreadSafeState; use deno::*; +use futures::future::FutureExt; +use futures::future::TryFutureExt; use http::header::HeaderName; use http::header::HeaderValue; use http::Method; -use hyper; -use hyper::rt::Future; use std; use std::convert::From; @@ -56,26 +56,32 @@ pub fn op_fetch( } debug!("Before fetch {}", url); let state_ = state.clone(); - let future = request.send().map_err(ErrBox::from).and_then(move |res| { - let status = res.status(); - let mut res_headers = Vec::new(); - for (key, val) in res.headers().iter() { - res_headers.push((key.to_string(), val.to_str().unwrap().to_owned())); - } + let future = futures::compat::Compat01As03::new(request.send()) + .map_err(ErrBox::from) + .and_then(move |res| { + debug!("Fetch response {}", url); + let status = res.status(); + let mut res_headers = Vec::new(); + for (key, val) in res.headers().iter() { + res_headers.push((key.to_string(), val.to_str().unwrap().to_owned())); + } - let body = HttpBody::from(res.into_body()); - let mut table = state_.lock_resource_table(); - let rid = table.add("httpBody", Box::new(StreamResource::HttpBody(body))); + let body = HttpBody::from(res.into_body()); + let mut table = state_.lock_resource_table(); + let rid = table.add( + "httpBody", + Box::new(StreamResource::HttpBody(Box::new(body))), + ); - let json_res = json!({ - "bodyRid": rid, - "status": status.as_u16(), - "statusText": status.canonical_reason().unwrap_or(""), - "headers": res_headers - }); + let json_res = json!({ + "bodyRid": rid, + "status": status.as_u16(), + "statusText": status.canonical_reason().unwrap_or(""), + "headers": res_headers + }); - futures::future::ok(json_res) - }); + futures::future::ok(json_res) + }); - Ok(JsonOp::Async(Box::new(future))) + Ok(JsonOp::Async(future.boxed())) } diff --git a/cli/ops/files.rs b/cli/ops/files.rs index fc1b8e7d8..1c041b38d 100644 --- a/cli/ops/files.rs +++ b/cli/ops/files.rs @@ -8,11 +8,15 @@ use crate::fs as deno_fs; use crate::ops::json_op; use crate::state::ThreadSafeState; use deno::*; -use futures::Future; -use futures::Poll; +use futures::future::FutureExt; +use futures::future::TryFutureExt; use std; use std::convert::From; +use std::future::Future; use std::io::SeekFrom; +use std::pin::Pin; +use std::task::Context; +use std::task::Poll; use tokio; pub fn init(i: &mut Isolate, s: &ThreadSafeState) { @@ -88,19 +92,21 @@ fn op_open( } let is_sync = args.promise_id.is_none(); - let op = open_options.open(filename).map_err(ErrBox::from).and_then( - move |fs_file| { - let mut table = state_.lock_resource_table(); - let rid = table.add("fsFile", Box::new(StreamResource::FsFile(fs_file))); - futures::future::ok(json!(rid)) - }, - ); + let op = futures::compat::Compat01As03::new(tokio::prelude::Future::map_err( + open_options.open(filename), + ErrBox::from, + )) + .and_then(move |fs_file| { + let mut table = state_.lock_resource_table(); + let rid = table.add("fsFile", Box::new(StreamResource::FsFile(fs_file))); + futures::future::ok(json!(rid)) + }); if is_sync { - let buf = op.wait()?; + let buf = futures::executor::block_on(op)?; Ok(JsonOp::Sync(buf)) } else { - Ok(JsonOp::Async(Box::new(op))) + Ok(JsonOp::Async(op.boxed())) } } @@ -128,21 +134,27 @@ pub struct SeekFuture { } impl Future for SeekFuture { - type Item = u64; - type Error = ErrBox; + type Output = Result<u64, ErrBox>; - fn poll(&mut self) -> Poll<Self::Item, Self::Error> { - let mut table = self.state.lock_resource_table(); + fn poll(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<Self::Output> { + let inner = self.get_mut(); + let mut table = inner.state.lock_resource_table(); let resource = table - .get_mut::<StreamResource>(self.rid) + .get_mut::<StreamResource>(inner.rid) .ok_or_else(bad_resource)?; let tokio_file = match resource { StreamResource::FsFile(ref mut file) => file, - _ => return Err(bad_resource()), + _ => return Poll::Ready(Err(bad_resource())), }; - tokio_file.poll_seek(self.seek_from).map_err(ErrBox::from) + use tokio::prelude::Async::*; + + match tokio_file.poll_seek(inner.seek_from).map_err(ErrBox::from) { + Ok(Ready(v)) => Poll::Ready(Ok(v)), + Err(err) => Poll::Ready(Err(err)), + Ok(NotReady) => Poll::Pending, + } } } @@ -185,9 +197,9 @@ fn op_seek( let op = fut.and_then(move |_| futures::future::ok(json!({}))); if args.promise_id.is_none() { - let buf = op.wait()?; + let buf = futures::executor::block_on(op)?; Ok(JsonOp::Sync(buf)) } else { - Ok(JsonOp::Async(Box::new(op))) + Ok(JsonOp::Async(op.boxed())) } } diff --git a/cli/ops/io.rs b/cli/ops/io.rs index 30c933999..11afb1891 100644 --- a/cli/ops/io.rs +++ b/cli/ops/io.rs @@ -8,11 +8,16 @@ use deno::ErrBox; use deno::Resource; use deno::*; use futures; -use futures::Future; -use futures::Poll; +use futures::compat::AsyncRead01CompatExt; +use futures::compat::AsyncWrite01CompatExt; +use futures::future::FutureExt; +use futures::io::{AsyncRead, AsyncWrite}; use std; +use std::future::Future; +use std::pin::Pin; +use std::task::Context; +use std::task::Poll; use tokio; -use tokio::io::{AsyncRead, AsyncWrite}; use tokio::net::TcpStream; use tokio_process; use tokio_rustls::client::TlsStream as ClientTlsStream; @@ -80,7 +85,7 @@ pub enum StreamResource { TcpStream(tokio::net::TcpStream), ServerTlsStream(Box<ServerTlsStream<TcpStream>>), ClientTlsStream(Box<ClientTlsStream<TcpStream>>), - HttpBody(HttpBody), + HttpBody(Box<HttpBody>), ChildStdin(tokio_process::ChildStdin), ChildStdout(tokio_process::ChildStdout), ChildStderr(tokio_process::ChildStderr), @@ -91,26 +96,49 @@ impl Resource for StreamResource {} /// `DenoAsyncRead` is the same as the `tokio_io::AsyncRead` trait /// but uses an `ErrBox` error instead of `std::io:Error` pub trait DenoAsyncRead { - fn poll_read(&mut self, buf: &mut [u8]) -> Poll<usize, ErrBox>; + fn poll_read( + self: Pin<&mut Self>, + cx: &mut Context, + buf: &mut [u8], + ) -> Poll<Result<usize, ErrBox>>; } impl DenoAsyncRead for StreamResource { - fn poll_read(&mut self, buf: &mut [u8]) -> Poll<usize, ErrBox> { - let r = match self { - StreamResource::FsFile(ref mut f) => f.poll_read(buf), - StreamResource::Stdin(ref mut f) => f.poll_read(buf), - StreamResource::TcpStream(ref mut f) => f.poll_read(buf), - StreamResource::ClientTlsStream(ref mut f) => f.poll_read(buf), - StreamResource::ServerTlsStream(ref mut f) => f.poll_read(buf), - StreamResource::HttpBody(ref mut f) => f.poll_read(buf), - StreamResource::ChildStdout(ref mut f) => f.poll_read(buf), - StreamResource::ChildStderr(ref mut f) => f.poll_read(buf), + fn poll_read( + self: Pin<&mut Self>, + cx: &mut Context, + buf: &mut [u8], + ) -> Poll<Result<usize, ErrBox>> { + let inner = self.get_mut(); + let mut f: Box<dyn AsyncRead + Unpin> = match inner { + StreamResource::FsFile(f) => Box::new(AsyncRead01CompatExt::compat(f)), + StreamResource::Stdin(f) => Box::new(AsyncRead01CompatExt::compat(f)), + StreamResource::TcpStream(f) => Box::new(AsyncRead01CompatExt::compat(f)), + StreamResource::ClientTlsStream(f) => { + Box::new(AsyncRead01CompatExt::compat(f)) + } + StreamResource::ServerTlsStream(f) => { + Box::new(AsyncRead01CompatExt::compat(f)) + } + StreamResource::HttpBody(f) => Box::new(f), + StreamResource::ChildStdout(f) => { + Box::new(AsyncRead01CompatExt::compat(f)) + } + StreamResource::ChildStderr(f) => { + Box::new(AsyncRead01CompatExt::compat(f)) + } _ => { - return Err(bad_resource()); + return Poll::Ready(Err(bad_resource())); } }; - r.map_err(ErrBox::from) + let r = AsyncRead::poll_read(Pin::new(&mut f), cx, buf); + + match r { + Poll::Ready(Err(e)) => Poll::Ready(Err(ErrBox::from(e))), + Poll::Ready(Ok(v)) => Poll::Ready(Ok(v)), + Poll::Pending => Poll::Pending, + } } } @@ -150,23 +178,31 @@ pub struct Read<T> { impl<T> Future for Read<T> where - T: AsMut<[u8]>, + T: AsMut<[u8]> + Unpin, { - type Item = usize; - type Error = ErrBox; + type Output = Result<i32, ErrBox>; - fn poll(&mut self) -> Poll<Self::Item, Self::Error> { - if self.io_state == IoState::Done { + fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> { + let inner = self.get_mut(); + if inner.io_state == IoState::Done { panic!("poll a Read after it's done"); } - let mut table = self.state.lock_resource_table(); + let mut table = inner.state.lock_resource_table(); let resource = table - .get_mut::<StreamResource>(self.rid) + .get_mut::<StreamResource>(inner.rid) .ok_or_else(bad_resource)?; - let nread = try_ready!(resource.poll_read(&mut self.buf.as_mut()[..])); - self.io_state = IoState::Done; - Ok(nread.into()) + let nread = match DenoAsyncRead::poll_read( + Pin::new(resource), + cx, + &mut inner.buf.as_mut()[..], + ) { + Poll::Ready(Ok(v)) => v, + Poll::Ready(Err(err)) => return Poll::Ready(Err(err)), + Poll::Pending => return Poll::Pending, + }; + inner.io_state = IoState::Done; + Poll::Ready(Ok(nread as i32)) } } @@ -174,49 +210,76 @@ pub fn op_read( state: &ThreadSafeState, rid: i32, zero_copy: Option<PinnedBuf>, -) -> Box<MinimalOp> { +) -> Pin<Box<MinimalOp>> { debug!("read rid={}", rid); let zero_copy = match zero_copy { None => { - return Box::new(futures::future::err(deno_error::no_buffer_specified())); + return futures::future::err(deno_error::no_buffer_specified()).boxed() } Some(buf) => buf, }; - let fut = read(state, rid as u32, zero_copy) - .map_err(ErrBox::from) - .and_then(move |nread| Ok(nread as i32)); + let fut = read(state, rid as u32, zero_copy); - Box::new(fut) + fut.boxed() } /// `DenoAsyncWrite` is the same as the `tokio_io::AsyncWrite` trait /// but uses an `ErrBox` error instead of `std::io:Error` pub trait DenoAsyncWrite { - fn poll_write(&mut self, buf: &[u8]) -> Poll<usize, ErrBox>; - - fn shutdown(&mut self) -> Poll<(), ErrBox>; + fn poll_write( + self: Pin<&mut Self>, + cx: &mut Context, + buf: &[u8], + ) -> Poll<Result<usize, ErrBox>>; + + fn poll_close( + self: Pin<&mut Self>, + cx: &mut Context, + ) -> Poll<Result<(), ErrBox>>; } impl DenoAsyncWrite for StreamResource { - fn poll_write(&mut self, buf: &[u8]) -> Poll<usize, ErrBox> { - let r = match self { - StreamResource::FsFile(ref mut f) => f.poll_write(buf), - StreamResource::Stdout(ref mut f) => f.poll_write(buf), - StreamResource::Stderr(ref mut f) => f.poll_write(buf), - StreamResource::TcpStream(ref mut f) => f.poll_write(buf), - StreamResource::ClientTlsStream(ref mut f) => f.poll_write(buf), - StreamResource::ServerTlsStream(ref mut f) => f.poll_write(buf), - StreamResource::ChildStdin(ref mut f) => f.poll_write(buf), + fn poll_write( + self: Pin<&mut Self>, + cx: &mut Context, + buf: &[u8], + ) -> Poll<Result<usize, ErrBox>> { + let inner = self.get_mut(); + let mut f: Box<dyn AsyncWrite + Unpin> = match inner { + StreamResource::FsFile(f) => Box::new(AsyncWrite01CompatExt::compat(f)), + StreamResource::Stdout(f) => Box::new(AsyncWrite01CompatExt::compat(f)), + StreamResource::Stderr(f) => Box::new(AsyncWrite01CompatExt::compat(f)), + StreamResource::TcpStream(f) => { + Box::new(AsyncWrite01CompatExt::compat(f)) + } + StreamResource::ClientTlsStream(f) => { + Box::new(AsyncWrite01CompatExt::compat(f)) + } + StreamResource::ServerTlsStream(f) => { + Box::new(AsyncWrite01CompatExt::compat(f)) + } + StreamResource::ChildStdin(f) => { + Box::new(AsyncWrite01CompatExt::compat(f)) + } _ => { - return Err(bad_resource()); + return Poll::Ready(Err(bad_resource())); } }; - r.map_err(ErrBox::from) + let r = AsyncWrite::poll_write(Pin::new(&mut f), cx, buf); + + match r { + Poll::Ready(Err(e)) => Poll::Ready(Err(ErrBox::from(e))), + Poll::Ready(Ok(v)) => Poll::Ready(Ok(v)), + Poll::Pending => Poll::Pending, + } } - fn shutdown(&mut self) -> futures::Poll<(), ErrBox> { + fn poll_close( + self: Pin<&mut Self>, + _cx: &mut Context, + ) -> Poll<Result<(), ErrBox>> { unimplemented!() } } @@ -250,23 +313,31 @@ where /// that error type is `ErrBox` instead of `std::io::Error`. impl<T> Future for Write<T> where - T: AsRef<[u8]>, + T: AsRef<[u8]> + Unpin, { - type Item = usize; - type Error = ErrBox; + type Output = Result<i32, ErrBox>; - fn poll(&mut self) -> Poll<Self::Item, Self::Error> { - if self.io_state == IoState::Done { + fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> { + let inner = self.get_mut(); + if inner.io_state == IoState::Done { panic!("poll a Read after it's done"); } - let mut table = self.state.lock_resource_table(); + let mut table = inner.state.lock_resource_table(); let resource = table - .get_mut::<StreamResource>(self.rid) + .get_mut::<StreamResource>(inner.rid) .ok_or_else(bad_resource)?; - let nwritten = try_ready!(resource.poll_write(self.buf.as_ref())); - self.io_state = IoState::Done; - Ok(nwritten.into()) + let nwritten = match DenoAsyncWrite::poll_write( + Pin::new(resource), + cx, + inner.buf.as_ref(), + ) { + Poll::Ready(Ok(v)) => v, + Poll::Ready(Err(err)) => return Poll::Ready(Err(err)), + Poll::Pending => return Poll::Pending, + }; + inner.io_state = IoState::Done; + Poll::Ready(Ok(nwritten as i32)) } } @@ -274,18 +345,16 @@ pub fn op_write( state: &ThreadSafeState, rid: i32, zero_copy: Option<PinnedBuf>, -) -> Box<MinimalOp> { +) -> Pin<Box<MinimalOp>> { debug!("write rid={}", rid); let zero_copy = match zero_copy { None => { - return Box::new(futures::future::err(deno_error::no_buffer_specified())); + return futures::future::err(deno_error::no_buffer_specified()).boxed() } Some(buf) => buf, }; - let fut = write(state, rid as u32, zero_copy) - .map_err(ErrBox::from) - .and_then(move |nwritten| Ok(nwritten as i32)); + let fut = write(state, rid as u32, zero_copy); - Box::new(fut) + fut.boxed() } diff --git a/cli/ops/net.rs b/cli/ops/net.rs index 2fe81e140..929b87dde 100644 --- a/cli/ops/net.rs +++ b/cli/ops/net.rs @@ -7,14 +7,20 @@ use crate::resolve_addr::resolve_addr; use crate::state::ThreadSafeState; use deno::Resource; use deno::*; -use futures::Async; -use futures::Future; -use futures::Poll; +use futures::future::FutureExt; +use futures::future::TryFutureExt; +use futures::stream::StreamExt; +use futures::stream::TryStreamExt; use std; use std::convert::From; +use std::future::Future; use std::net::Shutdown; use std::net::SocketAddr; +use std::pin::Pin; +use std::task::Context; +use std::task::Poll; use tokio; +use tokio::net::tcp::Incoming; use tokio::net::TcpListener; use tokio::net::TcpStream; @@ -49,17 +55,17 @@ pub struct Accept { } impl Future for Accept { - type Item = (TcpStream, SocketAddr); - type Error = ErrBox; + type Output = Result<(TcpStream, SocketAddr), ErrBox>; - fn poll(&mut self) -> Poll<Self::Item, Self::Error> { - if self.accept_state == AcceptState::Done { + fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> { + let inner = self.get_mut(); + if inner.accept_state == AcceptState::Done { panic!("poll Accept after it's done"); } - let mut table = self.state.lock_resource_table(); + let mut table = inner.state.lock_resource_table(); let listener_resource = table - .get_mut::<TcpListenerResource>(self.rid) + .get_mut::<TcpListenerResource>(inner.rid) .ok_or_else(|| { let e = std::io::Error::new( std::io::ErrorKind::Other, @@ -68,44 +74,50 @@ impl Future for Accept { ErrBox::from(e) })?; - let listener = &mut listener_resource.listener; + let mut listener = + futures::compat::Compat01As03::new(&mut listener_resource.listener) + .map_err(ErrBox::from); - if self.accept_state == AcceptState::Eager { + if inner.accept_state == AcceptState::Eager { // Similar to try_ready!, but also track/untrack accept task // in TcpListener resource. // In this way, when the listener is closed, the task can be // notified to error out (instead of stuck forever). - match listener.poll_accept().map_err(ErrBox::from) { - Ok(Async::Ready((stream, addr))) => { - self.accept_state = AcceptState::Done; - return Ok((stream, addr).into()); + match listener.poll_next_unpin(cx) { + Poll::Ready(Some(Ok(stream))) => { + inner.accept_state = AcceptState::Done; + let addr = stream.peer_addr().unwrap(); + return Poll::Ready(Ok((stream, addr))); } - Ok(Async::NotReady) => { - self.accept_state = AcceptState::Pending; - return Ok(Async::NotReady); + Poll::Pending => { + inner.accept_state = AcceptState::Pending; + return Poll::Pending; } - Err(e) => { - self.accept_state = AcceptState::Done; - return Err(e); + Poll::Ready(Some(Err(e))) => { + inner.accept_state = AcceptState::Done; + return Poll::Ready(Err(e)); } + _ => unreachable!(), } } - match listener.poll_accept().map_err(ErrBox::from) { - Ok(Async::Ready((stream, addr))) => { + match listener.poll_next_unpin(cx) { + Poll::Ready(Some(Ok(stream))) => { listener_resource.untrack_task(); - self.accept_state = AcceptState::Done; - Ok((stream, addr).into()) + inner.accept_state = AcceptState::Done; + let addr = stream.peer_addr().unwrap(); + Poll::Ready(Ok((stream, addr))) } - Ok(Async::NotReady) => { - listener_resource.track_task()?; - Ok(Async::NotReady) + Poll::Pending => { + listener_resource.track_task(cx)?; + Poll::Pending } - Err(e) => { + Poll::Ready(Some(Err(e))) => { listener_resource.untrack_task(); - self.accept_state = AcceptState::Done; - Err(e) + inner.accept_state = AcceptState::Done; + Poll::Ready(Err(e)) } + _ => unreachable!(), } } } @@ -130,12 +142,18 @@ fn op_accept( let op = accept(state, rid) .and_then(move |(tcp_stream, _socket_addr)| { - let local_addr = tcp_stream.local_addr()?; - let remote_addr = tcp_stream.peer_addr()?; + let local_addr = match tcp_stream.local_addr() { + Ok(v) => v, + Err(e) => return futures::future::err(ErrBox::from(e)), + }; + let remote_addr = match tcp_stream.peer_addr() { + Ok(v) => v, + Err(e) => return futures::future::err(ErrBox::from(e)), + }; let mut table = state_.lock_resource_table(); let rid = table.add("tcpStream", Box::new(StreamResource::TcpStream(tcp_stream))); - Ok((rid, local_addr, remote_addr)) + futures::future::ok((rid, local_addr, remote_addr)) }) .map_err(ErrBox::from) .and_then(move |(rid, local_addr, remote_addr)| { @@ -146,7 +164,7 @@ fn op_accept( })) }); - Ok(JsonOp::Async(Box::new(op))) + Ok(JsonOp::Async(op.boxed())) } #[derive(Deserialize)] @@ -167,15 +185,21 @@ fn op_dial( state.check_net(&args.hostname, args.port)?; let op = resolve_addr(&args.hostname, args.port).and_then(move |addr| { - TcpStream::connect(&addr) + futures::compat::Compat01As03::new(TcpStream::connect(&addr)) .map_err(ErrBox::from) .and_then(move |tcp_stream| { - let local_addr = tcp_stream.local_addr()?; - let remote_addr = tcp_stream.peer_addr()?; + let local_addr = match tcp_stream.local_addr() { + Ok(v) => v, + Err(e) => return futures::future::err(ErrBox::from(e)), + }; + let remote_addr = match tcp_stream.peer_addr() { + Ok(v) => v, + Err(e) => return futures::future::err(ErrBox::from(e)), + }; let mut table = state_.lock_resource_table(); let rid = table .add("tcpStream", Box::new(StreamResource::TcpStream(tcp_stream))); - Ok((rid, local_addr, remote_addr)) + futures::future::ok((rid, local_addr, remote_addr)) }) .map_err(ErrBox::from) .and_then(move |(rid, local_addr, remote_addr)| { @@ -187,7 +211,7 @@ fn op_dial( }) }); - Ok(JsonOp::Async(Box::new(op))) + Ok(JsonOp::Async(op.boxed())) } #[derive(Deserialize)] @@ -235,8 +259,8 @@ struct ListenArgs { #[allow(dead_code)] struct TcpListenerResource { - listener: tokio::net::TcpListener, - task: Option<futures::task::Task>, + listener: Incoming, + waker: Option<futures::task::AtomicWaker>, local_addr: SocketAddr, } @@ -244,7 +268,7 @@ impl Resource for TcpListenerResource {} impl Drop for TcpListenerResource { fn drop(&mut self) { - self.notify_task(); + self.wake_task(); } } @@ -253,12 +277,12 @@ impl TcpListenerResource { /// can be notified when listener is closed. /// /// Throws an error if another task is already tracked. - pub fn track_task(&mut self) -> Result<(), ErrBox> { + pub fn track_task(&mut self, cx: &Context) -> Result<(), ErrBox> { // Currently, we only allow tracking a single accept task for a listener. // This might be changed in the future with multiple workers. // Caveat: TcpListener by itself also only tracks an accept task at a time. // See https://github.com/tokio-rs/tokio/issues/846#issuecomment-454208883 - if self.task.is_some() { + if self.waker.is_some() { let e = std::io::Error::new( std::io::ErrorKind::Other, "Another accept task is ongoing", @@ -266,22 +290,24 @@ impl TcpListenerResource { return Err(ErrBox::from(e)); } - self.task.replace(futures::task::current()); + let waker = futures::task::AtomicWaker::new(); + waker.register(cx.waker()); + self.waker.replace(waker); Ok(()) } /// Notifies a task when listener is closed so accept future can resolve. - pub fn notify_task(&mut self) { - if let Some(task) = self.task.take() { - task.notify(); + pub fn wake_task(&mut self) { + if let Some(waker) = self.waker.as_ref() { + waker.wake(); } } /// Stop tracking a task. /// Happens when the task is done and thus no further tracking is needed. pub fn untrack_task(&mut self) { - if self.task.is_some() { - self.task.take(); + if self.waker.is_some() { + self.waker.take(); } } } @@ -296,17 +322,19 @@ fn op_listen( state.check_net(&args.hostname, args.port)?; - let addr = resolve_addr(&args.hostname, args.port).wait()?; + let addr = + futures::executor::block_on(resolve_addr(&args.hostname, args.port))?; let listener = TcpListener::bind(&addr)?; let local_addr = listener.local_addr()?; let local_addr_str = local_addr.to_string(); let listener_resource = TcpListenerResource { - listener, - task: None, + listener: listener.incoming(), + waker: None, local_addr, }; let mut table = state.lock_resource_table(); let rid = table.add("tcpListener", Box::new(listener_resource)); + debug!("New listener {} {}", rid, local_addr_str); Ok(JsonOp::Sync(json!({ "rid": rid, diff --git a/cli/ops/process.rs b/cli/ops/process.rs index 237b02fd0..a267130ec 100644 --- a/cli/ops/process.rs +++ b/cli/ops/process.rs @@ -7,12 +7,18 @@ use crate::signal::kill; use crate::state::ThreadSafeState; use deno::*; use futures; -use futures::Future; -use futures::Poll; +use futures::future::FutureExt; +use futures::future::TryFutureExt; +use futures::task::SpawnExt; use std; use std::convert::From; +use std::future::Future; +use std::pin::Pin; use std::process::Command; use std::process::ExitStatus; +use std::task::Context; +use std::task::Poll; +use tokio::prelude::Async; use tokio_process::CommandExt; #[cfg(unix)] @@ -33,19 +39,23 @@ struct CloneFileFuture { } impl Future for CloneFileFuture { - type Item = tokio::fs::File; - type Error = ErrBox; + type Output = Result<tokio::fs::File, ErrBox>; - fn poll(&mut self) -> Poll<Self::Item, Self::Error> { - let mut table = self.state.lock_resource_table(); + fn poll(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<Self::Output> { + let inner = self.get_mut(); + let mut table = inner.state.lock_resource_table(); let repr = table - .get_mut::<StreamResource>(self.rid) + .get_mut::<StreamResource>(inner.rid) .ok_or_else(bad_resource)?; match repr { StreamResource::FsFile(ref mut file) => { - file.poll_try_clone().map_err(ErrBox::from) + match file.poll_try_clone().map_err(ErrBox::from) { + Err(err) => Poll::Ready(Err(err)), + Ok(Async::Ready(v)) => Poll::Ready(Ok(v)), + Ok(Async::NotReady) => Poll::Pending, + } } - _ => Err(bad_resource()), + _ => Poll::Ready(Err(bad_resource())), } } } @@ -54,11 +64,10 @@ fn clone_file( rid: u32, state: &ThreadSafeState, ) -> Result<std::fs::File, ErrBox> { - (CloneFileFuture { + futures::executor::block_on(CloneFileFuture { rid, state: state.clone(), }) - .wait() .map(|f| f.into_std()) } @@ -86,7 +95,7 @@ struct RunArgs { } struct ChildResource { - child: tokio_process::Child, + child: futures::compat::Compat01As03<tokio_process::Child>, } impl Resource for ChildResource {} @@ -179,7 +188,9 @@ fn op_run( None => None, }; - let child_resource = ChildResource { child }; + let child_resource = ChildResource { + child: futures::compat::Compat01As03::new(child), + }; let child_rid = table.add("child", Box::new(child_resource)); Ok(JsonOp::Sync(json!({ @@ -197,16 +208,16 @@ pub struct ChildStatus { } impl Future for ChildStatus { - type Item = ExitStatus; - type Error = ErrBox; + type Output = Result<ExitStatus, ErrBox>; - fn poll(&mut self) -> Poll<ExitStatus, ErrBox> { - let mut table = self.state.lock_resource_table(); + fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> { + let inner = self.get_mut(); + let mut table = inner.state.lock_resource_table(); let child_resource = table - .get_mut::<ChildResource>(self.rid) + .get_mut::<ChildResource>(inner.rid) .ok_or_else(bad_resource)?; let child = &mut child_resource.child; - child.poll().map_err(ErrBox::from) + child.map_err(ErrBox::from).poll_unpin(cx) } } @@ -251,7 +262,10 @@ fn op_run_status( })) }); - Ok(JsonOp::Async(Box::new(future))) + let pool = futures::executor::ThreadPool::new().unwrap(); + let handle = pool.spawn_with_handle(future).unwrap(); + + Ok(JsonOp::Async(handle.boxed())) } #[derive(Deserialize)] diff --git a/cli/ops/timers.rs b/cli/ops/timers.rs index 9d87aaf5c..7223633f8 100644 --- a/cli/ops/timers.rs +++ b/cli/ops/timers.rs @@ -3,7 +3,7 @@ use super::dispatch_json::{Deserialize, JsonOp, Value}; use crate::ops::json_op; use crate::state::ThreadSafeState; use deno::*; -use futures::Future; +use futures::future::FutureExt; use std; use std::time::Duration; use std::time::Instant; @@ -51,7 +51,7 @@ fn op_global_timer( .new_timeout(deadline) .then(move |_| futures::future::ok(json!({}))); - Ok(JsonOp::Async(Box::new(f))) + Ok(JsonOp::Async(f.boxed())) } // Returns a milliseconds and nanoseconds subsec diff --git a/cli/ops/tls.rs b/cli/ops/tls.rs index 48419f76f..484b7057c 100644 --- a/cli/ops/tls.rs +++ b/cli/ops/tls.rs @@ -9,16 +9,22 @@ use crate::resolve_addr::resolve_addr; use crate::state::ThreadSafeState; use deno::Resource; use deno::*; -use futures::Async; -use futures::Future; -use futures::Poll; +use futures::future::FutureExt; +use futures::future::TryFutureExt; +use futures::stream::StreamExt; +use futures::stream::TryStreamExt; use std; use std::convert::From; use std::fs::File; +use std::future::Future; use std::io::BufReader; use std::net::SocketAddr; +use std::pin::Pin; use std::sync::Arc; +use std::task::Context; +use std::task::Poll; use tokio; +use tokio::net::tcp::Incoming; use tokio::net::TcpListener; use tokio::net::TcpStream; use tokio_rustls::{rustls::ClientConfig, TlsConnector}; @@ -72,49 +78,63 @@ pub fn op_dial_tls( } let op = resolve_addr(&args.hostname, args.port).and_then(move |addr| { - TcpStream::connect(&addr) + futures::compat::Compat01As03::new(TcpStream::connect(&addr)) .and_then(move |tcp_stream| { - let local_addr = tcp_stream.local_addr()?; - let remote_addr = tcp_stream.peer_addr()?; + let local_addr = match tcp_stream.local_addr() { + Ok(v) => v, + Err(e) => return futures::future::err(e), + }; + let remote_addr = match tcp_stream.peer_addr() { + Ok(v) => v, + Err(e) => return futures::future::err(e), + }; let mut config = ClientConfig::new(); config .root_store .add_server_trust_anchors(&webpki_roots::TLS_SERVER_ROOTS); if let Some(path) = cert_file { - let key_file = File::open(path)?; + let key_file = match File::open(path) { + Ok(v) => v, + Err(e) => return futures::future::err(e), + }; let reader = &mut BufReader::new(key_file); config.root_store.add_pem_file(reader).unwrap(); } - let tls_connector = TlsConnector::from(Arc::new(config)); - Ok((tls_connector, tcp_stream, local_addr, remote_addr)) + futures::future::ok(( + tls_connector, + tcp_stream, + local_addr, + remote_addr, + )) }) .map_err(ErrBox::from) .and_then( move |(tls_connector, tcp_stream, local_addr, remote_addr)| { let dnsname = DNSNameRef::try_from_ascii_str(&domain) .expect("Invalid DNS lookup"); - tls_connector - .connect(dnsname, tcp_stream) - .map_err(ErrBox::from) - .and_then(move |tls_stream| { - let mut table = state_.lock_resource_table(); - let rid = table.add( - "clientTlsStream", - Box::new(StreamResource::ClientTlsStream(Box::new(tls_stream))), - ); - futures::future::ok(json!({ - "rid": rid, - "localAddr": local_addr.to_string(), - "remoteAddr": remote_addr.to_string(), - })) - }) + futures::compat::Compat01As03::new( + tls_connector.connect(dnsname, tcp_stream), + ) + .map_err(ErrBox::from) + .and_then(move |tls_stream| { + let mut table = state_.lock_resource_table(); + let rid = table.add( + "clientTlsStream", + Box::new(StreamResource::ClientTlsStream(Box::new(tls_stream))), + ); + futures::future::ok(json!({ + "rid": rid, + "localAddr": local_addr.to_string(), + "remoteAddr": remote_addr.to_string(), + })) + }) }, ) }); - Ok(JsonOp::Async(Box::new(op))) + Ok(JsonOp::Async(op.boxed())) } fn load_certs(path: &str) -> Result<Vec<Certificate>, ErrBox> { @@ -177,9 +197,9 @@ fn load_keys(path: &str) -> Result<Vec<PrivateKey>, ErrBox> { #[allow(dead_code)] pub struct TlsListenerResource { - listener: tokio::net::TcpListener, + listener: Incoming, tls_acceptor: TlsAcceptor, - task: Option<futures::task::Task>, + waker: Option<futures::task::AtomicWaker>, local_addr: SocketAddr, } @@ -187,7 +207,7 @@ impl Resource for TlsListenerResource {} impl Drop for TlsListenerResource { fn drop(&mut self) { - self.notify_task(); + self.wake_task(); } } @@ -196,12 +216,12 @@ impl TlsListenerResource { /// can be notified when listener is closed. /// /// Throws an error if another task is already tracked. - pub fn track_task(&mut self) -> Result<(), ErrBox> { + pub fn track_task(&mut self, cx: &Context) -> Result<(), ErrBox> { // Currently, we only allow tracking a single accept task for a listener. // This might be changed in the future with multiple workers. // Caveat: TcpListener by itself also only tracks an accept task at a time. // See https://github.com/tokio-rs/tokio/issues/846#issuecomment-454208883 - if self.task.is_some() { + if self.waker.is_some() { let e = std::io::Error::new( std::io::ErrorKind::Other, "Another accept task is ongoing", @@ -209,22 +229,24 @@ impl TlsListenerResource { return Err(ErrBox::from(e)); } - self.task.replace(futures::task::current()); + let waker = futures::task::AtomicWaker::new(); + waker.register(cx.waker()); + self.waker.replace(waker); Ok(()) } /// Notifies a task when listener is closed so accept future can resolve. - pub fn notify_task(&mut self) { - if let Some(task) = self.task.take() { - task.notify(); + pub fn wake_task(&mut self) { + if let Some(waker) = self.waker.as_ref() { + waker.wake(); } } /// Stop tracking a task. /// Happens when the task is done and thus no further tracking is needed. pub fn untrack_task(&mut self) { - if self.task.is_some() { - self.task.take(); + if self.waker.is_some() { + self.waker.take(); } } } @@ -259,14 +281,15 @@ fn op_listen_tls( .set_single_cert(load_certs(&cert_file)?, load_keys(&key_file)?.remove(0)) .expect("invalid key or certificate"); let tls_acceptor = TlsAcceptor::from(Arc::new(config)); - let addr = resolve_addr(&args.hostname, args.port).wait()?; + let addr = + futures::executor::block_on(resolve_addr(&args.hostname, args.port))?; let listener = TcpListener::bind(&addr)?; let local_addr = listener.local_addr()?; let local_addr_str = local_addr.to_string(); let tls_listener_resource = TlsListenerResource { - listener, + listener: listener.incoming(), tls_acceptor, - task: None, + waker: None, local_addr, }; let mut table = state.lock_resource_table(); @@ -302,17 +325,17 @@ pub struct AcceptTls { } impl Future for AcceptTls { - type Item = (TcpStream, SocketAddr); - type Error = ErrBox; + type Output = Result<(TcpStream, SocketAddr), ErrBox>; - fn poll(&mut self) -> Poll<Self::Item, Self::Error> { - if self.accept_state == AcceptTlsState::Done { + fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> { + let inner = self.get_mut(); + if inner.accept_state == AcceptTlsState::Done { panic!("poll AcceptTls after it's done"); } - let mut table = self.state.lock_resource_table(); + let mut table = inner.state.lock_resource_table(); let listener_resource = table - .get_mut::<TlsListenerResource>(self.rid) + .get_mut::<TlsListenerResource>(inner.rid) .ok_or_else(|| { let e = std::io::Error::new( std::io::ErrorKind::Other, @@ -321,44 +344,50 @@ impl Future for AcceptTls { ErrBox::from(e) })?; - let listener = &mut listener_resource.listener; + let mut listener = + futures::compat::Compat01As03::new(&mut listener_resource.listener) + .map_err(ErrBox::from); - if self.accept_state == AcceptTlsState::Eager { + if inner.accept_state == AcceptTlsState::Eager { // Similar to try_ready!, but also track/untrack accept task // in TcpListener resource. // In this way, when the listener is closed, the task can be // notified to error out (instead of stuck forever). - match listener.poll_accept().map_err(ErrBox::from) { - Ok(Async::Ready((stream, addr))) => { - self.accept_state = AcceptTlsState::Done; - return Ok((stream, addr).into()); + match listener.poll_next_unpin(cx) { + Poll::Ready(Some(Ok(stream))) => { + inner.accept_state = AcceptTlsState::Done; + let addr = stream.peer_addr().unwrap(); + return Poll::Ready(Ok((stream, addr))); } - Ok(Async::NotReady) => { - self.accept_state = AcceptTlsState::Pending; - return Ok(Async::NotReady); + Poll::Pending => { + inner.accept_state = AcceptTlsState::Pending; + return Poll::Pending; } - Err(e) => { - self.accept_state = AcceptTlsState::Done; - return Err(e); + Poll::Ready(Some(Err(e))) => { + inner.accept_state = AcceptTlsState::Done; + return Poll::Ready(Err(e)); } + _ => unreachable!(), } } - match listener.poll_accept().map_err(ErrBox::from) { - Ok(Async::Ready((stream, addr))) => { + match listener.poll_next_unpin(cx) { + Poll::Ready(Some(Ok(stream))) => { listener_resource.untrack_task(); - self.accept_state = AcceptTlsState::Done; - Ok((stream, addr).into()) + inner.accept_state = AcceptTlsState::Done; + let addr = stream.peer_addr().unwrap(); + Poll::Ready(Ok((stream, addr))) } - Ok(Async::NotReady) => { - listener_resource.track_task()?; - Ok(Async::NotReady) + Poll::Pending => { + listener_resource.track_task(cx)?; + Poll::Pending } - Err(e) => { + Poll::Ready(Some(Err(e))) => { listener_resource.untrack_task(); - self.accept_state = AcceptTlsState::Done; - Err(e) + inner.accept_state = AcceptTlsState::Done; + Poll::Ready(Err(e)) } + _ => unreachable!(), } } } @@ -379,9 +408,15 @@ fn op_accept_tls( let state2 = state.clone(); let op = accept_tls(state, rid) .and_then(move |(tcp_stream, _socket_addr)| { - let local_addr = tcp_stream.local_addr()?; - let remote_addr = tcp_stream.peer_addr()?; - Ok((tcp_stream, local_addr, remote_addr)) + let local_addr = match tcp_stream.local_addr() { + Ok(v) => v, + Err(e) => return futures::future::err(ErrBox::from(e)), + }; + let remote_addr = match tcp_stream.peer_addr() { + Ok(v) => v, + Err(e) => return futures::future::err(ErrBox::from(e)), + }; + futures::future::ok((tcp_stream, local_addr, remote_addr)) }) .and_then(move |(tcp_stream, local_addr, remote_addr)| { let table = state1.lock_resource_table(); @@ -390,18 +425,18 @@ fn op_accept_tls( .ok_or_else(bad_resource) .expect("Can't find tls listener"); - resource - .tls_acceptor - .accept(tcp_stream) - .map_err(ErrBox::from) - .and_then(move |tls_stream| { - let mut table = state2.lock_resource_table(); - let rid = table.add( - "serverTlsStream", - Box::new(StreamResource::ServerTlsStream(Box::new(tls_stream))), - ); - Ok((rid, local_addr, remote_addr)) - }) + futures::compat::Compat01As03::new( + resource.tls_acceptor.accept(tcp_stream), + ) + .map_err(ErrBox::from) + .and_then(move |tls_stream| { + let mut table = state2.lock_resource_table(); + let rid = table.add( + "serverTlsStream", + Box::new(StreamResource::ServerTlsStream(Box::new(tls_stream))), + ); + futures::future::ok((rid, local_addr, remote_addr)) + }) }) .and_then(move |(rid, local_addr, remote_addr)| { futures::future::ok(json!({ @@ -411,5 +446,5 @@ fn op_accept_tls( })) }); - Ok(JsonOp::Async(Box::new(op))) + Ok(JsonOp::Async(op.boxed())) } diff --git a/cli/ops/workers.rs b/cli/ops/workers.rs index cf7378a91..ee60c6824 100644 --- a/cli/ops/workers.rs +++ b/cli/ops/workers.rs @@ -7,16 +7,21 @@ use crate::deno_error::ErrorKind; use crate::ops::json_op; use crate::startup_data; use crate::state::ThreadSafeState; +use crate::tokio_util; use crate::worker::Worker; use deno::*; use futures; -use futures::Async; -use futures::Future; -use futures::Sink; -use futures::Stream; +use futures::future::FutureExt; +use futures::future::TryFutureExt; +use futures::sink::SinkExt; +use futures::stream::StreamExt; use std; use std::convert::From; +use std::future::Future; +use std::pin::Pin; use std::sync::atomic::Ordering; +use std::task::Context; +use std::task::Poll; pub fn init(i: &mut Isolate, s: &ThreadSafeState) { i.register_op( @@ -52,13 +57,13 @@ struct GetMessageFuture { } impl Future for GetMessageFuture { - type Item = Option<Buf>; - type Error = ErrBox; + type Output = Option<Buf>; - fn poll(&mut self) -> Result<Async<Self::Item>, Self::Error> { - let mut channels = self.state.worker_channels.lock().unwrap(); + fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> { + let inner = self.get_mut(); + let mut channels = inner.state.worker_channels.lock().unwrap(); let receiver = &mut channels.receiver; - receiver.poll().map_err(ErrBox::from) + receiver.poll_next_unpin(cx) } } @@ -72,17 +77,15 @@ fn op_worker_get_message( state: state.clone(), }; - let op = op - .map_err(move |_| -> ErrBox { unimplemented!() }) - .and_then(move |maybe_buf| { - debug!("op_worker_get_message"); + let op = op.then(move |maybe_buf| { + debug!("op_worker_get_message"); - futures::future::ok(json!({ - "data": maybe_buf.map(|buf| buf.to_owned()) - })) - }); + futures::future::ok(json!({ + "data": maybe_buf.map(|buf| buf.to_owned()) + })) + }); - Ok(JsonOp::Async(Box::new(op))) + Ok(JsonOp::Async(op.boxed())) } /// Post message to host as guest worker @@ -94,9 +97,7 @@ fn op_worker_post_message( let d = Vec::from(data.unwrap().as_ref()).into_boxed_slice(); let mut channels = state.worker_channels.lock().unwrap(); let sender = &mut channels.sender; - sender - .send(d) - .wait() + futures::executor::block_on(sender.send(d)) .map_err(|e| DenoError::new(ErrorKind::Other, e.to_string()))?; Ok(JsonOp::Sync(json!({}))) @@ -165,12 +166,35 @@ fn op_create_worker( let op = worker .execute_mod_async(&module_specifier, None, false) - .and_then(move |()| Ok(exec_cb(worker))); + .and_then(move |()| futures::future::ok(exec_cb(worker))); - let result = op.wait()?; + let result = tokio_util::block_on(op.boxed())?; Ok(JsonOp::Sync(result)) } +struct GetWorkerClosedFuture { + state: ThreadSafeState, + rid: ResourceId, +} + +impl Future for GetWorkerClosedFuture { + type Output = Result<(), ErrBox>; + + fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> { + let inner = self.get_mut(); + let mut workers_table = inner.state.workers.lock().unwrap(); + let maybe_worker = workers_table.get_mut(&inner.rid); + if maybe_worker.is_none() { + return Poll::Ready(Ok(())); + } + match maybe_worker.unwrap().poll_unpin(cx) { + Poll::Ready(Err(e)) => Poll::Ready(Err(e)), + Poll::Ready(Ok(_)) => Poll::Ready(Ok(())), + Poll::Pending => Poll::Pending, + } + } +} + #[derive(Deserialize)] struct HostGetWorkerClosedArgs { id: i32, @@ -185,18 +209,18 @@ fn op_host_get_worker_closed( let args: HostGetWorkerClosedArgs = serde_json::from_value(args)?; let id = args.id as u32; let state_ = state.clone(); - let workers_table = state.workers.lock().unwrap(); - // TODO: handle bad worker id gracefully - let worker = workers_table.get(&id).unwrap(); - let shared_worker_future = worker.clone().shared(); - let op = shared_worker_future.then(move |_result| { + let future = GetWorkerClosedFuture { + state: state.clone(), + rid: id, + }; + let op = future.then(move |_result| { let mut workers_table = state_.workers.lock().unwrap(); workers_table.remove(&id); futures::future::ok(json!({})) }); - Ok(JsonOp::Async(Box::new(op))) + Ok(JsonOp::Async(op.boxed())) } #[derive(Deserialize)] @@ -225,7 +249,7 @@ fn op_host_get_message( })) }); - Ok(JsonOp::Async(Box::new(op))) + Ok(JsonOp::Async(op.boxed())) } #[derive(Deserialize)] @@ -247,8 +271,7 @@ fn op_host_post_message( let mut table = state.workers.lock().unwrap(); // TODO: don't return bad resource anymore let worker = table.get_mut(&id).ok_or_else(bad_resource)?; - worker - .post_message(msg) + tokio_util::block_on(worker.post_message(msg).boxed()) .map_err(|e| DenoError::new(ErrorKind::Other, e.to_string()))?; Ok(JsonOp::Sync(json!({}))) } |