diff options
| author | Bert Belder <bertbelder@gmail.com> | 2019-05-23 19:04:06 -0700 |
|---|---|---|
| committer | Bert Belder <bertbelder@gmail.com> | 2019-05-29 09:50:12 -0700 |
| commit | b95f79d74cbcf3492abd95d4c90839e32f51399f (patch) | |
| tree | d0c68f01c798da1e3b81930cfa58a5370c56775f /ws | |
| parent | 5b37b560fb047e1df6e6f68fcbaece922637a93c (diff) | |
io: refactor BufReader/Writer interfaces to be more idiomatic (denoland/deno_std#444)
Thanks Vincent Le Goff (@zekth) for porting over the CSV reader
implementation.
Fixes: denoland/deno_std#436
Original: https://github.com/denoland/deno_std/commit/0ee6334b698072b50c6f5ac8d42d34dc4c94b948
Diffstat (limited to 'ws')
| -rw-r--r-- | ws/mod.ts | 106 | ||||
| -rw-r--r-- | ws/test.ts | 10 |
2 files changed, 62 insertions, 54 deletions
@@ -4,7 +4,7 @@ import { decode, encode } from "../strings/mod.ts"; type Conn = Deno.Conn; type Writer = Deno.Writer; -import { BufReader, BufWriter } from "../io/bufio.ts"; +import { BufReader, BufWriter, EOF, UnexpectedEOFError } from "../io/bufio.ts"; import { readLong, readShort, sliceLongToBytes } from "../io/ioutil.ts"; import { Sha1 } from "./sha1.ts"; import { writeResponse } from "../http/server.ts"; @@ -130,8 +130,7 @@ export async function writeFrame( header = append(header, frame.payload); const w = BufWriter.create(writer); await w.write(header); - const err = await w.flush(); - if (err) throw err; + await w.flush(); } /** Read websocket frame from given BufReader */ @@ -403,79 +402,86 @@ export function createSecKey(): string { return btoa(key); } -/** Connect to given websocket endpoint url. Endpoint must be acceptable for URL */ -export async function connectWebSocket( - endpoint: string, - headers: Headers = new Headers() -): Promise<WebSocket> { - const url = new URL(endpoint); +async function handshake( + url: URL, + headers: Headers, + bufReader: BufReader, + bufWriter: BufWriter +): Promise<void> { const { hostname, pathname, searchParams } = url; - let port = url.port; - if (!url.port) { - if (url.protocol === "http" || url.protocol === "ws") { - port = "80"; - } else if (url.protocol === "https" || url.protocol === "wss") { - throw new Error("currently https/wss is not supported"); - } - } - const conn = await Deno.dial("tcp", `${hostname}:${port}`); - const abortHandshake = (err: Error): void => { - conn.close(); - throw err; - }; - const bufWriter = new BufWriter(conn); - const bufReader = new BufReader(conn); - await bufWriter.write( - encode(`GET ${pathname}?${searchParams || ""} HTTP/1.1\r\n`) - ); const key = createSecKey(); + if (!headers.has("host")) { headers.set("host", hostname); } headers.set("upgrade", "websocket"); headers.set("connection", "upgrade"); headers.set("sec-websocket-key", key); - let headerStr = ""; + + let headerStr = `GET ${pathname}?${searchParams || ""} HTTP/1.1\r\n`; for (const [key, value] of headers) { headerStr += `${key}: ${value}\r\n`; } headerStr += "\r\n"; + await bufWriter.write(encode(headerStr)); - let err, statusLine, responseHeaders; - err = await bufWriter.flush(); - if (err) { - throw new Error("ws: failed to send handshake: " + err); - } + await bufWriter.flush(); + const tpReader = new TextProtoReader(bufReader); - [statusLine, err] = await tpReader.readLine(); - if (err) { - abortHandshake(new Error("ws: failed to read status line: " + err)); + const statusLine = await tpReader.readLine(); + if (statusLine === EOF) { + throw new UnexpectedEOFError(); } - const m = statusLine.match(/^(.+?) (.+?) (.+?)$/); + const m = statusLine.match(/^(?<version>\S+) (?<statusCode>\S+) /); if (!m) { - abortHandshake(new Error("ws: invalid status line: " + statusLine)); + throw new Error("ws: invalid status line: " + statusLine); } - const [_, version, statusCode] = m; + const { version, statusCode } = m.groups; if (version !== "HTTP/1.1" || statusCode !== "101") { - abortHandshake( - new Error( - `ws: server didn't accept handshake: version=${version}, statusCode=${statusCode}` - ) + throw new Error( + `ws: server didn't accept handshake: ` + + `version=${version}, statusCode=${statusCode}` ); } - [responseHeaders, err] = await tpReader.readMIMEHeader(); - if (err) { - abortHandshake(new Error("ws: failed to parse response headers: " + err)); + + const responseHeaders = await tpReader.readMIMEHeader(); + if (responseHeaders === EOF) { + throw new UnexpectedEOFError(); } + const expectedSecAccept = createSecAccept(key); const secAccept = responseHeaders.get("sec-websocket-accept"); if (secAccept !== expectedSecAccept) { - abortHandshake( - new Error( - `ws: unexpected sec-websocket-accept header: expected=${expectedSecAccept}, actual=${secAccept}` - ) + throw new Error( + `ws: unexpected sec-websocket-accept header: ` + + `expected=${expectedSecAccept}, actual=${secAccept}` ); } +} + +/** Connect to given websocket endpoint url. Endpoint must be acceptable for URL */ +export async function connectWebSocket( + endpoint: string, + headers: Headers = new Headers() +): Promise<WebSocket> { + const url = new URL(endpoint); + let { hostname, port } = url; + if (!port) { + if (url.protocol === "http" || url.protocol === "ws") { + port = "80"; + } else if (url.protocol === "https" || url.protocol === "wss") { + throw new Error("currently https/wss is not supported"); + } + } + const conn = await Deno.dial("tcp", `${hostname}:${port}`); + const bufWriter = new BufWriter(conn); + const bufReader = new BufReader(conn); + try { + await handshake(url, headers, bufReader, bufWriter); + } catch (err) { + conn.close(); + throw err; + } return new WebSocketImpl(conn, { bufWriter, bufReader diff --git a/ws/test.ts b/ws/test.ts index 93936988a..bac82453d 100644 --- a/ws/test.ts +++ b/ws/test.ts @@ -107,8 +107,9 @@ test(async function wsReadUnmaskedPingPongFrame(): Promise<void> { }); test(async function wsReadUnmaskedBigBinaryFrame(): Promise<void> { + const payloadLength = 0x100; const a = [0x82, 0x7e, 0x01, 0x00]; - for (let i = 0; i < 256; i++) { + for (let i = 0; i < payloadLength; i++) { a.push(i); } const buf = new BufReader(new Buffer(new Uint8Array(a))); @@ -116,12 +117,13 @@ test(async function wsReadUnmaskedBigBinaryFrame(): Promise<void> { assertEquals(bin.opcode, OpCode.BinaryFrame); assertEquals(bin.isLastFrame, true); assertEquals(bin.mask, undefined); - assertEquals(bin.payload.length, 256); + assertEquals(bin.payload.length, payloadLength); }); test(async function wsReadUnmaskedBigBigBinaryFrame(): Promise<void> { + const payloadLength = 0x10000; const a = [0x82, 0x7f, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00]; - for (let i = 0; i < 0xffff; i++) { + for (let i = 0; i < payloadLength; i++) { a.push(i); } const buf = new BufReader(new Buffer(new Uint8Array(a))); @@ -129,7 +131,7 @@ test(async function wsReadUnmaskedBigBigBinaryFrame(): Promise<void> { assertEquals(bin.opcode, OpCode.BinaryFrame); assertEquals(bin.isLastFrame, true); assertEquals(bin.mask, undefined); - assertEquals(bin.payload.length, 0xffff + 1); + assertEquals(bin.payload.length, payloadLength); }); test(async function wsCreateSecAccept(): Promise<void> { |
