diff options
author | Ryan Dahl <ry@tinyclouds.org> | 2018-09-27 17:33:10 -0400 |
---|---|---|
committer | GitHub <noreply@github.com> | 2018-09-27 17:33:10 -0400 |
commit | d38ccfc6dcb8643daa4f9e695d47a79cf068f90e (patch) | |
tree | d36ad2934e8550242d50e866f4ad2b6c303646b7 /src | |
parent | bf93ca54dd85686c7b93a6189913e48e10de8dcf (diff) |
Support zero-copy data in libdeno.send(). (#838)
This is a large API refactor of deno.h which replaces
deno_send() and deno_set_response() with deno_respond().
It also adds a req_id parameter to the deno_recv_cb.
Make writeFile/writeFileSync use it.
Diffstat (limited to 'src')
-rw-r--r-- | src/handlers.rs | 150 | ||||
-rw-r--r-- | src/isolate.rs | 78 | ||||
-rw-r--r-- | src/libdeno.rs | 10 | ||||
-rw-r--r-- | src/msg.fbs | 1 |
4 files changed, 183 insertions, 56 deletions
diff --git a/src/handlers.rs b/src/handlers.rs index 43097918d..62bdabb61 100644 --- a/src/handlers.rs +++ b/src/handlers.rs @@ -33,15 +33,21 @@ type OpResult = DenoResult<Buf>; // TODO Ideally we wouldn't have to box the Op being returned. // The box is just to make it easier to get a prototype refactor working. -type Handler = fn(state: Arc<IsolateState>, base: &msg::Base) -> Box<Op>; +type Handler = + fn(state: Arc<IsolateState>, base: &msg::Base, data: &'static mut [u8]) + -> Box<Op>; // Hopefully Rust optimizes this away. fn empty_buf() -> Buf { Box::new([]) } -pub fn msg_from_js(state: Arc<IsolateState>, bytes: &[u8]) -> (bool, Box<Op>) { - let base = msg::get_root_as_base(bytes); +pub fn msg_from_js( + state: Arc<IsolateState>, + control: &[u8], + data: &'static mut [u8], +) -> (bool, Box<Op>) { + let base = msg::get_root_as_base(control); let is_sync = base.sync(); let msg_type = base.msg_type(); let cmd_id = base.cmd_id(); @@ -71,7 +77,7 @@ pub fn msg_from_js(state: Arc<IsolateState>, bytes: &[u8]) -> (bool, Box<Op>) { )), }; - let op: Box<Op> = handler(state.clone(), &base); + let op: Box<Op> = handler(state.clone(), &base, data); let boxed_op = Box::new( op.or_else(move |err: DenoError| -> DenoResult<Buf> { debug!("op err {}", err); @@ -130,12 +136,21 @@ fn not_implemented() -> DenoError { )) } -fn handle_exit(_config: Arc<IsolateState>, base: &msg::Base) -> Box<Op> { +fn handle_exit( + _config: Arc<IsolateState>, + base: &msg::Base, + _data: &'static mut [u8], +) -> Box<Op> { let msg = base.msg_as_exit().unwrap(); std::process::exit(msg.code()) } -fn handle_start(state: Arc<IsolateState>, base: &msg::Base) -> Box<Op> { +fn handle_start( + state: Arc<IsolateState>, + base: &msg::Base, + data: &'static mut [u8], +) -> Box<Op> { + assert_eq!(data.len(), 0); let mut builder = FlatBufferBuilder::new(); let argv = state.argv.iter().map(|s| s.as_str()).collect::<Vec<_>>(); @@ -191,7 +206,12 @@ fn odd_future(err: DenoError) -> Box<Op> { } // https://github.com/denoland/isolate/blob/golang/os.go#L100-L154 -fn handle_code_fetch(state: Arc<IsolateState>, base: &msg::Base) -> Box<Op> { +fn handle_code_fetch( + state: Arc<IsolateState>, + base: &msg::Base, + data: &'static mut [u8], +) -> Box<Op> { + assert_eq!(data.len(), 0); let msg = base.msg_as_code_fetch().unwrap(); let cmd_id = base.cmd_id(); let module_specifier = msg.module_specifier().unwrap(); @@ -228,7 +248,12 @@ fn handle_code_fetch(state: Arc<IsolateState>, base: &msg::Base) -> Box<Op> { } // https://github.com/denoland/isolate/blob/golang/os.go#L156-L169 -fn handle_code_cache(state: Arc<IsolateState>, base: &msg::Base) -> Box<Op> { +fn handle_code_cache( + state: Arc<IsolateState>, + base: &msg::Base, + data: &'static mut [u8], +) -> Box<Op> { + assert_eq!(data.len(), 0); let msg = base.msg_as_code_cache().unwrap(); let filename = msg.filename().unwrap(); let source_code = msg.source_code().unwrap(); @@ -239,7 +264,12 @@ fn handle_code_cache(state: Arc<IsolateState>, base: &msg::Base) -> Box<Op> { }())) } -fn handle_set_timeout(state: Arc<IsolateState>, base: &msg::Base) -> Box<Op> { +fn handle_set_timeout( + state: Arc<IsolateState>, + base: &msg::Base, + data: &'static mut [u8], +) -> Box<Op> { + assert_eq!(data.len(), 0); let msg = base.msg_as_set_timeout().unwrap(); let val = msg.timeout() as isize; state @@ -248,7 +278,12 @@ fn handle_set_timeout(state: Arc<IsolateState>, base: &msg::Base) -> Box<Op> { ok_future(empty_buf()) } -fn handle_set_env(state: Arc<IsolateState>, base: &msg::Base) -> Box<Op> { +fn handle_set_env( + state: Arc<IsolateState>, + base: &msg::Base, + data: &'static mut [u8], +) -> Box<Op> { + assert_eq!(data.len(), 0); let msg = base.msg_as_set_env().unwrap(); let key = msg.key().unwrap(); let value = msg.value().unwrap(); @@ -261,7 +296,12 @@ fn handle_set_env(state: Arc<IsolateState>, base: &msg::Base) -> Box<Op> { ok_future(empty_buf()) } -fn handle_env(state: Arc<IsolateState>, base: &msg::Base) -> Box<Op> { +fn handle_env( + state: Arc<IsolateState>, + base: &msg::Base, + data: &'static mut [u8], +) -> Box<Op> { + assert_eq!(data.len(), 0); let cmd_id = base.cmd_id(); if !state.flags.allow_env { @@ -302,7 +342,12 @@ fn handle_env(state: Arc<IsolateState>, base: &msg::Base) -> Box<Op> { )) } -fn handle_fetch_req(state: Arc<IsolateState>, base: &msg::Base) -> Box<Op> { +fn handle_fetch_req( + state: Arc<IsolateState>, + base: &msg::Base, + data: &'static mut [u8], +) -> Box<Op> { + assert_eq!(data.len(), 0); let msg = base.msg_as_fetch_req().unwrap(); let cmd_id = base.cmd_id(); let id = msg.id(); @@ -436,7 +481,12 @@ macro_rules! blocking { }; } -fn handle_make_temp_dir(state: Arc<IsolateState>, base: &msg::Base) -> Box<Op> { +fn handle_make_temp_dir( + state: Arc<IsolateState>, + base: &msg::Base, + data: &'static mut [u8], +) -> Box<Op> { + assert_eq!(data.len(), 0); let base = Box::new(*base); let msg = base.msg_as_make_temp_dir().unwrap(); let cmd_id = base.cmd_id(); @@ -480,7 +530,12 @@ fn handle_make_temp_dir(state: Arc<IsolateState>, base: &msg::Base) -> Box<Op> { }) } -fn handle_mkdir(state: Arc<IsolateState>, base: &msg::Base) -> Box<Op> { +fn handle_mkdir( + state: Arc<IsolateState>, + base: &msg::Base, + data: &'static mut [u8], +) -> Box<Op> { + assert_eq!(data.len(), 0); let msg = base.msg_as_mkdir().unwrap(); let mode = msg.mode(); let path = String::from(msg.path().unwrap()); @@ -496,7 +551,12 @@ fn handle_mkdir(state: Arc<IsolateState>, base: &msg::Base) -> Box<Op> { }) } -fn handle_remove(state: Arc<IsolateState>, base: &msg::Base) -> Box<Op> { +fn handle_remove( + state: Arc<IsolateState>, + base: &msg::Base, + data: &'static mut [u8], +) -> Box<Op> { + assert_eq!(data.len(), 0); let msg = base.msg_as_remove().unwrap(); let path = PathBuf::from(msg.path().unwrap()); let recursive = msg.recursive(); @@ -520,7 +580,12 @@ fn handle_remove(state: Arc<IsolateState>, base: &msg::Base) -> Box<Op> { } // Prototype https://github.com/denoland/isolate/blob/golang/os.go#L171-L184 -fn handle_read_file(_config: Arc<IsolateState>, base: &msg::Base) -> Box<Op> { +fn handle_read_file( + _config: Arc<IsolateState>, + base: &msg::Base, + data: &'static mut [u8], +) -> Box<Op> { + assert_eq!(data.len(), 0); let msg = base.msg_as_read_file().unwrap(); let cmd_id = base.cmd_id(); let filename = PathBuf::from(msg.filename().unwrap()); @@ -570,7 +635,12 @@ fn get_mode(_perm: fs::Permissions) -> u32 { 0 } -fn handle_stat(_config: Arc<IsolateState>, base: &msg::Base) -> Box<Op> { +fn handle_stat( + _config: Arc<IsolateState>, + base: &msg::Base, + data: &'static mut [u8], +) -> Box<Op> { + assert_eq!(data.len(), 0); let msg = base.msg_as_stat().unwrap(); let cmd_id = base.cmd_id(); let filename = PathBuf::from(msg.filename().unwrap()); @@ -612,7 +682,11 @@ fn handle_stat(_config: Arc<IsolateState>, base: &msg::Base) -> Box<Op> { }) } -fn handle_write_file(state: Arc<IsolateState>, base: &msg::Base) -> Box<Op> { +fn handle_write_file( + state: Arc<IsolateState>, + base: &msg::Base, + data: &'static mut [u8], +) -> Box<Op> { let msg = base.msg_as_write_file().unwrap(); if !state.flags.allow_write { @@ -620,12 +694,11 @@ fn handle_write_file(state: Arc<IsolateState>, base: &msg::Base) -> Box<Op> { } let filename = String::from(msg.filename().unwrap()); - let data = Vec::from(msg.data().unwrap()); let perm = msg.perm(); blocking!(base.sync(), || -> OpResult { - debug!("handle_write_file {}", filename); - deno_fs::write_file(Path::new(&filename), data.as_slice(), perm)?; + debug!("handle_write_file {} {}", filename, data.len()); + deno_fs::write_file(Path::new(&filename), data, perm)?; Ok(empty_buf()) }) } @@ -636,7 +709,12 @@ fn remove_timer(state: Arc<IsolateState>, timer_id: u32) { } // Prototype: https://github.com/ry/isolate/blob/golang/timers.go#L25-L39 -fn handle_timer_start(state: Arc<IsolateState>, base: &msg::Base) -> Box<Op> { +fn handle_timer_start( + state: Arc<IsolateState>, + base: &msg::Base, + data: &'static mut [u8], +) -> Box<Op> { + assert_eq!(data.len(), 0); debug!("handle_timer_start"); let msg = base.msg_as_timer_start().unwrap(); let cmd_id = base.cmd_id(); @@ -679,14 +757,24 @@ fn handle_timer_start(state: Arc<IsolateState>, base: &msg::Base) -> Box<Op> { } // Prototype: https://github.com/ry/isolate/blob/golang/timers.go#L40-L43 -fn handle_timer_clear(state: Arc<IsolateState>, base: &msg::Base) -> Box<Op> { +fn handle_timer_clear( + state: Arc<IsolateState>, + base: &msg::Base, + data: &'static mut [u8], +) -> Box<Op> { + assert_eq!(data.len(), 0); let msg = base.msg_as_timer_clear().unwrap(); debug!("handle_timer_clear"); remove_timer(state, msg.id()); ok_future(empty_buf()) } -fn handle_rename(state: Arc<IsolateState>, base: &msg::Base) -> Box<Op> { +fn handle_rename( + state: Arc<IsolateState>, + base: &msg::Base, + data: &'static mut [u8], +) -> Box<Op> { + assert_eq!(data.len(), 0); if !state.flags.allow_write { return odd_future(permission_denied()); } @@ -700,7 +788,12 @@ fn handle_rename(state: Arc<IsolateState>, base: &msg::Base) -> Box<Op> { }) } -fn handle_symlink(state: Arc<IsolateState>, base: &msg::Base) -> Box<Op> { +fn handle_symlink( + state: Arc<IsolateState>, + base: &msg::Base, + data: &'static mut [u8], +) -> Box<Op> { + assert_eq!(data.len(), 0); if !state.flags.allow_write { return odd_future(permission_denied()); } @@ -720,7 +813,12 @@ fn handle_symlink(state: Arc<IsolateState>, base: &msg::Base) -> Box<Op> { }) } -fn handle_read_link(_state: Arc<IsolateState>, base: &msg::Base) -> Box<Op> { +fn handle_read_link( + _state: Arc<IsolateState>, + base: &msg::Base, + data: &'static mut [u8], +) -> Box<Op> { + assert_eq!(data.len(), 0); let msg = base.msg_as_readlink().unwrap(); let cmd_id = base.cmd_id(); let name = PathBuf::from(msg.name().unwrap()); diff --git a/src/isolate.rs b/src/isolate.rs index 30e90be5c..8dfd4204a 100644 --- a/src/isolate.rs +++ b/src/isolate.rs @@ -36,12 +36,14 @@ pub type Buf = Box<[u8]>; pub type Op = Future<Item = Buf, Error = DenoError> + Send; // Returns (is_sync, op) -pub type Dispatch = fn(state: Arc<IsolateState>, buf: &[u8]) -> (bool, Box<Op>); +pub type Dispatch = + fn(state: Arc<IsolateState>, buf: &[u8], data_buf: &'static mut [u8]) + -> (bool, Box<Op>); pub struct Isolate { ptr: *const libdeno::isolate, dispatch: Dispatch, - rx: mpsc::Receiver<Buf>, + rx: mpsc::Receiver<(i32, Buf)>, ntasks: i32, pub state: Arc<IsolateState>, } @@ -54,17 +56,17 @@ pub struct IsolateState { pub timers: Mutex<HashMap<u32, futures::sync::oneshot::Sender<()>>>, pub argv: Vec<String>, pub flags: flags::DenoFlags, - tx: Mutex<Option<mpsc::Sender<Buf>>>, + tx: Mutex<Option<mpsc::Sender<(i32, Buf)>>>, } impl IsolateState { // Thread safe. - fn send_to_js(&self, buf: Buf) { + fn send_to_js(&self, req_id: i32, buf: Buf) { let mut g = self.tx.lock().unwrap(); let maybe_tx = g.as_mut(); assert!(maybe_tx.is_some(), "Expected tx to not be deleted."); let tx = maybe_tx.unwrap(); - tx.send(buf).expect("tx.send error"); + tx.send((req_id, buf)).expect("tx.send error"); } } @@ -79,7 +81,7 @@ impl Isolate { let (flags, argv_rest) = flags::set_flags(argv); // This channel handles sending async messages back to the runtime. - let (tx, rx) = mpsc::channel::<Buf>(); + let (tx, rx) = mpsc::channel::<(i32, Buf)>(); let mut isolate = Box::new(Isolate { ptr: 0 as *const libdeno::isolate, @@ -131,12 +133,10 @@ impl Isolate { Ok(()) } - pub fn set_response(&self, buf: Buf) { - unsafe { libdeno::deno_set_response(self.ptr, buf.into()) } - } - - pub fn send(&self, buf: Buf) { - unsafe { libdeno::deno_send(self.ptr, buf.into()) }; + pub fn respond(&self, req_id: i32, buf: Buf) { + // TODO(zero-copy) Use Buf::leak(buf) to leak the heap allocated buf. And + // don't do the memcpy in ImportBuf() (in libdeno/binding.cc) + unsafe { libdeno::deno_respond(self.ptr, req_id, buf.into()) } } // TODO Use Park abstraction? Note at time of writing Tokio default runtime @@ -144,12 +144,12 @@ impl Isolate { pub fn event_loop(&mut self) { // Main thread event loop. while !self.is_idle() { - let buf = self.rx.recv().unwrap(); + let (req_id, buf) = self.rx.recv().unwrap(); // Receiving a message on rx exactly corresponds to an async task // completing. self.ntasks_decrement(); // Call into JS with the buf. - self.send(buf); + self.respond(req_id, buf); } } @@ -189,18 +189,38 @@ impl From<Buf> for libdeno::deno_buf { } // Dereferences the C pointer into the Rust Isolate object. -extern "C" fn pre_dispatch(d: *const libdeno::isolate, buf: libdeno::deno_buf) { - let bytes = unsafe { std::slice::from_raw_parts(buf.data_ptr, buf.data_len) }; +extern "C" fn pre_dispatch( + d: *const libdeno::isolate, + req_id: i32, + control_buf: libdeno::deno_buf, + data_buf: libdeno::deno_buf, +) { + // control_buf is only valid for the lifetime of this call, thus is + // interpretted as a slice. + let control_slice = unsafe { + std::slice::from_raw_parts(control_buf.data_ptr, control_buf.data_len) + }; + + // data_buf is valid for the lifetime of the promise, thus a mutable buf with + // static lifetime. + let data_slice = unsafe { + std::slice::from_raw_parts_mut::<'static>( + data_buf.data_ptr, + data_buf.data_len, + ) + }; + let isolate = Isolate::from_c(d); let dispatch = isolate.dispatch; - let (is_sync, op) = dispatch(isolate.state.clone(), bytes); + let (is_sync, op) = + dispatch(isolate.state.clone(), control_slice, data_slice); if is_sync { // Execute op synchronously. let buf = tokio_util::block_on(op).unwrap(); if buf.len() != 0 { // Set the synchronous response, the value returned from isolate.send(). - isolate.set_response(buf); + isolate.respond(req_id, buf); } } else { // Execute op asynchronously. @@ -213,7 +233,7 @@ extern "C" fn pre_dispatch(d: *const libdeno::isolate, buf: libdeno::deno_buf) { let task = op .and_then(move |buf| { - state.send_to_js(buf); + state.send_to_js(req_id, buf); Ok(()) }).map_err(|_| ()); tokio::spawn(task); @@ -239,7 +259,8 @@ mod tests { fn unreachable_dispatch( _state: Arc<IsolateState>, - _buf: &[u8], + _control: &[u8], + _data: &'static mut [u8], ) -> (bool, Box<Op>) { unreachable!(); } @@ -267,14 +288,19 @@ mod tests { }); } - fn dispatch_sync(_state: Arc<IsolateState>, buf: &[u8]) -> (bool, Box<Op>) { - assert_eq!(buf[0], 4); - assert_eq!(buf[1], 5); - assert_eq!(buf[2], 6); + fn dispatch_sync( + _state: Arc<IsolateState>, + control: &[u8], + data: &'static mut [u8], + ) -> (bool, Box<Op>) { + assert_eq!(control[0], 4); + assert_eq!(control[1], 5); + assert_eq!(control[2], 6); + assert_eq!(data.len(), 0); // Send back some sync response. let vec: Vec<u8> = vec![1, 2, 3]; - let buf = vec.into_boxed_slice(); - let op = Box::new(futures::future::ok(buf)); + let control = vec.into_boxed_slice(); + let op = Box::new(futures::future::ok(control)); (true, op) } } diff --git a/src/libdeno.rs b/src/libdeno.rs index 637b3ef91..c7a03e3bb 100644 --- a/src/libdeno.rs +++ b/src/libdeno.rs @@ -19,7 +19,12 @@ pub struct deno_buf { pub data_len: usize, } -type DenoRecvCb = unsafe extern "C" fn(d: *const isolate, buf: deno_buf); +type DenoRecvCb = unsafe extern "C" fn( + d: *const isolate, + req_id: i32, + buf: deno_buf, + data_buf: deno_buf, +); extern "C" { pub fn deno_init(); @@ -29,8 +34,7 @@ extern "C" { pub fn deno_delete(i: *const isolate); pub fn deno_last_exception(i: *const isolate) -> *const c_char; pub fn deno_get_data(i: *const isolate) -> *const c_void; - pub fn deno_set_response(i: *const isolate, buf: deno_buf); - pub fn deno_send(i: *const isolate, buf: deno_buf); + pub fn deno_respond(i: *const isolate, req_id: i32, buf: deno_buf); pub fn deno_execute( i: *const isolate, js_filename: *const c_char, diff --git a/src/msg.fbs b/src/msg.fbs index 52eba8ca6..0d78395ea 100644 --- a/src/msg.fbs +++ b/src/msg.fbs @@ -199,7 +199,6 @@ table ReadFileRes { table WriteFile { filename: string; - data: [ubyte]; perm: uint; // perm specified by https://godoc.org/os#FileMode } |