summaryrefslogtreecommitdiff
path: root/cli/ops/workers.rs
diff options
context:
space:
mode:
authorRyan Dahl <ry@tinyclouds.org>2019-08-24 13:20:48 -0700
committerGitHub <noreply@github.com>2019-08-24 13:20:48 -0700
commit2235dd795d3cc6c24ff1bdd1bbdcd110b4b0bdfc (patch)
treea5811adc062cbb1c66f05c863c9be245cf4fd2d2 /cli/ops/workers.rs
parentbdc0a13261deaa3748f51d9948b4e7b92864c324 (diff)
Revert json ops (#2814)
* Revert "port more ops to JSON (#2809)" This reverts commit 137f33733d365026903d40e7cde6e34ac6c36dcf. * Revert "port ops to JSON: compiler, errors, fetch, files (#2804)" This reverts commit 79f82cf10ed1dbf91346994250d7311a4d74377a. * Revert "Port rest of os ops to JSON (#2802)" This reverts commit 5b2baa5c990fbeae747e952c5dcd7a5369e950b1.
Diffstat (limited to 'cli/ops/workers.rs')
-rw-r--r--cli/ops/workers.rs229
1 files changed, 140 insertions, 89 deletions
diff --git a/cli/ops/workers.rs b/cli/ops/workers.rs
index 4eeecd068..1eb11420f 100644
--- a/cli/ops/workers.rs
+++ b/cli/ops/workers.rs
@@ -1,12 +1,17 @@
// Copyright 2018-2019 the Deno authors. All rights reserved. MIT license.
-use super::dispatch_json::{Deserialize, JsonOp, Value};
+use super::dispatch_flatbuffers::serialize_response;
+use super::utils::ok_buf;
+use super::utils::CliOpResult;
+use crate::deno_error;
use crate::deno_error::DenoError;
use crate::deno_error::ErrorKind;
+use crate::msg;
use crate::resources;
use crate::startup_data;
use crate::state::ThreadSafeState;
use crate::worker::Worker;
use deno::*;
+use flatbuffers::FlatBufferBuilder;
use futures;
use futures::Async;
use futures::Future;
@@ -34,32 +39,48 @@ impl Future for GetMessageFuture {
/// Get message from host as guest worker
pub fn op_worker_get_message(
state: &ThreadSafeState,
- _args: Value,
- _data: Option<PinnedBuf>,
-) -> Result<JsonOp, ErrBox> {
+ base: &msg::Base<'_>,
+ data: Option<PinnedBuf>,
+) -> CliOpResult {
+ if base.sync() {
+ return Err(deno_error::no_sync_support());
+ }
+ assert!(data.is_none());
+ let cmd_id = base.cmd_id();
+
let op = GetMessageFuture {
state: state.clone(),
};
-
- let op = op
- .map_err(move |_| -> ErrBox { unimplemented!() })
- .and_then(move |maybe_buf| {
- debug!("op_worker_get_message");
-
- futures::future::ok(json!({
- "data": maybe_buf.map(|buf| buf.to_owned())
- }))
- });
-
- Ok(JsonOp::Async(Box::new(op)))
+ let op = op.map_err(move |_| -> ErrBox { unimplemented!() });
+ let op = op.and_then(move |maybe_buf| -> Result<Buf, ErrBox> {
+ debug!("op_worker_get_message");
+ let builder = &mut FlatBufferBuilder::new();
+
+ let data = maybe_buf.as_ref().map(|buf| builder.create_vector(buf));
+ let inner = msg::WorkerGetMessageRes::create(
+ builder,
+ &msg::WorkerGetMessageResArgs { data },
+ );
+ Ok(serialize_response(
+ cmd_id,
+ builder,
+ msg::BaseArgs {
+ inner: Some(inner.as_union_value()),
+ inner_type: msg::Any::WorkerGetMessageRes,
+ ..Default::default()
+ },
+ ))
+ });
+ Ok(Op::Async(Box::new(op)))
}
/// Post message to host as guest worker
pub fn op_worker_post_message(
state: &ThreadSafeState,
- _args: Value,
+ base: &msg::Base<'_>,
data: Option<PinnedBuf>,
-) -> Result<JsonOp, ErrBox> {
+) -> CliOpResult {
+ let cmd_id = base.cmd_id();
let d = Vec::from(data.unwrap().as_ref()).into_boxed_slice();
let tx = {
@@ -69,34 +90,33 @@ pub fn op_worker_post_message(
tx.send(d)
.wait()
.map_err(|e| DenoError::new(ErrorKind::Other, e.to_string()))?;
-
- Ok(JsonOp::Sync(json!({})))
-}
-
-#[derive(Deserialize)]
-#[serde(rename_all = "camelCase")]
-struct CreateWorkerArgs {
- specifier: String,
- include_deno_namespace: bool,
- has_source_code: bool,
- source_code: String,
+ let builder = &mut FlatBufferBuilder::new();
+
+ ok_buf(serialize_response(
+ cmd_id,
+ builder,
+ msg::BaseArgs {
+ ..Default::default()
+ },
+ ))
}
/// Create worker as the host
pub fn op_create_worker(
state: &ThreadSafeState,
- args: Value,
- _data: Option<PinnedBuf>,
-) -> Result<JsonOp, ErrBox> {
- let args: CreateWorkerArgs = serde_json::from_value(args)?;
-
- let specifier = args.specifier.as_ref();
+ base: &msg::Base<'_>,
+ data: Option<PinnedBuf>,
+) -> CliOpResult {
+ assert!(data.is_none());
+ let cmd_id = base.cmd_id();
+ let inner = base.inner_as_create_worker().unwrap();
+ let specifier = inner.specifier().unwrap();
// Only include deno namespace if requested AND current worker
// has included namespace (to avoid escalation).
let include_deno_namespace =
- args.include_deno_namespace && state.include_deno_namespace;
- let has_source_code = args.has_source_code;
- let source_code = args.source_code;
+ inner.include_deno_namespace() && state.include_deno_namespace;
+ let has_source_code = inner.has_source_code();
+ let source_code = inner.source_code().unwrap();
let parent_state = state.clone();
@@ -130,13 +150,24 @@ pub fn op_create_worker(
let exec_cb = move |worker: Worker| {
let mut workers_tl = parent_state.workers.lock().unwrap();
workers_tl.insert(rid, worker.shared());
- json!(rid)
+ let builder = &mut FlatBufferBuilder::new();
+ let msg_inner =
+ msg::CreateWorkerRes::create(builder, &msg::CreateWorkerResArgs { rid });
+ serialize_response(
+ cmd_id,
+ builder,
+ msg::BaseArgs {
+ inner: Some(msg_inner.as_union_value()),
+ inner_type: msg::Any::CreateWorkerRes,
+ ..Default::default()
+ },
+ )
};
// Has provided source code, execute immediately.
if has_source_code {
worker.execute(&source_code).unwrap();
- return Ok(JsonOp::Sync(exec_cb(worker)));
+ return ok_buf(exec_cb(worker));
}
let op = worker
@@ -144,23 +175,22 @@ pub fn op_create_worker(
.and_then(move |()| Ok(exec_cb(worker)));
let result = op.wait()?;
- Ok(JsonOp::Sync(result))
-}
-
-#[derive(Deserialize)]
-struct HostGetWorkerClosedArgs {
- rid: i32,
+ Ok(Op::Sync(result))
}
/// Return when the worker closes
pub fn op_host_get_worker_closed(
state: &ThreadSafeState,
- args: Value,
- _data: Option<PinnedBuf>,
-) -> Result<JsonOp, ErrBox> {
- let args: HostGetWorkerClosedArgs = serde_json::from_value(args)?;
-
- let rid = args.rid as u32;
+ base: &msg::Base<'_>,
+ data: Option<PinnedBuf>,
+) -> CliOpResult {
+ if base.sync() {
+ return Err(deno_error::no_sync_support());
+ }
+ assert!(data.is_none());
+ let cmd_id = base.cmd_id();
+ let inner = base.inner_as_host_get_worker_closed().unwrap();
+ let rid = inner.rid();
let state = state.clone();
let shared_worker_future = {
@@ -169,58 +199,79 @@ pub fn op_host_get_worker_closed(
worker.clone()
};
- let op = Box::new(
- shared_worker_future.then(move |_result| futures::future::ok(json!({}))),
- );
-
- Ok(JsonOp::Async(Box::new(op)))
-}
-
-#[derive(Deserialize)]
-struct HostGetMessageArgs {
- rid: i32,
+ let op = Box::new(shared_worker_future.then(move |_result| {
+ let builder = &mut FlatBufferBuilder::new();
+
+ Ok(serialize_response(
+ cmd_id,
+ builder,
+ msg::BaseArgs {
+ ..Default::default()
+ },
+ ))
+ }));
+ Ok(Op::Async(Box::new(op)))
}
/// Get message from guest worker as host
pub fn op_host_get_message(
_state: &ThreadSafeState,
- args: Value,
- _data: Option<PinnedBuf>,
-) -> Result<JsonOp, ErrBox> {
- let args: HostGetMessageArgs = serde_json::from_value(args)?;
-
- let rid = args.rid as u32;
- let op = resources::get_message_from_worker(rid)
- .map_err(move |_| -> ErrBox { unimplemented!() })
- .and_then(move |maybe_buf| {
- futures::future::ok(json!({
- "data": maybe_buf.map(|buf| buf.to_owned())
- }))
- });
-
- Ok(JsonOp::Async(Box::new(op)))
-}
-
-#[derive(Deserialize)]
-struct HostPostMessageArgs {
- rid: i32,
+ base: &msg::Base<'_>,
+ data: Option<PinnedBuf>,
+) -> CliOpResult {
+ if base.sync() {
+ return Err(deno_error::no_sync_support());
+ }
+ assert!(data.is_none());
+ let cmd_id = base.cmd_id();
+ let inner = base.inner_as_host_get_message().unwrap();
+ let rid = inner.rid();
+
+ let op = resources::get_message_from_worker(rid);
+ let op = op.map_err(move |_| -> ErrBox { unimplemented!() });
+ let op = op.and_then(move |maybe_buf| -> Result<Buf, ErrBox> {
+ let builder = &mut FlatBufferBuilder::new();
+
+ let data = maybe_buf.as_ref().map(|buf| builder.create_vector(buf));
+ let msg_inner = msg::HostGetMessageRes::create(
+ builder,
+ &msg::HostGetMessageResArgs { data },
+ );
+ Ok(serialize_response(
+ cmd_id,
+ builder,
+ msg::BaseArgs {
+ inner: Some(msg_inner.as_union_value()),
+ inner_type: msg::Any::HostGetMessageRes,
+ ..Default::default()
+ },
+ ))
+ });
+ Ok(Op::Async(Box::new(op)))
}
/// Post message to guest worker as host
pub fn op_host_post_message(
_state: &ThreadSafeState,
- args: Value,
+ base: &msg::Base<'_>,
data: Option<PinnedBuf>,
-) -> Result<JsonOp, ErrBox> {
- let args: HostPostMessageArgs = serde_json::from_value(args)?;
-
- let rid = args.rid as u32;
+) -> CliOpResult {
+ let cmd_id = base.cmd_id();
+ let inner = base.inner_as_host_post_message().unwrap();
+ let rid = inner.rid();
let d = Vec::from(data.unwrap().as_ref()).into_boxed_slice();
resources::post_message_to_worker(rid, d)
.wait()
.map_err(|e| DenoError::new(ErrorKind::Other, e.to_string()))?;
-
- Ok(JsonOp::Sync(json!({})))
+ let builder = &mut FlatBufferBuilder::new();
+
+ ok_buf(serialize_response(
+ cmd_id,
+ builder,
+ msg::BaseArgs {
+ ..Default::default()
+ },
+ ))
}