diff options
author | Igor Zinkovsky <igor@deno.com> | 2023-11-01 11:57:55 -0700 |
---|---|---|
committer | GitHub <noreply@github.com> | 2023-11-01 11:57:55 -0700 |
commit | 01d3e0f317ca180bbf0ac8a17c6651869110e02f (patch) | |
tree | b8e3bb91e87cf396bfa782f6377158f8a170b32b /ext/cron/local.rs | |
parent | 82643857cc77a80f9a819584035ec147a6114553 (diff) |
feat(cron) implement Deno.cron() (#21019)
This PR adds unstable `Deno.cron` API to trigger execution of cron jobs.
* State: All cron state is in memory. Cron jobs are scheduled according
to the cron schedule expression and the current time. No state is
persisted to disk.
* Time zone: Cron expressions specify time in UTC.
* Overlapping executions: not permitted. If the next scheduled execution
time occurs while the same cron job is still executing, the scheduled
execution is skipped.
* Retries: failed jobs are automatically retried until they succeed or
until retry threshold is reached. Retry policy can be optionally
specified using `options.backoffSchedule`.
Diffstat (limited to 'ext/cron/local.rs')
-rw-r--r-- | ext/cron/local.rs | 343 |
1 files changed, 343 insertions, 0 deletions
diff --git a/ext/cron/local.rs b/ext/cron/local.rs new file mode 100644 index 000000000..0b6dcae2e --- /dev/null +++ b/ext/cron/local.rs @@ -0,0 +1,343 @@ +// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license. + +use std::cell::OnceCell; +use std::cell::RefCell; +use std::collections::BTreeMap; +use std::collections::HashMap; +use std::env; +use std::rc::Rc; +use std::rc::Weak; +use std::sync::Arc; + +use async_trait::async_trait; +use deno_core::error::type_error; +use deno_core::error::AnyError; +use deno_core::futures; +use deno_core::futures::FutureExt; +use deno_unsync::spawn; +use deno_unsync::JoinHandle; +use tokio::sync::mpsc; +use tokio::sync::mpsc::WeakSender; +use tokio::sync::OwnedSemaphorePermit; +use tokio::sync::Semaphore; + +use crate::CronHandle; +use crate::CronHandler; +use crate::CronSpec; + +const MAX_CRONS: usize = 100; +const DISPATCH_CONCURRENCY_LIMIT: usize = 50; +const MAX_BACKOFF_MS: u32 = 60 * 60 * 1_000; // 1 hour +const MAX_BACKOFF_COUNT: usize = 5; +const DEFAULT_BACKOFF_SCHEDULE: [u32; 5] = [100, 1_000, 5_000, 30_000, 60_000]; + +pub struct LocalCronHandler { + cron_schedule_tx: OnceCell<mpsc::Sender<(String, bool)>>, + concurrency_limiter: Arc<Semaphore>, + cron_loop_join_handle: OnceCell<JoinHandle<()>>, + runtime_state: Rc<RefCell<RuntimeState>>, +} + +struct RuntimeState { + crons: HashMap<String, Cron>, + scheduled_deadlines: BTreeMap<u64, Vec<String>>, +} + +struct Cron { + spec: CronSpec, + next_tx: mpsc::WeakSender<()>, + current_execution_retries: u32, +} + +impl Cron { + fn backoff_schedule(&self) -> &[u32] { + self + .spec + .backoff_schedule + .as_deref() + .unwrap_or(&DEFAULT_BACKOFF_SCHEDULE) + } +} + +impl Default for LocalCronHandler { + fn default() -> Self { + Self::new() + } +} + +impl LocalCronHandler { + pub fn new() -> Self { + Self { + cron_schedule_tx: OnceCell::new(), + concurrency_limiter: Arc::new(Semaphore::new(DISPATCH_CONCURRENCY_LIMIT)), + cron_loop_join_handle: OnceCell::new(), + runtime_state: Rc::new(RefCell::new(RuntimeState { + crons: HashMap::new(), + scheduled_deadlines: BTreeMap::new(), + })), + } + } + + async fn cron_loop( + runtime_state: Rc<RefCell<RuntimeState>>, + mut cron_schedule_rx: mpsc::Receiver<(String, bool)>, + ) -> Result<(), AnyError> { + loop { + let earliest_deadline = runtime_state + .borrow() + .scheduled_deadlines + .keys() + .next() + .copied(); + + let sleep_fut = if let Some(earliest_deadline) = earliest_deadline { + let now = crate::time::utc_now().timestamp_millis() as u64; + if let Some(delta) = earliest_deadline.checked_sub(now) { + tokio::time::sleep(std::time::Duration::from_millis(delta)).boxed() + } else { + futures::future::ready(()).boxed() + } + } else { + futures::future::pending().boxed() + }; + + let cron_to_schedule = tokio::select! { + _ = sleep_fut => None, + x = cron_schedule_rx.recv() => { + if x.is_none() { + return Ok(()); + }; + x + } + }; + + // Schedule next execution of the cron if needed. + if let Some((name, prev_success)) = cron_to_schedule { + let mut runtime_state = runtime_state.borrow_mut(); + if let Some(cron) = runtime_state.crons.get_mut(&name) { + let backoff_schedule = cron.backoff_schedule(); + let next_deadline = if !prev_success + && cron.current_execution_retries < backoff_schedule.len() as u32 + { + let backoff_ms = + backoff_schedule[cron.current_execution_retries as usize]; + let now = crate::time::utc_now().timestamp_millis() as u64; + cron.current_execution_retries += 1; + now + backoff_ms as u64 + } else { + let next_ts = compute_next_deadline(&cron.spec.cron_schedule)?; + cron.current_execution_retries = 0; + next_ts + }; + runtime_state + .scheduled_deadlines + .entry(next_deadline) + .or_default() + .push(name.to_string()); + } + } + + // Dispatch ready to execute crons. + let crons_to_execute = { + let mut runtime_state = runtime_state.borrow_mut(); + runtime_state.get_ready_crons()? + }; + for (_, tx) in crons_to_execute { + if let Some(tx) = tx.upgrade() { + let _ = tx.send(()).await; + } + } + } + } +} + +impl RuntimeState { + fn get_ready_crons( + &mut self, + ) -> Result<Vec<(String, WeakSender<()>)>, AnyError> { + let now = crate::time::utc_now().timestamp_millis() as u64; + + let ready = { + let to_remove = self + .scheduled_deadlines + .range(..=now) + .map(|(ts, _)| *ts) + .collect::<Vec<_>>(); + to_remove + .iter() + .flat_map(|ts| { + self + .scheduled_deadlines + .remove(ts) + .unwrap() + .iter() + .map(move |name| (*ts, name.clone())) + .collect::<Vec<_>>() + }) + .map(|(_, name)| { + (name.clone(), self.crons.get(&name).unwrap().next_tx.clone()) + }) + .collect::<Vec<_>>() + }; + + Ok(ready) + } +} + +#[async_trait(?Send)] +impl CronHandler for LocalCronHandler { + type EH = CronExecutionHandle; + + fn create(&self, spec: CronSpec) -> Result<Self::EH, AnyError> { + // Ensure that the cron loop is started. + self.cron_loop_join_handle.get_or_init(|| { + let (cron_schedule_tx, cron_schedule_rx) = + mpsc::channel::<(String, bool)>(1); + self.cron_schedule_tx.set(cron_schedule_tx).unwrap(); + let runtime_state = self.runtime_state.clone(); + spawn(async move { + LocalCronHandler::cron_loop(runtime_state, cron_schedule_rx) + .await + .unwrap(); + }) + }); + + let mut runtime_state = self.runtime_state.borrow_mut(); + + if runtime_state.crons.len() > MAX_CRONS { + return Err(type_error("Too many crons")); + } + if runtime_state.crons.contains_key(&spec.name) { + return Err(type_error("Cron with this name already exists")); + } + + // Validate schedule expression. + spec + .cron_schedule + .parse::<saffron::Cron>() + .map_err(|_| type_error("Invalid cron schedule"))?; + + // Validate backoff_schedule. + if let Some(backoff_schedule) = &spec.backoff_schedule { + validate_backoff_schedule(backoff_schedule)?; + } + + let (next_tx, next_rx) = mpsc::channel::<()>(1); + let cron = Cron { + spec: spec.clone(), + next_tx: next_tx.downgrade(), + current_execution_retries: 0, + }; + runtime_state.crons.insert(spec.name.clone(), cron); + + Ok(CronExecutionHandle { + name: spec.name.clone(), + cron_schedule_tx: self.cron_schedule_tx.get().unwrap().clone(), + concurrency_limiter: self.concurrency_limiter.clone(), + runtime_state: Rc::downgrade(&self.runtime_state), + inner: RefCell::new(Inner { + next_rx: Some(next_rx), + shutdown_tx: Some(next_tx), + permit: None, + }), + }) + } +} + +pub struct CronExecutionHandle { + name: String, + runtime_state: Weak<RefCell<RuntimeState>>, + cron_schedule_tx: mpsc::Sender<(String, bool)>, + concurrency_limiter: Arc<Semaphore>, + inner: RefCell<Inner>, +} + +struct Inner { + next_rx: Option<mpsc::Receiver<()>>, + shutdown_tx: Option<mpsc::Sender<()>>, + permit: Option<OwnedSemaphorePermit>, +} + +#[async_trait(?Send)] +impl CronHandle for CronExecutionHandle { + async fn next(&self, prev_success: bool) -> Result<bool, AnyError> { + self.inner.borrow_mut().permit.take(); + + if self + .cron_schedule_tx + .send((self.name.clone(), prev_success)) + .await + .is_err() + { + return Ok(false); + }; + + let Some(mut next_rx) = self.inner.borrow_mut().next_rx.take() else { + return Ok(false); + }; + if next_rx.recv().await.is_none() { + return Ok(false); + }; + + let permit = self.concurrency_limiter.clone().acquire_owned().await?; + let mut inner = self.inner.borrow_mut(); + inner.next_rx = Some(next_rx); + inner.permit = Some(permit); + Ok(true) + } + + fn close(&self) { + if let Some(tx) = self.inner.borrow_mut().shutdown_tx.take() { + drop(tx) + } + if let Some(runtime_state) = self.runtime_state.upgrade() { + let mut runtime_state = runtime_state.borrow_mut(); + runtime_state.crons.remove(&self.name); + } + } +} + +fn compute_next_deadline(cron_expression: &str) -> Result<u64, AnyError> { + let now = crate::time::utc_now(); + + if let Ok(test_schedule) = env::var("DENO_CRON_TEST_SCHEDULE_OFFSET") { + if let Ok(offset) = test_schedule.parse::<u64>() { + return Ok(now.timestamp_millis() as u64 + offset); + } + } + + let cron = cron_expression + .parse::<saffron::Cron>() + .map_err(|_| anyhow::anyhow!("invalid cron expression"))?; + let Some(next_deadline) = cron.next_after(now) else { + return Err(anyhow::anyhow!("invalid cron expression")); + }; + Ok(next_deadline.timestamp_millis() as u64) +} + +fn validate_backoff_schedule( + backoff_schedule: &Vec<u32>, +) -> Result<(), AnyError> { + if backoff_schedule.len() > MAX_BACKOFF_COUNT { + return Err(type_error("Invalid backoff schedule")); + } + if backoff_schedule.iter().any(|s| *s > MAX_BACKOFF_MS) { + return Err(type_error("Invalid backoff schedule")); + } + Ok(()) +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_compute_next_deadline() { + let now = crate::time::utc_now().timestamp_millis() as u64; + assert!(compute_next_deadline("*/1 * * * *").unwrap() > now); + assert!(compute_next_deadline("* * * * *").unwrap() > now); + assert!(compute_next_deadline("bogus").is_err()); + assert!(compute_next_deadline("* * * * * *").is_err()); + assert!(compute_next_deadline("* * *").is_err()); + } +} |