diff options
Diffstat (limited to 'ext/net')
-rw-r--r-- | ext/net/01_net.js | 30 | ||||
-rw-r--r-- | ext/net/io.rs | 16 | ||||
-rw-r--r-- | ext/net/ops_tls.rs | 9 |
3 files changed, 18 insertions, 37 deletions
diff --git a/ext/net/01_net.js b/ext/net/01_net.js index 48cbfaaab..fde75fe56 100644 --- a/ext/net/01_net.js +++ b/ext/net/01_net.js @@ -4,7 +4,7 @@ ((window) => { const core = window.Deno.core; const { BadResourcePrototype, InterruptedPrototype } = core; - const { ReadableStream, WritableStream } = window.__bootstrap.streams; + const { WritableStream, readableStreamForRid } = window.__bootstrap.streams; const { Error, ObjectPrototypeIsPrototypeOf, @@ -65,8 +65,6 @@ return core.opAsync("op_dns_resolve", { query, recordType, options }); } - const DEFAULT_CHUNK_SIZE = 64 * 1024; - function tryClose(rid) { try { core.close(rid); @@ -75,32 +73,6 @@ } } - function readableStreamForRid(rid) { - return new ReadableStream({ - type: "bytes", - async pull(controller) { - const v = controller.byobRequest.view; - try { - const bytesRead = await read(rid, v); - if (bytesRead === null) { - tryClose(rid); - controller.close(); - controller.byobRequest.respond(0); - } else { - controller.byobRequest.respond(bytesRead); - } - } catch (e) { - controller.error(e); - tryClose(rid); - } - }, - cancel() { - tryClose(rid); - }, - autoAllocateChunkSize: DEFAULT_CHUNK_SIZE, - }); - } - function writableStreamForRid(rid) { return new WritableStream({ async write(chunk, controller) { diff --git a/ext/net/io.rs b/ext/net/io.rs index 17b86af17..02caf7473 100644 --- a/ext/net/io.rs +++ b/ext/net/io.rs @@ -70,13 +70,13 @@ where pub async fn read( self: Rc<Self>, mut buf: ZeroCopyBuf, - ) -> Result<usize, AnyError> { + ) -> Result<(usize, ZeroCopyBuf), AnyError> { let mut rd = self.rd_borrow_mut().await; let nread = rd .read(&mut buf) .try_or_cancel(self.cancel_handle()) .await?; - Ok(nread) + Ok((nread, buf)) } pub async fn write( @@ -103,7 +103,10 @@ impl Resource for TcpStreamResource { "tcpStream".into() } - fn read(self: Rc<Self>, buf: ZeroCopyBuf) -> AsyncResult<usize> { + fn read_return( + self: Rc<Self>, + buf: ZeroCopyBuf, + ) -> AsyncResult<(usize, ZeroCopyBuf)> { Box::pin(self.read(buf)) } @@ -160,7 +163,7 @@ impl UnixStreamResource { pub async fn read( self: Rc<Self>, _buf: ZeroCopyBuf, - ) -> Result<usize, AnyError> { + ) -> Result<(usize, ZeroCopyBuf), AnyError> { unreachable!() } pub async fn write( @@ -182,7 +185,10 @@ impl Resource for UnixStreamResource { "unixStream".into() } - fn read(self: Rc<Self>, buf: ZeroCopyBuf) -> AsyncResult<usize> { + fn read_return( + self: Rc<Self>, + buf: ZeroCopyBuf, + ) -> AsyncResult<(usize, ZeroCopyBuf)> { Box::pin(self.read(buf)) } diff --git a/ext/net/ops_tls.rs b/ext/net/ops_tls.rs index d6b83e6e8..ca922203c 100644 --- a/ext/net/ops_tls.rs +++ b/ext/net/ops_tls.rs @@ -674,11 +674,11 @@ impl TlsStreamResource { pub async fn read( self: Rc<Self>, mut buf: ZeroCopyBuf, - ) -> Result<usize, AnyError> { + ) -> Result<(usize, ZeroCopyBuf), AnyError> { let mut rd = RcRef::map(&self, |r| &r.rd).borrow_mut().await; let cancel_handle = RcRef::map(&self, |r| &r.cancel_handle); let nread = rd.read(&mut buf).try_or_cancel(cancel_handle).await?; - Ok(nread) + Ok((nread, buf)) } pub async fn write( @@ -722,7 +722,10 @@ impl Resource for TlsStreamResource { "tlsStream".into() } - fn read(self: Rc<Self>, buf: ZeroCopyBuf) -> AsyncResult<usize> { + fn read_return( + self: Rc<Self>, + buf: ZeroCopyBuf, + ) -> AsyncResult<(usize, ZeroCopyBuf)> { Box::pin(self.read(buf)) } |