diff options
Diffstat (limited to 'cli/ops')
-rw-r--r-- | cli/ops/compiler.rs | 6 | ||||
-rw-r--r-- | cli/ops/dispatch_flatbuffers.rs | 216 | ||||
-rw-r--r-- | cli/ops/dispatch_minimal.rs | 114 | ||||
-rw-r--r-- | cli/ops/errors.rs | 5 | ||||
-rw-r--r-- | cli/ops/fetch.rs | 4 | ||||
-rw-r--r-- | cli/ops/files.rs | 6 | ||||
-rw-r--r-- | cli/ops/fs.rs | 8 | ||||
-rw-r--r-- | cli/ops/io.rs | 43 | ||||
-rw-r--r-- | cli/ops/metrics.rs | 5 | ||||
-rw-r--r-- | cli/ops/mod.rs | 291 | ||||
-rw-r--r-- | cli/ops/net.rs | 13 | ||||
-rw-r--r-- | cli/ops/os.rs | 6 | ||||
-rw-r--r-- | cli/ops/performance.rs | 5 | ||||
-rw-r--r-- | cli/ops/permissions.rs | 6 | ||||
-rw-r--r-- | cli/ops/process.rs | 6 | ||||
-rw-r--r-- | cli/ops/random.rs | 4 | ||||
-rw-r--r-- | cli/ops/repl.rs | 8 | ||||
-rw-r--r-- | cli/ops/resources.rs | 6 | ||||
-rw-r--r-- | cli/ops/timers.rs | 6 | ||||
-rw-r--r-- | cli/ops/utils.rs | 48 | ||||
-rw-r--r-- | cli/ops/workers.rs | 8 |
21 files changed, 478 insertions, 336 deletions
diff --git a/cli/ops/compiler.rs b/cli/ops/compiler.rs index 8d75668c1..efdcd2c9b 100644 --- a/cli/ops/compiler.rs +++ b/cli/ops/compiler.rs @@ -1,10 +1,8 @@ // Copyright 2018-2019 the Deno authors. All rights reserved. MIT license. +use super::dispatch_flatbuffers::serialize_response; +use super::utils::*; use crate::deno_error; use crate::msg; -use crate::ops::empty_buf; -use crate::ops::ok_buf; -use crate::ops::serialize_response; -use crate::ops::CliOpResult; use crate::state::ThreadSafeState; use crate::tokio_util; use deno::*; diff --git a/cli/ops/dispatch_flatbuffers.rs b/cli/ops/dispatch_flatbuffers.rs new file mode 100644 index 000000000..2b2e5050d --- /dev/null +++ b/cli/ops/dispatch_flatbuffers.rs @@ -0,0 +1,216 @@ +use super::utils::CliOpResult; +use crate::deno_error::GetErrorKind; +use crate::msg; +use crate::state::ThreadSafeState; +use deno::*; +use flatbuffers::FlatBufferBuilder; +use hyper::rt::Future; + +use super::compiler::{op_cache, op_fetch_source_file}; +use super::errors::{op_apply_source_map, op_format_error}; +use super::fetch::op_fetch; +use super::files::{op_close, op_open, op_read, op_seek, op_write}; +use super::fs::{ + op_chdir, op_chmod, op_chown, op_copy_file, op_cwd, op_link, + op_make_temp_dir, op_mkdir, op_read_dir, op_read_link, op_remove, op_rename, + op_stat, op_symlink, op_truncate, op_utime, +}; +use super::metrics::op_metrics; +use super::net::{op_accept, op_dial, op_listen, op_shutdown}; +use super::os::{ + op_env, op_exec_path, op_exit, op_home_dir, op_is_tty, op_set_env, op_start, +}; +use super::performance::op_now; +use super::permissions::{op_permissions, op_revoke_permission}; +use super::process::{op_kill, op_run, op_run_status}; +use super::random::op_get_random_values; +use super::repl::{op_repl_readline, op_repl_start}; +use super::resources::op_resources; +use super::timers::{op_global_timer, op_global_timer_stop}; +use super::workers::{ + op_create_worker, op_host_get_message, op_host_get_worker_closed, + op_host_post_message, op_worker_get_message, op_worker_post_message, +}; + +type CliDispatchFn = fn( + state: &ThreadSafeState, + base: &msg::Base<'_>, + data: Option<PinnedBuf>, +) -> CliOpResult; + +/// Processes raw messages from JavaScript. +/// This functions invoked every time Deno.core.dispatch() is called. +/// control corresponds to the first argument of Deno.core.dispatch(). +/// data corresponds to the second argument of Deno.core.dispatch(). +pub fn dispatch( + state: &ThreadSafeState, + control: &[u8], + zero_copy: Option<PinnedBuf>, +) -> CoreOp { + let base = msg::get_root_as_base(&control); + let inner_type = base.inner_type(); + let is_sync = base.sync(); + let cmd_id = base.cmd_id(); + + debug!( + "msg_from_js {} sync {}", + msg::enum_name_any(inner_type), + is_sync + ); + + let op_func: CliDispatchFn = match op_selector_std(inner_type) { + Some(v) => v, + None => panic!("Unhandled message {}", msg::enum_name_any(inner_type)), + }; + + let op_result = op_func(state, &base, zero_copy); + + let state = state.clone(); + + match op_result { + Ok(Op::Sync(buf)) => { + state.metrics_op_completed(buf.len()); + Op::Sync(buf) + } + Ok(Op::Async(fut)) => { + let result_fut = Box::new( + fut + .or_else(move |err: ErrBox| -> Result<Buf, ()> { + debug!("op err {}", err); + // No matter whether we got an Err or Ok, we want a serialized message to + // send back. So transform the DenoError into a Buf. + let builder = &mut FlatBufferBuilder::new(); + let errmsg_offset = builder.create_string(&format!("{}", err)); + Ok(serialize_response( + cmd_id, + builder, + msg::BaseArgs { + error: Some(errmsg_offset), + error_kind: err.kind(), + ..Default::default() + }, + )) + }) + .and_then(move |buf: Buf| -> Result<Buf, ()> { + // Handle empty responses. For sync responses we just want + // to send null. For async we want to send a small message + // with the cmd_id. + let buf = if buf.len() > 0 { + buf + } else { + let builder = &mut FlatBufferBuilder::new(); + serialize_response( + cmd_id, + builder, + msg::BaseArgs { + ..Default::default() + }, + ) + }; + state.metrics_op_completed(buf.len()); + Ok(buf) + }) + .map_err(|err| panic!("unexpected error {:?}", err)), + ); + Op::Async(result_fut) + } + Err(err) => { + debug!("op err {}", err); + // No matter whether we got an Err or Ok, we want a serialized message to + // send back. So transform the DenoError into a Buf. + let builder = &mut FlatBufferBuilder::new(); + let errmsg_offset = builder.create_string(&format!("{}", err)); + let response_buf = serialize_response( + cmd_id, + builder, + msg::BaseArgs { + error: Some(errmsg_offset), + error_kind: err.kind(), + ..Default::default() + }, + ); + state.metrics_op_completed(response_buf.len()); + Op::Sync(response_buf) + } + } +} + +pub fn serialize_response( + cmd_id: u32, + builder: &mut FlatBufferBuilder<'_>, + mut args: msg::BaseArgs<'_>, +) -> Buf { + args.cmd_id = cmd_id; + let base = msg::Base::create(builder, &args); + msg::finish_base_buffer(builder, base); + let data = builder.finished_data(); + // println!("serialize_response {:x?}", data); + data.into() +} + +/// Standard ops set for most isolates +pub fn op_selector_std(inner_type: msg::Any) -> Option<CliDispatchFn> { + match inner_type { + msg::Any::Accept => Some(op_accept), + msg::Any::ApplySourceMap => Some(op_apply_source_map), + msg::Any::Cache => Some(op_cache), + msg::Any::Chdir => Some(op_chdir), + msg::Any::Chmod => Some(op_chmod), + msg::Any::Chown => Some(op_chown), + msg::Any::Close => Some(op_close), + msg::Any::CopyFile => Some(op_copy_file), + msg::Any::CreateWorker => Some(op_create_worker), + msg::Any::Cwd => Some(op_cwd), + msg::Any::Dial => Some(op_dial), + msg::Any::Environ => Some(op_env), + msg::Any::ExecPath => Some(op_exec_path), + msg::Any::Exit => Some(op_exit), + msg::Any::Fetch => Some(op_fetch), + msg::Any::FetchSourceFile => Some(op_fetch_source_file), + msg::Any::FormatError => Some(op_format_error), + msg::Any::GetRandomValues => Some(op_get_random_values), + msg::Any::GlobalTimer => Some(op_global_timer), + msg::Any::GlobalTimerStop => Some(op_global_timer_stop), + msg::Any::HostGetMessage => Some(op_host_get_message), + msg::Any::HostGetWorkerClosed => Some(op_host_get_worker_closed), + msg::Any::HostPostMessage => Some(op_host_post_message), + msg::Any::IsTTY => Some(op_is_tty), + msg::Any::Kill => Some(op_kill), + msg::Any::Link => Some(op_link), + msg::Any::Listen => Some(op_listen), + msg::Any::MakeTempDir => Some(op_make_temp_dir), + msg::Any::Metrics => Some(op_metrics), + msg::Any::Mkdir => Some(op_mkdir), + msg::Any::Now => Some(op_now), + msg::Any::Open => Some(op_open), + msg::Any::PermissionRevoke => Some(op_revoke_permission), + msg::Any::Permissions => Some(op_permissions), + msg::Any::Read => Some(op_read), + msg::Any::ReadDir => Some(op_read_dir), + msg::Any::Readlink => Some(op_read_link), + msg::Any::Remove => Some(op_remove), + msg::Any::Rename => Some(op_rename), + msg::Any::ReplReadline => Some(op_repl_readline), + msg::Any::ReplStart => Some(op_repl_start), + msg::Any::Resources => Some(op_resources), + msg::Any::Run => Some(op_run), + msg::Any::RunStatus => Some(op_run_status), + msg::Any::Seek => Some(op_seek), + msg::Any::SetEnv => Some(op_set_env), + msg::Any::Shutdown => Some(op_shutdown), + msg::Any::Start => Some(op_start), + msg::Any::Stat => Some(op_stat), + msg::Any::Symlink => Some(op_symlink), + msg::Any::Truncate => Some(op_truncate), + msg::Any::HomeDir => Some(op_home_dir), + msg::Any::Utime => Some(op_utime), + msg::Any::Write => Some(op_write), + + // TODO(ry) split these out so that only the appropriate Workers can access + // them. + msg::Any::WorkerGetMessage => Some(op_worker_get_message), + msg::Any::WorkerPostMessage => Some(op_worker_post_message), + + _ => None, + } +} diff --git a/cli/ops/dispatch_minimal.rs b/cli/ops/dispatch_minimal.rs new file mode 100644 index 000000000..37ad56813 --- /dev/null +++ b/cli/ops/dispatch_minimal.rs @@ -0,0 +1,114 @@ +// Copyright 2018-2019 the Deno authors. All rights reserved. MIT license. +// Do not add flatbuffer dependencies to this module. +//! Connects to js/dispatch_minimal.ts sendAsyncMinimal This acts as a faster +//! alternative to flatbuffers using a very simple list of int32s to lay out +//! messages. The first i32 is used to determine if a message a flatbuffer +//! message or a "minimal" message. +use crate::state::ThreadSafeState; +use deno::Buf; +use deno::CoreOp; +use deno::ErrBox; +use deno::Op; +use deno::PinnedBuf; +use futures::Future; + +pub type MinimalOp = dyn Future<Item = i32, Error = ErrBox> + Send; +pub type Dispatcher = fn(i32, Option<PinnedBuf>) -> Box<MinimalOp>; + +#[derive(Copy, Clone, Debug, PartialEq)] +// This corresponds to RecordMinimal on the TS side. +pub struct Record { + pub promise_id: i32, + pub arg: i32, + pub result: i32, +} + +impl Into<Buf> for Record { + fn into(self) -> Buf { + let vec = vec![self.promise_id, self.arg, self.result]; + let buf32 = vec.into_boxed_slice(); + let ptr = Box::into_raw(buf32) as *mut [u8; 3 * 4]; + unsafe { Box::from_raw(ptr) } + } +} + +pub fn parse_min_record(bytes: &[u8]) -> Option<Record> { + if bytes.len() % std::mem::size_of::<i32>() != 0 { + return None; + } + let p = bytes.as_ptr(); + #[allow(clippy::cast_ptr_alignment)] + let p32 = p as *const i32; + let s = unsafe { std::slice::from_raw_parts(p32, bytes.len() / 4) }; + + if s.len() != 3 { + return None; + } + let ptr = s.as_ptr(); + let ints = unsafe { std::slice::from_raw_parts(ptr, 3) }; + Some(Record { + promise_id: ints[0], + arg: ints[1], + result: ints[2], + }) +} + +#[test] +fn test_parse_min_record() { + let buf = vec![1, 0, 0, 0, 3, 0, 0, 0, 4, 0, 0, 0]; + assert_eq!( + parse_min_record(&buf), + Some(Record { + promise_id: 1, + arg: 3, + result: 4, + }) + ); + + let buf = vec![]; + assert_eq!(parse_min_record(&buf), None); + + let buf = vec![5]; + assert_eq!(parse_min_record(&buf), None); +} + +pub fn dispatch( + d: Dispatcher, + state: &ThreadSafeState, + control: &[u8], + zero_copy: Option<PinnedBuf>, +) -> CoreOp { + let mut record = parse_min_record(control).unwrap(); + let is_sync = record.promise_id == 0; + + // TODO(ry) Currently there aren't any sync minimal ops. This is just a sanity + // check. Remove later. + assert!(!is_sync); + + let state = state.clone(); + + let rid = record.arg; + let min_op = d(rid, zero_copy); + + let fut = Box::new(min_op.then(move |result| -> Result<Buf, ()> { + match result { + Ok(r) => { + record.result = r; + } + Err(err) => { + // TODO(ry) The dispatch_minimal doesn't properly pipe errors back to + // the caller. + debug!("swallowed err {}", err); + record.result = -1; + } + } + let buf: Buf = record.into(); + state.metrics_op_completed(buf.len()); + Ok(buf) + })); + if is_sync { + Op::Sync(fut.wait().unwrap()) + } else { + Op::Async(fut) + } +} diff --git a/cli/ops/errors.rs b/cli/ops/errors.rs index 16dfc34fd..a27f3656e 100644 --- a/cli/ops/errors.rs +++ b/cli/ops/errors.rs @@ -1,10 +1,9 @@ // Copyright 2018-2019 the Deno authors. All rights reserved. MIT license. +use super::dispatch_flatbuffers::serialize_response; +use super::utils::*; use crate::deno_error; use crate::fmt_errors::JSError; use crate::msg; -use crate::ops::ok_buf; -use crate::ops::serialize_response; -use crate::ops::CliOpResult; use crate::source_maps::get_orig_position; use crate::source_maps::CachedMaps; use crate::state::ThreadSafeState; diff --git a/cli/ops/fetch.rs b/cli/ops/fetch.rs index e4f57972f..7661eb6e9 100644 --- a/cli/ops/fetch.rs +++ b/cli/ops/fetch.rs @@ -1,9 +1,9 @@ // Copyright 2018-2019 the Deno authors. All rights reserved. MIT license. +use super::dispatch_flatbuffers::serialize_response; +use super::utils::CliOpResult; use crate::http_util; use crate::msg; use crate::msg_util; -use crate::ops::serialize_response; -use crate::ops::CliOpResult; use crate::resources; use crate::state::ThreadSafeState; use deno::*; diff --git a/cli/ops/files.rs b/cli/ops/files.rs index ce3285623..023bd65f9 100644 --- a/cli/ops/files.rs +++ b/cli/ops/files.rs @@ -1,11 +1,9 @@ // Copyright 2018-2019 the Deno authors. All rights reserved. MIT license. +use super::dispatch_flatbuffers::serialize_response; +use super::utils::*; use crate::deno_error; use crate::fs as deno_fs; use crate::msg; -use crate::ops::empty_buf; -use crate::ops::ok_buf; -use crate::ops::serialize_response; -use crate::ops::CliOpResult; use crate::resources; use crate::state::ThreadSafeState; use crate::tokio_write; diff --git a/cli/ops/fs.rs b/cli/ops/fs.rs index 6530e3816..d46ed91e1 100644 --- a/cli/ops/fs.rs +++ b/cli/ops/fs.rs @@ -1,18 +1,14 @@ // Copyright 2018-2019 the Deno authors. All rights reserved. MIT license. +use super::dispatch_flatbuffers::serialize_response; +use super::utils::*; use crate::deno_error::DenoError; use crate::deno_error::ErrorKind; use crate::fs as deno_fs; use crate::msg; -use crate::ops::blocking; -use crate::ops::empty_buf; -use crate::ops::ok_buf; -use crate::ops::serialize_response; -use crate::ops::CliOpResult; use crate::state::ThreadSafeState; use deno::*; use flatbuffers::FlatBufferBuilder; use remove_dir_all::remove_dir_all; -use std; use std::convert::From; use std::fs; use std::path::PathBuf; diff --git a/cli/ops/io.rs b/cli/ops/io.rs new file mode 100644 index 000000000..610238942 --- /dev/null +++ b/cli/ops/io.rs @@ -0,0 +1,43 @@ +use super::dispatch_minimal::MinimalOp; +use crate::deno_error; +use crate::resources; +use crate::tokio_write; +use deno::ErrBox; +use deno::PinnedBuf; +use futures::Future; + +pub fn op_read(rid: i32, zero_copy: Option<PinnedBuf>) -> Box<MinimalOp> { + debug!("read rid={}", rid); + let zero_copy = match zero_copy { + None => { + return Box::new(futures::future::err(deno_error::no_buffer_specified())) + } + Some(buf) => buf, + }; + match resources::lookup(rid as u32) { + None => Box::new(futures::future::err(deno_error::bad_resource())), + Some(resource) => Box::new( + tokio::io::read(resource, zero_copy) + .map_err(ErrBox::from) + .and_then(move |(_resource, _buf, nread)| Ok(nread as i32)), + ), + } +} + +pub fn op_write(rid: i32, zero_copy: Option<PinnedBuf>) -> Box<MinimalOp> { + debug!("write rid={}", rid); + let zero_copy = match zero_copy { + None => { + return Box::new(futures::future::err(deno_error::no_buffer_specified())) + } + Some(buf) => buf, + }; + match resources::lookup(rid as u32) { + None => Box::new(futures::future::err(deno_error::bad_resource())), + Some(resource) => Box::new( + tokio_write::write(resource, zero_copy) + .map_err(ErrBox::from) + .and_then(move |(_resource, _buf, nwritten)| Ok(nwritten as i32)), + ), + } +} diff --git a/cli/ops/metrics.rs b/cli/ops/metrics.rs index 7d821f1fa..76f36c390 100644 --- a/cli/ops/metrics.rs +++ b/cli/ops/metrics.rs @@ -1,8 +1,7 @@ // Copyright 2018-2019 the Deno authors. All rights reserved. MIT license. +use super::dispatch_flatbuffers::serialize_response; +use super::utils::*; use crate::msg; -use crate::ops::ok_buf; -use crate::ops::serialize_response; -use crate::ops::CliOpResult; use crate::state::ThreadSafeState; use deno::*; use flatbuffers::FlatBufferBuilder; diff --git a/cli/ops/mod.rs b/cli/ops/mod.rs index 021c0fa47..92c0f8e62 100644 --- a/cli/ops/mod.rs +++ b/cli/ops/mod.rs @@ -1,305 +1,52 @@ // Copyright 2018-2019 the Deno authors. All rights reserved. MIT license. -use crate::deno_error::GetErrorKind; -use crate::dispatch_minimal::dispatch_minimal; -use crate::dispatch_minimal::parse_min_record; -use crate::msg; use crate::state::ThreadSafeState; -use crate::tokio_util; use deno::*; -use flatbuffers::FlatBufferBuilder; -use futures; -use futures::Poll; -use hyper; -use hyper::rt::Future; -use tokio_threadpool; mod compiler; -use compiler::{op_cache, op_fetch_source_file}; +mod dispatch_flatbuffers; +mod dispatch_minimal; mod errors; -use errors::{op_apply_source_map, op_format_error}; -mod files; -use files::{op_close, op_open, op_read, op_seek, op_write}; mod fetch; -use fetch::op_fetch; +mod files; mod fs; -use fs::{ - op_chdir, op_chmod, op_chown, op_copy_file, op_cwd, op_link, - op_make_temp_dir, op_mkdir, op_read_dir, op_read_link, op_remove, op_rename, - op_stat, op_symlink, op_truncate, op_utime, -}; +mod io; mod metrics; -use metrics::op_metrics; mod net; -use net::{op_accept, op_dial, op_listen, op_shutdown}; mod os; -use os::{ - op_env, op_exec_path, op_exit, op_home_dir, op_is_tty, op_set_env, op_start, -}; mod performance; -use performance::op_now; mod permissions; -use permissions::{op_permissions, op_revoke_permission}; mod process; -use process::{op_kill, op_run, op_run_status}; mod random; -use random::op_get_random_values; mod repl; -use repl::{op_repl_readline, op_repl_start}; mod resources; -use resources::op_resources; mod timers; -use timers::{op_global_timer, op_global_timer_stop}; +mod utils; mod workers; -use workers::{ - op_create_worker, op_host_get_message, op_host_get_worker_closed, - op_host_post_message, op_worker_get_message, op_worker_post_message, -}; - -type CliOpResult = OpResult<ErrBox>; - -type CliDispatchFn = fn( - state: &ThreadSafeState, - base: &msg::Base<'_>, - data: Option<PinnedBuf>, -) -> CliOpResult; -pub type OpSelector = fn(inner_type: msg::Any) -> Option<CliDispatchFn>; +pub const OP_FLATBUFFER: OpId = 44; +pub const OP_READ: OpId = 1; +pub const OP_WRITE: OpId = 2; -#[inline] -fn empty_buf() -> Buf { - Box::new([]) -} - -const FLATBUFFER_OP_ID: OpId = 44; - -pub fn dispatch_all( +pub fn dispatch( state: &ThreadSafeState, op_id: OpId, control: &[u8], zero_copy: Option<PinnedBuf>, - op_selector: OpSelector, ) -> CoreOp { let bytes_sent_control = control.len(); let bytes_sent_zero_copy = zero_copy.as_ref().map(|b| b.len()).unwrap_or(0); - let op = if op_id != FLATBUFFER_OP_ID { - let min_record = parse_min_record(control).unwrap(); - dispatch_minimal(state, op_id, min_record, zero_copy) - } else { - dispatch_all_legacy(state, control, zero_copy, op_selector) - }; - state.metrics_op_dispatched(bytes_sent_control, bytes_sent_zero_copy); - op -} -/// Processes raw messages from JavaScript. -/// This functions invoked every time Deno.core.dispatch() is called. -/// control corresponds to the first argument of Deno.core.dispatch(). -/// data corresponds to the second argument of Deno.core.dispatch(). -pub fn dispatch_all_legacy( - state: &ThreadSafeState, - control: &[u8], - zero_copy: Option<PinnedBuf>, - op_selector: OpSelector, -) -> CoreOp { - let base = msg::get_root_as_base(&control); - let inner_type = base.inner_type(); - let is_sync = base.sync(); - let cmd_id = base.cmd_id(); - - debug!( - "msg_from_js {} sync {}", - msg::enum_name_any(inner_type), - is_sync - ); - - let op_func: CliDispatchFn = match op_selector(inner_type) { - Some(v) => v, - None => panic!("Unhandled message {}", msg::enum_name_any(inner_type)), - }; - - let op_result = op_func(state, &base, zero_copy); - - let state = state.clone(); - - match op_result { - Ok(Op::Sync(buf)) => { - state.metrics_op_completed(buf.len()); - Op::Sync(buf) + let op = match op_id { + OP_READ => { + dispatch_minimal::dispatch(io::op_read, state, control, zero_copy) } - Ok(Op::Async(fut)) => { - let result_fut = Box::new( - fut - .or_else(move |err: ErrBox| -> Result<Buf, ()> { - debug!("op err {}", err); - // No matter whether we got an Err or Ok, we want a serialized message to - // send back. So transform the DenoError into a Buf. - let builder = &mut FlatBufferBuilder::new(); - let errmsg_offset = builder.create_string(&format!("{}", err)); - Ok(serialize_response( - cmd_id, - builder, - msg::BaseArgs { - error: Some(errmsg_offset), - error_kind: err.kind(), - ..Default::default() - }, - )) - }) - .and_then(move |buf: Buf| -> Result<Buf, ()> { - // Handle empty responses. For sync responses we just want - // to send null. For async we want to send a small message - // with the cmd_id. - let buf = if buf.len() > 0 { - buf - } else { - let builder = &mut FlatBufferBuilder::new(); - serialize_response( - cmd_id, - builder, - msg::BaseArgs { - ..Default::default() - }, - ) - }; - state.metrics_op_completed(buf.len()); - Ok(buf) - }) - .map_err(|err| panic!("unexpected error {:?}", err)), - ); - Op::Async(result_fut) + OP_WRITE => { + dispatch_minimal::dispatch(io::op_write, state, control, zero_copy) } - Err(err) => { - debug!("op err {}", err); - // No matter whether we got an Err or Ok, we want a serialized message to - // send back. So transform the DenoError into a Buf. - let builder = &mut FlatBufferBuilder::new(); - let errmsg_offset = builder.create_string(&format!("{}", err)); - let response_buf = serialize_response( - cmd_id, - builder, - msg::BaseArgs { - error: Some(errmsg_offset), - error_kind: err.kind(), - ..Default::default() - }, - ); - state.metrics_op_completed(response_buf.len()); - Op::Sync(response_buf) - } - } -} - -/// Standard ops set for most isolates -pub fn op_selector_std(inner_type: msg::Any) -> Option<CliDispatchFn> { - match inner_type { - msg::Any::Accept => Some(op_accept), - msg::Any::ApplySourceMap => Some(op_apply_source_map), - msg::Any::Cache => Some(op_cache), - msg::Any::Chdir => Some(op_chdir), - msg::Any::Chmod => Some(op_chmod), - msg::Any::Chown => Some(op_chown), - msg::Any::Close => Some(op_close), - msg::Any::CopyFile => Some(op_copy_file), - msg::Any::CreateWorker => Some(op_create_worker), - msg::Any::Cwd => Some(op_cwd), - msg::Any::Dial => Some(op_dial), - msg::Any::Environ => Some(op_env), - msg::Any::ExecPath => Some(op_exec_path), - msg::Any::Exit => Some(op_exit), - msg::Any::Fetch => Some(op_fetch), - msg::Any::FetchSourceFile => Some(op_fetch_source_file), - msg::Any::FormatError => Some(op_format_error), - msg::Any::GetRandomValues => Some(op_get_random_values), - msg::Any::GlobalTimer => Some(op_global_timer), - msg::Any::GlobalTimerStop => Some(op_global_timer_stop), - msg::Any::HostGetMessage => Some(op_host_get_message), - msg::Any::HostGetWorkerClosed => Some(op_host_get_worker_closed), - msg::Any::HostPostMessage => Some(op_host_post_message), - msg::Any::IsTTY => Some(op_is_tty), - msg::Any::Kill => Some(op_kill), - msg::Any::Link => Some(op_link), - msg::Any::Listen => Some(op_listen), - msg::Any::MakeTempDir => Some(op_make_temp_dir), - msg::Any::Metrics => Some(op_metrics), - msg::Any::Mkdir => Some(op_mkdir), - msg::Any::Now => Some(op_now), - msg::Any::Open => Some(op_open), - msg::Any::PermissionRevoke => Some(op_revoke_permission), - msg::Any::Permissions => Some(op_permissions), - msg::Any::Read => Some(op_read), - msg::Any::ReadDir => Some(op_read_dir), - msg::Any::Readlink => Some(op_read_link), - msg::Any::Remove => Some(op_remove), - msg::Any::Rename => Some(op_rename), - msg::Any::ReplReadline => Some(op_repl_readline), - msg::Any::ReplStart => Some(op_repl_start), - msg::Any::Resources => Some(op_resources), - msg::Any::Run => Some(op_run), - msg::Any::RunStatus => Some(op_run_status), - msg::Any::Seek => Some(op_seek), - msg::Any::SetEnv => Some(op_set_env), - msg::Any::Shutdown => Some(op_shutdown), - msg::Any::Start => Some(op_start), - msg::Any::Stat => Some(op_stat), - msg::Any::Symlink => Some(op_symlink), - msg::Any::Truncate => Some(op_truncate), - msg::Any::HomeDir => Some(op_home_dir), - msg::Any::Utime => Some(op_utime), - msg::Any::Write => Some(op_write), - - // TODO(ry) split these out so that only the appropriate Workers can access - // them. - msg::Any::WorkerGetMessage => Some(op_worker_get_message), - msg::Any::WorkerPostMessage => Some(op_worker_post_message), - - _ => None, - } -} - -fn serialize_response( - cmd_id: u32, - builder: &mut FlatBufferBuilder<'_>, - mut args: msg::BaseArgs<'_>, -) -> Buf { - args.cmd_id = cmd_id; - let base = msg::Base::create(builder, &args); - msg::finish_base_buffer(builder, base); - let data = builder.finished_data(); - // println!("serialize_response {:x?}", data); - data.into() -} - -#[inline] -fn ok_buf(buf: Buf) -> CliOpResult { - Ok(Op::Sync(buf)) -} - -// This is just type conversion. Implement From trait? -// See https://github.com/tokio-rs/tokio/blob/ffd73a64e7ec497622b7f939e38017afe7124dc4/tokio-fs/src/lib.rs#L76-L85 -fn convert_blocking<F>(f: F) -> Poll<Buf, ErrBox> -where - F: FnOnce() -> Result<Buf, ErrBox>, -{ - use futures::Async::*; - match tokio_threadpool::blocking(f) { - Ok(Ready(Ok(v))) => Ok(v.into()), - Ok(Ready(Err(err))) => Err(err), - Ok(NotReady) => Ok(NotReady), - Err(err) => panic!("blocking error {}", err), - } -} + OP_FLATBUFFER => dispatch_flatbuffers::dispatch(state, control, zero_copy), + _ => panic!("bad op_id"), + }; -fn blocking<F>(is_sync: bool, f: F) -> CliOpResult -where - F: 'static + Send + FnOnce() -> Result<Buf, ErrBox>, -{ - if is_sync { - let result_buf = f()?; - Ok(Op::Sync(result_buf)) - } else { - Ok(Op::Async(Box::new(futures::sync::oneshot::spawn( - tokio_util::poll_fn(move || convert_blocking(f)), - &tokio_executor::DefaultExecutor::current(), - )))) - } + state.metrics_op_dispatched(bytes_sent_control, bytes_sent_zero_copy); + op } diff --git a/cli/ops/net.rs b/cli/ops/net.rs index 16a24872d..5ce562492 100644 --- a/cli/ops/net.rs +++ b/cli/ops/net.rs @@ -1,10 +1,9 @@ // Copyright 2018-2019 the Deno authors. All rights reserved. MIT license. +use super::dispatch_flatbuffers::serialize_response; +use super::utils::*; use crate::deno_error; use crate::msg; -use crate::ops::empty_buf; -use crate::ops::ok_buf; -use crate::ops::serialize_response; -use crate::ops::CliOpResult; +use crate::resolve_addr::resolve_addr; use crate::resources; use crate::resources::Resource; use crate::state::ThreadSafeState; @@ -13,16 +12,12 @@ use deno::*; use flatbuffers::FlatBufferBuilder; use futures::Future; use std; +use std::convert::From; use std::net::Shutdown; use tokio; - -use crate::resolve_addr::resolve_addr; -use std::convert::From; use tokio::net::TcpListener; use tokio::net::TcpStream; -use crate::ops::blocking; - pub fn op_accept( _state: &ThreadSafeState, base: &msg::Base<'_>, diff --git a/cli/ops/os.rs b/cli/ops/os.rs index cd165aa05..fbf430d7a 100644 --- a/cli/ops/os.rs +++ b/cli/ops/os.rs @@ -1,12 +1,10 @@ // Copyright 2018-2019 the Deno authors. All rights reserved. MIT license. +use super::dispatch_flatbuffers::serialize_response; +use super::utils::*; use crate::ansi; use crate::fs as deno_fs; use crate::msg; use crate::msg_util; -use crate::ops::empty_buf; -use crate::ops::ok_buf; -use crate::ops::serialize_response; -use crate::ops::CliOpResult; use crate::state::ThreadSafeState; use crate::version; use atty; diff --git a/cli/ops/performance.rs b/cli/ops/performance.rs index ae2a0b860..94f6dbc38 100644 --- a/cli/ops/performance.rs +++ b/cli/ops/performance.rs @@ -1,8 +1,7 @@ // Copyright 2018-2019 the Deno authors. All rights reserved. MIT license. +use super::dispatch_flatbuffers::serialize_response; +use super::utils::*; use crate::msg; -use crate::ops::ok_buf; -use crate::ops::serialize_response; -use crate::ops::CliOpResult; use crate::state::ThreadSafeState; use deno::*; use flatbuffers::FlatBufferBuilder; diff --git a/cli/ops/permissions.rs b/cli/ops/permissions.rs index 47a9cf871..6249581fb 100644 --- a/cli/ops/permissions.rs +++ b/cli/ops/permissions.rs @@ -1,9 +1,7 @@ // Copyright 2018-2019 the Deno authors. All rights reserved. MIT license. +use super::dispatch_flatbuffers::serialize_response; +use super::utils::*; use crate::msg; -use crate::ops::empty_buf; -use crate::ops::ok_buf; -use crate::ops::serialize_response; -use crate::ops::CliOpResult; use crate::state::ThreadSafeState; use deno::*; use flatbuffers::FlatBufferBuilder; diff --git a/cli/ops/process.rs b/cli/ops/process.rs index 914b8ba86..d7b326d14 100644 --- a/cli/ops/process.rs +++ b/cli/ops/process.rs @@ -1,10 +1,8 @@ // Copyright 2018-2019 the Deno authors. All rights reserved. MIT license. +use super::dispatch_flatbuffers::serialize_response; +use super::utils::*; use crate::deno_error; use crate::msg; -use crate::ops::empty_buf; -use crate::ops::ok_buf; -use crate::ops::serialize_response; -use crate::ops::CliOpResult; use crate::resources; use crate::signal::kill; use crate::state::ThreadSafeState; diff --git a/cli/ops/random.rs b/cli/ops/random.rs index 27e120faa..0c302a080 100644 --- a/cli/ops/random.rs +++ b/cli/ops/random.rs @@ -1,8 +1,6 @@ // Copyright 2018-2019 the Deno authors. All rights reserved. MIT license. +use super::utils::*; use crate::msg; -use crate::ops::empty_buf; -use crate::ops::ok_buf; -use crate::ops::CliOpResult; use crate::state::ThreadSafeState; use deno::*; use rand::thread_rng; diff --git a/cli/ops/repl.rs b/cli/ops/repl.rs index 2fff389a1..affe78739 100644 --- a/cli/ops/repl.rs +++ b/cli/ops/repl.rs @@ -1,9 +1,9 @@ // Copyright 2018-2019 the Deno authors. All rights reserved. MIT license. +use super::dispatch_flatbuffers::serialize_response; +use super::utils::blocking; +use super::utils::ok_buf; +use super::utils::CliOpResult; use crate::msg; -use crate::ops::blocking; -use crate::ops::ok_buf; -use crate::ops::serialize_response; -use crate::ops::CliOpResult; use crate::repl; use crate::resources; use crate::state::ThreadSafeState; diff --git a/cli/ops/resources.rs b/cli/ops/resources.rs index 410d69c80..975d94490 100644 --- a/cli/ops/resources.rs +++ b/cli/ops/resources.rs @@ -1,8 +1,8 @@ // Copyright 2018-2019 the Deno authors. All rights reserved. MIT license. +use super::dispatch_flatbuffers::serialize_response; +use super::utils::ok_buf; +use super::utils::CliOpResult; use crate::msg; -use crate::ops::ok_buf; -use crate::ops::serialize_response; -use crate::ops::CliOpResult; use crate::resources::table_entries; use crate::state::ThreadSafeState; use deno::*; diff --git a/cli/ops/timers.rs b/cli/ops/timers.rs index 6c5f6fcae..550d91f2c 100644 --- a/cli/ops/timers.rs +++ b/cli/ops/timers.rs @@ -1,9 +1,9 @@ // Copyright 2018-2019 the Deno authors. All rights reserved. MIT license. +use super::dispatch_flatbuffers::serialize_response; +use super::utils::empty_buf; +use super::utils::CliOpResult; use crate::deno_error; use crate::msg; -use crate::ops::empty_buf; -use crate::ops::serialize_response; -use crate::ops::CliOpResult; use crate::state::ThreadSafeState; use deno::*; use flatbuffers::FlatBufferBuilder; diff --git a/cli/ops/utils.rs b/cli/ops/utils.rs new file mode 100644 index 000000000..a9b0b442c --- /dev/null +++ b/cli/ops/utils.rs @@ -0,0 +1,48 @@ +use crate::tokio_util; +use deno::Buf; +use deno::ErrBox; +use deno::Op; +use deno::OpResult; +use futures::Poll; + +pub type CliOpResult = OpResult<ErrBox>; + +#[inline] +pub fn ok_buf(buf: Buf) -> CliOpResult { + Ok(Op::Sync(buf)) +} + +#[inline] +pub fn empty_buf() -> Buf { + Box::new([]) +} + +// This is just type conversion. Implement From trait? +// See https://github.com/tokio-rs/tokio/blob/ffd73a64e7ec497622b7f939e38017afe7124dc4/tokio-fs/src/lib.rs#L76-L85 +fn convert_blocking<F>(f: F) -> Poll<Buf, ErrBox> +where + F: FnOnce() -> Result<Buf, ErrBox>, +{ + use futures::Async::*; + match tokio_threadpool::blocking(f) { + Ok(Ready(Ok(v))) => Ok(v.into()), + Ok(Ready(Err(err))) => Err(err), + Ok(NotReady) => Ok(NotReady), + Err(err) => panic!("blocking error {}", err), + } +} + +pub fn blocking<F>(is_sync: bool, f: F) -> CliOpResult +where + F: 'static + Send + FnOnce() -> Result<Buf, ErrBox>, +{ + if is_sync { + let result_buf = f()?; + Ok(Op::Sync(result_buf)) + } else { + Ok(Op::Async(Box::new(futures::sync::oneshot::spawn( + tokio_util::poll_fn(move || convert_blocking(f)), + &tokio_executor::DefaultExecutor::current(), + )))) + } +} diff --git a/cli/ops/workers.rs b/cli/ops/workers.rs index d47aab765..1eb11420f 100644 --- a/cli/ops/workers.rs +++ b/cli/ops/workers.rs @@ -1,12 +1,11 @@ // Copyright 2018-2019 the Deno authors. All rights reserved. MIT license. +use super::dispatch_flatbuffers::serialize_response; +use super::utils::ok_buf; +use super::utils::CliOpResult; use crate::deno_error; use crate::deno_error::DenoError; use crate::deno_error::ErrorKind; use crate::msg; -use crate::ops::ok_buf; -use crate::ops::op_selector_std; -use crate::ops::serialize_response; -use crate::ops::CliOpResult; use crate::resources; use crate::startup_data; use crate::state::ThreadSafeState; @@ -136,7 +135,6 @@ pub fn op_create_worker( let child_state = ThreadSafeState::new( parent_state.flags.clone(), child_argv, - op_selector_std, parent_state.progress.clone(), include_deno_namespace, )?; |