summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--ext/websocket/01_websocket.js74
-rw-r--r--ext/websocket/lib.rs103
2 files changed, 28 insertions, 149 deletions
diff --git a/ext/websocket/01_websocket.js b/ext/websocket/01_websocket.js
index 0d0a4211a..9b7c45e70 100644
--- a/ext/websocket/01_websocket.js
+++ b/ext/websocket/01_websocket.js
@@ -26,6 +26,7 @@
ArrayPrototypeJoin,
ArrayPrototypeMap,
ArrayPrototypeSome,
+ DataView,
ErrorPrototypeToString,
ObjectDefineProperties,
ObjectPrototypeIsPrototypeOf,
@@ -34,14 +35,13 @@
Set,
// TODO(lucacasonato): add SharedArrayBuffer to primordials
// SharedArrayBufferPrototype
+ String,
StringPrototypeEndsWith,
StringPrototypeToLowerCase,
Symbol,
SymbolIterator,
PromisePrototypeCatch,
- queueMicrotask,
SymbolFor,
- Uint8Array,
} = window.__bootstrap.primordials;
webidl.converters["sequence<DOMString> or DOMString"] = (V, opts) => {
@@ -300,58 +300,40 @@
throw new DOMException("readyState not OPEN", "InvalidStateError");
}
- if (typeof data === "string") {
- // try to send in one go!
- const d = core.byteLength(data);
- const sent = ops.op_ws_try_send_string(this[_rid], data);
- this[_bufferedAmount] += d;
- if (!sent) {
- PromisePrototypeThen(
- core.opAsync("op_ws_send_string", this[_rid], data),
- () => {
- this[_bufferedAmount] -= d;
- },
- );
- } else {
- // Spec expects data to be start flushing on next tick but oh well...
- // we already sent it so we can just decrement the bufferedAmount
- // on the next tick.
- queueMicrotask(() => {
- this[_bufferedAmount] -= d;
- });
- }
- return;
- }
-
const sendTypedArray = (ta) => {
- // try to send in one go!
- const sent = ops.op_ws_try_send_binary(this[_rid], ta);
this[_bufferedAmount] += ta.byteLength;
- if (!sent) {
- PromisePrototypeThen(
- core.opAsync("op_ws_send_binary", this[_rid], ta),
- () => {
- this[_bufferedAmount] -= ta.byteLength;
- },
- );
- } else {
- // Spec expects data to be start flushing on next tick but oh well...
- // we already sent it so we can just decrement the bufferedAmount
- // on the next tick.
- queueMicrotask(() => {
+ PromisePrototypeThen(
+ core.opAsync("op_ws_send", this[_rid], {
+ kind: "binary",
+ value: ta,
+ }),
+ () => {
this[_bufferedAmount] -= ta.byteLength;
- });
- }
+ },
+ );
};
- if (ObjectPrototypeIsPrototypeOf(ArrayBufferPrototype, data)) {
- sendTypedArray(new Uint8Array(data));
+ if (ObjectPrototypeIsPrototypeOf(BlobPrototype, data)) {
+ PromisePrototypeThen(
+ data.slice().arrayBuffer(),
+ (ab) => sendTypedArray(new DataView(ab)),
+ );
} else if (ArrayBufferIsView(data)) {
sendTypedArray(data);
- } else if (ObjectPrototypeIsPrototypeOf(BlobPrototype, data)) {
+ } else if (ObjectPrototypeIsPrototypeOf(ArrayBufferPrototype, data)) {
+ sendTypedArray(new DataView(data));
+ } else {
+ const string = String(data);
+ const d = core.encode(string);
+ this[_bufferedAmount] += d.byteLength;
PromisePrototypeThen(
- data.slice().arrayBuffer(),
- (ab) => sendTypedArray(new Uint8Array(ab)),
+ core.opAsync("op_ws_send", this[_rid], {
+ kind: "text",
+ value: string,
+ }),
+ () => {
+ this[_bufferedAmount] -= d.byteLength;
+ },
);
}
}
diff --git a/ext/websocket/lib.rs b/ext/websocket/lib.rs
index bc4b3876d..8123f84fc 100644
--- a/ext/websocket/lib.rs
+++ b/ext/websocket/lib.rs
@@ -161,51 +161,6 @@ impl WsStreamResource {
}
}
- fn try_send(self: &Rc<Self>, message: Message) -> Result<bool, AnyError> {
- let waker = deno_core::futures::task::noop_waker();
- let mut cx = std::task::Context::from_waker(&waker);
-
- let res = match self.stream {
- WebSocketStreamType::Client { .. } => {
- match RcRef::map(self, |r| match &r.stream {
- WebSocketStreamType::Client { tx, .. } => tx,
- WebSocketStreamType::Server { .. } => unreachable!(),
- })
- .try_borrow_mut()
- {
- Some(mut tx) => {
- if tx.poll_ready_unpin(&mut cx).is_ready() {
- tx.start_send_unpin(message)?;
- tx.poll_flush_unpin(&mut cx).is_ready()
- } else {
- false
- }
- }
- None => false,
- }
- }
- WebSocketStreamType::Server { .. } => {
- match RcRef::map(self, |r| match &r.stream {
- WebSocketStreamType::Client { .. } => unreachable!(),
- WebSocketStreamType::Server { tx, .. } => tx,
- })
- .try_borrow_mut()
- {
- Some(mut tx) => {
- if tx.poll_ready_unpin(&mut cx).is_ready() {
- tx.start_send_unpin(message)?;
- tx.poll_flush_unpin(&mut cx).is_ready()
- } else {
- false
- }
- }
- None => false,
- }
- }
- };
- Ok(res)
- }
-
async fn next_message(
self: &Rc<Self>,
cancel: RcRef<CancelHandle>,
@@ -471,60 +426,6 @@ pub async fn op_ws_send(
Ok(())
}
-#[op]
-pub async fn op_ws_send_string(
- state: Rc<RefCell<OpState>>,
- rid: ResourceId,
- text: String,
-) -> Result<(), AnyError> {
- let resource = state
- .borrow_mut()
- .resource_table
- .get::<WsStreamResource>(rid)?;
- resource.send(Message::Text(text)).await?;
- Ok(())
-}
-
-#[op]
-pub async fn op_ws_send_binary(
- state: Rc<RefCell<OpState>>,
- rid: ResourceId,
- data: ZeroCopyBuf,
-) -> Result<(), AnyError> {
- let resource = state
- .borrow_mut()
- .resource_table
- .get::<WsStreamResource>(rid)?;
- resource.send(Message::Binary(data.to_vec())).await?;
- Ok(())
-}
-
-#[op]
-pub fn op_ws_try_send_string(
- state: &mut OpState,
- rid: ResourceId,
- text: String,
-) -> bool {
- let resource = match state.resource_table.get::<WsStreamResource>(rid) {
- Ok(resource) => resource,
- Err(_) => return false,
- };
- resource.try_send(Message::Text(text)).is_ok()
-}
-
-#[op(fast)]
-pub fn op_ws_try_send_binary(
- state: &mut OpState,
- rid: u32,
- value: &[u8],
-) -> bool {
- let resource = match state.resource_table.get::<WsStreamResource>(rid) {
- Ok(resource) => resource,
- Err(_) => return false,
- };
- resource.try_send(Message::Binary(value.to_vec())).is_ok()
-}
-
#[op(deferred)]
pub async fn op_ws_close(
state: Rc<RefCell<OpState>>,
@@ -615,10 +516,6 @@ pub fn init<P: WebSocketPermissions + 'static>(
op_ws_send::decl(),
op_ws_close::decl(),
op_ws_next_event::decl(),
- op_ws_send_string::decl(),
- op_ws_send_binary::decl(),
- op_ws_try_send_string::decl(),
- op_ws_try_send_binary::decl(),
])
.state(move |state| {
state.put::<WsUserAgent>(WsUserAgent(user_agent.clone()));