summaryrefslogtreecommitdiff
path: root/ws/mod.ts
diff options
context:
space:
mode:
authorYusuke Sakurai <kerokerokerop@gmail.com>2019-05-15 04:19:12 +0900
committerRyan Dahl <ry@tinyclouds.org>2019-05-14 15:19:11 -0400
commite3e9269c76299df99975e17a04b4d1b1ca39dfcb (patch)
treec0bab4773a25589edaea12287c6a8422bd5208f1 /ws/mod.ts
parenta3de8c3d8a376049a37f8193c8538acc0d7a88f3 (diff)
feat: ws client (denoland/deno_std#394)
Original: https://github.com/denoland/deno_std/commit/782e3f690ffb9ee0dd89a5a64a3f2b753899719b
Diffstat (limited to 'ws/mod.ts')
-rw-r--r--ws/mod.ts249
1 files changed, 177 insertions, 72 deletions
diff --git a/ws/mod.ts b/ws/mod.ts
index 65dc142ca..3e20e2d3b 100644
--- a/ws/mod.ts
+++ b/ws/mod.ts
@@ -1,11 +1,14 @@
// Copyright 2018-2019 the Deno authors. All rights reserved. MIT license.
-const { Buffer } = Deno;
+
+import { decode, encode } from "../strings/strings.ts";
+
type Conn = Deno.Conn;
type Writer = Deno.Writer;
import { BufReader, BufWriter } from "../io/bufio.ts";
import { readLong, readShort, sliceLongToBytes } from "../io/ioutil.ts";
import { Sha1 } from "./sha1.ts";
import { writeResponse } from "../http/server.ts";
+import { TextProtoReader } from "../textproto/mod.ts";
export enum OpCode {
Continue = 0x0,
@@ -70,13 +73,19 @@ export interface WebSocketFrame {
}
export interface WebSocket {
+ readonly conn: Conn;
readonly isClosed: boolean;
+
receive(): AsyncIterableIterator<WebSocketEvent>;
+
send(data: WebSocketMessage): Promise<void>;
+
ping(data?: WebSocketMessage): Promise<void>;
+
close(code: number, reason?: string): Promise<void>;
}
+/** Unmask masked websocket payload */
export function unmask(payload: Uint8Array, mask?: Uint8Array): void {
if (mask) {
for (let i = 0, len = payload.length; i < len; i++) {
@@ -85,6 +94,7 @@ export function unmask(payload: Uint8Array, mask?: Uint8Array): void {
}
}
+/** Write websocket frame to given writer */
export async function writeFrame(
frame: WebSocketFrame,
writer: Writer
@@ -92,6 +102,11 @@ export async function writeFrame(
const payloadLength = frame.payload.byteLength;
let header: Uint8Array;
const hasMask = frame.mask ? 0x80 : 0;
+ if (frame.mask && frame.mask.byteLength !== 4) {
+ throw new Error(
+ "invalid mask. mask must be 4 bytes: length=" + frame.mask.byteLength
+ );
+ }
if (payloadLength < 126) {
header = new Uint8Array([0x80 | frame.opcode, hasMask | payloadLength]);
} else if (payloadLength < 0xffff) {
@@ -108,13 +123,18 @@ export async function writeFrame(
...sliceLongToBytes(payloadLength)
]);
}
+ if (frame.mask) {
+ header = append(header, frame.mask);
+ }
unmask(frame.payload, frame.mask);
- const bytes = append(header, frame.payload);
- const w = new BufWriter(writer);
- await w.write(bytes);
- await w.flush();
+ header = append(header, frame.payload);
+ const w = BufWriter.create(writer);
+ await w.write(header);
+ const err = await w.flush();
+ if (err) throw err;
}
+/** Read websocket frame from given BufReader */
export async function readFrame(buf: BufReader): Promise<WebSocketFrame> {
let b = await buf.readByte();
let isLastFrame = false;
@@ -155,62 +175,38 @@ export async function readFrame(buf: BufReader): Promise<WebSocketFrame> {
};
}
-export async function* receiveFrame(
- conn: Conn
-): AsyncIterableIterator<WebSocketFrame> {
- let receiving = true;
- const isLastFrame = true;
- const reader = new BufReader(conn);
- while (receiving) {
- const frame = await readFrame(reader);
- const { opcode, payload } = frame;
- switch (opcode) {
- case OpCode.TextFrame:
- case OpCode.BinaryFrame:
- case OpCode.Continue:
- yield frame;
- break;
- case OpCode.Close:
- await writeFrame(
- {
- opcode,
- payload,
- isLastFrame
- },
- conn
- );
- conn.close();
- yield frame;
- receiving = false;
- break;
- case OpCode.Ping:
- await writeFrame(
- {
- payload,
- isLastFrame,
- opcode: OpCode.Pong
- },
- conn
- );
- yield frame;
- break;
- case OpCode.Pong:
- yield frame;
- break;
- default:
- }
- }
+// Create client-to-server mask, random 32bit number
+function createMask(): Uint8Array {
+ // TODO: use secure and immutable random function. Crypto.getRandomValues()
+ const arr = Array.from({ length: 4 }).map(
+ (): number => Math.round(Math.random() * 0xff)
+ );
+ return new Uint8Array(arr);
}
class WebSocketImpl implements WebSocket {
- encoder = new TextEncoder();
+ private readonly mask?: Uint8Array;
+ private readonly bufReader: BufReader;
+ private readonly bufWriter: BufWriter;
- constructor(private conn: Conn, private mask?: Uint8Array) {}
+ constructor(
+ readonly conn: Conn,
+ opts: {
+ bufReader?: BufReader;
+ bufWriter?: BufWriter;
+ mask?: Uint8Array;
+ } = {}
+ ) {
+ this.mask = opts.mask || createMask();
+ this.bufReader = opts.bufReader || new BufReader(conn);
+ this.bufWriter = opts.bufWriter || new BufWriter(conn);
+ }
async *receive(): AsyncIterableIterator<WebSocketEvent> {
let frames: WebSocketFrame[] = [];
let payloadsLength = 0;
- for await (const frame of receiveFrame(this.conn)) {
+ while (true) {
+ const frame = await readFrame(this.bufReader);
unmask(frame.payload, frame.mask);
switch (frame.opcode) {
case OpCode.TextFrame:
@@ -227,7 +223,7 @@ class WebSocketImpl implements WebSocket {
}
if (frames[0].opcode === OpCode.TextFrame) {
// text
- yield new Buffer(concat).toString();
+ yield decode(concat);
} else {
// binary
yield concat;
@@ -237,14 +233,23 @@ class WebSocketImpl implements WebSocket {
}
break;
case OpCode.Close:
- const code = (frame.payload[0] << 16) | frame.payload[1];
- const reason = new Buffer(
+ // [0x12, 0x34] -> 0x1234
+ const code = (frame.payload[0] << 8) | frame.payload[1];
+ const reason = decode(
frame.payload.subarray(2, frame.payload.length)
- ).toString();
- this._isClosed = true;
+ );
+ await this.close(code, reason);
yield { code, reason };
return;
case OpCode.Ping:
+ await writeFrame(
+ {
+ opcode: OpCode.Pong,
+ payload: frame.payload,
+ isLastFrame: true
+ },
+ this.bufWriter
+ );
yield ["ping", frame.payload] as WebSocketPingEvent;
break;
case OpCode.Pong:
@@ -261,7 +266,7 @@ class WebSocketImpl implements WebSocket {
}
const opcode =
typeof data === "string" ? OpCode.TextFrame : OpCode.BinaryFrame;
- const payload = typeof data === "string" ? this.encoder.encode(data) : data;
+ const payload = typeof data === "string" ? encode(data) : data;
const isLastFrame = true;
await writeFrame(
{
@@ -270,20 +275,20 @@ class WebSocketImpl implements WebSocket {
payload,
mask: this.mask
},
- this.conn
+ this.bufWriter
);
}
- async ping(data: WebSocketMessage): Promise<void> {
- const payload = typeof data === "string" ? this.encoder.encode(data) : data;
+ async ping(data: WebSocketMessage = ""): Promise<void> {
+ const payload = typeof data === "string" ? encode(data) : data;
await writeFrame(
{
isLastFrame: true,
- opcode: OpCode.Close,
+ opcode: OpCode.Ping,
mask: this.mask,
payload
},
- this.conn
+ this.bufWriter
);
}
@@ -297,7 +302,7 @@ class WebSocketImpl implements WebSocket {
const header = [code >>> 8, code & 0x00ff];
let payload: Uint8Array;
if (reason) {
- const reasonBytes = this.encoder.encode(reason);
+ const reasonBytes = encode(reason);
payload = new Uint8Array(2 + reasonBytes.byteLength);
payload.set(header);
payload.set(reasonBytes, 2);
@@ -311,7 +316,7 @@ class WebSocketImpl implements WebSocket {
mask: this.mask,
payload
},
- this.conn
+ this.bufWriter
);
} catch (e) {
throw e;
@@ -320,11 +325,10 @@ class WebSocketImpl implements WebSocket {
}
}
- private ensureSocketClosed(): Error {
+ private ensureSocketClosed(): void {
if (this.isClosed) {
return;
}
-
try {
this.conn.close();
} catch (e) {
@@ -335,16 +339,20 @@ class WebSocketImpl implements WebSocket {
}
}
+/** Return whether given headers is acceptable for websocket */
export function acceptable(req: { headers: Headers }): boolean {
+ const secKey = req.headers.get("sec-websocket-key");
return (
req.headers.get("upgrade") === "websocket" &&
req.headers.has("sec-websocket-key") &&
- req.headers.get("sec-websocket-key").length > 0
+ typeof secKey === "string" &&
+ secKey.length > 0
);
}
const kGUID = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";
+/** Create sec-websocket-accept header value with given nonce */
export function createSecAccept(nonce: string): string {
const sha1 = new Sha1();
sha1.update(nonce + kGUID);
@@ -352,16 +360,22 @@ export function createSecAccept(nonce: string): string {
return btoa(String.fromCharCode.apply(String, bytes));
}
+/** Upgrade given TCP connection into websocket connection */
export async function acceptWebSocket(req: {
conn: Conn;
+ bufWriter: BufWriter;
+ bufReader: BufReader;
headers: Headers;
}): Promise<WebSocket> {
- const { conn, headers } = req;
+ const { conn, headers, bufReader, bufWriter } = req;
if (acceptable(req)) {
- const sock = new WebSocketImpl(conn);
+ const sock = new WebSocketImpl(conn, { bufReader, bufWriter });
const secKey = headers.get("sec-websocket-key");
+ if (typeof secKey !== "string") {
+ throw new Error("sec-websocket-key is not provided");
+ }
const secAccept = createSecAccept(secKey);
- await writeResponse(conn, {
+ await writeResponse(bufWriter, {
status: 101,
headers: new Headers({
Upgrade: "websocket",
@@ -373,3 +387,94 @@ export async function acceptWebSocket(req: {
}
throw new Error("request is not acceptable");
}
+
+const kSecChars = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ-.~_";
+
+/** Create WebSocket-Sec-Key. Base64 encoded 16 bytes string */
+export function createSecKey(): string {
+ let key = "";
+ for (let i = 0; i < 16; i++) {
+ const j = Math.round(Math.random() * kSecChars.length);
+ key += kSecChars[j];
+ }
+ return btoa(key);
+}
+
+/** Connect to given websocket endpoint url. Endpoint must be acceptable for URL */
+export async function connectWebSocket(
+ endpoint: string,
+ headers: Headers = new Headers()
+): Promise<WebSocket> {
+ const url = new URL(endpoint);
+ const { hostname, pathname, searchParams } = url;
+ let port = url.port;
+ if (!url.port) {
+ if (url.protocol === "http" || url.protocol === "ws") {
+ port = "80";
+ } else if (url.protocol === "https" || url.protocol === "wss") {
+ throw new Error("currently https/wss is not supported");
+ }
+ }
+ const conn = await Deno.dial("tcp", `${hostname}:${port}`);
+ const abortHandshake = (err: Error): void => {
+ conn.close();
+ throw err;
+ };
+ const bufWriter = new BufWriter(conn);
+ const bufReader = new BufReader(conn);
+ await bufWriter.write(
+ encode(`GET ${pathname}?${searchParams || ""} HTTP/1.1\r\n`)
+ );
+ const key = createSecKey();
+ if (!headers.has("host")) {
+ headers.set("host", hostname);
+ }
+ headers.set("upgrade", "websocket");
+ headers.set("connection", "upgrade");
+ headers.set("sec-websocket-key", key);
+ let headerStr = "";
+ for (const [key, value] of headers) {
+ headerStr += `${key}: ${value}\r\n`;
+ }
+ headerStr += "\r\n";
+ await bufWriter.write(encode(headerStr));
+ let err, statusLine, responseHeaders;
+ err = await bufWriter.flush();
+ if (err) {
+ throw new Error("ws: failed to send handshake: " + err);
+ }
+ const tpReader = new TextProtoReader(bufReader);
+ [statusLine, err] = await tpReader.readLine();
+ if (err) {
+ abortHandshake(new Error("ws: failed to read status line: " + err));
+ }
+ const m = statusLine.match(/^(.+?) (.+?) (.+?)$/);
+ if (!m) {
+ abortHandshake(new Error("ws: invalid status line: " + statusLine));
+ }
+ const [_, version, statusCode] = m;
+ if (version !== "HTTP/1.1" || statusCode !== "101") {
+ abortHandshake(
+ new Error(
+ `ws: server didn't accept handshake: version=${version}, statusCode=${statusCode}`
+ )
+ );
+ }
+ [responseHeaders, err] = await tpReader.readMIMEHeader();
+ if (err) {
+ abortHandshake(new Error("ws: failed to parse response headers: " + err));
+ }
+ const expectedSecAccept = createSecAccept(key);
+ const secAccept = responseHeaders.get("sec-websocket-accept");
+ if (secAccept !== expectedSecAccept) {
+ abortHandshake(
+ new Error(
+ `ws: unexpected sec-websocket-accept header: expected=${expectedSecAccept}, actual=${secAccept}`
+ )
+ );
+ }
+ return new WebSocketImpl(conn, {
+ bufWriter,
+ bufReader
+ });
+}