summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--cli/lib.rs11
-rw-r--r--cli/ops/dispatch_minimal.rs8
-rw-r--r--cli/ops/fetch.rs9
-rw-r--r--cli/ops/files.rs28
-rw-r--r--cli/ops/io.rs175
-rw-r--r--cli/ops/mod.rs1
-rw-r--r--cli/ops/net.rs60
-rw-r--r--cli/ops/process.rs98
-rw-r--r--cli/ops/repl.rs10
-rw-r--r--cli/ops/resources.rs5
-rw-r--r--cli/ops/tls.rs54
-rw-r--r--cli/resources.rs209
-rw-r--r--cli/state.rs24
-rw-r--r--core/resources.rs2
14 files changed, 336 insertions, 358 deletions
diff --git a/cli/lib.rs b/cli/lib.rs
index 3d772bb83..17ca94b55 100644
--- a/cli/lib.rs
+++ b/cli/lib.rs
@@ -43,6 +43,7 @@ pub mod permissions;
mod progress;
mod repl;
pub mod resolve_addr;
+pub mod resources;
mod shell;
mod signal;
pub mod source_maps;
@@ -56,7 +57,6 @@ pub mod worker;
use crate::deno_error::js_check;
use crate::deno_error::print_err_and_exit;
use crate::global_state::ThreadSafeGlobalState;
-use crate::ops::io::get_stdio;
use crate::progress::Progress;
use crate::state::ThreadSafeState;
use crate::worker::Worker;
@@ -128,15 +128,6 @@ fn create_worker_and_state(
.map_err(deno_error::print_err_and_exit)
.unwrap();
- let state_ = state.clone();
- {
- let mut resource_table = state_.lock_resource_table();
- let (stdin, stdout, stderr) = get_stdio();
- resource_table.add("stdin", Box::new(stdin));
- resource_table.add("stdout", Box::new(stdout));
- resource_table.add("stderr", Box::new(stderr));
- }
-
let worker = Worker::new(
"main".to_string(),
startup_data::deno_isolate_init(),
diff --git a/cli/ops/dispatch_minimal.rs b/cli/ops/dispatch_minimal.rs
index 355a24634..c19521bf1 100644
--- a/cli/ops/dispatch_minimal.rs
+++ b/cli/ops/dispatch_minimal.rs
@@ -15,6 +15,7 @@ use deno::PinnedBuf;
use futures::Future;
pub type MinimalOp = dyn Future<Item = i32, Error = ErrBox> + Send;
+pub type Dispatcher = fn(i32, Option<PinnedBuf>) -> Box<MinimalOp>;
#[derive(Copy, Clone, Debug, PartialEq)]
// This corresponds to RecordMinimal on the TS side.
@@ -111,10 +112,9 @@ fn test_parse_min_record() {
assert_eq!(parse_min_record(&buf), None);
}
-pub fn minimal_op<D>(d: D) -> impl Fn(&[u8], Option<PinnedBuf>) -> CoreOp
-where
- D: Fn(i32, Option<PinnedBuf>) -> Box<MinimalOp>,
-{
+pub fn minimal_op(
+ d: Dispatcher,
+) -> impl Fn(&[u8], Option<PinnedBuf>) -> CoreOp {
move |control: &[u8], zero_copy: Option<PinnedBuf>| {
let mut record = match parse_min_record(control) {
Some(r) => r,
diff --git a/cli/ops/fetch.rs b/cli/ops/fetch.rs
index a1c0fe29c..143331171 100644
--- a/cli/ops/fetch.rs
+++ b/cli/ops/fetch.rs
@@ -1,9 +1,8 @@
// Copyright 2018-2019 the Deno authors. All rights reserved. MIT license.
use super::dispatch_json::{Deserialize, JsonOp, Value};
-use super::io::StreamResource;
-use crate::http_body::HttpBody;
use crate::http_util::get_client;
use crate::ops::json_op;
+use crate::resources;
use crate::state::ThreadSafeState;
use deno::*;
use http::header::HeaderName;
@@ -55,7 +54,6 @@ pub fn op_fetch(
request = request.header(name, v);
}
debug!("Before fetch {}", url);
- let state_ = state.clone();
let future = request.send().map_err(ErrBox::from).and_then(move |res| {
let status = res.status();
let mut res_headers = Vec::new();
@@ -63,9 +61,8 @@ pub fn op_fetch(
res_headers.push((key.to_string(), val.to_str().unwrap().to_owned()));
}
- let body = HttpBody::from(res.into_body());
- let mut table = state_.lock_resource_table();
- let rid = table.add("httpBody", Box::new(StreamResource::HttpBody(body)));
+ let body = res.into_body();
+ let rid = resources::add_reqwest_body(body);
let json_res = json!({
"bodyRid": rid,
diff --git a/cli/ops/files.rs b/cli/ops/files.rs
index fc1b8e7d8..04b5f98bf 100644
--- a/cli/ops/files.rs
+++ b/cli/ops/files.rs
@@ -1,11 +1,12 @@
// Copyright 2018-2019 the Deno authors. All rights reserved. MIT license.
use super::dispatch_json::{Deserialize, JsonOp, Value};
-use super::io::StreamResource;
use crate::deno_error::bad_resource;
use crate::deno_error::DenoError;
use crate::deno_error::ErrorKind;
use crate::fs as deno_fs;
use crate::ops::json_op;
+use crate::resources;
+use crate::resources::CliResource;
use crate::state::ThreadSafeState;
use deno::*;
use futures::Future;
@@ -37,7 +38,7 @@ fn op_open(
let args: OpenArgs = serde_json::from_value(args)?;
let (filename, filename_) = deno_fs::resolve_from_cwd(&args.filename)?;
let mode = args.mode.as_ref();
- let state_ = state.clone();
+
let mut open_options = tokio::fs::OpenOptions::new();
match mode {
@@ -90,8 +91,7 @@ fn op_open(
let is_sync = args.promise_id.is_none();
let op = open_options.open(filename).map_err(ErrBox::from).and_then(
move |fs_file| {
- let mut table = state_.lock_resource_table();
- let rid = table.add("fsFile", Box::new(StreamResource::FsFile(fs_file)));
+ let rid = resources::add_fs_file(fs_file);
futures::future::ok(json!(rid))
},
);
@@ -110,21 +110,21 @@ struct CloseArgs {
}
fn op_close(
- state: &ThreadSafeState,
+ _state: &ThreadSafeState,
args: Value,
_zero_copy: Option<PinnedBuf>,
) -> Result<JsonOp, ErrBox> {
let args: CloseArgs = serde_json::from_value(args)?;
- let mut table = state.lock_resource_table();
+ let mut table = resources::lock_resource_table();
table.close(args.rid as u32).ok_or_else(bad_resource)?;
Ok(JsonOp::Sync(json!({})))
}
+#[derive(Debug)]
pub struct SeekFuture {
seek_from: SeekFrom,
rid: ResourceId,
- state: ThreadSafeState,
}
impl Future for SeekFuture {
@@ -132,13 +132,13 @@ impl Future for SeekFuture {
type Error = ErrBox;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
- let mut table = self.state.lock_resource_table();
+ let mut table = resources::lock_resource_table();
let resource = table
- .get_mut::<StreamResource>(self.rid)
+ .get_mut::<CliResource>(self.rid)
.ok_or_else(bad_resource)?;
let tokio_file = match resource {
- StreamResource::FsFile(ref mut file) => file,
+ CliResource::FsFile(ref mut file) => file,
_ => return Err(bad_resource()),
};
@@ -156,7 +156,7 @@ struct SeekArgs {
}
fn op_seek(
- state: &ThreadSafeState,
+ _state: &ThreadSafeState,
args: Value,
_zero_copy: Option<PinnedBuf>,
) -> Result<JsonOp, ErrBox> {
@@ -177,11 +177,7 @@ fn op_seek(
}
};
- let fut = SeekFuture {
- state: state.clone(),
- seek_from,
- rid,
- };
+ let fut = SeekFuture { seek_from, rid };
let op = fut.and_then(move |_| futures::future::ok(json!({})));
if args.promise_id.is_none() {
diff --git a/cli/ops/io.rs b/cli/ops/io.rs
index 959147f19..3ede4b411 100644
--- a/cli/ops/io.rs
+++ b/cli/ops/io.rs
@@ -1,101 +1,19 @@
use super::dispatch_minimal::MinimalOp;
use crate::deno_error;
use crate::deno_error::bad_resource;
-use crate::http_body::HttpBody;
use crate::ops::minimal_op;
+use crate::resources;
+use crate::resources::CliResource;
+use crate::resources::DenoAsyncRead;
+use crate::resources::DenoAsyncWrite;
use crate::state::ThreadSafeState;
-use deno::ErrBox;
-use deno::Resource;
use deno::*;
-use futures;
use futures::Future;
use futures::Poll;
-use std;
-use tokio;
-use tokio::io::{AsyncRead, AsyncWrite};
-use tokio::net::TcpStream;
-use tokio_process;
-use tokio_rustls::client::TlsStream as ClientTlsStream;
-use tokio_rustls::server::TlsStream as ServerTlsStream;
-
-#[cfg(not(windows))]
-use std::os::unix::io::FromRawFd;
-
-#[cfg(windows)]
-use std::os::windows::io::FromRawHandle;
-
-#[cfg(windows)]
-extern crate winapi;
pub fn init(i: &mut Isolate, s: &ThreadSafeState) {
- i.register_op(
- "read",
- s.core_op(minimal_op(s.stateful_minimal_op(op_read))),
- );
- i.register_op(
- "write",
- s.core_op(minimal_op(s.stateful_minimal_op(op_write))),
- );
-}
-
-pub fn get_stdio() -> (StreamResource, StreamResource, StreamResource) {
- let stdin = StreamResource::Stdin(tokio::io::stdin());
- let stdout = StreamResource::Stdout({
- #[cfg(not(windows))]
- let stdout = unsafe { std::fs::File::from_raw_fd(1) };
- #[cfg(windows)]
- let stdout = unsafe {
- std::fs::File::from_raw_handle(winapi::um::processenv::GetStdHandle(
- winapi::um::winbase::STD_OUTPUT_HANDLE,
- ))
- };
- tokio::fs::File::from_std(stdout)
- });
- let stderr = StreamResource::Stderr(tokio::io::stderr());
-
- (stdin, stdout, stderr)
-}
-
-pub enum StreamResource {
- Stdin(tokio::io::Stdin),
- Stdout(tokio::fs::File),
- Stderr(tokio::io::Stderr),
- FsFile(tokio::fs::File),
- TcpStream(tokio::net::TcpStream),
- ServerTlsStream(Box<ServerTlsStream<TcpStream>>),
- ClientTlsStream(Box<ClientTlsStream<TcpStream>>),
- HttpBody(HttpBody),
- ChildStdin(tokio_process::ChildStdin),
- ChildStdout(tokio_process::ChildStdout),
- ChildStderr(tokio_process::ChildStderr),
-}
-
-impl Resource for StreamResource {}
-
-/// `DenoAsyncRead` is the same as the `tokio_io::AsyncRead` trait
-/// but uses an `ErrBox` error instead of `std::io:Error`
-pub trait DenoAsyncRead {
- fn poll_read(&mut self, buf: &mut [u8]) -> Poll<usize, ErrBox>;
-}
-
-impl DenoAsyncRead for StreamResource {
- fn poll_read(&mut self, buf: &mut [u8]) -> Poll<usize, ErrBox> {
- let r = match self {
- StreamResource::FsFile(ref mut f) => f.poll_read(buf),
- StreamResource::Stdin(ref mut f) => f.poll_read(buf),
- StreamResource::TcpStream(ref mut f) => f.poll_read(buf),
- StreamResource::ClientTlsStream(ref mut f) => f.poll_read(buf),
- StreamResource::ServerTlsStream(ref mut f) => f.poll_read(buf),
- StreamResource::HttpBody(ref mut f) => f.poll_read(buf),
- StreamResource::ChildStdout(ref mut f) => f.poll_read(buf),
- StreamResource::ChildStderr(ref mut f) => f.poll_read(buf),
- _ => {
- return Err(bad_resource());
- }
- };
-
- r.map_err(ErrBox::from)
- }
+ i.register_op("read", s.core_op(minimal_op(op_read)));
+ i.register_op("write", s.core_op(minimal_op(op_write)));
}
#[derive(Debug, PartialEq)]
@@ -109,15 +27,14 @@ enum IoState {
///
/// The returned future will resolve to both the I/O stream and the buffer
/// as well as the number of bytes read once the read operation is completed.
-pub fn read<T>(state: &ThreadSafeState, rid: ResourceId, buf: T) -> Read<T>
+pub fn read<T>(rid: ResourceId, buf: T) -> Read<T>
where
T: AsMut<[u8]>,
{
Read {
rid,
buf,
- io_state: IoState::Pending,
- state: state.clone(),
+ state: IoState::Pending,
}
}
@@ -125,11 +42,11 @@ where
/// a buffer.
///
/// Created by the [`read`] function.
+#[derive(Debug)]
pub struct Read<T> {
rid: ResourceId,
buf: T,
- io_state: IoState,
- state: ThreadSafeState,
+ state: IoState,
}
impl<T> Future for Read<T>
@@ -140,25 +57,21 @@ where
type Error = ErrBox;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
- if self.io_state == IoState::Done {
+ if self.state == IoState::Done {
panic!("poll a Read after it's done");
}
- let mut table = self.state.lock_resource_table();
+ let mut table = resources::lock_resource_table();
let resource = table
- .get_mut::<StreamResource>(self.rid)
+ .get_mut::<CliResource>(self.rid)
.ok_or_else(bad_resource)?;
let nread = try_ready!(resource.poll_read(&mut self.buf.as_mut()[..]));
- self.io_state = IoState::Done;
+ self.state = IoState::Done;
Ok(nread.into())
}
}
-pub fn op_read(
- state: &ThreadSafeState,
- rid: i32,
- zero_copy: Option<PinnedBuf>,
-) -> Box<MinimalOp> {
+pub fn op_read(rid: i32, zero_copy: Option<PinnedBuf>) -> Box<MinimalOp> {
debug!("read rid={}", rid);
let zero_copy = match zero_copy {
None => {
@@ -167,50 +80,19 @@ pub fn op_read(
Some(buf) => buf,
};
- let fut = read(state, rid as u32, zero_copy)
+ let fut = read(rid as u32, zero_copy)
.map_err(ErrBox::from)
.and_then(move |nread| Ok(nread as i32));
Box::new(fut)
}
-/// `DenoAsyncWrite` is the same as the `tokio_io::AsyncWrite` trait
-/// but uses an `ErrBox` error instead of `std::io:Error`
-pub trait DenoAsyncWrite {
- fn poll_write(&mut self, buf: &[u8]) -> Poll<usize, ErrBox>;
-
- fn shutdown(&mut self) -> Poll<(), ErrBox>;
-}
-
-impl DenoAsyncWrite for StreamResource {
- fn poll_write(&mut self, buf: &[u8]) -> Poll<usize, ErrBox> {
- let r = match self {
- StreamResource::FsFile(ref mut f) => f.poll_write(buf),
- StreamResource::Stdout(ref mut f) => f.poll_write(buf),
- StreamResource::Stderr(ref mut f) => f.poll_write(buf),
- StreamResource::TcpStream(ref mut f) => f.poll_write(buf),
- StreamResource::ClientTlsStream(ref mut f) => f.poll_write(buf),
- StreamResource::ServerTlsStream(ref mut f) => f.poll_write(buf),
- StreamResource::ChildStdin(ref mut f) => f.poll_write(buf),
- _ => {
- return Err(bad_resource());
- }
- };
-
- r.map_err(ErrBox::from)
- }
-
- fn shutdown(&mut self) -> futures::Poll<(), ErrBox> {
- unimplemented!()
- }
-}
-
/// A future used to write some data to a stream.
+#[derive(Debug)]
pub struct Write<T> {
rid: ResourceId,
buf: T,
- io_state: IoState,
- state: ThreadSafeState,
+ state: IoState,
}
/// Creates a future that will write some of the buffer `buf` to
@@ -218,15 +100,14 @@ pub struct Write<T> {
///
/// Any error which happens during writing will cause both the stream and the
/// buffer to get destroyed.
-pub fn write<T>(state: &ThreadSafeState, rid: ResourceId, buf: T) -> Write<T>
+pub fn write<T>(rid: ResourceId, buf: T) -> Write<T>
where
T: AsRef<[u8]>,
{
Write {
rid,
buf,
- io_state: IoState::Pending,
- state: state.clone(),
+ state: IoState::Pending,
}
}
@@ -240,25 +121,21 @@ where
type Error = ErrBox;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
- if self.io_state == IoState::Done {
+ if self.state == IoState::Done {
panic!("poll a Read after it's done");
}
- let mut table = self.state.lock_resource_table();
+ let mut table = resources::lock_resource_table();
let resource = table
- .get_mut::<StreamResource>(self.rid)
+ .get_mut::<CliResource>(self.rid)
.ok_or_else(bad_resource)?;
let nwritten = try_ready!(resource.poll_write(self.buf.as_ref()));
- self.io_state = IoState::Done;
+ self.state = IoState::Done;
Ok(nwritten.into())
}
}
-pub fn op_write(
- state: &ThreadSafeState,
- rid: i32,
- zero_copy: Option<PinnedBuf>,
-) -> Box<MinimalOp> {
+pub fn op_write(rid: i32, zero_copy: Option<PinnedBuf>) -> Box<MinimalOp> {
debug!("write rid={}", rid);
let zero_copy = match zero_copy {
None => {
@@ -267,7 +144,7 @@ pub fn op_write(
Some(buf) => buf,
};
- let fut = write(state, rid as u32, zero_copy)
+ let fut = write(rid as u32, zero_copy)
.map_err(ErrBox::from)
.and_then(move |nwritten| Ok(nwritten as i32));
diff --git a/cli/ops/mod.rs b/cli/ops/mod.rs
index 9b33d5918..4e6eb37c8 100644
--- a/cli/ops/mod.rs
+++ b/cli/ops/mod.rs
@@ -5,7 +5,6 @@ mod dispatch_minimal;
pub use dispatch_json::json_op;
pub use dispatch_json::JsonOp;
pub use dispatch_minimal::minimal_op;
-pub use dispatch_minimal::MinimalOp;
pub mod compiler;
pub mod errors;
diff --git a/cli/ops/net.rs b/cli/ops/net.rs
index 2fe81e140..a4b3bf934 100644
--- a/cli/ops/net.rs
+++ b/cli/ops/net.rs
@@ -1,11 +1,12 @@
// Copyright 2018-2019 the Deno authors. All rights reserved. MIT license.
use super::dispatch_json::{Deserialize, JsonOp, Value};
-use super::io::StreamResource;
use crate::deno_error::bad_resource;
use crate::ops::json_op;
use crate::resolve_addr::resolve_addr;
+use crate::resources;
+use crate::resources::CliResource;
+use crate::resources::Resource;
use crate::state::ThreadSafeState;
-use deno::Resource;
use deno::*;
use futures::Async;
use futures::Future;
@@ -33,19 +34,18 @@ enum AcceptState {
}
/// Simply accepts a connection.
-pub fn accept(state: &ThreadSafeState, rid: ResourceId) -> Accept {
+pub fn accept(rid: ResourceId) -> Accept {
Accept {
- accept_state: AcceptState::Eager,
+ state: AcceptState::Eager,
rid,
- state: state.clone(),
}
}
/// A future representing state of accepting a TCP connection.
+#[derive(Debug)]
pub struct Accept {
- accept_state: AcceptState,
+ state: AcceptState,
rid: ResourceId,
- state: ThreadSafeState,
}
impl Future for Accept {
@@ -53,11 +53,11 @@ impl Future for Accept {
type Error = ErrBox;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
- if self.accept_state == AcceptState::Done {
+ if self.state == AcceptState::Done {
panic!("poll Accept after it's done");
}
- let mut table = self.state.lock_resource_table();
+ let mut table = resources::lock_resource_table();
let listener_resource = table
.get_mut::<TcpListenerResource>(self.rid)
.ok_or_else(|| {
@@ -70,22 +70,22 @@ impl Future for Accept {
let listener = &mut listener_resource.listener;
- if self.accept_state == AcceptState::Eager {
+ if self.state == AcceptState::Eager {
// Similar to try_ready!, but also track/untrack accept task
// in TcpListener resource.
// In this way, when the listener is closed, the task can be
// notified to error out (instead of stuck forever).
match listener.poll_accept().map_err(ErrBox::from) {
Ok(Async::Ready((stream, addr))) => {
- self.accept_state = AcceptState::Done;
+ self.state = AcceptState::Done;
return Ok((stream, addr).into());
}
Ok(Async::NotReady) => {
- self.accept_state = AcceptState::Pending;
+ self.state = AcceptState::Pending;
return Ok(Async::NotReady);
}
Err(e) => {
- self.accept_state = AcceptState::Done;
+ self.state = AcceptState::Done;
return Err(e);
}
}
@@ -94,7 +94,7 @@ impl Future for Accept {
match listener.poll_accept().map_err(ErrBox::from) {
Ok(Async::Ready((stream, addr))) => {
listener_resource.untrack_task();
- self.accept_state = AcceptState::Done;
+ self.state = AcceptState::Done;
Ok((stream, addr).into())
}
Ok(Async::NotReady) => {
@@ -103,7 +103,7 @@ impl Future for Accept {
}
Err(e) => {
listener_resource.untrack_task();
- self.accept_state = AcceptState::Done;
+ self.state = AcceptState::Done;
Err(e)
}
}
@@ -116,25 +116,23 @@ struct AcceptArgs {
}
fn op_accept(
- state: &ThreadSafeState,
+ _state: &ThreadSafeState,
args: Value,
_zero_copy: Option<PinnedBuf>,
) -> Result<JsonOp, ErrBox> {
let args: AcceptArgs = serde_json::from_value(args)?;
let rid = args.rid as u32;
- let state_ = state.clone();
- let table = state.lock_resource_table();
+
+ let table = resources::lock_resource_table();
table
.get::<TcpListenerResource>(rid)
.ok_or_else(bad_resource)?;
- let op = accept(state, rid)
+ let op = accept(rid)
.and_then(move |(tcp_stream, _socket_addr)| {
let local_addr = tcp_stream.local_addr()?;
let remote_addr = tcp_stream.peer_addr()?;
- let mut table = state_.lock_resource_table();
- let rid =
- table.add("tcpStream", Box::new(StreamResource::TcpStream(tcp_stream)));
+ let rid = resources::add_tcp_stream(tcp_stream);
Ok((rid, local_addr, remote_addr))
})
.map_err(ErrBox::from)
@@ -163,7 +161,7 @@ fn op_dial(
) -> Result<JsonOp, ErrBox> {
let args: DialArgs = serde_json::from_value(args)?;
assert_eq!(args.transport, "tcp"); // TODO Support others.
- let state_ = state.clone();
+
state.check_net(&args.hostname, args.port)?;
let op = resolve_addr(&args.hostname, args.port).and_then(move |addr| {
@@ -172,9 +170,7 @@ fn op_dial(
.and_then(move |tcp_stream| {
let local_addr = tcp_stream.local_addr()?;
let remote_addr = tcp_stream.peer_addr()?;
- let mut table = state_.lock_resource_table();
- let rid = table
- .add("tcpStream", Box::new(StreamResource::TcpStream(tcp_stream)));
+ let rid = resources::add_tcp_stream(tcp_stream);
Ok((rid, local_addr, remote_addr))
})
.map_err(ErrBox::from)
@@ -197,7 +193,7 @@ struct ShutdownArgs {
}
fn op_shutdown(
- state: &ThreadSafeState,
+ _state: &ThreadSafeState,
args: Value,
_zero_copy: Option<PinnedBuf>,
) -> Result<JsonOp, ErrBox> {
@@ -212,12 +208,10 @@ fn op_shutdown(
_ => unimplemented!(),
};
- let mut table = state.lock_resource_table();
- let resource = table
- .get_mut::<StreamResource>(rid)
- .ok_or_else(bad_resource)?;
+ let mut table = resources::lock_resource_table();
+ let resource = table.get_mut::<CliResource>(rid).ok_or_else(bad_resource)?;
match resource {
- StreamResource::TcpStream(ref mut stream) => {
+ CliResource::TcpStream(ref mut stream) => {
TcpStream::shutdown(stream, shutdown_mode).map_err(ErrBox::from)?;
}
_ => return Err(bad_resource()),
@@ -305,7 +299,7 @@ fn op_listen(
task: None,
local_addr,
};
- let mut table = state.lock_resource_table();
+ let mut table = resources::lock_resource_table();
let rid = table.add("tcpListener", Box::new(listener_resource));
Ok(JsonOp::Sync(json!({
diff --git a/cli/ops/process.rs b/cli/ops/process.rs
index 237b02fd0..f7897ec51 100644
--- a/cli/ops/process.rs
+++ b/cli/ops/process.rs
@@ -1,8 +1,9 @@
// Copyright 2018-2019 the Deno authors. All rights reserved. MIT license.
use super::dispatch_json::{Deserialize, JsonOp, Value};
-use super::io::StreamResource;
use crate::deno_error::bad_resource;
use crate::ops::json_op;
+use crate::resources;
+use crate::resources::CloneFileFuture;
use crate::signal::kill;
use crate::state::ThreadSafeState;
use deno::*;
@@ -27,41 +28,6 @@ pub fn init(i: &mut Isolate, s: &ThreadSafeState) {
i.register_op("kill", s.core_op(json_op(s.stateful_op(op_kill))));
}
-struct CloneFileFuture {
- rid: ResourceId,
- state: ThreadSafeState,
-}
-
-impl Future for CloneFileFuture {
- type Item = tokio::fs::File;
- type Error = ErrBox;
-
- fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
- let mut table = self.state.lock_resource_table();
- let repr = table
- .get_mut::<StreamResource>(self.rid)
- .ok_or_else(bad_resource)?;
- match repr {
- StreamResource::FsFile(ref mut file) => {
- file.poll_try_clone().map_err(ErrBox::from)
- }
- _ => Err(bad_resource()),
- }
- }
-}
-
-fn clone_file(
- rid: u32,
- state: &ThreadSafeState,
-) -> Result<std::fs::File, ErrBox> {
- (CloneFileFuture {
- rid,
- state: state.clone(),
- })
- .wait()
- .map(|f| f.into_std())
-}
-
fn subprocess_stdio_map(s: &str) -> std::process::Stdio {
match s {
"inherit" => std::process::Stdio::inherit(),
@@ -99,7 +65,6 @@ fn op_run(
let run_args: RunArgs = serde_json::from_value(args)?;
state.check_run()?;
- let state_ = state.clone();
let args = run_args.args;
let env = run_args.env;
@@ -118,7 +83,7 @@ fn op_run(
// TODO: make this work with other resources, eg. sockets
let stdin_rid = run_args.stdin_rid;
if stdin_rid > 0 {
- let file = clone_file(stdin_rid, &state_)?;
+ let file = (CloneFileFuture { rid: stdin_rid }).wait()?.into_std();
c.stdin(file);
} else {
c.stdin(subprocess_stdio_map(run_args.stdin.as_ref()));
@@ -126,7 +91,7 @@ fn op_run(
let stdout_rid = run_args.stdout_rid;
if stdout_rid > 0 {
- let file = clone_file(stdout_rid, &state_)?;
+ let file = (CloneFileFuture { rid: stdout_rid }).wait()?.into_std();
c.stdout(file);
} else {
c.stdout(subprocess_stdio_map(run_args.stdout.as_ref()));
@@ -134,7 +99,7 @@ fn op_run(
let stderr_rid = run_args.stderr_rid;
if stderr_rid > 0 {
- let file = clone_file(stderr_rid, &state_)?;
+ let file = (CloneFileFuture { rid: stderr_rid }).wait()?.into_std();
c.stderr(file);
} else {
c.stderr(subprocess_stdio_map(run_args.stderr.as_ref()));
@@ -144,42 +109,29 @@ fn op_run(
let mut child = c.spawn_async().map_err(ErrBox::from)?;
let pid = child.id();
- let mut table = state_.lock_resource_table();
-
- let stdin_rid = match child.stdin().take() {
- Some(child_stdin) => {
- let rid = table.add(
- "childStdin",
- Box::new(StreamResource::ChildStdin(child_stdin)),
- );
- Some(rid)
- }
- None => None,
+ let stdin_rid = if child.stdin().is_some() {
+ let rid = resources::add_child_stdin(child.stdin().take().unwrap());
+ Some(rid)
+ } else {
+ None
};
- let stdout_rid = match child.stdout().take() {
- Some(child_stdout) => {
- let rid = table.add(
- "childStdout",
- Box::new(StreamResource::ChildStdout(child_stdout)),
- );
- Some(rid)
- }
- None => None,
+ let stdout_rid = if child.stdout().is_some() {
+ let rid = resources::add_child_stdout(child.stdout().take().unwrap());
+ Some(rid)
+ } else {
+ None
};
- let stderr_rid = match child.stderr().take() {
- Some(child_stderr) => {
- let rid = table.add(
- "childStderr",
- Box::new(StreamResource::ChildStderr(child_stderr)),
- );
- Some(rid)
- }
- None => None,
+ let stderr_rid = if child.stderr().is_some() {
+ let rid = resources::add_child_stderr(child.stderr().take().unwrap());
+ Some(rid)
+ } else {
+ None
};
let child_resource = ChildResource { child };
+ let mut table = resources::lock_resource_table();
let child_rid = table.add("child", Box::new(child_resource));
Ok(JsonOp::Sync(json!({
@@ -193,7 +145,6 @@ fn op_run(
pub struct ChildStatus {
rid: ResourceId,
- state: ThreadSafeState,
}
impl Future for ChildStatus {
@@ -201,7 +152,7 @@ impl Future for ChildStatus {
type Error = ErrBox;
fn poll(&mut self) -> Poll<ExitStatus, ErrBox> {
- let mut table = self.state.lock_resource_table();
+ let mut table = resources::lock_resource_table();
let child_resource = table
.get_mut::<ChildResource>(self.rid)
.ok_or_else(bad_resource)?;
@@ -226,10 +177,7 @@ fn op_run_status(
state.check_run()?;
- let future = ChildStatus {
- rid,
- state: state.clone(),
- };
+ let future = ChildStatus { rid };
let future = future.and_then(move |run_status| {
let code = run_status.code();
diff --git a/cli/ops/repl.rs b/cli/ops/repl.rs
index 5919ea586..723fb2571 100644
--- a/cli/ops/repl.rs
+++ b/cli/ops/repl.rs
@@ -4,8 +4,9 @@ use crate::deno_error::bad_resource;
use crate::ops::json_op;
use crate::repl;
use crate::repl::Repl;
+use crate::resources;
+use crate::resources::Resource;
use crate::state::ThreadSafeState;
-use deno::Resource;
use deno::*;
use std::sync::Arc;
use std::sync::Mutex;
@@ -43,7 +44,7 @@ fn op_repl_start(
repl::history_path(&state.global_state.dir, &args.history_file);
let repl = repl::Repl::new(history_path);
let resource = ReplResource(Arc::new(Mutex::new(repl)));
- let mut table = state.lock_resource_table();
+ let mut table = resources::lock_resource_table();
let rid = table.add("repl", Box::new(resource));
Ok(JsonOp::Sync(json!(rid)))
}
@@ -55,7 +56,7 @@ struct ReplReadlineArgs {
}
fn op_repl_readline(
- state: &ThreadSafeState,
+ _state: &ThreadSafeState,
args: Value,
_zero_copy: Option<PinnedBuf>,
) -> Result<JsonOp, ErrBox> {
@@ -63,10 +64,9 @@ fn op_repl_readline(
let rid = args.rid as u32;
let prompt = args.prompt;
debug!("op_repl_readline {} {}", rid, prompt);
- let state = state.clone();
blocking_json(false, move || {
- let table = state.lock_resource_table();
+ let table = resources::lock_resource_table();
let resource = table.get::<ReplResource>(rid).ok_or_else(bad_resource)?;
let repl = resource.0.clone();
let line = repl.lock().unwrap().readline(&prompt)?;
diff --git a/cli/ops/resources.rs b/cli/ops/resources.rs
index c35e9762c..d92c6a83c 100644
--- a/cli/ops/resources.rs
+++ b/cli/ops/resources.rs
@@ -1,6 +1,7 @@
// Copyright 2018-2019 the Deno authors. All rights reserved. MIT license.
use super::dispatch_json::{JsonOp, Value};
use crate::ops::json_op;
+use crate::resources::lock_resource_table;
use crate::state::ThreadSafeState;
use deno::*;
@@ -9,11 +10,11 @@ pub fn init(i: &mut Isolate, s: &ThreadSafeState) {
}
fn op_resources(
- state: &ThreadSafeState,
+ _state: &ThreadSafeState,
_args: Value,
_zero_copy: Option<PinnedBuf>,
) -> Result<JsonOp, ErrBox> {
- let resource_table = state.lock_resource_table();
+ let resource_table = lock_resource_table();
let serialized_resources = resource_table.entries();
Ok(JsonOp::Sync(json!(serialized_resources)))
}
diff --git a/cli/ops/tls.rs b/cli/ops/tls.rs
index 48419f76f..6e8348c91 100644
--- a/cli/ops/tls.rs
+++ b/cli/ops/tls.rs
@@ -1,13 +1,13 @@
// Copyright 2018-2019 the Deno authors. All rights reserved. MIT license.
use super::dispatch_json::{Deserialize, JsonOp, Value};
-use super::io::StreamResource;
use crate::deno_error::bad_resource;
use crate::deno_error::DenoError;
use crate::deno_error::ErrorKind;
use crate::ops::json_op;
use crate::resolve_addr::resolve_addr;
+use crate::resources;
+use crate::resources::Resource;
use crate::state::ThreadSafeState;
-use deno::Resource;
use deno::*;
use futures::Async;
use futures::Future;
@@ -60,7 +60,7 @@ pub fn op_dial_tls(
) -> Result<JsonOp, ErrBox> {
let args: DialTLSArgs = serde_json::from_value(args)?;
let cert_file = args.cert_file;
- let state_ = state.clone();
+
state.check_net(&args.hostname, args.port)?;
if let Some(path) = cert_file.clone() {
state.check_read(&path)?;
@@ -99,11 +99,7 @@ pub fn op_dial_tls(
.connect(dnsname, tcp_stream)
.map_err(ErrBox::from)
.and_then(move |tls_stream| {
- let mut table = state_.lock_resource_table();
- let rid = table.add(
- "clientTlsStream",
- Box::new(StreamResource::ClientTlsStream(Box::new(tls_stream))),
- );
+ let rid = resources::add_tls_stream(tls_stream);
futures::future::ok(json!({
"rid": rid,
"localAddr": local_addr.to_string(),
@@ -269,7 +265,7 @@ fn op_listen_tls(
task: None,
local_addr,
};
- let mut table = state.lock_resource_table();
+ let mut table = resources::lock_resource_table();
let rid = table.add("tlsListener", Box::new(tls_listener_resource));
Ok(JsonOp::Sync(json!({
@@ -286,19 +282,18 @@ enum AcceptTlsState {
}
/// Simply accepts a TLS connection.
-pub fn accept_tls(state: &ThreadSafeState, rid: ResourceId) -> AcceptTls {
+pub fn accept_tls(rid: ResourceId) -> AcceptTls {
AcceptTls {
- accept_state: AcceptTlsState::Eager,
+ state: AcceptTlsState::Eager,
rid,
- state: state.clone(),
}
}
/// A future representing state of accepting a TLS connection.
+#[derive(Debug)]
pub struct AcceptTls {
- accept_state: AcceptTlsState,
+ state: AcceptTlsState,
rid: ResourceId,
- state: ThreadSafeState,
}
impl Future for AcceptTls {
@@ -306,11 +301,11 @@ impl Future for AcceptTls {
type Error = ErrBox;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
- if self.accept_state == AcceptTlsState::Done {
+ if self.state == AcceptTlsState::Done {
panic!("poll AcceptTls after it's done");
}
- let mut table = self.state.lock_resource_table();
+ let mut table = resources::lock_resource_table();
let listener_resource = table
.get_mut::<TlsListenerResource>(self.rid)
.ok_or_else(|| {
@@ -323,22 +318,22 @@ impl Future for AcceptTls {
let listener = &mut listener_resource.listener;
- if self.accept_state == AcceptTlsState::Eager {
+ if self.state == AcceptTlsState::Eager {
// Similar to try_ready!, but also track/untrack accept task
// in TcpListener resource.
// In this way, when the listener is closed, the task can be
// notified to error out (instead of stuck forever).
match listener.poll_accept().map_err(ErrBox::from) {
Ok(Async::Ready((stream, addr))) => {
- self.accept_state = AcceptTlsState::Done;
+ self.state = AcceptTlsState::Done;
return Ok((stream, addr).into());
}
Ok(Async::NotReady) => {
- self.accept_state = AcceptTlsState::Pending;
+ self.state = AcceptTlsState::Pending;
return Ok(Async::NotReady);
}
Err(e) => {
- self.accept_state = AcceptTlsState::Done;
+ self.state = AcceptTlsState::Done;
return Err(e);
}
}
@@ -347,7 +342,7 @@ impl Future for AcceptTls {
match listener.poll_accept().map_err(ErrBox::from) {
Ok(Async::Ready((stream, addr))) => {
listener_resource.untrack_task();
- self.accept_state = AcceptTlsState::Done;
+ self.state = AcceptTlsState::Done;
Ok((stream, addr).into())
}
Ok(Async::NotReady) => {
@@ -356,7 +351,7 @@ impl Future for AcceptTls {
}
Err(e) => {
listener_resource.untrack_task();
- self.accept_state = AcceptTlsState::Done;
+ self.state = AcceptTlsState::Done;
Err(e)
}
}
@@ -369,22 +364,21 @@ struct AcceptTlsArgs {
}
fn op_accept_tls(
- state: &ThreadSafeState,
+ _state: &ThreadSafeState,
args: Value,
_zero_copy: Option<PinnedBuf>,
) -> Result<JsonOp, ErrBox> {
let args: AcceptTlsArgs = serde_json::from_value(args)?;
let rid = args.rid as u32;
- let state1 = state.clone();
- let state2 = state.clone();
- let op = accept_tls(state, rid)
+
+ let op = accept_tls(rid)
.and_then(move |(tcp_stream, _socket_addr)| {
let local_addr = tcp_stream.local_addr()?;
let remote_addr = tcp_stream.peer_addr()?;
Ok((tcp_stream, local_addr, remote_addr))
})
.and_then(move |(tcp_stream, local_addr, remote_addr)| {
- let table = state1.lock_resource_table();
+ let table = resources::lock_resource_table();
let resource = table
.get::<TlsListenerResource>(rid)
.ok_or_else(bad_resource)
@@ -395,11 +389,7 @@ fn op_accept_tls(
.accept(tcp_stream)
.map_err(ErrBox::from)
.and_then(move |tls_stream| {
- let mut table = state2.lock_resource_table();
- let rid = table.add(
- "serverTlsStream",
- Box::new(StreamResource::ServerTlsStream(Box::new(tls_stream))),
- );
+ let rid = resources::add_server_tls_stream(tls_stream);
Ok((rid, local_addr, remote_addr))
})
})
diff --git a/cli/resources.rs b/cli/resources.rs
new file mode 100644
index 000000000..db9b43eeb
--- /dev/null
+++ b/cli/resources.rs
@@ -0,0 +1,209 @@
+// Copyright 2018-2019 the Deno authors. All rights reserved. MIT license.
+
+// Think of Resources as File Descriptors. They are integers that are allocated
+// by the privileged side of Deno to refer to various resources. The simplest
+// example are standard file system files and stdio - but there will be other
+// resources added in the future that might not correspond to operating system
+// level File Descriptors. To avoid confusion we call them "resources" not "file
+// descriptors". This module implements a global resource table. Ops (AKA
+// handlers) look up resources by their integer id here.
+
+use crate::deno_error::bad_resource;
+use crate::http_body::HttpBody;
+use deno::ErrBox;
+pub use deno::Resource;
+pub use deno::ResourceId;
+use deno::ResourceTable;
+
+use futures;
+use futures::Future;
+use futures::Poll;
+use reqwest::r#async::Decoder as ReqwestDecoder;
+use std;
+use std::sync::Mutex;
+use std::sync::MutexGuard;
+use tokio;
+use tokio::io::{AsyncRead, AsyncWrite};
+use tokio::net::TcpStream;
+use tokio_process;
+use tokio_rustls::client::TlsStream as ClientTlsStream;
+use tokio_rustls::server::TlsStream as ServerTlsStream;
+
+#[cfg(not(windows))]
+use std::os::unix::io::FromRawFd;
+
+#[cfg(windows)]
+use std::os::windows::io::FromRawHandle;
+
+#[cfg(windows)]
+extern crate winapi;
+
+lazy_static! {
+ static ref RESOURCE_TABLE: Mutex<ResourceTable> = Mutex::new({
+ let mut table = ResourceTable::default();
+
+ // TODO Load these lazily during lookup?
+ table.add("stdin", Box::new(CliResource::Stdin(tokio::io::stdin())));
+
+ table.add("stdout", Box::new(CliResource::Stdout({
+ #[cfg(not(windows))]
+ let stdout = unsafe { std::fs::File::from_raw_fd(1) };
+ #[cfg(windows)]
+ let stdout = unsafe {
+ std::fs::File::from_raw_handle(winapi::um::processenv::GetStdHandle(
+ winapi::um::winbase::STD_OUTPUT_HANDLE))
+ };
+ tokio::fs::File::from_std(stdout)
+ })));
+
+ table.add("stderr", Box::new(CliResource::Stderr(tokio::io::stderr())));
+ table
+ });
+}
+
+// TODO: rename to `StreamResource`
+pub enum CliResource {
+ Stdin(tokio::io::Stdin),
+ Stdout(tokio::fs::File),
+ Stderr(tokio::io::Stderr),
+ FsFile(tokio::fs::File),
+ TcpStream(tokio::net::TcpStream),
+ ServerTlsStream(Box<ServerTlsStream<TcpStream>>),
+ ClientTlsStream(Box<ClientTlsStream<TcpStream>>),
+ HttpBody(HttpBody),
+ ChildStdin(tokio_process::ChildStdin),
+ ChildStdout(tokio_process::ChildStdout),
+ ChildStderr(tokio_process::ChildStderr),
+}
+
+impl Resource for CliResource {}
+
+pub fn lock_resource_table<'a>() -> MutexGuard<'a, ResourceTable> {
+ RESOURCE_TABLE.lock().unwrap()
+}
+
+/// `DenoAsyncRead` is the same as the `tokio_io::AsyncRead` trait
+/// but uses an `ErrBox` error instead of `std::io:Error`
+pub trait DenoAsyncRead {
+ fn poll_read(&mut self, buf: &mut [u8]) -> Poll<usize, ErrBox>;
+}
+
+impl DenoAsyncRead for CliResource {
+ fn poll_read(&mut self, buf: &mut [u8]) -> Poll<usize, ErrBox> {
+ let r = match self {
+ CliResource::FsFile(ref mut f) => f.poll_read(buf),
+ CliResource::Stdin(ref mut f) => f.poll_read(buf),
+ CliResource::TcpStream(ref mut f) => f.poll_read(buf),
+ CliResource::ClientTlsStream(ref mut f) => f.poll_read(buf),
+ CliResource::ServerTlsStream(ref mut f) => f.poll_read(buf),
+ CliResource::HttpBody(ref mut f) => f.poll_read(buf),
+ CliResource::ChildStdout(ref mut f) => f.poll_read(buf),
+ CliResource::ChildStderr(ref mut f) => f.poll_read(buf),
+ _ => {
+ return Err(bad_resource());
+ }
+ };
+
+ r.map_err(ErrBox::from)
+ }
+}
+
+/// `DenoAsyncWrite` is the same as the `tokio_io::AsyncWrite` trait
+/// but uses an `ErrBox` error instead of `std::io:Error`
+pub trait DenoAsyncWrite {
+ fn poll_write(&mut self, buf: &[u8]) -> Poll<usize, ErrBox>;
+
+ fn shutdown(&mut self) -> Poll<(), ErrBox>;
+}
+
+impl DenoAsyncWrite for CliResource {
+ fn poll_write(&mut self, buf: &[u8]) -> Poll<usize, ErrBox> {
+ let r = match self {
+ CliResource::FsFile(ref mut f) => f.poll_write(buf),
+ CliResource::Stdout(ref mut f) => f.poll_write(buf),
+ CliResource::Stderr(ref mut f) => f.poll_write(buf),
+ CliResource::TcpStream(ref mut f) => f.poll_write(buf),
+ CliResource::ClientTlsStream(ref mut f) => f.poll_write(buf),
+ CliResource::ServerTlsStream(ref mut f) => f.poll_write(buf),
+ CliResource::ChildStdin(ref mut f) => f.poll_write(buf),
+ _ => {
+ return Err(bad_resource());
+ }
+ };
+
+ r.map_err(ErrBox::from)
+ }
+
+ fn shutdown(&mut self) -> futures::Poll<(), ErrBox> {
+ unimplemented!()
+ }
+}
+
+pub fn add_fs_file(fs_file: tokio::fs::File) -> ResourceId {
+ let mut table = lock_resource_table();
+ table.add("fsFile", Box::new(CliResource::FsFile(fs_file)))
+}
+
+pub fn add_tcp_stream(stream: tokio::net::TcpStream) -> ResourceId {
+ let mut table = lock_resource_table();
+ table.add("tcpStream", Box::new(CliResource::TcpStream(stream)))
+}
+
+pub fn add_tls_stream(stream: ClientTlsStream<TcpStream>) -> ResourceId {
+ let mut table = lock_resource_table();
+ table.add(
+ "clientTlsStream",
+ Box::new(CliResource::ClientTlsStream(Box::new(stream))),
+ )
+}
+
+pub fn add_server_tls_stream(stream: ServerTlsStream<TcpStream>) -> ResourceId {
+ let mut table = lock_resource_table();
+ table.add(
+ "serverTlsStream",
+ Box::new(CliResource::ServerTlsStream(Box::new(stream))),
+ )
+}
+
+pub fn add_reqwest_body(body: ReqwestDecoder) -> ResourceId {
+ let body = HttpBody::from(body);
+ let mut table = lock_resource_table();
+ table.add("httpBody", Box::new(CliResource::HttpBody(body)))
+}
+
+pub fn add_child_stdin(stdin: tokio_process::ChildStdin) -> ResourceId {
+ let mut table = lock_resource_table();
+ table.add("childStdin", Box::new(CliResource::ChildStdin(stdin)))
+}
+
+pub fn add_child_stdout(stdout: tokio_process::ChildStdout) -> ResourceId {
+ let mut table = lock_resource_table();
+ table.add("childStdout", Box::new(CliResource::ChildStdout(stdout)))
+}
+
+pub fn add_child_stderr(stderr: tokio_process::ChildStderr) -> ResourceId {
+ let mut table = lock_resource_table();
+ table.add("childStderr", Box::new(CliResource::ChildStderr(stderr)))
+}
+
+pub struct CloneFileFuture {
+ pub rid: ResourceId,
+}
+
+impl Future for CloneFileFuture {
+ type Item = tokio::fs::File;
+ type Error = ErrBox;
+
+ fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
+ let mut table = lock_resource_table();
+ let repr = table
+ .get_mut::<CliResource>(self.rid)
+ .ok_or_else(bad_resource)?;
+ match repr {
+ CliResource::FsFile(ref mut file) => {
+ file.poll_try_clone().map_err(ErrBox::from)
+ }
+ _ => Err(bad_resource()),
+ }
+ }
+}
diff --git a/cli/state.rs b/cli/state.rs
index a5e9546b0..edfac72c0 100644
--- a/cli/state.rs
+++ b/cli/state.rs
@@ -5,7 +5,6 @@ use crate::global_timer::GlobalTimer;
use crate::import_map::ImportMap;
use crate::metrics::Metrics;
use crate::ops::JsonOp;
-use crate::ops::MinimalOp;
use crate::permissions::DenoPermissions;
use crate::worker::Worker;
use crate::worker::WorkerChannels;
@@ -16,7 +15,6 @@ use deno::Loader;
use deno::ModuleSpecifier;
use deno::Op;
use deno::PinnedBuf;
-use deno::ResourceTable;
use futures::Future;
use rand::rngs::StdRng;
use rand::SeedableRng;
@@ -29,7 +27,6 @@ use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::sync::Mutex;
-use std::sync::MutexGuard;
use std::time::Instant;
use tokio::sync::mpsc;
@@ -55,7 +52,6 @@ pub struct State {
pub start_time: Instant,
pub seeded_rng: Option<Mutex<StdRng>>,
pub include_deno_namespace: bool,
- pub resource_table: Mutex<ResourceTable>,
}
impl Clone for ThreadSafeState {
@@ -72,10 +68,6 @@ impl Deref for ThreadSafeState {
}
impl ThreadSafeState {
- pub fn lock_resource_table(&self) -> MutexGuard<ResourceTable> {
- self.resource_table.lock().unwrap()
- }
-
/// Wrap core `OpDispatcher` to collect metrics.
pub fn core_op<D>(
&self,
@@ -112,21 +104,6 @@ impl ThreadSafeState {
}
/// This is a special function that provides `state` argument to dispatcher.
- pub fn stateful_minimal_op<D>(
- &self,
- dispatcher: D,
- ) -> impl Fn(i32, Option<PinnedBuf>) -> Box<MinimalOp>
- where
- D: Fn(&ThreadSafeState, i32, Option<PinnedBuf>) -> Box<MinimalOp>,
- {
- let state = self.clone();
-
- move |rid: i32, zero_copy: Option<PinnedBuf>| -> Box<MinimalOp> {
- dispatcher(&state, rid, zero_copy)
- }
- }
-
- /// This is a special function that provides `state` argument to dispatcher.
///
/// NOTE: This only works with JSON dispatcher.
/// This is a band-aid for transition to `Isolate.register_op` API as most of our
@@ -243,7 +220,6 @@ impl ThreadSafeState {
start_time: Instant::now(),
seeded_rng,
include_deno_namespace,
- resource_table: Mutex::new(ResourceTable::default()),
};
Ok(ThreadSafeState(Arc::new(state)))
diff --git a/core/resources.rs b/core/resources.rs
index 216f5c8df..da4fb6b07 100644
--- a/core/resources.rs
+++ b/core/resources.rs
@@ -65,7 +65,7 @@ impl ResourceTable {
}
// close(2) is done by dropping the value. Therefore we just need to remove
- // the resource from the resource table.
+ // the resource from the RESOURCE_TABLE.
pub fn close(&mut self, rid: ResourceId) -> Option<()> {
self.map.remove(&rid).map(|(_name, _resource)| ())
}