diff options
Diffstat (limited to 'ext/fetch/27_eventsource.js')
-rw-r--r-- | ext/fetch/27_eventsource.js | 337 |
1 files changed, 160 insertions, 177 deletions
diff --git a/ext/fetch/27_eventsource.js b/ext/fetch/27_eventsource.js index b0f3004dc..02b77e88d 100644 --- a/ext/fetch/27_eventsource.js +++ b/ext/fetch/27_eventsource.js @@ -11,7 +11,6 @@ const { NumberIsNaN, ObjectDefineProperties, ObjectPrototypeIsPrototypeOf, - Promise, StringPrototypeEndsWith, StringPrototypeIncludes, StringPrototypeIndexOf, @@ -19,7 +18,6 @@ const { StringPrototypeSplit, StringPrototypeStartsWith, StringPrototypeToLowerCase, - Symbol, SymbolFor, } = primordials; @@ -32,6 +30,7 @@ import { EventTarget, setIsTrusted, } from "ext:deno_web/02_event.js"; +import { clearTimeout, setTimeout } from "ext:deno_web/02_timers.js"; import { TransformStream } from "ext:deno_web/06_streams.js"; import { TextDecoderStream } from "ext:deno_web/08_text_encoding.js"; import { getLocationHref } from "ext:deno_web/12_location.js"; @@ -99,29 +98,24 @@ const CONNECTING = 0; const OPEN = 1; const CLOSED = 2; -const _url = Symbol("[[url]]"); -const _withCredentials = Symbol("[[withCredentials]]"); -const _readyState = Symbol("[[readyState]]"); -const _reconnectionTime = Symbol("[[reconnectionTime]]"); -const _lastEventID = Symbol("[[lastEventID]]"); -const _abortController = Symbol("[[abortController]]"); -const _loop = Symbol("[[loop]]"); - class EventSource extends EventTarget { /** @type {AbortController} */ - [_abortController] = new AbortController(); + #abortController = new AbortController(); + + /** @type {number | undefined} */ + #reconnectionTimerId; /** @type {number} */ - [_reconnectionTime] = 5000; + #reconnectionTime = 5000; /** @type {string} */ - [_lastEventID] = ""; + #lastEventId = ""; /** @type {number} */ - [_readyState] = CONNECTING; + #readyState = CONNECTING; get readyState() { webidl.assertBranded(this, EventSourcePrototype); - return this[_readyState]; + return this.#readyState; } get CONNECTING() { @@ -138,36 +132,29 @@ class EventSource extends EventTarget { } /** @type {string} */ - [_url]; + #url; get url() { webidl.assertBranded(this, EventSourcePrototype); - return this[_url]; + return this.#url; } /** @type {boolean} */ - [_withCredentials]; + #withCredentials; get withCredentials() { webidl.assertBranded(this, EventSourcePrototype); - return this[_withCredentials]; + return this.#withCredentials; } constructor(url, eventSourceInitDict = {}) { super(); this[webidl.brand] = webidl.brand; const prefix = "Failed to construct 'EventSource'"; - webidl.requiredArguments(arguments.length, 1, { - prefix, - }); - url = webidl.converters.USVString(url, { - prefix, - context: "Argument 1", - }); + webidl.requiredArguments(arguments.length, 1, prefix); + url = webidl.converters.USVString(url, prefix, "Argument 1"); eventSourceInitDict = webidl.converters.EventSourceInit( eventSourceInitDict, - { - prefix, - context: "Argument 2", - }, + prefix, + "Argument 2", ); try { @@ -176,175 +163,171 @@ class EventSource extends EventTarget { throw new DOMException(e.message, "SyntaxError"); } - this[_url] = url; - this[_withCredentials] = eventSourceInitDict.withCredentials; + this.#url = url; + this.#withCredentials = eventSourceInitDict.withCredentials; - this[_loop](); + this.#loop(); } close() { webidl.assertBranded(this, EventSourcePrototype); - this[_abortController].abort(); - this[_readyState] = CLOSED; + this.#abortController.abort(); + this.#readyState = CLOSED; + clearTimeout(this.#reconnectionTimerId); } - async [_loop]() { - let lastEventIDValue = ""; - while (this[_readyState] !== CLOSED) { - const lastEventIDValueCopy = lastEventIDValue; - lastEventIDValue = ""; - const req = newInnerRequest( - "GET", - this[_url], - () => - lastEventIDValueCopy === "" - ? [ - ["accept", "text/event-stream"], - ] - : [ - ["accept", "text/event-stream"], - [ - "Last-Event-Id", - op_utf8_to_byte_string(lastEventIDValueCopy), - ], - ], - null, - false, - ); - /** @type {InnerResponse} */ - const res = await mainFetch(req, true, this[_abortController].signal); - - const contentType = ArrayPrototypeFind( - res.headerList, - (header) => StringPrototypeToLowerCase(header[0]) === "content-type", - ); - if (res.type === "error") { - if (res.aborted) { - this[_readyState] = CLOSED; - this.dispatchEvent(new Event("error")); - break; - } else { - if (this[_readyState] === CLOSED) { - this[_abortController].abort(); - break; - } - this[_readyState] = CONNECTING; - this.dispatchEvent(new Event("error")); - await new Promise((res) => setTimeout(res, this[_reconnectionTime])); - if (this[_readyState] !== CONNECTING) { - continue; - } + async #loop() { + const lastEventIdValue = this.#lastEventId; + const req = newInnerRequest( + "GET", + this.#url, + () => + lastEventIdValue === "" + ? [ + ["accept", "text/event-stream"], + ] + : [ + ["accept", "text/event-stream"], + ["Last-Event-Id", op_utf8_to_byte_string(lastEventIdValue)], + ], + null, + false, + ); + /** @type {InnerResponse} */ + let res; + try { + res = await mainFetch(req, true, this.#abortController.signal); + } catch { + this.#reestablishConnection(); + return; + } - if (this[_lastEventID] !== "") { - lastEventIDValue = this[_lastEventID]; - } - continue; - } - } else if ( - res.status !== 200 || - !StringPrototypeIncludes( - contentType?.[1].toLowerCase(), - "text/event-stream", - ) - ) { - this[_readyState] = CLOSED; - this.dispatchEvent(new Event("error")); - break; - } + if (res.aborted) { + this.#failConnection(); + return; + } + if (res.type === "error") { + this.#reestablishConnection(); + return; + } + const contentType = ArrayPrototypeFind( + res.headerList, + (header) => StringPrototypeToLowerCase(header[0]) === "content-type", + ); + if ( + res.status !== 200 || + !contentType || + !StringPrototypeIncludes( + StringPrototypeToLowerCase(contentType[1]), + "text/event-stream", + ) + ) { + this.#failConnection(); + return; + } - if (this[_readyState] !== CLOSED) { - this[_readyState] = OPEN; - this.dispatchEvent(new Event("open")); + if (this.#readyState === CLOSED) { + return; + } + this.#readyState = OPEN; + this.dispatchEvent(new Event("open")); - let data = ""; - let eventType = ""; - let lastEventID = this[_lastEventID]; + let data = ""; + let eventType = ""; + let lastEventId = this.#lastEventId; - for await ( - // deno-lint-ignore prefer-primordials - const chunk of res.body.stream - .pipeThrough(new TextDecoderStream()) - .pipeThrough(new TextLineStream({ allowCR: true })) - ) { - if (chunk === "") { - this[_lastEventID] = lastEventID; - if (data === "") { - eventType = ""; - continue; - } - if (StringPrototypeEndsWith(data, "\n")) { - data = StringPrototypeSlice(data, 0, -1); - } - const event = new MessageEvent(eventType || "message", { - data, - origin: res.url(), - lastEventId: this[_lastEventID], - }); - setIsTrusted(event, true); - data = ""; + try { + for await ( + // deno-lint-ignore prefer-primordials + const chunk of res.body.stream + .pipeThrough(new TextDecoderStream()) + .pipeThrough(new TextLineStream({ allowCR: true })) + ) { + if (chunk === "") { + this.#lastEventId = lastEventId; + if (data === "") { eventType = ""; - if (this[_readyState] !== CLOSED) { - this.dispatchEvent(event); - } - } else if (StringPrototypeStartsWith(chunk, ":")) { continue; - } else { - let field = chunk; - let value = ""; - if (StringPrototypeIncludes(chunk, ":")) { - ({ 0: field, 1: value } = StringPrototypeSplit(chunk, ":")); - if (StringPrototypeStartsWith(value, " ")) { - value = StringPrototypeSlice(value, 1); - } + } + if (StringPrototypeEndsWith(data, "\n")) { + data = StringPrototypeSlice(data, 0, -1); + } + const event = new MessageEvent(eventType || "message", { + data, + origin: res.url(), + lastEventId: this.#lastEventId, + }); + setIsTrusted(event, true); + data = ""; + eventType = ""; + if (this.#readyState !== CLOSED) { + this.dispatchEvent(event); + } + } else if (StringPrototypeStartsWith(chunk, ":")) { + continue; + } else { + let field = chunk; + let value = ""; + if (StringPrototypeIncludes(chunk, ":")) { + ({ 0: field, 1: value } = StringPrototypeSplit(chunk, ":")); + if (StringPrototypeStartsWith(value, " ")) { + value = StringPrototypeSlice(value, 1); } + } - switch (field) { - case "event": { - eventType = value; - break; - } - case "data": { - data += value + "\n"; - break; - } - case "id": { - if (!StringPrototypeIncludes(value, "\0")) { - lastEventID = value; - } - break; + switch (field) { + case "event": { + eventType = value; + break; + } + case "data": { + data += value + "\n"; + break; + } + case "id": { + if (!StringPrototypeIncludes(value, "\0")) { + lastEventId = value; } - case "retry": { - const reconnectionTime = Number(value); - if ( - !NumberIsNaN(reconnectionTime) && - NumberIsFinite(reconnectionTime) - ) { - this[_reconnectionTime] = reconnectionTime; - } - break; + break; + } + case "retry": { + const reconnectionTime = Number(value); + if ( + !NumberIsNaN(reconnectionTime) && + NumberIsFinite(reconnectionTime) + ) { + this.#reconnectionTime = reconnectionTime; } + break; } } - - if (this[_abortController].signal.aborted) { - break; - } - } - if (this[_readyState] === CLOSED) { - this[_abortController].abort(); - break; - } - this[_readyState] = CONNECTING; - this.dispatchEvent(new Event("error")); - await new Promise((res) => setTimeout(res, this[_reconnectionTime])); - if (this[_readyState] !== CONNECTING) { - continue; } + } + } catch { + // The connection is reestablished below + } - if (this[_lastEventID] !== "") { - lastEventIDValue = this[_lastEventID]; - } + this.#reestablishConnection(); + } + + #reestablishConnection() { + if (this.#readyState === CLOSED) { + return; + } + this.#readyState = CONNECTING; + this.dispatchEvent(new Event("error")); + this.#reconnectionTimerId = setTimeout(() => { + if (this.#readyState !== CONNECTING) { + return; } + this.#loop(); + }, this.#reconnectionTime); + } + + #failConnection() { + if (this.#readyState !== CLOSED) { + this.#readyState = CLOSED; + this.dispatchEvent(new Event("error")); } } |