diff options
author | Bartek IwaĆczuk <biwanczuk@gmail.com> | 2019-08-14 17:03:02 +0200 |
---|---|---|
committer | Ryan Dahl <ry@tinyclouds.org> | 2019-08-14 11:03:02 -0400 |
commit | e6c349af9f7260c2c4ec713bd231fe554240721e (patch) | |
tree | 7671dabb8270cc0c2d7f7f5a3b5f5d8d4b1e0986 /cli/ops/net.rs | |
parent | 58f0e9b9b1b53ca486ef38ae662b98cbde839248 (diff) |
split up ops.rs (#2753)
Note cli/dispatch_minimal.rs ops are not yet included in cli/ops.
This is part of work towards #2730
Diffstat (limited to 'cli/ops/net.rs')
-rw-r--r-- | cli/ops/net.rs | 161 |
1 files changed, 161 insertions, 0 deletions
diff --git a/cli/ops/net.rs b/cli/ops/net.rs new file mode 100644 index 000000000..16a24872d --- /dev/null +++ b/cli/ops/net.rs @@ -0,0 +1,161 @@ +// Copyright 2018-2019 the Deno authors. All rights reserved. MIT license. +use crate::deno_error; +use crate::msg; +use crate::ops::empty_buf; +use crate::ops::ok_buf; +use crate::ops::serialize_response; +use crate::ops::CliOpResult; +use crate::resources; +use crate::resources::Resource; +use crate::state::ThreadSafeState; +use crate::tokio_util; +use deno::*; +use flatbuffers::FlatBufferBuilder; +use futures::Future; +use std; +use std::net::Shutdown; +use tokio; + +use crate::resolve_addr::resolve_addr; +use std::convert::From; +use tokio::net::TcpListener; +use tokio::net::TcpStream; + +use crate::ops::blocking; + +pub fn op_accept( + _state: &ThreadSafeState, + base: &msg::Base<'_>, + data: Option<PinnedBuf>, +) -> CliOpResult { + assert!(data.is_none()); + let cmd_id = base.cmd_id(); + let inner = base.inner_as_accept().unwrap(); + let server_rid = inner.rid(); + + match resources::lookup(server_rid) { + None => Err(deno_error::bad_resource()), + Some(server_resource) => { + let op = tokio_util::accept(server_resource) + .map_err(ErrBox::from) + .and_then(move |(tcp_stream, _socket_addr)| { + new_conn(cmd_id, tcp_stream) + }); + if base.sync() { + let buf = op.wait()?; + Ok(Op::Sync(buf)) + } else { + Ok(Op::Async(Box::new(op))) + } + } + } +} + +pub fn op_dial( + state: &ThreadSafeState, + base: &msg::Base<'_>, + data: Option<PinnedBuf>, +) -> CliOpResult { + assert!(data.is_none()); + let cmd_id = base.cmd_id(); + let inner = base.inner_as_dial().unwrap(); + let network = inner.network().unwrap(); + assert_eq!(network, "tcp"); // TODO Support others. + let address = inner.address().unwrap(); + + state.check_net(&address)?; + + let op = resolve_addr(address).and_then(move |addr| { + TcpStream::connect(&addr) + .map_err(ErrBox::from) + .and_then(move |tcp_stream| new_conn(cmd_id, tcp_stream)) + }); + if base.sync() { + let buf = op.wait()?; + Ok(Op::Sync(buf)) + } else { + Ok(Op::Async(Box::new(op))) + } +} + +pub fn op_shutdown( + _state: &ThreadSafeState, + base: &msg::Base<'_>, + data: Option<PinnedBuf>, +) -> CliOpResult { + assert!(data.is_none()); + let inner = base.inner_as_shutdown().unwrap(); + let rid = inner.rid(); + let how = inner.how(); + match resources::lookup(rid) { + None => Err(deno_error::bad_resource()), + Some(mut resource) => { + let shutdown_mode = match how { + 0 => Shutdown::Read, + 1 => Shutdown::Write, + _ => unimplemented!(), + }; + blocking(base.sync(), move || { + // Use UFCS for disambiguation + Resource::shutdown(&mut resource, shutdown_mode)?; + Ok(empty_buf()) + }) + } + } +} + +pub fn op_listen( + state: &ThreadSafeState, + base: &msg::Base<'_>, + data: Option<PinnedBuf>, +) -> CliOpResult { + assert!(data.is_none()); + let cmd_id = base.cmd_id(); + let inner = base.inner_as_listen().unwrap(); + let network = inner.network().unwrap(); + assert_eq!(network, "tcp"); + let address = inner.address().unwrap(); + + state.check_net(&address)?; + + let addr = resolve_addr(address).wait()?; + let listener = TcpListener::bind(&addr)?; + let resource = resources::add_tcp_listener(listener); + + let builder = &mut FlatBufferBuilder::new(); + let inner = + msg::ListenRes::create(builder, &msg::ListenResArgs { rid: resource.rid }); + let response_buf = serialize_response( + cmd_id, + builder, + msg::BaseArgs { + inner: Some(inner.as_union_value()), + inner_type: msg::Any::ListenRes, + ..Default::default() + }, + ); + ok_buf(response_buf) +} + +fn new_conn(cmd_id: u32, tcp_stream: TcpStream) -> Result<Buf, ErrBox> { + let tcp_stream_resource = resources::add_tcp_stream(tcp_stream); + // TODO forward socket_addr to client. + + let builder = &mut FlatBufferBuilder::new(); + let inner = msg::NewConn::create( + builder, + &msg::NewConnArgs { + rid: tcp_stream_resource.rid, + ..Default::default() + }, + ); + Ok(serialize_response( + cmd_id, + builder, + msg::BaseArgs { + inner: Some(inner.as_union_value()), + inner_type: msg::Any::NewConn, + ..Default::default() + }, + )) +} |