summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorLuca Casonato <hello@lcas.dev>2021-07-06 19:42:52 +0200
committerGitHub <noreply@github.com>2021-07-06 19:42:52 +0200
commitbdfad23dd012d0c3226b466544e86109da18d09c (patch)
treef59bfc463f29047df80c48abbe299e55a16b8622
parent672a88f2727286233ab904e23a9145aa5563a834 (diff)
feat: support SharedArrayBuffer sharing between workers (#11040)
This commit adds support for sharing SABs between workers.
-rw-r--r--cli/main.rs6
-rw-r--r--cli/program_state.rs4
-rw-r--r--cli/standalone.rs1
-rw-r--r--cli/tests/workers/shared_array_buffer.ts9
-rw-r--r--cli/tests/workers/test.ts28
-rw-r--r--core/bindings.rs36
-rw-r--r--core/lib.rs1
-rw-r--r--core/runtime.rs40
-rw-r--r--main.js14
-rw-r--r--runtime/examples/hello_runtime.rs1
-rw-r--r--runtime/web_worker.rs3
-rw-r--r--runtime/worker.rs4
-rw-r--r--worker.js5
13 files changed, 152 insertions, 0 deletions
diff --git a/cli/main.rs b/cli/main.rs
index a61f94530..eb7b4d333 100644
--- a/cli/main.rs
+++ b/cli/main.rs
@@ -123,6 +123,9 @@ fn create_web_worker_callback(
get_error_class_fn: Some(&crate::errors::get_error_class_name),
blob_store: program_state.blob_store.clone(),
broadcast_channel: program_state.broadcast_channel.clone(),
+ shared_array_buffer_store: Some(
+ program_state.shared_array_buffer_store.clone(),
+ ),
};
let (mut worker, external_handle) = WebWorker::from_options(
@@ -209,6 +212,9 @@ pub fn create_main_worker(
}),
blob_store: program_state.blob_store.clone(),
broadcast_channel: program_state.broadcast_channel.clone(),
+ shared_array_buffer_store: Some(
+ program_state.shared_array_buffer_store.clone(),
+ ),
};
let mut worker = MainWorker::from_options(main_module, permissions, &options);
diff --git a/cli/program_state.rs b/cli/program_state.rs
index becc8faa3..5bb60906e 100644
--- a/cli/program_state.rs
+++ b/cli/program_state.rs
@@ -15,6 +15,7 @@ use crate::module_graph::TypeLib;
use crate::source_maps::SourceMapGetter;
use crate::specifier_handler::FetchHandler;
use crate::version;
+use deno_core::SharedArrayBufferStore;
use deno_runtime::deno_broadcast_channel::InMemoryBroadcastChannel;
use deno_runtime::deno_web::BlobStore;
use deno_runtime::inspector_server::InspectorServer;
@@ -55,6 +56,7 @@ pub struct ProgramState {
pub ca_data: Option<Vec<u8>>,
pub blob_store: BlobStore,
pub broadcast_channel: InMemoryBroadcastChannel,
+ pub shared_array_buffer_store: SharedArrayBufferStore,
}
impl ProgramState {
@@ -81,6 +83,7 @@ impl ProgramState {
let blob_store = BlobStore::default();
let broadcast_channel = InMemoryBroadcastChannel::default();
+ let shared_array_buffer_store = SharedArrayBufferStore::default();
let file_fetcher = FileFetcher::new(
http_cache,
@@ -148,6 +151,7 @@ impl ProgramState {
ca_data,
blob_store,
broadcast_channel,
+ shared_array_buffer_store,
};
Ok(Arc::new(program_state))
}
diff --git a/cli/standalone.rs b/cli/standalone.rs
index 74e5de1ca..c8918563f 100644
--- a/cli/standalone.rs
+++ b/cli/standalone.rs
@@ -248,6 +248,7 @@ pub async fn run(
origin_storage_dir: None,
blob_store,
broadcast_channel,
+ shared_array_buffer_store: None,
};
let mut worker =
MainWorker::from_options(main_module.clone(), permissions, &options);
diff --git a/cli/tests/workers/shared_array_buffer.ts b/cli/tests/workers/shared_array_buffer.ts
new file mode 100644
index 000000000..4af95863a
--- /dev/null
+++ b/cli/tests/workers/shared_array_buffer.ts
@@ -0,0 +1,9 @@
+self.postMessage("ready");
+
+globalThis.addEventListener("message", (e) => {
+ const bytes1 = new Uint8Array(e.data[0]);
+ const bytes2 = new Uint8Array(e.data[1]);
+ bytes1[0] = 1;
+ bytes2[0] = 2;
+ self.postMessage("done");
+});
diff --git a/cli/tests/workers/test.ts b/cli/tests/workers/test.ts
index b37b7aeb1..d35dbec82 100644
--- a/cli/tests/workers/test.ts
+++ b/cli/tests/workers/test.ts
@@ -790,6 +790,34 @@ Deno.test({
});
Deno.test({
+ name: "worker SharedArrayBuffer",
+ fn: async function (): Promise<void> {
+ const promise = deferred();
+ const workerOptions: WorkerOptions = { type: "module" };
+ const w = new Worker(
+ new URL("shared_array_buffer.ts", import.meta.url).href,
+ workerOptions,
+ );
+ const sab1 = new SharedArrayBuffer(1);
+ const sab2 = new SharedArrayBuffer(1);
+ const bytes1 = new Uint8Array(sab1);
+ const bytes2 = new Uint8Array(sab2);
+ assertEquals(bytes1[0], 0);
+ assertEquals(bytes2[0], 0);
+ w.onmessage = (): void => {
+ w.postMessage([sab1, sab2]);
+ w.onmessage = (): void => {
+ assertEquals(bytes1[0], 1);
+ assertEquals(bytes2[0], 2);
+ promise.resolve();
+ };
+ };
+ await promise;
+ w.terminate();
+ },
+});
+
+Deno.test({
name: "Send MessagePorts from / to workers",
fn: async function (): Promise<void> {
const result = deferred();
diff --git a/core/bindings.rs b/core/bindings.rs
index 143ccda9b..d8337322d 100644
--- a/core/bindings.rs
+++ b/core/bindings.rs
@@ -22,7 +22,10 @@ use std::convert::TryInto;
use std::option::Option;
use std::rc::Rc;
use url::Url;
+use v8::HandleScope;
+use v8::Local;
use v8::MapFnTo;
+use v8::SharedArrayBuffer;
lazy_static::lazy_static! {
pub static ref EXTERNAL_REFERENCES: v8::ExternalReferences =
@@ -713,6 +716,22 @@ impl<'a> v8::ValueSerializerImpl for SerializeDeserialize<'a> {
scope.throw_exception(error);
}
+ fn get_shared_array_buffer_id<'s>(
+ &mut self,
+ scope: &mut HandleScope<'s>,
+ shared_array_buffer: Local<'s, SharedArrayBuffer>,
+ ) -> Option<u32> {
+ let state_rc = JsRuntime::state(scope);
+ let state = state_rc.borrow_mut();
+ if let Some(shared_array_buffer_store) = &state.shared_array_buffer_store {
+ let backing_store = shared_array_buffer.get_backing_store();
+ let id = shared_array_buffer_store.insert(backing_store);
+ Some(id)
+ } else {
+ None
+ }
+ }
+
fn write_host_object<'s>(
&mut self,
scope: &mut v8::HandleScope<'s>,
@@ -735,6 +754,23 @@ impl<'a> v8::ValueSerializerImpl for SerializeDeserialize<'a> {
}
impl<'a> v8::ValueDeserializerImpl for SerializeDeserialize<'a> {
+ fn get_shared_array_buffer_from_id<'s>(
+ &mut self,
+ scope: &mut HandleScope<'s>,
+ transfer_id: u32,
+ ) -> Option<Local<'s, SharedArrayBuffer>> {
+ let state_rc = JsRuntime::state(scope);
+ let state = state_rc.borrow_mut();
+ if let Some(shared_array_buffer_store) = &state.shared_array_buffer_store {
+ let backing_store = shared_array_buffer_store.take(transfer_id)?;
+ let shared_array_buffer =
+ v8::SharedArrayBuffer::with_backing_store(scope, &backing_store);
+ Some(shared_array_buffer)
+ } else {
+ None
+ }
+ }
+
fn read_host_object<'s>(
&mut self,
scope: &mut v8::HandleScope<'s>,
diff --git a/core/lib.rs b/core/lib.rs
index 4a9a213f4..8c8861c79 100644
--- a/core/lib.rs
+++ b/core/lib.rs
@@ -57,6 +57,7 @@ pub use crate::modules::ModuleLoader;
pub use crate::modules::ModuleSource;
pub use crate::modules::ModuleSourceFuture;
pub use crate::modules::NoopModuleLoader;
+pub use crate::runtime::SharedArrayBufferStore;
// TODO(bartlomieju): this struct should be implementation
// detail nad not be public
pub use crate::modules::RecursiveModuleLoad;
diff --git a/core/runtime.rs b/core/runtime.rs
index 48003c811..cf43c2adc 100644
--- a/core/runtime.rs
+++ b/core/runtime.rs
@@ -38,6 +38,8 @@ use std::mem::forget;
use std::option::Option;
use std::pin::Pin;
use std::rc::Rc;
+use std::sync::Arc;
+use std::sync::Mutex;
use std::sync::Once;
use std::task::Context;
use std::task::Poll;
@@ -97,6 +99,36 @@ struct ModEvaluate {
sender: mpsc::Sender<Result<(), AnyError>>,
}
+#[derive(Default, Clone)]
+pub struct SharedArrayBufferStore(Arc<Mutex<SharedArrayBufferStoreInner>>);
+
+#[derive(Default)]
+pub struct SharedArrayBufferStoreInner {
+ buffers: HashMap<u32, v8::SharedRef<v8::BackingStore>>,
+ last_id: u32,
+}
+
+impl SharedArrayBufferStore {
+ pub(crate) fn insert(
+ &self,
+ backing_store: v8::SharedRef<v8::BackingStore>,
+ ) -> u32 {
+ let mut buffers = self.0.lock().unwrap();
+ let last_id = buffers.last_id;
+ buffers.buffers.insert(last_id, backing_store);
+ buffers.last_id += 1;
+ last_id
+ }
+
+ pub(crate) fn take(
+ &self,
+ id: u32,
+ ) -> Option<v8::SharedRef<v8::BackingStore>> {
+ let mut buffers = self.0.lock().unwrap();
+ buffers.buffers.remove(&id)
+ }
+}
+
/// Internal state for JsRuntime which is stored in one of v8::Isolate's
/// embedder slots.
pub(crate) struct JsRuntimeState {
@@ -116,6 +148,7 @@ pub(crate) struct JsRuntimeState {
pub(crate) pending_unref_ops: FuturesUnordered<PendingOpFuture>,
pub(crate) have_unpolled_ops: bool,
pub(crate) op_state: Rc<RefCell<OpState>>,
+ pub(crate) shared_array_buffer_store: Option<SharedArrayBufferStore>,
waker: AtomicWaker,
}
@@ -204,6 +237,12 @@ pub struct RuntimeOptions {
/// V8 platform instance to use. Used when Deno initializes V8
/// (which it only does once), otherwise it's silenty dropped.
pub v8_platform: Option<v8::SharedRef<v8::Platform>>,
+
+ /// The buffer to use for transferring SharedArrayBuffers between isolates.
+ /// If multiple isolates should have the possibility of sharing
+ /// SharedArrayBuffers, they should use the same SharedArrayBufferStore. If no
+ /// SharedArrayBufferStore is specified, SharedArrayBuffer can not be serialized.
+ pub shared_array_buffer_store: Option<SharedArrayBufferStore>,
}
impl JsRuntime {
@@ -294,6 +333,7 @@ impl JsRuntime {
js_error_create_fn,
pending_ops: FuturesUnordered::new(),
pending_unref_ops: FuturesUnordered::new(),
+ shared_array_buffer_store: options.shared_array_buffer_store,
op_state: op_state.clone(),
have_unpolled_ops: false,
waker: AtomicWaker::new(),
diff --git a/main.js b/main.js
new file mode 100644
index 000000000..feb6c5a1e
--- /dev/null
+++ b/main.js
@@ -0,0 +1,14 @@
+const worker = new Worker(new URL("./worker.js", import.meta.url), {
+ type: "module",
+});
+
+const sab = new SharedArrayBuffer(1);
+console.log(new Uint8Array(sab));
+
+setInterval(() => {
+ console.log(new Uint8Array(sab));
+}, 100);
+
+worker.onmessage = () => {
+ worker.postMessage(sab);
+};
diff --git a/runtime/examples/hello_runtime.rs b/runtime/examples/hello_runtime.rs
index 047b8991b..707809037 100644
--- a/runtime/examples/hello_runtime.rs
+++ b/runtime/examples/hello_runtime.rs
@@ -43,6 +43,7 @@ async fn main() -> Result<(), AnyError> {
origin_storage_dir: None,
blob_store: BlobStore::default(),
broadcast_channel: InMemoryBroadcastChannel::default(),
+ shared_array_buffer_store: None,
};
let js_path =
diff --git a/runtime/web_worker.rs b/runtime/web_worker.rs
index 55e107002..f8aadf4c2 100644
--- a/runtime/web_worker.rs
+++ b/runtime/web_worker.rs
@@ -28,6 +28,7 @@ use deno_core::ModuleId;
use deno_core::ModuleLoader;
use deno_core::ModuleSpecifier;
use deno_core::RuntimeOptions;
+use deno_core::SharedArrayBufferStore;
use deno_web::create_entangled_message_port;
use deno_web::BlobStore;
use deno_web::MessagePort;
@@ -269,6 +270,7 @@ pub struct WebWorkerOptions {
pub get_error_class_fn: Option<GetErrorClassFn>,
pub blob_store: BlobStore,
pub broadcast_channel: InMemoryBroadcastChannel,
+ pub shared_array_buffer_store: Option<SharedArrayBufferStore>,
}
impl WebWorker {
@@ -351,6 +353,7 @@ impl WebWorker {
startup_snapshot: Some(js::deno_isolate_init()),
js_error_create_fn: options.js_error_create_fn.clone(),
get_error_class_fn: options.get_error_class_fn,
+ shared_array_buffer_store: options.shared_array_buffer_store.clone(),
extensions,
..Default::default()
});
diff --git a/runtime/worker.rs b/runtime/worker.rs
index 555fc89d2..04c294146 100644
--- a/runtime/worker.rs
+++ b/runtime/worker.rs
@@ -22,6 +22,7 @@ use deno_core::ModuleId;
use deno_core::ModuleLoader;
use deno_core::ModuleSpecifier;
use deno_core::RuntimeOptions;
+use deno_core::SharedArrayBufferStore;
use deno_web::BlobStore;
use log::debug;
use std::env;
@@ -70,6 +71,7 @@ pub struct WorkerOptions {
pub origin_storage_dir: Option<std::path::PathBuf>,
pub blob_store: BlobStore,
pub broadcast_channel: InMemoryBroadcastChannel,
+ pub shared_array_buffer_store: Option<SharedArrayBufferStore>,
}
impl MainWorker {
@@ -136,6 +138,7 @@ impl MainWorker {
startup_snapshot: Some(js::deno_isolate_init()),
js_error_create_fn: options.js_error_create_fn.clone(),
get_error_class_fn: options.get_error_class_fn,
+ shared_array_buffer_store: options.shared_array_buffer_store.clone(),
extensions,
..Default::default()
});
@@ -300,6 +303,7 @@ mod tests {
origin_storage_dir: None,
blob_store: BlobStore::default(),
broadcast_channel: InMemoryBroadcastChannel::default(),
+ shared_array_buffer_store: None,
};
MainWorker::from_options(main_module, permissions, &options)
diff --git a/worker.js b/worker.js
new file mode 100644
index 000000000..c1b860204
--- /dev/null
+++ b/worker.js
@@ -0,0 +1,5 @@
+self.postMessage("ready");
+
+globalThis.addEventListener("message", (e) => {
+ new Uint8Array(e.data)[0] = 1;
+});