diff options
Diffstat (limited to 'extensions/fetch/26_fetch.js')
-rw-r--r-- | extensions/fetch/26_fetch.js | 217 |
1 files changed, 171 insertions, 46 deletions
diff --git a/extensions/fetch/26_fetch.js b/extensions/fetch/26_fetch.js index 1dd797339..6e1d4d4c8 100644 --- a/extensions/fetch/26_fetch.js +++ b/extensions/fetch/26_fetch.js @@ -15,14 +15,18 @@ const core = window.Deno.core; const webidl = window.__bootstrap.webidl; const { byteLowerCase } = window.__bootstrap.infra; + const { errorReadableStream } = window.__bootstrap.streams; const { InnerBody, extractBody } = window.__bootstrap.fetchBody; const { toInnerRequest, + toInnerResponse, fromInnerResponse, redirectStatus, nullBodyStatus, networkError, + abortedNetworkError, } = window.__bootstrap.fetch; + const abortSignal = window.__bootstrap.abortSignal; const REQUEST_BODY_HEADER_NAMES = [ "content-encoding", @@ -68,10 +72,26 @@ /** * @param {number} responseBodyRid + * @param {AbortSignal} [terminator] * @returns {ReadableStream<Uint8Array>} */ - function createResponseBodyStream(responseBodyRid) { - return new ReadableStream({ + function createResponseBodyStream(responseBodyRid, terminator) { + function onAbort() { + if (readable) { + errorReadableStream( + readable, + new DOMException("Ongoing fetch was aborted.", "AbortError"), + ); + } + try { + core.close(responseBodyRid); + } catch (_) { + // might have already been closed + } + } + // TODO(lucacasonato): clean up registration + terminator[abortSignal.add](onAbort); + const readable = new ReadableStream({ type: "bytes", async pull(controller) { try { @@ -88,28 +108,45 @@ } else { // We have reached the end of the body, so we close the stream. controller.close(); - core.close(responseBodyRid); + try { + core.close(responseBodyRid); + } catch (_) { + // might have already been closed + } } } catch (err) { - // There was an error while reading a chunk of the body, so we - // error. - controller.error(err); - controller.close(); - core.close(responseBodyRid); + if (terminator.aborted) { + controller.error( + new DOMException("Ongoing fetch was aborted.", "AbortError"), + ); + } else { + // There was an error while reading a chunk of the body, so we + // error. + controller.error(err); + } + try { + core.close(responseBodyRid); + } catch (_) { + // might have already been closed + } } }, cancel() { - core.close(responseBodyRid); + if (!terminator.aborted) { + terminator[abortSignal.signalAbort](); + } }, }); + return readable; } /** * @param {InnerRequest} req * @param {boolean} recursive + * @param {AbortSignal} terminator * @returns {Promise<InnerResponse>} */ - async function mainFetch(req, recursive) { + async function mainFetch(req, recursive, terminator) { /** @type {ReadableStream<Uint8Array> | Uint8Array | null} */ let reqBody = null; if (req.body !== null) { @@ -130,7 +167,7 @@ } } - const { requestRid, requestBodyRid } = opFetch({ + const { requestRid, requestBodyRid, cancelHandleRid } = opFetch({ method: req.method, url: req.currentUrl(), headers: req.headerList, @@ -138,6 +175,20 @@ hasBody: reqBody !== null, }, reqBody instanceof Uint8Array ? reqBody : null); + function onAbort() { + try { + core.close(cancelHandleRid); + } catch (_) { + // might have already been closed + } + try { + core.close(requestBodyRid); + } catch (_) { + // might have already been closed + } + } + terminator[abortSignal.add](onAbort); + if (requestBodyRid !== null) { if (reqBody === null || !(reqBody instanceof ReadableStream)) { throw new TypeError("Unreachable"); @@ -145,24 +196,49 @@ const reader = reqBody.getReader(); (async () => { while (true) { - const { value, done } = await reader.read(); + const { value, done } = await reader.read().catch((err) => { + if (terminator.aborted) return { done: true, value: undefined }; + throw err; + }); if (done) break; if (!(value instanceof Uint8Array)) { await reader.cancel("value not a Uint8Array"); break; } try { - await opFetchRequestWrite(requestBodyRid, value); + await opFetchRequestWrite(requestBodyRid, value).catch((err) => { + if (terminator.aborted) return; + throw err; + }); + if (terminator.aborted) break; } catch (err) { await reader.cancel(err); break; } } - core.close(requestBodyRid); + try { + core.close(requestBodyRid); + } catch (_) { + // might have already been closed + } })(); } - const resp = await opFetchSend(requestRid); + let resp; + try { + resp = await opFetchSend(requestRid).catch((err) => { + if (terminator.aborted) return; + throw err; + }); + } finally { + try { + core.close(cancelHandleRid); + } catch (_) { + // might have already been closed + } + } + if (terminator.aborted) return abortedNetworkError(); + /** @type {InnerResponse} */ const response = { headerList: resp.headers, @@ -185,7 +261,7 @@ ); case "follow": core.close(resp.responseRid); - return httpRedirectFetch(req, response); + return httpRedirectFetch(req, response, terminator); case "manual": break; } @@ -194,7 +270,9 @@ if (nullBodyStatus(response.status)) { core.close(resp.responseRid); } else { - response.body = new InnerBody(createResponseBodyStream(resp.responseRid)); + response.body = new InnerBody( + createResponseBodyStream(resp.responseRid, terminator), + ); } if (recursive) return response; @@ -211,7 +289,7 @@ * @param {InnerResponse} response * @returns {Promise<InnerResponse>} */ - function httpRedirectFetch(request, response) { + function httpRedirectFetch(request, response, terminator) { const locationHeaders = response.headerList.filter((entry) => byteLowerCase(entry[0]) === "location" ); @@ -264,43 +342,90 @@ request.body = res.body; } request.urlList.push(locationURL.href); - return mainFetch(request, true); + return mainFetch(request, true, terminator); } /** * @param {RequestInfo} input * @param {RequestInit} init */ - async function fetch(input, init = {}) { - const prefix = "Failed to call 'fetch'"; - webidl.requiredArguments(arguments.length, 1, { prefix }); - input = webidl.converters["RequestInfo"](input, { - prefix, - context: "Argument 1", - }); - init = webidl.converters["RequestInit"](init, { - prefix, - context: "Argument 2", - }); - + function fetch(input, init = {}) { // 1. - const requestObject = new Request(input, init); - // 2. - const request = toInnerRequest(requestObject); - // 10. - if (!requestObject.headers.has("Accept")) { - request.headerList.push(["Accept", "*/*"]); - } + const p = new Promise((resolve, reject) => { + const prefix = "Failed to call 'fetch'"; + webidl.requiredArguments(arguments.length, 1, { prefix }); + input = webidl.converters["RequestInfo"](input, { + prefix, + context: "Argument 1", + }); + init = webidl.converters["RequestInit"](init, { + prefix, + context: "Argument 2", + }); - // 12. - const response = await mainFetch(request, false); - if (response.type === "error") { - throw new TypeError( - "Fetch failed: " + (response.error ?? "unknown error"), - ); - } + // 2. + const requestObject = new Request(input, init); + // 3. + const request = toInnerRequest(requestObject); + // 4. + if (requestObject.signal.aborted) { + reject(abortFetch(request, null)); + return; + } - return fromInnerResponse(response, "immutable"); + // 7. + let responseObject = null; + // 9. + let locallyAborted = false; + // 10. + function onabort() { + locallyAborted = true; + reject(abortFetch(request, responseObject)); + } + requestObject.signal[abortSignal.add](onabort); + + if (!requestObject.headers.has("Accept")) { + request.headerList.push(["Accept", "*/*"]); + } + + // 12. + mainFetch(request, false, requestObject.signal).then((response) => { + // 12.1. + if (locallyAborted) return; + // 12.2. + if (response.aborted) { + reject(request, responseObject); + requestObject.signal[abortSignal.remove](onabort); + return; + } + // 12.3. + if (response.type === "error") { + const err = new TypeError( + "Fetch failed: " + (response.error ?? "unknown error"), + ); + reject(err); + requestObject.signal[abortSignal.remove](onabort); + return; + } + responseObject = fromInnerResponse(response, "immutable"); + resolve(responseObject); + requestObject.signal[abortSignal.remove](onabort); + }).catch((err) => { + reject(err); + requestObject.signal[abortSignal.remove](onabort); + }); + }); + return p; + } + + function abortFetch(request, responseObject) { + const error = new DOMException("Ongoing fetch was aborted.", "AbortError"); + if (request.body !== null) request.body.cancel(error); + if (responseObject !== null) { + const response = toInnerResponse(responseObject); + if (response.body !== null) response.body.error(error); + } + return error; } window.__bootstrap.fetch ??= {}; |