summaryrefslogtreecommitdiff
path: root/ext/node/polyfills/stream
diff options
context:
space:
mode:
Diffstat (limited to 'ext/node/polyfills/stream')
-rw-r--r--ext/node/polyfills/stream/consumers.mjs78
-rw-r--r--ext/node/polyfills/stream/promises.mjs12
-rw-r--r--ext/node/polyfills/stream/web.ts51
3 files changed, 141 insertions, 0 deletions
diff --git a/ext/node/polyfills/stream/consumers.mjs b/ext/node/polyfills/stream/consumers.mjs
new file mode 100644
index 000000000..61fe27020
--- /dev/null
+++ b/ext/node/polyfills/stream/consumers.mjs
@@ -0,0 +1,78 @@
+// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license.
+// Copyright Joyent and Node contributors. All rights reserved. MIT license.
+
+import { TextDecoder } from "internal:deno_web/08_text_encoding.js";
+import { Buffer } from "internal:deno_node/polyfills/buffer.ts";
+
+/**
+ * @typedef {import('../_global.d.ts').ReadableStream
+ * } ReadableStream
+ * @typedef {import('../_stream.d.ts')} Readable
+ */
+
+/**
+ * @param {AsyncIterable|ReadableStream|Readable} stream
+ * @returns {Promise<Blob>}
+ */
+async function blob(stream) {
+ const chunks = [];
+ for await (const chunk of stream) {
+ chunks.push(chunk);
+ }
+ return new Blob(chunks);
+}
+
+/**
+ * @param {AsyncIterable|ReadableStream|Readable} stream
+ * @returns {Promise<ArrayBuffer>}
+ */
+async function arrayBuffer(stream) {
+ const ret = await blob(stream);
+ return ret.arrayBuffer();
+}
+
+/**
+ * @param {AsyncIterable|ReadableStream|Readable} stream
+ * @returns {Promise<Buffer>}
+ */
+async function buffer(stream) {
+ return Buffer.from(await arrayBuffer(stream));
+}
+
+/**
+ * @param {AsyncIterable|ReadableStream|Readable} stream
+ * @returns {Promise<string>}
+ */
+async function text(stream) {
+ const dec = new TextDecoder();
+ let str = "";
+ for await (const chunk of stream) {
+ if (typeof chunk === "string") {
+ str += chunk;
+ } else {
+ str += dec.decode(chunk, { stream: true });
+ }
+ }
+ // Flush the streaming TextDecoder so that any pending
+ // incomplete multibyte characters are handled.
+ str += dec.decode(undefined, { stream: false });
+ return str;
+}
+
+/**
+ * @param {AsyncIterable|ReadableStream|Readable} stream
+ * @returns {Promise<any>}
+ */
+async function json(stream) {
+ const str = await text(stream);
+ return JSON.parse(str);
+}
+
+export default {
+ arrayBuffer,
+ blob,
+ buffer,
+ json,
+ text,
+};
+export { arrayBuffer, blob, buffer, json, text };
diff --git a/ext/node/polyfills/stream/promises.mjs b/ext/node/polyfills/stream/promises.mjs
new file mode 100644
index 000000000..8c1f7439b
--- /dev/null
+++ b/ext/node/polyfills/stream/promises.mjs
@@ -0,0 +1,12 @@
+// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license.
+// Copyright Joyent and Node contributors. All rights reserved. MIT license.
+
+import stream from "internal:deno_node/polyfills/_stream.mjs";
+
+const { finished, pipeline } = stream.promises;
+
+export default {
+ finished,
+ pipeline,
+};
+export { finished, pipeline };
diff --git a/ext/node/polyfills/stream/web.ts b/ext/node/polyfills/stream/web.ts
new file mode 100644
index 000000000..d97a6b0f1
--- /dev/null
+++ b/ext/node/polyfills/stream/web.ts
@@ -0,0 +1,51 @@
+// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license.
+
+import {
+ ByteLengthQueuingStrategy,
+ CountQueuingStrategy,
+ ReadableByteStreamController,
+ ReadableStream,
+ ReadableStreamDefaultController,
+ ReadableStreamDefaultReader,
+ TransformStream,
+ TransformStreamDefaultController,
+ WritableStream,
+ WritableStreamDefaultController,
+ WritableStreamDefaultWriter,
+} from "internal:deno_web/06_streams.js";
+import {
+ TextDecoderStream,
+ TextEncoderStream,
+} from "internal:deno_web/08_text_encoding.js";
+
+export {
+ ByteLengthQueuingStrategy,
+ CountQueuingStrategy,
+ ReadableByteStreamController,
+ ReadableStream,
+ ReadableStreamDefaultController,
+ ReadableStreamDefaultReader,
+ TextDecoderStream,
+ TextEncoderStream,
+ TransformStream,
+ TransformStreamDefaultController,
+ WritableStream,
+ WritableStreamDefaultController,
+ WritableStreamDefaultWriter,
+};
+
+export default {
+ ReadableStream,
+ ReadableStreamDefaultReader,
+ ReadableByteStreamController,
+ ReadableStreamDefaultController,
+ TransformStream,
+ TransformStreamDefaultController,
+ WritableStream,
+ WritableStreamDefaultWriter,
+ WritableStreamDefaultController,
+ ByteLengthQueuingStrategy,
+ CountQueuingStrategy,
+ TextEncoderStream,
+ TextDecoderStream,
+};