From 27df42f659ae7b77968d31363ade89addb516ea1 Mon Sep 17 00:00:00 2001 From: Nathan Whitaker <17734409+nathanwhit@users.noreply.github.com> Date: Wed, 23 Oct 2024 21:50:35 -0700 Subject: fix(ext/node): cancel pending ipc writes on channel close (#26504) Fixes the issue described in https://github.com/denoland/deno/issues/23882#issuecomment-2423316362. The parent was starting to send a message right before the process would exit, and the channel closed in the middle of the write. Unlike with reads, we weren't cancelling the pending writes, which resulted in a `Broken pipe` error surfacing to the user. --- ext/node/ops/ipc.rs | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) (limited to 'ext/node/ops/ipc.rs') diff --git a/ext/node/ops/ipc.rs b/ext/node/ops/ipc.rs index 59b6fece1..c5e0f875d 100644 --- a/ext/node/ops/ipc.rs +++ b/ext/node/ops/ipc.rs @@ -216,10 +216,17 @@ mod impl_ { queue_ok.set_index(scope, 0, v); } Ok(async move { - stream.clone().write_msg_bytes(&serialized).await?; + let cancel = stream.cancel.clone(); + let result = stream + .clone() + .write_msg_bytes(&serialized) + .or_cancel(cancel) + .await; + // adjust count even on error stream .queued_bytes .fetch_sub(serialized.len(), std::sync::atomic::Ordering::Relaxed); + result??; Ok(()) }) } -- cgit v1.2.3 From c71e020668b40666aecfdffb1dbf979abcb41958 Mon Sep 17 00:00:00 2001 From: Leo Kettmeir Date: Thu, 24 Oct 2024 10:45:17 -0700 Subject: refactor(ext/node): use concrete error types (#26419) --- ext/node/ops/ipc.rs | 50 +++++++++++++++++++++++++++++++++----------------- 1 file changed, 33 insertions(+), 17 deletions(-) (limited to 'ext/node/ops/ipc.rs') diff --git a/ext/node/ops/ipc.rs b/ext/node/ops/ipc.rs index c5e0f875d..672cf0d70 100644 --- a/ext/node/ops/ipc.rs +++ b/ext/node/ops/ipc.rs @@ -17,8 +17,6 @@ mod impl_ { use std::task::Context; use std::task::Poll; - use deno_core::error::bad_resource_id; - use deno_core::error::AnyError; use deno_core::op2; use deno_core::serde; use deno_core::serde::Serializer; @@ -167,7 +165,7 @@ mod impl_ { #[smi] pub fn op_node_child_ipc_pipe( state: &mut OpState, - ) -> Result, AnyError> { + ) -> Result, io::Error> { let fd = match state.try_borrow_mut::() { Some(child_pipe_fd) => child_pipe_fd.0, None => return Ok(None), @@ -180,6 +178,18 @@ mod impl_ { )) } + #[derive(Debug, thiserror::Error)] + pub enum IpcError { + #[error(transparent)] + Resource(deno_core::error::AnyError), + #[error(transparent)] + IpcJsonStream(#[from] IpcJsonStreamError), + #[error(transparent)] + Canceled(#[from] deno_core::Canceled), + #[error("failed to serialize json value: {0}")] + SerdeJson(serde_json::Error), + } + #[op2(async)] pub fn op_node_ipc_write<'a>( scope: &mut v8::HandleScope<'a>, @@ -192,27 +202,23 @@ mod impl_ { // ideally we would just return `Result<(impl Future, bool), ..>`, but that's not // supported by `op2` currently. queue_ok: v8::Local<'a, v8::Array>, - ) -> Result>, AnyError> { + ) -> Result>, IpcError> { let mut serialized = Vec::with_capacity(64); let mut ser = serde_json::Serializer::new(&mut serialized); - serialize_v8_value(scope, value, &mut ser).map_err(|e| { - deno_core::error::type_error(format!( - "failed to serialize json value: {e}" - )) - })?; + serialize_v8_value(scope, value, &mut ser).map_err(IpcError::SerdeJson)?; serialized.push(b'\n'); let stream = state .borrow() .resource_table .get::(rid) - .map_err(|_| bad_resource_id())?; + .map_err(IpcError::Resource)?; let old = stream .queued_bytes .fetch_add(serialized.len(), std::sync::atomic::Ordering::Relaxed); if old + serialized.len() > 2 * INITIAL_CAPACITY { // sending messages too fast - let v = false.to_v8(scope)?; + let v = false.to_v8(scope).unwrap(); // Infallible queue_ok.set_index(scope, 0, v); } Ok(async move { @@ -246,12 +252,12 @@ mod impl_ { pub async fn op_node_ipc_read( state: Rc>, #[smi] rid: ResourceId, - ) -> Result { + ) -> Result { let stream = state .borrow() .resource_table .get::(rid) - .map_err(|_| bad_resource_id())?; + .map_err(IpcError::Resource)?; let cancel = stream.cancel.clone(); let mut stream = RcRef::map(stream, |r| &r.read_half).borrow_mut().await; @@ -407,7 +413,7 @@ mod impl_ { async fn write_msg_bytes( self: Rc, msg: &[u8], - ) -> Result<(), AnyError> { + ) -> Result<(), io::Error> { let mut write_half = RcRef::map(self, |r| &r.write_half).borrow_mut().await; write_half.write_all(msg).await?; @@ -462,6 +468,14 @@ mod impl_ { } } + #[derive(Debug, thiserror::Error)] + pub enum IpcJsonStreamError { + #[error("{0}")] + Io(#[source] std::io::Error), + #[error("{0}")] + SimdJson(#[source] simd_json::Error), + } + // JSON serialization stream over IPC pipe. // // `\n` is used as a delimiter between messages. @@ -482,7 +496,7 @@ mod impl_ { async fn read_msg( &mut self, - ) -> Result, AnyError> { + ) -> Result, IpcJsonStreamError> { let mut json = None; let nread = read_msg_inner( &mut self.pipe, @@ -490,7 +504,8 @@ mod impl_ { &mut json, &mut self.read_buffer, ) - .await?; + .await + .map_err(IpcJsonStreamError::Io)?; if nread == 0 { // EOF. return Ok(None); @@ -500,7 +515,8 @@ mod impl_ { Some(v) => v, None => { // Took more than a single read and some buffering. - simd_json::from_slice(&mut self.buffer[..nread])? + simd_json::from_slice(&mut self.buffer[..nread]) + .map_err(IpcJsonStreamError::SimdJson)? } }; -- cgit v1.2.3