diff options
Diffstat (limited to 'cli/ops')
-rw-r--r-- | cli/ops/compiler.rs | 75 | ||||
-rw-r--r-- | cli/ops/dispatch_minimal.rs | 30 | ||||
-rw-r--r-- | cli/ops/net.rs | 45 | ||||
-rw-r--r-- | cli/ops/process.rs | 7 | ||||
-rw-r--r-- | cli/ops/workers.rs | 24 |
5 files changed, 77 insertions, 104 deletions
diff --git a/cli/ops/compiler.rs b/cli/ops/compiler.rs index 8bc42a92a..90df45b80 100644 --- a/cli/ops/compiler.rs +++ b/cli/ops/compiler.rs @@ -1,8 +1,6 @@ // Copyright 2018-2020 the Deno authors. All rights reserved. MIT license. use super::dispatch_json::{Deserialize, JsonOp, Value}; use crate::futures::future::try_join_all; -use crate::futures::future::FutureExt; -use crate::futures::future::TryFutureExt; use crate::msg; use crate::ops::json_op; use crate::state::ThreadSafeState; @@ -87,52 +85,39 @@ fn op_fetch_source_files( let global_state = state.global_state.clone(); - let future = try_join_all(futures) - .map_err(ErrBox::from) - .and_then(move |files| { - // We want to get an array of futures that resolves to - let v: Vec<_> = files - .into_iter() - .map(|file| { - // Special handling of Wasm files: - // compile them into JS first! - // This allows TS to do correct export types. - if file.media_type == msg::MediaType::Wasm { - return futures::future::Either::Left( - global_state - .wasm_compiler - .compile_async(global_state.clone(), &file) - .and_then(|compiled_mod| { - futures::future::ok((file, Some(compiled_mod.code))) - }), - ); + let future = Box::pin(async move { + let files = try_join_all(futures).await?; + + // We want to get an array of futures that resolves to + let v = files.into_iter().map(|file| { + async { + // Special handling of Wasm files: + // compile them into JS first! + // This allows TS to do correct export types. + let source_code = match file.media_type { + msg::MediaType::Wasm => { + global_state + .wasm_compiler + .compile_async(global_state.clone(), &file) + .await? + .code } - futures::future::Either::Right(futures::future::ok((file, None))) - }) - .collect(); - try_join_all(v) - }) - .and_then(move |files_with_code| { - let res = files_with_code - .into_iter() - .map(|(file, maybe_code)| { - json!({ - "url": file.url.to_string(), - "filename": file.filename.to_str().unwrap(), - "mediaType": file.media_type as i32, - "sourceCode": if let Some(code) = maybe_code { - code - } else { - String::from_utf8(file.source_code).unwrap() - }, - }) - }) - .collect(); - - futures::future::ok(res) + _ => String::from_utf8(file.source_code).unwrap(), + }; + Ok::<_, ErrBox>(json!({ + "url": file.url.to_string(), + "filename": file.filename.to_str().unwrap(), + "mediaType": file.media_type as i32, + "sourceCode": source_code, + })) + } }); - Ok(JsonOp::Async(future.boxed())) + let v = try_join_all(v).await?; + Ok(v.into()) + }); + + Ok(JsonOp::Async(future)) } #[derive(Deserialize)] diff --git a/cli/ops/dispatch_minimal.rs b/cli/ops/dispatch_minimal.rs index 22d6697e5..54bf52679 100644 --- a/cli/ops/dispatch_minimal.rs +++ b/cli/ops/dispatch_minimal.rs @@ -138,21 +138,23 @@ where let min_op = d(rid, zero_copy); // Convert to CoreOp - let fut = Box::new(min_op.then(move |result| match result { - Ok(r) => { - record.result = r; - futures::future::ok(record.into()) + let fut = async move { + match min_op.await { + Ok(r) => { + record.result = r; + Ok(record.into()) + } + Err(err) => { + let error_record = ErrorRecord { + promise_id: record.promise_id, + arg: -1, + error_code: err.kind() as i32, + error_message: err.to_string().as_bytes().to_owned(), + }; + Ok(error_record.into()) + } } - Err(err) => { - let error_record = ErrorRecord { - promise_id: record.promise_id, - arg: -1, - error_code: err.kind() as i32, - error_message: err.to_string().as_bytes().to_owned(), - }; - futures::future::ok(error_record.into()) - } - })); + }; if is_sync { // Warning! Possible deadlocks can occur if we try to wait for a future diff --git a/cli/ops/net.rs b/cli/ops/net.rs index 01fb65b1f..bf1e56dc9 100644 --- a/cli/ops/net.rs +++ b/cli/ops/net.rs @@ -8,7 +8,6 @@ use crate::state::ThreadSafeState; use deno::Resource; use deno::*; use futures::future::FutureExt; -use futures::future::TryFutureExt; use std; use std::convert::From; use std::future::Future; @@ -39,18 +38,18 @@ pub fn accept(state: &ThreadSafeState, rid: ResourceId) -> Accept { Accept { accept_state: AcceptState::Pending, rid, - state: state.clone(), + state, } } /// A future representing state of accepting a TCP connection. -pub struct Accept { +pub struct Accept<'a> { accept_state: AcceptState, rid: ResourceId, - state: ThreadSafeState, + state: &'a ThreadSafeState, } -impl Future for Accept { +impl Future for Accept<'_> { type Output = Result<(TcpStream, SocketAddr), ErrBox>; fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> { @@ -109,29 +108,19 @@ fn op_accept( .get::<TcpListenerResource>(rid) .ok_or_else(bad_resource)?; - let op = accept(state, rid) - .and_then(move |(tcp_stream, _socket_addr)| { - let local_addr = match tcp_stream.local_addr() { - Ok(v) => v, - Err(e) => return futures::future::err(ErrBox::from(e)), - }; - let remote_addr = match tcp_stream.peer_addr() { - Ok(v) => v, - Err(e) => return futures::future::err(ErrBox::from(e)), - }; - let mut table = state_.lock_resource_table(); - let rid = - table.add("tcpStream", Box::new(StreamResource::TcpStream(tcp_stream))); - futures::future::ok((rid, local_addr, remote_addr)) - }) - .map_err(ErrBox::from) - .and_then(move |(rid, local_addr, remote_addr)| { - futures::future::ok(json!({ - "rid": rid, - "localAddr": local_addr.to_string(), - "remoteAddr": remote_addr.to_string(), - })) - }); + let op = async move { + let (tcp_stream, _socket_addr) = accept(&state_, rid).await?; + let local_addr = tcp_stream.local_addr()?; + let remote_addr = tcp_stream.peer_addr()?; + let mut table = state_.lock_resource_table(); + let rid = + table.add("tcpStream", Box::new(StreamResource::TcpStream(tcp_stream))); + Ok(json!({ + "rid": rid, + "localAddr": local_addr.to_string(), + "remoteAddr": remote_addr.to_string(), + })) + }; Ok(JsonOp::Async(op.boxed())) } diff --git a/cli/ops/process.rs b/cli/ops/process.rs index 92474cc4f..80ce4b1ac 100644 --- a/cli/ops/process.rs +++ b/cli/ops/process.rs @@ -220,7 +220,8 @@ fn op_run_status( state: state.clone(), }; - let future = future.and_then(move |run_status| { + let future = async move { + let run_status = future.await?; let code = run_status.code(); #[cfg(unix)] @@ -233,12 +234,12 @@ fn op_run_status( .expect("Should have either an exit code or a signal."); let got_signal = signal.is_some(); - futures::future::ok(json!({ + Ok(json!({ "gotSignal": got_signal, "exitCode": code.unwrap_or(-1), "exitSignal": signal.unwrap_or(-1), })) - }); + }; let pool = futures::executor::ThreadPool::new().unwrap(); let handle = pool.spawn_with_handle(future).unwrap(); diff --git a/cli/ops/workers.rs b/cli/ops/workers.rs index 48b8deb15..00043ce77 100644 --- a/cli/ops/workers.rs +++ b/cli/ops/workers.rs @@ -77,13 +77,11 @@ fn op_worker_get_message( state: state.clone(), }; - let op = op.then(move |maybe_buf| { + let op = async move { + let maybe_buf = op.await; debug!("op_worker_get_message"); - - futures::future::ok(json!({ - "data": maybe_buf.map(|buf| buf) - })) - }); + Ok(json!({ "data": maybe_buf })) + }; Ok(JsonOp::Async(op.boxed())) } @@ -255,14 +253,12 @@ fn op_host_get_message( let mut table = state.workers.lock().unwrap(); // TODO: don't return bad resource anymore let worker = table.get_mut(&id).ok_or_else(bad_resource)?; - let op = worker - .get_message() - .map_err(move |_| -> ErrBox { unimplemented!() }) - .and_then(move |maybe_buf| { - futures::future::ok(json!({ - "data": maybe_buf.map(|buf| buf) - })) - }); + let fut = worker.get_message(); + + let op = async move { + let maybe_buf = fut.await.unwrap(); + Ok(json!({ "data": maybe_buf })) + }; Ok(JsonOp::Async(op.boxed())) } |