summaryrefslogtreecommitdiff
path: root/cli/state.rs
diff options
context:
space:
mode:
Diffstat (limited to 'cli/state.rs')
-rw-r--r--cli/state.rs24
1 files changed, 13 insertions, 11 deletions
diff --git a/cli/state.rs b/cli/state.rs
index a5e9546b0..245919e7f 100644
--- a/cli/state.rs
+++ b/cli/state.rs
@@ -17,13 +17,16 @@ use deno::ModuleSpecifier;
use deno::Op;
use deno::PinnedBuf;
use deno::ResourceTable;
-use futures::Future;
+use futures::channel::mpsc;
+use futures::future::FutureExt;
+use futures::future::TryFutureExt;
use rand::rngs::StdRng;
use rand::SeedableRng;
use serde_json::Value;
use std;
use std::collections::HashMap;
use std::ops::Deref;
+use std::pin::Pin;
use std::str;
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering;
@@ -31,7 +34,6 @@ use std::sync::Arc;
use std::sync::Mutex;
use std::sync::MutexGuard;
use std::time::Instant;
-use tokio::sync::mpsc;
/// Isolate cannot be passed between threads but ThreadSafeState can.
/// ThreadSafeState satisfies Send and Sync. So any state that needs to be
@@ -101,11 +103,11 @@ impl ThreadSafeState {
}
Op::Async(fut) => {
let state = state.clone();
- let result_fut = Box::new(fut.map(move |buf: Buf| {
+ let result_fut = fut.map_ok(move |buf: Buf| {
state.clone().metrics_op_completed(buf.len());
buf
- }));
- Op::Async(result_fut)
+ });
+ Op::Async(result_fut.boxed())
}
}
}
@@ -115,13 +117,13 @@ impl ThreadSafeState {
pub fn stateful_minimal_op<D>(
&self,
dispatcher: D,
- ) -> impl Fn(i32, Option<PinnedBuf>) -> Box<MinimalOp>
+ ) -> impl Fn(i32, Option<PinnedBuf>) -> Pin<Box<MinimalOp>>
where
- D: Fn(&ThreadSafeState, i32, Option<PinnedBuf>) -> Box<MinimalOp>,
+ D: Fn(&ThreadSafeState, i32, Option<PinnedBuf>) -> Pin<Box<MinimalOp>>,
{
let state = self.clone();
- move |rid: i32, zero_copy: Option<PinnedBuf>| -> Box<MinimalOp> {
+ move |rid: i32, zero_copy: Option<PinnedBuf>| -> Pin<Box<MinimalOp>> {
dispatcher(&state, rid, zero_copy)
}
}
@@ -176,13 +178,13 @@ impl Loader for ThreadSafeState {
fn load(
&self,
module_specifier: &ModuleSpecifier,
- ) -> Box<deno::SourceCodeInfoFuture> {
+ ) -> Pin<Box<deno::SourceCodeInfoFuture>> {
self.metrics.resolve_count.fetch_add(1, Ordering::SeqCst);
let module_url_specified = module_specifier.to_string();
let fut = self
.global_state
.fetch_compiled_module(module_specifier)
- .map(|compiled_module| deno::SourceCodeInfo {
+ .map_ok(|compiled_module| deno::SourceCodeInfo {
// Real module name, might be different from initial specifier
// due to redirections.
code: compiled_module.code,
@@ -190,7 +192,7 @@ impl Loader for ThreadSafeState {
module_url_found: compiled_module.name,
});
- Box::new(fut)
+ fut.boxed()
}
}