summaryrefslogtreecommitdiff
path: root/extensions/websocket/lib.rs
diff options
context:
space:
mode:
Diffstat (limited to 'extensions/websocket/lib.rs')
-rw-r--r--extensions/websocket/lib.rs109
1 files changed, 100 insertions, 9 deletions
diff --git a/extensions/websocket/lib.rs b/extensions/websocket/lib.rs
index 896a5f2e2..97e970e85 100644
--- a/extensions/websocket/lib.rs
+++ b/extensions/websocket/lib.rs
@@ -3,7 +3,6 @@
use deno_core::error::bad_resource_id;
use deno_core::error::invalid_hostname;
use deno_core::error::null_opbuf;
-use deno_core::error::type_error;
use deno_core::error::AnyError;
use deno_core::futures::stream::SplitSink;
use deno_core::futures::stream::SplitStream;
@@ -30,6 +29,7 @@ use serde::Deserialize;
use serde::Serialize;
use std::borrow::Cow;
use std::cell::RefCell;
+use std::fmt;
use std::path::PathBuf;
use std::rc::Rc;
use std::sync::Arc;
@@ -153,14 +153,26 @@ impl Resource for WsStreamResource {
}
}
+pub struct WsCancelResource(Rc<CancelHandle>);
+
+impl Resource for WsCancelResource {
+ fn name(&self) -> Cow<str> {
+ "webSocketCancel".into()
+ }
+
+ fn close(self: Rc<Self>) {
+ self.0.cancel()
+ }
+}
+
// This op is needed because creating a WS instance in JavaScript is a sync
// operation and should throw error when permissions are not fulfilled,
// but actual op that connects WS is async.
-pub fn op_ws_check_permission<WP>(
+pub fn op_ws_check_permission_and_cancel_handle<WP>(
state: &mut OpState,
url: String,
- _: (),
-) -> Result<(), AnyError>
+ cancel_handle: bool,
+) -> Result<Option<ResourceId>, AnyError>
where
WP: WebSocketPermissions + 'static,
{
@@ -168,7 +180,14 @@ where
.borrow_mut::<WP>()
.check_net_url(&url::Url::parse(&url)?)?;
- Ok(())
+ if cancel_handle {
+ let rid = state
+ .resource_table
+ .add(WsCancelResource(CancelHandle::new_rc()));
+ Ok(Some(rid))
+ } else {
+ Ok(None)
+ }
}
#[derive(Deserialize)]
@@ -176,6 +195,7 @@ where
pub struct CreateArgs {
url: String,
protocols: String,
+ cancel_handle: Option<ResourceId>,
}
#[derive(Serialize)]
@@ -246,14 +266,32 @@ where
_ => unreachable!(),
};
+ let client = client_async(request, socket);
let (stream, response): (WsStream, Response) =
- client_async(request, socket).await.map_err(|err| {
- type_error(format!(
+ if let Some(cancel_rid) = args.cancel_handle {
+ let r = state
+ .borrow_mut()
+ .resource_table
+ .get::<WsCancelResource>(cancel_rid)
+ .ok_or_else(bad_resource_id)?;
+ client
+ .or_cancel(r.0.to_owned())
+ .await
+ .map_err(|_| DomExceptionAbortError::new("connection was aborted"))?
+ } else {
+ client.await
+ }
+ .map_err(|err| {
+ DomExceptionNetworkError::new(&format!(
"failed to connect to WebSocket: {}",
err.to_string()
))
})?;
+ if let Some(cancel_rid) = args.cancel_handle {
+ state.borrow_mut().resource_table.close(cancel_rid);
+ }
+
let (ws_tx, ws_rx) = stream.split();
let resource = WsStreamResource {
stream: WebSocketStreamType::Client {
@@ -398,11 +436,12 @@ pub fn init<P: WebSocketPermissions + 'static>(
.js(include_js_files!(
prefix "deno:extensions/websocket",
"01_websocket.js",
+ "02_websocketstream.js",
))
.ops(vec![
(
- "op_ws_check_permission",
- op_sync(op_ws_check_permission::<P>),
+ "op_ws_check_permission_and_cancel_handle",
+ op_sync(op_ws_check_permission_and_cancel_handle::<P>),
),
("op_ws_create", op_async(op_ws_create::<P>)),
("op_ws_send", op_async(op_ws_send)),
@@ -423,3 +462,55 @@ pub fn init<P: WebSocketPermissions + 'static>(
pub fn get_declaration() -> PathBuf {
PathBuf::from(env!("CARGO_MANIFEST_DIR")).join("lib.deno_websocket.d.ts")
}
+
+#[derive(Debug)]
+pub struct DomExceptionNetworkError {
+ pub msg: String,
+}
+
+impl DomExceptionNetworkError {
+ pub fn new(msg: &str) -> Self {
+ DomExceptionNetworkError {
+ msg: msg.to_string(),
+ }
+ }
+}
+
+impl fmt::Display for DomExceptionNetworkError {
+ fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+ f.pad(&self.msg)
+ }
+}
+
+impl std::error::Error for DomExceptionNetworkError {}
+
+pub fn get_network_error_class_name(e: &AnyError) -> Option<&'static str> {
+ e.downcast_ref::<DomExceptionNetworkError>()
+ .map(|_| "DOMExceptionNetworkError")
+}
+
+#[derive(Debug)]
+pub struct DomExceptionAbortError {
+ pub msg: String,
+}
+
+impl DomExceptionAbortError {
+ pub fn new(msg: &str) -> Self {
+ DomExceptionAbortError {
+ msg: msg.to_string(),
+ }
+ }
+}
+
+impl fmt::Display for DomExceptionAbortError {
+ fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+ f.pad(&self.msg)
+ }
+}
+
+impl std::error::Error for DomExceptionAbortError {}
+
+pub fn get_abort_error_class_name(e: &AnyError) -> Option<&'static str> {
+ e.downcast_ref::<DomExceptionAbortError>()
+ .map(|_| "DOMExceptionAbortError")
+}