diff options
author | Bartek IwaĆczuk <biwanczuk@gmail.com> | 2019-08-14 17:03:02 +0200 |
---|---|---|
committer | Ryan Dahl <ry@tinyclouds.org> | 2019-08-14 11:03:02 -0400 |
commit | e6c349af9f7260c2c4ec713bd231fe554240721e (patch) | |
tree | 7671dabb8270cc0c2d7f7f5a3b5f5d8d4b1e0986 /cli/ops/mod.rs | |
parent | 58f0e9b9b1b53ca486ef38ae662b98cbde839248 (diff) |
split up ops.rs (#2753)
Note cli/dispatch_minimal.rs ops are not yet included in cli/ops.
This is part of work towards #2730
Diffstat (limited to 'cli/ops/mod.rs')
-rw-r--r-- | cli/ops/mod.rs | 305 |
1 files changed, 305 insertions, 0 deletions
diff --git a/cli/ops/mod.rs b/cli/ops/mod.rs new file mode 100644 index 000000000..021c0fa47 --- /dev/null +++ b/cli/ops/mod.rs @@ -0,0 +1,305 @@ +// Copyright 2018-2019 the Deno authors. All rights reserved. MIT license. +use crate::deno_error::GetErrorKind; +use crate::dispatch_minimal::dispatch_minimal; +use crate::dispatch_minimal::parse_min_record; +use crate::msg; +use crate::state::ThreadSafeState; +use crate::tokio_util; +use deno::*; +use flatbuffers::FlatBufferBuilder; +use futures; +use futures::Poll; +use hyper; +use hyper::rt::Future; +use tokio_threadpool; + +mod compiler; +use compiler::{op_cache, op_fetch_source_file}; +mod errors; +use errors::{op_apply_source_map, op_format_error}; +mod files; +use files::{op_close, op_open, op_read, op_seek, op_write}; +mod fetch; +use fetch::op_fetch; +mod fs; +use fs::{ + op_chdir, op_chmod, op_chown, op_copy_file, op_cwd, op_link, + op_make_temp_dir, op_mkdir, op_read_dir, op_read_link, op_remove, op_rename, + op_stat, op_symlink, op_truncate, op_utime, +}; +mod metrics; +use metrics::op_metrics; +mod net; +use net::{op_accept, op_dial, op_listen, op_shutdown}; +mod os; +use os::{ + op_env, op_exec_path, op_exit, op_home_dir, op_is_tty, op_set_env, op_start, +}; +mod performance; +use performance::op_now; +mod permissions; +use permissions::{op_permissions, op_revoke_permission}; +mod process; +use process::{op_kill, op_run, op_run_status}; +mod random; +use random::op_get_random_values; +mod repl; +use repl::{op_repl_readline, op_repl_start}; +mod resources; +use resources::op_resources; +mod timers; +use timers::{op_global_timer, op_global_timer_stop}; +mod workers; +use workers::{ + op_create_worker, op_host_get_message, op_host_get_worker_closed, + op_host_post_message, op_worker_get_message, op_worker_post_message, +}; + +type CliOpResult = OpResult<ErrBox>; + +type CliDispatchFn = fn( + state: &ThreadSafeState, + base: &msg::Base<'_>, + data: Option<PinnedBuf>, +) -> CliOpResult; + +pub type OpSelector = fn(inner_type: msg::Any) -> Option<CliDispatchFn>; + +#[inline] +fn empty_buf() -> Buf { + Box::new([]) +} + +const FLATBUFFER_OP_ID: OpId = 44; + +pub fn dispatch_all( + state: &ThreadSafeState, + op_id: OpId, + control: &[u8], + zero_copy: Option<PinnedBuf>, + op_selector: OpSelector, +) -> CoreOp { + let bytes_sent_control = control.len(); + let bytes_sent_zero_copy = zero_copy.as_ref().map(|b| b.len()).unwrap_or(0); + let op = if op_id != FLATBUFFER_OP_ID { + let min_record = parse_min_record(control).unwrap(); + dispatch_minimal(state, op_id, min_record, zero_copy) + } else { + dispatch_all_legacy(state, control, zero_copy, op_selector) + }; + state.metrics_op_dispatched(bytes_sent_control, bytes_sent_zero_copy); + op +} + +/// Processes raw messages from JavaScript. +/// This functions invoked every time Deno.core.dispatch() is called. +/// control corresponds to the first argument of Deno.core.dispatch(). +/// data corresponds to the second argument of Deno.core.dispatch(). +pub fn dispatch_all_legacy( + state: &ThreadSafeState, + control: &[u8], + zero_copy: Option<PinnedBuf>, + op_selector: OpSelector, +) -> CoreOp { + let base = msg::get_root_as_base(&control); + let inner_type = base.inner_type(); + let is_sync = base.sync(); + let cmd_id = base.cmd_id(); + + debug!( + "msg_from_js {} sync {}", + msg::enum_name_any(inner_type), + is_sync + ); + + let op_func: CliDispatchFn = match op_selector(inner_type) { + Some(v) => v, + None => panic!("Unhandled message {}", msg::enum_name_any(inner_type)), + }; + + let op_result = op_func(state, &base, zero_copy); + + let state = state.clone(); + + match op_result { + Ok(Op::Sync(buf)) => { + state.metrics_op_completed(buf.len()); + Op::Sync(buf) + } + Ok(Op::Async(fut)) => { + let result_fut = Box::new( + fut + .or_else(move |err: ErrBox| -> Result<Buf, ()> { + debug!("op err {}", err); + // No matter whether we got an Err or Ok, we want a serialized message to + // send back. So transform the DenoError into a Buf. + let builder = &mut FlatBufferBuilder::new(); + let errmsg_offset = builder.create_string(&format!("{}", err)); + Ok(serialize_response( + cmd_id, + builder, + msg::BaseArgs { + error: Some(errmsg_offset), + error_kind: err.kind(), + ..Default::default() + }, + )) + }) + .and_then(move |buf: Buf| -> Result<Buf, ()> { + // Handle empty responses. For sync responses we just want + // to send null. For async we want to send a small message + // with the cmd_id. + let buf = if buf.len() > 0 { + buf + } else { + let builder = &mut FlatBufferBuilder::new(); + serialize_response( + cmd_id, + builder, + msg::BaseArgs { + ..Default::default() + }, + ) + }; + state.metrics_op_completed(buf.len()); + Ok(buf) + }) + .map_err(|err| panic!("unexpected error {:?}", err)), + ); + Op::Async(result_fut) + } + Err(err) => { + debug!("op err {}", err); + // No matter whether we got an Err or Ok, we want a serialized message to + // send back. So transform the DenoError into a Buf. + let builder = &mut FlatBufferBuilder::new(); + let errmsg_offset = builder.create_string(&format!("{}", err)); + let response_buf = serialize_response( + cmd_id, + builder, + msg::BaseArgs { + error: Some(errmsg_offset), + error_kind: err.kind(), + ..Default::default() + }, + ); + state.metrics_op_completed(response_buf.len()); + Op::Sync(response_buf) + } + } +} + +/// Standard ops set for most isolates +pub fn op_selector_std(inner_type: msg::Any) -> Option<CliDispatchFn> { + match inner_type { + msg::Any::Accept => Some(op_accept), + msg::Any::ApplySourceMap => Some(op_apply_source_map), + msg::Any::Cache => Some(op_cache), + msg::Any::Chdir => Some(op_chdir), + msg::Any::Chmod => Some(op_chmod), + msg::Any::Chown => Some(op_chown), + msg::Any::Close => Some(op_close), + msg::Any::CopyFile => Some(op_copy_file), + msg::Any::CreateWorker => Some(op_create_worker), + msg::Any::Cwd => Some(op_cwd), + msg::Any::Dial => Some(op_dial), + msg::Any::Environ => Some(op_env), + msg::Any::ExecPath => Some(op_exec_path), + msg::Any::Exit => Some(op_exit), + msg::Any::Fetch => Some(op_fetch), + msg::Any::FetchSourceFile => Some(op_fetch_source_file), + msg::Any::FormatError => Some(op_format_error), + msg::Any::GetRandomValues => Some(op_get_random_values), + msg::Any::GlobalTimer => Some(op_global_timer), + msg::Any::GlobalTimerStop => Some(op_global_timer_stop), + msg::Any::HostGetMessage => Some(op_host_get_message), + msg::Any::HostGetWorkerClosed => Some(op_host_get_worker_closed), + msg::Any::HostPostMessage => Some(op_host_post_message), + msg::Any::IsTTY => Some(op_is_tty), + msg::Any::Kill => Some(op_kill), + msg::Any::Link => Some(op_link), + msg::Any::Listen => Some(op_listen), + msg::Any::MakeTempDir => Some(op_make_temp_dir), + msg::Any::Metrics => Some(op_metrics), + msg::Any::Mkdir => Some(op_mkdir), + msg::Any::Now => Some(op_now), + msg::Any::Open => Some(op_open), + msg::Any::PermissionRevoke => Some(op_revoke_permission), + msg::Any::Permissions => Some(op_permissions), + msg::Any::Read => Some(op_read), + msg::Any::ReadDir => Some(op_read_dir), + msg::Any::Readlink => Some(op_read_link), + msg::Any::Remove => Some(op_remove), + msg::Any::Rename => Some(op_rename), + msg::Any::ReplReadline => Some(op_repl_readline), + msg::Any::ReplStart => Some(op_repl_start), + msg::Any::Resources => Some(op_resources), + msg::Any::Run => Some(op_run), + msg::Any::RunStatus => Some(op_run_status), + msg::Any::Seek => Some(op_seek), + msg::Any::SetEnv => Some(op_set_env), + msg::Any::Shutdown => Some(op_shutdown), + msg::Any::Start => Some(op_start), + msg::Any::Stat => Some(op_stat), + msg::Any::Symlink => Some(op_symlink), + msg::Any::Truncate => Some(op_truncate), + msg::Any::HomeDir => Some(op_home_dir), + msg::Any::Utime => Some(op_utime), + msg::Any::Write => Some(op_write), + + // TODO(ry) split these out so that only the appropriate Workers can access + // them. + msg::Any::WorkerGetMessage => Some(op_worker_get_message), + msg::Any::WorkerPostMessage => Some(op_worker_post_message), + + _ => None, + } +} + +fn serialize_response( + cmd_id: u32, + builder: &mut FlatBufferBuilder<'_>, + mut args: msg::BaseArgs<'_>, +) -> Buf { + args.cmd_id = cmd_id; + let base = msg::Base::create(builder, &args); + msg::finish_base_buffer(builder, base); + let data = builder.finished_data(); + // println!("serialize_response {:x?}", data); + data.into() +} + +#[inline] +fn ok_buf(buf: Buf) -> CliOpResult { + Ok(Op::Sync(buf)) +} + +// This is just type conversion. Implement From trait? +// See https://github.com/tokio-rs/tokio/blob/ffd73a64e7ec497622b7f939e38017afe7124dc4/tokio-fs/src/lib.rs#L76-L85 +fn convert_blocking<F>(f: F) -> Poll<Buf, ErrBox> +where + F: FnOnce() -> Result<Buf, ErrBox>, +{ + use futures::Async::*; + match tokio_threadpool::blocking(f) { + Ok(Ready(Ok(v))) => Ok(v.into()), + Ok(Ready(Err(err))) => Err(err), + Ok(NotReady) => Ok(NotReady), + Err(err) => panic!("blocking error {}", err), + } +} + +fn blocking<F>(is_sync: bool, f: F) -> CliOpResult +where + F: 'static + Send + FnOnce() -> Result<Buf, ErrBox>, +{ + if is_sync { + let result_buf = f()?; + Ok(Op::Sync(result_buf)) + } else { + Ok(Op::Async(Box::new(futures::sync::oneshot::spawn( + tokio_util::poll_fn(move || convert_blocking(f)), + &tokio_executor::DefaultExecutor::current(), + )))) + } +} |