diff options
-rw-r--r-- | cli/file_watcher.rs | 92 | ||||
-rw-r--r-- | cli/flags.rs | 37 | ||||
-rw-r--r-- | cli/main.rs | 108 | ||||
-rw-r--r-- | cli/tests/integration_tests.rs | 44 |
4 files changed, 258 insertions, 23 deletions
diff --git a/cli/file_watcher.rs b/cli/file_watcher.rs new file mode 100644 index 000000000..892d0d4a1 --- /dev/null +++ b/cli/file_watcher.rs @@ -0,0 +1,92 @@ +use crate::colors; +use deno_core::ErrBox; +use futures::stream::StreamExt; +use futures::Future; +use notify::event::Event as NotifyEvent; +use notify::event::EventKind; +use notify::Config; +use notify::Error as NotifyError; +use notify::RecommendedWatcher; +use notify::RecursiveMode; +use notify::Watcher; +use std::path::PathBuf; +use std::pin::Pin; +use tokio::select; +use tokio::sync::mpsc; + +// TODO(bartlomieju): rename +type WatchFuture = + Pin<Box<dyn Future<Output = std::result::Result<(), deno_core::ErrBox>>>>; + +async fn error_handler(watch_future: WatchFuture) { + let result = watch_future.await; + if let Err(err) = result { + let msg = format!("{}: {}", colors::red_bold("error"), err.to_string(),); + eprintln!("{}", msg); + } +} + +pub async fn watch_func<F>( + watch_paths: &[PathBuf], + closure: F, +) -> Result<(), ErrBox> +where + F: Fn() -> WatchFuture, +{ + loop { + let func = error_handler(closure()); + let mut is_file_changed = false; + select! { + _ = file_watcher(watch_paths) => { + is_file_changed = true; + info!( + "{} File change detected! Restarting!", + colors::intense_blue("Watcher") + ); + }, + _ = func => { }, + }; + if !is_file_changed { + info!( + "{} Process terminated! Restarting on file change...", + colors::intense_blue("Watcher") + ); + file_watcher(watch_paths).await?; + info!( + "{} File change detected! Restarting!", + colors::intense_blue("Watcher") + ); + } + } +} + +pub async fn file_watcher(paths: &[PathBuf]) -> Result<(), deno_core::ErrBox> { + let (sender, mut receiver) = mpsc::channel::<Result<NotifyEvent, ErrBox>>(16); + let sender = std::sync::Mutex::new(sender); + + let mut watcher: RecommendedWatcher = + Watcher::new_immediate(move |res: Result<NotifyEvent, NotifyError>| { + let res2 = res.map_err(ErrBox::from); + let mut sender = sender.lock().unwrap(); + // Ignore result, if send failed it means that watcher was already closed, + // but not all messages have been flushed. + let _ = sender.try_send(res2); + })?; + + watcher.configure(Config::PreciseEvents(true)).unwrap(); + + for path in paths { + watcher.watch(path, RecursiveMode::NonRecursive)?; + } + + while let Some(result) = receiver.next().await { + let event = result?; + match event.kind { + EventKind::Create(_) => break, + EventKind::Modify(_) => break, + EventKind::Remove(_) => break, + _ => continue, + } + } + Ok(()) +} diff --git a/cli/flags.rs b/cli/flags.rs index 443411bc3..18fd3b29e 100644 --- a/cli/flags.rs +++ b/cli/flags.rs @@ -123,6 +123,7 @@ pub struct Flags { pub unstable: bool, pub v8_flags: Option<Vec<String>>, pub version: bool, + pub watch: bool, pub write_allowlist: Vec<PathBuf>, } @@ -562,6 +563,7 @@ fn run_parse(flags: &mut Flags, matches: &clap::ArgMatches) { flags.argv.push(v); } + flags.watch = matches.is_present("watch"); flags.subcommand = DenoSubcommand::Run { script }; } @@ -1157,6 +1159,7 @@ fn run_test_args<'a, 'b>(app: App<'a, 'b>) -> App<'a, 'b> { fn run_subcommand<'a, 'b>() -> App<'a, 'b> { run_test_args(SubCommand::with_name("run")) + .arg(watch_arg()) .setting(AppSettings::TrailingVarArg) .arg(script_arg()) .about("Run a program given a filename or url to the module. Use '-' as a filename to read from stdin.") @@ -1409,6 +1412,17 @@ fn v8_flags_arg_parse(flags: &mut Flags, matches: &ArgMatches) { } } +fn watch_arg<'a, 'b>() -> Arg<'a, 'b> { + Arg::with_name("watch") + .requires("unstable") + .long("watch") + .help("Watch for file changes and restart process automatically") + .long_help( + "Watch for file changes and restart process automatically. +Only local files from entry point module graph are watched.", + ) +} + fn no_check_arg<'a, 'b>() -> Arg<'a, 'b> { Arg::with_name("no-check") .long("no-check") @@ -1561,6 +1575,29 @@ mod tests { } #[test] + fn run_watch() { + let r = flags_from_vec_safe(svec![ + "deno", + "run", + "--unstable", + "--watch", + "script.ts" + ]); + let flags = r.unwrap(); + assert_eq!( + flags, + Flags { + subcommand: DenoSubcommand::Run { + script: "script.ts".to_string(), + }, + watch: true, + unstable: true, + ..Flags::default() + } + ); + } + + #[test] fn run_reload_allow_write() { let r = flags_from_vec_safe(svec![ "deno", diff --git a/cli/main.rs b/cli/main.rs index 39c931349..0d55ea1df 100644 --- a/cli/main.rs +++ b/cli/main.rs @@ -30,6 +30,7 @@ mod diff; mod disk_cache; pub mod errors; mod file_fetcher; +mod file_watcher; pub mod flags; mod flags_allow_net; mod fmt; @@ -228,6 +229,8 @@ async fn cache_command(flags: Flags, files: Vec<String>) -> Result<(), ErrBox> { for file in files { let specifier = ModuleSpecifier::resolve_url_or_path(&file)?; + // TODO(bartlomieju): don't use `preload_module` in favor of calling "GlobalState::prepare_module_load()" + // explicitly? Seems wasteful to create multiple worker just to run TS compiler worker.preload_module(&specifier).await.map(|_| ())?; } @@ -435,33 +438,29 @@ async fn run_repl(flags: Flags) -> Result<(), ErrBox> { } } -async fn run_command(flags: Flags, script: String) -> Result<(), ErrBox> { +async fn run_from_stdin(flags: Flags) -> Result<(), ErrBox> { let global_state = GlobalState::new(flags.clone())?; - let main_module = if script != "-" { - ModuleSpecifier::resolve_url_or_path(&script).unwrap() - } else { - ModuleSpecifier::resolve_url_or_path("./__$deno$stdin.ts").unwrap() - }; + let main_module = + ModuleSpecifier::resolve_url_or_path("./__$deno$stdin.ts").unwrap(); let mut worker = MainWorker::create(&global_state.clone(), main_module.clone())?; - if script == "-" { - let mut source = Vec::new(); - std::io::stdin().read_to_end(&mut source)?; - let main_module_url = main_module.as_url().to_owned(); - // Create a dummy source file. - let source_file = SourceFile { - filename: main_module_url.to_file_path().unwrap(), - url: main_module_url, - types_header: None, - media_type: MediaType::TypeScript, - source_code: source.into(), - }; - // Save our fake file into file fetcher cache - // to allow module access by TS compiler - global_state - .file_fetcher - .save_source_file_in_cache(&main_module, source_file); + + let mut source = Vec::new(); + std::io::stdin().read_to_end(&mut source)?; + let main_module_url = main_module.as_url().to_owned(); + // Create a dummy source file. + let source_file = SourceFile { + filename: main_module_url.to_file_path().unwrap(), + url: main_module_url, + types_header: None, + media_type: MediaType::TypeScript, + source_code: source.into(), }; + // Save our fake file into file fetcher cache + // to allow module access by TS compiler + global_state + .file_fetcher + .save_source_file_in_cache(&main_module, source_file); debug!("main_module {}", main_module); worker.execute_module(&main_module).await?; @@ -471,6 +470,69 @@ async fn run_command(flags: Flags, script: String) -> Result<(), ErrBox> { Ok(()) } +async fn run_with_watch(flags: Flags, script: String) -> Result<(), ErrBox> { + let main_module = ModuleSpecifier::resolve_url_or_path(&script)?; + let global_state = GlobalState::new(flags.clone())?; + + let mut module_graph_loader = module_graph::ModuleGraphLoader::new( + global_state.file_fetcher.clone(), + global_state.maybe_import_map.clone(), + Permissions::allow_all(), + false, + false, + ); + module_graph_loader.add_to_graph(&main_module, None).await?; + let module_graph = module_graph_loader.get_graph(); + + // Find all local files in graph + let paths_to_watch: Vec<PathBuf> = module_graph + .values() + .map(|f| Url::parse(&f.url).unwrap()) + .filter(|url| url.scheme() == "file") + .map(|url| url.to_file_path().unwrap()) + .collect(); + + // FIXME(bartlomieju): new file watcher is created on after each restart + file_watcher::watch_func(&paths_to_watch, move || { + // FIXME(bartlomieju): GlobalState must be created on each restart - otherwise file fetcher + // will use cached source files + let gs = GlobalState::new(flags.clone()).unwrap(); + let main_module = main_module.clone(); + async move { + let mut worker = MainWorker::create(&gs, main_module.clone())?; + debug!("main_module {}", main_module); + worker.execute_module(&main_module).await?; + worker.execute("window.dispatchEvent(new Event('load'))")?; + (&mut *worker).await?; + worker.execute("window.dispatchEvent(new Event('unload'))")?; + Ok(()) + } + .boxed_local() + }) + .await +} + +async fn run_command(flags: Flags, script: String) -> Result<(), ErrBox> { + // Read script content from stdin + if script == "-" { + return run_from_stdin(flags).await; + } + + if flags.watch { + return run_with_watch(flags, script).await; + } + + let main_module = ModuleSpecifier::resolve_url_or_path(&script)?; + let global_state = GlobalState::new(flags.clone())?; + let mut worker = MainWorker::create(&global_state, main_module.clone())?; + debug!("main_module {}", main_module); + worker.execute_module(&main_module).await?; + worker.execute("window.dispatchEvent(new Event('load'))")?; + (&mut *worker).await?; + worker.execute("window.dispatchEvent(new Event('unload'))")?; + Ok(()) +} + async fn test_command( flags: Flags, include: Option<Vec<String>>, diff --git a/cli/tests/integration_tests.rs b/cli/tests/integration_tests.rs index e00c7a865..ec13f7380 100644 --- a/cli/tests/integration_tests.rs +++ b/cli/tests/integration_tests.rs @@ -982,6 +982,50 @@ fn info_with_compiled_source() { } #[test] +fn run_watch() { + let t = TempDir::new().expect("tempdir fail"); + let file_to_watch = t.path().join("file_to_watch.js"); + std::fs::write(&file_to_watch, "console.log('Hello world');") + .expect("error writing file"); + + let mut child = util::deno_cmd() + .current_dir(util::root_path()) + .arg("run") + .arg("--watch") + .arg("--unstable") + .arg(&file_to_watch) + .env("NO_COLOR", "1") + .stdout(std::process::Stdio::piped()) + .stderr(std::process::Stdio::piped()) + .spawn() + .expect("failed to spawn script"); + + let stdout = child.stdout.as_mut().unwrap(); + let mut stdout_lines = + std::io::BufReader::new(stdout).lines().map(|r| r.unwrap()); + let stderr = child.stderr.as_mut().unwrap(); + let mut stderr_lines = + std::io::BufReader::new(stderr).lines().map(|r| r.unwrap()); + + assert!(stdout_lines.next().unwrap().contains("Hello world")); + assert!(stderr_lines.next().unwrap().contains("Process terminated")); + + // TODO(lucacasonato): remove this timeout. It seems to be needed on Linux. + std::thread::sleep(std::time::Duration::from_secs(1)); + + // Change content of the file + std::fs::write(&file_to_watch, "console.log('Hello world2');") + .expect("error writing file"); + + assert!(stderr_lines.next().unwrap().contains("Restarting")); + assert!(stdout_lines.next().unwrap().contains("Hello world2")); + assert!(stderr_lines.next().unwrap().contains("Process terminated")); + + child.kill().unwrap(); + drop(t); +} + +#[test] fn repl_test_console_log() { let (out, err) = util::run_and_collect_output( true, |