diff options
author | Bartek IwaĆczuk <biwanczuk@gmail.com> | 2023-02-14 17:38:45 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2023-02-14 17:38:45 +0100 |
commit | d47147fb6ad229b1c039aff9d0959b6e281f4df5 (patch) | |
tree | 6e9e790f2b9bc71b5f0c9c7e64b95cae31579d58 /ext/node/polyfills/internal_binding/stream_wrap.ts | |
parent | 1d00bbe47e2ca14e2d2151518e02b2324461a065 (diff) |
feat(ext/node): embed std/node into the snapshot (#17724)
This commit moves "deno_std/node" in "ext/node" crate. The code is
transpiled and snapshotted during the build process.
During the first pass a minimal amount of work was done to create the
snapshot, a lot of code in "ext/node" depends on presence of "Deno"
global. This code will be gradually fixed in the follow up PRs to migrate
it to import relevant APIs from "internal:" modules.
Currently the code from snapshot is not used in any way, and all
Node/npm compatibility still uses code from
"https://deno.land/std/node" (or from the location specified by
"DENO_NODE_COMPAT_URL"). This will also be handled in a follow
up PRs.
---------
Co-authored-by: crowlkats <crowlkats@toaxl.com>
Co-authored-by: Divy Srivastava <dj.srivastava23@gmail.com>
Co-authored-by: Yoshiya Hinosawa <stibium121@gmail.com>
Diffstat (limited to 'ext/node/polyfills/internal_binding/stream_wrap.ts')
-rw-r--r-- | ext/node/polyfills/internal_binding/stream_wrap.ts | 374 |
1 files changed, 374 insertions, 0 deletions
diff --git a/ext/node/polyfills/internal_binding/stream_wrap.ts b/ext/node/polyfills/internal_binding/stream_wrap.ts new file mode 100644 index 000000000..3aee3b9da --- /dev/null +++ b/ext/node/polyfills/internal_binding/stream_wrap.ts @@ -0,0 +1,374 @@ +// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license. +// Copyright Joyent, Inc. and other Node contributors. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the +// "Software"), to deal in the Software without restriction, including +// without limitation the rights to use, copy, modify, merge, publish, +// distribute, sublicense, and/or sell copies of the Software, and to permit +// persons to whom the Software is furnished to do so, subject to the +// following conditions: +// +// The above copyright notice and this permission notice shall be included +// in all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF +// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN +// NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, +// DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR +// OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE +// USE OR OTHER DEALINGS IN THE SOFTWARE. + +// This module ports: +// - https://github.com/nodejs/node/blob/master/src/stream_base-inl.h +// - https://github.com/nodejs/node/blob/master/src/stream_base.h +// - https://github.com/nodejs/node/blob/master/src/stream_base.cc +// - https://github.com/nodejs/node/blob/master/src/stream_wrap.h +// - https://github.com/nodejs/node/blob/master/src/stream_wrap.cc + +import { TextEncoder } from "internal:deno_web/08_text_encoding.js"; +import { Buffer } from "internal:deno_node/polyfills/buffer.ts"; +import { notImplemented } from "internal:deno_node/polyfills/_utils.ts"; +import { HandleWrap } from "internal:deno_node/polyfills/internal_binding/handle_wrap.ts"; +import { + AsyncWrap, + providerType, +} from "internal:deno_node/polyfills/internal_binding/async_wrap.ts"; +import { codeMap } from "internal:deno_node/polyfills/internal_binding/uv.ts"; + +interface Reader { + read(p: Uint8Array): Promise<number | null>; +} + +interface Writer { + write(p: Uint8Array): Promise<number>; +} + +export interface Closer { + close(): void; +} + +type Ref = { ref(): void; unref(): void }; + +enum StreamBaseStateFields { + kReadBytesOrError, + kArrayBufferOffset, + kBytesWritten, + kLastWriteWasAsync, + kNumStreamBaseStateFields, +} + +export const kReadBytesOrError = StreamBaseStateFields.kReadBytesOrError; +export const kArrayBufferOffset = StreamBaseStateFields.kArrayBufferOffset; +export const kBytesWritten = StreamBaseStateFields.kBytesWritten; +export const kLastWriteWasAsync = StreamBaseStateFields.kLastWriteWasAsync; +export const kNumStreamBaseStateFields = + StreamBaseStateFields.kNumStreamBaseStateFields; + +export const streamBaseState = new Uint8Array(5); + +// This is Deno, it always will be async. +streamBaseState[kLastWriteWasAsync] = 1; + +export class WriteWrap<H extends HandleWrap> extends AsyncWrap { + handle!: H; + oncomplete!: (status: number) => void; + async!: boolean; + bytes!: number; + buffer!: unknown; + callback!: unknown; + _chunks!: unknown[]; + + constructor() { + super(providerType.WRITEWRAP); + } +} + +export class ShutdownWrap<H extends HandleWrap> extends AsyncWrap { + handle!: H; + oncomplete!: (status: number) => void; + callback!: () => void; + + constructor() { + super(providerType.SHUTDOWNWRAP); + } +} + +export const kStreamBaseField = Symbol("kStreamBaseField"); + +const SUGGESTED_SIZE = 64 * 1024; + +export class LibuvStreamWrap extends HandleWrap { + [kStreamBaseField]?: Reader & Writer & Closer & Ref; + + reading!: boolean; + #reading = false; + destroyed = false; + writeQueueSize = 0; + bytesRead = 0; + bytesWritten = 0; + + onread!: (_arrayBuffer: Uint8Array, _nread: number) => Uint8Array | undefined; + + constructor( + provider: providerType, + stream?: Reader & Writer & Closer & Ref, + ) { + super(provider); + this.#attachToObject(stream); + } + + /** + * Start the reading of the stream. + * @return An error status code. + */ + readStart(): number { + if (!this.#reading) { + this.#reading = true; + this.#read(); + } + + return 0; + } + + /** + * Stop the reading of the stream. + * @return An error status code. + */ + readStop(): number { + this.#reading = false; + + return 0; + } + + /** + * Shutdown the stream. + * @param req A shutdown request wrapper. + * @return An error status code. + */ + shutdown(req: ShutdownWrap<LibuvStreamWrap>): number { + const status = this._onClose(); + + try { + req.oncomplete(status); + } catch { + // swallow callback error. + } + + return 0; + } + + /** + * @param userBuf + * @return An error status code. + */ + useUserBuffer(_userBuf: unknown): number { + // TODO(cmorten) + notImplemented("LibuvStreamWrap.prototype.useUserBuffer"); + } + + /** + * Write a buffer to the stream. + * @param req A write request wrapper. + * @param data The Uint8Array buffer to write to the stream. + * @return An error status code. + */ + writeBuffer(req: WriteWrap<LibuvStreamWrap>, data: Uint8Array): number { + this.#write(req, data); + + return 0; + } + + /** + * Write multiple chunks at once. + * @param req A write request wrapper. + * @param chunks + * @param allBuffers + * @return An error status code. + */ + writev( + req: WriteWrap<LibuvStreamWrap>, + chunks: Buffer[] | (string | Buffer)[], + allBuffers: boolean, + ): number { + const count = allBuffers ? chunks.length : chunks.length >> 1; + const buffers: Buffer[] = new Array(count); + + if (!allBuffers) { + for (let i = 0; i < count; i++) { + const chunk = chunks[i * 2]; + + if (Buffer.isBuffer(chunk)) { + buffers[i] = chunk; + } + + // String chunk + const encoding: string = chunks[i * 2 + 1] as string; + buffers[i] = Buffer.from(chunk as string, encoding); + } + } else { + for (let i = 0; i < count; i++) { + buffers[i] = chunks[i] as Buffer; + } + } + + return this.writeBuffer(req, Buffer.concat(buffers)); + } + + /** + * Write an ASCII string to the stream. + * @return An error status code. + */ + writeAsciiString(req: WriteWrap<LibuvStreamWrap>, data: string): number { + const buffer = new TextEncoder().encode(data); + + return this.writeBuffer(req, buffer); + } + + /** + * Write an UTF8 string to the stream. + * @return An error status code. + */ + writeUtf8String(req: WriteWrap<LibuvStreamWrap>, data: string): number { + const buffer = new TextEncoder().encode(data); + + return this.writeBuffer(req, buffer); + } + + /** + * Write an UCS2 string to the stream. + * @return An error status code. + */ + writeUcs2String(_req: WriteWrap<LibuvStreamWrap>, _data: string): number { + notImplemented("LibuvStreamWrap.prototype.writeUcs2String"); + } + + /** + * Write an LATIN1 string to the stream. + * @return An error status code. + */ + writeLatin1String(req: WriteWrap<LibuvStreamWrap>, data: string): number { + const buffer = Buffer.from(data, "latin1"); + return this.writeBuffer(req, buffer); + } + + override _onClose(): number { + let status = 0; + this.#reading = false; + + try { + this[kStreamBaseField]?.close(); + } catch { + status = codeMap.get("ENOTCONN")!; + } + + return status; + } + + /** + * Attaches the class to the underlying stream. + * @param stream The stream to attach to. + */ + #attachToObject(stream?: Reader & Writer & Closer & Ref) { + this[kStreamBaseField] = stream; + } + + /** Internal method for reading from the attached stream. */ + async #read() { + let buf = new Uint8Array(SUGGESTED_SIZE); + + let nread: number | null; + try { + nread = await this[kStreamBaseField]!.read(buf); + } catch (e) { + if ( + e instanceof Deno.errors.Interrupted || + e instanceof Deno.errors.BadResource + ) { + nread = codeMap.get("EOF")!; + } else if ( + e instanceof Deno.errors.ConnectionReset || + e instanceof Deno.errors.ConnectionAborted + ) { + nread = codeMap.get("ECONNRESET")!; + } else { + nread = codeMap.get("UNKNOWN")!; + } + + buf = new Uint8Array(0); + } + + nread ??= codeMap.get("EOF")!; + + streamBaseState[kReadBytesOrError] = nread; + + if (nread > 0) { + this.bytesRead += nread; + } + + buf = buf.slice(0, nread); + + streamBaseState[kArrayBufferOffset] = 0; + + try { + this.onread!(buf, nread); + } catch { + // swallow callback errors. + } + + if (nread >= 0 && this.#reading) { + this.#read(); + } + } + + /** + * Internal method for writing to the attached stream. + * @param req A write request wrapper. + * @param data The Uint8Array buffer to write to the stream. + */ + async #write(req: WriteWrap<LibuvStreamWrap>, data: Uint8Array) { + const { byteLength } = data; + + try { + // TODO(crowlKats): duplicate from runtime/js/13_buffer.js + let nwritten = 0; + while (nwritten < data.length) { + nwritten += await this[kStreamBaseField]!.write( + data.subarray(nwritten), + ); + } + } catch (e) { + let status: number; + + // TODO(cmorten): map err to status codes + if ( + e instanceof Deno.errors.BadResource || + e instanceof Deno.errors.BrokenPipe + ) { + status = codeMap.get("EBADF")!; + } else { + status = codeMap.get("UNKNOWN")!; + } + + try { + req.oncomplete(status); + } catch { + // swallow callback errors. + } + + return; + } + + streamBaseState[kBytesWritten] = byteLength; + this.bytesWritten += byteLength; + + try { + req.oncomplete(0); + } catch { + // swallow callback errors. + } + + return; + } +} |