summaryrefslogtreecommitdiff
path: root/std/node/_stream/from.ts
diff options
context:
space:
mode:
authorSteven Guerrero <stephenguerrero43@gmail.com>2020-11-21 16:13:18 -0500
committerGitHub <noreply@github.com>2020-11-21 16:13:18 -0500
commita4f27c4d570ad9b47bbd560fbf9b017f852fc29f (patch)
tree84b3b0111fca262932aa2be7f1ef884fc6d5ddc3 /std/node/_stream/from.ts
parentce890f2ae7e8b557211e8f529180d30dc44ea7b5 (diff)
feat(std/node): Add Readable Stream / Writable Stream / errors support (#7569)
Diffstat (limited to 'std/node/_stream/from.ts')
-rw-r--r--std/node/_stream/from.ts102
1 files changed, 102 insertions, 0 deletions
diff --git a/std/node/_stream/from.ts b/std/node/_stream/from.ts
new file mode 100644
index 000000000..652c17715
--- /dev/null
+++ b/std/node/_stream/from.ts
@@ -0,0 +1,102 @@
+// Copyright Node.js contributors. All rights reserved. MIT License.
+import { Buffer } from "../buffer.ts";
+import Readable from "./readable.ts";
+import type { ReadableOptions } from "./readable.ts";
+import { ERR_INVALID_ARG_TYPE, ERR_STREAM_NULL_VALUES } from "../_errors.ts";
+
+export default function from(
+ // deno-lint-ignore no-explicit-any
+ iterable: Iterable<any> | AsyncIterable<any>,
+ opts?: ReadableOptions,
+) {
+ let iterator:
+ // deno-lint-ignore no-explicit-any
+ | Iterator<any, any, undefined>
+ // deno-lint-ignore no-explicit-any
+ | AsyncIterator<any, any, undefined>;
+ if (typeof iterable === "string" || iterable instanceof Buffer) {
+ return new Readable({
+ objectMode: true,
+ ...opts,
+ read() {
+ this.push(iterable);
+ this.push(null);
+ },
+ });
+ }
+
+ if (Symbol.asyncIterator in iterable) {
+ // deno-lint-ignore no-explicit-any
+ iterator = (iterable as AsyncIterable<any>)[Symbol.asyncIterator]();
+ } else if (Symbol.iterator in iterable) {
+ // deno-lint-ignore no-explicit-any
+ iterator = (iterable as Iterable<any>)[Symbol.iterator]();
+ } else {
+ throw new ERR_INVALID_ARG_TYPE("iterable", ["Iterable"], iterable);
+ }
+
+ const readable = new Readable({
+ objectMode: true,
+ highWaterMark: 1,
+ ...opts,
+ });
+
+ // Reading boolean to protect against _read
+ // being called before last iteration completion.
+ let reading = false;
+
+ // needToClose boolean if iterator needs to be explicitly closed
+ let needToClose = false;
+
+ readable._read = function () {
+ if (!reading) {
+ reading = true;
+ next();
+ }
+ };
+
+ readable._destroy = function (error, cb) {
+ if (needToClose) {
+ needToClose = false;
+ close().then(
+ () => queueMicrotask(() => cb(error)),
+ (e) => queueMicrotask(() => cb(error || e)),
+ );
+ } else {
+ cb(error);
+ }
+ };
+
+ async function close() {
+ if (typeof iterator.return === "function") {
+ const { value } = await iterator.return();
+ await value;
+ }
+ }
+
+ async function next() {
+ try {
+ needToClose = false;
+ const { value, done } = await iterator.next();
+ needToClose = !done;
+ if (done) {
+ readable.push(null);
+ } else if (readable.destroyed) {
+ await close();
+ } else {
+ const res = await value;
+ if (res === null) {
+ reading = false;
+ throw new ERR_STREAM_NULL_VALUES();
+ } else if (readable.push(res)) {
+ next();
+ } else {
+ reading = false;
+ }
+ }
+ } catch (err) {
+ readable.destroy(err);
+ }
+ }
+ return readable;
+}