summaryrefslogtreecommitdiff
path: root/ws
diff options
context:
space:
mode:
authorYusuke Sakurai <kerokerokerop@gmail.com>2019-05-15 04:19:12 +0900
committerRyan Dahl <ry@tinyclouds.org>2019-05-14 15:19:11 -0400
commite3e9269c76299df99975e17a04b4d1b1ca39dfcb (patch)
treec0bab4773a25589edaea12287c6a8422bd5208f1 /ws
parenta3de8c3d8a376049a37f8193c8538acc0d7a88f3 (diff)
feat: ws client (denoland/deno_std#394)
Original: https://github.com/denoland/deno_std/commit/782e3f690ffb9ee0dd89a5a64a3f2b753899719b
Diffstat (limited to 'ws')
-rw-r--r--ws/example_client.ts55
-rw-r--r--ws/example_server.ts66
-rw-r--r--ws/mod.ts249
-rw-r--r--ws/test.ts52
4 files changed, 337 insertions, 85 deletions
diff --git a/ws/example_client.ts b/ws/example_client.ts
new file mode 100644
index 000000000..16f37d021
--- /dev/null
+++ b/ws/example_client.ts
@@ -0,0 +1,55 @@
+import {
+ connectWebSocket,
+ isWebSocketCloseEvent,
+ isWebSocketPingEvent,
+ isWebSocketPongEvent
+} from "../ws/mod.ts";
+import { encode } from "../strings/strings.ts";
+import { BufReader } from "../io/bufio.ts";
+import { TextProtoReader } from "../textproto/mod.ts";
+import { blue, green, red, yellow } from "../colors/mod.ts";
+
+const endpoint = Deno.args[1] || "ws://127.0.0.1:8080";
+/** simple websocket cli */
+async function main(): Promise<void> {
+ const sock = await connectWebSocket(endpoint);
+ console.log(green("ws connected! (type 'close' to quit)"));
+ (async function(): Promise<void> {
+ for await (const msg of sock.receive()) {
+ if (typeof msg === "string") {
+ console.log(yellow("< " + msg));
+ } else if (isWebSocketPingEvent(msg)) {
+ console.log(blue("< ping"));
+ } else if (isWebSocketPongEvent(msg)) {
+ console.log(blue("< pong"));
+ } else if (isWebSocketCloseEvent(msg)) {
+ console.log(red(`closed: code=${msg.code}, reason=${msg.reason}`));
+ }
+ }
+ })();
+ const tpr = new TextProtoReader(new BufReader(Deno.stdin));
+ while (true) {
+ await Deno.stdout.write(encode("> "));
+ const [line, err] = await tpr.readLine();
+ if (err) {
+ console.error(red(`failed to read line from stdin: ${err}`));
+ break;
+ }
+ if (line === "close") {
+ break;
+ } else if (line === "ping") {
+ await sock.ping();
+ } else {
+ await sock.send(line);
+ }
+ // FIXME: Without this, sock.receive() won't resolved though it is readable...
+ await new Promise((resolve): void => setTimeout(resolve, 0));
+ }
+ await sock.close(1000);
+ // FIXME: conn.close() won't shutdown process...
+ Deno.exit(0);
+}
+
+if (import.meta.main) {
+ main();
+}
diff --git a/ws/example_server.ts b/ws/example_server.ts
new file mode 100644
index 000000000..cd51ff94c
--- /dev/null
+++ b/ws/example_server.ts
@@ -0,0 +1,66 @@
+// Copyright 2018-2019 the Deno authors. All rights reserved. MIT license.
+import { serve } from "../http/server.ts";
+import {
+ acceptWebSocket,
+ isWebSocketCloseEvent,
+ isWebSocketPingEvent,
+ WebSocket
+} from "./mod.ts";
+
+/** websocket echo server */
+const port = Deno.args[1] || "8080";
+async function main(): Promise<void> {
+ console.log(`websocket server is running on :${port}`);
+ for await (const req of serve(`:${port}`)) {
+ const { headers, conn } = req;
+ acceptWebSocket({
+ conn,
+ headers,
+ bufReader: req.r,
+ bufWriter: req.w
+ })
+ .then(
+ async (sock: WebSocket): Promise<void> => {
+ console.log("socket connected!");
+ const it = sock.receive();
+ while (true) {
+ try {
+ const { done, value } = await it.next();
+ if (done) {
+ break;
+ }
+ const ev = value;
+ if (typeof ev === "string") {
+ // text message
+ console.log("ws:Text", ev);
+ await sock.send(ev);
+ } else if (ev instanceof Uint8Array) {
+ // binary message
+ console.log("ws:Binary", ev);
+ } else if (isWebSocketPingEvent(ev)) {
+ const [, body] = ev;
+ // ping
+ console.log("ws:Ping", body);
+ } else if (isWebSocketCloseEvent(ev)) {
+ // close
+ const { code, reason } = ev;
+ console.log("ws:Close", code, reason);
+ }
+ } catch (e) {
+ console.error(`failed to receive frame: ${e}`);
+ await sock.close(1000).catch(console.error);
+ }
+ }
+ }
+ )
+ .catch(
+ (err: Error): void => {
+ console.error(`failed to accept websocket: ${err}`);
+ }
+ );
+ }
+}
+
+if (import.meta.main) {
+ main();
+}
diff --git a/ws/mod.ts b/ws/mod.ts
index 65dc142ca..3e20e2d3b 100644
--- a/ws/mod.ts
+++ b/ws/mod.ts
@@ -1,11 +1,14 @@
// Copyright 2018-2019 the Deno authors. All rights reserved. MIT license.
-const { Buffer } = Deno;
+
+import { decode, encode } from "../strings/strings.ts";
+
type Conn = Deno.Conn;
type Writer = Deno.Writer;
import { BufReader, BufWriter } from "../io/bufio.ts";
import { readLong, readShort, sliceLongToBytes } from "../io/ioutil.ts";
import { Sha1 } from "./sha1.ts";
import { writeResponse } from "../http/server.ts";
+import { TextProtoReader } from "../textproto/mod.ts";
export enum OpCode {
Continue = 0x0,
@@ -70,13 +73,19 @@ export interface WebSocketFrame {
}
export interface WebSocket {
+ readonly conn: Conn;
readonly isClosed: boolean;
+
receive(): AsyncIterableIterator<WebSocketEvent>;
+
send(data: WebSocketMessage): Promise<void>;
+
ping(data?: WebSocketMessage): Promise<void>;
+
close(code: number, reason?: string): Promise<void>;
}
+/** Unmask masked websocket payload */
export function unmask(payload: Uint8Array, mask?: Uint8Array): void {
if (mask) {
for (let i = 0, len = payload.length; i < len; i++) {
@@ -85,6 +94,7 @@ export function unmask(payload: Uint8Array, mask?: Uint8Array): void {
}
}
+/** Write websocket frame to given writer */
export async function writeFrame(
frame: WebSocketFrame,
writer: Writer
@@ -92,6 +102,11 @@ export async function writeFrame(
const payloadLength = frame.payload.byteLength;
let header: Uint8Array;
const hasMask = frame.mask ? 0x80 : 0;
+ if (frame.mask && frame.mask.byteLength !== 4) {
+ throw new Error(
+ "invalid mask. mask must be 4 bytes: length=" + frame.mask.byteLength
+ );
+ }
if (payloadLength < 126) {
header = new Uint8Array([0x80 | frame.opcode, hasMask | payloadLength]);
} else if (payloadLength < 0xffff) {
@@ -108,13 +123,18 @@ export async function writeFrame(
...sliceLongToBytes(payloadLength)
]);
}
+ if (frame.mask) {
+ header = append(header, frame.mask);
+ }
unmask(frame.payload, frame.mask);
- const bytes = append(header, frame.payload);
- const w = new BufWriter(writer);
- await w.write(bytes);
- await w.flush();
+ header = append(header, frame.payload);
+ const w = BufWriter.create(writer);
+ await w.write(header);
+ const err = await w.flush();
+ if (err) throw err;
}
+/** Read websocket frame from given BufReader */
export async function readFrame(buf: BufReader): Promise<WebSocketFrame> {
let b = await buf.readByte();
let isLastFrame = false;
@@ -155,62 +175,38 @@ export async function readFrame(buf: BufReader): Promise<WebSocketFrame> {
};
}
-export async function* receiveFrame(
- conn: Conn
-): AsyncIterableIterator<WebSocketFrame> {
- let receiving = true;
- const isLastFrame = true;
- const reader = new BufReader(conn);
- while (receiving) {
- const frame = await readFrame(reader);
- const { opcode, payload } = frame;
- switch (opcode) {
- case OpCode.TextFrame:
- case OpCode.BinaryFrame:
- case OpCode.Continue:
- yield frame;
- break;
- case OpCode.Close:
- await writeFrame(
- {
- opcode,
- payload,
- isLastFrame
- },
- conn
- );
- conn.close();
- yield frame;
- receiving = false;
- break;
- case OpCode.Ping:
- await writeFrame(
- {
- payload,
- isLastFrame,
- opcode: OpCode.Pong
- },
- conn
- );
- yield frame;
- break;
- case OpCode.Pong:
- yield frame;
- break;
- default:
- }
- }
+// Create client-to-server mask, random 32bit number
+function createMask(): Uint8Array {
+ // TODO: use secure and immutable random function. Crypto.getRandomValues()
+ const arr = Array.from({ length: 4 }).map(
+ (): number => Math.round(Math.random() * 0xff)
+ );
+ return new Uint8Array(arr);
}
class WebSocketImpl implements WebSocket {
- encoder = new TextEncoder();
+ private readonly mask?: Uint8Array;
+ private readonly bufReader: BufReader;
+ private readonly bufWriter: BufWriter;
- constructor(private conn: Conn, private mask?: Uint8Array) {}
+ constructor(
+ readonly conn: Conn,
+ opts: {
+ bufReader?: BufReader;
+ bufWriter?: BufWriter;
+ mask?: Uint8Array;
+ } = {}
+ ) {
+ this.mask = opts.mask || createMask();
+ this.bufReader = opts.bufReader || new BufReader(conn);
+ this.bufWriter = opts.bufWriter || new BufWriter(conn);
+ }
async *receive(): AsyncIterableIterator<WebSocketEvent> {
let frames: WebSocketFrame[] = [];
let payloadsLength = 0;
- for await (const frame of receiveFrame(this.conn)) {
+ while (true) {
+ const frame = await readFrame(this.bufReader);
unmask(frame.payload, frame.mask);
switch (frame.opcode) {
case OpCode.TextFrame:
@@ -227,7 +223,7 @@ class WebSocketImpl implements WebSocket {
}
if (frames[0].opcode === OpCode.TextFrame) {
// text
- yield new Buffer(concat).toString();
+ yield decode(concat);
} else {
// binary
yield concat;
@@ -237,14 +233,23 @@ class WebSocketImpl implements WebSocket {
}
break;
case OpCode.Close:
- const code = (frame.payload[0] << 16) | frame.payload[1];
- const reason = new Buffer(
+ // [0x12, 0x34] -> 0x1234
+ const code = (frame.payload[0] << 8) | frame.payload[1];
+ const reason = decode(
frame.payload.subarray(2, frame.payload.length)
- ).toString();
- this._isClosed = true;
+ );
+ await this.close(code, reason);
yield { code, reason };
return;
case OpCode.Ping:
+ await writeFrame(
+ {
+ opcode: OpCode.Pong,
+ payload: frame.payload,
+ isLastFrame: true
+ },
+ this.bufWriter
+ );
yield ["ping", frame.payload] as WebSocketPingEvent;
break;
case OpCode.Pong:
@@ -261,7 +266,7 @@ class WebSocketImpl implements WebSocket {
}
const opcode =
typeof data === "string" ? OpCode.TextFrame : OpCode.BinaryFrame;
- const payload = typeof data === "string" ? this.encoder.encode(data) : data;
+ const payload = typeof data === "string" ? encode(data) : data;
const isLastFrame = true;
await writeFrame(
{
@@ -270,20 +275,20 @@ class WebSocketImpl implements WebSocket {
payload,
mask: this.mask
},
- this.conn
+ this.bufWriter
);
}
- async ping(data: WebSocketMessage): Promise<void> {
- const payload = typeof data === "string" ? this.encoder.encode(data) : data;
+ async ping(data: WebSocketMessage = ""): Promise<void> {
+ const payload = typeof data === "string" ? encode(data) : data;
await writeFrame(
{
isLastFrame: true,
- opcode: OpCode.Close,
+ opcode: OpCode.Ping,
mask: this.mask,
payload
},
- this.conn
+ this.bufWriter
);
}
@@ -297,7 +302,7 @@ class WebSocketImpl implements WebSocket {
const header = [code >>> 8, code & 0x00ff];
let payload: Uint8Array;
if (reason) {
- const reasonBytes = this.encoder.encode(reason);
+ const reasonBytes = encode(reason);
payload = new Uint8Array(2 + reasonBytes.byteLength);
payload.set(header);
payload.set(reasonBytes, 2);
@@ -311,7 +316,7 @@ class WebSocketImpl implements WebSocket {
mask: this.mask,
payload
},
- this.conn
+ this.bufWriter
);
} catch (e) {
throw e;
@@ -320,11 +325,10 @@ class WebSocketImpl implements WebSocket {
}
}
- private ensureSocketClosed(): Error {
+ private ensureSocketClosed(): void {
if (this.isClosed) {
return;
}
-
try {
this.conn.close();
} catch (e) {
@@ -335,16 +339,20 @@ class WebSocketImpl implements WebSocket {
}
}
+/** Return whether given headers is acceptable for websocket */
export function acceptable(req: { headers: Headers }): boolean {
+ const secKey = req.headers.get("sec-websocket-key");
return (
req.headers.get("upgrade") === "websocket" &&
req.headers.has("sec-websocket-key") &&
- req.headers.get("sec-websocket-key").length > 0
+ typeof secKey === "string" &&
+ secKey.length > 0
);
}
const kGUID = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";
+/** Create sec-websocket-accept header value with given nonce */
export function createSecAccept(nonce: string): string {
const sha1 = new Sha1();
sha1.update(nonce + kGUID);
@@ -352,16 +360,22 @@ export function createSecAccept(nonce: string): string {
return btoa(String.fromCharCode.apply(String, bytes));
}
+/** Upgrade given TCP connection into websocket connection */
export async function acceptWebSocket(req: {
conn: Conn;
+ bufWriter: BufWriter;
+ bufReader: BufReader;
headers: Headers;
}): Promise<WebSocket> {
- const { conn, headers } = req;
+ const { conn, headers, bufReader, bufWriter } = req;
if (acceptable(req)) {
- const sock = new WebSocketImpl(conn);
+ const sock = new WebSocketImpl(conn, { bufReader, bufWriter });
const secKey = headers.get("sec-websocket-key");
+ if (typeof secKey !== "string") {
+ throw new Error("sec-websocket-key is not provided");
+ }
const secAccept = createSecAccept(secKey);
- await writeResponse(conn, {
+ await writeResponse(bufWriter, {
status: 101,
headers: new Headers({
Upgrade: "websocket",
@@ -373,3 +387,94 @@ export async function acceptWebSocket(req: {
}
throw new Error("request is not acceptable");
}
+
+const kSecChars = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ-.~_";
+
+/** Create WebSocket-Sec-Key. Base64 encoded 16 bytes string */
+export function createSecKey(): string {
+ let key = "";
+ for (let i = 0; i < 16; i++) {
+ const j = Math.round(Math.random() * kSecChars.length);
+ key += kSecChars[j];
+ }
+ return btoa(key);
+}
+
+/** Connect to given websocket endpoint url. Endpoint must be acceptable for URL */
+export async function connectWebSocket(
+ endpoint: string,
+ headers: Headers = new Headers()
+): Promise<WebSocket> {
+ const url = new URL(endpoint);
+ const { hostname, pathname, searchParams } = url;
+ let port = url.port;
+ if (!url.port) {
+ if (url.protocol === "http" || url.protocol === "ws") {
+ port = "80";
+ } else if (url.protocol === "https" || url.protocol === "wss") {
+ throw new Error("currently https/wss is not supported");
+ }
+ }
+ const conn = await Deno.dial("tcp", `${hostname}:${port}`);
+ const abortHandshake = (err: Error): void => {
+ conn.close();
+ throw err;
+ };
+ const bufWriter = new BufWriter(conn);
+ const bufReader = new BufReader(conn);
+ await bufWriter.write(
+ encode(`GET ${pathname}?${searchParams || ""} HTTP/1.1\r\n`)
+ );
+ const key = createSecKey();
+ if (!headers.has("host")) {
+ headers.set("host", hostname);
+ }
+ headers.set("upgrade", "websocket");
+ headers.set("connection", "upgrade");
+ headers.set("sec-websocket-key", key);
+ let headerStr = "";
+ for (const [key, value] of headers) {
+ headerStr += `${key}: ${value}\r\n`;
+ }
+ headerStr += "\r\n";
+ await bufWriter.write(encode(headerStr));
+ let err, statusLine, responseHeaders;
+ err = await bufWriter.flush();
+ if (err) {
+ throw new Error("ws: failed to send handshake: " + err);
+ }
+ const tpReader = new TextProtoReader(bufReader);
+ [statusLine, err] = await tpReader.readLine();
+ if (err) {
+ abortHandshake(new Error("ws: failed to read status line: " + err));
+ }
+ const m = statusLine.match(/^(.+?) (.+?) (.+?)$/);
+ if (!m) {
+ abortHandshake(new Error("ws: invalid status line: " + statusLine));
+ }
+ const [_, version, statusCode] = m;
+ if (version !== "HTTP/1.1" || statusCode !== "101") {
+ abortHandshake(
+ new Error(
+ `ws: server didn't accept handshake: version=${version}, statusCode=${statusCode}`
+ )
+ );
+ }
+ [responseHeaders, err] = await tpReader.readMIMEHeader();
+ if (err) {
+ abortHandshake(new Error("ws: failed to parse response headers: " + err));
+ }
+ const expectedSecAccept = createSecAccept(key);
+ const secAccept = responseHeaders.get("sec-websocket-accept");
+ if (secAccept !== expectedSecAccept) {
+ abortHandshake(
+ new Error(
+ `ws: unexpected sec-websocket-accept header: expected=${expectedSecAccept}, actual=${secAccept}`
+ )
+ );
+ }
+ return new WebSocketImpl(conn, {
+ bufWriter,
+ bufReader
+ });
+}
diff --git a/ws/test.ts b/ws/test.ts
index 5d0cc9093..7b0bd6949 100644
--- a/ws/test.ts
+++ b/ws/test.ts
@@ -1,19 +1,21 @@
// Copyright 2018-2019 the Deno authors. All rights reserved. MIT license.
import "./sha1_test.ts";
-
-const { Buffer } = Deno;
import { BufReader } from "../io/bufio.ts";
import { assert, assertEquals } from "../testing/asserts.ts";
-import { test } from "../testing/mod.ts";
+import { runIfMain, test } from "../testing/mod.ts";
import {
acceptable,
createSecAccept,
OpCode,
readFrame,
- unmask
+ unmask,
+ writeFrame
} from "./mod.ts";
+import { encode } from "../strings/strings.ts";
+
+const { Buffer } = Deno;
-test(async function testReadUnmaskedTextFrame(): Promise<void> {
+test(async function wsReadUnmaskedTextFrame(): Promise<void> {
// unmasked single text frame with payload "Hello"
const buf = new BufReader(
new Buffer(new Uint8Array([0x81, 0x05, 0x48, 0x65, 0x6c, 0x6c, 0x6f]))
@@ -25,7 +27,7 @@ test(async function testReadUnmaskedTextFrame(): Promise<void> {
assertEquals(frame.isLastFrame, true);
});
-test(async function testReadMakedTextFrame(): Promise<void> {
+test(async function wsReadMaskedTextFrame(): Promise<void> {
//a masked single text frame with payload "Hello"
const buf = new BufReader(
new Buffer(
@@ -52,7 +54,7 @@ test(async function testReadMakedTextFrame(): Promise<void> {
assertEquals(frame.isLastFrame, true);
});
-test(async function testReadUnmaskedSplittedTextFrames(): Promise<void> {
+test(async function wsReadUnmaskedSplitTextFrames(): Promise<void> {
const buf1 = new BufReader(
new Buffer(new Uint8Array([0x01, 0x03, 0x48, 0x65, 0x6c]))
);
@@ -71,7 +73,7 @@ test(async function testReadUnmaskedSplittedTextFrames(): Promise<void> {
assertEquals(new Buffer(f2.payload).toString(), "lo");
});
-test(async function testReadUnmaksedPingPongFrame(): Promise<void> {
+test(async function wsReadUnmaskedPingPongFrame(): Promise<void> {
// unmasked ping with payload "Hello"
const buf = new BufReader(
new Buffer(new Uint8Array([0x89, 0x05, 0x48, 0x65, 0x6c, 0x6c, 0x6f]))
@@ -104,7 +106,7 @@ test(async function testReadUnmaksedPingPongFrame(): Promise<void> {
assertEquals(new Buffer(pong.payload).toString(), "Hello");
});
-test(async function testReadUnmaksedBigBinaryFrame(): Promise<void> {
+test(async function wsReadUnmaskedBigBinaryFrame(): Promise<void> {
const a = [0x82, 0x7e, 0x01, 0x00];
for (let i = 0; i < 256; i++) {
a.push(i);
@@ -117,7 +119,7 @@ test(async function testReadUnmaksedBigBinaryFrame(): Promise<void> {
assertEquals(bin.payload.length, 256);
});
-test(async function testReadUnmaskedBigBigBinaryFrame(): Promise<void> {
+test(async function wsReadUnmaskedBigBigBinaryFrame(): Promise<void> {
const a = [0x82, 0x7f, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00];
for (let i = 0; i < 0xffff; i++) {
a.push(i);
@@ -130,13 +132,13 @@ test(async function testReadUnmaskedBigBigBinaryFrame(): Promise<void> {
assertEquals(bin.payload.length, 0xffff + 1);
});
-test(async function testCreateSecAccept(): Promise<void> {
+test(async function wsCreateSecAccept(): Promise<void> {
const nonce = "dGhlIHNhbXBsZSBub25jZQ==";
const d = createSecAccept(nonce);
assertEquals(d, "s3pPLMBiTxaQ9kYGzzhZRbK+xOo=");
});
-test(function testAcceptable(): void {
+test(function wsAcceptable(): void {
const ret = acceptable({
headers: new Headers({
upgrade: "websocket",
@@ -153,7 +155,7 @@ const invalidHeaders = [
{ upgrade: "websocket", "sec-websocket-ky": "" }
];
-test(function testAcceptableInvalid(): void {
+test(function wsAcceptableInvalid(): void {
for (const pat of invalidHeaders) {
const ret = acceptable({
headers: new Headers(pat)
@@ -161,3 +163,27 @@ test(function testAcceptableInvalid(): void {
assertEquals(ret, false);
}
});
+
+test(async function wsWriteReadMaskedFrame(): Promise<void> {
+ const mask = new Uint8Array([0, 1, 2, 3]);
+ const msg = "hello";
+ const buf = new Buffer();
+ const r = new BufReader(buf);
+ await writeFrame(
+ {
+ isLastFrame: true,
+ mask,
+ opcode: OpCode.TextFrame,
+ payload: encode(msg)
+ },
+ buf
+ );
+ const frame = await readFrame(r);
+ assertEquals(frame.opcode, OpCode.TextFrame);
+ assertEquals(frame.isLastFrame, true);
+ assertEquals(frame.mask, mask);
+ unmask(frame.payload, frame.mask);
+ assertEquals(frame.payload, encode(msg));
+});
+
+runIfMain(import.meta);