diff options
author | Bartek IwaĆczuk <biwanczuk@gmail.com> | 2019-11-07 17:11:15 +0100 |
---|---|---|
committer | Ry Dahl <ry@tinyclouds.org> | 2019-11-07 11:11:15 -0500 |
commit | 25c276055b3dfdcecd77d18a0c6ebfcee531442d (patch) | |
tree | f15f6170b6e9d966a9188f74bfb3e5bec45a3512 /cli/ops | |
parent | 415d4c2e5236f6d8dfef8865b1665f144c39a019 (diff) |
refactor: remove cli::resources::Resource (#3285)
Diffstat (limited to 'cli/ops')
-rw-r--r-- | cli/ops/fetch.rs | 4 | ||||
-rw-r--r-- | cli/ops/files.rs | 55 | ||||
-rw-r--r-- | cli/ops/io.rs | 134 | ||||
-rw-r--r-- | cli/ops/net.rs | 32 | ||||
-rw-r--r-- | cli/ops/repl.rs | 4 | ||||
-rw-r--r-- | cli/ops/tls.rs | 17 |
6 files changed, 198 insertions, 48 deletions
diff --git a/cli/ops/fetch.rs b/cli/ops/fetch.rs index 1d330ce41..143331171 100644 --- a/cli/ops/fetch.rs +++ b/cli/ops/fetch.rs @@ -62,10 +62,10 @@ pub fn op_fetch( } let body = res.into_body(); - let body_resource = resources::add_reqwest_body(body); + let rid = resources::add_reqwest_body(body); let json_res = json!({ - "bodyRid": body_resource.rid, + "bodyRid": rid, "status": status.as_u16(), "statusText": status.canonical_reason().unwrap_or(""), "headers": res_headers diff --git a/cli/ops/files.rs b/cli/ops/files.rs index c1e43ff95..04b5f98bf 100644 --- a/cli/ops/files.rs +++ b/cli/ops/files.rs @@ -1,14 +1,19 @@ // Copyright 2018-2019 the Deno authors. All rights reserved. MIT license. use super::dispatch_json::{Deserialize, JsonOp, Value}; use crate::deno_error::bad_resource; +use crate::deno_error::DenoError; +use crate::deno_error::ErrorKind; use crate::fs as deno_fs; use crate::ops::json_op; use crate::resources; +use crate::resources::CliResource; use crate::state::ThreadSafeState; use deno::*; use futures::Future; +use futures::Poll; use std; use std::convert::From; +use std::io::SeekFrom; use tokio; pub fn init(i: &mut Isolate, s: &ThreadSafeState) { @@ -86,8 +91,8 @@ fn op_open( let is_sync = args.promise_id.is_none(); let op = open_options.open(filename).map_err(ErrBox::from).and_then( move |fs_file| { - let resource = resources::add_fs_file(fs_file); - futures::future::ok(json!(resource.rid)) + let rid = resources::add_fs_file(fs_file); + futures::future::ok(json!(rid)) }, ); @@ -116,6 +121,31 @@ fn op_close( Ok(JsonOp::Sync(json!({}))) } +#[derive(Debug)] +pub struct SeekFuture { + seek_from: SeekFrom, + rid: ResourceId, +} + +impl Future for SeekFuture { + type Item = u64; + type Error = ErrBox; + + fn poll(&mut self) -> Poll<Self::Item, Self::Error> { + let mut table = resources::lock_resource_table(); + let resource = table + .get_mut::<CliResource>(self.rid) + .ok_or_else(bad_resource)?; + + let tokio_file = match resource { + CliResource::FsFile(ref mut file) => file, + _ => return Err(bad_resource()), + }; + + tokio_file.poll_seek(self.seek_from).map_err(ErrBox::from) + } +} + #[derive(Deserialize)] #[serde(rename_all = "camelCase")] struct SeekArgs { @@ -131,10 +161,25 @@ fn op_seek( _zero_copy: Option<PinnedBuf>, ) -> Result<JsonOp, ErrBox> { let args: SeekArgs = serde_json::from_value(args)?; + let rid = args.rid as u32; + let offset = args.offset; + let whence = args.whence as u32; + // Translate seek mode to Rust repr. + let seek_from = match whence { + 0 => SeekFrom::Start(offset as u64), + 1 => SeekFrom::Current(i64::from(offset)), + 2 => SeekFrom::End(i64::from(offset)), + _ => { + return Err(ErrBox::from(DenoError::new( + ErrorKind::InvalidSeekMode, + format!("Invalid seek mode: {}", whence), + ))); + } + }; + + let fut = SeekFuture { seek_from, rid }; - let resource = resources::lookup(args.rid as u32)?; - let op = resources::seek(resource, args.offset, args.whence as u32) - .and_then(move |_| futures::future::ok(json!({}))); + let op = fut.and_then(move |_| futures::future::ok(json!({}))); if args.promise_id.is_none() { let buf = op.wait()?; Ok(JsonOp::Sync(buf)) diff --git a/cli/ops/io.rs b/cli/ops/io.rs index 98ac2f395..3ede4b411 100644 --- a/cli/ops/io.rs +++ b/cli/ops/io.rs @@ -1,18 +1,76 @@ use super::dispatch_minimal::MinimalOp; use crate::deno_error; +use crate::deno_error::bad_resource; use crate::ops::minimal_op; use crate::resources; +use crate::resources::CliResource; +use crate::resources::DenoAsyncRead; +use crate::resources::DenoAsyncWrite; use crate::state::ThreadSafeState; -use crate::tokio_read; -use crate::tokio_write; use deno::*; use futures::Future; +use futures::Poll; pub fn init(i: &mut Isolate, s: &ThreadSafeState) { i.register_op("read", s.core_op(minimal_op(op_read))); i.register_op("write", s.core_op(minimal_op(op_write))); } +#[derive(Debug, PartialEq)] +enum IoState { + Pending, + Done, +} + +/// Tries to read some bytes directly into the given `buf` in asynchronous +/// manner, returning a future type. +/// +/// The returned future will resolve to both the I/O stream and the buffer +/// as well as the number of bytes read once the read operation is completed. +pub fn read<T>(rid: ResourceId, buf: T) -> Read<T> +where + T: AsMut<[u8]>, +{ + Read { + rid, + buf, + state: IoState::Pending, + } +} + +/// A future which can be used to easily read available number of bytes to fill +/// a buffer. +/// +/// Created by the [`read`] function. +#[derive(Debug)] +pub struct Read<T> { + rid: ResourceId, + buf: T, + state: IoState, +} + +impl<T> Future for Read<T> +where + T: AsMut<[u8]>, +{ + type Item = usize; + type Error = ErrBox; + + fn poll(&mut self) -> Poll<Self::Item, Self::Error> { + if self.state == IoState::Done { + panic!("poll a Read after it's done"); + } + + let mut table = resources::lock_resource_table(); + let resource = table + .get_mut::<CliResource>(self.rid) + .ok_or_else(bad_resource)?; + let nread = try_ready!(resource.poll_read(&mut self.buf.as_mut()[..])); + self.state = IoState::Done; + Ok(nread.into()) + } +} + pub fn op_read(rid: i32, zero_copy: Option<PinnedBuf>) -> Box<MinimalOp> { debug!("read rid={}", rid); let zero_copy = match zero_copy { @@ -22,13 +80,58 @@ pub fn op_read(rid: i32, zero_copy: Option<PinnedBuf>) -> Box<MinimalOp> { Some(buf) => buf, }; - match resources::lookup(rid as u32) { - Err(e) => Box::new(futures::future::err(e)), - Ok(resource) => Box::new( - tokio_read::read(resource, zero_copy) - .map_err(ErrBox::from) - .and_then(move |(_resource, _buf, nread)| Ok(nread as i32)), - ), + let fut = read(rid as u32, zero_copy) + .map_err(ErrBox::from) + .and_then(move |nread| Ok(nread as i32)); + + Box::new(fut) +} + +/// A future used to write some data to a stream. +#[derive(Debug)] +pub struct Write<T> { + rid: ResourceId, + buf: T, + state: IoState, +} + +/// Creates a future that will write some of the buffer `buf` to +/// the stream resource with `rid`. +/// +/// Any error which happens during writing will cause both the stream and the +/// buffer to get destroyed. +pub fn write<T>(rid: ResourceId, buf: T) -> Write<T> +where + T: AsRef<[u8]>, +{ + Write { + rid, + buf, + state: IoState::Pending, + } +} + +/// This is almost the same implementation as in tokio, difference is +/// that error type is `ErrBox` instead of `std::io::Error`. +impl<T> Future for Write<T> +where + T: AsRef<[u8]>, +{ + type Item = usize; + type Error = ErrBox; + + fn poll(&mut self) -> Poll<Self::Item, Self::Error> { + if self.state == IoState::Done { + panic!("poll a Read after it's done"); + } + + let mut table = resources::lock_resource_table(); + let resource = table + .get_mut::<CliResource>(self.rid) + .ok_or_else(bad_resource)?; + let nwritten = try_ready!(resource.poll_write(self.buf.as_ref())); + self.state = IoState::Done; + Ok(nwritten.into()) } } @@ -41,12 +144,9 @@ pub fn op_write(rid: i32, zero_copy: Option<PinnedBuf>) -> Box<MinimalOp> { Some(buf) => buf, }; - match resources::lookup(rid as u32) { - Err(e) => Box::new(futures::future::err(e)), - Ok(resource) => Box::new( - tokio_write::write(resource, zero_copy) - .map_err(ErrBox::from) - .and_then(move |(_resource, _buf, nwritten)| Ok(nwritten as i32)), - ), - } + let fut = write(rid as u32, zero_copy) + .map_err(ErrBox::from) + .and_then(move |nwritten| Ok(nwritten as i32)); + + Box::new(fut) } diff --git a/cli/ops/net.rs b/cli/ops/net.rs index d603b746b..a4b3bf934 100644 --- a/cli/ops/net.rs +++ b/cli/ops/net.rs @@ -4,7 +4,7 @@ use crate::deno_error::bad_resource; use crate::ops::json_op; use crate::resolve_addr::resolve_addr; use crate::resources; -use crate::resources::CoreResource; +use crate::resources::CliResource; use crate::resources::Resource; use crate::state::ThreadSafeState; use deno::*; @@ -132,13 +132,13 @@ fn op_accept( .and_then(move |(tcp_stream, _socket_addr)| { let local_addr = tcp_stream.local_addr()?; let remote_addr = tcp_stream.peer_addr()?; - let tcp_stream_resource = resources::add_tcp_stream(tcp_stream); - Ok((tcp_stream_resource, local_addr, remote_addr)) + let rid = resources::add_tcp_stream(tcp_stream); + Ok((rid, local_addr, remote_addr)) }) .map_err(ErrBox::from) - .and_then(move |(tcp_stream_resource, local_addr, remote_addr)| { + .and_then(move |(rid, local_addr, remote_addr)| { futures::future::ok(json!({ - "rid": tcp_stream_resource.rid, + "rid": rid, "localAddr": local_addr.to_string(), "remoteAddr": remote_addr.to_string(), })) @@ -170,13 +170,13 @@ fn op_dial( .and_then(move |tcp_stream| { let local_addr = tcp_stream.local_addr()?; let remote_addr = tcp_stream.peer_addr()?; - let tcp_stream_resource = resources::add_tcp_stream(tcp_stream); - Ok((tcp_stream_resource, local_addr, remote_addr)) + let rid = resources::add_tcp_stream(tcp_stream); + Ok((rid, local_addr, remote_addr)) }) .map_err(ErrBox::from) - .and_then(move |(tcp_stream_resource, local_addr, remote_addr)| { + .and_then(move |(rid, local_addr, remote_addr)| { futures::future::ok(json!({ - "rid": tcp_stream_resource.rid, + "rid": rid, "localAddr": local_addr.to_string(), "remoteAddr": remote_addr.to_string(), })) @@ -201,7 +201,6 @@ fn op_shutdown( let rid = args.rid as u32; let how = args.how; - let mut resource = resources::lookup(rid)?; let shutdown_mode = match how { 0 => Shutdown::Read, @@ -209,8 +208,15 @@ fn op_shutdown( _ => unimplemented!(), }; - // Use UFCS for disambiguation - Resource::shutdown(&mut resource, shutdown_mode)?; + let mut table = resources::lock_resource_table(); + let resource = table.get_mut::<CliResource>(rid).ok_or_else(bad_resource)?; + match resource { + CliResource::TcpStream(ref mut stream) => { + TcpStream::shutdown(stream, shutdown_mode).map_err(ErrBox::from)?; + } + _ => return Err(bad_resource()), + } + Ok(JsonOp::Sync(json!({}))) } @@ -228,7 +234,7 @@ struct TcpListenerResource { local_addr: SocketAddr, } -impl CoreResource for TcpListenerResource {} +impl Resource for TcpListenerResource {} impl Drop for TcpListenerResource { fn drop(&mut self) { diff --git a/cli/ops/repl.rs b/cli/ops/repl.rs index ba63c5109..723fb2571 100644 --- a/cli/ops/repl.rs +++ b/cli/ops/repl.rs @@ -5,7 +5,7 @@ use crate::ops::json_op; use crate::repl; use crate::repl::Repl; use crate::resources; -use crate::resources::CoreResource; +use crate::resources::Resource; use crate::state::ThreadSafeState; use deno::*; use std::sync::Arc; @@ -24,7 +24,7 @@ pub fn init(i: &mut Isolate, s: &ThreadSafeState) { struct ReplResource(Arc<Mutex<Repl>>); -impl CoreResource for ReplResource {} +impl Resource for ReplResource {} #[derive(Deserialize)] #[serde(rename_all = "camelCase")] diff --git a/cli/ops/tls.rs b/cli/ops/tls.rs index ee08f357a..6e8348c91 100644 --- a/cli/ops/tls.rs +++ b/cli/ops/tls.rs @@ -6,7 +6,7 @@ use crate::deno_error::ErrorKind; use crate::ops::json_op; use crate::resolve_addr::resolve_addr; use crate::resources; -use crate::resources::CoreResource; +use crate::resources::Resource; use crate::state::ThreadSafeState; use deno::*; use futures::Async; @@ -99,9 +99,9 @@ pub fn op_dial_tls( .connect(dnsname, tcp_stream) .map_err(ErrBox::from) .and_then(move |tls_stream| { - let tls_stream_resource = resources::add_tls_stream(tls_stream); + let rid = resources::add_tls_stream(tls_stream); futures::future::ok(json!({ - "rid": tls_stream_resource.rid, + "rid": rid, "localAddr": local_addr.to_string(), "remoteAddr": remote_addr.to_string(), })) @@ -179,7 +179,7 @@ pub struct TlsListenerResource { local_addr: SocketAddr, } -impl CoreResource for TlsListenerResource {} +impl Resource for TlsListenerResource {} impl Drop for TlsListenerResource { fn drop(&mut self) { @@ -389,14 +389,13 @@ fn op_accept_tls( .accept(tcp_stream) .map_err(ErrBox::from) .and_then(move |tls_stream| { - let tls_stream_resource = - resources::add_server_tls_stream(tls_stream); - Ok((tls_stream_resource, local_addr, remote_addr)) + let rid = resources::add_server_tls_stream(tls_stream); + Ok((rid, local_addr, remote_addr)) }) }) - .and_then(move |(tls_stream_resource, local_addr, remote_addr)| { + .and_then(move |(rid, local_addr, remote_addr)| { futures::future::ok(json!({ - "rid": tls_stream_resource.rid, + "rid": rid, "localAddr": local_addr.to_string(), "remoteAddr": remote_addr.to_string(), })) |