diff options
Diffstat (limited to 'core/examples/http_bench_json_ops.rs')
-rw-r--r-- | core/examples/http_bench_json_ops.rs | 91 |
1 files changed, 50 insertions, 41 deletions
diff --git a/core/examples/http_bench_json_ops.rs b/core/examples/http_bench_json_ops.rs index 77f5b9dbe..c4fcd6363 100644 --- a/core/examples/http_bench_json_ops.rs +++ b/core/examples/http_bench_json_ops.rs @@ -5,10 +5,10 @@ extern crate log; use deno_core::error::bad_resource_id; use deno_core::error::AnyError; -use deno_core::AsyncMutFuture; use deno_core::AsyncRefCell; -use deno_core::AsyncRefFuture; use deno_core::BufVec; +use deno_core::CancelHandle; +use deno_core::CancelTryFuture; use deno_core::JsRuntime; use deno_core::OpState; use deno_core::RcRef; @@ -41,51 +41,65 @@ impl log::Log for Logger { fn flush(&self) {} } -// Note: it isn't actually necessary to wrap the `tokio::net::TcpListener` in -// a cell, because it only supports one op (`accept`) which does not require -// a mutable reference to the listener. -struct TcpListener(AsyncRefCell<tokio::net::TcpListener>); - -impl Resource for TcpListener {} +// Note: a `tokio::net::TcpListener` doesn't need to be wrapped in a cell, +// because it only supports one op (`accept`) which does not require a mutable +// reference to the listener. +struct TcpListener { + inner: tokio::net::TcpListener, + cancel: CancelHandle, +} impl TcpListener { - /// Returns a future that yields a shared borrow of the TCP listener. - fn borrow(self: Rc<Self>) -> AsyncRefFuture<tokio::net::TcpListener> { - RcRef::map(self, |r| &r.0).borrow() + async fn accept(self: Rc<Self>) -> Result<TcpStream, Error> { + let cancel = RcRef::map(&self, |r| &r.cancel); + let stream = self.inner.accept().try_or_cancel(cancel).await?.0.into(); + Ok(stream) + } +} + +impl Resource for TcpListener { + fn close(self: Rc<Self>) { + self.cancel.cancel(); } } impl TryFrom<std::net::TcpListener> for TcpListener { type Error = Error; - fn try_from(l: std::net::TcpListener) -> Result<Self, Self::Error> { - tokio::net::TcpListener::try_from(l) - .map(AsyncRefCell::new) - .map(Self) + fn try_from( + std_listener: std::net::TcpListener, + ) -> Result<Self, Self::Error> { + tokio::net::TcpListener::try_from(std_listener).map(|tokio_listener| Self { + inner: tokio_listener, + cancel: Default::default(), + }) } } struct TcpStream { rd: AsyncRefCell<tokio::net::tcp::OwnedReadHalf>, wr: AsyncRefCell<tokio::net::tcp::OwnedWriteHalf>, + // When a `TcpStream` resource is closed, all pending 'read' ops are + // canceled, while 'write' ops are allowed to complete. Therefore only + // 'read' futures are attached to this cancel handle. + cancel: CancelHandle, } -impl Resource for TcpStream {} - impl TcpStream { - /// Returns a future that yields an exclusive borrow of the read end of the - /// tcp stream. - fn rd_borrow_mut( - self: Rc<Self>, - ) -> AsyncMutFuture<tokio::net::tcp::OwnedReadHalf> { - RcRef::map(self, |r| &r.rd).borrow_mut() + async fn read(self: Rc<Self>, buf: &mut [u8]) -> Result<usize, Error> { + let mut rd = RcRef::map(&self, |r| &r.rd).borrow_mut().await; + let cancel = RcRef::map(self, |r| &r.cancel); + rd.read(buf).try_or_cancel(cancel).await } - /// Returns a future that yields an exclusive borrow of the write end of the - /// tcp stream. - fn wr_borrow_mut( - self: Rc<Self>, - ) -> AsyncMutFuture<tokio::net::tcp::OwnedWriteHalf> { - RcRef::map(self, |r| &r.wr).borrow_mut() + async fn write(self: Rc<Self>, buf: &[u8]) -> Result<usize, Error> { + let mut wr = RcRef::map(self, |r| &r.wr).borrow_mut().await; + wr.write(buf).await + } +} + +impl Resource for TcpStream { + fn close(self: Rc<Self>) { + self.cancel.cancel() } } @@ -95,6 +109,7 @@ impl From<tokio::net::TcpStream> for TcpStream { Self { rd: rd.into(), wr: wr.into(), + cancel: Default::default(), } } } @@ -157,14 +172,12 @@ async fn op_accept( .unwrap(); debug!("accept rid={}", rid); - let listener_rc = state + let listener = state .borrow() .resource_table_2 .get::<TcpListener>(rid) .ok_or_else(bad_resource_id)?; - let listener_ref = listener_rc.borrow().await; - - let stream: TcpStream = listener_ref.accept().await?.0.into(); + let stream = listener.accept().await?; let rid = state.borrow_mut().resource_table_2.add(stream); Ok(serde_json::json!({ "rid": rid })) } @@ -184,14 +197,12 @@ async fn op_read( .unwrap(); debug!("read rid={}", rid); - let stream_rc = state + let stream = state .borrow() .resource_table_2 .get::<TcpStream>(rid) .ok_or_else(bad_resource_id)?; - let mut rd_stream_mut = stream_rc.rd_borrow_mut().await; - - let nread = rd_stream_mut.read(&mut bufs[0]).await?; + let nread = stream.read(&mut bufs[0]).await?; Ok(serde_json::json!({ "nread": nread })) } @@ -210,14 +221,12 @@ async fn op_write( .unwrap(); debug!("write rid={}", rid); - let stream_rc = state + let stream = state .borrow() .resource_table_2 .get::<TcpStream>(rid) .ok_or_else(bad_resource_id)?; - let mut wr_stream_mut = stream_rc.wr_borrow_mut().await; - - let nwritten = wr_stream_mut.write(&bufs[0]).await?; + let nwritten = stream.write(&bufs[0]).await?; Ok(serde_json::json!({ "nwritten": nwritten })) } |