diff options
author | Bartek IwaĆczuk <biwanczuk@gmail.com> | 2019-11-14 04:16:57 +0100 |
---|---|---|
committer | Ry Dahl <ry@tinyclouds.org> | 2019-11-13 22:16:57 -0500 |
commit | fd62379eafde6571f126df5650b80cfda9f74229 (patch) | |
tree | 34579151043837aaae17b36179c0aa5cf6b5e5aa /cli/ops/tls.rs | |
parent | af448e864c4ac7e2ec601a25d46f95861ff5ade0 (diff) |
refactor: per-worker resource table (#3306)
- removes global `RESOURCE_TABLE` - resource tables are now created per `Worker`
in `State`
- renames `CliResource` to `StreamResource` and moves all logic related
to it to `cli/ops/io.rs`
- removes `cli/resources.rs`
- adds `state` argument to `op_read` and `op_write` and consequently adds
`stateful_minimal_op` to `State`
- IMPORTANT NOTE: workers don't have access to process stdio - this is
caused by fact that dropping worker would close stdout for process
(because it's constructed from raw handle, which closes underlying file
descriptor on drop)
Diffstat (limited to 'cli/ops/tls.rs')
-rw-r--r-- | cli/ops/tls.rs | 54 |
1 files changed, 32 insertions, 22 deletions
diff --git a/cli/ops/tls.rs b/cli/ops/tls.rs index 6e8348c91..48419f76f 100644 --- a/cli/ops/tls.rs +++ b/cli/ops/tls.rs @@ -1,13 +1,13 @@ // Copyright 2018-2019 the Deno authors. All rights reserved. MIT license. use super::dispatch_json::{Deserialize, JsonOp, Value}; +use super::io::StreamResource; use crate::deno_error::bad_resource; use crate::deno_error::DenoError; use crate::deno_error::ErrorKind; use crate::ops::json_op; use crate::resolve_addr::resolve_addr; -use crate::resources; -use crate::resources::Resource; use crate::state::ThreadSafeState; +use deno::Resource; use deno::*; use futures::Async; use futures::Future; @@ -60,7 +60,7 @@ pub fn op_dial_tls( ) -> Result<JsonOp, ErrBox> { let args: DialTLSArgs = serde_json::from_value(args)?; let cert_file = args.cert_file; - + let state_ = state.clone(); state.check_net(&args.hostname, args.port)?; if let Some(path) = cert_file.clone() { state.check_read(&path)?; @@ -99,7 +99,11 @@ pub fn op_dial_tls( .connect(dnsname, tcp_stream) .map_err(ErrBox::from) .and_then(move |tls_stream| { - let rid = resources::add_tls_stream(tls_stream); + let mut table = state_.lock_resource_table(); + let rid = table.add( + "clientTlsStream", + Box::new(StreamResource::ClientTlsStream(Box::new(tls_stream))), + ); futures::future::ok(json!({ "rid": rid, "localAddr": local_addr.to_string(), @@ -265,7 +269,7 @@ fn op_listen_tls( task: None, local_addr, }; - let mut table = resources::lock_resource_table(); + let mut table = state.lock_resource_table(); let rid = table.add("tlsListener", Box::new(tls_listener_resource)); Ok(JsonOp::Sync(json!({ @@ -282,18 +286,19 @@ enum AcceptTlsState { } /// Simply accepts a TLS connection. -pub fn accept_tls(rid: ResourceId) -> AcceptTls { +pub fn accept_tls(state: &ThreadSafeState, rid: ResourceId) -> AcceptTls { AcceptTls { - state: AcceptTlsState::Eager, + accept_state: AcceptTlsState::Eager, rid, + state: state.clone(), } } /// A future representing state of accepting a TLS connection. -#[derive(Debug)] pub struct AcceptTls { - state: AcceptTlsState, + accept_state: AcceptTlsState, rid: ResourceId, + state: ThreadSafeState, } impl Future for AcceptTls { @@ -301,11 +306,11 @@ impl Future for AcceptTls { type Error = ErrBox; fn poll(&mut self) -> Poll<Self::Item, Self::Error> { - if self.state == AcceptTlsState::Done { + if self.accept_state == AcceptTlsState::Done { panic!("poll AcceptTls after it's done"); } - let mut table = resources::lock_resource_table(); + let mut table = self.state.lock_resource_table(); let listener_resource = table .get_mut::<TlsListenerResource>(self.rid) .ok_or_else(|| { @@ -318,22 +323,22 @@ impl Future for AcceptTls { let listener = &mut listener_resource.listener; - if self.state == AcceptTlsState::Eager { + if self.accept_state == AcceptTlsState::Eager { // Similar to try_ready!, but also track/untrack accept task // in TcpListener resource. // In this way, when the listener is closed, the task can be // notified to error out (instead of stuck forever). match listener.poll_accept().map_err(ErrBox::from) { Ok(Async::Ready((stream, addr))) => { - self.state = AcceptTlsState::Done; + self.accept_state = AcceptTlsState::Done; return Ok((stream, addr).into()); } Ok(Async::NotReady) => { - self.state = AcceptTlsState::Pending; + self.accept_state = AcceptTlsState::Pending; return Ok(Async::NotReady); } Err(e) => { - self.state = AcceptTlsState::Done; + self.accept_state = AcceptTlsState::Done; return Err(e); } } @@ -342,7 +347,7 @@ impl Future for AcceptTls { match listener.poll_accept().map_err(ErrBox::from) { Ok(Async::Ready((stream, addr))) => { listener_resource.untrack_task(); - self.state = AcceptTlsState::Done; + self.accept_state = AcceptTlsState::Done; Ok((stream, addr).into()) } Ok(Async::NotReady) => { @@ -351,7 +356,7 @@ impl Future for AcceptTls { } Err(e) => { listener_resource.untrack_task(); - self.state = AcceptTlsState::Done; + self.accept_state = AcceptTlsState::Done; Err(e) } } @@ -364,21 +369,22 @@ struct AcceptTlsArgs { } fn op_accept_tls( - _state: &ThreadSafeState, + state: &ThreadSafeState, args: Value, _zero_copy: Option<PinnedBuf>, ) -> Result<JsonOp, ErrBox> { let args: AcceptTlsArgs = serde_json::from_value(args)?; let rid = args.rid as u32; - - let op = accept_tls(rid) + let state1 = state.clone(); + let state2 = state.clone(); + let op = accept_tls(state, rid) .and_then(move |(tcp_stream, _socket_addr)| { let local_addr = tcp_stream.local_addr()?; let remote_addr = tcp_stream.peer_addr()?; Ok((tcp_stream, local_addr, remote_addr)) }) .and_then(move |(tcp_stream, local_addr, remote_addr)| { - let table = resources::lock_resource_table(); + let table = state1.lock_resource_table(); let resource = table .get::<TlsListenerResource>(rid) .ok_or_else(bad_resource) @@ -389,7 +395,11 @@ fn op_accept_tls( .accept(tcp_stream) .map_err(ErrBox::from) .and_then(move |tls_stream| { - let rid = resources::add_server_tls_stream(tls_stream); + let mut table = state2.lock_resource_table(); + let rid = table.add( + "serverTlsStream", + Box::new(StreamResource::ServerTlsStream(Box::new(tls_stream))), + ); Ok((rid, local_addr, remote_addr)) }) }) |