summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--ext/flash/01_http.js51
-rw-r--r--ext/flash/lib.rs57
2 files changed, 94 insertions, 14 deletions
diff --git a/ext/flash/01_http.js b/ext/flash/01_http.js
index 7a6b9bc47..2b0caff49 100644
--- a/ext/flash/01_http.js
+++ b/ext/flash/01_http.js
@@ -243,6 +243,7 @@
i,
respondFast,
respondChunked,
+ tryRespondChunked,
) {
// there might've been an HTTP upgrade.
if (resp === undefined) {
@@ -371,6 +372,9 @@
}
} else {
const reader = respBody.getReader();
+
+ // Best case: sends headers + first chunk in a single go.
+ const { value, done } = await reader.read();
writeFixedResponse(
serverId,
i,
@@ -385,14 +389,23 @@
false,
respondFast,
);
- while (true) {
- const { value, done } = await reader.read();
- await respondChunked(
- i,
- value,
- done,
- );
- if (done) break;
+
+ await tryRespondChunked(
+ i,
+ value,
+ done,
+ );
+
+ if (!done) {
+ while (true) {
+ const chunk = await reader.read();
+ await respondChunked(
+ i,
+ chunk.value,
+ chunk.done,
+ );
+ if (chunk.done) break;
+ }
}
}
}
@@ -572,6 +585,7 @@
i,
respondFast,
respondChunked,
+ tryRespondChunked,
),
),
onError,
@@ -589,6 +603,7 @@
i,
respondFast,
respondChunked,
+ tryRespondChunked,
)
).catch(onError);
continue;
@@ -607,6 +622,7 @@
i,
respondFast,
respondChunked,
+ tryRespondChunked,
);
}
@@ -623,6 +639,25 @@
once: true,
});
+ function tryRespondChunked(token, chunk, shutdown) {
+ const nwritten = core.ops.op_try_flash_respond_chuncked(
+ serverId,
+ token,
+ chunk ?? new Uint8Array(),
+ shutdown,
+ );
+ if (nwritten > 0) {
+ return core.opAsync(
+ "op_flash_respond_chuncked",
+ serverId,
+ token,
+ chunk,
+ shutdown,
+ nwritten,
+ );
+ }
+ }
+
function respondChunked(token, chunk, shutdown) {
return core.opAsync(
"op_flash_respond_chuncked",
diff --git a/ext/flash/lib.rs b/ext/flash/lib.rs
index b077b8d21..d08cdbcdc 100644
--- a/ext/flash/lib.rs
+++ b/ext/flash/lib.rs
@@ -106,6 +106,39 @@ fn op_flash_respond(
flash_respond(ctx, token, shutdown, &response)
}
+#[op(fast)]
+fn op_try_flash_respond_chuncked(
+ op_state: &mut OpState,
+ server_id: u32,
+ token: u32,
+ response: &[u8],
+ shutdown: bool,
+) -> u32 {
+ let flash_ctx = op_state.borrow_mut::<FlashContext>();
+ let ctx = flash_ctx.servers.get_mut(&server_id).unwrap();
+ let tx = ctx.requests.get(&token).unwrap();
+ let sock = tx.socket();
+
+ // TODO(@littledivy): Use writev when `UnixIoSlice` lands.
+ // https://github.com/denoland/deno/pull/15629
+ let h = format!("{:x}\r\n", response.len());
+
+ let concat = [h.as_bytes(), response, b"\r\n"].concat();
+ let expected = sock.try_write(&concat);
+ if expected != concat.len() {
+ if expected > 2 {
+ return expected as u32;
+ }
+ return expected as u32;
+ }
+
+ if shutdown {
+ // Best case: We've written everything and the stream is done too.
+ let _ = ctx.requests.remove(&token).unwrap();
+ }
+ 0
+}
+
#[op]
async fn op_flash_respond_async(
state: Rc<RefCell<OpState>>,
@@ -157,6 +190,7 @@ async fn op_flash_respond_chuncked(
token: u32,
response: Option<ZeroCopyBuf>,
shutdown: bool,
+ nwritten: u32,
) -> Result<(), AnyError> {
let mut op_state = op_state.borrow_mut();
let flash_ctx = op_state.borrow_mut::<FlashContext>();
@@ -178,17 +212,27 @@ async fn op_flash_respond_chuncked(
.with_async_stream(|stream| {
Box::pin(async move {
use tokio::io::AsyncWriteExt;
+ // TODO(@littledivy): Use writev when `UnixIoSlice` lands.
+ // https://github.com/denoland/deno/pull/15629
+ macro_rules! write_whats_not_written {
+ ($e:expr) => {
+ let e = $e;
+ let n = nwritten as usize;
+ if n < e.len() {
+ stream.write_all(&e[n..]).await?;
+ }
+ };
+ }
if let Some(response) = response {
- stream
- .write_all(format!("{:x}\r\n", response.len()).as_bytes())
- .await?;
- stream.write_all(&response).await?;
- stream.write_all(b"\r\n").await?;
+ let h = format!("{:x}\r\n", response.len());
+ write_whats_not_written!(h.as_bytes());
+ write_whats_not_written!(&response);
+ write_whats_not_written!(b"\r\n");
}
// The last chunk
if shutdown {
- stream.write_all(b"0\r\n\r\n").await?;
+ write_whats_not_written!(b"0\r\n\r\n");
}
Ok(())
@@ -1485,6 +1529,7 @@ pub fn init<P: FlashPermissions + 'static>(unstable: bool) -> Extension {
op_flash_close_server::decl(),
op_flash_make_request::decl(),
op_flash_write_resource::decl(),
+ op_try_flash_respond_chuncked::decl(),
])
.state(move |op_state| {
op_state.put(Unstable(unstable));