path: root/ext/flash/lib.rs
author    Divy Srivastava <dj.srivastava23@gmail.com>  2022-08-18 17:35:02 +0530
committer GitHub <noreply@github.com>                  2022-08-18 17:35:02 +0530
commit    cd21cff29942f24ba7d38287186cce64d0e84e56 (patch)
tree      e663eff884526ee762ae9141a3cf5a0f6967a84e /ext/flash/lib.rs
parent    0b0843e4a54d7c1ddf293ac1ccee2479b69a5ba9 (diff)
feat(ext/flash): An optimized http/1.1 server (#15405)
Co-authored-by: Bartek IwaƄczuk <biwanczuk@gmail.com>
Co-authored-by: Ben Noordhuis <info@bnoordhuis.nl>
Co-authored-by: crowlkats <crowlkats@toaxl.com>
Co-authored-by: Ryan Dahl <ry@tinyclouds.org>
Diffstat (limited to 'ext/flash/lib.rs')
-rw-r--r--  ext/flash/lib.rs | 1567
1 file changed, 1567 insertions, 0 deletions
diff --git a/ext/flash/lib.rs b/ext/flash/lib.rs
new file mode 100644
index 000000000..2c0cc548a
--- /dev/null
+++ b/ext/flash/lib.rs
@@ -0,0 +1,1567 @@
+// Copyright 2018-2022 the Deno authors. All rights reserved. MIT license.
+
+// False positive lint for explicit drops.
+// https://github.com/rust-lang/rust-clippy/issues/6446
+#![allow(clippy::await_holding_lock)]
+
+use deno_core::error::type_error;
+use deno_core::error::AnyError;
+use deno_core::op;
+use deno_core::serde_v8;
+use deno_core::v8;
+use deno_core::v8::fast_api;
+use deno_core::ByteString;
+use deno_core::CancelFuture;
+use deno_core::CancelHandle;
+use deno_core::Extension;
+use deno_core::OpState;
+use deno_core::StringOrBuffer;
+use deno_core::ZeroCopyBuf;
+use deno_core::V8_WRAPPER_OBJECT_INDEX;
+use deno_tls::load_certs;
+use deno_tls::load_private_keys;
+use http::header::HeaderName;
+use http::header::CONNECTION;
+use http::header::CONTENT_LENGTH;
+use http::header::EXPECT;
+use http::header::TRANSFER_ENCODING;
+use http::header::UPGRADE;
+use http::HeaderValue;
+use log::trace;
+use mio::net::TcpListener;
+use mio::net::TcpStream;
+use mio::Events;
+use mio::Interest;
+use mio::Poll;
+use mio::Token;
+use serde::Deserialize;
+use serde::Serialize;
+use std::cell::RefCell;
+use std::cell::UnsafeCell;
+use std::collections::HashMap;
+use std::ffi::c_void;
+use std::future::Future;
+use std::intrinsics::transmute;
+use std::io::BufReader;
+use std::io::Read;
+use std::io::Write;
+use std::marker::PhantomPinned;
+use std::mem::replace;
+use std::net::SocketAddr;
+use std::pin::Pin;
+use std::rc::Rc;
+use std::sync::Arc;
+use std::sync::Mutex;
+use std::task::Context;
+use std::time::Duration;
+use tokio::sync::mpsc;
+use tokio::task::JoinHandle;
+
+mod chunked;
+
+#[cfg(unix)]
+mod sendfile;
+
+pub struct FlashContext {
+ next_server_id: u32,
+ join_handles: HashMap<u32, JoinHandle<Result<(), AnyError>>>,
+ pub servers: HashMap<u32, ServerContext>,
+}
+
+pub struct ServerContext {
+ _addr: SocketAddr,
+ tx: mpsc::Sender<NextRequest>,
+ rx: mpsc::Receiver<NextRequest>,
+ response: HashMap<u32, NextRequest>,
+ listening_rx: Option<mpsc::Receiver<()>>,
+ close_tx: mpsc::Sender<()>,
+ cancel_handle: Rc<CancelHandle>,
+}
+
+struct InnerRequest {
+ _headers: Vec<httparse::Header<'static>>,
+ req: httparse::Request<'static, 'static>,
+ body_offset: usize,
+ body_len: usize,
+ buffer: Pin<Box<[u8]>>,
+}
+
+#[derive(Debug, PartialEq)]
+enum ParseStatus {
+ None,
+ Ongoing(usize),
+}
+
+type TlsTcpStream = rustls::StreamOwned<rustls::ServerConnection, TcpStream>;
+
+enum InnerStream {
+ Tcp(TcpStream),
+ Tls(Box<TlsTcpStream>),
+}
+
+struct Stream {
+ inner: InnerStream,
+ detached: bool,
+ read_rx: Option<mpsc::Receiver<()>>,
+ read_tx: Option<mpsc::Sender<()>>,
+ parse_done: ParseStatus,
+ buffer: UnsafeCell<Vec<u8>>,
+ read_lock: Arc<Mutex<()>>,
+ _pin: PhantomPinned,
+}
+
+impl Stream {
+ pub fn detach_ownership(&mut self) {
+ self.detached = true;
+ }
+
+ fn reattach_ownership(&mut self) {
+ self.detached = false;
+ }
+}
+
+impl Write for Stream {
+ #[inline]
+ fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
+ match self.inner {
+ InnerStream::Tcp(ref mut stream) => stream.write(buf),
+ InnerStream::Tls(ref mut stream) => stream.write(buf),
+ }
+ }
+ #[inline]
+ fn flush(&mut self) -> std::io::Result<()> {
+ match self.inner {
+ InnerStream::Tcp(ref mut stream) => stream.flush(),
+ InnerStream::Tls(ref mut stream) => stream.flush(),
+ }
+ }
+}
+
+impl Read for Stream {
+ #[inline]
+ fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
+ match self.inner {
+ InnerStream::Tcp(ref mut stream) => stream.read(buf),
+ InnerStream::Tls(ref mut stream) => stream.read(buf),
+ }
+ }
+}
+
+struct NextRequest {
+ // Pointer to stream owned by the server loop thread.
+ //
+ // Why not Arc<Mutex<Stream>>? Performance. The stream
+ // is never written to by the server loop thread.
+ //
+ // Dereferencing is safe until the server thread finishes and
+ // op_flash_serve resolves, or a websocket upgrade is performed.
+ socket: *mut Stream,
+ inner: InnerRequest,
+ keep_alive: bool,
+ #[allow(dead_code)]
+ upgrade: bool,
+ content_read: usize,
+ content_length: Option<u64>,
+ remaining_chunk_size: Option<usize>,
+ te_chunked: bool,
+ expect_continue: bool,
+}
+
+// SAFETY: Sent from server thread to JS thread.
+// See comment above for `socket`.
+unsafe impl Send for NextRequest {}
+
+impl NextRequest {
+ #[inline(always)]
+ pub fn socket<'a>(&self) -> &'a mut Stream {
+ // SAFETY: Dereferencing is safe until the server thread detaches the socket or finishes.
+ unsafe { &mut *self.socket }
+ }
+}
+
+#[op]
+fn op_flash_respond(
+ op_state: &mut OpState,
+ server_id: u32,
+ token: u32,
+ response: StringOrBuffer,
+ maybe_body: Option<ZeroCopyBuf>,
+ shutdown: bool,
+) {
+ let flash_ctx = op_state.borrow_mut::<FlashContext>();
+ let ctx = flash_ctx.servers.get_mut(&server_id).unwrap();
+
+ let mut close = false;
+ let sock = match shutdown {
+ true => {
+ let tx = ctx.response.remove(&token).unwrap();
+ close = !tx.keep_alive;
+ tx.socket()
+ }
+ // In case of a websocket upgrade or streaming response.
+ false => {
+ let tx = ctx.response.get(&token).unwrap();
+ tx.socket()
+ }
+ };
+
+ sock.read_tx.take();
+ sock.read_rx.take();
+
+ let _ = sock.write(&response);
+ if let Some(response) = maybe_body {
+ let _ = sock.write(format!("{:x}", response.len()).as_bytes());
+ let _ = sock.write(b"\r\n");
+ let _ = sock.write(&response);
+ let _ = sock.write(b"\r\n");
+ }
+
+ // The server is done writing and the request is not keep-alive.
+ if shutdown && close {
+ match &mut sock.inner {
+ InnerStream::Tcp(stream) => {
+ // Typically shutdown shouldn't fail.
+ let _ = stream.shutdown(std::net::Shutdown::Both);
+ }
+ InnerStream::Tls(stream) => {
+ let _ = stream.sock.shutdown(std::net::Shutdown::Both);
+ }
+ }
+ }
+}
+
+#[op]
+fn op_flash_respond_chuncked(
+ op_state: &mut OpState,
+ server_id: u32,
+ token: u32,
+ response: Option<ZeroCopyBuf>,
+ shutdown: bool,
+) {
+ let flash_ctx = op_state.borrow_mut::<FlashContext>();
+ let ctx = flash_ctx.servers.get_mut(&server_id).unwrap();
+ match response {
+ Some(response) => {
+ respond_chunked(ctx, token, shutdown, Some(&response));
+ }
+ None => {
+ respond_chunked(ctx, token, shutdown, None);
+ }
+ }
+}
+
+#[op]
+async fn op_flash_write_resource(
+ op_state: Rc<RefCell<OpState>>,
+ response: StringOrBuffer,
+ server_id: u32,
+ token: u32,
+ resource_id: deno_core::ResourceId,
+) -> Result<(), AnyError> {
+ let resource = op_state.borrow_mut().resource_table.take_any(resource_id)?;
+ let sock = {
+ let op_state = &mut op_state.borrow_mut();
+ let flash_ctx = op_state.borrow_mut::<FlashContext>();
+ let ctx = flash_ctx.servers.get_mut(&server_id).unwrap();
+ ctx.response.remove(&token).unwrap().socket()
+ };
+
+ drop(op_state);
+ let _ = sock.write(&response);
+
+ #[cfg(unix)]
+ {
+ use std::os::unix::io::AsRawFd;
+ if let InnerStream::Tcp(stream_handle) = &sock.inner {
+ let stream_handle = stream_handle.as_raw_fd();
+ if let Some(fd) = resource.clone().backing_fd() {
+ // SAFETY: all-zero byte-pattern is a valid value for libc::stat.
+ let mut stat: libc::stat = unsafe { std::mem::zeroed() };
+ // SAFETY: call to libc::fstat.
+ if unsafe { libc::fstat(fd, &mut stat) } >= 0 {
+ let _ = sock.write(
+ format!("Content-Length: {}\r\n\r\n", stat.st_size).as_bytes(),
+ );
+ let tx = sendfile::SendFile {
+ io: (fd, stream_handle),
+ written: 0,
+ };
+ tx.await?;
+ return Ok(());
+ }
+ }
+ }
+ }
+
+ let _ = sock.write(b"Transfer-Encoding: chunked\r\n\r\n");
+ loop {
+ let vec = vec![0u8; 64 * 1024]; // 64KB
+ let buf = ZeroCopyBuf::new_temp(vec);
+ let (nread, buf) = resource.clone().read_return(buf).await?;
+ if nread == 0 {
+ let _ = sock.write(b"0\r\n\r\n");
+ break;
+ }
+ let response = &buf[..nread];
+
+ let _ = sock.write(format!("{:x}", response.len()).as_bytes());
+ let _ = sock.write(b"\r\n");
+ let _ = sock.write(response);
+ let _ = sock.write(b"\r\n");
+ }
+
+ resource.close();
+ Ok(())
+}
+
+pub struct RespondFast;
+
+impl fast_api::FastFunction for RespondFast {
+ fn function(&self) -> *const c_void {
+ op_flash_respond_fast as *const c_void
+ }
+
+ fn args(&self) -> &'static [fast_api::Type] {
+ &[
+ fast_api::Type::V8Value,
+ fast_api::Type::Uint32,
+ fast_api::Type::TypedArray(fast_api::CType::Uint8),
+ fast_api::Type::Bool,
+ ]
+ }
+
+ fn return_type(&self) -> fast_api::CType {
+ fast_api::CType::Void
+ }
+}
+
+fn flash_respond(
+ ctx: &mut ServerContext,
+ token: u32,
+ shutdown: bool,
+ response: &[u8],
+) {
+ let mut close = false;
+ let sock = match shutdown {
+ true => {
+ let tx = ctx.response.remove(&token).unwrap();
+ close = !tx.keep_alive;
+ tx.socket()
+ }
+ // In case of a websocket upgrade or streaming response.
+ false => {
+ let tx = ctx.response.get(&token).unwrap();
+ tx.socket()
+ }
+ };
+
+ sock.read_tx.take();
+ sock.read_rx.take();
+
+ let _ = sock.write(response);
+ // The server is done writing and the request is not keep-alive.
+ if shutdown && close {
+ match &mut sock.inner {
+ InnerStream::Tcp(stream) => {
+ // Typically shutdown shouldn't fail.
+ let _ = stream.shutdown(std::net::Shutdown::Both);
+ }
+ InnerStream::Tls(stream) => {
+ let _ = stream.sock.shutdown(std::net::Shutdown::Both);
+ }
+ }
+ }
+}
+
+unsafe fn op_flash_respond_fast(
+ recv: v8::Local<v8::Object>,
+ token: u32,
+ response: *const fast_api::FastApiTypedArray<u8>,
+ shutdown: bool,
+) {
+ let ptr =
+ recv.get_aligned_pointer_from_internal_field(V8_WRAPPER_OBJECT_INDEX);
+ let ctx = &mut *(ptr as *mut ServerContext);
+
+ let response = &*response;
+ if let Some(response) = response.get_storage_if_aligned() {
+ flash_respond(ctx, token, shutdown, response);
+ } else {
+ todo!();
+ }
+}
+
+pub struct RespondChunkedFast;
+
+impl fast_api::FastFunction for RespondChunkedFast {
+ fn function(&self) -> *const c_void {
+ op_flash_respond_chunked_fast as *const c_void
+ }
+
+ fn args(&self) -> &'static [fast_api::Type] {
+ &[
+ fast_api::Type::V8Value,
+ fast_api::Type::Uint32,
+ fast_api::Type::TypedArray(fast_api::CType::Uint8),
+ fast_api::Type::Bool,
+ ]
+ }
+
+ fn return_type(&self) -> fast_api::CType {
+ fast_api::CType::Void
+ }
+}
+
+unsafe fn op_flash_respond_chunked_fast(
+ recv: v8::Local<v8::Object>,
+ token: u32,
+ response: *const fast_api::FastApiTypedArray<u8>,
+ shutdown: bool,
+) {
+ let ptr =
+ recv.get_aligned_pointer_from_internal_field(V8_WRAPPER_OBJECT_INDEX);
+ let ctx = &mut *(ptr as *mut ServerContext);
+
+ let response = &*response;
+ if let Some(response) = response.get_storage_if_aligned() {
+ respond_chunked(ctx, token, shutdown, Some(response));
+ } else {
+ todo!();
+ }
+}
+
+fn respond_chunked(
+ ctx: &mut ServerContext,
+ token: u32,
+ shutdown: bool,
+ response: Option<&[u8]>,
+) {
+ let sock = match shutdown {
+ true => {
+ let tx = ctx.response.remove(&token).unwrap();
+ tx.socket()
+ }
+ // In case of a websocket upgrade or streaming response.
+ false => {
+ let tx = ctx.response.get(&token).unwrap();
+ tx.socket()
+ }
+ };
+
+ if let Some(response) = response {
+ let _ = sock.write(format!("{:x}", response.len()).as_bytes());
+ let _ = sock.write(b"\r\n");
+ let _ = sock.write(response);
+ let _ = sock.write(b"\r\n");
+ }
+
+ // The last chunk
+ if shutdown {
+ let _ = sock.write(b"0\r\n\r\n");
+ }
+ sock.reattach_ownership();
+}
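+
+// Illustration (not part of the op logic): for a body of b"hello", the
+// framing written above is the RFC 7230 chunked coding
+//
+//   5\r\n
+//   hello\r\n
+//
+// and the terminating chunk written on shutdown is
+//
+//   0\r\n\r\n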
+
+macro_rules! get_request {
+ ($op_state: ident, $token: ident) => {
+ get_request!($op_state, 0, $token)
+ };
+ ($op_state: ident, $server_id: expr, $token: ident) => {{
+ let flash_ctx = $op_state.borrow_mut::<FlashContext>();
+ let ctx = flash_ctx.servers.get_mut(&$server_id).unwrap();
+ ctx.response.get_mut(&$token).unwrap()
+ }};
+}
+
+#[repr(u32)]
+pub enum Method {
+ GET = 0,
+ HEAD,
+ CONNECT,
+ PUT,
+ DELETE,
+ OPTIONS,
+ TRACE,
+ POST,
+ PATCH,
+}
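+
+// Note: the discriminants above (GET = 0 through PATCH = 8) are the u32
+// values returned by `op_flash_method` and the `getMethod` fast call;
+// presumably the JS side maps them back to method strings.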
+
+#[inline]
+fn get_method(req: &mut NextRequest) -> u32 {
+ let method = match req.inner.req.method.unwrap() {
+ "GET" => Method::GET,
+ "POST" => Method::POST,
+ "PUT" => Method::PUT,
+ "DELETE" => Method::DELETE,
+ "OPTIONS" => Method::OPTIONS,
+ "HEAD" => Method::HEAD,
+ "PATCH" => Method::PATCH,
+ "TRACE" => Method::TRACE,
+ "CONNECT" => Method::CONNECT,
+ _ => Method::GET,
+ };
+ method as u32
+}
+
+#[op]
+fn op_flash_method(state: &mut OpState, server_id: u32, token: u32) -> u32 {
+ let req = get_request!(state, server_id, token);
+ get_method(req)
+}
+
+#[op]
+async fn op_flash_close_server(state: Rc<RefCell<OpState>>, server_id: u32) {
+ let close_tx = {
+ let mut op_state = state.borrow_mut();
+ let flash_ctx = op_state.borrow_mut::<FlashContext>();
+ let ctx = flash_ctx.servers.get_mut(&server_id).unwrap();
+ ctx.cancel_handle.cancel();
+ ctx.close_tx.clone()
+ };
+ let _ = close_tx.send(()).await;
+}
+
+#[op]
+fn op_flash_path(
+ state: Rc<RefCell<OpState>>,
+ server_id: u32,
+ token: u32,
+) -> String {
+ let mut op_state = state.borrow_mut();
+ let flash_ctx = op_state.borrow_mut::<FlashContext>();
+ let ctx = flash_ctx.servers.get_mut(&server_id).unwrap();
+ ctx
+ .response
+ .get(&token)
+ .unwrap()
+ .inner
+ .req
+ .path
+ .unwrap()
+ .to_string()
+}
+
+#[inline]
+fn next_request_sync(ctx: &mut ServerContext) -> u32 {
+ let mut tokens = 0;
+ while let Ok(token) = ctx.rx.try_recv() {
+ ctx.response.insert(tokens, token);
+ tokens += 1;
+ }
+ tokens
+}
+
+pub struct NextRequestFast;
+
+impl fast_api::FastFunction for NextRequestFast {
+ fn function(&self) -> *const c_void {
+ op_flash_next_fast as *const c_void
+ }
+
+ fn args(&self) -> &'static [fast_api::Type] {
+ &[fast_api::Type::V8Value]
+ }
+
+ fn return_type(&self) -> fast_api::CType {
+ fast_api::CType::Uint32
+ }
+}
+
+unsafe fn op_flash_next_fast(recv: v8::Local<v8::Object>) -> u32 {
+ let ptr =
+ recv.get_aligned_pointer_from_internal_field(V8_WRAPPER_OBJECT_INDEX);
+ let ctx = &mut *(ptr as *mut ServerContext);
+ next_request_sync(ctx)
+}
+
+pub struct GetMethodFast;
+
+impl fast_api::FastFunction for GetMethodFast {
+ fn function(&self) -> *const c_void {
+ op_flash_get_method_fast as *const c_void
+ }
+
+ fn args(&self) -> &'static [fast_api::Type] {
+ &[fast_api::Type::V8Value, fast_api::Type::Uint32]
+ }
+
+ fn return_type(&self) -> fast_api::CType {
+ fast_api::CType::Uint32
+ }
+}
+
+unsafe fn op_flash_get_method_fast(
+ recv: v8::Local<v8::Object>,
+ token: u32,
+) -> u32 {
+ let ptr =
+ recv.get_aligned_pointer_from_internal_field(V8_WRAPPER_OBJECT_INDEX);
+ let ctx = &mut *(ptr as *mut ServerContext);
+ let req = ctx.response.get_mut(&token).unwrap();
+ get_method(req)
+}
+
+// Fast calls
+#[op(v8)]
+fn op_flash_make_request<'scope>(
+ scope: &mut v8::HandleScope<'scope>,
+ state: &mut OpState,
+) -> serde_v8::Value<'scope> {
+ let object_template = v8::ObjectTemplate::new(scope);
+ assert!(object_template
+ .set_internal_field_count((V8_WRAPPER_OBJECT_INDEX + 1) as usize));
+ let obj = object_template.new_instance(scope).unwrap();
+ let ctx = {
+ let flash_ctx = state.borrow_mut::<FlashContext>();
+ let ctx = flash_ctx.servers.get_mut(&0).unwrap();
+ ctx as *mut ServerContext
+ };
+ obj.set_aligned_pointer_in_internal_field(V8_WRAPPER_OBJECT_INDEX, ctx as _);
+
+ // nextRequest
+ {
+ let builder = v8::FunctionTemplate::builder(
+ |_: &mut v8::HandleScope,
+ args: v8::FunctionCallbackArguments,
+ mut rv: v8::ReturnValue| {
+ let external: v8::Local<v8::External> =
+ args.data().unwrap().try_into().unwrap();
+ // SAFETY: This external is guaranteed to be a pointer to a ServerContext
+ let ctx = unsafe { &mut *(external.value() as *mut ServerContext) };
+ rv.set_uint32(next_request_sync(ctx));
+ },
+ )
+ .data(v8::External::new(scope, ctx as *mut _).into());
+
+ let func = builder.build_fast(scope, &NextRequestFast, None);
+ let func: v8::Local<v8::Value> = func.get_function(scope).unwrap().into();
+
+ let key = v8::String::new(scope, "nextRequest").unwrap();
+ obj.set(scope, key.into(), func).unwrap();
+ }
+
+ // getMethod
+ {
+ let builder = v8::FunctionTemplate::builder(
+ |scope: &mut v8::HandleScope,
+ args: v8::FunctionCallbackArguments,
+ mut rv: v8::ReturnValue| {
+ let external: v8::Local<v8::External> =
+ args.data().unwrap().try_into().unwrap();
+ // SAFETY: This external is guaranteed to be a pointer to a ServerContext
+ let ctx = unsafe { &mut *(external.value() as *mut ServerContext) };
+ let token = args.get(0).uint32_value(scope).unwrap();
+ let req = ctx.response.get_mut(&token).unwrap();
+ rv.set_uint32(get_method(req));
+ },
+ )
+ .data(v8::External::new(scope, ctx as *mut _).into());
+
+ let func = builder.build_fast(scope, &GetMethodFast, None);
+ let func: v8::Local<v8::Value> = func.get_function(scope).unwrap().into();
+
+ let key = v8::String::new(scope, "getMethod").unwrap();
+ obj.set(scope, key.into(), func).unwrap();
+ }
+
+ // respondChunked
+ {
+ let builder = v8::FunctionTemplate::builder(
+ |scope: &mut v8::HandleScope,
+ args: v8::FunctionCallbackArguments,
+ _: v8::ReturnValue| {
+ let external: v8::Local<v8::External> =
+ args.data().unwrap().try_into().unwrap();
+ // SAFETY: This external is guaranteed to be a pointer to a ServerContext
+ let ctx = unsafe { &mut *(external.value() as *mut ServerContext) };
+
+ let token = args.get(0).uint32_value(scope).unwrap();
+
+ let response: v8::Local<v8::ArrayBufferView> =
+ args.get(1).try_into().unwrap();
+ let ab = response.buffer(scope).unwrap();
+ let store = ab.get_backing_store();
+ let (offset, len) = (response.byte_offset(), response.byte_length());
+ // SAFETY: v8::SharedRef<v8::BackingStore> is similar to Arc<[u8]>;
+ // it points to a fixed, contiguous slice of bytes on the heap.
+ // We assume it is initialized and thus safe to read (though it may not contain meaningful data).
+ let response = unsafe {
+ &*(&store[offset..offset + len] as *const _ as *const [u8])
+ };
+
+ let shutdown = args.get(2).boolean_value(scope);
+
+ respond_chunked(ctx, token, shutdown, Some(response));
+ },
+ )
+ .data(v8::External::new(scope, ctx as *mut _).into());
+
+ let func = builder.build_fast(scope, &RespondChunkedFast, None);
+ let func: v8::Local<v8::Value> = func.get_function(scope).unwrap().into();
+
+ let key = v8::String::new(scope, "respondChunked").unwrap();
+ obj.set(scope, key.into(), func).unwrap();
+ }
+
+ // respond
+ {
+ let builder = v8::FunctionTemplate::builder(
+ |scope: &mut v8::HandleScope,
+ args: v8::FunctionCallbackArguments,
+ _: v8::ReturnValue| {
+ let external: v8::Local<v8::External> =
+ args.data().unwrap().try_into().unwrap();
+ // SAFETY: This external is guaranteed to be a pointer to a ServerContext
+ let ctx = unsafe { &mut *(external.value() as *mut ServerContext) };
+
+ let token = args.get(0).uint32_value(scope).unwrap();
+
+ let response: v8::Local<v8::ArrayBufferView> =
+ args.get(1).try_into().unwrap();
+ let ab = response.buffer(scope).unwrap();
+ let store = ab.get_backing_store();
+ let (offset, len) = (response.byte_offset(), response.byte_length());
+ // SAFETY: v8::SharedRef<v8::BackingStore> is similar to Arc<[u8]>;
+ // it points to a fixed, contiguous slice of bytes on the heap.
+ // We assume it is initialized and thus safe to read (though it may not contain meaningful data).
+ let response = unsafe {
+ &*(&store[offset..offset + len] as *const _ as *const [u8])
+ };
+
+ let shutdown = args.get(2).boolean_value(scope);
+
+ flash_respond(ctx, token, shutdown, response);
+ },
+ )
+ .data(v8::External::new(scope, ctx as *mut _).into());
+
+ let func = builder.build_fast(scope, &RespondFast, None);
+ let func: v8::Local<v8::Value> = func.get_function(scope).unwrap().into();
+
+ let key = v8::String::new(scope, "respond").unwrap();
+ obj.set(scope, key.into(), func).unwrap();
+ }
+
+ let value: v8::Local<v8::Value> = obj.into();
+ value.into()
+}
+
+#[inline]
+fn has_body_stream(req: &NextRequest) -> bool {
+ let sock = req.socket();
+ sock.read_rx.is_some()
+}
+
+#[op]
+fn op_flash_has_body_stream(
+ op_state: &mut OpState,
+ server_id: u32,
+ token: u32,
+) -> bool {
+ let req = get_request!(op_state, server_id, token);
+ has_body_stream(req)
+}
+
+#[op]
+fn op_flash_headers(
+ state: Rc<RefCell<OpState>>,
+ server_id: u32,
+ token: u32,
+) -> Result<Vec<(ByteString, ByteString)>, AnyError> {
+ let mut op_state = state.borrow_mut();
+ let flash_ctx = op_state.borrow_mut::<FlashContext>();
+ let ctx = flash_ctx
+ .servers
+ .get_mut(&server_id)
+ .ok_or_else(|| type_error("server closed"))?;
+ let inner_req = &ctx
+ .response
+ .get(&token)
+ .ok_or_else(|| type_error("request closed"))?
+ .inner
+ .req;
+ Ok(
+ inner_req
+ .headers
+ .iter()
+ .map(|h| (h.name.as_bytes().into(), h.value.into()))
+ .collect(),
+ )
+}
+
+// Remember the first packet we read? It probably also has some body data. This op quickly copies it into
+// a buffer and sets up channels for streaming the rest.
+#[op]
+fn op_flash_first_packet(
+ op_state: &mut OpState,
+ server_id: u32,
+ token: u32,
+) -> Result<Option<ZeroCopyBuf>, AnyError> {
+ let tx = get_request!(op_state, server_id, token);
+ let sock = tx.socket();
+
+ if !tx.te_chunked && tx.content_length.is_none() {
+ return Ok(None);
+ }
+
+ if tx.expect_continue {
+ let _ = sock.write(b"HTTP/1.1 100 Continue\r\n\r\n");
+ tx.expect_continue = false;
+ }
+
+ let buffer = &tx.inner.buffer[tx.inner.body_offset..tx.inner.body_len];
+ // There is no body data in the first packet.
+ if buffer.is_empty() {
+ return Ok(Some(ZeroCopyBuf::empty()));
+ }
+
+ if tx.te_chunked {
+ let mut buf = vec![0; 1024];
+ let mut offset = 0;
+ let mut decoder = chunked::Decoder::new(
+ std::io::Cursor::new(buffer),
+ tx.remaining_chunk_size,
+ );
+
+ loop {
+ match decoder.read(&mut buf[offset..]) {
+ Ok(n) => {
+ tx.remaining_chunk_size = decoder.remaining_chunks_size;
+ offset += n;
+
+ if n == 0 {
+ tx.te_chunked = false;
+ buf.truncate(offset);
+ return Ok(Some(buf.into()));
+ }
+
+ if offset < buf.len()
+ && decoder.source.position() < buffer.len() as u64
+ {
+ continue;
+ }
+
+ buf.truncate(offset);
+ return Ok(Some(buf.into()));
+ }
+ Err(e) => {
+ return Err(type_error(format!("{}", e)));
+ }
+ }
+ }
+ }
+
+ tx.content_length
+ .ok_or_else(|| type_error("no content-length"))?;
+ tx.content_read += buffer.len();
+
+ Ok(Some(buffer.to_vec().into()))
+}
+
+#[op]
+async fn op_flash_read_body(
+ state: Rc<RefCell<OpState>>,
+ server_id: u32,
+ token: u32,
+ mut buf: ZeroCopyBuf,
+) -> usize {
+ // SAFETY: we cannot hold op_state borrow across the await point. The JS caller
+ // is responsible for ensuring this is not called concurrently.
+ let ctx = unsafe {
+ {
+ let op_state = &mut state.borrow_mut();
+ let flash_ctx = op_state.borrow_mut::<FlashContext>();
+ flash_ctx.servers.get_mut(&server_id).unwrap() as *mut ServerContext
+ }
+ .as_mut()
+ .unwrap()
+ };
+ let tx = ctx.response.get_mut(&token).unwrap();
+
+ if tx.te_chunked {
+ let mut decoder =
+ chunked::Decoder::new(tx.socket(), tx.remaining_chunk_size);
+ loop {
+ let sock = tx.socket();
+
+ let _lock = sock.read_lock.lock().unwrap();
+ match decoder.read(&mut buf) {
+ Ok(n) => {
+ tx.remaining_chunk_size = decoder.remaining_chunks_size;
+ return n;
+ }
+ Err(e) if e.kind() == std::io::ErrorKind::InvalidInput => {
+ panic!("chunked read error: {}", e);
+ }
+ Err(_) => {
+ drop(_lock);
+ sock.read_rx.as_mut().unwrap().recv().await.unwrap();
+ }
+ }
+ }
+ }
+
+ if let Some(content_length) = tx.content_length {
+ let sock = tx.socket();
+ let l = sock.read_lock.clone();
+
+ loop {
+ let _lock = l.lock().unwrap();
+ if tx.content_read >= content_length as usize {
+ return 0;
+ }
+ match sock.read(&mut buf) {
+ Ok(n) => {
+ tx.content_read += n;
+ return n;
+ }
+ _ => {
+ drop(_lock);
+ sock.read_rx.as_mut().unwrap().recv().await.unwrap();
+ }
+ }
+ }
+ }
+
+ 0
+}
+
+// https://github.com/hyperium/hyper/blob/0c8ee93d7f557afc63ca2a5686d19071813ab2b7/src/headers.rs#L67
+#[inline]
+fn from_digits(bytes: &[u8]) -> Option<u64> {
+ // cannot use FromStr for u64, since it allows a signed prefix
+ let mut result = 0u64;
+ const RADIX: u64 = 10;
+ if bytes.is_empty() {
+ return None;
+ }
+ for &b in bytes {
+ // can't use char::to_digit, since we haven't verified these bytes
+ // are utf-8.
+ match b {
+ b'0'..=b'9' => {
+ result = result.checked_mul(RADIX)?;
+ result = result.checked_add((b - b'0') as u64)?;
+ }
+ _ => {
+ return None;
+ }
+ }
+ }
+ Some(result)
+}
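+
+// For example: from_digits(b"1567") == Some(1567), while from_digits(b"+1"),
+// from_digits(b"-1") and from_digits(b"") are all None.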
+
+#[inline]
+fn connection_has(value: &HeaderValue, needle: &str) -> bool {
+ if let Ok(s) = value.to_str() {
+ for val in s.split(',') {
+ if val.trim().eq_ignore_ascii_case(needle) {
+ return true;
+ }
+ }
+ }
+ false
+}
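+
+// For example, with `value` built from `keep-alive, Upgrade` (matching is
+// comma-separated, trimmed and case-insensitive):
+//
+//   connection_has(&value, "upgrade")    == true
+//   connection_has(&value, "keep-alive") == true
+//   connection_has(&value, "close")      == false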
+
+#[derive(Serialize, Deserialize)]
+#[serde(rename_all = "camelCase")]
+pub struct ListenOpts {
+ cert: Option<String>,
+ key: Option<String>,
+ hostname: String,
+ port: u16,
+ use_tls: bool,
+}
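+
+// ListenOpts is deserialized from the JS options bag; with the camelCase
+// rename this corresponds to e.g. `{ hostname: "127.0.0.1", port: 9000,
+// useTls: false }`. cert and key are optional; TLS is only configured in
+// run_server when a cert is provided.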
+
+fn run_server(
+ tx: mpsc::Sender<NextRequest>,
+ listening_tx: mpsc::Sender<()>,
+ mut close_rx: mpsc::Receiver<()>,
+ addr: SocketAddr,
+ maybe_cert: Option<String>,
+ maybe_key: Option<String>,
+) -> Result<(), AnyError> {
+ let mut listener = TcpListener::bind(addr)?;
+ let mut poll = Poll::new()?;
+ let token = Token(0);
+ poll
+ .registry()
+ .register(&mut listener, token, Interest::READABLE)
+ .unwrap();
+
+ let tls_context: Option<Arc<rustls::ServerConfig>> = {
+ if let Some(cert) = maybe_cert {
+ let key = maybe_key.unwrap();
+ let certificate_chain: Vec<rustls::Certificate> =
+ load_certs(&mut BufReader::new(cert.as_bytes()))?;
+ let private_key = load_private_keys(key.as_bytes())?.remove(0);
+
+ let config = rustls::ServerConfig::builder()
+ .with_safe_defaults()
+ .with_no_client_auth()
+ .with_single_cert(certificate_chain, private_key)
+ .expect("invalid key or certificate");
+ Some(Arc::new(config))
+ } else {
+ None
+ }
+ };
+
+ listening_tx.blocking_send(()).unwrap();
+ let mut sockets = HashMap::with_capacity(1000);
+ let mut counter: usize = 1;
+ let mut events = Events::with_capacity(1024);
+ 'outer: loop {
+ let result = close_rx.try_recv();
+ if result.is_ok() {
+ break 'outer;
+ }
+ // FIXME(bartlomieju): how does Tokio handle it? I just put a random 100ms
+ // timeout here to handle the close signal.
+ match poll.poll(&mut events, Some(Duration::from_millis(100))) {
+ Err(ref e) if e.kind() == std::io::ErrorKind::Interrupted => continue,
+ Err(e) => panic!("{}", e),
+ Ok(()) => (),
+ }
+ 'events: for event in &events {
+ if close_rx.try_recv().is_ok() {
+ break 'outer;
+ }
+ let token = event.token();
+ match token {
+ Token(0) => loop {
+ match listener.accept() {
+ Ok((mut socket, _)) => {
+ counter += 1;
+ let token = Token(counter);
+ poll
+ .registry()
+ .register(&mut socket, token, Interest::READABLE)
+ .unwrap();
+ let socket = match tls_context {
+ Some(ref tls_conf) => {
+ let connection =
+ rustls::ServerConnection::new(tls_conf.clone()).unwrap();
+ InnerStream::Tls(Box::new(rustls::StreamOwned::new(
+ connection, socket,
+ )))
+ }
+ None => InnerStream::Tcp(socket),
+ };
+ let stream = Box::pin(Stream {
+ inner: socket,
+ detached: false,
+ read_rx: None,
+ read_tx: None,
+ read_lock: Arc::new(Mutex::new(())),
+ parse_done: ParseStatus::None,
+ buffer: UnsafeCell::new(vec![0_u8; 1024]),
+ _pin: PhantomPinned,
+ });
+
+ trace!("New connection: {}", token.0);
+ sockets.insert(token, stream);
+ }
+ Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => break,
+ Err(_) => break,
+ }
+ },
+ token => {
+ let socket = sockets.get_mut(&token).unwrap();
+ // SAFETY: guarantee that we will never move the data out of the mutable reference.
+ let socket = unsafe {
+ let mut_ref: Pin<&mut Stream> = Pin::as_mut(socket);
+ Pin::get_unchecked_mut(mut_ref)
+ };
+ let sock_ptr = socket as *mut _;
+
+ if socket.detached {
+ match &mut socket.inner {
+ InnerStream::Tcp(ref mut socket) => {
+ poll.registry().deregister(socket).unwrap();
+ }
+ InnerStream::Tls(_) => {
+ todo!("upgrade tls not implemented");
+ }
+ }
+
+ let boxed = sockets.remove(&token).unwrap();
+ std::mem::forget(boxed);
+ trace!("Socket detached: {}", token.0);
+ continue;
+ }
+
+ debug_assert!(event.is_readable());
+
+ trace!("Socket readable: {}", token.0);
+ if let Some(tx) = &socket.read_tx {
+ {
+ let _l = socket.read_lock.lock().unwrap();
+ }
+ trace!("Sending readiness notification: {}", token.0);
+ let _ = tx.blocking_send(());
+
+ continue;
+ }
+
+ let mut headers = vec![httparse::EMPTY_HEADER; 40];
+ let mut req = httparse::Request::new(&mut headers);
+ let body_offset;
+ let body_len;
+ loop {
+ // SAFETY: It is safe for the read buf to be mutable here.
+ let buffer = unsafe { &mut *socket.buffer.get() };
+ let offset = match socket.parse_done {
+ ParseStatus::None => 0,
+ ParseStatus::Ongoing(offset) => offset,
+ };
+ if offset >= buffer.len() {
+ buffer.resize(offset * 2, 0);
+ }
+ let nread = socket.read(&mut buffer[offset..]);
+
+ match nread {
+ Ok(0) => {
+ trace!("Socket closed: {}", token.0);
+ // FIXME: don't remove while JS is writing!
+ // sockets.remove(&token);
+ continue 'events;
+ }
+ Ok(read) => match req.parse(&buffer[..offset + read]) {
+ Ok(httparse::Status::Complete(n)) => {
+ body_offset = n;
+ body_len = offset + read;
+ socket.parse_done = ParseStatus::None;
+ break;
+ }
+ Ok(httparse::Status::Partial) => {
+ socket.parse_done = ParseStatus::Ongoing(offset + read);
+ continue;
+ }
+ Err(_) => {
+ let _ = socket.write(b"HTTP/1.1 400 Bad Request\r\n\r\n");
+ continue 'events;
+ }
+ },
+ Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => {
+ break 'events
+ }
+ Err(_) => break 'events,
+ }
+ }
+
+ debug_assert_eq!(socket.parse_done, ParseStatus::None);
+ if let Some(method) = &req.method {
+ if method == &"POST" || method == &"PUT" {
+ let (tx, rx) = mpsc::channel(100);
+ socket.read_tx = Some(tx);
+ socket.read_rx = Some(rx);
+ }
+ }
+
+ // SAFETY: It is safe for the read buf to be mutable here.
+ let buffer = unsafe { &mut *socket.buffer.get() };
+ let inner_req = InnerRequest {
+ // SAFETY: backing buffer is pinned and lives as long as the request.
+ req: unsafe { transmute::<httparse::Request<'_, '_>, _>(req) },
+ // SAFETY: backing buffer is pinned and lives as long as the request.
+ _headers: unsafe {
+ transmute::<Vec<httparse::Header<'_>>, _>(headers)
+ },
+ buffer: Pin::new(
+ replace(buffer, vec![0_u8; 1024]).into_boxed_slice(),
+ ),
+ body_offset,
+ body_len,
+ };
+ // h1
+ // https://github.com/tiny-http/tiny-http/blob/master/src/client.rs#L177
+ // https://github.com/hyperium/hyper/blob/4545c3ef191ce9b5f5d250ee27c4c96f9b71d2c6/src/proto/h1/role.rs#L127
+ let mut keep_alive = inner_req.req.version.unwrap() == 1;
+ let mut upgrade = false;
+ let mut expect_continue = false;
+ let mut te = false;
+ let mut te_chunked = false;
+ let mut content_length = None;
+ for header in inner_req.req.headers.iter() {
+ match HeaderName::from_bytes(header.name.as_bytes()) {
+ Ok(CONNECTION) => {
+ // SAFETY: httparse has already rejected illegal header-value bytes.
+ let value = unsafe {
+ HeaderValue::from_maybe_shared_unchecked(header.value)
+ };
+ if keep_alive {
+ // 1.1
+ keep_alive = !connection_has(&value, "close");
+ } else {
+ // 1.0
+ keep_alive = connection_has(&value, "keep-alive");
+ }
+ }
+ Ok(UPGRADE) => {
+ upgrade = inner_req.req.version.unwrap() == 1;
+ }
+ Ok(TRANSFER_ENCODING) => {
+ // https://tools.ietf.org/html/rfc7230#section-3.3.3
+ debug_assert!(inner_req.req.version.unwrap() == 1);
+ // Track Transfer-Encoding in two flags so the Content-Length handling below knows the header was present.
+ te = true;
+ content_length = None;
+ // SAFETY: httparse has already rejected illegal header-value bytes.
+ let value = unsafe {
+ HeaderValue::from_maybe_shared_unchecked(header.value)
+ };
+ if let Ok(Some(encoding)) =
+ value.to_str().map(|s| s.rsplit(',').next())
+ {
+ // Chunked must always be the last encoding
+ if encoding.trim().eq_ignore_ascii_case("chunked") {
+ te_chunked = true;
+ }
+ }
+ }
+ // Transfer-Encoding overrides the Content-Length.
+ Ok(CONTENT_LENGTH) if !te => {
+ if let Some(len) = from_digits(header.value) {
+ if let Some(prev) = content_length {
+ if prev != len {
+ let _ = socket.write(b"HTTP/1.1 400 Bad Request\r\n\r\n");
+ continue 'events;
+ }
+ continue;
+ }
+ content_length = Some(len);
+ } else {
+ let _ = socket.write(b"HTTP/1.1 400 Bad Request\r\n\r\n");
+ continue 'events;
+ }
+ }
+ Ok(EXPECT) if inner_req.req.version.unwrap() != 0 => {
+ expect_continue =
+ header.value.eq_ignore_ascii_case(b"100-continue");
+ }
+ _ => {}
+ }
+ }
+
+ // There is a Transfer-Encoding header but it is not chunked.
+ if te && !te_chunked {
+ let _ = socket.write(b"HTTP/1.1 400 Bad Request\r\n\r\n");
+ continue 'events;
+ }
+
+ tx.blocking_send(NextRequest {
+ socket: sock_ptr,
+ // SAFETY: headers backing buffer outlives the mio event loop ('static)
+ inner: inner_req,
+ keep_alive,
+ upgrade,
+ te_chunked,
+ remaining_chunk_size: None,
+ content_read: 0,
+ content_length,
+ expect_continue,
+ })
+ .ok();
+ }
+ }
+ }
+ }
+
+ Ok(())
+}
+
+#[op]
+fn op_flash_serve<P>(
+ state: &mut OpState,
+ opts: ListenOpts,
+) -> Result<u32, AnyError>
+where
+ P: FlashPermissions + 'static,
+{
+ if opts.use_tls {
+ check_unstable(state, "Deno.serveTls");
+ } else {
+ check_unstable(state, "Deno.serve");
+ }
+ state
+ .borrow_mut::<P>()
+ .check_net(&(&opts.hostname, Some(opts.port)))?;
+ let addr = SocketAddr::new(opts.hostname.parse()?, opts.port);
+ let (tx, rx) = mpsc::channel(100);
+ let (close_tx, close_rx) = mpsc::channel(1);
+ let (listening_tx, listening_rx) = mpsc::channel(1);
+ let ctx = ServerContext {
+ _addr: addr,
+ tx,
+ rx,
+ response: HashMap::with_capacity(1000),
+ close_tx,
+ listening_rx: Some(listening_rx),
+ cancel_handle: CancelHandle::new_rc(),
+ };
+ let tx = ctx.tx.clone();
+ let maybe_cert = opts.cert;
+ let maybe_key = opts.key;
+ let join_handle = tokio::task::spawn_blocking(move || {
+ run_server(tx, listening_tx, close_rx, addr, maybe_cert, maybe_key)
+ });
+ let flash_ctx = state.borrow_mut::<FlashContext>();
+ let server_id = flash_ctx.next_server_id;
+ flash_ctx.next_server_id += 1;
+ flash_ctx.join_handles.insert(server_id, join_handle);
+ flash_ctx.servers.insert(server_id, ctx);
+ Ok(server_id)
+}
+
+#[op]
+fn op_flash_wait_for_listening(
+ state: &mut OpState,
+ server_id: u32,
+) -> Result<impl Future<Output = Result<(), AnyError>> + 'static, AnyError> {
+ let mut listening_rx = {
+ let flash_ctx = state.borrow_mut::<FlashContext>();
+ let server_ctx = flash_ctx
+ .servers
+ .get_mut(&server_id)
+ .ok_or_else(|| type_error("server not found"))?;
+ server_ctx.listening_rx.take().unwrap()
+ };
+ Ok(async move {
+ listening_rx.recv().await;
+ Ok(())
+ })
+}
+
+#[op]
+fn op_flash_drive_server(
+ state: &mut OpState,
+ server_id: u32,
+) -> Result<impl Future<Output = Result<(), AnyError>> + 'static, AnyError> {
+ let join_handle = {
+ let flash_ctx = state.borrow_mut::<FlashContext>();
+ flash_ctx
+ .join_handles
+ .remove(&server_id)
+ .ok_or_else(|| type_error("server not found"))?
+ };
+ Ok(async move {
+ join_handle
+ .await
+ .map_err(|_| type_error("server join error"))??;
+ Ok(())
+ })
+}
+
+// Asynchronous version of op_flash_next. This can be a bottleneck under
+// heavy load, so it should be used as a fallback only when there are no
+// buffered requests, i.e. `op_flash_next() == 0`.
+#[op]
+async fn op_flash_next_async(
+ op_state: Rc<RefCell<OpState>>,
+ server_id: u32,
+) -> u32 {
+ let ctx = {
+ let mut op_state = op_state.borrow_mut();
+ let flash_ctx = op_state.borrow_mut::<FlashContext>();
+ let ctx = flash_ctx.servers.get_mut(&server_id).unwrap();
+ ctx as *mut ServerContext
+ };
+ // SAFETY: we cannot hold op_state borrow across the await point. The JS caller
+ // is responsible for ensuring this is not called concurrently.
+ let ctx = unsafe { &mut *ctx };
+ let cancel_handle = &ctx.cancel_handle;
+ let mut tokens = 0;
+ while let Ok(token) = ctx.rx.try_recv() {
+ ctx.response.insert(tokens, token);
+ tokens += 1;
+ }
+ if tokens == 0 {
+ if let Ok(Some(req)) = ctx.rx.recv().or_cancel(cancel_handle).await {
+ ctx.response.insert(tokens, req);
+ tokens += 1;
+ }
+ }
+ tokens
+}
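+
+// A caller would typically drain with the synchronous op first and fall back
+// to this async op only when nothing is buffered. Sketch of the intended
+// calling pattern (pseudocode, not actual 01_http.js code):
+//
+//   let tokens = op_flash_next();
+//   if (tokens === 0) tokens = await op_flash_next_async(serverId);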
+
+// Synchronous version of op_flash_next_async. Under heavy load, this can
+// collect buffered requests from the rx channel and return tokens in a single batch.
+//
+// perf: please do not add any arguments to this op. With optimizations enabled,
+// the ContextScope creation is optimized away and the op is as simple as:
+// f(info: *const v8::FunctionCallbackInfo) { let rv = ...; rv.set_uint32(op_flash_next()); }
+#[op]
+fn op_flash_next(op_state: &mut OpState) -> u32 {
+ let flash_ctx = op_state.borrow_mut::<FlashContext>();
+ let ctx = flash_ctx.servers.get_mut(&0).unwrap();
+ next_request_sync(ctx)
+}
+
+// Synchronous version of op_flash_next_async. Under heavy load, this can
+// collect buffered requests from the rx channel and return tokens in a single batch.
+#[op]
+fn op_flash_next_server(op_state: &mut OpState, server_id: u32) -> u32 {
+ let flash_ctx = op_state.borrow_mut::<FlashContext>();
+ let ctx = flash_ctx.servers.get_mut(&server_id).unwrap();
+ next_request_sync(ctx)
+}
+
+// Wrapper type for tokio::net::TcpStream that implements
+// deno_websocket::Upgraded.
+struct UpgradedStream(tokio::net::TcpStream);
+impl tokio::io::AsyncRead for UpgradedStream {
+ fn poll_read(
+ self: Pin<&mut Self>,
+ cx: &mut Context,
+ buf: &mut tokio::io::ReadBuf,
+ ) -> std::task::Poll<std::result::Result<(), std::io::Error>> {
+ Pin::new(&mut self.get_mut().0).poll_read(cx, buf)
+ }
+}
+
+impl tokio::io::AsyncWrite for UpgradedStream {
+ fn poll_write(
+ self: Pin<&mut Self>,
+ cx: &mut Context,
+ buf: &[u8],
+ ) -> std::task::Poll<Result<usize, std::io::Error>> {
+ Pin::new(&mut self.get_mut().0).poll_write(cx, buf)
+ }
+ fn poll_flush(
+ self: Pin<&mut Self>,
+ cx: &mut Context,
+ ) -> std::task::Poll<Result<(), std::io::Error>> {
+ Pin::new(&mut self.get_mut().0).poll_flush(cx)
+ }
+ fn poll_shutdown(
+ self: Pin<&mut Self>,
+ cx: &mut Context,
+ ) -> std::task::Poll<Result<(), std::io::Error>> {
+ Pin::new(&mut self.get_mut().0).poll_shutdown(cx)
+ }
+}
+
+impl deno_websocket::Upgraded for UpgradedStream {}
+
+#[inline]
+pub fn detach_socket(
+ ctx: &mut ServerContext,
+ token: u32,
+) -> Result<tokio::net::TcpStream, AnyError> {
+ // Two main 'hacks' to get this working:
+ // * make server thread forget about the socket. `detach_ownership` prevents the socket from being
+ // dropped on the server thread.
+ // * conversion from mio::net::TcpStream -> tokio::net::TcpStream. There is no public API so we
+ // use raw fds.
+ let tx = ctx
+ .response
+ .remove(&token)
+ .ok_or_else(|| type_error("request closed"))?;
+ let stream = tx.socket();
+ // prevent socket from being dropped on server thread.
+ // TODO(@littledivy): Box-ify, since there is no overhead.
+ stream.detach_ownership();
+
+ #[cfg(unix)]
+ let std_stream = {
+ use std::os::unix::prelude::AsRawFd;
+ use std::os::unix::prelude::FromRawFd;
+ let fd = match stream.inner {
+ InnerStream::Tcp(ref tcp) => tcp.as_raw_fd(),
+ _ => todo!(),
+ };
+ // SAFETY: `fd` is a valid file descriptor.
+ unsafe { std::net::TcpStream::from_raw_fd(fd) }
+ };
+ #[cfg(windows)]
+ let std_stream = {
+ use std::os::windows::prelude::AsRawSocket;
+ use std::os::windows::prelude::FromRawSocket;
+ let fd = match stream.inner {
+ InnerStream::Tcp(ref tcp) => tcp.as_raw_socket(),
+ _ => todo!(),
+ };
+ // SAFETY: `fd` is a valid socket handle.
+ unsafe { std::net::TcpStream::from_raw_socket(fd) }
+ };
+ let stream = tokio::net::TcpStream::from_std(std_stream)?;
+ Ok(stream)
+}
+
+#[op]
+async fn op_flash_upgrade_websocket(
+ state: Rc<RefCell<OpState>>,
+ server_id: u32,
+ token: u32,
+) -> Result<deno_core::ResourceId, AnyError> {
+ let stream = {
+ let op_state = &mut state.borrow_mut();
+ let flash_ctx = op_state.borrow_mut::<FlashContext>();
+ detach_socket(flash_ctx.servers.get_mut(&server_id).unwrap(), token)?
+ };
+ deno_websocket::ws_create_server_stream(
+ &state,
+ Box::pin(UpgradedStream(stream)),
+ )
+ .await
+}
+
+pub struct Unstable(pub bool);
+
+fn check_unstable(state: &OpState, api_name: &str) {
+ let unstable = state.borrow::<Unstable>();
+
+ if !unstable.0 {
+ eprintln!(
+ "Unstable API '{}'. The --unstable flag must be provided.",
+ api_name
+ );
+ std::process::exit(70);
+ }
+}
+
+pub trait FlashPermissions {
+ fn check_net<T: AsRef<str>>(
+ &mut self,
+ _host: &(T, Option<u16>),
+ ) -> Result<(), AnyError>;
+}
+
+pub fn init<P: FlashPermissions + 'static>(unstable: bool) -> Extension {
+ Extension::builder()
+ .js(deno_core::include_js_files!(
+ prefix "deno:ext/flash",
+ "01_http.js",
+ ))
+ .ops(vec![
+ op_flash_serve::decl::<P>(),
+ op_flash_respond::decl(),
+ op_flash_respond_chuncked::decl(),
+ op_flash_method::decl(),
+ op_flash_path::decl(),
+ op_flash_headers::decl(),
+ op_flash_next::decl(),
+ op_flash_next_server::decl(),
+ op_flash_next_async::decl(),
+ op_flash_read_body::decl(),
+ op_flash_upgrade_websocket::decl(),
+ op_flash_drive_server::decl(),
+ op_flash_wait_for_listening::decl(),
+ op_flash_first_packet::decl(),
+ op_flash_has_body_stream::decl(),
+ op_flash_close_server::decl(),
+ op_flash_make_request::decl(),
+ op_flash_write_resource::decl(),
+ ])
+ .state(move |op_state| {
+ op_state.put(Unstable(unstable));
+ op_state.put(FlashContext {
+ next_server_id: 0,
+ join_handles: HashMap::default(),
+ servers: HashMap::default(),
+ });
+ Ok(())
+ })
+ .build()
+}