diff options
Diffstat (limited to 'runtime/ops/signal.rs')
-rw-r--r-- | runtime/ops/signal.rs | 74 |
1 files changed, 44 insertions, 30 deletions
diff --git a/runtime/ops/signal.rs b/runtime/ops/signal.rs index be6bc0a35..b3891792c 100644 --- a/runtime/ops/signal.rs +++ b/runtime/ops/signal.rs @@ -11,15 +11,23 @@ use std::rc::Rc; #[cfg(unix)] use deno_core::error::bad_resource_id; #[cfg(unix)] -use deno_core::futures::future::poll_fn; -#[cfg(unix)] use deno_core::serde_json; #[cfg(unix)] use deno_core::serde_json::json; #[cfg(unix)] +use deno_core::AsyncRefCell; +#[cfg(unix)] +use deno_core::CancelFuture; +#[cfg(unix)] +use deno_core::CancelHandle; +#[cfg(unix)] +use deno_core::RcRef; +#[cfg(unix)] +use deno_core::Resource; +#[cfg(unix)] use serde::Deserialize; #[cfg(unix)] -use std::task::Waker; +use std::borrow::Cow; #[cfg(unix)] use tokio::signal::unix::{signal, Signal, SignalKind}; @@ -32,7 +40,21 @@ pub fn init(rt: &mut deno_core::JsRuntime) { #[cfg(unix)] /// The resource for signal stream. /// The second element is the waker of polling future. -pub struct SignalStreamResource(pub Signal, pub Option<Waker>); +struct SignalStreamResource { + signal: AsyncRefCell<Signal>, + cancel: CancelHandle, +} + +#[cfg(unix)] +impl Resource for SignalStreamResource { + fn name(&self) -> Cow<str> { + "signal".into() + } + + fn close(self: Rc<Self>) { + self.cancel.cancel(); + } +} #[cfg(unix)] #[derive(Deserialize)] @@ -54,13 +76,13 @@ fn op_signal_bind( ) -> Result<Value, AnyError> { super::check_unstable(state, "Deno.signal"); let args: BindSignalArgs = serde_json::from_value(args)?; - let rid = state.resource_table.add( - "signal", - Box::new(SignalStreamResource( + let resource = SignalStreamResource { + signal: AsyncRefCell::new( signal(SignalKind::from_raw(args.signo)).expect(""), - None, - )), - ); + ), + cancel: Default::default(), + }; + let rid = state.resource_table.add(resource); Ok(json!({ "rid": rid, })) @@ -76,18 +98,18 @@ async fn op_signal_poll( let args: SignalArgs = serde_json::from_value(args)?; let rid = args.rid as u32; - let future = poll_fn(move |cx| { - let mut state = state.borrow_mut(); - if let Some(mut signal) = - state.resource_table.get_mut::<SignalStreamResource>(rid) - { - signal.1 = Some(cx.waker().clone()); - return signal.0.poll_recv(cx); - } - std::task::Poll::Ready(None) - }); - let result = future.await; - Ok(json!({ "done": result.is_none() })) + let resource = state + .borrow_mut() + .resource_table + .get::<SignalStreamResource>(rid) + .ok_or_else(bad_resource_id)?; + let cancel = RcRef::map(&resource, |r| &r.cancel); + let mut signal = RcRef::map(&resource, |r| &r.signal).borrow_mut().await; + + match signal.recv().or_cancel(cancel).await { + Ok(result) => Ok(json!({ "done": result.is_none() })), + Err(_) => Ok(json!({ "done": true })), + } } #[cfg(unix)] @@ -99,14 +121,6 @@ pub fn op_signal_unbind( super::check_unstable(state, "Deno.signal"); let args: SignalArgs = serde_json::from_value(args)?; let rid = args.rid as u32; - let resource = state.resource_table.get_mut::<SignalStreamResource>(rid); - if let Some(signal) = resource { - if let Some(waker) = &signal.1 { - // Wakes up the pending poll if exists. - // This prevents the poll future from getting stuck forever. - waker.clone().wake(); - } - } state .resource_table .close(rid) |