summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--cli/tests/integration/watcher_tests.rs40
-rw-r--r--cli/tests/unit/serve_test.ts3
-rw-r--r--cli/util/file_watcher.rs7
-rw-r--r--ext/http/http_next.rs73
4 files changed, 88 insertions, 35 deletions
diff --git a/cli/tests/integration/watcher_tests.rs b/cli/tests/integration/watcher_tests.rs
index 04320060b..2d41a74ed 100644
--- a/cli/tests/integration/watcher_tests.rs
+++ b/cli/tests/integration/watcher_tests.rs
@@ -1371,6 +1371,46 @@ async fn run_watch_reload_once() {
check_alive_then_kill(child);
}
+/// Regression test for https://github.com/denoland/deno/issues/18960. Ensures that Deno.serve
+/// operates properly after a watch restart.
+#[tokio::test]
+async fn test_watch_serve() {
+ let t = TempDir::new();
+ let file_to_watch = t.path().join("file_to_watch.js");
+ let file_content = r#"
+ console.error("serving");
+ await Deno.serve({port: 4600, handler: () => new Response("hello")});
+ "#;
+ write(&file_to_watch, file_content).unwrap();
+
+ let mut child = util::deno_cmd()
+ .current_dir(util::testdata_path())
+ .arg("run")
+ .arg("--watch")
+ .arg("--unstable")
+ .arg("--allow-net")
+ .arg("-L")
+ .arg("debug")
+ .arg(&file_to_watch)
+ .env("NO_COLOR", "1")
+ .stdout(std::process::Stdio::piped())
+ .stderr(std::process::Stdio::piped())
+ .spawn()
+ .unwrap();
+ let (mut stdout_lines, mut stderr_lines) = child_lines(&mut child);
+
+ wait_contains("Listening on", &mut stdout_lines).await;
+ // Note that we start serving very quickly, so we specifically want to wait for this message
+ wait_contains(r#"Watching paths: [""#, &mut stderr_lines).await;
+
+ write(&file_to_watch, file_content).unwrap();
+
+ wait_contains("serving", &mut stderr_lines).await;
+ wait_contains("Listening on", &mut stdout_lines).await;
+
+ check_alive_then_kill(child);
+}
+
#[tokio::test]
async fn run_watch_dynamic_imports() {
let t = TempDir::new();
diff --git a/cli/tests/unit/serve_test.ts b/cli/tests/unit/serve_test.ts
index 5d5d0428f..ce7267f58 100644
--- a/cli/tests/unit/serve_test.ts
+++ b/cli/tests/unit/serve_test.ts
@@ -94,8 +94,9 @@ Deno.test(async function httpServerRejectsOnAddrInUse() {
onListen: onListen(listeningPromise),
onError: createOnErrorCb(ac),
});
+ await listeningPromise;
- assertRejects(
+ await assertRejects(
() =>
Deno.serve({
handler: (_req) => new Response("ok"),
diff --git a/cli/util/file_watcher.rs b/cli/util/file_watcher.rs
index 05415f2a6..1ad5e9ba0 100644
--- a/cli/util/file_watcher.rs
+++ b/cli/util/file_watcher.rs
@@ -304,6 +304,13 @@ where
}
loop {
+ // We may need to give the runtime a tick to settle, as cancellations may need to propagate
+ // to tasks. We choose yielding 10 times to the runtime as a decent heuristic. If watch tests
+ // start to fail, this may need to be increased.
+ for _ in 0..10 {
+ tokio::task::yield_now().await;
+ }
+
let mut watcher = new_watcher(watcher_sender.clone())?;
consume_paths_to_watch(&mut watcher, &mut paths_to_watch_receiver);
diff --git a/ext/http/http_next.rs b/ext/http/http_next.rs
index 593a9c816..5ed443142 100644
--- a/ext/http/http_next.rs
+++ b/ext/http/http_next.rs
@@ -625,83 +625,80 @@ impl<F: Future<Output = ()>> Future for SlabFuture<F> {
fn serve_http11_unconditional(
io: impl HttpServeStream,
svc: impl HttpService<Incoming, ResBody = ResponseBytes> + 'static,
- cancel: RcRef<CancelHandle>,
) -> impl Future<Output = Result<(), AnyError>> + 'static {
let conn = http1::Builder::new()
.keep_alive(true)
.serve_connection(io, svc);
- conn
- .with_upgrades()
- .map_err(AnyError::from)
- .try_or_cancel(cancel)
+ conn.with_upgrades().map_err(AnyError::from)
}
fn serve_http2_unconditional(
io: impl HttpServeStream,
svc: impl HttpService<Incoming, ResBody = ResponseBytes> + 'static,
- cancel: RcRef<CancelHandle>,
) -> impl Future<Output = Result<(), AnyError>> + 'static {
let conn = http2::Builder::new(LocalExecutor).serve_connection(io, svc);
- conn.map_err(AnyError::from).try_or_cancel(cancel)
+ conn.map_err(AnyError::from)
}
async fn serve_http2_autodetect(
io: impl HttpServeStream,
svc: impl HttpService<Incoming, ResBody = ResponseBytes> + 'static,
- cancel: RcRef<CancelHandle>,
) -> Result<(), AnyError> {
let prefix = NetworkStreamPrefixCheck::new(io, HTTP2_PREFIX);
let (matches, io) = prefix.match_prefix().await?;
if matches {
- serve_http2_unconditional(io, svc, cancel).await
+ serve_http2_unconditional(io, svc).await
} else {
- serve_http11_unconditional(io, svc, cancel).await
+ serve_http11_unconditional(io, svc).await
}
}
fn serve_https(
mut io: TlsStream,
request_info: HttpConnectionProperties,
- cancel: RcRef<CancelHandle>,
+ cancel: Rc<CancelHandle>,
tx: tokio::sync::mpsc::Sender<u32>,
) -> JoinHandle<Result<(), AnyError>> {
// TODO(mmastrac): This is faster if we can use tokio::spawn but then the send bounds get us
let svc = service_fn(move |req: Request| {
new_slab_future(req, request_info.clone(), tx.clone())
});
- spawn_local(async {
- io.handshake().await?;
- // If the client specifically negotiates a protocol, we will use it. If not, we'll auto-detect
- // based on the prefix bytes
- let handshake = io.get_ref().1.alpn_protocol();
- if handshake == Some(TLS_ALPN_HTTP_2) {
- serve_http2_unconditional(io, svc, cancel).await
- } else if handshake == Some(TLS_ALPN_HTTP_11) {
- serve_http11_unconditional(io, svc, cancel).await
- } else {
- serve_http2_autodetect(io, svc, cancel).await
+ spawn_local(
+ async {
+ io.handshake().await?;
+ // If the client specifically negotiates a protocol, we will use it. If not, we'll auto-detect
+ // based on the prefix bytes
+ let handshake = io.get_ref().1.alpn_protocol();
+ if handshake == Some(TLS_ALPN_HTTP_2) {
+ serve_http2_unconditional(io, svc).await
+ } else if handshake == Some(TLS_ALPN_HTTP_11) {
+ serve_http11_unconditional(io, svc).await
+ } else {
+ serve_http2_autodetect(io, svc).await
+ }
}
- })
+ .try_or_cancel(cancel),
+ )
}
fn serve_http(
io: impl HttpServeStream,
request_info: HttpConnectionProperties,
- cancel: RcRef<CancelHandle>,
+ cancel: Rc<CancelHandle>,
tx: tokio::sync::mpsc::Sender<u32>,
) -> JoinHandle<Result<(), AnyError>> {
// TODO(mmastrac): This is faster if we can use tokio::spawn but then the send bounds get us
let svc = service_fn(move |req: Request| {
new_slab_future(req, request_info.clone(), tx.clone())
});
- spawn_local(serve_http2_autodetect(io, svc, cancel))
+ spawn_local(serve_http2_autodetect(io, svc).try_or_cancel(cancel))
}
fn serve_http_on(
network_stream: NetworkStream,
listen_properties: &HttpListenProperties,
- cancel: RcRef<CancelHandle>,
+ cancel: Rc<CancelHandle>,
tx: tokio::sync::mpsc::Sender<u32>,
) -> JoinHandle<Result<(), AnyError>> {
// We always want some sort of peer address. If we can't get one, just make up one.
@@ -733,13 +730,14 @@ fn serve_http_on(
struct HttpJoinHandle(
AsyncRefCell<Option<JoinHandle<Result<(), AnyError>>>>,
- CancelHandle,
+ // Cancel handle must live in a separate Rc to avoid keeping the outer join handle ref'd
+ Rc<CancelHandle>,
AsyncRefCell<tokio::sync::mpsc::Receiver<u32>>,
);
impl HttpJoinHandle {
- fn cancel_handle(self: &Rc<Self>) -> RcRef<CancelHandle> {
- RcRef::map(self, |this| &this.1)
+ fn cancel_handle(self: &Rc<Self>) -> Rc<CancelHandle> {
+ self.1.clone()
}
}
@@ -753,6 +751,13 @@ impl Resource for HttpJoinHandle {
}
}
+impl Drop for HttpJoinHandle {
+ fn drop(&mut self) {
+ // In some cases we may be dropped without closing, so let's cancel everything on the way out
+ self.1.cancel();
+ }
+}
+
#[op(v8)]
pub fn op_serve_http(
state: Rc<RefCell<OpState>>,
@@ -773,12 +778,12 @@ pub fn op_serve_http(
let (tx, rx) = tokio::sync::mpsc::channel(10);
let resource: Rc<HttpJoinHandle> = Rc::new(HttpJoinHandle(
AsyncRefCell::new(None),
- CancelHandle::new(),
+ CancelHandle::new_rc(),
AsyncRefCell::new(rx),
));
let cancel_clone = resource.cancel_handle();
- let listen_properties_clone = listen_properties.clone();
+ let listen_properties_clone: HttpListenProperties = listen_properties.clone();
let handle = spawn_local(async move {
loop {
let conn = listener
@@ -813,7 +818,7 @@ pub fn op_serve_http_on(
state: Rc<RefCell<OpState>>,
conn: ResourceId,
) -> Result<(ResourceId, &'static str, String), AnyError> {
- let network_stream =
+ let network_stream: NetworkStream =
DefaultHttpRequestProperties::get_network_stream_for_rid(
&mut state.borrow_mut(),
conn,
@@ -828,7 +833,7 @@ pub fn op_serve_http_on(
let (tx, rx) = tokio::sync::mpsc::channel(10);
let resource: Rc<HttpJoinHandle> = Rc::new(HttpJoinHandle(
AsyncRefCell::new(None),
- CancelHandle::new(),
+ CancelHandle::new_rc(),
AsyncRefCell::new(rx),
));
@@ -862,7 +867,7 @@ pub async fn op_http_wait(
.resource_table
.get::<HttpJoinHandle>(rid)?;
- let cancel = join_handle.clone().cancel_handle();
+ let cancel = join_handle.cancel_handle();
let next = async {
let mut recv = RcRef::map(&join_handle, |this| &this.2).borrow_mut().await;
recv.recv().await