diff options
Diffstat (limited to 'cli/ops/net.rs')
-rw-r--r-- | cli/ops/net.rs | 167 |
1 files changed, 94 insertions, 73 deletions
diff --git a/cli/ops/net.rs b/cli/ops/net.rs index 650127fad..5ce562492 100644 --- a/cli/ops/net.rs +++ b/cli/ops/net.rs @@ -1,12 +1,15 @@ // Copyright 2018-2019 the Deno authors. All rights reserved. MIT license. -use super::dispatch_json::{Deserialize, JsonOp, Value}; +use super::dispatch_flatbuffers::serialize_response; +use super::utils::*; use crate::deno_error; +use crate::msg; use crate::resolve_addr::resolve_addr; use crate::resources; use crate::resources::Resource; use crate::state::ThreadSafeState; use crate::tokio_util; use deno::*; +use flatbuffers::FlatBufferBuilder; use futures::Future; use std; use std::convert::From; @@ -15,18 +18,15 @@ use tokio; use tokio::net::TcpListener; use tokio::net::TcpStream; -#[derive(Deserialize)] -struct AcceptArgs { - rid: i32, -} - pub fn op_accept( _state: &ThreadSafeState, - args: Value, - _zero_copy: Option<PinnedBuf>, -) -> Result<JsonOp, ErrBox> { - let args: AcceptArgs = serde_json::from_value(args)?; - let server_rid = args.rid as u32; + base: &msg::Base<'_>, + data: Option<PinnedBuf>, +) -> CliOpResult { + assert!(data.is_none()); + let cmd_id = base.cmd_id(); + let inner = base.inner_as_accept().unwrap(); + let server_rid = inner.rid(); match resources::lookup(server_rid) { None => Err(deno_error::bad_resource()), @@ -34,65 +34,55 @@ pub fn op_accept( let op = tokio_util::accept(server_resource) .map_err(ErrBox::from) .and_then(move |(tcp_stream, _socket_addr)| { - let tcp_stream_resource = resources::add_tcp_stream(tcp_stream); - futures::future::ok(json!({ - "rid": tcp_stream_resource.rid - })) + new_conn(cmd_id, tcp_stream) }); - - Ok(JsonOp::Async(Box::new(op))) + if base.sync() { + let buf = op.wait()?; + Ok(Op::Sync(buf)) + } else { + Ok(Op::Async(Box::new(op))) + } } } } -#[derive(Deserialize)] -struct DialArgs { - network: String, - address: String, -} - pub fn op_dial( state: &ThreadSafeState, - args: Value, - _zero_copy: Option<PinnedBuf>, -) -> Result<JsonOp, ErrBox> { - let args: DialArgs = serde_json::from_value(args)?; - let network = args.network; + base: &msg::Base<'_>, + data: Option<PinnedBuf>, +) -> CliOpResult { + assert!(data.is_none()); + let cmd_id = base.cmd_id(); + let inner = base.inner_as_dial().unwrap(); + let network = inner.network().unwrap(); assert_eq!(network, "tcp"); // TODO Support others. - let address = args.address; + let address = inner.address().unwrap(); state.check_net(&address)?; - let op = resolve_addr(&address).and_then(move |addr| { - TcpStream::connect(&addr).map_err(ErrBox::from).and_then( - move |tcp_stream| { - let tcp_stream_resource = resources::add_tcp_stream(tcp_stream); - futures::future::ok(json!({ - "rid": tcp_stream_resource.rid - })) - }, - ) + let op = resolve_addr(address).and_then(move |addr| { + TcpStream::connect(&addr) + .map_err(ErrBox::from) + .and_then(move |tcp_stream| new_conn(cmd_id, tcp_stream)) }); - - Ok(JsonOp::Async(Box::new(op))) -} - -#[derive(Deserialize)] -struct ShutdownArgs { - rid: i32, - how: i32, + if base.sync() { + let buf = op.wait()?; + Ok(Op::Sync(buf)) + } else { + Ok(Op::Async(Box::new(op))) + } } pub fn op_shutdown( _state: &ThreadSafeState, - args: Value, - _zero_copy: Option<PinnedBuf>, -) -> Result<JsonOp, ErrBox> { - let args: ShutdownArgs = serde_json::from_value(args)?; - - let rid = args.rid; - let how = args.how; - match resources::lookup(rid as u32) { + base: &msg::Base<'_>, + data: Option<PinnedBuf>, +) -> CliOpResult { + assert!(data.is_none()); + let inner = base.inner_as_shutdown().unwrap(); + let rid = inner.rid(); + let how = inner.how(); + match resources::lookup(rid) { None => Err(deno_error::bad_resource()), Some(mut resource) => { let shutdown_mode = match how { @@ -100,36 +90,67 @@ pub fn op_shutdown( 1 => Shutdown::Write, _ => unimplemented!(), }; - - // Use UFCS for disambiguation - Resource::shutdown(&mut resource, shutdown_mode)?; - Ok(JsonOp::Sync(json!({}))) + blocking(base.sync(), move || { + // Use UFCS for disambiguation + Resource::shutdown(&mut resource, shutdown_mode)?; + Ok(empty_buf()) + }) } } } -#[derive(Deserialize)] -struct ListenArgs { - network: String, - address: String, -} - pub fn op_listen( state: &ThreadSafeState, - args: Value, - _zero_copy: Option<PinnedBuf>, -) -> Result<JsonOp, ErrBox> { - let args: ListenArgs = serde_json::from_value(args)?; - - let network = args.network; + base: &msg::Base<'_>, + data: Option<PinnedBuf>, +) -> CliOpResult { + assert!(data.is_none()); + let cmd_id = base.cmd_id(); + let inner = base.inner_as_listen().unwrap(); + let network = inner.network().unwrap(); assert_eq!(network, "tcp"); - let address = args.address; + let address = inner.address().unwrap(); state.check_net(&address)?; - let addr = resolve_addr(&address).wait()?; + let addr = resolve_addr(address).wait()?; let listener = TcpListener::bind(&addr)?; let resource = resources::add_tcp_listener(listener); - Ok(JsonOp::Sync(json!(resource.rid))) + let builder = &mut FlatBufferBuilder::new(); + let inner = + msg::ListenRes::create(builder, &msg::ListenResArgs { rid: resource.rid }); + let response_buf = serialize_response( + cmd_id, + builder, + msg::BaseArgs { + inner: Some(inner.as_union_value()), + inner_type: msg::Any::ListenRes, + ..Default::default() + }, + ); + ok_buf(response_buf) +} + +fn new_conn(cmd_id: u32, tcp_stream: TcpStream) -> Result<Buf, ErrBox> { + let tcp_stream_resource = resources::add_tcp_stream(tcp_stream); + // TODO forward socket_addr to client. + + let builder = &mut FlatBufferBuilder::new(); + let inner = msg::NewConn::create( + builder, + &msg::NewConnArgs { + rid: tcp_stream_resource.rid, + ..Default::default() + }, + ); + Ok(serialize_response( + cmd_id, + builder, + msg::BaseArgs { + inner: Some(inner.as_union_value()), + inner_type: msg::Any::NewConn, + ..Default::default() + }, + )) } |