summaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
Diffstat (limited to 'core')
-rw-r--r--core/Cargo.toml2
-rw-r--r--core/examples/http_bench.rs128
2 files changed, 61 insertions, 69 deletions
diff --git a/core/Cargo.toml b/core/Cargo.toml
index f4f4a0bc3..cea3992c9 100644
--- a/core/Cargo.toml
+++ b/core/Cargo.toml
@@ -28,4 +28,4 @@ path = "examples/http_bench.rs"
# tokio is only used for deno_core_http_bench
[dev_dependencies]
-tokio = { version = "0.2.0", features = ["full"] }
+tokio = "0.1.18"
diff --git a/core/examples/http_bench.rs b/core/examples/http_bench.rs
index 40083ee4f..a7b26f4b1 100644
--- a/core/examples/http_bench.rs
+++ b/core/examples/http_bench.rs
@@ -13,23 +13,25 @@ extern crate log;
extern crate lazy_static;
use deno::*;
-use futures::future::Future;
+use futures::compat::AsyncRead01CompatExt;
+use futures::compat::AsyncWrite01CompatExt;
use futures::future::FutureExt;
use futures::future::TryFutureExt;
+use futures::io::{AsyncRead, AsyncWrite};
+use futures::stream::StreamExt;
use std::env;
+use std::future::Future;
use std::io::Error;
use std::io::ErrorKind;
use std::net::SocketAddr;
use std::pin::Pin;
use std::sync::Mutex;
use std::sync::MutexGuard;
-use tokio::io::AsyncRead;
-use tokio::io::AsyncWrite;
+use std::task::Poll;
+use tokio::net::tcp::Incoming;
static LOGGER: Logger = Logger;
-
struct Logger;
-
impl log::Log for Logger {
fn enabled(&self, metadata: &log::Metadata) -> bool {
metadata.level() <= log::max_level()
@@ -170,30 +172,27 @@ fn main() {
isolate.register_op("write", http_op(op_write));
isolate.register_op("close", http_op(op_close));
- let multi_thread = args.iter().any(|a| a == "--multi-thread");
+ let main_future = isolate
+ .then(|r| {
+ js_check(r);
+ futures::future::ok(())
+ })
+ .boxed();
- let mut builder = tokio::runtime::Builder::new();
- let builder = if multi_thread {
+ if args.iter().any(|a| a == "--multi-thread") {
println!("multi-thread");
- builder.threaded_scheduler()
+ tokio::run(main_future.compat());
} else {
println!("single-thread");
- builder.basic_scheduler()
- };
-
- let mut runtime = builder
- .enable_io()
- .build()
- .expect("Unable to create tokio runtime");
- let result = runtime.block_on(isolate.boxed());
- js_check(result);
+ tokio::runtime::current_thread::run(main_future.compat());
+ }
}
pub fn bad_resource() -> Error {
Error::new(ErrorKind::NotFound, "bad resource id")
}
-struct TcpListener(tokio::net::TcpListener);
+struct TcpListener(Incoming);
impl Resource for TcpListener {}
@@ -216,23 +215,27 @@ fn op_accept(
) -> Pin<Box<HttpOp>> {
let rid = record.arg as u32;
debug!("accept {}", rid);
-
- let accept_fut = futures::future::poll_fn(move |cx| {
+ let fut = futures::future::poll_fn(move |cx| {
let mut table = lock_resource_table();
let listener =
table.get_mut::<TcpListener>(rid).ok_or_else(bad_resource)?;
- let listener = &mut listener.0;
- listener.poll_accept(cx)
- });
-
- let fut = async move {
- let (stream, addr) = accept_fut.await?;
+ let mut listener = futures::compat::Compat01As03::new(&mut listener.0);
+ match listener.poll_next_unpin(cx) {
+ Poll::Ready(Some(Err(e))) => Poll::Ready(Err(e)),
+ Poll::Ready(Some(Ok(stream))) => {
+ let addr = stream.peer_addr().unwrap();
+ Poll::Ready(Ok((stream, addr)))
+ }
+ Poll::Pending => Poll::Pending,
+ _ => unreachable!(),
+ }
+ })
+ .and_then(move |(stream, addr)| {
debug!("accept success {}", addr);
let mut table = lock_resource_table();
let rid = table.add("tcpStream", Box::new(TcpStream(stream)));
- Ok(rid as i32)
- };
-
+ futures::future::ok(rid as i32)
+ });
fut.boxed()
}
@@ -241,15 +244,12 @@ fn op_listen(
_zero_copy_buf: Option<PinnedBuf>,
) -> Pin<Box<HttpOp>> {
debug!("listen");
- let fut = async {
- let addr = "127.0.0.1:4544".parse::<SocketAddr>().unwrap();
- let listener = tokio::net::TcpListener::bind(&addr).await?;
- let mut table = lock_resource_table();
- let rid = table.add("tcpListener", Box::new(TcpListener(listener)));
- Ok(rid as i32)
- };
-
- fut.boxed()
+ let addr = "127.0.0.1:4544".parse::<SocketAddr>().unwrap();
+ let listener = tokio::net::TcpListener::bind(&addr).unwrap();
+ let mut table = lock_resource_table();
+ let rid =
+ table.add("tcpListener", Box::new(TcpListener(listener.incoming())));
+ futures::future::ok(rid as i32).boxed()
}
fn op_close(
@@ -257,13 +257,11 @@ fn op_close(
_zero_copy_buf: Option<PinnedBuf>,
) -> Pin<Box<HttpOp>> {
debug!("close");
- let fut = async move {
- let rid = record.arg as u32;
- let mut table = lock_resource_table();
- match table.close(rid) {
- Some(_) => Ok(0),
- None => Err(bad_resource()),
- }
+ let rid = record.arg as u32;
+ let mut table = lock_resource_table();
+ let fut = match table.close(rid) {
+ Some(_) => futures::future::ok(0),
+ None => futures::future::err(bad_resource()),
};
fut.boxed()
}
@@ -275,20 +273,17 @@ fn op_read(
let rid = record.arg as u32;
debug!("read rid={}", rid);
let mut zero_copy_buf = zero_copy_buf.unwrap();
-
- let read_fut = futures::future::poll_fn(move |cx| {
+ let fut = futures::future::poll_fn(move |cx| {
let mut table = lock_resource_table();
let stream = table.get_mut::<TcpStream>(rid).ok_or_else(bad_resource)?;
- let pinned_stream = Pin::new(&mut stream.0);
- pinned_stream.poll_read(cx, &mut zero_copy_buf)
- });
-
- let fut = async move {
- let nread = read_fut.await?;
+ let mut f: Box<dyn AsyncRead + Unpin> =
+ Box::new(AsyncRead01CompatExt::compat(&stream.0));
+ AsyncRead::poll_read(Pin::new(&mut f), cx, &mut zero_copy_buf)
+ })
+ .and_then(move |nread| {
debug!("read success {}", nread);
- Ok(nread as i32)
- };
-
+ futures::future::ok(nread as i32)
+ });
fut.boxed()
}
@@ -299,20 +294,17 @@ fn op_write(
let rid = record.arg as u32;
debug!("write rid={}", rid);
let zero_copy_buf = zero_copy_buf.unwrap();
-
- let write_fut = futures::future::poll_fn(move |cx| {
+ let fut = futures::future::poll_fn(move |cx| {
let mut table = lock_resource_table();
let stream = table.get_mut::<TcpStream>(rid).ok_or_else(bad_resource)?;
- let pinned_stream = Pin::new(&mut stream.0);
- pinned_stream.poll_write(cx, &zero_copy_buf)
- });
-
- let fut = async move {
- let nwritten = write_fut.await?;
+ let mut f: Box<dyn AsyncWrite + Unpin> =
+ Box::new(AsyncWrite01CompatExt::compat(&stream.0));
+ AsyncWrite::poll_write(Pin::new(&mut f), cx, &zero_copy_buf)
+ })
+ .and_then(move |nwritten| {
debug!("write success {}", nwritten);
- Ok(nwritten as i32)
- };
-
+ futures::future::ok(nwritten as i32)
+ });
fut.boxed()
}