summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/main.rs1
-rw-r--r--src/ops.rs9
-rw-r--r--src/tokio_write.rs61
3 files changed, 66 insertions, 5 deletions
diff --git a/src/main.rs b/src/main.rs
index 6c75fb515..82993596a 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -32,6 +32,7 @@ mod libdeno;
pub mod ops;
mod resources;
mod tokio_util;
+mod tokio_write;
mod version;
use std::env;
diff --git a/src/ops.rs b/src/ops.rs
index 0120f50f0..2d7072dd0 100644
--- a/src/ops.rs
+++ b/src/ops.rs
@@ -11,6 +11,7 @@ use msg;
use resources;
use resources::Resource;
use tokio_util;
+use tokio_write;
use version;
use flatbuffers::FlatBufferBuilder;
@@ -35,7 +36,6 @@ use std::time::{Duration, Instant};
use tokio;
use tokio::net::TcpListener;
use tokio::net::TcpStream;
-use tokio_io;
use tokio_threadpool;
type OpResult = DenoResult<Buf>;
@@ -703,15 +703,14 @@ fn op_write(
match resources::lookup(rid) {
None => odd_future(errors::bad_resource()),
Some(resource) => {
- let len = data.len();
- let op = tokio_io::io::write_all(resource, data)
+ let op = tokio_write::write(resource, data)
.map_err(|err| DenoError::from(err))
- .and_then(move |(_resource, _buf)| {
+ .and_then(move |(_resource, _buf, nwritten)| {
let builder = &mut FlatBufferBuilder::new();
let inner = msg::WriteRes::create(
builder,
&msg::WriteResArgs {
- nbyte: len as u32,
+ nbyte: nwritten as u32,
..Default::default()
},
);
diff --git a/src/tokio_write.rs b/src/tokio_write.rs
new file mode 100644
index 000000000..47f62a584
--- /dev/null
+++ b/src/tokio_write.rs
@@ -0,0 +1,61 @@
+// TODO Submit this file upstream into tokio-io/src/io/write.rs
+use std::io;
+use std::mem;
+
+use futures::{Future, Poll};
+use tokio::io::AsyncWrite;
+
+/// A future used to write some data to a stream.
+///
+/// This is created by the [`write`] top-level method.
+///
+/// [`write`]: fn.write.html
+#[derive(Debug)]
+pub struct Write<A, T> {
+ state: State<A, T>,
+}
+
+#[derive(Debug)]
+enum State<A, T> {
+ Pending { a: A, buf: T },
+ Empty,
+}
+
+/// Creates a future that will write some of the buffer `buf` to
+/// the stream `a` provided.
+///
+/// Any error which happens during writing will cause both the stream and the
+/// buffer to get destroyed.
+pub fn write<A, T>(a: A, buf: T) -> Write<A, T>
+where
+ A: AsyncWrite,
+ T: AsRef<[u8]>,
+{
+ Write {
+ state: State::Pending { a: a, buf: buf },
+ }
+}
+
+impl<A, T> Future for Write<A, T>
+where
+ A: AsyncWrite,
+ T: AsRef<[u8]>,
+{
+ type Item = (A, T, usize);
+ type Error = io::Error;
+
+ fn poll(&mut self) -> Poll<(A, T, usize), io::Error> {
+ let nwritten = match self.state {
+ State::Pending {
+ ref mut a,
+ ref mut buf,
+ } => try_ready!(a.poll_write(buf.as_ref())),
+ State::Empty => panic!("poll a Read after it's done"),
+ };
+
+ match mem::replace(&mut self.state, State::Empty) {
+ State::Pending { a, buf } => Ok((a, buf, nwritten).into()),
+ State::Empty => panic!("invalid internal state"),
+ }
+ }
+}