summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--Cargo.lock42
-rw-r--r--Cargo.toml3
-rw-r--r--cli/build.rs2
-rw-r--r--cli/tests/integration/js_unit_tests.rs1
-rw-r--r--cli/tests/unit/cron_test.ts242
-rw-r--r--cli/tsc/dts/lib.deno.unstable.d.ts25
-rw-r--r--ext/cron/01_cron.ts58
-rw-r--r--ext/cron/Cargo.toml23
-rw-r--r--ext/cron/interface.rs23
-rw-r--r--ext/cron/lib.rs128
-rw-r--r--ext/cron/local.rs343
-rw-r--r--ext/cron/time.rs19
-rw-r--r--runtime/Cargo.toml2
-rw-r--r--runtime/build.rs3
-rw-r--r--runtime/js/90_deno_ns.js2
-rw-r--r--runtime/lib.rs1
-rw-r--r--runtime/web_worker.rs2
-rw-r--r--runtime/worker.rs2
-rw-r--r--tools/core_import_map.json1
19 files changed, 918 insertions, 4 deletions
diff --git a/Cargo.lock b/Cargo.lock
index 53e6438cd..8f2cfeff0 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -205,7 +205,7 @@ dependencies = [
"asn1-rs-derive",
"asn1-rs-impl",
"displaydoc",
- "nom",
+ "nom 7.1.3",
"num-traits",
"rusticata-macros",
"thiserror",
@@ -1281,6 +1281,19 @@ dependencies = [
]
[[package]]
+name = "deno_cron"
+version = "0.1.0"
+dependencies = [
+ "anyhow",
+ "async-trait",
+ "chrono",
+ "deno_core",
+ "deno_unsync 0.1.1",
+ "saffron",
+ "tokio",
+]
+
+[[package]]
name = "deno_crypto"
version = "0.135.0"
dependencies = [
@@ -1696,6 +1709,7 @@ dependencies = [
"deno_cache",
"deno_console",
"deno_core",
+ "deno_cron",
"deno_crypto",
"deno_fetch",
"deno_ffi",
@@ -1967,7 +1981,7 @@ checksum = "dbd676fbbab537128ef0278adb5576cf363cff6aa22a7b24effe97347cfab61e"
dependencies = [
"asn1-rs",
"displaydoc",
- "nom",
+ "nom 7.1.3",
"num-bigint",
"num-traits",
"rusticata-macros",
@@ -3682,6 +3696,16 @@ dependencies = [
[[package]]
name = "nom"
+version = "5.1.3"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "08959a387a676302eebf4ddbcbc611da04285579f76f88ee0506c63b1a61dd4b"
+dependencies = [
+ "memchr",
+ "version_check",
+]
+
+[[package]]
+name = "nom"
version = "7.1.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d273983c5a657a70a3e8f2a01329822f3b8c8172b73826411a55751e404a0a4a"
@@ -4747,7 +4771,7 @@ version = "4.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "faf0c4a6ece9950b9abdb62b1cfcf2a68b3b67a10ba445b3bb85be2a293d0632"
dependencies = [
- "nom",
+ "nom 7.1.3",
]
[[package]]
@@ -4889,6 +4913,16 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ef703b7cb59335eae2eb93ceb664c0eb7ea6bf567079d843e09420219668e072"
[[package]]
+name = "saffron"
+version = "0.1.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "03fb9a628596fc7590eb7edbf7b0613287be78df107f5f97b118aad59fb2eea9"
+dependencies = [
+ "chrono",
+ "nom 5.1.3",
+]
+
+[[package]]
name = "salsa20"
version = "0.10.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -7033,7 +7067,7 @@ dependencies = [
"data-encoding",
"der-parser",
"lazy_static",
- "nom",
+ "nom 7.1.3",
"oid-registry",
"rusticata-macros",
"thiserror",
diff --git a/Cargo.toml b/Cargo.toml
index d2a4f6638..d6c8c81cd 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -13,6 +13,7 @@ members = [
"ext/broadcast_channel",
"ext/cache",
"ext/console",
+ "ext/cron",
"ext/crypto",
"ext/fetch",
"ext/ffi",
@@ -56,6 +57,7 @@ denokv_remote = "0.2.3"
deno_broadcast_channel = { version = "0.115.0", path = "./ext/broadcast_channel" }
deno_cache = { version = "0.53.0", path = "./ext/cache" }
deno_console = { version = "0.121.0", path = "./ext/console" }
+deno_cron = { version = "0.1.0", path = "./ext/cron" }
deno_crypto = { version = "0.135.0", path = "./ext/crypto" }
deno_fetch = { version = "0.145.0", path = "./ext/fetch" }
deno_ffi = { version = "0.108.0", path = "./ext/ffi" }
@@ -133,6 +135,7 @@ rustls-webpki = "0.101.4"
rustls-native-certs = "0.6.2"
webpki-roots = "0.25.2"
scopeguard = "1.2.0"
+saffron = "=0.1.0"
serde = { version = "1.0.149", features = ["derive"] }
serde_bytes = "0.11"
serde_json = "1.0.85"
diff --git a/cli/build.rs b/cli/build.rs
index e6b9dc0a4..b7a452465 100644
--- a/cli/build.rs
+++ b/cli/build.rs
@@ -349,6 +349,7 @@ deno_core::extension!(
fn create_cli_snapshot(snapshot_path: PathBuf) -> CreateSnapshotOutput {
use deno_core::Extension;
use deno_runtime::deno_cache::SqliteBackedCache;
+ use deno_runtime::deno_cron::local::LocalCronHandler;
use deno_runtime::deno_http::DefaultHttpPropertyExtractor;
use deno_runtime::deno_kv::sqlite::SqliteDbHandler;
use deno_runtime::permissions::PermissionsContainer;
@@ -383,6 +384,7 @@ fn create_cli_snapshot(snapshot_path: PathBuf) -> CreateSnapshotOutput {
deno_kv::deno_kv::init_ops(SqliteDbHandler::<PermissionsContainer>::new(
None, None,
)),
+ deno_cron::deno_cron::init_ops(LocalCronHandler::new()),
deno_napi::deno_napi::init_ops::<PermissionsContainer>(),
deno_http::deno_http::init_ops::<DefaultHttpPropertyExtractor>(),
deno_io::deno_io::init_ops(Default::default()),
diff --git a/cli/tests/integration/js_unit_tests.rs b/cli/tests/integration/js_unit_tests.rs
index f110f8aa6..863776aa2 100644
--- a/cli/tests/integration/js_unit_tests.rs
+++ b/cli/tests/integration/js_unit_tests.rs
@@ -24,6 +24,7 @@ util::unit_test_factory!(
console_test,
copy_file_test,
custom_event_test,
+ cron_test,
dir_test,
dom_exception_test,
error_stack_test,
diff --git a/cli/tests/unit/cron_test.ts b/cli/tests/unit/cron_test.ts
new file mode 100644
index 000000000..636a04fd2
--- /dev/null
+++ b/cli/tests/unit/cron_test.ts
@@ -0,0 +1,242 @@
+// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license.
+import { assertEquals, assertThrows, deferred } from "./test_util.ts";
+
+const sleep = (time: number) => new Promise((r) => setTimeout(r, time));
+
+Deno.test(function noNameTest() {
+ assertThrows(
+ // @ts-ignore test
+ () => Deno.cron(),
+ TypeError,
+ "Deno.cron requires a unique name",
+ );
+});
+
+Deno.test(function noSchedule() {
+ assertThrows(
+ // @ts-ignore test
+ () => Deno.cron("foo"),
+ TypeError,
+ "Deno.cron requires a valid schedule",
+ );
+});
+
+Deno.test(function noHandler() {
+ assertThrows(
+ // @ts-ignore test
+ () => Deno.cron("foo", "*/1 * * * *"),
+ TypeError,
+ "Deno.cron requires a handler",
+ );
+});
+
+Deno.test(function invalidNameTest() {
+ assertThrows(
+ () => Deno.cron("abc[]", "*/1 * * * *", () => {}),
+ TypeError,
+ "Invalid cron name",
+ );
+ assertThrows(
+ () => Deno.cron("a**bc", "*/1 * * * *", () => {}),
+ TypeError,
+ "Invalid cron name",
+ );
+ assertThrows(
+ () => Deno.cron("abc<>", "*/1 * * * *", () => {}),
+ TypeError,
+ "Invalid cron name",
+ );
+ assertThrows(
+ () => Deno.cron(";']", "*/1 * * * *", () => {}),
+ TypeError,
+ "Invalid cron name",
+ );
+ assertThrows(
+ () =>
+ Deno.cron(
+ "0000000000000000000000000000000000000000000000000000000000000000000000",
+ "*/1 * * * *",
+ () => {},
+ ),
+ TypeError,
+ "Cron name is too long",
+ );
+});
+
+Deno.test(function invalidScheduleTest() {
+ assertThrows(
+ () => Deno.cron("abc", "bogus", () => {}),
+ TypeError,
+ "Invalid cron schedule",
+ );
+ assertThrows(
+ () => Deno.cron("abc", "* * * * * *", () => {}),
+ TypeError,
+ "Invalid cron schedule",
+ );
+ assertThrows(
+ () => Deno.cron("abc", "* * * *", () => {}),
+ TypeError,
+ "Invalid cron schedule",
+ );
+ assertThrows(
+ () => Deno.cron("abc", "m * * * *", () => {}),
+ TypeError,
+ "Invalid cron schedule",
+ );
+});
+
+Deno.test(function invalidBackoffScheduleTest() {
+ assertThrows(
+ () =>
+ Deno.cron("abc", "*/1 * * * *", () => {}, {
+ backoffSchedule: [1, 1, 1, 1, 1, 1],
+ }),
+ TypeError,
+ "Invalid backoff schedule",
+ );
+ assertThrows(
+ () =>
+ Deno.cron("abc", "*/1 * * * *", () => {}, {
+ backoffSchedule: [3600001],
+ }),
+ TypeError,
+ "Invalid backoff schedule",
+ );
+});
+
+Deno.test(async function tooManyCrons() {
+ const crons: Promise<void>[] = [];
+ const ac = new AbortController();
+ for (let i = 0; i <= 100; i++) {
+ const c = Deno.cron(`abc_${i}`, "*/1 * * * *", () => {}, {
+ signal: ac.signal,
+ });
+ crons.push(c);
+ }
+
+ try {
+ assertThrows(
+ () => {
+ Deno.cron("next-cron", "*/1 * * * *", () => {}, { signal: ac.signal });
+ },
+ TypeError,
+ "Too many crons",
+ );
+ } finally {
+ ac.abort();
+ for (const c of crons) {
+ await c;
+ }
+ }
+});
+
+Deno.test(async function duplicateCrons() {
+ const ac = new AbortController();
+ const c = Deno.cron("abc", "*/20 * * * *", () => {
+ }, { signal: ac.signal });
+ try {
+ assertThrows(
+ () => Deno.cron("abc", "*/20 * * * *", () => {}),
+ TypeError,
+ "Cron with this name already exists",
+ );
+ } finally {
+ ac.abort();
+ await c;
+ }
+});
+
+Deno.test(async function basicTest() {
+ Deno.env.set("DENO_CRON_TEST_SCHEDULE_OFFSET", "100");
+
+ let count = 0;
+ const promise = deferred();
+ const ac = new AbortController();
+ const c = Deno.cron("abc", "*/20 * * * *", () => {
+ count++;
+ if (count > 5) {
+ promise.resolve();
+ }
+ }, { signal: ac.signal });
+ try {
+ await promise;
+ } finally {
+ ac.abort();
+ await c;
+ }
+});
+
+Deno.test(async function multipleCrons() {
+ Deno.env.set("DENO_CRON_TEST_SCHEDULE_OFFSET", "100");
+
+ let count0 = 0;
+ let count1 = 0;
+ const promise0 = deferred();
+ const promise1 = deferred();
+ const ac = new AbortController();
+ const c0 = Deno.cron("abc", "*/20 * * * *", () => {
+ count0++;
+ if (count0 > 5) {
+ promise0.resolve();
+ }
+ }, { signal: ac.signal });
+ const c1 = Deno.cron("xyz", "*/20 * * * *", () => {
+ count1++;
+ if (count1 > 5) {
+ promise1.resolve();
+ }
+ }, { signal: ac.signal });
+ try {
+ await promise0;
+ await promise1;
+ } finally {
+ ac.abort();
+ await c0;
+ await c1;
+ }
+});
+
+Deno.test(async function overlappingExecutions() {
+ Deno.env.set("DENO_CRON_TEST_SCHEDULE_OFFSET", "100");
+
+ let count = 0;
+ const promise0 = deferred();
+ const promise1 = deferred();
+ const ac = new AbortController();
+ const c = Deno.cron("abc", "*/20 * * * *", async () => {
+ promise0.resolve();
+ count++;
+ await promise1;
+ }, { signal: ac.signal });
+ try {
+ await promise0;
+ } finally {
+ await sleep(2000);
+ promise1.resolve();
+ ac.abort();
+ await c;
+ }
+ assertEquals(count, 1);
+});
+
+Deno.test(async function retriesWithBackkoffSchedule() {
+ Deno.env.set("DENO_CRON_TEST_SCHEDULE_OFFSET", "5000");
+
+ let count = 0;
+ const ac = new AbortController();
+ const c = Deno.cron("abc", "*/20 * * * *", async () => {
+ count += 1;
+ await sleep(10);
+ throw new TypeError("cron error");
+ }, { signal: ac.signal, backoffSchedule: [10, 20] });
+ try {
+ await sleep(6000);
+ } finally {
+ ac.abort();
+ await c;
+ }
+
+ // The cron should have executed 3 times (1st attempt and 2 retries).
+ assertEquals(count, 3);
+});
diff --git a/cli/tsc/dts/lib.deno.unstable.d.ts b/cli/tsc/dts/lib.deno.unstable.d.ts
index 56188f3b8..c758e620c 100644
--- a/cli/tsc/dts/lib.deno.unstable.d.ts
+++ b/cli/tsc/dts/lib.deno.unstable.d.ts
@@ -1319,6 +1319,31 @@ declare namespace Deno {
/** **UNSTABLE**: New API, yet to be vetted.
*
+ * Create a cron job that will periodically execute the provided handler
+ * callback based on the specified schedule.
+ *
+ * ```ts
+ * Deno.cron("sample cron", "*\/20 * * * *", () => {
+ * console.log("cron job executed");
+ * });
+ * ```
+ * `backoffSchedule` option can be used to specify the retry policy for failed
+ * executions. Each element in the array represents the number of milliseconds
+ * to wait before retrying the execution. For example, `[1000, 5000, 10000]`
+ * means that a failed execution will be retried at most 3 times, with 1
+ * second, 5 seconds, and 10 seconds delay between each retry.
+ *
+ * @category Cron
+ */
+ export function cron(
+ name: string,
+ schedule: string,
+ handler: () => Promise<void> | void,
+ options?: { backoffSchedule?: number[]; signal?: AbortSignal },
+ ): Promise<void>;
+
+ /** **UNSTABLE**: New API, yet to be vetted.
+ *
* A key to be persisted in a {@linkcode Deno.Kv}. A key is a sequence
* of {@linkcode Deno.KvKeyPart}s.
*
diff --git a/ext/cron/01_cron.ts b/ext/cron/01_cron.ts
new file mode 100644
index 000000000..a615ae34b
--- /dev/null
+++ b/ext/cron/01_cron.ts
@@ -0,0 +1,58 @@
+// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license.
+
+// @ts-ignore internal api
+const core = Deno.core;
+
+function cron(
+ name: string,
+ schedule: string,
+ handler: () => Promise<void> | void,
+ options?: { backoffSchedule?: number[]; signal?: AbortSignal },
+) {
+ if (name === undefined) {
+ throw new TypeError("Deno.cron requires a unique name");
+ }
+ if (schedule === undefined) {
+ throw new TypeError("Deno.cron requires a valid schedule");
+ }
+ if (handler === undefined) {
+ throw new TypeError("Deno.cron requires a handler");
+ }
+
+ const rid = core.ops.op_cron_create(
+ name,
+ schedule,
+ options?.backoffSchedule,
+ );
+
+ if (options?.signal) {
+ const signal = options?.signal;
+ signal.addEventListener(
+ "abort",
+ () => {
+ core.close(rid);
+ },
+ { once: true },
+ );
+ }
+
+ return (async () => {
+ let success = true;
+ while (true) {
+ const r = await core.opAsync("op_cron_next", rid, success);
+ if (r === false) {
+ break;
+ }
+ try {
+ const result = handler();
+ const _res = result instanceof Promise ? (await result) : result;
+ success = true;
+ } catch (error) {
+ console.error(`Exception in cron handler ${name}`, error);
+ success = false;
+ }
+ }
+ })();
+}
+
+export { cron };
diff --git a/ext/cron/Cargo.toml b/ext/cron/Cargo.toml
new file mode 100644
index 000000000..ac107c567
--- /dev/null
+++ b/ext/cron/Cargo.toml
@@ -0,0 +1,23 @@
+# Copyright 2018-2023 the Deno authors. All rights reserved. MIT license.
+
+[package]
+name = "deno_cron"
+version = "0.1.0"
+authors.workspace = true
+edition.workspace = true
+license.workspace = true
+readme = "README.md"
+repository.workspace = true
+description = "Implementation of the Deno cron API"
+
+[lib]
+path = "lib.rs"
+
+[dependencies]
+anyhow.workspace = true
+async-trait.workspace = true
+chrono.workspace = true
+deno_core.workspace = true
+deno_unsync = "0.1.1"
+saffron.workspace = true
+tokio.workspace = true
diff --git a/ext/cron/interface.rs b/ext/cron/interface.rs
new file mode 100644
index 000000000..c70988788
--- /dev/null
+++ b/ext/cron/interface.rs
@@ -0,0 +1,23 @@
+// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license.
+
+use async_trait::async_trait;
+use deno_core::error::AnyError;
+
+pub trait CronHandler {
+ type EH: CronHandle + 'static;
+
+ fn create(&self, spec: CronSpec) -> Result<Self::EH, AnyError>;
+}
+
+#[async_trait(?Send)]
+pub trait CronHandle {
+ async fn next(&self, prev_success: bool) -> Result<bool, AnyError>;
+ fn close(&self);
+}
+
+#[derive(Clone)]
+pub struct CronSpec {
+ pub name: String,
+ pub cron_schedule: String,
+ pub backoff_schedule: Option<Vec<u32>>,
+}
diff --git a/ext/cron/lib.rs b/ext/cron/lib.rs
new file mode 100644
index 000000000..c49659703
--- /dev/null
+++ b/ext/cron/lib.rs
@@ -0,0 +1,128 @@
+// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license.
+
+mod interface;
+pub mod local;
+mod time;
+
+use std::borrow::Cow;
+use std::cell::RefCell;
+use std::rc::Rc;
+
+use deno_core::error::get_custom_error_class;
+use deno_core::error::type_error;
+use deno_core::error::AnyError;
+use deno_core::op2;
+use deno_core::OpState;
+use deno_core::Resource;
+use deno_core::ResourceId;
+
+pub use crate::interface::*;
+
+pub const UNSTABLE_FEATURE_NAME: &str = "cron";
+
+deno_core::extension!(deno_cron,
+ deps = [ deno_console ],
+ parameters = [ C: CronHandler ],
+ ops = [
+ op_cron_create<C>,
+ op_cron_next<C>,
+ ],
+ esm = [ "01_cron.ts" ],
+ options = {
+ cron_handler: C,
+ },
+ state = |state, options| {
+ state.put(Rc::new(options.cron_handler));
+ }
+);
+
+struct CronResource<EH: CronHandle + 'static> {
+ handle: Rc<EH>,
+}
+
+impl<EH: CronHandle + 'static> Resource for CronResource<EH> {
+ fn name(&self) -> Cow<str> {
+ "cron".into()
+ }
+
+ fn close(self: Rc<Self>) {
+ self.handle.close();
+ }
+}
+
+#[op2]
+#[smi]
+fn op_cron_create<C>(
+ state: Rc<RefCell<OpState>>,
+ #[string] name: String,
+ #[string] cron_schedule: String,
+ #[serde] backoff_schedule: Option<Vec<u32>>,
+) -> Result<ResourceId, AnyError>
+where
+ C: CronHandler + 'static,
+{
+ let cron_handler = {
+ let state = state.borrow();
+ // TODO(bartlomieju): replace with `state.feature_checker.check_or_exit`
+ // once we phase out `check_or_exit_with_legacy_fallback`
+ state
+ .feature_checker
+ .check_or_exit_with_legacy_fallback(UNSTABLE_FEATURE_NAME, "Deno.cron");
+ state.borrow::<Rc<C>>().clone()
+ };
+
+ validate_cron_name(&name)?;
+
+ let handle = cron_handler.create(CronSpec {
+ name,
+ cron_schedule,
+ backoff_schedule,
+ })?;
+
+ let handle_rid = {
+ let mut state = state.borrow_mut();
+ state.resource_table.add(CronResource {
+ handle: Rc::new(handle),
+ })
+ };
+ Ok(handle_rid)
+}
+
+#[op2(async)]
+async fn op_cron_next<C>(
+ state: Rc<RefCell<OpState>>,
+ #[smi] rid: ResourceId,
+ prev_success: bool,
+) -> Result<bool, AnyError>
+where
+ C: CronHandler + 'static,
+{
+ let cron_handler = {
+ let state = state.borrow();
+ let resource = match state.resource_table.get::<CronResource<C::EH>>(rid) {
+ Ok(resource) => resource,
+ Err(err) => {
+ if get_custom_error_class(&err) == Some("BadResource") {
+ return Ok(false);
+ } else {
+ return Err(err);
+ }
+ }
+ };
+ resource.handle.clone()
+ };
+
+ cron_handler.next(prev_success).await
+}
+
+fn validate_cron_name(name: &str) -> Result<(), AnyError> {
+ if name.len() > 64 {
+ return Err(type_error("Cron name is too long"));
+ }
+ if !name.chars().all(|c| {
+ c.is_ascii_whitespace() || c.is_ascii_alphanumeric() || c == '_' || c == '-'
+ }) {
+ return Err(type_error("Invalid cron name"));
+ }
+ Ok(())
+}
diff --git a/ext/cron/local.rs b/ext/cron/local.rs
new file mode 100644
index 000000000..0b6dcae2e
--- /dev/null
+++ b/ext/cron/local.rs
@@ -0,0 +1,343 @@
+// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license.
+
+use std::cell::OnceCell;
+use std::cell::RefCell;
+use std::collections::BTreeMap;
+use std::collections::HashMap;
+use std::env;
+use std::rc::Rc;
+use std::rc::Weak;
+use std::sync::Arc;
+
+use async_trait::async_trait;
+use deno_core::error::type_error;
+use deno_core::error::AnyError;
+use deno_core::futures;
+use deno_core::futures::FutureExt;
+use deno_unsync::spawn;
+use deno_unsync::JoinHandle;
+use tokio::sync::mpsc;
+use tokio::sync::mpsc::WeakSender;
+use tokio::sync::OwnedSemaphorePermit;
+use tokio::sync::Semaphore;
+
+use crate::CronHandle;
+use crate::CronHandler;
+use crate::CronSpec;
+
+const MAX_CRONS: usize = 100;
+const DISPATCH_CONCURRENCY_LIMIT: usize = 50;
+const MAX_BACKOFF_MS: u32 = 60 * 60 * 1_000; // 1 hour
+const MAX_BACKOFF_COUNT: usize = 5;
+const DEFAULT_BACKOFF_SCHEDULE: [u32; 5] = [100, 1_000, 5_000, 30_000, 60_000];
+
+pub struct LocalCronHandler {
+ cron_schedule_tx: OnceCell<mpsc::Sender<(String, bool)>>,
+ concurrency_limiter: Arc<Semaphore>,
+ cron_loop_join_handle: OnceCell<JoinHandle<()>>,
+ runtime_state: Rc<RefCell<RuntimeState>>,
+}
+
+struct RuntimeState {
+ crons: HashMap<String, Cron>,
+ scheduled_deadlines: BTreeMap<u64, Vec<String>>,
+}
+
+struct Cron {
+ spec: CronSpec,
+ next_tx: mpsc::WeakSender<()>,
+ current_execution_retries: u32,
+}
+
+impl Cron {
+ fn backoff_schedule(&self) -> &[u32] {
+ self
+ .spec
+ .backoff_schedule
+ .as_deref()
+ .unwrap_or(&DEFAULT_BACKOFF_SCHEDULE)
+ }
+}
+
+impl Default for LocalCronHandler {
+ fn default() -> Self {
+ Self::new()
+ }
+}
+
+impl LocalCronHandler {
+ pub fn new() -> Self {
+ Self {
+ cron_schedule_tx: OnceCell::new(),
+ concurrency_limiter: Arc::new(Semaphore::new(DISPATCH_CONCURRENCY_LIMIT)),
+ cron_loop_join_handle: OnceCell::new(),
+ runtime_state: Rc::new(RefCell::new(RuntimeState {
+ crons: HashMap::new(),
+ scheduled_deadlines: BTreeMap::new(),
+ })),
+ }
+ }
+
+ async fn cron_loop(
+ runtime_state: Rc<RefCell<RuntimeState>>,
+ mut cron_schedule_rx: mpsc::Receiver<(String, bool)>,
+ ) -> Result<(), AnyError> {
+ loop {
+ let earliest_deadline = runtime_state
+ .borrow()
+ .scheduled_deadlines
+ .keys()
+ .next()
+ .copied();
+
+ let sleep_fut = if let Some(earliest_deadline) = earliest_deadline {
+ let now = crate::time::utc_now().timestamp_millis() as u64;
+ if let Some(delta) = earliest_deadline.checked_sub(now) {
+ tokio::time::sleep(std::time::Duration::from_millis(delta)).boxed()
+ } else {
+ futures::future::ready(()).boxed()
+ }
+ } else {
+ futures::future::pending().boxed()
+ };
+
+ let cron_to_schedule = tokio::select! {
+ _ = sleep_fut => None,
+ x = cron_schedule_rx.recv() => {
+ if x.is_none() {
+ return Ok(());
+ };
+ x
+ }
+ };
+
+ // Schedule next execution of the cron if needed.
+ if let Some((name, prev_success)) = cron_to_schedule {
+ let mut runtime_state = runtime_state.borrow_mut();
+ if let Some(cron) = runtime_state.crons.get_mut(&name) {
+ let backoff_schedule = cron.backoff_schedule();
+ let next_deadline = if !prev_success
+ && cron.current_execution_retries < backoff_schedule.len() as u32
+ {
+ let backoff_ms =
+ backoff_schedule[cron.current_execution_retries as usize];
+ let now = crate::time::utc_now().timestamp_millis() as u64;
+ cron.current_execution_retries += 1;
+ now + backoff_ms as u64
+ } else {
+ let next_ts = compute_next_deadline(&cron.spec.cron_schedule)?;
+ cron.current_execution_retries = 0;
+ next_ts
+ };
+ runtime_state
+ .scheduled_deadlines
+ .entry(next_deadline)
+ .or_default()
+ .push(name.to_string());
+ }
+ }
+
+ // Dispatch ready to execute crons.
+ let crons_to_execute = {
+ let mut runtime_state = runtime_state.borrow_mut();
+ runtime_state.get_ready_crons()?
+ };
+ for (_, tx) in crons_to_execute {
+ if let Some(tx) = tx.upgrade() {
+ let _ = tx.send(()).await;
+ }
+ }
+ }
+ }
+}
+
+impl RuntimeState {
+ fn get_ready_crons(
+ &mut self,
+ ) -> Result<Vec<(String, WeakSender<()>)>, AnyError> {
+ let now = crate::time::utc_now().timestamp_millis() as u64;
+
+ let ready = {
+ let to_remove = self
+ .scheduled_deadlines
+ .range(..=now)
+ .map(|(ts, _)| *ts)
+ .collect::<Vec<_>>();
+ to_remove
+ .iter()
+ .flat_map(|ts| {
+ self
+ .scheduled_deadlines
+ .remove(ts)
+ .unwrap()
+ .iter()
+ .map(move |name| (*ts, name.clone()))
+ .collect::<Vec<_>>()
+ })
+ .map(|(_, name)| {
+ (name.clone(), self.crons.get(&name).unwrap().next_tx.clone())
+ })
+ .collect::<Vec<_>>()
+ };
+
+ Ok(ready)
+ }
+}
+
+#[async_trait(?Send)]
+impl CronHandler for LocalCronHandler {
+ type EH = CronExecutionHandle;
+
+ fn create(&self, spec: CronSpec) -> Result<Self::EH, AnyError> {
+ // Ensure that the cron loop is started.
+ self.cron_loop_join_handle.get_or_init(|| {
+ let (cron_schedule_tx, cron_schedule_rx) =
+ mpsc::channel::<(String, bool)>(1);
+ self.cron_schedule_tx.set(cron_schedule_tx).unwrap();
+ let runtime_state = self.runtime_state.clone();
+ spawn(async move {
+ LocalCronHandler::cron_loop(runtime_state, cron_schedule_rx)
+ .await
+ .unwrap();
+ })
+ });
+
+ let mut runtime_state = self.runtime_state.borrow_mut();
+
+ if runtime_state.crons.len() > MAX_CRONS {
+ return Err(type_error("Too many crons"));
+ }
+ if runtime_state.crons.contains_key(&spec.name) {
+ return Err(type_error("Cron with this name already exists"));
+ }
+
+ // Validate schedule expression.
+ spec
+ .cron_schedule
+ .parse::<saffron::Cron>()
+ .map_err(|_| type_error("Invalid cron schedule"))?;
+
+ // Validate backoff_schedule.
+ if let Some(backoff_schedule) = &spec.backoff_schedule {
+ validate_backoff_schedule(backoff_schedule)?;
+ }
+
+ let (next_tx, next_rx) = mpsc::channel::<()>(1);
+ let cron = Cron {
+ spec: spec.clone(),
+ next_tx: next_tx.downgrade(),
+ current_execution_retries: 0,
+ };
+ runtime_state.crons.insert(spec.name.clone(), cron);
+
+ Ok(CronExecutionHandle {
+ name: spec.name.clone(),
+ cron_schedule_tx: self.cron_schedule_tx.get().unwrap().clone(),
+ concurrency_limiter: self.concurrency_limiter.clone(),
+ runtime_state: Rc::downgrade(&self.runtime_state),
+ inner: RefCell::new(Inner {
+ next_rx: Some(next_rx),
+ shutdown_tx: Some(next_tx),
+ permit: None,
+ }),
+ })
+ }
+}
+
+pub struct CronExecutionHandle {
+ name: String,
+ runtime_state: Weak<RefCell<RuntimeState>>,
+ cron_schedule_tx: mpsc::Sender<(String, bool)>,
+ concurrency_limiter: Arc<Semaphore>,
+ inner: RefCell<Inner>,
+}
+
+struct Inner {
+ next_rx: Option<mpsc::Receiver<()>>,
+ shutdown_tx: Option<mpsc::Sender<()>>,
+ permit: Option<OwnedSemaphorePermit>,
+}
+
+#[async_trait(?Send)]
+impl CronHandle for CronExecutionHandle {
+ async fn next(&self, prev_success: bool) -> Result<bool, AnyError> {
+ self.inner.borrow_mut().permit.take();
+
+ if self
+ .cron_schedule_tx
+ .send((self.name.clone(), prev_success))
+ .await
+ .is_err()
+ {
+ return Ok(false);
+ };
+
+ let Some(mut next_rx) = self.inner.borrow_mut().next_rx.take() else {
+ return Ok(false);
+ };
+ if next_rx.recv().await.is_none() {
+ return Ok(false);
+ };
+
+ let permit = self.concurrency_limiter.clone().acquire_owned().await?;
+ let mut inner = self.inner.borrow_mut();
+ inner.next_rx = Some(next_rx);
+ inner.permit = Some(permit);
+ Ok(true)
+ }
+
+ fn close(&self) {
+ if let Some(tx) = self.inner.borrow_mut().shutdown_tx.take() {
+ drop(tx)
+ }
+ if let Some(runtime_state) = self.runtime_state.upgrade() {
+ let mut runtime_state = runtime_state.borrow_mut();
+ runtime_state.crons.remove(&self.name);
+ }
+ }
+}
+
+fn compute_next_deadline(cron_expression: &str) -> Result<u64, AnyError> {
+ let now = crate::time::utc_now();
+
+ if let Ok(test_schedule) = env::var("DENO_CRON_TEST_SCHEDULE_OFFSET") {
+ if let Ok(offset) = test_schedule.parse::<u64>() {
+ return Ok(now.timestamp_millis() as u64 + offset);
+ }
+ }
+
+ let cron = cron_expression
+ .parse::<saffron::Cron>()
+ .map_err(|_| anyhow::anyhow!("invalid cron expression"))?;
+ let Some(next_deadline) = cron.next_after(now) else {
+ return Err(anyhow::anyhow!("invalid cron expression"));
+ };
+ Ok(next_deadline.timestamp_millis() as u64)
+}
+
+fn validate_backoff_schedule(
+ backoff_schedule: &Vec<u32>,
+) -> Result<(), AnyError> {
+ if backoff_schedule.len() > MAX_BACKOFF_COUNT {
+ return Err(type_error("Invalid backoff schedule"));
+ }
+ if backoff_schedule.iter().any(|s| *s > MAX_BACKOFF_MS) {
+ return Err(type_error("Invalid backoff schedule"));
+ }
+ Ok(())
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+
+ #[test]
+ fn test_compute_next_deadline() {
+ let now = crate::time::utc_now().timestamp_millis() as u64;
+ assert!(compute_next_deadline("*/1 * * * *").unwrap() > now);
+ assert!(compute_next_deadline("* * * * *").unwrap() > now);
+ assert!(compute_next_deadline("bogus").is_err());
+ assert!(compute_next_deadline("* * * * * *").is_err());
+ assert!(compute_next_deadline("* * *").is_err());
+ }
+}
diff --git a/ext/cron/time.rs b/ext/cron/time.rs
new file mode 100644
index 000000000..60375818b
--- /dev/null
+++ b/ext/cron/time.rs
@@ -0,0 +1,19 @@
+// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license.
+
+/// Identical to chrono::Utc::now() but without the system "clock"
+/// feature flag.
+///
+/// The "clock" feature flag pulls in the "iana-time-zone" crate
+/// which links to macOS's "CoreFoundation" framework which increases
+/// startup time for the CLI.
+pub fn utc_now() -> chrono::DateTime<chrono::Utc> {
+ let now = std::time::SystemTime::now()
+ .duration_since(std::time::UNIX_EPOCH)
+ .expect("system time before Unix epoch");
+ let naive = chrono::NaiveDateTime::from_timestamp_opt(
+ now.as_secs() as i64,
+ now.subsec_nanos(),
+ )
+ .unwrap();
+ chrono::DateTime::from_naive_utc_and_offset(naive, chrono::Utc)
+}
diff --git a/runtime/Cargo.toml b/runtime/Cargo.toml
index 4634d758a..7c60fd4a8 100644
--- a/runtime/Cargo.toml
+++ b/runtime/Cargo.toml
@@ -44,6 +44,7 @@ deno_broadcast_channel.workspace = true
deno_cache.workspace = true
deno_console.workspace = true
deno_core.workspace = true
+deno_cron.workspace = true
deno_crypto.workspace = true
deno_fetch.workspace = true
deno_ffi.workspace = true
@@ -71,6 +72,7 @@ deno_broadcast_channel.workspace = true
deno_cache.workspace = true
deno_console.workspace = true
deno_core.workspace = true
+deno_cron.workspace = true
deno_crypto.workspace = true
deno_fetch.workspace = true
deno_ffi.workspace = true
diff --git a/runtime/build.rs b/runtime/build.rs
index ce1896e6f..2c6f0a4d8 100644
--- a/runtime/build.rs
+++ b/runtime/build.rs
@@ -223,6 +223,9 @@ mod startup_snapshot {
deno_kv::deno_kv::init_ops_and_esm(deno_kv::sqlite::SqliteDbHandler::<
Permissions,
>::new(None, None)),
+ deno_cron::deno_cron::init_ops_and_esm(
+ deno_cron::local::LocalCronHandler::new(),
+ ),
deno_napi::deno_napi::init_ops_and_esm::<Permissions>(),
deno_http::deno_http::init_ops_and_esm::<DefaultHttpPropertyExtractor>(),
deno_io::deno_io::init_ops_and_esm(Default::default()),
diff --git a/runtime/js/90_deno_ns.js b/runtime/js/90_deno_ns.js
index 5364a60ee..c44c14bbe 100644
--- a/runtime/js/90_deno_ns.js
+++ b/runtime/js/90_deno_ns.js
@@ -24,6 +24,7 @@ import * as tty from "ext:runtime/40_tty.js";
// TODO(bartlomieju): this is funky we have two `http` imports
import * as httpRuntime from "ext:runtime/40_http.js";
import * as kv from "ext:deno_kv/01_db.ts";
+import * as cron from "ext:deno_cron/01_cron.ts";
const denoNs = {
metrics: core.metrics,
@@ -179,6 +180,7 @@ const denoNsUnstable = {
Kv: kv.Kv,
KvU64: kv.KvU64,
KvListIterator: kv.KvListIterator,
+ cron: cron.cron,
};
export { denoNs, denoNsUnstable };
diff --git a/runtime/lib.rs b/runtime/lib.rs
index bc49a2fef..c49446375 100644
--- a/runtime/lib.rs
+++ b/runtime/lib.rs
@@ -4,6 +4,7 @@ pub use deno_broadcast_channel;
pub use deno_cache;
pub use deno_console;
pub use deno_core;
+pub use deno_cron;
pub use deno_crypto;
pub use deno_fetch;
pub use deno_ffi;
diff --git a/runtime/web_worker.rs b/runtime/web_worker.rs
index de69ce43b..5c42d752f 100644
--- a/runtime/web_worker.rs
+++ b/runtime/web_worker.rs
@@ -36,6 +36,7 @@ use deno_core::RuntimeOptions;
use deno_core::SharedArrayBufferStore;
use deno_core::Snapshot;
use deno_core::SourceMapGetter;
+use deno_cron::local::LocalCronHandler;
use deno_fs::FileSystem;
use deno_http::DefaultHttpPropertyExtractor;
use deno_io::Stdio;
@@ -450,6 +451,7 @@ impl WebWorker {
},
),
),
+ deno_cron::deno_cron::init_ops_and_esm(LocalCronHandler::new()),
deno_napi::deno_napi::init_ops_and_esm::<PermissionsContainer>(),
deno_http::deno_http::init_ops_and_esm::<DefaultHttpPropertyExtractor>(),
deno_io::deno_io::init_ops_and_esm(Some(options.stdio)),
diff --git a/runtime/worker.rs b/runtime/worker.rs
index f0fc25aa2..2e2910109 100644
--- a/runtime/worker.rs
+++ b/runtime/worker.rs
@@ -31,6 +31,7 @@ use deno_core::RuntimeOptions;
use deno_core::SharedArrayBufferStore;
use deno_core::Snapshot;
use deno_core::SourceMapGetter;
+use deno_cron::local::LocalCronHandler;
use deno_fs::FileSystem;
use deno_http::DefaultHttpPropertyExtractor;
use deno_io::Stdio;
@@ -273,6 +274,7 @@ impl MainWorker {
},
),
),
+ deno_cron::deno_cron::init_ops_and_esm(LocalCronHandler::new()),
deno_napi::deno_napi::init_ops_and_esm::<PermissionsContainer>(),
deno_http::deno_http::init_ops_and_esm::<DefaultHttpPropertyExtractor>(),
deno_io::deno_io::init_ops_and_esm(Some(options.stdio)),
diff --git a/tools/core_import_map.json b/tools/core_import_map.json
index 3c1d65f93..0f356eb97 100644
--- a/tools/core_import_map.json
+++ b/tools/core_import_map.json
@@ -3,6 +3,7 @@
"ext:deno_broadcast_channel/01_broadcast_channel.js": "../ext/broadcast_channel/01_broadcast_channel.js",
"ext:deno_cache/01_cache.js": "../ext/cache/01_cache.js",
"ext:deno_console/01_console.js": "../ext/console/01_console.js",
+ "ext:deno_cron/01_cron.ts": "../ext/cron/01_cron.ts",
"ext:deno_crypto/00_crypto.js": "../ext/crypto/00_crypto.js",
"ext:deno_fetch/20_headers.js": "../ext/fetch/20_headers.js",
"ext:deno_fetch/21_formdata.js": "../ext/fetch/21_formdata.js",