diff options
Diffstat (limited to 'ext/websocket')
-rw-r--r-- | ext/websocket/01_websocket.js | 57 | ||||
-rw-r--r-- | ext/websocket/02_websocketstream.js | 43 | ||||
-rw-r--r-- | ext/websocket/lib.rs | 65 |
3 files changed, 102 insertions, 63 deletions
diff --git a/ext/websocket/01_websocket.js b/ext/websocket/01_websocket.js index 06eb08b60..03a6427c2 100644 --- a/ext/websocket/01_websocket.js +++ b/ext/websocket/01_websocket.js @@ -394,13 +394,14 @@ class WebSocket extends EventTarget { async [_eventLoop]() { while (this[_readyState] !== CLOSED) { - const { kind, value } = await core.opAsync( + const { 0: kind, 1: value } = await core.opAsync( "op_ws_next_event", this[_rid], ); switch (kind) { - case "string": { + case 0: { + /* string */ this[_serverHandleIdleTimeout](); const event = new MessageEvent("message", { data: value, @@ -409,14 +410,15 @@ class WebSocket extends EventTarget { this.dispatchEvent(event); break; } - case "binary": { + case 1: { + /* binary */ this[_serverHandleIdleTimeout](); let data; if (this.binaryType === "blob") { data = new Blob([value]); } else { - data = value.buffer; + data = value; } const event = new MessageEvent("message", { @@ -427,12 +429,32 @@ class WebSocket extends EventTarget { this.dispatchEvent(event); break; } - case "pong": { + case 2: { + /* pong */ this[_serverHandleIdleTimeout](); break; } - case "closed": - case "close": { + case 5: { + /* error */ + this[_readyState] = CLOSED; + + const errorEv = new ErrorEvent("error", { + message: value, + }); + this.dispatchEvent(errorEv); + + const closeEv = new CloseEvent("close"); + this.dispatchEvent(closeEv); + core.tryClose(this[_rid]); + break; + } + case 3: { + /* ping */ + break; + } + default: { + /* close */ + const code = kind; const prevState = this[_readyState]; this[_readyState] = CLOSED; clearTimeout(this[_idleTimeoutTimeout]); @@ -442,8 +464,8 @@ class WebSocket extends EventTarget { await core.opAsync( "op_ws_close", this[_rid], - value.code, - value.reason, + code, + value, ); } catch { // ignore failures @@ -452,26 +474,13 @@ class WebSocket extends EventTarget { const event = new CloseEvent("close", { wasClean: true, - code: value.code, - reason: value.reason, + code: code, + reason: value, }); this.dispatchEvent(event); core.tryClose(this[_rid]); break; } - case "error": { - this[_readyState] = CLOSED; - - const errorEv = new ErrorEvent("error", { - message: value, - }); - this.dispatchEvent(errorEv); - - const closeEv = new CloseEvent("close"); - this.dispatchEvent(closeEv); - core.tryClose(this[_rid]); - break; - } } } } diff --git a/ext/websocket/02_websocketstream.js b/ext/websocket/02_websocketstream.js index 0a3aeb192..6e487f0b7 100644 --- a/ext/websocket/02_websocketstream.js +++ b/ext/websocket/02_websocketstream.js @@ -167,12 +167,13 @@ class WebSocketStream { PromisePrototypeThen( (async () => { while (true) { - const { kind } = await core.opAsync( + const { 0: kind } = await core.opAsync( "op_ws_next_event", create.rid, ); - if (kind === "close") { + if (kind > 6) { + /* close */ break; } } @@ -237,37 +238,51 @@ class WebSocketStream { }, }); const pull = async (controller) => { - const { kind, value } = await core.opAsync( + const { 0: kind, 1: value } = await core.opAsync( "op_ws_next_event", this[_rid], ); switch (kind) { - case "string": { + case 0: + case 1: { + /* string */ + /* binary */ controller.enqueue(value); break; } - case "binary": { - controller.enqueue(value); + case 5: { + /* error */ + const err = new Error(value); + this[_closed].reject(err); + controller.error(err); + core.tryClose(this[_rid]); break; } - case "ping": { + case 3: { + /* ping */ await core.opAsync("op_ws_send", this[_rid], { kind: "pong", }); await pull(controller); break; } - case "closed": - case "close": { - this[_closed].resolve(value); + case 2: { + /* pong */ + break; + } + case 6: { + /* closed */ + this[_closed].resolve(undefined); core.tryClose(this[_rid]); break; } - case "error": { - const err = new Error(value); - this[_closed].reject(err); - controller.error(err); + default: { + /* close */ + this[_closed].resolve({ + code: kind, + reason: value, + }); core.tryClose(this[_rid]); break; } diff --git a/ext/websocket/lib.rs b/ext/websocket/lib.rs index 8d3cb20d2..1c586b383 100644 --- a/ext/websocket/lib.rs +++ b/ext/websocket/lib.rs @@ -8,6 +8,7 @@ use deno_core::futures::stream::SplitStream; use deno_core::futures::SinkExt; use deno_core::futures::StreamExt; use deno_core::op; +use deno_core::StringOrBuffer; use deno_core::url; use deno_core::AsyncRefCell; @@ -475,23 +476,21 @@ pub async fn op_ws_close( Ok(()) } -#[derive(Serialize)] -#[serde(tag = "kind", content = "value", rename_all = "camelCase")] -pub enum NextEventResponse { - String(String), - Binary(ZeroCopyBuf), - Close { code: u16, reason: String }, - Ping, - Pong, - Error(String), - Closed, +#[repr(u16)] +pub enum MessageKind { + Text = 0, + Binary = 1, + Pong = 2, + Ping = 3, + Error = 5, + Closed = 6, } #[op] pub async fn op_ws_next_event( state: Rc<RefCell<OpState>>, rid: ResourceId, -) -> Result<NextEventResponse, AnyError> { +) -> Result<(u16, StringOrBuffer), AnyError> { let resource = state .borrow_mut() .resource_table @@ -500,24 +499,40 @@ pub async fn op_ws_next_event( let cancel = RcRef::map(&resource, |r| &r.cancel); let val = resource.next_message(cancel).await?; let res = match val { - Some(Ok(Message::Text(text))) => NextEventResponse::String(text), - Some(Ok(Message::Binary(data))) => NextEventResponse::Binary(data.into()), - Some(Ok(Message::Close(Some(frame)))) => NextEventResponse::Close { - code: frame.code.into(), - reason: frame.reason.to_string(), - }, - Some(Ok(Message::Close(None))) => NextEventResponse::Close { - code: 1005, - reason: String::new(), - }, - Some(Ok(Message::Ping(_))) => NextEventResponse::Ping, - Some(Ok(Message::Pong(_))) => NextEventResponse::Pong, - Some(Err(e)) => NextEventResponse::Error(e.to_string()), + Some(Ok(Message::Text(text))) => { + (MessageKind::Text as u16, StringOrBuffer::String(text)) + } + Some(Ok(Message::Binary(data))) => ( + MessageKind::Binary as u16, + StringOrBuffer::Buffer(data.into()), + ), + Some(Ok(Message::Close(Some(frame)))) => ( + frame.code.into(), + StringOrBuffer::String(frame.reason.to_string()), + ), + Some(Ok(Message::Close(None))) => { + (1005, StringOrBuffer::String("".to_string())) + } + Some(Ok(Message::Ping(_))) => ( + MessageKind::Ping as u16, + StringOrBuffer::Buffer(vec![].into()), + ), + Some(Ok(Message::Pong(_))) => ( + MessageKind::Pong as u16, + StringOrBuffer::Buffer(vec![].into()), + ), + Some(Err(e)) => ( + MessageKind::Error as u16, + StringOrBuffer::String(e.to_string()), + ), None => { // No message was received, presumably the socket closed while we waited. // Try close the stream, ignoring any errors, and report closed status to JavaScript. let _ = state.borrow_mut().resource_table.close(rid); - NextEventResponse::Closed + ( + MessageKind::Closed as u16, + StringOrBuffer::Buffer(vec![].into()), + ) } }; Ok(res) |