summaryrefslogtreecommitdiff
path: root/ext/websocket
diff options
context:
space:
mode:
authorDivy Srivastava <dj.srivastava23@gmail.com>2023-03-31 10:34:12 +0530
committerGitHub <noreply@github.com>2023-03-31 10:34:12 +0530
commit0f41aff1d9f6c131ff69d6dd1dc053afb008a19d (patch)
treed4b21f4519610d22fa0f83214fa05c1af3415ffb /ext/websocket
parent772201449713fbefad6c42b9ce545a5bb2d7499b (diff)
perf(ext/websocket): efficient event kind serialization (#18509)
Use u16 to represent the kind of event (0 - 6) & event code > 6 is treated as the close code. This way we can represent all events + the close code in a single JS number. This is safe because (as per RFC 6455) close code from 0-999 are reserved & not used. | name | avg msg/sec/core | | --- | --- | | deno_main | `127820.750000` | | deno #18506 | `140079.000000` | | deno #18506 + this | `150104.250000` |
Diffstat (limited to 'ext/websocket')
-rw-r--r--ext/websocket/01_websocket.js57
-rw-r--r--ext/websocket/02_websocketstream.js43
-rw-r--r--ext/websocket/lib.rs65
3 files changed, 102 insertions, 63 deletions
diff --git a/ext/websocket/01_websocket.js b/ext/websocket/01_websocket.js
index 06eb08b60..03a6427c2 100644
--- a/ext/websocket/01_websocket.js
+++ b/ext/websocket/01_websocket.js
@@ -394,13 +394,14 @@ class WebSocket extends EventTarget {
async [_eventLoop]() {
while (this[_readyState] !== CLOSED) {
- const { kind, value } = await core.opAsync(
+ const { 0: kind, 1: value } = await core.opAsync(
"op_ws_next_event",
this[_rid],
);
switch (kind) {
- case "string": {
+ case 0: {
+ /* string */
this[_serverHandleIdleTimeout]();
const event = new MessageEvent("message", {
data: value,
@@ -409,14 +410,15 @@ class WebSocket extends EventTarget {
this.dispatchEvent(event);
break;
}
- case "binary": {
+ case 1: {
+ /* binary */
this[_serverHandleIdleTimeout]();
let data;
if (this.binaryType === "blob") {
data = new Blob([value]);
} else {
- data = value.buffer;
+ data = value;
}
const event = new MessageEvent("message", {
@@ -427,12 +429,32 @@ class WebSocket extends EventTarget {
this.dispatchEvent(event);
break;
}
- case "pong": {
+ case 2: {
+ /* pong */
this[_serverHandleIdleTimeout]();
break;
}
- case "closed":
- case "close": {
+ case 5: {
+ /* error */
+ this[_readyState] = CLOSED;
+
+ const errorEv = new ErrorEvent("error", {
+ message: value,
+ });
+ this.dispatchEvent(errorEv);
+
+ const closeEv = new CloseEvent("close");
+ this.dispatchEvent(closeEv);
+ core.tryClose(this[_rid]);
+ break;
+ }
+ case 3: {
+ /* ping */
+ break;
+ }
+ default: {
+ /* close */
+ const code = kind;
const prevState = this[_readyState];
this[_readyState] = CLOSED;
clearTimeout(this[_idleTimeoutTimeout]);
@@ -442,8 +464,8 @@ class WebSocket extends EventTarget {
await core.opAsync(
"op_ws_close",
this[_rid],
- value.code,
- value.reason,
+ code,
+ value,
);
} catch {
// ignore failures
@@ -452,26 +474,13 @@ class WebSocket extends EventTarget {
const event = new CloseEvent("close", {
wasClean: true,
- code: value.code,
- reason: value.reason,
+ code: code,
+ reason: value,
});
this.dispatchEvent(event);
core.tryClose(this[_rid]);
break;
}
- case "error": {
- this[_readyState] = CLOSED;
-
- const errorEv = new ErrorEvent("error", {
- message: value,
- });
- this.dispatchEvent(errorEv);
-
- const closeEv = new CloseEvent("close");
- this.dispatchEvent(closeEv);
- core.tryClose(this[_rid]);
- break;
- }
}
}
}
diff --git a/ext/websocket/02_websocketstream.js b/ext/websocket/02_websocketstream.js
index 0a3aeb192..6e487f0b7 100644
--- a/ext/websocket/02_websocketstream.js
+++ b/ext/websocket/02_websocketstream.js
@@ -167,12 +167,13 @@ class WebSocketStream {
PromisePrototypeThen(
(async () => {
while (true) {
- const { kind } = await core.opAsync(
+ const { 0: kind } = await core.opAsync(
"op_ws_next_event",
create.rid,
);
- if (kind === "close") {
+ if (kind > 6) {
+ /* close */
break;
}
}
@@ -237,37 +238,51 @@ class WebSocketStream {
},
});
const pull = async (controller) => {
- const { kind, value } = await core.opAsync(
+ const { 0: kind, 1: value } = await core.opAsync(
"op_ws_next_event",
this[_rid],
);
switch (kind) {
- case "string": {
+ case 0:
+ case 1: {
+ /* string */
+ /* binary */
controller.enqueue(value);
break;
}
- case "binary": {
- controller.enqueue(value);
+ case 5: {
+ /* error */
+ const err = new Error(value);
+ this[_closed].reject(err);
+ controller.error(err);
+ core.tryClose(this[_rid]);
break;
}
- case "ping": {
+ case 3: {
+ /* ping */
await core.opAsync("op_ws_send", this[_rid], {
kind: "pong",
});
await pull(controller);
break;
}
- case "closed":
- case "close": {
- this[_closed].resolve(value);
+ case 2: {
+ /* pong */
+ break;
+ }
+ case 6: {
+ /* closed */
+ this[_closed].resolve(undefined);
core.tryClose(this[_rid]);
break;
}
- case "error": {
- const err = new Error(value);
- this[_closed].reject(err);
- controller.error(err);
+ default: {
+ /* close */
+ this[_closed].resolve({
+ code: kind,
+ reason: value,
+ });
core.tryClose(this[_rid]);
break;
}
diff --git a/ext/websocket/lib.rs b/ext/websocket/lib.rs
index 8d3cb20d2..1c586b383 100644
--- a/ext/websocket/lib.rs
+++ b/ext/websocket/lib.rs
@@ -8,6 +8,7 @@ use deno_core::futures::stream::SplitStream;
use deno_core::futures::SinkExt;
use deno_core::futures::StreamExt;
use deno_core::op;
+use deno_core::StringOrBuffer;
use deno_core::url;
use deno_core::AsyncRefCell;
@@ -475,23 +476,21 @@ pub async fn op_ws_close(
Ok(())
}
-#[derive(Serialize)]
-#[serde(tag = "kind", content = "value", rename_all = "camelCase")]
-pub enum NextEventResponse {
- String(String),
- Binary(ZeroCopyBuf),
- Close { code: u16, reason: String },
- Ping,
- Pong,
- Error(String),
- Closed,
+#[repr(u16)]
+pub enum MessageKind {
+ Text = 0,
+ Binary = 1,
+ Pong = 2,
+ Ping = 3,
+ Error = 5,
+ Closed = 6,
}
#[op]
pub async fn op_ws_next_event(
state: Rc<RefCell<OpState>>,
rid: ResourceId,
-) -> Result<NextEventResponse, AnyError> {
+) -> Result<(u16, StringOrBuffer), AnyError> {
let resource = state
.borrow_mut()
.resource_table
@@ -500,24 +499,40 @@ pub async fn op_ws_next_event(
let cancel = RcRef::map(&resource, |r| &r.cancel);
let val = resource.next_message(cancel).await?;
let res = match val {
- Some(Ok(Message::Text(text))) => NextEventResponse::String(text),
- Some(Ok(Message::Binary(data))) => NextEventResponse::Binary(data.into()),
- Some(Ok(Message::Close(Some(frame)))) => NextEventResponse::Close {
- code: frame.code.into(),
- reason: frame.reason.to_string(),
- },
- Some(Ok(Message::Close(None))) => NextEventResponse::Close {
- code: 1005,
- reason: String::new(),
- },
- Some(Ok(Message::Ping(_))) => NextEventResponse::Ping,
- Some(Ok(Message::Pong(_))) => NextEventResponse::Pong,
- Some(Err(e)) => NextEventResponse::Error(e.to_string()),
+ Some(Ok(Message::Text(text))) => {
+ (MessageKind::Text as u16, StringOrBuffer::String(text))
+ }
+ Some(Ok(Message::Binary(data))) => (
+ MessageKind::Binary as u16,
+ StringOrBuffer::Buffer(data.into()),
+ ),
+ Some(Ok(Message::Close(Some(frame)))) => (
+ frame.code.into(),
+ StringOrBuffer::String(frame.reason.to_string()),
+ ),
+ Some(Ok(Message::Close(None))) => {
+ (1005, StringOrBuffer::String("".to_string()))
+ }
+ Some(Ok(Message::Ping(_))) => (
+ MessageKind::Ping as u16,
+ StringOrBuffer::Buffer(vec![].into()),
+ ),
+ Some(Ok(Message::Pong(_))) => (
+ MessageKind::Pong as u16,
+ StringOrBuffer::Buffer(vec![].into()),
+ ),
+ Some(Err(e)) => (
+ MessageKind::Error as u16,
+ StringOrBuffer::String(e.to_string()),
+ ),
None => {
// No message was received, presumably the socket closed while we waited.
// Try close the stream, ignoring any errors, and report closed status to JavaScript.
let _ = state.borrow_mut().resource_table.close(rid);
- NextEventResponse::Closed
+ (
+ MessageKind::Closed as u16,
+ StringOrBuffer::Buffer(vec![].into()),
+ )
}
};
Ok(res)