diff options
| author | Bert Belder <bertbelder@gmail.com> | 2019-05-23 19:04:06 -0700 |
|---|---|---|
| committer | Bert Belder <bertbelder@gmail.com> | 2019-05-29 09:50:12 -0700 |
| commit | b95f79d74cbcf3492abd95d4c90839e32f51399f (patch) | |
| tree | d0c68f01c798da1e3b81930cfa58a5370c56775f /io | |
| parent | 5b37b560fb047e1df6e6f68fcbaece922637a93c (diff) | |
io: refactor BufReader/Writer interfaces to be more idiomatic (denoland/deno_std#444)
Thanks Vincent Le Goff (@zekth) for porting over the CSV reader
implementation.
Fixes: denoland/deno_std#436
Original: https://github.com/denoland/deno_std/commit/0ee6334b698072b50c6f5ac8d42d34dc4c94b948
Diffstat (limited to 'io')
| -rw-r--r-- | io/bufio.ts | 392 | ||||
| -rw-r--r-- | io/bufio_test.ts | 112 |
2 files changed, 276 insertions, 228 deletions
diff --git a/io/bufio.ts b/io/bufio.ts index 749a7e8fa..815c94eed 100644 --- a/io/bufio.ts +++ b/io/bufio.ts @@ -15,13 +15,28 @@ const MAX_CONSECUTIVE_EMPTY_READS = 100; const CR = charCode("\r"); const LF = charCode("\n"); -export type BufState = - | null - | "EOF" - | "BufferFull" - | "ShortWrite" - | "NoProgress" - | Error; +export class BufferFullError extends Error { + name = "BufferFullError"; + constructor(public partial: Uint8Array) { + super("Buffer full"); + } +} + +export class UnexpectedEOFError extends Error { + name = "UnexpectedEOFError"; + constructor() { + super("Unexpected EOF"); + } +} + +export const EOF: unique symbol = Symbol("EOF"); +export type EOF = typeof EOF; + +/** Result type returned by of BufReader.readLine(). */ +export interface ReadLineResult { + line: Uint8Array; + more: boolean; +} /** BufReader implements buffering for a Reader object. */ export class BufReader implements Reader { @@ -29,9 +44,9 @@ export class BufReader implements Reader { private rd: Reader; // Reader provided by caller. private r = 0; // buf read position. private w = 0; // buf write position. - private lastByte: number; - private lastCharSize: number; - private err: BufState; + private eof = false; + // private lastByte: number; + // private lastCharSize: number; /** return new BufReader unless r is BufReader */ static create(r: Reader, size = DEFAULT_BUF_SIZE): BufReader { @@ -54,12 +69,6 @@ export class BufReader implements Reader { return this.w - this.r; } - private _readErr(): BufState { - const err = this.err; - this.err = null; - return err; - } - // Reads a new chunk into the buffer. private async _fill(): Promise<void> { // Slide existing data to beginning. @@ -75,24 +84,21 @@ export class BufReader implements Reader { // Read new data: try a limited number of times. for (let i = MAX_CONSECUTIVE_EMPTY_READS; i > 0; i--) { - let rr: ReadResult; - try { - rr = await this.rd.read(this.buf.subarray(this.w)); - } catch (e) { - this.err = e; - return; - } + let rr: ReadResult = await this.rd.read(this.buf.subarray(this.w)); assert(rr.nread >= 0, "negative read"); this.w += rr.nread; if (rr.eof) { - this.err = "EOF"; + this.eof = true; return; } if (rr.nread > 0) { return; } } - this.err = "NoProgress"; + + throw new Error( + `No progress after ${MAX_CONSECUTIVE_EMPTY_READS} read() calls` + ); } /** Discards any buffered data, resets all state, and switches @@ -105,108 +111,96 @@ export class BufReader implements Reader { private _reset(buf: Uint8Array, rd: Reader): void { this.buf = buf; this.rd = rd; - this.lastByte = -1; - // this.lastRuneSize = -1; + this.eof = false; + // this.lastByte = -1; + // this.lastCharSize = -1; } /** reads data into p. * It returns the number of bytes read into p. * The bytes are taken from at most one Read on the underlying Reader, * hence n may be less than len(p). - * At EOF, the count will be zero and err will be io.EOF. * To read exactly len(p) bytes, use io.ReadFull(b, p). */ async read(p: Uint8Array): Promise<ReadResult> { let rr: ReadResult = { nread: p.byteLength, eof: false }; - if (rr.nread === 0) { - if (this.err) { - throw this._readErr(); - } - return rr; - } + if (p.byteLength === 0) return rr; if (this.r === this.w) { - if (this.err) { - throw this._readErr(); - } if (p.byteLength >= this.buf.byteLength) { // Large read, empty buffer. // Read directly into p to avoid copy. - rr = await this.rd.read(p); + const rr = await this.rd.read(p); assert(rr.nread >= 0, "negative read"); - if (rr.nread > 0) { - this.lastByte = p[rr.nread - 1]; - // this.lastRuneSize = -1; - } - if (this.err) { - throw this._readErr(); - } + // if (rr.nread > 0) { + // this.lastByte = p[rr.nread - 1]; + // this.lastCharSize = -1; + // } return rr; } + // One read. // Do not use this.fill, which will loop. this.r = 0; this.w = 0; - try { - rr = await this.rd.read(this.buf); - } catch (e) { - this.err = e; - } + rr = await this.rd.read(this.buf); assert(rr.nread >= 0, "negative read"); - if (rr.nread === 0) { - if (this.err) { - throw this._readErr(); - } - return rr; - } + if (rr.nread === 0) return rr; this.w += rr.nread; } // copy as much as we can - rr.nread = copyBytes(p as Uint8Array, this.buf.subarray(this.r, this.w), 0); + rr.nread = copyBytes(p, this.buf.subarray(this.r, this.w), 0); this.r += rr.nread; - this.lastByte = this.buf[this.r - 1]; - // this.lastRuneSize = -1; + // this.lastByte = this.buf[this.r - 1]; + // this.lastCharSize = -1; return rr; } - /** reads exactly len(p) bytes into p. + /** reads exactly `p.length` bytes into `p`. + * + * If successful, `p` is returned. + * + * If the end of the underlying stream has been reached, and there are no more + * bytes available in the buffer, `readFull()` returns `EOF` instead. + * + * An error is thrown if some bytes could be read, but not enough to fill `p` + * entirely before the underlying stream reported an error or EOF. Any error + * thrown will have a `partial` property that indicates the slice of the + * buffer that has been successfully filled with data. + * * Ported from https://golang.org/pkg/io/#ReadFull - * It returns the number of bytes copied and an error if fewer bytes were read. - * The error is EOF only if no bytes were read. - * If an EOF happens after reading some but not all the bytes, - * readFull returns ErrUnexpectedEOF. ("EOF" for current impl) - * On return, n == len(p) if and only if err == nil. - * If r returns an error having read at least len(buf) bytes, - * the error is dropped. */ - async readFull(p: Uint8Array): Promise<[number, BufState]> { - let rr = await this.read(p); - let nread = rr.nread; - if (rr.eof) { - return [nread, nread < p.length ? "EOF" : null]; - } - while (!rr.eof && nread < p.length) { - rr = await this.read(p.subarray(nread)); - nread += rr.nread; + async readFull(p: Uint8Array): Promise<Uint8Array | EOF> { + let bytesRead = 0; + while (bytesRead < p.length) { + try { + const rr = await this.read(p.subarray(bytesRead)); + bytesRead += rr.nread; + if (rr.eof) { + if (bytesRead === 0) { + return EOF; + } else { + throw new UnexpectedEOFError(); + } + } + } catch (err) { + err.partial = p.subarray(0, bytesRead); + throw err; + } } - return [nread, nread < p.length ? "EOF" : null]; + return p; } /** Returns the next byte [0, 255] or -1 if EOF. */ async readByte(): Promise<number> { while (this.r === this.w) { + if (this.eof) return -1; await this._fill(); // buffer is empty. - if (this.err == "EOF") { - return -1; - } - if (this.err != null) { - throw this._readErr(); - } } const c = this.buf[this.r]; this.r++; - this.lastByte = c; + // this.lastByte = c; return c; } @@ -218,46 +212,73 @@ export class BufReader implements Reader { * delim. * For simple uses, a Scanner may be more convenient. */ - async readString(_delim: string): Promise<string> { + async readString(_delim: string): Promise<string | EOF> { throw new Error("Not implemented"); } - /** readLine() is a low-level line-reading primitive. Most callers should use - * readBytes('\n') or readString('\n') instead or use a Scanner. + /** `readLine()` is a low-level line-reading primitive. Most callers should + * use `readString('\n')` instead or use a Scanner. * - * readLine tries to return a single line, not including the end-of-line bytes. - * If the line was too long for the buffer then isPrefix is set and the + * `readLine()` tries to return a single line, not including the end-of-line + * bytes. If the line was too long for the buffer then `more` is set and the * beginning of the line is returned. The rest of the line will be returned - * from future calls. isPrefix will be false when returning the last fragment + * from future calls. `more` will be false when returning the last fragment * of the line. The returned buffer is only valid until the next call to - * ReadLine. ReadLine either returns a non-nil line or it returns an error, - * never both. + * `readLine()`. * - * The text returned from ReadLine does not include the line end ("\r\n" or "\n"). - * No indication or error is given if the input ends without a final line end. - * Calling UnreadByte after ReadLine will always unread the last byte read - * (possibly a character belonging to the line end) even if that byte is not - * part of the line returned by ReadLine. + * The text returned from ReadLine does not include the line end ("\r\n" or + * "\n"). + * + * When the end of the underlying stream is reached, the final bytes in the + * stream are returned. No indication or error is given if the input ends + * without a final line end. When there are no more trailing bytes to read, + * `readLine()` returns the `EOF` symbol. + * + * Calling `unreadByte()` after `readLine()` will always unread the last byte + * read (possibly a character belonging to the line end) even if that byte is + * not part of the line returned by `readLine()`. */ - async readLine(): Promise<[Uint8Array, boolean, BufState]> { - let [line, err] = await this.readSlice(LF); + async readLine(): Promise<ReadLineResult | EOF> { + let line: Uint8Array | EOF; + + try { + line = await this.readSlice(LF); + } catch (err) { + let { partial } = err; + assert( + partial instanceof Uint8Array, + "bufio: caught error from `readSlice()` without `partial` property" + ); + + // Don't throw if `readSlice()` failed with `BufferFullError`, instead we + // just return whatever is available and set the `more` flag. + if (!(err instanceof BufferFullError)) { + throw err; + } - if (err === "BufferFull") { // Handle the case where "\r\n" straddles the buffer. - if (line.byteLength > 0 && line[line.byteLength - 1] === CR) { + if ( + !this.eof && + partial.byteLength > 0 && + partial[partial.byteLength - 1] === CR + ) { // Put the '\r' back on buf and drop it from line. // Let the next call to ReadLine check for "\r\n". assert(this.r > 0, "bufio: tried to rewind past start of buffer"); this.r--; - line = line.subarray(0, line.byteLength - 1); + partial = partial.subarray(0, partial.byteLength - 1); } - return [line, true, null]; + + return { line: partial, more: !this.eof }; + } + + if (line === EOF) { + return EOF; } if (line.byteLength === 0) { - return [line, false, err]; + return { line, more: false }; } - err = null; if (line[line.byteLength - 1] == LF) { let drop = 1; @@ -266,98 +287,112 @@ export class BufReader implements Reader { } line = line.subarray(0, line.byteLength - drop); } - return [line, false, err]; + return { line, more: false }; } - /** readSlice() reads until the first occurrence of delim in the input, + /** `readSlice()` reads until the first occurrence of `delim` in the input, * returning a slice pointing at the bytes in the buffer. The bytes stop - * being valid at the next read. If readSlice() encounters an error before - * finding a delimiter, it returns all the data in the buffer and the error - * itself (often io.EOF). readSlice() fails with error ErrBufferFull if the - * buffer fills without a delim. Because the data returned from readSlice() - * will be overwritten by the next I/O operation, most clients should use - * readBytes() or readString() instead. readSlice() returns err != nil if and - * only if line does not end in delim. + * being valid at the next read. + * + * If `readSlice()` encounters an error before finding a delimiter, or the + * buffer fills without finding a delimiter, it throws an error with a + * `partial` property that contains the entire buffer. + * + * If `readSlice()` encounters the end of the underlying stream and there are + * any bytes left in the buffer, the rest of the buffer is returned. In other + * words, EOF is always treated as a delimiter. Once the buffer is empty, + * it returns `EOF`. + * + * Because the data returned from `readSlice()` will be overwritten by the + * next I/O operation, most clients should use `readString()` instead. */ - async readSlice(delim: number): Promise<[Uint8Array, BufState]> { + async readSlice(delim: number): Promise<Uint8Array | EOF> { let s = 0; // search start index - let line: Uint8Array; - let err: BufState; + let slice: Uint8Array; + while (true) { // Search buffer. let i = this.buf.subarray(this.r + s, this.w).indexOf(delim); if (i >= 0) { i += s; - line = this.buf.subarray(this.r, this.r + i + 1); + slice = this.buf.subarray(this.r, this.r + i + 1); this.r += i + 1; break; } - // Pending error? - if (this.err) { - line = this.buf.subarray(this.r, this.w); + // EOF? + if (this.eof) { + if (this.r === this.w) { + return EOF; + } + slice = this.buf.subarray(this.r, this.w); this.r = this.w; - err = this._readErr(); break; } // Buffer full? if (this.buffered() >= this.buf.byteLength) { this.r = this.w; - line = this.buf; - err = "BufferFull"; - break; + throw new BufferFullError(this.buf); } s = this.w - this.r; // do not rescan area we scanned before - await this._fill(); // buffer is not full + // Buffer is not full. + try { + await this._fill(); + } catch (err) { + err.partial = slice; + throw err; + } } // Handle last byte, if any. - let i = line.byteLength - 1; - if (i >= 0) { - this.lastByte = line[i]; - // this.lastRuneSize = -1 - } + // const i = slice.byteLength - 1; + // if (i >= 0) { + // this.lastByte = slice[i]; + // this.lastCharSize = -1 + // } - return [line, err]; + return slice; } - /** Peek returns the next n bytes without advancing the reader. The bytes stop - * being valid at the next read call. If Peek returns fewer than n bytes, it - * also returns an error explaining why the read is short. The error is - * ErrBufferFull if n is larger than b's buffer size. + /** `peek()` returns the next `n` bytes without advancing the reader. The + * bytes stop being valid at the next read call. + * + * When the end of the underlying stream is reached, but there are unread + * bytes left in the buffer, those bytes are returned. If there are no bytes + * left in the buffer, it returns `EOF`. + * + * If an error is encountered before `n` bytes are available, `peek()` throws + * an error with the `partial` property set to a slice of the buffer that + * contains the bytes that were available before the error occurred. */ - async peek(n: number): Promise<[Uint8Array, BufState]> { + async peek(n: number): Promise<Uint8Array | EOF> { if (n < 0) { throw Error("negative count"); } - while ( - this.w - this.r < n && - this.w - this.r < this.buf.byteLength && - this.err == null - ) { - await this._fill(); // this.w - this.r < len(this.buf) => buffer is not full + let avail = this.w - this.r; + while (avail < n && avail < this.buf.byteLength && !this.eof) { + try { + await this._fill(); + } catch (err) { + err.partial = this.buf.subarray(this.r, this.w); + throw err; + } + avail = this.w - this.r; } - if (n > this.buf.byteLength) { - return [this.buf.subarray(this.r, this.w), "BufferFull"]; + if (avail === 0 && this.eof) { + return EOF; + } else if (avail < n && this.eof) { + return this.buf.subarray(this.r, this.r + avail); + } else if (avail < n) { + throw new BufferFullError(this.buf.subarray(this.r, this.w)); } - // 0 <= n <= len(this.buf) - let err: BufState; - let avail = this.w - this.r; - if (avail < n) { - // not enough data in buffer - n = avail; - err = this._readErr(); - if (!err) { - err = "BufferFull"; - } - } - return [this.buf.subarray(this.r, this.r + n), err]; + return this.buf.subarray(this.r, this.r + n); } } @@ -371,7 +406,7 @@ export class BufReader implements Reader { export class BufWriter implements Writer { buf: Uint8Array; n: number = 0; - err: null | BufState = null; + err: Error | null = null; /** return new BufWriter unless w is BufWriter */ static create(w: Writer, size = DEFAULT_BUF_SIZE): BufWriter { @@ -400,34 +435,27 @@ export class BufWriter implements Writer { } /** Flush writes any buffered data to the underlying io.Writer. */ - async flush(): Promise<BufState> { - if (this.err != null) { - return this.err; - } - if (this.n == 0) { - return null; - } + async flush(): Promise<void> { + if (this.err !== null) throw this.err; + if (this.n === 0) return; let n: number; - let err: BufState = null; try { n = await this.wr.write(this.buf.subarray(0, this.n)); } catch (e) { - err = e; + this.err = e; + throw e; } - if (n < this.n && err == null) { - err = "ShortWrite"; - } - - if (err != null) { - if (n > 0 && n < this.n) { + if (n < this.n) { + if (n > 0) { this.buf.copyWithin(0, n, this.n); + this.n -= n; } - this.n -= n; - this.err = err; - return err; + this.err = new Error("Short write"); + throw this.err; } + this.n = 0; } @@ -447,16 +475,20 @@ export class BufWriter implements Writer { * Returns the number of bytes written. */ async write(p: Uint8Array): Promise<number> { + if (this.err !== null) throw this.err; + if (p.length === 0) return 0; + let nn = 0; let n: number; - while (p.byteLength > this.available() && !this.err) { - if (this.buffered() == 0) { + while (p.byteLength > this.available()) { + if (this.buffered() === 0) { // Large write, empty buffer. // Write directly from p to avoid copy. try { n = await this.wr.write(p); } catch (e) { this.err = e; + throw e; } } else { n = copyBytes(this.buf, p, this.n); @@ -466,9 +498,7 @@ export class BufWriter implements Writer { nn += n; p = p.subarray(n); } - if (this.err) { - throw this.err; - } + n = copyBytes(this.buf, p, this.n); this.n += n; nn += n; diff --git a/io/bufio_test.ts b/io/bufio_test.ts index d1db119d8..84b6f9142 100644 --- a/io/bufio_test.ts +++ b/io/bufio_test.ts @@ -6,14 +6,30 @@ const { Buffer } = Deno; type Reader = Deno.Reader; type ReadResult = Deno.ReadResult; -import { test } from "../testing/mod.ts"; -import { assert, assertEquals } from "../testing/asserts.ts"; -import { BufReader, BufWriter } from "./bufio.ts"; +import { test, runIfMain } from "../testing/mod.ts"; +import { + assert, + assertEquals, + assertNotEquals, + fail +} from "../testing/asserts.ts"; +import { + BufReader, + BufWriter, + EOF, + BufferFullError, + UnexpectedEOFError +} from "./bufio.ts"; import * as iotest from "./iotest.ts"; import { charCode, copyBytes, stringsReader } from "./util.ts"; const encoder = new TextEncoder(); +function assertNotEOF<T extends {}>(val: T | EOF): T { + assertNotEquals(val, EOF); + return val as T; +} + async function readBytes(buf: BufReader): Promise<string> { const b = new Uint8Array(1000); let nb = 0; @@ -129,17 +145,20 @@ test(async function bufioBufferFull(): Promise<void> { const longString = "And now, hello, world! It is the time for all good men to come to the aid of their party"; const buf = new BufReader(stringsReader(longString), MIN_READ_BUFFER_SIZE); - let [line, err] = await buf.readSlice(charCode("!")); - const decoder = new TextDecoder(); - let actual = decoder.decode(line); - assertEquals(err, "BufferFull"); - assertEquals(actual, "And now, hello, "); - [line, err] = await buf.readSlice(charCode("!")); - actual = decoder.decode(line); + try { + await buf.readSlice(charCode("!")); + fail("readSlice should throw"); + } catch (err) { + assert(err instanceof BufferFullError); + assert(err.partial instanceof Uint8Array); + assertEquals(decoder.decode(err.partial), "And now, hello, "); + } + + const line = assertNotEOF(await buf.readSlice(charCode("!"))); + const actual = decoder.decode(line); assertEquals(actual, "world!"); - assert(err == null); }); const testInput = encoder.encode( @@ -178,14 +197,12 @@ async function testReadLine(input: Uint8Array): Promise<void> { let reader = new TestReader(input, stride); let l = new BufReader(reader, input.byteLength + 1); while (true) { - let [line, isPrefix, err] = await l.readLine(); - if (line.byteLength > 0 && err != null) { - throw Error("readLine returned both data and error"); - } - assertEquals(isPrefix, false); - if (err == "EOF") { + const r = await l.readLine(); + if (r === EOF) { break; } + const { line, more } = r; + assertEquals(more, false); // eslint-disable-next-line @typescript-eslint/restrict-plus-operands let want = testOutput.subarray(done, done + line.byteLength); assertEquals( @@ -218,56 +235,51 @@ test(async function bufioPeek(): Promise<void> { MIN_READ_BUFFER_SIZE ); - let [actual, err] = await buf.peek(1); + let actual = assertNotEOF(await buf.peek(1)); assertEquals(decoder.decode(actual), "a"); - assert(err == null); - [actual, err] = await buf.peek(4); + actual = assertNotEOF(await buf.peek(4)); assertEquals(decoder.decode(actual), "abcd"); - assert(err == null); - [actual, err] = await buf.peek(32); - assertEquals(decoder.decode(actual), "abcdefghijklmnop"); - assertEquals(err, "BufferFull"); + try { + await buf.peek(32); + fail("peek() should throw"); + } catch (err) { + assert(err instanceof BufferFullError); + assert(err.partial instanceof Uint8Array); + assertEquals(decoder.decode(err.partial), "abcdefghijklmnop"); + } await buf.read(p.subarray(0, 3)); assertEquals(decoder.decode(p.subarray(0, 3)), "abc"); - [actual, err] = await buf.peek(1); + actual = assertNotEOF(await buf.peek(1)); assertEquals(decoder.decode(actual), "d"); - assert(err == null); - [actual, err] = await buf.peek(1); + actual = assertNotEOF(await buf.peek(1)); assertEquals(decoder.decode(actual), "d"); - assert(err == null); - [actual, err] = await buf.peek(1); + actual = assertNotEOF(await buf.peek(1)); assertEquals(decoder.decode(actual), "d"); - assert(err == null); - [actual, err] = await buf.peek(2); + actual = assertNotEOF(await buf.peek(2)); assertEquals(decoder.decode(actual), "de"); - assert(err == null); let { eof } = await buf.read(p.subarray(0, 3)); assertEquals(decoder.decode(p.subarray(0, 3)), "def"); assert(!eof); - assert(err == null); - [actual, err] = await buf.peek(4); + actual = assertNotEOF(await buf.peek(4)); assertEquals(decoder.decode(actual), "ghij"); - assert(err == null); await buf.read(p); assertEquals(decoder.decode(p), "ghijklmnop"); - [actual, err] = await buf.peek(0); + actual = assertNotEOF(await buf.peek(0)); assertEquals(decoder.decode(actual), ""); - assert(err == null); - [actual, err] = await buf.peek(1); - assertEquals(decoder.decode(actual), ""); - assert(err == "EOF"); + const r = await buf.peek(1); + assert(r === EOF); /* TODO // Test for issue 3022, not exposing a reader's error on a successful Peek. buf = NewReaderSize(dataAndEOFReader("abcd"), 32) @@ -328,16 +340,22 @@ test(async function bufReaderReadFull(): Promise<void> { const bufr = new BufReader(data, 3); { const buf = new Uint8Array(6); - const [nread, err] = await bufr.readFull(buf); - assertEquals(nread, 6); - assert(!err); + const r = assertNotEOF(await bufr.readFull(buf)); + assertEquals(r, buf); assertEquals(dec.decode(buf), "Hello "); } { const buf = new Uint8Array(6); - const [nread, err] = await bufr.readFull(buf); - assertEquals(nread, 5); - assertEquals(err, "EOF"); - assertEquals(dec.decode(buf.subarray(0, 5)), "World"); + try { + await bufr.readFull(buf); + fail("readFull() should throw"); + } catch (err) { + assert(err instanceof UnexpectedEOFError); + assert(err.partial instanceof Uint8Array); + assertEquals(err.partial.length, 5); + assertEquals(dec.decode(buf.subarray(0, 5)), "World"); + } } }); + +runIfMain(import.meta); |
