summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--js/net.ts26
-rw-r--r--js/net_test.ts113
-rw-r--r--js/util.ts26
-rw-r--r--src/msg.fbs6
-rw-r--r--src/ops.rs35
-rw-r--r--src/resources.rs18
6 files changed, 217 insertions, 7 deletions
diff --git a/js/net.ts b/js/net.ts
index ccbd331dd..1258a0ff1 100644
--- a/js/net.ts
+++ b/js/net.ts
@@ -55,6 +55,8 @@ class ListenerImpl implements Listener {
export interface Conn extends Reader, Writer, Closer {
localAddr: string;
remoteAddr: string;
+ closeRead(): void;
+ closeWrite(): void;
}
class ConnImpl implements Conn {
@@ -80,19 +82,35 @@ class ConnImpl implements Conn {
* Most callers should just use close().
*/
closeRead(): void {
- // TODO(ry) Connect to AsyncWrite::shutdown in resources.rs
- return notImplemented();
+ shutdown(this.fd, ShutdownMode.Read);
}
/** closeWrite shuts down (shutdown(2)) the writing side of the TCP
* connection. Most callers should just use close().
*/
closeWrite(): void {
- // TODO(ry) Connect to AsyncWrite::shutdown in resources.rs
- return notImplemented();
+ shutdown(this.fd, ShutdownMode.Write);
}
}
+enum ShutdownMode {
+ // See http://man7.org/linux/man-pages/man2/shutdown.2.html
+ // Corresponding to SHUT_RD, SHUT_WR, SHUT_RDWR
+ Read = 0,
+ Write,
+ ReadWrite // unused
+}
+
+function shutdown(fd: number, how: ShutdownMode) {
+ const builder = new flatbuffers.Builder();
+ msg.Shutdown.startShutdown(builder);
+ msg.Shutdown.addRid(builder, fd);
+ msg.Shutdown.addHow(builder, how);
+ const inner = msg.Shutdown.endShutdown(builder);
+ const baseRes = dispatch.sendSync(builder, msg.Any.Shutdown, inner);
+ assert(baseRes == null);
+}
+
/** Listen announces on the local network address.
*
* The network must be "tcp", "tcp4", "tcp6", "unix" or "unixpacket".
diff --git a/js/net_test.ts b/js/net_test.ts
index 93ad6ee00..c7c8dbb59 100644
--- a/js/net_test.ts
+++ b/js/net_test.ts
@@ -2,6 +2,7 @@
import * as deno from "deno";
import { testPerm, assert, assertEqual } from "./test_util.ts";
+import { deferred } from "./util.ts";
testPerm({ net: true }, function netListenClose() {
const listener = deno.listen("tcp", "127.0.0.1:4500");
@@ -35,3 +36,115 @@ testPerm({ net: true }, async function netDialListen() {
listener.close();
conn.close();
});
+
+testPerm({ net: true }, async function netCloseReadSuccess() {
+ const addr = "127.0.0.1:4500";
+ const listener = deno.listen("tcp", addr);
+ const closeDeferred = deferred();
+ listener.accept().then(async conn => {
+ await conn.write(new Uint8Array([1, 2, 3]));
+ const buf = new Uint8Array(1024);
+ const readResult = await conn.read(buf);
+ assertEqual(3, readResult.nread);
+ assertEqual(4, buf[0]);
+ assertEqual(5, buf[1]);
+ assertEqual(6, buf[2]);
+ conn.close();
+ closeDeferred.resolve();
+ });
+ const conn = await deno.dial("tcp", addr);
+ conn.closeRead(); // closing read
+ const buf = new Uint8Array(1024);
+ const readResult = await conn.read(buf);
+ assertEqual(0, readResult.nread); // No error, read nothing
+ assertEqual(true, readResult.eof); // with immediate EOF
+ // Ensure closeRead does not impact write
+ await conn.write(new Uint8Array([4, 5, 6]));
+ await closeDeferred.promise;
+ listener.close();
+ conn.close();
+});
+
+testPerm({ net: true }, async function netDoubleCloseRead() {
+ const addr = "127.0.0.1:4500";
+ const listener = deno.listen("tcp", addr);
+ const closeDeferred = deferred();
+ listener.accept().then(async conn => {
+ await conn.write(new Uint8Array([1, 2, 3]));
+ await closeDeferred.promise;
+ conn.close();
+ });
+ const conn = await deno.dial("tcp", addr);
+ conn.closeRead(); // closing read
+ let err;
+ try {
+ // Duplicated close should throw error
+ conn.closeRead();
+ } catch (e) {
+ err = e;
+ }
+ assert(!!err);
+ assertEqual(err.kind, deno.ErrorKind.NotConnected);
+ assertEqual(err.name, "NotConnected");
+ closeDeferred.resolve();
+ listener.close();
+ conn.close();
+});
+
+testPerm({ net: true }, async function netCloseWriteSuccess() {
+ const addr = "127.0.0.1:4500";
+ const listener = deno.listen("tcp", addr);
+ const closeDeferred = deferred();
+ listener.accept().then(async conn => {
+ await conn.write(new Uint8Array([1, 2, 3]));
+ await closeDeferred.promise;
+ conn.close();
+ });
+ const conn = await deno.dial("tcp", addr);
+ conn.closeWrite(); // closing write
+ const buf = new Uint8Array(1024);
+ // Check read not impacted
+ const readResult = await conn.read(buf);
+ assertEqual(3, readResult.nread);
+ assertEqual(1, buf[0]);
+ assertEqual(2, buf[1]);
+ assertEqual(3, buf[2]);
+ // Check write should be closed
+ let err;
+ try {
+ await conn.write(new Uint8Array([1, 2, 3]));
+ } catch (e) {
+ err = e;
+ }
+ assert(!!err);
+ assertEqual(err.kind, deno.ErrorKind.BrokenPipe);
+ assertEqual(err.name, "BrokenPipe");
+ closeDeferred.resolve();
+ listener.close();
+ conn.close();
+});
+
+testPerm({ net: true }, async function netDoubleCloseWrite() {
+ const addr = "127.0.0.1:4500";
+ const listener = deno.listen("tcp", addr);
+ const closeDeferred = deferred();
+ listener.accept().then(async conn => {
+ await closeDeferred.promise;
+ conn.close();
+ });
+ const conn = await deno.dial("tcp", addr);
+ conn.closeWrite(); // closing write
+ let err;
+ try {
+ // Duplicated close should throw error
+ conn.closeWrite();
+ } catch (e) {
+ err = e;
+ }
+ assert(!!err);
+ assertEqual(err.kind, deno.ErrorKind.NotConnected);
+ assertEqual(err.name, "NotConnected");
+ closeDeferred.resolve();
+ listener.close();
+ conn.close();
+});
diff --git a/js/util.ts b/js/util.ts
index bfde01908..de6a078bb 100644
--- a/js/util.ts
+++ b/js/util.ts
@@ -101,3 +101,29 @@ export function containsOnlyASCII(str: string): boolean {
}
return /^[\x00-\x7F]*$/.test(str);
}
+
+// @internal
+export interface Deferred {
+ promise: Promise<void>;
+ resolve: Function;
+ reject: Function;
+}
+
+/**
+ * Create a wrapper around a promise that could be
+ * resolved externally.
+ * @internal
+ */
+export function deferred(): Deferred {
+ let resolve: Function | undefined;
+ let reject: Function | undefined;
+ const promise = new Promise<void>((res, rej) => {
+ resolve = res;
+ reject = rej;
+ });
+ return {
+ promise,
+ resolve: resolve!,
+ reject: reject!
+ };
+}
diff --git a/src/msg.fbs b/src/msg.fbs
index 9479d7892..16d10cdff 100644
--- a/src/msg.fbs
+++ b/src/msg.fbs
@@ -35,6 +35,7 @@ union Any {
Write,
WriteRes,
Close,
+ Shutdown,
Listen,
ListenRes,
Accept,
@@ -290,6 +291,11 @@ table Close {
rid: int;
}
+table Shutdown {
+ rid: int;
+ how: uint;
+}
+
table Listen {
network: string;
address: string;
diff --git a/src/ops.rs b/src/ops.rs
index 53163dfd4..fb67d4bef 100644
--- a/src/ops.rs
+++ b/src/ops.rs
@@ -9,6 +9,8 @@ use isolate::Isolate;
use isolate::IsolateState;
use isolate::Op;
use msg;
+use resources;
+use resources::Resource;
use tokio_util;
use flatbuffers::FlatBufferBuilder;
@@ -19,10 +21,9 @@ use hyper;
use hyper::rt::{Future, Stream};
use hyper::Client;
use remove_dir_all::remove_dir_all;
-use resources;
use std;
use std::fs;
-use std::net::SocketAddr;
+use std::net::{Shutdown, SocketAddr};
#[cfg(any(unix))]
use std::os::unix::fs::PermissionsExt;
use std::path::Path;
@@ -84,6 +85,7 @@ pub fn dispatch(
msg::Any::Read => op_read,
msg::Any::Write => op_write,
msg::Any::Close => op_close,
+ msg::Any::Shutdown => op_shutdown,
msg::Any::Remove => op_remove,
msg::Any::ReadFile => op_read_file,
msg::Any::ReadDir => op_read_dir,
@@ -614,6 +616,35 @@ fn op_close(
}
}
+fn op_shutdown(
+ _state: Arc<IsolateState>,
+ base: &msg::Base,
+ data: &'static mut [u8],
+) -> Box<Op> {
+ assert_eq!(data.len(), 0);
+ let inner = base.inner_as_shutdown().unwrap();
+ let rid = inner.rid();
+ let how = inner.how();
+ match resources::lookup(rid) {
+ None => odd_future(errors::new(
+ errors::ErrorKind::BadFileDescriptor,
+ String::from("Bad File Descriptor"),
+ )),
+ Some(mut resource) => {
+ let shutdown_mode = match how {
+ 0 => Shutdown::Read,
+ 1 => Shutdown::Write,
+ _ => unimplemented!(),
+ };
+ blocking!(base.sync(), || {
+ // Use UFCS for disambiguation
+ Resource::shutdown(&mut resource, shutdown_mode)?;
+ Ok(empty_buf())
+ })
+ }
+ }
+}
+
fn op_read(
_state: Arc<IsolateState>,
base: &msg::Base,
diff --git a/src/resources.rs b/src/resources.rs
index 75bad04b7..5a13e6cbf 100644
--- a/src/resources.rs
+++ b/src/resources.rs
@@ -8,13 +8,15 @@
// descriptors". This module implements a global resource table. Ops (AKA
// handlers) look up resources by their integer id here.
+use errors::DenoError;
+
use futures;
use futures::Poll;
use std;
use std::collections::HashMap;
use std::io::Error;
use std::io::{Read, Write};
-use std::net::SocketAddr;
+use std::net::{Shutdown, SocketAddr};
use std::sync::atomic::AtomicIsize;
use std::sync::atomic::Ordering;
use std::sync::Mutex;
@@ -79,6 +81,20 @@ impl Resource {
let r = table.remove(&self.rid);
assert!(r.is_some());
}
+
+ pub fn shutdown(&mut self, how: Shutdown) -> Result<(), DenoError> {
+ let mut table = RESOURCE_TABLE.lock().unwrap();
+ let maybe_repr = table.get_mut(&self.rid);
+ match maybe_repr {
+ None => panic!("bad rid"),
+ Some(repr) => match repr {
+ Repr::TcpStream(ref mut f) => {
+ TcpStream::shutdown(f, how).map_err(|err| DenoError::from(err))
+ }
+ _ => panic!("Cannot shutdown"),
+ },
+ }
+ }
}
impl Read for Resource {