From ae00df73a27afbc665d0c06c5a9515413fecb101 Mon Sep 17 00:00:00 2001 From: Bert Belder Date: Wed, 24 Oct 2018 03:12:21 +0200 Subject: Move eager functions into eager_unix.rs --- src/eager_unix.rs | 86 +++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 86 insertions(+) create mode 100644 src/eager_unix.rs (limited to 'src/eager_unix.rs') diff --git a/src/eager_unix.rs b/src/eager_unix.rs new file mode 100644 index 000000000..8646e2b23 --- /dev/null +++ b/src/eager_unix.rs @@ -0,0 +1,86 @@ +// Copyright 2018 the Deno authors. All rights reserved. MIT license. + +use resources::{EagerAccept, EagerRead, EagerWrite, Resource}; +use tokio_util; +use tokio_write; + +use futures::future::{self, Either}; +use std; +use std::io::{ErrorKind, Read, Write}; +use std::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd}; +use tokio; +use tokio::net::{TcpListener, TcpStream}; +use tokio_io; + +pub fn tcp_read>( + tcp_stream: &TcpStream, + resource: Resource, + mut buf: T, +) -> EagerRead { + // Unforunately we can't just call read() on tokio::net::TcpStream + let fd = (*tcp_stream).as_raw_fd(); + let mut std_tcp_stream = unsafe { std::net::TcpStream::from_raw_fd(fd) }; + let read_result = std_tcp_stream.read(buf.as_mut()); + // std_tcp_stream will close when it gets dropped. Thus... + let _ = std_tcp_stream.into_raw_fd(); + match read_result { + Ok(nread) => Either::B(future::ok((resource, buf, nread))), + Err(err) => { + if err.kind() == ErrorKind::WouldBlock { + Either::A(tokio_io::io::read(resource, buf)) + } else { + Either::B(future::err(err)) + } + } + } +} + +pub fn tcp_write>( + tcp_stream: &TcpStream, + resource: Resource, + buf: T, +) -> EagerWrite { + let fd = (*tcp_stream).as_raw_fd(); + let mut std_tcp_stream = unsafe { std::net::TcpStream::from_raw_fd(fd) }; + let write_result = std_tcp_stream.write(buf.as_ref()); + // std_tcp_stream will close when it gets dropped. Thus... + let _ = std_tcp_stream.into_raw_fd(); + match write_result { + Ok(nwrite) => Either::B(future::ok((resource, buf, nwrite))), + Err(err) => { + if err.kind() == ErrorKind::WouldBlock { + Either::A(tokio_write::write(resource, buf)) + } else { + Either::B(future::err(err)) + } + } + } +} + +pub fn tcp_accept( + tcp_listener: &TcpListener, + resource: Resource, +) -> EagerAccept { + let fd = (*tcp_listener).as_raw_fd(); + let std_listener = unsafe { std::net::TcpListener::from_raw_fd(fd) }; + let result = std_listener.accept(); + // std_listener will close when it gets dropped. Thus... + let _ = std_listener.into_raw_fd(); + match result { + Ok((std_stream, addr)) => { + let result = tokio::net::TcpStream::from_std( + std_stream, + &tokio::reactor::Handle::default(), + ); + let tokio_stream = result.unwrap(); + Either::B(future::ok((tokio_stream, addr))) + } + Err(err) => { + if err.kind() == ErrorKind::WouldBlock { + Either::A(tokio_util::accept(resource)) + } else { + Either::B(future::err(err)) + } + } + } +} -- cgit v1.2.3