diff options
Diffstat (limited to 'runtime/ops')
-rw-r--r-- | runtime/ops/crypto.rs | 14 | ||||
-rw-r--r-- | runtime/ops/dispatch_minimal.rs | 205 | ||||
-rw-r--r-- | runtime/ops/fetch.rs | 25 | ||||
-rw-r--r-- | runtime/ops/fs.rs | 1702 | ||||
-rw-r--r-- | runtime/ops/fs_events.rs | 133 | ||||
-rw-r--r-- | runtime/ops/io.rs | 473 | ||||
-rw-r--r-- | runtime/ops/mod.rs | 89 | ||||
-rw-r--r-- | runtime/ops/net.rs | 566 | ||||
-rw-r--r-- | runtime/ops/net_unix.rs | 151 | ||||
-rw-r--r-- | runtime/ops/os.rs | 192 | ||||
-rw-r--r-- | runtime/ops/permissions.rs | 103 | ||||
-rw-r--r-- | runtime/ops/plugin.rs | 156 | ||||
-rw-r--r-- | runtime/ops/process.rs | 290 | ||||
-rw-r--r-- | runtime/ops/runtime.rs | 118 | ||||
-rw-r--r-- | runtime/ops/signal.rs | 142 | ||||
-rw-r--r-- | runtime/ops/timers.rs | 193 | ||||
-rw-r--r-- | runtime/ops/tls.rs | 431 | ||||
-rw-r--r-- | runtime/ops/tty.rs | 334 | ||||
-rw-r--r-- | runtime/ops/web_worker.rs | 37 | ||||
-rw-r--r-- | runtime/ops/websocket.rs | 326 | ||||
-rw-r--r-- | runtime/ops/worker_host.rs | 318 |
21 files changed, 5998 insertions, 0 deletions
diff --git a/runtime/ops/crypto.rs b/runtime/ops/crypto.rs new file mode 100644 index 000000000..a73843a33 --- /dev/null +++ b/runtime/ops/crypto.rs @@ -0,0 +1,14 @@ +// Copyright 2018-2020 the Deno authors. All rights reserved. MIT license. +use deno_crypto::op_get_random_values; +use deno_crypto::rand::rngs::StdRng; +use deno_crypto::rand::SeedableRng; + +pub fn init(rt: &mut deno_core::JsRuntime, maybe_seed: Option<u64>) { + if let Some(seed) = maybe_seed { + let rng = StdRng::seed_from_u64(seed); + let op_state = rt.op_state(); + let mut state = op_state.borrow_mut(); + state.put::<StdRng>(rng); + } + super::reg_json_sync(rt, "op_get_random_values", op_get_random_values); +} diff --git a/runtime/ops/dispatch_minimal.rs b/runtime/ops/dispatch_minimal.rs new file mode 100644 index 000000000..ae8fa819d --- /dev/null +++ b/runtime/ops/dispatch_minimal.rs @@ -0,0 +1,205 @@ +// Copyright 2018-2020 the Deno authors. All rights reserved. MIT license. + +use deno_core::error::AnyError; +use deno_core::futures::future::FutureExt; +use deno_core::BufVec; +use deno_core::Op; +use deno_core::OpFn; +use deno_core::OpState; +use std::cell::RefCell; +use std::future::Future; +use std::iter::repeat; +use std::mem::size_of_val; +use std::pin::Pin; +use std::rc::Rc; +use std::slice; + +pub enum MinimalOp { + Sync(Result<i32, AnyError>), + Async(Pin<Box<dyn Future<Output = Result<i32, AnyError>>>>), +} + +#[derive(Copy, Clone, Debug, PartialEq)] +// This corresponds to RecordMinimal on the TS side. +pub struct Record { + pub promise_id: i32, + pub arg: i32, + pub result: i32, +} + +impl Into<Box<[u8]>> for Record { + fn into(self) -> Box<[u8]> { + let vec = vec![self.promise_id, self.arg, self.result]; + let buf32 = vec.into_boxed_slice(); + let ptr = Box::into_raw(buf32) as *mut [u8; 3 * 4]; + unsafe { Box::from_raw(ptr) } + } +} + +pub struct ErrorRecord { + pub promise_id: i32, + pub arg: i32, + pub error_len: i32, + pub error_class: &'static [u8], + pub error_message: Vec<u8>, +} + +impl Into<Box<[u8]>> for ErrorRecord { + fn into(self) -> Box<[u8]> { + let Self { + promise_id, + arg, + error_len, + error_class, + error_message, + .. + } = self; + let header_i32 = [promise_id, arg, error_len]; + let header_u8 = unsafe { + slice::from_raw_parts( + &header_i32 as *const _ as *const u8, + size_of_val(&header_i32), + ) + }; + let padded_len = + (header_u8.len() + error_class.len() + error_message.len() + 3usize) + & !3usize; + header_u8 + .iter() + .cloned() + .chain(error_class.iter().cloned()) + .chain(error_message.into_iter()) + .chain(repeat(b' ')) + .take(padded_len) + .collect() + } +} + +#[test] +fn test_error_record() { + let expected = vec![ + 1, 0, 0, 0, 255, 255, 255, 255, 11, 0, 0, 0, 66, 97, 100, 82, 101, 115, + 111, 117, 114, 99, 101, 69, 114, 114, 111, 114, + ]; + let err_record = ErrorRecord { + promise_id: 1, + arg: -1, + error_len: 11, + error_class: b"BadResource", + error_message: b"Error".to_vec(), + }; + let buf: Box<[u8]> = err_record.into(); + assert_eq!(buf, expected.into_boxed_slice()); +} + +pub fn parse_min_record(bytes: &[u8]) -> Option<Record> { + if bytes.len() % std::mem::size_of::<i32>() != 0 { + return None; + } + let p = bytes.as_ptr(); + #[allow(clippy::cast_ptr_alignment)] + let p32 = p as *const i32; + let s = unsafe { std::slice::from_raw_parts(p32, bytes.len() / 4) }; + + if s.len() != 3 { + return None; + } + let ptr = s.as_ptr(); + let ints = unsafe { std::slice::from_raw_parts(ptr, 3) }; + Some(Record { + promise_id: ints[0], + arg: ints[1], + result: ints[2], + }) +} + +#[test] +fn test_parse_min_record() { + let buf = vec![1, 0, 0, 0, 3, 0, 0, 0, 4, 0, 0, 0]; + assert_eq!( + parse_min_record(&buf), + Some(Record { + promise_id: 1, + arg: 3, + result: 4 + }) + ); + + let buf = vec![]; + assert_eq!(parse_min_record(&buf), None); + + let buf = vec![5]; + assert_eq!(parse_min_record(&buf), None); +} + +pub fn minimal_op<F>(op_fn: F) -> Box<OpFn> +where + F: Fn(Rc<RefCell<OpState>>, bool, i32, BufVec) -> MinimalOp + 'static, +{ + Box::new(move |state: Rc<RefCell<OpState>>, bufs: BufVec| { + let mut bufs_iter = bufs.into_iter(); + let record_buf = bufs_iter.next().expect("Expected record at position 0"); + let zero_copy = bufs_iter.collect::<BufVec>(); + + let mut record = match parse_min_record(&record_buf) { + Some(r) => r, + None => { + let error_class = b"TypeError"; + let error_message = b"Unparsable control buffer"; + let error_record = ErrorRecord { + promise_id: 0, + arg: -1, + error_len: error_class.len() as i32, + error_class, + error_message: error_message[..].to_owned(), + }; + return Op::Sync(error_record.into()); + } + }; + let is_sync = record.promise_id == 0; + let rid = record.arg; + let min_op = op_fn(state.clone(), is_sync, rid, zero_copy); + + match min_op { + MinimalOp::Sync(sync_result) => Op::Sync(match sync_result { + Ok(r) => { + record.result = r; + record.into() + } + Err(err) => { + let error_class = (state.borrow().get_error_class_fn)(&err); + let error_record = ErrorRecord { + promise_id: record.promise_id, + arg: -1, + error_len: error_class.len() as i32, + error_class: error_class.as_bytes(), + error_message: err.to_string().as_bytes().to_owned(), + }; + error_record.into() + } + }), + MinimalOp::Async(min_fut) => { + let fut = async move { + match min_fut.await { + Ok(r) => { + record.result = r; + record.into() + } + Err(err) => { + let error_class = (state.borrow().get_error_class_fn)(&err); + let error_record = ErrorRecord { + promise_id: record.promise_id, + arg: -1, + error_len: error_class.len() as i32, + error_class: error_class.as_bytes(), + error_message: err.to_string().as_bytes().to_owned(), + }; + error_record.into() + } + } + }; + Op::Async(fut.boxed_local()) + } + } + }) +} diff --git a/runtime/ops/fetch.rs b/runtime/ops/fetch.rs new file mode 100644 index 000000000..0ef99f73d --- /dev/null +++ b/runtime/ops/fetch.rs @@ -0,0 +1,25 @@ +// Copyright 2018-2020 the Deno authors. All rights reserved. MIT license. +use crate::http_util; +use crate::permissions::Permissions; +use deno_fetch::reqwest; + +pub fn init( + rt: &mut deno_core::JsRuntime, + user_agent: String, + maybe_ca_file: Option<&str>, +) { + { + let op_state = rt.op_state(); + let mut state = op_state.borrow_mut(); + state.put::<reqwest::Client>({ + http_util::create_http_client(user_agent, maybe_ca_file).unwrap() + }); + } + super::reg_json_async(rt, "op_fetch", deno_fetch::op_fetch::<Permissions>); + super::reg_json_async(rt, "op_fetch_read", deno_fetch::op_fetch_read); + super::reg_json_sync( + rt, + "op_create_http_client", + deno_fetch::op_create_http_client::<Permissions>, + ); +} diff --git a/runtime/ops/fs.rs b/runtime/ops/fs.rs new file mode 100644 index 000000000..865c5bcca --- /dev/null +++ b/runtime/ops/fs.rs @@ -0,0 +1,1702 @@ +// Copyright 2018-2020 the Deno authors. All rights reserved. MIT license. +// Some deserializer fields are only used on Unix and Windows build fails without it +use super::io::std_file_resource; +use super::io::{FileMetadata, StreamResource, StreamResourceHolder}; +use crate::fs_util::canonicalize_path; +use crate::permissions::Permissions; +use deno_core::error::custom_error; +use deno_core::error::type_error; +use deno_core::error::AnyError; +use deno_core::serde_json; +use deno_core::serde_json::json; +use deno_core::serde_json::Value; +use deno_core::BufVec; +use deno_core::OpState; +use deno_core::ZeroCopyBuf; +use deno_crypto::rand::thread_rng; +use deno_crypto::rand::Rng; +use serde::Deserialize; +use std::cell::RefCell; +use std::convert::From; +use std::env::{current_dir, set_current_dir, temp_dir}; +use std::io; +use std::io::{Seek, SeekFrom}; +use std::path::{Path, PathBuf}; +use std::rc::Rc; +use std::time::SystemTime; +use std::time::UNIX_EPOCH; + +#[cfg(not(unix))] +use deno_core::error::generic_error; +#[cfg(not(unix))] +use deno_core::error::not_supported; + +pub fn init(rt: &mut deno_core::JsRuntime) { + super::reg_json_sync(rt, "op_open_sync", op_open_sync); + super::reg_json_async(rt, "op_open_async", op_open_async); + + super::reg_json_sync(rt, "op_seek_sync", op_seek_sync); + super::reg_json_async(rt, "op_seek_async", op_seek_async); + + super::reg_json_sync(rt, "op_fdatasync_sync", op_fdatasync_sync); + super::reg_json_async(rt, "op_fdatasync_async", op_fdatasync_async); + + super::reg_json_sync(rt, "op_fsync_sync", op_fsync_sync); + super::reg_json_async(rt, "op_fsync_async", op_fsync_async); + + super::reg_json_sync(rt, "op_fstat_sync", op_fstat_sync); + super::reg_json_async(rt, "op_fstat_async", op_fstat_async); + + super::reg_json_sync(rt, "op_umask", op_umask); + super::reg_json_sync(rt, "op_chdir", op_chdir); + + super::reg_json_sync(rt, "op_mkdir_sync", op_mkdir_sync); + super::reg_json_async(rt, "op_mkdir_async", op_mkdir_async); + + super::reg_json_sync(rt, "op_chmod_sync", op_chmod_sync); + super::reg_json_async(rt, "op_chmod_async", op_chmod_async); + + super::reg_json_sync(rt, "op_chown_sync", op_chown_sync); + super::reg_json_async(rt, "op_chown_async", op_chown_async); + + super::reg_json_sync(rt, "op_remove_sync", op_remove_sync); + super::reg_json_async(rt, "op_remove_async", op_remove_async); + + super::reg_json_sync(rt, "op_copy_file_sync", op_copy_file_sync); + super::reg_json_async(rt, "op_copy_file_async", op_copy_file_async); + + super::reg_json_sync(rt, "op_stat_sync", op_stat_sync); + super::reg_json_async(rt, "op_stat_async", op_stat_async); + + super::reg_json_sync(rt, "op_realpath_sync", op_realpath_sync); + super::reg_json_async(rt, "op_realpath_async", op_realpath_async); + + super::reg_json_sync(rt, "op_read_dir_sync", op_read_dir_sync); + super::reg_json_async(rt, "op_read_dir_async", op_read_dir_async); + + super::reg_json_sync(rt, "op_rename_sync", op_rename_sync); + super::reg_json_async(rt, "op_rename_async", op_rename_async); + + super::reg_json_sync(rt, "op_link_sync", op_link_sync); + super::reg_json_async(rt, "op_link_async", op_link_async); + + super::reg_json_sync(rt, "op_symlink_sync", op_symlink_sync); + super::reg_json_async(rt, "op_symlink_async", op_symlink_async); + + super::reg_json_sync(rt, "op_read_link_sync", op_read_link_sync); + super::reg_json_async(rt, "op_read_link_async", op_read_link_async); + + super::reg_json_sync(rt, "op_ftruncate_sync", op_ftruncate_sync); + super::reg_json_async(rt, "op_ftruncate_async", op_ftruncate_async); + + super::reg_json_sync(rt, "op_truncate_sync", op_truncate_sync); + super::reg_json_async(rt, "op_truncate_async", op_truncate_async); + + super::reg_json_sync(rt, "op_make_temp_dir_sync", op_make_temp_dir_sync); + super::reg_json_async(rt, "op_make_temp_dir_async", op_make_temp_dir_async); + + super::reg_json_sync(rt, "op_make_temp_file_sync", op_make_temp_file_sync); + super::reg_json_async(rt, "op_make_temp_file_async", op_make_temp_file_async); + + super::reg_json_sync(rt, "op_cwd", op_cwd); + + super::reg_json_sync(rt, "op_futime_sync", op_futime_sync); + super::reg_json_async(rt, "op_futime_async", op_futime_async); + + super::reg_json_sync(rt, "op_utime_sync", op_utime_sync); + super::reg_json_async(rt, "op_utime_async", op_utime_async); +} + +fn into_string(s: std::ffi::OsString) -> Result<String, AnyError> { + s.into_string().map_err(|s| { + let message = format!("File name or path {:?} is not valid UTF-8", s); + custom_error("InvalidData", message) + }) +} + +#[derive(Deserialize)] +#[serde(rename_all = "camelCase")] +struct OpenArgs { + path: String, + mode: Option<u32>, + options: OpenOptions, +} + +#[derive(Deserialize, Default, Debug)] +#[serde(rename_all = "camelCase")] +#[serde(default)] +struct OpenOptions { + read: bool, + write: bool, + create: bool, + truncate: bool, + append: bool, + create_new: bool, +} + +fn open_helper( + state: &mut OpState, + args: Value, +) -> Result<(PathBuf, std::fs::OpenOptions), AnyError> { + let args: OpenArgs = serde_json::from_value(args)?; + let path = Path::new(&args.path).to_path_buf(); + + let mut open_options = std::fs::OpenOptions::new(); + + if let Some(mode) = args.mode { + // mode only used if creating the file on Unix + // if not specified, defaults to 0o666 + #[cfg(unix)] + { + use std::os::unix::fs::OpenOptionsExt; + open_options.mode(mode & 0o777); + } + #[cfg(not(unix))] + let _ = mode; // avoid unused warning + } + + let permissions = state.borrow::<Permissions>(); + let options = args.options; + + if options.read { + permissions.check_read(&path)?; + } + + if options.write || options.append { + permissions.check_write(&path)?; + } + + open_options + .read(options.read) + .create(options.create) + .write(options.write) + .truncate(options.truncate) + .append(options.append) + .create_new(options.create_new); + + Ok((path, open_options)) +} + +fn op_open_sync( + state: &mut OpState, + args: Value, + _zero_copy: &mut [ZeroCopyBuf], +) -> Result<Value, AnyError> { + let (path, open_options) = open_helper(state, args)?; + let std_file = open_options.open(path)?; + let tokio_file = tokio::fs::File::from_std(std_file); + let rid = state.resource_table.add( + "fsFile", + Box::new(StreamResourceHolder::new(StreamResource::FsFile(Some(( + tokio_file, + FileMetadata::default(), + ))))), + ); + Ok(json!(rid)) +} + +async fn op_open_async( + state: Rc<RefCell<OpState>>, + args: Value, + _zero_copy: BufVec, +) -> Result<Value, AnyError> { + let (path, open_options) = open_helper(&mut state.borrow_mut(), args)?; + let tokio_file = tokio::fs::OpenOptions::from(open_options) + .open(path) + .await?; + let rid = state.borrow_mut().resource_table.add( + "fsFile", + Box::new(StreamResourceHolder::new(StreamResource::FsFile(Some(( + tokio_file, + FileMetadata::default(), + ))))), + ); + Ok(json!(rid)) +} + +#[derive(Deserialize)] +#[serde(rename_all = "camelCase")] +struct SeekArgs { + rid: i32, + offset: i64, + whence: i32, +} + +fn seek_helper(args: Value) -> Result<(u32, SeekFrom), AnyError> { + let args: SeekArgs = serde_json::from_value(args)?; + let rid = args.rid as u32; + let offset = args.offset; + let whence = args.whence as u32; + // Translate seek mode to Rust repr. + let seek_from = match whence { + 0 => SeekFrom::Start(offset as u64), + 1 => SeekFrom::Current(offset), + 2 => SeekFrom::End(offset), + _ => { + return Err(type_error(format!("Invalid seek mode: {}", whence))); + } + }; + + Ok((rid, seek_from)) +} + +fn op_seek_sync( + state: &mut OpState, + args: Value, + _zero_copy: &mut [ZeroCopyBuf], +) -> Result<Value, AnyError> { + let (rid, seek_from) = seek_helper(args)?; + let pos = std_file_resource(state, rid, |r| match r { + Ok(std_file) => std_file.seek(seek_from).map_err(AnyError::from), + Err(_) => Err(type_error( + "cannot seek on this type of resource".to_string(), + )), + })?; + Ok(json!(pos)) +} + +async fn op_seek_async( + state: Rc<RefCell<OpState>>, + args: Value, + _zero_copy: BufVec, +) -> Result<Value, AnyError> { + let (rid, seek_from) = seek_helper(args)?; + // TODO(ry) This is a fake async op. We need to use poll_fn, + // tokio::fs::File::start_seek and tokio::fs::File::poll_complete + let pos = std_file_resource(&mut state.borrow_mut(), rid, |r| match r { + Ok(std_file) => std_file.seek(seek_from).map_err(AnyError::from), + Err(_) => Err(type_error( + "cannot seek on this type of resource".to_string(), + )), + })?; + Ok(json!(pos)) +} + +#[derive(Deserialize)] +#[serde(rename_all = "camelCase")] +struct FdatasyncArgs { + rid: i32, +} + +fn op_fdatasync_sync( + state: &mut OpState, + args: Value, + _zero_copy: &mut [ZeroCopyBuf], +) -> Result<Value, AnyError> { + let args: FdatasyncArgs = serde_json::from_value(args)?; + let rid = args.rid as u32; + std_file_resource(state, rid, |r| match r { + Ok(std_file) => std_file.sync_data().map_err(AnyError::from), + Err(_) => Err(type_error("cannot sync this type of resource".to_string())), + })?; + Ok(json!({})) +} + +async fn op_fdatasync_async( + state: Rc<RefCell<OpState>>, + args: Value, + _zero_copy: BufVec, +) -> Result<Value, AnyError> { + let args: FdatasyncArgs = serde_json::from_value(args)?; + let rid = args.rid as u32; + std_file_resource(&mut state.borrow_mut(), rid, |r| match r { + Ok(std_file) => std_file.sync_data().map_err(AnyError::from), + Err(_) => Err(type_error("cannot sync this type of resource".to_string())), + })?; + Ok(json!({})) +} + +#[derive(Deserialize)] +#[serde(rename_all = "camelCase")] +struct FsyncArgs { + rid: i32, +} + +fn op_fsync_sync( + state: &mut OpState, + args: Value, + _zero_copy: &mut [ZeroCopyBuf], +) -> Result<Value, AnyError> { + let args: FsyncArgs = serde_json::from_value(args)?; + let rid = args.rid as u32; + std_file_resource(state, rid, |r| match r { + Ok(std_file) => std_file.sync_all().map_err(AnyError::from), + Err(_) => Err(type_error("cannot sync this type of resource".to_string())), + })?; + Ok(json!({})) +} + +async fn op_fsync_async( + state: Rc<RefCell<OpState>>, + args: Value, + _zero_copy: BufVec, +) -> Result<Value, AnyError> { + let args: FsyncArgs = serde_json::from_value(args)?; + let rid = args.rid as u32; + std_file_resource(&mut state.borrow_mut(), rid, |r| match r { + Ok(std_file) => std_file.sync_all().map_err(AnyError::from), + Err(_) => Err(type_error("cannot sync this type of resource".to_string())), + })?; + Ok(json!({})) +} + +#[derive(Deserialize)] +#[serde(rename_all = "camelCase")] +struct FstatArgs { + rid: i32, +} + +fn op_fstat_sync( + state: &mut OpState, + args: Value, + _zero_copy: &mut [ZeroCopyBuf], +) -> Result<Value, AnyError> { + super::check_unstable(state, "Deno.fstat"); + let args: FstatArgs = serde_json::from_value(args)?; + let rid = args.rid as u32; + let metadata = std_file_resource(state, rid, |r| match r { + Ok(std_file) => std_file.metadata().map_err(AnyError::from), + Err(_) => Err(type_error("cannot stat this type of resource".to_string())), + })?; + Ok(get_stat_json(metadata)) +} + +async fn op_fstat_async( + state: Rc<RefCell<OpState>>, + args: Value, + _zero_copy: BufVec, +) -> Result<Value, AnyError> { + super::check_unstable2(&state, "Deno.fstat"); + + let args: FstatArgs = serde_json::from_value(args)?; + let rid = args.rid as u32; + let metadata = + std_file_resource(&mut state.borrow_mut(), rid, |r| match r { + Ok(std_file) => std_file.metadata().map_err(AnyError::from), + Err(_) => { + Err(type_error("cannot stat this type of resource".to_string())) + } + })?; + Ok(get_stat_json(metadata)) +} + +#[derive(Deserialize)] +struct UmaskArgs { + mask: Option<u32>, +} + +fn op_umask( + state: &mut OpState, + args: Value, + _zero_copy: &mut [ZeroCopyBuf], +) -> Result<Value, AnyError> { + super::check_unstable(state, "Deno.umask"); + let args: UmaskArgs = serde_json::from_value(args)?; + // TODO implement umask for Windows + // see https://github.com/nodejs/node/blob/master/src/node_process_methods.cc + // and https://docs.microsoft.com/fr-fr/cpp/c-runtime-library/reference/umask?view=vs-2019 + #[cfg(not(unix))] + { + let _ = args.mask; // avoid unused warning. + Err(not_supported()) + } + #[cfg(unix)] + { + use nix::sys::stat::mode_t; + use nix::sys::stat::umask; + use nix::sys::stat::Mode; + let r = if let Some(mask) = args.mask { + // If mask provided, return previous. + umask(Mode::from_bits_truncate(mask as mode_t)) + } else { + // If no mask provided, we query the current. Requires two syscalls. + let prev = umask(Mode::from_bits_truncate(0o777)); + let _ = umask(prev); + prev + }; + Ok(json!(r.bits() as u32)) + } +} + +#[derive(Deserialize)] +struct ChdirArgs { + directory: String, +} + +fn op_chdir( + state: &mut OpState, + args: Value, + _zero_copy: &mut [ZeroCopyBuf], +) -> Result<Value, AnyError> { + let args: ChdirArgs = serde_json::from_value(args)?; + let d = PathBuf::from(&args.directory); + state.borrow::<Permissions>().check_read(&d)?; + set_current_dir(&d)?; + Ok(json!({})) +} + +#[derive(Deserialize)] +#[serde(rename_all = "camelCase")] +struct MkdirArgs { + path: String, + recursive: bool, + mode: Option<u32>, +} + +fn op_mkdir_sync( + state: &mut OpState, + args: Value, + _zero_copy: &mut [ZeroCopyBuf], +) -> Result<Value, AnyError> { + let args: MkdirArgs = serde_json::from_value(args)?; + let path = Path::new(&args.path).to_path_buf(); + let mode = args.mode.unwrap_or(0o777) & 0o777; + state.borrow::<Permissions>().check_write(&path)?; + debug!("op_mkdir {} {:o} {}", path.display(), mode, args.recursive); + let mut builder = std::fs::DirBuilder::new(); + builder.recursive(args.recursive); + #[cfg(unix)] + { + use std::os::unix::fs::DirBuilderExt; + builder.mode(mode); + } + builder.create(path)?; + Ok(json!({})) +} + +async fn op_mkdir_async( + state: Rc<RefCell<OpState>>, + args: Value, + _zero_copy: BufVec, +) -> Result<Value, AnyError> { + let args: MkdirArgs = serde_json::from_value(args)?; + let path = Path::new(&args.path).to_path_buf(); + let mode = args.mode.unwrap_or(0o777) & 0o777; + + { + let state = state.borrow(); + state.borrow::<Permissions>().check_write(&path)?; + } + + tokio::task::spawn_blocking(move || { + debug!("op_mkdir {} {:o} {}", path.display(), mode, args.recursive); + let mut builder = std::fs::DirBuilder::new(); + builder.recursive(args.recursive); + #[cfg(unix)] + { + use std::os::unix::fs::DirBuilderExt; + builder.mode(mode); + } + builder.create(path)?; + Ok(json!({})) + }) + .await + .unwrap() +} + +#[derive(Deserialize)] +#[serde(rename_all = "camelCase")] +struct ChmodArgs { + path: String, + mode: u32, +} + +fn op_chmod_sync( + state: &mut OpState, + args: Value, + _zero_copy: &mut [ZeroCopyBuf], +) -> Result<Value, AnyError> { + let args: ChmodArgs = serde_json::from_value(args)?; + let path = Path::new(&args.path).to_path_buf(); + let mode = args.mode & 0o777; + + state.borrow::<Permissions>().check_write(&path)?; + debug!("op_chmod_sync {} {:o}", path.display(), mode); + #[cfg(unix)] + { + use std::os::unix::fs::PermissionsExt; + let permissions = PermissionsExt::from_mode(mode); + std::fs::set_permissions(&path, permissions)?; + Ok(json!({})) + } + // TODO Implement chmod for Windows (#4357) + #[cfg(not(unix))] + { + // Still check file/dir exists on Windows + let _metadata = std::fs::metadata(&path)?; + Err(generic_error("Not implemented")) + } +} + +async fn op_chmod_async( + state: Rc<RefCell<OpState>>, + args: Value, + _zero_copy: BufVec, +) -> Result<Value, AnyError> { + let args: ChmodArgs = serde_json::from_value(args)?; + let path = Path::new(&args.path).to_path_buf(); + let mode = args.mode & 0o777; + + { + let state = state.borrow(); + state.borrow::<Permissions>().check_write(&path)?; + } + + tokio::task::spawn_blocking(move || { + debug!("op_chmod_async {} {:o}", path.display(), mode); + #[cfg(unix)] + { + use std::os::unix::fs::PermissionsExt; + let permissions = PermissionsExt::from_mode(mode); + std::fs::set_permissions(&path, permissions)?; + Ok(json!({})) + } + // TODO Implement chmod for Windows (#4357) + #[cfg(not(unix))] + { + // Still check file/dir exists on Windows + let _metadata = std::fs::metadata(&path)?; + Err(not_supported()) + } + }) + .await + .unwrap() +} + +#[derive(Deserialize)] +#[serde(rename_all = "camelCase")] +struct ChownArgs { + path: String, + uid: Option<u32>, + gid: Option<u32>, +} + +fn op_chown_sync( + state: &mut OpState, + args: Value, + _zero_copy: &mut [ZeroCopyBuf], +) -> Result<Value, AnyError> { + let args: ChownArgs = serde_json::from_value(args)?; + let path = Path::new(&args.path).to_path_buf(); + state.borrow::<Permissions>().check_write(&path)?; + debug!( + "op_chown_sync {} {:?} {:?}", + path.display(), + args.uid, + args.gid, + ); + #[cfg(unix)] + { + use nix::unistd::{chown, Gid, Uid}; + let nix_uid = args.uid.map(Uid::from_raw); + let nix_gid = args.gid.map(Gid::from_raw); + chown(&path, nix_uid, nix_gid)?; + Ok(json!({})) + } + // TODO Implement chown for Windows + #[cfg(not(unix))] + { + Err(generic_error("Not implemented")) + } +} + +async fn op_chown_async( + state: Rc<RefCell<OpState>>, + args: Value, + _zero_copy: BufVec, +) -> Result<Value, AnyError> { + let args: ChownArgs = serde_json::from_value(args)?; + let path = Path::new(&args.path).to_path_buf(); + + { + let state = state.borrow(); + state.borrow::<Permissions>().check_write(&path)?; + } + + tokio::task::spawn_blocking(move || { + debug!( + "op_chown_async {} {:?} {:?}", + path.display(), + args.uid, + args.gid, + ); + #[cfg(unix)] + { + use nix::unistd::{chown, Gid, Uid}; + let nix_uid = args.uid.map(Uid::from_raw); + let nix_gid = args.gid.map(Gid::from_raw); + chown(&path, nix_uid, nix_gid)?; + Ok(json!({})) + } + // TODO Implement chown for Windows + #[cfg(not(unix))] + Err(not_supported()) + }) + .await + .unwrap() +} + +#[derive(Deserialize)] +#[serde(rename_all = "camelCase")] +struct RemoveArgs { + path: String, + recursive: bool, +} + +fn op_remove_sync( + state: &mut OpState, + args: Value, + _zero_copy: &mut [ZeroCopyBuf], +) -> Result<Value, AnyError> { + let args: RemoveArgs = serde_json::from_value(args)?; + let path = PathBuf::from(&args.path); + let recursive = args.recursive; + + state.borrow::<Permissions>().check_write(&path)?; + + #[cfg(not(unix))] + use std::os::windows::prelude::MetadataExt; + + let metadata = std::fs::symlink_metadata(&path)?; + + debug!("op_remove_sync {} {}", path.display(), recursive); + let file_type = metadata.file_type(); + if file_type.is_file() { + std::fs::remove_file(&path)?; + } else if recursive { + std::fs::remove_dir_all(&path)?; + } else if file_type.is_symlink() { + #[cfg(unix)] + std::fs::remove_file(&path)?; + #[cfg(not(unix))] + { + use winapi::um::winnt::FILE_ATTRIBUTE_DIRECTORY; + if metadata.file_attributes() & FILE_ATTRIBUTE_DIRECTORY != 0 { + std::fs::remove_dir(&path)?; + } else { + std::fs::remove_file(&path)?; + } + } + } else if file_type.is_dir() { + std::fs::remove_dir(&path)?; + } else { + // pipes, sockets, etc... + std::fs::remove_file(&path)?; + } + Ok(json!({})) +} + +async fn op_remove_async( + state: Rc<RefCell<OpState>>, + args: Value, + _zero_copy: BufVec, +) -> Result<Value, AnyError> { + let args: RemoveArgs = serde_json::from_value(args)?; + let path = PathBuf::from(&args.path); + let recursive = args.recursive; + + { + let state = state.borrow(); + state.borrow::<Permissions>().check_write(&path)?; + } + + tokio::task::spawn_blocking(move || { + #[cfg(not(unix))] + use std::os::windows::prelude::MetadataExt; + + let metadata = std::fs::symlink_metadata(&path)?; + + debug!("op_remove_async {} {}", path.display(), recursive); + let file_type = metadata.file_type(); + if file_type.is_file() { + std::fs::remove_file(&path)?; + } else if recursive { + std::fs::remove_dir_all(&path)?; + } else if file_type.is_symlink() { + #[cfg(unix)] + std::fs::remove_file(&path)?; + #[cfg(not(unix))] + { + use winapi::um::winnt::FILE_ATTRIBUTE_DIRECTORY; + if metadata.file_attributes() & FILE_ATTRIBUTE_DIRECTORY != 0 { + std::fs::remove_dir(&path)?; + } else { + std::fs::remove_file(&path)?; + } + } + } else if file_type.is_dir() { + std::fs::remove_dir(&path)?; + } else { + // pipes, sockets, etc... + std::fs::remove_file(&path)?; + } + Ok(json!({})) + }) + .await + .unwrap() +} + +#[derive(Deserialize)] +#[serde(rename_all = "camelCase")] +struct CopyFileArgs { + from: String, + to: String, +} + +fn op_copy_file_sync( + state: &mut OpState, + args: Value, + _zero_copy: &mut [ZeroCopyBuf], +) -> Result<Value, AnyError> { + let args: CopyFileArgs = serde_json::from_value(args)?; + let from = PathBuf::from(&args.from); + let to = PathBuf::from(&args.to); + + let permissions = state.borrow::<Permissions>(); + permissions.check_read(&from)?; + permissions.check_write(&to)?; + + debug!("op_copy_file_sync {} {}", from.display(), to.display()); + // On *nix, Rust reports non-existent `from` as ErrorKind::InvalidInput + // See https://github.com/rust-lang/rust/issues/54800 + // Once the issue is resolved, we should remove this workaround. + if cfg!(unix) && !from.is_file() { + return Err(custom_error("NotFound", "File not found")); + } + + // returns size of from as u64 (we ignore) + std::fs::copy(&from, &to)?; + Ok(json!({})) +} + +async fn op_copy_file_async( + state: Rc<RefCell<OpState>>, + args: Value, + _zero_copy: BufVec, +) -> Result<Value, AnyError> { + let args: CopyFileArgs = serde_json::from_value(args)?; + let from = PathBuf::from(&args.from); + let to = PathBuf::from(&args.to); + + { + let state = state.borrow(); + let permissions = state.borrow::<Permissions>(); + permissions.check_read(&from)?; + permissions.check_write(&to)?; + } + + debug!("op_copy_file_async {} {}", from.display(), to.display()); + tokio::task::spawn_blocking(move || { + // On *nix, Rust reports non-existent `from` as ErrorKind::InvalidInput + // See https://github.com/rust-lang/rust/issues/54800 + // Once the issue is resolved, we should remove this workaround. + if cfg!(unix) && !from.is_file() { + return Err(custom_error("NotFound", "File not found")); + } + + // returns size of from as u64 (we ignore) + std::fs::copy(&from, &to)?; + Ok(json!({})) + }) + .await + .unwrap() +} + +fn to_msec(maybe_time: Result<SystemTime, io::Error>) -> Value { + match maybe_time { + Ok(time) => { + let msec = time + .duration_since(UNIX_EPOCH) + .map(|t| t.as_secs_f64() * 1000f64) + .unwrap_or_else(|err| err.duration().as_secs_f64() * -1000f64); + serde_json::Number::from_f64(msec) + .map(Value::Number) + .unwrap_or(Value::Null) + } + Err(_) => Value::Null, + } +} + +#[inline(always)] +fn get_stat_json(metadata: std::fs::Metadata) -> Value { + // Unix stat member (number types only). 0 if not on unix. + macro_rules! usm { + ($member:ident) => {{ + #[cfg(unix)] + { + metadata.$member() + } + #[cfg(not(unix))] + { + 0 + } + }}; + } + + #[cfg(unix)] + use std::os::unix::fs::MetadataExt; + let json_val = json!({ + "isFile": metadata.is_file(), + "isDirectory": metadata.is_dir(), + "isSymlink": metadata.file_type().is_symlink(), + "size": metadata.len(), + // In milliseconds, like JavaScript. Available on both Unix or Windows. + "mtime": to_msec(metadata.modified()), + "atime": to_msec(metadata.accessed()), + "birthtime": to_msec(metadata.created()), + // Following are only valid under Unix. + "dev": usm!(dev), + "ino": usm!(ino), + "mode": usm!(mode), + "nlink": usm!(nlink), + "uid": usm!(uid), + "gid": usm!(gid), + "rdev": usm!(rdev), + // TODO(kevinkassimo): *time_nsec requires BigInt. + // Probably should be treated as String if we need to add them. + "blksize": usm!(blksize), + "blocks": usm!(blocks), + }); + json_val +} + +#[derive(Deserialize)] +#[serde(rename_all = "camelCase")] +struct StatArgs { + path: String, + lstat: bool, +} + +fn op_stat_sync( + state: &mut OpState, + args: Value, + _zero_copy: &mut [ZeroCopyBuf], +) -> Result<Value, AnyError> { + let args: StatArgs = serde_json::from_value(args)?; + let path = PathBuf::from(&args.path); + let lstat = args.lstat; + state.borrow::<Permissions>().check_read(&path)?; + debug!("op_stat_sync {} {}", path.display(), lstat); + let metadata = if lstat { + std::fs::symlink_metadata(&path)? + } else { + std::fs::metadata(&path)? + }; + Ok(get_stat_json(metadata)) +} + +async fn op_stat_async( + state: Rc<RefCell<OpState>>, + args: Value, + _zero_copy: BufVec, +) -> Result<Value, AnyError> { + let args: StatArgs = serde_json::from_value(args)?; + let path = PathBuf::from(&args.path); + let lstat = args.lstat; + + { + let state = state.borrow(); + state.borrow::<Permissions>().check_read(&path)?; + } + + tokio::task::spawn_blocking(move || { + debug!("op_stat_async {} {}", path.display(), lstat); + let metadata = if lstat { + std::fs::symlink_metadata(&path)? + } else { + std::fs::metadata(&path)? + }; + Ok(get_stat_json(metadata)) + }) + .await + .unwrap() +} + +#[derive(Deserialize)] +#[serde(rename_all = "camelCase")] +struct RealpathArgs { + path: String, +} + +fn op_realpath_sync( + state: &mut OpState, + args: Value, + _zero_copy: &mut [ZeroCopyBuf], +) -> Result<Value, AnyError> { + let args: RealpathArgs = serde_json::from_value(args)?; + let path = PathBuf::from(&args.path); + + let permissions = state.borrow::<Permissions>(); + permissions.check_read(&path)?; + if path.is_relative() { + permissions.check_read_blind(¤t_dir()?, "CWD")?; + } + + debug!("op_realpath_sync {}", path.display()); + // corresponds to the realpath on Unix and + // CreateFile and GetFinalPathNameByHandle on Windows + let realpath = canonicalize_path(&path)?; + let realpath_str = into_string(realpath.into_os_string())?; + Ok(json!(realpath_str)) +} + +async fn op_realpath_async( + state: Rc<RefCell<OpState>>, + args: Value, + _zero_copy: BufVec, +) -> Result<Value, AnyError> { + let args: RealpathArgs = serde_json::from_value(args)?; + let path = PathBuf::from(&args.path); + + { + let state = state.borrow(); + let permissions = state.borrow::<Permissions>(); + permissions.check_read(&path)?; + if path.is_relative() { + permissions.check_read_blind(¤t_dir()?, "CWD")?; + } + } + + tokio::task::spawn_blocking(move || { + debug!("op_realpath_async {}", path.display()); + // corresponds to the realpath on Unix and + // CreateFile and GetFinalPathNameByHandle on Windows + let realpath = canonicalize_path(&path)?; + let realpath_str = into_string(realpath.into_os_string())?; + Ok(json!(realpath_str)) + }) + .await + .unwrap() +} + +#[derive(Deserialize)] +#[serde(rename_all = "camelCase")] +struct ReadDirArgs { + path: String, +} + +fn op_read_dir_sync( + state: &mut OpState, + args: Value, + _zero_copy: &mut [ZeroCopyBuf], +) -> Result<Value, AnyError> { + let args: ReadDirArgs = serde_json::from_value(args)?; + let path = PathBuf::from(&args.path); + + state.borrow::<Permissions>().check_read(&path)?; + + debug!("op_read_dir_sync {}", path.display()); + let entries: Vec<_> = std::fs::read_dir(path)? + .filter_map(|entry| { + let entry = entry.unwrap(); + let file_type = entry.file_type().unwrap(); + // Not all filenames can be encoded as UTF-8. Skip those for now. + if let Ok(name) = into_string(entry.file_name()) { + Some(json!({ + "name": name, + "isFile": file_type.is_file(), + "isDirectory": file_type.is_dir(), + "isSymlink": file_type.is_symlink() + })) + } else { + None + } + }) + .collect(); + + Ok(json!({ "entries": entries })) +} + +async fn op_read_dir_async( + state: Rc<RefCell<OpState>>, + args: Value, + _zero_copy: BufVec, +) -> Result<Value, AnyError> { + let args: ReadDirArgs = serde_json::from_value(args)?; + let path = PathBuf::from(&args.path); + { + let state = state.borrow(); + state.borrow::<Permissions>().check_read(&path)?; + } + tokio::task::spawn_blocking(move || { + debug!("op_read_dir_async {}", path.display()); + let entries: Vec<_> = std::fs::read_dir(path)? + .filter_map(|entry| { + let entry = entry.unwrap(); + let file_type = entry.file_type().unwrap(); + // Not all filenames can be encoded as UTF-8. Skip those for now. + if let Ok(name) = into_string(entry.file_name()) { + Some(json!({ + "name": name, + "isFile": file_type.is_file(), + "isDirectory": file_type.is_dir(), + "isSymlink": file_type.is_symlink() + })) + } else { + None + } + }) + .collect(); + + Ok(json!({ "entries": entries })) + }) + .await + .unwrap() +} + +#[derive(Deserialize)] +#[serde(rename_all = "camelCase")] +struct RenameArgs { + oldpath: String, + newpath: String, +} + +fn op_rename_sync( + state: &mut OpState, + args: Value, + _zero_copy: &mut [ZeroCopyBuf], +) -> Result<Value, AnyError> { + let args: RenameArgs = serde_json::from_value(args)?; + let oldpath = PathBuf::from(&args.oldpath); + let newpath = PathBuf::from(&args.newpath); + + let permissions = state.borrow::<Permissions>(); + permissions.check_read(&oldpath)?; + permissions.check_write(&oldpath)?; + permissions.check_write(&newpath)?; + debug!("op_rename_sync {} {}", oldpath.display(), newpath.display()); + std::fs::rename(&oldpath, &newpath)?; + Ok(json!({})) +} + +async fn op_rename_async( + state: Rc<RefCell<OpState>>, + args: Value, + _zero_copy: BufVec, +) -> Result<Value, AnyError> { + let args: RenameArgs = serde_json::from_value(args)?; + let oldpath = PathBuf::from(&args.oldpath); + let newpath = PathBuf::from(&args.newpath); + { + let state = state.borrow(); + let permissions = state.borrow::<Permissions>(); + permissions.check_read(&oldpath)?; + permissions.check_write(&oldpath)?; + permissions.check_write(&newpath)?; + } + tokio::task::spawn_blocking(move || { + debug!( + "op_rename_async {} {}", + oldpath.display(), + newpath.display() + ); + std::fs::rename(&oldpath, &newpath)?; + Ok(json!({})) + }) + .await + .unwrap() +} + +#[derive(Deserialize)] +#[serde(rename_all = "camelCase")] +struct LinkArgs { + oldpath: String, + newpath: String, +} + +fn op_link_sync( + state: &mut OpState, + args: Value, + _zero_copy: &mut [ZeroCopyBuf], +) -> Result<Value, AnyError> { + super::check_unstable(state, "Deno.link"); + let args: LinkArgs = serde_json::from_value(args)?; + let oldpath = PathBuf::from(&args.oldpath); + let newpath = PathBuf::from(&args.newpath); + + let permissions = state.borrow::<Permissions>(); + permissions.check_read(&oldpath)?; + permissions.check_write(&newpath)?; + + debug!("op_link_sync {} {}", oldpath.display(), newpath.display()); + std::fs::hard_link(&oldpath, &newpath)?; + Ok(json!({})) +} + +async fn op_link_async( + state: Rc<RefCell<OpState>>, + args: Value, + _zero_copy: BufVec, +) -> Result<Value, AnyError> { + super::check_unstable2(&state, "Deno.link"); + + let args: LinkArgs = serde_json::from_value(args)?; + let oldpath = PathBuf::from(&args.oldpath); + let newpath = PathBuf::from(&args.newpath); + + { + let state = state.borrow(); + let permissions = state.borrow::<Permissions>(); + permissions.check_read(&oldpath)?; + permissions.check_write(&newpath)?; + } + + tokio::task::spawn_blocking(move || { + debug!("op_link_async {} {}", oldpath.display(), newpath.display()); + std::fs::hard_link(&oldpath, &newpath)?; + Ok(json!({})) + }) + .await + .unwrap() +} + +#[derive(Deserialize)] +#[serde(rename_all = "camelCase")] +struct SymlinkArgs { + oldpath: String, + newpath: String, + #[cfg(not(unix))] + options: Option<SymlinkOptions>, +} + +#[cfg(not(unix))] +#[derive(Deserialize)] +#[serde(rename_all = "camelCase")] +struct SymlinkOptions { + _type: String, +} + +fn op_symlink_sync( + state: &mut OpState, + args: Value, + _zero_copy: &mut [ZeroCopyBuf], +) -> Result<Value, AnyError> { + super::check_unstable(state, "Deno.symlink"); + let args: SymlinkArgs = serde_json::from_value(args)?; + let oldpath = PathBuf::from(&args.oldpath); + let newpath = PathBuf::from(&args.newpath); + + state.borrow::<Permissions>().check_write(&newpath)?; + + debug!( + "op_symlink_sync {} {}", + oldpath.display(), + newpath.display() + ); + #[cfg(unix)] + { + use std::os::unix::fs::symlink; + symlink(&oldpath, &newpath)?; + Ok(json!({})) + } + #[cfg(not(unix))] + { + use std::os::windows::fs::{symlink_dir, symlink_file}; + + match args.options { + Some(options) => match options._type.as_ref() { + "file" => symlink_file(&oldpath, &newpath)?, + "dir" => symlink_dir(&oldpath, &newpath)?, + _ => return Err(type_error("unsupported type")), + }, + None => { + let old_meta = std::fs::metadata(&oldpath); + match old_meta { + Ok(metadata) => { + if metadata.is_file() { + symlink_file(&oldpath, &newpath)? + } else if metadata.is_dir() { + symlink_dir(&oldpath, &newpath)? + } + } + Err(_) => return Err(type_error("you must pass a `options` argument for non-existent target path in windows".to_string())), + } + } + }; + Ok(json!({})) + } +} + +async fn op_symlink_async( + state: Rc<RefCell<OpState>>, + args: Value, + _zero_copy: BufVec, +) -> Result<Value, AnyError> { + super::check_unstable2(&state, "Deno.symlink"); + + let args: SymlinkArgs = serde_json::from_value(args)?; + let oldpath = PathBuf::from(&args.oldpath); + let newpath = PathBuf::from(&args.newpath); + + { + let state = state.borrow(); + state.borrow::<Permissions>().check_write(&newpath)?; + } + + tokio::task::spawn_blocking(move || { + debug!("op_symlink_async {} {}", oldpath.display(), newpath.display()); + #[cfg(unix)] + { + use std::os::unix::fs::symlink; + symlink(&oldpath, &newpath)?; + Ok(json!({})) + } + #[cfg(not(unix))] + { + use std::os::windows::fs::{symlink_dir, symlink_file}; + + match args.options { + Some(options) => match options._type.as_ref() { + "file" => symlink_file(&oldpath, &newpath)?, + "dir" => symlink_dir(&oldpath, &newpath)?, + _ => return Err(type_error("unsupported type")), + }, + None => { + let old_meta = std::fs::metadata(&oldpath); + match old_meta { + Ok(metadata) => { + if metadata.is_file() { + symlink_file(&oldpath, &newpath)? + } else if metadata.is_dir() { + symlink_dir(&oldpath, &newpath)? + } + } + Err(_) => return Err(type_error("you must pass a `options` argument for non-existent target path in windows".to_string())), + } + } + }; + Ok(json!({})) + } + }) + .await + .unwrap() +} + +#[derive(Deserialize)] +#[serde(rename_all = "camelCase")] +struct ReadLinkArgs { + path: String, +} + +fn op_read_link_sync( + state: &mut OpState, + args: Value, + _zero_copy: &mut [ZeroCopyBuf], +) -> Result<Value, AnyError> { + let args: ReadLinkArgs = serde_json::from_value(args)?; + let path = PathBuf::from(&args.path); + + state.borrow::<Permissions>().check_read(&path)?; + + debug!("op_read_link_value {}", path.display()); + let target = std::fs::read_link(&path)?.into_os_string(); + let targetstr = into_string(target)?; + Ok(json!(targetstr)) +} + +async fn op_read_link_async( + state: Rc<RefCell<OpState>>, + args: Value, + _zero_copy: BufVec, +) -> Result<Value, AnyError> { + let args: ReadLinkArgs = serde_json::from_value(args)?; + let path = PathBuf::from(&args.path); + { + let state = state.borrow(); + state.borrow::<Permissions>().check_read(&path)?; + } + tokio::task::spawn_blocking(move || { + debug!("op_read_link_async {}", path.display()); + let target = std::fs::read_link(&path)?.into_os_string(); + let targetstr = into_string(target)?; + Ok(json!(targetstr)) + }) + .await + .unwrap() +} + +#[derive(Deserialize)] +#[serde(rename_all = "camelCase")] +struct FtruncateArgs { + rid: i32, + len: i32, +} + +fn op_ftruncate_sync( + state: &mut OpState, + args: Value, + _zero_copy: &mut [ZeroCopyBuf], +) -> Result<Value, AnyError> { + super::check_unstable(state, "Deno.ftruncate"); + let args: FtruncateArgs = serde_json::from_value(args)?; + let rid = args.rid as u32; + let len = args.len as u64; + std_file_resource(state, rid, |r| match r { + Ok(std_file) => std_file.set_len(len).map_err(AnyError::from), + Err(_) => Err(type_error("cannot truncate this type of resource")), + })?; + Ok(json!({})) +} + +async fn op_ftruncate_async( + state: Rc<RefCell<OpState>>, + args: Value, + _zero_copy: BufVec, +) -> Result<Value, AnyError> { + super::check_unstable2(&state, "Deno.ftruncate"); + let args: FtruncateArgs = serde_json::from_value(args)?; + let rid = args.rid as u32; + let len = args.len as u64; + std_file_resource(&mut state.borrow_mut(), rid, |r| match r { + Ok(std_file) => std_file.set_len(len).map_err(AnyError::from), + Err(_) => Err(type_error("cannot truncate this type of resource")), + })?; + Ok(json!({})) +} + +#[derive(Deserialize)] +#[serde(rename_all = "camelCase")] +struct TruncateArgs { + path: String, + len: u64, +} + +fn op_truncate_sync( + state: &mut OpState, + args: Value, + _zero_copy: &mut [ZeroCopyBuf], +) -> Result<Value, AnyError> { + let args: TruncateArgs = serde_json::from_value(args)?; + let path = PathBuf::from(&args.path); + let len = args.len; + + state.borrow::<Permissions>().check_write(&path)?; + + debug!("op_truncate_sync {} {}", path.display(), len); + let f = std::fs::OpenOptions::new().write(true).open(&path)?; + f.set_len(len)?; + Ok(json!({})) +} + +async fn op_truncate_async( + state: Rc<RefCell<OpState>>, + args: Value, + _zero_copy: BufVec, +) -> Result<Value, AnyError> { + let args: TruncateArgs = serde_json::from_value(args)?; + let path = PathBuf::from(&args.path); + let len = args.len; + { + let state = state.borrow(); + state.borrow::<Permissions>().check_write(&path)?; + } + tokio::task::spawn_blocking(move || { + debug!("op_truncate_async {} {}", path.display(), len); + let f = std::fs::OpenOptions::new().write(true).open(&path)?; + f.set_len(len)?; + Ok(json!({})) + }) + .await + .unwrap() +} + +fn make_temp( + dir: Option<&Path>, + prefix: Option<&str>, + suffix: Option<&str>, + is_dir: bool, +) -> std::io::Result<PathBuf> { + let prefix_ = prefix.unwrap_or(""); + let suffix_ = suffix.unwrap_or(""); + let mut buf: PathBuf = match dir { + Some(ref p) => p.to_path_buf(), + None => temp_dir(), + } + .join("_"); + let mut rng = thread_rng(); + loop { + let unique = rng.gen::<u32>(); + buf.set_file_name(format!("{}{:08x}{}", prefix_, unique, suffix_)); + let r = if is_dir { + #[allow(unused_mut)] + let mut builder = std::fs::DirBuilder::new(); + #[cfg(unix)] + { + use std::os::unix::fs::DirBuilderExt; + builder.mode(0o700); + } + builder.create(buf.as_path()) + } else { + let mut open_options = std::fs::OpenOptions::new(); + open_options.write(true).create_new(true); + #[cfg(unix)] + { + use std::os::unix::fs::OpenOptionsExt; + open_options.mode(0o600); + } + open_options.open(buf.as_path())?; + Ok(()) + }; + match r { + Err(ref e) if e.kind() == std::io::ErrorKind::AlreadyExists => continue, + Ok(_) => return Ok(buf), + Err(e) => return Err(e), + } + } +} + +#[derive(Deserialize)] +#[serde(rename_all = "camelCase")] +struct MakeTempArgs { + dir: Option<String>, + prefix: Option<String>, + suffix: Option<String>, +} + +fn op_make_temp_dir_sync( + state: &mut OpState, + args: Value, + _zero_copy: &mut [ZeroCopyBuf], +) -> Result<Value, AnyError> { + let args: MakeTempArgs = serde_json::from_value(args)?; + + let dir = args.dir.map(|s| PathBuf::from(&s)); + let prefix = args.prefix.map(String::from); + let suffix = args.suffix.map(String::from); + + state + .borrow::<Permissions>() + .check_write(dir.clone().unwrap_or_else(temp_dir).as_path())?; + + // TODO(piscisaureus): use byte vector for paths, not a string. + // See https://github.com/denoland/deno/issues/627. + // We can't assume that paths are always valid utf8 strings. + let path = make_temp( + // Converting Option<String> to Option<&str> + dir.as_deref(), + prefix.as_deref(), + suffix.as_deref(), + true, + )?; + let path_str = into_string(path.into_os_string())?; + + Ok(json!(path_str)) +} + +async fn op_make_temp_dir_async( + state: Rc<RefCell<OpState>>, + args: Value, + _zero_copy: BufVec, +) -> Result<Value, AnyError> { + let args: MakeTempArgs = serde_json::from_value(args)?; + + let dir = args.dir.map(|s| PathBuf::from(&s)); + let prefix = args.prefix.map(String::from); + let suffix = args.suffix.map(String::from); + { + let state = state.borrow(); + state + .borrow::<Permissions>() + .check_write(dir.clone().unwrap_or_else(temp_dir).as_path())?; + } + tokio::task::spawn_blocking(move || { + // TODO(piscisaureus): use byte vector for paths, not a string. + // See https://github.com/denoland/deno/issues/627. + // We can't assume that paths are always valid utf8 strings. + let path = make_temp( + // Converting Option<String> to Option<&str> + dir.as_deref(), + prefix.as_deref(), + suffix.as_deref(), + true, + )?; + let path_str = into_string(path.into_os_string())?; + + Ok(json!(path_str)) + }) + .await + .unwrap() +} + +fn op_make_temp_file_sync( + state: &mut OpState, + args: Value, + _zero_copy: &mut [ZeroCopyBuf], +) -> Result<Value, AnyError> { + let args: MakeTempArgs = serde_json::from_value(args)?; + + let dir = args.dir.map(|s| PathBuf::from(&s)); + let prefix = args.prefix.map(String::from); + let suffix = args.suffix.map(String::from); + + state + .borrow::<Permissions>() + .check_write(dir.clone().unwrap_or_else(temp_dir).as_path())?; + + // TODO(piscisaureus): use byte vector for paths, not a string. + // See https://github.com/denoland/deno/issues/627. + // We can't assume that paths are always valid utf8 strings. + let path = make_temp( + // Converting Option<String> to Option<&str> + dir.as_deref(), + prefix.as_deref(), + suffix.as_deref(), + false, + )?; + let path_str = into_string(path.into_os_string())?; + + Ok(json!(path_str)) +} + +async fn op_make_temp_file_async( + state: Rc<RefCell<OpState>>, + args: Value, + _zero_copy: BufVec, +) -> Result<Value, AnyError> { + let args: MakeTempArgs = serde_json::from_value(args)?; + + let dir = args.dir.map(|s| PathBuf::from(&s)); + let prefix = args.prefix.map(String::from); + let suffix = args.suffix.map(String::from); + { + let state = state.borrow(); + state + .borrow::<Permissions>() + .check_write(dir.clone().unwrap_or_else(temp_dir).as_path())?; + } + tokio::task::spawn_blocking(move || { + // TODO(piscisaureus): use byte vector for paths, not a string. + // See https://github.com/denoland/deno/issues/627. + // We can't assume that paths are always valid utf8 strings. + let path = make_temp( + // Converting Option<String> to Option<&str> + dir.as_deref(), + prefix.as_deref(), + suffix.as_deref(), + false, + )?; + let path_str = into_string(path.into_os_string())?; + + Ok(json!(path_str)) + }) + .await + .unwrap() +} + +#[derive(Deserialize)] +#[serde(rename_all = "camelCase")] +struct FutimeArgs { + rid: i32, + atime: (i64, u32), + mtime: (i64, u32), +} + +fn op_futime_sync( + state: &mut OpState, + args: Value, + _zero_copy: &mut [ZeroCopyBuf], +) -> Result<Value, AnyError> { + super::check_unstable(state, "Deno.futimeSync"); + let args: FutimeArgs = serde_json::from_value(args)?; + let rid = args.rid as u32; + let atime = filetime::FileTime::from_unix_time(args.atime.0, args.atime.1); + let mtime = filetime::FileTime::from_unix_time(args.mtime.0, args.mtime.1); + + std_file_resource(state, rid, |r| match r { + Ok(std_file) => { + filetime::set_file_handle_times(std_file, Some(atime), Some(mtime)) + .map_err(AnyError::from) + } + Err(_) => Err(type_error( + "cannot futime on this type of resource".to_string(), + )), + })?; + + Ok(json!({})) +} + +async fn op_futime_async( + state: Rc<RefCell<OpState>>, + args: Value, + _zero_copy: BufVec, +) -> Result<Value, AnyError> { + let mut state = state.borrow_mut(); + super::check_unstable(&state, "Deno.futime"); + let args: FutimeArgs = serde_json::from_value(args)?; + let rid = args.rid as u32; + let atime = filetime::FileTime::from_unix_time(args.atime.0, args.atime.1); + let mtime = filetime::FileTime::from_unix_time(args.mtime.0, args.mtime.1); + // TODO Not actually async! https://github.com/denoland/deno/issues/7400 + std_file_resource(&mut state, rid, |r| match r { + Ok(std_file) => { + filetime::set_file_handle_times(std_file, Some(atime), Some(mtime)) + .map_err(AnyError::from) + } + Err(_) => Err(type_error( + "cannot futime on this type of resource".to_string(), + )), + })?; + + Ok(json!({})) +} + +#[derive(Deserialize)] +#[serde(rename_all = "camelCase")] +struct UtimeArgs { + path: String, + atime: (i64, u32), + mtime: (i64, u32), +} + +fn op_utime_sync( + state: &mut OpState, + args: Value, + _zero_copy: &mut [ZeroCopyBuf], +) -> Result<Value, AnyError> { + super::check_unstable(state, "Deno.utime"); + + let args: UtimeArgs = serde_json::from_value(args)?; + let path = PathBuf::from(&args.path); + let atime = filetime::FileTime::from_unix_time(args.atime.0, args.atime.1); + let mtime = filetime::FileTime::from_unix_time(args.mtime.0, args.mtime.1); + + state.borrow::<Permissions>().check_write(&path)?; + filetime::set_file_times(path, atime, mtime)?; + Ok(json!({})) +} + +async fn op_utime_async( + state: Rc<RefCell<OpState>>, + args: Value, + _zero_copy: BufVec, +) -> Result<Value, AnyError> { + let state = state.borrow(); + super::check_unstable(&state, "Deno.utime"); + + let args: UtimeArgs = serde_json::from_value(args)?; + let path = PathBuf::from(&args.path); + let atime = filetime::FileTime::from_unix_time(args.atime.0, args.atime.1); + let mtime = filetime::FileTime::from_unix_time(args.mtime.0, args.mtime.1); + + state.borrow::<Permissions>().check_write(&path)?; + + tokio::task::spawn_blocking(move || { + filetime::set_file_times(path, atime, mtime)?; + Ok(json!({})) + }) + .await + .unwrap() +} + +fn op_cwd( + state: &mut OpState, + _args: Value, + _zero_copy: &mut [ZeroCopyBuf], +) -> Result<Value, AnyError> { + let path = current_dir()?; + state + .borrow::<Permissions>() + .check_read_blind(&path, "CWD")?; + let path_str = into_string(path.into_os_string())?; + Ok(json!(path_str)) +} diff --git a/runtime/ops/fs_events.rs b/runtime/ops/fs_events.rs new file mode 100644 index 000000000..4832c915c --- /dev/null +++ b/runtime/ops/fs_events.rs @@ -0,0 +1,133 @@ +// Copyright 2018-2020 the Deno authors. All rights reserved. MIT license. + +use crate::permissions::Permissions; +use deno_core::error::bad_resource_id; +use deno_core::error::AnyError; +use deno_core::futures::future::poll_fn; +use deno_core::serde_json; +use deno_core::serde_json::json; +use deno_core::serde_json::Value; +use deno_core::BufVec; +use deno_core::OpState; +use deno_core::ZeroCopyBuf; +use notify::event::Event as NotifyEvent; +use notify::Error as NotifyError; +use notify::EventKind; +use notify::RecommendedWatcher; +use notify::RecursiveMode; +use notify::Watcher; +use serde::Deserialize; +use serde::Serialize; +use std::cell::RefCell; +use std::convert::From; +use std::path::PathBuf; +use std::rc::Rc; +use tokio::sync::mpsc; + +pub fn init(rt: &mut deno_core::JsRuntime) { + super::reg_json_sync(rt, "op_fs_events_open", op_fs_events_open); + super::reg_json_async(rt, "op_fs_events_poll", op_fs_events_poll); +} + +struct FsEventsResource { + #[allow(unused)] + watcher: RecommendedWatcher, + receiver: mpsc::Receiver<Result<FsEvent, AnyError>>, +} + +/// Represents a file system event. +/// +/// We do not use the event directly from the notify crate. We flatten +/// the structure into this simpler structure. We want to only make it more +/// complex as needed. +/// +/// Feel free to expand this struct as long as you can add tests to demonstrate +/// the complexity. +#[derive(Serialize, Debug)] +struct FsEvent { + kind: String, + paths: Vec<PathBuf>, +} + +impl From<NotifyEvent> for FsEvent { + fn from(e: NotifyEvent) -> Self { + let kind = match e.kind { + EventKind::Any => "any", + EventKind::Access(_) => "access", + EventKind::Create(_) => "create", + EventKind::Modify(_) => "modify", + EventKind::Remove(_) => "remove", + EventKind::Other => todo!(), // What's this for? Leaving it out for now. + } + .to_string(); + FsEvent { + kind, + paths: e.paths, + } + } +} + +fn op_fs_events_open( + state: &mut OpState, + args: Value, + _zero_copy: &mut [ZeroCopyBuf], +) -> Result<Value, AnyError> { + #[derive(Deserialize)] + struct OpenArgs { + recursive: bool, + paths: Vec<String>, + } + let args: OpenArgs = serde_json::from_value(args)?; + let (sender, receiver) = mpsc::channel::<Result<FsEvent, AnyError>>(16); + let sender = std::sync::Mutex::new(sender); + let mut watcher: RecommendedWatcher = + Watcher::new_immediate(move |res: Result<NotifyEvent, NotifyError>| { + let res2 = res.map(FsEvent::from).map_err(AnyError::from); + let mut sender = sender.lock().unwrap(); + // Ignore result, if send failed it means that watcher was already closed, + // but not all messages have been flushed. + let _ = sender.try_send(res2); + })?; + let recursive_mode = if args.recursive { + RecursiveMode::Recursive + } else { + RecursiveMode::NonRecursive + }; + for path in &args.paths { + state + .borrow::<Permissions>() + .check_read(&PathBuf::from(path))?; + watcher.watch(path, recursive_mode)?; + } + let resource = FsEventsResource { watcher, receiver }; + let rid = state.resource_table.add("fsEvents", Box::new(resource)); + Ok(json!(rid)) +} + +async fn op_fs_events_poll( + state: Rc<RefCell<OpState>>, + args: Value, + _zero_copy: BufVec, +) -> Result<Value, AnyError> { + #[derive(Deserialize)] + struct PollArgs { + rid: u32, + } + let PollArgs { rid } = serde_json::from_value(args)?; + poll_fn(move |cx| { + let mut state = state.borrow_mut(); + let watcher = state + .resource_table + .get_mut::<FsEventsResource>(rid) + .ok_or_else(bad_resource_id)?; + watcher + .receiver + .poll_recv(cx) + .map(|maybe_result| match maybe_result { + Some(Ok(value)) => Ok(json!({ "value": value, "done": false })), + Some(Err(err)) => Err(err), + None => Ok(json!({ "done": true })), + }) + }) + .await +} diff --git a/runtime/ops/io.rs b/runtime/ops/io.rs new file mode 100644 index 000000000..0f8af905a --- /dev/null +++ b/runtime/ops/io.rs @@ -0,0 +1,473 @@ +// Copyright 2018-2020 the Deno authors. All rights reserved. MIT license. + +use super::dispatch_minimal::minimal_op; +use super::dispatch_minimal::MinimalOp; +use crate::metrics::metrics_op; +use deno_core::error::bad_resource_id; +use deno_core::error::resource_unavailable; +use deno_core::error::type_error; +use deno_core::error::AnyError; +use deno_core::futures; +use deno_core::futures::future::poll_fn; +use deno_core::futures::future::FutureExt; +use deno_core::futures::ready; +use deno_core::BufVec; +use deno_core::JsRuntime; +use deno_core::OpState; +use std::cell::RefCell; +use std::collections::HashMap; +use std::pin::Pin; +use std::rc::Rc; +use std::sync::atomic::{AtomicUsize, Ordering}; +use std::task::Context; +use std::task::Poll; +use tokio::io::{AsyncRead, AsyncWrite}; +use tokio::net::TcpStream; +use tokio_rustls::client::TlsStream as ClientTlsStream; +use tokio_rustls::server::TlsStream as ServerTlsStream; + +#[cfg(not(windows))] +use std::os::unix::io::FromRawFd; + +#[cfg(windows)] +use std::os::windows::io::FromRawHandle; + +lazy_static! { + /// Due to portability issues on Windows handle to stdout is created from raw + /// file descriptor. The caveat of that approach is fact that when this + /// handle is dropped underlying file descriptor is closed - that is highly + /// not desirable in case of stdout. That's why we store this global handle + /// that is then cloned when obtaining stdio for process. In turn when + /// resource table is dropped storing reference to that handle, the handle + /// itself won't be closed (so Deno.core.print) will still work. + // TODO(ry) It should be possible to close stdout. + static ref STDIN_HANDLE: Option<std::fs::File> = { + #[cfg(not(windows))] + let stdin = unsafe { Some(std::fs::File::from_raw_fd(0)) }; + #[cfg(windows)] + let stdin = unsafe { + let handle = winapi::um::processenv::GetStdHandle( + winapi::um::winbase::STD_INPUT_HANDLE, + ); + if handle.is_null() { + return None; + } + Some(std::fs::File::from_raw_handle(handle)) + }; + stdin + }; + static ref STDOUT_HANDLE: Option<std::fs::File> = { + #[cfg(not(windows))] + let stdout = unsafe { Some(std::fs::File::from_raw_fd(1)) }; + #[cfg(windows)] + let stdout = unsafe { + let handle = winapi::um::processenv::GetStdHandle( + winapi::um::winbase::STD_OUTPUT_HANDLE, + ); + if handle.is_null() { + return None; + } + Some(std::fs::File::from_raw_handle(handle)) + }; + stdout + }; + static ref STDERR_HANDLE: Option<std::fs::File> = { + #[cfg(not(windows))] + let stderr = unsafe { Some(std::fs::File::from_raw_fd(2)) }; + #[cfg(windows)] + let stderr = unsafe { + let handle = winapi::um::processenv::GetStdHandle( + winapi::um::winbase::STD_ERROR_HANDLE, + ); + if handle.is_null() { + return None; + } + Some(std::fs::File::from_raw_handle(handle)) + }; + stderr + }; +} + +pub fn init(rt: &mut JsRuntime) { + rt.register_op("op_read", metrics_op(minimal_op(op_read))); + rt.register_op("op_write", metrics_op(minimal_op(op_write))); +} + +pub fn get_stdio() -> ( + Option<StreamResourceHolder>, + Option<StreamResourceHolder>, + Option<StreamResourceHolder>, +) { + let stdin = get_stdio_stream(&STDIN_HANDLE); + let stdout = get_stdio_stream(&STDOUT_HANDLE); + let stderr = get_stdio_stream(&STDERR_HANDLE); + + (stdin, stdout, stderr) +} + +fn get_stdio_stream( + handle: &Option<std::fs::File>, +) -> Option<StreamResourceHolder> { + match handle { + None => None, + Some(file_handle) => match file_handle.try_clone() { + Ok(clone) => Some(StreamResourceHolder::new(StreamResource::FsFile( + Some((tokio::fs::File::from_std(clone), FileMetadata::default())), + ))), + Err(_e) => None, + }, + } +} + +fn no_buffer_specified() -> AnyError { + type_error("no buffer specified") +} + +#[cfg(unix)] +use nix::sys::termios; + +#[derive(Default)] +pub struct TTYMetadata { + #[cfg(unix)] + pub mode: Option<termios::Termios>, +} + +#[derive(Default)] +pub struct FileMetadata { + pub tty: TTYMetadata, +} + +pub struct StreamResourceHolder { + pub resource: StreamResource, + waker: HashMap<usize, futures::task::AtomicWaker>, + waker_counter: AtomicUsize, +} + +impl StreamResourceHolder { + pub fn new(resource: StreamResource) -> StreamResourceHolder { + StreamResourceHolder { + resource, + // Atleast one task is expecter for the resource + waker: HashMap::with_capacity(1), + // Tracks wakers Ids + waker_counter: AtomicUsize::new(0), + } + } +} + +impl Drop for StreamResourceHolder { + fn drop(&mut self) { + self.wake_tasks(); + } +} + +impl StreamResourceHolder { + pub fn track_task(&mut self, cx: &Context) -> Result<usize, AnyError> { + let waker = futures::task::AtomicWaker::new(); + waker.register(cx.waker()); + // Its OK if it overflows + let task_waker_id = self.waker_counter.fetch_add(1, Ordering::Relaxed); + self.waker.insert(task_waker_id, waker); + Ok(task_waker_id) + } + + pub fn wake_tasks(&mut self) { + for waker in self.waker.values() { + waker.wake(); + } + } + + pub fn untrack_task(&mut self, task_waker_id: usize) { + self.waker.remove(&task_waker_id); + } +} + +pub enum StreamResource { + FsFile(Option<(tokio::fs::File, FileMetadata)>), + TcpStream(Option<tokio::net::TcpStream>), + #[cfg(not(windows))] + UnixStream(tokio::net::UnixStream), + ServerTlsStream(Box<ServerTlsStream<TcpStream>>), + ClientTlsStream(Box<ClientTlsStream<TcpStream>>), + ChildStdin(tokio::process::ChildStdin), + ChildStdout(tokio::process::ChildStdout), + ChildStderr(tokio::process::ChildStderr), +} + +trait UnpinAsyncRead: AsyncRead + Unpin {} +trait UnpinAsyncWrite: AsyncWrite + Unpin {} + +impl<T: AsyncRead + Unpin> UnpinAsyncRead for T {} +impl<T: AsyncWrite + Unpin> UnpinAsyncWrite for T {} + +/// `DenoAsyncRead` is the same as the `tokio_io::AsyncRead` trait +/// but uses an `AnyError` error instead of `std::io:Error` +pub trait DenoAsyncRead { + fn poll_read( + &mut self, + cx: &mut Context, + buf: &mut [u8], + ) -> Poll<Result<usize, AnyError>>; +} + +impl DenoAsyncRead for StreamResource { + fn poll_read( + &mut self, + cx: &mut Context, + buf: &mut [u8], + ) -> Poll<Result<usize, AnyError>> { + use StreamResource::*; + let f: &mut dyn UnpinAsyncRead = match self { + FsFile(Some((f, _))) => f, + FsFile(None) => return Poll::Ready(Err(resource_unavailable())), + TcpStream(Some(f)) => f, + #[cfg(not(windows))] + UnixStream(f) => f, + ClientTlsStream(f) => f, + ServerTlsStream(f) => f, + ChildStdout(f) => f, + ChildStderr(f) => f, + _ => return Err(bad_resource_id()).into(), + }; + let v = ready!(Pin::new(f).poll_read(cx, buf))?; + Ok(v).into() + } +} + +pub fn op_read( + state: Rc<RefCell<OpState>>, + is_sync: bool, + rid: i32, + mut zero_copy: BufVec, +) -> MinimalOp { + debug!("read rid={}", rid); + match zero_copy.len() { + 0 => return MinimalOp::Sync(Err(no_buffer_specified())), + 1 => {} + _ => panic!("Invalid number of arguments"), + } + + if is_sync { + MinimalOp::Sync({ + // First we look up the rid in the resource table. + std_file_resource(&mut state.borrow_mut(), rid as u32, move |r| match r { + Ok(std_file) => { + use std::io::Read; + std_file + .read(&mut zero_copy[0]) + .map(|n: usize| n as i32) + .map_err(AnyError::from) + } + Err(_) => Err(type_error("sync read not allowed on this resource")), + }) + }) + } else { + let mut zero_copy = zero_copy[0].clone(); + MinimalOp::Async( + poll_fn(move |cx| { + let mut state = state.borrow_mut(); + let resource_holder = state + .resource_table + .get_mut::<StreamResourceHolder>(rid as u32) + .ok_or_else(bad_resource_id)?; + + let mut task_tracker_id: Option<usize> = None; + let nread = match resource_holder.resource.poll_read(cx, &mut zero_copy) + { + Poll::Ready(t) => { + if let Some(id) = task_tracker_id { + resource_holder.untrack_task(id); + } + t + } + Poll::Pending => { + task_tracker_id.replace(resource_holder.track_task(cx)?); + return Poll::Pending; + } + }?; + Poll::Ready(Ok(nread as i32)) + }) + .boxed_local(), + ) + } +} + +/// `DenoAsyncWrite` is the same as the `tokio_io::AsyncWrite` trait +/// but uses an `AnyError` error instead of `std::io:Error` +pub trait DenoAsyncWrite { + fn poll_write( + &mut self, + cx: &mut Context, + buf: &[u8], + ) -> Poll<Result<usize, AnyError>>; + + fn poll_close(&mut self, cx: &mut Context) -> Poll<Result<(), AnyError>>; + + fn poll_flush(&mut self, cx: &mut Context) -> Poll<Result<(), AnyError>>; +} + +impl DenoAsyncWrite for StreamResource { + fn poll_write( + &mut self, + cx: &mut Context, + buf: &[u8], + ) -> Poll<Result<usize, AnyError>> { + use StreamResource::*; + let f: &mut dyn UnpinAsyncWrite = match self { + FsFile(Some((f, _))) => f, + FsFile(None) => return Poll::Pending, + TcpStream(Some(f)) => f, + #[cfg(not(windows))] + UnixStream(f) => f, + ClientTlsStream(f) => f, + ServerTlsStream(f) => f, + ChildStdin(f) => f, + _ => return Err(bad_resource_id()).into(), + }; + + let v = ready!(Pin::new(f).poll_write(cx, buf))?; + Ok(v).into() + } + + fn poll_flush(&mut self, cx: &mut Context) -> Poll<Result<(), AnyError>> { + use StreamResource::*; + let f: &mut dyn UnpinAsyncWrite = match self { + FsFile(Some((f, _))) => f, + FsFile(None) => return Poll::Pending, + TcpStream(Some(f)) => f, + #[cfg(not(windows))] + UnixStream(f) => f, + ClientTlsStream(f) => f, + ServerTlsStream(f) => f, + ChildStdin(f) => f, + _ => return Err(bad_resource_id()).into(), + }; + + ready!(Pin::new(f).poll_flush(cx))?; + Ok(()).into() + } + + fn poll_close(&mut self, _cx: &mut Context) -> Poll<Result<(), AnyError>> { + unimplemented!() + } +} + +pub fn op_write( + state: Rc<RefCell<OpState>>, + is_sync: bool, + rid: i32, + zero_copy: BufVec, +) -> MinimalOp { + debug!("write rid={}", rid); + match zero_copy.len() { + 0 => return MinimalOp::Sync(Err(no_buffer_specified())), + 1 => {} + _ => panic!("Invalid number of arguments"), + } + + if is_sync { + MinimalOp::Sync({ + // First we look up the rid in the resource table. + std_file_resource(&mut state.borrow_mut(), rid as u32, move |r| match r { + Ok(std_file) => { + use std::io::Write; + std_file + .write(&zero_copy[0]) + .map(|nwritten: usize| nwritten as i32) + .map_err(AnyError::from) + } + Err(_) => Err(type_error("sync read not allowed on this resource")), + }) + }) + } else { + let zero_copy = zero_copy[0].clone(); + MinimalOp::Async( + async move { + let nwritten = poll_fn(|cx| { + let mut state = state.borrow_mut(); + let resource_holder = state + .resource_table + .get_mut::<StreamResourceHolder>(rid as u32) + .ok_or_else(bad_resource_id)?; + resource_holder.resource.poll_write(cx, &zero_copy) + }) + .await?; + + // TODO(bartlomieju): this step was added during upgrade to Tokio 0.2 + // and the reasons for the need to explicitly flush are not fully known. + // Figure out why it's needed and preferably remove it. + // https://github.com/denoland/deno/issues/3565 + poll_fn(|cx| { + let mut state = state.borrow_mut(); + let resource_holder = state + .resource_table + .get_mut::<StreamResourceHolder>(rid as u32) + .ok_or_else(bad_resource_id)?; + resource_holder.resource.poll_flush(cx) + }) + .await?; + + Ok(nwritten as i32) + } + .boxed_local(), + ) + } +} + +/// Helper function for operating on a std::fs::File stored in the resource table. +/// +/// We store file system file resources as tokio::fs::File, so this is a little +/// utility function that gets a std::fs:File when you need to do blocking +/// operations. +/// +/// Returns ErrorKind::Busy if the resource is being used by another op. +pub fn std_file_resource<F, T>( + state: &mut OpState, + rid: u32, + mut f: F, +) -> Result<T, AnyError> +where + F: FnMut( + Result<&mut std::fs::File, &mut StreamResource>, + ) -> Result<T, AnyError>, +{ + // First we look up the rid in the resource table. + let mut r = state.resource_table.get_mut::<StreamResourceHolder>(rid); + if let Some(ref mut resource_holder) = r { + // Sync write only works for FsFile. It doesn't make sense to do this + // for non-blocking sockets. So we error out if not FsFile. + match &mut resource_holder.resource { + StreamResource::FsFile(option_file_metadata) => { + // The object in the resource table is a tokio::fs::File - but in + // order to do a blocking write on it, we must turn it into a + // std::fs::File. Hopefully this code compiles down to nothing. + if let Some((tokio_file, metadata)) = option_file_metadata.take() { + match tokio_file.try_into_std() { + Ok(mut std_file) => { + let result = f(Ok(&mut std_file)); + // Turn the std_file handle back into a tokio file, put it back + // in the resource table. + let tokio_file = tokio::fs::File::from_std(std_file); + resource_holder.resource = + StreamResource::FsFile(Some((tokio_file, metadata))); + // return the result. + result + } + Err(tokio_file) => { + // This function will return an error containing the file if + // some operation is in-flight. + resource_holder.resource = + StreamResource::FsFile(Some((tokio_file, metadata))); + Err(resource_unavailable()) + } + } + } else { + Err(resource_unavailable()) + } + } + _ => f(Err(&mut resource_holder.resource)), + } + } else { + Err(bad_resource_id()) + } +} diff --git a/runtime/ops/mod.rs b/runtime/ops/mod.rs new file mode 100644 index 000000000..a27122657 --- /dev/null +++ b/runtime/ops/mod.rs @@ -0,0 +1,89 @@ +// Copyright 2018-2020 the Deno authors. All rights reserved. MIT license. + +mod dispatch_minimal; +pub use dispatch_minimal::MinimalOp; + +pub mod crypto; +pub mod fetch; +pub mod fs; +pub mod fs_events; +pub mod io; +pub mod net; +#[cfg(unix)] +mod net_unix; +pub mod os; +pub mod permissions; +pub mod plugin; +pub mod process; +pub mod runtime; +pub mod signal; +pub mod timers; +pub mod tls; +pub mod tty; +pub mod web_worker; +pub mod websocket; +pub mod worker_host; + +use crate::metrics::metrics_op; +use deno_core::error::AnyError; +use deno_core::json_op_async; +use deno_core::json_op_sync; +use deno_core::serde_json::Value; +use deno_core::BufVec; +use deno_core::JsRuntime; +use deno_core::OpState; +use deno_core::ZeroCopyBuf; +use std::cell::RefCell; +use std::future::Future; +use std::rc::Rc; + +pub fn reg_json_async<F, R>(rt: &mut JsRuntime, name: &'static str, op_fn: F) +where + F: Fn(Rc<RefCell<OpState>>, Value, BufVec) -> R + 'static, + R: Future<Output = Result<Value, AnyError>> + 'static, +{ + rt.register_op(name, metrics_op(json_op_async(op_fn))); +} + +pub fn reg_json_sync<F>(rt: &mut JsRuntime, name: &'static str, op_fn: F) +where + F: Fn(&mut OpState, Value, &mut [ZeroCopyBuf]) -> Result<Value, AnyError> + + 'static, +{ + rt.register_op(name, metrics_op(json_op_sync(op_fn))); +} + +/// `UnstableChecker` is a struct so it can be placed inside `GothamState`; +/// using type alias for a bool could work, but there's a high chance +/// that there might be another type alias pointing to a bool, which +/// would override previously used alias. +pub struct UnstableChecker { + pub unstable: bool, +} + +impl UnstableChecker { + /// Quits the process if the --unstable flag was not provided. + /// + /// This is intentionally a non-recoverable check so that people cannot probe + /// for unstable APIs from stable programs. + // NOTE(bartlomieju): keep in sync with `cli/program_state.rs` + pub fn check_unstable(&self, api_name: &str) { + if !self.unstable { + eprintln!( + "Unstable API '{}'. The --unstable flag must be provided.", + api_name + ); + std::process::exit(70); + } + } +} +/// Helper for checking unstable features. Used for sync ops. +pub fn check_unstable(state: &OpState, api_name: &str) { + state.borrow::<UnstableChecker>().check_unstable(api_name) +} + +/// Helper for checking unstable features. Used for async ops. +pub fn check_unstable2(state: &Rc<RefCell<OpState>>, api_name: &str) { + let state = state.borrow(); + state.borrow::<UnstableChecker>().check_unstable(api_name) +} diff --git a/runtime/ops/net.rs b/runtime/ops/net.rs new file mode 100644 index 000000000..98ff83fc0 --- /dev/null +++ b/runtime/ops/net.rs @@ -0,0 +1,566 @@ +// Copyright 2018-2020 the Deno authors. All rights reserved. MIT license. + +use crate::ops::io::StreamResource; +use crate::ops::io::StreamResourceHolder; +use crate::permissions::Permissions; +use crate::resolve_addr::resolve_addr; +use deno_core::error::bad_resource; +use deno_core::error::bad_resource_id; +use deno_core::error::custom_error; +use deno_core::error::generic_error; +use deno_core::error::type_error; +use deno_core::error::AnyError; +use deno_core::futures; +use deno_core::futures::future::poll_fn; +use deno_core::serde_json; +use deno_core::serde_json::json; +use deno_core::serde_json::Value; +use deno_core::BufVec; +use deno_core::OpState; +use deno_core::ZeroCopyBuf; +use serde::Deserialize; +use std::cell::RefCell; +use std::net::Shutdown; +use std::net::SocketAddr; +use std::rc::Rc; +use std::task::Context; +use std::task::Poll; +use tokio::net::TcpListener; +use tokio::net::TcpStream; +use tokio::net::UdpSocket; + +#[cfg(unix)] +use super::net_unix; +#[cfg(unix)] +use std::path::Path; + +pub fn init(rt: &mut deno_core::JsRuntime) { + super::reg_json_async(rt, "op_accept", op_accept); + super::reg_json_async(rt, "op_connect", op_connect); + super::reg_json_sync(rt, "op_shutdown", op_shutdown); + super::reg_json_sync(rt, "op_listen", op_listen); + super::reg_json_async(rt, "op_datagram_receive", op_datagram_receive); + super::reg_json_async(rt, "op_datagram_send", op_datagram_send); +} + +#[derive(Deserialize)] +pub(crate) struct AcceptArgs { + pub rid: i32, + pub transport: String, +} + +async fn accept_tcp( + state: Rc<RefCell<OpState>>, + args: AcceptArgs, + _zero_copy: BufVec, +) -> Result<Value, AnyError> { + let rid = args.rid as u32; + + let accept_fut = poll_fn(|cx| { + let mut state = state.borrow_mut(); + let listener_resource = state + .resource_table + .get_mut::<TcpListenerResource>(rid) + .ok_or_else(|| bad_resource("Listener has been closed"))?; + let listener = &mut listener_resource.listener; + match listener.poll_accept(cx).map_err(AnyError::from) { + Poll::Ready(Ok((stream, addr))) => { + listener_resource.untrack_task(); + Poll::Ready(Ok((stream, addr))) + } + Poll::Pending => { + listener_resource.track_task(cx)?; + Poll::Pending + } + Poll::Ready(Err(e)) => { + listener_resource.untrack_task(); + Poll::Ready(Err(e)) + } + } + }); + let (tcp_stream, _socket_addr) = accept_fut.await?; + let local_addr = tcp_stream.local_addr()?; + let remote_addr = tcp_stream.peer_addr()?; + + let mut state = state.borrow_mut(); + let rid = state.resource_table.add( + "tcpStream", + Box::new(StreamResourceHolder::new(StreamResource::TcpStream(Some( + tcp_stream, + )))), + ); + Ok(json!({ + "rid": rid, + "localAddr": { + "hostname": local_addr.ip().to_string(), + "port": local_addr.port(), + "transport": "tcp", + }, + "remoteAddr": { + "hostname": remote_addr.ip().to_string(), + "port": remote_addr.port(), + "transport": "tcp", + } + })) +} + +async fn op_accept( + state: Rc<RefCell<OpState>>, + args: Value, + bufs: BufVec, +) -> Result<Value, AnyError> { + let args: AcceptArgs = serde_json::from_value(args)?; + match args.transport.as_str() { + "tcp" => accept_tcp(state, args, bufs).await, + #[cfg(unix)] + "unix" => net_unix::accept_unix(state, args, bufs).await, + _ => Err(generic_error(format!( + "Unsupported transport protocol {}", + args.transport + ))), + } +} + +#[derive(Deserialize)] +pub(crate) struct ReceiveArgs { + pub rid: i32, + pub transport: String, +} + +async fn receive_udp( + state: Rc<RefCell<OpState>>, + args: ReceiveArgs, + zero_copy: BufVec, +) -> Result<Value, AnyError> { + assert_eq!(zero_copy.len(), 1, "Invalid number of arguments"); + let mut zero_copy = zero_copy[0].clone(); + + let rid = args.rid as u32; + + let receive_fut = poll_fn(|cx| { + let mut state = state.borrow_mut(); + let resource = state + .resource_table + .get_mut::<UdpSocketResource>(rid) + .ok_or_else(|| bad_resource("Socket has been closed"))?; + let socket = &mut resource.socket; + socket + .poll_recv_from(cx, &mut zero_copy) + .map_err(AnyError::from) + }); + let (size, remote_addr) = receive_fut.await?; + Ok(json!({ + "size": size, + "remoteAddr": { + "hostname": remote_addr.ip().to_string(), + "port": remote_addr.port(), + "transport": "udp", + } + })) +} + +async fn op_datagram_receive( + state: Rc<RefCell<OpState>>, + args: Value, + zero_copy: BufVec, +) -> Result<Value, AnyError> { + assert_eq!(zero_copy.len(), 1, "Invalid number of arguments"); + + let args: ReceiveArgs = serde_json::from_value(args)?; + match args.transport.as_str() { + "udp" => receive_udp(state, args, zero_copy).await, + #[cfg(unix)] + "unixpacket" => net_unix::receive_unix_packet(state, args, zero_copy).await, + _ => Err(generic_error(format!( + "Unsupported transport protocol {}", + args.transport + ))), + } +} + +#[derive(Deserialize)] +struct SendArgs { + rid: i32, + transport: String, + #[serde(flatten)] + transport_args: ArgsEnum, +} + +async fn op_datagram_send( + state: Rc<RefCell<OpState>>, + args: Value, + zero_copy: BufVec, +) -> Result<Value, AnyError> { + assert_eq!(zero_copy.len(), 1, "Invalid number of arguments"); + let zero_copy = zero_copy[0].clone(); + + match serde_json::from_value(args)? { + SendArgs { + rid, + transport, + transport_args: ArgsEnum::Ip(args), + } if transport == "udp" => { + { + let s = state.borrow(); + s.borrow::<Permissions>() + .check_net(&args.hostname, args.port)?; + } + let addr = resolve_addr(&args.hostname, args.port)?; + poll_fn(move |cx| { + let mut state = state.borrow_mut(); + let resource = state + .resource_table + .get_mut::<UdpSocketResource>(rid as u32) + .ok_or_else(|| bad_resource("Socket has been closed"))?; + resource + .socket + .poll_send_to(cx, &zero_copy, &addr) + .map_ok(|byte_length| json!(byte_length)) + .map_err(AnyError::from) + }) + .await + } + #[cfg(unix)] + SendArgs { + rid, + transport, + transport_args: ArgsEnum::Unix(args), + } if transport == "unixpacket" => { + let address_path = Path::new(&args.path); + { + let s = state.borrow(); + s.borrow::<Permissions>().check_write(&address_path)?; + } + let mut state = state.borrow_mut(); + let resource = state + .resource_table + .get_mut::<net_unix::UnixDatagramResource>(rid as u32) + .ok_or_else(|| { + custom_error("NotConnected", "Socket has been closed") + })?; + let socket = &mut resource.socket; + let byte_length = socket + .send_to(&zero_copy, &resource.local_addr.as_pathname().unwrap()) + .await?; + + Ok(json!(byte_length)) + } + _ => Err(type_error("Wrong argument format!")), + } +} + +#[derive(Deserialize)] +struct ConnectArgs { + transport: String, + #[serde(flatten)] + transport_args: ArgsEnum, +} + +async fn op_connect( + state: Rc<RefCell<OpState>>, + args: Value, + _zero_copy: BufVec, +) -> Result<Value, AnyError> { + match serde_json::from_value(args)? { + ConnectArgs { + transport, + transport_args: ArgsEnum::Ip(args), + } if transport == "tcp" => { + { + let state_ = state.borrow(); + state_ + .borrow::<Permissions>() + .check_net(&args.hostname, args.port)?; + } + let addr = resolve_addr(&args.hostname, args.port)?; + let tcp_stream = TcpStream::connect(&addr).await?; + let local_addr = tcp_stream.local_addr()?; + let remote_addr = tcp_stream.peer_addr()?; + + let mut state_ = state.borrow_mut(); + let rid = state_.resource_table.add( + "tcpStream", + Box::new(StreamResourceHolder::new(StreamResource::TcpStream(Some( + tcp_stream, + )))), + ); + Ok(json!({ + "rid": rid, + "localAddr": { + "hostname": local_addr.ip().to_string(), + "port": local_addr.port(), + "transport": transport, + }, + "remoteAddr": { + "hostname": remote_addr.ip().to_string(), + "port": remote_addr.port(), + "transport": transport, + } + })) + } + #[cfg(unix)] + ConnectArgs { + transport, + transport_args: ArgsEnum::Unix(args), + } if transport == "unix" => { + let address_path = Path::new(&args.path); + super::check_unstable2(&state, "Deno.connect"); + { + let state_ = state.borrow(); + state_.borrow::<Permissions>().check_read(&address_path)?; + state_.borrow::<Permissions>().check_write(&address_path)?; + } + let path = args.path; + let unix_stream = net_unix::UnixStream::connect(Path::new(&path)).await?; + let local_addr = unix_stream.local_addr()?; + let remote_addr = unix_stream.peer_addr()?; + + let mut state_ = state.borrow_mut(); + let rid = state_.resource_table.add( + "unixStream", + Box::new(StreamResourceHolder::new(StreamResource::UnixStream( + unix_stream, + ))), + ); + Ok(json!({ + "rid": rid, + "localAddr": { + "path": local_addr.as_pathname(), + "transport": transport, + }, + "remoteAddr": { + "path": remote_addr.as_pathname(), + "transport": transport, + } + })) + } + _ => Err(type_error("Wrong argument format!")), + } +} + +#[derive(Deserialize)] +struct ShutdownArgs { + rid: i32, + how: i32, +} + +fn op_shutdown( + state: &mut OpState, + args: Value, + _zero_copy: &mut [ZeroCopyBuf], +) -> Result<Value, AnyError> { + super::check_unstable(state, "Deno.shutdown"); + + let args: ShutdownArgs = serde_json::from_value(args)?; + + let rid = args.rid as u32; + let how = args.how; + + let shutdown_mode = match how { + 0 => Shutdown::Read, + 1 => Shutdown::Write, + _ => unimplemented!(), + }; + + let resource_holder = state + .resource_table + .get_mut::<StreamResourceHolder>(rid) + .ok_or_else(bad_resource_id)?; + match resource_holder.resource { + StreamResource::TcpStream(Some(ref mut stream)) => { + TcpStream::shutdown(stream, shutdown_mode)?; + } + #[cfg(unix)] + StreamResource::UnixStream(ref mut stream) => { + net_unix::UnixStream::shutdown(stream, shutdown_mode)?; + } + _ => return Err(bad_resource_id()), + } + + Ok(json!({})) +} + +#[allow(dead_code)] +struct TcpListenerResource { + listener: TcpListener, + waker: Option<futures::task::AtomicWaker>, + local_addr: SocketAddr, +} + +impl Drop for TcpListenerResource { + fn drop(&mut self) { + self.wake_task(); + } +} + +impl TcpListenerResource { + /// Track the current task so future awaiting for connection + /// can be notified when listener is closed. + /// + /// Throws an error if another task is already tracked. + pub fn track_task(&mut self, cx: &Context) -> Result<(), AnyError> { + // Currently, we only allow tracking a single accept task for a listener. + // This might be changed in the future with multiple workers. + // Caveat: TcpListener by itself also only tracks an accept task at a time. + // See https://github.com/tokio-rs/tokio/issues/846#issuecomment-454208883 + if self.waker.is_some() { + return Err(custom_error("Busy", "Another accept task is ongoing")); + } + + let waker = futures::task::AtomicWaker::new(); + waker.register(cx.waker()); + self.waker.replace(waker); + Ok(()) + } + + /// Notifies a task when listener is closed so accept future can resolve. + pub fn wake_task(&mut self) { + if let Some(waker) = self.waker.as_ref() { + waker.wake(); + } + } + + /// Stop tracking a task. + /// Happens when the task is done and thus no further tracking is needed. + pub fn untrack_task(&mut self) { + if self.waker.is_some() { + self.waker.take(); + } + } +} + +struct UdpSocketResource { + socket: UdpSocket, +} + +#[derive(Deserialize)] +struct IpListenArgs { + hostname: String, + port: u16, +} + +#[derive(Deserialize)] +#[serde(untagged)] +enum ArgsEnum { + Ip(IpListenArgs), + #[cfg(unix)] + Unix(net_unix::UnixListenArgs), +} + +#[derive(Deserialize)] +struct ListenArgs { + transport: String, + #[serde(flatten)] + transport_args: ArgsEnum, +} + +fn listen_tcp( + state: &mut OpState, + addr: SocketAddr, +) -> Result<(u32, SocketAddr), AnyError> { + let std_listener = std::net::TcpListener::bind(&addr)?; + let listener = TcpListener::from_std(std_listener)?; + let local_addr = listener.local_addr()?; + let listener_resource = TcpListenerResource { + listener, + waker: None, + local_addr, + }; + let rid = state + .resource_table + .add("tcpListener", Box::new(listener_resource)); + + Ok((rid, local_addr)) +} + +fn listen_udp( + state: &mut OpState, + addr: SocketAddr, +) -> Result<(u32, SocketAddr), AnyError> { + let std_socket = std::net::UdpSocket::bind(&addr)?; + let socket = UdpSocket::from_std(std_socket)?; + let local_addr = socket.local_addr()?; + let socket_resource = UdpSocketResource { socket }; + let rid = state + .resource_table + .add("udpSocket", Box::new(socket_resource)); + + Ok((rid, local_addr)) +} + +fn op_listen( + state: &mut OpState, + args: Value, + _zero_copy: &mut [ZeroCopyBuf], +) -> Result<Value, AnyError> { + let permissions = state.borrow::<Permissions>(); + match serde_json::from_value(args)? { + ListenArgs { + transport, + transport_args: ArgsEnum::Ip(args), + } => { + { + if transport == "udp" { + super::check_unstable(state, "Deno.listenDatagram"); + } + permissions.check_net(&args.hostname, args.port)?; + } + let addr = resolve_addr(&args.hostname, args.port)?; + let (rid, local_addr) = if transport == "tcp" { + listen_tcp(state, addr)? + } else { + listen_udp(state, addr)? + }; + debug!( + "New listener {} {}:{}", + rid, + local_addr.ip().to_string(), + local_addr.port() + ); + Ok(json!({ + "rid": rid, + "localAddr": { + "hostname": local_addr.ip().to_string(), + "port": local_addr.port(), + "transport": transport, + }, + })) + } + #[cfg(unix)] + ListenArgs { + transport, + transport_args: ArgsEnum::Unix(args), + } if transport == "unix" || transport == "unixpacket" => { + let address_path = Path::new(&args.path); + { + if transport == "unix" { + super::check_unstable(state, "Deno.listen"); + } + if transport == "unixpacket" { + super::check_unstable(state, "Deno.listenDatagram"); + } + permissions.check_read(&address_path)?; + permissions.check_write(&address_path)?; + } + let (rid, local_addr) = if transport == "unix" { + net_unix::listen_unix(state, &address_path)? + } else { + net_unix::listen_unix_packet(state, &address_path)? + }; + debug!( + "New listener {} {}", + rid, + local_addr.as_pathname().unwrap().display(), + ); + Ok(json!({ + "rid": rid, + "localAddr": { + "path": local_addr.as_pathname(), + "transport": transport, + }, + })) + } + #[cfg(unix)] + _ => Err(type_error("Wrong argument format!")), + } +} diff --git a/runtime/ops/net_unix.rs b/runtime/ops/net_unix.rs new file mode 100644 index 000000000..4c416a5a4 --- /dev/null +++ b/runtime/ops/net_unix.rs @@ -0,0 +1,151 @@ +// Copyright 2018-2020 the Deno authors. All rights reserved. MIT license. + +use crate::ops::io::StreamResource; +use crate::ops::io::StreamResourceHolder; +use crate::ops::net::AcceptArgs; +use crate::ops::net::ReceiveArgs; +use deno_core::error::bad_resource; +use deno_core::error::AnyError; +use deno_core::futures::future::poll_fn; +use deno_core::serde_json::json; +use deno_core::serde_json::Value; +use deno_core::BufVec; +use deno_core::OpState; +use serde::Deserialize; +use std::cell::RefCell; +use std::fs::remove_file; +use std::os::unix; +use std::path::Path; +use std::rc::Rc; +use std::task::Poll; +use tokio::net::UnixDatagram; +use tokio::net::UnixListener; +pub use tokio::net::UnixStream; + +struct UnixListenerResource { + listener: UnixListener, +} + +pub struct UnixDatagramResource { + pub socket: UnixDatagram, + pub local_addr: unix::net::SocketAddr, +} + +#[derive(Deserialize)] +pub struct UnixListenArgs { + pub path: String, +} + +pub(crate) async fn accept_unix( + state: Rc<RefCell<OpState>>, + args: AcceptArgs, + _bufs: BufVec, +) -> Result<Value, AnyError> { + let rid = args.rid as u32; + + let accept_fut = poll_fn(|cx| { + let mut state = state.borrow_mut(); + let listener_resource = state + .resource_table + .get_mut::<UnixListenerResource>(rid) + .ok_or_else(|| bad_resource("Listener has been closed"))?; + let listener = &mut listener_resource.listener; + use deno_core::futures::StreamExt; + match listener.poll_next_unpin(cx) { + Poll::Ready(Some(stream)) => { + //listener_resource.untrack_task(); + Poll::Ready(stream) + } + Poll::Ready(None) => todo!(), + Poll::Pending => { + //listener_resource.track_task(cx)?; + Poll::Pending + } + } + .map_err(AnyError::from) + }); + let unix_stream = accept_fut.await?; + + let local_addr = unix_stream.local_addr()?; + let remote_addr = unix_stream.peer_addr()?; + let mut state = state.borrow_mut(); + let rid = state.resource_table.add( + "unixStream", + Box::new(StreamResourceHolder::new(StreamResource::UnixStream( + unix_stream, + ))), + ); + Ok(json!({ + "rid": rid, + "localAddr": { + "path": local_addr.as_pathname(), + "transport": "unix", + }, + "remoteAddr": { + "path": remote_addr.as_pathname(), + "transport": "unix", + } + })) +} + +pub(crate) async fn receive_unix_packet( + state: Rc<RefCell<OpState>>, + args: ReceiveArgs, + bufs: BufVec, +) -> Result<Value, AnyError> { + assert_eq!(bufs.len(), 1, "Invalid number of arguments"); + + let rid = args.rid as u32; + let mut buf = bufs.into_iter().next().unwrap(); + + let mut state = state.borrow_mut(); + let resource = state + .resource_table + .get_mut::<UnixDatagramResource>(rid) + .ok_or_else(|| bad_resource("Socket has been closed"))?; + let (size, remote_addr) = resource.socket.recv_from(&mut buf).await?; + Ok(json!({ + "size": size, + "remoteAddr": { + "path": remote_addr.as_pathname(), + "transport": "unixpacket", + } + })) +} + +pub fn listen_unix( + state: &mut OpState, + addr: &Path, +) -> Result<(u32, unix::net::SocketAddr), AnyError> { + if addr.exists() { + remove_file(&addr).unwrap(); + } + let listener = UnixListener::bind(&addr)?; + let local_addr = listener.local_addr()?; + let listener_resource = UnixListenerResource { listener }; + let rid = state + .resource_table + .add("unixListener", Box::new(listener_resource)); + + Ok((rid, local_addr)) +} + +pub fn listen_unix_packet( + state: &mut OpState, + addr: &Path, +) -> Result<(u32, unix::net::SocketAddr), AnyError> { + if addr.exists() { + remove_file(&addr).unwrap(); + } + let socket = UnixDatagram::bind(&addr)?; + let local_addr = socket.local_addr()?; + let datagram_resource = UnixDatagramResource { + socket, + local_addr: local_addr.clone(), + }; + let rid = state + .resource_table + .add("unixDatagram", Box::new(datagram_resource)); + + Ok((rid, local_addr)) +} diff --git a/runtime/ops/os.rs b/runtime/ops/os.rs new file mode 100644 index 000000000..6fd404a23 --- /dev/null +++ b/runtime/ops/os.rs @@ -0,0 +1,192 @@ +// Copyright 2018-2020 the Deno authors. All rights reserved. MIT license. + +use crate::permissions::Permissions; +use deno_core::error::AnyError; +use deno_core::serde_json; +use deno_core::serde_json::json; +use deno_core::serde_json::Value; +use deno_core::url::Url; +use deno_core::OpState; +use deno_core::ZeroCopyBuf; +use serde::Deserialize; +use std::collections::HashMap; +use std::env; + +pub fn init(rt: &mut deno_core::JsRuntime) { + super::reg_json_sync(rt, "op_exit", op_exit); + super::reg_json_sync(rt, "op_env", op_env); + super::reg_json_sync(rt, "op_exec_path", op_exec_path); + super::reg_json_sync(rt, "op_set_env", op_set_env); + super::reg_json_sync(rt, "op_get_env", op_get_env); + super::reg_json_sync(rt, "op_delete_env", op_delete_env); + super::reg_json_sync(rt, "op_hostname", op_hostname); + super::reg_json_sync(rt, "op_loadavg", op_loadavg); + super::reg_json_sync(rt, "op_os_release", op_os_release); + super::reg_json_sync(rt, "op_system_memory_info", op_system_memory_info); + super::reg_json_sync(rt, "op_system_cpu_info", op_system_cpu_info); +} + +fn op_exec_path( + state: &mut OpState, + _args: Value, + _zero_copy: &mut [ZeroCopyBuf], +) -> Result<Value, AnyError> { + let current_exe = env::current_exe().unwrap(); + state + .borrow::<Permissions>() + .check_read_blind(¤t_exe, "exec_path")?; + // Now apply URL parser to current exe to get fully resolved path, otherwise + // we might get `./` and `../` bits in `exec_path` + let exe_url = Url::from_file_path(current_exe).unwrap(); + let path = exe_url.to_file_path().unwrap(); + Ok(json!(path)) +} + +#[derive(Deserialize)] +struct SetEnv { + key: String, + value: String, +} + +fn op_set_env( + state: &mut OpState, + args: Value, + _zero_copy: &mut [ZeroCopyBuf], +) -> Result<Value, AnyError> { + let args: SetEnv = serde_json::from_value(args)?; + state.borrow::<Permissions>().check_env()?; + env::set_var(args.key, args.value); + Ok(json!({})) +} + +fn op_env( + state: &mut OpState, + _args: Value, + _zero_copy: &mut [ZeroCopyBuf], +) -> Result<Value, AnyError> { + state.borrow::<Permissions>().check_env()?; + let v = env::vars().collect::<HashMap<String, String>>(); + Ok(json!(v)) +} + +#[derive(Deserialize)] +struct GetEnv { + key: String, +} + +fn op_get_env( + state: &mut OpState, + args: Value, + _zero_copy: &mut [ZeroCopyBuf], +) -> Result<Value, AnyError> { + let args: GetEnv = serde_json::from_value(args)?; + state.borrow::<Permissions>().check_env()?; + let r = match env::var(args.key) { + Err(env::VarError::NotPresent) => json!([]), + v => json!([v?]), + }; + Ok(r) +} + +#[derive(Deserialize)] +struct DeleteEnv { + key: String, +} + +fn op_delete_env( + state: &mut OpState, + args: Value, + _zero_copy: &mut [ZeroCopyBuf], +) -> Result<Value, AnyError> { + let args: DeleteEnv = serde_json::from_value(args)?; + state.borrow::<Permissions>().check_env()?; + env::remove_var(args.key); + Ok(json!({})) +} + +#[derive(Deserialize)] +struct Exit { + code: i32, +} + +fn op_exit( + _state: &mut OpState, + args: Value, + _zero_copy: &mut [ZeroCopyBuf], +) -> Result<Value, AnyError> { + let args: Exit = serde_json::from_value(args)?; + std::process::exit(args.code) +} + +fn op_loadavg( + state: &mut OpState, + _args: Value, + _zero_copy: &mut [ZeroCopyBuf], +) -> Result<Value, AnyError> { + super::check_unstable(state, "Deno.loadavg"); + state.borrow::<Permissions>().check_env()?; + match sys_info::loadavg() { + Ok(loadavg) => Ok(json!([loadavg.one, loadavg.five, loadavg.fifteen])), + Err(_) => Ok(json!([0f64, 0f64, 0f64])), + } +} + +fn op_hostname( + state: &mut OpState, + _args: Value, + _zero_copy: &mut [ZeroCopyBuf], +) -> Result<Value, AnyError> { + super::check_unstable(state, "Deno.hostname"); + state.borrow::<Permissions>().check_env()?; + let hostname = sys_info::hostname().unwrap_or_else(|_| "".to_string()); + Ok(json!(hostname)) +} + +fn op_os_release( + state: &mut OpState, + _args: Value, + _zero_copy: &mut [ZeroCopyBuf], +) -> Result<Value, AnyError> { + super::check_unstable(state, "Deno.osRelease"); + state.borrow::<Permissions>().check_env()?; + let release = sys_info::os_release().unwrap_or_else(|_| "".to_string()); + Ok(json!(release)) +} + +fn op_system_memory_info( + state: &mut OpState, + _args: Value, + _zero_copy: &mut [ZeroCopyBuf], +) -> Result<Value, AnyError> { + super::check_unstable(state, "Deno.systemMemoryInfo"); + state.borrow::<Permissions>().check_env()?; + match sys_info::mem_info() { + Ok(info) => Ok(json!({ + "total": info.total, + "free": info.free, + "available": info.avail, + "buffers": info.buffers, + "cached": info.cached, + "swapTotal": info.swap_total, + "swapFree": info.swap_free + })), + Err(_) => Ok(json!({})), + } +} + +fn op_system_cpu_info( + state: &mut OpState, + _args: Value, + _zero_copy: &mut [ZeroCopyBuf], +) -> Result<Value, AnyError> { + super::check_unstable(state, "Deno.systemCpuInfo"); + state.borrow::<Permissions>().check_env()?; + + let cores = sys_info::cpu_num().ok(); + let speed = sys_info::cpu_speed().ok(); + + Ok(json!({ + "cores": cores, + "speed": speed + })) +} diff --git a/runtime/ops/permissions.rs b/runtime/ops/permissions.rs new file mode 100644 index 000000000..7474c0e37 --- /dev/null +++ b/runtime/ops/permissions.rs @@ -0,0 +1,103 @@ +// Copyright 2018-2020 the Deno authors. All rights reserved. MIT license. + +use crate::permissions::Permissions; +use deno_core::error::custom_error; +use deno_core::error::AnyError; +use deno_core::serde_json; +use deno_core::serde_json::json; +use deno_core::serde_json::Value; +use deno_core::OpState; +use deno_core::ZeroCopyBuf; +use serde::Deserialize; +use std::path::Path; + +pub fn init(rt: &mut deno_core::JsRuntime) { + super::reg_json_sync(rt, "op_query_permission", op_query_permission); + super::reg_json_sync(rt, "op_revoke_permission", op_revoke_permission); + super::reg_json_sync(rt, "op_request_permission", op_request_permission); +} + +#[derive(Deserialize)] +struct PermissionArgs { + name: String, + url: Option<String>, + path: Option<String>, +} + +pub fn op_query_permission( + state: &mut OpState, + args: Value, + _zero_copy: &mut [ZeroCopyBuf], +) -> Result<Value, AnyError> { + let args: PermissionArgs = serde_json::from_value(args)?; + let permissions = state.borrow::<Permissions>(); + let path = args.path.as_deref(); + let perm = match args.name.as_ref() { + "read" => permissions.query_read(&path.as_deref().map(Path::new)), + "write" => permissions.query_write(&path.as_deref().map(Path::new)), + "net" => permissions.query_net_url(&args.url.as_deref())?, + "env" => permissions.query_env(), + "run" => permissions.query_run(), + "plugin" => permissions.query_plugin(), + "hrtime" => permissions.query_hrtime(), + n => { + return Err(custom_error( + "ReferenceError", + format!("No such permission name: {}", n), + )) + } + }; + Ok(json!({ "state": perm.to_string() })) +} + +pub fn op_revoke_permission( + state: &mut OpState, + args: Value, + _zero_copy: &mut [ZeroCopyBuf], +) -> Result<Value, AnyError> { + let args: PermissionArgs = serde_json::from_value(args)?; + let permissions = state.borrow_mut::<Permissions>(); + let path = args.path.as_deref(); + let perm = match args.name.as_ref() { + "read" => permissions.revoke_read(&path.as_deref().map(Path::new)), + "write" => permissions.revoke_write(&path.as_deref().map(Path::new)), + "net" => permissions.revoke_net(&args.url.as_deref())?, + "env" => permissions.revoke_env(), + "run" => permissions.revoke_run(), + "plugin" => permissions.revoke_plugin(), + "hrtime" => permissions.revoke_hrtime(), + n => { + return Err(custom_error( + "ReferenceError", + format!("No such permission name: {}", n), + )) + } + }; + Ok(json!({ "state": perm.to_string() })) +} + +pub fn op_request_permission( + state: &mut OpState, + args: Value, + _zero_copy: &mut [ZeroCopyBuf], +) -> Result<Value, AnyError> { + let args: PermissionArgs = serde_json::from_value(args)?; + let permissions = state.borrow_mut::<Permissions>(); + let path = args.path.as_deref(); + let perm = match args.name.as_ref() { + "read" => permissions.request_read(&path.as_deref().map(Path::new)), + "write" => permissions.request_write(&path.as_deref().map(Path::new)), + "net" => permissions.request_net(&args.url.as_deref())?, + "env" => permissions.request_env(), + "run" => permissions.request_run(), + "plugin" => permissions.request_plugin(), + "hrtime" => permissions.request_hrtime(), + n => { + return Err(custom_error( + "ReferenceError", + format!("No such permission name: {}", n), + )) + } + }; + Ok(json!({ "state": perm.to_string() })) +} diff --git a/runtime/ops/plugin.rs b/runtime/ops/plugin.rs new file mode 100644 index 000000000..1f3669b6f --- /dev/null +++ b/runtime/ops/plugin.rs @@ -0,0 +1,156 @@ +// Copyright 2018-2020 the Deno authors. All rights reserved. MIT license. + +use crate::metrics::metrics_op; +use crate::permissions::Permissions; +use deno_core::error::AnyError; +use deno_core::futures::prelude::*; +use deno_core::plugin_api; +use deno_core::serde_json; +use deno_core::serde_json::json; +use deno_core::serde_json::Value; +use deno_core::BufVec; +use deno_core::JsRuntime; +use deno_core::Op; +use deno_core::OpAsyncFuture; +use deno_core::OpId; +use deno_core::OpState; +use deno_core::ZeroCopyBuf; +use dlopen::symbor::Library; +use serde::Deserialize; +use std::cell::RefCell; +use std::path::PathBuf; +use std::pin::Pin; +use std::rc::Rc; +use std::task::Context; +use std::task::Poll; + +pub fn init(rt: &mut JsRuntime) { + super::reg_json_sync(rt, "op_open_plugin", op_open_plugin); +} + +#[derive(Deserialize)] +#[serde(rename_all = "camelCase")] +struct OpenPluginArgs { + filename: String, +} + +pub fn op_open_plugin( + state: &mut OpState, + args: Value, + _zero_copy: &mut [ZeroCopyBuf], +) -> Result<Value, AnyError> { + let args: OpenPluginArgs = serde_json::from_value(args)?; + let filename = PathBuf::from(&args.filename); + + super::check_unstable(state, "Deno.openPlugin"); + let permissions = state.borrow::<Permissions>(); + permissions.check_plugin(&filename)?; + + debug!("Loading Plugin: {:#?}", filename); + let plugin_lib = Library::open(filename).map(Rc::new)?; + let plugin_resource = PluginResource::new(&plugin_lib); + + let rid; + let deno_plugin_init; + { + rid = state + .resource_table + .add("plugin", Box::new(plugin_resource)); + deno_plugin_init = *unsafe { + state + .resource_table + .get::<PluginResource>(rid) + .unwrap() + .lib + .symbol::<plugin_api::InitFn>("deno_plugin_init") + .unwrap() + }; + } + + let mut interface = PluginInterface::new(state, &plugin_lib); + deno_plugin_init(&mut interface); + + Ok(json!(rid)) +} + +struct PluginResource { + lib: Rc<Library>, +} + +impl PluginResource { + fn new(lib: &Rc<Library>) -> Self { + Self { lib: lib.clone() } + } +} + +struct PluginInterface<'a> { + state: &'a mut OpState, + plugin_lib: &'a Rc<Library>, +} + +impl<'a> PluginInterface<'a> { + fn new(state: &'a mut OpState, plugin_lib: &'a Rc<Library>) -> Self { + Self { state, plugin_lib } + } +} + +impl<'a> plugin_api::Interface for PluginInterface<'a> { + /// Does the same as `core::Isolate::register_op()`, but additionally makes + /// the registered op dispatcher, as well as the op futures created by it, + /// keep reference to the plugin `Library` object, so that the plugin doesn't + /// get unloaded before all its op registrations and the futures created by + /// them are dropped. + fn register_op( + &mut self, + name: &str, + dispatch_op_fn: plugin_api::DispatchOpFn, + ) -> OpId { + let plugin_lib = self.plugin_lib.clone(); + let plugin_op_fn = move |state_rc: Rc<RefCell<OpState>>, + mut zero_copy: BufVec| { + let mut state = state_rc.borrow_mut(); + let mut interface = PluginInterface::new(&mut state, &plugin_lib); + let op = dispatch_op_fn(&mut interface, &mut zero_copy); + match op { + sync_op @ Op::Sync(..) => sync_op, + Op::Async(fut) => Op::Async(PluginOpAsyncFuture::new(&plugin_lib, fut)), + Op::AsyncUnref(fut) => { + Op::AsyncUnref(PluginOpAsyncFuture::new(&plugin_lib, fut)) + } + _ => unreachable!(), + } + }; + self + .state + .op_table + .register_op(name, metrics_op(Box::new(plugin_op_fn))) + } +} + +struct PluginOpAsyncFuture { + fut: Option<OpAsyncFuture>, + _plugin_lib: Rc<Library>, +} + +impl PluginOpAsyncFuture { + fn new(plugin_lib: &Rc<Library>, fut: OpAsyncFuture) -> Pin<Box<Self>> { + let wrapped_fut = Self { + fut: Some(fut), + _plugin_lib: plugin_lib.clone(), + }; + Box::pin(wrapped_fut) + } +} + +impl Future for PluginOpAsyncFuture { + type Output = <OpAsyncFuture as Future>::Output; + fn poll(mut self: Pin<&mut Self>, ctx: &mut Context) -> Poll<Self::Output> { + self.fut.as_mut().unwrap().poll_unpin(ctx) + } +} + +impl Drop for PluginOpAsyncFuture { + fn drop(&mut self) { + self.fut.take(); + } +} diff --git a/runtime/ops/process.rs b/runtime/ops/process.rs new file mode 100644 index 000000000..67b3d0761 --- /dev/null +++ b/runtime/ops/process.rs @@ -0,0 +1,290 @@ +// Copyright 2018-2020 the Deno authors. All rights reserved. MIT license. + +use super::io::{std_file_resource, StreamResource, StreamResourceHolder}; +use crate::permissions::Permissions; +use deno_core::error::bad_resource_id; +use deno_core::error::type_error; +use deno_core::error::AnyError; +use deno_core::futures::future::poll_fn; +use deno_core::futures::future::FutureExt; +use deno_core::serde_json; +use deno_core::serde_json::json; +use deno_core::serde_json::Value; +use deno_core::BufVec; +use deno_core::OpState; +use deno_core::ZeroCopyBuf; +use serde::Deserialize; +use std::cell::RefCell; +use std::rc::Rc; +use tokio::process::Command; + +#[cfg(unix)] +use std::os::unix::process::ExitStatusExt; + +pub fn init(rt: &mut deno_core::JsRuntime) { + super::reg_json_sync(rt, "op_run", op_run); + super::reg_json_async(rt, "op_run_status", op_run_status); + super::reg_json_sync(rt, "op_kill", op_kill); +} + +fn clone_file( + state: &mut OpState, + rid: u32, +) -> Result<std::fs::File, AnyError> { + std_file_resource(state, rid, move |r| match r { + Ok(std_file) => std_file.try_clone().map_err(AnyError::from), + Err(_) => Err(bad_resource_id()), + }) +} + +fn subprocess_stdio_map(s: &str) -> Result<std::process::Stdio, AnyError> { + match s { + "inherit" => Ok(std::process::Stdio::inherit()), + "piped" => Ok(std::process::Stdio::piped()), + "null" => Ok(std::process::Stdio::null()), + _ => Err(type_error("Invalid resource for stdio")), + } +} + +#[derive(Deserialize)] +#[serde(rename_all = "camelCase")] +struct RunArgs { + cmd: Vec<String>, + cwd: Option<String>, + env: Vec<(String, String)>, + stdin: String, + stdout: String, + stderr: String, + stdin_rid: u32, + stdout_rid: u32, + stderr_rid: u32, +} + +struct ChildResource { + child: tokio::process::Child, +} + +fn op_run( + state: &mut OpState, + args: Value, + _zero_copy: &mut [ZeroCopyBuf], +) -> Result<Value, AnyError> { + let run_args: RunArgs = serde_json::from_value(args)?; + state.borrow::<Permissions>().check_run()?; + + let args = run_args.cmd; + let env = run_args.env; + let cwd = run_args.cwd; + + let mut c = Command::new(args.get(0).unwrap()); + (1..args.len()).for_each(|i| { + let arg = args.get(i).unwrap(); + c.arg(arg); + }); + cwd.map(|d| c.current_dir(d)); + for (key, value) in &env { + c.env(key, value); + } + + // TODO: make this work with other resources, eg. sockets + if !run_args.stdin.is_empty() { + c.stdin(subprocess_stdio_map(run_args.stdin.as_ref())?); + } else { + let file = clone_file(state, run_args.stdin_rid)?; + c.stdin(file); + } + + if !run_args.stdout.is_empty() { + c.stdout(subprocess_stdio_map(run_args.stdout.as_ref())?); + } else { + let file = clone_file(state, run_args.stdout_rid)?; + c.stdout(file); + } + + if !run_args.stderr.is_empty() { + c.stderr(subprocess_stdio_map(run_args.stderr.as_ref())?); + } else { + let file = clone_file(state, run_args.stderr_rid)?; + c.stderr(file); + } + + // We want to kill child when it's closed + c.kill_on_drop(true); + + // Spawn the command. + let mut child = c.spawn()?; + let pid = child.id(); + + let stdin_rid = match child.stdin.take() { + Some(child_stdin) => { + let rid = state.resource_table.add( + "childStdin", + Box::new(StreamResourceHolder::new(StreamResource::ChildStdin( + child_stdin, + ))), + ); + Some(rid) + } + None => None, + }; + + let stdout_rid = match child.stdout.take() { + Some(child_stdout) => { + let rid = state.resource_table.add( + "childStdout", + Box::new(StreamResourceHolder::new(StreamResource::ChildStdout( + child_stdout, + ))), + ); + Some(rid) + } + None => None, + }; + + let stderr_rid = match child.stderr.take() { + Some(child_stderr) => { + let rid = state.resource_table.add( + "childStderr", + Box::new(StreamResourceHolder::new(StreamResource::ChildStderr( + child_stderr, + ))), + ); + Some(rid) + } + None => None, + }; + + let child_resource = ChildResource { child }; + let child_rid = state.resource_table.add("child", Box::new(child_resource)); + + Ok(json!({ + "rid": child_rid, + "pid": pid, + "stdinRid": stdin_rid, + "stdoutRid": stdout_rid, + "stderrRid": stderr_rid, + })) +} + +#[derive(Deserialize)] +#[serde(rename_all = "camelCase")] +struct RunStatusArgs { + rid: i32, +} + +async fn op_run_status( + state: Rc<RefCell<OpState>>, + args: Value, + _zero_copy: BufVec, +) -> Result<Value, AnyError> { + let args: RunStatusArgs = serde_json::from_value(args)?; + let rid = args.rid as u32; + + { + let s = state.borrow(); + s.borrow::<Permissions>().check_run()?; + } + + let run_status = poll_fn(|cx| { + let mut state = state.borrow_mut(); + let child_resource = state + .resource_table + .get_mut::<ChildResource>(rid) + .ok_or_else(bad_resource_id)?; + let child = &mut child_resource.child; + child.poll_unpin(cx).map_err(AnyError::from) + }) + .await?; + + let code = run_status.code(); + + #[cfg(unix)] + let signal = run_status.signal(); + #[cfg(not(unix))] + let signal = None; + + code + .or(signal) + .expect("Should have either an exit code or a signal."); + let got_signal = signal.is_some(); + + Ok(json!({ + "gotSignal": got_signal, + "exitCode": code.unwrap_or(-1), + "exitSignal": signal.unwrap_or(-1), + })) +} + +#[cfg(not(unix))] +const SIGINT: i32 = 2; +#[cfg(not(unix))] +const SIGKILL: i32 = 9; +#[cfg(not(unix))] +const SIGTERM: i32 = 15; + +#[cfg(not(unix))] +use winapi::{ + shared::minwindef::DWORD, + um::{ + handleapi::CloseHandle, + processthreadsapi::{OpenProcess, TerminateProcess}, + winnt::PROCESS_TERMINATE, + }, +}; + +#[cfg(unix)] +pub fn kill(pid: i32, signo: i32) -> Result<(), AnyError> { + use nix::sys::signal::{kill as unix_kill, Signal}; + use nix::unistd::Pid; + use std::convert::TryFrom; + let sig = Signal::try_from(signo)?; + unix_kill(Pid::from_raw(pid), Option::Some(sig)).map_err(AnyError::from) +} + +#[cfg(not(unix))] +pub fn kill(pid: i32, signal: i32) -> Result<(), AnyError> { + use std::io::Error; + match signal { + SIGINT | SIGKILL | SIGTERM => { + if pid <= 0 { + return Err(type_error("unsupported pid")); + } + unsafe { + let handle = OpenProcess(PROCESS_TERMINATE, 0, pid as DWORD); + if handle.is_null() { + return Err(Error::last_os_error().into()); + } + if TerminateProcess(handle, 1) == 0 { + CloseHandle(handle); + return Err(Error::last_os_error().into()); + } + if CloseHandle(handle) == 0 { + return Err(Error::last_os_error().into()); + } + } + } + _ => { + return Err(type_error("unsupported signal")); + } + } + Ok(()) +} + +#[derive(Deserialize)] +struct KillArgs { + pid: i32, + signo: i32, +} + +fn op_kill( + state: &mut OpState, + args: Value, + _zero_copy: &mut [ZeroCopyBuf], +) -> Result<Value, AnyError> { + super::check_unstable(state, "Deno.kill"); + state.borrow::<Permissions>().check_run()?; + + let args: KillArgs = serde_json::from_value(args)?; + kill(args.pid, args.signo)?; + Ok(json!({})) +} diff --git a/runtime/ops/runtime.rs b/runtime/ops/runtime.rs new file mode 100644 index 000000000..cb3b53d53 --- /dev/null +++ b/runtime/ops/runtime.rs @@ -0,0 +1,118 @@ +// Copyright 2018-2020 the Deno authors. All rights reserved. MIT license. + +use crate::metrics::Metrics; +use crate::permissions::Permissions; +use deno_core::error::AnyError; +use deno_core::serde_json; +use deno_core::serde_json::json; +use deno_core::serde_json::Value; +use deno_core::ModuleSpecifier; +use deno_core::OpState; +use deno_core::ZeroCopyBuf; + +pub fn init(rt: &mut deno_core::JsRuntime, main_module: ModuleSpecifier) { + { + let op_state = rt.op_state(); + let mut state = op_state.borrow_mut(); + state.put::<ModuleSpecifier>(main_module); + } + super::reg_json_sync(rt, "op_main_module", op_main_module); + super::reg_json_sync(rt, "op_metrics", op_metrics); +} + +fn op_main_module( + state: &mut OpState, + _args: Value, + _zero_copy: &mut [ZeroCopyBuf], +) -> Result<Value, AnyError> { + let main = state.borrow::<ModuleSpecifier>().to_string(); + let main_url = ModuleSpecifier::resolve_url_or_path(&main)?; + if main_url.as_url().scheme() == "file" { + let main_path = std::env::current_dir().unwrap().join(main_url.to_string()); + state + .borrow::<Permissions>() + .check_read_blind(&main_path, "main_module")?; + } + Ok(json!(&main)) +} + +fn op_metrics( + state: &mut OpState, + _args: Value, + _zero_copy: &mut [ZeroCopyBuf], +) -> Result<Value, AnyError> { + let m = state.borrow::<Metrics>(); + + Ok(json!({ + "opsDispatched": m.ops_dispatched, + "opsDispatchedSync": m.ops_dispatched_sync, + "opsDispatchedAsync": m.ops_dispatched_async, + "opsDispatchedAsyncUnref": m.ops_dispatched_async_unref, + "opsCompleted": m.ops_completed, + "opsCompletedSync": m.ops_completed_sync, + "opsCompletedAsync": m.ops_completed_async, + "opsCompletedAsyncUnref": m.ops_completed_async_unref, + "bytesSentControl": m.bytes_sent_control, + "bytesSentData": m.bytes_sent_data, + "bytesReceived": m.bytes_received + })) +} + +pub fn ppid() -> Value { + #[cfg(windows)] + { + // Adopted from rustup: + // https://github.com/rust-lang/rustup/blob/1.21.1/src/cli/self_update.rs#L1036 + // Copyright Diggory Blake, the Mozilla Corporation, and rustup contributors. + // Licensed under either of + // - Apache License, Version 2.0 + // - MIT license + use std::mem; + use winapi::shared::minwindef::DWORD; + use winapi::um::handleapi::{CloseHandle, INVALID_HANDLE_VALUE}; + use winapi::um::processthreadsapi::GetCurrentProcessId; + use winapi::um::tlhelp32::{ + CreateToolhelp32Snapshot, Process32First, Process32Next, PROCESSENTRY32, + TH32CS_SNAPPROCESS, + }; + unsafe { + // Take a snapshot of system processes, one of which is ours + // and contains our parent's pid + let snapshot = CreateToolhelp32Snapshot(TH32CS_SNAPPROCESS, 0); + if snapshot == INVALID_HANDLE_VALUE { + return serde_json::to_value(-1).unwrap(); + } + + let mut entry: PROCESSENTRY32 = mem::zeroed(); + entry.dwSize = mem::size_of::<PROCESSENTRY32>() as DWORD; + + // Iterate over system processes looking for ours + let success = Process32First(snapshot, &mut entry); + if success == 0 { + CloseHandle(snapshot); + return serde_json::to_value(-1).unwrap(); + } + + let this_pid = GetCurrentProcessId(); + while entry.th32ProcessID != this_pid { + let success = Process32Next(snapshot, &mut entry); + if success == 0 { + CloseHandle(snapshot); + return serde_json::to_value(-1).unwrap(); + } + } + CloseHandle(snapshot); + + // FIXME: Using the process ID exposes a race condition + // wherein the parent process already exited and the OS + // reassigned its ID. + let parent_id = entry.th32ParentProcessID; + serde_json::to_value(parent_id).unwrap() + } + } + #[cfg(not(windows))] + { + use std::os::unix::process::parent_id; + serde_json::to_value(parent_id()).unwrap() + } +} diff --git a/runtime/ops/signal.rs b/runtime/ops/signal.rs new file mode 100644 index 000000000..be6bc0a35 --- /dev/null +++ b/runtime/ops/signal.rs @@ -0,0 +1,142 @@ +// Copyright 2018-2020 the Deno authors. All rights reserved. MIT license. + +use deno_core::error::AnyError; +use deno_core::serde_json::Value; +use deno_core::BufVec; +use deno_core::OpState; +use deno_core::ZeroCopyBuf; +use std::cell::RefCell; +use std::rc::Rc; + +#[cfg(unix)] +use deno_core::error::bad_resource_id; +#[cfg(unix)] +use deno_core::futures::future::poll_fn; +#[cfg(unix)] +use deno_core::serde_json; +#[cfg(unix)] +use deno_core::serde_json::json; +#[cfg(unix)] +use serde::Deserialize; +#[cfg(unix)] +use std::task::Waker; +#[cfg(unix)] +use tokio::signal::unix::{signal, Signal, SignalKind}; + +pub fn init(rt: &mut deno_core::JsRuntime) { + super::reg_json_sync(rt, "op_signal_bind", op_signal_bind); + super::reg_json_sync(rt, "op_signal_unbind", op_signal_unbind); + super::reg_json_async(rt, "op_signal_poll", op_signal_poll); +} + +#[cfg(unix)] +/// The resource for signal stream. +/// The second element is the waker of polling future. +pub struct SignalStreamResource(pub Signal, pub Option<Waker>); + +#[cfg(unix)] +#[derive(Deserialize)] +struct BindSignalArgs { + signo: i32, +} + +#[cfg(unix)] +#[derive(Deserialize)] +struct SignalArgs { + rid: i32, +} + +#[cfg(unix)] +fn op_signal_bind( + state: &mut OpState, + args: Value, + _zero_copy: &mut [ZeroCopyBuf], +) -> Result<Value, AnyError> { + super::check_unstable(state, "Deno.signal"); + let args: BindSignalArgs = serde_json::from_value(args)?; + let rid = state.resource_table.add( + "signal", + Box::new(SignalStreamResource( + signal(SignalKind::from_raw(args.signo)).expect(""), + None, + )), + ); + Ok(json!({ + "rid": rid, + })) +} + +#[cfg(unix)] +async fn op_signal_poll( + state: Rc<RefCell<OpState>>, + args: Value, + _zero_copy: BufVec, +) -> Result<Value, AnyError> { + super::check_unstable2(&state, "Deno.signal"); + let args: SignalArgs = serde_json::from_value(args)?; + let rid = args.rid as u32; + + let future = poll_fn(move |cx| { + let mut state = state.borrow_mut(); + if let Some(mut signal) = + state.resource_table.get_mut::<SignalStreamResource>(rid) + { + signal.1 = Some(cx.waker().clone()); + return signal.0.poll_recv(cx); + } + std::task::Poll::Ready(None) + }); + let result = future.await; + Ok(json!({ "done": result.is_none() })) +} + +#[cfg(unix)] +pub fn op_signal_unbind( + state: &mut OpState, + args: Value, + _zero_copy: &mut [ZeroCopyBuf], +) -> Result<Value, AnyError> { + super::check_unstable(state, "Deno.signal"); + let args: SignalArgs = serde_json::from_value(args)?; + let rid = args.rid as u32; + let resource = state.resource_table.get_mut::<SignalStreamResource>(rid); + if let Some(signal) = resource { + if let Some(waker) = &signal.1 { + // Wakes up the pending poll if exists. + // This prevents the poll future from getting stuck forever. + waker.clone().wake(); + } + } + state + .resource_table + .close(rid) + .ok_or_else(bad_resource_id)?; + Ok(json!({})) +} + +#[cfg(not(unix))] +pub fn op_signal_bind( + _state: &mut OpState, + _args: Value, + _zero_copy: &mut [ZeroCopyBuf], +) -> Result<Value, AnyError> { + unimplemented!(); +} + +#[cfg(not(unix))] +fn op_signal_unbind( + _state: &mut OpState, + _args: Value, + _zero_copy: &mut [ZeroCopyBuf], +) -> Result<Value, AnyError> { + unimplemented!(); +} + +#[cfg(not(unix))] +async fn op_signal_poll( + _state: Rc<RefCell<OpState>>, + _args: Value, + _zero_copy: BufVec, +) -> Result<Value, AnyError> { + unimplemented!(); +} diff --git a/runtime/ops/timers.rs b/runtime/ops/timers.rs new file mode 100644 index 000000000..8037fd698 --- /dev/null +++ b/runtime/ops/timers.rs @@ -0,0 +1,193 @@ +// Copyright 2018-2020 the Deno authors. All rights reserved. MIT license. + +//! This module helps deno implement timers. +//! +//! As an optimization, we want to avoid an expensive calls into rust for every +//! setTimeout in JavaScript. Thus in //js/timers.ts a data structure is +//! implemented that calls into Rust for only the smallest timeout. Thus we +//! only need to be able to start, cancel and await a single timer (or Delay, as Tokio +//! calls it) for an entire Isolate. This is what is implemented here. + +use super::dispatch_minimal::minimal_op; +use super::dispatch_minimal::MinimalOp; +use crate::metrics::metrics_op; +use crate::permissions::Permissions; +use deno_core::error::type_error; +use deno_core::error::AnyError; +use deno_core::futures; +use deno_core::futures::channel::oneshot; +use deno_core::futures::FutureExt; +use deno_core::futures::TryFutureExt; +use deno_core::serde_json; +use deno_core::serde_json::json; +use deno_core::serde_json::Value; +use deno_core::BufVec; +use deno_core::OpState; +use deno_core::ZeroCopyBuf; +use serde::Deserialize; +use std::cell::RefCell; +use std::future::Future; +use std::pin::Pin; +use std::rc::Rc; +use std::thread::sleep; +use std::time::Duration; +use std::time::Instant; + +pub type StartTime = Instant; + +type TimerFuture = Pin<Box<dyn Future<Output = Result<(), ()>>>>; + +#[derive(Default)] +pub struct GlobalTimer { + tx: Option<oneshot::Sender<()>>, + pub future: Option<TimerFuture>, +} + +impl GlobalTimer { + pub fn cancel(&mut self) { + if let Some(tx) = self.tx.take() { + tx.send(()).ok(); + } + } + + pub fn new_timeout(&mut self, deadline: Instant) { + if self.tx.is_some() { + self.cancel(); + } + assert!(self.tx.is_none()); + self.future.take(); + + let (tx, rx) = oneshot::channel(); + self.tx = Some(tx); + + let delay = tokio::time::delay_until(deadline.into()); + let rx = rx + .map_err(|err| panic!("Unexpected error in receiving channel {:?}", err)); + + let fut = futures::future::select(delay, rx) + .then(|_| futures::future::ok(())) + .boxed_local(); + self.future = Some(fut); + } +} + +pub fn init(rt: &mut deno_core::JsRuntime) { + { + let op_state = rt.op_state(); + let mut state = op_state.borrow_mut(); + state.put::<GlobalTimer>(GlobalTimer::default()); + state.put::<StartTime>(StartTime::now()); + } + super::reg_json_sync(rt, "op_global_timer_stop", op_global_timer_stop); + super::reg_json_sync(rt, "op_global_timer_start", op_global_timer_start); + super::reg_json_async(rt, "op_global_timer", op_global_timer); + rt.register_op("op_now", metrics_op(minimal_op(op_now))); + super::reg_json_sync(rt, "op_sleep_sync", op_sleep_sync); +} + +fn op_global_timer_stop( + state: &mut OpState, + _args: Value, + _zero_copy: &mut [ZeroCopyBuf], +) -> Result<Value, AnyError> { + let global_timer = state.borrow_mut::<GlobalTimer>(); + global_timer.cancel(); + Ok(json!({})) +} + +#[derive(Deserialize)] +struct GlobalTimerArgs { + timeout: u64, +} + +// Set up a timer that will be later awaited by JS promise. +// It's a separate op, because canceling a timeout immediately +// after setting it caused a race condition (because Tokio timeout) +// might have been registered after next event loop tick. +// +// See https://github.com/denoland/deno/issues/7599 for more +// details. +fn op_global_timer_start( + state: &mut OpState, + args: Value, + _zero_copy: &mut [ZeroCopyBuf], +) -> Result<Value, AnyError> { + let args: GlobalTimerArgs = serde_json::from_value(args)?; + let val = args.timeout; + + let deadline = Instant::now() + Duration::from_millis(val); + let global_timer = state.borrow_mut::<GlobalTimer>(); + global_timer.new_timeout(deadline); + Ok(json!({})) +} + +async fn op_global_timer( + state: Rc<RefCell<OpState>>, + _args: Value, + _zero_copy: BufVec, +) -> Result<Value, AnyError> { + let maybe_timer_fut = { + let mut s = state.borrow_mut(); + let global_timer = s.borrow_mut::<GlobalTimer>(); + global_timer.future.take() + }; + if let Some(timer_fut) = maybe_timer_fut { + let _ = timer_fut.await; + } + Ok(json!({})) +} + +// Returns a milliseconds and nanoseconds subsec +// since the start time of the deno runtime. +// If the High precision flag is not set, the +// nanoseconds are rounded on 2ms. +fn op_now( + state: Rc<RefCell<OpState>>, + // Arguments are discarded + _sync: bool, + _x: i32, + mut zero_copy: BufVec, +) -> MinimalOp { + match zero_copy.len() { + 0 => return MinimalOp::Sync(Err(type_error("no buffer specified"))), + 1 => {} + _ => { + return MinimalOp::Sync(Err(type_error("Invalid number of arguments"))) + } + } + + let op_state = state.borrow(); + let start_time = op_state.borrow::<StartTime>(); + let seconds = start_time.elapsed().as_secs(); + let mut subsec_nanos = start_time.elapsed().subsec_nanos() as f64; + let reduced_time_precision = 2_000_000.0; // 2ms in nanoseconds + + // If the permission is not enabled + // Round the nano result on 2 milliseconds + // see: https://developer.mozilla.org/en-US/docs/Web/API/DOMHighResTimeStamp#Reduced_time_precision + if op_state.borrow::<Permissions>().check_hrtime().is_err() { + subsec_nanos -= subsec_nanos % reduced_time_precision; + } + + let result = (seconds * 1_000) as f64 + (subsec_nanos / 1_000_000.0); + + (&mut zero_copy[0]).copy_from_slice(&result.to_be_bytes()); + + MinimalOp::Sync(Ok(0)) +} + +#[derive(Deserialize)] +struct SleepArgs { + millis: u64, +} + +fn op_sleep_sync( + state: &mut OpState, + args: Value, + _zero_copy: &mut [ZeroCopyBuf], +) -> Result<Value, AnyError> { + super::check_unstable(state, "Deno.sleepSync"); + let args: SleepArgs = serde_json::from_value(args)?; + sleep(Duration::from_millis(args.millis)); + Ok(json!({})) +} diff --git a/runtime/ops/tls.rs b/runtime/ops/tls.rs new file mode 100644 index 000000000..37fd8f206 --- /dev/null +++ b/runtime/ops/tls.rs @@ -0,0 +1,431 @@ +// Copyright 2018-2020 the Deno authors. All rights reserved. MIT license. + +use super::io::{StreamResource, StreamResourceHolder}; +use crate::permissions::Permissions; +use crate::resolve_addr::resolve_addr; +use deno_core::error::bad_resource; +use deno_core::error::bad_resource_id; +use deno_core::error::custom_error; +use deno_core::error::AnyError; +use deno_core::futures; +use deno_core::futures::future::poll_fn; +use deno_core::serde_json; +use deno_core::serde_json::json; +use deno_core::serde_json::Value; +use deno_core::BufVec; +use deno_core::OpState; +use deno_core::ZeroCopyBuf; +use serde::Deserialize; +use std::cell::RefCell; +use std::convert::From; +use std::fs::File; +use std::io::BufReader; +use std::net::SocketAddr; +use std::path::Path; +use std::rc::Rc; +use std::sync::Arc; +use std::task::Context; +use std::task::Poll; +use tokio::net::TcpListener; +use tokio::net::TcpStream; +use tokio_rustls::{rustls::ClientConfig, TlsConnector}; +use tokio_rustls::{ + rustls::{ + internal::pemfile::{certs, pkcs8_private_keys, rsa_private_keys}, + Certificate, NoClientAuth, PrivateKey, ServerConfig, + }, + TlsAcceptor, +}; +use webpki::DNSNameRef; + +pub fn init(rt: &mut deno_core::JsRuntime) { + super::reg_json_async(rt, "op_start_tls", op_start_tls); + super::reg_json_async(rt, "op_connect_tls", op_connect_tls); + super::reg_json_sync(rt, "op_listen_tls", op_listen_tls); + super::reg_json_async(rt, "op_accept_tls", op_accept_tls); +} + +#[derive(Deserialize)] +#[serde(rename_all = "camelCase")] +struct ConnectTLSArgs { + transport: String, + hostname: String, + port: u16, + cert_file: Option<String>, +} + +#[derive(Deserialize)] +#[serde(rename_all = "camelCase")] +struct StartTLSArgs { + rid: u32, + cert_file: Option<String>, + hostname: String, +} + +async fn op_start_tls( + state: Rc<RefCell<OpState>>, + args: Value, + _zero_copy: BufVec, +) -> Result<Value, AnyError> { + let args: StartTLSArgs = serde_json::from_value(args)?; + let rid = args.rid as u32; + let cert_file = args.cert_file.clone(); + + let mut domain = args.hostname; + if domain.is_empty() { + domain.push_str("localhost"); + } + { + super::check_unstable2(&state, "Deno.startTls"); + let s = state.borrow(); + let permissions = s.borrow::<Permissions>(); + permissions.check_net(&domain, 0)?; + if let Some(path) = cert_file.clone() { + permissions.check_read(Path::new(&path))?; + } + } + let mut resource_holder = { + let mut state_ = state.borrow_mut(); + match state_.resource_table.remove::<StreamResourceHolder>(rid) { + Some(resource) => *resource, + None => return Err(bad_resource_id()), + } + }; + + if let StreamResource::TcpStream(ref mut tcp_stream) = + resource_holder.resource + { + let tcp_stream = tcp_stream.take().unwrap(); + let local_addr = tcp_stream.local_addr()?; + let remote_addr = tcp_stream.peer_addr()?; + let mut config = ClientConfig::new(); + config + .root_store + .add_server_trust_anchors(&webpki_roots::TLS_SERVER_ROOTS); + if let Some(path) = cert_file { + let key_file = File::open(path)?; + let reader = &mut BufReader::new(key_file); + config.root_store.add_pem_file(reader).unwrap(); + } + + let tls_connector = TlsConnector::from(Arc::new(config)); + let dnsname = + DNSNameRef::try_from_ascii_str(&domain).expect("Invalid DNS lookup"); + let tls_stream = tls_connector.connect(dnsname, tcp_stream).await?; + + let rid = { + let mut state_ = state.borrow_mut(); + state_.resource_table.add( + "clientTlsStream", + Box::new(StreamResourceHolder::new(StreamResource::ClientTlsStream( + Box::new(tls_stream), + ))), + ) + }; + Ok(json!({ + "rid": rid, + "localAddr": { + "hostname": local_addr.ip().to_string(), + "port": local_addr.port(), + "transport": "tcp", + }, + "remoteAddr": { + "hostname": remote_addr.ip().to_string(), + "port": remote_addr.port(), + "transport": "tcp", + } + })) + } else { + Err(bad_resource_id()) + } +} + +async fn op_connect_tls( + state: Rc<RefCell<OpState>>, + args: Value, + _zero_copy: BufVec, +) -> Result<Value, AnyError> { + let args: ConnectTLSArgs = serde_json::from_value(args)?; + let cert_file = args.cert_file.clone(); + { + let s = state.borrow(); + let permissions = s.borrow::<Permissions>(); + permissions.check_net(&args.hostname, args.port)?; + if let Some(path) = cert_file.clone() { + permissions.check_read(Path::new(&path))?; + } + } + let mut domain = args.hostname.clone(); + if domain.is_empty() { + domain.push_str("localhost"); + } + + let addr = resolve_addr(&args.hostname, args.port)?; + let tcp_stream = TcpStream::connect(&addr).await?; + let local_addr = tcp_stream.local_addr()?; + let remote_addr = tcp_stream.peer_addr()?; + let mut config = ClientConfig::new(); + config + .root_store + .add_server_trust_anchors(&webpki_roots::TLS_SERVER_ROOTS); + if let Some(path) = cert_file { + let key_file = File::open(path)?; + let reader = &mut BufReader::new(key_file); + config.root_store.add_pem_file(reader).unwrap(); + } + let tls_connector = TlsConnector::from(Arc::new(config)); + let dnsname = + DNSNameRef::try_from_ascii_str(&domain).expect("Invalid DNS lookup"); + let tls_stream = tls_connector.connect(dnsname, tcp_stream).await?; + let rid = { + let mut state_ = state.borrow_mut(); + state_.resource_table.add( + "clientTlsStream", + Box::new(StreamResourceHolder::new(StreamResource::ClientTlsStream( + Box::new(tls_stream), + ))), + ) + }; + Ok(json!({ + "rid": rid, + "localAddr": { + "hostname": local_addr.ip().to_string(), + "port": local_addr.port(), + "transport": args.transport, + }, + "remoteAddr": { + "hostname": remote_addr.ip().to_string(), + "port": remote_addr.port(), + "transport": args.transport, + } + })) +} + +fn load_certs(path: &str) -> Result<Vec<Certificate>, AnyError> { + let cert_file = File::open(path)?; + let reader = &mut BufReader::new(cert_file); + + let certs = certs(reader) + .map_err(|_| custom_error("InvalidData", "Unable to decode certificate"))?; + + if certs.is_empty() { + let e = custom_error("InvalidData", "No certificates found in cert file"); + return Err(e); + } + + Ok(certs) +} + +fn key_decode_err() -> AnyError { + custom_error("InvalidData", "Unable to decode key") +} + +fn key_not_found_err() -> AnyError { + custom_error("InvalidData", "No keys found in key file") +} + +/// Starts with -----BEGIN RSA PRIVATE KEY----- +fn load_rsa_keys(path: &str) -> Result<Vec<PrivateKey>, AnyError> { + let key_file = File::open(path)?; + let reader = &mut BufReader::new(key_file); + let keys = rsa_private_keys(reader).map_err(|_| key_decode_err())?; + Ok(keys) +} + +/// Starts with -----BEGIN PRIVATE KEY----- +fn load_pkcs8_keys(path: &str) -> Result<Vec<PrivateKey>, AnyError> { + let key_file = File::open(path)?; + let reader = &mut BufReader::new(key_file); + let keys = pkcs8_private_keys(reader).map_err(|_| key_decode_err())?; + Ok(keys) +} + +fn load_keys(path: &str) -> Result<Vec<PrivateKey>, AnyError> { + let path = path.to_string(); + let mut keys = load_rsa_keys(&path)?; + + if keys.is_empty() { + keys = load_pkcs8_keys(&path)?; + } + + if keys.is_empty() { + return Err(key_not_found_err()); + } + + Ok(keys) +} + +#[allow(dead_code)] +pub struct TlsListenerResource { + listener: TcpListener, + tls_acceptor: TlsAcceptor, + waker: Option<futures::task::AtomicWaker>, + local_addr: SocketAddr, +} + +impl Drop for TlsListenerResource { + fn drop(&mut self) { + self.wake_task(); + } +} + +impl TlsListenerResource { + /// Track the current task so future awaiting for connection + /// can be notified when listener is closed. + /// + /// Throws an error if another task is already tracked. + pub fn track_task(&mut self, cx: &Context) -> Result<(), AnyError> { + // Currently, we only allow tracking a single accept task for a listener. + // This might be changed in the future with multiple workers. + // Caveat: TcpListener by itself also only tracks an accept task at a time. + // See https://github.com/tokio-rs/tokio/issues/846#issuecomment-454208883 + if self.waker.is_some() { + return Err(custom_error("Busy", "Another accept task is ongoing")); + } + + let waker = futures::task::AtomicWaker::new(); + waker.register(cx.waker()); + self.waker.replace(waker); + Ok(()) + } + + /// Notifies a task when listener is closed so accept future can resolve. + pub fn wake_task(&mut self) { + if let Some(waker) = self.waker.as_ref() { + waker.wake(); + } + } + + /// Stop tracking a task. + /// Happens when the task is done and thus no further tracking is needed. + pub fn untrack_task(&mut self) { + self.waker.take(); + } +} + +#[derive(Deserialize)] +#[serde(rename_all = "camelCase")] +struct ListenTlsArgs { + transport: String, + hostname: String, + port: u16, + cert_file: String, + key_file: String, +} + +fn op_listen_tls( + state: &mut OpState, + args: Value, + _zero_copy: &mut [ZeroCopyBuf], +) -> Result<Value, AnyError> { + let args: ListenTlsArgs = serde_json::from_value(args)?; + assert_eq!(args.transport, "tcp"); + + let cert_file = args.cert_file; + let key_file = args.key_file; + { + let permissions = state.borrow::<Permissions>(); + permissions.check_net(&args.hostname, args.port)?; + permissions.check_read(Path::new(&cert_file))?; + permissions.check_read(Path::new(&key_file))?; + } + let mut config = ServerConfig::new(NoClientAuth::new()); + config + .set_single_cert(load_certs(&cert_file)?, load_keys(&key_file)?.remove(0)) + .expect("invalid key or certificate"); + let tls_acceptor = TlsAcceptor::from(Arc::new(config)); + let addr = resolve_addr(&args.hostname, args.port)?; + let std_listener = std::net::TcpListener::bind(&addr)?; + let listener = TcpListener::from_std(std_listener)?; + let local_addr = listener.local_addr()?; + let tls_listener_resource = TlsListenerResource { + listener, + tls_acceptor, + waker: None, + local_addr, + }; + + let rid = state + .resource_table + .add("tlsListener", Box::new(tls_listener_resource)); + + Ok(json!({ + "rid": rid, + "localAddr": { + "hostname": local_addr.ip().to_string(), + "port": local_addr.port(), + "transport": args.transport, + }, + })) +} + +#[derive(Deserialize)] +struct AcceptTlsArgs { + rid: i32, +} + +async fn op_accept_tls( + state: Rc<RefCell<OpState>>, + args: Value, + _zero_copy: BufVec, +) -> Result<Value, AnyError> { + let args: AcceptTlsArgs = serde_json::from_value(args)?; + let rid = args.rid as u32; + let accept_fut = poll_fn(|cx| { + let mut state = state.borrow_mut(); + let listener_resource = state + .resource_table + .get_mut::<TlsListenerResource>(rid) + .ok_or_else(|| bad_resource("Listener has been closed"))?; + let listener = &mut listener_resource.listener; + match listener.poll_accept(cx).map_err(AnyError::from) { + Poll::Ready(Ok((stream, addr))) => { + listener_resource.untrack_task(); + Poll::Ready(Ok((stream, addr))) + } + Poll::Pending => { + listener_resource.track_task(cx)?; + Poll::Pending + } + Poll::Ready(Err(e)) => { + listener_resource.untrack_task(); + Poll::Ready(Err(e)) + } + } + }); + let (tcp_stream, _socket_addr) = accept_fut.await?; + let local_addr = tcp_stream.local_addr()?; + let remote_addr = tcp_stream.peer_addr()?; + let tls_acceptor = { + let state_ = state.borrow(); + let resource = state_ + .resource_table + .get::<TlsListenerResource>(rid) + .ok_or_else(bad_resource_id) + .expect("Can't find tls listener"); + resource.tls_acceptor.clone() + }; + let tls_stream = tls_acceptor.accept(tcp_stream).await?; + let rid = { + let mut state_ = state.borrow_mut(); + state_.resource_table.add( + "serverTlsStream", + Box::new(StreamResourceHolder::new(StreamResource::ServerTlsStream( + Box::new(tls_stream), + ))), + ) + }; + Ok(json!({ + "rid": rid, + "localAddr": { + "transport": "tcp", + "hostname": local_addr.ip().to_string(), + "port": local_addr.port() + }, + "remoteAddr": { + "transport": "tcp", + "hostname": remote_addr.ip().to_string(), + "port": remote_addr.port() + } + })) +} diff --git a/runtime/ops/tty.rs b/runtime/ops/tty.rs new file mode 100644 index 000000000..ad66bcf1a --- /dev/null +++ b/runtime/ops/tty.rs @@ -0,0 +1,334 @@ +// Copyright 2018-2020 the Deno authors. All rights reserved. MIT license. + +use super::io::std_file_resource; +use super::io::StreamResource; +use super::io::StreamResourceHolder; +use deno_core::error::bad_resource_id; +use deno_core::error::not_supported; +use deno_core::error::resource_unavailable; +use deno_core::error::AnyError; +use deno_core::serde_json; +use deno_core::serde_json::json; +use deno_core::serde_json::Value; +use deno_core::OpState; +use deno_core::ZeroCopyBuf; +use serde::Deserialize; +use serde::Serialize; +use std::io::Error; + +#[cfg(unix)] +use nix::sys::termios; + +#[cfg(windows)] +use deno_core::error::custom_error; +#[cfg(windows)] +use winapi::shared::minwindef::DWORD; +#[cfg(windows)] +use winapi::um::wincon; +#[cfg(windows)] +const RAW_MODE_MASK: DWORD = wincon::ENABLE_LINE_INPUT + | wincon::ENABLE_ECHO_INPUT + | wincon::ENABLE_PROCESSED_INPUT; + +#[cfg(windows)] +fn get_windows_handle( + f: &std::fs::File, +) -> Result<std::os::windows::io::RawHandle, AnyError> { + use std::os::windows::io::AsRawHandle; + use winapi::um::handleapi; + + let handle = f.as_raw_handle(); + if handle == handleapi::INVALID_HANDLE_VALUE { + return Err(Error::last_os_error().into()); + } else if handle.is_null() { + return Err(custom_error("ReferenceError", "null handle")); + } + Ok(handle) +} + +pub fn init(rt: &mut deno_core::JsRuntime) { + super::reg_json_sync(rt, "op_set_raw", op_set_raw); + super::reg_json_sync(rt, "op_isatty", op_isatty); + super::reg_json_sync(rt, "op_console_size", op_console_size); +} + +#[derive(Deserialize)] +#[serde(rename_all = "camelCase")] +struct SetRawOptions { + cbreak: bool, +} + +#[derive(Deserialize)] +struct SetRawArgs { + rid: u32, + mode: bool, + options: SetRawOptions, +} + +fn op_set_raw( + state: &mut OpState, + args: Value, + _zero_copy: &mut [ZeroCopyBuf], +) -> Result<Value, AnyError> { + super::check_unstable(state, "Deno.setRaw"); + + let args: SetRawArgs = serde_json::from_value(args)?; + let rid = args.rid; + let is_raw = args.mode; + let cbreak = args.options.cbreak; + + // From https://github.com/kkawakam/rustyline/blob/master/src/tty/windows.rs + // and https://github.com/kkawakam/rustyline/blob/master/src/tty/unix.rs + // and https://github.com/crossterm-rs/crossterm/blob/e35d4d2c1cc4c919e36d242e014af75f6127ab50/src/terminal/sys/windows.rs + // Copyright (c) 2015 Katsu Kawakami & Rustyline authors. MIT license. + // Copyright (c) 2019 Timon. MIT license. + #[cfg(windows)] + { + use std::os::windows::io::AsRawHandle; + use winapi::shared::minwindef::FALSE; + use winapi::um::{consoleapi, handleapi}; + + let resource_holder = + state.resource_table.get_mut::<StreamResourceHolder>(rid); + if resource_holder.is_none() { + return Err(bad_resource_id()); + } + if cbreak { + return Err(not_supported()); + } + let resource_holder = resource_holder.unwrap(); + + // For now, only stdin. + let handle = match &mut resource_holder.resource { + StreamResource::FsFile(ref mut option_file_metadata) => { + if let Some((tokio_file, metadata)) = option_file_metadata.take() { + match tokio_file.try_into_std() { + Ok(std_file) => { + let raw_handle = std_file.as_raw_handle(); + // Turn the std_file handle back into a tokio file, put it back + // in the resource table. + let tokio_file = tokio::fs::File::from_std(std_file); + resource_holder.resource = + StreamResource::FsFile(Some((tokio_file, metadata))); + // return the result. + raw_handle + } + Err(tokio_file) => { + // This function will return an error containing the file if + // some operation is in-flight. + resource_holder.resource = + StreamResource::FsFile(Some((tokio_file, metadata))); + return Err(resource_unavailable()); + } + } + } else { + return Err(resource_unavailable()); + } + } + _ => { + return Err(bad_resource_id()); + } + }; + + if handle == handleapi::INVALID_HANDLE_VALUE { + return Err(Error::last_os_error().into()); + } else if handle.is_null() { + return Err(custom_error("ReferenceError", "null handle")); + } + let mut original_mode: DWORD = 0; + if unsafe { consoleapi::GetConsoleMode(handle, &mut original_mode) } + == FALSE + { + return Err(Error::last_os_error().into()); + } + let new_mode = if is_raw { + original_mode & !RAW_MODE_MASK + } else { + original_mode | RAW_MODE_MASK + }; + if unsafe { consoleapi::SetConsoleMode(handle, new_mode) } == FALSE { + return Err(Error::last_os_error().into()); + } + + Ok(json!({})) + } + #[cfg(unix)] + { + use std::os::unix::io::AsRawFd; + + let resource_holder = + state.resource_table.get_mut::<StreamResourceHolder>(rid); + if resource_holder.is_none() { + return Err(bad_resource_id()); + } + + if is_raw { + let (raw_fd, maybe_tty_mode) = + match &mut resource_holder.unwrap().resource { + StreamResource::FsFile(Some((f, ref mut metadata))) => { + (f.as_raw_fd(), &mut metadata.tty.mode) + } + StreamResource::FsFile(None) => return Err(resource_unavailable()), + _ => { + return Err(not_supported()); + } + }; + + if maybe_tty_mode.is_none() { + // Save original mode. + let original_mode = termios::tcgetattr(raw_fd)?; + maybe_tty_mode.replace(original_mode); + } + + let mut raw = maybe_tty_mode.clone().unwrap(); + + raw.input_flags &= !(termios::InputFlags::BRKINT + | termios::InputFlags::ICRNL + | termios::InputFlags::INPCK + | termios::InputFlags::ISTRIP + | termios::InputFlags::IXON); + + raw.control_flags |= termios::ControlFlags::CS8; + + raw.local_flags &= !(termios::LocalFlags::ECHO + | termios::LocalFlags::ICANON + | termios::LocalFlags::IEXTEN); + if !cbreak { + raw.local_flags &= !(termios::LocalFlags::ISIG); + } + raw.control_chars[termios::SpecialCharacterIndices::VMIN as usize] = 1; + raw.control_chars[termios::SpecialCharacterIndices::VTIME as usize] = 0; + termios::tcsetattr(raw_fd, termios::SetArg::TCSADRAIN, &raw)?; + Ok(json!({})) + } else { + // Try restore saved mode. + let (raw_fd, maybe_tty_mode) = + match &mut resource_holder.unwrap().resource { + StreamResource::FsFile(Some((f, ref mut metadata))) => { + (f.as_raw_fd(), &mut metadata.tty.mode) + } + StreamResource::FsFile(None) => { + return Err(resource_unavailable()); + } + _ => { + return Err(bad_resource_id()); + } + }; + + if let Some(mode) = maybe_tty_mode.take() { + termios::tcsetattr(raw_fd, termios::SetArg::TCSADRAIN, &mode)?; + } + + Ok(json!({})) + } + } +} + +#[derive(Deserialize)] +struct IsattyArgs { + rid: u32, +} + +fn op_isatty( + state: &mut OpState, + args: Value, + _zero_copy: &mut [ZeroCopyBuf], +) -> Result<Value, AnyError> { + let args: IsattyArgs = serde_json::from_value(args)?; + let rid = args.rid; + + let isatty: bool = std_file_resource(state, rid as u32, move |r| match r { + Ok(std_file) => { + #[cfg(windows)] + { + use winapi::um::consoleapi; + + let handle = get_windows_handle(&std_file)?; + let mut test_mode: DWORD = 0; + // If I cannot get mode out of console, it is not a console. + Ok(unsafe { consoleapi::GetConsoleMode(handle, &mut test_mode) != 0 }) + } + #[cfg(unix)] + { + use std::os::unix::io::AsRawFd; + let raw_fd = std_file.as_raw_fd(); + Ok(unsafe { libc::isatty(raw_fd as libc::c_int) == 1 }) + } + } + Err(StreamResource::FsFile(_)) => unreachable!(), + _ => Ok(false), + })?; + Ok(json!(isatty)) +} + +#[derive(Deserialize)] +struct ConsoleSizeArgs { + rid: u32, +} + +#[derive(Serialize)] +struct ConsoleSize { + columns: u32, + rows: u32, +} + +fn op_console_size( + state: &mut OpState, + args: Value, + _zero_copy: &mut [ZeroCopyBuf], +) -> Result<Value, AnyError> { + super::check_unstable(state, "Deno.consoleSize"); + + let args: ConsoleSizeArgs = serde_json::from_value(args)?; + let rid = args.rid; + + let size = std_file_resource(state, rid as u32, move |r| match r { + Ok(std_file) => { + #[cfg(windows)] + { + use std::os::windows::io::AsRawHandle; + let handle = std_file.as_raw_handle(); + + unsafe { + let mut bufinfo: winapi::um::wincon::CONSOLE_SCREEN_BUFFER_INFO = + std::mem::zeroed(); + + if winapi::um::wincon::GetConsoleScreenBufferInfo( + handle, + &mut bufinfo, + ) == 0 + { + return Err(Error::last_os_error().into()); + } + + Ok(ConsoleSize { + columns: bufinfo.dwSize.X as u32, + rows: bufinfo.dwSize.Y as u32, + }) + } + } + + #[cfg(unix)] + { + use std::os::unix::io::AsRawFd; + + let fd = std_file.as_raw_fd(); + unsafe { + let mut size: libc::winsize = std::mem::zeroed(); + if libc::ioctl(fd, libc::TIOCGWINSZ, &mut size as *mut _) != 0 { + return Err(Error::last_os_error().into()); + } + + // TODO (caspervonb) return a tuple instead + Ok(ConsoleSize { + columns: size.ws_col as u32, + rows: size.ws_row as u32, + }) + } + } + } + Err(_) => Err(bad_resource_id()), + })?; + + Ok(json!(size)) +} diff --git a/runtime/ops/web_worker.rs b/runtime/ops/web_worker.rs new file mode 100644 index 000000000..d88330a04 --- /dev/null +++ b/runtime/ops/web_worker.rs @@ -0,0 +1,37 @@ +// Copyright 2018-2020 the Deno authors. All rights reserved. MIT license. + +use crate::web_worker::WebWorkerHandle; +use crate::web_worker::WorkerEvent; +use deno_core::futures::channel::mpsc; +use deno_core::serde_json::json; + +pub fn init( + rt: &mut deno_core::JsRuntime, + sender: mpsc::Sender<WorkerEvent>, + handle: WebWorkerHandle, +) { + // Post message to host as guest worker. + let sender_ = sender.clone(); + super::reg_json_sync( + rt, + "op_worker_post_message", + move |_state, _args, bufs| { + assert_eq!(bufs.len(), 1, "Invalid number of arguments"); + let msg_buf: Box<[u8]> = (*bufs[0]).into(); + sender_ + .clone() + .try_send(WorkerEvent::Message(msg_buf)) + .expect("Failed to post message to host"); + Ok(json!({})) + }, + ); + + // Notify host that guest worker closes. + super::reg_json_sync(rt, "op_worker_close", move |_state, _args, _bufs| { + // Notify parent that we're finished + sender.clone().close_channel(); + // Terminate execution of current worker + handle.terminate(); + Ok(json!({})) + }); +} diff --git a/runtime/ops/websocket.rs b/runtime/ops/websocket.rs new file mode 100644 index 000000000..a8c591a33 --- /dev/null +++ b/runtime/ops/websocket.rs @@ -0,0 +1,326 @@ +// Copyright 2018-2020 the Deno authors. All rights reserved. MIT license. + +use crate::permissions::Permissions; +use core::task::Poll; +use deno_core::error::bad_resource_id; +use deno_core::error::type_error; +use deno_core::error::AnyError; +use deno_core::futures::future::poll_fn; +use deno_core::futures::StreamExt; +use deno_core::futures::{ready, SinkExt}; +use deno_core::serde_json::json; +use deno_core::serde_json::Value; +use deno_core::url; +use deno_core::BufVec; +use deno_core::OpState; +use deno_core::{serde_json, ZeroCopyBuf}; +use http::{Method, Request, Uri}; +use serde::Deserialize; +use std::borrow::Cow; +use std::cell::RefCell; +use std::fs::File; +use std::io::BufReader; +use std::rc::Rc; +use std::sync::Arc; +use tokio::net::TcpStream; +use tokio_rustls::{rustls::ClientConfig, TlsConnector}; +use tokio_tungstenite::stream::Stream as StreamSwitcher; +use tokio_tungstenite::tungstenite::Error as TungsteniteError; +use tokio_tungstenite::tungstenite::{ + handshake::client::Response, protocol::frame::coding::CloseCode, + protocol::CloseFrame, Message, +}; +use tokio_tungstenite::{client_async, WebSocketStream}; +use webpki::DNSNameRef; + +#[derive(Clone)] +struct WsCaFile(String); +#[derive(Clone)] +struct WsUserAgent(String); + +pub fn init( + rt: &mut deno_core::JsRuntime, + maybe_ca_file: Option<&str>, + user_agent: String, +) { + { + let op_state = rt.op_state(); + let mut state = op_state.borrow_mut(); + if let Some(ca_file) = maybe_ca_file { + state.put::<WsCaFile>(WsCaFile(ca_file.to_string())); + } + state.put::<WsUserAgent>(WsUserAgent(user_agent)); + } + super::reg_json_sync(rt, "op_ws_check_permission", op_ws_check_permission); + super::reg_json_async(rt, "op_ws_create", op_ws_create); + super::reg_json_async(rt, "op_ws_send", op_ws_send); + super::reg_json_async(rt, "op_ws_close", op_ws_close); + super::reg_json_async(rt, "op_ws_next_event", op_ws_next_event); +} + +type MaybeTlsStream = + StreamSwitcher<TcpStream, tokio_rustls::client::TlsStream<TcpStream>>; + +type WsStream = WebSocketStream<MaybeTlsStream>; + +#[derive(Deserialize)] +#[serde(rename_all = "camelCase")] +struct CheckPermissionArgs { + url: String, +} + +// This op is needed because creating a WS instance in JavaScript is a sync +// operation and should throw error when permissions are not fullfiled, +// but actual op that connects WS is async. +pub fn op_ws_check_permission( + state: &mut OpState, + args: Value, + _zero_copy: &mut [ZeroCopyBuf], +) -> Result<Value, AnyError> { + let args: CheckPermissionArgs = serde_json::from_value(args)?; + + state + .borrow::<Permissions>() + .check_net_url(&url::Url::parse(&args.url)?)?; + + Ok(json!({})) +} + +#[derive(Deserialize)] +#[serde(rename_all = "camelCase")] +struct CreateArgs { + url: String, + protocols: String, +} + +pub async fn op_ws_create( + state: Rc<RefCell<OpState>>, + args: Value, + _bufs: BufVec, +) -> Result<Value, AnyError> { + let args: CreateArgs = serde_json::from_value(args)?; + + { + let s = state.borrow(); + s.borrow::<Permissions>() + .check_net_url(&url::Url::parse(&args.url)?) + .expect( + "Permission check should have been done in op_ws_check_permission", + ); + } + + let maybe_ca_file = state.borrow().try_borrow::<WsCaFile>().cloned(); + let user_agent = state.borrow().borrow::<WsUserAgent>().0.clone(); + let uri: Uri = args.url.parse()?; + let mut request = Request::builder().method(Method::GET).uri(&uri); + + request = request.header("User-Agent", user_agent); + + if !args.protocols.is_empty() { + request = request.header("Sec-WebSocket-Protocol", args.protocols); + } + + let request = request.body(())?; + let domain = &uri.host().unwrap().to_string(); + let port = &uri.port_u16().unwrap_or(match uri.scheme_str() { + Some("wss") => 443, + Some("ws") => 80, + _ => unreachable!(), + }); + let addr = format!("{}:{}", domain, port); + let try_socket = TcpStream::connect(addr).await; + let tcp_socket = match try_socket.map_err(TungsteniteError::Io) { + Ok(socket) => socket, + Err(_) => return Ok(json!({"success": false})), + }; + + let socket: MaybeTlsStream = match uri.scheme_str() { + Some("ws") => StreamSwitcher::Plain(tcp_socket), + Some("wss") => { + let mut config = ClientConfig::new(); + config + .root_store + .add_server_trust_anchors(&webpki_roots::TLS_SERVER_ROOTS); + + if let Some(ws_ca_file) = maybe_ca_file { + let key_file = File::open(ws_ca_file.0)?; + let reader = &mut BufReader::new(key_file); + config.root_store.add_pem_file(reader).unwrap(); + } + + let tls_connector = TlsConnector::from(Arc::new(config)); + let dnsname = + DNSNameRef::try_from_ascii_str(&domain).expect("Invalid DNS lookup"); + let tls_socket = tls_connector.connect(dnsname, tcp_socket).await?; + StreamSwitcher::Tls(tls_socket) + } + _ => unreachable!(), + }; + + let (stream, response): (WsStream, Response) = + client_async(request, socket).await.map_err(|err| { + type_error(format!( + "failed to connect to WebSocket: {}", + err.to_string() + )) + })?; + + let mut state = state.borrow_mut(); + let rid = state + .resource_table + .add("webSocketStream", Box::new(stream)); + + let protocol = match response.headers().get("Sec-WebSocket-Protocol") { + Some(header) => header.to_str().unwrap(), + None => "", + }; + let extensions = response + .headers() + .get_all("Sec-WebSocket-Extensions") + .iter() + .map(|header| header.to_str().unwrap()) + .collect::<String>(); + Ok(json!({ + "success": true, + "rid": rid, + "protocol": protocol, + "extensions": extensions + })) +} + +#[derive(Deserialize)] +#[serde(rename_all = "camelCase")] +struct SendArgs { + rid: u32, + text: Option<String>, +} + +pub async fn op_ws_send( + state: Rc<RefCell<OpState>>, + args: Value, + bufs: BufVec, +) -> Result<Value, AnyError> { + let args: SendArgs = serde_json::from_value(args)?; + + let mut maybe_msg = Some(match args.text { + Some(text) => Message::Text(text), + None => Message::Binary(bufs[0].to_vec()), + }); + let rid = args.rid; + + poll_fn(move |cx| { + let mut state = state.borrow_mut(); + let stream = state + .resource_table + .get_mut::<WsStream>(rid) + .ok_or_else(bad_resource_id)?; + + // TODO(ry) Handle errors below instead of unwrap. + // Need to map `TungsteniteError` to `AnyError`. + ready!(stream.poll_ready_unpin(cx)).unwrap(); + if let Some(msg) = maybe_msg.take() { + stream.start_send_unpin(msg).unwrap(); + } + ready!(stream.poll_flush_unpin(cx)).unwrap(); + + Poll::Ready(Ok(json!({}))) + }) + .await +} + +#[derive(Deserialize)] +#[serde(rename_all = "camelCase")] +struct CloseArgs { + rid: u32, + code: Option<u16>, + reason: Option<String>, +} + +pub async fn op_ws_close( + state: Rc<RefCell<OpState>>, + args: Value, + _bufs: BufVec, +) -> Result<Value, AnyError> { + let args: CloseArgs = serde_json::from_value(args)?; + let rid = args.rid; + let mut maybe_msg = Some(Message::Close(args.code.map(|c| CloseFrame { + code: CloseCode::from(c), + reason: match args.reason { + Some(reason) => Cow::from(reason), + None => Default::default(), + }, + }))); + + poll_fn(move |cx| { + let mut state = state.borrow_mut(); + let stream = state + .resource_table + .get_mut::<WsStream>(rid) + .ok_or_else(bad_resource_id)?; + + // TODO(ry) Handle errors below instead of unwrap. + // Need to map `TungsteniteError` to `AnyError`. + ready!(stream.poll_ready_unpin(cx)).unwrap(); + if let Some(msg) = maybe_msg.take() { + stream.start_send_unpin(msg).unwrap(); + } + ready!(stream.poll_flush_unpin(cx)).unwrap(); + ready!(stream.poll_close_unpin(cx)).unwrap(); + + Poll::Ready(Ok(json!({}))) + }) + .await +} + +#[derive(Deserialize)] +#[serde(rename_all = "camelCase")] +struct NextEventArgs { + rid: u32, +} + +pub async fn op_ws_next_event( + state: Rc<RefCell<OpState>>, + args: Value, + _bufs: BufVec, +) -> Result<Value, AnyError> { + let args: NextEventArgs = serde_json::from_value(args)?; + poll_fn(move |cx| { + let mut state = state.borrow_mut(); + let stream = state + .resource_table + .get_mut::<WsStream>(args.rid) + .ok_or_else(bad_resource_id)?; + stream + .poll_next_unpin(cx) + .map(|val| { + match val { + Some(Ok(Message::Text(text))) => json!({ + "type": "string", + "data": text + }), + Some(Ok(Message::Binary(data))) => { + // TODO(ry): don't use json to send binary data. + json!({ + "type": "binary", + "data": data + }) + } + Some(Ok(Message::Close(Some(frame)))) => json!({ + "type": "close", + "code": u16::from(frame.code), + "reason": frame.reason.as_ref() + }), + Some(Ok(Message::Close(None))) => json!({ "type": "close" }), + Some(Ok(Message::Ping(_))) => json!({"type": "ping"}), + Some(Ok(Message::Pong(_))) => json!({"type": "pong"}), + Some(Err(_)) => json!({"type": "error"}), + None => { + state.resource_table.close(args.rid).unwrap(); + json!({"type": "closed"}) + } + } + }) + .map(Ok) + }) + .await +} diff --git a/runtime/ops/worker_host.rs b/runtime/ops/worker_host.rs new file mode 100644 index 000000000..871e4b9fe --- /dev/null +++ b/runtime/ops/worker_host.rs @@ -0,0 +1,318 @@ +// Copyright 2018-2020 the Deno authors. All rights reserved. MIT license. + +use crate::permissions::Permissions; +use crate::web_worker::run_web_worker; +use crate::web_worker::WebWorker; +use crate::web_worker::WebWorkerHandle; +use crate::web_worker::WorkerEvent; +use deno_core::error::generic_error; +use deno_core::error::AnyError; +use deno_core::error::JsError; +use deno_core::futures::channel::mpsc; +use deno_core::serde_json; +use deno_core::serde_json::json; +use deno_core::serde_json::Value; +use deno_core::BufVec; +use deno_core::ModuleSpecifier; +use deno_core::OpState; +use deno_core::ZeroCopyBuf; +use serde::Deserialize; +use std::cell::RefCell; +use std::collections::HashMap; +use std::convert::From; +use std::rc::Rc; +use std::sync::Arc; +use std::thread::JoinHandle; + +pub struct CreateWebWorkerArgs { + pub name: String, + pub worker_id: u32, + pub permissions: Permissions, + pub main_module: ModuleSpecifier, + pub use_deno_namespace: bool, +} + +pub type CreateWebWorkerCb = + dyn Fn(CreateWebWorkerArgs) -> WebWorker + Sync + Send; + +/// A holder for callback that is used to create a new +/// WebWorker. It's a struct instead of a type alias +/// because `GothamState` used in `OpState` overrides +/// value if type alises have the same underlying type +#[derive(Clone)] +pub struct CreateWebWorkerCbHolder(Arc<CreateWebWorkerCb>); + +#[derive(Deserialize)] +struct HostUnhandledErrorArgs { + message: String, +} + +pub fn init( + rt: &mut deno_core::JsRuntime, + sender: Option<mpsc::Sender<WorkerEvent>>, + create_web_worker_cb: Arc<CreateWebWorkerCb>, +) { + { + let op_state = rt.op_state(); + let mut state = op_state.borrow_mut(); + state.put::<WorkersTable>(WorkersTable::default()); + state.put::<WorkerId>(WorkerId::default()); + + let create_module_loader = CreateWebWorkerCbHolder(create_web_worker_cb); + state.put::<CreateWebWorkerCbHolder>(create_module_loader); + } + super::reg_json_sync(rt, "op_create_worker", op_create_worker); + super::reg_json_sync( + rt, + "op_host_terminate_worker", + op_host_terminate_worker, + ); + super::reg_json_sync(rt, "op_host_post_message", op_host_post_message); + super::reg_json_async(rt, "op_host_get_message", op_host_get_message); + super::reg_json_sync( + rt, + "op_host_unhandled_error", + move |_state, args, _zero_copy| { + if let Some(mut sender) = sender.clone() { + let args: HostUnhandledErrorArgs = serde_json::from_value(args)?; + sender + .try_send(WorkerEvent::Error(generic_error(args.message))) + .expect("Failed to propagate error event to parent worker"); + Ok(json!(true)) + } else { + Err(generic_error("Cannot be called from main worker.")) + } + }, + ); +} + +pub struct WorkerThread { + join_handle: JoinHandle<Result<(), AnyError>>, + worker_handle: WebWorkerHandle, +} + +pub type WorkersTable = HashMap<u32, WorkerThread>; +pub type WorkerId = u32; + +#[derive(Deserialize)] +#[serde(rename_all = "camelCase")] +struct CreateWorkerArgs { + name: Option<String>, + specifier: String, + has_source_code: bool, + source_code: String, + use_deno_namespace: bool, +} + +/// Create worker as the host +fn op_create_worker( + state: &mut OpState, + args: Value, + _data: &mut [ZeroCopyBuf], +) -> Result<Value, AnyError> { + let args: CreateWorkerArgs = serde_json::from_value(args)?; + + let specifier = args.specifier.clone(); + let maybe_source_code = if args.has_source_code { + Some(args.source_code.clone()) + } else { + None + }; + let args_name = args.name; + let use_deno_namespace = args.use_deno_namespace; + if use_deno_namespace { + super::check_unstable(state, "Worker.deno"); + } + let permissions = state.borrow::<Permissions>().clone(); + let worker_id = state.take::<WorkerId>(); + let create_module_loader = state.take::<CreateWebWorkerCbHolder>(); + state.put::<CreateWebWorkerCbHolder>(create_module_loader.clone()); + state.put::<WorkerId>(worker_id + 1); + + let module_specifier = ModuleSpecifier::resolve_url(&specifier)?; + let worker_name = args_name.unwrap_or_else(|| "".to_string()); + + let (handle_sender, handle_receiver) = + std::sync::mpsc::sync_channel::<Result<WebWorkerHandle, AnyError>>(1); + + // Setup new thread + let thread_builder = + std::thread::Builder::new().name(format!("deno-worker-{}", worker_id)); + + // Spawn it + let join_handle = thread_builder.spawn(move || { + // Any error inside this block is terminal: + // - JS worker is useless - meaning it throws an exception and can't do anything else, + // all action done upon it should be noops + // - newly spawned thread exits + + let worker = (create_module_loader.0)(CreateWebWorkerArgs { + name: worker_name, + worker_id, + permissions, + main_module: module_specifier.clone(), + use_deno_namespace, + }); + + // Send thread safe handle to newly created worker to host thread + handle_sender.send(Ok(worker.thread_safe_handle())).unwrap(); + drop(handle_sender); + + // At this point the only method of communication with host + // is using `worker.internal_channels`. + // + // Host can already push messages and interact with worker. + run_web_worker(worker, module_specifier, maybe_source_code) + })?; + + let worker_handle = handle_receiver.recv().unwrap()?; + + let worker_thread = WorkerThread { + join_handle, + worker_handle, + }; + + // At this point all interactions with worker happen using thread + // safe handler returned from previous function calls + state + .borrow_mut::<WorkersTable>() + .insert(worker_id, worker_thread); + + Ok(json!({ "id": worker_id })) +} + +#[derive(Deserialize)] +struct WorkerArgs { + id: i32, +} + +fn op_host_terminate_worker( + state: &mut OpState, + args: Value, + _data: &mut [ZeroCopyBuf], +) -> Result<Value, AnyError> { + let args: WorkerArgs = serde_json::from_value(args)?; + let id = args.id as u32; + let worker_thread = state + .borrow_mut::<WorkersTable>() + .remove(&id) + .expect("No worker handle found"); + worker_thread.worker_handle.terminate(); + worker_thread + .join_handle + .join() + .expect("Panic in worker thread") + .expect("Panic in worker event loop"); + Ok(json!({})) +} + +fn serialize_worker_event(event: WorkerEvent) -> Value { + match event { + WorkerEvent::Message(buf) => json!({ "type": "msg", "data": buf }), + WorkerEvent::TerminalError(error) => match error.downcast::<JsError>() { + Ok(js_error) => json!({ + "type": "terminalError", + "error": { + "message": js_error.message, + "fileName": js_error.script_resource_name, + "lineNumber": js_error.line_number, + "columnNumber": js_error.start_column, + } + }), + Err(error) => json!({ + "type": "terminalError", + "error": { + "message": error.to_string(), + } + }), + }, + WorkerEvent::Error(error) => match error.downcast::<JsError>() { + Ok(js_error) => json!({ + "type": "error", + "error": { + "message": js_error.message, + "fileName": js_error.script_resource_name, + "lineNumber": js_error.line_number, + "columnNumber": js_error.start_column, + } + }), + Err(error) => json!({ + "type": "error", + "error": { + "message": error.to_string(), + } + }), + }, + } +} + +/// Try to remove worker from workers table - NOTE: `Worker.terminate()` +/// might have been called already meaning that we won't find worker in +/// table - in that case ignore. +fn try_remove_and_close(state: Rc<RefCell<OpState>>, id: u32) { + let mut s = state.borrow_mut(); + let workers = s.borrow_mut::<WorkersTable>(); + if let Some(mut worker_thread) = workers.remove(&id) { + worker_thread.worker_handle.sender.close_channel(); + worker_thread + .join_handle + .join() + .expect("Worker thread panicked") + .expect("Panic in worker event loop"); + } +} + +/// Get message from guest worker as host +async fn op_host_get_message( + state: Rc<RefCell<OpState>>, + args: Value, + _zero_copy: BufVec, +) -> Result<Value, AnyError> { + let args: WorkerArgs = serde_json::from_value(args)?; + let id = args.id as u32; + + let worker_handle = { + let s = state.borrow(); + let workers_table = s.borrow::<WorkersTable>(); + let maybe_handle = workers_table.get(&id); + if let Some(handle) = maybe_handle { + handle.worker_handle.clone() + } else { + // If handle was not found it means worker has already shutdown + return Ok(json!({ "type": "close" })); + } + }; + + let maybe_event = worker_handle.get_event().await?; + if let Some(event) = maybe_event { + // Terminal error means that worker should be removed from worker table. + if let WorkerEvent::TerminalError(_) = &event { + try_remove_and_close(state, id); + } + return Ok(serialize_worker_event(event)); + } + + // If there was no event from worker it means it has already been closed. + try_remove_and_close(state, id); + Ok(json!({ "type": "close" })) +} + +/// Post message to guest worker as host +fn op_host_post_message( + state: &mut OpState, + args: Value, + data: &mut [ZeroCopyBuf], +) -> Result<Value, AnyError> { + assert_eq!(data.len(), 1, "Invalid number of arguments"); + let args: WorkerArgs = serde_json::from_value(args)?; + let id = args.id as u32; + let msg = Vec::from(&*data[0]).into_boxed_slice(); + + debug!("post message to worker {}", id); + let worker_thread = state + .borrow::<WorkersTable>() + .get(&id) + .expect("No worker handle found"); + worker_thread.worker_handle.post_message(msg)?; + Ok(json!({})) +} |