summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--ext/node/ops/http.rs1
-rw-r--r--tests/unit_node/http_test.ts68
2 files changed, 69 insertions, 0 deletions
diff --git a/ext/node/ops/http.rs b/ext/node/ops/http.rs
index 4b1f99ec0..773902ded 100644
--- a/ext/node/ops/http.rs
+++ b/ext/node/ops/http.rs
@@ -272,6 +272,7 @@ pub async fn op_node_http_fetch_response_upgrade(
loop {
let read = upgraded_rx.read(&mut buf).await?;
if read == 0 {
+ read_tx.shutdown().await?;
break;
}
read_tx.write_all(&buf[..read]).await?;
diff --git a/tests/unit_node/http_test.ts b/tests/unit_node/http_test.ts
index 7f5e74bf5..f85b1466b 100644
--- a/tests/unit_node/http_test.ts
+++ b/tests/unit_node/http_test.ts
@@ -13,6 +13,7 @@ import { text } from "node:stream/consumers";
import { assert, assertEquals, fail } from "@std/assert";
import { assertSpyCalls, spy } from "@std/testing/mock";
import { fromFileUrl, relative } from "@std/path";
+import { retry } from "@std/async/retry";
import { gzip } from "node:zlib";
import { Buffer } from "node:buffer";
@@ -1604,3 +1605,70 @@ Deno.test("[node/http] In ClientRequest, option.hostname has precedence over opt
await responseReceived.promise;
});
+
+Deno.test("[node/http] upgraded socket closes when the server closed without closing handshake", async () => {
+ const clientSocketClosed = Promise.withResolvers<void>();
+ const serverProcessClosed = Promise.withResolvers<void>();
+
+ // Uses the server in different process to shutdown it without closing handshake
+ const server = `
+ Deno.serve({ port: 1337 }, (req) => {
+ if (req.headers.get("upgrade") != "websocket") {
+ return new Response("ok");
+ }
+ console.log("upgrade on server");
+ const { socket, response } = Deno.upgradeWebSocket(req);
+ socket.addEventListener("message", (event) => {
+ console.log("server received", event.data);
+ socket.send("pong");
+ });
+ return response;
+ });
+ `;
+
+ const p = new Deno.Command("deno", { args: ["eval", server] }).spawn();
+
+ // Wait for the server to respond
+ await retry(async () => {
+ const resp = await fetch("http://localhost:1337");
+ const _text = await resp.text();
+ });
+
+ const options = {
+ port: 1337,
+ host: "127.0.0.1",
+ headers: {
+ "Connection": "Upgrade",
+ "Upgrade": "websocket",
+ "Sec-WebSocket-Key": "dGhlIHNhbXBsZSBub25jZQ==",
+ },
+ };
+
+ http.request(options).on("upgrade", (_res, socket) => {
+ socket.on("close", () => {
+ console.log("client socket closed");
+ clientSocketClosed.resolve();
+ });
+ socket.on("data", async (data) => {
+ // receives pong message
+ assertEquals(data, Buffer.from("8104706f6e67", "hex"));
+
+ p.kill();
+ await p.status;
+
+ console.log("process closed");
+ serverProcessClosed.resolve();
+
+ // sending some additional message
+ socket.write(Buffer.from("81847de88e01", "hex"));
+ socket.write(Buffer.from("0d81e066", "hex"));
+ });
+
+ // sending ping message
+ socket.write(Buffer.from("81847de88e01", "hex"));
+ socket.write(Buffer.from("0d81e066", "hex"));
+ }).end();
+
+ await clientSocketClosed.promise;
+ await serverProcessClosed.promise;
+});