diff options
author | Bartek IwaĆczuk <biwanczuk@gmail.com> | 2020-01-21 09:49:47 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-01-21 09:49:47 +0100 |
commit | 7966bf14c062a05b1606a62c996890571454ecc8 (patch) | |
tree | 65bede64b47707c3accc80d0bb18e99840c639f7 /cli/ops | |
parent | c90036ab88bb1ae6b9c87d5e368f56d8c8afab69 (diff) |
refactor: split worker and worker host logic (#3722)
* split ops/worker.rs into ops/worker_host.rs and ops/web_worker.rs
* refactor js/workers.ts and factor out js/worker_main.ts - entry point for WebWorker runtime
* BREAKING CHANGE: remove support for blob: URL in Worker
* BREAKING CHANGE: remove Deno namespace support and noDenoNamespace option in Worker constructor
* introduce WebWorker struct which is a stripped down version of cli::Worker
Diffstat (limited to 'cli/ops')
-rw-r--r-- | cli/ops/mod.rs | 3 | ||||
-rw-r--r-- | cli/ops/web_worker.rs | 77 | ||||
-rw-r--r-- | cli/ops/worker_host.rs (renamed from cli/ops/workers.rs) | 55 |
3 files changed, 81 insertions, 54 deletions
diff --git a/cli/ops/mod.rs b/cli/ops/mod.rs index f93c5a060..203d1e17e 100644 --- a/cli/ops/mod.rs +++ b/cli/ops/mod.rs @@ -23,4 +23,5 @@ pub mod repl; pub mod resources; pub mod timers; pub mod tls; -pub mod workers; +pub mod web_worker; +pub mod worker_host; diff --git a/cli/ops/web_worker.rs b/cli/ops/web_worker.rs new file mode 100644 index 000000000..300a0dfd1 --- /dev/null +++ b/cli/ops/web_worker.rs @@ -0,0 +1,77 @@ +// Copyright 2018-2020 the Deno authors. All rights reserved. MIT license. +use super::dispatch_json::{JsonOp, Value}; +use crate::deno_error::DenoError; +use crate::deno_error::ErrorKind; +use crate::ops::json_op; +use crate::state::ThreadSafeState; +use deno_core::*; +use futures; +use futures::future::FutureExt; +use futures::sink::SinkExt; +use futures::stream::StreamExt; +use std; +use std::convert::From; +use std::future::Future; +use std::pin::Pin; +use std::task::Context; +use std::task::Poll; + +pub fn init(i: &mut Isolate, s: &ThreadSafeState) { + i.register_op( + "worker_post_message", + s.core_op(json_op(s.stateful_op(op_worker_post_message))), + ); + i.register_op( + "worker_get_message", + s.core_op(json_op(s.stateful_op(op_worker_get_message))), + ); +} + +struct GetMessageFuture { + state: ThreadSafeState, +} + +impl Future for GetMessageFuture { + type Output = Option<Buf>; + + fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> { + let inner = self.get_mut(); + let mut channels = inner.state.worker_channels.lock().unwrap(); + let receiver = &mut channels.receiver; + receiver.poll_next_unpin(cx) + } +} + +/// Get message from host as guest worker +fn op_worker_get_message( + state: &ThreadSafeState, + _args: Value, + _data: Option<PinnedBuf>, +) -> Result<JsonOp, ErrBox> { + let op = GetMessageFuture { + state: state.clone(), + }; + + let op = async move { + let maybe_buf = op.await; + debug!("op_worker_get_message"); + Ok(json!({ "data": maybe_buf })) + }; + + Ok(JsonOp::Async(op.boxed())) +} + +/// Post message to host as guest worker +fn op_worker_post_message( + state: &ThreadSafeState, + _args: Value, + data: Option<PinnedBuf>, +) -> Result<JsonOp, ErrBox> { + let d = Vec::from(data.unwrap().as_ref()).into_boxed_slice(); + let mut channels = state.worker_channels.lock().unwrap(); + let sender = &mut channels.sender; + futures::executor::block_on(sender.send(d)) + .map_err(|e| DenoError::new(ErrorKind::Other, e.to_string()))?; + + Ok(JsonOp::Sync(json!({}))) +} diff --git a/cli/ops/workers.rs b/cli/ops/worker_host.rs index eeffb3930..c17dee444 100644 --- a/cli/ops/workers.rs +++ b/cli/ops/worker_host.rs @@ -9,7 +9,7 @@ use crate::fmt_errors::JSError; use crate::ops::json_op; use crate::startup_data; use crate::state::ThreadSafeState; -use crate::worker::Worker; +use crate::web_worker::WebWorker; use deno_core::*; use futures; use futures::channel::mpsc; @@ -54,15 +54,6 @@ pub fn init(i: &mut Isolate, s: &ThreadSafeState) { "host_get_message", s.core_op(json_op(s.stateful_op(op_host_get_message))), ); - // TODO: make sure these two ops are only accessible to appropriate Worker - i.register_op( - "worker_post_message", - s.core_op(json_op(s.stateful_op(op_worker_post_message))), - ); - i.register_op( - "worker_get_message", - s.core_op(json_op(s.stateful_op(op_worker_get_message))), - ); i.register_op("metrics", s.core_op(json_op(s.stateful_op(op_metrics)))); } @@ -81,45 +72,10 @@ impl Future for GetMessageFuture { } } -/// Get message from host as guest worker -fn op_worker_get_message( - state: &ThreadSafeState, - _args: Value, - _data: Option<PinnedBuf>, -) -> Result<JsonOp, ErrBox> { - let op = GetMessageFuture { - state: state.clone(), - }; - - let op = async move { - let maybe_buf = op.await; - debug!("op_worker_get_message"); - Ok(json!({ "data": maybe_buf })) - }; - - Ok(JsonOp::Async(op.boxed())) -} - -/// Post message to host as guest worker -fn op_worker_post_message( - state: &ThreadSafeState, - _args: Value, - data: Option<PinnedBuf>, -) -> Result<JsonOp, ErrBox> { - let d = Vec::from(data.unwrap().as_ref()).into_boxed_slice(); - let mut channels = state.worker_channels.lock().unwrap(); - let sender = &mut channels.sender; - futures::executor::block_on(sender.send(d)) - .map_err(|e| DenoError::new(ErrorKind::Other, e.to_string()))?; - - Ok(JsonOp::Sync(json!({}))) -} - #[derive(Deserialize)] #[serde(rename_all = "camelCase")] struct CreateWorkerArgs { specifier: String, - include_deno_namespace: bool, has_source_code: bool, source_code: String, } @@ -133,10 +89,6 @@ fn op_create_worker( let args: CreateWorkerArgs = serde_json::from_value(args)?; let specifier = args.specifier.as_ref(); - // Only include deno namespace if requested AND current worker - // has included namespace (to avoid escalation). - let include_deno_namespace = - args.include_deno_namespace && state.include_deno_namespace; let has_source_code = args.has_source_code; let source_code = args.source_code; @@ -156,16 +108,13 @@ fn op_create_worker( state.global_state.clone(), Some(parent_state.permissions.clone()), // by default share with parent Some(module_specifier.clone()), - include_deno_namespace, int, )?; // TODO: add a new option to make child worker not sharing permissions // with parent (aka .clone(), requests from child won't reflect in parent) let name = format!("USER-WORKER-{}", specifier); - let deno_main_call = format!("denoMain({})", include_deno_namespace); let mut worker = - Worker::new(name, startup_data::deno_isolate_init(), child_state, ext); - js_check(worker.execute(&deno_main_call)); + WebWorker::new(name, startup_data::deno_isolate_init(), child_state, ext); js_check(worker.execute("workerMain()")); let worker_id = parent_state.add_child_worker(worker.clone()); |