summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--cli/tests/unit/http_test.ts47
-rw-r--r--cli/tests/unit/net_test.ts37
-rw-r--r--core/lib.rs1
-rw-r--r--core/modules.rs3
-rw-r--r--core/ops.rs66
-rw-r--r--core/ops_json.rs7
-rw-r--r--core/runtime.rs13
-rw-r--r--ext/net/01_net.js6
-rw-r--r--ext/web/13_message_port.js15
9 files changed, 121 insertions, 74 deletions
diff --git a/cli/tests/unit/http_test.ts b/cli/tests/unit/http_test.ts
index 0f23a2bb5..080b94a1d 100644
--- a/cli/tests/unit/http_test.ts
+++ b/cli/tests/unit/http_test.ts
@@ -892,15 +892,13 @@ unitTest(
respondPromise,
]);
+ httpConn.close();
listener.close();
assert(errors.length >= 1);
for (const error of errors) {
assertEquals(error.name, "Http");
- assertEquals(
- error.message,
- "connection closed before message completed",
- );
+ assert(error.message.includes("connection"));
}
},
);
@@ -975,44 +973,29 @@ unitTest(
unitTest(
{ permissions: { net: true } },
async function droppedConnSenderNoPanic() {
- async function server(listener: Deno.Listener) {
+ async function server() {
+ const listener = Deno.listen({ port: 8000 });
const conn = await listener.accept();
const http = Deno.serveHttp(conn);
-
- for (;;) {
- const req = await http.nextRequest();
- if (req == null) break;
-
- nextloop()
- .then(() => {
- http.close();
- return req.respondWith(new Response("boom"));
- })
- .catch(() => {});
- }
-
+ const evt = await http.nextRequest();
+ http.close();
try {
- http.close();
+ await evt!.respondWith(new Response("boom"));
} catch {
- "nop";
+ // Ignore error.
}
-
listener.close();
}
async function client() {
- const resp = await fetch("http://127.0.0.1:8000/");
- await resp.body?.cancel();
- }
-
- function nextloop() {
- return new Promise((resolve) => setTimeout(resolve, 0));
+ try {
+ const resp = await fetch("http://127.0.0.1:8000/");
+ await resp.body?.cancel();
+ } catch {
+ // Ignore error
+ }
}
- async function main() {
- const listener = Deno.listen({ port: 8000 });
- await Promise.all([server(listener), client()]);
- }
- await main();
+ await Promise.all([server(), client()]);
},
);
diff --git a/cli/tests/unit/net_test.ts b/cli/tests/unit/net_test.ts
index a1585ce6b..eabe26c84 100644
--- a/cli/tests/unit/net_test.ts
+++ b/cli/tests/unit/net_test.ts
@@ -117,10 +117,10 @@ unitTest(
const listener = Deno.listen({ port: 4501 });
const p = listener.accept();
listener.close();
+ // TODO(piscisaureus): the error type should be `Interrupted` here, which
+ // gets thrown, but then ext/net catches it and rethrows `BadResource`.
await assertRejects(
- async () => {
- await p;
- },
+ () => p,
Deno.errors.BadResource,
"Listener has been closed",
);
@@ -141,11 +141,8 @@ unitTest(
const p = listener.accept();
listener.close();
await assertRejects(
- async () => {
- await p;
- },
- Deno.errors.BadResource,
- "Listener has been closed",
+ () => p,
+ Deno.errors.Interrupted,
);
},
);
@@ -173,27 +170,29 @@ unitTest(
},
);
-// TODO(jsouto): Enable when tokio updates mio to v0.7!
unitTest(
- { ignore: true, permissions: { read: true, write: true } },
+ {
+ ignore: Deno.build.os === "windows",
+ permissions: { read: true, write: true },
+ },
async function netUnixConcurrentAccept() {
const filePath = await Deno.makeTempFile();
const listener = Deno.listen({ transport: "unix", path: filePath });
let acceptErrCount = 0;
const checkErr = (e: Error) => {
- if (e.message === "Listener has been closed") {
+ if (e instanceof Deno.errors.Interrupted) { // "operation canceled"
assertEquals(acceptErrCount, 1);
- } else if (e.message === "Another accept task is ongoing") {
+ } else if (e instanceof Deno.errors.Busy) { // "Listener already in use"
acceptErrCount++;
} else {
- throw new Error("Unexpected error message");
+ throw e;
}
};
const p = listener.accept().catch(checkErr);
const p1 = listener.accept().catch(checkErr);
await Promise.race([p, p1]);
listener.close();
- await [p, p1];
+ await Promise.all([p, p1]);
assertEquals(acceptErrCount, 1);
},
);
@@ -500,11 +499,7 @@ unitTest(
);
unitTest(
- {
- // FIXME(bartlomieju)
- ignore: true,
- permissions: { net: true },
- },
+ { permissions: { net: true } },
async function netListenAsyncIterator() {
const addr = { hostname: "127.0.0.1", port: 3500 };
const listener = Deno.listen(addr);
@@ -590,8 +585,8 @@ unitTest(
await conn.write(new Uint8Array([1, 2, 3]));
}
} catch (err) {
- assert(!!err);
- assert(err instanceof Deno.errors.BadResource);
+ assert(err);
+ assert(err instanceof Deno.errors.Interrupted);
}
}
diff --git a/core/lib.rs b/core/lib.rs
index ea7b322e2..c0419f8ab 100644
--- a/core/lib.rs
+++ b/core/lib.rs
@@ -68,6 +68,7 @@ pub use crate::normalize_path::normalize_path;
pub use crate::ops::serialize_op_result;
pub use crate::ops::Op;
pub use crate::ops::OpAsyncFuture;
+pub use crate::ops::OpCall;
pub use crate::ops::OpFn;
pub use crate::ops::OpId;
pub use crate::ops::OpPayload;
diff --git a/core/modules.rs b/core/modules.rs
index 2af09057f..31e03196a 100644
--- a/core/modules.rs
+++ b/core/modules.rs
@@ -722,6 +722,7 @@ impl ModuleMap {
#[cfg(test)]
mod tests {
use super::*;
+ use crate::ops::OpCall;
use crate::serialize_op_result;
use crate::JsRuntime;
use crate::Op;
@@ -1009,7 +1010,7 @@ mod tests {
let (control, _): (u8, ()) = payload.deserialize().unwrap();
assert_eq!(control, 42);
let resp = (0, 1, serialize_op_result(Ok(43), state));
- Op::Async(Box::pin(futures::future::ready(resp)))
+ Op::Async(OpCall::ready(resp))
};
let mut runtime = JsRuntime::new(RuntimeOptions {
diff --git a/core/ops.rs b/core/ops.rs
index 80bb30eda..05b91f32f 100644
--- a/core/ops.rs
+++ b/core/ops.rs
@@ -6,6 +6,11 @@ use crate::gotham_state::GothamState;
use crate::ops_metrics::OpsTracker;
use crate::resources::ResourceTable;
use crate::runtime::GetErrorClassFn;
+use futures::future::maybe_done;
+use futures::future::FusedFuture;
+use futures::future::MaybeDone;
+use futures::ready;
+use futures::task::noop_waker;
use futures::Future;
use indexmap::IndexMap;
use rusty_v8 as v8;
@@ -17,10 +22,67 @@ use std::ops::Deref;
use std::ops::DerefMut;
use std::pin::Pin;
use std::rc::Rc;
+use std::task::Context;
+use std::task::Poll;
+
+/// Wrapper around a Future, which causes that Future to be polled immediately.
+/// (Background: ops are stored in a `FuturesUnordered` structure which polls
+/// them, but without the `OpCall` wrapper this doesn't happen until the next
+/// turn of the event loop, which is too late for certain ops.)
+pub struct OpCall<T>(MaybeDone<Pin<Box<dyn Future<Output = T>>>>);
+
+impl<T> OpCall<T> {
+ /// Wraps a future, and polls the inner future immediately.
+ /// This should be the default choice for ops.
+ pub fn eager(fut: impl Future<Output = T> + 'static) -> Self {
+ let boxed = Box::pin(fut) as Pin<Box<dyn Future<Output = T>>>;
+ let mut inner = maybe_done(boxed);
+ let waker = noop_waker();
+ let mut cx = Context::from_waker(&waker);
+ let mut pinned = Pin::new(&mut inner);
+ let _ = pinned.as_mut().poll(&mut cx);
+ Self(inner)
+ }
+
+ /// Wraps a future; the inner future is polled the usual way (lazily).
+ pub fn lazy(fut: impl Future<Output = T> + 'static) -> Self {
+ let boxed = Box::pin(fut) as Pin<Box<dyn Future<Output = T>>>;
+ let inner = maybe_done(boxed);
+ Self(inner)
+ }
+
+ /// Create a future by specifying its output. This is basically the same as
+ /// `async { value }` or `futures::future::ready(value)`.
+ pub fn ready(value: T) -> Self {
+ Self(MaybeDone::Done(value))
+ }
+}
+
+impl<T> Future for OpCall<T> {
+ type Output = T;
+
+ fn poll(
+ self: std::pin::Pin<&mut Self>,
+ cx: &mut std::task::Context<'_>,
+ ) -> std::task::Poll<Self::Output> {
+ let inner = unsafe { &mut self.get_unchecked_mut().0 };
+ let mut pinned = Pin::new(inner);
+ ready!(pinned.as_mut().poll(cx));
+ Poll::Ready(pinned.as_mut().take_output().unwrap())
+ }
+}
+
+impl<F> FusedFuture for OpCall<F>
+where
+ F: Future,
+{
+ fn is_terminated(&self) -> bool {
+ self.0.is_terminated()
+ }
+}
pub type PromiseId = u64;
-pub type OpAsyncFuture =
- Pin<Box<dyn Future<Output = (PromiseId, OpId, OpResult)>>>;
+pub type OpAsyncFuture = OpCall<(PromiseId, OpId, OpResult)>;
pub type OpFn = dyn Fn(Rc<RefCell<OpState>>, OpPayload) -> Op + 'static;
pub type OpId = usize;
diff --git a/core/ops_json.rs b/core/ops_json.rs
index 0ca7e5ce4..dca9a9a77 100644
--- a/core/ops_json.rs
+++ b/core/ops_json.rs
@@ -1,6 +1,7 @@
// Copyright 2018-2021 the Deno authors. All rights reserved. MIT license.
use crate::error::AnyError;
+use crate::ops::OpCall;
use crate::serialize_op_result;
use crate::Op;
use crate::OpFn;
@@ -35,7 +36,7 @@ pub fn void_op_async() -> Box<OpFn> {
let op_id = payload.op_id;
let pid = payload.promise_id;
let op_result = serialize_op_result(Ok(()), state);
- Op::Async(Box::pin(futures::future::ready((pid, op_id, op_result))))
+ Op::Async(OpCall::ready((pid, op_id, op_result)))
})
}
@@ -127,7 +128,7 @@ where
use crate::futures::FutureExt;
let fut = op_fn(state.clone(), a, b)
.map(move |result| (pid, op_id, serialize_op_result(result, state)));
- Op::Async(Box::pin(fut))
+ Op::Async(OpCall::eager(fut))
})
}
@@ -159,7 +160,7 @@ where
use crate::futures::FutureExt;
let fut = op_fn(state.clone(), a, b)
.map(move |result| (pid, op_id, serialize_op_result(result, state)));
- Op::AsyncUnref(Box::pin(fut))
+ Op::AsyncUnref(OpCall::eager(fut))
})
}
diff --git a/core/runtime.rs b/core/runtime.rs
index 1928ff31c..873dcd3f5 100644
--- a/core/runtime.rs
+++ b/core/runtime.rs
@@ -28,7 +28,6 @@ use futures::future::FutureExt;
use futures::stream::FuturesUnordered;
use futures::stream::StreamExt;
use futures::task::AtomicWaker;
-use futures::Future;
use std::any::Any;
use std::cell::RefCell;
use std::collections::HashMap;
@@ -36,7 +35,6 @@ use std::convert::TryFrom;
use std::ffi::c_void;
use std::mem::forget;
use std::option::Option;
-use std::pin::Pin;
use std::rc::Rc;
use std::sync::Arc;
use std::sync::Mutex;
@@ -44,8 +42,7 @@ use std::sync::Once;
use std::task::Context;
use std::task::Poll;
-type PendingOpFuture =
- Pin<Box<dyn Future<Output = (PromiseId, OpId, OpResult)>>>;
+type PendingOpFuture = OpCall<(PromiseId, OpId, OpResult)>;
pub enum Snapshot {
Static(&'static [u8]),
@@ -1613,6 +1610,7 @@ pub mod tests {
use crate::ZeroCopyBuf;
use futures::future::lazy;
use std::ops::FnOnce;
+ use std::pin::Pin;
use std::rc::Rc;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
@@ -1645,16 +1643,15 @@ pub mod tests {
Mode::Async => {
assert_eq!(control, 42);
let resp = (0, 1, serialize_op_result(Ok(43), rc_op_state));
- Op::Async(Box::pin(futures::future::ready(resp)))
+ Op::Async(OpCall::ready(resp))
}
Mode::AsyncZeroCopy(has_buffer) => {
assert_eq!(buf.is_some(), has_buffer);
if let Some(buf) = buf {
assert_eq!(buf.len(), 1);
}
-
- let resp = serialize_op_result(Ok(43), rc_op_state);
- Op::Async(Box::pin(futures::future::ready((0, 1, resp))))
+ let resp = (0, 1, serialize_op_result(Ok(43), rc_op_state));
+ Op::Async(OpCall::ready(resp))
}
}
}
diff --git a/ext/net/01_net.js b/ext/net/01_net.js
index cc10a1c0a..39df5a9d4 100644
--- a/ext/net/01_net.js
+++ b/ext/net/01_net.js
@@ -3,7 +3,7 @@
((window) => {
const core = window.Deno.core;
- const { BadResource } = core;
+ const { BadResource, Interrupted } = core;
const {
PromiseResolve,
SymbolAsyncIterator,
@@ -124,7 +124,7 @@
try {
conn = await this.accept();
} catch (error) {
- if (error instanceof BadResource) {
+ if (error instanceof BadResource || error instanceof Interrupted) {
return { value: undefined, done: true };
}
throw error;
@@ -191,7 +191,7 @@
try {
yield await this.receive();
} catch (err) {
- if (err instanceof BadResource) {
+ if (err instanceof BadResource || err instanceof Interrupted) {
break;
}
throw err;
diff --git a/ext/web/13_message_port.js b/ext/web/13_message_port.js
index c02b373d6..1430d8327 100644
--- a/ext/web/13_message_port.js
+++ b/ext/web/13_message_port.js
@@ -10,6 +10,7 @@
((window) => {
const core = window.Deno.core;
+ const { Interrupted } = core;
const webidl = window.__bootstrap.webidl;
const { setEventTargetData } = window.__bootstrap.eventTarget;
const { defineEventHandler } = window.__bootstrap.event;
@@ -134,10 +135,16 @@
this[_enabled] = true;
while (true) {
if (this[_id] === null) break;
- const data = await core.opAsync(
- "op_message_port_recv_message",
- this[_id],
- );
+ let data;
+ try {
+ data = await core.opAsync(
+ "op_message_port_recv_message",
+ this[_id],
+ );
+ } catch (err) {
+ if (err instanceof Interrupted) break;
+ throw err;
+ }
if (data === null) break;
let message, transferables;
try {