summaryrefslogtreecommitdiff
path: root/cli/ops
diff options
context:
space:
mode:
Diffstat (limited to 'cli/ops')
-rw-r--r--cli/ops/runtime.rs19
-rw-r--r--cli/ops/web_worker.rs80
-rw-r--r--cli/ops/worker_host.rs351
3 files changed, 294 insertions, 156 deletions
diff --git a/cli/ops/runtime.rs b/cli/ops/runtime.rs
index a962f4e83..7773e461c 100644
--- a/cli/ops/runtime.rs
+++ b/cli/ops/runtime.rs
@@ -8,6 +8,7 @@ use crate::version;
use crate::DenoSubcommand;
use deno_core::*;
use std::env;
+use std::sync::atomic::Ordering;
/// BUILD_OS and BUILD_ARCH match the values in Deno.build. See js/build.ts.
#[cfg(target_os = "macos")]
@@ -21,6 +22,7 @@ static BUILD_ARCH: &str = "x64";
pub fn init(i: &mut Isolate, s: &State) {
i.register_op("start", s.core_op(json_op(s.stateful_op(op_start))));
+ i.register_op("metrics", s.core_op(json_op(s.stateful_op(op_metrics))));
}
fn op_start(
@@ -47,3 +49,20 @@ fn op_start(
"arch": BUILD_ARCH,
})))
}
+
+fn op_metrics(
+ state: &State,
+ _args: Value,
+ _zero_copy: Option<ZeroCopyBuf>,
+) -> Result<JsonOp, ErrBox> {
+ let state = state.borrow();
+ let m = &state.metrics;
+
+ Ok(JsonOp::Sync(json!({
+ "opsDispatched": m.ops_dispatched.load(Ordering::SeqCst) as u64,
+ "opsCompleted": m.ops_completed.load(Ordering::SeqCst) as u64,
+ "bytesSentControl": m.bytes_sent_control.load(Ordering::SeqCst) as u64,
+ "bytesSentData": m.bytes_sent_data.load(Ordering::SeqCst) as u64,
+ "bytesReceived": m.bytes_received.load(Ordering::SeqCst) as u64
+ })))
+}
diff --git a/cli/ops/web_worker.rs b/cli/ops/web_worker.rs
index ae6b10abc..e22c0f221 100644
--- a/cli/ops/web_worker.rs
+++ b/cli/ops/web_worker.rs
@@ -1,65 +1,65 @@
// Copyright 2018-2020 the Deno authors. All rights reserved. MIT license.
use super::dispatch_json::{JsonOp, Value};
-use crate::deno_error::DenoError;
-use crate::deno_error::ErrorKind;
use crate::ops::json_op;
use crate::state::State;
+use crate::worker::WorkerEvent;
use deno_core::*;
use futures;
-use futures::future::FutureExt;
+use futures::channel::mpsc;
+use futures::sink::SinkExt;
use std;
use std::convert::From;
-pub fn init(i: &mut Isolate, s: &State) {
+pub fn web_worker_op<D>(
+ sender: mpsc::Sender<WorkerEvent>,
+ dispatcher: D,
+) -> impl Fn(Value, Option<ZeroCopyBuf>) -> Result<JsonOp, ErrBox>
+where
+ D: Fn(
+ &mpsc::Sender<WorkerEvent>,
+ Value,
+ Option<ZeroCopyBuf>,
+ ) -> Result<JsonOp, ErrBox>,
+{
+ move |args: Value, zero_copy: Option<ZeroCopyBuf>| -> Result<JsonOp, ErrBox> {
+ dispatcher(&sender, args, zero_copy)
+ }
+}
+
+pub fn init(i: &mut Isolate, s: &State, sender: &mpsc::Sender<WorkerEvent>) {
i.register_op(
"worker_post_message",
- s.core_op(json_op(s.stateful_op(op_worker_post_message))),
+ s.core_op(json_op(web_worker_op(
+ sender.clone(),
+ op_worker_post_message,
+ ))),
);
i.register_op(
- "worker_get_message",
- s.core_op(json_op(s.stateful_op(op_worker_get_message))),
+ "worker_close",
+ s.core_op(json_op(web_worker_op(sender.clone(), op_worker_close))),
);
}
-/// Get message from host as guest worker
-fn op_worker_get_message(
- state: &State,
- _args: Value,
- _data: Option<ZeroCopyBuf>,
-) -> Result<JsonOp, ErrBox> {
- let state_ = state.clone();
- let op = async move {
- let fut = {
- let state = state_.borrow();
- state
- .worker_channels_internal
- .as_ref()
- .unwrap()
- .get_message()
- };
- let maybe_buf = fut.await;
- debug!("op_worker_get_message");
- Ok(json!({ "data": maybe_buf }))
- };
-
- Ok(JsonOp::Async(op.boxed_local()))
-}
-
/// Post message to host as guest worker
fn op_worker_post_message(
- state: &State,
+ sender: &mpsc::Sender<WorkerEvent>,
_args: Value,
data: Option<ZeroCopyBuf>,
) -> Result<JsonOp, ErrBox> {
let d = Vec::from(data.unwrap().as_ref()).into_boxed_slice();
- let state = state.borrow();
- let fut = state
- .worker_channels_internal
- .as_ref()
- .unwrap()
- .post_message(d);
- futures::executor::block_on(fut)
- .map_err(|e| DenoError::new(ErrorKind::Other, e.to_string()))?;
+ let mut sender = sender.clone();
+ let fut = sender.send(WorkerEvent::Message(d));
+ futures::executor::block_on(fut).expect("Failed to post message to host");
+ Ok(JsonOp::Sync(json!({})))
+}
+/// Notify host that guest worker closes
+fn op_worker_close(
+ sender: &mpsc::Sender<WorkerEvent>,
+ _args: Value,
+ _data: Option<ZeroCopyBuf>,
+) -> Result<JsonOp, ErrBox> {
+ let mut sender = sender.clone();
+ sender.close_channel();
Ok(JsonOp::Sync(json!({})))
}
diff --git a/cli/ops/worker_host.rs b/cli/ops/worker_host.rs
index fabe0b5e8..4f6f996ee 100644
--- a/cli/ops/worker_host.rs
+++ b/cli/ops/worker_host.rs
@@ -1,21 +1,29 @@
// Copyright 2018-2020 the Deno authors. All rights reserved. MIT license.
use super::dispatch_json::{Deserialize, JsonOp, Value};
-use crate::deno_error::bad_resource;
-use crate::deno_error::js_check;
use crate::deno_error::DenoError;
use crate::deno_error::ErrorKind;
+use crate::deno_error::GetErrorKind;
+use crate::fmt_errors::JSError;
+use crate::futures::SinkExt;
+use crate::global_state::GlobalState;
use crate::ops::json_op;
+use crate::permissions::DenoPermissions;
use crate::startup_data;
use crate::state::State;
+use crate::tokio_util::create_basic_runtime;
use crate::web_worker::WebWorker;
-use crate::worker::WorkerChannelsExternal;
+use crate::worker::Worker;
+use crate::worker::WorkerEvent;
+use crate::worker::WorkerHandle;
use deno_core::*;
use futures;
+use futures::future::poll_fn;
use futures::future::FutureExt;
use futures::future::TryFutureExt;
+use futures::stream::StreamExt;
use std;
use std::convert::From;
-use std::sync::atomic::Ordering;
+use std::task::Poll;
pub fn init(i: &mut Isolate, s: &State) {
i.register_op(
@@ -23,8 +31,8 @@ pub fn init(i: &mut Isolate, s: &State) {
s.core_op(json_op(s.stateful_op(op_create_worker))),
);
i.register_op(
- "host_close_worker",
- s.core_op(json_op(s.stateful_op(op_host_close_worker))),
+ "host_terminate_worker",
+ s.core_op(json_op(s.stateful_op(op_host_terminate_worker))),
);
i.register_op(
"host_post_message",
@@ -34,7 +42,159 @@ pub fn init(i: &mut Isolate, s: &State) {
"host_get_message",
s.core_op(json_op(s.stateful_op(op_host_get_message))),
);
- i.register_op("metrics", s.core_op(json_op(s.stateful_op(op_metrics))));
+}
+
+fn create_web_worker(
+ name: String,
+ global_state: GlobalState,
+ permissions: DenoPermissions,
+ specifier: ModuleSpecifier,
+) -> Result<WebWorker, ErrBox> {
+ let state =
+ State::new_for_worker(global_state, Some(permissions), specifier)?;
+
+ let mut worker =
+ WebWorker::new(name.to_string(), startup_data::deno_isolate_init(), state);
+ let script = format!("bootstrapWorkerRuntime(\"{}\")", name);
+ worker.execute(&script)?;
+
+ Ok(worker)
+}
+
+// TODO(bartlomieju): this function should probably live in `cli/web_worker.rs`
+pub fn run_worker_loop(
+ rt: &mut tokio::runtime::Runtime,
+ worker: &mut Worker,
+) -> Result<(), ErrBox> {
+ let mut worker_is_ready = false;
+
+ let fut = poll_fn(|cx| -> Poll<Result<(), ErrBox>> {
+ if !worker_is_ready {
+ match worker.poll_unpin(cx) {
+ Poll::Ready(r) => {
+ if let Err(e) = r {
+ let mut sender = worker.internal_channels.sender.clone();
+ futures::executor::block_on(sender.send(WorkerEvent::Error(e)))
+ .expect("Failed to post message to host");
+ }
+ worker_is_ready = true;
+ }
+ Poll::Pending => {}
+ }
+ }
+
+ let maybe_msg = {
+ match worker.internal_channels.receiver.poll_next_unpin(cx) {
+ Poll::Ready(r) => match r {
+ Some(msg) => {
+ let msg_str = String::from_utf8(msg.to_vec()).unwrap();
+ debug!("received message from host: {}", msg_str);
+ Some(msg_str)
+ }
+ None => {
+ debug!("channel closed by host, worker event loop shuts down");
+ return Poll::Ready(Ok(()));
+ }
+ },
+ Poll::Pending => None,
+ }
+ };
+
+ if let Some(msg) = maybe_msg {
+ // TODO: just add second value and then bind using rusty_v8
+ // to get structured clone/transfer working
+ let script = format!("workerMessageRecvCallback({})", msg);
+ worker
+ .execute(&script)
+ .expect("Failed to execute message cb");
+ // Let worker be polled again
+ worker_is_ready = false;
+ worker.waker.wake();
+ }
+
+ Poll::Pending
+ });
+
+ rt.block_on(fut)
+}
+
+// TODO(bartlomieju): this function should probably live in `cli/web_worker.rs`
+// TODO(bartlomieju): check if order of actions is aligned to Worker spec
+fn run_worker_thread(
+ name: String,
+ global_state: GlobalState,
+ permissions: DenoPermissions,
+ specifier: ModuleSpecifier,
+ has_source_code: bool,
+ source_code: String,
+) -> Result<WorkerHandle, ErrBox> {
+ let (handle_sender, handle_receiver) =
+ std::sync::mpsc::sync_channel::<Result<WorkerHandle, ErrBox>>(1);
+
+ let builder =
+ std::thread::Builder::new().name(format!("deno-worker-{}", name));
+ // TODO(bartlomieju): store JoinHandle as well
+ builder.spawn(move || {
+ // Any error inside this block is terminal:
+ // - JS worker is useless - meaning it throws an exception and can't do anything else,
+ // all action done upon it should be noops
+ // - newly spawned thread exits
+ let result =
+ create_web_worker(name, global_state, permissions, specifier.clone());
+
+ if let Err(err) = result {
+ handle_sender.send(Err(err)).unwrap();
+ return;
+ }
+
+ let mut worker = result.unwrap();
+ // Send thread safe handle to newly created worker to host thread
+ handle_sender.send(Ok(worker.thread_safe_handle())).unwrap();
+ drop(handle_sender);
+
+ // At this point the only method of communication with host
+ // is using `worker.internal_channels`.
+ //
+ // Host can already push messages and interact with worker.
+ //
+ // Next steps:
+ // - create tokio runtime
+ // - load provided module or code
+ // - start driving worker's event loop
+
+ let mut rt = create_basic_runtime();
+
+ // TODO: run with using select with terminate
+
+ // Execute provided source code immediately
+ let result = if has_source_code {
+ worker.execute(&source_code)
+ } else {
+ // TODO(bartlomieju): add "type": "classic", ie. ability to load
+ // script instead of module
+ let load_future = worker
+ .execute_mod_async(&specifier, None, false)
+ .boxed_local();
+
+ rt.block_on(load_future)
+ };
+
+ if let Err(e) = result {
+ let mut sender = worker.internal_channels.sender.clone();
+ futures::executor::block_on(sender.send(WorkerEvent::Error(e)))
+ .expect("Failed to post message to host");
+
+ // Failure to execute script is a terminal error, bye, bye.
+ return;
+ }
+
+ // TODO(bartlomieju): this thread should return result of event loop
+ // that means that we should store JoinHandle to thread to ensure
+ // that it actually terminates.
+ run_worker_loop(&mut rt, &mut worker).expect("Panic in event loop");
+ })?;
+
+ handle_receiver.recv().unwrap()
}
#[derive(Deserialize)]
@@ -61,72 +221,28 @@ fn op_create_worker(
let parent_state = state.clone();
let state = state.borrow();
let global_state = state.global_state.clone();
- let child_permissions = state.permissions.clone();
+ let permissions = state.permissions.clone();
let referrer = state.main_module.to_string();
drop(state);
- let (handle_sender, handle_receiver) =
- std::sync::mpsc::sync_channel::<Result<WorkerChannelsExternal, ErrBox>>(1);
-
- // TODO(bartlomieju): Isn't this wrong?
- let result = ModuleSpecifier::resolve_url_or_path(&specifier)?;
- let module_specifier = if !has_source_code {
- ModuleSpecifier::resolve_import(&specifier, &referrer)?
- } else {
- result
- };
-
- std::thread::spawn(move || {
- let result = State::new_for_worker(
- global_state,
- Some(child_permissions), // by default share with parent
- module_specifier.clone(),
- );
- if let Err(err) = result {
- handle_sender.send(Err(err)).unwrap();
- return;
- }
- let child_state = result.unwrap();
- let worker_name = args_name.unwrap_or_else(|| {
- // TODO(bartlomieju): change it to something more descriptive
- format!("USER-WORKER-{}", specifier)
- });
-
- // TODO: add a new option to make child worker not sharing permissions
- // with parent (aka .clone(), requests from child won't reflect in parent)
- let mut worker = WebWorker::new(
- worker_name.to_string(),
- startup_data::deno_isolate_init(),
- child_state,
- );
- let script = format!("bootstrapWorkerRuntime(\"{}\")", worker_name);
- js_check(worker.execute(&script));
- js_check(worker.execute("runWorkerMessageLoop()"));
-
- handle_sender.send(Ok(worker.thread_safe_handle())).unwrap();
-
- // Has provided source code, execute immediately.
- if has_source_code {
- js_check(worker.execute(&source_code));
- // FIXME(bartlomieju): runtime is not run in this case
- return;
- }
-
- let fut = async move {
- let r = worker
- .execute_mod_async(&module_specifier, None, false)
- .await;
- if r.is_ok() {
- let _ = (&mut *worker).await;
- }
- }
- .boxed_local();
-
- crate::tokio_util::run_basic(fut);
+ let module_specifier =
+ ModuleSpecifier::resolve_import(&specifier, &referrer)?;
+ let worker_name = args_name.unwrap_or_else(|| {
+ // TODO(bartlomieju): change it to something more descriptive
+ format!("USER-WORKER-{}", specifier)
});
- let handle = handle_receiver.recv().unwrap()?;
- let worker_id = parent_state.add_child_worker(handle);
+ let worker_handle = run_worker_thread(
+ worker_name,
+ global_state,
+ permissions,
+ module_specifier,
+ has_source_code,
+ source_code,
+ )?;
+ // At this point all interactions with worker happen using thread
+ // safe handler returned from previous function call
+ let worker_id = parent_state.add_child_worker(worker_handle);
Ok(JsonOp::Sync(json!({ "id": worker_id })))
}
@@ -136,7 +252,7 @@ struct WorkerArgs {
id: i32,
}
-fn op_host_close_worker(
+fn op_host_terminate_worker(
state: &State,
args: Value,
_data: Option<ZeroCopyBuf>,
@@ -144,23 +260,37 @@ fn op_host_close_worker(
let args: WorkerArgs = serde_json::from_value(args)?;
let id = args.id as u32;
let mut state = state.borrow_mut();
-
- let maybe_worker_handle = state.workers.remove(&id);
- if let Some(worker_handle) = maybe_worker_handle {
- let mut sender = worker_handle.sender.clone();
- sender.close_channel();
-
- let mut receiver =
- futures::executor::block_on(worker_handle.receiver.lock());
- receiver.close();
- };
-
+ let worker_handle =
+ state.workers.remove(&id).expect("No worker handle found");
+ worker_handle.terminate();
Ok(JsonOp::Sync(json!({})))
}
-#[derive(Deserialize)]
-struct HostGetMessageArgs {
- id: i32,
+fn serialize_worker_event(event: WorkerEvent) -> Value {
+ match event {
+ WorkerEvent::Message(buf) => json!({ "type": "msg", "data": buf }),
+ WorkerEvent::Error(error) => match error.kind() {
+ ErrorKind::JSError => {
+ let error = error.downcast::<JSError>().unwrap();
+ let exception: V8Exception = error.into();
+ json!({
+ "type": "error",
+ "error": {
+ "message": exception.message,
+ "fileName": exception.script_resource_name,
+ "lineNumber": exception.line_number,
+ "columnNumber": exception.start_column,
+ }
+ })
+ }
+ _ => json!({
+ "type": "error",
+ "error": {
+ "message": error.to_string(),
+ }
+ }),
+ },
+ }
}
/// Get message from guest worker as host
@@ -169,59 +299,48 @@ fn op_host_get_message(
args: Value,
_data: Option<ZeroCopyBuf>,
) -> Result<JsonOp, ErrBox> {
- let args: HostGetMessageArgs = serde_json::from_value(args)?;
+ let args: WorkerArgs = serde_json::from_value(args)?;
let id = args.id as u32;
-
- let state = state.borrow();
- // TODO: don't return bad resource anymore
- let worker_handle = state.workers.get(&id).ok_or_else(bad_resource)?;
- let fut = worker_handle.get_message();
+ let state_ = state.borrow();
+ let worker_handle = state_
+ .workers
+ .get(&id)
+ .expect("No worker handle found")
+ .clone();
+ let state_ = state.clone();
let op = async move {
- let maybe_buf = fut.await;
- Ok(json!({ "data": maybe_buf }))
+ let response = match worker_handle.get_event().await {
+ Some(event) => serialize_worker_event(event),
+ None => {
+ let mut state_ = state_.borrow_mut();
+ let mut handle =
+ state_.workers.remove(&id).expect("No worker handle found");
+ handle.sender.close_channel();
+ // TODO(bartlomieju): join thread handle here
+ json!({ "type": "close" })
+ }
+ };
+ Ok(response)
};
Ok(JsonOp::Async(op.boxed_local()))
}
-#[derive(Deserialize)]
-struct HostPostMessageArgs {
- id: i32,
-}
-
/// Post message to guest worker as host
fn op_host_post_message(
state: &State,
args: Value,
data: Option<ZeroCopyBuf>,
) -> Result<JsonOp, ErrBox> {
- let args: HostPostMessageArgs = serde_json::from_value(args)?;
+ let args: WorkerArgs = serde_json::from_value(args)?;
let id = args.id as u32;
let msg = Vec::from(data.unwrap().as_ref()).into_boxed_slice();
debug!("post message to worker {}", id);
let state = state.borrow();
- // TODO: don't return bad resource anymore
- let worker_handle = state.workers.get(&id).ok_or_else(bad_resource)?;
+ let worker_handle = state.workers.get(&id).expect("No worker handle found");
let fut = worker_handle
.post_message(msg)
.map_err(|e| DenoError::new(ErrorKind::Other, e.to_string()));
futures::executor::block_on(fut)?;
Ok(JsonOp::Sync(json!({})))
}
-
-fn op_metrics(
- state: &State,
- _args: Value,
- _zero_copy: Option<ZeroCopyBuf>,
-) -> Result<JsonOp, ErrBox> {
- let state = state.borrow();
- let m = &state.metrics;
-
- Ok(JsonOp::Sync(json!({
- "opsDispatched": m.ops_dispatched.load(Ordering::SeqCst) as u64,
- "opsCompleted": m.ops_completed.load(Ordering::SeqCst) as u64,
- "bytesSentControl": m.bytes_sent_control.load(Ordering::SeqCst) as u64,
- "bytesSentData": m.bytes_sent_data.load(Ordering::SeqCst) as u64,
- "bytesReceived": m.bytes_received.load(Ordering::SeqCst) as u64
- })))
-}