From 7966bf14c062a05b1606a62c996890571454ecc8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bartek=20Iwa=C5=84czuk?= Date: Tue, 21 Jan 2020 09:49:47 +0100 Subject: refactor: split worker and worker host logic (#3722) * split ops/worker.rs into ops/worker_host.rs and ops/web_worker.rs * refactor js/workers.ts and factor out js/worker_main.ts - entry point for WebWorker runtime * BREAKING CHANGE: remove support for blob: URL in Worker * BREAKING CHANGE: remove Deno namespace support and noDenoNamespace option in Worker constructor * introduce WebWorker struct which is a stripped down version of cli::Worker --- cli/ops/mod.rs | 3 +- cli/ops/web_worker.rs | 77 ++++++++++ cli/ops/worker_host.rs | 343 ++++++++++++++++++++++++++++++++++++++++++ cli/ops/workers.rs | 394 ------------------------------------------------- 4 files changed, 422 insertions(+), 395 deletions(-) create mode 100644 cli/ops/web_worker.rs create mode 100644 cli/ops/worker_host.rs delete mode 100644 cli/ops/workers.rs (limited to 'cli/ops') diff --git a/cli/ops/mod.rs b/cli/ops/mod.rs index f93c5a060..203d1e17e 100644 --- a/cli/ops/mod.rs +++ b/cli/ops/mod.rs @@ -23,4 +23,5 @@ pub mod repl; pub mod resources; pub mod timers; pub mod tls; -pub mod workers; +pub mod web_worker; +pub mod worker_host; diff --git a/cli/ops/web_worker.rs b/cli/ops/web_worker.rs new file mode 100644 index 000000000..300a0dfd1 --- /dev/null +++ b/cli/ops/web_worker.rs @@ -0,0 +1,77 @@ +// Copyright 2018-2020 the Deno authors. All rights reserved. MIT license. +use super::dispatch_json::{JsonOp, Value}; +use crate::deno_error::DenoError; +use crate::deno_error::ErrorKind; +use crate::ops::json_op; +use crate::state::ThreadSafeState; +use deno_core::*; +use futures; +use futures::future::FutureExt; +use futures::sink::SinkExt; +use futures::stream::StreamExt; +use std; +use std::convert::From; +use std::future::Future; +use std::pin::Pin; +use std::task::Context; +use std::task::Poll; + +pub fn init(i: &mut Isolate, s: &ThreadSafeState) { + i.register_op( + "worker_post_message", + s.core_op(json_op(s.stateful_op(op_worker_post_message))), + ); + i.register_op( + "worker_get_message", + s.core_op(json_op(s.stateful_op(op_worker_get_message))), + ); +} + +struct GetMessageFuture { + state: ThreadSafeState, +} + +impl Future for GetMessageFuture { + type Output = Option; + + fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll { + let inner = self.get_mut(); + let mut channels = inner.state.worker_channels.lock().unwrap(); + let receiver = &mut channels.receiver; + receiver.poll_next_unpin(cx) + } +} + +/// Get message from host as guest worker +fn op_worker_get_message( + state: &ThreadSafeState, + _args: Value, + _data: Option, +) -> Result { + let op = GetMessageFuture { + state: state.clone(), + }; + + let op = async move { + let maybe_buf = op.await; + debug!("op_worker_get_message"); + Ok(json!({ "data": maybe_buf })) + }; + + Ok(JsonOp::Async(op.boxed())) +} + +/// Post message to host as guest worker +fn op_worker_post_message( + state: &ThreadSafeState, + _args: Value, + data: Option, +) -> Result { + let d = Vec::from(data.unwrap().as_ref()).into_boxed_slice(); + let mut channels = state.worker_channels.lock().unwrap(); + let sender = &mut channels.sender; + futures::executor::block_on(sender.send(d)) + .map_err(|e| DenoError::new(ErrorKind::Other, e.to_string()))?; + + Ok(JsonOp::Sync(json!({}))) +} diff --git a/cli/ops/worker_host.rs b/cli/ops/worker_host.rs new file mode 100644 index 000000000..c17dee444 --- /dev/null +++ b/cli/ops/worker_host.rs @@ -0,0 +1,343 @@ +// Copyright 2018-2020 the Deno authors. All rights reserved. MIT license. +use super::dispatch_json::{Deserialize, JsonOp, Value}; +use crate::deno_error::bad_resource; +use crate::deno_error::js_check; +use crate::deno_error::DenoError; +use crate::deno_error::ErrorKind; +use crate::deno_error::GetErrorKind; +use crate::fmt_errors::JSError; +use crate::ops::json_op; +use crate::startup_data; +use crate::state::ThreadSafeState; +use crate::web_worker::WebWorker; +use deno_core::*; +use futures; +use futures::channel::mpsc; +use futures::future::FutureExt; +use futures::future::TryFutureExt; +use futures::sink::SinkExt; +use futures::stream::StreamExt; +use std; +use std::convert::From; +use std::future::Future; +use std::pin::Pin; +use std::sync::atomic::Ordering; +use std::task::Context; +use std::task::Poll; + +pub fn init(i: &mut Isolate, s: &ThreadSafeState) { + i.register_op( + "create_worker", + s.core_op(json_op(s.stateful_op(op_create_worker))), + ); + i.register_op( + "host_get_worker_loaded", + s.core_op(json_op(s.stateful_op(op_host_get_worker_loaded))), + ); + i.register_op( + "host_poll_worker", + s.core_op(json_op(s.stateful_op(op_host_poll_worker))), + ); + i.register_op( + "host_close_worker", + s.core_op(json_op(s.stateful_op(op_host_close_worker))), + ); + i.register_op( + "host_resume_worker", + s.core_op(json_op(s.stateful_op(op_host_resume_worker))), + ); + i.register_op( + "host_post_message", + s.core_op(json_op(s.stateful_op(op_host_post_message))), + ); + i.register_op( + "host_get_message", + s.core_op(json_op(s.stateful_op(op_host_get_message))), + ); + i.register_op("metrics", s.core_op(json_op(s.stateful_op(op_metrics)))); +} + +struct GetMessageFuture { + state: ThreadSafeState, +} + +impl Future for GetMessageFuture { + type Output = Option; + + fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll { + let inner = self.get_mut(); + let mut channels = inner.state.worker_channels.lock().unwrap(); + let receiver = &mut channels.receiver; + receiver.poll_next_unpin(cx) + } +} + +#[derive(Deserialize)] +#[serde(rename_all = "camelCase")] +struct CreateWorkerArgs { + specifier: String, + has_source_code: bool, + source_code: String, +} + +/// Create worker as the host +fn op_create_worker( + state: &ThreadSafeState, + args: Value, + _data: Option, +) -> Result { + let args: CreateWorkerArgs = serde_json::from_value(args)?; + + let specifier = args.specifier.as_ref(); + let has_source_code = args.has_source_code; + let source_code = args.source_code; + + let parent_state = state.clone(); + + // TODO(bartlomieju): Isn't this wrong? + let mut module_specifier = ModuleSpecifier::resolve_url_or_path(specifier)?; + if !has_source_code { + if let Some(referrer) = parent_state.main_module.as_ref() { + let referrer = referrer.clone().to_string(); + module_specifier = ModuleSpecifier::resolve_import(specifier, &referrer)?; + } + } + + let (int, ext) = ThreadSafeState::create_channels(); + let child_state = ThreadSafeState::new( + state.global_state.clone(), + Some(parent_state.permissions.clone()), // by default share with parent + Some(module_specifier.clone()), + int, + )?; + // TODO: add a new option to make child worker not sharing permissions + // with parent (aka .clone(), requests from child won't reflect in parent) + let name = format!("USER-WORKER-{}", specifier); + let mut worker = + WebWorker::new(name, startup_data::deno_isolate_init(), child_state, ext); + js_check(worker.execute("workerMain()")); + + let worker_id = parent_state.add_child_worker(worker.clone()); + + // Has provided source code, execute immediately. + if has_source_code { + js_check(worker.execute(&source_code)); + return Ok(JsonOp::Sync(json!({"id": worker_id, "loaded": true}))); + } + + let (mut sender, receiver) = mpsc::channel::>(1); + + // TODO(bartlomieju): this future should be spawned on the separate thread, + // dedicated to that worker + let fut = async move { + let result = worker + .execute_mod_async(&module_specifier, None, false) + .await; + sender.send(result).await.expect("Failed to send message"); + } + .boxed(); + tokio::spawn(fut); + let mut table = state.loading_workers.lock().unwrap(); + table.insert(worker_id, receiver); + Ok(JsonOp::Sync(json!({"id": worker_id, "loaded": false}))) +} + +struct WorkerPollFuture { + state: ThreadSafeState, + rid: ResourceId, +} + +impl Future for WorkerPollFuture { + type Output = Result<(), ErrBox>; + + fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll { + let inner = self.get_mut(); + let mut workers_table = inner.state.workers.lock().unwrap(); + let maybe_worker = workers_table.get_mut(&inner.rid); + if maybe_worker.is_none() { + return Poll::Ready(Ok(())); + } + match maybe_worker.unwrap().poll_unpin(cx) { + Poll::Ready(Err(e)) => Poll::Ready(Err(e)), + Poll::Ready(Ok(_)) => Poll::Ready(Ok(())), + Poll::Pending => Poll::Pending, + } + } +} + +fn serialize_worker_result(result: Result<(), ErrBox>) -> Value { + if let Err(error) = result { + match error.kind() { + ErrorKind::JSError => { + let error = error.downcast::().unwrap(); + let exception: V8Exception = error.into(); + json!({"error": { + "message": exception.message, + "fileName": exception.script_resource_name, + "lineNumber": exception.line_number, + "columnNumber": exception.start_column, + }}) + } + _ => json!({"error": { + "message": error.to_string(), + }}), + } + } else { + json!({"ok": true}) + } +} + +#[derive(Deserialize)] +struct WorkerArgs { + id: i32, +} + +fn op_host_get_worker_loaded( + state: &ThreadSafeState, + args: Value, + _data: Option, +) -> Result { + let args: WorkerArgs = serde_json::from_value(args)?; + let id = args.id as u32; + let mut table = state.loading_workers.lock().unwrap(); + let mut receiver = table.remove(&id).unwrap(); + + let op = async move { + let result = receiver.next().await.unwrap(); + Ok(serialize_worker_result(result)) + }; + + Ok(JsonOp::Async(op.boxed())) +} + +fn op_host_poll_worker( + state: &ThreadSafeState, + args: Value, + _data: Option, +) -> Result { + let args: WorkerArgs = serde_json::from_value(args)?; + let id = args.id as u32; + let state_ = state.clone(); + + let future = WorkerPollFuture { + state: state.clone(), + rid: id, + }; + + let op = async move { + let result = future.await; + + if result.is_err() { + let mut workers_table = state_.workers.lock().unwrap(); + let worker = workers_table.get_mut(&id).unwrap(); + worker.clear_exception(); + } + + Ok(serialize_worker_result(result)) + }; + Ok(JsonOp::Async(op.boxed())) +} + +fn op_host_close_worker( + state: &ThreadSafeState, + args: Value, + _data: Option, +) -> Result { + let args: WorkerArgs = serde_json::from_value(args)?; + let id = args.id as u32; + let state_ = state.clone(); + + let mut workers_table = state_.workers.lock().unwrap(); + let maybe_worker = workers_table.remove(&id); + if let Some(worker) = maybe_worker { + let mut channels = worker.state.worker_channels.lock().unwrap(); + channels.sender.close_channel(); + channels.receiver.close(); + }; + + Ok(JsonOp::Sync(json!({}))) +} + +fn op_host_resume_worker( + state: &ThreadSafeState, + args: Value, + _data: Option, +) -> Result { + let args: WorkerArgs = serde_json::from_value(args)?; + let id = args.id as u32; + let state_ = state.clone(); + + let mut workers_table = state_.workers.lock().unwrap(); + let worker = workers_table.get_mut(&id).unwrap(); + js_check(worker.execute("workerMain()")); + Ok(JsonOp::Sync(json!({}))) +} + +#[derive(Deserialize)] +struct HostGetMessageArgs { + id: i32, +} + +/// Get message from guest worker as host +fn op_host_get_message( + state: &ThreadSafeState, + args: Value, + _data: Option, +) -> Result { + let args: HostGetMessageArgs = serde_json::from_value(args)?; + + let id = args.id as u32; + let mut table = state.workers.lock().unwrap(); + // TODO: don't return bad resource anymore + let worker = table.get_mut(&id).ok_or_else(bad_resource)?; + let fut = worker.get_message(); + + let op = async move { + let maybe_buf = fut.await.unwrap(); + Ok(json!({ "data": maybe_buf })) + }; + + Ok(JsonOp::Async(op.boxed())) +} + +#[derive(Deserialize)] +struct HostPostMessageArgs { + id: i32, +} + +/// Post message to guest worker as host +fn op_host_post_message( + state: &ThreadSafeState, + args: Value, + data: Option, +) -> Result { + let args: HostPostMessageArgs = serde_json::from_value(args)?; + let id = args.id as u32; + let msg = Vec::from(data.unwrap().as_ref()).into_boxed_slice(); + + debug!("post message to worker {}", id); + let mut table = state.workers.lock().unwrap(); + // TODO: don't return bad resource anymore + let worker = table.get_mut(&id).ok_or_else(bad_resource)?; + let fut = worker + .post_message(msg) + .map_err(|e| DenoError::new(ErrorKind::Other, e.to_string())); + futures::executor::block_on(fut)?; + Ok(JsonOp::Sync(json!({}))) +} + +fn op_metrics( + state: &ThreadSafeState, + _args: Value, + _zero_copy: Option, +) -> Result { + let m = &state.metrics; + + Ok(JsonOp::Sync(json!({ + "opsDispatched": m.ops_dispatched.load(Ordering::SeqCst) as u64, + "opsCompleted": m.ops_completed.load(Ordering::SeqCst) as u64, + "bytesSentControl": m.bytes_sent_control.load(Ordering::SeqCst) as u64, + "bytesSentData": m.bytes_sent_data.load(Ordering::SeqCst) as u64, + "bytesReceived": m.bytes_received.load(Ordering::SeqCst) as u64 + }))) +} diff --git a/cli/ops/workers.rs b/cli/ops/workers.rs deleted file mode 100644 index eeffb3930..000000000 --- a/cli/ops/workers.rs +++ /dev/null @@ -1,394 +0,0 @@ -// Copyright 2018-2020 the Deno authors. All rights reserved. MIT license. -use super::dispatch_json::{Deserialize, JsonOp, Value}; -use crate::deno_error::bad_resource; -use crate::deno_error::js_check; -use crate::deno_error::DenoError; -use crate::deno_error::ErrorKind; -use crate::deno_error::GetErrorKind; -use crate::fmt_errors::JSError; -use crate::ops::json_op; -use crate::startup_data; -use crate::state::ThreadSafeState; -use crate::worker::Worker; -use deno_core::*; -use futures; -use futures::channel::mpsc; -use futures::future::FutureExt; -use futures::future::TryFutureExt; -use futures::sink::SinkExt; -use futures::stream::StreamExt; -use std; -use std::convert::From; -use std::future::Future; -use std::pin::Pin; -use std::sync::atomic::Ordering; -use std::task::Context; -use std::task::Poll; - -pub fn init(i: &mut Isolate, s: &ThreadSafeState) { - i.register_op( - "create_worker", - s.core_op(json_op(s.stateful_op(op_create_worker))), - ); - i.register_op( - "host_get_worker_loaded", - s.core_op(json_op(s.stateful_op(op_host_get_worker_loaded))), - ); - i.register_op( - "host_poll_worker", - s.core_op(json_op(s.stateful_op(op_host_poll_worker))), - ); - i.register_op( - "host_close_worker", - s.core_op(json_op(s.stateful_op(op_host_close_worker))), - ); - i.register_op( - "host_resume_worker", - s.core_op(json_op(s.stateful_op(op_host_resume_worker))), - ); - i.register_op( - "host_post_message", - s.core_op(json_op(s.stateful_op(op_host_post_message))), - ); - i.register_op( - "host_get_message", - s.core_op(json_op(s.stateful_op(op_host_get_message))), - ); - // TODO: make sure these two ops are only accessible to appropriate Worker - i.register_op( - "worker_post_message", - s.core_op(json_op(s.stateful_op(op_worker_post_message))), - ); - i.register_op( - "worker_get_message", - s.core_op(json_op(s.stateful_op(op_worker_get_message))), - ); - i.register_op("metrics", s.core_op(json_op(s.stateful_op(op_metrics)))); -} - -struct GetMessageFuture { - state: ThreadSafeState, -} - -impl Future for GetMessageFuture { - type Output = Option; - - fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll { - let inner = self.get_mut(); - let mut channels = inner.state.worker_channels.lock().unwrap(); - let receiver = &mut channels.receiver; - receiver.poll_next_unpin(cx) - } -} - -/// Get message from host as guest worker -fn op_worker_get_message( - state: &ThreadSafeState, - _args: Value, - _data: Option, -) -> Result { - let op = GetMessageFuture { - state: state.clone(), - }; - - let op = async move { - let maybe_buf = op.await; - debug!("op_worker_get_message"); - Ok(json!({ "data": maybe_buf })) - }; - - Ok(JsonOp::Async(op.boxed())) -} - -/// Post message to host as guest worker -fn op_worker_post_message( - state: &ThreadSafeState, - _args: Value, - data: Option, -) -> Result { - let d = Vec::from(data.unwrap().as_ref()).into_boxed_slice(); - let mut channels = state.worker_channels.lock().unwrap(); - let sender = &mut channels.sender; - futures::executor::block_on(sender.send(d)) - .map_err(|e| DenoError::new(ErrorKind::Other, e.to_string()))?; - - Ok(JsonOp::Sync(json!({}))) -} - -#[derive(Deserialize)] -#[serde(rename_all = "camelCase")] -struct CreateWorkerArgs { - specifier: String, - include_deno_namespace: bool, - has_source_code: bool, - source_code: String, -} - -/// Create worker as the host -fn op_create_worker( - state: &ThreadSafeState, - args: Value, - _data: Option, -) -> Result { - let args: CreateWorkerArgs = serde_json::from_value(args)?; - - let specifier = args.specifier.as_ref(); - // Only include deno namespace if requested AND current worker - // has included namespace (to avoid escalation). - let include_deno_namespace = - args.include_deno_namespace && state.include_deno_namespace; - let has_source_code = args.has_source_code; - let source_code = args.source_code; - - let parent_state = state.clone(); - - // TODO(bartlomieju): Isn't this wrong? - let mut module_specifier = ModuleSpecifier::resolve_url_or_path(specifier)?; - if !has_source_code { - if let Some(referrer) = parent_state.main_module.as_ref() { - let referrer = referrer.clone().to_string(); - module_specifier = ModuleSpecifier::resolve_import(specifier, &referrer)?; - } - } - - let (int, ext) = ThreadSafeState::create_channels(); - let child_state = ThreadSafeState::new( - state.global_state.clone(), - Some(parent_state.permissions.clone()), // by default share with parent - Some(module_specifier.clone()), - include_deno_namespace, - int, - )?; - // TODO: add a new option to make child worker not sharing permissions - // with parent (aka .clone(), requests from child won't reflect in parent) - let name = format!("USER-WORKER-{}", specifier); - let deno_main_call = format!("denoMain({})", include_deno_namespace); - let mut worker = - Worker::new(name, startup_data::deno_isolate_init(), child_state, ext); - js_check(worker.execute(&deno_main_call)); - js_check(worker.execute("workerMain()")); - - let worker_id = parent_state.add_child_worker(worker.clone()); - - // Has provided source code, execute immediately. - if has_source_code { - js_check(worker.execute(&source_code)); - return Ok(JsonOp::Sync(json!({"id": worker_id, "loaded": true}))); - } - - let (mut sender, receiver) = mpsc::channel::>(1); - - // TODO(bartlomieju): this future should be spawned on the separate thread, - // dedicated to that worker - let fut = async move { - let result = worker - .execute_mod_async(&module_specifier, None, false) - .await; - sender.send(result).await.expect("Failed to send message"); - } - .boxed(); - tokio::spawn(fut); - let mut table = state.loading_workers.lock().unwrap(); - table.insert(worker_id, receiver); - Ok(JsonOp::Sync(json!({"id": worker_id, "loaded": false}))) -} - -struct WorkerPollFuture { - state: ThreadSafeState, - rid: ResourceId, -} - -impl Future for WorkerPollFuture { - type Output = Result<(), ErrBox>; - - fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll { - let inner = self.get_mut(); - let mut workers_table = inner.state.workers.lock().unwrap(); - let maybe_worker = workers_table.get_mut(&inner.rid); - if maybe_worker.is_none() { - return Poll::Ready(Ok(())); - } - match maybe_worker.unwrap().poll_unpin(cx) { - Poll::Ready(Err(e)) => Poll::Ready(Err(e)), - Poll::Ready(Ok(_)) => Poll::Ready(Ok(())), - Poll::Pending => Poll::Pending, - } - } -} - -fn serialize_worker_result(result: Result<(), ErrBox>) -> Value { - if let Err(error) = result { - match error.kind() { - ErrorKind::JSError => { - let error = error.downcast::().unwrap(); - let exception: V8Exception = error.into(); - json!({"error": { - "message": exception.message, - "fileName": exception.script_resource_name, - "lineNumber": exception.line_number, - "columnNumber": exception.start_column, - }}) - } - _ => json!({"error": { - "message": error.to_string(), - }}), - } - } else { - json!({"ok": true}) - } -} - -#[derive(Deserialize)] -struct WorkerArgs { - id: i32, -} - -fn op_host_get_worker_loaded( - state: &ThreadSafeState, - args: Value, - _data: Option, -) -> Result { - let args: WorkerArgs = serde_json::from_value(args)?; - let id = args.id as u32; - let mut table = state.loading_workers.lock().unwrap(); - let mut receiver = table.remove(&id).unwrap(); - - let op = async move { - let result = receiver.next().await.unwrap(); - Ok(serialize_worker_result(result)) - }; - - Ok(JsonOp::Async(op.boxed())) -} - -fn op_host_poll_worker( - state: &ThreadSafeState, - args: Value, - _data: Option, -) -> Result { - let args: WorkerArgs = serde_json::from_value(args)?; - let id = args.id as u32; - let state_ = state.clone(); - - let future = WorkerPollFuture { - state: state.clone(), - rid: id, - }; - - let op = async move { - let result = future.await; - - if result.is_err() { - let mut workers_table = state_.workers.lock().unwrap(); - let worker = workers_table.get_mut(&id).unwrap(); - worker.clear_exception(); - } - - Ok(serialize_worker_result(result)) - }; - Ok(JsonOp::Async(op.boxed())) -} - -fn op_host_close_worker( - state: &ThreadSafeState, - args: Value, - _data: Option, -) -> Result { - let args: WorkerArgs = serde_json::from_value(args)?; - let id = args.id as u32; - let state_ = state.clone(); - - let mut workers_table = state_.workers.lock().unwrap(); - let maybe_worker = workers_table.remove(&id); - if let Some(worker) = maybe_worker { - let mut channels = worker.state.worker_channels.lock().unwrap(); - channels.sender.close_channel(); - channels.receiver.close(); - }; - - Ok(JsonOp::Sync(json!({}))) -} - -fn op_host_resume_worker( - state: &ThreadSafeState, - args: Value, - _data: Option, -) -> Result { - let args: WorkerArgs = serde_json::from_value(args)?; - let id = args.id as u32; - let state_ = state.clone(); - - let mut workers_table = state_.workers.lock().unwrap(); - let worker = workers_table.get_mut(&id).unwrap(); - js_check(worker.execute("workerMain()")); - Ok(JsonOp::Sync(json!({}))) -} - -#[derive(Deserialize)] -struct HostGetMessageArgs { - id: i32, -} - -/// Get message from guest worker as host -fn op_host_get_message( - state: &ThreadSafeState, - args: Value, - _data: Option, -) -> Result { - let args: HostGetMessageArgs = serde_json::from_value(args)?; - - let id = args.id as u32; - let mut table = state.workers.lock().unwrap(); - // TODO: don't return bad resource anymore - let worker = table.get_mut(&id).ok_or_else(bad_resource)?; - let fut = worker.get_message(); - - let op = async move { - let maybe_buf = fut.await.unwrap(); - Ok(json!({ "data": maybe_buf })) - }; - - Ok(JsonOp::Async(op.boxed())) -} - -#[derive(Deserialize)] -struct HostPostMessageArgs { - id: i32, -} - -/// Post message to guest worker as host -fn op_host_post_message( - state: &ThreadSafeState, - args: Value, - data: Option, -) -> Result { - let args: HostPostMessageArgs = serde_json::from_value(args)?; - let id = args.id as u32; - let msg = Vec::from(data.unwrap().as_ref()).into_boxed_slice(); - - debug!("post message to worker {}", id); - let mut table = state.workers.lock().unwrap(); - // TODO: don't return bad resource anymore - let worker = table.get_mut(&id).ok_or_else(bad_resource)?; - let fut = worker - .post_message(msg) - .map_err(|e| DenoError::new(ErrorKind::Other, e.to_string())); - futures::executor::block_on(fut)?; - Ok(JsonOp::Sync(json!({}))) -} - -fn op_metrics( - state: &ThreadSafeState, - _args: Value, - _zero_copy: Option, -) -> Result { - let m = &state.metrics; - - Ok(JsonOp::Sync(json!({ - "opsDispatched": m.ops_dispatched.load(Ordering::SeqCst) as u64, - "opsCompleted": m.ops_completed.load(Ordering::SeqCst) as u64, - "bytesSentControl": m.bytes_sent_control.load(Ordering::SeqCst) as u64, - "bytesSentData": m.bytes_sent_data.load(Ordering::SeqCst) as u64, - "bytesReceived": m.bytes_received.load(Ordering::SeqCst) as u64 - }))) -} -- cgit v1.2.3