summaryrefslogtreecommitdiff
path: root/ext/websocket/02_websocketstream.js
diff options
context:
space:
mode:
Diffstat (limited to 'ext/websocket/02_websocketstream.js')
-rw-r--r--ext/websocket/02_websocketstream.js74
1 files changed, 38 insertions, 36 deletions
diff --git a/ext/websocket/02_websocketstream.js b/ext/websocket/02_websocketstream.js
index 50dfac284..df87c0c97 100644
--- a/ext/websocket/02_websocketstream.js
+++ b/ext/websocket/02_websocketstream.js
@@ -232,6 +232,43 @@
await this.closed;
},
});
+ const pull = async (controller) => {
+ const { kind, value } = await core.opAsync(
+ "op_ws_next_event",
+ this[_rid],
+ );
+
+ switch (kind) {
+ case "string": {
+ controller.enqueue(value);
+ break;
+ }
+ case "binary": {
+ controller.enqueue(value);
+ break;
+ }
+ case "ping": {
+ await core.opAsync("op_ws_send", this[_rid], {
+ kind: "pong",
+ });
+ await pull(controller);
+ break;
+ }
+ case "closed":
+ case "close": {
+ this[_closed].resolve(value);
+ core.tryClose(this[_rid]);
+ break;
+ }
+ case "error": {
+ const err = new Error(value);
+ this[_closed].reject(err);
+ controller.error(err);
+ core.tryClose(this[_rid]);
+ break;
+ }
+ }
+ };
const readable = new ReadableStream({
start: (controller) => {
PromisePrototypeThen(this.closed, () => {
@@ -250,42 +287,7 @@
}
});
},
- pull: async (controller) => {
- const { kind, value } = await core.opAsync(
- "op_ws_next_event",
- this[_rid],
- );
-
- switch (kind) {
- case "string": {
- controller.enqueue(value);
- break;
- }
- case "binary": {
- controller.enqueue(value);
- break;
- }
- case "ping": {
- await core.opAsync("op_ws_send", this[_rid], {
- kind: "pong",
- });
- break;
- }
- case "closed":
- case "close": {
- this[_closed].resolve(value);
- core.tryClose(this[_rid]);
- break;
- }
- case "error": {
- const err = new Error(value);
- this[_closed].reject(err);
- controller.error(err);
- core.tryClose(this[_rid]);
- break;
- }
- }
- },
+ pull,
cancel: async (reason) => {
try {
this.close(reason?.code !== undefined ? reason : {});