diff options
Diffstat (limited to 'runtime/ops/fs_events.rs')
-rw-r--r-- | runtime/ops/fs_events.rs | 58 |
1 files changed, 38 insertions, 20 deletions
diff --git a/runtime/ops/fs_events.rs b/runtime/ops/fs_events.rs index 4832c915c..38661e1d4 100644 --- a/runtime/ops/fs_events.rs +++ b/runtime/ops/fs_events.rs @@ -3,12 +3,16 @@ use crate::permissions::Permissions; use deno_core::error::bad_resource_id; use deno_core::error::AnyError; -use deno_core::futures::future::poll_fn; use deno_core::serde_json; use deno_core::serde_json::json; use deno_core::serde_json::Value; +use deno_core::AsyncRefCell; use deno_core::BufVec; +use deno_core::CancelFuture; +use deno_core::CancelHandle; use deno_core::OpState; +use deno_core::RcRef; +use deno_core::Resource; use deno_core::ZeroCopyBuf; use notify::event::Event as NotifyEvent; use notify::Error as NotifyError; @@ -18,6 +22,7 @@ use notify::RecursiveMode; use notify::Watcher; use serde::Deserialize; use serde::Serialize; +use std::borrow::Cow; use std::cell::RefCell; use std::convert::From; use std::path::PathBuf; @@ -32,7 +37,18 @@ pub fn init(rt: &mut deno_core::JsRuntime) { struct FsEventsResource { #[allow(unused)] watcher: RecommendedWatcher, - receiver: mpsc::Receiver<Result<FsEvent, AnyError>>, + receiver: AsyncRefCell<mpsc::Receiver<Result<FsEvent, AnyError>>>, + cancel: CancelHandle, +} + +impl Resource for FsEventsResource { + fn name(&self) -> Cow<str> { + "fsEvents".into() + } + + fn close(self: Rc<Self>) { + self.cancel.cancel(); + } } /// Represents a file system event. @@ -99,8 +115,12 @@ fn op_fs_events_open( .check_read(&PathBuf::from(path))?; watcher.watch(path, recursive_mode)?; } - let resource = FsEventsResource { watcher, receiver }; - let rid = state.resource_table.add("fsEvents", Box::new(resource)); + let resource = FsEventsResource { + watcher, + receiver: AsyncRefCell::new(receiver), + cancel: Default::default(), + }; + let rid = state.resource_table.add(resource); Ok(json!(rid)) } @@ -114,20 +134,18 @@ async fn op_fs_events_poll( rid: u32, } let PollArgs { rid } = serde_json::from_value(args)?; - poll_fn(move |cx| { - let mut state = state.borrow_mut(); - let watcher = state - .resource_table - .get_mut::<FsEventsResource>(rid) - .ok_or_else(bad_resource_id)?; - watcher - .receiver - .poll_recv(cx) - .map(|maybe_result| match maybe_result { - Some(Ok(value)) => Ok(json!({ "value": value, "done": false })), - Some(Err(err)) => Err(err), - None => Ok(json!({ "done": true })), - }) - }) - .await + + let resource = state + .borrow() + .resource_table + .get::<FsEventsResource>(rid) + .ok_or_else(bad_resource_id)?; + let mut receiver = RcRef::map(&resource, |r| &r.receiver).borrow_mut().await; + let cancel = RcRef::map(resource, |r| &r.cancel); + let maybe_result = receiver.recv().or_cancel(cancel).await?; + match maybe_result { + Some(Ok(value)) => Ok(json!({ "value": value, "done": false })), + Some(Err(err)) => Err(err), + None => Ok(json!({ "done": true })), + } } |