diff options
-rw-r--r-- | cli/ops.rs | 65 | ||||
-rw-r--r-- | cli/tokio_util.rs | 27 |
2 files changed, 58 insertions, 34 deletions
diff --git a/cli/ops.rs b/cli/ops.rs index eeb6a10c7..1f208bbe2 100644 --- a/cli/ops.rs +++ b/cli/ops.rs @@ -163,6 +163,14 @@ pub fn dispatch_all_legacy( ); if base.sync() { + // TODO(ry) This is not correct! If the sync op is not actually synchronous + // (like in the case of op_fetch_module_meta_data) this wait() will block + // a thread in the Tokio runtime. Depending on the size of the runtime's + // thread pool, this may result in a dead lock! + // + // The solution is that ops should return an Op directly. Op::Sync contains + // the result value, so if its returned directly from the OpCreator, we + // know it has actually be evaluated synchronously. Op::Sync(fut.wait().unwrap()) } else { Op::Async(fut) @@ -455,30 +463,39 @@ fn op_fetch_module_meta_data( let use_cache = !state.flags.reload; let no_fetch = state.flags.no_fetch; - Box::new(futures::future::result(|| -> OpResult { - let builder = &mut FlatBufferBuilder::new(); - // TODO(ry) Use fetch_module_meta_data_async. - let out = state - .dir - .fetch_module_meta_data(specifier, referrer, use_cache, no_fetch)?; - let data_off = builder.create_vector(out.source_code.as_slice()); - let msg_args = msg::FetchModuleMetaDataResArgs { - module_name: Some(builder.create_string(&out.module_name)), - filename: Some(builder.create_string(&out.filename)), - media_type: out.media_type, - data: Some(data_off), - }; - let inner = msg::FetchModuleMetaDataRes::create(builder, &msg_args); - Ok(serialize_response( - cmd_id, - builder, - msg::BaseArgs { - inner: Some(inner.as_union_value()), - inner_type: msg::Any::FetchModuleMetaDataRes, - ..Default::default() - }, - )) - }())) + let fut = state + .dir + .fetch_module_meta_data_async(specifier, referrer, use_cache, no_fetch) + .and_then(move |out| { + let builder = &mut FlatBufferBuilder::new(); + let data_off = builder.create_vector(out.source_code.as_slice()); + let msg_args = msg::FetchModuleMetaDataResArgs { + module_name: Some(builder.create_string(&out.module_name)), + filename: Some(builder.create_string(&out.filename)), + media_type: out.media_type, + data: Some(data_off), + }; + let inner = msg::FetchModuleMetaDataRes::create(builder, &msg_args); + Ok(serialize_response( + cmd_id, + builder, + msg::BaseArgs { + inner: Some(inner.as_union_value()), + inner_type: msg::Any::FetchModuleMetaDataRes, + ..Default::default() + }, + )) + }); + + // Unfortunately TypeScript's CompilerHost interface does not leave room for + // asynchronous source code fetching. This complicates things greatly and + // requires us to use tokio_util::block_on() below. + assert!(base.sync()); + + // WARNING: Here we use tokio_util::block_on() which starts a new Tokio + // runtime for executing the future. This is so we don't inadvernently run + // out of threads in the main runtime. + Box::new(futures::future::result(tokio_util::block_on(fut))) } /// Retrieve any relevant compiler configuration. diff --git a/cli/tokio_util.rs b/cli/tokio_util.rs index fa42846cb..9204b2ae1 100644 --- a/cli/tokio_util.rs +++ b/cli/tokio_util.rs @@ -11,14 +11,8 @@ use tokio::net::TcpStream; use tokio::runtime; pub fn create_threadpool_runtime() -> tokio::runtime::Runtime { - // This code can be simplified once the following PR is landed and - // released: https://github.com/tokio-rs/tokio/pull/1055 - use tokio_threadpool::Builder as ThreadPoolBuilder; - let mut threadpool_builder = ThreadPoolBuilder::new(); - threadpool_builder.panic_handler(|err| std::panic::resume_unwind(err)); - #[allow(deprecated)] runtime::Builder::new() - .threadpool_builder(threadpool_builder) + .panic_handler(|err| std::panic::resume_unwind(err)) .build() .unwrap() } @@ -32,15 +26,28 @@ where rt.block_on_all(future).unwrap(); } +/// THIS IS A HACK AND SHOULD BE AVOIDED. +/// +/// This creates a new tokio runtime, with many new threads, to execute the +/// given future. This is useful when we want to block the main runtime to +/// resolve a future without worrying that we'll us up all the threads in the +/// main runtime. pub fn block_on<F, R, E>(future: F) -> Result<R, E> where F: Send + 'static + Future<Item = R, Error = E>, R: Send + 'static, E: Send + 'static, { - let (tx, rx) = futures::sync::oneshot::channel(); - tokio::spawn(future.then(move |r| tx.send(r).map_err(|_| unreachable!()))); - rx.wait().unwrap() + use std::sync::mpsc::channel; + use std::thread; + let (sender, receiver) = channel(); + // Create a new runtime to evaluate the future asynchronously. + thread::spawn(move || { + let mut rt = create_threadpool_runtime(); + let r = rt.block_on(future); + sender.send(r).unwrap(); + }); + receiver.recv().unwrap() } // Set the default executor so we can use tokio::spawn(). It's difficult to |