summaryrefslogtreecommitdiff
path: root/extensions/broadcast_channel/lib.rs
diff options
context:
space:
mode:
Diffstat (limited to 'extensions/broadcast_channel/lib.rs')
-rw-r--r--extensions/broadcast_channel/lib.rs172
1 files changed, 90 insertions, 82 deletions
diff --git a/extensions/broadcast_channel/lib.rs b/extensions/broadcast_channel/lib.rs
index cee9c3e0c..b2a79916c 100644
--- a/extensions/broadcast_channel/lib.rs
+++ b/extensions/broadcast_channel/lib.rs
@@ -1,127 +1,135 @@
// Copyright 2018-2021 the Deno authors. All rights reserved. MIT license.
+mod in_memory_broadcast_channel;
+
+pub use in_memory_broadcast_channel::InMemoryBroadcastChannel;
+
+use async_trait::async_trait;
use deno_core::error::bad_resource_id;
use deno_core::error::AnyError;
use deno_core::include_js_files;
use deno_core::op_async;
use deno_core::op_sync;
-use deno_core::AsyncRefCell;
use deno_core::Extension;
use deno_core::OpState;
-use deno_core::RcRef;
use deno_core::Resource;
use deno_core::ResourceId;
use deno_core::ZeroCopyBuf;
-use std::borrow::Cow;
use std::cell::RefCell;
use std::path::PathBuf;
use std::rc::Rc;
-use tokio::io::AsyncReadExt;
-use tokio::io::AsyncWriteExt;
-struct BroadcastChannelResource(AsyncRefCell<tokio::fs::File>);
+#[async_trait]
+pub trait BroadcastChannel: Clone {
+ type Resource: Resource;
-impl Resource for BroadcastChannelResource {
- fn name(&self) -> Cow<str> {
- "broadcastChannel".into()
- }
+ fn subscribe(&self) -> Result<Self::Resource, AnyError>;
+
+ fn unsubscribe(&self, resource: &Self::Resource) -> Result<(), AnyError>;
+
+ async fn send(
+ &self,
+ resource: &Self::Resource,
+ name: String,
+ data: Vec<u8>,
+ ) -> Result<(), AnyError>;
+
+ async fn recv(
+ &self,
+ resource: &Self::Resource,
+ ) -> Result<Option<Message>, AnyError>;
}
-pub fn op_broadcast_open(
+pub type Message = (String, Vec<u8>);
+
+struct Unstable(bool); // --unstable
+
+pub fn op_broadcast_subscribe<BC: BroadcastChannel + 'static>(
state: &mut OpState,
- name: String,
- _bufs: Option<ZeroCopyBuf>,
+ _args: (),
+ _buf: (),
) -> Result<ResourceId, AnyError> {
- let path = PathBuf::from("./");
- std::fs::create_dir_all(&path)?;
- let file = std::fs::OpenOptions::new()
- .create(true)
- .append(true)
- .read(true)
- .open(path.join(format!("broadcast_{}", name)))?;
-
- let rid =
- state
- .resource_table
- .add(BroadcastChannelResource(AsyncRefCell::new(
- tokio::fs::File::from_std(file),
- )));
-
- Ok(rid)
+ let unstable = state.borrow::<Unstable>().0;
+
+ if !unstable {
+ eprintln!(
+ "Unstable API 'BroadcastChannel'. The --unstable flag must be provided.",
+ );
+ std::process::exit(70);
+ }
+
+ let bc = state.borrow::<BC>();
+ let resource = bc.subscribe()?;
+ Ok(state.resource_table.add(resource))
}
-pub async fn op_broadcast_send(
- state: Rc<RefCell<OpState>>,
+pub fn op_broadcast_unsubscribe<BC: BroadcastChannel + 'static>(
+ state: &mut OpState,
rid: ResourceId,
- buf: Option<ZeroCopyBuf>,
+ _buf: (),
) -> Result<(), AnyError> {
- let state = state.borrow_mut();
let resource = state
.resource_table
- .get::<BroadcastChannelResource>(rid)
+ .get::<BC::Resource>(rid)
.ok_or_else(bad_resource_id)?;
+ let bc = state.borrow::<BC>();
+ bc.unsubscribe(&resource)
+}
- let mut file = RcRef::map(&resource, |r| &r.0).borrow_mut().await;
-
- let buffer_data = buf.unwrap();
- let mut data = vec![];
- data.extend_from_slice(&(buffer_data.len() as u64).to_ne_bytes());
- data.extend_from_slice(&buffer_data);
-
- file.write_all(&data).await?;
-
- Ok(())
+pub async fn op_broadcast_send<BC: BroadcastChannel + 'static>(
+ state: Rc<RefCell<OpState>>,
+ (rid, name): (ResourceId, String),
+ buf: ZeroCopyBuf,
+) -> Result<(), AnyError> {
+ let resource = state
+ .borrow()
+ .resource_table
+ .get::<BC::Resource>(rid)
+ .ok_or_else(bad_resource_id)?;
+ let bc = state.borrow().borrow::<BC>().clone();
+ bc.send(&resource, name, buf.to_vec()).await
}
-pub async fn op_broadcast_next_event(
+pub async fn op_broadcast_recv<BC: BroadcastChannel + 'static>(
state: Rc<RefCell<OpState>>,
rid: ResourceId,
- _bufs: Option<ZeroCopyBuf>,
-) -> Result<Vec<u8>, AnyError> {
- let resource = {
- let state = state.borrow_mut();
- state
- .resource_table
- .get::<BroadcastChannelResource>(rid)
- .ok_or_else(bad_resource_id)?
- };
-
- let mut file = RcRef::map(&resource, |r| &r.0).borrow_mut().await;
-
- let size = match file.read_u64().await {
- Ok(s) => s,
- Err(e) => {
- return match e.kind() {
- deno_core::futures::io::ErrorKind::UnexpectedEof => Ok(vec![]),
- _ => Err(e.into()),
- }
- }
- };
- let mut data = vec![0u8; size as usize];
- match file.read_exact(&mut data).await {
- Ok(s) => s,
- Err(e) => {
- return match e.kind() {
- deno_core::futures::io::ErrorKind::UnexpectedEof => Ok(vec![]),
- _ => Err(e.into()),
- }
- }
- };
-
- Ok(data)
+ _buf: (),
+) -> Result<Option<Message>, AnyError> {
+ let resource = state
+ .borrow()
+ .resource_table
+ .get::<BC::Resource>(rid)
+ .ok_or_else(bad_resource_id)?;
+ let bc = state.borrow().borrow::<BC>().clone();
+ bc.recv(&resource).await
}
-pub fn init() -> Extension {
+pub fn init<BC: BroadcastChannel + 'static>(
+ bc: BC,
+ unstable: bool,
+) -> Extension {
Extension::builder()
.js(include_js_files!(
prefix "deno:extensions/broadcast_channel",
"01_broadcast_channel.js",
))
.ops(vec![
- ("op_broadcast_open", op_sync(op_broadcast_open)),
- ("op_broadcast_send", op_async(op_broadcast_send)),
- ("op_broadcast_next_event", op_async(op_broadcast_next_event)),
+ (
+ "op_broadcast_subscribe",
+ op_sync(op_broadcast_subscribe::<BC>),
+ ),
+ (
+ "op_broadcast_unsubscribe",
+ op_sync(op_broadcast_unsubscribe::<BC>),
+ ),
+ ("op_broadcast_send", op_async(op_broadcast_send::<BC>)),
+ ("op_broadcast_recv", op_async(op_broadcast_recv::<BC>)),
])
+ .state(move |state| {
+ state.put(bc.clone());
+ state.put(Unstable(unstable));
+ Ok(())
+ })
.build()
}