From 375ce63c6390cf7710210ce22f14a2b5a02cbfc3 Mon Sep 17 00:00:00 2001 From: Aaron O'Mullan Date: Tue, 9 Nov 2021 19:26:17 +0100 Subject: feat(core): streams (#12596) This allows resources to be "streams" by implementing read/write/shutdown. These streams are implicit since their nature (read/write/duplex) isn't known until called, but we could easily add another method to explicitly tag resources as streams. `op_read/op_write/op_shutdown` are now builtin ops provided by `deno_core` Note: this current implementation is simple & straightforward but it results in an additional alloc per read/write call Closes #12556 --- core/ops_builtin.rs | 32 ++++++++++++++++++++++++++++++++ 1 file changed, 32 insertions(+) (limited to 'core/ops_builtin.rs') diff --git a/core/ops_builtin.rs b/core/ops_builtin.rs index d33565caf..a6cf82fe9 100644 --- a/core/ops_builtin.rs +++ b/core/ops_builtin.rs @@ -1,6 +1,7 @@ use crate::error::type_error; use crate::error::AnyError; use crate::include_js_files; +use crate::op_async; use crate::op_sync; use crate::ops_metrics::OpMetrics; use crate::resources::ResourceId; @@ -36,6 +37,10 @@ pub(crate) fn init_builtins() -> Extension { ("op_metrics", op_sync(op_metrics)), ("op_void_sync", void_op_sync()), ("op_void_async", void_op_async()), + // TODO(@AaronO): track IO metrics for builtin streams + ("op_read", op_async(op_read)), + ("op_write", op_async(op_write)), + ("op_shutdown", op_async(op_shutdown)), ]) .build() } @@ -170,3 +175,30 @@ pub fn op_metrics( let per_op = state.tracker.per_op(); Ok((aggregate, per_op)) } + +async fn op_read( + state: Rc>, + rid: ResourceId, + buf: ZeroCopyBuf, +) -> Result { + let resource = state.borrow().resource_table.get_any(rid)?; + resource.read(buf).await.map(|n| n as u32) +} + +async fn op_write( + state: Rc>, + rid: ResourceId, + buf: ZeroCopyBuf, +) -> Result { + let resource = state.borrow().resource_table.get_any(rid)?; + resource.write(buf).await.map(|n| n as u32) +} + +async fn op_shutdown( + state: Rc>, + rid: ResourceId, + _: (), +) -> Result<(), AnyError> { + let resource = state.borrow().resource_table.get_any(rid)?; + resource.shutdown().await +} -- cgit v1.2.3