summaryrefslogtreecommitdiff
path: root/runtime/ops/fs_events.rs
diff options
context:
space:
mode:
Diffstat (limited to 'runtime/ops/fs_events.rs')
-rw-r--r--runtime/ops/fs_events.rs58
1 files changed, 38 insertions, 20 deletions
diff --git a/runtime/ops/fs_events.rs b/runtime/ops/fs_events.rs
index 4832c915c..38661e1d4 100644
--- a/runtime/ops/fs_events.rs
+++ b/runtime/ops/fs_events.rs
@@ -3,12 +3,16 @@
use crate::permissions::Permissions;
use deno_core::error::bad_resource_id;
use deno_core::error::AnyError;
-use deno_core::futures::future::poll_fn;
use deno_core::serde_json;
use deno_core::serde_json::json;
use deno_core::serde_json::Value;
+use deno_core::AsyncRefCell;
use deno_core::BufVec;
+use deno_core::CancelFuture;
+use deno_core::CancelHandle;
use deno_core::OpState;
+use deno_core::RcRef;
+use deno_core::Resource;
use deno_core::ZeroCopyBuf;
use notify::event::Event as NotifyEvent;
use notify::Error as NotifyError;
@@ -18,6 +22,7 @@ use notify::RecursiveMode;
use notify::Watcher;
use serde::Deserialize;
use serde::Serialize;
+use std::borrow::Cow;
use std::cell::RefCell;
use std::convert::From;
use std::path::PathBuf;
@@ -32,7 +37,18 @@ pub fn init(rt: &mut deno_core::JsRuntime) {
struct FsEventsResource {
#[allow(unused)]
watcher: RecommendedWatcher,
- receiver: mpsc::Receiver<Result<FsEvent, AnyError>>,
+ receiver: AsyncRefCell<mpsc::Receiver<Result<FsEvent, AnyError>>>,
+ cancel: CancelHandle,
+}
+
+impl Resource for FsEventsResource {
+ fn name(&self) -> Cow<str> {
+ "fsEvents".into()
+ }
+
+ fn close(self: Rc<Self>) {
+ self.cancel.cancel();
+ }
}
/// Represents a file system event.
@@ -99,8 +115,12 @@ fn op_fs_events_open(
.check_read(&PathBuf::from(path))?;
watcher.watch(path, recursive_mode)?;
}
- let resource = FsEventsResource { watcher, receiver };
- let rid = state.resource_table.add("fsEvents", Box::new(resource));
+ let resource = FsEventsResource {
+ watcher,
+ receiver: AsyncRefCell::new(receiver),
+ cancel: Default::default(),
+ };
+ let rid = state.resource_table.add(resource);
Ok(json!(rid))
}
@@ -114,20 +134,18 @@ async fn op_fs_events_poll(
rid: u32,
}
let PollArgs { rid } = serde_json::from_value(args)?;
- poll_fn(move |cx| {
- let mut state = state.borrow_mut();
- let watcher = state
- .resource_table
- .get_mut::<FsEventsResource>(rid)
- .ok_or_else(bad_resource_id)?;
- watcher
- .receiver
- .poll_recv(cx)
- .map(|maybe_result| match maybe_result {
- Some(Ok(value)) => Ok(json!({ "value": value, "done": false })),
- Some(Err(err)) => Err(err),
- None => Ok(json!({ "done": true })),
- })
- })
- .await
+
+ let resource = state
+ .borrow()
+ .resource_table
+ .get::<FsEventsResource>(rid)
+ .ok_or_else(bad_resource_id)?;
+ let mut receiver = RcRef::map(&resource, |r| &r.receiver).borrow_mut().await;
+ let cancel = RcRef::map(resource, |r| &r.cancel);
+ let maybe_result = receiver.recv().or_cancel(cancel).await?;
+ match maybe_result {
+ Some(Ok(value)) => Ok(json!({ "value": value, "done": false })),
+ Some(Err(err)) => Err(err),
+ None => Ok(json!({ "done": true })),
+ }
}