diff options
Diffstat (limited to 'cli/ops/worker_host.rs')
-rw-r--r-- | cli/ops/worker_host.rs | 225 |
1 files changed, 112 insertions, 113 deletions
diff --git a/cli/ops/worker_host.rs b/cli/ops/worker_host.rs index a1509d2f7..519294314 100644 --- a/cli/ops/worker_host.rs +++ b/cli/ops/worker_host.rs @@ -5,6 +5,7 @@ use crate::deno_error::js_check; use crate::deno_error::DenoError; use crate::deno_error::ErrorKind; use crate::fmt_errors::JSError; +use crate::ops::dispatch_json::JsonResult; use crate::ops::json_op; use crate::startup_data; use crate::state::ThreadSafeState; @@ -18,11 +19,7 @@ use futures::sink::SinkExt; use futures::stream::StreamExt; use std; use std::convert::From; -use std::future::Future; -use std::pin::Pin; use std::sync::atomic::Ordering; -use std::task::Context; -use std::task::Poll; pub fn init(i: &mut Isolate, s: &ThreadSafeState) { i.register_op( @@ -73,93 +70,99 @@ fn op_create_worker( ) -> Result<JsonOp, ErrBox> { let args: CreateWorkerArgs = serde_json::from_value(args)?; - let specifier = args.specifier.as_ref(); + let specifier = args.specifier.clone(); let has_source_code = args.has_source_code; - let source_code = args.source_code; - + let source_code = args.source_code.clone(); + let args_name = args.name; let parent_state = state.clone(); - // TODO(bartlomieju): Isn't this wrong? - let mut module_specifier = ModuleSpecifier::resolve_url_or_path(specifier)?; - if !has_source_code { - if let Some(referrer) = parent_state.main_module.as_ref() { - let referrer = referrer.clone().to_string(); - module_specifier = ModuleSpecifier::resolve_import(specifier, &referrer)?; + let (load_sender, load_receiver) = + std::sync::mpsc::sync_channel::<JsonResult>(1); + + std::thread::spawn(move || { + // TODO(bartlomieju): Isn't this wrong? + let result = ModuleSpecifier::resolve_url_or_path(&specifier); + if let Err(err) = result { + load_sender.send(Err(err.into())).unwrap(); + return; + } + let mut module_specifier = result.unwrap(); + if !has_source_code { + if let Some(referrer) = parent_state.main_module.as_ref() { + let referrer = referrer.clone().to_string(); + let result = ModuleSpecifier::resolve_import(&specifier, &referrer); + if let Err(err) = result { + load_sender.send(Err(err.into())).unwrap(); + return; + } + module_specifier = result.unwrap(); + } } - } - let (int, ext) = ThreadSafeState::create_channels(); - let child_state = ThreadSafeState::new_for_worker( - state.global_state.clone(), - Some(parent_state.permissions.clone()), // by default share with parent - Some(module_specifier.clone()), - int, - )?; - let worker_name = if let Some(name) = args.name { - name - } else { - // TODO(bartlomieju): change it to something more descriptive - format!("USER-WORKER-{}", specifier) - }; + let (int, ext) = ThreadSafeState::create_channels(); + let result = ThreadSafeState::new_for_worker( + parent_state.global_state.clone(), + Some(parent_state.permissions.clone()), // by default share with parent + Some(module_specifier.clone()), + int, + ); + if let Err(err) = result { + load_sender.send(Err(err)).unwrap(); + return; + } + let child_state = result.unwrap(); + let worker_name = args_name.unwrap_or_else(|| { + // TODO(bartlomieju): change it to something more descriptive + format!("USER-WORKER-{}", specifier) + }); + + // TODO: add a new option to make child worker not sharing permissions + // with parent (aka .clone(), requests from child won't reflect in parent) + let mut worker = WebWorker::new( + worker_name.to_string(), + startup_data::deno_isolate_init(), + child_state, + ext, + ); + let script = format!("bootstrapWorkerRuntime(\"{}\")", worker_name); + js_check(worker.execute(&script)); + js_check(worker.execute("runWorkerMessageLoop()")); + + let worker_id = parent_state.add_child_worker(&worker); + + // Has provided source code, execute immediately. + if has_source_code { + js_check(worker.execute(&source_code)); + load_sender + .send(Ok(json!({"id": worker_id, "loaded": true}))) + .unwrap(); + return; + } - // TODO: add a new option to make child worker not sharing permissions - // with parent (aka .clone(), requests from child won't reflect in parent) - let mut worker = WebWorker::new( - worker_name.to_string(), - startup_data::deno_isolate_init(), - child_state, - ext, - ); - let script = format!("bootstrapWorkerRuntime(\"{}\")", worker_name); - js_check(worker.execute(&script)); - js_check(worker.execute("runWorkerMessageLoop()")); + let (mut sender, receiver) = mpsc::channel::<Result<(), ErrBox>>(1); - let worker_id = parent_state.add_child_worker(worker.clone()); + // TODO(bartlomieju): this future should be spawned on the separate thread, + // dedicated to that worker + let fut = async move { + let result = worker + .execute_mod_async(&module_specifier, None, false) + .await; + sender.send(result).await.expect("Failed to send message"); + } + .boxed_local(); + let mut table = parent_state.loading_workers.lock().unwrap(); + table.insert(worker_id, receiver); - // Has provided source code, execute immediately. - if has_source_code { - js_check(worker.execute(&source_code)); - return Ok(JsonOp::Sync(json!({"id": worker_id, "loaded": true}))); - } + load_sender + .send(Ok(json!({"id": worker_id, "loaded": false}))) + .unwrap(); - let (mut sender, receiver) = mpsc::channel::<Result<(), ErrBox>>(1); + crate::tokio_util::run_basic(fut); + }); - // TODO(bartlomieju): this future should be spawned on the separate thread, - // dedicated to that worker - let fut = async move { - let result = worker - .execute_mod_async(&module_specifier, None, false) - .await; - sender.send(result).await.expect("Failed to send message"); - } - .boxed(); - tokio::spawn(fut); - let mut table = state.loading_workers.lock().unwrap(); - table.insert(worker_id, receiver); - Ok(JsonOp::Sync(json!({"id": worker_id, "loaded": false}))) -} + let r = load_receiver.recv().unwrap(); -struct WorkerPollFuture { - state: ThreadSafeState, - rid: ResourceId, -} - -impl Future for WorkerPollFuture { - type Output = Result<(), ErrBox>; - - fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> { - let inner = self.get_mut(); - let mut workers_table = inner.state.workers.lock().unwrap(); - let maybe_worker = workers_table.get_mut(&inner.rid); - if maybe_worker.is_none() { - return Poll::Ready(Ok(())); - } - match maybe_worker.unwrap().poll_unpin(cx) { - Poll::Ready(Err(e)) => Poll::Ready(Err(e)), - Poll::Ready(Ok(_)) => Poll::Ready(Ok(())), - Poll::Pending => Poll::Pending, - } - } + Ok(JsonOp::Sync(r.unwrap())) } fn serialize_worker_result(result: Result<(), ErrBox>) -> Value { @@ -206,27 +209,21 @@ fn op_host_get_worker_loaded( Ok(serialize_worker_result(result)) }; - Ok(JsonOp::Async(op.boxed())) + Ok(JsonOp::Async(op.boxed_local())) } fn op_host_poll_worker( - state: &ThreadSafeState, - args: Value, + _state: &ThreadSafeState, + _args: Value, _data: Option<ZeroCopyBuf>, ) -> Result<JsonOp, ErrBox> { - let args: WorkerArgs = serde_json::from_value(args)?; - let id = args.id as u32; - - let future = WorkerPollFuture { - state: state.clone(), - rid: id, - }; - - let op = async move { - let result = future.await; - Ok(serialize_worker_result(result)) - }; - Ok(JsonOp::Async(op.boxed())) + println!("op_host_poll_worker"); + // TOOO(ry) remove this. + todo!() + /* + let op = async { Ok(serialize_worker_result(Ok(()))) }; + Ok(JsonOp::Async(op.boxed_local())) + */ } fn op_host_close_worker( @@ -239,13 +236,13 @@ fn op_host_close_worker( let state_ = state.clone(); let mut workers_table = state_.workers.lock().unwrap(); - let maybe_worker = workers_table.remove(&id); - if let Some(worker) = maybe_worker { - let channels = worker.state.worker_channels.clone(); - let mut sender = channels.sender.clone(); + let maybe_worker_handle = workers_table.remove(&id); + if let Some(worker_handle) = maybe_worker_handle { + let mut sender = worker_handle.sender.clone(); sender.close_channel(); - let mut receiver = futures::executor::block_on(channels.receiver.lock()); + let mut receiver = + futures::executor::block_on(worker_handle.receiver.lock()); receiver.close(); }; @@ -253,18 +250,22 @@ fn op_host_close_worker( } fn op_host_resume_worker( - state: &ThreadSafeState, - args: Value, + _state: &ThreadSafeState, + _args: Value, _data: Option<ZeroCopyBuf>, ) -> Result<JsonOp, ErrBox> { + // TODO(ry) We are not on the same thread. We cannot just call worker.execute. + // We can only send messages. This needs to be reimplemented somehow. + todo!() + /* let args: WorkerArgs = serde_json::from_value(args)?; let id = args.id as u32; - let state_ = state.clone(); - - let mut workers_table = state_.workers.lock().unwrap(); + let state = state.clone(); + let mut workers_table = state.workers.lock().unwrap(); let worker = workers_table.get_mut(&id).unwrap(); js_check(worker.execute("runWorkerMessageLoop()")); Ok(JsonOp::Sync(json!({}))) + */ } #[derive(Deserialize)] @@ -283,15 +284,13 @@ fn op_host_get_message( let id = args.id as u32; let mut table = state_.workers.lock().unwrap(); // TODO: don't return bad resource anymore - let worker = table.get_mut(&id).ok_or_else(bad_resource)?; - let fut = worker.get_message(); - + let worker_handle = table.get_mut(&id).ok_or_else(bad_resource)?; + let fut = worker_handle.get_message(); let op = async move { let maybe_buf = fut.await.unwrap(); Ok(json!({ "data": maybe_buf })) }; - - Ok(JsonOp::Async(op.boxed())) + Ok(JsonOp::Async(op.boxed_local())) } #[derive(Deserialize)] @@ -312,8 +311,8 @@ fn op_host_post_message( debug!("post message to worker {}", id); let mut table = state.workers.lock().unwrap(); // TODO: don't return bad resource anymore - let worker = table.get_mut(&id).ok_or_else(bad_resource)?; - let fut = worker + let worker_handle = table.get_mut(&id).ok_or_else(bad_resource)?; + let fut = worker_handle .post_message(msg) .map_err(|e| DenoError::new(ErrorKind::Other, e.to_string())); futures::executor::block_on(fut)?; |