summaryrefslogtreecommitdiff
path: root/cli/ops
diff options
context:
space:
mode:
authorBartek IwaƄczuk <biwanczuk@gmail.com>2020-01-21 09:49:47 +0100
committerGitHub <noreply@github.com>2020-01-21 09:49:47 +0100
commit7966bf14c062a05b1606a62c996890571454ecc8 (patch)
tree65bede64b47707c3accc80d0bb18e99840c639f7 /cli/ops
parentc90036ab88bb1ae6b9c87d5e368f56d8c8afab69 (diff)
refactor: split worker and worker host logic (#3722)
* split ops/worker.rs into ops/worker_host.rs and ops/web_worker.rs * refactor js/workers.ts and factor out js/worker_main.ts - entry point for WebWorker runtime * BREAKING CHANGE: remove support for blob: URL in Worker * BREAKING CHANGE: remove Deno namespace support and noDenoNamespace option in Worker constructor * introduce WebWorker struct which is a stripped down version of cli::Worker
Diffstat (limited to 'cli/ops')
-rw-r--r--cli/ops/mod.rs3
-rw-r--r--cli/ops/web_worker.rs77
-rw-r--r--cli/ops/worker_host.rs (renamed from cli/ops/workers.rs)55
3 files changed, 81 insertions, 54 deletions
diff --git a/cli/ops/mod.rs b/cli/ops/mod.rs
index f93c5a060..203d1e17e 100644
--- a/cli/ops/mod.rs
+++ b/cli/ops/mod.rs
@@ -23,4 +23,5 @@ pub mod repl;
pub mod resources;
pub mod timers;
pub mod tls;
-pub mod workers;
+pub mod web_worker;
+pub mod worker_host;
diff --git a/cli/ops/web_worker.rs b/cli/ops/web_worker.rs
new file mode 100644
index 000000000..300a0dfd1
--- /dev/null
+++ b/cli/ops/web_worker.rs
@@ -0,0 +1,77 @@
+// Copyright 2018-2020 the Deno authors. All rights reserved. MIT license.
+use super::dispatch_json::{JsonOp, Value};
+use crate::deno_error::DenoError;
+use crate::deno_error::ErrorKind;
+use crate::ops::json_op;
+use crate::state::ThreadSafeState;
+use deno_core::*;
+use futures;
+use futures::future::FutureExt;
+use futures::sink::SinkExt;
+use futures::stream::StreamExt;
+use std;
+use std::convert::From;
+use std::future::Future;
+use std::pin::Pin;
+use std::task::Context;
+use std::task::Poll;
+
+pub fn init(i: &mut Isolate, s: &ThreadSafeState) {
+ i.register_op(
+ "worker_post_message",
+ s.core_op(json_op(s.stateful_op(op_worker_post_message))),
+ );
+ i.register_op(
+ "worker_get_message",
+ s.core_op(json_op(s.stateful_op(op_worker_get_message))),
+ );
+}
+
+struct GetMessageFuture {
+ state: ThreadSafeState,
+}
+
+impl Future for GetMessageFuture {
+ type Output = Option<Buf>;
+
+ fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
+ let inner = self.get_mut();
+ let mut channels = inner.state.worker_channels.lock().unwrap();
+ let receiver = &mut channels.receiver;
+ receiver.poll_next_unpin(cx)
+ }
+}
+
+/// Get message from host as guest worker
+fn op_worker_get_message(
+ state: &ThreadSafeState,
+ _args: Value,
+ _data: Option<PinnedBuf>,
+) -> Result<JsonOp, ErrBox> {
+ let op = GetMessageFuture {
+ state: state.clone(),
+ };
+
+ let op = async move {
+ let maybe_buf = op.await;
+ debug!("op_worker_get_message");
+ Ok(json!({ "data": maybe_buf }))
+ };
+
+ Ok(JsonOp::Async(op.boxed()))
+}
+
+/// Post message to host as guest worker
+fn op_worker_post_message(
+ state: &ThreadSafeState,
+ _args: Value,
+ data: Option<PinnedBuf>,
+) -> Result<JsonOp, ErrBox> {
+ let d = Vec::from(data.unwrap().as_ref()).into_boxed_slice();
+ let mut channels = state.worker_channels.lock().unwrap();
+ let sender = &mut channels.sender;
+ futures::executor::block_on(sender.send(d))
+ .map_err(|e| DenoError::new(ErrorKind::Other, e.to_string()))?;
+
+ Ok(JsonOp::Sync(json!({})))
+}
diff --git a/cli/ops/workers.rs b/cli/ops/worker_host.rs
index eeffb3930..c17dee444 100644
--- a/cli/ops/workers.rs
+++ b/cli/ops/worker_host.rs
@@ -9,7 +9,7 @@ use crate::fmt_errors::JSError;
use crate::ops::json_op;
use crate::startup_data;
use crate::state::ThreadSafeState;
-use crate::worker::Worker;
+use crate::web_worker::WebWorker;
use deno_core::*;
use futures;
use futures::channel::mpsc;
@@ -54,15 +54,6 @@ pub fn init(i: &mut Isolate, s: &ThreadSafeState) {
"host_get_message",
s.core_op(json_op(s.stateful_op(op_host_get_message))),
);
- // TODO: make sure these two ops are only accessible to appropriate Worker
- i.register_op(
- "worker_post_message",
- s.core_op(json_op(s.stateful_op(op_worker_post_message))),
- );
- i.register_op(
- "worker_get_message",
- s.core_op(json_op(s.stateful_op(op_worker_get_message))),
- );
i.register_op("metrics", s.core_op(json_op(s.stateful_op(op_metrics))));
}
@@ -81,45 +72,10 @@ impl Future for GetMessageFuture {
}
}
-/// Get message from host as guest worker
-fn op_worker_get_message(
- state: &ThreadSafeState,
- _args: Value,
- _data: Option<PinnedBuf>,
-) -> Result<JsonOp, ErrBox> {
- let op = GetMessageFuture {
- state: state.clone(),
- };
-
- let op = async move {
- let maybe_buf = op.await;
- debug!("op_worker_get_message");
- Ok(json!({ "data": maybe_buf }))
- };
-
- Ok(JsonOp::Async(op.boxed()))
-}
-
-/// Post message to host as guest worker
-fn op_worker_post_message(
- state: &ThreadSafeState,
- _args: Value,
- data: Option<PinnedBuf>,
-) -> Result<JsonOp, ErrBox> {
- let d = Vec::from(data.unwrap().as_ref()).into_boxed_slice();
- let mut channels = state.worker_channels.lock().unwrap();
- let sender = &mut channels.sender;
- futures::executor::block_on(sender.send(d))
- .map_err(|e| DenoError::new(ErrorKind::Other, e.to_string()))?;
-
- Ok(JsonOp::Sync(json!({})))
-}
-
#[derive(Deserialize)]
#[serde(rename_all = "camelCase")]
struct CreateWorkerArgs {
specifier: String,
- include_deno_namespace: bool,
has_source_code: bool,
source_code: String,
}
@@ -133,10 +89,6 @@ fn op_create_worker(
let args: CreateWorkerArgs = serde_json::from_value(args)?;
let specifier = args.specifier.as_ref();
- // Only include deno namespace if requested AND current worker
- // has included namespace (to avoid escalation).
- let include_deno_namespace =
- args.include_deno_namespace && state.include_deno_namespace;
let has_source_code = args.has_source_code;
let source_code = args.source_code;
@@ -156,16 +108,13 @@ fn op_create_worker(
state.global_state.clone(),
Some(parent_state.permissions.clone()), // by default share with parent
Some(module_specifier.clone()),
- include_deno_namespace,
int,
)?;
// TODO: add a new option to make child worker not sharing permissions
// with parent (aka .clone(), requests from child won't reflect in parent)
let name = format!("USER-WORKER-{}", specifier);
- let deno_main_call = format!("denoMain({})", include_deno_namespace);
let mut worker =
- Worker::new(name, startup_data::deno_isolate_init(), child_state, ext);
- js_check(worker.execute(&deno_main_call));
+ WebWorker::new(name, startup_data::deno_isolate_init(), child_state, ext);
js_check(worker.execute("workerMain()"));
let worker_id = parent_state.add_child_worker(worker.clone());