diff options
Diffstat (limited to 'core')
-rw-r--r-- | core/Cargo.toml | 2 | ||||
-rw-r--r-- | core/examples/http_bench.rs | 128 |
2 files changed, 61 insertions, 69 deletions
diff --git a/core/Cargo.toml b/core/Cargo.toml index f4f4a0bc3..cea3992c9 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -28,4 +28,4 @@ path = "examples/http_bench.rs" # tokio is only used for deno_core_http_bench [dev_dependencies] -tokio = { version = "0.2.0", features = ["full"] } +tokio = "0.1.18" diff --git a/core/examples/http_bench.rs b/core/examples/http_bench.rs index 40083ee4f..a7b26f4b1 100644 --- a/core/examples/http_bench.rs +++ b/core/examples/http_bench.rs @@ -13,23 +13,25 @@ extern crate log; extern crate lazy_static; use deno::*; -use futures::future::Future; +use futures::compat::AsyncRead01CompatExt; +use futures::compat::AsyncWrite01CompatExt; use futures::future::FutureExt; use futures::future::TryFutureExt; +use futures::io::{AsyncRead, AsyncWrite}; +use futures::stream::StreamExt; use std::env; +use std::future::Future; use std::io::Error; use std::io::ErrorKind; use std::net::SocketAddr; use std::pin::Pin; use std::sync::Mutex; use std::sync::MutexGuard; -use tokio::io::AsyncRead; -use tokio::io::AsyncWrite; +use std::task::Poll; +use tokio::net::tcp::Incoming; static LOGGER: Logger = Logger; - struct Logger; - impl log::Log for Logger { fn enabled(&self, metadata: &log::Metadata) -> bool { metadata.level() <= log::max_level() @@ -170,30 +172,27 @@ fn main() { isolate.register_op("write", http_op(op_write)); isolate.register_op("close", http_op(op_close)); - let multi_thread = args.iter().any(|a| a == "--multi-thread"); + let main_future = isolate + .then(|r| { + js_check(r); + futures::future::ok(()) + }) + .boxed(); - let mut builder = tokio::runtime::Builder::new(); - let builder = if multi_thread { + if args.iter().any(|a| a == "--multi-thread") { println!("multi-thread"); - builder.threaded_scheduler() + tokio::run(main_future.compat()); } else { println!("single-thread"); - builder.basic_scheduler() - }; - - let mut runtime = builder - .enable_io() - .build() - .expect("Unable to create tokio runtime"); - let result = runtime.block_on(isolate.boxed()); - js_check(result); + tokio::runtime::current_thread::run(main_future.compat()); + } } pub fn bad_resource() -> Error { Error::new(ErrorKind::NotFound, "bad resource id") } -struct TcpListener(tokio::net::TcpListener); +struct TcpListener(Incoming); impl Resource for TcpListener {} @@ -216,23 +215,27 @@ fn op_accept( ) -> Pin<Box<HttpOp>> { let rid = record.arg as u32; debug!("accept {}", rid); - - let accept_fut = futures::future::poll_fn(move |cx| { + let fut = futures::future::poll_fn(move |cx| { let mut table = lock_resource_table(); let listener = table.get_mut::<TcpListener>(rid).ok_or_else(bad_resource)?; - let listener = &mut listener.0; - listener.poll_accept(cx) - }); - - let fut = async move { - let (stream, addr) = accept_fut.await?; + let mut listener = futures::compat::Compat01As03::new(&mut listener.0); + match listener.poll_next_unpin(cx) { + Poll::Ready(Some(Err(e))) => Poll::Ready(Err(e)), + Poll::Ready(Some(Ok(stream))) => { + let addr = stream.peer_addr().unwrap(); + Poll::Ready(Ok((stream, addr))) + } + Poll::Pending => Poll::Pending, + _ => unreachable!(), + } + }) + .and_then(move |(stream, addr)| { debug!("accept success {}", addr); let mut table = lock_resource_table(); let rid = table.add("tcpStream", Box::new(TcpStream(stream))); - Ok(rid as i32) - }; - + futures::future::ok(rid as i32) + }); fut.boxed() } @@ -241,15 +244,12 @@ fn op_listen( _zero_copy_buf: Option<PinnedBuf>, ) -> Pin<Box<HttpOp>> { debug!("listen"); - let fut = async { - let addr = "127.0.0.1:4544".parse::<SocketAddr>().unwrap(); - let listener = tokio::net::TcpListener::bind(&addr).await?; - let mut table = lock_resource_table(); - let rid = table.add("tcpListener", Box::new(TcpListener(listener))); - Ok(rid as i32) - }; - - fut.boxed() + let addr = "127.0.0.1:4544".parse::<SocketAddr>().unwrap(); + let listener = tokio::net::TcpListener::bind(&addr).unwrap(); + let mut table = lock_resource_table(); + let rid = + table.add("tcpListener", Box::new(TcpListener(listener.incoming()))); + futures::future::ok(rid as i32).boxed() } fn op_close( @@ -257,13 +257,11 @@ fn op_close( _zero_copy_buf: Option<PinnedBuf>, ) -> Pin<Box<HttpOp>> { debug!("close"); - let fut = async move { - let rid = record.arg as u32; - let mut table = lock_resource_table(); - match table.close(rid) { - Some(_) => Ok(0), - None => Err(bad_resource()), - } + let rid = record.arg as u32; + let mut table = lock_resource_table(); + let fut = match table.close(rid) { + Some(_) => futures::future::ok(0), + None => futures::future::err(bad_resource()), }; fut.boxed() } @@ -275,20 +273,17 @@ fn op_read( let rid = record.arg as u32; debug!("read rid={}", rid); let mut zero_copy_buf = zero_copy_buf.unwrap(); - - let read_fut = futures::future::poll_fn(move |cx| { + let fut = futures::future::poll_fn(move |cx| { let mut table = lock_resource_table(); let stream = table.get_mut::<TcpStream>(rid).ok_or_else(bad_resource)?; - let pinned_stream = Pin::new(&mut stream.0); - pinned_stream.poll_read(cx, &mut zero_copy_buf) - }); - - let fut = async move { - let nread = read_fut.await?; + let mut f: Box<dyn AsyncRead + Unpin> = + Box::new(AsyncRead01CompatExt::compat(&stream.0)); + AsyncRead::poll_read(Pin::new(&mut f), cx, &mut zero_copy_buf) + }) + .and_then(move |nread| { debug!("read success {}", nread); - Ok(nread as i32) - }; - + futures::future::ok(nread as i32) + }); fut.boxed() } @@ -299,20 +294,17 @@ fn op_write( let rid = record.arg as u32; debug!("write rid={}", rid); let zero_copy_buf = zero_copy_buf.unwrap(); - - let write_fut = futures::future::poll_fn(move |cx| { + let fut = futures::future::poll_fn(move |cx| { let mut table = lock_resource_table(); let stream = table.get_mut::<TcpStream>(rid).ok_or_else(bad_resource)?; - let pinned_stream = Pin::new(&mut stream.0); - pinned_stream.poll_write(cx, &zero_copy_buf) - }); - - let fut = async move { - let nwritten = write_fut.await?; + let mut f: Box<dyn AsyncWrite + Unpin> = + Box::new(AsyncWrite01CompatExt::compat(&stream.0)); + AsyncWrite::poll_write(Pin::new(&mut f), cx, &zero_copy_buf) + }) + .and_then(move |nwritten| { debug!("write success {}", nwritten); - Ok(nwritten as i32) - }; - + futures::future::ok(nwritten as i32) + }); fut.boxed() } |