diff options
-rw-r--r-- | src/eager_unix.rs | 86 | ||||
-rw-r--r-- | src/main.rs | 3 | ||||
-rw-r--r-- | src/resources.rs | 163 |
3 files changed, 130 insertions, 122 deletions
diff --git a/src/eager_unix.rs b/src/eager_unix.rs new file mode 100644 index 000000000..8646e2b23 --- /dev/null +++ b/src/eager_unix.rs @@ -0,0 +1,86 @@ +// Copyright 2018 the Deno authors. All rights reserved. MIT license. + +use resources::{EagerAccept, EagerRead, EagerWrite, Resource}; +use tokio_util; +use tokio_write; + +use futures::future::{self, Either}; +use std; +use std::io::{ErrorKind, Read, Write}; +use std::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd}; +use tokio; +use tokio::net::{TcpListener, TcpStream}; +use tokio_io; + +pub fn tcp_read<T: AsMut<[u8]>>( + tcp_stream: &TcpStream, + resource: Resource, + mut buf: T, +) -> EagerRead<Resource, T> { + // Unforunately we can't just call read() on tokio::net::TcpStream + let fd = (*tcp_stream).as_raw_fd(); + let mut std_tcp_stream = unsafe { std::net::TcpStream::from_raw_fd(fd) }; + let read_result = std_tcp_stream.read(buf.as_mut()); + // std_tcp_stream will close when it gets dropped. Thus... + let _ = std_tcp_stream.into_raw_fd(); + match read_result { + Ok(nread) => Either::B(future::ok((resource, buf, nread))), + Err(err) => { + if err.kind() == ErrorKind::WouldBlock { + Either::A(tokio_io::io::read(resource, buf)) + } else { + Either::B(future::err(err)) + } + } + } +} + +pub fn tcp_write<T: AsRef<[u8]>>( + tcp_stream: &TcpStream, + resource: Resource, + buf: T, +) -> EagerWrite<Resource, T> { + let fd = (*tcp_stream).as_raw_fd(); + let mut std_tcp_stream = unsafe { std::net::TcpStream::from_raw_fd(fd) }; + let write_result = std_tcp_stream.write(buf.as_ref()); + // std_tcp_stream will close when it gets dropped. Thus... + let _ = std_tcp_stream.into_raw_fd(); + match write_result { + Ok(nwrite) => Either::B(future::ok((resource, buf, nwrite))), + Err(err) => { + if err.kind() == ErrorKind::WouldBlock { + Either::A(tokio_write::write(resource, buf)) + } else { + Either::B(future::err(err)) + } + } + } +} + +pub fn tcp_accept( + tcp_listener: &TcpListener, + resource: Resource, +) -> EagerAccept { + let fd = (*tcp_listener).as_raw_fd(); + let std_listener = unsafe { std::net::TcpListener::from_raw_fd(fd) }; + let result = std_listener.accept(); + // std_listener will close when it gets dropped. Thus... + let _ = std_listener.into_raw_fd(); + match result { + Ok((std_stream, addr)) => { + let result = tokio::net::TcpStream::from_std( + std_stream, + &tokio::reactor::Handle::default(), + ); + let tokio_stream = result.unwrap(); + Either::B(future::ok((tokio_stream, addr))) + } + Err(err) => { + if err.kind() == ErrorKind::WouldBlock { + Either::A(tokio_util::accept(resource)) + } else { + Either::B(future::err(err)) + } + } + } +} diff --git a/src/main.rs b/src/main.rs index feca8aedd..fab7be344 100644 --- a/src/main.rs +++ b/src/main.rs @@ -36,6 +36,9 @@ mod tokio_util; mod tokio_write; mod version; +#[cfg(unix)] +mod eager_unix; + use std::env; static LOGGER: Logger = Logger; diff --git a/src/resources.rs b/src/resources.rs index 41f1359c3..5a41ee2db 100644 --- a/src/resources.rs +++ b/src/resources.rs @@ -8,18 +8,18 @@ // descriptors". This module implements a global resource table. Ops (AKA // handlers) look up resources by their integer id here. +#[cfg(unix)] +use eager_unix as eager; use errors::DenoError; use tokio_util; use tokio_write; use futures; -use futures::future::Either; -use futures::future::FutureResult; +use futures::future::{Either, FutureResult}; use futures::Poll; use std; use std::collections::HashMap; -use std::io::Error; -use std::io::{Read, Write}; +use std::io::{Error, Read, Write}; use std::net::{Shutdown, SocketAddr}; use std::sync::atomic::AtomicIsize; use std::sync::atomic::Ordering; @@ -194,161 +194,80 @@ pub fn lookup(rid: ResourceId) -> Option<Resource> { table.get(&rid).map(|_| Resource { rid }) } -type EagerRead<R, T> = +pub type EagerRead<R, T> = Either<tokio_io::io::Read<R, T>, FutureResult<(R, T, usize), std::io::Error>>; -#[cfg(windows)] -#[allow(unused_mut)] -pub fn eager_read<T>(resource: Resource, mut buf: T) -> EagerRead<Resource, T> -where - T: AsMut<[u8]>, -{ - Either::A(tokio_io::io::read(resource, buf)).into() -} +pub type EagerWrite<R, T> = + Either<tokio_write::Write<R, T>, FutureResult<(R, T, usize), std::io::Error>>; -#[cfg(not(windows))] -use std::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd}; +pub type EagerAccept = Either< + tokio_util::Accept, + FutureResult<(tokio::net::TcpStream, std::net::SocketAddr), std::io::Error>, +>; -#[cfg(not(windows))] -fn eager_read_tcp<T: AsMut<[u8]>>( - tcp_stream: &TcpStream, +#[cfg(not(unix))] +#[allow(unused_mut)] +pub fn eager_read<T: AsMut<[u8]>>( resource: Resource, mut buf: T, ) -> EagerRead<Resource, T> { - // Unforunately we can't just call read() on tokio::net::TcpStream - let fd = (*tcp_stream).as_raw_fd(); - let mut std_tcp_stream = unsafe { std::net::TcpStream::from_raw_fd(fd) }; - let read_result = std_tcp_stream.read(buf.as_mut()); - // std_tcp_stream will close when it gets dropped. Thus... - let _ = std_tcp_stream.into_raw_fd(); - match read_result { - Ok(nread) => Either::B(futures::future::ok((resource, buf, nread))), - Err(err) => { - if err.kind() == std::io::ErrorKind::WouldBlock { - Either::A(tokio_io::io::read(resource, buf)) - } else { - Either::B(futures::future::err(err)) - } - } - } + Either::A(tokio_io::io::read(resource, buf)).into() +} + +#[cfg(not(unix))] +pub fn eager_write<T: AsRef<[u8]>>( + resource: Resource, + buf: T, +) -> EagerWrite<Resource, T> { + Either::A(tokio_write::write(resource, buf)).into() +} + +#[cfg(not(unix))] +pub fn eager_accept(resource: Resource) -> EagerAccept { + Either::A(tokio_util::accept(resource)).into() } // This is an optimization that Tokio should do. // Attempt to call read() on the main thread. -#[cfg(not(windows))] -pub fn eager_read<T>(resource: Resource, buf: T) -> EagerRead<Resource, T> -where - T: AsMut<[u8]>, -{ +#[cfg(unix)] +pub fn eager_read<T: AsMut<[u8]>>( + resource: Resource, + buf: T, +) -> EagerRead<Resource, T> { let mut table = RESOURCE_TABLE.lock().unwrap(); let maybe_repr = table.get_mut(&resource.rid); match maybe_repr { None => panic!("bad rid"), Some(repr) => match repr { Repr::TcpStream(ref mut tcp_stream) => { - eager_read_tcp(tcp_stream, resource, buf) + eager::tcp_read(tcp_stream, resource, buf) } _ => Either::A(tokio_io::io::read(resource, buf)), }, } } -type EagerWrite<R, T> = - Either<tokio_write::Write<R, T>, FutureResult<(R, T, usize), std::io::Error>>; - -#[cfg(windows)] -pub fn eager_write<T>(resource: Resource, buf: T) -> EagerWrite<Resource, T> -where - T: AsRef<[u8]>, -{ - Either::A(tokio_write::write(resource, buf)).into() -} - -#[cfg(not(windows))] -fn eager_write_tcp<T: AsRef<[u8]>>( - tcp_stream: &TcpStream, +// This is an optimization that Tokio should do. +// Attempt to call write() on the main thread. +#[cfg(unix)] +pub fn eager_write<T: AsRef<[u8]>>( resource: Resource, buf: T, ) -> EagerWrite<Resource, T> { - let fd = (*tcp_stream).as_raw_fd(); - let mut std_tcp_stream = unsafe { std::net::TcpStream::from_raw_fd(fd) }; - let write_result = std_tcp_stream.write(buf.as_ref()); - // std_tcp_stream will close when it gets dropped. Thus... - let _ = std_tcp_stream.into_raw_fd(); - match write_result { - Ok(nwrite) => Either::B(futures::future::ok((resource, buf, nwrite))), - Err(err) => { - if err.kind() == std::io::ErrorKind::WouldBlock { - Either::A(tokio_write::write(resource, buf)) - } else { - Either::B(futures::future::err(err)) - } - } - } -} - -// This is an optimization that Tokio should do. -// Attempt to call write() on the main thread. -#[cfg(not(windows))] -pub fn eager_write<T>(resource: Resource, buf: T) -> EagerWrite<Resource, T> -where - T: AsRef<[u8]>, -{ let mut table = RESOURCE_TABLE.lock().unwrap(); let maybe_repr = table.get_mut(&resource.rid); match maybe_repr { None => panic!("bad rid"), Some(repr) => match repr { Repr::TcpStream(ref mut tcp_stream) => { - eager_write_tcp(tcp_stream, resource, buf) + eager::tcp_write(tcp_stream, resource, buf) } _ => Either::A(tokio_write::write(resource, buf)), }, } } -type EagerAccept = Either< - tokio_util::Accept, - FutureResult<(TcpStream, SocketAddr), std::io::Error>, ->; - -#[cfg(windows)] -pub fn eager_accept(resource: Resource) -> EagerAccept { - Either::A(tokio_util::accept(resource)).into() -} - -#[cfg(not(windows))] -fn eager_accept_tcp( - tcp_listener: &tokio::net::TcpListener, - resource: Resource, -) -> EagerAccept { - let fd = (*tcp_listener).as_raw_fd(); - let std_listener = unsafe { std::net::TcpListener::from_raw_fd(fd) }; - let result = std_listener.accept(); - // std_listener will close when it gets dropped. Thus... - let _ = std_listener.into_raw_fd(); - match result { - Ok((std_stream, addr)) => { - let result = tokio::net::TcpStream::from_std( - std_stream, - &tokio::reactor::Handle::default(), - ); - let tokio_stream = result.unwrap(); - Either::B(futures::future::ok((tokio_stream, addr))) - } - Err(err) => { - if err.kind() == std::io::ErrorKind::WouldBlock { - Either::A(tokio_util::accept(resource)) - } else { - Either::B(futures::future::err(err)) - } - } - } -} - -// This is an optimization that Tokio should do. -// Attempt to call write() on the main thread. -#[cfg(not(windows))] +#[cfg(unix)] pub fn eager_accept(resource: Resource) -> EagerAccept { let mut table = RESOURCE_TABLE.lock().unwrap(); let maybe_repr = table.get_mut(&resource.rid); @@ -356,7 +275,7 @@ pub fn eager_accept(resource: Resource) -> EagerAccept { None => panic!("bad rid"), Some(repr) => match repr { Repr::TcpListener(ref mut tcp_listener) => { - eager_accept_tcp(tcp_listener, resource) + eager::tcp_accept(tcp_listener, resource) } _ => Either::A(tokio_util::accept(resource)), }, |