summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--cli/tests/integration/run_tests.rs6
-rw-r--r--cli/tests/integration/watcher_tests.rs65
-rw-r--r--cli/tests/testdata/run/flash_shutdown/main.ts23
-rw-r--r--cli/tests/unit/flash_test.ts1
-rw-r--r--ext/flash/01_http.js26
-rw-r--r--ext/flash/lib.rs248
6 files changed, 145 insertions, 224 deletions
diff --git a/cli/tests/integration/run_tests.rs b/cli/tests/integration/run_tests.rs
index fd60e85b7..267106913 100644
--- a/cli/tests/integration/run_tests.rs
+++ b/cli/tests/integration/run_tests.rs
@@ -3642,3 +3642,9 @@ itest!(no_lock_flag {
http_server: true,
exit_code: 0,
});
+
+// Check https://github.com/denoland/deno_std/issues/2882
+itest!(flash_shutdown {
+ args: "run --unstable --allow-net run/flash_shutdown/main.ts",
+ exit_code: 0,
+});
diff --git a/cli/tests/integration/watcher_tests.rs b/cli/tests/integration/watcher_tests.rs
index 6e3319b11..58f7e11fa 100644
--- a/cli/tests/integration/watcher_tests.rs
+++ b/cli/tests/integration/watcher_tests.rs
@@ -1167,68 +1167,3 @@ fn run_watch_dynamic_imports() {
check_alive_then_kill(child);
}
-
-// https://github.com/denoland/deno/issues/16267
-#[test]
-fn run_watch_flash() {
- let filename = "watch_flash.js";
- let t = TempDir::new();
- let file_to_watch = t.path().join(filename);
- write(
- &file_to_watch,
- r#"
- console.log("Starting flash server...");
- Deno.serve({
- onListen() {
- console.error("First server is listening");
- },
- handler: () => {},
- port: 4601,
- });
- "#,
- )
- .unwrap();
-
- let mut child = util::deno_cmd()
- .current_dir(t.path())
- .arg("run")
- .arg("--watch")
- .arg("--unstable")
- .arg("--allow-net")
- .arg("-L")
- .arg("debug")
- .arg(&file_to_watch)
- .env("NO_COLOR", "1")
- .stdout(std::process::Stdio::piped())
- .stderr(std::process::Stdio::piped())
- .spawn()
- .unwrap();
- let (mut stdout_lines, mut stderr_lines) = child_lines(&mut child);
-
- wait_contains("Starting flash server...", &mut stdout_lines);
- wait_for(
- |m| m.contains("Watching paths") && m.contains(filename),
- &mut stderr_lines,
- );
-
- write(
- &file_to_watch,
- r#"
- console.log("Restarting flash server...");
- Deno.serve({
- onListen() {
- console.error("Second server is listening");
- },
- handler: () => {},
- port: 4601,
- });
- "#,
- )
- .unwrap();
-
- wait_contains("File change detected! Restarting!", &mut stderr_lines);
- wait_contains("Restarting flash server...", &mut stdout_lines);
- wait_contains("Second server is listening", &mut stderr_lines);
-
- check_alive_then_kill(child);
-}
diff --git a/cli/tests/testdata/run/flash_shutdown/main.ts b/cli/tests/testdata/run/flash_shutdown/main.ts
new file mode 100644
index 000000000..7f6985e34
--- /dev/null
+++ b/cli/tests/testdata/run/flash_shutdown/main.ts
@@ -0,0 +1,23 @@
+// Copyright 2018-2022 the Deno authors. All rights reserved. MIT license.
+
+// Deno.serve caused segfault with this example after #16383
+// refs:
+// - https://github.com/denoland/deno/pull/16383
+// - https://github.com/denoland/deno_std/issues/2882
+// - revert https://github.com/denoland/deno/pull/16610
+
+const ctl = new AbortController();
+Deno.serve(() =>
+ new Promise((resolve) => {
+ resolve(new Response(new TextEncoder().encode("ok")));
+ ctl.abort();
+ }), {
+ signal: ctl.signal,
+ async onListen({ port }) {
+ const a = await fetch(`http://localhost:${port}`, {
+ method: "POST",
+ body: "",
+ });
+ await a.text();
+ },
+});
diff --git a/cli/tests/unit/flash_test.ts b/cli/tests/unit/flash_test.ts
index c64a7fe5a..024069455 100644
--- a/cli/tests/unit/flash_test.ts
+++ b/cli/tests/unit/flash_test.ts
@@ -70,7 +70,6 @@ Deno.test(async function httpServerRejectsOnAddrInUse() {
onError: createOnErrorCb(ac),
});
- await listeningPromise;
assertRejects(
() =>
Deno.serve({
diff --git a/ext/flash/01_http.js b/ext/flash/01_http.js
index 8eed6047e..7a6b9bc47 100644
--- a/ext/flash/01_http.js
+++ b/ext/flash/01_http.js
@@ -188,8 +188,8 @@
return str;
}
- function prepareFastCalls(serverId) {
- return core.ops.op_flash_make_request(serverId);
+ function prepareFastCalls() {
+ return core.ops.op_flash_make_request();
}
function hostnameForDisplay(hostname) {
@@ -482,11 +482,15 @@
const serverId = opFn(listenOpts);
const serverPromise = core.opAsync("op_flash_drive_server", serverId);
- const listenPromise = PromisePrototypeThen(
- core.opAsync("op_flash_wait_for_listening", serverId),
- (port) => {
- onListen({ hostname: listenOpts.hostname, port });
- },
+
+ PromisePrototypeCatch(
+ PromisePrototypeThen(
+ core.opAsync("op_flash_wait_for_listening", serverId),
+ (port) => {
+ onListen({ hostname: listenOpts.hostname, port });
+ },
+ ),
+ () => {},
);
const finishedPromise = PromisePrototypeCatch(serverPromise, () => {});
@@ -502,7 +506,7 @@
return;
}
server.closed = true;
- core.ops.op_flash_close_server(serverId);
+ await core.opAsync("op_flash_close_server", serverId);
await server.finished;
},
async serve() {
@@ -614,7 +618,7 @@
signal?.addEventListener("abort", () => {
clearInterval(dateInterval);
- server.close();
+ PromisePrototypeThen(server.close(), () => {}, () => {});
}, {
once: true,
});
@@ -629,7 +633,7 @@
);
}
- const fastOp = prepareFastCalls(serverId);
+ const fastOp = prepareFastCalls();
let nextRequestSync = () => fastOp.nextRequest();
let getMethodSync = (token) => fastOp.getMethod(token);
let respondFast = (token, response, shutdown) =>
@@ -649,8 +653,8 @@
}
await SafePromiseAll([
- listenPromise,
PromisePrototypeCatch(server.serve(), console.error),
+ serverPromise,
]);
};
}
diff --git a/ext/flash/lib.rs b/ext/flash/lib.rs
index 41d88b3a0..b077b8d21 100644
--- a/ext/flash/lib.rs
+++ b/ext/flash/lib.rs
@@ -35,7 +35,6 @@ use mio::Events;
use mio::Interest;
use mio::Poll;
use mio::Token;
-use mio::Waker;
use serde::Deserialize;
use serde::Serialize;
use socket2::Socket;
@@ -56,6 +55,7 @@ use std::rc::Rc;
use std::sync::Arc;
use std::sync::Mutex;
use std::task::Context;
+use std::time::Duration;
use tokio::sync::mpsc;
use tokio::task::JoinHandle;
@@ -76,24 +76,15 @@ pub struct FlashContext {
pub servers: HashMap<u32, ServerContext>,
}
-impl Drop for FlashContext {
- fn drop(&mut self) {
- // Signal each server instance to shutdown.
- for (_, server) in self.servers.drain() {
- let _ = server.waker.wake();
- }
- }
-}
-
pub struct ServerContext {
_addr: SocketAddr,
tx: mpsc::Sender<Request>,
- rx: Option<mpsc::Receiver<Request>>,
+ rx: mpsc::Receiver<Request>,
requests: HashMap<u32, Request>,
next_token: u32,
- listening_rx: Option<mpsc::Receiver<Result<u16, std::io::Error>>>,
+ listening_rx: Option<mpsc::Receiver<u16>>,
+ close_tx: mpsc::Sender<()>,
cancel_handle: Rc<CancelHandle>,
- waker: Arc<Waker>,
}
#[derive(Debug, Eq, PartialEq)]
@@ -111,10 +102,7 @@ fn op_flash_respond(
shutdown: bool,
) -> u32 {
let flash_ctx = op_state.borrow_mut::<FlashContext>();
- let ctx = match flash_ctx.servers.get_mut(&server_id) {
- Some(ctx) => ctx,
- None => return 0,
- };
+ let ctx = flash_ctx.servers.get_mut(&server_id).unwrap();
flash_respond(ctx, token, shutdown, &response)
}
@@ -132,10 +120,7 @@ async fn op_flash_respond_async(
let sock = {
let mut op_state = state.borrow_mut();
let flash_ctx = op_state.borrow_mut::<FlashContext>();
- let ctx = match flash_ctx.servers.get_mut(&server_id) {
- Some(ctx) => ctx,
- None => return Ok(()),
- };
+ let ctx = flash_ctx.servers.get_mut(&server_id).unwrap();
match shutdown {
true => {
@@ -399,30 +384,15 @@ fn op_flash_method(state: &mut OpState, server_id: u32, token: u32) -> u32 {
}
#[op]
-fn op_flash_drive_server(
- state: &mut OpState,
- server_id: u32,
-) -> Result<impl Future<Output = Result<(), AnyError>> + 'static, AnyError> {
- let join_handle = {
- let flash_ctx = state.borrow_mut::<FlashContext>();
- flash_ctx
- .join_handles
- .remove(&server_id)
- .ok_or_else(|| type_error("server not found"))?
+async fn op_flash_close_server(state: Rc<RefCell<OpState>>, server_id: u32) {
+ let close_tx = {
+ let mut op_state = state.borrow_mut();
+ let flash_ctx = op_state.borrow_mut::<FlashContext>();
+ let ctx = flash_ctx.servers.get_mut(&server_id).unwrap();
+ ctx.cancel_handle.cancel();
+ ctx.close_tx.clone()
};
- Ok(async move {
- join_handle
- .await
- .map_err(|_| type_error("server join error"))??;
- Ok(())
- })
-}
-
-#[op]
-fn op_flash_close_server(state: &mut OpState, server_id: u32) {
- let flash_ctx = state.borrow_mut::<FlashContext>();
- let ctx = flash_ctx.servers.remove(&server_id).unwrap();
- let _ = ctx.waker.wake();
+ let _ = close_tx.send(()).await;
}
#[op]
@@ -449,7 +419,7 @@ fn op_flash_path(
fn next_request_sync(ctx: &mut ServerContext) -> u32 {
let offset = ctx.next_token;
- while let Ok(token) = ctx.rx.as_mut().unwrap().try_recv() {
+ while let Ok(token) = ctx.rx.try_recv() {
ctx.requests.insert(ctx.next_token, token);
ctx.next_token += 1;
}
@@ -512,7 +482,6 @@ unsafe fn op_flash_get_method_fast(
fn op_flash_make_request<'scope>(
scope: &mut v8::HandleScope<'scope>,
state: &mut OpState,
- server_id: u32,
) -> serde_v8::Value<'scope> {
let object_template = v8::ObjectTemplate::new(scope);
assert!(object_template
@@ -520,7 +489,7 @@ fn op_flash_make_request<'scope>(
let obj = object_template.new_instance(scope).unwrap();
let ctx = {
let flash_ctx = state.borrow_mut::<FlashContext>();
- let ctx = flash_ctx.servers.get_mut(&server_id).unwrap();
+ let ctx = flash_ctx.servers.get_mut(&0).unwrap();
ctx as *mut ServerContext
};
obj.set_aligned_pointer_in_internal_field(V8_WRAPPER_OBJECT_INDEX, ctx as _);
@@ -736,10 +705,7 @@ async fn op_flash_read_body(
{
let op_state = &mut state.borrow_mut();
let flash_ctx = op_state.borrow_mut::<FlashContext>();
- match flash_ctx.servers.get_mut(&server_id) {
- Some(ctx) => ctx as *mut ServerContext,
- None => return 0,
- }
+ flash_ctx.servers.get_mut(&server_id).unwrap() as *mut ServerContext
}
.as_mut()
.unwrap()
@@ -841,40 +807,41 @@ pub struct ListenOpts {
reuseport: bool,
}
-const SERVER_TOKEN: Token = Token(0);
-// Token reserved for the thread close signal.
-const WAKER_TOKEN: Token = Token(1);
-
-#[allow(clippy::too_many_arguments)]
fn run_server(
tx: mpsc::Sender<Request>,
- listening_tx: mpsc::Sender<Result<u16, std::io::Error>>,
+ listening_tx: mpsc::Sender<u16>,
+ mut close_rx: mpsc::Receiver<()>,
addr: SocketAddr,
maybe_cert: Option<String>,
maybe_key: Option<String>,
reuseport: bool,
- mut poll: Poll,
- // We put a waker as an unused argument here as it needs to be alive both in
- // the flash thread and in the main thread (otherwise the notification would
- // not be caught by the event loop on Linux).
- // See the comment in mio's example:
- // https://docs.rs/mio/0.8.4/x86_64-unknown-linux-gnu/mio/struct.Waker.html#examples
- _waker: Arc<Waker>,
) -> Result<(), AnyError> {
- let mut listener = match listen(addr, reuseport) {
- Ok(listener) => listener,
- Err(e) => {
- listening_tx.blocking_send(Err(e)).unwrap();
- return Err(generic_error(
- "failed to start listening on the specified address",
- ));
- }
+ let domain = if addr.is_ipv4() {
+ socket2::Domain::IPV4
+ } else {
+ socket2::Domain::IPV6
};
+ let socket = Socket::new(domain, socket2::Type::STREAM, None)?;
- // Register server.
+ #[cfg(not(windows))]
+ socket.set_reuse_address(true)?;
+ if reuseport {
+ #[cfg(target_os = "linux")]
+ socket.set_reuse_port(true)?;
+ }
+
+ let socket_addr = socket2::SockAddr::from(addr);
+ socket.bind(&socket_addr)?;
+ socket.listen(128)?;
+ socket.set_nonblocking(true)?;
+ let std_listener: std::net::TcpListener = socket.into();
+ let mut listener = TcpListener::from_std(std_listener);
+
+ let mut poll = Poll::new()?;
+ let token = Token(0);
poll
.registry()
- .register(&mut listener, SERVER_TOKEN, Interest::READABLE)
+ .register(&mut listener, token, Interest::READABLE)
.unwrap();
let tls_context: Option<Arc<rustls::ServerConfig>> = {
@@ -896,24 +863,30 @@ fn run_server(
};
listening_tx
- .blocking_send(Ok(listener.local_addr().unwrap().port()))
+ .blocking_send(listener.local_addr().unwrap().port())
.unwrap();
let mut sockets = HashMap::with_capacity(1000);
- let mut counter: usize = 2;
+ let mut counter: usize = 1;
let mut events = Events::with_capacity(1024);
'outer: loop {
- match poll.poll(&mut events, None) {
+ let result = close_rx.try_recv();
+ if result.is_ok() {
+ break 'outer;
+ }
+ // FIXME(bartlomieju): how does Tokio handle it? I just put random 100ms
+ // timeout here to handle close signal.
+ match poll.poll(&mut events, Some(Duration::from_millis(100))) {
Err(ref e) if e.kind() == std::io::ErrorKind::Interrupted => continue,
Err(e) => panic!("{}", e),
Ok(()) => (),
}
'events: for event in &events {
+ if close_rx.try_recv().is_ok() {
+ break 'outer;
+ }
let token = event.token();
match token {
- WAKER_TOKEN => {
- break 'outer;
- }
- SERVER_TOKEN => loop {
+ Token(0) => loop {
match listener.accept() {
Ok((mut socket, _)) => {
counter += 1;
@@ -1176,33 +1149,6 @@ fn run_server(
Ok(())
}
-#[inline]
-fn listen(
- addr: SocketAddr,
- reuseport: bool,
-) -> Result<TcpListener, std::io::Error> {
- let domain = if addr.is_ipv4() {
- socket2::Domain::IPV4
- } else {
- socket2::Domain::IPV6
- };
- let socket = Socket::new(domain, socket2::Type::STREAM, None)?;
-
- #[cfg(not(windows))]
- socket.set_reuse_address(true)?;
- if reuseport {
- #[cfg(target_os = "linux")]
- socket.set_reuse_port(true)?;
- }
-
- let socket_addr = socket2::SockAddr::from(addr);
- socket.bind(&socket_addr)?;
- socket.listen(128)?;
- socket.set_nonblocking(true)?;
- let std_listener: std::net::TcpListener = socket.into();
- Ok(TcpListener::from_std(std_listener))
-}
-
fn make_addr_port_pair(hostname: &str, port: u16) -> (&str, u16) {
// Default to localhost if given just the port. Example: ":80"
if hostname.is_empty() {
@@ -1240,19 +1186,17 @@ where
.next()
.ok_or_else(|| generic_error("No resolved address found"))?;
let (tx, rx) = mpsc::channel(100);
+ let (close_tx, close_rx) = mpsc::channel(1);
let (listening_tx, listening_rx) = mpsc::channel(1);
-
- let poll = Poll::new()?;
- let waker = Arc::new(Waker::new(poll.registry(), WAKER_TOKEN).unwrap());
let ctx = ServerContext {
_addr: addr,
tx,
- rx: Some(rx),
+ rx,
requests: HashMap::with_capacity(1000),
next_token: 0,
+ close_tx,
listening_rx: Some(listening_rx),
cancel_handle: CancelHandle::new_rc(),
- waker: waker.clone(),
};
let tx = ctx.tx.clone();
let maybe_cert = opts.cert;
@@ -1262,12 +1206,11 @@ where
run_server(
tx,
listening_tx,
+ close_rx,
addr,
maybe_cert,
maybe_key,
reuseport,
- poll,
- waker,
)
});
let flash_ctx = state.borrow_mut::<FlashContext>();
@@ -1302,26 +1245,45 @@ where
}
#[op]
-async fn op_flash_wait_for_listening(
- state: Rc<RefCell<OpState>>,
+fn op_flash_wait_for_listening(
+ state: &mut OpState,
server_id: u32,
-) -> Result<u16, AnyError> {
+) -> Result<impl Future<Output = Result<u16, AnyError>> + 'static, AnyError> {
let mut listening_rx = {
- let mut op_state = state.borrow_mut();
- let flash_ctx = op_state.borrow_mut::<FlashContext>();
+ let flash_ctx = state.borrow_mut::<FlashContext>();
let server_ctx = flash_ctx
.servers
.get_mut(&server_id)
.ok_or_else(|| type_error("server not found"))?;
server_ctx.listening_rx.take().unwrap()
};
- match listening_rx.recv().await {
- Some(Ok(port)) => Ok(port),
- Some(Err(e)) => Err(e.into()),
- _ => Err(generic_error(
- "unknown error occurred while waiting for listening",
- )),
- }
+ Ok(async move {
+ if let Some(port) = listening_rx.recv().await {
+ Ok(port)
+ } else {
+ Err(generic_error("This error will be discarded"))
+ }
+ })
+}
+
+#[op]
+fn op_flash_drive_server(
+ state: &mut OpState,
+ server_id: u32,
+) -> Result<impl Future<Output = Result<(), AnyError>> + 'static, AnyError> {
+ let join_handle = {
+ let flash_ctx = state.borrow_mut::<FlashContext>();
+ flash_ctx
+ .join_handles
+ .remove(&server_id)
+ .ok_or_else(|| type_error("server not found"))?
+ };
+ Ok(async move {
+ join_handle
+ .await
+ .map_err(|_| type_error("server join error"))??;
+ Ok(())
+ })
}
// Asychronous version of op_flash_next. This can be a bottleneck under
@@ -1329,34 +1291,26 @@ async fn op_flash_wait_for_listening(
// requests i.e `op_flash_next() == 0`.
#[op]
async fn op_flash_next_async(
- state: Rc<RefCell<OpState>>,
+ op_state: Rc<RefCell<OpState>>,
server_id: u32,
) -> u32 {
- let mut op_state = state.borrow_mut();
- let flash_ctx = op_state.borrow_mut::<FlashContext>();
- let ctx = flash_ctx.servers.get_mut(&server_id).unwrap();
- let cancel_handle = ctx.cancel_handle.clone();
- let mut rx = ctx.rx.take().unwrap();
- // We need to drop the borrow before await point.
- drop(op_state);
-
- if let Ok(Some(req)) = rx.recv().or_cancel(&cancel_handle).await {
- let mut op_state = state.borrow_mut();
+ let ctx = {
+ let mut op_state = op_state.borrow_mut();
let flash_ctx = op_state.borrow_mut::<FlashContext>();
let ctx = flash_ctx.servers.get_mut(&server_id).unwrap();
+ ctx as *mut ServerContext
+ };
+ // SAFETY: we cannot hold op_state borrow across the await point. The JS caller
+ // is responsible for ensuring this is not called concurrently.
+ let ctx = unsafe { &mut *ctx };
+ let cancel_handle = &ctx.cancel_handle;
+
+ if let Ok(Some(req)) = ctx.rx.recv().or_cancel(cancel_handle).await {
ctx.requests.insert(ctx.next_token, req);
ctx.next_token += 1;
- // Set the rx back.
- ctx.rx = Some(rx);
return 1;
}
- // Set the rx back.
- let mut op_state = state.borrow_mut();
- let flash_ctx = op_state.borrow_mut::<FlashContext>();
- if let Some(ctx) = flash_ctx.servers.get_mut(&server_id) {
- ctx.rx = Some(rx);
- }
0
}
@@ -1524,11 +1478,11 @@ pub fn init<P: FlashPermissions + 'static>(unstable: bool) -> Extension {
op_flash_next_async::decl(),
op_flash_read_body::decl(),
op_flash_upgrade_websocket::decl(),
+ op_flash_drive_server::decl(),
op_flash_wait_for_listening::decl(),
op_flash_first_packet::decl(),
op_flash_has_body_stream::decl(),
op_flash_close_server::decl(),
- op_flash_drive_server::decl(),
op_flash_make_request::decl(),
op_flash_write_resource::decl(),
])