summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorBartek IwaƄczuk <biwanczuk@gmail.com>2022-04-21 02:22:55 +0200
committerGitHub <noreply@github.com>2022-04-21 02:22:55 +0200
commit03019e778189b38938f1238f22652162de5a7434 (patch)
treecf16b44be07c1c488ffe4f31fe77eab7f6bd8c95
parentaaaa877d91c5f8b88722fd1ec725791b0eb4efe0 (diff)
Revert various PRs related to "ext/http" (#14339)
* Revert "feat(ext/http): stream auto resp body compression (#14325)" * Revert "core: introduce `resource.read_return` (#14331)" * Revert "perf(http): optimize `ReadableStream`s backed by a resource (#14284)"
-rw-r--r--.github/workflows/ci.yml6
-rw-r--r--Cargo.lock1
-rw-r--r--cli/tests/unit/http_test.ts200
-rw-r--r--core/examples/http_bench_json_ops.rs16
-rw-r--r--core/resources.rs12
-rw-r--r--ext/fetch/lib.rs7
-rw-r--r--ext/http/01_http.js63
-rw-r--r--ext/http/Cargo.toml1
-rw-r--r--ext/http/lib.rs223
-rw-r--r--ext/net/01_net.js30
-rw-r--r--ext/net/io.rs16
-rw-r--r--ext/net/ops_tls.rs9
-rw-r--r--ext/web/06_streams.js42
-rw-r--r--runtime/js/40_files.js4
-rw-r--r--runtime/ops/io.rs32
-rw-r--r--serde_v8/magic/buffer.rs12
16 files changed, 173 insertions, 501 deletions
diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index 3612b3a3e..26bf1ace8 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -236,7 +236,7 @@ jobs:
~/.cargo/registry/index
~/.cargo/registry/cache
~/.cargo/git/db
- key: 9-cargo-home-${{ matrix.os }}-${{ hashFiles('Cargo.lock') }}
+ key: 7-cargo-home-${{ matrix.os }}-${{ hashFiles('Cargo.lock') }}
# In main branch, always creates fresh cache
- name: Cache build output (main)
@@ -252,7 +252,7 @@ jobs:
!./target/*/*.zip
!./target/*/*.tar.gz
key: |
- 9-cargo-target-${{ matrix.os }}-${{ matrix.profile }}-${{ github.sha }}
+ 7-cargo-target-${{ matrix.os }}-${{ matrix.profile }}-${{ github.sha }}
# Restore cache from the latest 'main' branch build.
- name: Cache build output (PR)
@@ -268,7 +268,7 @@ jobs:
!./target/*/*.tar.gz
key: never_saved
restore-keys: |
- 9-cargo-target-${{ matrix.os }}-${{ matrix.profile }}-
+ 7-cargo-target-${{ matrix.os }}-${{ matrix.profile }}-
# Don't save cache after building PRs or branches other than 'main'.
- name: Skip save cache (PR)
diff --git a/Cargo.lock b/Cargo.lock
index be295c4c2..e7008286b 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -964,7 +964,6 @@ dependencies = [
name = "deno_http"
version = "0.41.0"
dependencies = [
- "async-compression",
"base64 0.13.0",
"brotli",
"bytes",
diff --git a/cli/tests/unit/http_test.ts b/cli/tests/unit/http_test.ts
index 5fabd40fe..37c827b9b 100644
--- a/cli/tests/unit/http_test.ts
+++ b/cli/tests/unit/http_test.ts
@@ -854,45 +854,6 @@ Deno.test({ permissions: { net: true } }, async function httpServerPanic() {
listener.close();
});
-Deno.test(
- { permissions: { net: true, write: true, read: true } },
- async function httpServerClosedStream() {
- const listener = Deno.listen({ port: 4502 });
-
- const client = await Deno.connect({ port: 4502 });
- await client.write(new TextEncoder().encode(
- `GET / HTTP/1.0\r\n\r\n`,
- ));
-
- const conn = await listener.accept();
- const httpConn = Deno.serveHttp(conn);
- const ev = await httpConn.nextRequest();
- const { respondWith } = ev!;
-
- const tmpFile = await Deno.makeTempFile();
- const file = await Deno.open(tmpFile, { write: true, read: true });
- await file.write(new TextEncoder().encode("hello"));
-
- const reader = await file.readable.getReader();
- while (true) {
- const { done, value } = await reader.read();
- if (done) break;
- assert(value);
- }
-
- try {
- await respondWith(new Response(file.readable));
- fail("The stream should've been locked");
- } catch {
- // pass
- }
-
- httpConn.close();
- listener.close();
- client.close();
- },
-);
-
// https://github.com/denoland/deno/issues/11595
Deno.test(
{ permissions: { net: true } },
@@ -1224,25 +1185,26 @@ Deno.test(
const decoder = new TextDecoder();
Deno.test({
- name: "http server compresses body - check headers",
+ name: "http server compresses body",
permissions: { net: true, run: true },
async fn() {
const hostname = "localhost";
const port = 4501;
- const listener = Deno.listen({ hostname, port });
-
- const data = { hello: "deno", now: "with", compressed: "body" };
async function server() {
+ const listener = Deno.listen({ hostname, port });
const tcpConn = await listener.accept();
const httpConn = Deno.serveHttp(tcpConn);
const e = await httpConn.nextRequest();
assert(e);
const { request, respondWith } = e;
assertEquals(request.headers.get("Accept-Encoding"), "gzip, deflate, br");
- const response = new Response(JSON.stringify(data), {
- headers: { "content-type": "application/json" },
- });
+ const response = new Response(
+ JSON.stringify({ hello: "deno", now: "with", compressed: "body" }),
+ {
+ headers: { "content-type": "application/json" },
+ },
+ );
await respondWith(response);
httpConn.close();
listener.close();
@@ -1274,60 +1236,6 @@ Deno.test({
});
Deno.test({
- name: "http server compresses body - check body",
- permissions: { net: true, run: true },
- async fn() {
- const hostname = "localhost";
- const port = 4501;
- const listener = Deno.listen({ hostname, port });
-
- const data = { hello: "deno", now: "with", compressed: "body" };
-
- async function server() {
- const tcpConn = await listener.accept();
- const httpConn = Deno.serveHttp(tcpConn);
- const e = await httpConn.nextRequest();
- assert(e);
- const { request, respondWith } = e;
- assertEquals(request.headers.get("Accept-Encoding"), "gzip, deflate, br");
- const response = new Response(JSON.stringify(data), {
- headers: { "content-type": "application/json" },
- });
- await respondWith(response);
- httpConn.close();
- listener.close();
- }
-
- async function client() {
- const url = `http://${hostname}:${port}/`;
- const cmd = [
- "curl",
- "--request",
- "GET",
- "--url",
- url,
- "--header",
- "Accept-Encoding: gzip, deflate, br",
- ];
- const proc = Deno.run({ cmd, stdout: "piped", stderr: "null" });
- const status = await proc.status();
- assert(status.success);
- const stdout = proc.stdout!.readable
- .pipeThrough(new DecompressionStream("gzip"))
- .pipeThrough(new TextDecoderStream());
- let body = "";
- for await (const chunk of stdout) {
- body += chunk;
- }
- assertEquals(JSON.parse(body), data);
- proc.close();
- }
-
- await Promise.all([server(), client()]);
- },
-});
-
-Deno.test({
name: "http server doesn't compress small body",
permissions: { net: true, run: true },
async fn() {
@@ -1706,18 +1614,15 @@ Deno.test({
});
Deno.test({
- name: "http server compresses streamed bodies - check headers",
+ name: "http server doesn't compress streamed bodies",
permissions: { net: true, run: true },
async fn() {
const hostname = "localhost";
const port = 4501;
- const encoder = new TextEncoder();
- const listener = Deno.listen({ hostname, port });
-
- const data = { hello: "deno", now: "with", compressed: "body" };
-
async function server() {
+ const encoder = new TextEncoder();
+ const listener = Deno.listen({ hostname, port });
const tcpConn = await listener.accept();
const httpConn = Deno.serveHttp(tcpConn);
const e = await httpConn.nextRequest();
@@ -1726,13 +1631,23 @@ Deno.test({
assertEquals(request.headers.get("Accept-Encoding"), "gzip, deflate, br");
const bodyInit = new ReadableStream({
start(controller) {
- controller.enqueue(encoder.encode(JSON.stringify(data)));
+ controller.enqueue(
+ encoder.encode(
+ JSON.stringify({
+ hello: "deno",
+ now: "with",
+ compressed: "body",
+ }),
+ ),
+ );
controller.close();
},
});
const response = new Response(
bodyInit,
- { headers: { "content-type": "application/json" } },
+ {
+ headers: { "content-type": "application/json", vary: "Accept" },
+ },
);
await respondWith(response);
httpConn.close();
@@ -1755,71 +1670,8 @@ Deno.test({
const status = await proc.status();
assert(status.success);
const output = decoder.decode(await proc.output());
- assert(output.includes("vary: Accept-Encoding\r\n"));
- assert(output.includes("content-encoding: gzip\r\n"));
- proc.close();
- }
-
- await Promise.all([server(), client()]);
- },
-});
-
-Deno.test({
- name: "http server compresses streamed bodies - check body",
- permissions: { net: true, run: true },
- async fn() {
- const hostname = "localhost";
- const port = 4501;
-
- const encoder = new TextEncoder();
- const listener = Deno.listen({ hostname, port });
-
- const data = { hello: "deno", now: "with", compressed: "body" };
-
- async function server() {
- const tcpConn = await listener.accept();
- const httpConn = Deno.serveHttp(tcpConn);
- const e = await httpConn.nextRequest();
- assert(e);
- const { request, respondWith } = e;
- assertEquals(request.headers.get("Accept-Encoding"), "gzip, deflate, br");
- const bodyInit = new ReadableStream({
- start(controller) {
- controller.enqueue(encoder.encode(JSON.stringify(data)));
- controller.close();
- },
- });
- const response = new Response(
- bodyInit,
- { headers: { "content-type": "application/json" } },
- );
- await respondWith(response);
- httpConn.close();
- listener.close();
- }
-
- async function client() {
- const url = `http://${hostname}:${port}/`;
- const cmd = [
- "curl",
- "--request",
- "GET",
- "--url",
- url,
- "--header",
- "Accept-Encoding: gzip, deflate, br",
- ];
- const proc = Deno.run({ cmd, stdout: "piped", stderr: "null" });
- const status = await proc.status();
- assert(status.success);
- const stdout = proc.stdout.readable
- .pipeThrough(new DecompressionStream("gzip"))
- .pipeThrough(new TextDecoderStream());
- let body = "";
- for await (const chunk of stdout) {
- body += chunk;
- }
- assertEquals(JSON.parse(body), data);
+ assert(output.includes("vary: Accept\r\n"));
+ assert(!output.includes("content-encoding: "));
proc.close();
}
@@ -1884,6 +1736,8 @@ Deno.test({
// Ensure the content-length header is updated.
assert(!output.includes(`content-length: ${contentLength}\r\n`));
assert(output.includes("content-length: 72\r\n"));
+ console.log(output);
+
proc.close();
}
diff --git a/core/examples/http_bench_json_ops.rs b/core/examples/http_bench_json_ops.rs
index 7c895f326..2068c3b85 100644
--- a/core/examples/http_bench_json_ops.rs
+++ b/core/examples/http_bench_json_ops.rs
@@ -83,18 +83,13 @@ struct TcpStream {
}
impl TcpStream {
- async fn read(
- self: Rc<Self>,
- mut buf: ZeroCopyBuf,
- ) -> Result<(usize, ZeroCopyBuf), Error> {
+ async fn read(self: Rc<Self>, mut buf: ZeroCopyBuf) -> Result<usize, Error> {
let mut rd = RcRef::map(&self, |r| &r.rd).borrow_mut().await;
let cancel = RcRef::map(self, |r| &r.cancel);
- let nread = rd
- .read(&mut buf)
+ rd.read(&mut buf)
.try_or_cancel(cancel)
.await
- .map_err(Error::from)?;
- Ok((nread, buf))
+ .map_err(Error::from)
}
async fn write(self: Rc<Self>, buf: ZeroCopyBuf) -> Result<usize, Error> {
@@ -104,10 +99,7 @@ impl TcpStream {
}
impl Resource for TcpStream {
- fn read_return(
- self: Rc<Self>,
- buf: ZeroCopyBuf,
- ) -> AsyncResult<(usize, ZeroCopyBuf)> {
+ fn read(self: Rc<Self>, buf: ZeroCopyBuf) -> AsyncResult<usize> {
Box::pin(self.read(buf))
}
diff --git a/core/resources.rs b/core/resources.rs
index ae4ef7394..9a1447392 100644
--- a/core/resources.rs
+++ b/core/resources.rs
@@ -36,17 +36,7 @@ pub trait Resource: Any + 'static {
}
/// Resources may implement `read()` to be a readable stream
- fn read(self: Rc<Self>, buf: ZeroCopyBuf) -> AsyncResult<usize> {
- Box::pin(async move {
- let (nread, _) = self.read_return(buf).await?;
- Ok(nread)
- })
- }
-
- fn read_return(
- self: Rc<Self>,
- _buf: ZeroCopyBuf,
- ) -> AsyncResult<(usize, ZeroCopyBuf)> {
+ fn read(self: Rc<Self>, _buf: ZeroCopyBuf) -> AsyncResult<usize> {
Box::pin(futures::future::err(not_supported()))
}
diff --git a/ext/fetch/lib.rs b/ext/fetch/lib.rs
index def823d8f..c216d53fa 100644
--- a/ext/fetch/lib.rs
+++ b/ext/fetch/lib.rs
@@ -485,15 +485,12 @@ impl Resource for FetchResponseBodyResource {
"fetchResponseBody".into()
}
- fn read_return(
- self: Rc<Self>,
- mut buf: ZeroCopyBuf,
- ) -> AsyncResult<(usize, ZeroCopyBuf)> {
+ fn read(self: Rc<Self>, mut buf: ZeroCopyBuf) -> AsyncResult<usize> {
Box::pin(async move {
let mut reader = RcRef::map(&self, |r| &r.reader).borrow_mut().await;
let cancel = RcRef::map(self, |r| &r.cancel);
let read = reader.read(&mut buf).try_or_cancel(cancel).await?;
- Ok((read, buf))
+ Ok(read)
})
}
diff --git a/ext/http/01_http.js b/ext/http/01_http.js
index ff4b6f41f..217bfc061 100644
--- a/ext/http/01_http.js
+++ b/ext/http/01_http.js
@@ -32,8 +32,7 @@
} = window.__bootstrap.webSocket;
const { TcpConn, UnixConn } = window.__bootstrap.net;
const { TlsConn } = window.__bootstrap.tls;
- const { Deferred, getReadableStreamRid, readableStreamClose } =
- window.__bootstrap.streams;
+ const { Deferred } = window.__bootstrap.streams;
const {
ArrayPrototypeIncludes,
ArrayPrototypePush,
@@ -236,6 +235,7 @@
typeof respBody === "string" ||
ObjectPrototypeIsPrototypeOf(Uint8ArrayPrototype, respBody)
);
+
try {
await core.opAsync(
"op_http_write_headers",
@@ -269,50 +269,35 @@
) {
throw new TypeError("Unreachable");
}
- const resourceRid = getReadableStreamRid(respBody);
- if (resourceRid) {
- if (respBody.locked) {
- throw new TypeError("ReadableStream is locked.");
+ const reader = respBody.getReader();
+ while (true) {
+ const { value, done } = await reader.read();
+ if (done) break;
+ if (!ObjectPrototypeIsPrototypeOf(Uint8ArrayPrototype, value)) {
+ await reader.cancel(new TypeError("Value not a Uint8Array"));
+ break;
}
- const _reader = respBody.getReader(); // Aquire JS lock.
- await core.opAsync(
- "op_http_write_resource",
- streamRid,
- resourceRid,
- );
- readableStreamClose(respBody); // Release JS lock.
- } else {
- const reader = respBody.getReader();
- while (true) {
- const { value, done } = await reader.read();
- if (done) break;
- if (!ObjectPrototypeIsPrototypeOf(Uint8ArrayPrototype, value)) {
- await reader.cancel(new TypeError("Value not a Uint8Array"));
- break;
- }
- try {
- await core.opAsync("op_http_write", streamRid, value);
- } catch (error) {
- const connError = httpConn[connErrorSymbol];
- if (
- ObjectPrototypeIsPrototypeOf(BadResourcePrototype, error) &&
- connError != null
- ) {
- // deno-lint-ignore no-ex-assign
- error = new connError.constructor(connError.message);
- }
- await reader.cancel(error);
- throw error;
- }
- }
-
try {
- await core.opAsync("op_http_shutdown", streamRid);
+ await core.opAsync("op_http_write", streamRid, value);
} catch (error) {
+ const connError = httpConn[connErrorSymbol];
+ if (
+ ObjectPrototypeIsPrototypeOf(BadResourcePrototype, error) &&
+ connError != null
+ ) {
+ // deno-lint-ignore no-ex-assign
+ error = new connError.constructor(connError.message);
+ }
await reader.cancel(error);
throw error;
}
}
+ try {
+ await core.opAsync("op_http_shutdown", streamRid);
+ } catch (error) {
+ await reader.cancel(error);
+ throw error;
+ }
}
const deferred = request[_deferred];
diff --git a/ext/http/Cargo.toml b/ext/http/Cargo.toml
index b4a208228..2bdbfdade 100644
--- a/ext/http/Cargo.toml
+++ b/ext/http/Cargo.toml
@@ -14,7 +14,6 @@ description = "HTTP server implementation for Deno"
path = "lib.rs"
[dependencies]
-async-compression = { version = "0.3.1", features = ["tokio", "brotli", "gzip"] }
base64 = "0.13.0"
brotli = "3.3.3"
bytes = "1"
diff --git a/ext/http/lib.rs b/ext/http/lib.rs
index a6f47c1c9..9c0109937 100644
--- a/ext/http/lib.rs
+++ b/ext/http/lib.rs
@@ -1,7 +1,6 @@
// Copyright 2018-2022 the Deno authors. All rights reserved. MIT license.
-use async_compression::tokio::write::BrotliEncoder;
-use async_compression::tokio::write::GzipEncoder;
+use bytes::Bytes;
use cache_control::CacheControl;
use deno_core::error::custom_error;
use deno_core::error::AnyError;
@@ -22,6 +21,7 @@ use deno_core::futures::StreamExt;
use deno_core::futures::TryFutureExt;
use deno_core::include_js_files;
use deno_core::op;
+
use deno_core::AsyncRefCell;
use deno_core::ByteString;
use deno_core::CancelFuture;
@@ -60,9 +60,7 @@ use std::task::Context;
use std::task::Poll;
use tokio::io::AsyncRead;
use tokio::io::AsyncWrite;
-use tokio::io::AsyncWriteExt;
use tokio::task::spawn_local;
-use tokio_util::io::ReaderStream;
mod compressible;
@@ -77,7 +75,6 @@ pub fn init() -> Extension {
op_http_read::decl(),
op_http_write_headers::decl(),
op_http_write::decl(),
- op_http_write_resource::decl(),
op_http_shutdown::decl(),
op_http_websocket_accept_header::decl(),
op_http_upgrade_websocket::decl(),
@@ -341,7 +338,7 @@ impl Default for HttpRequestReader {
/// The write half of an HTTP stream.
enum HttpResponseWriter {
Headers(oneshot::Sender<Response<Body>>),
- Body(Pin<Box<dyn tokio::io::AsyncWrite>>),
+ Body(hyper::body::Sender),
Closed,
}
@@ -548,60 +545,55 @@ async fn op_http_write_headers(
let body: Response<Body>;
let new_wr: HttpResponseWriter;
- // Set Vary: Accept-Encoding header for direct body response.
- // Note: we set the header irrespective of whether or not we compress the data
- // to make sure cache services do not serve uncompressed data to clients that
- // support compression.
- let vary_value = if let Some(value) = vary_header {
- if let Ok(value_str) = std::str::from_utf8(value.as_slice()) {
- if !value_str.to_lowercase().contains("accept-encoding") {
- format!("Accept-Encoding, {}", value_str)
- } else {
- value_str.to_string()
- }
- } else {
- // the header value wasn't valid UTF8, so it would have been a
- // problem anyways, so sending a default header.
- "Accept-Encoding".to_string()
- }
- } else {
- "Accept-Encoding".to_string()
- };
- builder = builder.header("vary", &vary_value);
-
- let accepts_compression = matches!(
- *stream.accept_encoding.borrow(),
- Encoding::Brotli | Encoding::Gzip
- );
- let should_compress = body_compressible
- && (matches!(data, Some(ref data) if data.len() > 20) || data.is_none())
- && accepts_compression;
-
- if should_compress {
- // If user provided a ETag header for uncompressed data, we need to
- // ensure it is a Weak Etag header ("W/").
- if let Some(value) = etag_header {
- if let Ok(value_str) = std::str::from_utf8(value.as_slice()) {
- if !value_str.starts_with("W/") {
- builder = builder.header("etag", format!("W/{}", value_str));
+ match data {
+ Some(data) => {
+ // Set Vary: Accept-Encoding header for direct body response.
+ // Note: we set the header irrespective of whether or not we compress the
+ // data to make sure cache services do not serve uncompressed data to
+ // clients that support compression.
+ let vary_value = if let Some(value) = vary_header {
+ if let Ok(value_str) = std::str::from_utf8(value.as_slice()) {
+ if !value_str.to_lowercase().contains("accept-encoding") {
+ format!("Accept-Encoding, {}", value_str)
+ } else {
+ value_str.to_string()
+ }
} else {
- builder = builder.header("etag", value.as_slice());
+ // the header value wasn't valid UTF8, so it would have been a
+ // problem anyways, so sending a default header.
+ "Accept-Encoding".to_string()
}
} else {
- builder = builder.header("etag", value.as_slice());
- }
- }
- } else if let Some(value) = etag_header {
- builder = builder.header("etag", value.as_slice());
- }
+ "Accept-Encoding".to_string()
+ };
+ builder = builder.header("vary", &vary_value);
+
+ let accepts_compression = matches!(
+ *stream.accept_encoding.borrow(),
+ Encoding::Brotli | Encoding::Gzip
+ );
+
+ let should_compress =
+ body_compressible && data.len() > 20 && accepts_compression;
- match data {
- Some(data) => {
if should_compress {
// Drop 'content-length' header. Hyper will update it using compressed body.
if let Some(headers) = builder.headers_mut() {
headers.remove("content-length");
}
+ // If user provided a ETag header for uncompressed data, we need to
+ // ensure it is a Weak Etag header ("W/").
+ if let Some(value) = etag_header {
+ if let Ok(value_str) = std::str::from_utf8(value.as_slice()) {
+ if !value_str.starts_with("W/") {
+ builder = builder.header("etag", format!("W/{}", value_str));
+ } else {
+ builder = builder.header("etag", value.as_slice());
+ }
+ } else {
+ builder = builder.header("etag", value.as_slice());
+ }
+ }
match *stream.accept_encoding.borrow() {
Encoding::Brotli => {
@@ -629,6 +621,9 @@ async fn op_http_write_headers(
}
}
} else {
+ if let Some(value) = etag_header {
+ builder = builder.header("etag", value.as_slice());
+ }
// If a buffer was passed, but isn't compressible, we use it to
// construct a response body.
body = builder.body(data.into_bytes().into())?;
@@ -638,35 +633,19 @@ async fn op_http_write_headers(
None => {
// If no buffer was passed, the caller will stream the response body.
- // Create a one way pipe that implements tokio's async io traits. To do
- // this we create a [tokio::io::DuplexStream], but then throw away one
- // of the directions to create a one way pipe.
- let (a, b) = tokio::io::duplex(64 * 1024);
- let (reader, _) = tokio::io::split(a);
- let (_, writer) = tokio::io::split(b);
+ // TODO(@kitsonk) had compression for streamed bodies.
- let writer_body: Pin<Box<dyn tokio::io::AsyncWrite>>;
-
- if should_compress {
- match *stream.accept_encoding.borrow() {
- Encoding::Brotli => {
- let writer = BrotliEncoder::new(writer);
- writer_body = Box::pin(writer);
- builder = builder.header("content-encoding", "br");
- }
- _ => {
- assert_eq!(*stream.accept_encoding.borrow(), Encoding::Gzip);
- let writer = GzipEncoder::new(writer);
- writer_body = Box::pin(writer);
- builder = builder.header("content-encoding", "gzip");
- }
- }
- } else {
- writer_body = Box::pin(writer);
+ // Set the user provided ETag & Vary headers for a streaming response
+ if let Some(value) = etag_header {
+ builder = builder.header("etag", value.as_slice());
+ }
+ if let Some(value) = vary_header {
+ builder = builder.header("vary", value.as_slice());
}
- body = builder.body(Body::wrap_stream(ReaderStream::new(reader)))?;
- new_wr = HttpResponseWriter::Body(writer_body);
+ let (body_tx, body_rx) = Body::channel();
+ body = builder.body(body_rx)?;
+ new_wr = HttpResponseWriter::Body(body_tx);
}
}
@@ -686,69 +665,6 @@ async fn op_http_write_headers(
}
#[op]
-async fn op_http_write_resource(
- state: Rc<RefCell<OpState>>,
- rid: ResourceId,
- stream: ResourceId,
-) -> Result<(), AnyError> {
- let http_stream = state
- .borrow()
- .resource_table
- .get::<HttpStreamResource>(rid)?;
- let mut wr = RcRef::map(&http_stream, |r| &r.wr).borrow_mut().await;
- let resource = state.borrow().resource_table.get_any(stream)?;
- loop {
- let body_writer = match &mut *wr {
- HttpResponseWriter::Body(body_writer) => body_writer,
- HttpResponseWriter::Headers(_) => {
- return Err(http_error("no response headers"))
- }
- HttpResponseWriter::Closed => {
- return Err(http_error("response already completed"))
- }
- };
-
- let vec = vec![0u8; 64 * 1024]; // 64KB
- let buf = ZeroCopyBuf::new_temp(vec);
- let (nread, buf) = resource.clone().read_return(buf).await?;
- if nread == 0 {
- break;
- }
-
- let mut res = body_writer.write_all(&buf).await;
- if res.is_ok() {
- res = body_writer.flush().await;
- }
- match res {
- Ok(_) => {}
- Err(err) => {
- assert_eq!(err.kind(), std::io::ErrorKind::BrokenPipe);
- // Don't return "broken pipe", that's an implementation detail.
- // Pull up the failure associated with the transport connection instead.
- http_stream.conn.closed().await?;
- // If there was no connection error, drop body_tx.
- *wr = HttpResponseWriter::Closed;
- }
- }
- }
-
- let wr = take(&mut *wr);
- if let HttpResponseWriter::Body(mut body_writer) = wr {
- match body_writer.shutdown().await {
- Ok(_) => {}
- Err(err) => {
- assert_eq!(err.kind(), std::io::ErrorKind::BrokenPipe);
- // Don't return "broken pipe", that's an implementation detail.
- // Pull up the failure associated with the transport connection instead.
- http_stream.conn.closed().await?;
- }
- }
- }
-
- Ok(())
-}
-
-#[op]
async fn op_http_write(
state: Rc<RefCell<OpState>>,
rid: ResourceId,
@@ -761,7 +677,7 @@ async fn op_http_write(
let mut wr = RcRef::map(&stream, |r| &r.wr).borrow_mut().await;
loop {
- let body_writer = match &mut *wr {
+ let body_tx = match &mut *wr {
HttpResponseWriter::Body(body_tx) => body_tx,
HttpResponseWriter::Headers(_) => {
break Err(http_error("no response headers"))
@@ -771,17 +687,13 @@ async fn op_http_write(
}
};
- let mut res = body_writer.write_all(&buf).await;
- if res.is_ok() {
- res = body_writer.flush().await;
- }
-
- match res {
+ let bytes = Bytes::copy_from_slice(&buf[..]);
+ match body_tx.send_data(bytes).await {
Ok(_) => break Ok(()),
Err(err) => {
- assert_eq!(err.kind(), std::io::ErrorKind::BrokenPipe);
- // Don't return "broken pipe", that's an implementation detail.
+ // Don't return "channel closed", that's an implementation detail.
// Pull up the failure associated with the transport connection instead.
+ assert!(err.is_closed());
stream.conn.closed().await?;
// If there was no connection error, drop body_tx.
*wr = HttpResponseWriter::Closed;
@@ -803,18 +715,7 @@ async fn op_http_shutdown(
.resource_table
.get::<HttpStreamResource>(rid)?;
let mut wr = RcRef::map(&stream, |r| &r.wr).borrow_mut().await;
- let wr = take(&mut *wr);
- if let HttpResponseWriter::Body(mut body_writer) = wr {
- match body_writer.shutdown().await {
- Ok(_) => {}
- Err(err) => {
- assert_eq!(err.kind(), std::io::ErrorKind::BrokenPipe);
- // Don't return "broken pipe", that's an implementation detail.
- // Pull up the failure associated with the transport connection instead.
- stream.conn.closed().await?;
- }
- }
- }
+ take(&mut *wr);
Ok(())
}
diff --git a/ext/net/01_net.js b/ext/net/01_net.js
index fde75fe56..48cbfaaab 100644
--- a/ext/net/01_net.js
+++ b/ext/net/01_net.js
@@ -4,7 +4,7 @@
((window) => {
const core = window.Deno.core;
const { BadResourcePrototype, InterruptedPrototype } = core;
- const { WritableStream, readableStreamForRid } = window.__bootstrap.streams;
+ const { ReadableStream, WritableStream } = window.__bootstrap.streams;
const {
Error,
ObjectPrototypeIsPrototypeOf,
@@ -65,6 +65,8 @@
return core.opAsync("op_dns_resolve", { query, recordType, options });
}
+ const DEFAULT_CHUNK_SIZE = 64 * 1024;
+
function tryClose(rid) {
try {
core.close(rid);
@@ -73,6 +75,32 @@
}
}
+ function readableStreamForRid(rid) {
+ return new ReadableStream({
+ type: "bytes",
+ async pull(controller) {
+ const v = controller.byobRequest.view;
+ try {
+ const bytesRead = await read(rid, v);
+ if (bytesRead === null) {
+ tryClose(rid);
+ controller.close();
+ controller.byobRequest.respond(0);
+ } else {
+ controller.byobRequest.respond(bytesRead);
+ }
+ } catch (e) {
+ controller.error(e);
+ tryClose(rid);
+ }
+ },
+ cancel() {
+ tryClose(rid);
+ },
+ autoAllocateChunkSize: DEFAULT_CHUNK_SIZE,
+ });
+ }
+
function writableStreamForRid(rid) {
return new WritableStream({
async write(chunk, controller) {
diff --git a/ext/net/io.rs b/ext/net/io.rs
index 02caf7473..17b86af17 100644
--- a/ext/net/io.rs
+++ b/ext/net/io.rs
@@ -70,13 +70,13 @@ where
pub async fn read(
self: Rc<Self>,
mut buf: ZeroCopyBuf,
- ) -> Result<(usize, ZeroCopyBuf), AnyError> {
+ ) -> Result<usize, AnyError> {
let mut rd = self.rd_borrow_mut().await;
let nread = rd
.read(&mut buf)
.try_or_cancel(self.cancel_handle())
.await?;
- Ok((nread, buf))
+ Ok(nread)
}
pub async fn write(
@@ -103,10 +103,7 @@ impl Resource for TcpStreamResource {
"tcpStream".into()
}
- fn read_return(
- self: Rc<Self>,
- buf: ZeroCopyBuf,
- ) -> AsyncResult<(usize, ZeroCopyBuf)> {
+ fn read(self: Rc<Self>, buf: ZeroCopyBuf) -> AsyncResult<usize> {
Box::pin(self.read(buf))
}
@@ -163,7 +160,7 @@ impl UnixStreamResource {
pub async fn read(
self: Rc<Self>,
_buf: ZeroCopyBuf,
- ) -> Result<(usize, ZeroCopyBuf), AnyError> {
+ ) -> Result<usize, AnyError> {
unreachable!()
}
pub async fn write(
@@ -185,10 +182,7 @@ impl Resource for UnixStreamResource {
"unixStream".into()
}
- fn read_return(
- self: Rc<Self>,
- buf: ZeroCopyBuf,
- ) -> AsyncResult<(usize, ZeroCopyBuf)> {
+ fn read(self: Rc<Self>, buf: ZeroCopyBuf) -> AsyncResult<usize> {
Box::pin(self.read(buf))
}
diff --git a/ext/net/ops_tls.rs b/ext/net/ops_tls.rs
index ca922203c..d6b83e6e8 100644
--- a/ext/net/ops_tls.rs
+++ b/ext/net/ops_tls.rs
@@ -674,11 +674,11 @@ impl TlsStreamResource {
pub async fn read(
self: Rc<Self>,
mut buf: ZeroCopyBuf,
- ) -> Result<(usize, ZeroCopyBuf), AnyError> {
+ ) -> Result<usize, AnyError> {
let mut rd = RcRef::map(&self, |r| &r.rd).borrow_mut().await;
let cancel_handle = RcRef::map(&self, |r| &r.cancel_handle);
let nread = rd.read(&mut buf).try_or_cancel(cancel_handle).await?;
- Ok((nread, buf))
+ Ok(nread)
}
pub async fn write(
@@ -722,10 +722,7 @@ impl Resource for TlsStreamResource {
"tlsStream".into()
}
- fn read_return(
- self: Rc<Self>,
- buf: ZeroCopyBuf,
- ) -> AsyncResult<(usize, ZeroCopyBuf)> {
+ fn read(self: Rc<Self>, buf: ZeroCopyBuf) -> AsyncResult<usize> {
Box::pin(self.read(buf))
}
diff --git a/ext/web/06_streams.js b/ext/web/06_streams.js
index 492694563..6daea0898 100644
--- a/ext/web/06_streams.js
+++ b/ext/web/06_streams.js
@@ -8,7 +8,6 @@
"use strict";
((window) => {
- const core = window.Deno.core;
const webidl = window.__bootstrap.webidl;
const { add, remove, signalAbort, newSignal, AbortSignalPrototype } =
window.__bootstrap.abortSignal;
@@ -641,41 +640,6 @@
return stream[_disturbed];
}
- const DEFAULT_CHUNK_SIZE = 64 * 1024; // 64 KiB
-
- function readableStreamForRid(rid) {
- const stream = new ReadableStream({
- type: "bytes",
- async pull(controller) {
- const v = controller.byobRequest.view;
- try {
- const bytesRead = await core.read(rid, v);
- if (bytesRead === 0) {
- core.tryClose(rid);
- controller.close();
- controller.byobRequest.respond(0);
- } else {
- controller.byobRequest.respond(bytesRead);
- }
- } catch (e) {
- controller.error(e);
- core.tryClose(rid);
- }
- },
- cancel() {
- core.tryClose(rid);
- },
- autoAllocateChunkSize: DEFAULT_CHUNK_SIZE,
- });
-
- stream[_maybeRid] = rid;
- return stream;
- }
-
- function getReadableStreamRid(stream) {
- return stream[_maybeRid];
- }
-
/**
* @param {unknown} value
* @returns {value is WritableStream}
@@ -4324,7 +4288,6 @@
WeakMapPrototypeSet(countSizeFunctionWeakMap, globalObject, size);
}
- const _maybeRid = Symbol("[[maybeRid]]");
/** @template R */
class ReadableStream {
/** @type {ReadableStreamDefaultController | ReadableByteStreamController} */
@@ -4339,8 +4302,6 @@
[_state];
/** @type {any} */
[_storedError];
- /** @type {number | null} */
- [_maybeRid] = null;
/**
* @param {UnderlyingSource<R>=} underlyingSource
@@ -5879,9 +5840,6 @@
errorReadableStream,
createProxy,
writableStreamClose,
- readableStreamClose,
- readableStreamForRid,
- getReadableStreamRid,
Deferred,
// Exposed in global runtime scope
ByteLengthQueuingStrategy,
diff --git a/runtime/js/40_files.js b/runtime/js/40_files.js
index d2148be2f..8aa0a4972 100644
--- a/runtime/js/40_files.js
+++ b/runtime/js/40_files.js
@@ -6,8 +6,8 @@
const { read, readSync, write, writeSync } = window.__bootstrap.io;
const { ftruncate, ftruncateSync, fstat, fstatSync } = window.__bootstrap.fs;
const { pathFromURL } = window.__bootstrap.util;
- const { writableStreamForRid } = window.__bootstrap.streamUtils;
- const { readableStreamForRid } = window.__bootstrap.streams;
+ const { readableStreamForRid, writableStreamForRid } =
+ window.__bootstrap.streamUtils;
const {
ArrayPrototypeFilter,
Error,
diff --git a/runtime/ops/io.rs b/runtime/ops/io.rs
index 34cd541d5..27a48a961 100644
--- a/runtime/ops/io.rs
+++ b/runtime/ops/io.rs
@@ -174,13 +174,13 @@ where
async fn read(
self: Rc<Self>,
mut buf: ZeroCopyBuf,
- ) -> Result<(usize, ZeroCopyBuf), AnyError> {
+ ) -> Result<usize, AnyError> {
let mut rd = self.borrow_mut().await;
let nread = rd
.read(&mut buf)
.try_or_cancel(self.cancel_handle())
.await?;
- Ok((nread, buf))
+ Ok(nread)
}
pub fn into_inner(self) -> S {
@@ -211,10 +211,7 @@ impl Resource for ChildStdoutResource {
"childStdout".into()
}
- fn read_return(
- self: Rc<Self>,
- buf: ZeroCopyBuf,
- ) -> AsyncResult<(usize, ZeroCopyBuf)> {
+ fn read(self: Rc<Self>, buf: ZeroCopyBuf) -> AsyncResult<usize> {
Box::pin(self.read(buf))
}
@@ -230,10 +227,7 @@ impl Resource for ChildStderrResource {
"childStderr".into()
}
- fn read_return(
- self: Rc<Self>,
- buf: ZeroCopyBuf,
- ) -> AsyncResult<(usize, ZeroCopyBuf)> {
+ fn read(self: Rc<Self>, buf: ZeroCopyBuf) -> AsyncResult<usize> {
Box::pin(self.read(buf))
}
@@ -277,17 +271,16 @@ impl StdFileResource {
async fn read(
self: Rc<Self>,
mut buf: ZeroCopyBuf,
- ) -> Result<(usize, ZeroCopyBuf), AnyError> {
+ ) -> Result<usize, AnyError> {
if self.fs_file.is_some() {
let fs_file = self.fs_file.as_ref().unwrap();
let std_file = fs_file.0.as_ref().unwrap().clone();
- tokio::task::spawn_blocking(
- move || -> Result<(usize, ZeroCopyBuf), AnyError> {
- let mut std_file = std_file.lock().unwrap();
- Ok((std_file.read(&mut buf)?, buf))
- },
- )
+ tokio::task::spawn_blocking(move || {
+ let mut std_file = std_file.lock().unwrap();
+ std_file.read(&mut buf)
+ })
.await?
+ .map_err(AnyError::from)
} else {
Err(resource_unavailable())
}
@@ -337,10 +330,7 @@ impl Resource for StdFileResource {
self.name.as_str().into()
}
- fn read_return(
- self: Rc<Self>,
- buf: ZeroCopyBuf,
- ) -> AsyncResult<(usize, ZeroCopyBuf)> {
+ fn read(self: Rc<Self>, buf: ZeroCopyBuf) -> AsyncResult<usize> {
Box::pin(self.read(buf))
}
diff --git a/serde_v8/magic/buffer.rs b/serde_v8/magic/buffer.rs
index a0a1c974b..484984ac5 100644
--- a/serde_v8/magic/buffer.rs
+++ b/serde_v8/magic/buffer.rs
@@ -14,28 +14,19 @@ use crate::magic::transl8::impl_magic;
pub enum MagicBuffer {
FromV8(ZeroCopyBuf),
ToV8(Mutex<Option<Box<[u8]>>>),
- // Variant of the MagicBuffer than is never exposed to the JS.
- // Generally used to pass Vec<u8> backed buffers to resource methods.
- Temp(Vec<u8>),
}
-
impl_magic!(MagicBuffer);
impl MagicBuffer {
pub fn empty() -> Self {
MagicBuffer::ToV8(Mutex::new(Some(vec![0_u8; 0].into_boxed_slice())))
}
-
- pub fn new_temp(vec: Vec<u8>) -> Self {
- MagicBuffer::Temp(vec)
- }
}
impl Clone for MagicBuffer {
fn clone(&self) -> Self {
match self {
Self::FromV8(zbuf) => Self::FromV8(zbuf.clone()),
- Self::Temp(vec) => Self::Temp(vec.clone()),
Self::ToV8(_) => panic!("Don't Clone a MagicBuffer sent to v8"),
}
}
@@ -58,7 +49,6 @@ impl Deref for MagicBuffer {
fn deref(&self) -> &[u8] {
match self {
Self::FromV8(buf) => &*buf,
- Self::Temp(vec) => &*vec,
Self::ToV8(_) => panic!("Don't Deref a MagicBuffer sent to v8"),
}
}
@@ -68,7 +58,6 @@ impl DerefMut for MagicBuffer {
fn deref_mut(&mut self) -> &mut [u8] {
match self {
Self::FromV8(buf) => &mut *buf,
- Self::Temp(vec) => &mut *vec,
Self::ToV8(_) => panic!("Don't Deref a MagicBuffer sent to v8"),
}
}
@@ -96,7 +85,6 @@ impl ToV8 for MagicBuffer {
let value: &[u8] = buf;
value.into()
}
- Self::Temp(_) => unreachable!(),
Self::ToV8(x) => x.lock().unwrap().take().expect("MagicBuffer was empty"),
};