Diffstat (limited to 'cli')
-rw-r--r--   cli/compilers/js.rs          |   2
-rw-r--r--   cli/compilers/json.rs        |   2
-rw-r--r--   cli/compilers/ts.rs          |  22
-rw-r--r--   cli/compilers/wasm.rs        |  84
-rw-r--r--   cli/disk_cache.rs            |  10
-rw-r--r--   cli/file_fetcher.rs          |  48
-rw-r--r--   cli/global_state.rs          |   2
-rw-r--r--   cli/ops/compiler.rs          |  75
-rw-r--r--   cli/ops/dispatch_minimal.rs  |  30
-rw-r--r--   cli/ops/net.rs               |  45
-rw-r--r--   cli/ops/process.rs           |   7
-rw-r--r--   cli/ops/workers.rs           |  24
-rw-r--r--   cli/state.rs                 |   2
-rw-r--r--   cli/worker.rs                |   4
14 files changed, 160 insertions, 197 deletions
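Two mechanical rewrites repeat throughout this patch: `self: &Self` receivers become the idiomatic `&self`, and futures combinator chains (`then`/`and_then`/`map_err`) become `async` blocks that use `.await` and `?`. The stand-alone sketch below is not part of the patch; `ErrBox`, `fetch_value`, and the function names are invented for illustration, but it shows the general before/after shape:

  // Illustrative sketch only, not part of the patch. `ErrBox`, `fetch_value`,
  // `old_style` and `new_style` are stand-ins invented for this example.
  use futures::future::{FutureExt, TryFutureExt};
  use std::future::Future;
  use std::pin::Pin;

  type ErrBox = Box<dyn std::error::Error + Send + Sync>;
  type JsonFuture = Pin<Box<dyn Future<Output = Result<String, ErrBox>> + Send>>;

  async fn fetch_value() -> Result<u32, ErrBox> {
      Ok(42)
  }

  // Old style: a futures combinator chain, boxed at the end.
  fn old_style() -> JsonFuture {
      fetch_value()
          .and_then(|v| futures::future::ok(format!("value: {}", v)))
          .boxed()
  }

  // New style: an async block that reads top to bottom and uses `?` for errors.
  fn new_style() -> JsonFuture {
      Box::pin(async move {
          let v = fetch_value().await?;
          Ok(format!("value: {}", v))
      })
  }

  fn main() {
      println!("{:?}", futures::executor::block_on(old_style()));
      println!("{:?}", futures::executor::block_on(new_style()));
  }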
diff --git a/cli/compilers/js.rs b/cli/compilers/js.rs
index 056c71ef1..8f0fcdde4 100644
--- a/cli/compilers/js.rs
+++ b/cli/compilers/js.rs
@@ -10,7 +10,7 @@ pub struct JsCompiler {}
 
 impl JsCompiler {
   pub fn compile_async(
-    self: &Self,
+    &self,
     source_file: &SourceFile,
   ) -> Pin<Box<CompiledModuleFuture>> {
     let module = CompiledModule {
diff --git a/cli/compilers/json.rs b/cli/compilers/json.rs
index 1152b27b3..699355ff5 100644
--- a/cli/compilers/json.rs
+++ b/cli/compilers/json.rs
@@ -15,7 +15,7 @@ pub struct JsonCompiler {}
 
 impl JsonCompiler {
   pub fn compile_async(
-    self: &Self,
+    &self,
     source_file: &SourceFile,
   ) -> Pin<Box<CompiledModuleFuture>> {
     let maybe_json_value: serde_json::Result<serde_json::Value> =
diff --git a/cli/compilers/ts.rs b/cli/compilers/ts.rs
index e97be8fe8..d3a08da3c 100644
--- a/cli/compilers/ts.rs
+++ b/cli/compilers/ts.rs
@@ -142,7 +142,7 @@ impl CompiledFileMetadata {
     None
   }
 
-  pub fn to_json_string(self: &Self) -> Result<String, serde_json::Error> {
+  pub fn to_json_string(&self) -> Result<String, serde_json::Error> {
     let mut value_map = serde_json::map::Map::new();
 
     value_map.insert(SOURCE_PATH.to_owned(), json!(&self.source_path));
@@ -246,7 +246,7 @@ impl TsCompiler {
   }
 
   pub fn bundle_async(
-    self: &Self,
+    &self,
     global_state: ThreadSafeGlobalState,
     module_name: String,
     out_file: Option<String>,
@@ -305,7 +305,7 @@ impl TsCompiler {
   ///
   /// If compilation is required then new V8 worker is spawned with fresh TS compiler.
   pub fn compile_async(
-    self: &Self,
+    &self,
     global_state: ThreadSafeGlobalState,
     source_file: &SourceFile,
   ) -> Pin<Box<CompiledModuleFuture>> {
@@ -389,7 +389,7 @@ impl TsCompiler {
   }
 
   /// Get associated `CompiledFileMetadata` for given module if it exists.
-  pub fn get_metadata(self: &Self, url: &Url) -> Option<CompiledFileMetadata> {
+  pub fn get_metadata(&self, url: &Url) -> Option<CompiledFileMetadata> {
     // Try to load cached version:
     // 1. check if there's 'meta' file
     let cache_key = self
@@ -409,7 +409,7 @@ impl TsCompiler {
   }
 
   pub fn get_compiled_module(
-    self: &Self,
+    &self,
     module_url: &Url,
   ) -> Result<CompiledModule, ErrBox> {
     let compiled_source_file = self.get_compiled_source_file(module_url)?;
@@ -428,7 +428,7 @@ impl TsCompiler {
   // TODO: ideally we shouldn't construct SourceFile by hand, but it should be delegated to
   // SourceFileFetcher
   pub fn get_compiled_source_file(
-    self: &Self,
+    &self,
     module_url: &Url,
   ) -> Result<SourceFile, ErrBox> {
     let cache_key = self
@@ -453,7 +453,7 @@ impl TsCompiler {
   /// Along compiled file a special metadata file is saved as well containing
   /// hash that can be validated to avoid unnecessary recompilation.
   fn cache_compiled_file(
-    self: &Self,
+    &self,
     module_specifier: &ModuleSpecifier,
     contents: &str,
   ) -> std::io::Result<()> {
@@ -495,7 +495,7 @@ impl TsCompiler {
   // TODO: ideally we shouldn't construct SourceFile by hand, but it should be delegated to
   // SourceFileFetcher
   pub fn get_source_map_file(
-    self: &Self,
+    &self,
     module_specifier: &ModuleSpecifier,
   ) -> Result<SourceFile, ErrBox> {
     let cache_key = self
@@ -517,7 +517,7 @@ impl TsCompiler {
 
   /// Save source map file for given TS module to on-disk cache.
   fn cache_source_map(
-    self: &Self,
+    &self,
     module_specifier: &ModuleSpecifier,
     contents: &str,
   ) -> std::io::Result<()> {
@@ -529,7 +529,7 @@ impl TsCompiler {
 
   /// This method is called by TS compiler via an "op".
   pub fn cache_compiler_output(
-    self: &Self,
+    &self,
     module_specifier: &ModuleSpecifier,
     extension: &str,
     contents: &str,
@@ -564,7 +564,7 @@ impl SourceMapGetter for TsCompiler {
 
 // `SourceMapGetter` related methods
 impl TsCompiler {
-  fn try_to_resolve(self: &Self, script_name: &str) -> Option<ModuleSpecifier> {
+  fn try_to_resolve(&self, script_name: &str) -> Option<ModuleSpecifier> {
     // if `script_name` can't be resolved to ModuleSpecifier it's probably internal
     // script (like `gen/cli/bundle/compiler.js`) so we won't be
     // able to get source for it anyway
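For reference, the `&self` receiver used throughout the hunks above is plain shorthand for `self: &Self`; the two methods in this small sketch (types invented, not from the patch) behave identically:

  // Illustrative sketch only, not part of the patch.
  struct Cache;

  impl Cache {
      // Explicit form, as the old code wrote it.
      fn len_explicit(self: &Self) -> usize {
          0
      }

      // Idiomatic shorthand, as the patch rewrites it.
      fn len_short(&self) -> usize {
          0
      }
  }

  fn main() {
      let c = Cache;
      assert_eq!(c.len_explicit(), c.len_short());
  }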
diff --git a/cli/compilers/wasm.rs b/cli/compilers/wasm.rs
index dbd951b03..e3297283b 100644
--- a/cli/compilers/wasm.rs
+++ b/cli/compilers/wasm.rs
@@ -6,9 +6,7 @@ use crate::global_state::ThreadSafeGlobalState;
 use crate::startup_data;
 use crate::state::*;
 use crate::worker::Worker;
-use deno::Buf;
 use futures::FutureExt;
-use futures::TryFutureExt;
 use serde_derive::Deserialize;
 use serde_json;
 use std::collections::HashMap;
@@ -69,7 +67,7 @@ impl WasmCompiler {
   }
 
   pub fn compile_async(
-    self: &Self,
+    &self,
     global_state: ThreadSafeGlobalState,
     source_file: &SourceFile,
   ) -> Pin<Box<CompiledModuleFuture>> {
@@ -86,47 +84,45 @@ impl WasmCompiler {
     let worker_ = worker.clone();
     let url = source_file.url.clone();
 
-    let fut = worker
-      .post_message(
-        serde_json::to_string(&base64_data)
-          .unwrap()
-          .into_boxed_str()
-          .into_boxed_bytes(),
-      )
-      .then(|_| worker)
-      .then(move |result| {
-        if let Err(err) = result {
-          // TODO(ry) Need to forward the error instead of exiting.
-          eprintln!("{}", err.to_string());
-          std::process::exit(1);
-        }
-        debug!("Sent message to worker");
-        worker_.get_message()
-      })
-      .map_err(|_| panic!("not handled"))
-      .and_then(move |maybe_msg: Option<Buf>| {
-        debug!("Received message from worker");
-        let json_msg = maybe_msg.unwrap();
-        let module_info: WasmModuleInfo =
-          serde_json::from_slice(&json_msg).unwrap();
-        debug!("WASM module info: {:#?}", &module_info);
-        let code = wrap_wasm_code(
-          &base64_data,
-          &module_info.import_list,
-          &module_info.export_list,
-        );
-        debug!("Generated code: {}", &code);
-        let module = CompiledModule {
-          code,
-          name: url.to_string(),
-        };
-        {
-          cache_.lock().unwrap().insert(url.clone(), module.clone());
-        }
-        debug!("<<<<< wasm_compile_async END");
-        futures::future::ok(module)
-      });
-    fut.boxed()
+    Box::pin(async move {
+      let _ = worker
+        .post_message(
+          serde_json::to_string(&base64_data)
+            .unwrap()
+            .into_boxed_str()
+            .into_boxed_bytes(),
+        )
+        .await;
+
+      if let Err(err) = worker.await {
+        // TODO(ry) Need to forward the error instead of exiting.
+        eprintln!("{}", err.to_string());
+        std::process::exit(1);
+      }
+      debug!("Sent message to worker");
+      let maybe_msg = worker_.get_message().await.expect("not handled");
+
+      debug!("Received message from worker");
+      let json_msg = maybe_msg.unwrap();
+      let module_info: WasmModuleInfo =
+        serde_json::from_slice(&json_msg).unwrap();
+      debug!("WASM module info: {:#?}", &module_info);
+      let code = wrap_wasm_code(
+        &base64_data,
+        &module_info.import_list,
+        &module_info.export_list,
+      );
+      debug!("Generated code: {}", &code);
+      let module = CompiledModule {
+        code,
+        name: url.to_string(),
+      };
+      {
+        cache_.lock().unwrap().insert(url.clone(), module.clone());
+      }
+      debug!("<<<<< wasm_compile_async END");
+      Ok(module)
+    })
   }
 }
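In the rewritten wasm compiler the finished module is still inserted into a shared cache, and the patch keeps the `cache_.lock().unwrap().insert(...)` call inside its own block so the `MutexGuard` is dropped right away rather than being held across a later `.await`. A minimal stand-alone sketch of that pattern, with invented types (`Cache`, `compile_cached`):

  // Illustrative sketch only, not part of the patch.
  use std::collections::HashMap;
  use std::sync::{Arc, Mutex};

  // Hypothetical cache type standing in for the compiler's module cache.
  type Cache = Arc<Mutex<HashMap<String, String>>>;

  async fn compile_cached(cache: Cache, url: String) -> String {
      // ...imagine awaiting a worker here to produce `code`...
      let code = format!("// compiled output for {}", url);
      {
          // Scope the lock so the guard is dropped promptly and is never
          // held across a later .await.
          cache.lock().unwrap().insert(url.clone(), code.clone());
      }
      code
  }

  fn main() {
      let cache: Cache = Arc::new(Mutex::new(HashMap::new()));
      let code = futures::executor::block_on(compile_cached(
          cache.clone(),
          "file:///a.wasm".to_string(),
      ));
      println!("{}", code);
      assert!(cache.lock().unwrap().contains_key("file:///a.wasm"));
  }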
diff --git a/cli/disk_cache.rs b/cli/disk_cache.rs
index 975a31f45..a6689c6ec 100644
--- a/cli/disk_cache.rs
+++ b/cli/disk_cache.rs
@@ -21,7 +21,7 @@ impl DiskCache {
     }
   }
 
-  pub fn get_cache_filename(self: &Self, url: &Url) -> PathBuf {
+  pub fn get_cache_filename(&self, url: &Url) -> PathBuf {
     let mut out = PathBuf::new();
 
     let scheme = url.scheme();
@@ -83,7 +83,7 @@ impl DiskCache {
   }
 
   pub fn get_cache_filename_with_extension(
-    self: &Self,
+    &self,
     url: &Url,
     extension: &str,
   ) -> PathBuf {
@@ -99,12 +99,12 @@ impl DiskCache {
     }
   }
 
-  pub fn get(self: &Self, filename: &Path) -> std::io::Result<Vec<u8>> {
+  pub fn get(&self, filename: &Path) -> std::io::Result<Vec<u8>> {
     let path = self.location.join(filename);
     fs::read(&path)
   }
 
-  pub fn set(self: &Self, filename: &Path, data: &[u8]) -> std::io::Result<()> {
+  pub fn set(&self, filename: &Path, data: &[u8]) -> std::io::Result<()> {
     let path = self.location.join(filename);
     match path.parent() {
       Some(ref parent) => fs::create_dir_all(parent),
@@ -113,7 +113,7 @@ impl DiskCache {
     deno_fs::write_file(&path, data, 0o666)
   }
 
-  pub fn remove(self: &Self, filename: &Path) -> std::io::Result<()> {
+  pub fn remove(&self, filename: &Path) -> std::io::Result<()> {
     let path = self.location.join(filename);
     fs::remove_file(path)
   }
diff --git a/cli/file_fetcher.rs b/cli/file_fetcher.rs
index 085faada2..bb1c2f651 100644
--- a/cli/file_fetcher.rs
+++ b/cli/file_fetcher.rs
@@ -12,7 +12,6 @@ use deno::ErrBox;
 use deno::ModuleSpecifier;
 use futures::future::Either;
 use futures::future::FutureExt;
-use futures::future::TryFutureExt;
 use serde_json;
 use std;
 use std::collections::HashMap;
@@ -115,7 +114,7 @@ impl SourceFileFetcher {
 
   /// Required for TS compiler and source maps.
   pub fn fetch_cached_source_file(
-    self: &Self,
+    &self,
     specifier: &ModuleSpecifier,
   ) -> Option<SourceFile> {
     let maybe_source_file = self.source_file_cache.get(specifier.to_string());
@@ -211,7 +210,7 @@ impl SourceFileFetcher {
   /// If `cached_only` is true then this method will fail for remote files
   /// not already cached.
   fn get_source_file_async(
-    self: &Self,
+    &self,
     module_url: &Url,
     use_disk_cache: bool,
     no_remote: bool,
@@ -261,10 +260,7 @@ impl SourceFileFetcher {
   }
 
   /// Fetch local source file.
-  fn fetch_local_file(
-    self: &Self,
-    module_url: &Url,
-  ) -> Result<SourceFile, ErrBox> {
+  fn fetch_local_file(&self, module_url: &Url) -> Result<SourceFile, ErrBox> {
     let filepath = module_url.to_file_path().map_err(|()| {
       ErrBox::from(DenoError::new(
         ErrorKind::InvalidPath,
@@ -299,7 +295,7 @@ impl SourceFileFetcher {
   /// that user provides, and the final module_name is the resolved path
   /// after following all redirections.
   fn fetch_cached_remote_source(
-    self: &Self,
+    &self,
     module_url: &Url,
   ) -> Result<Option<SourceFile>, ErrBox> {
     let source_code_headers = self.get_source_code_headers(&module_url);
@@ -351,7 +347,7 @@ impl SourceFileFetcher {
 
   /// Asynchronously fetch remote source file specified by the URL following redirects.
   fn fetch_remote_source_async(
-    self: &Self,
+    &self,
     module_url: &Url,
     use_disk_cache: bool,
     cached_only: bool,
@@ -399,8 +395,8 @@ impl SourceFileFetcher {
     let module_url = module_url.clone();
 
     // Single pass fetch, either yields code or yields redirect.
-    let f = http_util::fetch_string_once(&module_url).and_then(move |r| {
-      match r {
+    let f = async move {
+      match http_util::fetch_string_once(&module_url).await? {
         FetchOnceResult::Redirect(new_module_url) => {
           // If redirects, update module_name and filename for next looped call.
           dir
@@ -415,12 +411,14 @@ impl SourceFileFetcher {
           drop(download_job);
 
           // Recurse
-          Either::Left(dir.fetch_remote_source_async(
-            &new_module_url,
-            use_disk_cache,
-            cached_only,
-            redirect_limit - 1,
-          ))
+          dir
+            .fetch_remote_source_async(
+              &new_module_url,
+              use_disk_cache,
+              cached_only,
+              redirect_limit - 1,
+            )
+            .await
         }
         FetchOnceResult::Code(source, maybe_content_type) => {
           // We land on the code.
@@ -454,10 +452,10 @@ impl SourceFileFetcher {
           // Explicit drop to keep reference alive until future completes.
           drop(download_job);
 
-          Either::Right(futures::future::ok(source_file))
+          Ok(source_file)
         }
       }
-    });
+    };
 
     f.boxed()
   }
@@ -467,7 +465,7 @@ impl SourceFileFetcher {
   /// NOTE: chances are that the source file was downloaded due to redirects.
   /// In this case, the headers file provides info about where we should go and get
   /// the file that redirect eventually points to.
-  fn get_source_code_headers(self: &Self, url: &Url) -> SourceCodeHeaders {
+  fn get_source_code_headers(&self, url: &Url) -> SourceCodeHeaders {
     let cache_key = self
       .deps_cache
       .get_cache_filename_with_extension(url, "headers.json");
@@ -482,11 +480,7 @@ impl SourceFileFetcher {
   }
 
   /// Save contents of downloaded remote file in on-disk cache for subsequent access.
-  fn save_source_code(
-    self: &Self,
-    url: &Url,
-    source: &str,
-  ) -> std::io::Result<()> {
+  fn save_source_code(&self, url: &Url, source: &str) -> std::io::Result<()> {
     let cache_key = self.deps_cache.get_cache_filename(url);
 
     // May not exist. DON'T unwrap.
@@ -503,7 +497,7 @@ impl SourceFileFetcher {
   ///
   /// If nothing needs to be saved, the headers file is not created.
   fn save_source_code_headers(
-    self: &Self,
+    &self,
     url: &Url,
     mime_type: Option<String>,
     redirect_to: Option<String>,
@@ -667,7 +661,7 @@ impl SourceCodeHeaders {
   // TODO: remove this nonsense `cache_filename` param, this should be
   // done when instantiating SourceCodeHeaders
   pub fn to_json_string(
-    self: &Self,
+    &self,
     cache_filename: &Path,
   ) -> Result<Option<String>, serde_json::Error> {
     // TODO(kevinkassimo): consider introduce serde::Deserialize to make things simpler.
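`fetch_remote_source_async` above now recurses with a plain `.await` inside the `async move` block while still returning a boxed future via `f.boxed()`; the boxing is what makes the recursion well-formed, since an unboxed recursive future would have unbounded size. A stand-alone sketch of the same shape (the URL handling and all names are invented):

  // Illustrative sketch only, not part of the patch.
  use futures::future::{BoxFuture, FutureExt};

  // Follow up to `redirect_limit` fake "redirects"; each level awaits the next.
  fn fetch_with_redirects(
      url: String,
      redirect_limit: i64,
  ) -> BoxFuture<'static, Result<String, String>> {
      let f = async move {
          if redirect_limit < 0 {
              return Err(format!("too many redirects for {}", url));
          }
          if let Some(next) = url.strip_suffix("/redirect") {
              // Recurse: only possible because the returned future is boxed.
              return fetch_with_redirects(next.to_string(), redirect_limit - 1).await;
          }
          Ok(format!("body of {}", url))
      };
      f.boxed()
  }

  fn main() {
      let body = futures::executor::block_on(fetch_with_redirects(
          "https://example.com/a/redirect".to_string(),
          10,
      ));
      println!("{:?}", body);
  }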
diff --git a/cli/global_state.rs b/cli/global_state.rs
index d3f29c999..0f7416e98 100644
--- a/cli/global_state.rs
+++ b/cli/global_state.rs
@@ -205,7 +205,7 @@ impl ThreadSafeGlobalState {
   }
 
   pub fn check_dyn_import(
-    self: &Self,
+    &self,
     module_specifier: &ModuleSpecifier,
   ) -> Result<(), ErrBox> {
     let u = module_specifier.as_url();
diff --git a/cli/ops/compiler.rs b/cli/ops/compiler.rs
index 8bc42a92a..90df45b80 100644
--- a/cli/ops/compiler.rs
+++ b/cli/ops/compiler.rs
@@ -1,8 +1,6 @@
 // Copyright 2018-2020 the Deno authors. All rights reserved. MIT license.
 use super::dispatch_json::{Deserialize, JsonOp, Value};
 use crate::futures::future::try_join_all;
-use crate::futures::future::FutureExt;
-use crate::futures::future::TryFutureExt;
 use crate::msg;
 use crate::ops::json_op;
 use crate::state::ThreadSafeState;
@@ -87,52 +85,39 @@ fn op_fetch_source_files(
 
   let global_state = state.global_state.clone();
 
-  let future = try_join_all(futures)
-    .map_err(ErrBox::from)
-    .and_then(move |files| {
-      // We want to get an array of futures that resolves to
-      let v: Vec<_> = files
-        .into_iter()
-        .map(|file| {
-          // Special handling of Wasm files:
-          // compile them into JS first!
-          // This allows TS to do correct export types.
-          if file.media_type == msg::MediaType::Wasm {
-            return futures::future::Either::Left(
-              global_state
-                .wasm_compiler
-                .compile_async(global_state.clone(), &file)
-                .and_then(|compiled_mod| {
-                  futures::future::ok((file, Some(compiled_mod.code)))
-                }),
-            );
+  let future = Box::pin(async move {
+    let files = try_join_all(futures).await?;
+
+    // We want to get an array of futures that resolves to
+    let v = files.into_iter().map(|file| {
+      async {
+        // Special handling of Wasm files:
+        // compile them into JS first!
+        // This allows TS to do correct export types.
+        let source_code = match file.media_type {
+          msg::MediaType::Wasm => {
+            global_state
+              .wasm_compiler
+              .compile_async(global_state.clone(), &file)
+              .await?
+              .code
           }
-          futures::future::Either::Right(futures::future::ok((file, None)))
-        })
-        .collect();
-      try_join_all(v)
-    })
-    .and_then(move |files_with_code| {
-      let res = files_with_code
-        .into_iter()
-        .map(|(file, maybe_code)| {
-          json!({
-            "url": file.url.to_string(),
-            "filename": file.filename.to_str().unwrap(),
-            "mediaType": file.media_type as i32,
-            "sourceCode": if let Some(code) = maybe_code {
-              code
-            } else {
-              String::from_utf8(file.source_code).unwrap()
-            },
-          })
-        })
-        .collect();
-
-      futures::future::ok(res)
+          _ => String::from_utf8(file.source_code).unwrap(),
+        };
+        Ok::<_, ErrBox>(json!({
+          "url": file.url.to_string(),
+          "filename": file.filename.to_str().unwrap(),
+          "mediaType": file.media_type as i32,
+          "sourceCode": source_code,
+        }))
+      }
     });
-  Ok(JsonOp::Async(future.boxed()))
+    let v = try_join_all(v).await?;
+    Ok(v.into())
+  });
+
+  Ok(JsonOp::Async(future))
 }
 
 #[derive(Deserialize)]
diff --git a/cli/ops/dispatch_minimal.rs b/cli/ops/dispatch_minimal.rs
index 22d6697e5..54bf52679 100644
--- a/cli/ops/dispatch_minimal.rs
+++ b/cli/ops/dispatch_minimal.rs
@@ -138,21 +138,23 @@ where
   let min_op = d(rid, zero_copy);
 
   // Convert to CoreOp
-  let fut = Box::new(min_op.then(move |result| match result {
-    Ok(r) => {
-      record.result = r;
-      futures::future::ok(record.into())
+  let fut = async move {
+    match min_op.await {
+      Ok(r) => {
+        record.result = r;
+        Ok(record.into())
+      }
+      Err(err) => {
+        let error_record = ErrorRecord {
+          promise_id: record.promise_id,
+          arg: -1,
+          error_code: err.kind() as i32,
+          error_message: err.to_string().as_bytes().to_owned(),
+        };
+        Ok(error_record.into())
+      }
     }
-    Err(err) => {
-      let error_record = ErrorRecord {
-        promise_id: record.promise_id,
-        arg: -1,
-        error_code: err.kind() as i32,
-        error_message: err.to_string().as_bytes().to_owned(),
-      };
-      futures::future::ok(error_record.into())
-    }
-  }));
+  };
 
   if is_sync {
     // Warning! Possible deadlocks can occur if we try to wait for a future
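The minimal dispatcher above now folds both arms of the op's `Result` into one response value inside a single `async move` block instead of two combinator branches. A reduced sketch of that shape (the record and response types here are invented):

  // Illustrative sketch only, not part of the patch.
  #[derive(Debug)]
  struct Record {
      promise_id: u64,
      result: i32,
  }

  #[derive(Debug)]
  struct ErrorRecord {
      promise_id: u64,
      error_message: Vec<u8>,
  }

  #[derive(Debug)]
  enum Response {
      Ok(Record),
      Err(ErrorRecord),
  }

  async fn min_op() -> Result<i32, String> {
      Ok(7)
  }

  fn dispatch(promise_id: u64) -> impl std::future::Future<Output = Response> {
      async move {
          // Both success and failure produce a single Response value.
          match min_op().await {
              Ok(r) => Response::Ok(Record { promise_id, result: r }),
              Err(err) => Response::Err(ErrorRecord {
                  promise_id,
                  error_message: err.into_bytes(),
              }),
          }
      }
  }

  fn main() {
      println!("{:?}", futures::executor::block_on(dispatch(1)));
  }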
diff --git a/cli/ops/net.rs b/cli/ops/net.rs
index 01fb65b1f..bf1e56dc9 100644
--- a/cli/ops/net.rs
+++ b/cli/ops/net.rs
@@ -8,7 +8,6 @@ use crate::state::ThreadSafeState;
 use deno::Resource;
 use deno::*;
 use futures::future::FutureExt;
-use futures::future::TryFutureExt;
 use std;
 use std::convert::From;
 use std::future::Future;
@@ -39,18 +38,18 @@ pub fn accept(state: &ThreadSafeState, rid: ResourceId) -> Accept {
   Accept {
     accept_state: AcceptState::Pending,
     rid,
-    state: state.clone(),
+    state,
   }
 }
 
 /// A future representing state of accepting a TCP connection.
-pub struct Accept {
+pub struct Accept<'a> {
   accept_state: AcceptState,
   rid: ResourceId,
-  state: ThreadSafeState,
+  state: &'a ThreadSafeState,
 }
 
-impl Future for Accept {
+impl Future for Accept<'_> {
   type Output = Result<(TcpStream, SocketAddr), ErrBox>;
 
   fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
@@ -109,29 +108,19 @@ fn op_accept(
     .get::<TcpListenerResource>(rid)
     .ok_or_else(bad_resource)?;
 
-  let op = accept(state, rid)
-    .and_then(move |(tcp_stream, _socket_addr)| {
-      let local_addr = match tcp_stream.local_addr() {
-        Ok(v) => v,
-        Err(e) => return futures::future::err(ErrBox::from(e)),
-      };
-      let remote_addr = match tcp_stream.peer_addr() {
-        Ok(v) => v,
-        Err(e) => return futures::future::err(ErrBox::from(e)),
-      };
-      let mut table = state_.lock_resource_table();
-      let rid =
-        table.add("tcpStream", Box::new(StreamResource::TcpStream(tcp_stream)));
-      futures::future::ok((rid, local_addr, remote_addr))
-    })
-    .map_err(ErrBox::from)
-    .and_then(move |(rid, local_addr, remote_addr)| {
-      futures::future::ok(json!({
-        "rid": rid,
-        "localAddr": local_addr.to_string(),
-        "remoteAddr": remote_addr.to_string(),
-      }))
-    });
+  let op = async move {
+    let (tcp_stream, _socket_addr) = accept(&state_, rid).await?;
+    let local_addr = tcp_stream.local_addr()?;
+    let remote_addr = tcp_stream.peer_addr()?;
+    let mut table = state_.lock_resource_table();
+    let rid =
+      table.add("tcpStream", Box::new(StreamResource::TcpStream(tcp_stream)));
+    Ok(json!({
+      "rid": rid,
+      "localAddr": local_addr.to_string(),
+      "remoteAddr": remote_addr.to_string(),
+    }))
+  };
 
   Ok(JsonOp::Async(op.boxed()))
 }
diff --git a/cli/ops/process.rs b/cli/ops/process.rs
index 92474cc4f..80ce4b1ac 100644
--- a/cli/ops/process.rs
+++ b/cli/ops/process.rs
@@ -220,7 +220,8 @@ fn op_run_status(
     state: state.clone(),
   };
 
-  let future = future.and_then(move |run_status| {
+  let future = async move {
+    let run_status = future.await?;
     let code = run_status.code();
 
     #[cfg(unix)]
@@ -233,12 +234,12 @@ fn op_run_status(
       .expect("Should have either an exit code or a signal.");
     let got_signal = signal.is_some();
 
-    futures::future::ok(json!({
+    Ok(json!({
      "gotSignal": got_signal,
      "exitCode": code.unwrap_or(-1),
      "exitSignal": signal.unwrap_or(-1),
    }))
-  });
+  };
 
   let pool = futures::executor::ThreadPool::new().unwrap();
   let handle = pool.spawn_with_handle(future).unwrap();
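In `op_accept` above, the explicit `match`/`futures::future::err` handling of `local_addr()` and `peer_addr()` collapses to `?`, which works because `?` converts the concrete error (`std::io::Error` there) into the boxed error type through `From`. A small stand-alone sketch of that conversion, using only std types (`parse_port` and the `ErrBox` alias are invented for the example):

  // Illustrative sketch only, not part of the patch.
  use std::io;

  // Stand-in for the boxed error type the ops return.
  type ErrBox = Box<dyn std::error::Error + Send + Sync>;

  fn parse_port(input: &str) -> Result<u16, ErrBox> {
      let trimmed: &str = input.trim();
      // `?` converts ParseIntError into ErrBox via the blanket From impl.
      let port: u16 = trimmed.parse()?;
      if port == 0 {
          // io::Error converts through the same mechanism.
          return Err(
              io::Error::new(io::ErrorKind::InvalidInput, "port must be non-zero").into(),
          );
      }
      Ok(port)
  }

  fn main() {
      println!("{:?}", parse_port(" 8080 "));
      println!("{:?}", parse_port("0"));
      println!("{:?}", parse_port("not-a-port"));
  }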
diff --git a/cli/ops/workers.rs b/cli/ops/workers.rs
index 48b8deb15..00043ce77 100644
--- a/cli/ops/workers.rs
+++ b/cli/ops/workers.rs
@@ -77,13 +77,11 @@ fn op_worker_get_message(
     state: state.clone(),
   };
 
-  let op = op.then(move |maybe_buf| {
+  let op = async move {
+    let maybe_buf = op.await;
     debug!("op_worker_get_message");
-
-    futures::future::ok(json!({
-      "data": maybe_buf.map(|buf| buf)
-    }))
-  });
+    Ok(json!({ "data": maybe_buf }))
+  };
 
   Ok(JsonOp::Async(op.boxed()))
 }
@@ -255,14 +253,12 @@ fn op_host_get_message(
   let mut table = state.workers.lock().unwrap();
   // TODO: don't return bad resource anymore
   let worker = table.get_mut(&id).ok_or_else(bad_resource)?;
-  let op = worker
-    .get_message()
-    .map_err(move |_| -> ErrBox { unimplemented!() })
-    .and_then(move |maybe_buf| {
-      futures::future::ok(json!({
-        "data": maybe_buf.map(|buf| buf)
-      }))
-    });
+  let fut = worker.get_message();
+
+  let op = async move {
+    let maybe_buf = fut.await.unwrap();
+    Ok(json!({ "data": maybe_buf }))
+  };
 
   Ok(JsonOp::Async(op.boxed()))
 }
diff --git a/cli/state.rs b/cli/state.rs
index 36ab7f331..6cfa51a55 100644
--- a/cli/state.rs
+++ b/cli/state.rs
@@ -301,7 +301,7 @@ impl ThreadSafeState {
   }
 
   pub fn check_dyn_import(
-    self: &Self,
+    &self,
     module_specifier: &ModuleSpecifier,
   ) -> Result<(), ErrBox> {
     let u = module_specifier.as_url();
diff --git a/cli/worker.rs b/cli/worker.rs
index df5e663b3..31f90c1da 100644
--- a/cli/worker.rs
+++ b/cli/worker.rs
@@ -157,7 +157,7 @@ impl Worker {
   ///
   /// This method blocks current thread.
   pub fn post_message(
-    self: &Self,
+    &self,
     buf: Buf,
   ) -> impl Future<Output = Result<(), ErrBox>> {
     let channels = self.external_channels.lock().unwrap();
@@ -170,7 +170,7 @@ impl Worker {
   }
 
   /// Get message from worker as a host.
-  pub fn get_message(self: &Self) -> WorkerReceiver {
+  pub fn get_message(&self) -> WorkerReceiver {
     WorkerReceiver {
       channels: self.external_channels.clone(),
     }
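Finally, note how `op_host_get_message` above creates the receiver future first (`let fut = worker.get_message();`) and only then moves it into the `async move` block, so the block owns a self-contained future instead of capturing the `worker` borrowed from the workers table. A sketch of that ordering with invented types:

  // Illustrative sketch only, not part of the patch; `Worker` and
  // `get_message` here are made-up stand-ins.
  use std::future::Future;

  struct Worker {
      id: u32,
  }

  impl Worker {
      // Returns a future that owns everything it needs.
      fn get_message(&self) -> impl Future<Output = Option<String>> {
          let id = self.id;
          async move { Some(format!("message for worker {}", id)) }
      }
  }

  fn main() {
      let mut workers = vec![Worker { id: 1 }];
      let worker = workers.get_mut(0).unwrap();

      // Create the receiver future first...
      let fut = worker.get_message();

      // ...so the async block only needs to own `fut`, not the &mut Worker.
      let op = async move {
          let maybe_msg = fut.await;
          format!("{:?}", maybe_msg)
      };

      println!("{}", futures::executor::block_on(op));
  }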