summaryrefslogtreecommitdiff
path: root/std/node/_stream/async_iterator.ts
diff options
context:
space:
mode:
authorCasper Beyer <caspervonb@pm.me>2021-02-02 19:05:46 +0800
committerGitHub <noreply@github.com>2021-02-02 12:05:46 +0100
commit6abf126c2a7a451cded8c6b5e6ddf1b69c84055d (patch)
treefd94c013a19fcb38954844085821ec1601c20e18 /std/node/_stream/async_iterator.ts
parenta2b5d44f1aa9d64f448a2a3cc2001272e2f60b98 (diff)
chore: remove std directory (#9361)
This removes the std folder from the tree. Various parts of the tests are pretty tightly dependent on std (47 direct imports and 75 indirect imports, not counting the cli tests that use them as fixtures) so I've added std as a submodule for now.
Diffstat (limited to 'std/node/_stream/async_iterator.ts')
-rw-r--r--std/node/_stream/async_iterator.ts243
1 files changed, 0 insertions, 243 deletions
diff --git a/std/node/_stream/async_iterator.ts b/std/node/_stream/async_iterator.ts
deleted file mode 100644
index 5369ef39c..000000000
--- a/std/node/_stream/async_iterator.ts
+++ /dev/null
@@ -1,243 +0,0 @@
-// Copyright Node.js contributors. All rights reserved. MIT License.
-import type { Buffer } from "../buffer.ts";
-import finished from "./end_of_stream.ts";
-import Readable from "./readable.ts";
-import type Stream from "./stream.ts";
-import { destroyer } from "./destroy.ts";
-
-const kLastResolve = Symbol("lastResolve");
-const kLastReject = Symbol("lastReject");
-const kError = Symbol("error");
-const kEnded = Symbol("ended");
-const kLastPromise = Symbol("lastPromise");
-const kHandlePromise = Symbol("handlePromise");
-const kStream = Symbol("stream");
-
-// TODO(Soremwar)
-// Add Duplex streams
-type IterableStreams = Stream | Readable;
-
-type IterableItem = Buffer | string | Uint8Array | undefined;
-type ReadableIteratorResult = IteratorResult<IterableItem>;
-
-function initIteratorSymbols(
- o: ReadableStreamAsyncIterator,
- symbols: symbol[],
-) {
- const properties: PropertyDescriptorMap = {};
- for (const sym in symbols) {
- properties[sym] = {
- configurable: false,
- enumerable: false,
- writable: true,
- };
- }
- Object.defineProperties(o, properties);
-}
-
-function createIterResult(
- value: IterableItem,
- done: boolean,
-): ReadableIteratorResult {
- return { value, done };
-}
-
-function readAndResolve(iter: ReadableStreamAsyncIterator) {
- const resolve = iter[kLastResolve];
- if (resolve !== null) {
- const data = iter[kStream].read();
- if (data !== null) {
- iter[kLastPromise] = null;
- iter[kLastResolve] = null;
- iter[kLastReject] = null;
- resolve(createIterResult(data, false));
- }
- }
-}
-
-function onReadable(iter: ReadableStreamAsyncIterator) {
- queueMicrotask(() => readAndResolve(iter));
-}
-
-function wrapForNext(
- lastPromise: Promise<ReadableIteratorResult>,
- iter: ReadableStreamAsyncIterator,
-) {
- return (
- resolve: (value: ReadableIteratorResult) => void,
- reject: (error: Error) => void,
- ) => {
- lastPromise.then(() => {
- if (iter[kEnded]) {
- resolve(createIterResult(undefined, true));
- return;
- }
-
- iter[kHandlePromise](resolve, reject);
- }, reject);
- };
-}
-
-function finish(self: ReadableStreamAsyncIterator, err?: Error) {
- return new Promise(
- (
- resolve: (result: ReadableIteratorResult) => void,
- reject: (error: Error) => void,
- ) => {
- const stream = self[kStream];
-
- finished(stream, (err) => {
- if (err && err.code !== "ERR_STREAM_PREMATURE_CLOSE") {
- reject(err);
- } else {
- resolve(createIterResult(undefined, true));
- }
- });
- destroyer(stream, err);
- },
- );
-}
-
-const AsyncIteratorPrototype = Object.getPrototypeOf(
- Object.getPrototypeOf(async function* () {}).prototype,
-);
-
-export class ReadableStreamAsyncIterator
- implements AsyncIterableIterator<IterableItem> {
- [kEnded]: boolean;
- [kError]: Error | null = null;
- [kHandlePromise] = (
- resolve: (value: ReadableIteratorResult) => void,
- reject: (value: Error) => void,
- ) => {
- const data = this[kStream].read();
- if (data) {
- this[kLastPromise] = null;
- this[kLastResolve] = null;
- this[kLastReject] = null;
- resolve(createIterResult(data, false));
- } else {
- this[kLastResolve] = resolve;
- this[kLastReject] = reject;
- }
- };
- [kLastPromise]: null | Promise<ReadableIteratorResult>;
- [kLastReject]: null | ((value: Error) => void) = null;
- [kLastResolve]: null | ((value: ReadableIteratorResult) => void) = null;
- [kStream]: Readable;
- [Symbol.asyncIterator] = AsyncIteratorPrototype[Symbol.asyncIterator];
-
- constructor(stream: Readable) {
- this[kEnded] = stream.readableEnded || stream._readableState.endEmitted;
- this[kStream] = stream;
- initIteratorSymbols(this, [
- kEnded,
- kError,
- kHandlePromise,
- kLastPromise,
- kLastReject,
- kLastResolve,
- kStream,
- ]);
- }
-
- get stream() {
- return this[kStream];
- }
-
- next(): Promise<ReadableIteratorResult> {
- const error = this[kError];
- if (error !== null) {
- return Promise.reject(error);
- }
-
- if (this[kEnded]) {
- return Promise.resolve(createIterResult(undefined, true));
- }
-
- if (this[kStream].destroyed) {
- return new Promise((resolve, reject) => {
- if (this[kError]) {
- reject(this[kError]);
- } else if (this[kEnded]) {
- resolve(createIterResult(undefined, true));
- } else {
- finished(this[kStream], (err) => {
- if (err && err.code !== "ERR_STREAM_PREMATURE_CLOSE") {
- reject(err);
- } else {
- resolve(createIterResult(undefined, true));
- }
- });
- }
- });
- }
-
- const lastPromise = this[kLastPromise];
- let promise;
-
- if (lastPromise) {
- promise = new Promise(wrapForNext(lastPromise, this));
- } else {
- const data = this[kStream].read();
- if (data !== null) {
- return Promise.resolve(createIterResult(data, false));
- }
-
- promise = new Promise(this[kHandlePromise]);
- }
-
- this[kLastPromise] = promise;
-
- return promise;
- }
-
- return(): Promise<ReadableIteratorResult> {
- return finish(this);
- }
-
- throw(err: Error): Promise<ReadableIteratorResult> {
- return finish(this, err);
- }
-}
-
-const createReadableStreamAsyncIterator = (stream: IterableStreams) => {
- // deno-lint-ignore no-explicit-any
- if (typeof (stream as any).read !== "function") {
- const src = stream;
- stream = new Readable({ objectMode: true }).wrap(src);
- finished(stream, (err) => destroyer(src, err));
- }
-
- const iterator = new ReadableStreamAsyncIterator(stream as Readable);
- iterator[kLastPromise] = null;
-
- finished(stream, { writable: false }, (err) => {
- if (err && err.code !== "ERR_STREAM_PREMATURE_CLOSE") {
- const reject = iterator[kLastReject];
- if (reject !== null) {
- iterator[kLastPromise] = null;
- iterator[kLastResolve] = null;
- iterator[kLastReject] = null;
- reject(err);
- }
- iterator[kError] = err;
- return;
- }
-
- const resolve = iterator[kLastResolve];
- if (resolve !== null) {
- iterator[kLastPromise] = null;
- iterator[kLastResolve] = null;
- iterator[kLastReject] = null;
- resolve(createIterResult(undefined, true));
- }
- iterator[kEnded] = true;
- });
-
- stream.on("readable", onReadable.bind(null, iterator));
-
- return iterator;
-};
-
-export default createReadableStreamAsyncIterator;