summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--std/http/server_test.ts8
-rw-r--r--std/io/bufio_test.ts9
-rw-r--r--std/testing/asserts.ts5
-rw-r--r--std/textproto/reader_test.ts9
-rw-r--r--std/ws/mod.ts93
-rw-r--r--std/ws/test.ts68
6 files changed, 136 insertions, 56 deletions
diff --git a/std/http/server_test.ts b/std/http/server_test.ts
index 89aaca4ea..b145b8353 100644
--- a/std/http/server_test.ts
+++ b/std/http/server_test.ts
@@ -12,7 +12,8 @@ import {
assertEquals,
assertNotEquals,
assertThrowsAsync,
- AssertionError
+ AssertionError,
+ assertNotEOF
} from "../testing/asserts.ts";
import {
Response,
@@ -32,11 +33,6 @@ import {
import { delay, deferred } from "../util/async.ts";
import { StringReader } from "../io/readers.ts";
-function assertNotEOF<T extends {}>(val: T | Deno.EOF): T {
- assertNotEquals(val, Deno.EOF);
- return val as T;
-}
-
interface ResponseTest {
response: Response;
raw: string;
diff --git a/std/io/bufio_test.ts b/std/io/bufio_test.ts
index 4f4bd48b1..75bd7b40e 100644
--- a/std/io/bufio_test.ts
+++ b/std/io/bufio_test.ts
@@ -8,8 +8,8 @@ type Reader = Deno.Reader;
import {
assert,
assertEquals,
- assertNotEquals,
- fail
+ fail,
+ assertNotEOF
} from "../testing/asserts.ts";
import {
BufReader,
@@ -24,11 +24,6 @@ import { charCode, copyBytes, stringsReader } from "./util.ts";
const encoder = new TextEncoder();
-function assertNotEOF<T extends {}>(val: T | Deno.EOF): T {
- assertNotEquals(val, Deno.EOF);
- return val as T;
-}
-
async function readBytes(buf: BufReader): Promise<string> {
const b = new Uint8Array(1000);
let nb = 0;
diff --git a/std/testing/asserts.ts b/std/testing/asserts.ts
index f2877eb8d..d562447c8 100644
--- a/std/testing/asserts.ts
+++ b/std/testing/asserts.ts
@@ -378,3 +378,8 @@ export function unimplemented(msg?: string): never {
export function unreachable(): never {
throw new AssertionError("unreachable");
}
+
+export function assertNotEOF<T extends {}>(val: T | Deno.EOF): T {
+ assertNotEquals(val, Deno.EOF);
+ return val as T;
+}
diff --git a/std/textproto/reader_test.ts b/std/textproto/reader_test.ts
index 8f4b07242..32182ce65 100644
--- a/std/textproto/reader_test.ts
+++ b/std/textproto/reader_test.ts
@@ -9,16 +9,11 @@ import { stringsReader } from "../io/util.ts";
import {
assert,
assertEquals,
- assertNotEquals,
- assertThrows
+ assertThrows,
+ assertNotEOF
} from "../testing/asserts.ts";
const { test } = Deno;
-function assertNotEOF<T extends {}>(val: T | Deno.EOF): T {
- assertNotEquals(val, Deno.EOF);
- return val as T;
-}
-
function reader(s: string): TextProtoReader {
return new TextProtoReader(new BufReader(stringsReader(s)));
}
diff --git a/std/ws/mod.ts b/std/ws/mod.ts
index 30cbc2935..5a8f0bd2e 100644
--- a/std/ws/mod.ts
+++ b/std/ws/mod.ts
@@ -2,15 +2,15 @@
import { decode, encode } from "../strings/mod.ts";
import { hasOwnProperty } from "../util/has_own_property.ts";
-
-type Conn = Deno.Conn;
-type Writer = Deno.Writer;
import { BufReader, BufWriter, UnexpectedEOFError } from "../io/bufio.ts";
import { readLong, readShort, sliceLongToBytes } from "../io/ioutil.ts";
import { Sha1 } from "./sha1.ts";
import { writeResponse } from "../http/server.ts";
import { TextProtoReader } from "../textproto/mod.ts";
import { Deferred, deferred } from "../util/async.ts";
+import { assertNotEOF } from "../testing/asserts.ts";
+import Conn = Deno.Conn;
+import Writer = Deno.Writer;
export enum OpCode {
Continue = 0x0,
@@ -24,8 +24,8 @@ export enum OpCode {
export type WebSocketEvent =
| string
| Uint8Array
- | WebSocketCloseEvent
- | WebSocketPingEvent
+ | WebSocketCloseEvent // Received after closing connection finished.
+ | WebSocketPingEvent // Received after pong frame responded.
| WebSocketPongEvent;
export interface WebSocketCloseEvent {
@@ -71,7 +71,11 @@ export function append(a: Uint8Array, b: Uint8Array): Uint8Array {
return output;
}
-export class SocketClosedError extends Error {}
+export class SocketClosedError extends Error {
+ constructor(msg = "Socket has already been closed") {
+ super(msg);
+ }
+}
export interface WebSocketFrame {
isLastFrame: boolean;
@@ -86,11 +90,26 @@ export interface WebSocket {
receive(): AsyncIterableIterator<WebSocketEvent>;
+ /**
+ * @throws SocketClosedError
+ */
send(data: WebSocketMessage): Promise<void>;
+ /**
+ * @param data
+ * @throws SocketClosedError
+ */
ping(data?: WebSocketMessage): Promise<void>;
+ /** Close connection after sending close frame to peer.
+ * This is canonical way of disconnection but it may hang because of peer's response delay.
+ * @throws SocketClosedError
+ */
close(code: number, reason?: string): Promise<void>;
+
+ /** Close connection forcely without sending close frame to peer.
+ * This is basically undesirable way of disconnection. Use carefully. */
+ closeForce(): void;
}
/** Unmask masked websocket payload */
@@ -141,10 +160,12 @@ export async function writeFrame(
await w.flush();
}
-/** Read websocket frame from given BufReader */
+/** Read websocket frame from given BufReader
+ * @throws UnexpectedEOFError When peer closed connection without close frame
+ * @throws Error Frame is invalid
+ */
export async function readFrame(buf: BufReader): Promise<WebSocketFrame> {
- let b = await buf.readByte();
- if (b === Deno.EOF) throw new UnexpectedEOFError();
+ let b = assertNotEOF(await buf.readByte());
let isLastFrame = false;
switch (b >>> 4) {
case 0b1000:
@@ -158,28 +179,25 @@ export async function readFrame(buf: BufReader): Promise<WebSocketFrame> {
}
const opcode = b & 0x0f;
// has_mask & payload
- b = await buf.readByte();
- if (b === Deno.EOF) throw new UnexpectedEOFError();
+ b = assertNotEOF(await buf.readByte());
const hasMask = b >>> 7;
let payloadLength = b & 0b01111111;
if (payloadLength === 126) {
- const l = await readShort(buf);
- if (l === Deno.EOF) throw new UnexpectedEOFError();
+ const l = assertNotEOF(await readShort(buf));
payloadLength = l;
} else if (payloadLength === 127) {
- const l = await readLong(buf);
- if (l === Deno.EOF) throw new UnexpectedEOFError();
+ const l = assertNotEOF(await readLong(buf));
payloadLength = Number(l);
}
// mask
- let mask;
+ let mask: Uint8Array | undefined;
if (hasMask) {
mask = new Uint8Array(4);
- await buf.readFull(mask);
+ assertNotEOF(await buf.readFull(mask));
}
// payload
const payload = new Uint8Array(payloadLength);
- await buf.readFull(payload);
+ assertNotEOF(await buf.readFull(payload));
return {
isLastFrame,
opcode,
@@ -223,8 +241,14 @@ class WebSocketImpl implements WebSocket {
async *receive(): AsyncIterableIterator<WebSocketEvent> {
let frames: WebSocketFrame[] = [];
let payloadsLength = 0;
- while (true) {
- const frame = await readFrame(this.bufReader);
+ while (!this._isClosed) {
+ let frame: WebSocketFrame;
+ try {
+ frame = await readFrame(this.bufReader);
+ } catch (e) {
+ this.ensureSocketClosed();
+ break;
+ }
unmask(frame.payload, frame.mask);
switch (frame.opcode) {
case OpCode.TextFrame:
@@ -276,11 +300,13 @@ class WebSocketImpl implements WebSocket {
}
private dequeue(): void {
- const [e] = this.sendQueue;
- if (!e) return;
- writeFrame(e.frame, this.bufWriter)
- .then(() => e.d.resolve())
- .catch(e => e.d.reject(e))
+ const [entry] = this.sendQueue;
+ if (!entry) return;
+ if (this._isClosed) return;
+ const { d, frame } = entry;
+ writeFrame(frame, this.bufWriter)
+ .then(() => d.resolve())
+ .catch(e => d.reject(e))
.finally(() => {
this.sendQueue.shift();
this.dequeue();
@@ -288,6 +314,9 @@ class WebSocketImpl implements WebSocket {
}
private enqueue(frame: WebSocketFrame): Promise<void> {
+ if (this._isClosed) {
+ throw new SocketClosedError();
+ }
const d = deferred<void>();
this.sendQueue.push({ d, frame });
if (this.sendQueue.length === 1) {
@@ -297,9 +326,6 @@ class WebSocketImpl implements WebSocket {
}
async send(data: WebSocketMessage): Promise<void> {
- if (this.isClosed) {
- throw new SocketClosedError("socket has been closed");
- }
const opcode =
typeof data === "string" ? OpCode.TextFrame : OpCode.BinaryFrame;
const payload = typeof data === "string" ? encode(data) : data;
@@ -354,16 +380,21 @@ class WebSocketImpl implements WebSocket {
}
}
+ closeForce(): void {
+ this.ensureSocketClosed();
+ }
+
private ensureSocketClosed(): void {
- if (this.isClosed) {
- return;
- }
+ if (this.isClosed) return;
try {
this.conn.close();
} catch (e) {
console.error(e);
} finally {
this._isClosed = true;
+ const rest = this.sendQueue;
+ this.sendQueue = [];
+ rest.forEach(e => e.d.reject(new SocketClosedError()));
}
}
}
diff --git a/std/ws/test.ts b/std/ws/test.ts
index a8dbc903c..5a0ff9929 100644
--- a/std/ws/test.ts
+++ b/std/ws/test.ts
@@ -13,13 +13,14 @@ import {
readFrame,
unmask,
writeFrame,
- createWebSocket
+ createWebSocket,
+ SocketClosedError
} from "./mod.ts";
import { encode, decode } from "../strings/mod.ts";
-type Writer = Deno.Writer;
-type Reader = Deno.Reader;
-type Conn = Deno.Conn;
-const { Buffer } = Deno;
+import Writer = Deno.Writer;
+import Reader = Deno.Reader;
+import Conn = Deno.Conn;
+import Buffer = Deno.Buffer;
test(async function wsReadUnmaskedTextFrame(): Promise<void> {
// unmasked single text frame with payload "Hello"
@@ -326,3 +327,60 @@ test("WebSocket.send(), WebSocket.ping() should be exclusive", async (): Promise
assertEquals(third.opcode, OpCode.BinaryFrame);
assertEquals(bytes.equal(third.payload, new Uint8Array([3])), true);
});
+
+test("WebSocket should throw SocketClosedError when peer closed connection without close frame", async () => {
+ const buf = new Buffer();
+ const eofReader: Deno.Reader = {
+ async read(_: Uint8Array): Promise<number | Deno.EOF> {
+ return Deno.EOF;
+ }
+ };
+ const conn = dummyConn(eofReader, buf);
+ const sock = createWebSocket({ conn });
+ sock.closeForce();
+ await assertThrowsAsync(() => sock.send("hello"), SocketClosedError);
+ await assertThrowsAsync(() => sock.ping(), SocketClosedError);
+ await assertThrowsAsync(() => sock.close(0), SocketClosedError);
+});
+
+test("WebSocket shouldn't throw UnexpectedEOFError on recive()", async () => {
+ const buf = new Buffer();
+ const eofReader: Deno.Reader = {
+ async read(_: Uint8Array): Promise<number | Deno.EOF> {
+ return Deno.EOF;
+ }
+ };
+ const conn = dummyConn(eofReader, buf);
+ const sock = createWebSocket({ conn });
+ const it = sock.receive();
+ const { value, done } = await it.next();
+ assertEquals(value, undefined);
+ assertEquals(done, true);
+});
+
+test("WebSocket should reject sending promise when connection reset forcely", async () => {
+ const buf = new Buffer();
+ let timer: number | undefined;
+ const lazyWriter: Deno.Writer = {
+ async write(_: Uint8Array): Promise<number> {
+ return new Promise(resolve => {
+ timer = setTimeout(() => resolve(0), 1000);
+ });
+ }
+ };
+ const conn = dummyConn(buf, lazyWriter);
+ const sock = createWebSocket({ conn });
+ const onError = (e: unknown): unknown => e;
+ const p = Promise.all([
+ sock.send("hello").catch(onError),
+ sock.send(new Uint8Array([1, 2])).catch(onError),
+ sock.ping().catch(onError)
+ ]);
+ sock.closeForce();
+ assertEquals(sock.isClosed, true);
+ const [a, b, c] = await p;
+ assert(a instanceof SocketClosedError);
+ assert(b instanceof SocketClosedError);
+ assert(c instanceof SocketClosedError);
+ clearTimeout(timer);
+});