diff options
Diffstat (limited to 'core/examples/http_bench_json_ops.rs')
-rw-r--r-- | core/examples/http_bench_json_ops.rs | 155 |
1 files changed, 104 insertions, 51 deletions
diff --git a/core/examples/http_bench_json_ops.rs b/core/examples/http_bench_json_ops.rs index 2cf3d09e3..77f5b9dbe 100644 --- a/core/examples/http_bench_json_ops.rs +++ b/core/examples/http_bench_json_ops.rs @@ -5,25 +5,25 @@ extern crate log; use deno_core::error::bad_resource_id; use deno_core::error::AnyError; +use deno_core::AsyncMutFuture; +use deno_core::AsyncRefCell; +use deno_core::AsyncRefFuture; use deno_core::BufVec; use deno_core::JsRuntime; use deno_core::OpState; +use deno_core::RcRef; +use deno_core::Resource; use deno_core::ZeroCopyBuf; -use futures::future::poll_fn; -use futures::future::Future; use serde_json::Value; use std::cell::RefCell; +use std::convert::TryFrom; use std::convert::TryInto; use std::env; +use std::io::Error; use std::net::SocketAddr; -use std::pin::Pin; use std::rc::Rc; -use std::task::Poll; -use tokio::io::AsyncRead; -use tokio::io::AsyncWrite; -use tokio::net::TcpListener; -use tokio::net::TcpStream; -use tokio::runtime; +use tokio::io::AsyncReadExt; +use tokio::io::AsyncWriteExt; struct Logger; @@ -41,6 +41,64 @@ impl log::Log for Logger { fn flush(&self) {} } +// Note: it isn't actually necessary to wrap the `tokio::net::TcpListener` in +// a cell, because it only supports one op (`accept`) which does not require +// a mutable reference to the listener. +struct TcpListener(AsyncRefCell<tokio::net::TcpListener>); + +impl Resource for TcpListener {} + +impl TcpListener { + /// Returns a future that yields a shared borrow of the TCP listener. + fn borrow(self: Rc<Self>) -> AsyncRefFuture<tokio::net::TcpListener> { + RcRef::map(self, |r| &r.0).borrow() + } +} + +impl TryFrom<std::net::TcpListener> for TcpListener { + type Error = Error; + fn try_from(l: std::net::TcpListener) -> Result<Self, Self::Error> { + tokio::net::TcpListener::try_from(l) + .map(AsyncRefCell::new) + .map(Self) + } +} + +struct TcpStream { + rd: AsyncRefCell<tokio::net::tcp::OwnedReadHalf>, + wr: AsyncRefCell<tokio::net::tcp::OwnedWriteHalf>, +} + +impl Resource for TcpStream {} + +impl TcpStream { + /// Returns a future that yields an exclusive borrow of the read end of the + /// tcp stream. + fn rd_borrow_mut( + self: Rc<Self>, + ) -> AsyncMutFuture<tokio::net::tcp::OwnedReadHalf> { + RcRef::map(self, |r| &r.rd).borrow_mut() + } + + /// Returns a future that yields an exclusive borrow of the write end of the + /// tcp stream. + fn wr_borrow_mut( + self: Rc<Self>, + ) -> AsyncMutFuture<tokio::net::tcp::OwnedWriteHalf> { + RcRef::map(self, |r| &r.wr).borrow_mut() + } +} + +impl From<tokio::net::TcpStream> for TcpStream { + fn from(s: tokio::net::TcpStream) -> Self { + let (rd, wr) = s.into_split(); + Self { + rd: rd.into(), + wr: wr.into(), + } + } +} + fn create_js_runtime() -> JsRuntime { let mut runtime = JsRuntime::new(Default::default()); runtime.register_op("listen", deno_core::json_op_sync(op_listen)); @@ -59,8 +117,9 @@ fn op_listen( debug!("listen"); let addr = "127.0.0.1:4544".parse::<SocketAddr>().unwrap(); let std_listener = std::net::TcpListener::bind(&addr)?; - let listener = TcpListener::from_std(std_listener)?; - let rid = state.resource_table.add("tcpListener", Box::new(listener)); + std_listener.set_nonblocking(true)?; + let listener = TcpListener::try_from(std_listener)?; + let rid = state.resource_table_2.add(listener); Ok(serde_json::json!({ "rid": rid })) } @@ -78,17 +137,17 @@ fn op_close( .unwrap(); debug!("close rid={}", rid); state - .resource_table + .resource_table_2 .close(rid) .map(|_| serde_json::json!(())) .ok_or_else(bad_resource_id) } -fn op_accept( +async fn op_accept( state: Rc<RefCell<OpState>>, args: Value, _bufs: BufVec, -) -> impl Future<Output = Result<Value, AnyError>> { +) -> Result<Value, AnyError> { let rid: u32 = args .get("rid") .unwrap() @@ -98,26 +157,24 @@ fn op_accept( .unwrap(); debug!("accept rid={}", rid); - poll_fn(move |cx| { - let resource_table = &mut state.borrow_mut().resource_table; - - let listener = resource_table - .get_mut::<TcpListener>(rid) - .ok_or_else(bad_resource_id)?; - listener.poll_accept(cx)?.map(|(stream, _addr)| { - let rid = resource_table.add("tcpStream", Box::new(stream)); - Ok(serde_json::json!({ "rid": rid })) - }) - }) + let listener_rc = state + .borrow() + .resource_table_2 + .get::<TcpListener>(rid) + .ok_or_else(bad_resource_id)?; + let listener_ref = listener_rc.borrow().await; + + let stream: TcpStream = listener_ref.accept().await?.0.into(); + let rid = state.borrow_mut().resource_table_2.add(stream); + Ok(serde_json::json!({ "rid": rid })) } -fn op_read( +async fn op_read( state: Rc<RefCell<OpState>>, args: Value, mut bufs: BufVec, -) -> impl Future<Output = Result<Value, AnyError>> { +) -> Result<Value, AnyError> { assert_eq!(bufs.len(), 1, "Invalid number of arguments"); - let rid: u32 = args .get("rid") .unwrap() @@ -127,25 +184,23 @@ fn op_read( .unwrap(); debug!("read rid={}", rid); - poll_fn(move |cx| -> Poll<Result<Value, AnyError>> { - let resource_table = &mut state.borrow_mut().resource_table; + let stream_rc = state + .borrow() + .resource_table_2 + .get::<TcpStream>(rid) + .ok_or_else(bad_resource_id)?; + let mut rd_stream_mut = stream_rc.rd_borrow_mut().await; - let stream = resource_table - .get_mut::<TcpStream>(rid) - .ok_or_else(bad_resource_id)?; - Pin::new(stream) - .poll_read(cx, &mut bufs[0])? - .map(|nread| Ok(serde_json::json!({ "nread": nread }))) - }) + let nread = rd_stream_mut.read(&mut bufs[0]).await?; + Ok(serde_json::json!({ "nread": nread })) } -fn op_write( +async fn op_write( state: Rc<RefCell<OpState>>, args: Value, bufs: BufVec, -) -> impl Future<Output = Result<Value, AnyError>> { +) -> Result<Value, AnyError> { assert_eq!(bufs.len(), 1, "Invalid number of arguments"); - let rid: u32 = args .get("rid") .unwrap() @@ -155,16 +210,15 @@ fn op_write( .unwrap(); debug!("write rid={}", rid); - poll_fn(move |cx| { - let resource_table = &mut state.borrow_mut().resource_table; + let stream_rc = state + .borrow() + .resource_table_2 + .get::<TcpStream>(rid) + .ok_or_else(bad_resource_id)?; + let mut wr_stream_mut = stream_rc.wr_borrow_mut().await; - let stream = resource_table - .get_mut::<TcpStream>(rid) - .ok_or_else(bad_resource_id)?; - Pin::new(stream) - .poll_write(cx, &bufs[0])? - .map(|nwritten| Ok(serde_json::json!({ "nwritten": nwritten }))) - }) + let nwritten = wr_stream_mut.write(&bufs[0]).await?; + Ok(serde_json::json!({ "nwritten": nwritten })) } fn main() { @@ -180,8 +234,7 @@ fn main() { deno_core::v8_set_flags(env::args().collect()); let mut js_runtime = create_js_runtime(); - let mut runtime = runtime::Builder::new() - .basic_scheduler() + let runtime = tokio::runtime::Builder::new_current_thread() .enable_all() .build() .unwrap(); |