diff options
author | Aaron O'Mullan <aaron.omullan@gmail.com> | 2021-11-09 19:26:17 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2021-11-09 19:26:17 +0100 |
commit | 375ce63c6390cf7710210ce22f14a2b5a02cbfc3 (patch) | |
tree | 85100876e5e0b50514385ae3c7ce08493c82b38b /ext/net/ops_tls.rs | |
parent | 1eae6c139ee1dac28df57d67d993792b773fa1ff (diff) |
feat(core): streams (#12596)
This allows resources to be "streams" by implementing read/write/shutdown. These streams are implicit since their nature (read/write/duplex) isn't known until called, but we could easily add another method to explicitly tag resources as streams.
`op_read/op_write/op_shutdown` are now builtin ops provided by `deno_core`
Note: this current implementation is simple & straightforward but it results in an additional alloc per read/write call
Closes #12556
Diffstat (limited to 'ext/net/ops_tls.rs')
-rw-r--r-- | ext/net/ops_tls.rs | 33 |
1 files changed, 25 insertions, 8 deletions
diff --git a/ext/net/ops_tls.rs b/ext/net/ops_tls.rs index 43652a9fe..87744ed63 100644 --- a/ext/net/ops_tls.rs +++ b/ext/net/ops_tls.rs @@ -28,6 +28,7 @@ use deno_core::op_async; use deno_core::op_sync; use deno_core::parking_lot::Mutex; use deno_core::AsyncRefCell; +use deno_core::AsyncResult; use deno_core::CancelHandle; use deno_core::CancelTryFuture; use deno_core::OpPair; @@ -35,6 +36,7 @@ use deno_core::OpState; use deno_core::RcRef; use deno_core::Resource; use deno_core::ResourceId; +use deno_core::ZeroCopyBuf; use deno_tls::create_client_config; use deno_tls::load_certs; use deno_tls::load_private_keys; @@ -715,24 +717,27 @@ impl TlsStreamResource { } pub async fn read( - self: &Rc<Self>, - buf: &mut [u8], + self: Rc<Self>, + mut buf: ZeroCopyBuf, ) -> Result<usize, AnyError> { - let mut rd = RcRef::map(self, |r| &r.rd).borrow_mut().await; - let cancel_handle = RcRef::map(self, |r| &r.cancel_handle); - let nread = rd.read(buf).try_or_cancel(cancel_handle).await?; + let mut rd = RcRef::map(&self, |r| &r.rd).borrow_mut().await; + let cancel_handle = RcRef::map(&self, |r| &r.cancel_handle); + let nread = rd.read(&mut buf).try_or_cancel(cancel_handle).await?; Ok(nread) } - pub async fn write(self: &Rc<Self>, buf: &[u8]) -> Result<usize, AnyError> { + pub async fn write( + self: Rc<Self>, + buf: ZeroCopyBuf, + ) -> Result<usize, AnyError> { self.handshake().await?; let mut wr = RcRef::map(self, |r| &r.wr).borrow_mut().await; - let nwritten = wr.write(buf).await?; + let nwritten = wr.write(&buf).await?; wr.flush().await?; Ok(nwritten) } - pub async fn shutdown(self: &Rc<Self>) -> Result<(), AnyError> { + pub async fn shutdown(self: Rc<Self>) -> Result<(), AnyError> { self.handshake().await?; let mut wr = RcRef::map(self, |r| &r.wr).borrow_mut().await; wr.shutdown().await?; @@ -755,6 +760,18 @@ impl Resource for TlsStreamResource { "tlsStream".into() } + fn read(self: Rc<Self>, buf: ZeroCopyBuf) -> AsyncResult<usize> { + Box::pin(self.read(buf)) + } + + fn write(self: Rc<Self>, buf: ZeroCopyBuf) -> AsyncResult<usize> { + Box::pin(self.write(buf)) + } + + fn shutdown(self: Rc<Self>) -> AsyncResult<()> { + Box::pin(self.shutdown()) + } + fn close(self: Rc<Self>) { self.cancel_handle.cancel(); } |