summaryrefslogtreecommitdiff
path: root/cli
diff options
context:
space:
mode:
authorBartek IwaƄczuk <biwanczuk@gmail.com>2019-08-24 17:31:14 +0200
committerRyan Dahl <ry@tinyclouds.org>2019-08-24 08:31:14 -0700
commit137f33733d365026903d40e7cde6e34ac6c36dcf (patch)
treee8096e119c374b199cd498ccfa1ee0ef4e6ba950 /cli
parent79f82cf10ed1dbf91346994250d7311a4d74377a (diff)
port more ops to JSON (#2809)
Diffstat (limited to 'cli')
-rw-r--r--cli/deno_error.rs17
-rw-r--r--cli/msg.fbs223
-rw-r--r--cli/msg.rs14
-rw-r--r--cli/ops/dispatch_flatbuffers.rs39
-rw-r--r--cli/ops/metrics.rs36
-rw-r--r--cli/ops/mod.rs127
-rw-r--r--cli/ops/net.rs167
-rw-r--r--cli/ops/performance.rs34
-rw-r--r--cli/ops/permissions.rs62
-rw-r--r--cli/ops/process.rs175
-rw-r--r--cli/ops/random.rs17
-rw-r--r--cli/ops/repl.rs86
-rw-r--r--cli/ops/resources.rs50
-rw-r--r--cli/ops/timers.rs59
-rw-r--r--cli/ops/workers.rs229
15 files changed, 488 insertions, 847 deletions
diff --git a/cli/deno_error.rs b/cli/deno_error.rs
index e024a396c..3b7dbcde8 100644
--- a/cli/deno_error.rs
+++ b/cli/deno_error.rs
@@ -205,6 +205,18 @@ impl GetErrorKind for ReadlineError {
}
}
+impl GetErrorKind for serde_json::error::Error {
+ fn kind(&self) -> ErrorKind {
+ use serde_json::error::*;
+ match self.classify() {
+ Category::Io => ErrorKind::InvalidInput,
+ Category::Syntax => ErrorKind::InvalidInput,
+ Category::Data => ErrorKind::InvalidData,
+ Category::Eof => ErrorKind::UnexpectedEof,
+ }
+ }
+}
+
#[cfg(unix)]
mod unix {
use super::{ErrorKind, GetErrorKind};
@@ -251,6 +263,11 @@ impl GetErrorKind for dyn AnyError {
.or_else(|| self.downcast_ref::<uri::InvalidUri>().map(Get::kind))
.or_else(|| self.downcast_ref::<url::ParseError>().map(Get::kind))
.or_else(|| self.downcast_ref::<ReadlineError>().map(Get::kind))
+ .or_else(|| {
+ self
+ .downcast_ref::<serde_json::error::Error>()
+ .map(Get::kind)
+ })
.or_else(|| unix_error_kind(self))
.unwrap_or_else(|| {
panic!("Can't get ErrorKind for {:?}", self);
diff --git a/cli/msg.fbs b/cli/msg.fbs
index 26ff61278..a7359c527 100644
--- a/cli/msg.fbs
+++ b/cli/msg.fbs
@@ -1,37 +1,14 @@
union Any {
- Accept,
Chdir,
Chmod,
Chown,
CopyFile,
- CreateWorker,
- CreateWorkerRes,
Cwd,
CwdRes,
- Dial,
- GetRandomValues,
- GlobalTimer,
- GlobalTimerRes,
- GlobalTimerStop,
- HostGetMessage,
- HostGetMessageRes,
- HostGetWorkerClosed,
- HostPostMessage,
- Kill,
Link,
- Listen,
- ListenRes,
MakeTempDir,
MakeTempDirRes,
- Metrics,
- MetricsRes,
Mkdir,
- NewConn,
- Now,
- NowRes,
- PermissionRevoke,
- Permissions,
- PermissionsRes,
Read,
ReadDir,
ReadDirRes,
@@ -40,25 +17,11 @@ union Any {
ReadlinkRes,
Remove,
Rename,
- ReplReadline,
- ReplReadlineRes,
- ReplStart,
- ReplStartRes,
- Resources,
- ResourcesRes,
- Run,
- RunRes,
- RunStatus,
- RunStatusRes,
Seek,
- Shutdown,
Stat,
StatRes,
Symlink,
Truncate,
- WorkerGetMessage,
- WorkerGetMessageRes,
- WorkerPostMessage,
Write,
WriteRes,
}
@@ -159,83 +122,15 @@ table FormatErrorRes {
error: string;
}
-// Create worker as host
-table CreateWorker {
- specifier: string;
- include_deno_namespace: bool;
- has_source_code: bool;
- source_code: string;
-}
-
-table CreateWorkerRes {
- rid: uint32;
-}
-
-table HostGetWorkerClosed {
- rid: uint32;
-}
-
-// Get message from guest worker as host
-table HostGetMessage {
- rid: uint32;
-}
-
-table HostGetMessageRes {
- data: [ubyte];
-}
-
-// Post message to guest worker as host
-table HostPostMessage {
- rid: uint32;
- // data passed thru the zero-copy data parameter.
-}
-
-// Get message from host as guest worker
-table WorkerGetMessage {
- unused: int8;
-}
-
-table WorkerGetMessageRes {
- data: [ubyte];
-}
-
-// Post message to host as guest worker
-table WorkerPostMessage {
- // data passed thru the zero-copy data parameter.
-}
-
table Chdir {
directory: string;
}
-table GlobalTimer {
- timeout: int;
-}
-
-table GlobalTimerRes { }
-
-table GlobalTimerStop { }
-
table KeyValue {
key: string;
value: string;
}
-table Permissions {}
-
-table PermissionRevoke {
- permission: string;
-}
-
-table PermissionsRes {
- run: bool;
- read: bool;
- write: bool;
- net: bool;
- env: bool;
- hrtime: bool;
-}
-
table MakeTempDir {
dir: string;
prefix: string;
@@ -294,35 +189,6 @@ table ReadlinkRes {
path: string;
}
-table ReplStart {
- history_file: string;
- // TODO add config
-}
-
-table ReplStartRes {
- rid: uint32;
-}
-
-table ReplReadline {
- rid: uint32;
- prompt: string;
-}
-
-table ReplReadlineRes {
- line: string;
-}
-
-table Resources {}
-
-table Resource {
- rid: uint32;
- repr: string;
-}
-
-table ResourcesRes {
- resources: [Resource];
-}
-
table Symlink {
oldname: string;
newname: string;
@@ -373,99 +239,10 @@ table WriteRes {
nbyte: uint;
}
-table Kill {
- pid: int32;
- signo: int32;
-}
-
-table Shutdown {
- rid: uint32;
- how: uint;
-}
-
-table Listen {
- network: string;
- address: string;
-}
-
-table ListenRes {
- rid: uint32;
-}
-
-table Accept {
- rid: uint32;
-}
-
-table Dial {
- network: string;
- address: string;
-}
-
-// Response to Accept and Dial.
-table NewConn {
- rid: uint32;
- remote_addr: string;
- local_addr: string;
-}
-
-table Metrics {}
-
-table MetricsRes {
- ops_dispatched: uint64;
- ops_completed: uint64;
- bytes_sent_control: uint64;
- bytes_sent_data: uint64;
- bytes_received: uint64;
-}
-
-enum ProcessStdio: byte { Inherit, Piped, Null }
-
-table Run {
- args: [string];
- cwd: string;
- env: [KeyValue];
- stdin: ProcessStdio;
- stdout: ProcessStdio;
- stderr: ProcessStdio;
- stdin_rid: uint32;
- stdout_rid: uint32;
- stderr_rid: uint32;
-}
-
-table RunRes {
- rid: uint32;
- pid: uint32;
- // The following stdio rids are only valid if "Piped" was specified for the
- // corresponding stdio stream. The caller MUST issue a close op for all valid
- // stdio streams.
- stdin_rid: uint32;
- stdout_rid: uint32;
- stderr_rid: uint32;
-}
-
-table RunStatus {
- rid: uint32;
-}
-
-table RunStatusRes {
- got_signal: bool;
- exit_code: int;
- exit_signal: int;
-}
-
-table Now {}
-
-table NowRes {
- seconds: uint64;
- subsec_nanos: uint32;
-}
-
table Seek {
rid: uint32;
offset: int;
whence: uint;
}
-table GetRandomValues {}
-
root_type Base;
diff --git a/cli/msg.rs b/cli/msg.rs
index 51726b572..db4c771f8 100644
--- a/cli/msg.rs
+++ b/cli/msg.rs
@@ -1,22 +1,8 @@
// Copyright 2018-2019 the Deno authors. All rights reserved. MIT license.
#![allow(dead_code)]
#![cfg_attr(feature = "cargo-clippy", allow(clippy::all, clippy::pedantic))]
-use crate::state;
use flatbuffers;
-use std::sync::atomic::Ordering;
// GN_OUT_DIR is set either by build.rs (for the Cargo build), or by
// build_extra/rust/run.py (for the GN+Ninja build).
include!(concat!(env!("GN_OUT_DIR"), "/gen/cli/msg_generated.rs"));
-
-impl<'a> From<&'a state::Metrics> for MetricsResArgs {
- fn from(m: &'a state::Metrics) -> Self {
- MetricsResArgs {
- ops_dispatched: m.ops_dispatched.load(Ordering::SeqCst) as u64,
- ops_completed: m.ops_completed.load(Ordering::SeqCst) as u64,
- bytes_sent_control: m.bytes_sent_control.load(Ordering::SeqCst) as u64,
- bytes_sent_data: m.bytes_sent_data.load(Ordering::SeqCst) as u64,
- bytes_received: m.bytes_received.load(Ordering::SeqCst) as u64,
- }
- }
-}
diff --git a/cli/ops/dispatch_flatbuffers.rs b/cli/ops/dispatch_flatbuffers.rs
index c785d6c06..bd0158751 100644
--- a/cli/ops/dispatch_flatbuffers.rs
+++ b/cli/ops/dispatch_flatbuffers.rs
@@ -12,19 +12,6 @@ use super::fs::{
op_make_temp_dir, op_mkdir, op_read_dir, op_read_link, op_remove, op_rename,
op_stat, op_symlink, op_truncate,
};
-use super::metrics::op_metrics;
-use super::net::{op_accept, op_dial, op_listen, op_shutdown};
-use super::performance::op_now;
-use super::permissions::{op_permissions, op_revoke_permission};
-use super::process::{op_kill, op_run, op_run_status};
-use super::random::op_get_random_values;
-use super::repl::{op_repl_readline, op_repl_start};
-use super::resources::op_resources;
-use super::timers::{op_global_timer, op_global_timer_stop};
-use super::workers::{
- op_create_worker, op_host_get_message, op_host_get_worker_closed,
- op_host_post_message, op_worker_get_message, op_worker_post_message,
-};
type CliDispatchFn = fn(
state: &ThreadSafeState,
@@ -138,50 +125,24 @@ pub fn serialize_response(
/// Standard ops set for most isolates
pub fn op_selector_std(inner_type: msg::Any) -> Option<CliDispatchFn> {
match inner_type {
- msg::Any::Accept => Some(op_accept),
msg::Any::Chdir => Some(op_chdir),
msg::Any::Chmod => Some(op_chmod),
msg::Any::Chown => Some(op_chown),
msg::Any::CopyFile => Some(op_copy_file),
- msg::Any::CreateWorker => Some(op_create_worker),
msg::Any::Cwd => Some(op_cwd),
- msg::Any::Dial => Some(op_dial),
- msg::Any::GetRandomValues => Some(op_get_random_values),
- msg::Any::GlobalTimer => Some(op_global_timer),
- msg::Any::GlobalTimerStop => Some(op_global_timer_stop),
- msg::Any::HostGetMessage => Some(op_host_get_message),
- msg::Any::HostGetWorkerClosed => Some(op_host_get_worker_closed),
- msg::Any::HostPostMessage => Some(op_host_post_message),
- msg::Any::Kill => Some(op_kill),
msg::Any::Link => Some(op_link),
- msg::Any::Listen => Some(op_listen),
msg::Any::MakeTempDir => Some(op_make_temp_dir),
- msg::Any::Metrics => Some(op_metrics),
msg::Any::Mkdir => Some(op_mkdir),
- msg::Any::Now => Some(op_now),
- msg::Any::PermissionRevoke => Some(op_revoke_permission),
- msg::Any::Permissions => Some(op_permissions),
msg::Any::Read => Some(op_read),
msg::Any::ReadDir => Some(op_read_dir),
msg::Any::Readlink => Some(op_read_link),
msg::Any::Remove => Some(op_remove),
msg::Any::Rename => Some(op_rename),
- msg::Any::ReplReadline => Some(op_repl_readline),
- msg::Any::ReplStart => Some(op_repl_start),
- msg::Any::Resources => Some(op_resources),
- msg::Any::Run => Some(op_run),
- msg::Any::RunStatus => Some(op_run_status),
- msg::Any::Shutdown => Some(op_shutdown),
msg::Any::Stat => Some(op_stat),
msg::Any::Symlink => Some(op_symlink),
msg::Any::Truncate => Some(op_truncate),
msg::Any::Write => Some(op_write),
- // TODO(ry) split these out so that only the appropriate Workers can access
- // them.
- msg::Any::WorkerGetMessage => Some(op_worker_get_message),
- msg::Any::WorkerPostMessage => Some(op_worker_post_message),
-
_ => None,
}
}
diff --git a/cli/ops/metrics.rs b/cli/ops/metrics.rs
index 76f36c390..e1a23f6c8 100644
--- a/cli/ops/metrics.rs
+++ b/cli/ops/metrics.rs
@@ -1,31 +1,21 @@
// Copyright 2018-2019 the Deno authors. All rights reserved. MIT license.
-use super::dispatch_flatbuffers::serialize_response;
-use super::utils::*;
-use crate::msg;
+use super::dispatch_json::{JsonOp, Value};
use crate::state::ThreadSafeState;
use deno::*;
-use flatbuffers::FlatBufferBuilder;
+use std::sync::atomic::Ordering;
pub fn op_metrics(
state: &ThreadSafeState,
- base: &msg::Base<'_>,
- data: Option<PinnedBuf>,
-) -> CliOpResult {
- assert!(data.is_none());
- let cmd_id = base.cmd_id();
+ _args: Value,
+ _zero_copy: Option<PinnedBuf>,
+) -> Result<JsonOp, ErrBox> {
+ let m = &state.metrics;
- let builder = &mut FlatBufferBuilder::new();
- let inner = msg::MetricsRes::create(
- builder,
- &msg::MetricsResArgs::from(&state.metrics),
- );
- ok_buf(serialize_response(
- cmd_id,
- builder,
- msg::BaseArgs {
- inner: Some(inner.as_union_value()),
- inner_type: msg::Any::MetricsRes,
- ..Default::default()
- },
- ))
+ Ok(JsonOp::Sync(json!({
+ "opsDispatched": m.ops_dispatched.load(Ordering::SeqCst) as u64,
+ "opsCompleted": m.ops_completed.load(Ordering::SeqCst) as u64,
+ "bytesSentControl": m.bytes_sent_control.load(Ordering::SeqCst) as u64,
+ "bytesSentData": m.bytes_sent_data.load(Ordering::SeqCst) as u64,
+ "bytesReceived": m.bytes_received.load(Ordering::SeqCst) as u64
+ })))
}
diff --git a/cli/ops/mod.rs b/cli/ops/mod.rs
index 6a80e610f..4636754c9 100644
--- a/cli/ops/mod.rs
+++ b/cli/ops/mod.rs
@@ -45,6 +45,29 @@ pub const OP_OPEN: OpId = 15;
pub const OP_CLOSE: OpId = 16;
pub const OP_SEEK: OpId = 17;
pub const OP_FETCH: OpId = 18;
+pub const OP_METRICS: OpId = 19;
+pub const OP_REPL_START: OpId = 20;
+pub const OP_REPL_READLINE: OpId = 21;
+pub const OP_ACCEPT: OpId = 22;
+pub const OP_DIAL: OpId = 23;
+pub const OP_SHUTDOWN: OpId = 24;
+pub const OP_LISTEN: OpId = 25;
+pub const OP_RESOURCES: OpId = 26;
+pub const OP_GET_RANDOM_VALUES: OpId = 27;
+pub const OP_GLOBAL_TIMER_STOP: OpId = 28;
+pub const OP_GLOBAL_TIMER: OpId = 29;
+pub const OP_NOW: OpId = 30;
+pub const OP_PERMISSIONS: OpId = 31;
+pub const OP_REVOKE_PERMISSION: OpId = 32;
+pub const OP_CREATE_WORKER: OpId = 33;
+pub const OP_HOST_GET_WORKER_CLOSED: OpId = 34;
+pub const OP_HOST_POST_MESSAGE: OpId = 35;
+pub const OP_HOST_GET_MESSAGE: OpId = 36;
+pub const OP_WORKER_POST_MESSAGE: OpId = 37;
+pub const OP_WORKER_GET_MESSAGE: OpId = 38;
+pub const OP_RUN: OpId = 39;
+pub const OP_RUN_STATUS: OpId = 40;
+pub const OP_KILL: OpId = 41;
pub fn dispatch(
state: &ThreadSafeState,
@@ -112,9 +135,113 @@ pub fn dispatch(
OP_SEEK => {
dispatch_json::dispatch(files::op_seek, state, control, zero_copy)
}
+ OP_METRICS => {
+ dispatch_json::dispatch(metrics::op_metrics, state, control, zero_copy)
+ }
OP_FETCH => {
dispatch_json::dispatch(fetch::op_fetch, state, control, zero_copy)
}
+ OP_REPL_START => {
+ dispatch_json::dispatch(repl::op_repl_start, state, control, zero_copy)
+ }
+ OP_REPL_READLINE => {
+ dispatch_json::dispatch(repl::op_repl_readline, state, control, zero_copy)
+ }
+ OP_ACCEPT => {
+ dispatch_json::dispatch(net::op_accept, state, control, zero_copy)
+ }
+ OP_DIAL => dispatch_json::dispatch(net::op_dial, state, control, zero_copy),
+ OP_SHUTDOWN => {
+ dispatch_json::dispatch(net::op_shutdown, state, control, zero_copy)
+ }
+ OP_LISTEN => {
+ dispatch_json::dispatch(net::op_listen, state, control, zero_copy)
+ }
+ OP_RESOURCES => dispatch_json::dispatch(
+ resources::op_resources,
+ state,
+ control,
+ zero_copy,
+ ),
+ OP_GET_RANDOM_VALUES => dispatch_json::dispatch(
+ random::op_get_random_values,
+ state,
+ control,
+ zero_copy,
+ ),
+ OP_GLOBAL_TIMER_STOP => dispatch_json::dispatch(
+ timers::op_global_timer_stop,
+ state,
+ control,
+ zero_copy,
+ ),
+ OP_GLOBAL_TIMER => dispatch_json::dispatch(
+ timers::op_global_timer,
+ state,
+ control,
+ zero_copy,
+ ),
+ OP_NOW => {
+ dispatch_json::dispatch(performance::op_now, state, control, zero_copy)
+ }
+ OP_PERMISSIONS => dispatch_json::dispatch(
+ permissions::op_permissions,
+ state,
+ control,
+ zero_copy,
+ ),
+ OP_REVOKE_PERMISSION => dispatch_json::dispatch(
+ permissions::op_revoke_permission,
+ state,
+ control,
+ zero_copy,
+ ),
+ OP_CREATE_WORKER => dispatch_json::dispatch(
+ workers::op_create_worker,
+ state,
+ control,
+ zero_copy,
+ ),
+ OP_HOST_GET_WORKER_CLOSED => dispatch_json::dispatch(
+ workers::op_host_get_worker_closed,
+ state,
+ control,
+ zero_copy,
+ ),
+ OP_HOST_POST_MESSAGE => dispatch_json::dispatch(
+ workers::op_host_post_message,
+ state,
+ control,
+ zero_copy,
+ ),
+ OP_HOST_GET_MESSAGE => dispatch_json::dispatch(
+ workers::op_host_get_message,
+ state,
+ control,
+ zero_copy,
+ ),
+ // TODO: make sure these two ops are only accessible to appropriate Workers
+ OP_WORKER_POST_MESSAGE => dispatch_json::dispatch(
+ workers::op_worker_post_message,
+ state,
+ control,
+ zero_copy,
+ ),
+ OP_WORKER_GET_MESSAGE => dispatch_json::dispatch(
+ workers::op_worker_get_message,
+ state,
+ control,
+ zero_copy,
+ ),
+ OP_RUN => {
+ dispatch_json::dispatch(process::op_run, state, control, zero_copy)
+ }
+ OP_RUN_STATUS => {
+ dispatch_json::dispatch(process::op_run_status, state, control, zero_copy)
+ }
+ OP_KILL => {
+ dispatch_json::dispatch(process::op_kill, state, control, zero_copy)
+ }
OP_FLATBUFFER => dispatch_flatbuffers::dispatch(state, control, zero_copy),
_ => panic!("bad op_id"),
};
diff --git a/cli/ops/net.rs b/cli/ops/net.rs
index 5ce562492..650127fad 100644
--- a/cli/ops/net.rs
+++ b/cli/ops/net.rs
@@ -1,15 +1,12 @@
// Copyright 2018-2019 the Deno authors. All rights reserved. MIT license.
-use super::dispatch_flatbuffers::serialize_response;
-use super::utils::*;
+use super::dispatch_json::{Deserialize, JsonOp, Value};
use crate::deno_error;
-use crate::msg;
use crate::resolve_addr::resolve_addr;
use crate::resources;
use crate::resources::Resource;
use crate::state::ThreadSafeState;
use crate::tokio_util;
use deno::*;
-use flatbuffers::FlatBufferBuilder;
use futures::Future;
use std;
use std::convert::From;
@@ -18,15 +15,18 @@ use tokio;
use tokio::net::TcpListener;
use tokio::net::TcpStream;
+#[derive(Deserialize)]
+struct AcceptArgs {
+ rid: i32,
+}
+
pub fn op_accept(
_state: &ThreadSafeState,
- base: &msg::Base<'_>,
- data: Option<PinnedBuf>,
-) -> CliOpResult {
- assert!(data.is_none());
- let cmd_id = base.cmd_id();
- let inner = base.inner_as_accept().unwrap();
- let server_rid = inner.rid();
+ args: Value,
+ _zero_copy: Option<PinnedBuf>,
+) -> Result<JsonOp, ErrBox> {
+ let args: AcceptArgs = serde_json::from_value(args)?;
+ let server_rid = args.rid as u32;
match resources::lookup(server_rid) {
None => Err(deno_error::bad_resource()),
@@ -34,55 +34,65 @@ pub fn op_accept(
let op = tokio_util::accept(server_resource)
.map_err(ErrBox::from)
.and_then(move |(tcp_stream, _socket_addr)| {
- new_conn(cmd_id, tcp_stream)
+ let tcp_stream_resource = resources::add_tcp_stream(tcp_stream);
+ futures::future::ok(json!({
+ "rid": tcp_stream_resource.rid
+ }))
});
- if base.sync() {
- let buf = op.wait()?;
- Ok(Op::Sync(buf))
- } else {
- Ok(Op::Async(Box::new(op)))
- }
+
+ Ok(JsonOp::Async(Box::new(op)))
}
}
}
+#[derive(Deserialize)]
+struct DialArgs {
+ network: String,
+ address: String,
+}
+
pub fn op_dial(
state: &ThreadSafeState,
- base: &msg::Base<'_>,
- data: Option<PinnedBuf>,
-) -> CliOpResult {
- assert!(data.is_none());
- let cmd_id = base.cmd_id();
- let inner = base.inner_as_dial().unwrap();
- let network = inner.network().unwrap();
+ args: Value,
+ _zero_copy: Option<PinnedBuf>,
+) -> Result<JsonOp, ErrBox> {
+ let args: DialArgs = serde_json::from_value(args)?;
+ let network = args.network;
assert_eq!(network, "tcp"); // TODO Support others.
- let address = inner.address().unwrap();
+ let address = args.address;
state.check_net(&address)?;
- let op = resolve_addr(address).and_then(move |addr| {
- TcpStream::connect(&addr)
- .map_err(ErrBox::from)
- .and_then(move |tcp_stream| new_conn(cmd_id, tcp_stream))
+ let op = resolve_addr(&address).and_then(move |addr| {
+ TcpStream::connect(&addr).map_err(ErrBox::from).and_then(
+ move |tcp_stream| {
+ let tcp_stream_resource = resources::add_tcp_stream(tcp_stream);
+ futures::future::ok(json!({
+ "rid": tcp_stream_resource.rid
+ }))
+ },
+ )
});
- if base.sync() {
- let buf = op.wait()?;
- Ok(Op::Sync(buf))
- } else {
- Ok(Op::Async(Box::new(op)))
- }
+
+ Ok(JsonOp::Async(Box::new(op)))
+}
+
+#[derive(Deserialize)]
+struct ShutdownArgs {
+ rid: i32,
+ how: i32,
}
pub fn op_shutdown(
_state: &ThreadSafeState,
- base: &msg::Base<'_>,
- data: Option<PinnedBuf>,
-) -> CliOpResult {
- assert!(data.is_none());
- let inner = base.inner_as_shutdown().unwrap();
- let rid = inner.rid();
- let how = inner.how();
- match resources::lookup(rid) {
+ args: Value,
+ _zero_copy: Option<PinnedBuf>,
+) -> Result<JsonOp, ErrBox> {
+ let args: ShutdownArgs = serde_json::from_value(args)?;
+
+ let rid = args.rid;
+ let how = args.how;
+ match resources::lookup(rid as u32) {
None => Err(deno_error::bad_resource()),
Some(mut resource) => {
let shutdown_mode = match how {
@@ -90,67 +100,36 @@ pub fn op_shutdown(
1 => Shutdown::Write,
_ => unimplemented!(),
};
- blocking(base.sync(), move || {
- // Use UFCS for disambiguation
- Resource::shutdown(&mut resource, shutdown_mode)?;
- Ok(empty_buf())
- })
+
+ // Use UFCS for disambiguation
+ Resource::shutdown(&mut resource, shutdown_mode)?;
+ Ok(JsonOp::Sync(json!({})))
}
}
}
+#[derive(Deserialize)]
+struct ListenArgs {
+ network: String,
+ address: String,
+}
+
pub fn op_listen(
state: &ThreadSafeState,
- base: &msg::Base<'_>,
- data: Option<PinnedBuf>,
-) -> CliOpResult {
- assert!(data.is_none());
- let cmd_id = base.cmd_id();
- let inner = base.inner_as_listen().unwrap();
- let network = inner.network().unwrap();
+ args: Value,
+ _zero_copy: Option<PinnedBuf>,
+) -> Result<JsonOp, ErrBox> {
+ let args: ListenArgs = serde_json::from_value(args)?;
+
+ let network = args.network;
assert_eq!(network, "tcp");
- let address = inner.address().unwrap();
+ let address = args.address;
state.check_net(&address)?;
- let addr = resolve_addr(address).wait()?;
+ let addr = resolve_addr(&address).wait()?;
let listener = TcpListener::bind(&addr)?;
let resource = resources::add_tcp_listener(listener);
- let builder = &mut FlatBufferBuilder::new();
- let inner =
- msg::ListenRes::create(builder, &msg::ListenResArgs { rid: resource.rid });
- let response_buf = serialize_response(
- cmd_id,
- builder,
- msg::BaseArgs {
- inner: Some(inner.as_union_value()),
- inner_type: msg::Any::ListenRes,
- ..Default::default()
- },
- );
- ok_buf(response_buf)
-}
-
-fn new_conn(cmd_id: u32, tcp_stream: TcpStream) -> Result<Buf, ErrBox> {
- let tcp_stream_resource = resources::add_tcp_stream(tcp_stream);
- // TODO forward socket_addr to client.
-
- let builder = &mut FlatBufferBuilder::new();
- let inner = msg::NewConn::create(
- builder,
- &msg::NewConnArgs {
- rid: tcp_stream_resource.rid,
- ..Default::default()
- },
- );
- Ok(serialize_response(
- cmd_id,
- builder,
- msg::BaseArgs {
- inner: Some(inner.as_union_value()),
- inner_type: msg::Any::NewConn,
- ..Default::default()
- },
- ))
+ Ok(JsonOp::Sync(json!(resource.rid)))
}
diff --git a/cli/ops/performance.rs b/cli/ops/performance.rs
index 94f6dbc38..090fc3323 100644
--- a/cli/ops/performance.rs
+++ b/cli/ops/performance.rs
@@ -1,10 +1,7 @@
// Copyright 2018-2019 the Deno authors. All rights reserved. MIT license.
-use super::dispatch_flatbuffers::serialize_response;
-use super::utils::*;
-use crate::msg;
+use super::dispatch_json::{JsonOp, Value};
use crate::state::ThreadSafeState;
use deno::*;
-use flatbuffers::FlatBufferBuilder;
// Returns a milliseconds and nanoseconds subsec
// since the start time of the deno runtime.
@@ -12,10 +9,9 @@ use flatbuffers::FlatBufferBuilder;
// nanoseconds are rounded on 2ms.
pub fn op_now(
state: &ThreadSafeState,
- base: &msg::Base<'_>,
- data: Option<PinnedBuf>,
-) -> CliOpResult {
- assert!(data.is_none());
+ _args: Value,
+ _zero_copy: Option<PinnedBuf>,
+) -> Result<JsonOp, ErrBox> {
let seconds = state.start_time.elapsed().as_secs();
let mut subsec_nanos = state.start_time.elapsed().subsec_nanos();
let reduced_time_precision = 2_000_000; // 2ms in nanoseconds
@@ -27,22 +23,8 @@ pub fn op_now(
subsec_nanos -= subsec_nanos % reduced_time_precision
}
- let builder = &mut FlatBufferBuilder::new();
- let inner = msg::NowRes::create(
- builder,
- &msg::NowResArgs {
- seconds,
- subsec_nanos,
- },
- );
-
- ok_buf(serialize_response(
- base.cmd_id(),
- builder,
- msg::BaseArgs {
- inner: Some(inner.as_union_value()),
- inner_type: msg::Any::NowRes,
- ..Default::default()
- },
- ))
+ Ok(JsonOp::Sync(json!({
+ "seconds": seconds,
+ "subsecNanos": subsec_nanos,
+ })))
}
diff --git a/cli/ops/permissions.rs b/cli/ops/permissions.rs
index 6249581fb..5d14f39be 100644
--- a/cli/ops/permissions.rs
+++ b/cli/ops/permissions.rs
@@ -1,50 +1,35 @@
// Copyright 2018-2019 the Deno authors. All rights reserved. MIT license.
-use super::dispatch_flatbuffers::serialize_response;
-use super::utils::*;
-use crate::msg;
+use super::dispatch_json::{Deserialize, JsonOp, Value};
use crate::state::ThreadSafeState;
use deno::*;
-use flatbuffers::FlatBufferBuilder;
pub fn op_permissions(
state: &ThreadSafeState,
- base: &msg::Base<'_>,
- data: Option<PinnedBuf>,
-) -> CliOpResult {
- assert!(data.is_none());
- let cmd_id = base.cmd_id();
- let builder = &mut FlatBufferBuilder::new();
- let inner = msg::PermissionsRes::create(
- builder,
- &msg::PermissionsResArgs {
- run: state.permissions.allows_run(),
- read: state.permissions.allows_read(),
- write: state.permissions.allows_write(),
- net: state.permissions.allows_net(),
- env: state.permissions.allows_env(),
- hrtime: state.permissions.allows_hrtime(),
- },
- );
- let response_buf = serialize_response(
- cmd_id,
- builder,
- msg::BaseArgs {
- inner: Some(inner.as_union_value()),
- inner_type: msg::Any::PermissionsRes,
- ..Default::default()
- },
- );
- ok_buf(response_buf)
+ _args: Value,
+ _zero_copy: Option<PinnedBuf>,
+) -> Result<JsonOp, ErrBox> {
+ Ok(JsonOp::Sync(json!({
+ "run": state.permissions.allows_run(),
+ "read": state.permissions.allows_read(),
+ "write": state.permissions.allows_write(),
+ "net": state.permissions.allows_net(),
+ "env": state.permissions.allows_env(),
+ "hrtime": state.permissions.allows_hrtime(),
+ })))
+}
+
+#[derive(Deserialize)]
+struct RevokePermissionArgs {
+ permission: String,
}
pub fn op_revoke_permission(
state: &ThreadSafeState,
- base: &msg::Base<'_>,
- data: Option<PinnedBuf>,
-) -> CliOpResult {
- assert!(data.is_none());
- let inner = base.inner_as_permission_revoke().unwrap();
- let permission = inner.permission().unwrap();
+ args: Value,
+ _zero_copy: Option<PinnedBuf>,
+) -> Result<JsonOp, ErrBox> {
+ let args: RevokePermissionArgs = serde_json::from_value(args)?;
+ let permission = args.permission.as_ref();
match permission {
"run" => state.permissions.revoke_run(),
"read" => state.permissions.revoke_read(),
@@ -54,5 +39,6 @@ pub fn op_revoke_permission(
"hrtime" => state.permissions.revoke_hrtime(),
_ => Ok(()),
}?;
- ok_buf(empty_buf())
+
+ Ok(JsonOp::Sync(json!({})))
}
diff --git a/cli/ops/process.rs b/cli/ops/process.rs
index d7b326d14..8dff53c6e 100644
--- a/cli/ops/process.rs
+++ b/cli/ops/process.rs
@@ -1,13 +1,9 @@
// Copyright 2018-2019 the Deno authors. All rights reserved. MIT license.
-use super::dispatch_flatbuffers::serialize_response;
-use super::utils::*;
-use crate::deno_error;
-use crate::msg;
+use super::dispatch_json::{Deserialize, JsonOp, Value};
use crate::resources;
use crate::signal::kill;
use crate::state::ThreadSafeState;
use deno::*;
-use flatbuffers::FlatBufferBuilder;
use futures;
use futures::Future;
use std;
@@ -18,63 +14,72 @@ use tokio_process::CommandExt;
#[cfg(unix)]
use std::os::unix::process::ExitStatusExt;
-fn subprocess_stdio_map(v: msg::ProcessStdio) -> std::process::Stdio {
- match v {
- msg::ProcessStdio::Inherit => std::process::Stdio::inherit(),
- msg::ProcessStdio::Piped => std::process::Stdio::piped(),
- msg::ProcessStdio::Null => std::process::Stdio::null(),
+fn subprocess_stdio_map(s: &str) -> std::process::Stdio {
+ match s {
+ "inherit" => std::process::Stdio::inherit(),
+ "piped" => std::process::Stdio::piped(),
+ "null" => std::process::Stdio::null(),
+ _ => unreachable!(),
}
}
+#[derive(Deserialize)]
+#[serde(rename_all = "camelCase")]
+struct RunArgs {
+ args: Vec<String>,
+ cwd: Option<String>,
+ env: Vec<(String, String)>,
+ stdin: String,
+ stdout: String,
+ stderr: String,
+ stdin_rid: u32,
+ stdout_rid: u32,
+ stderr_rid: u32,
+}
+
pub fn op_run(
state: &ThreadSafeState,
- base: &msg::Base<'_>,
- data: Option<PinnedBuf>,
-) -> CliOpResult {
- if !base.sync() {
- return Err(deno_error::no_async_support());
- }
- let cmd_id = base.cmd_id();
+ args: Value,
+ _zero_copy: Option<PinnedBuf>,
+) -> Result<JsonOp, ErrBox> {
+ let run_args: RunArgs = serde_json::from_value(args)?;
state.check_run()?;
- assert!(data.is_none());
- let inner = base.inner_as_run().unwrap();
- let args = inner.args().unwrap();
- let env = inner.env().unwrap();
- let cwd = inner.cwd();
+ let args = run_args.args;
+ let env = run_args.env;
+ let cwd = run_args.cwd;
- let mut c = Command::new(args.get(0));
+ let mut c = Command::new(args.get(0).unwrap());
(1..args.len()).for_each(|i| {
- let arg = args.get(i);
+ let arg = args.get(i).unwrap();
c.arg(arg);
});
cwd.map(|d| c.current_dir(d));
- (0..env.len()).for_each(|i| {
- let entry = env.get(i);
- c.env(entry.key().unwrap(), entry.value().unwrap());
- });
+ for (key, value) in &env {
+ c.env(key, value);
+ }
// TODO: make this work with other resources, eg. sockets
- let stdin_rid = inner.stdin_rid();
+ let stdin_rid = run_args.stdin_rid;
if stdin_rid > 0 {
c.stdin(resources::get_file(stdin_rid)?);
} else {
- c.stdin(subprocess_stdio_map(inner.stdin()));
+ c.stdin(subprocess_stdio_map(run_args.stdin.as_ref()));
}
- let stdout_rid = inner.stdout_rid();
+ let stdout_rid = run_args.stdout_rid;
if stdout_rid > 0 {
c.stdout(resources::get_file(stdout_rid)?);
} else {
- c.stdout(subprocess_stdio_map(inner.stdout()));
+ c.stdout(subprocess_stdio_map(run_args.stdout.as_ref()));
}
- let stderr_rid = inner.stderr_rid();
+ let stderr_rid = run_args.stderr_rid;
if stderr_rid > 0 {
c.stderr(resources::get_file(stderr_rid)?);
} else {
- c.stderr(subprocess_stdio_map(inner.stderr()));
+ c.stderr(subprocess_stdio_map(run_args.stderr.as_ref()));
}
// Spawn the command.
@@ -83,44 +88,28 @@ pub fn op_run(
let pid = child.id();
let resources = resources::add_child(child);
- let mut res_args = msg::RunResArgs {
- rid: resources.child_rid,
- pid,
- ..Default::default()
- };
-
- if let Some(stdin_rid) = resources.stdin_rid {
- res_args.stdin_rid = stdin_rid;
- }
- if let Some(stdout_rid) = resources.stdout_rid {
- res_args.stdout_rid = stdout_rid;
- }
- if let Some(stderr_rid) = resources.stderr_rid {
- res_args.stderr_rid = stderr_rid;
- }
+ Ok(JsonOp::Sync(json!({
+ "rid": resources.child_rid,
+ "pid": pid,
+ "stdinRid": resources.stdin_rid,
+ "stdoutRid": resources.stdout_rid,
+ "stderrRid": resources.stderr_rid,
+ })))
+}
- let builder = &mut FlatBufferBuilder::new();
- let inner = msg::RunRes::create(builder, &res_args);
- Ok(Op::Sync(serialize_response(
- cmd_id,
- builder,
- msg::BaseArgs {
- inner: Some(inner.as_union_value()),
- inner_type: msg::Any::RunRes,
- ..Default::default()
- },
- )))
+#[derive(Deserialize)]
+#[serde(rename_all = "camelCase")]
+struct RunStatusArgs {
+ rid: i32,
}
pub fn op_run_status(
state: &ThreadSafeState,
- base: &msg::Base<'_>,
- data: Option<PinnedBuf>,
-) -> CliOpResult {
- assert!(data.is_none());
- let cmd_id = base.cmd_id();
- let inner = base.inner_as_run_status().unwrap();
- let rid = inner.rid();
+ args: Value,
+ _zero_copy: Option<PinnedBuf>,
+) -> Result<JsonOp, ErrBox> {
+ let args: RunStatusArgs = serde_json::from_value(args)?;
+ let rid = args.rid as u32;
state.check_run()?;
@@ -139,44 +128,30 @@ pub fn op_run_status(
.expect("Should have either an exit code or a signal.");
let got_signal = signal.is_some();
- let builder = &mut FlatBufferBuilder::new();
- let inner = msg::RunStatusRes::create(
- builder,
- &msg::RunStatusResArgs {
- got_signal,
- exit_code: code.unwrap_or(-1),
- exit_signal: signal.unwrap_or(-1),
- },
- );
- Ok(serialize_response(
- cmd_id,
- builder,
- msg::BaseArgs {
- inner: Some(inner.as_union_value()),
- inner_type: msg::Any::RunStatusRes,
- ..Default::default()
- },
- ))
+ futures::future::ok(json!({
+ "gotSignal": got_signal,
+ "exitCode": code.unwrap_or(-1),
+ "exitSignal": signal.unwrap_or(-1),
+ }))
});
- if base.sync() {
- let buf = future.wait()?;
- Ok(Op::Sync(buf))
- } else {
- Ok(Op::Async(Box::new(future)))
- }
+
+ Ok(JsonOp::Async(Box::new(future)))
+}
+
+#[derive(Deserialize)]
+struct KillArgs {
+ pid: i32,
+ signo: i32,
}
pub fn op_kill(
state: &ThreadSafeState,
- base: &msg::Base<'_>,
- data: Option<PinnedBuf>,
-) -> CliOpResult {
+ args: Value,
+ _zero_copy: Option<PinnedBuf>,
+) -> Result<JsonOp, ErrBox> {
state.check_run()?;
- assert!(data.is_none());
- let inner = base.inner_as_kill().unwrap();
- let pid = inner.pid();
- let signo = inner.signo();
- kill(pid, signo)?;
- ok_buf(empty_buf())
+ let args: KillArgs = serde_json::from_value(args)?;
+ kill(args.pid, args.signo)?;
+ Ok(JsonOp::Sync(json!({})))
}
diff --git a/cli/ops/random.rs b/cli/ops/random.rs
index 0c302a080..7470eab40 100644
--- a/cli/ops/random.rs
+++ b/cli/ops/random.rs
@@ -1,6 +1,5 @@
// Copyright 2018-2019 the Deno authors. All rights reserved. MIT license.
-use super::utils::*;
-use crate::msg;
+use super::dispatch_json::{JsonOp, Value};
use crate::state::ThreadSafeState;
use deno::*;
use rand::thread_rng;
@@ -8,16 +7,18 @@ use rand::Rng;
pub fn op_get_random_values(
state: &ThreadSafeState,
- _base: &msg::Base<'_>,
- data: Option<PinnedBuf>,
-) -> CliOpResult {
+ _args: Value,
+ zero_copy: Option<PinnedBuf>,
+) -> Result<JsonOp, ErrBox> {
+ assert!(zero_copy.is_some());
+
if let Some(ref seeded_rng) = state.seeded_rng {
let mut rng = seeded_rng.lock().unwrap();
- rng.fill(&mut data.unwrap()[..]);
+ rng.fill(&mut zero_copy.unwrap()[..]);
} else {
let mut rng = thread_rng();
- rng.fill(&mut data.unwrap()[..]);
+ rng.fill(&mut zero_copy.unwrap()[..]);
}
- ok_buf(empty_buf())
+ Ok(JsonOp::Sync(json!({})))
}
diff --git a/cli/ops/repl.rs b/cli/ops/repl.rs
index affe78739..7ab7509de 100644
--- a/cli/ops/repl.rs
+++ b/cli/ops/repl.rs
@@ -1,78 +1,50 @@
// Copyright 2018-2019 the Deno authors. All rights reserved. MIT license.
-use super::dispatch_flatbuffers::serialize_response;
-use super::utils::blocking;
-use super::utils::ok_buf;
-use super::utils::CliOpResult;
-use crate::msg;
+use super::dispatch_json::{blocking_json, Deserialize, JsonOp, Value};
use crate::repl;
use crate::resources;
use crate::state::ThreadSafeState;
use deno::*;
-use flatbuffers::FlatBufferBuilder;
+
+#[derive(Deserialize)]
+#[serde(rename_all = "camelCase")]
+struct ReplStartArgs {
+ history_file: String,
+}
pub fn op_repl_start(
state: &ThreadSafeState,
- base: &msg::Base<'_>,
- data: Option<PinnedBuf>,
-) -> CliOpResult {
- assert!(data.is_none());
- let inner = base.inner_as_repl_start().unwrap();
- let cmd_id = base.cmd_id();
- let history_file = String::from(inner.history_file().unwrap());
+ args: Value,
+ _zero_copy: Option<PinnedBuf>,
+) -> Result<JsonOp, ErrBox> {
+ let args: ReplStartArgs = serde_json::from_value(args)?;
- debug!("op_repl_start {}", history_file);
- let history_path = repl::history_path(&state.dir, &history_file);
+ debug!("op_repl_start {}", args.history_file);
+ let history_path = repl::history_path(&state.dir, &args.history_file);
let repl = repl::Repl::new(history_path);
let resource = resources::add_repl(repl);
- let builder = &mut FlatBufferBuilder::new();
- let inner = msg::ReplStartRes::create(
- builder,
- &msg::ReplStartResArgs { rid: resource.rid },
- );
- ok_buf(serialize_response(
- cmd_id,
- builder,
- msg::BaseArgs {
- inner: Some(inner.as_union_value()),
- inner_type: msg::Any::ReplStartRes,
- ..Default::default()
- },
- ))
+ Ok(JsonOp::Sync(json!(resource.rid)))
+}
+
+#[derive(Deserialize)]
+struct ReplReadlineArgs {
+ rid: i32,
+ prompt: String,
}
pub fn op_repl_readline(
_state: &ThreadSafeState,
- base: &msg::Base<'_>,
- data: Option<PinnedBuf>,
-) -> CliOpResult {
- assert!(data.is_none());
- let inner = base.inner_as_repl_readline().unwrap();
- let cmd_id = base.cmd_id();
- let rid = inner.rid();
- let prompt = inner.prompt().unwrap().to_owned();
+ args: Value,
+ _zero_copy: Option<PinnedBuf>,
+) -> Result<JsonOp, ErrBox> {
+ let args: ReplReadlineArgs = serde_json::from_value(args)?;
+ let rid = args.rid;
+ let prompt = args.prompt;
debug!("op_repl_readline {} {}", rid, prompt);
- blocking(base.sync(), move || {
- let repl = resources::get_repl(rid)?;
+ blocking_json(false, move || {
+ let repl = resources::get_repl(rid as u32)?;
let line = repl.lock().unwrap().readline(&prompt)?;
-
- let builder = &mut FlatBufferBuilder::new();
- let line_off = builder.create_string(&line);
- let inner = msg::ReplReadlineRes::create(
- builder,
- &msg::ReplReadlineResArgs {
- line: Some(line_off),
- },
- );
- Ok(serialize_response(
- cmd_id,
- builder,
- msg::BaseArgs {
- inner: Some(inner.as_union_value()),
- inner_type: msg::Any::ReplReadlineRes,
- ..Default::default()
- },
- ))
+ Ok(json!(line))
})
}
diff --git a/cli/ops/resources.rs b/cli/ops/resources.rs
index 975d94490..dafd01d08 100644
--- a/cli/ops/resources.rs
+++ b/cli/ops/resources.rs
@@ -1,54 +1,14 @@
// Copyright 2018-2019 the Deno authors. All rights reserved. MIT license.
-use super::dispatch_flatbuffers::serialize_response;
-use super::utils::ok_buf;
-use super::utils::CliOpResult;
-use crate::msg;
+use super::dispatch_json::{JsonOp, Value};
use crate::resources::table_entries;
use crate::state::ThreadSafeState;
use deno::*;
-use flatbuffers::FlatBufferBuilder;
pub fn op_resources(
_state: &ThreadSafeState,
- base: &msg::Base<'_>,
- data: Option<PinnedBuf>,
-) -> CliOpResult {
- assert!(data.is_none());
- let cmd_id = base.cmd_id();
-
- let builder = &mut FlatBufferBuilder::new();
+ _args: Value,
+ _zero_copy: Option<PinnedBuf>,
+) -> Result<JsonOp, ErrBox> {
let serialized_resources = table_entries();
-
- let res: Vec<_> = serialized_resources
- .iter()
- .map(|(key, value)| {
- let repr = builder.create_string(value);
-
- msg::Resource::create(
- builder,
- &msg::ResourceArgs {
- rid: *key,
- repr: Some(repr),
- },
- )
- })
- .collect();
-
- let resources = builder.create_vector(&res);
- let inner = msg::ResourcesRes::create(
- builder,
- &msg::ResourcesResArgs {
- resources: Some(resources),
- },
- );
-
- ok_buf(serialize_response(
- cmd_id,
- builder,
- msg::BaseArgs {
- inner: Some(inner.as_union_value()),
- inner_type: msg::Any::ResourcesRes,
- ..Default::default()
- },
- ))
+ Ok(JsonOp::Sync(json!(serialized_resources)))
}
diff --git a/cli/ops/timers.rs b/cli/ops/timers.rs
index 550d91f2c..46217a188 100644
--- a/cli/ops/timers.rs
+++ b/cli/ops/timers.rs
@@ -1,12 +1,7 @@
// Copyright 2018-2019 the Deno authors. All rights reserved. MIT license.
-use super::dispatch_flatbuffers::serialize_response;
-use super::utils::empty_buf;
-use super::utils::CliOpResult;
-use crate::deno_error;
-use crate::msg;
+use super::dispatch_json::{Deserialize, JsonOp, Value};
use crate::state::ThreadSafeState;
use deno::*;
-use flatbuffers::FlatBufferBuilder;
use futures::Future;
use std;
use std::time::Duration;
@@ -14,50 +9,34 @@ use std::time::Instant;
pub fn op_global_timer_stop(
state: &ThreadSafeState,
- base: &msg::Base<'_>,
- data: Option<PinnedBuf>,
-) -> CliOpResult {
- if !base.sync() {
- return Err(deno_error::no_async_support());
- }
- assert!(data.is_none());
+ _args: Value,
+ _zero_copy: Option<PinnedBuf>,
+) -> Result<JsonOp, ErrBox> {
let state = state;
let mut t = state.global_timer.lock().unwrap();
t.cancel();
- Ok(Op::Sync(empty_buf()))
+ Ok(JsonOp::Sync(json!({})))
+}
+
+#[derive(Deserialize)]
+struct GlobalTimerArgs {
+ timeout: u64,
}
pub fn op_global_timer(
state: &ThreadSafeState,
- base: &msg::Base<'_>,
- data: Option<PinnedBuf>,
-) -> CliOpResult {
- if base.sync() {
- return Err(deno_error::no_sync_support());
- }
- assert!(data.is_none());
- let cmd_id = base.cmd_id();
- let inner = base.inner_as_global_timer().unwrap();
- let val = inner.timeout();
- assert!(val >= 0);
+ args: Value,
+ _zero_copy: Option<PinnedBuf>,
+) -> Result<JsonOp, ErrBox> {
+ let args: GlobalTimerArgs = serde_json::from_value(args)?;
+ let val = args.timeout;
let state = state;
let mut t = state.global_timer.lock().unwrap();
let deadline = Instant::now() + Duration::from_millis(val as u64);
- let f = t.new_timeout(deadline);
+ let f = t
+ .new_timeout(deadline)
+ .then(move |_| futures::future::ok(json!({})));
- Ok(Op::Async(Box::new(f.then(move |_| {
- let builder = &mut FlatBufferBuilder::new();
- let inner =
- msg::GlobalTimerRes::create(builder, &msg::GlobalTimerResArgs {});
- Ok(serialize_response(
- cmd_id,
- builder,
- msg::BaseArgs {
- inner: Some(inner.as_union_value()),
- inner_type: msg::Any::GlobalTimerRes,
- ..Default::default()
- },
- ))
- }))))
+ Ok(JsonOp::Async(Box::new(f)))
}
diff --git a/cli/ops/workers.rs b/cli/ops/workers.rs
index 1eb11420f..4eeecd068 100644
--- a/cli/ops/workers.rs
+++ b/cli/ops/workers.rs
@@ -1,17 +1,12 @@
// Copyright 2018-2019 the Deno authors. All rights reserved. MIT license.
-use super::dispatch_flatbuffers::serialize_response;
-use super::utils::ok_buf;
-use super::utils::CliOpResult;
-use crate::deno_error;
+use super::dispatch_json::{Deserialize, JsonOp, Value};
use crate::deno_error::DenoError;
use crate::deno_error::ErrorKind;
-use crate::msg;
use crate::resources;
use crate::startup_data;
use crate::state::ThreadSafeState;
use crate::worker::Worker;
use deno::*;
-use flatbuffers::FlatBufferBuilder;
use futures;
use futures::Async;
use futures::Future;
@@ -39,48 +34,32 @@ impl Future for GetMessageFuture {
/// Get message from host as guest worker
pub fn op_worker_get_message(
state: &ThreadSafeState,
- base: &msg::Base<'_>,
- data: Option<PinnedBuf>,
-) -> CliOpResult {
- if base.sync() {
- return Err(deno_error::no_sync_support());
- }
- assert!(data.is_none());
- let cmd_id = base.cmd_id();
-
+ _args: Value,
+ _data: Option<PinnedBuf>,
+) -> Result<JsonOp, ErrBox> {
let op = GetMessageFuture {
state: state.clone(),
};
- let op = op.map_err(move |_| -> ErrBox { unimplemented!() });
- let op = op.and_then(move |maybe_buf| -> Result<Buf, ErrBox> {
- debug!("op_worker_get_message");
- let builder = &mut FlatBufferBuilder::new();
-
- let data = maybe_buf.as_ref().map(|buf| builder.create_vector(buf));
- let inner = msg::WorkerGetMessageRes::create(
- builder,
- &msg::WorkerGetMessageResArgs { data },
- );
- Ok(serialize_response(
- cmd_id,
- builder,
- msg::BaseArgs {
- inner: Some(inner.as_union_value()),
- inner_type: msg::Any::WorkerGetMessageRes,
- ..Default::default()
- },
- ))
- });
- Ok(Op::Async(Box::new(op)))
+
+ let op = op
+ .map_err(move |_| -> ErrBox { unimplemented!() })
+ .and_then(move |maybe_buf| {
+ debug!("op_worker_get_message");
+
+ futures::future::ok(json!({
+ "data": maybe_buf.map(|buf| buf.to_owned())
+ }))
+ });
+
+ Ok(JsonOp::Async(Box::new(op)))
}
/// Post message to host as guest worker
pub fn op_worker_post_message(
state: &ThreadSafeState,
- base: &msg::Base<'_>,
+ _args: Value,
data: Option<PinnedBuf>,
-) -> CliOpResult {
- let cmd_id = base.cmd_id();
+) -> Result<JsonOp, ErrBox> {
let d = Vec::from(data.unwrap().as_ref()).into_boxed_slice();
let tx = {
@@ -90,33 +69,34 @@ pub fn op_worker_post_message(
tx.send(d)
.wait()
.map_err(|e| DenoError::new(ErrorKind::Other, e.to_string()))?;
- let builder = &mut FlatBufferBuilder::new();
-
- ok_buf(serialize_response(
- cmd_id,
- builder,
- msg::BaseArgs {
- ..Default::default()
- },
- ))
+
+ Ok(JsonOp::Sync(json!({})))
+}
+
+#[derive(Deserialize)]
+#[serde(rename_all = "camelCase")]
+struct CreateWorkerArgs {
+ specifier: String,
+ include_deno_namespace: bool,
+ has_source_code: bool,
+ source_code: String,
}
/// Create worker as the host
pub fn op_create_worker(
state: &ThreadSafeState,
- base: &msg::Base<'_>,
- data: Option<PinnedBuf>,
-) -> CliOpResult {
- assert!(data.is_none());
- let cmd_id = base.cmd_id();
- let inner = base.inner_as_create_worker().unwrap();
- let specifier = inner.specifier().unwrap();
+ args: Value,
+ _data: Option<PinnedBuf>,
+) -> Result<JsonOp, ErrBox> {
+ let args: CreateWorkerArgs = serde_json::from_value(args)?;
+
+ let specifier = args.specifier.as_ref();
// Only include deno namespace if requested AND current worker
// has included namespace (to avoid escalation).
let include_deno_namespace =
- inner.include_deno_namespace() && state.include_deno_namespace;
- let has_source_code = inner.has_source_code();
- let source_code = inner.source_code().unwrap();
+ args.include_deno_namespace && state.include_deno_namespace;
+ let has_source_code = args.has_source_code;
+ let source_code = args.source_code;
let parent_state = state.clone();
@@ -150,24 +130,13 @@ pub fn op_create_worker(
let exec_cb = move |worker: Worker| {
let mut workers_tl = parent_state.workers.lock().unwrap();
workers_tl.insert(rid, worker.shared());
- let builder = &mut FlatBufferBuilder::new();
- let msg_inner =
- msg::CreateWorkerRes::create(builder, &msg::CreateWorkerResArgs { rid });
- serialize_response(
- cmd_id,
- builder,
- msg::BaseArgs {
- inner: Some(msg_inner.as_union_value()),
- inner_type: msg::Any::CreateWorkerRes,
- ..Default::default()
- },
- )
+ json!(rid)
};
// Has provided source code, execute immediately.
if has_source_code {
worker.execute(&source_code).unwrap();
- return ok_buf(exec_cb(worker));
+ return Ok(JsonOp::Sync(exec_cb(worker)));
}
let op = worker
@@ -175,22 +144,23 @@ pub fn op_create_worker(
.and_then(move |()| Ok(exec_cb(worker)));
let result = op.wait()?;
- Ok(Op::Sync(result))
+ Ok(JsonOp::Sync(result))
+}
+
+#[derive(Deserialize)]
+struct HostGetWorkerClosedArgs {
+ rid: i32,
}
/// Return when the worker closes
pub fn op_host_get_worker_closed(
state: &ThreadSafeState,
- base: &msg::Base<'_>,
- data: Option<PinnedBuf>,
-) -> CliOpResult {
- if base.sync() {
- return Err(deno_error::no_sync_support());
- }
- assert!(data.is_none());
- let cmd_id = base.cmd_id();
- let inner = base.inner_as_host_get_worker_closed().unwrap();
- let rid = inner.rid();
+ args: Value,
+ _data: Option<PinnedBuf>,
+) -> Result<JsonOp, ErrBox> {
+ let args: HostGetWorkerClosedArgs = serde_json::from_value(args)?;
+
+ let rid = args.rid as u32;
let state = state.clone();
let shared_worker_future = {
@@ -199,79 +169,58 @@ pub fn op_host_get_worker_closed(
worker.clone()
};
- let op = Box::new(shared_worker_future.then(move |_result| {
- let builder = &mut FlatBufferBuilder::new();
-
- Ok(serialize_response(
- cmd_id,
- builder,
- msg::BaseArgs {
- ..Default::default()
- },
- ))
- }));
- Ok(Op::Async(Box::new(op)))
+ let op = Box::new(
+ shared_worker_future.then(move |_result| futures::future::ok(json!({}))),
+ );
+
+ Ok(JsonOp::Async(Box::new(op)))
+}
+
+#[derive(Deserialize)]
+struct HostGetMessageArgs {
+ rid: i32,
}
/// Get message from guest worker as host
pub fn op_host_get_message(
_state: &ThreadSafeState,
- base: &msg::Base<'_>,
- data: Option<PinnedBuf>,
-) -> CliOpResult {
- if base.sync() {
- return Err(deno_error::no_sync_support());
- }
- assert!(data.is_none());
- let cmd_id = base.cmd_id();
- let inner = base.inner_as_host_get_message().unwrap();
- let rid = inner.rid();
-
- let op = resources::get_message_from_worker(rid);
- let op = op.map_err(move |_| -> ErrBox { unimplemented!() });
- let op = op.and_then(move |maybe_buf| -> Result<Buf, ErrBox> {
- let builder = &mut FlatBufferBuilder::new();
-
- let data = maybe_buf.as_ref().map(|buf| builder.create_vector(buf));
- let msg_inner = msg::HostGetMessageRes::create(
- builder,
- &msg::HostGetMessageResArgs { data },
- );
- Ok(serialize_response(
- cmd_id,
- builder,
- msg::BaseArgs {
- inner: Some(msg_inner.as_union_value()),
- inner_type: msg::Any::HostGetMessageRes,
- ..Default::default()
- },
- ))
- });
- Ok(Op::Async(Box::new(op)))
+ args: Value,
+ _data: Option<PinnedBuf>,
+) -> Result<JsonOp, ErrBox> {
+ let args: HostGetMessageArgs = serde_json::from_value(args)?;
+
+ let rid = args.rid as u32;
+ let op = resources::get_message_from_worker(rid)
+ .map_err(move |_| -> ErrBox { unimplemented!() })
+ .and_then(move |maybe_buf| {
+ futures::future::ok(json!({
+ "data": maybe_buf.map(|buf| buf.to_owned())
+ }))
+ });
+
+ Ok(JsonOp::Async(Box::new(op)))
+}
+
+#[derive(Deserialize)]
+struct HostPostMessageArgs {
+ rid: i32,
}
/// Post message to guest worker as host
pub fn op_host_post_message(
_state: &ThreadSafeState,
- base: &msg::Base<'_>,
+ args: Value,
data: Option<PinnedBuf>,
-) -> CliOpResult {
- let cmd_id = base.cmd_id();
- let inner = base.inner_as_host_post_message().unwrap();
- let rid = inner.rid();
+) -> Result<JsonOp, ErrBox> {
+ let args: HostPostMessageArgs = serde_json::from_value(args)?;
+
+ let rid = args.rid as u32;
let d = Vec::from(data.unwrap().as_ref()).into_boxed_slice();
resources::post_message_to_worker(rid, d)
.wait()
.map_err(|e| DenoError::new(ErrorKind::Other, e.to_string()))?;
- let builder = &mut FlatBufferBuilder::new();
-
- ok_buf(serialize_response(
- cmd_id,
- builder,
- msg::BaseArgs {
- ..Default::default()
- },
- ))
+
+ Ok(JsonOp::Sync(json!({})))
}