diff options
Diffstat (limited to 'src/ops.rs')
-rw-r--r-- | src/ops.rs | 173 |
1 files changed, 133 insertions, 40 deletions
diff --git a/src/ops.rs b/src/ops.rs index d678e9727..cf25f29e0 100644 --- a/src/ops.rs +++ b/src/ops.rs @@ -16,7 +16,10 @@ use version; use flatbuffers::FlatBufferBuilder; use futures; +use futures::Async; use futures::Poll; +use futures::Sink; +use futures::Stream; use hyper; use hyper::rt::Future; use remove_dir_all::remove_dir_all; @@ -34,6 +37,7 @@ use std::path::Path; use std::path::PathBuf; use std::process::Command; use std::str::FromStr; +use std::sync::Arc; use std::time::UNIX_EPOCH; use std::time::{Duration, Instant}; use tokio; @@ -48,7 +52,7 @@ type OpResult = DenoResult<Buf>; // TODO Ideally we wouldn't have to box the Op being returned. // The box is just to make it easier to get a prototype refactor working. type OpCreator = - fn(state: &IsolateState, base: &msg::Base, data: libdeno::deno_buf) + fn(state: &Arc<IsolateState>, base: &msg::Base, data: libdeno::deno_buf) -> Box<Op>; #[inline] @@ -113,8 +117,10 @@ pub fn dispatch( msg::Any::Stat => op_stat, msg::Any::Symlink => op_symlink, msg::Any::Truncate => op_truncate, - msg::Any::WriteFile => op_write_file, + msg::Any::WorkerGetMessage => op_worker_get_message, + msg::Any::WorkerPostMessage => op_worker_post_message, msg::Any::Write => op_write, + msg::Any::WriteFile => op_write_file, _ => panic!(format!( "Unhandled message {}", msg::enum_name_any(inner_type) @@ -168,7 +174,7 @@ pub fn dispatch( } fn op_exit( - _config: &IsolateState, + _config: &Arc<IsolateState>, base: &msg::Base, _data: libdeno::deno_buf, ) -> Box<Op> { @@ -177,7 +183,7 @@ fn op_exit( } fn op_start( - state: &IsolateState, + state: &Arc<IsolateState>, base: &msg::Base, data: libdeno::deno_buf, ) -> Box<Op> { @@ -238,19 +244,19 @@ fn serialize_response( } #[inline] -fn ok_future(buf: Buf) -> Box<Op> { +pub fn ok_future(buf: Buf) -> Box<Op> { Box::new(futures::future::ok(buf)) } // Shout out to Earl Sweatshirt. #[inline] -fn odd_future(err: DenoError) -> Box<Op> { +pub fn odd_future(err: DenoError) -> Box<Op> { Box::new(futures::future::err(err)) } // https://github.com/denoland/deno/blob/golang/os.go#L100-L154 fn op_code_fetch( - state: &IsolateState, + state: &Arc<IsolateState>, base: &msg::Base, data: libdeno::deno_buf, ) -> Box<Op> { @@ -293,7 +299,7 @@ fn op_code_fetch( // https://github.com/denoland/deno/blob/golang/os.go#L156-L169 fn op_code_cache( - state: &IsolateState, + state: &Arc<IsolateState>, base: &msg::Base, data: libdeno::deno_buf, ) -> Box<Op> { @@ -312,7 +318,7 @@ fn op_code_cache( } fn op_chdir( - _state: &IsolateState, + _state: &Arc<IsolateState>, base: &msg::Base, data: libdeno::deno_buf, ) -> Box<Op> { @@ -344,7 +350,7 @@ fn op_set_timeout( } fn op_set_env( - state: &IsolateState, + state: &Arc<IsolateState>, base: &msg::Base, data: libdeno::deno_buf, ) -> Box<Op> { @@ -360,7 +366,7 @@ fn op_set_env( } fn op_env( - state: &IsolateState, + state: &Arc<IsolateState>, base: &msg::Base, data: libdeno::deno_buf, ) -> Box<Op> { @@ -392,7 +398,7 @@ fn op_env( } fn op_fetch( - state: &IsolateState, + state: &Arc<IsolateState>, base: &msg::Base, data: libdeno::deno_buf, ) -> Box<Op> { @@ -479,7 +485,7 @@ where } fn op_make_temp_dir( - state: &IsolateState, + state: &Arc<IsolateState>, base: &msg::Base, data: libdeno::deno_buf, ) -> Box<Op> { @@ -528,7 +534,7 @@ fn op_make_temp_dir( } fn op_mkdir( - state: &IsolateState, + state: &Arc<IsolateState>, base: &msg::Base, data: libdeno::deno_buf, ) -> Box<Op> { @@ -548,7 +554,7 @@ fn op_mkdir( } fn op_chmod( - state: &IsolateState, + state: &Arc<IsolateState>, base: &msg::Base, data: libdeno::deno_buf, ) -> Box<Op> { @@ -581,7 +587,7 @@ fn op_chmod( } fn op_open( - state: &IsolateState, + state: &Arc<IsolateState>, base: &msg::Base, data: libdeno::deno_buf, ) -> Box<Op> { @@ -657,7 +663,7 @@ fn op_open( } fn op_close( - _state: &IsolateState, + _state: &Arc<IsolateState>, base: &msg::Base, data: libdeno::deno_buf, ) -> Box<Op> { @@ -674,7 +680,7 @@ fn op_close( } fn op_shutdown( - _state: &IsolateState, + _state: &Arc<IsolateState>, base: &msg::Base, data: libdeno::deno_buf, ) -> Box<Op> { @@ -700,7 +706,7 @@ fn op_shutdown( } fn op_read( - _state: &IsolateState, + _state: &Arc<IsolateState>, base: &msg::Base, data: libdeno::deno_buf, ) -> Box<Op> { @@ -738,7 +744,7 @@ fn op_read( } fn op_write( - _state: &IsolateState, + _state: &Arc<IsolateState>, base: &msg::Base, data: libdeno::deno_buf, ) -> Box<Op> { @@ -775,7 +781,7 @@ fn op_write( } fn op_remove( - state: &IsolateState, + state: &Arc<IsolateState>, base: &msg::Base, data: libdeno::deno_buf, ) -> Box<Op> { @@ -805,7 +811,7 @@ fn op_remove( // Prototype https://github.com/denoland/deno/blob/golang/os.go#L171-L184 fn op_read_file( - _config: &IsolateState, + _state: &Arc<IsolateState>, base: &msg::Base, data: libdeno::deno_buf, ) -> Box<Op> { @@ -839,7 +845,7 @@ fn op_read_file( } fn op_copy_file( - state: &IsolateState, + state: &Arc<IsolateState>, base: &msg::Base, data: libdeno::deno_buf, ) -> Box<Op> { @@ -891,7 +897,7 @@ fn get_mode(_perm: &fs::Permissions) -> u32 { } fn op_cwd( - _state: &IsolateState, + _state: &Arc<IsolateState>, base: &msg::Base, data: libdeno::deno_buf, ) -> Box<Op> { @@ -917,7 +923,7 @@ fn op_cwd( } fn op_stat( - _config: &IsolateState, + _state: &Arc<IsolateState>, base: &msg::Base, data: libdeno::deno_buf, ) -> Box<Op> { @@ -964,7 +970,7 @@ fn op_stat( } fn op_read_dir( - _state: &IsolateState, + _state: &Arc<IsolateState>, base: &msg::Base, data: libdeno::deno_buf, ) -> Box<Op> { @@ -1022,7 +1028,7 @@ fn op_read_dir( } fn op_write_file( - state: &IsolateState, + state: &Arc<IsolateState>, base: &msg::Base, data: libdeno::deno_buf, ) -> Box<Op> { @@ -1042,7 +1048,7 @@ fn op_write_file( } fn op_rename( - state: &IsolateState, + state: &Arc<IsolateState>, base: &msg::Base, data: libdeno::deno_buf, ) -> Box<Op> { @@ -1062,7 +1068,7 @@ fn op_rename( } fn op_symlink( - state: &IsolateState, + state: &Arc<IsolateState>, base: &msg::Base, data: libdeno::deno_buf, ) -> Box<Op> { @@ -1091,7 +1097,7 @@ fn op_symlink( } fn op_read_link( - _state: &IsolateState, + _state: &Arc<IsolateState>, base: &msg::Base, data: libdeno::deno_buf, ) -> Box<Op> { @@ -1124,7 +1130,7 @@ fn op_read_link( } fn op_repl_start( - state: &IsolateState, + state: &Arc<IsolateState>, base: &msg::Base, data: libdeno::deno_buf, ) -> Box<Op> { @@ -1155,7 +1161,7 @@ fn op_repl_start( } fn op_repl_readline( - _state: &IsolateState, + _state: &Arc<IsolateState>, base: &msg::Base, data: libdeno::deno_buf, ) -> Box<Op> { @@ -1193,7 +1199,7 @@ fn op_repl_readline( } fn op_truncate( - state: &IsolateState, + state: &Arc<IsolateState>, base: &msg::Base, data: libdeno::deno_buf, ) -> Box<Op> { @@ -1216,7 +1222,7 @@ fn op_truncate( } fn op_listen( - state: &IsolateState, + state: &Arc<IsolateState>, base: &msg::Base, data: libdeno::deno_buf, ) -> Box<Op> { @@ -1282,7 +1288,7 @@ fn new_conn(cmd_id: u32, tcp_stream: TcpStream) -> OpResult { } fn op_accept( - state: &IsolateState, + state: &Arc<IsolateState>, base: &msg::Base, data: libdeno::deno_buf, ) -> Box<Op> { @@ -1308,7 +1314,7 @@ fn op_accept( } fn op_dial( - state: &IsolateState, + state: &Arc<IsolateState>, base: &msg::Base, data: libdeno::deno_buf, ) -> Box<Op> { @@ -1332,7 +1338,7 @@ fn op_dial( } fn op_metrics( - state: &IsolateState, + state: &Arc<IsolateState>, base: &msg::Base, data: libdeno::deno_buf, ) -> Box<Op> { @@ -1356,7 +1362,7 @@ fn op_metrics( } fn op_resources( - _state: &IsolateState, + _state: &Arc<IsolateState>, base: &msg::Base, data: libdeno::deno_buf, ) -> Box<Op> { @@ -1408,7 +1414,7 @@ fn subprocess_stdio_map(v: msg::ProcessStdio) -> std::process::Stdio { } fn op_run( - state: &IsolateState, + state: &Arc<IsolateState>, base: &msg::Base, data: libdeno::deno_buf, ) -> Box<Op> { @@ -1476,7 +1482,7 @@ fn op_run( } fn op_run_status( - state: &IsolateState, + state: &Arc<IsolateState>, base: &msg::Base, data: libdeno::deno_buf, ) -> Box<Op> { @@ -1530,3 +1536,90 @@ fn op_run_status( }); Box::new(future) } + +struct GetMessageFuture { + pub state: Arc<IsolateState>, +} + +impl Future for GetMessageFuture { + type Item = Option<Buf>; + type Error = (); + + fn poll(&mut self) -> Result<Async<Self::Item>, Self::Error> { + assert!(self.state.worker_channels.is_some()); + match self.state.worker_channels { + None => panic!("expected worker_channels"), + Some(ref wc) => { + let mut wc = wc.lock().unwrap(); + wc.1.poll() + } + } + } +} + +fn op_worker_get_message( + state: &Arc<IsolateState>, + base: &msg::Base, + data: libdeno::deno_buf, +) -> Box<Op> { + assert_eq!(data.len(), 0); + let cmd_id = base.cmd_id(); + + let op = GetMessageFuture { + state: state.clone(), + }; + let op = op.map_err(move |_| -> DenoError { unimplemented!() }); + let op = op.and_then(move |maybe_buf| -> DenoResult<Buf> { + debug!("op_worker_get_message"); + let builder = &mut FlatBufferBuilder::new(); + + let data = maybe_buf.as_ref().map(|buf| builder.create_vector(buf)); + let inner = msg::WorkerGetMessageRes::create( + builder, + &msg::WorkerGetMessageResArgs { data }, + ); + Ok(serialize_response( + cmd_id, + builder, + msg::BaseArgs { + inner: Some(inner.as_union_value()), + inner_type: msg::Any::WorkerGetMessageRes, + ..Default::default() + }, + )) + }); + Box::new(op) +} + +fn op_worker_post_message( + state: &Arc<IsolateState>, + base: &msg::Base, + data: libdeno::deno_buf, +) -> Box<Op> { + let cmd_id = base.cmd_id(); + + let d = Vec::from(data.as_ref()).into_boxed_slice(); + + assert!(state.worker_channels.is_some()); + let tx = match state.worker_channels { + None => panic!("expected worker_channels"), + Some(ref wc) => { + let mut wc = wc.lock().unwrap(); + wc.0.clone() + } + }; + let op = tx.send(d); + let op = op.map_err(|e| errors::new(ErrorKind::Other, e.to_string())); + let op = op.and_then(move |_| -> DenoResult<Buf> { + let builder = &mut FlatBufferBuilder::new(); + + Ok(serialize_response( + cmd_id, + builder, + msg::BaseArgs { + ..Default::default() + }, + )) + }); + Box::new(op) +} |