summaryrefslogtreecommitdiff
path: root/cli/ops.rs
diff options
context:
space:
mode:
Diffstat (limited to 'cli/ops.rs')
-rw-r--r--cli/ops.rs951
1 files changed, 537 insertions, 414 deletions
diff --git a/cli/ops.rs b/cli/ops.rs
index b14b6b1c6..e8fa47aad 100644
--- a/cli/ops.rs
+++ b/cli/ops.rs
@@ -27,14 +27,13 @@ use crate::version;
use crate::worker::Worker;
use deno::js_check;
use deno::Buf;
-use deno::CoreOp;
use deno::JSError;
use deno::ModuleSpecifier;
use deno::Op;
-use deno::OpResult;
use deno::PinnedBuf;
use flatbuffers::FlatBufferBuilder;
use futures;
+use futures::future;
use futures::Async;
use futures::Poll;
use futures::Sink;
@@ -62,13 +61,17 @@ use std::os::unix::fs::PermissionsExt;
#[cfg(unix)]
use std::os::unix::process::ExitStatusExt;
-type CliOpResult = OpResult<DenoError>;
+type OpResult = DenoResult<Buf>;
-type CliDispatchFn =
+pub type OpWithError = dyn Future<Item = Buf, Error = DenoError> + Send;
+
+// TODO Ideally we wouldn't have to box the OpWithError being returned.
+// The box is just to make it easier to get a prototype refactor working.
+type OpCreator =
fn(state: &ThreadSafeState, base: &msg::Base<'_>, data: Option<PinnedBuf>)
- -> CliOpResult;
+ -> Box<OpWithError>;
-pub type OpSelector = fn(inner_type: msg::Any) -> Option<CliDispatchFn>;
+pub type OpSelector = fn(inner_type: msg::Any) -> Option<OpCreator>;
#[inline]
fn empty_buf() -> Buf {
@@ -80,7 +83,7 @@ pub fn dispatch_all(
control: &[u8],
zero_copy: Option<PinnedBuf>,
op_selector: OpSelector,
-) -> CoreOp {
+) -> Op {
let bytes_sent_control = control.len();
let bytes_sent_zero_copy = zero_copy.as_ref().map(|b| b.len()).unwrap_or(0);
let op = if let Some(min_record) = parse_min_record(control) {
@@ -101,90 +104,81 @@ pub fn dispatch_all_legacy(
control: &[u8],
zero_copy: Option<PinnedBuf>,
op_selector: OpSelector,
-) -> CoreOp {
+) -> Op {
let base = msg::get_root_as_base(&control);
- let inner_type = base.inner_type();
let is_sync = base.sync();
+ let inner_type = base.inner_type();
+ let cmd_id = base.cmd_id();
- debug!(
- "msg_from_js {} sync {}",
- msg::enum_name_any(inner_type),
- is_sync
- );
-
- let op_func: CliDispatchFn = match op_selector(inner_type) {
+ let op_func: OpCreator = match op_selector(inner_type) {
Some(v) => v,
None => panic!("Unhandled message {}", msg::enum_name_any(inner_type)),
};
- let op_result = op_func(state, &base, zero_copy);
+ let op: Box<OpWithError> = op_func(state, &base, zero_copy);
let state = state.clone();
- match op_result {
- Ok(Op::Sync(buf)) => {
- state.metrics_op_completed(buf.len());
- Op::Sync(buf)
- }
- Ok(Op::Async(fut)) => {
- let result_fut = Box::new(
- fut.or_else(move |err: DenoError| -> Result<Buf, ()> {
- debug!("op err {}", err);
- // No matter whether we got an Err or Ok, we want a serialized message to
- // send back. So transform the DenoError into a Buf.
- let builder = &mut FlatBufferBuilder::new();
- let errmsg_offset = builder.create_string(&format!("{}", err));
- Ok(serialize_response(
- builder,
- msg::BaseArgs {
- error: Some(errmsg_offset),
- error_kind: err.kind(),
- ..Default::default()
- },
- ))
- }).and_then(move |buf: Buf| -> Result<Buf, ()> {
- // Handle empty responses. For sync responses we just want
- // to send null. For async we want to send a small message
- // with the cmd_id.
- let buf = if buf.len() > 0 {
- buf
- } else {
- let builder = &mut FlatBufferBuilder::new();
- serialize_response(
- builder,
- msg::BaseArgs {
- ..Default::default()
- },
- )
- };
- state.metrics_op_completed(buf.len());
- Ok(buf)
- }).map_err(|err| panic!("unexpected error {:?}", err)),
- );
- Op::Async(result_fut)
- }
- Err(err) => {
+ let fut = Box::new(
+ op.or_else(move |err: DenoError| -> Result<Buf, ()> {
debug!("op err {}", err);
// No matter whether we got an Err or Ok, we want a serialized message to
// send back. So transform the DenoError into a Buf.
let builder = &mut FlatBufferBuilder::new();
let errmsg_offset = builder.create_string(&format!("{}", err));
- let response_buf = serialize_response(
+ Ok(serialize_response(
+ cmd_id,
builder,
msg::BaseArgs {
error: Some(errmsg_offset),
error_kind: err.kind(),
..Default::default()
},
- );
- state.metrics_op_completed(response_buf.len());
- Op::Sync(response_buf)
- }
+ ))
+ }).and_then(move |buf: Buf| -> Result<Buf, ()> {
+ // Handle empty responses. For sync responses we just want
+ // to send null. For async we want to send a small message
+ // with the cmd_id.
+ let buf = if is_sync || buf.len() > 0 {
+ buf
+ } else {
+ let builder = &mut FlatBufferBuilder::new();
+ serialize_response(
+ cmd_id,
+ builder,
+ msg::BaseArgs {
+ ..Default::default()
+ },
+ )
+ };
+ state.metrics_op_completed(buf.len());
+ Ok(buf)
+ }).map_err(|err| panic!("unexpected error {:?}", err)),
+ );
+
+ debug!(
+ "msg_from_js {} sync {}",
+ msg::enum_name_any(inner_type),
+ base.sync()
+ );
+
+ if base.sync() {
+ // TODO(ry) This is not correct! If the sync op is not actually synchronous
+ // (like in the case of op_fetch_module_meta_data) this wait() will block
+ // a thread in the Tokio runtime. Depending on the size of the runtime's
+ // thread pool, this may result in a dead lock!
+ //
+ // The solution is that ops should return an Op directly. Op::Sync contains
+ // the result value, so if its returned directly from the OpCreator, we
+ // know it has actually be evaluated synchronously.
+ Op::Sync(fut.wait().unwrap())
+ } else {
+ Op::Async(fut)
}
}
/// Standard ops set for most isolates
-pub fn op_selector_std(inner_type: msg::Any) -> Option<CliDispatchFn> {
+pub fn op_selector_std(inner_type: msg::Any) -> Option<OpCreator> {
match inner_type {
msg::Any::Accept => Some(op_accept),
msg::Any::Cache => Some(op_cache),
@@ -253,9 +247,9 @@ pub fn op_selector_std(inner_type: msg::Any) -> Option<CliDispatchFn> {
// nanoseconds are rounded on 2ms.
fn op_now(
state: &ThreadSafeState,
- _base: &msg::Base<'_>,
+ base: &msg::Base<'_>,
data: Option<PinnedBuf>,
-) -> CliOpResult {
+) -> Box<OpWithError> {
assert!(data.is_none());
let seconds = state.start_time.elapsed().as_secs();
let mut subsec_nanos = state.start_time.elapsed().subsec_nanos();
@@ -276,8 +270,8 @@ fn op_now(
subsec_nanos,
},
);
-
- ok_buf(serialize_response(
+ ok_future(serialize_response(
+ base.cmd_id(),
builder,
msg::BaseArgs {
inner: Some(inner.as_union_value()),
@@ -289,10 +283,9 @@ fn op_now(
fn op_is_tty(
_state: &ThreadSafeState,
-
- _base: &msg::Base<'_>,
+ base: &msg::Base<'_>,
_data: Option<PinnedBuf>,
-) -> CliOpResult {
+) -> Box<OpWithError> {
let builder = &mut FlatBufferBuilder::new();
let inner = msg::IsTTYRes::create(
builder,
@@ -302,7 +295,8 @@ fn op_is_tty(
stderr: atty::is(atty::Stream::Stderr),
},
);
- ok_buf(serialize_response(
+ ok_future(serialize_response(
+ base.cmd_id(),
builder,
msg::BaseArgs {
inner: Some(inner.as_union_value()),
@@ -314,19 +308,18 @@ fn op_is_tty(
fn op_exit(
_state: &ThreadSafeState,
-
base: &msg::Base<'_>,
_data: Option<PinnedBuf>,
-) -> CliOpResult {
+) -> Box<OpWithError> {
let inner = base.inner_as_exit().unwrap();
std::process::exit(inner.code())
}
fn op_start(
state: &ThreadSafeState,
- _base: &msg::Base<'_>,
+ base: &msg::Base<'_>,
data: Option<PinnedBuf>,
-) -> CliOpResult {
+) -> Box<OpWithError> {
assert!(data.is_none());
let mut builder = FlatBufferBuilder::new();
@@ -375,7 +368,8 @@ fn op_start(
},
);
- ok_buf(serialize_response(
+ ok_future(serialize_response(
+ base.cmd_id(),
&mut builder,
msg::BaseArgs {
inner_type: msg::Any::StartRes,
@@ -389,7 +383,7 @@ fn op_format_error(
state: &ThreadSafeState,
base: &msg::Base<'_>,
data: Option<PinnedBuf>,
-) -> CliOpResult {
+) -> Box<OpWithError> {
assert!(data.is_none());
let inner = base.inner_as_format_error().unwrap();
let orig_error = String::from(inner.error().unwrap());
@@ -408,22 +402,23 @@ fn op_format_error(
},
);
- let response_buf = serialize_response(
+ ok_future(serialize_response(
+ base.cmd_id(),
&mut builder,
msg::BaseArgs {
inner_type: msg::Any::FormatErrorRes,
inner: Some(inner.as_union_value()),
..Default::default()
},
- );
-
- ok_buf(response_buf)
+ ))
}
fn serialize_response(
+ cmd_id: u32,
builder: &mut FlatBufferBuilder<'_>,
- args: msg::BaseArgs<'_>,
+ mut args: msg::BaseArgs<'_>,
) -> Buf {
+ args.cmd_id = cmd_id;
let base = msg::Base::create(builder, &args);
msg::finish_base_buffer(builder, base);
let data = builder.finished_data();
@@ -432,20 +427,21 @@ fn serialize_response(
}
#[inline]
-pub fn ok_future(buf: Buf) -> CliOpResult {
- Ok(Op::Async(Box::new(futures::future::ok(buf))))
+pub fn ok_future(buf: Buf) -> Box<OpWithError> {
+ Box::new(futures::future::ok(buf))
}
+// Shout out to Earl Sweatshirt.
#[inline]
-pub fn ok_buf(buf: Buf) -> CliOpResult {
- Ok(Op::Sync(buf))
+pub fn odd_future(err: DenoError) -> Box<OpWithError> {
+ Box::new(futures::future::err(err))
}
fn op_cache(
state: &ThreadSafeState,
base: &msg::Base<'_>,
data: Option<PinnedBuf>,
-) -> CliOpResult {
+) -> Box<OpWithError> {
assert!(data.is_none());
let inner = base.inner_as_cache().unwrap();
let extension = inner.extension().unwrap();
@@ -459,9 +455,11 @@ fn op_cache(
// cache path. In the future, checksums will not be used in the cache
// filenames and this requirement can be removed. See
// https://github.com/denoland/deno/issues/2057
- let module_meta_data = state
- .dir
- .fetch_module_meta_data(module_id, ".", true, true)?;
+ let r = state.dir.fetch_module_meta_data(module_id, ".", true, true);
+ if let Err(err) = r {
+ return odd_future(err);
+ }
+ let module_meta_data = r.unwrap();
let (js_cache_path, source_map_path) = state
.dir
@@ -469,15 +467,21 @@ fn op_cache(
if extension == ".map" {
debug!("cache {:?}", source_map_path);
- fs::write(source_map_path, contents).map_err(DenoError::from)?;
+ let r = fs::write(source_map_path, contents);
+ if let Err(err) = r {
+ return odd_future(err.into());
+ }
} else if extension == ".js" {
debug!("cache {:?}", js_cache_path);
- fs::write(js_cache_path, contents).map_err(DenoError::from)?;
+ let r = fs::write(js_cache_path, contents);
+ if let Err(err) = r {
+ return odd_future(err.into());
+ }
} else {
unreachable!();
}
- ok_buf(empty_buf())
+ ok_future(empty_buf())
}
// https://github.com/denoland/deno/blob/golang/os.go#L100-L154
@@ -485,13 +489,10 @@ fn op_fetch_module_meta_data(
state: &ThreadSafeState,
base: &msg::Base<'_>,
data: Option<PinnedBuf>,
-) -> CliOpResult {
- if !base.sync() {
- return Err(errors::no_async_support());
- }
+) -> Box<OpWithError> {
assert!(data.is_none());
let inner = base.inner_as_fetch_module_meta_data().unwrap();
-
+ let cmd_id = base.cmd_id();
let specifier = inner.specifier().unwrap();
let referrer = inner.referrer().unwrap();
@@ -509,7 +510,7 @@ fn op_fetch_module_meta_data(
Some(module_specifier) => module_specifier.to_string(),
None => specifier.to_string(),
},
- Err(err) => return Err(DenoError::from(err)),
+ Err(err) => return odd_future(DenoError::from(err)),
},
None => specifier.to_string(),
};
@@ -532,6 +533,7 @@ fn op_fetch_module_meta_data(
};
let inner = msg::FetchModuleMetaDataRes::create(builder, &msg_args);
Ok(serialize_response(
+ cmd_id,
builder,
msg::BaseArgs {
inner: Some(inner.as_union_value()),
@@ -541,50 +543,52 @@ fn op_fetch_module_meta_data(
))
});
+ // Unfortunately TypeScript's CompilerHost interface does not leave room for
+ // asynchronous source code fetching. This complicates things greatly and
+ // requires us to use tokio_util::block_on() below.
+ assert!(base.sync());
+
// WARNING: Here we use tokio_util::block_on() which starts a new Tokio
// runtime for executing the future. This is so we don't inadvernently run
// out of threads in the main runtime.
- let result_buf = tokio_util::block_on(fut)?;
- Ok(Op::Sync(result_buf))
+ Box::new(futures::future::result(tokio_util::block_on(fut)))
}
fn op_chdir(
_state: &ThreadSafeState,
base: &msg::Base<'_>,
data: Option<PinnedBuf>,
-) -> CliOpResult {
+) -> Box<OpWithError> {
assert!(data.is_none());
let inner = base.inner_as_chdir().unwrap();
let directory = inner.directory().unwrap();
- std::env::set_current_dir(&directory)?;
- ok_buf(empty_buf())
+ Box::new(futures::future::result(|| -> OpResult {
+ std::env::set_current_dir(&directory)?;
+ Ok(empty_buf())
+ }()))
}
fn op_global_timer_stop(
state: &ThreadSafeState,
base: &msg::Base<'_>,
data: Option<PinnedBuf>,
-) -> CliOpResult {
- if !base.sync() {
- return Err(errors::no_async_support());
- }
+) -> Box<OpWithError> {
+ assert!(base.sync());
assert!(data.is_none());
let state = state;
let mut t = state.global_timer.lock().unwrap();
t.cancel();
- Ok(Op::Sync(empty_buf()))
+ ok_future(empty_buf())
}
fn op_global_timer(
state: &ThreadSafeState,
base: &msg::Base<'_>,
data: Option<PinnedBuf>,
-) -> CliOpResult {
- if base.sync() {
- return Err(errors::no_sync_support());
- }
+) -> Box<OpWithError> {
+ assert!(!base.sync());
assert!(data.is_none());
-
+ let cmd_id = base.cmd_id();
let inner = base.inner_as_global_timer().unwrap();
let val = inner.timeout();
assert!(val >= 0);
@@ -594,11 +598,12 @@ fn op_global_timer(
let deadline = Instant::now() + Duration::from_millis(val as u64);
let f = t.new_timeout(deadline);
- Ok(Op::Async(Box::new(f.then(move |_| {
+ Box::new(f.then(move |_| {
let builder = &mut FlatBufferBuilder::new();
let inner =
msg::GlobalTimerRes::create(builder, &msg::GlobalTimerResArgs {});
Ok(serialize_response(
+ cmd_id,
builder,
msg::BaseArgs {
inner: Some(inner.as_union_value()),
@@ -606,31 +611,36 @@ fn op_global_timer(
..Default::default()
},
))
- }))))
+ }))
}
fn op_set_env(
state: &ThreadSafeState,
base: &msg::Base<'_>,
data: Option<PinnedBuf>,
-) -> CliOpResult {
+) -> Box<OpWithError> {
assert!(data.is_none());
let inner = base.inner_as_set_env().unwrap();
let key = inner.key().unwrap();
let value = inner.value().unwrap();
- state.check_env()?;
+ if let Err(e) = state.check_env() {
+ return odd_future(e);
+ }
std::env::set_var(key, value);
- ok_buf(empty_buf())
+ ok_future(empty_buf())
}
fn op_env(
state: &ThreadSafeState,
- _base: &msg::Base<'_>,
+ base: &msg::Base<'_>,
data: Option<PinnedBuf>,
-) -> CliOpResult {
+) -> Box<OpWithError> {
assert!(data.is_none());
+ let cmd_id = base.cmd_id();
- state.check_env()?;
+ if let Err(e) = state.check_env() {
+ return odd_future(e);
+ }
let builder = &mut FlatBufferBuilder::new();
let vars: Vec<_> = std::env::vars()
@@ -641,24 +651,24 @@ fn op_env(
builder,
&msg::EnvironResArgs { map: Some(tables) },
);
- let response_buf = serialize_response(
+ ok_future(serialize_response(
+ cmd_id,
builder,
msg::BaseArgs {
inner: Some(inner.as_union_value()),
inner_type: msg::Any::EnvironRes,
..Default::default()
},
- );
- ok_buf(response_buf)
+ ))
}
fn op_permissions(
state: &ThreadSafeState,
- _base: &msg::Base<'_>,
+ base: &msg::Base<'_>,
data: Option<PinnedBuf>,
-) -> CliOpResult {
+) -> Box<OpWithError> {
assert!(data.is_none());
-
+ let cmd_id = base.cmd_id();
let builder = &mut FlatBufferBuilder::new();
let inner = msg::PermissionsRes::create(
builder,
@@ -671,26 +681,26 @@ fn op_permissions(
hrtime: state.permissions.allows_hrtime(),
},
);
- let response_buf = serialize_response(
+ ok_future(serialize_response(
+ cmd_id,
builder,
msg::BaseArgs {
inner: Some(inner.as_union_value()),
inner_type: msg::Any::PermissionsRes,
..Default::default()
},
- );
- ok_buf(response_buf)
+ ))
}
fn op_revoke_permission(
state: &ThreadSafeState,
base: &msg::Base<'_>,
data: Option<PinnedBuf>,
-) -> CliOpResult {
+) -> Box<OpWithError> {
assert!(data.is_none());
let inner = base.inner_as_permission_revoke().unwrap();
let permission = inner.permission().unwrap();
- match permission {
+ let result = match permission {
"run" => state.permissions.revoke_run(),
"read" => state.permissions.revoke_read(),
"write" => state.permissions.revoke_write(),
@@ -698,16 +708,20 @@ fn op_revoke_permission(
"env" => state.permissions.revoke_env(),
"hrtime" => state.permissions.revoke_hrtime(),
_ => Ok(()),
- }?;
- ok_buf(empty_buf())
+ };
+ if let Err(e) = result {
+ return odd_future(e);
+ }
+ ok_future(empty_buf())
}
fn op_fetch(
state: &ThreadSafeState,
base: &msg::Base<'_>,
data: Option<PinnedBuf>,
-) -> CliOpResult {
+) -> Box<OpWithError> {
let inner = base.inner_as_fetch().unwrap();
+ let cmd_id = base.cmd_id();
let header = inner.header().unwrap();
assert!(header.is_request());
@@ -718,10 +732,19 @@ fn op_fetch(
Some(buf) => hyper::Body::from(Vec::from(&*buf)),
};
- let req = msg_util::deserialize_request(header, body)?;
+ let maybe_req = msg_util::deserialize_request(header, body);
+ if let Err(e) = maybe_req {
+ return odd_future(e);
+ }
+ let req = maybe_req.unwrap();
- let url_ = url::Url::parse(url).map_err(DenoError::from)?;
- state.check_net_url(url_)?;
+ let url_ = match url::Url::parse(url) {
+ Err(err) => return odd_future(DenoError::from(err)),
+ Ok(v) => v,
+ };
+ if let Err(e) = state.check_net_url(url_) {
+ return odd_future(e);
+ }
let client = http_util::get_client();
@@ -744,6 +767,7 @@ fn op_fetch(
);
Ok(serialize_response(
+ cmd_id,
builder,
msg::BaseArgs {
inner: Some(inner.as_union_value()),
@@ -752,12 +776,7 @@ fn op_fetch(
},
))
});
- if base.sync() {
- let result_buf = future.wait()?;
- Ok(Op::Sync(result_buf))
- } else {
- Ok(Op::Async(Box::new(future)))
- }
+ Box::new(future)
}
// This is just type conversion. Implement From trait?
@@ -775,17 +794,14 @@ where
}
}
-fn blocking<F>(is_sync: bool, f: F) -> CliOpResult
+fn blocking<F>(is_sync: bool, f: F) -> Box<OpWithError>
where
F: 'static + Send + FnOnce() -> DenoResult<Buf>,
{
if is_sync {
- let result_buf = f()?;
- Ok(Op::Sync(result_buf))
+ Box::new(futures::future::result(f()))
} else {
- Ok(Op::Async(Box::new(tokio_util::poll_fn(move || {
- convert_blocking(f)
- }))))
+ Box::new(tokio_util::poll_fn(move || convert_blocking(f)))
}
}
@@ -793,19 +809,22 @@ fn op_make_temp_dir(
state: &ThreadSafeState,
base: &msg::Base<'_>,
data: Option<PinnedBuf>,
-) -> CliOpResult {
+) -> Box<OpWithError> {
assert!(data.is_none());
let base = Box::new(*base);
let inner = base.inner_as_make_temp_dir().unwrap();
+ let cmd_id = base.cmd_id();
// FIXME
- state.check_write("make_temp")?;
+ if let Err(e) = state.check_write("make_temp") {
+ return odd_future(e);
+ }
let dir = inner.dir().map(PathBuf::from);
let prefix = inner.prefix().map(String::from);
let suffix = inner.suffix().map(String::from);
- blocking(base.sync(), move || {
+ blocking(base.sync(), move || -> OpResult {
// TODO(piscisaureus): use byte vector for paths, not a string.
// See https://github.com/denoland/deno/issues/627.
// We can't assume that paths are always valid utf8 strings.
@@ -824,6 +843,7 @@ fn op_make_temp_dir(
},
);
Ok(serialize_response(
+ cmd_id,
builder,
msg::BaseArgs {
inner: Some(inner.as_union_value()),
@@ -838,14 +858,19 @@ fn op_mkdir(
state: &ThreadSafeState,
base: &msg::Base<'_>,
data: Option<PinnedBuf>,
-) -> CliOpResult {
+) -> Box<OpWithError> {
assert!(data.is_none());
let inner = base.inner_as_mkdir().unwrap();
- let (path, path_) = resolve_path(inner.path().unwrap())?;
+ let (path, path_) = match resolve_path(inner.path().unwrap()) {
+ Err(err) => return odd_future(err),
+ Ok(v) => v,
+ };
let recursive = inner.recursive();
let mode = inner.mode();
- state.check_write(&path_)?;
+ if let Err(e) = state.check_write(&path_) {
+ return odd_future(e);
+ }
blocking(base.sync(), move || {
debug!("op_mkdir {}", path_);
@@ -858,13 +883,18 @@ fn op_chmod(
state: &ThreadSafeState,
base: &msg::Base<'_>,
data: Option<PinnedBuf>,
-) -> CliOpResult {
+) -> Box<OpWithError> {
assert!(data.is_none());
let inner = base.inner_as_chmod().unwrap();
let _mode = inner.mode();
- let (path, path_) = resolve_path(inner.path().unwrap())?;
+ let (path, path_) = match resolve_path(inner.path().unwrap()) {
+ Err(err) => return odd_future(err),
+ Ok(v) => v,
+ };
- state.check_write(&path_)?;
+ if let Err(e) = state.check_write(&path_) {
+ return odd_future(e);
+ }
blocking(base.sync(), move || {
debug!("op_chmod {}", &path_);
@@ -884,14 +914,16 @@ fn op_chown(
state: &ThreadSafeState,
base: &msg::Base<'_>,
data: Option<PinnedBuf>,
-) -> CliOpResult {
+) -> Box<OpWithError> {
assert!(data.is_none());
let inner = base.inner_as_chown().unwrap();
let path = String::from(inner.path().unwrap());
let uid = inner.uid();
let gid = inner.gid();
- state.check_write(&path)?;
+ if let Err(e) = state.check_write(&path) {
+ return odd_future(e);
+ }
blocking(base.sync(), move || {
debug!("op_chown {}", &path);
@@ -906,11 +938,14 @@ fn op_open(
state: &ThreadSafeState,
base: &msg::Base<'_>,
data: Option<PinnedBuf>,
-) -> CliOpResult {
+) -> Box<OpWithError> {
assert!(data.is_none());
-
+ let cmd_id = base.cmd_id();
let inner = base.inner_as_open().unwrap();
- let (filename, filename_) = resolve_path(inner.filename().unwrap())?;
+ let (filename, filename_) = match resolve_path(inner.filename().unwrap()) {
+ Err(err) => return odd_future(err),
+ Ok(v) => v,
+ };
let mode = inner.mode().unwrap();
let mut open_options = tokio::fs::OpenOptions::new();
@@ -951,26 +986,35 @@ fn op_open(
match mode {
"r" => {
- state.check_read(&filename_)?;
+ if let Err(e) = state.check_read(&filename_) {
+ return odd_future(e);
+ }
}
"w" | "a" | "x" => {
- state.check_write(&filename_)?;
+ if let Err(e) = state.check_write(&filename_) {
+ return odd_future(e);
+ }
}
&_ => {
- state.check_read(&filename_)?;
- state.check_write(&filename_)?;
+ if let Err(e) = state.check_read(&filename_) {
+ return odd_future(e);
+ }
+ if let Err(e) = state.check_write(&filename_) {
+ return odd_future(e);
+ }
}
}
let op = open_options
.open(filename)
.map_err(DenoError::from)
- .and_then(move |fs_file| {
+ .and_then(move |fs_file| -> OpResult {
let resource = resources::add_fs_file(fs_file);
let builder = &mut FlatBufferBuilder::new();
let inner =
msg::OpenRes::create(builder, &msg::OpenResArgs { rid: resource.rid });
Ok(serialize_response(
+ cmd_id,
builder,
msg::BaseArgs {
inner: Some(inner.as_union_value()),
@@ -979,27 +1023,22 @@ fn op_open(
},
))
});
- if base.sync() {
- let buf = op.wait()?;
- Ok(Op::Sync(buf))
- } else {
- Ok(Op::Async(Box::new(op)))
- }
+ Box::new(op)
}
fn op_close(
_state: &ThreadSafeState,
base: &msg::Base<'_>,
data: Option<PinnedBuf>,
-) -> CliOpResult {
+) -> Box<OpWithError> {
assert!(data.is_none());
let inner = base.inner_as_close().unwrap();
let rid = inner.rid();
match resources::lookup(rid) {
- None => Err(errors::bad_resource()),
+ None => odd_future(errors::bad_resource()),
Some(resource) => {
resource.close();
- ok_buf(empty_buf())
+ ok_future(empty_buf())
}
}
}
@@ -1008,14 +1047,14 @@ fn op_kill(
_state: &ThreadSafeState,
base: &msg::Base<'_>,
data: Option<PinnedBuf>,
-) -> CliOpResult {
+) -> Box<OpWithError> {
assert!(data.is_none());
let inner = base.inner_as_kill().unwrap();
let pid = inner.pid();
let signo = inner.signo();
match kill(pid, signo) {
- Ok(_) => ok_buf(empty_buf()),
- Err(e) => Err(e),
+ Ok(_) => ok_future(empty_buf()),
+ Err(e) => odd_future(e),
}
}
@@ -1023,13 +1062,13 @@ fn op_shutdown(
_state: &ThreadSafeState,
base: &msg::Base<'_>,
data: Option<PinnedBuf>,
-) -> CliOpResult {
+) -> Box<OpWithError> {
assert!(data.is_none());
let inner = base.inner_as_shutdown().unwrap();
let rid = inner.rid();
let how = inner.how();
match resources::lookup(rid) {
- None => Err(errors::bad_resource()),
+ None => odd_future(errors::bad_resource()),
Some(mut resource) => {
let shutdown_mode = match how {
0 => Shutdown::Read,
@@ -1049,12 +1088,13 @@ fn op_read(
_state: &ThreadSafeState,
base: &msg::Base<'_>,
data: Option<PinnedBuf>,
-) -> CliOpResult {
+) -> Box<OpWithError> {
+ let cmd_id = base.cmd_id();
let inner = base.inner_as_read().unwrap();
let rid = inner.rid();
match resources::lookup(rid) {
- None => Err(errors::bad_resource()),
+ None => odd_future(errors::bad_resource()),
Some(resource) => {
let op = tokio::io::read(resource, data.unwrap())
.map_err(DenoError::from)
@@ -1068,6 +1108,7 @@ fn op_read(
},
);
Ok(serialize_response(
+ cmd_id,
builder,
msg::BaseArgs {
inner: Some(inner.as_union_value()),
@@ -1076,12 +1117,7 @@ fn op_read(
},
))
});
- if base.sync() {
- let buf = op.wait()?;
- Ok(Op::Sync(buf))
- } else {
- Ok(Op::Async(Box::new(op)))
- }
+ Box::new(op)
}
}
}
@@ -1090,12 +1126,13 @@ fn op_write(
_state: &ThreadSafeState,
base: &msg::Base<'_>,
data: Option<PinnedBuf>,
-) -> CliOpResult {
+) -> Box<OpWithError> {
+ let cmd_id = base.cmd_id();
let inner = base.inner_as_write().unwrap();
let rid = inner.rid();
match resources::lookup(rid) {
- None => Err(errors::bad_resource()),
+ None => odd_future(errors::bad_resource()),
Some(resource) => {
let op = tokio_write::write(resource, data.unwrap())
.map_err(DenoError::from)
@@ -1108,6 +1145,7 @@ fn op_write(
},
);
Ok(serialize_response(
+ cmd_id,
builder,
msg::BaseArgs {
inner: Some(inner.as_union_value()),
@@ -1116,12 +1154,7 @@ fn op_write(
},
))
});
- if base.sync() {
- let buf = op.wait()?;
- Ok(Op::Sync(buf))
- } else {
- Ok(Op::Async(Box::new(op)))
- }
+ Box::new(op)
}
}
}
@@ -1130,24 +1163,20 @@ fn op_seek(
_state: &ThreadSafeState,
base: &msg::Base<'_>,
data: Option<PinnedBuf>,
-) -> CliOpResult {
+) -> Box<OpWithError> {
assert!(data.is_none());
+ let _cmd_id = base.cmd_id();
let inner = base.inner_as_seek().unwrap();
let rid = inner.rid();
let offset = inner.offset();
let whence = inner.whence();
match resources::lookup(rid) {
- None => Err(errors::bad_resource()),
+ None => odd_future(errors::bad_resource()),
Some(resource) => {
let op = resources::seek(resource, offset, whence)
.and_then(move |_| Ok(empty_buf()));
- if base.sync() {
- let buf = op.wait()?;
- Ok(Op::Sync(buf))
- } else {
- Ok(Op::Async(Box::new(op)))
- }
+ Box::new(op)
}
}
}
@@ -1156,13 +1185,18 @@ fn op_remove(
state: &ThreadSafeState,
base: &msg::Base<'_>,
data: Option<PinnedBuf>,
-) -> CliOpResult {
+) -> Box<OpWithError> {
assert!(data.is_none());
let inner = base.inner_as_remove().unwrap();
- let (path, path_) = resolve_path(inner.path().unwrap())?;
+ let (path, path_) = match resolve_path(inner.path().unwrap()) {
+ Err(err) => return odd_future(err),
+ Ok(v) => v,
+ };
let recursive = inner.recursive();
- state.check_write(&path_)?;
+ if let Err(e) = state.check_write(&path_) {
+ return odd_future(e);
+ }
blocking(base.sync(), move || {
debug!("op_remove {}", path.display());
@@ -1182,14 +1216,24 @@ fn op_copy_file(
state: &ThreadSafeState,
base: &msg::Base<'_>,
data: Option<PinnedBuf>,
-) -> CliOpResult {
+) -> Box<OpWithError> {
assert!(data.is_none());
let inner = base.inner_as_copy_file().unwrap();
- let (from, from_) = resolve_path(inner.from().unwrap())?;
- let (to, to_) = resolve_path(inner.to().unwrap())?;
+ let (from, from_) = match resolve_path(inner.from().unwrap()) {
+ Err(err) => return odd_future(err),
+ Ok(v) => v,
+ };
+ let (to, to_) = match resolve_path(inner.to().unwrap()) {
+ Err(err) => return odd_future(err),
+ Ok(v) => v,
+ };
- state.check_read(&from_)?;
- state.check_write(&to_)?;
+ if let Err(e) = state.check_read(&from_) {
+ return odd_future(e);
+ }
+ if let Err(e) = state.check_write(&to_) {
+ return odd_future(e);
+ }
debug!("op_copy_file {} {}", from.display(), to.display());
blocking(base.sync(), move || {
@@ -1230,40 +1274,47 @@ fn get_mode(_perm: &fs::Permissions) -> u32 {
fn op_cwd(
_state: &ThreadSafeState,
-
- _base: &msg::Base<'_>,
+ base: &msg::Base<'_>,
data: Option<PinnedBuf>,
-) -> CliOpResult {
+) -> Box<OpWithError> {
assert!(data.is_none());
-
- let path = std::env::current_dir()?;
- let builder = &mut FlatBufferBuilder::new();
- let cwd =
- builder.create_string(&path.into_os_string().into_string().unwrap());
- let inner = msg::CwdRes::create(builder, &msg::CwdResArgs { cwd: Some(cwd) });
- let response_buf = serialize_response(
- builder,
- msg::BaseArgs {
- inner: Some(inner.as_union_value()),
- inner_type: msg::Any::CwdRes,
- ..Default::default()
- },
- );
- ok_buf(response_buf)
+ let cmd_id = base.cmd_id();
+ Box::new(futures::future::result(|| -> OpResult {
+ let path = std::env::current_dir()?;
+ let builder = &mut FlatBufferBuilder::new();
+ let cwd =
+ builder.create_string(&path.into_os_string().into_string().unwrap());
+ let inner =
+ msg::CwdRes::create(builder, &msg::CwdResArgs { cwd: Some(cwd) });
+ Ok(serialize_response(
+ cmd_id,
+ builder,
+ msg::BaseArgs {
+ inner: Some(inner.as_union_value()),
+ inner_type: msg::Any::CwdRes,
+ ..Default::default()
+ },
+ ))
+ }()))
}
fn op_stat(
state: &ThreadSafeState,
base: &msg::Base<'_>,
data: Option<PinnedBuf>,
-) -> CliOpResult {
+) -> Box<OpWithError> {
assert!(data.is_none());
let inner = base.inner_as_stat().unwrap();
-
- let (filename, filename_) = resolve_path(inner.filename().unwrap())?;
+ let cmd_id = base.cmd_id();
+ let (filename, filename_) = match resolve_path(inner.filename().unwrap()) {
+ Err(err) => return odd_future(err),
+ Ok(v) => v,
+ };
let lstat = inner.lstat();
- state.check_read(&filename_)?;
+ if let Err(e) = state.check_read(&filename_) {
+ return odd_future(e);
+ }
blocking(base.sync(), move || {
let builder = &mut FlatBufferBuilder::new();
@@ -1290,6 +1341,7 @@ fn op_stat(
);
Ok(serialize_response(
+ cmd_id,
builder,
msg::BaseArgs {
inner: Some(inner.as_union_value()),
@@ -1304,14 +1356,20 @@ fn op_read_dir(
state: &ThreadSafeState,
base: &msg::Base<'_>,
data: Option<PinnedBuf>,
-) -> CliOpResult {
+) -> Box<OpWithError> {
assert!(data.is_none());
let inner = base.inner_as_read_dir().unwrap();
- let (path, path_) = resolve_path(inner.path().unwrap())?;
+ let cmd_id = base.cmd_id();
+ let (path, path_) = match resolve_path(inner.path().unwrap()) {
+ Err(err) => return odd_future(err),
+ Ok(v) => v,
+ };
- state.check_read(&path_)?;
+ if let Err(e) = state.check_read(&path_) {
+ return odd_future(e);
+ }
- blocking(base.sync(), move || {
+ blocking(base.sync(), move || -> OpResult {
debug!("op_read_dir {}", path.display());
let builder = &mut FlatBufferBuilder::new();
let entries: Vec<_> = fs::read_dir(path)?
@@ -1345,6 +1403,7 @@ fn op_read_dir(
},
);
Ok(serialize_response(
+ cmd_id,
builder,
msg::BaseArgs {
inner: Some(inner.as_union_value()),
@@ -1359,15 +1418,22 @@ fn op_rename(
state: &ThreadSafeState,
base: &msg::Base<'_>,
data: Option<PinnedBuf>,
-) -> CliOpResult {
+) -> Box<OpWithError> {
assert!(data.is_none());
let inner = base.inner_as_rename().unwrap();
- let (oldpath, _) = resolve_path(inner.oldpath().unwrap())?;
- let (newpath, newpath_) = resolve_path(inner.newpath().unwrap())?;
-
- state.check_write(&newpath_)?;
+ let (oldpath, _) = match resolve_path(inner.oldpath().unwrap()) {
+ Err(err) => return odd_future(err),
+ Ok(v) => v,
+ };
+ let (newpath, newpath_) = match resolve_path(inner.newpath().unwrap()) {
+ Err(err) => return odd_future(err),
+ Ok(v) => v,
+ };
- blocking(base.sync(), move || {
+ if let Err(e) = state.check_write(&newpath_) {
+ return odd_future(e);
+ }
+ blocking(base.sync(), move || -> OpResult {
debug!("op_rename {} {}", oldpath.display(), newpath.display());
fs::rename(&oldpath, &newpath)?;
Ok(empty_buf())
@@ -1378,15 +1444,23 @@ fn op_link(
state: &ThreadSafeState,
base: &msg::Base<'_>,
data: Option<PinnedBuf>,
-) -> CliOpResult {
+) -> Box<OpWithError> {
assert!(data.is_none());
let inner = base.inner_as_link().unwrap();
- let (oldname, _) = resolve_path(inner.oldname().unwrap())?;
- let (newname, newname_) = resolve_path(inner.newname().unwrap())?;
+ let (oldname, _) = match resolve_path(inner.oldname().unwrap()) {
+ Err(err) => return odd_future(err),
+ Ok(v) => v,
+ };
+ let (newname, newname_) = match resolve_path(inner.newname().unwrap()) {
+ Err(err) => return odd_future(err),
+ Ok(v) => v,
+ };
- state.check_write(&newname_)?;
+ if let Err(e) = state.check_write(&newname_) {
+ return odd_future(e);
+ }
- blocking(base.sync(), move || {
+ blocking(base.sync(), move || -> OpResult {
debug!("op_link {} {}", oldname.display(), newname.display());
std::fs::hard_link(&oldname, &newname)?;
Ok(empty_buf())
@@ -1397,18 +1471,29 @@ fn op_symlink(
state: &ThreadSafeState,
base: &msg::Base<'_>,
data: Option<PinnedBuf>,
-) -> CliOpResult {
+) -> Box<OpWithError> {
assert!(data.is_none());
let inner = base.inner_as_symlink().unwrap();
- let (oldname, _) = resolve_path(inner.oldname().unwrap())?;
- let (newname, newname_) = resolve_path(inner.newname().unwrap())?;
+ let (oldname, _) = match resolve_path(inner.oldname().unwrap()) {
+ Err(err) => return odd_future(err),
+ Ok(v) => v,
+ };
+ let (newname, newname_) = match resolve_path(inner.newname().unwrap()) {
+ Err(err) => return odd_future(err),
+ Ok(v) => v,
+ };
- state.check_write(&newname_)?;
+ if let Err(e) = state.check_write(&newname_) {
+ return odd_future(e);
+ }
// TODO Use type for Windows.
if cfg!(windows) {
- return Err(errors::new(ErrorKind::Other, "Not implemented".to_string()));
+ return odd_future(errors::new(
+ ErrorKind::Other,
+ "Not implemented".to_string(),
+ ));
}
- blocking(base.sync(), move || {
+ blocking(base.sync(), move || -> OpResult {
debug!("op_symlink {} {}", oldname.display(), newname.display());
#[cfg(any(unix))]
std::os::unix::fs::symlink(&oldname, &newname)?;
@@ -1420,15 +1505,20 @@ fn op_read_link(
state: &ThreadSafeState,
base: &msg::Base<'_>,
data: Option<PinnedBuf>,
-) -> CliOpResult {
+) -> Box<OpWithError> {
assert!(data.is_none());
let inner = base.inner_as_readlink().unwrap();
+ let cmd_id = base.cmd_id();
+ let (name, name_) = match resolve_path(inner.name().unwrap()) {
+ Err(err) => return odd_future(err),
+ Ok(v) => v,
+ };
- let (name, name_) = resolve_path(inner.name().unwrap())?;
-
- state.check_read(&name_)?;
+ if let Err(e) = state.check_read(&name_) {
+ return odd_future(e);
+ }
- blocking(base.sync(), move || {
+ blocking(base.sync(), move || -> OpResult {
debug!("op_read_link {}", name.display());
let path = fs::read_link(&name)?;
let builder = &mut FlatBufferBuilder::new();
@@ -1440,6 +1530,7 @@ fn op_read_link(
},
);
Ok(serialize_response(
+ cmd_id,
builder,
msg::BaseArgs {
inner: Some(inner.as_union_value()),
@@ -1454,10 +1545,10 @@ fn op_repl_start(
state: &ThreadSafeState,
base: &msg::Base<'_>,
data: Option<PinnedBuf>,
-) -> CliOpResult {
+) -> Box<OpWithError> {
assert!(data.is_none());
let inner = base.inner_as_repl_start().unwrap();
-
+ let cmd_id = base.cmd_id();
let history_file = String::from(inner.history_file().unwrap());
debug!("op_repl_start {}", history_file);
@@ -1470,7 +1561,8 @@ fn op_repl_start(
builder,
&msg::ReplStartResArgs { rid: resource.rid },
);
- ok_buf(serialize_response(
+ ok_future(serialize_response(
+ cmd_id,
builder,
msg::BaseArgs {
inner: Some(inner.as_union_value()),
@@ -1484,15 +1576,15 @@ fn op_repl_readline(
_state: &ThreadSafeState,
base: &msg::Base<'_>,
data: Option<PinnedBuf>,
-) -> CliOpResult {
+) -> Box<OpWithError> {
assert!(data.is_none());
let inner = base.inner_as_repl_readline().unwrap();
-
+ let cmd_id = base.cmd_id();
let rid = inner.rid();
let prompt = inner.prompt().unwrap().to_owned();
debug!("op_repl_readline {} {}", rid, prompt);
- blocking(base.sync(), move || {
+ blocking(base.sync(), move || -> OpResult {
let repl = resources::get_repl(rid)?;
let line = repl.lock().unwrap().readline(&prompt)?;
@@ -1505,6 +1597,7 @@ fn op_repl_readline(
},
);
Ok(serialize_response(
+ cmd_id,
builder,
msg::BaseArgs {
inner: Some(inner.as_union_value()),
@@ -1519,14 +1612,19 @@ fn op_truncate(
state: &ThreadSafeState,
base: &msg::Base<'_>,
data: Option<PinnedBuf>,
-) -> CliOpResult {
+) -> Box<OpWithError> {
assert!(data.is_none());
let inner = base.inner_as_truncate().unwrap();
- let (filename, filename_) = resolve_path(inner.name().unwrap())?;
+ let (filename, filename_) = match resolve_path(inner.name().unwrap()) {
+ Err(err) => return odd_future(err),
+ Ok(v) => v,
+ };
let len = inner.len();
- state.check_write(&filename_)?;
+ if let Err(e) = state.check_write(&filename_) {
+ return odd_future(e);
+ }
blocking(base.sync(), move || {
debug!("op_truncate {} {}", filename_, len);
@@ -1540,7 +1638,7 @@ fn op_utime(
state: &ThreadSafeState,
base: &msg::Base<'_>,
data: Option<PinnedBuf>,
-) -> CliOpResult {
+) -> Box<OpWithError> {
assert!(data.is_none());
let inner = base.inner_as_utime().unwrap();
@@ -1548,7 +1646,9 @@ fn op_utime(
let atime = inner.atime();
let mtime = inner.mtime();
- state.check_write(&filename)?;
+ if let Err(e) = state.check_write(&filename) {
+ return odd_future(e);
+ }
blocking(base.sync(), move || {
debug!("op_utimes {} {} {}", filename, atime, mtime);
@@ -1561,35 +1661,41 @@ fn op_listen(
state: &ThreadSafeState,
base: &msg::Base<'_>,
data: Option<PinnedBuf>,
-) -> CliOpResult {
+) -> Box<OpWithError> {
assert!(data.is_none());
-
+ let cmd_id = base.cmd_id();
let inner = base.inner_as_listen().unwrap();
let network = inner.network().unwrap();
assert_eq!(network, "tcp");
let address = inner.address().unwrap();
- state.check_net(&address)?;
+ if let Err(e) = state.check_net(&address) {
+ return odd_future(e);
+ }
- let addr = resolve_addr(address).wait()?;
- let listener = TcpListener::bind(&addr)?;
- let resource = resources::add_tcp_listener(listener);
+ Box::new(futures::future::result((move || {
+ let addr = resolve_addr(address).wait()?;
+ let listener = TcpListener::bind(&addr)?;
+ let resource = resources::add_tcp_listener(listener);
- let builder = &mut FlatBufferBuilder::new();
- let inner =
- msg::ListenRes::create(builder, &msg::ListenResArgs { rid: resource.rid });
- let response_buf = serialize_response(
- builder,
- msg::BaseArgs {
- inner: Some(inner.as_union_value()),
- inner_type: msg::Any::ListenRes,
- ..Default::default()
- },
- );
- ok_buf(response_buf)
+ let builder = &mut FlatBufferBuilder::new();
+ let inner = msg::ListenRes::create(
+ builder,
+ &msg::ListenResArgs { rid: resource.rid },
+ );
+ Ok(serialize_response(
+ cmd_id,
+ builder,
+ msg::BaseArgs {
+ inner: Some(inner.as_union_value()),
+ inner_type: msg::Any::ListenRes,
+ ..Default::default()
+ },
+ ))
+ })()))
}
-fn new_conn(tcp_stream: TcpStream) -> DenoResult<Buf> {
+fn new_conn(cmd_id: u32, tcp_stream: TcpStream) -> OpResult {
let tcp_stream_resource = resources::add_tcp_stream(tcp_stream);
// TODO forward socket_addr to client.
@@ -1602,6 +1708,7 @@ fn new_conn(tcp_stream: TcpStream) -> DenoResult<Buf> {
},
);
Ok(serialize_response(
+ cmd_id,
builder,
msg::BaseArgs {
inner: Some(inner.as_union_value()),
@@ -1615,24 +1722,21 @@ fn op_accept(
_state: &ThreadSafeState,
base: &msg::Base<'_>,
data: Option<PinnedBuf>,
-) -> CliOpResult {
+) -> Box<OpWithError> {
assert!(data.is_none());
-
+ let cmd_id = base.cmd_id();
let inner = base.inner_as_accept().unwrap();
let server_rid = inner.rid();
match resources::lookup(server_rid) {
- None => Err(errors::bad_resource()),
+ None => odd_future(errors::bad_resource()),
Some(server_resource) => {
let op = tokio_util::accept(server_resource)
.map_err(DenoError::from)
- .and_then(move |(tcp_stream, _socket_addr)| new_conn(tcp_stream));
- if base.sync() {
- let buf = op.wait()?;
- Ok(Op::Sync(buf))
- } else {
- Ok(Op::Async(Box::new(op)))
- }
+ .and_then(move |(tcp_stream, _socket_addr)| {
+ new_conn(cmd_id, tcp_stream)
+ });
+ Box::new(op)
}
}
}
@@ -1641,15 +1745,17 @@ fn op_dial(
state: &ThreadSafeState,
base: &msg::Base<'_>,
data: Option<PinnedBuf>,
-) -> CliOpResult {
+) -> Box<OpWithError> {
assert!(data.is_none());
-
+ let cmd_id = base.cmd_id();
let inner = base.inner_as_dial().unwrap();
let network = inner.network().unwrap();
assert_eq!(network, "tcp"); // TODO Support others.
let address = inner.address().unwrap();
- state.check_net(&address)?;
+ if let Err(e) = state.check_net(&address) {
+ return odd_future(e);
+ }
let op =
resolve_addr(address)
@@ -1657,30 +1763,26 @@ fn op_dial(
.and_then(move |addr| {
TcpStream::connect(&addr)
.map_err(DenoError::from)
- .and_then(new_conn)
+ .and_then(move |tcp_stream| new_conn(cmd_id, tcp_stream))
});
- if base.sync() {
- let buf = op.wait()?;
- Ok(Op::Sync(buf))
- } else {
- Ok(Op::Async(Box::new(op)))
- }
+ Box::new(op)
}
fn op_metrics(
state: &ThreadSafeState,
-
- _base: &msg::Base<'_>,
+ base: &msg::Base<'_>,
data: Option<PinnedBuf>,
-) -> CliOpResult {
+) -> Box<OpWithError> {
assert!(data.is_none());
+ let cmd_id = base.cmd_id();
let builder = &mut FlatBufferBuilder::new();
let inner = msg::MetricsRes::create(
builder,
&msg::MetricsResArgs::from(&state.metrics),
);
- ok_buf(serialize_response(
+ ok_future(serialize_response(
+ cmd_id,
builder,
msg::BaseArgs {
inner: Some(inner.as_union_value()),
@@ -1692,10 +1794,11 @@ fn op_metrics(
fn op_resources(
_state: &ThreadSafeState,
- _base: &msg::Base<'_>,
+ base: &msg::Base<'_>,
data: Option<PinnedBuf>,
-) -> CliOpResult {
+) -> Box<OpWithError> {
assert!(data.is_none());
+ let cmd_id = base.cmd_id();
let builder = &mut FlatBufferBuilder::new();
let serialized_resources = table_entries();
@@ -1722,7 +1825,8 @@ fn op_resources(
},
);
- ok_buf(serialize_response(
+ ok_future(serialize_response(
+ cmd_id,
builder,
msg::BaseArgs {
inner: Some(inner.as_union_value()),
@@ -1744,12 +1848,13 @@ fn op_run(
state: &ThreadSafeState,
base: &msg::Base<'_>,
data: Option<PinnedBuf>,
-) -> CliOpResult {
- if !base.sync() {
- return Err(errors::no_async_support());
- }
+) -> Box<OpWithError> {
+ assert!(base.sync());
+ let cmd_id = base.cmd_id();
- state.check_run()?;
+ if let Err(e) = state.check_run() {
+ return odd_future(e);
+ }
assert!(data.is_none());
let inner = base.inner_as_run().unwrap();
@@ -1773,7 +1878,12 @@ fn op_run(
c.stderr(subprocess_stdio_map(inner.stderr()));
// Spawn the command.
- let child = c.spawn_async().map_err(DenoError::from)?;
+ let child = match c.spawn_async() {
+ Ok(v) => v,
+ Err(err) => {
+ return odd_future(err.into());
+ }
+ };
let pid = child.id();
let resources = resources::add_child(child);
@@ -1796,29 +1906,37 @@ fn op_run(
let builder = &mut FlatBufferBuilder::new();
let inner = msg::RunRes::create(builder, &res_args);
- Ok(Op::Sync(serialize_response(
+ ok_future(serialize_response(
+ cmd_id,
builder,
msg::BaseArgs {
inner: Some(inner.as_union_value()),
inner_type: msg::Any::RunRes,
..Default::default()
},
- )))
+ ))
}
fn op_run_status(
state: &ThreadSafeState,
base: &msg::Base<'_>,
data: Option<PinnedBuf>,
-) -> CliOpResult {
+) -> Box<OpWithError> {
assert!(data.is_none());
-
+ let cmd_id = base.cmd_id();
let inner = base.inner_as_run_status().unwrap();
let rid = inner.rid();
- state.check_run()?;
+ if let Err(e) = state.check_run() {
+ return odd_future(e);
+ }
- let future = resources::child_status(rid)?;
+ let future = match resources::child_status(rid) {
+ Err(e) => {
+ return odd_future(e);
+ }
+ Ok(f) => f,
+ };
let future = future.and_then(move |run_status| {
let code = run_status.code();
@@ -1843,6 +1961,7 @@ fn op_run_status(
},
);
Ok(serialize_response(
+ cmd_id,
builder,
msg::BaseArgs {
inner: Some(inner.as_union_value()),
@@ -1851,12 +1970,7 @@ fn op_run_status(
},
))
});
- if base.sync() {
- let buf = future.wait()?;
- Ok(Op::Sync(buf))
- } else {
- Ok(Op::Async(Box::new(future)))
- }
+ Box::new(future)
}
struct GetMessageFuture {
@@ -1880,11 +1994,9 @@ fn op_worker_get_message(
state: &ThreadSafeState,
base: &msg::Base<'_>,
data: Option<PinnedBuf>,
-) -> CliOpResult {
- if base.sync() {
- return Err(errors::no_sync_support());
- }
+) -> Box<OpWithError> {
assert!(data.is_none());
+ let cmd_id = base.cmd_id();
let op = GetMessageFuture {
state: state.clone(),
@@ -1900,6 +2012,7 @@ fn op_worker_get_message(
&msg::WorkerGetMessageResArgs { data },
);
Ok(serialize_response(
+ cmd_id,
builder,
msg::BaseArgs {
inner: Some(inner.as_union_value()),
@@ -1908,32 +2021,37 @@ fn op_worker_get_message(
},
))
});
- Ok(Op::Async(Box::new(op)))
+ Box::new(op)
}
/// Post message to host as guest worker
fn op_worker_post_message(
state: &ThreadSafeState,
- _base: &msg::Base<'_>,
+ base: &msg::Base<'_>,
data: Option<PinnedBuf>,
-) -> CliOpResult {
+) -> Box<OpWithError> {
+ let cmd_id = base.cmd_id();
+
let d = Vec::from(data.unwrap().as_ref()).into_boxed_slice();
let tx = {
let wc = state.worker_channels.lock().unwrap();
wc.0.clone()
};
- tx.send(d)
- .wait()
- .map_err(|e| errors::new(ErrorKind::Other, e.to_string()))?;
- let builder = &mut FlatBufferBuilder::new();
+ let op = tx.send(d);
+ let op = op.map_err(|e| errors::new(ErrorKind::Other, e.to_string()));
+ let op = op.and_then(move |_| -> DenoResult<Buf> {
+ let builder = &mut FlatBufferBuilder::new();
- ok_buf(serialize_response(
- builder,
- msg::BaseArgs {
- ..Default::default()
- },
- ))
+ Ok(serialize_response(
+ cmd_id,
+ builder,
+ msg::BaseArgs {
+ ..Default::default()
+ },
+ ))
+ });
+ Box::new(op)
}
/// Create worker as the host
@@ -1941,9 +2059,9 @@ fn op_create_worker(
state: &ThreadSafeState,
base: &msg::Base<'_>,
data: Option<PinnedBuf>,
-) -> CliOpResult {
+) -> Box<OpWithError> {
assert!(data.is_none());
-
+ let cmd_id = base.cmd_id();
let inner = base.inner_as_create_worker().unwrap();
let specifier = inner.specifier().unwrap();
@@ -1963,33 +2081,39 @@ fn op_create_worker(
js_check(worker.execute("denoMain()"));
js_check(worker.execute("workerMain()"));
- let module_specifier = ModuleSpecifier::resolve_root(specifier)?;
-
- let op = worker
- .execute_mod_async(&module_specifier, false)
- .and_then(move |()| {
- let mut workers_tl = parent_state.workers.lock().unwrap();
- workers_tl.insert(rid, worker.shared());
- let builder = &mut FlatBufferBuilder::new();
- let msg_inner = msg::CreateWorkerRes::create(
- builder,
- &msg::CreateWorkerResArgs { rid },
- );
- Ok(serialize_response(
- builder,
- msg::BaseArgs {
- inner: Some(msg_inner.as_union_value()),
- inner_type: msg::Any::CreateWorkerRes,
- ..Default::default()
- },
- ))
- }).map_err(|err| match err {
- errors::RustOrJsError::Js(_) => errors::worker_init_failed(),
- errors::RustOrJsError::Rust(err) => err,
- });
+ let op = ModuleSpecifier::resolve_root(specifier)
+ .and_then(|module_specifier| {
+ Ok(
+ worker
+ .execute_mod_async(&module_specifier, false)
+ .and_then(move |()| {
+ let mut workers_tl = parent_state.workers.lock().unwrap();
+ workers_tl.insert(rid, worker.shared());
+ let builder = &mut FlatBufferBuilder::new();
+ let msg_inner = msg::CreateWorkerRes::create(
+ builder,
+ &msg::CreateWorkerResArgs { rid },
+ );
+ Ok(serialize_response(
+ cmd_id,
+ builder,
+ msg::BaseArgs {
+ inner: Some(msg_inner.as_union_value()),
+ inner_type: msg::Any::CreateWorkerRes,
+ ..Default::default()
+ },
+ ))
+ }).map_err(|err| match err {
+ errors::RustOrJsError::Js(_) => errors::worker_init_failed(),
+ errors::RustOrJsError::Rust(err) => err,
+ }),
+ )
+ }).map_err(DenoError::from);
- let result = op.wait()?;
- Ok(Op::Sync(result))
+ Box::new(match op {
+ Ok(op) => future::Either::A(op),
+ Err(err) => future::Either::B(future::result(Err(err))),
+ })
}
/// Return when the worker closes
@@ -1997,12 +2121,9 @@ fn op_host_get_worker_closed(
state: &ThreadSafeState,
base: &msg::Base<'_>,
data: Option<PinnedBuf>,
-) -> CliOpResult {
- if base.sync() {
- return Err(errors::no_sync_support());
- }
+) -> Box<OpWithError> {
assert!(data.is_none());
-
+ let cmd_id = base.cmd_id();
let inner = base.inner_as_host_get_worker_closed().unwrap();
let rid = inner.rid();
let state = state.clone();
@@ -2013,17 +2134,17 @@ fn op_host_get_worker_closed(
worker.clone()
};
- let op = Box::new(shared_worker_future.then(move |_result| {
+ Box::new(shared_worker_future.then(move |_result| {
let builder = &mut FlatBufferBuilder::new();
Ok(serialize_response(
+ cmd_id,
builder,
msg::BaseArgs {
..Default::default()
},
))
- }));
- Ok(Op::Async(Box::new(op)))
+ }))
}
/// Get message from guest worker as host
@@ -2031,12 +2152,9 @@ fn op_host_get_message(
_state: &ThreadSafeState,
base: &msg::Base<'_>,
data: Option<PinnedBuf>,
-) -> CliOpResult {
- if base.sync() {
- return Err(errors::no_sync_support());
- }
+) -> Box<OpWithError> {
assert!(data.is_none());
-
+ let cmd_id = base.cmd_id();
let inner = base.inner_as_host_get_message().unwrap();
let rid = inner.rid();
@@ -2051,6 +2169,7 @@ fn op_host_get_message(
&msg::HostGetMessageResArgs { data },
);
Ok(serialize_response(
+ cmd_id,
builder,
msg::BaseArgs {
inner: Some(msg_inner.as_union_value()),
@@ -2059,7 +2178,7 @@ fn op_host_get_message(
},
))
});
- Ok(Op::Async(Box::new(op)))
+ Box::new(op)
}
/// Post message to guest worker as host
@@ -2067,30 +2186,34 @@ fn op_host_post_message(
_state: &ThreadSafeState,
base: &msg::Base<'_>,
data: Option<PinnedBuf>,
-) -> CliOpResult {
+) -> Box<OpWithError> {
+ let cmd_id = base.cmd_id();
let inner = base.inner_as_host_post_message().unwrap();
let rid = inner.rid();
let d = Vec::from(data.unwrap().as_ref()).into_boxed_slice();
- resources::post_message_to_worker(rid, d)
- .wait()
- .map_err(|e| errors::new(ErrorKind::Other, e.to_string()))?;
- let builder = &mut FlatBufferBuilder::new();
+ let op = resources::post_message_to_worker(rid, d);
+ let op = op.map_err(|e| errors::new(ErrorKind::Other, e.to_string()));
+ let op = op.and_then(move |_| -> DenoResult<Buf> {
+ let builder = &mut FlatBufferBuilder::new();
- ok_buf(serialize_response(
- builder,
- msg::BaseArgs {
- ..Default::default()
- },
- ))
+ Ok(serialize_response(
+ cmd_id,
+ builder,
+ msg::BaseArgs {
+ ..Default::default()
+ },
+ ))
+ });
+ Box::new(op)
}
fn op_get_random_values(
state: &ThreadSafeState,
_base: &msg::Base<'_>,
data: Option<PinnedBuf>,
-) -> CliOpResult {
+) -> Box<OpWithError> {
if let Some(ref seeded_rng) = state.seeded_rng {
let mut rng = seeded_rng.lock().unwrap();
rng.fill(&mut data.unwrap()[..]);
@@ -2099,5 +2222,5 @@ fn op_get_random_values(
rng.fill(&mut data.unwrap()[..]);
}
- ok_buf(empty_buf())
+ Box::new(ok_future(empty_buf()))
}