diff options
Diffstat (limited to 'extensions/broadcast_channel/lib.rs')
-rw-r--r-- | extensions/broadcast_channel/lib.rs | 172 |
1 files changed, 90 insertions, 82 deletions
diff --git a/extensions/broadcast_channel/lib.rs b/extensions/broadcast_channel/lib.rs index cee9c3e0c..b2a79916c 100644 --- a/extensions/broadcast_channel/lib.rs +++ b/extensions/broadcast_channel/lib.rs @@ -1,127 +1,135 @@ // Copyright 2018-2021 the Deno authors. All rights reserved. MIT license. +mod in_memory_broadcast_channel; + +pub use in_memory_broadcast_channel::InMemoryBroadcastChannel; + +use async_trait::async_trait; use deno_core::error::bad_resource_id; use deno_core::error::AnyError; use deno_core::include_js_files; use deno_core::op_async; use deno_core::op_sync; -use deno_core::AsyncRefCell; use deno_core::Extension; use deno_core::OpState; -use deno_core::RcRef; use deno_core::Resource; use deno_core::ResourceId; use deno_core::ZeroCopyBuf; -use std::borrow::Cow; use std::cell::RefCell; use std::path::PathBuf; use std::rc::Rc; -use tokio::io::AsyncReadExt; -use tokio::io::AsyncWriteExt; -struct BroadcastChannelResource(AsyncRefCell<tokio::fs::File>); +#[async_trait] +pub trait BroadcastChannel: Clone { + type Resource: Resource; -impl Resource for BroadcastChannelResource { - fn name(&self) -> Cow<str> { - "broadcastChannel".into() - } + fn subscribe(&self) -> Result<Self::Resource, AnyError>; + + fn unsubscribe(&self, resource: &Self::Resource) -> Result<(), AnyError>; + + async fn send( + &self, + resource: &Self::Resource, + name: String, + data: Vec<u8>, + ) -> Result<(), AnyError>; + + async fn recv( + &self, + resource: &Self::Resource, + ) -> Result<Option<Message>, AnyError>; } -pub fn op_broadcast_open( +pub type Message = (String, Vec<u8>); + +struct Unstable(bool); // --unstable + +pub fn op_broadcast_subscribe<BC: BroadcastChannel + 'static>( state: &mut OpState, - name: String, - _bufs: Option<ZeroCopyBuf>, + _args: (), + _buf: (), ) -> Result<ResourceId, AnyError> { - let path = PathBuf::from("./"); - std::fs::create_dir_all(&path)?; - let file = std::fs::OpenOptions::new() - .create(true) - .append(true) - .read(true) - .open(path.join(format!("broadcast_{}", name)))?; - - let rid = - state - .resource_table - .add(BroadcastChannelResource(AsyncRefCell::new( - tokio::fs::File::from_std(file), - ))); - - Ok(rid) + let unstable = state.borrow::<Unstable>().0; + + if !unstable { + eprintln!( + "Unstable API 'BroadcastChannel'. The --unstable flag must be provided.", + ); + std::process::exit(70); + } + + let bc = state.borrow::<BC>(); + let resource = bc.subscribe()?; + Ok(state.resource_table.add(resource)) } -pub async fn op_broadcast_send( - state: Rc<RefCell<OpState>>, +pub fn op_broadcast_unsubscribe<BC: BroadcastChannel + 'static>( + state: &mut OpState, rid: ResourceId, - buf: Option<ZeroCopyBuf>, + _buf: (), ) -> Result<(), AnyError> { - let state = state.borrow_mut(); let resource = state .resource_table - .get::<BroadcastChannelResource>(rid) + .get::<BC::Resource>(rid) .ok_or_else(bad_resource_id)?; + let bc = state.borrow::<BC>(); + bc.unsubscribe(&resource) +} - let mut file = RcRef::map(&resource, |r| &r.0).borrow_mut().await; - - let buffer_data = buf.unwrap(); - let mut data = vec![]; - data.extend_from_slice(&(buffer_data.len() as u64).to_ne_bytes()); - data.extend_from_slice(&buffer_data); - - file.write_all(&data).await?; - - Ok(()) +pub async fn op_broadcast_send<BC: BroadcastChannel + 'static>( + state: Rc<RefCell<OpState>>, + (rid, name): (ResourceId, String), + buf: ZeroCopyBuf, +) -> Result<(), AnyError> { + let resource = state + .borrow() + .resource_table + .get::<BC::Resource>(rid) + .ok_or_else(bad_resource_id)?; + let bc = state.borrow().borrow::<BC>().clone(); + bc.send(&resource, name, buf.to_vec()).await } -pub async fn op_broadcast_next_event( +pub async fn op_broadcast_recv<BC: BroadcastChannel + 'static>( state: Rc<RefCell<OpState>>, rid: ResourceId, - _bufs: Option<ZeroCopyBuf>, -) -> Result<Vec<u8>, AnyError> { - let resource = { - let state = state.borrow_mut(); - state - .resource_table - .get::<BroadcastChannelResource>(rid) - .ok_or_else(bad_resource_id)? - }; - - let mut file = RcRef::map(&resource, |r| &r.0).borrow_mut().await; - - let size = match file.read_u64().await { - Ok(s) => s, - Err(e) => { - return match e.kind() { - deno_core::futures::io::ErrorKind::UnexpectedEof => Ok(vec![]), - _ => Err(e.into()), - } - } - }; - let mut data = vec![0u8; size as usize]; - match file.read_exact(&mut data).await { - Ok(s) => s, - Err(e) => { - return match e.kind() { - deno_core::futures::io::ErrorKind::UnexpectedEof => Ok(vec![]), - _ => Err(e.into()), - } - } - }; - - Ok(data) + _buf: (), +) -> Result<Option<Message>, AnyError> { + let resource = state + .borrow() + .resource_table + .get::<BC::Resource>(rid) + .ok_or_else(bad_resource_id)?; + let bc = state.borrow().borrow::<BC>().clone(); + bc.recv(&resource).await } -pub fn init() -> Extension { +pub fn init<BC: BroadcastChannel + 'static>( + bc: BC, + unstable: bool, +) -> Extension { Extension::builder() .js(include_js_files!( prefix "deno:extensions/broadcast_channel", "01_broadcast_channel.js", )) .ops(vec![ - ("op_broadcast_open", op_sync(op_broadcast_open)), - ("op_broadcast_send", op_async(op_broadcast_send)), - ("op_broadcast_next_event", op_async(op_broadcast_next_event)), + ( + "op_broadcast_subscribe", + op_sync(op_broadcast_subscribe::<BC>), + ), + ( + "op_broadcast_unsubscribe", + op_sync(op_broadcast_unsubscribe::<BC>), + ), + ("op_broadcast_send", op_async(op_broadcast_send::<BC>)), + ("op_broadcast_recv", op_async(op_broadcast_recv::<BC>)), ]) + .state(move |state| { + state.put(bc.clone()); + state.put(Unstable(unstable)); + Ok(()) + }) .build() } |