summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatt Mastracci <matthew@mastracci.com>2024-04-24 14:03:37 -0400
committerGitHub <noreply@github.com>2024-04-24 14:03:37 -0400
commiteed2598e6cf1db643b4edd07b5eff94c59eb9408 (patch)
tree0320981bba82c78647b9cf335793381400093ad9
parentb60822f6e0e3c1f3e360657cfb67c114df2e7032 (diff)
feat(ext/http): Implement request.signal for Deno.serve (#23425)
When the response has been successfully send, we abort the `Request.signal` property to indicate that all resources associated with this transaction may be torn down.
-rw-r--r--cli/tsc/dts/lib.deno.unstable.d.ts11
-rw-r--r--ext/http/00_serve.ts (renamed from ext/http/00_serve.js)83
-rw-r--r--ext/http/http_next.rs5
-rw-r--r--ext/http/lib.rs2
-rw-r--r--ext/http/response_body.rs15
-rw-r--r--ext/http/service.rs12
-rw-r--r--ext/node/polyfills/http.ts2
-rw-r--r--ext/node/polyfills/http2.ts2
-rw-r--r--runtime/js/90_deno_ns.js2
-rw-r--r--tests/unit/serve_test.ts17
-rw-r--r--tools/core_import_map.json2
11 files changed, 119 insertions, 34 deletions
diff --git a/cli/tsc/dts/lib.deno.unstable.d.ts b/cli/tsc/dts/lib.deno.unstable.d.ts
index 76656108b..0c1ab8af0 100644
--- a/cli/tsc/dts/lib.deno.unstable.d.ts
+++ b/cli/tsc/dts/lib.deno.unstable.d.ts
@@ -10,6 +10,17 @@
declare namespace Deno {
export {}; // stop default export type behavior
+ /** Information for a HTTP request.
+ *
+ * @category HTTP Server
+ */
+ export interface ServeHandlerInfo {
+ /** The remote address of the connection. */
+ remoteAddr: Deno.NetAddr;
+ /** The completion promise */
+ completed: Promise<void>;
+ }
+
/** **UNSTABLE**: New API, yet to be vetted.
*
* Retrieve the process umask. If `mask` is provided, sets the process umask.
diff --git a/ext/http/00_serve.js b/ext/http/00_serve.ts
index 52b833f10..1063f9691 100644
--- a/ext/http/00_serve.js
+++ b/ext/http/00_serve.ts
@@ -4,6 +4,7 @@ import { core, internals, primordials } from "ext:core/mod.js";
const {
BadResourcePrototype,
InterruptedPrototype,
+ Interrupted,
internalRidSymbol,
} = core;
import {
@@ -37,6 +38,7 @@ const {
TypeError,
TypedArrayPrototypeGetSymbolToStringTag,
Uint8Array,
+ Promise,
} = primordials;
import { InnerBody } from "ext:deno_fetch/22_body.js";
@@ -132,14 +134,31 @@ class InnerRequest {
#body;
#upgraded;
#urlValue;
+ #completed;
+ #abortController;
- constructor(external, context) {
+ constructor(external, context, abortController) {
this.#external = external;
this.#context = context;
this.#upgraded = false;
+ this.#completed = undefined;
+ this.#abortController = abortController;
}
- close() {
+ close(success = true) {
+ // The completion signal fires only if someone cares
+ if (this.#completed) {
+ if (success) {
+ this.#completed.resolve(undefined);
+ } else {
+ this.#completed.reject(
+ new Interrupted("HTTP response was not sent successfully"),
+ );
+ }
+ }
+ // Unconditionally abort the request signal. Note that we don't use
+ // an error here.
+ this.#abortController.abort();
this.#external = null;
}
@@ -271,6 +290,19 @@ class InnerRequest {
path;
}
+ get completed() {
+ if (!this.#completed) {
+ // NOTE: this is faster than Promise.withResolvers()
+ let resolve, reject;
+ const promise = new Promise((r1, r2) => {
+ resolve = r1;
+ reject = r2;
+ });
+ this.#completed = { promise, resolve, reject };
+ }
+ return this.#completed.promise;
+ }
+
get remoteAddr() {
const transport = this.#context.listener?.addr.transport;
if (transport === "unix" || transport === "unixpacket") {
@@ -375,16 +407,24 @@ class CallbackContext {
}
class ServeHandlerInfo {
- #inner = null;
- constructor(inner) {
+ #inner: InnerRequest;
+ constructor(inner: InnerRequest) {
this.#inner = inner;
}
get remoteAddr() {
return this.#inner.remoteAddr;
}
+ get completed() {
+ return this.#inner.completed;
+ }
}
-function fastSyncResponseOrStream(req, respBody, status, innerRequest) {
+function fastSyncResponseOrStream(
+ req,
+ respBody,
+ status,
+ innerRequest: InnerRequest,
+) {
if (respBody === null || respBody === undefined) {
// Don't set the body
innerRequest?.close();
@@ -428,8 +468,8 @@ function fastSyncResponseOrStream(req, respBody, status, innerRequest) {
autoClose,
status,
),
- () => {
- innerRequest?.close();
+ (success) => {
+ innerRequest?.close(success);
op_http_close_after_finish(req);
},
);
@@ -443,15 +483,16 @@ function fastSyncResponseOrStream(req, respBody, status, innerRequest) {
* This function returns a promise that will only reject in the case of abnormal exit.
*/
function mapToCallback(context, callback, onError) {
- const signal = context.abortController.signal;
-
return async function (req) {
+ const abortController = new AbortController();
+ const signal = abortController.signal;
+
// Get the response from the user-provided callback. If that fails, use onError. If that fails, return a fallback
// 500 error.
let innerRequest;
let response;
try {
- innerRequest = new InnerRequest(req, context);
+ innerRequest = new InnerRequest(req, context, abortController);
response = await callback(
fromInnerRequest(innerRequest, signal, "immutable"),
new ServeHandlerInfo(innerRequest),
@@ -509,9 +550,27 @@ function mapToCallback(context, callback, onError) {
};
}
+type RawHandler = (
+ request: Request,
+ info: ServeHandlerInfo,
+) => Response | Promise<Response>;
+
+type RawServeOptions = {
+ port?: number;
+ hostname?: string;
+ signal?: AbortSignal;
+ reusePort?: boolean;
+ key?: string;
+ cert?: string;
+ onError?: (error: unknown) => Response | Promise<Response>;
+ onListen?: (params: { hostname: string; port: number }) => void;
+ handler?: RawHandler;
+};
+
function serve(arg1, arg2) {
- let options = undefined;
- let handler = undefined;
+ let options: RawServeOptions | undefined;
+ let handler: RawHandler | undefined;
+
if (typeof arg1 === "function") {
handler = arg1;
} else if (typeof arg2 === "function") {
diff --git a/ext/http/http_next.rs b/ext/http/http_next.rs
index a6527397f..9bdb79f86 100644
--- a/ext/http/http_next.rs
+++ b/ext/http/http_next.rs
@@ -683,7 +683,7 @@ pub async fn op_http_set_response_body_resource(
#[smi] stream_rid: ResourceId,
auto_close: bool,
status: u16,
-) -> Result<(), AnyError> {
+) -> Result<bool, AnyError> {
let http =
// SAFETY: op is called with external.
unsafe { clone_external!(external, "op_http_set_response_body_resource") };
@@ -716,8 +716,7 @@ pub async fn op_http_set_response_body_resource(
},
);
- http.response_body_finished().await;
- Ok(())
+ Ok(http.response_body_finished().await)
}
#[op2(fast)]
diff --git a/ext/http/lib.rs b/ext/http/lib.rs
index 6fc7207be..934f8a002 100644
--- a/ext/http/lib.rs
+++ b/ext/http/lib.rs
@@ -131,7 +131,7 @@ deno_core::extension!(
http_next::op_http_close,
http_next::op_http_cancel,
],
- esm = ["00_serve.js", "01_http.js", "02_websocket.ts"],
+ esm = ["00_serve.ts", "01_http.js", "02_websocket.ts"],
);
pub enum HttpSocketAddr {
diff --git a/ext/http/response_body.rs b/ext/http/response_body.rs
index dac708b96..6b033ffe0 100644
--- a/ext/http/response_body.rs
+++ b/ext/http/response_body.rs
@@ -16,7 +16,6 @@ use deno_core::Resource;
use flate2::write::GzEncoder;
use hyper::body::Frame;
use hyper::body::SizeHint;
-use hyper::header::HeaderMap;
use pin_project::pin_project;
/// Simplification for nested types we use for our streams. We provide a way to convert from
@@ -30,10 +29,6 @@ pub enum ResponseStreamResult {
/// not register a waker and should be called again at the lowest level of this code. Generally this
/// will only be returned from compression streams that require additional buffering.
NoData,
- /// Stream provided trailers.
- // TODO(mmastrac): We are threading trailers through the response system to eventually support Grpc.
- #[allow(unused)]
- Trailers(HeaderMap),
/// Stream failed.
Error(AnyError),
}
@@ -44,7 +39,6 @@ impl From<ResponseStreamResult> for Option<Result<Frame<BufView>, AnyError>> {
ResponseStreamResult::EndOfStream => None,
ResponseStreamResult::NonEmptyBuf(buf) => Some(Ok(Frame::data(buf))),
ResponseStreamResult::Error(err) => Some(Err(err)),
- ResponseStreamResult::Trailers(map) => Some(Ok(Frame::trailers(map))),
// This result should be handled by retrying
ResponseStreamResult::NoData => unimplemented!(),
}
@@ -198,6 +192,11 @@ impl ResponseBytesInner {
_ => Self::Bytes(BufView::from(vec)),
}
}
+
+ /// Did we complete this response successfully?
+ pub fn is_complete(&self) -> bool {
+ matches!(self, ResponseBytesInner::Done | ResponseBytesInner::Empty)
+ }
}
pub struct ResourceBodyAdapter {
@@ -387,9 +386,7 @@ impl PollFrame for GZipResponseStream {
let start_out = stm.total_out();
let res = match frame {
// Short-circuit these and just return
- x @ (ResponseStreamResult::NoData
- | ResponseStreamResult::Error(..)
- | ResponseStreamResult::Trailers(..)) => {
+ x @ (ResponseStreamResult::NoData | ResponseStreamResult::Error(..)) => {
return std::task::Poll::Ready(x)
}
ResponseStreamResult::EndOfStream => {
diff --git a/ext/http/service.rs b/ext/http/service.rs
index 932575e37..f38fec4f4 100644
--- a/ext/http/service.rs
+++ b/ext/http/service.rs
@@ -482,12 +482,13 @@ impl HttpRecord {
HttpRecordReady(self)
}
- /// Resolves when response body has finished streaming.
- pub fn response_body_finished(&self) -> impl Future<Output = ()> + '_ {
+ /// Resolves when response body has finished streaming. Returns true if the
+ /// response completed.
+ pub fn response_body_finished(&self) -> impl Future<Output = bool> + '_ {
struct HttpRecordFinished<'a>(&'a HttpRecord);
impl<'a> Future for HttpRecordFinished<'a> {
- type Output = ();
+ type Output = bool;
fn poll(
self: Pin<&mut Self>,
@@ -495,7 +496,10 @@ impl HttpRecord {
) -> Poll<Self::Output> {
let mut mut_self = self.0.self_mut();
if mut_self.response_body_finished {
- return Poll::Ready(());
+ // If we sent the response body and the trailers, this body completed successfully
+ return Poll::Ready(
+ mut_self.response_body.is_complete() && mut_self.trailers.is_none(),
+ );
}
mut_self.response_body_waker = Some(cx.waker().clone());
Poll::Pending
diff --git a/ext/node/polyfills/http.ts b/ext/node/polyfills/http.ts
index ceaf7aeb8..07ef66146 100644
--- a/ext/node/polyfills/http.ts
+++ b/ext/node/polyfills/http.ts
@@ -59,7 +59,7 @@ import {
ERR_UNESCAPED_CHARACTERS,
} from "ext:deno_node/internal/errors.ts";
import { getTimerDuration } from "ext:deno_node/internal/timers.mjs";
-import { serve, upgradeHttpRaw } from "ext:deno_http/00_serve.js";
+import { serve, upgradeHttpRaw } from "ext:deno_http/00_serve.ts";
import { createHttpClient } from "ext:deno_fetch/22_http_client.js";
import { headersEntries } from "ext:deno_fetch/20_headers.js";
import { timerId } from "ext:deno_web/03_abort_signal.js";
diff --git a/ext/node/polyfills/http2.ts b/ext/node/polyfills/http2.ts
index 023b6acd3..02e66e3da 100644
--- a/ext/node/polyfills/http2.ts
+++ b/ext/node/polyfills/http2.ts
@@ -36,7 +36,7 @@ import {
} from "ext:deno_node/internal/stream_base_commons.ts";
import { FileHandle } from "node:fs/promises";
import { kStreamBaseField } from "ext:deno_node/internal_binding/stream_wrap.ts";
-import { serveHttpOnConnection } from "ext:deno_http/00_serve.js";
+import { serveHttpOnConnection } from "ext:deno_http/00_serve.ts";
import { nextTick } from "ext:deno_node/_next_tick.ts";
import { TextEncoder } from "ext:deno_web/08_text_encoding.js";
import { Duplex } from "node:stream";
diff --git a/runtime/js/90_deno_ns.js b/runtime/js/90_deno_ns.js
index 96799cb09..02ac7b602 100644
--- a/runtime/js/90_deno_ns.js
+++ b/runtime/js/90_deno_ns.js
@@ -13,7 +13,7 @@ import * as console from "ext:deno_console/01_console.js";
import * as ffi from "ext:deno_ffi/00_ffi.js";
import * as net from "ext:deno_net/01_net.js";
import * as tls from "ext:deno_net/02_tls.js";
-import * as serve from "ext:deno_http/00_serve.js";
+import * as serve from "ext:deno_http/00_serve.ts";
import * as http from "ext:deno_http/01_http.js";
import * as websocket from "ext:deno_http/02_websocket.ts";
import * as errors from "ext:runtime/01_errors.js";
diff --git a/tests/unit/serve_test.ts b/tests/unit/serve_test.ts
index 048529ae9..32d05056a 100644
--- a/tests/unit/serve_test.ts
+++ b/tests/unit/serve_test.ts
@@ -2843,7 +2843,20 @@ Deno.test(
async function httpServerCancelFetch() {
const request2 = Promise.withResolvers<void>();
const request2Aborted = Promise.withResolvers<string>();
- const { finished, abort } = await makeServer(async (req) => {
+ let completed = 0;
+ let aborted = 0;
+ const { finished, abort } = await makeServer(async (req, context) => {
+ context.completed.then(() => {
+ console.log("completed");
+ completed++;
+ }).catch(() => {
+ console.log("completed (error)");
+ completed++;
+ });
+ req.signal.onabort = () => {
+ console.log("aborted", req.url);
+ aborted++;
+ };
if (req.url.endsWith("/1")) {
const fetchRecursive = await fetch(`http://localhost:${servePort}/2`);
return new Response(fetchRecursive.body);
@@ -2871,6 +2884,8 @@ Deno.test(
abort();
await finished;
+ assertEquals(completed, 2);
+ assertEquals(aborted, 2);
},
);
diff --git a/tools/core_import_map.json b/tools/core_import_map.json
index 463095de8..421769e52 100644
--- a/tools/core_import_map.json
+++ b/tools/core_import_map.json
@@ -15,7 +15,7 @@
"ext:deno_fetch/26_fetch.js": "../ext/fetch/26_fetch.js",
"ext:deno_ffi/00_ffi.js": "../ext/ffi/00_ffi.js",
"ext:deno_fs/30_fs.js": "../ext/fs/30_fs.js",
- "ext:deno_http/00_serve.js": "../ext/http/00_serve.js",
+ "ext:deno_http/00_serve.ts": "../ext/http/00_serve.ts",
"ext:deno_http/01_http.js": "../ext/http/01_http.js",
"ext:deno_io/12_io.js": "../ext/io/12_io.js",
"ext:deno_kv/01_db.ts": "../ext/kv/01_db.ts",