summaryrefslogtreecommitdiff
path: root/src/ops.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/ops.rs')
-rw-r--r--src/ops.rs173
1 files changed, 133 insertions, 40 deletions
diff --git a/src/ops.rs b/src/ops.rs
index d678e9727..cf25f29e0 100644
--- a/src/ops.rs
+++ b/src/ops.rs
@@ -16,7 +16,10 @@ use version;
use flatbuffers::FlatBufferBuilder;
use futures;
+use futures::Async;
use futures::Poll;
+use futures::Sink;
+use futures::Stream;
use hyper;
use hyper::rt::Future;
use remove_dir_all::remove_dir_all;
@@ -34,6 +37,7 @@ use std::path::Path;
use std::path::PathBuf;
use std::process::Command;
use std::str::FromStr;
+use std::sync::Arc;
use std::time::UNIX_EPOCH;
use std::time::{Duration, Instant};
use tokio;
@@ -48,7 +52,7 @@ type OpResult = DenoResult<Buf>;
// TODO Ideally we wouldn't have to box the Op being returned.
// The box is just to make it easier to get a prototype refactor working.
type OpCreator =
- fn(state: &IsolateState, base: &msg::Base, data: libdeno::deno_buf)
+ fn(state: &Arc<IsolateState>, base: &msg::Base, data: libdeno::deno_buf)
-> Box<Op>;
#[inline]
@@ -113,8 +117,10 @@ pub fn dispatch(
msg::Any::Stat => op_stat,
msg::Any::Symlink => op_symlink,
msg::Any::Truncate => op_truncate,
- msg::Any::WriteFile => op_write_file,
+ msg::Any::WorkerGetMessage => op_worker_get_message,
+ msg::Any::WorkerPostMessage => op_worker_post_message,
msg::Any::Write => op_write,
+ msg::Any::WriteFile => op_write_file,
_ => panic!(format!(
"Unhandled message {}",
msg::enum_name_any(inner_type)
@@ -168,7 +174,7 @@ pub fn dispatch(
}
fn op_exit(
- _config: &IsolateState,
+ _config: &Arc<IsolateState>,
base: &msg::Base,
_data: libdeno::deno_buf,
) -> Box<Op> {
@@ -177,7 +183,7 @@ fn op_exit(
}
fn op_start(
- state: &IsolateState,
+ state: &Arc<IsolateState>,
base: &msg::Base,
data: libdeno::deno_buf,
) -> Box<Op> {
@@ -238,19 +244,19 @@ fn serialize_response(
}
#[inline]
-fn ok_future(buf: Buf) -> Box<Op> {
+pub fn ok_future(buf: Buf) -> Box<Op> {
Box::new(futures::future::ok(buf))
}
// Shout out to Earl Sweatshirt.
#[inline]
-fn odd_future(err: DenoError) -> Box<Op> {
+pub fn odd_future(err: DenoError) -> Box<Op> {
Box::new(futures::future::err(err))
}
// https://github.com/denoland/deno/blob/golang/os.go#L100-L154
fn op_code_fetch(
- state: &IsolateState,
+ state: &Arc<IsolateState>,
base: &msg::Base,
data: libdeno::deno_buf,
) -> Box<Op> {
@@ -293,7 +299,7 @@ fn op_code_fetch(
// https://github.com/denoland/deno/blob/golang/os.go#L156-L169
fn op_code_cache(
- state: &IsolateState,
+ state: &Arc<IsolateState>,
base: &msg::Base,
data: libdeno::deno_buf,
) -> Box<Op> {
@@ -312,7 +318,7 @@ fn op_code_cache(
}
fn op_chdir(
- _state: &IsolateState,
+ _state: &Arc<IsolateState>,
base: &msg::Base,
data: libdeno::deno_buf,
) -> Box<Op> {
@@ -344,7 +350,7 @@ fn op_set_timeout(
}
fn op_set_env(
- state: &IsolateState,
+ state: &Arc<IsolateState>,
base: &msg::Base,
data: libdeno::deno_buf,
) -> Box<Op> {
@@ -360,7 +366,7 @@ fn op_set_env(
}
fn op_env(
- state: &IsolateState,
+ state: &Arc<IsolateState>,
base: &msg::Base,
data: libdeno::deno_buf,
) -> Box<Op> {
@@ -392,7 +398,7 @@ fn op_env(
}
fn op_fetch(
- state: &IsolateState,
+ state: &Arc<IsolateState>,
base: &msg::Base,
data: libdeno::deno_buf,
) -> Box<Op> {
@@ -479,7 +485,7 @@ where
}
fn op_make_temp_dir(
- state: &IsolateState,
+ state: &Arc<IsolateState>,
base: &msg::Base,
data: libdeno::deno_buf,
) -> Box<Op> {
@@ -528,7 +534,7 @@ fn op_make_temp_dir(
}
fn op_mkdir(
- state: &IsolateState,
+ state: &Arc<IsolateState>,
base: &msg::Base,
data: libdeno::deno_buf,
) -> Box<Op> {
@@ -548,7 +554,7 @@ fn op_mkdir(
}
fn op_chmod(
- state: &IsolateState,
+ state: &Arc<IsolateState>,
base: &msg::Base,
data: libdeno::deno_buf,
) -> Box<Op> {
@@ -581,7 +587,7 @@ fn op_chmod(
}
fn op_open(
- state: &IsolateState,
+ state: &Arc<IsolateState>,
base: &msg::Base,
data: libdeno::deno_buf,
) -> Box<Op> {
@@ -657,7 +663,7 @@ fn op_open(
}
fn op_close(
- _state: &IsolateState,
+ _state: &Arc<IsolateState>,
base: &msg::Base,
data: libdeno::deno_buf,
) -> Box<Op> {
@@ -674,7 +680,7 @@ fn op_close(
}
fn op_shutdown(
- _state: &IsolateState,
+ _state: &Arc<IsolateState>,
base: &msg::Base,
data: libdeno::deno_buf,
) -> Box<Op> {
@@ -700,7 +706,7 @@ fn op_shutdown(
}
fn op_read(
- _state: &IsolateState,
+ _state: &Arc<IsolateState>,
base: &msg::Base,
data: libdeno::deno_buf,
) -> Box<Op> {
@@ -738,7 +744,7 @@ fn op_read(
}
fn op_write(
- _state: &IsolateState,
+ _state: &Arc<IsolateState>,
base: &msg::Base,
data: libdeno::deno_buf,
) -> Box<Op> {
@@ -775,7 +781,7 @@ fn op_write(
}
fn op_remove(
- state: &IsolateState,
+ state: &Arc<IsolateState>,
base: &msg::Base,
data: libdeno::deno_buf,
) -> Box<Op> {
@@ -805,7 +811,7 @@ fn op_remove(
// Prototype https://github.com/denoland/deno/blob/golang/os.go#L171-L184
fn op_read_file(
- _config: &IsolateState,
+ _state: &Arc<IsolateState>,
base: &msg::Base,
data: libdeno::deno_buf,
) -> Box<Op> {
@@ -839,7 +845,7 @@ fn op_read_file(
}
fn op_copy_file(
- state: &IsolateState,
+ state: &Arc<IsolateState>,
base: &msg::Base,
data: libdeno::deno_buf,
) -> Box<Op> {
@@ -891,7 +897,7 @@ fn get_mode(_perm: &fs::Permissions) -> u32 {
}
fn op_cwd(
- _state: &IsolateState,
+ _state: &Arc<IsolateState>,
base: &msg::Base,
data: libdeno::deno_buf,
) -> Box<Op> {
@@ -917,7 +923,7 @@ fn op_cwd(
}
fn op_stat(
- _config: &IsolateState,
+ _state: &Arc<IsolateState>,
base: &msg::Base,
data: libdeno::deno_buf,
) -> Box<Op> {
@@ -964,7 +970,7 @@ fn op_stat(
}
fn op_read_dir(
- _state: &IsolateState,
+ _state: &Arc<IsolateState>,
base: &msg::Base,
data: libdeno::deno_buf,
) -> Box<Op> {
@@ -1022,7 +1028,7 @@ fn op_read_dir(
}
fn op_write_file(
- state: &IsolateState,
+ state: &Arc<IsolateState>,
base: &msg::Base,
data: libdeno::deno_buf,
) -> Box<Op> {
@@ -1042,7 +1048,7 @@ fn op_write_file(
}
fn op_rename(
- state: &IsolateState,
+ state: &Arc<IsolateState>,
base: &msg::Base,
data: libdeno::deno_buf,
) -> Box<Op> {
@@ -1062,7 +1068,7 @@ fn op_rename(
}
fn op_symlink(
- state: &IsolateState,
+ state: &Arc<IsolateState>,
base: &msg::Base,
data: libdeno::deno_buf,
) -> Box<Op> {
@@ -1091,7 +1097,7 @@ fn op_symlink(
}
fn op_read_link(
- _state: &IsolateState,
+ _state: &Arc<IsolateState>,
base: &msg::Base,
data: libdeno::deno_buf,
) -> Box<Op> {
@@ -1124,7 +1130,7 @@ fn op_read_link(
}
fn op_repl_start(
- state: &IsolateState,
+ state: &Arc<IsolateState>,
base: &msg::Base,
data: libdeno::deno_buf,
) -> Box<Op> {
@@ -1155,7 +1161,7 @@ fn op_repl_start(
}
fn op_repl_readline(
- _state: &IsolateState,
+ _state: &Arc<IsolateState>,
base: &msg::Base,
data: libdeno::deno_buf,
) -> Box<Op> {
@@ -1193,7 +1199,7 @@ fn op_repl_readline(
}
fn op_truncate(
- state: &IsolateState,
+ state: &Arc<IsolateState>,
base: &msg::Base,
data: libdeno::deno_buf,
) -> Box<Op> {
@@ -1216,7 +1222,7 @@ fn op_truncate(
}
fn op_listen(
- state: &IsolateState,
+ state: &Arc<IsolateState>,
base: &msg::Base,
data: libdeno::deno_buf,
) -> Box<Op> {
@@ -1282,7 +1288,7 @@ fn new_conn(cmd_id: u32, tcp_stream: TcpStream) -> OpResult {
}
fn op_accept(
- state: &IsolateState,
+ state: &Arc<IsolateState>,
base: &msg::Base,
data: libdeno::deno_buf,
) -> Box<Op> {
@@ -1308,7 +1314,7 @@ fn op_accept(
}
fn op_dial(
- state: &IsolateState,
+ state: &Arc<IsolateState>,
base: &msg::Base,
data: libdeno::deno_buf,
) -> Box<Op> {
@@ -1332,7 +1338,7 @@ fn op_dial(
}
fn op_metrics(
- state: &IsolateState,
+ state: &Arc<IsolateState>,
base: &msg::Base,
data: libdeno::deno_buf,
) -> Box<Op> {
@@ -1356,7 +1362,7 @@ fn op_metrics(
}
fn op_resources(
- _state: &IsolateState,
+ _state: &Arc<IsolateState>,
base: &msg::Base,
data: libdeno::deno_buf,
) -> Box<Op> {
@@ -1408,7 +1414,7 @@ fn subprocess_stdio_map(v: msg::ProcessStdio) -> std::process::Stdio {
}
fn op_run(
- state: &IsolateState,
+ state: &Arc<IsolateState>,
base: &msg::Base,
data: libdeno::deno_buf,
) -> Box<Op> {
@@ -1476,7 +1482,7 @@ fn op_run(
}
fn op_run_status(
- state: &IsolateState,
+ state: &Arc<IsolateState>,
base: &msg::Base,
data: libdeno::deno_buf,
) -> Box<Op> {
@@ -1530,3 +1536,90 @@ fn op_run_status(
});
Box::new(future)
}
+
+struct GetMessageFuture {
+ pub state: Arc<IsolateState>,
+}
+
+impl Future for GetMessageFuture {
+ type Item = Option<Buf>;
+ type Error = ();
+
+ fn poll(&mut self) -> Result<Async<Self::Item>, Self::Error> {
+ assert!(self.state.worker_channels.is_some());
+ match self.state.worker_channels {
+ None => panic!("expected worker_channels"),
+ Some(ref wc) => {
+ let mut wc = wc.lock().unwrap();
+ wc.1.poll()
+ }
+ }
+ }
+}
+
+fn op_worker_get_message(
+ state: &Arc<IsolateState>,
+ base: &msg::Base,
+ data: libdeno::deno_buf,
+) -> Box<Op> {
+ assert_eq!(data.len(), 0);
+ let cmd_id = base.cmd_id();
+
+ let op = GetMessageFuture {
+ state: state.clone(),
+ };
+ let op = op.map_err(move |_| -> DenoError { unimplemented!() });
+ let op = op.and_then(move |maybe_buf| -> DenoResult<Buf> {
+ debug!("op_worker_get_message");
+ let builder = &mut FlatBufferBuilder::new();
+
+ let data = maybe_buf.as_ref().map(|buf| builder.create_vector(buf));
+ let inner = msg::WorkerGetMessageRes::create(
+ builder,
+ &msg::WorkerGetMessageResArgs { data },
+ );
+ Ok(serialize_response(
+ cmd_id,
+ builder,
+ msg::BaseArgs {
+ inner: Some(inner.as_union_value()),
+ inner_type: msg::Any::WorkerGetMessageRes,
+ ..Default::default()
+ },
+ ))
+ });
+ Box::new(op)
+}
+
+fn op_worker_post_message(
+ state: &Arc<IsolateState>,
+ base: &msg::Base,
+ data: libdeno::deno_buf,
+) -> Box<Op> {
+ let cmd_id = base.cmd_id();
+
+ let d = Vec::from(data.as_ref()).into_boxed_slice();
+
+ assert!(state.worker_channels.is_some());
+ let tx = match state.worker_channels {
+ None => panic!("expected worker_channels"),
+ Some(ref wc) => {
+ let mut wc = wc.lock().unwrap();
+ wc.0.clone()
+ }
+ };
+ let op = tx.send(d);
+ let op = op.map_err(|e| errors::new(ErrorKind::Other, e.to_string()));
+ let op = op.and_then(move |_| -> DenoResult<Buf> {
+ let builder = &mut FlatBufferBuilder::new();
+
+ Ok(serialize_response(
+ cmd_id,
+ builder,
+ msg::BaseArgs {
+ ..Default::default()
+ },
+ ))
+ });
+ Box::new(op)
+}