diff options
-rw-r--r-- | cli/compilers/ts.rs | 2 | ||||
-rw-r--r-- | cli/ops/files.rs | 5 | ||||
-rw-r--r-- | cli/ops/process.rs | 2 | ||||
-rw-r--r-- | cli/ops/repl.rs | 26 | ||||
-rw-r--r-- | cli/ops/resources.rs | 5 | ||||
-rw-r--r-- | cli/ops/workers.rs | 2 | ||||
-rw-r--r-- | cli/resources.rs | 387 | ||||
-rw-r--r-- | cli/worker.rs | 15 | ||||
-rw-r--r-- | core/examples/http_bench.rs | 29 | ||||
-rw-r--r-- | core/resources.rs | 58 |
10 files changed, 261 insertions, 270 deletions
diff --git a/cli/compilers/ts.rs b/cli/compilers/ts.rs index 9cbaaae09..e38920820 100644 --- a/cli/compilers/ts.rs +++ b/cli/compilers/ts.rs @@ -274,6 +274,7 @@ impl TsCompiler { let compiler_rid = resource.rid; let first_msg_fut = resources::post_message_to_worker(compiler_rid, req_msg) + .expect("Bad compiler rid") .then(move |_| worker) .then(move |result| { if let Err(err) = result { @@ -382,6 +383,7 @@ impl TsCompiler { let compiler_rid = resource.rid; let first_msg_fut = resources::post_message_to_worker(compiler_rid, req_msg) + .expect("Bad compiler rid") .then(move |_| worker) .then(move |result| { if let Err(err) = result { diff --git a/cli/ops/files.rs b/cli/ops/files.rs index 0432acd82..c1e43ff95 100644 --- a/cli/ops/files.rs +++ b/cli/ops/files.rs @@ -1,5 +1,6 @@ // Copyright 2018-2019 the Deno authors. All rights reserved. MIT license. use super::dispatch_json::{Deserialize, JsonOp, Value}; +use crate::deno_error::bad_resource; use crate::fs as deno_fs; use crate::ops::json_op; use crate::resources; @@ -110,8 +111,8 @@ fn op_close( ) -> Result<JsonOp, ErrBox> { let args: CloseArgs = serde_json::from_value(args)?; - let resource = resources::lookup(args.rid as u32)?; - resource.close(); + let mut table = resources::lock_resource_table(); + table.close(args.rid as u32).ok_or_else(bad_resource)?; Ok(JsonOp::Sync(json!({}))) } diff --git a/cli/ops/process.rs b/cli/ops/process.rs index 5b59aa030..c6dd2f2d3 100644 --- a/cli/ops/process.rs +++ b/cli/ops/process.rs @@ -99,7 +99,7 @@ fn op_run( let resources = resources::add_child(child); Ok(JsonOp::Sync(json!({ - "rid": resources.child_rid, + "rid": resources.child_rid.unwrap(), "pid": pid, "stdinRid": resources.stdin_rid, "stdoutRid": resources.stdout_rid, diff --git a/cli/ops/repl.rs b/cli/ops/repl.rs index 886e57483..6644ab159 100644 --- a/cli/ops/repl.rs +++ b/cli/ops/repl.rs @@ -1,10 +1,15 @@ // Copyright 2018-2019 the Deno authors. All rights reserved. MIT license. use super::dispatch_json::{blocking_json, Deserialize, JsonOp, Value}; +use crate::deno_error::bad_resource; use crate::ops::json_op; use crate::repl; +use crate::repl::Repl; use crate::resources; +use crate::resources::CoreResource; use crate::state::ThreadSafeState; use deno::*; +use std::sync::Arc; +use std::sync::Mutex; pub fn init(i: &mut Isolate, s: &ThreadSafeState) { i.register_op( @@ -17,6 +22,14 @@ pub fn init(i: &mut Isolate, s: &ThreadSafeState) { ); } +struct ReplResource(Arc<Mutex<Repl>>); + +impl CoreResource for ReplResource { + fn inspect_repr(&self) -> &str { + "repl" + } +} + #[derive(Deserialize)] #[serde(rename_all = "camelCase")] struct ReplStartArgs { @@ -33,9 +46,10 @@ fn op_repl_start( debug!("op_repl_start {}", args.history_file); let history_path = repl::history_path(&state.dir, &args.history_file); let repl = repl::Repl::new(history_path); - let resource = resources::add_repl(repl); - - Ok(JsonOp::Sync(json!(resource.rid))) + let resource = ReplResource(Arc::new(Mutex::new(repl))); + let mut table = resources::lock_resource_table(); + let rid = table.add(Box::new(resource)); + Ok(JsonOp::Sync(json!(rid))) } #[derive(Deserialize)] @@ -50,12 +64,14 @@ fn op_repl_readline( _zero_copy: Option<PinnedBuf>, ) -> Result<JsonOp, ErrBox> { let args: ReplReadlineArgs = serde_json::from_value(args)?; - let rid = args.rid; + let rid = args.rid as u32; let prompt = args.prompt; debug!("op_repl_readline {} {}", rid, prompt); blocking_json(false, move || { - let repl = resources::get_repl(rid as u32)?; + let table = resources::lock_resource_table(); + let resource = table.get::<ReplResource>(rid).ok_or_else(bad_resource)?; + let repl = resource.0.clone(); let line = repl.lock().unwrap().readline(&prompt)?; Ok(json!(line)) }) diff --git a/cli/ops/resources.rs b/cli/ops/resources.rs index 8397e9563..d92c6a83c 100644 --- a/cli/ops/resources.rs +++ b/cli/ops/resources.rs @@ -1,7 +1,7 @@ // Copyright 2018-2019 the Deno authors. All rights reserved. MIT license. use super::dispatch_json::{JsonOp, Value}; use crate::ops::json_op; -use crate::resources::table_entries; +use crate::resources::lock_resource_table; use crate::state::ThreadSafeState; use deno::*; @@ -14,6 +14,7 @@ fn op_resources( _args: Value, _zero_copy: Option<PinnedBuf>, ) -> Result<JsonOp, ErrBox> { - let serialized_resources = table_entries(); + let resource_table = lock_resource_table(); + let serialized_resources = resource_table.entries(); Ok(JsonOp::Sync(json!(serialized_resources))) } diff --git a/cli/ops/workers.rs b/cli/ops/workers.rs index 670ca6b47..809aa5d9b 100644 --- a/cli/ops/workers.rs +++ b/cli/ops/workers.rs @@ -250,7 +250,7 @@ fn op_host_post_message( let d = Vec::from(data.unwrap().as_ref()).into_boxed_slice(); - resources::post_message_to_worker(rid, d) + resources::post_message_to_worker(rid, d)? .wait() .map_err(|e| DenoError::new(ErrorKind::Other, e.to_string()))?; diff --git a/cli/resources.rs b/cli/resources.rs index 1c791191d..4df4e1798 100644 --- a/cli/resources.rs +++ b/cli/resources.rs @@ -11,11 +11,13 @@ use crate::deno_error; use crate::deno_error::bad_resource; use crate::http_body::HttpBody; -use crate::repl::Repl; use crate::state::WorkerChannels; use deno::Buf; use deno::ErrBox; +pub use deno::Resource as CoreResource; +pub use deno::ResourceId; +use deno::ResourceTable; use futures; use futures::Future; @@ -24,13 +26,11 @@ use futures::Sink; use futures::Stream; use reqwest::r#async::Decoder as ReqwestDecoder; use std; -use std::collections::BTreeMap; use std::io::{Error, Read, Seek, SeekFrom, Write}; use std::net::{Shutdown, SocketAddr}; use std::process::ExitStatus; -use std::sync::atomic::AtomicUsize; -use std::sync::atomic::Ordering; -use std::sync::{Arc, Mutex}; +use std::sync::Mutex; +use std::sync::MutexGuard; use tokio; use tokio::io::{AsyncRead, AsyncWrite}; use tokio::net::TcpStream; @@ -40,12 +40,6 @@ use tokio_rustls::client::TlsStream as ClientTlsStream; use tokio_rustls::server::TlsStream as ServerTlsStream; use tokio_rustls::TlsAcceptor; -pub type ResourceId = u32; // Sometimes referred to RID. - -// These store Deno's file descriptors. These are not necessarily the operating -// system ones. -type ResourceTable = BTreeMap<ResourceId, Repr>; - #[cfg(not(windows))] use std::os::unix::io::FromRawFd; @@ -57,14 +51,13 @@ use std::os::windows::io::FromRawHandle; extern crate winapi; lazy_static! { - // Starts at 3 because stdio is [0-2]. - static ref NEXT_RID: AtomicUsize = AtomicUsize::new(3); static ref RESOURCE_TABLE: Mutex<ResourceTable> = Mutex::new({ - let mut m = BTreeMap::new(); + let mut table = ResourceTable::default(); + // TODO Load these lazily during lookup? - m.insert(0, Repr::Stdin(tokio::io::stdin())); + table.add(Box::new(CliResource::Stdin(tokio::io::stdin()))); - m.insert(1, Repr::Stdout({ + table.add(Box::new(CliResource::Stdout({ #[cfg(not(windows))] let stdout = unsafe { std::fs::File::from_raw_fd(1) }; #[cfg(windows)] @@ -73,15 +66,15 @@ lazy_static! { winapi::um::winbase::STD_OUTPUT_HANDLE)) }; tokio::fs::File::from_std(stdout) - })); + }))); - m.insert(2, Repr::Stderr(tokio::io::stderr())); - m + table.add(Box::new(CliResource::Stderr(tokio::io::stderr()))); + table }); } -// Internal representation of Resource. -enum Repr { +// TODO: move listeners out of this enum and rename to `StreamResource` +enum CliResource { Stdin(tokio::io::Stdin), Stdout(tokio::fs::File), Stderr(tokio::io::Stderr), @@ -101,7 +94,6 @@ enum Repr { ServerTlsStream(Box<ServerTlsStream<TcpStream>>), ClientTlsStream(Box<ClientTlsStream<TcpStream>>), HttpBody(HttpBody), - Repl(Arc<Mutex<Repl>>), // Enum size is bounded by the largest variant. // Use `Box` around large `Child` struct. // https://rust-lang.github.io/rust-clippy/master/index.html#large_enum_variant @@ -112,52 +104,42 @@ enum Repr { Worker(WorkerChannels), } -/// If the given rid is open, this returns the type of resource, E.G. "worker". -/// If the rid is closed or was never open, it returns None. -pub fn get_type(rid: ResourceId) -> Option<String> { - let table = RESOURCE_TABLE.lock().unwrap(); - table.get(&rid).map(inspect_repr) -} - -pub fn table_entries() -> Vec<(u32, String)> { - let table = RESOURCE_TABLE.lock().unwrap(); - - table - .iter() - .map(|(key, value)| (*key, inspect_repr(&value))) - .collect() -} - -#[test] -fn test_table_entries() { - let mut entries = table_entries(); - entries.sort(); - assert_eq!(entries[0], (0, String::from("stdin"))); - assert_eq!(entries[1], (1, String::from("stdout"))); - assert_eq!(entries[2], (2, String::from("stderr"))); -} - -fn inspect_repr(repr: &Repr) -> String { - let h_repr = match repr { - Repr::Stdin(_) => "stdin", - Repr::Stdout(_) => "stdout", - Repr::Stderr(_) => "stderr", - Repr::FsFile(_) => "fsFile", - Repr::TcpListener(_, _) => "tcpListener", - Repr::TlsListener(_, _, _) => "tlsListener", - Repr::TcpStream(_) => "tcpStream", - Repr::ClientTlsStream(_) => "clientTlsStream", - Repr::ServerTlsStream(_) => "serverTlsStream", - Repr::HttpBody(_) => "httpBody", - Repr::Repl(_) => "repl", - Repr::Child(_) => "child", - Repr::ChildStdin(_) => "childStdin", - Repr::ChildStdout(_) => "childStdout", - Repr::ChildStderr(_) => "childStderr", - Repr::Worker(_) => "worker", - }; +impl CoreResource for CliResource { + fn close(&self) { + match self { + CliResource::TcpListener(_, Some(t)) => { + t.notify(); + } + CliResource::TlsListener(_, _, Some(t)) => { + t.notify(); + } + _ => {} + } + } - String::from(h_repr) + fn inspect_repr(&self) -> &str { + match self { + CliResource::Stdin(_) => "stdin", + CliResource::Stdout(_) => "stdout", + CliResource::Stderr(_) => "stderr", + CliResource::FsFile(_) => "fsFile", + CliResource::TcpListener(_, _) => "tcpListener", + CliResource::TlsListener(_, _, _) => "tlsListener", + CliResource::TcpStream(_) => "tcpStream", + CliResource::ClientTlsStream(_) => "clientTlsStream", + CliResource::ServerTlsStream(_) => "serverTlsStream", + CliResource::HttpBody(_) => "httpBody", + CliResource::Child(_) => "child", + CliResource::ChildStdin(_) => "childStdin", + CliResource::ChildStdout(_) => "childStdout", + CliResource::ChildStderr(_) => "childStderr", + CliResource::Worker(_) => "worker", + } + } +} + +pub fn lock_resource_table<'a>() -> MutexGuard<'a, ResourceTable> { + RESOURCE_TABLE.lock().unwrap() } // Abstract async file interface. @@ -170,16 +152,15 @@ pub struct Resource { impl Resource { // TODO Should it return a Resource instead of net::TcpStream? pub fn poll_accept(&mut self) -> Poll<(TcpStream, SocketAddr), Error> { - let mut table = RESOURCE_TABLE.lock().unwrap(); - let maybe_repr = table.get_mut(&self.rid); - match maybe_repr { + let mut table = lock_resource_table(); + match table.get_mut::<CliResource>(self.rid) { None => Err(std::io::Error::new( std::io::ErrorKind::Other, "Listener has been closed", )), Some(repr) => match repr { - Repr::TcpListener(ref mut s, _) => s.poll_accept(), - Repr::TlsListener(ref mut s, _, _) => s.poll_accept(), + CliResource::TcpListener(ref mut s, _) => s.poll_accept(), + CliResource::TlsListener(ref mut s, _, _) => s.poll_accept(), _ => panic!("Cannot accept"), }, } @@ -189,15 +170,14 @@ impl Resource { &mut self, tcp_stream: TcpStream, ) -> impl Future<Item = ServerTlsStream<TcpStream>, Error = Error> { - let mut table = RESOURCE_TABLE.lock().unwrap(); - let maybe_repr = table.get_mut(&self.rid); - match maybe_repr { + let mut table = lock_resource_table(); + match table.get_mut::<CliResource>(self.rid) { None => Either::A(futures::future::err(std::io::Error::new( std::io::ErrorKind::Other, "Listener has been closed", ))), Some(repr) => match repr { - Repr::TlsListener(_, ref mut acceptor, _) => { + CliResource::TlsListener(_, ref mut acceptor, _) => { Either::B(acceptor.accept(tcp_stream)) } _ => panic!("Cannot accept"), @@ -208,9 +188,11 @@ impl Resource { /// Track the current task (for TcpListener resource). /// Throws an error if another task is already tracked. pub fn track_task(&mut self) -> Result<(), std::io::Error> { - let mut table = RESOURCE_TABLE.lock().unwrap(); + let mut table = lock_resource_table(); // Only track if is TcpListener. - if let Some(Repr::TcpListener(_, t)) = table.get_mut(&self.rid) { + if let Some(CliResource::TcpListener(_, t)) = + table.get_mut::<CliResource>(self.rid) + { // Currently, we only allow tracking a single accept task for a listener. // This might be changed in the future with multiple workers. // Caveat: TcpListener by itself also only tracks an accept task at a time. @@ -229,9 +211,11 @@ impl Resource { /// Stop tracking a task (for TcpListener resource). /// Happens when the task is done and thus no further tracking is needed. pub fn untrack_task(&mut self) { - let mut table = RESOURCE_TABLE.lock().unwrap(); + let mut table = lock_resource_table(); // Only untrack if is TcpListener. - if let Some(Repr::TcpListener(_, t)) = table.get_mut(&self.rid) { + if let Some(CliResource::TcpListener(_, t)) = + table.get_mut::<CliResource>(self.rid) + { if t.is_some() { t.take(); } @@ -241,21 +225,18 @@ impl Resource { // close(2) is done by dropping the value. Therefore we just need to remove // the resource from the RESOURCE_TABLE. pub fn close(&self) { - let mut table = RESOURCE_TABLE.lock().unwrap(); - let r = table.remove(&self.rid).unwrap(); - // If TcpListener, we must kill all pending accepts! - if let Repr::TcpListener(_, Some(t)) = r { - // Call notify on the tracked task, so that they would error out. - t.notify(); - } + let mut table = lock_resource_table(); + table.close(self.rid).unwrap(); } pub fn shutdown(&mut self, how: Shutdown) -> Result<(), ErrBox> { - let mut table = RESOURCE_TABLE.lock().unwrap(); - let repr = table.get_mut(&self.rid).ok_or_else(bad_resource)?; + let mut table = lock_resource_table(); + let repr = table + .get_mut::<CliResource>(self.rid) + .ok_or_else(bad_resource)?; match repr { - Repr::TcpStream(ref mut f) => { + CliResource::TcpStream(ref mut f) => { TcpStream::shutdown(f, how).map_err(ErrBox::from) } _ => Err(bad_resource()), @@ -277,18 +258,18 @@ pub trait DenoAsyncRead { impl DenoAsyncRead for Resource { fn poll_read(&mut self, buf: &mut [u8]) -> Poll<usize, ErrBox> { - let mut table = RESOURCE_TABLE.lock().unwrap(); - let repr = table.get_mut(&self.rid).ok_or_else(bad_resource)?; + let mut table = lock_resource_table(); + let repr = table.get_mut(self.rid).ok_or_else(bad_resource)?; let r = match repr { - Repr::FsFile(ref mut f) => f.poll_read(buf), - Repr::Stdin(ref mut f) => f.poll_read(buf), - Repr::TcpStream(ref mut f) => f.poll_read(buf), - Repr::ClientTlsStream(ref mut f) => f.poll_read(buf), - Repr::ServerTlsStream(ref mut f) => f.poll_read(buf), - Repr::HttpBody(ref mut f) => f.poll_read(buf), - Repr::ChildStdout(ref mut f) => f.poll_read(buf), - Repr::ChildStderr(ref mut f) => f.poll_read(buf), + CliResource::FsFile(ref mut f) => f.poll_read(buf), + CliResource::Stdin(ref mut f) => f.poll_read(buf), + CliResource::TcpStream(ref mut f) => f.poll_read(buf), + CliResource::ClientTlsStream(ref mut f) => f.poll_read(buf), + CliResource::ServerTlsStream(ref mut f) => f.poll_read(buf), + CliResource::HttpBody(ref mut f) => f.poll_read(buf), + CliResource::ChildStdout(ref mut f) => f.poll_read(buf), + CliResource::ChildStderr(ref mut f) => f.poll_read(buf), _ => { return Err(bad_resource()); } @@ -318,17 +299,19 @@ pub trait DenoAsyncWrite { impl DenoAsyncWrite for Resource { fn poll_write(&mut self, buf: &[u8]) -> Poll<usize, ErrBox> { - let mut table = RESOURCE_TABLE.lock().unwrap(); - let repr = table.get_mut(&self.rid).ok_or_else(bad_resource)?; + let mut table = lock_resource_table(); + let repr = table + .get_mut::<CliResource>(self.rid) + .ok_or_else(bad_resource)?; let r = match repr { - Repr::FsFile(ref mut f) => f.poll_write(buf), - Repr::Stdout(ref mut f) => f.poll_write(buf), - Repr::Stderr(ref mut f) => f.poll_write(buf), - Repr::TcpStream(ref mut f) => f.poll_write(buf), - Repr::ClientTlsStream(ref mut f) => f.poll_write(buf), - Repr::ServerTlsStream(ref mut f) => f.poll_write(buf), - Repr::ChildStdin(ref mut f) => f.poll_write(buf), + CliResource::FsFile(ref mut f) => f.poll_write(buf), + CliResource::Stdout(ref mut f) => f.poll_write(buf), + CliResource::Stderr(ref mut f) => f.poll_write(buf), + CliResource::TcpStream(ref mut f) => f.poll_write(buf), + CliResource::ClientTlsStream(ref mut f) => f.poll_write(buf), + CliResource::ServerTlsStream(ref mut f) => f.poll_write(buf), + CliResource::ChildStdin(ref mut f) => f.poll_write(buf), _ => { return Err(bad_resource()); } @@ -342,24 +325,15 @@ impl DenoAsyncWrite for Resource { } } -fn new_rid() -> ResourceId { - let next_rid = NEXT_RID.fetch_add(1, Ordering::SeqCst); - next_rid as ResourceId -} - pub fn add_fs_file(fs_file: tokio::fs::File) -> Resource { - let rid = new_rid(); - let mut tg = RESOURCE_TABLE.lock().unwrap(); - let r = tg.insert(rid, Repr::FsFile(fs_file)); - assert!(r.is_none()); + let mut table = lock_resource_table(); + let rid = table.add(Box::new(CliResource::FsFile(fs_file))); Resource { rid } } pub fn add_tcp_listener(listener: tokio::net::TcpListener) -> Resource { - let rid = new_rid(); - let mut tg = RESOURCE_TABLE.lock().unwrap(); - let r = tg.insert(rid, Repr::TcpListener(listener, None)); - assert!(r.is_none()); + let mut table = lock_resource_table(); + let rid = table.add(Box::new(CliResource::TcpListener(listener, None))); Resource { rid } } @@ -367,59 +341,40 @@ pub fn add_tls_listener( listener: tokio::net::TcpListener, acceptor: TlsAcceptor, ) -> Resource { - let rid = new_rid(); - let mut tg = RESOURCE_TABLE.lock().unwrap(); - let r = tg.insert(rid, Repr::TlsListener(listener, acceptor, None)); - assert!(r.is_none()); + let mut table = lock_resource_table(); + let rid = + table.add(Box::new(CliResource::TlsListener(listener, acceptor, None))); Resource { rid } } pub fn add_tcp_stream(stream: tokio::net::TcpStream) -> Resource { - let rid = new_rid(); - let mut tg = RESOURCE_TABLE.lock().unwrap(); - let r = tg.insert(rid, Repr::TcpStream(stream)); - assert!(r.is_none()); + let mut table = lock_resource_table(); + let rid = table.add(Box::new(CliResource::TcpStream(stream))); Resource { rid } } pub fn add_tls_stream(stream: ClientTlsStream<TcpStream>) -> Resource { - let rid = new_rid(); - let mut tg = RESOURCE_TABLE.lock().unwrap(); - let r = tg.insert(rid, Repr::ClientTlsStream(Box::new(stream))); - assert!(r.is_none()); + let mut table = lock_resource_table(); + let rid = table.add(Box::new(CliResource::ClientTlsStream(Box::new(stream)))); Resource { rid } } pub fn add_server_tls_stream(stream: ServerTlsStream<TcpStream>) -> Resource { - let rid = new_rid(); - let mut tg = RESOURCE_TABLE.lock().unwrap(); - let r = tg.insert(rid, Repr::ServerTlsStream(Box::new(stream))); - assert!(r.is_none()); + let mut table = lock_resource_table(); + let rid = table.add(Box::new(CliResource::ServerTlsStream(Box::new(stream)))); Resource { rid } } pub fn add_reqwest_body(body: ReqwestDecoder) -> Resource { - let rid = new_rid(); - let mut tg = RESOURCE_TABLE.lock().unwrap(); let body = HttpBody::from(body); - let r = tg.insert(rid, Repr::HttpBody(body)); - assert!(r.is_none()); - Resource { rid } -} - -pub fn add_repl(repl: Repl) -> Resource { - let rid = new_rid(); - let mut tg = RESOURCE_TABLE.lock().unwrap(); - let r = tg.insert(rid, Repr::Repl(Arc::new(Mutex::new(repl)))); - assert!(r.is_none()); + let mut table = lock_resource_table(); + let rid = table.add(Box::new(CliResource::HttpBody(body))); Resource { rid } } pub fn add_worker(wc: WorkerChannels) -> Resource { - let rid = new_rid(); - let mut tg = RESOURCE_TABLE.lock().unwrap(); - let r = tg.insert(rid, Repr::Worker(wc)); - assert!(r.is_none()); + let mut table = lock_resource_table(); + let rid = table.add(Box::new(CliResource::Worker(wc))); Resource { rid } } @@ -427,16 +382,15 @@ pub fn add_worker(wc: WorkerChannels) -> Resource { pub fn post_message_to_worker( rid: ResourceId, buf: Buf, -) -> futures::sink::Send<mpsc::Sender<Buf>> { - let mut table = RESOURCE_TABLE.lock().unwrap(); - let maybe_repr = table.get_mut(&rid); - match maybe_repr { - Some(Repr::Worker(ref mut wc)) => { - // unwrap here is incorrect, but doing it anyway - wc.0.clone().send(buf) +) -> Result<futures::sink::Send<mpsc::Sender<Buf>>, ErrBox> { + let mut table = lock_resource_table(); + let repr = table.get_mut::<CliResource>(rid).ok_or_else(bad_resource)?; + match repr { + CliResource::Worker(ref mut wc) => { + let sender = wc.0.clone(); + Ok(sender.send(buf)) } - // TODO: replace this panic with `bad_resource` - _ => panic!("bad resource"), // futures::future::err(bad_resource()).into(), + _ => Err(bad_resource()), } } @@ -450,10 +404,12 @@ impl Future for WorkerReceiver { type Error = ErrBox; fn poll(&mut self) -> Poll<Option<Buf>, ErrBox> { - let mut table = RESOURCE_TABLE.lock().unwrap(); - let maybe_repr = table.get_mut(&self.rid); - match maybe_repr { - Some(Repr::Worker(ref mut wc)) => wc.1.poll().map_err(ErrBox::from), + let mut table = lock_resource_table(); + let repr = table + .get_mut::<CliResource>(self.rid) + .ok_or_else(bad_resource)?; + match repr { + CliResource::Worker(ref mut wc) => wc.1.poll().map_err(ErrBox::from), _ => Err(bad_resource()), } } @@ -473,10 +429,12 @@ impl Stream for WorkerReceiverStream { type Error = ErrBox; fn poll(&mut self) -> Poll<Option<Buf>, ErrBox> { - let mut table = RESOURCE_TABLE.lock().unwrap(); - let maybe_repr = table.get_mut(&self.rid); - match maybe_repr { - Some(Repr::Worker(ref mut wc)) => wc.1.poll().map_err(ErrBox::from), + let mut table = lock_resource_table(); + let repr = table + .get_mut::<CliResource>(self.rid) + .ok_or_else(bad_resource)?; + match repr { + CliResource::Worker(ref mut wc) => wc.1.poll().map_err(ErrBox::from), _ => Err(bad_resource()), } } @@ -487,47 +445,40 @@ pub fn get_message_stream_from_worker(rid: ResourceId) -> WorkerReceiverStream { } pub struct ChildResources { - pub child_rid: ResourceId, + pub child_rid: Option<ResourceId>, pub stdin_rid: Option<ResourceId>, pub stdout_rid: Option<ResourceId>, pub stderr_rid: Option<ResourceId>, } -pub fn add_child(mut c: tokio_process::Child) -> ChildResources { - let child_rid = new_rid(); - let mut tg = RESOURCE_TABLE.lock().unwrap(); +pub fn add_child(mut child: tokio_process::Child) -> ChildResources { + let mut table = lock_resource_table(); let mut resources = ChildResources { - child_rid, + child_rid: None, stdin_rid: None, stdout_rid: None, stderr_rid: None, }; - if c.stdin().is_some() { - let stdin = c.stdin().take().unwrap(); - let rid = new_rid(); - let r = tg.insert(rid, Repr::ChildStdin(stdin)); - assert!(r.is_none()); + if child.stdin().is_some() { + let stdin = child.stdin().take().unwrap(); + let rid = table.add(Box::new(CliResource::ChildStdin(stdin))); resources.stdin_rid = Some(rid); } - if c.stdout().is_some() { - let stdout = c.stdout().take().unwrap(); - let rid = new_rid(); - let r = tg.insert(rid, Repr::ChildStdout(stdout)); - assert!(r.is_none()); + if child.stdout().is_some() { + let stdout = child.stdout().take().unwrap(); + let rid = table.add(Box::new(CliResource::ChildStdout(stdout))); resources.stdout_rid = Some(rid); } - if c.stderr().is_some() { - let stderr = c.stderr().take().unwrap(); - let rid = new_rid(); - let r = tg.insert(rid, Repr::ChildStderr(stderr)); - assert!(r.is_none()); + if child.stderr().is_some() { + let stderr = child.stderr().take().unwrap(); + let rid = table.add(Box::new(CliResource::ChildStderr(stderr))); resources.stderr_rid = Some(rid); } - let r = tg.insert(child_rid, Repr::Child(Box::new(c))); - assert!(r.is_none()); + let rid = table.add(Box::new(CliResource::Child(Box::new(child)))); + resources.child_rid = Some(rid); resources } @@ -542,29 +493,23 @@ impl Future for ChildStatus { type Error = ErrBox; fn poll(&mut self) -> Poll<ExitStatus, ErrBox> { - let mut table = RESOURCE_TABLE.lock().unwrap(); - let maybe_repr = table.get_mut(&self.rid); - match maybe_repr { - Some(Repr::Child(ref mut child)) => child.poll().map_err(ErrBox::from), + let mut table = lock_resource_table(); + let repr = table + .get_mut::<CliResource>(self.rid) + .ok_or_else(bad_resource)?; + match repr { + CliResource::Child(ref mut child) => child.poll().map_err(ErrBox::from), _ => Err(bad_resource()), } } } pub fn child_status(rid: ResourceId) -> Result<ChildStatus, ErrBox> { - let mut table = RESOURCE_TABLE.lock().unwrap(); - let maybe_repr = table.get_mut(&rid); + let mut table = lock_resource_table(); + let maybe_repr = + table.get_mut::<CliResource>(rid).ok_or_else(bad_resource)?; match maybe_repr { - Some(Repr::Child(ref mut _child)) => Ok(ChildStatus { rid }), - _ => Err(bad_resource()), - } -} - -pub fn get_repl(rid: ResourceId) -> Result<Arc<Mutex<Repl>>, ErrBox> { - let mut table = RESOURCE_TABLE.lock().unwrap(); - let maybe_repr = table.get_mut(&rid); - match maybe_repr { - Some(Repr::Repl(ref mut r)) => Ok(r.clone()), + CliResource::Child(ref mut _child) => Ok(ChildStatus { rid }), _ => Err(bad_resource()), } } @@ -572,13 +517,16 @@ pub fn get_repl(rid: ResourceId) -> Result<Arc<Mutex<Repl>>, ErrBox> { // TODO: revamp this after the following lands: // https://github.com/tokio-rs/tokio/pull/785 pub fn get_file(rid: ResourceId) -> Result<std::fs::File, ErrBox> { - let mut table = RESOURCE_TABLE.lock().unwrap(); + let mut table = lock_resource_table(); // We take ownership of File here. // It is put back below while still holding the lock. - let maybe_repr = table.remove(&rid); + let repr = table.map.remove(&rid).ok_or_else(bad_resource)?; + let repr = repr + .downcast::<CliResource>() + .or_else(|_| Err(bad_resource()))?; - match maybe_repr { - Some(Repr::FsFile(r)) => { + match *repr { + CliResource::FsFile(r) => { // Trait Clone not implemented on tokio::fs::File, // so convert to std File first. let std_file = r.into_std(); @@ -590,7 +538,10 @@ pub fn get_file(rid: ResourceId) -> Result<std::fs::File, ErrBox> { // to write back. let maybe_std_file_copy = std_file.try_clone(); // Insert the entry back with the same rid. - table.insert(rid, Repr::FsFile(tokio_fs::File::from_std(std_file))); + table.map.insert( + rid, + Box::new(CliResource::FsFile(tokio_fs::File::from_std(std_file))), + ); maybe_std_file_copy.map_err(ErrBox::from) } @@ -600,11 +551,9 @@ pub fn get_file(rid: ResourceId) -> Result<std::fs::File, ErrBox> { pub fn lookup(rid: ResourceId) -> Result<Resource, ErrBox> { debug!("resource lookup {}", rid); - let table = RESOURCE_TABLE.lock().unwrap(); - table - .get(&rid) - .ok_or_else(bad_resource) - .map(|_| Resource { rid }) + let table = lock_resource_table(); + let _ = table.get::<CliResource>(rid).ok_or_else(bad_resource)?; + Ok(Resource { rid }) } pub fn seek( diff --git a/cli/worker.rs b/cli/worker.rs index 1091164c7..4bb68fb95 100644 --- a/cli/worker.rs +++ b/cli/worker.rs @@ -298,7 +298,9 @@ mod tests { let msg = json!("hi").to_string().into_boxed_str().into_boxed_bytes(); - let r = resources::post_message_to_worker(resource.rid, msg).wait(); + let r = resources::post_message_to_worker(resource.rid, msg) + .expect("Bad resource") + .wait(); assert!(r.is_ok()); let maybe_msg = resources::get_message_from_worker(resource.rid) @@ -312,7 +314,9 @@ mod tests { .to_string() .into_boxed_str() .into_boxed_bytes(); - let r = resources::post_message_to_worker(resource.rid, msg).wait(); + let r = resources::post_message_to_worker(resource.rid, msg) + .expect("Bad resource") + .wait(); assert!(r.is_ok()); }) } @@ -340,15 +344,14 @@ mod tests { let worker_future_ = worker_future.clone(); tokio::spawn(lazy(move || worker_future_.then(|_| Ok(())))); - assert_eq!(resources::get_type(rid), Some("worker".to_string())); - let msg = json!("hi").to_string().into_boxed_str().into_boxed_bytes(); - let r = resources::post_message_to_worker(rid, msg).wait(); + let r = resources::post_message_to_worker(rid, msg) + .expect("Bad resource") + .wait(); assert!(r.is_ok()); debug!("rid {:?}", rid); worker_future.wait().unwrap(); - assert_eq!(resources::get_type(rid), None); }) } diff --git a/core/examples/http_bench.rs b/core/examples/http_bench.rs index a6cc6d548..764e04303 100644 --- a/core/examples/http_bench.rs +++ b/core/examples/http_bench.rs @@ -15,6 +15,8 @@ extern crate lazy_static; use deno::*; use futures::future::lazy; use std::env; +use std::io::Error; +use std::io::ErrorKind; use std::net::SocketAddr; use std::sync::Mutex; use std::sync::MutexGuard; @@ -182,13 +184,25 @@ fn main() { } } +pub fn bad_resource() -> Error { + Error::new(ErrorKind::NotFound, "bad resource id") +} + struct TcpListener(tokio::net::TcpListener); -impl Resource for TcpListener {} +impl Resource for TcpListener { + fn inspect_repr(&self) -> &str { + "tcpListener" + } +} struct TcpStream(tokio::net::TcpStream); -impl Resource for TcpStream {} +impl Resource for TcpStream { + fn inspect_repr(&self) -> &str { + "tcpStream" + } +} lazy_static! { static ref RESOURCE_TABLE: Mutex<ResourceTable> = @@ -204,7 +218,8 @@ fn op_accept(record: Record, _zero_copy_buf: Option<PinnedBuf>) -> Box<HttpOp> { debug!("accept {}", rid); let fut = futures::future::poll_fn(move || { let mut table = lock_resource_table(); - let listener = table.get_mut::<TcpListener>(rid)?; + let listener = + table.get_mut::<TcpListener>(rid).ok_or_else(bad_resource)?; listener.0.poll_accept() }) .and_then(move |(stream, addr)| { @@ -233,8 +248,8 @@ fn op_close(record: Record, _zero_copy_buf: Option<PinnedBuf>) -> Box<HttpOp> { let rid = record.arg as u32; let mut table = lock_resource_table(); let fut = match table.close(rid) { - Ok(_) => futures::future::ok(0), - Err(e) => futures::future::err(e), + Some(_) => futures::future::ok(0), + None => futures::future::err(bad_resource()), }; Box::new(fut) } @@ -245,7 +260,7 @@ fn op_read(record: Record, zero_copy_buf: Option<PinnedBuf>) -> Box<HttpOp> { let mut zero_copy_buf = zero_copy_buf.unwrap(); let fut = futures::future::poll_fn(move || { let mut table = lock_resource_table(); - let stream = table.get_mut::<TcpStream>(rid)?; + let stream = table.get_mut::<TcpStream>(rid).ok_or_else(bad_resource)?; stream.0.poll_read(&mut zero_copy_buf) }) .and_then(move |nread| { @@ -261,7 +276,7 @@ fn op_write(record: Record, zero_copy_buf: Option<PinnedBuf>) -> Box<HttpOp> { let zero_copy_buf = zero_copy_buf.unwrap(); let fut = futures::future::poll_fn(move || { let mut table = lock_resource_table(); - let stream = table.get_mut::<TcpStream>(rid)?; + let stream = table.get_mut::<TcpStream>(rid).ok_or_else(bad_resource)?; stream.0.poll_write(&zero_copy_buf) }) .and_then(move |nwritten| { diff --git a/core/resources.rs b/core/resources.rs index c3d8b7107..368f83454 100644 --- a/core/resources.rs +++ b/core/resources.rs @@ -10,8 +10,6 @@ use downcast_rs::Downcast; use std; use std::any::Any; use std::collections::HashMap; -use std::io::Error; -use std::io::ErrorKind; /// ResourceId is Deno's version of a file descriptor. ResourceId is also referred /// to as rid in the code base. @@ -23,24 +21,27 @@ type ResourceMap = HashMap<ResourceId, Box<dyn Resource>>; #[derive(Default)] pub struct ResourceTable { - map: ResourceMap, + // TODO(bartlomieju): remove pub modifier, it is used by + // `get_file` method in CLI + pub map: ResourceMap, next_id: u32, } impl ResourceTable { - pub fn get<T: Resource>(&self, rid: ResourceId) -> Result<&T, Error> { - let resource = self.map.get(&rid).ok_or_else(bad_resource)?; - let resource = &resource.downcast_ref::<T>().ok_or_else(bad_resource)?; - Ok(resource) + pub fn get<T: Resource>(&self, rid: ResourceId) -> Option<&T> { + if let Some(resource) = self.map.get(&rid) { + return resource.downcast_ref::<T>(); + } + + None } - pub fn get_mut<T: Resource>( - &mut self, - rid: ResourceId, - ) -> Result<&mut T, Error> { - let resource = self.map.get_mut(&rid).ok_or_else(bad_resource)?; - let resource = resource.downcast_mut::<T>().ok_or_else(bad_resource)?; - Ok(resource) + pub fn get_mut<T: Resource>(&mut self, rid: ResourceId) -> Option<&mut T> { + if let Some(resource) = self.map.get_mut(&rid) { + return resource.downcast_mut::<T>(); + } + + None } // TODO: resource id allocation should probably be randomized for security. @@ -57,13 +58,23 @@ impl ResourceTable { rid } + pub fn entries(&self) -> Vec<(ResourceId, String)> { + self + .map + .iter() + .map(|(key, value)| (*key, value.inspect_repr().to_string())) + .collect() + } + // close(2) is done by dropping the value. Therefore we just need to remove // the resource from the RESOURCE_TABLE. - pub fn close(&mut self, rid: ResourceId) -> Result<(), Error> { - let repr = self.map.remove(&rid).ok_or_else(bad_resource)?; - // Give resource a chance to cleanup (notify tasks, etc.) - repr.close(); - Ok(()) + pub fn close(&mut self, rid: ResourceId) -> Option<()> { + if let Some(resource) = self.map.remove(&rid) { + resource.close(); + return Some(()); + } + + None } } @@ -72,13 +83,6 @@ pub trait Resource: Downcast + Any + Send { /// Method that allows to cleanup resource. fn close(&self) {} - fn inspect_repr(&self) -> &str { - unimplemented!(); - } + fn inspect_repr(&self) -> &str; } impl_downcast!(Resource); - -// TODO: probably bad error kind -pub fn bad_resource() -> Error { - Error::new(ErrorKind::NotFound, "bad resource id") -} |