diff options
Diffstat (limited to 'ext/flash/lib.rs')
-rw-r--r-- | ext/flash/lib.rs | 129 |
1 files changed, 43 insertions, 86 deletions
diff --git a/ext/flash/lib.rs b/ext/flash/lib.rs index 2c0cc548a..8f3cb341a 100644 --- a/ext/flash/lib.rs +++ b/ext/flash/lib.rs @@ -25,7 +25,6 @@ use http::header::CONNECTION; use http::header::CONTENT_LENGTH; use http::header::EXPECT; use http::header::TRANSFER_ENCODING; -use http::header::UPGRADE; use http::HeaderValue; use log::trace; use mio::net::TcpListener; @@ -58,10 +57,13 @@ use tokio::sync::mpsc; use tokio::task::JoinHandle; mod chunked; - +mod request; #[cfg(unix)] mod sendfile; +use request::InnerRequest; +use request::Request; + pub struct FlashContext { next_server_id: u32, join_handles: HashMap<u32, JoinHandle<Result<(), AnyError>>>, @@ -70,22 +72,15 @@ pub struct FlashContext { pub struct ServerContext { _addr: SocketAddr, - tx: mpsc::Sender<NextRequest>, - rx: mpsc::Receiver<NextRequest>, - response: HashMap<u32, NextRequest>, + tx: mpsc::Sender<Request>, + rx: mpsc::Receiver<Request>, + requests: HashMap<u32, Request>, + next_token: u32, listening_rx: Option<mpsc::Receiver<()>>, close_tx: mpsc::Sender<()>, cancel_handle: Rc<CancelHandle>, } -struct InnerRequest { - _headers: Vec<httparse::Header<'static>>, - req: httparse::Request<'static, 'static>, - body_offset: usize, - body_len: usize, - buffer: Pin<Box<[u8]>>, -} - #[derive(Debug, PartialEq)] enum ParseStatus { None, @@ -99,7 +94,7 @@ enum InnerStream { Tls(Box<TlsTcpStream>), } -struct Stream { +pub struct Stream { inner: InnerStream, detached: bool, read_rx: Option<mpsc::Receiver<()>>, @@ -147,38 +142,6 @@ impl Read for Stream { } } -struct NextRequest { - // Pointer to stream owned by the server loop thread. - // - // Why not Arc<Mutex<Stream>>? Performance. The stream - // is never written to by the server loop thread. - // - // Dereferencing is safe until server thread finishes and - // op_flash_serve resolves or websocket upgrade is performed. - socket: *mut Stream, - inner: InnerRequest, - keep_alive: bool, - #[allow(dead_code)] - upgrade: bool, - content_read: usize, - content_length: Option<u64>, - remaining_chunk_size: Option<usize>, - te_chunked: bool, - expect_continue: bool, -} - -// SAFETY: Sent from server thread to JS thread. -// See comment above for `socket`. -unsafe impl Send for NextRequest {} - -impl NextRequest { - #[inline(always)] - pub fn socket<'a>(&self) -> &'a mut Stream { - // SAFETY: Dereferencing is safe until server thread detaches socket or finishes. - unsafe { &mut *self.socket } - } -} - #[op] fn op_flash_respond( op_state: &mut OpState, @@ -194,13 +157,13 @@ fn op_flash_respond( let mut close = false; let sock = match shutdown { true => { - let tx = ctx.response.remove(&token).unwrap(); + let tx = ctx.requests.remove(&token).unwrap(); close = !tx.keep_alive; tx.socket() } // In case of a websocket upgrade or streaming response. false => { - let tx = ctx.response.get(&token).unwrap(); + let tx = ctx.requests.get(&token).unwrap(); tx.socket() } }; @@ -263,7 +226,7 @@ async fn op_flash_write_resource( let op_state = &mut op_state.borrow_mut(); let flash_ctx = op_state.borrow_mut::<FlashContext>(); let ctx = flash_ctx.servers.get_mut(&server_id).unwrap(); - ctx.response.remove(&token).unwrap().socket() + ctx.requests.remove(&token).unwrap().socket() }; drop(op_state); @@ -344,13 +307,13 @@ fn flash_respond( let mut close = false; let sock = match shutdown { true => { - let tx = ctx.response.remove(&token).unwrap(); + let tx = ctx.requests.remove(&token).unwrap(); close = !tx.keep_alive; tx.socket() } // In case of a websocket upgrade or streaming response. false => { - let tx = ctx.response.get(&token).unwrap(); + let tx = ctx.requests.get(&token).unwrap(); tx.socket() } }; @@ -438,12 +401,12 @@ fn respond_chunked( ) { let sock = match shutdown { true => { - let tx = ctx.response.remove(&token).unwrap(); + let tx = ctx.requests.remove(&token).unwrap(); tx.socket() } // In case of a websocket upgrade or streaming response. false => { - let tx = ctx.response.get(&token).unwrap(); + let tx = ctx.requests.get(&token).unwrap(); tx.socket() } }; @@ -469,7 +432,7 @@ macro_rules! get_request { ($op_state: ident, $server_id: expr, $token: ident) => {{ let flash_ctx = $op_state.borrow_mut::<FlashContext>(); let ctx = flash_ctx.servers.get_mut(&$server_id).unwrap(); - ctx.response.get_mut(&$token).unwrap() + ctx.requests.get_mut(&$token).unwrap() }}; } @@ -487,8 +450,8 @@ pub enum Method { } #[inline] -fn get_method(req: &mut NextRequest) -> u32 { - let method = match req.inner.req.method.unwrap() { +fn get_method(req: &mut Request) -> u32 { + let method = match req.method() { "GET" => Method::GET, "POST" => Method::POST, "PUT" => Method::PUT, @@ -531,7 +494,7 @@ fn op_flash_path( let flash_ctx = op_state.borrow_mut::<FlashContext>(); let ctx = flash_ctx.servers.get_mut(&server_id).unwrap(); ctx - .response + .requests .get(&token) .unwrap() .inner @@ -543,12 +506,14 @@ fn op_flash_path( #[inline] fn next_request_sync(ctx: &mut ServerContext) -> u32 { - let mut tokens = 0; + let offset = ctx.next_token; + while let Ok(token) = ctx.rx.try_recv() { - ctx.response.insert(tokens, token); - tokens += 1; + ctx.requests.insert(ctx.next_token, token); + ctx.next_token += 1; } - tokens + + ctx.next_token - offset } pub struct NextRequestFast; @@ -597,7 +562,7 @@ unsafe fn op_flash_get_method_fast( let ptr = recv.get_aligned_pointer_from_internal_field(V8_WRAPPER_OBJECT_INDEX); let ctx = &mut *(ptr as *mut ServerContext); - let req = ctx.response.get_mut(&token).unwrap(); + let req = ctx.requests.get_mut(&token).unwrap(); get_method(req) } @@ -651,7 +616,7 @@ fn op_flash_make_request<'scope>( // SAFETY: This external is guaranteed to be a pointer to a ServerContext let ctx = unsafe { &mut *(external.value() as *mut ServerContext) }; let token = args.get(0).uint32_value(scope).unwrap(); - let req = ctx.response.get_mut(&token).unwrap(); + let req = ctx.requests.get_mut(&token).unwrap(); rv.set_uint32(get_method(req)); }, ) @@ -747,7 +712,7 @@ fn op_flash_make_request<'scope>( } #[inline] -fn has_body_stream(req: &NextRequest) -> bool { +fn has_body_stream(req: &Request) -> bool { let sock = req.socket(); sock.read_rx.is_some() } @@ -775,7 +740,7 @@ fn op_flash_headers( .get_mut(&server_id) .ok_or_else(|| type_error("server closed"))?; let inner_req = &ctx - .response + .requests .get(&token) .ok_or_else(|| type_error("request closed"))? .inner @@ -876,7 +841,7 @@ async fn op_flash_read_body( .as_mut() .unwrap() }; - let tx = ctx.response.get_mut(&token).unwrap(); + let tx = ctx.requests.get_mut(&token).unwrap(); if tx.te_chunked { let mut decoder = @@ -974,7 +939,7 @@ pub struct ListenOpts { } fn run_server( - tx: mpsc::Sender<NextRequest>, + tx: mpsc::Sender<Request>, listening_tx: mpsc::Sender<()>, mut close_rx: mpsc::Receiver<()>, addr: SocketAddr, @@ -1178,7 +1143,6 @@ fn run_server( // https://github.com/tiny-http/tiny-http/blob/master/src/client.rs#L177 // https://github.com/hyperium/hyper/blob/4545c3ef191ce9b5f5d250ee27c4c96f9b71d2c6/src/proto/h1/role.rs#L127 let mut keep_alive = inner_req.req.version.unwrap() == 1; - let mut upgrade = false; let mut expect_continue = false; let mut te = false; let mut te_chunked = false; @@ -1198,9 +1162,6 @@ fn run_server( keep_alive = connection_has(&value, "keep-alive"); } } - Ok(UPGRADE) => { - upgrade = inner_req.req.version.unwrap() == 1; - } Ok(TRANSFER_ENCODING) => { // https://tools.ietf.org/html/rfc7230#section-3.3.3 debug_assert!(inner_req.req.version.unwrap() == 1); @@ -1250,12 +1211,11 @@ fn run_server( continue 'events; } - tx.blocking_send(NextRequest { + tx.blocking_send(Request { socket: sock_ptr, // SAFETY: headers backing buffer outlives the mio event loop ('static) inner: inner_req, keep_alive, - upgrade, te_chunked, remaining_chunk_size: None, content_read: 0, @@ -1295,7 +1255,8 @@ where _addr: addr, tx, rx, - response: HashMap::with_capacity(1000), + requests: HashMap::with_capacity(1000), + next_token: 0, close_tx, listening_rx: Some(listening_rx), cancel_handle: CancelHandle::new_rc(), @@ -1371,18 +1332,14 @@ async fn op_flash_next_async( // is responsible for ensuring this is not called concurrently. let ctx = unsafe { &mut *ctx }; let cancel_handle = &ctx.cancel_handle; - let mut tokens = 0; - while let Ok(token) = ctx.rx.try_recv() { - ctx.response.insert(tokens, token); - tokens += 1; - } - if tokens == 0 { - if let Ok(Some(req)) = ctx.rx.recv().or_cancel(cancel_handle).await { - ctx.response.insert(tokens, req); - tokens += 1; - } + + if let Ok(Some(req)) = ctx.rx.recv().or_cancel(cancel_handle).await { + ctx.requests.insert(ctx.next_token, req); + ctx.next_token += 1; + return 1; } - tokens + + 0 } // Syncrhonous version of op_flash_next_async. Under heavy load, @@ -1455,7 +1412,7 @@ pub fn detach_socket( // * conversion from mio::net::TcpStream -> tokio::net::TcpStream. There is no public API so we // use raw fds. let tx = ctx - .response + .requests .remove(&token) .ok_or_else(|| type_error("request closed"))?; let stream = tx.socket(); |