diff options
Diffstat (limited to 'op_crates/fetch')
-rw-r--r-- | op_crates/fetch/Cargo.toml | 9 | ||||
-rw-r--r-- | op_crates/fetch/lib.rs | 13 |
2 files changed, 12 insertions, 10 deletions
diff --git a/op_crates/fetch/Cargo.toml b/op_crates/fetch/Cargo.toml index 6916685fb..9c088ac60 100644 --- a/op_crates/fetch/Cargo.toml +++ b/op_crates/fetch/Cargo.toml @@ -14,9 +14,10 @@ repository = "https://github.com/denoland/deno" path = "lib.rs" [dependencies] +bytes = "1.0.1" deno_core = { version = "0.75.0", path = "../../core" } - -bytes = "0.5.6" -reqwest = { version = "0.10.8", default-features = false, features = ["rustls-tls", "stream", "gzip", "brotli"] } +reqwest = { version = "0.11.0", default-features = false, features = ["rustls-tls", "stream", "gzip", "brotli"] } serde = { version = "1.0.116", features = ["derive"] } -tokio = { version = "0.2.22", features = ["full"] }
\ No newline at end of file +tokio = { version = "1.0.1", features = ["full"] } +tokio-stream = "0.1.1" +tokio-util = "0.6.0" diff --git a/op_crates/fetch/lib.rs b/op_crates/fetch/lib.rs index b47039b08..f882736f5 100644 --- a/op_crates/fetch/lib.rs +++ b/op_crates/fetch/lib.rs @@ -16,6 +16,7 @@ use deno_core::AsyncRefCell; use deno_core::BufVec; use deno_core::CancelFuture; use deno_core::CancelHandle; +use deno_core::CancelTryFuture; use deno_core::JsRuntime; use deno_core::OpState; use deno_core::RcRef; @@ -38,10 +39,10 @@ use std::io::Read; use std::path::PathBuf; use std::pin::Pin; use std::rc::Rc; -use tokio::io::stream_reader; use tokio::io::AsyncReadExt; -use tokio::io::StreamReader; use tokio::sync::mpsc; +use tokio_stream::wrappers::ReceiverStream; +use tokio_util::io::StreamReader; pub use reqwest; // Re-export reqwest @@ -157,7 +158,7 @@ where 0 => { // If no body is passed, we return a writer for streaming the body. let (tx, rx) = mpsc::channel::<std::io::Result<Vec<u8>>>(1); - request = request.body(Body::wrap_stream(rx)); + request = request.body(Body::wrap_stream(ReceiverStream::new(rx))); let request_body_rid = state.resource_table.add(FetchRequestBodyResource { @@ -247,7 +248,7 @@ pub async fn op_fetch_send( let stream: BytesStream = Box::pin(res.bytes_stream().map(|r| { r.map_err(|err| std::io::Error::new(std::io::ErrorKind::Other, err)) })); - let stream_reader = stream_reader(stream); + let stream_reader = StreamReader::new(stream); let rid = state .borrow_mut() .resource_table @@ -288,7 +289,7 @@ pub async fn op_fetch_request_write( .resource_table .get::<FetchRequestBodyResource>(rid as u32) .ok_or_else(bad_resource_id)?; - let mut body = RcRef::map(&resource, |r| &r.body).borrow_mut().await; + let body = RcRef::map(&resource, |r| &r.body).borrow_mut().await; let cancel = RcRef::map(resource, |r| &r.cancel); body.send(Ok(buf)).or_cancel(cancel).await??; @@ -321,7 +322,7 @@ pub async fn op_fetch_response_read( let mut reader = RcRef::map(&resource, |r| &r.reader).borrow_mut().await; let cancel = RcRef::map(resource, |r| &r.cancel); let mut buf = data[0].clone(); - let read = reader.read(&mut buf).or_cancel(cancel).await??; + let read = reader.read(&mut buf).try_or_cancel(cancel).await?; Ok(json!({ "read": read })) } |