diff options
-rw-r--r-- | src/fs.rs | 6 | ||||
-rw-r--r-- | src/ops.rs | 60 | ||||
-rw-r--r-- | src/tokio_util.rs | 27 |
3 files changed, 58 insertions, 35 deletions
@@ -13,9 +13,9 @@ use std::os::unix::fs::DirBuilderExt; #[cfg(any(unix))] use std::os::unix::fs::PermissionsExt; -pub fn write_file( +pub fn write_file<T: AsRef<[u8]>>( filename: &Path, - data: &[u8], + data: T, perm: u32, ) -> std::io::Result<()> { let is_append = perm & (1 << 31) != 0; @@ -28,7 +28,7 @@ pub fn write_file( .open(filename)?; set_permissions(&mut file, perm)?; - file.write_all(data) + file.write_all(data.as_ref()) } #[cfg(any(unix))] diff --git a/src/ops.rs b/src/ops.rs index 7c4fc5119..8e8388714 100644 --- a/src/ops.rs +++ b/src/ops.rs @@ -16,7 +16,6 @@ use version; use flatbuffers::FlatBufferBuilder; use futures; -use futures::future::poll_fn; use futures::Poll; use hyper; use hyper::rt::Future; @@ -42,6 +41,7 @@ use tokio::net::TcpListener; use tokio::net::TcpStream; use tokio_process::CommandExt; use tokio_threadpool; +use tokio_util; type OpResult = DenoResult<Buf>; @@ -236,11 +236,13 @@ fn serialize_response( data.into() } +#[inline] fn ok_future(buf: Buf) -> Box<Op> { Box::new(futures::future::ok(buf)) } // Shout out to Earl Sweatshirt. +#[inline] fn odd_future(err: DenoError) -> Box<Op> { Box::new(futures::future::err(err)) } @@ -460,21 +462,15 @@ where } } -// TODO Do not use macro for the blocking function.. We should instead be able -// to do this with a normal function, but there seems to some type system -// issues. The type of this function should be something like this: -// fn blocking<F>(is_sync: bool, f: F) -> Box<Op> -// where F: FnOnce() -> DenoResult<Buf> -macro_rules! blocking { - ($is_sync:expr, $fn:expr) => { - if $is_sync { - // If synchronous, execute the function immediately on the main thread. - Box::new(futures::future::result($fn())) - } else { - // Otherwise dispatch to thread pool. - Box::new(poll_fn(move || convert_blocking($fn))) - } - }; +fn blocking<F>(is_sync: bool, f: F) -> Box<Op> +where + F: 'static + Send + FnOnce() -> DenoResult<Buf>, +{ + if is_sync { + Box::new(futures::future::result(f())) + } else { + Box::new(tokio_util::poll_fn(move || convert_blocking(f))) + } } fn op_make_temp_dir( @@ -496,7 +492,7 @@ fn op_make_temp_dir( let prefix = inner.prefix().map(String::from); let suffix = inner.suffix().map(String::from); - blocking!(base.sync(), || -> OpResult { + blocking(base.sync(), move || -> OpResult { // TODO(piscisaureus): use byte vector for paths, not a string. // See https://github.com/denoland/deno/issues/627. // We can't assume that paths are always valid utf8 strings. @@ -539,7 +535,7 @@ fn op_mkdir( if let Err(e) = state.check_write(&path) { return odd_future(e); } - blocking!(base.sync(), || { + blocking(base.sync(), move || { debug!("op_mkdir {}", path); deno_fs::mkdir(Path::new(&path), mode)?; Ok(empty_buf()) @@ -560,7 +556,7 @@ fn op_chmod( return odd_future(e); } - blocking!(base.sync(), || { + blocking(base.sync(), move || { debug!("op_chmod {}", &path); let path = PathBuf::from(&path); // Still check file/dir exists on windows @@ -644,7 +640,7 @@ fn op_shutdown( 1 => Shutdown::Write, _ => unimplemented!(), }; - blocking!(base.sync(), || { + blocking(base.sync(), move || { // Use UFCS for disambiguation Resource::shutdown(&mut resource, shutdown_mode)?; Ok(empty_buf()) @@ -743,7 +739,7 @@ fn op_remove( return odd_future(e); } - blocking!(base.sync(), || { + blocking(base.sync(), move || { debug!("op_remove {}", path.display()); let metadata = fs::metadata(&path)?; if metadata.is_file() { @@ -768,7 +764,7 @@ fn op_read_file( let cmd_id = base.cmd_id(); let filename = PathBuf::from(inner.filename().unwrap()); debug!("op_read_file {}", filename.display()); - blocking!(base.sync(), || { + blocking(base.sync(), move || { let vec = fs::read(&filename)?; // Build the response message. memcpy data into inner. // TODO(ry) zero-copy. @@ -808,7 +804,7 @@ fn op_copy_file( } debug!("op_copy_file {} {}", from.display(), to.display()); - blocking!(base.sync(), || { + blocking(base.sync(), move || { // On *nix, Rust deem non-existent path as invalid input // See https://github.com/rust-lang/rust/issues/54800 // Once the issue is reolved, we should remove this workaround. @@ -881,7 +877,7 @@ fn op_stat( let filename = PathBuf::from(inner.filename().unwrap()); let lstat = inner.lstat(); - blocking!(base.sync(), || { + blocking(base.sync(), move || { let builder = &mut FlatBufferBuilder::new(); debug!("op_stat {} {}", filename.display(), lstat); let metadata = if lstat { @@ -927,7 +923,7 @@ fn op_read_dir( let cmd_id = base.cmd_id(); let path = String::from(inner.path().unwrap()); - blocking!(base.sync(), || -> OpResult { + blocking(base.sync(), move || -> OpResult { debug!("op_read_dir {}", path); let builder = &mut FlatBufferBuilder::new(); let entries: Vec<_> = fs::read_dir(Path::new(&path))? @@ -986,9 +982,9 @@ fn op_write_file( return odd_future(e); } - blocking!(base.sync(), || -> OpResult { + blocking(base.sync(), move || -> OpResult { debug!("op_write_file {} {}", filename, data.len()); - deno_fs::write_file(Path::new(&filename), &data, perm)?; + deno_fs::write_file(Path::new(&filename), data, perm)?; Ok(empty_buf()) }) } @@ -1006,7 +1002,7 @@ fn op_rename( if let Err(e) = state.check_write(&newpath_) { return odd_future(e); } - blocking!(base.sync(), || -> OpResult { + blocking(base.sync(), move || -> OpResult { debug!("op_rename {} {}", oldpath.display(), newpath.display()); fs::rename(&oldpath, &newpath)?; Ok(empty_buf()) @@ -1034,7 +1030,7 @@ fn op_symlink( "Not implemented".to_string(), )); } - blocking!(base.sync(), || -> OpResult { + blocking(base.sync(), move || -> OpResult { debug!("op_symlink {} {}", oldname.display(), newname.display()); #[cfg(any(unix))] std::os::unix::fs::symlink(&oldname, &newname)?; @@ -1052,7 +1048,7 @@ fn op_read_link( let cmd_id = base.cmd_id(); let name = PathBuf::from(inner.name().unwrap()); - blocking!(base.sync(), || -> OpResult { + blocking(base.sync(), move || -> OpResult { debug!("op_read_link {}", name.display()); let path = fs::read_link(&name)?; let builder = &mut FlatBufferBuilder::new(); @@ -1121,7 +1117,7 @@ fn op_repl_readline( // Ignore this clippy warning until this issue is addressed: // https://github.com/rust-lang-nursery/rust-clippy/issues/1684 #[cfg_attr(feature = "cargo-clippy", allow(redundant_closure_call))] - blocking!(base.sync(), || -> OpResult { + blocking(base.sync(), move || -> OpResult { let line = resources::readline(rid, &prompt)?; let builder = &mut FlatBufferBuilder::new(); @@ -1159,7 +1155,7 @@ fn op_truncate( return odd_future(e); } - blocking!(base.sync(), || { + blocking(base.sync(), move || { debug!("op_truncate {} {}", filename, len); let f = fs::OpenOptions::new().write(true).open(&filename)?; f.set_len(u64::from(len))?; diff --git a/src/tokio_util.rs b/src/tokio_util.rs index 31a67574e..6d295453f 100644 --- a/src/tokio_util.rs +++ b/src/tokio_util.rs @@ -73,3 +73,30 @@ impl Future for Accept { } } } + +/// `futures::future::poll_fn` only support `F: FnMut()->Poll<T, E>` +/// However, we require that `F: FnOnce()->Poll<T, E>`. +/// Therefore, we created our version of `poll_fn`. +pub fn poll_fn<T, E, F>(f: F) -> PollFn<F> +where + F: FnOnce() -> Poll<T, E>, +{ + PollFn { inner: Some(f) } +} + +pub struct PollFn<F> { + inner: Option<F>, +} + +impl<T, E, F> Future for PollFn<F> +where + F: FnOnce() -> Poll<T, E>, +{ + type Item = T; + type Error = E; + + fn poll(&mut self) -> Poll<T, E> { + let f = self.inner.take().expect("Inner fn has been taken."); + f() + } +} |