summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorF001 <changchun.fan@qq.com>2018-12-11 21:36:34 +0800
committerRyan Dahl <ry@tinyclouds.org>2018-12-11 08:36:34 -0500
commitc1de50b0ca9c6e1c8cd06347d19dd3db50db36d8 (patch)
tree8b1b21bee6630638016d4a7e1cd749c5cd887f16 /src
parent9a960b9f5804f5e855163e7ec43327c28daef845 (diff)
Replace blocking! macro by generic function (#1305)
Diffstat (limited to 'src')
-rw-r--r--src/fs.rs6
-rw-r--r--src/ops.rs60
-rw-r--r--src/tokio_util.rs27
3 files changed, 58 insertions, 35 deletions
diff --git a/src/fs.rs b/src/fs.rs
index fde28efa2..228b4431e 100644
--- a/src/fs.rs
+++ b/src/fs.rs
@@ -13,9 +13,9 @@ use std::os::unix::fs::DirBuilderExt;
#[cfg(any(unix))]
use std::os::unix::fs::PermissionsExt;
-pub fn write_file(
+pub fn write_file<T: AsRef<[u8]>>(
filename: &Path,
- data: &[u8],
+ data: T,
perm: u32,
) -> std::io::Result<()> {
let is_append = perm & (1 << 31) != 0;
@@ -28,7 +28,7 @@ pub fn write_file(
.open(filename)?;
set_permissions(&mut file, perm)?;
- file.write_all(data)
+ file.write_all(data.as_ref())
}
#[cfg(any(unix))]
diff --git a/src/ops.rs b/src/ops.rs
index 7c4fc5119..8e8388714 100644
--- a/src/ops.rs
+++ b/src/ops.rs
@@ -16,7 +16,6 @@ use version;
use flatbuffers::FlatBufferBuilder;
use futures;
-use futures::future::poll_fn;
use futures::Poll;
use hyper;
use hyper::rt::Future;
@@ -42,6 +41,7 @@ use tokio::net::TcpListener;
use tokio::net::TcpStream;
use tokio_process::CommandExt;
use tokio_threadpool;
+use tokio_util;
type OpResult = DenoResult<Buf>;
@@ -236,11 +236,13 @@ fn serialize_response(
data.into()
}
+#[inline]
fn ok_future(buf: Buf) -> Box<Op> {
Box::new(futures::future::ok(buf))
}
// Shout out to Earl Sweatshirt.
+#[inline]
fn odd_future(err: DenoError) -> Box<Op> {
Box::new(futures::future::err(err))
}
@@ -460,21 +462,15 @@ where
}
}
-// TODO Do not use macro for the blocking function.. We should instead be able
-// to do this with a normal function, but there seems to some type system
-// issues. The type of this function should be something like this:
-// fn blocking<F>(is_sync: bool, f: F) -> Box<Op>
-// where F: FnOnce() -> DenoResult<Buf>
-macro_rules! blocking {
- ($is_sync:expr, $fn:expr) => {
- if $is_sync {
- // If synchronous, execute the function immediately on the main thread.
- Box::new(futures::future::result($fn()))
- } else {
- // Otherwise dispatch to thread pool.
- Box::new(poll_fn(move || convert_blocking($fn)))
- }
- };
+fn blocking<F>(is_sync: bool, f: F) -> Box<Op>
+where
+ F: 'static + Send + FnOnce() -> DenoResult<Buf>,
+{
+ if is_sync {
+ Box::new(futures::future::result(f()))
+ } else {
+ Box::new(tokio_util::poll_fn(move || convert_blocking(f)))
+ }
}
fn op_make_temp_dir(
@@ -496,7 +492,7 @@ fn op_make_temp_dir(
let prefix = inner.prefix().map(String::from);
let suffix = inner.suffix().map(String::from);
- blocking!(base.sync(), || -> OpResult {
+ blocking(base.sync(), move || -> OpResult {
// TODO(piscisaureus): use byte vector for paths, not a string.
// See https://github.com/denoland/deno/issues/627.
// We can't assume that paths are always valid utf8 strings.
@@ -539,7 +535,7 @@ fn op_mkdir(
if let Err(e) = state.check_write(&path) {
return odd_future(e);
}
- blocking!(base.sync(), || {
+ blocking(base.sync(), move || {
debug!("op_mkdir {}", path);
deno_fs::mkdir(Path::new(&path), mode)?;
Ok(empty_buf())
@@ -560,7 +556,7 @@ fn op_chmod(
return odd_future(e);
}
- blocking!(base.sync(), || {
+ blocking(base.sync(), move || {
debug!("op_chmod {}", &path);
let path = PathBuf::from(&path);
// Still check file/dir exists on windows
@@ -644,7 +640,7 @@ fn op_shutdown(
1 => Shutdown::Write,
_ => unimplemented!(),
};
- blocking!(base.sync(), || {
+ blocking(base.sync(), move || {
// Use UFCS for disambiguation
Resource::shutdown(&mut resource, shutdown_mode)?;
Ok(empty_buf())
@@ -743,7 +739,7 @@ fn op_remove(
return odd_future(e);
}
- blocking!(base.sync(), || {
+ blocking(base.sync(), move || {
debug!("op_remove {}", path.display());
let metadata = fs::metadata(&path)?;
if metadata.is_file() {
@@ -768,7 +764,7 @@ fn op_read_file(
let cmd_id = base.cmd_id();
let filename = PathBuf::from(inner.filename().unwrap());
debug!("op_read_file {}", filename.display());
- blocking!(base.sync(), || {
+ blocking(base.sync(), move || {
let vec = fs::read(&filename)?;
// Build the response message. memcpy data into inner.
// TODO(ry) zero-copy.
@@ -808,7 +804,7 @@ fn op_copy_file(
}
debug!("op_copy_file {} {}", from.display(), to.display());
- blocking!(base.sync(), || {
+ blocking(base.sync(), move || {
// On *nix, Rust deem non-existent path as invalid input
// See https://github.com/rust-lang/rust/issues/54800
// Once the issue is reolved, we should remove this workaround.
@@ -881,7 +877,7 @@ fn op_stat(
let filename = PathBuf::from(inner.filename().unwrap());
let lstat = inner.lstat();
- blocking!(base.sync(), || {
+ blocking(base.sync(), move || {
let builder = &mut FlatBufferBuilder::new();
debug!("op_stat {} {}", filename.display(), lstat);
let metadata = if lstat {
@@ -927,7 +923,7 @@ fn op_read_dir(
let cmd_id = base.cmd_id();
let path = String::from(inner.path().unwrap());
- blocking!(base.sync(), || -> OpResult {
+ blocking(base.sync(), move || -> OpResult {
debug!("op_read_dir {}", path);
let builder = &mut FlatBufferBuilder::new();
let entries: Vec<_> = fs::read_dir(Path::new(&path))?
@@ -986,9 +982,9 @@ fn op_write_file(
return odd_future(e);
}
- blocking!(base.sync(), || -> OpResult {
+ blocking(base.sync(), move || -> OpResult {
debug!("op_write_file {} {}", filename, data.len());
- deno_fs::write_file(Path::new(&filename), &data, perm)?;
+ deno_fs::write_file(Path::new(&filename), data, perm)?;
Ok(empty_buf())
})
}
@@ -1006,7 +1002,7 @@ fn op_rename(
if let Err(e) = state.check_write(&newpath_) {
return odd_future(e);
}
- blocking!(base.sync(), || -> OpResult {
+ blocking(base.sync(), move || -> OpResult {
debug!("op_rename {} {}", oldpath.display(), newpath.display());
fs::rename(&oldpath, &newpath)?;
Ok(empty_buf())
@@ -1034,7 +1030,7 @@ fn op_symlink(
"Not implemented".to_string(),
));
}
- blocking!(base.sync(), || -> OpResult {
+ blocking(base.sync(), move || -> OpResult {
debug!("op_symlink {} {}", oldname.display(), newname.display());
#[cfg(any(unix))]
std::os::unix::fs::symlink(&oldname, &newname)?;
@@ -1052,7 +1048,7 @@ fn op_read_link(
let cmd_id = base.cmd_id();
let name = PathBuf::from(inner.name().unwrap());
- blocking!(base.sync(), || -> OpResult {
+ blocking(base.sync(), move || -> OpResult {
debug!("op_read_link {}", name.display());
let path = fs::read_link(&name)?;
let builder = &mut FlatBufferBuilder::new();
@@ -1121,7 +1117,7 @@ fn op_repl_readline(
// Ignore this clippy warning until this issue is addressed:
// https://github.com/rust-lang-nursery/rust-clippy/issues/1684
#[cfg_attr(feature = "cargo-clippy", allow(redundant_closure_call))]
- blocking!(base.sync(), || -> OpResult {
+ blocking(base.sync(), move || -> OpResult {
let line = resources::readline(rid, &prompt)?;
let builder = &mut FlatBufferBuilder::new();
@@ -1159,7 +1155,7 @@ fn op_truncate(
return odd_future(e);
}
- blocking!(base.sync(), || {
+ blocking(base.sync(), move || {
debug!("op_truncate {} {}", filename, len);
let f = fs::OpenOptions::new().write(true).open(&filename)?;
f.set_len(u64::from(len))?;
diff --git a/src/tokio_util.rs b/src/tokio_util.rs
index 31a67574e..6d295453f 100644
--- a/src/tokio_util.rs
+++ b/src/tokio_util.rs
@@ -73,3 +73,30 @@ impl Future for Accept {
}
}
}
+
+/// `futures::future::poll_fn` only support `F: FnMut()->Poll<T, E>`
+/// However, we require that `F: FnOnce()->Poll<T, E>`.
+/// Therefore, we created our version of `poll_fn`.
+pub fn poll_fn<T, E, F>(f: F) -> PollFn<F>
+where
+ F: FnOnce() -> Poll<T, E>,
+{
+ PollFn { inner: Some(f) }
+}
+
+pub struct PollFn<F> {
+ inner: Option<F>,
+}
+
+impl<T, E, F> Future for PollFn<F>
+where
+ F: FnOnce() -> Poll<T, E>,
+{
+ type Item = T;
+ type Error = E;
+
+ fn poll(&mut self) -> Poll<T, E> {
+ let f = self.inner.take().expect("Inner fn has been taken.");
+ f()
+ }
+}