summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--core/ops_builtin.rs2
-rw-r--r--core/resources.rs9
-rw-r--r--ext/flash/01_http.js9
-rw-r--r--ext/flash/lib.rs12
-rw-r--r--ext/http/01_http.js52
-rw-r--r--ext/http/lib.rs102
-rw-r--r--ext/web/06_streams.js23
-rw-r--r--runtime/js/40_testing.js1
8 files changed, 91 insertions, 119 deletions
diff --git a/core/ops_builtin.rs b/core/ops_builtin.rs
index 02ecabc9c..6ca2a132c 100644
--- a/core/ops_builtin.rs
+++ b/core/ops_builtin.rs
@@ -165,7 +165,7 @@ async fn op_read(
buf: ZeroCopyBuf,
) -> Result<u32, Error> {
let resource = state.borrow().resource_table.get_any(rid)?;
- resource.read(buf).await.map(|n| n as u32)
+ resource.read_return(buf).await.map(|(n, _)| n as u32)
}
#[op]
diff --git a/core/resources.rs b/core/resources.rs
index eaa1fb3cf..56c9298af 100644
--- a/core/resources.rs
+++ b/core/resources.rs
@@ -35,14 +35,7 @@ pub trait Resource: Any + 'static {
type_name::<Self>().into()
}
- /// Resources may implement `read()` to be a readable stream
- fn read(self: Rc<Self>, buf: ZeroCopyBuf) -> AsyncResult<usize> {
- Box::pin(async move {
- let (nread, _) = self.read_return(buf).await?;
- Ok(nread)
- })
- }
-
+ /// Resources may implement `read_return()` to be a readable stream
fn read_return(
self: Rc<Self>,
_buf: ZeroCopyBuf,
diff --git a/ext/flash/01_http.js b/ext/flash/01_http.js
index fcea23a95..789979ff9 100644
--- a/ext/flash/01_http.js
+++ b/ext/flash/01_http.js
@@ -10,7 +10,7 @@
const {
ReadableStream,
ReadableStreamPrototype,
- getReadableStreamRid,
+ getReadableStreamResourceBacking,
readableStreamClose,
_state,
} = window.__bootstrap.streams;
@@ -333,8 +333,8 @@
}
if (isStreamingResponseBody === true) {
- const resourceRid = getReadableStreamRid(respBody);
- if (resourceRid) {
+ const resourceBacking = getReadableStreamResourceBacking(respBody);
+ if (resourceBacking) {
if (respBody.locked) {
throw new TypeError("ReadableStream is locked.");
}
@@ -352,7 +352,8 @@
),
serverId,
i,
- resourceRid,
+ resourceBacking.rid,
+ resourceBacking.autoClose,
).then(() => {
// Release JS lock.
readableStreamClose(respBody);
diff --git a/ext/flash/lib.rs b/ext/flash/lib.rs
index 0714e379d..a7bd8b439 100644
--- a/ext/flash/lib.rs
+++ b/ext/flash/lib.rs
@@ -205,16 +205,20 @@ async fn op_flash_write_resource(
server_id: u32,
token: u32,
resource_id: deno_core::ResourceId,
+ auto_close: bool,
) -> Result<(), AnyError> {
- let resource = op_state.borrow_mut().resource_table.take_any(resource_id)?;
- let sock = {
+ let (resource, sock) = {
let op_state = &mut op_state.borrow_mut();
+ let resource = if auto_close {
+ op_state.resource_table.take_any(resource_id)?
+ } else {
+ op_state.resource_table.get_any(resource_id)?
+ };
let flash_ctx = op_state.borrow_mut::<FlashContext>();
let ctx = flash_ctx.servers.get_mut(&server_id).unwrap();
- ctx.requests.remove(&token).unwrap().socket()
+ (resource, ctx.requests.remove(&token).unwrap().socket())
};
- drop(op_state);
let _ = sock.write(&response);
#[cfg(unix)]
diff --git a/ext/http/01_http.js b/ext/http/01_http.js
index 63023a296..588a7da57 100644
--- a/ext/http/01_http.js
+++ b/ext/http/01_http.js
@@ -17,8 +17,7 @@
} = window.__bootstrap.fetch;
const core = window.Deno.core;
const { BadResourcePrototype, InterruptedPrototype, ops } = core;
- const { ReadableStream, ReadableStreamPrototype } =
- window.__bootstrap.streams;
+ const { ReadableStreamPrototype } = window.__bootstrap.streams;
const abortSignal = window.__bootstrap.abortSignal;
const {
WebSocket,
@@ -33,8 +32,12 @@
} = window.__bootstrap.webSocket;
const { TcpConn, UnixConn } = window.__bootstrap.net;
const { TlsConn } = window.__bootstrap.tls;
- const { Deferred, getReadableStreamRid, readableStreamClose } =
- window.__bootstrap.streams;
+ const {
+ Deferred,
+ getReadableStreamResourceBacking,
+ readableStreamForRid,
+ readableStreamClose,
+ } = window.__bootstrap.streams;
const {
ArrayPrototypeIncludes,
ArrayPrototypePush,
@@ -50,7 +53,6 @@
StringPrototypeSplit,
Symbol,
SymbolAsyncIterator,
- TypedArrayPrototypeSubarray,
TypeError,
Uint8Array,
Uint8ArrayPrototype,
@@ -121,7 +123,7 @@
// It will be closed automatically once the request has been handled and
// the response has been sent.
if (method !== "GET" && method !== "HEAD") {
- body = createRequestBodyStream(streamRid);
+ body = readableStreamForRid(streamRid, false);
}
const innerRequest = newInnerRequest(
@@ -170,10 +172,6 @@
}
}
- function readRequest(streamRid, buf) {
- return core.opAsync("op_http_read", streamRid, buf);
- }
-
function createRespondWith(
httpConn,
streamRid,
@@ -270,9 +268,9 @@
) {
throw new TypeError("Unreachable");
}
- const resourceRid = getReadableStreamRid(respBody);
+ const resourceBacking = getReadableStreamResourceBacking(respBody);
let reader;
- if (resourceRid) {
+ if (resourceBacking) {
if (respBody.locked) {
throw new TypeError("ReadableStream is locked.");
}
@@ -281,9 +279,9 @@
await core.opAsync(
"op_http_write_resource",
streamRid,
- resourceRid,
+ resourceBacking.rid,
);
- core.tryClose(resourceRid);
+ if (resourceBacking.autoClose) core.tryClose(resourceBacking.rid);
readableStreamClose(respBody); // Release JS lock.
} catch (error) {
const connError = httpConn[connErrorSymbol];
@@ -379,32 +377,6 @@
};
}
- function createRequestBodyStream(streamRid) {
- return new ReadableStream({
- type: "bytes",
- async pull(controller) {
- try {
- // This is the largest possible size for a single packet on a TLS
- // stream.
- const chunk = new Uint8Array(16 * 1024 + 256);
- const read = await readRequest(streamRid, chunk);
- if (read > 0) {
- // We read some data. Enqueue it onto the stream.
- controller.enqueue(TypedArrayPrototypeSubarray(chunk, 0, read));
- } else {
- // We have reached the end of the body, so we close the stream.
- controller.close();
- }
- } catch (err) {
- // There was an error while reading a chunk of the body, so we
- // error.
- controller.error(err);
- controller.close();
- }
- },
- });
- }
-
const _ws = Symbol("[[associated_ws]]");
function upgradeWebSocket(request, options = {}) {
diff --git a/ext/http/lib.rs b/ext/http/lib.rs
index d1b38fb42..bffe3c3d5 100644
--- a/ext/http/lib.rs
+++ b/ext/http/lib.rs
@@ -78,7 +78,6 @@ pub fn init() -> Extension {
))
.ops(vec![
op_http_accept::decl(),
- op_http_read::decl(),
op_http_write_headers::decl(),
op_http_headers::decl(),
op_http_write::decl(),
@@ -329,11 +328,63 @@ impl HttpStreamResource {
}
}
+impl HttpStreamResource {
+ async fn read(
+ self: Rc<Self>,
+ mut buf: ZeroCopyBuf,
+ ) -> Result<(usize, ZeroCopyBuf), AnyError> {
+ let mut rd = RcRef::map(&self, |r| &r.rd).borrow_mut().await;
+
+ let body = loop {
+ match &mut *rd {
+ HttpRequestReader::Headers(_) => {}
+ HttpRequestReader::Body(_, body) => break body,
+ HttpRequestReader::Closed => return Ok((0, buf)),
+ }
+ match take(&mut *rd) {
+ HttpRequestReader::Headers(request) => {
+ let (parts, body) = request.into_parts();
+ *rd = HttpRequestReader::Body(parts.headers, body.peekable());
+ }
+ _ => unreachable!(),
+ };
+ };
+
+ let fut = async {
+ let mut body = Pin::new(body);
+ loop {
+ match body.as_mut().peek_mut().await {
+ Some(Ok(chunk)) if !chunk.is_empty() => {
+ let len = min(buf.len(), chunk.len());
+ buf[..len].copy_from_slice(&chunk.split_to(len));
+ break Ok((len, buf));
+ }
+ Some(_) => match body.as_mut().next().await.unwrap() {
+ Ok(chunk) => assert!(chunk.is_empty()),
+ Err(err) => break Err(AnyError::from(err)),
+ },
+ None => break Ok((0, buf)),
+ }
+ }
+ };
+
+ let cancel_handle = RcRef::map(&self, |r| &r.cancel_handle);
+ fut.try_or_cancel(cancel_handle).await
+ }
+}
+
impl Resource for HttpStreamResource {
fn name(&self) -> Cow<str> {
"httpStream".into()
}
+ fn read_return(
+ self: Rc<Self>,
+ _buf: ZeroCopyBuf,
+ ) -> deno_core::AsyncResult<(usize, ZeroCopyBuf)> {
+ Box::pin(self.read(_buf))
+ }
+
fn close(self: Rc<Self>) {
self.cancel_handle.cancel();
}
@@ -817,55 +868,6 @@ async fn op_http_shutdown(
}
#[op]
-async fn op_http_read(
- state: Rc<RefCell<OpState>>,
- rid: ResourceId,
- mut buf: ZeroCopyBuf,
-) -> Result<usize, AnyError> {
- let stream = state
- .borrow_mut()
- .resource_table
- .get::<HttpStreamResource>(rid)?;
- let mut rd = RcRef::map(&stream, |r| &r.rd).borrow_mut().await;
-
- let body = loop {
- match &mut *rd {
- HttpRequestReader::Headers(_) => {}
- HttpRequestReader::Body(_, body) => break body,
- HttpRequestReader::Closed => return Ok(0),
- }
- match take(&mut *rd) {
- HttpRequestReader::Headers(request) => {
- let (parts, body) = request.into_parts();
- *rd = HttpRequestReader::Body(parts.headers, body.peekable());
- }
- _ => unreachable!(),
- };
- };
-
- let fut = async {
- let mut body = Pin::new(body);
- loop {
- match body.as_mut().peek_mut().await {
- Some(Ok(chunk)) if !chunk.is_empty() => {
- let len = min(buf.len(), chunk.len());
- buf[..len].copy_from_slice(&chunk.split_to(len));
- break Ok(len);
- }
- Some(_) => match body.as_mut().next().await.unwrap() {
- Ok(chunk) => assert!(chunk.is_empty()),
- Err(err) => break Err(AnyError::from(err)),
- },
- None => break Ok(0),
- }
- }
- };
-
- let cancel_handle = RcRef::map(&stream, |r| &r.cancel_handle);
- fut.try_or_cancel(cancel_handle).await
-}
-
-#[op]
fn op_http_websocket_accept_header(key: String) -> Result<String, AnyError> {
let digest = ring::digest::digest(
&ring::digest::SHA1_FOR_LEGACY_USE_ONLY,
diff --git a/ext/web/06_streams.js b/ext/web/06_streams.js
index cbf781b53..412c58c3c 100644
--- a/ext/web/06_streams.js
+++ b/ext/web/06_streams.js
@@ -654,11 +654,12 @@
* read directly from the underlying resource if they so choose (FastStream).
*
* @param {number} rid The resource ID to read from.
+ * @param {boolean=} autoClose If the resource should be auto-closed when the stream closes. Defaults to true.
* @returns {ReadableStream<Uint8Array>}
*/
- function readableStreamForRid(rid) {
+ function readableStreamForRid(rid, autoClose = true) {
const stream = webidl.createBranded(ReadableStream);
- stream[_maybeRid] = rid;
+ stream[_resourceBacking] = { rid, autoClose };
const underlyingSource = {
type: "bytes",
async pull(controller) {
@@ -666,7 +667,7 @@
try {
const bytesRead = await core.read(rid, v);
if (bytesRead === 0) {
- core.tryClose(rid);
+ if (autoClose) core.tryClose(rid);
controller.close();
controller.byobRequest.respond(0);
} else {
@@ -674,11 +675,11 @@
}
} catch (e) {
controller.error(e);
- core.tryClose(rid);
+ if (autoClose) core.tryClose(rid);
}
},
cancel() {
- core.tryClose(rid);
+ if (autoClose) core.tryClose(rid);
},
autoAllocateChunkSize: DEFAULT_CHUNK_SIZE,
};
@@ -761,8 +762,8 @@
}
}
- function getReadableStreamRid(stream) {
- return stream[_maybeRid];
+ function getReadableStreamResourceBacking(stream) {
+ return stream[_resourceBacking];
}
/**
@@ -4424,7 +4425,7 @@
WeakMapPrototypeSet(countSizeFunctionWeakMap, globalObject, size);
}
- const _maybeRid = Symbol("[[maybeRid]]");
+ const _resourceBacking = Symbol("[[resourceBacking]]");
/** @template R */
class ReadableStream {
/** @type {ReadableStreamDefaultController | ReadableByteStreamController} */
@@ -4439,8 +4440,8 @@
[_state];
/** @type {any} */
[_storedError];
- /** @type {number | null} */
- [_maybeRid] = null;
+ /** @type {{ rid: number, autoClose: boolean } | null} */
+ [_resourceBacking] = null;
/**
* @param {UnderlyingSource<R>=} underlyingSource
@@ -5986,7 +5987,7 @@
readableStreamForRidUnrefable,
readableStreamForRidUnrefableRef,
readableStreamForRidUnrefableUnref,
- getReadableStreamRid,
+ getReadableStreamResourceBacking,
Deferred,
// Exposed in global runtime scope
ByteLengthQueuingStrategy,
diff --git a/runtime/js/40_testing.js b/runtime/js/40_testing.js
index c1ce3e726..069b27957 100644
--- a/runtime/js/40_testing.js
+++ b/runtime/js/40_testing.js
@@ -90,7 +90,6 @@
"op_funlock_async": ["unlock a file", "awaiting the result of a `Deno.funlock` call"],
"op_futime_async": ["change file timestamps", "awaiting the result of a `Deno.futime` call"],
"op_http_accept": ["accept a HTTP request", "closing a `Deno.HttpConn`"],
- "op_http_read": ["read the body of a HTTP request", "consuming the entire request body"],
"op_http_shutdown": ["shutdown a HTTP connection", "awaiting `Deno.HttpEvent#respondWith`"],
"op_http_upgrade_websocket": ["upgrade a HTTP connection to a WebSocket", "awaiting `Deno.HttpEvent#respondWith`"],
"op_http_write_headers": ["write HTTP response headers", "awaiting `Deno.HttpEvent#respondWith`"],