diff options
Diffstat (limited to 'ext/websocket/02_websocketstream.js')
-rw-r--r-- | ext/websocket/02_websocketstream.js | 74 |
1 files changed, 38 insertions, 36 deletions
diff --git a/ext/websocket/02_websocketstream.js b/ext/websocket/02_websocketstream.js index 50dfac284..df87c0c97 100644 --- a/ext/websocket/02_websocketstream.js +++ b/ext/websocket/02_websocketstream.js @@ -232,6 +232,43 @@ await this.closed; }, }); + const pull = async (controller) => { + const { kind, value } = await core.opAsync( + "op_ws_next_event", + this[_rid], + ); + + switch (kind) { + case "string": { + controller.enqueue(value); + break; + } + case "binary": { + controller.enqueue(value); + break; + } + case "ping": { + await core.opAsync("op_ws_send", this[_rid], { + kind: "pong", + }); + await pull(controller); + break; + } + case "closed": + case "close": { + this[_closed].resolve(value); + core.tryClose(this[_rid]); + break; + } + case "error": { + const err = new Error(value); + this[_closed].reject(err); + controller.error(err); + core.tryClose(this[_rid]); + break; + } + } + }; const readable = new ReadableStream({ start: (controller) => { PromisePrototypeThen(this.closed, () => { @@ -250,42 +287,7 @@ } }); }, - pull: async (controller) => { - const { kind, value } = await core.opAsync( - "op_ws_next_event", - this[_rid], - ); - - switch (kind) { - case "string": { - controller.enqueue(value); - break; - } - case "binary": { - controller.enqueue(value); - break; - } - case "ping": { - await core.opAsync("op_ws_send", this[_rid], { - kind: "pong", - }); - break; - } - case "closed": - case "close": { - this[_closed].resolve(value); - core.tryClose(this[_rid]); - break; - } - case "error": { - const err = new Error(value); - this[_closed].reject(err); - controller.error(err); - core.tryClose(this[_rid]); - break; - } - } - }, + pull, cancel: async (reason) => { try { this.close(reason?.code !== undefined ? reason : {}); |