summaryrefslogtreecommitdiff
path: root/runtime/ops
diff options
context:
space:
mode:
Diffstat (limited to 'runtime/ops')
-rw-r--r--runtime/ops/fs.rs31
-rw-r--r--runtime/ops/io.rs714
-rw-r--r--runtime/ops/net.rs45
-rw-r--r--runtime/ops/net_unix.rs4
-rw-r--r--runtime/ops/process.rs13
-rw-r--r--runtime/ops/tls.rs9
-rw-r--r--runtime/ops/tty.rs48
7 files changed, 453 insertions, 411 deletions
diff --git a/runtime/ops/fs.rs b/runtime/ops/fs.rs
index d1a2489ba..d1242b116 100644
--- a/runtime/ops/fs.rs
+++ b/runtime/ops/fs.rs
@@ -1,7 +1,6 @@
// Copyright 2018-2021 the Deno authors. All rights reserved. MIT license.
// Some deserializer fields are only used on Unix and Windows build fails without it
-use super::io::std_file_resource;
-use super::io::StreamResource;
+use super::io::StdFileResource;
use crate::fs_util::canonicalize_path;
use crate::permissions::Permissions;
use deno_core::error::bad_resource_id;
@@ -188,7 +187,7 @@ fn op_open_sync(
let (path, open_options) = open_helper(state, args)?;
let std_file = open_options.open(path)?;
let tokio_file = tokio::fs::File::from_std(std_file);
- let resource = StreamResource::fs_file(tokio_file);
+ let resource = StdFileResource::fs_file(tokio_file);
let rid = state.resource_table.add(resource);
Ok(json!(rid))
}
@@ -202,7 +201,7 @@ async fn op_open_async(
let tokio_file = tokio::fs::OpenOptions::from(open_options)
.open(path)
.await?;
- let resource = StreamResource::fs_file(tokio_file);
+ let resource = StdFileResource::fs_file(tokio_file);
let rid = state.borrow_mut().resource_table.add(resource);
Ok(json!(rid))
}
@@ -239,7 +238,7 @@ fn op_seek_sync(
_zero_copy: &mut [ZeroCopyBuf],
) -> Result<Value, AnyError> {
let (rid, seek_from) = seek_helper(args)?;
- let pos = std_file_resource(state, rid, |r| match r {
+ let pos = StdFileResource::with(state, rid, |r| match r {
Ok(std_file) => std_file.seek(seek_from).map_err(AnyError::from),
Err(_) => Err(type_error(
"cannot seek on this type of resource".to_string(),
@@ -258,7 +257,7 @@ async fn op_seek_async(
let resource = state
.borrow_mut()
.resource_table
- .get::<StreamResource>(rid)
+ .get::<StdFileResource>(rid)
.ok_or_else(bad_resource_id)?;
if resource.fs_file.is_none() {
@@ -286,7 +285,7 @@ fn op_fdatasync_sync(
) -> Result<Value, AnyError> {
let args: FdatasyncArgs = serde_json::from_value(args)?;
let rid = args.rid as u32;
- std_file_resource(state, rid, |r| match r {
+ StdFileResource::with(state, rid, |r| match r {
Ok(std_file) => std_file.sync_data().map_err(AnyError::from),
Err(_) => Err(type_error("cannot sync this type of resource".to_string())),
})?;
@@ -304,7 +303,7 @@ async fn op_fdatasync_async(
let resource = state
.borrow_mut()
.resource_table
- .get::<StreamResource>(rid)
+ .get::<StdFileResource>(rid)
.ok_or_else(bad_resource_id)?;
if resource.fs_file.is_none() {
@@ -332,7 +331,7 @@ fn op_fsync_sync(
) -> Result<Value, AnyError> {
let args: FsyncArgs = serde_json::from_value(args)?;
let rid = args.rid as u32;
- std_file_resource(state, rid, |r| match r {
+ StdFileResource::with(state, rid, |r| match r {
Ok(std_file) => std_file.sync_all().map_err(AnyError::from),
Err(_) => Err(type_error("cannot sync this type of resource".to_string())),
})?;
@@ -350,7 +349,7 @@ async fn op_fsync_async(
let resource = state
.borrow_mut()
.resource_table
- .get::<StreamResource>(rid)
+ .get::<StdFileResource>(rid)
.ok_or_else(bad_resource_id)?;
if resource.fs_file.is_none() {
@@ -379,7 +378,7 @@ fn op_fstat_sync(
super::check_unstable(state, "Deno.fstat");
let args: FstatArgs = serde_json::from_value(args)?;
let rid = args.rid as u32;
- let metadata = std_file_resource(state, rid, |r| match r {
+ let metadata = StdFileResource::with(state, rid, |r| match r {
Ok(std_file) => std_file.metadata().map_err(AnyError::from),
Err(_) => Err(type_error("cannot stat this type of resource".to_string())),
})?;
@@ -399,7 +398,7 @@ async fn op_fstat_async(
let resource = state
.borrow_mut()
.resource_table
- .get::<StreamResource>(rid)
+ .get::<StdFileResource>(rid)
.ok_or_else(bad_resource_id)?;
if resource.fs_file.is_none() {
@@ -1365,7 +1364,7 @@ fn op_ftruncate_sync(
let args: FtruncateArgs = serde_json::from_value(args)?;
let rid = args.rid as u32;
let len = args.len as u64;
- std_file_resource(state, rid, |r| match r {
+ StdFileResource::with(state, rid, |r| match r {
Ok(std_file) => std_file.set_len(len).map_err(AnyError::from),
Err(_) => Err(type_error("cannot truncate this type of resource")),
})?;
@@ -1385,7 +1384,7 @@ async fn op_ftruncate_async(
let resource = state
.borrow_mut()
.resource_table
- .get::<StreamResource>(rid)
+ .get::<StdFileResource>(rid)
.ok_or_else(bad_resource_id)?;
if resource.fs_file.is_none() {
@@ -1648,7 +1647,7 @@ fn op_futime_sync(
let atime = filetime::FileTime::from_unix_time(args.atime.0, args.atime.1);
let mtime = filetime::FileTime::from_unix_time(args.mtime.0, args.mtime.1);
- std_file_resource(state, rid, |r| match r {
+ StdFileResource::with(state, rid, |r| match r {
Ok(std_file) => {
filetime::set_file_handle_times(std_file, Some(atime), Some(mtime))
.map_err(AnyError::from)
@@ -1675,7 +1674,7 @@ async fn op_futime_async(
let resource = state
.borrow_mut()
.resource_table
- .get::<StreamResource>(rid)
+ .get::<StdFileResource>(rid)
.ok_or_else(bad_resource_id)?;
if resource.fs_file.is_none() {
diff --git a/runtime/ops/io.rs b/runtime/ops/io.rs
index a8654cc29..2ac8e1b78 100644
--- a/runtime/ops/io.rs
+++ b/runtime/ops/io.rs
@@ -3,11 +3,14 @@
use super::dispatch_minimal::minimal_op;
use super::dispatch_minimal::MinimalOp;
use crate::metrics::metrics_op;
-use deno_core::error::bad_resource_id;
use deno_core::error::resource_unavailable;
use deno_core::error::type_error;
use deno_core::error::AnyError;
+use deno_core::error::{bad_resource_id, not_supported};
use deno_core::futures::future::FutureExt;
+use deno_core::serde_json;
+use deno_core::serde_json::json;
+use deno_core::serde_json::Value;
use deno_core::AsyncMutFuture;
use deno_core::AsyncRefCell;
use deno_core::BufVec;
@@ -17,20 +20,30 @@ use deno_core::JsRuntime;
use deno_core::OpState;
use deno_core::RcRef;
use deno_core::Resource;
+use deno_core::ZeroCopyBuf;
+use serde::Deserialize;
use std::borrow::Cow;
use std::cell::RefCell;
+use std::convert::TryInto;
+use std::io::Read;
+use std::io::Write;
use std::rc::Rc;
+use tokio::io::split;
use tokio::io::AsyncRead;
use tokio::io::AsyncReadExt;
use tokio::io::AsyncWrite;
use tokio::io::AsyncWriteExt;
+use tokio::io::ReadHalf;
+use tokio::io::WriteHalf;
use tokio::net::tcp;
use tokio::net::TcpStream;
-use tokio_rustls::client::TlsStream as ClientTlsStream;
-use tokio_rustls::server::TlsStream as ServerTlsStream;
+use tokio::process;
+use tokio_rustls as tls;
#[cfg(unix)]
use std::os::unix::io::FromRawFd;
+#[cfg(unix)]
+use tokio::net::unix;
#[cfg(windows)]
use std::os::windows::io::FromRawHandle;
@@ -94,12 +107,13 @@ lazy_static! {
pub fn init(rt: &mut JsRuntime) {
rt.register_op("op_read", metrics_op(minimal_op(op_read)));
rt.register_op("op_write", metrics_op(minimal_op(op_write)));
+ super::reg_json_async(rt, "op_shutdown", op_shutdown);
}
pub fn get_stdio() -> (
- Option<StreamResource>,
- Option<StreamResource>,
- Option<StreamResource>,
+ Option<StdFileResource>,
+ Option<StdFileResource>,
+ Option<StdFileResource>,
) {
let stdin = get_stdio_stream(&STDIN_HANDLE, "stdin");
let stdout = get_stdio_stream(&STDOUT_HANDLE, "stdout");
@@ -111,13 +125,13 @@ pub fn get_stdio() -> (
fn get_stdio_stream(
handle: &Option<std::fs::File>,
name: &str,
-) -> Option<StreamResource> {
+) -> Option<StdFileResource> {
match handle {
None => None,
Some(file_handle) => match file_handle.try_clone() {
Ok(clone) => {
let tokio_file = tokio::fs::File::from_std(clone);
- Some(StreamResource::stdio(tokio_file, name))
+ Some(StdFileResource::stdio(tokio_file, name))
}
Err(_e) => None,
},
@@ -143,6 +157,80 @@ pub struct FileMetadata {
}
#[derive(Debug)]
+pub struct WriteOnlyResource<S> {
+ stream: AsyncRefCell<S>,
+}
+
+impl<S: 'static> From<S> for WriteOnlyResource<S> {
+ fn from(stream: S) -> Self {
+ Self {
+ stream: stream.into(),
+ }
+ }
+}
+
+impl<S> WriteOnlyResource<S>
+where
+ S: AsyncWrite + Unpin + 'static,
+{
+ pub fn borrow_mut(self: &Rc<Self>) -> AsyncMutFuture<S> {
+ RcRef::map(self, |r| &r.stream).borrow_mut()
+ }
+
+ async fn write(self: &Rc<Self>, buf: &[u8]) -> Result<usize, AnyError> {
+ let mut stream = self.borrow_mut().await;
+ let nwritten = stream.write(buf).await?;
+ Ok(nwritten)
+ }
+
+ async fn shutdown(self: &Rc<Self>) -> Result<(), AnyError> {
+ let mut stream = self.borrow_mut().await;
+ stream.shutdown().await?;
+ Ok(())
+ }
+}
+
+#[derive(Debug)]
+pub struct ReadOnlyResource<S> {
+ stream: AsyncRefCell<S>,
+ cancel_handle: CancelHandle,
+}
+
+impl<S: 'static> From<S> for ReadOnlyResource<S> {
+ fn from(stream: S) -> Self {
+ Self {
+ stream: stream.into(),
+ cancel_handle: Default::default(),
+ }
+ }
+}
+
+impl<S> ReadOnlyResource<S>
+where
+ S: AsyncRead + Unpin + 'static,
+{
+ pub fn borrow_mut(self: &Rc<Self>) -> AsyncMutFuture<S> {
+ RcRef::map(self, |r| &r.stream).borrow_mut()
+ }
+
+ pub fn cancel_handle(self: &Rc<Self>) -> RcRef<CancelHandle> {
+ RcRef::map(self, |r| &r.cancel_handle)
+ }
+
+ pub fn cancel_read_ops(&self) {
+ self.cancel_handle.cancel()
+ }
+
+ async fn read(self: &Rc<Self>, buf: &mut [u8]) -> Result<usize, AnyError> {
+ let mut rd = self.borrow_mut().await;
+ let nread = rd.read(buf).try_or_cancel(self.cancel_handle()).await?;
+ Ok(nread)
+ }
+}
+
+/// A full duplex resource has a read and write ends that are completely
+/// independent, like TCP/Unix sockets and TLS streams.
+#[derive(Debug)]
pub struct FullDuplexResource<R, W> {
rd: AsyncRefCell<R>,
wr: AsyncRefCell<W>,
@@ -152,7 +240,11 @@ pub struct FullDuplexResource<R, W> {
cancel_handle: CancelHandle,
}
-impl<R: 'static, W: 'static> FullDuplexResource<R, W> {
+impl<R, W> FullDuplexResource<R, W>
+where
+ R: AsyncRead + Unpin + 'static,
+ W: AsyncWrite + Unpin + 'static,
+{
pub fn new((rd, wr): (R, W)) -> Self {
Self {
rd: rd.into(),
@@ -180,13 +272,7 @@ impl<R: 'static, W: 'static> FullDuplexResource<R, W> {
pub fn cancel_read_ops(&self) {
self.cancel_handle.cancel()
}
-}
-impl<R, W> FullDuplexResource<R, W>
-where
- R: AsyncRead + Unpin + 'static,
- W: AsyncWrite + Unpin + 'static,
-{
async fn read(self: &Rc<Self>, buf: &mut [u8]) -> Result<usize, AnyError> {
let mut rd = self.rd_borrow_mut().await;
let nread = rd.read(buf).try_or_cancel(self.cancel_handle()).await?;
@@ -198,6 +284,56 @@ where
let nwritten = wr.write(buf).await?;
Ok(nwritten)
}
+
+ async fn shutdown(self: &Rc<Self>) -> Result<(), AnyError> {
+ let mut wr = self.wr_borrow_mut().await;
+ wr.shutdown().await?;
+ Ok(())
+ }
+}
+
+pub type FullDuplexSplitResource<S> =
+ FullDuplexResource<ReadHalf<S>, WriteHalf<S>>;
+
+impl<S> From<S> for FullDuplexSplitResource<S>
+where
+ S: AsyncRead + AsyncWrite + 'static,
+{
+ fn from(stream: S) -> Self {
+ Self::new(split(stream))
+ }
+}
+
+pub type ChildStdinResource = WriteOnlyResource<process::ChildStdin>;
+
+impl Resource for ChildStdinResource {
+ fn name(&self) -> Cow<str> {
+ "childStdin".into()
+ }
+}
+
+pub type ChildStdoutResource = ReadOnlyResource<process::ChildStdout>;
+
+impl Resource for ChildStdoutResource {
+ fn name(&self) -> Cow<str> {
+ "childStdout".into()
+ }
+
+ fn close(self: Rc<Self>) {
+ self.cancel_read_ops();
+ }
+}
+
+pub type ChildStderrResource = ReadOnlyResource<process::ChildStderr>;
+
+impl Resource for ChildStderrResource {
+ fn name(&self) -> Cow<str> {
+ "childStderr".into()
+ }
+
+ fn close(self: Rc<Self>) {
+ self.cancel_read_ops();
+ }
}
pub type TcpStreamResource =
@@ -213,35 +349,74 @@ impl Resource for TcpStreamResource {
}
}
-#[derive(Default)]
-pub struct StreamResource {
- pub fs_file:
- Option<AsyncRefCell<(Option<tokio::fs::File>, Option<FileMetadata>)>>,
+pub type TlsClientStreamResource =
+ FullDuplexSplitResource<tls::client::TlsStream<TcpStream>>;
- #[cfg(unix)]
- pub unix_stream: Option<AsyncRefCell<tokio::net::UnixStream>>,
+impl Resource for TlsClientStreamResource {
+ fn name(&self) -> Cow<str> {
+ "tlsClientStream".into()
+ }
- child_stdin: Option<AsyncRefCell<tokio::process::ChildStdin>>,
+ fn close(self: Rc<Self>) {
+ self.cancel_read_ops();
+ }
+}
- child_stdout: Option<AsyncRefCell<tokio::process::ChildStdout>>,
+pub type TlsServerStreamResource =
+ FullDuplexSplitResource<tls::server::TlsStream<TcpStream>>;
- child_stderr: Option<AsyncRefCell<tokio::process::ChildStderr>>,
+impl Resource for TlsServerStreamResource {
+ fn name(&self) -> Cow<str> {
+ "tlsServerStream".into()
+ }
- client_tls_stream: Option<AsyncRefCell<ClientTlsStream<TcpStream>>>,
+ fn close(self: Rc<Self>) {
+ self.cancel_read_ops();
+ }
+}
- server_tls_stream: Option<AsyncRefCell<ServerTlsStream<TcpStream>>>,
+#[cfg(unix)]
+pub type UnixStreamResource =
+ FullDuplexResource<unix::OwnedReadHalf, unix::OwnedWriteHalf>;
- cancel: CancelHandle,
- name: String,
+#[cfg(not(unix))]
+struct UnixStreamResource;
+
+#[cfg(not(unix))]
+impl UnixStreamResource {
+ async fn read(self: &Rc<Self>, _buf: &mut [u8]) -> Result<usize, AnyError> {
+ unreachable!()
+ }
+ async fn write(self: &Rc<Self>, _buf: &[u8]) -> Result<usize, AnyError> {
+ unreachable!()
+ }
+ async fn shutdown(self: &Rc<Self>) -> Result<(), AnyError> {
+ unreachable!()
+ }
+ fn cancel_read_ops(&self) {
+ unreachable!()
+ }
}
-impl std::fmt::Debug for StreamResource {
- fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
- write!(f, "StreamResource")
+impl Resource for UnixStreamResource {
+ fn name(&self) -> Cow<str> {
+ "unixStream".into()
+ }
+
+ fn close(self: Rc<Self>) {
+ self.cancel_read_ops();
}
}
-impl StreamResource {
+#[derive(Debug, Default)]
+pub struct StdFileResource {
+ pub fs_file:
+ Option<AsyncRefCell<(Option<tokio::fs::File>, Option<FileMetadata>)>>,
+ cancel: CancelHandle,
+ name: String,
+}
+
+impl StdFileResource {
pub fn stdio(fs_file: tokio::fs::File, name: &str) -> Self {
Self {
fs_file: Some(AsyncRefCell::new((
@@ -264,194 +439,89 @@ impl StreamResource {
}
}
- #[cfg(unix)]
- pub fn unix_stream(unix_stream: tokio::net::UnixStream) -> Self {
- Self {
- unix_stream: Some(AsyncRefCell::new(unix_stream)),
- name: "unixStream".to_string(),
- ..Default::default()
- }
- }
-
- pub fn child_stdout(child: tokio::process::ChildStdout) -> Self {
- Self {
- child_stdout: Some(AsyncRefCell::new(child)),
- name: "childStdout".to_string(),
- ..Default::default()
- }
- }
-
- pub fn child_stderr(child: tokio::process::ChildStderr) -> Self {
- Self {
- child_stderr: Some(AsyncRefCell::new(child)),
- name: "childStderr".to_string(),
- ..Default::default()
- }
- }
-
- pub fn child_stdin(child: tokio::process::ChildStdin) -> Self {
- Self {
- child_stdin: Some(AsyncRefCell::new(child)),
- name: "childStdin".to_string(),
- ..Default::default()
- }
- }
-
- pub fn client_tls_stream(stream: ClientTlsStream<TcpStream>) -> Self {
- Self {
- client_tls_stream: Some(AsyncRefCell::new(stream)),
- name: "clientTlsStream".to_string(),
- ..Default::default()
- }
- }
-
- pub fn server_tls_stream(stream: ServerTlsStream<TcpStream>) -> Self {
- Self {
- server_tls_stream: Some(AsyncRefCell::new(stream)),
- name: "serverTlsStream".to_string(),
- ..Default::default()
- }
- }
-
- async fn read(self: Rc<Self>, buf: &mut [u8]) -> Result<usize, AnyError> {
- // TODO(bartlomieju): in the future, it would be better for `StreamResource`
- // to be an enum instead a struct with many `Option` fields, however I
- // wasn't able to get it to work with `AsyncRefCell`s.
+ async fn read(self: &Rc<Self>, buf: &mut [u8]) -> Result<usize, AnyError> {
if self.fs_file.is_some() {
- debug_assert!(self.child_stdin.is_none());
- debug_assert!(self.child_stdout.is_none());
- debug_assert!(self.child_stderr.is_none());
- debug_assert!(self.server_tls_stream.is_none());
- debug_assert!(self.client_tls_stream.is_none());
- let mut fs_file = RcRef::map(&self, |r| r.fs_file.as_ref().unwrap())
+ let mut fs_file = RcRef::map(&*self, |r| r.fs_file.as_ref().unwrap())
.borrow_mut()
.await;
- let nwritten = (*fs_file).0.as_mut().unwrap().read(buf).await?;
+ let nwritten = fs_file.0.as_mut().unwrap().read(buf).await?;
return Ok(nwritten);
- } else if self.child_stdout.is_some() {
- debug_assert!(self.child_stdin.is_none());
- debug_assert!(self.child_stderr.is_none());
- debug_assert!(self.server_tls_stream.is_none());
- debug_assert!(self.client_tls_stream.is_none());
- let mut child_stdout =
- RcRef::map(&self, |r| r.child_stdout.as_ref().unwrap())
- .borrow_mut()
- .await;
- let cancel = RcRef::map(self, |r| &r.cancel);
- let nread = child_stdout.read(buf).try_or_cancel(cancel).await?;
- return Ok(nread);
- } else if self.child_stderr.is_some() {
- debug_assert!(self.child_stdin.is_none());
- debug_assert!(self.server_tls_stream.is_none());
- debug_assert!(self.client_tls_stream.is_none());
- let mut child_stderr =
- RcRef::map(&self, |r| r.child_stderr.as_ref().unwrap())
- .borrow_mut()
- .await;
- let cancel = RcRef::map(self, |r| &r.cancel);
- let nread = child_stderr.read(buf).try_or_cancel(cancel).await?;
- return Ok(nread);
- } else if self.client_tls_stream.is_some() {
- debug_assert!(self.server_tls_stream.is_none());
- let mut client_tls_stream =
- RcRef::map(&self, |r| r.client_tls_stream.as_ref().unwrap())
- .borrow_mut()
- .await;
- let cancel = RcRef::map(self, |r| &r.cancel);
- let nread = client_tls_stream.read(buf).try_or_cancel(cancel).await?;
- return Ok(nread);
- } else if self.server_tls_stream.is_some() {
- let mut server_tls_stream =
- RcRef::map(&self, |r| r.server_tls_stream.as_ref().unwrap())
- .borrow_mut()
- .await;
- let cancel = RcRef::map(self, |r| &r.cancel);
- let nread = server_tls_stream.read(buf).try_or_cancel(cancel).await?;
- return Ok(nread);
- }
-
- #[cfg(unix)]
- if self.unix_stream.is_some() {
- let mut unix_stream =
- RcRef::map(&self, |r| r.unix_stream.as_ref().unwrap())
- .borrow_mut()
- .await;
- let cancel = RcRef::map(self, |r| &r.cancel);
- let nread = unix_stream.read(buf).try_or_cancel(cancel).await?;
- return Ok(nread);
+ } else {
+ Err(resource_unavailable())
}
-
- Err(bad_resource_id())
}
- async fn write(self: Rc<Self>, buf: &[u8]) -> Result<usize, AnyError> {
- // TODO(bartlomieju): in the future, it would be better for `StreamResource`
- // to be an enum instead a struct with many `Option` fields, however I
- // wasn't able to get it to work with `AsyncRefCell`s.
+ async fn write(self: &Rc<Self>, buf: &[u8]) -> Result<usize, AnyError> {
if self.fs_file.is_some() {
- debug_assert!(self.child_stdin.is_none());
- debug_assert!(self.child_stdout.is_none());
- debug_assert!(self.child_stderr.is_none());
- debug_assert!(self.server_tls_stream.is_none());
- debug_assert!(self.client_tls_stream.is_none());
- let mut fs_file = RcRef::map(&self, |r| r.fs_file.as_ref().unwrap())
+ let mut fs_file = RcRef::map(&*self, |r| r.fs_file.as_ref().unwrap())
.borrow_mut()
.await;
- let nwritten = (*fs_file).0.as_mut().unwrap().write(buf).await?;
- (*fs_file).0.as_mut().unwrap().flush().await?;
- return Ok(nwritten);
- } else if self.child_stdin.is_some() {
- debug_assert!(self.child_stdout.is_none());
- debug_assert!(self.child_stderr.is_none());
- debug_assert!(self.server_tls_stream.is_none());
- debug_assert!(self.client_tls_stream.is_none());
- let mut child_stdin =
- RcRef::map(&self, |r| r.child_stdin.as_ref().unwrap())
- .borrow_mut()
- .await;
- let nwritten = child_stdin.write(buf).await?;
- child_stdin.flush().await?;
- return Ok(nwritten);
- } else if self.client_tls_stream.is_some() {
- debug_assert!(self.server_tls_stream.is_none());
- let mut client_tls_stream =
- RcRef::map(&self, |r| r.client_tls_stream.as_ref().unwrap())
- .borrow_mut()
- .await;
- let nwritten = client_tls_stream.write(buf).await?;
- client_tls_stream.flush().await?;
- return Ok(nwritten);
- } else if self.server_tls_stream.is_some() {
- let mut server_tls_stream =
- RcRef::map(&self, |r| r.server_tls_stream.as_ref().unwrap())
- .borrow_mut()
- .await;
- let nwritten = server_tls_stream.write(buf).await?;
- server_tls_stream.flush().await?;
+ let nwritten = fs_file.0.as_mut().unwrap().write(buf).await?;
+ fs_file.0.as_mut().unwrap().flush().await?;
return Ok(nwritten);
+ } else {
+ Err(resource_unavailable())
}
+ }
- #[cfg(unix)]
- if self.unix_stream.is_some() {
- let mut unix_stream =
- RcRef::map(&self, |r| r.unix_stream.as_ref().unwrap())
- .borrow_mut()
- .await;
- let nwritten = unix_stream.write(buf).await?;
- unix_stream.flush().await?;
- return Ok(nwritten);
+ pub fn with<F, R>(
+ state: &mut OpState,
+ rid: u32,
+ mut f: F,
+ ) -> Result<R, AnyError>
+ where
+ F: FnMut(Result<&mut std::fs::File, ()>) -> Result<R, AnyError>,
+ {
+ // First we look up the rid in the resource table.
+ let resource = state
+ .resource_table
+ .get::<StdFileResource>(rid)
+ .ok_or_else(bad_resource_id)?;
+
+ // Sync write only works for FsFile. It doesn't make sense to do this
+ // for non-blocking sockets. So we error out if not FsFile.
+ if resource.fs_file.is_none() {
+ return f(Err(()));
}
- Err(bad_resource_id())
+ // The object in the resource table is a tokio::fs::File - but in
+ // order to do a blocking write on it, we must turn it into a
+ // std::fs::File. Hopefully this code compiles down to nothing.
+ let fs_file_resource =
+ RcRef::map(&resource, |r| r.fs_file.as_ref().unwrap()).try_borrow_mut();
+
+ if let Some(mut fs_file) = fs_file_resource {
+ let tokio_file = fs_file.0.take().unwrap();
+ match tokio_file.try_into_std() {
+ Ok(mut std_file) => {
+ let result = f(Ok(&mut std_file));
+ // Turn the std_file handle back into a tokio file, put it back
+ // in the resource table.
+ let tokio_file = tokio::fs::File::from_std(std_file);
+ fs_file.0 = Some(tokio_file);
+ // return the result.
+ result
+ }
+ Err(tokio_file) => {
+ // This function will return an error containing the file if
+ // some operation is in-flight.
+ fs_file.0 = Some(tokio_file);
+ Err(resource_unavailable())
+ }
+ }
+ } else {
+ Err(resource_unavailable())
+ }
}
}
-impl Resource for StreamResource {
+impl Resource for StdFileResource {
fn name(&self) -> Cow<str> {
- self.name.clone().into()
+ self.name.as_str().into()
}
fn close(self: Rc<Self>) {
+ // TODO: do not cancel file I/O when file is writable.
self.cancel.cancel()
}
}
@@ -460,153 +530,163 @@ pub fn op_read(
state: Rc<RefCell<OpState>>,
is_sync: bool,
rid: i32,
- mut zero_copy: BufVec,
+ bufs: BufVec,
) -> MinimalOp {
- debug!("read rid={}", rid);
- match zero_copy.len() {
+ match bufs.len() {
0 => return MinimalOp::Sync(Err(no_buffer_specified())),
1 => {}
_ => panic!("Invalid number of arguments"),
- }
+ };
+ let buf = bufs.into_iter().next().unwrap();
if is_sync {
- MinimalOp::Sync({
- // First we look up the rid in the resource table.
- std_file_resource(&mut state.borrow_mut(), rid as u32, move |r| match r {
- Ok(std_file) => {
- use std::io::Read;
- std_file
- .read(&mut zero_copy[0])
- .map(|n: usize| n as i32)
- .map_err(AnyError::from)
- }
- Err(_) => Err(type_error("sync read not allowed on this resource")),
- })
- })
+ MinimalOp::Sync(op_read_sync(state, rid, buf))
} else {
- let mut zero_copy = zero_copy[0].clone();
- MinimalOp::Async({
- async move {
- let resource = state
- .borrow()
- .resource_table
- .get_any(rid as u32)
- .ok_or_else(bad_resource_id)?;
- let nread = if let Some(stream) =
- resource.downcast_rc::<TcpStreamResource>()
- {
- stream.read(&mut zero_copy).await?
- } else if let Some(stream) = resource.downcast_rc::<StreamResource>() {
- stream.clone().read(&mut zero_copy).await?
- } else {
- return Err(bad_resource_id());
- };
- Ok(nread as i32)
- }
- .boxed_local()
- })
+ MinimalOp::Async(op_read_async(state, rid, buf).boxed_local())
}
}
+fn op_read_sync(
+ state: Rc<RefCell<OpState>>,
+ rid: i32,
+ mut buf: ZeroCopyBuf,
+) -> Result<i32, AnyError> {
+ let rid = rid.try_into().map_err(|_| bad_resource_id())?;
+ StdFileResource::with(&mut state.borrow_mut(), rid, move |r| match r {
+ Ok(std_file) => std_file
+ .read(&mut buf)
+ .map(|n: usize| n as i32)
+ .map_err(AnyError::from),
+ Err(_) => Err(not_supported()),
+ })
+}
+
+async fn op_read_async(
+ state: Rc<RefCell<OpState>>,
+ rid: i32,
+ mut buf: ZeroCopyBuf,
+) -> Result<i32, AnyError> {
+ let rid = rid.try_into().map_err(|_| bad_resource_id())?;
+ let resource = state
+ .borrow()
+ .resource_table
+ .get_any(rid)
+ .ok_or_else(bad_resource_id)?;
+ let nread = if let Some(s) = resource.downcast_rc::<ChildStdoutResource>() {
+ s.read(&mut buf).await?
+ } else if let Some(s) = resource.downcast_rc::<ChildStderrResource>() {
+ s.read(&mut buf).await?
+ } else if let Some(s) = resource.downcast_rc::<TcpStreamResource>() {
+ s.read(&mut buf).await?
+ } else if let Some(s) = resource.downcast_rc::<TlsClientStreamResource>() {
+ s.read(&mut buf).await?
+ } else if let Some(s) = resource.downcast_rc::<TlsServerStreamResource>() {
+ s.read(&mut buf).await?
+ } else if let Some(s) = resource.downcast_rc::<UnixStreamResource>() {
+ s.read(&mut buf).await?
+ } else if let Some(s) = resource.downcast_rc::<StdFileResource>() {
+ s.read(&mut buf).await?
+ } else {
+ return Err(not_supported());
+ };
+ Ok(nread as i32)
+}
+
pub fn op_write(
state: Rc<RefCell<OpState>>,
is_sync: bool,
rid: i32,
- zero_copy: BufVec,
+ bufs: BufVec,
) -> MinimalOp {
- debug!("write rid={}", rid);
- match zero_copy.len() {
+ match bufs.len() {
0 => return MinimalOp::Sync(Err(no_buffer_specified())),
1 => {}
_ => panic!("Invalid number of arguments"),
- }
+ };
+ let buf = bufs.into_iter().next().unwrap();
if is_sync {
- MinimalOp::Sync({
- // First we look up the rid in the resource table.
- std_file_resource(&mut state.borrow_mut(), rid as u32, move |r| match r {
- Ok(std_file) => {
- use std::io::Write;
- std_file
- .write(&zero_copy[0])
- .map(|nwritten: usize| nwritten as i32)
- .map_err(AnyError::from)
- }
- Err(_) => Err(type_error("sync read not allowed on this resource")),
- })
- })
+ MinimalOp::Sync(op_write_sync(state, rid, buf))
} else {
- let zero_copy = zero_copy[0].clone();
- MinimalOp::Async({
- async move {
- let resource = state
- .borrow()
- .resource_table
- .get_any(rid as u32)
- .ok_or_else(bad_resource_id)?;
- let nwritten = if let Some(stream) =
- resource.downcast_rc::<TcpStreamResource>()
- {
- stream.write(&zero_copy).await?
- } else if let Some(stream) = resource.downcast_rc::<StreamResource>() {
- stream.clone().write(&zero_copy).await?
- } else {
- return Err(bad_resource_id());
- };
- Ok(nwritten as i32)
- }
- .boxed_local()
- })
+ MinimalOp::Async(op_write_async(state, rid, buf).boxed_local())
}
}
-pub fn std_file_resource<F, T>(
- state: &mut OpState,
- rid: u32,
- mut f: F,
-) -> Result<T, AnyError>
-where
- F: FnMut(Result<&mut std::fs::File, ()>) -> Result<T, AnyError>,
-{
- // First we look up the rid in the resource table.
+fn op_write_sync(
+ state: Rc<RefCell<OpState>>,
+ rid: i32,
+ buf: ZeroCopyBuf,
+) -> Result<i32, AnyError> {
+ let rid = rid.try_into().map_err(|_| bad_resource_id())?;
+ StdFileResource::with(&mut state.borrow_mut(), rid, move |r| match r {
+ Ok(std_file) => std_file
+ .write(&buf)
+ .map(|nwritten: usize| nwritten as i32)
+ .map_err(AnyError::from),
+ Err(_) => Err(not_supported()),
+ })
+}
+
+async fn op_write_async(
+ state: Rc<RefCell<OpState>>,
+ rid: i32,
+ buf: ZeroCopyBuf,
+) -> Result<i32, AnyError> {
+ let rid = rid.try_into().map_err(|_| bad_resource_id())?;
let resource = state
+ .borrow()
.resource_table
- .get::<StreamResource>(rid)
+ .get_any(rid)
.ok_or_else(bad_resource_id)?;
+ let nwritten = if let Some(s) = resource.downcast_rc::<ChildStdinResource>() {
+ s.write(&buf).await?
+ } else if let Some(s) = resource.downcast_rc::<TcpStreamResource>() {
+ s.write(&buf).await?
+ } else if let Some(s) = resource.downcast_rc::<TlsClientStreamResource>() {
+ s.write(&buf).await?
+ } else if let Some(s) = resource.downcast_rc::<TlsServerStreamResource>() {
+ s.write(&buf).await?
+ } else if let Some(s) = resource.downcast_rc::<UnixStreamResource>() {
+ s.write(&buf).await?
+ } else if let Some(s) = resource.downcast_rc::<StdFileResource>() {
+ s.write(&buf).await?
+ } else {
+ return Err(not_supported());
+ };
+ Ok(nwritten as i32)
+}
- // Sync write only works for FsFile. It doesn't make sense to do this
- // for non-blocking sockets. So we error out if not FsFile.
- if resource.fs_file.is_none() {
- return f(Err(()));
- }
-
- // The object in the resource table is a tokio::fs::File - but in
- // order to do a blocking write on it, we must turn it into a
- // std::fs::File. Hopefully this code compiles down to nothing.
-
- let fs_file_resource =
- RcRef::map(&resource, |r| r.fs_file.as_ref().unwrap()).try_borrow_mut();
-
- if let Some(mut fs_file) = fs_file_resource {
- let tokio_file = fs_file.0.take().unwrap();
- match tokio_file.try_into_std() {
- Ok(mut std_file) => {
- let result = f(Ok(&mut std_file));
- // Turn the std_file handle back into a tokio file, put it back
- // in the resource table.
- let tokio_file = tokio::fs::File::from_std(std_file);
- fs_file.0 = Some(tokio_file);
- // return the result.
- result
- }
- Err(tokio_file) => {
- // This function will return an error containing the file if
- // some operation is in-flight.
- fs_file.0 = Some(tokio_file);
- Err(resource_unavailable())
- }
- }
+#[derive(Deserialize)]
+struct ShutdownArgs {
+ rid: i32,
+}
+
+async fn op_shutdown(
+ state: Rc<RefCell<OpState>>,
+ args: Value,
+ _zero_copy: BufVec,
+) -> Result<Value, AnyError> {
+ let rid = serde_json::from_value::<ShutdownArgs>(args)?
+ .rid
+ .try_into()
+ .map_err(|_| bad_resource_id())?;
+ let resource = state
+ .borrow()
+ .resource_table
+ .get_any(rid)
+ .ok_or_else(bad_resource_id)?;
+ if let Some(s) = resource.downcast_rc::<ChildStdinResource>() {
+ s.shutdown().await?;
+ } else if let Some(s) = resource.downcast_rc::<TcpStreamResource>() {
+ s.shutdown().await?;
+ } else if let Some(s) = resource.downcast_rc::<TlsClientStreamResource>() {
+ s.shutdown().await?;
+ } else if let Some(s) = resource.downcast_rc::<TlsServerStreamResource>() {
+ s.shutdown().await?;
+ } else if let Some(s) = resource.downcast_rc::<UnixStreamResource>() {
+ s.shutdown().await?;
} else {
- Err(resource_unavailable())
+ return Err(not_supported());
}
+ Ok(json!({}))
}
diff --git a/runtime/ops/net.rs b/runtime/ops/net.rs
index e3864b38a..7e80bb86b 100644
--- a/runtime/ops/net.rs
+++ b/runtime/ops/net.rs
@@ -5,7 +5,6 @@ use crate::permissions::Permissions;
use crate::resolve_addr::resolve_addr;
use crate::resolve_addr::resolve_addr_sync;
use deno_core::error::bad_resource;
-use deno_core::error::bad_resource_id;
use deno_core::error::custom_error;
use deno_core::error::generic_error;
use deno_core::error::type_error;
@@ -27,7 +26,6 @@ use std::borrow::Cow;
use std::cell::RefCell;
use std::net::SocketAddr;
use std::rc::Rc;
-use tokio::io::AsyncWriteExt;
use tokio::net::TcpListener;
use tokio::net::TcpStream;
use tokio::net::UdpSocket;
@@ -42,14 +40,13 @@ use trust_dns_resolver::AsyncResolver;
#[cfg(unix)]
use super::net_unix;
#[cfg(unix)]
-use crate::ops::io::StreamResource;
+use crate::ops::io::UnixStreamResource;
#[cfg(unix)]
use std::path::Path;
pub fn init(rt: &mut deno_core::JsRuntime) {
super::reg_json_async(rt, "op_accept", op_accept);
super::reg_json_async(rt, "op_connect", op_connect);
- super::reg_json_async(rt, "op_shutdown", op_shutdown);
super::reg_json_sync(rt, "op_listen", op_listen);
super::reg_json_async(rt, "op_datagram_receive", op_datagram_receive);
super::reg_json_async(rt, "op_datagram_send", op_datagram_send);
@@ -318,7 +315,7 @@ async fn op_connect(
let remote_addr = unix_stream.peer_addr()?;
let mut state_ = state.borrow_mut();
- let resource = StreamResource::unix_stream(unix_stream);
+ let resource = UnixStreamResource::new(unix_stream.into_split());
let rid = state_.resource_table.add(resource);
Ok(json!({
"rid": rid,
@@ -336,44 +333,6 @@ async fn op_connect(
}
}
-#[derive(Deserialize)]
-struct ShutdownArgs {
- rid: i32,
-}
-
-async fn op_shutdown(
- state: Rc<RefCell<OpState>>,
- args: Value,
- _zero_copy: BufVec,
-) -> Result<Value, AnyError> {
- let args: ShutdownArgs = serde_json::from_value(args)?;
- let rid = args.rid as u32;
-
- let resource = state
- .borrow()
- .resource_table
- .get_any(rid)
- .ok_or_else(bad_resource_id)?;
- if let Some(stream) = resource.downcast_rc::<TcpStreamResource>() {
- let mut wr = stream.wr_borrow_mut().await;
- wr.shutdown().await?;
- return Ok(json!({}));
- }
-
- #[cfg(unix)]
- if let Some(stream) = resource.downcast_rc::<StreamResource>() {
- if stream.unix_stream.is_some() {
- let mut wr = RcRef::map(stream, |r| r.unix_stream.as_ref().unwrap())
- .borrow_mut()
- .await;
- wr.shutdown().await?;
- return Ok(json!({}));
- }
- }
-
- Err(bad_resource_id())
-}
-
struct TcpListenerResource {
listener: AsyncRefCell<TcpListener>,
cancel: CancelHandle,
diff --git a/runtime/ops/net_unix.rs b/runtime/ops/net_unix.rs
index 1177d071c..c3e561568 100644
--- a/runtime/ops/net_unix.rs
+++ b/runtime/ops/net_unix.rs
@@ -1,6 +1,6 @@
// Copyright 2018-2021 the Deno authors. All rights reserved. MIT license.
-use crate::ops::io::StreamResource;
+use crate::ops::io::UnixStreamResource;
use crate::ops::net::AcceptArgs;
use crate::ops::net::ReceiveArgs;
use deno_core::error::bad_resource;
@@ -81,7 +81,7 @@ pub(crate) async fn accept_unix(
let local_addr = unix_stream.local_addr()?;
let remote_addr = unix_stream.peer_addr()?;
- let resource = StreamResource::unix_stream(unix_stream);
+ let resource = UnixStreamResource::new(unix_stream.into_split());
let mut state = state.borrow_mut();
let rid = state.resource_table.add(resource);
Ok(json!({
diff --git a/runtime/ops/process.rs b/runtime/ops/process.rs
index 412d21ef2..89f323232 100644
--- a/runtime/ops/process.rs
+++ b/runtime/ops/process.rs
@@ -1,6 +1,9 @@
// Copyright 2018-2021 the Deno authors. All rights reserved. MIT license.
-use super::io::{std_file_resource, StreamResource};
+use super::io::ChildStderrResource;
+use super::io::ChildStdinResource;
+use super::io::ChildStdoutResource;
+use super::io::StdFileResource;
use crate::permissions::Permissions;
use deno_core::error::bad_resource_id;
use deno_core::error::type_error;
@@ -34,7 +37,7 @@ fn clone_file(
state: &mut OpState,
rid: u32,
) -> Result<std::fs::File, AnyError> {
- std_file_resource(state, rid, move |r| match r {
+ StdFileResource::with(state, rid, move |r| match r {
Ok(std_file) => std_file.try_clone().map_err(AnyError::from),
Err(_) => Err(bad_resource_id()),
})
@@ -134,7 +137,7 @@ fn op_run(
Some(child_stdin) => {
let rid = state
.resource_table
- .add(StreamResource::child_stdin(child_stdin));
+ .add(ChildStdinResource::from(child_stdin));
Some(rid)
}
None => None,
@@ -144,7 +147,7 @@ fn op_run(
Some(child_stdout) => {
let rid = state
.resource_table
- .add(StreamResource::child_stdout(child_stdout));
+ .add(ChildStdoutResource::from(child_stdout));
Some(rid)
}
None => None,
@@ -154,7 +157,7 @@ fn op_run(
Some(child_stderr) => {
let rid = state
.resource_table
- .add(StreamResource::child_stderr(child_stderr));
+ .add(ChildStderrResource::from(child_stderr));
Some(rid)
}
None => None,
diff --git a/runtime/ops/tls.rs b/runtime/ops/tls.rs
index 2e3e34da2..b81bd5e23 100644
--- a/runtime/ops/tls.rs
+++ b/runtime/ops/tls.rs
@@ -1,7 +1,8 @@
// Copyright 2018-2021 the Deno authors. All rights reserved. MIT license.
-use super::io::StreamResource;
use super::io::TcpStreamResource;
+use super::io::TlsClientStreamResource;
+use super::io::TlsServerStreamResource;
use crate::permissions::Permissions;
use crate::resolve_addr::resolve_addr;
use crate::resolve_addr::resolve_addr_sync;
@@ -148,7 +149,7 @@ async fn op_start_tls(
let mut state_ = state.borrow_mut();
state_
.resource_table
- .add(StreamResource::client_tls_stream(tls_stream))
+ .add(TlsClientStreamResource::from(tls_stream))
};
Ok(json!({
"rid": rid,
@@ -210,7 +211,7 @@ async fn op_connect_tls(
let mut state_ = state.borrow_mut();
state_
.resource_table
- .add(StreamResource::client_tls_stream(tls_stream))
+ .add(TlsClientStreamResource::from(tls_stream))
};
Ok(json!({
"rid": rid,
@@ -402,7 +403,7 @@ async fn op_accept_tls(
let mut state_ = state.borrow_mut();
state_
.resource_table
- .add(StreamResource::server_tls_stream(tls_stream))
+ .add(TlsServerStreamResource::from(tls_stream))
};
Ok(json!({
diff --git a/runtime/ops/tty.rs b/runtime/ops/tty.rs
index dfde8e0d3..a8ff9938b 100644
--- a/runtime/ops/tty.rs
+++ b/runtime/ops/tty.rs
@@ -1,7 +1,6 @@
// Copyright 2018-2021 the Deno authors. All rights reserved. MIT license.
-use super::io::std_file_resource;
-use super::io::StreamResource;
+use super::io::StdFileResource;
use deno_core::error::bad_resource_id;
use deno_core::error::not_supported;
use deno_core::error::resource_unavailable;
@@ -90,7 +89,7 @@ fn op_set_raw(
let resource = state
.resource_table
- .get::<StreamResource>(rid)
+ .get::<StdFileResource>(rid)
.ok_or_else(bad_resource_id)?;
if cbreak {
@@ -157,7 +156,7 @@ fn op_set_raw(
let resource = state
.resource_table
- .get::<StreamResource>(rid)
+ .get::<StdFileResource>(rid)
.ok_or_else(bad_resource_id)?;
if resource.fs_file.is_none() {
@@ -229,26 +228,27 @@ fn op_isatty(
let args: IsattyArgs = serde_json::from_value(args)?;
let rid = args.rid;
- let isatty: bool = std_file_resource(state, rid as u32, move |r| match r {
- Ok(std_file) => {
- #[cfg(windows)]
- {
- use winapi::um::consoleapi;
-
- let handle = get_windows_handle(&std_file)?;
- let mut test_mode: DWORD = 0;
- // If I cannot get mode out of console, it is not a console.
- Ok(unsafe { consoleapi::GetConsoleMode(handle, &mut test_mode) != 0 })
- }
- #[cfg(unix)]
- {
- use std::os::unix::io::AsRawFd;
- let raw_fd = std_file.as_raw_fd();
- Ok(unsafe { libc::isatty(raw_fd as libc::c_int) == 1 })
+ let isatty: bool =
+ StdFileResource::with(state, rid as u32, move |r| match r {
+ Ok(std_file) => {
+ #[cfg(windows)]
+ {
+ use winapi::um::consoleapi;
+
+ let handle = get_windows_handle(&std_file)?;
+ let mut test_mode: DWORD = 0;
+ // If I cannot get mode out of console, it is not a console.
+ Ok(unsafe { consoleapi::GetConsoleMode(handle, &mut test_mode) != 0 })
+ }
+ #[cfg(unix)]
+ {
+ use std::os::unix::io::AsRawFd;
+ let raw_fd = std_file.as_raw_fd();
+ Ok(unsafe { libc::isatty(raw_fd as libc::c_int) == 1 })
+ }
}
- }
- _ => Ok(false),
- })?;
+ _ => Ok(false),
+ })?;
Ok(json!(isatty))
}
@@ -273,7 +273,7 @@ fn op_console_size(
let args: ConsoleSizeArgs = serde_json::from_value(args)?;
let rid = args.rid;
- let size = std_file_resource(state, rid as u32, move |r| match r {
+ let size = StdFileResource::with(state, rid as u32, move |r| match r {
Ok(std_file) => {
#[cfg(windows)]
{