summaryrefslogtreecommitdiff
path: root/std/node/_stream/async_iterator.ts
diff options
context:
space:
mode:
authorSteven Guerrero <stephenguerrero43@gmail.com>2020-11-21 16:13:18 -0500
committerGitHub <noreply@github.com>2020-11-21 16:13:18 -0500
commita4f27c4d570ad9b47bbd560fbf9b017f852fc29f (patch)
tree84b3b0111fca262932aa2be7f1ef884fc6d5ddc3 /std/node/_stream/async_iterator.ts
parentce890f2ae7e8b557211e8f529180d30dc44ea7b5 (diff)
feat(std/node): Add Readable Stream / Writable Stream / errors support (#7569)
Diffstat (limited to 'std/node/_stream/async_iterator.ts')
-rw-r--r--std/node/_stream/async_iterator.ts260
1 files changed, 260 insertions, 0 deletions
diff --git a/std/node/_stream/async_iterator.ts b/std/node/_stream/async_iterator.ts
new file mode 100644
index 000000000..cd1b6db3c
--- /dev/null
+++ b/std/node/_stream/async_iterator.ts
@@ -0,0 +1,260 @@
+// Copyright Node.js contributors. All rights reserved. MIT License.
+import type { Buffer } from "../buffer.ts";
+import finished from "./end-of-stream.ts";
+import Readable from "./readable.ts";
+import type Stream from "./stream.ts";
+
+const kLastResolve = Symbol("lastResolve");
+const kLastReject = Symbol("lastReject");
+const kError = Symbol("error");
+const kEnded = Symbol("ended");
+const kLastPromise = Symbol("lastPromise");
+const kHandlePromise = Symbol("handlePromise");
+const kStream = Symbol("stream");
+
+// TODO(Soremwar)
+// Add Duplex streams
+type IterableStreams = Stream | Readable;
+
+type IterableItem = Buffer | string | Uint8Array | undefined;
+type ReadableIteratorResult = IteratorResult<IterableItem>;
+
+function initIteratorSymbols(
+ o: ReadableStreamAsyncIterator,
+ symbols: symbol[],
+) {
+ const properties: PropertyDescriptorMap = {};
+ for (const sym in symbols) {
+ properties[sym] = {
+ configurable: false,
+ enumerable: false,
+ writable: true,
+ };
+ }
+ Object.defineProperties(o, properties);
+}
+
+// TODO(Soremwar)
+// Bring back once requests are implemented
+// function isRequest(stream: any) {
+// return stream && stream.setHeader && typeof stream.abort === "function";
+// }
+
+//TODO(Soremwar)
+//Should be any implementation of stream
+// deno-lint-ignore no-explicit-any
+function destroyer(stream: any, err?: Error | null) {
+ // TODO(Soremwar)
+ // Bring back once requests are implemented
+ // if (isRequest(stream)) return stream.abort();
+ // if (isRequest(stream.req)) return stream.req.abort();
+ if (typeof stream.destroy === "function") return stream.destroy(err);
+ if (typeof stream.close === "function") return stream.close();
+}
+
+function createIterResult(
+ value: IterableItem,
+ done: boolean,
+): ReadableIteratorResult {
+ return { value, done };
+}
+
+function readAndResolve(iter: ReadableStreamAsyncIterator) {
+ const resolve = iter[kLastResolve];
+ if (resolve !== null) {
+ const data = iter[kStream].read();
+ if (data !== null) {
+ iter[kLastPromise] = null;
+ iter[kLastResolve] = null;
+ iter[kLastReject] = null;
+ resolve(createIterResult(data, false));
+ }
+ }
+}
+
+function onReadable(iter: ReadableStreamAsyncIterator) {
+ queueMicrotask(() => readAndResolve(iter));
+}
+
+function wrapForNext(
+ lastPromise: Promise<ReadableIteratorResult>,
+ iter: ReadableStreamAsyncIterator,
+) {
+ return (
+ resolve: (value: ReadableIteratorResult) => void,
+ reject: (error: Error) => void,
+ ) => {
+ lastPromise.then(() => {
+ if (iter[kEnded]) {
+ resolve(createIterResult(undefined, true));
+ return;
+ }
+
+ iter[kHandlePromise](resolve, reject);
+ }, reject);
+ };
+}
+
+function finish(self: ReadableStreamAsyncIterator, err?: Error) {
+ return new Promise(
+ (
+ resolve: (result: ReadableIteratorResult) => void,
+ reject: (error: Error) => void,
+ ) => {
+ const stream = self[kStream];
+
+ finished(stream, (err) => {
+ if (err && err.code !== "ERR_STREAM_PREMATURE_CLOSE") {
+ reject(err);
+ } else {
+ resolve(createIterResult(undefined, true));
+ }
+ });
+ destroyer(stream, err);
+ },
+ );
+}
+
+const AsyncIteratorPrototype = Object.getPrototypeOf(
+ Object.getPrototypeOf(async function* () {}).prototype,
+);
+
+class ReadableStreamAsyncIterator
+ implements AsyncIterableIterator<IterableItem> {
+ [kEnded]: boolean;
+ [kError]: Error | null = null;
+ [kHandlePromise] = (
+ resolve: (value: ReadableIteratorResult) => void,
+ reject: (value: Error) => void,
+ ) => {
+ const data = this[kStream].read();
+ if (data) {
+ this[kLastPromise] = null;
+ this[kLastResolve] = null;
+ this[kLastReject] = null;
+ resolve(createIterResult(data, false));
+ } else {
+ this[kLastResolve] = resolve;
+ this[kLastReject] = reject;
+ }
+ };
+ [kLastPromise]: null | Promise<ReadableIteratorResult>;
+ [kLastReject]: null | ((value: Error) => void) = null;
+ [kLastResolve]: null | ((value: ReadableIteratorResult) => void) = null;
+ [kStream]: Readable;
+ [Symbol.asyncIterator] = AsyncIteratorPrototype[Symbol.asyncIterator];
+
+ constructor(stream: Readable) {
+ this[kEnded] = stream.readableEnded || stream._readableState.endEmitted;
+ this[kStream] = stream;
+ initIteratorSymbols(this, [
+ kEnded,
+ kError,
+ kHandlePromise,
+ kLastPromise,
+ kLastReject,
+ kLastResolve,
+ kStream,
+ ]);
+ }
+
+ get stream() {
+ return this[kStream];
+ }
+
+ next(): Promise<ReadableIteratorResult> {
+ const error = this[kError];
+ if (error !== null) {
+ return Promise.reject(error);
+ }
+
+ if (this[kEnded]) {
+ return Promise.resolve(createIterResult(undefined, true));
+ }
+
+ if (this[kStream].destroyed) {
+ return new Promise((resolve, reject) => {
+ if (this[kError]) {
+ reject(this[kError]);
+ } else if (this[kEnded]) {
+ resolve(createIterResult(undefined, true));
+ } else {
+ finished(this[kStream], (err) => {
+ if (err && err.code !== "ERR_STREAM_PREMATURE_CLOSE") {
+ reject(err);
+ } else {
+ resolve(createIterResult(undefined, true));
+ }
+ });
+ }
+ });
+ }
+
+ const lastPromise = this[kLastPromise];
+ let promise;
+
+ if (lastPromise) {
+ promise = new Promise(wrapForNext(lastPromise, this));
+ } else {
+ const data = this[kStream].read();
+ if (data !== null) {
+ return Promise.resolve(createIterResult(data, false));
+ }
+
+ promise = new Promise(this[kHandlePromise]);
+ }
+
+ this[kLastPromise] = promise;
+
+ return promise;
+ }
+
+ return(): Promise<ReadableIteratorResult> {
+ return finish(this);
+ }
+
+ throw(err: Error): Promise<ReadableIteratorResult> {
+ return finish(this, err);
+ }
+}
+
+const createReadableStreamAsyncIterator = (stream: IterableStreams) => {
+ // deno-lint-ignore no-explicit-any
+ if (typeof (stream as any).read !== "function") {
+ const src = stream;
+ stream = new Readable({ objectMode: true }).wrap(src);
+ finished(stream, (err) => destroyer(src, err));
+ }
+
+ const iterator = new ReadableStreamAsyncIterator(stream as Readable);
+ iterator[kLastPromise] = null;
+
+ finished(stream, { writable: false }, (err) => {
+ if (err && err.code !== "ERR_STREAM_PREMATURE_CLOSE") {
+ const reject = iterator[kLastReject];
+ if (reject !== null) {
+ iterator[kLastPromise] = null;
+ iterator[kLastResolve] = null;
+ iterator[kLastReject] = null;
+ reject(err);
+ }
+ iterator[kError] = err;
+ return;
+ }
+
+ const resolve = iterator[kLastResolve];
+ if (resolve !== null) {
+ iterator[kLastPromise] = null;
+ iterator[kLastResolve] = null;
+ iterator[kLastReject] = null;
+ resolve(createIterResult(undefined, true));
+ }
+ iterator[kEnded] = true;
+ });
+
+ stream.on("readable", onReadable.bind(null, iterator));
+
+ return iterator;
+};
+
+export default createReadableStreamAsyncIterator;