summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--cli/tests/testdata/websocketstream_test.ts132
-rw-r--r--ext/websocket/02_websocketstream.js32
2 files changed, 163 insertions, 1 deletions
diff --git a/cli/tests/testdata/websocketstream_test.ts b/cli/tests/testdata/websocketstream_test.ts
index db9d1a094..71969314e 100644
--- a/cli/tests/testdata/websocketstream_test.ts
+++ b/cli/tests/testdata/websocketstream_test.ts
@@ -201,3 +201,135 @@ Deno.test("forbidden headers", async () => {
await ws.closed;
listener.close();
});
+
+Deno.test("sync close with empty stream", async () => {
+ const listener = Deno.listen({ port: 4512 });
+ const promise = (async () => {
+ const conn = await listener.accept();
+ const httpConn = Deno.serveHttp(conn);
+ const { request, respondWith } = (await httpConn.nextRequest())!;
+ const { response, socket } = Deno.upgradeWebSocket(request);
+ const p = new Promise<void>((resolve) => {
+ socket.onopen = () => {
+ socket.send("first message");
+ socket.send("second message");
+ };
+ socket.onclose = () => resolve();
+ });
+ await respondWith(response);
+ await p;
+ })();
+
+ const ws = new WebSocketStream("ws://localhost:4512");
+ const { readable } = await ws.connection;
+ const reader = readable.getReader();
+ const firstMessage = await reader.read();
+ assertEquals(firstMessage.value, "first message");
+ const secondMessage = await reader.read();
+ assertEquals(secondMessage.value, "second message");
+ ws.close({ code: 1000 });
+ await ws.closed;
+ await promise;
+ listener.close();
+});
+
+Deno.test("sync close with unread messages in stream", async () => {
+ const listener = Deno.listen({ port: 4512 });
+ const promise = (async () => {
+ const conn = await listener.accept();
+ const httpConn = Deno.serveHttp(conn);
+ const { request, respondWith } = (await httpConn.nextRequest())!;
+ const { response, socket } = Deno.upgradeWebSocket(request);
+ const p = new Promise<void>((resolve) => {
+ socket.onopen = () => {
+ socket.send("first message");
+ socket.send("second message");
+ socket.send("third message");
+ socket.send("fourth message");
+ };
+ socket.onclose = () => resolve();
+ });
+ await respondWith(response);
+ await p;
+ })();
+
+ const ws = new WebSocketStream("ws://localhost:4512");
+ const { readable } = await ws.connection;
+ const reader = readable.getReader();
+ const firstMessage = await reader.read();
+ assertEquals(firstMessage.value, "first message");
+ const secondMessage = await reader.read();
+ assertEquals(secondMessage.value, "second message");
+ ws.close({ code: 1000 });
+ await ws.closed;
+ await promise;
+ listener.close();
+});
+
+Deno.test("async close with empty stream", async () => {
+ const listener = Deno.listen({ port: 4512 });
+ const promise = (async () => {
+ const conn = await listener.accept();
+ const httpConn = Deno.serveHttp(conn);
+ const { request, respondWith } = (await httpConn.nextRequest())!;
+ const { response, socket } = Deno.upgradeWebSocket(request);
+ const p = new Promise<void>((resolve) => {
+ socket.onopen = () => {
+ socket.send("first message");
+ socket.send("second message");
+ };
+ socket.onclose = () => resolve();
+ });
+ await respondWith(response);
+ await p;
+ })();
+
+ const ws = new WebSocketStream("ws://localhost:4512");
+ const { readable } = await ws.connection;
+ const reader = readable.getReader();
+ const firstMessage = await reader.read();
+ assertEquals(firstMessage.value, "first message");
+ const secondMessage = await reader.read();
+ assertEquals(secondMessage.value, "second message");
+ setTimeout(() => {
+ ws.close({ code: 1000 });
+ }, 0);
+ await ws.closed;
+ await promise;
+ listener.close();
+});
+
+Deno.test("async close with unread messages in stream", async () => {
+ const listener = Deno.listen({ port: 4512 });
+ const promise = (async () => {
+ const conn = await listener.accept();
+ const httpConn = Deno.serveHttp(conn);
+ const { request, respondWith } = (await httpConn.nextRequest())!;
+ const { response, socket } = Deno.upgradeWebSocket(request);
+ const p = new Promise<void>((resolve) => {
+ socket.onopen = () => {
+ socket.send("first message");
+ socket.send("second message");
+ socket.send("third message");
+ socket.send("fourth message");
+ };
+ socket.onclose = () => resolve();
+ });
+ await respondWith(response);
+ await p;
+ })();
+
+ const ws = new WebSocketStream("ws://localhost:4512");
+ const { readable } = await ws.connection;
+ const reader = readable.getReader();
+ const firstMessage = await reader.read();
+ assertEquals(firstMessage.value, "first message");
+ const secondMessage = await reader.read();
+ assertEquals(secondMessage.value, "second message");
+ setTimeout(() => {
+ ws.close({ code: 1000 });
+ }, 0);
+ await ws.closed;
+ await promise;
+ listener.close();
+});
diff --git a/ext/websocket/02_websocketstream.js b/ext/websocket/02_websocketstream.js
index 5266c8dfb..cf83fe4c7 100644
--- a/ext/websocket/02_websocketstream.js
+++ b/ext/websocket/02_websocketstream.js
@@ -64,11 +64,14 @@
],
);
+ const CLOSE_RESPONSE_TIMEOUT = 5000;
+
const _rid = Symbol("[[rid]]");
const _url = Symbol("[[url]]");
const _connection = Symbol("[[connection]]");
const _closed = Symbol("[[closed]]");
const _earlyClose = Symbol("[[earlyClose]]");
+ const _closeSent = Symbol("[[closeSent]]");
class WebSocketStream {
[_rid];
@@ -268,6 +271,21 @@
break;
}
}
+
+ if (
+ this[_closeSent].state === "fulfilled" &&
+ this[_closed].state === "pending"
+ ) {
+ if (
+ new Date().getTime() - await this[_closeSent].promise <=
+ CLOSE_RESPONSE_TIMEOUT
+ ) {
+ return pull(controller);
+ }
+
+ this[_closed].resolve(value);
+ core.tryClose(this[_rid]);
+ }
};
const readable = new ReadableStream({
start: (controller) => {
@@ -286,6 +304,12 @@
// needed to ignore warnings & assertions
}
});
+
+ PromisePrototypeThen(this[_closeSent].promise, () => {
+ if (this[_closed].state === "pending") {
+ return pull(controller);
+ }
+ });
},
pull,
cancel: async (reason) => {
@@ -328,6 +352,7 @@
[_earlyClose] = false;
[_closed] = new Deferred();
+ [_closeSent] = new Deferred();
get closed() {
webidl.assertBranded(this, WebSocketStreamPrototype);
return this[_closed].promise;
@@ -369,8 +394,13 @@
if (this[_connection].state === "pending") {
this[_earlyClose] = true;
} else if (this[_closed].state === "pending") {
- PromisePrototypeCatch(
+ PromisePrototypeThen(
core.opAsync("op_ws_close", this[_rid], code, closeInfo.reason),
+ () => {
+ setTimeout(() => {
+ this[_closeSent].resolve(new Date().getTime());
+ }, 0);
+ },
(err) => {
this[_rid] && core.tryClose(this[_rid]);
this[_closed].reject(err);