summaryrefslogtreecommitdiff
path: root/std/ws/mod.ts
diff options
context:
space:
mode:
Diffstat (limited to 'std/ws/mod.ts')
-rw-r--r--std/ws/mod.ts127
1 files changed, 79 insertions, 48 deletions
diff --git a/std/ws/mod.ts b/std/ws/mod.ts
index 96ba4df62..217ebc8b5 100644
--- a/std/ws/mod.ts
+++ b/std/ws/mod.ts
@@ -10,6 +10,7 @@ import { readLong, readShort, sliceLongToBytes } from "../io/ioutil.ts";
import { Sha1 } from "./sha1.ts";
import { writeResponse } from "../http/server.ts";
import { TextProtoReader } from "../textproto/mod.ts";
+import { Deferred, deferred } from "../util/async.ts";
export enum OpCode {
Continue = 0x0,
@@ -193,21 +194,30 @@ function createMask(): Uint8Array {
}
class WebSocketImpl implements WebSocket {
+ readonly conn: Conn;
private readonly mask?: Uint8Array;
private readonly bufReader: BufReader;
private readonly bufWriter: BufWriter;
+ private sendQueue: Array<{
+ frame: WebSocketFrame;
+ d: Deferred<void>;
+ }> = [];
- constructor(
- readonly conn: Conn,
- opts: {
- bufReader?: BufReader;
- bufWriter?: BufWriter;
- mask?: Uint8Array;
- }
- ) {
- this.mask = opts.mask;
- this.bufReader = opts.bufReader || new BufReader(conn);
- this.bufWriter = opts.bufWriter || new BufWriter(conn);
+ constructor({
+ conn,
+ bufReader,
+ bufWriter,
+ mask
+ }: {
+ conn: Conn;
+ bufReader?: BufReader;
+ bufWriter?: BufWriter;
+ mask?: Uint8Array;
+ }) {
+ this.conn = conn;
+ this.mask = mask;
+ this.bufReader = bufReader || new BufReader(conn);
+ this.bufWriter = bufWriter || new BufWriter(conn);
}
async *receive(): AsyncIterableIterator<WebSocketEvent> {
@@ -250,14 +260,11 @@ class WebSocketImpl implements WebSocket {
yield { code, reason };
return;
case OpCode.Ping:
- await writeFrame(
- {
- opcode: OpCode.Pong,
- payload: frame.payload,
- isLastFrame: true
- },
- this.bufWriter
- );
+ await this.enqueue({
+ opcode: OpCode.Pong,
+ payload: frame.payload,
+ isLastFrame: true
+ });
yield ["ping", frame.payload] as WebSocketPingEvent;
break;
case OpCode.Pong:
@@ -268,6 +275,27 @@ class WebSocketImpl implements WebSocket {
}
}
+ private dequeue(): void {
+ const [e] = this.sendQueue;
+ if (!e) return;
+ writeFrame(e.frame, this.bufWriter)
+ .then(() => e.d.resolve())
+ .catch(e => e.d.reject(e))
+ .finally(() => {
+ this.sendQueue.shift();
+ this.dequeue();
+ });
+ }
+
+ private enqueue(frame: WebSocketFrame): Promise<void> {
+ const d = deferred<void>();
+ this.sendQueue.push({ d, frame });
+ if (this.sendQueue.length === 1) {
+ this.dequeue();
+ }
+ return d;
+ }
+
async send(data: WebSocketMessage): Promise<void> {
if (this.isClosed) {
throw new SocketClosedError("socket has been closed");
@@ -276,28 +304,24 @@ class WebSocketImpl implements WebSocket {
typeof data === "string" ? OpCode.TextFrame : OpCode.BinaryFrame;
const payload = typeof data === "string" ? encode(data) : data;
const isLastFrame = true;
- await writeFrame(
- {
- isLastFrame,
- opcode,
- payload,
- mask: this.mask
- },
- this.bufWriter
- );
+ const frame = {
+ isLastFrame,
+ opcode,
+ payload,
+ mask: this.mask
+ };
+ return this.enqueue(frame);
}
async ping(data: WebSocketMessage = ""): Promise<void> {
const payload = typeof data === "string" ? encode(data) : data;
- await writeFrame(
- {
- isLastFrame: true,
- opcode: OpCode.Ping,
- mask: this.mask,
- payload
- },
- this.bufWriter
- );
+ const frame = {
+ isLastFrame: true,
+ opcode: OpCode.Ping,
+ mask: this.mask,
+ payload
+ };
+ return this.enqueue(frame);
}
private _isClosed = false;
@@ -317,15 +341,12 @@ class WebSocketImpl implements WebSocket {
} else {
payload = new Uint8Array(header);
}
- await writeFrame(
- {
- isLastFrame: true,
- opcode: OpCode.Close,
- mask: this.mask,
- payload
- },
- this.bufWriter
- );
+ await this.enqueue({
+ isLastFrame: true,
+ opcode: OpCode.Close,
+ mask: this.mask,
+ payload
+ });
} catch (e) {
throw e;
} finally {
@@ -380,7 +401,7 @@ export async function acceptWebSocket(req: {
}): Promise<WebSocket> {
const { conn, headers, bufReader, bufWriter } = req;
if (acceptable(req)) {
- const sock = new WebSocketImpl(conn, { bufReader, bufWriter });
+ const sock = new WebSocketImpl({ conn, bufReader, bufWriter });
const secKey = headers.get("sec-websocket-key");
if (typeof secKey !== "string") {
throw new Error("sec-websocket-key is not provided");
@@ -499,9 +520,19 @@ export async function connectWebSocket(
conn.close();
throw err;
}
- return new WebSocketImpl(conn, {
+ return new WebSocketImpl({
+ conn,
bufWriter,
bufReader,
mask: createMask()
});
}
+
+export function createWebSocket(params: {
+ conn: Conn;
+ bufWriter?: BufWriter;
+ bufReader?: BufReader;
+ mask?: Uint8Array;
+}): WebSocket {
+ return new WebSocketImpl(params);
+}