summaryrefslogtreecommitdiff
path: root/ext/web/06_streams.js
diff options
context:
space:
mode:
Diffstat (limited to 'ext/web/06_streams.js')
-rw-r--r--ext/web/06_streams.js42
1 files changed, 42 insertions, 0 deletions
diff --git a/ext/web/06_streams.js b/ext/web/06_streams.js
index 6daea0898..492694563 100644
--- a/ext/web/06_streams.js
+++ b/ext/web/06_streams.js
@@ -8,6 +8,7 @@
"use strict";
((window) => {
+ const core = window.Deno.core;
const webidl = window.__bootstrap.webidl;
const { add, remove, signalAbort, newSignal, AbortSignalPrototype } =
window.__bootstrap.abortSignal;
@@ -640,6 +641,41 @@
return stream[_disturbed];
}
+ const DEFAULT_CHUNK_SIZE = 64 * 1024; // 64 KiB
+
+ function readableStreamForRid(rid) {
+ const stream = new ReadableStream({
+ type: "bytes",
+ async pull(controller) {
+ const v = controller.byobRequest.view;
+ try {
+ const bytesRead = await core.read(rid, v);
+ if (bytesRead === 0) {
+ core.tryClose(rid);
+ controller.close();
+ controller.byobRequest.respond(0);
+ } else {
+ controller.byobRequest.respond(bytesRead);
+ }
+ } catch (e) {
+ controller.error(e);
+ core.tryClose(rid);
+ }
+ },
+ cancel() {
+ core.tryClose(rid);
+ },
+ autoAllocateChunkSize: DEFAULT_CHUNK_SIZE,
+ });
+
+ stream[_maybeRid] = rid;
+ return stream;
+ }
+
+ function getReadableStreamRid(stream) {
+ return stream[_maybeRid];
+ }
+
/**
* @param {unknown} value
* @returns {value is WritableStream}
@@ -4288,6 +4324,7 @@
WeakMapPrototypeSet(countSizeFunctionWeakMap, globalObject, size);
}
+ const _maybeRid = Symbol("[[maybeRid]]");
/** @template R */
class ReadableStream {
/** @type {ReadableStreamDefaultController | ReadableByteStreamController} */
@@ -4302,6 +4339,8 @@
[_state];
/** @type {any} */
[_storedError];
+ /** @type {number | null} */
+ [_maybeRid] = null;
/**
* @param {UnderlyingSource<R>=} underlyingSource
@@ -5840,6 +5879,9 @@
errorReadableStream,
createProxy,
writableStreamClose,
+ readableStreamClose,
+ readableStreamForRid,
+ getReadableStreamRid,
Deferred,
// Exposed in global runtime scope
ByteLengthQueuingStrategy,