diff options
author | Ryan Dahl <ry@tinyclouds.org> | 2018-09-27 17:33:10 -0400 |
---|---|---|
committer | GitHub <noreply@github.com> | 2018-09-27 17:33:10 -0400 |
commit | d38ccfc6dcb8643daa4f9e695d47a79cf068f90e (patch) | |
tree | d36ad2934e8550242d50e866f4ad2b6c303646b7 /src/isolate.rs | |
parent | bf93ca54dd85686c7b93a6189913e48e10de8dcf (diff) |
Support zero-copy data in libdeno.send(). (#838)
This is a large API refactor of deno.h which replaces
deno_send() and deno_set_response() with deno_respond().
It also adds a req_id parameter to the deno_recv_cb.
Make writeFile/writeFileSync use it.
Diffstat (limited to 'src/isolate.rs')
-rw-r--r-- | src/isolate.rs | 78 |
1 files changed, 52 insertions, 26 deletions
diff --git a/src/isolate.rs b/src/isolate.rs index 30e90be5c..8dfd4204a 100644 --- a/src/isolate.rs +++ b/src/isolate.rs @@ -36,12 +36,14 @@ pub type Buf = Box<[u8]>; pub type Op = Future<Item = Buf, Error = DenoError> + Send; // Returns (is_sync, op) -pub type Dispatch = fn(state: Arc<IsolateState>, buf: &[u8]) -> (bool, Box<Op>); +pub type Dispatch = + fn(state: Arc<IsolateState>, buf: &[u8], data_buf: &'static mut [u8]) + -> (bool, Box<Op>); pub struct Isolate { ptr: *const libdeno::isolate, dispatch: Dispatch, - rx: mpsc::Receiver<Buf>, + rx: mpsc::Receiver<(i32, Buf)>, ntasks: i32, pub state: Arc<IsolateState>, } @@ -54,17 +56,17 @@ pub struct IsolateState { pub timers: Mutex<HashMap<u32, futures::sync::oneshot::Sender<()>>>, pub argv: Vec<String>, pub flags: flags::DenoFlags, - tx: Mutex<Option<mpsc::Sender<Buf>>>, + tx: Mutex<Option<mpsc::Sender<(i32, Buf)>>>, } impl IsolateState { // Thread safe. - fn send_to_js(&self, buf: Buf) { + fn send_to_js(&self, req_id: i32, buf: Buf) { let mut g = self.tx.lock().unwrap(); let maybe_tx = g.as_mut(); assert!(maybe_tx.is_some(), "Expected tx to not be deleted."); let tx = maybe_tx.unwrap(); - tx.send(buf).expect("tx.send error"); + tx.send((req_id, buf)).expect("tx.send error"); } } @@ -79,7 +81,7 @@ impl Isolate { let (flags, argv_rest) = flags::set_flags(argv); // This channel handles sending async messages back to the runtime. - let (tx, rx) = mpsc::channel::<Buf>(); + let (tx, rx) = mpsc::channel::<(i32, Buf)>(); let mut isolate = Box::new(Isolate { ptr: 0 as *const libdeno::isolate, @@ -131,12 +133,10 @@ impl Isolate { Ok(()) } - pub fn set_response(&self, buf: Buf) { - unsafe { libdeno::deno_set_response(self.ptr, buf.into()) } - } - - pub fn send(&self, buf: Buf) { - unsafe { libdeno::deno_send(self.ptr, buf.into()) }; + pub fn respond(&self, req_id: i32, buf: Buf) { + // TODO(zero-copy) Use Buf::leak(buf) to leak the heap allocated buf. And + // don't do the memcpy in ImportBuf() (in libdeno/binding.cc) + unsafe { libdeno::deno_respond(self.ptr, req_id, buf.into()) } } // TODO Use Park abstraction? Note at time of writing Tokio default runtime @@ -144,12 +144,12 @@ impl Isolate { pub fn event_loop(&mut self) { // Main thread event loop. while !self.is_idle() { - let buf = self.rx.recv().unwrap(); + let (req_id, buf) = self.rx.recv().unwrap(); // Receiving a message on rx exactly corresponds to an async task // completing. self.ntasks_decrement(); // Call into JS with the buf. - self.send(buf); + self.respond(req_id, buf); } } @@ -189,18 +189,38 @@ impl From<Buf> for libdeno::deno_buf { } // Dereferences the C pointer into the Rust Isolate object. -extern "C" fn pre_dispatch(d: *const libdeno::isolate, buf: libdeno::deno_buf) { - let bytes = unsafe { std::slice::from_raw_parts(buf.data_ptr, buf.data_len) }; +extern "C" fn pre_dispatch( + d: *const libdeno::isolate, + req_id: i32, + control_buf: libdeno::deno_buf, + data_buf: libdeno::deno_buf, +) { + // control_buf is only valid for the lifetime of this call, thus is + // interpretted as a slice. + let control_slice = unsafe { + std::slice::from_raw_parts(control_buf.data_ptr, control_buf.data_len) + }; + + // data_buf is valid for the lifetime of the promise, thus a mutable buf with + // static lifetime. + let data_slice = unsafe { + std::slice::from_raw_parts_mut::<'static>( + data_buf.data_ptr, + data_buf.data_len, + ) + }; + let isolate = Isolate::from_c(d); let dispatch = isolate.dispatch; - let (is_sync, op) = dispatch(isolate.state.clone(), bytes); + let (is_sync, op) = + dispatch(isolate.state.clone(), control_slice, data_slice); if is_sync { // Execute op synchronously. let buf = tokio_util::block_on(op).unwrap(); if buf.len() != 0 { // Set the synchronous response, the value returned from isolate.send(). - isolate.set_response(buf); + isolate.respond(req_id, buf); } } else { // Execute op asynchronously. @@ -213,7 +233,7 @@ extern "C" fn pre_dispatch(d: *const libdeno::isolate, buf: libdeno::deno_buf) { let task = op .and_then(move |buf| { - state.send_to_js(buf); + state.send_to_js(req_id, buf); Ok(()) }).map_err(|_| ()); tokio::spawn(task); @@ -239,7 +259,8 @@ mod tests { fn unreachable_dispatch( _state: Arc<IsolateState>, - _buf: &[u8], + _control: &[u8], + _data: &'static mut [u8], ) -> (bool, Box<Op>) { unreachable!(); } @@ -267,14 +288,19 @@ mod tests { }); } - fn dispatch_sync(_state: Arc<IsolateState>, buf: &[u8]) -> (bool, Box<Op>) { - assert_eq!(buf[0], 4); - assert_eq!(buf[1], 5); - assert_eq!(buf[2], 6); + fn dispatch_sync( + _state: Arc<IsolateState>, + control: &[u8], + data: &'static mut [u8], + ) -> (bool, Box<Op>) { + assert_eq!(control[0], 4); + assert_eq!(control[1], 5); + assert_eq!(control[2], 6); + assert_eq!(data.len(), 0); // Send back some sync response. let vec: Vec<u8> = vec![1, 2, 3]; - let buf = vec.into_boxed_slice(); - let op = Box::new(futures::future::ok(buf)); + let control = vec.into_boxed_slice(); + let op = Box::new(futures::future::ok(control)); (true, op) } } |