diff options
author | Bartek IwaĆczuk <biwanczuk@gmail.com> | 2020-02-08 20:34:31 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-02-08 20:34:31 +0100 |
commit | cdba5ab6fc633606aaa6f95d0825832c3ac6fe5c (patch) | |
tree | e8dee2801e14b65b2da6aca62e39cd3d3ac2a786 /cli | |
parent | 619a24390ff15d5ea5e577a4d0391823f94e8592 (diff) |
refactor: rename ThreadSafeState, use RefCell for mutable state (#3931)
* rename ThreadSafeState to State
* State stores InnerState wrapped in Rc and RefCell
Diffstat (limited to 'cli')
-rw-r--r-- | cli/compilers/compiler_worker.rs | 8 | ||||
-rw-r--r-- | cli/compilers/ts.rs | 5 | ||||
-rw-r--r-- | cli/compilers/wasm.rs | 5 | ||||
-rw-r--r-- | cli/import_map.rs | 2 | ||||
-rw-r--r-- | cli/lib.rs | 14 | ||||
-rw-r--r-- | cli/ops/compiler.rs | 31 | ||||
-rw-r--r-- | cli/ops/errors.rs | 13 | ||||
-rw-r--r-- | cli/ops/fetch.rs | 10 | ||||
-rw-r--r-- | cli/ops/files.rs | 32 | ||||
-rw-r--r-- | cli/ops/fs.rs | 38 | ||||
-rw-r--r-- | cli/ops/io.rs | 31 | ||||
-rw-r--r-- | cli/ops/net.rs | 57 | ||||
-rw-r--r-- | cli/ops/os.rs | 20 | ||||
-rw-r--r-- | cli/ops/permissions.rs | 20 | ||||
-rw-r--r-- | cli/ops/plugins.rs | 21 | ||||
-rw-r--r-- | cli/ops/process.rs | 36 | ||||
-rw-r--r-- | cli/ops/random.rs | 11 | ||||
-rw-r--r-- | cli/ops/repl.rs | 24 | ||||
-rw-r--r-- | cli/ops/resources.rs | 10 | ||||
-rw-r--r-- | cli/ops/runtime.rs | 7 | ||||
-rw-r--r-- | cli/ops/runtime_compiler.rs | 12 | ||||
-rw-r--r-- | cli/ops/signal.rs | 32 | ||||
-rw-r--r-- | cli/ops/timers.rs | 25 | ||||
-rw-r--r-- | cli/ops/tls.rs | 38 | ||||
-rw-r--r-- | cli/ops/web_worker.rs | 27 | ||||
-rw-r--r-- | cli/ops/worker_host.rs | 89 | ||||
-rw-r--r-- | cli/shell.rs | 2 | ||||
-rw-r--r-- | cli/state.rs | 163 | ||||
-rw-r--r-- | cli/web_worker.rs | 81 | ||||
-rw-r--r-- | cli/worker.rs | 50 |
30 files changed, 452 insertions, 462 deletions
diff --git a/cli/compilers/compiler_worker.rs b/cli/compilers/compiler_worker.rs index f76395e75..3252aae02 100644 --- a/cli/compilers/compiler_worker.rs +++ b/cli/compilers/compiler_worker.rs @@ -1,6 +1,6 @@ // Copyright 2018-2020 the Deno authors. All rights reserved. MIT license. use crate::ops; -use crate::state::ThreadSafeState; +use crate::state::State; use crate::worker::Worker; use deno_core; use deno_core::StartupData; @@ -23,11 +23,7 @@ use std::ops::DerefMut; pub struct CompilerWorker(Worker); impl CompilerWorker { - pub fn new( - name: String, - startup_data: StartupData, - state: ThreadSafeState, - ) -> Self { + pub fn new(name: String, startup_data: StartupData, state: State) -> Self { let state_ = state.clone(); let mut worker = Worker::new(name, startup_data, state_); { diff --git a/cli/compilers/ts.rs b/cli/compilers/ts.rs index 303c24b6a..91c264345 100644 --- a/cli/compilers/ts.rs +++ b/cli/compilers/ts.rs @@ -249,9 +249,8 @@ impl TsCompiler { fn setup_worker(global_state: GlobalState) -> CompilerWorker { let entry_point = ModuleSpecifier::resolve_url_or_path("./__$deno$ts_compiler.ts").unwrap(); - let worker_state = - ThreadSafeState::new(global_state.clone(), None, entry_point) - .expect("Unable to create worker state"); + let worker_state = State::new(global_state.clone(), None, entry_point) + .expect("Unable to create worker state"); // Count how many times we start the compiler worker. global_state diff --git a/cli/compilers/wasm.rs b/cli/compilers/wasm.rs index 6b8da100a..c1c179f62 100644 --- a/cli/compilers/wasm.rs +++ b/cli/compilers/wasm.rs @@ -52,9 +52,8 @@ impl WasmCompiler { let entry_point = ModuleSpecifier::resolve_url_or_path("./__$deno$wasm_compiler.ts") .unwrap(); - let worker_state = - ThreadSafeState::new(global_state.clone(), None, entry_point) - .expect("Unable to create worker state"); + let worker_state = State::new(global_state.clone(), None, entry_point) + .expect("Unable to create worker state"); // Count how many times we start the compiler worker. global_state diff --git a/cli/import_map.rs b/cli/import_map.rs index 1a9207288..f9c50345d 100644 --- a/cli/import_map.rs +++ b/cli/import_map.rs @@ -38,7 +38,7 @@ const SUPPORTED_FETCH_SCHEMES: [&str; 3] = ["http", "https", "file"]; type SpecifierMap = IndexMap<String, Vec<ModuleSpecifier>>; type ScopesMap = IndexMap<String, SpecifierMap>; -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct ImportMap { base_url: String, imports: SpecifierMap, diff --git a/cli/lib.rs b/cli/lib.rs index e752d6f2a..945554434 100644 --- a/cli/lib.rs +++ b/cli/lib.rs @@ -61,7 +61,7 @@ use crate::deno_error::js_check; use crate::deno_error::{print_err_and_exit, print_msg_and_exit}; use crate::global_state::GlobalState; use crate::ops::io::get_stdio; -use crate::state::ThreadSafeState; +use crate::state::State; use crate::worker::MainWorker; use deno_core::v8_set_flags; use deno_core::ErrBox; @@ -107,17 +107,17 @@ fn create_main_worker( global_state: GlobalState, main_module: ModuleSpecifier, ) -> MainWorker { - let state = ThreadSafeState::new(global_state, None, main_module) + let state = State::new(global_state, None, main_module) .map_err(deno_error::print_err_and_exit) .unwrap(); let state_ = state.clone(); { - let mut resource_table = state_.lock_resource_table(); + let mut state = state_.borrow_mut(); let (stdin, stdout, stderr) = get_stdio(); - resource_table.add("stdin", Box::new(stdin)); - resource_table.add("stdout", Box::new(stdout)); - resource_table.add("stderr", Box::new(stderr)); + state.resource_table.add("stdin", Box::new(stdin)); + state.resource_table.add("stdout", Box::new(stdout)); + state.resource_table.add("stderr", Box::new(stderr)); } MainWorker::new("main".to_string(), startup_data::deno_isolate_init(), state) @@ -157,7 +157,7 @@ async fn print_file_info( worker: &MainWorker, module_specifier: ModuleSpecifier, ) { - let global_state = worker.state.global_state.clone(); + let global_state = worker.state.borrow().global_state.clone(); let maybe_source_file = global_state .file_fetcher diff --git a/cli/ops/compiler.rs b/cli/ops/compiler.rs index 2934c79ee..cce2f4980 100644 --- a/cli/ops/compiler.rs +++ b/cli/ops/compiler.rs @@ -3,11 +3,11 @@ use super::dispatch_json::{Deserialize, JsonOp, Value}; use crate::futures::future::try_join_all; use crate::msg; use crate::ops::json_op; -use crate::state::ThreadSafeState; +use crate::state::State; use deno_core::Loader; use deno_core::*; -pub fn init(i: &mut Isolate, s: &ThreadSafeState) { +pub fn init(i: &mut Isolate, s: &State) { i.register_op("cache", s.core_op(json_op(s.stateful_op(op_cache)))); i.register_op( "resolve_modules", @@ -28,7 +28,7 @@ struct CacheArgs { } fn op_cache( - state: &ThreadSafeState, + state: &State, args: Value, _zero_copy: Option<ZeroCopyBuf>, ) -> Result<JsonOp, ErrBox> { @@ -37,11 +37,15 @@ fn op_cache( let module_specifier = ModuleSpecifier::resolve_url(&args.module_id) .expect("Should be valid module specifier"); - state.global_state.ts_compiler.cache_compiler_output( - &module_specifier, - &args.extension, - &args.contents, - )?; + state + .borrow() + .global_state + .ts_compiler + .cache_compiler_output( + &module_specifier, + &args.extension, + &args.contents, + )?; Ok(JsonOp::Sync(json!({}))) } @@ -53,7 +57,7 @@ struct SpecifiersReferrerArgs { } fn op_resolve_modules( - state: &ThreadSafeState, + state: &State, args: Value, _data: Option<ZeroCopyBuf>, ) -> Result<JsonOp, ErrBox> { @@ -78,7 +82,7 @@ fn op_resolve_modules( } fn op_fetch_source_files( - state: &ThreadSafeState, + state: &State, args: Value, _data: Option<ZeroCopyBuf>, ) -> Result<JsonOp, ErrBox> { @@ -93,18 +97,17 @@ fn op_fetch_source_files( }; let mut futures = vec![]; + let global_state = state.borrow().global_state.clone(); + for specifier in &args.specifiers { let resolved_specifier = ModuleSpecifier::resolve_url(&specifier).expect("Invalid specifier"); - let fut = state - .global_state + let fut = global_state .file_fetcher .fetch_source_file_async(&resolved_specifier, ref_specifier.clone()); futures.push(fut); } - let global_state = state.global_state.clone(); - let future = Box::pin(async move { let files = try_join_all(futures).await?; diff --git a/cli/ops/errors.rs b/cli/ops/errors.rs index 14edf9cee..af0d7c93d 100644 --- a/cli/ops/errors.rs +++ b/cli/ops/errors.rs @@ -4,11 +4,11 @@ use crate::fmt_errors::JSError; use crate::ops::json_op; use crate::source_maps::get_orig_position; use crate::source_maps::CachedMaps; -use crate::state::ThreadSafeState; +use crate::state::State; use deno_core::*; use std::collections::HashMap; -pub fn init(i: &mut Isolate, s: &ThreadSafeState) { +pub fn init(i: &mut Isolate, s: &State) { i.register_op( "apply_source_map", s.core_op(json_op(s.stateful_op(op_apply_source_map))), @@ -25,12 +25,13 @@ struct FormatErrorArgs { } fn op_format_error( - state: &ThreadSafeState, + state: &State, args: Value, _zero_copy: Option<ZeroCopyBuf>, ) -> Result<JsonOp, ErrBox> { let args: FormatErrorArgs = serde_json::from_value(args)?; - let error = JSError::from_json(&args.error, &state.global_state.ts_compiler); + let error = + JSError::from_json(&args.error, &state.borrow().global_state.ts_compiler); Ok(JsonOp::Sync(json!({ "error": error.to_string(), @@ -45,7 +46,7 @@ struct ApplySourceMap { } fn op_apply_source_map( - state: &ThreadSafeState, + state: &State, args: Value, _zero_copy: Option<ZeroCopyBuf>, ) -> Result<JsonOp, ErrBox> { @@ -57,7 +58,7 @@ fn op_apply_source_map( args.line.into(), args.column.into(), &mut mappings_map, - &state.global_state.ts_compiler, + &state.borrow().global_state.ts_compiler, ); Ok(JsonOp::Sync(json!({ diff --git a/cli/ops/fetch.rs b/cli/ops/fetch.rs index 7ce3f1a40..f43133d7f 100644 --- a/cli/ops/fetch.rs +++ b/cli/ops/fetch.rs @@ -3,7 +3,7 @@ use super::dispatch_json::{Deserialize, JsonOp, Value}; use super::io::StreamResource; use crate::http_util::{create_http_client, HttpBody}; use crate::ops::json_op; -use crate::state::ThreadSafeState; +use crate::state::State; use deno_core::*; use futures::future::FutureExt; use http::header::HeaderName; @@ -12,7 +12,7 @@ use http::Method; use std; use std::convert::From; -pub fn init(i: &mut Isolate, s: &ThreadSafeState) { +pub fn init(i: &mut Isolate, s: &State) { i.register_op("fetch", s.core_op(json_op(s.stateful_op(op_fetch)))); } @@ -24,7 +24,7 @@ struct FetchArgs { } pub fn op_fetch( - state: &ThreadSafeState, + state: &State, args: Value, data: Option<ZeroCopyBuf>, ) -> Result<JsonOp, ErrBox> { @@ -65,8 +65,8 @@ pub fn op_fetch( } let body = HttpBody::from(res); - let mut table = state_.lock_resource_table(); - let rid = table.add( + let mut state = state_.borrow_mut(); + let rid = state.resource_table.add( "httpBody", Box::new(StreamResource::HttpBody(Box::new(body))), ); diff --git a/cli/ops/files.rs b/cli/ops/files.rs index f32de90b9..d625d4590 100644 --- a/cli/ops/files.rs +++ b/cli/ops/files.rs @@ -6,7 +6,7 @@ use crate::deno_error::DenoError; use crate::deno_error::ErrorKind; use crate::fs as deno_fs; use crate::ops::json_op; -use crate::state::ThreadSafeState; +use crate::state::State; use deno_core::*; use futures::future::FutureExt; use std; @@ -15,7 +15,7 @@ use std::io::SeekFrom; use std::path::Path; use tokio; -pub fn init(i: &mut Isolate, s: &ThreadSafeState) { +pub fn init(i: &mut Isolate, s: &State) { i.register_op("open", s.core_op(json_op(s.stateful_op(op_open)))); i.register_op("close", s.core_op(json_op(s.stateful_op(op_close)))); i.register_op("seek", s.core_op(json_op(s.stateful_op(op_seek)))); @@ -43,7 +43,7 @@ struct OpenOptions { } fn op_open( - state: &ThreadSafeState, + state: &State, args: Value, _zero_copy: Option<ZeroCopyBuf>, ) -> Result<JsonOp, ErrBox> { @@ -130,8 +130,10 @@ fn op_open( let fut = async move { let fs_file = open_options.open(filename).await?; - let mut table = state_.lock_resource_table(); - let rid = table.add("fsFile", Box::new(StreamResource::FsFile(fs_file))); + let mut state = state_.borrow_mut(); + let rid = state + .resource_table + .add("fsFile", Box::new(StreamResource::FsFile(fs_file))); Ok(json!(rid)) }; @@ -149,14 +151,17 @@ struct CloseArgs { } fn op_close( - state: &ThreadSafeState, + state: &State, args: Value, _zero_copy: Option<ZeroCopyBuf>, ) -> Result<JsonOp, ErrBox> { let args: CloseArgs = serde_json::from_value(args)?; - let mut table = state.lock_resource_table(); - table.close(args.rid as u32).ok_or_else(bad_resource)?; + let mut state = state.borrow_mut(); + state + .resource_table + .close(args.rid as u32) + .ok_or_else(bad_resource)?; Ok(JsonOp::Sync(json!({}))) } @@ -170,7 +175,7 @@ struct SeekArgs { } fn op_seek( - state: &ThreadSafeState, + state: &State, args: Value, _zero_copy: Option<ZeroCopyBuf>, ) -> Result<JsonOp, ErrBox> { @@ -191,13 +196,14 @@ fn op_seek( } }; - let mut table = state.lock_resource_table(); - let resource = table - .get_mut::<StreamResource>(rid) + let state = state.borrow(); + let resource = state + .resource_table + .get::<StreamResource>(rid) .ok_or_else(bad_resource)?; let tokio_file = match resource { - StreamResource::FsFile(ref mut file) => file, + StreamResource::FsFile(ref file) => file, _ => return Err(bad_resource()), }; let mut file = futures::executor::block_on(tokio_file.try_clone())?; diff --git a/cli/ops/fs.rs b/cli/ops/fs.rs index 1112db495..c9b9f0e31 100644 --- a/cli/ops/fs.rs +++ b/cli/ops/fs.rs @@ -6,7 +6,7 @@ use crate::deno_error::ErrorKind; use crate::fs as deno_fs; use crate::ops::dispatch_json::JsonResult; use crate::ops::json_op; -use crate::state::ThreadSafeState; +use crate::state::State; use deno_core::*; use remove_dir_all::remove_dir_all; use std::convert::From; @@ -19,7 +19,7 @@ use std::os::unix::fs::MetadataExt; #[cfg(unix)] use std::os::unix::fs::PermissionsExt; -pub fn init(i: &mut Isolate, s: &ThreadSafeState) { +pub fn init(i: &mut Isolate, s: &State) { i.register_op("chdir", s.core_op(json_op(s.stateful_op(op_chdir)))); i.register_op("mkdir", s.core_op(json_op(s.stateful_op(op_mkdir)))); i.register_op("chmod", s.core_op(json_op(s.stateful_op(op_chmod)))); @@ -48,7 +48,7 @@ struct ChdirArgs { } fn op_chdir( - _state: &ThreadSafeState, + _state: &State, args: Value, _zero_copy: Option<ZeroCopyBuf>, ) -> Result<JsonOp, ErrBox> { @@ -67,7 +67,7 @@ struct MkdirArgs { } fn op_mkdir( - state: &ThreadSafeState, + state: &State, args: Value, _zero_copy: Option<ZeroCopyBuf>, ) -> Result<JsonOp, ErrBox> { @@ -94,7 +94,7 @@ struct ChmodArgs { } fn op_chmod( - state: &ThreadSafeState, + state: &State, args: Value, _zero_copy: Option<ZeroCopyBuf>, ) -> Result<JsonOp, ErrBox> { @@ -128,7 +128,7 @@ struct ChownArgs { } fn op_chown( - state: &ThreadSafeState, + state: &State, args: Value, _zero_copy: Option<ZeroCopyBuf>, ) -> Result<JsonOp, ErrBox> { @@ -156,7 +156,7 @@ struct RemoveArgs { } fn op_remove( - state: &ThreadSafeState, + state: &State, args: Value, _zero_copy: Option<ZeroCopyBuf>, ) -> Result<JsonOp, ErrBox> { @@ -191,7 +191,7 @@ struct CopyFileArgs { } fn op_copy_file( - state: &ThreadSafeState, + state: &State, args: Value, _zero_copy: Option<ZeroCopyBuf>, ) -> Result<JsonOp, ErrBox> { @@ -290,7 +290,7 @@ struct StatArgs { } fn op_stat( - state: &ThreadSafeState, + state: &State, args: Value, _zero_copy: Option<ZeroCopyBuf>, ) -> Result<JsonOp, ErrBox> { @@ -320,7 +320,7 @@ struct RealpathArgs { } fn op_realpath( - state: &ThreadSafeState, + state: &State, args: Value, _zero_copy: Option<ZeroCopyBuf>, ) -> Result<JsonOp, ErrBox> { @@ -352,7 +352,7 @@ struct ReadDirArgs { } fn op_read_dir( - state: &ThreadSafeState, + state: &State, args: Value, _zero_copy: Option<ZeroCopyBuf>, ) -> Result<JsonOp, ErrBox> { @@ -389,7 +389,7 @@ struct RenameArgs { } fn op_rename( - state: &ThreadSafeState, + state: &State, args: Value, _zero_copy: Option<ZeroCopyBuf>, ) -> Result<JsonOp, ErrBox> { @@ -418,7 +418,7 @@ struct LinkArgs { } fn op_link( - state: &ThreadSafeState, + state: &State, args: Value, _zero_copy: Option<ZeroCopyBuf>, ) -> Result<JsonOp, ErrBox> { @@ -446,7 +446,7 @@ struct SymlinkArgs { } fn op_symlink( - state: &ThreadSafeState, + state: &State, args: Value, _zero_copy: Option<ZeroCopyBuf>, ) -> Result<JsonOp, ErrBox> { @@ -478,7 +478,7 @@ struct ReadLinkArgs { } fn op_read_link( - state: &ThreadSafeState, + state: &State, args: Value, _zero_copy: Option<ZeroCopyBuf>, ) -> Result<JsonOp, ErrBox> { @@ -506,7 +506,7 @@ struct TruncateArgs { } fn op_truncate( - state: &ThreadSafeState, + state: &State, args: Value, _zero_copy: Option<ZeroCopyBuf>, ) -> Result<JsonOp, ErrBox> { @@ -535,7 +535,7 @@ struct MakeTempDirArgs { } fn op_make_temp_dir( - state: &ThreadSafeState, + state: &State, args: Value, _zero_copy: Option<ZeroCopyBuf>, ) -> Result<JsonOp, ErrBox> { @@ -575,7 +575,7 @@ struct Utime { } fn op_utime( - state: &ThreadSafeState, + state: &State, args: Value, _zero_copy: Option<ZeroCopyBuf>, ) -> Result<JsonOp, ErrBox> { @@ -590,7 +590,7 @@ fn op_utime( } fn op_cwd( - _state: &ThreadSafeState, + _state: &State, _args: Value, _zero_copy: Option<ZeroCopyBuf>, ) -> Result<JsonOp, ErrBox> { diff --git a/cli/ops/io.rs b/cli/ops/io.rs index 9c074b8ba..4128060f1 100644 --- a/cli/ops/io.rs +++ b/cli/ops/io.rs @@ -3,7 +3,7 @@ use crate::deno_error; use crate::deno_error::bad_resource; use crate::http_util::HttpBody; use crate::ops::minimal_op; -use crate::state::ThreadSafeState; +use crate::state::State; use deno_core::ErrBox; use deno_core::*; use futures::future::FutureExt; @@ -47,7 +47,7 @@ lazy_static! { }; } -pub fn init(i: &mut Isolate, s: &ThreadSafeState) { +pub fn init(i: &mut Isolate, s: &State) { i.register_op( "read", s.core_op(minimal_op(s.stateful_minimal_op(op_read))), @@ -131,7 +131,7 @@ enum IoState { /// /// The returned future will resolve to both the I/O stream and the buffer /// as well as the number of bytes read once the read operation is completed. -pub fn read<T>(state: &ThreadSafeState, rid: ResourceId, buf: T) -> Read<T> +pub fn read<T>(state: &State, rid: ResourceId, buf: T) -> Read<T> where T: AsMut<[u8]>, { @@ -151,7 +151,7 @@ pub struct Read<T> { rid: ResourceId, buf: T, io_state: IoState, - state: ThreadSafeState, + state: State, } impl<T> Future for Read<T> @@ -166,8 +166,9 @@ where panic!("poll a Read after it's done"); } - let mut table = inner.state.lock_resource_table(); - let resource = table + let mut state = inner.state.borrow_mut(); + let resource = state + .resource_table .get_mut::<StreamResource>(inner.rid) .ok_or_else(bad_resource)?; let nread = ready!(resource.poll_read(cx, &mut inner.buf.as_mut()[..]))?; @@ -177,7 +178,7 @@ where } pub fn op_read( - state: &ThreadSafeState, + state: &State, rid: i32, zero_copy: Option<ZeroCopyBuf>, ) -> Pin<Box<MinimalOp>> { @@ -257,7 +258,7 @@ pub struct Write<T> { rid: ResourceId, buf: T, io_state: IoState, - state: ThreadSafeState, + state: State, nwritten: i32, } @@ -266,7 +267,7 @@ pub struct Write<T> { /// /// Any error which happens during writing will cause both the stream and the /// buffer to get destroyed. -pub fn write<T>(state: &ThreadSafeState, rid: ResourceId, buf: T) -> Write<T> +pub fn write<T>(state: &State, rid: ResourceId, buf: T) -> Write<T> where T: AsRef<[u8]>, { @@ -294,8 +295,9 @@ where } if inner.io_state == IoState::Pending { - let mut table = inner.state.lock_resource_table(); - let resource = table + let mut state = inner.state.borrow_mut(); + let resource = state + .resource_table .get_mut::<StreamResource>(inner.rid) .ok_or_else(bad_resource)?; @@ -309,8 +311,9 @@ where // Figure out why it's needed and preferably remove it. // https://github.com/denoland/deno/issues/3565 if inner.io_state == IoState::Flush { - let mut table = inner.state.lock_resource_table(); - let resource = table + let mut state = inner.state.borrow_mut(); + let resource = state + .resource_table .get_mut::<StreamResource>(inner.rid) .ok_or_else(bad_resource)?; ready!(resource.poll_flush(cx))?; @@ -322,7 +325,7 @@ where } pub fn op_write( - state: &ThreadSafeState, + state: &State, rid: i32, zero_copy: Option<ZeroCopyBuf>, ) -> Pin<Box<MinimalOp>> { diff --git a/cli/ops/net.rs b/cli/ops/net.rs index 5b35d9287..569aebca0 100644 --- a/cli/ops/net.rs +++ b/cli/ops/net.rs @@ -4,7 +4,7 @@ use super::io::StreamResource; use crate::deno_error::bad_resource; use crate::ops::json_op; use crate::resolve_addr::resolve_addr; -use crate::state::ThreadSafeState; +use crate::state::State; use deno_core::*; use futures::future::FutureExt; use std; @@ -19,7 +19,7 @@ use tokio; use tokio::net::TcpListener; use tokio::net::TcpStream; -pub fn init(i: &mut Isolate, s: &ThreadSafeState) { +pub fn init(i: &mut Isolate, s: &State) { i.register_op("accept", s.core_op(json_op(s.stateful_op(op_accept)))); i.register_op("connect", s.core_op(json_op(s.stateful_op(op_connect)))); i.register_op("shutdown", s.core_op(json_op(s.stateful_op(op_shutdown)))); @@ -33,7 +33,7 @@ enum AcceptState { } /// Simply accepts a connection. -pub fn accept(state: &ThreadSafeState, rid: ResourceId) -> Accept { +pub fn accept(state: &State, rid: ResourceId) -> Accept { Accept { accept_state: AcceptState::Pending, rid, @@ -45,7 +45,7 @@ pub fn accept(state: &ThreadSafeState, rid: ResourceId) -> Accept { pub struct Accept<'a> { accept_state: AcceptState, rid: ResourceId, - state: &'a ThreadSafeState, + state: &'a State, } impl Future for Accept<'_> { @@ -57,8 +57,9 @@ impl Future for Accept<'_> { panic!("poll Accept after it's done"); } - let mut table = inner.state.lock_resource_table(); - let listener_resource = table + let mut state = inner.state.borrow_mut(); + let listener_resource = state + .resource_table .get_mut::<TcpListenerResource>(inner.rid) .ok_or_else(|| { let e = std::io::Error::new( @@ -95,25 +96,29 @@ struct AcceptArgs { } fn op_accept( - state: &ThreadSafeState, + state: &State, args: Value, _zero_copy: Option<ZeroCopyBuf>, ) -> Result<JsonOp, ErrBox> { let args: AcceptArgs = serde_json::from_value(args)?; let rid = args.rid as u32; let state_ = state.clone(); - let table = state.lock_resource_table(); - table - .get::<TcpListenerResource>(rid) - .ok_or_else(bad_resource)?; + { + let state = state.borrow(); + state + .resource_table + .get::<TcpListenerResource>(rid) + .ok_or_else(bad_resource)?; + } let op = async move { let (tcp_stream, _socket_addr) = accept(&state_, rid).await?; let local_addr = tcp_stream.local_addr()?; let remote_addr = tcp_stream.peer_addr()?; - let mut table = state_.lock_resource_table(); - let rid = - table.add("tcpStream", Box::new(StreamResource::TcpStream(tcp_stream))); + let mut state = state_.borrow_mut(); + let rid = state + .resource_table + .add("tcpStream", Box::new(StreamResource::TcpStream(tcp_stream))); Ok(json!({ "rid": rid, "localAddr": { @@ -140,7 +145,7 @@ struct ConnectArgs { } fn op_connect( - state: &ThreadSafeState, + state: &State, args: Value, _zero_copy: Option<ZeroCopyBuf>, ) -> Result<JsonOp, ErrBox> { @@ -154,9 +159,10 @@ fn op_connect( let tcp_stream = TcpStream::connect(&addr).await?; let local_addr = tcp_stream.local_addr()?; let remote_addr = tcp_stream.peer_addr()?; - let mut table = state_.lock_resource_table(); - let rid = - table.add("tcpStream", Box::new(StreamResource::TcpStream(tcp_stream))); + let mut state = state_.borrow_mut(); + let rid = state + .resource_table + .add("tcpStream", Box::new(StreamResource::TcpStream(tcp_stream))); Ok(json!({ "rid": rid, "localAddr": { @@ -182,7 +188,7 @@ struct ShutdownArgs { } fn op_shutdown( - state: &ThreadSafeState, + state: &State, args: Value, _zero_copy: Option<ZeroCopyBuf>, ) -> Result<JsonOp, ErrBox> { @@ -197,8 +203,9 @@ fn op_shutdown( _ => unimplemented!(), }; - let mut table = state.lock_resource_table(); - let resource = table + let mut state = state.borrow_mut(); + let resource = state + .resource_table .get_mut::<StreamResource>(rid) .ok_or_else(bad_resource)?; match resource { @@ -272,7 +279,7 @@ impl TcpListenerResource { } fn op_listen( - state: &ThreadSafeState, + state: &State, args: Value, _zero_copy: Option<ZeroCopyBuf>, ) -> Result<JsonOp, ErrBox> { @@ -290,8 +297,10 @@ fn op_listen( waker: None, local_addr, }; - let mut table = state.lock_resource_table(); - let rid = table.add("tcpListener", Box::new(listener_resource)); + let mut state = state.borrow_mut(); + let rid = state + .resource_table + .add("tcpListener", Box::new(listener_resource)); debug!( "New listener {} {}:{}", rid, diff --git a/cli/ops/os.rs b/cli/ops/os.rs index ce2320d92..8def5ac1e 100644 --- a/cli/ops/os.rs +++ b/cli/ops/os.rs @@ -1,7 +1,7 @@ // Copyright 2018-2020 the Deno authors. All rights reserved. MIT license. use super::dispatch_json::{Deserialize, JsonOp, Value}; use crate::ops::json_op; -use crate::state::ThreadSafeState; +use crate::state::State; use atty; use deno_core::*; use std::collections::HashMap; @@ -10,7 +10,7 @@ use std::io::{Error, ErrorKind}; use sys_info; use url::Url; -pub fn init(i: &mut Isolate, s: &ThreadSafeState) { +pub fn init(i: &mut Isolate, s: &State) { i.register_op("exit", s.core_op(json_op(s.stateful_op(op_exit)))); i.register_op("is_tty", s.core_op(json_op(s.stateful_op(op_is_tty)))); i.register_op("env", s.core_op(json_op(s.stateful_op(op_env)))); @@ -27,7 +27,7 @@ struct GetDirArgs { } fn op_get_dir( - state: &ThreadSafeState, + state: &State, args: Value, _zero_copy: Option<ZeroCopyBuf>, ) -> Result<JsonOp, ErrBox> { @@ -73,7 +73,7 @@ fn op_get_dir( } fn op_exec_path( - state: &ThreadSafeState, + state: &State, _args: Value, _zero_copy: Option<ZeroCopyBuf>, ) -> Result<JsonOp, ErrBox> { @@ -93,7 +93,7 @@ struct SetEnv { } fn op_set_env( - state: &ThreadSafeState, + state: &State, args: Value, _zero_copy: Option<ZeroCopyBuf>, ) -> Result<JsonOp, ErrBox> { @@ -104,7 +104,7 @@ fn op_set_env( } fn op_env( - state: &ThreadSafeState, + state: &State, _args: Value, _zero_copy: Option<ZeroCopyBuf>, ) -> Result<JsonOp, ErrBox> { @@ -119,7 +119,7 @@ struct GetEnv { } fn op_get_env( - state: &ThreadSafeState, + state: &State, args: Value, _zero_copy: Option<ZeroCopyBuf>, ) -> Result<JsonOp, ErrBox> { @@ -138,7 +138,7 @@ struct Exit { } fn op_exit( - _s: &ThreadSafeState, + _s: &State, args: Value, _zero_copy: Option<ZeroCopyBuf>, ) -> Result<JsonOp, ErrBox> { @@ -147,7 +147,7 @@ fn op_exit( } fn op_is_tty( - _s: &ThreadSafeState, + _s: &State, _args: Value, _zero_copy: Option<ZeroCopyBuf>, ) -> Result<JsonOp, ErrBox> { @@ -159,7 +159,7 @@ fn op_is_tty( } fn op_hostname( - state: &ThreadSafeState, + state: &State, _args: Value, _zero_copy: Option<ZeroCopyBuf>, ) -> Result<JsonOp, ErrBox> { diff --git a/cli/ops/permissions.rs b/cli/ops/permissions.rs index 39d49c32f..7c7cb682c 100644 --- a/cli/ops/permissions.rs +++ b/cli/ops/permissions.rs @@ -3,11 +3,11 @@ use super::dispatch_json::{Deserialize, JsonOp, Value}; use crate::deno_error::other_error; use crate::fs as deno_fs; use crate::ops::json_op; -use crate::state::ThreadSafeState; +use crate::state::State; use deno_core::*; use std::path::Path; -pub fn init(i: &mut Isolate, s: &ThreadSafeState) { +pub fn init(i: &mut Isolate, s: &State) { i.register_op( "query_permission", s.core_op(json_op(s.stateful_op(op_query_permission))), @@ -38,14 +38,14 @@ fn resolve_path(path: &str) -> String { } pub fn op_query_permission( - state: &ThreadSafeState, + state: &State, args: Value, _zero_copy: Option<ZeroCopyBuf>, ) -> Result<JsonOp, ErrBox> { let args: PermissionArgs = serde_json::from_value(args)?; - let permissions = state.permissions.lock().unwrap(); + let state = state.borrow(); let resolved_path = args.path.as_ref().map(String::as_str).map(resolve_path); - let perm = permissions.get_permission_state( + let perm = state.permissions.get_permission_state( &args.name, &args.url.as_ref().map(String::as_str), &resolved_path.as_ref().map(String::as_str).map(Path::new), @@ -54,12 +54,13 @@ pub fn op_query_permission( } pub fn op_revoke_permission( - state: &ThreadSafeState, + state: &State, args: Value, _zero_copy: Option<ZeroCopyBuf>, ) -> Result<JsonOp, ErrBox> { let args: PermissionArgs = serde_json::from_value(args)?; - let mut permissions = state.permissions.lock().unwrap(); + let mut state = state.borrow_mut(); + let permissions = &mut state.permissions; match args.name.as_ref() { "run" => permissions.allow_run.revoke(), "read" => permissions.allow_read.revoke(), @@ -80,12 +81,13 @@ pub fn op_revoke_permission( } pub fn op_request_permission( - state: &ThreadSafeState, + state: &State, args: Value, _zero_copy: Option<ZeroCopyBuf>, ) -> Result<JsonOp, ErrBox> { let args: PermissionArgs = serde_json::from_value(args)?; - let mut permissions = state.permissions.lock().unwrap(); + let mut state = state.borrow_mut(); + let permissions = &mut state.permissions; let resolved_path = args.path.as_ref().map(String::as_str).map(resolve_path); let perm = match args.name.as_ref() { "run" => Ok(permissions.request_run()), diff --git a/cli/ops/plugins.rs b/cli/ops/plugins.rs index 24a59917e..ecaa66b53 100644 --- a/cli/ops/plugins.rs +++ b/cli/ops/plugins.rs @@ -1,7 +1,7 @@ use super::dispatch_json::{Deserialize, JsonOp, Value}; use crate::fs as deno_fs; use crate::ops::json_op; -use crate::state::ThreadSafeState; +use crate::state::State; use deno_core::*; use dlopen::symbor::Library; use std::collections::HashMap; @@ -9,11 +9,7 @@ use std::ffi::OsStr; use std::path::Path; use std::sync::Arc; -pub fn init( - i: &mut Isolate, - s: &ThreadSafeState, - r: Arc<deno_core::OpRegistry>, -) { +pub fn init(i: &mut Isolate, s: &State, r: Arc<deno_core::OpRegistry>) { let r_ = r; i.register_op( "open_plugin", @@ -56,7 +52,7 @@ struct OpenPluginArgs { pub fn op_open_plugin( registry: &Arc<deno_core::OpRegistry>, - state: &ThreadSafeState, + state: &State, args: Value, _zero_copy: Option<ZeroCopyBuf>, ) -> Result<JsonOp, ErrBox> { @@ -70,9 +66,14 @@ pub fn op_open_plugin( lib, ops: HashMap::new(), }; - let mut table = state.lock_resource_table(); - let rid = table.add("plugin", Box::new(plugin_resource)); - let plugin_resource = table.get_mut::<PluginResource>(rid).unwrap(); + let mut state_ = state.borrow_mut(); + let rid = state_ + .resource_table + .add("plugin", Box::new(plugin_resource)); + let plugin_resource = state_ + .resource_table + .get_mut::<PluginResource>(rid) + .unwrap(); let init_fn = *unsafe { plugin_resource diff --git a/cli/ops/process.rs b/cli/ops/process.rs index 1ffd6b78e..e93bcbc0f 100644 --- a/cli/ops/process.rs +++ b/cli/ops/process.rs @@ -4,12 +4,11 @@ use super::io::StreamResource; use crate::deno_error::bad_resource; use crate::ops::json_op; use crate::signal::kill; -use crate::state::ThreadSafeState; +use crate::state::State; use deno_core::*; use futures; use futures::future::FutureExt; use futures::future::TryFutureExt; -use futures::task::SpawnExt; use std; use std::convert::From; use std::future::Future; @@ -22,7 +21,7 @@ use tokio::process::Command; #[cfg(unix)] use std::os::unix::process::ExitStatusExt; -pub fn init(i: &mut Isolate, s: &ThreadSafeState) { +pub fn init(i: &mut Isolate, s: &State) { i.register_op("run", s.core_op(json_op(s.stateful_op(op_run)))); i.register_op( "run_status", @@ -31,12 +30,10 @@ pub fn init(i: &mut Isolate, s: &ThreadSafeState) { i.register_op("kill", s.core_op(json_op(s.stateful_op(op_kill)))); } -fn clone_file( - rid: u32, - state: &ThreadSafeState, -) -> Result<std::fs::File, ErrBox> { - let mut table = state.lock_resource_table(); - let repr = table +fn clone_file(rid: u32, state: &State) -> Result<std::fs::File, ErrBox> { + let mut state = state.borrow_mut(); + let repr = state + .resource_table .get_mut::<StreamResource>(rid) .ok_or_else(bad_resource)?; let file = match repr { @@ -76,7 +73,7 @@ struct ChildResource { } fn op_run( - state: &ThreadSafeState, + state: &State, args: Value, _zero_copy: Option<ZeroCopyBuf>, ) -> Result<JsonOp, ErrBox> { @@ -131,7 +128,8 @@ fn op_run( let mut child = c.spawn()?; let pid = child.id(); - let mut table = state_.lock_resource_table(); + let mut state = state_.borrow_mut(); + let table = &mut state.resource_table; let stdin_rid = match child.stdin.take() { Some(child_stdin) => { @@ -180,7 +178,7 @@ fn op_run( pub struct ChildStatus { rid: ResourceId, - state: ThreadSafeState, + state: State, } impl Future for ChildStatus { @@ -188,8 +186,9 @@ impl Future for ChildStatus { fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> { let inner = self.get_mut(); - let mut table = inner.state.lock_resource_table(); - let child_resource = table + let mut state = inner.state.borrow_mut(); + let child_resource = state + .resource_table .get_mut::<ChildResource>(inner.rid) .ok_or_else(bad_resource)?; let child = &mut child_resource.child; @@ -204,7 +203,7 @@ struct RunStatusArgs { } fn op_run_status( - state: &ThreadSafeState, + state: &State, args: Value, _zero_copy: Option<ZeroCopyBuf>, ) -> Result<JsonOp, ErrBox> { @@ -239,10 +238,7 @@ fn op_run_status( })) }; - let pool = futures::executor::ThreadPool::new().unwrap(); - let handle = pool.spawn_with_handle(future).unwrap(); - - Ok(JsonOp::Async(handle.boxed())) + Ok(JsonOp::Async(future.boxed_local())) } #[derive(Deserialize)] @@ -252,7 +248,7 @@ struct KillArgs { } fn op_kill( - state: &ThreadSafeState, + state: &State, args: Value, _zero_copy: Option<ZeroCopyBuf>, ) -> Result<JsonOp, ErrBox> { diff --git a/cli/ops/random.rs b/cli/ops/random.rs index 950e9c7f0..3c570090b 100644 --- a/cli/ops/random.rs +++ b/cli/ops/random.rs @@ -1,12 +1,12 @@ // Copyright 2018-2020 the Deno authors. All rights reserved. MIT license. use super::dispatch_json::{JsonOp, Value}; use crate::ops::json_op; -use crate::state::ThreadSafeState; +use crate::state::State; use deno_core::*; use rand::thread_rng; use rand::Rng; -pub fn init(i: &mut Isolate, s: &ThreadSafeState) { +pub fn init(i: &mut Isolate, s: &State) { i.register_op( "get_random_values", s.core_op(json_op(s.stateful_op(op_get_random_values))), @@ -14,15 +14,14 @@ pub fn init(i: &mut Isolate, s: &ThreadSafeState) { } fn op_get_random_values( - state: &ThreadSafeState, + state: &State, _args: Value, zero_copy: Option<ZeroCopyBuf>, ) -> Result<JsonOp, ErrBox> { assert!(zero_copy.is_some()); - if let Some(ref seeded_rng) = state.seeded_rng { - let mut rng = seeded_rng.lock().unwrap(); - rng.fill(&mut zero_copy.unwrap()[..]); + if let Some(ref mut seeded_rng) = state.borrow_mut().seeded_rng { + seeded_rng.fill(&mut zero_copy.unwrap()[..]); } else { let mut rng = thread_rng(); rng.fill(&mut zero_copy.unwrap()[..]); diff --git a/cli/ops/repl.rs b/cli/ops/repl.rs index 9b47f262a..d7c94a56e 100644 --- a/cli/ops/repl.rs +++ b/cli/ops/repl.rs @@ -4,12 +4,12 @@ use crate::deno_error::bad_resource; use crate::ops::json_op; use crate::repl; use crate::repl::Repl; -use crate::state::ThreadSafeState; +use crate::state::State; use deno_core::*; use std::sync::Arc; use std::sync::Mutex; -pub fn init(i: &mut Isolate, s: &ThreadSafeState) { +pub fn init(i: &mut Isolate, s: &State) { i.register_op( "repl_start", s.core_op(json_op(s.stateful_op(op_repl_start))), @@ -29,7 +29,7 @@ struct ReplStartArgs { } fn op_repl_start( - state: &ThreadSafeState, + state: &State, args: Value, _zero_copy: Option<ZeroCopyBuf>, ) -> Result<JsonOp, ErrBox> { @@ -37,11 +37,11 @@ fn op_repl_start( debug!("op_repl_start {}", args.history_file); let history_path = - repl::history_path(&state.global_state.dir, &args.history_file); + repl::history_path(&state.borrow().global_state.dir, &args.history_file); let repl = repl::Repl::new(history_path); + let mut state = state.borrow_mut(); let resource = ReplResource(Arc::new(Mutex::new(repl))); - let mut table = state.lock_resource_table(); - let rid = table.add("repl", Box::new(resource)); + let rid = state.resource_table.add("repl", Box::new(resource)); Ok(JsonOp::Sync(json!(rid))) } @@ -52,7 +52,7 @@ struct ReplReadlineArgs { } fn op_repl_readline( - state: &ThreadSafeState, + state: &State, args: Value, _zero_copy: Option<ZeroCopyBuf>, ) -> Result<JsonOp, ErrBox> { @@ -60,12 +60,14 @@ fn op_repl_readline( let rid = args.rid as u32; let prompt = args.prompt; debug!("op_repl_readline {} {}", rid, prompt); - let state = state.clone(); + let state = state.borrow(); + let resource = state + .resource_table + .get::<ReplResource>(rid) + .ok_or_else(bad_resource)?; + let repl = resource.0.clone(); blocking_json(false, move || { - let table = state.lock_resource_table(); - let resource = table.get::<ReplResource>(rid).ok_or_else(bad_resource)?; - let repl = resource.0.clone(); let line = repl.lock().unwrap().readline(&prompt)?; Ok(json!(line)) }) diff --git a/cli/ops/resources.rs b/cli/ops/resources.rs index 447082f80..1fef4a530 100644 --- a/cli/ops/resources.rs +++ b/cli/ops/resources.rs @@ -1,19 +1,19 @@ // Copyright 2018-2020 the Deno authors. All rights reserved. MIT license. use super::dispatch_json::{JsonOp, Value}; use crate::ops::json_op; -use crate::state::ThreadSafeState; +use crate::state::State; use deno_core::*; -pub fn init(i: &mut Isolate, s: &ThreadSafeState) { +pub fn init(i: &mut Isolate, s: &State) { i.register_op("resources", s.core_op(json_op(s.stateful_op(op_resources)))); } fn op_resources( - state: &ThreadSafeState, + state: &State, _args: Value, _zero_copy: Option<ZeroCopyBuf>, ) -> Result<JsonOp, ErrBox> { - let resource_table = state.lock_resource_table(); - let serialized_resources = resource_table.entries(); + let state = state.borrow(); + let serialized_resources = state.resource_table.entries(); Ok(JsonOp::Sync(json!(serialized_resources))) } diff --git a/cli/ops/runtime.rs b/cli/ops/runtime.rs index 210bbfcf6..a962f4e83 100644 --- a/cli/ops/runtime.rs +++ b/cli/ops/runtime.rs @@ -3,7 +3,7 @@ use super::dispatch_json::{JsonOp, Value}; use crate::colors; use crate::fs as deno_fs; use crate::ops::json_op; -use crate::state::ThreadSafeState; +use crate::state::State; use crate::version; use crate::DenoSubcommand; use deno_core::*; @@ -19,15 +19,16 @@ static BUILD_OS: &str = "win"; #[cfg(target_arch = "x86_64")] static BUILD_ARCH: &str = "x64"; -pub fn init(i: &mut Isolate, s: &ThreadSafeState) { +pub fn init(i: &mut Isolate, s: &State) { i.register_op("start", s.core_op(json_op(s.stateful_op(op_start)))); } fn op_start( - state: &ThreadSafeState, + state: &State, _args: Value, _zero_copy: Option<ZeroCopyBuf>, ) -> Result<JsonOp, ErrBox> { + let state = state.borrow(); let gs = &state.global_state; Ok(JsonOp::Sync(json!({ diff --git a/cli/ops/runtime_compiler.rs b/cli/ops/runtime_compiler.rs index 0cdc1094e..9cfda013b 100644 --- a/cli/ops/runtime_compiler.rs +++ b/cli/ops/runtime_compiler.rs @@ -3,11 +3,11 @@ use super::dispatch_json::{Deserialize, JsonOp, Value}; use crate::compilers::runtime_compile_async; use crate::compilers::runtime_transpile_async; use crate::ops::json_op; -use crate::state::ThreadSafeState; +use crate::state::State; use deno_core::*; use std::collections::HashMap; -pub fn init(i: &mut Isolate, s: &ThreadSafeState) { +pub fn init(i: &mut Isolate, s: &State) { i.register_op("compile", s.core_op(json_op(s.stateful_op(op_compile)))); i.register_op("transpile", s.core_op(json_op(s.stateful_op(op_transpile)))); } @@ -22,13 +22,13 @@ struct CompileArgs { } fn op_compile( - state: &ThreadSafeState, + state: &State, args: Value, _zero_copy: Option<ZeroCopyBuf>, ) -> Result<JsonOp, ErrBox> { let args: CompileArgs = serde_json::from_value(args)?; Ok(JsonOp::Async(runtime_compile_async( - state.global_state.clone(), + state.borrow().global_state.clone(), &args.root_name, &args.sources, args.bundle, @@ -43,13 +43,13 @@ struct TranspileArgs { } fn op_transpile( - state: &ThreadSafeState, + state: &State, args: Value, _zero_copy: Option<ZeroCopyBuf>, ) -> Result<JsonOp, ErrBox> { let args: TranspileArgs = serde_json::from_value(args)?; Ok(JsonOp::Async(runtime_transpile_async( - state.global_state.clone(), + state.borrow().global_state.clone(), &args.sources, &args.options, ))) diff --git a/cli/ops/signal.rs b/cli/ops/signal.rs index f815cb57c..8d70f9fe8 100644 --- a/cli/ops/signal.rs +++ b/cli/ops/signal.rs @@ -1,7 +1,7 @@ // Copyright 2018-2020 the Deno authors. All rights reserved. MIT license. use super::dispatch_json::{JsonOp, Value}; use crate::ops::json_op; -use crate::state::ThreadSafeState; +use crate::state::State; use deno_core::*; #[cfg(unix)] @@ -17,7 +17,7 @@ use std::task::Waker; #[cfg(unix)] use tokio::signal::unix::{signal, Signal, SignalKind}; -pub fn init(i: &mut Isolate, s: &ThreadSafeState) { +pub fn init(i: &mut Isolate, s: &State) { i.register_op( "signal_bind", s.core_op(json_op(s.stateful_op(op_signal_bind))), @@ -51,13 +51,13 @@ struct SignalArgs { #[cfg(unix)] fn op_signal_bind( - state: &ThreadSafeState, + state: &State, args: Value, _zero_copy: Option<ZeroCopyBuf>, ) -> Result<JsonOp, ErrBox> { let args: BindSignalArgs = serde_json::from_value(args)?; - let mut table = state.lock_resource_table(); - let rid = table.add( + let mut state = state.borrow_mut(); + let rid = state.resource_table.add( "signal", Box::new(SignalStreamResource( signal(SignalKind::from_raw(args.signo)).expect(""), @@ -71,7 +71,7 @@ fn op_signal_bind( #[cfg(unix)] fn op_signal_poll( - state: &ThreadSafeState, + state: &State, args: Value, _zero_copy: Option<ZeroCopyBuf>, ) -> Result<JsonOp, ErrBox> { @@ -80,8 +80,10 @@ fn op_signal_poll( let state_ = state.clone(); let future = poll_fn(move |cx| { - let mut table = state_.lock_resource_table(); - if let Some(mut signal) = table.get_mut::<SignalStreamResource>(rid) { + let mut state = state_.borrow_mut(); + if let Some(mut signal) = + state.resource_table.get_mut::<SignalStreamResource>(rid) + { signal.1 = Some(cx.waker().clone()); return signal.0.poll_recv(cx); } @@ -94,14 +96,14 @@ fn op_signal_poll( #[cfg(unix)] pub fn op_signal_unbind( - state: &ThreadSafeState, + state: &State, args: Value, _zero_copy: Option<ZeroCopyBuf>, ) -> Result<JsonOp, ErrBox> { let args: SignalArgs = serde_json::from_value(args)?; let rid = args.rid as u32; - let mut table = state.lock_resource_table(); - let resource = table.get::<SignalStreamResource>(rid); + let mut state = state.borrow_mut(); + let resource = state.resource_table.get::<SignalStreamResource>(rid); if let Some(signal) = resource { if let Some(waker) = &signal.1 { // Wakes up the pending poll if exists. @@ -109,13 +111,13 @@ pub fn op_signal_unbind( waker.clone().wake(); } } - table.close(rid).ok_or_else(bad_resource)?; + state.resource_table.close(rid).ok_or_else(bad_resource)?; Ok(JsonOp::Sync(json!({}))) } #[cfg(not(unix))] pub fn op_signal_bind( - _state: &ThreadSafeState, + _state: &State, _args: Value, _zero_copy: Option<ZeroCopyBuf>, ) -> Result<JsonOp, ErrBox> { @@ -124,7 +126,7 @@ pub fn op_signal_bind( #[cfg(not(unix))] fn op_signal_unbind( - _state: &ThreadSafeState, + _state: &State, _args: Value, _zero_copy: Option<ZeroCopyBuf>, ) -> Result<JsonOp, ErrBox> { @@ -133,7 +135,7 @@ fn op_signal_unbind( #[cfg(not(unix))] fn op_signal_poll( - _state: &ThreadSafeState, + _state: &State, _args: Value, _zero_copy: Option<ZeroCopyBuf>, ) -> Result<JsonOp, ErrBox> { diff --git a/cli/ops/timers.rs b/cli/ops/timers.rs index 75b53518c..ebcbcd706 100644 --- a/cli/ops/timers.rs +++ b/cli/ops/timers.rs @@ -1,14 +1,14 @@ // Copyright 2018-2020 the Deno authors. All rights reserved. MIT license. use super::dispatch_json::{Deserialize, JsonOp, Value}; use crate::ops::json_op; -use crate::state::ThreadSafeState; +use crate::state::State; use deno_core::*; use futures::future::FutureExt; use std; use std::time::Duration; use std::time::Instant; -pub fn init(i: &mut Isolate, s: &ThreadSafeState) { +pub fn init(i: &mut Isolate, s: &State) { i.register_op( "global_timer_stop", s.core_op(json_op(s.stateful_op(op_global_timer_stop))), @@ -21,13 +21,12 @@ pub fn init(i: &mut Isolate, s: &ThreadSafeState) { } fn op_global_timer_stop( - state: &ThreadSafeState, + state: &State, _args: Value, _zero_copy: Option<ZeroCopyBuf>, ) -> Result<JsonOp, ErrBox> { - let state = state; - let mut t = state.global_timer.lock().unwrap(); - t.cancel(); + let mut state = state.borrow_mut(); + state.global_timer.cancel(); Ok(JsonOp::Sync(json!({}))) } @@ -37,17 +36,17 @@ struct GlobalTimerArgs { } fn op_global_timer( - state: &ThreadSafeState, + state: &State, args: Value, _zero_copy: Option<ZeroCopyBuf>, ) -> Result<JsonOp, ErrBox> { let args: GlobalTimerArgs = serde_json::from_value(args)?; let val = args.timeout; - let state = state; - let mut t = state.global_timer.lock().unwrap(); + let mut state = state.borrow_mut(); let deadline = Instant::now() + Duration::from_millis(val); - let f = t + let f = state + .global_timer .new_timeout(deadline) .then(move |_| futures::future::ok(json!({}))); @@ -59,19 +58,19 @@ fn op_global_timer( // If the High precision flag is not set, the // nanoseconds are rounded on 2ms. fn op_now( - state: &ThreadSafeState, + state: &State, _args: Value, _zero_copy: Option<ZeroCopyBuf>, ) -> Result<JsonOp, ErrBox> { + let state = state.borrow(); let seconds = state.start_time.elapsed().as_secs(); let mut subsec_nanos = state.start_time.elapsed().subsec_nanos(); let reduced_time_precision = 2_000_000; // 2ms in nanoseconds - let permissions = state.permissions.lock().unwrap(); // If the permission is not enabled // Round the nano result on 2 milliseconds // see: https://developer.mozilla.org/en-US/docs/Web/API/DOMHighResTimeStamp#Reduced_time_precision - if !permissions.allow_hrtime.is_allow() { + if !state.permissions.allow_hrtime.is_allow() { subsec_nanos -= subsec_nanos % reduced_time_precision } diff --git a/cli/ops/tls.rs b/cli/ops/tls.rs index 45b2f2d38..6dbe99c85 100644 --- a/cli/ops/tls.rs +++ b/cli/ops/tls.rs @@ -6,7 +6,7 @@ use crate::deno_error::DenoError; use crate::deno_error::ErrorKind; use crate::ops::json_op; use crate::resolve_addr::resolve_addr; -use crate::state::ThreadSafeState; +use crate::state::State; use deno_core::*; use futures::future::FutureExt; use std; @@ -35,7 +35,7 @@ use webpki; use webpki::DNSNameRef; use webpki_roots; -pub fn init(i: &mut Isolate, s: &ThreadSafeState) { +pub fn init(i: &mut Isolate, s: &State) { i.register_op( "connect_tls", s.core_op(json_op(s.stateful_op(op_connect_tls))), @@ -60,7 +60,7 @@ struct ConnectTLSArgs { } pub fn op_connect_tls( - state: &ThreadSafeState, + state: &State, args: Value, _zero_copy: Option<ZeroCopyBuf>, ) -> Result<JsonOp, ErrBox> { @@ -95,8 +95,8 @@ pub fn op_connect_tls( let dnsname = DNSNameRef::try_from_ascii_str(&domain).expect("Invalid DNS lookup"); let tls_stream = tls_connector.connect(dnsname, tcp_stream).await?; - let mut table = state_.lock_resource_table(); - let rid = table.add( + let mut state = state_.borrow_mut(); + let rid = state.resource_table.add( "clientTlsStream", Box::new(StreamResource::ClientTlsStream(Box::new(tls_stream))), ); @@ -241,7 +241,7 @@ struct ListenTlsArgs { } fn op_listen_tls( - state: &ThreadSafeState, + state: &State, args: Value, _zero_copy: Option<ZeroCopyBuf>, ) -> Result<JsonOp, ErrBox> { @@ -270,8 +270,10 @@ fn op_listen_tls( waker: None, local_addr, }; - let mut table = state.lock_resource_table(); - let rid = table.add("tlsListener", Box::new(tls_listener_resource)); + let mut state = state.borrow_mut(); + let rid = state + .resource_table + .add("tlsListener", Box::new(tls_listener_resource)); Ok(JsonOp::Sync(json!({ "rid": rid, @@ -290,7 +292,7 @@ enum AcceptTlsState { } /// Simply accepts a TLS connection. -pub fn accept_tls(state: &ThreadSafeState, rid: ResourceId) -> AcceptTls { +pub fn accept_tls(state: &State, rid: ResourceId) -> AcceptTls { AcceptTls { accept_state: AcceptTlsState::Pending, rid, @@ -302,7 +304,7 @@ pub fn accept_tls(state: &ThreadSafeState, rid: ResourceId) -> AcceptTls { pub struct AcceptTls { accept_state: AcceptTlsState, rid: ResourceId, - state: ThreadSafeState, + state: State, } impl Future for AcceptTls { @@ -314,8 +316,9 @@ impl Future for AcceptTls { panic!("poll AcceptTls after it's done"); } - let mut table = inner.state.lock_resource_table(); - let listener_resource = table + let mut state = inner.state.borrow_mut(); + let listener_resource = state + .resource_table .get_mut::<TlsListenerResource>(inner.rid) .ok_or_else(|| { let e = std::io::Error::new( @@ -352,7 +355,7 @@ struct AcceptTlsArgs { } fn op_accept_tls( - state: &ThreadSafeState, + state: &State, args: Value, _zero_copy: Option<ZeroCopyBuf>, ) -> Result<JsonOp, ErrBox> { @@ -364,8 +367,9 @@ fn op_accept_tls( let local_addr = tcp_stream.local_addr()?; let remote_addr = tcp_stream.peer_addr()?; let tls_acceptor = { - let table = state.lock_resource_table(); - let resource = table + let state = state.borrow(); + let resource = state + .resource_table .get::<TlsListenerResource>(rid) .ok_or_else(bad_resource) .expect("Can't find tls listener"); @@ -373,8 +377,8 @@ fn op_accept_tls( }; let tls_stream = tls_acceptor.accept(tcp_stream).await?; let rid = { - let mut table = state.lock_resource_table(); - table.add( + let mut state = state.borrow_mut(); + state.resource_table.add( "serverTlsStream", Box::new(StreamResource::ServerTlsStream(Box::new(tls_stream))), ) diff --git a/cli/ops/web_worker.rs b/cli/ops/web_worker.rs index 7bfc70a84..ae6b10abc 100644 --- a/cli/ops/web_worker.rs +++ b/cli/ops/web_worker.rs @@ -3,14 +3,14 @@ use super::dispatch_json::{JsonOp, Value}; use crate::deno_error::DenoError; use crate::deno_error::ErrorKind; use crate::ops::json_op; -use crate::state::ThreadSafeState; +use crate::state::State; use deno_core::*; use futures; use futures::future::FutureExt; use std; use std::convert::From; -pub fn init(i: &mut Isolate, s: &ThreadSafeState) { +pub fn init(i: &mut Isolate, s: &State) { i.register_op( "worker_post_message", s.core_op(json_op(s.stateful_op(op_worker_post_message))), @@ -23,14 +23,21 @@ pub fn init(i: &mut Isolate, s: &ThreadSafeState) { /// Get message from host as guest worker fn op_worker_get_message( - state: &ThreadSafeState, + state: &State, _args: Value, _data: Option<ZeroCopyBuf>, ) -> Result<JsonOp, ErrBox> { let state_ = state.clone(); let op = async move { - let c = state_.worker_channels_internal.lock().unwrap(); - let maybe_buf = c.as_ref().unwrap().get_message().await; + let fut = { + let state = state_.borrow(); + state + .worker_channels_internal + .as_ref() + .unwrap() + .get_message() + }; + let maybe_buf = fut.await; debug!("op_worker_get_message"); Ok(json!({ "data": maybe_buf })) }; @@ -40,13 +47,17 @@ fn op_worker_get_message( /// Post message to host as guest worker fn op_worker_post_message( - state: &ThreadSafeState, + state: &State, _args: Value, data: Option<ZeroCopyBuf>, ) -> Result<JsonOp, ErrBox> { let d = Vec::from(data.unwrap().as_ref()).into_boxed_slice(); - let c = state.worker_channels_internal.lock().unwrap(); - let fut = c.as_ref().unwrap().post_message(d); + let state = state.borrow(); + let fut = state + .worker_channels_internal + .as_ref() + .unwrap() + .post_message(d); futures::executor::block_on(fut) .map_err(|e| DenoError::new(ErrorKind::Other, e.to_string()))?; diff --git a/cli/ops/worker_host.rs b/cli/ops/worker_host.rs index f8b3edfce..fabe0b5e8 100644 --- a/cli/ops/worker_host.rs +++ b/cli/ops/worker_host.rs @@ -4,11 +4,11 @@ use crate::deno_error::bad_resource; use crate::deno_error::js_check; use crate::deno_error::DenoError; use crate::deno_error::ErrorKind; -use crate::ops::dispatch_json::JsonResult; use crate::ops::json_op; use crate::startup_data; -use crate::state::ThreadSafeState; +use crate::state::State; use crate::web_worker::WebWorker; +use crate::worker::WorkerChannelsExternal; use deno_core::*; use futures; use futures::future::FutureExt; @@ -17,7 +17,7 @@ use std; use std::convert::From; use std::sync::atomic::Ordering; -pub fn init(i: &mut Isolate, s: &ThreadSafeState) { +pub fn init(i: &mut Isolate, s: &State) { i.register_op( "create_worker", s.core_op(json_op(s.stateful_op(op_create_worker))), @@ -48,7 +48,7 @@ struct CreateWorkerArgs { /// Create worker as the host fn op_create_worker( - state: &ThreadSafeState, + state: &State, args: Value, _data: Option<ZeroCopyBuf>, ) -> Result<JsonOp, ErrBox> { @@ -59,37 +59,31 @@ fn op_create_worker( let source_code = args.source_code.clone(); let args_name = args.name; let parent_state = state.clone(); - - let (load_sender, load_receiver) = - std::sync::mpsc::sync_channel::<JsonResult>(1); + let state = state.borrow(); + let global_state = state.global_state.clone(); + let child_permissions = state.permissions.clone(); + let referrer = state.main_module.to_string(); + drop(state); + + let (handle_sender, handle_receiver) = + std::sync::mpsc::sync_channel::<Result<WorkerChannelsExternal, ErrBox>>(1); + + // TODO(bartlomieju): Isn't this wrong? + let result = ModuleSpecifier::resolve_url_or_path(&specifier)?; + let module_specifier = if !has_source_code { + ModuleSpecifier::resolve_import(&specifier, &referrer)? + } else { + result + }; std::thread::spawn(move || { - // TODO(bartlomieju): Isn't this wrong? - let result = ModuleSpecifier::resolve_url_or_path(&specifier); - if let Err(err) = result { - load_sender.send(Err(err.into())).unwrap(); - return; - } - - let module_specifier = if !has_source_code { - let referrer = parent_state.main_module.to_string(); - let result = ModuleSpecifier::resolve_import(&specifier, &referrer); - if let Err(err) = result { - load_sender.send(Err(err.into())).unwrap(); - return; - } - result.unwrap() - } else { - result.unwrap() - }; - - let result = ThreadSafeState::new_for_worker( - parent_state.global_state.clone(), - Some(parent_state.permissions.clone()), // by default share with parent + let result = State::new_for_worker( + global_state, + Some(child_permissions), // by default share with parent module_specifier.clone(), ); if let Err(err) = result { - load_sender.send(Err(err)).unwrap(); + handle_sender.send(Err(err)).unwrap(); return; } let child_state = result.unwrap(); @@ -109,12 +103,12 @@ fn op_create_worker( js_check(worker.execute(&script)); js_check(worker.execute("runWorkerMessageLoop()")); - let worker_id = parent_state.add_child_worker(&worker); + handle_sender.send(Ok(worker.thread_safe_handle())).unwrap(); // Has provided source code, execute immediately. if has_source_code { js_check(worker.execute(&source_code)); - load_sender.send(Ok(json!({ "id": worker_id }))).unwrap(); + // FIXME(bartlomieju): runtime is not run in this case return; } @@ -128,14 +122,13 @@ fn op_create_worker( } .boxed_local(); - load_sender.send(Ok(json!({ "id": worker_id }))).unwrap(); - crate::tokio_util::run_basic(fut); }); - let r = load_receiver.recv().unwrap(); + let handle = handle_receiver.recv().unwrap()?; + let worker_id = parent_state.add_child_worker(handle); - Ok(JsonOp::Sync(r.unwrap())) + Ok(JsonOp::Sync(json!({ "id": worker_id }))) } #[derive(Deserialize)] @@ -144,16 +137,15 @@ struct WorkerArgs { } fn op_host_close_worker( - state: &ThreadSafeState, + state: &State, args: Value, _data: Option<ZeroCopyBuf>, ) -> Result<JsonOp, ErrBox> { let args: WorkerArgs = serde_json::from_value(args)?; let id = args.id as u32; - let state_ = state.clone(); + let mut state = state.borrow_mut(); - let mut workers_table = state_.workers.lock().unwrap(); - let maybe_worker_handle = workers_table.remove(&id); + let maybe_worker_handle = state.workers.remove(&id); if let Some(worker_handle) = maybe_worker_handle { let mut sender = worker_handle.sender.clone(); sender.close_channel(); @@ -173,16 +165,16 @@ struct HostGetMessageArgs { /// Get message from guest worker as host fn op_host_get_message( - state: &ThreadSafeState, + state: &State, args: Value, _data: Option<ZeroCopyBuf>, ) -> Result<JsonOp, ErrBox> { let args: HostGetMessageArgs = serde_json::from_value(args)?; - let state_ = state.clone(); let id = args.id as u32; - let mut table = state_.workers.lock().unwrap(); + + let state = state.borrow(); // TODO: don't return bad resource anymore - let worker_handle = table.get_mut(&id).ok_or_else(bad_resource)?; + let worker_handle = state.workers.get(&id).ok_or_else(bad_resource)?; let fut = worker_handle.get_message(); let op = async move { let maybe_buf = fut.await; @@ -198,7 +190,7 @@ struct HostPostMessageArgs { /// Post message to guest worker as host fn op_host_post_message( - state: &ThreadSafeState, + state: &State, args: Value, data: Option<ZeroCopyBuf>, ) -> Result<JsonOp, ErrBox> { @@ -207,9 +199,9 @@ fn op_host_post_message( let msg = Vec::from(data.unwrap().as_ref()).into_boxed_slice(); debug!("post message to worker {}", id); - let mut table = state.workers.lock().unwrap(); + let state = state.borrow(); // TODO: don't return bad resource anymore - let worker_handle = table.get_mut(&id).ok_or_else(bad_resource)?; + let worker_handle = state.workers.get(&id).ok_or_else(bad_resource)?; let fut = worker_handle .post_message(msg) .map_err(|e| DenoError::new(ErrorKind::Other, e.to_string())); @@ -218,10 +210,11 @@ fn op_host_post_message( } fn op_metrics( - state: &ThreadSafeState, + state: &State, _args: Value, _zero_copy: Option<ZeroCopyBuf>, ) -> Result<JsonOp, ErrBox> { + let state = state.borrow(); let m = &state.metrics; Ok(JsonOp::Sync(json!({ diff --git a/cli/shell.rs b/cli/shell.rs index 268ec3b76..8e52f51fd 100644 --- a/cli/shell.rs +++ b/cli/shell.rs @@ -55,7 +55,7 @@ impl fmt::Debug for Shell { enum ShellOut { /// A plain write object without color support // TODO(ry) Disabling this type of output because it makes Shell - // not thread safe and thus not includable in ThreadSafeState. + // not thread safe and thus not includable in State. // But I think we will want this in the future. //Write(Box<dyn Write>), /// Color-enabled stdio, with information on whether color should be used diff --git a/cli/state.rs b/cli/state.rs index 86479d707..4e2f47e62 100644 --- a/cli/state.rs +++ b/cli/state.rs @@ -8,7 +8,6 @@ use crate::metrics::Metrics; use crate::ops::JsonOp; use crate::ops::MinimalOp; use crate::permissions::DenoPermissions; -use crate::web_worker::WebWorker; use crate::worker::WorkerChannelsExternal; use crate::worker::WorkerChannelsInternal; use deno_core::Buf; @@ -19,68 +18,53 @@ use deno_core::ModuleSpecifier; use deno_core::Op; use deno_core::ResourceTable; use deno_core::ZeroCopyBuf; -use futures::channel::mpsc; use futures::future::FutureExt; use futures::future::TryFutureExt; use rand::rngs::StdRng; use rand::SeedableRng; use serde_json::Value; use std; +use std::cell::RefCell; use std::collections::HashMap; use std::ops::Deref; use std::path::Path; use std::pin::Pin; +use std::rc::Rc; use std::str; use std::sync::atomic::AtomicUsize; use std::sync::atomic::Ordering; -use std::sync::Arc; -use std::sync::Mutex; -use std::sync::MutexGuard; use std::time::Instant; -/// Isolate cannot be passed between threads but ThreadSafeState can. -/// ThreadSafeState satisfies Send and Sync. So any state that needs to be -/// accessed outside the main V8 thread should be inside ThreadSafeState. -pub struct ThreadSafeState(Arc<State>); +#[derive(Clone)] +pub struct State(Rc<RefCell<StateInner>>); + +impl Deref for State { + type Target = Rc<RefCell<StateInner>>; + fn deref(&self) -> &Self::Target { + &self.0 + } +} #[cfg_attr(feature = "cargo-clippy", allow(stutter))] -pub struct State { +pub struct StateInner { pub global_state: GlobalState, - pub permissions: Arc<Mutex<DenoPermissions>>, + pub permissions: DenoPermissions, pub main_module: ModuleSpecifier, /// When flags contains a `.import_map_path` option, the content of the /// import map file will be resolved and set. pub import_map: Option<ImportMap>, pub metrics: Metrics, - pub global_timer: Mutex<GlobalTimer>, - pub workers: Mutex<HashMap<u32, WorkerChannelsExternal>>, - pub worker_channels_internal: Mutex<Option<WorkerChannelsInternal>>, - pub loading_workers: Mutex<HashMap<u32, mpsc::Receiver<Result<(), ErrBox>>>>, + pub global_timer: GlobalTimer, + pub workers: HashMap<u32, WorkerChannelsExternal>, + pub worker_channels_internal: Option<WorkerChannelsInternal>, pub next_worker_id: AtomicUsize, pub start_time: Instant, - pub seeded_rng: Option<Mutex<StdRng>>, - pub resource_table: Mutex<ResourceTable>, + pub seeded_rng: Option<StdRng>, + pub resource_table: ResourceTable, pub target_lib: TargetLib, } -impl Clone for ThreadSafeState { - fn clone(&self) -> Self { - ThreadSafeState(self.0.clone()) - } -} - -impl Deref for ThreadSafeState { - type Target = Arc<State>; - fn deref(&self) -> &Self::Target { - &self.0 - } -} - -impl ThreadSafeState { - pub fn lock_resource_table(&self) -> MutexGuard<ResourceTable> { - self.resource_table.lock().unwrap() - } - +impl State { /// Wrap core `OpDispatcher` to collect metrics. pub fn core_op<D>( &self, @@ -130,7 +114,7 @@ impl ThreadSafeState { dispatcher: D, ) -> impl Fn(i32, Option<ZeroCopyBuf>) -> Pin<Box<MinimalOp>> where - D: Fn(&ThreadSafeState, i32, Option<ZeroCopyBuf>) -> Pin<Box<MinimalOp>>, + D: Fn(&State, i32, Option<ZeroCopyBuf>) -> Pin<Box<MinimalOp>>, { let state = self.clone(); @@ -149,11 +133,7 @@ impl ThreadSafeState { dispatcher: D, ) -> impl Fn(Value, Option<ZeroCopyBuf>) -> Result<JsonOp, ErrBox> where - D: Fn( - &ThreadSafeState, - Value, - Option<ZeroCopyBuf>, - ) -> Result<JsonOp, ErrBox>, + D: Fn(&State, Value, Option<ZeroCopyBuf>) -> Result<JsonOp, ErrBox>, { let state = self.clone(); @@ -163,7 +143,7 @@ impl ThreadSafeState { } } -impl Loader for ThreadSafeState { +impl Loader for State { fn resolve( &self, specifier: &str, @@ -171,7 +151,7 @@ impl Loader for ThreadSafeState { is_main: bool, ) -> Result<ModuleSpecifier, ErrBox> { if !is_main { - if let Some(import_map) = &self.import_map { + if let Some(import_map) = &self.borrow().import_map { let result = import_map.resolve(specifier, referrer)?; if let Some(r) = result { return Ok(r); @@ -198,11 +178,12 @@ impl Loader for ThreadSafeState { } } + let state = self.borrow(); // TODO(bartlomieju): incrementing resolve_count here has no sense... - self.metrics.resolve_count.fetch_add(1, Ordering::SeqCst); + state.metrics.resolve_count.fetch_add(1, Ordering::SeqCst); let module_url_specified = module_specifier.to_string(); - let global_state = self.global_state.clone(); - let target_lib = self.target_lib.clone(); + let global_state = state.global_state.clone(); + let target_lib = state.target_lib.clone(); let fut = async move { let compiled_module = global_state .fetch_compiled_module(module_specifier, maybe_referrer, target_lib) @@ -220,11 +201,11 @@ impl Loader for ThreadSafeState { } } -impl ThreadSafeState { +impl State { /// If `shared_permission` is None then permissions from globa state are used. pub fn new( global_state: GlobalState, - shared_permissions: Option<Arc<Mutex<DenoPermissions>>>, + shared_permissions: Option<DenoPermissions>, main_module: ModuleSpecifier, ) -> Result<Self, ErrBox> { let import_map: Option<ImportMap> = @@ -234,116 +215,114 @@ impl ThreadSafeState { }; let seeded_rng = match global_state.flags.seed { - Some(seed) => Some(Mutex::new(StdRng::seed_from_u64(seed))), + Some(seed) => Some(StdRng::seed_from_u64(seed)), None => None, }; let permissions = if let Some(perm) = shared_permissions { perm } else { - Arc::new(Mutex::new(global_state.permissions.clone())) + global_state.permissions.clone() }; - let state = State { + let state = Rc::new(RefCell::new(StateInner { global_state, main_module, permissions, import_map, metrics: Metrics::default(), - global_timer: Mutex::new(GlobalTimer::new()), - worker_channels_internal: Mutex::new(None), - workers: Mutex::new(HashMap::new()), - loading_workers: Mutex::new(HashMap::new()), + global_timer: GlobalTimer::new(), + worker_channels_internal: None, + workers: HashMap::new(), next_worker_id: AtomicUsize::new(0), start_time: Instant::now(), seeded_rng, - resource_table: Mutex::new(ResourceTable::default()), + resource_table: ResourceTable::default(), target_lib: TargetLib::Main, - }; + })); - Ok(ThreadSafeState(Arc::new(state))) + Ok(Self(state)) } /// If `shared_permission` is None then permissions from globa state are used. pub fn new_for_worker( global_state: GlobalState, - shared_permissions: Option<Arc<Mutex<DenoPermissions>>>, + shared_permissions: Option<DenoPermissions>, main_module: ModuleSpecifier, ) -> Result<Self, ErrBox> { let seeded_rng = match global_state.flags.seed { - Some(seed) => Some(Mutex::new(StdRng::seed_from_u64(seed))), + Some(seed) => Some(StdRng::seed_from_u64(seed)), None => None, }; let permissions = if let Some(perm) = shared_permissions { perm } else { - Arc::new(Mutex::new(global_state.permissions.clone())) + global_state.permissions.clone() }; - let state = State { + let state = Rc::new(RefCell::new(StateInner { global_state, main_module, permissions, import_map: None, metrics: Metrics::default(), - global_timer: Mutex::new(GlobalTimer::new()), - worker_channels_internal: Mutex::new(None), - workers: Mutex::new(HashMap::new()), - loading_workers: Mutex::new(HashMap::new()), + global_timer: GlobalTimer::new(), + worker_channels_internal: None, + workers: HashMap::new(), next_worker_id: AtomicUsize::new(0), start_time: Instant::now(), seeded_rng, - resource_table: Mutex::new(ResourceTable::default()), + resource_table: ResourceTable::default(), target_lib: TargetLib::Worker, - }; + })); - Ok(ThreadSafeState(Arc::new(state))) + Ok(Self(state)) } - pub fn add_child_worker(&self, worker: &WebWorker) -> u32 { - let worker_id = self.next_worker_id.fetch_add(1, Ordering::Relaxed) as u32; - let handle = worker.thread_safe_handle(); - let mut workers_tl = self.workers.lock().unwrap(); - workers_tl.insert(worker_id, handle); + pub fn add_child_worker(&self, handle: WorkerChannelsExternal) -> u32 { + let mut inner_state = self.borrow_mut(); + let worker_id = + inner_state.next_worker_id.fetch_add(1, Ordering::Relaxed) as u32; + inner_state.workers.insert(worker_id, handle); worker_id } #[inline] pub fn check_read(&self, path: &Path) -> Result<(), ErrBox> { - self.permissions.lock().unwrap().check_read(path) + self.borrow().permissions.check_read(path) } #[inline] pub fn check_write(&self, path: &Path) -> Result<(), ErrBox> { - self.permissions.lock().unwrap().check_write(path) + self.borrow().permissions.check_write(path) } #[inline] pub fn check_env(&self) -> Result<(), ErrBox> { - self.permissions.lock().unwrap().check_env() + self.borrow().permissions.check_env() } #[inline] pub fn check_net(&self, hostname: &str, port: u16) -> Result<(), ErrBox> { - self.permissions.lock().unwrap().check_net(hostname, port) + self.borrow().permissions.check_net(hostname, port) } #[inline] pub fn check_net_url(&self, url: &url::Url) -> Result<(), ErrBox> { - self.permissions.lock().unwrap().check_net_url(url) + self.borrow().permissions.check_net_url(url) } #[inline] pub fn check_run(&self) -> Result<(), ErrBox> { - self.permissions.lock().unwrap().check_run() + self.borrow().permissions.check_run() } #[inline] pub fn check_plugin(&self, filename: &Path) -> Result<(), ErrBox> { - self.permissions.lock().unwrap().check_plugin(filename) + self.borrow().permissions.check_plugin(filename) } pub fn check_dyn_import( @@ -371,10 +350,10 @@ impl ThreadSafeState { } #[cfg(test)] - pub fn mock(main_module: &str) -> ThreadSafeState { + pub fn mock(main_module: &str) -> State { let module_specifier = ModuleSpecifier::resolve_url_or_path(main_module) .expect("Invalid entry module"); - ThreadSafeState::new( + State::new( GlobalState::mock(vec!["deno".to_string()]), None, module_specifier, @@ -387,28 +366,24 @@ impl ThreadSafeState { bytes_sent_control: usize, bytes_sent_data: usize, ) { - self.metrics.ops_dispatched.fetch_add(1, Ordering::SeqCst); - self + let state = self.borrow(); + state.metrics.ops_dispatched.fetch_add(1, Ordering::SeqCst); + state .metrics .bytes_sent_control .fetch_add(bytes_sent_control, Ordering::SeqCst); - self + state .metrics .bytes_sent_data .fetch_add(bytes_sent_data, Ordering::SeqCst); } pub fn metrics_op_completed(&self, bytes_received: usize) { - self.metrics.ops_completed.fetch_add(1, Ordering::SeqCst); - self + let state = self.borrow(); + state.metrics.ops_completed.fetch_add(1, Ordering::SeqCst); + state .metrics .bytes_received .fetch_add(bytes_received, Ordering::SeqCst); } } - -#[test] -fn thread_safe() { - fn f<S: Send + Sync>(_: S) {} - f(ThreadSafeState::mock("./hello.js")); -} diff --git a/cli/web_worker.rs b/cli/web_worker.rs index 7115b9027..05e3184d9 100644 --- a/cli/web_worker.rs +++ b/cli/web_worker.rs @@ -1,6 +1,6 @@ // Copyright 2018-2020 the Deno authors. All rights reserved. MIT license. use crate::ops; -use crate::state::ThreadSafeState; +use crate::state::State; use crate::worker::Worker; use deno_core; use deno_core::ErrBox; @@ -23,11 +23,7 @@ use std::task::Poll; pub struct WebWorker(Worker); impl WebWorker { - pub fn new( - name: String, - startup_data: StartupData, - state: ThreadSafeState, - ) -> Self { + pub fn new(name: String, startup_data: StartupData, state: State) -> Self { let state_ = state.clone(); let mut worker = Worker::new(name, startup_data, state_); { @@ -70,11 +66,11 @@ impl Future for WebWorker { mod tests { use super::*; use crate::startup_data; - use crate::state::ThreadSafeState; + use crate::state::State; use crate::tokio_util; fn create_test_worker() -> WebWorker { - let state = ThreadSafeState::mock("./hello.js"); + let state = State::mock("./hello.js"); let mut worker = WebWorker::new( "TEST".to_string(), startup_data::deno_isolate_init(), @@ -104,48 +100,53 @@ mod tests { worker.execute(source).unwrap(); let handle = worker.thread_safe_handle(); - let _ = tokio_util::spawn_thread(move || tokio_util::run_basic(worker)); - - tokio_util::run_basic(async move { - let msg = json!("hi").to_string().into_boxed_str().into_boxed_bytes(); - let r = handle.post_message(msg.clone()).await; - assert!(r.is_ok()); - - let maybe_msg = handle.get_message().await; - assert!(maybe_msg.is_some()); - - let r = handle.post_message(msg.clone()).await; - assert!(r.is_ok()); - - let maybe_msg = handle.get_message().await; - assert!(maybe_msg.is_some()); - assert_eq!(*maybe_msg.unwrap(), *b"[1,2,3]"); - - let msg = json!("exit") - .to_string() - .into_boxed_str() - .into_boxed_bytes(); - let r = handle.post_message(msg).await; - assert!(r.is_ok()); + let _ = tokio_util::spawn_thread(move || { + tokio_util::run_basic(async move { + let msg = json!("hi").to_string().into_boxed_str().into_boxed_bytes(); + let r = handle.post_message(msg.clone()).await; + assert!(r.is_ok()); + + let maybe_msg = handle.get_message().await; + assert!(maybe_msg.is_some()); + + let r = handle.post_message(msg.clone()).await; + assert!(r.is_ok()); + + let maybe_msg = handle.get_message().await; + assert!(maybe_msg.is_some()); + assert_eq!(*maybe_msg.unwrap(), *b"[1,2,3]"); + + let msg = json!("exit") + .to_string() + .into_boxed_str() + .into_boxed_bytes(); + let r = handle.post_message(msg).await; + assert!(r.is_ok()); + }) }); + + let r = tokio_util::run_basic(worker); + assert!(r.is_ok()) } #[test] fn removed_from_resource_table_on_close() { let mut worker = create_test_worker(); let handle = worker.thread_safe_handle(); - let worker_complete_fut = tokio_util::spawn_thread(move || { - worker - .execute("onmessage = () => { delete self.onmessage; }") - .unwrap(); - tokio_util::run_basic(worker) + + worker + .execute("onmessage = () => { delete self.onmessage; }") + .unwrap(); + + let worker_post_message_fut = tokio_util::spawn_thread(move || { + let msg = json!("hi").to_string().into_boxed_str().into_boxed_bytes(); + let r = futures::executor::block_on(handle.post_message(msg)); + assert!(r.is_ok()); }); - let msg = json!("hi").to_string().into_boxed_str().into_boxed_bytes(); tokio_util::run_basic(async move { - let r = handle.post_message(msg).await; - assert!(r.is_ok()); - let r = worker_complete_fut.await; + worker_post_message_fut.await; + let r = worker.await; assert!(r.is_ok()); }); } diff --git a/cli/worker.rs b/cli/worker.rs index c7efa95fa..20b8b8021 100644 --- a/cli/worker.rs +++ b/cli/worker.rs @@ -1,7 +1,7 @@ // Copyright 2018-2020 the Deno authors. All rights reserved. MIT license. use crate::fmt_errors::JSError; use crate::ops; -use crate::state::ThreadSafeState; +use crate::state::State; use deno_core; use deno_core::Buf; use deno_core::ErrBox; @@ -112,28 +112,24 @@ fn create_channels() -> (WorkerChannelsInternal, WorkerChannelsExternal) { pub struct Worker { pub name: String, pub isolate: Box<deno_core::EsIsolate>, - pub state: ThreadSafeState, + pub state: State, external_channels: WorkerChannelsExternal, } impl Worker { - pub fn new( - name: String, - startup_data: StartupData, - state: ThreadSafeState, - ) -> Self { + pub fn new(name: String, startup_data: StartupData, state: State) -> Self { let mut isolate = deno_core::EsIsolate::new(Box::new(state.clone()), startup_data, false); - let global_state_ = state.global_state.clone(); + let global_state_ = state.borrow().global_state.clone(); isolate.set_js_error_create(move |v8_exception| { JSError::from_v8_exception(v8_exception, &global_state_.ts_compiler) }); let (internal_channels, external_channels) = create_channels(); { - let mut c = state.worker_channels_internal.lock().unwrap(); - *c = Some(internal_channels); + let mut state = state.borrow_mut(); + state.worker_channels_internal = Some(internal_channels); } Self { @@ -170,7 +166,7 @@ impl Worker { ) -> Result<(), ErrBox> { let specifier = module_specifier.to_string(); let id = self.isolate.load_module(&specifier, maybe_code).await?; - self.state.global_state.progress.done(); + self.state.borrow().global_state.progress.done(); if !is_prefetch { return self.isolate.mod_evaluate(id); } @@ -203,11 +199,7 @@ impl Future for Worker { pub struct MainWorker(Worker); impl MainWorker { - pub fn new( - name: String, - startup_data: StartupData, - state: ThreadSafeState, - ) -> Self { + pub fn new(name: String, startup_data: StartupData, state: State) -> Self { let state_ = state.clone(); let mut worker = Worker::new(name, startup_data, state_); { @@ -257,7 +249,7 @@ mod tests { use crate::flags; use crate::global_state::GlobalState; use crate::startup_data; - use crate::state::ThreadSafeState; + use crate::state::State; use crate::tokio_util; use futures::executor::block_on; use std::sync::atomic::Ordering; @@ -280,8 +272,7 @@ mod tests { ModuleSpecifier::resolve_url_or_path(&p.to_string_lossy()).unwrap(); let global_state = GlobalState::new(flags::DenoFlags::default()).unwrap(); let state = - ThreadSafeState::new(global_state, None, module_specifier.clone()) - .unwrap(); + State::new(global_state, None, module_specifier.clone()).unwrap(); let state_ = state.clone(); tokio_util::run_basic(async move { let mut worker = @@ -296,8 +287,8 @@ mod tests { panic!("Future got unexpected error: {:?}", e); } }); - - let metrics = &state_.metrics; + let mut state = state_.borrow_mut(); + let metrics = &mut state.metrics; assert_eq!(metrics.resolve_count.load(Ordering::SeqCst), 2); // Check that we didn't start the compiler. assert_eq!(metrics.compiler_starts.load(Ordering::SeqCst), 0); @@ -313,8 +304,7 @@ mod tests { ModuleSpecifier::resolve_url_or_path(&p.to_string_lossy()).unwrap(); let global_state = GlobalState::new(flags::DenoFlags::default()).unwrap(); let state = - ThreadSafeState::new(global_state, None, module_specifier.clone()) - .unwrap(); + State::new(global_state, None, module_specifier.clone()).unwrap(); let state_ = state.clone(); tokio_util::run_basic(async move { let mut worker = @@ -330,7 +320,8 @@ mod tests { } }); - let metrics = &state_.metrics; + let mut state = state_.borrow_mut(); + let metrics = &mut state.metrics; // TODO assert_eq!(metrics.resolve_count.load(Ordering::SeqCst), 2); // Check that we didn't start the compiler. assert_eq!(metrics.compiler_starts.load(Ordering::SeqCst), 0); @@ -353,12 +344,8 @@ mod tests { ..flags::DenoFlags::default() }; let global_state = GlobalState::new(flags).unwrap(); - let state = ThreadSafeState::new( - global_state.clone(), - None, - module_specifier.clone(), - ) - .unwrap(); + let state = + State::new(global_state.clone(), None, module_specifier.clone()).unwrap(); let mut worker = MainWorker::new( "TEST".to_string(), startup_data::deno_isolate_init(), @@ -374,6 +361,7 @@ mod tests { if let Err(e) = (&mut *worker).await { panic!("Future got unexpected error: {:?}", e); } + let state = state.borrow_mut(); assert_eq!(state.metrics.resolve_count.load(Ordering::SeqCst), 3); // Check that we've only invoked the compiler once. assert_eq!( @@ -384,7 +372,7 @@ mod tests { } fn create_test_worker() -> MainWorker { - let state = ThreadSafeState::mock("./hello.js"); + let state = State::mock("./hello.js"); let mut worker = MainWorker::new( "TEST".to_string(), startup_data::deno_isolate_init(), |