diff options
Diffstat (limited to 'cli/tests/unit/streams_test.ts')
-rw-r--r-- | cli/tests/unit/streams_test.ts | 71 |
1 files changed, 71 insertions, 0 deletions
diff --git a/cli/tests/unit/streams_test.ts b/cli/tests/unit/streams_test.ts index c62c48469..4ecea39da 100644 --- a/cli/tests/unit/streams_test.ts +++ b/cli/tests/unit/streams_test.ts @@ -77,6 +77,22 @@ function emptyStream(onPull: boolean) { }).pipeThrough(new TextEncoderStream()); } +function largePacketStream(packetSize: number, count: number) { + return new ReadableStream({ + pull(controller) { + if (count-- > 0) { + const buffer = new Uint8Array(packetSize); + for (let i = 0; i < 256; i++) { + buffer[i * (packetSize / 256)] = i; + } + controller.enqueue(buffer); + } else { + controller.close(); + } + }, + }); +} + // Include an empty chunk function emptyChunkStream() { return new ReadableStream({ @@ -260,6 +276,61 @@ Deno.test(async function readableStreamWithEmptyChunkOneByOne() { core.ops.op_close(rid); }); +// Ensure that we correctly transmit all the sub-chunks of the larger chunks. +Deno.test(async function readableStreamReadSmallerChunks() { + const packetSize = 16 * 1024; + const rid = resourceForReadableStream(largePacketStream(packetSize, 1)); + const buffer = new Uint8Array(packetSize); + for (let i = 0; i < packetSize / 1024; i++) { + await core.ops.op_read(rid, buffer.subarray(i * 1024, i * 1024 + 1024)); + } + for (let i = 0; i < 256; i++) { + assertEquals( + i, + buffer[i * (packetSize / 256)], + `at index ${i * (packetSize / 256)}`, + ); + } + core.ops.op_close(rid); +}); + +Deno.test(async function readableStreamLargePackets() { + const packetSize = 128 * 1024; + const rid = resourceForReadableStream(largePacketStream(packetSize, 1024)); + for (let i = 0; i < 1024; i++) { + const buffer = new Uint8Array(packetSize); + assertEquals(packetSize, await core.ops.op_read(rid, buffer)); + for (let i = 0; i < 256; i++) { + assertEquals( + i, + buffer[i * (packetSize / 256)], + `at index ${i * (packetSize / 256)}`, + ); + } + } + assertEquals(0, await core.ops.op_read(rid, new Uint8Array(1))); + core.ops.op_close(rid); +}); + +Deno.test(async function readableStreamVeryLargePackets() { + // 1024 packets of 1MB + const rid = resourceForReadableStream(largePacketStream(1024 * 1024, 1024)); + let total = 0; + // Read 96kB up to 12,288 times (96kB is not an even multiple of the 1MB packet size to test this) + const readCounts: Record<number, number> = {}; + for (let i = 0; i < 12 * 1024; i++) { + const nread = await core.ops.op_read(rid, new Uint8Array(96 * 1024)); + total += nread; + readCounts[nread] = (readCounts[nread] || 0) + 1; + if (nread == 0) { + break; + } + } + assertEquals({ 0: 1, 65536: 1024, 98304: 10 * 1024 }, readCounts); + assertEquals(total, 1024 * 1024 * 1024); + core.ops.op_close(rid); +}); + for (const count of [0, 1, 2, 3]) { for (const delay of [0, 1, 10]) { // Creating a stream that errors in start will throw |