author    Marcos Casagrande <marcoscvp90@gmail.com>  2020-06-27 22:55:01 +0200
committer GitHub <noreply@github.com>                2020-06-27 16:55:01 -0400
commit    a829fa8f57a2063492aab564ec1f15da21eb851c (patch)
tree      b1516b8a6eeeedabc8dd754d98a83c5b35cddeab
parent    a216bd06fc7dfb4a136e9fc04ae119c2e4801b6e (diff)
feat(std/io): add fromStreamReader, fromStreamWriter (#5789)
-rw-r--r--  std/io/streams.ts        38
-rw-r--r--  std/io/streams_test.ts  134
2 files changed, 172 insertions, 0 deletions
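
The commit adds two adapters that bridge WHATWG streams and Deno's io
interfaces: fromStreamWriter wraps a WritableStreamDefaultWriter as a
Deno.Writer, and fromStreamReader wraps a ReadableStreamDefaultReader as a
Deno.Reader. As a minimal usage sketch for the writer direction (not part of
the commit; the import path and the WritableStream sink are illustrative):

    import { fromStreamWriter } from "./std/io/streams.ts"; // illustrative path

    // A web stream that just logs each chunk it receives.
    const writable = new WritableStream<Uint8Array>({
      write(chunk): void {
        console.log(new TextDecoder().decode(chunk));
      },
    });

    // Adapt it so any Deno.Writer consumer can write to it.
    const writer = fromStreamWriter(writable.getWriter());
    await writer.write(new TextEncoder().encode("hello")); // resolves to 5
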
diff --git a/std/io/streams.ts b/std/io/streams.ts
new file mode 100644
index 000000000..3969746ef
--- /dev/null
+++ b/std/io/streams.ts
@@ -0,0 +1,38 @@
+// Copyright 2018-2020 the Deno authors. All rights reserved. MIT license.
+
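+/** Creates a Deno.Writer from a WritableStreamDefaultWriter. */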
+export function fromStreamWriter(
+  streamWriter: WritableStreamDefaultWriter<Uint8Array>
+): Deno.Writer {
+  return {
+    async write(p: Uint8Array): Promise<number> {
+      await streamWriter.ready;
+      await streamWriter.write(p);
+      return p.length;
+    },
+  };
+}
+
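+/** Creates a Deno.Reader from a ReadableStreamDefaultReader. */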
+export function fromStreamReader(
+  streamReader: ReadableStreamDefaultReader<Uint8Array>
+): Deno.Reader {
+  const buffer = new Deno.Buffer();
+
+  return {
+    async read(p: Uint8Array): Promise<number | null> {
+      // Loop (not `if`): a stream may enqueue empty chunks, which would
+      // otherwise make buffer.read() report a false EOF.
+      while (buffer.empty()) {
+        const res = await streamReader.read();
+        if (res.done) {
+          return null; // EOF
+        }
+
+        await Deno.writeAll(buffer, res.value);
+      }
+
+      return buffer.read(p);
+    },
+  };
+}
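
For the reading direction, a sketch of the intended use (an assumption, not
part of the commit; the URL is illustrative, and it assumes fetch exposes a
streaming response body):

    import { fromStreamReader } from "./std/io/streams.ts"; // illustrative path

    const res = await fetch("https://example.com/data.bin");
    if (res.body) {
      // Adapt the web stream reader, then pipe it with Deno.copy.
      const reader = fromStreamReader(res.body.getReader());
      await Deno.copy(reader, Deno.stdout);
    }
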
diff --git a/std/io/streams_test.ts b/std/io/streams_test.ts
new file mode 100644
index 000000000..00d056e2f
--- /dev/null
+++ b/std/io/streams_test.ts
@@ -0,0 +1,134 @@
+// Copyright 2018-2020 the Deno authors. All rights reserved. MIT license.
+
+import { assertEquals, assert } from "../testing/asserts.ts";
+import { fromStreamWriter, fromStreamReader } from "./streams.ts";
+
+function repeat(c: string, bytes: number): Uint8Array {
+  assertEquals(c.length, 1);
+  const ui8 = new Uint8Array(bytes);
+  ui8.fill(c.charCodeAt(0));
+  return ui8;
+}
+
+Deno.test("toWriterCheck", async function (): Promise<void> {
+ const written: string[] = [];
+ const chunks: string[] = ["hello", "deno", "land"];
+ const writableStream = new WritableStream({
+ write(chunk): void {
+ const decoder = new TextDecoder();
+ written.push(decoder.decode(chunk));
+ },
+ });
+
+ const encoder = new TextEncoder();
+ const writer = fromStreamWriter(writableStream.getWriter());
+
+ for (const chunk of chunks) {
+ const n = await writer.write(encoder.encode(chunk));
+ // stream writers always write all the bytes
+ assertEquals(n, chunk.length);
+ }
+
+ assertEquals(written, chunks);
+});
+
+Deno.test("toReaderCheck", async function (): Promise<void> {
+ const chunks: string[] = ["hello", "deno", "land"];
+ const expected = chunks.slice();
+ const readChunks: Uint8Array[] = [];
+ const readableStream = new ReadableStream({
+ pull(controller): void {
+ const encoder = new TextEncoder();
+ const chunk = chunks.shift();
+ if (!chunk) return controller.close();
+ controller.enqueue(encoder.encode(chunk));
+ },
+ });
+
+ const decoder = new TextDecoder();
+ const reader = fromStreamReader(readableStream.getReader());
+
+ let i = 0;
+
+ while (true) {
+ const b = new Uint8Array(1024);
+ const n = await reader.read(b);
+
+ if (n === null) break;
+
+ readChunks.push(b.subarray(0, n));
+ assert(i < expected.length);
+
+ i++;
+ }
+
+ assertEquals(
+ expected,
+ readChunks.map((chunk) => decoder.decode(chunk))
+ );
+});
+
+Deno.test("toReaderBigChunksCheck", async function (): Promise<void> {
+ const bufSize = 1024;
+ const chunkSize = 3 * bufSize;
+ const writer = new Deno.Buffer();
+
+ // A readable stream can enqueue chunks bigger than Copy bufSize
+ // Reader returned by toReader should enqueue exceeding bytes
+ const chunks: string[] = [
+ "a".repeat(chunkSize),
+ "b".repeat(chunkSize),
+ "c".repeat(chunkSize),
+ ];
+ const expected = chunks.slice();
+ const readableStream = new ReadableStream({
+ pull(controller): void {
+ const encoder = new TextEncoder();
+ const chunk = chunks.shift();
+ if (!chunk) return controller.close();
+
+ controller.enqueue(encoder.encode(chunk));
+ },
+ });
+
+ const reader = fromStreamReader(readableStream.getReader());
+ const n = await Deno.copy(reader, writer, { bufSize });
+
+ const expectedWritten = chunkSize * expected.length;
+ assertEquals(n, chunkSize * expected.length);
+ assertEquals(writer.length, expectedWritten);
+});
+
+Deno.test("toReaderBigIrregularChunksCheck", async function (): Promise<void> {
+ const bufSize = 1024;
+ const chunkSize = 3 * bufSize;
+ const writer = new Deno.Buffer();
+
+ // A readable stream can enqueue chunks bigger than Copy bufSize
+ // Reader returned by toReader should enqueue exceeding bytes
+ const chunks: Uint8Array[] = [
+ repeat("a", chunkSize),
+ repeat("b", chunkSize + 253),
+ repeat("c", chunkSize + 8),
+ ];
+ const expected = new Uint8Array(
+ chunks
+ .slice()
+ .map((chunk) => [...chunk])
+ .flat()
+ );
+ const readableStream = new ReadableStream({
+ pull(controller): void {
+ const chunk = chunks.shift();
+ if (!chunk) return controller.close();
+
+ controller.enqueue(chunk);
+ },
+ });
+
+ const reader = fromStreamReader(readableStream.getReader());
+
+ const n = await Deno.copy(reader, writer, { bufSize });
+ assertEquals(n, expected.length);
+ assertEquals(expected, writer.bytes());
+});
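
The two big-chunk tests above rely on the adapter's internal Deno.Buffer: a
chunk larger than the destination buffer is drained across several read()
calls. A self-contained sketch of that behavior (chunk and buffer sizes are
illustrative, not from the commit):

    import { fromStreamReader } from "./std/io/streams.ts"; // illustrative path

    // One 4 KiB chunk, read back through a 1 KiB buffer.
    const stream = new ReadableStream<Uint8Array>({
      start(controller): void {
        controller.enqueue(new Uint8Array(4096).fill(97)); // 4096 x "a"
        controller.close();
      },
    });

    const reader = fromStreamReader(stream.getReader());
    const buf = new Uint8Array(1024);
    let total = 0;
    let n: number | null;
    while ((n = await reader.read(buf)) !== null) {
      total += n; // each call returns at most buf.length bytes
    }
    console.log(total); // 4096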