diff options
-rw-r--r-- | ext/websocket/01_websocket.js | 40 | ||||
-rw-r--r-- | ext/websocket/02_websocketstream.js | 40 | ||||
-rw-r--r-- | ext/websocket/lib.rs | 66 |
3 files changed, 103 insertions, 43 deletions
diff --git a/ext/websocket/01_websocket.js b/ext/websocket/01_websocket.js index e4774f815..797d64723 100644 --- a/ext/websocket/01_websocket.js +++ b/ext/websocket/01_websocket.js @@ -18,6 +18,7 @@ ArrayPrototypeJoin, ArrayPrototypeMap, ArrayPrototypeSome, + Uint32Array, ErrorPrototypeToString, ObjectDefineProperties, ObjectPrototypeIsPrototypeOf, @@ -82,6 +83,10 @@ const _idleTimeoutDuration = Symbol("[[idleTimeout]]"); const _idleTimeoutTimeout = Symbol("[[idleTimeoutTimeout]]"); const _serverHandleIdleTimeout = Symbol("[[serverHandleIdleTimeout]]"); + + /* [event type, close code] */ + const eventBuf = new Uint32Array(2); + class WebSocket extends EventTarget { [_rid]; @@ -410,13 +415,15 @@ async [_eventLoop]() { while (this[_readyState] !== CLOSED) { - const { kind, value } = await core.opAsync( + const value = await core.opAsync( "op_ws_next_event", this[_rid], + eventBuf, ); - + const kind = eventBuf[0]; switch (kind) { - case "string": { + /* string */ + case 0: { this[_serverHandleIdleTimeout](); const event = new MessageEvent("message", { data: value, @@ -425,7 +432,8 @@ this.dispatchEvent(event); break; } - case "binary": { + /* binary */ + case 1: { this[_serverHandleIdleTimeout](); let data; @@ -442,18 +450,23 @@ this.dispatchEvent(event); break; } - case "ping": { + /* ping */ + case 3: { core.opAsync("op_ws_send", this[_rid], { kind: "pong", }); break; } - case "pong": { + /* pong */ + case 4: { this[_serverHandleIdleTimeout](); break; } - case "closed": - case "close": { + /* closed */ + case 6: // falls through + /* close */ + case 2: { + const code = eventBuf[1]; const prevState = this[_readyState]; this[_readyState] = CLOSED; clearTimeout(this[_idleTimeoutTimeout]); @@ -463,8 +476,8 @@ await core.opAsync( "op_ws_close", this[_rid], - value.code, - value.reason, + code, + value, ); } catch { // ignore failures @@ -473,14 +486,15 @@ const event = new CloseEvent("close", { wasClean: true, - code: value.code, - reason: value.reason, + code, + reason: value, }); this.dispatchEvent(event); core.tryClose(this[_rid]); break; } - case "error": { + /* error */ + case 5: { this[_readyState] = CLOSED; const errorEv = new ErrorEvent("error", { diff --git a/ext/websocket/02_websocketstream.js b/ext/websocket/02_websocketstream.js index 598816d05..876f4d0ed 100644 --- a/ext/websocket/02_websocketstream.js +++ b/ext/websocket/02_websocketstream.js @@ -27,6 +27,7 @@ SymbolFor, TypeError, Uint8ArrayPrototype, + Uint32Array, } = window.__bootstrap.primordials; webidl.converters.WebSocketStreamOptions = webidl.createDictionaryConverter( @@ -168,12 +169,15 @@ PromisePrototypeThen( (async () => { while (true) { - const { kind } = await core.opAsync( + const kind = new Uint32Array(2); + await core.opAsync( "op_ws_next_event", create.rid, + kind, ); - if (kind === "close") { + /* close */ + if (kind[0] === 2) { break; } } @@ -237,35 +241,46 @@ await this.closed; }, }); + const pull = async (controller) => { - const { kind, value } = await core.opAsync( + /* [event type, close code] */ + const eventBuf = new Uint32Array(2); + const value = await core.opAsync( "op_ws_next_event", this[_rid], + eventBuf, ); - + const kind = eventBuf[0]; switch (kind) { - case "string": { + /* string */ + case 0: { controller.enqueue(value); break; } - case "binary": { + /* binary */ + case 1: { controller.enqueue(value); break; } - case "ping": { + /* ping */ + case 3: { await core.opAsync("op_ws_send", this[_rid], { kind: "pong", }); await pull(controller); break; } - case "closed": - case "close": { - this[_closed].resolve(value); + /* closed */ + case 6: // falls through + /* close */ + case 2: { + const code = eventBuf[1]; + this[_closed].resolve({ code, reason: value }); core.tryClose(this[_rid]); break; } - case "error": { + /* error */ + case 5: { const err = new Error(value); this[_closed].reject(err); controller.error(err); @@ -285,7 +300,8 @@ return pull(controller); } - this[_closed].resolve(value); + const code = eventBuf[1]; + this[_closed].resolve({ code, reason: value }); core.tryClose(this[_rid]); } }; diff --git a/ext/websocket/lib.rs b/ext/websocket/lib.rs index 704c699a7..337752c21 100644 --- a/ext/websocket/lib.rs +++ b/ext/websocket/lib.rs @@ -20,6 +20,7 @@ use deno_core::OpState; use deno_core::RcRef; use deno_core::Resource; use deno_core::ResourceId; +use deno_core::StringOrBuffer; use deno_core::ZeroCopyBuf; use deno_tls::create_client_config; use http::header::HeaderName; @@ -555,11 +556,23 @@ pub enum NextEventResponse { Closed, } -#[op] +#[repr(u32)] +enum NextEventKind { + String = 0, + Binary = 1, + Close = 2, + Ping = 3, + Pong = 4, + Error = 5, + Closed = 6, +} + +#[op(deferred)] pub async fn op_ws_next_event( state: Rc<RefCell<OpState>>, rid: ResourceId, -) -> Result<NextEventResponse, AnyError> { + kind_out: &mut [u32], +) -> Result<Option<StringOrBuffer>, AnyError> { let resource = state .borrow_mut() .resource_table @@ -567,28 +580,45 @@ pub async fn op_ws_next_event( let cancel = RcRef::map(&resource, |r| &r.cancel); let val = resource.next_message(cancel).await?; - let res = match val { - Some(Ok(Message::Text(text))) => NextEventResponse::String(text), - Some(Ok(Message::Binary(data))) => NextEventResponse::Binary(data.into()), - Some(Ok(Message::Close(Some(frame)))) => NextEventResponse::Close { - code: frame.code.into(), - reason: frame.reason.to_string(), - }, - Some(Ok(Message::Close(None))) => NextEventResponse::Close { - code: 1005, - reason: String::new(), - }, - Some(Ok(Message::Ping(_))) => NextEventResponse::Ping, - Some(Ok(Message::Pong(_))) => NextEventResponse::Pong, - Some(Err(e)) => NextEventResponse::Error(e.to_string()), + let (kind, value) = match val { + Some(Ok(Message::Text(text))) => ( + NextEventKind::String as u32, + Some(StringOrBuffer::String(text)), + ), + Some(Ok(Message::Binary(data))) => ( + NextEventKind::Binary as u32, + Some(StringOrBuffer::Buffer(data.into())), + ), + Some(Ok(Message::Close(Some(frame)))) => { + let code: u16 = frame.code.into(); + kind_out[1] = code as u32; + ( + NextEventKind::Close as u32, + Some(StringOrBuffer::String(frame.reason.to_string())), + ) + } + Some(Ok(Message::Close(None))) => { + kind_out[1] = 1005; + ( + NextEventKind::Close as u32, + Some(StringOrBuffer::String(String::new())), + ) + } + Some(Ok(Message::Ping(_))) => (NextEventKind::Ping as u32, None), + Some(Ok(Message::Pong(_))) => (NextEventKind::Pong as u32, None), + Some(Err(e)) => ( + NextEventKind::Error as u32, + Some(StringOrBuffer::String(e.to_string())), + ), None => { // No message was received, presumably the socket closed while we waited. // Try close the stream, ignoring any errors, and report closed status to JavaScript. let _ = state.borrow_mut().resource_table.close(rid); - NextEventResponse::Closed + (NextEventKind::Closed as u32, None) } }; - Ok(res) + kind_out[0] = kind as u32; + Ok(value) } pub fn init<P: WebSocketPermissions + 'static>( |