summaryrefslogtreecommitdiff
path: root/ext/net
diff options
context:
space:
mode:
Diffstat (limited to 'ext/net')
-rw-r--r--ext/net/01_net.js30
-rw-r--r--ext/net/io.rs16
-rw-r--r--ext/net/ops_tls.rs9
3 files changed, 18 insertions, 37 deletions
diff --git a/ext/net/01_net.js b/ext/net/01_net.js
index 48cbfaaab..fde75fe56 100644
--- a/ext/net/01_net.js
+++ b/ext/net/01_net.js
@@ -4,7 +4,7 @@
((window) => {
const core = window.Deno.core;
const { BadResourcePrototype, InterruptedPrototype } = core;
- const { ReadableStream, WritableStream } = window.__bootstrap.streams;
+ const { WritableStream, readableStreamForRid } = window.__bootstrap.streams;
const {
Error,
ObjectPrototypeIsPrototypeOf,
@@ -65,8 +65,6 @@
return core.opAsync("op_dns_resolve", { query, recordType, options });
}
- const DEFAULT_CHUNK_SIZE = 64 * 1024;
-
function tryClose(rid) {
try {
core.close(rid);
@@ -75,32 +73,6 @@
}
}
- function readableStreamForRid(rid) {
- return new ReadableStream({
- type: "bytes",
- async pull(controller) {
- const v = controller.byobRequest.view;
- try {
- const bytesRead = await read(rid, v);
- if (bytesRead === null) {
- tryClose(rid);
- controller.close();
- controller.byobRequest.respond(0);
- } else {
- controller.byobRequest.respond(bytesRead);
- }
- } catch (e) {
- controller.error(e);
- tryClose(rid);
- }
- },
- cancel() {
- tryClose(rid);
- },
- autoAllocateChunkSize: DEFAULT_CHUNK_SIZE,
- });
- }
-
function writableStreamForRid(rid) {
return new WritableStream({
async write(chunk, controller) {
diff --git a/ext/net/io.rs b/ext/net/io.rs
index 17b86af17..02caf7473 100644
--- a/ext/net/io.rs
+++ b/ext/net/io.rs
@@ -70,13 +70,13 @@ where
pub async fn read(
self: Rc<Self>,
mut buf: ZeroCopyBuf,
- ) -> Result<usize, AnyError> {
+ ) -> Result<(usize, ZeroCopyBuf), AnyError> {
let mut rd = self.rd_borrow_mut().await;
let nread = rd
.read(&mut buf)
.try_or_cancel(self.cancel_handle())
.await?;
- Ok(nread)
+ Ok((nread, buf))
}
pub async fn write(
@@ -103,7 +103,10 @@ impl Resource for TcpStreamResource {
"tcpStream".into()
}
- fn read(self: Rc<Self>, buf: ZeroCopyBuf) -> AsyncResult<usize> {
+ fn read_return(
+ self: Rc<Self>,
+ buf: ZeroCopyBuf,
+ ) -> AsyncResult<(usize, ZeroCopyBuf)> {
Box::pin(self.read(buf))
}
@@ -160,7 +163,7 @@ impl UnixStreamResource {
pub async fn read(
self: Rc<Self>,
_buf: ZeroCopyBuf,
- ) -> Result<usize, AnyError> {
+ ) -> Result<(usize, ZeroCopyBuf), AnyError> {
unreachable!()
}
pub async fn write(
@@ -182,7 +185,10 @@ impl Resource for UnixStreamResource {
"unixStream".into()
}
- fn read(self: Rc<Self>, buf: ZeroCopyBuf) -> AsyncResult<usize> {
+ fn read_return(
+ self: Rc<Self>,
+ buf: ZeroCopyBuf,
+ ) -> AsyncResult<(usize, ZeroCopyBuf)> {
Box::pin(self.read(buf))
}
diff --git a/ext/net/ops_tls.rs b/ext/net/ops_tls.rs
index d6b83e6e8..ca922203c 100644
--- a/ext/net/ops_tls.rs
+++ b/ext/net/ops_tls.rs
@@ -674,11 +674,11 @@ impl TlsStreamResource {
pub async fn read(
self: Rc<Self>,
mut buf: ZeroCopyBuf,
- ) -> Result<usize, AnyError> {
+ ) -> Result<(usize, ZeroCopyBuf), AnyError> {
let mut rd = RcRef::map(&self, |r| &r.rd).borrow_mut().await;
let cancel_handle = RcRef::map(&self, |r| &r.cancel_handle);
let nread = rd.read(&mut buf).try_or_cancel(cancel_handle).await?;
- Ok(nread)
+ Ok((nread, buf))
}
pub async fn write(
@@ -722,7 +722,10 @@ impl Resource for TlsStreamResource {
"tlsStream".into()
}
- fn read(self: Rc<Self>, buf: ZeroCopyBuf) -> AsyncResult<usize> {
+ fn read_return(
+ self: Rc<Self>,
+ buf: ZeroCopyBuf,
+ ) -> AsyncResult<(usize, ZeroCopyBuf)> {
Box::pin(self.read(buf))
}