diff options
Diffstat (limited to 'extensions/broadcast_channel')
| -rw-r--r-- | extensions/broadcast_channel/01_broadcast_channel.js | 186 | ||||
| -rw-r--r-- | extensions/broadcast_channel/Cargo.toml | 20 | ||||
| -rw-r--r-- | extensions/broadcast_channel/README.md | 5 | ||||
| -rw-r--r-- | extensions/broadcast_channel/in_memory_broadcast_channel.rs | 97 | ||||
| -rw-r--r-- | extensions/broadcast_channel/lib.deno_broadcast_channel.d.ts | 55 | ||||
| -rw-r--r-- | extensions/broadcast_channel/lib.rs | 139 |
6 files changed, 0 insertions, 502 deletions
diff --git a/extensions/broadcast_channel/01_broadcast_channel.js b/extensions/broadcast_channel/01_broadcast_channel.js deleted file mode 100644 index 42112c014..000000000 --- a/extensions/broadcast_channel/01_broadcast_channel.js +++ /dev/null @@ -1,186 +0,0 @@ -// Copyright 2018-2021 the Deno authors. All rights reserved. MIT license. - -/// <reference path="../../core/internal.d.ts" /> - -"use strict"; - -((window) => { - const core = window.Deno.core; - const webidl = window.__bootstrap.webidl; - const { setTarget } = window.__bootstrap.event; - const { DOMException } = window.__bootstrap.domException; - const { - ArrayPrototypeIndexOf, - ArrayPrototypeSplice, - ArrayPrototypePush, - Symbol, - Uint8Array, - ObjectDefineProperty, - Map, - MapPrototypeSet, - MapPrototypeGet, - FunctionPrototypeCall, - } = window.__bootstrap.primordials; - - const handlerSymbol = Symbol("eventHandlers"); - function makeWrappedHandler(handler) { - function wrappedHandler(...args) { - if (typeof wrappedHandler.handler !== "function") { - return; - } - return FunctionPrototypeCall(wrappedHandler.handler, this, ...args); - } - wrappedHandler.handler = handler; - return wrappedHandler; - } - // TODO(lucacasonato) reuse when we can reuse code between web crates - function defineEventHandler(emitter, name) { - // HTML specification section 8.1.5.1 - ObjectDefineProperty(emitter, `on${name}`, { - get() { - // TODO(bnoordhuis) The "BroadcastChannel should have an onmessage - // event" WPT test expects that .onmessage !== undefined. Returning - // null makes it pass but is perhaps not exactly in the spirit. - if (!this[handlerSymbol]) { - return null; - } - return MapPrototypeGet(this[handlerSymbol], name)?.handler ?? null; - }, - set(value) { - if (!this[handlerSymbol]) { - this[handlerSymbol] = new Map(); - } - let handlerWrapper = MapPrototypeGet(this[handlerSymbol], name); - if (handlerWrapper) { - handlerWrapper.handler = value; - } else { - handlerWrapper = makeWrappedHandler(value); - this.addEventListener(name, handlerWrapper); - } - MapPrototypeSet(this[handlerSymbol], name, handlerWrapper); - }, - configurable: true, - enumerable: true, - }); - } - - const _name = Symbol("[[name]]"); - const _closed = Symbol("[[closed]]"); - - const channels = []; - let rid = null; - - async function recv() { - while (channels.length > 0) { - const message = await core.opAsync("op_broadcast_recv", rid); - - if (message === null) { - break; - } - - const [name, data] = message; - dispatch(null, name, new Uint8Array(data)); - } - - core.close(rid); - rid = null; - } - - function dispatch(source, name, data) { - for (const channel of channels) { - if (channel === source) continue; // Don't self-send. - if (channel[_name] !== name) continue; - if (channel[_closed]) continue; - - const go = () => { - if (channel[_closed]) return; - const event = new MessageEvent("message", { - data: core.deserialize(data), // TODO(bnoordhuis) Cache immutables. - origin: "http://127.0.0.1", - }); - setTarget(event, channel); - channel.dispatchEvent(event); - }; - - defer(go); - } - } - - // Defer to avoid starving the event loop. Not using queueMicrotask() - // for that reason: it lets promises make forward progress but can - // still starve other parts of the event loop. - function defer(go) { - setTimeout(go, 1); - } - - class BroadcastChannel extends EventTarget { - [_name]; - [_closed] = false; - - get name() { - return this[_name]; - } - - constructor(name) { - super(); - - const prefix = "Failed to construct 'BroadcastChannel'"; - webidl.requiredArguments(arguments.length, 1, { prefix }); - - this[_name] = webidl.converters["DOMString"](name, { - prefix, - context: "Argument 1", - }); - - this[webidl.brand] = webidl.brand; - - ArrayPrototypePush(channels, this); - - if (rid === null) { - // Create the rid immediately, otherwise there is a time window (and a - // race condition) where messages can get lost, because recv() is async. - rid = core.opSync("op_broadcast_subscribe"); - recv(); - } - } - - postMessage(message) { - webidl.assertBranded(this, BroadcastChannel); - - const prefix = "Failed to execute 'postMessage' on 'BroadcastChannel'"; - webidl.requiredArguments(arguments.length, 1, { prefix }); - - if (this[_closed]) { - throw new DOMException("Already closed", "InvalidStateError"); - } - - if (typeof message === "function" || typeof message === "symbol") { - throw new DOMException("Uncloneable value", "DataCloneError"); - } - - const data = core.serialize(message); - - // Send to other listeners in this VM. - dispatch(this, this[_name], new Uint8Array(data)); - - // Send to listeners in other VMs. - defer(() => core.opAsync("op_broadcast_send", [rid, this[_name]], data)); - } - - close() { - webidl.assertBranded(this, BroadcastChannel); - this[_closed] = true; - - const index = ArrayPrototypeIndexOf(channels, this); - if (index === -1) return; - - ArrayPrototypeSplice(channels, index, 1); - if (channels.length === 0) core.opSync("op_broadcast_unsubscribe", rid); - } - } - - defineEventHandler(BroadcastChannel.prototype, "message"); - defineEventHandler(BroadcastChannel.prototype, "messageerror"); - - window.__bootstrap.broadcastChannel = { BroadcastChannel }; -})(this); diff --git a/extensions/broadcast_channel/Cargo.toml b/extensions/broadcast_channel/Cargo.toml deleted file mode 100644 index 2f8b10bc7..000000000 --- a/extensions/broadcast_channel/Cargo.toml +++ /dev/null @@ -1,20 +0,0 @@ -# Copyright 2018-2021 the Deno authors. All rights reserved. MIT license. - -[package] -name = "deno_broadcast_channel" -version = "0.8.0" -authors = ["the Deno authors"] -edition = "2018" -license = "MIT" -readme = "README.md" -repository = "https://github.com/denoland/deno" -description = "Implementation of BroadcastChannel API for Deno" - -[lib] -path = "lib.rs" - -[dependencies] -async-trait = "0.1" -deno_core = { version = "0.96.0", path = "../../core" } -tokio = { version = "1.8.1", features = ["full"] } -uuid = { version = "0.8.2", features = ["v4"] } diff --git a/extensions/broadcast_channel/README.md b/extensions/broadcast_channel/README.md deleted file mode 100644 index 5b5034ef7..000000000 --- a/extensions/broadcast_channel/README.md +++ /dev/null @@ -1,5 +0,0 @@ -# deno_broadcast_channel - -This crate implements the BroadcastChannel functions of Deno. - -Spec: https://html.spec.whatwg.org/multipage/web-messaging.html diff --git a/extensions/broadcast_channel/in_memory_broadcast_channel.rs b/extensions/broadcast_channel/in_memory_broadcast_channel.rs deleted file mode 100644 index 879a3dbd5..000000000 --- a/extensions/broadcast_channel/in_memory_broadcast_channel.rs +++ /dev/null @@ -1,97 +0,0 @@ -// Copyright 2018-2021 the Deno authors. All rights reserved. MIT license. - -use crate::BroadcastChannel; -use async_trait::async_trait; -use deno_core::error::AnyError; -use deno_core::parking_lot::Mutex; -use std::sync::Arc; -use tokio::sync::broadcast; -use tokio::sync::mpsc; -use uuid::Uuid; - -#[derive(Clone)] -pub struct InMemoryBroadcastChannel(Arc<Mutex<broadcast::Sender<Message>>>); - -pub struct InMemoryBroadcastChannelResource { - rx: tokio::sync::Mutex<( - broadcast::Receiver<Message>, - mpsc::UnboundedReceiver<()>, - )>, - cancel_tx: mpsc::UnboundedSender<()>, - uuid: Uuid, -} - -#[derive(Clone, Debug)] -struct Message { - name: Arc<String>, - data: Arc<Vec<u8>>, - uuid: Uuid, -} - -impl Default for InMemoryBroadcastChannel { - fn default() -> Self { - let (tx, _) = broadcast::channel(256); - Self(Arc::new(Mutex::new(tx))) - } -} - -#[async_trait] -impl BroadcastChannel for InMemoryBroadcastChannel { - type Resource = InMemoryBroadcastChannelResource; - - fn subscribe(&self) -> Result<Self::Resource, AnyError> { - let (cancel_tx, cancel_rx) = mpsc::unbounded_channel(); - let broadcast_rx = self.0.lock().subscribe(); - let rx = tokio::sync::Mutex::new((broadcast_rx, cancel_rx)); - let uuid = Uuid::new_v4(); - Ok(Self::Resource { - rx, - cancel_tx, - uuid, - }) - } - - fn unsubscribe(&self, resource: &Self::Resource) -> Result<(), AnyError> { - Ok(resource.cancel_tx.send(())?) - } - - async fn send( - &self, - resource: &Self::Resource, - name: String, - data: Vec<u8>, - ) -> Result<(), AnyError> { - let name = Arc::new(name); - let data = Arc::new(data); - let uuid = resource.uuid; - self.0.lock().send(Message { name, data, uuid })?; - Ok(()) - } - - async fn recv( - &self, - resource: &Self::Resource, - ) -> Result<Option<crate::Message>, AnyError> { - let mut g = resource.rx.lock().await; - let (broadcast_rx, cancel_rx) = &mut *g; - loop { - let result = tokio::select! { - r = broadcast_rx.recv() => r, - _ = cancel_rx.recv() => return Ok(None), - }; - use tokio::sync::broadcast::error::RecvError::*; - match result { - Err(Closed) => return Ok(None), - Err(Lagged(_)) => (), // Backlogged, messages dropped. - Ok(message) if message.uuid == resource.uuid => (), // Self-send. - Ok(message) => { - let name = String::clone(&message.name); - let data = Vec::clone(&message.data); - return Ok(Some((name, data))); - } - } - } - } -} - -impl deno_core::Resource for InMemoryBroadcastChannelResource {} diff --git a/extensions/broadcast_channel/lib.deno_broadcast_channel.d.ts b/extensions/broadcast_channel/lib.deno_broadcast_channel.d.ts deleted file mode 100644 index c8efef778..000000000 --- a/extensions/broadcast_channel/lib.deno_broadcast_channel.d.ts +++ /dev/null @@ -1,55 +0,0 @@ -// Copyright 2018-2021 the Deno authors. All rights reserved. MIT license. - -// deno-lint-ignore-file no-explicit-any - -/// <reference no-default-lib="true" /> -/// <reference lib="esnext" /> - -interface BroadcastChannelEventMap { - "message": MessageEvent; - "messageerror": MessageEvent; -} - -interface BroadcastChannel extends EventTarget { - /** - * Returns the channel name (as passed to the constructor). - */ - readonly name: string; - onmessage: ((this: BroadcastChannel, ev: MessageEvent) => any) | null; - onmessageerror: ((this: BroadcastChannel, ev: MessageEvent) => any) | null; - /** - * Closes the BroadcastChannel object, opening it up to garbage collection. - */ - close(): void; - /** - * Sends the given message to other BroadcastChannel objects set up for - * this channel. Messages can be structured objects, e.g. nested objects - * and arrays. - */ - postMessage(message: any): void; - addEventListener<K extends keyof BroadcastChannelEventMap>( - type: K, - listener: (this: BroadcastChannel, ev: BroadcastChannelEventMap[K]) => any, - options?: boolean | AddEventListenerOptions, - ): void; - addEventListener( - type: string, - listener: EventListenerOrEventListenerObject, - options?: boolean | AddEventListenerOptions, - ): void; - removeEventListener<K extends keyof BroadcastChannelEventMap>( - type: K, - listener: (this: BroadcastChannel, ev: BroadcastChannelEventMap[K]) => any, - options?: boolean | EventListenerOptions, - ): void; - removeEventListener( - type: string, - listener: EventListenerOrEventListenerObject, - options?: boolean | EventListenerOptions, - ): void; -} - -declare var BroadcastChannel: { - prototype: BroadcastChannel; - new (name: string): BroadcastChannel; -}; diff --git a/extensions/broadcast_channel/lib.rs b/extensions/broadcast_channel/lib.rs deleted file mode 100644 index b2a79916c..000000000 --- a/extensions/broadcast_channel/lib.rs +++ /dev/null @@ -1,139 +0,0 @@ -// Copyright 2018-2021 the Deno authors. All rights reserved. MIT license. - -mod in_memory_broadcast_channel; - -pub use in_memory_broadcast_channel::InMemoryBroadcastChannel; - -use async_trait::async_trait; -use deno_core::error::bad_resource_id; -use deno_core::error::AnyError; -use deno_core::include_js_files; -use deno_core::op_async; -use deno_core::op_sync; -use deno_core::Extension; -use deno_core::OpState; -use deno_core::Resource; -use deno_core::ResourceId; -use deno_core::ZeroCopyBuf; -use std::cell::RefCell; -use std::path::PathBuf; -use std::rc::Rc; - -#[async_trait] -pub trait BroadcastChannel: Clone { - type Resource: Resource; - - fn subscribe(&self) -> Result<Self::Resource, AnyError>; - - fn unsubscribe(&self, resource: &Self::Resource) -> Result<(), AnyError>; - - async fn send( - &self, - resource: &Self::Resource, - name: String, - data: Vec<u8>, - ) -> Result<(), AnyError>; - - async fn recv( - &self, - resource: &Self::Resource, - ) -> Result<Option<Message>, AnyError>; -} - -pub type Message = (String, Vec<u8>); - -struct Unstable(bool); // --unstable - -pub fn op_broadcast_subscribe<BC: BroadcastChannel + 'static>( - state: &mut OpState, - _args: (), - _buf: (), -) -> Result<ResourceId, AnyError> { - let unstable = state.borrow::<Unstable>().0; - - if !unstable { - eprintln!( - "Unstable API 'BroadcastChannel'. The --unstable flag must be provided.", - ); - std::process::exit(70); - } - - let bc = state.borrow::<BC>(); - let resource = bc.subscribe()?; - Ok(state.resource_table.add(resource)) -} - -pub fn op_broadcast_unsubscribe<BC: BroadcastChannel + 'static>( - state: &mut OpState, - rid: ResourceId, - _buf: (), -) -> Result<(), AnyError> { - let resource = state - .resource_table - .get::<BC::Resource>(rid) - .ok_or_else(bad_resource_id)?; - let bc = state.borrow::<BC>(); - bc.unsubscribe(&resource) -} - -pub async fn op_broadcast_send<BC: BroadcastChannel + 'static>( - state: Rc<RefCell<OpState>>, - (rid, name): (ResourceId, String), - buf: ZeroCopyBuf, -) -> Result<(), AnyError> { - let resource = state - .borrow() - .resource_table - .get::<BC::Resource>(rid) - .ok_or_else(bad_resource_id)?; - let bc = state.borrow().borrow::<BC>().clone(); - bc.send(&resource, name, buf.to_vec()).await -} - -pub async fn op_broadcast_recv<BC: BroadcastChannel + 'static>( - state: Rc<RefCell<OpState>>, - rid: ResourceId, - _buf: (), -) -> Result<Option<Message>, AnyError> { - let resource = state - .borrow() - .resource_table - .get::<BC::Resource>(rid) - .ok_or_else(bad_resource_id)?; - let bc = state.borrow().borrow::<BC>().clone(); - bc.recv(&resource).await -} - -pub fn init<BC: BroadcastChannel + 'static>( - bc: BC, - unstable: bool, -) -> Extension { - Extension::builder() - .js(include_js_files!( - prefix "deno:extensions/broadcast_channel", - "01_broadcast_channel.js", - )) - .ops(vec![ - ( - "op_broadcast_subscribe", - op_sync(op_broadcast_subscribe::<BC>), - ), - ( - "op_broadcast_unsubscribe", - op_sync(op_broadcast_unsubscribe::<BC>), - ), - ("op_broadcast_send", op_async(op_broadcast_send::<BC>)), - ("op_broadcast_recv", op_async(op_broadcast_recv::<BC>)), - ]) - .state(move |state| { - state.put(bc.clone()); - state.put(Unstable(unstable)); - Ok(()) - }) - .build() -} - -pub fn get_declaration() -> PathBuf { - PathBuf::from(env!("CARGO_MANIFEST_DIR")) - .join("lib.deno_broadcast_channel.d.ts") -} |
