summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAndrey Trebler <at@edrilling.no>2020-05-04 18:27:06 +0200
committerGitHub <noreply@github.com>2020-05-04 12:27:06 -0400
commit796fc9bc3e8e4da7d53fb4617511ce4e2be22485 (patch)
treed2c140fcfb95a102b4b7b241f8edfe8fc7a971fe
parent38ecabf205336c2cf51f2a18919da3dcb1a7db97 (diff)
BREAKING: make WebSocket directly implement AsyncIterable (#5044) (#5045)
-rw-r--r--std/examples/chat/server.ts2
-rw-r--r--std/examples/chat/server_test.ts3
-rw-r--r--std/ws/README.md181
-rw-r--r--std/ws/example_client.ts75
-rw-r--r--std/ws/example_server.ts83
-rw-r--r--std/ws/mod.ts11
-rw-r--r--std/ws/test.ts58
7 files changed, 229 insertions, 184 deletions
diff --git a/std/examples/chat/server.ts b/std/examples/chat/server.ts
index a86ed737a..cb1be530e 100644
--- a/std/examples/chat/server.ts
+++ b/std/examples/chat/server.ts
@@ -18,7 +18,7 @@ async function wsHandler(ws: WebSocket): Promise<void> {
const id = ++clientId;
clients.set(id, ws);
dispatch(`Connected: [${id}]`);
- for await (const msg of ws.receive()) {
+ for await (const msg of ws) {
console.log(`msg:${id}`, msg);
if (typeof msg === "string") {
dispatch(`[${id}]: ${msg}`);
diff --git a/std/examples/chat/server_test.ts b/std/examples/chat/server_test.ts
index d1c1a8afa..92eb50f92 100644
--- a/std/examples/chat/server_test.ts
+++ b/std/examples/chat/server_test.ts
@@ -59,7 +59,8 @@ test({
let ws: WebSocket | undefined;
try {
ws = await connectWebSocket("http://127.0.0.1:8080/ws");
- const it = ws.receive();
+ const it = ws[Symbol.asyncIterator]();
+
assertEquals((await it.next()).value, "Connected: [1]");
ws.send("Hello");
assertEquals((await it.next()).value, "[1]: Hello");
diff --git a/std/ws/README.md b/std/ws/README.md
index 5dcb9ce90..e4d924df6 100644
--- a/std/ws/README.md
+++ b/std/ws/README.md
@@ -12,57 +12,54 @@ import {
acceptWebSocket,
isWebSocketCloseEvent,
isWebSocketPingEvent,
- WebSocket,
} from "https://deno.land/std/ws/mod.ts";
/** websocket echo server */
const port = Deno.args[0] || "8080";
console.log(`websocket server is running on :${port}`);
for await (const req of serve(`:${port}`)) {
- const { headers, conn } = req;
- acceptWebSocket({
- conn,
- headers,
- bufReader: req.r,
- bufWriter: req.w,
- })
- .then(
- async (sock: WebSocket): Promise<void> => {
- console.log("socket connected!");
- const it = sock.receive();
- while (true) {
- try {
- const { done, value } = await it.next();
- if (done) {
- break;
- }
- const ev = value;
- if (typeof ev === "string") {
- // text message
- console.log("ws:Text", ev);
- await sock.send(ev);
- } else if (ev instanceof Uint8Array) {
- // binary message
- console.log("ws:Binary", ev);
- } else if (isWebSocketPingEvent(ev)) {
- const [, body] = ev;
- // ping
- console.log("ws:Ping", body);
- } else if (isWebSocketCloseEvent(ev)) {
- // close
- const { code, reason } = ev;
- console.log("ws:Close", code, reason);
- }
- } catch (e) {
- console.error(`failed to receive frame: ${e}`);
- await sock.close(1000).catch(console.error);
- }
+ const { conn, r: bufReader, w: bufWriter, headers } = req;
+
+ try {
+ const sock = await acceptWebSocket({
+ conn,
+ bufReader,
+ bufWriter,
+ headers,
+ });
+
+ console.log("socket connected!");
+
+ try {
+ for await (const ev of sock) {
+ if (typeof ev === "string") {
+ // text message
+ console.log("ws:Text", ev);
+ await sock.send(ev);
+ } else if (ev instanceof Uint8Array) {
+ // binary message
+ console.log("ws:Binary", ev);
+ } else if (isWebSocketPingEvent(ev)) {
+ const [, body] = ev;
+ // ping
+ console.log("ws:Ping", body);
+ } else if (isWebSocketCloseEvent(ev)) {
+ // close
+ const { code, reason } = ev;
+ console.log("ws:Close", code, reason);
}
}
- )
- .catch((err: Error): void => {
- console.error(`failed to accept websocket: ${err}`);
- });
+ } catch (err) {
+ console.error(`failed to receive frame: ${err}`);
+
+ if (!sock.isClosed) {
+ await sock.close(1000).catch(console.error);
+ }
+ }
+ } catch (err) {
+ console.error(`failed to accept websocket: ${err}`);
+ await req.respond({ status: 400 });
+ }
}
```
@@ -75,51 +72,58 @@ import {
isWebSocketPingEvent,
isWebSocketPongEvent,
} from "https://deno.land/std/ws/mod.ts";
-import { encode } from "https://deno.land/std/strings/mod.ts";
+import { encode } from "https://deno.land/std/encoding/utf8.ts";
import { BufReader } from "https://deno.land/std/io/bufio.ts";
import { TextProtoReader } from "https://deno.land/std/textproto/mod.ts";
import { blue, green, red, yellow } from "https://deno.land/std/fmt/colors.ts";
const endpoint = Deno.args[0] || "ws://127.0.0.1:8080";
/** simple websocket cli */
-const sock = await connectWebSocket(endpoint);
-console.log(green("ws connected! (type 'close' to quit)"));
-(async function (): Promise<void> {
- for await (const msg of sock.receive()) {
- if (typeof msg === "string") {
- console.log(yellow("< " + msg));
- } else if (isWebSocketPingEvent(msg)) {
- console.log(blue("< ping"));
- } else if (isWebSocketPongEvent(msg)) {
- console.log(blue("< pong"));
- } else if (isWebSocketCloseEvent(msg)) {
- console.log(red(`closed: code=${msg.code}, reason=${msg.reason}`));
+try {
+ const sock = await connectWebSocket(endpoint);
+ console.log(green("ws connected! (type 'close' to quit)"));
+
+ const messages = async (): Promise<void> => {
+ for await (const msg of sock) {
+ if (typeof msg === "string") {
+ console.log(yellow(`< ${msg}`));
+ } else if (isWebSocketPingEvent(msg)) {
+ console.log(blue("< ping"));
+ } else if (isWebSocketPongEvent(msg)) {
+ console.log(blue("< pong"));
+ } else if (isWebSocketCloseEvent(msg)) {
+ console.log(red(`closed: code=${msg.code}, reason=${msg.reason}`));
+ }
}
+ };
+
+ const cli = async (): Promise<void> => {
+ const tpr = new TextProtoReader(new BufReader(Deno.stdin));
+ while (true) {
+ await Deno.stdout.write(encode("> "));
+ const line = await tpr.readLine();
+ if (line === null) {
+ break;
+ }
+ if (line === "close") {
+ break;
+ } else if (line === "ping") {
+ await sock.ping();
+ } else {
+ await sock.send(line);
+ }
+ }
+ };
+
+ await Promise.race([messages(), cli()]).catch(console.error);
+
+ if (!sock.isClosed) {
+ await sock.close(1000).catch(console.error);
}
-})();
-
-const tpr = new TextProtoReader(new BufReader(Deno.stdin));
-while (true) {
- await Deno.stdout.write(encode("> "));
- const line = await tpr.readLine();
- if (line === null) {
- break;
- }
- if (line === "close") {
- break;
- } else if (line === "ping") {
- await sock.ping();
- } else {
- await sock.send(line);
- }
- // FIXME: Without this,
- // sock.receive() won't resolved though it is readable...
- await new Promise((resolve): void => {
- setTimeout(resolve, 0);
- });
+} catch (err) {
+ console.error(red(`Could not connect to WebSocket: '${err}'`));
}
-await sock.close(1000);
-// FIXME: conn.close() won't shutdown process...
+
Deno.exit(0);
```
@@ -137,25 +141,6 @@ Returns true if input value is a WebSocketPingEvent, false otherwise.
Returns true if input value is a WebSocketPongEvent, false otherwise.
-### append
-
-This module is used to merge two Uint8Arrays.
-
-- note: This module might move to common/util.
-
-```ts
-import { append } from "https://deno.land/std/ws/mod.ts";
-
-// a = [1], b = [2]
-append(a, b); // output: [1, 2]
-
-// a = [1], b = null
-append(a, b); // output: [1]
-
-// a = [], b = [2]
-append(a, b); // output: [2]
-```
-
### unmask
Unmask masked WebSocket payload.
diff --git a/std/ws/example_client.ts b/std/ws/example_client.ts
index d680c6fef..4213025f4 100644
--- a/std/ws/example_client.ts
+++ b/std/ws/example_client.ts
@@ -11,42 +11,49 @@ import { blue, green, red, yellow } from "../fmt/colors.ts";
const endpoint = Deno.args[0] || "ws://127.0.0.1:8080";
/** simple websocket cli */
-const sock = await connectWebSocket(endpoint);
-console.log(green("ws connected! (type 'close' to quit)"));
-(async function (): Promise<void> {
- for await (const msg of sock.receive()) {
- if (typeof msg === "string") {
- console.log(yellow("< " + msg));
- } else if (isWebSocketPingEvent(msg)) {
- console.log(blue("< ping"));
- } else if (isWebSocketPongEvent(msg)) {
- console.log(blue("< pong"));
- } else if (isWebSocketCloseEvent(msg)) {
- console.log(red(`closed: code=${msg.code}, reason=${msg.reason}`));
+try {
+ const sock = await connectWebSocket(endpoint);
+ console.log(green("ws connected! (type 'close' to quit)"));
+
+ const messages = async (): Promise<void> => {
+ for await (const msg of sock) {
+ if (typeof msg === "string") {
+ console.log(yellow(`< ${msg}`));
+ } else if (isWebSocketPingEvent(msg)) {
+ console.log(blue("< ping"));
+ } else if (isWebSocketPongEvent(msg)) {
+ console.log(blue("< pong"));
+ } else if (isWebSocketCloseEvent(msg)) {
+ console.log(red(`closed: code=${msg.code}, reason=${msg.reason}`));
+ }
}
- }
-})();
+ };
-const tpr = new TextProtoReader(new BufReader(Deno.stdin));
-while (true) {
- await Deno.stdout.write(encode("> "));
- const line = await tpr.readLine();
- if (line === null) {
- break;
- }
- if (line === "close") {
- break;
- } else if (line === "ping") {
- await sock.ping();
- } else {
- await sock.send(line);
+ const cli = async (): Promise<void> => {
+ const tpr = new TextProtoReader(new BufReader(Deno.stdin));
+ while (true) {
+ await Deno.stdout.write(encode("> "));
+ const line = await tpr.readLine();
+ if (line === null) {
+ break;
+ }
+ if (line === "close") {
+ break;
+ } else if (line === "ping") {
+ await sock.ping();
+ } else {
+ await sock.send(line);
+ }
+ }
+ };
+
+ await Promise.race([messages(), cli()]).catch(console.error);
+
+ if (!sock.isClosed) {
+ await sock.close(1000).catch(console.error);
}
- // FIXME: Without this,
- // sock.receive() won't resolved though it is readable...
- await new Promise((resolve): void => {
- setTimeout(resolve, 0);
- });
+} catch (err) {
+ console.error(red(`Could not connect to WebSocket: '${err}'`));
}
-await sock.close(1000);
-// FIXME: conn.close() won't shutdown process...
+
Deno.exit(0);
diff --git a/std/ws/example_server.ts b/std/ws/example_server.ts
index 048198326..947b807ca 100644
--- a/std/ws/example_server.ts
+++ b/std/ws/example_server.ts
@@ -4,55 +4,52 @@ import {
acceptWebSocket,
isWebSocketCloseEvent,
isWebSocketPingEvent,
- WebSocket,
} from "./mod.ts";
/** websocket echo server */
const port = Deno.args[0] || "8080";
console.log(`websocket server is running on :${port}`);
for await (const req of serve(`:${port}`)) {
- const { headers, conn } = req;
- acceptWebSocket({
- conn,
- headers,
- bufReader: req.r,
- bufWriter: req.w,
- })
- .then(
- async (sock: WebSocket): Promise<void> => {
- console.log("socket connected!");
- const it = sock.receive();
- while (true) {
- try {
- const { done, value } = await it.next();
- if (done) {
- break;
- }
- const ev = value;
- if (typeof ev === "string") {
- // text message
- console.log("ws:Text", ev);
- await sock.send(ev);
- } else if (ev instanceof Uint8Array) {
- // binary message
- console.log("ws:Binary", ev);
- } else if (isWebSocketPingEvent(ev)) {
- const [, body] = ev;
- // ping
- console.log("ws:Ping", body);
- } else if (isWebSocketCloseEvent(ev)) {
- // close
- const { code, reason } = ev;
- console.log("ws:Close", code, reason);
- }
- } catch (e) {
- console.error(`failed to receive frame: ${e}`);
- await sock.close(1000).catch(console.error);
- }
+ const { conn, r: bufReader, w: bufWriter, headers } = req;
+
+ try {
+ const sock = await acceptWebSocket({
+ conn,
+ bufReader,
+ bufWriter,
+ headers,
+ });
+
+ console.log("socket connected!");
+
+ try {
+ for await (const ev of sock) {
+ if (typeof ev === "string") {
+ // text message
+ console.log("ws:Text", ev);
+ await sock.send(ev);
+ } else if (ev instanceof Uint8Array) {
+ // binary message
+ console.log("ws:Binary", ev);
+ } else if (isWebSocketPingEvent(ev)) {
+ const [, body] = ev;
+ // ping
+ console.log("ws:Ping", body);
+ } else if (isWebSocketCloseEvent(ev)) {
+ // close
+ const { code, reason } = ev;
+ console.log("ws:Close", code, reason);
}
}
- )
- .catch((err: Error): void => {
- console.error(`failed to accept websocket: ${err}`);
- });
+ } catch (err) {
+ console.error(`failed to receive frame: ${err}`);
+
+ if (!sock.isClosed) {
+ await sock.close(1000).catch(console.error);
+ }
+ }
+ } catch (err) {
+ console.error(`failed to accept websocket: ${err}`);
+ await req.respond({ status: 400 });
+ }
}
diff --git a/std/ws/mod.ts b/std/ws/mod.ts
index 569936706..cec08888b 100644
--- a/std/ws/mod.ts
+++ b/std/ws/mod.ts
@@ -67,11 +67,14 @@ export interface WebSocketFrame {
payload: Uint8Array;
}
-export interface WebSocket extends Reader, Writer {
+export interface WebSocket
+ extends Reader,
+ Writer,
+ AsyncIterable<WebSocketEvent> {
readonly conn: Conn;
readonly isClosed: boolean;
- receive(): AsyncIterableIterator<WebSocketEvent>;
+ [Symbol.asyncIterator](): AsyncIterableIterator<WebSocketEvent>;
/**
* @throws `Deno.errors.ConnectionReset`
@@ -228,7 +231,7 @@ class WebSocketImpl implements WebSocket {
this.bufWriter = bufWriter || new BufWriter(conn);
}
- async *receive(): AsyncIterableIterator<WebSocketEvent> {
+ async *[Symbol.asyncIterator](): AsyncIterableIterator<WebSocketEvent> {
let frames: WebSocketFrame[] = [];
let payloadsLength = 0;
while (!this._isClosed) {
@@ -336,7 +339,7 @@ class WebSocketImpl implements WebSocket {
}
async read(p: Uint8Array): Promise<number | null> {
- for await (const ev of this.receive()) {
+ for await (const ev of this) {
if (ev instanceof Uint8Array) {
return copyBytes(ev, p);
}
diff --git a/std/ws/test.ts b/std/ws/test.ts
index c59202e89..27b07a315 100644
--- a/std/ws/test.ts
+++ b/std/ws/test.ts
@@ -355,7 +355,7 @@ test("[ws] WebSocket should throw `Deno.errors.ConnectionReset` when peer closed
await assertThrowsAsync(() => sock.close(0), Deno.errors.ConnectionReset);
});
-test("[ws] WebSocket shouldn't throw `Deno.errors.UnexpectedEof` on recive()", async () => {
+test("[ws] WebSocket shouldn't throw `Deno.errors.UnexpectedEof`", async () => {
const buf = new Buffer();
const eofReader: Deno.Reader = {
read(_: Uint8Array): Promise<number | null> {
@@ -364,7 +364,7 @@ test("[ws] WebSocket shouldn't throw `Deno.errors.UnexpectedEof` on recive()", a
};
const conn = dummyConn(eofReader, buf);
const sock = createWebSocket({ conn });
- const it = sock.receive();
+ const it = sock[Symbol.asyncIterator]();
const { value, done } = await it.next();
assertEquals(value, undefined);
assertEquals(done, true);
@@ -456,7 +456,7 @@ test("[ws] WebSocket Reader should ignore non-message frames", async () => {
]);
const pingHello = new Uint8Array([0x89, 0x05, 0x48, 0x65, 0x6c, 0x6c, 0x6f]);
const hello = new Uint8Array([0x81, 0x05, 0x48, 0x65, 0x6c, 0x6c, 0x6f]);
- const close = new Uint8Array([0x88]);
+ const close = new Uint8Array([0x88, 0x02, 0x03, 0xe8]);
const dataPayloadLength = 0x100;
const dataArr = [0x82, 0x7e, 0x01, 0x00];
@@ -523,3 +523,55 @@ test("[ws] WebSocket Reader should ignore non-message frames", async () => {
assertEquals(decode(new Buffer(p.subarray(0, helloLength)).bytes()), "Hello");
assertEquals(p.subarray(helloLength), data.subarray(4));
});
+
+test("[ws] WebSocket should act as asyncIterator", async () => {
+ const pingHello = new Uint8Array([0x89, 0x05, 0x48, 0x65, 0x6c, 0x6c, 0x6f]);
+ const hello = new Uint8Array([0x81, 0x05, 0x48, 0x65, 0x6c, 0x6c, 0x6f]);
+ const close = new Uint8Array([0x88, 0x04, 0x03, 0xf3, 0x34, 0x32]);
+
+ enum Frames {
+ ping,
+ hello,
+ close,
+ end,
+ }
+
+ let frame = Frames.ping;
+
+ const reader: Reader = {
+ read(p: Uint8Array): Promise<number | null> {
+ if (frame === Frames.ping) {
+ frame = Frames.hello;
+ p.set(pingHello);
+ return Promise.resolve(pingHello.byteLength);
+ }
+
+ if (frame === Frames.hello) {
+ frame = Frames.close;
+ p.set(hello);
+ return Promise.resolve(hello.byteLength);
+ }
+
+ if (frame === Frames.close) {
+ frame = Frames.end;
+ p.set(close);
+ return Promise.resolve(close.byteLength);
+ }
+
+ return Promise.resolve(null);
+ },
+ };
+
+ const conn = dummyConn(reader, new Buffer());
+ const sock = createWebSocket({ conn });
+
+ const events = [];
+ for await (const wsEvent of sock) {
+ events.push(wsEvent);
+ }
+
+ assertEquals(events.length, 3);
+ assertEquals(events[0], ["ping", encode("Hello")]);
+ assertEquals(events[1], "Hello");
+ assertEquals(events[2], { code: 1011, reason: "42" });
+});