summaryrefslogtreecommitdiff
path: root/std/node/_stream/stream.ts
diff options
context:
space:
mode:
authorSteven Guerrero <stephenguerrero43@gmail.com>2020-11-21 16:13:18 -0500
committerGitHub <noreply@github.com>2020-11-21 16:13:18 -0500
commita4f27c4d570ad9b47bbd560fbf9b017f852fc29f (patch)
tree84b3b0111fca262932aa2be7f1ef884fc6d5ddc3 /std/node/_stream/stream.ts
parentce890f2ae7e8b557211e8f529180d30dc44ea7b5 (diff)
feat(std/node): Add Readable Stream / Writable Stream / errors support (#7569)
Diffstat (limited to 'std/node/_stream/stream.ts')
-rw-r--r--std/node/_stream/stream.ts79
1 files changed, 79 insertions, 0 deletions
diff --git a/std/node/_stream/stream.ts b/std/node/_stream/stream.ts
new file mode 100644
index 000000000..708b8bcd3
--- /dev/null
+++ b/std/node/_stream/stream.ts
@@ -0,0 +1,79 @@
+// Copyright Node.js contributors. All rights reserved. MIT License.
+import { Buffer } from "../buffer.ts";
+import EventEmitter from "../events.ts";
+import type Writable from "./writable.ts";
+import { types } from "../util.ts";
+
+class Stream extends EventEmitter {
+ constructor() {
+ super();
+ }
+
+ static _isUint8Array = types.isUint8Array;
+ static _uint8ArrayToBuffer = (chunk: Uint8Array) => Buffer.from(chunk);
+
+ pipe(dest: Writable, options: { end: boolean }) {
+ // deno-lint-ignore no-this-alias
+ const source = this;
+
+ //TODO(Soremwar)
+ //isStdio exist on stdin || stdout only, which extend from Duplex
+ //if (!dest._isStdio && (options?.end ?? true)) {
+ //Find an alternative to be able to pipe streams to stdin & stdout
+ //Port them as well?
+ if (options?.end ?? true) {
+ source.on("end", onend);
+ source.on("close", onclose);
+ }
+
+ let didOnEnd = false;
+ function onend() {
+ if (didOnEnd) return;
+ didOnEnd = true;
+
+ dest.end();
+ }
+
+ function onclose() {
+ if (didOnEnd) return;
+ didOnEnd = true;
+
+ if (typeof dest.destroy === "function") dest.destroy();
+ }
+
+ // Don't leave dangling pipes when there are errors.
+ function onerror(this: Stream, er: Error) {
+ cleanup();
+ if (this.listenerCount("error") === 0) {
+ throw er; // Unhandled stream error in pipe.
+ }
+ }
+
+ source.on("error", onerror);
+ dest.on("error", onerror);
+
+ // Remove all the event listeners that were added.
+ function cleanup() {
+ source.removeListener("end", onend);
+ source.removeListener("close", onclose);
+
+ source.removeListener("error", onerror);
+ dest.removeListener("error", onerror);
+
+ source.removeListener("end", cleanup);
+ source.removeListener("close", cleanup);
+
+ dest.removeListener("close", cleanup);
+ }
+
+ source.on("end", cleanup);
+ source.on("close", cleanup);
+
+ dest.on("close", cleanup);
+ dest.emit("pipe", source);
+
+ return dest;
+ }
+}
+
+export default Stream;