diff options
Diffstat (limited to 'ext/fetch/27_eventsource.js')
-rw-r--r-- | ext/fetch/27_eventsource.js | 379 |
1 files changed, 379 insertions, 0 deletions
diff --git a/ext/fetch/27_eventsource.js b/ext/fetch/27_eventsource.js new file mode 100644 index 000000000..6e35bb693 --- /dev/null +++ b/ext/fetch/27_eventsource.js @@ -0,0 +1,379 @@ +// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license. + +/// <reference path="../../core/internal.d.ts" /> + +const core = globalThis.Deno.core; + +import * as webidl from "ext:deno_webidl/00_webidl.js"; +import { URL } from "ext:deno_url/00_url.js"; +import DOMException from "ext:deno_web/01_dom_exception.js"; +import { + defineEventHandler, + EventTarget, + setIsTrusted, +} from "ext:deno_web/02_event.js"; +import { TransformStream } from "ext:deno_web/06_streams.js"; +import { TextDecoderStream } from "ext:deno_web/08_text_encoding.js"; +import { getLocationHref } from "ext:deno_web/12_location.js"; +import { newInnerRequest } from "ext:deno_fetch/23_request.js"; +import { mainFetch } from "ext:deno_fetch/26_fetch.js"; + +const primordials = globalThis.__bootstrap.primordials; +const { + ArrayPrototypeFind, + Number, + NumberIsFinite, + NumberIsNaN, + ObjectDefineProperties, + Promise, + StringPrototypeEndsWith, + StringPrototypeIncludes, + StringPrototypeIndexOf, + StringPrototypeSlice, + StringPrototypeSplit, + StringPrototypeStartsWith, + StringPrototypeToLowerCase, + Symbol, +} = primordials; + +// Copied from https://github.com/denoland/deno_std/blob/e0753abe0c8602552862a568348c046996709521/streams/text_line_stream.ts#L20-L74 +export class TextLineStream extends TransformStream { + #allowCR; + #buf = ""; + + constructor(options) { + super({ + transform: (chunk, controller) => this.#handle(chunk, controller), + flush: (controller) => { + if (this.#buf.length > 0) { + if ( + this.#allowCR && + this.#buf[this.#buf.length - 1] === "\r" + ) controller.enqueue(StringPrototypeSlice(this.#buf, 0, -1)); + else controller.enqueue(this.#buf); + } + }, + }); + this.#allowCR = options?.allowCR ?? false; + } + + #handle(chunk, controller) { + chunk = this.#buf + chunk; + + for (;;) { + const lfIndex = StringPrototypeIndexOf(chunk, "\n"); + + if (this.#allowCR) { + const crIndex = StringPrototypeIndexOf(chunk, "\r"); + + if ( + crIndex !== -1 && crIndex !== (chunk.length - 1) && + (lfIndex === -1 || (lfIndex - 1) > crIndex) + ) { + controller.enqueue(StringPrototypeSlice(chunk, 0, crIndex)); + chunk = StringPrototypeSlice(chunk, crIndex + 1); + continue; + } + } + + if (lfIndex !== -1) { + let crOrLfIndex = lfIndex; + if (chunk[lfIndex - 1] === "\r") { + crOrLfIndex--; + } + controller.enqueue(StringPrototypeSlice(chunk, 0, crOrLfIndex)); + chunk = StringPrototypeSlice(chunk, lfIndex + 1); + continue; + } + + break; + } + + this.#buf = chunk; + } +} + +const CONNECTING = 0; +const OPEN = 1; +const CLOSED = 2; + +const _url = Symbol("[[url]]"); +const _withCredentials = Symbol("[[withCredentials]]"); +const _readyState = Symbol("[[readyState]]"); +const _reconnectionTime = Symbol("[[reconnectionTime]]"); +const _lastEventID = Symbol("[[lastEventID]]"); +const _abortController = Symbol("[[abortController]]"); +const _loop = Symbol("[[loop]]"); + +class EventSource extends EventTarget { + /** @type {AbortController} */ + [_abortController] = new AbortController(); + + /** @type {number} */ + [_reconnectionTime] = 5000; + + /** @type {string} */ + [_lastEventID] = ""; + + /** @type {number} */ + [_readyState] = CONNECTING; + get readyState() { + webidl.assertBranded(this, EventSourcePrototype); + return this[_readyState]; + } + + get CONNECTING() { + webidl.assertBranded(this, EventSourcePrototype); + return CONNECTING; + } + get OPEN() { + webidl.assertBranded(this, EventSourcePrototype); + return OPEN; + } + get CLOSED() { + webidl.assertBranded(this, EventSourcePrototype); + return CLOSED; + } + + /** @type {string} */ + [_url]; + get url() { + webidl.assertBranded(this, EventSourcePrototype); + return this[_url]; + } + + /** @type {boolean} */ + [_withCredentials]; + get withCredentials() { + webidl.assertBranded(this, EventSourcePrototype); + return this[_withCredentials]; + } + + constructor(url, eventSourceInitDict = {}) { + super(); + this[webidl.brand] = webidl.brand; + const prefix = "Failed to construct 'EventSource'"; + webidl.requiredArguments(arguments.length, 1, { + prefix, + }); + url = webidl.converters.USVString(url, { + prefix, + context: "Argument 1", + }); + eventSourceInitDict = webidl.converters.EventSourceInit( + eventSourceInitDict, + { + prefix, + context: "Argument 2", + }, + ); + + try { + url = new URL(url, getLocationHref()).href; + } catch (e) { + throw new DOMException(e.message, "SyntaxError"); + } + + this[_url] = url; + this[_withCredentials] = eventSourceInitDict.withCredentials; + + this[_loop](); + } + + close() { + webidl.assertBranded(this, EventSourcePrototype); + this[_abortController].abort(); + this[_readyState] = CLOSED; + } + + async [_loop]() { + let lastEventIDValue = ""; + while (this[_readyState] !== CLOSED) { + const lastEventIDValueCopy = lastEventIDValue; + lastEventIDValue = ""; + const req = newInnerRequest( + "GET", + this[_url], + () => + lastEventIDValueCopy === "" + ? [ + ["accept", "text/event-stream"], + ] + : [ + ["accept", "text/event-stream"], + [ + "Last-Event-Id", + core.ops.op_utf8_to_byte_string(lastEventIDValueCopy), + ], + ], + null, + false, + ); + /** @type {InnerResponse} */ + const res = await mainFetch(req, true, this[_abortController].signal); + + const contentType = ArrayPrototypeFind( + res.headerList, + (header) => StringPrototypeToLowerCase(header[0]) === "content-type", + ); + if (res.type === "error") { + if (res.aborted) { + this[_readyState] = CLOSED; + this.dispatchEvent(new Event("error")); + break; + } else { + if (this[_readyState] === CLOSED) { + this[_abortController].abort(); + break; + } + this[_readyState] = CONNECTING; + this.dispatchEvent(new Event("error")); + await new Promise((res) => setTimeout(res, this[_reconnectionTime])); + if (this[_readyState] !== CONNECTING) { + continue; + } + + if (this[_lastEventID] !== "") { + lastEventIDValue = this[_lastEventID]; + } + continue; + } + } else if ( + res.status !== 200 || + !StringPrototypeIncludes( + contentType?.[1].toLowerCase(), + "text/event-stream", + ) + ) { + this[_readyState] = CLOSED; + this.dispatchEvent(new Event("error")); + break; + } + + if (this[_readyState] !== CLOSED) { + this[_readyState] = OPEN; + this.dispatchEvent(new Event("open")); + + let data = ""; + let eventType = ""; + let lastEventID = this[_lastEventID]; + + for await ( + // deno-lint-ignore prefer-primordials + const chunk of res.body.stream + .pipeThrough(new TextDecoderStream()) + .pipeThrough(new TextLineStream({ allowCR: true })) + ) { + if (chunk === "") { + this[_lastEventID] = lastEventID; + if (data === "") { + eventType = ""; + continue; + } + if (StringPrototypeEndsWith(data, "\n")) { + data = StringPrototypeSlice(data, 0, -1); + } + const event = new MessageEvent(eventType || "message", { + data, + origin: res.url(), + lastEventId: this[_lastEventID], + }); + setIsTrusted(event, true); + data = ""; + eventType = ""; + if (this[_readyState] !== CLOSED) { + this.dispatchEvent(event); + } + } else if (StringPrototypeStartsWith(chunk, ":")) { + continue; + } else { + let field = chunk; + let value = ""; + if (StringPrototypeIncludes(chunk, ":")) { + ({ 0: field, 1: value } = StringPrototypeSplit(chunk, ":")); + if (StringPrototypeStartsWith(value, " ")) { + value = StringPrototypeSlice(value, 1); + } + } + + switch (field) { + case "event": { + eventType = value; + break; + } + case "data": { + data += value + "\n"; + break; + } + case "id": { + if (!StringPrototypeIncludes(value, "\0")) { + lastEventID = value; + } + break; + } + case "retry": { + const reconnectionTime = Number(value); + if ( + !NumberIsNaN(reconnectionTime) && + NumberIsFinite(reconnectionTime) + ) { + this[_reconnectionTime] = reconnectionTime; + } + break; + } + } + } + + if (this[_abortController].signal.aborted) { + break; + } + } + if (this[_readyState] === CLOSED) { + this[_abortController].abort(); + break; + } + this[_readyState] = CONNECTING; + this.dispatchEvent(new Event("error")); + await new Promise((res) => setTimeout(res, this[_reconnectionTime])); + if (this[_readyState] !== CONNECTING) { + continue; + } + + if (this[_lastEventID] !== "") { + lastEventIDValue = this[_lastEventID]; + } + } + } + } +} + +const EventSourcePrototype = EventSource.prototype; + +ObjectDefineProperties(EventSource, { + CONNECTING: { + value: 0, + }, + OPEN: { + value: 1, + }, + CLOSED: { + value: 2, + }, +}); + +defineEventHandler(EventSource.prototype, "open"); +defineEventHandler(EventSource.prototype, "message"); +defineEventHandler(EventSource.prototype, "error"); + +webidl.converters.EventSourceInit = webidl.createDictionaryConverter( + "EventSourceInit", + [ + { + key: "withCredentials", + defaultValue: false, + converter: webidl.converters.boolean, + }, + ], +); + +export { EventSource }; |