summaryrefslogtreecommitdiff
path: root/cli/ops/workers.rs
diff options
context:
space:
mode:
authorBartek IwaƄczuk <biwanczuk@gmail.com>2019-08-24 17:31:14 +0200
committerRyan Dahl <ry@tinyclouds.org>2019-08-24 08:31:14 -0700
commit137f33733d365026903d40e7cde6e34ac6c36dcf (patch)
treee8096e119c374b199cd498ccfa1ee0ef4e6ba950 /cli/ops/workers.rs
parent79f82cf10ed1dbf91346994250d7311a4d74377a (diff)
port more ops to JSON (#2809)
Diffstat (limited to 'cli/ops/workers.rs')
-rw-r--r--cli/ops/workers.rs229
1 files changed, 89 insertions, 140 deletions
diff --git a/cli/ops/workers.rs b/cli/ops/workers.rs
index 1eb11420f..4eeecd068 100644
--- a/cli/ops/workers.rs
+++ b/cli/ops/workers.rs
@@ -1,17 +1,12 @@
// Copyright 2018-2019 the Deno authors. All rights reserved. MIT license.
-use super::dispatch_flatbuffers::serialize_response;
-use super::utils::ok_buf;
-use super::utils::CliOpResult;
-use crate::deno_error;
+use super::dispatch_json::{Deserialize, JsonOp, Value};
use crate::deno_error::DenoError;
use crate::deno_error::ErrorKind;
-use crate::msg;
use crate::resources;
use crate::startup_data;
use crate::state::ThreadSafeState;
use crate::worker::Worker;
use deno::*;
-use flatbuffers::FlatBufferBuilder;
use futures;
use futures::Async;
use futures::Future;
@@ -39,48 +34,32 @@ impl Future for GetMessageFuture {
/// Get message from host as guest worker
pub fn op_worker_get_message(
state: &ThreadSafeState,
- base: &msg::Base<'_>,
- data: Option<PinnedBuf>,
-) -> CliOpResult {
- if base.sync() {
- return Err(deno_error::no_sync_support());
- }
- assert!(data.is_none());
- let cmd_id = base.cmd_id();
-
+ _args: Value,
+ _data: Option<PinnedBuf>,
+) -> Result<JsonOp, ErrBox> {
let op = GetMessageFuture {
state: state.clone(),
};
- let op = op.map_err(move |_| -> ErrBox { unimplemented!() });
- let op = op.and_then(move |maybe_buf| -> Result<Buf, ErrBox> {
- debug!("op_worker_get_message");
- let builder = &mut FlatBufferBuilder::new();
-
- let data = maybe_buf.as_ref().map(|buf| builder.create_vector(buf));
- let inner = msg::WorkerGetMessageRes::create(
- builder,
- &msg::WorkerGetMessageResArgs { data },
- );
- Ok(serialize_response(
- cmd_id,
- builder,
- msg::BaseArgs {
- inner: Some(inner.as_union_value()),
- inner_type: msg::Any::WorkerGetMessageRes,
- ..Default::default()
- },
- ))
- });
- Ok(Op::Async(Box::new(op)))
+
+ let op = op
+ .map_err(move |_| -> ErrBox { unimplemented!() })
+ .and_then(move |maybe_buf| {
+ debug!("op_worker_get_message");
+
+ futures::future::ok(json!({
+ "data": maybe_buf.map(|buf| buf.to_owned())
+ }))
+ });
+
+ Ok(JsonOp::Async(Box::new(op)))
}
/// Post message to host as guest worker
pub fn op_worker_post_message(
state: &ThreadSafeState,
- base: &msg::Base<'_>,
+ _args: Value,
data: Option<PinnedBuf>,
-) -> CliOpResult {
- let cmd_id = base.cmd_id();
+) -> Result<JsonOp, ErrBox> {
let d = Vec::from(data.unwrap().as_ref()).into_boxed_slice();
let tx = {
@@ -90,33 +69,34 @@ pub fn op_worker_post_message(
tx.send(d)
.wait()
.map_err(|e| DenoError::new(ErrorKind::Other, e.to_string()))?;
- let builder = &mut FlatBufferBuilder::new();
-
- ok_buf(serialize_response(
- cmd_id,
- builder,
- msg::BaseArgs {
- ..Default::default()
- },
- ))
+
+ Ok(JsonOp::Sync(json!({})))
+}
+
+#[derive(Deserialize)]
+#[serde(rename_all = "camelCase")]
+struct CreateWorkerArgs {
+ specifier: String,
+ include_deno_namespace: bool,
+ has_source_code: bool,
+ source_code: String,
}
/// Create worker as the host
pub fn op_create_worker(
state: &ThreadSafeState,
- base: &msg::Base<'_>,
- data: Option<PinnedBuf>,
-) -> CliOpResult {
- assert!(data.is_none());
- let cmd_id = base.cmd_id();
- let inner = base.inner_as_create_worker().unwrap();
- let specifier = inner.specifier().unwrap();
+ args: Value,
+ _data: Option<PinnedBuf>,
+) -> Result<JsonOp, ErrBox> {
+ let args: CreateWorkerArgs = serde_json::from_value(args)?;
+
+ let specifier = args.specifier.as_ref();
// Only include deno namespace if requested AND current worker
// has included namespace (to avoid escalation).
let include_deno_namespace =
- inner.include_deno_namespace() && state.include_deno_namespace;
- let has_source_code = inner.has_source_code();
- let source_code = inner.source_code().unwrap();
+ args.include_deno_namespace && state.include_deno_namespace;
+ let has_source_code = args.has_source_code;
+ let source_code = args.source_code;
let parent_state = state.clone();
@@ -150,24 +130,13 @@ pub fn op_create_worker(
let exec_cb = move |worker: Worker| {
let mut workers_tl = parent_state.workers.lock().unwrap();
workers_tl.insert(rid, worker.shared());
- let builder = &mut FlatBufferBuilder::new();
- let msg_inner =
- msg::CreateWorkerRes::create(builder, &msg::CreateWorkerResArgs { rid });
- serialize_response(
- cmd_id,
- builder,
- msg::BaseArgs {
- inner: Some(msg_inner.as_union_value()),
- inner_type: msg::Any::CreateWorkerRes,
- ..Default::default()
- },
- )
+ json!(rid)
};
// Has provided source code, execute immediately.
if has_source_code {
worker.execute(&source_code).unwrap();
- return ok_buf(exec_cb(worker));
+ return Ok(JsonOp::Sync(exec_cb(worker)));
}
let op = worker
@@ -175,22 +144,23 @@ pub fn op_create_worker(
.and_then(move |()| Ok(exec_cb(worker)));
let result = op.wait()?;
- Ok(Op::Sync(result))
+ Ok(JsonOp::Sync(result))
+}
+
+#[derive(Deserialize)]
+struct HostGetWorkerClosedArgs {
+ rid: i32,
}
/// Return when the worker closes
pub fn op_host_get_worker_closed(
state: &ThreadSafeState,
- base: &msg::Base<'_>,
- data: Option<PinnedBuf>,
-) -> CliOpResult {
- if base.sync() {
- return Err(deno_error::no_sync_support());
- }
- assert!(data.is_none());
- let cmd_id = base.cmd_id();
- let inner = base.inner_as_host_get_worker_closed().unwrap();
- let rid = inner.rid();
+ args: Value,
+ _data: Option<PinnedBuf>,
+) -> Result<JsonOp, ErrBox> {
+ let args: HostGetWorkerClosedArgs = serde_json::from_value(args)?;
+
+ let rid = args.rid as u32;
let state = state.clone();
let shared_worker_future = {
@@ -199,79 +169,58 @@ pub fn op_host_get_worker_closed(
worker.clone()
};
- let op = Box::new(shared_worker_future.then(move |_result| {
- let builder = &mut FlatBufferBuilder::new();
-
- Ok(serialize_response(
- cmd_id,
- builder,
- msg::BaseArgs {
- ..Default::default()
- },
- ))
- }));
- Ok(Op::Async(Box::new(op)))
+ let op = Box::new(
+ shared_worker_future.then(move |_result| futures::future::ok(json!({}))),
+ );
+
+ Ok(JsonOp::Async(Box::new(op)))
+}
+
+#[derive(Deserialize)]
+struct HostGetMessageArgs {
+ rid: i32,
}
/// Get message from guest worker as host
pub fn op_host_get_message(
_state: &ThreadSafeState,
- base: &msg::Base<'_>,
- data: Option<PinnedBuf>,
-) -> CliOpResult {
- if base.sync() {
- return Err(deno_error::no_sync_support());
- }
- assert!(data.is_none());
- let cmd_id = base.cmd_id();
- let inner = base.inner_as_host_get_message().unwrap();
- let rid = inner.rid();
-
- let op = resources::get_message_from_worker(rid);
- let op = op.map_err(move |_| -> ErrBox { unimplemented!() });
- let op = op.and_then(move |maybe_buf| -> Result<Buf, ErrBox> {
- let builder = &mut FlatBufferBuilder::new();
-
- let data = maybe_buf.as_ref().map(|buf| builder.create_vector(buf));
- let msg_inner = msg::HostGetMessageRes::create(
- builder,
- &msg::HostGetMessageResArgs { data },
- );
- Ok(serialize_response(
- cmd_id,
- builder,
- msg::BaseArgs {
- inner: Some(msg_inner.as_union_value()),
- inner_type: msg::Any::HostGetMessageRes,
- ..Default::default()
- },
- ))
- });
- Ok(Op::Async(Box::new(op)))
+ args: Value,
+ _data: Option<PinnedBuf>,
+) -> Result<JsonOp, ErrBox> {
+ let args: HostGetMessageArgs = serde_json::from_value(args)?;
+
+ let rid = args.rid as u32;
+ let op = resources::get_message_from_worker(rid)
+ .map_err(move |_| -> ErrBox { unimplemented!() })
+ .and_then(move |maybe_buf| {
+ futures::future::ok(json!({
+ "data": maybe_buf.map(|buf| buf.to_owned())
+ }))
+ });
+
+ Ok(JsonOp::Async(Box::new(op)))
+}
+
+#[derive(Deserialize)]
+struct HostPostMessageArgs {
+ rid: i32,
}
/// Post message to guest worker as host
pub fn op_host_post_message(
_state: &ThreadSafeState,
- base: &msg::Base<'_>,
+ args: Value,
data: Option<PinnedBuf>,
-) -> CliOpResult {
- let cmd_id = base.cmd_id();
- let inner = base.inner_as_host_post_message().unwrap();
- let rid = inner.rid();
+) -> Result<JsonOp, ErrBox> {
+ let args: HostPostMessageArgs = serde_json::from_value(args)?;
+
+ let rid = args.rid as u32;
let d = Vec::from(data.unwrap().as_ref()).into_boxed_slice();
resources::post_message_to_worker(rid, d)
.wait()
.map_err(|e| DenoError::new(ErrorKind::Other, e.to_string()))?;
- let builder = &mut FlatBufferBuilder::new();
-
- ok_buf(serialize_response(
- cmd_id,
- builder,
- msg::BaseArgs {
- ..Default::default()
- },
- ))
+
+ Ok(JsonOp::Sync(json!({})))
}