diff options
Diffstat (limited to 'src/resources.rs')
-rw-r--r-- | src/resources.rs | 64 |
1 files changed, 63 insertions, 1 deletions
diff --git a/src/resources.rs b/src/resources.rs index f1497f214..69173fe85 100644 --- a/src/resources.rs +++ b/src/resources.rs @@ -10,10 +10,12 @@ #[cfg(unix)] use eager_unix as eager; +use errors; use errors::bad_resource; use errors::DenoError; use errors::DenoResult; use http_body::HttpBody; +use isolate::WorkerChannels; use repl::Repl; use tokio_util; use tokio_write; @@ -22,7 +24,10 @@ use futures; use futures::future::{Either, FutureResult}; use futures::Future; use futures::Poll; +use futures::Sink; +use futures::Stream; use hyper; +use isolate::Buf; use std; use std::collections::HashMap; use std::io::{Error, Read, Write}; @@ -96,6 +101,14 @@ enum Repr { ChildStdin(tokio_process::ChildStdin), ChildStdout(tokio_process::ChildStdout), ChildStderr(tokio_process::ChildStderr), + Worker(WorkerChannels), +} + +/// If the given rid is open, this returns the type of resource, E.G. "worker". +/// If the rid is closed or was never open, it returns None. +pub fn get_type(rid: ResourceId) -> Option<String> { + let table = RESOURCE_TABLE.lock().unwrap(); + table.get(&rid).map(inspect_repr) } pub fn table_entries() -> Vec<(u32, String)> { @@ -131,6 +144,7 @@ fn inspect_repr(repr: &Repr) -> String { Repr::ChildStdin(_) => "childStdin", Repr::ChildStdout(_) => "childStdout", Repr::ChildStderr(_) => "childStderr", + Repr::Worker(_) => "worker", }; String::from(h_repr) @@ -138,7 +152,7 @@ fn inspect_repr(repr: &Repr) -> String { // Abstract async file interface. // Ideally in unix, if Resource represents an OS rid, it will be the same. -#[derive(Debug)] +#[derive(Clone, Debug)] pub struct Resource { pub rid: ResourceId, } @@ -284,6 +298,54 @@ pub fn add_repl(repl: Repl) -> Resource { Resource { rid } } +pub fn add_worker(wc: WorkerChannels) -> Resource { + let rid = new_rid(); + let mut tg = RESOURCE_TABLE.lock().unwrap(); + let r = tg.insert(rid, Repr::Worker(wc)); + assert!(r.is_none()); + Resource { rid } +} + +pub fn worker_post_message( + rid: ResourceId, + buf: Buf, +) -> futures::sink::Send<futures::sync::mpsc::Sender<Buf>> { + let mut table = RESOURCE_TABLE.lock().unwrap(); + let maybe_repr = table.get_mut(&rid); + match maybe_repr { + Some(Repr::Worker(ref mut wc)) => { + // unwrap here is incorrect, but doing it anyway + wc.0.clone().send(buf) + } + _ => panic!("bad resource"), // futures::future::err(bad_resource()).into(), + } +} + +pub struct WorkerReceiver { + rid: ResourceId, +} + +// Invert the dumbness that tokio_process causes by making Child itself a future. +impl Future for WorkerReceiver { + type Item = Option<Buf>; + type Error = DenoError; + + fn poll(&mut self) -> Poll<Option<Buf>, DenoError> { + let mut table = RESOURCE_TABLE.lock().unwrap(); + let maybe_repr = table.get_mut(&self.rid); + match maybe_repr { + Some(Repr::Worker(ref mut wc)) => wc.1.poll().map_err(|()| { + errors::new(errors::ErrorKind::Other, "recv msg error".to_string()) + }), + _ => Err(bad_resource()), + } + } +} + +pub fn worker_recv_message(rid: ResourceId) -> WorkerReceiver { + WorkerReceiver { rid } +} + #[cfg_attr(feature = "cargo-clippy", allow(stutter))] pub struct ChildResources { pub child_rid: ResourceId, |