diff options
author | Bert Belder <bertbelder@gmail.com> | 2020-11-25 00:38:23 +0100 |
---|---|---|
committer | Bert Belder <bertbelder@gmail.com> | 2020-11-25 01:15:14 +0100 |
commit | 8d12653738066facfc228b1d0d9e31b76c6d9de0 (patch) | |
tree | e45c90cc79607fd5c8a29d0231f9cc43353399ac /core/examples/http_bench_bin_ops.rs | |
parent | 605874ee98b52f5de7d1d1284507d5a9cb9eea9d (diff) |
core: implement 'AsyncRefCell' and 'ResourceTable2' (#8273)
Diffstat (limited to 'core/examples/http_bench_bin_ops.rs')
-rw-r--r-- | core/examples/http_bench_bin_ops.rs | 145 |
1 files changed, 100 insertions, 45 deletions
diff --git a/core/examples/http_bench_bin_ops.rs b/core/examples/http_bench_bin_ops.rs index 7335b8670..9af74d980 100644 --- a/core/examples/http_bench_bin_ops.rs +++ b/core/examples/http_bench_bin_ops.rs @@ -3,16 +3,21 @@ #[macro_use] extern crate log; +use deno_core::AsyncMutFuture; +use deno_core::AsyncRefCell; +use deno_core::AsyncRefFuture; use deno_core::BufVec; use deno_core::JsRuntime; use deno_core::Op; use deno_core::OpState; +use deno_core::RcRef; +use deno_core::Resource; use deno_core::ZeroCopyBuf; -use futures::future::poll_fn; use futures::future::FutureExt; use futures::future::TryFuture; use futures::future::TryFutureExt; use std::cell::RefCell; +use std::convert::TryFrom; use std::convert::TryInto; use std::env; use std::fmt::Debug; @@ -20,14 +25,10 @@ use std::io::Error; use std::io::ErrorKind; use std::mem::size_of; use std::net::SocketAddr; -use std::pin::Pin; use std::ptr; use std::rc::Rc; -use tokio::io::AsyncRead; -use tokio::io::AsyncWrite; -use tokio::net::TcpListener; -use tokio::net::TcpStream; -use tokio::runtime; +use tokio::io::AsyncReadExt; +use tokio::io::AsyncWriteExt; struct Logger; @@ -45,6 +46,64 @@ impl log::Log for Logger { fn flush(&self) {} } +// Note: it isn't actually necessary to wrap the `tokio::net::TcpListener` in +// a cell, because it only supports one op (`accept`) which does not require +// a mutable reference to the listener. +struct TcpListener(AsyncRefCell<tokio::net::TcpListener>); + +impl Resource for TcpListener {} + +impl TcpListener { + /// Returns a future that yields a shared borrow of the TCP listener. + fn borrow(self: Rc<Self>) -> AsyncRefFuture<tokio::net::TcpListener> { + RcRef::map(self, |r| &r.0).borrow() + } +} + +impl TryFrom<std::net::TcpListener> for TcpListener { + type Error = Error; + fn try_from(l: std::net::TcpListener) -> Result<Self, Self::Error> { + tokio::net::TcpListener::try_from(l) + .map(AsyncRefCell::new) + .map(Self) + } +} + +struct TcpStream { + rd: AsyncRefCell<tokio::net::tcp::OwnedReadHalf>, + wr: AsyncRefCell<tokio::net::tcp::OwnedWriteHalf>, +} + +impl Resource for TcpStream {} + +impl TcpStream { + /// Returns a future that yields an exclusive borrow of the read end of the + /// tcp stream. + fn rd_borrow_mut( + self: Rc<Self>, + ) -> AsyncMutFuture<tokio::net::tcp::OwnedReadHalf> { + RcRef::map(self, |r| &r.rd).borrow_mut() + } + + /// Returns a future that yields an exclusive borrow of the write end of the + /// tcp stream. + fn wr_borrow_mut( + self: Rc<Self>, + ) -> AsyncMutFuture<tokio::net::tcp::OwnedWriteHalf> { + RcRef::map(self, |r| &r.wr).borrow_mut() + } +} + +impl From<tokio::net::TcpStream> for TcpStream { + fn from(s: tokio::net::TcpStream) -> Self { + let (rd, wr) = s.into_split(); + Self { + rd: rd.into(), + wr: wr.into(), + } + } +} + #[derive(Copy, Clone, Debug, PartialEq)] struct Record { promise_id: u32, @@ -94,8 +153,9 @@ fn op_listen( debug!("listen"); let addr = "127.0.0.1:4544".parse::<SocketAddr>().unwrap(); let std_listener = std::net::TcpListener::bind(&addr)?; - let listener = TcpListener::from_std(std_listener)?; - let rid = state.resource_table.add("tcpListener", Box::new(listener)); + std_listener.set_nonblocking(true)?; + let listener = TcpListener::try_from(std_listener)?; + let rid = state.resource_table_2.add(listener); Ok(rid) } @@ -106,7 +166,7 @@ fn op_close( ) -> Result<u32, Error> { debug!("close rid={}", rid); state - .resource_table + .resource_table_2 .close(rid) .map(|_| 0) .ok_or_else(bad_resource_id) @@ -119,56 +179,52 @@ async fn op_accept( ) -> Result<u32, Error> { debug!("accept rid={}", rid); - poll_fn(move |cx| { - let resource_table = &mut state.borrow_mut().resource_table; - - let listener = resource_table - .get_mut::<TcpListener>(rid) - .ok_or_else(bad_resource_id)?; - listener.poll_accept(cx).map_ok(|(stream, _addr)| { - resource_table.add("tcpStream", Box::new(stream)) - }) - }) - .await + let listener_rc = state + .borrow() + .resource_table_2 + .get::<TcpListener>(rid) + .ok_or_else(bad_resource_id)?; + let listener_ref = listener_rc.borrow().await; + + let stream: TcpStream = listener_ref.accept().await?.0.into(); + let rid = state.borrow_mut().resource_table_2.add(stream); + Ok(rid) } -fn op_read( +async fn op_read( state: Rc<RefCell<OpState>>, rid: u32, - bufs: BufVec, -) -> impl TryFuture<Ok = usize, Error = Error> { + mut bufs: BufVec, +) -> Result<usize, Error> { assert_eq!(bufs.len(), 1, "Invalid number of arguments"); - let mut buf = bufs[0].clone(); - debug!("read rid={}", rid); - poll_fn(move |cx| { - let resource_table = &mut state.borrow_mut().resource_table; + let stream_rc = state + .borrow() + .resource_table_2 + .get::<TcpStream>(rid) + .ok_or_else(bad_resource_id)?; + let mut rd_stream_mut = stream_rc.rd_borrow_mut().await; - let stream = resource_table - .get_mut::<TcpStream>(rid) - .ok_or_else(bad_resource_id)?; - Pin::new(stream).poll_read(cx, &mut buf) - }) + rd_stream_mut.read(&mut bufs[0]).await } -fn op_write( +async fn op_write( state: Rc<RefCell<OpState>>, rid: u32, bufs: BufVec, -) -> impl TryFuture<Ok = usize, Error = Error> { +) -> Result<usize, Error> { assert_eq!(bufs.len(), 1, "Invalid number of arguments"); - let buf = bufs[0].clone(); debug!("write rid={}", rid); - poll_fn(move |cx| { - let resource_table = &mut state.borrow_mut().resource_table; + let stream_rc = state + .borrow() + .resource_table_2 + .get::<TcpStream>(rid) + .ok_or_else(bad_resource_id)?; + let mut wr_stream_mut = stream_rc.wr_borrow_mut().await; - let stream = resource_table - .get_mut::<TcpStream>(rid) - .ok_or_else(bad_resource_id)?; - Pin::new(stream).poll_write(cx, &buf) - }) + wr_stream_mut.write(&bufs[0]).await } fn register_op_bin_sync<F>( @@ -247,8 +303,7 @@ fn main() { deno_core::v8_set_flags(env::args().collect()); let mut js_runtime = create_js_runtime(); - let mut runtime = runtime::Builder::new() - .basic_scheduler() + let runtime = tokio::runtime::Builder::new_current_thread() .enable_all() .build() .unwrap(); |