diff options
author | Bartek IwaĆczuk <biwanczuk@gmail.com> | 2020-12-16 17:14:12 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-12-16 17:14:12 +0100 |
commit | 6984b63f2f3c8d0819fe2dced8252a81f3400ae7 (patch) | |
tree | 5201bc962f913927409ae2770aca48ffa3aaaa34 /runtime/ops/signal.rs | |
parent | 9fe26f8ca189ac81d9c20c454b9dbfa5e1011c3f (diff) |
refactor: rewrite ops to use ResourceTable2 (#8512)
This commit migrates all ops to use new resource table
and "AsyncRefCell".
Old implementation of resource table was completely
removed and all code referencing it was updated to use
new system.
Diffstat (limited to 'runtime/ops/signal.rs')
-rw-r--r-- | runtime/ops/signal.rs | 74 |
1 files changed, 44 insertions, 30 deletions
diff --git a/runtime/ops/signal.rs b/runtime/ops/signal.rs index be6bc0a35..b3891792c 100644 --- a/runtime/ops/signal.rs +++ b/runtime/ops/signal.rs @@ -11,15 +11,23 @@ use std::rc::Rc; #[cfg(unix)] use deno_core::error::bad_resource_id; #[cfg(unix)] -use deno_core::futures::future::poll_fn; -#[cfg(unix)] use deno_core::serde_json; #[cfg(unix)] use deno_core::serde_json::json; #[cfg(unix)] +use deno_core::AsyncRefCell; +#[cfg(unix)] +use deno_core::CancelFuture; +#[cfg(unix)] +use deno_core::CancelHandle; +#[cfg(unix)] +use deno_core::RcRef; +#[cfg(unix)] +use deno_core::Resource; +#[cfg(unix)] use serde::Deserialize; #[cfg(unix)] -use std::task::Waker; +use std::borrow::Cow; #[cfg(unix)] use tokio::signal::unix::{signal, Signal, SignalKind}; @@ -32,7 +40,21 @@ pub fn init(rt: &mut deno_core::JsRuntime) { #[cfg(unix)] /// The resource for signal stream. /// The second element is the waker of polling future. -pub struct SignalStreamResource(pub Signal, pub Option<Waker>); +struct SignalStreamResource { + signal: AsyncRefCell<Signal>, + cancel: CancelHandle, +} + +#[cfg(unix)] +impl Resource for SignalStreamResource { + fn name(&self) -> Cow<str> { + "signal".into() + } + + fn close(self: Rc<Self>) { + self.cancel.cancel(); + } +} #[cfg(unix)] #[derive(Deserialize)] @@ -54,13 +76,13 @@ fn op_signal_bind( ) -> Result<Value, AnyError> { super::check_unstable(state, "Deno.signal"); let args: BindSignalArgs = serde_json::from_value(args)?; - let rid = state.resource_table.add( - "signal", - Box::new(SignalStreamResource( + let resource = SignalStreamResource { + signal: AsyncRefCell::new( signal(SignalKind::from_raw(args.signo)).expect(""), - None, - )), - ); + ), + cancel: Default::default(), + }; + let rid = state.resource_table.add(resource); Ok(json!({ "rid": rid, })) @@ -76,18 +98,18 @@ async fn op_signal_poll( let args: SignalArgs = serde_json::from_value(args)?; let rid = args.rid as u32; - let future = poll_fn(move |cx| { - let mut state = state.borrow_mut(); - if let Some(mut signal) = - state.resource_table.get_mut::<SignalStreamResource>(rid) - { - signal.1 = Some(cx.waker().clone()); - return signal.0.poll_recv(cx); - } - std::task::Poll::Ready(None) - }); - let result = future.await; - Ok(json!({ "done": result.is_none() })) + let resource = state + .borrow_mut() + .resource_table + .get::<SignalStreamResource>(rid) + .ok_or_else(bad_resource_id)?; + let cancel = RcRef::map(&resource, |r| &r.cancel); + let mut signal = RcRef::map(&resource, |r| &r.signal).borrow_mut().await; + + match signal.recv().or_cancel(cancel).await { + Ok(result) => Ok(json!({ "done": result.is_none() })), + Err(_) => Ok(json!({ "done": true })), + } } #[cfg(unix)] @@ -99,14 +121,6 @@ pub fn op_signal_unbind( super::check_unstable(state, "Deno.signal"); let args: SignalArgs = serde_json::from_value(args)?; let rid = args.rid as u32; - let resource = state.resource_table.get_mut::<SignalStreamResource>(rid); - if let Some(signal) = resource { - if let Some(waker) = &signal.1 { - // Wakes up the pending poll if exists. - // This prevents the poll future from getting stuck forever. - waker.clone().wake(); - } - } state .resource_table .close(rid) |