diff options
Diffstat (limited to 'cli/ops/io.rs')
-rw-r--r-- | cli/ops/io.rs | 134 |
1 files changed, 117 insertions, 17 deletions
diff --git a/cli/ops/io.rs b/cli/ops/io.rs index 98ac2f395..3ede4b411 100644 --- a/cli/ops/io.rs +++ b/cli/ops/io.rs @@ -1,18 +1,76 @@ use super::dispatch_minimal::MinimalOp; use crate::deno_error; +use crate::deno_error::bad_resource; use crate::ops::minimal_op; use crate::resources; +use crate::resources::CliResource; +use crate::resources::DenoAsyncRead; +use crate::resources::DenoAsyncWrite; use crate::state::ThreadSafeState; -use crate::tokio_read; -use crate::tokio_write; use deno::*; use futures::Future; +use futures::Poll; pub fn init(i: &mut Isolate, s: &ThreadSafeState) { i.register_op("read", s.core_op(minimal_op(op_read))); i.register_op("write", s.core_op(minimal_op(op_write))); } +#[derive(Debug, PartialEq)] +enum IoState { + Pending, + Done, +} + +/// Tries to read some bytes directly into the given `buf` in asynchronous +/// manner, returning a future type. +/// +/// The returned future will resolve to both the I/O stream and the buffer +/// as well as the number of bytes read once the read operation is completed. +pub fn read<T>(rid: ResourceId, buf: T) -> Read<T> +where + T: AsMut<[u8]>, +{ + Read { + rid, + buf, + state: IoState::Pending, + } +} + +/// A future which can be used to easily read available number of bytes to fill +/// a buffer. +/// +/// Created by the [`read`] function. +#[derive(Debug)] +pub struct Read<T> { + rid: ResourceId, + buf: T, + state: IoState, +} + +impl<T> Future for Read<T> +where + T: AsMut<[u8]>, +{ + type Item = usize; + type Error = ErrBox; + + fn poll(&mut self) -> Poll<Self::Item, Self::Error> { + if self.state == IoState::Done { + panic!("poll a Read after it's done"); + } + + let mut table = resources::lock_resource_table(); + let resource = table + .get_mut::<CliResource>(self.rid) + .ok_or_else(bad_resource)?; + let nread = try_ready!(resource.poll_read(&mut self.buf.as_mut()[..])); + self.state = IoState::Done; + Ok(nread.into()) + } +} + pub fn op_read(rid: i32, zero_copy: Option<PinnedBuf>) -> Box<MinimalOp> { debug!("read rid={}", rid); let zero_copy = match zero_copy { @@ -22,13 +80,58 @@ pub fn op_read(rid: i32, zero_copy: Option<PinnedBuf>) -> Box<MinimalOp> { Some(buf) => buf, }; - match resources::lookup(rid as u32) { - Err(e) => Box::new(futures::future::err(e)), - Ok(resource) => Box::new( - tokio_read::read(resource, zero_copy) - .map_err(ErrBox::from) - .and_then(move |(_resource, _buf, nread)| Ok(nread as i32)), - ), + let fut = read(rid as u32, zero_copy) + .map_err(ErrBox::from) + .and_then(move |nread| Ok(nread as i32)); + + Box::new(fut) +} + +/// A future used to write some data to a stream. +#[derive(Debug)] +pub struct Write<T> { + rid: ResourceId, + buf: T, + state: IoState, +} + +/// Creates a future that will write some of the buffer `buf` to +/// the stream resource with `rid`. +/// +/// Any error which happens during writing will cause both the stream and the +/// buffer to get destroyed. +pub fn write<T>(rid: ResourceId, buf: T) -> Write<T> +where + T: AsRef<[u8]>, +{ + Write { + rid, + buf, + state: IoState::Pending, + } +} + +/// This is almost the same implementation as in tokio, difference is +/// that error type is `ErrBox` instead of `std::io::Error`. +impl<T> Future for Write<T> +where + T: AsRef<[u8]>, +{ + type Item = usize; + type Error = ErrBox; + + fn poll(&mut self) -> Poll<Self::Item, Self::Error> { + if self.state == IoState::Done { + panic!("poll a Read after it's done"); + } + + let mut table = resources::lock_resource_table(); + let resource = table + .get_mut::<CliResource>(self.rid) + .ok_or_else(bad_resource)?; + let nwritten = try_ready!(resource.poll_write(self.buf.as_ref())); + self.state = IoState::Done; + Ok(nwritten.into()) } } @@ -41,12 +144,9 @@ pub fn op_write(rid: i32, zero_copy: Option<PinnedBuf>) -> Box<MinimalOp> { Some(buf) => buf, }; - match resources::lookup(rid as u32) { - Err(e) => Box::new(futures::future::err(e)), - Ok(resource) => Box::new( - tokio_write::write(resource, zero_copy) - .map_err(ErrBox::from) - .and_then(move |(_resource, _buf, nwritten)| Ok(nwritten as i32)), - ), - } + let fut = write(rid as u32, zero_copy) + .map_err(ErrBox::from) + .and_then(move |nwritten| Ok(nwritten as i32)); + + Box::new(fut) } |