summaryrefslogtreecommitdiff
path: root/cli/ops
diff options
context:
space:
mode:
Diffstat (limited to 'cli/ops')
-rw-r--r--cli/ops/workers.rs148
1 files changed, 118 insertions, 30 deletions
diff --git a/cli/ops/workers.rs b/cli/ops/workers.rs
index 6ebaa141f..eeffb3930 100644
--- a/cli/ops/workers.rs
+++ b/cli/ops/workers.rs
@@ -4,12 +4,15 @@ use crate::deno_error::bad_resource;
use crate::deno_error::js_check;
use crate::deno_error::DenoError;
use crate::deno_error::ErrorKind;
+use crate::deno_error::GetErrorKind;
+use crate::fmt_errors::JSError;
use crate::ops::json_op;
use crate::startup_data;
use crate::state::ThreadSafeState;
use crate::worker::Worker;
use deno_core::*;
use futures;
+use futures::channel::mpsc;
use futures::future::FutureExt;
use futures::future::TryFutureExt;
use futures::sink::SinkExt;
@@ -19,7 +22,6 @@ use std::convert::From;
use std::future::Future;
use std::pin::Pin;
use std::sync::atomic::Ordering;
-use std::sync::mpsc;
use std::task::Context;
use std::task::Poll;
@@ -29,8 +31,20 @@ pub fn init(i: &mut Isolate, s: &ThreadSafeState) {
s.core_op(json_op(s.stateful_op(op_create_worker))),
);
i.register_op(
- "host_get_worker_closed",
- s.core_op(json_op(s.stateful_op(op_host_get_worker_closed))),
+ "host_get_worker_loaded",
+ s.core_op(json_op(s.stateful_op(op_host_get_worker_loaded))),
+ );
+ i.register_op(
+ "host_poll_worker",
+ s.core_op(json_op(s.stateful_op(op_host_poll_worker))),
+ );
+ i.register_op(
+ "host_close_worker",
+ s.core_op(json_op(s.stateful_op(op_host_close_worker))),
+ );
+ i.register_op(
+ "host_resume_worker",
+ s.core_op(json_op(s.stateful_op(op_host_resume_worker))),
);
i.register_op(
"host_post_message",
@@ -155,37 +169,36 @@ fn op_create_worker(
js_check(worker.execute("workerMain()"));
let worker_id = parent_state.add_child_worker(worker.clone());
- let response = json!(worker_id);
// Has provided source code, execute immediately.
if has_source_code {
js_check(worker.execute(&source_code));
- return Ok(JsonOp::Sync(response));
+ return Ok(JsonOp::Sync(json!({"id": worker_id, "loaded": true})));
}
- // TODO(bartlomieju): this should spawn mod execution on separate tokio task
- // and block on receving message on a channel or even use sync channel /shrug
- let (sender, receiver) = mpsc::sync_channel::<Result<(), ErrBox>>(1);
+ let (mut sender, receiver) = mpsc::channel::<Result<(), ErrBox>>(1);
+
+ // TODO(bartlomieju): this future should be spawned on the separate thread,
+ // dedicated to that worker
let fut = async move {
let result = worker
.execute_mod_async(&module_specifier, None, false)
.await;
- sender.send(result).expect("Failed to send message");
+ sender.send(result).await.expect("Failed to send message");
}
.boxed();
tokio::spawn(fut);
-
- let result = receiver.recv().expect("Failed to receive message");
- result?;
- Ok(JsonOp::Sync(response))
+ let mut table = state.loading_workers.lock().unwrap();
+ table.insert(worker_id, receiver);
+ Ok(JsonOp::Sync(json!({"id": worker_id, "loaded": false})))
}
-struct GetWorkerClosedFuture {
+struct WorkerPollFuture {
state: ThreadSafeState,
rid: ResourceId,
}
-impl Future for GetWorkerClosedFuture {
+impl Future for WorkerPollFuture {
type Output = Result<(), ErrBox>;
fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
@@ -203,39 +216,114 @@ impl Future for GetWorkerClosedFuture {
}
}
+fn serialize_worker_result(result: Result<(), ErrBox>) -> Value {
+ if let Err(error) = result {
+ match error.kind() {
+ ErrorKind::JSError => {
+ let error = error.downcast::<JSError>().unwrap();
+ let exception: V8Exception = error.into();
+ json!({"error": {
+ "message": exception.message,
+ "fileName": exception.script_resource_name,
+ "lineNumber": exception.line_number,
+ "columnNumber": exception.start_column,
+ }})
+ }
+ _ => json!({"error": {
+ "message": error.to_string(),
+ }}),
+ }
+ } else {
+ json!({"ok": true})
+ }
+}
+
#[derive(Deserialize)]
-struct HostGetWorkerClosedArgs {
+struct WorkerArgs {
id: i32,
}
-/// Return when the worker closes
-fn op_host_get_worker_closed(
+fn op_host_get_worker_loaded(
+ state: &ThreadSafeState,
+ args: Value,
+ _data: Option<PinnedBuf>,
+) -> Result<JsonOp, ErrBox> {
+ let args: WorkerArgs = serde_json::from_value(args)?;
+ let id = args.id as u32;
+ let mut table = state.loading_workers.lock().unwrap();
+ let mut receiver = table.remove(&id).unwrap();
+
+ let op = async move {
+ let result = receiver.next().await.unwrap();
+ Ok(serialize_worker_result(result))
+ };
+
+ Ok(JsonOp::Async(op.boxed()))
+}
+
+fn op_host_poll_worker(
state: &ThreadSafeState,
args: Value,
_data: Option<PinnedBuf>,
) -> Result<JsonOp, ErrBox> {
- let args: HostGetWorkerClosedArgs = serde_json::from_value(args)?;
+ let args: WorkerArgs = serde_json::from_value(args)?;
let id = args.id as u32;
let state_ = state.clone();
- let future = GetWorkerClosedFuture {
+ let future = WorkerPollFuture {
state: state.clone(),
rid: id,
};
- let op = future.then(move |_result| {
- let mut workers_table = state_.workers.lock().unwrap();
- let maybe_worker = workers_table.remove(&id);
- if let Some(worker) = maybe_worker {
- let mut channels = worker.state.worker_channels.lock().unwrap();
- channels.sender.close_channel();
- channels.receiver.close();
- };
- futures::future::ok(json!({}))
- });
+ let op = async move {
+ let result = future.await;
+
+ if result.is_err() {
+ let mut workers_table = state_.workers.lock().unwrap();
+ let worker = workers_table.get_mut(&id).unwrap();
+ worker.clear_exception();
+ }
+
+ Ok(serialize_worker_result(result))
+ };
Ok(JsonOp::Async(op.boxed()))
}
+fn op_host_close_worker(
+ state: &ThreadSafeState,
+ args: Value,
+ _data: Option<PinnedBuf>,
+) -> Result<JsonOp, ErrBox> {
+ let args: WorkerArgs = serde_json::from_value(args)?;
+ let id = args.id as u32;
+ let state_ = state.clone();
+
+ let mut workers_table = state_.workers.lock().unwrap();
+ let maybe_worker = workers_table.remove(&id);
+ if let Some(worker) = maybe_worker {
+ let mut channels = worker.state.worker_channels.lock().unwrap();
+ channels.sender.close_channel();
+ channels.receiver.close();
+ };
+
+ Ok(JsonOp::Sync(json!({})))
+}
+
+fn op_host_resume_worker(
+ state: &ThreadSafeState,
+ args: Value,
+ _data: Option<PinnedBuf>,
+) -> Result<JsonOp, ErrBox> {
+ let args: WorkerArgs = serde_json::from_value(args)?;
+ let id = args.id as u32;
+ let state_ = state.clone();
+
+ let mut workers_table = state_.workers.lock().unwrap();
+ let worker = workers_table.get_mut(&id).unwrap();
+ js_check(worker.execute("workerMain()"));
+ Ok(JsonOp::Sync(json!({})))
+}
+
#[derive(Deserialize)]
struct HostGetMessageArgs {
id: i32,