summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/msg.fbs2
-rw-r--r--src/ops.rs15
-rw-r--r--src/resources.rs15
3 files changed, 22 insertions, 10 deletions
diff --git a/src/msg.fbs b/src/msg.fbs
index d5ab926dd..7fdc73946 100644
--- a/src/msg.fbs
+++ b/src/msg.fbs
@@ -206,7 +206,7 @@ table FetchRes {
status: int;
header_key: [string];
header_value: [string];
- body: [ubyte];
+ body_rid: uint32;
}
table MakeTempDir {
diff --git a/src/ops.rs b/src/ops.rs
index b07c61385..ef03630f5 100644
--- a/src/ops.rs
+++ b/src/ops.rs
@@ -18,7 +18,7 @@ use futures;
use futures::future::poll_fn;
use futures::Poll;
use hyper;
-use hyper::rt::{Future, Stream};
+use hyper::rt::Future;
use remove_dir_all::remove_dir_all;
use repl;
use resources::table_entries;
@@ -417,20 +417,17 @@ fn op_fetch(
(keys, values)
};
- // TODO Handle streaming body.
- res
- .into_body()
- .concat2()
- .map(move |body| (status, body, headers))
+ let body = res.into_body();
+ let body_resource = resources::add_hyper_body(body);
+ Ok((status, headers, body_resource))
});
let future = future.map_err(|err| -> DenoError { err.into() }).and_then(
- move |(status, body, headers)| {
+ move |(status, headers, body_resource)| {
debug!("fetch body ");
let builder = &mut FlatBufferBuilder::new();
// Send the first message without a body. This is just to indicate
// what status code.
- let body_off = builder.create_vector(body.as_ref());
let header_keys: Vec<&str> = headers.0.iter().map(|s| &**s).collect();
let header_keys_off =
builder.create_vector_of_strings(header_keys.as_slice());
@@ -443,7 +440,7 @@ fn op_fetch(
&msg::FetchResArgs {
id,
status,
- body: Some(body_off),
+ body_rid: body_resource.rid,
header_key: Some(header_keys_off),
header_value: Some(header_values_off),
},
diff --git a/src/resources.rs b/src/resources.rs
index 08de09889..90b7ce772 100644
--- a/src/resources.rs
+++ b/src/resources.rs
@@ -13,6 +13,7 @@ use eager_unix as eager;
use errors::bad_resource;
use errors::DenoError;
use errors::DenoResult;
+use http_body::HttpBody;
use repl::Repl;
use tokio_util;
use tokio_write;
@@ -20,6 +21,7 @@ use tokio_write;
use futures;
use futures::future::{Either, FutureResult};
use futures::Poll;
+use hyper;
use std;
use std::collections::HashMap;
use std::io::{Error, Read, Write};
@@ -59,6 +61,7 @@ enum Repr {
FsFile(tokio::fs::File),
TcpListener(tokio::net::TcpListener),
TcpStream(tokio::net::TcpStream),
+ HttpBody(HttpBody),
Repl(Repl),
}
@@ -89,6 +92,7 @@ fn inspect_repr(repr: &Repr) -> String {
Repr::FsFile(_) => "fsFile",
Repr::TcpListener(_) => "tcpListener",
Repr::TcpStream(_) => "tcpStream",
+ Repr::HttpBody(_) => "httpBody",
Repr::Repl(_) => "repl",
};
@@ -155,6 +159,7 @@ impl AsyncRead for Resource {
Repr::FsFile(ref mut f) => f.poll_read(buf),
Repr::Stdin(ref mut f) => f.poll_read(buf),
Repr::TcpStream(ref mut f) => f.poll_read(buf),
+ Repr::HttpBody(ref mut f) => f.poll_read(buf),
_ => panic!("Cannot read"),
},
}
@@ -222,6 +227,15 @@ pub fn add_tcp_stream(stream: tokio::net::TcpStream) -> Resource {
Resource { rid }
}
+pub fn add_hyper_body(body: hyper::Body) -> Resource {
+ let rid = new_rid();
+ let mut tg = RESOURCE_TABLE.lock().unwrap();
+ let body = HttpBody::from(body);
+ let r = tg.insert(rid, Repr::HttpBody(body));
+ assert!(r.is_none());
+ Resource { rid }
+}
+
pub fn add_repl(repl: Repl) -> Resource {
let rid = new_rid();
let mut tg = RESOURCE_TABLE.lock().unwrap();
@@ -243,6 +257,7 @@ pub fn readline(rid: ResourceId, prompt: &str) -> DenoResult<String> {
}
pub fn lookup(rid: ResourceId) -> Option<Resource> {
+ debug!("resource lookup {}", rid);
let table = RESOURCE_TABLE.lock().unwrap();
table.get(&rid).map(|_| Resource { rid })
}