summaryrefslogtreecommitdiff
path: root/cli/js/web/streams/writable_stream.ts
blob: 8a31f512d3892dc4b124418350ff568e1474fc20 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
// Copyright 2018-2020 the Deno authors. All rights reserved. MIT license.

import {
  AbortRequest,
  acquireWritableStreamDefaultWriter,
  Deferred,
  initializeWritableStream,
  isWritableStream,
  isWritableStreamLocked,
  makeSizeAlgorithmFromSizeFunction,
  setFunctionName,
  setUpWritableStreamDefaultControllerFromUnderlyingSink,
  writableStreamAbort,
  writableStreamClose,
  writableStreamCloseQueuedOrInFlight,
  validateAndNormalizeHighWaterMark,
} from "./internals.ts";
import * as sym from "./symbols.ts";
import type { WritableStreamDefaultControllerImpl } from "./writable_stream_default_controller.ts";
import type { WritableStreamDefaultWriterImpl } from "./writable_stream_default_writer.ts";
import { customInspect } from "../console.ts";

// eslint-disable-next-line @typescript-eslint/no-explicit-any
export class WritableStreamImpl<W = any> implements WritableStream<W> {
  [sym.backpressure]: boolean;
  [sym.closeRequest]?: Deferred<void>;
  [sym.inFlightWriteRequest]?: Required<Deferred<void>>;
  [sym.inFlightCloseRequest]?: Deferred<void>;
  [sym.pendingAbortRequest]?: AbortRequest;
  [sym.state]: "writable" | "closed" | "erroring" | "errored";
  // eslint-disable-next-line @typescript-eslint/no-explicit-any
  [sym.storedError]?: any;
  [sym.writableStreamController]?: WritableStreamDefaultControllerImpl<W>;
  [sym.writer]?: WritableStreamDefaultWriterImpl<W>;
  [sym.writeRequests]: Array<Required<Deferred<void>>>;

  constructor(
    underlyingSink: UnderlyingSink = {},
    strategy: QueuingStrategy = {}
  ) {
    initializeWritableStream(this);
    const size = strategy.size;
    let highWaterMark = strategy.highWaterMark ?? 1;
    const { type } = underlyingSink;
    if (type !== undefined) {
      throw new RangeError(`Sink type of "${String(type)}" not supported.`);
    }
    const sizeAlgorithm = makeSizeAlgorithmFromSizeFunction(size);
    highWaterMark = validateAndNormalizeHighWaterMark(highWaterMark);
    setUpWritableStreamDefaultControllerFromUnderlyingSink(
      this,
      underlyingSink,
      highWaterMark,
      sizeAlgorithm
    );
  }

  get locked(): boolean {
    if (!isWritableStream(this)) {
      throw new TypeError("Invalid WritableStream.");
    }
    return isWritableStreamLocked(this);
  }

  // eslint-disable-next-line @typescript-eslint/no-explicit-any
  abort(reason: any): Promise<void> {
    if (!isWritableStream(this)) {
      return Promise.reject(new TypeError("Invalid WritableStream."));
    }
    if (isWritableStreamLocked(this)) {
      return Promise.reject(
        new TypeError("Cannot abort a locked WritableStream.")
      );
    }
    return writableStreamAbort(this, reason);
  }

  close(): Promise<void> {
    if (!isWritableStream(this)) {
      return Promise.reject(new TypeError("Invalid WritableStream."));
    }
    if (isWritableStreamLocked(this)) {
      return Promise.reject(
        new TypeError("Cannot abort a locked WritableStream.")
      );
    }
    if (writableStreamCloseQueuedOrInFlight(this)) {
      return Promise.reject(
        new TypeError("Cannot close an already closing WritableStream.")
      );
    }
    return writableStreamClose(this);
  }

  getWriter(): WritableStreamDefaultWriter<W> {
    if (!isWritableStream(this)) {
      throw new TypeError("Invalid WritableStream.");
    }
    return acquireWritableStreamDefaultWriter(this);
  }

  [customInspect](): string {
    return `${this.constructor.name} { locked: ${String(this.locked)} }`;
  }
}

setFunctionName(WritableStreamImpl, "WritableStream");