diff options
Diffstat (limited to 'core/examples/http_bench_bin_ops.rs')
-rw-r--r-- | core/examples/http_bench_bin_ops.rs | 91 |
1 files changed, 50 insertions, 41 deletions
diff --git a/core/examples/http_bench_bin_ops.rs b/core/examples/http_bench_bin_ops.rs index 9af74d980..1d7a76c3d 100644 --- a/core/examples/http_bench_bin_ops.rs +++ b/core/examples/http_bench_bin_ops.rs @@ -3,10 +3,10 @@ #[macro_use] extern crate log; -use deno_core::AsyncMutFuture; use deno_core::AsyncRefCell; -use deno_core::AsyncRefFuture; use deno_core::BufVec; +use deno_core::CancelHandle; +use deno_core::CancelTryFuture; use deno_core::JsRuntime; use deno_core::Op; use deno_core::OpState; @@ -46,51 +46,65 @@ impl log::Log for Logger { fn flush(&self) {} } -// Note: it isn't actually necessary to wrap the `tokio::net::TcpListener` in -// a cell, because it only supports one op (`accept`) which does not require -// a mutable reference to the listener. -struct TcpListener(AsyncRefCell<tokio::net::TcpListener>); - -impl Resource for TcpListener {} +// Note: a `tokio::net::TcpListener` doesn't need to be wrapped in a cell, +// because it only supports one op (`accept`) which does not require a mutable +// reference to the listener. +struct TcpListener { + inner: tokio::net::TcpListener, + cancel: CancelHandle, +} impl TcpListener { - /// Returns a future that yields a shared borrow of the TCP listener. - fn borrow(self: Rc<Self>) -> AsyncRefFuture<tokio::net::TcpListener> { - RcRef::map(self, |r| &r.0).borrow() + async fn accept(self: Rc<Self>) -> Result<TcpStream, Error> { + let cancel = RcRef::map(&self, |r| &r.cancel); + let stream = self.inner.accept().try_or_cancel(cancel).await?.0.into(); + Ok(stream) + } +} + +impl Resource for TcpListener { + fn close(self: Rc<Self>) { + self.cancel.cancel(); } } impl TryFrom<std::net::TcpListener> for TcpListener { type Error = Error; - fn try_from(l: std::net::TcpListener) -> Result<Self, Self::Error> { - tokio::net::TcpListener::try_from(l) - .map(AsyncRefCell::new) - .map(Self) + fn try_from( + std_listener: std::net::TcpListener, + ) -> Result<Self, Self::Error> { + tokio::net::TcpListener::try_from(std_listener).map(|tokio_listener| Self { + inner: tokio_listener, + cancel: Default::default(), + }) } } struct TcpStream { rd: AsyncRefCell<tokio::net::tcp::OwnedReadHalf>, wr: AsyncRefCell<tokio::net::tcp::OwnedWriteHalf>, + // When a `TcpStream` resource is closed, all pending 'read' ops are + // canceled, while 'write' ops are allowed to complete. Therefore only + // 'read' futures are attached to this cancel handle. + cancel: CancelHandle, } -impl Resource for TcpStream {} - impl TcpStream { - /// Returns a future that yields an exclusive borrow of the read end of the - /// tcp stream. - fn rd_borrow_mut( - self: Rc<Self>, - ) -> AsyncMutFuture<tokio::net::tcp::OwnedReadHalf> { - RcRef::map(self, |r| &r.rd).borrow_mut() + async fn read(self: Rc<Self>, buf: &mut [u8]) -> Result<usize, Error> { + let mut rd = RcRef::map(&self, |r| &r.rd).borrow_mut().await; + let cancel = RcRef::map(self, |r| &r.cancel); + rd.read(buf).try_or_cancel(cancel).await } - /// Returns a future that yields an exclusive borrow of the write end of the - /// tcp stream. - fn wr_borrow_mut( - self: Rc<Self>, - ) -> AsyncMutFuture<tokio::net::tcp::OwnedWriteHalf> { - RcRef::map(self, |r| &r.wr).borrow_mut() + async fn write(self: Rc<Self>, buf: &[u8]) -> Result<usize, Error> { + let mut wr = RcRef::map(self, |r| &r.wr).borrow_mut().await; + wr.write(buf).await + } +} + +impl Resource for TcpStream { + fn close(self: Rc<Self>) { + self.cancel.cancel() } } @@ -100,6 +114,7 @@ impl From<tokio::net::TcpStream> for TcpStream { Self { rd: rd.into(), wr: wr.into(), + cancel: Default::default(), } } } @@ -179,14 +194,12 @@ async fn op_accept( ) -> Result<u32, Error> { debug!("accept rid={}", rid); - let listener_rc = state + let listener = state .borrow() .resource_table_2 .get::<TcpListener>(rid) .ok_or_else(bad_resource_id)?; - let listener_ref = listener_rc.borrow().await; - - let stream: TcpStream = listener_ref.accept().await?.0.into(); + let stream = listener.accept().await?; let rid = state.borrow_mut().resource_table_2.add(stream); Ok(rid) } @@ -199,14 +212,12 @@ async fn op_read( assert_eq!(bufs.len(), 1, "Invalid number of arguments"); debug!("read rid={}", rid); - let stream_rc = state + let stream = state .borrow() .resource_table_2 .get::<TcpStream>(rid) .ok_or_else(bad_resource_id)?; - let mut rd_stream_mut = stream_rc.rd_borrow_mut().await; - - rd_stream_mut.read(&mut bufs[0]).await + stream.read(&mut bufs[0]).await } async fn op_write( @@ -217,14 +228,12 @@ async fn op_write( assert_eq!(bufs.len(), 1, "Invalid number of arguments"); debug!("write rid={}", rid); - let stream_rc = state + let stream = state .borrow() .resource_table_2 .get::<TcpStream>(rid) .ok_or_else(bad_resource_id)?; - let mut wr_stream_mut = stream_rc.wr_borrow_mut().await; - - wr_stream_mut.write(&bufs[0]).await + stream.write(&bufs[0]).await } fn register_op_bin_sync<F>( |