diff options
Diffstat (limited to 'cli/ops')
-rw-r--r-- | cli/ops/dispatch_json.rs | 36 | ||||
-rw-r--r-- | cli/ops/dispatch_minimal.rs | 4 | ||||
-rw-r--r-- | cli/ops/fetch.rs | 2 | ||||
-rw-r--r-- | cli/ops/files.rs | 4 | ||||
-rw-r--r-- | cli/ops/fs.rs | 3 | ||||
-rw-r--r-- | cli/ops/io.rs | 11 | ||||
-rw-r--r-- | cli/ops/mod.rs | 1 | ||||
-rw-r--r-- | cli/ops/net.rs | 4 | ||||
-rw-r--r-- | cli/ops/signal.rs | 2 | ||||
-rw-r--r-- | cli/ops/timers.rs | 2 | ||||
-rw-r--r-- | cli/ops/tls.rs | 4 | ||||
-rw-r--r-- | cli/ops/web_worker.rs | 2 | ||||
-rw-r--r-- | cli/ops/worker_host.rs | 225 |
13 files changed, 151 insertions, 149 deletions
diff --git a/cli/ops/dispatch_json.rs b/cli/ops/dispatch_json.rs index 0b053b1e8..0806001ab 100644 --- a/cli/ops/dispatch_json.rs +++ b/cli/ops/dispatch_json.rs @@ -6,10 +6,10 @@ use serde_json::json; pub use serde_json::Value; use std::future::Future; use std::pin::Pin; -use tokio::task; -pub type AsyncJsonOp = - Pin<Box<dyn Future<Output = Result<Value, ErrBox>> + Send>>; +pub type JsonResult = Result<Value, ErrBox>; + +pub type AsyncJsonOp = Pin<Box<dyn Future<Output = JsonResult>>>; pub enum JsonOp { Sync(Value), @@ -27,10 +27,7 @@ fn json_err(err: ErrBox) -> Value { }) } -fn serialize_result( - promise_id: Option<u64>, - result: Result<Value, ErrBox>, -) -> Buf { +fn serialize_result(promise_id: Option<u64>, result: JsonResult) -> Buf { let value = match result { Ok(v) => json!({ "ok": v, "promiseId": promise_id }), Err(err) => json!({ "err": json_err(err), "promiseId": promise_id }), @@ -78,21 +75,21 @@ where let fut2 = fut.then(move |result| { futures::future::ok(serialize_result(promise_id, result)) }); - CoreOp::Async(fut2.boxed()) + CoreOp::Async(fut2.boxed_local()) } Ok(JsonOp::AsyncUnref(fut)) => { assert!(promise_id.is_some()); let fut2 = fut.then(move |result| { futures::future::ok(serialize_result(promise_id, result)) }); - CoreOp::AsyncUnref(fut2.boxed()) + CoreOp::AsyncUnref(fut2.boxed_local()) } Err(sync_err) => { let buf = serialize_result(promise_id, Err(sync_err)); if is_sync { CoreOp::Sync(buf) } else { - CoreOp::Async(futures::future::ok(buf).boxed()) + CoreOp::Async(futures::future::ok(buf).boxed_local()) } } } @@ -101,17 +98,20 @@ where pub fn blocking_json<F>(is_sync: bool, f: F) -> Result<JsonOp, ErrBox> where - F: 'static + Send + FnOnce() -> Result<Value, ErrBox> + Unpin, + F: 'static + Send + FnOnce() -> JsonResult, { if is_sync { Ok(JsonOp::Sync(f()?)) } else { - let fut = async move { - task::spawn_blocking(move || f()) - .await - .map_err(ErrBox::from)? - } - .boxed(); - Ok(JsonOp::Async(fut.boxed())) + // TODO(ry) use thread pool. + let fut = crate::tokio_util::spawn_thread(f); + /* + let fut = async move { + tokio::task::spawn_blocking(move || f()) + .await + .map_err(ErrBox::from)? + }.boxed_local(); + */ + Ok(JsonOp::Async(fut.boxed_local())) } } diff --git a/cli/ops/dispatch_minimal.rs b/cli/ops/dispatch_minimal.rs index b9a4f7530..70c4af6c3 100644 --- a/cli/ops/dispatch_minimal.rs +++ b/cli/ops/dispatch_minimal.rs @@ -16,7 +16,7 @@ use futures::future::FutureExt; use std::future::Future; use std::pin::Pin; -pub type MinimalOp = dyn Future<Output = Result<i32, ErrBox>> + Send; +pub type MinimalOp = dyn Future<Output = Result<i32, ErrBox>>; #[derive(Copy, Clone, Debug, PartialEq)] // This corresponds to RecordMinimal on the TS side. @@ -164,7 +164,7 @@ where // works since they're simple polling futures. Op::Sync(futures::executor::block_on(fut).unwrap()) } else { - Op::Async(fut.boxed()) + Op::Async(fut.boxed_local()) } } } diff --git a/cli/ops/fetch.rs b/cli/ops/fetch.rs index ba7f7a949..7ce3f1a40 100644 --- a/cli/ops/fetch.rs +++ b/cli/ops/fetch.rs @@ -81,5 +81,5 @@ pub fn op_fetch( Ok(json_res) }; - Ok(JsonOp::Async(future.boxed())) + Ok(JsonOp::Async(future.boxed_local())) } diff --git a/cli/ops/files.rs b/cli/ops/files.rs index 76a0191fd..f32de90b9 100644 --- a/cli/ops/files.rs +++ b/cli/ops/files.rs @@ -139,7 +139,7 @@ fn op_open( let buf = futures::executor::block_on(fut)?; Ok(JsonOp::Sync(buf)) } else { - Ok(JsonOp::Async(fut.boxed())) + Ok(JsonOp::Async(fut.boxed_local())) } } @@ -211,6 +211,6 @@ fn op_seek( let buf = futures::executor::block_on(fut)?; Ok(JsonOp::Sync(buf)) } else { - Ok(JsonOp::Async(fut.boxed())) + Ok(JsonOp::Async(fut.boxed_local())) } } diff --git a/cli/ops/fs.rs b/cli/ops/fs.rs index d5ce59f99..1112db495 100644 --- a/cli/ops/fs.rs +++ b/cli/ops/fs.rs @@ -4,6 +4,7 @@ use super::dispatch_json::{blocking_json, Deserialize, JsonOp, Value}; use crate::deno_error::DenoError; use crate::deno_error::ErrorKind; use crate::fs as deno_fs; +use crate::ops::dispatch_json::JsonResult; use crate::ops::json_op; use crate::state::ThreadSafeState; use deno_core::*; @@ -233,7 +234,7 @@ macro_rules! to_seconds { fn get_stat_json( metadata: fs::Metadata, maybe_name: Option<String>, -) -> Result<Value, ErrBox> { +) -> JsonResult { // Unix stat member (number types only). 0 if not on unix. macro_rules! usm { ($member: ident) => {{ diff --git a/cli/ops/io.rs b/cli/ops/io.rs index 410748ca4..18ad3cf30 100644 --- a/cli/ops/io.rs +++ b/cli/ops/io.rs @@ -187,14 +187,14 @@ pub fn op_read( debug!("read rid={}", rid); let zero_copy = match zero_copy { None => { - return futures::future::err(deno_error::no_buffer_specified()).boxed() + return futures::future::err(deno_error::no_buffer_specified()) + .boxed_local() } Some(buf) => buf, }; let fut = read(state, rid as u32, zero_copy); - - fut.boxed() + fut.boxed_local() } /// `DenoAsyncWrite` is the same as the `tokio_io::AsyncWrite` trait @@ -332,12 +332,13 @@ pub fn op_write( debug!("write rid={}", rid); let zero_copy = match zero_copy { None => { - return futures::future::err(deno_error::no_buffer_specified()).boxed() + return futures::future::err(deno_error::no_buffer_specified()) + .boxed_local() } Some(buf) => buf, }; let fut = write(state, rid as u32, zero_copy); - fut.boxed() + fut.boxed_local() } diff --git a/cli/ops/mod.rs b/cli/ops/mod.rs index aa702c9c8..dd772cd9a 100644 --- a/cli/ops/mod.rs +++ b/cli/ops/mod.rs @@ -4,6 +4,7 @@ mod dispatch_minimal; pub use dispatch_json::json_op; pub use dispatch_json::JsonOp; +pub use dispatch_json::JsonResult; pub use dispatch_minimal::minimal_op; pub use dispatch_minimal::MinimalOp; diff --git a/cli/ops/net.rs b/cli/ops/net.rs index adad32815..41cfc2909 100644 --- a/cli/ops/net.rs +++ b/cli/ops/net.rs @@ -130,7 +130,7 @@ fn op_accept( })) }; - Ok(JsonOp::Async(op.boxed())) + Ok(JsonOp::Async(op.boxed_local())) } #[derive(Deserialize)] @@ -173,7 +173,7 @@ fn op_connect( })) }; - Ok(JsonOp::Async(op.boxed())) + Ok(JsonOp::Async(op.boxed_local())) } #[derive(Deserialize)] diff --git a/cli/ops/signal.rs b/cli/ops/signal.rs index b2a9b73ac..9726becc5 100644 --- a/cli/ops/signal.rs +++ b/cli/ops/signal.rs @@ -94,7 +94,7 @@ fn op_signal_poll( }) .then(|result| async move { Ok(json!({ "done": result.is_none() })) }); - Ok(JsonOp::AsyncUnref(future.boxed())) + Ok(JsonOp::AsyncUnref(future.boxed_local())) } #[cfg(unix)] diff --git a/cli/ops/timers.rs b/cli/ops/timers.rs index 2c21ba6f1..75b53518c 100644 --- a/cli/ops/timers.rs +++ b/cli/ops/timers.rs @@ -51,7 +51,7 @@ fn op_global_timer( .new_timeout(deadline) .then(move |_| futures::future::ok(json!({}))); - Ok(JsonOp::Async(f.boxed())) + Ok(JsonOp::Async(f.boxed_local())) } // Returns a milliseconds and nanoseconds subsec diff --git a/cli/ops/tls.rs b/cli/ops/tls.rs index 45b6887a0..126a00f63 100644 --- a/cli/ops/tls.rs +++ b/cli/ops/tls.rs @@ -116,7 +116,7 @@ pub fn op_connect_tls( })) }; - Ok(JsonOp::Async(op.boxed())) + Ok(JsonOp::Async(op.boxed_local())) } fn load_certs(path: &str) -> Result<Vec<Certificate>, ErrBox> { @@ -397,5 +397,5 @@ fn op_accept_tls( })) }; - Ok(JsonOp::Async(op.boxed())) + Ok(JsonOp::Async(op.boxed_local())) } diff --git a/cli/ops/web_worker.rs b/cli/ops/web_worker.rs index ff2c558be..db7086c59 100644 --- a/cli/ops/web_worker.rs +++ b/cli/ops/web_worker.rs @@ -37,7 +37,7 @@ fn op_worker_get_message( Ok(json!({ "data": maybe_buf })) }; - Ok(JsonOp::Async(op.boxed())) + Ok(JsonOp::Async(op.boxed_local())) } /// Post message to host as guest worker diff --git a/cli/ops/worker_host.rs b/cli/ops/worker_host.rs index a1509d2f7..519294314 100644 --- a/cli/ops/worker_host.rs +++ b/cli/ops/worker_host.rs @@ -5,6 +5,7 @@ use crate::deno_error::js_check; use crate::deno_error::DenoError; use crate::deno_error::ErrorKind; use crate::fmt_errors::JSError; +use crate::ops::dispatch_json::JsonResult; use crate::ops::json_op; use crate::startup_data; use crate::state::ThreadSafeState; @@ -18,11 +19,7 @@ use futures::sink::SinkExt; use futures::stream::StreamExt; use std; use std::convert::From; -use std::future::Future; -use std::pin::Pin; use std::sync::atomic::Ordering; -use std::task::Context; -use std::task::Poll; pub fn init(i: &mut Isolate, s: &ThreadSafeState) { i.register_op( @@ -73,93 +70,99 @@ fn op_create_worker( ) -> Result<JsonOp, ErrBox> { let args: CreateWorkerArgs = serde_json::from_value(args)?; - let specifier = args.specifier.as_ref(); + let specifier = args.specifier.clone(); let has_source_code = args.has_source_code; - let source_code = args.source_code; - + let source_code = args.source_code.clone(); + let args_name = args.name; let parent_state = state.clone(); - // TODO(bartlomieju): Isn't this wrong? - let mut module_specifier = ModuleSpecifier::resolve_url_or_path(specifier)?; - if !has_source_code { - if let Some(referrer) = parent_state.main_module.as_ref() { - let referrer = referrer.clone().to_string(); - module_specifier = ModuleSpecifier::resolve_import(specifier, &referrer)?; + let (load_sender, load_receiver) = + std::sync::mpsc::sync_channel::<JsonResult>(1); + + std::thread::spawn(move || { + // TODO(bartlomieju): Isn't this wrong? + let result = ModuleSpecifier::resolve_url_or_path(&specifier); + if let Err(err) = result { + load_sender.send(Err(err.into())).unwrap(); + return; + } + let mut module_specifier = result.unwrap(); + if !has_source_code { + if let Some(referrer) = parent_state.main_module.as_ref() { + let referrer = referrer.clone().to_string(); + let result = ModuleSpecifier::resolve_import(&specifier, &referrer); + if let Err(err) = result { + load_sender.send(Err(err.into())).unwrap(); + return; + } + module_specifier = result.unwrap(); + } } - } - let (int, ext) = ThreadSafeState::create_channels(); - let child_state = ThreadSafeState::new_for_worker( - state.global_state.clone(), - Some(parent_state.permissions.clone()), // by default share with parent - Some(module_specifier.clone()), - int, - )?; - let worker_name = if let Some(name) = args.name { - name - } else { - // TODO(bartlomieju): change it to something more descriptive - format!("USER-WORKER-{}", specifier) - }; + let (int, ext) = ThreadSafeState::create_channels(); + let result = ThreadSafeState::new_for_worker( + parent_state.global_state.clone(), + Some(parent_state.permissions.clone()), // by default share with parent + Some(module_specifier.clone()), + int, + ); + if let Err(err) = result { + load_sender.send(Err(err)).unwrap(); + return; + } + let child_state = result.unwrap(); + let worker_name = args_name.unwrap_or_else(|| { + // TODO(bartlomieju): change it to something more descriptive + format!("USER-WORKER-{}", specifier) + }); + + // TODO: add a new option to make child worker not sharing permissions + // with parent (aka .clone(), requests from child won't reflect in parent) + let mut worker = WebWorker::new( + worker_name.to_string(), + startup_data::deno_isolate_init(), + child_state, + ext, + ); + let script = format!("bootstrapWorkerRuntime(\"{}\")", worker_name); + js_check(worker.execute(&script)); + js_check(worker.execute("runWorkerMessageLoop()")); + + let worker_id = parent_state.add_child_worker(&worker); + + // Has provided source code, execute immediately. + if has_source_code { + js_check(worker.execute(&source_code)); + load_sender + .send(Ok(json!({"id": worker_id, "loaded": true}))) + .unwrap(); + return; + } - // TODO: add a new option to make child worker not sharing permissions - // with parent (aka .clone(), requests from child won't reflect in parent) - let mut worker = WebWorker::new( - worker_name.to_string(), - startup_data::deno_isolate_init(), - child_state, - ext, - ); - let script = format!("bootstrapWorkerRuntime(\"{}\")", worker_name); - js_check(worker.execute(&script)); - js_check(worker.execute("runWorkerMessageLoop()")); + let (mut sender, receiver) = mpsc::channel::<Result<(), ErrBox>>(1); - let worker_id = parent_state.add_child_worker(worker.clone()); + // TODO(bartlomieju): this future should be spawned on the separate thread, + // dedicated to that worker + let fut = async move { + let result = worker + .execute_mod_async(&module_specifier, None, false) + .await; + sender.send(result).await.expect("Failed to send message"); + } + .boxed_local(); + let mut table = parent_state.loading_workers.lock().unwrap(); + table.insert(worker_id, receiver); - // Has provided source code, execute immediately. - if has_source_code { - js_check(worker.execute(&source_code)); - return Ok(JsonOp::Sync(json!({"id": worker_id, "loaded": true}))); - } + load_sender + .send(Ok(json!({"id": worker_id, "loaded": false}))) + .unwrap(); - let (mut sender, receiver) = mpsc::channel::<Result<(), ErrBox>>(1); + crate::tokio_util::run_basic(fut); + }); - // TODO(bartlomieju): this future should be spawned on the separate thread, - // dedicated to that worker - let fut = async move { - let result = worker - .execute_mod_async(&module_specifier, None, false) - .await; - sender.send(result).await.expect("Failed to send message"); - } - .boxed(); - tokio::spawn(fut); - let mut table = state.loading_workers.lock().unwrap(); - table.insert(worker_id, receiver); - Ok(JsonOp::Sync(json!({"id": worker_id, "loaded": false}))) -} + let r = load_receiver.recv().unwrap(); -struct WorkerPollFuture { - state: ThreadSafeState, - rid: ResourceId, -} - -impl Future for WorkerPollFuture { - type Output = Result<(), ErrBox>; - - fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> { - let inner = self.get_mut(); - let mut workers_table = inner.state.workers.lock().unwrap(); - let maybe_worker = workers_table.get_mut(&inner.rid); - if maybe_worker.is_none() { - return Poll::Ready(Ok(())); - } - match maybe_worker.unwrap().poll_unpin(cx) { - Poll::Ready(Err(e)) => Poll::Ready(Err(e)), - Poll::Ready(Ok(_)) => Poll::Ready(Ok(())), - Poll::Pending => Poll::Pending, - } - } + Ok(JsonOp::Sync(r.unwrap())) } fn serialize_worker_result(result: Result<(), ErrBox>) -> Value { @@ -206,27 +209,21 @@ fn op_host_get_worker_loaded( Ok(serialize_worker_result(result)) }; - Ok(JsonOp::Async(op.boxed())) + Ok(JsonOp::Async(op.boxed_local())) } fn op_host_poll_worker( - state: &ThreadSafeState, - args: Value, + _state: &ThreadSafeState, + _args: Value, _data: Option<ZeroCopyBuf>, ) -> Result<JsonOp, ErrBox> { - let args: WorkerArgs = serde_json::from_value(args)?; - let id = args.id as u32; - - let future = WorkerPollFuture { - state: state.clone(), - rid: id, - }; - - let op = async move { - let result = future.await; - Ok(serialize_worker_result(result)) - }; - Ok(JsonOp::Async(op.boxed())) + println!("op_host_poll_worker"); + // TOOO(ry) remove this. + todo!() + /* + let op = async { Ok(serialize_worker_result(Ok(()))) }; + Ok(JsonOp::Async(op.boxed_local())) + */ } fn op_host_close_worker( @@ -239,13 +236,13 @@ fn op_host_close_worker( let state_ = state.clone(); let mut workers_table = state_.workers.lock().unwrap(); - let maybe_worker = workers_table.remove(&id); - if let Some(worker) = maybe_worker { - let channels = worker.state.worker_channels.clone(); - let mut sender = channels.sender.clone(); + let maybe_worker_handle = workers_table.remove(&id); + if let Some(worker_handle) = maybe_worker_handle { + let mut sender = worker_handle.sender.clone(); sender.close_channel(); - let mut receiver = futures::executor::block_on(channels.receiver.lock()); + let mut receiver = + futures::executor::block_on(worker_handle.receiver.lock()); receiver.close(); }; @@ -253,18 +250,22 @@ fn op_host_close_worker( } fn op_host_resume_worker( - state: &ThreadSafeState, - args: Value, + _state: &ThreadSafeState, + _args: Value, _data: Option<ZeroCopyBuf>, ) -> Result<JsonOp, ErrBox> { + // TODO(ry) We are not on the same thread. We cannot just call worker.execute. + // We can only send messages. This needs to be reimplemented somehow. + todo!() + /* let args: WorkerArgs = serde_json::from_value(args)?; let id = args.id as u32; - let state_ = state.clone(); - - let mut workers_table = state_.workers.lock().unwrap(); + let state = state.clone(); + let mut workers_table = state.workers.lock().unwrap(); let worker = workers_table.get_mut(&id).unwrap(); js_check(worker.execute("runWorkerMessageLoop()")); Ok(JsonOp::Sync(json!({}))) + */ } #[derive(Deserialize)] @@ -283,15 +284,13 @@ fn op_host_get_message( let id = args.id as u32; let mut table = state_.workers.lock().unwrap(); // TODO: don't return bad resource anymore - let worker = table.get_mut(&id).ok_or_else(bad_resource)?; - let fut = worker.get_message(); - + let worker_handle = table.get_mut(&id).ok_or_else(bad_resource)?; + let fut = worker_handle.get_message(); let op = async move { let maybe_buf = fut.await.unwrap(); Ok(json!({ "data": maybe_buf })) }; - - Ok(JsonOp::Async(op.boxed())) + Ok(JsonOp::Async(op.boxed_local())) } #[derive(Deserialize)] @@ -312,8 +311,8 @@ fn op_host_post_message( debug!("post message to worker {}", id); let mut table = state.workers.lock().unwrap(); // TODO: don't return bad resource anymore - let worker = table.get_mut(&id).ok_or_else(bad_resource)?; - let fut = worker + let worker_handle = table.get_mut(&id).ok_or_else(bad_resource)?; + let fut = worker_handle .post_message(msg) .map_err(|e| DenoError::new(ErrorKind::Other, e.to_string())); futures::executor::block_on(fut)?; |