summaryrefslogtreecommitdiff
path: root/cli/ops
diff options
context:
space:
mode:
Diffstat (limited to 'cli/ops')
-rw-r--r--cli/ops/compiler.rs75
-rw-r--r--cli/ops/dispatch_minimal.rs30
-rw-r--r--cli/ops/net.rs45
-rw-r--r--cli/ops/process.rs7
-rw-r--r--cli/ops/workers.rs24
5 files changed, 77 insertions, 104 deletions
diff --git a/cli/ops/compiler.rs b/cli/ops/compiler.rs
index 8bc42a92a..90df45b80 100644
--- a/cli/ops/compiler.rs
+++ b/cli/ops/compiler.rs
@@ -1,8 +1,6 @@
// Copyright 2018-2020 the Deno authors. All rights reserved. MIT license.
use super::dispatch_json::{Deserialize, JsonOp, Value};
use crate::futures::future::try_join_all;
-use crate::futures::future::FutureExt;
-use crate::futures::future::TryFutureExt;
use crate::msg;
use crate::ops::json_op;
use crate::state::ThreadSafeState;
@@ -87,52 +85,39 @@ fn op_fetch_source_files(
let global_state = state.global_state.clone();
- let future = try_join_all(futures)
- .map_err(ErrBox::from)
- .and_then(move |files| {
- // We want to get an array of futures that resolves to
- let v: Vec<_> = files
- .into_iter()
- .map(|file| {
- // Special handling of Wasm files:
- // compile them into JS first!
- // This allows TS to do correct export types.
- if file.media_type == msg::MediaType::Wasm {
- return futures::future::Either::Left(
- global_state
- .wasm_compiler
- .compile_async(global_state.clone(), &file)
- .and_then(|compiled_mod| {
- futures::future::ok((file, Some(compiled_mod.code)))
- }),
- );
+ let future = Box::pin(async move {
+ let files = try_join_all(futures).await?;
+
+ // We want to get an array of futures that resolves to
+ let v = files.into_iter().map(|file| {
+ async {
+ // Special handling of Wasm files:
+ // compile them into JS first!
+ // This allows TS to do correct export types.
+ let source_code = match file.media_type {
+ msg::MediaType::Wasm => {
+ global_state
+ .wasm_compiler
+ .compile_async(global_state.clone(), &file)
+ .await?
+ .code
}
- futures::future::Either::Right(futures::future::ok((file, None)))
- })
- .collect();
- try_join_all(v)
- })
- .and_then(move |files_with_code| {
- let res = files_with_code
- .into_iter()
- .map(|(file, maybe_code)| {
- json!({
- "url": file.url.to_string(),
- "filename": file.filename.to_str().unwrap(),
- "mediaType": file.media_type as i32,
- "sourceCode": if let Some(code) = maybe_code {
- code
- } else {
- String::from_utf8(file.source_code).unwrap()
- },
- })
- })
- .collect();
-
- futures::future::ok(res)
+ _ => String::from_utf8(file.source_code).unwrap(),
+ };
+ Ok::<_, ErrBox>(json!({
+ "url": file.url.to_string(),
+ "filename": file.filename.to_str().unwrap(),
+ "mediaType": file.media_type as i32,
+ "sourceCode": source_code,
+ }))
+ }
});
- Ok(JsonOp::Async(future.boxed()))
+ let v = try_join_all(v).await?;
+ Ok(v.into())
+ });
+
+ Ok(JsonOp::Async(future))
}
#[derive(Deserialize)]
diff --git a/cli/ops/dispatch_minimal.rs b/cli/ops/dispatch_minimal.rs
index 22d6697e5..54bf52679 100644
--- a/cli/ops/dispatch_minimal.rs
+++ b/cli/ops/dispatch_minimal.rs
@@ -138,21 +138,23 @@ where
let min_op = d(rid, zero_copy);
// Convert to CoreOp
- let fut = Box::new(min_op.then(move |result| match result {
- Ok(r) => {
- record.result = r;
- futures::future::ok(record.into())
+ let fut = async move {
+ match min_op.await {
+ Ok(r) => {
+ record.result = r;
+ Ok(record.into())
+ }
+ Err(err) => {
+ let error_record = ErrorRecord {
+ promise_id: record.promise_id,
+ arg: -1,
+ error_code: err.kind() as i32,
+ error_message: err.to_string().as_bytes().to_owned(),
+ };
+ Ok(error_record.into())
+ }
}
- Err(err) => {
- let error_record = ErrorRecord {
- promise_id: record.promise_id,
- arg: -1,
- error_code: err.kind() as i32,
- error_message: err.to_string().as_bytes().to_owned(),
- };
- futures::future::ok(error_record.into())
- }
- }));
+ };
if is_sync {
// Warning! Possible deadlocks can occur if we try to wait for a future
diff --git a/cli/ops/net.rs b/cli/ops/net.rs
index 01fb65b1f..bf1e56dc9 100644
--- a/cli/ops/net.rs
+++ b/cli/ops/net.rs
@@ -8,7 +8,6 @@ use crate::state::ThreadSafeState;
use deno::Resource;
use deno::*;
use futures::future::FutureExt;
-use futures::future::TryFutureExt;
use std;
use std::convert::From;
use std::future::Future;
@@ -39,18 +38,18 @@ pub fn accept(state: &ThreadSafeState, rid: ResourceId) -> Accept {
Accept {
accept_state: AcceptState::Pending,
rid,
- state: state.clone(),
+ state,
}
}
/// A future representing state of accepting a TCP connection.
-pub struct Accept {
+pub struct Accept<'a> {
accept_state: AcceptState,
rid: ResourceId,
- state: ThreadSafeState,
+ state: &'a ThreadSafeState,
}
-impl Future for Accept {
+impl Future for Accept<'_> {
type Output = Result<(TcpStream, SocketAddr), ErrBox>;
fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
@@ -109,29 +108,19 @@ fn op_accept(
.get::<TcpListenerResource>(rid)
.ok_or_else(bad_resource)?;
- let op = accept(state, rid)
- .and_then(move |(tcp_stream, _socket_addr)| {
- let local_addr = match tcp_stream.local_addr() {
- Ok(v) => v,
- Err(e) => return futures::future::err(ErrBox::from(e)),
- };
- let remote_addr = match tcp_stream.peer_addr() {
- Ok(v) => v,
- Err(e) => return futures::future::err(ErrBox::from(e)),
- };
- let mut table = state_.lock_resource_table();
- let rid =
- table.add("tcpStream", Box::new(StreamResource::TcpStream(tcp_stream)));
- futures::future::ok((rid, local_addr, remote_addr))
- })
- .map_err(ErrBox::from)
- .and_then(move |(rid, local_addr, remote_addr)| {
- futures::future::ok(json!({
- "rid": rid,
- "localAddr": local_addr.to_string(),
- "remoteAddr": remote_addr.to_string(),
- }))
- });
+ let op = async move {
+ let (tcp_stream, _socket_addr) = accept(&state_, rid).await?;
+ let local_addr = tcp_stream.local_addr()?;
+ let remote_addr = tcp_stream.peer_addr()?;
+ let mut table = state_.lock_resource_table();
+ let rid =
+ table.add("tcpStream", Box::new(StreamResource::TcpStream(tcp_stream)));
+ Ok(json!({
+ "rid": rid,
+ "localAddr": local_addr.to_string(),
+ "remoteAddr": remote_addr.to_string(),
+ }))
+ };
Ok(JsonOp::Async(op.boxed()))
}
diff --git a/cli/ops/process.rs b/cli/ops/process.rs
index 92474cc4f..80ce4b1ac 100644
--- a/cli/ops/process.rs
+++ b/cli/ops/process.rs
@@ -220,7 +220,8 @@ fn op_run_status(
state: state.clone(),
};
- let future = future.and_then(move |run_status| {
+ let future = async move {
+ let run_status = future.await?;
let code = run_status.code();
#[cfg(unix)]
@@ -233,12 +234,12 @@ fn op_run_status(
.expect("Should have either an exit code or a signal.");
let got_signal = signal.is_some();
- futures::future::ok(json!({
+ Ok(json!({
"gotSignal": got_signal,
"exitCode": code.unwrap_or(-1),
"exitSignal": signal.unwrap_or(-1),
}))
- });
+ };
let pool = futures::executor::ThreadPool::new().unwrap();
let handle = pool.spawn_with_handle(future).unwrap();
diff --git a/cli/ops/workers.rs b/cli/ops/workers.rs
index 48b8deb15..00043ce77 100644
--- a/cli/ops/workers.rs
+++ b/cli/ops/workers.rs
@@ -77,13 +77,11 @@ fn op_worker_get_message(
state: state.clone(),
};
- let op = op.then(move |maybe_buf| {
+ let op = async move {
+ let maybe_buf = op.await;
debug!("op_worker_get_message");
-
- futures::future::ok(json!({
- "data": maybe_buf.map(|buf| buf)
- }))
- });
+ Ok(json!({ "data": maybe_buf }))
+ };
Ok(JsonOp::Async(op.boxed()))
}
@@ -255,14 +253,12 @@ fn op_host_get_message(
let mut table = state.workers.lock().unwrap();
// TODO: don't return bad resource anymore
let worker = table.get_mut(&id).ok_or_else(bad_resource)?;
- let op = worker
- .get_message()
- .map_err(move |_| -> ErrBox { unimplemented!() })
- .and_then(move |maybe_buf| {
- futures::future::ok(json!({
- "data": maybe_buf.map(|buf| buf)
- }))
- });
+ let fut = worker.get_message();
+
+ let op = async move {
+ let maybe_buf = fut.await.unwrap();
+ Ok(json!({ "data": maybe_buf }))
+ };
Ok(JsonOp::Async(op.boxed()))
}