summaryrefslogtreecommitdiff
path: root/src/resources.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/resources.rs')
-rw-r--r--src/resources.rs64
1 files changed, 63 insertions, 1 deletions
diff --git a/src/resources.rs b/src/resources.rs
index f1497f214..69173fe85 100644
--- a/src/resources.rs
+++ b/src/resources.rs
@@ -10,10 +10,12 @@
#[cfg(unix)]
use eager_unix as eager;
+use errors;
use errors::bad_resource;
use errors::DenoError;
use errors::DenoResult;
use http_body::HttpBody;
+use isolate::WorkerChannels;
use repl::Repl;
use tokio_util;
use tokio_write;
@@ -22,7 +24,10 @@ use futures;
use futures::future::{Either, FutureResult};
use futures::Future;
use futures::Poll;
+use futures::Sink;
+use futures::Stream;
use hyper;
+use isolate::Buf;
use std;
use std::collections::HashMap;
use std::io::{Error, Read, Write};
@@ -96,6 +101,14 @@ enum Repr {
ChildStdin(tokio_process::ChildStdin),
ChildStdout(tokio_process::ChildStdout),
ChildStderr(tokio_process::ChildStderr),
+ Worker(WorkerChannels),
+}
+
+/// If the given rid is open, this returns the type of resource, E.G. "worker".
+/// If the rid is closed or was never open, it returns None.
+pub fn get_type(rid: ResourceId) -> Option<String> {
+ let table = RESOURCE_TABLE.lock().unwrap();
+ table.get(&rid).map(inspect_repr)
}
pub fn table_entries() -> Vec<(u32, String)> {
@@ -131,6 +144,7 @@ fn inspect_repr(repr: &Repr) -> String {
Repr::ChildStdin(_) => "childStdin",
Repr::ChildStdout(_) => "childStdout",
Repr::ChildStderr(_) => "childStderr",
+ Repr::Worker(_) => "worker",
};
String::from(h_repr)
@@ -138,7 +152,7 @@ fn inspect_repr(repr: &Repr) -> String {
// Abstract async file interface.
// Ideally in unix, if Resource represents an OS rid, it will be the same.
-#[derive(Debug)]
+#[derive(Clone, Debug)]
pub struct Resource {
pub rid: ResourceId,
}
@@ -284,6 +298,54 @@ pub fn add_repl(repl: Repl) -> Resource {
Resource { rid }
}
+pub fn add_worker(wc: WorkerChannels) -> Resource {
+ let rid = new_rid();
+ let mut tg = RESOURCE_TABLE.lock().unwrap();
+ let r = tg.insert(rid, Repr::Worker(wc));
+ assert!(r.is_none());
+ Resource { rid }
+}
+
+pub fn worker_post_message(
+ rid: ResourceId,
+ buf: Buf,
+) -> futures::sink::Send<futures::sync::mpsc::Sender<Buf>> {
+ let mut table = RESOURCE_TABLE.lock().unwrap();
+ let maybe_repr = table.get_mut(&rid);
+ match maybe_repr {
+ Some(Repr::Worker(ref mut wc)) => {
+ // unwrap here is incorrect, but doing it anyway
+ wc.0.clone().send(buf)
+ }
+ _ => panic!("bad resource"), // futures::future::err(bad_resource()).into(),
+ }
+}
+
+pub struct WorkerReceiver {
+ rid: ResourceId,
+}
+
+// Invert the dumbness that tokio_process causes by making Child itself a future.
+impl Future for WorkerReceiver {
+ type Item = Option<Buf>;
+ type Error = DenoError;
+
+ fn poll(&mut self) -> Poll<Option<Buf>, DenoError> {
+ let mut table = RESOURCE_TABLE.lock().unwrap();
+ let maybe_repr = table.get_mut(&self.rid);
+ match maybe_repr {
+ Some(Repr::Worker(ref mut wc)) => wc.1.poll().map_err(|()| {
+ errors::new(errors::ErrorKind::Other, "recv msg error".to_string())
+ }),
+ _ => Err(bad_resource()),
+ }
+ }
+}
+
+pub fn worker_recv_message(rid: ResourceId) -> WorkerReceiver {
+ WorkerReceiver { rid }
+}
+
#[cfg_attr(feature = "cargo-clippy", allow(stutter))]
pub struct ChildResources {
pub child_rid: ResourceId,