summaryrefslogtreecommitdiff
path: root/core/examples/http_bench_json_ops.rs
diff options
context:
space:
mode:
Diffstat (limited to 'core/examples/http_bench_json_ops.rs')
-rw-r--r--core/examples/http_bench_json_ops.rs91
1 files changed, 50 insertions, 41 deletions
diff --git a/core/examples/http_bench_json_ops.rs b/core/examples/http_bench_json_ops.rs
index 77f5b9dbe..c4fcd6363 100644
--- a/core/examples/http_bench_json_ops.rs
+++ b/core/examples/http_bench_json_ops.rs
@@ -5,10 +5,10 @@ extern crate log;
use deno_core::error::bad_resource_id;
use deno_core::error::AnyError;
-use deno_core::AsyncMutFuture;
use deno_core::AsyncRefCell;
-use deno_core::AsyncRefFuture;
use deno_core::BufVec;
+use deno_core::CancelHandle;
+use deno_core::CancelTryFuture;
use deno_core::JsRuntime;
use deno_core::OpState;
use deno_core::RcRef;
@@ -41,51 +41,65 @@ impl log::Log for Logger {
fn flush(&self) {}
}
-// Note: it isn't actually necessary to wrap the `tokio::net::TcpListener` in
-// a cell, because it only supports one op (`accept`) which does not require
-// a mutable reference to the listener.
-struct TcpListener(AsyncRefCell<tokio::net::TcpListener>);
-
-impl Resource for TcpListener {}
+// Note: a `tokio::net::TcpListener` doesn't need to be wrapped in a cell,
+// because it only supports one op (`accept`) which does not require a mutable
+// reference to the listener.
+struct TcpListener {
+ inner: tokio::net::TcpListener,
+ cancel: CancelHandle,
+}
impl TcpListener {
- /// Returns a future that yields a shared borrow of the TCP listener.
- fn borrow(self: Rc<Self>) -> AsyncRefFuture<tokio::net::TcpListener> {
- RcRef::map(self, |r| &r.0).borrow()
+ async fn accept(self: Rc<Self>) -> Result<TcpStream, Error> {
+ let cancel = RcRef::map(&self, |r| &r.cancel);
+ let stream = self.inner.accept().try_or_cancel(cancel).await?.0.into();
+ Ok(stream)
+ }
+}
+
+impl Resource for TcpListener {
+ fn close(self: Rc<Self>) {
+ self.cancel.cancel();
}
}
impl TryFrom<std::net::TcpListener> for TcpListener {
type Error = Error;
- fn try_from(l: std::net::TcpListener) -> Result<Self, Self::Error> {
- tokio::net::TcpListener::try_from(l)
- .map(AsyncRefCell::new)
- .map(Self)
+ fn try_from(
+ std_listener: std::net::TcpListener,
+ ) -> Result<Self, Self::Error> {
+ tokio::net::TcpListener::try_from(std_listener).map(|tokio_listener| Self {
+ inner: tokio_listener,
+ cancel: Default::default(),
+ })
}
}
struct TcpStream {
rd: AsyncRefCell<tokio::net::tcp::OwnedReadHalf>,
wr: AsyncRefCell<tokio::net::tcp::OwnedWriteHalf>,
+ // When a `TcpStream` resource is closed, all pending 'read' ops are
+ // canceled, while 'write' ops are allowed to complete. Therefore only
+ // 'read' futures are attached to this cancel handle.
+ cancel: CancelHandle,
}
-impl Resource for TcpStream {}
-
impl TcpStream {
- /// Returns a future that yields an exclusive borrow of the read end of the
- /// tcp stream.
- fn rd_borrow_mut(
- self: Rc<Self>,
- ) -> AsyncMutFuture<tokio::net::tcp::OwnedReadHalf> {
- RcRef::map(self, |r| &r.rd).borrow_mut()
+ async fn read(self: Rc<Self>, buf: &mut [u8]) -> Result<usize, Error> {
+ let mut rd = RcRef::map(&self, |r| &r.rd).borrow_mut().await;
+ let cancel = RcRef::map(self, |r| &r.cancel);
+ rd.read(buf).try_or_cancel(cancel).await
}
- /// Returns a future that yields an exclusive borrow of the write end of the
- /// tcp stream.
- fn wr_borrow_mut(
- self: Rc<Self>,
- ) -> AsyncMutFuture<tokio::net::tcp::OwnedWriteHalf> {
- RcRef::map(self, |r| &r.wr).borrow_mut()
+ async fn write(self: Rc<Self>, buf: &[u8]) -> Result<usize, Error> {
+ let mut wr = RcRef::map(self, |r| &r.wr).borrow_mut().await;
+ wr.write(buf).await
+ }
+}
+
+impl Resource for TcpStream {
+ fn close(self: Rc<Self>) {
+ self.cancel.cancel()
}
}
@@ -95,6 +109,7 @@ impl From<tokio::net::TcpStream> for TcpStream {
Self {
rd: rd.into(),
wr: wr.into(),
+ cancel: Default::default(),
}
}
}
@@ -157,14 +172,12 @@ async fn op_accept(
.unwrap();
debug!("accept rid={}", rid);
- let listener_rc = state
+ let listener = state
.borrow()
.resource_table_2
.get::<TcpListener>(rid)
.ok_or_else(bad_resource_id)?;
- let listener_ref = listener_rc.borrow().await;
-
- let stream: TcpStream = listener_ref.accept().await?.0.into();
+ let stream = listener.accept().await?;
let rid = state.borrow_mut().resource_table_2.add(stream);
Ok(serde_json::json!({ "rid": rid }))
}
@@ -184,14 +197,12 @@ async fn op_read(
.unwrap();
debug!("read rid={}", rid);
- let stream_rc = state
+ let stream = state
.borrow()
.resource_table_2
.get::<TcpStream>(rid)
.ok_or_else(bad_resource_id)?;
- let mut rd_stream_mut = stream_rc.rd_borrow_mut().await;
-
- let nread = rd_stream_mut.read(&mut bufs[0]).await?;
+ let nread = stream.read(&mut bufs[0]).await?;
Ok(serde_json::json!({ "nread": nread }))
}
@@ -210,14 +221,12 @@ async fn op_write(
.unwrap();
debug!("write rid={}", rid);
- let stream_rc = state
+ let stream = state
.borrow()
.resource_table_2
.get::<TcpStream>(rid)
.ok_or_else(bad_resource_id)?;
- let mut wr_stream_mut = stream_rc.wr_borrow_mut().await;
-
- let nwritten = wr_stream_mut.write(&bufs[0]).await?;
+ let nwritten = stream.write(&bufs[0]).await?;
Ok(serde_json::json!({ "nwritten": nwritten }))
}