diff options
Diffstat (limited to 'core/examples')
-rw-r--r-- | core/examples/http_bench_bin_ops.js | 1 | ||||
-rw-r--r-- | core/examples/http_bench_bin_ops.rs | 145 | ||||
-rw-r--r-- | core/examples/http_bench_json_ops.rs | 155 |
3 files changed, 204 insertions, 97 deletions
diff --git a/core/examples/http_bench_bin_ops.js b/core/examples/http_bench_bin_ops.js index 066d5bf58..a90be70c0 100644 --- a/core/examples/http_bench_bin_ops.js +++ b/core/examples/http_bench_bin_ops.js @@ -134,7 +134,6 @@ async function main() { for (;;) { const rid = await accept(listenerRid); - // Deno.core.print(`accepted ${rid}`); if (rid < 0) { Deno.core.print(`accept error ${rid}`); return; diff --git a/core/examples/http_bench_bin_ops.rs b/core/examples/http_bench_bin_ops.rs index 7335b8670..9af74d980 100644 --- a/core/examples/http_bench_bin_ops.rs +++ b/core/examples/http_bench_bin_ops.rs @@ -3,16 +3,21 @@ #[macro_use] extern crate log; +use deno_core::AsyncMutFuture; +use deno_core::AsyncRefCell; +use deno_core::AsyncRefFuture; use deno_core::BufVec; use deno_core::JsRuntime; use deno_core::Op; use deno_core::OpState; +use deno_core::RcRef; +use deno_core::Resource; use deno_core::ZeroCopyBuf; -use futures::future::poll_fn; use futures::future::FutureExt; use futures::future::TryFuture; use futures::future::TryFutureExt; use std::cell::RefCell; +use std::convert::TryFrom; use std::convert::TryInto; use std::env; use std::fmt::Debug; @@ -20,14 +25,10 @@ use std::io::Error; use std::io::ErrorKind; use std::mem::size_of; use std::net::SocketAddr; -use std::pin::Pin; use std::ptr; use std::rc::Rc; -use tokio::io::AsyncRead; -use tokio::io::AsyncWrite; -use tokio::net::TcpListener; -use tokio::net::TcpStream; -use tokio::runtime; +use tokio::io::AsyncReadExt; +use tokio::io::AsyncWriteExt; struct Logger; @@ -45,6 +46,64 @@ impl log::Log for Logger { fn flush(&self) {} } +// Note: it isn't actually necessary to wrap the `tokio::net::TcpListener` in +// a cell, because it only supports one op (`accept`) which does not require +// a mutable reference to the listener. +struct TcpListener(AsyncRefCell<tokio::net::TcpListener>); + +impl Resource for TcpListener {} + +impl TcpListener { + /// Returns a future that yields a shared borrow of the TCP listener. + fn borrow(self: Rc<Self>) -> AsyncRefFuture<tokio::net::TcpListener> { + RcRef::map(self, |r| &r.0).borrow() + } +} + +impl TryFrom<std::net::TcpListener> for TcpListener { + type Error = Error; + fn try_from(l: std::net::TcpListener) -> Result<Self, Self::Error> { + tokio::net::TcpListener::try_from(l) + .map(AsyncRefCell::new) + .map(Self) + } +} + +struct TcpStream { + rd: AsyncRefCell<tokio::net::tcp::OwnedReadHalf>, + wr: AsyncRefCell<tokio::net::tcp::OwnedWriteHalf>, +} + +impl Resource for TcpStream {} + +impl TcpStream { + /// Returns a future that yields an exclusive borrow of the read end of the + /// tcp stream. + fn rd_borrow_mut( + self: Rc<Self>, + ) -> AsyncMutFuture<tokio::net::tcp::OwnedReadHalf> { + RcRef::map(self, |r| &r.rd).borrow_mut() + } + + /// Returns a future that yields an exclusive borrow of the write end of the + /// tcp stream. + fn wr_borrow_mut( + self: Rc<Self>, + ) -> AsyncMutFuture<tokio::net::tcp::OwnedWriteHalf> { + RcRef::map(self, |r| &r.wr).borrow_mut() + } +} + +impl From<tokio::net::TcpStream> for TcpStream { + fn from(s: tokio::net::TcpStream) -> Self { + let (rd, wr) = s.into_split(); + Self { + rd: rd.into(), + wr: wr.into(), + } + } +} + #[derive(Copy, Clone, Debug, PartialEq)] struct Record { promise_id: u32, @@ -94,8 +153,9 @@ fn op_listen( debug!("listen"); let addr = "127.0.0.1:4544".parse::<SocketAddr>().unwrap(); let std_listener = std::net::TcpListener::bind(&addr)?; - let listener = TcpListener::from_std(std_listener)?; - let rid = state.resource_table.add("tcpListener", Box::new(listener)); + std_listener.set_nonblocking(true)?; + let listener = TcpListener::try_from(std_listener)?; + let rid = state.resource_table_2.add(listener); Ok(rid) } @@ -106,7 +166,7 @@ fn op_close( ) -> Result<u32, Error> { debug!("close rid={}", rid); state - .resource_table + .resource_table_2 .close(rid) .map(|_| 0) .ok_or_else(bad_resource_id) @@ -119,56 +179,52 @@ async fn op_accept( ) -> Result<u32, Error> { debug!("accept rid={}", rid); - poll_fn(move |cx| { - let resource_table = &mut state.borrow_mut().resource_table; - - let listener = resource_table - .get_mut::<TcpListener>(rid) - .ok_or_else(bad_resource_id)?; - listener.poll_accept(cx).map_ok(|(stream, _addr)| { - resource_table.add("tcpStream", Box::new(stream)) - }) - }) - .await + let listener_rc = state + .borrow() + .resource_table_2 + .get::<TcpListener>(rid) + .ok_or_else(bad_resource_id)?; + let listener_ref = listener_rc.borrow().await; + + let stream: TcpStream = listener_ref.accept().await?.0.into(); + let rid = state.borrow_mut().resource_table_2.add(stream); + Ok(rid) } -fn op_read( +async fn op_read( state: Rc<RefCell<OpState>>, rid: u32, - bufs: BufVec, -) -> impl TryFuture<Ok = usize, Error = Error> { + mut bufs: BufVec, +) -> Result<usize, Error> { assert_eq!(bufs.len(), 1, "Invalid number of arguments"); - let mut buf = bufs[0].clone(); - debug!("read rid={}", rid); - poll_fn(move |cx| { - let resource_table = &mut state.borrow_mut().resource_table; + let stream_rc = state + .borrow() + .resource_table_2 + .get::<TcpStream>(rid) + .ok_or_else(bad_resource_id)?; + let mut rd_stream_mut = stream_rc.rd_borrow_mut().await; - let stream = resource_table - .get_mut::<TcpStream>(rid) - .ok_or_else(bad_resource_id)?; - Pin::new(stream).poll_read(cx, &mut buf) - }) + rd_stream_mut.read(&mut bufs[0]).await } -fn op_write( +async fn op_write( state: Rc<RefCell<OpState>>, rid: u32, bufs: BufVec, -) -> impl TryFuture<Ok = usize, Error = Error> { +) -> Result<usize, Error> { assert_eq!(bufs.len(), 1, "Invalid number of arguments"); - let buf = bufs[0].clone(); debug!("write rid={}", rid); - poll_fn(move |cx| { - let resource_table = &mut state.borrow_mut().resource_table; + let stream_rc = state + .borrow() + .resource_table_2 + .get::<TcpStream>(rid) + .ok_or_else(bad_resource_id)?; + let mut wr_stream_mut = stream_rc.wr_borrow_mut().await; - let stream = resource_table - .get_mut::<TcpStream>(rid) - .ok_or_else(bad_resource_id)?; - Pin::new(stream).poll_write(cx, &buf) - }) + wr_stream_mut.write(&bufs[0]).await } fn register_op_bin_sync<F>( @@ -247,8 +303,7 @@ fn main() { deno_core::v8_set_flags(env::args().collect()); let mut js_runtime = create_js_runtime(); - let mut runtime = runtime::Builder::new() - .basic_scheduler() + let runtime = tokio::runtime::Builder::new_current_thread() .enable_all() .build() .unwrap(); diff --git a/core/examples/http_bench_json_ops.rs b/core/examples/http_bench_json_ops.rs index 2cf3d09e3..77f5b9dbe 100644 --- a/core/examples/http_bench_json_ops.rs +++ b/core/examples/http_bench_json_ops.rs @@ -5,25 +5,25 @@ extern crate log; use deno_core::error::bad_resource_id; use deno_core::error::AnyError; +use deno_core::AsyncMutFuture; +use deno_core::AsyncRefCell; +use deno_core::AsyncRefFuture; use deno_core::BufVec; use deno_core::JsRuntime; use deno_core::OpState; +use deno_core::RcRef; +use deno_core::Resource; use deno_core::ZeroCopyBuf; -use futures::future::poll_fn; -use futures::future::Future; use serde_json::Value; use std::cell::RefCell; +use std::convert::TryFrom; use std::convert::TryInto; use std::env; +use std::io::Error; use std::net::SocketAddr; -use std::pin::Pin; use std::rc::Rc; -use std::task::Poll; -use tokio::io::AsyncRead; -use tokio::io::AsyncWrite; -use tokio::net::TcpListener; -use tokio::net::TcpStream; -use tokio::runtime; +use tokio::io::AsyncReadExt; +use tokio::io::AsyncWriteExt; struct Logger; @@ -41,6 +41,64 @@ impl log::Log for Logger { fn flush(&self) {} } +// Note: it isn't actually necessary to wrap the `tokio::net::TcpListener` in +// a cell, because it only supports one op (`accept`) which does not require +// a mutable reference to the listener. +struct TcpListener(AsyncRefCell<tokio::net::TcpListener>); + +impl Resource for TcpListener {} + +impl TcpListener { + /// Returns a future that yields a shared borrow of the TCP listener. + fn borrow(self: Rc<Self>) -> AsyncRefFuture<tokio::net::TcpListener> { + RcRef::map(self, |r| &r.0).borrow() + } +} + +impl TryFrom<std::net::TcpListener> for TcpListener { + type Error = Error; + fn try_from(l: std::net::TcpListener) -> Result<Self, Self::Error> { + tokio::net::TcpListener::try_from(l) + .map(AsyncRefCell::new) + .map(Self) + } +} + +struct TcpStream { + rd: AsyncRefCell<tokio::net::tcp::OwnedReadHalf>, + wr: AsyncRefCell<tokio::net::tcp::OwnedWriteHalf>, +} + +impl Resource for TcpStream {} + +impl TcpStream { + /// Returns a future that yields an exclusive borrow of the read end of the + /// tcp stream. + fn rd_borrow_mut( + self: Rc<Self>, + ) -> AsyncMutFuture<tokio::net::tcp::OwnedReadHalf> { + RcRef::map(self, |r| &r.rd).borrow_mut() + } + + /// Returns a future that yields an exclusive borrow of the write end of the + /// tcp stream. + fn wr_borrow_mut( + self: Rc<Self>, + ) -> AsyncMutFuture<tokio::net::tcp::OwnedWriteHalf> { + RcRef::map(self, |r| &r.wr).borrow_mut() + } +} + +impl From<tokio::net::TcpStream> for TcpStream { + fn from(s: tokio::net::TcpStream) -> Self { + let (rd, wr) = s.into_split(); + Self { + rd: rd.into(), + wr: wr.into(), + } + } +} + fn create_js_runtime() -> JsRuntime { let mut runtime = JsRuntime::new(Default::default()); runtime.register_op("listen", deno_core::json_op_sync(op_listen)); @@ -59,8 +117,9 @@ fn op_listen( debug!("listen"); let addr = "127.0.0.1:4544".parse::<SocketAddr>().unwrap(); let std_listener = std::net::TcpListener::bind(&addr)?; - let listener = TcpListener::from_std(std_listener)?; - let rid = state.resource_table.add("tcpListener", Box::new(listener)); + std_listener.set_nonblocking(true)?; + let listener = TcpListener::try_from(std_listener)?; + let rid = state.resource_table_2.add(listener); Ok(serde_json::json!({ "rid": rid })) } @@ -78,17 +137,17 @@ fn op_close( .unwrap(); debug!("close rid={}", rid); state - .resource_table + .resource_table_2 .close(rid) .map(|_| serde_json::json!(())) .ok_or_else(bad_resource_id) } -fn op_accept( +async fn op_accept( state: Rc<RefCell<OpState>>, args: Value, _bufs: BufVec, -) -> impl Future<Output = Result<Value, AnyError>> { +) -> Result<Value, AnyError> { let rid: u32 = args .get("rid") .unwrap() @@ -98,26 +157,24 @@ fn op_accept( .unwrap(); debug!("accept rid={}", rid); - poll_fn(move |cx| { - let resource_table = &mut state.borrow_mut().resource_table; - - let listener = resource_table - .get_mut::<TcpListener>(rid) - .ok_or_else(bad_resource_id)?; - listener.poll_accept(cx)?.map(|(stream, _addr)| { - let rid = resource_table.add("tcpStream", Box::new(stream)); - Ok(serde_json::json!({ "rid": rid })) - }) - }) + let listener_rc = state + .borrow() + .resource_table_2 + .get::<TcpListener>(rid) + .ok_or_else(bad_resource_id)?; + let listener_ref = listener_rc.borrow().await; + + let stream: TcpStream = listener_ref.accept().await?.0.into(); + let rid = state.borrow_mut().resource_table_2.add(stream); + Ok(serde_json::json!({ "rid": rid })) } -fn op_read( +async fn op_read( state: Rc<RefCell<OpState>>, args: Value, mut bufs: BufVec, -) -> impl Future<Output = Result<Value, AnyError>> { +) -> Result<Value, AnyError> { assert_eq!(bufs.len(), 1, "Invalid number of arguments"); - let rid: u32 = args .get("rid") .unwrap() @@ -127,25 +184,23 @@ fn op_read( .unwrap(); debug!("read rid={}", rid); - poll_fn(move |cx| -> Poll<Result<Value, AnyError>> { - let resource_table = &mut state.borrow_mut().resource_table; + let stream_rc = state + .borrow() + .resource_table_2 + .get::<TcpStream>(rid) + .ok_or_else(bad_resource_id)?; + let mut rd_stream_mut = stream_rc.rd_borrow_mut().await; - let stream = resource_table - .get_mut::<TcpStream>(rid) - .ok_or_else(bad_resource_id)?; - Pin::new(stream) - .poll_read(cx, &mut bufs[0])? - .map(|nread| Ok(serde_json::json!({ "nread": nread }))) - }) + let nread = rd_stream_mut.read(&mut bufs[0]).await?; + Ok(serde_json::json!({ "nread": nread })) } -fn op_write( +async fn op_write( state: Rc<RefCell<OpState>>, args: Value, bufs: BufVec, -) -> impl Future<Output = Result<Value, AnyError>> { +) -> Result<Value, AnyError> { assert_eq!(bufs.len(), 1, "Invalid number of arguments"); - let rid: u32 = args .get("rid") .unwrap() @@ -155,16 +210,15 @@ fn op_write( .unwrap(); debug!("write rid={}", rid); - poll_fn(move |cx| { - let resource_table = &mut state.borrow_mut().resource_table; + let stream_rc = state + .borrow() + .resource_table_2 + .get::<TcpStream>(rid) + .ok_or_else(bad_resource_id)?; + let mut wr_stream_mut = stream_rc.wr_borrow_mut().await; - let stream = resource_table - .get_mut::<TcpStream>(rid) - .ok_or_else(bad_resource_id)?; - Pin::new(stream) - .poll_write(cx, &bufs[0])? - .map(|nwritten| Ok(serde_json::json!({ "nwritten": nwritten }))) - }) + let nwritten = wr_stream_mut.write(&bufs[0]).await?; + Ok(serde_json::json!({ "nwritten": nwritten })) } fn main() { @@ -180,8 +234,7 @@ fn main() { deno_core::v8_set_flags(env::args().collect()); let mut js_runtime = create_js_runtime(); - let mut runtime = runtime::Builder::new() - .basic_scheduler() + let runtime = tokio::runtime::Builder::new_current_thread() .enable_all() .build() .unwrap(); |