diff options
author | Bartek IwaĆczuk <biwanczuk@gmail.com> | 2024-04-16 19:41:03 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2024-04-16 18:41:03 +0000 |
commit | 760d64bc6b200ae58e8ee948903bf1e42b6799b5 (patch) | |
tree | b92428b98e639228c847a01fcff6e2d3f036e413 | |
parent | 534dd34f8662691c1e29cc1a6b84f18d311fe0c9 (diff) |
fix(ext/node): worker_threads.receiveMessageOnPort doesn't panic (#23406)
Follow up to https://github.com/denoland/deno/pull/23386.
Instead of using async `recv()` method, it was replaced
with a poll based function that doesn't hold onto
RefCell borrow across await point.
Fixes https://github.com/denoland/deno/issues/23362
-rw-r--r-- | ext/web/message_port.rs | 16 | ||||
-rw-r--r-- | tests/unit_node/worker_threads_test.ts | 24 |
2 files changed, 34 insertions, 6 deletions
diff --git a/ext/web/message_port.rs b/ext/web/message_port.rs index ac33145b1..c069037f8 100644 --- a/ext/web/message_port.rs +++ b/ext/web/message_port.rs @@ -15,6 +15,7 @@ use deno_core::OpState; use deno_core::RcRef; use deno_core::Resource; use deno_core::ResourceId; +use futures::future::poll_fn; use serde::Deserialize; use serde::Serialize; use tokio::sync::mpsc::error::TryRecvError; @@ -52,16 +53,19 @@ impl MessagePort { Ok(()) } - #[allow(clippy::await_holding_refcell_ref)] // TODO(ry) remove! pub async fn recv( &self, state: Rc<RefCell<OpState>>, ) -> Result<Option<JsMessageData>, AnyError> { - let mut rx = self - .rx - .try_borrow_mut() - .map_err(|_| type_error("Port receiver is already borrowed"))?; - if let Some((data, transferables)) = rx.recv().await { + let rx = &self.rx; + + let maybe_data = poll_fn(|cx| { + let mut rx = rx.borrow_mut(); + rx.poll_recv(cx) + }) + .await; + + if let Some((data, transferables)) = maybe_data { let js_transferables = serialize_transferables(&mut state.borrow_mut(), transferables); return Ok(Some(JsMessageData { diff --git a/tests/unit_node/worker_threads_test.ts b/tests/unit_node/worker_threads_test.ts index bd600469b..21bbca194 100644 --- a/tests/unit_node/worker_threads_test.ts +++ b/tests/unit_node/worker_threads_test.ts @@ -418,21 +418,45 @@ Deno.test({ // Regression test for https://github.com/denoland/deno/issues/23362 Deno.test("[node/worker_threads] receiveMessageOnPort works if there's pending read", function () { const { port1, port2 } = new workerThreads.MessageChannel(); + const { port1: port3, port2: port4 } = new workerThreads.MessageChannel(); + const { port1: port5, port2: port6 } = new workerThreads.MessageChannel(); const message1 = { hello: "world" }; const message2 = { foo: "bar" }; assertEquals(workerThreads.receiveMessageOnPort(port2), undefined); port2.start(); + port4.start(); + port6.start(); port1.postMessage(message1); port1.postMessage(message2); + port3.postMessage(message1); + port3.postMessage(message2); + port5.postMessage(message1); + port5.postMessage(message2); assertEquals(workerThreads.receiveMessageOnPort(port2), { message: message1, }); assertEquals(workerThreads.receiveMessageOnPort(port2), { message: message2, }); + assertEquals(workerThreads.receiveMessageOnPort(port4), { + message: message1, + }); + assertEquals(workerThreads.receiveMessageOnPort(port4), { + message: message2, + }); + assertEquals(workerThreads.receiveMessageOnPort(port6), { + message: message1, + }); + assertEquals(workerThreads.receiveMessageOnPort(port6), { + message: message2, + }); port1.close(); port2.close(); + port3.close(); + port4.close(); + port5.close(); + port6.close(); }); |