diff options
Diffstat (limited to 'ext/websocket/lib.rs')
-rw-r--r-- | ext/websocket/lib.rs | 97 |
1 files changed, 97 insertions, 0 deletions
diff --git a/ext/websocket/lib.rs b/ext/websocket/lib.rs index a31431377..704c699a7 100644 --- a/ext/websocket/lib.rs +++ b/ext/websocket/lib.rs @@ -161,6 +161,51 @@ impl WsStreamResource { } } + fn try_send(self: &Rc<Self>, message: Message) -> Result<bool, AnyError> { + let waker = deno_core::futures::task::noop_waker(); + let mut cx = std::task::Context::from_waker(&waker); + + let res = match self.stream { + WebSocketStreamType::Client { .. } => { + match RcRef::map(self, |r| match &r.stream { + WebSocketStreamType::Client { tx, .. } => tx, + WebSocketStreamType::Server { .. } => unreachable!(), + }) + .try_borrow_mut() + { + Some(mut tx) => { + if tx.poll_ready_unpin(&mut cx).is_ready() { + tx.start_send_unpin(message)?; + tx.poll_flush_unpin(&mut cx).is_ready() + } else { + false + } + } + None => false, + } + } + WebSocketStreamType::Server { .. } => { + match RcRef::map(self, |r| match &r.stream { + WebSocketStreamType::Client { .. } => unreachable!(), + WebSocketStreamType::Server { tx, .. } => tx, + }) + .try_borrow_mut() + { + Some(mut tx) => { + if tx.poll_ready_unpin(&mut cx).is_ready() { + tx.start_send_unpin(message)?; + tx.poll_flush_unpin(&mut cx).is_ready() + } else { + false + } + } + None => false, + } + } + }; + Ok(res) + } + async fn next_message( self: &Rc<Self>, cancel: RcRef<CancelHandle>, @@ -426,6 +471,54 @@ pub async fn op_ws_send( Ok(()) } +#[op] +pub async fn op_ws_send_string( + state: Rc<RefCell<OpState>>, + rid: ResourceId, + text: String, +) -> Result<(), AnyError> { + let resource = state + .borrow_mut() + .resource_table + .get::<WsStreamResource>(rid)?; + resource.send(Message::Text(text)).await?; + Ok(()) +} + +#[op] +pub async fn op_ws_send_binary( + state: Rc<RefCell<OpState>>, + rid: ResourceId, + data: ZeroCopyBuf, +) -> Result<(), AnyError> { + let resource = state + .borrow_mut() + .resource_table + .get::<WsStreamResource>(rid)?; + resource.send(Message::Binary(data.to_vec())).await?; + Ok(()) +} + +#[op] +pub fn op_ws_try_send_string( + state: &mut OpState, + rid: ResourceId, + text: String, +) -> Result<bool, AnyError> { + let resource = state.resource_table.get::<WsStreamResource>(rid)?; + resource.try_send(Message::Text(text)) +} + +#[op(fast)] +pub fn op_ws_try_send_binary( + state: &mut OpState, + rid: u32, + value: &[u8], +) -> Result<bool, AnyError> { + let resource = state.resource_table.get::<WsStreamResource>(rid)?; + resource.try_send(Message::Binary(value.to_vec())) +} + #[op(deferred)] pub async fn op_ws_close( state: Rc<RefCell<OpState>>, @@ -515,6 +608,10 @@ pub fn init<P: WebSocketPermissions + 'static>( op_ws_send::decl(), op_ws_close::decl(), op_ws_next_event::decl(), + op_ws_send_string::decl(), + op_ws_send_binary::decl(), + op_ws_try_send_string::decl(), + op_ws_try_send_binary::decl(), ]) .state(move |state| { state.put::<WsUserAgent>(WsUserAgent(user_agent.clone())); |