summaryrefslogtreecommitdiff
path: root/runtime/ops
diff options
context:
space:
mode:
authorBartek IwaƄczuk <biwanczuk@gmail.com>2020-12-13 19:45:53 +0100
committerGitHub <noreply@github.com>2020-12-13 19:45:53 +0100
commit2e74f164b6dcf0ecbf8dd38fba9fae550d784bd0 (patch)
tree61abe8e09d5331ace5d9de529f0e2737a8e05dbb /runtime/ops
parent84ef9bd21fb48fb6b5fbc8dafc3de9f361bade3b (diff)
refactor: deno_runtime crate (#8640)
This commit moves Deno JS runtime, ops, permissions and inspector implementation to new "deno_runtime" crate located in "runtime/" directory. Details in "runtime/README.md". Co-authored-by: Ryan Dahl <ry@tinyclouds.org>
Diffstat (limited to 'runtime/ops')
-rw-r--r--runtime/ops/crypto.rs14
-rw-r--r--runtime/ops/dispatch_minimal.rs205
-rw-r--r--runtime/ops/fetch.rs25
-rw-r--r--runtime/ops/fs.rs1702
-rw-r--r--runtime/ops/fs_events.rs133
-rw-r--r--runtime/ops/io.rs473
-rw-r--r--runtime/ops/mod.rs89
-rw-r--r--runtime/ops/net.rs566
-rw-r--r--runtime/ops/net_unix.rs151
-rw-r--r--runtime/ops/os.rs192
-rw-r--r--runtime/ops/permissions.rs103
-rw-r--r--runtime/ops/plugin.rs156
-rw-r--r--runtime/ops/process.rs290
-rw-r--r--runtime/ops/runtime.rs118
-rw-r--r--runtime/ops/signal.rs142
-rw-r--r--runtime/ops/timers.rs193
-rw-r--r--runtime/ops/tls.rs431
-rw-r--r--runtime/ops/tty.rs334
-rw-r--r--runtime/ops/web_worker.rs37
-rw-r--r--runtime/ops/websocket.rs326
-rw-r--r--runtime/ops/worker_host.rs318
21 files changed, 5998 insertions, 0 deletions
diff --git a/runtime/ops/crypto.rs b/runtime/ops/crypto.rs
new file mode 100644
index 000000000..a73843a33
--- /dev/null
+++ b/runtime/ops/crypto.rs
@@ -0,0 +1,14 @@
+// Copyright 2018-2020 the Deno authors. All rights reserved. MIT license.
+use deno_crypto::op_get_random_values;
+use deno_crypto::rand::rngs::StdRng;
+use deno_crypto::rand::SeedableRng;
+
+pub fn init(rt: &mut deno_core::JsRuntime, maybe_seed: Option<u64>) {
+ if let Some(seed) = maybe_seed {
+ let rng = StdRng::seed_from_u64(seed);
+ let op_state = rt.op_state();
+ let mut state = op_state.borrow_mut();
+ state.put::<StdRng>(rng);
+ }
+ super::reg_json_sync(rt, "op_get_random_values", op_get_random_values);
+}
diff --git a/runtime/ops/dispatch_minimal.rs b/runtime/ops/dispatch_minimal.rs
new file mode 100644
index 000000000..ae8fa819d
--- /dev/null
+++ b/runtime/ops/dispatch_minimal.rs
@@ -0,0 +1,205 @@
+// Copyright 2018-2020 the Deno authors. All rights reserved. MIT license.
+
+use deno_core::error::AnyError;
+use deno_core::futures::future::FutureExt;
+use deno_core::BufVec;
+use deno_core::Op;
+use deno_core::OpFn;
+use deno_core::OpState;
+use std::cell::RefCell;
+use std::future::Future;
+use std::iter::repeat;
+use std::mem::size_of_val;
+use std::pin::Pin;
+use std::rc::Rc;
+use std::slice;
+
+pub enum MinimalOp {
+ Sync(Result<i32, AnyError>),
+ Async(Pin<Box<dyn Future<Output = Result<i32, AnyError>>>>),
+}
+
+#[derive(Copy, Clone, Debug, PartialEq)]
+// This corresponds to RecordMinimal on the TS side.
+pub struct Record {
+ pub promise_id: i32,
+ pub arg: i32,
+ pub result: i32,
+}
+
+impl Into<Box<[u8]>> for Record {
+ fn into(self) -> Box<[u8]> {
+ let vec = vec![self.promise_id, self.arg, self.result];
+ let buf32 = vec.into_boxed_slice();
+ let ptr = Box::into_raw(buf32) as *mut [u8; 3 * 4];
+ unsafe { Box::from_raw(ptr) }
+ }
+}
+
+pub struct ErrorRecord {
+ pub promise_id: i32,
+ pub arg: i32,
+ pub error_len: i32,
+ pub error_class: &'static [u8],
+ pub error_message: Vec<u8>,
+}
+
+impl Into<Box<[u8]>> for ErrorRecord {
+ fn into(self) -> Box<[u8]> {
+ let Self {
+ promise_id,
+ arg,
+ error_len,
+ error_class,
+ error_message,
+ ..
+ } = self;
+ let header_i32 = [promise_id, arg, error_len];
+ let header_u8 = unsafe {
+ slice::from_raw_parts(
+ &header_i32 as *const _ as *const u8,
+ size_of_val(&header_i32),
+ )
+ };
+ let padded_len =
+ (header_u8.len() + error_class.len() + error_message.len() + 3usize)
+ & !3usize;
+ header_u8
+ .iter()
+ .cloned()
+ .chain(error_class.iter().cloned())
+ .chain(error_message.into_iter())
+ .chain(repeat(b' '))
+ .take(padded_len)
+ .collect()
+ }
+}
+
+#[test]
+fn test_error_record() {
+ let expected = vec![
+ 1, 0, 0, 0, 255, 255, 255, 255, 11, 0, 0, 0, 66, 97, 100, 82, 101, 115,
+ 111, 117, 114, 99, 101, 69, 114, 114, 111, 114,
+ ];
+ let err_record = ErrorRecord {
+ promise_id: 1,
+ arg: -1,
+ error_len: 11,
+ error_class: b"BadResource",
+ error_message: b"Error".to_vec(),
+ };
+ let buf: Box<[u8]> = err_record.into();
+ assert_eq!(buf, expected.into_boxed_slice());
+}
+
+pub fn parse_min_record(bytes: &[u8]) -> Option<Record> {
+ if bytes.len() % std::mem::size_of::<i32>() != 0 {
+ return None;
+ }
+ let p = bytes.as_ptr();
+ #[allow(clippy::cast_ptr_alignment)]
+ let p32 = p as *const i32;
+ let s = unsafe { std::slice::from_raw_parts(p32, bytes.len() / 4) };
+
+ if s.len() != 3 {
+ return None;
+ }
+ let ptr = s.as_ptr();
+ let ints = unsafe { std::slice::from_raw_parts(ptr, 3) };
+ Some(Record {
+ promise_id: ints[0],
+ arg: ints[1],
+ result: ints[2],
+ })
+}
+
+#[test]
+fn test_parse_min_record() {
+ let buf = vec![1, 0, 0, 0, 3, 0, 0, 0, 4, 0, 0, 0];
+ assert_eq!(
+ parse_min_record(&buf),
+ Some(Record {
+ promise_id: 1,
+ arg: 3,
+ result: 4
+ })
+ );
+
+ let buf = vec![];
+ assert_eq!(parse_min_record(&buf), None);
+
+ let buf = vec![5];
+ assert_eq!(parse_min_record(&buf), None);
+}
+
+pub fn minimal_op<F>(op_fn: F) -> Box<OpFn>
+where
+ F: Fn(Rc<RefCell<OpState>>, bool, i32, BufVec) -> MinimalOp + 'static,
+{
+ Box::new(move |state: Rc<RefCell<OpState>>, bufs: BufVec| {
+ let mut bufs_iter = bufs.into_iter();
+ let record_buf = bufs_iter.next().expect("Expected record at position 0");
+ let zero_copy = bufs_iter.collect::<BufVec>();
+
+ let mut record = match parse_min_record(&record_buf) {
+ Some(r) => r,
+ None => {
+ let error_class = b"TypeError";
+ let error_message = b"Unparsable control buffer";
+ let error_record = ErrorRecord {
+ promise_id: 0,
+ arg: -1,
+ error_len: error_class.len() as i32,
+ error_class,
+ error_message: error_message[..].to_owned(),
+ };
+ return Op::Sync(error_record.into());
+ }
+ };
+ let is_sync = record.promise_id == 0;
+ let rid = record.arg;
+ let min_op = op_fn(state.clone(), is_sync, rid, zero_copy);
+
+ match min_op {
+ MinimalOp::Sync(sync_result) => Op::Sync(match sync_result {
+ Ok(r) => {
+ record.result = r;
+ record.into()
+ }
+ Err(err) => {
+ let error_class = (state.borrow().get_error_class_fn)(&err);
+ let error_record = ErrorRecord {
+ promise_id: record.promise_id,
+ arg: -1,
+ error_len: error_class.len() as i32,
+ error_class: error_class.as_bytes(),
+ error_message: err.to_string().as_bytes().to_owned(),
+ };
+ error_record.into()
+ }
+ }),
+ MinimalOp::Async(min_fut) => {
+ let fut = async move {
+ match min_fut.await {
+ Ok(r) => {
+ record.result = r;
+ record.into()
+ }
+ Err(err) => {
+ let error_class = (state.borrow().get_error_class_fn)(&err);
+ let error_record = ErrorRecord {
+ promise_id: record.promise_id,
+ arg: -1,
+ error_len: error_class.len() as i32,
+ error_class: error_class.as_bytes(),
+ error_message: err.to_string().as_bytes().to_owned(),
+ };
+ error_record.into()
+ }
+ }
+ };
+ Op::Async(fut.boxed_local())
+ }
+ }
+ })
+}
diff --git a/runtime/ops/fetch.rs b/runtime/ops/fetch.rs
new file mode 100644
index 000000000..0ef99f73d
--- /dev/null
+++ b/runtime/ops/fetch.rs
@@ -0,0 +1,25 @@
+// Copyright 2018-2020 the Deno authors. All rights reserved. MIT license.
+use crate::http_util;
+use crate::permissions::Permissions;
+use deno_fetch::reqwest;
+
+pub fn init(
+ rt: &mut deno_core::JsRuntime,
+ user_agent: String,
+ maybe_ca_file: Option<&str>,
+) {
+ {
+ let op_state = rt.op_state();
+ let mut state = op_state.borrow_mut();
+ state.put::<reqwest::Client>({
+ http_util::create_http_client(user_agent, maybe_ca_file).unwrap()
+ });
+ }
+ super::reg_json_async(rt, "op_fetch", deno_fetch::op_fetch::<Permissions>);
+ super::reg_json_async(rt, "op_fetch_read", deno_fetch::op_fetch_read);
+ super::reg_json_sync(
+ rt,
+ "op_create_http_client",
+ deno_fetch::op_create_http_client::<Permissions>,
+ );
+}
diff --git a/runtime/ops/fs.rs b/runtime/ops/fs.rs
new file mode 100644
index 000000000..865c5bcca
--- /dev/null
+++ b/runtime/ops/fs.rs
@@ -0,0 +1,1702 @@
+// Copyright 2018-2020 the Deno authors. All rights reserved. MIT license.
+// Some deserializer fields are only used on Unix and Windows build fails without it
+use super::io::std_file_resource;
+use super::io::{FileMetadata, StreamResource, StreamResourceHolder};
+use crate::fs_util::canonicalize_path;
+use crate::permissions::Permissions;
+use deno_core::error::custom_error;
+use deno_core::error::type_error;
+use deno_core::error::AnyError;
+use deno_core::serde_json;
+use deno_core::serde_json::json;
+use deno_core::serde_json::Value;
+use deno_core::BufVec;
+use deno_core::OpState;
+use deno_core::ZeroCopyBuf;
+use deno_crypto::rand::thread_rng;
+use deno_crypto::rand::Rng;
+use serde::Deserialize;
+use std::cell::RefCell;
+use std::convert::From;
+use std::env::{current_dir, set_current_dir, temp_dir};
+use std::io;
+use std::io::{Seek, SeekFrom};
+use std::path::{Path, PathBuf};
+use std::rc::Rc;
+use std::time::SystemTime;
+use std::time::UNIX_EPOCH;
+
+#[cfg(not(unix))]
+use deno_core::error::generic_error;
+#[cfg(not(unix))]
+use deno_core::error::not_supported;
+
+pub fn init(rt: &mut deno_core::JsRuntime) {
+ super::reg_json_sync(rt, "op_open_sync", op_open_sync);
+ super::reg_json_async(rt, "op_open_async", op_open_async);
+
+ super::reg_json_sync(rt, "op_seek_sync", op_seek_sync);
+ super::reg_json_async(rt, "op_seek_async", op_seek_async);
+
+ super::reg_json_sync(rt, "op_fdatasync_sync", op_fdatasync_sync);
+ super::reg_json_async(rt, "op_fdatasync_async", op_fdatasync_async);
+
+ super::reg_json_sync(rt, "op_fsync_sync", op_fsync_sync);
+ super::reg_json_async(rt, "op_fsync_async", op_fsync_async);
+
+ super::reg_json_sync(rt, "op_fstat_sync", op_fstat_sync);
+ super::reg_json_async(rt, "op_fstat_async", op_fstat_async);
+
+ super::reg_json_sync(rt, "op_umask", op_umask);
+ super::reg_json_sync(rt, "op_chdir", op_chdir);
+
+ super::reg_json_sync(rt, "op_mkdir_sync", op_mkdir_sync);
+ super::reg_json_async(rt, "op_mkdir_async", op_mkdir_async);
+
+ super::reg_json_sync(rt, "op_chmod_sync", op_chmod_sync);
+ super::reg_json_async(rt, "op_chmod_async", op_chmod_async);
+
+ super::reg_json_sync(rt, "op_chown_sync", op_chown_sync);
+ super::reg_json_async(rt, "op_chown_async", op_chown_async);
+
+ super::reg_json_sync(rt, "op_remove_sync", op_remove_sync);
+ super::reg_json_async(rt, "op_remove_async", op_remove_async);
+
+ super::reg_json_sync(rt, "op_copy_file_sync", op_copy_file_sync);
+ super::reg_json_async(rt, "op_copy_file_async", op_copy_file_async);
+
+ super::reg_json_sync(rt, "op_stat_sync", op_stat_sync);
+ super::reg_json_async(rt, "op_stat_async", op_stat_async);
+
+ super::reg_json_sync(rt, "op_realpath_sync", op_realpath_sync);
+ super::reg_json_async(rt, "op_realpath_async", op_realpath_async);
+
+ super::reg_json_sync(rt, "op_read_dir_sync", op_read_dir_sync);
+ super::reg_json_async(rt, "op_read_dir_async", op_read_dir_async);
+
+ super::reg_json_sync(rt, "op_rename_sync", op_rename_sync);
+ super::reg_json_async(rt, "op_rename_async", op_rename_async);
+
+ super::reg_json_sync(rt, "op_link_sync", op_link_sync);
+ super::reg_json_async(rt, "op_link_async", op_link_async);
+
+ super::reg_json_sync(rt, "op_symlink_sync", op_symlink_sync);
+ super::reg_json_async(rt, "op_symlink_async", op_symlink_async);
+
+ super::reg_json_sync(rt, "op_read_link_sync", op_read_link_sync);
+ super::reg_json_async(rt, "op_read_link_async", op_read_link_async);
+
+ super::reg_json_sync(rt, "op_ftruncate_sync", op_ftruncate_sync);
+ super::reg_json_async(rt, "op_ftruncate_async", op_ftruncate_async);
+
+ super::reg_json_sync(rt, "op_truncate_sync", op_truncate_sync);
+ super::reg_json_async(rt, "op_truncate_async", op_truncate_async);
+
+ super::reg_json_sync(rt, "op_make_temp_dir_sync", op_make_temp_dir_sync);
+ super::reg_json_async(rt, "op_make_temp_dir_async", op_make_temp_dir_async);
+
+ super::reg_json_sync(rt, "op_make_temp_file_sync", op_make_temp_file_sync);
+ super::reg_json_async(rt, "op_make_temp_file_async", op_make_temp_file_async);
+
+ super::reg_json_sync(rt, "op_cwd", op_cwd);
+
+ super::reg_json_sync(rt, "op_futime_sync", op_futime_sync);
+ super::reg_json_async(rt, "op_futime_async", op_futime_async);
+
+ super::reg_json_sync(rt, "op_utime_sync", op_utime_sync);
+ super::reg_json_async(rt, "op_utime_async", op_utime_async);
+}
+
+fn into_string(s: std::ffi::OsString) -> Result<String, AnyError> {
+ s.into_string().map_err(|s| {
+ let message = format!("File name or path {:?} is not valid UTF-8", s);
+ custom_error("InvalidData", message)
+ })
+}
+
+#[derive(Deserialize)]
+#[serde(rename_all = "camelCase")]
+struct OpenArgs {
+ path: String,
+ mode: Option<u32>,
+ options: OpenOptions,
+}
+
+#[derive(Deserialize, Default, Debug)]
+#[serde(rename_all = "camelCase")]
+#[serde(default)]
+struct OpenOptions {
+ read: bool,
+ write: bool,
+ create: bool,
+ truncate: bool,
+ append: bool,
+ create_new: bool,
+}
+
+fn open_helper(
+ state: &mut OpState,
+ args: Value,
+) -> Result<(PathBuf, std::fs::OpenOptions), AnyError> {
+ let args: OpenArgs = serde_json::from_value(args)?;
+ let path = Path::new(&args.path).to_path_buf();
+
+ let mut open_options = std::fs::OpenOptions::new();
+
+ if let Some(mode) = args.mode {
+ // mode only used if creating the file on Unix
+ // if not specified, defaults to 0o666
+ #[cfg(unix)]
+ {
+ use std::os::unix::fs::OpenOptionsExt;
+ open_options.mode(mode & 0o777);
+ }
+ #[cfg(not(unix))]
+ let _ = mode; // avoid unused warning
+ }
+
+ let permissions = state.borrow::<Permissions>();
+ let options = args.options;
+
+ if options.read {
+ permissions.check_read(&path)?;
+ }
+
+ if options.write || options.append {
+ permissions.check_write(&path)?;
+ }
+
+ open_options
+ .read(options.read)
+ .create(options.create)
+ .write(options.write)
+ .truncate(options.truncate)
+ .append(options.append)
+ .create_new(options.create_new);
+
+ Ok((path, open_options))
+}
+
+fn op_open_sync(
+ state: &mut OpState,
+ args: Value,
+ _zero_copy: &mut [ZeroCopyBuf],
+) -> Result<Value, AnyError> {
+ let (path, open_options) = open_helper(state, args)?;
+ let std_file = open_options.open(path)?;
+ let tokio_file = tokio::fs::File::from_std(std_file);
+ let rid = state.resource_table.add(
+ "fsFile",
+ Box::new(StreamResourceHolder::new(StreamResource::FsFile(Some((
+ tokio_file,
+ FileMetadata::default(),
+ ))))),
+ );
+ Ok(json!(rid))
+}
+
+async fn op_open_async(
+ state: Rc<RefCell<OpState>>,
+ args: Value,
+ _zero_copy: BufVec,
+) -> Result<Value, AnyError> {
+ let (path, open_options) = open_helper(&mut state.borrow_mut(), args)?;
+ let tokio_file = tokio::fs::OpenOptions::from(open_options)
+ .open(path)
+ .await?;
+ let rid = state.borrow_mut().resource_table.add(
+ "fsFile",
+ Box::new(StreamResourceHolder::new(StreamResource::FsFile(Some((
+ tokio_file,
+ FileMetadata::default(),
+ ))))),
+ );
+ Ok(json!(rid))
+}
+
+#[derive(Deserialize)]
+#[serde(rename_all = "camelCase")]
+struct SeekArgs {
+ rid: i32,
+ offset: i64,
+ whence: i32,
+}
+
+fn seek_helper(args: Value) -> Result<(u32, SeekFrom), AnyError> {
+ let args: SeekArgs = serde_json::from_value(args)?;
+ let rid = args.rid as u32;
+ let offset = args.offset;
+ let whence = args.whence as u32;
+ // Translate seek mode to Rust repr.
+ let seek_from = match whence {
+ 0 => SeekFrom::Start(offset as u64),
+ 1 => SeekFrom::Current(offset),
+ 2 => SeekFrom::End(offset),
+ _ => {
+ return Err(type_error(format!("Invalid seek mode: {}", whence)));
+ }
+ };
+
+ Ok((rid, seek_from))
+}
+
+fn op_seek_sync(
+ state: &mut OpState,
+ args: Value,
+ _zero_copy: &mut [ZeroCopyBuf],
+) -> Result<Value, AnyError> {
+ let (rid, seek_from) = seek_helper(args)?;
+ let pos = std_file_resource(state, rid, |r| match r {
+ Ok(std_file) => std_file.seek(seek_from).map_err(AnyError::from),
+ Err(_) => Err(type_error(
+ "cannot seek on this type of resource".to_string(),
+ )),
+ })?;
+ Ok(json!(pos))
+}
+
+async fn op_seek_async(
+ state: Rc<RefCell<OpState>>,
+ args: Value,
+ _zero_copy: BufVec,
+) -> Result<Value, AnyError> {
+ let (rid, seek_from) = seek_helper(args)?;
+ // TODO(ry) This is a fake async op. We need to use poll_fn,
+ // tokio::fs::File::start_seek and tokio::fs::File::poll_complete
+ let pos = std_file_resource(&mut state.borrow_mut(), rid, |r| match r {
+ Ok(std_file) => std_file.seek(seek_from).map_err(AnyError::from),
+ Err(_) => Err(type_error(
+ "cannot seek on this type of resource".to_string(),
+ )),
+ })?;
+ Ok(json!(pos))
+}
+
+#[derive(Deserialize)]
+#[serde(rename_all = "camelCase")]
+struct FdatasyncArgs {
+ rid: i32,
+}
+
+fn op_fdatasync_sync(
+ state: &mut OpState,
+ args: Value,
+ _zero_copy: &mut [ZeroCopyBuf],
+) -> Result<Value, AnyError> {
+ let args: FdatasyncArgs = serde_json::from_value(args)?;
+ let rid = args.rid as u32;
+ std_file_resource(state, rid, |r| match r {
+ Ok(std_file) => std_file.sync_data().map_err(AnyError::from),
+ Err(_) => Err(type_error("cannot sync this type of resource".to_string())),
+ })?;
+ Ok(json!({}))
+}
+
+async fn op_fdatasync_async(
+ state: Rc<RefCell<OpState>>,
+ args: Value,
+ _zero_copy: BufVec,
+) -> Result<Value, AnyError> {
+ let args: FdatasyncArgs = serde_json::from_value(args)?;
+ let rid = args.rid as u32;
+ std_file_resource(&mut state.borrow_mut(), rid, |r| match r {
+ Ok(std_file) => std_file.sync_data().map_err(AnyError::from),
+ Err(_) => Err(type_error("cannot sync this type of resource".to_string())),
+ })?;
+ Ok(json!({}))
+}
+
+#[derive(Deserialize)]
+#[serde(rename_all = "camelCase")]
+struct FsyncArgs {
+ rid: i32,
+}
+
+fn op_fsync_sync(
+ state: &mut OpState,
+ args: Value,
+ _zero_copy: &mut [ZeroCopyBuf],
+) -> Result<Value, AnyError> {
+ let args: FsyncArgs = serde_json::from_value(args)?;
+ let rid = args.rid as u32;
+ std_file_resource(state, rid, |r| match r {
+ Ok(std_file) => std_file.sync_all().map_err(AnyError::from),
+ Err(_) => Err(type_error("cannot sync this type of resource".to_string())),
+ })?;
+ Ok(json!({}))
+}
+
+async fn op_fsync_async(
+ state: Rc<RefCell<OpState>>,
+ args: Value,
+ _zero_copy: BufVec,
+) -> Result<Value, AnyError> {
+ let args: FsyncArgs = serde_json::from_value(args)?;
+ let rid = args.rid as u32;
+ std_file_resource(&mut state.borrow_mut(), rid, |r| match r {
+ Ok(std_file) => std_file.sync_all().map_err(AnyError::from),
+ Err(_) => Err(type_error("cannot sync this type of resource".to_string())),
+ })?;
+ Ok(json!({}))
+}
+
+#[derive(Deserialize)]
+#[serde(rename_all = "camelCase")]
+struct FstatArgs {
+ rid: i32,
+}
+
+fn op_fstat_sync(
+ state: &mut OpState,
+ args: Value,
+ _zero_copy: &mut [ZeroCopyBuf],
+) -> Result<Value, AnyError> {
+ super::check_unstable(state, "Deno.fstat");
+ let args: FstatArgs = serde_json::from_value(args)?;
+ let rid = args.rid as u32;
+ let metadata = std_file_resource(state, rid, |r| match r {
+ Ok(std_file) => std_file.metadata().map_err(AnyError::from),
+ Err(_) => Err(type_error("cannot stat this type of resource".to_string())),
+ })?;
+ Ok(get_stat_json(metadata))
+}
+
+async fn op_fstat_async(
+ state: Rc<RefCell<OpState>>,
+ args: Value,
+ _zero_copy: BufVec,
+) -> Result<Value, AnyError> {
+ super::check_unstable2(&state, "Deno.fstat");
+
+ let args: FstatArgs = serde_json::from_value(args)?;
+ let rid = args.rid as u32;
+ let metadata =
+ std_file_resource(&mut state.borrow_mut(), rid, |r| match r {
+ Ok(std_file) => std_file.metadata().map_err(AnyError::from),
+ Err(_) => {
+ Err(type_error("cannot stat this type of resource".to_string()))
+ }
+ })?;
+ Ok(get_stat_json(metadata))
+}
+
+#[derive(Deserialize)]
+struct UmaskArgs {
+ mask: Option<u32>,
+}
+
+fn op_umask(
+ state: &mut OpState,
+ args: Value,
+ _zero_copy: &mut [ZeroCopyBuf],
+) -> Result<Value, AnyError> {
+ super::check_unstable(state, "Deno.umask");
+ let args: UmaskArgs = serde_json::from_value(args)?;
+ // TODO implement umask for Windows
+ // see https://github.com/nodejs/node/blob/master/src/node_process_methods.cc
+ // and https://docs.microsoft.com/fr-fr/cpp/c-runtime-library/reference/umask?view=vs-2019
+ #[cfg(not(unix))]
+ {
+ let _ = args.mask; // avoid unused warning.
+ Err(not_supported())
+ }
+ #[cfg(unix)]
+ {
+ use nix::sys::stat::mode_t;
+ use nix::sys::stat::umask;
+ use nix::sys::stat::Mode;
+ let r = if let Some(mask) = args.mask {
+ // If mask provided, return previous.
+ umask(Mode::from_bits_truncate(mask as mode_t))
+ } else {
+ // If no mask provided, we query the current. Requires two syscalls.
+ let prev = umask(Mode::from_bits_truncate(0o777));
+ let _ = umask(prev);
+ prev
+ };
+ Ok(json!(r.bits() as u32))
+ }
+}
+
+#[derive(Deserialize)]
+struct ChdirArgs {
+ directory: String,
+}
+
+fn op_chdir(
+ state: &mut OpState,
+ args: Value,
+ _zero_copy: &mut [ZeroCopyBuf],
+) -> Result<Value, AnyError> {
+ let args: ChdirArgs = serde_json::from_value(args)?;
+ let d = PathBuf::from(&args.directory);
+ state.borrow::<Permissions>().check_read(&d)?;
+ set_current_dir(&d)?;
+ Ok(json!({}))
+}
+
+#[derive(Deserialize)]
+#[serde(rename_all = "camelCase")]
+struct MkdirArgs {
+ path: String,
+ recursive: bool,
+ mode: Option<u32>,
+}
+
+fn op_mkdir_sync(
+ state: &mut OpState,
+ args: Value,
+ _zero_copy: &mut [ZeroCopyBuf],
+) -> Result<Value, AnyError> {
+ let args: MkdirArgs = serde_json::from_value(args)?;
+ let path = Path::new(&args.path).to_path_buf();
+ let mode = args.mode.unwrap_or(0o777) & 0o777;
+ state.borrow::<Permissions>().check_write(&path)?;
+ debug!("op_mkdir {} {:o} {}", path.display(), mode, args.recursive);
+ let mut builder = std::fs::DirBuilder::new();
+ builder.recursive(args.recursive);
+ #[cfg(unix)]
+ {
+ use std::os::unix::fs::DirBuilderExt;
+ builder.mode(mode);
+ }
+ builder.create(path)?;
+ Ok(json!({}))
+}
+
+async fn op_mkdir_async(
+ state: Rc<RefCell<OpState>>,
+ args: Value,
+ _zero_copy: BufVec,
+) -> Result<Value, AnyError> {
+ let args: MkdirArgs = serde_json::from_value(args)?;
+ let path = Path::new(&args.path).to_path_buf();
+ let mode = args.mode.unwrap_or(0o777) & 0o777;
+
+ {
+ let state = state.borrow();
+ state.borrow::<Permissions>().check_write(&path)?;
+ }
+
+ tokio::task::spawn_blocking(move || {
+ debug!("op_mkdir {} {:o} {}", path.display(), mode, args.recursive);
+ let mut builder = std::fs::DirBuilder::new();
+ builder.recursive(args.recursive);
+ #[cfg(unix)]
+ {
+ use std::os::unix::fs::DirBuilderExt;
+ builder.mode(mode);
+ }
+ builder.create(path)?;
+ Ok(json!({}))
+ })
+ .await
+ .unwrap()
+}
+
+#[derive(Deserialize)]
+#[serde(rename_all = "camelCase")]
+struct ChmodArgs {
+ path: String,
+ mode: u32,
+}
+
+fn op_chmod_sync(
+ state: &mut OpState,
+ args: Value,
+ _zero_copy: &mut [ZeroCopyBuf],
+) -> Result<Value, AnyError> {
+ let args: ChmodArgs = serde_json::from_value(args)?;
+ let path = Path::new(&args.path).to_path_buf();
+ let mode = args.mode & 0o777;
+
+ state.borrow::<Permissions>().check_write(&path)?;
+ debug!("op_chmod_sync {} {:o}", path.display(), mode);
+ #[cfg(unix)]
+ {
+ use std::os::unix::fs::PermissionsExt;
+ let permissions = PermissionsExt::from_mode(mode);
+ std::fs::set_permissions(&path, permissions)?;
+ Ok(json!({}))
+ }
+ // TODO Implement chmod for Windows (#4357)
+ #[cfg(not(unix))]
+ {
+ // Still check file/dir exists on Windows
+ let _metadata = std::fs::metadata(&path)?;
+ Err(generic_error("Not implemented"))
+ }
+}
+
+async fn op_chmod_async(
+ state: Rc<RefCell<OpState>>,
+ args: Value,
+ _zero_copy: BufVec,
+) -> Result<Value, AnyError> {
+ let args: ChmodArgs = serde_json::from_value(args)?;
+ let path = Path::new(&args.path).to_path_buf();
+ let mode = args.mode & 0o777;
+
+ {
+ let state = state.borrow();
+ state.borrow::<Permissions>().check_write(&path)?;
+ }
+
+ tokio::task::spawn_blocking(move || {
+ debug!("op_chmod_async {} {:o}", path.display(), mode);
+ #[cfg(unix)]
+ {
+ use std::os::unix::fs::PermissionsExt;
+ let permissions = PermissionsExt::from_mode(mode);
+ std::fs::set_permissions(&path, permissions)?;
+ Ok(json!({}))
+ }
+ // TODO Implement chmod for Windows (#4357)
+ #[cfg(not(unix))]
+ {
+ // Still check file/dir exists on Windows
+ let _metadata = std::fs::metadata(&path)?;
+ Err(not_supported())
+ }
+ })
+ .await
+ .unwrap()
+}
+
+#[derive(Deserialize)]
+#[serde(rename_all = "camelCase")]
+struct ChownArgs {
+ path: String,
+ uid: Option<u32>,
+ gid: Option<u32>,
+}
+
+fn op_chown_sync(
+ state: &mut OpState,
+ args: Value,
+ _zero_copy: &mut [ZeroCopyBuf],
+) -> Result<Value, AnyError> {
+ let args: ChownArgs = serde_json::from_value(args)?;
+ let path = Path::new(&args.path).to_path_buf();
+ state.borrow::<Permissions>().check_write(&path)?;
+ debug!(
+ "op_chown_sync {} {:?} {:?}",
+ path.display(),
+ args.uid,
+ args.gid,
+ );
+ #[cfg(unix)]
+ {
+ use nix::unistd::{chown, Gid, Uid};
+ let nix_uid = args.uid.map(Uid::from_raw);
+ let nix_gid = args.gid.map(Gid::from_raw);
+ chown(&path, nix_uid, nix_gid)?;
+ Ok(json!({}))
+ }
+ // TODO Implement chown for Windows
+ #[cfg(not(unix))]
+ {
+ Err(generic_error("Not implemented"))
+ }
+}
+
+async fn op_chown_async(
+ state: Rc<RefCell<OpState>>,
+ args: Value,
+ _zero_copy: BufVec,
+) -> Result<Value, AnyError> {
+ let args: ChownArgs = serde_json::from_value(args)?;
+ let path = Path::new(&args.path).to_path_buf();
+
+ {
+ let state = state.borrow();
+ state.borrow::<Permissions>().check_write(&path)?;
+ }
+
+ tokio::task::spawn_blocking(move || {
+ debug!(
+ "op_chown_async {} {:?} {:?}",
+ path.display(),
+ args.uid,
+ args.gid,
+ );
+ #[cfg(unix)]
+ {
+ use nix::unistd::{chown, Gid, Uid};
+ let nix_uid = args.uid.map(Uid::from_raw);
+ let nix_gid = args.gid.map(Gid::from_raw);
+ chown(&path, nix_uid, nix_gid)?;
+ Ok(json!({}))
+ }
+ // TODO Implement chown for Windows
+ #[cfg(not(unix))]
+ Err(not_supported())
+ })
+ .await
+ .unwrap()
+}
+
+#[derive(Deserialize)]
+#[serde(rename_all = "camelCase")]
+struct RemoveArgs {
+ path: String,
+ recursive: bool,
+}
+
+fn op_remove_sync(
+ state: &mut OpState,
+ args: Value,
+ _zero_copy: &mut [ZeroCopyBuf],
+) -> Result<Value, AnyError> {
+ let args: RemoveArgs = serde_json::from_value(args)?;
+ let path = PathBuf::from(&args.path);
+ let recursive = args.recursive;
+
+ state.borrow::<Permissions>().check_write(&path)?;
+
+ #[cfg(not(unix))]
+ use std::os::windows::prelude::MetadataExt;
+
+ let metadata = std::fs::symlink_metadata(&path)?;
+
+ debug!("op_remove_sync {} {}", path.display(), recursive);
+ let file_type = metadata.file_type();
+ if file_type.is_file() {
+ std::fs::remove_file(&path)?;
+ } else if recursive {
+ std::fs::remove_dir_all(&path)?;
+ } else if file_type.is_symlink() {
+ #[cfg(unix)]
+ std::fs::remove_file(&path)?;
+ #[cfg(not(unix))]
+ {
+ use winapi::um::winnt::FILE_ATTRIBUTE_DIRECTORY;
+ if metadata.file_attributes() & FILE_ATTRIBUTE_DIRECTORY != 0 {
+ std::fs::remove_dir(&path)?;
+ } else {
+ std::fs::remove_file(&path)?;
+ }
+ }
+ } else if file_type.is_dir() {
+ std::fs::remove_dir(&path)?;
+ } else {
+ // pipes, sockets, etc...
+ std::fs::remove_file(&path)?;
+ }
+ Ok(json!({}))
+}
+
+async fn op_remove_async(
+ state: Rc<RefCell<OpState>>,
+ args: Value,
+ _zero_copy: BufVec,
+) -> Result<Value, AnyError> {
+ let args: RemoveArgs = serde_json::from_value(args)?;
+ let path = PathBuf::from(&args.path);
+ let recursive = args.recursive;
+
+ {
+ let state = state.borrow();
+ state.borrow::<Permissions>().check_write(&path)?;
+ }
+
+ tokio::task::spawn_blocking(move || {
+ #[cfg(not(unix))]
+ use std::os::windows::prelude::MetadataExt;
+
+ let metadata = std::fs::symlink_metadata(&path)?;
+
+ debug!("op_remove_async {} {}", path.display(), recursive);
+ let file_type = metadata.file_type();
+ if file_type.is_file() {
+ std::fs::remove_file(&path)?;
+ } else if recursive {
+ std::fs::remove_dir_all(&path)?;
+ } else if file_type.is_symlink() {
+ #[cfg(unix)]
+ std::fs::remove_file(&path)?;
+ #[cfg(not(unix))]
+ {
+ use winapi::um::winnt::FILE_ATTRIBUTE_DIRECTORY;
+ if metadata.file_attributes() & FILE_ATTRIBUTE_DIRECTORY != 0 {
+ std::fs::remove_dir(&path)?;
+ } else {
+ std::fs::remove_file(&path)?;
+ }
+ }
+ } else if file_type.is_dir() {
+ std::fs::remove_dir(&path)?;
+ } else {
+ // pipes, sockets, etc...
+ std::fs::remove_file(&path)?;
+ }
+ Ok(json!({}))
+ })
+ .await
+ .unwrap()
+}
+
+#[derive(Deserialize)]
+#[serde(rename_all = "camelCase")]
+struct CopyFileArgs {
+ from: String,
+ to: String,
+}
+
+fn op_copy_file_sync(
+ state: &mut OpState,
+ args: Value,
+ _zero_copy: &mut [ZeroCopyBuf],
+) -> Result<Value, AnyError> {
+ let args: CopyFileArgs = serde_json::from_value(args)?;
+ let from = PathBuf::from(&args.from);
+ let to = PathBuf::from(&args.to);
+
+ let permissions = state.borrow::<Permissions>();
+ permissions.check_read(&from)?;
+ permissions.check_write(&to)?;
+
+ debug!("op_copy_file_sync {} {}", from.display(), to.display());
+ // On *nix, Rust reports non-existent `from` as ErrorKind::InvalidInput
+ // See https://github.com/rust-lang/rust/issues/54800
+ // Once the issue is resolved, we should remove this workaround.
+ if cfg!(unix) && !from.is_file() {
+ return Err(custom_error("NotFound", "File not found"));
+ }
+
+ // returns size of from as u64 (we ignore)
+ std::fs::copy(&from, &to)?;
+ Ok(json!({}))
+}
+
+async fn op_copy_file_async(
+ state: Rc<RefCell<OpState>>,
+ args: Value,
+ _zero_copy: BufVec,
+) -> Result<Value, AnyError> {
+ let args: CopyFileArgs = serde_json::from_value(args)?;
+ let from = PathBuf::from(&args.from);
+ let to = PathBuf::from(&args.to);
+
+ {
+ let state = state.borrow();
+ let permissions = state.borrow::<Permissions>();
+ permissions.check_read(&from)?;
+ permissions.check_write(&to)?;
+ }
+
+ debug!("op_copy_file_async {} {}", from.display(), to.display());
+ tokio::task::spawn_blocking(move || {
+ // On *nix, Rust reports non-existent `from` as ErrorKind::InvalidInput
+ // See https://github.com/rust-lang/rust/issues/54800
+ // Once the issue is resolved, we should remove this workaround.
+ if cfg!(unix) && !from.is_file() {
+ return Err(custom_error("NotFound", "File not found"));
+ }
+
+ // returns size of from as u64 (we ignore)
+ std::fs::copy(&from, &to)?;
+ Ok(json!({}))
+ })
+ .await
+ .unwrap()
+}
+
+fn to_msec(maybe_time: Result<SystemTime, io::Error>) -> Value {
+ match maybe_time {
+ Ok(time) => {
+ let msec = time
+ .duration_since(UNIX_EPOCH)
+ .map(|t| t.as_secs_f64() * 1000f64)
+ .unwrap_or_else(|err| err.duration().as_secs_f64() * -1000f64);
+ serde_json::Number::from_f64(msec)
+ .map(Value::Number)
+ .unwrap_or(Value::Null)
+ }
+ Err(_) => Value::Null,
+ }
+}
+
+#[inline(always)]
+fn get_stat_json(metadata: std::fs::Metadata) -> Value {
+ // Unix stat member (number types only). 0 if not on unix.
+ macro_rules! usm {
+ ($member:ident) => {{
+ #[cfg(unix)]
+ {
+ metadata.$member()
+ }
+ #[cfg(not(unix))]
+ {
+ 0
+ }
+ }};
+ }
+
+ #[cfg(unix)]
+ use std::os::unix::fs::MetadataExt;
+ let json_val = json!({
+ "isFile": metadata.is_file(),
+ "isDirectory": metadata.is_dir(),
+ "isSymlink": metadata.file_type().is_symlink(),
+ "size": metadata.len(),
+ // In milliseconds, like JavaScript. Available on both Unix or Windows.
+ "mtime": to_msec(metadata.modified()),
+ "atime": to_msec(metadata.accessed()),
+ "birthtime": to_msec(metadata.created()),
+ // Following are only valid under Unix.
+ "dev": usm!(dev),
+ "ino": usm!(ino),
+ "mode": usm!(mode),
+ "nlink": usm!(nlink),
+ "uid": usm!(uid),
+ "gid": usm!(gid),
+ "rdev": usm!(rdev),
+ // TODO(kevinkassimo): *time_nsec requires BigInt.
+ // Probably should be treated as String if we need to add them.
+ "blksize": usm!(blksize),
+ "blocks": usm!(blocks),
+ });
+ json_val
+}
+
+#[derive(Deserialize)]
+#[serde(rename_all = "camelCase")]
+struct StatArgs {
+ path: String,
+ lstat: bool,
+}
+
+fn op_stat_sync(
+ state: &mut OpState,
+ args: Value,
+ _zero_copy: &mut [ZeroCopyBuf],
+) -> Result<Value, AnyError> {
+ let args: StatArgs = serde_json::from_value(args)?;
+ let path = PathBuf::from(&args.path);
+ let lstat = args.lstat;
+ state.borrow::<Permissions>().check_read(&path)?;
+ debug!("op_stat_sync {} {}", path.display(), lstat);
+ let metadata = if lstat {
+ std::fs::symlink_metadata(&path)?
+ } else {
+ std::fs::metadata(&path)?
+ };
+ Ok(get_stat_json(metadata))
+}
+
+async fn op_stat_async(
+ state: Rc<RefCell<OpState>>,
+ args: Value,
+ _zero_copy: BufVec,
+) -> Result<Value, AnyError> {
+ let args: StatArgs = serde_json::from_value(args)?;
+ let path = PathBuf::from(&args.path);
+ let lstat = args.lstat;
+
+ {
+ let state = state.borrow();
+ state.borrow::<Permissions>().check_read(&path)?;
+ }
+
+ tokio::task::spawn_blocking(move || {
+ debug!("op_stat_async {} {}", path.display(), lstat);
+ let metadata = if lstat {
+ std::fs::symlink_metadata(&path)?
+ } else {
+ std::fs::metadata(&path)?
+ };
+ Ok(get_stat_json(metadata))
+ })
+ .await
+ .unwrap()
+}
+
+#[derive(Deserialize)]
+#[serde(rename_all = "camelCase")]
+struct RealpathArgs {
+ path: String,
+}
+
+fn op_realpath_sync(
+ state: &mut OpState,
+ args: Value,
+ _zero_copy: &mut [ZeroCopyBuf],
+) -> Result<Value, AnyError> {
+ let args: RealpathArgs = serde_json::from_value(args)?;
+ let path = PathBuf::from(&args.path);
+
+ let permissions = state.borrow::<Permissions>();
+ permissions.check_read(&path)?;
+ if path.is_relative() {
+ permissions.check_read_blind(&current_dir()?, "CWD")?;
+ }
+
+ debug!("op_realpath_sync {}", path.display());
+ // corresponds to the realpath on Unix and
+ // CreateFile and GetFinalPathNameByHandle on Windows
+ let realpath = canonicalize_path(&path)?;
+ let realpath_str = into_string(realpath.into_os_string())?;
+ Ok(json!(realpath_str))
+}
+
+async fn op_realpath_async(
+ state: Rc<RefCell<OpState>>,
+ args: Value,
+ _zero_copy: BufVec,
+) -> Result<Value, AnyError> {
+ let args: RealpathArgs = serde_json::from_value(args)?;
+ let path = PathBuf::from(&args.path);
+
+ {
+ let state = state.borrow();
+ let permissions = state.borrow::<Permissions>();
+ permissions.check_read(&path)?;
+ if path.is_relative() {
+ permissions.check_read_blind(&current_dir()?, "CWD")?;
+ }
+ }
+
+ tokio::task::spawn_blocking(move || {
+ debug!("op_realpath_async {}", path.display());
+ // corresponds to the realpath on Unix and
+ // CreateFile and GetFinalPathNameByHandle on Windows
+ let realpath = canonicalize_path(&path)?;
+ let realpath_str = into_string(realpath.into_os_string())?;
+ Ok(json!(realpath_str))
+ })
+ .await
+ .unwrap()
+}
+
+#[derive(Deserialize)]
+#[serde(rename_all = "camelCase")]
+struct ReadDirArgs {
+ path: String,
+}
+
+fn op_read_dir_sync(
+ state: &mut OpState,
+ args: Value,
+ _zero_copy: &mut [ZeroCopyBuf],
+) -> Result<Value, AnyError> {
+ let args: ReadDirArgs = serde_json::from_value(args)?;
+ let path = PathBuf::from(&args.path);
+
+ state.borrow::<Permissions>().check_read(&path)?;
+
+ debug!("op_read_dir_sync {}", path.display());
+ let entries: Vec<_> = std::fs::read_dir(path)?
+ .filter_map(|entry| {
+ let entry = entry.unwrap();
+ let file_type = entry.file_type().unwrap();
+ // Not all filenames can be encoded as UTF-8. Skip those for now.
+ if let Ok(name) = into_string(entry.file_name()) {
+ Some(json!({
+ "name": name,
+ "isFile": file_type.is_file(),
+ "isDirectory": file_type.is_dir(),
+ "isSymlink": file_type.is_symlink()
+ }))
+ } else {
+ None
+ }
+ })
+ .collect();
+
+ Ok(json!({ "entries": entries }))
+}
+
+async fn op_read_dir_async(
+ state: Rc<RefCell<OpState>>,
+ args: Value,
+ _zero_copy: BufVec,
+) -> Result<Value, AnyError> {
+ let args: ReadDirArgs = serde_json::from_value(args)?;
+ let path = PathBuf::from(&args.path);
+ {
+ let state = state.borrow();
+ state.borrow::<Permissions>().check_read(&path)?;
+ }
+ tokio::task::spawn_blocking(move || {
+ debug!("op_read_dir_async {}", path.display());
+ let entries: Vec<_> = std::fs::read_dir(path)?
+ .filter_map(|entry| {
+ let entry = entry.unwrap();
+ let file_type = entry.file_type().unwrap();
+ // Not all filenames can be encoded as UTF-8. Skip those for now.
+ if let Ok(name) = into_string(entry.file_name()) {
+ Some(json!({
+ "name": name,
+ "isFile": file_type.is_file(),
+ "isDirectory": file_type.is_dir(),
+ "isSymlink": file_type.is_symlink()
+ }))
+ } else {
+ None
+ }
+ })
+ .collect();
+
+ Ok(json!({ "entries": entries }))
+ })
+ .await
+ .unwrap()
+}
+
+#[derive(Deserialize)]
+#[serde(rename_all = "camelCase")]
+struct RenameArgs {
+ oldpath: String,
+ newpath: String,
+}
+
+fn op_rename_sync(
+ state: &mut OpState,
+ args: Value,
+ _zero_copy: &mut [ZeroCopyBuf],
+) -> Result<Value, AnyError> {
+ let args: RenameArgs = serde_json::from_value(args)?;
+ let oldpath = PathBuf::from(&args.oldpath);
+ let newpath = PathBuf::from(&args.newpath);
+
+ let permissions = state.borrow::<Permissions>();
+ permissions.check_read(&oldpath)?;
+ permissions.check_write(&oldpath)?;
+ permissions.check_write(&newpath)?;
+ debug!("op_rename_sync {} {}", oldpath.display(), newpath.display());
+ std::fs::rename(&oldpath, &newpath)?;
+ Ok(json!({}))
+}
+
+async fn op_rename_async(
+ state: Rc<RefCell<OpState>>,
+ args: Value,
+ _zero_copy: BufVec,
+) -> Result<Value, AnyError> {
+ let args: RenameArgs = serde_json::from_value(args)?;
+ let oldpath = PathBuf::from(&args.oldpath);
+ let newpath = PathBuf::from(&args.newpath);
+ {
+ let state = state.borrow();
+ let permissions = state.borrow::<Permissions>();
+ permissions.check_read(&oldpath)?;
+ permissions.check_write(&oldpath)?;
+ permissions.check_write(&newpath)?;
+ }
+ tokio::task::spawn_blocking(move || {
+ debug!(
+ "op_rename_async {} {}",
+ oldpath.display(),
+ newpath.display()
+ );
+ std::fs::rename(&oldpath, &newpath)?;
+ Ok(json!({}))
+ })
+ .await
+ .unwrap()
+}
+
+#[derive(Deserialize)]
+#[serde(rename_all = "camelCase")]
+struct LinkArgs {
+ oldpath: String,
+ newpath: String,
+}
+
+fn op_link_sync(
+ state: &mut OpState,
+ args: Value,
+ _zero_copy: &mut [ZeroCopyBuf],
+) -> Result<Value, AnyError> {
+ super::check_unstable(state, "Deno.link");
+ let args: LinkArgs = serde_json::from_value(args)?;
+ let oldpath = PathBuf::from(&args.oldpath);
+ let newpath = PathBuf::from(&args.newpath);
+
+ let permissions = state.borrow::<Permissions>();
+ permissions.check_read(&oldpath)?;
+ permissions.check_write(&newpath)?;
+
+ debug!("op_link_sync {} {}", oldpath.display(), newpath.display());
+ std::fs::hard_link(&oldpath, &newpath)?;
+ Ok(json!({}))
+}
+
+async fn op_link_async(
+ state: Rc<RefCell<OpState>>,
+ args: Value,
+ _zero_copy: BufVec,
+) -> Result<Value, AnyError> {
+ super::check_unstable2(&state, "Deno.link");
+
+ let args: LinkArgs = serde_json::from_value(args)?;
+ let oldpath = PathBuf::from(&args.oldpath);
+ let newpath = PathBuf::from(&args.newpath);
+
+ {
+ let state = state.borrow();
+ let permissions = state.borrow::<Permissions>();
+ permissions.check_read(&oldpath)?;
+ permissions.check_write(&newpath)?;
+ }
+
+ tokio::task::spawn_blocking(move || {
+ debug!("op_link_async {} {}", oldpath.display(), newpath.display());
+ std::fs::hard_link(&oldpath, &newpath)?;
+ Ok(json!({}))
+ })
+ .await
+ .unwrap()
+}
+
+#[derive(Deserialize)]
+#[serde(rename_all = "camelCase")]
+struct SymlinkArgs {
+ oldpath: String,
+ newpath: String,
+ #[cfg(not(unix))]
+ options: Option<SymlinkOptions>,
+}
+
+#[cfg(not(unix))]
+#[derive(Deserialize)]
+#[serde(rename_all = "camelCase")]
+struct SymlinkOptions {
+ _type: String,
+}
+
+fn op_symlink_sync(
+ state: &mut OpState,
+ args: Value,
+ _zero_copy: &mut [ZeroCopyBuf],
+) -> Result<Value, AnyError> {
+ super::check_unstable(state, "Deno.symlink");
+ let args: SymlinkArgs = serde_json::from_value(args)?;
+ let oldpath = PathBuf::from(&args.oldpath);
+ let newpath = PathBuf::from(&args.newpath);
+
+ state.borrow::<Permissions>().check_write(&newpath)?;
+
+ debug!(
+ "op_symlink_sync {} {}",
+ oldpath.display(),
+ newpath.display()
+ );
+ #[cfg(unix)]
+ {
+ use std::os::unix::fs::symlink;
+ symlink(&oldpath, &newpath)?;
+ Ok(json!({}))
+ }
+ #[cfg(not(unix))]
+ {
+ use std::os::windows::fs::{symlink_dir, symlink_file};
+
+ match args.options {
+ Some(options) => match options._type.as_ref() {
+ "file" => symlink_file(&oldpath, &newpath)?,
+ "dir" => symlink_dir(&oldpath, &newpath)?,
+ _ => return Err(type_error("unsupported type")),
+ },
+ None => {
+ let old_meta = std::fs::metadata(&oldpath);
+ match old_meta {
+ Ok(metadata) => {
+ if metadata.is_file() {
+ symlink_file(&oldpath, &newpath)?
+ } else if metadata.is_dir() {
+ symlink_dir(&oldpath, &newpath)?
+ }
+ }
+ Err(_) => return Err(type_error("you must pass a `options` argument for non-existent target path in windows".to_string())),
+ }
+ }
+ };
+ Ok(json!({}))
+ }
+}
+
+async fn op_symlink_async(
+ state: Rc<RefCell<OpState>>,
+ args: Value,
+ _zero_copy: BufVec,
+) -> Result<Value, AnyError> {
+ super::check_unstable2(&state, "Deno.symlink");
+
+ let args: SymlinkArgs = serde_json::from_value(args)?;
+ let oldpath = PathBuf::from(&args.oldpath);
+ let newpath = PathBuf::from(&args.newpath);
+
+ {
+ let state = state.borrow();
+ state.borrow::<Permissions>().check_write(&newpath)?;
+ }
+
+ tokio::task::spawn_blocking(move || {
+ debug!("op_symlink_async {} {}", oldpath.display(), newpath.display());
+ #[cfg(unix)]
+ {
+ use std::os::unix::fs::symlink;
+ symlink(&oldpath, &newpath)?;
+ Ok(json!({}))
+ }
+ #[cfg(not(unix))]
+ {
+ use std::os::windows::fs::{symlink_dir, symlink_file};
+
+ match args.options {
+ Some(options) => match options._type.as_ref() {
+ "file" => symlink_file(&oldpath, &newpath)?,
+ "dir" => symlink_dir(&oldpath, &newpath)?,
+ _ => return Err(type_error("unsupported type")),
+ },
+ None => {
+ let old_meta = std::fs::metadata(&oldpath);
+ match old_meta {
+ Ok(metadata) => {
+ if metadata.is_file() {
+ symlink_file(&oldpath, &newpath)?
+ } else if metadata.is_dir() {
+ symlink_dir(&oldpath, &newpath)?
+ }
+ }
+ Err(_) => return Err(type_error("you must pass a `options` argument for non-existent target path in windows".to_string())),
+ }
+ }
+ };
+ Ok(json!({}))
+ }
+ })
+ .await
+ .unwrap()
+}
+
+#[derive(Deserialize)]
+#[serde(rename_all = "camelCase")]
+struct ReadLinkArgs {
+ path: String,
+}
+
+fn op_read_link_sync(
+ state: &mut OpState,
+ args: Value,
+ _zero_copy: &mut [ZeroCopyBuf],
+) -> Result<Value, AnyError> {
+ let args: ReadLinkArgs = serde_json::from_value(args)?;
+ let path = PathBuf::from(&args.path);
+
+ state.borrow::<Permissions>().check_read(&path)?;
+
+ debug!("op_read_link_value {}", path.display());
+ let target = std::fs::read_link(&path)?.into_os_string();
+ let targetstr = into_string(target)?;
+ Ok(json!(targetstr))
+}
+
+async fn op_read_link_async(
+ state: Rc<RefCell<OpState>>,
+ args: Value,
+ _zero_copy: BufVec,
+) -> Result<Value, AnyError> {
+ let args: ReadLinkArgs = serde_json::from_value(args)?;
+ let path = PathBuf::from(&args.path);
+ {
+ let state = state.borrow();
+ state.borrow::<Permissions>().check_read(&path)?;
+ }
+ tokio::task::spawn_blocking(move || {
+ debug!("op_read_link_async {}", path.display());
+ let target = std::fs::read_link(&path)?.into_os_string();
+ let targetstr = into_string(target)?;
+ Ok(json!(targetstr))
+ })
+ .await
+ .unwrap()
+}
+
+#[derive(Deserialize)]
+#[serde(rename_all = "camelCase")]
+struct FtruncateArgs {
+ rid: i32,
+ len: i32,
+}
+
+fn op_ftruncate_sync(
+ state: &mut OpState,
+ args: Value,
+ _zero_copy: &mut [ZeroCopyBuf],
+) -> Result<Value, AnyError> {
+ super::check_unstable(state, "Deno.ftruncate");
+ let args: FtruncateArgs = serde_json::from_value(args)?;
+ let rid = args.rid as u32;
+ let len = args.len as u64;
+ std_file_resource(state, rid, |r| match r {
+ Ok(std_file) => std_file.set_len(len).map_err(AnyError::from),
+ Err(_) => Err(type_error("cannot truncate this type of resource")),
+ })?;
+ Ok(json!({}))
+}
+
+async fn op_ftruncate_async(
+ state: Rc<RefCell<OpState>>,
+ args: Value,
+ _zero_copy: BufVec,
+) -> Result<Value, AnyError> {
+ super::check_unstable2(&state, "Deno.ftruncate");
+ let args: FtruncateArgs = serde_json::from_value(args)?;
+ let rid = args.rid as u32;
+ let len = args.len as u64;
+ std_file_resource(&mut state.borrow_mut(), rid, |r| match r {
+ Ok(std_file) => std_file.set_len(len).map_err(AnyError::from),
+ Err(_) => Err(type_error("cannot truncate this type of resource")),
+ })?;
+ Ok(json!({}))
+}
+
+#[derive(Deserialize)]
+#[serde(rename_all = "camelCase")]
+struct TruncateArgs {
+ path: String,
+ len: u64,
+}
+
+fn op_truncate_sync(
+ state: &mut OpState,
+ args: Value,
+ _zero_copy: &mut [ZeroCopyBuf],
+) -> Result<Value, AnyError> {
+ let args: TruncateArgs = serde_json::from_value(args)?;
+ let path = PathBuf::from(&args.path);
+ let len = args.len;
+
+ state.borrow::<Permissions>().check_write(&path)?;
+
+ debug!("op_truncate_sync {} {}", path.display(), len);
+ let f = std::fs::OpenOptions::new().write(true).open(&path)?;
+ f.set_len(len)?;
+ Ok(json!({}))
+}
+
+async fn op_truncate_async(
+ state: Rc<RefCell<OpState>>,
+ args: Value,
+ _zero_copy: BufVec,
+) -> Result<Value, AnyError> {
+ let args: TruncateArgs = serde_json::from_value(args)?;
+ let path = PathBuf::from(&args.path);
+ let len = args.len;
+ {
+ let state = state.borrow();
+ state.borrow::<Permissions>().check_write(&path)?;
+ }
+ tokio::task::spawn_blocking(move || {
+ debug!("op_truncate_async {} {}", path.display(), len);
+ let f = std::fs::OpenOptions::new().write(true).open(&path)?;
+ f.set_len(len)?;
+ Ok(json!({}))
+ })
+ .await
+ .unwrap()
+}
+
+fn make_temp(
+ dir: Option<&Path>,
+ prefix: Option<&str>,
+ suffix: Option<&str>,
+ is_dir: bool,
+) -> std::io::Result<PathBuf> {
+ let prefix_ = prefix.unwrap_or("");
+ let suffix_ = suffix.unwrap_or("");
+ let mut buf: PathBuf = match dir {
+ Some(ref p) => p.to_path_buf(),
+ None => temp_dir(),
+ }
+ .join("_");
+ let mut rng = thread_rng();
+ loop {
+ let unique = rng.gen::<u32>();
+ buf.set_file_name(format!("{}{:08x}{}", prefix_, unique, suffix_));
+ let r = if is_dir {
+ #[allow(unused_mut)]
+ let mut builder = std::fs::DirBuilder::new();
+ #[cfg(unix)]
+ {
+ use std::os::unix::fs::DirBuilderExt;
+ builder.mode(0o700);
+ }
+ builder.create(buf.as_path())
+ } else {
+ let mut open_options = std::fs::OpenOptions::new();
+ open_options.write(true).create_new(true);
+ #[cfg(unix)]
+ {
+ use std::os::unix::fs::OpenOptionsExt;
+ open_options.mode(0o600);
+ }
+ open_options.open(buf.as_path())?;
+ Ok(())
+ };
+ match r {
+ Err(ref e) if e.kind() == std::io::ErrorKind::AlreadyExists => continue,
+ Ok(_) => return Ok(buf),
+ Err(e) => return Err(e),
+ }
+ }
+}
+
+#[derive(Deserialize)]
+#[serde(rename_all = "camelCase")]
+struct MakeTempArgs {
+ dir: Option<String>,
+ prefix: Option<String>,
+ suffix: Option<String>,
+}
+
+fn op_make_temp_dir_sync(
+ state: &mut OpState,
+ args: Value,
+ _zero_copy: &mut [ZeroCopyBuf],
+) -> Result<Value, AnyError> {
+ let args: MakeTempArgs = serde_json::from_value(args)?;
+
+ let dir = args.dir.map(|s| PathBuf::from(&s));
+ let prefix = args.prefix.map(String::from);
+ let suffix = args.suffix.map(String::from);
+
+ state
+ .borrow::<Permissions>()
+ .check_write(dir.clone().unwrap_or_else(temp_dir).as_path())?;
+
+ // TODO(piscisaureus): use byte vector for paths, not a string.
+ // See https://github.com/denoland/deno/issues/627.
+ // We can't assume that paths are always valid utf8 strings.
+ let path = make_temp(
+ // Converting Option<String> to Option<&str>
+ dir.as_deref(),
+ prefix.as_deref(),
+ suffix.as_deref(),
+ true,
+ )?;
+ let path_str = into_string(path.into_os_string())?;
+
+ Ok(json!(path_str))
+}
+
+async fn op_make_temp_dir_async(
+ state: Rc<RefCell<OpState>>,
+ args: Value,
+ _zero_copy: BufVec,
+) -> Result<Value, AnyError> {
+ let args: MakeTempArgs = serde_json::from_value(args)?;
+
+ let dir = args.dir.map(|s| PathBuf::from(&s));
+ let prefix = args.prefix.map(String::from);
+ let suffix = args.suffix.map(String::from);
+ {
+ let state = state.borrow();
+ state
+ .borrow::<Permissions>()
+ .check_write(dir.clone().unwrap_or_else(temp_dir).as_path())?;
+ }
+ tokio::task::spawn_blocking(move || {
+ // TODO(piscisaureus): use byte vector for paths, not a string.
+ // See https://github.com/denoland/deno/issues/627.
+ // We can't assume that paths are always valid utf8 strings.
+ let path = make_temp(
+ // Converting Option<String> to Option<&str>
+ dir.as_deref(),
+ prefix.as_deref(),
+ suffix.as_deref(),
+ true,
+ )?;
+ let path_str = into_string(path.into_os_string())?;
+
+ Ok(json!(path_str))
+ })
+ .await
+ .unwrap()
+}
+
+fn op_make_temp_file_sync(
+ state: &mut OpState,
+ args: Value,
+ _zero_copy: &mut [ZeroCopyBuf],
+) -> Result<Value, AnyError> {
+ let args: MakeTempArgs = serde_json::from_value(args)?;
+
+ let dir = args.dir.map(|s| PathBuf::from(&s));
+ let prefix = args.prefix.map(String::from);
+ let suffix = args.suffix.map(String::from);
+
+ state
+ .borrow::<Permissions>()
+ .check_write(dir.clone().unwrap_or_else(temp_dir).as_path())?;
+
+ // TODO(piscisaureus): use byte vector for paths, not a string.
+ // See https://github.com/denoland/deno/issues/627.
+ // We can't assume that paths are always valid utf8 strings.
+ let path = make_temp(
+ // Converting Option<String> to Option<&str>
+ dir.as_deref(),
+ prefix.as_deref(),
+ suffix.as_deref(),
+ false,
+ )?;
+ let path_str = into_string(path.into_os_string())?;
+
+ Ok(json!(path_str))
+}
+
+async fn op_make_temp_file_async(
+ state: Rc<RefCell<OpState>>,
+ args: Value,
+ _zero_copy: BufVec,
+) -> Result<Value, AnyError> {
+ let args: MakeTempArgs = serde_json::from_value(args)?;
+
+ let dir = args.dir.map(|s| PathBuf::from(&s));
+ let prefix = args.prefix.map(String::from);
+ let suffix = args.suffix.map(String::from);
+ {
+ let state = state.borrow();
+ state
+ .borrow::<Permissions>()
+ .check_write(dir.clone().unwrap_or_else(temp_dir).as_path())?;
+ }
+ tokio::task::spawn_blocking(move || {
+ // TODO(piscisaureus): use byte vector for paths, not a string.
+ // See https://github.com/denoland/deno/issues/627.
+ // We can't assume that paths are always valid utf8 strings.
+ let path = make_temp(
+ // Converting Option<String> to Option<&str>
+ dir.as_deref(),
+ prefix.as_deref(),
+ suffix.as_deref(),
+ false,
+ )?;
+ let path_str = into_string(path.into_os_string())?;
+
+ Ok(json!(path_str))
+ })
+ .await
+ .unwrap()
+}
+
+#[derive(Deserialize)]
+#[serde(rename_all = "camelCase")]
+struct FutimeArgs {
+ rid: i32,
+ atime: (i64, u32),
+ mtime: (i64, u32),
+}
+
+fn op_futime_sync(
+ state: &mut OpState,
+ args: Value,
+ _zero_copy: &mut [ZeroCopyBuf],
+) -> Result<Value, AnyError> {
+ super::check_unstable(state, "Deno.futimeSync");
+ let args: FutimeArgs = serde_json::from_value(args)?;
+ let rid = args.rid as u32;
+ let atime = filetime::FileTime::from_unix_time(args.atime.0, args.atime.1);
+ let mtime = filetime::FileTime::from_unix_time(args.mtime.0, args.mtime.1);
+
+ std_file_resource(state, rid, |r| match r {
+ Ok(std_file) => {
+ filetime::set_file_handle_times(std_file, Some(atime), Some(mtime))
+ .map_err(AnyError::from)
+ }
+ Err(_) => Err(type_error(
+ "cannot futime on this type of resource".to_string(),
+ )),
+ })?;
+
+ Ok(json!({}))
+}
+
+async fn op_futime_async(
+ state: Rc<RefCell<OpState>>,
+ args: Value,
+ _zero_copy: BufVec,
+) -> Result<Value, AnyError> {
+ let mut state = state.borrow_mut();
+ super::check_unstable(&state, "Deno.futime");
+ let args: FutimeArgs = serde_json::from_value(args)?;
+ let rid = args.rid as u32;
+ let atime = filetime::FileTime::from_unix_time(args.atime.0, args.atime.1);
+ let mtime = filetime::FileTime::from_unix_time(args.mtime.0, args.mtime.1);
+ // TODO Not actually async! https://github.com/denoland/deno/issues/7400
+ std_file_resource(&mut state, rid, |r| match r {
+ Ok(std_file) => {
+ filetime::set_file_handle_times(std_file, Some(atime), Some(mtime))
+ .map_err(AnyError::from)
+ }
+ Err(_) => Err(type_error(
+ "cannot futime on this type of resource".to_string(),
+ )),
+ })?;
+
+ Ok(json!({}))
+}
+
+#[derive(Deserialize)]
+#[serde(rename_all = "camelCase")]
+struct UtimeArgs {
+ path: String,
+ atime: (i64, u32),
+ mtime: (i64, u32),
+}
+
+fn op_utime_sync(
+ state: &mut OpState,
+ args: Value,
+ _zero_copy: &mut [ZeroCopyBuf],
+) -> Result<Value, AnyError> {
+ super::check_unstable(state, "Deno.utime");
+
+ let args: UtimeArgs = serde_json::from_value(args)?;
+ let path = PathBuf::from(&args.path);
+ let atime = filetime::FileTime::from_unix_time(args.atime.0, args.atime.1);
+ let mtime = filetime::FileTime::from_unix_time(args.mtime.0, args.mtime.1);
+
+ state.borrow::<Permissions>().check_write(&path)?;
+ filetime::set_file_times(path, atime, mtime)?;
+ Ok(json!({}))
+}
+
+async fn op_utime_async(
+ state: Rc<RefCell<OpState>>,
+ args: Value,
+ _zero_copy: BufVec,
+) -> Result<Value, AnyError> {
+ let state = state.borrow();
+ super::check_unstable(&state, "Deno.utime");
+
+ let args: UtimeArgs = serde_json::from_value(args)?;
+ let path = PathBuf::from(&args.path);
+ let atime = filetime::FileTime::from_unix_time(args.atime.0, args.atime.1);
+ let mtime = filetime::FileTime::from_unix_time(args.mtime.0, args.mtime.1);
+
+ state.borrow::<Permissions>().check_write(&path)?;
+
+ tokio::task::spawn_blocking(move || {
+ filetime::set_file_times(path, atime, mtime)?;
+ Ok(json!({}))
+ })
+ .await
+ .unwrap()
+}
+
+fn op_cwd(
+ state: &mut OpState,
+ _args: Value,
+ _zero_copy: &mut [ZeroCopyBuf],
+) -> Result<Value, AnyError> {
+ let path = current_dir()?;
+ state
+ .borrow::<Permissions>()
+ .check_read_blind(&path, "CWD")?;
+ let path_str = into_string(path.into_os_string())?;
+ Ok(json!(path_str))
+}
diff --git a/runtime/ops/fs_events.rs b/runtime/ops/fs_events.rs
new file mode 100644
index 000000000..4832c915c
--- /dev/null
+++ b/runtime/ops/fs_events.rs
@@ -0,0 +1,133 @@
+// Copyright 2018-2020 the Deno authors. All rights reserved. MIT license.
+
+use crate::permissions::Permissions;
+use deno_core::error::bad_resource_id;
+use deno_core::error::AnyError;
+use deno_core::futures::future::poll_fn;
+use deno_core::serde_json;
+use deno_core::serde_json::json;
+use deno_core::serde_json::Value;
+use deno_core::BufVec;
+use deno_core::OpState;
+use deno_core::ZeroCopyBuf;
+use notify::event::Event as NotifyEvent;
+use notify::Error as NotifyError;
+use notify::EventKind;
+use notify::RecommendedWatcher;
+use notify::RecursiveMode;
+use notify::Watcher;
+use serde::Deserialize;
+use serde::Serialize;
+use std::cell::RefCell;
+use std::convert::From;
+use std::path::PathBuf;
+use std::rc::Rc;
+use tokio::sync::mpsc;
+
+pub fn init(rt: &mut deno_core::JsRuntime) {
+ super::reg_json_sync(rt, "op_fs_events_open", op_fs_events_open);
+ super::reg_json_async(rt, "op_fs_events_poll", op_fs_events_poll);
+}
+
+struct FsEventsResource {
+ #[allow(unused)]
+ watcher: RecommendedWatcher,
+ receiver: mpsc::Receiver<Result<FsEvent, AnyError>>,
+}
+
+/// Represents a file system event.
+///
+/// We do not use the event directly from the notify crate. We flatten
+/// the structure into this simpler structure. We want to only make it more
+/// complex as needed.
+///
+/// Feel free to expand this struct as long as you can add tests to demonstrate
+/// the complexity.
+#[derive(Serialize, Debug)]
+struct FsEvent {
+ kind: String,
+ paths: Vec<PathBuf>,
+}
+
+impl From<NotifyEvent> for FsEvent {
+ fn from(e: NotifyEvent) -> Self {
+ let kind = match e.kind {
+ EventKind::Any => "any",
+ EventKind::Access(_) => "access",
+ EventKind::Create(_) => "create",
+ EventKind::Modify(_) => "modify",
+ EventKind::Remove(_) => "remove",
+ EventKind::Other => todo!(), // What's this for? Leaving it out for now.
+ }
+ .to_string();
+ FsEvent {
+ kind,
+ paths: e.paths,
+ }
+ }
+}
+
+fn op_fs_events_open(
+ state: &mut OpState,
+ args: Value,
+ _zero_copy: &mut [ZeroCopyBuf],
+) -> Result<Value, AnyError> {
+ #[derive(Deserialize)]
+ struct OpenArgs {
+ recursive: bool,
+ paths: Vec<String>,
+ }
+ let args: OpenArgs = serde_json::from_value(args)?;
+ let (sender, receiver) = mpsc::channel::<Result<FsEvent, AnyError>>(16);
+ let sender = std::sync::Mutex::new(sender);
+ let mut watcher: RecommendedWatcher =
+ Watcher::new_immediate(move |res: Result<NotifyEvent, NotifyError>| {
+ let res2 = res.map(FsEvent::from).map_err(AnyError::from);
+ let mut sender = sender.lock().unwrap();
+ // Ignore result, if send failed it means that watcher was already closed,
+ // but not all messages have been flushed.
+ let _ = sender.try_send(res2);
+ })?;
+ let recursive_mode = if args.recursive {
+ RecursiveMode::Recursive
+ } else {
+ RecursiveMode::NonRecursive
+ };
+ for path in &args.paths {
+ state
+ .borrow::<Permissions>()
+ .check_read(&PathBuf::from(path))?;
+ watcher.watch(path, recursive_mode)?;
+ }
+ let resource = FsEventsResource { watcher, receiver };
+ let rid = state.resource_table.add("fsEvents", Box::new(resource));
+ Ok(json!(rid))
+}
+
+async fn op_fs_events_poll(
+ state: Rc<RefCell<OpState>>,
+ args: Value,
+ _zero_copy: BufVec,
+) -> Result<Value, AnyError> {
+ #[derive(Deserialize)]
+ struct PollArgs {
+ rid: u32,
+ }
+ let PollArgs { rid } = serde_json::from_value(args)?;
+ poll_fn(move |cx| {
+ let mut state = state.borrow_mut();
+ let watcher = state
+ .resource_table
+ .get_mut::<FsEventsResource>(rid)
+ .ok_or_else(bad_resource_id)?;
+ watcher
+ .receiver
+ .poll_recv(cx)
+ .map(|maybe_result| match maybe_result {
+ Some(Ok(value)) => Ok(json!({ "value": value, "done": false })),
+ Some(Err(err)) => Err(err),
+ None => Ok(json!({ "done": true })),
+ })
+ })
+ .await
+}
diff --git a/runtime/ops/io.rs b/runtime/ops/io.rs
new file mode 100644
index 000000000..0f8af905a
--- /dev/null
+++ b/runtime/ops/io.rs
@@ -0,0 +1,473 @@
+// Copyright 2018-2020 the Deno authors. All rights reserved. MIT license.
+
+use super::dispatch_minimal::minimal_op;
+use super::dispatch_minimal::MinimalOp;
+use crate::metrics::metrics_op;
+use deno_core::error::bad_resource_id;
+use deno_core::error::resource_unavailable;
+use deno_core::error::type_error;
+use deno_core::error::AnyError;
+use deno_core::futures;
+use deno_core::futures::future::poll_fn;
+use deno_core::futures::future::FutureExt;
+use deno_core::futures::ready;
+use deno_core::BufVec;
+use deno_core::JsRuntime;
+use deno_core::OpState;
+use std::cell::RefCell;
+use std::collections::HashMap;
+use std::pin::Pin;
+use std::rc::Rc;
+use std::sync::atomic::{AtomicUsize, Ordering};
+use std::task::Context;
+use std::task::Poll;
+use tokio::io::{AsyncRead, AsyncWrite};
+use tokio::net::TcpStream;
+use tokio_rustls::client::TlsStream as ClientTlsStream;
+use tokio_rustls::server::TlsStream as ServerTlsStream;
+
+#[cfg(not(windows))]
+use std::os::unix::io::FromRawFd;
+
+#[cfg(windows)]
+use std::os::windows::io::FromRawHandle;
+
+lazy_static! {
+ /// Due to portability issues on Windows handle to stdout is created from raw
+ /// file descriptor. The caveat of that approach is fact that when this
+ /// handle is dropped underlying file descriptor is closed - that is highly
+ /// not desirable in case of stdout. That's why we store this global handle
+ /// that is then cloned when obtaining stdio for process. In turn when
+ /// resource table is dropped storing reference to that handle, the handle
+ /// itself won't be closed (so Deno.core.print) will still work.
+ // TODO(ry) It should be possible to close stdout.
+ static ref STDIN_HANDLE: Option<std::fs::File> = {
+ #[cfg(not(windows))]
+ let stdin = unsafe { Some(std::fs::File::from_raw_fd(0)) };
+ #[cfg(windows)]
+ let stdin = unsafe {
+ let handle = winapi::um::processenv::GetStdHandle(
+ winapi::um::winbase::STD_INPUT_HANDLE,
+ );
+ if handle.is_null() {
+ return None;
+ }
+ Some(std::fs::File::from_raw_handle(handle))
+ };
+ stdin
+ };
+ static ref STDOUT_HANDLE: Option<std::fs::File> = {
+ #[cfg(not(windows))]
+ let stdout = unsafe { Some(std::fs::File::from_raw_fd(1)) };
+ #[cfg(windows)]
+ let stdout = unsafe {
+ let handle = winapi::um::processenv::GetStdHandle(
+ winapi::um::winbase::STD_OUTPUT_HANDLE,
+ );
+ if handle.is_null() {
+ return None;
+ }
+ Some(std::fs::File::from_raw_handle(handle))
+ };
+ stdout
+ };
+ static ref STDERR_HANDLE: Option<std::fs::File> = {
+ #[cfg(not(windows))]
+ let stderr = unsafe { Some(std::fs::File::from_raw_fd(2)) };
+ #[cfg(windows)]
+ let stderr = unsafe {
+ let handle = winapi::um::processenv::GetStdHandle(
+ winapi::um::winbase::STD_ERROR_HANDLE,
+ );
+ if handle.is_null() {
+ return None;
+ }
+ Some(std::fs::File::from_raw_handle(handle))
+ };
+ stderr
+ };
+}
+
+pub fn init(rt: &mut JsRuntime) {
+ rt.register_op("op_read", metrics_op(minimal_op(op_read)));
+ rt.register_op("op_write", metrics_op(minimal_op(op_write)));
+}
+
+pub fn get_stdio() -> (
+ Option<StreamResourceHolder>,
+ Option<StreamResourceHolder>,
+ Option<StreamResourceHolder>,
+) {
+ let stdin = get_stdio_stream(&STDIN_HANDLE);
+ let stdout = get_stdio_stream(&STDOUT_HANDLE);
+ let stderr = get_stdio_stream(&STDERR_HANDLE);
+
+ (stdin, stdout, stderr)
+}
+
+fn get_stdio_stream(
+ handle: &Option<std::fs::File>,
+) -> Option<StreamResourceHolder> {
+ match handle {
+ None => None,
+ Some(file_handle) => match file_handle.try_clone() {
+ Ok(clone) => Some(StreamResourceHolder::new(StreamResource::FsFile(
+ Some((tokio::fs::File::from_std(clone), FileMetadata::default())),
+ ))),
+ Err(_e) => None,
+ },
+ }
+}
+
+fn no_buffer_specified() -> AnyError {
+ type_error("no buffer specified")
+}
+
+#[cfg(unix)]
+use nix::sys::termios;
+
+#[derive(Default)]
+pub struct TTYMetadata {
+ #[cfg(unix)]
+ pub mode: Option<termios::Termios>,
+}
+
+#[derive(Default)]
+pub struct FileMetadata {
+ pub tty: TTYMetadata,
+}
+
+pub struct StreamResourceHolder {
+ pub resource: StreamResource,
+ waker: HashMap<usize, futures::task::AtomicWaker>,
+ waker_counter: AtomicUsize,
+}
+
+impl StreamResourceHolder {
+ pub fn new(resource: StreamResource) -> StreamResourceHolder {
+ StreamResourceHolder {
+ resource,
+ // Atleast one task is expecter for the resource
+ waker: HashMap::with_capacity(1),
+ // Tracks wakers Ids
+ waker_counter: AtomicUsize::new(0),
+ }
+ }
+}
+
+impl Drop for StreamResourceHolder {
+ fn drop(&mut self) {
+ self.wake_tasks();
+ }
+}
+
+impl StreamResourceHolder {
+ pub fn track_task(&mut self, cx: &Context) -> Result<usize, AnyError> {
+ let waker = futures::task::AtomicWaker::new();
+ waker.register(cx.waker());
+ // Its OK if it overflows
+ let task_waker_id = self.waker_counter.fetch_add(1, Ordering::Relaxed);
+ self.waker.insert(task_waker_id, waker);
+ Ok(task_waker_id)
+ }
+
+ pub fn wake_tasks(&mut self) {
+ for waker in self.waker.values() {
+ waker.wake();
+ }
+ }
+
+ pub fn untrack_task(&mut self, task_waker_id: usize) {
+ self.waker.remove(&task_waker_id);
+ }
+}
+
+pub enum StreamResource {
+ FsFile(Option<(tokio::fs::File, FileMetadata)>),
+ TcpStream(Option<tokio::net::TcpStream>),
+ #[cfg(not(windows))]
+ UnixStream(tokio::net::UnixStream),
+ ServerTlsStream(Box<ServerTlsStream<TcpStream>>),
+ ClientTlsStream(Box<ClientTlsStream<TcpStream>>),
+ ChildStdin(tokio::process::ChildStdin),
+ ChildStdout(tokio::process::ChildStdout),
+ ChildStderr(tokio::process::ChildStderr),
+}
+
+trait UnpinAsyncRead: AsyncRead + Unpin {}
+trait UnpinAsyncWrite: AsyncWrite + Unpin {}
+
+impl<T: AsyncRead + Unpin> UnpinAsyncRead for T {}
+impl<T: AsyncWrite + Unpin> UnpinAsyncWrite for T {}
+
+/// `DenoAsyncRead` is the same as the `tokio_io::AsyncRead` trait
+/// but uses an `AnyError` error instead of `std::io:Error`
+pub trait DenoAsyncRead {
+ fn poll_read(
+ &mut self,
+ cx: &mut Context,
+ buf: &mut [u8],
+ ) -> Poll<Result<usize, AnyError>>;
+}
+
+impl DenoAsyncRead for StreamResource {
+ fn poll_read(
+ &mut self,
+ cx: &mut Context,
+ buf: &mut [u8],
+ ) -> Poll<Result<usize, AnyError>> {
+ use StreamResource::*;
+ let f: &mut dyn UnpinAsyncRead = match self {
+ FsFile(Some((f, _))) => f,
+ FsFile(None) => return Poll::Ready(Err(resource_unavailable())),
+ TcpStream(Some(f)) => f,
+ #[cfg(not(windows))]
+ UnixStream(f) => f,
+ ClientTlsStream(f) => f,
+ ServerTlsStream(f) => f,
+ ChildStdout(f) => f,
+ ChildStderr(f) => f,
+ _ => return Err(bad_resource_id()).into(),
+ };
+ let v = ready!(Pin::new(f).poll_read(cx, buf))?;
+ Ok(v).into()
+ }
+}
+
+pub fn op_read(
+ state: Rc<RefCell<OpState>>,
+ is_sync: bool,
+ rid: i32,
+ mut zero_copy: BufVec,
+) -> MinimalOp {
+ debug!("read rid={}", rid);
+ match zero_copy.len() {
+ 0 => return MinimalOp::Sync(Err(no_buffer_specified())),
+ 1 => {}
+ _ => panic!("Invalid number of arguments"),
+ }
+
+ if is_sync {
+ MinimalOp::Sync({
+ // First we look up the rid in the resource table.
+ std_file_resource(&mut state.borrow_mut(), rid as u32, move |r| match r {
+ Ok(std_file) => {
+ use std::io::Read;
+ std_file
+ .read(&mut zero_copy[0])
+ .map(|n: usize| n as i32)
+ .map_err(AnyError::from)
+ }
+ Err(_) => Err(type_error("sync read not allowed on this resource")),
+ })
+ })
+ } else {
+ let mut zero_copy = zero_copy[0].clone();
+ MinimalOp::Async(
+ poll_fn(move |cx| {
+ let mut state = state.borrow_mut();
+ let resource_holder = state
+ .resource_table
+ .get_mut::<StreamResourceHolder>(rid as u32)
+ .ok_or_else(bad_resource_id)?;
+
+ let mut task_tracker_id: Option<usize> = None;
+ let nread = match resource_holder.resource.poll_read(cx, &mut zero_copy)
+ {
+ Poll::Ready(t) => {
+ if let Some(id) = task_tracker_id {
+ resource_holder.untrack_task(id);
+ }
+ t
+ }
+ Poll::Pending => {
+ task_tracker_id.replace(resource_holder.track_task(cx)?);
+ return Poll::Pending;
+ }
+ }?;
+ Poll::Ready(Ok(nread as i32))
+ })
+ .boxed_local(),
+ )
+ }
+}
+
+/// `DenoAsyncWrite` is the same as the `tokio_io::AsyncWrite` trait
+/// but uses an `AnyError` error instead of `std::io:Error`
+pub trait DenoAsyncWrite {
+ fn poll_write(
+ &mut self,
+ cx: &mut Context,
+ buf: &[u8],
+ ) -> Poll<Result<usize, AnyError>>;
+
+ fn poll_close(&mut self, cx: &mut Context) -> Poll<Result<(), AnyError>>;
+
+ fn poll_flush(&mut self, cx: &mut Context) -> Poll<Result<(), AnyError>>;
+}
+
+impl DenoAsyncWrite for StreamResource {
+ fn poll_write(
+ &mut self,
+ cx: &mut Context,
+ buf: &[u8],
+ ) -> Poll<Result<usize, AnyError>> {
+ use StreamResource::*;
+ let f: &mut dyn UnpinAsyncWrite = match self {
+ FsFile(Some((f, _))) => f,
+ FsFile(None) => return Poll::Pending,
+ TcpStream(Some(f)) => f,
+ #[cfg(not(windows))]
+ UnixStream(f) => f,
+ ClientTlsStream(f) => f,
+ ServerTlsStream(f) => f,
+ ChildStdin(f) => f,
+ _ => return Err(bad_resource_id()).into(),
+ };
+
+ let v = ready!(Pin::new(f).poll_write(cx, buf))?;
+ Ok(v).into()
+ }
+
+ fn poll_flush(&mut self, cx: &mut Context) -> Poll<Result<(), AnyError>> {
+ use StreamResource::*;
+ let f: &mut dyn UnpinAsyncWrite = match self {
+ FsFile(Some((f, _))) => f,
+ FsFile(None) => return Poll::Pending,
+ TcpStream(Some(f)) => f,
+ #[cfg(not(windows))]
+ UnixStream(f) => f,
+ ClientTlsStream(f) => f,
+ ServerTlsStream(f) => f,
+ ChildStdin(f) => f,
+ _ => return Err(bad_resource_id()).into(),
+ };
+
+ ready!(Pin::new(f).poll_flush(cx))?;
+ Ok(()).into()
+ }
+
+ fn poll_close(&mut self, _cx: &mut Context) -> Poll<Result<(), AnyError>> {
+ unimplemented!()
+ }
+}
+
+pub fn op_write(
+ state: Rc<RefCell<OpState>>,
+ is_sync: bool,
+ rid: i32,
+ zero_copy: BufVec,
+) -> MinimalOp {
+ debug!("write rid={}", rid);
+ match zero_copy.len() {
+ 0 => return MinimalOp::Sync(Err(no_buffer_specified())),
+ 1 => {}
+ _ => panic!("Invalid number of arguments"),
+ }
+
+ if is_sync {
+ MinimalOp::Sync({
+ // First we look up the rid in the resource table.
+ std_file_resource(&mut state.borrow_mut(), rid as u32, move |r| match r {
+ Ok(std_file) => {
+ use std::io::Write;
+ std_file
+ .write(&zero_copy[0])
+ .map(|nwritten: usize| nwritten as i32)
+ .map_err(AnyError::from)
+ }
+ Err(_) => Err(type_error("sync read not allowed on this resource")),
+ })
+ })
+ } else {
+ let zero_copy = zero_copy[0].clone();
+ MinimalOp::Async(
+ async move {
+ let nwritten = poll_fn(|cx| {
+ let mut state = state.borrow_mut();
+ let resource_holder = state
+ .resource_table
+ .get_mut::<StreamResourceHolder>(rid as u32)
+ .ok_or_else(bad_resource_id)?;
+ resource_holder.resource.poll_write(cx, &zero_copy)
+ })
+ .await?;
+
+ // TODO(bartlomieju): this step was added during upgrade to Tokio 0.2
+ // and the reasons for the need to explicitly flush are not fully known.
+ // Figure out why it's needed and preferably remove it.
+ // https://github.com/denoland/deno/issues/3565
+ poll_fn(|cx| {
+ let mut state = state.borrow_mut();
+ let resource_holder = state
+ .resource_table
+ .get_mut::<StreamResourceHolder>(rid as u32)
+ .ok_or_else(bad_resource_id)?;
+ resource_holder.resource.poll_flush(cx)
+ })
+ .await?;
+
+ Ok(nwritten as i32)
+ }
+ .boxed_local(),
+ )
+ }
+}
+
+/// Helper function for operating on a std::fs::File stored in the resource table.
+///
+/// We store file system file resources as tokio::fs::File, so this is a little
+/// utility function that gets a std::fs:File when you need to do blocking
+/// operations.
+///
+/// Returns ErrorKind::Busy if the resource is being used by another op.
+pub fn std_file_resource<F, T>(
+ state: &mut OpState,
+ rid: u32,
+ mut f: F,
+) -> Result<T, AnyError>
+where
+ F: FnMut(
+ Result<&mut std::fs::File, &mut StreamResource>,
+ ) -> Result<T, AnyError>,
+{
+ // First we look up the rid in the resource table.
+ let mut r = state.resource_table.get_mut::<StreamResourceHolder>(rid);
+ if let Some(ref mut resource_holder) = r {
+ // Sync write only works for FsFile. It doesn't make sense to do this
+ // for non-blocking sockets. So we error out if not FsFile.
+ match &mut resource_holder.resource {
+ StreamResource::FsFile(option_file_metadata) => {
+ // The object in the resource table is a tokio::fs::File - but in
+ // order to do a blocking write on it, we must turn it into a
+ // std::fs::File. Hopefully this code compiles down to nothing.
+ if let Some((tokio_file, metadata)) = option_file_metadata.take() {
+ match tokio_file.try_into_std() {
+ Ok(mut std_file) => {
+ let result = f(Ok(&mut std_file));
+ // Turn the std_file handle back into a tokio file, put it back
+ // in the resource table.
+ let tokio_file = tokio::fs::File::from_std(std_file);
+ resource_holder.resource =
+ StreamResource::FsFile(Some((tokio_file, metadata)));
+ // return the result.
+ result
+ }
+ Err(tokio_file) => {
+ // This function will return an error containing the file if
+ // some operation is in-flight.
+ resource_holder.resource =
+ StreamResource::FsFile(Some((tokio_file, metadata)));
+ Err(resource_unavailable())
+ }
+ }
+ } else {
+ Err(resource_unavailable())
+ }
+ }
+ _ => f(Err(&mut resource_holder.resource)),
+ }
+ } else {
+ Err(bad_resource_id())
+ }
+}
diff --git a/runtime/ops/mod.rs b/runtime/ops/mod.rs
new file mode 100644
index 000000000..a27122657
--- /dev/null
+++ b/runtime/ops/mod.rs
@@ -0,0 +1,89 @@
+// Copyright 2018-2020 the Deno authors. All rights reserved. MIT license.
+
+mod dispatch_minimal;
+pub use dispatch_minimal::MinimalOp;
+
+pub mod crypto;
+pub mod fetch;
+pub mod fs;
+pub mod fs_events;
+pub mod io;
+pub mod net;
+#[cfg(unix)]
+mod net_unix;
+pub mod os;
+pub mod permissions;
+pub mod plugin;
+pub mod process;
+pub mod runtime;
+pub mod signal;
+pub mod timers;
+pub mod tls;
+pub mod tty;
+pub mod web_worker;
+pub mod websocket;
+pub mod worker_host;
+
+use crate::metrics::metrics_op;
+use deno_core::error::AnyError;
+use deno_core::json_op_async;
+use deno_core::json_op_sync;
+use deno_core::serde_json::Value;
+use deno_core::BufVec;
+use deno_core::JsRuntime;
+use deno_core::OpState;
+use deno_core::ZeroCopyBuf;
+use std::cell::RefCell;
+use std::future::Future;
+use std::rc::Rc;
+
+pub fn reg_json_async<F, R>(rt: &mut JsRuntime, name: &'static str, op_fn: F)
+where
+ F: Fn(Rc<RefCell<OpState>>, Value, BufVec) -> R + 'static,
+ R: Future<Output = Result<Value, AnyError>> + 'static,
+{
+ rt.register_op(name, metrics_op(json_op_async(op_fn)));
+}
+
+pub fn reg_json_sync<F>(rt: &mut JsRuntime, name: &'static str, op_fn: F)
+where
+ F: Fn(&mut OpState, Value, &mut [ZeroCopyBuf]) -> Result<Value, AnyError>
+ + 'static,
+{
+ rt.register_op(name, metrics_op(json_op_sync(op_fn)));
+}
+
+/// `UnstableChecker` is a struct so it can be placed inside `GothamState`;
+/// using type alias for a bool could work, but there's a high chance
+/// that there might be another type alias pointing to a bool, which
+/// would override previously used alias.
+pub struct UnstableChecker {
+ pub unstable: bool,
+}
+
+impl UnstableChecker {
+ /// Quits the process if the --unstable flag was not provided.
+ ///
+ /// This is intentionally a non-recoverable check so that people cannot probe
+ /// for unstable APIs from stable programs.
+ // NOTE(bartlomieju): keep in sync with `cli/program_state.rs`
+ pub fn check_unstable(&self, api_name: &str) {
+ if !self.unstable {
+ eprintln!(
+ "Unstable API '{}'. The --unstable flag must be provided.",
+ api_name
+ );
+ std::process::exit(70);
+ }
+ }
+}
+/// Helper for checking unstable features. Used for sync ops.
+pub fn check_unstable(state: &OpState, api_name: &str) {
+ state.borrow::<UnstableChecker>().check_unstable(api_name)
+}
+
+/// Helper for checking unstable features. Used for async ops.
+pub fn check_unstable2(state: &Rc<RefCell<OpState>>, api_name: &str) {
+ let state = state.borrow();
+ state.borrow::<UnstableChecker>().check_unstable(api_name)
+}
diff --git a/runtime/ops/net.rs b/runtime/ops/net.rs
new file mode 100644
index 000000000..98ff83fc0
--- /dev/null
+++ b/runtime/ops/net.rs
@@ -0,0 +1,566 @@
+// Copyright 2018-2020 the Deno authors. All rights reserved. MIT license.
+
+use crate::ops::io::StreamResource;
+use crate::ops::io::StreamResourceHolder;
+use crate::permissions::Permissions;
+use crate::resolve_addr::resolve_addr;
+use deno_core::error::bad_resource;
+use deno_core::error::bad_resource_id;
+use deno_core::error::custom_error;
+use deno_core::error::generic_error;
+use deno_core::error::type_error;
+use deno_core::error::AnyError;
+use deno_core::futures;
+use deno_core::futures::future::poll_fn;
+use deno_core::serde_json;
+use deno_core::serde_json::json;
+use deno_core::serde_json::Value;
+use deno_core::BufVec;
+use deno_core::OpState;
+use deno_core::ZeroCopyBuf;
+use serde::Deserialize;
+use std::cell::RefCell;
+use std::net::Shutdown;
+use std::net::SocketAddr;
+use std::rc::Rc;
+use std::task::Context;
+use std::task::Poll;
+use tokio::net::TcpListener;
+use tokio::net::TcpStream;
+use tokio::net::UdpSocket;
+
+#[cfg(unix)]
+use super::net_unix;
+#[cfg(unix)]
+use std::path::Path;
+
+pub fn init(rt: &mut deno_core::JsRuntime) {
+ super::reg_json_async(rt, "op_accept", op_accept);
+ super::reg_json_async(rt, "op_connect", op_connect);
+ super::reg_json_sync(rt, "op_shutdown", op_shutdown);
+ super::reg_json_sync(rt, "op_listen", op_listen);
+ super::reg_json_async(rt, "op_datagram_receive", op_datagram_receive);
+ super::reg_json_async(rt, "op_datagram_send", op_datagram_send);
+}
+
+#[derive(Deserialize)]
+pub(crate) struct AcceptArgs {
+ pub rid: i32,
+ pub transport: String,
+}
+
+async fn accept_tcp(
+ state: Rc<RefCell<OpState>>,
+ args: AcceptArgs,
+ _zero_copy: BufVec,
+) -> Result<Value, AnyError> {
+ let rid = args.rid as u32;
+
+ let accept_fut = poll_fn(|cx| {
+ let mut state = state.borrow_mut();
+ let listener_resource = state
+ .resource_table
+ .get_mut::<TcpListenerResource>(rid)
+ .ok_or_else(|| bad_resource("Listener has been closed"))?;
+ let listener = &mut listener_resource.listener;
+ match listener.poll_accept(cx).map_err(AnyError::from) {
+ Poll::Ready(Ok((stream, addr))) => {
+ listener_resource.untrack_task();
+ Poll::Ready(Ok((stream, addr)))
+ }
+ Poll::Pending => {
+ listener_resource.track_task(cx)?;
+ Poll::Pending
+ }
+ Poll::Ready(Err(e)) => {
+ listener_resource.untrack_task();
+ Poll::Ready(Err(e))
+ }
+ }
+ });
+ let (tcp_stream, _socket_addr) = accept_fut.await?;
+ let local_addr = tcp_stream.local_addr()?;
+ let remote_addr = tcp_stream.peer_addr()?;
+
+ let mut state = state.borrow_mut();
+ let rid = state.resource_table.add(
+ "tcpStream",
+ Box::new(StreamResourceHolder::new(StreamResource::TcpStream(Some(
+ tcp_stream,
+ )))),
+ );
+ Ok(json!({
+ "rid": rid,
+ "localAddr": {
+ "hostname": local_addr.ip().to_string(),
+ "port": local_addr.port(),
+ "transport": "tcp",
+ },
+ "remoteAddr": {
+ "hostname": remote_addr.ip().to_string(),
+ "port": remote_addr.port(),
+ "transport": "tcp",
+ }
+ }))
+}
+
+async fn op_accept(
+ state: Rc<RefCell<OpState>>,
+ args: Value,
+ bufs: BufVec,
+) -> Result<Value, AnyError> {
+ let args: AcceptArgs = serde_json::from_value(args)?;
+ match args.transport.as_str() {
+ "tcp" => accept_tcp(state, args, bufs).await,
+ #[cfg(unix)]
+ "unix" => net_unix::accept_unix(state, args, bufs).await,
+ _ => Err(generic_error(format!(
+ "Unsupported transport protocol {}",
+ args.transport
+ ))),
+ }
+}
+
+#[derive(Deserialize)]
+pub(crate) struct ReceiveArgs {
+ pub rid: i32,
+ pub transport: String,
+}
+
+async fn receive_udp(
+ state: Rc<RefCell<OpState>>,
+ args: ReceiveArgs,
+ zero_copy: BufVec,
+) -> Result<Value, AnyError> {
+ assert_eq!(zero_copy.len(), 1, "Invalid number of arguments");
+ let mut zero_copy = zero_copy[0].clone();
+
+ let rid = args.rid as u32;
+
+ let receive_fut = poll_fn(|cx| {
+ let mut state = state.borrow_mut();
+ let resource = state
+ .resource_table
+ .get_mut::<UdpSocketResource>(rid)
+ .ok_or_else(|| bad_resource("Socket has been closed"))?;
+ let socket = &mut resource.socket;
+ socket
+ .poll_recv_from(cx, &mut zero_copy)
+ .map_err(AnyError::from)
+ });
+ let (size, remote_addr) = receive_fut.await?;
+ Ok(json!({
+ "size": size,
+ "remoteAddr": {
+ "hostname": remote_addr.ip().to_string(),
+ "port": remote_addr.port(),
+ "transport": "udp",
+ }
+ }))
+}
+
+async fn op_datagram_receive(
+ state: Rc<RefCell<OpState>>,
+ args: Value,
+ zero_copy: BufVec,
+) -> Result<Value, AnyError> {
+ assert_eq!(zero_copy.len(), 1, "Invalid number of arguments");
+
+ let args: ReceiveArgs = serde_json::from_value(args)?;
+ match args.transport.as_str() {
+ "udp" => receive_udp(state, args, zero_copy).await,
+ #[cfg(unix)]
+ "unixpacket" => net_unix::receive_unix_packet(state, args, zero_copy).await,
+ _ => Err(generic_error(format!(
+ "Unsupported transport protocol {}",
+ args.transport
+ ))),
+ }
+}
+
+#[derive(Deserialize)]
+struct SendArgs {
+ rid: i32,
+ transport: String,
+ #[serde(flatten)]
+ transport_args: ArgsEnum,
+}
+
+async fn op_datagram_send(
+ state: Rc<RefCell<OpState>>,
+ args: Value,
+ zero_copy: BufVec,
+) -> Result<Value, AnyError> {
+ assert_eq!(zero_copy.len(), 1, "Invalid number of arguments");
+ let zero_copy = zero_copy[0].clone();
+
+ match serde_json::from_value(args)? {
+ SendArgs {
+ rid,
+ transport,
+ transport_args: ArgsEnum::Ip(args),
+ } if transport == "udp" => {
+ {
+ let s = state.borrow();
+ s.borrow::<Permissions>()
+ .check_net(&args.hostname, args.port)?;
+ }
+ let addr = resolve_addr(&args.hostname, args.port)?;
+ poll_fn(move |cx| {
+ let mut state = state.borrow_mut();
+ let resource = state
+ .resource_table
+ .get_mut::<UdpSocketResource>(rid as u32)
+ .ok_or_else(|| bad_resource("Socket has been closed"))?;
+ resource
+ .socket
+ .poll_send_to(cx, &zero_copy, &addr)
+ .map_ok(|byte_length| json!(byte_length))
+ .map_err(AnyError::from)
+ })
+ .await
+ }
+ #[cfg(unix)]
+ SendArgs {
+ rid,
+ transport,
+ transport_args: ArgsEnum::Unix(args),
+ } if transport == "unixpacket" => {
+ let address_path = Path::new(&args.path);
+ {
+ let s = state.borrow();
+ s.borrow::<Permissions>().check_write(&address_path)?;
+ }
+ let mut state = state.borrow_mut();
+ let resource = state
+ .resource_table
+ .get_mut::<net_unix::UnixDatagramResource>(rid as u32)
+ .ok_or_else(|| {
+ custom_error("NotConnected", "Socket has been closed")
+ })?;
+ let socket = &mut resource.socket;
+ let byte_length = socket
+ .send_to(&zero_copy, &resource.local_addr.as_pathname().unwrap())
+ .await?;
+
+ Ok(json!(byte_length))
+ }
+ _ => Err(type_error("Wrong argument format!")),
+ }
+}
+
+#[derive(Deserialize)]
+struct ConnectArgs {
+ transport: String,
+ #[serde(flatten)]
+ transport_args: ArgsEnum,
+}
+
+async fn op_connect(
+ state: Rc<RefCell<OpState>>,
+ args: Value,
+ _zero_copy: BufVec,
+) -> Result<Value, AnyError> {
+ match serde_json::from_value(args)? {
+ ConnectArgs {
+ transport,
+ transport_args: ArgsEnum::Ip(args),
+ } if transport == "tcp" => {
+ {
+ let state_ = state.borrow();
+ state_
+ .borrow::<Permissions>()
+ .check_net(&args.hostname, args.port)?;
+ }
+ let addr = resolve_addr(&args.hostname, args.port)?;
+ let tcp_stream = TcpStream::connect(&addr).await?;
+ let local_addr = tcp_stream.local_addr()?;
+ let remote_addr = tcp_stream.peer_addr()?;
+
+ let mut state_ = state.borrow_mut();
+ let rid = state_.resource_table.add(
+ "tcpStream",
+ Box::new(StreamResourceHolder::new(StreamResource::TcpStream(Some(
+ tcp_stream,
+ )))),
+ );
+ Ok(json!({
+ "rid": rid,
+ "localAddr": {
+ "hostname": local_addr.ip().to_string(),
+ "port": local_addr.port(),
+ "transport": transport,
+ },
+ "remoteAddr": {
+ "hostname": remote_addr.ip().to_string(),
+ "port": remote_addr.port(),
+ "transport": transport,
+ }
+ }))
+ }
+ #[cfg(unix)]
+ ConnectArgs {
+ transport,
+ transport_args: ArgsEnum::Unix(args),
+ } if transport == "unix" => {
+ let address_path = Path::new(&args.path);
+ super::check_unstable2(&state, "Deno.connect");
+ {
+ let state_ = state.borrow();
+ state_.borrow::<Permissions>().check_read(&address_path)?;
+ state_.borrow::<Permissions>().check_write(&address_path)?;
+ }
+ let path = args.path;
+ let unix_stream = net_unix::UnixStream::connect(Path::new(&path)).await?;
+ let local_addr = unix_stream.local_addr()?;
+ let remote_addr = unix_stream.peer_addr()?;
+
+ let mut state_ = state.borrow_mut();
+ let rid = state_.resource_table.add(
+ "unixStream",
+ Box::new(StreamResourceHolder::new(StreamResource::UnixStream(
+ unix_stream,
+ ))),
+ );
+ Ok(json!({
+ "rid": rid,
+ "localAddr": {
+ "path": local_addr.as_pathname(),
+ "transport": transport,
+ },
+ "remoteAddr": {
+ "path": remote_addr.as_pathname(),
+ "transport": transport,
+ }
+ }))
+ }
+ _ => Err(type_error("Wrong argument format!")),
+ }
+}
+
+#[derive(Deserialize)]
+struct ShutdownArgs {
+ rid: i32,
+ how: i32,
+}
+
+fn op_shutdown(
+ state: &mut OpState,
+ args: Value,
+ _zero_copy: &mut [ZeroCopyBuf],
+) -> Result<Value, AnyError> {
+ super::check_unstable(state, "Deno.shutdown");
+
+ let args: ShutdownArgs = serde_json::from_value(args)?;
+
+ let rid = args.rid as u32;
+ let how = args.how;
+
+ let shutdown_mode = match how {
+ 0 => Shutdown::Read,
+ 1 => Shutdown::Write,
+ _ => unimplemented!(),
+ };
+
+ let resource_holder = state
+ .resource_table
+ .get_mut::<StreamResourceHolder>(rid)
+ .ok_or_else(bad_resource_id)?;
+ match resource_holder.resource {
+ StreamResource::TcpStream(Some(ref mut stream)) => {
+ TcpStream::shutdown(stream, shutdown_mode)?;
+ }
+ #[cfg(unix)]
+ StreamResource::UnixStream(ref mut stream) => {
+ net_unix::UnixStream::shutdown(stream, shutdown_mode)?;
+ }
+ _ => return Err(bad_resource_id()),
+ }
+
+ Ok(json!({}))
+}
+
+#[allow(dead_code)]
+struct TcpListenerResource {
+ listener: TcpListener,
+ waker: Option<futures::task::AtomicWaker>,
+ local_addr: SocketAddr,
+}
+
+impl Drop for TcpListenerResource {
+ fn drop(&mut self) {
+ self.wake_task();
+ }
+}
+
+impl TcpListenerResource {
+ /// Track the current task so future awaiting for connection
+ /// can be notified when listener is closed.
+ ///
+ /// Throws an error if another task is already tracked.
+ pub fn track_task(&mut self, cx: &Context) -> Result<(), AnyError> {
+ // Currently, we only allow tracking a single accept task for a listener.
+ // This might be changed in the future with multiple workers.
+ // Caveat: TcpListener by itself also only tracks an accept task at a time.
+ // See https://github.com/tokio-rs/tokio/issues/846#issuecomment-454208883
+ if self.waker.is_some() {
+ return Err(custom_error("Busy", "Another accept task is ongoing"));
+ }
+
+ let waker = futures::task::AtomicWaker::new();
+ waker.register(cx.waker());
+ self.waker.replace(waker);
+ Ok(())
+ }
+
+ /// Notifies a task when listener is closed so accept future can resolve.
+ pub fn wake_task(&mut self) {
+ if let Some(waker) = self.waker.as_ref() {
+ waker.wake();
+ }
+ }
+
+ /// Stop tracking a task.
+ /// Happens when the task is done and thus no further tracking is needed.
+ pub fn untrack_task(&mut self) {
+ if self.waker.is_some() {
+ self.waker.take();
+ }
+ }
+}
+
+struct UdpSocketResource {
+ socket: UdpSocket,
+}
+
+#[derive(Deserialize)]
+struct IpListenArgs {
+ hostname: String,
+ port: u16,
+}
+
+#[derive(Deserialize)]
+#[serde(untagged)]
+enum ArgsEnum {
+ Ip(IpListenArgs),
+ #[cfg(unix)]
+ Unix(net_unix::UnixListenArgs),
+}
+
+#[derive(Deserialize)]
+struct ListenArgs {
+ transport: String,
+ #[serde(flatten)]
+ transport_args: ArgsEnum,
+}
+
+fn listen_tcp(
+ state: &mut OpState,
+ addr: SocketAddr,
+) -> Result<(u32, SocketAddr), AnyError> {
+ let std_listener = std::net::TcpListener::bind(&addr)?;
+ let listener = TcpListener::from_std(std_listener)?;
+ let local_addr = listener.local_addr()?;
+ let listener_resource = TcpListenerResource {
+ listener,
+ waker: None,
+ local_addr,
+ };
+ let rid = state
+ .resource_table
+ .add("tcpListener", Box::new(listener_resource));
+
+ Ok((rid, local_addr))
+}
+
+fn listen_udp(
+ state: &mut OpState,
+ addr: SocketAddr,
+) -> Result<(u32, SocketAddr), AnyError> {
+ let std_socket = std::net::UdpSocket::bind(&addr)?;
+ let socket = UdpSocket::from_std(std_socket)?;
+ let local_addr = socket.local_addr()?;
+ let socket_resource = UdpSocketResource { socket };
+ let rid = state
+ .resource_table
+ .add("udpSocket", Box::new(socket_resource));
+
+ Ok((rid, local_addr))
+}
+
+fn op_listen(
+ state: &mut OpState,
+ args: Value,
+ _zero_copy: &mut [ZeroCopyBuf],
+) -> Result<Value, AnyError> {
+ let permissions = state.borrow::<Permissions>();
+ match serde_json::from_value(args)? {
+ ListenArgs {
+ transport,
+ transport_args: ArgsEnum::Ip(args),
+ } => {
+ {
+ if transport == "udp" {
+ super::check_unstable(state, "Deno.listenDatagram");
+ }
+ permissions.check_net(&args.hostname, args.port)?;
+ }
+ let addr = resolve_addr(&args.hostname, args.port)?;
+ let (rid, local_addr) = if transport == "tcp" {
+ listen_tcp(state, addr)?
+ } else {
+ listen_udp(state, addr)?
+ };
+ debug!(
+ "New listener {} {}:{}",
+ rid,
+ local_addr.ip().to_string(),
+ local_addr.port()
+ );
+ Ok(json!({
+ "rid": rid,
+ "localAddr": {
+ "hostname": local_addr.ip().to_string(),
+ "port": local_addr.port(),
+ "transport": transport,
+ },
+ }))
+ }
+ #[cfg(unix)]
+ ListenArgs {
+ transport,
+ transport_args: ArgsEnum::Unix(args),
+ } if transport == "unix" || transport == "unixpacket" => {
+ let address_path = Path::new(&args.path);
+ {
+ if transport == "unix" {
+ super::check_unstable(state, "Deno.listen");
+ }
+ if transport == "unixpacket" {
+ super::check_unstable(state, "Deno.listenDatagram");
+ }
+ permissions.check_read(&address_path)?;
+ permissions.check_write(&address_path)?;
+ }
+ let (rid, local_addr) = if transport == "unix" {
+ net_unix::listen_unix(state, &address_path)?
+ } else {
+ net_unix::listen_unix_packet(state, &address_path)?
+ };
+ debug!(
+ "New listener {} {}",
+ rid,
+ local_addr.as_pathname().unwrap().display(),
+ );
+ Ok(json!({
+ "rid": rid,
+ "localAddr": {
+ "path": local_addr.as_pathname(),
+ "transport": transport,
+ },
+ }))
+ }
+ #[cfg(unix)]
+ _ => Err(type_error("Wrong argument format!")),
+ }
+}
diff --git a/runtime/ops/net_unix.rs b/runtime/ops/net_unix.rs
new file mode 100644
index 000000000..4c416a5a4
--- /dev/null
+++ b/runtime/ops/net_unix.rs
@@ -0,0 +1,151 @@
+// Copyright 2018-2020 the Deno authors. All rights reserved. MIT license.
+
+use crate::ops::io::StreamResource;
+use crate::ops::io::StreamResourceHolder;
+use crate::ops::net::AcceptArgs;
+use crate::ops::net::ReceiveArgs;
+use deno_core::error::bad_resource;
+use deno_core::error::AnyError;
+use deno_core::futures::future::poll_fn;
+use deno_core::serde_json::json;
+use deno_core::serde_json::Value;
+use deno_core::BufVec;
+use deno_core::OpState;
+use serde::Deserialize;
+use std::cell::RefCell;
+use std::fs::remove_file;
+use std::os::unix;
+use std::path::Path;
+use std::rc::Rc;
+use std::task::Poll;
+use tokio::net::UnixDatagram;
+use tokio::net::UnixListener;
+pub use tokio::net::UnixStream;
+
+struct UnixListenerResource {
+ listener: UnixListener,
+}
+
+pub struct UnixDatagramResource {
+ pub socket: UnixDatagram,
+ pub local_addr: unix::net::SocketAddr,
+}
+
+#[derive(Deserialize)]
+pub struct UnixListenArgs {
+ pub path: String,
+}
+
+pub(crate) async fn accept_unix(
+ state: Rc<RefCell<OpState>>,
+ args: AcceptArgs,
+ _bufs: BufVec,
+) -> Result<Value, AnyError> {
+ let rid = args.rid as u32;
+
+ let accept_fut = poll_fn(|cx| {
+ let mut state = state.borrow_mut();
+ let listener_resource = state
+ .resource_table
+ .get_mut::<UnixListenerResource>(rid)
+ .ok_or_else(|| bad_resource("Listener has been closed"))?;
+ let listener = &mut listener_resource.listener;
+ use deno_core::futures::StreamExt;
+ match listener.poll_next_unpin(cx) {
+ Poll::Ready(Some(stream)) => {
+ //listener_resource.untrack_task();
+ Poll::Ready(stream)
+ }
+ Poll::Ready(None) => todo!(),
+ Poll::Pending => {
+ //listener_resource.track_task(cx)?;
+ Poll::Pending
+ }
+ }
+ .map_err(AnyError::from)
+ });
+ let unix_stream = accept_fut.await?;
+
+ let local_addr = unix_stream.local_addr()?;
+ let remote_addr = unix_stream.peer_addr()?;
+ let mut state = state.borrow_mut();
+ let rid = state.resource_table.add(
+ "unixStream",
+ Box::new(StreamResourceHolder::new(StreamResource::UnixStream(
+ unix_stream,
+ ))),
+ );
+ Ok(json!({
+ "rid": rid,
+ "localAddr": {
+ "path": local_addr.as_pathname(),
+ "transport": "unix",
+ },
+ "remoteAddr": {
+ "path": remote_addr.as_pathname(),
+ "transport": "unix",
+ }
+ }))
+}
+
+pub(crate) async fn receive_unix_packet(
+ state: Rc<RefCell<OpState>>,
+ args: ReceiveArgs,
+ bufs: BufVec,
+) -> Result<Value, AnyError> {
+ assert_eq!(bufs.len(), 1, "Invalid number of arguments");
+
+ let rid = args.rid as u32;
+ let mut buf = bufs.into_iter().next().unwrap();
+
+ let mut state = state.borrow_mut();
+ let resource = state
+ .resource_table
+ .get_mut::<UnixDatagramResource>(rid)
+ .ok_or_else(|| bad_resource("Socket has been closed"))?;
+ let (size, remote_addr) = resource.socket.recv_from(&mut buf).await?;
+ Ok(json!({
+ "size": size,
+ "remoteAddr": {
+ "path": remote_addr.as_pathname(),
+ "transport": "unixpacket",
+ }
+ }))
+}
+
+pub fn listen_unix(
+ state: &mut OpState,
+ addr: &Path,
+) -> Result<(u32, unix::net::SocketAddr), AnyError> {
+ if addr.exists() {
+ remove_file(&addr).unwrap();
+ }
+ let listener = UnixListener::bind(&addr)?;
+ let local_addr = listener.local_addr()?;
+ let listener_resource = UnixListenerResource { listener };
+ let rid = state
+ .resource_table
+ .add("unixListener", Box::new(listener_resource));
+
+ Ok((rid, local_addr))
+}
+
+pub fn listen_unix_packet(
+ state: &mut OpState,
+ addr: &Path,
+) -> Result<(u32, unix::net::SocketAddr), AnyError> {
+ if addr.exists() {
+ remove_file(&addr).unwrap();
+ }
+ let socket = UnixDatagram::bind(&addr)?;
+ let local_addr = socket.local_addr()?;
+ let datagram_resource = UnixDatagramResource {
+ socket,
+ local_addr: local_addr.clone(),
+ };
+ let rid = state
+ .resource_table
+ .add("unixDatagram", Box::new(datagram_resource));
+
+ Ok((rid, local_addr))
+}
diff --git a/runtime/ops/os.rs b/runtime/ops/os.rs
new file mode 100644
index 000000000..6fd404a23
--- /dev/null
+++ b/runtime/ops/os.rs
@@ -0,0 +1,192 @@
+// Copyright 2018-2020 the Deno authors. All rights reserved. MIT license.
+
+use crate::permissions::Permissions;
+use deno_core::error::AnyError;
+use deno_core::serde_json;
+use deno_core::serde_json::json;
+use deno_core::serde_json::Value;
+use deno_core::url::Url;
+use deno_core::OpState;
+use deno_core::ZeroCopyBuf;
+use serde::Deserialize;
+use std::collections::HashMap;
+use std::env;
+
+pub fn init(rt: &mut deno_core::JsRuntime) {
+ super::reg_json_sync(rt, "op_exit", op_exit);
+ super::reg_json_sync(rt, "op_env", op_env);
+ super::reg_json_sync(rt, "op_exec_path", op_exec_path);
+ super::reg_json_sync(rt, "op_set_env", op_set_env);
+ super::reg_json_sync(rt, "op_get_env", op_get_env);
+ super::reg_json_sync(rt, "op_delete_env", op_delete_env);
+ super::reg_json_sync(rt, "op_hostname", op_hostname);
+ super::reg_json_sync(rt, "op_loadavg", op_loadavg);
+ super::reg_json_sync(rt, "op_os_release", op_os_release);
+ super::reg_json_sync(rt, "op_system_memory_info", op_system_memory_info);
+ super::reg_json_sync(rt, "op_system_cpu_info", op_system_cpu_info);
+}
+
+fn op_exec_path(
+ state: &mut OpState,
+ _args: Value,
+ _zero_copy: &mut [ZeroCopyBuf],
+) -> Result<Value, AnyError> {
+ let current_exe = env::current_exe().unwrap();
+ state
+ .borrow::<Permissions>()
+ .check_read_blind(&current_exe, "exec_path")?;
+ // Now apply URL parser to current exe to get fully resolved path, otherwise
+ // we might get `./` and `../` bits in `exec_path`
+ let exe_url = Url::from_file_path(current_exe).unwrap();
+ let path = exe_url.to_file_path().unwrap();
+ Ok(json!(path))
+}
+
+#[derive(Deserialize)]
+struct SetEnv {
+ key: String,
+ value: String,
+}
+
+fn op_set_env(
+ state: &mut OpState,
+ args: Value,
+ _zero_copy: &mut [ZeroCopyBuf],
+) -> Result<Value, AnyError> {
+ let args: SetEnv = serde_json::from_value(args)?;
+ state.borrow::<Permissions>().check_env()?;
+ env::set_var(args.key, args.value);
+ Ok(json!({}))
+}
+
+fn op_env(
+ state: &mut OpState,
+ _args: Value,
+ _zero_copy: &mut [ZeroCopyBuf],
+) -> Result<Value, AnyError> {
+ state.borrow::<Permissions>().check_env()?;
+ let v = env::vars().collect::<HashMap<String, String>>();
+ Ok(json!(v))
+}
+
+#[derive(Deserialize)]
+struct GetEnv {
+ key: String,
+}
+
+fn op_get_env(
+ state: &mut OpState,
+ args: Value,
+ _zero_copy: &mut [ZeroCopyBuf],
+) -> Result<Value, AnyError> {
+ let args: GetEnv = serde_json::from_value(args)?;
+ state.borrow::<Permissions>().check_env()?;
+ let r = match env::var(args.key) {
+ Err(env::VarError::NotPresent) => json!([]),
+ v => json!([v?]),
+ };
+ Ok(r)
+}
+
+#[derive(Deserialize)]
+struct DeleteEnv {
+ key: String,
+}
+
+fn op_delete_env(
+ state: &mut OpState,
+ args: Value,
+ _zero_copy: &mut [ZeroCopyBuf],
+) -> Result<Value, AnyError> {
+ let args: DeleteEnv = serde_json::from_value(args)?;
+ state.borrow::<Permissions>().check_env()?;
+ env::remove_var(args.key);
+ Ok(json!({}))
+}
+
+#[derive(Deserialize)]
+struct Exit {
+ code: i32,
+}
+
+fn op_exit(
+ _state: &mut OpState,
+ args: Value,
+ _zero_copy: &mut [ZeroCopyBuf],
+) -> Result<Value, AnyError> {
+ let args: Exit = serde_json::from_value(args)?;
+ std::process::exit(args.code)
+}
+
+fn op_loadavg(
+ state: &mut OpState,
+ _args: Value,
+ _zero_copy: &mut [ZeroCopyBuf],
+) -> Result<Value, AnyError> {
+ super::check_unstable(state, "Deno.loadavg");
+ state.borrow::<Permissions>().check_env()?;
+ match sys_info::loadavg() {
+ Ok(loadavg) => Ok(json!([loadavg.one, loadavg.five, loadavg.fifteen])),
+ Err(_) => Ok(json!([0f64, 0f64, 0f64])),
+ }
+}
+
+fn op_hostname(
+ state: &mut OpState,
+ _args: Value,
+ _zero_copy: &mut [ZeroCopyBuf],
+) -> Result<Value, AnyError> {
+ super::check_unstable(state, "Deno.hostname");
+ state.borrow::<Permissions>().check_env()?;
+ let hostname = sys_info::hostname().unwrap_or_else(|_| "".to_string());
+ Ok(json!(hostname))
+}
+
+fn op_os_release(
+ state: &mut OpState,
+ _args: Value,
+ _zero_copy: &mut [ZeroCopyBuf],
+) -> Result<Value, AnyError> {
+ super::check_unstable(state, "Deno.osRelease");
+ state.borrow::<Permissions>().check_env()?;
+ let release = sys_info::os_release().unwrap_or_else(|_| "".to_string());
+ Ok(json!(release))
+}
+
+fn op_system_memory_info(
+ state: &mut OpState,
+ _args: Value,
+ _zero_copy: &mut [ZeroCopyBuf],
+) -> Result<Value, AnyError> {
+ super::check_unstable(state, "Deno.systemMemoryInfo");
+ state.borrow::<Permissions>().check_env()?;
+ match sys_info::mem_info() {
+ Ok(info) => Ok(json!({
+ "total": info.total,
+ "free": info.free,
+ "available": info.avail,
+ "buffers": info.buffers,
+ "cached": info.cached,
+ "swapTotal": info.swap_total,
+ "swapFree": info.swap_free
+ })),
+ Err(_) => Ok(json!({})),
+ }
+}
+
+fn op_system_cpu_info(
+ state: &mut OpState,
+ _args: Value,
+ _zero_copy: &mut [ZeroCopyBuf],
+) -> Result<Value, AnyError> {
+ super::check_unstable(state, "Deno.systemCpuInfo");
+ state.borrow::<Permissions>().check_env()?;
+
+ let cores = sys_info::cpu_num().ok();
+ let speed = sys_info::cpu_speed().ok();
+
+ Ok(json!({
+ "cores": cores,
+ "speed": speed
+ }))
+}
diff --git a/runtime/ops/permissions.rs b/runtime/ops/permissions.rs
new file mode 100644
index 000000000..7474c0e37
--- /dev/null
+++ b/runtime/ops/permissions.rs
@@ -0,0 +1,103 @@
+// Copyright 2018-2020 the Deno authors. All rights reserved. MIT license.
+
+use crate::permissions::Permissions;
+use deno_core::error::custom_error;
+use deno_core::error::AnyError;
+use deno_core::serde_json;
+use deno_core::serde_json::json;
+use deno_core::serde_json::Value;
+use deno_core::OpState;
+use deno_core::ZeroCopyBuf;
+use serde::Deserialize;
+use std::path::Path;
+
+pub fn init(rt: &mut deno_core::JsRuntime) {
+ super::reg_json_sync(rt, "op_query_permission", op_query_permission);
+ super::reg_json_sync(rt, "op_revoke_permission", op_revoke_permission);
+ super::reg_json_sync(rt, "op_request_permission", op_request_permission);
+}
+
+#[derive(Deserialize)]
+struct PermissionArgs {
+ name: String,
+ url: Option<String>,
+ path: Option<String>,
+}
+
+pub fn op_query_permission(
+ state: &mut OpState,
+ args: Value,
+ _zero_copy: &mut [ZeroCopyBuf],
+) -> Result<Value, AnyError> {
+ let args: PermissionArgs = serde_json::from_value(args)?;
+ let permissions = state.borrow::<Permissions>();
+ let path = args.path.as_deref();
+ let perm = match args.name.as_ref() {
+ "read" => permissions.query_read(&path.as_deref().map(Path::new)),
+ "write" => permissions.query_write(&path.as_deref().map(Path::new)),
+ "net" => permissions.query_net_url(&args.url.as_deref())?,
+ "env" => permissions.query_env(),
+ "run" => permissions.query_run(),
+ "plugin" => permissions.query_plugin(),
+ "hrtime" => permissions.query_hrtime(),
+ n => {
+ return Err(custom_error(
+ "ReferenceError",
+ format!("No such permission name: {}", n),
+ ))
+ }
+ };
+ Ok(json!({ "state": perm.to_string() }))
+}
+
+pub fn op_revoke_permission(
+ state: &mut OpState,
+ args: Value,
+ _zero_copy: &mut [ZeroCopyBuf],
+) -> Result<Value, AnyError> {
+ let args: PermissionArgs = serde_json::from_value(args)?;
+ let permissions = state.borrow_mut::<Permissions>();
+ let path = args.path.as_deref();
+ let perm = match args.name.as_ref() {
+ "read" => permissions.revoke_read(&path.as_deref().map(Path::new)),
+ "write" => permissions.revoke_write(&path.as_deref().map(Path::new)),
+ "net" => permissions.revoke_net(&args.url.as_deref())?,
+ "env" => permissions.revoke_env(),
+ "run" => permissions.revoke_run(),
+ "plugin" => permissions.revoke_plugin(),
+ "hrtime" => permissions.revoke_hrtime(),
+ n => {
+ return Err(custom_error(
+ "ReferenceError",
+ format!("No such permission name: {}", n),
+ ))
+ }
+ };
+ Ok(json!({ "state": perm.to_string() }))
+}
+
+pub fn op_request_permission(
+ state: &mut OpState,
+ args: Value,
+ _zero_copy: &mut [ZeroCopyBuf],
+) -> Result<Value, AnyError> {
+ let args: PermissionArgs = serde_json::from_value(args)?;
+ let permissions = state.borrow_mut::<Permissions>();
+ let path = args.path.as_deref();
+ let perm = match args.name.as_ref() {
+ "read" => permissions.request_read(&path.as_deref().map(Path::new)),
+ "write" => permissions.request_write(&path.as_deref().map(Path::new)),
+ "net" => permissions.request_net(&args.url.as_deref())?,
+ "env" => permissions.request_env(),
+ "run" => permissions.request_run(),
+ "plugin" => permissions.request_plugin(),
+ "hrtime" => permissions.request_hrtime(),
+ n => {
+ return Err(custom_error(
+ "ReferenceError",
+ format!("No such permission name: {}", n),
+ ))
+ }
+ };
+ Ok(json!({ "state": perm.to_string() }))
+}
diff --git a/runtime/ops/plugin.rs b/runtime/ops/plugin.rs
new file mode 100644
index 000000000..1f3669b6f
--- /dev/null
+++ b/runtime/ops/plugin.rs
@@ -0,0 +1,156 @@
+// Copyright 2018-2020 the Deno authors. All rights reserved. MIT license.
+
+use crate::metrics::metrics_op;
+use crate::permissions::Permissions;
+use deno_core::error::AnyError;
+use deno_core::futures::prelude::*;
+use deno_core::plugin_api;
+use deno_core::serde_json;
+use deno_core::serde_json::json;
+use deno_core::serde_json::Value;
+use deno_core::BufVec;
+use deno_core::JsRuntime;
+use deno_core::Op;
+use deno_core::OpAsyncFuture;
+use deno_core::OpId;
+use deno_core::OpState;
+use deno_core::ZeroCopyBuf;
+use dlopen::symbor::Library;
+use serde::Deserialize;
+use std::cell::RefCell;
+use std::path::PathBuf;
+use std::pin::Pin;
+use std::rc::Rc;
+use std::task::Context;
+use std::task::Poll;
+
+pub fn init(rt: &mut JsRuntime) {
+ super::reg_json_sync(rt, "op_open_plugin", op_open_plugin);
+}
+
+#[derive(Deserialize)]
+#[serde(rename_all = "camelCase")]
+struct OpenPluginArgs {
+ filename: String,
+}
+
+pub fn op_open_plugin(
+ state: &mut OpState,
+ args: Value,
+ _zero_copy: &mut [ZeroCopyBuf],
+) -> Result<Value, AnyError> {
+ let args: OpenPluginArgs = serde_json::from_value(args)?;
+ let filename = PathBuf::from(&args.filename);
+
+ super::check_unstable(state, "Deno.openPlugin");
+ let permissions = state.borrow::<Permissions>();
+ permissions.check_plugin(&filename)?;
+
+ debug!("Loading Plugin: {:#?}", filename);
+ let plugin_lib = Library::open(filename).map(Rc::new)?;
+ let plugin_resource = PluginResource::new(&plugin_lib);
+
+ let rid;
+ let deno_plugin_init;
+ {
+ rid = state
+ .resource_table
+ .add("plugin", Box::new(plugin_resource));
+ deno_plugin_init = *unsafe {
+ state
+ .resource_table
+ .get::<PluginResource>(rid)
+ .unwrap()
+ .lib
+ .symbol::<plugin_api::InitFn>("deno_plugin_init")
+ .unwrap()
+ };
+ }
+
+ let mut interface = PluginInterface::new(state, &plugin_lib);
+ deno_plugin_init(&mut interface);
+
+ Ok(json!(rid))
+}
+
+struct PluginResource {
+ lib: Rc<Library>,
+}
+
+impl PluginResource {
+ fn new(lib: &Rc<Library>) -> Self {
+ Self { lib: lib.clone() }
+ }
+}
+
+struct PluginInterface<'a> {
+ state: &'a mut OpState,
+ plugin_lib: &'a Rc<Library>,
+}
+
+impl<'a> PluginInterface<'a> {
+ fn new(state: &'a mut OpState, plugin_lib: &'a Rc<Library>) -> Self {
+ Self { state, plugin_lib }
+ }
+}
+
+impl<'a> plugin_api::Interface for PluginInterface<'a> {
+ /// Does the same as `core::Isolate::register_op()`, but additionally makes
+ /// the registered op dispatcher, as well as the op futures created by it,
+ /// keep reference to the plugin `Library` object, so that the plugin doesn't
+ /// get unloaded before all its op registrations and the futures created by
+ /// them are dropped.
+ fn register_op(
+ &mut self,
+ name: &str,
+ dispatch_op_fn: plugin_api::DispatchOpFn,
+ ) -> OpId {
+ let plugin_lib = self.plugin_lib.clone();
+ let plugin_op_fn = move |state_rc: Rc<RefCell<OpState>>,
+ mut zero_copy: BufVec| {
+ let mut state = state_rc.borrow_mut();
+ let mut interface = PluginInterface::new(&mut state, &plugin_lib);
+ let op = dispatch_op_fn(&mut interface, &mut zero_copy);
+ match op {
+ sync_op @ Op::Sync(..) => sync_op,
+ Op::Async(fut) => Op::Async(PluginOpAsyncFuture::new(&plugin_lib, fut)),
+ Op::AsyncUnref(fut) => {
+ Op::AsyncUnref(PluginOpAsyncFuture::new(&plugin_lib, fut))
+ }
+ _ => unreachable!(),
+ }
+ };
+ self
+ .state
+ .op_table
+ .register_op(name, metrics_op(Box::new(plugin_op_fn)))
+ }
+}
+
+struct PluginOpAsyncFuture {
+ fut: Option<OpAsyncFuture>,
+ _plugin_lib: Rc<Library>,
+}
+
+impl PluginOpAsyncFuture {
+ fn new(plugin_lib: &Rc<Library>, fut: OpAsyncFuture) -> Pin<Box<Self>> {
+ let wrapped_fut = Self {
+ fut: Some(fut),
+ _plugin_lib: plugin_lib.clone(),
+ };
+ Box::pin(wrapped_fut)
+ }
+}
+
+impl Future for PluginOpAsyncFuture {
+ type Output = <OpAsyncFuture as Future>::Output;
+ fn poll(mut self: Pin<&mut Self>, ctx: &mut Context) -> Poll<Self::Output> {
+ self.fut.as_mut().unwrap().poll_unpin(ctx)
+ }
+}
+
+impl Drop for PluginOpAsyncFuture {
+ fn drop(&mut self) {
+ self.fut.take();
+ }
+}
diff --git a/runtime/ops/process.rs b/runtime/ops/process.rs
new file mode 100644
index 000000000..67b3d0761
--- /dev/null
+++ b/runtime/ops/process.rs
@@ -0,0 +1,290 @@
+// Copyright 2018-2020 the Deno authors. All rights reserved. MIT license.
+
+use super::io::{std_file_resource, StreamResource, StreamResourceHolder};
+use crate::permissions::Permissions;
+use deno_core::error::bad_resource_id;
+use deno_core::error::type_error;
+use deno_core::error::AnyError;
+use deno_core::futures::future::poll_fn;
+use deno_core::futures::future::FutureExt;
+use deno_core::serde_json;
+use deno_core::serde_json::json;
+use deno_core::serde_json::Value;
+use deno_core::BufVec;
+use deno_core::OpState;
+use deno_core::ZeroCopyBuf;
+use serde::Deserialize;
+use std::cell::RefCell;
+use std::rc::Rc;
+use tokio::process::Command;
+
+#[cfg(unix)]
+use std::os::unix::process::ExitStatusExt;
+
+pub fn init(rt: &mut deno_core::JsRuntime) {
+ super::reg_json_sync(rt, "op_run", op_run);
+ super::reg_json_async(rt, "op_run_status", op_run_status);
+ super::reg_json_sync(rt, "op_kill", op_kill);
+}
+
+fn clone_file(
+ state: &mut OpState,
+ rid: u32,
+) -> Result<std::fs::File, AnyError> {
+ std_file_resource(state, rid, move |r| match r {
+ Ok(std_file) => std_file.try_clone().map_err(AnyError::from),
+ Err(_) => Err(bad_resource_id()),
+ })
+}
+
+fn subprocess_stdio_map(s: &str) -> Result<std::process::Stdio, AnyError> {
+ match s {
+ "inherit" => Ok(std::process::Stdio::inherit()),
+ "piped" => Ok(std::process::Stdio::piped()),
+ "null" => Ok(std::process::Stdio::null()),
+ _ => Err(type_error("Invalid resource for stdio")),
+ }
+}
+
+#[derive(Deserialize)]
+#[serde(rename_all = "camelCase")]
+struct RunArgs {
+ cmd: Vec<String>,
+ cwd: Option<String>,
+ env: Vec<(String, String)>,
+ stdin: String,
+ stdout: String,
+ stderr: String,
+ stdin_rid: u32,
+ stdout_rid: u32,
+ stderr_rid: u32,
+}
+
+struct ChildResource {
+ child: tokio::process::Child,
+}
+
+fn op_run(
+ state: &mut OpState,
+ args: Value,
+ _zero_copy: &mut [ZeroCopyBuf],
+) -> Result<Value, AnyError> {
+ let run_args: RunArgs = serde_json::from_value(args)?;
+ state.borrow::<Permissions>().check_run()?;
+
+ let args = run_args.cmd;
+ let env = run_args.env;
+ let cwd = run_args.cwd;
+
+ let mut c = Command::new(args.get(0).unwrap());
+ (1..args.len()).for_each(|i| {
+ let arg = args.get(i).unwrap();
+ c.arg(arg);
+ });
+ cwd.map(|d| c.current_dir(d));
+ for (key, value) in &env {
+ c.env(key, value);
+ }
+
+ // TODO: make this work with other resources, eg. sockets
+ if !run_args.stdin.is_empty() {
+ c.stdin(subprocess_stdio_map(run_args.stdin.as_ref())?);
+ } else {
+ let file = clone_file(state, run_args.stdin_rid)?;
+ c.stdin(file);
+ }
+
+ if !run_args.stdout.is_empty() {
+ c.stdout(subprocess_stdio_map(run_args.stdout.as_ref())?);
+ } else {
+ let file = clone_file(state, run_args.stdout_rid)?;
+ c.stdout(file);
+ }
+
+ if !run_args.stderr.is_empty() {
+ c.stderr(subprocess_stdio_map(run_args.stderr.as_ref())?);
+ } else {
+ let file = clone_file(state, run_args.stderr_rid)?;
+ c.stderr(file);
+ }
+
+ // We want to kill child when it's closed
+ c.kill_on_drop(true);
+
+ // Spawn the command.
+ let mut child = c.spawn()?;
+ let pid = child.id();
+
+ let stdin_rid = match child.stdin.take() {
+ Some(child_stdin) => {
+ let rid = state.resource_table.add(
+ "childStdin",
+ Box::new(StreamResourceHolder::new(StreamResource::ChildStdin(
+ child_stdin,
+ ))),
+ );
+ Some(rid)
+ }
+ None => None,
+ };
+
+ let stdout_rid = match child.stdout.take() {
+ Some(child_stdout) => {
+ let rid = state.resource_table.add(
+ "childStdout",
+ Box::new(StreamResourceHolder::new(StreamResource::ChildStdout(
+ child_stdout,
+ ))),
+ );
+ Some(rid)
+ }
+ None => None,
+ };
+
+ let stderr_rid = match child.stderr.take() {
+ Some(child_stderr) => {
+ let rid = state.resource_table.add(
+ "childStderr",
+ Box::new(StreamResourceHolder::new(StreamResource::ChildStderr(
+ child_stderr,
+ ))),
+ );
+ Some(rid)
+ }
+ None => None,
+ };
+
+ let child_resource = ChildResource { child };
+ let child_rid = state.resource_table.add("child", Box::new(child_resource));
+
+ Ok(json!({
+ "rid": child_rid,
+ "pid": pid,
+ "stdinRid": stdin_rid,
+ "stdoutRid": stdout_rid,
+ "stderrRid": stderr_rid,
+ }))
+}
+
+#[derive(Deserialize)]
+#[serde(rename_all = "camelCase")]
+struct RunStatusArgs {
+ rid: i32,
+}
+
+async fn op_run_status(
+ state: Rc<RefCell<OpState>>,
+ args: Value,
+ _zero_copy: BufVec,
+) -> Result<Value, AnyError> {
+ let args: RunStatusArgs = serde_json::from_value(args)?;
+ let rid = args.rid as u32;
+
+ {
+ let s = state.borrow();
+ s.borrow::<Permissions>().check_run()?;
+ }
+
+ let run_status = poll_fn(|cx| {
+ let mut state = state.borrow_mut();
+ let child_resource = state
+ .resource_table
+ .get_mut::<ChildResource>(rid)
+ .ok_or_else(bad_resource_id)?;
+ let child = &mut child_resource.child;
+ child.poll_unpin(cx).map_err(AnyError::from)
+ })
+ .await?;
+
+ let code = run_status.code();
+
+ #[cfg(unix)]
+ let signal = run_status.signal();
+ #[cfg(not(unix))]
+ let signal = None;
+
+ code
+ .or(signal)
+ .expect("Should have either an exit code or a signal.");
+ let got_signal = signal.is_some();
+
+ Ok(json!({
+ "gotSignal": got_signal,
+ "exitCode": code.unwrap_or(-1),
+ "exitSignal": signal.unwrap_or(-1),
+ }))
+}
+
+#[cfg(not(unix))]
+const SIGINT: i32 = 2;
+#[cfg(not(unix))]
+const SIGKILL: i32 = 9;
+#[cfg(not(unix))]
+const SIGTERM: i32 = 15;
+
+#[cfg(not(unix))]
+use winapi::{
+ shared::minwindef::DWORD,
+ um::{
+ handleapi::CloseHandle,
+ processthreadsapi::{OpenProcess, TerminateProcess},
+ winnt::PROCESS_TERMINATE,
+ },
+};
+
+#[cfg(unix)]
+pub fn kill(pid: i32, signo: i32) -> Result<(), AnyError> {
+ use nix::sys::signal::{kill as unix_kill, Signal};
+ use nix::unistd::Pid;
+ use std::convert::TryFrom;
+ let sig = Signal::try_from(signo)?;
+ unix_kill(Pid::from_raw(pid), Option::Some(sig)).map_err(AnyError::from)
+}
+
+#[cfg(not(unix))]
+pub fn kill(pid: i32, signal: i32) -> Result<(), AnyError> {
+ use std::io::Error;
+ match signal {
+ SIGINT | SIGKILL | SIGTERM => {
+ if pid <= 0 {
+ return Err(type_error("unsupported pid"));
+ }
+ unsafe {
+ let handle = OpenProcess(PROCESS_TERMINATE, 0, pid as DWORD);
+ if handle.is_null() {
+ return Err(Error::last_os_error().into());
+ }
+ if TerminateProcess(handle, 1) == 0 {
+ CloseHandle(handle);
+ return Err(Error::last_os_error().into());
+ }
+ if CloseHandle(handle) == 0 {
+ return Err(Error::last_os_error().into());
+ }
+ }
+ }
+ _ => {
+ return Err(type_error("unsupported signal"));
+ }
+ }
+ Ok(())
+}
+
+#[derive(Deserialize)]
+struct KillArgs {
+ pid: i32,
+ signo: i32,
+}
+
+fn op_kill(
+ state: &mut OpState,
+ args: Value,
+ _zero_copy: &mut [ZeroCopyBuf],
+) -> Result<Value, AnyError> {
+ super::check_unstable(state, "Deno.kill");
+ state.borrow::<Permissions>().check_run()?;
+
+ let args: KillArgs = serde_json::from_value(args)?;
+ kill(args.pid, args.signo)?;
+ Ok(json!({}))
+}
diff --git a/runtime/ops/runtime.rs b/runtime/ops/runtime.rs
new file mode 100644
index 000000000..cb3b53d53
--- /dev/null
+++ b/runtime/ops/runtime.rs
@@ -0,0 +1,118 @@
+// Copyright 2018-2020 the Deno authors. All rights reserved. MIT license.
+
+use crate::metrics::Metrics;
+use crate::permissions::Permissions;
+use deno_core::error::AnyError;
+use deno_core::serde_json;
+use deno_core::serde_json::json;
+use deno_core::serde_json::Value;
+use deno_core::ModuleSpecifier;
+use deno_core::OpState;
+use deno_core::ZeroCopyBuf;
+
+pub fn init(rt: &mut deno_core::JsRuntime, main_module: ModuleSpecifier) {
+ {
+ let op_state = rt.op_state();
+ let mut state = op_state.borrow_mut();
+ state.put::<ModuleSpecifier>(main_module);
+ }
+ super::reg_json_sync(rt, "op_main_module", op_main_module);
+ super::reg_json_sync(rt, "op_metrics", op_metrics);
+}
+
+fn op_main_module(
+ state: &mut OpState,
+ _args: Value,
+ _zero_copy: &mut [ZeroCopyBuf],
+) -> Result<Value, AnyError> {
+ let main = state.borrow::<ModuleSpecifier>().to_string();
+ let main_url = ModuleSpecifier::resolve_url_or_path(&main)?;
+ if main_url.as_url().scheme() == "file" {
+ let main_path = std::env::current_dir().unwrap().join(main_url.to_string());
+ state
+ .borrow::<Permissions>()
+ .check_read_blind(&main_path, "main_module")?;
+ }
+ Ok(json!(&main))
+}
+
+fn op_metrics(
+ state: &mut OpState,
+ _args: Value,
+ _zero_copy: &mut [ZeroCopyBuf],
+) -> Result<Value, AnyError> {
+ let m = state.borrow::<Metrics>();
+
+ Ok(json!({
+ "opsDispatched": m.ops_dispatched,
+ "opsDispatchedSync": m.ops_dispatched_sync,
+ "opsDispatchedAsync": m.ops_dispatched_async,
+ "opsDispatchedAsyncUnref": m.ops_dispatched_async_unref,
+ "opsCompleted": m.ops_completed,
+ "opsCompletedSync": m.ops_completed_sync,
+ "opsCompletedAsync": m.ops_completed_async,
+ "opsCompletedAsyncUnref": m.ops_completed_async_unref,
+ "bytesSentControl": m.bytes_sent_control,
+ "bytesSentData": m.bytes_sent_data,
+ "bytesReceived": m.bytes_received
+ }))
+}
+
+pub fn ppid() -> Value {
+ #[cfg(windows)]
+ {
+ // Adopted from rustup:
+ // https://github.com/rust-lang/rustup/blob/1.21.1/src/cli/self_update.rs#L1036
+ // Copyright Diggory Blake, the Mozilla Corporation, and rustup contributors.
+ // Licensed under either of
+ // - Apache License, Version 2.0
+ // - MIT license
+ use std::mem;
+ use winapi::shared::minwindef::DWORD;
+ use winapi::um::handleapi::{CloseHandle, INVALID_HANDLE_VALUE};
+ use winapi::um::processthreadsapi::GetCurrentProcessId;
+ use winapi::um::tlhelp32::{
+ CreateToolhelp32Snapshot, Process32First, Process32Next, PROCESSENTRY32,
+ TH32CS_SNAPPROCESS,
+ };
+ unsafe {
+ // Take a snapshot of system processes, one of which is ours
+ // and contains our parent's pid
+ let snapshot = CreateToolhelp32Snapshot(TH32CS_SNAPPROCESS, 0);
+ if snapshot == INVALID_HANDLE_VALUE {
+ return serde_json::to_value(-1).unwrap();
+ }
+
+ let mut entry: PROCESSENTRY32 = mem::zeroed();
+ entry.dwSize = mem::size_of::<PROCESSENTRY32>() as DWORD;
+
+ // Iterate over system processes looking for ours
+ let success = Process32First(snapshot, &mut entry);
+ if success == 0 {
+ CloseHandle(snapshot);
+ return serde_json::to_value(-1).unwrap();
+ }
+
+ let this_pid = GetCurrentProcessId();
+ while entry.th32ProcessID != this_pid {
+ let success = Process32Next(snapshot, &mut entry);
+ if success == 0 {
+ CloseHandle(snapshot);
+ return serde_json::to_value(-1).unwrap();
+ }
+ }
+ CloseHandle(snapshot);
+
+ // FIXME: Using the process ID exposes a race condition
+ // wherein the parent process already exited and the OS
+ // reassigned its ID.
+ let parent_id = entry.th32ParentProcessID;
+ serde_json::to_value(parent_id).unwrap()
+ }
+ }
+ #[cfg(not(windows))]
+ {
+ use std::os::unix::process::parent_id;
+ serde_json::to_value(parent_id()).unwrap()
+ }
+}
diff --git a/runtime/ops/signal.rs b/runtime/ops/signal.rs
new file mode 100644
index 000000000..be6bc0a35
--- /dev/null
+++ b/runtime/ops/signal.rs
@@ -0,0 +1,142 @@
+// Copyright 2018-2020 the Deno authors. All rights reserved. MIT license.
+
+use deno_core::error::AnyError;
+use deno_core::serde_json::Value;
+use deno_core::BufVec;
+use deno_core::OpState;
+use deno_core::ZeroCopyBuf;
+use std::cell::RefCell;
+use std::rc::Rc;
+
+#[cfg(unix)]
+use deno_core::error::bad_resource_id;
+#[cfg(unix)]
+use deno_core::futures::future::poll_fn;
+#[cfg(unix)]
+use deno_core::serde_json;
+#[cfg(unix)]
+use deno_core::serde_json::json;
+#[cfg(unix)]
+use serde::Deserialize;
+#[cfg(unix)]
+use std::task::Waker;
+#[cfg(unix)]
+use tokio::signal::unix::{signal, Signal, SignalKind};
+
+pub fn init(rt: &mut deno_core::JsRuntime) {
+ super::reg_json_sync(rt, "op_signal_bind", op_signal_bind);
+ super::reg_json_sync(rt, "op_signal_unbind", op_signal_unbind);
+ super::reg_json_async(rt, "op_signal_poll", op_signal_poll);
+}
+
+#[cfg(unix)]
+/// The resource for signal stream.
+/// The second element is the waker of polling future.
+pub struct SignalStreamResource(pub Signal, pub Option<Waker>);
+
+#[cfg(unix)]
+#[derive(Deserialize)]
+struct BindSignalArgs {
+ signo: i32,
+}
+
+#[cfg(unix)]
+#[derive(Deserialize)]
+struct SignalArgs {
+ rid: i32,
+}
+
+#[cfg(unix)]
+fn op_signal_bind(
+ state: &mut OpState,
+ args: Value,
+ _zero_copy: &mut [ZeroCopyBuf],
+) -> Result<Value, AnyError> {
+ super::check_unstable(state, "Deno.signal");
+ let args: BindSignalArgs = serde_json::from_value(args)?;
+ let rid = state.resource_table.add(
+ "signal",
+ Box::new(SignalStreamResource(
+ signal(SignalKind::from_raw(args.signo)).expect(""),
+ None,
+ )),
+ );
+ Ok(json!({
+ "rid": rid,
+ }))
+}
+
+#[cfg(unix)]
+async fn op_signal_poll(
+ state: Rc<RefCell<OpState>>,
+ args: Value,
+ _zero_copy: BufVec,
+) -> Result<Value, AnyError> {
+ super::check_unstable2(&state, "Deno.signal");
+ let args: SignalArgs = serde_json::from_value(args)?;
+ let rid = args.rid as u32;
+
+ let future = poll_fn(move |cx| {
+ let mut state = state.borrow_mut();
+ if let Some(mut signal) =
+ state.resource_table.get_mut::<SignalStreamResource>(rid)
+ {
+ signal.1 = Some(cx.waker().clone());
+ return signal.0.poll_recv(cx);
+ }
+ std::task::Poll::Ready(None)
+ });
+ let result = future.await;
+ Ok(json!({ "done": result.is_none() }))
+}
+
+#[cfg(unix)]
+pub fn op_signal_unbind(
+ state: &mut OpState,
+ args: Value,
+ _zero_copy: &mut [ZeroCopyBuf],
+) -> Result<Value, AnyError> {
+ super::check_unstable(state, "Deno.signal");
+ let args: SignalArgs = serde_json::from_value(args)?;
+ let rid = args.rid as u32;
+ let resource = state.resource_table.get_mut::<SignalStreamResource>(rid);
+ if let Some(signal) = resource {
+ if let Some(waker) = &signal.1 {
+ // Wakes up the pending poll if exists.
+ // This prevents the poll future from getting stuck forever.
+ waker.clone().wake();
+ }
+ }
+ state
+ .resource_table
+ .close(rid)
+ .ok_or_else(bad_resource_id)?;
+ Ok(json!({}))
+}
+
+#[cfg(not(unix))]
+pub fn op_signal_bind(
+ _state: &mut OpState,
+ _args: Value,
+ _zero_copy: &mut [ZeroCopyBuf],
+) -> Result<Value, AnyError> {
+ unimplemented!();
+}
+
+#[cfg(not(unix))]
+fn op_signal_unbind(
+ _state: &mut OpState,
+ _args: Value,
+ _zero_copy: &mut [ZeroCopyBuf],
+) -> Result<Value, AnyError> {
+ unimplemented!();
+}
+
+#[cfg(not(unix))]
+async fn op_signal_poll(
+ _state: Rc<RefCell<OpState>>,
+ _args: Value,
+ _zero_copy: BufVec,
+) -> Result<Value, AnyError> {
+ unimplemented!();
+}
diff --git a/runtime/ops/timers.rs b/runtime/ops/timers.rs
new file mode 100644
index 000000000..8037fd698
--- /dev/null
+++ b/runtime/ops/timers.rs
@@ -0,0 +1,193 @@
+// Copyright 2018-2020 the Deno authors. All rights reserved. MIT license.
+
+//! This module helps deno implement timers.
+//!
+//! As an optimization, we want to avoid an expensive calls into rust for every
+//! setTimeout in JavaScript. Thus in //js/timers.ts a data structure is
+//! implemented that calls into Rust for only the smallest timeout. Thus we
+//! only need to be able to start, cancel and await a single timer (or Delay, as Tokio
+//! calls it) for an entire Isolate. This is what is implemented here.
+
+use super::dispatch_minimal::minimal_op;
+use super::dispatch_minimal::MinimalOp;
+use crate::metrics::metrics_op;
+use crate::permissions::Permissions;
+use deno_core::error::type_error;
+use deno_core::error::AnyError;
+use deno_core::futures;
+use deno_core::futures::channel::oneshot;
+use deno_core::futures::FutureExt;
+use deno_core::futures::TryFutureExt;
+use deno_core::serde_json;
+use deno_core::serde_json::json;
+use deno_core::serde_json::Value;
+use deno_core::BufVec;
+use deno_core::OpState;
+use deno_core::ZeroCopyBuf;
+use serde::Deserialize;
+use std::cell::RefCell;
+use std::future::Future;
+use std::pin::Pin;
+use std::rc::Rc;
+use std::thread::sleep;
+use std::time::Duration;
+use std::time::Instant;
+
+pub type StartTime = Instant;
+
+type TimerFuture = Pin<Box<dyn Future<Output = Result<(), ()>>>>;
+
+#[derive(Default)]
+pub struct GlobalTimer {
+ tx: Option<oneshot::Sender<()>>,
+ pub future: Option<TimerFuture>,
+}
+
+impl GlobalTimer {
+ pub fn cancel(&mut self) {
+ if let Some(tx) = self.tx.take() {
+ tx.send(()).ok();
+ }
+ }
+
+ pub fn new_timeout(&mut self, deadline: Instant) {
+ if self.tx.is_some() {
+ self.cancel();
+ }
+ assert!(self.tx.is_none());
+ self.future.take();
+
+ let (tx, rx) = oneshot::channel();
+ self.tx = Some(tx);
+
+ let delay = tokio::time::delay_until(deadline.into());
+ let rx = rx
+ .map_err(|err| panic!("Unexpected error in receiving channel {:?}", err));
+
+ let fut = futures::future::select(delay, rx)
+ .then(|_| futures::future::ok(()))
+ .boxed_local();
+ self.future = Some(fut);
+ }
+}
+
+pub fn init(rt: &mut deno_core::JsRuntime) {
+ {
+ let op_state = rt.op_state();
+ let mut state = op_state.borrow_mut();
+ state.put::<GlobalTimer>(GlobalTimer::default());
+ state.put::<StartTime>(StartTime::now());
+ }
+ super::reg_json_sync(rt, "op_global_timer_stop", op_global_timer_stop);
+ super::reg_json_sync(rt, "op_global_timer_start", op_global_timer_start);
+ super::reg_json_async(rt, "op_global_timer", op_global_timer);
+ rt.register_op("op_now", metrics_op(minimal_op(op_now)));
+ super::reg_json_sync(rt, "op_sleep_sync", op_sleep_sync);
+}
+
+fn op_global_timer_stop(
+ state: &mut OpState,
+ _args: Value,
+ _zero_copy: &mut [ZeroCopyBuf],
+) -> Result<Value, AnyError> {
+ let global_timer = state.borrow_mut::<GlobalTimer>();
+ global_timer.cancel();
+ Ok(json!({}))
+}
+
+#[derive(Deserialize)]
+struct GlobalTimerArgs {
+ timeout: u64,
+}
+
+// Set up a timer that will be later awaited by JS promise.
+// It's a separate op, because canceling a timeout immediately
+// after setting it caused a race condition (because Tokio timeout)
+// might have been registered after next event loop tick.
+//
+// See https://github.com/denoland/deno/issues/7599 for more
+// details.
+fn op_global_timer_start(
+ state: &mut OpState,
+ args: Value,
+ _zero_copy: &mut [ZeroCopyBuf],
+) -> Result<Value, AnyError> {
+ let args: GlobalTimerArgs = serde_json::from_value(args)?;
+ let val = args.timeout;
+
+ let deadline = Instant::now() + Duration::from_millis(val);
+ let global_timer = state.borrow_mut::<GlobalTimer>();
+ global_timer.new_timeout(deadline);
+ Ok(json!({}))
+}
+
+async fn op_global_timer(
+ state: Rc<RefCell<OpState>>,
+ _args: Value,
+ _zero_copy: BufVec,
+) -> Result<Value, AnyError> {
+ let maybe_timer_fut = {
+ let mut s = state.borrow_mut();
+ let global_timer = s.borrow_mut::<GlobalTimer>();
+ global_timer.future.take()
+ };
+ if let Some(timer_fut) = maybe_timer_fut {
+ let _ = timer_fut.await;
+ }
+ Ok(json!({}))
+}
+
+// Returns a milliseconds and nanoseconds subsec
+// since the start time of the deno runtime.
+// If the High precision flag is not set, the
+// nanoseconds are rounded on 2ms.
+fn op_now(
+ state: Rc<RefCell<OpState>>,
+ // Arguments are discarded
+ _sync: bool,
+ _x: i32,
+ mut zero_copy: BufVec,
+) -> MinimalOp {
+ match zero_copy.len() {
+ 0 => return MinimalOp::Sync(Err(type_error("no buffer specified"))),
+ 1 => {}
+ _ => {
+ return MinimalOp::Sync(Err(type_error("Invalid number of arguments")))
+ }
+ }
+
+ let op_state = state.borrow();
+ let start_time = op_state.borrow::<StartTime>();
+ let seconds = start_time.elapsed().as_secs();
+ let mut subsec_nanos = start_time.elapsed().subsec_nanos() as f64;
+ let reduced_time_precision = 2_000_000.0; // 2ms in nanoseconds
+
+ // If the permission is not enabled
+ // Round the nano result on 2 milliseconds
+ // see: https://developer.mozilla.org/en-US/docs/Web/API/DOMHighResTimeStamp#Reduced_time_precision
+ if op_state.borrow::<Permissions>().check_hrtime().is_err() {
+ subsec_nanos -= subsec_nanos % reduced_time_precision;
+ }
+
+ let result = (seconds * 1_000) as f64 + (subsec_nanos / 1_000_000.0);
+
+ (&mut zero_copy[0]).copy_from_slice(&result.to_be_bytes());
+
+ MinimalOp::Sync(Ok(0))
+}
+
+#[derive(Deserialize)]
+struct SleepArgs {
+ millis: u64,
+}
+
+fn op_sleep_sync(
+ state: &mut OpState,
+ args: Value,
+ _zero_copy: &mut [ZeroCopyBuf],
+) -> Result<Value, AnyError> {
+ super::check_unstable(state, "Deno.sleepSync");
+ let args: SleepArgs = serde_json::from_value(args)?;
+ sleep(Duration::from_millis(args.millis));
+ Ok(json!({}))
+}
diff --git a/runtime/ops/tls.rs b/runtime/ops/tls.rs
new file mode 100644
index 000000000..37fd8f206
--- /dev/null
+++ b/runtime/ops/tls.rs
@@ -0,0 +1,431 @@
+// Copyright 2018-2020 the Deno authors. All rights reserved. MIT license.
+
+use super::io::{StreamResource, StreamResourceHolder};
+use crate::permissions::Permissions;
+use crate::resolve_addr::resolve_addr;
+use deno_core::error::bad_resource;
+use deno_core::error::bad_resource_id;
+use deno_core::error::custom_error;
+use deno_core::error::AnyError;
+use deno_core::futures;
+use deno_core::futures::future::poll_fn;
+use deno_core::serde_json;
+use deno_core::serde_json::json;
+use deno_core::serde_json::Value;
+use deno_core::BufVec;
+use deno_core::OpState;
+use deno_core::ZeroCopyBuf;
+use serde::Deserialize;
+use std::cell::RefCell;
+use std::convert::From;
+use std::fs::File;
+use std::io::BufReader;
+use std::net::SocketAddr;
+use std::path::Path;
+use std::rc::Rc;
+use std::sync::Arc;
+use std::task::Context;
+use std::task::Poll;
+use tokio::net::TcpListener;
+use tokio::net::TcpStream;
+use tokio_rustls::{rustls::ClientConfig, TlsConnector};
+use tokio_rustls::{
+ rustls::{
+ internal::pemfile::{certs, pkcs8_private_keys, rsa_private_keys},
+ Certificate, NoClientAuth, PrivateKey, ServerConfig,
+ },
+ TlsAcceptor,
+};
+use webpki::DNSNameRef;
+
+pub fn init(rt: &mut deno_core::JsRuntime) {
+ super::reg_json_async(rt, "op_start_tls", op_start_tls);
+ super::reg_json_async(rt, "op_connect_tls", op_connect_tls);
+ super::reg_json_sync(rt, "op_listen_tls", op_listen_tls);
+ super::reg_json_async(rt, "op_accept_tls", op_accept_tls);
+}
+
+#[derive(Deserialize)]
+#[serde(rename_all = "camelCase")]
+struct ConnectTLSArgs {
+ transport: String,
+ hostname: String,
+ port: u16,
+ cert_file: Option<String>,
+}
+
+#[derive(Deserialize)]
+#[serde(rename_all = "camelCase")]
+struct StartTLSArgs {
+ rid: u32,
+ cert_file: Option<String>,
+ hostname: String,
+}
+
+async fn op_start_tls(
+ state: Rc<RefCell<OpState>>,
+ args: Value,
+ _zero_copy: BufVec,
+) -> Result<Value, AnyError> {
+ let args: StartTLSArgs = serde_json::from_value(args)?;
+ let rid = args.rid as u32;
+ let cert_file = args.cert_file.clone();
+
+ let mut domain = args.hostname;
+ if domain.is_empty() {
+ domain.push_str("localhost");
+ }
+ {
+ super::check_unstable2(&state, "Deno.startTls");
+ let s = state.borrow();
+ let permissions = s.borrow::<Permissions>();
+ permissions.check_net(&domain, 0)?;
+ if let Some(path) = cert_file.clone() {
+ permissions.check_read(Path::new(&path))?;
+ }
+ }
+ let mut resource_holder = {
+ let mut state_ = state.borrow_mut();
+ match state_.resource_table.remove::<StreamResourceHolder>(rid) {
+ Some(resource) => *resource,
+ None => return Err(bad_resource_id()),
+ }
+ };
+
+ if let StreamResource::TcpStream(ref mut tcp_stream) =
+ resource_holder.resource
+ {
+ let tcp_stream = tcp_stream.take().unwrap();
+ let local_addr = tcp_stream.local_addr()?;
+ let remote_addr = tcp_stream.peer_addr()?;
+ let mut config = ClientConfig::new();
+ config
+ .root_store
+ .add_server_trust_anchors(&webpki_roots::TLS_SERVER_ROOTS);
+ if let Some(path) = cert_file {
+ let key_file = File::open(path)?;
+ let reader = &mut BufReader::new(key_file);
+ config.root_store.add_pem_file(reader).unwrap();
+ }
+
+ let tls_connector = TlsConnector::from(Arc::new(config));
+ let dnsname =
+ DNSNameRef::try_from_ascii_str(&domain).expect("Invalid DNS lookup");
+ let tls_stream = tls_connector.connect(dnsname, tcp_stream).await?;
+
+ let rid = {
+ let mut state_ = state.borrow_mut();
+ state_.resource_table.add(
+ "clientTlsStream",
+ Box::new(StreamResourceHolder::new(StreamResource::ClientTlsStream(
+ Box::new(tls_stream),
+ ))),
+ )
+ };
+ Ok(json!({
+ "rid": rid,
+ "localAddr": {
+ "hostname": local_addr.ip().to_string(),
+ "port": local_addr.port(),
+ "transport": "tcp",
+ },
+ "remoteAddr": {
+ "hostname": remote_addr.ip().to_string(),
+ "port": remote_addr.port(),
+ "transport": "tcp",
+ }
+ }))
+ } else {
+ Err(bad_resource_id())
+ }
+}
+
+async fn op_connect_tls(
+ state: Rc<RefCell<OpState>>,
+ args: Value,
+ _zero_copy: BufVec,
+) -> Result<Value, AnyError> {
+ let args: ConnectTLSArgs = serde_json::from_value(args)?;
+ let cert_file = args.cert_file.clone();
+ {
+ let s = state.borrow();
+ let permissions = s.borrow::<Permissions>();
+ permissions.check_net(&args.hostname, args.port)?;
+ if let Some(path) = cert_file.clone() {
+ permissions.check_read(Path::new(&path))?;
+ }
+ }
+ let mut domain = args.hostname.clone();
+ if domain.is_empty() {
+ domain.push_str("localhost");
+ }
+
+ let addr = resolve_addr(&args.hostname, args.port)?;
+ let tcp_stream = TcpStream::connect(&addr).await?;
+ let local_addr = tcp_stream.local_addr()?;
+ let remote_addr = tcp_stream.peer_addr()?;
+ let mut config = ClientConfig::new();
+ config
+ .root_store
+ .add_server_trust_anchors(&webpki_roots::TLS_SERVER_ROOTS);
+ if let Some(path) = cert_file {
+ let key_file = File::open(path)?;
+ let reader = &mut BufReader::new(key_file);
+ config.root_store.add_pem_file(reader).unwrap();
+ }
+ let tls_connector = TlsConnector::from(Arc::new(config));
+ let dnsname =
+ DNSNameRef::try_from_ascii_str(&domain).expect("Invalid DNS lookup");
+ let tls_stream = tls_connector.connect(dnsname, tcp_stream).await?;
+ let rid = {
+ let mut state_ = state.borrow_mut();
+ state_.resource_table.add(
+ "clientTlsStream",
+ Box::new(StreamResourceHolder::new(StreamResource::ClientTlsStream(
+ Box::new(tls_stream),
+ ))),
+ )
+ };
+ Ok(json!({
+ "rid": rid,
+ "localAddr": {
+ "hostname": local_addr.ip().to_string(),
+ "port": local_addr.port(),
+ "transport": args.transport,
+ },
+ "remoteAddr": {
+ "hostname": remote_addr.ip().to_string(),
+ "port": remote_addr.port(),
+ "transport": args.transport,
+ }
+ }))
+}
+
+fn load_certs(path: &str) -> Result<Vec<Certificate>, AnyError> {
+ let cert_file = File::open(path)?;
+ let reader = &mut BufReader::new(cert_file);
+
+ let certs = certs(reader)
+ .map_err(|_| custom_error("InvalidData", "Unable to decode certificate"))?;
+
+ if certs.is_empty() {
+ let e = custom_error("InvalidData", "No certificates found in cert file");
+ return Err(e);
+ }
+
+ Ok(certs)
+}
+
+fn key_decode_err() -> AnyError {
+ custom_error("InvalidData", "Unable to decode key")
+}
+
+fn key_not_found_err() -> AnyError {
+ custom_error("InvalidData", "No keys found in key file")
+}
+
+/// Starts with -----BEGIN RSA PRIVATE KEY-----
+fn load_rsa_keys(path: &str) -> Result<Vec<PrivateKey>, AnyError> {
+ let key_file = File::open(path)?;
+ let reader = &mut BufReader::new(key_file);
+ let keys = rsa_private_keys(reader).map_err(|_| key_decode_err())?;
+ Ok(keys)
+}
+
+/// Starts with -----BEGIN PRIVATE KEY-----
+fn load_pkcs8_keys(path: &str) -> Result<Vec<PrivateKey>, AnyError> {
+ let key_file = File::open(path)?;
+ let reader = &mut BufReader::new(key_file);
+ let keys = pkcs8_private_keys(reader).map_err(|_| key_decode_err())?;
+ Ok(keys)
+}
+
+fn load_keys(path: &str) -> Result<Vec<PrivateKey>, AnyError> {
+ let path = path.to_string();
+ let mut keys = load_rsa_keys(&path)?;
+
+ if keys.is_empty() {
+ keys = load_pkcs8_keys(&path)?;
+ }
+
+ if keys.is_empty() {
+ return Err(key_not_found_err());
+ }
+
+ Ok(keys)
+}
+
+#[allow(dead_code)]
+pub struct TlsListenerResource {
+ listener: TcpListener,
+ tls_acceptor: TlsAcceptor,
+ waker: Option<futures::task::AtomicWaker>,
+ local_addr: SocketAddr,
+}
+
+impl Drop for TlsListenerResource {
+ fn drop(&mut self) {
+ self.wake_task();
+ }
+}
+
+impl TlsListenerResource {
+ /// Track the current task so future awaiting for connection
+ /// can be notified when listener is closed.
+ ///
+ /// Throws an error if another task is already tracked.
+ pub fn track_task(&mut self, cx: &Context) -> Result<(), AnyError> {
+ // Currently, we only allow tracking a single accept task for a listener.
+ // This might be changed in the future with multiple workers.
+ // Caveat: TcpListener by itself also only tracks an accept task at a time.
+ // See https://github.com/tokio-rs/tokio/issues/846#issuecomment-454208883
+ if self.waker.is_some() {
+ return Err(custom_error("Busy", "Another accept task is ongoing"));
+ }
+
+ let waker = futures::task::AtomicWaker::new();
+ waker.register(cx.waker());
+ self.waker.replace(waker);
+ Ok(())
+ }
+
+ /// Notifies a task when listener is closed so accept future can resolve.
+ pub fn wake_task(&mut self) {
+ if let Some(waker) = self.waker.as_ref() {
+ waker.wake();
+ }
+ }
+
+ /// Stop tracking a task.
+ /// Happens when the task is done and thus no further tracking is needed.
+ pub fn untrack_task(&mut self) {
+ self.waker.take();
+ }
+}
+
+#[derive(Deserialize)]
+#[serde(rename_all = "camelCase")]
+struct ListenTlsArgs {
+ transport: String,
+ hostname: String,
+ port: u16,
+ cert_file: String,
+ key_file: String,
+}
+
+fn op_listen_tls(
+ state: &mut OpState,
+ args: Value,
+ _zero_copy: &mut [ZeroCopyBuf],
+) -> Result<Value, AnyError> {
+ let args: ListenTlsArgs = serde_json::from_value(args)?;
+ assert_eq!(args.transport, "tcp");
+
+ let cert_file = args.cert_file;
+ let key_file = args.key_file;
+ {
+ let permissions = state.borrow::<Permissions>();
+ permissions.check_net(&args.hostname, args.port)?;
+ permissions.check_read(Path::new(&cert_file))?;
+ permissions.check_read(Path::new(&key_file))?;
+ }
+ let mut config = ServerConfig::new(NoClientAuth::new());
+ config
+ .set_single_cert(load_certs(&cert_file)?, load_keys(&key_file)?.remove(0))
+ .expect("invalid key or certificate");
+ let tls_acceptor = TlsAcceptor::from(Arc::new(config));
+ let addr = resolve_addr(&args.hostname, args.port)?;
+ let std_listener = std::net::TcpListener::bind(&addr)?;
+ let listener = TcpListener::from_std(std_listener)?;
+ let local_addr = listener.local_addr()?;
+ let tls_listener_resource = TlsListenerResource {
+ listener,
+ tls_acceptor,
+ waker: None,
+ local_addr,
+ };
+
+ let rid = state
+ .resource_table
+ .add("tlsListener", Box::new(tls_listener_resource));
+
+ Ok(json!({
+ "rid": rid,
+ "localAddr": {
+ "hostname": local_addr.ip().to_string(),
+ "port": local_addr.port(),
+ "transport": args.transport,
+ },
+ }))
+}
+
+#[derive(Deserialize)]
+struct AcceptTlsArgs {
+ rid: i32,
+}
+
+async fn op_accept_tls(
+ state: Rc<RefCell<OpState>>,
+ args: Value,
+ _zero_copy: BufVec,
+) -> Result<Value, AnyError> {
+ let args: AcceptTlsArgs = serde_json::from_value(args)?;
+ let rid = args.rid as u32;
+ let accept_fut = poll_fn(|cx| {
+ let mut state = state.borrow_mut();
+ let listener_resource = state
+ .resource_table
+ .get_mut::<TlsListenerResource>(rid)
+ .ok_or_else(|| bad_resource("Listener has been closed"))?;
+ let listener = &mut listener_resource.listener;
+ match listener.poll_accept(cx).map_err(AnyError::from) {
+ Poll::Ready(Ok((stream, addr))) => {
+ listener_resource.untrack_task();
+ Poll::Ready(Ok((stream, addr)))
+ }
+ Poll::Pending => {
+ listener_resource.track_task(cx)?;
+ Poll::Pending
+ }
+ Poll::Ready(Err(e)) => {
+ listener_resource.untrack_task();
+ Poll::Ready(Err(e))
+ }
+ }
+ });
+ let (tcp_stream, _socket_addr) = accept_fut.await?;
+ let local_addr = tcp_stream.local_addr()?;
+ let remote_addr = tcp_stream.peer_addr()?;
+ let tls_acceptor = {
+ let state_ = state.borrow();
+ let resource = state_
+ .resource_table
+ .get::<TlsListenerResource>(rid)
+ .ok_or_else(bad_resource_id)
+ .expect("Can't find tls listener");
+ resource.tls_acceptor.clone()
+ };
+ let tls_stream = tls_acceptor.accept(tcp_stream).await?;
+ let rid = {
+ let mut state_ = state.borrow_mut();
+ state_.resource_table.add(
+ "serverTlsStream",
+ Box::new(StreamResourceHolder::new(StreamResource::ServerTlsStream(
+ Box::new(tls_stream),
+ ))),
+ )
+ };
+ Ok(json!({
+ "rid": rid,
+ "localAddr": {
+ "transport": "tcp",
+ "hostname": local_addr.ip().to_string(),
+ "port": local_addr.port()
+ },
+ "remoteAddr": {
+ "transport": "tcp",
+ "hostname": remote_addr.ip().to_string(),
+ "port": remote_addr.port()
+ }
+ }))
+}
diff --git a/runtime/ops/tty.rs b/runtime/ops/tty.rs
new file mode 100644
index 000000000..ad66bcf1a
--- /dev/null
+++ b/runtime/ops/tty.rs
@@ -0,0 +1,334 @@
+// Copyright 2018-2020 the Deno authors. All rights reserved. MIT license.
+
+use super::io::std_file_resource;
+use super::io::StreamResource;
+use super::io::StreamResourceHolder;
+use deno_core::error::bad_resource_id;
+use deno_core::error::not_supported;
+use deno_core::error::resource_unavailable;
+use deno_core::error::AnyError;
+use deno_core::serde_json;
+use deno_core::serde_json::json;
+use deno_core::serde_json::Value;
+use deno_core::OpState;
+use deno_core::ZeroCopyBuf;
+use serde::Deserialize;
+use serde::Serialize;
+use std::io::Error;
+
+#[cfg(unix)]
+use nix::sys::termios;
+
+#[cfg(windows)]
+use deno_core::error::custom_error;
+#[cfg(windows)]
+use winapi::shared::minwindef::DWORD;
+#[cfg(windows)]
+use winapi::um::wincon;
+#[cfg(windows)]
+const RAW_MODE_MASK: DWORD = wincon::ENABLE_LINE_INPUT
+ | wincon::ENABLE_ECHO_INPUT
+ | wincon::ENABLE_PROCESSED_INPUT;
+
+#[cfg(windows)]
+fn get_windows_handle(
+ f: &std::fs::File,
+) -> Result<std::os::windows::io::RawHandle, AnyError> {
+ use std::os::windows::io::AsRawHandle;
+ use winapi::um::handleapi;
+
+ let handle = f.as_raw_handle();
+ if handle == handleapi::INVALID_HANDLE_VALUE {
+ return Err(Error::last_os_error().into());
+ } else if handle.is_null() {
+ return Err(custom_error("ReferenceError", "null handle"));
+ }
+ Ok(handle)
+}
+
+pub fn init(rt: &mut deno_core::JsRuntime) {
+ super::reg_json_sync(rt, "op_set_raw", op_set_raw);
+ super::reg_json_sync(rt, "op_isatty", op_isatty);
+ super::reg_json_sync(rt, "op_console_size", op_console_size);
+}
+
+#[derive(Deserialize)]
+#[serde(rename_all = "camelCase")]
+struct SetRawOptions {
+ cbreak: bool,
+}
+
+#[derive(Deserialize)]
+struct SetRawArgs {
+ rid: u32,
+ mode: bool,
+ options: SetRawOptions,
+}
+
+fn op_set_raw(
+ state: &mut OpState,
+ args: Value,
+ _zero_copy: &mut [ZeroCopyBuf],
+) -> Result<Value, AnyError> {
+ super::check_unstable(state, "Deno.setRaw");
+
+ let args: SetRawArgs = serde_json::from_value(args)?;
+ let rid = args.rid;
+ let is_raw = args.mode;
+ let cbreak = args.options.cbreak;
+
+ // From https://github.com/kkawakam/rustyline/blob/master/src/tty/windows.rs
+ // and https://github.com/kkawakam/rustyline/blob/master/src/tty/unix.rs
+ // and https://github.com/crossterm-rs/crossterm/blob/e35d4d2c1cc4c919e36d242e014af75f6127ab50/src/terminal/sys/windows.rs
+ // Copyright (c) 2015 Katsu Kawakami & Rustyline authors. MIT license.
+ // Copyright (c) 2019 Timon. MIT license.
+ #[cfg(windows)]
+ {
+ use std::os::windows::io::AsRawHandle;
+ use winapi::shared::minwindef::FALSE;
+ use winapi::um::{consoleapi, handleapi};
+
+ let resource_holder =
+ state.resource_table.get_mut::<StreamResourceHolder>(rid);
+ if resource_holder.is_none() {
+ return Err(bad_resource_id());
+ }
+ if cbreak {
+ return Err(not_supported());
+ }
+ let resource_holder = resource_holder.unwrap();
+
+ // For now, only stdin.
+ let handle = match &mut resource_holder.resource {
+ StreamResource::FsFile(ref mut option_file_metadata) => {
+ if let Some((tokio_file, metadata)) = option_file_metadata.take() {
+ match tokio_file.try_into_std() {
+ Ok(std_file) => {
+ let raw_handle = std_file.as_raw_handle();
+ // Turn the std_file handle back into a tokio file, put it back
+ // in the resource table.
+ let tokio_file = tokio::fs::File::from_std(std_file);
+ resource_holder.resource =
+ StreamResource::FsFile(Some((tokio_file, metadata)));
+ // return the result.
+ raw_handle
+ }
+ Err(tokio_file) => {
+ // This function will return an error containing the file if
+ // some operation is in-flight.
+ resource_holder.resource =
+ StreamResource::FsFile(Some((tokio_file, metadata)));
+ return Err(resource_unavailable());
+ }
+ }
+ } else {
+ return Err(resource_unavailable());
+ }
+ }
+ _ => {
+ return Err(bad_resource_id());
+ }
+ };
+
+ if handle == handleapi::INVALID_HANDLE_VALUE {
+ return Err(Error::last_os_error().into());
+ } else if handle.is_null() {
+ return Err(custom_error("ReferenceError", "null handle"));
+ }
+ let mut original_mode: DWORD = 0;
+ if unsafe { consoleapi::GetConsoleMode(handle, &mut original_mode) }
+ == FALSE
+ {
+ return Err(Error::last_os_error().into());
+ }
+ let new_mode = if is_raw {
+ original_mode & !RAW_MODE_MASK
+ } else {
+ original_mode | RAW_MODE_MASK
+ };
+ if unsafe { consoleapi::SetConsoleMode(handle, new_mode) } == FALSE {
+ return Err(Error::last_os_error().into());
+ }
+
+ Ok(json!({}))
+ }
+ #[cfg(unix)]
+ {
+ use std::os::unix::io::AsRawFd;
+
+ let resource_holder =
+ state.resource_table.get_mut::<StreamResourceHolder>(rid);
+ if resource_holder.is_none() {
+ return Err(bad_resource_id());
+ }
+
+ if is_raw {
+ let (raw_fd, maybe_tty_mode) =
+ match &mut resource_holder.unwrap().resource {
+ StreamResource::FsFile(Some((f, ref mut metadata))) => {
+ (f.as_raw_fd(), &mut metadata.tty.mode)
+ }
+ StreamResource::FsFile(None) => return Err(resource_unavailable()),
+ _ => {
+ return Err(not_supported());
+ }
+ };
+
+ if maybe_tty_mode.is_none() {
+ // Save original mode.
+ let original_mode = termios::tcgetattr(raw_fd)?;
+ maybe_tty_mode.replace(original_mode);
+ }
+
+ let mut raw = maybe_tty_mode.clone().unwrap();
+
+ raw.input_flags &= !(termios::InputFlags::BRKINT
+ | termios::InputFlags::ICRNL
+ | termios::InputFlags::INPCK
+ | termios::InputFlags::ISTRIP
+ | termios::InputFlags::IXON);
+
+ raw.control_flags |= termios::ControlFlags::CS8;
+
+ raw.local_flags &= !(termios::LocalFlags::ECHO
+ | termios::LocalFlags::ICANON
+ | termios::LocalFlags::IEXTEN);
+ if !cbreak {
+ raw.local_flags &= !(termios::LocalFlags::ISIG);
+ }
+ raw.control_chars[termios::SpecialCharacterIndices::VMIN as usize] = 1;
+ raw.control_chars[termios::SpecialCharacterIndices::VTIME as usize] = 0;
+ termios::tcsetattr(raw_fd, termios::SetArg::TCSADRAIN, &raw)?;
+ Ok(json!({}))
+ } else {
+ // Try restore saved mode.
+ let (raw_fd, maybe_tty_mode) =
+ match &mut resource_holder.unwrap().resource {
+ StreamResource::FsFile(Some((f, ref mut metadata))) => {
+ (f.as_raw_fd(), &mut metadata.tty.mode)
+ }
+ StreamResource::FsFile(None) => {
+ return Err(resource_unavailable());
+ }
+ _ => {
+ return Err(bad_resource_id());
+ }
+ };
+
+ if let Some(mode) = maybe_tty_mode.take() {
+ termios::tcsetattr(raw_fd, termios::SetArg::TCSADRAIN, &mode)?;
+ }
+
+ Ok(json!({}))
+ }
+ }
+}
+
+#[derive(Deserialize)]
+struct IsattyArgs {
+ rid: u32,
+}
+
+fn op_isatty(
+ state: &mut OpState,
+ args: Value,
+ _zero_copy: &mut [ZeroCopyBuf],
+) -> Result<Value, AnyError> {
+ let args: IsattyArgs = serde_json::from_value(args)?;
+ let rid = args.rid;
+
+ let isatty: bool = std_file_resource(state, rid as u32, move |r| match r {
+ Ok(std_file) => {
+ #[cfg(windows)]
+ {
+ use winapi::um::consoleapi;
+
+ let handle = get_windows_handle(&std_file)?;
+ let mut test_mode: DWORD = 0;
+ // If I cannot get mode out of console, it is not a console.
+ Ok(unsafe { consoleapi::GetConsoleMode(handle, &mut test_mode) != 0 })
+ }
+ #[cfg(unix)]
+ {
+ use std::os::unix::io::AsRawFd;
+ let raw_fd = std_file.as_raw_fd();
+ Ok(unsafe { libc::isatty(raw_fd as libc::c_int) == 1 })
+ }
+ }
+ Err(StreamResource::FsFile(_)) => unreachable!(),
+ _ => Ok(false),
+ })?;
+ Ok(json!(isatty))
+}
+
+#[derive(Deserialize)]
+struct ConsoleSizeArgs {
+ rid: u32,
+}
+
+#[derive(Serialize)]
+struct ConsoleSize {
+ columns: u32,
+ rows: u32,
+}
+
+fn op_console_size(
+ state: &mut OpState,
+ args: Value,
+ _zero_copy: &mut [ZeroCopyBuf],
+) -> Result<Value, AnyError> {
+ super::check_unstable(state, "Deno.consoleSize");
+
+ let args: ConsoleSizeArgs = serde_json::from_value(args)?;
+ let rid = args.rid;
+
+ let size = std_file_resource(state, rid as u32, move |r| match r {
+ Ok(std_file) => {
+ #[cfg(windows)]
+ {
+ use std::os::windows::io::AsRawHandle;
+ let handle = std_file.as_raw_handle();
+
+ unsafe {
+ let mut bufinfo: winapi::um::wincon::CONSOLE_SCREEN_BUFFER_INFO =
+ std::mem::zeroed();
+
+ if winapi::um::wincon::GetConsoleScreenBufferInfo(
+ handle,
+ &mut bufinfo,
+ ) == 0
+ {
+ return Err(Error::last_os_error().into());
+ }
+
+ Ok(ConsoleSize {
+ columns: bufinfo.dwSize.X as u32,
+ rows: bufinfo.dwSize.Y as u32,
+ })
+ }
+ }
+
+ #[cfg(unix)]
+ {
+ use std::os::unix::io::AsRawFd;
+
+ let fd = std_file.as_raw_fd();
+ unsafe {
+ let mut size: libc::winsize = std::mem::zeroed();
+ if libc::ioctl(fd, libc::TIOCGWINSZ, &mut size as *mut _) != 0 {
+ return Err(Error::last_os_error().into());
+ }
+
+ // TODO (caspervonb) return a tuple instead
+ Ok(ConsoleSize {
+ columns: size.ws_col as u32,
+ rows: size.ws_row as u32,
+ })
+ }
+ }
+ }
+ Err(_) => Err(bad_resource_id()),
+ })?;
+
+ Ok(json!(size))
+}
diff --git a/runtime/ops/web_worker.rs b/runtime/ops/web_worker.rs
new file mode 100644
index 000000000..d88330a04
--- /dev/null
+++ b/runtime/ops/web_worker.rs
@@ -0,0 +1,37 @@
+// Copyright 2018-2020 the Deno authors. All rights reserved. MIT license.
+
+use crate::web_worker::WebWorkerHandle;
+use crate::web_worker::WorkerEvent;
+use deno_core::futures::channel::mpsc;
+use deno_core::serde_json::json;
+
+pub fn init(
+ rt: &mut deno_core::JsRuntime,
+ sender: mpsc::Sender<WorkerEvent>,
+ handle: WebWorkerHandle,
+) {
+ // Post message to host as guest worker.
+ let sender_ = sender.clone();
+ super::reg_json_sync(
+ rt,
+ "op_worker_post_message",
+ move |_state, _args, bufs| {
+ assert_eq!(bufs.len(), 1, "Invalid number of arguments");
+ let msg_buf: Box<[u8]> = (*bufs[0]).into();
+ sender_
+ .clone()
+ .try_send(WorkerEvent::Message(msg_buf))
+ .expect("Failed to post message to host");
+ Ok(json!({}))
+ },
+ );
+
+ // Notify host that guest worker closes.
+ super::reg_json_sync(rt, "op_worker_close", move |_state, _args, _bufs| {
+ // Notify parent that we're finished
+ sender.clone().close_channel();
+ // Terminate execution of current worker
+ handle.terminate();
+ Ok(json!({}))
+ });
+}
diff --git a/runtime/ops/websocket.rs b/runtime/ops/websocket.rs
new file mode 100644
index 000000000..a8c591a33
--- /dev/null
+++ b/runtime/ops/websocket.rs
@@ -0,0 +1,326 @@
+// Copyright 2018-2020 the Deno authors. All rights reserved. MIT license.
+
+use crate::permissions::Permissions;
+use core::task::Poll;
+use deno_core::error::bad_resource_id;
+use deno_core::error::type_error;
+use deno_core::error::AnyError;
+use deno_core::futures::future::poll_fn;
+use deno_core::futures::StreamExt;
+use deno_core::futures::{ready, SinkExt};
+use deno_core::serde_json::json;
+use deno_core::serde_json::Value;
+use deno_core::url;
+use deno_core::BufVec;
+use deno_core::OpState;
+use deno_core::{serde_json, ZeroCopyBuf};
+use http::{Method, Request, Uri};
+use serde::Deserialize;
+use std::borrow::Cow;
+use std::cell::RefCell;
+use std::fs::File;
+use std::io::BufReader;
+use std::rc::Rc;
+use std::sync::Arc;
+use tokio::net::TcpStream;
+use tokio_rustls::{rustls::ClientConfig, TlsConnector};
+use tokio_tungstenite::stream::Stream as StreamSwitcher;
+use tokio_tungstenite::tungstenite::Error as TungsteniteError;
+use tokio_tungstenite::tungstenite::{
+ handshake::client::Response, protocol::frame::coding::CloseCode,
+ protocol::CloseFrame, Message,
+};
+use tokio_tungstenite::{client_async, WebSocketStream};
+use webpki::DNSNameRef;
+
+#[derive(Clone)]
+struct WsCaFile(String);
+#[derive(Clone)]
+struct WsUserAgent(String);
+
+pub fn init(
+ rt: &mut deno_core::JsRuntime,
+ maybe_ca_file: Option<&str>,
+ user_agent: String,
+) {
+ {
+ let op_state = rt.op_state();
+ let mut state = op_state.borrow_mut();
+ if let Some(ca_file) = maybe_ca_file {
+ state.put::<WsCaFile>(WsCaFile(ca_file.to_string()));
+ }
+ state.put::<WsUserAgent>(WsUserAgent(user_agent));
+ }
+ super::reg_json_sync(rt, "op_ws_check_permission", op_ws_check_permission);
+ super::reg_json_async(rt, "op_ws_create", op_ws_create);
+ super::reg_json_async(rt, "op_ws_send", op_ws_send);
+ super::reg_json_async(rt, "op_ws_close", op_ws_close);
+ super::reg_json_async(rt, "op_ws_next_event", op_ws_next_event);
+}
+
+type MaybeTlsStream =
+ StreamSwitcher<TcpStream, tokio_rustls::client::TlsStream<TcpStream>>;
+
+type WsStream = WebSocketStream<MaybeTlsStream>;
+
+#[derive(Deserialize)]
+#[serde(rename_all = "camelCase")]
+struct CheckPermissionArgs {
+ url: String,
+}
+
+// This op is needed because creating a WS instance in JavaScript is a sync
+// operation and should throw error when permissions are not fullfiled,
+// but actual op that connects WS is async.
+pub fn op_ws_check_permission(
+ state: &mut OpState,
+ args: Value,
+ _zero_copy: &mut [ZeroCopyBuf],
+) -> Result<Value, AnyError> {
+ let args: CheckPermissionArgs = serde_json::from_value(args)?;
+
+ state
+ .borrow::<Permissions>()
+ .check_net_url(&url::Url::parse(&args.url)?)?;
+
+ Ok(json!({}))
+}
+
+#[derive(Deserialize)]
+#[serde(rename_all = "camelCase")]
+struct CreateArgs {
+ url: String,
+ protocols: String,
+}
+
+pub async fn op_ws_create(
+ state: Rc<RefCell<OpState>>,
+ args: Value,
+ _bufs: BufVec,
+) -> Result<Value, AnyError> {
+ let args: CreateArgs = serde_json::from_value(args)?;
+
+ {
+ let s = state.borrow();
+ s.borrow::<Permissions>()
+ .check_net_url(&url::Url::parse(&args.url)?)
+ .expect(
+ "Permission check should have been done in op_ws_check_permission",
+ );
+ }
+
+ let maybe_ca_file = state.borrow().try_borrow::<WsCaFile>().cloned();
+ let user_agent = state.borrow().borrow::<WsUserAgent>().0.clone();
+ let uri: Uri = args.url.parse()?;
+ let mut request = Request::builder().method(Method::GET).uri(&uri);
+
+ request = request.header("User-Agent", user_agent);
+
+ if !args.protocols.is_empty() {
+ request = request.header("Sec-WebSocket-Protocol", args.protocols);
+ }
+
+ let request = request.body(())?;
+ let domain = &uri.host().unwrap().to_string();
+ let port = &uri.port_u16().unwrap_or(match uri.scheme_str() {
+ Some("wss") => 443,
+ Some("ws") => 80,
+ _ => unreachable!(),
+ });
+ let addr = format!("{}:{}", domain, port);
+ let try_socket = TcpStream::connect(addr).await;
+ let tcp_socket = match try_socket.map_err(TungsteniteError::Io) {
+ Ok(socket) => socket,
+ Err(_) => return Ok(json!({"success": false})),
+ };
+
+ let socket: MaybeTlsStream = match uri.scheme_str() {
+ Some("ws") => StreamSwitcher::Plain(tcp_socket),
+ Some("wss") => {
+ let mut config = ClientConfig::new();
+ config
+ .root_store
+ .add_server_trust_anchors(&webpki_roots::TLS_SERVER_ROOTS);
+
+ if let Some(ws_ca_file) = maybe_ca_file {
+ let key_file = File::open(ws_ca_file.0)?;
+ let reader = &mut BufReader::new(key_file);
+ config.root_store.add_pem_file(reader).unwrap();
+ }
+
+ let tls_connector = TlsConnector::from(Arc::new(config));
+ let dnsname =
+ DNSNameRef::try_from_ascii_str(&domain).expect("Invalid DNS lookup");
+ let tls_socket = tls_connector.connect(dnsname, tcp_socket).await?;
+ StreamSwitcher::Tls(tls_socket)
+ }
+ _ => unreachable!(),
+ };
+
+ let (stream, response): (WsStream, Response) =
+ client_async(request, socket).await.map_err(|err| {
+ type_error(format!(
+ "failed to connect to WebSocket: {}",
+ err.to_string()
+ ))
+ })?;
+
+ let mut state = state.borrow_mut();
+ let rid = state
+ .resource_table
+ .add("webSocketStream", Box::new(stream));
+
+ let protocol = match response.headers().get("Sec-WebSocket-Protocol") {
+ Some(header) => header.to_str().unwrap(),
+ None => "",
+ };
+ let extensions = response
+ .headers()
+ .get_all("Sec-WebSocket-Extensions")
+ .iter()
+ .map(|header| header.to_str().unwrap())
+ .collect::<String>();
+ Ok(json!({
+ "success": true,
+ "rid": rid,
+ "protocol": protocol,
+ "extensions": extensions
+ }))
+}
+
+#[derive(Deserialize)]
+#[serde(rename_all = "camelCase")]
+struct SendArgs {
+ rid: u32,
+ text: Option<String>,
+}
+
+pub async fn op_ws_send(
+ state: Rc<RefCell<OpState>>,
+ args: Value,
+ bufs: BufVec,
+) -> Result<Value, AnyError> {
+ let args: SendArgs = serde_json::from_value(args)?;
+
+ let mut maybe_msg = Some(match args.text {
+ Some(text) => Message::Text(text),
+ None => Message::Binary(bufs[0].to_vec()),
+ });
+ let rid = args.rid;
+
+ poll_fn(move |cx| {
+ let mut state = state.borrow_mut();
+ let stream = state
+ .resource_table
+ .get_mut::<WsStream>(rid)
+ .ok_or_else(bad_resource_id)?;
+
+ // TODO(ry) Handle errors below instead of unwrap.
+ // Need to map `TungsteniteError` to `AnyError`.
+ ready!(stream.poll_ready_unpin(cx)).unwrap();
+ if let Some(msg) = maybe_msg.take() {
+ stream.start_send_unpin(msg).unwrap();
+ }
+ ready!(stream.poll_flush_unpin(cx)).unwrap();
+
+ Poll::Ready(Ok(json!({})))
+ })
+ .await
+}
+
+#[derive(Deserialize)]
+#[serde(rename_all = "camelCase")]
+struct CloseArgs {
+ rid: u32,
+ code: Option<u16>,
+ reason: Option<String>,
+}
+
+pub async fn op_ws_close(
+ state: Rc<RefCell<OpState>>,
+ args: Value,
+ _bufs: BufVec,
+) -> Result<Value, AnyError> {
+ let args: CloseArgs = serde_json::from_value(args)?;
+ let rid = args.rid;
+ let mut maybe_msg = Some(Message::Close(args.code.map(|c| CloseFrame {
+ code: CloseCode::from(c),
+ reason: match args.reason {
+ Some(reason) => Cow::from(reason),
+ None => Default::default(),
+ },
+ })));
+
+ poll_fn(move |cx| {
+ let mut state = state.borrow_mut();
+ let stream = state
+ .resource_table
+ .get_mut::<WsStream>(rid)
+ .ok_or_else(bad_resource_id)?;
+
+ // TODO(ry) Handle errors below instead of unwrap.
+ // Need to map `TungsteniteError` to `AnyError`.
+ ready!(stream.poll_ready_unpin(cx)).unwrap();
+ if let Some(msg) = maybe_msg.take() {
+ stream.start_send_unpin(msg).unwrap();
+ }
+ ready!(stream.poll_flush_unpin(cx)).unwrap();
+ ready!(stream.poll_close_unpin(cx)).unwrap();
+
+ Poll::Ready(Ok(json!({})))
+ })
+ .await
+}
+
+#[derive(Deserialize)]
+#[serde(rename_all = "camelCase")]
+struct NextEventArgs {
+ rid: u32,
+}
+
+pub async fn op_ws_next_event(
+ state: Rc<RefCell<OpState>>,
+ args: Value,
+ _bufs: BufVec,
+) -> Result<Value, AnyError> {
+ let args: NextEventArgs = serde_json::from_value(args)?;
+ poll_fn(move |cx| {
+ let mut state = state.borrow_mut();
+ let stream = state
+ .resource_table
+ .get_mut::<WsStream>(args.rid)
+ .ok_or_else(bad_resource_id)?;
+ stream
+ .poll_next_unpin(cx)
+ .map(|val| {
+ match val {
+ Some(Ok(Message::Text(text))) => json!({
+ "type": "string",
+ "data": text
+ }),
+ Some(Ok(Message::Binary(data))) => {
+ // TODO(ry): don't use json to send binary data.
+ json!({
+ "type": "binary",
+ "data": data
+ })
+ }
+ Some(Ok(Message::Close(Some(frame)))) => json!({
+ "type": "close",
+ "code": u16::from(frame.code),
+ "reason": frame.reason.as_ref()
+ }),
+ Some(Ok(Message::Close(None))) => json!({ "type": "close" }),
+ Some(Ok(Message::Ping(_))) => json!({"type": "ping"}),
+ Some(Ok(Message::Pong(_))) => json!({"type": "pong"}),
+ Some(Err(_)) => json!({"type": "error"}),
+ None => {
+ state.resource_table.close(args.rid).unwrap();
+ json!({"type": "closed"})
+ }
+ }
+ })
+ .map(Ok)
+ })
+ .await
+}
diff --git a/runtime/ops/worker_host.rs b/runtime/ops/worker_host.rs
new file mode 100644
index 000000000..871e4b9fe
--- /dev/null
+++ b/runtime/ops/worker_host.rs
@@ -0,0 +1,318 @@
+// Copyright 2018-2020 the Deno authors. All rights reserved. MIT license.
+
+use crate::permissions::Permissions;
+use crate::web_worker::run_web_worker;
+use crate::web_worker::WebWorker;
+use crate::web_worker::WebWorkerHandle;
+use crate::web_worker::WorkerEvent;
+use deno_core::error::generic_error;
+use deno_core::error::AnyError;
+use deno_core::error::JsError;
+use deno_core::futures::channel::mpsc;
+use deno_core::serde_json;
+use deno_core::serde_json::json;
+use deno_core::serde_json::Value;
+use deno_core::BufVec;
+use deno_core::ModuleSpecifier;
+use deno_core::OpState;
+use deno_core::ZeroCopyBuf;
+use serde::Deserialize;
+use std::cell::RefCell;
+use std::collections::HashMap;
+use std::convert::From;
+use std::rc::Rc;
+use std::sync::Arc;
+use std::thread::JoinHandle;
+
+pub struct CreateWebWorkerArgs {
+ pub name: String,
+ pub worker_id: u32,
+ pub permissions: Permissions,
+ pub main_module: ModuleSpecifier,
+ pub use_deno_namespace: bool,
+}
+
+pub type CreateWebWorkerCb =
+ dyn Fn(CreateWebWorkerArgs) -> WebWorker + Sync + Send;
+
+/// A holder for callback that is used to create a new
+/// WebWorker. It's a struct instead of a type alias
+/// because `GothamState` used in `OpState` overrides
+/// value if type alises have the same underlying type
+#[derive(Clone)]
+pub struct CreateWebWorkerCbHolder(Arc<CreateWebWorkerCb>);
+
+#[derive(Deserialize)]
+struct HostUnhandledErrorArgs {
+ message: String,
+}
+
+pub fn init(
+ rt: &mut deno_core::JsRuntime,
+ sender: Option<mpsc::Sender<WorkerEvent>>,
+ create_web_worker_cb: Arc<CreateWebWorkerCb>,
+) {
+ {
+ let op_state = rt.op_state();
+ let mut state = op_state.borrow_mut();
+ state.put::<WorkersTable>(WorkersTable::default());
+ state.put::<WorkerId>(WorkerId::default());
+
+ let create_module_loader = CreateWebWorkerCbHolder(create_web_worker_cb);
+ state.put::<CreateWebWorkerCbHolder>(create_module_loader);
+ }
+ super::reg_json_sync(rt, "op_create_worker", op_create_worker);
+ super::reg_json_sync(
+ rt,
+ "op_host_terminate_worker",
+ op_host_terminate_worker,
+ );
+ super::reg_json_sync(rt, "op_host_post_message", op_host_post_message);
+ super::reg_json_async(rt, "op_host_get_message", op_host_get_message);
+ super::reg_json_sync(
+ rt,
+ "op_host_unhandled_error",
+ move |_state, args, _zero_copy| {
+ if let Some(mut sender) = sender.clone() {
+ let args: HostUnhandledErrorArgs = serde_json::from_value(args)?;
+ sender
+ .try_send(WorkerEvent::Error(generic_error(args.message)))
+ .expect("Failed to propagate error event to parent worker");
+ Ok(json!(true))
+ } else {
+ Err(generic_error("Cannot be called from main worker."))
+ }
+ },
+ );
+}
+
+pub struct WorkerThread {
+ join_handle: JoinHandle<Result<(), AnyError>>,
+ worker_handle: WebWorkerHandle,
+}
+
+pub type WorkersTable = HashMap<u32, WorkerThread>;
+pub type WorkerId = u32;
+
+#[derive(Deserialize)]
+#[serde(rename_all = "camelCase")]
+struct CreateWorkerArgs {
+ name: Option<String>,
+ specifier: String,
+ has_source_code: bool,
+ source_code: String,
+ use_deno_namespace: bool,
+}
+
+/// Create worker as the host
+fn op_create_worker(
+ state: &mut OpState,
+ args: Value,
+ _data: &mut [ZeroCopyBuf],
+) -> Result<Value, AnyError> {
+ let args: CreateWorkerArgs = serde_json::from_value(args)?;
+
+ let specifier = args.specifier.clone();
+ let maybe_source_code = if args.has_source_code {
+ Some(args.source_code.clone())
+ } else {
+ None
+ };
+ let args_name = args.name;
+ let use_deno_namespace = args.use_deno_namespace;
+ if use_deno_namespace {
+ super::check_unstable(state, "Worker.deno");
+ }
+ let permissions = state.borrow::<Permissions>().clone();
+ let worker_id = state.take::<WorkerId>();
+ let create_module_loader = state.take::<CreateWebWorkerCbHolder>();
+ state.put::<CreateWebWorkerCbHolder>(create_module_loader.clone());
+ state.put::<WorkerId>(worker_id + 1);
+
+ let module_specifier = ModuleSpecifier::resolve_url(&specifier)?;
+ let worker_name = args_name.unwrap_or_else(|| "".to_string());
+
+ let (handle_sender, handle_receiver) =
+ std::sync::mpsc::sync_channel::<Result<WebWorkerHandle, AnyError>>(1);
+
+ // Setup new thread
+ let thread_builder =
+ std::thread::Builder::new().name(format!("deno-worker-{}", worker_id));
+
+ // Spawn it
+ let join_handle = thread_builder.spawn(move || {
+ // Any error inside this block is terminal:
+ // - JS worker is useless - meaning it throws an exception and can't do anything else,
+ // all action done upon it should be noops
+ // - newly spawned thread exits
+
+ let worker = (create_module_loader.0)(CreateWebWorkerArgs {
+ name: worker_name,
+ worker_id,
+ permissions,
+ main_module: module_specifier.clone(),
+ use_deno_namespace,
+ });
+
+ // Send thread safe handle to newly created worker to host thread
+ handle_sender.send(Ok(worker.thread_safe_handle())).unwrap();
+ drop(handle_sender);
+
+ // At this point the only method of communication with host
+ // is using `worker.internal_channels`.
+ //
+ // Host can already push messages and interact with worker.
+ run_web_worker(worker, module_specifier, maybe_source_code)
+ })?;
+
+ let worker_handle = handle_receiver.recv().unwrap()?;
+
+ let worker_thread = WorkerThread {
+ join_handle,
+ worker_handle,
+ };
+
+ // At this point all interactions with worker happen using thread
+ // safe handler returned from previous function calls
+ state
+ .borrow_mut::<WorkersTable>()
+ .insert(worker_id, worker_thread);
+
+ Ok(json!({ "id": worker_id }))
+}
+
+#[derive(Deserialize)]
+struct WorkerArgs {
+ id: i32,
+}
+
+fn op_host_terminate_worker(
+ state: &mut OpState,
+ args: Value,
+ _data: &mut [ZeroCopyBuf],
+) -> Result<Value, AnyError> {
+ let args: WorkerArgs = serde_json::from_value(args)?;
+ let id = args.id as u32;
+ let worker_thread = state
+ .borrow_mut::<WorkersTable>()
+ .remove(&id)
+ .expect("No worker handle found");
+ worker_thread.worker_handle.terminate();
+ worker_thread
+ .join_handle
+ .join()
+ .expect("Panic in worker thread")
+ .expect("Panic in worker event loop");
+ Ok(json!({}))
+}
+
+fn serialize_worker_event(event: WorkerEvent) -> Value {
+ match event {
+ WorkerEvent::Message(buf) => json!({ "type": "msg", "data": buf }),
+ WorkerEvent::TerminalError(error) => match error.downcast::<JsError>() {
+ Ok(js_error) => json!({
+ "type": "terminalError",
+ "error": {
+ "message": js_error.message,
+ "fileName": js_error.script_resource_name,
+ "lineNumber": js_error.line_number,
+ "columnNumber": js_error.start_column,
+ }
+ }),
+ Err(error) => json!({
+ "type": "terminalError",
+ "error": {
+ "message": error.to_string(),
+ }
+ }),
+ },
+ WorkerEvent::Error(error) => match error.downcast::<JsError>() {
+ Ok(js_error) => json!({
+ "type": "error",
+ "error": {
+ "message": js_error.message,
+ "fileName": js_error.script_resource_name,
+ "lineNumber": js_error.line_number,
+ "columnNumber": js_error.start_column,
+ }
+ }),
+ Err(error) => json!({
+ "type": "error",
+ "error": {
+ "message": error.to_string(),
+ }
+ }),
+ },
+ }
+}
+
+/// Try to remove worker from workers table - NOTE: `Worker.terminate()`
+/// might have been called already meaning that we won't find worker in
+/// table - in that case ignore.
+fn try_remove_and_close(state: Rc<RefCell<OpState>>, id: u32) {
+ let mut s = state.borrow_mut();
+ let workers = s.borrow_mut::<WorkersTable>();
+ if let Some(mut worker_thread) = workers.remove(&id) {
+ worker_thread.worker_handle.sender.close_channel();
+ worker_thread
+ .join_handle
+ .join()
+ .expect("Worker thread panicked")
+ .expect("Panic in worker event loop");
+ }
+}
+
+/// Get message from guest worker as host
+async fn op_host_get_message(
+ state: Rc<RefCell<OpState>>,
+ args: Value,
+ _zero_copy: BufVec,
+) -> Result<Value, AnyError> {
+ let args: WorkerArgs = serde_json::from_value(args)?;
+ let id = args.id as u32;
+
+ let worker_handle = {
+ let s = state.borrow();
+ let workers_table = s.borrow::<WorkersTable>();
+ let maybe_handle = workers_table.get(&id);
+ if let Some(handle) = maybe_handle {
+ handle.worker_handle.clone()
+ } else {
+ // If handle was not found it means worker has already shutdown
+ return Ok(json!({ "type": "close" }));
+ }
+ };
+
+ let maybe_event = worker_handle.get_event().await?;
+ if let Some(event) = maybe_event {
+ // Terminal error means that worker should be removed from worker table.
+ if let WorkerEvent::TerminalError(_) = &event {
+ try_remove_and_close(state, id);
+ }
+ return Ok(serialize_worker_event(event));
+ }
+
+ // If there was no event from worker it means it has already been closed.
+ try_remove_and_close(state, id);
+ Ok(json!({ "type": "close" }))
+}
+
+/// Post message to guest worker as host
+fn op_host_post_message(
+ state: &mut OpState,
+ args: Value,
+ data: &mut [ZeroCopyBuf],
+) -> Result<Value, AnyError> {
+ assert_eq!(data.len(), 1, "Invalid number of arguments");
+ let args: WorkerArgs = serde_json::from_value(args)?;
+ let id = args.id as u32;
+ let msg = Vec::from(&*data[0]).into_boxed_slice();
+
+ debug!("post message to worker {}", id);
+ let worker_thread = state
+ .borrow::<WorkersTable>()
+ .get(&id)
+ .expect("No worker handle found");
+ worker_thread.worker_handle.post_message(msg)?;
+ Ok(json!({}))
+}