summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorLeo Kettmeir <crowlkats@toaxl.com>2022-07-18 22:49:49 +0200
committerGitHub <noreply@github.com>2022-07-18 22:49:49 +0200
commit2eb27c92db38889a0a9e0e8e356ecfe55fcf433a (patch)
tree97a988872c7d7d9b2822224edcc14a155d478095
parent2bebdc9116f0824f0eb6241445de6fb1925f4c15 (diff)
fix: WebSocketStream ping event causes pending promises (#15235)
-rw-r--r--cli/tests/integration/mod.rs40
-rw-r--r--cli/tests/testdata/websocketstream_ping_test.ts5
-rw-r--r--ext/websocket/02_websocketstream.js74
3 files changed, 83 insertions, 36 deletions
diff --git a/cli/tests/integration/mod.rs b/cli/tests/integration/mod.rs
index 277b6a5d6..3101d8dc7 100644
--- a/cli/tests/integration/mod.rs
+++ b/cli/tests/integration/mod.rs
@@ -663,6 +663,46 @@ fn websocketstream() {
}
#[test]
+fn websocketstream_ping() {
+ use deno_runtime::deno_websocket::tokio_tungstenite::tungstenite;
+ let _g = util::http_server();
+
+ let script = util::testdata_path().join("websocketstream_ping_test.ts");
+ let root_ca = util::testdata_path().join("tls/RootCA.pem");
+ let mut child = util::deno_cmd()
+ .arg("test")
+ .arg("--unstable")
+ .arg("--allow-net")
+ .arg("--cert")
+ .arg(root_ca)
+ .arg(script)
+ .stdout(std::process::Stdio::piped())
+ .spawn()
+ .unwrap();
+
+ let server = std::net::TcpListener::bind("127.0.0.1:4513").unwrap();
+ let (stream, _) = server.accept().unwrap();
+ let mut socket = tungstenite::accept(stream).unwrap();
+ socket
+ .write_message(tungstenite::Message::Text(String::from("A")))
+ .unwrap();
+ socket
+ .write_message(tungstenite::Message::Ping(vec![]))
+ .unwrap();
+ socket
+ .write_message(tungstenite::Message::Text(String::from("B")))
+ .unwrap();
+ let message = socket.read_message().unwrap();
+ assert_eq!(message, tungstenite::Message::Pong(vec![]));
+ socket
+ .write_message(tungstenite::Message::Text(String::from("C")))
+ .unwrap();
+ socket.close(None).unwrap();
+
+ assert!(child.wait().unwrap().success());
+}
+
+#[test]
fn websocket_server_multi_field_connection_header() {
let script = util::testdata_path()
.join("websocket_server_multi_field_connection_header_test.ts");
diff --git a/cli/tests/testdata/websocketstream_ping_test.ts b/cli/tests/testdata/websocketstream_ping_test.ts
new file mode 100644
index 000000000..12f847cd8
--- /dev/null
+++ b/cli/tests/testdata/websocketstream_ping_test.ts
@@ -0,0 +1,5 @@
+const wss = new WebSocketStream("ws://127.0.0.1:4513");
+const { readable } = await wss.connection;
+for await (const _ of readable) {
+ //
+}
diff --git a/ext/websocket/02_websocketstream.js b/ext/websocket/02_websocketstream.js
index 50dfac284..df87c0c97 100644
--- a/ext/websocket/02_websocketstream.js
+++ b/ext/websocket/02_websocketstream.js
@@ -232,6 +232,43 @@
await this.closed;
},
});
+ const pull = async (controller) => {
+ const { kind, value } = await core.opAsync(
+ "op_ws_next_event",
+ this[_rid],
+ );
+
+ switch (kind) {
+ case "string": {
+ controller.enqueue(value);
+ break;
+ }
+ case "binary": {
+ controller.enqueue(value);
+ break;
+ }
+ case "ping": {
+ await core.opAsync("op_ws_send", this[_rid], {
+ kind: "pong",
+ });
+ await pull(controller);
+ break;
+ }
+ case "closed":
+ case "close": {
+ this[_closed].resolve(value);
+ core.tryClose(this[_rid]);
+ break;
+ }
+ case "error": {
+ const err = new Error(value);
+ this[_closed].reject(err);
+ controller.error(err);
+ core.tryClose(this[_rid]);
+ break;
+ }
+ }
+ };
const readable = new ReadableStream({
start: (controller) => {
PromisePrototypeThen(this.closed, () => {
@@ -250,42 +287,7 @@
}
});
},
- pull: async (controller) => {
- const { kind, value } = await core.opAsync(
- "op_ws_next_event",
- this[_rid],
- );
-
- switch (kind) {
- case "string": {
- controller.enqueue(value);
- break;
- }
- case "binary": {
- controller.enqueue(value);
- break;
- }
- case "ping": {
- await core.opAsync("op_ws_send", this[_rid], {
- kind: "pong",
- });
- break;
- }
- case "closed":
- case "close": {
- this[_closed].resolve(value);
- core.tryClose(this[_rid]);
- break;
- }
- case "error": {
- const err = new Error(value);
- this[_closed].reject(err);
- controller.error(err);
- core.tryClose(this[_rid]);
- break;
- }
- }
- },
+ pull,
cancel: async (reason) => {
try {
this.close(reason?.code !== undefined ? reason : {});