summaryrefslogtreecommitdiff
path: root/runtime/ops/fs_events.rs
diff options
context:
space:
mode:
authorBartek IwaƄczuk <biwanczuk@gmail.com>2020-12-13 19:45:53 +0100
committerGitHub <noreply@github.com>2020-12-13 19:45:53 +0100
commit2e74f164b6dcf0ecbf8dd38fba9fae550d784bd0 (patch)
tree61abe8e09d5331ace5d9de529f0e2737a8e05dbb /runtime/ops/fs_events.rs
parent84ef9bd21fb48fb6b5fbc8dafc3de9f361bade3b (diff)
refactor: deno_runtime crate (#8640)
This commit moves Deno JS runtime, ops, permissions and inspector implementation to new "deno_runtime" crate located in "runtime/" directory. Details in "runtime/README.md". Co-authored-by: Ryan Dahl <ry@tinyclouds.org>
Diffstat (limited to 'runtime/ops/fs_events.rs')
-rw-r--r--runtime/ops/fs_events.rs133
1 files changed, 133 insertions, 0 deletions
diff --git a/runtime/ops/fs_events.rs b/runtime/ops/fs_events.rs
new file mode 100644
index 000000000..4832c915c
--- /dev/null
+++ b/runtime/ops/fs_events.rs
@@ -0,0 +1,133 @@
+// Copyright 2018-2020 the Deno authors. All rights reserved. MIT license.
+
+use crate::permissions::Permissions;
+use deno_core::error::bad_resource_id;
+use deno_core::error::AnyError;
+use deno_core::futures::future::poll_fn;
+use deno_core::serde_json;
+use deno_core::serde_json::json;
+use deno_core::serde_json::Value;
+use deno_core::BufVec;
+use deno_core::OpState;
+use deno_core::ZeroCopyBuf;
+use notify::event::Event as NotifyEvent;
+use notify::Error as NotifyError;
+use notify::EventKind;
+use notify::RecommendedWatcher;
+use notify::RecursiveMode;
+use notify::Watcher;
+use serde::Deserialize;
+use serde::Serialize;
+use std::cell::RefCell;
+use std::convert::From;
+use std::path::PathBuf;
+use std::rc::Rc;
+use tokio::sync::mpsc;
+
+pub fn init(rt: &mut deno_core::JsRuntime) {
+ super::reg_json_sync(rt, "op_fs_events_open", op_fs_events_open);
+ super::reg_json_async(rt, "op_fs_events_poll", op_fs_events_poll);
+}
+
+struct FsEventsResource {
+ #[allow(unused)]
+ watcher: RecommendedWatcher,
+ receiver: mpsc::Receiver<Result<FsEvent, AnyError>>,
+}
+
+/// Represents a file system event.
+///
+/// We do not use the event directly from the notify crate. We flatten
+/// the structure into this simpler structure. We want to only make it more
+/// complex as needed.
+///
+/// Feel free to expand this struct as long as you can add tests to demonstrate
+/// the complexity.
+#[derive(Serialize, Debug)]
+struct FsEvent {
+ kind: String,
+ paths: Vec<PathBuf>,
+}
+
+impl From<NotifyEvent> for FsEvent {
+ fn from(e: NotifyEvent) -> Self {
+ let kind = match e.kind {
+ EventKind::Any => "any",
+ EventKind::Access(_) => "access",
+ EventKind::Create(_) => "create",
+ EventKind::Modify(_) => "modify",
+ EventKind::Remove(_) => "remove",
+ EventKind::Other => todo!(), // What's this for? Leaving it out for now.
+ }
+ .to_string();
+ FsEvent {
+ kind,
+ paths: e.paths,
+ }
+ }
+}
+
+fn op_fs_events_open(
+ state: &mut OpState,
+ args: Value,
+ _zero_copy: &mut [ZeroCopyBuf],
+) -> Result<Value, AnyError> {
+ #[derive(Deserialize)]
+ struct OpenArgs {
+ recursive: bool,
+ paths: Vec<String>,
+ }
+ let args: OpenArgs = serde_json::from_value(args)?;
+ let (sender, receiver) = mpsc::channel::<Result<FsEvent, AnyError>>(16);
+ let sender = std::sync::Mutex::new(sender);
+ let mut watcher: RecommendedWatcher =
+ Watcher::new_immediate(move |res: Result<NotifyEvent, NotifyError>| {
+ let res2 = res.map(FsEvent::from).map_err(AnyError::from);
+ let mut sender = sender.lock().unwrap();
+ // Ignore result, if send failed it means that watcher was already closed,
+ // but not all messages have been flushed.
+ let _ = sender.try_send(res2);
+ })?;
+ let recursive_mode = if args.recursive {
+ RecursiveMode::Recursive
+ } else {
+ RecursiveMode::NonRecursive
+ };
+ for path in &args.paths {
+ state
+ .borrow::<Permissions>()
+ .check_read(&PathBuf::from(path))?;
+ watcher.watch(path, recursive_mode)?;
+ }
+ let resource = FsEventsResource { watcher, receiver };
+ let rid = state.resource_table.add("fsEvents", Box::new(resource));
+ Ok(json!(rid))
+}
+
+async fn op_fs_events_poll(
+ state: Rc<RefCell<OpState>>,
+ args: Value,
+ _zero_copy: BufVec,
+) -> Result<Value, AnyError> {
+ #[derive(Deserialize)]
+ struct PollArgs {
+ rid: u32,
+ }
+ let PollArgs { rid } = serde_json::from_value(args)?;
+ poll_fn(move |cx| {
+ let mut state = state.borrow_mut();
+ let watcher = state
+ .resource_table
+ .get_mut::<FsEventsResource>(rid)
+ .ok_or_else(bad_resource_id)?;
+ watcher
+ .receiver
+ .poll_recv(cx)
+ .map(|maybe_result| match maybe_result {
+ Some(Ok(value)) => Ok(json!({ "value": value, "done": false })),
+ Some(Err(err)) => Err(err),
+ None => Ok(json!({ "done": true })),
+ })
+ })
+ .await
+}