summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorRyan Dahl <ry@tinyclouds.org>2018-09-27 17:33:10 -0400
committerGitHub <noreply@github.com>2018-09-27 17:33:10 -0400
commitd38ccfc6dcb8643daa4f9e695d47a79cf068f90e (patch)
treed36ad2934e8550242d50e866f4ad2b6c303646b7 /src
parentbf93ca54dd85686c7b93a6189913e48e10de8dcf (diff)
Support zero-copy data in libdeno.send(). (#838)
This is a large API refactor of deno.h which replaces deno_send() and deno_set_response() with deno_respond(). It also adds a req_id parameter to the deno_recv_cb. Make writeFile/writeFileSync use it.
Diffstat (limited to 'src')
-rw-r--r--src/handlers.rs150
-rw-r--r--src/isolate.rs78
-rw-r--r--src/libdeno.rs10
-rw-r--r--src/msg.fbs1
4 files changed, 183 insertions, 56 deletions
diff --git a/src/handlers.rs b/src/handlers.rs
index 43097918d..62bdabb61 100644
--- a/src/handlers.rs
+++ b/src/handlers.rs
@@ -33,15 +33,21 @@ type OpResult = DenoResult<Buf>;
// TODO Ideally we wouldn't have to box the Op being returned.
// The box is just to make it easier to get a prototype refactor working.
-type Handler = fn(state: Arc<IsolateState>, base: &msg::Base) -> Box<Op>;
+type Handler =
+ fn(state: Arc<IsolateState>, base: &msg::Base, data: &'static mut [u8])
+ -> Box<Op>;
// Hopefully Rust optimizes this away.
fn empty_buf() -> Buf {
Box::new([])
}
-pub fn msg_from_js(state: Arc<IsolateState>, bytes: &[u8]) -> (bool, Box<Op>) {
- let base = msg::get_root_as_base(bytes);
+pub fn msg_from_js(
+ state: Arc<IsolateState>,
+ control: &[u8],
+ data: &'static mut [u8],
+) -> (bool, Box<Op>) {
+ let base = msg::get_root_as_base(control);
let is_sync = base.sync();
let msg_type = base.msg_type();
let cmd_id = base.cmd_id();
@@ -71,7 +77,7 @@ pub fn msg_from_js(state: Arc<IsolateState>, bytes: &[u8]) -> (bool, Box<Op>) {
)),
};
- let op: Box<Op> = handler(state.clone(), &base);
+ let op: Box<Op> = handler(state.clone(), &base, data);
let boxed_op = Box::new(
op.or_else(move |err: DenoError| -> DenoResult<Buf> {
debug!("op err {}", err);
@@ -130,12 +136,21 @@ fn not_implemented() -> DenoError {
))
}
-fn handle_exit(_config: Arc<IsolateState>, base: &msg::Base) -> Box<Op> {
+fn handle_exit(
+ _config: Arc<IsolateState>,
+ base: &msg::Base,
+ _data: &'static mut [u8],
+) -> Box<Op> {
let msg = base.msg_as_exit().unwrap();
std::process::exit(msg.code())
}
-fn handle_start(state: Arc<IsolateState>, base: &msg::Base) -> Box<Op> {
+fn handle_start(
+ state: Arc<IsolateState>,
+ base: &msg::Base,
+ data: &'static mut [u8],
+) -> Box<Op> {
+ assert_eq!(data.len(), 0);
let mut builder = FlatBufferBuilder::new();
let argv = state.argv.iter().map(|s| s.as_str()).collect::<Vec<_>>();
@@ -191,7 +206,12 @@ fn odd_future(err: DenoError) -> Box<Op> {
}
// https://github.com/denoland/isolate/blob/golang/os.go#L100-L154
-fn handle_code_fetch(state: Arc<IsolateState>, base: &msg::Base) -> Box<Op> {
+fn handle_code_fetch(
+ state: Arc<IsolateState>,
+ base: &msg::Base,
+ data: &'static mut [u8],
+) -> Box<Op> {
+ assert_eq!(data.len(), 0);
let msg = base.msg_as_code_fetch().unwrap();
let cmd_id = base.cmd_id();
let module_specifier = msg.module_specifier().unwrap();
@@ -228,7 +248,12 @@ fn handle_code_fetch(state: Arc<IsolateState>, base: &msg::Base) -> Box<Op> {
}
// https://github.com/denoland/isolate/blob/golang/os.go#L156-L169
-fn handle_code_cache(state: Arc<IsolateState>, base: &msg::Base) -> Box<Op> {
+fn handle_code_cache(
+ state: Arc<IsolateState>,
+ base: &msg::Base,
+ data: &'static mut [u8],
+) -> Box<Op> {
+ assert_eq!(data.len(), 0);
let msg = base.msg_as_code_cache().unwrap();
let filename = msg.filename().unwrap();
let source_code = msg.source_code().unwrap();
@@ -239,7 +264,12 @@ fn handle_code_cache(state: Arc<IsolateState>, base: &msg::Base) -> Box<Op> {
}()))
}
-fn handle_set_timeout(state: Arc<IsolateState>, base: &msg::Base) -> Box<Op> {
+fn handle_set_timeout(
+ state: Arc<IsolateState>,
+ base: &msg::Base,
+ data: &'static mut [u8],
+) -> Box<Op> {
+ assert_eq!(data.len(), 0);
let msg = base.msg_as_set_timeout().unwrap();
let val = msg.timeout() as isize;
state
@@ -248,7 +278,12 @@ fn handle_set_timeout(state: Arc<IsolateState>, base: &msg::Base) -> Box<Op> {
ok_future(empty_buf())
}
-fn handle_set_env(state: Arc<IsolateState>, base: &msg::Base) -> Box<Op> {
+fn handle_set_env(
+ state: Arc<IsolateState>,
+ base: &msg::Base,
+ data: &'static mut [u8],
+) -> Box<Op> {
+ assert_eq!(data.len(), 0);
let msg = base.msg_as_set_env().unwrap();
let key = msg.key().unwrap();
let value = msg.value().unwrap();
@@ -261,7 +296,12 @@ fn handle_set_env(state: Arc<IsolateState>, base: &msg::Base) -> Box<Op> {
ok_future(empty_buf())
}
-fn handle_env(state: Arc<IsolateState>, base: &msg::Base) -> Box<Op> {
+fn handle_env(
+ state: Arc<IsolateState>,
+ base: &msg::Base,
+ data: &'static mut [u8],
+) -> Box<Op> {
+ assert_eq!(data.len(), 0);
let cmd_id = base.cmd_id();
if !state.flags.allow_env {
@@ -302,7 +342,12 @@ fn handle_env(state: Arc<IsolateState>, base: &msg::Base) -> Box<Op> {
))
}
-fn handle_fetch_req(state: Arc<IsolateState>, base: &msg::Base) -> Box<Op> {
+fn handle_fetch_req(
+ state: Arc<IsolateState>,
+ base: &msg::Base,
+ data: &'static mut [u8],
+) -> Box<Op> {
+ assert_eq!(data.len(), 0);
let msg = base.msg_as_fetch_req().unwrap();
let cmd_id = base.cmd_id();
let id = msg.id();
@@ -436,7 +481,12 @@ macro_rules! blocking {
};
}
-fn handle_make_temp_dir(state: Arc<IsolateState>, base: &msg::Base) -> Box<Op> {
+fn handle_make_temp_dir(
+ state: Arc<IsolateState>,
+ base: &msg::Base,
+ data: &'static mut [u8],
+) -> Box<Op> {
+ assert_eq!(data.len(), 0);
let base = Box::new(*base);
let msg = base.msg_as_make_temp_dir().unwrap();
let cmd_id = base.cmd_id();
@@ -480,7 +530,12 @@ fn handle_make_temp_dir(state: Arc<IsolateState>, base: &msg::Base) -> Box<Op> {
})
}
-fn handle_mkdir(state: Arc<IsolateState>, base: &msg::Base) -> Box<Op> {
+fn handle_mkdir(
+ state: Arc<IsolateState>,
+ base: &msg::Base,
+ data: &'static mut [u8],
+) -> Box<Op> {
+ assert_eq!(data.len(), 0);
let msg = base.msg_as_mkdir().unwrap();
let mode = msg.mode();
let path = String::from(msg.path().unwrap());
@@ -496,7 +551,12 @@ fn handle_mkdir(state: Arc<IsolateState>, base: &msg::Base) -> Box<Op> {
})
}
-fn handle_remove(state: Arc<IsolateState>, base: &msg::Base) -> Box<Op> {
+fn handle_remove(
+ state: Arc<IsolateState>,
+ base: &msg::Base,
+ data: &'static mut [u8],
+) -> Box<Op> {
+ assert_eq!(data.len(), 0);
let msg = base.msg_as_remove().unwrap();
let path = PathBuf::from(msg.path().unwrap());
let recursive = msg.recursive();
@@ -520,7 +580,12 @@ fn handle_remove(state: Arc<IsolateState>, base: &msg::Base) -> Box<Op> {
}
// Prototype https://github.com/denoland/isolate/blob/golang/os.go#L171-L184
-fn handle_read_file(_config: Arc<IsolateState>, base: &msg::Base) -> Box<Op> {
+fn handle_read_file(
+ _config: Arc<IsolateState>,
+ base: &msg::Base,
+ data: &'static mut [u8],
+) -> Box<Op> {
+ assert_eq!(data.len(), 0);
let msg = base.msg_as_read_file().unwrap();
let cmd_id = base.cmd_id();
let filename = PathBuf::from(msg.filename().unwrap());
@@ -570,7 +635,12 @@ fn get_mode(_perm: fs::Permissions) -> u32 {
0
}
-fn handle_stat(_config: Arc<IsolateState>, base: &msg::Base) -> Box<Op> {
+fn handle_stat(
+ _config: Arc<IsolateState>,
+ base: &msg::Base,
+ data: &'static mut [u8],
+) -> Box<Op> {
+ assert_eq!(data.len(), 0);
let msg = base.msg_as_stat().unwrap();
let cmd_id = base.cmd_id();
let filename = PathBuf::from(msg.filename().unwrap());
@@ -612,7 +682,11 @@ fn handle_stat(_config: Arc<IsolateState>, base: &msg::Base) -> Box<Op> {
})
}
-fn handle_write_file(state: Arc<IsolateState>, base: &msg::Base) -> Box<Op> {
+fn handle_write_file(
+ state: Arc<IsolateState>,
+ base: &msg::Base,
+ data: &'static mut [u8],
+) -> Box<Op> {
let msg = base.msg_as_write_file().unwrap();
if !state.flags.allow_write {
@@ -620,12 +694,11 @@ fn handle_write_file(state: Arc<IsolateState>, base: &msg::Base) -> Box<Op> {
}
let filename = String::from(msg.filename().unwrap());
- let data = Vec::from(msg.data().unwrap());
let perm = msg.perm();
blocking!(base.sync(), || -> OpResult {
- debug!("handle_write_file {}", filename);
- deno_fs::write_file(Path::new(&filename), data.as_slice(), perm)?;
+ debug!("handle_write_file {} {}", filename, data.len());
+ deno_fs::write_file(Path::new(&filename), data, perm)?;
Ok(empty_buf())
})
}
@@ -636,7 +709,12 @@ fn remove_timer(state: Arc<IsolateState>, timer_id: u32) {
}
// Prototype: https://github.com/ry/isolate/blob/golang/timers.go#L25-L39
-fn handle_timer_start(state: Arc<IsolateState>, base: &msg::Base) -> Box<Op> {
+fn handle_timer_start(
+ state: Arc<IsolateState>,
+ base: &msg::Base,
+ data: &'static mut [u8],
+) -> Box<Op> {
+ assert_eq!(data.len(), 0);
debug!("handle_timer_start");
let msg = base.msg_as_timer_start().unwrap();
let cmd_id = base.cmd_id();
@@ -679,14 +757,24 @@ fn handle_timer_start(state: Arc<IsolateState>, base: &msg::Base) -> Box<Op> {
}
// Prototype: https://github.com/ry/isolate/blob/golang/timers.go#L40-L43
-fn handle_timer_clear(state: Arc<IsolateState>, base: &msg::Base) -> Box<Op> {
+fn handle_timer_clear(
+ state: Arc<IsolateState>,
+ base: &msg::Base,
+ data: &'static mut [u8],
+) -> Box<Op> {
+ assert_eq!(data.len(), 0);
let msg = base.msg_as_timer_clear().unwrap();
debug!("handle_timer_clear");
remove_timer(state, msg.id());
ok_future(empty_buf())
}
-fn handle_rename(state: Arc<IsolateState>, base: &msg::Base) -> Box<Op> {
+fn handle_rename(
+ state: Arc<IsolateState>,
+ base: &msg::Base,
+ data: &'static mut [u8],
+) -> Box<Op> {
+ assert_eq!(data.len(), 0);
if !state.flags.allow_write {
return odd_future(permission_denied());
}
@@ -700,7 +788,12 @@ fn handle_rename(state: Arc<IsolateState>, base: &msg::Base) -> Box<Op> {
})
}
-fn handle_symlink(state: Arc<IsolateState>, base: &msg::Base) -> Box<Op> {
+fn handle_symlink(
+ state: Arc<IsolateState>,
+ base: &msg::Base,
+ data: &'static mut [u8],
+) -> Box<Op> {
+ assert_eq!(data.len(), 0);
if !state.flags.allow_write {
return odd_future(permission_denied());
}
@@ -720,7 +813,12 @@ fn handle_symlink(state: Arc<IsolateState>, base: &msg::Base) -> Box<Op> {
})
}
-fn handle_read_link(_state: Arc<IsolateState>, base: &msg::Base) -> Box<Op> {
+fn handle_read_link(
+ _state: Arc<IsolateState>,
+ base: &msg::Base,
+ data: &'static mut [u8],
+) -> Box<Op> {
+ assert_eq!(data.len(), 0);
let msg = base.msg_as_readlink().unwrap();
let cmd_id = base.cmd_id();
let name = PathBuf::from(msg.name().unwrap());
diff --git a/src/isolate.rs b/src/isolate.rs
index 30e90be5c..8dfd4204a 100644
--- a/src/isolate.rs
+++ b/src/isolate.rs
@@ -36,12 +36,14 @@ pub type Buf = Box<[u8]>;
pub type Op = Future<Item = Buf, Error = DenoError> + Send;
// Returns (is_sync, op)
-pub type Dispatch = fn(state: Arc<IsolateState>, buf: &[u8]) -> (bool, Box<Op>);
+pub type Dispatch =
+ fn(state: Arc<IsolateState>, buf: &[u8], data_buf: &'static mut [u8])
+ -> (bool, Box<Op>);
pub struct Isolate {
ptr: *const libdeno::isolate,
dispatch: Dispatch,
- rx: mpsc::Receiver<Buf>,
+ rx: mpsc::Receiver<(i32, Buf)>,
ntasks: i32,
pub state: Arc<IsolateState>,
}
@@ -54,17 +56,17 @@ pub struct IsolateState {
pub timers: Mutex<HashMap<u32, futures::sync::oneshot::Sender<()>>>,
pub argv: Vec<String>,
pub flags: flags::DenoFlags,
- tx: Mutex<Option<mpsc::Sender<Buf>>>,
+ tx: Mutex<Option<mpsc::Sender<(i32, Buf)>>>,
}
impl IsolateState {
// Thread safe.
- fn send_to_js(&self, buf: Buf) {
+ fn send_to_js(&self, req_id: i32, buf: Buf) {
let mut g = self.tx.lock().unwrap();
let maybe_tx = g.as_mut();
assert!(maybe_tx.is_some(), "Expected tx to not be deleted.");
let tx = maybe_tx.unwrap();
- tx.send(buf).expect("tx.send error");
+ tx.send((req_id, buf)).expect("tx.send error");
}
}
@@ -79,7 +81,7 @@ impl Isolate {
let (flags, argv_rest) = flags::set_flags(argv);
// This channel handles sending async messages back to the runtime.
- let (tx, rx) = mpsc::channel::<Buf>();
+ let (tx, rx) = mpsc::channel::<(i32, Buf)>();
let mut isolate = Box::new(Isolate {
ptr: 0 as *const libdeno::isolate,
@@ -131,12 +133,10 @@ impl Isolate {
Ok(())
}
- pub fn set_response(&self, buf: Buf) {
- unsafe { libdeno::deno_set_response(self.ptr, buf.into()) }
- }
-
- pub fn send(&self, buf: Buf) {
- unsafe { libdeno::deno_send(self.ptr, buf.into()) };
+ pub fn respond(&self, req_id: i32, buf: Buf) {
+ // TODO(zero-copy) Use Buf::leak(buf) to leak the heap allocated buf. And
+ // don't do the memcpy in ImportBuf() (in libdeno/binding.cc)
+ unsafe { libdeno::deno_respond(self.ptr, req_id, buf.into()) }
}
// TODO Use Park abstraction? Note at time of writing Tokio default runtime
@@ -144,12 +144,12 @@ impl Isolate {
pub fn event_loop(&mut self) {
// Main thread event loop.
while !self.is_idle() {
- let buf = self.rx.recv().unwrap();
+ let (req_id, buf) = self.rx.recv().unwrap();
// Receiving a message on rx exactly corresponds to an async task
// completing.
self.ntasks_decrement();
// Call into JS with the buf.
- self.send(buf);
+ self.respond(req_id, buf);
}
}
@@ -189,18 +189,38 @@ impl From<Buf> for libdeno::deno_buf {
}
// Dereferences the C pointer into the Rust Isolate object.
-extern "C" fn pre_dispatch(d: *const libdeno::isolate, buf: libdeno::deno_buf) {
- let bytes = unsafe { std::slice::from_raw_parts(buf.data_ptr, buf.data_len) };
+extern "C" fn pre_dispatch(
+ d: *const libdeno::isolate,
+ req_id: i32,
+ control_buf: libdeno::deno_buf,
+ data_buf: libdeno::deno_buf,
+) {
+ // control_buf is only valid for the lifetime of this call, thus is
+ // interpretted as a slice.
+ let control_slice = unsafe {
+ std::slice::from_raw_parts(control_buf.data_ptr, control_buf.data_len)
+ };
+
+ // data_buf is valid for the lifetime of the promise, thus a mutable buf with
+ // static lifetime.
+ let data_slice = unsafe {
+ std::slice::from_raw_parts_mut::<'static>(
+ data_buf.data_ptr,
+ data_buf.data_len,
+ )
+ };
+
let isolate = Isolate::from_c(d);
let dispatch = isolate.dispatch;
- let (is_sync, op) = dispatch(isolate.state.clone(), bytes);
+ let (is_sync, op) =
+ dispatch(isolate.state.clone(), control_slice, data_slice);
if is_sync {
// Execute op synchronously.
let buf = tokio_util::block_on(op).unwrap();
if buf.len() != 0 {
// Set the synchronous response, the value returned from isolate.send().
- isolate.set_response(buf);
+ isolate.respond(req_id, buf);
}
} else {
// Execute op asynchronously.
@@ -213,7 +233,7 @@ extern "C" fn pre_dispatch(d: *const libdeno::isolate, buf: libdeno::deno_buf) {
let task = op
.and_then(move |buf| {
- state.send_to_js(buf);
+ state.send_to_js(req_id, buf);
Ok(())
}).map_err(|_| ());
tokio::spawn(task);
@@ -239,7 +259,8 @@ mod tests {
fn unreachable_dispatch(
_state: Arc<IsolateState>,
- _buf: &[u8],
+ _control: &[u8],
+ _data: &'static mut [u8],
) -> (bool, Box<Op>) {
unreachable!();
}
@@ -267,14 +288,19 @@ mod tests {
});
}
- fn dispatch_sync(_state: Arc<IsolateState>, buf: &[u8]) -> (bool, Box<Op>) {
- assert_eq!(buf[0], 4);
- assert_eq!(buf[1], 5);
- assert_eq!(buf[2], 6);
+ fn dispatch_sync(
+ _state: Arc<IsolateState>,
+ control: &[u8],
+ data: &'static mut [u8],
+ ) -> (bool, Box<Op>) {
+ assert_eq!(control[0], 4);
+ assert_eq!(control[1], 5);
+ assert_eq!(control[2], 6);
+ assert_eq!(data.len(), 0);
// Send back some sync response.
let vec: Vec<u8> = vec![1, 2, 3];
- let buf = vec.into_boxed_slice();
- let op = Box::new(futures::future::ok(buf));
+ let control = vec.into_boxed_slice();
+ let op = Box::new(futures::future::ok(control));
(true, op)
}
}
diff --git a/src/libdeno.rs b/src/libdeno.rs
index 637b3ef91..c7a03e3bb 100644
--- a/src/libdeno.rs
+++ b/src/libdeno.rs
@@ -19,7 +19,12 @@ pub struct deno_buf {
pub data_len: usize,
}
-type DenoRecvCb = unsafe extern "C" fn(d: *const isolate, buf: deno_buf);
+type DenoRecvCb = unsafe extern "C" fn(
+ d: *const isolate,
+ req_id: i32,
+ buf: deno_buf,
+ data_buf: deno_buf,
+);
extern "C" {
pub fn deno_init();
@@ -29,8 +34,7 @@ extern "C" {
pub fn deno_delete(i: *const isolate);
pub fn deno_last_exception(i: *const isolate) -> *const c_char;
pub fn deno_get_data(i: *const isolate) -> *const c_void;
- pub fn deno_set_response(i: *const isolate, buf: deno_buf);
- pub fn deno_send(i: *const isolate, buf: deno_buf);
+ pub fn deno_respond(i: *const isolate, req_id: i32, buf: deno_buf);
pub fn deno_execute(
i: *const isolate,
js_filename: *const c_char,
diff --git a/src/msg.fbs b/src/msg.fbs
index 52eba8ca6..0d78395ea 100644
--- a/src/msg.fbs
+++ b/src/msg.fbs
@@ -199,7 +199,6 @@ table ReadFileRes {
table WriteFile {
filename: string;
- data: [ubyte];
perm: uint;
// perm specified by https://godoc.org/os#FileMode
}