diff options
Diffstat (limited to 'std/ws/mod.ts')
-rw-r--r-- | std/ws/mod.ts | 127 |
1 files changed, 79 insertions, 48 deletions
diff --git a/std/ws/mod.ts b/std/ws/mod.ts index 96ba4df62..217ebc8b5 100644 --- a/std/ws/mod.ts +++ b/std/ws/mod.ts @@ -10,6 +10,7 @@ import { readLong, readShort, sliceLongToBytes } from "../io/ioutil.ts"; import { Sha1 } from "./sha1.ts"; import { writeResponse } from "../http/server.ts"; import { TextProtoReader } from "../textproto/mod.ts"; +import { Deferred, deferred } from "../util/async.ts"; export enum OpCode { Continue = 0x0, @@ -193,21 +194,30 @@ function createMask(): Uint8Array { } class WebSocketImpl implements WebSocket { + readonly conn: Conn; private readonly mask?: Uint8Array; private readonly bufReader: BufReader; private readonly bufWriter: BufWriter; + private sendQueue: Array<{ + frame: WebSocketFrame; + d: Deferred<void>; + }> = []; - constructor( - readonly conn: Conn, - opts: { - bufReader?: BufReader; - bufWriter?: BufWriter; - mask?: Uint8Array; - } - ) { - this.mask = opts.mask; - this.bufReader = opts.bufReader || new BufReader(conn); - this.bufWriter = opts.bufWriter || new BufWriter(conn); + constructor({ + conn, + bufReader, + bufWriter, + mask + }: { + conn: Conn; + bufReader?: BufReader; + bufWriter?: BufWriter; + mask?: Uint8Array; + }) { + this.conn = conn; + this.mask = mask; + this.bufReader = bufReader || new BufReader(conn); + this.bufWriter = bufWriter || new BufWriter(conn); } async *receive(): AsyncIterableIterator<WebSocketEvent> { @@ -250,14 +260,11 @@ class WebSocketImpl implements WebSocket { yield { code, reason }; return; case OpCode.Ping: - await writeFrame( - { - opcode: OpCode.Pong, - payload: frame.payload, - isLastFrame: true - }, - this.bufWriter - ); + await this.enqueue({ + opcode: OpCode.Pong, + payload: frame.payload, + isLastFrame: true + }); yield ["ping", frame.payload] as WebSocketPingEvent; break; case OpCode.Pong: @@ -268,6 +275,27 @@ class WebSocketImpl implements WebSocket { } } + private dequeue(): void { + const [e] = this.sendQueue; + if (!e) return; + writeFrame(e.frame, this.bufWriter) + .then(() => e.d.resolve()) + .catch(e => e.d.reject(e)) + .finally(() => { + this.sendQueue.shift(); + this.dequeue(); + }); + } + + private enqueue(frame: WebSocketFrame): Promise<void> { + const d = deferred<void>(); + this.sendQueue.push({ d, frame }); + if (this.sendQueue.length === 1) { + this.dequeue(); + } + return d; + } + async send(data: WebSocketMessage): Promise<void> { if (this.isClosed) { throw new SocketClosedError("socket has been closed"); @@ -276,28 +304,24 @@ class WebSocketImpl implements WebSocket { typeof data === "string" ? OpCode.TextFrame : OpCode.BinaryFrame; const payload = typeof data === "string" ? encode(data) : data; const isLastFrame = true; - await writeFrame( - { - isLastFrame, - opcode, - payload, - mask: this.mask - }, - this.bufWriter - ); + const frame = { + isLastFrame, + opcode, + payload, + mask: this.mask + }; + return this.enqueue(frame); } async ping(data: WebSocketMessage = ""): Promise<void> { const payload = typeof data === "string" ? encode(data) : data; - await writeFrame( - { - isLastFrame: true, - opcode: OpCode.Ping, - mask: this.mask, - payload - }, - this.bufWriter - ); + const frame = { + isLastFrame: true, + opcode: OpCode.Ping, + mask: this.mask, + payload + }; + return this.enqueue(frame); } private _isClosed = false; @@ -317,15 +341,12 @@ class WebSocketImpl implements WebSocket { } else { payload = new Uint8Array(header); } - await writeFrame( - { - isLastFrame: true, - opcode: OpCode.Close, - mask: this.mask, - payload - }, - this.bufWriter - ); + await this.enqueue({ + isLastFrame: true, + opcode: OpCode.Close, + mask: this.mask, + payload + }); } catch (e) { throw e; } finally { @@ -380,7 +401,7 @@ export async function acceptWebSocket(req: { }): Promise<WebSocket> { const { conn, headers, bufReader, bufWriter } = req; if (acceptable(req)) { - const sock = new WebSocketImpl(conn, { bufReader, bufWriter }); + const sock = new WebSocketImpl({ conn, bufReader, bufWriter }); const secKey = headers.get("sec-websocket-key"); if (typeof secKey !== "string") { throw new Error("sec-websocket-key is not provided"); @@ -499,9 +520,19 @@ export async function connectWebSocket( conn.close(); throw err; } - return new WebSocketImpl(conn, { + return new WebSocketImpl({ + conn, bufWriter, bufReader, mask: createMask() }); } + +export function createWebSocket(params: { + conn: Conn; + bufWriter?: BufWriter; + bufReader?: BufReader; + mask?: Uint8Array; +}): WebSocket { + return new WebSocketImpl(params); +} |