summaryrefslogtreecommitdiff
path: root/ext/cron
diff options
context:
space:
mode:
Diffstat (limited to 'ext/cron')
-rw-r--r--ext/cron/01_cron.ts58
-rw-r--r--ext/cron/Cargo.toml23
-rw-r--r--ext/cron/interface.rs23
-rw-r--r--ext/cron/lib.rs128
-rw-r--r--ext/cron/local.rs343
-rw-r--r--ext/cron/time.rs19
6 files changed, 594 insertions, 0 deletions
diff --git a/ext/cron/01_cron.ts b/ext/cron/01_cron.ts
new file mode 100644
index 000000000..a615ae34b
--- /dev/null
+++ b/ext/cron/01_cron.ts
@@ -0,0 +1,58 @@
+// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license.
+
+// @ts-ignore internal api
+const core = Deno.core;
+
+function cron(
+ name: string,
+ schedule: string,
+ handler: () => Promise<void> | void,
+ options?: { backoffSchedule?: number[]; signal?: AbortSignal },
+) {
+ if (name === undefined) {
+ throw new TypeError("Deno.cron requires a unique name");
+ }
+ if (schedule === undefined) {
+ throw new TypeError("Deno.cron requires a valid schedule");
+ }
+ if (handler === undefined) {
+ throw new TypeError("Deno.cron requires a handler");
+ }
+
+ const rid = core.ops.op_cron_create(
+ name,
+ schedule,
+ options?.backoffSchedule,
+ );
+
+ if (options?.signal) {
+ const signal = options?.signal;
+ signal.addEventListener(
+ "abort",
+ () => {
+ core.close(rid);
+ },
+ { once: true },
+ );
+ }
+
+ return (async () => {
+ let success = true;
+ while (true) {
+ const r = await core.opAsync("op_cron_next", rid, success);
+ if (r === false) {
+ break;
+ }
+ try {
+ const result = handler();
+ const _res = result instanceof Promise ? (await result) : result;
+ success = true;
+ } catch (error) {
+ console.error(`Exception in cron handler ${name}`, error);
+ success = false;
+ }
+ }
+ })();
+}
+
+export { cron };
diff --git a/ext/cron/Cargo.toml b/ext/cron/Cargo.toml
new file mode 100644
index 000000000..ac107c567
--- /dev/null
+++ b/ext/cron/Cargo.toml
@@ -0,0 +1,23 @@
+# Copyright 2018-2023 the Deno authors. All rights reserved. MIT license.
+
+[package]
+name = "deno_cron"
+version = "0.1.0"
+authors.workspace = true
+edition.workspace = true
+license.workspace = true
+readme = "README.md"
+repository.workspace = true
+description = "Implementation of the Deno cron API"
+
+[lib]
+path = "lib.rs"
+
+[dependencies]
+anyhow.workspace = true
+async-trait.workspace = true
+chrono.workspace = true
+deno_core.workspace = true
+deno_unsync = "0.1.1"
+saffron.workspace = true
+tokio.workspace = true
diff --git a/ext/cron/interface.rs b/ext/cron/interface.rs
new file mode 100644
index 000000000..c70988788
--- /dev/null
+++ b/ext/cron/interface.rs
@@ -0,0 +1,23 @@
+// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license.
+
+use async_trait::async_trait;
+use deno_core::error::AnyError;
+
+pub trait CronHandler {
+ type EH: CronHandle + 'static;
+
+ fn create(&self, spec: CronSpec) -> Result<Self::EH, AnyError>;
+}
+
+#[async_trait(?Send)]
+pub trait CronHandle {
+ async fn next(&self, prev_success: bool) -> Result<bool, AnyError>;
+ fn close(&self);
+}
+
+#[derive(Clone)]
+pub struct CronSpec {
+ pub name: String,
+ pub cron_schedule: String,
+ pub backoff_schedule: Option<Vec<u32>>,
+}
diff --git a/ext/cron/lib.rs b/ext/cron/lib.rs
new file mode 100644
index 000000000..c49659703
--- /dev/null
+++ b/ext/cron/lib.rs
@@ -0,0 +1,128 @@
+// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license.
+
+mod interface;
+pub mod local;
+mod time;
+
+use std::borrow::Cow;
+use std::cell::RefCell;
+use std::rc::Rc;
+
+use deno_core::error::get_custom_error_class;
+use deno_core::error::type_error;
+use deno_core::error::AnyError;
+use deno_core::op2;
+use deno_core::OpState;
+use deno_core::Resource;
+use deno_core::ResourceId;
+
+pub use crate::interface::*;
+
+pub const UNSTABLE_FEATURE_NAME: &str = "cron";
+
+deno_core::extension!(deno_cron,
+ deps = [ deno_console ],
+ parameters = [ C: CronHandler ],
+ ops = [
+ op_cron_create<C>,
+ op_cron_next<C>,
+ ],
+ esm = [ "01_cron.ts" ],
+ options = {
+ cron_handler: C,
+ },
+ state = |state, options| {
+ state.put(Rc::new(options.cron_handler));
+ }
+);
+
+struct CronResource<EH: CronHandle + 'static> {
+ handle: Rc<EH>,
+}
+
+impl<EH: CronHandle + 'static> Resource for CronResource<EH> {
+ fn name(&self) -> Cow<str> {
+ "cron".into()
+ }
+
+ fn close(self: Rc<Self>) {
+ self.handle.close();
+ }
+}
+
+#[op2]
+#[smi]
+fn op_cron_create<C>(
+ state: Rc<RefCell<OpState>>,
+ #[string] name: String,
+ #[string] cron_schedule: String,
+ #[serde] backoff_schedule: Option<Vec<u32>>,
+) -> Result<ResourceId, AnyError>
+where
+ C: CronHandler + 'static,
+{
+ let cron_handler = {
+ let state = state.borrow();
+ // TODO(bartlomieju): replace with `state.feature_checker.check_or_exit`
+ // once we phase out `check_or_exit_with_legacy_fallback`
+ state
+ .feature_checker
+ .check_or_exit_with_legacy_fallback(UNSTABLE_FEATURE_NAME, "Deno.cron");
+ state.borrow::<Rc<C>>().clone()
+ };
+
+ validate_cron_name(&name)?;
+
+ let handle = cron_handler.create(CronSpec {
+ name,
+ cron_schedule,
+ backoff_schedule,
+ })?;
+
+ let handle_rid = {
+ let mut state = state.borrow_mut();
+ state.resource_table.add(CronResource {
+ handle: Rc::new(handle),
+ })
+ };
+ Ok(handle_rid)
+}
+
+#[op2(async)]
+async fn op_cron_next<C>(
+ state: Rc<RefCell<OpState>>,
+ #[smi] rid: ResourceId,
+ prev_success: bool,
+) -> Result<bool, AnyError>
+where
+ C: CronHandler + 'static,
+{
+ let cron_handler = {
+ let state = state.borrow();
+ let resource = match state.resource_table.get::<CronResource<C::EH>>(rid) {
+ Ok(resource) => resource,
+ Err(err) => {
+ if get_custom_error_class(&err) == Some("BadResource") {
+ return Ok(false);
+ } else {
+ return Err(err);
+ }
+ }
+ };
+ resource.handle.clone()
+ };
+
+ cron_handler.next(prev_success).await
+}
+
+fn validate_cron_name(name: &str) -> Result<(), AnyError> {
+ if name.len() > 64 {
+ return Err(type_error("Cron name is too long"));
+ }
+ if !name.chars().all(|c| {
+ c.is_ascii_whitespace() || c.is_ascii_alphanumeric() || c == '_' || c == '-'
+ }) {
+ return Err(type_error("Invalid cron name"));
+ }
+ Ok(())
+}
diff --git a/ext/cron/local.rs b/ext/cron/local.rs
new file mode 100644
index 000000000..0b6dcae2e
--- /dev/null
+++ b/ext/cron/local.rs
@@ -0,0 +1,343 @@
+// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license.
+
+use std::cell::OnceCell;
+use std::cell::RefCell;
+use std::collections::BTreeMap;
+use std::collections::HashMap;
+use std::env;
+use std::rc::Rc;
+use std::rc::Weak;
+use std::sync::Arc;
+
+use async_trait::async_trait;
+use deno_core::error::type_error;
+use deno_core::error::AnyError;
+use deno_core::futures;
+use deno_core::futures::FutureExt;
+use deno_unsync::spawn;
+use deno_unsync::JoinHandle;
+use tokio::sync::mpsc;
+use tokio::sync::mpsc::WeakSender;
+use tokio::sync::OwnedSemaphorePermit;
+use tokio::sync::Semaphore;
+
+use crate::CronHandle;
+use crate::CronHandler;
+use crate::CronSpec;
+
+const MAX_CRONS: usize = 100;
+const DISPATCH_CONCURRENCY_LIMIT: usize = 50;
+const MAX_BACKOFF_MS: u32 = 60 * 60 * 1_000; // 1 hour
+const MAX_BACKOFF_COUNT: usize = 5;
+const DEFAULT_BACKOFF_SCHEDULE: [u32; 5] = [100, 1_000, 5_000, 30_000, 60_000];
+
+pub struct LocalCronHandler {
+ cron_schedule_tx: OnceCell<mpsc::Sender<(String, bool)>>,
+ concurrency_limiter: Arc<Semaphore>,
+ cron_loop_join_handle: OnceCell<JoinHandle<()>>,
+ runtime_state: Rc<RefCell<RuntimeState>>,
+}
+
+struct RuntimeState {
+ crons: HashMap<String, Cron>,
+ scheduled_deadlines: BTreeMap<u64, Vec<String>>,
+}
+
+struct Cron {
+ spec: CronSpec,
+ next_tx: mpsc::WeakSender<()>,
+ current_execution_retries: u32,
+}
+
+impl Cron {
+ fn backoff_schedule(&self) -> &[u32] {
+ self
+ .spec
+ .backoff_schedule
+ .as_deref()
+ .unwrap_or(&DEFAULT_BACKOFF_SCHEDULE)
+ }
+}
+
+impl Default for LocalCronHandler {
+ fn default() -> Self {
+ Self::new()
+ }
+}
+
+impl LocalCronHandler {
+ pub fn new() -> Self {
+ Self {
+ cron_schedule_tx: OnceCell::new(),
+ concurrency_limiter: Arc::new(Semaphore::new(DISPATCH_CONCURRENCY_LIMIT)),
+ cron_loop_join_handle: OnceCell::new(),
+ runtime_state: Rc::new(RefCell::new(RuntimeState {
+ crons: HashMap::new(),
+ scheduled_deadlines: BTreeMap::new(),
+ })),
+ }
+ }
+
+ async fn cron_loop(
+ runtime_state: Rc<RefCell<RuntimeState>>,
+ mut cron_schedule_rx: mpsc::Receiver<(String, bool)>,
+ ) -> Result<(), AnyError> {
+ loop {
+ let earliest_deadline = runtime_state
+ .borrow()
+ .scheduled_deadlines
+ .keys()
+ .next()
+ .copied();
+
+ let sleep_fut = if let Some(earliest_deadline) = earliest_deadline {
+ let now = crate::time::utc_now().timestamp_millis() as u64;
+ if let Some(delta) = earliest_deadline.checked_sub(now) {
+ tokio::time::sleep(std::time::Duration::from_millis(delta)).boxed()
+ } else {
+ futures::future::ready(()).boxed()
+ }
+ } else {
+ futures::future::pending().boxed()
+ };
+
+ let cron_to_schedule = tokio::select! {
+ _ = sleep_fut => None,
+ x = cron_schedule_rx.recv() => {
+ if x.is_none() {
+ return Ok(());
+ };
+ x
+ }
+ };
+
+ // Schedule next execution of the cron if needed.
+ if let Some((name, prev_success)) = cron_to_schedule {
+ let mut runtime_state = runtime_state.borrow_mut();
+ if let Some(cron) = runtime_state.crons.get_mut(&name) {
+ let backoff_schedule = cron.backoff_schedule();
+ let next_deadline = if !prev_success
+ && cron.current_execution_retries < backoff_schedule.len() as u32
+ {
+ let backoff_ms =
+ backoff_schedule[cron.current_execution_retries as usize];
+ let now = crate::time::utc_now().timestamp_millis() as u64;
+ cron.current_execution_retries += 1;
+ now + backoff_ms as u64
+ } else {
+ let next_ts = compute_next_deadline(&cron.spec.cron_schedule)?;
+ cron.current_execution_retries = 0;
+ next_ts
+ };
+ runtime_state
+ .scheduled_deadlines
+ .entry(next_deadline)
+ .or_default()
+ .push(name.to_string());
+ }
+ }
+
+ // Dispatch ready to execute crons.
+ let crons_to_execute = {
+ let mut runtime_state = runtime_state.borrow_mut();
+ runtime_state.get_ready_crons()?
+ };
+ for (_, tx) in crons_to_execute {
+ if let Some(tx) = tx.upgrade() {
+ let _ = tx.send(()).await;
+ }
+ }
+ }
+ }
+}
+
+impl RuntimeState {
+ fn get_ready_crons(
+ &mut self,
+ ) -> Result<Vec<(String, WeakSender<()>)>, AnyError> {
+ let now = crate::time::utc_now().timestamp_millis() as u64;
+
+ let ready = {
+ let to_remove = self
+ .scheduled_deadlines
+ .range(..=now)
+ .map(|(ts, _)| *ts)
+ .collect::<Vec<_>>();
+ to_remove
+ .iter()
+ .flat_map(|ts| {
+ self
+ .scheduled_deadlines
+ .remove(ts)
+ .unwrap()
+ .iter()
+ .map(move |name| (*ts, name.clone()))
+ .collect::<Vec<_>>()
+ })
+ .map(|(_, name)| {
+ (name.clone(), self.crons.get(&name).unwrap().next_tx.clone())
+ })
+ .collect::<Vec<_>>()
+ };
+
+ Ok(ready)
+ }
+}
+
+#[async_trait(?Send)]
+impl CronHandler for LocalCronHandler {
+ type EH = CronExecutionHandle;
+
+ fn create(&self, spec: CronSpec) -> Result<Self::EH, AnyError> {
+ // Ensure that the cron loop is started.
+ self.cron_loop_join_handle.get_or_init(|| {
+ let (cron_schedule_tx, cron_schedule_rx) =
+ mpsc::channel::<(String, bool)>(1);
+ self.cron_schedule_tx.set(cron_schedule_tx).unwrap();
+ let runtime_state = self.runtime_state.clone();
+ spawn(async move {
+ LocalCronHandler::cron_loop(runtime_state, cron_schedule_rx)
+ .await
+ .unwrap();
+ })
+ });
+
+ let mut runtime_state = self.runtime_state.borrow_mut();
+
+ if runtime_state.crons.len() > MAX_CRONS {
+ return Err(type_error("Too many crons"));
+ }
+ if runtime_state.crons.contains_key(&spec.name) {
+ return Err(type_error("Cron with this name already exists"));
+ }
+
+ // Validate schedule expression.
+ spec
+ .cron_schedule
+ .parse::<saffron::Cron>()
+ .map_err(|_| type_error("Invalid cron schedule"))?;
+
+ // Validate backoff_schedule.
+ if let Some(backoff_schedule) = &spec.backoff_schedule {
+ validate_backoff_schedule(backoff_schedule)?;
+ }
+
+ let (next_tx, next_rx) = mpsc::channel::<()>(1);
+ let cron = Cron {
+ spec: spec.clone(),
+ next_tx: next_tx.downgrade(),
+ current_execution_retries: 0,
+ };
+ runtime_state.crons.insert(spec.name.clone(), cron);
+
+ Ok(CronExecutionHandle {
+ name: spec.name.clone(),
+ cron_schedule_tx: self.cron_schedule_tx.get().unwrap().clone(),
+ concurrency_limiter: self.concurrency_limiter.clone(),
+ runtime_state: Rc::downgrade(&self.runtime_state),
+ inner: RefCell::new(Inner {
+ next_rx: Some(next_rx),
+ shutdown_tx: Some(next_tx),
+ permit: None,
+ }),
+ })
+ }
+}
+
+pub struct CronExecutionHandle {
+ name: String,
+ runtime_state: Weak<RefCell<RuntimeState>>,
+ cron_schedule_tx: mpsc::Sender<(String, bool)>,
+ concurrency_limiter: Arc<Semaphore>,
+ inner: RefCell<Inner>,
+}
+
+struct Inner {
+ next_rx: Option<mpsc::Receiver<()>>,
+ shutdown_tx: Option<mpsc::Sender<()>>,
+ permit: Option<OwnedSemaphorePermit>,
+}
+
+#[async_trait(?Send)]
+impl CronHandle for CronExecutionHandle {
+ async fn next(&self, prev_success: bool) -> Result<bool, AnyError> {
+ self.inner.borrow_mut().permit.take();
+
+ if self
+ .cron_schedule_tx
+ .send((self.name.clone(), prev_success))
+ .await
+ .is_err()
+ {
+ return Ok(false);
+ };
+
+ let Some(mut next_rx) = self.inner.borrow_mut().next_rx.take() else {
+ return Ok(false);
+ };
+ if next_rx.recv().await.is_none() {
+ return Ok(false);
+ };
+
+ let permit = self.concurrency_limiter.clone().acquire_owned().await?;
+ let mut inner = self.inner.borrow_mut();
+ inner.next_rx = Some(next_rx);
+ inner.permit = Some(permit);
+ Ok(true)
+ }
+
+ fn close(&self) {
+ if let Some(tx) = self.inner.borrow_mut().shutdown_tx.take() {
+ drop(tx)
+ }
+ if let Some(runtime_state) = self.runtime_state.upgrade() {
+ let mut runtime_state = runtime_state.borrow_mut();
+ runtime_state.crons.remove(&self.name);
+ }
+ }
+}
+
+fn compute_next_deadline(cron_expression: &str) -> Result<u64, AnyError> {
+ let now = crate::time::utc_now();
+
+ if let Ok(test_schedule) = env::var("DENO_CRON_TEST_SCHEDULE_OFFSET") {
+ if let Ok(offset) = test_schedule.parse::<u64>() {
+ return Ok(now.timestamp_millis() as u64 + offset);
+ }
+ }
+
+ let cron = cron_expression
+ .parse::<saffron::Cron>()
+ .map_err(|_| anyhow::anyhow!("invalid cron expression"))?;
+ let Some(next_deadline) = cron.next_after(now) else {
+ return Err(anyhow::anyhow!("invalid cron expression"));
+ };
+ Ok(next_deadline.timestamp_millis() as u64)
+}
+
+fn validate_backoff_schedule(
+ backoff_schedule: &Vec<u32>,
+) -> Result<(), AnyError> {
+ if backoff_schedule.len() > MAX_BACKOFF_COUNT {
+ return Err(type_error("Invalid backoff schedule"));
+ }
+ if backoff_schedule.iter().any(|s| *s > MAX_BACKOFF_MS) {
+ return Err(type_error("Invalid backoff schedule"));
+ }
+ Ok(())
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+
+ #[test]
+ fn test_compute_next_deadline() {
+ let now = crate::time::utc_now().timestamp_millis() as u64;
+ assert!(compute_next_deadline("*/1 * * * *").unwrap() > now);
+ assert!(compute_next_deadline("* * * * *").unwrap() > now);
+ assert!(compute_next_deadline("bogus").is_err());
+ assert!(compute_next_deadline("* * * * * *").is_err());
+ assert!(compute_next_deadline("* * *").is_err());
+ }
+}
diff --git a/ext/cron/time.rs b/ext/cron/time.rs
new file mode 100644
index 000000000..60375818b
--- /dev/null
+++ b/ext/cron/time.rs
@@ -0,0 +1,19 @@
+// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license.
+
+/// Identical to chrono::Utc::now() but without the system "clock"
+/// feature flag.
+///
+/// The "clock" feature flag pulls in the "iana-time-zone" crate
+/// which links to macOS's "CoreFoundation" framework which increases
+/// startup time for the CLI.
+pub fn utc_now() -> chrono::DateTime<chrono::Utc> {
+ let now = std::time::SystemTime::now()
+ .duration_since(std::time::UNIX_EPOCH)
+ .expect("system time before Unix epoch");
+ let naive = chrono::NaiveDateTime::from_timestamp_opt(
+ now.as_secs() as i64,
+ now.subsec_nanos(),
+ )
+ .unwrap();
+ chrono::DateTime::from_naive_utc_and_offset(naive, chrono::Utc)
+}