summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--cli/lib.rs11
-rw-r--r--cli/ops/dispatch_minimal.rs8
-rw-r--r--cli/ops/fetch.rs9
-rw-r--r--cli/ops/files.rs28
-rw-r--r--cli/ops/io.rs175
-rw-r--r--cli/ops/mod.rs1
-rw-r--r--cli/ops/net.rs60
-rw-r--r--cli/ops/process.rs98
-rw-r--r--cli/ops/repl.rs10
-rw-r--r--cli/ops/resources.rs5
-rw-r--r--cli/ops/tls.rs54
-rw-r--r--cli/resources.rs209
-rw-r--r--cli/state.rs24
-rw-r--r--core/resources.rs2
14 files changed, 358 insertions, 336 deletions
diff --git a/cli/lib.rs b/cli/lib.rs
index 17ca94b55..3d772bb83 100644
--- a/cli/lib.rs
+++ b/cli/lib.rs
@@ -43,7 +43,6 @@ pub mod permissions;
mod progress;
mod repl;
pub mod resolve_addr;
-pub mod resources;
mod shell;
mod signal;
pub mod source_maps;
@@ -57,6 +56,7 @@ pub mod worker;
use crate::deno_error::js_check;
use crate::deno_error::print_err_and_exit;
use crate::global_state::ThreadSafeGlobalState;
+use crate::ops::io::get_stdio;
use crate::progress::Progress;
use crate::state::ThreadSafeState;
use crate::worker::Worker;
@@ -128,6 +128,15 @@ fn create_worker_and_state(
.map_err(deno_error::print_err_and_exit)
.unwrap();
+ let state_ = state.clone();
+ {
+ let mut resource_table = state_.lock_resource_table();
+ let (stdin, stdout, stderr) = get_stdio();
+ resource_table.add("stdin", Box::new(stdin));
+ resource_table.add("stdout", Box::new(stdout));
+ resource_table.add("stderr", Box::new(stderr));
+ }
+
let worker = Worker::new(
"main".to_string(),
startup_data::deno_isolate_init(),
diff --git a/cli/ops/dispatch_minimal.rs b/cli/ops/dispatch_minimal.rs
index c19521bf1..355a24634 100644
--- a/cli/ops/dispatch_minimal.rs
+++ b/cli/ops/dispatch_minimal.rs
@@ -15,7 +15,6 @@ use deno::PinnedBuf;
use futures::Future;
pub type MinimalOp = dyn Future<Item = i32, Error = ErrBox> + Send;
-pub type Dispatcher = fn(i32, Option<PinnedBuf>) -> Box<MinimalOp>;
#[derive(Copy, Clone, Debug, PartialEq)]
// This corresponds to RecordMinimal on the TS side.
@@ -112,9 +111,10 @@ fn test_parse_min_record() {
assert_eq!(parse_min_record(&buf), None);
}
-pub fn minimal_op(
- d: Dispatcher,
-) -> impl Fn(&[u8], Option<PinnedBuf>) -> CoreOp {
+pub fn minimal_op<D>(d: D) -> impl Fn(&[u8], Option<PinnedBuf>) -> CoreOp
+where
+ D: Fn(i32, Option<PinnedBuf>) -> Box<MinimalOp>,
+{
move |control: &[u8], zero_copy: Option<PinnedBuf>| {
let mut record = match parse_min_record(control) {
Some(r) => r,
diff --git a/cli/ops/fetch.rs b/cli/ops/fetch.rs
index 143331171..a1c0fe29c 100644
--- a/cli/ops/fetch.rs
+++ b/cli/ops/fetch.rs
@@ -1,8 +1,9 @@
// Copyright 2018-2019 the Deno authors. All rights reserved. MIT license.
use super::dispatch_json::{Deserialize, JsonOp, Value};
+use super::io::StreamResource;
+use crate::http_body::HttpBody;
use crate::http_util::get_client;
use crate::ops::json_op;
-use crate::resources;
use crate::state::ThreadSafeState;
use deno::*;
use http::header::HeaderName;
@@ -54,6 +55,7 @@ pub fn op_fetch(
request = request.header(name, v);
}
debug!("Before fetch {}", url);
+ let state_ = state.clone();
let future = request.send().map_err(ErrBox::from).and_then(move |res| {
let status = res.status();
let mut res_headers = Vec::new();
@@ -61,8 +63,9 @@ pub fn op_fetch(
res_headers.push((key.to_string(), val.to_str().unwrap().to_owned()));
}
- let body = res.into_body();
- let rid = resources::add_reqwest_body(body);
+ let body = HttpBody::from(res.into_body());
+ let mut table = state_.lock_resource_table();
+ let rid = table.add("httpBody", Box::new(StreamResource::HttpBody(body)));
let json_res = json!({
"bodyRid": rid,
diff --git a/cli/ops/files.rs b/cli/ops/files.rs
index 04b5f98bf..fc1b8e7d8 100644
--- a/cli/ops/files.rs
+++ b/cli/ops/files.rs
@@ -1,12 +1,11 @@
// Copyright 2018-2019 the Deno authors. All rights reserved. MIT license.
use super::dispatch_json::{Deserialize, JsonOp, Value};
+use super::io::StreamResource;
use crate::deno_error::bad_resource;
use crate::deno_error::DenoError;
use crate::deno_error::ErrorKind;
use crate::fs as deno_fs;
use crate::ops::json_op;
-use crate::resources;
-use crate::resources::CliResource;
use crate::state::ThreadSafeState;
use deno::*;
use futures::Future;
@@ -38,7 +37,7 @@ fn op_open(
let args: OpenArgs = serde_json::from_value(args)?;
let (filename, filename_) = deno_fs::resolve_from_cwd(&args.filename)?;
let mode = args.mode.as_ref();
-
+ let state_ = state.clone();
let mut open_options = tokio::fs::OpenOptions::new();
match mode {
@@ -91,7 +90,8 @@ fn op_open(
let is_sync = args.promise_id.is_none();
let op = open_options.open(filename).map_err(ErrBox::from).and_then(
move |fs_file| {
- let rid = resources::add_fs_file(fs_file);
+ let mut table = state_.lock_resource_table();
+ let rid = table.add("fsFile", Box::new(StreamResource::FsFile(fs_file)));
futures::future::ok(json!(rid))
},
);
@@ -110,21 +110,21 @@ struct CloseArgs {
}
fn op_close(
- _state: &ThreadSafeState,
+ state: &ThreadSafeState,
args: Value,
_zero_copy: Option<PinnedBuf>,
) -> Result<JsonOp, ErrBox> {
let args: CloseArgs = serde_json::from_value(args)?;
- let mut table = resources::lock_resource_table();
+ let mut table = state.lock_resource_table();
table.close(args.rid as u32).ok_or_else(bad_resource)?;
Ok(JsonOp::Sync(json!({})))
}
-#[derive(Debug)]
pub struct SeekFuture {
seek_from: SeekFrom,
rid: ResourceId,
+ state: ThreadSafeState,
}
impl Future for SeekFuture {
@@ -132,13 +132,13 @@ impl Future for SeekFuture {
type Error = ErrBox;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
- let mut table = resources::lock_resource_table();
+ let mut table = self.state.lock_resource_table();
let resource = table
- .get_mut::<CliResource>(self.rid)
+ .get_mut::<StreamResource>(self.rid)
.ok_or_else(bad_resource)?;
let tokio_file = match resource {
- CliResource::FsFile(ref mut file) => file,
+ StreamResource::FsFile(ref mut file) => file,
_ => return Err(bad_resource()),
};
@@ -156,7 +156,7 @@ struct SeekArgs {
}
fn op_seek(
- _state: &ThreadSafeState,
+ state: &ThreadSafeState,
args: Value,
_zero_copy: Option<PinnedBuf>,
) -> Result<JsonOp, ErrBox> {
@@ -177,7 +177,11 @@ fn op_seek(
}
};
- let fut = SeekFuture { seek_from, rid };
+ let fut = SeekFuture {
+ state: state.clone(),
+ seek_from,
+ rid,
+ };
let op = fut.and_then(move |_| futures::future::ok(json!({})));
if args.promise_id.is_none() {
diff --git a/cli/ops/io.rs b/cli/ops/io.rs
index 3ede4b411..959147f19 100644
--- a/cli/ops/io.rs
+++ b/cli/ops/io.rs
@@ -1,19 +1,101 @@
use super::dispatch_minimal::MinimalOp;
use crate::deno_error;
use crate::deno_error::bad_resource;
+use crate::http_body::HttpBody;
use crate::ops::minimal_op;
-use crate::resources;
-use crate::resources::CliResource;
-use crate::resources::DenoAsyncRead;
-use crate::resources::DenoAsyncWrite;
use crate::state::ThreadSafeState;
+use deno::ErrBox;
+use deno::Resource;
use deno::*;
+use futures;
use futures::Future;
use futures::Poll;
+use std;
+use tokio;
+use tokio::io::{AsyncRead, AsyncWrite};
+use tokio::net::TcpStream;
+use tokio_process;
+use tokio_rustls::client::TlsStream as ClientTlsStream;
+use tokio_rustls::server::TlsStream as ServerTlsStream;
+
+#[cfg(not(windows))]
+use std::os::unix::io::FromRawFd;
+
+#[cfg(windows)]
+use std::os::windows::io::FromRawHandle;
+
+#[cfg(windows)]
+extern crate winapi;
pub fn init(i: &mut Isolate, s: &ThreadSafeState) {
- i.register_op("read", s.core_op(minimal_op(op_read)));
- i.register_op("write", s.core_op(minimal_op(op_write)));
+ i.register_op(
+ "read",
+ s.core_op(minimal_op(s.stateful_minimal_op(op_read))),
+ );
+ i.register_op(
+ "write",
+ s.core_op(minimal_op(s.stateful_minimal_op(op_write))),
+ );
+}
+
+pub fn get_stdio() -> (StreamResource, StreamResource, StreamResource) {
+ let stdin = StreamResource::Stdin(tokio::io::stdin());
+ let stdout = StreamResource::Stdout({
+ #[cfg(not(windows))]
+ let stdout = unsafe { std::fs::File::from_raw_fd(1) };
+ #[cfg(windows)]
+ let stdout = unsafe {
+ std::fs::File::from_raw_handle(winapi::um::processenv::GetStdHandle(
+ winapi::um::winbase::STD_OUTPUT_HANDLE,
+ ))
+ };
+ tokio::fs::File::from_std(stdout)
+ });
+ let stderr = StreamResource::Stderr(tokio::io::stderr());
+
+ (stdin, stdout, stderr)
+}
+
+pub enum StreamResource {
+ Stdin(tokio::io::Stdin),
+ Stdout(tokio::fs::File),
+ Stderr(tokio::io::Stderr),
+ FsFile(tokio::fs::File),
+ TcpStream(tokio::net::TcpStream),
+ ServerTlsStream(Box<ServerTlsStream<TcpStream>>),
+ ClientTlsStream(Box<ClientTlsStream<TcpStream>>),
+ HttpBody(HttpBody),
+ ChildStdin(tokio_process::ChildStdin),
+ ChildStdout(tokio_process::ChildStdout),
+ ChildStderr(tokio_process::ChildStderr),
+}
+
+impl Resource for StreamResource {}
+
+/// `DenoAsyncRead` is the same as the `tokio_io::AsyncRead` trait
+/// but uses an `ErrBox` error instead of `std::io:Error`
+pub trait DenoAsyncRead {
+ fn poll_read(&mut self, buf: &mut [u8]) -> Poll<usize, ErrBox>;
+}
+
+impl DenoAsyncRead for StreamResource {
+ fn poll_read(&mut self, buf: &mut [u8]) -> Poll<usize, ErrBox> {
+ let r = match self {
+ StreamResource::FsFile(ref mut f) => f.poll_read(buf),
+ StreamResource::Stdin(ref mut f) => f.poll_read(buf),
+ StreamResource::TcpStream(ref mut f) => f.poll_read(buf),
+ StreamResource::ClientTlsStream(ref mut f) => f.poll_read(buf),
+ StreamResource::ServerTlsStream(ref mut f) => f.poll_read(buf),
+ StreamResource::HttpBody(ref mut f) => f.poll_read(buf),
+ StreamResource::ChildStdout(ref mut f) => f.poll_read(buf),
+ StreamResource::ChildStderr(ref mut f) => f.poll_read(buf),
+ _ => {
+ return Err(bad_resource());
+ }
+ };
+
+ r.map_err(ErrBox::from)
+ }
}
#[derive(Debug, PartialEq)]
@@ -27,14 +109,15 @@ enum IoState {
///
/// The returned future will resolve to both the I/O stream and the buffer
/// as well as the number of bytes read once the read operation is completed.
-pub fn read<T>(rid: ResourceId, buf: T) -> Read<T>
+pub fn read<T>(state: &ThreadSafeState, rid: ResourceId, buf: T) -> Read<T>
where
T: AsMut<[u8]>,
{
Read {
rid,
buf,
- state: IoState::Pending,
+ io_state: IoState::Pending,
+ state: state.clone(),
}
}
@@ -42,11 +125,11 @@ where
/// a buffer.
///
/// Created by the [`read`] function.
-#[derive(Debug)]
pub struct Read<T> {
rid: ResourceId,
buf: T,
- state: IoState,
+ io_state: IoState,
+ state: ThreadSafeState,
}
impl<T> Future for Read<T>
@@ -57,21 +140,25 @@ where
type Error = ErrBox;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
- if self.state == IoState::Done {
+ if self.io_state == IoState::Done {
panic!("poll a Read after it's done");
}
- let mut table = resources::lock_resource_table();
+ let mut table = self.state.lock_resource_table();
let resource = table
- .get_mut::<CliResource>(self.rid)
+ .get_mut::<StreamResource>(self.rid)
.ok_or_else(bad_resource)?;
let nread = try_ready!(resource.poll_read(&mut self.buf.as_mut()[..]));
- self.state = IoState::Done;
+ self.io_state = IoState::Done;
Ok(nread.into())
}
}
-pub fn op_read(rid: i32, zero_copy: Option<PinnedBuf>) -> Box<MinimalOp> {
+pub fn op_read(
+ state: &ThreadSafeState,
+ rid: i32,
+ zero_copy: Option<PinnedBuf>,
+) -> Box<MinimalOp> {
debug!("read rid={}", rid);
let zero_copy = match zero_copy {
None => {
@@ -80,19 +167,50 @@ pub fn op_read(rid: i32, zero_copy: Option<PinnedBuf>) -> Box<MinimalOp> {
Some(buf) => buf,
};
- let fut = read(rid as u32, zero_copy)
+ let fut = read(state, rid as u32, zero_copy)
.map_err(ErrBox::from)
.and_then(move |nread| Ok(nread as i32));
Box::new(fut)
}
+/// `DenoAsyncWrite` is the same as the `tokio_io::AsyncWrite` trait
+/// but uses an `ErrBox` error instead of `std::io:Error`
+pub trait DenoAsyncWrite {
+ fn poll_write(&mut self, buf: &[u8]) -> Poll<usize, ErrBox>;
+
+ fn shutdown(&mut self) -> Poll<(), ErrBox>;
+}
+
+impl DenoAsyncWrite for StreamResource {
+ fn poll_write(&mut self, buf: &[u8]) -> Poll<usize, ErrBox> {
+ let r = match self {
+ StreamResource::FsFile(ref mut f) => f.poll_write(buf),
+ StreamResource::Stdout(ref mut f) => f.poll_write(buf),
+ StreamResource::Stderr(ref mut f) => f.poll_write(buf),
+ StreamResource::TcpStream(ref mut f) => f.poll_write(buf),
+ StreamResource::ClientTlsStream(ref mut f) => f.poll_write(buf),
+ StreamResource::ServerTlsStream(ref mut f) => f.poll_write(buf),
+ StreamResource::ChildStdin(ref mut f) => f.poll_write(buf),
+ _ => {
+ return Err(bad_resource());
+ }
+ };
+
+ r.map_err(ErrBox::from)
+ }
+
+ fn shutdown(&mut self) -> futures::Poll<(), ErrBox> {
+ unimplemented!()
+ }
+}
+
/// A future used to write some data to a stream.
-#[derive(Debug)]
pub struct Write<T> {
rid: ResourceId,
buf: T,
- state: IoState,
+ io_state: IoState,
+ state: ThreadSafeState,
}
/// Creates a future that will write some of the buffer `buf` to
@@ -100,14 +218,15 @@ pub struct Write<T> {
///
/// Any error which happens during writing will cause both the stream and the
/// buffer to get destroyed.
-pub fn write<T>(rid: ResourceId, buf: T) -> Write<T>
+pub fn write<T>(state: &ThreadSafeState, rid: ResourceId, buf: T) -> Write<T>
where
T: AsRef<[u8]>,
{
Write {
rid,
buf,
- state: IoState::Pending,
+ io_state: IoState::Pending,
+ state: state.clone(),
}
}
@@ -121,21 +240,25 @@ where
type Error = ErrBox;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
- if self.state == IoState::Done {
+ if self.io_state == IoState::Done {
panic!("poll a Read after it's done");
}
- let mut table = resources::lock_resource_table();
+ let mut table = self.state.lock_resource_table();
let resource = table
- .get_mut::<CliResource>(self.rid)
+ .get_mut::<StreamResource>(self.rid)
.ok_or_else(bad_resource)?;
let nwritten = try_ready!(resource.poll_write(self.buf.as_ref()));
- self.state = IoState::Done;
+ self.io_state = IoState::Done;
Ok(nwritten.into())
}
}
-pub fn op_write(rid: i32, zero_copy: Option<PinnedBuf>) -> Box<MinimalOp> {
+pub fn op_write(
+ state: &ThreadSafeState,
+ rid: i32,
+ zero_copy: Option<PinnedBuf>,
+) -> Box<MinimalOp> {
debug!("write rid={}", rid);
let zero_copy = match zero_copy {
None => {
@@ -144,7 +267,7 @@ pub fn op_write(rid: i32, zero_copy: Option<PinnedBuf>) -> Box<MinimalOp> {
Some(buf) => buf,
};
- let fut = write(rid as u32, zero_copy)
+ let fut = write(state, rid as u32, zero_copy)
.map_err(ErrBox::from)
.and_then(move |nwritten| Ok(nwritten as i32));
diff --git a/cli/ops/mod.rs b/cli/ops/mod.rs
index 4e6eb37c8..9b33d5918 100644
--- a/cli/ops/mod.rs
+++ b/cli/ops/mod.rs
@@ -5,6 +5,7 @@ mod dispatch_minimal;
pub use dispatch_json::json_op;
pub use dispatch_json::JsonOp;
pub use dispatch_minimal::minimal_op;
+pub use dispatch_minimal::MinimalOp;
pub mod compiler;
pub mod errors;
diff --git a/cli/ops/net.rs b/cli/ops/net.rs
index a4b3bf934..2fe81e140 100644
--- a/cli/ops/net.rs
+++ b/cli/ops/net.rs
@@ -1,12 +1,11 @@
// Copyright 2018-2019 the Deno authors. All rights reserved. MIT license.
use super::dispatch_json::{Deserialize, JsonOp, Value};
+use super::io::StreamResource;
use crate::deno_error::bad_resource;
use crate::ops::json_op;
use crate::resolve_addr::resolve_addr;
-use crate::resources;
-use crate::resources::CliResource;
-use crate::resources::Resource;
use crate::state::ThreadSafeState;
+use deno::Resource;
use deno::*;
use futures::Async;
use futures::Future;
@@ -34,18 +33,19 @@ enum AcceptState {
}
/// Simply accepts a connection.
-pub fn accept(rid: ResourceId) -> Accept {
+pub fn accept(state: &ThreadSafeState, rid: ResourceId) -> Accept {
Accept {
- state: AcceptState::Eager,
+ accept_state: AcceptState::Eager,
rid,
+ state: state.clone(),
}
}
/// A future representing state of accepting a TCP connection.
-#[derive(Debug)]
pub struct Accept {
- state: AcceptState,
+ accept_state: AcceptState,
rid: ResourceId,
+ state: ThreadSafeState,
}
impl Future for Accept {
@@ -53,11 +53,11 @@ impl Future for Accept {
type Error = ErrBox;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
- if self.state == AcceptState::Done {
+ if self.accept_state == AcceptState::Done {
panic!("poll Accept after it's done");
}
- let mut table = resources::lock_resource_table();
+ let mut table = self.state.lock_resource_table();
let listener_resource = table
.get_mut::<TcpListenerResource>(self.rid)
.ok_or_else(|| {
@@ -70,22 +70,22 @@ impl Future for Accept {
let listener = &mut listener_resource.listener;
- if self.state == AcceptState::Eager {
+ if self.accept_state == AcceptState::Eager {
// Similar to try_ready!, but also track/untrack accept task
// in TcpListener resource.
// In this way, when the listener is closed, the task can be
// notified to error out (instead of stuck forever).
match listener.poll_accept().map_err(ErrBox::from) {
Ok(Async::Ready((stream, addr))) => {
- self.state = AcceptState::Done;
+ self.accept_state = AcceptState::Done;
return Ok((stream, addr).into());
}
Ok(Async::NotReady) => {
- self.state = AcceptState::Pending;
+ self.accept_state = AcceptState::Pending;
return Ok(Async::NotReady);
}
Err(e) => {
- self.state = AcceptState::Done;
+ self.accept_state = AcceptState::Done;
return Err(e);
}
}
@@ -94,7 +94,7 @@ impl Future for Accept {
match listener.poll_accept().map_err(ErrBox::from) {
Ok(Async::Ready((stream, addr))) => {
listener_resource.untrack_task();
- self.state = AcceptState::Done;
+ self.accept_state = AcceptState::Done;
Ok((stream, addr).into())
}
Ok(Async::NotReady) => {
@@ -103,7 +103,7 @@ impl Future for Accept {
}
Err(e) => {
listener_resource.untrack_task();
- self.state = AcceptState::Done;
+ self.accept_state = AcceptState::Done;
Err(e)
}
}
@@ -116,23 +116,25 @@ struct AcceptArgs {
}
fn op_accept(
- _state: &ThreadSafeState,
+ state: &ThreadSafeState,
args: Value,
_zero_copy: Option<PinnedBuf>,
) -> Result<JsonOp, ErrBox> {
let args: AcceptArgs = serde_json::from_value(args)?;
let rid = args.rid as u32;
-
- let table = resources::lock_resource_table();
+ let state_ = state.clone();
+ let table = state.lock_resource_table();
table
.get::<TcpListenerResource>(rid)
.ok_or_else(bad_resource)?;
- let op = accept(rid)
+ let op = accept(state, rid)
.and_then(move |(tcp_stream, _socket_addr)| {
let local_addr = tcp_stream.local_addr()?;
let remote_addr = tcp_stream.peer_addr()?;
- let rid = resources::add_tcp_stream(tcp_stream);
+ let mut table = state_.lock_resource_table();
+ let rid =
+ table.add("tcpStream", Box::new(StreamResource::TcpStream(tcp_stream)));
Ok((rid, local_addr, remote_addr))
})
.map_err(ErrBox::from)
@@ -161,7 +163,7 @@ fn op_dial(
) -> Result<JsonOp, ErrBox> {
let args: DialArgs = serde_json::from_value(args)?;
assert_eq!(args.transport, "tcp"); // TODO Support others.
-
+ let state_ = state.clone();
state.check_net(&args.hostname, args.port)?;
let op = resolve_addr(&args.hostname, args.port).and_then(move |addr| {
@@ -170,7 +172,9 @@ fn op_dial(
.and_then(move |tcp_stream| {
let local_addr = tcp_stream.local_addr()?;
let remote_addr = tcp_stream.peer_addr()?;
- let rid = resources::add_tcp_stream(tcp_stream);
+ let mut table = state_.lock_resource_table();
+ let rid = table
+ .add("tcpStream", Box::new(StreamResource::TcpStream(tcp_stream)));
Ok((rid, local_addr, remote_addr))
})
.map_err(ErrBox::from)
@@ -193,7 +197,7 @@ struct ShutdownArgs {
}
fn op_shutdown(
- _state: &ThreadSafeState,
+ state: &ThreadSafeState,
args: Value,
_zero_copy: Option<PinnedBuf>,
) -> Result<JsonOp, ErrBox> {
@@ -208,10 +212,12 @@ fn op_shutdown(
_ => unimplemented!(),
};
- let mut table = resources::lock_resource_table();
- let resource = table.get_mut::<CliResource>(rid).ok_or_else(bad_resource)?;
+ let mut table = state.lock_resource_table();
+ let resource = table
+ .get_mut::<StreamResource>(rid)
+ .ok_or_else(bad_resource)?;
match resource {
- CliResource::TcpStream(ref mut stream) => {
+ StreamResource::TcpStream(ref mut stream) => {
TcpStream::shutdown(stream, shutdown_mode).map_err(ErrBox::from)?;
}
_ => return Err(bad_resource()),
@@ -299,7 +305,7 @@ fn op_listen(
task: None,
local_addr,
};
- let mut table = resources::lock_resource_table();
+ let mut table = state.lock_resource_table();
let rid = table.add("tcpListener", Box::new(listener_resource));
Ok(JsonOp::Sync(json!({
diff --git a/cli/ops/process.rs b/cli/ops/process.rs
index f7897ec51..237b02fd0 100644
--- a/cli/ops/process.rs
+++ b/cli/ops/process.rs
@@ -1,9 +1,8 @@
// Copyright 2018-2019 the Deno authors. All rights reserved. MIT license.
use super::dispatch_json::{Deserialize, JsonOp, Value};
+use super::io::StreamResource;
use crate::deno_error::bad_resource;
use crate::ops::json_op;
-use crate::resources;
-use crate::resources::CloneFileFuture;
use crate::signal::kill;
use crate::state::ThreadSafeState;
use deno::*;
@@ -28,6 +27,41 @@ pub fn init(i: &mut Isolate, s: &ThreadSafeState) {
i.register_op("kill", s.core_op(json_op(s.stateful_op(op_kill))));
}
+struct CloneFileFuture {
+ rid: ResourceId,
+ state: ThreadSafeState,
+}
+
+impl Future for CloneFileFuture {
+ type Item = tokio::fs::File;
+ type Error = ErrBox;
+
+ fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
+ let mut table = self.state.lock_resource_table();
+ let repr = table
+ .get_mut::<StreamResource>(self.rid)
+ .ok_or_else(bad_resource)?;
+ match repr {
+ StreamResource::FsFile(ref mut file) => {
+ file.poll_try_clone().map_err(ErrBox::from)
+ }
+ _ => Err(bad_resource()),
+ }
+ }
+}
+
+fn clone_file(
+ rid: u32,
+ state: &ThreadSafeState,
+) -> Result<std::fs::File, ErrBox> {
+ (CloneFileFuture {
+ rid,
+ state: state.clone(),
+ })
+ .wait()
+ .map(|f| f.into_std())
+}
+
fn subprocess_stdio_map(s: &str) -> std::process::Stdio {
match s {
"inherit" => std::process::Stdio::inherit(),
@@ -65,6 +99,7 @@ fn op_run(
let run_args: RunArgs = serde_json::from_value(args)?;
state.check_run()?;
+ let state_ = state.clone();
let args = run_args.args;
let env = run_args.env;
@@ -83,7 +118,7 @@ fn op_run(
// TODO: make this work with other resources, eg. sockets
let stdin_rid = run_args.stdin_rid;
if stdin_rid > 0 {
- let file = (CloneFileFuture { rid: stdin_rid }).wait()?.into_std();
+ let file = clone_file(stdin_rid, &state_)?;
c.stdin(file);
} else {
c.stdin(subprocess_stdio_map(run_args.stdin.as_ref()));
@@ -91,7 +126,7 @@ fn op_run(
let stdout_rid = run_args.stdout_rid;
if stdout_rid > 0 {
- let file = (CloneFileFuture { rid: stdout_rid }).wait()?.into_std();
+ let file = clone_file(stdout_rid, &state_)?;
c.stdout(file);
} else {
c.stdout(subprocess_stdio_map(run_args.stdout.as_ref()));
@@ -99,7 +134,7 @@ fn op_run(
let stderr_rid = run_args.stderr_rid;
if stderr_rid > 0 {
- let file = (CloneFileFuture { rid: stderr_rid }).wait()?.into_std();
+ let file = clone_file(stderr_rid, &state_)?;
c.stderr(file);
} else {
c.stderr(subprocess_stdio_map(run_args.stderr.as_ref()));
@@ -109,29 +144,42 @@ fn op_run(
let mut child = c.spawn_async().map_err(ErrBox::from)?;
let pid = child.id();
- let stdin_rid = if child.stdin().is_some() {
- let rid = resources::add_child_stdin(child.stdin().take().unwrap());
- Some(rid)
- } else {
- None
+ let mut table = state_.lock_resource_table();
+
+ let stdin_rid = match child.stdin().take() {
+ Some(child_stdin) => {
+ let rid = table.add(
+ "childStdin",
+ Box::new(StreamResource::ChildStdin(child_stdin)),
+ );
+ Some(rid)
+ }
+ None => None,
};
- let stdout_rid = if child.stdout().is_some() {
- let rid = resources::add_child_stdout(child.stdout().take().unwrap());
- Some(rid)
- } else {
- None
+ let stdout_rid = match child.stdout().take() {
+ Some(child_stdout) => {
+ let rid = table.add(
+ "childStdout",
+ Box::new(StreamResource::ChildStdout(child_stdout)),
+ );
+ Some(rid)
+ }
+ None => None,
};
- let stderr_rid = if child.stderr().is_some() {
- let rid = resources::add_child_stderr(child.stderr().take().unwrap());
- Some(rid)
- } else {
- None
+ let stderr_rid = match child.stderr().take() {
+ Some(child_stderr) => {
+ let rid = table.add(
+ "childStderr",
+ Box::new(StreamResource::ChildStderr(child_stderr)),
+ );
+ Some(rid)
+ }
+ None => None,
};
let child_resource = ChildResource { child };
- let mut table = resources::lock_resource_table();
let child_rid = table.add("child", Box::new(child_resource));
Ok(JsonOp::Sync(json!({
@@ -145,6 +193,7 @@ fn op_run(
pub struct ChildStatus {
rid: ResourceId,
+ state: ThreadSafeState,
}
impl Future for ChildStatus {
@@ -152,7 +201,7 @@ impl Future for ChildStatus {
type Error = ErrBox;
fn poll(&mut self) -> Poll<ExitStatus, ErrBox> {
- let mut table = resources::lock_resource_table();
+ let mut table = self.state.lock_resource_table();
let child_resource = table
.get_mut::<ChildResource>(self.rid)
.ok_or_else(bad_resource)?;
@@ -177,7 +226,10 @@ fn op_run_status(
state.check_run()?;
- let future = ChildStatus { rid };
+ let future = ChildStatus {
+ rid,
+ state: state.clone(),
+ };
let future = future.and_then(move |run_status| {
let code = run_status.code();
diff --git a/cli/ops/repl.rs b/cli/ops/repl.rs
index 723fb2571..5919ea586 100644
--- a/cli/ops/repl.rs
+++ b/cli/ops/repl.rs
@@ -4,9 +4,8 @@ use crate::deno_error::bad_resource;
use crate::ops::json_op;
use crate::repl;
use crate::repl::Repl;
-use crate::resources;
-use crate::resources::Resource;
use crate::state::ThreadSafeState;
+use deno::Resource;
use deno::*;
use std::sync::Arc;
use std::sync::Mutex;
@@ -44,7 +43,7 @@ fn op_repl_start(
repl::history_path(&state.global_state.dir, &args.history_file);
let repl = repl::Repl::new(history_path);
let resource = ReplResource(Arc::new(Mutex::new(repl)));
- let mut table = resources::lock_resource_table();
+ let mut table = state.lock_resource_table();
let rid = table.add("repl", Box::new(resource));
Ok(JsonOp::Sync(json!(rid)))
}
@@ -56,7 +55,7 @@ struct ReplReadlineArgs {
}
fn op_repl_readline(
- _state: &ThreadSafeState,
+ state: &ThreadSafeState,
args: Value,
_zero_copy: Option<PinnedBuf>,
) -> Result<JsonOp, ErrBox> {
@@ -64,9 +63,10 @@ fn op_repl_readline(
let rid = args.rid as u32;
let prompt = args.prompt;
debug!("op_repl_readline {} {}", rid, prompt);
+ let state = state.clone();
blocking_json(false, move || {
- let table = resources::lock_resource_table();
+ let table = state.lock_resource_table();
let resource = table.get::<ReplResource>(rid).ok_or_else(bad_resource)?;
let repl = resource.0.clone();
let line = repl.lock().unwrap().readline(&prompt)?;
diff --git a/cli/ops/resources.rs b/cli/ops/resources.rs
index d92c6a83c..c35e9762c 100644
--- a/cli/ops/resources.rs
+++ b/cli/ops/resources.rs
@@ -1,7 +1,6 @@
// Copyright 2018-2019 the Deno authors. All rights reserved. MIT license.
use super::dispatch_json::{JsonOp, Value};
use crate::ops::json_op;
-use crate::resources::lock_resource_table;
use crate::state::ThreadSafeState;
use deno::*;
@@ -10,11 +9,11 @@ pub fn init(i: &mut Isolate, s: &ThreadSafeState) {
}
fn op_resources(
- _state: &ThreadSafeState,
+ state: &ThreadSafeState,
_args: Value,
_zero_copy: Option<PinnedBuf>,
) -> Result<JsonOp, ErrBox> {
- let resource_table = lock_resource_table();
+ let resource_table = state.lock_resource_table();
let serialized_resources = resource_table.entries();
Ok(JsonOp::Sync(json!(serialized_resources)))
}
diff --git a/cli/ops/tls.rs b/cli/ops/tls.rs
index 6e8348c91..48419f76f 100644
--- a/cli/ops/tls.rs
+++ b/cli/ops/tls.rs
@@ -1,13 +1,13 @@
// Copyright 2018-2019 the Deno authors. All rights reserved. MIT license.
use super::dispatch_json::{Deserialize, JsonOp, Value};
+use super::io::StreamResource;
use crate::deno_error::bad_resource;
use crate::deno_error::DenoError;
use crate::deno_error::ErrorKind;
use crate::ops::json_op;
use crate::resolve_addr::resolve_addr;
-use crate::resources;
-use crate::resources::Resource;
use crate::state::ThreadSafeState;
+use deno::Resource;
use deno::*;
use futures::Async;
use futures::Future;
@@ -60,7 +60,7 @@ pub fn op_dial_tls(
) -> Result<JsonOp, ErrBox> {
let args: DialTLSArgs = serde_json::from_value(args)?;
let cert_file = args.cert_file;
-
+ let state_ = state.clone();
state.check_net(&args.hostname, args.port)?;
if let Some(path) = cert_file.clone() {
state.check_read(&path)?;
@@ -99,7 +99,11 @@ pub fn op_dial_tls(
.connect(dnsname, tcp_stream)
.map_err(ErrBox::from)
.and_then(move |tls_stream| {
- let rid = resources::add_tls_stream(tls_stream);
+ let mut table = state_.lock_resource_table();
+ let rid = table.add(
+ "clientTlsStream",
+ Box::new(StreamResource::ClientTlsStream(Box::new(tls_stream))),
+ );
futures::future::ok(json!({
"rid": rid,
"localAddr": local_addr.to_string(),
@@ -265,7 +269,7 @@ fn op_listen_tls(
task: None,
local_addr,
};
- let mut table = resources::lock_resource_table();
+ let mut table = state.lock_resource_table();
let rid = table.add("tlsListener", Box::new(tls_listener_resource));
Ok(JsonOp::Sync(json!({
@@ -282,18 +286,19 @@ enum AcceptTlsState {
}
/// Simply accepts a TLS connection.
-pub fn accept_tls(rid: ResourceId) -> AcceptTls {
+pub fn accept_tls(state: &ThreadSafeState, rid: ResourceId) -> AcceptTls {
AcceptTls {
- state: AcceptTlsState::Eager,
+ accept_state: AcceptTlsState::Eager,
rid,
+ state: state.clone(),
}
}
/// A future representing state of accepting a TLS connection.
-#[derive(Debug)]
pub struct AcceptTls {
- state: AcceptTlsState,
+ accept_state: AcceptTlsState,
rid: ResourceId,
+ state: ThreadSafeState,
}
impl Future for AcceptTls {
@@ -301,11 +306,11 @@ impl Future for AcceptTls {
type Error = ErrBox;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
- if self.state == AcceptTlsState::Done {
+ if self.accept_state == AcceptTlsState::Done {
panic!("poll AcceptTls after it's done");
}
- let mut table = resources::lock_resource_table();
+ let mut table = self.state.lock_resource_table();
let listener_resource = table
.get_mut::<TlsListenerResource>(self.rid)
.ok_or_else(|| {
@@ -318,22 +323,22 @@ impl Future for AcceptTls {
let listener = &mut listener_resource.listener;
- if self.state == AcceptTlsState::Eager {
+ if self.accept_state == AcceptTlsState::Eager {
// Similar to try_ready!, but also track/untrack accept task
// in TcpListener resource.
// In this way, when the listener is closed, the task can be
// notified to error out (instead of stuck forever).
match listener.poll_accept().map_err(ErrBox::from) {
Ok(Async::Ready((stream, addr))) => {
- self.state = AcceptTlsState::Done;
+ self.accept_state = AcceptTlsState::Done;
return Ok((stream, addr).into());
}
Ok(Async::NotReady) => {
- self.state = AcceptTlsState::Pending;
+ self.accept_state = AcceptTlsState::Pending;
return Ok(Async::NotReady);
}
Err(e) => {
- self.state = AcceptTlsState::Done;
+ self.accept_state = AcceptTlsState::Done;
return Err(e);
}
}
@@ -342,7 +347,7 @@ impl Future for AcceptTls {
match listener.poll_accept().map_err(ErrBox::from) {
Ok(Async::Ready((stream, addr))) => {
listener_resource.untrack_task();
- self.state = AcceptTlsState::Done;
+ self.accept_state = AcceptTlsState::Done;
Ok((stream, addr).into())
}
Ok(Async::NotReady) => {
@@ -351,7 +356,7 @@ impl Future for AcceptTls {
}
Err(e) => {
listener_resource.untrack_task();
- self.state = AcceptTlsState::Done;
+ self.accept_state = AcceptTlsState::Done;
Err(e)
}
}
@@ -364,21 +369,22 @@ struct AcceptTlsArgs {
}
fn op_accept_tls(
- _state: &ThreadSafeState,
+ state: &ThreadSafeState,
args: Value,
_zero_copy: Option<PinnedBuf>,
) -> Result<JsonOp, ErrBox> {
let args: AcceptTlsArgs = serde_json::from_value(args)?;
let rid = args.rid as u32;
-
- let op = accept_tls(rid)
+ let state1 = state.clone();
+ let state2 = state.clone();
+ let op = accept_tls(state, rid)
.and_then(move |(tcp_stream, _socket_addr)| {
let local_addr = tcp_stream.local_addr()?;
let remote_addr = tcp_stream.peer_addr()?;
Ok((tcp_stream, local_addr, remote_addr))
})
.and_then(move |(tcp_stream, local_addr, remote_addr)| {
- let table = resources::lock_resource_table();
+ let table = state1.lock_resource_table();
let resource = table
.get::<TlsListenerResource>(rid)
.ok_or_else(bad_resource)
@@ -389,7 +395,11 @@ fn op_accept_tls(
.accept(tcp_stream)
.map_err(ErrBox::from)
.and_then(move |tls_stream| {
- let rid = resources::add_server_tls_stream(tls_stream);
+ let mut table = state2.lock_resource_table();
+ let rid = table.add(
+ "serverTlsStream",
+ Box::new(StreamResource::ServerTlsStream(Box::new(tls_stream))),
+ );
Ok((rid, local_addr, remote_addr))
})
})
diff --git a/cli/resources.rs b/cli/resources.rs
deleted file mode 100644
index db9b43eeb..000000000
--- a/cli/resources.rs
+++ /dev/null
@@ -1,209 +0,0 @@
-// Copyright 2018-2019 the Deno authors. All rights reserved. MIT license.
-
-// Think of Resources as File Descriptors. They are integers that are allocated
-// by the privileged side of Deno to refer to various resources. The simplest
-// example are standard file system files and stdio - but there will be other
-// resources added in the future that might not correspond to operating system
-// level File Descriptors. To avoid confusion we call them "resources" not "file
-// descriptors". This module implements a global resource table. Ops (AKA
-// handlers) look up resources by their integer id here.
-
-use crate::deno_error::bad_resource;
-use crate::http_body::HttpBody;
-use deno::ErrBox;
-pub use deno::Resource;
-pub use deno::ResourceId;
-use deno::ResourceTable;
-
-use futures;
-use futures::Future;
-use futures::Poll;
-use reqwest::r#async::Decoder as ReqwestDecoder;
-use std;
-use std::sync::Mutex;
-use std::sync::MutexGuard;
-use tokio;
-use tokio::io::{AsyncRead, AsyncWrite};
-use tokio::net::TcpStream;
-use tokio_process;
-use tokio_rustls::client::TlsStream as ClientTlsStream;
-use tokio_rustls::server::TlsStream as ServerTlsStream;
-
-#[cfg(not(windows))]
-use std::os::unix::io::FromRawFd;
-
-#[cfg(windows)]
-use std::os::windows::io::FromRawHandle;
-
-#[cfg(windows)]
-extern crate winapi;
-
-lazy_static! {
- static ref RESOURCE_TABLE: Mutex<ResourceTable> = Mutex::new({
- let mut table = ResourceTable::default();
-
- // TODO Load these lazily during lookup?
- table.add("stdin", Box::new(CliResource::Stdin(tokio::io::stdin())));
-
- table.add("stdout", Box::new(CliResource::Stdout({
- #[cfg(not(windows))]
- let stdout = unsafe { std::fs::File::from_raw_fd(1) };
- #[cfg(windows)]
- let stdout = unsafe {
- std::fs::File::from_raw_handle(winapi::um::processenv::GetStdHandle(
- winapi::um::winbase::STD_OUTPUT_HANDLE))
- };
- tokio::fs::File::from_std(stdout)
- })));
-
- table.add("stderr", Box::new(CliResource::Stderr(tokio::io::stderr())));
- table
- });
-}
-
-// TODO: rename to `StreamResource`
-pub enum CliResource {
- Stdin(tokio::io::Stdin),
- Stdout(tokio::fs::File),
- Stderr(tokio::io::Stderr),
- FsFile(tokio::fs::File),
- TcpStream(tokio::net::TcpStream),
- ServerTlsStream(Box<ServerTlsStream<TcpStream>>),
- ClientTlsStream(Box<ClientTlsStream<TcpStream>>),
- HttpBody(HttpBody),
- ChildStdin(tokio_process::ChildStdin),
- ChildStdout(tokio_process::ChildStdout),
- ChildStderr(tokio_process::ChildStderr),
-}
-
-impl Resource for CliResource {}
-
-pub fn lock_resource_table<'a>() -> MutexGuard<'a, ResourceTable> {
- RESOURCE_TABLE.lock().unwrap()
-}
-
-/// `DenoAsyncRead` is the same as the `tokio_io::AsyncRead` trait
-/// but uses an `ErrBox` error instead of `std::io:Error`
-pub trait DenoAsyncRead {
- fn poll_read(&mut self, buf: &mut [u8]) -> Poll<usize, ErrBox>;
-}
-
-impl DenoAsyncRead for CliResource {
- fn poll_read(&mut self, buf: &mut [u8]) -> Poll<usize, ErrBox> {
- let r = match self {
- CliResource::FsFile(ref mut f) => f.poll_read(buf),
- CliResource::Stdin(ref mut f) => f.poll_read(buf),
- CliResource::TcpStream(ref mut f) => f.poll_read(buf),
- CliResource::ClientTlsStream(ref mut f) => f.poll_read(buf),
- CliResource::ServerTlsStream(ref mut f) => f.poll_read(buf),
- CliResource::HttpBody(ref mut f) => f.poll_read(buf),
- CliResource::ChildStdout(ref mut f) => f.poll_read(buf),
- CliResource::ChildStderr(ref mut f) => f.poll_read(buf),
- _ => {
- return Err(bad_resource());
- }
- };
-
- r.map_err(ErrBox::from)
- }
-}
-
-/// `DenoAsyncWrite` is the same as the `tokio_io::AsyncWrite` trait
-/// but uses an `ErrBox` error instead of `std::io:Error`
-pub trait DenoAsyncWrite {
- fn poll_write(&mut self, buf: &[u8]) -> Poll<usize, ErrBox>;
-
- fn shutdown(&mut self) -> Poll<(), ErrBox>;
-}
-
-impl DenoAsyncWrite for CliResource {
- fn poll_write(&mut self, buf: &[u8]) -> Poll<usize, ErrBox> {
- let r = match self {
- CliResource::FsFile(ref mut f) => f.poll_write(buf),
- CliResource::Stdout(ref mut f) => f.poll_write(buf),
- CliResource::Stderr(ref mut f) => f.poll_write(buf),
- CliResource::TcpStream(ref mut f) => f.poll_write(buf),
- CliResource::ClientTlsStream(ref mut f) => f.poll_write(buf),
- CliResource::ServerTlsStream(ref mut f) => f.poll_write(buf),
- CliResource::ChildStdin(ref mut f) => f.poll_write(buf),
- _ => {
- return Err(bad_resource());
- }
- };
-
- r.map_err(ErrBox::from)
- }
-
- fn shutdown(&mut self) -> futures::Poll<(), ErrBox> {
- unimplemented!()
- }
-}
-
-pub fn add_fs_file(fs_file: tokio::fs::File) -> ResourceId {
- let mut table = lock_resource_table();
- table.add("fsFile", Box::new(CliResource::FsFile(fs_file)))
-}
-
-pub fn add_tcp_stream(stream: tokio::net::TcpStream) -> ResourceId {
- let mut table = lock_resource_table();
- table.add("tcpStream", Box::new(CliResource::TcpStream(stream)))
-}
-
-pub fn add_tls_stream(stream: ClientTlsStream<TcpStream>) -> ResourceId {
- let mut table = lock_resource_table();
- table.add(
- "clientTlsStream",
- Box::new(CliResource::ClientTlsStream(Box::new(stream))),
- )
-}
-
-pub fn add_server_tls_stream(stream: ServerTlsStream<TcpStream>) -> ResourceId {
- let mut table = lock_resource_table();
- table.add(
- "serverTlsStream",
- Box::new(CliResource::ServerTlsStream(Box::new(stream))),
- )
-}
-
-pub fn add_reqwest_body(body: ReqwestDecoder) -> ResourceId {
- let body = HttpBody::from(body);
- let mut table = lock_resource_table();
- table.add("httpBody", Box::new(CliResource::HttpBody(body)))
-}
-
-pub fn add_child_stdin(stdin: tokio_process::ChildStdin) -> ResourceId {
- let mut table = lock_resource_table();
- table.add("childStdin", Box::new(CliResource::ChildStdin(stdin)))
-}
-
-pub fn add_child_stdout(stdout: tokio_process::ChildStdout) -> ResourceId {
- let mut table = lock_resource_table();
- table.add("childStdout", Box::new(CliResource::ChildStdout(stdout)))
-}
-
-pub fn add_child_stderr(stderr: tokio_process::ChildStderr) -> ResourceId {
- let mut table = lock_resource_table();
- table.add("childStderr", Box::new(CliResource::ChildStderr(stderr)))
-}
-
-pub struct CloneFileFuture {
- pub rid: ResourceId,
-}
-
-impl Future for CloneFileFuture {
- type Item = tokio::fs::File;
- type Error = ErrBox;
-
- fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
- let mut table = lock_resource_table();
- let repr = table
- .get_mut::<CliResource>(self.rid)
- .ok_or_else(bad_resource)?;
- match repr {
- CliResource::FsFile(ref mut file) => {
- file.poll_try_clone().map_err(ErrBox::from)
- }
- _ => Err(bad_resource()),
- }
- }
-}
diff --git a/cli/state.rs b/cli/state.rs
index edfac72c0..a5e9546b0 100644
--- a/cli/state.rs
+++ b/cli/state.rs
@@ -5,6 +5,7 @@ use crate::global_timer::GlobalTimer;
use crate::import_map::ImportMap;
use crate::metrics::Metrics;
use crate::ops::JsonOp;
+use crate::ops::MinimalOp;
use crate::permissions::DenoPermissions;
use crate::worker::Worker;
use crate::worker::WorkerChannels;
@@ -15,6 +16,7 @@ use deno::Loader;
use deno::ModuleSpecifier;
use deno::Op;
use deno::PinnedBuf;
+use deno::ResourceTable;
use futures::Future;
use rand::rngs::StdRng;
use rand::SeedableRng;
@@ -27,6 +29,7 @@ use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::sync::Mutex;
+use std::sync::MutexGuard;
use std::time::Instant;
use tokio::sync::mpsc;
@@ -52,6 +55,7 @@ pub struct State {
pub start_time: Instant,
pub seeded_rng: Option<Mutex<StdRng>>,
pub include_deno_namespace: bool,
+ pub resource_table: Mutex<ResourceTable>,
}
impl Clone for ThreadSafeState {
@@ -68,6 +72,10 @@ impl Deref for ThreadSafeState {
}
impl ThreadSafeState {
+ pub fn lock_resource_table(&self) -> MutexGuard<ResourceTable> {
+ self.resource_table.lock().unwrap()
+ }
+
/// Wrap core `OpDispatcher` to collect metrics.
pub fn core_op<D>(
&self,
@@ -104,6 +112,21 @@ impl ThreadSafeState {
}
/// This is a special function that provides `state` argument to dispatcher.
+ pub fn stateful_minimal_op<D>(
+ &self,
+ dispatcher: D,
+ ) -> impl Fn(i32, Option<PinnedBuf>) -> Box<MinimalOp>
+ where
+ D: Fn(&ThreadSafeState, i32, Option<PinnedBuf>) -> Box<MinimalOp>,
+ {
+ let state = self.clone();
+
+ move |rid: i32, zero_copy: Option<PinnedBuf>| -> Box<MinimalOp> {
+ dispatcher(&state, rid, zero_copy)
+ }
+ }
+
+ /// This is a special function that provides `state` argument to dispatcher.
///
/// NOTE: This only works with JSON dispatcher.
/// This is a band-aid for transition to `Isolate.register_op` API as most of our
@@ -220,6 +243,7 @@ impl ThreadSafeState {
start_time: Instant::now(),
seeded_rng,
include_deno_namespace,
+ resource_table: Mutex::new(ResourceTable::default()),
};
Ok(ThreadSafeState(Arc::new(state)))
diff --git a/core/resources.rs b/core/resources.rs
index da4fb6b07..216f5c8df 100644
--- a/core/resources.rs
+++ b/core/resources.rs
@@ -65,7 +65,7 @@ impl ResourceTable {
}
// close(2) is done by dropping the value. Therefore we just need to remove
- // the resource from the RESOURCE_TABLE.
+ // the resource from the resource table.
pub fn close(&mut self, rid: ResourceId) -> Option<()> {
self.map.remove(&rid).map(|(_name, _resource)| ())
}