summaryrefslogtreecommitdiff
path: root/ext/fetch/byte_stream.rs
diff options
context:
space:
mode:
authorMatt Mastracci <matthew@mastracci.com>2023-08-03 14:27:25 -0600
committerGitHub <noreply@github.com>2023-08-03 14:27:25 -0600
commit7f8bf2537db0ae596a2c1baabd4011a190666ca6 (patch)
tree3dfb8df29ef39ee5eed9bc19dc57135374a338bd /ext/fetch/byte_stream.rs
parent0f07dc95f130b9ace00ad98f1b2a3f5c34662e4a (diff)
refactor(ext/fetch): refactor fetch to use new write_error method (#20029)
This is a prerequisite for fast streams work -- this particular resource used a custom `mpsc`-style stream, and this work will allow us to unify it with the streams in `ext/http` in time. Instead of using Option as an internal semaphore for "correctly completed EOF", we allow code to propagate errors into the channel which can be picked up by downstream sinks like Hyper. EOF is signalled using a more standard sender drop.
Diffstat (limited to 'ext/fetch/byte_stream.rs')
-rw-r--r--ext/fetch/byte_stream.rs87
1 files changed, 0 insertions, 87 deletions
diff --git a/ext/fetch/byte_stream.rs b/ext/fetch/byte_stream.rs
deleted file mode 100644
index 33cbfe76e..000000000
--- a/ext/fetch/byte_stream.rs
+++ /dev/null
@@ -1,87 +0,0 @@
-// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license.
-
-use std::pin::Pin;
-use std::task::Context;
-use std::task::Poll;
-
-use deno_core::futures::Stream;
-use tokio::sync::mpsc;
-
-/// [MpscByteStream] is a stream of bytes that is backed by a mpsc channel. It is
-/// used to bridge between the fetch task and the HTTP body stream. The stream
-/// has the special property that it errors if the channel is closed before an
-/// explicit EOF is sent (in the form of a [None] value on the sender).
-pub struct MpscByteStream {
- receiver: mpsc::Receiver<Option<bytes::Bytes>>,
- shutdown: bool,
-}
-
-impl MpscByteStream {
- pub fn new() -> (Self, mpsc::Sender<Option<bytes::Bytes>>) {
- let (sender, receiver) = mpsc::channel::<Option<bytes::Bytes>>(1);
- let this = Self {
- receiver,
- shutdown: false,
- };
- (this, sender)
- }
-}
-
-impl Stream for MpscByteStream {
- type Item = Result<bytes::Bytes, std::io::Error>;
-
- fn poll_next(
- mut self: Pin<&mut Self>,
- cx: &mut Context<'_>,
- ) -> Poll<Option<Self::Item>> {
- let val = std::task::ready!(self.receiver.poll_recv(cx));
- match val {
- None if self.shutdown => Poll::Ready(None),
- None => Poll::Ready(Some(Err(std::io::Error::new(
- std::io::ErrorKind::UnexpectedEof,
- "channel closed",
- )))),
- Some(None) => {
- self.shutdown = true;
- Poll::Ready(None)
- }
- Some(Some(val)) => Poll::Ready(Some(Ok(val))),
- }
- }
-}
-
-#[cfg(test)]
-mod tests {
- use super::*;
- use bytes::Bytes;
- use deno_core::futures::StreamExt;
-
- #[tokio::test]
- async fn success() {
- let (mut stream, sender) = MpscByteStream::new();
-
- sender.send(Some(Bytes::from("hello"))).await.unwrap();
- assert_eq!(stream.next().await.unwrap().unwrap(), Bytes::from("hello"));
-
- sender.send(Some(Bytes::from("world"))).await.unwrap();
- assert_eq!(stream.next().await.unwrap().unwrap(), Bytes::from("world"));
-
- sender.send(None).await.unwrap();
- drop(sender);
- assert!(stream.next().await.is_none());
- }
-
- #[tokio::test]
- async fn error() {
- let (mut stream, sender) = MpscByteStream::new();
-
- sender.send(Some(Bytes::from("hello"))).await.unwrap();
- assert_eq!(stream.next().await.unwrap().unwrap(), Bytes::from("hello"));
-
- drop(sender);
- assert_eq!(
- stream.next().await.unwrap().unwrap_err().kind(),
- std::io::ErrorKind::UnexpectedEof
- );
- }
-}