summaryrefslogtreecommitdiff
path: root/cli/ops.rs
diff options
context:
space:
mode:
Diffstat (limited to 'cli/ops.rs')
-rw-r--r--cli/ops.rs856
1 files changed, 389 insertions, 467 deletions
diff --git a/cli/ops.rs b/cli/ops.rs
index e8fa47aad..2155dcd5a 100644
--- a/cli/ops.rs
+++ b/cli/ops.rs
@@ -27,13 +27,14 @@ use crate::version;
use crate::worker::Worker;
use deno::js_check;
use deno::Buf;
+use deno::CoreOp;
use deno::JSError;
use deno::ModuleSpecifier;
use deno::Op;
+use deno::OpResult;
use deno::PinnedBuf;
use flatbuffers::FlatBufferBuilder;
use futures;
-use futures::future;
use futures::Async;
use futures::Poll;
use futures::Sink;
@@ -61,17 +62,13 @@ use std::os::unix::fs::PermissionsExt;
#[cfg(unix)]
use std::os::unix::process::ExitStatusExt;
-type OpResult = DenoResult<Buf>;
+type CliOpResult = OpResult<DenoError>;
-pub type OpWithError = dyn Future<Item = Buf, Error = DenoError> + Send;
-
-// TODO Ideally we wouldn't have to box the OpWithError being returned.
-// The box is just to make it easier to get a prototype refactor working.
-type OpCreator =
+type CliDispatchFn =
fn(state: &ThreadSafeState, base: &msg::Base<'_>, data: Option<PinnedBuf>)
- -> Box<OpWithError>;
+ -> CliOpResult;
-pub type OpSelector = fn(inner_type: msg::Any) -> Option<OpCreator>;
+pub type OpSelector = fn(inner_type: msg::Any) -> Option<CliDispatchFn>;
#[inline]
fn empty_buf() -> Buf {
@@ -83,7 +80,7 @@ pub fn dispatch_all(
control: &[u8],
zero_copy: Option<PinnedBuf>,
op_selector: OpSelector,
-) -> Op {
+) -> CoreOp {
let bytes_sent_control = control.len();
let bytes_sent_zero_copy = zero_copy.as_ref().map(|b| b.len()).unwrap_or(0);
let op = if let Some(min_record) = parse_min_record(control) {
@@ -104,29 +101,78 @@ pub fn dispatch_all_legacy(
control: &[u8],
zero_copy: Option<PinnedBuf>,
op_selector: OpSelector,
-) -> Op {
+) -> CoreOp {
let base = msg::get_root_as_base(&control);
- let is_sync = base.sync();
let inner_type = base.inner_type();
+ let is_sync = base.sync();
let cmd_id = base.cmd_id();
- let op_func: OpCreator = match op_selector(inner_type) {
+ debug!(
+ "msg_from_js {} sync {}",
+ msg::enum_name_any(inner_type),
+ is_sync
+ );
+
+ let op_func: CliDispatchFn = match op_selector(inner_type) {
Some(v) => v,
None => panic!("Unhandled message {}", msg::enum_name_any(inner_type)),
};
- let op: Box<OpWithError> = op_func(state, &base, zero_copy);
+ let op_result = op_func(state, &base, zero_copy);
let state = state.clone();
- let fut = Box::new(
- op.or_else(move |err: DenoError| -> Result<Buf, ()> {
+ match op_result {
+ Ok(Op::Sync(buf)) => {
+ state.metrics_op_completed(buf.len());
+ Op::Sync(buf)
+ }
+ Ok(Op::Async(fut)) => {
+ let result_fut = Box::new(
+ fut.or_else(move |err: DenoError| -> Result<Buf, ()> {
+ debug!("op err {}", err);
+ // No matter whether we got an Err or Ok, we want a serialized message to
+ // send back. So transform the DenoError into a Buf.
+ let builder = &mut FlatBufferBuilder::new();
+ let errmsg_offset = builder.create_string(&format!("{}", err));
+ Ok(serialize_response(
+ cmd_id,
+ builder,
+ msg::BaseArgs {
+ error: Some(errmsg_offset),
+ error_kind: err.kind(),
+ ..Default::default()
+ },
+ ))
+ }).and_then(move |buf: Buf| -> Result<Buf, ()> {
+ // Handle empty responses. For sync responses we just want
+ // to send null. For async we want to send a small message
+ // with the cmd_id.
+ let buf = if buf.len() > 0 {
+ buf
+ } else {
+ let builder = &mut FlatBufferBuilder::new();
+ serialize_response(
+ cmd_id,
+ builder,
+ msg::BaseArgs {
+ ..Default::default()
+ },
+ )
+ };
+ state.metrics_op_completed(buf.len());
+ Ok(buf)
+ }).map_err(|err| panic!("unexpected error {:?}", err)),
+ );
+ Op::Async(result_fut)
+ }
+ Err(err) => {
debug!("op err {}", err);
// No matter whether we got an Err or Ok, we want a serialized message to
// send back. So transform the DenoError into a Buf.
let builder = &mut FlatBufferBuilder::new();
let errmsg_offset = builder.create_string(&format!("{}", err));
- Ok(serialize_response(
+ let response_buf = serialize_response(
cmd_id,
builder,
msg::BaseArgs {
@@ -134,51 +180,15 @@ pub fn dispatch_all_legacy(
error_kind: err.kind(),
..Default::default()
},
- ))
- }).and_then(move |buf: Buf| -> Result<Buf, ()> {
- // Handle empty responses. For sync responses we just want
- // to send null. For async we want to send a small message
- // with the cmd_id.
- let buf = if is_sync || buf.len() > 0 {
- buf
- } else {
- let builder = &mut FlatBufferBuilder::new();
- serialize_response(
- cmd_id,
- builder,
- msg::BaseArgs {
- ..Default::default()
- },
- )
- };
- state.metrics_op_completed(buf.len());
- Ok(buf)
- }).map_err(|err| panic!("unexpected error {:?}", err)),
- );
-
- debug!(
- "msg_from_js {} sync {}",
- msg::enum_name_any(inner_type),
- base.sync()
- );
-
- if base.sync() {
- // TODO(ry) This is not correct! If the sync op is not actually synchronous
- // (like in the case of op_fetch_module_meta_data) this wait() will block
- // a thread in the Tokio runtime. Depending on the size of the runtime's
- // thread pool, this may result in a dead lock!
- //
- // The solution is that ops should return an Op directly. Op::Sync contains
- // the result value, so if its returned directly from the OpCreator, we
- // know it has actually be evaluated synchronously.
- Op::Sync(fut.wait().unwrap())
- } else {
- Op::Async(fut)
+ );
+ state.metrics_op_completed(response_buf.len());
+ Op::Sync(response_buf)
+ }
}
}
/// Standard ops set for most isolates
-pub fn op_selector_std(inner_type: msg::Any) -> Option<OpCreator> {
+pub fn op_selector_std(inner_type: msg::Any) -> Option<CliDispatchFn> {
match inner_type {
msg::Any::Accept => Some(op_accept),
msg::Any::Cache => Some(op_cache),
@@ -249,7 +259,7 @@ fn op_now(
state: &ThreadSafeState,
base: &msg::Base<'_>,
data: Option<PinnedBuf>,
-) -> Box<OpWithError> {
+) -> CliOpResult {
assert!(data.is_none());
let seconds = state.start_time.elapsed().as_secs();
let mut subsec_nanos = state.start_time.elapsed().subsec_nanos();
@@ -270,7 +280,8 @@ fn op_now(
subsec_nanos,
},
);
- ok_future(serialize_response(
+
+ ok_buf(serialize_response(
base.cmd_id(),
builder,
msg::BaseArgs {
@@ -285,7 +296,7 @@ fn op_is_tty(
_state: &ThreadSafeState,
base: &msg::Base<'_>,
_data: Option<PinnedBuf>,
-) -> Box<OpWithError> {
+) -> CliOpResult {
let builder = &mut FlatBufferBuilder::new();
let inner = msg::IsTTYRes::create(
builder,
@@ -295,7 +306,7 @@ fn op_is_tty(
stderr: atty::is(atty::Stream::Stderr),
},
);
- ok_future(serialize_response(
+ ok_buf(serialize_response(
base.cmd_id(),
builder,
msg::BaseArgs {
@@ -310,7 +321,7 @@ fn op_exit(
_state: &ThreadSafeState,
base: &msg::Base<'_>,
_data: Option<PinnedBuf>,
-) -> Box<OpWithError> {
+) -> CliOpResult {
let inner = base.inner_as_exit().unwrap();
std::process::exit(inner.code())
}
@@ -319,7 +330,7 @@ fn op_start(
state: &ThreadSafeState,
base: &msg::Base<'_>,
data: Option<PinnedBuf>,
-) -> Box<OpWithError> {
+) -> CliOpResult {
assert!(data.is_none());
let mut builder = FlatBufferBuilder::new();
@@ -368,7 +379,7 @@ fn op_start(
},
);
- ok_future(serialize_response(
+ ok_buf(serialize_response(
base.cmd_id(),
&mut builder,
msg::BaseArgs {
@@ -383,7 +394,7 @@ fn op_format_error(
state: &ThreadSafeState,
base: &msg::Base<'_>,
data: Option<PinnedBuf>,
-) -> Box<OpWithError> {
+) -> CliOpResult {
assert!(data.is_none());
let inner = base.inner_as_format_error().unwrap();
let orig_error = String::from(inner.error().unwrap());
@@ -402,7 +413,7 @@ fn op_format_error(
},
);
- ok_future(serialize_response(
+ let response_buf = serialize_response(
base.cmd_id(),
&mut builder,
msg::BaseArgs {
@@ -410,7 +421,9 @@ fn op_format_error(
inner: Some(inner.as_union_value()),
..Default::default()
},
- ))
+ );
+
+ ok_buf(response_buf)
}
fn serialize_response(
@@ -427,21 +440,20 @@ fn serialize_response(
}
#[inline]
-pub fn ok_future(buf: Buf) -> Box<OpWithError> {
- Box::new(futures::future::ok(buf))
+pub fn ok_future(buf: Buf) -> CliOpResult {
+ Ok(Op::Async(Box::new(futures::future::ok(buf))))
}
-// Shout out to Earl Sweatshirt.
#[inline]
-pub fn odd_future(err: DenoError) -> Box<OpWithError> {
- Box::new(futures::future::err(err))
+pub fn ok_buf(buf: Buf) -> CliOpResult {
+ Ok(Op::Sync(buf))
}
fn op_cache(
state: &ThreadSafeState,
base: &msg::Base<'_>,
data: Option<PinnedBuf>,
-) -> Box<OpWithError> {
+) -> CliOpResult {
assert!(data.is_none());
let inner = base.inner_as_cache().unwrap();
let extension = inner.extension().unwrap();
@@ -455,11 +467,9 @@ fn op_cache(
// cache path. In the future, checksums will not be used in the cache
// filenames and this requirement can be removed. See
// https://github.com/denoland/deno/issues/2057
- let r = state.dir.fetch_module_meta_data(module_id, ".", true, true);
- if let Err(err) = r {
- return odd_future(err);
- }
- let module_meta_data = r.unwrap();
+ let module_meta_data = state
+ .dir
+ .fetch_module_meta_data(module_id, ".", true, true)?;
let (js_cache_path, source_map_path) = state
.dir
@@ -467,21 +477,15 @@ fn op_cache(
if extension == ".map" {
debug!("cache {:?}", source_map_path);
- let r = fs::write(source_map_path, contents);
- if let Err(err) = r {
- return odd_future(err.into());
- }
+ fs::write(source_map_path, contents).map_err(DenoError::from)?;
} else if extension == ".js" {
debug!("cache {:?}", js_cache_path);
- let r = fs::write(js_cache_path, contents);
- if let Err(err) = r {
- return odd_future(err.into());
- }
+ fs::write(js_cache_path, contents).map_err(DenoError::from)?;
} else {
unreachable!();
}
- ok_future(empty_buf())
+ ok_buf(empty_buf())
}
// https://github.com/denoland/deno/blob/golang/os.go#L100-L154
@@ -489,7 +493,10 @@ fn op_fetch_module_meta_data(
state: &ThreadSafeState,
base: &msg::Base<'_>,
data: Option<PinnedBuf>,
-) -> Box<OpWithError> {
+) -> CliOpResult {
+ if !base.sync() {
+ return Err(errors::no_async_support());
+ }
assert!(data.is_none());
let inner = base.inner_as_fetch_module_meta_data().unwrap();
let cmd_id = base.cmd_id();
@@ -510,7 +517,7 @@ fn op_fetch_module_meta_data(
Some(module_specifier) => module_specifier.to_string(),
None => specifier.to_string(),
},
- Err(err) => return odd_future(DenoError::from(err)),
+ Err(err) => return Err(DenoError::from(err)),
},
None => specifier.to_string(),
};
@@ -543,50 +550,48 @@ fn op_fetch_module_meta_data(
))
});
- // Unfortunately TypeScript's CompilerHost interface does not leave room for
- // asynchronous source code fetching. This complicates things greatly and
- // requires us to use tokio_util::block_on() below.
- assert!(base.sync());
-
// WARNING: Here we use tokio_util::block_on() which starts a new Tokio
// runtime for executing the future. This is so we don't inadvernently run
// out of threads in the main runtime.
- Box::new(futures::future::result(tokio_util::block_on(fut)))
+ let result_buf = tokio_util::block_on(fut)?;
+ Ok(Op::Sync(result_buf))
}
fn op_chdir(
_state: &ThreadSafeState,
base: &msg::Base<'_>,
data: Option<PinnedBuf>,
-) -> Box<OpWithError> {
+) -> CliOpResult {
assert!(data.is_none());
let inner = base.inner_as_chdir().unwrap();
let directory = inner.directory().unwrap();
- Box::new(futures::future::result(|| -> OpResult {
- std::env::set_current_dir(&directory)?;
- Ok(empty_buf())
- }()))
+ std::env::set_current_dir(&directory)?;
+ ok_buf(empty_buf())
}
fn op_global_timer_stop(
state: &ThreadSafeState,
base: &msg::Base<'_>,
data: Option<PinnedBuf>,
-) -> Box<OpWithError> {
- assert!(base.sync());
+) -> CliOpResult {
+ if !base.sync() {
+ return Err(errors::no_async_support());
+ }
assert!(data.is_none());
let state = state;
let mut t = state.global_timer.lock().unwrap();
t.cancel();
- ok_future(empty_buf())
+ Ok(Op::Sync(empty_buf()))
}
fn op_global_timer(
state: &ThreadSafeState,
base: &msg::Base<'_>,
data: Option<PinnedBuf>,
-) -> Box<OpWithError> {
- assert!(!base.sync());
+) -> CliOpResult {
+ if base.sync() {
+ return Err(errors::no_sync_support());
+ }
assert!(data.is_none());
let cmd_id = base.cmd_id();
let inner = base.inner_as_global_timer().unwrap();
@@ -598,7 +603,7 @@ fn op_global_timer(
let deadline = Instant::now() + Duration::from_millis(val as u64);
let f = t.new_timeout(deadline);
- Box::new(f.then(move |_| {
+ Ok(Op::Async(Box::new(f.then(move |_| {
let builder = &mut FlatBufferBuilder::new();
let inner =
msg::GlobalTimerRes::create(builder, &msg::GlobalTimerResArgs {});
@@ -611,36 +616,32 @@ fn op_global_timer(
..Default::default()
},
))
- }))
+ }))))
}
fn op_set_env(
state: &ThreadSafeState,
base: &msg::Base<'_>,
data: Option<PinnedBuf>,
-) -> Box<OpWithError> {
+) -> CliOpResult {
assert!(data.is_none());
let inner = base.inner_as_set_env().unwrap();
let key = inner.key().unwrap();
let value = inner.value().unwrap();
- if let Err(e) = state.check_env() {
- return odd_future(e);
- }
+ state.check_env()?;
std::env::set_var(key, value);
- ok_future(empty_buf())
+ ok_buf(empty_buf())
}
fn op_env(
state: &ThreadSafeState,
base: &msg::Base<'_>,
data: Option<PinnedBuf>,
-) -> Box<OpWithError> {
+) -> CliOpResult {
assert!(data.is_none());
let cmd_id = base.cmd_id();
- if let Err(e) = state.check_env() {
- return odd_future(e);
- }
+ state.check_env()?;
let builder = &mut FlatBufferBuilder::new();
let vars: Vec<_> = std::env::vars()
@@ -651,7 +652,7 @@ fn op_env(
builder,
&msg::EnvironResArgs { map: Some(tables) },
);
- ok_future(serialize_response(
+ let response_buf = serialize_response(
cmd_id,
builder,
msg::BaseArgs {
@@ -659,14 +660,15 @@ fn op_env(
inner_type: msg::Any::EnvironRes,
..Default::default()
},
- ))
+ );
+ ok_buf(response_buf)
}
fn op_permissions(
state: &ThreadSafeState,
base: &msg::Base<'_>,
data: Option<PinnedBuf>,
-) -> Box<OpWithError> {
+) -> CliOpResult {
assert!(data.is_none());
let cmd_id = base.cmd_id();
let builder = &mut FlatBufferBuilder::new();
@@ -681,7 +683,7 @@ fn op_permissions(
hrtime: state.permissions.allows_hrtime(),
},
);
- ok_future(serialize_response(
+ let response_buf = serialize_response(
cmd_id,
builder,
msg::BaseArgs {
@@ -689,18 +691,19 @@ fn op_permissions(
inner_type: msg::Any::PermissionsRes,
..Default::default()
},
- ))
+ );
+ ok_buf(response_buf)
}
fn op_revoke_permission(
state: &ThreadSafeState,
base: &msg::Base<'_>,
data: Option<PinnedBuf>,
-) -> Box<OpWithError> {
+) -> CliOpResult {
assert!(data.is_none());
let inner = base.inner_as_permission_revoke().unwrap();
let permission = inner.permission().unwrap();
- let result = match permission {
+ match permission {
"run" => state.permissions.revoke_run(),
"read" => state.permissions.revoke_read(),
"write" => state.permissions.revoke_write(),
@@ -708,18 +711,15 @@ fn op_revoke_permission(
"env" => state.permissions.revoke_env(),
"hrtime" => state.permissions.revoke_hrtime(),
_ => Ok(()),
- };
- if let Err(e) = result {
- return odd_future(e);
- }
- ok_future(empty_buf())
+ }?;
+ ok_buf(empty_buf())
}
fn op_fetch(
state: &ThreadSafeState,
base: &msg::Base<'_>,
data: Option<PinnedBuf>,
-) -> Box<OpWithError> {
+) -> CliOpResult {
let inner = base.inner_as_fetch().unwrap();
let cmd_id = base.cmd_id();
@@ -732,19 +732,10 @@ fn op_fetch(
Some(buf) => hyper::Body::from(Vec::from(&*buf)),
};
- let maybe_req = msg_util::deserialize_request(header, body);
- if let Err(e) = maybe_req {
- return odd_future(e);
- }
- let req = maybe_req.unwrap();
+ let req = msg_util::deserialize_request(header, body)?;
- let url_ = match url::Url::parse(url) {
- Err(err) => return odd_future(DenoError::from(err)),
- Ok(v) => v,
- };
- if let Err(e) = state.check_net_url(url_) {
- return odd_future(e);
- }
+ let url_ = url::Url::parse(url).map_err(DenoError::from)?;
+ state.check_net_url(url_)?;
let client = http_util::get_client();
@@ -776,7 +767,12 @@ fn op_fetch(
},
))
});
- Box::new(future)
+ if base.sync() {
+ let result_buf = future.wait()?;
+ Ok(Op::Sync(result_buf))
+ } else {
+ Ok(Op::Async(Box::new(future)))
+ }
}
// This is just type conversion. Implement From trait?
@@ -794,14 +790,17 @@ where
}
}
-fn blocking<F>(is_sync: bool, f: F) -> Box<OpWithError>
+fn blocking<F>(is_sync: bool, f: F) -> CliOpResult
where
F: 'static + Send + FnOnce() -> DenoResult<Buf>,
{
if is_sync {
- Box::new(futures::future::result(f()))
+ let result_buf = f()?;
+ Ok(Op::Sync(result_buf))
} else {
- Box::new(tokio_util::poll_fn(move || convert_blocking(f)))
+ Ok(Op::Async(Box::new(tokio_util::poll_fn(move || {
+ convert_blocking(f)
+ }))))
}
}
@@ -809,22 +808,20 @@ fn op_make_temp_dir(
state: &ThreadSafeState,
base: &msg::Base<'_>,
data: Option<PinnedBuf>,
-) -> Box<OpWithError> {
+) -> CliOpResult {
assert!(data.is_none());
let base = Box::new(*base);
let inner = base.inner_as_make_temp_dir().unwrap();
let cmd_id = base.cmd_id();
// FIXME
- if let Err(e) = state.check_write("make_temp") {
- return odd_future(e);
- }
+ state.check_write("make_temp")?;
let dir = inner.dir().map(PathBuf::from);
let prefix = inner.prefix().map(String::from);
let suffix = inner.suffix().map(String::from);
- blocking(base.sync(), move || -> OpResult {
+ blocking(base.sync(), move || {
// TODO(piscisaureus): use byte vector for paths, not a string.
// See https://github.com/denoland/deno/issues/627.
// We can't assume that paths are always valid utf8 strings.
@@ -858,19 +855,14 @@ fn op_mkdir(
state: &ThreadSafeState,
base: &msg::Base<'_>,
data: Option<PinnedBuf>,
-) -> Box<OpWithError> {
+) -> CliOpResult {
assert!(data.is_none());
let inner = base.inner_as_mkdir().unwrap();
- let (path, path_) = match resolve_path(inner.path().unwrap()) {
- Err(err) => return odd_future(err),
- Ok(v) => v,
- };
+ let (path, path_) = resolve_path(inner.path().unwrap())?;
let recursive = inner.recursive();
let mode = inner.mode();
- if let Err(e) = state.check_write(&path_) {
- return odd_future(e);
- }
+ state.check_write(&path_)?;
blocking(base.sync(), move || {
debug!("op_mkdir {}", path_);
@@ -883,18 +875,13 @@ fn op_chmod(
state: &ThreadSafeState,
base: &msg::Base<'_>,
data: Option<PinnedBuf>,
-) -> Box<OpWithError> {
+) -> CliOpResult {
assert!(data.is_none());
let inner = base.inner_as_chmod().unwrap();
let _mode = inner.mode();
- let (path, path_) = match resolve_path(inner.path().unwrap()) {
- Err(err) => return odd_future(err),
- Ok(v) => v,
- };
+ let (path, path_) = resolve_path(inner.path().unwrap())?;
- if let Err(e) = state.check_write(&path_) {
- return odd_future(e);
- }
+ state.check_write(&path_)?;
blocking(base.sync(), move || {
debug!("op_chmod {}", &path_);
@@ -914,16 +901,14 @@ fn op_chown(
state: &ThreadSafeState,
base: &msg::Base<'_>,
data: Option<PinnedBuf>,
-) -> Box<OpWithError> {
+) -> CliOpResult {
assert!(data.is_none());
let inner = base.inner_as_chown().unwrap();
let path = String::from(inner.path().unwrap());
let uid = inner.uid();
let gid = inner.gid();
- if let Err(e) = state.check_write(&path) {
- return odd_future(e);
- }
+ state.check_write(&path)?;
blocking(base.sync(), move || {
debug!("op_chown {}", &path);
@@ -938,14 +923,11 @@ fn op_open(
state: &ThreadSafeState,
base: &msg::Base<'_>,
data: Option<PinnedBuf>,
-) -> Box<OpWithError> {
+) -> CliOpResult {
assert!(data.is_none());
let cmd_id = base.cmd_id();
let inner = base.inner_as_open().unwrap();
- let (filename, filename_) = match resolve_path(inner.filename().unwrap()) {
- Err(err) => return odd_future(err),
- Ok(v) => v,
- };
+ let (filename, filename_) = resolve_path(inner.filename().unwrap())?;
let mode = inner.mode().unwrap();
let mut open_options = tokio::fs::OpenOptions::new();
@@ -986,29 +968,21 @@ fn op_open(
match mode {
"r" => {
- if let Err(e) = state.check_read(&filename_) {
- return odd_future(e);
- }
+ state.check_read(&filename_)?;
}
"w" | "a" | "x" => {
- if let Err(e) = state.check_write(&filename_) {
- return odd_future(e);
- }
+ state.check_write(&filename_)?;
}
&_ => {
- if let Err(e) = state.check_read(&filename_) {
- return odd_future(e);
- }
- if let Err(e) = state.check_write(&filename_) {
- return odd_future(e);
- }
+ state.check_read(&filename_)?;
+ state.check_write(&filename_)?;
}
}
let op = open_options
.open(filename)
.map_err(DenoError::from)
- .and_then(move |fs_file| -> OpResult {
+ .and_then(move |fs_file| {
let resource = resources::add_fs_file(fs_file);
let builder = &mut FlatBufferBuilder::new();
let inner =
@@ -1023,22 +997,27 @@ fn op_open(
},
))
});
- Box::new(op)
+ if base.sync() {
+ let buf = op.wait()?;
+ Ok(Op::Sync(buf))
+ } else {
+ Ok(Op::Async(Box::new(op)))
+ }
}
fn op_close(
_state: &ThreadSafeState,
base: &msg::Base<'_>,
data: Option<PinnedBuf>,
-) -> Box<OpWithError> {
+) -> CliOpResult {
assert!(data.is_none());
let inner = base.inner_as_close().unwrap();
let rid = inner.rid();
match resources::lookup(rid) {
- None => odd_future(errors::bad_resource()),
+ None => Err(errors::bad_resource()),
Some(resource) => {
resource.close();
- ok_future(empty_buf())
+ ok_buf(empty_buf())
}
}
}
@@ -1047,28 +1026,26 @@ fn op_kill(
_state: &ThreadSafeState,
base: &msg::Base<'_>,
data: Option<PinnedBuf>,
-) -> Box<OpWithError> {
+) -> CliOpResult {
assert!(data.is_none());
let inner = base.inner_as_kill().unwrap();
let pid = inner.pid();
let signo = inner.signo();
- match kill(pid, signo) {
- Ok(_) => ok_future(empty_buf()),
- Err(e) => odd_future(e),
- }
+ kill(pid, signo)?;
+ ok_buf(empty_buf())
}
fn op_shutdown(
_state: &ThreadSafeState,
base: &msg::Base<'_>,
data: Option<PinnedBuf>,
-) -> Box<OpWithError> {
+) -> CliOpResult {
assert!(data.is_none());
let inner = base.inner_as_shutdown().unwrap();
let rid = inner.rid();
let how = inner.how();
match resources::lookup(rid) {
- None => odd_future(errors::bad_resource()),
+ None => Err(errors::bad_resource()),
Some(mut resource) => {
let shutdown_mode = match how {
0 => Shutdown::Read,
@@ -1088,13 +1065,13 @@ fn op_read(
_state: &ThreadSafeState,
base: &msg::Base<'_>,
data: Option<PinnedBuf>,
-) -> Box<OpWithError> {
+) -> CliOpResult {
let cmd_id = base.cmd_id();
let inner = base.inner_as_read().unwrap();
let rid = inner.rid();
match resources::lookup(rid) {
- None => odd_future(errors::bad_resource()),
+ None => Err(errors::bad_resource()),
Some(resource) => {
let op = tokio::io::read(resource, data.unwrap())
.map_err(DenoError::from)
@@ -1117,7 +1094,12 @@ fn op_read(
},
))
});
- Box::new(op)
+ if base.sync() {
+ let buf = op.wait()?;
+ Ok(Op::Sync(buf))
+ } else {
+ Ok(Op::Async(Box::new(op)))
+ }
}
}
}
@@ -1126,13 +1108,13 @@ fn op_write(
_state: &ThreadSafeState,
base: &msg::Base<'_>,
data: Option<PinnedBuf>,
-) -> Box<OpWithError> {
+) -> CliOpResult {
let cmd_id = base.cmd_id();
let inner = base.inner_as_write().unwrap();
let rid = inner.rid();
match resources::lookup(rid) {
- None => odd_future(errors::bad_resource()),
+ None => Err(errors::bad_resource()),
Some(resource) => {
let op = tokio_write::write(resource, data.unwrap())
.map_err(DenoError::from)
@@ -1154,7 +1136,12 @@ fn op_write(
},
))
});
- Box::new(op)
+ if base.sync() {
+ let buf = op.wait()?;
+ Ok(Op::Sync(buf))
+ } else {
+ Ok(Op::Async(Box::new(op)))
+ }
}
}
}
@@ -1163,20 +1150,24 @@ fn op_seek(
_state: &ThreadSafeState,
base: &msg::Base<'_>,
data: Option<PinnedBuf>,
-) -> Box<OpWithError> {
+) -> CliOpResult {
assert!(data.is_none());
- let _cmd_id = base.cmd_id();
let inner = base.inner_as_seek().unwrap();
let rid = inner.rid();
let offset = inner.offset();
let whence = inner.whence();
match resources::lookup(rid) {
- None => odd_future(errors::bad_resource()),
+ None => Err(errors::bad_resource()),
Some(resource) => {
let op = resources::seek(resource, offset, whence)
.and_then(move |_| Ok(empty_buf()));
- Box::new(op)
+ if base.sync() {
+ let buf = op.wait()?;
+ Ok(Op::Sync(buf))
+ } else {
+ Ok(Op::Async(Box::new(op)))
+ }
}
}
}
@@ -1185,18 +1176,13 @@ fn op_remove(
state: &ThreadSafeState,
base: &msg::Base<'_>,
data: Option<PinnedBuf>,
-) -> Box<OpWithError> {
+) -> CliOpResult {
assert!(data.is_none());
let inner = base.inner_as_remove().unwrap();
- let (path, path_) = match resolve_path(inner.path().unwrap()) {
- Err(err) => return odd_future(err),
- Ok(v) => v,
- };
+ let (path, path_) = resolve_path(inner.path().unwrap())?;
let recursive = inner.recursive();
- if let Err(e) = state.check_write(&path_) {
- return odd_future(e);
- }
+ state.check_write(&path_)?;
blocking(base.sync(), move || {
debug!("op_remove {}", path.display());
@@ -1216,24 +1202,14 @@ fn op_copy_file(
state: &ThreadSafeState,
base: &msg::Base<'_>,
data: Option<PinnedBuf>,
-) -> Box<OpWithError> {
+) -> CliOpResult {
assert!(data.is_none());
let inner = base.inner_as_copy_file().unwrap();
- let (from, from_) = match resolve_path(inner.from().unwrap()) {
- Err(err) => return odd_future(err),
- Ok(v) => v,
- };
- let (to, to_) = match resolve_path(inner.to().unwrap()) {
- Err(err) => return odd_future(err),
- Ok(v) => v,
- };
+ let (from, from_) = resolve_path(inner.from().unwrap())?;
+ let (to, to_) = resolve_path(inner.to().unwrap())?;
- if let Err(e) = state.check_read(&from_) {
- return odd_future(e);
- }
- if let Err(e) = state.check_write(&to_) {
- return odd_future(e);
- }
+ state.check_read(&from_)?;
+ state.check_write(&to_)?;
debug!("op_copy_file {} {}", from.display(), to.display());
blocking(base.sync(), move || {
@@ -1276,45 +1252,38 @@ fn op_cwd(
_state: &ThreadSafeState,
base: &msg::Base<'_>,
data: Option<PinnedBuf>,
-) -> Box<OpWithError> {
+) -> CliOpResult {
assert!(data.is_none());
let cmd_id = base.cmd_id();
- Box::new(futures::future::result(|| -> OpResult {
- let path = std::env::current_dir()?;
- let builder = &mut FlatBufferBuilder::new();
- let cwd =
- builder.create_string(&path.into_os_string().into_string().unwrap());
- let inner =
- msg::CwdRes::create(builder, &msg::CwdResArgs { cwd: Some(cwd) });
- Ok(serialize_response(
- cmd_id,
- builder,
- msg::BaseArgs {
- inner: Some(inner.as_union_value()),
- inner_type: msg::Any::CwdRes,
- ..Default::default()
- },
- ))
- }()))
+ let path = std::env::current_dir()?;
+ let builder = &mut FlatBufferBuilder::new();
+ let cwd =
+ builder.create_string(&path.into_os_string().into_string().unwrap());
+ let inner = msg::CwdRes::create(builder, &msg::CwdResArgs { cwd: Some(cwd) });
+ let response_buf = serialize_response(
+ cmd_id,
+ builder,
+ msg::BaseArgs {
+ inner: Some(inner.as_union_value()),
+ inner_type: msg::Any::CwdRes,
+ ..Default::default()
+ },
+ );
+ ok_buf(response_buf)
}
fn op_stat(
state: &ThreadSafeState,
base: &msg::Base<'_>,
data: Option<PinnedBuf>,
-) -> Box<OpWithError> {
+) -> CliOpResult {
assert!(data.is_none());
let inner = base.inner_as_stat().unwrap();
let cmd_id = base.cmd_id();
- let (filename, filename_) = match resolve_path(inner.filename().unwrap()) {
- Err(err) => return odd_future(err),
- Ok(v) => v,
- };
+ let (filename, filename_) = resolve_path(inner.filename().unwrap())?;
let lstat = inner.lstat();
- if let Err(e) = state.check_read(&filename_) {
- return odd_future(e);
- }
+ state.check_read(&filename_)?;
blocking(base.sync(), move || {
let builder = &mut FlatBufferBuilder::new();
@@ -1356,20 +1325,15 @@ fn op_read_dir(
state: &ThreadSafeState,
base: &msg::Base<'_>,
data: Option<PinnedBuf>,
-) -> Box<OpWithError> {
+) -> CliOpResult {
assert!(data.is_none());
let inner = base.inner_as_read_dir().unwrap();
let cmd_id = base.cmd_id();
- let (path, path_) = match resolve_path(inner.path().unwrap()) {
- Err(err) => return odd_future(err),
- Ok(v) => v,
- };
+ let (path, path_) = resolve_path(inner.path().unwrap())?;
- if let Err(e) = state.check_read(&path_) {
- return odd_future(e);
- }
+ state.check_read(&path_)?;
- blocking(base.sync(), move || -> OpResult {
+ blocking(base.sync(), move || {
debug!("op_read_dir {}", path.display());
let builder = &mut FlatBufferBuilder::new();
let entries: Vec<_> = fs::read_dir(path)?
@@ -1418,22 +1382,15 @@ fn op_rename(
state: &ThreadSafeState,
base: &msg::Base<'_>,
data: Option<PinnedBuf>,
-) -> Box<OpWithError> {
+) -> CliOpResult {
assert!(data.is_none());
let inner = base.inner_as_rename().unwrap();
- let (oldpath, _) = match resolve_path(inner.oldpath().unwrap()) {
- Err(err) => return odd_future(err),
- Ok(v) => v,
- };
- let (newpath, newpath_) = match resolve_path(inner.newpath().unwrap()) {
- Err(err) => return odd_future(err),
- Ok(v) => v,
- };
+ let (oldpath, _) = resolve_path(inner.oldpath().unwrap())?;
+ let (newpath, newpath_) = resolve_path(inner.newpath().unwrap())?;
- if let Err(e) = state.check_write(&newpath_) {
- return odd_future(e);
- }
- blocking(base.sync(), move || -> OpResult {
+ state.check_write(&newpath_)?;
+
+ blocking(base.sync(), move || {
debug!("op_rename {} {}", oldpath.display(), newpath.display());
fs::rename(&oldpath, &newpath)?;
Ok(empty_buf())
@@ -1444,23 +1401,15 @@ fn op_link(
state: &ThreadSafeState,
base: &msg::Base<'_>,
data: Option<PinnedBuf>,
-) -> Box<OpWithError> {
+) -> CliOpResult {
assert!(data.is_none());
let inner = base.inner_as_link().unwrap();
- let (oldname, _) = match resolve_path(inner.oldname().unwrap()) {
- Err(err) => return odd_future(err),
- Ok(v) => v,
- };
- let (newname, newname_) = match resolve_path(inner.newname().unwrap()) {
- Err(err) => return odd_future(err),
- Ok(v) => v,
- };
+ let (oldname, _) = resolve_path(inner.oldname().unwrap())?;
+ let (newname, newname_) = resolve_path(inner.newname().unwrap())?;
- if let Err(e) = state.check_write(&newname_) {
- return odd_future(e);
- }
+ state.check_write(&newname_)?;
- blocking(base.sync(), move || -> OpResult {
+ blocking(base.sync(), move || {
debug!("op_link {} {}", oldname.display(), newname.display());
std::fs::hard_link(&oldname, &newname)?;
Ok(empty_buf())
@@ -1471,29 +1420,18 @@ fn op_symlink(
state: &ThreadSafeState,
base: &msg::Base<'_>,
data: Option<PinnedBuf>,
-) -> Box<OpWithError> {
+) -> CliOpResult {
assert!(data.is_none());
let inner = base.inner_as_symlink().unwrap();
- let (oldname, _) = match resolve_path(inner.oldname().unwrap()) {
- Err(err) => return odd_future(err),
- Ok(v) => v,
- };
- let (newname, newname_) = match resolve_path(inner.newname().unwrap()) {
- Err(err) => return odd_future(err),
- Ok(v) => v,
- };
+ let (oldname, _) = resolve_path(inner.oldname().unwrap())?;
+ let (newname, newname_) = resolve_path(inner.newname().unwrap())?;
- if let Err(e) = state.check_write(&newname_) {
- return odd_future(e);
- }
+ state.check_write(&newname_)?;
// TODO Use type for Windows.
if cfg!(windows) {
- return odd_future(errors::new(
- ErrorKind::Other,
- "Not implemented".to_string(),
- ));
+ return Err(errors::new(ErrorKind::Other, "Not implemented".to_string()));
}
- blocking(base.sync(), move || -> OpResult {
+ blocking(base.sync(), move || {
debug!("op_symlink {} {}", oldname.display(), newname.display());
#[cfg(any(unix))]
std::os::unix::fs::symlink(&oldname, &newname)?;
@@ -1505,20 +1443,15 @@ fn op_read_link(
state: &ThreadSafeState,
base: &msg::Base<'_>,
data: Option<PinnedBuf>,
-) -> Box<OpWithError> {
+) -> CliOpResult {
assert!(data.is_none());
let inner = base.inner_as_readlink().unwrap();
let cmd_id = base.cmd_id();
- let (name, name_) = match resolve_path(inner.name().unwrap()) {
- Err(err) => return odd_future(err),
- Ok(v) => v,
- };
+ let (name, name_) = resolve_path(inner.name().unwrap())?;
- if let Err(e) = state.check_read(&name_) {
- return odd_future(e);
- }
+ state.check_read(&name_)?;
- blocking(base.sync(), move || -> OpResult {
+ blocking(base.sync(), move || {
debug!("op_read_link {}", name.display());
let path = fs::read_link(&name)?;
let builder = &mut FlatBufferBuilder::new();
@@ -1545,7 +1478,7 @@ fn op_repl_start(
state: &ThreadSafeState,
base: &msg::Base<'_>,
data: Option<PinnedBuf>,
-) -> Box<OpWithError> {
+) -> CliOpResult {
assert!(data.is_none());
let inner = base.inner_as_repl_start().unwrap();
let cmd_id = base.cmd_id();
@@ -1561,7 +1494,7 @@ fn op_repl_start(
builder,
&msg::ReplStartResArgs { rid: resource.rid },
);
- ok_future(serialize_response(
+ ok_buf(serialize_response(
cmd_id,
builder,
msg::BaseArgs {
@@ -1576,7 +1509,7 @@ fn op_repl_readline(
_state: &ThreadSafeState,
base: &msg::Base<'_>,
data: Option<PinnedBuf>,
-) -> Box<OpWithError> {
+) -> CliOpResult {
assert!(data.is_none());
let inner = base.inner_as_repl_readline().unwrap();
let cmd_id = base.cmd_id();
@@ -1584,7 +1517,7 @@ fn op_repl_readline(
let prompt = inner.prompt().unwrap().to_owned();
debug!("op_repl_readline {} {}", rid, prompt);
- blocking(base.sync(), move || -> OpResult {
+ blocking(base.sync(), move || {
let repl = resources::get_repl(rid)?;
let line = repl.lock().unwrap().readline(&prompt)?;
@@ -1612,19 +1545,14 @@ fn op_truncate(
state: &ThreadSafeState,
base: &msg::Base<'_>,
data: Option<PinnedBuf>,
-) -> Box<OpWithError> {
+) -> CliOpResult {
assert!(data.is_none());
let inner = base.inner_as_truncate().unwrap();
- let (filename, filename_) = match resolve_path(inner.name().unwrap()) {
- Err(err) => return odd_future(err),
- Ok(v) => v,
- };
+ let (filename, filename_) = resolve_path(inner.name().unwrap())?;
let len = inner.len();
- if let Err(e) = state.check_write(&filename_) {
- return odd_future(e);
- }
+ state.check_write(&filename_)?;
blocking(base.sync(), move || {
debug!("op_truncate {} {}", filename_, len);
@@ -1638,7 +1566,7 @@ fn op_utime(
state: &ThreadSafeState,
base: &msg::Base<'_>,
data: Option<PinnedBuf>,
-) -> Box<OpWithError> {
+) -> CliOpResult {
assert!(data.is_none());
let inner = base.inner_as_utime().unwrap();
@@ -1646,9 +1574,7 @@ fn op_utime(
let atime = inner.atime();
let mtime = inner.mtime();
- if let Err(e) = state.check_write(&filename) {
- return odd_future(e);
- }
+ state.check_write(&filename)?;
blocking(base.sync(), move || {
debug!("op_utimes {} {} {}", filename, atime, mtime);
@@ -1661,7 +1587,7 @@ fn op_listen(
state: &ThreadSafeState,
base: &msg::Base<'_>,
data: Option<PinnedBuf>,
-) -> Box<OpWithError> {
+) -> CliOpResult {
assert!(data.is_none());
let cmd_id = base.cmd_id();
let inner = base.inner_as_listen().unwrap();
@@ -1669,33 +1595,28 @@ fn op_listen(
assert_eq!(network, "tcp");
let address = inner.address().unwrap();
- if let Err(e) = state.check_net(&address) {
- return odd_future(e);
- }
+ state.check_net(&address)?;
- Box::new(futures::future::result((move || {
- let addr = resolve_addr(address).wait()?;
- let listener = TcpListener::bind(&addr)?;
- let resource = resources::add_tcp_listener(listener);
+ let addr = resolve_addr(address).wait()?;
+ let listener = TcpListener::bind(&addr)?;
+ let resource = resources::add_tcp_listener(listener);
- let builder = &mut FlatBufferBuilder::new();
- let inner = msg::ListenRes::create(
- builder,
- &msg::ListenResArgs { rid: resource.rid },
- );
- Ok(serialize_response(
- cmd_id,
- builder,
- msg::BaseArgs {
- inner: Some(inner.as_union_value()),
- inner_type: msg::Any::ListenRes,
- ..Default::default()
- },
- ))
- })()))
+ let builder = &mut FlatBufferBuilder::new();
+ let inner =
+ msg::ListenRes::create(builder, &msg::ListenResArgs { rid: resource.rid });
+ let response_buf = serialize_response(
+ cmd_id,
+ builder,
+ msg::BaseArgs {
+ inner: Some(inner.as_union_value()),
+ inner_type: msg::Any::ListenRes,
+ ..Default::default()
+ },
+ );
+ ok_buf(response_buf)
}
-fn new_conn(cmd_id: u32, tcp_stream: TcpStream) -> OpResult {
+fn new_conn(cmd_id: u32, tcp_stream: TcpStream) -> DenoResult<Buf> {
let tcp_stream_resource = resources::add_tcp_stream(tcp_stream);
// TODO forward socket_addr to client.
@@ -1722,21 +1643,26 @@ fn op_accept(
_state: &ThreadSafeState,
base: &msg::Base<'_>,
data: Option<PinnedBuf>,
-) -> Box<OpWithError> {
+) -> CliOpResult {
assert!(data.is_none());
let cmd_id = base.cmd_id();
let inner = base.inner_as_accept().unwrap();
let server_rid = inner.rid();
match resources::lookup(server_rid) {
- None => odd_future(errors::bad_resource()),
+ None => Err(errors::bad_resource()),
Some(server_resource) => {
let op = tokio_util::accept(server_resource)
.map_err(DenoError::from)
.and_then(move |(tcp_stream, _socket_addr)| {
new_conn(cmd_id, tcp_stream)
});
- Box::new(op)
+ if base.sync() {
+ let buf = op.wait()?;
+ Ok(Op::Sync(buf))
+ } else {
+ Ok(Op::Async(Box::new(op)))
+ }
}
}
}
@@ -1745,7 +1671,7 @@ fn op_dial(
state: &ThreadSafeState,
base: &msg::Base<'_>,
data: Option<PinnedBuf>,
-) -> Box<OpWithError> {
+) -> CliOpResult {
assert!(data.is_none());
let cmd_id = base.cmd_id();
let inner = base.inner_as_dial().unwrap();
@@ -1753,9 +1679,7 @@ fn op_dial(
assert_eq!(network, "tcp"); // TODO Support others.
let address = inner.address().unwrap();
- if let Err(e) = state.check_net(&address) {
- return odd_future(e);
- }
+ state.check_net(&address)?;
let op =
resolve_addr(address)
@@ -1765,14 +1689,19 @@ fn op_dial(
.map_err(DenoError::from)
.and_then(move |tcp_stream| new_conn(cmd_id, tcp_stream))
});
- Box::new(op)
+ if base.sync() {
+ let buf = op.wait()?;
+ Ok(Op::Sync(buf))
+ } else {
+ Ok(Op::Async(Box::new(op)))
+ }
}
fn op_metrics(
state: &ThreadSafeState,
base: &msg::Base<'_>,
data: Option<PinnedBuf>,
-) -> Box<OpWithError> {
+) -> CliOpResult {
assert!(data.is_none());
let cmd_id = base.cmd_id();
@@ -1781,7 +1710,7 @@ fn op_metrics(
builder,
&msg::MetricsResArgs::from(&state.metrics),
);
- ok_future(serialize_response(
+ ok_buf(serialize_response(
cmd_id,
builder,
msg::BaseArgs {
@@ -1796,7 +1725,7 @@ fn op_resources(
_state: &ThreadSafeState,
base: &msg::Base<'_>,
data: Option<PinnedBuf>,
-) -> Box<OpWithError> {
+) -> CliOpResult {
assert!(data.is_none());
let cmd_id = base.cmd_id();
@@ -1825,7 +1754,7 @@ fn op_resources(
},
);
- ok_future(serialize_response(
+ ok_buf(serialize_response(
cmd_id,
builder,
msg::BaseArgs {
@@ -1848,13 +1777,13 @@ fn op_run(
state: &ThreadSafeState,
base: &msg::Base<'_>,
data: Option<PinnedBuf>,
-) -> Box<OpWithError> {
- assert!(base.sync());
+) -> CliOpResult {
+ if !base.sync() {
+ return Err(errors::no_async_support());
+ }
let cmd_id = base.cmd_id();
- if let Err(e) = state.check_run() {
- return odd_future(e);
- }
+ state.check_run()?;
assert!(data.is_none());
let inner = base.inner_as_run().unwrap();
@@ -1878,12 +1807,7 @@ fn op_run(
c.stderr(subprocess_stdio_map(inner.stderr()));
// Spawn the command.
- let child = match c.spawn_async() {
- Ok(v) => v,
- Err(err) => {
- return odd_future(err.into());
- }
- };
+ let child = c.spawn_async().map_err(DenoError::from)?;
let pid = child.id();
let resources = resources::add_child(child);
@@ -1906,7 +1830,7 @@ fn op_run(
let builder = &mut FlatBufferBuilder::new();
let inner = msg::RunRes::create(builder, &res_args);
- ok_future(serialize_response(
+ Ok(Op::Sync(serialize_response(
cmd_id,
builder,
msg::BaseArgs {
@@ -1914,29 +1838,22 @@ fn op_run(
inner_type: msg::Any::RunRes,
..Default::default()
},
- ))
+ )))
}
fn op_run_status(
state: &ThreadSafeState,
base: &msg::Base<'_>,
data: Option<PinnedBuf>,
-) -> Box<OpWithError> {
+) -> CliOpResult {
assert!(data.is_none());
let cmd_id = base.cmd_id();
let inner = base.inner_as_run_status().unwrap();
let rid = inner.rid();
- if let Err(e) = state.check_run() {
- return odd_future(e);
- }
+ state.check_run()?;
- let future = match resources::child_status(rid) {
- Err(e) => {
- return odd_future(e);
- }
- Ok(f) => f,
- };
+ let future = resources::child_status(rid)?;
let future = future.and_then(move |run_status| {
let code = run_status.code();
@@ -1970,7 +1887,12 @@ fn op_run_status(
},
))
});
- Box::new(future)
+ if base.sync() {
+ let buf = future.wait()?;
+ Ok(Op::Sync(buf))
+ } else {
+ Ok(Op::Async(Box::new(future)))
+ }
}
struct GetMessageFuture {
@@ -1994,7 +1916,10 @@ fn op_worker_get_message(
state: &ThreadSafeState,
base: &msg::Base<'_>,
data: Option<PinnedBuf>,
-) -> Box<OpWithError> {
+) -> CliOpResult {
+ if base.sync() {
+ return Err(errors::no_sync_support());
+ }
assert!(data.is_none());
let cmd_id = base.cmd_id();
@@ -2021,7 +1946,7 @@ fn op_worker_get_message(
},
))
});
- Box::new(op)
+ Ok(Op::Async(Box::new(op)))
}
/// Post message to host as guest worker
@@ -2029,29 +1954,26 @@ fn op_worker_post_message(
state: &ThreadSafeState,
base: &msg::Base<'_>,
data: Option<PinnedBuf>,
-) -> Box<OpWithError> {
+) -> CliOpResult {
let cmd_id = base.cmd_id();
-
let d = Vec::from(data.unwrap().as_ref()).into_boxed_slice();
let tx = {
let wc = state.worker_channels.lock().unwrap();
wc.0.clone()
};
- let op = tx.send(d);
- let op = op.map_err(|e| errors::new(ErrorKind::Other, e.to_string()));
- let op = op.and_then(move |_| -> DenoResult<Buf> {
- let builder = &mut FlatBufferBuilder::new();
+ tx.send(d)
+ .wait()
+ .map_err(|e| errors::new(ErrorKind::Other, e.to_string()))?;
+ let builder = &mut FlatBufferBuilder::new();
- Ok(serialize_response(
- cmd_id,
- builder,
- msg::BaseArgs {
- ..Default::default()
- },
- ))
- });
- Box::new(op)
+ ok_buf(serialize_response(
+ cmd_id,
+ builder,
+ msg::BaseArgs {
+ ..Default::default()
+ },
+ ))
}
/// Create worker as the host
@@ -2059,7 +1981,7 @@ fn op_create_worker(
state: &ThreadSafeState,
base: &msg::Base<'_>,
data: Option<PinnedBuf>,
-) -> Box<OpWithError> {
+) -> CliOpResult {
assert!(data.is_none());
let cmd_id = base.cmd_id();
let inner = base.inner_as_create_worker().unwrap();
@@ -2081,39 +2003,34 @@ fn op_create_worker(
js_check(worker.execute("denoMain()"));
js_check(worker.execute("workerMain()"));
- let op = ModuleSpecifier::resolve_root(specifier)
- .and_then(|module_specifier| {
- Ok(
- worker
- .execute_mod_async(&module_specifier, false)
- .and_then(move |()| {
- let mut workers_tl = parent_state.workers.lock().unwrap();
- workers_tl.insert(rid, worker.shared());
- let builder = &mut FlatBufferBuilder::new();
- let msg_inner = msg::CreateWorkerRes::create(
- builder,
- &msg::CreateWorkerResArgs { rid },
- );
- Ok(serialize_response(
- cmd_id,
- builder,
- msg::BaseArgs {
- inner: Some(msg_inner.as_union_value()),
- inner_type: msg::Any::CreateWorkerRes,
- ..Default::default()
- },
- ))
- }).map_err(|err| match err {
- errors::RustOrJsError::Js(_) => errors::worker_init_failed(),
- errors::RustOrJsError::Rust(err) => err,
- }),
- )
- }).map_err(DenoError::from);
+ let module_specifier = ModuleSpecifier::resolve_root(specifier)?;
- Box::new(match op {
- Ok(op) => future::Either::A(op),
- Err(err) => future::Either::B(future::result(Err(err))),
- })
+ let op = worker
+ .execute_mod_async(&module_specifier, false)
+ .and_then(move |()| {
+ let mut workers_tl = parent_state.workers.lock().unwrap();
+ workers_tl.insert(rid, worker.shared());
+ let builder = &mut FlatBufferBuilder::new();
+ let msg_inner = msg::CreateWorkerRes::create(
+ builder,
+ &msg::CreateWorkerResArgs { rid },
+ );
+ Ok(serialize_response(
+ cmd_id,
+ builder,
+ msg::BaseArgs {
+ inner: Some(msg_inner.as_union_value()),
+ inner_type: msg::Any::CreateWorkerRes,
+ ..Default::default()
+ },
+ ))
+ }).map_err(|err| match err {
+ errors::RustOrJsError::Js(_) => errors::worker_init_failed(),
+ errors::RustOrJsError::Rust(err) => err,
+ });
+
+ let result = op.wait()?;
+ Ok(Op::Sync(result))
}
/// Return when the worker closes
@@ -2121,7 +2038,10 @@ fn op_host_get_worker_closed(
state: &ThreadSafeState,
base: &msg::Base<'_>,
data: Option<PinnedBuf>,
-) -> Box<OpWithError> {
+) -> CliOpResult {
+ if base.sync() {
+ return Err(errors::no_sync_support());
+ }
assert!(data.is_none());
let cmd_id = base.cmd_id();
let inner = base.inner_as_host_get_worker_closed().unwrap();
@@ -2134,7 +2054,7 @@ fn op_host_get_worker_closed(
worker.clone()
};
- Box::new(shared_worker_future.then(move |_result| {
+ let op = Box::new(shared_worker_future.then(move |_result| {
let builder = &mut FlatBufferBuilder::new();
Ok(serialize_response(
@@ -2144,7 +2064,8 @@ fn op_host_get_worker_closed(
..Default::default()
},
))
- }))
+ }));
+ Ok(Op::Async(Box::new(op)))
}
/// Get message from guest worker as host
@@ -2152,7 +2073,10 @@ fn op_host_get_message(
_state: &ThreadSafeState,
base: &msg::Base<'_>,
data: Option<PinnedBuf>,
-) -> Box<OpWithError> {
+) -> CliOpResult {
+ if base.sync() {
+ return Err(errors::no_sync_support());
+ }
assert!(data.is_none());
let cmd_id = base.cmd_id();
let inner = base.inner_as_host_get_message().unwrap();
@@ -2178,7 +2102,7 @@ fn op_host_get_message(
},
))
});
- Box::new(op)
+ Ok(Op::Async(Box::new(op)))
}
/// Post message to guest worker as host
@@ -2186,34 +2110,32 @@ fn op_host_post_message(
_state: &ThreadSafeState,
base: &msg::Base<'_>,
data: Option<PinnedBuf>,
-) -> Box<OpWithError> {
+) -> CliOpResult {
let cmd_id = base.cmd_id();
let inner = base.inner_as_host_post_message().unwrap();
let rid = inner.rid();
let d = Vec::from(data.unwrap().as_ref()).into_boxed_slice();
- let op = resources::post_message_to_worker(rid, d);
- let op = op.map_err(|e| errors::new(ErrorKind::Other, e.to_string()));
- let op = op.and_then(move |_| -> DenoResult<Buf> {
- let builder = &mut FlatBufferBuilder::new();
+ resources::post_message_to_worker(rid, d)
+ .wait()
+ .map_err(|e| errors::new(ErrorKind::Other, e.to_string()))?;
+ let builder = &mut FlatBufferBuilder::new();
- Ok(serialize_response(
- cmd_id,
- builder,
- msg::BaseArgs {
- ..Default::default()
- },
- ))
- });
- Box::new(op)
+ ok_buf(serialize_response(
+ cmd_id,
+ builder,
+ msg::BaseArgs {
+ ..Default::default()
+ },
+ ))
}
fn op_get_random_values(
state: &ThreadSafeState,
_base: &msg::Base<'_>,
data: Option<PinnedBuf>,
-) -> Box<OpWithError> {
+) -> CliOpResult {
if let Some(ref seeded_rng) = state.seeded_rng {
let mut rng = seeded_rng.lock().unwrap();
rng.fill(&mut data.unwrap()[..]);
@@ -2222,5 +2144,5 @@ fn op_get_random_values(
rng.fill(&mut data.unwrap()[..]);
}
- Box::new(ok_future(empty_buf()))
+ ok_buf(empty_buf())
}