summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRyan Dahl <ry@tinyclouds.org>2019-06-14 13:58:20 -0400
committerRyan Dahl <ry@tinyclouds.org>2019-06-14 13:56:36 -0700
commit1361e302234b17ab8079107b134dfd0ddf288439 (patch)
treea38ce903ddbd89ee2dfd437bd8f984cb1d85695b
parent3dff147d0ca1a2cd8d264d20a178d71cb38b1c4e (diff)
Revert "Refactor dispatch handling (#2452)"
Due to performance regression: https://github.com/denoland/deno/commit/dc60fe9f300043f191286ef804a365e16e455f87#commitcomment-33943711 This reverts commit dc60fe9f300043f191286ef804a365e16e455f87.
-rw-r--r--cli/dispatch_minimal.rs45
-rw-r--r--cli/errors.rs14
-rw-r--r--cli/msg.fbs3
-rw-r--r--cli/ops.rs951
-rw-r--r--cli/state.rs8
-rw-r--r--core/core.d.ts6
-rw-r--r--core/examples/http_bench.js16
-rw-r--r--core/examples/http_bench.rs16
-rw-r--r--core/isolate.rs118
-rw-r--r--core/libdeno.rs1
-rw-r--r--core/libdeno/api.cc8
-rw-r--r--core/libdeno/deno.h8
-rw-r--r--core/libdeno/libdeno.d.ts7
-rw-r--r--core/libdeno/libdeno_test.cc2
-rw-r--r--core/shared_queue.js17
-rw-r--r--core/shared_queue.rs45
-rw-r--r--js/dispatch.ts81
-rw-r--r--js/dispatch_minimal.ts30
18 files changed, 709 insertions, 667 deletions
diff --git a/cli/dispatch_minimal.rs b/cli/dispatch_minimal.rs
index 643fdaed9..0e20d3b1a 100644
--- a/cli/dispatch_minimal.rs
+++ b/cli/dispatch_minimal.rs
@@ -6,7 +6,6 @@
//! message or a "minimal" message.
use crate::state::ThreadSafeState;
use deno::Buf;
-use deno::CoreOp;
use deno::Op;
use deno::PinnedBuf;
use futures::Future;
@@ -18,6 +17,7 @@ const OP_WRITE: i32 = 2;
#[derive(Copy, Clone, Debug, PartialEq)]
// This corresponds to RecordMinimal on the TS side.
pub struct Record {
+ pub promise_id: i32,
pub op_id: i32,
pub arg: i32,
pub result: i32,
@@ -25,9 +25,15 @@ pub struct Record {
impl Into<Buf> for Record {
fn into(self) -> Buf {
- let vec = vec![DISPATCH_MINIMAL_TOKEN, self.op_id, self.arg, self.result];
+ let vec = vec![
+ DISPATCH_MINIMAL_TOKEN,
+ self.promise_id,
+ self.op_id,
+ self.arg,
+ self.result,
+ ];
let buf32 = vec.into_boxed_slice();
- let ptr = Box::into_raw(buf32) as *mut [u8; 4 * 4];
+ let ptr = Box::into_raw(buf32) as *mut [u8; 5 * 4];
unsafe { Box::from_raw(ptr) }
}
}
@@ -39,32 +45,36 @@ pub fn parse_min_record(bytes: &[u8]) -> Option<Record> {
let p = bytes.as_ptr();
#[allow(clippy::cast_ptr_alignment)]
let p32 = p as *const i32;
- let s = unsafe { std::slice::from_raw_parts(p32, bytes.len() / 3) };
+ let s = unsafe { std::slice::from_raw_parts(p32, bytes.len() / 4) };
- if s.len() < 4 {
+ if s.len() < 5 {
return None;
}
let ptr = s.as_ptr();
- let ints = unsafe { std::slice::from_raw_parts(ptr, 4) };
+ let ints = unsafe { std::slice::from_raw_parts(ptr, 5) };
if ints[0] != DISPATCH_MINIMAL_TOKEN {
return None;
}
Some(Record {
- op_id: ints[1],
- arg: ints[2],
- result: ints[3],
+ promise_id: ints[1],
+ op_id: ints[2],
+ arg: ints[3],
+ result: ints[4],
})
}
#[test]
fn test_parse_min_record() {
- let buf = vec![0xFE, 0xCA, 0, 0, 1, 0, 0, 0, 2, 0, 0, 0, 3, 0, 0, 0];
+ let buf = vec![
+ 0xFE, 0xCA, 0, 0, 1, 0, 0, 0, 2, 0, 0, 0, 3, 0, 0, 0, 4, 0, 0, 0,
+ ];
assert_eq!(
parse_min_record(&buf),
Some(Record {
- op_id: 1,
- arg: 2,
- result: 3,
+ promise_id: 1,
+ op_id: 2,
+ arg: 3,
+ result: 4,
})
);
@@ -79,7 +89,8 @@ pub fn dispatch_minimal(
state: &ThreadSafeState,
mut record: Record,
zero_copy: Option<PinnedBuf>,
-) -> CoreOp {
+) -> Op {
+ let is_sync = record.promise_id == 0;
let min_op = match record.op_id {
OP_READ => ops::read(record.arg, zero_copy),
OP_WRITE => ops::write(record.arg, zero_copy),
@@ -104,7 +115,11 @@ pub fn dispatch_minimal(
state.metrics_op_completed(buf.len());
Ok(buf)
}));
- Op::Async(fut)
+ if is_sync {
+ Op::Sync(fut.wait().unwrap())
+ } else {
+ Op::Async(fut)
+ }
}
mod ops {
diff --git a/cli/errors.rs b/cli/errors.rs
index 67eb54ea7..eb0fc7d27 100644
--- a/cli/errors.rs
+++ b/cli/errors.rs
@@ -243,20 +243,6 @@ pub fn no_buffer_specified() -> DenoError {
new(ErrorKind::InvalidInput, String::from("no buffer specified"))
}
-pub fn no_async_support() -> DenoError {
- new(
- ErrorKind::NoAsyncSupport,
- String::from("op doesn't support async calls"),
- )
-}
-
-pub fn no_sync_support() -> DenoError {
- new(
- ErrorKind::NoSyncSupport,
- String::from("op doesn't support sync calls"),
- )
-}
-
#[derive(Debug)]
pub enum RustOrJsError {
Rust(DenoError),
diff --git a/cli/msg.fbs b/cli/msg.fbs
index e034aa687..56410097c 100644
--- a/cli/msg.fbs
+++ b/cli/msg.fbs
@@ -136,8 +136,6 @@ enum ErrorKind: byte {
OpNotAvaiable,
WorkerInitFailed,
UnixError,
- NoAsyncSupport,
- NoSyncSupport,
ImportMapError,
}
@@ -155,6 +153,7 @@ enum MediaType: byte {
}
table Base {
+ cmd_id: uint32;
sync: bool = false;
error_kind: ErrorKind = NoError;
error: string;
diff --git a/cli/ops.rs b/cli/ops.rs
index b14b6b1c6..e8fa47aad 100644
--- a/cli/ops.rs
+++ b/cli/ops.rs
@@ -27,14 +27,13 @@ use crate::version;
use crate::worker::Worker;
use deno::js_check;
use deno::Buf;
-use deno::CoreOp;
use deno::JSError;
use deno::ModuleSpecifier;
use deno::Op;
-use deno::OpResult;
use deno::PinnedBuf;
use flatbuffers::FlatBufferBuilder;
use futures;
+use futures::future;
use futures::Async;
use futures::Poll;
use futures::Sink;
@@ -62,13 +61,17 @@ use std::os::unix::fs::PermissionsExt;
#[cfg(unix)]
use std::os::unix::process::ExitStatusExt;
-type CliOpResult = OpResult<DenoError>;
+type OpResult = DenoResult<Buf>;
-type CliDispatchFn =
+pub type OpWithError = dyn Future<Item = Buf, Error = DenoError> + Send;
+
+// TODO Ideally we wouldn't have to box the OpWithError being returned.
+// The box is just to make it easier to get a prototype refactor working.
+type OpCreator =
fn(state: &ThreadSafeState, base: &msg::Base<'_>, data: Option<PinnedBuf>)
- -> CliOpResult;
+ -> Box<OpWithError>;
-pub type OpSelector = fn(inner_type: msg::Any) -> Option<CliDispatchFn>;
+pub type OpSelector = fn(inner_type: msg::Any) -> Option<OpCreator>;
#[inline]
fn empty_buf() -> Buf {
@@ -80,7 +83,7 @@ pub fn dispatch_all(
control: &[u8],
zero_copy: Option<PinnedBuf>,
op_selector: OpSelector,
-) -> CoreOp {
+) -> Op {
let bytes_sent_control = control.len();
let bytes_sent_zero_copy = zero_copy.as_ref().map(|b| b.len()).unwrap_or(0);
let op = if let Some(min_record) = parse_min_record(control) {
@@ -101,90 +104,81 @@ pub fn dispatch_all_legacy(
control: &[u8],
zero_copy: Option<PinnedBuf>,
op_selector: OpSelector,
-) -> CoreOp {
+) -> Op {
let base = msg::get_root_as_base(&control);
- let inner_type = base.inner_type();
let is_sync = base.sync();
+ let inner_type = base.inner_type();
+ let cmd_id = base.cmd_id();
- debug!(
- "msg_from_js {} sync {}",
- msg::enum_name_any(inner_type),
- is_sync
- );
-
- let op_func: CliDispatchFn = match op_selector(inner_type) {
+ let op_func: OpCreator = match op_selector(inner_type) {
Some(v) => v,
None => panic!("Unhandled message {}", msg::enum_name_any(inner_type)),
};
- let op_result = op_func(state, &base, zero_copy);
+ let op: Box<OpWithError> = op_func(state, &base, zero_copy);
let state = state.clone();
- match op_result {
- Ok(Op::Sync(buf)) => {
- state.metrics_op_completed(buf.len());
- Op::Sync(buf)
- }
- Ok(Op::Async(fut)) => {
- let result_fut = Box::new(
- fut.or_else(move |err: DenoError| -> Result<Buf, ()> {
- debug!("op err {}", err);
- // No matter whether we got an Err or Ok, we want a serialized message to
- // send back. So transform the DenoError into a Buf.
- let builder = &mut FlatBufferBuilder::new();
- let errmsg_offset = builder.create_string(&format!("{}", err));
- Ok(serialize_response(
- builder,
- msg::BaseArgs {
- error: Some(errmsg_offset),
- error_kind: err.kind(),
- ..Default::default()
- },
- ))
- }).and_then(move |buf: Buf| -> Result<Buf, ()> {
- // Handle empty responses. For sync responses we just want
- // to send null. For async we want to send a small message
- // with the cmd_id.
- let buf = if buf.len() > 0 {
- buf
- } else {
- let builder = &mut FlatBufferBuilder::new();
- serialize_response(
- builder,
- msg::BaseArgs {
- ..Default::default()
- },
- )
- };
- state.metrics_op_completed(buf.len());
- Ok(buf)
- }).map_err(|err| panic!("unexpected error {:?}", err)),
- );
- Op::Async(result_fut)
- }
- Err(err) => {
+ let fut = Box::new(
+ op.or_else(move |err: DenoError| -> Result<Buf, ()> {
debug!("op err {}", err);
// No matter whether we got an Err or Ok, we want a serialized message to
// send back. So transform the DenoError into a Buf.
let builder = &mut FlatBufferBuilder::new();
let errmsg_offset = builder.create_string(&format!("{}", err));
- let response_buf = serialize_response(
+ Ok(serialize_response(
+ cmd_id,
builder,
msg::BaseArgs {
error: Some(errmsg_offset),
error_kind: err.kind(),
..Default::default()
},
- );
- state.metrics_op_completed(response_buf.len());
- Op::Sync(response_buf)
- }
+ ))
+ }).and_then(move |buf: Buf| -> Result<Buf, ()> {
+ // Handle empty responses. For sync responses we just want
+ // to send null. For async we want to send a small message
+ // with the cmd_id.
+ let buf = if is_sync || buf.len() > 0 {
+ buf
+ } else {
+ let builder = &mut FlatBufferBuilder::new();
+ serialize_response(
+ cmd_id,
+ builder,
+ msg::BaseArgs {
+ ..Default::default()
+ },
+ )
+ };
+ state.metrics_op_completed(buf.len());
+ Ok(buf)
+ }).map_err(|err| panic!("unexpected error {:?}", err)),
+ );
+
+ debug!(
+ "msg_from_js {} sync {}",
+ msg::enum_name_any(inner_type),
+ base.sync()
+ );
+
+ if base.sync() {
+ // TODO(ry) This is not correct! If the sync op is not actually synchronous
+ // (like in the case of op_fetch_module_meta_data) this wait() will block
+ // a thread in the Tokio runtime. Depending on the size of the runtime's
+ // thread pool, this may result in a dead lock!
+ //
+ // The solution is that ops should return an Op directly. Op::Sync contains
+ // the result value, so if its returned directly from the OpCreator, we
+ // know it has actually be evaluated synchronously.
+ Op::Sync(fut.wait().unwrap())
+ } else {
+ Op::Async(fut)
}
}
/// Standard ops set for most isolates
-pub fn op_selector_std(inner_type: msg::Any) -> Option<CliDispatchFn> {
+pub fn op_selector_std(inner_type: msg::Any) -> Option<OpCreator> {
match inner_type {
msg::Any::Accept => Some(op_accept),
msg::Any::Cache => Some(op_cache),
@@ -253,9 +247,9 @@ pub fn op_selector_std(inner_type: msg::Any) -> Option<CliDispatchFn> {
// nanoseconds are rounded on 2ms.
fn op_now(
state: &ThreadSafeState,
- _base: &msg::Base<'_>,
+ base: &msg::Base<'_>,
data: Option<PinnedBuf>,
-) -> CliOpResult {
+) -> Box<OpWithError> {
assert!(data.is_none());
let seconds = state.start_time.elapsed().as_secs();
let mut subsec_nanos = state.start_time.elapsed().subsec_nanos();
@@ -276,8 +270,8 @@ fn op_now(
subsec_nanos,
},
);
-
- ok_buf(serialize_response(
+ ok_future(serialize_response(
+ base.cmd_id(),
builder,
msg::BaseArgs {
inner: Some(inner.as_union_value()),
@@ -289,10 +283,9 @@ fn op_now(
fn op_is_tty(
_state: &ThreadSafeState,
-
- _base: &msg::Base<'_>,
+ base: &msg::Base<'_>,
_data: Option<PinnedBuf>,
-) -> CliOpResult {
+) -> Box<OpWithError> {
let builder = &mut FlatBufferBuilder::new();
let inner = msg::IsTTYRes::create(
builder,
@@ -302,7 +295,8 @@ fn op_is_tty(
stderr: atty::is(atty::Stream::Stderr),
},
);
- ok_buf(serialize_response(
+ ok_future(serialize_response(
+ base.cmd_id(),
builder,
msg::BaseArgs {
inner: Some(inner.as_union_value()),
@@ -314,19 +308,18 @@ fn op_is_tty(
fn op_exit(
_state: &ThreadSafeState,
-
base: &msg::Base<'_>,
_data: Option<PinnedBuf>,
-) -> CliOpResult {
+) -> Box<OpWithError> {
let inner = base.inner_as_exit().unwrap();
std::process::exit(inner.code())
}
fn op_start(
state: &ThreadSafeState,
- _base: &msg::Base<'_>,
+ base: &msg::Base<'_>,
data: Option<PinnedBuf>,
-) -> CliOpResult {
+) -> Box<OpWithError> {
assert!(data.is_none());
let mut builder = FlatBufferBuilder::new();
@@ -375,7 +368,8 @@ fn op_start(
},
);
- ok_buf(serialize_response(
+ ok_future(serialize_response(
+ base.cmd_id(),
&mut builder,
msg::BaseArgs {
inner_type: msg::Any::StartRes,
@@ -389,7 +383,7 @@ fn op_format_error(
state: &ThreadSafeState,
base: &msg::Base<'_>,
data: Option<PinnedBuf>,
-) -> CliOpResult {
+) -> Box<OpWithError> {
assert!(data.is_none());
let inner = base.inner_as_format_error().unwrap();
let orig_error = String::from(inner.error().unwrap());
@@ -408,22 +402,23 @@ fn op_format_error(
},
);
- let response_buf = serialize_response(
+ ok_future(serialize_response(
+ base.cmd_id(),
&mut builder,
msg::BaseArgs {
inner_type: msg::Any::FormatErrorRes,
inner: Some(inner.as_union_value()),
..Default::default()
},
- );
-
- ok_buf(response_buf)
+ ))
}
fn serialize_response(
+ cmd_id: u32,
builder: &mut FlatBufferBuilder<'_>,
- args: msg::BaseArgs<'_>,
+ mut args: msg::BaseArgs<'_>,
) -> Buf {
+ args.cmd_id = cmd_id;
let base = msg::Base::create(builder, &args);
msg::finish_base_buffer(builder, base);
let data = builder.finished_data();
@@ -432,20 +427,21 @@ fn serialize_response(
}
#[inline]
-pub fn ok_future(buf: Buf) -> CliOpResult {
- Ok(Op::Async(Box::new(futures::future::ok(buf))))
+pub fn ok_future(buf: Buf) -> Box<OpWithError> {
+ Box::new(futures::future::ok(buf))
}
+// Shout out to Earl Sweatshirt.
#[inline]
-pub fn ok_buf(buf: Buf) -> CliOpResult {
- Ok(Op::Sync(buf))
+pub fn odd_future(err: DenoError) -> Box<OpWithError> {
+ Box::new(futures::future::err(err))
}
fn op_cache(
state: &ThreadSafeState,
base: &msg::Base<'_>,
data: Option<PinnedBuf>,
-) -> CliOpResult {
+) -> Box<OpWithError> {
assert!(data.is_none());
let inner = base.inner_as_cache().unwrap();
let extension = inner.extension().unwrap();
@@ -459,9 +455,11 @@ fn op_cache(
// cache path. In the future, checksums will not be used in the cache
// filenames and this requirement can be removed. See
// https://github.com/denoland/deno/issues/2057
- let module_meta_data = state
- .dir
- .fetch_module_meta_data(module_id, ".", true, true)?;
+ let r = state.dir.fetch_module_meta_data(module_id, ".", true, true);
+ if let Err(err) = r {
+ return odd_future(err);
+ }
+ let module_meta_data = r.unwrap();
let (js_cache_path, source_map_path) = state
.dir
@@ -469,15 +467,21 @@ fn op_cache(
if extension == ".map" {
debug!("cache {:?}", source_map_path);
- fs::write(source_map_path, contents).map_err(DenoError::from)?;
+ let r = fs::write(source_map_path, contents);
+ if let Err(err) = r {
+ return odd_future(err.into());
+ }
} else if extension == ".js" {
debug!("cache {:?}", js_cache_path);
- fs::write(js_cache_path, contents).map_err(DenoError::from)?;
+ let r = fs::write(js_cache_path, contents);
+ if let Err(err) = r {
+ return odd_future(err.into());
+ }
} else {
unreachable!();
}
- ok_buf(empty_buf())
+ ok_future(empty_buf())
}
// https://github.com/denoland/deno/blob/golang/os.go#L100-L154
@@ -485,13 +489,10 @@ fn op_fetch_module_meta_data(
state: &ThreadSafeState,
base: &msg::Base<'_>,
data: Option<PinnedBuf>,
-) -> CliOpResult {
- if !base.sync() {
- return Err(errors::no_async_support());
- }
+) -> Box<OpWithError> {
assert!(data.is_none());
let inner = base.inner_as_fetch_module_meta_data().unwrap();
-
+ let cmd_id = base.cmd_id();
let specifier = inner.specifier().unwrap();
let referrer = inner.referrer().unwrap();
@@ -509,7 +510,7 @@ fn op_fetch_module_meta_data(
Some(module_specifier) => module_specifier.to_string(),
None => specifier.to_string(),
},
- Err(err) => return Err(DenoError::from(err)),
+ Err(err) => return odd_future(DenoError::from(err)),
},
None => specifier.to_string(),
};
@@ -532,6 +533,7 @@ fn op_fetch_module_meta_data(
};
let inner = msg::FetchModuleMetaDataRes::create(builder, &msg_args);
Ok(serialize_response(
+ cmd_id,
builder,
msg::BaseArgs {
inner: Some(inner.as_union_value()),
@@ -541,50 +543,52 @@ fn op_fetch_module_meta_data(
))
});
+ // Unfortunately TypeScript's CompilerHost interface does not leave room for
+ // asynchronous source code fetching. This complicates things greatly and
+ // requires us to use tokio_util::block_on() below.
+ assert!(base.sync());
+
// WARNING: Here we use tokio_util::block_on() which starts a new Tokio
// runtime for executing the future. This is so we don't inadvernently run
// out of threads in the main runtime.
- let result_buf = tokio_util::block_on(fut)?;
- Ok(Op::Sync(result_buf))
+ Box::new(futures::future::result(tokio_util::block_on(fut)))
}
fn op_chdir(
_state: &ThreadSafeState,
base: &msg::Base<'_>,
data: Option<PinnedBuf>,
-) -> CliOpResult {
+) -> Box<OpWithError> {
assert!(data.is_none());
let inner = base.inner_as_chdir().unwrap();
let directory = inner.directory().unwrap();
- std::env::set_current_dir(&directory)?;
- ok_buf(empty_buf())
+ Box::new(futures::future::result(|| -> OpResult {
+ std::env::set_current_dir(&directory)?;
+ Ok(empty_buf())
+ }()))
}
fn op_global_timer_stop(
state: &ThreadSafeState,
base: &msg::Base<'_>,
data: Option<PinnedBuf>,
-) -> CliOpResult {
- if !base.sync() {
- return Err(errors::no_async_support());
- }
+) -> Box<OpWithError> {
+ assert!(base.sync());
assert!(data.is_none());
let state = state;
let mut t = state.global_timer.lock().unwrap();
t.cancel();
- Ok(Op::Sync(empty_buf()))
+ ok_future(empty_buf())
}
fn op_global_timer(
state: &ThreadSafeState,
base: &msg::Base<'_>,
data: Option<PinnedBuf>,
-) -> CliOpResult {
- if base.sync() {
- return Err(errors::no_sync_support());
- }
+) -> Box<OpWithError> {
+ assert!(!base.sync());
assert!(data.is_none());
-
+ let cmd_id = base.cmd_id();
let inner = base.inner_as_global_timer().unwrap();
let val = inner.timeout();
assert!(val >= 0);
@@ -594,11 +598,12 @@ fn op_global_timer(
let deadline = Instant::now() + Duration::from_millis(val as u64);
let f = t.new_timeout(deadline);
- Ok(Op::Async(Box::new(f.then(move |_| {
+ Box::new(f.then(move |_| {
let builder = &mut FlatBufferBuilder::new();
let inner =
msg::GlobalTimerRes::create(builder, &msg::GlobalTimerResArgs {});
Ok(serialize_response(
+ cmd_id,
builder,
msg::BaseArgs {
inner: Some(inner.as_union_value()),
@@ -606,31 +611,36 @@ fn op_global_timer(
..Default::default()
},
))
- }))))
+ }))
}
fn op_set_env(
state: &ThreadSafeState,
base: &msg::Base<'_>,
data: Option<PinnedBuf>,
-) -> CliOpResult {
+) -> Box<OpWithError> {
assert!(data.is_none());
let inner = base.inner_as_set_env().unwrap();
let key = inner.key().unwrap();
let value = inner.value().unwrap();
- state.check_env()?;
+ if let Err(e) = state.check_env() {
+ return odd_future(e);
+ }
std::env::set_var(key, value);
- ok_buf(empty_buf())
+ ok_future(empty_buf())
}
fn op_env(
state: &ThreadSafeState,
- _base: &msg::Base<'_>,
+ base: &msg::Base<'_>,
data: Option<PinnedBuf>,
-) -> CliOpResult {
+) -> Box<OpWithError> {
assert!(data.is_none());
+ let cmd_id = base.cmd_id();
- state.check_env()?;
+ if let Err(e) = state.check_env() {
+ return odd_future(e);
+ }
let builder = &mut FlatBufferBuilder::new();
let vars: Vec<_> = std::env::vars()
@@ -641,24 +651,24 @@ fn op_env(
builder,
&msg::EnvironResArgs { map: Some(tables) },
);
- let response_buf = serialize_response(
+ ok_future(serialize_response(
+ cmd_id,
builder,
msg::BaseArgs {
inner: Some(inner.as_union_value()),
inner_type: msg::Any::EnvironRes,
..Default::default()
},
- );
- ok_buf(response_buf)
+ ))
}
fn op_permissions(
state: &ThreadSafeState,
- _base: &msg::Base<'_>,
+ base: &msg::Base<'_>,
data: Option<PinnedBuf>,
-) -> CliOpResult {
+) -> Box<OpWithError> {
assert!(data.is_none());
-
+ let cmd_id = base.cmd_id();
let builder = &mut FlatBufferBuilder::new();
let inner = msg::PermissionsRes::create(
builder,
@@ -671,26 +681,26 @@ fn op_permissions(
hrtime: state.permissions.allows_hrtime(),
},
);
- let response_buf = serialize_response(
+ ok_future(serialize_response(
+ cmd_id,
builder,
msg::BaseArgs {
inner: Some(inner.as_union_value()),
inner_type: msg::Any::PermissionsRes,
..Default::default()
},
- );
- ok_buf(response_buf)
+ ))
}
fn op_revoke_permission(
state: &ThreadSafeState,
base: &msg::Base<'_>,
data: Option<PinnedBuf>,
-) -> CliOpResult {
+) -> Box<OpWithError> {
assert!(data.is_none());
let inner = base.inner_as_permission_revoke().unwrap();
let permission = inner.permission().unwrap();
- match permission {
+ let result = match permission {
"run" => state.permissions.revoke_run(),
"read" => state.permissions.revoke_read(),
"write" => state.permissions.revoke_write(),
@@ -698,16 +708,20 @@ fn op_revoke_permission(
"env" => state.permissions.revoke_env(),
"hrtime" => state.permissions.revoke_hrtime(),
_ => Ok(()),
- }?;
- ok_buf(empty_buf())
+ };
+ if let Err(e) = result {
+ return odd_future(e);
+ }
+ ok_future(empty_buf())
}
fn op_fetch(
state: &ThreadSafeState,
base: &msg::Base<'_>,
data: Option<PinnedBuf>,
-) -> CliOpResult {
+) -> Box<OpWithError> {
let inner = base.inner_as_fetch().unwrap();
+ let cmd_id = base.cmd_id();
let header = inner.header().unwrap();
assert!(header.is_request());
@@ -718,10 +732,19 @@ fn op_fetch(
Some(buf) => hyper::Body::from(Vec::from(&*buf)),
};
- let req = msg_util::deserialize_request(header, body)?;
+ let maybe_req = msg_util::deserialize_request(header, body);
+ if let Err(e) = maybe_req {
+ return odd_future(e);
+ }
+ let req = maybe_req.unwrap();
- let url_ = url::Url::parse(url).map_err(DenoError::from)?;
- state.check_net_url(url_)?;
+ let url_ = match url::Url::parse(url) {
+ Err(err) => return odd_future(DenoError::from(err)),
+ Ok(v) => v,
+ };
+ if let Err(e) = state.check_net_url(url_) {
+ return odd_future(e);
+ }
let client = http_util::get_client();
@@ -744,6 +767,7 @@ fn op_fetch(
);
Ok(serialize_response(
+ cmd_id,
builder,
msg::BaseArgs {
inner: Some(inner.as_union_value()),
@@ -752,12 +776,7 @@ fn op_fetch(
},
))
});
- if base.sync() {
- let result_buf = future.wait()?;
- Ok(Op::Sync(result_buf))
- } else {
- Ok(Op::Async(Box::new(future)))
- }
+ Box::new(future)
}
// This is just type conversion. Implement From trait?
@@ -775,17 +794,14 @@ where
}
}
-fn blocking<F>(is_sync: bool, f: F) -> CliOpResult
+fn blocking<F>(is_sync: bool, f: F) -> Box<OpWithError>
where
F: 'static + Send + FnOnce() -> DenoResult<Buf>,
{
if is_sync {
- let result_buf = f()?;
- Ok(Op::Sync(result_buf))
+ Box::new(futures::future::result(f()))
} else {
- Ok(Op::Async(Box::new(tokio_util::poll_fn(move || {
- convert_blocking(f)
- }))))
+ Box::new(tokio_util::poll_fn(move || convert_blocking(f)))
}
}
@@ -793,19 +809,22 @@ fn op_make_temp_dir(
state: &ThreadSafeState,
base: &msg::Base<'_>,
data: Option<PinnedBuf>,
-) -> CliOpResult {
+) -> Box<OpWithError> {
assert!(data.is_none());
let base = Box::new(*base);
let inner = base.inner_as_make_temp_dir().unwrap();
+ let cmd_id = base.cmd_id();
// FIXME
- state.check_write("make_temp")?;
+ if let Err(e) = state.check_write("make_temp") {
+ return odd_future(e);
+ }
let dir = inner.dir().map(PathBuf::from);
let prefix = inner.prefix().map(String::from);
let suffix = inner.suffix().map(String::from);
- blocking(base.sync(), move || {
+ blocking(base.sync(), move || -> OpResult {
// TODO(piscisaureus): use byte vector for paths, not a string.
// See https://github.com/denoland/deno/issues/627.
// We can't assume that paths are always valid utf8 strings.
@@ -824,6 +843,7 @@ fn op_make_temp_dir(
},
);
Ok(serialize_response(
+ cmd_id,
builder,
msg::BaseArgs {
inner: Some(inner.as_union_value()),
@@ -838,14 +858,19 @@ fn op_mkdir(
state: &ThreadSafeState,
base: &msg::Base<'_>,
data: Option<PinnedBuf>,
-) -> CliOpResult {
+) -> Box<OpWithError> {
assert!(data.is_none());
let inner = base.inner_as_mkdir().unwrap();
- let (path, path_) = resolve_path(inner.path().unwrap())?;
+ let (path, path_) = match resolve_path(inner.path().unwrap()) {
+ Err(err) => return odd_future(err),
+ Ok(v) => v,
+ };
let recursive = inner.recursive();
let mode = inner.mode();
- state.check_write(&path_)?;
+ if let Err(e) = state.check_write(&path_) {
+ return odd_future(e);
+ }
blocking(base.sync(), move || {
debug!("op_mkdir {}", path_);
@@ -858,13 +883,18 @@ fn op_chmod(
state: &ThreadSafeState,
base: &msg::Base<'_>,
data: Option<PinnedBuf>,
-) -> CliOpResult {
+) -> Box<OpWithError> {
assert!(data.is_none());
let inner = base.inner_as_chmod().unwrap();
let _mode = inner.mode();
- let (path, path_) = resolve_path(inner.path().unwrap())?;
+ let (path, path_) = match resolve_path(inner.path().unwrap()) {
+ Err(err) => return odd_future(err),
+ Ok(v) => v,
+ };
- state.check_write(&path_)?;
+ if let Err(e) = state.check_write(&path_) {
+ return odd_future(e);
+ }
blocking(base.sync(), move || {
debug!("op_chmod {}", &path_);
@@ -884,14 +914,16 @@ fn op_chown(
state: &ThreadSafeState,
base: &msg::Base<'_>,
data: Option<PinnedBuf>,
-) -> CliOpResult {
+) -> Box<OpWithError> {
assert!(data.is_none());
let inner = base.inner_as_chown().unwrap();
let path = String::from(inner.path().unwrap());
let uid = inner.uid();
let gid = inner.gid();
- state.check_write(&path)?;
+ if let Err(e) = state.check_write(&path) {
+ return odd_future(e);
+ }
blocking(base.sync(), move || {
debug!("op_chown {}", &path);
@@ -906,11 +938,14 @@ fn op_open(
state: &ThreadSafeState,
base: &msg::Base<'_>,
data: Option<PinnedBuf>,
-) -> CliOpResult {
+) -> Box<OpWithError> {
assert!(data.is_none());
-
+ let cmd_id = base.cmd_id();
let inner = base.inner_as_open().unwrap();
- let (filename, filename_) = resolve_path(inner.filename().unwrap())?;
+ let (filename, filename_) = match resolve_path(inner.filename().unwrap()) {
+ Err(err) => return odd_future(err),
+ Ok(v) => v,
+ };
let mode = inner.mode().unwrap();
let mut open_options = tokio::fs::OpenOptions::new();
@@ -951,26 +986,35 @@ fn op_open(
match mode {
"r" => {
- state.check_read(&filename_)?;
+ if let Err(e) = state.check_read(&filename_) {
+ return odd_future(e);
+ }
}
"w" | "a" | "x" => {
- state.check_write(&filename_)?;
+ if let Err(e) = state.check_write(&filename_) {
+ return odd_future(e);
+ }
}
&_ => {
- state.check_read(&filename_)?;
- state.check_write(&filename_)?;
+ if let Err(e) = state.check_read(&filename_) {
+ return odd_future(e);
+ }
+ if let Err(e) = state.check_write(&filename_) {
+ return odd_future(e);
+ }
}
}
let op = open_options
.open(filename)
.map_err(DenoError::from)
- .and_then(move |fs_file| {
+ .and_then(move |fs_file| -> OpResult {
let resource = resources::add_fs_file(fs_file);
let builder = &mut FlatBufferBuilder::new();
let inner =
msg::OpenRes::create(builder, &msg::OpenResArgs { rid: resource.rid });
Ok(serialize_response(
+ cmd_id,
builder,
msg::BaseArgs {
inner: Some(inner.as_union_value()),
@@ -979,27 +1023,22 @@ fn op_open(
},
))
});
- if base.sync() {
- let buf = op.wait()?;
- Ok(Op::Sync(buf))
- } else {
- Ok(Op::Async(Box::new(op)))
- }
+ Box::new(op)
}
fn op_close(
_state: &ThreadSafeState,
base: &msg::Base<'_>,
data: Option<PinnedBuf>,
-) -> CliOpResult {
+) -> Box<OpWithError> {
assert!(data.is_none());
let inner = base.inner_as_close().unwrap();
let rid = inner.rid();
match resources::lookup(rid) {
- None => Err(errors::bad_resource()),
+ None => odd_future(errors::bad_resource()),
Some(resource) => {
resource.close();
- ok_buf(empty_buf())
+ ok_future(empty_buf())
}
}
}
@@ -1008,14 +1047,14 @@ fn op_kill(
_state: &ThreadSafeState,
base: &msg::Base<'_>,
data: Option<PinnedBuf>,
-) -> CliOpResult {
+) -> Box<OpWithError> {
assert!(data.is_none());
let inner = base.inner_as_kill().unwrap();
let pid = inner.pid();
let signo = inner.signo();
match kill(pid, signo) {
- Ok(_) => ok_buf(empty_buf()),
- Err(e) => Err(e),
+ Ok(_) => ok_future(empty_buf()),
+ Err(e) => odd_future(e),
}
}
@@ -1023,13 +1062,13 @@ fn op_shutdown(
_state: &ThreadSafeState,
base: &msg::Base<'_>,
data: Option<PinnedBuf>,
-) -> CliOpResult {
+) -> Box<OpWithError> {
assert!(data.is_none());
let inner = base.inner_as_shutdown().unwrap();
let rid = inner.rid();
let how = inner.how();
match resources::lookup(rid) {
- None => Err(errors::bad_resource()),
+ None => odd_future(errors::bad_resource()),
Some(mut resource) => {
let shutdown_mode = match how {
0 => Shutdown::Read,
@@ -1049,12 +1088,13 @@ fn op_read(
_state: &ThreadSafeState,
base: &msg::Base<'_>,
data: Option<PinnedBuf>,
-) -> CliOpResult {
+) -> Box<OpWithError> {
+ let cmd_id = base.cmd_id();
let inner = base.inner_as_read().unwrap();
let rid = inner.rid();
match resources::lookup(rid) {
- None => Err(errors::bad_resource()),
+ None => odd_future(errors::bad_resource()),
Some(resource) => {
let op = tokio::io::read(resource, data.unwrap())
.map_err(DenoError::from)
@@ -1068,6 +1108,7 @@ fn op_read(
},
);
Ok(serialize_response(
+ cmd_id,
builder,
msg::BaseArgs {
inner: Some(inner.as_union_value()),
@@ -1076,12 +1117,7 @@ fn op_read(
},
))
});
- if base.sync() {
- let buf = op.wait()?;
- Ok(Op::Sync(buf))
- } else {
- Ok(Op::Async(Box::new(op)))
- }
+ Box::new(op)
}
}
}
@@ -1090,12 +1126,13 @@ fn op_write(
_state: &ThreadSafeState,
base: &msg::Base<'_>,
data: Option<PinnedBuf>,
-) -> CliOpResult {
+) -> Box<OpWithError> {
+ let cmd_id = base.cmd_id();
let inner = base.inner_as_write().unwrap();
let rid = inner.rid();
match resources::lookup(rid) {
- None => Err(errors::bad_resource()),
+ None => odd_future(errors::bad_resource()),
Some(resource) => {
let op = tokio_write::write(resource, data.unwrap())
.map_err(DenoError::from)
@@ -1108,6 +1145,7 @@ fn op_write(
},
);
Ok(serialize_response(
+ cmd_id,
builder,
msg::BaseArgs {
inner: Some(inner.as_union_value()),
@@ -1116,12 +1154,7 @@ fn op_write(
},
))
});
- if base.sync() {
- let buf = op.wait()?;
- Ok(Op::Sync(buf))
- } else {
- Ok(Op::Async(Box::new(op)))
- }
+ Box::new(op)
}
}
}
@@ -1130,24 +1163,20 @@ fn op_seek(
_state: &ThreadSafeState,
base: &msg::Base<'_>,
data: Option<PinnedBuf>,
-) -> CliOpResult {
+) -> Box<OpWithError> {
assert!(data.is_none());
+ let _cmd_id = base.cmd_id();
let inner = base.inner_as_seek().unwrap();
let rid = inner.rid();
let offset = inner.offset();
let whence = inner.whence();
match resources::lookup(rid) {
- None => Err(errors::bad_resource()),
+ None => odd_future(errors::bad_resource()),
Some(resource) => {
let op = resources::seek(resource, offset, whence)
.and_then(move |_| Ok(empty_buf()));
- if base.sync() {
- let buf = op.wait()?;
- Ok(Op::Sync(buf))
- } else {
- Ok(Op::Async(Box::new(op)))
- }
+ Box::new(op)
}
}
}
@@ -1156,13 +1185,18 @@ fn op_remove(
state: &ThreadSafeState,
base: &msg::Base<'_>,
data: Option<PinnedBuf>,
-) -> CliOpResult {
+) -> Box<OpWithError> {
assert!(data.is_none());
let inner = base.inner_as_remove().unwrap();
- let (path, path_) = resolve_path(inner.path().unwrap())?;
+ let (path, path_) = match resolve_path(inner.path().unwrap()) {
+ Err(err) => return odd_future(err),
+ Ok(v) => v,
+ };
let recursive = inner.recursive();
- state.check_write(&path_)?;
+ if let Err(e) = state.check_write(&path_) {
+ return odd_future(e);
+ }
blocking(base.sync(), move || {
debug!("op_remove {}", path.display());
@@ -1182,14 +1216,24 @@ fn op_copy_file(
state: &ThreadSafeState,
base: &msg::Base<'_>,
data: Option<PinnedBuf>,
-) -> CliOpResult {
+) -> Box<OpWithError> {
assert!(data.is_none());
let inner = base.inner_as_copy_file().unwrap();
- let (from, from_) = resolve_path(inner.from().unwrap())?;
- let (to, to_) = resolve_path(inner.to().unwrap())?;
+ let (from, from_) = match resolve_path(inner.from().unwrap()) {
+ Err(err) => return odd_future(err),
+ Ok(v) => v,
+ };
+ let (to, to_) = match resolve_path(inner.to().unwrap()) {
+ Err(err) => return odd_future(err),
+ Ok(v) => v,
+ };
- state.check_read(&from_)?;
- state.check_write(&to_)?;
+ if let Err(e) = state.check_read(&from_) {
+ return odd_future(e);
+ }
+ if let Err(e) = state.check_write(&to_) {
+ return odd_future(e);
+ }
debug!("op_copy_file {} {}", from.display(), to.display());
blocking(base.sync(), move || {
@@ -1230,40 +1274,47 @@ fn get_mode(_perm: &fs::Permissions) -> u32 {
fn op_cwd(
_state: &ThreadSafeState,
-
- _base: &msg::Base<'_>,
+ base: &msg::Base<'_>,
data: Option<PinnedBuf>,
-) -> CliOpResult {
+) -> Box<OpWithError> {
assert!(data.is_none());
-
- let path = std::env::current_dir()?;
- let builder = &mut FlatBufferBuilder::new();
- let cwd =
- builder.create_string(&path.into_os_string().into_string().unwrap());
- let inner = msg::CwdRes::create(builder, &msg::CwdResArgs { cwd: Some(cwd) });
- let response_buf = serialize_response(
- builder,
- msg::BaseArgs {
- inner: Some(inner.as_union_value()),
- inner_type: msg::Any::CwdRes,
- ..Default::default()
- },
- );
- ok_buf(response_buf)
+ let cmd_id = base.cmd_id();
+ Box::new(futures::future::result(|| -> OpResult {
+ let path = std::env::current_dir()?;
+ let builder = &mut FlatBufferBuilder::new();
+ let cwd =
+ builder.create_string(&path.into_os_string().into_string().unwrap());
+ let inner =
+ msg::CwdRes::create(builder, &msg::CwdResArgs { cwd: Some(cwd) });
+ Ok(serialize_response(
+ cmd_id,
+ builder,
+ msg::BaseArgs {
+ inner: Some(inner.as_union_value()),
+ inner_type: msg::Any::CwdRes,
+ ..Default::default()
+ },
+ ))
+ }()))
}
fn op_stat(
state: &ThreadSafeState,
base: &msg::Base<'_>,
data: Option<PinnedBuf>,
-) -> CliOpResult {
+) -> Box<OpWithError> {
assert!(data.is_none());
let inner = base.inner_as_stat().unwrap();
-
- let (filename, filename_) = resolve_path(inner.filename().unwrap())?;
+ let cmd_id = base.cmd_id();
+ let (filename, filename_) = match resolve_path(inner.filename().unwrap()) {
+ Err(err) => return odd_future(err),
+ Ok(v) => v,
+ };
let lstat = inner.lstat();
- state.check_read(&filename_)?;
+ if let Err(e) = state.check_read(&filename_) {
+ return odd_future(e);
+ }
blocking(base.sync(), move || {
let builder = &mut FlatBufferBuilder::new();
@@ -1290,6 +1341,7 @@ fn op_stat(
);
Ok(serialize_response(
+ cmd_id,
builder,
msg::BaseArgs {
inner: Some(inner.as_union_value()),
@@ -1304,14 +1356,20 @@ fn op_read_dir(
state: &ThreadSafeState,
base: &msg::Base<'_>,
data: Option<PinnedBuf>,
-) -> CliOpResult {
+) -> Box<OpWithError> {
assert!(data.is_none());
let inner = base.inner_as_read_dir().unwrap();
- let (path, path_) = resolve_path(inner.path().unwrap())?;
+ let cmd_id = base.cmd_id();
+ let (path, path_) = match resolve_path(inner.path().unwrap()) {
+ Err(err) => return odd_future(err),
+ Ok(v) => v,
+ };
- state.check_read(&path_)?;
+ if let Err(e) = state.check_read(&path_) {
+ return odd_future(e);
+ }
- blocking(base.sync(), move || {
+ blocking(base.sync(), move || -> OpResult {
debug!("op_read_dir {}", path.display());
let builder = &mut FlatBufferBuilder::new();
let entries: Vec<_> = fs::read_dir(path)?
@@ -1345,6 +1403,7 @@ fn op_read_dir(
},
);
Ok(serialize_response(
+ cmd_id,
builder,
msg::BaseArgs {
inner: Some(inner.as_union_value()),
@@ -1359,15 +1418,22 @@ fn op_rename(
state: &ThreadSafeState,
base: &msg::Base<'_>,
data: Option<PinnedBuf>,
-) -> CliOpResult {
+) -> Box<OpWithError> {
assert!(data.is_none());
let inner = base.inner_as_rename().unwrap();
- let (oldpath, _) = resolve_path(inner.oldpath().unwrap())?;
- let (newpath, newpath_) = resolve_path(inner.newpath().unwrap())?;
-
- state.check_write(&newpath_)?;
+ let (oldpath, _) = match resolve_path(inner.oldpath().unwrap()) {
+ Err(err) => return odd_future(err),
+ Ok(v) => v,
+ };
+ let (newpath, newpath_) = match resolve_path(inner.newpath().unwrap()) {
+ Err(err) => return odd_future(err),
+ Ok(v) => v,
+ };
- blocking(base.sync(), move || {
+ if let Err(e) = state.check_write(&newpath_) {
+ return odd_future(e);
+ }
+ blocking(base.sync(), move || -> OpResult {
debug!("op_rename {} {}", oldpath.display(), newpath.display());
fs::rename(&oldpath, &newpath)?;
Ok(empty_buf())
@@ -1378,15 +1444,23 @@ fn op_link(
state: &ThreadSafeState,
base: &msg::Base<'_>,
data: Option<PinnedBuf>,
-) -> CliOpResult {
+) -> Box<OpWithError> {
assert!(data.is_none());
let inner = base.inner_as_link().unwrap();
- let (oldname, _) = resolve_path(inner.oldname().unwrap())?;
- let (newname, newname_) = resolve_path(inner.newname().unwrap())?;
+ let (oldname, _) = match resolve_path(inner.oldname().unwrap()) {
+ Err(err) => return odd_future(err),
+ Ok(v) => v,
+ };
+ let (newname, newname_) = match resolve_path(inner.newname().unwrap()) {
+ Err(err) => return odd_future(err),
+ Ok(v) => v,
+ };
- state.check_write(&newname_)?;
+ if let Err(e) = state.check_write(&newname_) {
+ return odd_future(e);
+ }
- blocking(base.sync(), move || {
+ blocking(base.sync(), move || -> OpResult {
debug!("op_link {} {}", oldname.display(), newname.display());
std::fs::hard_link(&oldname, &newname)?;
Ok(empty_buf())
@@ -1397,18 +1471,29 @@ fn op_symlink(
state: &ThreadSafeState,
base: &msg::Base<'_>,
data: Option<PinnedBuf>,
-) -> CliOpResult {
+) -> Box<OpWithError> {
assert!(data.is_none());
let inner = base.inner_as_symlink().unwrap();
- let (oldname, _) = resolve_path(inner.oldname().unwrap())?;
- let (newname, newname_) = resolve_path(inner.newname().unwrap())?;
+ let (oldname, _) = match resolve_path(inner.oldname().unwrap()) {
+ Err(err) => return odd_future(err),
+ Ok(v) => v,
+ };
+ let (newname, newname_) = match resolve_path(inner.newname().unwrap()) {
+ Err(err) => return odd_future(err),
+ Ok(v) => v,
+ };
- state.check_write(&newname_)?;
+ if let Err(e) = state.check_write(&newname_) {
+ return odd_future(e);
+ }
// TODO Use type for Windows.
if cfg!(windows) {
- return Err(errors::new(ErrorKind::Other, "Not implemented".to_string()));
+ return odd_future(errors::new(
+ ErrorKind::Other,
+ "Not implemented".to_string(),
+ ));
}
- blocking(base.sync(), move || {
+ blocking(base.sync(), move || -> OpResult {
debug!("op_symlink {} {}", oldname.display(), newname.display());
#[cfg(any(unix))]
std::os::unix::fs::symlink(&oldname, &newname)?;
@@ -1420,15 +1505,20 @@ fn op_read_link(
state: &ThreadSafeState,
base: &msg::Base<'_>,
data: Option<PinnedBuf>,
-) -> CliOpResult {
+) -> Box<OpWithError> {
assert!(data.is_none());
let inner = base.inner_as_readlink().unwrap();
+ let cmd_id = base.cmd_id();
+ let (name, name_) = match resolve_path(inner.name().unwrap()) {
+ Err(err) => return odd_future(err),
+ Ok(v) => v,
+ };
- let (name, name_) = resolve_path(inner.name().unwrap())?;
-
- state.check_read(&name_)?;
+ if let Err(e) = state.check_read(&name_) {
+ return odd_future(e);
+ }
- blocking(base.sync(), move || {
+ blocking(base.sync(), move || -> OpResult {
debug!("op_read_link {}", name.display());
let path = fs::read_link(&name)?;
let builder = &mut FlatBufferBuilder::new();
@@ -1440,6 +1530,7 @@ fn op_read_link(
},
);
Ok(serialize_response(
+ cmd_id,
builder,
msg::BaseArgs {
inner: Some(inner.as_union_value()),
@@ -1454,10 +1545,10 @@ fn op_repl_start(
state: &ThreadSafeState,
base: &msg::Base<'_>,
data: Option<PinnedBuf>,
-) -> CliOpResult {
+) -> Box<OpWithError> {
assert!(data.is_none());
let inner = base.inner_as_repl_start().unwrap();
-
+ let cmd_id = base.cmd_id();
let history_file = String::from(inner.history_file().unwrap());
debug!("op_repl_start {}", history_file);
@@ -1470,7 +1561,8 @@ fn op_repl_start(
builder,
&msg::ReplStartResArgs { rid: resource.rid },
);
- ok_buf(serialize_response(
+ ok_future(serialize_response(
+ cmd_id,
builder,
msg::BaseArgs {
inner: Some(inner.as_union_value()),
@@ -1484,15 +1576,15 @@ fn op_repl_readline(
_state: &ThreadSafeState,
base: &msg::Base<'_>,
data: Option<PinnedBuf>,
-) -> CliOpResult {
+) -> Box<OpWithError> {
assert!(data.is_none());
let inner = base.inner_as_repl_readline().unwrap();
-
+ let cmd_id = base.cmd_id();
let rid = inner.rid();
let prompt = inner.prompt().unwrap().to_owned();
debug!("op_repl_readline {} {}", rid, prompt);
- blocking(base.sync(), move || {
+ blocking(base.sync(), move || -> OpResult {
let repl = resources::get_repl(rid)?;
let line = repl.lock().unwrap().readline(&prompt)?;
@@ -1505,6 +1597,7 @@ fn op_repl_readline(
},
);
Ok(serialize_response(
+ cmd_id,
builder,
msg::BaseArgs {
inner: Some(inner.as_union_value()),
@@ -1519,14 +1612,19 @@ fn op_truncate(
state: &ThreadSafeState,
base: &msg::Base<'_>,
data: Option<PinnedBuf>,
-) -> CliOpResult {
+) -> Box<OpWithError> {
assert!(data.is_none());
let inner = base.inner_as_truncate().unwrap();
- let (filename, filename_) = resolve_path(inner.name().unwrap())?;
+ let (filename, filename_) = match resolve_path(inner.name().unwrap()) {
+ Err(err) => return odd_future(err),
+ Ok(v) => v,
+ };
let len = inner.len();
- state.check_write(&filename_)?;
+ if let Err(e) = state.check_write(&filename_) {
+ return odd_future(e);
+ }
blocking(base.sync(), move || {
debug!("op_truncate {} {}", filename_, len);
@@ -1540,7 +1638,7 @@ fn op_utime(
state: &ThreadSafeState,
base: &msg::Base<'_>,
data: Option<PinnedBuf>,
-) -> CliOpResult {
+) -> Box<OpWithError> {
assert!(data.is_none());
let inner = base.inner_as_utime().unwrap();
@@ -1548,7 +1646,9 @@ fn op_utime(
let atime = inner.atime();
let mtime = inner.mtime();
- state.check_write(&filename)?;
+ if let Err(e) = state.check_write(&filename) {
+ return odd_future(e);
+ }
blocking(base.sync(), move || {
debug!("op_utimes {} {} {}", filename, atime, mtime);
@@ -1561,35 +1661,41 @@ fn op_listen(
state: &ThreadSafeState,
base: &msg::Base<'_>,
data: Option<PinnedBuf>,
-) -> CliOpResult {
+) -> Box<OpWithError> {
assert!(data.is_none());
-
+ let cmd_id = base.cmd_id();
let inner = base.inner_as_listen().unwrap();
let network = inner.network().unwrap();
assert_eq!(network, "tcp");
let address = inner.address().unwrap();
- state.check_net(&address)?;
+ if let Err(e) = state.check_net(&address) {
+ return odd_future(e);
+ }
- let addr = resolve_addr(address).wait()?;
- let listener = TcpListener::bind(&addr)?;
- let resource = resources::add_tcp_listener(listener);
+ Box::new(futures::future::result((move || {
+ let addr = resolve_addr(address).wait()?;
+ let listener = TcpListener::bind(&addr)?;
+ let resource = resources::add_tcp_listener(listener);
- let builder = &mut FlatBufferBuilder::new();
- let inner =
- msg::ListenRes::create(builder, &msg::ListenResArgs { rid: resource.rid });
- let response_buf = serialize_response(
- builder,
- msg::BaseArgs {
- inner: Some(inner.as_union_value()),
- inner_type: msg::Any::ListenRes,
- ..Default::default()
- },
- );
- ok_buf(response_buf)
+ let builder = &mut FlatBufferBuilder::new();
+ let inner = msg::ListenRes::create(
+ builder,
+ &msg::ListenResArgs { rid: resource.rid },
+ );
+ Ok(serialize_response(
+ cmd_id,
+ builder,
+ msg::BaseArgs {
+ inner: Some(inner.as_union_value()),
+ inner_type: msg::Any::ListenRes,
+ ..Default::default()
+ },
+ ))
+ })()))
}
-fn new_conn(tcp_stream: TcpStream) -> DenoResult<Buf> {
+fn new_conn(cmd_id: u32, tcp_stream: TcpStream) -> OpResult {
let tcp_stream_resource = resources::add_tcp_stream(tcp_stream);
// TODO forward socket_addr to client.
@@ -1602,6 +1708,7 @@ fn new_conn(tcp_stream: TcpStream) -> DenoResult<Buf> {
},
);
Ok(serialize_response(
+ cmd_id,
builder,
msg::BaseArgs {
inner: Some(inner.as_union_value()),
@@ -1615,24 +1722,21 @@ fn op_accept(
_state: &ThreadSafeState,
base: &msg::Base<'_>,
data: Option<PinnedBuf>,
-) -> CliOpResult {
+) -> Box<OpWithError> {
assert!(data.is_none());
-
+ let cmd_id = base.cmd_id();
let inner = base.inner_as_accept().unwrap();
let server_rid = inner.rid();
match resources::lookup(server_rid) {
- None => Err(errors::bad_resource()),
+ None => odd_future(errors::bad_resource()),
Some(server_resource) => {
let op = tokio_util::accept(server_resource)
.map_err(DenoError::from)
- .and_then(move |(tcp_stream, _socket_addr)| new_conn(tcp_stream));
- if base.sync() {
- let buf = op.wait()?;
- Ok(Op::Sync(buf))
- } else {
- Ok(Op::Async(Box::new(op)))
- }
+ .and_then(move |(tcp_stream, _socket_addr)| {
+ new_conn(cmd_id, tcp_stream)
+ });
+ Box::new(op)
}
}
}
@@ -1641,15 +1745,17 @@ fn op_dial(
state: &ThreadSafeState,
base: &msg::Base<'_>,
data: Option<PinnedBuf>,
-) -> CliOpResult {
+) -> Box<OpWithError> {
assert!(data.is_none());
-
+ let cmd_id = base.cmd_id();
let inner = base.inner_as_dial().unwrap();
let network = inner.network().unwrap();
assert_eq!(network, "tcp"); // TODO Support others.
let address = inner.address().unwrap();
- state.check_net(&address)?;
+ if let Err(e) = state.check_net(&address) {
+ return odd_future(e);
+ }
let op =
resolve_addr(address)
@@ -1657,30 +1763,26 @@ fn op_dial(
.and_then(move |addr| {
TcpStream::connect(&addr)
.map_err(DenoError::from)
- .and_then(new_conn)
+ .and_then(move |tcp_stream| new_conn(cmd_id, tcp_stream))
});
- if base.sync() {
- let buf = op.wait()?;
- Ok(Op::Sync(buf))
- } else {
- Ok(Op::Async(Box::new(op)))
- }
+ Box::new(op)
}
fn op_metrics(
state: &ThreadSafeState,
-
- _base: &msg::Base<'_>,
+ base: &msg::Base<'_>,
data: Option<PinnedBuf>,
-) -> CliOpResult {
+) -> Box<OpWithError> {
assert!(data.is_none());
+ let cmd_id = base.cmd_id();
let builder = &mut FlatBufferBuilder::new();
let inner = msg::MetricsRes::create(
builder,
&msg::MetricsResArgs::from(&state.metrics),
);
- ok_buf(serialize_response(
+ ok_future(serialize_response(
+ cmd_id,
builder,
msg::BaseArgs {
inner: Some(inner.as_union_value()),
@@ -1692,10 +1794,11 @@ fn op_metrics(
fn op_resources(
_state: &ThreadSafeState,
- _base: &msg::Base<'_>,
+ base: &msg::Base<'_>,
data: Option<PinnedBuf>,
-) -> CliOpResult {
+) -> Box<OpWithError> {
assert!(data.is_none());
+ let cmd_id = base.cmd_id();
let builder = &mut FlatBufferBuilder::new();
let serialized_resources = table_entries();
@@ -1722,7 +1825,8 @@ fn op_resources(
},
);
- ok_buf(serialize_response(
+ ok_future(serialize_response(
+ cmd_id,
builder,
msg::BaseArgs {
inner: Some(inner.as_union_value()),
@@ -1744,12 +1848,13 @@ fn op_run(
state: &ThreadSafeState,
base: &msg::Base<'_>,
data: Option<PinnedBuf>,
-) -> CliOpResult {
- if !base.sync() {
- return Err(errors::no_async_support());
- }
+) -> Box<OpWithError> {
+ assert!(base.sync());
+ let cmd_id = base.cmd_id();
- state.check_run()?;
+ if let Err(e) = state.check_run() {
+ return odd_future(e);
+ }
assert!(data.is_none());
let inner = base.inner_as_run().unwrap();
@@ -1773,7 +1878,12 @@ fn op_run(
c.stderr(subprocess_stdio_map(inner.stderr()));
// Spawn the command.
- let child = c.spawn_async().map_err(DenoError::from)?;
+ let child = match c.spawn_async() {
+ Ok(v) => v,
+ Err(err) => {
+ return odd_future(err.into());
+ }
+ };
let pid = child.id();
let resources = resources::add_child(child);
@@ -1796,29 +1906,37 @@ fn op_run(
let builder = &mut FlatBufferBuilder::new();
let inner = msg::RunRes::create(builder, &res_args);
- Ok(Op::Sync(serialize_response(
+ ok_future(serialize_response(
+ cmd_id,
builder,
msg::BaseArgs {
inner: Some(inner.as_union_value()),
inner_type: msg::Any::RunRes,
..Default::default()
},
- )))
+ ))
}
fn op_run_status(
state: &ThreadSafeState,
base: &msg::Base<'_>,
data: Option<PinnedBuf>,
-) -> CliOpResult {
+) -> Box<OpWithError> {
assert!(data.is_none());
-
+ let cmd_id = base.cmd_id();
let inner = base.inner_as_run_status().unwrap();
let rid = inner.rid();
- state.check_run()?;
+ if let Err(e) = state.check_run() {
+ return odd_future(e);
+ }
- let future = resources::child_status(rid)?;
+ let future = match resources::child_status(rid) {
+ Err(e) => {
+ return odd_future(e);
+ }
+ Ok(f) => f,
+ };
let future = future.and_then(move |run_status| {
let code = run_status.code();
@@ -1843,6 +1961,7 @@ fn op_run_status(
},
);
Ok(serialize_response(
+ cmd_id,
builder,
msg::BaseArgs {
inner: Some(inner.as_union_value()),
@@ -1851,12 +1970,7 @@ fn op_run_status(
},
))
});
- if base.sync() {
- let buf = future.wait()?;
- Ok(Op::Sync(buf))
- } else {
- Ok(Op::Async(Box::new(future)))
- }
+ Box::new(future)
}
struct GetMessageFuture {
@@ -1880,11 +1994,9 @@ fn op_worker_get_message(
state: &ThreadSafeState,
base: &msg::Base<'_>,
data: Option<PinnedBuf>,
-) -> CliOpResult {
- if base.sync() {
- return Err(errors::no_sync_support());
- }
+) -> Box<OpWithError> {
assert!(data.is_none());
+ let cmd_id = base.cmd_id();
let op = GetMessageFuture {
state: state.clone(),
@@ -1900,6 +2012,7 @@ fn op_worker_get_message(
&msg::WorkerGetMessageResArgs { data },
);
Ok(serialize_response(
+ cmd_id,
builder,
msg::BaseArgs {
inner: Some(inner.as_union_value()),
@@ -1908,32 +2021,37 @@ fn op_worker_get_message(
},
))
});
- Ok(Op::Async(Box::new(op)))
+ Box::new(op)
}
/// Post message to host as guest worker
fn op_worker_post_message(
state: &ThreadSafeState,
- _base: &msg::Base<'_>,
+ base: &msg::Base<'_>,
data: Option<PinnedBuf>,
-) -> CliOpResult {
+) -> Box<OpWithError> {
+ let cmd_id = base.cmd_id();
+
let d = Vec::from(data.unwrap().as_ref()).into_boxed_slice();
let tx = {
let wc = state.worker_channels.lock().unwrap();
wc.0.clone()
};
- tx.send(d)
- .wait()
- .map_err(|e| errors::new(ErrorKind::Other, e.to_string()))?;
- let builder = &mut FlatBufferBuilder::new();
+ let op = tx.send(d);
+ let op = op.map_err(|e| errors::new(ErrorKind::Other, e.to_string()));
+ let op = op.and_then(move |_| -> DenoResult<Buf> {
+ let builder = &mut FlatBufferBuilder::new();
- ok_buf(serialize_response(
- builder,
- msg::BaseArgs {
- ..Default::default()
- },
- ))
+ Ok(serialize_response(
+ cmd_id,
+ builder,
+ msg::BaseArgs {
+ ..Default::default()
+ },
+ ))
+ });
+ Box::new(op)
}
/// Create worker as the host
@@ -1941,9 +2059,9 @@ fn op_create_worker(
state: &ThreadSafeState,
base: &msg::Base<'_>,
data: Option<PinnedBuf>,
-) -> CliOpResult {
+) -> Box<OpWithError> {
assert!(data.is_none());
-
+ let cmd_id = base.cmd_id();
let inner = base.inner_as_create_worker().unwrap();
let specifier = inner.specifier().unwrap();
@@ -1963,33 +2081,39 @@ fn op_create_worker(
js_check(worker.execute("denoMain()"));
js_check(worker.execute("workerMain()"));
- let module_specifier = ModuleSpecifier::resolve_root(specifier)?;
-
- let op = worker
- .execute_mod_async(&module_specifier, false)
- .and_then(move |()| {
- let mut workers_tl = parent_state.workers.lock().unwrap();
- workers_tl.insert(rid, worker.shared());
- let builder = &mut FlatBufferBuilder::new();
- let msg_inner = msg::CreateWorkerRes::create(
- builder,
- &msg::CreateWorkerResArgs { rid },
- );
- Ok(serialize_response(
- builder,
- msg::BaseArgs {
- inner: Some(msg_inner.as_union_value()),
- inner_type: msg::Any::CreateWorkerRes,
- ..Default::default()
- },
- ))
- }).map_err(|err| match err {
- errors::RustOrJsError::Js(_) => errors::worker_init_failed(),
- errors::RustOrJsError::Rust(err) => err,
- });
+ let op = ModuleSpecifier::resolve_root(specifier)
+ .and_then(|module_specifier| {
+ Ok(
+ worker
+ .execute_mod_async(&module_specifier, false)
+ .and_then(move |()| {
+ let mut workers_tl = parent_state.workers.lock().unwrap();
+ workers_tl.insert(rid, worker.shared());
+ let builder = &mut FlatBufferBuilder::new();
+ let msg_inner = msg::CreateWorkerRes::create(
+ builder,
+ &msg::CreateWorkerResArgs { rid },
+ );
+ Ok(serialize_response(
+ cmd_id,
+ builder,
+ msg::BaseArgs {
+ inner: Some(msg_inner.as_union_value()),
+ inner_type: msg::Any::CreateWorkerRes,
+ ..Default::default()
+ },
+ ))
+ }).map_err(|err| match err {
+ errors::RustOrJsError::Js(_) => errors::worker_init_failed(),
+ errors::RustOrJsError::Rust(err) => err,
+ }),
+ )
+ }).map_err(DenoError::from);
- let result = op.wait()?;
- Ok(Op::Sync(result))
+ Box::new(match op {
+ Ok(op) => future::Either::A(op),
+ Err(err) => future::Either::B(future::result(Err(err))),
+ })
}
/// Return when the worker closes
@@ -1997,12 +2121,9 @@ fn op_host_get_worker_closed(
state: &ThreadSafeState,
base: &msg::Base<'_>,
data: Option<PinnedBuf>,
-) -> CliOpResult {
- if base.sync() {
- return Err(errors::no_sync_support());
- }
+) -> Box<OpWithError> {
assert!(data.is_none());
-
+ let cmd_id = base.cmd_id();
let inner = base.inner_as_host_get_worker_closed().unwrap();
let rid = inner.rid();
let state = state.clone();
@@ -2013,17 +2134,17 @@ fn op_host_get_worker_closed(
worker.clone()
};
- let op = Box::new(shared_worker_future.then(move |_result| {
+ Box::new(shared_worker_future.then(move |_result| {
let builder = &mut FlatBufferBuilder::new();
Ok(serialize_response(
+ cmd_id,
builder,
msg::BaseArgs {
..Default::default()
},
))
- }));
- Ok(Op::Async(Box::new(op)))
+ }))
}
/// Get message from guest worker as host
@@ -2031,12 +2152,9 @@ fn op_host_get_message(
_state: &ThreadSafeState,
base: &msg::Base<'_>,
data: Option<PinnedBuf>,
-) -> CliOpResult {
- if base.sync() {
- return Err(errors::no_sync_support());
- }
+) -> Box<OpWithError> {
assert!(data.is_none());
-
+ let cmd_id = base.cmd_id();
let inner = base.inner_as_host_get_message().unwrap();
let rid = inner.rid();
@@ -2051,6 +2169,7 @@ fn op_host_get_message(
&msg::HostGetMessageResArgs { data },
);
Ok(serialize_response(
+ cmd_id,
builder,
msg::BaseArgs {
inner: Some(msg_inner.as_union_value()),
@@ -2059,7 +2178,7 @@ fn op_host_get_message(
},
))
});
- Ok(Op::Async(Box::new(op)))
+ Box::new(op)
}
/// Post message to guest worker as host
@@ -2067,30 +2186,34 @@ fn op_host_post_message(
_state: &ThreadSafeState,
base: &msg::Base<'_>,
data: Option<PinnedBuf>,
-) -> CliOpResult {
+) -> Box<OpWithError> {
+ let cmd_id = base.cmd_id();
let inner = base.inner_as_host_post_message().unwrap();
let rid = inner.rid();
let d = Vec::from(data.unwrap().as_ref()).into_boxed_slice();
- resources::post_message_to_worker(rid, d)
- .wait()
- .map_err(|e| errors::new(ErrorKind::Other, e.to_string()))?;
- let builder = &mut FlatBufferBuilder::new();
+ let op = resources::post_message_to_worker(rid, d);
+ let op = op.map_err(|e| errors::new(ErrorKind::Other, e.to_string()));
+ let op = op.and_then(move |_| -> DenoResult<Buf> {
+ let builder = &mut FlatBufferBuilder::new();
- ok_buf(serialize_response(
- builder,
- msg::BaseArgs {
- ..Default::default()
- },
- ))
+ Ok(serialize_response(
+ cmd_id,
+ builder,
+ msg::BaseArgs {
+ ..Default::default()
+ },
+ ))
+ });
+ Box::new(op)
}
fn op_get_random_values(
state: &ThreadSafeState,
_base: &msg::Base<'_>,
data: Option<PinnedBuf>,
-) -> CliOpResult {
+) -> Box<OpWithError> {
if let Some(ref seeded_rng) = state.seeded_rng {
let mut rng = seeded_rng.lock().unwrap();
rng.fill(&mut data.unwrap()[..]);
@@ -2099,5 +2222,5 @@ fn op_get_random_values(
rng.fill(&mut data.unwrap()[..]);
}
- ok_buf(empty_buf())
+ Box::new(ok_future(empty_buf()))
}
diff --git a/cli/state.rs b/cli/state.rs
index aa4690d44..f5eb8ae7a 100644
--- a/cli/state.rs
+++ b/cli/state.rs
@@ -15,9 +15,9 @@ use crate::resources;
use crate::resources::ResourceId;
use crate::worker::Worker;
use deno::Buf;
-use deno::CoreOp;
use deno::Loader;
use deno::ModuleSpecifier;
+use deno::Op;
use deno::PinnedBuf;
use futures::future::Either;
use futures::future::Shared;
@@ -106,11 +106,7 @@ impl Deref for ThreadSafeState {
}
impl ThreadSafeState {
- pub fn dispatch(
- &self,
- control: &[u8],
- zero_copy: Option<PinnedBuf>,
- ) -> CoreOp {
+ pub fn dispatch(&self, control: &[u8], zero_copy: Option<PinnedBuf>) -> Op {
ops::dispatch_all(self, control, zero_copy, self.dispatch_selector)
}
}
diff --git a/core/core.d.ts b/core/core.d.ts
index 0c530e343..b1d1ac57f 100644
--- a/core/core.d.ts
+++ b/core/core.d.ts
@@ -4,13 +4,15 @@
// Deno core. These are not intended to be used directly by runtime users of
// Deno and therefore do not flow through to the runtime type library.
-declare type MessageCallback = (promiseId: number, msg: Uint8Array) => void;
+declare interface MessageCallback {
+ (msg: Uint8Array): void;
+}
declare interface DenoCore {
dispatch(
control: Uint8Array,
zeroCopy?: ArrayBufferView | null
- ): Uint8Array | null | number;
+ ): Uint8Array | null;
setAsyncHandler(cb: MessageCallback): void;
sharedQueue: {
head(): number;
diff --git a/core/examples/http_bench.js b/core/examples/http_bench.js
index 7e678aecd..8eb764b55 100644
--- a/core/examples/http_bench.js
+++ b/core/examples/http_bench.js
@@ -13,6 +13,7 @@ const responseBuf = new Uint8Array(
.map(c => c.charCodeAt(0))
);
const promiseMap = new Map();
+let nextPromiseId = 1;
function assert(cond) {
if (!cond) {
@@ -36,8 +37,8 @@ const scratchBytes = new Uint8Array(
);
assert(scratchBytes.byteLength === 4 * 4);
-function send(isSync, opId, arg, zeroCopy = null) {
- scratch32[0] = isSync;
+function send(promiseId, opId, arg, zeroCopy = null) {
+ scratch32[0] = promiseId;
scratch32[1] = opId;
scratch32[2] = arg;
scratch32[3] = -1;
@@ -46,9 +47,10 @@ function send(isSync, opId, arg, zeroCopy = null) {
/** Returns Promise<number> */
function sendAsync(opId, arg, zeroCopy = null) {
- const promiseId = send(false, opId, arg, zeroCopy);
+ const promiseId = nextPromiseId++;
const p = createResolvable();
promiseMap.set(promiseId, p);
+ send(promiseId, opId, arg, zeroCopy);
return p;
}
@@ -56,7 +58,7 @@ function recordFromBuf(buf) {
assert(buf.byteLength === 16);
const buf32 = new Int32Array(buf.buffer, buf.byteOffset, buf.byteLength / 4);
return {
- isSync: !!buf32[0],
+ promiseId: buf32[0],
opId: buf32[1],
arg: buf32[2],
result: buf32[3]
@@ -65,14 +67,14 @@ function recordFromBuf(buf) {
/** Returns i32 number */
function sendSync(opId, arg) {
- const buf = send(true, opId, arg);
+ const buf = send(0, opId, arg);
const record = recordFromBuf(buf);
return record.result;
}
-function handleAsyncMsgFromRust(promiseId, buf) {
+function handleAsyncMsgFromRust(buf) {
const record = recordFromBuf(buf);
- const { result } = record;
+ const { promiseId, result } = record;
const p = promiseMap.get(promiseId);
promiseMap.delete(promiseId);
p.resolve(result);
diff --git a/core/examples/http_bench.rs b/core/examples/http_bench.rs
index 0f0cd6a4b..e8c5ec1b7 100644
--- a/core/examples/http_bench.rs
+++ b/core/examples/http_bench.rs
@@ -44,7 +44,7 @@ const OP_CLOSE: i32 = 5;
#[derive(Clone, Debug, PartialEq)]
pub struct Record {
- pub is_sync: i32,
+ pub promise_id: i32,
pub op_id: i32,
pub arg: i32,
pub result: i32,
@@ -52,8 +52,8 @@ pub struct Record {
impl Into<Buf> for Record {
fn into(self) -> Buf {
- let buf32 =
- vec![self.is_sync, self.op_id, self.arg, self.result].into_boxed_slice();
+ let buf32 = vec![self.promise_id, self.op_id, self.arg, self.result]
+ .into_boxed_slice();
let ptr = Box::into_raw(buf32) as *mut [u8; 16];
unsafe { Box::from_raw(ptr) }
}
@@ -65,7 +65,7 @@ impl From<&[u8]> for Record {
let ptr = s.as_ptr() as *const i32;
let ints = unsafe { std::slice::from_raw_parts(ptr, 4) };
Record {
- is_sync: ints[0],
+ promise_id: ints[0],
op_id: ints[1],
arg: ints[2],
result: ints[3],
@@ -81,7 +81,7 @@ impl From<Buf> for Record {
let ints: Box<[i32]> = unsafe { Box::from_raw(ptr) };
assert_eq!(ints.len(), 4);
Record {
- is_sync: ints[0],
+ promise_id: ints[0],
op_id: ints[1],
arg: ints[2],
result: ints[3],
@@ -92,7 +92,7 @@ impl From<Buf> for Record {
#[test]
fn test_record_from() {
let r = Record {
- is_sync: 1,
+ promise_id: 1,
op_id: 2,
arg: 3,
result: 4,
@@ -111,9 +111,9 @@ fn test_record_from() {
pub type HttpBenchOp = dyn Future<Item = i32, Error = std::io::Error> + Send;
-fn dispatch(control: &[u8], zero_copy_buf: Option<PinnedBuf>) -> CoreOp {
+fn dispatch(control: &[u8], zero_copy_buf: Option<PinnedBuf>) -> Op {
let record = Record::from(control);
- let is_sync = record.is_sync == 1;
+ let is_sync = record.promise_id == 0;
let http_bench_op = match record.op_id {
OP_LISTEN => {
assert!(is_sync);
diff --git a/core/isolate.rs b/core/isolate.rs
index ae14c0040..14e1b88aa 100644
--- a/core/isolate.rs
+++ b/core/isolate.rs
@@ -21,29 +21,21 @@ use futures::Async::*;
use futures::Future;
use futures::Poll;
use libc::c_char;
-use libc::c_int;
use libc::c_void;
use std::ffi::CStr;
use std::ffi::CString;
use std::ptr::null;
-use std::sync::atomic::{AtomicI32, Ordering};
use std::sync::{Arc, Mutex, Once, ONCE_INIT};
pub type Buf = Box<[u8]>;
-pub type OpAsyncFuture<I, E> = Box<dyn Future<Item = I, Error = E> + Send>;
+pub type OpAsyncFuture = Box<dyn Future<Item = Buf, Error = ()> + Send>;
-pub enum Op<E> {
+pub enum Op {
Sync(Buf),
- Async(OpAsyncFuture<Buf, E>),
+ Async(OpAsyncFuture),
}
-pub type CoreError = ();
-
-type CoreOpAsyncFuture = OpAsyncFuture<(c_int, Buf), CoreError>;
-
-pub type CoreOp = Op<CoreError>;
-
/// Stores a script used to initalize a Isolate
pub struct Script<'a> {
pub source: &'a str,
@@ -76,9 +68,7 @@ pub enum StartupData<'a> {
None,
}
-pub type OpResult<E> = Result<Op<E>, E>;
-
-type CoreDispatchFn = Fn(&[u8], Option<PinnedBuf>) -> CoreOp;
+type DispatchFn = Fn(&[u8], Option<PinnedBuf>) -> Op;
pub type DynImportFuture = Box<dyn Future<Item = deno_mod, Error = ()> + Send>;
type DynImportFn = Fn(&str, &str) -> DynImportFuture;
@@ -103,12 +93,6 @@ impl Future for DynImport {
}
}
-enum ResponseData {
- None,
- Buffer(deno_buf),
- PromiseId(c_int),
-}
-
/// A single execution context of JavaScript. Corresponds roughly to the "Web
/// Worker" concept in the DOM. An Isolate is a Future that can be used with
/// Tokio. The Isolate future complete when there is an error or when all
@@ -120,15 +104,14 @@ enum ResponseData {
pub struct Isolate {
libdeno_isolate: *const libdeno::isolate,
shared_libdeno_isolate: Arc<Mutex<Option<*const libdeno::isolate>>>,
- dispatch: Option<Arc<CoreDispatchFn>>,
+ dispatch: Option<Arc<DispatchFn>>,
dyn_import: Option<Arc<DynImportFn>>,
needs_init: bool,
shared: SharedQueue,
- pending_ops: FuturesUnordered<CoreOpAsyncFuture>,
+ pending_ops: FuturesUnordered<OpAsyncFuture>,
pending_dyn_imports: FuturesUnordered<DynImport>,
have_unpolled_ops: bool,
startup_script: Option<OwnedScript>,
- next_promise_id: AtomicI32,
}
unsafe impl Send for Isolate {}
@@ -193,7 +176,6 @@ impl Isolate {
have_unpolled_ops: false,
pending_dyn_imports: FuturesUnordered::new(),
startup_script,
- next_promise_id: AtomicI32::new(1),
}
}
@@ -202,7 +184,7 @@ impl Isolate {
/// corresponds to the second argument of Deno.core.dispatch().
pub fn set_dispatch<F>(&mut self, f: F)
where
- F: Fn(&[u8], Option<PinnedBuf>) -> CoreOp + Send + Sync + 'static,
+ F: Fn(&[u8], Option<PinnedBuf>) -> Op + Send + Sync + 'static,
{
self.dispatch = Some(Arc::new(f));
}
@@ -257,10 +239,6 @@ impl Isolate {
}
}
- pub fn get_next_promise_id(&self) -> i32 {
- self.next_promise_id.fetch_add(1, Ordering::SeqCst)
- }
-
extern "C" fn pre_dispatch(
user_data: *mut c_void,
control_argv0: deno_buf,
@@ -301,17 +279,9 @@ impl Isolate {
// return value.
// TODO(ry) check that if JSError thrown during respond(), that it will be
// picked up.
- let _ =
- isolate.respond(ResponseData::Buffer(deno_buf::from(buf.as_ref())));
+ let _ = isolate.respond(Some(&buf));
}
Op::Async(fut) => {
- let promise_id = isolate.get_next_promise_id();
- let _ = isolate.respond(ResponseData::PromiseId(promise_id));
- let fut = Box::new(fut.and_then(
- move |buf| -> Result<(c_int, Buf), CoreError> {
- Ok((promise_id, buf))
- },
- ));
isolate.pending_ops.push(fut);
isolate.have_unpolled_ops = true;
}
@@ -370,34 +340,14 @@ impl Isolate {
}
}
- // the result type is a placeholder for a more specific enum type
- fn respond(&mut self, data: ResponseData) -> Result<(), JSError> {
- match data {
- ResponseData::PromiseId(pid) => unsafe {
- libdeno::deno_respond(
- self.libdeno_isolate,
- self.as_raw_ptr(),
- deno_buf::empty(),
- &pid,
- )
- },
- ResponseData::Buffer(r) => unsafe {
- libdeno::deno_respond(
- self.libdeno_isolate,
- self.as_raw_ptr(),
- r,
- null(),
- )
- },
- ResponseData::None => unsafe {
- libdeno::deno_respond(
- self.libdeno_isolate,
- self.as_raw_ptr(),
- deno_buf::empty(),
- null(),
- )
- },
+ fn respond(&mut self, maybe_buf: Option<&[u8]>) -> Result<(), JSError> {
+ let buf = match maybe_buf {
+ None => deno_buf::empty(),
+ Some(r) => deno_buf::from(r),
};
+ unsafe {
+ libdeno::deno_respond(self.libdeno_isolate, self.as_raw_ptr(), buf)
+ }
if let Some(err) = self.last_exception() {
Err(err)
} else {
@@ -575,7 +525,7 @@ impl Future for Isolate {
self.shared_init();
- let mut overflow_response: Option<(c_int, Buf)> = None;
+ let mut overflow_response: Option<Buf> = None;
loop {
// If there are any pending dyn_import futures, do those first.
@@ -596,13 +546,13 @@ impl Future for Isolate {
Err(_) => panic!("unexpected op error"),
Ok(Ready(None)) => break,
Ok(NotReady) => break,
- Ok(Ready(Some(op))) => {
- let successful_push = self.shared.push(op.0, &op.1);
+ Ok(Ready(Some(buf))) => {
+ let successful_push = self.shared.push(&buf);
if !successful_push {
// If we couldn't push the response to the shared queue, because
// there wasn't enough size, we will return the buffer via the
// legacy route, using the argument of deno_respond.
- overflow_response = Some(op);
+ overflow_response = Some(buf);
break;
}
}
@@ -610,16 +560,14 @@ impl Future for Isolate {
}
if self.shared.size() > 0 {
- self.respond(ResponseData::None)?;
+ self.respond(None)?;
// The other side should have shifted off all the messages.
assert_eq!(self.shared.size(), 0);
}
if overflow_response.is_some() {
- let op = overflow_response.take().unwrap();
- let promise_id_bytes = op.0.to_be_bytes();
- let buf: Buf = [&promise_id_bytes, &op.1[..]].concat().into();
- self.respond(ResponseData::Buffer(deno_buf::from(buf.as_ref())))?;
+ let buf = overflow_response.take().unwrap();
+ self.respond(Some(&buf))?;
}
self.check_promise_errors();
@@ -716,7 +664,7 @@ pub mod tests {
let dispatch_count_ = dispatch_count.clone();
let mut isolate = Isolate::new(StartupData::None, false);
- isolate.set_dispatch(move |control, _| -> CoreOp {
+ isolate.set_dispatch(move |control, _| -> Op {
dispatch_count_.fetch_add(1, Ordering::Relaxed);
match mode {
Mode::AsyncImmediate => {
@@ -886,7 +834,7 @@ pub mod tests {
"setup2.js",
r#"
let nrecv = 0;
- Deno.core.setAsyncHandler((promiseId, buf) => {
+ Deno.core.setAsyncHandler((buf) => {
assert(buf.byteLength === 1);
assert(buf[0] === 43);
nrecv++;
@@ -1077,7 +1025,7 @@ pub mod tests {
"overflow_req_sync.js",
r#"
let asyncRecv = 0;
- Deno.core.setAsyncHandler((promiseId, buf) => { asyncRecv++ });
+ Deno.core.setAsyncHandler((buf) => { asyncRecv++ });
// Large message that will overflow the shared space.
let control = new Uint8Array(100 * 1024 * 1024);
let response = Deno.core.dispatch(control);
@@ -1099,7 +1047,7 @@ pub mod tests {
"overflow_res_sync.js",
r#"
let asyncRecv = 0;
- Deno.core.setAsyncHandler((promiseId, buf) => { asyncRecv++ });
+ Deno.core.setAsyncHandler((buf) => { asyncRecv++ });
// Large message that will overflow the shared space.
let control = new Uint8Array([42]);
let response = Deno.core.dispatch(control);
@@ -1120,7 +1068,7 @@ pub mod tests {
"overflow_req_async.js",
r#"
let asyncRecv = 0;
- Deno.core.setAsyncHandler((cmdId, buf) => {
+ Deno.core.setAsyncHandler((buf) => {
assert(buf.byteLength === 1);
assert(buf[0] === 43);
asyncRecv++;
@@ -1128,8 +1076,8 @@ pub mod tests {
// Large message that will overflow the shared space.
let control = new Uint8Array(100 * 1024 * 1024);
let response = Deno.core.dispatch(control);
- // Async messages always have number type response.
- assert(typeof response == "number");
+ // Async messages always have null response.
+ assert(response == null);
assert(asyncRecv == 0);
"#,
));
@@ -1149,7 +1097,7 @@ pub mod tests {
"overflow_res_async.js",
r#"
let asyncRecv = 0;
- Deno.core.setAsyncHandler((cmdId, buf) => {
+ Deno.core.setAsyncHandler((buf) => {
assert(buf.byteLength === 100 * 1024 * 1024);
assert(buf[0] === 4);
asyncRecv++;
@@ -1157,7 +1105,7 @@ pub mod tests {
// Large message that will overflow the shared space.
let control = new Uint8Array([42]);
let response = Deno.core.dispatch(control);
- assert(typeof response == "number");
+ assert(response == null);
assert(asyncRecv == 0);
"#,
));
@@ -1177,7 +1125,7 @@ pub mod tests {
"overflow_res_multiple_dispatch_async.js",
r#"
let asyncRecv = 0;
- Deno.core.setAsyncHandler((cmdId, buf) => {
+ Deno.core.setAsyncHandler((buf) => {
assert(buf.byteLength === 100 * 1024 * 1024);
assert(buf[0] === 4);
asyncRecv++;
@@ -1185,7 +1133,7 @@ pub mod tests {
// Large message that will overflow the shared space.
let control = new Uint8Array([42]);
let response = Deno.core.dispatch(control);
- assert(typeof response == "number");
+ assert(response == null);
assert(asyncRecv == 0);
// Dispatch another message to verify that pending ops
// are done even if shared space overflows
diff --git a/core/libdeno.rs b/core/libdeno.rs
index a17a8e521..84f21e89e 100644
--- a/core/libdeno.rs
+++ b/core/libdeno.rs
@@ -267,7 +267,6 @@ extern "C" {
i: *const isolate,
user_data: *const c_void,
buf: deno_buf,
- promise_id: *const c_int,
);
pub fn deno_pinned_buf_delete(buf: &mut deno_pinned_buf);
pub fn deno_execute(
diff --git a/core/libdeno/api.cc b/core/libdeno/api.cc
index 30f82b6cc..8a3a56156 100644
--- a/core/libdeno/api.cc
+++ b/core/libdeno/api.cc
@@ -153,15 +153,11 @@ void deno_pinned_buf_delete(deno_pinned_buf* buf) {
auto _ = deno::PinnedBuf(buf);
}
-void deno_respond(Deno* d_, void* user_data, deno_buf buf, int* promise_id) {
+void deno_respond(Deno* d_, void* user_data, deno_buf buf) {
auto* d = unwrap(d_);
if (d->current_args_ != nullptr) {
// Synchronous response.
- if (promise_id != nullptr) {
- auto number = v8::Number::New(d->isolate_, *promise_id);
- d->current_args_->GetReturnValue().Set(number);
- } else {
- CHECK_NOT_NULL(buf.data_ptr);
+ if (buf.data_ptr != nullptr) {
auto ab = deno::ImportBuf(d, buf);
d->current_args_->GetReturnValue().Set(ab);
}
diff --git a/core/libdeno/deno.h b/core/libdeno/deno.h
index 4f29f2c7a..745285554 100644
--- a/core/libdeno/deno.h
+++ b/core/libdeno/deno.h
@@ -81,10 +81,8 @@ void deno_execute(Deno* d, void* user_data, const char* js_filename,
// deno_respond sends up to one message back for every deno_recv_cb made.
//
// If this is called during deno_recv_cb, the issuing libdeno.send() in
-// javascript will synchronously return the specified promise_id(number)
-// or buf(Uint8Array) (or null if buf and promise_id are both null/empty).
-// Calling with non-null for both buf and promise_id will result in the
-// promise_id being returned.
+// javascript will synchronously return the specified buf as an ArrayBuffer (or
+// null if buf is empty).
//
// If this is called after deno_recv_cb has returned, the deno_respond
// will call into the JS callback specified by libdeno.recv().
@@ -94,7 +92,7 @@ void deno_execute(Deno* d, void* user_data, const char* js_filename,
// releasing its memory.)
//
// If a JS exception was encountered, deno_last_exception() will be non-NULL.
-void deno_respond(Deno* d, void* user_data, deno_buf buf, int* promise_id);
+void deno_respond(Deno* d, void* user_data, deno_buf buf);
// consumes zero_copy
void deno_pinned_buf_delete(deno_pinned_buf* buf);
diff --git a/core/libdeno/libdeno.d.ts b/core/libdeno/libdeno.d.ts
index 093e846ab..1bc7367d9 100644
--- a/core/libdeno/libdeno.d.ts
+++ b/core/libdeno/libdeno.d.ts
@@ -12,13 +12,14 @@ interface EvalErrorInfo {
thrown: any;
}
-declare type MessageCallbackInternal = (msg: Uint8Array) => void;
+declare interface MessageCallback {
+ (msg: Uint8Array): void;
+}
declare interface DenoCore {
- recv(cb: MessageCallbackInternal): void;
+ recv(cb: MessageCallback): void;
send(
- cmdId: number,
control: null | ArrayBufferView,
data?: ArrayBufferView
): null | Uint8Array;
diff --git a/core/libdeno/libdeno_test.cc b/core/libdeno/libdeno_test.cc
index b72a8e098..485c95bff 100644
--- a/core/libdeno/libdeno_test.cc
+++ b/core/libdeno/libdeno_test.cc
@@ -75,7 +75,7 @@ TEST(LibDenoTest, RecvReturnBar) {
EXPECT_EQ(buf.data_ptr[1], 'b');
EXPECT_EQ(buf.data_ptr[2], 'c');
uint8_t response[] = {'b', 'a', 'r'};
- deno_respond(d, user_data, {response, sizeof response}, nullptr);
+ deno_respond(d, user_data, {response, sizeof response});
};
Deno* d = deno_new(deno_config{0, snapshot, empty, recv_cb, nullptr});
deno_execute(d, d, "a.js", "RecvReturnBar()");
diff --git a/core/shared_queue.js b/core/shared_queue.js
index b413f011e..75f370ce4 100644
--- a/core/shared_queue.js
+++ b/core/shared_queue.js
@@ -151,27 +151,14 @@ SharedQueue Binary Layout
function handleAsyncMsgFromRust(buf) {
if (buf) {
- handleAsyncMsgFromRustInner(buf);
+ asyncHandler(buf);
} else {
while ((buf = shift()) != null) {
- handleAsyncMsgFromRustInner(buf);
+ asyncHandler(buf);
}
}
}
- function handleAsyncMsgFromRustInner(buf) {
- // DataView to extract cmdId value.
- const dataView = new DataView(buf.buffer, buf.byteOffset, 4);
- const promiseId = dataView.getInt32(0);
- // Uint8 buffer view shifted right and shortened 4 bytes to remove cmdId from view window.
- const bufViewFinal = new Uint8Array(
- buf.buffer,
- buf.byteOffset + 4,
- buf.byteLength - 4
- );
- asyncHandler(promiseId, bufViewFinal);
- }
-
function dispatch(control, zeroCopy = null) {
maybeInit();
// First try to push control to shared.
diff --git a/core/shared_queue.rs b/core/shared_queue.rs
index 1460fb172..c33a37b90 100644
--- a/core/shared_queue.rs
+++ b/core/shared_queue.rs
@@ -17,7 +17,6 @@ SharedQueue Binary Layout
*/
use crate::libdeno::deno_buf;
-use libc::c_int;
const MAX_RECORDS: usize = 100;
/// Total number of records added.
@@ -153,19 +152,17 @@ impl SharedQueue {
Some(&self.bytes[off..end])
}
- pub fn push(&mut self, promise_id: c_int, record: &[u8]) -> bool {
+ pub fn push(&mut self, record: &[u8]) -> bool {
let off = self.head();
- let end = off + record.len() + 4;
+ let end = off + record.len();
let index = self.num_records();
if end > self.bytes.len() || index >= MAX_RECORDS {
debug!("WARNING the sharedQueue overflowed");
return false;
}
self.set_end(index, end);
- assert_eq!(end - off, record.len() + 4);
- let pid_bytes = promise_id.to_be_bytes();
- self.bytes[off..off + 4].copy_from_slice(&pid_bytes);
- self.bytes[off + 4..end].copy_from_slice(record);
+ assert_eq!(end - off, record.len());
+ self.bytes[off..end].copy_from_slice(record);
let u32_slice = self.as_u32_slice_mut();
u32_slice[INDEX_NUM_RECORDS] += 1;
u32_slice[INDEX_HEAD] = end as u32;
@@ -192,30 +189,30 @@ mod tests {
assert!(h > 0);
let r = vec![1u8, 2, 3, 4, 5].into_boxed_slice();
- let len = r.len() + h + 4;
- assert!(q.push(1, &r));
+ let len = r.len() + h;
+ assert!(q.push(&r));
assert_eq!(q.head(), len);
let r = vec![6, 7].into_boxed_slice();
- assert!(q.push(1, &r));
+ assert!(q.push(&r));
let r = vec![8, 9, 10, 11].into_boxed_slice();
- assert!(q.push(1, &r));
+ assert!(q.push(&r));
assert_eq!(q.num_records(), 3);
assert_eq!(q.size(), 3);
let r = q.shift().unwrap();
- assert_eq!(&r[4..], vec![1, 2, 3, 4, 5].as_slice());
+ assert_eq!(r, vec![1, 2, 3, 4, 5].as_slice());
assert_eq!(q.num_records(), 3);
assert_eq!(q.size(), 2);
let r = q.shift().unwrap();
- assert_eq!(&r[4..], vec![6, 7].as_slice());
+ assert_eq!(r, vec![6, 7].as_slice());
assert_eq!(q.num_records(), 3);
assert_eq!(q.size(), 1);
let r = q.shift().unwrap();
- assert_eq!(&r[4..], vec![8, 9, 10, 11].as_slice());
+ assert_eq!(r, vec![8, 9, 10, 11].as_slice());
assert_eq!(q.num_records(), 0);
assert_eq!(q.size(), 0);
@@ -235,19 +232,19 @@ mod tests {
#[test]
fn overflow() {
let mut q = SharedQueue::new(RECOMMENDED_SIZE);
- assert!(q.push(1, &alloc_buf(RECOMMENDED_SIZE - 1 - (4 * 2))));
+ assert!(q.push(&alloc_buf(RECOMMENDED_SIZE - 1)));
assert_eq!(q.size(), 1);
- assert!(!q.push(1, &alloc_buf(2)));
+ assert!(!q.push(&alloc_buf(2)));
assert_eq!(q.size(), 1);
- assert!(q.push(1, &alloc_buf(1)));
+ assert!(q.push(&alloc_buf(1)));
assert_eq!(q.size(), 2);
- assert_eq!(q.shift().unwrap().len(), RECOMMENDED_SIZE - 1 - 4);
+ assert_eq!(q.shift().unwrap().len(), RECOMMENDED_SIZE - 1);
assert_eq!(q.size(), 1);
- assert!(!q.push(1, &alloc_buf(1)));
+ assert!(!q.push(&alloc_buf(1)));
- assert_eq!(q.shift().unwrap().len(), 1 + 4);
+ assert_eq!(q.shift().unwrap().len(), 1);
assert_eq!(q.size(), 0);
}
@@ -255,11 +252,11 @@ mod tests {
fn full_records() {
let mut q = SharedQueue::new(RECOMMENDED_SIZE);
for _ in 0..MAX_RECORDS {
- assert!(q.push(1, &alloc_buf(1)))
+ assert!(q.push(&alloc_buf(1)))
}
- assert_eq!(q.push(1, &alloc_buf(1)), false);
+ assert_eq!(q.push(&alloc_buf(1)), false);
// Even if we shift one off, we still cannot push a new record.
- assert_eq!(q.shift().unwrap().len(), 1 + 4);
- assert_eq!(q.push(1, &alloc_buf(1)), false);
+ assert_eq!(q.shift().unwrap().len(), 1);
+ assert_eq!(q.push(&alloc_buf(1)), false);
}
}
diff --git a/js/dispatch.ts b/js/dispatch.ts
index 36f97363f..0c6e70709 100644
--- a/js/dispatch.ts
+++ b/js/dispatch.ts
@@ -5,30 +5,36 @@ import * as msg from "gen/cli/msg_generated";
import * as errors from "./errors";
import * as util from "./util";
import {
+ nextPromiseId,
recordFromBufMinimal,
handleAsyncMsgFromRustMinimal
} from "./dispatch_minimal";
const promiseTable = new Map<number, util.Resolvable<msg.Base>>();
-function flatbufferRecordFromBuf(buf: Uint8Array): msg.Base {
+interface FlatbufferRecord {
+ promiseId: number;
+ base: msg.Base;
+}
+
+function flatbufferRecordFromBuf(buf: Uint8Array): FlatbufferRecord {
const bb = new flatbuffers.ByteBuffer(buf);
const base = msg.Base.getRootAsBase(bb);
- return base;
+ return {
+ promiseId: base.cmdId(),
+ base
+ };
}
-export function handleAsyncMsgFromRust(
- promiseId: number,
- ui8: Uint8Array
-): void {
+export function handleAsyncMsgFromRust(ui8: Uint8Array): void {
const buf32 = new Int32Array(ui8.buffer, ui8.byteOffset, ui8.byteLength / 4);
const recordMin = recordFromBufMinimal(buf32);
if (recordMin) {
// Fast and new
- handleAsyncMsgFromRustMinimal(promiseId, ui8, recordMin);
+ handleAsyncMsgFromRustMinimal(ui8, recordMin);
} else {
// Legacy
- let base = flatbufferRecordFromBuf(ui8);
+ let { promiseId, base } = flatbufferRecordFromBuf(ui8);
const promise = promiseTable.get(promiseId);
util.assert(promise != null, `Expecting promise in table. ${promiseId}`);
promiseTable.delete(promiseId);
@@ -50,26 +56,14 @@ function sendInternal(
innerType: msg.Any,
inner: flatbuffers.Offset,
zeroCopy: undefined | ArrayBufferView,
- isSync: true
-): Uint8Array | null;
-function sendInternal(
- builder: flatbuffers.Builder,
- innerType: msg.Any,
- inner: flatbuffers.Offset,
- zeroCopy: undefined | ArrayBufferView,
- isSync: false
-): Promise<msg.Base>;
-function sendInternal(
- builder: flatbuffers.Builder,
- innerType: msg.Any,
- inner: flatbuffers.Offset,
- zeroCopy: undefined | ArrayBufferView,
- isSync: boolean
-): Promise<msg.Base> | Uint8Array | null {
+ sync = true
+): [number, null | Uint8Array] {
+ const cmdId = nextPromiseId();
msg.Base.startBase(builder);
- msg.Base.addSync(builder, isSync);
msg.Base.addInner(builder, inner);
msg.Base.addInnerType(builder, innerType);
+ msg.Base.addSync(builder, sync);
+ msg.Base.addCmdId(builder, cmdId);
builder.finish(msg.Base.endBase(builder));
const control = builder.asUint8Array();
@@ -80,25 +74,7 @@ function sendInternal(
);
builder.inUse = false;
-
- if (typeof response === "number") {
- const promise = util.createResolvable<msg.Base>();
- promiseTable.set(response, promise);
- util.assert(!isSync);
- return promise;
- } else {
- if (!isSync) {
- util.assert(response !== null);
- const base = flatbufferRecordFromBuf(response as Uint8Array);
- const err = errors.maybeError(base);
- if (err != null) {
- return Promise.reject(err);
- } else {
- return Promise.resolve(base);
- }
- }
- return response;
- }
+ return [cmdId, response];
}
// @internal
@@ -108,7 +84,16 @@ export function sendAsync(
inner: flatbuffers.Offset,
data?: ArrayBufferView
): Promise<msg.Base> {
- const promise = sendInternal(builder, innerType, inner, data, false);
+ const [cmdId, response] = sendInternal(
+ builder,
+ innerType,
+ inner,
+ data,
+ false
+ );
+ util.assert(response == null); // null indicates async.
+ const promise = util.createResolvable<msg.Base>();
+ promiseTable.set(cmdId, promise);
return promise;
}
@@ -119,8 +104,10 @@ export function sendSync(
inner: flatbuffers.Offset,
data?: ArrayBufferView
): null | msg.Base {
- const response = sendInternal(builder, innerType, inner, data, true);
- if (response == null || response.length === 0) {
+ const [cmdId, response] = sendInternal(builder, innerType, inner, data, true);
+ util.assert(cmdId >= 0);
+ util.assert(response != null); // null indicates async.
+ if (response!.length === 0) {
return null;
} else {
const bb = new flatbuffers.ByteBuffer(response!);
diff --git a/js/dispatch_minimal.ts b/js/dispatch_minimal.ts
index bf9065f56..17d328110 100644
--- a/js/dispatch_minimal.ts
+++ b/js/dispatch_minimal.ts
@@ -5,8 +5,14 @@ import { core } from "./core";
const DISPATCH_MINIMAL_TOKEN = 0xcafe;
const promiseTableMin = new Map<number, util.Resolvable<number>>();
+let _nextPromiseId = 0;
+
+export function nextPromiseId(): number {
+ return _nextPromiseId++;
+}
export interface RecordMinimal {
+ promiseId: number;
opId: number;
arg: number;
result: number;
@@ -22,9 +28,10 @@ export function hasMinimalToken(i32: Int32Array): boolean {
export function recordFromBufMinimal(buf32: Int32Array): null | RecordMinimal {
if (hasMinimalToken(buf32)) {
return {
- opId: buf32[1],
- arg: buf32[2],
- result: buf32[3]
+ promiseId: buf32[1],
+ opId: buf32[2],
+ arg: buf32[3],
+ result: buf32[4]
};
}
return null;
@@ -39,13 +46,12 @@ const scratchBytes = new Uint8Array(
util.assert(scratchBytes.byteLength === scratch32.length * 4);
export function handleAsyncMsgFromRustMinimal(
- promiseId: number,
ui8: Uint8Array,
record: RecordMinimal
): void {
// Fast and new
util.log("minimal handleAsyncMsgFromRust ", ui8.length);
- const { result } = record;
+ const { promiseId, result } = record;
const promise = promiseTableMin.get(promiseId);
promiseTableMin.delete(promiseId);
promise!.resolve(result);
@@ -56,16 +62,16 @@ export function sendAsyncMinimal(
arg: number,
zeroCopy: Uint8Array
): Promise<number> {
- scratch32[0] = DISPATCH_MINIMAL_TOKEN;
- scratch32[1] = opId;
- scratch32[2] = arg;
+ const promiseId = nextPromiseId(); // AKA cmdId
- const promiseId = core.dispatch(scratchBytes, zeroCopy);
-
- util.assert(typeof promiseId == "number");
+ scratch32[0] = DISPATCH_MINIMAL_TOKEN;
+ scratch32[1] = promiseId;
+ scratch32[2] = opId;
+ scratch32[3] = arg;
const promise = util.createResolvable<number>();
- promiseTableMin.set(promiseId as number, promise);
+ promiseTableMin.set(promiseId, promise);
+ core.dispatch(scratchBytes, zeroCopy);
return promise;
}