From 1ab3691b091e34ffa5a0b8f2cd18a87da8c4930c Mon Sep 17 00:00:00 2001 From: Luca Casonato Date: Mon, 10 Oct 2022 10:28:35 +0200 Subject: feat(core): add Deno.core.writeAll(rid, chunk) (#16228) This commit adds a new op_write_all to core that allows writing an entire chunk in a single async op call. Internally this calls `Resource::write_all`. The `writableStreamForRid` has been moved to `06_streams.js` now, and uses this new op. Various other code paths now also use this new op. Closes #16227 --- ext/web/06_streams.js | 58 +++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 58 insertions(+) (limited to 'ext/web/06_streams.js') diff --git a/ext/web/06_streams.js b/ext/web/06_streams.js index 76e31503f..09e5b7414 100644 --- a/ext/web/06_streams.js +++ b/ext/web/06_streams.js @@ -826,6 +826,62 @@ return finalBuffer; } + /** + * Create a new Writable object that is backed by a Resource that implements + * `Resource::write` / `Resource::write_all`. This object contains enough + * metadata to allow callers to bypass the JavaScript WritableStream + * implementation and write directly to the underlying resource if they so + * choose (FastStream). + * + * @param {number} rid The resource ID to write to. + * @param {boolean=} autoClose If the resource should be auto-closed when the stream closes. Defaults to true. + * @returns {ReadableStream} + */ + function writableStreamForRid(rid, autoClose = true) { + const stream = webidl.createBranded(WritableStream); + stream[_resourceBacking] = { rid, autoClose }; + + const tryClose = () => { + if (!autoClose) return; + RESOURCE_REGISTRY.unregister(stream); + core.tryClose(rid); + }; + + if (autoClose) { + RESOURCE_REGISTRY.register(stream, rid, stream); + } + + const underlyingSink = { + async write(chunk, controller) { + try { + await core.writeAll(rid, chunk); + } catch (e) { + controller.error(e); + tryClose(); + } + }, + close() { + tryClose(); + }, + abort() { + tryClose(); + }, + }; + initializeWritableStream(stream); + setUpWritableStreamDefaultControllerFromUnderlyingSink( + stream, + underlyingSink, + underlyingSink, + 1, + () => 1, + ); + return stream; + } + + function getWritableStreamResourceBacking(stream) { + return stream[_resourceBacking]; + } + /* * @param {ReadableStream} stream */ @@ -6059,6 +6115,8 @@ readableStreamForRidUnrefableUnref, readableStreamThrowIfErrored, getReadableStreamResourceBacking, + writableStreamForRid, + getWritableStreamResourceBacking, Deferred, // Exposed in global runtime scope ByteLengthQueuingStrategy, -- cgit v1.2.3