summaryrefslogtreecommitdiff
path: root/cli/ops/mod.rs
diff options
context:
space:
mode:
authorRyan Dahl <ry@tinyclouds.org>2019-08-21 20:42:48 -0400
committerGitHub <noreply@github.com>2019-08-21 20:42:48 -0400
commitbdc97b3976786bb744a27e59b0f4f28554a682df (patch)
treebf4635ad903de542c10620e95adb72eee03d9204 /cli/ops/mod.rs
parentb764d1b8ffc4bf5e2ab89bdbd978d708a6da0f24 (diff)
Organize dispatch a bit (#2796)
Just some clean up reorganization around flatbuffer/minimal dispatch code. This is prep for adding a JSON dispatcher.
Diffstat (limited to 'cli/ops/mod.rs')
-rw-r--r--cli/ops/mod.rs291
1 files changed, 19 insertions, 272 deletions
diff --git a/cli/ops/mod.rs b/cli/ops/mod.rs
index 021c0fa47..92c0f8e62 100644
--- a/cli/ops/mod.rs
+++ b/cli/ops/mod.rs
@@ -1,305 +1,52 @@
// Copyright 2018-2019 the Deno authors. All rights reserved. MIT license.
-use crate::deno_error::GetErrorKind;
-use crate::dispatch_minimal::dispatch_minimal;
-use crate::dispatch_minimal::parse_min_record;
-use crate::msg;
use crate::state::ThreadSafeState;
-use crate::tokio_util;
use deno::*;
-use flatbuffers::FlatBufferBuilder;
-use futures;
-use futures::Poll;
-use hyper;
-use hyper::rt::Future;
-use tokio_threadpool;
mod compiler;
-use compiler::{op_cache, op_fetch_source_file};
+mod dispatch_flatbuffers;
+mod dispatch_minimal;
mod errors;
-use errors::{op_apply_source_map, op_format_error};
-mod files;
-use files::{op_close, op_open, op_read, op_seek, op_write};
mod fetch;
-use fetch::op_fetch;
+mod files;
mod fs;
-use fs::{
- op_chdir, op_chmod, op_chown, op_copy_file, op_cwd, op_link,
- op_make_temp_dir, op_mkdir, op_read_dir, op_read_link, op_remove, op_rename,
- op_stat, op_symlink, op_truncate, op_utime,
-};
+mod io;
mod metrics;
-use metrics::op_metrics;
mod net;
-use net::{op_accept, op_dial, op_listen, op_shutdown};
mod os;
-use os::{
- op_env, op_exec_path, op_exit, op_home_dir, op_is_tty, op_set_env, op_start,
-};
mod performance;
-use performance::op_now;
mod permissions;
-use permissions::{op_permissions, op_revoke_permission};
mod process;
-use process::{op_kill, op_run, op_run_status};
mod random;
-use random::op_get_random_values;
mod repl;
-use repl::{op_repl_readline, op_repl_start};
mod resources;
-use resources::op_resources;
mod timers;
-use timers::{op_global_timer, op_global_timer_stop};
+mod utils;
mod workers;
-use workers::{
- op_create_worker, op_host_get_message, op_host_get_worker_closed,
- op_host_post_message, op_worker_get_message, op_worker_post_message,
-};
-
-type CliOpResult = OpResult<ErrBox>;
-
-type CliDispatchFn = fn(
- state: &ThreadSafeState,
- base: &msg::Base<'_>,
- data: Option<PinnedBuf>,
-) -> CliOpResult;
-pub type OpSelector = fn(inner_type: msg::Any) -> Option<CliDispatchFn>;
+pub const OP_FLATBUFFER: OpId = 44;
+pub const OP_READ: OpId = 1;
+pub const OP_WRITE: OpId = 2;
-#[inline]
-fn empty_buf() -> Buf {
- Box::new([])
-}
-
-const FLATBUFFER_OP_ID: OpId = 44;
-
-pub fn dispatch_all(
+pub fn dispatch(
state: &ThreadSafeState,
op_id: OpId,
control: &[u8],
zero_copy: Option<PinnedBuf>,
- op_selector: OpSelector,
) -> CoreOp {
let bytes_sent_control = control.len();
let bytes_sent_zero_copy = zero_copy.as_ref().map(|b| b.len()).unwrap_or(0);
- let op = if op_id != FLATBUFFER_OP_ID {
- let min_record = parse_min_record(control).unwrap();
- dispatch_minimal(state, op_id, min_record, zero_copy)
- } else {
- dispatch_all_legacy(state, control, zero_copy, op_selector)
- };
- state.metrics_op_dispatched(bytes_sent_control, bytes_sent_zero_copy);
- op
-}
-/// Processes raw messages from JavaScript.
-/// This functions invoked every time Deno.core.dispatch() is called.
-/// control corresponds to the first argument of Deno.core.dispatch().
-/// data corresponds to the second argument of Deno.core.dispatch().
-pub fn dispatch_all_legacy(
- state: &ThreadSafeState,
- control: &[u8],
- zero_copy: Option<PinnedBuf>,
- op_selector: OpSelector,
-) -> CoreOp {
- let base = msg::get_root_as_base(&control);
- let inner_type = base.inner_type();
- let is_sync = base.sync();
- let cmd_id = base.cmd_id();
-
- debug!(
- "msg_from_js {} sync {}",
- msg::enum_name_any(inner_type),
- is_sync
- );
-
- let op_func: CliDispatchFn = match op_selector(inner_type) {
- Some(v) => v,
- None => panic!("Unhandled message {}", msg::enum_name_any(inner_type)),
- };
-
- let op_result = op_func(state, &base, zero_copy);
-
- let state = state.clone();
-
- match op_result {
- Ok(Op::Sync(buf)) => {
- state.metrics_op_completed(buf.len());
- Op::Sync(buf)
+ let op = match op_id {
+ OP_READ => {
+ dispatch_minimal::dispatch(io::op_read, state, control, zero_copy)
}
- Ok(Op::Async(fut)) => {
- let result_fut = Box::new(
- fut
- .or_else(move |err: ErrBox| -> Result<Buf, ()> {
- debug!("op err {}", err);
- // No matter whether we got an Err or Ok, we want a serialized message to
- // send back. So transform the DenoError into a Buf.
- let builder = &mut FlatBufferBuilder::new();
- let errmsg_offset = builder.create_string(&format!("{}", err));
- Ok(serialize_response(
- cmd_id,
- builder,
- msg::BaseArgs {
- error: Some(errmsg_offset),
- error_kind: err.kind(),
- ..Default::default()
- },
- ))
- })
- .and_then(move |buf: Buf| -> Result<Buf, ()> {
- // Handle empty responses. For sync responses we just want
- // to send null. For async we want to send a small message
- // with the cmd_id.
- let buf = if buf.len() > 0 {
- buf
- } else {
- let builder = &mut FlatBufferBuilder::new();
- serialize_response(
- cmd_id,
- builder,
- msg::BaseArgs {
- ..Default::default()
- },
- )
- };
- state.metrics_op_completed(buf.len());
- Ok(buf)
- })
- .map_err(|err| panic!("unexpected error {:?}", err)),
- );
- Op::Async(result_fut)
+ OP_WRITE => {
+ dispatch_minimal::dispatch(io::op_write, state, control, zero_copy)
}
- Err(err) => {
- debug!("op err {}", err);
- // No matter whether we got an Err or Ok, we want a serialized message to
- // send back. So transform the DenoError into a Buf.
- let builder = &mut FlatBufferBuilder::new();
- let errmsg_offset = builder.create_string(&format!("{}", err));
- let response_buf = serialize_response(
- cmd_id,
- builder,
- msg::BaseArgs {
- error: Some(errmsg_offset),
- error_kind: err.kind(),
- ..Default::default()
- },
- );
- state.metrics_op_completed(response_buf.len());
- Op::Sync(response_buf)
- }
- }
-}
-
-/// Standard ops set for most isolates
-pub fn op_selector_std(inner_type: msg::Any) -> Option<CliDispatchFn> {
- match inner_type {
- msg::Any::Accept => Some(op_accept),
- msg::Any::ApplySourceMap => Some(op_apply_source_map),
- msg::Any::Cache => Some(op_cache),
- msg::Any::Chdir => Some(op_chdir),
- msg::Any::Chmod => Some(op_chmod),
- msg::Any::Chown => Some(op_chown),
- msg::Any::Close => Some(op_close),
- msg::Any::CopyFile => Some(op_copy_file),
- msg::Any::CreateWorker => Some(op_create_worker),
- msg::Any::Cwd => Some(op_cwd),
- msg::Any::Dial => Some(op_dial),
- msg::Any::Environ => Some(op_env),
- msg::Any::ExecPath => Some(op_exec_path),
- msg::Any::Exit => Some(op_exit),
- msg::Any::Fetch => Some(op_fetch),
- msg::Any::FetchSourceFile => Some(op_fetch_source_file),
- msg::Any::FormatError => Some(op_format_error),
- msg::Any::GetRandomValues => Some(op_get_random_values),
- msg::Any::GlobalTimer => Some(op_global_timer),
- msg::Any::GlobalTimerStop => Some(op_global_timer_stop),
- msg::Any::HostGetMessage => Some(op_host_get_message),
- msg::Any::HostGetWorkerClosed => Some(op_host_get_worker_closed),
- msg::Any::HostPostMessage => Some(op_host_post_message),
- msg::Any::IsTTY => Some(op_is_tty),
- msg::Any::Kill => Some(op_kill),
- msg::Any::Link => Some(op_link),
- msg::Any::Listen => Some(op_listen),
- msg::Any::MakeTempDir => Some(op_make_temp_dir),
- msg::Any::Metrics => Some(op_metrics),
- msg::Any::Mkdir => Some(op_mkdir),
- msg::Any::Now => Some(op_now),
- msg::Any::Open => Some(op_open),
- msg::Any::PermissionRevoke => Some(op_revoke_permission),
- msg::Any::Permissions => Some(op_permissions),
- msg::Any::Read => Some(op_read),
- msg::Any::ReadDir => Some(op_read_dir),
- msg::Any::Readlink => Some(op_read_link),
- msg::Any::Remove => Some(op_remove),
- msg::Any::Rename => Some(op_rename),
- msg::Any::ReplReadline => Some(op_repl_readline),
- msg::Any::ReplStart => Some(op_repl_start),
- msg::Any::Resources => Some(op_resources),
- msg::Any::Run => Some(op_run),
- msg::Any::RunStatus => Some(op_run_status),
- msg::Any::Seek => Some(op_seek),
- msg::Any::SetEnv => Some(op_set_env),
- msg::Any::Shutdown => Some(op_shutdown),
- msg::Any::Start => Some(op_start),
- msg::Any::Stat => Some(op_stat),
- msg::Any::Symlink => Some(op_symlink),
- msg::Any::Truncate => Some(op_truncate),
- msg::Any::HomeDir => Some(op_home_dir),
- msg::Any::Utime => Some(op_utime),
- msg::Any::Write => Some(op_write),
-
- // TODO(ry) split these out so that only the appropriate Workers can access
- // them.
- msg::Any::WorkerGetMessage => Some(op_worker_get_message),
- msg::Any::WorkerPostMessage => Some(op_worker_post_message),
-
- _ => None,
- }
-}
-
-fn serialize_response(
- cmd_id: u32,
- builder: &mut FlatBufferBuilder<'_>,
- mut args: msg::BaseArgs<'_>,
-) -> Buf {
- args.cmd_id = cmd_id;
- let base = msg::Base::create(builder, &args);
- msg::finish_base_buffer(builder, base);
- let data = builder.finished_data();
- // println!("serialize_response {:x?}", data);
- data.into()
-}
-
-#[inline]
-fn ok_buf(buf: Buf) -> CliOpResult {
- Ok(Op::Sync(buf))
-}
-
-// This is just type conversion. Implement From trait?
-// See https://github.com/tokio-rs/tokio/blob/ffd73a64e7ec497622b7f939e38017afe7124dc4/tokio-fs/src/lib.rs#L76-L85
-fn convert_blocking<F>(f: F) -> Poll<Buf, ErrBox>
-where
- F: FnOnce() -> Result<Buf, ErrBox>,
-{
- use futures::Async::*;
- match tokio_threadpool::blocking(f) {
- Ok(Ready(Ok(v))) => Ok(v.into()),
- Ok(Ready(Err(err))) => Err(err),
- Ok(NotReady) => Ok(NotReady),
- Err(err) => panic!("blocking error {}", err),
- }
-}
+ OP_FLATBUFFER => dispatch_flatbuffers::dispatch(state, control, zero_copy),
+ _ => panic!("bad op_id"),
+ };
-fn blocking<F>(is_sync: bool, f: F) -> CliOpResult
-where
- F: 'static + Send + FnOnce() -> Result<Buf, ErrBox>,
-{
- if is_sync {
- let result_buf = f()?;
- Ok(Op::Sync(result_buf))
- } else {
- Ok(Op::Async(Box::new(futures::sync::oneshot::spawn(
- tokio_util::poll_fn(move || convert_blocking(f)),
- &tokio_executor::DefaultExecutor::current(),
- ))))
- }
+ state.metrics_op_dispatched(bytes_sent_control, bytes_sent_zero_copy);
+ op
}