diff options
-rw-r--r-- | core/bindings.rs | 9 | ||||
-rw-r--r-- | core/ops.rs | 6 | ||||
-rw-r--r-- | core/ops_bin.rs | 6 | ||||
-rw-r--r-- | core/ops_json.rs | 3 | ||||
-rw-r--r-- | core/runtime.rs | 8 | ||||
-rw-r--r-- | test_plugin/src/lib.rs | 2 |
6 files changed, 18 insertions, 16 deletions
diff --git a/core/bindings.rs b/core/bindings.rs index f086e1f9c..eaddcbfbc 100644 --- a/core/bindings.rs +++ b/core/bindings.rs @@ -9,7 +9,6 @@ use crate::OpResponse; use crate::OpTable; use crate::PromiseId; use crate::ZeroCopyBuf; -use futures::future::FutureExt; use rusty_v8 as v8; use serde::Serialize; use serde_v8::to_v8; @@ -433,7 +432,7 @@ fn send<'s>( } }; - let payload = OpPayload::new(scope, v); + let payload = OpPayload::new(scope, v, promise_id); let op = OpTable::route_op(op_id, state.op_state.clone(), payload, buf); match op { Op::Sync(resp) => match resp { @@ -445,13 +444,11 @@ fn send<'s>( } }, Op::Async(fut) => { - let fut2 = fut.map(move |resp| (promise_id, resp)); - state.pending_ops.push(fut2.boxed_local()); + state.pending_ops.push(fut); state.have_unpolled_ops = true; } Op::AsyncUnref(fut) => { - let fut2 = fut.map(move |resp| (promise_id, resp)); - state.pending_unref_ops.push(fut2.boxed_local()); + state.pending_unref_ops.push(fut); state.have_unpolled_ops = true; } Op::NotFound => { diff --git a/core/ops.rs b/core/ops.rs index caf984e00..53aec4ae4 100644 --- a/core/ops.rs +++ b/core/ops.rs @@ -21,7 +21,7 @@ use std::pin::Pin; use std::rc::Rc; pub type PromiseId = u64; -pub type OpAsyncFuture = Pin<Box<dyn Future<Output = OpResponse>>>; +pub type OpAsyncFuture = Pin<Box<dyn Future<Output = (PromiseId, OpResponse)>>>; pub type OpFn = dyn Fn(Rc<RefCell<OpState>>, OpPayload, Option<ZeroCopyBuf>) -> Op + 'static; pub type OpId = usize; @@ -29,16 +29,19 @@ pub type OpId = usize; pub struct OpPayload<'a, 'b, 'c> { pub(crate) scope: Option<&'a mut v8::HandleScope<'b>>, pub(crate) value: Option<v8::Local<'c, v8::Value>>, + pub(crate) promise_id: PromiseId, } impl<'a, 'b, 'c> OpPayload<'a, 'b, 'c> { pub fn new( scope: &'a mut v8::HandleScope<'b>, value: v8::Local<'c, v8::Value>, + promise_id: PromiseId, ) -> Self { Self { scope: Some(scope), value: Some(value), + promise_id, } } @@ -46,6 +49,7 @@ impl<'a, 'b, 'c> OpPayload<'a, 'b, 'c> { Self { scope: None, value: None, + promise_id: 0, } } diff --git a/core/ops_bin.rs b/core/ops_bin.rs index b19782449..c4c57f4b9 100644 --- a/core/ops_bin.rs +++ b/core/ops_bin.rs @@ -132,11 +132,11 @@ where p: OpPayload, b: Option<ZeroCopyBuf>| -> Op { + let pid = p.promise_id; let min_arg: u32 = p.deserialize().unwrap(); let fut = op_fn(state.clone(), min_arg, b) - .map(move |result| serialize_bin_result(result, state)); - let temp = Box::pin(fut); - Op::Async(temp) + .map(move |result| (pid, serialize_bin_result(result, state))); + Op::Async(Box::pin(fut)) }, ) } diff --git a/core/ops_json.rs b/core/ops_json.rs index 0efd44a90..3e2b532d0 100644 --- a/core/ops_json.rs +++ b/core/ops_json.rs @@ -85,12 +85,13 @@ where p: OpPayload, buf: Option<ZeroCopyBuf>| -> Result<Op, AnyError> { + let pid = p.promise_id; // Parse args let args = p.deserialize()?; use crate::futures::FutureExt; let fut = op_fn(state.clone(), args, buf) - .map(move |result| serialize_op_result(result, state)); + .map(move |result| (pid, serialize_op_result(result, state))); Ok(Op::Async(Box::pin(fut))) }; diff --git a/core/runtime.rs b/core/runtime.rs index 04c6ca1af..28f015fda 100644 --- a/core/runtime.rs +++ b/core/runtime.rs @@ -1506,7 +1506,7 @@ pub mod tests { Mode::Async => { let control: u8 = payload.deserialize().unwrap(); assert_eq!(control, 42); - let resp = OpResponse::Value(Box::new(43)); + let resp = (0, OpResponse::Value(Box::new(43))); Op::Async(Box::pin(futures::future::ready(resp))) } Mode::AsyncUnref => { @@ -1515,7 +1515,7 @@ pub mod tests { let fut = async { // This future never finish. futures::future::pending::<()>().await; - OpResponse::Value(Box::new(43)) + (0, OpResponse::Value(Box::new(43))) }; Op::AsyncUnref(Box::pin(fut)) } @@ -1526,7 +1526,7 @@ pub mod tests { } let resp = OpResponse::Value(Box::new(43)); - Op::Async(Box::pin(futures::future::ready(resp))) + Op::Async(Box::pin(futures::future::ready((0, resp)))) } } } @@ -1970,7 +1970,7 @@ pub mod tests { dispatch_count_.fetch_add(1, Ordering::Relaxed); let control: u8 = payload.deserialize().unwrap(); assert_eq!(control, 42); - let resp = OpResponse::Value(Box::new(43)); + let resp = (0, OpResponse::Value(Box::new(43))); Op::Async(Box::pin(futures::future::ready(resp))) }; diff --git a/test_plugin/src/lib.rs b/test_plugin/src/lib.rs index b84dcef48..51ed6d499 100644 --- a/test_plugin/src/lib.rs +++ b/test_plugin/src/lib.rs @@ -48,7 +48,7 @@ fn op_test_async( assert!(rx.await.is_ok()); let result = b"test"; let result_box: Box<[u8]> = Box::new(*result); - OpResponse::Buffer(result_box) + (0, OpResponse::Buffer(result_box)) }; Op::Async(fut.boxed()) |