summaryrefslogtreecommitdiff
path: root/ext/fetch/27_eventsource.js
diff options
context:
space:
mode:
Diffstat (limited to 'ext/fetch/27_eventsource.js')
-rw-r--r--ext/fetch/27_eventsource.js337
1 files changed, 160 insertions, 177 deletions
diff --git a/ext/fetch/27_eventsource.js b/ext/fetch/27_eventsource.js
index b0f3004dc..02b77e88d 100644
--- a/ext/fetch/27_eventsource.js
+++ b/ext/fetch/27_eventsource.js
@@ -11,7 +11,6 @@ const {
NumberIsNaN,
ObjectDefineProperties,
ObjectPrototypeIsPrototypeOf,
- Promise,
StringPrototypeEndsWith,
StringPrototypeIncludes,
StringPrototypeIndexOf,
@@ -19,7 +18,6 @@ const {
StringPrototypeSplit,
StringPrototypeStartsWith,
StringPrototypeToLowerCase,
- Symbol,
SymbolFor,
} = primordials;
@@ -32,6 +30,7 @@ import {
EventTarget,
setIsTrusted,
} from "ext:deno_web/02_event.js";
+import { clearTimeout, setTimeout } from "ext:deno_web/02_timers.js";
import { TransformStream } from "ext:deno_web/06_streams.js";
import { TextDecoderStream } from "ext:deno_web/08_text_encoding.js";
import { getLocationHref } from "ext:deno_web/12_location.js";
@@ -99,29 +98,24 @@ const CONNECTING = 0;
const OPEN = 1;
const CLOSED = 2;
-const _url = Symbol("[[url]]");
-const _withCredentials = Symbol("[[withCredentials]]");
-const _readyState = Symbol("[[readyState]]");
-const _reconnectionTime = Symbol("[[reconnectionTime]]");
-const _lastEventID = Symbol("[[lastEventID]]");
-const _abortController = Symbol("[[abortController]]");
-const _loop = Symbol("[[loop]]");
-
class EventSource extends EventTarget {
/** @type {AbortController} */
- [_abortController] = new AbortController();
+ #abortController = new AbortController();
+
+ /** @type {number | undefined} */
+ #reconnectionTimerId;
/** @type {number} */
- [_reconnectionTime] = 5000;
+ #reconnectionTime = 5000;
/** @type {string} */
- [_lastEventID] = "";
+ #lastEventId = "";
/** @type {number} */
- [_readyState] = CONNECTING;
+ #readyState = CONNECTING;
get readyState() {
webidl.assertBranded(this, EventSourcePrototype);
- return this[_readyState];
+ return this.#readyState;
}
get CONNECTING() {
@@ -138,36 +132,29 @@ class EventSource extends EventTarget {
}
/** @type {string} */
- [_url];
+ #url;
get url() {
webidl.assertBranded(this, EventSourcePrototype);
- return this[_url];
+ return this.#url;
}
/** @type {boolean} */
- [_withCredentials];
+ #withCredentials;
get withCredentials() {
webidl.assertBranded(this, EventSourcePrototype);
- return this[_withCredentials];
+ return this.#withCredentials;
}
constructor(url, eventSourceInitDict = {}) {
super();
this[webidl.brand] = webidl.brand;
const prefix = "Failed to construct 'EventSource'";
- webidl.requiredArguments(arguments.length, 1, {
- prefix,
- });
- url = webidl.converters.USVString(url, {
- prefix,
- context: "Argument 1",
- });
+ webidl.requiredArguments(arguments.length, 1, prefix);
+ url = webidl.converters.USVString(url, prefix, "Argument 1");
eventSourceInitDict = webidl.converters.EventSourceInit(
eventSourceInitDict,
- {
- prefix,
- context: "Argument 2",
- },
+ prefix,
+ "Argument 2",
);
try {
@@ -176,175 +163,171 @@ class EventSource extends EventTarget {
throw new DOMException(e.message, "SyntaxError");
}
- this[_url] = url;
- this[_withCredentials] = eventSourceInitDict.withCredentials;
+ this.#url = url;
+ this.#withCredentials = eventSourceInitDict.withCredentials;
- this[_loop]();
+ this.#loop();
}
close() {
webidl.assertBranded(this, EventSourcePrototype);
- this[_abortController].abort();
- this[_readyState] = CLOSED;
+ this.#abortController.abort();
+ this.#readyState = CLOSED;
+ clearTimeout(this.#reconnectionTimerId);
}
- async [_loop]() {
- let lastEventIDValue = "";
- while (this[_readyState] !== CLOSED) {
- const lastEventIDValueCopy = lastEventIDValue;
- lastEventIDValue = "";
- const req = newInnerRequest(
- "GET",
- this[_url],
- () =>
- lastEventIDValueCopy === ""
- ? [
- ["accept", "text/event-stream"],
- ]
- : [
- ["accept", "text/event-stream"],
- [
- "Last-Event-Id",
- op_utf8_to_byte_string(lastEventIDValueCopy),
- ],
- ],
- null,
- false,
- );
- /** @type {InnerResponse} */
- const res = await mainFetch(req, true, this[_abortController].signal);
-
- const contentType = ArrayPrototypeFind(
- res.headerList,
- (header) => StringPrototypeToLowerCase(header[0]) === "content-type",
- );
- if (res.type === "error") {
- if (res.aborted) {
- this[_readyState] = CLOSED;
- this.dispatchEvent(new Event("error"));
- break;
- } else {
- if (this[_readyState] === CLOSED) {
- this[_abortController].abort();
- break;
- }
- this[_readyState] = CONNECTING;
- this.dispatchEvent(new Event("error"));
- await new Promise((res) => setTimeout(res, this[_reconnectionTime]));
- if (this[_readyState] !== CONNECTING) {
- continue;
- }
+ async #loop() {
+ const lastEventIdValue = this.#lastEventId;
+ const req = newInnerRequest(
+ "GET",
+ this.#url,
+ () =>
+ lastEventIdValue === ""
+ ? [
+ ["accept", "text/event-stream"],
+ ]
+ : [
+ ["accept", "text/event-stream"],
+ ["Last-Event-Id", op_utf8_to_byte_string(lastEventIdValue)],
+ ],
+ null,
+ false,
+ );
+ /** @type {InnerResponse} */
+ let res;
+ try {
+ res = await mainFetch(req, true, this.#abortController.signal);
+ } catch {
+ this.#reestablishConnection();
+ return;
+ }
- if (this[_lastEventID] !== "") {
- lastEventIDValue = this[_lastEventID];
- }
- continue;
- }
- } else if (
- res.status !== 200 ||
- !StringPrototypeIncludes(
- contentType?.[1].toLowerCase(),
- "text/event-stream",
- )
- ) {
- this[_readyState] = CLOSED;
- this.dispatchEvent(new Event("error"));
- break;
- }
+ if (res.aborted) {
+ this.#failConnection();
+ return;
+ }
+ if (res.type === "error") {
+ this.#reestablishConnection();
+ return;
+ }
+ const contentType = ArrayPrototypeFind(
+ res.headerList,
+ (header) => StringPrototypeToLowerCase(header[0]) === "content-type",
+ );
+ if (
+ res.status !== 200 ||
+ !contentType ||
+ !StringPrototypeIncludes(
+ StringPrototypeToLowerCase(contentType[1]),
+ "text/event-stream",
+ )
+ ) {
+ this.#failConnection();
+ return;
+ }
- if (this[_readyState] !== CLOSED) {
- this[_readyState] = OPEN;
- this.dispatchEvent(new Event("open"));
+ if (this.#readyState === CLOSED) {
+ return;
+ }
+ this.#readyState = OPEN;
+ this.dispatchEvent(new Event("open"));
- let data = "";
- let eventType = "";
- let lastEventID = this[_lastEventID];
+ let data = "";
+ let eventType = "";
+ let lastEventId = this.#lastEventId;
- for await (
- // deno-lint-ignore prefer-primordials
- const chunk of res.body.stream
- .pipeThrough(new TextDecoderStream())
- .pipeThrough(new TextLineStream({ allowCR: true }))
- ) {
- if (chunk === "") {
- this[_lastEventID] = lastEventID;
- if (data === "") {
- eventType = "";
- continue;
- }
- if (StringPrototypeEndsWith(data, "\n")) {
- data = StringPrototypeSlice(data, 0, -1);
- }
- const event = new MessageEvent(eventType || "message", {
- data,
- origin: res.url(),
- lastEventId: this[_lastEventID],
- });
- setIsTrusted(event, true);
- data = "";
+ try {
+ for await (
+ // deno-lint-ignore prefer-primordials
+ const chunk of res.body.stream
+ .pipeThrough(new TextDecoderStream())
+ .pipeThrough(new TextLineStream({ allowCR: true }))
+ ) {
+ if (chunk === "") {
+ this.#lastEventId = lastEventId;
+ if (data === "") {
eventType = "";
- if (this[_readyState] !== CLOSED) {
- this.dispatchEvent(event);
- }
- } else if (StringPrototypeStartsWith(chunk, ":")) {
continue;
- } else {
- let field = chunk;
- let value = "";
- if (StringPrototypeIncludes(chunk, ":")) {
- ({ 0: field, 1: value } = StringPrototypeSplit(chunk, ":"));
- if (StringPrototypeStartsWith(value, " ")) {
- value = StringPrototypeSlice(value, 1);
- }
+ }
+ if (StringPrototypeEndsWith(data, "\n")) {
+ data = StringPrototypeSlice(data, 0, -1);
+ }
+ const event = new MessageEvent(eventType || "message", {
+ data,
+ origin: res.url(),
+ lastEventId: this.#lastEventId,
+ });
+ setIsTrusted(event, true);
+ data = "";
+ eventType = "";
+ if (this.#readyState !== CLOSED) {
+ this.dispatchEvent(event);
+ }
+ } else if (StringPrototypeStartsWith(chunk, ":")) {
+ continue;
+ } else {
+ let field = chunk;
+ let value = "";
+ if (StringPrototypeIncludes(chunk, ":")) {
+ ({ 0: field, 1: value } = StringPrototypeSplit(chunk, ":"));
+ if (StringPrototypeStartsWith(value, " ")) {
+ value = StringPrototypeSlice(value, 1);
}
+ }
- switch (field) {
- case "event": {
- eventType = value;
- break;
- }
- case "data": {
- data += value + "\n";
- break;
- }
- case "id": {
- if (!StringPrototypeIncludes(value, "\0")) {
- lastEventID = value;
- }
- break;
+ switch (field) {
+ case "event": {
+ eventType = value;
+ break;
+ }
+ case "data": {
+ data += value + "\n";
+ break;
+ }
+ case "id": {
+ if (!StringPrototypeIncludes(value, "\0")) {
+ lastEventId = value;
}
- case "retry": {
- const reconnectionTime = Number(value);
- if (
- !NumberIsNaN(reconnectionTime) &&
- NumberIsFinite(reconnectionTime)
- ) {
- this[_reconnectionTime] = reconnectionTime;
- }
- break;
+ break;
+ }
+ case "retry": {
+ const reconnectionTime = Number(value);
+ if (
+ !NumberIsNaN(reconnectionTime) &&
+ NumberIsFinite(reconnectionTime)
+ ) {
+ this.#reconnectionTime = reconnectionTime;
}
+ break;
}
}
-
- if (this[_abortController].signal.aborted) {
- break;
- }
- }
- if (this[_readyState] === CLOSED) {
- this[_abortController].abort();
- break;
- }
- this[_readyState] = CONNECTING;
- this.dispatchEvent(new Event("error"));
- await new Promise((res) => setTimeout(res, this[_reconnectionTime]));
- if (this[_readyState] !== CONNECTING) {
- continue;
}
+ }
+ } catch {
+ // The connection is reestablished below
+ }
- if (this[_lastEventID] !== "") {
- lastEventIDValue = this[_lastEventID];
- }
+ this.#reestablishConnection();
+ }
+
+ #reestablishConnection() {
+ if (this.#readyState === CLOSED) {
+ return;
+ }
+ this.#readyState = CONNECTING;
+ this.dispatchEvent(new Event("error"));
+ this.#reconnectionTimerId = setTimeout(() => {
+ if (this.#readyState !== CONNECTING) {
+ return;
}
+ this.#loop();
+ }, this.#reconnectionTime);
+ }
+
+ #failConnection() {
+ if (this.#readyState !== CLOSED) {
+ this.#readyState = CLOSED;
+ this.dispatchEvent(new Event("error"));
}
}