diff options
author | Leo K <crowlkats@toaxl.com> | 2021-08-10 00:28:17 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2021-08-10 00:28:17 +0200 |
commit | 2db381eba9768acf855219ec9560e20a62659994 (patch) | |
tree | c06a693b804c9a2bc3bf76f7ac66a02f57499ccb /extensions/websocket/lib.rs | |
parent | 7600a456dfc880a1eeff230f3f34c87978fedc58 (diff) |
feat: add experimental WebSocketStream API (#10365)
This commit adds the experimental WebSocketStream API when
using the --unstable flag.
The explainer for the API can be found here:
https://github.com/ricea/websocketstream-explainer
Diffstat (limited to 'extensions/websocket/lib.rs')
-rw-r--r-- | extensions/websocket/lib.rs | 109 |
1 files changed, 100 insertions, 9 deletions
diff --git a/extensions/websocket/lib.rs b/extensions/websocket/lib.rs index 896a5f2e2..97e970e85 100644 --- a/extensions/websocket/lib.rs +++ b/extensions/websocket/lib.rs @@ -3,7 +3,6 @@ use deno_core::error::bad_resource_id; use deno_core::error::invalid_hostname; use deno_core::error::null_opbuf; -use deno_core::error::type_error; use deno_core::error::AnyError; use deno_core::futures::stream::SplitSink; use deno_core::futures::stream::SplitStream; @@ -30,6 +29,7 @@ use serde::Deserialize; use serde::Serialize; use std::borrow::Cow; use std::cell::RefCell; +use std::fmt; use std::path::PathBuf; use std::rc::Rc; use std::sync::Arc; @@ -153,14 +153,26 @@ impl Resource for WsStreamResource { } } +pub struct WsCancelResource(Rc<CancelHandle>); + +impl Resource for WsCancelResource { + fn name(&self) -> Cow<str> { + "webSocketCancel".into() + } + + fn close(self: Rc<Self>) { + self.0.cancel() + } +} + // This op is needed because creating a WS instance in JavaScript is a sync // operation and should throw error when permissions are not fulfilled, // but actual op that connects WS is async. -pub fn op_ws_check_permission<WP>( +pub fn op_ws_check_permission_and_cancel_handle<WP>( state: &mut OpState, url: String, - _: (), -) -> Result<(), AnyError> + cancel_handle: bool, +) -> Result<Option<ResourceId>, AnyError> where WP: WebSocketPermissions + 'static, { @@ -168,7 +180,14 @@ where .borrow_mut::<WP>() .check_net_url(&url::Url::parse(&url)?)?; - Ok(()) + if cancel_handle { + let rid = state + .resource_table + .add(WsCancelResource(CancelHandle::new_rc())); + Ok(Some(rid)) + } else { + Ok(None) + } } #[derive(Deserialize)] @@ -176,6 +195,7 @@ where pub struct CreateArgs { url: String, protocols: String, + cancel_handle: Option<ResourceId>, } #[derive(Serialize)] @@ -246,14 +266,32 @@ where _ => unreachable!(), }; + let client = client_async(request, socket); let (stream, response): (WsStream, Response) = - client_async(request, socket).await.map_err(|err| { - type_error(format!( + if let Some(cancel_rid) = args.cancel_handle { + let r = state + .borrow_mut() + .resource_table + .get::<WsCancelResource>(cancel_rid) + .ok_or_else(bad_resource_id)?; + client + .or_cancel(r.0.to_owned()) + .await + .map_err(|_| DomExceptionAbortError::new("connection was aborted"))? + } else { + client.await + } + .map_err(|err| { + DomExceptionNetworkError::new(&format!( "failed to connect to WebSocket: {}", err.to_string() )) })?; + if let Some(cancel_rid) = args.cancel_handle { + state.borrow_mut().resource_table.close(cancel_rid); + } + let (ws_tx, ws_rx) = stream.split(); let resource = WsStreamResource { stream: WebSocketStreamType::Client { @@ -398,11 +436,12 @@ pub fn init<P: WebSocketPermissions + 'static>( .js(include_js_files!( prefix "deno:extensions/websocket", "01_websocket.js", + "02_websocketstream.js", )) .ops(vec![ ( - "op_ws_check_permission", - op_sync(op_ws_check_permission::<P>), + "op_ws_check_permission_and_cancel_handle", + op_sync(op_ws_check_permission_and_cancel_handle::<P>), ), ("op_ws_create", op_async(op_ws_create::<P>)), ("op_ws_send", op_async(op_ws_send)), @@ -423,3 +462,55 @@ pub fn init<P: WebSocketPermissions + 'static>( pub fn get_declaration() -> PathBuf { PathBuf::from(env!("CARGO_MANIFEST_DIR")).join("lib.deno_websocket.d.ts") } + +#[derive(Debug)] +pub struct DomExceptionNetworkError { + pub msg: String, +} + +impl DomExceptionNetworkError { + pub fn new(msg: &str) -> Self { + DomExceptionNetworkError { + msg: msg.to_string(), + } + } +} + +impl fmt::Display for DomExceptionNetworkError { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + f.pad(&self.msg) + } +} + +impl std::error::Error for DomExceptionNetworkError {} + +pub fn get_network_error_class_name(e: &AnyError) -> Option<&'static str> { + e.downcast_ref::<DomExceptionNetworkError>() + .map(|_| "DOMExceptionNetworkError") +} + +#[derive(Debug)] +pub struct DomExceptionAbortError { + pub msg: String, +} + +impl DomExceptionAbortError { + pub fn new(msg: &str) -> Self { + DomExceptionAbortError { + msg: msg.to_string(), + } + } +} + +impl fmt::Display for DomExceptionAbortError { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + f.pad(&self.msg) + } +} + +impl std::error::Error for DomExceptionAbortError {} + +pub fn get_abort_error_class_name(e: &AnyError) -> Option<&'static str> { + e.downcast_ref::<DomExceptionAbortError>() + .map(|_| "DOMExceptionAbortError") +} |