diff options
author | Bartek IwaĆczuk <biwanczuk@gmail.com> | 2019-08-24 17:31:14 +0200 |
---|---|---|
committer | Ryan Dahl <ry@tinyclouds.org> | 2019-08-24 08:31:14 -0700 |
commit | 137f33733d365026903d40e7cde6e34ac6c36dcf (patch) | |
tree | e8096e119c374b199cd498ccfa1ee0ef4e6ba950 /cli/ops/workers.rs | |
parent | 79f82cf10ed1dbf91346994250d7311a4d74377a (diff) |
port more ops to JSON (#2809)
Diffstat (limited to 'cli/ops/workers.rs')
-rw-r--r-- | cli/ops/workers.rs | 229 |
1 files changed, 89 insertions, 140 deletions
diff --git a/cli/ops/workers.rs b/cli/ops/workers.rs index 1eb11420f..4eeecd068 100644 --- a/cli/ops/workers.rs +++ b/cli/ops/workers.rs @@ -1,17 +1,12 @@ // Copyright 2018-2019 the Deno authors. All rights reserved. MIT license. -use super::dispatch_flatbuffers::serialize_response; -use super::utils::ok_buf; -use super::utils::CliOpResult; -use crate::deno_error; +use super::dispatch_json::{Deserialize, JsonOp, Value}; use crate::deno_error::DenoError; use crate::deno_error::ErrorKind; -use crate::msg; use crate::resources; use crate::startup_data; use crate::state::ThreadSafeState; use crate::worker::Worker; use deno::*; -use flatbuffers::FlatBufferBuilder; use futures; use futures::Async; use futures::Future; @@ -39,48 +34,32 @@ impl Future for GetMessageFuture { /// Get message from host as guest worker pub fn op_worker_get_message( state: &ThreadSafeState, - base: &msg::Base<'_>, - data: Option<PinnedBuf>, -) -> CliOpResult { - if base.sync() { - return Err(deno_error::no_sync_support()); - } - assert!(data.is_none()); - let cmd_id = base.cmd_id(); - + _args: Value, + _data: Option<PinnedBuf>, +) -> Result<JsonOp, ErrBox> { let op = GetMessageFuture { state: state.clone(), }; - let op = op.map_err(move |_| -> ErrBox { unimplemented!() }); - let op = op.and_then(move |maybe_buf| -> Result<Buf, ErrBox> { - debug!("op_worker_get_message"); - let builder = &mut FlatBufferBuilder::new(); - - let data = maybe_buf.as_ref().map(|buf| builder.create_vector(buf)); - let inner = msg::WorkerGetMessageRes::create( - builder, - &msg::WorkerGetMessageResArgs { data }, - ); - Ok(serialize_response( - cmd_id, - builder, - msg::BaseArgs { - inner: Some(inner.as_union_value()), - inner_type: msg::Any::WorkerGetMessageRes, - ..Default::default() - }, - )) - }); - Ok(Op::Async(Box::new(op))) + + let op = op + .map_err(move |_| -> ErrBox { unimplemented!() }) + .and_then(move |maybe_buf| { + debug!("op_worker_get_message"); + + futures::future::ok(json!({ + "data": maybe_buf.map(|buf| buf.to_owned()) + })) + }); + + Ok(JsonOp::Async(Box::new(op))) } /// Post message to host as guest worker pub fn op_worker_post_message( state: &ThreadSafeState, - base: &msg::Base<'_>, + _args: Value, data: Option<PinnedBuf>, -) -> CliOpResult { - let cmd_id = base.cmd_id(); +) -> Result<JsonOp, ErrBox> { let d = Vec::from(data.unwrap().as_ref()).into_boxed_slice(); let tx = { @@ -90,33 +69,34 @@ pub fn op_worker_post_message( tx.send(d) .wait() .map_err(|e| DenoError::new(ErrorKind::Other, e.to_string()))?; - let builder = &mut FlatBufferBuilder::new(); - - ok_buf(serialize_response( - cmd_id, - builder, - msg::BaseArgs { - ..Default::default() - }, - )) + + Ok(JsonOp::Sync(json!({}))) +} + +#[derive(Deserialize)] +#[serde(rename_all = "camelCase")] +struct CreateWorkerArgs { + specifier: String, + include_deno_namespace: bool, + has_source_code: bool, + source_code: String, } /// Create worker as the host pub fn op_create_worker( state: &ThreadSafeState, - base: &msg::Base<'_>, - data: Option<PinnedBuf>, -) -> CliOpResult { - assert!(data.is_none()); - let cmd_id = base.cmd_id(); - let inner = base.inner_as_create_worker().unwrap(); - let specifier = inner.specifier().unwrap(); + args: Value, + _data: Option<PinnedBuf>, +) -> Result<JsonOp, ErrBox> { + let args: CreateWorkerArgs = serde_json::from_value(args)?; + + let specifier = args.specifier.as_ref(); // Only include deno namespace if requested AND current worker // has included namespace (to avoid escalation). let include_deno_namespace = - inner.include_deno_namespace() && state.include_deno_namespace; - let has_source_code = inner.has_source_code(); - let source_code = inner.source_code().unwrap(); + args.include_deno_namespace && state.include_deno_namespace; + let has_source_code = args.has_source_code; + let source_code = args.source_code; let parent_state = state.clone(); @@ -150,24 +130,13 @@ pub fn op_create_worker( let exec_cb = move |worker: Worker| { let mut workers_tl = parent_state.workers.lock().unwrap(); workers_tl.insert(rid, worker.shared()); - let builder = &mut FlatBufferBuilder::new(); - let msg_inner = - msg::CreateWorkerRes::create(builder, &msg::CreateWorkerResArgs { rid }); - serialize_response( - cmd_id, - builder, - msg::BaseArgs { - inner: Some(msg_inner.as_union_value()), - inner_type: msg::Any::CreateWorkerRes, - ..Default::default() - }, - ) + json!(rid) }; // Has provided source code, execute immediately. if has_source_code { worker.execute(&source_code).unwrap(); - return ok_buf(exec_cb(worker)); + return Ok(JsonOp::Sync(exec_cb(worker))); } let op = worker @@ -175,22 +144,23 @@ pub fn op_create_worker( .and_then(move |()| Ok(exec_cb(worker))); let result = op.wait()?; - Ok(Op::Sync(result)) + Ok(JsonOp::Sync(result)) +} + +#[derive(Deserialize)] +struct HostGetWorkerClosedArgs { + rid: i32, } /// Return when the worker closes pub fn op_host_get_worker_closed( state: &ThreadSafeState, - base: &msg::Base<'_>, - data: Option<PinnedBuf>, -) -> CliOpResult { - if base.sync() { - return Err(deno_error::no_sync_support()); - } - assert!(data.is_none()); - let cmd_id = base.cmd_id(); - let inner = base.inner_as_host_get_worker_closed().unwrap(); - let rid = inner.rid(); + args: Value, + _data: Option<PinnedBuf>, +) -> Result<JsonOp, ErrBox> { + let args: HostGetWorkerClosedArgs = serde_json::from_value(args)?; + + let rid = args.rid as u32; let state = state.clone(); let shared_worker_future = { @@ -199,79 +169,58 @@ pub fn op_host_get_worker_closed( worker.clone() }; - let op = Box::new(shared_worker_future.then(move |_result| { - let builder = &mut FlatBufferBuilder::new(); - - Ok(serialize_response( - cmd_id, - builder, - msg::BaseArgs { - ..Default::default() - }, - )) - })); - Ok(Op::Async(Box::new(op))) + let op = Box::new( + shared_worker_future.then(move |_result| futures::future::ok(json!({}))), + ); + + Ok(JsonOp::Async(Box::new(op))) +} + +#[derive(Deserialize)] +struct HostGetMessageArgs { + rid: i32, } /// Get message from guest worker as host pub fn op_host_get_message( _state: &ThreadSafeState, - base: &msg::Base<'_>, - data: Option<PinnedBuf>, -) -> CliOpResult { - if base.sync() { - return Err(deno_error::no_sync_support()); - } - assert!(data.is_none()); - let cmd_id = base.cmd_id(); - let inner = base.inner_as_host_get_message().unwrap(); - let rid = inner.rid(); - - let op = resources::get_message_from_worker(rid); - let op = op.map_err(move |_| -> ErrBox { unimplemented!() }); - let op = op.and_then(move |maybe_buf| -> Result<Buf, ErrBox> { - let builder = &mut FlatBufferBuilder::new(); - - let data = maybe_buf.as_ref().map(|buf| builder.create_vector(buf)); - let msg_inner = msg::HostGetMessageRes::create( - builder, - &msg::HostGetMessageResArgs { data }, - ); - Ok(serialize_response( - cmd_id, - builder, - msg::BaseArgs { - inner: Some(msg_inner.as_union_value()), - inner_type: msg::Any::HostGetMessageRes, - ..Default::default() - }, - )) - }); - Ok(Op::Async(Box::new(op))) + args: Value, + _data: Option<PinnedBuf>, +) -> Result<JsonOp, ErrBox> { + let args: HostGetMessageArgs = serde_json::from_value(args)?; + + let rid = args.rid as u32; + let op = resources::get_message_from_worker(rid) + .map_err(move |_| -> ErrBox { unimplemented!() }) + .and_then(move |maybe_buf| { + futures::future::ok(json!({ + "data": maybe_buf.map(|buf| buf.to_owned()) + })) + }); + + Ok(JsonOp::Async(Box::new(op))) +} + +#[derive(Deserialize)] +struct HostPostMessageArgs { + rid: i32, } /// Post message to guest worker as host pub fn op_host_post_message( _state: &ThreadSafeState, - base: &msg::Base<'_>, + args: Value, data: Option<PinnedBuf>, -) -> CliOpResult { - let cmd_id = base.cmd_id(); - let inner = base.inner_as_host_post_message().unwrap(); - let rid = inner.rid(); +) -> Result<JsonOp, ErrBox> { + let args: HostPostMessageArgs = serde_json::from_value(args)?; + + let rid = args.rid as u32; let d = Vec::from(data.unwrap().as_ref()).into_boxed_slice(); resources::post_message_to_worker(rid, d) .wait() .map_err(|e| DenoError::new(ErrorKind::Other, e.to_string()))?; - let builder = &mut FlatBufferBuilder::new(); - - ok_buf(serialize_response( - cmd_id, - builder, - msg::BaseArgs { - ..Default::default() - }, - )) + + Ok(JsonOp::Sync(json!({}))) } |