summaryrefslogtreecommitdiff
path: root/cli/ops/net.rs
diff options
context:
space:
mode:
authorBartek IwaƄczuk <biwanczuk@gmail.com>2019-11-14 18:10:25 +0100
committerRy Dahl <ry@tinyclouds.org>2019-11-14 12:10:25 -0500
commit8b90b8e88325d28fe41d8312ea91417b6e66a12e (patch)
treea80d85de86353e05a5e59fe8b9890635dcbf200c /cli/ops/net.rs
parent38ffe8886db104068af07cd5efe3d83f42da8ed6 (diff)
refactor: per-worker resource table, take 2 (#3342)
- removes global `RESOURCE_TABLE` - resource tables are now created per `Worker` in `State` - renames `CliResource` to `StreamResource` and moves all logic related to it to `cli/ops/io.rs` - removes `cli/resources.rs` - adds `state` argument to `op_read` and `op_write` and consequently adds `stateful_minimal_op` to `State` - IMPORTANT NOTE: workers don't have access to process stdio - this is caused by fact that dropping worker would close stdout for process (because it's constructed from raw handle, which closes underlying file descriptor on drop)
Diffstat (limited to 'cli/ops/net.rs')
-rw-r--r--cli/ops/net.rs60
1 files changed, 33 insertions, 27 deletions
diff --git a/cli/ops/net.rs b/cli/ops/net.rs
index a4b3bf934..2fe81e140 100644
--- a/cli/ops/net.rs
+++ b/cli/ops/net.rs
@@ -1,12 +1,11 @@
// Copyright 2018-2019 the Deno authors. All rights reserved. MIT license.
use super::dispatch_json::{Deserialize, JsonOp, Value};
+use super::io::StreamResource;
use crate::deno_error::bad_resource;
use crate::ops::json_op;
use crate::resolve_addr::resolve_addr;
-use crate::resources;
-use crate::resources::CliResource;
-use crate::resources::Resource;
use crate::state::ThreadSafeState;
+use deno::Resource;
use deno::*;
use futures::Async;
use futures::Future;
@@ -34,18 +33,19 @@ enum AcceptState {
}
/// Simply accepts a connection.
-pub fn accept(rid: ResourceId) -> Accept {
+pub fn accept(state: &ThreadSafeState, rid: ResourceId) -> Accept {
Accept {
- state: AcceptState::Eager,
+ accept_state: AcceptState::Eager,
rid,
+ state: state.clone(),
}
}
/// A future representing state of accepting a TCP connection.
-#[derive(Debug)]
pub struct Accept {
- state: AcceptState,
+ accept_state: AcceptState,
rid: ResourceId,
+ state: ThreadSafeState,
}
impl Future for Accept {
@@ -53,11 +53,11 @@ impl Future for Accept {
type Error = ErrBox;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
- if self.state == AcceptState::Done {
+ if self.accept_state == AcceptState::Done {
panic!("poll Accept after it's done");
}
- let mut table = resources::lock_resource_table();
+ let mut table = self.state.lock_resource_table();
let listener_resource = table
.get_mut::<TcpListenerResource>(self.rid)
.ok_or_else(|| {
@@ -70,22 +70,22 @@ impl Future for Accept {
let listener = &mut listener_resource.listener;
- if self.state == AcceptState::Eager {
+ if self.accept_state == AcceptState::Eager {
// Similar to try_ready!, but also track/untrack accept task
// in TcpListener resource.
// In this way, when the listener is closed, the task can be
// notified to error out (instead of stuck forever).
match listener.poll_accept().map_err(ErrBox::from) {
Ok(Async::Ready((stream, addr))) => {
- self.state = AcceptState::Done;
+ self.accept_state = AcceptState::Done;
return Ok((stream, addr).into());
}
Ok(Async::NotReady) => {
- self.state = AcceptState::Pending;
+ self.accept_state = AcceptState::Pending;
return Ok(Async::NotReady);
}
Err(e) => {
- self.state = AcceptState::Done;
+ self.accept_state = AcceptState::Done;
return Err(e);
}
}
@@ -94,7 +94,7 @@ impl Future for Accept {
match listener.poll_accept().map_err(ErrBox::from) {
Ok(Async::Ready((stream, addr))) => {
listener_resource.untrack_task();
- self.state = AcceptState::Done;
+ self.accept_state = AcceptState::Done;
Ok((stream, addr).into())
}
Ok(Async::NotReady) => {
@@ -103,7 +103,7 @@ impl Future for Accept {
}
Err(e) => {
listener_resource.untrack_task();
- self.state = AcceptState::Done;
+ self.accept_state = AcceptState::Done;
Err(e)
}
}
@@ -116,23 +116,25 @@ struct AcceptArgs {
}
fn op_accept(
- _state: &ThreadSafeState,
+ state: &ThreadSafeState,
args: Value,
_zero_copy: Option<PinnedBuf>,
) -> Result<JsonOp, ErrBox> {
let args: AcceptArgs = serde_json::from_value(args)?;
let rid = args.rid as u32;
-
- let table = resources::lock_resource_table();
+ let state_ = state.clone();
+ let table = state.lock_resource_table();
table
.get::<TcpListenerResource>(rid)
.ok_or_else(bad_resource)?;
- let op = accept(rid)
+ let op = accept(state, rid)
.and_then(move |(tcp_stream, _socket_addr)| {
let local_addr = tcp_stream.local_addr()?;
let remote_addr = tcp_stream.peer_addr()?;
- let rid = resources::add_tcp_stream(tcp_stream);
+ let mut table = state_.lock_resource_table();
+ let rid =
+ table.add("tcpStream", Box::new(StreamResource::TcpStream(tcp_stream)));
Ok((rid, local_addr, remote_addr))
})
.map_err(ErrBox::from)
@@ -161,7 +163,7 @@ fn op_dial(
) -> Result<JsonOp, ErrBox> {
let args: DialArgs = serde_json::from_value(args)?;
assert_eq!(args.transport, "tcp"); // TODO Support others.
-
+ let state_ = state.clone();
state.check_net(&args.hostname, args.port)?;
let op = resolve_addr(&args.hostname, args.port).and_then(move |addr| {
@@ -170,7 +172,9 @@ fn op_dial(
.and_then(move |tcp_stream| {
let local_addr = tcp_stream.local_addr()?;
let remote_addr = tcp_stream.peer_addr()?;
- let rid = resources::add_tcp_stream(tcp_stream);
+ let mut table = state_.lock_resource_table();
+ let rid = table
+ .add("tcpStream", Box::new(StreamResource::TcpStream(tcp_stream)));
Ok((rid, local_addr, remote_addr))
})
.map_err(ErrBox::from)
@@ -193,7 +197,7 @@ struct ShutdownArgs {
}
fn op_shutdown(
- _state: &ThreadSafeState,
+ state: &ThreadSafeState,
args: Value,
_zero_copy: Option<PinnedBuf>,
) -> Result<JsonOp, ErrBox> {
@@ -208,10 +212,12 @@ fn op_shutdown(
_ => unimplemented!(),
};
- let mut table = resources::lock_resource_table();
- let resource = table.get_mut::<CliResource>(rid).ok_or_else(bad_resource)?;
+ let mut table = state.lock_resource_table();
+ let resource = table
+ .get_mut::<StreamResource>(rid)
+ .ok_or_else(bad_resource)?;
match resource {
- CliResource::TcpStream(ref mut stream) => {
+ StreamResource::TcpStream(ref mut stream) => {
TcpStream::shutdown(stream, shutdown_mode).map_err(ErrBox::from)?;
}
_ => return Err(bad_resource()),
@@ -299,7 +305,7 @@ fn op_listen(
task: None,
local_addr,
};
- let mut table = resources::lock_resource_table();
+ let mut table = state.lock_resource_table();
let rid = table.add("tcpListener", Box::new(listener_resource));
Ok(JsonOp::Sync(json!({