diff options
Diffstat (limited to 'cli/ops')
-rw-r--r-- | cli/ops/crypto.rs | 14 | ||||
-rw-r--r-- | cli/ops/dispatch_minimal.rs | 205 | ||||
-rw-r--r-- | cli/ops/fetch.rs | 22 | ||||
-rw-r--r-- | cli/ops/fs.rs | 1702 | ||||
-rw-r--r-- | cli/ops/fs_events.rs | 133 | ||||
-rw-r--r-- | cli/ops/io.rs | 473 | ||||
-rw-r--r-- | cli/ops/mod.rs | 56 | ||||
-rw-r--r-- | cli/ops/net.rs | 566 | ||||
-rw-r--r-- | cli/ops/net_unix.rs | 151 | ||||
-rw-r--r-- | cli/ops/os.rs | 192 | ||||
-rw-r--r-- | cli/ops/permissions.rs | 103 | ||||
-rw-r--r-- | cli/ops/plugin.rs | 156 | ||||
-rw-r--r-- | cli/ops/process.rs | 236 | ||||
-rw-r--r-- | cli/ops/runtime.rs | 118 | ||||
-rw-r--r-- | cli/ops/runtime_compiler.rs | 8 | ||||
-rw-r--r-- | cli/ops/signal.rs | 142 | ||||
-rw-r--r-- | cli/ops/timers.rs | 193 | ||||
-rw-r--r-- | cli/ops/tls.rs | 431 | ||||
-rw-r--r-- | cli/ops/tty.rs | 334 | ||||
-rw-r--r-- | cli/ops/web_worker.rs | 37 | ||||
-rw-r--r-- | cli/ops/websocket.rs | 326 | ||||
-rw-r--r-- | cli/ops/worker_host.rs | 318 |
22 files changed, 5 insertions, 5911 deletions
diff --git a/cli/ops/crypto.rs b/cli/ops/crypto.rs deleted file mode 100644 index a73843a33..000000000 --- a/cli/ops/crypto.rs +++ /dev/null @@ -1,14 +0,0 @@ -// Copyright 2018-2020 the Deno authors. All rights reserved. MIT license. -use deno_crypto::op_get_random_values; -use deno_crypto::rand::rngs::StdRng; -use deno_crypto::rand::SeedableRng; - -pub fn init(rt: &mut deno_core::JsRuntime, maybe_seed: Option<u64>) { - if let Some(seed) = maybe_seed { - let rng = StdRng::seed_from_u64(seed); - let op_state = rt.op_state(); - let mut state = op_state.borrow_mut(); - state.put::<StdRng>(rng); - } - super::reg_json_sync(rt, "op_get_random_values", op_get_random_values); -} diff --git a/cli/ops/dispatch_minimal.rs b/cli/ops/dispatch_minimal.rs deleted file mode 100644 index ae8fa819d..000000000 --- a/cli/ops/dispatch_minimal.rs +++ /dev/null @@ -1,205 +0,0 @@ -// Copyright 2018-2020 the Deno authors. All rights reserved. MIT license. - -use deno_core::error::AnyError; -use deno_core::futures::future::FutureExt; -use deno_core::BufVec; -use deno_core::Op; -use deno_core::OpFn; -use deno_core::OpState; -use std::cell::RefCell; -use std::future::Future; -use std::iter::repeat; -use std::mem::size_of_val; -use std::pin::Pin; -use std::rc::Rc; -use std::slice; - -pub enum MinimalOp { - Sync(Result<i32, AnyError>), - Async(Pin<Box<dyn Future<Output = Result<i32, AnyError>>>>), -} - -#[derive(Copy, Clone, Debug, PartialEq)] -// This corresponds to RecordMinimal on the TS side. -pub struct Record { - pub promise_id: i32, - pub arg: i32, - pub result: i32, -} - -impl Into<Box<[u8]>> for Record { - fn into(self) -> Box<[u8]> { - let vec = vec![self.promise_id, self.arg, self.result]; - let buf32 = vec.into_boxed_slice(); - let ptr = Box::into_raw(buf32) as *mut [u8; 3 * 4]; - unsafe { Box::from_raw(ptr) } - } -} - -pub struct ErrorRecord { - pub promise_id: i32, - pub arg: i32, - pub error_len: i32, - pub error_class: &'static [u8], - pub error_message: Vec<u8>, -} - -impl Into<Box<[u8]>> for ErrorRecord { - fn into(self) -> Box<[u8]> { - let Self { - promise_id, - arg, - error_len, - error_class, - error_message, - .. - } = self; - let header_i32 = [promise_id, arg, error_len]; - let header_u8 = unsafe { - slice::from_raw_parts( - &header_i32 as *const _ as *const u8, - size_of_val(&header_i32), - ) - }; - let padded_len = - (header_u8.len() + error_class.len() + error_message.len() + 3usize) - & !3usize; - header_u8 - .iter() - .cloned() - .chain(error_class.iter().cloned()) - .chain(error_message.into_iter()) - .chain(repeat(b' ')) - .take(padded_len) - .collect() - } -} - -#[test] -fn test_error_record() { - let expected = vec![ - 1, 0, 0, 0, 255, 255, 255, 255, 11, 0, 0, 0, 66, 97, 100, 82, 101, 115, - 111, 117, 114, 99, 101, 69, 114, 114, 111, 114, - ]; - let err_record = ErrorRecord { - promise_id: 1, - arg: -1, - error_len: 11, - error_class: b"BadResource", - error_message: b"Error".to_vec(), - }; - let buf: Box<[u8]> = err_record.into(); - assert_eq!(buf, expected.into_boxed_slice()); -} - -pub fn parse_min_record(bytes: &[u8]) -> Option<Record> { - if bytes.len() % std::mem::size_of::<i32>() != 0 { - return None; - } - let p = bytes.as_ptr(); - #[allow(clippy::cast_ptr_alignment)] - let p32 = p as *const i32; - let s = unsafe { std::slice::from_raw_parts(p32, bytes.len() / 4) }; - - if s.len() != 3 { - return None; - } - let ptr = s.as_ptr(); - let ints = unsafe { std::slice::from_raw_parts(ptr, 3) }; - Some(Record { - promise_id: ints[0], - arg: ints[1], - result: ints[2], - }) -} - -#[test] -fn test_parse_min_record() { - let buf = vec![1, 0, 0, 0, 3, 0, 0, 0, 4, 0, 0, 0]; - assert_eq!( - parse_min_record(&buf), - Some(Record { - promise_id: 1, - arg: 3, - result: 4 - }) - ); - - let buf = vec![]; - assert_eq!(parse_min_record(&buf), None); - - let buf = vec![5]; - assert_eq!(parse_min_record(&buf), None); -} - -pub fn minimal_op<F>(op_fn: F) -> Box<OpFn> -where - F: Fn(Rc<RefCell<OpState>>, bool, i32, BufVec) -> MinimalOp + 'static, -{ - Box::new(move |state: Rc<RefCell<OpState>>, bufs: BufVec| { - let mut bufs_iter = bufs.into_iter(); - let record_buf = bufs_iter.next().expect("Expected record at position 0"); - let zero_copy = bufs_iter.collect::<BufVec>(); - - let mut record = match parse_min_record(&record_buf) { - Some(r) => r, - None => { - let error_class = b"TypeError"; - let error_message = b"Unparsable control buffer"; - let error_record = ErrorRecord { - promise_id: 0, - arg: -1, - error_len: error_class.len() as i32, - error_class, - error_message: error_message[..].to_owned(), - }; - return Op::Sync(error_record.into()); - } - }; - let is_sync = record.promise_id == 0; - let rid = record.arg; - let min_op = op_fn(state.clone(), is_sync, rid, zero_copy); - - match min_op { - MinimalOp::Sync(sync_result) => Op::Sync(match sync_result { - Ok(r) => { - record.result = r; - record.into() - } - Err(err) => { - let error_class = (state.borrow().get_error_class_fn)(&err); - let error_record = ErrorRecord { - promise_id: record.promise_id, - arg: -1, - error_len: error_class.len() as i32, - error_class: error_class.as_bytes(), - error_message: err.to_string().as_bytes().to_owned(), - }; - error_record.into() - } - }), - MinimalOp::Async(min_fut) => { - let fut = async move { - match min_fut.await { - Ok(r) => { - record.result = r; - record.into() - } - Err(err) => { - let error_class = (state.borrow().get_error_class_fn)(&err); - let error_record = ErrorRecord { - promise_id: record.promise_id, - arg: -1, - error_len: error_class.len() as i32, - error_class: error_class.as_bytes(), - error_message: err.to_string().as_bytes().to_owned(), - }; - error_record.into() - } - } - }; - Op::Async(fut.boxed_local()) - } - } - }) -} diff --git a/cli/ops/fetch.rs b/cli/ops/fetch.rs deleted file mode 100644 index 18e9e9c9f..000000000 --- a/cli/ops/fetch.rs +++ /dev/null @@ -1,22 +0,0 @@ -// Copyright 2018-2020 the Deno authors. All rights reserved. MIT license. -use crate::http_util; -use crate::permissions::Permissions; -use deno_fetch::reqwest; - -pub fn init(rt: &mut deno_core::JsRuntime, maybe_ca_file: Option<&str>) { - { - let op_state = rt.op_state(); - let mut state = op_state.borrow_mut(); - state.put::<reqwest::Client>({ - http_util::create_http_client(http_util::get_user_agent(), maybe_ca_file) - .unwrap() - }); - } - super::reg_json_async(rt, "op_fetch", deno_fetch::op_fetch::<Permissions>); - super::reg_json_async(rt, "op_fetch_read", deno_fetch::op_fetch_read); - super::reg_json_sync( - rt, - "op_create_http_client", - deno_fetch::op_create_http_client::<Permissions>, - ); -} diff --git a/cli/ops/fs.rs b/cli/ops/fs.rs deleted file mode 100644 index 865c5bcca..000000000 --- a/cli/ops/fs.rs +++ /dev/null @@ -1,1702 +0,0 @@ -// Copyright 2018-2020 the Deno authors. All rights reserved. MIT license. -// Some deserializer fields are only used on Unix and Windows build fails without it -use super::io::std_file_resource; -use super::io::{FileMetadata, StreamResource, StreamResourceHolder}; -use crate::fs_util::canonicalize_path; -use crate::permissions::Permissions; -use deno_core::error::custom_error; -use deno_core::error::type_error; -use deno_core::error::AnyError; -use deno_core::serde_json; -use deno_core::serde_json::json; -use deno_core::serde_json::Value; -use deno_core::BufVec; -use deno_core::OpState; -use deno_core::ZeroCopyBuf; -use deno_crypto::rand::thread_rng; -use deno_crypto::rand::Rng; -use serde::Deserialize; -use std::cell::RefCell; -use std::convert::From; -use std::env::{current_dir, set_current_dir, temp_dir}; -use std::io; -use std::io::{Seek, SeekFrom}; -use std::path::{Path, PathBuf}; -use std::rc::Rc; -use std::time::SystemTime; -use std::time::UNIX_EPOCH; - -#[cfg(not(unix))] -use deno_core::error::generic_error; -#[cfg(not(unix))] -use deno_core::error::not_supported; - -pub fn init(rt: &mut deno_core::JsRuntime) { - super::reg_json_sync(rt, "op_open_sync", op_open_sync); - super::reg_json_async(rt, "op_open_async", op_open_async); - - super::reg_json_sync(rt, "op_seek_sync", op_seek_sync); - super::reg_json_async(rt, "op_seek_async", op_seek_async); - - super::reg_json_sync(rt, "op_fdatasync_sync", op_fdatasync_sync); - super::reg_json_async(rt, "op_fdatasync_async", op_fdatasync_async); - - super::reg_json_sync(rt, "op_fsync_sync", op_fsync_sync); - super::reg_json_async(rt, "op_fsync_async", op_fsync_async); - - super::reg_json_sync(rt, "op_fstat_sync", op_fstat_sync); - super::reg_json_async(rt, "op_fstat_async", op_fstat_async); - - super::reg_json_sync(rt, "op_umask", op_umask); - super::reg_json_sync(rt, "op_chdir", op_chdir); - - super::reg_json_sync(rt, "op_mkdir_sync", op_mkdir_sync); - super::reg_json_async(rt, "op_mkdir_async", op_mkdir_async); - - super::reg_json_sync(rt, "op_chmod_sync", op_chmod_sync); - super::reg_json_async(rt, "op_chmod_async", op_chmod_async); - - super::reg_json_sync(rt, "op_chown_sync", op_chown_sync); - super::reg_json_async(rt, "op_chown_async", op_chown_async); - - super::reg_json_sync(rt, "op_remove_sync", op_remove_sync); - super::reg_json_async(rt, "op_remove_async", op_remove_async); - - super::reg_json_sync(rt, "op_copy_file_sync", op_copy_file_sync); - super::reg_json_async(rt, "op_copy_file_async", op_copy_file_async); - - super::reg_json_sync(rt, "op_stat_sync", op_stat_sync); - super::reg_json_async(rt, "op_stat_async", op_stat_async); - - super::reg_json_sync(rt, "op_realpath_sync", op_realpath_sync); - super::reg_json_async(rt, "op_realpath_async", op_realpath_async); - - super::reg_json_sync(rt, "op_read_dir_sync", op_read_dir_sync); - super::reg_json_async(rt, "op_read_dir_async", op_read_dir_async); - - super::reg_json_sync(rt, "op_rename_sync", op_rename_sync); - super::reg_json_async(rt, "op_rename_async", op_rename_async); - - super::reg_json_sync(rt, "op_link_sync", op_link_sync); - super::reg_json_async(rt, "op_link_async", op_link_async); - - super::reg_json_sync(rt, "op_symlink_sync", op_symlink_sync); - super::reg_json_async(rt, "op_symlink_async", op_symlink_async); - - super::reg_json_sync(rt, "op_read_link_sync", op_read_link_sync); - super::reg_json_async(rt, "op_read_link_async", op_read_link_async); - - super::reg_json_sync(rt, "op_ftruncate_sync", op_ftruncate_sync); - super::reg_json_async(rt, "op_ftruncate_async", op_ftruncate_async); - - super::reg_json_sync(rt, "op_truncate_sync", op_truncate_sync); - super::reg_json_async(rt, "op_truncate_async", op_truncate_async); - - super::reg_json_sync(rt, "op_make_temp_dir_sync", op_make_temp_dir_sync); - super::reg_json_async(rt, "op_make_temp_dir_async", op_make_temp_dir_async); - - super::reg_json_sync(rt, "op_make_temp_file_sync", op_make_temp_file_sync); - super::reg_json_async(rt, "op_make_temp_file_async", op_make_temp_file_async); - - super::reg_json_sync(rt, "op_cwd", op_cwd); - - super::reg_json_sync(rt, "op_futime_sync", op_futime_sync); - super::reg_json_async(rt, "op_futime_async", op_futime_async); - - super::reg_json_sync(rt, "op_utime_sync", op_utime_sync); - super::reg_json_async(rt, "op_utime_async", op_utime_async); -} - -fn into_string(s: std::ffi::OsString) -> Result<String, AnyError> { - s.into_string().map_err(|s| { - let message = format!("File name or path {:?} is not valid UTF-8", s); - custom_error("InvalidData", message) - }) -} - -#[derive(Deserialize)] -#[serde(rename_all = "camelCase")] -struct OpenArgs { - path: String, - mode: Option<u32>, - options: OpenOptions, -} - -#[derive(Deserialize, Default, Debug)] -#[serde(rename_all = "camelCase")] -#[serde(default)] -struct OpenOptions { - read: bool, - write: bool, - create: bool, - truncate: bool, - append: bool, - create_new: bool, -} - -fn open_helper( - state: &mut OpState, - args: Value, -) -> Result<(PathBuf, std::fs::OpenOptions), AnyError> { - let args: OpenArgs = serde_json::from_value(args)?; - let path = Path::new(&args.path).to_path_buf(); - - let mut open_options = std::fs::OpenOptions::new(); - - if let Some(mode) = args.mode { - // mode only used if creating the file on Unix - // if not specified, defaults to 0o666 - #[cfg(unix)] - { - use std::os::unix::fs::OpenOptionsExt; - open_options.mode(mode & 0o777); - } - #[cfg(not(unix))] - let _ = mode; // avoid unused warning - } - - let permissions = state.borrow::<Permissions>(); - let options = args.options; - - if options.read { - permissions.check_read(&path)?; - } - - if options.write || options.append { - permissions.check_write(&path)?; - } - - open_options - .read(options.read) - .create(options.create) - .write(options.write) - .truncate(options.truncate) - .append(options.append) - .create_new(options.create_new); - - Ok((path, open_options)) -} - -fn op_open_sync( - state: &mut OpState, - args: Value, - _zero_copy: &mut [ZeroCopyBuf], -) -> Result<Value, AnyError> { - let (path, open_options) = open_helper(state, args)?; - let std_file = open_options.open(path)?; - let tokio_file = tokio::fs::File::from_std(std_file); - let rid = state.resource_table.add( - "fsFile", - Box::new(StreamResourceHolder::new(StreamResource::FsFile(Some(( - tokio_file, - FileMetadata::default(), - ))))), - ); - Ok(json!(rid)) -} - -async fn op_open_async( - state: Rc<RefCell<OpState>>, - args: Value, - _zero_copy: BufVec, -) -> Result<Value, AnyError> { - let (path, open_options) = open_helper(&mut state.borrow_mut(), args)?; - let tokio_file = tokio::fs::OpenOptions::from(open_options) - .open(path) - .await?; - let rid = state.borrow_mut().resource_table.add( - "fsFile", - Box::new(StreamResourceHolder::new(StreamResource::FsFile(Some(( - tokio_file, - FileMetadata::default(), - ))))), - ); - Ok(json!(rid)) -} - -#[derive(Deserialize)] -#[serde(rename_all = "camelCase")] -struct SeekArgs { - rid: i32, - offset: i64, - whence: i32, -} - -fn seek_helper(args: Value) -> Result<(u32, SeekFrom), AnyError> { - let args: SeekArgs = serde_json::from_value(args)?; - let rid = args.rid as u32; - let offset = args.offset; - let whence = args.whence as u32; - // Translate seek mode to Rust repr. - let seek_from = match whence { - 0 => SeekFrom::Start(offset as u64), - 1 => SeekFrom::Current(offset), - 2 => SeekFrom::End(offset), - _ => { - return Err(type_error(format!("Invalid seek mode: {}", whence))); - } - }; - - Ok((rid, seek_from)) -} - -fn op_seek_sync( - state: &mut OpState, - args: Value, - _zero_copy: &mut [ZeroCopyBuf], -) -> Result<Value, AnyError> { - let (rid, seek_from) = seek_helper(args)?; - let pos = std_file_resource(state, rid, |r| match r { - Ok(std_file) => std_file.seek(seek_from).map_err(AnyError::from), - Err(_) => Err(type_error( - "cannot seek on this type of resource".to_string(), - )), - })?; - Ok(json!(pos)) -} - -async fn op_seek_async( - state: Rc<RefCell<OpState>>, - args: Value, - _zero_copy: BufVec, -) -> Result<Value, AnyError> { - let (rid, seek_from) = seek_helper(args)?; - // TODO(ry) This is a fake async op. We need to use poll_fn, - // tokio::fs::File::start_seek and tokio::fs::File::poll_complete - let pos = std_file_resource(&mut state.borrow_mut(), rid, |r| match r { - Ok(std_file) => std_file.seek(seek_from).map_err(AnyError::from), - Err(_) => Err(type_error( - "cannot seek on this type of resource".to_string(), - )), - })?; - Ok(json!(pos)) -} - -#[derive(Deserialize)] -#[serde(rename_all = "camelCase")] -struct FdatasyncArgs { - rid: i32, -} - -fn op_fdatasync_sync( - state: &mut OpState, - args: Value, - _zero_copy: &mut [ZeroCopyBuf], -) -> Result<Value, AnyError> { - let args: FdatasyncArgs = serde_json::from_value(args)?; - let rid = args.rid as u32; - std_file_resource(state, rid, |r| match r { - Ok(std_file) => std_file.sync_data().map_err(AnyError::from), - Err(_) => Err(type_error("cannot sync this type of resource".to_string())), - })?; - Ok(json!({})) -} - -async fn op_fdatasync_async( - state: Rc<RefCell<OpState>>, - args: Value, - _zero_copy: BufVec, -) -> Result<Value, AnyError> { - let args: FdatasyncArgs = serde_json::from_value(args)?; - let rid = args.rid as u32; - std_file_resource(&mut state.borrow_mut(), rid, |r| match r { - Ok(std_file) => std_file.sync_data().map_err(AnyError::from), - Err(_) => Err(type_error("cannot sync this type of resource".to_string())), - })?; - Ok(json!({})) -} - -#[derive(Deserialize)] -#[serde(rename_all = "camelCase")] -struct FsyncArgs { - rid: i32, -} - -fn op_fsync_sync( - state: &mut OpState, - args: Value, - _zero_copy: &mut [ZeroCopyBuf], -) -> Result<Value, AnyError> { - let args: FsyncArgs = serde_json::from_value(args)?; - let rid = args.rid as u32; - std_file_resource(state, rid, |r| match r { - Ok(std_file) => std_file.sync_all().map_err(AnyError::from), - Err(_) => Err(type_error("cannot sync this type of resource".to_string())), - })?; - Ok(json!({})) -} - -async fn op_fsync_async( - state: Rc<RefCell<OpState>>, - args: Value, - _zero_copy: BufVec, -) -> Result<Value, AnyError> { - let args: FsyncArgs = serde_json::from_value(args)?; - let rid = args.rid as u32; - std_file_resource(&mut state.borrow_mut(), rid, |r| match r { - Ok(std_file) => std_file.sync_all().map_err(AnyError::from), - Err(_) => Err(type_error("cannot sync this type of resource".to_string())), - })?; - Ok(json!({})) -} - -#[derive(Deserialize)] -#[serde(rename_all = "camelCase")] -struct FstatArgs { - rid: i32, -} - -fn op_fstat_sync( - state: &mut OpState, - args: Value, - _zero_copy: &mut [ZeroCopyBuf], -) -> Result<Value, AnyError> { - super::check_unstable(state, "Deno.fstat"); - let args: FstatArgs = serde_json::from_value(args)?; - let rid = args.rid as u32; - let metadata = std_file_resource(state, rid, |r| match r { - Ok(std_file) => std_file.metadata().map_err(AnyError::from), - Err(_) => Err(type_error("cannot stat this type of resource".to_string())), - })?; - Ok(get_stat_json(metadata)) -} - -async fn op_fstat_async( - state: Rc<RefCell<OpState>>, - args: Value, - _zero_copy: BufVec, -) -> Result<Value, AnyError> { - super::check_unstable2(&state, "Deno.fstat"); - - let args: FstatArgs = serde_json::from_value(args)?; - let rid = args.rid as u32; - let metadata = - std_file_resource(&mut state.borrow_mut(), rid, |r| match r { - Ok(std_file) => std_file.metadata().map_err(AnyError::from), - Err(_) => { - Err(type_error("cannot stat this type of resource".to_string())) - } - })?; - Ok(get_stat_json(metadata)) -} - -#[derive(Deserialize)] -struct UmaskArgs { - mask: Option<u32>, -} - -fn op_umask( - state: &mut OpState, - args: Value, - _zero_copy: &mut [ZeroCopyBuf], -) -> Result<Value, AnyError> { - super::check_unstable(state, "Deno.umask"); - let args: UmaskArgs = serde_json::from_value(args)?; - // TODO implement umask for Windows - // see https://github.com/nodejs/node/blob/master/src/node_process_methods.cc - // and https://docs.microsoft.com/fr-fr/cpp/c-runtime-library/reference/umask?view=vs-2019 - #[cfg(not(unix))] - { - let _ = args.mask; // avoid unused warning. - Err(not_supported()) - } - #[cfg(unix)] - { - use nix::sys::stat::mode_t; - use nix::sys::stat::umask; - use nix::sys::stat::Mode; - let r = if let Some(mask) = args.mask { - // If mask provided, return previous. - umask(Mode::from_bits_truncate(mask as mode_t)) - } else { - // If no mask provided, we query the current. Requires two syscalls. - let prev = umask(Mode::from_bits_truncate(0o777)); - let _ = umask(prev); - prev - }; - Ok(json!(r.bits() as u32)) - } -} - -#[derive(Deserialize)] -struct ChdirArgs { - directory: String, -} - -fn op_chdir( - state: &mut OpState, - args: Value, - _zero_copy: &mut [ZeroCopyBuf], -) -> Result<Value, AnyError> { - let args: ChdirArgs = serde_json::from_value(args)?; - let d = PathBuf::from(&args.directory); - state.borrow::<Permissions>().check_read(&d)?; - set_current_dir(&d)?; - Ok(json!({})) -} - -#[derive(Deserialize)] -#[serde(rename_all = "camelCase")] -struct MkdirArgs { - path: String, - recursive: bool, - mode: Option<u32>, -} - -fn op_mkdir_sync( - state: &mut OpState, - args: Value, - _zero_copy: &mut [ZeroCopyBuf], -) -> Result<Value, AnyError> { - let args: MkdirArgs = serde_json::from_value(args)?; - let path = Path::new(&args.path).to_path_buf(); - let mode = args.mode.unwrap_or(0o777) & 0o777; - state.borrow::<Permissions>().check_write(&path)?; - debug!("op_mkdir {} {:o} {}", path.display(), mode, args.recursive); - let mut builder = std::fs::DirBuilder::new(); - builder.recursive(args.recursive); - #[cfg(unix)] - { - use std::os::unix::fs::DirBuilderExt; - builder.mode(mode); - } - builder.create(path)?; - Ok(json!({})) -} - -async fn op_mkdir_async( - state: Rc<RefCell<OpState>>, - args: Value, - _zero_copy: BufVec, -) -> Result<Value, AnyError> { - let args: MkdirArgs = serde_json::from_value(args)?; - let path = Path::new(&args.path).to_path_buf(); - let mode = args.mode.unwrap_or(0o777) & 0o777; - - { - let state = state.borrow(); - state.borrow::<Permissions>().check_write(&path)?; - } - - tokio::task::spawn_blocking(move || { - debug!("op_mkdir {} {:o} {}", path.display(), mode, args.recursive); - let mut builder = std::fs::DirBuilder::new(); - builder.recursive(args.recursive); - #[cfg(unix)] - { - use std::os::unix::fs::DirBuilderExt; - builder.mode(mode); - } - builder.create(path)?; - Ok(json!({})) - }) - .await - .unwrap() -} - -#[derive(Deserialize)] -#[serde(rename_all = "camelCase")] -struct ChmodArgs { - path: String, - mode: u32, -} - -fn op_chmod_sync( - state: &mut OpState, - args: Value, - _zero_copy: &mut [ZeroCopyBuf], -) -> Result<Value, AnyError> { - let args: ChmodArgs = serde_json::from_value(args)?; - let path = Path::new(&args.path).to_path_buf(); - let mode = args.mode & 0o777; - - state.borrow::<Permissions>().check_write(&path)?; - debug!("op_chmod_sync {} {:o}", path.display(), mode); - #[cfg(unix)] - { - use std::os::unix::fs::PermissionsExt; - let permissions = PermissionsExt::from_mode(mode); - std::fs::set_permissions(&path, permissions)?; - Ok(json!({})) - } - // TODO Implement chmod for Windows (#4357) - #[cfg(not(unix))] - { - // Still check file/dir exists on Windows - let _metadata = std::fs::metadata(&path)?; - Err(generic_error("Not implemented")) - } -} - -async fn op_chmod_async( - state: Rc<RefCell<OpState>>, - args: Value, - _zero_copy: BufVec, -) -> Result<Value, AnyError> { - let args: ChmodArgs = serde_json::from_value(args)?; - let path = Path::new(&args.path).to_path_buf(); - let mode = args.mode & 0o777; - - { - let state = state.borrow(); - state.borrow::<Permissions>().check_write(&path)?; - } - - tokio::task::spawn_blocking(move || { - debug!("op_chmod_async {} {:o}", path.display(), mode); - #[cfg(unix)] - { - use std::os::unix::fs::PermissionsExt; - let permissions = PermissionsExt::from_mode(mode); - std::fs::set_permissions(&path, permissions)?; - Ok(json!({})) - } - // TODO Implement chmod for Windows (#4357) - #[cfg(not(unix))] - { - // Still check file/dir exists on Windows - let _metadata = std::fs::metadata(&path)?; - Err(not_supported()) - } - }) - .await - .unwrap() -} - -#[derive(Deserialize)] -#[serde(rename_all = "camelCase")] -struct ChownArgs { - path: String, - uid: Option<u32>, - gid: Option<u32>, -} - -fn op_chown_sync( - state: &mut OpState, - args: Value, - _zero_copy: &mut [ZeroCopyBuf], -) -> Result<Value, AnyError> { - let args: ChownArgs = serde_json::from_value(args)?; - let path = Path::new(&args.path).to_path_buf(); - state.borrow::<Permissions>().check_write(&path)?; - debug!( - "op_chown_sync {} {:?} {:?}", - path.display(), - args.uid, - args.gid, - ); - #[cfg(unix)] - { - use nix::unistd::{chown, Gid, Uid}; - let nix_uid = args.uid.map(Uid::from_raw); - let nix_gid = args.gid.map(Gid::from_raw); - chown(&path, nix_uid, nix_gid)?; - Ok(json!({})) - } - // TODO Implement chown for Windows - #[cfg(not(unix))] - { - Err(generic_error("Not implemented")) - } -} - -async fn op_chown_async( - state: Rc<RefCell<OpState>>, - args: Value, - _zero_copy: BufVec, -) -> Result<Value, AnyError> { - let args: ChownArgs = serde_json::from_value(args)?; - let path = Path::new(&args.path).to_path_buf(); - - { - let state = state.borrow(); - state.borrow::<Permissions>().check_write(&path)?; - } - - tokio::task::spawn_blocking(move || { - debug!( - "op_chown_async {} {:?} {:?}", - path.display(), - args.uid, - args.gid, - ); - #[cfg(unix)] - { - use nix::unistd::{chown, Gid, Uid}; - let nix_uid = args.uid.map(Uid::from_raw); - let nix_gid = args.gid.map(Gid::from_raw); - chown(&path, nix_uid, nix_gid)?; - Ok(json!({})) - } - // TODO Implement chown for Windows - #[cfg(not(unix))] - Err(not_supported()) - }) - .await - .unwrap() -} - -#[derive(Deserialize)] -#[serde(rename_all = "camelCase")] -struct RemoveArgs { - path: String, - recursive: bool, -} - -fn op_remove_sync( - state: &mut OpState, - args: Value, - _zero_copy: &mut [ZeroCopyBuf], -) -> Result<Value, AnyError> { - let args: RemoveArgs = serde_json::from_value(args)?; - let path = PathBuf::from(&args.path); - let recursive = args.recursive; - - state.borrow::<Permissions>().check_write(&path)?; - - #[cfg(not(unix))] - use std::os::windows::prelude::MetadataExt; - - let metadata = std::fs::symlink_metadata(&path)?; - - debug!("op_remove_sync {} {}", path.display(), recursive); - let file_type = metadata.file_type(); - if file_type.is_file() { - std::fs::remove_file(&path)?; - } else if recursive { - std::fs::remove_dir_all(&path)?; - } else if file_type.is_symlink() { - #[cfg(unix)] - std::fs::remove_file(&path)?; - #[cfg(not(unix))] - { - use winapi::um::winnt::FILE_ATTRIBUTE_DIRECTORY; - if metadata.file_attributes() & FILE_ATTRIBUTE_DIRECTORY != 0 { - std::fs::remove_dir(&path)?; - } else { - std::fs::remove_file(&path)?; - } - } - } else if file_type.is_dir() { - std::fs::remove_dir(&path)?; - } else { - // pipes, sockets, etc... - std::fs::remove_file(&path)?; - } - Ok(json!({})) -} - -async fn op_remove_async( - state: Rc<RefCell<OpState>>, - args: Value, - _zero_copy: BufVec, -) -> Result<Value, AnyError> { - let args: RemoveArgs = serde_json::from_value(args)?; - let path = PathBuf::from(&args.path); - let recursive = args.recursive; - - { - let state = state.borrow(); - state.borrow::<Permissions>().check_write(&path)?; - } - - tokio::task::spawn_blocking(move || { - #[cfg(not(unix))] - use std::os::windows::prelude::MetadataExt; - - let metadata = std::fs::symlink_metadata(&path)?; - - debug!("op_remove_async {} {}", path.display(), recursive); - let file_type = metadata.file_type(); - if file_type.is_file() { - std::fs::remove_file(&path)?; - } else if recursive { - std::fs::remove_dir_all(&path)?; - } else if file_type.is_symlink() { - #[cfg(unix)] - std::fs::remove_file(&path)?; - #[cfg(not(unix))] - { - use winapi::um::winnt::FILE_ATTRIBUTE_DIRECTORY; - if metadata.file_attributes() & FILE_ATTRIBUTE_DIRECTORY != 0 { - std::fs::remove_dir(&path)?; - } else { - std::fs::remove_file(&path)?; - } - } - } else if file_type.is_dir() { - std::fs::remove_dir(&path)?; - } else { - // pipes, sockets, etc... - std::fs::remove_file(&path)?; - } - Ok(json!({})) - }) - .await - .unwrap() -} - -#[derive(Deserialize)] -#[serde(rename_all = "camelCase")] -struct CopyFileArgs { - from: String, - to: String, -} - -fn op_copy_file_sync( - state: &mut OpState, - args: Value, - _zero_copy: &mut [ZeroCopyBuf], -) -> Result<Value, AnyError> { - let args: CopyFileArgs = serde_json::from_value(args)?; - let from = PathBuf::from(&args.from); - let to = PathBuf::from(&args.to); - - let permissions = state.borrow::<Permissions>(); - permissions.check_read(&from)?; - permissions.check_write(&to)?; - - debug!("op_copy_file_sync {} {}", from.display(), to.display()); - // On *nix, Rust reports non-existent `from` as ErrorKind::InvalidInput - // See https://github.com/rust-lang/rust/issues/54800 - // Once the issue is resolved, we should remove this workaround. - if cfg!(unix) && !from.is_file() { - return Err(custom_error("NotFound", "File not found")); - } - - // returns size of from as u64 (we ignore) - std::fs::copy(&from, &to)?; - Ok(json!({})) -} - -async fn op_copy_file_async( - state: Rc<RefCell<OpState>>, - args: Value, - _zero_copy: BufVec, -) -> Result<Value, AnyError> { - let args: CopyFileArgs = serde_json::from_value(args)?; - let from = PathBuf::from(&args.from); - let to = PathBuf::from(&args.to); - - { - let state = state.borrow(); - let permissions = state.borrow::<Permissions>(); - permissions.check_read(&from)?; - permissions.check_write(&to)?; - } - - debug!("op_copy_file_async {} {}", from.display(), to.display()); - tokio::task::spawn_blocking(move || { - // On *nix, Rust reports non-existent `from` as ErrorKind::InvalidInput - // See https://github.com/rust-lang/rust/issues/54800 - // Once the issue is resolved, we should remove this workaround. - if cfg!(unix) && !from.is_file() { - return Err(custom_error("NotFound", "File not found")); - } - - // returns size of from as u64 (we ignore) - std::fs::copy(&from, &to)?; - Ok(json!({})) - }) - .await - .unwrap() -} - -fn to_msec(maybe_time: Result<SystemTime, io::Error>) -> Value { - match maybe_time { - Ok(time) => { - let msec = time - .duration_since(UNIX_EPOCH) - .map(|t| t.as_secs_f64() * 1000f64) - .unwrap_or_else(|err| err.duration().as_secs_f64() * -1000f64); - serde_json::Number::from_f64(msec) - .map(Value::Number) - .unwrap_or(Value::Null) - } - Err(_) => Value::Null, - } -} - -#[inline(always)] -fn get_stat_json(metadata: std::fs::Metadata) -> Value { - // Unix stat member (number types only). 0 if not on unix. - macro_rules! usm { - ($member:ident) => {{ - #[cfg(unix)] - { - metadata.$member() - } - #[cfg(not(unix))] - { - 0 - } - }}; - } - - #[cfg(unix)] - use std::os::unix::fs::MetadataExt; - let json_val = json!({ - "isFile": metadata.is_file(), - "isDirectory": metadata.is_dir(), - "isSymlink": metadata.file_type().is_symlink(), - "size": metadata.len(), - // In milliseconds, like JavaScript. Available on both Unix or Windows. - "mtime": to_msec(metadata.modified()), - "atime": to_msec(metadata.accessed()), - "birthtime": to_msec(metadata.created()), - // Following are only valid under Unix. - "dev": usm!(dev), - "ino": usm!(ino), - "mode": usm!(mode), - "nlink": usm!(nlink), - "uid": usm!(uid), - "gid": usm!(gid), - "rdev": usm!(rdev), - // TODO(kevinkassimo): *time_nsec requires BigInt. - // Probably should be treated as String if we need to add them. - "blksize": usm!(blksize), - "blocks": usm!(blocks), - }); - json_val -} - -#[derive(Deserialize)] -#[serde(rename_all = "camelCase")] -struct StatArgs { - path: String, - lstat: bool, -} - -fn op_stat_sync( - state: &mut OpState, - args: Value, - _zero_copy: &mut [ZeroCopyBuf], -) -> Result<Value, AnyError> { - let args: StatArgs = serde_json::from_value(args)?; - let path = PathBuf::from(&args.path); - let lstat = args.lstat; - state.borrow::<Permissions>().check_read(&path)?; - debug!("op_stat_sync {} {}", path.display(), lstat); - let metadata = if lstat { - std::fs::symlink_metadata(&path)? - } else { - std::fs::metadata(&path)? - }; - Ok(get_stat_json(metadata)) -} - -async fn op_stat_async( - state: Rc<RefCell<OpState>>, - args: Value, - _zero_copy: BufVec, -) -> Result<Value, AnyError> { - let args: StatArgs = serde_json::from_value(args)?; - let path = PathBuf::from(&args.path); - let lstat = args.lstat; - - { - let state = state.borrow(); - state.borrow::<Permissions>().check_read(&path)?; - } - - tokio::task::spawn_blocking(move || { - debug!("op_stat_async {} {}", path.display(), lstat); - let metadata = if lstat { - std::fs::symlink_metadata(&path)? - } else { - std::fs::metadata(&path)? - }; - Ok(get_stat_json(metadata)) - }) - .await - .unwrap() -} - -#[derive(Deserialize)] -#[serde(rename_all = "camelCase")] -struct RealpathArgs { - path: String, -} - -fn op_realpath_sync( - state: &mut OpState, - args: Value, - _zero_copy: &mut [ZeroCopyBuf], -) -> Result<Value, AnyError> { - let args: RealpathArgs = serde_json::from_value(args)?; - let path = PathBuf::from(&args.path); - - let permissions = state.borrow::<Permissions>(); - permissions.check_read(&path)?; - if path.is_relative() { - permissions.check_read_blind(¤t_dir()?, "CWD")?; - } - - debug!("op_realpath_sync {}", path.display()); - // corresponds to the realpath on Unix and - // CreateFile and GetFinalPathNameByHandle on Windows - let realpath = canonicalize_path(&path)?; - let realpath_str = into_string(realpath.into_os_string())?; - Ok(json!(realpath_str)) -} - -async fn op_realpath_async( - state: Rc<RefCell<OpState>>, - args: Value, - _zero_copy: BufVec, -) -> Result<Value, AnyError> { - let args: RealpathArgs = serde_json::from_value(args)?; - let path = PathBuf::from(&args.path); - - { - let state = state.borrow(); - let permissions = state.borrow::<Permissions>(); - permissions.check_read(&path)?; - if path.is_relative() { - permissions.check_read_blind(¤t_dir()?, "CWD")?; - } - } - - tokio::task::spawn_blocking(move || { - debug!("op_realpath_async {}", path.display()); - // corresponds to the realpath on Unix and - // CreateFile and GetFinalPathNameByHandle on Windows - let realpath = canonicalize_path(&path)?; - let realpath_str = into_string(realpath.into_os_string())?; - Ok(json!(realpath_str)) - }) - .await - .unwrap() -} - -#[derive(Deserialize)] -#[serde(rename_all = "camelCase")] -struct ReadDirArgs { - path: String, -} - -fn op_read_dir_sync( - state: &mut OpState, - args: Value, - _zero_copy: &mut [ZeroCopyBuf], -) -> Result<Value, AnyError> { - let args: ReadDirArgs = serde_json::from_value(args)?; - let path = PathBuf::from(&args.path); - - state.borrow::<Permissions>().check_read(&path)?; - - debug!("op_read_dir_sync {}", path.display()); - let entries: Vec<_> = std::fs::read_dir(path)? - .filter_map(|entry| { - let entry = entry.unwrap(); - let file_type = entry.file_type().unwrap(); - // Not all filenames can be encoded as UTF-8. Skip those for now. - if let Ok(name) = into_string(entry.file_name()) { - Some(json!({ - "name": name, - "isFile": file_type.is_file(), - "isDirectory": file_type.is_dir(), - "isSymlink": file_type.is_symlink() - })) - } else { - None - } - }) - .collect(); - - Ok(json!({ "entries": entries })) -} - -async fn op_read_dir_async( - state: Rc<RefCell<OpState>>, - args: Value, - _zero_copy: BufVec, -) -> Result<Value, AnyError> { - let args: ReadDirArgs = serde_json::from_value(args)?; - let path = PathBuf::from(&args.path); - { - let state = state.borrow(); - state.borrow::<Permissions>().check_read(&path)?; - } - tokio::task::spawn_blocking(move || { - debug!("op_read_dir_async {}", path.display()); - let entries: Vec<_> = std::fs::read_dir(path)? - .filter_map(|entry| { - let entry = entry.unwrap(); - let file_type = entry.file_type().unwrap(); - // Not all filenames can be encoded as UTF-8. Skip those for now. - if let Ok(name) = into_string(entry.file_name()) { - Some(json!({ - "name": name, - "isFile": file_type.is_file(), - "isDirectory": file_type.is_dir(), - "isSymlink": file_type.is_symlink() - })) - } else { - None - } - }) - .collect(); - - Ok(json!({ "entries": entries })) - }) - .await - .unwrap() -} - -#[derive(Deserialize)] -#[serde(rename_all = "camelCase")] -struct RenameArgs { - oldpath: String, - newpath: String, -} - -fn op_rename_sync( - state: &mut OpState, - args: Value, - _zero_copy: &mut [ZeroCopyBuf], -) -> Result<Value, AnyError> { - let args: RenameArgs = serde_json::from_value(args)?; - let oldpath = PathBuf::from(&args.oldpath); - let newpath = PathBuf::from(&args.newpath); - - let permissions = state.borrow::<Permissions>(); - permissions.check_read(&oldpath)?; - permissions.check_write(&oldpath)?; - permissions.check_write(&newpath)?; - debug!("op_rename_sync {} {}", oldpath.display(), newpath.display()); - std::fs::rename(&oldpath, &newpath)?; - Ok(json!({})) -} - -async fn op_rename_async( - state: Rc<RefCell<OpState>>, - args: Value, - _zero_copy: BufVec, -) -> Result<Value, AnyError> { - let args: RenameArgs = serde_json::from_value(args)?; - let oldpath = PathBuf::from(&args.oldpath); - let newpath = PathBuf::from(&args.newpath); - { - let state = state.borrow(); - let permissions = state.borrow::<Permissions>(); - permissions.check_read(&oldpath)?; - permissions.check_write(&oldpath)?; - permissions.check_write(&newpath)?; - } - tokio::task::spawn_blocking(move || { - debug!( - "op_rename_async {} {}", - oldpath.display(), - newpath.display() - ); - std::fs::rename(&oldpath, &newpath)?; - Ok(json!({})) - }) - .await - .unwrap() -} - -#[derive(Deserialize)] -#[serde(rename_all = "camelCase")] -struct LinkArgs { - oldpath: String, - newpath: String, -} - -fn op_link_sync( - state: &mut OpState, - args: Value, - _zero_copy: &mut [ZeroCopyBuf], -) -> Result<Value, AnyError> { - super::check_unstable(state, "Deno.link"); - let args: LinkArgs = serde_json::from_value(args)?; - let oldpath = PathBuf::from(&args.oldpath); - let newpath = PathBuf::from(&args.newpath); - - let permissions = state.borrow::<Permissions>(); - permissions.check_read(&oldpath)?; - permissions.check_write(&newpath)?; - - debug!("op_link_sync {} {}", oldpath.display(), newpath.display()); - std::fs::hard_link(&oldpath, &newpath)?; - Ok(json!({})) -} - -async fn op_link_async( - state: Rc<RefCell<OpState>>, - args: Value, - _zero_copy: BufVec, -) -> Result<Value, AnyError> { - super::check_unstable2(&state, "Deno.link"); - - let args: LinkArgs = serde_json::from_value(args)?; - let oldpath = PathBuf::from(&args.oldpath); - let newpath = PathBuf::from(&args.newpath); - - { - let state = state.borrow(); - let permissions = state.borrow::<Permissions>(); - permissions.check_read(&oldpath)?; - permissions.check_write(&newpath)?; - } - - tokio::task::spawn_blocking(move || { - debug!("op_link_async {} {}", oldpath.display(), newpath.display()); - std::fs::hard_link(&oldpath, &newpath)?; - Ok(json!({})) - }) - .await - .unwrap() -} - -#[derive(Deserialize)] -#[serde(rename_all = "camelCase")] -struct SymlinkArgs { - oldpath: String, - newpath: String, - #[cfg(not(unix))] - options: Option<SymlinkOptions>, -} - -#[cfg(not(unix))] -#[derive(Deserialize)] -#[serde(rename_all = "camelCase")] -struct SymlinkOptions { - _type: String, -} - -fn op_symlink_sync( - state: &mut OpState, - args: Value, - _zero_copy: &mut [ZeroCopyBuf], -) -> Result<Value, AnyError> { - super::check_unstable(state, "Deno.symlink"); - let args: SymlinkArgs = serde_json::from_value(args)?; - let oldpath = PathBuf::from(&args.oldpath); - let newpath = PathBuf::from(&args.newpath); - - state.borrow::<Permissions>().check_write(&newpath)?; - - debug!( - "op_symlink_sync {} {}", - oldpath.display(), - newpath.display() - ); - #[cfg(unix)] - { - use std::os::unix::fs::symlink; - symlink(&oldpath, &newpath)?; - Ok(json!({})) - } - #[cfg(not(unix))] - { - use std::os::windows::fs::{symlink_dir, symlink_file}; - - match args.options { - Some(options) => match options._type.as_ref() { - "file" => symlink_file(&oldpath, &newpath)?, - "dir" => symlink_dir(&oldpath, &newpath)?, - _ => return Err(type_error("unsupported type")), - }, - None => { - let old_meta = std::fs::metadata(&oldpath); - match old_meta { - Ok(metadata) => { - if metadata.is_file() { - symlink_file(&oldpath, &newpath)? - } else if metadata.is_dir() { - symlink_dir(&oldpath, &newpath)? - } - } - Err(_) => return Err(type_error("you must pass a `options` argument for non-existent target path in windows".to_string())), - } - } - }; - Ok(json!({})) - } -} - -async fn op_symlink_async( - state: Rc<RefCell<OpState>>, - args: Value, - _zero_copy: BufVec, -) -> Result<Value, AnyError> { - super::check_unstable2(&state, "Deno.symlink"); - - let args: SymlinkArgs = serde_json::from_value(args)?; - let oldpath = PathBuf::from(&args.oldpath); - let newpath = PathBuf::from(&args.newpath); - - { - let state = state.borrow(); - state.borrow::<Permissions>().check_write(&newpath)?; - } - - tokio::task::spawn_blocking(move || { - debug!("op_symlink_async {} {}", oldpath.display(), newpath.display()); - #[cfg(unix)] - { - use std::os::unix::fs::symlink; - symlink(&oldpath, &newpath)?; - Ok(json!({})) - } - #[cfg(not(unix))] - { - use std::os::windows::fs::{symlink_dir, symlink_file}; - - match args.options { - Some(options) => match options._type.as_ref() { - "file" => symlink_file(&oldpath, &newpath)?, - "dir" => symlink_dir(&oldpath, &newpath)?, - _ => return Err(type_error("unsupported type")), - }, - None => { - let old_meta = std::fs::metadata(&oldpath); - match old_meta { - Ok(metadata) => { - if metadata.is_file() { - symlink_file(&oldpath, &newpath)? - } else if metadata.is_dir() { - symlink_dir(&oldpath, &newpath)? - } - } - Err(_) => return Err(type_error("you must pass a `options` argument for non-existent target path in windows".to_string())), - } - } - }; - Ok(json!({})) - } - }) - .await - .unwrap() -} - -#[derive(Deserialize)] -#[serde(rename_all = "camelCase")] -struct ReadLinkArgs { - path: String, -} - -fn op_read_link_sync( - state: &mut OpState, - args: Value, - _zero_copy: &mut [ZeroCopyBuf], -) -> Result<Value, AnyError> { - let args: ReadLinkArgs = serde_json::from_value(args)?; - let path = PathBuf::from(&args.path); - - state.borrow::<Permissions>().check_read(&path)?; - - debug!("op_read_link_value {}", path.display()); - let target = std::fs::read_link(&path)?.into_os_string(); - let targetstr = into_string(target)?; - Ok(json!(targetstr)) -} - -async fn op_read_link_async( - state: Rc<RefCell<OpState>>, - args: Value, - _zero_copy: BufVec, -) -> Result<Value, AnyError> { - let args: ReadLinkArgs = serde_json::from_value(args)?; - let path = PathBuf::from(&args.path); - { - let state = state.borrow(); - state.borrow::<Permissions>().check_read(&path)?; - } - tokio::task::spawn_blocking(move || { - debug!("op_read_link_async {}", path.display()); - let target = std::fs::read_link(&path)?.into_os_string(); - let targetstr = into_string(target)?; - Ok(json!(targetstr)) - }) - .await - .unwrap() -} - -#[derive(Deserialize)] -#[serde(rename_all = "camelCase")] -struct FtruncateArgs { - rid: i32, - len: i32, -} - -fn op_ftruncate_sync( - state: &mut OpState, - args: Value, - _zero_copy: &mut [ZeroCopyBuf], -) -> Result<Value, AnyError> { - super::check_unstable(state, "Deno.ftruncate"); - let args: FtruncateArgs = serde_json::from_value(args)?; - let rid = args.rid as u32; - let len = args.len as u64; - std_file_resource(state, rid, |r| match r { - Ok(std_file) => std_file.set_len(len).map_err(AnyError::from), - Err(_) => Err(type_error("cannot truncate this type of resource")), - })?; - Ok(json!({})) -} - -async fn op_ftruncate_async( - state: Rc<RefCell<OpState>>, - args: Value, - _zero_copy: BufVec, -) -> Result<Value, AnyError> { - super::check_unstable2(&state, "Deno.ftruncate"); - let args: FtruncateArgs = serde_json::from_value(args)?; - let rid = args.rid as u32; - let len = args.len as u64; - std_file_resource(&mut state.borrow_mut(), rid, |r| match r { - Ok(std_file) => std_file.set_len(len).map_err(AnyError::from), - Err(_) => Err(type_error("cannot truncate this type of resource")), - })?; - Ok(json!({})) -} - -#[derive(Deserialize)] -#[serde(rename_all = "camelCase")] -struct TruncateArgs { - path: String, - len: u64, -} - -fn op_truncate_sync( - state: &mut OpState, - args: Value, - _zero_copy: &mut [ZeroCopyBuf], -) -> Result<Value, AnyError> { - let args: TruncateArgs = serde_json::from_value(args)?; - let path = PathBuf::from(&args.path); - let len = args.len; - - state.borrow::<Permissions>().check_write(&path)?; - - debug!("op_truncate_sync {} {}", path.display(), len); - let f = std::fs::OpenOptions::new().write(true).open(&path)?; - f.set_len(len)?; - Ok(json!({})) -} - -async fn op_truncate_async( - state: Rc<RefCell<OpState>>, - args: Value, - _zero_copy: BufVec, -) -> Result<Value, AnyError> { - let args: TruncateArgs = serde_json::from_value(args)?; - let path = PathBuf::from(&args.path); - let len = args.len; - { - let state = state.borrow(); - state.borrow::<Permissions>().check_write(&path)?; - } - tokio::task::spawn_blocking(move || { - debug!("op_truncate_async {} {}", path.display(), len); - let f = std::fs::OpenOptions::new().write(true).open(&path)?; - f.set_len(len)?; - Ok(json!({})) - }) - .await - .unwrap() -} - -fn make_temp( - dir: Option<&Path>, - prefix: Option<&str>, - suffix: Option<&str>, - is_dir: bool, -) -> std::io::Result<PathBuf> { - let prefix_ = prefix.unwrap_or(""); - let suffix_ = suffix.unwrap_or(""); - let mut buf: PathBuf = match dir { - Some(ref p) => p.to_path_buf(), - None => temp_dir(), - } - .join("_"); - let mut rng = thread_rng(); - loop { - let unique = rng.gen::<u32>(); - buf.set_file_name(format!("{}{:08x}{}", prefix_, unique, suffix_)); - let r = if is_dir { - #[allow(unused_mut)] - let mut builder = std::fs::DirBuilder::new(); - #[cfg(unix)] - { - use std::os::unix::fs::DirBuilderExt; - builder.mode(0o700); - } - builder.create(buf.as_path()) - } else { - let mut open_options = std::fs::OpenOptions::new(); - open_options.write(true).create_new(true); - #[cfg(unix)] - { - use std::os::unix::fs::OpenOptionsExt; - open_options.mode(0o600); - } - open_options.open(buf.as_path())?; - Ok(()) - }; - match r { - Err(ref e) if e.kind() == std::io::ErrorKind::AlreadyExists => continue, - Ok(_) => return Ok(buf), - Err(e) => return Err(e), - } - } -} - -#[derive(Deserialize)] -#[serde(rename_all = "camelCase")] -struct MakeTempArgs { - dir: Option<String>, - prefix: Option<String>, - suffix: Option<String>, -} - -fn op_make_temp_dir_sync( - state: &mut OpState, - args: Value, - _zero_copy: &mut [ZeroCopyBuf], -) -> Result<Value, AnyError> { - let args: MakeTempArgs = serde_json::from_value(args)?; - - let dir = args.dir.map(|s| PathBuf::from(&s)); - let prefix = args.prefix.map(String::from); - let suffix = args.suffix.map(String::from); - - state - .borrow::<Permissions>() - .check_write(dir.clone().unwrap_or_else(temp_dir).as_path())?; - - // TODO(piscisaureus): use byte vector for paths, not a string. - // See https://github.com/denoland/deno/issues/627. - // We can't assume that paths are always valid utf8 strings. - let path = make_temp( - // Converting Option<String> to Option<&str> - dir.as_deref(), - prefix.as_deref(), - suffix.as_deref(), - true, - )?; - let path_str = into_string(path.into_os_string())?; - - Ok(json!(path_str)) -} - -async fn op_make_temp_dir_async( - state: Rc<RefCell<OpState>>, - args: Value, - _zero_copy: BufVec, -) -> Result<Value, AnyError> { - let args: MakeTempArgs = serde_json::from_value(args)?; - - let dir = args.dir.map(|s| PathBuf::from(&s)); - let prefix = args.prefix.map(String::from); - let suffix = args.suffix.map(String::from); - { - let state = state.borrow(); - state - .borrow::<Permissions>() - .check_write(dir.clone().unwrap_or_else(temp_dir).as_path())?; - } - tokio::task::spawn_blocking(move || { - // TODO(piscisaureus): use byte vector for paths, not a string. - // See https://github.com/denoland/deno/issues/627. - // We can't assume that paths are always valid utf8 strings. - let path = make_temp( - // Converting Option<String> to Option<&str> - dir.as_deref(), - prefix.as_deref(), - suffix.as_deref(), - true, - )?; - let path_str = into_string(path.into_os_string())?; - - Ok(json!(path_str)) - }) - .await - .unwrap() -} - -fn op_make_temp_file_sync( - state: &mut OpState, - args: Value, - _zero_copy: &mut [ZeroCopyBuf], -) -> Result<Value, AnyError> { - let args: MakeTempArgs = serde_json::from_value(args)?; - - let dir = args.dir.map(|s| PathBuf::from(&s)); - let prefix = args.prefix.map(String::from); - let suffix = args.suffix.map(String::from); - - state - .borrow::<Permissions>() - .check_write(dir.clone().unwrap_or_else(temp_dir).as_path())?; - - // TODO(piscisaureus): use byte vector for paths, not a string. - // See https://github.com/denoland/deno/issues/627. - // We can't assume that paths are always valid utf8 strings. - let path = make_temp( - // Converting Option<String> to Option<&str> - dir.as_deref(), - prefix.as_deref(), - suffix.as_deref(), - false, - )?; - let path_str = into_string(path.into_os_string())?; - - Ok(json!(path_str)) -} - -async fn op_make_temp_file_async( - state: Rc<RefCell<OpState>>, - args: Value, - _zero_copy: BufVec, -) -> Result<Value, AnyError> { - let args: MakeTempArgs = serde_json::from_value(args)?; - - let dir = args.dir.map(|s| PathBuf::from(&s)); - let prefix = args.prefix.map(String::from); - let suffix = args.suffix.map(String::from); - { - let state = state.borrow(); - state - .borrow::<Permissions>() - .check_write(dir.clone().unwrap_or_else(temp_dir).as_path())?; - } - tokio::task::spawn_blocking(move || { - // TODO(piscisaureus): use byte vector for paths, not a string. - // See https://github.com/denoland/deno/issues/627. - // We can't assume that paths are always valid utf8 strings. - let path = make_temp( - // Converting Option<String> to Option<&str> - dir.as_deref(), - prefix.as_deref(), - suffix.as_deref(), - false, - )?; - let path_str = into_string(path.into_os_string())?; - - Ok(json!(path_str)) - }) - .await - .unwrap() -} - -#[derive(Deserialize)] -#[serde(rename_all = "camelCase")] -struct FutimeArgs { - rid: i32, - atime: (i64, u32), - mtime: (i64, u32), -} - -fn op_futime_sync( - state: &mut OpState, - args: Value, - _zero_copy: &mut [ZeroCopyBuf], -) -> Result<Value, AnyError> { - super::check_unstable(state, "Deno.futimeSync"); - let args: FutimeArgs = serde_json::from_value(args)?; - let rid = args.rid as u32; - let atime = filetime::FileTime::from_unix_time(args.atime.0, args.atime.1); - let mtime = filetime::FileTime::from_unix_time(args.mtime.0, args.mtime.1); - - std_file_resource(state, rid, |r| match r { - Ok(std_file) => { - filetime::set_file_handle_times(std_file, Some(atime), Some(mtime)) - .map_err(AnyError::from) - } - Err(_) => Err(type_error( - "cannot futime on this type of resource".to_string(), - )), - })?; - - Ok(json!({})) -} - -async fn op_futime_async( - state: Rc<RefCell<OpState>>, - args: Value, - _zero_copy: BufVec, -) -> Result<Value, AnyError> { - let mut state = state.borrow_mut(); - super::check_unstable(&state, "Deno.futime"); - let args: FutimeArgs = serde_json::from_value(args)?; - let rid = args.rid as u32; - let atime = filetime::FileTime::from_unix_time(args.atime.0, args.atime.1); - let mtime = filetime::FileTime::from_unix_time(args.mtime.0, args.mtime.1); - // TODO Not actually async! https://github.com/denoland/deno/issues/7400 - std_file_resource(&mut state, rid, |r| match r { - Ok(std_file) => { - filetime::set_file_handle_times(std_file, Some(atime), Some(mtime)) - .map_err(AnyError::from) - } - Err(_) => Err(type_error( - "cannot futime on this type of resource".to_string(), - )), - })?; - - Ok(json!({})) -} - -#[derive(Deserialize)] -#[serde(rename_all = "camelCase")] -struct UtimeArgs { - path: String, - atime: (i64, u32), - mtime: (i64, u32), -} - -fn op_utime_sync( - state: &mut OpState, - args: Value, - _zero_copy: &mut [ZeroCopyBuf], -) -> Result<Value, AnyError> { - super::check_unstable(state, "Deno.utime"); - - let args: UtimeArgs = serde_json::from_value(args)?; - let path = PathBuf::from(&args.path); - let atime = filetime::FileTime::from_unix_time(args.atime.0, args.atime.1); - let mtime = filetime::FileTime::from_unix_time(args.mtime.0, args.mtime.1); - - state.borrow::<Permissions>().check_write(&path)?; - filetime::set_file_times(path, atime, mtime)?; - Ok(json!({})) -} - -async fn op_utime_async( - state: Rc<RefCell<OpState>>, - args: Value, - _zero_copy: BufVec, -) -> Result<Value, AnyError> { - let state = state.borrow(); - super::check_unstable(&state, "Deno.utime"); - - let args: UtimeArgs = serde_json::from_value(args)?; - let path = PathBuf::from(&args.path); - let atime = filetime::FileTime::from_unix_time(args.atime.0, args.atime.1); - let mtime = filetime::FileTime::from_unix_time(args.mtime.0, args.mtime.1); - - state.borrow::<Permissions>().check_write(&path)?; - - tokio::task::spawn_blocking(move || { - filetime::set_file_times(path, atime, mtime)?; - Ok(json!({})) - }) - .await - .unwrap() -} - -fn op_cwd( - state: &mut OpState, - _args: Value, - _zero_copy: &mut [ZeroCopyBuf], -) -> Result<Value, AnyError> { - let path = current_dir()?; - state - .borrow::<Permissions>() - .check_read_blind(&path, "CWD")?; - let path_str = into_string(path.into_os_string())?; - Ok(json!(path_str)) -} diff --git a/cli/ops/fs_events.rs b/cli/ops/fs_events.rs deleted file mode 100644 index 4832c915c..000000000 --- a/cli/ops/fs_events.rs +++ /dev/null @@ -1,133 +0,0 @@ -// Copyright 2018-2020 the Deno authors. All rights reserved. MIT license. - -use crate::permissions::Permissions; -use deno_core::error::bad_resource_id; -use deno_core::error::AnyError; -use deno_core::futures::future::poll_fn; -use deno_core::serde_json; -use deno_core::serde_json::json; -use deno_core::serde_json::Value; -use deno_core::BufVec; -use deno_core::OpState; -use deno_core::ZeroCopyBuf; -use notify::event::Event as NotifyEvent; -use notify::Error as NotifyError; -use notify::EventKind; -use notify::RecommendedWatcher; -use notify::RecursiveMode; -use notify::Watcher; -use serde::Deserialize; -use serde::Serialize; -use std::cell::RefCell; -use std::convert::From; -use std::path::PathBuf; -use std::rc::Rc; -use tokio::sync::mpsc; - -pub fn init(rt: &mut deno_core::JsRuntime) { - super::reg_json_sync(rt, "op_fs_events_open", op_fs_events_open); - super::reg_json_async(rt, "op_fs_events_poll", op_fs_events_poll); -} - -struct FsEventsResource { - #[allow(unused)] - watcher: RecommendedWatcher, - receiver: mpsc::Receiver<Result<FsEvent, AnyError>>, -} - -/// Represents a file system event. -/// -/// We do not use the event directly from the notify crate. We flatten -/// the structure into this simpler structure. We want to only make it more -/// complex as needed. -/// -/// Feel free to expand this struct as long as you can add tests to demonstrate -/// the complexity. -#[derive(Serialize, Debug)] -struct FsEvent { - kind: String, - paths: Vec<PathBuf>, -} - -impl From<NotifyEvent> for FsEvent { - fn from(e: NotifyEvent) -> Self { - let kind = match e.kind { - EventKind::Any => "any", - EventKind::Access(_) => "access", - EventKind::Create(_) => "create", - EventKind::Modify(_) => "modify", - EventKind::Remove(_) => "remove", - EventKind::Other => todo!(), // What's this for? Leaving it out for now. - } - .to_string(); - FsEvent { - kind, - paths: e.paths, - } - } -} - -fn op_fs_events_open( - state: &mut OpState, - args: Value, - _zero_copy: &mut [ZeroCopyBuf], -) -> Result<Value, AnyError> { - #[derive(Deserialize)] - struct OpenArgs { - recursive: bool, - paths: Vec<String>, - } - let args: OpenArgs = serde_json::from_value(args)?; - let (sender, receiver) = mpsc::channel::<Result<FsEvent, AnyError>>(16); - let sender = std::sync::Mutex::new(sender); - let mut watcher: RecommendedWatcher = - Watcher::new_immediate(move |res: Result<NotifyEvent, NotifyError>| { - let res2 = res.map(FsEvent::from).map_err(AnyError::from); - let mut sender = sender.lock().unwrap(); - // Ignore result, if send failed it means that watcher was already closed, - // but not all messages have been flushed. - let _ = sender.try_send(res2); - })?; - let recursive_mode = if args.recursive { - RecursiveMode::Recursive - } else { - RecursiveMode::NonRecursive - }; - for path in &args.paths { - state - .borrow::<Permissions>() - .check_read(&PathBuf::from(path))?; - watcher.watch(path, recursive_mode)?; - } - let resource = FsEventsResource { watcher, receiver }; - let rid = state.resource_table.add("fsEvents", Box::new(resource)); - Ok(json!(rid)) -} - -async fn op_fs_events_poll( - state: Rc<RefCell<OpState>>, - args: Value, - _zero_copy: BufVec, -) -> Result<Value, AnyError> { - #[derive(Deserialize)] - struct PollArgs { - rid: u32, - } - let PollArgs { rid } = serde_json::from_value(args)?; - poll_fn(move |cx| { - let mut state = state.borrow_mut(); - let watcher = state - .resource_table - .get_mut::<FsEventsResource>(rid) - .ok_or_else(bad_resource_id)?; - watcher - .receiver - .poll_recv(cx) - .map(|maybe_result| match maybe_result { - Some(Ok(value)) => Ok(json!({ "value": value, "done": false })), - Some(Err(err)) => Err(err), - None => Ok(json!({ "done": true })), - }) - }) - .await -} diff --git a/cli/ops/io.rs b/cli/ops/io.rs deleted file mode 100644 index 0f8af905a..000000000 --- a/cli/ops/io.rs +++ /dev/null @@ -1,473 +0,0 @@ -// Copyright 2018-2020 the Deno authors. All rights reserved. MIT license. - -use super::dispatch_minimal::minimal_op; -use super::dispatch_minimal::MinimalOp; -use crate::metrics::metrics_op; -use deno_core::error::bad_resource_id; -use deno_core::error::resource_unavailable; -use deno_core::error::type_error; -use deno_core::error::AnyError; -use deno_core::futures; -use deno_core::futures::future::poll_fn; -use deno_core::futures::future::FutureExt; -use deno_core::futures::ready; -use deno_core::BufVec; -use deno_core::JsRuntime; -use deno_core::OpState; -use std::cell::RefCell; -use std::collections::HashMap; -use std::pin::Pin; -use std::rc::Rc; -use std::sync::atomic::{AtomicUsize, Ordering}; -use std::task::Context; -use std::task::Poll; -use tokio::io::{AsyncRead, AsyncWrite}; -use tokio::net::TcpStream; -use tokio_rustls::client::TlsStream as ClientTlsStream; -use tokio_rustls::server::TlsStream as ServerTlsStream; - -#[cfg(not(windows))] -use std::os::unix::io::FromRawFd; - -#[cfg(windows)] -use std::os::windows::io::FromRawHandle; - -lazy_static! { - /// Due to portability issues on Windows handle to stdout is created from raw - /// file descriptor. The caveat of that approach is fact that when this - /// handle is dropped underlying file descriptor is closed - that is highly - /// not desirable in case of stdout. That's why we store this global handle - /// that is then cloned when obtaining stdio for process. In turn when - /// resource table is dropped storing reference to that handle, the handle - /// itself won't be closed (so Deno.core.print) will still work. - // TODO(ry) It should be possible to close stdout. - static ref STDIN_HANDLE: Option<std::fs::File> = { - #[cfg(not(windows))] - let stdin = unsafe { Some(std::fs::File::from_raw_fd(0)) }; - #[cfg(windows)] - let stdin = unsafe { - let handle = winapi::um::processenv::GetStdHandle( - winapi::um::winbase::STD_INPUT_HANDLE, - ); - if handle.is_null() { - return None; - } - Some(std::fs::File::from_raw_handle(handle)) - }; - stdin - }; - static ref STDOUT_HANDLE: Option<std::fs::File> = { - #[cfg(not(windows))] - let stdout = unsafe { Some(std::fs::File::from_raw_fd(1)) }; - #[cfg(windows)] - let stdout = unsafe { - let handle = winapi::um::processenv::GetStdHandle( - winapi::um::winbase::STD_OUTPUT_HANDLE, - ); - if handle.is_null() { - return None; - } - Some(std::fs::File::from_raw_handle(handle)) - }; - stdout - }; - static ref STDERR_HANDLE: Option<std::fs::File> = { - #[cfg(not(windows))] - let stderr = unsafe { Some(std::fs::File::from_raw_fd(2)) }; - #[cfg(windows)] - let stderr = unsafe { - let handle = winapi::um::processenv::GetStdHandle( - winapi::um::winbase::STD_ERROR_HANDLE, - ); - if handle.is_null() { - return None; - } - Some(std::fs::File::from_raw_handle(handle)) - }; - stderr - }; -} - -pub fn init(rt: &mut JsRuntime) { - rt.register_op("op_read", metrics_op(minimal_op(op_read))); - rt.register_op("op_write", metrics_op(minimal_op(op_write))); -} - -pub fn get_stdio() -> ( - Option<StreamResourceHolder>, - Option<StreamResourceHolder>, - Option<StreamResourceHolder>, -) { - let stdin = get_stdio_stream(&STDIN_HANDLE); - let stdout = get_stdio_stream(&STDOUT_HANDLE); - let stderr = get_stdio_stream(&STDERR_HANDLE); - - (stdin, stdout, stderr) -} - -fn get_stdio_stream( - handle: &Option<std::fs::File>, -) -> Option<StreamResourceHolder> { - match handle { - None => None, - Some(file_handle) => match file_handle.try_clone() { - Ok(clone) => Some(StreamResourceHolder::new(StreamResource::FsFile( - Some((tokio::fs::File::from_std(clone), FileMetadata::default())), - ))), - Err(_e) => None, - }, - } -} - -fn no_buffer_specified() -> AnyError { - type_error("no buffer specified") -} - -#[cfg(unix)] -use nix::sys::termios; - -#[derive(Default)] -pub struct TTYMetadata { - #[cfg(unix)] - pub mode: Option<termios::Termios>, -} - -#[derive(Default)] -pub struct FileMetadata { - pub tty: TTYMetadata, -} - -pub struct StreamResourceHolder { - pub resource: StreamResource, - waker: HashMap<usize, futures::task::AtomicWaker>, - waker_counter: AtomicUsize, -} - -impl StreamResourceHolder { - pub fn new(resource: StreamResource) -> StreamResourceHolder { - StreamResourceHolder { - resource, - // Atleast one task is expecter for the resource - waker: HashMap::with_capacity(1), - // Tracks wakers Ids - waker_counter: AtomicUsize::new(0), - } - } -} - -impl Drop for StreamResourceHolder { - fn drop(&mut self) { - self.wake_tasks(); - } -} - -impl StreamResourceHolder { - pub fn track_task(&mut self, cx: &Context) -> Result<usize, AnyError> { - let waker = futures::task::AtomicWaker::new(); - waker.register(cx.waker()); - // Its OK if it overflows - let task_waker_id = self.waker_counter.fetch_add(1, Ordering::Relaxed); - self.waker.insert(task_waker_id, waker); - Ok(task_waker_id) - } - - pub fn wake_tasks(&mut self) { - for waker in self.waker.values() { - waker.wake(); - } - } - - pub fn untrack_task(&mut self, task_waker_id: usize) { - self.waker.remove(&task_waker_id); - } -} - -pub enum StreamResource { - FsFile(Option<(tokio::fs::File, FileMetadata)>), - TcpStream(Option<tokio::net::TcpStream>), - #[cfg(not(windows))] - UnixStream(tokio::net::UnixStream), - ServerTlsStream(Box<ServerTlsStream<TcpStream>>), - ClientTlsStream(Box<ClientTlsStream<TcpStream>>), - ChildStdin(tokio::process::ChildStdin), - ChildStdout(tokio::process::ChildStdout), - ChildStderr(tokio::process::ChildStderr), -} - -trait UnpinAsyncRead: AsyncRead + Unpin {} -trait UnpinAsyncWrite: AsyncWrite + Unpin {} - -impl<T: AsyncRead + Unpin> UnpinAsyncRead for T {} -impl<T: AsyncWrite + Unpin> UnpinAsyncWrite for T {} - -/// `DenoAsyncRead` is the same as the `tokio_io::AsyncRead` trait -/// but uses an `AnyError` error instead of `std::io:Error` -pub trait DenoAsyncRead { - fn poll_read( - &mut self, - cx: &mut Context, - buf: &mut [u8], - ) -> Poll<Result<usize, AnyError>>; -} - -impl DenoAsyncRead for StreamResource { - fn poll_read( - &mut self, - cx: &mut Context, - buf: &mut [u8], - ) -> Poll<Result<usize, AnyError>> { - use StreamResource::*; - let f: &mut dyn UnpinAsyncRead = match self { - FsFile(Some((f, _))) => f, - FsFile(None) => return Poll::Ready(Err(resource_unavailable())), - TcpStream(Some(f)) => f, - #[cfg(not(windows))] - UnixStream(f) => f, - ClientTlsStream(f) => f, - ServerTlsStream(f) => f, - ChildStdout(f) => f, - ChildStderr(f) => f, - _ => return Err(bad_resource_id()).into(), - }; - let v = ready!(Pin::new(f).poll_read(cx, buf))?; - Ok(v).into() - } -} - -pub fn op_read( - state: Rc<RefCell<OpState>>, - is_sync: bool, - rid: i32, - mut zero_copy: BufVec, -) -> MinimalOp { - debug!("read rid={}", rid); - match zero_copy.len() { - 0 => return MinimalOp::Sync(Err(no_buffer_specified())), - 1 => {} - _ => panic!("Invalid number of arguments"), - } - - if is_sync { - MinimalOp::Sync({ - // First we look up the rid in the resource table. - std_file_resource(&mut state.borrow_mut(), rid as u32, move |r| match r { - Ok(std_file) => { - use std::io::Read; - std_file - .read(&mut zero_copy[0]) - .map(|n: usize| n as i32) - .map_err(AnyError::from) - } - Err(_) => Err(type_error("sync read not allowed on this resource")), - }) - }) - } else { - let mut zero_copy = zero_copy[0].clone(); - MinimalOp::Async( - poll_fn(move |cx| { - let mut state = state.borrow_mut(); - let resource_holder = state - .resource_table - .get_mut::<StreamResourceHolder>(rid as u32) - .ok_or_else(bad_resource_id)?; - - let mut task_tracker_id: Option<usize> = None; - let nread = match resource_holder.resource.poll_read(cx, &mut zero_copy) - { - Poll::Ready(t) => { - if let Some(id) = task_tracker_id { - resource_holder.untrack_task(id); - } - t - } - Poll::Pending => { - task_tracker_id.replace(resource_holder.track_task(cx)?); - return Poll::Pending; - } - }?; - Poll::Ready(Ok(nread as i32)) - }) - .boxed_local(), - ) - } -} - -/// `DenoAsyncWrite` is the same as the `tokio_io::AsyncWrite` trait -/// but uses an `AnyError` error instead of `std::io:Error` -pub trait DenoAsyncWrite { - fn poll_write( - &mut self, - cx: &mut Context, - buf: &[u8], - ) -> Poll<Result<usize, AnyError>>; - - fn poll_close(&mut self, cx: &mut Context) -> Poll<Result<(), AnyError>>; - - fn poll_flush(&mut self, cx: &mut Context) -> Poll<Result<(), AnyError>>; -} - -impl DenoAsyncWrite for StreamResource { - fn poll_write( - &mut self, - cx: &mut Context, - buf: &[u8], - ) -> Poll<Result<usize, AnyError>> { - use StreamResource::*; - let f: &mut dyn UnpinAsyncWrite = match self { - FsFile(Some((f, _))) => f, - FsFile(None) => return Poll::Pending, - TcpStream(Some(f)) => f, - #[cfg(not(windows))] - UnixStream(f) => f, - ClientTlsStream(f) => f, - ServerTlsStream(f) => f, - ChildStdin(f) => f, - _ => return Err(bad_resource_id()).into(), - }; - - let v = ready!(Pin::new(f).poll_write(cx, buf))?; - Ok(v).into() - } - - fn poll_flush(&mut self, cx: &mut Context) -> Poll<Result<(), AnyError>> { - use StreamResource::*; - let f: &mut dyn UnpinAsyncWrite = match self { - FsFile(Some((f, _))) => f, - FsFile(None) => return Poll::Pending, - TcpStream(Some(f)) => f, - #[cfg(not(windows))] - UnixStream(f) => f, - ClientTlsStream(f) => f, - ServerTlsStream(f) => f, - ChildStdin(f) => f, - _ => return Err(bad_resource_id()).into(), - }; - - ready!(Pin::new(f).poll_flush(cx))?; - Ok(()).into() - } - - fn poll_close(&mut self, _cx: &mut Context) -> Poll<Result<(), AnyError>> { - unimplemented!() - } -} - -pub fn op_write( - state: Rc<RefCell<OpState>>, - is_sync: bool, - rid: i32, - zero_copy: BufVec, -) -> MinimalOp { - debug!("write rid={}", rid); - match zero_copy.len() { - 0 => return MinimalOp::Sync(Err(no_buffer_specified())), - 1 => {} - _ => panic!("Invalid number of arguments"), - } - - if is_sync { - MinimalOp::Sync({ - // First we look up the rid in the resource table. - std_file_resource(&mut state.borrow_mut(), rid as u32, move |r| match r { - Ok(std_file) => { - use std::io::Write; - std_file - .write(&zero_copy[0]) - .map(|nwritten: usize| nwritten as i32) - .map_err(AnyError::from) - } - Err(_) => Err(type_error("sync read not allowed on this resource")), - }) - }) - } else { - let zero_copy = zero_copy[0].clone(); - MinimalOp::Async( - async move { - let nwritten = poll_fn(|cx| { - let mut state = state.borrow_mut(); - let resource_holder = state - .resource_table - .get_mut::<StreamResourceHolder>(rid as u32) - .ok_or_else(bad_resource_id)?; - resource_holder.resource.poll_write(cx, &zero_copy) - }) - .await?; - - // TODO(bartlomieju): this step was added during upgrade to Tokio 0.2 - // and the reasons for the need to explicitly flush are not fully known. - // Figure out why it's needed and preferably remove it. - // https://github.com/denoland/deno/issues/3565 - poll_fn(|cx| { - let mut state = state.borrow_mut(); - let resource_holder = state - .resource_table - .get_mut::<StreamResourceHolder>(rid as u32) - .ok_or_else(bad_resource_id)?; - resource_holder.resource.poll_flush(cx) - }) - .await?; - - Ok(nwritten as i32) - } - .boxed_local(), - ) - } -} - -/// Helper function for operating on a std::fs::File stored in the resource table. -/// -/// We store file system file resources as tokio::fs::File, so this is a little -/// utility function that gets a std::fs:File when you need to do blocking -/// operations. -/// -/// Returns ErrorKind::Busy if the resource is being used by another op. -pub fn std_file_resource<F, T>( - state: &mut OpState, - rid: u32, - mut f: F, -) -> Result<T, AnyError> -where - F: FnMut( - Result<&mut std::fs::File, &mut StreamResource>, - ) -> Result<T, AnyError>, -{ - // First we look up the rid in the resource table. - let mut r = state.resource_table.get_mut::<StreamResourceHolder>(rid); - if let Some(ref mut resource_holder) = r { - // Sync write only works for FsFile. It doesn't make sense to do this - // for non-blocking sockets. So we error out if not FsFile. - match &mut resource_holder.resource { - StreamResource::FsFile(option_file_metadata) => { - // The object in the resource table is a tokio::fs::File - but in - // order to do a blocking write on it, we must turn it into a - // std::fs::File. Hopefully this code compiles down to nothing. - if let Some((tokio_file, metadata)) = option_file_metadata.take() { - match tokio_file.try_into_std() { - Ok(mut std_file) => { - let result = f(Ok(&mut std_file)); - // Turn the std_file handle back into a tokio file, put it back - // in the resource table. - let tokio_file = tokio::fs::File::from_std(std_file); - resource_holder.resource = - StreamResource::FsFile(Some((tokio_file, metadata))); - // return the result. - result - } - Err(tokio_file) => { - // This function will return an error containing the file if - // some operation is in-flight. - resource_holder.resource = - StreamResource::FsFile(Some((tokio_file, metadata))); - Err(resource_unavailable()) - } - } - } else { - Err(resource_unavailable()) - } - } - _ => f(Err(&mut resource_holder.resource)), - } - } else { - Err(bad_resource_id()) - } -} diff --git a/cli/ops/mod.rs b/cli/ops/mod.rs index 56c0f1ad5..24eca3e77 100644 --- a/cli/ops/mod.rs +++ b/cli/ops/mod.rs @@ -1,32 +1,8 @@ // Copyright 2018-2020 the Deno authors. All rights reserved. MIT license. -mod dispatch_minimal; -pub use dispatch_minimal::MinimalOp; - -pub mod crypto; pub mod errors; -pub mod fetch; -pub mod fs; -pub mod fs_events; -pub mod io; -pub mod net; -#[cfg(unix)] -mod net_unix; -pub mod os; -pub mod permissions; -pub mod plugin; -pub mod process; -pub mod runtime; pub mod runtime_compiler; -pub mod signal; -pub mod timers; -pub mod tls; -pub mod tty; -pub mod web_worker; -pub mod websocket; -pub mod worker_host; -use crate::metrics::metrics_op; use deno_core::error::AnyError; use deno_core::json_op_async; use deno_core::json_op_sync; @@ -35,6 +11,7 @@ use deno_core::BufVec; use deno_core::JsRuntime; use deno_core::OpState; use deno_core::ZeroCopyBuf; +use deno_runtime::metrics::metrics_op; use std::cell::RefCell; use std::future::Future; use std::rc::Rc; @@ -54,34 +31,3 @@ where { rt.register_op(name, metrics_op(json_op_sync(op_fn))); } - -pub struct UnstableChecker { - pub unstable: bool, -} - -impl UnstableChecker { - /// Quits the process if the --unstable flag was not provided. - /// - /// This is intentionally a non-recoverable check so that people cannot probe - /// for unstable APIs from stable programs. - // NOTE(bartlomieju): keep in sync with `cli/program_state.rs` - pub fn check_unstable(&self, api_name: &str) { - if !self.unstable { - eprintln!( - "Unstable API '{}'. The --unstable flag must be provided.", - api_name - ); - std::process::exit(70); - } - } -} -/// Helper for checking unstable features. Used for sync ops. -pub fn check_unstable(state: &OpState, api_name: &str) { - state.borrow::<UnstableChecker>().check_unstable(api_name) -} - -/// Helper for checking unstable features. Used for async ops. -pub fn check_unstable2(state: &Rc<RefCell<OpState>>, api_name: &str) { - let state = state.borrow(); - state.borrow::<UnstableChecker>().check_unstable(api_name) -} diff --git a/cli/ops/net.rs b/cli/ops/net.rs deleted file mode 100644 index 98ff83fc0..000000000 --- a/cli/ops/net.rs +++ /dev/null @@ -1,566 +0,0 @@ -// Copyright 2018-2020 the Deno authors. All rights reserved. MIT license. - -use crate::ops::io::StreamResource; -use crate::ops::io::StreamResourceHolder; -use crate::permissions::Permissions; -use crate::resolve_addr::resolve_addr; -use deno_core::error::bad_resource; -use deno_core::error::bad_resource_id; -use deno_core::error::custom_error; -use deno_core::error::generic_error; -use deno_core::error::type_error; -use deno_core::error::AnyError; -use deno_core::futures; -use deno_core::futures::future::poll_fn; -use deno_core::serde_json; -use deno_core::serde_json::json; -use deno_core::serde_json::Value; -use deno_core::BufVec; -use deno_core::OpState; -use deno_core::ZeroCopyBuf; -use serde::Deserialize; -use std::cell::RefCell; -use std::net::Shutdown; -use std::net::SocketAddr; -use std::rc::Rc; -use std::task::Context; -use std::task::Poll; -use tokio::net::TcpListener; -use tokio::net::TcpStream; -use tokio::net::UdpSocket; - -#[cfg(unix)] -use super::net_unix; -#[cfg(unix)] -use std::path::Path; - -pub fn init(rt: &mut deno_core::JsRuntime) { - super::reg_json_async(rt, "op_accept", op_accept); - super::reg_json_async(rt, "op_connect", op_connect); - super::reg_json_sync(rt, "op_shutdown", op_shutdown); - super::reg_json_sync(rt, "op_listen", op_listen); - super::reg_json_async(rt, "op_datagram_receive", op_datagram_receive); - super::reg_json_async(rt, "op_datagram_send", op_datagram_send); -} - -#[derive(Deserialize)] -pub(crate) struct AcceptArgs { - pub rid: i32, - pub transport: String, -} - -async fn accept_tcp( - state: Rc<RefCell<OpState>>, - args: AcceptArgs, - _zero_copy: BufVec, -) -> Result<Value, AnyError> { - let rid = args.rid as u32; - - let accept_fut = poll_fn(|cx| { - let mut state = state.borrow_mut(); - let listener_resource = state - .resource_table - .get_mut::<TcpListenerResource>(rid) - .ok_or_else(|| bad_resource("Listener has been closed"))?; - let listener = &mut listener_resource.listener; - match listener.poll_accept(cx).map_err(AnyError::from) { - Poll::Ready(Ok((stream, addr))) => { - listener_resource.untrack_task(); - Poll::Ready(Ok((stream, addr))) - } - Poll::Pending => { - listener_resource.track_task(cx)?; - Poll::Pending - } - Poll::Ready(Err(e)) => { - listener_resource.untrack_task(); - Poll::Ready(Err(e)) - } - } - }); - let (tcp_stream, _socket_addr) = accept_fut.await?; - let local_addr = tcp_stream.local_addr()?; - let remote_addr = tcp_stream.peer_addr()?; - - let mut state = state.borrow_mut(); - let rid = state.resource_table.add( - "tcpStream", - Box::new(StreamResourceHolder::new(StreamResource::TcpStream(Some( - tcp_stream, - )))), - ); - Ok(json!({ - "rid": rid, - "localAddr": { - "hostname": local_addr.ip().to_string(), - "port": local_addr.port(), - "transport": "tcp", - }, - "remoteAddr": { - "hostname": remote_addr.ip().to_string(), - "port": remote_addr.port(), - "transport": "tcp", - } - })) -} - -async fn op_accept( - state: Rc<RefCell<OpState>>, - args: Value, - bufs: BufVec, -) -> Result<Value, AnyError> { - let args: AcceptArgs = serde_json::from_value(args)?; - match args.transport.as_str() { - "tcp" => accept_tcp(state, args, bufs).await, - #[cfg(unix)] - "unix" => net_unix::accept_unix(state, args, bufs).await, - _ => Err(generic_error(format!( - "Unsupported transport protocol {}", - args.transport - ))), - } -} - -#[derive(Deserialize)] -pub(crate) struct ReceiveArgs { - pub rid: i32, - pub transport: String, -} - -async fn receive_udp( - state: Rc<RefCell<OpState>>, - args: ReceiveArgs, - zero_copy: BufVec, -) -> Result<Value, AnyError> { - assert_eq!(zero_copy.len(), 1, "Invalid number of arguments"); - let mut zero_copy = zero_copy[0].clone(); - - let rid = args.rid as u32; - - let receive_fut = poll_fn(|cx| { - let mut state = state.borrow_mut(); - let resource = state - .resource_table - .get_mut::<UdpSocketResource>(rid) - .ok_or_else(|| bad_resource("Socket has been closed"))?; - let socket = &mut resource.socket; - socket - .poll_recv_from(cx, &mut zero_copy) - .map_err(AnyError::from) - }); - let (size, remote_addr) = receive_fut.await?; - Ok(json!({ - "size": size, - "remoteAddr": { - "hostname": remote_addr.ip().to_string(), - "port": remote_addr.port(), - "transport": "udp", - } - })) -} - -async fn op_datagram_receive( - state: Rc<RefCell<OpState>>, - args: Value, - zero_copy: BufVec, -) -> Result<Value, AnyError> { - assert_eq!(zero_copy.len(), 1, "Invalid number of arguments"); - - let args: ReceiveArgs = serde_json::from_value(args)?; - match args.transport.as_str() { - "udp" => receive_udp(state, args, zero_copy).await, - #[cfg(unix)] - "unixpacket" => net_unix::receive_unix_packet(state, args, zero_copy).await, - _ => Err(generic_error(format!( - "Unsupported transport protocol {}", - args.transport - ))), - } -} - -#[derive(Deserialize)] -struct SendArgs { - rid: i32, - transport: String, - #[serde(flatten)] - transport_args: ArgsEnum, -} - -async fn op_datagram_send( - state: Rc<RefCell<OpState>>, - args: Value, - zero_copy: BufVec, -) -> Result<Value, AnyError> { - assert_eq!(zero_copy.len(), 1, "Invalid number of arguments"); - let zero_copy = zero_copy[0].clone(); - - match serde_json::from_value(args)? { - SendArgs { - rid, - transport, - transport_args: ArgsEnum::Ip(args), - } if transport == "udp" => { - { - let s = state.borrow(); - s.borrow::<Permissions>() - .check_net(&args.hostname, args.port)?; - } - let addr = resolve_addr(&args.hostname, args.port)?; - poll_fn(move |cx| { - let mut state = state.borrow_mut(); - let resource = state - .resource_table - .get_mut::<UdpSocketResource>(rid as u32) - .ok_or_else(|| bad_resource("Socket has been closed"))?; - resource - .socket - .poll_send_to(cx, &zero_copy, &addr) - .map_ok(|byte_length| json!(byte_length)) - .map_err(AnyError::from) - }) - .await - } - #[cfg(unix)] - SendArgs { - rid, - transport, - transport_args: ArgsEnum::Unix(args), - } if transport == "unixpacket" => { - let address_path = Path::new(&args.path); - { - let s = state.borrow(); - s.borrow::<Permissions>().check_write(&address_path)?; - } - let mut state = state.borrow_mut(); - let resource = state - .resource_table - .get_mut::<net_unix::UnixDatagramResource>(rid as u32) - .ok_or_else(|| { - custom_error("NotConnected", "Socket has been closed") - })?; - let socket = &mut resource.socket; - let byte_length = socket - .send_to(&zero_copy, &resource.local_addr.as_pathname().unwrap()) - .await?; - - Ok(json!(byte_length)) - } - _ => Err(type_error("Wrong argument format!")), - } -} - -#[derive(Deserialize)] -struct ConnectArgs { - transport: String, - #[serde(flatten)] - transport_args: ArgsEnum, -} - -async fn op_connect( - state: Rc<RefCell<OpState>>, - args: Value, - _zero_copy: BufVec, -) -> Result<Value, AnyError> { - match serde_json::from_value(args)? { - ConnectArgs { - transport, - transport_args: ArgsEnum::Ip(args), - } if transport == "tcp" => { - { - let state_ = state.borrow(); - state_ - .borrow::<Permissions>() - .check_net(&args.hostname, args.port)?; - } - let addr = resolve_addr(&args.hostname, args.port)?; - let tcp_stream = TcpStream::connect(&addr).await?; - let local_addr = tcp_stream.local_addr()?; - let remote_addr = tcp_stream.peer_addr()?; - - let mut state_ = state.borrow_mut(); - let rid = state_.resource_table.add( - "tcpStream", - Box::new(StreamResourceHolder::new(StreamResource::TcpStream(Some( - tcp_stream, - )))), - ); - Ok(json!({ - "rid": rid, - "localAddr": { - "hostname": local_addr.ip().to_string(), - "port": local_addr.port(), - "transport": transport, - }, - "remoteAddr": { - "hostname": remote_addr.ip().to_string(), - "port": remote_addr.port(), - "transport": transport, - } - })) - } - #[cfg(unix)] - ConnectArgs { - transport, - transport_args: ArgsEnum::Unix(args), - } if transport == "unix" => { - let address_path = Path::new(&args.path); - super::check_unstable2(&state, "Deno.connect"); - { - let state_ = state.borrow(); - state_.borrow::<Permissions>().check_read(&address_path)?; - state_.borrow::<Permissions>().check_write(&address_path)?; - } - let path = args.path; - let unix_stream = net_unix::UnixStream::connect(Path::new(&path)).await?; - let local_addr = unix_stream.local_addr()?; - let remote_addr = unix_stream.peer_addr()?; - - let mut state_ = state.borrow_mut(); - let rid = state_.resource_table.add( - "unixStream", - Box::new(StreamResourceHolder::new(StreamResource::UnixStream( - unix_stream, - ))), - ); - Ok(json!({ - "rid": rid, - "localAddr": { - "path": local_addr.as_pathname(), - "transport": transport, - }, - "remoteAddr": { - "path": remote_addr.as_pathname(), - "transport": transport, - } - })) - } - _ => Err(type_error("Wrong argument format!")), - } -} - -#[derive(Deserialize)] -struct ShutdownArgs { - rid: i32, - how: i32, -} - -fn op_shutdown( - state: &mut OpState, - args: Value, - _zero_copy: &mut [ZeroCopyBuf], -) -> Result<Value, AnyError> { - super::check_unstable(state, "Deno.shutdown"); - - let args: ShutdownArgs = serde_json::from_value(args)?; - - let rid = args.rid as u32; - let how = args.how; - - let shutdown_mode = match how { - 0 => Shutdown::Read, - 1 => Shutdown::Write, - _ => unimplemented!(), - }; - - let resource_holder = state - .resource_table - .get_mut::<StreamResourceHolder>(rid) - .ok_or_else(bad_resource_id)?; - match resource_holder.resource { - StreamResource::TcpStream(Some(ref mut stream)) => { - TcpStream::shutdown(stream, shutdown_mode)?; - } - #[cfg(unix)] - StreamResource::UnixStream(ref mut stream) => { - net_unix::UnixStream::shutdown(stream, shutdown_mode)?; - } - _ => return Err(bad_resource_id()), - } - - Ok(json!({})) -} - -#[allow(dead_code)] -struct TcpListenerResource { - listener: TcpListener, - waker: Option<futures::task::AtomicWaker>, - local_addr: SocketAddr, -} - -impl Drop for TcpListenerResource { - fn drop(&mut self) { - self.wake_task(); - } -} - -impl TcpListenerResource { - /// Track the current task so future awaiting for connection - /// can be notified when listener is closed. - /// - /// Throws an error if another task is already tracked. - pub fn track_task(&mut self, cx: &Context) -> Result<(), AnyError> { - // Currently, we only allow tracking a single accept task for a listener. - // This might be changed in the future with multiple workers. - // Caveat: TcpListener by itself also only tracks an accept task at a time. - // See https://github.com/tokio-rs/tokio/issues/846#issuecomment-454208883 - if self.waker.is_some() { - return Err(custom_error("Busy", "Another accept task is ongoing")); - } - - let waker = futures::task::AtomicWaker::new(); - waker.register(cx.waker()); - self.waker.replace(waker); - Ok(()) - } - - /// Notifies a task when listener is closed so accept future can resolve. - pub fn wake_task(&mut self) { - if let Some(waker) = self.waker.as_ref() { - waker.wake(); - } - } - - /// Stop tracking a task. - /// Happens when the task is done and thus no further tracking is needed. - pub fn untrack_task(&mut self) { - if self.waker.is_some() { - self.waker.take(); - } - } -} - -struct UdpSocketResource { - socket: UdpSocket, -} - -#[derive(Deserialize)] -struct IpListenArgs { - hostname: String, - port: u16, -} - -#[derive(Deserialize)] -#[serde(untagged)] -enum ArgsEnum { - Ip(IpListenArgs), - #[cfg(unix)] - Unix(net_unix::UnixListenArgs), -} - -#[derive(Deserialize)] -struct ListenArgs { - transport: String, - #[serde(flatten)] - transport_args: ArgsEnum, -} - -fn listen_tcp( - state: &mut OpState, - addr: SocketAddr, -) -> Result<(u32, SocketAddr), AnyError> { - let std_listener = std::net::TcpListener::bind(&addr)?; - let listener = TcpListener::from_std(std_listener)?; - let local_addr = listener.local_addr()?; - let listener_resource = TcpListenerResource { - listener, - waker: None, - local_addr, - }; - let rid = state - .resource_table - .add("tcpListener", Box::new(listener_resource)); - - Ok((rid, local_addr)) -} - -fn listen_udp( - state: &mut OpState, - addr: SocketAddr, -) -> Result<(u32, SocketAddr), AnyError> { - let std_socket = std::net::UdpSocket::bind(&addr)?; - let socket = UdpSocket::from_std(std_socket)?; - let local_addr = socket.local_addr()?; - let socket_resource = UdpSocketResource { socket }; - let rid = state - .resource_table - .add("udpSocket", Box::new(socket_resource)); - - Ok((rid, local_addr)) -} - -fn op_listen( - state: &mut OpState, - args: Value, - _zero_copy: &mut [ZeroCopyBuf], -) -> Result<Value, AnyError> { - let permissions = state.borrow::<Permissions>(); - match serde_json::from_value(args)? { - ListenArgs { - transport, - transport_args: ArgsEnum::Ip(args), - } => { - { - if transport == "udp" { - super::check_unstable(state, "Deno.listenDatagram"); - } - permissions.check_net(&args.hostname, args.port)?; - } - let addr = resolve_addr(&args.hostname, args.port)?; - let (rid, local_addr) = if transport == "tcp" { - listen_tcp(state, addr)? - } else { - listen_udp(state, addr)? - }; - debug!( - "New listener {} {}:{}", - rid, - local_addr.ip().to_string(), - local_addr.port() - ); - Ok(json!({ - "rid": rid, - "localAddr": { - "hostname": local_addr.ip().to_string(), - "port": local_addr.port(), - "transport": transport, - }, - })) - } - #[cfg(unix)] - ListenArgs { - transport, - transport_args: ArgsEnum::Unix(args), - } if transport == "unix" || transport == "unixpacket" => { - let address_path = Path::new(&args.path); - { - if transport == "unix" { - super::check_unstable(state, "Deno.listen"); - } - if transport == "unixpacket" { - super::check_unstable(state, "Deno.listenDatagram"); - } - permissions.check_read(&address_path)?; - permissions.check_write(&address_path)?; - } - let (rid, local_addr) = if transport == "unix" { - net_unix::listen_unix(state, &address_path)? - } else { - net_unix::listen_unix_packet(state, &address_path)? - }; - debug!( - "New listener {} {}", - rid, - local_addr.as_pathname().unwrap().display(), - ); - Ok(json!({ - "rid": rid, - "localAddr": { - "path": local_addr.as_pathname(), - "transport": transport, - }, - })) - } - #[cfg(unix)] - _ => Err(type_error("Wrong argument format!")), - } -} diff --git a/cli/ops/net_unix.rs b/cli/ops/net_unix.rs deleted file mode 100644 index 4c416a5a4..000000000 --- a/cli/ops/net_unix.rs +++ /dev/null @@ -1,151 +0,0 @@ -// Copyright 2018-2020 the Deno authors. All rights reserved. MIT license. - -use crate::ops::io::StreamResource; -use crate::ops::io::StreamResourceHolder; -use crate::ops::net::AcceptArgs; -use crate::ops::net::ReceiveArgs; -use deno_core::error::bad_resource; -use deno_core::error::AnyError; -use deno_core::futures::future::poll_fn; -use deno_core::serde_json::json; -use deno_core::serde_json::Value; -use deno_core::BufVec; -use deno_core::OpState; -use serde::Deserialize; -use std::cell::RefCell; -use std::fs::remove_file; -use std::os::unix; -use std::path::Path; -use std::rc::Rc; -use std::task::Poll; -use tokio::net::UnixDatagram; -use tokio::net::UnixListener; -pub use tokio::net::UnixStream; - -struct UnixListenerResource { - listener: UnixListener, -} - -pub struct UnixDatagramResource { - pub socket: UnixDatagram, - pub local_addr: unix::net::SocketAddr, -} - -#[derive(Deserialize)] -pub struct UnixListenArgs { - pub path: String, -} - -pub(crate) async fn accept_unix( - state: Rc<RefCell<OpState>>, - args: AcceptArgs, - _bufs: BufVec, -) -> Result<Value, AnyError> { - let rid = args.rid as u32; - - let accept_fut = poll_fn(|cx| { - let mut state = state.borrow_mut(); - let listener_resource = state - .resource_table - .get_mut::<UnixListenerResource>(rid) - .ok_or_else(|| bad_resource("Listener has been closed"))?; - let listener = &mut listener_resource.listener; - use deno_core::futures::StreamExt; - match listener.poll_next_unpin(cx) { - Poll::Ready(Some(stream)) => { - //listener_resource.untrack_task(); - Poll::Ready(stream) - } - Poll::Ready(None) => todo!(), - Poll::Pending => { - //listener_resource.track_task(cx)?; - Poll::Pending - } - } - .map_err(AnyError::from) - }); - let unix_stream = accept_fut.await?; - - let local_addr = unix_stream.local_addr()?; - let remote_addr = unix_stream.peer_addr()?; - let mut state = state.borrow_mut(); - let rid = state.resource_table.add( - "unixStream", - Box::new(StreamResourceHolder::new(StreamResource::UnixStream( - unix_stream, - ))), - ); - Ok(json!({ - "rid": rid, - "localAddr": { - "path": local_addr.as_pathname(), - "transport": "unix", - }, - "remoteAddr": { - "path": remote_addr.as_pathname(), - "transport": "unix", - } - })) -} - -pub(crate) async fn receive_unix_packet( - state: Rc<RefCell<OpState>>, - args: ReceiveArgs, - bufs: BufVec, -) -> Result<Value, AnyError> { - assert_eq!(bufs.len(), 1, "Invalid number of arguments"); - - let rid = args.rid as u32; - let mut buf = bufs.into_iter().next().unwrap(); - - let mut state = state.borrow_mut(); - let resource = state - .resource_table - .get_mut::<UnixDatagramResource>(rid) - .ok_or_else(|| bad_resource("Socket has been closed"))?; - let (size, remote_addr) = resource.socket.recv_from(&mut buf).await?; - Ok(json!({ - "size": size, - "remoteAddr": { - "path": remote_addr.as_pathname(), - "transport": "unixpacket", - } - })) -} - -pub fn listen_unix( - state: &mut OpState, - addr: &Path, -) -> Result<(u32, unix::net::SocketAddr), AnyError> { - if addr.exists() { - remove_file(&addr).unwrap(); - } - let listener = UnixListener::bind(&addr)?; - let local_addr = listener.local_addr()?; - let listener_resource = UnixListenerResource { listener }; - let rid = state - .resource_table - .add("unixListener", Box::new(listener_resource)); - - Ok((rid, local_addr)) -} - -pub fn listen_unix_packet( - state: &mut OpState, - addr: &Path, -) -> Result<(u32, unix::net::SocketAddr), AnyError> { - if addr.exists() { - remove_file(&addr).unwrap(); - } - let socket = UnixDatagram::bind(&addr)?; - let local_addr = socket.local_addr()?; - let datagram_resource = UnixDatagramResource { - socket, - local_addr: local_addr.clone(), - }; - let rid = state - .resource_table - .add("unixDatagram", Box::new(datagram_resource)); - - Ok((rid, local_addr)) -} diff --git a/cli/ops/os.rs b/cli/ops/os.rs deleted file mode 100644 index 6fd404a23..000000000 --- a/cli/ops/os.rs +++ /dev/null @@ -1,192 +0,0 @@ -// Copyright 2018-2020 the Deno authors. All rights reserved. MIT license. - -use crate::permissions::Permissions; -use deno_core::error::AnyError; -use deno_core::serde_json; -use deno_core::serde_json::json; -use deno_core::serde_json::Value; -use deno_core::url::Url; -use deno_core::OpState; -use deno_core::ZeroCopyBuf; -use serde::Deserialize; -use std::collections::HashMap; -use std::env; - -pub fn init(rt: &mut deno_core::JsRuntime) { - super::reg_json_sync(rt, "op_exit", op_exit); - super::reg_json_sync(rt, "op_env", op_env); - super::reg_json_sync(rt, "op_exec_path", op_exec_path); - super::reg_json_sync(rt, "op_set_env", op_set_env); - super::reg_json_sync(rt, "op_get_env", op_get_env); - super::reg_json_sync(rt, "op_delete_env", op_delete_env); - super::reg_json_sync(rt, "op_hostname", op_hostname); - super::reg_json_sync(rt, "op_loadavg", op_loadavg); - super::reg_json_sync(rt, "op_os_release", op_os_release); - super::reg_json_sync(rt, "op_system_memory_info", op_system_memory_info); - super::reg_json_sync(rt, "op_system_cpu_info", op_system_cpu_info); -} - -fn op_exec_path( - state: &mut OpState, - _args: Value, - _zero_copy: &mut [ZeroCopyBuf], -) -> Result<Value, AnyError> { - let current_exe = env::current_exe().unwrap(); - state - .borrow::<Permissions>() - .check_read_blind(¤t_exe, "exec_path")?; - // Now apply URL parser to current exe to get fully resolved path, otherwise - // we might get `./` and `../` bits in `exec_path` - let exe_url = Url::from_file_path(current_exe).unwrap(); - let path = exe_url.to_file_path().unwrap(); - Ok(json!(path)) -} - -#[derive(Deserialize)] -struct SetEnv { - key: String, - value: String, -} - -fn op_set_env( - state: &mut OpState, - args: Value, - _zero_copy: &mut [ZeroCopyBuf], -) -> Result<Value, AnyError> { - let args: SetEnv = serde_json::from_value(args)?; - state.borrow::<Permissions>().check_env()?; - env::set_var(args.key, args.value); - Ok(json!({})) -} - -fn op_env( - state: &mut OpState, - _args: Value, - _zero_copy: &mut [ZeroCopyBuf], -) -> Result<Value, AnyError> { - state.borrow::<Permissions>().check_env()?; - let v = env::vars().collect::<HashMap<String, String>>(); - Ok(json!(v)) -} - -#[derive(Deserialize)] -struct GetEnv { - key: String, -} - -fn op_get_env( - state: &mut OpState, - args: Value, - _zero_copy: &mut [ZeroCopyBuf], -) -> Result<Value, AnyError> { - let args: GetEnv = serde_json::from_value(args)?; - state.borrow::<Permissions>().check_env()?; - let r = match env::var(args.key) { - Err(env::VarError::NotPresent) => json!([]), - v => json!([v?]), - }; - Ok(r) -} - -#[derive(Deserialize)] -struct DeleteEnv { - key: String, -} - -fn op_delete_env( - state: &mut OpState, - args: Value, - _zero_copy: &mut [ZeroCopyBuf], -) -> Result<Value, AnyError> { - let args: DeleteEnv = serde_json::from_value(args)?; - state.borrow::<Permissions>().check_env()?; - env::remove_var(args.key); - Ok(json!({})) -} - -#[derive(Deserialize)] -struct Exit { - code: i32, -} - -fn op_exit( - _state: &mut OpState, - args: Value, - _zero_copy: &mut [ZeroCopyBuf], -) -> Result<Value, AnyError> { - let args: Exit = serde_json::from_value(args)?; - std::process::exit(args.code) -} - -fn op_loadavg( - state: &mut OpState, - _args: Value, - _zero_copy: &mut [ZeroCopyBuf], -) -> Result<Value, AnyError> { - super::check_unstable(state, "Deno.loadavg"); - state.borrow::<Permissions>().check_env()?; - match sys_info::loadavg() { - Ok(loadavg) => Ok(json!([loadavg.one, loadavg.five, loadavg.fifteen])), - Err(_) => Ok(json!([0f64, 0f64, 0f64])), - } -} - -fn op_hostname( - state: &mut OpState, - _args: Value, - _zero_copy: &mut [ZeroCopyBuf], -) -> Result<Value, AnyError> { - super::check_unstable(state, "Deno.hostname"); - state.borrow::<Permissions>().check_env()?; - let hostname = sys_info::hostname().unwrap_or_else(|_| "".to_string()); - Ok(json!(hostname)) -} - -fn op_os_release( - state: &mut OpState, - _args: Value, - _zero_copy: &mut [ZeroCopyBuf], -) -> Result<Value, AnyError> { - super::check_unstable(state, "Deno.osRelease"); - state.borrow::<Permissions>().check_env()?; - let release = sys_info::os_release().unwrap_or_else(|_| "".to_string()); - Ok(json!(release)) -} - -fn op_system_memory_info( - state: &mut OpState, - _args: Value, - _zero_copy: &mut [ZeroCopyBuf], -) -> Result<Value, AnyError> { - super::check_unstable(state, "Deno.systemMemoryInfo"); - state.borrow::<Permissions>().check_env()?; - match sys_info::mem_info() { - Ok(info) => Ok(json!({ - "total": info.total, - "free": info.free, - "available": info.avail, - "buffers": info.buffers, - "cached": info.cached, - "swapTotal": info.swap_total, - "swapFree": info.swap_free - })), - Err(_) => Ok(json!({})), - } -} - -fn op_system_cpu_info( - state: &mut OpState, - _args: Value, - _zero_copy: &mut [ZeroCopyBuf], -) -> Result<Value, AnyError> { - super::check_unstable(state, "Deno.systemCpuInfo"); - state.borrow::<Permissions>().check_env()?; - - let cores = sys_info::cpu_num().ok(); - let speed = sys_info::cpu_speed().ok(); - - Ok(json!({ - "cores": cores, - "speed": speed - })) -} diff --git a/cli/ops/permissions.rs b/cli/ops/permissions.rs deleted file mode 100644 index 7474c0e37..000000000 --- a/cli/ops/permissions.rs +++ /dev/null @@ -1,103 +0,0 @@ -// Copyright 2018-2020 the Deno authors. All rights reserved. MIT license. - -use crate::permissions::Permissions; -use deno_core::error::custom_error; -use deno_core::error::AnyError; -use deno_core::serde_json; -use deno_core::serde_json::json; -use deno_core::serde_json::Value; -use deno_core::OpState; -use deno_core::ZeroCopyBuf; -use serde::Deserialize; -use std::path::Path; - -pub fn init(rt: &mut deno_core::JsRuntime) { - super::reg_json_sync(rt, "op_query_permission", op_query_permission); - super::reg_json_sync(rt, "op_revoke_permission", op_revoke_permission); - super::reg_json_sync(rt, "op_request_permission", op_request_permission); -} - -#[derive(Deserialize)] -struct PermissionArgs { - name: String, - url: Option<String>, - path: Option<String>, -} - -pub fn op_query_permission( - state: &mut OpState, - args: Value, - _zero_copy: &mut [ZeroCopyBuf], -) -> Result<Value, AnyError> { - let args: PermissionArgs = serde_json::from_value(args)?; - let permissions = state.borrow::<Permissions>(); - let path = args.path.as_deref(); - let perm = match args.name.as_ref() { - "read" => permissions.query_read(&path.as_deref().map(Path::new)), - "write" => permissions.query_write(&path.as_deref().map(Path::new)), - "net" => permissions.query_net_url(&args.url.as_deref())?, - "env" => permissions.query_env(), - "run" => permissions.query_run(), - "plugin" => permissions.query_plugin(), - "hrtime" => permissions.query_hrtime(), - n => { - return Err(custom_error( - "ReferenceError", - format!("No such permission name: {}", n), - )) - } - }; - Ok(json!({ "state": perm.to_string() })) -} - -pub fn op_revoke_permission( - state: &mut OpState, - args: Value, - _zero_copy: &mut [ZeroCopyBuf], -) -> Result<Value, AnyError> { - let args: PermissionArgs = serde_json::from_value(args)?; - let permissions = state.borrow_mut::<Permissions>(); - let path = args.path.as_deref(); - let perm = match args.name.as_ref() { - "read" => permissions.revoke_read(&path.as_deref().map(Path::new)), - "write" => permissions.revoke_write(&path.as_deref().map(Path::new)), - "net" => permissions.revoke_net(&args.url.as_deref())?, - "env" => permissions.revoke_env(), - "run" => permissions.revoke_run(), - "plugin" => permissions.revoke_plugin(), - "hrtime" => permissions.revoke_hrtime(), - n => { - return Err(custom_error( - "ReferenceError", - format!("No such permission name: {}", n), - )) - } - }; - Ok(json!({ "state": perm.to_string() })) -} - -pub fn op_request_permission( - state: &mut OpState, - args: Value, - _zero_copy: &mut [ZeroCopyBuf], -) -> Result<Value, AnyError> { - let args: PermissionArgs = serde_json::from_value(args)?; - let permissions = state.borrow_mut::<Permissions>(); - let path = args.path.as_deref(); - let perm = match args.name.as_ref() { - "read" => permissions.request_read(&path.as_deref().map(Path::new)), - "write" => permissions.request_write(&path.as_deref().map(Path::new)), - "net" => permissions.request_net(&args.url.as_deref())?, - "env" => permissions.request_env(), - "run" => permissions.request_run(), - "plugin" => permissions.request_plugin(), - "hrtime" => permissions.request_hrtime(), - n => { - return Err(custom_error( - "ReferenceError", - format!("No such permission name: {}", n), - )) - } - }; - Ok(json!({ "state": perm.to_string() })) -} diff --git a/cli/ops/plugin.rs b/cli/ops/plugin.rs deleted file mode 100644 index 1f3669b6f..000000000 --- a/cli/ops/plugin.rs +++ /dev/null @@ -1,156 +0,0 @@ -// Copyright 2018-2020 the Deno authors. All rights reserved. MIT license. - -use crate::metrics::metrics_op; -use crate::permissions::Permissions; -use deno_core::error::AnyError; -use deno_core::futures::prelude::*; -use deno_core::plugin_api; -use deno_core::serde_json; -use deno_core::serde_json::json; -use deno_core::serde_json::Value; -use deno_core::BufVec; -use deno_core::JsRuntime; -use deno_core::Op; -use deno_core::OpAsyncFuture; -use deno_core::OpId; -use deno_core::OpState; -use deno_core::ZeroCopyBuf; -use dlopen::symbor::Library; -use serde::Deserialize; -use std::cell::RefCell; -use std::path::PathBuf; -use std::pin::Pin; -use std::rc::Rc; -use std::task::Context; -use std::task::Poll; - -pub fn init(rt: &mut JsRuntime) { - super::reg_json_sync(rt, "op_open_plugin", op_open_plugin); -} - -#[derive(Deserialize)] -#[serde(rename_all = "camelCase")] -struct OpenPluginArgs { - filename: String, -} - -pub fn op_open_plugin( - state: &mut OpState, - args: Value, - _zero_copy: &mut [ZeroCopyBuf], -) -> Result<Value, AnyError> { - let args: OpenPluginArgs = serde_json::from_value(args)?; - let filename = PathBuf::from(&args.filename); - - super::check_unstable(state, "Deno.openPlugin"); - let permissions = state.borrow::<Permissions>(); - permissions.check_plugin(&filename)?; - - debug!("Loading Plugin: {:#?}", filename); - let plugin_lib = Library::open(filename).map(Rc::new)?; - let plugin_resource = PluginResource::new(&plugin_lib); - - let rid; - let deno_plugin_init; - { - rid = state - .resource_table - .add("plugin", Box::new(plugin_resource)); - deno_plugin_init = *unsafe { - state - .resource_table - .get::<PluginResource>(rid) - .unwrap() - .lib - .symbol::<plugin_api::InitFn>("deno_plugin_init") - .unwrap() - }; - } - - let mut interface = PluginInterface::new(state, &plugin_lib); - deno_plugin_init(&mut interface); - - Ok(json!(rid)) -} - -struct PluginResource { - lib: Rc<Library>, -} - -impl PluginResource { - fn new(lib: &Rc<Library>) -> Self { - Self { lib: lib.clone() } - } -} - -struct PluginInterface<'a> { - state: &'a mut OpState, - plugin_lib: &'a Rc<Library>, -} - -impl<'a> PluginInterface<'a> { - fn new(state: &'a mut OpState, plugin_lib: &'a Rc<Library>) -> Self { - Self { state, plugin_lib } - } -} - -impl<'a> plugin_api::Interface for PluginInterface<'a> { - /// Does the same as `core::Isolate::register_op()`, but additionally makes - /// the registered op dispatcher, as well as the op futures created by it, - /// keep reference to the plugin `Library` object, so that the plugin doesn't - /// get unloaded before all its op registrations and the futures created by - /// them are dropped. - fn register_op( - &mut self, - name: &str, - dispatch_op_fn: plugin_api::DispatchOpFn, - ) -> OpId { - let plugin_lib = self.plugin_lib.clone(); - let plugin_op_fn = move |state_rc: Rc<RefCell<OpState>>, - mut zero_copy: BufVec| { - let mut state = state_rc.borrow_mut(); - let mut interface = PluginInterface::new(&mut state, &plugin_lib); - let op = dispatch_op_fn(&mut interface, &mut zero_copy); - match op { - sync_op @ Op::Sync(..) => sync_op, - Op::Async(fut) => Op::Async(PluginOpAsyncFuture::new(&plugin_lib, fut)), - Op::AsyncUnref(fut) => { - Op::AsyncUnref(PluginOpAsyncFuture::new(&plugin_lib, fut)) - } - _ => unreachable!(), - } - }; - self - .state - .op_table - .register_op(name, metrics_op(Box::new(plugin_op_fn))) - } -} - -struct PluginOpAsyncFuture { - fut: Option<OpAsyncFuture>, - _plugin_lib: Rc<Library>, -} - -impl PluginOpAsyncFuture { - fn new(plugin_lib: &Rc<Library>, fut: OpAsyncFuture) -> Pin<Box<Self>> { - let wrapped_fut = Self { - fut: Some(fut), - _plugin_lib: plugin_lib.clone(), - }; - Box::pin(wrapped_fut) - } -} - -impl Future for PluginOpAsyncFuture { - type Output = <OpAsyncFuture as Future>::Output; - fn poll(mut self: Pin<&mut Self>, ctx: &mut Context) -> Poll<Self::Output> { - self.fut.as_mut().unwrap().poll_unpin(ctx) - } -} - -impl Drop for PluginOpAsyncFuture { - fn drop(&mut self) { - self.fut.take(); - } -} diff --git a/cli/ops/process.rs b/cli/ops/process.rs deleted file mode 100644 index 60a6d5095..000000000 --- a/cli/ops/process.rs +++ /dev/null @@ -1,236 +0,0 @@ -// Copyright 2018-2020 the Deno authors. All rights reserved. MIT license. - -use super::io::{std_file_resource, StreamResource, StreamResourceHolder}; -use crate::permissions::Permissions; -use crate::signal::kill; -use deno_core::error::bad_resource_id; -use deno_core::error::type_error; -use deno_core::error::AnyError; -use deno_core::futures::future::poll_fn; -use deno_core::futures::future::FutureExt; -use deno_core::serde_json; -use deno_core::serde_json::json; -use deno_core::serde_json::Value; -use deno_core::BufVec; -use deno_core::OpState; -use deno_core::ZeroCopyBuf; -use serde::Deserialize; -use std::cell::RefCell; -use std::rc::Rc; -use tokio::process::Command; - -#[cfg(unix)] -use std::os::unix::process::ExitStatusExt; - -pub fn init(rt: &mut deno_core::JsRuntime) { - super::reg_json_sync(rt, "op_run", op_run); - super::reg_json_async(rt, "op_run_status", op_run_status); - super::reg_json_sync(rt, "op_kill", op_kill); -} - -fn clone_file( - state: &mut OpState, - rid: u32, -) -> Result<std::fs::File, AnyError> { - std_file_resource(state, rid, move |r| match r { - Ok(std_file) => std_file.try_clone().map_err(AnyError::from), - Err(_) => Err(bad_resource_id()), - }) -} - -fn subprocess_stdio_map(s: &str) -> Result<std::process::Stdio, AnyError> { - match s { - "inherit" => Ok(std::process::Stdio::inherit()), - "piped" => Ok(std::process::Stdio::piped()), - "null" => Ok(std::process::Stdio::null()), - _ => Err(type_error("Invalid resource for stdio")), - } -} - -#[derive(Deserialize)] -#[serde(rename_all = "camelCase")] -struct RunArgs { - cmd: Vec<String>, - cwd: Option<String>, - env: Vec<(String, String)>, - stdin: String, - stdout: String, - stderr: String, - stdin_rid: u32, - stdout_rid: u32, - stderr_rid: u32, -} - -struct ChildResource { - child: tokio::process::Child, -} - -fn op_run( - state: &mut OpState, - args: Value, - _zero_copy: &mut [ZeroCopyBuf], -) -> Result<Value, AnyError> { - let run_args: RunArgs = serde_json::from_value(args)?; - state.borrow::<Permissions>().check_run()?; - - let args = run_args.cmd; - let env = run_args.env; - let cwd = run_args.cwd; - - let mut c = Command::new(args.get(0).unwrap()); - (1..args.len()).for_each(|i| { - let arg = args.get(i).unwrap(); - c.arg(arg); - }); - cwd.map(|d| c.current_dir(d)); - for (key, value) in &env { - c.env(key, value); - } - - // TODO: make this work with other resources, eg. sockets - if !run_args.stdin.is_empty() { - c.stdin(subprocess_stdio_map(run_args.stdin.as_ref())?); - } else { - let file = clone_file(state, run_args.stdin_rid)?; - c.stdin(file); - } - - if !run_args.stdout.is_empty() { - c.stdout(subprocess_stdio_map(run_args.stdout.as_ref())?); - } else { - let file = clone_file(state, run_args.stdout_rid)?; - c.stdout(file); - } - - if !run_args.stderr.is_empty() { - c.stderr(subprocess_stdio_map(run_args.stderr.as_ref())?); - } else { - let file = clone_file(state, run_args.stderr_rid)?; - c.stderr(file); - } - - // We want to kill child when it's closed - c.kill_on_drop(true); - - // Spawn the command. - let mut child = c.spawn()?; - let pid = child.id(); - - let stdin_rid = match child.stdin.take() { - Some(child_stdin) => { - let rid = state.resource_table.add( - "childStdin", - Box::new(StreamResourceHolder::new(StreamResource::ChildStdin( - child_stdin, - ))), - ); - Some(rid) - } - None => None, - }; - - let stdout_rid = match child.stdout.take() { - Some(child_stdout) => { - let rid = state.resource_table.add( - "childStdout", - Box::new(StreamResourceHolder::new(StreamResource::ChildStdout( - child_stdout, - ))), - ); - Some(rid) - } - None => None, - }; - - let stderr_rid = match child.stderr.take() { - Some(child_stderr) => { - let rid = state.resource_table.add( - "childStderr", - Box::new(StreamResourceHolder::new(StreamResource::ChildStderr( - child_stderr, - ))), - ); - Some(rid) - } - None => None, - }; - - let child_resource = ChildResource { child }; - let child_rid = state.resource_table.add("child", Box::new(child_resource)); - - Ok(json!({ - "rid": child_rid, - "pid": pid, - "stdinRid": stdin_rid, - "stdoutRid": stdout_rid, - "stderrRid": stderr_rid, - })) -} - -#[derive(Deserialize)] -#[serde(rename_all = "camelCase")] -struct RunStatusArgs { - rid: i32, -} - -async fn op_run_status( - state: Rc<RefCell<OpState>>, - args: Value, - _zero_copy: BufVec, -) -> Result<Value, AnyError> { - let args: RunStatusArgs = serde_json::from_value(args)?; - let rid = args.rid as u32; - - { - let s = state.borrow(); - s.borrow::<Permissions>().check_run()?; - } - - let run_status = poll_fn(|cx| { - let mut state = state.borrow_mut(); - let child_resource = state - .resource_table - .get_mut::<ChildResource>(rid) - .ok_or_else(bad_resource_id)?; - let child = &mut child_resource.child; - child.poll_unpin(cx).map_err(AnyError::from) - }) - .await?; - - let code = run_status.code(); - - #[cfg(unix)] - let signal = run_status.signal(); - #[cfg(not(unix))] - let signal = None; - - code - .or(signal) - .expect("Should have either an exit code or a signal."); - let got_signal = signal.is_some(); - - Ok(json!({ - "gotSignal": got_signal, - "exitCode": code.unwrap_or(-1), - "exitSignal": signal.unwrap_or(-1), - })) -} - -#[derive(Deserialize)] -struct KillArgs { - pid: i32, - signo: i32, -} - -fn op_kill( - state: &mut OpState, - args: Value, - _zero_copy: &mut [ZeroCopyBuf], -) -> Result<Value, AnyError> { - super::check_unstable(state, "Deno.kill"); - state.borrow::<Permissions>().check_run()?; - - let args: KillArgs = serde_json::from_value(args)?; - kill(args.pid, args.signo)?; - Ok(json!({})) -} diff --git a/cli/ops/runtime.rs b/cli/ops/runtime.rs deleted file mode 100644 index cb3b53d53..000000000 --- a/cli/ops/runtime.rs +++ /dev/null @@ -1,118 +0,0 @@ -// Copyright 2018-2020 the Deno authors. All rights reserved. MIT license. - -use crate::metrics::Metrics; -use crate::permissions::Permissions; -use deno_core::error::AnyError; -use deno_core::serde_json; -use deno_core::serde_json::json; -use deno_core::serde_json::Value; -use deno_core::ModuleSpecifier; -use deno_core::OpState; -use deno_core::ZeroCopyBuf; - -pub fn init(rt: &mut deno_core::JsRuntime, main_module: ModuleSpecifier) { - { - let op_state = rt.op_state(); - let mut state = op_state.borrow_mut(); - state.put::<ModuleSpecifier>(main_module); - } - super::reg_json_sync(rt, "op_main_module", op_main_module); - super::reg_json_sync(rt, "op_metrics", op_metrics); -} - -fn op_main_module( - state: &mut OpState, - _args: Value, - _zero_copy: &mut [ZeroCopyBuf], -) -> Result<Value, AnyError> { - let main = state.borrow::<ModuleSpecifier>().to_string(); - let main_url = ModuleSpecifier::resolve_url_or_path(&main)?; - if main_url.as_url().scheme() == "file" { - let main_path = std::env::current_dir().unwrap().join(main_url.to_string()); - state - .borrow::<Permissions>() - .check_read_blind(&main_path, "main_module")?; - } - Ok(json!(&main)) -} - -fn op_metrics( - state: &mut OpState, - _args: Value, - _zero_copy: &mut [ZeroCopyBuf], -) -> Result<Value, AnyError> { - let m = state.borrow::<Metrics>(); - - Ok(json!({ - "opsDispatched": m.ops_dispatched, - "opsDispatchedSync": m.ops_dispatched_sync, - "opsDispatchedAsync": m.ops_dispatched_async, - "opsDispatchedAsyncUnref": m.ops_dispatched_async_unref, - "opsCompleted": m.ops_completed, - "opsCompletedSync": m.ops_completed_sync, - "opsCompletedAsync": m.ops_completed_async, - "opsCompletedAsyncUnref": m.ops_completed_async_unref, - "bytesSentControl": m.bytes_sent_control, - "bytesSentData": m.bytes_sent_data, - "bytesReceived": m.bytes_received - })) -} - -pub fn ppid() -> Value { - #[cfg(windows)] - { - // Adopted from rustup: - // https://github.com/rust-lang/rustup/blob/1.21.1/src/cli/self_update.rs#L1036 - // Copyright Diggory Blake, the Mozilla Corporation, and rustup contributors. - // Licensed under either of - // - Apache License, Version 2.0 - // - MIT license - use std::mem; - use winapi::shared::minwindef::DWORD; - use winapi::um::handleapi::{CloseHandle, INVALID_HANDLE_VALUE}; - use winapi::um::processthreadsapi::GetCurrentProcessId; - use winapi::um::tlhelp32::{ - CreateToolhelp32Snapshot, Process32First, Process32Next, PROCESSENTRY32, - TH32CS_SNAPPROCESS, - }; - unsafe { - // Take a snapshot of system processes, one of which is ours - // and contains our parent's pid - let snapshot = CreateToolhelp32Snapshot(TH32CS_SNAPPROCESS, 0); - if snapshot == INVALID_HANDLE_VALUE { - return serde_json::to_value(-1).unwrap(); - } - - let mut entry: PROCESSENTRY32 = mem::zeroed(); - entry.dwSize = mem::size_of::<PROCESSENTRY32>() as DWORD; - - // Iterate over system processes looking for ours - let success = Process32First(snapshot, &mut entry); - if success == 0 { - CloseHandle(snapshot); - return serde_json::to_value(-1).unwrap(); - } - - let this_pid = GetCurrentProcessId(); - while entry.th32ProcessID != this_pid { - let success = Process32Next(snapshot, &mut entry); - if success == 0 { - CloseHandle(snapshot); - return serde_json::to_value(-1).unwrap(); - } - } - CloseHandle(snapshot); - - // FIXME: Using the process ID exposes a race condition - // wherein the parent process already exited and the OS - // reassigned its ID. - let parent_id = entry.th32ParentProcessID; - serde_json::to_value(parent_id).unwrap() - } - } - #[cfg(not(windows))] - { - use std::os::unix::process::parent_id; - serde_json::to_value(parent_id()).unwrap() - } -} diff --git a/cli/ops/runtime_compiler.rs b/cli/ops/runtime_compiler.rs index 03ba88c76..ec9806e60 100644 --- a/cli/ops/runtime_compiler.rs +++ b/cli/ops/runtime_compiler.rs @@ -6,12 +6,12 @@ use crate::media_type::MediaType; use crate::module_graph::BundleType; use crate::module_graph::EmitOptions; use crate::module_graph::GraphBuilder; -use crate::permissions::Permissions; use crate::program_state::ProgramState; use crate::specifier_handler::FetchHandler; use crate::specifier_handler::MemoryHandler; use crate::specifier_handler::SpecifierHandler; use crate::tsc_config; +use deno_runtime::permissions::Permissions; use std::sync::Arc; use deno_core::error::AnyError; @@ -49,9 +49,9 @@ async fn op_compile( ) -> Result<Value, AnyError> { let args: CompileArgs = serde_json::from_value(args)?; if args.bundle { - super::check_unstable2(&state, "Deno.bundle"); + deno_runtime::ops::check_unstable2(&state, "Deno.bundle"); } else { - super::check_unstable2(&state, "Deno.compile"); + deno_runtime::ops::check_unstable2(&state, "Deno.compile"); } let program_state = state.borrow().borrow::<Arc<ProgramState>>().clone(); let runtime_permissions = { @@ -113,7 +113,7 @@ async fn op_transpile( args: Value, _data: BufVec, ) -> Result<Value, AnyError> { - super::check_unstable2(&state, "Deno.transpileOnly"); + deno_runtime::ops::check_unstable2(&state, "Deno.transpileOnly"); let args: TranspileArgs = serde_json::from_value(args)?; let mut compiler_options = tsc_config::TsConfig::new(json!({ diff --git a/cli/ops/signal.rs b/cli/ops/signal.rs deleted file mode 100644 index be6bc0a35..000000000 --- a/cli/ops/signal.rs +++ /dev/null @@ -1,142 +0,0 @@ -// Copyright 2018-2020 the Deno authors. All rights reserved. MIT license. - -use deno_core::error::AnyError; -use deno_core::serde_json::Value; -use deno_core::BufVec; -use deno_core::OpState; -use deno_core::ZeroCopyBuf; -use std::cell::RefCell; -use std::rc::Rc; - -#[cfg(unix)] -use deno_core::error::bad_resource_id; -#[cfg(unix)] -use deno_core::futures::future::poll_fn; -#[cfg(unix)] -use deno_core::serde_json; -#[cfg(unix)] -use deno_core::serde_json::json; -#[cfg(unix)] -use serde::Deserialize; -#[cfg(unix)] -use std::task::Waker; -#[cfg(unix)] -use tokio::signal::unix::{signal, Signal, SignalKind}; - -pub fn init(rt: &mut deno_core::JsRuntime) { - super::reg_json_sync(rt, "op_signal_bind", op_signal_bind); - super::reg_json_sync(rt, "op_signal_unbind", op_signal_unbind); - super::reg_json_async(rt, "op_signal_poll", op_signal_poll); -} - -#[cfg(unix)] -/// The resource for signal stream. -/// The second element is the waker of polling future. -pub struct SignalStreamResource(pub Signal, pub Option<Waker>); - -#[cfg(unix)] -#[derive(Deserialize)] -struct BindSignalArgs { - signo: i32, -} - -#[cfg(unix)] -#[derive(Deserialize)] -struct SignalArgs { - rid: i32, -} - -#[cfg(unix)] -fn op_signal_bind( - state: &mut OpState, - args: Value, - _zero_copy: &mut [ZeroCopyBuf], -) -> Result<Value, AnyError> { - super::check_unstable(state, "Deno.signal"); - let args: BindSignalArgs = serde_json::from_value(args)?; - let rid = state.resource_table.add( - "signal", - Box::new(SignalStreamResource( - signal(SignalKind::from_raw(args.signo)).expect(""), - None, - )), - ); - Ok(json!({ - "rid": rid, - })) -} - -#[cfg(unix)] -async fn op_signal_poll( - state: Rc<RefCell<OpState>>, - args: Value, - _zero_copy: BufVec, -) -> Result<Value, AnyError> { - super::check_unstable2(&state, "Deno.signal"); - let args: SignalArgs = serde_json::from_value(args)?; - let rid = args.rid as u32; - - let future = poll_fn(move |cx| { - let mut state = state.borrow_mut(); - if let Some(mut signal) = - state.resource_table.get_mut::<SignalStreamResource>(rid) - { - signal.1 = Some(cx.waker().clone()); - return signal.0.poll_recv(cx); - } - std::task::Poll::Ready(None) - }); - let result = future.await; - Ok(json!({ "done": result.is_none() })) -} - -#[cfg(unix)] -pub fn op_signal_unbind( - state: &mut OpState, - args: Value, - _zero_copy: &mut [ZeroCopyBuf], -) -> Result<Value, AnyError> { - super::check_unstable(state, "Deno.signal"); - let args: SignalArgs = serde_json::from_value(args)?; - let rid = args.rid as u32; - let resource = state.resource_table.get_mut::<SignalStreamResource>(rid); - if let Some(signal) = resource { - if let Some(waker) = &signal.1 { - // Wakes up the pending poll if exists. - // This prevents the poll future from getting stuck forever. - waker.clone().wake(); - } - } - state - .resource_table - .close(rid) - .ok_or_else(bad_resource_id)?; - Ok(json!({})) -} - -#[cfg(not(unix))] -pub fn op_signal_bind( - _state: &mut OpState, - _args: Value, - _zero_copy: &mut [ZeroCopyBuf], -) -> Result<Value, AnyError> { - unimplemented!(); -} - -#[cfg(not(unix))] -fn op_signal_unbind( - _state: &mut OpState, - _args: Value, - _zero_copy: &mut [ZeroCopyBuf], -) -> Result<Value, AnyError> { - unimplemented!(); -} - -#[cfg(not(unix))] -async fn op_signal_poll( - _state: Rc<RefCell<OpState>>, - _args: Value, - _zero_copy: BufVec, -) -> Result<Value, AnyError> { - unimplemented!(); -} diff --git a/cli/ops/timers.rs b/cli/ops/timers.rs deleted file mode 100644 index 8037fd698..000000000 --- a/cli/ops/timers.rs +++ /dev/null @@ -1,193 +0,0 @@ -// Copyright 2018-2020 the Deno authors. All rights reserved. MIT license. - -//! This module helps deno implement timers. -//! -//! As an optimization, we want to avoid an expensive calls into rust for every -//! setTimeout in JavaScript. Thus in //js/timers.ts a data structure is -//! implemented that calls into Rust for only the smallest timeout. Thus we -//! only need to be able to start, cancel and await a single timer (or Delay, as Tokio -//! calls it) for an entire Isolate. This is what is implemented here. - -use super::dispatch_minimal::minimal_op; -use super::dispatch_minimal::MinimalOp; -use crate::metrics::metrics_op; -use crate::permissions::Permissions; -use deno_core::error::type_error; -use deno_core::error::AnyError; -use deno_core::futures; -use deno_core::futures::channel::oneshot; -use deno_core::futures::FutureExt; -use deno_core::futures::TryFutureExt; -use deno_core::serde_json; -use deno_core::serde_json::json; -use deno_core::serde_json::Value; -use deno_core::BufVec; -use deno_core::OpState; -use deno_core::ZeroCopyBuf; -use serde::Deserialize; -use std::cell::RefCell; -use std::future::Future; -use std::pin::Pin; -use std::rc::Rc; -use std::thread::sleep; -use std::time::Duration; -use std::time::Instant; - -pub type StartTime = Instant; - -type TimerFuture = Pin<Box<dyn Future<Output = Result<(), ()>>>>; - -#[derive(Default)] -pub struct GlobalTimer { - tx: Option<oneshot::Sender<()>>, - pub future: Option<TimerFuture>, -} - -impl GlobalTimer { - pub fn cancel(&mut self) { - if let Some(tx) = self.tx.take() { - tx.send(()).ok(); - } - } - - pub fn new_timeout(&mut self, deadline: Instant) { - if self.tx.is_some() { - self.cancel(); - } - assert!(self.tx.is_none()); - self.future.take(); - - let (tx, rx) = oneshot::channel(); - self.tx = Some(tx); - - let delay = tokio::time::delay_until(deadline.into()); - let rx = rx - .map_err(|err| panic!("Unexpected error in receiving channel {:?}", err)); - - let fut = futures::future::select(delay, rx) - .then(|_| futures::future::ok(())) - .boxed_local(); - self.future = Some(fut); - } -} - -pub fn init(rt: &mut deno_core::JsRuntime) { - { - let op_state = rt.op_state(); - let mut state = op_state.borrow_mut(); - state.put::<GlobalTimer>(GlobalTimer::default()); - state.put::<StartTime>(StartTime::now()); - } - super::reg_json_sync(rt, "op_global_timer_stop", op_global_timer_stop); - super::reg_json_sync(rt, "op_global_timer_start", op_global_timer_start); - super::reg_json_async(rt, "op_global_timer", op_global_timer); - rt.register_op("op_now", metrics_op(minimal_op(op_now))); - super::reg_json_sync(rt, "op_sleep_sync", op_sleep_sync); -} - -fn op_global_timer_stop( - state: &mut OpState, - _args: Value, - _zero_copy: &mut [ZeroCopyBuf], -) -> Result<Value, AnyError> { - let global_timer = state.borrow_mut::<GlobalTimer>(); - global_timer.cancel(); - Ok(json!({})) -} - -#[derive(Deserialize)] -struct GlobalTimerArgs { - timeout: u64, -} - -// Set up a timer that will be later awaited by JS promise. -// It's a separate op, because canceling a timeout immediately -// after setting it caused a race condition (because Tokio timeout) -// might have been registered after next event loop tick. -// -// See https://github.com/denoland/deno/issues/7599 for more -// details. -fn op_global_timer_start( - state: &mut OpState, - args: Value, - _zero_copy: &mut [ZeroCopyBuf], -) -> Result<Value, AnyError> { - let args: GlobalTimerArgs = serde_json::from_value(args)?; - let val = args.timeout; - - let deadline = Instant::now() + Duration::from_millis(val); - let global_timer = state.borrow_mut::<GlobalTimer>(); - global_timer.new_timeout(deadline); - Ok(json!({})) -} - -async fn op_global_timer( - state: Rc<RefCell<OpState>>, - _args: Value, - _zero_copy: BufVec, -) -> Result<Value, AnyError> { - let maybe_timer_fut = { - let mut s = state.borrow_mut(); - let global_timer = s.borrow_mut::<GlobalTimer>(); - global_timer.future.take() - }; - if let Some(timer_fut) = maybe_timer_fut { - let _ = timer_fut.await; - } - Ok(json!({})) -} - -// Returns a milliseconds and nanoseconds subsec -// since the start time of the deno runtime. -// If the High precision flag is not set, the -// nanoseconds are rounded on 2ms. -fn op_now( - state: Rc<RefCell<OpState>>, - // Arguments are discarded - _sync: bool, - _x: i32, - mut zero_copy: BufVec, -) -> MinimalOp { - match zero_copy.len() { - 0 => return MinimalOp::Sync(Err(type_error("no buffer specified"))), - 1 => {} - _ => { - return MinimalOp::Sync(Err(type_error("Invalid number of arguments"))) - } - } - - let op_state = state.borrow(); - let start_time = op_state.borrow::<StartTime>(); - let seconds = start_time.elapsed().as_secs(); - let mut subsec_nanos = start_time.elapsed().subsec_nanos() as f64; - let reduced_time_precision = 2_000_000.0; // 2ms in nanoseconds - - // If the permission is not enabled - // Round the nano result on 2 milliseconds - // see: https://developer.mozilla.org/en-US/docs/Web/API/DOMHighResTimeStamp#Reduced_time_precision - if op_state.borrow::<Permissions>().check_hrtime().is_err() { - subsec_nanos -= subsec_nanos % reduced_time_precision; - } - - let result = (seconds * 1_000) as f64 + (subsec_nanos / 1_000_000.0); - - (&mut zero_copy[0]).copy_from_slice(&result.to_be_bytes()); - - MinimalOp::Sync(Ok(0)) -} - -#[derive(Deserialize)] -struct SleepArgs { - millis: u64, -} - -fn op_sleep_sync( - state: &mut OpState, - args: Value, - _zero_copy: &mut [ZeroCopyBuf], -) -> Result<Value, AnyError> { - super::check_unstable(state, "Deno.sleepSync"); - let args: SleepArgs = serde_json::from_value(args)?; - sleep(Duration::from_millis(args.millis)); - Ok(json!({})) -} diff --git a/cli/ops/tls.rs b/cli/ops/tls.rs deleted file mode 100644 index 37fd8f206..000000000 --- a/cli/ops/tls.rs +++ /dev/null @@ -1,431 +0,0 @@ -// Copyright 2018-2020 the Deno authors. All rights reserved. MIT license. - -use super::io::{StreamResource, StreamResourceHolder}; -use crate::permissions::Permissions; -use crate::resolve_addr::resolve_addr; -use deno_core::error::bad_resource; -use deno_core::error::bad_resource_id; -use deno_core::error::custom_error; -use deno_core::error::AnyError; -use deno_core::futures; -use deno_core::futures::future::poll_fn; -use deno_core::serde_json; -use deno_core::serde_json::json; -use deno_core::serde_json::Value; -use deno_core::BufVec; -use deno_core::OpState; -use deno_core::ZeroCopyBuf; -use serde::Deserialize; -use std::cell::RefCell; -use std::convert::From; -use std::fs::File; -use std::io::BufReader; -use std::net::SocketAddr; -use std::path::Path; -use std::rc::Rc; -use std::sync::Arc; -use std::task::Context; -use std::task::Poll; -use tokio::net::TcpListener; -use tokio::net::TcpStream; -use tokio_rustls::{rustls::ClientConfig, TlsConnector}; -use tokio_rustls::{ - rustls::{ - internal::pemfile::{certs, pkcs8_private_keys, rsa_private_keys}, - Certificate, NoClientAuth, PrivateKey, ServerConfig, - }, - TlsAcceptor, -}; -use webpki::DNSNameRef; - -pub fn init(rt: &mut deno_core::JsRuntime) { - super::reg_json_async(rt, "op_start_tls", op_start_tls); - super::reg_json_async(rt, "op_connect_tls", op_connect_tls); - super::reg_json_sync(rt, "op_listen_tls", op_listen_tls); - super::reg_json_async(rt, "op_accept_tls", op_accept_tls); -} - -#[derive(Deserialize)] -#[serde(rename_all = "camelCase")] -struct ConnectTLSArgs { - transport: String, - hostname: String, - port: u16, - cert_file: Option<String>, -} - -#[derive(Deserialize)] -#[serde(rename_all = "camelCase")] -struct StartTLSArgs { - rid: u32, - cert_file: Option<String>, - hostname: String, -} - -async fn op_start_tls( - state: Rc<RefCell<OpState>>, - args: Value, - _zero_copy: BufVec, -) -> Result<Value, AnyError> { - let args: StartTLSArgs = serde_json::from_value(args)?; - let rid = args.rid as u32; - let cert_file = args.cert_file.clone(); - - let mut domain = args.hostname; - if domain.is_empty() { - domain.push_str("localhost"); - } - { - super::check_unstable2(&state, "Deno.startTls"); - let s = state.borrow(); - let permissions = s.borrow::<Permissions>(); - permissions.check_net(&domain, 0)?; - if let Some(path) = cert_file.clone() { - permissions.check_read(Path::new(&path))?; - } - } - let mut resource_holder = { - let mut state_ = state.borrow_mut(); - match state_.resource_table.remove::<StreamResourceHolder>(rid) { - Some(resource) => *resource, - None => return Err(bad_resource_id()), - } - }; - - if let StreamResource::TcpStream(ref mut tcp_stream) = - resource_holder.resource - { - let tcp_stream = tcp_stream.take().unwrap(); - let local_addr = tcp_stream.local_addr()?; - let remote_addr = tcp_stream.peer_addr()?; - let mut config = ClientConfig::new(); - config - .root_store - .add_server_trust_anchors(&webpki_roots::TLS_SERVER_ROOTS); - if let Some(path) = cert_file { - let key_file = File::open(path)?; - let reader = &mut BufReader::new(key_file); - config.root_store.add_pem_file(reader).unwrap(); - } - - let tls_connector = TlsConnector::from(Arc::new(config)); - let dnsname = - DNSNameRef::try_from_ascii_str(&domain).expect("Invalid DNS lookup"); - let tls_stream = tls_connector.connect(dnsname, tcp_stream).await?; - - let rid = { - let mut state_ = state.borrow_mut(); - state_.resource_table.add( - "clientTlsStream", - Box::new(StreamResourceHolder::new(StreamResource::ClientTlsStream( - Box::new(tls_stream), - ))), - ) - }; - Ok(json!({ - "rid": rid, - "localAddr": { - "hostname": local_addr.ip().to_string(), - "port": local_addr.port(), - "transport": "tcp", - }, - "remoteAddr": { - "hostname": remote_addr.ip().to_string(), - "port": remote_addr.port(), - "transport": "tcp", - } - })) - } else { - Err(bad_resource_id()) - } -} - -async fn op_connect_tls( - state: Rc<RefCell<OpState>>, - args: Value, - _zero_copy: BufVec, -) -> Result<Value, AnyError> { - let args: ConnectTLSArgs = serde_json::from_value(args)?; - let cert_file = args.cert_file.clone(); - { - let s = state.borrow(); - let permissions = s.borrow::<Permissions>(); - permissions.check_net(&args.hostname, args.port)?; - if let Some(path) = cert_file.clone() { - permissions.check_read(Path::new(&path))?; - } - } - let mut domain = args.hostname.clone(); - if domain.is_empty() { - domain.push_str("localhost"); - } - - let addr = resolve_addr(&args.hostname, args.port)?; - let tcp_stream = TcpStream::connect(&addr).await?; - let local_addr = tcp_stream.local_addr()?; - let remote_addr = tcp_stream.peer_addr()?; - let mut config = ClientConfig::new(); - config - .root_store - .add_server_trust_anchors(&webpki_roots::TLS_SERVER_ROOTS); - if let Some(path) = cert_file { - let key_file = File::open(path)?; - let reader = &mut BufReader::new(key_file); - config.root_store.add_pem_file(reader).unwrap(); - } - let tls_connector = TlsConnector::from(Arc::new(config)); - let dnsname = - DNSNameRef::try_from_ascii_str(&domain).expect("Invalid DNS lookup"); - let tls_stream = tls_connector.connect(dnsname, tcp_stream).await?; - let rid = { - let mut state_ = state.borrow_mut(); - state_.resource_table.add( - "clientTlsStream", - Box::new(StreamResourceHolder::new(StreamResource::ClientTlsStream( - Box::new(tls_stream), - ))), - ) - }; - Ok(json!({ - "rid": rid, - "localAddr": { - "hostname": local_addr.ip().to_string(), - "port": local_addr.port(), - "transport": args.transport, - }, - "remoteAddr": { - "hostname": remote_addr.ip().to_string(), - "port": remote_addr.port(), - "transport": args.transport, - } - })) -} - -fn load_certs(path: &str) -> Result<Vec<Certificate>, AnyError> { - let cert_file = File::open(path)?; - let reader = &mut BufReader::new(cert_file); - - let certs = certs(reader) - .map_err(|_| custom_error("InvalidData", "Unable to decode certificate"))?; - - if certs.is_empty() { - let e = custom_error("InvalidData", "No certificates found in cert file"); - return Err(e); - } - - Ok(certs) -} - -fn key_decode_err() -> AnyError { - custom_error("InvalidData", "Unable to decode key") -} - -fn key_not_found_err() -> AnyError { - custom_error("InvalidData", "No keys found in key file") -} - -/// Starts with -----BEGIN RSA PRIVATE KEY----- -fn load_rsa_keys(path: &str) -> Result<Vec<PrivateKey>, AnyError> { - let key_file = File::open(path)?; - let reader = &mut BufReader::new(key_file); - let keys = rsa_private_keys(reader).map_err(|_| key_decode_err())?; - Ok(keys) -} - -/// Starts with -----BEGIN PRIVATE KEY----- -fn load_pkcs8_keys(path: &str) -> Result<Vec<PrivateKey>, AnyError> { - let key_file = File::open(path)?; - let reader = &mut BufReader::new(key_file); - let keys = pkcs8_private_keys(reader).map_err(|_| key_decode_err())?; - Ok(keys) -} - -fn load_keys(path: &str) -> Result<Vec<PrivateKey>, AnyError> { - let path = path.to_string(); - let mut keys = load_rsa_keys(&path)?; - - if keys.is_empty() { - keys = load_pkcs8_keys(&path)?; - } - - if keys.is_empty() { - return Err(key_not_found_err()); - } - - Ok(keys) -} - -#[allow(dead_code)] -pub struct TlsListenerResource { - listener: TcpListener, - tls_acceptor: TlsAcceptor, - waker: Option<futures::task::AtomicWaker>, - local_addr: SocketAddr, -} - -impl Drop for TlsListenerResource { - fn drop(&mut self) { - self.wake_task(); - } -} - -impl TlsListenerResource { - /// Track the current task so future awaiting for connection - /// can be notified when listener is closed. - /// - /// Throws an error if another task is already tracked. - pub fn track_task(&mut self, cx: &Context) -> Result<(), AnyError> { - // Currently, we only allow tracking a single accept task for a listener. - // This might be changed in the future with multiple workers. - // Caveat: TcpListener by itself also only tracks an accept task at a time. - // See https://github.com/tokio-rs/tokio/issues/846#issuecomment-454208883 - if self.waker.is_some() { - return Err(custom_error("Busy", "Another accept task is ongoing")); - } - - let waker = futures::task::AtomicWaker::new(); - waker.register(cx.waker()); - self.waker.replace(waker); - Ok(()) - } - - /// Notifies a task when listener is closed so accept future can resolve. - pub fn wake_task(&mut self) { - if let Some(waker) = self.waker.as_ref() { - waker.wake(); - } - } - - /// Stop tracking a task. - /// Happens when the task is done and thus no further tracking is needed. - pub fn untrack_task(&mut self) { - self.waker.take(); - } -} - -#[derive(Deserialize)] -#[serde(rename_all = "camelCase")] -struct ListenTlsArgs { - transport: String, - hostname: String, - port: u16, - cert_file: String, - key_file: String, -} - -fn op_listen_tls( - state: &mut OpState, - args: Value, - _zero_copy: &mut [ZeroCopyBuf], -) -> Result<Value, AnyError> { - let args: ListenTlsArgs = serde_json::from_value(args)?; - assert_eq!(args.transport, "tcp"); - - let cert_file = args.cert_file; - let key_file = args.key_file; - { - let permissions = state.borrow::<Permissions>(); - permissions.check_net(&args.hostname, args.port)?; - permissions.check_read(Path::new(&cert_file))?; - permissions.check_read(Path::new(&key_file))?; - } - let mut config = ServerConfig::new(NoClientAuth::new()); - config - .set_single_cert(load_certs(&cert_file)?, load_keys(&key_file)?.remove(0)) - .expect("invalid key or certificate"); - let tls_acceptor = TlsAcceptor::from(Arc::new(config)); - let addr = resolve_addr(&args.hostname, args.port)?; - let std_listener = std::net::TcpListener::bind(&addr)?; - let listener = TcpListener::from_std(std_listener)?; - let local_addr = listener.local_addr()?; - let tls_listener_resource = TlsListenerResource { - listener, - tls_acceptor, - waker: None, - local_addr, - }; - - let rid = state - .resource_table - .add("tlsListener", Box::new(tls_listener_resource)); - - Ok(json!({ - "rid": rid, - "localAddr": { - "hostname": local_addr.ip().to_string(), - "port": local_addr.port(), - "transport": args.transport, - }, - })) -} - -#[derive(Deserialize)] -struct AcceptTlsArgs { - rid: i32, -} - -async fn op_accept_tls( - state: Rc<RefCell<OpState>>, - args: Value, - _zero_copy: BufVec, -) -> Result<Value, AnyError> { - let args: AcceptTlsArgs = serde_json::from_value(args)?; - let rid = args.rid as u32; - let accept_fut = poll_fn(|cx| { - let mut state = state.borrow_mut(); - let listener_resource = state - .resource_table - .get_mut::<TlsListenerResource>(rid) - .ok_or_else(|| bad_resource("Listener has been closed"))?; - let listener = &mut listener_resource.listener; - match listener.poll_accept(cx).map_err(AnyError::from) { - Poll::Ready(Ok((stream, addr))) => { - listener_resource.untrack_task(); - Poll::Ready(Ok((stream, addr))) - } - Poll::Pending => { - listener_resource.track_task(cx)?; - Poll::Pending - } - Poll::Ready(Err(e)) => { - listener_resource.untrack_task(); - Poll::Ready(Err(e)) - } - } - }); - let (tcp_stream, _socket_addr) = accept_fut.await?; - let local_addr = tcp_stream.local_addr()?; - let remote_addr = tcp_stream.peer_addr()?; - let tls_acceptor = { - let state_ = state.borrow(); - let resource = state_ - .resource_table - .get::<TlsListenerResource>(rid) - .ok_or_else(bad_resource_id) - .expect("Can't find tls listener"); - resource.tls_acceptor.clone() - }; - let tls_stream = tls_acceptor.accept(tcp_stream).await?; - let rid = { - let mut state_ = state.borrow_mut(); - state_.resource_table.add( - "serverTlsStream", - Box::new(StreamResourceHolder::new(StreamResource::ServerTlsStream( - Box::new(tls_stream), - ))), - ) - }; - Ok(json!({ - "rid": rid, - "localAddr": { - "transport": "tcp", - "hostname": local_addr.ip().to_string(), - "port": local_addr.port() - }, - "remoteAddr": { - "transport": "tcp", - "hostname": remote_addr.ip().to_string(), - "port": remote_addr.port() - } - })) -} diff --git a/cli/ops/tty.rs b/cli/ops/tty.rs deleted file mode 100644 index be1d7d3e4..000000000 --- a/cli/ops/tty.rs +++ /dev/null @@ -1,334 +0,0 @@ -// Copyright 2018-2020 the Deno authors. All rights reserved. MIT license. - -use super::io::std_file_resource; -use super::io::StreamResource; -use super::io::StreamResourceHolder; -use deno_core::error::bad_resource_id; -use deno_core::error::last_os_error; -use deno_core::error::not_supported; -use deno_core::error::resource_unavailable; -use deno_core::error::AnyError; -use deno_core::serde_json; -use deno_core::serde_json::json; -use deno_core::serde_json::Value; -use deno_core::OpState; -use deno_core::ZeroCopyBuf; -use serde::Deserialize; -use serde::Serialize; - -#[cfg(unix)] -use nix::sys::termios; - -#[cfg(windows)] -use deno_core::error::custom_error; -#[cfg(windows)] -use winapi::shared::minwindef::DWORD; -#[cfg(windows)] -use winapi::um::wincon; -#[cfg(windows)] -const RAW_MODE_MASK: DWORD = wincon::ENABLE_LINE_INPUT - | wincon::ENABLE_ECHO_INPUT - | wincon::ENABLE_PROCESSED_INPUT; - -#[cfg(windows)] -fn get_windows_handle( - f: &std::fs::File, -) -> Result<std::os::windows::io::RawHandle, AnyError> { - use std::os::windows::io::AsRawHandle; - use winapi::um::handleapi; - - let handle = f.as_raw_handle(); - if handle == handleapi::INVALID_HANDLE_VALUE { - return Err(last_os_error()); - } else if handle.is_null() { - return Err(custom_error("ReferenceError", "null handle")); - } - Ok(handle) -} - -pub fn init(rt: &mut deno_core::JsRuntime) { - super::reg_json_sync(rt, "op_set_raw", op_set_raw); - super::reg_json_sync(rt, "op_isatty", op_isatty); - super::reg_json_sync(rt, "op_console_size", op_console_size); -} - -#[derive(Deserialize)] -#[serde(rename_all = "camelCase")] -struct SetRawOptions { - cbreak: bool, -} - -#[derive(Deserialize)] -struct SetRawArgs { - rid: u32, - mode: bool, - options: SetRawOptions, -} - -fn op_set_raw( - state: &mut OpState, - args: Value, - _zero_copy: &mut [ZeroCopyBuf], -) -> Result<Value, AnyError> { - super::check_unstable(state, "Deno.setRaw"); - - let args: SetRawArgs = serde_json::from_value(args)?; - let rid = args.rid; - let is_raw = args.mode; - let cbreak = args.options.cbreak; - - // From https://github.com/kkawakam/rustyline/blob/master/src/tty/windows.rs - // and https://github.com/kkawakam/rustyline/blob/master/src/tty/unix.rs - // and https://github.com/crossterm-rs/crossterm/blob/e35d4d2c1cc4c919e36d242e014af75f6127ab50/src/terminal/sys/windows.rs - // Copyright (c) 2015 Katsu Kawakami & Rustyline authors. MIT license. - // Copyright (c) 2019 Timon. MIT license. - #[cfg(windows)] - { - use std::os::windows::io::AsRawHandle; - use winapi::shared::minwindef::FALSE; - use winapi::um::{consoleapi, handleapi}; - - let resource_holder = - state.resource_table.get_mut::<StreamResourceHolder>(rid); - if resource_holder.is_none() { - return Err(bad_resource_id()); - } - if cbreak { - return Err(not_supported()); - } - let resource_holder = resource_holder.unwrap(); - - // For now, only stdin. - let handle = match &mut resource_holder.resource { - StreamResource::FsFile(ref mut option_file_metadata) => { - if let Some((tokio_file, metadata)) = option_file_metadata.take() { - match tokio_file.try_into_std() { - Ok(std_file) => { - let raw_handle = std_file.as_raw_handle(); - // Turn the std_file handle back into a tokio file, put it back - // in the resource table. - let tokio_file = tokio::fs::File::from_std(std_file); - resource_holder.resource = - StreamResource::FsFile(Some((tokio_file, metadata))); - // return the result. - raw_handle - } - Err(tokio_file) => { - // This function will return an error containing the file if - // some operation is in-flight. - resource_holder.resource = - StreamResource::FsFile(Some((tokio_file, metadata))); - return Err(resource_unavailable()); - } - } - } else { - return Err(resource_unavailable()); - } - } - _ => { - return Err(bad_resource_id()); - } - }; - - if handle == handleapi::INVALID_HANDLE_VALUE { - return Err(last_os_error()); - } else if handle.is_null() { - return Err(custom_error("ReferenceError", "null handle")); - } - let mut original_mode: DWORD = 0; - if unsafe { consoleapi::GetConsoleMode(handle, &mut original_mode) } - == FALSE - { - return Err(last_os_error()); - } - let new_mode = if is_raw { - original_mode & !RAW_MODE_MASK - } else { - original_mode | RAW_MODE_MASK - }; - if unsafe { consoleapi::SetConsoleMode(handle, new_mode) } == FALSE { - return Err(last_os_error()); - } - - Ok(json!({})) - } - #[cfg(unix)] - { - use std::os::unix::io::AsRawFd; - - let resource_holder = - state.resource_table.get_mut::<StreamResourceHolder>(rid); - if resource_holder.is_none() { - return Err(bad_resource_id()); - } - - if is_raw { - let (raw_fd, maybe_tty_mode) = - match &mut resource_holder.unwrap().resource { - StreamResource::FsFile(Some((f, ref mut metadata))) => { - (f.as_raw_fd(), &mut metadata.tty.mode) - } - StreamResource::FsFile(None) => return Err(resource_unavailable()), - _ => { - return Err(not_supported()); - } - }; - - if maybe_tty_mode.is_none() { - // Save original mode. - let original_mode = termios::tcgetattr(raw_fd)?; - maybe_tty_mode.replace(original_mode); - } - - let mut raw = maybe_tty_mode.clone().unwrap(); - - raw.input_flags &= !(termios::InputFlags::BRKINT - | termios::InputFlags::ICRNL - | termios::InputFlags::INPCK - | termios::InputFlags::ISTRIP - | termios::InputFlags::IXON); - - raw.control_flags |= termios::ControlFlags::CS8; - - raw.local_flags &= !(termios::LocalFlags::ECHO - | termios::LocalFlags::ICANON - | termios::LocalFlags::IEXTEN); - if !cbreak { - raw.local_flags &= !(termios::LocalFlags::ISIG); - } - raw.control_chars[termios::SpecialCharacterIndices::VMIN as usize] = 1; - raw.control_chars[termios::SpecialCharacterIndices::VTIME as usize] = 0; - termios::tcsetattr(raw_fd, termios::SetArg::TCSADRAIN, &raw)?; - Ok(json!({})) - } else { - // Try restore saved mode. - let (raw_fd, maybe_tty_mode) = - match &mut resource_holder.unwrap().resource { - StreamResource::FsFile(Some((f, ref mut metadata))) => { - (f.as_raw_fd(), &mut metadata.tty.mode) - } - StreamResource::FsFile(None) => { - return Err(resource_unavailable()); - } - _ => { - return Err(bad_resource_id()); - } - }; - - if let Some(mode) = maybe_tty_mode.take() { - termios::tcsetattr(raw_fd, termios::SetArg::TCSADRAIN, &mode)?; - } - - Ok(json!({})) - } - } -} - -#[derive(Deserialize)] -struct IsattyArgs { - rid: u32, -} - -fn op_isatty( - state: &mut OpState, - args: Value, - _zero_copy: &mut [ZeroCopyBuf], -) -> Result<Value, AnyError> { - let args: IsattyArgs = serde_json::from_value(args)?; - let rid = args.rid; - - let isatty: bool = std_file_resource(state, rid as u32, move |r| match r { - Ok(std_file) => { - #[cfg(windows)] - { - use winapi::um::consoleapi; - - let handle = get_windows_handle(&std_file)?; - let mut test_mode: DWORD = 0; - // If I cannot get mode out of console, it is not a console. - Ok(unsafe { consoleapi::GetConsoleMode(handle, &mut test_mode) != 0 }) - } - #[cfg(unix)] - { - use std::os::unix::io::AsRawFd; - let raw_fd = std_file.as_raw_fd(); - Ok(unsafe { libc::isatty(raw_fd as libc::c_int) == 1 }) - } - } - Err(StreamResource::FsFile(_)) => unreachable!(), - _ => Ok(false), - })?; - Ok(json!(isatty)) -} - -#[derive(Deserialize)] -struct ConsoleSizeArgs { - rid: u32, -} - -#[derive(Serialize)] -struct ConsoleSize { - columns: u32, - rows: u32, -} - -fn op_console_size( - state: &mut OpState, - args: Value, - _zero_copy: &mut [ZeroCopyBuf], -) -> Result<Value, AnyError> { - super::check_unstable(state, "Deno.consoleSize"); - - let args: ConsoleSizeArgs = serde_json::from_value(args)?; - let rid = args.rid; - - let size = std_file_resource(state, rid as u32, move |r| match r { - Ok(std_file) => { - #[cfg(windows)] - { - use std::os::windows::io::AsRawHandle; - let handle = std_file.as_raw_handle(); - - unsafe { - let mut bufinfo: winapi::um::wincon::CONSOLE_SCREEN_BUFFER_INFO = - std::mem::zeroed(); - - if winapi::um::wincon::GetConsoleScreenBufferInfo( - handle, - &mut bufinfo, - ) == 0 - { - return Err(last_os_error()); - } - - Ok(ConsoleSize { - columns: bufinfo.dwSize.X as u32, - rows: bufinfo.dwSize.Y as u32, - }) - } - } - - #[cfg(unix)] - { - use std::os::unix::io::AsRawFd; - - let fd = std_file.as_raw_fd(); - unsafe { - let mut size: libc::winsize = std::mem::zeroed(); - if libc::ioctl(fd, libc::TIOCGWINSZ, &mut size as *mut _) != 0 { - return Err(last_os_error()); - } - - // TODO (caspervonb) return a tuple instead - Ok(ConsoleSize { - columns: size.ws_col as u32, - rows: size.ws_row as u32, - }) - } - } - } - Err(_) => Err(bad_resource_id()), - })?; - - Ok(json!(size)) -} diff --git a/cli/ops/web_worker.rs b/cli/ops/web_worker.rs deleted file mode 100644 index d88330a04..000000000 --- a/cli/ops/web_worker.rs +++ /dev/null @@ -1,37 +0,0 @@ -// Copyright 2018-2020 the Deno authors. All rights reserved. MIT license. - -use crate::web_worker::WebWorkerHandle; -use crate::web_worker::WorkerEvent; -use deno_core::futures::channel::mpsc; -use deno_core::serde_json::json; - -pub fn init( - rt: &mut deno_core::JsRuntime, - sender: mpsc::Sender<WorkerEvent>, - handle: WebWorkerHandle, -) { - // Post message to host as guest worker. - let sender_ = sender.clone(); - super::reg_json_sync( - rt, - "op_worker_post_message", - move |_state, _args, bufs| { - assert_eq!(bufs.len(), 1, "Invalid number of arguments"); - let msg_buf: Box<[u8]> = (*bufs[0]).into(); - sender_ - .clone() - .try_send(WorkerEvent::Message(msg_buf)) - .expect("Failed to post message to host"); - Ok(json!({})) - }, - ); - - // Notify host that guest worker closes. - super::reg_json_sync(rt, "op_worker_close", move |_state, _args, _bufs| { - // Notify parent that we're finished - sender.clone().close_channel(); - // Terminate execution of current worker - handle.terminate(); - Ok(json!({})) - }); -} diff --git a/cli/ops/websocket.rs b/cli/ops/websocket.rs deleted file mode 100644 index a8c591a33..000000000 --- a/cli/ops/websocket.rs +++ /dev/null @@ -1,326 +0,0 @@ -// Copyright 2018-2020 the Deno authors. All rights reserved. MIT license. - -use crate::permissions::Permissions; -use core::task::Poll; -use deno_core::error::bad_resource_id; -use deno_core::error::type_error; -use deno_core::error::AnyError; -use deno_core::futures::future::poll_fn; -use deno_core::futures::StreamExt; -use deno_core::futures::{ready, SinkExt}; -use deno_core::serde_json::json; -use deno_core::serde_json::Value; -use deno_core::url; -use deno_core::BufVec; -use deno_core::OpState; -use deno_core::{serde_json, ZeroCopyBuf}; -use http::{Method, Request, Uri}; -use serde::Deserialize; -use std::borrow::Cow; -use std::cell::RefCell; -use std::fs::File; -use std::io::BufReader; -use std::rc::Rc; -use std::sync::Arc; -use tokio::net::TcpStream; -use tokio_rustls::{rustls::ClientConfig, TlsConnector}; -use tokio_tungstenite::stream::Stream as StreamSwitcher; -use tokio_tungstenite::tungstenite::Error as TungsteniteError; -use tokio_tungstenite::tungstenite::{ - handshake::client::Response, protocol::frame::coding::CloseCode, - protocol::CloseFrame, Message, -}; -use tokio_tungstenite::{client_async, WebSocketStream}; -use webpki::DNSNameRef; - -#[derive(Clone)] -struct WsCaFile(String); -#[derive(Clone)] -struct WsUserAgent(String); - -pub fn init( - rt: &mut deno_core::JsRuntime, - maybe_ca_file: Option<&str>, - user_agent: String, -) { - { - let op_state = rt.op_state(); - let mut state = op_state.borrow_mut(); - if let Some(ca_file) = maybe_ca_file { - state.put::<WsCaFile>(WsCaFile(ca_file.to_string())); - } - state.put::<WsUserAgent>(WsUserAgent(user_agent)); - } - super::reg_json_sync(rt, "op_ws_check_permission", op_ws_check_permission); - super::reg_json_async(rt, "op_ws_create", op_ws_create); - super::reg_json_async(rt, "op_ws_send", op_ws_send); - super::reg_json_async(rt, "op_ws_close", op_ws_close); - super::reg_json_async(rt, "op_ws_next_event", op_ws_next_event); -} - -type MaybeTlsStream = - StreamSwitcher<TcpStream, tokio_rustls::client::TlsStream<TcpStream>>; - -type WsStream = WebSocketStream<MaybeTlsStream>; - -#[derive(Deserialize)] -#[serde(rename_all = "camelCase")] -struct CheckPermissionArgs { - url: String, -} - -// This op is needed because creating a WS instance in JavaScript is a sync -// operation and should throw error when permissions are not fullfiled, -// but actual op that connects WS is async. -pub fn op_ws_check_permission( - state: &mut OpState, - args: Value, - _zero_copy: &mut [ZeroCopyBuf], -) -> Result<Value, AnyError> { - let args: CheckPermissionArgs = serde_json::from_value(args)?; - - state - .borrow::<Permissions>() - .check_net_url(&url::Url::parse(&args.url)?)?; - - Ok(json!({})) -} - -#[derive(Deserialize)] -#[serde(rename_all = "camelCase")] -struct CreateArgs { - url: String, - protocols: String, -} - -pub async fn op_ws_create( - state: Rc<RefCell<OpState>>, - args: Value, - _bufs: BufVec, -) -> Result<Value, AnyError> { - let args: CreateArgs = serde_json::from_value(args)?; - - { - let s = state.borrow(); - s.borrow::<Permissions>() - .check_net_url(&url::Url::parse(&args.url)?) - .expect( - "Permission check should have been done in op_ws_check_permission", - ); - } - - let maybe_ca_file = state.borrow().try_borrow::<WsCaFile>().cloned(); - let user_agent = state.borrow().borrow::<WsUserAgent>().0.clone(); - let uri: Uri = args.url.parse()?; - let mut request = Request::builder().method(Method::GET).uri(&uri); - - request = request.header("User-Agent", user_agent); - - if !args.protocols.is_empty() { - request = request.header("Sec-WebSocket-Protocol", args.protocols); - } - - let request = request.body(())?; - let domain = &uri.host().unwrap().to_string(); - let port = &uri.port_u16().unwrap_or(match uri.scheme_str() { - Some("wss") => 443, - Some("ws") => 80, - _ => unreachable!(), - }); - let addr = format!("{}:{}", domain, port); - let try_socket = TcpStream::connect(addr).await; - let tcp_socket = match try_socket.map_err(TungsteniteError::Io) { - Ok(socket) => socket, - Err(_) => return Ok(json!({"success": false})), - }; - - let socket: MaybeTlsStream = match uri.scheme_str() { - Some("ws") => StreamSwitcher::Plain(tcp_socket), - Some("wss") => { - let mut config = ClientConfig::new(); - config - .root_store - .add_server_trust_anchors(&webpki_roots::TLS_SERVER_ROOTS); - - if let Some(ws_ca_file) = maybe_ca_file { - let key_file = File::open(ws_ca_file.0)?; - let reader = &mut BufReader::new(key_file); - config.root_store.add_pem_file(reader).unwrap(); - } - - let tls_connector = TlsConnector::from(Arc::new(config)); - let dnsname = - DNSNameRef::try_from_ascii_str(&domain).expect("Invalid DNS lookup"); - let tls_socket = tls_connector.connect(dnsname, tcp_socket).await?; - StreamSwitcher::Tls(tls_socket) - } - _ => unreachable!(), - }; - - let (stream, response): (WsStream, Response) = - client_async(request, socket).await.map_err(|err| { - type_error(format!( - "failed to connect to WebSocket: {}", - err.to_string() - )) - })?; - - let mut state = state.borrow_mut(); - let rid = state - .resource_table - .add("webSocketStream", Box::new(stream)); - - let protocol = match response.headers().get("Sec-WebSocket-Protocol") { - Some(header) => header.to_str().unwrap(), - None => "", - }; - let extensions = response - .headers() - .get_all("Sec-WebSocket-Extensions") - .iter() - .map(|header| header.to_str().unwrap()) - .collect::<String>(); - Ok(json!({ - "success": true, - "rid": rid, - "protocol": protocol, - "extensions": extensions - })) -} - -#[derive(Deserialize)] -#[serde(rename_all = "camelCase")] -struct SendArgs { - rid: u32, - text: Option<String>, -} - -pub async fn op_ws_send( - state: Rc<RefCell<OpState>>, - args: Value, - bufs: BufVec, -) -> Result<Value, AnyError> { - let args: SendArgs = serde_json::from_value(args)?; - - let mut maybe_msg = Some(match args.text { - Some(text) => Message::Text(text), - None => Message::Binary(bufs[0].to_vec()), - }); - let rid = args.rid; - - poll_fn(move |cx| { - let mut state = state.borrow_mut(); - let stream = state - .resource_table - .get_mut::<WsStream>(rid) - .ok_or_else(bad_resource_id)?; - - // TODO(ry) Handle errors below instead of unwrap. - // Need to map `TungsteniteError` to `AnyError`. - ready!(stream.poll_ready_unpin(cx)).unwrap(); - if let Some(msg) = maybe_msg.take() { - stream.start_send_unpin(msg).unwrap(); - } - ready!(stream.poll_flush_unpin(cx)).unwrap(); - - Poll::Ready(Ok(json!({}))) - }) - .await -} - -#[derive(Deserialize)] -#[serde(rename_all = "camelCase")] -struct CloseArgs { - rid: u32, - code: Option<u16>, - reason: Option<String>, -} - -pub async fn op_ws_close( - state: Rc<RefCell<OpState>>, - args: Value, - _bufs: BufVec, -) -> Result<Value, AnyError> { - let args: CloseArgs = serde_json::from_value(args)?; - let rid = args.rid; - let mut maybe_msg = Some(Message::Close(args.code.map(|c| CloseFrame { - code: CloseCode::from(c), - reason: match args.reason { - Some(reason) => Cow::from(reason), - None => Default::default(), - }, - }))); - - poll_fn(move |cx| { - let mut state = state.borrow_mut(); - let stream = state - .resource_table - .get_mut::<WsStream>(rid) - .ok_or_else(bad_resource_id)?; - - // TODO(ry) Handle errors below instead of unwrap. - // Need to map `TungsteniteError` to `AnyError`. - ready!(stream.poll_ready_unpin(cx)).unwrap(); - if let Some(msg) = maybe_msg.take() { - stream.start_send_unpin(msg).unwrap(); - } - ready!(stream.poll_flush_unpin(cx)).unwrap(); - ready!(stream.poll_close_unpin(cx)).unwrap(); - - Poll::Ready(Ok(json!({}))) - }) - .await -} - -#[derive(Deserialize)] -#[serde(rename_all = "camelCase")] -struct NextEventArgs { - rid: u32, -} - -pub async fn op_ws_next_event( - state: Rc<RefCell<OpState>>, - args: Value, - _bufs: BufVec, -) -> Result<Value, AnyError> { - let args: NextEventArgs = serde_json::from_value(args)?; - poll_fn(move |cx| { - let mut state = state.borrow_mut(); - let stream = state - .resource_table - .get_mut::<WsStream>(args.rid) - .ok_or_else(bad_resource_id)?; - stream - .poll_next_unpin(cx) - .map(|val| { - match val { - Some(Ok(Message::Text(text))) => json!({ - "type": "string", - "data": text - }), - Some(Ok(Message::Binary(data))) => { - // TODO(ry): don't use json to send binary data. - json!({ - "type": "binary", - "data": data - }) - } - Some(Ok(Message::Close(Some(frame)))) => json!({ - "type": "close", - "code": u16::from(frame.code), - "reason": frame.reason.as_ref() - }), - Some(Ok(Message::Close(None))) => json!({ "type": "close" }), - Some(Ok(Message::Ping(_))) => json!({"type": "ping"}), - Some(Ok(Message::Pong(_))) => json!({"type": "pong"}), - Some(Err(_)) => json!({"type": "error"}), - None => { - state.resource_table.close(args.rid).unwrap(); - json!({"type": "closed"}) - } - } - }) - .map(Ok) - }) - .await -} diff --git a/cli/ops/worker_host.rs b/cli/ops/worker_host.rs deleted file mode 100644 index 871e4b9fe..000000000 --- a/cli/ops/worker_host.rs +++ /dev/null @@ -1,318 +0,0 @@ -// Copyright 2018-2020 the Deno authors. All rights reserved. MIT license. - -use crate::permissions::Permissions; -use crate::web_worker::run_web_worker; -use crate::web_worker::WebWorker; -use crate::web_worker::WebWorkerHandle; -use crate::web_worker::WorkerEvent; -use deno_core::error::generic_error; -use deno_core::error::AnyError; -use deno_core::error::JsError; -use deno_core::futures::channel::mpsc; -use deno_core::serde_json; -use deno_core::serde_json::json; -use deno_core::serde_json::Value; -use deno_core::BufVec; -use deno_core::ModuleSpecifier; -use deno_core::OpState; -use deno_core::ZeroCopyBuf; -use serde::Deserialize; -use std::cell::RefCell; -use std::collections::HashMap; -use std::convert::From; -use std::rc::Rc; -use std::sync::Arc; -use std::thread::JoinHandle; - -pub struct CreateWebWorkerArgs { - pub name: String, - pub worker_id: u32, - pub permissions: Permissions, - pub main_module: ModuleSpecifier, - pub use_deno_namespace: bool, -} - -pub type CreateWebWorkerCb = - dyn Fn(CreateWebWorkerArgs) -> WebWorker + Sync + Send; - -/// A holder for callback that is used to create a new -/// WebWorker. It's a struct instead of a type alias -/// because `GothamState` used in `OpState` overrides -/// value if type alises have the same underlying type -#[derive(Clone)] -pub struct CreateWebWorkerCbHolder(Arc<CreateWebWorkerCb>); - -#[derive(Deserialize)] -struct HostUnhandledErrorArgs { - message: String, -} - -pub fn init( - rt: &mut deno_core::JsRuntime, - sender: Option<mpsc::Sender<WorkerEvent>>, - create_web_worker_cb: Arc<CreateWebWorkerCb>, -) { - { - let op_state = rt.op_state(); - let mut state = op_state.borrow_mut(); - state.put::<WorkersTable>(WorkersTable::default()); - state.put::<WorkerId>(WorkerId::default()); - - let create_module_loader = CreateWebWorkerCbHolder(create_web_worker_cb); - state.put::<CreateWebWorkerCbHolder>(create_module_loader); - } - super::reg_json_sync(rt, "op_create_worker", op_create_worker); - super::reg_json_sync( - rt, - "op_host_terminate_worker", - op_host_terminate_worker, - ); - super::reg_json_sync(rt, "op_host_post_message", op_host_post_message); - super::reg_json_async(rt, "op_host_get_message", op_host_get_message); - super::reg_json_sync( - rt, - "op_host_unhandled_error", - move |_state, args, _zero_copy| { - if let Some(mut sender) = sender.clone() { - let args: HostUnhandledErrorArgs = serde_json::from_value(args)?; - sender - .try_send(WorkerEvent::Error(generic_error(args.message))) - .expect("Failed to propagate error event to parent worker"); - Ok(json!(true)) - } else { - Err(generic_error("Cannot be called from main worker.")) - } - }, - ); -} - -pub struct WorkerThread { - join_handle: JoinHandle<Result<(), AnyError>>, - worker_handle: WebWorkerHandle, -} - -pub type WorkersTable = HashMap<u32, WorkerThread>; -pub type WorkerId = u32; - -#[derive(Deserialize)] -#[serde(rename_all = "camelCase")] -struct CreateWorkerArgs { - name: Option<String>, - specifier: String, - has_source_code: bool, - source_code: String, - use_deno_namespace: bool, -} - -/// Create worker as the host -fn op_create_worker( - state: &mut OpState, - args: Value, - _data: &mut [ZeroCopyBuf], -) -> Result<Value, AnyError> { - let args: CreateWorkerArgs = serde_json::from_value(args)?; - - let specifier = args.specifier.clone(); - let maybe_source_code = if args.has_source_code { - Some(args.source_code.clone()) - } else { - None - }; - let args_name = args.name; - let use_deno_namespace = args.use_deno_namespace; - if use_deno_namespace { - super::check_unstable(state, "Worker.deno"); - } - let permissions = state.borrow::<Permissions>().clone(); - let worker_id = state.take::<WorkerId>(); - let create_module_loader = state.take::<CreateWebWorkerCbHolder>(); - state.put::<CreateWebWorkerCbHolder>(create_module_loader.clone()); - state.put::<WorkerId>(worker_id + 1); - - let module_specifier = ModuleSpecifier::resolve_url(&specifier)?; - let worker_name = args_name.unwrap_or_else(|| "".to_string()); - - let (handle_sender, handle_receiver) = - std::sync::mpsc::sync_channel::<Result<WebWorkerHandle, AnyError>>(1); - - // Setup new thread - let thread_builder = - std::thread::Builder::new().name(format!("deno-worker-{}", worker_id)); - - // Spawn it - let join_handle = thread_builder.spawn(move || { - // Any error inside this block is terminal: - // - JS worker is useless - meaning it throws an exception and can't do anything else, - // all action done upon it should be noops - // - newly spawned thread exits - - let worker = (create_module_loader.0)(CreateWebWorkerArgs { - name: worker_name, - worker_id, - permissions, - main_module: module_specifier.clone(), - use_deno_namespace, - }); - - // Send thread safe handle to newly created worker to host thread - handle_sender.send(Ok(worker.thread_safe_handle())).unwrap(); - drop(handle_sender); - - // At this point the only method of communication with host - // is using `worker.internal_channels`. - // - // Host can already push messages and interact with worker. - run_web_worker(worker, module_specifier, maybe_source_code) - })?; - - let worker_handle = handle_receiver.recv().unwrap()?; - - let worker_thread = WorkerThread { - join_handle, - worker_handle, - }; - - // At this point all interactions with worker happen using thread - // safe handler returned from previous function calls - state - .borrow_mut::<WorkersTable>() - .insert(worker_id, worker_thread); - - Ok(json!({ "id": worker_id })) -} - -#[derive(Deserialize)] -struct WorkerArgs { - id: i32, -} - -fn op_host_terminate_worker( - state: &mut OpState, - args: Value, - _data: &mut [ZeroCopyBuf], -) -> Result<Value, AnyError> { - let args: WorkerArgs = serde_json::from_value(args)?; - let id = args.id as u32; - let worker_thread = state - .borrow_mut::<WorkersTable>() - .remove(&id) - .expect("No worker handle found"); - worker_thread.worker_handle.terminate(); - worker_thread - .join_handle - .join() - .expect("Panic in worker thread") - .expect("Panic in worker event loop"); - Ok(json!({})) -} - -fn serialize_worker_event(event: WorkerEvent) -> Value { - match event { - WorkerEvent::Message(buf) => json!({ "type": "msg", "data": buf }), - WorkerEvent::TerminalError(error) => match error.downcast::<JsError>() { - Ok(js_error) => json!({ - "type": "terminalError", - "error": { - "message": js_error.message, - "fileName": js_error.script_resource_name, - "lineNumber": js_error.line_number, - "columnNumber": js_error.start_column, - } - }), - Err(error) => json!({ - "type": "terminalError", - "error": { - "message": error.to_string(), - } - }), - }, - WorkerEvent::Error(error) => match error.downcast::<JsError>() { - Ok(js_error) => json!({ - "type": "error", - "error": { - "message": js_error.message, - "fileName": js_error.script_resource_name, - "lineNumber": js_error.line_number, - "columnNumber": js_error.start_column, - } - }), - Err(error) => json!({ - "type": "error", - "error": { - "message": error.to_string(), - } - }), - }, - } -} - -/// Try to remove worker from workers table - NOTE: `Worker.terminate()` -/// might have been called already meaning that we won't find worker in -/// table - in that case ignore. -fn try_remove_and_close(state: Rc<RefCell<OpState>>, id: u32) { - let mut s = state.borrow_mut(); - let workers = s.borrow_mut::<WorkersTable>(); - if let Some(mut worker_thread) = workers.remove(&id) { - worker_thread.worker_handle.sender.close_channel(); - worker_thread - .join_handle - .join() - .expect("Worker thread panicked") - .expect("Panic in worker event loop"); - } -} - -/// Get message from guest worker as host -async fn op_host_get_message( - state: Rc<RefCell<OpState>>, - args: Value, - _zero_copy: BufVec, -) -> Result<Value, AnyError> { - let args: WorkerArgs = serde_json::from_value(args)?; - let id = args.id as u32; - - let worker_handle = { - let s = state.borrow(); - let workers_table = s.borrow::<WorkersTable>(); - let maybe_handle = workers_table.get(&id); - if let Some(handle) = maybe_handle { - handle.worker_handle.clone() - } else { - // If handle was not found it means worker has already shutdown - return Ok(json!({ "type": "close" })); - } - }; - - let maybe_event = worker_handle.get_event().await?; - if let Some(event) = maybe_event { - // Terminal error means that worker should be removed from worker table. - if let WorkerEvent::TerminalError(_) = &event { - try_remove_and_close(state, id); - } - return Ok(serialize_worker_event(event)); - } - - // If there was no event from worker it means it has already been closed. - try_remove_and_close(state, id); - Ok(json!({ "type": "close" })) -} - -/// Post message to guest worker as host -fn op_host_post_message( - state: &mut OpState, - args: Value, - data: &mut [ZeroCopyBuf], -) -> Result<Value, AnyError> { - assert_eq!(data.len(), 1, "Invalid number of arguments"); - let args: WorkerArgs = serde_json::from_value(args)?; - let id = args.id as u32; - let msg = Vec::from(&*data[0]).into_boxed_slice(); - - debug!("post message to worker {}", id); - let worker_thread = state - .borrow::<WorkersTable>() - .get(&id) - .expect("No worker handle found"); - worker_thread.worker_handle.post_message(msg)?; - Ok(json!({})) -} |