summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorLuca Casonato <hello@lcas.dev>2021-06-22 16:30:16 +0200
committerGitHub <noreply@github.com>2021-06-22 16:30:16 +0200
commit6261c89e04b8f1a3aabc771dbc8cddad904710e9 (patch)
tree6b1814991d77b8f238e507aa2f52e93589f3c0a1
parent0a2ced57285aa0ee4b47426382c32fb53c4e07cd (diff)
feat: transfer MessagePort between workers (#11076)
Add support for transferring `MessagePort`s between workers.
-rw-r--r--cli/dts/lib.deno.shared_globals.d.ts2
-rw-r--r--cli/dts/lib.deno.worker.d.ts6
-rw-r--r--cli/main.rs4
-rw-r--r--cli/tests/workers/message_port.ts14
-rw-r--r--cli/tests/workers/nonexistent_worker.out2
-rw-r--r--cli/tests/workers/permissions_blob_local.ts.out2
-rw-r--r--cli/tests/workers/permissions_blob_remote.ts.out2
-rw-r--r--cli/tests/workers/permissions_data_local.ts.out2
-rw-r--r--cli/tests/workers/permissions_data_remote.ts.out2
-rw-r--r--cli/tests/workers/permissions_dynamic_remote.ts.out2
-rw-r--r--cli/tests/workers/permissions_remote_remote.ts.out2
-rw-r--r--cli/tests/workers/test.ts53
-rw-r--r--cli/tests/workers/worker_error.ts.out2
-rw-r--r--cli/tests/workers/worker_nested_error.ts.out2
-rw-r--r--extensions/web/lib.rs2
-rw-r--r--extensions/web/message_port.rs19
-rw-r--r--runtime/js/11_workers.js91
-rw-r--r--runtime/js/99_main.js40
-rw-r--r--runtime/ops/web_worker.rs36
-rw-r--r--runtime/ops/worker_host.rs139
-rw-r--r--runtime/web_worker.rs295
21 files changed, 373 insertions, 346 deletions
diff --git a/cli/dts/lib.deno.shared_globals.d.ts b/cli/dts/lib.deno.shared_globals.d.ts
index 46154c64e..be35fae01 100644
--- a/cli/dts/lib.deno.shared_globals.d.ts
+++ b/cli/dts/lib.deno.shared_globals.d.ts
@@ -386,7 +386,7 @@ declare class Worker extends EventTarget {
specifier: string | URL,
options?: WorkerOptions,
);
- postMessage(message: any, transfer: ArrayBuffer[]): void;
+ postMessage(message: any, transfer: Transferable[]): void;
postMessage(message: any, options?: PostMessageOptions): void;
addEventListener<K extends keyof WorkerEventMap>(
type: K,
diff --git a/cli/dts/lib.deno.worker.d.ts b/cli/dts/lib.deno.worker.d.ts
index eb8f6ebf1..7d8f6078b 100644
--- a/cli/dts/lib.deno.worker.d.ts
+++ b/cli/dts/lib.deno.worker.d.ts
@@ -68,7 +68,8 @@ declare class DedicatedWorkerGlobalScope extends WorkerGlobalScope {
| ((this: DedicatedWorkerGlobalScope, ev: MessageEvent) => any)
| null;
close(): void;
- postMessage(message: any): void;
+ postMessage(message: any, transfer: Transferable[]): void;
+ postMessage(message: any, options?: PostMessageOptions): void;
addEventListener<K extends keyof DedicatedWorkerGlobalScopeEventMap>(
type: K,
listener: (
@@ -105,7 +106,8 @@ declare var onmessageerror:
| ((this: DedicatedWorkerGlobalScope, ev: MessageEvent) => any)
| null;
declare function close(): void;
-declare function postMessage(message: any): void;
+declare function postMessage(message: any, transfer: Transferable[]): void;
+declare function postMessage(message: any, options?: PostMessageOptions): void;
declare var navigator: WorkerNavigator;
declare var onerror:
| ((this: DedicatedWorkerGlobalScope, ev: ErrorEvent) => any)
diff --git a/cli/main.rs b/cli/main.rs
index 2586e9b60..29151f14b 100644
--- a/cli/main.rs
+++ b/cli/main.rs
@@ -125,7 +125,7 @@ fn create_web_worker_callback(
broadcast_channel: program_state.broadcast_channel.clone(),
};
- let mut worker = WebWorker::from_options(
+ let (mut worker, external_handle) = WebWorker::from_options(
args.name,
args.permissions,
args.main_module,
@@ -151,7 +151,7 @@ fn create_web_worker_callback(
}
worker.bootstrap(&options);
- worker
+ (worker, external_handle)
})
}
diff --git a/cli/tests/workers/message_port.ts b/cli/tests/workers/message_port.ts
new file mode 100644
index 000000000..d78304a39
--- /dev/null
+++ b/cli/tests/workers/message_port.ts
@@ -0,0 +1,14 @@
+const channel = new MessageChannel();
+
+channel.port2.onmessage = (e) => {
+ channel.port2.postMessage(e.data === "2");
+ channel.port2.close();
+};
+
+self.postMessage("1", [channel.port1]);
+
+self.onmessage = (e) => {
+ const port1 = e.ports[0];
+ port1.postMessage(e.data === "3");
+ port1.close();
+};
diff --git a/cli/tests/workers/nonexistent_worker.out b/cli/tests/workers/nonexistent_worker.out
index e43b81c5f..04b9f801d 100644
--- a/cli/tests/workers/nonexistent_worker.out
+++ b/cli/tests/workers/nonexistent_worker.out
@@ -1,3 +1,3 @@
[WILDCARD]error: Uncaught (in worker "") Cannot resolve module "file:///[WILDCARD]cli/tests/workers/doesnt_exist.js".
error: Uncaught (in promise) Error: Unhandled error event reached main worker.
- at Worker.#poll ([WILDCARD])
+ at Worker.#pollControl ([WILDCARD])
diff --git a/cli/tests/workers/permissions_blob_local.ts.out b/cli/tests/workers/permissions_blob_local.ts.out
index a6a34e3a2..0835777ec 100644
--- a/cli/tests/workers/permissions_blob_local.ts.out
+++ b/cli/tests/workers/permissions_blob_local.ts.out
@@ -1,4 +1,4 @@
error: Uncaught (in worker "") Requires read access to "[WILDCARD]local_file.ts", run again with the --allow-read flag
at blob:null/[WILDCARD]:1:0
error: Uncaught (in promise) Error: Unhandled error event reached main worker.
- at Worker.#poll (deno:runtime/js/11_workers.js:243:23)
+ at Worker.#pollControl ([WILDCARD])
diff --git a/cli/tests/workers/permissions_blob_remote.ts.out b/cli/tests/workers/permissions_blob_remote.ts.out
index 8bd277361..2d01458ca 100644
--- a/cli/tests/workers/permissions_blob_remote.ts.out
+++ b/cli/tests/workers/permissions_blob_remote.ts.out
@@ -1,4 +1,4 @@
error: Uncaught (in worker "") Requires net access to "example.com", run again with the --allow-net flag
at blob:null/[WILDCARD]:1:0
error: Uncaught (in promise) Error: Unhandled error event reached main worker.
- at Worker.#poll (deno:runtime/js/11_workers.js:243:23)
+ at Worker.#pollControl ([WILDCARD])
diff --git a/cli/tests/workers/permissions_data_local.ts.out b/cli/tests/workers/permissions_data_local.ts.out
index 302ab99c8..2a6be2b57 100644
--- a/cli/tests/workers/permissions_data_local.ts.out
+++ b/cli/tests/workers/permissions_data_local.ts.out
@@ -1,4 +1,4 @@
error: Uncaught (in worker "") Requires read access to "[WILDCARD]local_file.ts", run again with the --allow-read flag
at data:application/javascript;base64,[WILDCARD]:1:0
error: Uncaught (in promise) Error: Unhandled error event reached main worker.
- at Worker.#poll (deno:runtime/js/11_workers.js:243:23)
+ at Worker.#pollControl ([WILDCARD])
diff --git a/cli/tests/workers/permissions_data_remote.ts.out b/cli/tests/workers/permissions_data_remote.ts.out
index 9b0ae44cc..90677892a 100644
--- a/cli/tests/workers/permissions_data_remote.ts.out
+++ b/cli/tests/workers/permissions_data_remote.ts.out
@@ -1,4 +1,4 @@
error: Uncaught (in worker "") Requires net access to "example.com", run again with the --allow-net flag
at data:application/javascript;base64,aW1wb3J0ICJodHRwczovL2V4YW1wbGUuY29tL3NvbWUvZmlsZS50cyI7:1:0
error: Uncaught (in promise) Error: Unhandled error event reached main worker.
- at Worker.#poll (deno:runtime/js/11_workers.js:243:23)
+ at Worker.#pollControl ([WILDCARD])
diff --git a/cli/tests/workers/permissions_dynamic_remote.ts.out b/cli/tests/workers/permissions_dynamic_remote.ts.out
index e2c671c34..e68c96df1 100644
--- a/cli/tests/workers/permissions_dynamic_remote.ts.out
+++ b/cli/tests/workers/permissions_dynamic_remote.ts.out
@@ -3,4 +3,4 @@ await import("https://example.com/some/file.ts");
^
at async http://localhost:4545/cli/tests/workers/dynamic_remote.ts:2:1
[WILDCARD]error: Uncaught (in promise) Error: Unhandled error event reached main worker.
- at Worker.#poll (deno:runtime/js/11_workers.js:243:23)
+ at Worker.#pollControl ([WILDCARD])
diff --git a/cli/tests/workers/permissions_remote_remote.ts.out b/cli/tests/workers/permissions_remote_remote.ts.out
index 8b8820c7d..5656b75a1 100644
--- a/cli/tests/workers/permissions_remote_remote.ts.out
+++ b/cli/tests/workers/permissions_remote_remote.ts.out
@@ -1,4 +1,4 @@
error: Uncaught (in worker "") Requires net access to "example.com", run again with the --allow-net flag
at http://localhost:4545/cli/tests/workers/static_remote.ts:2:0
error: Uncaught (in promise) Error: Unhandled error event reached main worker.
- at Worker.#poll (deno:runtime/js/11_workers.js:243:23)
+ at Worker.#pollControl ([WILDCARD])
diff --git a/cli/tests/workers/test.ts b/cli/tests/workers/test.ts
index 6a572b92f..b37b7aeb1 100644
--- a/cli/tests/workers/test.ts
+++ b/cli/tests/workers/test.ts
@@ -769,3 +769,56 @@ Deno.test({
worker.terminate();
},
});
+
+Deno.test({
+ name: "worker with relative specifier",
+ fn: async function (): Promise<void> {
+ assertEquals(location.href, "http://127.0.0.1:4545/cli/tests/");
+ const promise = deferred();
+ const w = new Worker(
+ "./workers/test_worker.ts",
+ { type: "module", name: "tsWorker" },
+ );
+ w.onmessage = (e): void => {
+ assertEquals(e.data, "Hello, world!");
+ promise.resolve();
+ };
+ w.postMessage("Hello, world!");
+ await promise;
+ w.terminate();
+ },
+});
+
+Deno.test({
+ name: "Send MessagePorts from / to workers",
+ fn: async function (): Promise<void> {
+ const result = deferred();
+ const worker = new Worker(
+ new URL("message_port.ts", import.meta.url).href,
+ { type: "module" },
+ );
+
+ const channel = new MessageChannel();
+
+ worker.onmessage = (e) => {
+ assertEquals(e.data, "1");
+ assertEquals(e.ports.length, 1);
+ const port1 = e.ports[0];
+ port1.onmessage = (e) => {
+ assertEquals(e.data, true);
+ port1.close();
+ worker.postMessage("3", [channel.port1]);
+ };
+ port1.postMessage("2");
+ };
+
+ channel.port2.onmessage = (e) => {
+ assertEquals(e.data, true);
+ channel.port2.close();
+ result.resolve();
+ };
+
+ await result;
+ worker.terminate();
+ },
+});
diff --git a/cli/tests/workers/worker_error.ts.out b/cli/tests/workers/worker_error.ts.out
index 244e56417..4a8e92f00 100644
--- a/cli/tests/workers/worker_error.ts.out
+++ b/cli/tests/workers/worker_error.ts.out
@@ -2,4 +2,4 @@
at foo ([WILDCARD])
at [WILDCARD]
error: Uncaught (in promise) Error: Unhandled error event reached main worker.
- at Worker.#poll ([WILDCARD])
+ at Worker.#pollControl ([WILDCARD])
diff --git a/cli/tests/workers/worker_nested_error.ts.out b/cli/tests/workers/worker_nested_error.ts.out
index 244e56417..4a8e92f00 100644
--- a/cli/tests/workers/worker_nested_error.ts.out
+++ b/cli/tests/workers/worker_nested_error.ts.out
@@ -2,4 +2,4 @@
at foo ([WILDCARD])
at [WILDCARD]
error: Uncaught (in promise) Error: Unhandled error event reached main worker.
- at Worker.#poll ([WILDCARD])
+ at Worker.#pollControl ([WILDCARD])
diff --git a/extensions/web/lib.rs b/extensions/web/lib.rs
index d74bb619d..6e3552476 100644
--- a/extensions/web/lib.rs
+++ b/extensions/web/lib.rs
@@ -2,7 +2,9 @@
mod message_port;
+pub use crate::message_port::create_entangled_message_port;
pub use crate::message_port::JsMessageData;
+pub use crate::message_port::MessagePort;
use deno_core::error::bad_resource_id;
use deno_core::error::null_opbuf;
diff --git a/extensions/web/message_port.rs b/extensions/web/message_port.rs
index d10b455d5..f73d0486a 100644
--- a/extensions/web/message_port.rs
+++ b/extensions/web/message_port.rs
@@ -23,7 +23,7 @@ type MessagePortMessage = (Vec<u8>, Vec<Transferable>);
pub struct MessagePort {
rx: RefCell<UnboundedReceiver<MessagePortMessage>>,
- tx: UnboundedSender<MessagePortMessage>,
+ tx: RefCell<Option<UnboundedSender<MessagePortMessage>>>,
}
impl MessagePort {
@@ -37,7 +37,9 @@ impl MessagePort {
// Swallow the failed to send error. It means the channel was disentangled,
// but not cleaned up.
- self.tx.send((data.data.to_vec(), transferables)).ok();
+ if let Some(tx) = &*self.tx.borrow() {
+ tx.send((data.data.to_vec(), transferables)).ok();
+ }
Ok(())
}
@@ -60,6 +62,13 @@ impl MessagePort {
}
Ok(None)
}
+
+ /// This forcefully disconnects the message port from its paired port. This
+ /// will wake up the `.recv` on the paired port, which will return `Ok(None)`.
+ pub fn disentangle(&self) {
+ let mut tx = self.tx.borrow_mut();
+ tx.take();
+ }
}
pub fn create_entangled_message_port() -> (MessagePort, MessagePort) {
@@ -68,12 +77,12 @@ pub fn create_entangled_message_port() -> (MessagePort, MessagePort) {
let port1 = MessagePort {
rx: RefCell::new(port1_rx),
- tx: port1_tx,
+ tx: RefCell::new(Some(port1_tx)),
};
let port2 = MessagePort {
rx: RefCell::new(port2_rx),
- tx: port2_tx,
+ tx: RefCell::new(Some(port2_tx)),
};
(port1, port2)
@@ -204,5 +213,5 @@ pub async fn op_message_port_recv_message(
}
};
let cancel = RcRef::map(resource.clone(), |r| &r.cancel);
- resource.port.recv(state.clone()).or_cancel(cancel).await?
+ resource.port.recv(state).or_cancel(cancel).await?
}
diff --git a/runtime/js/11_workers.js b/runtime/js/11_workers.js
index c917a2880..7267bec38 100644
--- a/runtime/js/11_workers.js
+++ b/runtime/js/11_workers.js
@@ -3,10 +3,13 @@
((window) => {
const core = window.Deno.core;
+ const webidl = window.__bootstrap.webidl;
const { Window } = window.__bootstrap.globalInterfaces;
const { getLocationHref } = window.__bootstrap.location;
const { log, pathFromURL } = window.__bootstrap.util;
const { defineEventHandler } = window.__bootstrap.webUtil;
+ const { deserializeJsMessageData, serializeJsMessageData } =
+ window.__bootstrap.messagePort;
function createWorker(
specifier,
@@ -34,8 +37,12 @@
core.opSync("op_host_post_message", id, data);
}
- function hostGetMessage(id) {
- return core.opAsync("op_host_get_message", id);
+ function hostRecvCtrl(id) {
+ return core.opAsync("op_host_recv_ctrl", id);
+ }
+
+ function hostRecvMessage(id) {
+ return core.opAsync("op_host_recv_message", id);
}
/**
@@ -187,18 +194,9 @@
options?.name,
);
this.#id = id;
- this.#poll();
- }
-
- #handleMessage(data) {
- const msgEvent = new MessageEvent("message", {
- cancelable: false,
- data,
- });
-
- this.dispatchEvent(msgEvent);
+ this.#pollControl();
+ this.#pollMessages();
}
-
#handleError(e) {
const event = new ErrorEvent("error", {
cancelable: true,
@@ -219,9 +217,9 @@
return handled;
}
- #poll = async () => {
+ #pollControl = async () => {
while (!this.#terminated) {
- const [type, data] = await hostGetMessage(this.#id);
+ const [type, data] = await hostRecvCtrl(this.#id);
// If terminate was called then we ignore all messages
if (this.#terminated) {
@@ -229,11 +227,6 @@
}
switch (type) {
- case 0: { // Message
- const msg = core.deserialize(data);
- this.#handleMessage(msg);
- break;
- }
case 1: { // TerminalError
this.#terminated = true;
} /* falls through */
@@ -262,19 +255,57 @@
}
};
- postMessage(message, transferOrOptions) {
- if (transferOrOptions) {
- throw new Error(
- "Not yet implemented: `transfer` and `options` are not supported.",
- );
+ #pollMessages = async () => {
+ while (!this.terminated) {
+ const data = await hostRecvMessage(this.#id);
+ if (data === null) break;
+ let message, transfer;
+ try {
+ const v = deserializeJsMessageData(data);
+ message = v[0];
+ transfer = v[1];
+ } catch (err) {
+ const event = new MessageEvent("messageerror", {
+ cancelable: false,
+ data: err,
+ });
+ this.dispatchEvent(event);
+ return;
+ }
+ const event = new MessageEvent("message", {
+ cancelable: false,
+ data: message,
+ ports: transfer,
+ });
+ this.dispatchEvent(event);
}
+ };
- if (this.#terminated) {
- return;
+ postMessage(message, transferOrOptions = {}) {
+ const prefix = "Failed to execute 'postMessage' on 'MessagePort'";
+ webidl.requiredArguments(arguments.length, 1, { prefix });
+ message = webidl.converters.any(message);
+ let options;
+ if (
+ webidl.type(transferOrOptions) === "Object" &&
+ transferOrOptions !== undefined &&
+ transferOrOptions[Symbol.iterator] !== undefined
+ ) {
+ const transfer = webidl.converters["sequence<object>"](
+ transferOrOptions,
+ { prefix, context: "Argument 2" },
+ );
+ options = { transfer };
+ } else {
+ options = webidl.converters.PostMessageOptions(transferOrOptions, {
+ prefix,
+ context: "Argument 2",
+ });
}
-
- const bufferMsg = core.serialize(message);
- hostPostMessage(this.#id, bufferMsg);
+ const { transfer } = options;
+ const data = serializeJsMessageData(message, transfer);
+ if (this.#terminated) return;
+ hostPostMessage(this.#id, data);
}
terminate() {
diff --git a/runtime/js/99_main.js b/runtime/js/99_main.js
index 91a4dcefd..d0e86bce7 100644
--- a/runtime/js/99_main.js
+++ b/runtime/js/99_main.js
@@ -42,6 +42,8 @@ delete Object.prototype.__proto__;
const errors = window.__bootstrap.errors.errors;
const webidl = window.__bootstrap.webidl;
const { defineEventHandler } = window.__bootstrap.webUtil;
+ const { deserializeJsMessageData, serializeJsMessageData } =
+ window.__bootstrap.messagePort;
let windowIsClosing = false;
@@ -77,9 +79,31 @@ delete Object.prototype.__proto__;
const onmessage = () => {};
const onerror = () => {};
- function postMessage(data) {
- const dataIntArray = core.serialize(data);
- core.opSync("op_worker_post_message", null, dataIntArray);
+ function postMessage(message, transferOrOptions = {}) {
+ const prefix =
+ "Failed to execute 'postMessage' on 'DedicatedWorkerGlobalScope'";
+ webidl.requiredArguments(arguments.length, 1, { prefix });
+ message = webidl.converters.any(message);
+ let options;
+ if (
+ webidl.type(transferOrOptions) === "Object" &&
+ transferOrOptions !== undefined &&
+ transferOrOptions[Symbol.iterator] !== undefined
+ ) {
+ const transfer = webidl.converters["sequence<object>"](
+ transferOrOptions,
+ { prefix, context: "Argument 2" },
+ );
+ options = { transfer };
+ } else {
+ options = webidl.converters.PostMessageOptions(transferOrOptions, {
+ prefix,
+ context: "Argument 2",
+ });
+ }
+ const { transfer } = options;
+ const data = serializeJsMessageData(message, transfer);
+ core.opSync("op_worker_post_message", data);
}
let isClosing = false;
@@ -90,12 +114,16 @@ delete Object.prototype.__proto__;
globalDispatchEvent = globalThis.dispatchEvent.bind(globalThis);
}
while (!isClosing) {
- const bufferMsg = await core.opAsync("op_worker_get_message");
- const data = core.deserialize(bufferMsg);
+ const data = await core.opAsync("op_worker_recv_message");
+ if (data === null) break;
+ const v = deserializeJsMessageData(data);
+ const message = v[0];
+ const transfer = v[1];
const msgEvent = new MessageEvent("message", {
cancelable: false,
- data,
+ data: message,
+ ports: transfer,
});
try {
diff --git a/runtime/ops/web_worker.rs b/runtime/ops/web_worker.rs
index 39aa2c0a9..026e38157 100644
--- a/runtime/ops/web_worker.rs
+++ b/runtime/ops/web_worker.rs
@@ -1,15 +1,15 @@
// Copyright 2018-2021 the Deno authors. All rights reserved. MIT license.
use crate::web_worker::WebWorkerInternalHandle;
-use crate::web_worker::WorkerEvent;
+use crate::web_worker::WorkerControlEvent;
use deno_core::error::generic_error;
-use deno_core::error::null_opbuf;
use deno_core::error::AnyError;
use deno_core::op_async;
use deno_core::op_sync;
+use deno_core::CancelFuture;
use deno_core::Extension;
use deno_core::OpState;
-use deno_core::ZeroCopyBuf;
+use deno_web::JsMessageData;
use std::cell::RefCell;
use std::rc::Rc;
@@ -17,7 +17,7 @@ pub fn init() -> Extension {
Extension::builder()
.ops(vec![
("op_worker_post_message", op_sync(op_worker_post_message)),
- ("op_worker_get_message", op_async(op_worker_get_message)),
+ ("op_worker_recv_message", op_async(op_worker_recv_message)),
// Notify host that guest worker closes.
("op_worker_close", op_sync(op_worker_close)),
// Notify host that guest worker has unhandled error.
@@ -31,30 +31,28 @@ pub fn init() -> Extension {
fn op_worker_post_message(
state: &mut OpState,
+ data: JsMessageData,
_: (),
- buf: Option<ZeroCopyBuf>,
) -> Result<(), AnyError> {
- let buf = buf.ok_or_else(null_opbuf)?;
let handle = state.borrow::<WebWorkerInternalHandle>().clone();
- handle
- .post_event(WorkerEvent::Message(buf))
- .expect("Failed to post message to host");
+ handle.port.send(state, data)?;
Ok(())
}
-async fn op_worker_get_message(
+async fn op_worker_recv_message(
state: Rc<RefCell<OpState>>,
_: (),
_: (),
-) -> Result<ZeroCopyBuf, AnyError> {
- let temp = {
- let a = state.borrow();
- a.borrow::<WebWorkerInternalHandle>().clone()
+) -> Result<Option<JsMessageData>, AnyError> {
+ let handle = {
+ let state = state.borrow();
+ state.borrow::<WebWorkerInternalHandle>().clone()
};
-
- let maybe_data = temp.get_message().await;
-
- Ok(maybe_data.unwrap_or_else(ZeroCopyBuf::empty))
+ handle
+ .port
+ .recv(state.clone())
+ .or_cancel(handle.cancel)
+ .await?
}
fn op_worker_close(state: &mut OpState, _: (), _: ()) -> Result<(), AnyError> {
@@ -77,7 +75,7 @@ fn op_worker_unhandled_error(
) -> Result<(), AnyError> {
let sender = state.borrow::<WebWorkerInternalHandle>().clone();
sender
- .post_event(WorkerEvent::Error(generic_error(message)))
+ .post_event(WorkerControlEvent::Error(generic_error(message)))
.expect("Failed to propagate error event to parent worker");
Ok(())
}
diff --git a/runtime/ops/worker_host.rs b/runtime/ops/worker_host.rs
index 57d3ac2b8..162f9f4f7 100644
--- a/runtime/ops/worker_host.rs
+++ b/runtime/ops/worker_host.rs
@@ -12,25 +12,23 @@ use crate::permissions::UnaryPermission;
use crate::permissions::UnitPermission;
use crate::permissions::WriteDescriptor;
use crate::web_worker::run_web_worker;
+use crate::web_worker::SendableWebWorkerHandle;
use crate::web_worker::WebWorker;
use crate::web_worker::WebWorkerHandle;
-use crate::web_worker::WorkerEvent;
+use crate::web_worker::WorkerControlEvent;
use crate::web_worker::WorkerId;
use deno_core::error::custom_error;
-use deno_core::error::null_opbuf;
use deno_core::error::AnyError;
-use deno_core::error::JsError;
use deno_core::op_async;
use deno_core::op_sync;
use deno_core::serde::de;
use deno_core::serde::de::SeqAccess;
use deno_core::serde::Deserialize;
use deno_core::serde::Deserializer;
-use deno_core::serde_json::json;
use deno_core::Extension;
use deno_core::ModuleSpecifier;
use deno_core::OpState;
-use deno_core::ZeroCopyBuf;
+use deno_web::JsMessageData;
use log::debug;
use std::cell::RefCell;
use std::collections::HashMap;
@@ -51,8 +49,9 @@ pub struct CreateWebWorkerArgs {
pub use_deno_namespace: bool,
}
-pub type CreateWebWorkerCb =
- dyn Fn(CreateWebWorkerArgs) -> WebWorker + Sync + Send;
+pub type CreateWebWorkerCb = dyn Fn(CreateWebWorkerArgs) -> (WebWorker, SendableWebWorkerHandle)
+ + Sync
+ + Send;
/// A holder for callback that is used to create a new
/// WebWorker. It's a struct instead of a type alias
@@ -87,7 +86,8 @@ pub fn init(create_web_worker_cb: Arc<CreateWebWorkerCb>) -> Extension {
op_sync(op_host_terminate_worker),
),
("op_host_post_message", op_sync(op_host_post_message)),
- ("op_host_get_message", op_async(op_host_get_message)),
+ ("op_host_recv_ctrl", op_async(op_host_recv_ctrl)),
+ ("op_host_recv_message", op_async(op_host_recv_message)),
])
.build()
}
@@ -458,8 +458,9 @@ fn op_create_worker(
let module_specifier = deno_core::resolve_url(&specifier)?;
let worker_name = args_name.unwrap_or_else(|| "".to_string());
- let (handle_sender, handle_receiver) =
- std::sync::mpsc::sync_channel::<Result<WebWorkerHandle, AnyError>>(1);
+ let (handle_sender, handle_receiver) = std::sync::mpsc::sync_channel::<
+ Result<SendableWebWorkerHandle, AnyError>,
+ >(1);
// Setup new thread
let thread_builder =
@@ -472,17 +473,18 @@ fn op_create_worker(
// all action done upon it should be noops
// - newly spawned thread exits
- let worker = (create_module_loader.0)(CreateWebWorkerArgs {
- name: worker_name,
- worker_id,
- parent_permissions,
- permissions: worker_permissions,
- main_module: module_specifier.clone(),
- use_deno_namespace,
- });
+ let (worker, external_handle) =
+ (create_module_loader.0)(CreateWebWorkerArgs {
+ name: worker_name,
+ worker_id,
+ parent_permissions,
+ permissions: worker_permissions,
+ main_module: module_specifier.clone(),
+ use_deno_namespace,
+ });
// Send thread safe handle from newly created worker to host thread
- handle_sender.send(Ok(worker.thread_safe_handle())).unwrap();
+ handle_sender.send(Ok(external_handle)).unwrap();
drop(handle_sender);
// At this point the only method of communication with host
@@ -497,7 +499,7 @@ fn op_create_worker(
let worker_thread = WorkerThread {
join_handle,
- worker_handle,
+ worker_handle: worker_handle.into(),
};
// At this point all interactions with worker happen using thread
@@ -514,7 +516,7 @@ fn op_host_terminate_worker(
id: WorkerId,
_: (),
) -> Result<(), AnyError> {
- let mut worker_thread = state
+ let worker_thread = state
.borrow_mut::<WorkersTable>()
.remove(&id)
.expect("No worker handle found");
@@ -527,52 +529,13 @@ fn op_host_terminate_worker(
Ok(())
}
-use deno_core::serde::Serialize;
-use deno_core::serde::Serializer;
-
-impl Serialize for WorkerEvent {
- fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
- where
- S: Serializer,
- {
- let type_id = match &self {
- WorkerEvent::Message(_) => 0_i32,
- WorkerEvent::TerminalError(_) => 1_i32,
- WorkerEvent::Error(_) => 2_i32,
- WorkerEvent::Close => 3_i32,
- };
-
- match self {
- WorkerEvent::Message(buf) => {
- Serialize::serialize(&(type_id, buf), serializer)
- }
- WorkerEvent::TerminalError(error) | WorkerEvent::Error(error) => {
- let value = match error.downcast_ref::<JsError>() {
- Some(js_error) => json!({
- "message": js_error.message,
- "fileName": js_error.script_resource_name,
- "lineNumber": js_error.line_number,
- "columnNumber": js_error.start_column,
- }),
- None => json!({
- "message": error.to_string(),
- }),
- };
-
- Serialize::serialize(&(type_id, value), serializer)
- }
- _ => Serialize::serialize(&(type_id, ()), serializer),
- }
- }
-}
-
/// Try to remove worker from workers table - NOTE: `Worker.terminate()`
/// might have been called already meaning that we won't find worker in
/// table - in that case ignore.
fn try_remove_and_close(state: Rc<RefCell<OpState>>, id: WorkerId) {
let mut s = state.borrow_mut();
let workers = s.borrow_mut::<WorkersTable>();
- if let Some(mut worker_thread) = workers.remove(&id) {
+ if let Some(worker_thread) = workers.remove(&id) {
worker_thread.worker_handle.terminate();
worker_thread
.join_handle
@@ -582,28 +545,28 @@ fn try_remove_and_close(state: Rc<RefCell<OpState>>, id: WorkerId) {
}
}
-/// Get message from guest worker as host
-async fn op_host_get_message(
+/// Get control event from guest worker as host
+async fn op_host_recv_ctrl(
state: Rc<RefCell<OpState>>,
id: WorkerId,
_: (),
-) -> Result<WorkerEvent, AnyError> {
+) -> Result<WorkerControlEvent, AnyError> {
let worker_handle = {
- let s = state.borrow();
- let workers_table = s.borrow::<WorkersTable>();
+ let state = state.borrow();
+ let workers_table = state.borrow::<WorkersTable>();
let maybe_handle = workers_table.get(&id);
if let Some(handle) = maybe_handle {
handle.worker_handle.clone()
} else {
// If handle was not found it means worker has already shutdown
- return Ok(WorkerEvent::Close);
+ return Ok(WorkerControlEvent::Close);
}
};
- let maybe_event = worker_handle.get_event().await?;
+ let maybe_event = worker_handle.get_control_event().await?;
if let Some(event) = maybe_event {
// Terminal error means that worker should be removed from worker table.
- if let WorkerEvent::TerminalError(_) = &event {
+ if let WorkerControlEvent::TerminalError(_) = &event {
try_remove_and_close(state, id);
}
return Ok(event);
@@ -611,22 +574,42 @@ async fn op_host_get_message(
// If there was no event from worker it means it has already been closed.
try_remove_and_close(state, id);
- Ok(WorkerEvent::Close)
+ Ok(WorkerControlEvent::Close)
+}
+
+async fn op_host_recv_message(
+ state: Rc<RefCell<OpState>>,
+ id: WorkerId,
+ _: (),
+) -> Result<Option<JsMessageData>, AnyError> {
+ let worker_handle = {
+ let s = state.borrow();
+ let workers_table = s.borrow::<WorkersTable>();
+ let maybe_handle = workers_table.get(&id);
+ if let Some(handle) = maybe_handle {
+ handle.worker_handle.clone()
+ } else {
+ // If handle was not found it means worker has already shutdown
+ return Ok(None);
+ }
+ };
+ worker_handle.port.recv(state).await
}
/// Post message to guest worker as host
fn op_host_post_message(
state: &mut OpState,
id: WorkerId,
- data: Option<ZeroCopyBuf>,
+ data: JsMessageData,
) -> Result<(), AnyError> {
- let msg = data.ok_or_else(null_opbuf)?;
-
debug!("post message to worker {}", id);
- let worker_thread = state
- .borrow::<WorkersTable>()
- .get(&id)
- .expect("No worker handle found");
- worker_thread.worker_handle.post_message(msg)?;
+ let worker_handle = {
+ let worker_thread = state
+ .borrow::<WorkersTable>()
+ .get(&id)
+ .expect("No worker handle found");
+ worker_thread.worker_handle.clone()
+ };
+ worker_handle.port.send(state, data)?;
Ok(())
}
diff --git a/runtime/web_worker.rs b/runtime/web_worker.rs
index 753238052..a3a062221 100644
--- a/runtime/web_worker.rs
+++ b/runtime/web_worker.rs
@@ -8,6 +8,7 @@ use crate::permissions::Permissions;
use crate::tokio_util::create_basic_runtime;
use deno_broadcast_channel::InMemoryBroadcastChannel;
use deno_core::error::AnyError;
+use deno_core::error::JsError;
use deno_core::futures::channel::mpsc;
use deno_core::futures::future::poll_fn;
use deno_core::futures::future::FutureExt;
@@ -18,6 +19,7 @@ use deno_core::serde::Serialize;
use deno_core::serde_json;
use deno_core::serde_json::json;
use deno_core::v8;
+use deno_core::CancelHandle;
use deno_core::Extension;
use deno_core::GetErrorClassFn;
use deno_core::JsErrorCreateFn;
@@ -26,8 +28,9 @@ use deno_core::ModuleId;
use deno_core::ModuleLoader;
use deno_core::ModuleSpecifier;
use deno_core::RuntimeOptions;
-use deno_core::ZeroCopyBuf;
+use deno_web::create_entangled_message_port;
use deno_web::BlobUrlStore;
+use deno_web::MessagePort;
use log::debug;
use std::cell::RefCell;
use std::env;
@@ -38,7 +41,6 @@ use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::task::Context;
use std::task::Poll;
-use tokio::sync::Mutex as AsyncMutex;
#[derive(
Debug, Default, Copy, Clone, PartialEq, Eq, Hash, Serialize, Deserialize,
@@ -55,29 +57,62 @@ impl WorkerId {
}
}
-type WorkerMessage = ZeroCopyBuf;
-
/// Events that are sent to host from child
/// worker.
-pub enum WorkerEvent {
- Message(WorkerMessage),
+pub enum WorkerControlEvent {
Error(AnyError),
TerminalError(AnyError),
Close,
}
+use deno_core::serde::Serializer;
+
+impl Serialize for WorkerControlEvent {
+ fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
+ where
+ S: Serializer,
+ {
+ let type_id = match &self {
+ WorkerControlEvent::TerminalError(_) => 1_i32,
+ WorkerControlEvent::Error(_) => 2_i32,
+ WorkerControlEvent::Close => 3_i32,
+ };
+
+ match self {
+ WorkerControlEvent::TerminalError(error)
+ | WorkerControlEvent::Error(error) => {
+ let value = match error.downcast_ref::<JsError>() {
+ Some(js_error) => json!({
+ "message": js_error.message,
+ "fileName": js_error.script_resource_name,
+ "lineNumber": js_error.line_number,
+ "columnNumber": js_error.start_column,
+ }),
+ None => json!({
+ "message": error.to_string(),
+ }),
+ };
+
+ Serialize::serialize(&(type_id, value), serializer)
+ }
+ _ => Serialize::serialize(&(type_id, ()), serializer),
+ }
+ }
+}
+
// Channels used for communication with worker's parent
#[derive(Clone)]
pub struct WebWorkerInternalHandle {
- sender: mpsc::Sender<WorkerEvent>,
- receiver: Rc<RefCell<mpsc::Receiver<WorkerMessage>>>,
+ sender: mpsc::Sender<WorkerControlEvent>,
+ pub port: Rc<MessagePort>,
+ pub cancel: Rc<CancelHandle>,
terminated: Arc<AtomicBool>,
isolate_handle: v8::IsolateHandle,
}
impl WebWorkerInternalHandle {
/// Post WorkerEvent to parent as a worker
- pub fn post_event(&self, event: WorkerEvent) -> Result<(), AnyError> {
+ pub fn post_event(&self, event: WorkerControlEvent) -> Result<(), AnyError> {
let mut sender = self.sender.clone();
// If the channel is closed,
// the worker must have terminated but the termination message has not yet been received.
@@ -91,13 +126,6 @@ impl WebWorkerInternalHandle {
Ok(())
}
- /// Get the WorkerEvent with lock
- /// Panic if more than one listener tries to get event
- pub async fn get_message(&self) -> Option<WorkerMessage> {
- let mut receiver = self.receiver.borrow_mut();
- receiver.next().await
- }
-
/// Check if this worker is terminated or being terminated
pub fn is_terminated(&self) -> bool {
self.terminated.load(Ordering::SeqCst)
@@ -106,6 +134,8 @@ impl WebWorkerInternalHandle {
/// Terminate the worker
/// This function will set terminated to true, terminate the isolate and close the message channel
pub fn terminate(&mut self) {
+ self.cancel.cancel();
+
// This function can be called multiple times by whomever holds
// the handle. However only a single "termination" should occur so
// we need a guard here.
@@ -121,40 +151,52 @@ impl WebWorkerInternalHandle {
}
}
-#[derive(Clone)]
-pub struct WebWorkerHandle {
- sender: mpsc::Sender<WorkerMessage>,
- receiver: Arc<AsyncMutex<mpsc::Receiver<WorkerEvent>>>,
+pub struct SendableWebWorkerHandle {
+ port: MessagePort,
+ receiver: mpsc::Receiver<WorkerControlEvent>,
terminated: Arc<AtomicBool>,
isolate_handle: v8::IsolateHandle,
}
-impl WebWorkerHandle {
- /// Post WorkerMessage to worker as a host
- pub fn post_message(&self, buf: WorkerMessage) -> Result<(), AnyError> {
- let mut sender = self.sender.clone();
- // If the channel is closed,
- // the worker must have terminated but the termination message has not yet been recieved.
- //
- // Therefore just treat it as if the worker has terminated and return.
- if sender.is_closed() {
- self.terminated.store(true, Ordering::SeqCst);
- return Ok(());
+impl From<SendableWebWorkerHandle> for WebWorkerHandle {
+ fn from(handle: SendableWebWorkerHandle) -> Self {
+ WebWorkerHandle {
+ receiver: Rc::new(RefCell::new(handle.receiver)),
+ port: Rc::new(handle.port),
+ terminated: handle.terminated,
+ isolate_handle: handle.isolate_handle,
}
- sender.try_send(buf)?;
- Ok(())
}
+}
+
+/// This is the handle to the web worker that the parent thread uses to
+/// communicate with the worker. It is created from a `SendableWebWorkerHandle`
+/// which is sent to the parent thread from the worker thread where it is
+/// created. The reason for this seperation is that the handle first needs to be
+/// `Send` when transferring between threads, and then must be `Clone` when it
+/// has arrived on the parent thread. It can not be both at once without large
+/// amounts of Arc<Mutex> and other fun stuff.
+#[derive(Clone)]
+pub struct WebWorkerHandle {
+ pub port: Rc<MessagePort>,
+ receiver: Rc<RefCell<mpsc::Receiver<WorkerControlEvent>>>,
+ terminated: Arc<AtomicBool>,
+ isolate_handle: v8::IsolateHandle,
+}
+impl WebWorkerHandle {
/// Get the WorkerEvent with lock
/// Return error if more than one listener tries to get event
- pub async fn get_event(&self) -> Result<Option<WorkerEvent>, AnyError> {
- let mut receiver = self.receiver.try_lock()?;
+ pub async fn get_control_event(
+ &self,
+ ) -> Result<Option<WorkerControlEvent>, AnyError> {
+ let mut receiver = self.receiver.borrow_mut();
Ok(receiver.next().await)
}
/// Terminate the worker
/// This function will set terminated to true, terminate the isolate and close the message channel
- pub fn terminate(&mut self) {
+ pub fn terminate(self) {
// This function can be called multiple times by whomever holds
// the handle. However only a single "termination" should occur so
// we need a guard here.
@@ -165,26 +207,26 @@ impl WebWorkerHandle {
self.isolate_handle.terminate_execution();
}
- // Wake web worker by closing the channel
- self.sender.close_channel();
+ self.port.disentangle();
}
}
fn create_handles(
isolate_handle: v8::IsolateHandle,
-) -> (WebWorkerInternalHandle, WebWorkerHandle) {
- let (in_tx, in_rx) = mpsc::channel::<WorkerMessage>(1);
- let (out_tx, out_rx) = mpsc::channel::<WorkerEvent>(1);
+) -> (WebWorkerInternalHandle, SendableWebWorkerHandle) {
+ let (parent_port, worker_port) = create_entangled_message_port();
+ let (ctrl_tx, ctrl_rx) = mpsc::channel::<WorkerControlEvent>(1);
let terminated = Arc::new(AtomicBool::new(false));
let internal_handle = WebWorkerInternalHandle {
- sender: out_tx,
- receiver: Rc::new(RefCell::new(in_rx)),
+ sender: ctrl_tx,
+ port: Rc::new(parent_port),
terminated: terminated.clone(),
isolate_handle: isolate_handle.clone(),
+ cancel: CancelHandle::new_rc(),
};
- let external_handle = WebWorkerHandle {
- sender: in_tx,
- receiver: Arc::new(AsyncMutex::new(out_rx)),
+ let external_handle = SendableWebWorkerHandle {
+ receiver: ctrl_rx,
+ port: worker_port,
terminated,
isolate_handle,
};
@@ -200,7 +242,6 @@ pub struct WebWorker {
pub js_runtime: JsRuntime,
pub name: String,
internal_handle: WebWorkerInternalHandle,
- external_handle: WebWorkerHandle,
pub use_deno_namespace: bool,
pub main_module: ModuleSpecifier,
}
@@ -237,7 +278,7 @@ impl WebWorker {
main_module: ModuleSpecifier,
worker_id: WorkerId,
options: &WebWorkerOptions,
- ) -> Self {
+ ) -> (Self, SendableWebWorkerHandle) {
// Permissions: many ops depend on this
let unstable = options.unstable;
let perm_ext = Extension::builder()
@@ -333,15 +374,17 @@ impl WebWorker {
(internal_handle, external_handle)
};
- Self {
- id: worker_id,
- js_runtime,
- name,
- internal_handle,
+ (
+ Self {
+ id: worker_id,
+ js_runtime,
+ name,
+ internal_handle,
+ use_deno_namespace: options.use_deno_namespace,
+ main_module,
+ },
external_handle,
- use_deno_namespace: options.use_deno_namespace,
- main_module,
- }
+ )
}
pub fn bootstrap(&mut self, options: &WebWorkerOptions) {
@@ -419,11 +462,6 @@ impl WebWorker {
}
}
- /// Returns a way to communicate with the Worker from other threads.
- pub fn thread_safe_handle(&self) -> WebWorkerHandle {
- self.external_handle.clone()
- }
-
pub fn poll_event_loop(
&mut self,
cx: &mut Context,
@@ -446,7 +484,7 @@ impl WebWorker {
print_worker_error(e.to_string(), &self.name);
let handle = self.internal_handle.clone();
handle
- .post_event(WorkerEvent::Error(e))
+ .post_event(WorkerControlEvent::Error(e))
.expect("Failed to post message to host");
return Poll::Pending;
@@ -513,7 +551,7 @@ pub fn run_web_worker(
if let Err(e) = result {
print_worker_error(e.to_string(), &name);
internal_handle
- .post_event(WorkerEvent::TerminalError(e))
+ .post_event(WorkerControlEvent::TerminalError(e))
.expect("Failed to post message to host");
// Failure to execute script is a terminal error, bye, bye.
@@ -524,134 +562,3 @@ pub fn run_web_worker(
debug!("Worker thread shuts down {}", &name);
result
}
-
-#[cfg(test)]
-mod tests {
- use super::*;
- use crate::tokio_util;
-
- fn create_test_web_worker() -> WebWorker {
- let main_module = deno_core::resolve_url_or_path("./hello.js").unwrap();
- let module_loader = Rc::new(deno_core::NoopModuleLoader);
- let create_web_worker_cb = Arc::new(|_| unreachable!());
-
- let options = WebWorkerOptions {
- args: vec![],
- apply_source_maps: false,
- debug_flag: false,
- unstable: false,
- ca_data: None,
- user_agent: "x".to_string(),
- seed: None,
- module_loader,
- create_web_worker_cb,
- js_error_create_fn: None,
- use_deno_namespace: false,
- maybe_inspector_server: None,
- runtime_version: "x".to_string(),
- ts_version: "x".to_string(),
- no_color: true,
- get_error_class_fn: None,
- blob_url_store: BlobUrlStore::default(),
- broadcast_channel: InMemoryBroadcastChannel::default(),
- };
-
- let mut worker = WebWorker::from_options(
- "TEST".to_string(),
- Permissions::allow_all(),
- main_module,
- WorkerId(1),
- &options,
- );
- worker.bootstrap(&options);
- worker
- }
-
- #[tokio::test]
- async fn test_worker_messages() {
- let (handle_sender, handle_receiver) =
- std::sync::mpsc::sync_channel::<WebWorkerHandle>(1);
-
- let join_handle = std::thread::spawn(move || {
- let mut worker = create_test_web_worker();
- let source = r#"
- onmessage = function(e) {
- console.log("msg from main script", e.data);
- if (e.data == "exit") {
- return close();
- } else {
- console.assert(e.data === "hi");
- }
- postMessage([1, 2, 3]);
- console.log("after postMessage");
- }
- "#;
- worker.execute_script("a", source).unwrap();
- let handle = worker.thread_safe_handle();
- handle_sender.send(handle).unwrap();
- let r = tokio_util::run_basic(worker.run_event_loop(false));
- assert!(r.is_ok())
- });
-
- let mut handle = handle_receiver.recv().unwrap();
-
- // TODO(Inteon): use Deno.core.serialize() instead of hardcoded encoded value
- let msg = vec![34, 2, 104, 105].into_boxed_slice(); // "hi" encoded
- let r = handle.post_message(msg.clone().into());
- assert!(r.is_ok());
-
- let maybe_msg = handle.get_event().await.unwrap();
- assert!(maybe_msg.is_some());
-
- let r = handle.post_message(msg.clone().into());
- assert!(r.is_ok());
-
- let maybe_msg = handle.get_event().await.unwrap();
- assert!(maybe_msg.is_some());
- match maybe_msg {
- Some(WorkerEvent::Message(buf)) => {
- // TODO(Inteon): use Deno.core.serialize() instead of hardcoded encoded value
- assert_eq!(*buf, [65, 3, 73, 2, 73, 4, 73, 6, 36, 0, 3]);
- }
- _ => unreachable!(),
- }
-
- // TODO(Inteon): use Deno.core.serialize() instead of hardcoded encoded value
- let msg = vec![34, 4, 101, 120, 105, 116].into_boxed_slice(); // "exit" encoded
- let r = handle.post_message(msg.into());
- assert!(r.is_ok());
- let event = handle.get_event().await.unwrap();
- assert!(event.is_none());
- handle.sender.close_channel();
- join_handle.join().expect("Failed to join worker thread");
- }
-
- #[tokio::test]
- async fn removed_from_resource_table_on_close() {
- let (handle_sender, handle_receiver) =
- std::sync::mpsc::sync_channel::<WebWorkerHandle>(1);
-
- let join_handle = std::thread::spawn(move || {
- let mut worker = create_test_web_worker();
- worker
- .execute_script("a", "onmessage = () => { close(); }")
- .unwrap();
- let handle = worker.thread_safe_handle();
- handle_sender.send(handle).unwrap();
- let r = tokio_util::run_basic(worker.run_event_loop(false));
- assert!(r.is_ok())
- });
-
- let mut handle = handle_receiver.recv().unwrap();
-
- // TODO(Inteon): use Deno.core.serialize() instead of hardcoded encoded value
- let msg = vec![34, 2, 104, 105].into_boxed_slice(); // "hi" encoded
- let r = handle.post_message(msg.clone().into());
- assert!(r.is_ok());
- let event = handle.get_event().await.unwrap();
- assert!(event.is_none());
- handle.sender.close_channel();
-
- join_handle.join().expect("Failed to join worker thread");
- }
-}