summaryrefslogtreecommitdiff
path: root/ws
diff options
context:
space:
mode:
authorBert Belder <bertbelder@gmail.com>2019-05-23 19:04:06 -0700
committerBert Belder <bertbelder@gmail.com>2019-05-29 09:50:12 -0700
commitb95f79d74cbcf3492abd95d4c90839e32f51399f (patch)
treed0c68f01c798da1e3b81930cfa58a5370c56775f /ws
parent5b37b560fb047e1df6e6f68fcbaece922637a93c (diff)
io: refactor BufReader/Writer interfaces to be more idiomatic (denoland/deno_std#444)
Thanks Vincent Le Goff (@zekth) for porting over the CSV reader implementation. Fixes: denoland/deno_std#436 Original: https://github.com/denoland/deno_std/commit/0ee6334b698072b50c6f5ac8d42d34dc4c94b948
Diffstat (limited to 'ws')
-rw-r--r--ws/mod.ts106
-rw-r--r--ws/test.ts10
2 files changed, 62 insertions, 54 deletions
diff --git a/ws/mod.ts b/ws/mod.ts
index ced566d45..7d8200dfc 100644
--- a/ws/mod.ts
+++ b/ws/mod.ts
@@ -4,7 +4,7 @@ import { decode, encode } from "../strings/mod.ts";
type Conn = Deno.Conn;
type Writer = Deno.Writer;
-import { BufReader, BufWriter } from "../io/bufio.ts";
+import { BufReader, BufWriter, EOF, UnexpectedEOFError } from "../io/bufio.ts";
import { readLong, readShort, sliceLongToBytes } from "../io/ioutil.ts";
import { Sha1 } from "./sha1.ts";
import { writeResponse } from "../http/server.ts";
@@ -130,8 +130,7 @@ export async function writeFrame(
header = append(header, frame.payload);
const w = BufWriter.create(writer);
await w.write(header);
- const err = await w.flush();
- if (err) throw err;
+ await w.flush();
}
/** Read websocket frame from given BufReader */
@@ -403,79 +402,86 @@ export function createSecKey(): string {
return btoa(key);
}
-/** Connect to given websocket endpoint url. Endpoint must be acceptable for URL */
-export async function connectWebSocket(
- endpoint: string,
- headers: Headers = new Headers()
-): Promise<WebSocket> {
- const url = new URL(endpoint);
+async function handshake(
+ url: URL,
+ headers: Headers,
+ bufReader: BufReader,
+ bufWriter: BufWriter
+): Promise<void> {
const { hostname, pathname, searchParams } = url;
- let port = url.port;
- if (!url.port) {
- if (url.protocol === "http" || url.protocol === "ws") {
- port = "80";
- } else if (url.protocol === "https" || url.protocol === "wss") {
- throw new Error("currently https/wss is not supported");
- }
- }
- const conn = await Deno.dial("tcp", `${hostname}:${port}`);
- const abortHandshake = (err: Error): void => {
- conn.close();
- throw err;
- };
- const bufWriter = new BufWriter(conn);
- const bufReader = new BufReader(conn);
- await bufWriter.write(
- encode(`GET ${pathname}?${searchParams || ""} HTTP/1.1\r\n`)
- );
const key = createSecKey();
+
if (!headers.has("host")) {
headers.set("host", hostname);
}
headers.set("upgrade", "websocket");
headers.set("connection", "upgrade");
headers.set("sec-websocket-key", key);
- let headerStr = "";
+
+ let headerStr = `GET ${pathname}?${searchParams || ""} HTTP/1.1\r\n`;
for (const [key, value] of headers) {
headerStr += `${key}: ${value}\r\n`;
}
headerStr += "\r\n";
+
await bufWriter.write(encode(headerStr));
- let err, statusLine, responseHeaders;
- err = await bufWriter.flush();
- if (err) {
- throw new Error("ws: failed to send handshake: " + err);
- }
+ await bufWriter.flush();
+
const tpReader = new TextProtoReader(bufReader);
- [statusLine, err] = await tpReader.readLine();
- if (err) {
- abortHandshake(new Error("ws: failed to read status line: " + err));
+ const statusLine = await tpReader.readLine();
+ if (statusLine === EOF) {
+ throw new UnexpectedEOFError();
}
- const m = statusLine.match(/^(.+?) (.+?) (.+?)$/);
+ const m = statusLine.match(/^(?<version>\S+) (?<statusCode>\S+) /);
if (!m) {
- abortHandshake(new Error("ws: invalid status line: " + statusLine));
+ throw new Error("ws: invalid status line: " + statusLine);
}
- const [_, version, statusCode] = m;
+ const { version, statusCode } = m.groups;
if (version !== "HTTP/1.1" || statusCode !== "101") {
- abortHandshake(
- new Error(
- `ws: server didn't accept handshake: version=${version}, statusCode=${statusCode}`
- )
+ throw new Error(
+ `ws: server didn't accept handshake: ` +
+ `version=${version}, statusCode=${statusCode}`
);
}
- [responseHeaders, err] = await tpReader.readMIMEHeader();
- if (err) {
- abortHandshake(new Error("ws: failed to parse response headers: " + err));
+
+ const responseHeaders = await tpReader.readMIMEHeader();
+ if (responseHeaders === EOF) {
+ throw new UnexpectedEOFError();
}
+
const expectedSecAccept = createSecAccept(key);
const secAccept = responseHeaders.get("sec-websocket-accept");
if (secAccept !== expectedSecAccept) {
- abortHandshake(
- new Error(
- `ws: unexpected sec-websocket-accept header: expected=${expectedSecAccept}, actual=${secAccept}`
- )
+ throw new Error(
+ `ws: unexpected sec-websocket-accept header: ` +
+ `expected=${expectedSecAccept}, actual=${secAccept}`
);
}
+}
+
+/** Connect to given websocket endpoint url. Endpoint must be acceptable for URL */
+export async function connectWebSocket(
+ endpoint: string,
+ headers: Headers = new Headers()
+): Promise<WebSocket> {
+ const url = new URL(endpoint);
+ let { hostname, port } = url;
+ if (!port) {
+ if (url.protocol === "http" || url.protocol === "ws") {
+ port = "80";
+ } else if (url.protocol === "https" || url.protocol === "wss") {
+ throw new Error("currently https/wss is not supported");
+ }
+ }
+ const conn = await Deno.dial("tcp", `${hostname}:${port}`);
+ const bufWriter = new BufWriter(conn);
+ const bufReader = new BufReader(conn);
+ try {
+ await handshake(url, headers, bufReader, bufWriter);
+ } catch (err) {
+ conn.close();
+ throw err;
+ }
return new WebSocketImpl(conn, {
bufWriter,
bufReader
diff --git a/ws/test.ts b/ws/test.ts
index 93936988a..bac82453d 100644
--- a/ws/test.ts
+++ b/ws/test.ts
@@ -107,8 +107,9 @@ test(async function wsReadUnmaskedPingPongFrame(): Promise<void> {
});
test(async function wsReadUnmaskedBigBinaryFrame(): Promise<void> {
+ const payloadLength = 0x100;
const a = [0x82, 0x7e, 0x01, 0x00];
- for (let i = 0; i < 256; i++) {
+ for (let i = 0; i < payloadLength; i++) {
a.push(i);
}
const buf = new BufReader(new Buffer(new Uint8Array(a)));
@@ -116,12 +117,13 @@ test(async function wsReadUnmaskedBigBinaryFrame(): Promise<void> {
assertEquals(bin.opcode, OpCode.BinaryFrame);
assertEquals(bin.isLastFrame, true);
assertEquals(bin.mask, undefined);
- assertEquals(bin.payload.length, 256);
+ assertEquals(bin.payload.length, payloadLength);
});
test(async function wsReadUnmaskedBigBigBinaryFrame(): Promise<void> {
+ const payloadLength = 0x10000;
const a = [0x82, 0x7f, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00];
- for (let i = 0; i < 0xffff; i++) {
+ for (let i = 0; i < payloadLength; i++) {
a.push(i);
}
const buf = new BufReader(new Buffer(new Uint8Array(a)));
@@ -129,7 +131,7 @@ test(async function wsReadUnmaskedBigBigBinaryFrame(): Promise<void> {
assertEquals(bin.opcode, OpCode.BinaryFrame);
assertEquals(bin.isLastFrame, true);
assertEquals(bin.mask, undefined);
- assertEquals(bin.payload.length, 0xffff + 1);
+ assertEquals(bin.payload.length, payloadLength);
});
test(async function wsCreateSecAccept(): Promise<void> {