summaryrefslogtreecommitdiff
path: root/ext/websocket
diff options
context:
space:
mode:
Diffstat (limited to 'ext/websocket')
-rw-r--r--ext/websocket/01_websocket.js56
-rw-r--r--ext/websocket/02_websocketstream.js8
-rw-r--r--ext/websocket/lib.rs88
3 files changed, 93 insertions, 59 deletions
diff --git a/ext/websocket/01_websocket.js b/ext/websocket/01_websocket.js
index 01dd26579..a38af036a 100644
--- a/ext/websocket/01_websocket.js
+++ b/ext/websocket/01_websocket.js
@@ -23,12 +23,10 @@ const primordials = globalThis.__bootstrap.primordials;
const {
ArrayBufferPrototype,
ArrayBufferIsView,
- ArrayBufferPrototypeGetByteLength,
ArrayPrototypeJoin,
ArrayPrototypeMap,
ArrayPrototypeSome,
DataView,
- DataViewPrototypeGetByteLength,
ErrorPrototypeToString,
ObjectDefineProperties,
ObjectPrototypeIsPrototypeOf,
@@ -46,7 +44,6 @@ const {
PromisePrototypeCatch,
SymbolFor,
TypedArrayPrototypeGetByteLength,
- TypedArrayPrototypeGetSymbolToStringTag,
} = primordials;
const op_ws_check_permission_and_cancel_handle =
core.ops.op_ws_check_permission_and_cancel_handle;
@@ -57,6 +54,7 @@ const {
op_ws_send_text,
op_ws_next_event,
op_ws_send_ping,
+ op_ws_get_buffered_amount,
} = core.ensureFastOps();
webidl.converters["sequence<DOMString> or DOMString"] = (
@@ -111,7 +109,6 @@ const _role = Symbol("[[role]]");
const _extensions = Symbol("[[extensions]]");
const _protocol = Symbol("[[protocol]]");
const _binaryType = Symbol("[[binaryType]]");
-const _bufferedAmount = Symbol("[[bufferedAmount]]");
const _eventLoop = Symbol("[[eventLoop]]");
const _server = Symbol("[[server]]");
@@ -179,10 +176,13 @@ class WebSocket extends EventTarget {
}
}
- [_bufferedAmount] = 0;
get bufferedAmount() {
webidl.assertBranded(this, WebSocketPrototype);
- return this[_bufferedAmount];
+ if (this[_readyState] === OPEN) {
+ return op_ws_get_buffered_amount(this[_rid]);
+ } else {
+ return 0;
+ }
}
constructor(url, protocols = []) {
@@ -318,55 +318,25 @@ class WebSocket extends EventTarget {
throw new DOMException("readyState not OPEN", "InvalidStateError");
}
- /**
- * @param {ArrayBufferView} view
- * @param {number} byteLength
- */
- const sendTypedArray = (view, byteLength) => {
- this[_bufferedAmount] += byteLength;
- PromisePrototypeThen(
- op_ws_send_binary(
- this[_rid],
- view,
- ),
- () => {
- this[_bufferedAmount] -= byteLength;
- },
- );
- };
-
if (ObjectPrototypeIsPrototypeOf(BlobPrototype, data)) {
PromisePrototypeThen(
// deno-lint-ignore prefer-primordials
data.slice().arrayBuffer(),
(ab) =>
- sendTypedArray(
+ op_ws_send_binary(
+ this[_rid],
new DataView(ab),
- ArrayBufferPrototypeGetByteLength(ab),
),
);
} else if (ArrayBufferIsView(data)) {
- if (TypedArrayPrototypeGetSymbolToStringTag(data) === undefined) {
- // DataView
- sendTypedArray(data, DataViewPrototypeGetByteLength(data));
- } else {
- // TypedArray
- sendTypedArray(data, TypedArrayPrototypeGetByteLength(data));
- }
+ op_ws_send_binary(this[_rid], data);
} else if (ObjectPrototypeIsPrototypeOf(ArrayBufferPrototype, data)) {
- sendTypedArray(data, ArrayBufferPrototypeGetByteLength(data));
+ op_ws_send_binary(this[_rid], data);
} else {
const string = String(data);
- const d = core.encode(string);
- this[_bufferedAmount] += TypedArrayPrototypeGetByteLength(d);
- PromisePrototypeThen(
- op_ws_send_text(
- this[_rid],
- string,
- ),
- () => {
- this[_bufferedAmount] -= TypedArrayPrototypeGetByteLength(d);
- },
+ op_ws_send_text(
+ this[_rid],
+ string,
);
}
}
diff --git a/ext/websocket/02_websocketstream.js b/ext/websocket/02_websocketstream.js
index 00d5bdaec..be1001eb6 100644
--- a/ext/websocket/02_websocketstream.js
+++ b/ext/websocket/02_websocketstream.js
@@ -34,8 +34,8 @@ const {
Uint8ArrayPrototype,
} = primordials;
const {
- op_ws_send_text,
- op_ws_send_binary,
+ op_ws_send_text_async,
+ op_ws_send_binary_async,
op_ws_next_event,
op_ws_create,
op_ws_close,
@@ -210,11 +210,11 @@ class WebSocketStream {
const writable = new WritableStream({
write: async (chunk) => {
if (typeof chunk === "string") {
- await op_ws_send_text(this[_rid], chunk);
+ await op_ws_send_text_async(this[_rid], chunk);
} else if (
ObjectPrototypeIsPrototypeOf(Uint8ArrayPrototype, chunk)
) {
- await op_ws_send_binary(this[_rid], chunk);
+ await op_ws_send_binary_async(this[_rid], chunk);
} else {
throw new TypeError(
"A chunk may only be either a string or an Uint8Array",
diff --git a/ext/websocket/lib.rs b/ext/websocket/lib.rs
index f2101b413..af987c1e4 100644
--- a/ext/websocket/lib.rs
+++ b/ext/websocket/lib.rs
@@ -281,8 +281,10 @@ where
}
let resource = ServerWebSocket {
+ buffered: Cell::new(0),
+ errored: Cell::new(None),
ws: AsyncRefCell::new(FragmentCollector::new(stream)),
- closed: Rc::new(Cell::new(false)),
+ closed: Cell::new(false),
tx_lock: AsyncRefCell::new(()),
};
let mut state = state.borrow_mut();
@@ -315,18 +317,20 @@ pub enum MessageKind {
}
pub struct ServerWebSocket {
+ buffered: Cell<usize>,
+ errored: Cell<Option<AnyError>>,
ws: AsyncRefCell<FragmentCollector<WebSocketStream>>,
- closed: Rc<Cell<bool>>,
+ closed: Cell<bool>,
tx_lock: AsyncRefCell<()>,
}
impl ServerWebSocket {
#[inline]
pub async fn write_frame(
- self: Rc<Self>,
+ self: &Rc<Self>,
frame: Frame,
) -> Result<(), AnyError> {
- let _lock = RcRef::map(&self, |r| &r.tx_lock).borrow_mut().await;
+ let _lock = RcRef::map(self, |r| &r.tx_lock).borrow_mut().await;
// SAFETY: fastwebsockets only needs a mutable reference to the WebSocket
// to populate the write buffer. We encounter an await point when writing
// to the socket after the frame has already been written to the buffer.
@@ -361,8 +365,10 @@ pub fn ws_create_server_stream(
ws.set_auto_pong(true);
let ws_resource = ServerWebSocket {
+ buffered: Cell::new(0),
+ errored: Cell::new(None),
ws: AsyncRefCell::new(FragmentCollector::new(ws)),
- closed: Rc::new(Cell::new(false)),
+ closed: Cell::new(false),
tx_lock: AsyncRefCell::new(()),
};
@@ -370,8 +376,48 @@ pub fn ws_create_server_stream(
Ok(rid)
}
-#[op]
-pub async fn op_ws_send_binary(
+#[op(fast)]
+pub fn op_ws_send_binary(
+ state: &mut OpState,
+ rid: ResourceId,
+ data: ZeroCopyBuf,
+) {
+ let resource = state.resource_table.get::<ServerWebSocket>(rid).unwrap();
+ let data = data.to_vec();
+ let len = data.len();
+ resource.buffered.set(resource.buffered.get() + len);
+ deno_core::task::spawn(async move {
+ if let Err(err) = resource
+ .write_frame(Frame::new(true, OpCode::Binary, None, data))
+ .await
+ {
+ resource.errored.set(Some(err));
+ } else {
+ resource.buffered.set(resource.buffered.get() - len);
+ }
+ });
+}
+
+#[op(fast)]
+pub fn op_ws_send_text(state: &mut OpState, rid: ResourceId, data: String) {
+ let resource = state.resource_table.get::<ServerWebSocket>(rid).unwrap();
+ let len = data.len();
+ resource.buffered.set(resource.buffered.get() + len);
+ deno_core::task::spawn(async move {
+ if let Err(err) = resource
+ .write_frame(Frame::new(true, OpCode::Text, None, data.into_bytes()))
+ .await
+ {
+ resource.errored.set(Some(err));
+ } else {
+ resource.buffered.set(resource.buffered.get() - len);
+ }
+ });
+}
+
+/// Async version of send. Does not update buffered amount as we rely on the socket itself for backpressure.
+#[op(fast)]
+pub async fn op_ws_send_binary_async(
state: Rc<RefCell<OpState>>,
rid: ResourceId,
data: ZeroCopyBuf,
@@ -380,13 +426,15 @@ pub async fn op_ws_send_binary(
.borrow_mut()
.resource_table
.get::<ServerWebSocket>(rid)?;
+ let data = data.to_vec();
resource
- .write_frame(Frame::new(true, OpCode::Binary, None, data.to_vec()))
+ .write_frame(Frame::new(true, OpCode::Binary, None, data))
.await
}
-#[op]
-pub async fn op_ws_send_text(
+/// Async version of send. Does not update buffered amount as we rely on the socket itself for backpressure.
+#[op(fast)]
+pub async fn op_ws_send_text_async(
state: Rc<RefCell<OpState>>,
rid: ResourceId,
data: String,
@@ -400,6 +448,16 @@ pub async fn op_ws_send_text(
.await
}
+#[op(fast)]
+pub fn op_ws_get_buffered_amount(state: &mut OpState, rid: ResourceId) -> u32 {
+ state
+ .resource_table
+ .get::<ServerWebSocket>(rid)
+ .unwrap()
+ .buffered
+ .get() as u32
+}
+
#[op]
pub async fn op_ws_send_pong(
state: Rc<RefCell<OpState>>,
@@ -441,8 +499,7 @@ pub async fn op_ws_close(
.map(|reason| Frame::close(code.unwrap_or(1005), reason.as_bytes()))
.unwrap_or_else(|| Frame::close_raw(vec![]));
- let cell = Rc::clone(&resource.closed);
- cell.set(true);
+ resource.closed.set(true);
resource.write_frame(frame).await?;
Ok(())
}
@@ -457,6 +514,10 @@ pub async fn op_ws_next_event(
.resource_table
.get::<ServerWebSocket>(rid)?;
+ if let Some(err) = resource.errored.take() {
+ return Err(err);
+ }
+
let mut ws = RcRef::map(&resource, |r| &r.ws).borrow_mut().await;
loop {
let val = match ws.read_frame().await {
@@ -519,8 +580,11 @@ deno_core::extension!(deno_websocket,
op_ws_next_event,
op_ws_send_binary,
op_ws_send_text,
+ op_ws_send_binary_async,
+ op_ws_send_text_async,
op_ws_send_ping,
op_ws_send_pong,
+ op_ws_get_buffered_amount,
],
esm = [ "01_websocket.js", "02_websocketstream.js" ],
options = {