diff options
author | Bartek IwaĆczuk <biwanczuk@gmail.com> | 2021-01-12 08:50:02 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2021-01-11 23:50:02 -0800 |
commit | 275a5c65a20529cd4a3d775b8d8c6e9b261c76b1 (patch) | |
tree | 9f861e36e70be809d5586128a24b9f7b4332e09e /op_crates/fetch/lib.rs | |
parent | 36ff7bdf575e0547fabd8957ee778cc4224d5956 (diff) |
upgrade: tokio 1.0 (#8779)
Co-authored-by: Bert Belder <bertbelder@gmail.com>
Diffstat (limited to 'op_crates/fetch/lib.rs')
-rw-r--r-- | op_crates/fetch/lib.rs | 13 |
1 files changed, 7 insertions, 6 deletions
diff --git a/op_crates/fetch/lib.rs b/op_crates/fetch/lib.rs index b47039b08..f882736f5 100644 --- a/op_crates/fetch/lib.rs +++ b/op_crates/fetch/lib.rs @@ -16,6 +16,7 @@ use deno_core::AsyncRefCell; use deno_core::BufVec; use deno_core::CancelFuture; use deno_core::CancelHandle; +use deno_core::CancelTryFuture; use deno_core::JsRuntime; use deno_core::OpState; use deno_core::RcRef; @@ -38,10 +39,10 @@ use std::io::Read; use std::path::PathBuf; use std::pin::Pin; use std::rc::Rc; -use tokio::io::stream_reader; use tokio::io::AsyncReadExt; -use tokio::io::StreamReader; use tokio::sync::mpsc; +use tokio_stream::wrappers::ReceiverStream; +use tokio_util::io::StreamReader; pub use reqwest; // Re-export reqwest @@ -157,7 +158,7 @@ where 0 => { // If no body is passed, we return a writer for streaming the body. let (tx, rx) = mpsc::channel::<std::io::Result<Vec<u8>>>(1); - request = request.body(Body::wrap_stream(rx)); + request = request.body(Body::wrap_stream(ReceiverStream::new(rx))); let request_body_rid = state.resource_table.add(FetchRequestBodyResource { @@ -247,7 +248,7 @@ pub async fn op_fetch_send( let stream: BytesStream = Box::pin(res.bytes_stream().map(|r| { r.map_err(|err| std::io::Error::new(std::io::ErrorKind::Other, err)) })); - let stream_reader = stream_reader(stream); + let stream_reader = StreamReader::new(stream); let rid = state .borrow_mut() .resource_table @@ -288,7 +289,7 @@ pub async fn op_fetch_request_write( .resource_table .get::<FetchRequestBodyResource>(rid as u32) .ok_or_else(bad_resource_id)?; - let mut body = RcRef::map(&resource, |r| &r.body).borrow_mut().await; + let body = RcRef::map(&resource, |r| &r.body).borrow_mut().await; let cancel = RcRef::map(resource, |r| &r.cancel); body.send(Ok(buf)).or_cancel(cancel).await??; @@ -321,7 +322,7 @@ pub async fn op_fetch_response_read( let mut reader = RcRef::map(&resource, |r| &r.reader).borrow_mut().await; let cancel = RcRef::map(resource, |r| &r.cancel); let mut buf = data[0].clone(); - let read = reader.read(&mut buf).or_cancel(cancel).await??; + let read = reader.read(&mut buf).try_or_cancel(cancel).await?; Ok(json!({ "read": read })) } |