summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDivy Srivastava <dj.srivastava23@gmail.com>2022-10-19 16:23:13 +0530
committerGitHub <noreply@github.com>2022-10-19 16:23:13 +0530
commit36307c45e95b599eb01bf53df161973a7ef8b58e (patch)
treeec280d9b7e80bc78127e85c60a988fae8fb7bdb7
parent57f17bd3e6a8e30d13443e6f6638999f0f3cce3e (diff)
perf(ext/websocket): optimize socket.send (#16320)
Towards #16315
-rw-r--r--ext/websocket/01_websocket.js74
-rw-r--r--ext/websocket/lib.rs97
2 files changed, 143 insertions, 28 deletions
diff --git a/ext/websocket/01_websocket.js b/ext/websocket/01_websocket.js
index f7bd820c0..e4774f815 100644
--- a/ext/websocket/01_websocket.js
+++ b/ext/websocket/01_websocket.js
@@ -18,20 +18,20 @@
ArrayPrototypeJoin,
ArrayPrototypeMap,
ArrayPrototypeSome,
- DataView,
ErrorPrototypeToString,
ObjectDefineProperties,
ObjectPrototypeIsPrototypeOf,
PromisePrototypeThen,
RegExpPrototypeTest,
Set,
- String,
StringPrototypeEndsWith,
StringPrototypeToLowerCase,
Symbol,
SymbolIterator,
PromisePrototypeCatch,
+ queueMicrotask,
SymbolFor,
+ Uint8Array,
} = window.__bootstrap.primordials;
webidl.converters["sequence<DOMString> or DOMString"] = (V, opts) => {
@@ -290,40 +290,58 @@
throw new DOMException("readyState not OPEN", "InvalidStateError");
}
+ if (typeof data === "string") {
+ // try to send in one go!
+ const d = core.byteLength(data);
+ const sent = ops.op_ws_try_send_string(this[_rid], data);
+ this[_bufferedAmount] += d;
+ if (!sent) {
+ PromisePrototypeThen(
+ core.opAsync("op_ws_send_string", this[_rid], data),
+ () => {
+ this[_bufferedAmount] -= d;
+ },
+ );
+ } else {
+ // Spec expects data to be start flushing on next tick but oh well...
+ // we already sent it so we can just decrement the bufferedAmount
+ // on the next tick.
+ queueMicrotask(() => {
+ this[_bufferedAmount] -= d;
+ });
+ }
+ return;
+ }
+
const sendTypedArray = (ta) => {
+ // try to send in one go!
+ const sent = ops.op_ws_try_send_binary(this[_rid], ta);
this[_bufferedAmount] += ta.byteLength;
- PromisePrototypeThen(
- core.opAsync("op_ws_send", this[_rid], {
- kind: "binary",
- value: ta,
- }),
- () => {
+ if (!sent) {
+ PromisePrototypeThen(
+ core.opAsync("op_ws_send_binary", this[_rid], ta),
+ () => {
+ this[_bufferedAmount] -= ta.byteLength;
+ },
+ );
+ } else {
+ // Spec expects data to be start flushing on next tick but oh well...
+ // we already sent it so we can just decrement the bufferedAmount
+ // on the next tick.
+ queueMicrotask(() => {
this[_bufferedAmount] -= ta.byteLength;
- },
- );
+ });
+ }
};
- if (ObjectPrototypeIsPrototypeOf(BlobPrototype, data)) {
- PromisePrototypeThen(
- data.slice().arrayBuffer(),
- (ab) => sendTypedArray(new DataView(ab)),
- );
+ if (ObjectPrototypeIsPrototypeOf(ArrayBufferPrototype, data)) {
+ sendTypedArray(new Uint8Array(data));
} else if (ArrayBufferIsView(data)) {
sendTypedArray(data);
- } else if (ObjectPrototypeIsPrototypeOf(ArrayBufferPrototype, data)) {
- sendTypedArray(new DataView(data));
- } else {
- const string = String(data);
- const d = core.encode(string);
- this[_bufferedAmount] += d.byteLength;
+ } else if (ObjectPrototypeIsPrototypeOf(BlobPrototype, data)) {
PromisePrototypeThen(
- core.opAsync("op_ws_send", this[_rid], {
- kind: "text",
- value: string,
- }),
- () => {
- this[_bufferedAmount] -= d.byteLength;
- },
+ data.slice().arrayBuffer(),
+ (ab) => sendTypedArray(new Uint8Array(ab)),
);
}
}
diff --git a/ext/websocket/lib.rs b/ext/websocket/lib.rs
index a31431377..704c699a7 100644
--- a/ext/websocket/lib.rs
+++ b/ext/websocket/lib.rs
@@ -161,6 +161,51 @@ impl WsStreamResource {
}
}
+ fn try_send(self: &Rc<Self>, message: Message) -> Result<bool, AnyError> {
+ let waker = deno_core::futures::task::noop_waker();
+ let mut cx = std::task::Context::from_waker(&waker);
+
+ let res = match self.stream {
+ WebSocketStreamType::Client { .. } => {
+ match RcRef::map(self, |r| match &r.stream {
+ WebSocketStreamType::Client { tx, .. } => tx,
+ WebSocketStreamType::Server { .. } => unreachable!(),
+ })
+ .try_borrow_mut()
+ {
+ Some(mut tx) => {
+ if tx.poll_ready_unpin(&mut cx).is_ready() {
+ tx.start_send_unpin(message)?;
+ tx.poll_flush_unpin(&mut cx).is_ready()
+ } else {
+ false
+ }
+ }
+ None => false,
+ }
+ }
+ WebSocketStreamType::Server { .. } => {
+ match RcRef::map(self, |r| match &r.stream {
+ WebSocketStreamType::Client { .. } => unreachable!(),
+ WebSocketStreamType::Server { tx, .. } => tx,
+ })
+ .try_borrow_mut()
+ {
+ Some(mut tx) => {
+ if tx.poll_ready_unpin(&mut cx).is_ready() {
+ tx.start_send_unpin(message)?;
+ tx.poll_flush_unpin(&mut cx).is_ready()
+ } else {
+ false
+ }
+ }
+ None => false,
+ }
+ }
+ };
+ Ok(res)
+ }
+
async fn next_message(
self: &Rc<Self>,
cancel: RcRef<CancelHandle>,
@@ -426,6 +471,54 @@ pub async fn op_ws_send(
Ok(())
}
+#[op]
+pub async fn op_ws_send_string(
+ state: Rc<RefCell<OpState>>,
+ rid: ResourceId,
+ text: String,
+) -> Result<(), AnyError> {
+ let resource = state
+ .borrow_mut()
+ .resource_table
+ .get::<WsStreamResource>(rid)?;
+ resource.send(Message::Text(text)).await?;
+ Ok(())
+}
+
+#[op]
+pub async fn op_ws_send_binary(
+ state: Rc<RefCell<OpState>>,
+ rid: ResourceId,
+ data: ZeroCopyBuf,
+) -> Result<(), AnyError> {
+ let resource = state
+ .borrow_mut()
+ .resource_table
+ .get::<WsStreamResource>(rid)?;
+ resource.send(Message::Binary(data.to_vec())).await?;
+ Ok(())
+}
+
+#[op]
+pub fn op_ws_try_send_string(
+ state: &mut OpState,
+ rid: ResourceId,
+ text: String,
+) -> Result<bool, AnyError> {
+ let resource = state.resource_table.get::<WsStreamResource>(rid)?;
+ resource.try_send(Message::Text(text))
+}
+
+#[op(fast)]
+pub fn op_ws_try_send_binary(
+ state: &mut OpState,
+ rid: u32,
+ value: &[u8],
+) -> Result<bool, AnyError> {
+ let resource = state.resource_table.get::<WsStreamResource>(rid)?;
+ resource.try_send(Message::Binary(value.to_vec()))
+}
+
#[op(deferred)]
pub async fn op_ws_close(
state: Rc<RefCell<OpState>>,
@@ -515,6 +608,10 @@ pub fn init<P: WebSocketPermissions + 'static>(
op_ws_send::decl(),
op_ws_close::decl(),
op_ws_next_event::decl(),
+ op_ws_send_string::decl(),
+ op_ws_send_binary::decl(),
+ op_ws_try_send_string::decl(),
+ op_ws_try_send_binary::decl(),
])
.state(move |state| {
state.put::<WsUserAgent>(WsUserAgent(user_agent.clone()));