summaryrefslogtreecommitdiff
path: root/runtime/ops/signal.rs
diff options
context:
space:
mode:
authorBartek IwaƄczuk <biwanczuk@gmail.com>2020-12-16 17:14:12 +0100
committerGitHub <noreply@github.com>2020-12-16 17:14:12 +0100
commit6984b63f2f3c8d0819fe2dced8252a81f3400ae7 (patch)
tree5201bc962f913927409ae2770aca48ffa3aaaa34 /runtime/ops/signal.rs
parent9fe26f8ca189ac81d9c20c454b9dbfa5e1011c3f (diff)
refactor: rewrite ops to use ResourceTable2 (#8512)
This commit migrates all ops to use new resource table and "AsyncRefCell". Old implementation of resource table was completely removed and all code referencing it was updated to use new system.
Diffstat (limited to 'runtime/ops/signal.rs')
-rw-r--r--runtime/ops/signal.rs74
1 files changed, 44 insertions, 30 deletions
diff --git a/runtime/ops/signal.rs b/runtime/ops/signal.rs
index be6bc0a35..b3891792c 100644
--- a/runtime/ops/signal.rs
+++ b/runtime/ops/signal.rs
@@ -11,15 +11,23 @@ use std::rc::Rc;
#[cfg(unix)]
use deno_core::error::bad_resource_id;
#[cfg(unix)]
-use deno_core::futures::future::poll_fn;
-#[cfg(unix)]
use deno_core::serde_json;
#[cfg(unix)]
use deno_core::serde_json::json;
#[cfg(unix)]
+use deno_core::AsyncRefCell;
+#[cfg(unix)]
+use deno_core::CancelFuture;
+#[cfg(unix)]
+use deno_core::CancelHandle;
+#[cfg(unix)]
+use deno_core::RcRef;
+#[cfg(unix)]
+use deno_core::Resource;
+#[cfg(unix)]
use serde::Deserialize;
#[cfg(unix)]
-use std::task::Waker;
+use std::borrow::Cow;
#[cfg(unix)]
use tokio::signal::unix::{signal, Signal, SignalKind};
@@ -32,7 +40,21 @@ pub fn init(rt: &mut deno_core::JsRuntime) {
#[cfg(unix)]
/// The resource for signal stream.
/// The second element is the waker of polling future.
-pub struct SignalStreamResource(pub Signal, pub Option<Waker>);
+struct SignalStreamResource {
+ signal: AsyncRefCell<Signal>,
+ cancel: CancelHandle,
+}
+
+#[cfg(unix)]
+impl Resource for SignalStreamResource {
+ fn name(&self) -> Cow<str> {
+ "signal".into()
+ }
+
+ fn close(self: Rc<Self>) {
+ self.cancel.cancel();
+ }
+}
#[cfg(unix)]
#[derive(Deserialize)]
@@ -54,13 +76,13 @@ fn op_signal_bind(
) -> Result<Value, AnyError> {
super::check_unstable(state, "Deno.signal");
let args: BindSignalArgs = serde_json::from_value(args)?;
- let rid = state.resource_table.add(
- "signal",
- Box::new(SignalStreamResource(
+ let resource = SignalStreamResource {
+ signal: AsyncRefCell::new(
signal(SignalKind::from_raw(args.signo)).expect(""),
- None,
- )),
- );
+ ),
+ cancel: Default::default(),
+ };
+ let rid = state.resource_table.add(resource);
Ok(json!({
"rid": rid,
}))
@@ -76,18 +98,18 @@ async fn op_signal_poll(
let args: SignalArgs = serde_json::from_value(args)?;
let rid = args.rid as u32;
- let future = poll_fn(move |cx| {
- let mut state = state.borrow_mut();
- if let Some(mut signal) =
- state.resource_table.get_mut::<SignalStreamResource>(rid)
- {
- signal.1 = Some(cx.waker().clone());
- return signal.0.poll_recv(cx);
- }
- std::task::Poll::Ready(None)
- });
- let result = future.await;
- Ok(json!({ "done": result.is_none() }))
+ let resource = state
+ .borrow_mut()
+ .resource_table
+ .get::<SignalStreamResource>(rid)
+ .ok_or_else(bad_resource_id)?;
+ let cancel = RcRef::map(&resource, |r| &r.cancel);
+ let mut signal = RcRef::map(&resource, |r| &r.signal).borrow_mut().await;
+
+ match signal.recv().or_cancel(cancel).await {
+ Ok(result) => Ok(json!({ "done": result.is_none() })),
+ Err(_) => Ok(json!({ "done": true })),
+ }
}
#[cfg(unix)]
@@ -99,14 +121,6 @@ pub fn op_signal_unbind(
super::check_unstable(state, "Deno.signal");
let args: SignalArgs = serde_json::from_value(args)?;
let rid = args.rid as u32;
- let resource = state.resource_table.get_mut::<SignalStreamResource>(rid);
- if let Some(signal) = resource {
- if let Some(waker) = &signal.1 {
- // Wakes up the pending poll if exists.
- // This prevents the poll future from getting stuck forever.
- waker.clone().wake();
- }
- }
state
.resource_table
.close(rid)