diff options
author | Andrey Trebler <at@edrilling.no> | 2020-05-04 18:27:06 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-05-04 12:27:06 -0400 |
commit | 796fc9bc3e8e4da7d53fb4617511ce4e2be22485 (patch) | |
tree | d2c140fcfb95a102b4b7b241f8edfe8fc7a971fe | |
parent | 38ecabf205336c2cf51f2a18919da3dcb1a7db97 (diff) |
BREAKING: make WebSocket directly implement AsyncIterable (#5044) (#5045)
-rw-r--r-- | std/examples/chat/server.ts | 2 | ||||
-rw-r--r-- | std/examples/chat/server_test.ts | 3 | ||||
-rw-r--r-- | std/ws/README.md | 181 | ||||
-rw-r--r-- | std/ws/example_client.ts | 75 | ||||
-rw-r--r-- | std/ws/example_server.ts | 83 | ||||
-rw-r--r-- | std/ws/mod.ts | 11 | ||||
-rw-r--r-- | std/ws/test.ts | 58 |
7 files changed, 229 insertions, 184 deletions
diff --git a/std/examples/chat/server.ts b/std/examples/chat/server.ts index a86ed737a..cb1be530e 100644 --- a/std/examples/chat/server.ts +++ b/std/examples/chat/server.ts @@ -18,7 +18,7 @@ async function wsHandler(ws: WebSocket): Promise<void> { const id = ++clientId; clients.set(id, ws); dispatch(`Connected: [${id}]`); - for await (const msg of ws.receive()) { + for await (const msg of ws) { console.log(`msg:${id}`, msg); if (typeof msg === "string") { dispatch(`[${id}]: ${msg}`); diff --git a/std/examples/chat/server_test.ts b/std/examples/chat/server_test.ts index d1c1a8afa..92eb50f92 100644 --- a/std/examples/chat/server_test.ts +++ b/std/examples/chat/server_test.ts @@ -59,7 +59,8 @@ test({ let ws: WebSocket | undefined; try { ws = await connectWebSocket("http://127.0.0.1:8080/ws"); - const it = ws.receive(); + const it = ws[Symbol.asyncIterator](); + assertEquals((await it.next()).value, "Connected: [1]"); ws.send("Hello"); assertEquals((await it.next()).value, "[1]: Hello"); diff --git a/std/ws/README.md b/std/ws/README.md index 5dcb9ce90..e4d924df6 100644 --- a/std/ws/README.md +++ b/std/ws/README.md @@ -12,57 +12,54 @@ import { acceptWebSocket, isWebSocketCloseEvent, isWebSocketPingEvent, - WebSocket, } from "https://deno.land/std/ws/mod.ts"; /** websocket echo server */ const port = Deno.args[0] || "8080"; console.log(`websocket server is running on :${port}`); for await (const req of serve(`:${port}`)) { - const { headers, conn } = req; - acceptWebSocket({ - conn, - headers, - bufReader: req.r, - bufWriter: req.w, - }) - .then( - async (sock: WebSocket): Promise<void> => { - console.log("socket connected!"); - const it = sock.receive(); - while (true) { - try { - const { done, value } = await it.next(); - if (done) { - break; - } - const ev = value; - if (typeof ev === "string") { - // text message - console.log("ws:Text", ev); - await sock.send(ev); - } else if (ev instanceof Uint8Array) { - // binary message - console.log("ws:Binary", ev); - } else if (isWebSocketPingEvent(ev)) { - const [, body] = ev; - // ping - console.log("ws:Ping", body); - } else if (isWebSocketCloseEvent(ev)) { - // close - const { code, reason } = ev; - console.log("ws:Close", code, reason); - } - } catch (e) { - console.error(`failed to receive frame: ${e}`); - await sock.close(1000).catch(console.error); - } + const { conn, r: bufReader, w: bufWriter, headers } = req; + + try { + const sock = await acceptWebSocket({ + conn, + bufReader, + bufWriter, + headers, + }); + + console.log("socket connected!"); + + try { + for await (const ev of sock) { + if (typeof ev === "string") { + // text message + console.log("ws:Text", ev); + await sock.send(ev); + } else if (ev instanceof Uint8Array) { + // binary message + console.log("ws:Binary", ev); + } else if (isWebSocketPingEvent(ev)) { + const [, body] = ev; + // ping + console.log("ws:Ping", body); + } else if (isWebSocketCloseEvent(ev)) { + // close + const { code, reason } = ev; + console.log("ws:Close", code, reason); } } - ) - .catch((err: Error): void => { - console.error(`failed to accept websocket: ${err}`); - }); + } catch (err) { + console.error(`failed to receive frame: ${err}`); + + if (!sock.isClosed) { + await sock.close(1000).catch(console.error); + } + } + } catch (err) { + console.error(`failed to accept websocket: ${err}`); + await req.respond({ status: 400 }); + } } ``` @@ -75,51 +72,58 @@ import { isWebSocketPingEvent, isWebSocketPongEvent, } from "https://deno.land/std/ws/mod.ts"; -import { encode } from "https://deno.land/std/strings/mod.ts"; +import { encode } from "https://deno.land/std/encoding/utf8.ts"; import { BufReader } from "https://deno.land/std/io/bufio.ts"; import { TextProtoReader } from "https://deno.land/std/textproto/mod.ts"; import { blue, green, red, yellow } from "https://deno.land/std/fmt/colors.ts"; const endpoint = Deno.args[0] || "ws://127.0.0.1:8080"; /** simple websocket cli */ -const sock = await connectWebSocket(endpoint); -console.log(green("ws connected! (type 'close' to quit)")); -(async function (): Promise<void> { - for await (const msg of sock.receive()) { - if (typeof msg === "string") { - console.log(yellow("< " + msg)); - } else if (isWebSocketPingEvent(msg)) { - console.log(blue("< ping")); - } else if (isWebSocketPongEvent(msg)) { - console.log(blue("< pong")); - } else if (isWebSocketCloseEvent(msg)) { - console.log(red(`closed: code=${msg.code}, reason=${msg.reason}`)); +try { + const sock = await connectWebSocket(endpoint); + console.log(green("ws connected! (type 'close' to quit)")); + + const messages = async (): Promise<void> => { + for await (const msg of sock) { + if (typeof msg === "string") { + console.log(yellow(`< ${msg}`)); + } else if (isWebSocketPingEvent(msg)) { + console.log(blue("< ping")); + } else if (isWebSocketPongEvent(msg)) { + console.log(blue("< pong")); + } else if (isWebSocketCloseEvent(msg)) { + console.log(red(`closed: code=${msg.code}, reason=${msg.reason}`)); + } } + }; + + const cli = async (): Promise<void> => { + const tpr = new TextProtoReader(new BufReader(Deno.stdin)); + while (true) { + await Deno.stdout.write(encode("> ")); + const line = await tpr.readLine(); + if (line === null) { + break; + } + if (line === "close") { + break; + } else if (line === "ping") { + await sock.ping(); + } else { + await sock.send(line); + } + } + }; + + await Promise.race([messages(), cli()]).catch(console.error); + + if (!sock.isClosed) { + await sock.close(1000).catch(console.error); } -})(); - -const tpr = new TextProtoReader(new BufReader(Deno.stdin)); -while (true) { - await Deno.stdout.write(encode("> ")); - const line = await tpr.readLine(); - if (line === null) { - break; - } - if (line === "close") { - break; - } else if (line === "ping") { - await sock.ping(); - } else { - await sock.send(line); - } - // FIXME: Without this, - // sock.receive() won't resolved though it is readable... - await new Promise((resolve): void => { - setTimeout(resolve, 0); - }); +} catch (err) { + console.error(red(`Could not connect to WebSocket: '${err}'`)); } -await sock.close(1000); -// FIXME: conn.close() won't shutdown process... + Deno.exit(0); ``` @@ -137,25 +141,6 @@ Returns true if input value is a WebSocketPingEvent, false otherwise. Returns true if input value is a WebSocketPongEvent, false otherwise. -### append - -This module is used to merge two Uint8Arrays. - -- note: This module might move to common/util. - -```ts -import { append } from "https://deno.land/std/ws/mod.ts"; - -// a = [1], b = [2] -append(a, b); // output: [1, 2] - -// a = [1], b = null -append(a, b); // output: [1] - -// a = [], b = [2] -append(a, b); // output: [2] -``` - ### unmask Unmask masked WebSocket payload. diff --git a/std/ws/example_client.ts b/std/ws/example_client.ts index d680c6fef..4213025f4 100644 --- a/std/ws/example_client.ts +++ b/std/ws/example_client.ts @@ -11,42 +11,49 @@ import { blue, green, red, yellow } from "../fmt/colors.ts"; const endpoint = Deno.args[0] || "ws://127.0.0.1:8080"; /** simple websocket cli */ -const sock = await connectWebSocket(endpoint); -console.log(green("ws connected! (type 'close' to quit)")); -(async function (): Promise<void> { - for await (const msg of sock.receive()) { - if (typeof msg === "string") { - console.log(yellow("< " + msg)); - } else if (isWebSocketPingEvent(msg)) { - console.log(blue("< ping")); - } else if (isWebSocketPongEvent(msg)) { - console.log(blue("< pong")); - } else if (isWebSocketCloseEvent(msg)) { - console.log(red(`closed: code=${msg.code}, reason=${msg.reason}`)); +try { + const sock = await connectWebSocket(endpoint); + console.log(green("ws connected! (type 'close' to quit)")); + + const messages = async (): Promise<void> => { + for await (const msg of sock) { + if (typeof msg === "string") { + console.log(yellow(`< ${msg}`)); + } else if (isWebSocketPingEvent(msg)) { + console.log(blue("< ping")); + } else if (isWebSocketPongEvent(msg)) { + console.log(blue("< pong")); + } else if (isWebSocketCloseEvent(msg)) { + console.log(red(`closed: code=${msg.code}, reason=${msg.reason}`)); + } } - } -})(); + }; -const tpr = new TextProtoReader(new BufReader(Deno.stdin)); -while (true) { - await Deno.stdout.write(encode("> ")); - const line = await tpr.readLine(); - if (line === null) { - break; - } - if (line === "close") { - break; - } else if (line === "ping") { - await sock.ping(); - } else { - await sock.send(line); + const cli = async (): Promise<void> => { + const tpr = new TextProtoReader(new BufReader(Deno.stdin)); + while (true) { + await Deno.stdout.write(encode("> ")); + const line = await tpr.readLine(); + if (line === null) { + break; + } + if (line === "close") { + break; + } else if (line === "ping") { + await sock.ping(); + } else { + await sock.send(line); + } + } + }; + + await Promise.race([messages(), cli()]).catch(console.error); + + if (!sock.isClosed) { + await sock.close(1000).catch(console.error); } - // FIXME: Without this, - // sock.receive() won't resolved though it is readable... - await new Promise((resolve): void => { - setTimeout(resolve, 0); - }); +} catch (err) { + console.error(red(`Could not connect to WebSocket: '${err}'`)); } -await sock.close(1000); -// FIXME: conn.close() won't shutdown process... + Deno.exit(0); diff --git a/std/ws/example_server.ts b/std/ws/example_server.ts index 048198326..947b807ca 100644 --- a/std/ws/example_server.ts +++ b/std/ws/example_server.ts @@ -4,55 +4,52 @@ import { acceptWebSocket, isWebSocketCloseEvent, isWebSocketPingEvent, - WebSocket, } from "./mod.ts"; /** websocket echo server */ const port = Deno.args[0] || "8080"; console.log(`websocket server is running on :${port}`); for await (const req of serve(`:${port}`)) { - const { headers, conn } = req; - acceptWebSocket({ - conn, - headers, - bufReader: req.r, - bufWriter: req.w, - }) - .then( - async (sock: WebSocket): Promise<void> => { - console.log("socket connected!"); - const it = sock.receive(); - while (true) { - try { - const { done, value } = await it.next(); - if (done) { - break; - } - const ev = value; - if (typeof ev === "string") { - // text message - console.log("ws:Text", ev); - await sock.send(ev); - } else if (ev instanceof Uint8Array) { - // binary message - console.log("ws:Binary", ev); - } else if (isWebSocketPingEvent(ev)) { - const [, body] = ev; - // ping - console.log("ws:Ping", body); - } else if (isWebSocketCloseEvent(ev)) { - // close - const { code, reason } = ev; - console.log("ws:Close", code, reason); - } - } catch (e) { - console.error(`failed to receive frame: ${e}`); - await sock.close(1000).catch(console.error); - } + const { conn, r: bufReader, w: bufWriter, headers } = req; + + try { + const sock = await acceptWebSocket({ + conn, + bufReader, + bufWriter, + headers, + }); + + console.log("socket connected!"); + + try { + for await (const ev of sock) { + if (typeof ev === "string") { + // text message + console.log("ws:Text", ev); + await sock.send(ev); + } else if (ev instanceof Uint8Array) { + // binary message + console.log("ws:Binary", ev); + } else if (isWebSocketPingEvent(ev)) { + const [, body] = ev; + // ping + console.log("ws:Ping", body); + } else if (isWebSocketCloseEvent(ev)) { + // close + const { code, reason } = ev; + console.log("ws:Close", code, reason); } } - ) - .catch((err: Error): void => { - console.error(`failed to accept websocket: ${err}`); - }); + } catch (err) { + console.error(`failed to receive frame: ${err}`); + + if (!sock.isClosed) { + await sock.close(1000).catch(console.error); + } + } + } catch (err) { + console.error(`failed to accept websocket: ${err}`); + await req.respond({ status: 400 }); + } } diff --git a/std/ws/mod.ts b/std/ws/mod.ts index 569936706..cec08888b 100644 --- a/std/ws/mod.ts +++ b/std/ws/mod.ts @@ -67,11 +67,14 @@ export interface WebSocketFrame { payload: Uint8Array; } -export interface WebSocket extends Reader, Writer { +export interface WebSocket + extends Reader, + Writer, + AsyncIterable<WebSocketEvent> { readonly conn: Conn; readonly isClosed: boolean; - receive(): AsyncIterableIterator<WebSocketEvent>; + [Symbol.asyncIterator](): AsyncIterableIterator<WebSocketEvent>; /** * @throws `Deno.errors.ConnectionReset` @@ -228,7 +231,7 @@ class WebSocketImpl implements WebSocket { this.bufWriter = bufWriter || new BufWriter(conn); } - async *receive(): AsyncIterableIterator<WebSocketEvent> { + async *[Symbol.asyncIterator](): AsyncIterableIterator<WebSocketEvent> { let frames: WebSocketFrame[] = []; let payloadsLength = 0; while (!this._isClosed) { @@ -336,7 +339,7 @@ class WebSocketImpl implements WebSocket { } async read(p: Uint8Array): Promise<number | null> { - for await (const ev of this.receive()) { + for await (const ev of this) { if (ev instanceof Uint8Array) { return copyBytes(ev, p); } diff --git a/std/ws/test.ts b/std/ws/test.ts index c59202e89..27b07a315 100644 --- a/std/ws/test.ts +++ b/std/ws/test.ts @@ -355,7 +355,7 @@ test("[ws] WebSocket should throw `Deno.errors.ConnectionReset` when peer closed await assertThrowsAsync(() => sock.close(0), Deno.errors.ConnectionReset); }); -test("[ws] WebSocket shouldn't throw `Deno.errors.UnexpectedEof` on recive()", async () => { +test("[ws] WebSocket shouldn't throw `Deno.errors.UnexpectedEof`", async () => { const buf = new Buffer(); const eofReader: Deno.Reader = { read(_: Uint8Array): Promise<number | null> { @@ -364,7 +364,7 @@ test("[ws] WebSocket shouldn't throw `Deno.errors.UnexpectedEof` on recive()", a }; const conn = dummyConn(eofReader, buf); const sock = createWebSocket({ conn }); - const it = sock.receive(); + const it = sock[Symbol.asyncIterator](); const { value, done } = await it.next(); assertEquals(value, undefined); assertEquals(done, true); @@ -456,7 +456,7 @@ test("[ws] WebSocket Reader should ignore non-message frames", async () => { ]); const pingHello = new Uint8Array([0x89, 0x05, 0x48, 0x65, 0x6c, 0x6c, 0x6f]); const hello = new Uint8Array([0x81, 0x05, 0x48, 0x65, 0x6c, 0x6c, 0x6f]); - const close = new Uint8Array([0x88]); + const close = new Uint8Array([0x88, 0x02, 0x03, 0xe8]); const dataPayloadLength = 0x100; const dataArr = [0x82, 0x7e, 0x01, 0x00]; @@ -523,3 +523,55 @@ test("[ws] WebSocket Reader should ignore non-message frames", async () => { assertEquals(decode(new Buffer(p.subarray(0, helloLength)).bytes()), "Hello"); assertEquals(p.subarray(helloLength), data.subarray(4)); }); + +test("[ws] WebSocket should act as asyncIterator", async () => { + const pingHello = new Uint8Array([0x89, 0x05, 0x48, 0x65, 0x6c, 0x6c, 0x6f]); + const hello = new Uint8Array([0x81, 0x05, 0x48, 0x65, 0x6c, 0x6c, 0x6f]); + const close = new Uint8Array([0x88, 0x04, 0x03, 0xf3, 0x34, 0x32]); + + enum Frames { + ping, + hello, + close, + end, + } + + let frame = Frames.ping; + + const reader: Reader = { + read(p: Uint8Array): Promise<number | null> { + if (frame === Frames.ping) { + frame = Frames.hello; + p.set(pingHello); + return Promise.resolve(pingHello.byteLength); + } + + if (frame === Frames.hello) { + frame = Frames.close; + p.set(hello); + return Promise.resolve(hello.byteLength); + } + + if (frame === Frames.close) { + frame = Frames.end; + p.set(close); + return Promise.resolve(close.byteLength); + } + + return Promise.resolve(null); + }, + }; + + const conn = dummyConn(reader, new Buffer()); + const sock = createWebSocket({ conn }); + + const events = []; + for await (const wsEvent of sock) { + events.push(wsEvent); + } + + assertEquals(events.length, 3); + assertEquals(events[0], ["ping", encode("Hello")]); + assertEquals(events[1], "Hello"); + assertEquals(events[2], { code: 1011, reason: "42" }); +}); |