diff options
-rw-r--r-- | src/main.rs | 1 | ||||
-rw-r--r-- | src/ops.rs | 9 | ||||
-rw-r--r-- | src/tokio_write.rs | 61 |
3 files changed, 66 insertions, 5 deletions
diff --git a/src/main.rs b/src/main.rs index 6c75fb515..82993596a 100644 --- a/src/main.rs +++ b/src/main.rs @@ -32,6 +32,7 @@ mod libdeno; pub mod ops; mod resources; mod tokio_util; +mod tokio_write; mod version; use std::env; diff --git a/src/ops.rs b/src/ops.rs index 0120f50f0..2d7072dd0 100644 --- a/src/ops.rs +++ b/src/ops.rs @@ -11,6 +11,7 @@ use msg; use resources; use resources::Resource; use tokio_util; +use tokio_write; use version; use flatbuffers::FlatBufferBuilder; @@ -35,7 +36,6 @@ use std::time::{Duration, Instant}; use tokio; use tokio::net::TcpListener; use tokio::net::TcpStream; -use tokio_io; use tokio_threadpool; type OpResult = DenoResult<Buf>; @@ -703,15 +703,14 @@ fn op_write( match resources::lookup(rid) { None => odd_future(errors::bad_resource()), Some(resource) => { - let len = data.len(); - let op = tokio_io::io::write_all(resource, data) + let op = tokio_write::write(resource, data) .map_err(|err| DenoError::from(err)) - .and_then(move |(_resource, _buf)| { + .and_then(move |(_resource, _buf, nwritten)| { let builder = &mut FlatBufferBuilder::new(); let inner = msg::WriteRes::create( builder, &msg::WriteResArgs { - nbyte: len as u32, + nbyte: nwritten as u32, ..Default::default() }, ); diff --git a/src/tokio_write.rs b/src/tokio_write.rs new file mode 100644 index 000000000..47f62a584 --- /dev/null +++ b/src/tokio_write.rs @@ -0,0 +1,61 @@ +// TODO Submit this file upstream into tokio-io/src/io/write.rs +use std::io; +use std::mem; + +use futures::{Future, Poll}; +use tokio::io::AsyncWrite; + +/// A future used to write some data to a stream. +/// +/// This is created by the [`write`] top-level method. +/// +/// [`write`]: fn.write.html +#[derive(Debug)] +pub struct Write<A, T> { + state: State<A, T>, +} + +#[derive(Debug)] +enum State<A, T> { + Pending { a: A, buf: T }, + Empty, +} + +/// Creates a future that will write some of the buffer `buf` to +/// the stream `a` provided. +/// +/// Any error which happens during writing will cause both the stream and the +/// buffer to get destroyed. +pub fn write<A, T>(a: A, buf: T) -> Write<A, T> +where + A: AsyncWrite, + T: AsRef<[u8]>, +{ + Write { + state: State::Pending { a: a, buf: buf }, + } +} + +impl<A, T> Future for Write<A, T> +where + A: AsyncWrite, + T: AsRef<[u8]>, +{ + type Item = (A, T, usize); + type Error = io::Error; + + fn poll(&mut self) -> Poll<(A, T, usize), io::Error> { + let nwritten = match self.state { + State::Pending { + ref mut a, + ref mut buf, + } => try_ready!(a.poll_write(buf.as_ref())), + State::Empty => panic!("poll a Read after it's done"), + }; + + match mem::replace(&mut self.state, State::Empty) { + State::Pending { a, buf } => Ok((a, buf, nwritten).into()), + State::Empty => panic!("invalid internal state"), + } + } +} |