diff options
| author | Yusuke Sakurai <kerokerokerop@gmail.com> | 2019-05-15 04:19:12 +0900 |
|---|---|---|
| committer | Ryan Dahl <ry@tinyclouds.org> | 2019-05-14 15:19:11 -0400 |
| commit | e3e9269c76299df99975e17a04b4d1b1ca39dfcb (patch) | |
| tree | c0bab4773a25589edaea12287c6a8422bd5208f1 /ws/mod.ts | |
| parent | a3de8c3d8a376049a37f8193c8538acc0d7a88f3 (diff) | |
feat: ws client (denoland/deno_std#394)
Original: https://github.com/denoland/deno_std/commit/782e3f690ffb9ee0dd89a5a64a3f2b753899719b
Diffstat (limited to 'ws/mod.ts')
| -rw-r--r-- | ws/mod.ts | 249 |
1 files changed, 177 insertions, 72 deletions
@@ -1,11 +1,14 @@ // Copyright 2018-2019 the Deno authors. All rights reserved. MIT license. -const { Buffer } = Deno; + +import { decode, encode } from "../strings/strings.ts"; + type Conn = Deno.Conn; type Writer = Deno.Writer; import { BufReader, BufWriter } from "../io/bufio.ts"; import { readLong, readShort, sliceLongToBytes } from "../io/ioutil.ts"; import { Sha1 } from "./sha1.ts"; import { writeResponse } from "../http/server.ts"; +import { TextProtoReader } from "../textproto/mod.ts"; export enum OpCode { Continue = 0x0, @@ -70,13 +73,19 @@ export interface WebSocketFrame { } export interface WebSocket { + readonly conn: Conn; readonly isClosed: boolean; + receive(): AsyncIterableIterator<WebSocketEvent>; + send(data: WebSocketMessage): Promise<void>; + ping(data?: WebSocketMessage): Promise<void>; + close(code: number, reason?: string): Promise<void>; } +/** Unmask masked websocket payload */ export function unmask(payload: Uint8Array, mask?: Uint8Array): void { if (mask) { for (let i = 0, len = payload.length; i < len; i++) { @@ -85,6 +94,7 @@ export function unmask(payload: Uint8Array, mask?: Uint8Array): void { } } +/** Write websocket frame to given writer */ export async function writeFrame( frame: WebSocketFrame, writer: Writer @@ -92,6 +102,11 @@ export async function writeFrame( const payloadLength = frame.payload.byteLength; let header: Uint8Array; const hasMask = frame.mask ? 0x80 : 0; + if (frame.mask && frame.mask.byteLength !== 4) { + throw new Error( + "invalid mask. mask must be 4 bytes: length=" + frame.mask.byteLength + ); + } if (payloadLength < 126) { header = new Uint8Array([0x80 | frame.opcode, hasMask | payloadLength]); } else if (payloadLength < 0xffff) { @@ -108,13 +123,18 @@ export async function writeFrame( ...sliceLongToBytes(payloadLength) ]); } + if (frame.mask) { + header = append(header, frame.mask); + } unmask(frame.payload, frame.mask); - const bytes = append(header, frame.payload); - const w = new BufWriter(writer); - await w.write(bytes); - await w.flush(); + header = append(header, frame.payload); + const w = BufWriter.create(writer); + await w.write(header); + const err = await w.flush(); + if (err) throw err; } +/** Read websocket frame from given BufReader */ export async function readFrame(buf: BufReader): Promise<WebSocketFrame> { let b = await buf.readByte(); let isLastFrame = false; @@ -155,62 +175,38 @@ export async function readFrame(buf: BufReader): Promise<WebSocketFrame> { }; } -export async function* receiveFrame( - conn: Conn -): AsyncIterableIterator<WebSocketFrame> { - let receiving = true; - const isLastFrame = true; - const reader = new BufReader(conn); - while (receiving) { - const frame = await readFrame(reader); - const { opcode, payload } = frame; - switch (opcode) { - case OpCode.TextFrame: - case OpCode.BinaryFrame: - case OpCode.Continue: - yield frame; - break; - case OpCode.Close: - await writeFrame( - { - opcode, - payload, - isLastFrame - }, - conn - ); - conn.close(); - yield frame; - receiving = false; - break; - case OpCode.Ping: - await writeFrame( - { - payload, - isLastFrame, - opcode: OpCode.Pong - }, - conn - ); - yield frame; - break; - case OpCode.Pong: - yield frame; - break; - default: - } - } +// Create client-to-server mask, random 32bit number +function createMask(): Uint8Array { + // TODO: use secure and immutable random function. Crypto.getRandomValues() + const arr = Array.from({ length: 4 }).map( + (): number => Math.round(Math.random() * 0xff) + ); + return new Uint8Array(arr); } class WebSocketImpl implements WebSocket { - encoder = new TextEncoder(); + private readonly mask?: Uint8Array; + private readonly bufReader: BufReader; + private readonly bufWriter: BufWriter; - constructor(private conn: Conn, private mask?: Uint8Array) {} + constructor( + readonly conn: Conn, + opts: { + bufReader?: BufReader; + bufWriter?: BufWriter; + mask?: Uint8Array; + } = {} + ) { + this.mask = opts.mask || createMask(); + this.bufReader = opts.bufReader || new BufReader(conn); + this.bufWriter = opts.bufWriter || new BufWriter(conn); + } async *receive(): AsyncIterableIterator<WebSocketEvent> { let frames: WebSocketFrame[] = []; let payloadsLength = 0; - for await (const frame of receiveFrame(this.conn)) { + while (true) { + const frame = await readFrame(this.bufReader); unmask(frame.payload, frame.mask); switch (frame.opcode) { case OpCode.TextFrame: @@ -227,7 +223,7 @@ class WebSocketImpl implements WebSocket { } if (frames[0].opcode === OpCode.TextFrame) { // text - yield new Buffer(concat).toString(); + yield decode(concat); } else { // binary yield concat; @@ -237,14 +233,23 @@ class WebSocketImpl implements WebSocket { } break; case OpCode.Close: - const code = (frame.payload[0] << 16) | frame.payload[1]; - const reason = new Buffer( + // [0x12, 0x34] -> 0x1234 + const code = (frame.payload[0] << 8) | frame.payload[1]; + const reason = decode( frame.payload.subarray(2, frame.payload.length) - ).toString(); - this._isClosed = true; + ); + await this.close(code, reason); yield { code, reason }; return; case OpCode.Ping: + await writeFrame( + { + opcode: OpCode.Pong, + payload: frame.payload, + isLastFrame: true + }, + this.bufWriter + ); yield ["ping", frame.payload] as WebSocketPingEvent; break; case OpCode.Pong: @@ -261,7 +266,7 @@ class WebSocketImpl implements WebSocket { } const opcode = typeof data === "string" ? OpCode.TextFrame : OpCode.BinaryFrame; - const payload = typeof data === "string" ? this.encoder.encode(data) : data; + const payload = typeof data === "string" ? encode(data) : data; const isLastFrame = true; await writeFrame( { @@ -270,20 +275,20 @@ class WebSocketImpl implements WebSocket { payload, mask: this.mask }, - this.conn + this.bufWriter ); } - async ping(data: WebSocketMessage): Promise<void> { - const payload = typeof data === "string" ? this.encoder.encode(data) : data; + async ping(data: WebSocketMessage = ""): Promise<void> { + const payload = typeof data === "string" ? encode(data) : data; await writeFrame( { isLastFrame: true, - opcode: OpCode.Close, + opcode: OpCode.Ping, mask: this.mask, payload }, - this.conn + this.bufWriter ); } @@ -297,7 +302,7 @@ class WebSocketImpl implements WebSocket { const header = [code >>> 8, code & 0x00ff]; let payload: Uint8Array; if (reason) { - const reasonBytes = this.encoder.encode(reason); + const reasonBytes = encode(reason); payload = new Uint8Array(2 + reasonBytes.byteLength); payload.set(header); payload.set(reasonBytes, 2); @@ -311,7 +316,7 @@ class WebSocketImpl implements WebSocket { mask: this.mask, payload }, - this.conn + this.bufWriter ); } catch (e) { throw e; @@ -320,11 +325,10 @@ class WebSocketImpl implements WebSocket { } } - private ensureSocketClosed(): Error { + private ensureSocketClosed(): void { if (this.isClosed) { return; } - try { this.conn.close(); } catch (e) { @@ -335,16 +339,20 @@ class WebSocketImpl implements WebSocket { } } +/** Return whether given headers is acceptable for websocket */ export function acceptable(req: { headers: Headers }): boolean { + const secKey = req.headers.get("sec-websocket-key"); return ( req.headers.get("upgrade") === "websocket" && req.headers.has("sec-websocket-key") && - req.headers.get("sec-websocket-key").length > 0 + typeof secKey === "string" && + secKey.length > 0 ); } const kGUID = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11"; +/** Create sec-websocket-accept header value with given nonce */ export function createSecAccept(nonce: string): string { const sha1 = new Sha1(); sha1.update(nonce + kGUID); @@ -352,16 +360,22 @@ export function createSecAccept(nonce: string): string { return btoa(String.fromCharCode.apply(String, bytes)); } +/** Upgrade given TCP connection into websocket connection */ export async function acceptWebSocket(req: { conn: Conn; + bufWriter: BufWriter; + bufReader: BufReader; headers: Headers; }): Promise<WebSocket> { - const { conn, headers } = req; + const { conn, headers, bufReader, bufWriter } = req; if (acceptable(req)) { - const sock = new WebSocketImpl(conn); + const sock = new WebSocketImpl(conn, { bufReader, bufWriter }); const secKey = headers.get("sec-websocket-key"); + if (typeof secKey !== "string") { + throw new Error("sec-websocket-key is not provided"); + } const secAccept = createSecAccept(secKey); - await writeResponse(conn, { + await writeResponse(bufWriter, { status: 101, headers: new Headers({ Upgrade: "websocket", @@ -373,3 +387,94 @@ export async function acceptWebSocket(req: { } throw new Error("request is not acceptable"); } + +const kSecChars = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ-.~_"; + +/** Create WebSocket-Sec-Key. Base64 encoded 16 bytes string */ +export function createSecKey(): string { + let key = ""; + for (let i = 0; i < 16; i++) { + const j = Math.round(Math.random() * kSecChars.length); + key += kSecChars[j]; + } + return btoa(key); +} + +/** Connect to given websocket endpoint url. Endpoint must be acceptable for URL */ +export async function connectWebSocket( + endpoint: string, + headers: Headers = new Headers() +): Promise<WebSocket> { + const url = new URL(endpoint); + const { hostname, pathname, searchParams } = url; + let port = url.port; + if (!url.port) { + if (url.protocol === "http" || url.protocol === "ws") { + port = "80"; + } else if (url.protocol === "https" || url.protocol === "wss") { + throw new Error("currently https/wss is not supported"); + } + } + const conn = await Deno.dial("tcp", `${hostname}:${port}`); + const abortHandshake = (err: Error): void => { + conn.close(); + throw err; + }; + const bufWriter = new BufWriter(conn); + const bufReader = new BufReader(conn); + await bufWriter.write( + encode(`GET ${pathname}?${searchParams || ""} HTTP/1.1\r\n`) + ); + const key = createSecKey(); + if (!headers.has("host")) { + headers.set("host", hostname); + } + headers.set("upgrade", "websocket"); + headers.set("connection", "upgrade"); + headers.set("sec-websocket-key", key); + let headerStr = ""; + for (const [key, value] of headers) { + headerStr += `${key}: ${value}\r\n`; + } + headerStr += "\r\n"; + await bufWriter.write(encode(headerStr)); + let err, statusLine, responseHeaders; + err = await bufWriter.flush(); + if (err) { + throw new Error("ws: failed to send handshake: " + err); + } + const tpReader = new TextProtoReader(bufReader); + [statusLine, err] = await tpReader.readLine(); + if (err) { + abortHandshake(new Error("ws: failed to read status line: " + err)); + } + const m = statusLine.match(/^(.+?) (.+?) (.+?)$/); + if (!m) { + abortHandshake(new Error("ws: invalid status line: " + statusLine)); + } + const [_, version, statusCode] = m; + if (version !== "HTTP/1.1" || statusCode !== "101") { + abortHandshake( + new Error( + `ws: server didn't accept handshake: version=${version}, statusCode=${statusCode}` + ) + ); + } + [responseHeaders, err] = await tpReader.readMIMEHeader(); + if (err) { + abortHandshake(new Error("ws: failed to parse response headers: " + err)); + } + const expectedSecAccept = createSecAccept(key); + const secAccept = responseHeaders.get("sec-websocket-accept"); + if (secAccept !== expectedSecAccept) { + abortHandshake( + new Error( + `ws: unexpected sec-websocket-accept header: expected=${expectedSecAccept}, actual=${secAccept}` + ) + ); + } + return new WebSocketImpl(conn, { + bufWriter, + bufReader + }); +} |
