summaryrefslogtreecommitdiff
path: root/ext/node/polyfills/stream/consumers.mjs
diff options
context:
space:
mode:
Diffstat (limited to 'ext/node/polyfills/stream/consumers.mjs')
-rw-r--r--ext/node/polyfills/stream/consumers.mjs78
1 files changed, 78 insertions, 0 deletions
diff --git a/ext/node/polyfills/stream/consumers.mjs b/ext/node/polyfills/stream/consumers.mjs
new file mode 100644
index 000000000..61fe27020
--- /dev/null
+++ b/ext/node/polyfills/stream/consumers.mjs
@@ -0,0 +1,78 @@
+// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license.
+// Copyright Joyent and Node contributors. All rights reserved. MIT license.
+
+import { TextDecoder } from "internal:deno_web/08_text_encoding.js";
+import { Buffer } from "internal:deno_node/polyfills/buffer.ts";
+
+/**
+ * @typedef {import('../_global.d.ts').ReadableStream
+ * } ReadableStream
+ * @typedef {import('../_stream.d.ts')} Readable
+ */
+
+/**
+ * @param {AsyncIterable|ReadableStream|Readable} stream
+ * @returns {Promise<Blob>}
+ */
+async function blob(stream) {
+ const chunks = [];
+ for await (const chunk of stream) {
+ chunks.push(chunk);
+ }
+ return new Blob(chunks);
+}
+
+/**
+ * @param {AsyncIterable|ReadableStream|Readable} stream
+ * @returns {Promise<ArrayBuffer>}
+ */
+async function arrayBuffer(stream) {
+ const ret = await blob(stream);
+ return ret.arrayBuffer();
+}
+
+/**
+ * @param {AsyncIterable|ReadableStream|Readable} stream
+ * @returns {Promise<Buffer>}
+ */
+async function buffer(stream) {
+ return Buffer.from(await arrayBuffer(stream));
+}
+
+/**
+ * @param {AsyncIterable|ReadableStream|Readable} stream
+ * @returns {Promise<string>}
+ */
+async function text(stream) {
+ const dec = new TextDecoder();
+ let str = "";
+ for await (const chunk of stream) {
+ if (typeof chunk === "string") {
+ str += chunk;
+ } else {
+ str += dec.decode(chunk, { stream: true });
+ }
+ }
+ // Flush the streaming TextDecoder so that any pending
+ // incomplete multibyte characters are handled.
+ str += dec.decode(undefined, { stream: false });
+ return str;
+}
+
+/**
+ * @param {AsyncIterable|ReadableStream|Readable} stream
+ * @returns {Promise<any>}
+ */
+async function json(stream) {
+ const str = await text(stream);
+ return JSON.parse(str);
+}
+
+export default {
+ arrayBuffer,
+ blob,
+ buffer,
+ json,
+ text,
+};
+export { arrayBuffer, blob, buffer, json, text };