summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRyan Dahl <ry@tinyclouds.org>2018-10-03 23:58:29 -0400
committerGitHub <noreply@github.com>2018-10-03 23:58:29 -0400
commit0422b224e8a55bf259cdbe955d825229496a0687 (patch)
tree1bf47de8ca1a5405a40d2303cff81f72190095bb
parente5e7f0f038494bfe8aa14e31c8d13d6cf481186a (diff)
First pass at support for TCP servers and clients. (#884)
Adds deno.listen(), deno.dial(), deno.Listener and deno.Conn.
-rw-r--r--BUILD.gn12
-rw-r--r--js/deno.ts1
-rw-r--r--js/files.ts2
-rw-r--r--js/net.ts175
-rw-r--r--js/net_test.ts37
-rw-r--r--js/unit_tests.ts1
-rw-r--r--src/handlers.rs150
-rw-r--r--src/main.rs1
-rw-r--r--src/msg.fbs30
-rw-r--r--src/resources.rs48
-rw-r--r--src/tokio_util.rs46
11 files changed, 497 insertions, 6 deletions
diff --git a/BUILD.gn b/BUILD.gn
index 8ed57ca40..4547c9a33 100644
--- a/BUILD.gn
+++ b/BUILD.gn
@@ -70,6 +70,7 @@ ts_sources = [
"js/blob.ts",
"js/compiler.ts",
"js/console.ts",
+ "js/copy_file.ts",
"js/deno.ts",
"js/dispatch.ts",
"js/dom_types.ts",
@@ -77,22 +78,23 @@ ts_sources = [
"js/fetch.ts",
"js/fileinfo.ts",
"js/files.ts",
- "js/io.ts",
"js/global-eval.ts",
"js/globals.ts",
+ "js/io.ts",
"js/libdeno.ts",
"js/main.ts",
- "js/mkdir.ts",
"js/make_temp_dir.ts",
+ "js/mkdir.ts",
"js/mock_builtin.js",
+ "js/net.ts",
"js/os.ts",
"js/platform.ts",
"js/plugins.d.ts",
- "js/read_file.ts",
"js/read_dir.ts",
+ "js/read_file.ts",
+ "js/read_link.ts",
"js/remove.ts",
"js/rename.ts",
- "js/read_link.ts",
"js/stat.ts",
"js/symlink.ts",
"js/text_encoding.ts",
@@ -103,7 +105,7 @@ ts_sources = [
"js/util.ts",
"js/v8_source_maps.ts",
"js/write_file.ts",
- "js/copy_file.ts",
+
"js/tsconfig.declarations.json",
"tsconfig.json",
diff --git a/js/deno.ts b/js/deno.ts
index b4181de14..c12df1236 100644
--- a/js/deno.ts
+++ b/js/deno.ts
@@ -21,4 +21,5 @@ export { platform } from "./platform";
export { trace } from "./trace";
export { truncateSync, truncate } from "./truncate";
export { FileInfo } from "./fileinfo";
+export { connect, dial, listen, Listener, Conn } from "./net";
export const args: string[] = [];
diff --git a/js/files.ts b/js/files.ts
index db2c903c4..16d3f11ab 100644
--- a/js/files.ts
+++ b/js/files.ts
@@ -18,7 +18,7 @@ export class File implements Reader, Writer, Closer {
}
close(): void {
- return close(this.fd);
+ close(this.fd);
}
}
diff --git a/js/net.ts b/js/net.ts
new file mode 100644
index 000000000..a0307bfe9
--- /dev/null
+++ b/js/net.ts
@@ -0,0 +1,175 @@
+// Copyright 2018 the Deno authors. All rights reserved. MIT license.
+
+import { ReadResult, Reader, Writer, Closer } from "./io";
+import * as fbs from "gen/msg_generated";
+import { assert, notImplemented } from "./util";
+import * as dispatch from "./dispatch";
+import { flatbuffers } from "flatbuffers";
+import { read, write, close } from "./files";
+
+export type Network = "tcp";
+// TODO support other types:
+// export type Network = "tcp" | "tcp4" | "tcp6" | "unix" | "unixpacket";
+
+// TODO Support finding network from Addr, see https://golang.org/pkg/net/#Addr
+export type Addr = string;
+
+/** A Listener is a generic network listener for stream-oriented protocols. */
+export interface Listener {
+ /** accept() waits for and returns the next connection to the Listener. */
+ accept(): Promise<Conn>;
+
+ /** Close closes the listener.
+ * Any pending accept promises will be rejected with errors.
+ */
+ close(): void;
+
+ addr(): Addr;
+}
+
+class ListenerImpl implements Listener {
+ constructor(readonly fd: number) {}
+
+ async accept(): Promise<Conn> {
+ const builder = new flatbuffers.Builder();
+ fbs.Accept.startAccept(builder);
+ fbs.Accept.addRid(builder, this.fd);
+ const msg = fbs.Accept.endAccept(builder);
+ const baseRes = await dispatch.sendAsync(builder, fbs.Any.Accept, msg);
+ assert(baseRes != null);
+ assert(fbs.Any.NewConn === baseRes!.msgType());
+ const res = new fbs.NewConn();
+ assert(baseRes!.msg(res) != null);
+ return new ConnImpl(res.rid(), res.remoteAddr()!, res.localAddr()!);
+ }
+
+ close(): void {
+ close(this.fd);
+ }
+
+ addr(): Addr {
+ return notImplemented();
+ }
+}
+
+export interface Conn extends Reader, Writer, Closer {
+ localAddr: string;
+ remoteAddr: string;
+}
+
+class ConnImpl implements Conn {
+ constructor(
+ readonly fd: number,
+ readonly remoteAddr: string,
+ readonly localAddr: string
+ ) {}
+
+ write(p: ArrayBufferView): Promise<number> {
+ return write(this.fd, p);
+ }
+
+ read(p: ArrayBufferView): Promise<ReadResult> {
+ return read(this.fd, p);
+ }
+
+ close(): void {
+ close(this.fd);
+ }
+
+ /** closeRead shuts down (shutdown(2)) the reading side of the TCP connection.
+ * Most callers should just use close().
+ */
+ closeRead(): void {
+ // TODO(ry) Connect to AsyncWrite::shutdown in resources.rs
+ return notImplemented();
+ }
+
+ /** closeWrite shuts down (shutdown(2)) the writing side of the TCP
+ * connection. Most callers should just use close().
+ */
+ closeWrite(): void {
+ // TODO(ry) Connect to AsyncWrite::shutdown in resources.rs
+ return notImplemented();
+ }
+}
+
+/** Listen announces on the local network address.
+ *
+ * The network must be "tcp", "tcp4", "tcp6", "unix" or "unixpacket".
+ *
+ * For TCP networks, if the host in the address parameter is empty or a literal
+ * unspecified IP address, Listen listens on all available unicast and anycast
+ * IP addresses of the local system. To only use IPv4, use network "tcp4". The
+ * address can use a host name, but this is not recommended, because it will
+ * create a listener for at most one of the host's IP addresses. If the port in
+ * the address parameter is empty or "0", as in "127.0.0.1:" or "[::1]:0", a
+ * port number is automatically chosen. The Addr method of Listener can be used
+ * to discover the chosen port.
+ *
+ * See dial() for a description of the network and address parameters.
+ */
+export function listen(network: Network, address: string): Listener {
+ const builder = new flatbuffers.Builder();
+ const network_ = builder.createString(network);
+ const address_ = builder.createString(address);
+ fbs.Listen.startListen(builder);
+ fbs.Listen.addNetwork(builder, network_);
+ fbs.Listen.addAddress(builder, address_);
+ const msg = fbs.Listen.endListen(builder);
+ const baseRes = dispatch.sendSync(builder, fbs.Any.Listen, msg);
+ assert(baseRes != null);
+ assert(fbs.Any.ListenRes === baseRes!.msgType());
+ const res = new fbs.ListenRes();
+ assert(baseRes!.msg(res) != null);
+ return new ListenerImpl(res.rid());
+}
+
+/** Dial connects to the address on the named network.
+ *
+ * Supported networks are only "tcp" currently.
+ * TODO: "tcp4" (IPv4-only), "tcp6" (IPv6-only), "udp", "udp4"
+ * (IPv4-only), "udp6" (IPv6-only), "ip", "ip4" (IPv4-only), "ip6" (IPv6-only),
+ * "unix", "unixgram" and "unixpacket".
+ *
+ * For TCP and UDP networks, the address has the form "host:port". The host must
+ * be a literal IP address, or a host name that can be resolved to IP addresses.
+ * The port must be a literal port number or a service name. If the host is a
+ * literal IPv6 address it must be enclosed in square brackets, as in
+ * "[2001:db8::1]:80" or "[fe80::1%zone]:80". The zone specifies the scope of
+ * the literal IPv6 address as defined in RFC 4007. The functions JoinHostPort
+ * and SplitHostPort manipulate a pair of host and port in this form. When using
+ * TCP, and the host resolves to multiple IP addresses, Dial will try each IP
+ * address in order until one succeeds.
+ *
+ * Examples:
+ *
+ * dial("tcp", "golang.org:http")
+ * dial("tcp", "192.0.2.1:http")
+ * dial("tcp", "198.51.100.1:80")
+ * dial("udp", "[2001:db8::1]:domain")
+ * dial("udp", "[fe80::1%lo0]:53")
+ * dial("tcp", ":80")
+ */
+export async function dial(network: Network, address: string): Promise<Conn> {
+ const builder = new flatbuffers.Builder();
+ const network_ = builder.createString(network);
+ const address_ = builder.createString(address);
+ fbs.Dial.startDial(builder);
+ fbs.Dial.addNetwork(builder, network_);
+ fbs.Dial.addAddress(builder, address_);
+ const msg = fbs.Dial.endDial(builder);
+ const baseRes = await dispatch.sendAsync(builder, fbs.Any.Dial, msg);
+ assert(baseRes != null);
+ assert(fbs.Any.NewConn === baseRes!.msgType());
+ const res = new fbs.NewConn();
+ assert(baseRes!.msg(res) != null);
+ return new ConnImpl(res.rid(), res.remoteAddr()!, res.localAddr()!);
+}
+
+// Unused but reserved op.
+export async function connect(
+ network: Network,
+ address: string
+): Promise<Conn> {
+ return notImplemented();
+}
diff --git a/js/net_test.ts b/js/net_test.ts
new file mode 100644
index 000000000..0b6db7afa
--- /dev/null
+++ b/js/net_test.ts
@@ -0,0 +1,37 @@
+// Copyright 2018 the Deno authors. All rights reserved. MIT license.
+
+import * as deno from "deno";
+import { testPerm, assert, assertEqual } from "./test_util.ts";
+
+testPerm({ net: true }, function netListenClose() {
+ const listener = deno.listen("tcp", "127.0.0.1:4500");
+ listener.close();
+});
+
+testPerm({ net: true }, async function netDialListen() {
+ let addr = "127.0.0.1:4500";
+ const listener = deno.listen("tcp", addr);
+ listener.accept().then(async conn => {
+ await conn.write(new Uint8Array([1, 2, 3]));
+ conn.close();
+ });
+ const conn = await deno.dial("tcp", addr);
+ const buf = new Uint8Array(1024);
+ const readResult = await conn.read(buf);
+ assertEqual(3, readResult.nread);
+ assertEqual(1, buf[0]);
+ assertEqual(2, buf[1]);
+ assertEqual(3, buf[2]);
+
+ // TODO Currently ReadResult does not properly transmit EOF in the same call.
+ // it requires a second call to get the EOF. Either ReadResult to be an
+ // integer in which 0 signifies EOF or the handler should be modified so that
+ // EOF is properly transmitted.
+ assertEqual(false, readResult.eof);
+
+ const readResult2 = await conn.read(buf);
+ assertEqual(true, readResult2.eof);
+
+ listener.close();
+ conn.close();
+});
diff --git a/js/unit_tests.ts b/js/unit_tests.ts
index 4ec5a720c..ca152ad39 100644
--- a/js/unit_tests.ts
+++ b/js/unit_tests.ts
@@ -20,6 +20,7 @@ import "./timers_test.ts";
import "./symlink_test.ts";
import "./platform_test.ts";
import "./text_encoding_test.ts";
+import "./net_test.ts";
import "./trace_test.ts";
import "./truncate_test.ts";
import "./v8_source_maps_test.ts";
diff --git a/src/handlers.rs b/src/handlers.rs
index 80eb2871e..ad701c4e1 100644
--- a/src/handlers.rs
+++ b/src/handlers.rs
@@ -9,6 +9,7 @@ use isolate::Isolate;
use isolate::IsolateState;
use isolate::Op;
use msg;
+use tokio_util;
use flatbuffers::FlatBufferBuilder;
use futures;
@@ -21,14 +22,18 @@ use remove_dir_all::remove_dir_all;
use resources;
use std;
use std::fs;
+use std::net::SocketAddr;
#[cfg(any(unix))]
use std::os::unix::fs::PermissionsExt;
use std::path::Path;
use std::path::PathBuf;
+use std::str::FromStr;
use std::sync::Arc;
use std::time::UNIX_EPOCH;
use std::time::{Duration, Instant};
use tokio;
+use tokio::net::TcpListener;
+use tokio::net::TcpStream;
use tokio_io;
use tokio_threadpool;
@@ -74,6 +79,7 @@ pub fn msg_from_js(
msg::Any::Open => handle_open,
msg::Any::Read => handle_read,
msg::Any::Write => handle_write,
+ msg::Any::Close => handle_close,
msg::Any::Remove => handle_remove,
msg::Any::ReadFile => handle_read_file,
msg::Any::ReadDir => handle_read_dir,
@@ -86,6 +92,9 @@ pub fn msg_from_js(
msg::Any::WriteFile => handle_write_file,
msg::Any::Exit => handle_exit,
msg::Any::CopyFile => handle_copy_file,
+ msg::Any::Listen => handle_listen,
+ msg::Any::Accept => handle_accept,
+ msg::Any::Dial => handle_dial,
_ => panic!(format!(
"Unhandled message {}",
msg::enum_name_any(msg_type)
@@ -581,6 +590,26 @@ fn handle_open(
Box::new(op)
}
+fn handle_close(
+ _state: Arc<IsolateState>,
+ base: &msg::Base,
+ data: &'static mut [u8],
+) -> Box<Op> {
+ assert_eq!(data.len(), 0);
+ let msg = base.msg_as_close().unwrap();
+ let rid = msg.rid();
+ match resources::lookup(rid) {
+ None => odd_future(errors::new(
+ errors::ErrorKind::BadFileDescriptor,
+ String::from("Bad File Descriptor"),
+ )),
+ Some(mut resource) => {
+ resource.close();
+ ok_future(empty_buf())
+ }
+ }
+}
+
fn handle_read(
_state: Arc<IsolateState>,
base: &msg::Base,
@@ -994,3 +1023,124 @@ fn handle_truncate(
Ok(empty_buf())
})
}
+
+fn handle_listen(
+ state: Arc<IsolateState>,
+ base: &msg::Base,
+ data: &'static mut [u8],
+) -> Box<Op> {
+ assert_eq!(data.len(), 0);
+ if !state.flags.allow_net {
+ return odd_future(permission_denied());
+ }
+
+ let cmd_id = base.cmd_id();
+ let msg = base.msg_as_listen().unwrap();
+ let network = msg.network().unwrap();
+ assert_eq!(network, "tcp");
+ let address = msg.address().unwrap();
+
+ Box::new(futures::future::result((move || {
+ // TODO properly parse addr
+ let addr = SocketAddr::from_str(address).unwrap();
+
+ let listener = TcpListener::bind(&addr)?;
+ let resource = resources::add_tcp_listener(listener);
+
+ let builder = &mut FlatBufferBuilder::new();
+ let msg = msg::ListenRes::create(
+ builder,
+ &msg::ListenResArgs {
+ rid: resource.rid,
+ ..Default::default()
+ },
+ );
+ Ok(serialize_response(
+ cmd_id,
+ builder,
+ msg::BaseArgs {
+ msg: Some(msg.as_union_value()),
+ msg_type: msg::Any::ListenRes,
+ ..Default::default()
+ },
+ ))
+ })()))
+}
+
+fn new_conn(cmd_id: u32, tcp_stream: TcpStream) -> OpResult {
+ let tcp_stream_resource = resources::add_tcp_stream(tcp_stream);
+ // TODO forward socket_addr to client.
+
+ let builder = &mut FlatBufferBuilder::new();
+ let msg = msg::NewConn::create(
+ builder,
+ &msg::NewConnArgs {
+ rid: tcp_stream_resource.rid,
+ ..Default::default()
+ },
+ );
+ Ok(serialize_response(
+ cmd_id,
+ builder,
+ msg::BaseArgs {
+ msg: Some(msg.as_union_value()),
+ msg_type: msg::Any::NewConn,
+ ..Default::default()
+ },
+ ))
+}
+
+fn handle_accept(
+ state: Arc<IsolateState>,
+ base: &msg::Base,
+ data: &'static mut [u8],
+) -> Box<Op> {
+ assert_eq!(data.len(), 0);
+ if !state.flags.allow_net {
+ return odd_future(permission_denied());
+ }
+
+ let cmd_id = base.cmd_id();
+ let msg = base.msg_as_accept().unwrap();
+ let server_rid = msg.rid();
+
+ match resources::lookup(server_rid) {
+ None => odd_future(errors::new(
+ errors::ErrorKind::BadFileDescriptor,
+ String::from("Bad File Descriptor"),
+ )),
+ Some(server_resource) => {
+ let op = tokio_util::accept(server_resource)
+ .map_err(|err| DenoError::from(err))
+ .and_then(move |(tcp_stream, _socket_addr)| {
+ new_conn(cmd_id, tcp_stream)
+ });
+ Box::new(op)
+ }
+ }
+}
+
+fn handle_dial(
+ state: Arc<IsolateState>,
+ base: &msg::Base,
+ data: &'static mut [u8],
+) -> Box<Op> {
+ assert_eq!(data.len(), 0);
+ if !state.flags.allow_net {
+ return odd_future(permission_denied());
+ }
+
+ let cmd_id = base.cmd_id();
+ let msg = base.msg_as_dial().unwrap();
+ let network = msg.network().unwrap();
+ assert_eq!(network, "tcp");
+ let address = msg.address().unwrap();
+
+ // TODO properly parse addr
+ let addr = SocketAddr::from_str(address).unwrap();
+
+ let op = TcpStream::connect(&addr)
+ .map_err(|err| err.into())
+ .and_then(move |tcp_stream| new_conn(cmd_id, tcp_stream));
+ Box::new(op)
+}
diff --git a/src/main.rs b/src/main.rs
index d07eb5b07..82681cc34 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -1,5 +1,6 @@
// Copyright 2018 the Deno authors. All rights reserved. MIT license.
extern crate flatbuffers;
+#[macro_use]
extern crate futures;
extern crate hyper;
extern crate libc;
diff --git a/src/msg.fbs b/src/msg.fbs
index 83dca11e6..b47230389 100644
--- a/src/msg.fbs
+++ b/src/msg.fbs
@@ -35,6 +35,11 @@ union Any {
Write,
WriteRes,
Close,
+ Listen,
+ ListenRes,
+ Accept,
+ Dial,
+ NewConn,
}
enum ErrorKind: byte {
@@ -285,4 +290,29 @@ table Close {
rid: int;
}
+table Listen {
+ network: string;
+ address: string;
+}
+
+table ListenRes {
+ rid: int;
+}
+
+table Accept {
+ rid: int;
+}
+
+table Dial {
+ network: string;
+ address: string;
+}
+
+// Response to Accept and Dial.
+table NewConn {
+ rid: int;
+ remote_addr: string;
+ local_addr: string;
+}
+
root_type Base;
diff --git a/src/resources.rs b/src/resources.rs
index 940b14d21..75bad04b7 100644
--- a/src/resources.rs
+++ b/src/resources.rs
@@ -14,11 +14,13 @@ use std;
use std::collections::HashMap;
use std::io::Error;
use std::io::{Read, Write};
+use std::net::SocketAddr;
use std::sync::atomic::AtomicIsize;
use std::sync::atomic::Ordering;
use std::sync::Mutex;
use tokio;
use tokio::io::{AsyncRead, AsyncWrite};
+use tokio::net::TcpStream;
pub type ResourceId = i32; // Sometimes referred to RID.
@@ -45,14 +47,40 @@ enum Repr {
Stdout(tokio::io::Stdout),
Stderr(tokio::io::Stderr),
FsFile(tokio::fs::File),
+ TcpListener(tokio::net::TcpListener),
+ TcpStream(tokio::net::TcpStream),
}
// Abstract async file interface.
// Ideally in unix, if Resource represents an OS rid, it will be the same.
+#[derive(Debug)]
pub struct Resource {
pub rid: ResourceId,
}
+impl Resource {
+ // TODO Should it return a Resource instead of net::TcpStream?
+ pub fn poll_accept(&mut self) -> Poll<(TcpStream, SocketAddr), Error> {
+ let mut table = RESOURCE_TABLE.lock().unwrap();
+ let maybe_repr = table.get_mut(&self.rid);
+ match maybe_repr {
+ None => panic!("bad rid"),
+ Some(repr) => match repr {
+ Repr::TcpListener(ref mut s) => s.poll_accept(),
+ _ => panic!("Cannot accept"),
+ },
+ }
+ }
+
+ // close(2) is done by dropping the value. Therefore we just need to remove
+ // the resource from the RESOURCE_TABLE.
+ pub fn close(&mut self) {
+ let mut table = RESOURCE_TABLE.lock().unwrap();
+ let r = table.remove(&self.rid);
+ assert!(r.is_some());
+ }
+}
+
impl Read for Resource {
fn read(&mut self, _buf: &mut [u8]) -> std::io::Result<usize> {
unimplemented!();
@@ -68,9 +96,11 @@ impl AsyncRead for Resource {
Some(repr) => match repr {
Repr::FsFile(ref mut f) => f.poll_read(buf),
Repr::Stdin(ref mut f) => f.poll_read(buf),
+ Repr::TcpStream(ref mut f) => f.poll_read(buf),
Repr::Stdout(_) | Repr::Stderr(_) => {
panic!("Cannot read from stdout/stderr")
}
+ Repr::TcpListener(_) => panic!("Cannot read"),
},
}
}
@@ -96,7 +126,9 @@ impl AsyncWrite for Resource {
Repr::FsFile(ref mut f) => f.poll_write(buf),
Repr::Stdout(ref mut f) => f.poll_write(buf),
Repr::Stderr(ref mut f) => f.poll_write(buf),
+ Repr::TcpStream(ref mut f) => f.poll_write(buf),
Repr::Stdin(_) => panic!("Cannot write to stdin"),
+ Repr::TcpListener(_) => panic!("Cannot write"),
},
}
}
@@ -120,6 +152,22 @@ pub fn add_fs_file(fs_file: tokio::fs::File) -> Resource {
}
}
+pub fn add_tcp_listener(listener: tokio::net::TcpListener) -> Resource {
+ let rid = new_rid();
+ let mut tg = RESOURCE_TABLE.lock().unwrap();
+ let r = tg.insert(rid, Repr::TcpListener(listener));
+ assert!(r.is_none());
+ Resource { rid }
+}
+
+pub fn add_tcp_stream(stream: tokio::net::TcpStream) -> Resource {
+ let rid = new_rid();
+ let mut tg = RESOURCE_TABLE.lock().unwrap();
+ let r = tg.insert(rid, Repr::TcpStream(stream));
+ assert!(r.is_none());
+ Resource { rid }
+}
+
pub fn lookup(rid: ResourceId) -> Option<Resource> {
let table = RESOURCE_TABLE.lock().unwrap();
table.get(&rid).map(|_| Resource { rid })
diff --git a/src/tokio_util.rs b/src/tokio_util.rs
index de81620ef..0a2a34917 100644
--- a/src/tokio_util.rs
+++ b/src/tokio_util.rs
@@ -1,8 +1,15 @@
// Copyright 2018 the Deno authors. All rights reserved. MIT license.
+use resources::Resource;
+
use futures;
use futures::Future;
+use futures::Poll;
+use std::io;
+use std::mem;
+use std::net::SocketAddr;
use tokio;
+use tokio::net::TcpStream;
use tokio_executor;
pub fn block_on<F, R, E>(future: F) -> Result<R, E>
@@ -28,3 +35,42 @@ where
let mut enter = tokio_executor::enter().expect("Multiple executors at once");
tokio_executor::with_default(&mut executor, &mut enter, move |_enter| f());
}
+
+#[derive(Debug)]
+enum AcceptState {
+ Pending(Resource),
+ Empty,
+}
+
+/// Simply accepts a connection.
+pub fn accept(r: Resource) -> Accept {
+ Accept {
+ state: AcceptState::Pending(r),
+ }
+}
+
+/// A future which can be used to easily read available number of bytes to fill
+/// a buffer.
+///
+/// Created by the [`read`] function.
+#[derive(Debug)]
+pub struct Accept {
+ state: AcceptState,
+}
+
+impl Future for Accept {
+ type Item = (TcpStream, SocketAddr);
+ type Error = io::Error;
+
+ fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
+ let (stream, addr) = match self.state {
+ AcceptState::Pending(ref mut r) => try_ready!(r.poll_accept()),
+ AcceptState::Empty => panic!("poll Accept after it's done"),
+ };
+
+ match mem::replace(&mut self.state, AcceptState::Empty) {
+ AcceptState::Pending(_) => Ok((stream, addr).into()),
+ AcceptState::Empty => panic!("invalid internal state"),
+ }
+ }
+}