summaryrefslogtreecommitdiff
path: root/ext/flash/lib.rs
diff options
context:
space:
mode:
Diffstat (limited to 'ext/flash/lib.rs')
-rw-r--r--ext/flash/lib.rs381
1 files changed, 125 insertions, 256 deletions
diff --git a/ext/flash/lib.rs b/ext/flash/lib.rs
index 8ed1baaad..201753bea 100644
--- a/ext/flash/lib.rs
+++ b/ext/flash/lib.rs
@@ -3,6 +3,8 @@
// False positive lint for explicit drops.
// https://github.com/rust-lang/rust-clippy/issues/6446
#![allow(clippy::await_holding_lock)]
+// https://github.com/rust-lang/rust-clippy/issues/6353
+#![allow(clippy::await_holding_refcell_ref)]
use deno_core::error::generic_error;
use deno_core::error::type_error;
@@ -29,7 +31,6 @@ use http::header::TRANSFER_ENCODING;
use http::HeaderValue;
use log::trace;
use mio::net::TcpListener;
-use mio::net::TcpStream;
use mio::Events;
use mio::Interest;
use mio::Poll;
@@ -45,7 +46,6 @@ use std::intrinsics::transmute;
use std::io::BufReader;
use std::io::Read;
use std::io::Write;
-use std::marker::PhantomPinned;
use std::mem::replace;
use std::net::SocketAddr;
use std::net::ToSocketAddrs;
@@ -62,9 +62,12 @@ mod chunked;
mod request;
#[cfg(unix)]
mod sendfile;
+mod socket;
use request::InnerRequest;
use request::Request;
+use socket::InnerStream;
+use socket::Stream;
pub struct FlashContext {
next_server_id: u32,
@@ -84,83 +87,82 @@ pub struct ServerContext {
}
#[derive(Debug, PartialEq)]
-enum ParseStatus {
+pub enum ParseStatus {
None,
Ongoing(usize),
}
-type TlsTcpStream = rustls::StreamOwned<rustls::ServerConnection, TcpStream>;
-
-enum InnerStream {
- Tcp(TcpStream),
- Tls(Box<TlsTcpStream>),
-}
-
-pub struct Stream {
- inner: InnerStream,
- detached: bool,
- read_rx: Option<mpsc::Receiver<()>>,
- read_tx: Option<mpsc::Sender<()>>,
- parse_done: ParseStatus,
- buffer: UnsafeCell<Vec<u8>>,
- read_lock: Arc<Mutex<()>>,
- _pin: PhantomPinned,
+#[op]
+fn op_flash_respond(
+ op_state: &mut OpState,
+ server_id: u32,
+ token: u32,
+ response: StringOrBuffer,
+ shutdown: bool,
+) -> u32 {
+ let flash_ctx = op_state.borrow_mut::<FlashContext>();
+ let ctx = flash_ctx.servers.get_mut(&server_id).unwrap();
+ flash_respond(ctx, token, shutdown, &response)
}
-impl Stream {
- pub fn detach_ownership(&mut self) {
- self.detached = true;
- }
+#[op]
+async fn op_flash_respond_async(
+ state: Rc<RefCell<OpState>>,
+ server_id: u32,
+ token: u32,
+ response: StringOrBuffer,
+ shutdown: bool,
+) -> Result<(), AnyError> {
+ trace!("op_flash_respond_async");
- fn reattach_ownership(&mut self) {
- self.detached = false;
- }
-}
+ let mut close = false;
+ let sock = {
+ let mut op_state = state.borrow_mut();
+ let flash_ctx = op_state.borrow_mut::<FlashContext>();
+ let ctx = flash_ctx.servers.get_mut(&server_id).unwrap();
-impl Write for Stream {
- #[inline]
- fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
- match self.inner {
- InnerStream::Tcp(ref mut stream) => stream.write(buf),
- InnerStream::Tls(ref mut stream) => stream.write(buf),
- }
- }
- #[inline]
- fn flush(&mut self) -> std::io::Result<()> {
- match self.inner {
- InnerStream::Tcp(ref mut stream) => stream.flush(),
- InnerStream::Tls(ref mut stream) => stream.flush(),
+ match shutdown {
+ true => {
+ let tx = ctx.requests.remove(&token).unwrap();
+ close = !tx.keep_alive;
+ tx.socket()
+ }
+ // In case of a websocket upgrade or streaming response.
+ false => {
+ let tx = ctx.requests.get(&token).unwrap();
+ tx.socket()
+ }
}
- }
-}
+ };
-impl Read for Stream {
- #[inline]
- fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
- match self.inner {
- InnerStream::Tcp(ref mut stream) => stream.read(buf),
- InnerStream::Tls(ref mut stream) => stream.read(buf),
- }
+ sock
+ .with_async_stream(|stream| {
+ Box::pin(async move {
+ Ok(tokio::io::AsyncWriteExt::write(stream, &response).await?)
+ })
+ })
+ .await?;
+ // server is done writing and request doesn't want to kept alive.
+ if shutdown && close {
+ sock.shutdown();
}
+ Ok(())
}
#[op]
-fn op_flash_respond(
- op_state: &mut OpState,
+async fn op_flash_respond_chuncked(
+ op_state: Rc<RefCell<OpState>>,
server_id: u32,
token: u32,
- response: StringOrBuffer,
- maybe_body: Option<ZeroCopyBuf>,
+ response: Option<ZeroCopyBuf>,
shutdown: bool,
-) {
+) -> Result<(), AnyError> {
+ let mut op_state = op_state.borrow_mut();
let flash_ctx = op_state.borrow_mut::<FlashContext>();
let ctx = flash_ctx.servers.get_mut(&server_id).unwrap();
-
- let mut close = false;
let sock = match shutdown {
true => {
let tx = ctx.requests.remove(&token).unwrap();
- close = !tx.keep_alive;
tx.socket()
}
// In case of a websocket upgrade or streaming response.
@@ -170,49 +172,29 @@ fn op_flash_respond(
}
};
- sock.read_tx.take();
- sock.read_rx.take();
-
- let _ = sock.write(&response);
- if let Some(response) = maybe_body {
- let _ = sock.write(format!("{:x}", response.len()).as_bytes());
- let _ = sock.write(b"\r\n");
- let _ = sock.write(&response);
- let _ = sock.write(b"\r\n");
- }
+ drop(op_state);
+ sock
+ .with_async_stream(|stream| {
+ Box::pin(async move {
+ use tokio::io::AsyncWriteExt;
+ if let Some(response) = response {
+ stream
+ .write_all(format!("{:x}\r\n", response.len()).as_bytes())
+ .await?;
+ stream.write_all(&response).await?;
+ stream.write_all(b"\r\n").await?;
+ }
- // server is done writing and request doesn't want to kept alive.
- if shutdown && close {
- match &mut sock.inner {
- InnerStream::Tcp(stream) => {
- // Typically shutdown shouldn't fail.
- let _ = stream.shutdown(std::net::Shutdown::Both);
- }
- InnerStream::Tls(stream) => {
- let _ = stream.sock.shutdown(std::net::Shutdown::Both);
- }
- }
- }
-}
+ // The last chunk
+ if shutdown {
+ stream.write_all(b"0\r\n\r\n").await?;
+ }
-#[op]
-fn op_flash_respond_chuncked(
- op_state: &mut OpState,
- server_id: u32,
- token: u32,
- response: Option<ZeroCopyBuf>,
- shutdown: bool,
-) {
- let flash_ctx = op_state.borrow_mut::<FlashContext>();
- let ctx = flash_ctx.servers.get_mut(&server_id).unwrap();
- match response {
- Some(response) => {
- respond_chunked(ctx, token, shutdown, Some(&response));
- }
- None => {
- respond_chunked(ctx, token, shutdown, None);
- }
- }
+ Ok(())
+ })
+ })
+ .await?;
+ Ok(())
}
#[op]
@@ -258,24 +240,35 @@ async fn op_flash_write_resource(
}
}
- let _ = sock.write(b"Transfer-Encoding: chunked\r\n\r\n");
- loop {
- let vec = vec![0u8; 64 * 1024]; // 64KB
- let buf = ZeroCopyBuf::new_temp(vec);
- let (nread, buf) = resource.clone().read_return(buf).await?;
- if nread == 0 {
- let _ = sock.write(b"0\r\n\r\n");
- break;
- }
- let response = &buf[..nread];
-
- let _ = sock.write(format!("{:x}", response.len()).as_bytes());
- let _ = sock.write(b"\r\n");
- let _ = sock.write(response);
- let _ = sock.write(b"\r\n");
- }
+ sock
+ .with_async_stream(|stream| {
+ Box::pin(async move {
+ use tokio::io::AsyncWriteExt;
+ stream
+ .write_all(b"Transfer-Encoding: chunked\r\n\r\n")
+ .await?;
+ loop {
+ let vec = vec![0u8; 64 * 1024]; // 64KB
+ let buf = ZeroCopyBuf::new_temp(vec);
+ let (nread, buf) = resource.clone().read_return(buf).await?;
+ if nread == 0 {
+ stream.write_all(b"0\r\n\r\n").await?;
+ break;
+ }
- resource.close();
+ let response = &buf[..nread];
+ // TODO(@littledivy): use vectored writes.
+ stream
+ .write_all(format!("{:x}\r\n", response.len()).as_bytes())
+ .await?;
+ stream.write_all(response).await?;
+ stream.write_all(b"\r\n").await?;
+ }
+ resource.close();
+ Ok(())
+ })
+ })
+ .await?;
Ok(())
}
@@ -296,7 +289,7 @@ impl fast_api::FastFunction for RespondFast {
}
fn return_type(&self) -> fast_api::CType {
- fast_api::CType::Void
+ fast_api::CType::Uint32
}
}
@@ -305,128 +298,43 @@ fn flash_respond(
token: u32,
shutdown: bool,
response: &[u8],
-) {
- let mut close = false;
- let sock = match shutdown {
- true => {
- let tx = ctx.requests.remove(&token).unwrap();
- close = !tx.keep_alive;
- tx.socket()
- }
- // In case of a websocket upgrade or streaming response.
- false => {
- let tx = ctx.requests.get(&token).unwrap();
- tx.socket()
- }
- };
+) -> u32 {
+ let tx = ctx.requests.get(&token).unwrap();
+ let sock = tx.socket();
sock.read_tx.take();
sock.read_rx.take();
- let _ = sock.write(response);
- // server is done writing and request doesn't want to kept alive.
- if shutdown && close {
- match &mut sock.inner {
- InnerStream::Tcp(stream) => {
- // Typically shutdown shouldn't fail.
- let _ = stream.shutdown(std::net::Shutdown::Both);
- }
- InnerStream::Tls(stream) => {
- let _ = stream.sock.shutdown(std::net::Shutdown::Both);
- }
- }
- }
-}
-
-unsafe fn op_flash_respond_fast(
- recv: v8::Local<v8::Object>,
- token: u32,
- response: *const fast_api::FastApiTypedArray<u8>,
- shutdown: bool,
-) {
- let ptr =
- recv.get_aligned_pointer_from_internal_field(V8_WRAPPER_OBJECT_INDEX);
- let ctx = &mut *(ptr as *mut ServerContext);
-
- let response = &*response;
- if let Some(response) = response.get_storage_if_aligned() {
- flash_respond(ctx, token, shutdown, response);
- } else {
- todo!();
- }
-}
-
-pub struct RespondChunkedFast;
-
-impl fast_api::FastFunction for RespondChunkedFast {
- fn function(&self) -> *const c_void {
- op_flash_respond_chunked_fast as *const c_void
- }
+ let nwritten = sock.try_write(response);
- fn args(&self) -> &'static [fast_api::Type] {
- &[
- fast_api::Type::V8Value,
- fast_api::Type::Uint32,
- fast_api::Type::TypedArray(fast_api::CType::Uint8),
- fast_api::Type::Bool,
- ]
+ if shutdown && nwritten == response.len() {
+ if !tx.keep_alive {
+ sock.shutdown();
+ }
+ ctx.requests.remove(&token).unwrap();
}
- fn return_type(&self) -> fast_api::CType {
- fast_api::CType::Void
- }
+ nwritten as u32
}
-unsafe fn op_flash_respond_chunked_fast(
+unsafe fn op_flash_respond_fast(
recv: v8::Local<v8::Object>,
token: u32,
response: *const fast_api::FastApiTypedArray<u8>,
shutdown: bool,
-) {
+) -> u32 {
let ptr =
recv.get_aligned_pointer_from_internal_field(V8_WRAPPER_OBJECT_INDEX);
let ctx = &mut *(ptr as *mut ServerContext);
let response = &*response;
if let Some(response) = response.get_storage_if_aligned() {
- respond_chunked(ctx, token, shutdown, Some(response));
+ flash_respond(ctx, token, shutdown, response)
} else {
todo!();
}
}
-fn respond_chunked(
- ctx: &mut ServerContext,
- token: u32,
- shutdown: bool,
- response: Option<&[u8]>,
-) {
- let sock = match shutdown {
- true => {
- let tx = ctx.requests.remove(&token).unwrap();
- tx.socket()
- }
- // In case of a websocket upgrade or streaming response.
- false => {
- let tx = ctx.requests.get(&token).unwrap();
- tx.socket()
- }
- };
-
- if let Some(response) = response {
- let _ = sock.write(format!("{:x}", response.len()).as_bytes());
- let _ = sock.write(b"\r\n");
- let _ = sock.write(response);
- let _ = sock.write(b"\r\n");
- }
-
- // The last chunk
- if shutdown {
- let _ = sock.write(b"0\r\n\r\n");
- }
- sock.reattach_ownership();
-}
-
macro_rules! get_request {
($op_state: ident, $token: ident) => {
get_request!($op_state, 0, $token)
@@ -631,51 +539,12 @@ fn op_flash_make_request<'scope>(
obj.set(scope, key.into(), func).unwrap();
}
- // respondChunked
- {
- let builder = v8::FunctionTemplate::builder(
- |scope: &mut v8::HandleScope,
- args: v8::FunctionCallbackArguments,
- _: v8::ReturnValue| {
- let external: v8::Local<v8::External> =
- args.data().unwrap().try_into().unwrap();
- // SAFETY: This external is guaranteed to be a pointer to a ServerContext
- let ctx = unsafe { &mut *(external.value() as *mut ServerContext) };
-
- let token = args.get(0).uint32_value(scope).unwrap();
-
- let response: v8::Local<v8::ArrayBufferView> =
- args.get(1).try_into().unwrap();
- let ab = response.buffer(scope).unwrap();
- let store = ab.get_backing_store();
- let (offset, len) = (response.byte_offset(), response.byte_length());
- // SAFETY: v8::SharedRef<v8::BackingStore> is similar to Arc<[u8]>,
- // it points to a fixed continuous slice of bytes on the heap.
- // We assume it's initialized and thus safe to read (though may not contain meaningful data)
- let response = unsafe {
- &*(&store[offset..offset + len] as *const _ as *const [u8])
- };
-
- let shutdown = args.get(2).boolean_value(scope);
-
- respond_chunked(ctx, token, shutdown, Some(response));
- },
- )
- .data(v8::External::new(scope, ctx as *mut _).into());
-
- let func = builder.build_fast(scope, &RespondChunkedFast, None);
- let func: v8::Local<v8::Value> = func.get_function(scope).unwrap().into();
-
- let key = v8::String::new(scope, "respondChunked").unwrap();
- obj.set(scope, key.into(), func).unwrap();
- }
-
// respond
{
let builder = v8::FunctionTemplate::builder(
|scope: &mut v8::HandleScope,
args: v8::FunctionCallbackArguments,
- _: v8::ReturnValue| {
+ mut rv: v8::ReturnValue| {
let external: v8::Local<v8::External> =
args.data().unwrap().try_into().unwrap();
// SAFETY: This external is guaranteed to be a pointer to a ServerContext
@@ -697,7 +566,7 @@ fn op_flash_make_request<'scope>(
let shutdown = args.get(2).boolean_value(scope);
- flash_respond(ctx, token, shutdown, response);
+ rv.set_uint32(flash_respond(ctx, token, shutdown, response));
},
)
.data(v8::External::new(scope, ctx as *mut _).into());
@@ -1024,7 +893,6 @@ fn run_server(
read_lock: Arc::new(Mutex::new(())),
parse_done: ParseStatus::None,
buffer: UnsafeCell::new(vec![0_u8; 1024]),
- _pin: PhantomPinned,
});
trace!("New connection: {}", token.0);
@@ -1521,6 +1389,7 @@ pub fn init<P: FlashPermissions + 'static>(unstable: bool) -> Extension {
.ops(vec![
op_flash_serve::decl::<P>(),
op_flash_respond::decl(),
+ op_flash_respond_async::decl(),
op_flash_respond_chuncked::decl(),
op_flash_method::decl(),
op_flash_path::decl(),