summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAaron O'Mullan <aaron.omullan@gmail.com>2021-04-11 07:05:43 +0200
committerGitHub <noreply@github.com>2021-04-11 07:05:43 +0200
commit29eca72fea4f1e160a8d76d2ebda26e2c48b9658 (patch)
treebe8db9d2253a1f64bda23a01b035932b6dc5d34b
parent8aa0d5f96ed418e21efb99967f1f6b7fea0dc87f (diff)
core: avoid async op future reboxing to bundle PromiseId (#10123)
-rw-r--r--core/bindings.rs9
-rw-r--r--core/ops.rs6
-rw-r--r--core/ops_bin.rs6
-rw-r--r--core/ops_json.rs3
-rw-r--r--core/runtime.rs8
-rw-r--r--test_plugin/src/lib.rs2
6 files changed, 18 insertions, 16 deletions
diff --git a/core/bindings.rs b/core/bindings.rs
index f086e1f9c..eaddcbfbc 100644
--- a/core/bindings.rs
+++ b/core/bindings.rs
@@ -9,7 +9,6 @@ use crate::OpResponse;
use crate::OpTable;
use crate::PromiseId;
use crate::ZeroCopyBuf;
-use futures::future::FutureExt;
use rusty_v8 as v8;
use serde::Serialize;
use serde_v8::to_v8;
@@ -433,7 +432,7 @@ fn send<'s>(
}
};
- let payload = OpPayload::new(scope, v);
+ let payload = OpPayload::new(scope, v, promise_id);
let op = OpTable::route_op(op_id, state.op_state.clone(), payload, buf);
match op {
Op::Sync(resp) => match resp {
@@ -445,13 +444,11 @@ fn send<'s>(
}
},
Op::Async(fut) => {
- let fut2 = fut.map(move |resp| (promise_id, resp));
- state.pending_ops.push(fut2.boxed_local());
+ state.pending_ops.push(fut);
state.have_unpolled_ops = true;
}
Op::AsyncUnref(fut) => {
- let fut2 = fut.map(move |resp| (promise_id, resp));
- state.pending_unref_ops.push(fut2.boxed_local());
+ state.pending_unref_ops.push(fut);
state.have_unpolled_ops = true;
}
Op::NotFound => {
diff --git a/core/ops.rs b/core/ops.rs
index caf984e00..53aec4ae4 100644
--- a/core/ops.rs
+++ b/core/ops.rs
@@ -21,7 +21,7 @@ use std::pin::Pin;
use std::rc::Rc;
pub type PromiseId = u64;
-pub type OpAsyncFuture = Pin<Box<dyn Future<Output = OpResponse>>>;
+pub type OpAsyncFuture = Pin<Box<dyn Future<Output = (PromiseId, OpResponse)>>>;
pub type OpFn =
dyn Fn(Rc<RefCell<OpState>>, OpPayload, Option<ZeroCopyBuf>) -> Op + 'static;
pub type OpId = usize;
@@ -29,16 +29,19 @@ pub type OpId = usize;
pub struct OpPayload<'a, 'b, 'c> {
pub(crate) scope: Option<&'a mut v8::HandleScope<'b>>,
pub(crate) value: Option<v8::Local<'c, v8::Value>>,
+ pub(crate) promise_id: PromiseId,
}
impl<'a, 'b, 'c> OpPayload<'a, 'b, 'c> {
pub fn new(
scope: &'a mut v8::HandleScope<'b>,
value: v8::Local<'c, v8::Value>,
+ promise_id: PromiseId,
) -> Self {
Self {
scope: Some(scope),
value: Some(value),
+ promise_id,
}
}
@@ -46,6 +49,7 @@ impl<'a, 'b, 'c> OpPayload<'a, 'b, 'c> {
Self {
scope: None,
value: None,
+ promise_id: 0,
}
}
diff --git a/core/ops_bin.rs b/core/ops_bin.rs
index b19782449..c4c57f4b9 100644
--- a/core/ops_bin.rs
+++ b/core/ops_bin.rs
@@ -132,11 +132,11 @@ where
p: OpPayload,
b: Option<ZeroCopyBuf>|
-> Op {
+ let pid = p.promise_id;
let min_arg: u32 = p.deserialize().unwrap();
let fut = op_fn(state.clone(), min_arg, b)
- .map(move |result| serialize_bin_result(result, state));
- let temp = Box::pin(fut);
- Op::Async(temp)
+ .map(move |result| (pid, serialize_bin_result(result, state)));
+ Op::Async(Box::pin(fut))
},
)
}
diff --git a/core/ops_json.rs b/core/ops_json.rs
index 0efd44a90..3e2b532d0 100644
--- a/core/ops_json.rs
+++ b/core/ops_json.rs
@@ -85,12 +85,13 @@ where
p: OpPayload,
buf: Option<ZeroCopyBuf>|
-> Result<Op, AnyError> {
+ let pid = p.promise_id;
// Parse args
let args = p.deserialize()?;
use crate::futures::FutureExt;
let fut = op_fn(state.clone(), args, buf)
- .map(move |result| serialize_op_result(result, state));
+ .map(move |result| (pid, serialize_op_result(result, state)));
Ok(Op::Async(Box::pin(fut)))
};
diff --git a/core/runtime.rs b/core/runtime.rs
index 04c6ca1af..28f015fda 100644
--- a/core/runtime.rs
+++ b/core/runtime.rs
@@ -1506,7 +1506,7 @@ pub mod tests {
Mode::Async => {
let control: u8 = payload.deserialize().unwrap();
assert_eq!(control, 42);
- let resp = OpResponse::Value(Box::new(43));
+ let resp = (0, OpResponse::Value(Box::new(43)));
Op::Async(Box::pin(futures::future::ready(resp)))
}
Mode::AsyncUnref => {
@@ -1515,7 +1515,7 @@ pub mod tests {
let fut = async {
// This future never finish.
futures::future::pending::<()>().await;
- OpResponse::Value(Box::new(43))
+ (0, OpResponse::Value(Box::new(43)))
};
Op::AsyncUnref(Box::pin(fut))
}
@@ -1526,7 +1526,7 @@ pub mod tests {
}
let resp = OpResponse::Value(Box::new(43));
- Op::Async(Box::pin(futures::future::ready(resp)))
+ Op::Async(Box::pin(futures::future::ready((0, resp))))
}
}
}
@@ -1970,7 +1970,7 @@ pub mod tests {
dispatch_count_.fetch_add(1, Ordering::Relaxed);
let control: u8 = payload.deserialize().unwrap();
assert_eq!(control, 42);
- let resp = OpResponse::Value(Box::new(43));
+ let resp = (0, OpResponse::Value(Box::new(43)));
Op::Async(Box::pin(futures::future::ready(resp)))
};
diff --git a/test_plugin/src/lib.rs b/test_plugin/src/lib.rs
index b84dcef48..51ed6d499 100644
--- a/test_plugin/src/lib.rs
+++ b/test_plugin/src/lib.rs
@@ -48,7 +48,7 @@ fn op_test_async(
assert!(rx.await.is_ok());
let result = b"test";
let result_box: Box<[u8]> = Box::new(*result);
- OpResponse::Buffer(result_box)
+ (0, OpResponse::Buffer(result_box))
};
Op::Async(fut.boxed())