summaryrefslogtreecommitdiff
path: root/op_crates/fetch/lib.rs
diff options
context:
space:
mode:
authorBartek IwaƄczuk <biwanczuk@gmail.com>2021-01-12 08:50:02 +0100
committerGitHub <noreply@github.com>2021-01-11 23:50:02 -0800
commit275a5c65a20529cd4a3d775b8d8c6e9b261c76b1 (patch)
tree9f861e36e70be809d5586128a24b9f7b4332e09e /op_crates/fetch/lib.rs
parent36ff7bdf575e0547fabd8957ee778cc4224d5956 (diff)
upgrade: tokio 1.0 (#8779)
Co-authored-by: Bert Belder <bertbelder@gmail.com>
Diffstat (limited to 'op_crates/fetch/lib.rs')
-rw-r--r--op_crates/fetch/lib.rs13
1 files changed, 7 insertions, 6 deletions
diff --git a/op_crates/fetch/lib.rs b/op_crates/fetch/lib.rs
index b47039b08..f882736f5 100644
--- a/op_crates/fetch/lib.rs
+++ b/op_crates/fetch/lib.rs
@@ -16,6 +16,7 @@ use deno_core::AsyncRefCell;
use deno_core::BufVec;
use deno_core::CancelFuture;
use deno_core::CancelHandle;
+use deno_core::CancelTryFuture;
use deno_core::JsRuntime;
use deno_core::OpState;
use deno_core::RcRef;
@@ -38,10 +39,10 @@ use std::io::Read;
use std::path::PathBuf;
use std::pin::Pin;
use std::rc::Rc;
-use tokio::io::stream_reader;
use tokio::io::AsyncReadExt;
-use tokio::io::StreamReader;
use tokio::sync::mpsc;
+use tokio_stream::wrappers::ReceiverStream;
+use tokio_util::io::StreamReader;
pub use reqwest; // Re-export reqwest
@@ -157,7 +158,7 @@ where
0 => {
// If no body is passed, we return a writer for streaming the body.
let (tx, rx) = mpsc::channel::<std::io::Result<Vec<u8>>>(1);
- request = request.body(Body::wrap_stream(rx));
+ request = request.body(Body::wrap_stream(ReceiverStream::new(rx)));
let request_body_rid =
state.resource_table.add(FetchRequestBodyResource {
@@ -247,7 +248,7 @@ pub async fn op_fetch_send(
let stream: BytesStream = Box::pin(res.bytes_stream().map(|r| {
r.map_err(|err| std::io::Error::new(std::io::ErrorKind::Other, err))
}));
- let stream_reader = stream_reader(stream);
+ let stream_reader = StreamReader::new(stream);
let rid = state
.borrow_mut()
.resource_table
@@ -288,7 +289,7 @@ pub async fn op_fetch_request_write(
.resource_table
.get::<FetchRequestBodyResource>(rid as u32)
.ok_or_else(bad_resource_id)?;
- let mut body = RcRef::map(&resource, |r| &r.body).borrow_mut().await;
+ let body = RcRef::map(&resource, |r| &r.body).borrow_mut().await;
let cancel = RcRef::map(resource, |r| &r.cancel);
body.send(Ok(buf)).or_cancel(cancel).await??;
@@ -321,7 +322,7 @@ pub async fn op_fetch_response_read(
let mut reader = RcRef::map(&resource, |r| &r.reader).borrow_mut().await;
let cancel = RcRef::map(resource, |r| &r.cancel);
let mut buf = data[0].clone();
- let read = reader.read(&mut buf).or_cancel(cancel).await??;
+ let read = reader.read(&mut buf).try_or_cancel(cancel).await?;
Ok(json!({ "read": read }))
}