diff options
Diffstat (limited to 'cli/ops/dispatch_minimal.rs')
-rw-r--r-- | cli/ops/dispatch_minimal.rs | 50 |
1 files changed, 31 insertions, 19 deletions
diff --git a/cli/ops/dispatch_minimal.rs b/cli/ops/dispatch_minimal.rs index 2dd4db9ef..299462ca0 100644 --- a/cli/ops/dispatch_minimal.rs +++ b/cli/ops/dispatch_minimal.rs @@ -14,7 +14,10 @@ use futures::future::FutureExt; use std::future::Future; use std::pin::Pin; -pub type MinimalOp = dyn Future<Output = Result<i32, OpError>>; +pub enum MinimalOp { + Sync(Result<i32, OpError>), + Async(Pin<Box<dyn Future<Output = Result<i32, OpError>>>>), +} #[derive(Copy, Clone, Debug, PartialEq)] // This corresponds to RecordMinimal on the TS side. @@ -113,7 +116,7 @@ fn test_parse_min_record() { pub fn minimal_op<D>(d: D) -> impl Fn(&[u8], Option<ZeroCopyBuf>) -> CoreOp where - D: Fn(i32, Option<ZeroCopyBuf>) -> Pin<Box<MinimalOp>>, + D: Fn(bool, i32, Option<ZeroCopyBuf>) -> MinimalOp, { move |control: &[u8], zero_copy: Option<ZeroCopyBuf>| { let mut record = match parse_min_record(control) { @@ -131,14 +134,13 @@ where }; let is_sync = record.promise_id == 0; let rid = record.arg; - let min_op = d(rid, zero_copy); + let min_op = d(is_sync, rid, zero_copy); - // Convert to CoreOp - let fut = async move { - match min_op.await { + match min_op { + MinimalOp::Sync(sync_result) => Op::Sync(match sync_result { Ok(r) => { record.result = r; - Ok(record.into()) + record.into() } Err(err) => { let error_record = ErrorRecord { @@ -147,20 +149,30 @@ where error_code: err.kind as i32, error_message: err.msg.as_bytes().to_owned(), }; - Ok(error_record.into()) + error_record.into() } + }), + MinimalOp::Async(min_fut) => { + // Convert to CoreOp + let core_fut = async move { + match min_fut.await { + Ok(r) => { + record.result = r; + Ok(record.into()) + } + Err(err) => { + let error_record = ErrorRecord { + promise_id: record.promise_id, + arg: -1, + error_code: err.kind as i32, + error_message: err.msg.as_bytes().to_owned(), + }; + Ok(error_record.into()) + } + } + }; + Op::Async(core_fut.boxed_local()) } - }; - - if is_sync { - // Warning! Possible deadlocks can occur if we try to wait for a future - // while in a future. The safe but expensive alternative is to use - // tokio_util::block_on. - // This block is only exercised for readSync and writeSync, which I think - // works since they're simple polling futures. - Op::Sync(futures::executor::block_on(fut).unwrap()) - } else { - Op::Async(fut.boxed_local()) } } } |