From 9de8178c9b9bc21cc880ea81f163118408cc6c46 Mon Sep 17 00:00:00 2001 From: Yoshiya Hinosawa Date: Wed, 22 Jan 2020 02:01:10 +0900 Subject: feat: add AsyncUnref ops (#3721) This is in order to support features like signal handlers, which shouldn't prevent the program from exiting. --- cli/ops/dispatch_json.rs | 10 ++++++++++ cli/state.rs | 8 ++++++++ 2 files changed, 18 insertions(+) (limited to 'cli') diff --git a/cli/ops/dispatch_json.rs b/cli/ops/dispatch_json.rs index de1f0a752..87cfff51d 100644 --- a/cli/ops/dispatch_json.rs +++ b/cli/ops/dispatch_json.rs @@ -14,6 +14,9 @@ pub type AsyncJsonOp = pub enum JsonOp { Sync(Value), Async(AsyncJsonOp), + /// AsyncUnref is the variation of Async, which doesn't block the program + /// exiting. + AsyncUnref(AsyncJsonOp), } fn json_err(err: ErrBox) -> Value { @@ -77,6 +80,13 @@ where }); CoreOp::Async(fut2.boxed()) } + Ok(JsonOp::AsyncUnref(fut)) => { + assert!(promise_id.is_some()); + let fut2 = fut.then(move |result| { + futures::future::ok(serialize_result(promise_id, result)) + }); + CoreOp::AsyncUnref(fut2.boxed()) + } Err(sync_err) => { let buf = serialize_result(promise_id, Err(sync_err)); if is_sync { diff --git a/cli/state.rs b/cli/state.rs index 02c258280..c07b3d5d0 100644 --- a/cli/state.rs +++ b/cli/state.rs @@ -110,6 +110,14 @@ impl ThreadSafeState { }); Op::Async(result_fut.boxed()) } + Op::AsyncUnref(fut) => { + let state = state.clone(); + let result_fut = fut.map_ok(move |buf: Buf| { + state.metrics_op_completed(buf.len()); + buf + }); + Op::AsyncUnref(result_fut.boxed()) + } } } } -- cgit v1.2.3