summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--Cargo.lock2
-rw-r--r--cli/dts/lib.deno.shared_globals.d.ts1
-rw-r--r--cli/main.rs2
-rw-r--r--cli/program_state.rs4
-rw-r--r--cli/standalone.rs3
-rw-r--r--cli/tests/unit/broadcast_channel_test.ts27
-rw-r--r--cli/tests/workers/broadcast_channel.ts5
-rw-r--r--extensions/broadcast_channel/01_broadcast_channel.js106
-rw-r--r--extensions/broadcast_channel/Cargo.toml2
-rw-r--r--extensions/broadcast_channel/in_memory_broadcast_channel.rs97
-rw-r--r--extensions/broadcast_channel/lib.rs172
-rw-r--r--extensions/web/02_event.js5
-rw-r--r--runtime/build.rs5
-rw-r--r--runtime/examples/hello_runtime.rs2
-rw-r--r--runtime/web_worker.rs8
-rw-r--r--runtime/worker.rs8
-rw-r--r--tools/wpt/expectation.json8
-rw-r--r--tools/wpt/runner.ts1
18 files changed, 345 insertions, 113 deletions
diff --git a/Cargo.lock b/Cargo.lock
index c46c6ac9f..884f890af 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -595,8 +595,10 @@ dependencies = [
name = "deno_broadcast_channel"
version = "0.1.0"
dependencies = [
+ "async-trait",
"deno_core",
"tokio",
+ "uuid",
]
[[package]]
diff --git a/cli/dts/lib.deno.shared_globals.d.ts b/cli/dts/lib.deno.shared_globals.d.ts
index d3784705e..e35de2e77 100644
--- a/cli/dts/lib.deno.shared_globals.d.ts
+++ b/cli/dts/lib.deno.shared_globals.d.ts
@@ -12,6 +12,7 @@
/// <reference lib="deno.fetch" />
/// <reference lib="deno.websocket" />
/// <reference lib="deno.crypto" />
+/// <reference lib="deno.broadcast_channel" />
declare namespace WebAssembly {
/**
diff --git a/cli/main.rs b/cli/main.rs
index 868805e92..60c202a7a 100644
--- a/cli/main.rs
+++ b/cli/main.rs
@@ -124,6 +124,7 @@ fn create_web_worker_callback(
no_color: !colors::use_color(),
get_error_class_fn: Some(&crate::errors::get_error_class_name),
blob_url_store: program_state.blob_url_store.clone(),
+ broadcast_channel: program_state.broadcast_channel.clone(),
};
let mut worker = WebWorker::from_options(
@@ -212,6 +213,7 @@ pub fn create_main_worker(
.join(checksum::gen(&[loc.to_string().as_bytes()]))
}),
blob_url_store: program_state.blob_url_store.clone(),
+ broadcast_channel: program_state.broadcast_channel.clone(),
};
let mut worker = MainWorker::from_options(main_module, permissions, &options);
diff --git a/cli/program_state.rs b/cli/program_state.rs
index 50890b9e4..9f7ddc749 100644
--- a/cli/program_state.rs
+++ b/cli/program_state.rs
@@ -15,6 +15,7 @@ use crate::module_graph::TypeLib;
use crate::source_maps::SourceMapGetter;
use crate::specifier_handler::FetchHandler;
use crate::version;
+use deno_runtime::deno_broadcast_channel::InMemoryBroadcastChannel;
use deno_runtime::deno_file::BlobUrlStore;
use deno_runtime::inspector::InspectorServer;
use deno_runtime::permissions::Permissions;
@@ -52,6 +53,7 @@ pub struct ProgramState {
pub maybe_inspector_server: Option<Arc<InspectorServer>>,
pub ca_data: Option<Vec<u8>>,
pub blob_url_store: BlobUrlStore,
+ pub broadcast_channel: InMemoryBroadcastChannel,
}
impl ProgramState {
@@ -77,6 +79,7 @@ impl ProgramState {
};
let blob_url_store = BlobUrlStore::default();
+ let broadcast_channel = InMemoryBroadcastChannel::default();
let file_fetcher = FileFetcher::new(
http_cache,
@@ -143,6 +146,7 @@ impl ProgramState {
maybe_inspector_server,
ca_data,
blob_url_store,
+ broadcast_channel,
};
Ok(Arc::new(program_state))
}
diff --git a/cli/standalone.rs b/cli/standalone.rs
index e0b131eb8..f281c5336 100644
--- a/cli/standalone.rs
+++ b/cli/standalone.rs
@@ -15,6 +15,7 @@ use deno_core::v8_set_flags;
use deno_core::ModuleLoader;
use deno_core::ModuleSpecifier;
use deno_core::OpState;
+use deno_runtime::deno_broadcast_channel::InMemoryBroadcastChannel;
use deno_runtime::deno_file::BlobUrlStore;
use deno_runtime::permissions::Permissions;
use deno_runtime::permissions::PermissionsOptions;
@@ -160,6 +161,7 @@ pub async fn run(
let main_module = resolve_url(SPECIFIER)?;
let permissions = Permissions::from_options(&metadata.permissions);
let blob_url_store = BlobUrlStore::default();
+ let broadcast_channel = InMemoryBroadcastChannel::default();
let module_loader = Rc::new(EmbeddedModuleLoader(source_code));
let create_web_worker_cb = Arc::new(|_| {
todo!("Worker are currently not supported in standalone binaries");
@@ -193,6 +195,7 @@ pub async fn run(
location: metadata.location,
location_data_dir: None,
blob_url_store,
+ broadcast_channel,
};
let mut worker =
MainWorker::from_options(main_module.clone(), permissions, &options);
diff --git a/cli/tests/unit/broadcast_channel_test.ts b/cli/tests/unit/broadcast_channel_test.ts
new file mode 100644
index 000000000..cfa62c856
--- /dev/null
+++ b/cli/tests/unit/broadcast_channel_test.ts
@@ -0,0 +1,27 @@
+// Copyright 2018-2021 the Deno authors. All rights reserved. MIT license.
+import { assertEquals } from "../../../test_util/std/testing/asserts.ts";
+import { deferred } from "../../../test_util/std/async/deferred.ts";
+
+Deno.test("broadcastchannel worker", async () => {
+ const intercom = new BroadcastChannel("intercom");
+ let count = 0;
+
+ const url = new URL("../workers/broadcast_channel.ts", import.meta.url);
+ const worker = new Worker(url.href, { type: "module", name: "worker" });
+ worker.onmessage = () => intercom.postMessage(++count);
+
+ const promise = deferred();
+
+ intercom.onmessage = function (e) {
+ assertEquals(count, e.data);
+ if (count < 42) {
+ intercom.postMessage(++count);
+ } else {
+ worker.terminate();
+ intercom.close();
+ promise.resolve();
+ }
+ };
+
+ await promise;
+});
diff --git a/cli/tests/workers/broadcast_channel.ts b/cli/tests/workers/broadcast_channel.ts
new file mode 100644
index 000000000..5076e9eb7
--- /dev/null
+++ b/cli/tests/workers/broadcast_channel.ts
@@ -0,0 +1,5 @@
+new BroadcastChannel("intercom").onmessage = function (e) {
+ this.postMessage(e.data);
+};
+
+self.postMessage("go");
diff --git a/extensions/broadcast_channel/01_broadcast_channel.js b/extensions/broadcast_channel/01_broadcast_channel.js
index 34f8b9e19..7670b0cfd 100644
--- a/extensions/broadcast_channel/01_broadcast_channel.js
+++ b/extensions/broadcast_channel/01_broadcast_channel.js
@@ -4,6 +4,7 @@
((window) => {
const core = window.Deno.core;
const webidl = window.__bootstrap.webidl;
+ const { setTarget } = window.__bootstrap.event;
const handlerSymbol = Symbol("eventHandlers");
function makeWrappedHandler(handler) {
@@ -21,7 +22,10 @@
// HTML specification section 8.1.5.1
Object.defineProperty(emitter, `on${name}`, {
get() {
- return this[handlerSymbol]?.get(name)?.handler;
+ // TODO(bnoordhuis) The "BroadcastChannel should have an onmessage
+ // event" WPT test expects that .onmessage !== undefined. Returning
+ // null makes it pass but is perhaps not exactly in the spirit.
+ return this[handlerSymbol]?.get(name)?.handler ?? null;
},
set(value) {
if (!this[handlerSymbol]) {
@@ -43,12 +47,56 @@
const _name = Symbol("[[name]]");
const _closed = Symbol("[[closed]]");
- const _rid = Symbol("[[rid]]");
+
+ const channels = [];
+ let rid = null;
+
+ async function recv() {
+ while (channels.length > 0) {
+ const message = await core.opAsync("op_broadcast_recv", rid);
+
+ if (message === null) {
+ break;
+ }
+
+ const [name, data] = message;
+ dispatch(null, name, new Uint8Array(data));
+ }
+
+ core.close(rid);
+ rid = null;
+ }
+
+ function dispatch(source, name, data) {
+ for (const channel of channels) {
+ if (channel === source) continue; // Don't self-send.
+ if (channel[_name] !== name) continue;
+ if (channel[_closed]) continue;
+
+ const go = () => {
+ if (channel[_closed]) return;
+ const event = new MessageEvent("message", {
+ data: core.deserialize(data), // TODO(bnoordhuis) Cache immutables.
+ origin: "http://127.0.0.1",
+ });
+ setTarget(event, channel);
+ channel.dispatchEvent(event);
+ };
+
+ defer(go);
+ }
+ }
+
+ // Defer to avoid starving the event loop. Not using queueMicrotask()
+ // for that reason: it lets promises make forward progress but can
+ // still starve other parts of the event loop.
+ function defer(go) {
+ setTimeout(go, 1);
+ }
class BroadcastChannel extends EventTarget {
[_name];
[_closed] = false;
- [_rid];
get name() {
return this[_name];
@@ -57,8 +105,6 @@
constructor(name) {
super();
- window.location;
-
const prefix = "Failed to construct 'broadcastChannel'";
webidl.requiredArguments(arguments.length, 1, { prefix });
@@ -67,46 +113,50 @@
context: "Argument 1",
});
- this[_rid] = core.opSync("op_broadcast_open", this[_name]);
-
this[webidl.brand] = webidl.brand;
- this.#eventLoop();
+ channels.push(this);
+
+ if (rid === null) {
+ // Create the rid immediately, otherwise there is a time window (and a
+ // race condition) where messages can get lost, because recv() is async.
+ rid = core.opSync("op_broadcast_subscribe");
+ recv();
+ }
}
postMessage(message) {
webidl.assertBranded(this, BroadcastChannel);
+ const prefix = "Failed to execute 'postMessage' on 'BroadcastChannel'";
+ webidl.requiredArguments(arguments.length, 1, { prefix });
+
if (this[_closed]) {
throw new DOMException("Already closed", "InvalidStateError");
}
- core.opAsync("op_broadcast_send", this[_rid], core.serialize(message));
+ if (typeof message === "function" || typeof message === "symbol") {
+ throw new DOMException("Uncloneable value", "DataCloneError");
+ }
+
+ const data = core.serialize(message);
+
+ // Send to other listeners in this VM.
+ dispatch(this, this[_name], new Uint8Array(data));
+
+ // Send to listeners in other VMs.
+ defer(() => core.opAsync("op_broadcast_send", [rid, this[_name]], data));
}
close() {
webidl.assertBranded(this, BroadcastChannel);
-
this[_closed] = true;
- core.close(this[_rid]);
- }
- async #eventLoop() {
- while (!this[_closed]) {
- const message = await core.opAsync(
- "op_broadcast_next_event",
- this[_rid],
- );
-
- if (message.length !== 0) {
- const event = new MessageEvent("message", {
- data: core.deserialize(message),
- origin: window.location,
- });
- event.target = this;
- this.dispatchEvent(event);
- }
- }
+ const index = channels.indexOf(this);
+ if (index === -1) return;
+
+ channels.splice(index, 1);
+ if (channels.length === 0) core.opSync("op_broadcast_unsubscribe", rid);
}
}
diff --git a/extensions/broadcast_channel/Cargo.toml b/extensions/broadcast_channel/Cargo.toml
index 72c29f651..7bc65f3a0 100644
--- a/extensions/broadcast_channel/Cargo.toml
+++ b/extensions/broadcast_channel/Cargo.toml
@@ -14,5 +14,7 @@ repository = "https://github.com/denoland/deno"
path = "lib.rs"
[dependencies]
+async-trait = "0.1"
deno_core = { version = "0.88.0", path = "../../core" }
tokio = { version = "1.4.0", features = ["full"] }
+uuid = { version = "0.8.2", features = ["v4"] }
diff --git a/extensions/broadcast_channel/in_memory_broadcast_channel.rs b/extensions/broadcast_channel/in_memory_broadcast_channel.rs
new file mode 100644
index 000000000..34498c830
--- /dev/null
+++ b/extensions/broadcast_channel/in_memory_broadcast_channel.rs
@@ -0,0 +1,97 @@
+// Copyright 2018-2021 the Deno authors. All rights reserved. MIT license.
+
+use crate::BroadcastChannel;
+use async_trait::async_trait;
+use deno_core::error::AnyError;
+use std::sync::Arc;
+use std::sync::Mutex;
+use tokio::sync::broadcast;
+use tokio::sync::mpsc;
+use uuid::Uuid;
+
+#[derive(Clone)]
+pub struct InMemoryBroadcastChannel(Arc<Mutex<broadcast::Sender<Message>>>);
+
+pub struct InMemoryBroadcastChannelResource {
+ rx: tokio::sync::Mutex<(
+ broadcast::Receiver<Message>,
+ mpsc::UnboundedReceiver<()>,
+ )>,
+ cancel_tx: mpsc::UnboundedSender<()>,
+ uuid: Uuid,
+}
+
+#[derive(Clone, Debug)]
+struct Message {
+ name: Arc<String>,
+ data: Arc<Vec<u8>>,
+ uuid: Uuid,
+}
+
+impl Default for InMemoryBroadcastChannel {
+ fn default() -> Self {
+ let (tx, _) = broadcast::channel(256);
+ Self(Arc::new(Mutex::new(tx)))
+ }
+}
+
+#[async_trait]
+impl BroadcastChannel for InMemoryBroadcastChannel {
+ type Resource = InMemoryBroadcastChannelResource;
+
+ fn subscribe(&self) -> Result<Self::Resource, AnyError> {
+ let (cancel_tx, cancel_rx) = mpsc::unbounded_channel();
+ let broadcast_rx = self.0.lock().unwrap().subscribe();
+ let rx = tokio::sync::Mutex::new((broadcast_rx, cancel_rx));
+ let uuid = Uuid::new_v4();
+ Ok(Self::Resource {
+ rx,
+ cancel_tx,
+ uuid,
+ })
+ }
+
+ fn unsubscribe(&self, resource: &Self::Resource) -> Result<(), AnyError> {
+ Ok(resource.cancel_tx.send(())?)
+ }
+
+ async fn send(
+ &self,
+ resource: &Self::Resource,
+ name: String,
+ data: Vec<u8>,
+ ) -> Result<(), AnyError> {
+ let name = Arc::new(name);
+ let data = Arc::new(data);
+ let uuid = resource.uuid;
+ self.0.lock().unwrap().send(Message { name, data, uuid })?;
+ Ok(())
+ }
+
+ async fn recv(
+ &self,
+ resource: &Self::Resource,
+ ) -> Result<Option<crate::Message>, AnyError> {
+ let mut g = resource.rx.lock().await;
+ let (broadcast_rx, cancel_rx) = &mut *g;
+ loop {
+ let result = tokio::select! {
+ r = broadcast_rx.recv() => r,
+ _ = cancel_rx.recv() => return Ok(None),
+ };
+ use tokio::sync::broadcast::error::RecvError::*;
+ match result {
+ Err(Closed) => return Ok(None),
+ Err(Lagged(_)) => (), // Backlogged, messages dropped.
+ Ok(message) if message.uuid == resource.uuid => (), // Self-send.
+ Ok(message) => {
+ let name = String::clone(&message.name);
+ let data = Vec::clone(&message.data);
+ return Ok(Some((name, data)));
+ }
+ }
+ }
+ }
+}
+
+impl deno_core::Resource for InMemoryBroadcastChannelResource {}
diff --git a/extensions/broadcast_channel/lib.rs b/extensions/broadcast_channel/lib.rs
index cee9c3e0c..b2a79916c 100644
--- a/extensions/broadcast_channel/lib.rs
+++ b/extensions/broadcast_channel/lib.rs
@@ -1,127 +1,135 @@
// Copyright 2018-2021 the Deno authors. All rights reserved. MIT license.
+mod in_memory_broadcast_channel;
+
+pub use in_memory_broadcast_channel::InMemoryBroadcastChannel;
+
+use async_trait::async_trait;
use deno_core::error::bad_resource_id;
use deno_core::error::AnyError;
use deno_core::include_js_files;
use deno_core::op_async;
use deno_core::op_sync;
-use deno_core::AsyncRefCell;
use deno_core::Extension;
use deno_core::OpState;
-use deno_core::RcRef;
use deno_core::Resource;
use deno_core::ResourceId;
use deno_core::ZeroCopyBuf;
-use std::borrow::Cow;
use std::cell::RefCell;
use std::path::PathBuf;
use std::rc::Rc;
-use tokio::io::AsyncReadExt;
-use tokio::io::AsyncWriteExt;
-struct BroadcastChannelResource(AsyncRefCell<tokio::fs::File>);
+#[async_trait]
+pub trait BroadcastChannel: Clone {
+ type Resource: Resource;
-impl Resource for BroadcastChannelResource {
- fn name(&self) -> Cow<str> {
- "broadcastChannel".into()
- }
+ fn subscribe(&self) -> Result<Self::Resource, AnyError>;
+
+ fn unsubscribe(&self, resource: &Self::Resource) -> Result<(), AnyError>;
+
+ async fn send(
+ &self,
+ resource: &Self::Resource,
+ name: String,
+ data: Vec<u8>,
+ ) -> Result<(), AnyError>;
+
+ async fn recv(
+ &self,
+ resource: &Self::Resource,
+ ) -> Result<Option<Message>, AnyError>;
}
-pub fn op_broadcast_open(
+pub type Message = (String, Vec<u8>);
+
+struct Unstable(bool); // --unstable
+
+pub fn op_broadcast_subscribe<BC: BroadcastChannel + 'static>(
state: &mut OpState,
- name: String,
- _bufs: Option<ZeroCopyBuf>,
+ _args: (),
+ _buf: (),
) -> Result<ResourceId, AnyError> {
- let path = PathBuf::from("./");
- std::fs::create_dir_all(&path)?;
- let file = std::fs::OpenOptions::new()
- .create(true)
- .append(true)
- .read(true)
- .open(path.join(format!("broadcast_{}", name)))?;
-
- let rid =
- state
- .resource_table
- .add(BroadcastChannelResource(AsyncRefCell::new(
- tokio::fs::File::from_std(file),
- )));
-
- Ok(rid)
+ let unstable = state.borrow::<Unstable>().0;
+
+ if !unstable {
+ eprintln!(
+ "Unstable API 'BroadcastChannel'. The --unstable flag must be provided.",
+ );
+ std::process::exit(70);
+ }
+
+ let bc = state.borrow::<BC>();
+ let resource = bc.subscribe()?;
+ Ok(state.resource_table.add(resource))
}
-pub async fn op_broadcast_send(
- state: Rc<RefCell<OpState>>,
+pub fn op_broadcast_unsubscribe<BC: BroadcastChannel + 'static>(
+ state: &mut OpState,
rid: ResourceId,
- buf: Option<ZeroCopyBuf>,
+ _buf: (),
) -> Result<(), AnyError> {
- let state = state.borrow_mut();
let resource = state
.resource_table
- .get::<BroadcastChannelResource>(rid)
+ .get::<BC::Resource>(rid)
.ok_or_else(bad_resource_id)?;
+ let bc = state.borrow::<BC>();
+ bc.unsubscribe(&resource)
+}
- let mut file = RcRef::map(&resource, |r| &r.0).borrow_mut().await;
-
- let buffer_data = buf.unwrap();
- let mut data = vec![];
- data.extend_from_slice(&(buffer_data.len() as u64).to_ne_bytes());
- data.extend_from_slice(&buffer_data);
-
- file.write_all(&data).await?;
-
- Ok(())
+pub async fn op_broadcast_send<BC: BroadcastChannel + 'static>(
+ state: Rc<RefCell<OpState>>,
+ (rid, name): (ResourceId, String),
+ buf: ZeroCopyBuf,
+) -> Result<(), AnyError> {
+ let resource = state
+ .borrow()
+ .resource_table
+ .get::<BC::Resource>(rid)
+ .ok_or_else(bad_resource_id)?;
+ let bc = state.borrow().borrow::<BC>().clone();
+ bc.send(&resource, name, buf.to_vec()).await
}
-pub async fn op_broadcast_next_event(
+pub async fn op_broadcast_recv<BC: BroadcastChannel + 'static>(
state: Rc<RefCell<OpState>>,
rid: ResourceId,
- _bufs: Option<ZeroCopyBuf>,
-) -> Result<Vec<u8>, AnyError> {
- let resource = {
- let state = state.borrow_mut();
- state
- .resource_table
- .get::<BroadcastChannelResource>(rid)
- .ok_or_else(bad_resource_id)?
- };
-
- let mut file = RcRef::map(&resource, |r| &r.0).borrow_mut().await;
-
- let size = match file.read_u64().await {
- Ok(s) => s,
- Err(e) => {
- return match e.kind() {
- deno_core::futures::io::ErrorKind::UnexpectedEof => Ok(vec![]),
- _ => Err(e.into()),
- }
- }
- };
- let mut data = vec![0u8; size as usize];
- match file.read_exact(&mut data).await {
- Ok(s) => s,
- Err(e) => {
- return match e.kind() {
- deno_core::futures::io::ErrorKind::UnexpectedEof => Ok(vec![]),
- _ => Err(e.into()),
- }
- }
- };
-
- Ok(data)
+ _buf: (),
+) -> Result<Option<Message>, AnyError> {
+ let resource = state
+ .borrow()
+ .resource_table
+ .get::<BC::Resource>(rid)
+ .ok_or_else(bad_resource_id)?;
+ let bc = state.borrow().borrow::<BC>().clone();
+ bc.recv(&resource).await
}
-pub fn init() -> Extension {
+pub fn init<BC: BroadcastChannel + 'static>(
+ bc: BC,
+ unstable: bool,
+) -> Extension {
Extension::builder()
.js(include_js_files!(
prefix "deno:extensions/broadcast_channel",
"01_broadcast_channel.js",
))
.ops(vec![
- ("op_broadcast_open", op_sync(op_broadcast_open)),
- ("op_broadcast_send", op_async(op_broadcast_send)),
- ("op_broadcast_next_event", op_async(op_broadcast_next_event)),
+ (
+ "op_broadcast_subscribe",
+ op_sync(op_broadcast_subscribe::<BC>),
+ ),
+ (
+ "op_broadcast_unsubscribe",
+ op_sync(op_broadcast_unsubscribe::<BC>),
+ ),
+ ("op_broadcast_send", op_async(op_broadcast_send::<BC>)),
+ ("op_broadcast_recv", op_async(op_broadcast_recv::<BC>)),
])
+ .state(move |state| {
+ state.put(bc.clone());
+ state.put(Unstable(unstable));
+ Ok(())
+ })
.build()
}
diff --git a/extensions/web/02_event.js b/extensions/web/02_event.js
index b6b5609bc..8ee6acc61 100644
--- a/extensions/web/02_event.js
+++ b/extensions/web/02_event.js
@@ -1117,6 +1117,10 @@
}
class MessageEvent extends Event {
+ get source() {
+ return null;
+ }
+
constructor(type, eventInitDict) {
super(type, {
bubbles: eventInitDict?.bubbles ?? false,
@@ -1208,5 +1212,6 @@
};
window.__bootstrap.event = {
setIsTrusted,
+ setTarget,
};
})(this);
diff --git a/runtime/build.rs b/runtime/build.rs
index 4fe89af3e..d228fffd6 100644
--- a/runtime/build.rs
+++ b/runtime/build.rs
@@ -52,7 +52,10 @@ fn create_runtime_snapshot(snapshot_path: &Path, files: Vec<PathBuf>) {
deno_crypto::init(None),
deno_webgpu::init(false),
deno_timers::init::<deno_timers::NoTimersPermission>(),
- deno_broadcast_channel::init(),
+ deno_broadcast_channel::init(
+ deno_broadcast_channel::InMemoryBroadcastChannel::default(),
+ false, // No --unstable.
+ ),
];
let js_runtime = JsRuntime::new(RuntimeOptions {
diff --git a/runtime/examples/hello_runtime.rs b/runtime/examples/hello_runtime.rs
index 80a258c17..e8abaffb8 100644
--- a/runtime/examples/hello_runtime.rs
+++ b/runtime/examples/hello_runtime.rs
@@ -2,6 +2,7 @@
use deno_core::error::AnyError;
use deno_core::FsModuleLoader;
+use deno_runtime::deno_broadcast_channel::InMemoryBroadcastChannel;
use deno_runtime::deno_file::BlobUrlStore;
use deno_runtime::permissions::Permissions;
use deno_runtime::worker::MainWorker;
@@ -42,6 +43,7 @@ async fn main() -> Result<(), AnyError> {
location: None,
location_data_dir: None,
blob_url_store: BlobUrlStore::default(),
+ broadcast_channel: InMemoryBroadcastChannel::default(),
};
let js_path =
diff --git a/runtime/web_worker.rs b/runtime/web_worker.rs
index 172d24dea..c2356651e 100644
--- a/runtime/web_worker.rs
+++ b/runtime/web_worker.rs
@@ -7,6 +7,7 @@ use crate::metrics;
use crate::ops;
use crate::permissions::Permissions;
use crate::tokio_util::create_basic_runtime;
+use deno_broadcast_channel::InMemoryBroadcastChannel;
use deno_core::error::AnyError;
use deno_core::error::Context as ErrorContext;
use deno_core::futures::channel::mpsc;
@@ -230,6 +231,7 @@ pub struct WebWorkerOptions {
pub no_color: bool,
pub get_error_class_fn: Option<GetErrorClassFn>,
pub blob_url_store: BlobUrlStore,
+ pub broadcast_channel: InMemoryBroadcastChannel,
}
impl WebWorker {
@@ -268,7 +270,10 @@ impl WebWorker {
options.user_agent.clone(),
options.ca_data.clone(),
),
- deno_broadcast_channel::init(),
+ deno_broadcast_channel::init(
+ options.broadcast_channel.clone(),
+ options.unstable,
+ ),
deno_crypto::init(options.seed),
deno_webgpu::init(options.unstable),
deno_timers::init::<Permissions>(),
@@ -567,6 +572,7 @@ mod tests {
no_color: true,
get_error_class_fn: None,
blob_url_store: BlobUrlStore::default(),
+ broadcast_channel: InMemoryBroadcastChannel::default(),
};
let mut worker = WebWorker::from_options(
diff --git a/runtime/worker.rs b/runtime/worker.rs
index b41f0291c..9ffd0b5ab 100644
--- a/runtime/worker.rs
+++ b/runtime/worker.rs
@@ -7,6 +7,7 @@ use crate::js;
use crate::metrics;
use crate::ops;
use crate::permissions::Permissions;
+use deno_broadcast_channel::InMemoryBroadcastChannel;
use deno_core::error::AnyError;
use deno_core::error::Context as ErrorContext;
use deno_core::futures::future::poll_fn;
@@ -71,6 +72,7 @@ pub struct WorkerOptions {
pub location: Option<Url>,
pub location_data_dir: Option<std::path::PathBuf>,
pub blob_url_store: BlobUrlStore,
+ pub broadcast_channel: InMemoryBroadcastChannel,
}
impl MainWorker {
@@ -107,7 +109,10 @@ impl MainWorker {
),
deno_webstorage::init(options.location_data_dir.clone()),
deno_crypto::init(options.seed),
- deno_broadcast_channel::init(),
+ deno_broadcast_channel::init(
+ options.broadcast_channel.clone(),
+ options.unstable,
+ ),
deno_webgpu::init(options.unstable),
deno_timers::init::<Permissions>(),
// Metrics
@@ -296,6 +301,7 @@ mod tests {
location: None,
location_data_dir: None,
blob_url_store: BlobUrlStore::default(),
+ broadcast_channel: InMemoryBroadcastChannel::default(),
};
MainWorker::from_options(main_module, permissions, &options)
diff --git a/tools/wpt/expectation.json b/tools/wpt/expectation.json
index 4d6016104..b468cc69d 100644
--- a/tools/wpt/expectation.json
+++ b/tools/wpt/expectation.json
@@ -1098,6 +1098,14 @@
"sessionStorage: defineProperty not configurable"
]
},
+ "webmessaging": {
+ "broadcastchannel": {
+ "basics.any.html": [
+ "postMessage results in correct event"
+ ],
+ "interface.any.html": true
+ }
+ },
"xhr": {
"formdata": {
"append.any.html": true,
diff --git a/tools/wpt/runner.ts b/tools/wpt/runner.ts
index eae53094d..a0941b521 100644
--- a/tools/wpt/runner.ts
+++ b/tools/wpt/runner.ts
@@ -76,6 +76,7 @@ export async function runSingleTest(
join(ROOT_PATH, `./target/${release ? "release" : "debug"}/deno`),
"run",
"-A",
+ "--unstable",
"--location",
url.toString(),
"--cert",