From 2af25b1957d6415bf4ed89e1e01d379346d93ba8 Mon Sep 17 00:00:00 2001 From: Ryan Dahl Date: Fri, 19 Oct 2018 16:10:25 -0400 Subject: Allow partial writes. Do not use tokio_io::io:write_all(). Adds src/tokio_write.rs --- src/main.rs | 1 + src/ops.rs | 9 ++++---- src/tokio_write.rs | 61 ++++++++++++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 66 insertions(+), 5 deletions(-) create mode 100644 src/tokio_write.rs (limited to 'src') diff --git a/src/main.rs b/src/main.rs index 6c75fb515..82993596a 100644 --- a/src/main.rs +++ b/src/main.rs @@ -32,6 +32,7 @@ mod libdeno; pub mod ops; mod resources; mod tokio_util; +mod tokio_write; mod version; use std::env; diff --git a/src/ops.rs b/src/ops.rs index 0120f50f0..2d7072dd0 100644 --- a/src/ops.rs +++ b/src/ops.rs @@ -11,6 +11,7 @@ use msg; use resources; use resources::Resource; use tokio_util; +use tokio_write; use version; use flatbuffers::FlatBufferBuilder; @@ -35,7 +36,6 @@ use std::time::{Duration, Instant}; use tokio; use tokio::net::TcpListener; use tokio::net::TcpStream; -use tokio_io; use tokio_threadpool; type OpResult = DenoResult; @@ -703,15 +703,14 @@ fn op_write( match resources::lookup(rid) { None => odd_future(errors::bad_resource()), Some(resource) => { - let len = data.len(); - let op = tokio_io::io::write_all(resource, data) + let op = tokio_write::write(resource, data) .map_err(|err| DenoError::from(err)) - .and_then(move |(_resource, _buf)| { + .and_then(move |(_resource, _buf, nwritten)| { let builder = &mut FlatBufferBuilder::new(); let inner = msg::WriteRes::create( builder, &msg::WriteResArgs { - nbyte: len as u32, + nbyte: nwritten as u32, ..Default::default() }, ); diff --git a/src/tokio_write.rs b/src/tokio_write.rs new file mode 100644 index 000000000..47f62a584 --- /dev/null +++ b/src/tokio_write.rs @@ -0,0 +1,61 @@ +// TODO Submit this file upstream into tokio-io/src/io/write.rs +use std::io; +use std::mem; + +use futures::{Future, Poll}; +use tokio::io::AsyncWrite; + +/// A future used to write some data to a stream. +/// +/// This is created by the [`write`] top-level method. +/// +/// [`write`]: fn.write.html +#[derive(Debug)] +pub struct Write { + state: State, +} + +#[derive(Debug)] +enum State { + Pending { a: A, buf: T }, + Empty, +} + +/// Creates a future that will write some of the buffer `buf` to +/// the stream `a` provided. +/// +/// Any error which happens during writing will cause both the stream and the +/// buffer to get destroyed. +pub fn write(a: A, buf: T) -> Write +where + A: AsyncWrite, + T: AsRef<[u8]>, +{ + Write { + state: State::Pending { a: a, buf: buf }, + } +} + +impl Future for Write +where + A: AsyncWrite, + T: AsRef<[u8]>, +{ + type Item = (A, T, usize); + type Error = io::Error; + + fn poll(&mut self) -> Poll<(A, T, usize), io::Error> { + let nwritten = match self.state { + State::Pending { + ref mut a, + ref mut buf, + } => try_ready!(a.poll_write(buf.as_ref())), + State::Empty => panic!("poll a Read after it's done"), + }; + + match mem::replace(&mut self.state, State::Empty) { + State::Pending { a, buf } => Ok((a, buf, nwritten).into()), + State::Empty => panic!("invalid internal state"), + } + } +} -- cgit v1.2.3