summaryrefslogtreecommitdiff
path: root/src/isolate.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/isolate.rs')
-rw-r--r--src/isolate.rs78
1 files changed, 52 insertions, 26 deletions
diff --git a/src/isolate.rs b/src/isolate.rs
index 30e90be5c..8dfd4204a 100644
--- a/src/isolate.rs
+++ b/src/isolate.rs
@@ -36,12 +36,14 @@ pub type Buf = Box<[u8]>;
pub type Op = Future<Item = Buf, Error = DenoError> + Send;
// Returns (is_sync, op)
-pub type Dispatch = fn(state: Arc<IsolateState>, buf: &[u8]) -> (bool, Box<Op>);
+pub type Dispatch =
+ fn(state: Arc<IsolateState>, buf: &[u8], data_buf: &'static mut [u8])
+ -> (bool, Box<Op>);
pub struct Isolate {
ptr: *const libdeno::isolate,
dispatch: Dispatch,
- rx: mpsc::Receiver<Buf>,
+ rx: mpsc::Receiver<(i32, Buf)>,
ntasks: i32,
pub state: Arc<IsolateState>,
}
@@ -54,17 +56,17 @@ pub struct IsolateState {
pub timers: Mutex<HashMap<u32, futures::sync::oneshot::Sender<()>>>,
pub argv: Vec<String>,
pub flags: flags::DenoFlags,
- tx: Mutex<Option<mpsc::Sender<Buf>>>,
+ tx: Mutex<Option<mpsc::Sender<(i32, Buf)>>>,
}
impl IsolateState {
// Thread safe.
- fn send_to_js(&self, buf: Buf) {
+ fn send_to_js(&self, req_id: i32, buf: Buf) {
let mut g = self.tx.lock().unwrap();
let maybe_tx = g.as_mut();
assert!(maybe_tx.is_some(), "Expected tx to not be deleted.");
let tx = maybe_tx.unwrap();
- tx.send(buf).expect("tx.send error");
+ tx.send((req_id, buf)).expect("tx.send error");
}
}
@@ -79,7 +81,7 @@ impl Isolate {
let (flags, argv_rest) = flags::set_flags(argv);
// This channel handles sending async messages back to the runtime.
- let (tx, rx) = mpsc::channel::<Buf>();
+ let (tx, rx) = mpsc::channel::<(i32, Buf)>();
let mut isolate = Box::new(Isolate {
ptr: 0 as *const libdeno::isolate,
@@ -131,12 +133,10 @@ impl Isolate {
Ok(())
}
- pub fn set_response(&self, buf: Buf) {
- unsafe { libdeno::deno_set_response(self.ptr, buf.into()) }
- }
-
- pub fn send(&self, buf: Buf) {
- unsafe { libdeno::deno_send(self.ptr, buf.into()) };
+ pub fn respond(&self, req_id: i32, buf: Buf) {
+ // TODO(zero-copy) Use Buf::leak(buf) to leak the heap allocated buf. And
+ // don't do the memcpy in ImportBuf() (in libdeno/binding.cc)
+ unsafe { libdeno::deno_respond(self.ptr, req_id, buf.into()) }
}
// TODO Use Park abstraction? Note at time of writing Tokio default runtime
@@ -144,12 +144,12 @@ impl Isolate {
pub fn event_loop(&mut self) {
// Main thread event loop.
while !self.is_idle() {
- let buf = self.rx.recv().unwrap();
+ let (req_id, buf) = self.rx.recv().unwrap();
// Receiving a message on rx exactly corresponds to an async task
// completing.
self.ntasks_decrement();
// Call into JS with the buf.
- self.send(buf);
+ self.respond(req_id, buf);
}
}
@@ -189,18 +189,38 @@ impl From<Buf> for libdeno::deno_buf {
}
// Dereferences the C pointer into the Rust Isolate object.
-extern "C" fn pre_dispatch(d: *const libdeno::isolate, buf: libdeno::deno_buf) {
- let bytes = unsafe { std::slice::from_raw_parts(buf.data_ptr, buf.data_len) };
+extern "C" fn pre_dispatch(
+ d: *const libdeno::isolate,
+ req_id: i32,
+ control_buf: libdeno::deno_buf,
+ data_buf: libdeno::deno_buf,
+) {
+ // control_buf is only valid for the lifetime of this call, thus is
+ // interpretted as a slice.
+ let control_slice = unsafe {
+ std::slice::from_raw_parts(control_buf.data_ptr, control_buf.data_len)
+ };
+
+ // data_buf is valid for the lifetime of the promise, thus a mutable buf with
+ // static lifetime.
+ let data_slice = unsafe {
+ std::slice::from_raw_parts_mut::<'static>(
+ data_buf.data_ptr,
+ data_buf.data_len,
+ )
+ };
+
let isolate = Isolate::from_c(d);
let dispatch = isolate.dispatch;
- let (is_sync, op) = dispatch(isolate.state.clone(), bytes);
+ let (is_sync, op) =
+ dispatch(isolate.state.clone(), control_slice, data_slice);
if is_sync {
// Execute op synchronously.
let buf = tokio_util::block_on(op).unwrap();
if buf.len() != 0 {
// Set the synchronous response, the value returned from isolate.send().
- isolate.set_response(buf);
+ isolate.respond(req_id, buf);
}
} else {
// Execute op asynchronously.
@@ -213,7 +233,7 @@ extern "C" fn pre_dispatch(d: *const libdeno::isolate, buf: libdeno::deno_buf) {
let task = op
.and_then(move |buf| {
- state.send_to_js(buf);
+ state.send_to_js(req_id, buf);
Ok(())
}).map_err(|_| ());
tokio::spawn(task);
@@ -239,7 +259,8 @@ mod tests {
fn unreachable_dispatch(
_state: Arc<IsolateState>,
- _buf: &[u8],
+ _control: &[u8],
+ _data: &'static mut [u8],
) -> (bool, Box<Op>) {
unreachable!();
}
@@ -267,14 +288,19 @@ mod tests {
});
}
- fn dispatch_sync(_state: Arc<IsolateState>, buf: &[u8]) -> (bool, Box<Op>) {
- assert_eq!(buf[0], 4);
- assert_eq!(buf[1], 5);
- assert_eq!(buf[2], 6);
+ fn dispatch_sync(
+ _state: Arc<IsolateState>,
+ control: &[u8],
+ data: &'static mut [u8],
+ ) -> (bool, Box<Op>) {
+ assert_eq!(control[0], 4);
+ assert_eq!(control[1], 5);
+ assert_eq!(control[2], 6);
+ assert_eq!(data.len(), 0);
// Send back some sync response.
let vec: Vec<u8> = vec![1, 2, 3];
- let buf = vec.into_boxed_slice();
- let op = Box::new(futures::future::ok(buf));
+ let control = vec.into_boxed_slice();
+ let op = Box::new(futures::future::ok(control));
(true, op)
}
}