summaryrefslogtreecommitdiff
path: root/ext/fetch/byte_stream.rs
diff options
context:
space:
mode:
Diffstat (limited to 'ext/fetch/byte_stream.rs')
-rw-r--r--ext/fetch/byte_stream.rs87
1 files changed, 0 insertions, 87 deletions
diff --git a/ext/fetch/byte_stream.rs b/ext/fetch/byte_stream.rs
deleted file mode 100644
index 33cbfe76e..000000000
--- a/ext/fetch/byte_stream.rs
+++ /dev/null
@@ -1,87 +0,0 @@
-// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license.
-
-use std::pin::Pin;
-use std::task::Context;
-use std::task::Poll;
-
-use deno_core::futures::Stream;
-use tokio::sync::mpsc;
-
-/// [MpscByteStream] is a stream of bytes that is backed by a mpsc channel. It is
-/// used to bridge between the fetch task and the HTTP body stream. The stream
-/// has the special property that it errors if the channel is closed before an
-/// explicit EOF is sent (in the form of a [None] value on the sender).
-pub struct MpscByteStream {
- receiver: mpsc::Receiver<Option<bytes::Bytes>>,
- shutdown: bool,
-}
-
-impl MpscByteStream {
- pub fn new() -> (Self, mpsc::Sender<Option<bytes::Bytes>>) {
- let (sender, receiver) = mpsc::channel::<Option<bytes::Bytes>>(1);
- let this = Self {
- receiver,
- shutdown: false,
- };
- (this, sender)
- }
-}
-
-impl Stream for MpscByteStream {
- type Item = Result<bytes::Bytes, std::io::Error>;
-
- fn poll_next(
- mut self: Pin<&mut Self>,
- cx: &mut Context<'_>,
- ) -> Poll<Option<Self::Item>> {
- let val = std::task::ready!(self.receiver.poll_recv(cx));
- match val {
- None if self.shutdown => Poll::Ready(None),
- None => Poll::Ready(Some(Err(std::io::Error::new(
- std::io::ErrorKind::UnexpectedEof,
- "channel closed",
- )))),
- Some(None) => {
- self.shutdown = true;
- Poll::Ready(None)
- }
- Some(Some(val)) => Poll::Ready(Some(Ok(val))),
- }
- }
-}
-
-#[cfg(test)]
-mod tests {
- use super::*;
- use bytes::Bytes;
- use deno_core::futures::StreamExt;
-
- #[tokio::test]
- async fn success() {
- let (mut stream, sender) = MpscByteStream::new();
-
- sender.send(Some(Bytes::from("hello"))).await.unwrap();
- assert_eq!(stream.next().await.unwrap().unwrap(), Bytes::from("hello"));
-
- sender.send(Some(Bytes::from("world"))).await.unwrap();
- assert_eq!(stream.next().await.unwrap().unwrap(), Bytes::from("world"));
-
- sender.send(None).await.unwrap();
- drop(sender);
- assert!(stream.next().await.is_none());
- }
-
- #[tokio::test]
- async fn error() {
- let (mut stream, sender) = MpscByteStream::new();
-
- sender.send(Some(Bytes::from("hello"))).await.unwrap();
- assert_eq!(stream.next().await.unwrap().unwrap(), Bytes::from("hello"));
-
- drop(sender);
- assert_eq!(
- stream.next().await.unwrap().unwrap_err().kind(),
- std::io::ErrorKind::UnexpectedEof
- );
- }
-}