Diffstat (limited to 'cli/tests/unit/streams_test.ts')
-rw-r--r--  cli/tests/unit/streams_test.ts  71
1 file changed, 71 insertions(+), 0 deletions(-)
diff --git a/cli/tests/unit/streams_test.ts b/cli/tests/unit/streams_test.ts
index c62c48469..4ecea39da 100644
--- a/cli/tests/unit/streams_test.ts
+++ b/cli/tests/unit/streams_test.ts
@@ -77,6 +77,22 @@ function emptyStream(onPull: boolean) {
}).pipeThrough(new TextEncoderStream());
}
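+// Creates a stream of `count` packets, each `packetSize` bytes long, with a
+// marker byte written at every packetSize/256 offset (packetSize must be a
+// multiple of 256 for the markers to land on whole indices).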
+function largePacketStream(packetSize: number, count: number) {
+ return new ReadableStream({
+ pull(controller) {
+ if (count-- > 0) {
+ const buffer = new Uint8Array(packetSize);
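+        // Mark every (packetSize/256)-th byte with its index so consumers can
+        // verify that sub-chunks arrive complete and in order.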
+ for (let i = 0; i < 256; i++) {
+ buffer[i * (packetSize / 256)] = i;
+ }
+ controller.enqueue(buffer);
+ } else {
+ controller.close();
+ }
+ },
+ });
+}
+
// Include an empty chunk
function emptyChunkStream() {
return new ReadableStream({
@@ -260,6 +276,61 @@ Deno.test(async function readableStreamWithEmptyChunkOneByOne() {
core.ops.op_close(rid);
});
+// Ensure that every sub-chunk of a larger chunk is transmitted correctly when
+// the chunk is read in smaller pieces.
+Deno.test(async function readableStreamReadSmallerChunks() {
+ const packetSize = 16 * 1024;
+ const rid = resourceForReadableStream(largePacketStream(packetSize, 1));
+ const buffer = new Uint8Array(packetSize);
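+  // Consume the single 16kB packet in sixteen 1kB slices.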
+ for (let i = 0; i < packetSize / 1024; i++) {
+ await core.ops.op_read(rid, buffer.subarray(i * 1024, i * 1024 + 1024));
+ }
+ for (let i = 0; i < 256; i++) {
+ assertEquals(
+ i,
+ buffer[i * (packetSize / 256)],
+ `at index ${i * (packetSize / 256)}`,
+ );
+ }
+ core.ops.op_close(rid);
+});
+
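+// Each op_read call should return a full 128kB packet when the destination
+// buffer is large enough to hold it.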
+Deno.test(async function readableStreamLargePackets() {
+ const packetSize = 128 * 1024;
+ const rid = resourceForReadableStream(largePacketStream(packetSize, 1024));
+ for (let i = 0; i < 1024; i++) {
+ const buffer = new Uint8Array(packetSize);
+ assertEquals(packetSize, await core.ops.op_read(rid, buffer));
+    for (let j = 0; j < 256; j++) {
+      assertEquals(
+        j,
+        buffer[j * (packetSize / 256)],
+        `at index ${j * (packetSize / 256)}`,
+      );
+    }
+ }
+ assertEquals(0, await core.ops.op_read(rid, new Uint8Array(1)));
+ core.ops.op_close(rid);
+});
+
+Deno.test(async function readableStreamVeryLargePackets() {
+ // 1024 packets of 1MB
+ const rid = resourceForReadableStream(largePacketStream(1024 * 1024, 1024));
+ let total = 0;
+  // Read 96kB at a time, up to 12,288 times. 96kB does not evenly divide the
+  // 1MB packet size, so each packet ends in a short read, exercising partial reads.
+ const readCounts: Record<number, number> = {};
+ for (let i = 0; i < 12 * 1024; i++) {
+ const nread = await core.ops.op_read(rid, new Uint8Array(96 * 1024));
+ total += nread;
+ readCounts[nread] = (readCounts[nread] || 0) + 1;
+    if (nread === 0) {
+ break;
+ }
+ }
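+  // Each 1MB packet splits into ten full 96kB (98,304-byte) reads plus one
+  // 65,536-byte tail; the final zero-length read signals EOF.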
+ assertEquals({ 0: 1, 65536: 1024, 98304: 10 * 1024 }, readCounts);
+ assertEquals(total, 1024 * 1024 * 1024);
+ core.ops.op_close(rid);
+});
+
for (const count of [0, 1, 2, 3]) {
for (const delay of [0, 1, 10]) {
// Creating a stream that errors in start will throw
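A minimal sketch (not part of the diff) showing how the expected readCounts in
readableStreamVeryLargePackets fall out of the arithmetic, assuming op_read
returns at most the bytes remaining in the current packet:

// Each 1MB packet served through a 96kB buffer splits into ten full reads
// plus a 65,536-byte tail; one final zero-length read signals EOF.
const packetSize = 1024 * 1024; // 1MB
const bufSize = 96 * 1024; // 98,304 bytes
const packets = 1024;
const fullReads = Math.floor(packetSize / bufSize); // 10
const tail = packetSize % bufSize; // 65,536
const expected = {
  0: 1, // EOF
  [tail]: packets, // one short read per packet
  [bufSize]: fullReads * packets, // 10,240 full reads
};
console.log(expected); // { 0: 1, 65536: 1024, 98304: 10240 }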