diff options
Diffstat (limited to 'std/ws/mod.ts')
-rw-r--r-- | std/ws/mod.ts | 93 |
1 files changed, 62 insertions, 31 deletions
diff --git a/std/ws/mod.ts b/std/ws/mod.ts index 30cbc2935..5a8f0bd2e 100644 --- a/std/ws/mod.ts +++ b/std/ws/mod.ts @@ -2,15 +2,15 @@ import { decode, encode } from "../strings/mod.ts"; import { hasOwnProperty } from "../util/has_own_property.ts"; - -type Conn = Deno.Conn; -type Writer = Deno.Writer; import { BufReader, BufWriter, UnexpectedEOFError } from "../io/bufio.ts"; import { readLong, readShort, sliceLongToBytes } from "../io/ioutil.ts"; import { Sha1 } from "./sha1.ts"; import { writeResponse } from "../http/server.ts"; import { TextProtoReader } from "../textproto/mod.ts"; import { Deferred, deferred } from "../util/async.ts"; +import { assertNotEOF } from "../testing/asserts.ts"; +import Conn = Deno.Conn; +import Writer = Deno.Writer; export enum OpCode { Continue = 0x0, @@ -24,8 +24,8 @@ export enum OpCode { export type WebSocketEvent = | string | Uint8Array - | WebSocketCloseEvent - | WebSocketPingEvent + | WebSocketCloseEvent // Received after closing connection finished. + | WebSocketPingEvent // Received after pong frame responded. | WebSocketPongEvent; export interface WebSocketCloseEvent { @@ -71,7 +71,11 @@ export function append(a: Uint8Array, b: Uint8Array): Uint8Array { return output; } -export class SocketClosedError extends Error {} +export class SocketClosedError extends Error { + constructor(msg = "Socket has already been closed") { + super(msg); + } +} export interface WebSocketFrame { isLastFrame: boolean; @@ -86,11 +90,26 @@ export interface WebSocket { receive(): AsyncIterableIterator<WebSocketEvent>; + /** + * @throws SocketClosedError + */ send(data: WebSocketMessage): Promise<void>; + /** + * @param data + * @throws SocketClosedError + */ ping(data?: WebSocketMessage): Promise<void>; + /** Close connection after sending close frame to peer. + * This is canonical way of disconnection but it may hang because of peer's response delay. + * @throws SocketClosedError + */ close(code: number, reason?: string): Promise<void>; + + /** Close connection forcely without sending close frame to peer. + * This is basically undesirable way of disconnection. Use carefully. */ + closeForce(): void; } /** Unmask masked websocket payload */ @@ -141,10 +160,12 @@ export async function writeFrame( await w.flush(); } -/** Read websocket frame from given BufReader */ +/** Read websocket frame from given BufReader + * @throws UnexpectedEOFError When peer closed connection without close frame + * @throws Error Frame is invalid + */ export async function readFrame(buf: BufReader): Promise<WebSocketFrame> { - let b = await buf.readByte(); - if (b === Deno.EOF) throw new UnexpectedEOFError(); + let b = assertNotEOF(await buf.readByte()); let isLastFrame = false; switch (b >>> 4) { case 0b1000: @@ -158,28 +179,25 @@ export async function readFrame(buf: BufReader): Promise<WebSocketFrame> { } const opcode = b & 0x0f; // has_mask & payload - b = await buf.readByte(); - if (b === Deno.EOF) throw new UnexpectedEOFError(); + b = assertNotEOF(await buf.readByte()); const hasMask = b >>> 7; let payloadLength = b & 0b01111111; if (payloadLength === 126) { - const l = await readShort(buf); - if (l === Deno.EOF) throw new UnexpectedEOFError(); + const l = assertNotEOF(await readShort(buf)); payloadLength = l; } else if (payloadLength === 127) { - const l = await readLong(buf); - if (l === Deno.EOF) throw new UnexpectedEOFError(); + const l = assertNotEOF(await readLong(buf)); payloadLength = Number(l); } // mask - let mask; + let mask: Uint8Array | undefined; if (hasMask) { mask = new Uint8Array(4); - await buf.readFull(mask); + assertNotEOF(await buf.readFull(mask)); } // payload const payload = new Uint8Array(payloadLength); - await buf.readFull(payload); + assertNotEOF(await buf.readFull(payload)); return { isLastFrame, opcode, @@ -223,8 +241,14 @@ class WebSocketImpl implements WebSocket { async *receive(): AsyncIterableIterator<WebSocketEvent> { let frames: WebSocketFrame[] = []; let payloadsLength = 0; - while (true) { - const frame = await readFrame(this.bufReader); + while (!this._isClosed) { + let frame: WebSocketFrame; + try { + frame = await readFrame(this.bufReader); + } catch (e) { + this.ensureSocketClosed(); + break; + } unmask(frame.payload, frame.mask); switch (frame.opcode) { case OpCode.TextFrame: @@ -276,11 +300,13 @@ class WebSocketImpl implements WebSocket { } private dequeue(): void { - const [e] = this.sendQueue; - if (!e) return; - writeFrame(e.frame, this.bufWriter) - .then(() => e.d.resolve()) - .catch(e => e.d.reject(e)) + const [entry] = this.sendQueue; + if (!entry) return; + if (this._isClosed) return; + const { d, frame } = entry; + writeFrame(frame, this.bufWriter) + .then(() => d.resolve()) + .catch(e => d.reject(e)) .finally(() => { this.sendQueue.shift(); this.dequeue(); @@ -288,6 +314,9 @@ class WebSocketImpl implements WebSocket { } private enqueue(frame: WebSocketFrame): Promise<void> { + if (this._isClosed) { + throw new SocketClosedError(); + } const d = deferred<void>(); this.sendQueue.push({ d, frame }); if (this.sendQueue.length === 1) { @@ -297,9 +326,6 @@ class WebSocketImpl implements WebSocket { } async send(data: WebSocketMessage): Promise<void> { - if (this.isClosed) { - throw new SocketClosedError("socket has been closed"); - } const opcode = typeof data === "string" ? OpCode.TextFrame : OpCode.BinaryFrame; const payload = typeof data === "string" ? encode(data) : data; @@ -354,16 +380,21 @@ class WebSocketImpl implements WebSocket { } } + closeForce(): void { + this.ensureSocketClosed(); + } + private ensureSocketClosed(): void { - if (this.isClosed) { - return; - } + if (this.isClosed) return; try { this.conn.close(); } catch (e) { console.error(e); } finally { this._isClosed = true; + const rest = this.sendQueue; + this.sendQueue = []; + rest.forEach(e => e.d.reject(new SocketClosedError())); } } } |