diff options
Diffstat (limited to 'cli/ops/worker_host.rs')
-rw-r--r-- | cli/ops/worker_host.rs | 113 |
1 files changed, 7 insertions, 106 deletions
diff --git a/cli/ops/worker_host.rs b/cli/ops/worker_host.rs index c1dcd6aaa..f8b3edfce 100644 --- a/cli/ops/worker_host.rs +++ b/cli/ops/worker_host.rs @@ -4,7 +4,6 @@ use crate::deno_error::bad_resource; use crate::deno_error::js_check; use crate::deno_error::DenoError; use crate::deno_error::ErrorKind; -use crate::fmt_errors::JSError; use crate::ops::dispatch_json::JsonResult; use crate::ops::json_op; use crate::startup_data; @@ -12,11 +11,8 @@ use crate::state::ThreadSafeState; use crate::web_worker::WebWorker; use deno_core::*; use futures; -use futures::channel::mpsc; use futures::future::FutureExt; use futures::future::TryFutureExt; -use futures::sink::SinkExt; -use futures::stream::StreamExt; use std; use std::convert::From; use std::sync::atomic::Ordering; @@ -27,22 +23,10 @@ pub fn init(i: &mut Isolate, s: &ThreadSafeState) { s.core_op(json_op(s.stateful_op(op_create_worker))), ); i.register_op( - "host_get_worker_loaded", - s.core_op(json_op(s.stateful_op(op_host_get_worker_loaded))), - ); - i.register_op( - "host_poll_worker", - s.core_op(json_op(s.stateful_op(op_host_poll_worker))), - ); - i.register_op( "host_close_worker", s.core_op(json_op(s.stateful_op(op_host_close_worker))), ); i.register_op( - "host_resume_worker", - s.core_op(json_op(s.stateful_op(op_host_resume_worker))), - ); - i.register_op( "host_post_message", s.core_op(json_op(s.stateful_op(op_host_post_message))), ); @@ -130,29 +114,21 @@ fn op_create_worker( // Has provided source code, execute immediately. if has_source_code { js_check(worker.execute(&source_code)); - load_sender - .send(Ok(json!({"id": worker_id, "loaded": true}))) - .unwrap(); + load_sender.send(Ok(json!({ "id": worker_id }))).unwrap(); return; } - let (mut sender, receiver) = mpsc::channel::<Result<(), ErrBox>>(1); - - // TODO(bartlomieju): this future should be spawned on the separate thread, - // dedicated to that worker let fut = async move { - let result = worker + let r = worker .execute_mod_async(&module_specifier, None, false) .await; - sender.send(result).await.expect("Failed to send message"); + if r.is_ok() { + let _ = (&mut *worker).await; + } } .boxed_local(); - let mut table = parent_state.loading_workers.lock().unwrap(); - table.insert(worker_id, receiver); - load_sender - .send(Ok(json!({"id": worker_id, "loaded": false}))) - .unwrap(); + load_sender.send(Ok(json!({ "id": worker_id }))).unwrap(); crate::tokio_util::run_basic(fut); }); @@ -162,67 +138,11 @@ fn op_create_worker( Ok(JsonOp::Sync(r.unwrap())) } -fn serialize_worker_result(result: Result<(), ErrBox>) -> Value { - use crate::deno_error::GetErrorKind; - - if let Err(error) = result { - match error.kind() { - ErrorKind::JSError => { - let error = error.downcast::<JSError>().unwrap(); - let exception: V8Exception = error.into(); - json!({"error": { - "message": exception.message, - "fileName": exception.script_resource_name, - "lineNumber": exception.line_number, - "columnNumber": exception.start_column, - }}) - } - _ => json!({"error": { - "message": error.to_string(), - }}), - } - } else { - json!({"ok": true}) - } -} - #[derive(Deserialize)] struct WorkerArgs { id: i32, } -fn op_host_get_worker_loaded( - state: &ThreadSafeState, - args: Value, - _data: Option<ZeroCopyBuf>, -) -> Result<JsonOp, ErrBox> { - let args: WorkerArgs = serde_json::from_value(args)?; - let id = args.id as u32; - let mut table = state.loading_workers.lock().unwrap(); - let mut receiver = table.remove(&id).unwrap(); - - let op = async move { - let result = receiver.next().await.unwrap(); - Ok(serialize_worker_result(result)) - }; - - Ok(JsonOp::Async(op.boxed_local())) -} - -fn op_host_poll_worker( - _state: &ThreadSafeState, - _args: Value, - _data: Option<ZeroCopyBuf>, -) -> Result<JsonOp, ErrBox> { - println!("op_host_poll_worker"); - // TOOO(ry) remove this. - todo!() - /* - let op = async { Ok(serialize_worker_result(Ok(()))) }; - Ok(JsonOp::Async(op.boxed_local())) - */ -} - fn op_host_close_worker( state: &ThreadSafeState, args: Value, @@ -246,25 +166,6 @@ fn op_host_close_worker( Ok(JsonOp::Sync(json!({}))) } -fn op_host_resume_worker( - _state: &ThreadSafeState, - _args: Value, - _data: Option<ZeroCopyBuf>, -) -> Result<JsonOp, ErrBox> { - // TODO(ry) We are not on the same thread. We cannot just call worker.execute. - // We can only send messages. This needs to be reimplemented somehow. - todo!() - /* - let args: WorkerArgs = serde_json::from_value(args)?; - let id = args.id as u32; - let state = state.clone(); - let mut workers_table = state.workers.lock().unwrap(); - let worker = workers_table.get_mut(&id).unwrap(); - js_check(worker.execute("runWorkerMessageLoop()")); - Ok(JsonOp::Sync(json!({}))) - */ -} - #[derive(Deserialize)] struct HostGetMessageArgs { id: i32, @@ -284,7 +185,7 @@ fn op_host_get_message( let worker_handle = table.get_mut(&id).ok_or_else(bad_resource)?; let fut = worker_handle.get_message(); let op = async move { - let maybe_buf = fut.await.unwrap(); + let maybe_buf = fut.await; Ok(json!({ "data": maybe_buf })) }; Ok(JsonOp::Async(op.boxed_local())) |