summaryrefslogtreecommitdiff
path: root/ext/websocket/lib.rs
diff options
context:
space:
mode:
Diffstat (limited to 'ext/websocket/lib.rs')
-rw-r--r--ext/websocket/lib.rs97
1 files changed, 97 insertions, 0 deletions
diff --git a/ext/websocket/lib.rs b/ext/websocket/lib.rs
index a31431377..704c699a7 100644
--- a/ext/websocket/lib.rs
+++ b/ext/websocket/lib.rs
@@ -161,6 +161,51 @@ impl WsStreamResource {
}
}
+ fn try_send(self: &Rc<Self>, message: Message) -> Result<bool, AnyError> {
+ let waker = deno_core::futures::task::noop_waker();
+ let mut cx = std::task::Context::from_waker(&waker);
+
+ let res = match self.stream {
+ WebSocketStreamType::Client { .. } => {
+ match RcRef::map(self, |r| match &r.stream {
+ WebSocketStreamType::Client { tx, .. } => tx,
+ WebSocketStreamType::Server { .. } => unreachable!(),
+ })
+ .try_borrow_mut()
+ {
+ Some(mut tx) => {
+ if tx.poll_ready_unpin(&mut cx).is_ready() {
+ tx.start_send_unpin(message)?;
+ tx.poll_flush_unpin(&mut cx).is_ready()
+ } else {
+ false
+ }
+ }
+ None => false,
+ }
+ }
+ WebSocketStreamType::Server { .. } => {
+ match RcRef::map(self, |r| match &r.stream {
+ WebSocketStreamType::Client { .. } => unreachable!(),
+ WebSocketStreamType::Server { tx, .. } => tx,
+ })
+ .try_borrow_mut()
+ {
+ Some(mut tx) => {
+ if tx.poll_ready_unpin(&mut cx).is_ready() {
+ tx.start_send_unpin(message)?;
+ tx.poll_flush_unpin(&mut cx).is_ready()
+ } else {
+ false
+ }
+ }
+ None => false,
+ }
+ }
+ };
+ Ok(res)
+ }
+
async fn next_message(
self: &Rc<Self>,
cancel: RcRef<CancelHandle>,
@@ -426,6 +471,54 @@ pub async fn op_ws_send(
Ok(())
}
+#[op]
+pub async fn op_ws_send_string(
+ state: Rc<RefCell<OpState>>,
+ rid: ResourceId,
+ text: String,
+) -> Result<(), AnyError> {
+ let resource = state
+ .borrow_mut()
+ .resource_table
+ .get::<WsStreamResource>(rid)?;
+ resource.send(Message::Text(text)).await?;
+ Ok(())
+}
+
+#[op]
+pub async fn op_ws_send_binary(
+ state: Rc<RefCell<OpState>>,
+ rid: ResourceId,
+ data: ZeroCopyBuf,
+) -> Result<(), AnyError> {
+ let resource = state
+ .borrow_mut()
+ .resource_table
+ .get::<WsStreamResource>(rid)?;
+ resource.send(Message::Binary(data.to_vec())).await?;
+ Ok(())
+}
+
+#[op]
+pub fn op_ws_try_send_string(
+ state: &mut OpState,
+ rid: ResourceId,
+ text: String,
+) -> Result<bool, AnyError> {
+ let resource = state.resource_table.get::<WsStreamResource>(rid)?;
+ resource.try_send(Message::Text(text))
+}
+
+#[op(fast)]
+pub fn op_ws_try_send_binary(
+ state: &mut OpState,
+ rid: u32,
+ value: &[u8],
+) -> Result<bool, AnyError> {
+ let resource = state.resource_table.get::<WsStreamResource>(rid)?;
+ resource.try_send(Message::Binary(value.to_vec()))
+}
+
#[op(deferred)]
pub async fn op_ws_close(
state: Rc<RefCell<OpState>>,
@@ -515,6 +608,10 @@ pub fn init<P: WebSocketPermissions + 'static>(
op_ws_send::decl(),
op_ws_close::decl(),
op_ws_next_event::decl(),
+ op_ws_send_string::decl(),
+ op_ws_send_binary::decl(),
+ op_ws_try_send_string::decl(),
+ op_ws_try_send_binary::decl(),
])
.state(move |state| {
state.put::<WsUserAgent>(WsUserAgent(user_agent.clone()));