summaryrefslogtreecommitdiff
path: root/ext
diff options
context:
space:
mode:
Diffstat (limited to 'ext')
-rw-r--r--ext/node/lib.rs15
-rw-r--r--ext/node/ops/http.rs12
-rw-r--r--ext/node/polyfills/_http_outgoing.ts50
-rw-r--r--ext/node/polyfills/http.ts230
4 files changed, 132 insertions, 175 deletions
diff --git a/ext/node/lib.rs b/ext/node/lib.rs
index 4c81351a8..c8242992a 100644
--- a/ext/node/lib.rs
+++ b/ext/node/lib.rs
@@ -9,6 +9,7 @@ use deno_core::error::AnyError;
use deno_core::located_script_name;
use deno_core::op;
use deno_core::serde_json;
+use deno_core::url::Url;
use deno_core::JsRuntime;
use deno_core::ModuleSpecifier;
use deno_fs::sync::MaybeSend;
@@ -41,12 +42,24 @@ pub use resolution::NodeResolutionMode;
pub use resolution::NodeResolver;
pub trait NodePermissions {
+ fn check_net_url(
+ &mut self,
+ url: &Url,
+ api_name: &str,
+ ) -> Result<(), AnyError>;
fn check_read(&self, path: &Path) -> Result<(), AnyError>;
}
pub(crate) struct AllowAllNodePermissions;
impl NodePermissions for AllowAllNodePermissions {
+ fn check_net_url(
+ &mut self,
+ _url: &Url,
+ _api_name: &str,
+ ) -> Result<(), AnyError> {
+ Ok(())
+ }
fn check_read(&self, _path: &Path) -> Result<(), AnyError> {
Ok(())
}
@@ -206,7 +219,7 @@ deno_core::extension!(deno_node,
ops::zlib::op_zlib_write_async,
ops::zlib::op_zlib_init,
ops::zlib::op_zlib_reset,
- ops::http::op_node_http_request,
+ ops::http::op_node_http_request<P>,
op_node_build_os,
ops::require::op_require_init_paths,
ops::require::op_require_node_module_paths<P>,
diff --git a/ext/node/ops/http.rs b/ext/node/ops/http.rs
index 2039fb388..cc7dbf522 100644
--- a/ext/node/ops/http.rs
+++ b/ext/node/ops/http.rs
@@ -24,14 +24,17 @@ use reqwest::Body;
use reqwest::Method;
#[op]
-pub fn op_node_http_request(
+pub fn op_node_http_request<P>(
state: &mut OpState,
method: ByteString,
url: String,
headers: Vec<(ByteString, ByteString)>,
client_rid: Option<u32>,
has_body: bool,
-) -> Result<FetchReturn, AnyError> {
+) -> Result<FetchReturn, AnyError>
+where
+ P: crate::NodePermissions + 'static,
+{
let client = if let Some(rid) = client_rid {
let r = state.resource_table.get::<HttpClientResource>(rid)?;
r.client.clone()
@@ -42,6 +45,11 @@ pub fn op_node_http_request(
let method = Method::from_bytes(&method)?;
let url = Url::parse(&url)?;
+ {
+ let permissions = state.borrow_mut::<P>();
+ permissions.check_net_url(&url, "ClientRequest")?;
+ }
+
let mut header_map = HeaderMap::new();
for (key, value) in headers {
let name = HeaderName::from_bytes(&key)
diff --git a/ext/node/polyfills/_http_outgoing.ts b/ext/node/polyfills/_http_outgoing.ts
index c4b88ae2f..ab6a78038 100644
--- a/ext/node/polyfills/_http_outgoing.ts
+++ b/ext/node/polyfills/_http_outgoing.ts
@@ -1,6 +1,7 @@
// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license.
// Copyright Joyent and Node contributors. All rights reserved. MIT license.
+const core = globalThis.__bootstrap.core;
import { getDefaultHighWaterMark } from "ext:deno_node/internal/streams/state.mjs";
import assert from "ext:deno_node/internal/assert.mjs";
import EE from "ext:deno_node/events.ts";
@@ -137,12 +138,6 @@ export class OutgoingMessage extends Stream {
this._keepAliveTimeout = 0;
this._onPendingData = nop;
-
- this.stream = new ReadableStream({
- start: (controller) => {
- this.controller = controller;
- },
- });
}
get writableFinished() {
@@ -374,21 +369,30 @@ export class OutgoingMessage extends Stream {
return headers;
}
- controller: ReadableStreamDefaultController;
write(
chunk: string | Uint8Array | Buffer,
encoding: string | null,
- // TODO(crowlKats): use callback
- _callback: () => void,
+ callback: () => void,
): boolean {
- if (typeof chunk === "string") {
- chunk = Buffer.from(chunk, encoding);
- }
- if (chunk instanceof Buffer) {
- chunk = new Uint8Array(chunk.buffer);
- }
+ if (
+ (typeof chunk === "string" && chunk.length > 0) ||
+ ((chunk instanceof Buffer || chunk instanceof Uint8Array) &&
+ chunk.buffer.byteLength > 0)
+ ) {
+ if (typeof chunk === "string") {
+ chunk = Buffer.from(chunk, encoding);
+ }
+ if (chunk instanceof Buffer) {
+ chunk = new Uint8Array(chunk.buffer);
+ }
- this.controller.enqueue(chunk);
+ core.writeAll(this._bodyWriteRid, chunk).then(() => {
+ callback?.();
+ this.emit("drain");
+ }).catch((e) => {
+ this._requestSendError = e;
+ });
+ }
return false;
}
@@ -400,18 +404,8 @@ export class OutgoingMessage extends Stream {
}
// deno-lint-ignore no-explicit-any
- end(chunk: any, encoding: any, _callback: any) {
- if (typeof chunk === "function") {
- callback = chunk;
- chunk = null;
- encoding = null;
- } else if (typeof encoding === "function") {
- callback = encoding;
- encoding = null;
- }
- // TODO(crowlKats): finish
-
- return this;
+ end(_chunk: any, _encoding: any, _callback: any) {
+ notImplemented("OutgoingMessage.end");
}
flushHeaders() {
diff --git a/ext/node/polyfills/http.ts b/ext/node/polyfills/http.ts
index 93c802d37..4e72b80f8 100644
--- a/ext/node/polyfills/http.ts
+++ b/ext/node/polyfills/http.ts
@@ -38,6 +38,7 @@ import { Agent, globalAgent } from "ext:deno_node/_http_agent.mjs";
import { urlToHttpOptions } from "ext:deno_node/internal/url.ts";
import { kEmptyObject } from "ext:deno_node/internal/util.mjs";
import { constants, TCP } from "ext:deno_node/internal_binding/tcp_wrap.ts";
+import { notImplemented } from "ext:deno_node/_utils.ts";
import {
connResetException,
ERR_HTTP_HEADERS_SENT,
@@ -500,6 +501,14 @@ class ClientRequest extends OutgoingMessage {
delete optsWithoutSignal.signal;
}
+ if (options!.createConnection) {
+ notImplemented("ClientRequest.options.createConnection");
+ }
+
+ if (options!.lookup) {
+ notImplemented("ClientRequest.options.lookup");
+ }
+
// initiate connection
// TODO(crowlKats): finish this
/*if (this.agent) {
@@ -547,61 +556,14 @@ class ClientRequest extends OutgoingMessage {
const client = this._getClient() ?? createHttpClient({ http2: false });
this._client = client;
- const req = core.ops.op_node_http_request(
+ this._req = core.ops.op_node_http_request(
this.method,
url,
headers,
client.rid,
this.method === "POST" || this.method === "PATCH",
);
-
- this._req = req;
-
- if (req.requestBodyRid !== null) {
- const reader = this.stream.getReader();
- (async () => {
- let done = false;
- while (!done) {
- let val;
- try {
- const res = await reader.read();
- done = res.done;
- val = res.value;
- } catch (err) {
- //if (terminator.aborted) break;
- // TODO(lucacasonato): propagate error into response body stream
- this._requestSendError = err;
- this._requestSendErrorSet = true;
- break;
- }
- if (done) break;
- try {
- await core.writeAll(req.requestBodyRid, val);
- } catch (err) {
- //if (terminator.aborted) break;
- await reader.cancel(err);
- // TODO(lucacasonato): propagate error into response body stream
- this._requestSendError = err;
- this._requestSendErrorSet = true;
- break;
- }
- }
- if (done /*&& !terminator.aborted*/) {
- try {
- await core.shutdown(req.requestBodyRid);
- } catch (err) {
- // TODO(bartlomieju): fix this conditional
- // deno-lint-ignore no-constant-condition
- if (true) {
- this._requestSendError = err;
- this._requestSendErrorSet = true;
- }
- }
- }
- //WeakMapPrototypeDelete(requestBodyReaders, req);
- core.tryClose(req.requestBodyRid);
- })();
- }
+ this._bodyWriteRid = this._req.requestBodyRid;
}
_getClient(): Deno.HttpClient | undefined {
@@ -645,112 +607,92 @@ class ClientRequest extends OutgoingMessage {
}
}
- // TODO(bartlomieju): use callback here
// deno-lint-ignore no-explicit-any
- end(chunk?: any, encoding?: any, _cb?: any): this {
+ end(chunk?: any, encoding?: any, cb?: any): this {
this.finished = true;
-
- if (chunk !== undefined) {
+ if (chunk !== undefined && chunk !== null) {
this.write(chunk, encoding);
}
- this.controller.close();
- core.opAsync("op_fetch_send", this._req.requestRid).then((res) => {
- if (this._timeout) {
- this._timeout.onabort = null;
- }
- this._client.close();
- const incoming = new IncomingMessageForClient(this.socket);
-
- // TODO(@crowlKats):
- // incoming.httpVersionMajor = versionMajor;
- // incoming.httpVersionMinor = versionMinor;
- // incoming.httpVersion = `${versionMajor}.${versionMinor}`;
- // incoming.joinDuplicateHeaders = socket?.server?.joinDuplicateHeaders ||
- // parser.joinDuplicateHeaders;
-
- incoming.url = res.url;
- incoming.statusCode = res.status;
- incoming.statusMessage = res.statusText;
-
- incoming._addHeaderLines(
- res.headers,
- Object.entries(res.headers).flat().length,
- );
- incoming._bodyRid = res.responseRid;
+ (async () => {
+ try {
+ const [res, _] = await Promise.all([
+ core.opAsync("op_fetch_send", this._req.requestRid),
+ (async () => {
+ if (this._bodyWriteRid) {
+ try {
+ await core.shutdown(this._bodyWriteRid);
+ } catch (err) {
+ this._requestSendError = err;
+ }
+
+ core.tryClose(this._bodyWriteRid);
+
+ try {
+ cb?.();
+ } catch (_) {
+ //
+ }
+ }
+ })(),
+ ]);
+ if (this._timeout) {
+ this._timeout.onabort = null;
+ }
+ this._client.close();
+ const incoming = new IncomingMessageForClient(this.socket);
+
+ // TODO(@crowlKats):
+ // incoming.httpVersionMajor = versionMajor;
+ // incoming.httpVersionMinor = versionMinor;
+ // incoming.httpVersion = `${versionMajor}.${versionMinor}`;
+ // incoming.joinDuplicateHeaders = socket?.server?.joinDuplicateHeaders ||
+ // parser.joinDuplicateHeaders;
+
+ incoming.url = res.url;
+ incoming.statusCode = res.status;
+ incoming.statusMessage = res.statusText;
+
+ incoming._addHeaderLines(
+ res.headers,
+ Object.entries(res.headers).flat().length,
+ );
+ incoming._bodyRid = res.responseRid;
- if (this._req.cancelHandleRid !== null) {
- core.tryClose(this._req.cancelHandleRid);
- }
+ if (this._req.cancelHandleRid !== null) {
+ core.tryClose(this._req.cancelHandleRid);
+ }
- this.emit("response", incoming);
- }).catch((err) => {
- if (this._req.cancelHandleRid !== null) {
- core.tryClose(this._req.cancelHandleRid);
- }
+ this.emit("response", incoming);
+ } catch (err) {
+ if (this._req.cancelHandleRid !== null) {
+ core.tryClose(this._req.cancelHandleRid);
+ }
- if (this._requestSendErrorSet) {
- // if the request body stream errored, we want to propagate that error
- // instead of the original error from opFetchSend
- throw new TypeError("Failed to fetch: request body stream errored", {
- cause: this._requestSendError,
- });
- }
+ if (this._requestSendError !== undefined) {
+ // if the request body stream errored, we want to propagate that error
+ // instead of the original error from opFetchSend
+ throw new TypeError(
+ "Failed to fetch: request body stream errored",
+ {
+ cause: this._requestSendError,
+ },
+ );
+ }
- if (err.message.includes("connection closed before message completed")) {
- // Node.js seems ignoring this error
- } else if (err.message.includes("The signal has been aborted")) {
- // Remap this error
- this.emit("error", connResetException("socket hang up"));
- } else {
- this.emit("error", err);
+ if (
+ err.message.includes("connection closed before message completed")
+ ) {
+ // Node.js seems ignoring this error
+ } else if (err.message.includes("The signal has been aborted")) {
+ // Remap this error
+ this.emit("error", connResetException("socket hang up"));
+ } else {
+ this.emit("error", err);
+ }
}
- });
+ })();
}
- /*
- override async _final() {
- if (this.controller) {
- this.controller.close();
- }
-
- const body = await this._createBody(this.body, this.opts);
- const client = await this._createCustomClient();
- const opts = {
- body,
- method: this.opts.method,
- client,
- headers: this.opts.headers,
- signal: this.opts.signal ?? undefined,
- };
- const mayResponse = fetch(this._createUrlStrFromOptions(this.opts), opts)
- .catch((e) => {
- if (e.message.includes("connection closed before message completed")) {
- // Node.js seems ignoring this error
- } else if (e.message.includes("The signal has been aborted")) {
- // Remap this error
- this.emit("error", connResetException("socket hang up"));
- } else {
- this.emit("error", e);
- }
- return undefined;
- });
-
- const res = new IncomingMessageForClient(
- await mayResponse,
- this._createSocket(),
- );
- this.emit("response", res);
- if (client) {
- res.on("end", () => {
- client.close();
- });
- }
- if (this.opts.timeout != undefined) {
- clearTimeout(this.opts.timeout);
- this.opts.timeout = undefined;
- }
- this.cb?.(res);
- }*/
abort() {
if (this.aborted) {