-rw-r--r--  cli/bench/http/deno_http_flash_ops.js |   4
-rw-r--r--  cli/tests/unit/flash_test.ts          |  67
-rw-r--r--  ext/flash/01_http.js                  | 212
-rw-r--r--  ext/flash/lib.rs                      | 129
-rw-r--r--  ext/flash/request.rs                  |  49
5 files changed, 271 insertions, 190 deletions
diff --git a/cli/bench/http/deno_http_flash_ops.js b/cli/bench/http/deno_http_flash_ops.js
index 1b833e7f7..40ca25ff1 100644
--- a/cli/bench/http/deno_http_flash_ops.js
+++ b/cli/bench/http/deno_http_flash_ops.js
@@ -25,13 +25,15 @@ function respond(token, response) {
const response = encode(
"HTTP/1.1 200 OK\r\nContent-Length: 11\r\n\r\nHello World",
);
+let offset = 0;
while (true) {
let token = nextRequest();
if (token === 0) token = await opAsync("op_flash_next_async", serverId);
- for (let i = 0; i < token; i++) {
+ for (let i = offset; i < offset + token; i++) {
respond(
i,
response,
);
}
+ offset += token;
}
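
The bench loop above tracks a running `offset` because request tokens are no longer reset per batch: each call to nextRequest()/op_flash_next_async reports how many new requests arrived, and their tokens continue from wherever the previous batch ended. A minimal sketch of that bookkeeping, with hypothetical `nextBatch` and `respond` stand-ins for the flash ops:

// A minimal sketch of the offset-based token loop, assuming `nextBatch`
// resolves with the number of newly arrived requests and `respond`
// answers the request identified by a token. Both are hypothetical
// stand-ins for the ops used in the benchmark above.
async function serveLoop(
  nextBatch: () => Promise<number>,
  respond: (token: number) => void,
) {
  let offset = 0;
  while (true) {
    const count = await nextBatch();
    // Tokens increase monotonically: batch N starts where batch N-1 ended,
    // so requests still in flight from earlier batches keep their tokens.
    for (let i = offset; i < offset + count; i++) {
      respond(i);
    }
    offset += count;
  }
}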
diff --git a/cli/tests/unit/flash_test.ts b/cli/tests/unit/flash_test.ts
index 57138b14f..51534c79b 100644
--- a/cli/tests/unit/flash_test.ts
+++ b/cli/tests/unit/flash_test.ts
@@ -1848,6 +1848,73 @@ Deno.test(
},
);
+Deno.test(
+ { permissions: { net: true } },
+ async function httpServerConcurrentRequests() {
+ const ac = new AbortController();
+ const listeningPromise = deferred();
+
+ let reqCount = -1;
+ let timerId: number | undefined;
+ const server = Deno.serve(async (req) => {
+ reqCount++;
+ if (reqCount === 0) {
+ const msg = new TextEncoder().encode("data: hello\r\n\r\n");
+ // SSE
+ const body = new ReadableStream({
+ start(controller) {
+ timerId = setInterval(() => {
+ controller.enqueue(msg);
+ }, 1000);
+ },
+ cancel() {
+ if (typeof timerId === "number") {
+ clearInterval(timerId);
+ }
+ },
+ });
+ return new Response(body, {
+ headers: {
+ "Content-Type": "text/event-stream",
+ },
+ });
+ }
+
+ return new Response(`hello ${reqCount}`);
+ }, {
+ port: 4503,
+ signal: ac.signal,
+ onListen: onListen(listeningPromise),
+ onError: createOnErrorCb(ac),
+ });
+
+ const sseRequest = await fetch(`http://localhost:4503/`);
+
+ const decoder = new TextDecoder();
+ const stream = sseRequest.body!.getReader();
+ {
+ const { done, value } = await stream.read();
+ assert(!done);
+ assertEquals(decoder.decode(value), "data: hello\r\n\r\n");
+ }
+
+ const helloRequest = await fetch(`http://localhost:4503/`);
+ assertEquals(helloRequest.status, 200);
+ assertEquals(await helloRequest.text(), "hello 1");
+
+ {
+ const { done, value } = await stream.read();
+ assert(!done);
+ assertEquals(decoder.decode(value), "data: hello\r\n\r\n");
+ }
+
+ await stream.cancel();
+ clearInterval(timerId);
+ ac.abort();
+ await server;
+ },
+);
+
function chunkedBodyReader(h: Headers, r: BufReader): Deno.Reader {
// Based on https://tools.ietf.org/html/rfc2616#section-19.4.6
const tp = new TextProtoReader(r);
diff --git a/ext/flash/01_http.js b/ext/flash/01_http.js
index fd817219e..19920da58 100644
--- a/ext/flash/01_http.js
+++ b/ext/flash/01_http.js
@@ -237,20 +237,21 @@
await server.finished;
},
async serve() {
+ let offset = 0;
while (true) {
if (server.closed) {
break;
}
- let token = nextRequestSync();
- if (token === 0) {
- token = await core.opAsync("op_flash_next_async", serverId);
+ let tokens = nextRequestSync();
+ if (tokens === 0) {
+ tokens = await core.opAsync("op_flash_next_async", serverId);
if (server.closed) {
break;
}
}
- for (let i = 0; i < token; i++) {
+ for (let i = offset; i < offset + tokens; i++) {
let body = null;
// There might be a body, but we don't expose it for GET/HEAD requests.
// It will be closed automatically once the request has been handled and
@@ -290,17 +291,6 @@
if (resp === undefined) {
continue;
}
-
- const ws = resp[_ws];
- if (!ws) {
- if (hasBody && body[_state] !== "closed") {
- // TODO(@littledivy): Optimize by draining in a single op.
- try {
- await req.arrayBuffer();
- } catch { /* pass */ }
- }
- }
-
const innerResp = toInnerResponse(resp);
// If response body length is known, it will be sent synchronously in a
@@ -360,74 +350,8 @@
respBody = new Uint8Array(0);
}
- if (isStreamingResponseBody === true) {
- const resourceRid = getReadableStreamRid(respBody);
- if (resourceRid) {
- if (respBody.locked) {
- throw new TypeError("ReadableStream is locked.");
- }
- const reader = respBody.getReader(); // Aquire JS lock.
- try {
- core.opAsync(
- "op_flash_write_resource",
- http1Response(
- method,
- innerResp.status ?? 200,
- innerResp.headerList,
- null,
- true,
- ),
- serverId,
- i,
- resourceRid,
- ).then(() => {
- // Release JS lock.
- readableStreamClose(respBody);
- });
- } catch (error) {
- await reader.cancel(error);
- throw error;
- }
- } else {
- const reader = respBody.getReader();
- let first = true;
- a:
- while (true) {
- const { value, done } = await reader.read();
- if (first) {
- first = false;
- core.ops.op_flash_respond(
- serverId,
- i,
- http1Response(
- method,
- innerResp.status ?? 200,
- innerResp.headerList,
- null,
- ),
- value ?? new Uint8Array(),
- false,
- );
- } else {
- if (value === undefined) {
- core.ops.op_flash_respond_chuncked(
- serverId,
- i,
- undefined,
- done,
- );
- } else {
- respondChunked(
- i,
- value,
- done,
- );
- }
- }
- if (done) break a;
- }
- }
- } else {
+ const ws = resp[_ws];
+ if (isStreamingResponseBody === false) {
const responseStr = http1Response(
method,
innerResp.status ?? 200,
@@ -456,29 +380,111 @@
}
}
- if (ws) {
- const wsRid = await core.opAsync(
- "op_flash_upgrade_websocket",
- serverId,
- i,
- );
- ws[_rid] = wsRid;
- ws[_protocol] = resp.headers.get("sec-websocket-protocol");
-
- ws[_readyState] = WebSocket.OPEN;
- const event = new Event("open");
- ws.dispatchEvent(event);
-
- ws[_eventLoop]();
- if (ws[_idleTimeoutDuration]) {
- ws.addEventListener(
- "close",
- () => clearTimeout(ws[_idleTimeoutTimeout]),
+ (async () => {
+ if (!ws) {
+ if (hasBody && body[_state] !== "closed") {
+ // TODO(@littledivy): Optimize by draining in a single op.
+ try {
+ await req.arrayBuffer();
+ } catch { /* pass */ }
+ }
+ }
+
+ if (isStreamingResponseBody === true) {
+ const resourceRid = getReadableStreamRid(respBody);
+ if (resourceRid) {
+ if (respBody.locked) {
+ throw new TypeError("ReadableStream is locked.");
+ }
+ const reader = respBody.getReader(); // Aquire JS lock.
+ try {
+ core.opAsync(
+ "op_flash_write_resource",
+ http1Response(
+ method,
+ innerResp.status ?? 200,
+ innerResp.headerList,
+ null,
+ true,
+ ),
+ serverId,
+ i,
+ resourceRid,
+ ).then(() => {
+ // Release JS lock.
+ readableStreamClose(respBody);
+ });
+ } catch (error) {
+ await reader.cancel(error);
+ throw error;
+ }
+ } else {
+ const reader = respBody.getReader();
+ let first = true;
+ a:
+ while (true) {
+ const { value, done } = await reader.read();
+ if (first) {
+ first = false;
+ core.ops.op_flash_respond(
+ serverId,
+ i,
+ http1Response(
+ method,
+ innerResp.status ?? 200,
+ innerResp.headerList,
+ null,
+ ),
+ value ?? new Uint8Array(),
+ false,
+ );
+ } else {
+ if (value === undefined) {
+ core.ops.op_flash_respond_chuncked(
+ serverId,
+ i,
+ undefined,
+ done,
+ );
+ } else {
+ respondChunked(
+ i,
+ value,
+ done,
+ );
+ }
+ }
+ if (done) break a;
+ }
+ }
+ }
+
+ if (ws) {
+ const wsRid = await core.opAsync(
+ "op_flash_upgrade_websocket",
+ serverId,
+ i,
);
+ ws[_rid] = wsRid;
+ ws[_protocol] = resp.headers.get("sec-websocket-protocol");
+
+ ws[_readyState] = WebSocket.OPEN;
+ const event = new Event("open");
+ ws.dispatchEvent(event);
+
+ ws[_eventLoop]();
+ if (ws[_idleTimeoutDuration]) {
+ ws.addEventListener(
+ "close",
+ () => clearTimeout(ws[_idleTimeoutTimeout]),
+ );
+ }
+ ws[_serverHandleIdleTimeout]();
}
- ws[_serverHandleIdleTimeout]();
- }
+ })().catch(onError);
}
+
+ offset += tokens;
}
await server.finished;
},
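
Beyond the shared `offset`, the notable change in 01_http.js is that body draining, streaming responses, and WebSocket upgrades now run inside a detached `(async () => { ... })().catch(onError)` block instead of being awaited by the serve loop, so a long-lived response (such as the SSE stream in the new test) no longer blocks later tokens. A rough sketch of that fire-and-forget pattern, with hypothetical `handleResponse` and `onError` helpers:

// Rough sketch of the detached-response pattern: the dispatch loop keeps
// draining tokens while each response is completed in the background.
// `handleResponse` and `onError` are hypothetical stand-ins, not flash APIs.
async function handleResponse(token: number): Promise<void> {
  // ... write the status line and headers, then stream body chunks
  // (or upgrade to a WebSocket) for this token ...
}

function onError(err: unknown): void {
  console.error("response failed", err);
}

function dispatch(tokens: number[]): void {
  for (const token of tokens) {
    // Deliberately not awaited: a streaming body may stay open for a long
    // time, and awaiting it here would stall every later request.
    handleResponse(token).catch(onError);
  }
}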
diff --git a/ext/flash/lib.rs b/ext/flash/lib.rs
index 2c0cc548a..8f3cb341a 100644
--- a/ext/flash/lib.rs
+++ b/ext/flash/lib.rs
@@ -25,7 +25,6 @@ use http::header::CONNECTION;
use http::header::CONTENT_LENGTH;
use http::header::EXPECT;
use http::header::TRANSFER_ENCODING;
-use http::header::UPGRADE;
use http::HeaderValue;
use log::trace;
use mio::net::TcpListener;
@@ -58,10 +57,13 @@ use tokio::sync::mpsc;
use tokio::task::JoinHandle;
mod chunked;
-
+mod request;
#[cfg(unix)]
mod sendfile;
+use request::InnerRequest;
+use request::Request;
+
pub struct FlashContext {
next_server_id: u32,
join_handles: HashMap<u32, JoinHandle<Result<(), AnyError>>>,
@@ -70,22 +72,15 @@ pub struct FlashContext {
pub struct ServerContext {
_addr: SocketAddr,
- tx: mpsc::Sender<NextRequest>,
- rx: mpsc::Receiver<NextRequest>,
- response: HashMap<u32, NextRequest>,
+ tx: mpsc::Sender<Request>,
+ rx: mpsc::Receiver<Request>,
+ requests: HashMap<u32, Request>,
+ next_token: u32,
listening_rx: Option<mpsc::Receiver<()>>,
close_tx: mpsc::Sender<()>,
cancel_handle: Rc<CancelHandle>,
}
-struct InnerRequest {
- _headers: Vec<httparse::Header<'static>>,
- req: httparse::Request<'static, 'static>,
- body_offset: usize,
- body_len: usize,
- buffer: Pin<Box<[u8]>>,
-}
-
#[derive(Debug, PartialEq)]
enum ParseStatus {
None,
@@ -99,7 +94,7 @@ enum InnerStream {
Tls(Box<TlsTcpStream>),
}
-struct Stream {
+pub struct Stream {
inner: InnerStream,
detached: bool,
read_rx: Option<mpsc::Receiver<()>>,
@@ -147,38 +142,6 @@ impl Read for Stream {
}
}
-struct NextRequest {
- // Pointer to stream owned by the server loop thread.
- //
- // Why not Arc<Mutex<Stream>>? Performance. The stream
- // is never written to by the server loop thread.
- //
- // Dereferencing is safe until server thread finishes and
- // op_flash_serve resolves or websocket upgrade is performed.
- socket: *mut Stream,
- inner: InnerRequest,
- keep_alive: bool,
- #[allow(dead_code)]
- upgrade: bool,
- content_read: usize,
- content_length: Option<u64>,
- remaining_chunk_size: Option<usize>,
- te_chunked: bool,
- expect_continue: bool,
-}
-
-// SAFETY: Sent from server thread to JS thread.
-// See comment above for `socket`.
-unsafe impl Send for NextRequest {}
-
-impl NextRequest {
- #[inline(always)]
- pub fn socket<'a>(&self) -> &'a mut Stream {
- // SAFETY: Dereferencing is safe until server thread detaches socket or finishes.
- unsafe { &mut *self.socket }
- }
-}
-
#[op]
fn op_flash_respond(
op_state: &mut OpState,
@@ -194,13 +157,13 @@ fn op_flash_respond(
let mut close = false;
let sock = match shutdown {
true => {
- let tx = ctx.response.remove(&token).unwrap();
+ let tx = ctx.requests.remove(&token).unwrap();
close = !tx.keep_alive;
tx.socket()
}
// In case of a websocket upgrade or streaming response.
false => {
- let tx = ctx.response.get(&token).unwrap();
+ let tx = ctx.requests.get(&token).unwrap();
tx.socket()
}
};
@@ -263,7 +226,7 @@ async fn op_flash_write_resource(
let op_state = &mut op_state.borrow_mut();
let flash_ctx = op_state.borrow_mut::<FlashContext>();
let ctx = flash_ctx.servers.get_mut(&server_id).unwrap();
- ctx.response.remove(&token).unwrap().socket()
+ ctx.requests.remove(&token).unwrap().socket()
};
drop(op_state);
@@ -344,13 +307,13 @@ fn flash_respond(
let mut close = false;
let sock = match shutdown {
true => {
- let tx = ctx.response.remove(&token).unwrap();
+ let tx = ctx.requests.remove(&token).unwrap();
close = !tx.keep_alive;
tx.socket()
}
// In case of a websocket upgrade or streaming response.
false => {
- let tx = ctx.response.get(&token).unwrap();
+ let tx = ctx.requests.get(&token).unwrap();
tx.socket()
}
};
@@ -438,12 +401,12 @@ fn respond_chunked(
) {
let sock = match shutdown {
true => {
- let tx = ctx.response.remove(&token).unwrap();
+ let tx = ctx.requests.remove(&token).unwrap();
tx.socket()
}
// In case of a websocket upgrade or streaming response.
false => {
- let tx = ctx.response.get(&token).unwrap();
+ let tx = ctx.requests.get(&token).unwrap();
tx.socket()
}
};
@@ -469,7 +432,7 @@ macro_rules! get_request {
($op_state: ident, $server_id: expr, $token: ident) => {{
let flash_ctx = $op_state.borrow_mut::<FlashContext>();
let ctx = flash_ctx.servers.get_mut(&$server_id).unwrap();
- ctx.response.get_mut(&$token).unwrap()
+ ctx.requests.get_mut(&$token).unwrap()
}};
}
@@ -487,8 +450,8 @@ pub enum Method {
}
#[inline]
-fn get_method(req: &mut NextRequest) -> u32 {
- let method = match req.inner.req.method.unwrap() {
+fn get_method(req: &mut Request) -> u32 {
+ let method = match req.method() {
"GET" => Method::GET,
"POST" => Method::POST,
"PUT" => Method::PUT,
@@ -531,7 +494,7 @@ fn op_flash_path(
let flash_ctx = op_state.borrow_mut::<FlashContext>();
let ctx = flash_ctx.servers.get_mut(&server_id).unwrap();
ctx
- .response
+ .requests
.get(&token)
.unwrap()
.inner
@@ -543,12 +506,14 @@ fn op_flash_path(
#[inline]
fn next_request_sync(ctx: &mut ServerContext) -> u32 {
- let mut tokens = 0;
+ let offset = ctx.next_token;
+
while let Ok(token) = ctx.rx.try_recv() {
- ctx.response.insert(tokens, token);
- tokens += 1;
+ ctx.requests.insert(ctx.next_token, token);
+ ctx.next_token += 1;
}
- tokens
+
+ ctx.next_token - offset
}
pub struct NextRequestFast;
@@ -597,7 +562,7 @@ unsafe fn op_flash_get_method_fast(
let ptr =
recv.get_aligned_pointer_from_internal_field(V8_WRAPPER_OBJECT_INDEX);
let ctx = &mut *(ptr as *mut ServerContext);
- let req = ctx.response.get_mut(&token).unwrap();
+ let req = ctx.requests.get_mut(&token).unwrap();
get_method(req)
}
@@ -651,7 +616,7 @@ fn op_flash_make_request<'scope>(
// SAFETY: This external is guaranteed to be a pointer to a ServerContext
let ctx = unsafe { &mut *(external.value() as *mut ServerContext) };
let token = args.get(0).uint32_value(scope).unwrap();
- let req = ctx.response.get_mut(&token).unwrap();
+ let req = ctx.requests.get_mut(&token).unwrap();
rv.set_uint32(get_method(req));
},
)
@@ -747,7 +712,7 @@ fn op_flash_make_request<'scope>(
}
#[inline]
-fn has_body_stream(req: &NextRequest) -> bool {
+fn has_body_stream(req: &Request) -> bool {
let sock = req.socket();
sock.read_rx.is_some()
}
@@ -775,7 +740,7 @@ fn op_flash_headers(
.get_mut(&server_id)
.ok_or_else(|| type_error("server closed"))?;
let inner_req = &ctx
- .response
+ .requests
.get(&token)
.ok_or_else(|| type_error("request closed"))?
.inner
@@ -876,7 +841,7 @@ async fn op_flash_read_body(
.as_mut()
.unwrap()
};
- let tx = ctx.response.get_mut(&token).unwrap();
+ let tx = ctx.requests.get_mut(&token).unwrap();
if tx.te_chunked {
let mut decoder =
@@ -974,7 +939,7 @@ pub struct ListenOpts {
}
fn run_server(
- tx: mpsc::Sender<NextRequest>,
+ tx: mpsc::Sender<Request>,
listening_tx: mpsc::Sender<()>,
mut close_rx: mpsc::Receiver<()>,
addr: SocketAddr,
@@ -1178,7 +1143,6 @@ fn run_server(
// https://github.com/tiny-http/tiny-http/blob/master/src/client.rs#L177
// https://github.com/hyperium/hyper/blob/4545c3ef191ce9b5f5d250ee27c4c96f9b71d2c6/src/proto/h1/role.rs#L127
let mut keep_alive = inner_req.req.version.unwrap() == 1;
- let mut upgrade = false;
let mut expect_continue = false;
let mut te = false;
let mut te_chunked = false;
@@ -1198,9 +1162,6 @@ fn run_server(
keep_alive = connection_has(&value, "keep-alive");
}
}
- Ok(UPGRADE) => {
- upgrade = inner_req.req.version.unwrap() == 1;
- }
Ok(TRANSFER_ENCODING) => {
// https://tools.ietf.org/html/rfc7230#section-3.3.3
debug_assert!(inner_req.req.version.unwrap() == 1);
@@ -1250,12 +1211,11 @@ fn run_server(
continue 'events;
}
- tx.blocking_send(NextRequest {
+ tx.blocking_send(Request {
socket: sock_ptr,
// SAFETY: headers backing buffer outlives the mio event loop ('static)
inner: inner_req,
keep_alive,
- upgrade,
te_chunked,
remaining_chunk_size: None,
content_read: 0,
@@ -1295,7 +1255,8 @@ where
_addr: addr,
tx,
rx,
- response: HashMap::with_capacity(1000),
+ requests: HashMap::with_capacity(1000),
+ next_token: 0,
close_tx,
listening_rx: Some(listening_rx),
cancel_handle: CancelHandle::new_rc(),
@@ -1371,18 +1332,14 @@ async fn op_flash_next_async(
// is responsible for ensuring this is not called concurrently.
let ctx = unsafe { &mut *ctx };
let cancel_handle = &ctx.cancel_handle;
- let mut tokens = 0;
- while let Ok(token) = ctx.rx.try_recv() {
- ctx.response.insert(tokens, token);
- tokens += 1;
- }
- if tokens == 0 {
- if let Ok(Some(req)) = ctx.rx.recv().or_cancel(cancel_handle).await {
- ctx.response.insert(tokens, req);
- tokens += 1;
- }
+
+ if let Ok(Some(req)) = ctx.rx.recv().or_cancel(cancel_handle).await {
+ ctx.requests.insert(ctx.next_token, req);
+ ctx.next_token += 1;
+ return 1;
}
- tokens
+
+ 0
}
// Syncrhonous version of op_flash_next_async. Under heavy load,
@@ -1455,7 +1412,7 @@ pub fn detach_socket(
// * conversion from mio::net::TcpStream -> tokio::net::TcpStream. There is no public API so we
// use raw fds.
let tx = ctx
- .response
+ .requests
.remove(&token)
.ok_or_else(|| type_error("request closed"))?;
let stream = tx.socket();
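
On the Rust side, `ServerContext` now keeps in-flight requests in a `requests` map keyed by a `next_token` counter that never resets, so tokens handed to JS stay unique across batches and an entry is removed only once its response is shut down. A TypeScript sketch of that bookkeeping, for illustration only (the real state lives in `ServerContext` above, and the types here are hypothetical):

// Illustrative sketch of the token table kept by ServerContext: tokens come
// from a counter that only grows, and an entry is removed only when the
// response for that token is shut down.
class TokenTable<Req> {
  private nextToken = 0;
  private requests = new Map<number, Req>();

  // Register a newly accepted request and hand back its token.
  add(req: Req): number {
    const token = this.nextToken++;
    this.requests.set(token, req);
    return token;
  }

  // Look up an in-flight request (streaming response, websocket upgrade).
  get(token: number): Req | undefined {
    return this.requests.get(token);
  }

  // Final response: drop the entry so the token is never reused.
  remove(token: number): Req | undefined {
    const req = this.requests.get(token);
    this.requests.delete(token);
    return req;
  }
}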
diff --git a/ext/flash/request.rs b/ext/flash/request.rs
new file mode 100644
index 000000000..0736b5620
--- /dev/null
+++ b/ext/flash/request.rs
@@ -0,0 +1,49 @@
+// Copyright 2018-2022 the Deno authors. All rights reserved. MIT license.
+
+use crate::Stream;
+use std::pin::Pin;
+
+#[derive(Debug)]
+pub struct InnerRequest {
+ /// Backing buffer for the request.
+ pub buffer: Pin<Box<[u8]>>,
+ /// Owned headers, we have to keep it around since its referenced in `req`.
+ pub _headers: Vec<httparse::Header<'static>>,
+ /// Fully parsed request.
+ pub req: httparse::Request<'static, 'static>,
+ pub body_offset: usize,
+ pub body_len: usize,
+}
+
+#[derive(Debug)]
+pub struct Request {
+ pub inner: InnerRequest,
+ // Pointer to stream owned by the server loop thread.
+ //
+ // Dereferencing is safe until server thread finishes and
+ // op_flash_serve resolves or websocket upgrade is performed.
+ pub socket: *mut Stream,
+ pub keep_alive: bool,
+ pub content_read: usize,
+ pub content_length: Option<u64>,
+ pub remaining_chunk_size: Option<usize>,
+ pub te_chunked: bool,
+ pub expect_continue: bool,
+}
+
+// SAFETY: Sent from server thread to JS thread.
+// See comment above for `socket`.
+unsafe impl Send for Request {}
+
+impl Request {
+ #[inline(always)]
+ pub fn socket<'a>(&self) -> &'a mut Stream {
+ // SAFETY: Dereferencing is safe until server thread detaches socket or finishes.
+ unsafe { &mut *self.socket }
+ }
+
+ #[inline(always)]
+ pub fn method(&self) -> &str {
+ self.inner.req.method.unwrap()
+ }
+}