summaryrefslogtreecommitdiff
path: root/cli/ops/io.rs
diff options
context:
space:
mode:
authorBartek IwaƄczuk <biwanczuk@gmail.com>2019-11-07 17:11:15 +0100
committerRy Dahl <ry@tinyclouds.org>2019-11-07 11:11:15 -0500
commit25c276055b3dfdcecd77d18a0c6ebfcee531442d (patch)
treef15f6170b6e9d966a9188f74bfb3e5bec45a3512 /cli/ops/io.rs
parent415d4c2e5236f6d8dfef8865b1665f144c39a019 (diff)
refactor: remove cli::resources::Resource (#3285)
Diffstat (limited to 'cli/ops/io.rs')
-rw-r--r--cli/ops/io.rs134
1 files changed, 117 insertions, 17 deletions
diff --git a/cli/ops/io.rs b/cli/ops/io.rs
index 98ac2f395..3ede4b411 100644
--- a/cli/ops/io.rs
+++ b/cli/ops/io.rs
@@ -1,18 +1,76 @@
use super::dispatch_minimal::MinimalOp;
use crate::deno_error;
+use crate::deno_error::bad_resource;
use crate::ops::minimal_op;
use crate::resources;
+use crate::resources::CliResource;
+use crate::resources::DenoAsyncRead;
+use crate::resources::DenoAsyncWrite;
use crate::state::ThreadSafeState;
-use crate::tokio_read;
-use crate::tokio_write;
use deno::*;
use futures::Future;
+use futures::Poll;
pub fn init(i: &mut Isolate, s: &ThreadSafeState) {
i.register_op("read", s.core_op(minimal_op(op_read)));
i.register_op("write", s.core_op(minimal_op(op_write)));
}
+#[derive(Debug, PartialEq)]
+enum IoState {
+ Pending,
+ Done,
+}
+
+/// Tries to read some bytes directly into the given `buf` in asynchronous
+/// manner, returning a future type.
+///
+/// The returned future will resolve to both the I/O stream and the buffer
+/// as well as the number of bytes read once the read operation is completed.
+pub fn read<T>(rid: ResourceId, buf: T) -> Read<T>
+where
+ T: AsMut<[u8]>,
+{
+ Read {
+ rid,
+ buf,
+ state: IoState::Pending,
+ }
+}
+
+/// A future which can be used to easily read available number of bytes to fill
+/// a buffer.
+///
+/// Created by the [`read`] function.
+#[derive(Debug)]
+pub struct Read<T> {
+ rid: ResourceId,
+ buf: T,
+ state: IoState,
+}
+
+impl<T> Future for Read<T>
+where
+ T: AsMut<[u8]>,
+{
+ type Item = usize;
+ type Error = ErrBox;
+
+ fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
+ if self.state == IoState::Done {
+ panic!("poll a Read after it's done");
+ }
+
+ let mut table = resources::lock_resource_table();
+ let resource = table
+ .get_mut::<CliResource>(self.rid)
+ .ok_or_else(bad_resource)?;
+ let nread = try_ready!(resource.poll_read(&mut self.buf.as_mut()[..]));
+ self.state = IoState::Done;
+ Ok(nread.into())
+ }
+}
+
pub fn op_read(rid: i32, zero_copy: Option<PinnedBuf>) -> Box<MinimalOp> {
debug!("read rid={}", rid);
let zero_copy = match zero_copy {
@@ -22,13 +80,58 @@ pub fn op_read(rid: i32, zero_copy: Option<PinnedBuf>) -> Box<MinimalOp> {
Some(buf) => buf,
};
- match resources::lookup(rid as u32) {
- Err(e) => Box::new(futures::future::err(e)),
- Ok(resource) => Box::new(
- tokio_read::read(resource, zero_copy)
- .map_err(ErrBox::from)
- .and_then(move |(_resource, _buf, nread)| Ok(nread as i32)),
- ),
+ let fut = read(rid as u32, zero_copy)
+ .map_err(ErrBox::from)
+ .and_then(move |nread| Ok(nread as i32));
+
+ Box::new(fut)
+}
+
+/// A future used to write some data to a stream.
+#[derive(Debug)]
+pub struct Write<T> {
+ rid: ResourceId,
+ buf: T,
+ state: IoState,
+}
+
+/// Creates a future that will write some of the buffer `buf` to
+/// the stream resource with `rid`.
+///
+/// Any error which happens during writing will cause both the stream and the
+/// buffer to get destroyed.
+pub fn write<T>(rid: ResourceId, buf: T) -> Write<T>
+where
+ T: AsRef<[u8]>,
+{
+ Write {
+ rid,
+ buf,
+ state: IoState::Pending,
+ }
+}
+
+/// This is almost the same implementation as in tokio, difference is
+/// that error type is `ErrBox` instead of `std::io::Error`.
+impl<T> Future for Write<T>
+where
+ T: AsRef<[u8]>,
+{
+ type Item = usize;
+ type Error = ErrBox;
+
+ fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
+ if self.state == IoState::Done {
+ panic!("poll a Read after it's done");
+ }
+
+ let mut table = resources::lock_resource_table();
+ let resource = table
+ .get_mut::<CliResource>(self.rid)
+ .ok_or_else(bad_resource)?;
+ let nwritten = try_ready!(resource.poll_write(self.buf.as_ref()));
+ self.state = IoState::Done;
+ Ok(nwritten.into())
}
}
@@ -41,12 +144,9 @@ pub fn op_write(rid: i32, zero_copy: Option<PinnedBuf>) -> Box<MinimalOp> {
Some(buf) => buf,
};
- match resources::lookup(rid as u32) {
- Err(e) => Box::new(futures::future::err(e)),
- Ok(resource) => Box::new(
- tokio_write::write(resource, zero_copy)
- .map_err(ErrBox::from)
- .and_then(move |(_resource, _buf, nwritten)| Ok(nwritten as i32)),
- ),
- }
+ let fut = write(rid as u32, zero_copy)
+ .map_err(ErrBox::from)
+ .and_then(move |nwritten| Ok(nwritten as i32));
+
+ Box::new(fut)
}