diff options
author | Ryan Dahl <ry@tinyclouds.org> | 2018-09-18 11:53:16 -0700 |
---|---|---|
committer | Ryan Dahl <ry@tinyclouds.org> | 2018-09-25 17:02:49 -0400 |
commit | 4fd2b19f640d19e57511eb142b63e16c879ef6fd (patch) | |
tree | d45db774ca5d51ecaac5491aec68db0cbfdcdf27 /src/handlers.rs | |
parent | 7c128df4a041f3b2c04725a0f5f3320db684d067 (diff) |
Make Deno multithreaded.
By using the tokio default runtime.
This patch makes all of the ops thread safe.
Adds libdeno to JS globals to make for easier testing.
Preliminary work for #733.
Diffstat (limited to 'src/handlers.rs')
-rw-r--r-- | src/handlers.rs | 269 |
1 files changed, 119 insertions, 150 deletions
diff --git a/src/handlers.rs b/src/handlers.rs index 623c64110..c914df36d 100644 --- a/src/handlers.rs +++ b/src/handlers.rs @@ -1,46 +1,44 @@ // Copyright 2018 the Deno authors. All rights reserved. MIT license. + use errors::DenoError; use errors::DenoResult; -use flatbuffers::FlatBufferBuilder; use fs as deno_fs; +use isolate::Buf; +use isolate::IsolateState; +use isolate::Op; +use msg; + +use flatbuffers::FlatBufferBuilder; use futures; use futures::sync::oneshot; use hyper; use hyper::rt::{Future, Stream}; use hyper::Client; -use isolate::from_c; -use libdeno; -use libdeno::{deno_buf, isolate}; -use msg; use remove_dir_all::remove_dir_all; use std; use std::fs; #[cfg(any(unix))] use std::os::unix::fs::PermissionsExt; use std::path::Path; +use std::sync::Arc; use std::time::UNIX_EPOCH; use std::time::{Duration, Instant}; use tokio::timer::Delay; -// Buf represents a byte array returned from a "Op". -// The message might be empty (which will be translated into a null object on -// the javascript side) or it is a heap allocated opaque sequence of bytes. -// Usually a flatbuffer message. -type Buf = Option<Box<[u8]>>; - -// JS promises in Deno map onto a specific Future -// which yields either a DenoError or a byte array. -type Op = Future<Item = Buf, Error = DenoError>; - type OpResult = DenoResult<Buf>; // TODO Ideally we wouldn't have to box the Op being returned. // The box is just to make it easier to get a prototype refactor working. -type Handler = fn(i: *const isolate, base: &msg::Base) -> Box<Op>; +type Handler = fn(state: Arc<IsolateState>, base: &msg::Base) -> Box<Op>; -pub extern "C" fn msg_from_js(i: *const isolate, buf: deno_buf) { - let bytes = unsafe { std::slice::from_raw_parts(buf.data_ptr, buf.data_len) }; +// Hopefully Rust optimizes this away. +fn empty_buf() -> Buf { + Box::new([]) +} + +pub fn msg_from_js(state: Arc<IsolateState>, bytes: &[u8]) -> (bool, Box<Op>) { let base = msg::get_root_as_base(bytes); + let is_sync = base.sync(); let msg_type = base.msg_type(); let cmd_id = base.cmd_id(); let handler: Handler = match msg_type { @@ -68,73 +66,51 @@ pub extern "C" fn msg_from_js(i: *const isolate, buf: deno_buf) { )), }; - let future = handler(i, &base); - let future = future.or_else(move |err| { - // No matter whether we got an Err or Ok, we want a serialized message to - // send back. So transform the DenoError into a deno_buf. - let builder = &mut FlatBufferBuilder::new(); - let errmsg_offset = builder.create_string(&format!("{}", err)); - Ok(serialize_response( - cmd_id, - builder, - msg::BaseArgs { - error: Some(errmsg_offset), - error_kind: err.kind(), - ..Default::default() - }, - )) - }); - - let isolate = from_c(i); - if base.sync() { - // Execute future synchronously. - // println!("sync handler {}", msg::enum_name_any(msg_type)); - let maybe_box_u8 = future.wait().unwrap(); - match maybe_box_u8 { - None => {} - Some(box_u8) => { - let buf = deno_buf_from(box_u8); - // Set the synchronous response, the value returned from isolate.send(). - unsafe { libdeno::deno_set_response(i, buf) } - } - } - } else { - // Execute future asynchornously. - let future = future.and_then(move |maybe_box_u8| { - let buf = match maybe_box_u8 { - Some(box_u8) => deno_buf_from(box_u8), - None => { - // async RPCs that return None still need to - // send a message back to signal completion. - let builder = &mut FlatBufferBuilder::new(); - deno_buf_from( - serialize_response( - cmd_id, - builder, - msg::BaseArgs { - ..Default::default() - }, - ).unwrap(), - ) - } + let op: Box<Op> = handler(state.clone(), &base); + let boxed_op = Box::new( + op.or_else(move |err: DenoError| -> DenoResult<Buf> { + debug!("op err {}", err); + // No matter whether we got an Err or Ok, we want a serialized message to + // send back. So transform the DenoError into a deno_buf. + let builder = &mut FlatBufferBuilder::new(); + let errmsg_offset = builder.create_string(&format!("{}", err)); + Ok(serialize_response( + cmd_id, + builder, + msg::BaseArgs { + error: Some(errmsg_offset), + error_kind: err.kind(), + ..Default::default() + }, + )) + }).and_then(move |buf: Buf| -> DenoResult<Buf> { + // Handle empty responses. For sync responses we just want + // to send null. For async we want to send a small message + // with the cmd_id. + let buf = if is_sync || buf.len() > 0 { + buf + } else { + // async RPCs that return empty still need to + // send a message back to signal completion. + let builder = &mut FlatBufferBuilder::new(); + serialize_response( + cmd_id, + builder, + msg::BaseArgs { + ..Default::default() + }, + ) }; - // TODO(ry) make this thread safe. - unsafe { libdeno::deno_send(i, buf) }; - Ok(()) - }); - isolate.rt.spawn(future); - } -} + Ok(buf) + }), + ); -fn deno_buf_from(x: Box<[u8]>) -> deno_buf { - let len = x.len(); - let ptr = Box::into_raw(x); - deno_buf { - alloc_ptr: 0 as *mut u8, - alloc_len: 0, - data_ptr: ptr as *mut u8, - data_len: len, - } + debug!( + "msg_from_js {} sync {}", + msg::enum_name_any(msg_type), + base.sync() + ); + return (base.sync(), boxed_op); } fn permission_denied() -> DenoError { @@ -151,16 +127,15 @@ fn not_implemented() -> DenoError { )) } -fn handle_exit(_i: *const isolate, base: &msg::Base) -> Box<Op> { +fn handle_exit(_config: Arc<IsolateState>, base: &msg::Base) -> Box<Op> { let msg = base.msg_as_exit().unwrap(); std::process::exit(msg.code()) } -fn handle_start(i: *const isolate, base: &msg::Base) -> Box<Op> { - let isolate = from_c(i); +fn handle_start(state: Arc<IsolateState>, base: &msg::Base) -> Box<Op> { let mut builder = FlatBufferBuilder::new(); - let argv = isolate.argv.iter().map(|s| s.as_str()).collect::<Vec<_>>(); + let argv = state.argv.iter().map(|s| s.as_str()).collect::<Vec<_>>(); let argv_off = builder.create_vector_of_strings(argv.as_slice()); let cwd_path = std::env::current_dir().unwrap(); @@ -172,8 +147,8 @@ fn handle_start(i: *const isolate, base: &msg::Base) -> Box<Op> { &msg::StartResArgs { cwd: Some(cwd_off), argv: Some(argv_off), - debug_flag: isolate.flags.log_debug, - recompile_flag: isolate.flags.recompile, + debug_flag: state.flags.log_debug, + recompile_flag: state.flags.recompile, ..Default::default() }, ); @@ -200,7 +175,7 @@ fn serialize_response( let data = builder.finished_data(); // println!("serialize_response {:x?}", data); let vec = data.to_vec(); - Some(vec.into_boxed_slice()) + vec.into_boxed_slice() } fn ok_future(buf: Buf) -> Box<Op> { @@ -213,22 +188,17 @@ fn odd_future(err: DenoError) -> Box<Op> { } // https://github.com/denoland/isolate/blob/golang/os.go#L100-L154 -fn handle_code_fetch(i: *const isolate, base: &msg::Base) -> Box<Op> { +fn handle_code_fetch(state: Arc<IsolateState>, base: &msg::Base) -> Box<Op> { let msg = base.msg_as_code_fetch().unwrap(); let cmd_id = base.cmd_id(); let module_specifier = msg.module_specifier().unwrap(); let containing_file = msg.containing_file().unwrap(); - let isolate = from_c(i); - assert_eq!( - isolate.dir.root.join("gen"), - isolate.dir.gen, - "Sanity check" - ); + assert_eq!(state.dir.root.join("gen"), state.dir.gen, "Sanity check"); Box::new(futures::future::result(|| -> OpResult { let builder = &mut FlatBufferBuilder::new(); - let out = isolate.dir.code_fetch(module_specifier, containing_file)?; + let out = state.dir.code_fetch(module_specifier, containing_file)?; let mut msg_args = msg::CodeFetchResArgs { module_name: Some(builder.create_string(&out.module_name)), filename: Some(builder.create_string(&out.filename)), @@ -255,36 +225,34 @@ fn handle_code_fetch(i: *const isolate, base: &msg::Base) -> Box<Op> { } // https://github.com/denoland/isolate/blob/golang/os.go#L156-L169 -fn handle_code_cache(i: *const isolate, base: &msg::Base) -> Box<Op> { +fn handle_code_cache(state: Arc<IsolateState>, base: &msg::Base) -> Box<Op> { let msg = base.msg_as_code_cache().unwrap(); let filename = msg.filename().unwrap(); let source_code = msg.source_code().unwrap(); let output_code = msg.output_code().unwrap(); Box::new(futures::future::result(|| -> OpResult { - let isolate = from_c(i); - isolate.dir.code_cache(filename, source_code, output_code)?; - Ok(None) + state.dir.code_cache(filename, source_code, output_code)?; + Ok(empty_buf()) }())) } -fn handle_set_env(i: *const isolate, base: &msg::Base) -> Box<Op> { +fn handle_set_env(state: Arc<IsolateState>, base: &msg::Base) -> Box<Op> { let msg = base.msg_as_set_env().unwrap(); let key = msg.key().unwrap(); let value = msg.value().unwrap(); - let isolate = from_c(i); - if !isolate.flags.allow_env { + if !state.flags.allow_env { return odd_future(permission_denied()); } std::env::set_var(key, value); - ok_future(None) + ok_future(empty_buf()) } -fn handle_env(i: *const isolate, base: &msg::Base) -> Box<Op> { - let isolate = from_c(i); +fn handle_env(state: Arc<IsolateState>, base: &msg::Base) -> Box<Op> { let cmd_id = base.cmd_id(); - if !isolate.flags.allow_env { + + if !state.flags.allow_env { return odd_future(permission_denied()); } @@ -322,22 +290,23 @@ fn handle_env(i: *const isolate, base: &msg::Base) -> Box<Op> { )) } -fn handle_fetch_req(i: *const isolate, base: &msg::Base) -> Box<Op> { +fn handle_fetch_req(state: Arc<IsolateState>, base: &msg::Base) -> Box<Op> { let msg = base.msg_as_fetch_req().unwrap(); let cmd_id = base.cmd_id(); let id = msg.id(); let url = msg.url().unwrap(); - let isolate = from_c(i); - if !isolate.flags.allow_net { + if !state.flags.allow_net { return odd_future(permission_denied()); } let url = url.parse::<hyper::Uri>().unwrap(); let client = Client::new(); + debug!("Before fetch {}", url); let future = client.get(url).and_then(move |res| { let status = res.status().as_u16() as i32; + debug!("fetch {}", status); let headers = { let map = res.headers(); @@ -361,6 +330,7 @@ fn handle_fetch_req(i: *const isolate, base: &msg::Base) -> Box<Op> { let future = future.map_err(|err| -> DenoError { err.into() }).and_then( move |(status, body, headers)| { + debug!("fetch body "); let builder = &mut FlatBufferBuilder::new(); // Send the first message without a body. This is just to indicate // what status code. @@ -422,7 +392,7 @@ where (delay_task, cancel_tx) } -fn handle_make_temp_dir(i: *const isolate, base: &msg::Base) -> Box<Op> { +fn handle_make_temp_dir(state: Arc<IsolateState>, base: &msg::Base) -> Box<Op> { let base = Box::new(*base); let msg = base.msg_as_make_temp_dir().unwrap(); let cmd_id = base.cmd_id(); @@ -430,8 +400,7 @@ fn handle_make_temp_dir(i: *const isolate, base: &msg::Base) -> Box<Op> { let prefix = msg.prefix(); let suffix = msg.suffix(); - let isolate = from_c(i); - if !isolate.flags.allow_write { + if !state.flags.allow_write { return odd_future(permission_denied()); } // TODO Use blocking() here. @@ -461,28 +430,28 @@ fn handle_make_temp_dir(i: *const isolate, base: &msg::Base) -> Box<Op> { }())) } -fn handle_mkdir(i: *const isolate, base: &msg::Base) -> Box<Op> { +fn handle_mkdir(state: Arc<IsolateState>, base: &msg::Base) -> Box<Op> { let msg = base.msg_as_mkdir().unwrap(); let mode = msg.mode(); let path = msg.path().unwrap(); - let isolate = from_c(i); - if !isolate.flags.allow_write { + + if !state.flags.allow_write { return odd_future(permission_denied()); } // TODO Use tokio_threadpool. Box::new(futures::future::result(|| -> OpResult { debug!("handle_mkdir {}", path); deno_fs::mkdir(Path::new(path), mode)?; - Ok(None) + Ok(empty_buf()) }())) } -fn handle_remove(i: *const isolate, base: &msg::Base) -> Box<Op> { +fn handle_remove(state: Arc<IsolateState>, base: &msg::Base) -> Box<Op> { let msg = base.msg_as_remove().unwrap(); let path = msg.path().unwrap(); let recursive = msg.recursive(); - let isolate = from_c(i); - if !isolate.flags.allow_write { + + if !state.flags.allow_write { return odd_future(permission_denied()); } // TODO Use tokio_threadpool. @@ -499,12 +468,12 @@ fn handle_remove(i: *const isolate, base: &msg::Base) -> Box<Op> { fs::remove_dir(&path_)?; } } - Ok(None) + Ok(empty_buf()) }())) } // Prototype https://github.com/denoland/isolate/blob/golang/os.go#L171-L184 -fn handle_read_file(_i: *const isolate, base: &msg::Base) -> Box<Op> { +fn handle_read_file(_config: Arc<IsolateState>, base: &msg::Base) -> Box<Op> { let msg = base.msg_as_read_file().unwrap(); let cmd_id = base.cmd_id(); let filename = String::from(msg.filename().unwrap()); @@ -554,7 +523,7 @@ fn get_mode(_perm: fs::Permissions) -> u32 { 0 } -fn handle_stat(_i: *const isolate, base: &msg::Base) -> Box<Op> { +fn handle_stat(_config: Arc<IsolateState>, base: &msg::Base) -> Box<Op> { let msg = base.msg_as_stat().unwrap(); let cmd_id = base.cmd_id(); let filename = String::from(msg.filename().unwrap()); @@ -597,48 +566,49 @@ fn handle_stat(_i: *const isolate, base: &msg::Base) -> Box<Op> { }())) } -fn handle_write_file(i: *const isolate, base: &msg::Base) -> Box<Op> { +fn handle_write_file(state: Arc<IsolateState>, base: &msg::Base) -> Box<Op> { let msg = base.msg_as_write_file().unwrap(); let filename = String::from(msg.filename().unwrap()); let data = msg.data().unwrap(); let perm = msg.perm(); - let isolate = from_c(i); - if !isolate.flags.allow_write { + if !state.flags.allow_write { return odd_future(permission_denied()); } + Box::new(futures::future::result(|| -> OpResult { debug!("handle_write_file {}", filename); deno_fs::write_file(Path::new(&filename), data, perm)?; - Ok(None) + Ok(empty_buf()) }())) } -fn remove_timer(i: *const isolate, timer_id: u32) { - let isolate = from_c(i); - isolate.timers.remove(&timer_id); +fn remove_timer(state: Arc<IsolateState>, timer_id: u32) { + let mut timers = state.timers.lock().unwrap(); + timers.remove(&timer_id); } // Prototype: https://github.com/ry/isolate/blob/golang/timers.go#L25-L39 -fn handle_timer_start(i: *const isolate, base: &msg::Base) -> Box<Op> { +fn handle_timer_start(state: Arc<IsolateState>, base: &msg::Base) -> Box<Op> { debug!("handle_timer_start"); let msg = base.msg_as_timer_start().unwrap(); let cmd_id = base.cmd_id(); let timer_id = msg.id(); let delay = msg.delay(); - let isolate = from_c(i); + let config2 = state.clone(); let future = { let (delay_task, cancel_delay) = set_timeout( move || { - remove_timer(i, timer_id); + remove_timer(config2, timer_id); }, delay, ); - isolate.timers.insert(timer_id, cancel_delay); + let mut timers = state.timers.lock().unwrap(); + timers.insert(timer_id, cancel_delay); delay_task }; - Box::new(future.then(move |result| { + let r = Box::new(future.then(move |result| { let builder = &mut FlatBufferBuilder::new(); let msg = msg::TimerReady::create( builder, @@ -657,20 +627,20 @@ fn handle_timer_start(i: *const isolate, base: &msg::Base) -> Box<Op> { ..Default::default() }, )) - })) + })); + r } // Prototype: https://github.com/ry/isolate/blob/golang/timers.go#L40-L43 -fn handle_timer_clear(i: *const isolate, base: &msg::Base) -> Box<Op> { +fn handle_timer_clear(state: Arc<IsolateState>, base: &msg::Base) -> Box<Op> { let msg = base.msg_as_timer_clear().unwrap(); debug!("handle_timer_clear"); - remove_timer(i, msg.id()); - ok_future(None) + remove_timer(state, msg.id()); + ok_future(empty_buf()) } -fn handle_rename(i: *const isolate, base: &msg::Base) -> Box<Op> { - let isolate = from_c(i); - if !isolate.flags.allow_write { +fn handle_rename(state: Arc<IsolateState>, base: &msg::Base) -> Box<Op> { + if !state.flags.allow_write { return odd_future(permission_denied()); } let msg = base.msg_as_rename().unwrap(); @@ -679,13 +649,12 @@ fn handle_rename(i: *const isolate, base: &msg::Base) -> Box<Op> { Box::new(futures::future::result(|| -> OpResult { debug!("handle_rename {} {}", oldpath, newpath); fs::rename(Path::new(&oldpath), Path::new(&newpath))?; - Ok(None) + Ok(empty_buf()) }())) } -fn handle_symlink(i: *const isolate, base: &msg::Base) -> Box<Op> { - let deno = from_c(i); - if !deno.flags.allow_write { +fn handle_symlink(state: Arc<IsolateState>, base: &msg::Base) -> Box<Op> { + if !state.flags.allow_write { return odd_future(permission_denied()); } // TODO Use type for Windows. @@ -699,12 +668,12 @@ fn handle_symlink(i: *const isolate, base: &msg::Base) -> Box<Op> { debug!("handle_symlink {} {}", oldname, newname); #[cfg(any(unix))] std::os::unix::fs::symlink(Path::new(&oldname), Path::new(&newname))?; - Ok(None) + Ok(empty_buf()) }())) } } -fn handle_read_link(_i: *const isolate, base: &msg::Base) -> Box<Op> { +fn handle_read_link(_state: Arc<IsolateState>, base: &msg::Base) -> Box<Op> { let msg = base.msg_as_readlink().unwrap(); let cmd_id = base.cmd_id(); let name = String::from(msg.name().unwrap()); |