summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/eager_unix.rs86
-rw-r--r--src/main.rs3
-rw-r--r--src/resources.rs163
3 files changed, 130 insertions, 122 deletions
diff --git a/src/eager_unix.rs b/src/eager_unix.rs
new file mode 100644
index 000000000..8646e2b23
--- /dev/null
+++ b/src/eager_unix.rs
@@ -0,0 +1,86 @@
+// Copyright 2018 the Deno authors. All rights reserved. MIT license.
+
+use resources::{EagerAccept, EagerRead, EagerWrite, Resource};
+use tokio_util;
+use tokio_write;
+
+use futures::future::{self, Either};
+use std;
+use std::io::{ErrorKind, Read, Write};
+use std::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd};
+use tokio;
+use tokio::net::{TcpListener, TcpStream};
+use tokio_io;
+
+pub fn tcp_read<T: AsMut<[u8]>>(
+ tcp_stream: &TcpStream,
+ resource: Resource,
+ mut buf: T,
+) -> EagerRead<Resource, T> {
+ // Unforunately we can't just call read() on tokio::net::TcpStream
+ let fd = (*tcp_stream).as_raw_fd();
+ let mut std_tcp_stream = unsafe { std::net::TcpStream::from_raw_fd(fd) };
+ let read_result = std_tcp_stream.read(buf.as_mut());
+ // std_tcp_stream will close when it gets dropped. Thus...
+ let _ = std_tcp_stream.into_raw_fd();
+ match read_result {
+ Ok(nread) => Either::B(future::ok((resource, buf, nread))),
+ Err(err) => {
+ if err.kind() == ErrorKind::WouldBlock {
+ Either::A(tokio_io::io::read(resource, buf))
+ } else {
+ Either::B(future::err(err))
+ }
+ }
+ }
+}
+
+pub fn tcp_write<T: AsRef<[u8]>>(
+ tcp_stream: &TcpStream,
+ resource: Resource,
+ buf: T,
+) -> EagerWrite<Resource, T> {
+ let fd = (*tcp_stream).as_raw_fd();
+ let mut std_tcp_stream = unsafe { std::net::TcpStream::from_raw_fd(fd) };
+ let write_result = std_tcp_stream.write(buf.as_ref());
+ // std_tcp_stream will close when it gets dropped. Thus...
+ let _ = std_tcp_stream.into_raw_fd();
+ match write_result {
+ Ok(nwrite) => Either::B(future::ok((resource, buf, nwrite))),
+ Err(err) => {
+ if err.kind() == ErrorKind::WouldBlock {
+ Either::A(tokio_write::write(resource, buf))
+ } else {
+ Either::B(future::err(err))
+ }
+ }
+ }
+}
+
+pub fn tcp_accept(
+ tcp_listener: &TcpListener,
+ resource: Resource,
+) -> EagerAccept {
+ let fd = (*tcp_listener).as_raw_fd();
+ let std_listener = unsafe { std::net::TcpListener::from_raw_fd(fd) };
+ let result = std_listener.accept();
+ // std_listener will close when it gets dropped. Thus...
+ let _ = std_listener.into_raw_fd();
+ match result {
+ Ok((std_stream, addr)) => {
+ let result = tokio::net::TcpStream::from_std(
+ std_stream,
+ &tokio::reactor::Handle::default(),
+ );
+ let tokio_stream = result.unwrap();
+ Either::B(future::ok((tokio_stream, addr)))
+ }
+ Err(err) => {
+ if err.kind() == ErrorKind::WouldBlock {
+ Either::A(tokio_util::accept(resource))
+ } else {
+ Either::B(future::err(err))
+ }
+ }
+ }
+}
diff --git a/src/main.rs b/src/main.rs
index feca8aedd..fab7be344 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -36,6 +36,9 @@ mod tokio_util;
mod tokio_write;
mod version;
+#[cfg(unix)]
+mod eager_unix;
+
use std::env;
static LOGGER: Logger = Logger;
diff --git a/src/resources.rs b/src/resources.rs
index 41f1359c3..5a41ee2db 100644
--- a/src/resources.rs
+++ b/src/resources.rs
@@ -8,18 +8,18 @@
// descriptors". This module implements a global resource table. Ops (AKA
// handlers) look up resources by their integer id here.
+#[cfg(unix)]
+use eager_unix as eager;
use errors::DenoError;
use tokio_util;
use tokio_write;
use futures;
-use futures::future::Either;
-use futures::future::FutureResult;
+use futures::future::{Either, FutureResult};
use futures::Poll;
use std;
use std::collections::HashMap;
-use std::io::Error;
-use std::io::{Read, Write};
+use std::io::{Error, Read, Write};
use std::net::{Shutdown, SocketAddr};
use std::sync::atomic::AtomicIsize;
use std::sync::atomic::Ordering;
@@ -194,161 +194,80 @@ pub fn lookup(rid: ResourceId) -> Option<Resource> {
table.get(&rid).map(|_| Resource { rid })
}
-type EagerRead<R, T> =
+pub type EagerRead<R, T> =
Either<tokio_io::io::Read<R, T>, FutureResult<(R, T, usize), std::io::Error>>;
-#[cfg(windows)]
-#[allow(unused_mut)]
-pub fn eager_read<T>(resource: Resource, mut buf: T) -> EagerRead<Resource, T>
-where
- T: AsMut<[u8]>,
-{
- Either::A(tokio_io::io::read(resource, buf)).into()
-}
+pub type EagerWrite<R, T> =
+ Either<tokio_write::Write<R, T>, FutureResult<(R, T, usize), std::io::Error>>;
-#[cfg(not(windows))]
-use std::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd};
+pub type EagerAccept = Either<
+ tokio_util::Accept,
+ FutureResult<(tokio::net::TcpStream, std::net::SocketAddr), std::io::Error>,
+>;
-#[cfg(not(windows))]
-fn eager_read_tcp<T: AsMut<[u8]>>(
- tcp_stream: &TcpStream,
+#[cfg(not(unix))]
+#[allow(unused_mut)]
+pub fn eager_read<T: AsMut<[u8]>>(
resource: Resource,
mut buf: T,
) -> EagerRead<Resource, T> {
- // Unforunately we can't just call read() on tokio::net::TcpStream
- let fd = (*tcp_stream).as_raw_fd();
- let mut std_tcp_stream = unsafe { std::net::TcpStream::from_raw_fd(fd) };
- let read_result = std_tcp_stream.read(buf.as_mut());
- // std_tcp_stream will close when it gets dropped. Thus...
- let _ = std_tcp_stream.into_raw_fd();
- match read_result {
- Ok(nread) => Either::B(futures::future::ok((resource, buf, nread))),
- Err(err) => {
- if err.kind() == std::io::ErrorKind::WouldBlock {
- Either::A(tokio_io::io::read(resource, buf))
- } else {
- Either::B(futures::future::err(err))
- }
- }
- }
+ Either::A(tokio_io::io::read(resource, buf)).into()
+}
+
+#[cfg(not(unix))]
+pub fn eager_write<T: AsRef<[u8]>>(
+ resource: Resource,
+ buf: T,
+) -> EagerWrite<Resource, T> {
+ Either::A(tokio_write::write(resource, buf)).into()
+}
+
+#[cfg(not(unix))]
+pub fn eager_accept(resource: Resource) -> EagerAccept {
+ Either::A(tokio_util::accept(resource)).into()
}
// This is an optimization that Tokio should do.
// Attempt to call read() on the main thread.
-#[cfg(not(windows))]
-pub fn eager_read<T>(resource: Resource, buf: T) -> EagerRead<Resource, T>
-where
- T: AsMut<[u8]>,
-{
+#[cfg(unix)]
+pub fn eager_read<T: AsMut<[u8]>>(
+ resource: Resource,
+ buf: T,
+) -> EagerRead<Resource, T> {
let mut table = RESOURCE_TABLE.lock().unwrap();
let maybe_repr = table.get_mut(&resource.rid);
match maybe_repr {
None => panic!("bad rid"),
Some(repr) => match repr {
Repr::TcpStream(ref mut tcp_stream) => {
- eager_read_tcp(tcp_stream, resource, buf)
+ eager::tcp_read(tcp_stream, resource, buf)
}
_ => Either::A(tokio_io::io::read(resource, buf)),
},
}
}
-type EagerWrite<R, T> =
- Either<tokio_write::Write<R, T>, FutureResult<(R, T, usize), std::io::Error>>;
-
-#[cfg(windows)]
-pub fn eager_write<T>(resource: Resource, buf: T) -> EagerWrite<Resource, T>
-where
- T: AsRef<[u8]>,
-{
- Either::A(tokio_write::write(resource, buf)).into()
-}
-
-#[cfg(not(windows))]
-fn eager_write_tcp<T: AsRef<[u8]>>(
- tcp_stream: &TcpStream,
+// This is an optimization that Tokio should do.
+// Attempt to call write() on the main thread.
+#[cfg(unix)]
+pub fn eager_write<T: AsRef<[u8]>>(
resource: Resource,
buf: T,
) -> EagerWrite<Resource, T> {
- let fd = (*tcp_stream).as_raw_fd();
- let mut std_tcp_stream = unsafe { std::net::TcpStream::from_raw_fd(fd) };
- let write_result = std_tcp_stream.write(buf.as_ref());
- // std_tcp_stream will close when it gets dropped. Thus...
- let _ = std_tcp_stream.into_raw_fd();
- match write_result {
- Ok(nwrite) => Either::B(futures::future::ok((resource, buf, nwrite))),
- Err(err) => {
- if err.kind() == std::io::ErrorKind::WouldBlock {
- Either::A(tokio_write::write(resource, buf))
- } else {
- Either::B(futures::future::err(err))
- }
- }
- }
-}
-
-// This is an optimization that Tokio should do.
-// Attempt to call write() on the main thread.
-#[cfg(not(windows))]
-pub fn eager_write<T>(resource: Resource, buf: T) -> EagerWrite<Resource, T>
-where
- T: AsRef<[u8]>,
-{
let mut table = RESOURCE_TABLE.lock().unwrap();
let maybe_repr = table.get_mut(&resource.rid);
match maybe_repr {
None => panic!("bad rid"),
Some(repr) => match repr {
Repr::TcpStream(ref mut tcp_stream) => {
- eager_write_tcp(tcp_stream, resource, buf)
+ eager::tcp_write(tcp_stream, resource, buf)
}
_ => Either::A(tokio_write::write(resource, buf)),
},
}
}
-type EagerAccept = Either<
- tokio_util::Accept,
- FutureResult<(TcpStream, SocketAddr), std::io::Error>,
->;
-
-#[cfg(windows)]
-pub fn eager_accept(resource: Resource) -> EagerAccept {
- Either::A(tokio_util::accept(resource)).into()
-}
-
-#[cfg(not(windows))]
-fn eager_accept_tcp(
- tcp_listener: &tokio::net::TcpListener,
- resource: Resource,
-) -> EagerAccept {
- let fd = (*tcp_listener).as_raw_fd();
- let std_listener = unsafe { std::net::TcpListener::from_raw_fd(fd) };
- let result = std_listener.accept();
- // std_listener will close when it gets dropped. Thus...
- let _ = std_listener.into_raw_fd();
- match result {
- Ok((std_stream, addr)) => {
- let result = tokio::net::TcpStream::from_std(
- std_stream,
- &tokio::reactor::Handle::default(),
- );
- let tokio_stream = result.unwrap();
- Either::B(futures::future::ok((tokio_stream, addr)))
- }
- Err(err) => {
- if err.kind() == std::io::ErrorKind::WouldBlock {
- Either::A(tokio_util::accept(resource))
- } else {
- Either::B(futures::future::err(err))
- }
- }
- }
-}
-
-// This is an optimization that Tokio should do.
-// Attempt to call write() on the main thread.
-#[cfg(not(windows))]
+#[cfg(unix)]
pub fn eager_accept(resource: Resource) -> EagerAccept {
let mut table = RESOURCE_TABLE.lock().unwrap();
let maybe_repr = table.get_mut(&resource.rid);
@@ -356,7 +275,7 @@ pub fn eager_accept(resource: Resource) -> EagerAccept {
None => panic!("bad rid"),
Some(repr) => match repr {
Repr::TcpListener(ref mut tcp_listener) => {
- eager_accept_tcp(tcp_listener, resource)
+ eager::tcp_accept(tcp_listener, resource)
}
_ => Either::A(tokio_util::accept(resource)),
},