summaryrefslogtreecommitdiff
path: root/runtime/ops/signal.rs
diff options
context:
space:
mode:
Diffstat (limited to 'runtime/ops/signal.rs')
-rw-r--r--runtime/ops/signal.rs74
1 files changed, 44 insertions, 30 deletions
diff --git a/runtime/ops/signal.rs b/runtime/ops/signal.rs
index be6bc0a35..b3891792c 100644
--- a/runtime/ops/signal.rs
+++ b/runtime/ops/signal.rs
@@ -11,15 +11,23 @@ use std::rc::Rc;
#[cfg(unix)]
use deno_core::error::bad_resource_id;
#[cfg(unix)]
-use deno_core::futures::future::poll_fn;
-#[cfg(unix)]
use deno_core::serde_json;
#[cfg(unix)]
use deno_core::serde_json::json;
#[cfg(unix)]
+use deno_core::AsyncRefCell;
+#[cfg(unix)]
+use deno_core::CancelFuture;
+#[cfg(unix)]
+use deno_core::CancelHandle;
+#[cfg(unix)]
+use deno_core::RcRef;
+#[cfg(unix)]
+use deno_core::Resource;
+#[cfg(unix)]
use serde::Deserialize;
#[cfg(unix)]
-use std::task::Waker;
+use std::borrow::Cow;
#[cfg(unix)]
use tokio::signal::unix::{signal, Signal, SignalKind};
@@ -32,7 +40,21 @@ pub fn init(rt: &mut deno_core::JsRuntime) {
#[cfg(unix)]
/// The resource for signal stream.
/// The second element is the waker of polling future.
-pub struct SignalStreamResource(pub Signal, pub Option<Waker>);
+struct SignalStreamResource {
+ signal: AsyncRefCell<Signal>,
+ cancel: CancelHandle,
+}
+
+#[cfg(unix)]
+impl Resource for SignalStreamResource {
+ fn name(&self) -> Cow<str> {
+ "signal".into()
+ }
+
+ fn close(self: Rc<Self>) {
+ self.cancel.cancel();
+ }
+}
#[cfg(unix)]
#[derive(Deserialize)]
@@ -54,13 +76,13 @@ fn op_signal_bind(
) -> Result<Value, AnyError> {
super::check_unstable(state, "Deno.signal");
let args: BindSignalArgs = serde_json::from_value(args)?;
- let rid = state.resource_table.add(
- "signal",
- Box::new(SignalStreamResource(
+ let resource = SignalStreamResource {
+ signal: AsyncRefCell::new(
signal(SignalKind::from_raw(args.signo)).expect(""),
- None,
- )),
- );
+ ),
+ cancel: Default::default(),
+ };
+ let rid = state.resource_table.add(resource);
Ok(json!({
"rid": rid,
}))
@@ -76,18 +98,18 @@ async fn op_signal_poll(
let args: SignalArgs = serde_json::from_value(args)?;
let rid = args.rid as u32;
- let future = poll_fn(move |cx| {
- let mut state = state.borrow_mut();
- if let Some(mut signal) =
- state.resource_table.get_mut::<SignalStreamResource>(rid)
- {
- signal.1 = Some(cx.waker().clone());
- return signal.0.poll_recv(cx);
- }
- std::task::Poll::Ready(None)
- });
- let result = future.await;
- Ok(json!({ "done": result.is_none() }))
+ let resource = state
+ .borrow_mut()
+ .resource_table
+ .get::<SignalStreamResource>(rid)
+ .ok_or_else(bad_resource_id)?;
+ let cancel = RcRef::map(&resource, |r| &r.cancel);
+ let mut signal = RcRef::map(&resource, |r| &r.signal).borrow_mut().await;
+
+ match signal.recv().or_cancel(cancel).await {
+ Ok(result) => Ok(json!({ "done": result.is_none() })),
+ Err(_) => Ok(json!({ "done": true })),
+ }
}
#[cfg(unix)]
@@ -99,14 +121,6 @@ pub fn op_signal_unbind(
super::check_unstable(state, "Deno.signal");
let args: SignalArgs = serde_json::from_value(args)?;
let rid = args.rid as u32;
- let resource = state.resource_table.get_mut::<SignalStreamResource>(rid);
- if let Some(signal) = resource {
- if let Some(waker) = &signal.1 {
- // Wakes up the pending poll if exists.
- // This prevents the poll future from getting stuck forever.
- waker.clone().wake();
- }
- }
state
.resource_table
.close(rid)