diff options
Diffstat (limited to 'ext')
-rw-r--r-- | ext/cache/sqlite.rs | 27 | ||||
-rw-r--r-- | ext/fetch/lib.rs | 63 | ||||
-rw-r--r-- | ext/flash/lib.rs | 12 | ||||
-rw-r--r-- | ext/http/lib.rs | 110 | ||||
-rw-r--r-- | ext/net/io.rs | 57 | ||||
-rw-r--r-- | ext/net/ops_tls.rs | 30 |
6 files changed, 130 insertions, 169 deletions
diff --git a/ext/cache/sqlite.rs b/ext/cache/sqlite.rs index 7e97fb563..75aa7cc6e 100644 --- a/ext/cache/sqlite.rs +++ b/ext/cache/sqlite.rs @@ -7,7 +7,6 @@ use deno_core::AsyncRefCell; use deno_core::AsyncResult; use deno_core::ByteString; use deno_core::Resource; -use deno_core::ZeroCopyBuf; use rusqlite::params; use rusqlite::Connection; use rusqlite::OptionalExtension; @@ -347,10 +346,10 @@ pub struct CachePutResource { } impl CachePutResource { - async fn write(self: Rc<Self>, data: ZeroCopyBuf) -> Result<usize, AnyError> { + async fn write(self: Rc<Self>, data: &[u8]) -> Result<usize, AnyError> { let resource = deno_core::RcRef::map(&self, |r| &r.file); let mut file = resource.borrow_mut().await; - file.write_all(&data).await?; + file.write_all(data).await?; Ok(data.len()) } @@ -374,9 +373,7 @@ impl Resource for CachePutResource { "CachePutResource".into() } - fn write(self: Rc<Self>, buf: ZeroCopyBuf) -> AsyncResult<usize> { - Box::pin(self.write(buf)) - } + deno_core::impl_writable!(); fn shutdown(self: Rc<Self>) -> AsyncResult<()> { Box::pin(self.shutdown()) @@ -394,28 +391,20 @@ impl CacheResponseResource { } } - async fn read( - self: Rc<Self>, - mut buf: ZeroCopyBuf, - ) -> Result<(usize, ZeroCopyBuf), AnyError> { + async fn read(self: Rc<Self>, data: &mut [u8]) -> Result<usize, AnyError> { let resource = deno_core::RcRef::map(&self, |r| &r.file); let mut file = resource.borrow_mut().await; - let nread = file.read(&mut buf).await?; - Ok((nread, buf)) + let nread = file.read(data).await?; + Ok(nread) } } impl Resource for CacheResponseResource { + deno_core::impl_readable_byob!(); + fn name(&self) -> Cow<str> { "CacheResponseResource".into() } - - fn read_return( - self: Rc<Self>, - buf: ZeroCopyBuf, - ) -> AsyncResult<(usize, ZeroCopyBuf)> { - Box::pin(self.read(buf)) - } } pub fn hash(token: &str) -> String { diff --git a/ext/fetch/lib.rs b/ext/fetch/lib.rs index 0adc32343..b8f784284 100644 --- a/ext/fetch/lib.rs +++ b/ext/fetch/lib.rs @@ -5,11 +5,14 @@ mod fs_fetch_handler; use data_url::DataUrl; use deno_core::error::type_error; use deno_core::error::AnyError; +use deno_core::futures::stream::Peekable; use deno_core::futures::Future; use deno_core::futures::Stream; use deno_core::futures::StreamExt; use deno_core::include_js_files; use deno_core::op; +use deno_core::BufView; +use deno_core::WriteOutcome; use deno_core::url::Url; use deno_core::AsyncRefCell; @@ -43,15 +46,14 @@ use serde::Deserialize; use serde::Serialize; use std::borrow::Cow; use std::cell::RefCell; +use std::cmp::min; use std::convert::From; use std::path::Path; use std::path::PathBuf; use std::pin::Pin; use std::rc::Rc; -use tokio::io::AsyncReadExt; use tokio::sync::mpsc; use tokio_stream::wrappers::ReceiverStream; -use tokio_util::io::StreamReader; // Re-export reqwest and data_url pub use data_url; @@ -252,7 +254,7 @@ where match data { None => { // If no body is passed, we return a writer for streaming the body. - let (tx, rx) = mpsc::channel::<std::io::Result<Vec<u8>>>(1); + let (tx, rx) = mpsc::channel::<std::io::Result<bytes::Bytes>>(1); // If the size of the body is known, we include a content-length // header explicitly. @@ -401,12 +403,11 @@ pub async fn op_fetch_send( let stream: BytesStream = Box::pin(res.bytes_stream().map(|r| { r.map_err(|err| std::io::Error::new(std::io::ErrorKind::Other, err)) })); - let stream_reader = StreamReader::new(stream); let rid = state .borrow_mut() .resource_table .add(FetchResponseBodyResource { - reader: AsyncRefCell::new(stream_reader), + reader: AsyncRefCell::new(stream.peekable()), cancel: CancelHandle::default(), size: content_length, }); @@ -446,7 +447,7 @@ impl Resource for FetchCancelHandle { } pub struct FetchRequestBodyResource { - body: AsyncRefCell<mpsc::Sender<std::io::Result<Vec<u8>>>>, + body: AsyncRefCell<mpsc::Sender<std::io::Result<bytes::Bytes>>>, cancel: CancelHandle, } @@ -455,17 +456,16 @@ impl Resource for FetchRequestBodyResource { "fetchRequestBody".into() } - fn write(self: Rc<Self>, buf: ZeroCopyBuf) -> AsyncResult<usize> { + fn write(self: Rc<Self>, buf: BufView) -> AsyncResult<WriteOutcome> { Box::pin(async move { - let data = buf.to_vec(); - let len = data.len(); + let bytes: bytes::Bytes = buf.into(); + let nwritten = bytes.len(); let body = RcRef::map(&self, |r| &r.body).borrow_mut().await; let cancel = RcRef::map(self, |r| &r.cancel); - body.send(Ok(data)).or_cancel(cancel).await?.map_err(|_| { + body.send(Ok(bytes)).or_cancel(cancel).await?.map_err(|_| { type_error("request body receiver not connected (request closed)") })?; - - Ok(len) + Ok(WriteOutcome::Full { nwritten }) }) } @@ -478,7 +478,7 @@ type BytesStream = Pin<Box<dyn Stream<Item = Result<bytes::Bytes, std::io::Error>> + Unpin>>; struct FetchResponseBodyResource { - reader: AsyncRefCell<StreamReader<BytesStream, bytes::Bytes>>, + reader: AsyncRefCell<Peekable<BytesStream>>, cancel: CancelHandle, size: Option<u64>, } @@ -488,15 +488,36 @@ impl Resource for FetchResponseBodyResource { "fetchResponseBody".into() } - fn read_return( - self: Rc<Self>, - mut buf: ZeroCopyBuf, - ) -> AsyncResult<(usize, ZeroCopyBuf)> { + fn read(self: Rc<Self>, limit: usize) -> AsyncResult<BufView> { Box::pin(async move { - let mut reader = RcRef::map(&self, |r| &r.reader).borrow_mut().await; - let cancel = RcRef::map(self, |r| &r.cancel); - let read = reader.read(&mut buf).try_or_cancel(cancel).await?; - Ok((read, buf)) + let reader = RcRef::map(&self, |r| &r.reader).borrow_mut().await; + + let fut = async move { + let mut reader = Pin::new(reader); + loop { + match reader.as_mut().peek_mut().await { + Some(Ok(chunk)) if !chunk.is_empty() => { + let len = min(limit, chunk.len()); + let chunk = chunk.split_to(len); + break Ok(chunk.into()); + } + // This unwrap is safe because `peek_mut()` returned `Some`, and thus + // currently has a peeked value that can be synchronously returned + // from `next()`. + // + // The future returned from `next()` is always ready, so we can + // safely call `await` on it without creating a race condition. + Some(_) => match reader.as_mut().next().await.unwrap() { + Ok(chunk) => assert!(chunk.is_empty()), + Err(err) => break Err(AnyError::from(err)), + }, + None => break Ok(BufView::empty()), + } + } + }; + + let cancel_handle = RcRef::map(self, |r| &r.cancel); + fut.try_or_cancel(cancel_handle).await }) } diff --git a/ext/flash/lib.rs b/ext/flash/lib.rs index a7bd8b439..f9ce1c744 100644 --- a/ext/flash/lib.rs +++ b/ext/flash/lib.rs @@ -253,20 +253,16 @@ async fn op_flash_write_resource( .write_all(b"Transfer-Encoding: chunked\r\n\r\n") .await?; loop { - let vec = vec![0u8; 64 * 1024]; // 64KB - let buf = ZeroCopyBuf::new_temp(vec); - let (nread, buf) = resource.clone().read_return(buf).await?; - if nread == 0 { + let view = resource.clone().read(64 * 1024).await?; // 64KB + if view.is_empty() { stream.write_all(b"0\r\n\r\n").await?; break; } - - let response = &buf[..nread]; // TODO(@littledivy): use vectored writes. stream - .write_all(format!("{:x}\r\n", response.len()).as_bytes()) + .write_all(format!("{:x}\r\n", view.len()).as_bytes()) .await?; - stream.write_all(response).await?; + stream.write_all(&view).await?; stream.write_all(b"\r\n").await?; } resource.close(); diff --git a/ext/http/lib.rs b/ext/http/lib.rs index a8c2810bc..e71d9fae3 100644 --- a/ext/http/lib.rs +++ b/ext/http/lib.rs @@ -23,6 +23,8 @@ use deno_core::futures::TryFutureExt; use deno_core::include_js_files; use deno_core::op; use deno_core::AsyncRefCell; +use deno_core::AsyncResult; +use deno_core::BufView; use deno_core::ByteString; use deno_core::CancelFuture; use deno_core::CancelHandle; @@ -333,61 +335,58 @@ impl HttpStreamResource { } } -impl HttpStreamResource { - async fn read( - self: Rc<Self>, - mut buf: ZeroCopyBuf, - ) -> Result<(usize, ZeroCopyBuf), AnyError> { - let mut rd = RcRef::map(&self, |r| &r.rd).borrow_mut().await; - - let body = loop { - match &mut *rd { - HttpRequestReader::Headers(_) => {} - HttpRequestReader::Body(_, body) => break body, - HttpRequestReader::Closed => return Ok((0, buf)), - } - match take(&mut *rd) { - HttpRequestReader::Headers(request) => { - let (parts, body) = request.into_parts(); - *rd = HttpRequestReader::Body(parts.headers, body.peekable()); +impl Resource for HttpStreamResource { + fn name(&self) -> Cow<str> { + "httpStream".into() + } + + fn read(self: Rc<Self>, limit: usize) -> AsyncResult<BufView> { + Box::pin(async move { + let mut rd = RcRef::map(&self, |r| &r.rd).borrow_mut().await; + + let body = loop { + match &mut *rd { + HttpRequestReader::Headers(_) => {} + HttpRequestReader::Body(_, body) => break body, + HttpRequestReader::Closed => return Ok(BufView::empty()), } - _ => unreachable!(), + match take(&mut *rd) { + HttpRequestReader::Headers(request) => { + let (parts, body) = request.into_parts(); + *rd = HttpRequestReader::Body(parts.headers, body.peekable()); + } + _ => unreachable!(), + }; }; - }; - let fut = async { - let mut body = Pin::new(body); - loop { - match body.as_mut().peek_mut().await { - Some(Ok(chunk)) if !chunk.is_empty() => { - let len = min(buf.len(), chunk.len()); - buf[..len].copy_from_slice(&chunk.split_to(len)); - break Ok((len, buf)); + let fut = async { + let mut body = Pin::new(body); + loop { + match body.as_mut().peek_mut().await { + Some(Ok(chunk)) if !chunk.is_empty() => { + let len = min(limit, chunk.len()); + let buf = chunk.split_to(len); + let view = BufView::from(buf); + break Ok(view); + } + // This unwrap is safe because `peek_mut()` returned `Some`, and thus + // currently has a peeked value that can be synchronously returned + // from `next()`. + // + // The future returned from `next()` is always ready, so we can + // safely call `await` on it without creating a race condition. + Some(_) => match body.as_mut().next().await.unwrap() { + Ok(chunk) => assert!(chunk.is_empty()), + Err(err) => break Err(AnyError::from(err)), + }, + None => break Ok(BufView::empty()), } - Some(_) => match body.as_mut().next().await.unwrap() { - Ok(chunk) => assert!(chunk.is_empty()), - Err(err) => break Err(AnyError::from(err)), - }, - None => break Ok((0, buf)), } - } - }; - - let cancel_handle = RcRef::map(&self, |r| &r.cancel_handle); - fut.try_or_cancel(cancel_handle).await - } -} - -impl Resource for HttpStreamResource { - fn name(&self) -> Cow<str> { - "httpStream".into() - } + }; - fn read_return( - self: Rc<Self>, - _buf: ZeroCopyBuf, - ) -> deno_core::AsyncResult<(usize, ZeroCopyBuf)> { - Box::pin(self.read(_buf)) + let cancel_handle = RcRef::map(&self, |r| &r.cancel_handle); + fut.try_or_cancel(cancel_handle).await + }) } fn close(self: Rc<Self>) { @@ -763,16 +762,14 @@ async fn op_http_write_resource( _ => {} }; - let vec = vec![0u8; 64 * 1024]; // 64KB - let buf = ZeroCopyBuf::new_temp(vec); - let (nread, buf) = resource.clone().read_return(buf).await?; - if nread == 0 { + let view = resource.clone().read(64 * 1024).await?; // 64KB + if view.is_empty() { break; } match &mut *wr { HttpResponseWriter::Body(body) => { - if let Err(err) = body.write_all(&buf[..nread]).await { + if let Err(err) = body.write_all(&view).await { assert_eq!(err.kind(), std::io::ErrorKind::BrokenPipe); // Don't return "broken pipe", that's an implementation detail. // Pull up the failure associated with the transport connection instead. @@ -782,9 +779,8 @@ async fn op_http_write_resource( } } HttpResponseWriter::BodyUncompressed(body) => { - let mut buf = buf.to_temp(); - buf.truncate(nread); - if let Err(err) = body.send_data(Bytes::from(buf)).await { + let bytes = Bytes::from(view); + if let Err(err) = body.send_data(bytes).await { assert!(err.is_closed()); // Pull up the failure associated with the transport connection instead. http_stream.conn.closed().await?; diff --git a/ext/net/io.rs b/ext/net/io.rs index c9587c851..4c9fbe3d2 100644 --- a/ext/net/io.rs +++ b/ext/net/io.rs @@ -9,7 +9,6 @@ use deno_core::CancelHandle; use deno_core::CancelTryFuture; use deno_core::RcRef; use deno_core::Resource; -use deno_core::ZeroCopyBuf; use socket2::SockRef; use std::borrow::Cow; use std::rc::Rc; @@ -69,22 +68,16 @@ where pub async fn read( self: Rc<Self>, - mut buf: ZeroCopyBuf, - ) -> Result<(usize, ZeroCopyBuf), AnyError> { + data: &mut [u8], + ) -> Result<usize, AnyError> { let mut rd = self.rd_borrow_mut().await; - let nread = rd - .read(&mut buf) - .try_or_cancel(self.cancel_handle()) - .await?; - Ok((nread, buf)) + let nread = rd.read(data).try_or_cancel(self.cancel_handle()).await?; + Ok(nread) } - pub async fn write( - self: Rc<Self>, - buf: ZeroCopyBuf, - ) -> Result<usize, AnyError> { + pub async fn write(self: Rc<Self>, data: &[u8]) -> Result<usize, AnyError> { let mut wr = self.wr_borrow_mut().await; - let nwritten = wr.write(&buf).await?; + let nwritten = wr.write(data).await?; Ok(nwritten) } @@ -99,21 +92,13 @@ pub type TcpStreamResource = FullDuplexResource<tcp::OwnedReadHalf, tcp::OwnedWriteHalf>; impl Resource for TcpStreamResource { + deno_core::impl_readable_byob!(); + deno_core::impl_writable!(); + fn name(&self) -> Cow<str> { "tcpStream".into() } - fn read_return( - self: Rc<Self>, - buf: ZeroCopyBuf, - ) -> AsyncResult<(usize, ZeroCopyBuf)> { - Box::pin(self.read(buf)) - } - - fn write(self: Rc<Self>, buf: ZeroCopyBuf) -> AsyncResult<usize> { - Box::pin(self.write(buf)) - } - fn shutdown(self: Rc<Self>) -> AsyncResult<()> { Box::pin(self.shutdown()) } @@ -161,16 +146,10 @@ pub struct UnixStreamResource; #[cfg(not(unix))] impl UnixStreamResource { - pub async fn read( - self: Rc<Self>, - _buf: ZeroCopyBuf, - ) -> Result<(usize, ZeroCopyBuf), AnyError> { + fn read(self: Rc<Self>, data: &mut [u8]) -> AsyncResult<usize> { unreachable!() } - pub async fn write( - self: Rc<Self>, - _buf: ZeroCopyBuf, - ) -> Result<usize, AnyError> { + fn write(self: Rc<Self>, data: &[u8]) -> AsyncResult<usize> { unreachable!() } pub async fn shutdown(self: Rc<Self>) -> Result<(), AnyError> { @@ -182,21 +161,13 @@ impl UnixStreamResource { } impl Resource for UnixStreamResource { + deno_core::impl_readable_byob!(); + deno_core::impl_writable!(); + fn name(&self) -> Cow<str> { "unixStream".into() } - fn read_return( - self: Rc<Self>, - buf: ZeroCopyBuf, - ) -> AsyncResult<(usize, ZeroCopyBuf)> { - Box::pin(self.read(buf)) - } - - fn write(self: Rc<Self>, buf: ZeroCopyBuf) -> AsyncResult<usize> { - Box::pin(self.write(buf)) - } - fn shutdown(self: Rc<Self>) -> AsyncResult<()> { Box::pin(self.shutdown()) } diff --git a/ext/net/ops_tls.rs b/ext/net/ops_tls.rs index 230f4359e..a1b48b84e 100644 --- a/ext/net/ops_tls.rs +++ b/ext/net/ops_tls.rs @@ -38,7 +38,6 @@ use deno_core::OpState; use deno_core::RcRef; use deno_core::Resource; use deno_core::ResourceId; -use deno_core::ZeroCopyBuf; use deno_tls::create_client_config; use deno_tls::load_certs; use deno_tls::load_private_keys; @@ -691,21 +690,18 @@ impl TlsStreamResource { pub async fn read( self: Rc<Self>, - mut buf: ZeroCopyBuf, - ) -> Result<(usize, ZeroCopyBuf), AnyError> { + data: &mut [u8], + ) -> Result<usize, AnyError> { let mut rd = RcRef::map(&self, |r| &r.rd).borrow_mut().await; let cancel_handle = RcRef::map(&self, |r| &r.cancel_handle); - let nread = rd.read(&mut buf).try_or_cancel(cancel_handle).await?; - Ok((nread, buf)) + let nread = rd.read(data).try_or_cancel(cancel_handle).await?; + Ok(nread) } - pub async fn write( - self: Rc<Self>, - buf: ZeroCopyBuf, - ) -> Result<usize, AnyError> { + pub async fn write(self: Rc<Self>, data: &[u8]) -> Result<usize, AnyError> { self.handshake().await?; let mut wr = RcRef::map(self, |r| &r.wr).borrow_mut().await; - let nwritten = wr.write(&buf).await?; + let nwritten = wr.write(data).await?; wr.flush().await?; Ok(nwritten) } @@ -736,21 +732,13 @@ impl TlsStreamResource { } impl Resource for TlsStreamResource { + deno_core::impl_readable_byob!(); + deno_core::impl_writable!(); + fn name(&self) -> Cow<str> { "tlsStream".into() } - fn read_return( - self: Rc<Self>, - buf: ZeroCopyBuf, - ) -> AsyncResult<(usize, ZeroCopyBuf)> { - Box::pin(self.read(buf)) - } - - fn write(self: Rc<Self>, buf: ZeroCopyBuf) -> AsyncResult<usize> { - Box::pin(self.write(buf)) - } - fn shutdown(self: Rc<Self>) -> AsyncResult<()> { Box::pin(self.shutdown()) } |