summaryrefslogtreecommitdiff
path: root/ext/node/ops/ipc.rs
diff options
context:
space:
mode:
Diffstat (limited to 'ext/node/ops/ipc.rs')
-rw-r--r--ext/node/ops/ipc.rs59
1 files changed, 41 insertions, 18 deletions
diff --git a/ext/node/ops/ipc.rs b/ext/node/ops/ipc.rs
index 59b6fece1..672cf0d70 100644
--- a/ext/node/ops/ipc.rs
+++ b/ext/node/ops/ipc.rs
@@ -17,8 +17,6 @@ mod impl_ {
use std::task::Context;
use std::task::Poll;
- use deno_core::error::bad_resource_id;
- use deno_core::error::AnyError;
use deno_core::op2;
use deno_core::serde;
use deno_core::serde::Serializer;
@@ -167,7 +165,7 @@ mod impl_ {
#[smi]
pub fn op_node_child_ipc_pipe(
state: &mut OpState,
- ) -> Result<Option<ResourceId>, AnyError> {
+ ) -> Result<Option<ResourceId>, io::Error> {
let fd = match state.try_borrow_mut::<crate::ChildPipeFd>() {
Some(child_pipe_fd) => child_pipe_fd.0,
None => return Ok(None),
@@ -180,6 +178,18 @@ mod impl_ {
))
}
+ #[derive(Debug, thiserror::Error)]
+ pub enum IpcError {
+ #[error(transparent)]
+ Resource(deno_core::error::AnyError),
+ #[error(transparent)]
+ IpcJsonStream(#[from] IpcJsonStreamError),
+ #[error(transparent)]
+ Canceled(#[from] deno_core::Canceled),
+ #[error("failed to serialize json value: {0}")]
+ SerdeJson(serde_json::Error),
+ }
+
#[op2(async)]
pub fn op_node_ipc_write<'a>(
scope: &mut v8::HandleScope<'a>,
@@ -192,34 +202,37 @@ mod impl_ {
// ideally we would just return `Result<(impl Future, bool), ..>`, but that's not
// supported by `op2` currently.
queue_ok: v8::Local<'a, v8::Array>,
- ) -> Result<impl Future<Output = Result<(), AnyError>>, AnyError> {
+ ) -> Result<impl Future<Output = Result<(), io::Error>>, IpcError> {
let mut serialized = Vec::with_capacity(64);
let mut ser = serde_json::Serializer::new(&mut serialized);
- serialize_v8_value(scope, value, &mut ser).map_err(|e| {
- deno_core::error::type_error(format!(
- "failed to serialize json value: {e}"
- ))
- })?;
+ serialize_v8_value(scope, value, &mut ser).map_err(IpcError::SerdeJson)?;
serialized.push(b'\n');
let stream = state
.borrow()
.resource_table
.get::<IpcJsonStreamResource>(rid)
- .map_err(|_| bad_resource_id())?;
+ .map_err(IpcError::Resource)?;
let old = stream
.queued_bytes
.fetch_add(serialized.len(), std::sync::atomic::Ordering::Relaxed);
if old + serialized.len() > 2 * INITIAL_CAPACITY {
// sending messages too fast
- let v = false.to_v8(scope)?;
+ let v = false.to_v8(scope).unwrap(); // Infallible
queue_ok.set_index(scope, 0, v);
}
Ok(async move {
- stream.clone().write_msg_bytes(&serialized).await?;
+ let cancel = stream.cancel.clone();
+ let result = stream
+ .clone()
+ .write_msg_bytes(&serialized)
+ .or_cancel(cancel)
+ .await;
+ // adjust count even on error
stream
.queued_bytes
.fetch_sub(serialized.len(), std::sync::atomic::Ordering::Relaxed);
+ result??;
Ok(())
})
}
@@ -239,12 +252,12 @@ mod impl_ {
pub async fn op_node_ipc_read(
state: Rc<RefCell<OpState>>,
#[smi] rid: ResourceId,
- ) -> Result<serde_json::Value, AnyError> {
+ ) -> Result<serde_json::Value, IpcError> {
let stream = state
.borrow()
.resource_table
.get::<IpcJsonStreamResource>(rid)
- .map_err(|_| bad_resource_id())?;
+ .map_err(IpcError::Resource)?;
let cancel = stream.cancel.clone();
let mut stream = RcRef::map(stream, |r| &r.read_half).borrow_mut().await;
@@ -400,7 +413,7 @@ mod impl_ {
async fn write_msg_bytes(
self: Rc<Self>,
msg: &[u8],
- ) -> Result<(), AnyError> {
+ ) -> Result<(), io::Error> {
let mut write_half =
RcRef::map(self, |r| &r.write_half).borrow_mut().await;
write_half.write_all(msg).await?;
@@ -455,6 +468,14 @@ mod impl_ {
}
}
+ #[derive(Debug, thiserror::Error)]
+ pub enum IpcJsonStreamError {
+ #[error("{0}")]
+ Io(#[source] std::io::Error),
+ #[error("{0}")]
+ SimdJson(#[source] simd_json::Error),
+ }
+
// JSON serialization stream over IPC pipe.
//
// `\n` is used as a delimiter between messages.
@@ -475,7 +496,7 @@ mod impl_ {
async fn read_msg(
&mut self,
- ) -> Result<Option<serde_json::Value>, AnyError> {
+ ) -> Result<Option<serde_json::Value>, IpcJsonStreamError> {
let mut json = None;
let nread = read_msg_inner(
&mut self.pipe,
@@ -483,7 +504,8 @@ mod impl_ {
&mut json,
&mut self.read_buffer,
)
- .await?;
+ .await
+ .map_err(IpcJsonStreamError::Io)?;
if nread == 0 {
// EOF.
return Ok(None);
@@ -493,7 +515,8 @@ mod impl_ {
Some(v) => v,
None => {
// Took more than a single read and some buffering.
- simd_json::from_slice(&mut self.buffer[..nread])?
+ simd_json::from_slice(&mut self.buffer[..nread])
+ .map_err(IpcJsonStreamError::SimdJson)?
}
};