summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorBert Belder <bertbelder@gmail.com>2018-10-24 01:32:55 +0200
committerBert Belder <bertbelder@gmail.com>2018-10-24 11:16:00 +0200
commit58f0547e09090ea103279a4e04dbb6afeb3be55f (patch)
treeee05a38ab2098b9a54cca40756eb068213a01c78 /src
parent988ec88dd081fd84241f40b8bce64b2aeb39ebf7 (diff)
Refactor eager_{read,write,accept}_tcp into separate functions
Diffstat (limited to 'src')
-rw-r--r--src/resources.rs150
1 files changed, 84 insertions, 66 deletions
diff --git a/src/resources.rs b/src/resources.rs
index b6449c37e..41f1359c3 100644
--- a/src/resources.rs
+++ b/src/resources.rs
@@ -206,10 +206,37 @@ where
Either::A(tokio_io::io::read(resource, buf)).into()
}
+#[cfg(not(windows))]
+use std::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd};
+
+#[cfg(not(windows))]
+fn eager_read_tcp<T: AsMut<[u8]>>(
+ tcp_stream: &TcpStream,
+ resource: Resource,
+ mut buf: T,
+) -> EagerRead<Resource, T> {
+ // Unforunately we can't just call read() on tokio::net::TcpStream
+ let fd = (*tcp_stream).as_raw_fd();
+ let mut std_tcp_stream = unsafe { std::net::TcpStream::from_raw_fd(fd) };
+ let read_result = std_tcp_stream.read(buf.as_mut());
+ // std_tcp_stream will close when it gets dropped. Thus...
+ let _ = std_tcp_stream.into_raw_fd();
+ match read_result {
+ Ok(nread) => Either::B(futures::future::ok((resource, buf, nread))),
+ Err(err) => {
+ if err.kind() == std::io::ErrorKind::WouldBlock {
+ Either::A(tokio_io::io::read(resource, buf))
+ } else {
+ Either::B(futures::future::err(err))
+ }
+ }
+ }
+}
+
// This is an optimization that Tokio should do.
// Attempt to call read() on the main thread.
#[cfg(not(windows))]
-pub fn eager_read<T>(resource: Resource, mut buf: T) -> EagerRead<Resource, T>
+pub fn eager_read<T>(resource: Resource, buf: T) -> EagerRead<Resource, T>
where
T: AsMut<[u8]>,
{
@@ -219,25 +246,7 @@ where
None => panic!("bad rid"),
Some(repr) => match repr {
Repr::TcpStream(ref mut tcp_stream) => {
- // Unforunately we can't just call read() on tokio::net::TcpStream
- use std::os::unix::io::AsRawFd;
- use std::os::unix::io::FromRawFd;
- use std::os::unix::io::IntoRawFd;
- let mut std_tcp_stream =
- unsafe { std::net::TcpStream::from_raw_fd(tcp_stream.as_raw_fd()) };
- let read_result = std_tcp_stream.read(buf.as_mut());
- // std_tcp_stream will close when it gets dropped. Thus...
- let _ = std_tcp_stream.into_raw_fd();
- match read_result {
- Ok(nread) => Either::B(futures::future::ok((resource, buf, nread))),
- Err(err) => {
- if err.kind() == std::io::ErrorKind::WouldBlock {
- Either::A(tokio_io::io::read(resource, buf))
- } else {
- Either::B(futures::future::err(err))
- }
- }
- }
+ eager_read_tcp(tcp_stream, resource, buf)
}
_ => Either::A(tokio_io::io::read(resource, buf)),
},
@@ -255,6 +264,29 @@ where
Either::A(tokio_write::write(resource, buf)).into()
}
+#[cfg(not(windows))]
+fn eager_write_tcp<T: AsRef<[u8]>>(
+ tcp_stream: &TcpStream,
+ resource: Resource,
+ buf: T,
+) -> EagerWrite<Resource, T> {
+ let fd = (*tcp_stream).as_raw_fd();
+ let mut std_tcp_stream = unsafe { std::net::TcpStream::from_raw_fd(fd) };
+ let write_result = std_tcp_stream.write(buf.as_ref());
+ // std_tcp_stream will close when it gets dropped. Thus...
+ let _ = std_tcp_stream.into_raw_fd();
+ match write_result {
+ Ok(nwrite) => Either::B(futures::future::ok((resource, buf, nwrite))),
+ Err(err) => {
+ if err.kind() == std::io::ErrorKind::WouldBlock {
+ Either::A(tokio_write::write(resource, buf))
+ } else {
+ Either::B(futures::future::err(err))
+ }
+ }
+ }
+}
+
// This is an optimization that Tokio should do.
// Attempt to call write() on the main thread.
#[cfg(not(windows))]
@@ -268,25 +300,7 @@ where
None => panic!("bad rid"),
Some(repr) => match repr {
Repr::TcpStream(ref mut tcp_stream) => {
- // Unforunately we can't just call write() on tokio::net::TcpStream
- use std::os::unix::io::AsRawFd;
- use std::os::unix::io::FromRawFd;
- use std::os::unix::io::IntoRawFd;
- let mut std_tcp_stream =
- unsafe { std::net::TcpStream::from_raw_fd(tcp_stream.as_raw_fd()) };
- let write_result = std_tcp_stream.write(buf.as_ref());
- // std_tcp_stream will close when it gets dropped. Thus...
- let _ = std_tcp_stream.into_raw_fd();
- match write_result {
- Ok(nwrite) => Either::B(futures::future::ok((resource, buf, nwrite))),
- Err(err) => {
- if err.kind() == std::io::ErrorKind::WouldBlock {
- Either::A(tokio_write::write(resource, buf))
- } else {
- Either::B(futures::future::err(err))
- }
- }
- }
+ eager_write_tcp(tcp_stream, resource, buf)
}
_ => Either::A(tokio_write::write(resource, buf)),
},
@@ -303,6 +317,35 @@ pub fn eager_accept(resource: Resource) -> EagerAccept {
Either::A(tokio_util::accept(resource)).into()
}
+#[cfg(not(windows))]
+fn eager_accept_tcp(
+ tcp_listener: &tokio::net::TcpListener,
+ resource: Resource,
+) -> EagerAccept {
+ let fd = (*tcp_listener).as_raw_fd();
+ let std_listener = unsafe { std::net::TcpListener::from_raw_fd(fd) };
+ let result = std_listener.accept();
+ // std_listener will close when it gets dropped. Thus...
+ let _ = std_listener.into_raw_fd();
+ match result {
+ Ok((std_stream, addr)) => {
+ let result = tokio::net::TcpStream::from_std(
+ std_stream,
+ &tokio::reactor::Handle::default(),
+ );
+ let tokio_stream = result.unwrap();
+ Either::B(futures::future::ok((tokio_stream, addr)))
+ }
+ Err(err) => {
+ if err.kind() == std::io::ErrorKind::WouldBlock {
+ Either::A(tokio_util::accept(resource))
+ } else {
+ Either::B(futures::future::err(err))
+ }
+ }
+ }
+}
+
// This is an optimization that Tokio should do.
// Attempt to call write() on the main thread.
#[cfg(not(windows))]
@@ -312,33 +355,8 @@ pub fn eager_accept(resource: Resource) -> EagerAccept {
match maybe_repr {
None => panic!("bad rid"),
Some(repr) => match repr {
- Repr::TcpListener(ref mut listener) => {
- // Unforunately we can't just call write() on tokio::net::TcpStream
- use std::os::unix::io::AsRawFd;
- use std::os::unix::io::FromRawFd;
- use std::os::unix::io::IntoRawFd;
- let mut std_listener =
- unsafe { std::net::TcpListener::from_raw_fd(listener.as_raw_fd()) };
- let result = std_listener.accept();
- // std_listener will close when it gets dropped. Thus...
- let _ = std_listener.into_raw_fd();
- match result {
- Ok((std_stream, addr)) => {
- let result = tokio::net::TcpStream::from_std(
- std_stream,
- &tokio::reactor::Handle::default(),
- );
- let tokio_stream = result.unwrap();
- Either::B(futures::future::ok((tokio_stream, addr)))
- }
- Err(err) => {
- if err.kind() == std::io::ErrorKind::WouldBlock {
- Either::A(tokio_util::accept(resource))
- } else {
- Either::B(futures::future::err(err))
- }
- }
- }
+ Repr::TcpListener(ref mut tcp_listener) => {
+ eager_accept_tcp(tcp_listener, resource)
}
_ => Either::A(tokio_util::accept(resource)),
},