diff options
Diffstat (limited to 'cli/ops/workers.rs')
-rw-r--r-- | cli/ops/workers.rs | 229 |
1 files changed, 140 insertions, 89 deletions
diff --git a/cli/ops/workers.rs b/cli/ops/workers.rs index 4eeecd068..1eb11420f 100644 --- a/cli/ops/workers.rs +++ b/cli/ops/workers.rs @@ -1,12 +1,17 @@ // Copyright 2018-2019 the Deno authors. All rights reserved. MIT license. -use super::dispatch_json::{Deserialize, JsonOp, Value}; +use super::dispatch_flatbuffers::serialize_response; +use super::utils::ok_buf; +use super::utils::CliOpResult; +use crate::deno_error; use crate::deno_error::DenoError; use crate::deno_error::ErrorKind; +use crate::msg; use crate::resources; use crate::startup_data; use crate::state::ThreadSafeState; use crate::worker::Worker; use deno::*; +use flatbuffers::FlatBufferBuilder; use futures; use futures::Async; use futures::Future; @@ -34,32 +39,48 @@ impl Future for GetMessageFuture { /// Get message from host as guest worker pub fn op_worker_get_message( state: &ThreadSafeState, - _args: Value, - _data: Option<PinnedBuf>, -) -> Result<JsonOp, ErrBox> { + base: &msg::Base<'_>, + data: Option<PinnedBuf>, +) -> CliOpResult { + if base.sync() { + return Err(deno_error::no_sync_support()); + } + assert!(data.is_none()); + let cmd_id = base.cmd_id(); + let op = GetMessageFuture { state: state.clone(), }; - - let op = op - .map_err(move |_| -> ErrBox { unimplemented!() }) - .and_then(move |maybe_buf| { - debug!("op_worker_get_message"); - - futures::future::ok(json!({ - "data": maybe_buf.map(|buf| buf.to_owned()) - })) - }); - - Ok(JsonOp::Async(Box::new(op))) + let op = op.map_err(move |_| -> ErrBox { unimplemented!() }); + let op = op.and_then(move |maybe_buf| -> Result<Buf, ErrBox> { + debug!("op_worker_get_message"); + let builder = &mut FlatBufferBuilder::new(); + + let data = maybe_buf.as_ref().map(|buf| builder.create_vector(buf)); + let inner = msg::WorkerGetMessageRes::create( + builder, + &msg::WorkerGetMessageResArgs { data }, + ); + Ok(serialize_response( + cmd_id, + builder, + msg::BaseArgs { + inner: Some(inner.as_union_value()), + inner_type: msg::Any::WorkerGetMessageRes, + ..Default::default() + }, + )) + }); + Ok(Op::Async(Box::new(op))) } /// Post message to host as guest worker pub fn op_worker_post_message( state: &ThreadSafeState, - _args: Value, + base: &msg::Base<'_>, data: Option<PinnedBuf>, -) -> Result<JsonOp, ErrBox> { +) -> CliOpResult { + let cmd_id = base.cmd_id(); let d = Vec::from(data.unwrap().as_ref()).into_boxed_slice(); let tx = { @@ -69,34 +90,33 @@ pub fn op_worker_post_message( tx.send(d) .wait() .map_err(|e| DenoError::new(ErrorKind::Other, e.to_string()))?; - - Ok(JsonOp::Sync(json!({}))) -} - -#[derive(Deserialize)] -#[serde(rename_all = "camelCase")] -struct CreateWorkerArgs { - specifier: String, - include_deno_namespace: bool, - has_source_code: bool, - source_code: String, + let builder = &mut FlatBufferBuilder::new(); + + ok_buf(serialize_response( + cmd_id, + builder, + msg::BaseArgs { + ..Default::default() + }, + )) } /// Create worker as the host pub fn op_create_worker( state: &ThreadSafeState, - args: Value, - _data: Option<PinnedBuf>, -) -> Result<JsonOp, ErrBox> { - let args: CreateWorkerArgs = serde_json::from_value(args)?; - - let specifier = args.specifier.as_ref(); + base: &msg::Base<'_>, + data: Option<PinnedBuf>, +) -> CliOpResult { + assert!(data.is_none()); + let cmd_id = base.cmd_id(); + let inner = base.inner_as_create_worker().unwrap(); + let specifier = inner.specifier().unwrap(); // Only include deno namespace if requested AND current worker // has included namespace (to avoid escalation). let include_deno_namespace = - args.include_deno_namespace && state.include_deno_namespace; - let has_source_code = args.has_source_code; - let source_code = args.source_code; + inner.include_deno_namespace() && state.include_deno_namespace; + let has_source_code = inner.has_source_code(); + let source_code = inner.source_code().unwrap(); let parent_state = state.clone(); @@ -130,13 +150,24 @@ pub fn op_create_worker( let exec_cb = move |worker: Worker| { let mut workers_tl = parent_state.workers.lock().unwrap(); workers_tl.insert(rid, worker.shared()); - json!(rid) + let builder = &mut FlatBufferBuilder::new(); + let msg_inner = + msg::CreateWorkerRes::create(builder, &msg::CreateWorkerResArgs { rid }); + serialize_response( + cmd_id, + builder, + msg::BaseArgs { + inner: Some(msg_inner.as_union_value()), + inner_type: msg::Any::CreateWorkerRes, + ..Default::default() + }, + ) }; // Has provided source code, execute immediately. if has_source_code { worker.execute(&source_code).unwrap(); - return Ok(JsonOp::Sync(exec_cb(worker))); + return ok_buf(exec_cb(worker)); } let op = worker @@ -144,23 +175,22 @@ pub fn op_create_worker( .and_then(move |()| Ok(exec_cb(worker))); let result = op.wait()?; - Ok(JsonOp::Sync(result)) -} - -#[derive(Deserialize)] -struct HostGetWorkerClosedArgs { - rid: i32, + Ok(Op::Sync(result)) } /// Return when the worker closes pub fn op_host_get_worker_closed( state: &ThreadSafeState, - args: Value, - _data: Option<PinnedBuf>, -) -> Result<JsonOp, ErrBox> { - let args: HostGetWorkerClosedArgs = serde_json::from_value(args)?; - - let rid = args.rid as u32; + base: &msg::Base<'_>, + data: Option<PinnedBuf>, +) -> CliOpResult { + if base.sync() { + return Err(deno_error::no_sync_support()); + } + assert!(data.is_none()); + let cmd_id = base.cmd_id(); + let inner = base.inner_as_host_get_worker_closed().unwrap(); + let rid = inner.rid(); let state = state.clone(); let shared_worker_future = { @@ -169,58 +199,79 @@ pub fn op_host_get_worker_closed( worker.clone() }; - let op = Box::new( - shared_worker_future.then(move |_result| futures::future::ok(json!({}))), - ); - - Ok(JsonOp::Async(Box::new(op))) -} - -#[derive(Deserialize)] -struct HostGetMessageArgs { - rid: i32, + let op = Box::new(shared_worker_future.then(move |_result| { + let builder = &mut FlatBufferBuilder::new(); + + Ok(serialize_response( + cmd_id, + builder, + msg::BaseArgs { + ..Default::default() + }, + )) + })); + Ok(Op::Async(Box::new(op))) } /// Get message from guest worker as host pub fn op_host_get_message( _state: &ThreadSafeState, - args: Value, - _data: Option<PinnedBuf>, -) -> Result<JsonOp, ErrBox> { - let args: HostGetMessageArgs = serde_json::from_value(args)?; - - let rid = args.rid as u32; - let op = resources::get_message_from_worker(rid) - .map_err(move |_| -> ErrBox { unimplemented!() }) - .and_then(move |maybe_buf| { - futures::future::ok(json!({ - "data": maybe_buf.map(|buf| buf.to_owned()) - })) - }); - - Ok(JsonOp::Async(Box::new(op))) -} - -#[derive(Deserialize)] -struct HostPostMessageArgs { - rid: i32, + base: &msg::Base<'_>, + data: Option<PinnedBuf>, +) -> CliOpResult { + if base.sync() { + return Err(deno_error::no_sync_support()); + } + assert!(data.is_none()); + let cmd_id = base.cmd_id(); + let inner = base.inner_as_host_get_message().unwrap(); + let rid = inner.rid(); + + let op = resources::get_message_from_worker(rid); + let op = op.map_err(move |_| -> ErrBox { unimplemented!() }); + let op = op.and_then(move |maybe_buf| -> Result<Buf, ErrBox> { + let builder = &mut FlatBufferBuilder::new(); + + let data = maybe_buf.as_ref().map(|buf| builder.create_vector(buf)); + let msg_inner = msg::HostGetMessageRes::create( + builder, + &msg::HostGetMessageResArgs { data }, + ); + Ok(serialize_response( + cmd_id, + builder, + msg::BaseArgs { + inner: Some(msg_inner.as_union_value()), + inner_type: msg::Any::HostGetMessageRes, + ..Default::default() + }, + )) + }); + Ok(Op::Async(Box::new(op))) } /// Post message to guest worker as host pub fn op_host_post_message( _state: &ThreadSafeState, - args: Value, + base: &msg::Base<'_>, data: Option<PinnedBuf>, -) -> Result<JsonOp, ErrBox> { - let args: HostPostMessageArgs = serde_json::from_value(args)?; - - let rid = args.rid as u32; +) -> CliOpResult { + let cmd_id = base.cmd_id(); + let inner = base.inner_as_host_post_message().unwrap(); + let rid = inner.rid(); let d = Vec::from(data.unwrap().as_ref()).into_boxed_slice(); resources::post_message_to_worker(rid, d) .wait() .map_err(|e| DenoError::new(ErrorKind::Other, e.to_string()))?; - - Ok(JsonOp::Sync(json!({}))) + let builder = &mut FlatBufferBuilder::new(); + + ok_buf(serialize_response( + cmd_id, + builder, + msg::BaseArgs { + ..Default::default() + }, + )) } |