diff options
Diffstat (limited to 'cli/ops/net.rs')
-rw-r--r-- | cli/ops/net.rs | 167 |
1 files changed, 73 insertions, 94 deletions
diff --git a/cli/ops/net.rs b/cli/ops/net.rs index 5ce562492..650127fad 100644 --- a/cli/ops/net.rs +++ b/cli/ops/net.rs @@ -1,15 +1,12 @@ // Copyright 2018-2019 the Deno authors. All rights reserved. MIT license. -use super::dispatch_flatbuffers::serialize_response; -use super::utils::*; +use super::dispatch_json::{Deserialize, JsonOp, Value}; use crate::deno_error; -use crate::msg; use crate::resolve_addr::resolve_addr; use crate::resources; use crate::resources::Resource; use crate::state::ThreadSafeState; use crate::tokio_util; use deno::*; -use flatbuffers::FlatBufferBuilder; use futures::Future; use std; use std::convert::From; @@ -18,15 +15,18 @@ use tokio; use tokio::net::TcpListener; use tokio::net::TcpStream; +#[derive(Deserialize)] +struct AcceptArgs { + rid: i32, +} + pub fn op_accept( _state: &ThreadSafeState, - base: &msg::Base<'_>, - data: Option<PinnedBuf>, -) -> CliOpResult { - assert!(data.is_none()); - let cmd_id = base.cmd_id(); - let inner = base.inner_as_accept().unwrap(); - let server_rid = inner.rid(); + args: Value, + _zero_copy: Option<PinnedBuf>, +) -> Result<JsonOp, ErrBox> { + let args: AcceptArgs = serde_json::from_value(args)?; + let server_rid = args.rid as u32; match resources::lookup(server_rid) { None => Err(deno_error::bad_resource()), @@ -34,55 +34,65 @@ pub fn op_accept( let op = tokio_util::accept(server_resource) .map_err(ErrBox::from) .and_then(move |(tcp_stream, _socket_addr)| { - new_conn(cmd_id, tcp_stream) + let tcp_stream_resource = resources::add_tcp_stream(tcp_stream); + futures::future::ok(json!({ + "rid": tcp_stream_resource.rid + })) }); - if base.sync() { - let buf = op.wait()?; - Ok(Op::Sync(buf)) - } else { - Ok(Op::Async(Box::new(op))) - } + + Ok(JsonOp::Async(Box::new(op))) } } } +#[derive(Deserialize)] +struct DialArgs { + network: String, + address: String, +} + pub fn op_dial( state: &ThreadSafeState, - base: &msg::Base<'_>, - data: Option<PinnedBuf>, -) -> CliOpResult { - assert!(data.is_none()); - let cmd_id = base.cmd_id(); - let inner = base.inner_as_dial().unwrap(); - let network = inner.network().unwrap(); + args: Value, + _zero_copy: Option<PinnedBuf>, +) -> Result<JsonOp, ErrBox> { + let args: DialArgs = serde_json::from_value(args)?; + let network = args.network; assert_eq!(network, "tcp"); // TODO Support others. - let address = inner.address().unwrap(); + let address = args.address; state.check_net(&address)?; - let op = resolve_addr(address).and_then(move |addr| { - TcpStream::connect(&addr) - .map_err(ErrBox::from) - .and_then(move |tcp_stream| new_conn(cmd_id, tcp_stream)) + let op = resolve_addr(&address).and_then(move |addr| { + TcpStream::connect(&addr).map_err(ErrBox::from).and_then( + move |tcp_stream| { + let tcp_stream_resource = resources::add_tcp_stream(tcp_stream); + futures::future::ok(json!({ + "rid": tcp_stream_resource.rid + })) + }, + ) }); - if base.sync() { - let buf = op.wait()?; - Ok(Op::Sync(buf)) - } else { - Ok(Op::Async(Box::new(op))) - } + + Ok(JsonOp::Async(Box::new(op))) +} + +#[derive(Deserialize)] +struct ShutdownArgs { + rid: i32, + how: i32, } pub fn op_shutdown( _state: &ThreadSafeState, - base: &msg::Base<'_>, - data: Option<PinnedBuf>, -) -> CliOpResult { - assert!(data.is_none()); - let inner = base.inner_as_shutdown().unwrap(); - let rid = inner.rid(); - let how = inner.how(); - match resources::lookup(rid) { + args: Value, + _zero_copy: Option<PinnedBuf>, +) -> Result<JsonOp, ErrBox> { + let args: ShutdownArgs = serde_json::from_value(args)?; + + let rid = args.rid; + let how = args.how; + match resources::lookup(rid as u32) { None => Err(deno_error::bad_resource()), Some(mut resource) => { let shutdown_mode = match how { @@ -90,67 +100,36 @@ pub fn op_shutdown( 1 => Shutdown::Write, _ => unimplemented!(), }; - blocking(base.sync(), move || { - // Use UFCS for disambiguation - Resource::shutdown(&mut resource, shutdown_mode)?; - Ok(empty_buf()) - }) + + // Use UFCS for disambiguation + Resource::shutdown(&mut resource, shutdown_mode)?; + Ok(JsonOp::Sync(json!({}))) } } } +#[derive(Deserialize)] +struct ListenArgs { + network: String, + address: String, +} + pub fn op_listen( state: &ThreadSafeState, - base: &msg::Base<'_>, - data: Option<PinnedBuf>, -) -> CliOpResult { - assert!(data.is_none()); - let cmd_id = base.cmd_id(); - let inner = base.inner_as_listen().unwrap(); - let network = inner.network().unwrap(); + args: Value, + _zero_copy: Option<PinnedBuf>, +) -> Result<JsonOp, ErrBox> { + let args: ListenArgs = serde_json::from_value(args)?; + + let network = args.network; assert_eq!(network, "tcp"); - let address = inner.address().unwrap(); + let address = args.address; state.check_net(&address)?; - let addr = resolve_addr(address).wait()?; + let addr = resolve_addr(&address).wait()?; let listener = TcpListener::bind(&addr)?; let resource = resources::add_tcp_listener(listener); - let builder = &mut FlatBufferBuilder::new(); - let inner = - msg::ListenRes::create(builder, &msg::ListenResArgs { rid: resource.rid }); - let response_buf = serialize_response( - cmd_id, - builder, - msg::BaseArgs { - inner: Some(inner.as_union_value()), - inner_type: msg::Any::ListenRes, - ..Default::default() - }, - ); - ok_buf(response_buf) -} - -fn new_conn(cmd_id: u32, tcp_stream: TcpStream) -> Result<Buf, ErrBox> { - let tcp_stream_resource = resources::add_tcp_stream(tcp_stream); - // TODO forward socket_addr to client. - - let builder = &mut FlatBufferBuilder::new(); - let inner = msg::NewConn::create( - builder, - &msg::NewConnArgs { - rid: tcp_stream_resource.rid, - ..Default::default() - }, - ); - Ok(serialize_response( - cmd_id, - builder, - msg::BaseArgs { - inner: Some(inner.as_union_value()), - inner_type: msg::Any::NewConn, - ..Default::default() - }, - )) + Ok(JsonOp::Sync(json!(resource.rid))) } |