summaryrefslogtreecommitdiff
path: root/cli/js/tests/streams_piping_test.ts
blob: a947b3821146dc27f5d6fb73233625922a788a82 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
// Copyright 2018-2020 the Deno authors. All rights reserved. MIT license.
import { unitTest, assert, assertEquals } from "./test_util.ts";
import { assertThrowsAsync } from "../../../std/testing/asserts.ts";

unitTest(function streamPipeLocks() {
  const rs = new ReadableStream();
  const ws = new WritableStream();

  assertEquals(rs.locked, false);
  assertEquals(ws.locked, false);

  rs.pipeTo(ws);

  assert(rs.locked);
  assert(ws.locked);
});

unitTest(async function streamPipeFinishUnlocks() {
  const rs = new ReadableStream({
    start(controller: ReadableStreamDefaultController): void {
      controller.close();
    },
  });
  const ws = new WritableStream();

  await rs.pipeTo(ws);
  assertEquals(rs.locked, false);
  assertEquals(ws.locked, false);
});

unitTest(async function streamPipeReadableStreamLocked() {
  const rs = new ReadableStream();
  const ws = new WritableStream();

  rs.getReader();

  await assertThrowsAsync(async () => {
    await rs.pipeTo(ws);
  }, TypeError);
});

unitTest(async function streamPipeReadableStreamLocked() {
  const rs = new ReadableStream();
  const ws = new WritableStream();

  ws.getWriter();

  await assertThrowsAsync(async () => {
    await rs.pipeTo(ws);
  }, TypeError);
});

unitTest(async function streamPipeLotsOfChunks() {
  const CHUNKS = 10;

  const rs = new ReadableStream<number>({
    start(c: ReadableStreamDefaultController): void {
      for (let i = 0; i < CHUNKS; ++i) {
        c.enqueue(i);
      }
      c.close();
    },
  });

  const written: Array<string | number> = [];
  const ws = new WritableStream(
    {
      write(chunk: number): void {
        written.push(chunk);
      },
      close(): void {
        written.push("closed");
      },
    },
    new CountQueuingStrategy({ highWaterMark: CHUNKS })
  );

  await rs.pipeTo(ws);
  const targetValues = [];
  for (let i = 0; i < CHUNKS; ++i) {
    targetValues.push(i);
  }
  targetValues.push("closed");

  assertEquals(written, targetValues, "the correct values must be written");

  // Ensure both readable and writable are closed by the time the pipe finishes.
  await Promise.all([rs.getReader().closed, ws.getWriter().closed]);
});

for (const preventAbort of [true, false]) {
  unitTest(function undefinedRejectionFromPull() {
    const rs = new ReadableStream({
      pull(): Promise<void> {
        return Promise.reject(undefined);
      },
    });

    return rs.pipeTo(new WritableStream(), { preventAbort }).then(
      () => {
        throw new Error("pipeTo promise should be rejected");
      },
      (value) =>
        assertEquals(value, undefined, "rejection value should be undefined")
    );
  });
}

for (const preventCancel of [true, false]) {
  unitTest(function undefinedRejectionWithPreventCancel() {
    const rs = new ReadableStream({
      pull(controller: ReadableStreamDefaultController<number>): void {
        controller.enqueue(0);
      },
    });

    const ws = new WritableStream({
      write(): Promise<void> {
        return Promise.reject(undefined);
      },
    });

    return rs.pipeTo(ws, { preventCancel }).then(
      () => {
        throw new Error("pipeTo promise should be rejected");
      },
      (value) =>
        assertEquals(value, undefined, "rejection value should be undefined")
    );
  });
}