summaryrefslogtreecommitdiff
path: root/op_crates/fetch
diff options
context:
space:
mode:
Diffstat (limited to 'op_crates/fetch')
-rw-r--r--op_crates/fetch/Cargo.toml9
-rw-r--r--op_crates/fetch/lib.rs13
2 files changed, 12 insertions, 10 deletions
diff --git a/op_crates/fetch/Cargo.toml b/op_crates/fetch/Cargo.toml
index 6916685fb..9c088ac60 100644
--- a/op_crates/fetch/Cargo.toml
+++ b/op_crates/fetch/Cargo.toml
@@ -14,9 +14,10 @@ repository = "https://github.com/denoland/deno"
path = "lib.rs"
[dependencies]
+bytes = "1.0.1"
deno_core = { version = "0.75.0", path = "../../core" }
-
-bytes = "0.5.6"
-reqwest = { version = "0.10.8", default-features = false, features = ["rustls-tls", "stream", "gzip", "brotli"] }
+reqwest = { version = "0.11.0", default-features = false, features = ["rustls-tls", "stream", "gzip", "brotli"] }
serde = { version = "1.0.116", features = ["derive"] }
-tokio = { version = "0.2.22", features = ["full"] } \ No newline at end of file
+tokio = { version = "1.0.1", features = ["full"] }
+tokio-stream = "0.1.1"
+tokio-util = "0.6.0"
diff --git a/op_crates/fetch/lib.rs b/op_crates/fetch/lib.rs
index b47039b08..f882736f5 100644
--- a/op_crates/fetch/lib.rs
+++ b/op_crates/fetch/lib.rs
@@ -16,6 +16,7 @@ use deno_core::AsyncRefCell;
use deno_core::BufVec;
use deno_core::CancelFuture;
use deno_core::CancelHandle;
+use deno_core::CancelTryFuture;
use deno_core::JsRuntime;
use deno_core::OpState;
use deno_core::RcRef;
@@ -38,10 +39,10 @@ use std::io::Read;
use std::path::PathBuf;
use std::pin::Pin;
use std::rc::Rc;
-use tokio::io::stream_reader;
use tokio::io::AsyncReadExt;
-use tokio::io::StreamReader;
use tokio::sync::mpsc;
+use tokio_stream::wrappers::ReceiverStream;
+use tokio_util::io::StreamReader;
pub use reqwest; // Re-export reqwest
@@ -157,7 +158,7 @@ where
0 => {
// If no body is passed, we return a writer for streaming the body.
let (tx, rx) = mpsc::channel::<std::io::Result<Vec<u8>>>(1);
- request = request.body(Body::wrap_stream(rx));
+ request = request.body(Body::wrap_stream(ReceiverStream::new(rx)));
let request_body_rid =
state.resource_table.add(FetchRequestBodyResource {
@@ -247,7 +248,7 @@ pub async fn op_fetch_send(
let stream: BytesStream = Box::pin(res.bytes_stream().map(|r| {
r.map_err(|err| std::io::Error::new(std::io::ErrorKind::Other, err))
}));
- let stream_reader = stream_reader(stream);
+ let stream_reader = StreamReader::new(stream);
let rid = state
.borrow_mut()
.resource_table
@@ -288,7 +289,7 @@ pub async fn op_fetch_request_write(
.resource_table
.get::<FetchRequestBodyResource>(rid as u32)
.ok_or_else(bad_resource_id)?;
- let mut body = RcRef::map(&resource, |r| &r.body).borrow_mut().await;
+ let body = RcRef::map(&resource, |r| &r.body).borrow_mut().await;
let cancel = RcRef::map(resource, |r| &r.cancel);
body.send(Ok(buf)).or_cancel(cancel).await??;
@@ -321,7 +322,7 @@ pub async fn op_fetch_response_read(
let mut reader = RcRef::map(&resource, |r| &r.reader).borrow_mut().await;
let cancel = RcRef::map(resource, |r| &r.cancel);
let mut buf = data[0].clone();
- let read = reader.read(&mut buf).or_cancel(cancel).await??;
+ let read = reader.read(&mut buf).try_or_cancel(cancel).await?;
Ok(json!({ "read": read }))
}