diff options
Diffstat (limited to 'cli/ops/net.rs')
-rw-r--r-- | cli/ops/net.rs | 60 |
1 files changed, 33 insertions, 27 deletions
diff --git a/cli/ops/net.rs b/cli/ops/net.rs index a4b3bf934..2fe81e140 100644 --- a/cli/ops/net.rs +++ b/cli/ops/net.rs @@ -1,12 +1,11 @@ // Copyright 2018-2019 the Deno authors. All rights reserved. MIT license. use super::dispatch_json::{Deserialize, JsonOp, Value}; +use super::io::StreamResource; use crate::deno_error::bad_resource; use crate::ops::json_op; use crate::resolve_addr::resolve_addr; -use crate::resources; -use crate::resources::CliResource; -use crate::resources::Resource; use crate::state::ThreadSafeState; +use deno::Resource; use deno::*; use futures::Async; use futures::Future; @@ -34,18 +33,19 @@ enum AcceptState { } /// Simply accepts a connection. -pub fn accept(rid: ResourceId) -> Accept { +pub fn accept(state: &ThreadSafeState, rid: ResourceId) -> Accept { Accept { - state: AcceptState::Eager, + accept_state: AcceptState::Eager, rid, + state: state.clone(), } } /// A future representing state of accepting a TCP connection. -#[derive(Debug)] pub struct Accept { - state: AcceptState, + accept_state: AcceptState, rid: ResourceId, + state: ThreadSafeState, } impl Future for Accept { @@ -53,11 +53,11 @@ impl Future for Accept { type Error = ErrBox; fn poll(&mut self) -> Poll<Self::Item, Self::Error> { - if self.state == AcceptState::Done { + if self.accept_state == AcceptState::Done { panic!("poll Accept after it's done"); } - let mut table = resources::lock_resource_table(); + let mut table = self.state.lock_resource_table(); let listener_resource = table .get_mut::<TcpListenerResource>(self.rid) .ok_or_else(|| { @@ -70,22 +70,22 @@ impl Future for Accept { let listener = &mut listener_resource.listener; - if self.state == AcceptState::Eager { + if self.accept_state == AcceptState::Eager { // Similar to try_ready!, but also track/untrack accept task // in TcpListener resource. // In this way, when the listener is closed, the task can be // notified to error out (instead of stuck forever). match listener.poll_accept().map_err(ErrBox::from) { Ok(Async::Ready((stream, addr))) => { - self.state = AcceptState::Done; + self.accept_state = AcceptState::Done; return Ok((stream, addr).into()); } Ok(Async::NotReady) => { - self.state = AcceptState::Pending; + self.accept_state = AcceptState::Pending; return Ok(Async::NotReady); } Err(e) => { - self.state = AcceptState::Done; + self.accept_state = AcceptState::Done; return Err(e); } } @@ -94,7 +94,7 @@ impl Future for Accept { match listener.poll_accept().map_err(ErrBox::from) { Ok(Async::Ready((stream, addr))) => { listener_resource.untrack_task(); - self.state = AcceptState::Done; + self.accept_state = AcceptState::Done; Ok((stream, addr).into()) } Ok(Async::NotReady) => { @@ -103,7 +103,7 @@ impl Future for Accept { } Err(e) => { listener_resource.untrack_task(); - self.state = AcceptState::Done; + self.accept_state = AcceptState::Done; Err(e) } } @@ -116,23 +116,25 @@ struct AcceptArgs { } fn op_accept( - _state: &ThreadSafeState, + state: &ThreadSafeState, args: Value, _zero_copy: Option<PinnedBuf>, ) -> Result<JsonOp, ErrBox> { let args: AcceptArgs = serde_json::from_value(args)?; let rid = args.rid as u32; - - let table = resources::lock_resource_table(); + let state_ = state.clone(); + let table = state.lock_resource_table(); table .get::<TcpListenerResource>(rid) .ok_or_else(bad_resource)?; - let op = accept(rid) + let op = accept(state, rid) .and_then(move |(tcp_stream, _socket_addr)| { let local_addr = tcp_stream.local_addr()?; let remote_addr = tcp_stream.peer_addr()?; - let rid = resources::add_tcp_stream(tcp_stream); + let mut table = state_.lock_resource_table(); + let rid = + table.add("tcpStream", Box::new(StreamResource::TcpStream(tcp_stream))); Ok((rid, local_addr, remote_addr)) }) .map_err(ErrBox::from) @@ -161,7 +163,7 @@ fn op_dial( ) -> Result<JsonOp, ErrBox> { let args: DialArgs = serde_json::from_value(args)?; assert_eq!(args.transport, "tcp"); // TODO Support others. - + let state_ = state.clone(); state.check_net(&args.hostname, args.port)?; let op = resolve_addr(&args.hostname, args.port).and_then(move |addr| { @@ -170,7 +172,9 @@ fn op_dial( .and_then(move |tcp_stream| { let local_addr = tcp_stream.local_addr()?; let remote_addr = tcp_stream.peer_addr()?; - let rid = resources::add_tcp_stream(tcp_stream); + let mut table = state_.lock_resource_table(); + let rid = table + .add("tcpStream", Box::new(StreamResource::TcpStream(tcp_stream))); Ok((rid, local_addr, remote_addr)) }) .map_err(ErrBox::from) @@ -193,7 +197,7 @@ struct ShutdownArgs { } fn op_shutdown( - _state: &ThreadSafeState, + state: &ThreadSafeState, args: Value, _zero_copy: Option<PinnedBuf>, ) -> Result<JsonOp, ErrBox> { @@ -208,10 +212,12 @@ fn op_shutdown( _ => unimplemented!(), }; - let mut table = resources::lock_resource_table(); - let resource = table.get_mut::<CliResource>(rid).ok_or_else(bad_resource)?; + let mut table = state.lock_resource_table(); + let resource = table + .get_mut::<StreamResource>(rid) + .ok_or_else(bad_resource)?; match resource { - CliResource::TcpStream(ref mut stream) => { + StreamResource::TcpStream(ref mut stream) => { TcpStream::shutdown(stream, shutdown_mode).map_err(ErrBox::from)?; } _ => return Err(bad_resource()), @@ -299,7 +305,7 @@ fn op_listen( task: None, local_addr, }; - let mut table = resources::lock_resource_table(); + let mut table = state.lock_resource_table(); let rid = table.add("tcpListener", Box::new(listener_resource)); Ok(JsonOp::Sync(json!({ |