summaryrefslogtreecommitdiff
path: root/ext
diff options
context:
space:
mode:
Diffstat (limited to 'ext')
-rw-r--r--ext/flash/01_http.js418
-rw-r--r--ext/flash/lib.rs28
-rw-r--r--ext/net/01_net.js50
-rw-r--r--ext/net/ops.rs33
-rw-r--r--ext/net/ops_unix.rs27
5 files changed, 315 insertions, 241 deletions
diff --git a/ext/flash/01_http.js b/ext/flash/01_http.js
index 4435860ff..7a6b9bc47 100644
--- a/ext/flash/01_http.js
+++ b/ext/flash/01_http.js
@@ -422,239 +422,241 @@
})();
}
- async function serve(arg1, arg2) {
- let options = undefined;
- let handler = undefined;
- if (arg1 instanceof Function) {
- handler = arg1;
- options = arg2;
- } else if (arg2 instanceof Function) {
- handler = arg2;
- options = arg1;
- } else {
- options = arg1;
- }
- if (handler === undefined) {
+ function createServe(opFn) {
+ return async function serve(arg1, arg2) {
+ let options = undefined;
+ let handler = undefined;
+ if (arg1 instanceof Function) {
+ handler = arg1;
+ options = arg2;
+ } else if (arg2 instanceof Function) {
+ handler = arg2;
+ options = arg1;
+ } else {
+ options = arg1;
+ }
+ if (handler === undefined) {
+ if (options === undefined) {
+ throw new TypeError(
+ "No handler was provided, so an options bag is mandatory.",
+ );
+ }
+ handler = options.handler;
+ }
+ if (!(handler instanceof Function)) {
+ throw new TypeError("A handler function must be provided.");
+ }
if (options === undefined) {
- throw new TypeError(
- "No handler was provided, so an options bag is mandatory.",
- );
+ options = {};
}
- handler = options.handler;
- }
- if (!(handler instanceof Function)) {
- throw new TypeError("A handler function must be provided.");
- }
- if (options === undefined) {
- options = {};
- }
- const signal = options.signal;
+ const signal = options.signal;
- const onError = options.onError ?? function (error) {
- console.error(error);
- return new Response("Internal Server Error", { status: 500 });
- };
-
- const onListen = options.onListen ?? function ({ port }) {
- console.log(
- `Listening on http://${
- hostnameForDisplay(listenOpts.hostname)
- }:${port}/`,
- );
- };
+ const onError = options.onError ?? function (error) {
+ console.error(error);
+ return new Response("Internal Server Error", { status: 500 });
+ };
- const listenOpts = {
- hostname: options.hostname ?? "127.0.0.1",
- port: options.port ?? 9000,
- reuseport: options.reusePort ?? false,
- };
- if (options.cert || options.key) {
- if (!options.cert || !options.key) {
- throw new TypeError(
- "Both cert and key must be provided to enable HTTPS.",
+ const onListen = options.onListen ?? function ({ port }) {
+ console.log(
+ `Listening on http://${
+ hostnameForDisplay(listenOpts.hostname)
+ }:${port}/`,
);
+ };
+
+ const listenOpts = {
+ hostname: options.hostname ?? "127.0.0.1",
+ port: options.port ?? 9000,
+ reuseport: options.reusePort ?? false,
+ };
+ if (options.cert || options.key) {
+ if (!options.cert || !options.key) {
+ throw new TypeError(
+ "Both cert and key must be provided to enable HTTPS.",
+ );
+ }
+ listenOpts.cert = options.cert;
+ listenOpts.key = options.key;
}
- listenOpts.cert = options.cert;
- listenOpts.key = options.key;
- }
- const serverId = core.ops.op_flash_serve(listenOpts);
- const serverPromise = core.opAsync("op_flash_drive_server", serverId);
-
- PromisePrototypeCatch(
- PromisePrototypeThen(
- core.opAsync("op_flash_wait_for_listening", serverId),
- (port) => {
- onListen({ hostname: listenOpts.hostname, port });
- },
- ),
- () => {},
- );
- const finishedPromise = PromisePrototypeCatch(serverPromise, () => {});
-
- const server = {
- id: serverId,
- transport: listenOpts.cert && listenOpts.key ? "https" : "http",
- hostname: listenOpts.hostname,
- port: listenOpts.port,
- closed: false,
- finished: finishedPromise,
- async close() {
- if (server.closed) {
- return;
- }
- server.closed = true;
- await core.opAsync("op_flash_close_server", serverId);
- await server.finished;
- },
- async serve() {
- let offset = 0;
- while (true) {
+ const serverId = opFn(listenOpts);
+ const serverPromise = core.opAsync("op_flash_drive_server", serverId);
+
+ PromisePrototypeCatch(
+ PromisePrototypeThen(
+ core.opAsync("op_flash_wait_for_listening", serverId),
+ (port) => {
+ onListen({ hostname: listenOpts.hostname, port });
+ },
+ ),
+ () => {},
+ );
+ const finishedPromise = PromisePrototypeCatch(serverPromise, () => {});
+
+ const server = {
+ id: serverId,
+ transport: listenOpts.cert && listenOpts.key ? "https" : "http",
+ hostname: listenOpts.hostname,
+ port: listenOpts.port,
+ closed: false,
+ finished: finishedPromise,
+ async close() {
if (server.closed) {
- break;
+ return;
}
-
- let tokens = nextRequestSync();
- if (tokens === 0) {
- tokens = await core.opAsync("op_flash_next_async", serverId);
+ server.closed = true;
+ await core.opAsync("op_flash_close_server", serverId);
+ await server.finished;
+ },
+ async serve() {
+ let offset = 0;
+ while (true) {
if (server.closed) {
break;
}
- }
- for (let i = offset; i < offset + tokens; i++) {
- let body = null;
- // There might be a body, but we don't expose it for GET/HEAD requests.
- // It will be closed automatically once the request has been handled and
- // the response has been sent.
- const method = getMethodSync(i);
- let hasBody = method > 2; // Not GET/HEAD/CONNECT
- if (hasBody) {
- body = createRequestBodyStream(serverId, i);
- if (body === null) {
- hasBody = false;
+ let tokens = nextRequestSync();
+ if (tokens === 0) {
+ tokens = await core.opAsync("op_flash_next_async", serverId);
+ if (server.closed) {
+ break;
}
}
- const req = fromFlashRequest(
- serverId,
- /* streamRid */
- i,
- body,
- /* methodCb */
- () => methods[method],
- /* urlCb */
- () => {
- const path = core.ops.op_flash_path(serverId, i);
- return `${server.transport}://${server.hostname}:${server.port}${path}`;
- },
- /* headersCb */
- () => core.ops.op_flash_headers(serverId, i),
- );
+ for (let i = offset; i < offset + tokens; i++) {
+ let body = null;
+ // There might be a body, but we don't expose it for GET/HEAD requests.
+ // It will be closed automatically once the request has been handled and
+ // the response has been sent.
+ const method = getMethodSync(i);
+ let hasBody = method > 2; // Not GET/HEAD/CONNECT
+ if (hasBody) {
+ body = createRequestBodyStream(serverId, i);
+ if (body === null) {
+ hasBody = false;
+ }
+ }
- let resp;
- try {
- resp = handler(req);
- if (resp instanceof Promise) {
- PromisePrototypeCatch(
- PromisePrototypeThen(
- resp,
- (resp) =>
- handleResponse(
- req,
- resp,
- body,
- hasBody,
- method,
- serverId,
- i,
- respondFast,
- respondChunked,
- ),
- ),
- onError,
- );
- continue;
- } else if (typeof resp?.then === "function") {
- resp.then((resp) =>
- handleResponse(
- req,
- resp,
- body,
- hasBody,
- method,
- serverId,
- i,
- respondFast,
- respondChunked,
- )
- ).catch(onError);
- continue;
+ const req = fromFlashRequest(
+ serverId,
+ /* streamRid */
+ i,
+ body,
+ /* methodCb */
+ () => methods[method],
+ /* urlCb */
+ () => {
+ const path = core.ops.op_flash_path(serverId, i);
+ return `${server.transport}://${server.hostname}:${server.port}${path}`;
+ },
+ /* headersCb */
+ () => core.ops.op_flash_headers(serverId, i),
+ );
+
+ let resp;
+ try {
+ resp = handler(req);
+ if (resp instanceof Promise) {
+ PromisePrototypeCatch(
+ PromisePrototypeThen(
+ resp,
+ (resp) =>
+ handleResponse(
+ req,
+ resp,
+ body,
+ hasBody,
+ method,
+ serverId,
+ i,
+ respondFast,
+ respondChunked,
+ ),
+ ),
+ onError,
+ );
+ continue;
+ } else if (typeof resp?.then === "function") {
+ resp.then((resp) =>
+ handleResponse(
+ req,
+ resp,
+ body,
+ hasBody,
+ method,
+ serverId,
+ i,
+ respondFast,
+ respondChunked,
+ )
+ ).catch(onError);
+ continue;
+ }
+ } catch (e) {
+ resp = await onError(e);
}
- } catch (e) {
- resp = await onError(e);
+
+ handleResponse(
+ req,
+ resp,
+ body,
+ hasBody,
+ method,
+ serverId,
+ i,
+ respondFast,
+ respondChunked,
+ );
}
- handleResponse(
- req,
- resp,
- body,
- hasBody,
- method,
- serverId,
- i,
- respondFast,
- respondChunked,
- );
+ offset += tokens;
}
+ await server.finished;
+ },
+ };
+
+ signal?.addEventListener("abort", () => {
+ clearInterval(dateInterval);
+ PromisePrototypeThen(server.close(), () => {}, () => {});
+ }, {
+ once: true,
+ });
+
+ function respondChunked(token, chunk, shutdown) {
+ return core.opAsync(
+ "op_flash_respond_chuncked",
+ serverId,
+ token,
+ chunk,
+ shutdown,
+ );
+ }
- offset += tokens;
- }
- await server.finished;
- },
- };
-
- signal?.addEventListener("abort", () => {
- clearInterval(dateInterval);
- PromisePrototypeThen(server.close(), () => {}, () => {});
- }, {
- once: true,
- });
-
- function respondChunked(token, chunk, shutdown) {
- return core.opAsync(
- "op_flash_respond_chuncked",
- serverId,
- token,
- chunk,
- shutdown,
- );
- }
-
- const fastOp = prepareFastCalls();
- let nextRequestSync = () => fastOp.nextRequest();
- let getMethodSync = (token) => fastOp.getMethod(token);
- let respondFast = (token, response, shutdown) =>
- fastOp.respond(token, response, shutdown);
- if (serverId > 0) {
- nextRequestSync = () => core.ops.op_flash_next_server(serverId);
- getMethodSync = (token) => core.ops.op_flash_method(serverId, token);
- respondFast = (token, response, shutdown) =>
- core.ops.op_flash_respond(serverId, token, response, null, shutdown);
- }
+ const fastOp = prepareFastCalls();
+ let nextRequestSync = () => fastOp.nextRequest();
+ let getMethodSync = (token) => fastOp.getMethod(token);
+ let respondFast = (token, response, shutdown) =>
+ fastOp.respond(token, response, shutdown);
+ if (serverId > 0) {
+ nextRequestSync = () => core.ops.op_flash_next_server(serverId);
+ getMethodSync = (token) => core.ops.op_flash_method(serverId, token);
+ respondFast = (token, response, shutdown) =>
+ core.ops.op_flash_respond(serverId, token, response, null, shutdown);
+ }
- if (!dateInterval) {
- date = new Date().toUTCString();
- dateInterval = setInterval(() => {
+ if (!dateInterval) {
date = new Date().toUTCString();
- }, 1000);
- }
+ dateInterval = setInterval(() => {
+ date = new Date().toUTCString();
+ }, 1000);
+ }
- await SafePromiseAll([
- PromisePrototypeCatch(server.serve(), console.error),
- serverPromise,
- ]);
+ await SafePromiseAll([
+ PromisePrototypeCatch(server.serve(), console.error),
+ serverPromise,
+ ]);
+ };
}
function createRequestBodyStream(serverId, token) {
@@ -722,7 +724,7 @@
}
window.__bootstrap.flash = {
- serve,
+ createServe,
upgradeHttpRaw,
};
})(this);
diff --git a/ext/flash/lib.rs b/ext/flash/lib.rs
index 1f3686760..b077b8d21 100644
--- a/ext/flash/lib.rs
+++ b/ext/flash/lib.rs
@@ -1171,15 +1171,13 @@ pub fn resolve_addr_sync(
Ok(result)
}
-#[op]
-fn op_flash_serve<P>(
+fn flash_serve<P>(
state: &mut OpState,
opts: ListenOpts,
) -> Result<u32, AnyError>
where
P: FlashPermissions + 'static,
{
- check_unstable(state, "Deno.serve");
state
.borrow_mut::<P>()
.check_net(&(&opts.hostname, Some(opts.port)), "Deno.serve()")?;
@@ -1224,6 +1222,29 @@ where
}
#[op]
+fn op_flash_serve<P>(
+ state: &mut OpState,
+ opts: ListenOpts,
+) -> Result<u32, AnyError>
+where
+ P: FlashPermissions + 'static,
+{
+ check_unstable(state, "Deno.serve");
+ flash_serve::<P>(state, opts)
+}
+
+#[op]
+fn op_node_unstable_flash_serve<P>(
+ state: &mut OpState,
+ opts: ListenOpts,
+) -> Result<u32, AnyError>
+where
+ P: FlashPermissions + 'static,
+{
+ flash_serve::<P>(state, opts)
+}
+
+#[op]
fn op_flash_wait_for_listening(
state: &mut OpState,
server_id: u32,
@@ -1445,6 +1466,7 @@ pub fn init<P: FlashPermissions + 'static>(unstable: bool) -> Extension {
))
.ops(vec![
op_flash_serve::decl::<P>(),
+ op_node_unstable_flash_serve::decl::<P>(),
op_flash_respond::decl(),
op_flash_respond_async::decl(),
op_flash_respond_chuncked::decl(),
diff --git a/ext/net/01_net.js b/ext/net/01_net.js
index 765b94035..971ec2e8b 100644
--- a/ext/net/01_net.js
+++ b/ext/net/01_net.js
@@ -319,30 +319,32 @@
}
}
- function listenDatagram(args) {
- switch (args.transport) {
- case "udp": {
- const [rid, addr] = ops.op_net_listen_udp(
- {
- hostname: args.hostname ?? "127.0.0.1",
- port: args.port,
- },
- args.reuseAddress ?? false,
- );
- addr.transport = "udp";
- return new Datagram(rid, addr);
- }
- case "unixpacket": {
- const [rid, path] = ops.op_net_listen_unixpacket(args.path);
- const addr = {
- transport: "unixpacket",
- path,
- };
- return new Datagram(rid, addr);
+ function createListenDatagram(udpOpFn, unixOpFn) {
+ return function listenDatagram(args) {
+ switch (args.transport) {
+ case "udp": {
+ const [rid, addr] = udpOpFn(
+ {
+ hostname: args.hostname ?? "127.0.0.1",
+ port: args.port,
+ },
+ args.reuseAddress ?? false,
+ );
+ addr.transport = "udp";
+ return new Datagram(rid, addr);
+ }
+ case "unixpacket": {
+ const [rid, path] = unixOpFn(args.path);
+ const addr = {
+ transport: "unixpacket",
+ path,
+ };
+ return new Datagram(rid, addr);
+ }
+ default:
+ throw new TypeError(`Unsupported transport: '${transport}'`);
}
- default:
- throw new TypeError(`Unsupported transport: '${transport}'`);
- }
+ };
}
async function connect(args) {
@@ -389,7 +391,7 @@
TcpConn,
UnixConn,
listen,
- listenDatagram,
+ createListenDatagram,
Listener,
shutdown,
Datagram,
diff --git a/ext/net/ops.rs b/ext/net/ops.rs
index e6420bf9e..96de8cff1 100644
--- a/ext/net/ops.rs
+++ b/ext/net/ops.rs
@@ -53,10 +53,13 @@ pub fn init<P: NetPermissions + 'static>() -> Vec<OpDecl> {
crate::ops_unix::op_net_connect_unix::decl::<P>(),
op_net_listen_tcp::decl::<P>(),
op_net_listen_udp::decl::<P>(),
+ op_node_unstable_net_listen_udp::decl::<P>(),
#[cfg(unix)]
crate::ops_unix::op_net_listen_unix::decl::<P>(),
#[cfg(unix)]
crate::ops_unix::op_net_listen_unixpacket::decl::<P>(),
+ #[cfg(unix)]
+ crate::ops_unix::op_node_unstable_net_listen_unixpacket::decl::<P>(),
op_net_recv_udp::decl(),
#[cfg(unix)]
crate::ops_unix::op_net_recv_unixpacket::decl(),
@@ -288,8 +291,7 @@ where
Ok((rid, IpAddr::from(local_addr)))
}
-#[op]
-fn op_net_listen_udp<NP>(
+fn net_listen_udp<NP>(
state: &mut OpState,
addr: IpAddr,
reuse_address: bool,
@@ -297,7 +299,6 @@ fn op_net_listen_udp<NP>(
where
NP: NetPermissions + 'static,
{
- super::check_unstable(state, "Deno.listenDatagram");
state
.borrow_mut::<NP>()
.check_net(&(&addr.hostname, Some(addr.port)), "Deno.listenDatagram()")?;
@@ -343,6 +344,32 @@ where
Ok((rid, IpAddr::from(local_addr)))
}
+#[op]
+fn op_net_listen_udp<NP>(
+ state: &mut OpState,
+ addr: IpAddr,
+ reuse_address: bool,
+) -> Result<(ResourceId, IpAddr), AnyError>
+where
+ NP: NetPermissions + 'static,
+{
+ super::check_unstable(state, "Deno.listenDatagram");
+ net_listen_udp::<NP>(state, addr, reuse_address)
+}
+
+#[op]
+fn op_node_unstable_net_listen_udp<NP>(
+ state: &mut OpState,
+ addr: IpAddr,
+ reuse_address: bool,
+) -> Result<(ResourceId, IpAddr), AnyError>
+where
+ NP: NetPermissions + 'static,
+{
+ super::check_unstable(state, "Deno.listenDatagram");
+ net_listen_udp::<NP>(state, addr, reuse_address)
+}
+
#[derive(Serialize, Eq, PartialEq, Debug)]
#[serde(untagged)]
pub enum DnsReturnRecord {
diff --git a/ext/net/ops_unix.rs b/ext/net/ops_unix.rs
index b45b02343..bf03f4015 100644
--- a/ext/net/ops_unix.rs
+++ b/ext/net/ops_unix.rs
@@ -209,8 +209,7 @@ where
Ok((rid, pathname))
}
-#[op]
-pub fn op_net_listen_unixpacket<NP>(
+pub fn net_listen_unixpacket<NP>(
state: &mut OpState,
path: String,
) -> Result<(ResourceId, Option<String>), AnyError>
@@ -218,7 +217,6 @@ where
NP: NetPermissions + 'static,
{
let address_path = Path::new(&path);
- super::check_unstable(state, "Deno.listenDatagram");
let permissions = state.borrow_mut::<NP>();
permissions.check_read(address_path, "Deno.listenDatagram()")?;
permissions.check_write(address_path, "Deno.listenDatagram()")?;
@@ -233,6 +231,29 @@ where
Ok((rid, pathname))
}
+#[op]
+pub fn op_net_listen_unixpacket<NP>(
+ state: &mut OpState,
+ path: String,
+) -> Result<(ResourceId, Option<String>), AnyError>
+where
+ NP: NetPermissions + 'static,
+{
+ super::check_unstable(state, "Deno.listenDatagram");
+ net_listen_unixpacket::<NP>(state, path)
+}
+
+#[op]
+pub fn op_node_unstable_net_listen_unixpacket<NP>(
+ state: &mut OpState,
+ path: String,
+) -> Result<(ResourceId, Option<String>), AnyError>
+where
+ NP: NetPermissions + 'static,
+{
+ net_listen_unixpacket::<NP>(state, path)
+}
+
pub fn pathstring(pathname: &Path) -> Result<String, AnyError> {
into_string(pathname.into())
}