From 905b40133f0753326b857e5d3de33dc488979171 Mon Sep 17 00:00:00 2001 From: Sion Smith Date: Fri, 12 Jun 2026 06:56:23 +0100 Subject: [PATCH] fix: stop re-running restore and one-shot backup jobs after completion MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit A finished Job has status.active=0, which is_job_running() read as "no job running", so every 5-minute requeue created a fresh restore Job — and the completion check ran only after the creation step, so the duplicate was already spawned by the time the status flipped to RestoreComplete. Terminally failed Jobs were re-created forever, and the one-shot KafkaBackup path had the identical defect. - Add jobs::job_state::classify_jobs(): aggregate the full set of Jobs for a CR into Succeeded/InProgress/Failed/NoJobs using Job conditions (Complete/Failed), so pod retries within backoffLimit stay InProgress and a pending Job with no status yet blocks duplicate creation. - Gate Job creation on that classification: restores never create a second Job; one-shot backups only re-run via the manual trigger annotation, which still never stacks onto an active Job. - Watch owned Jobs (.owns) in both controllers so status reflects completion/failure within seconds instead of the next requeue. - Report a terminal RestoreFailed condition for failed restore Jobs instead of silently re-creating them. - Make status patches idempotent (Job-derived timestamps, skip when the condition/lastBackup already reflects the outcome) so the new watch cannot churn lastTransitionTime in a reconcile loop. 
Fixes #29 Co-Authored-By: Claude Fable 5 --- CHANGELOG.md | 9 + Cargo.lock | 2 +- Cargo.toml | 2 +- .../helm/strimzi-backup-operator/Chart.yaml | 4 +- src/controllers/backup.rs | 8 + src/controllers/restore.rs | 8 + src/jobs/job_state.rs | 88 ++++++++ src/jobs/mod.rs | 1 + src/reconcilers/backup.rs | 171 ++++++++------ src/reconcilers/restore.rs | 209 ++++++++++-------- tests/integration/job_state_test.rs | 190 ++++++++++++++++ tests/integration/mod.rs | 1 + 12 files changed, 534 insertions(+), 159 deletions(-) create mode 100644 src/jobs/job_state.rs create mode 100644 tests/integration/job_state_test.rs diff --git a/CHANGELOG.md b/CHANGELOG.md index 37b53e8..ed59f87 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,15 @@ All notable changes to this project will be documented in this file. +## 0.2.9 - 2026-06-12 + +### Fixed + +- Stop re-running `KafkaRestore` jobs after completion. A finished restore Job has `active=0`, which the reconciler read as "no job running" and re-created the Job on every 5-minute requeue; Job creation is now gated on the full set of Jobs for the resource (running, succeeded, or failed), and one-shot `KafkaBackup` runs are gated the same way. Fixes [#29](https://github.com/osodevops/strimzi-backup-operator/issues/29). +- Watch backup/restore Jobs from the controllers so `KafkaBackup`/`KafkaRestore` status reflects Job completion or failure within seconds instead of after the next periodic requeue. +- Treat a terminally failed restore Job (backoffLimit exhausted) as terminal: report a `RestoreFailed` condition instead of silently re-creating the Job every requeue. Pod-level retries remain owned by the Job's `backoffLimit`. +- Make status patches idempotent so repeated reconciles no longer rewrite `lastTransitionTime`/`completionTime` with the current wall clock on every pass. 
+ ## 0.2.8 - 2026-06-11 ### Added diff --git a/Cargo.lock b/Cargo.lock index 5a1daf6..dfdff64 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1099,7 +1099,7 @@ dependencies = [ [[package]] name = "kafka-backup-operator" -version = "0.2.8" +version = "0.2.9" dependencies = [ "anyhow", "assert-json-diff", diff --git a/Cargo.toml b/Cargo.toml index f96a487..2a21846 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "kafka-backup-operator" -version = "0.2.8" +version = "0.2.9" edition = "2021" rust-version = "1.88" license = "Apache-2.0" diff --git a/deploy/helm/strimzi-backup-operator/Chart.yaml b/deploy/helm/strimzi-backup-operator/Chart.yaml index 9815ea8..5c159c5 100644 --- a/deploy/helm/strimzi-backup-operator/Chart.yaml +++ b/deploy/helm/strimzi-backup-operator/Chart.yaml @@ -2,8 +2,8 @@ apiVersion: v2 name: strimzi-backup-operator description: Kubernetes operator for Kafka backup and restore, integrated with Strimzi type: application -version: 0.2.8 -appVersion: "0.2.8" +version: 0.2.9 +appVersion: "0.2.9" home: https://github.com/osodevops/strimzi-backup-operator sources: - https://github.com/osodevops/strimzi-backup-operator diff --git a/src/controllers/backup.rs b/src/controllers/backup.rs index 0522b20..e5a1820 100644 --- a/src/controllers/backup.rs +++ b/src/controllers/backup.rs @@ -1,6 +1,7 @@ use std::sync::Arc; use futures::StreamExt; +use k8s_openapi::api::batch::v1::Job; use kube::{ runtime::{ controller::{Action, Controller}, @@ -46,6 +47,7 @@ fn error_policy( pub async fn run(client: Client, metrics: Arc) { let backups = Api::::all(client.clone()); + let jobs = Api::::all(client.clone()); let context = Arc::new(Context { client: client.clone(), @@ -55,6 +57,12 @@ pub async fn run(client: Client, metrics: Arc) { info!("Starting KafkaBackup controller"); Controller::new(backups, Config::default().any_semantic()) + // Watch owned Jobs so completion/failure updates the KafkaBackup + // status immediately instead of waiting for the 
periodic requeue. + .owns( + jobs, + Config::default().labels("kafkabackup.com/type=backup"), + ) .shutdown_on_signal() .run(reconcile, error_policy, context) .for_each(|res| async move { diff --git a/src/controllers/restore.rs b/src/controllers/restore.rs index a365e19..c3ff771 100644 --- a/src/controllers/restore.rs +++ b/src/controllers/restore.rs @@ -1,6 +1,7 @@ use std::sync::Arc; use futures::StreamExt; +use k8s_openapi::api::batch::v1::Job; use kube::{ runtime::{ controller::{Action, Controller}, @@ -46,6 +47,7 @@ fn error_policy( pub async fn run(client: Client, metrics: Arc) { let restores = Api::::all(client.clone()); + let jobs = Api::::all(client.clone()); let context = Arc::new(Context { client: client.clone(), @@ -55,6 +57,12 @@ pub async fn run(client: Client, metrics: Arc) { info!("Starting KafkaRestore controller"); Controller::new(restores, Config::default().any_semantic()) + // Watch owned Jobs so completion/failure updates the KafkaRestore + // status immediately instead of waiting for the periodic requeue. + .owns( + jobs, + Config::default().labels("kafkabackup.com/type=restore"), + ) .shutdown_on_signal() .run(reconcile, error_policy, context) .for_each(|res| async move { diff --git a/src/jobs/job_state.rs b/src/jobs/job_state.rs new file mode 100644 index 0000000..ae10020 --- /dev/null +++ b/src/jobs/job_state.rs @@ -0,0 +1,88 @@ +use k8s_openapi::api::batch::v1::Job; + +/// Aggregate state of the Jobs spawned for a backup/restore CR. +/// +/// Derived from the full list of Jobs matching the CR's label selector so +/// the reconciler can decide whether another Job may be created. A completed +/// or failed Job must block re-creation just like a running one: backup and +/// restore runs are one-shot, and pod-level retries are owned by the Job's +/// `backoffLimit`, not the reconciler. +#[derive(Debug, Clone, PartialEq, Eq)] +pub enum JobsState { + /// No Jobs exist for this resource. 
+ NoJobs, + /// At least one Job is running or has not reached a terminal state yet. + InProgress, + /// A Job completed successfully. + Succeeded { job_name: String }, + /// A Job failed terminally (exhausted its backoffLimit). + Failed { job_name: String }, +} + +/// Classify the Jobs belonging to one CR into an aggregate state. +/// +/// Precedence: Succeeded > InProgress > Failed. A succeeded Job means the +/// operation completed even if a stray duplicate is still running; a failed +/// Job with a newer active run (manual re-trigger) reports running. +pub fn classify_jobs(jobs: &[Job]) -> JobsState { + if let Some(job) = jobs.iter().find(|j| job_succeeded(j)) { + return JobsState::Succeeded { + job_name: job.metadata.name.clone().unwrap_or_default(), + }; + } + // Any non-terminal Job counts as in progress, including a just-created + // Job whose status the Job controller has not populated yet. + if jobs.iter().any(|j| !job_failed(j)) { + return JobsState::InProgress; + } + if let Some(job) = jobs.iter().find(|j| job_failed(j)) { + return JobsState::Failed { + job_name: job.metadata.name.clone().unwrap_or_default(), + }; + } + JobsState::NoJobs +} + +/// A Job succeeded if it reports a `Complete=True` condition or any +/// succeeded pods. +pub fn job_succeeded(job: &Job) -> bool { + let Some(status) = job.status.as_ref() else { + return false; + }; + status.succeeded.unwrap_or(0) > 0 || has_condition(job, "Complete") +} + +/// A Job failed terminally only when it reports a `Failed=True` condition +/// (backoffLimit exhausted or deadline exceeded). `status.failed > 0` alone +/// counts pod retries still owned by the Job controller and is not terminal. 
+pub fn job_failed(job: &Job) -> bool { + has_condition(job, "Failed") +} + +fn has_condition(job: &Job, condition_type: &str) -> bool { + job.status + .as_ref() + .and_then(|s| s.conditions.as_ref()) + .is_some_and(|conds| { + conds + .iter() + .any(|c| c.type_ == condition_type && c.status == "True") + }) +} + +/// Whether a restore Job may be created. Restores are strictly one-shot: +/// any existing Job — running, succeeded, or failed — suppresses creation. +pub fn should_create_restore_job(state: &JobsState) -> bool { + matches!(state, JobsState::NoJobs) +} + +/// Whether a one-shot backup Job may be created. A manual trigger +/// (`kafkabackup.com/trigger=now`) requests a fresh run, so it bypasses the +/// "a terminal Job already exists" gate — but never stacks onto an active Job. +pub fn should_create_backup_job(state: &JobsState, manually_triggered: bool) -> bool { + match state { + JobsState::InProgress => false, + JobsState::NoJobs => true, + JobsState::Succeeded { .. } | JobsState::Failed { .. 
} => manually_triggered, + } +} diff --git a/src/jobs/mod.rs b/src/jobs/mod.rs index e9feaf6..60bf48e 100644 --- a/src/jobs/mod.rs +++ b/src/jobs/mod.rs @@ -1,4 +1,5 @@ pub mod backup_job; pub mod cronjob; +pub mod job_state; pub mod restore_job; pub mod templates; diff --git a/src/reconcilers/backup.rs b/src/reconcilers/backup.rs index 64c5179..ac479ce 100644 --- a/src/reconcilers/backup.rs +++ b/src/reconcilers/backup.rs @@ -16,6 +16,7 @@ use crate::crd::{KafkaBackup, KafkaBackupStatus}; use crate::error::{Error, Result}; use crate::jobs::backup_job::build_backup_job; use crate::jobs::cronjob::build_backup_cronjob; +use crate::jobs::job_state::{classify_jobs, job_failed, job_succeeded, should_create_backup_job}; use crate::metrics::prometheus::MetricsState; use crate::reconcilers::{ job_service_account_name, FINALIZER, TRIGGER_ANNOTATION, TRIGGER_VALUE_NOW, @@ -131,22 +132,27 @@ pub async fn reconcile_backup( .and_then(|a| a.get(TRIGGER_ANNOTATION)) .is_some_and(|v| v == TRIGGER_VALUE_NOW); - // Step 7: Create one-shot Job (if no schedule, or if manually triggered) + // Step 7: Create one-shot Job (if no schedule, or if manually triggered). + // A one-shot run must only be created when no Job exists for this CR at + // all — a succeeded or failed Job has `active=0`, and treating it as "no + // job running" re-ran the backup on every requeue (issue #29). A manual + // trigger explicitly requests a fresh run, but never stacks onto an + // active Job. 
if backup.spec.schedule.is_none() || triggered { - let job_name = format!("{name}-{}", Utc::now().format("%Y%m%d-%H%M%S")); - let job = build_backup_job( - &backup, - &job_name, - &config_map_name, - &kafka_cluster, - &resolved_auth, - job_service_account.as_deref(), - )?; - let jobs_api: Api = Api::namespaced(client.clone(), &namespace); + let jobs = jobs_api.list(&backup_jobs_selector(&name)).await?; + + if should_create_backup_job(&classify_jobs(&jobs.items), triggered) { + let job_name = format!("{name}-{}", Utc::now().format("%Y%m%d-%H%M%S")); + let job = build_backup_job( + &backup, + &job_name, + &config_map_name, + &kafka_cluster, + &resolved_auth, + job_service_account.as_deref(), + )?; - // Check if a job is already running - if !is_job_running(&jobs_api, &name).await? { jobs_api .create(&PostParams::default(), &job) .await @@ -155,7 +161,7 @@ pub async fn reconcile_backup( info!(%job_name, "Created backup job"); update_status_running(&backup_api, &name, generation).await?; } else { - debug!(%name, "Backup job already running, skipping"); + debug!(%name, "Backup job already exists, skipping creation"); } // Remove trigger annotation if present @@ -303,15 +309,10 @@ async fn create_or_update_config_map( Ok(()) } -async fn is_job_running(jobs_api: &Api, backup_name: &str) -> Result { - let lp = kube::api::ListParams::default().labels(&format!( +fn backup_jobs_selector(backup_name: &str) -> kube::api::ListParams { + kube::api::ListParams::default().labels(&format!( "kafkabackup.com/backup={backup_name},kafkabackup.com/type=backup" - )); - let jobs = jobs_api.list(&lp).await?; - let running = jobs - .iter() - .any(|j| j.status.as_ref().is_some_and(|s| s.active.unwrap_or(0) > 0)); - Ok(running) + )) } async fn apply_retention_policy( @@ -367,10 +368,7 @@ async fn active_backup_ids( backup: &KafkaBackup, ) -> Result> { let jobs_api: Api = Api::namespaced(client.clone(), namespace); - let lp = kube::api::ListParams::default().labels(&format!( - 
"kafkabackup.com/backup={backup_name},kafkabackup.com/type=backup" - )); - let jobs = jobs_api.list(&lp).await?; + let jobs = jobs_api.list(&backup_jobs_selector(backup_name)).await?; let mut active = BTreeSet::new(); for job in jobs { @@ -402,47 +400,75 @@ async fn check_job_completion( let name = backup.name_any(); let namespace = backup.namespace().unwrap_or_default(); let jobs_api: Api = Api::namespaced(client.clone(), &namespace); + let jobs = jobs_api.list(&backup_jobs_selector(&name)).await?; - let lp = kube::api::ListParams::default().labels(&format!( - "kafkabackup.com/backup={name},kafkabackup.com/type=backup" - )); - let jobs = jobs_api.list(&lp).await?; - - for job in &jobs { - let job_name = job.metadata.name.as_deref().unwrap_or(""); - if let Some(status) = &job.status { - if status.succeeded.unwrap_or(0) > 0 { - info!(%job_name, "Backup job completed successfully"); - let backup_id = job_name.to_string(); - let now = Utc::now(); - - let history_entry = BackupHistoryEntry { - id: backup_id.clone(), - status: BackupStatus::Completed, - start_time: job - .status - .as_ref() - .and_then(|s| s.start_time.as_ref()) - .map(|t| t.0) - .unwrap_or(now), - completion_time: Some(now), - size_bytes: None, - topics_backed_up: None, - partitions_backed_up: None, - }; - - update_status_completed(backup_api, &name, generation, &history_entry).await?; - } else if status.failed.unwrap_or(0) > 0 { - error!(%job_name, "Backup job failed"); - update_status_error( - backup_api, - &name, - generation, - &Error::JobCreationFailed(format!("Job {job_name} failed")), - ) - .await?; - } + // Record only the most recent terminal Job, and only once — re-patching + // an already-recorded outcome churns the status (fresh lastTransitionTime + // and completionTime) and retriggers the watch on every reconcile. 
+ let latest_terminal = jobs + .items + .iter() + .filter(|j| job_succeeded(j) || job_failed(j)) + .max_by_key(|j| { + j.status + .as_ref() + .and_then(|s| s.start_time.as_ref()) + .map(|t| t.0) + }); + let Some(job) = latest_terminal else { + return Ok(()); + }; + let job_name = job.metadata.name.as_deref().unwrap_or(""); + + if job_succeeded(job) { + let already_recorded = backup + .status + .as_ref() + .and_then(|s| s.last_backup.as_ref()) + .is_some_and(|lb| lb.id == job_name); + if already_recorded { + return Ok(()); } + + info!(%job_name, "Backup job completed successfully"); + let now = Utc::now(); + let job_status = job.status.as_ref(); + let history_entry = BackupHistoryEntry { + id: job_name.to_string(), + status: BackupStatus::Completed, + start_time: job_status + .and_then(|s| s.start_time.as_ref()) + .map(|t| t.0) + .unwrap_or(now), + completion_time: Some( + job_status + .and_then(|s| s.completion_time.as_ref()) + .map(|t| t.0) + .unwrap_or(now), + ), + size_bytes: None, + topics_backed_up: None, + partitions_backed_up: None, + }; + + update_status_completed(backup_api, &name, generation, &history_entry).await?; + } else { + let message = format!("Backup job {job_name} failed"); + let already_recorded = backup + .status + .as_ref() + .map(|s| s.conditions.as_slice()) + .and_then(|c| find_condition(c, CONDITION_TYPE_ERROR)) + .is_some_and(|c| c.message.as_deref() == Some(message.as_str())); + if already_recorded { + return Ok(()); + } + + error!(%job_name, "Backup job failed"); + let mut status = current_backup_status(backup_api, &name).await?; + status.conditions = error_conditions(REASON_BACKUP_FAILED, &message); + status.observed_generation = Some(generation); + patch_status(backup_api, &name, &status).await?; } Ok(()) @@ -462,6 +488,13 @@ async fn update_status_scheduled( next_backup: &str, ) -> Result<()> { let mut status = current_backup_status(api, name).await?; + let already_current = find_condition(&status.conditions, CONDITION_TYPE_READY) + 
.is_some_and(|c| c.reason.as_deref() == Some(REASON_BACKUP_SCHEDULED)) + && status.next_scheduled_backup.as_deref() == Some(next_backup) + && status.observed_generation == Some(generation); + if already_current { + return Ok(()); + } status.conditions = vec![ready( REASON_BACKUP_SCHEDULED, &format!("Next backup scheduled: {next_backup}"), @@ -476,6 +509,14 @@ async fn update_status_suspended( name: &str, generation: i64, ) -> Result<()> { + let current = current_backup_status(api, name).await?; + let already_current = find_condition(&current.conditions, CONDITION_TYPE_READY) + .is_some_and(|c| c.reason.as_deref() == Some(REASON_BACKUP_SUSPENDED)) + && current.next_scheduled_backup.is_none() + && current.observed_generation == Some(generation); + if already_current { + return Ok(()); + } // Manual merge patch: `nextScheduledBackup` must be an explicit null to be // cleared, but `KafkaBackupStatus` skips `None` fields when serializing. let condition = ready(REASON_BACKUP_SUSPENDED, "Backup schedule is suspended"); diff --git a/src/reconcilers/restore.rs b/src/reconcilers/restore.rs index 08e3065..5baa322 100644 --- a/src/reconcilers/restore.rs +++ b/src/reconcilers/restore.rs @@ -10,9 +10,10 @@ use kube::{ use tracing::{debug, error, info, warn}; use crate::adapters::restore_config::build_restore_config_yaml; -use crate::crd::common::{RestoreInfo, RestoreStatus}; +use crate::crd::common::{Condition, RestoreInfo, RestoreStatus}; use crate::crd::{KafkaBackup, KafkaRestore, KafkaRestoreStatus}; use crate::error::{Error, Result}; +use crate::jobs::job_state::{classify_jobs, JobsState}; use crate::jobs::restore_job::build_restore_job; use crate::metrics::prometheus::MetricsState; use crate::reconcilers::{job_service_account_name, FINALIZER}; @@ -57,6 +58,75 @@ pub async fn reconcile_restore( } } + // Decide from the full set of Jobs for this restore. 
A Job in any state — + // running, pending, succeeded, or failed — must suppress creating another + // one: restores are one-shot, and pod retries belong to the Job's + // backoffLimit (issue #29: a completed Job has `active=0`, which used to + // read as "no job running" and re-ran the restore on every requeue). + let jobs_api: Api = Api::namespaced(client.clone(), &namespace); + let lp = kube::api::ListParams::default().labels(&format!( + "kafkabackup.com/restore={name},kafkabackup.com/type=restore" + )); + let jobs = jobs_api.list(&lp).await?; + + match classify_jobs(&jobs.items) { + JobsState::Succeeded { job_name } => { + info!(%job_name, "Restore job completed successfully"); + let job_status = jobs + .items + .iter() + .find(|j| j.metadata.name.as_deref() == Some(job_name.as_str())) + .and_then(|j| j.status.as_ref()); + let now = Utc::now(); + let restore_info = RestoreInfo { + start_time: job_status + .and_then(|s| s.start_time.as_ref()) + .map(|t| t.0) + .unwrap_or(now), + completion_time: Some( + job_status + .and_then(|s| s.completion_time.as_ref()) + .map(|t| t.0) + .unwrap_or(now), + ), + status: RestoreStatus::Completed, + restored_topics: None, + restored_partitions: None, + restored_bytes: None, + point_in_time_target: None, + actual_point_in_time: None, + }; + update_status_completed(&restore_api, &name, generation, &restore_info).await?; + return Ok(()); + } + JobsState::Failed { job_name } => { + // Terminal: do not re-create the Job. Patch once — repeating the + // patch would churn lastTransitionTime and retrigger the watch. 
+ if !has_condition_reason( + restore.status.as_ref(), + CONDITION_TYPE_ERROR, + REASON_RESTORE_FAILED, + ) { + error!(%job_name, "Restore job failed"); + update_status_failed(&restore_api, &name, generation, &job_name).await?; + } + return Ok(()); + } + JobsState::InProgress => { + if !has_condition_reason( + restore.status.as_ref(), + CONDITION_TYPE_READY, + REASON_RESTORE_RUNNING, + ) { + update_status_running(&restore_api, &name, generation).await?; + } + return Ok(()); + } + JobsState::NoJobs => {} + } + + // No Job exists yet — resolve dependencies and create one. + // Step 1: Resolve source backup CR let backup_api: Api = Api::namespaced(client.clone(), &namespace); let source_backup = backup_api @@ -114,36 +184,42 @@ pub async fn reconcile_restore( ) .await?; - // Step 6: Create restore Job if not already running - let jobs_api: Api = Api::namespaced(client.clone(), &namespace); - if !is_job_running(&jobs_api, &name).await? { - let job_name = format!("{name}-{}", Utc::now().format("%Y%m%d-%H%M%S")); - let job_service_account = job_service_account_name(); - let job = build_restore_job( - &restore, - &job_name, - &config_map_name, - &kafka_cluster, - &resolved_auth, - &source_backup, - job_service_account.as_deref(), - )?; - - jobs_api - .create(&PostParams::default(), &job) - .await - .map_err(|e| Error::JobCreationFailed(e.to_string()))?; - - info!(%job_name, "Created restore job"); - update_status_running(&restore_api, &name, generation).await?; - } + // Step 6: Create the restore Job + let job_name = format!("{name}-{}", Utc::now().format("%Y%m%d-%H%M%S")); + let job_service_account = job_service_account_name(); + let job = build_restore_job( + &restore, + &job_name, + &config_map_name, + &kafka_cluster, + &resolved_auth, + &source_backup, + job_service_account.as_deref(), + )?; + + jobs_api + .create(&PostParams::default(), &job) + .await + .map_err(|e| Error::JobCreationFailed(e.to_string()))?; - // Step 7: Check job completion - 
check_job_completion(&client, &restore_api, &restore, generation).await?; + info!(%job_name, "Created restore job"); + update_status_running(&restore_api, &name, generation).await?; Ok(()) } +/// Whether the current status has a condition of `condition_type` with the +/// given reason. Used to avoid re-patching an identical status, which would +/// churn `lastTransitionTime` and retrigger the watch. +fn has_condition_reason( + status: Option<&KafkaRestoreStatus>, + condition_type: &str, + reason: &str, +) -> bool { + let conditions: &[Condition] = status.map(|s| s.conditions.as_slice()).unwrap_or(&[]); + find_condition(conditions, condition_type).is_some_and(|c| c.reason.as_deref() == Some(reason)) +} + async fn handle_cleanup(restore: &KafkaRestore, client: &Client, namespace: &str) -> Result<()> { let name = restore.name_any(); info!(%name, "Cleaning up KafkaRestore resources"); @@ -251,70 +327,6 @@ async fn create_or_update_config_map( Ok(()) } -async fn is_job_running(jobs_api: &Api, restore_name: &str) -> Result { - let lp = kube::api::ListParams::default().labels(&format!( - "kafkabackup.com/restore={restore_name},kafkabackup.com/type=restore" - )); - let jobs = jobs_api.list(&lp).await?; - let running = jobs - .iter() - .any(|j| j.status.as_ref().is_some_and(|s| s.active.unwrap_or(0) > 0)); - Ok(running) -} - -async fn check_job_completion( - client: &Client, - restore_api: &Api, - restore: &KafkaRestore, - generation: i64, -) -> Result<()> { - let name = restore.name_any(); - let namespace = restore.namespace().unwrap_or_default(); - let jobs_api: Api = Api::namespaced(client.clone(), &namespace); - - let lp = kube::api::ListParams::default().labels(&format!( - "kafkabackup.com/restore={name},kafkabackup.com/type=restore" - )); - let jobs = jobs_api.list(&lp).await?; - - for job in &jobs { - let job_name = job.metadata.name.as_deref().unwrap_or(""); - if let Some(status) = &job.status { - if status.succeeded.unwrap_or(0) > 0 { - info!(%job_name, "Restore 
job completed successfully"); - let now = Utc::now(); - let restore_info = RestoreInfo { - start_time: job - .status - .as_ref() - .and_then(|s| s.start_time.as_ref()) - .map(|t| t.0) - .unwrap_or(now), - completion_time: Some(now), - status: RestoreStatus::Completed, - restored_topics: None, - restored_partitions: None, - restored_bytes: None, - point_in_time_target: None, - actual_point_in_time: None, - }; - update_status_completed(restore_api, &name, generation, &restore_info).await?; - } else if status.failed.unwrap_or(0) > 0 { - error!(%job_name, "Restore job failed"); - update_status_error( - restore_api, - &name, - generation, - &Error::JobCreationFailed(format!("Restore job {job_name} failed")), - ) - .await?; - } - } - } - - Ok(()) -} - async fn update_status_running(api: &Api, name: &str, generation: i64) -> Result<()> { let status = KafkaRestoreStatus { conditions: vec![not_ready(REASON_RESTORE_RUNNING, "Restore job is running")], @@ -346,6 +358,23 @@ async fn update_status_completed( patch_status(api, name, &status).await } +async fn update_status_failed( + api: &Api, + name: &str, + generation: i64, + job_name: &str, +) -> Result<()> { + let status = KafkaRestoreStatus { + conditions: error_conditions( + REASON_RESTORE_FAILED, + &format!("Restore job {job_name} failed"), + ), + observed_generation: Some(generation), + ..Default::default() + }; + patch_status(api, name, &status).await +} + async fn update_status_error( api: &Api, name: &str, diff --git a/tests/integration/job_state_test.rs b/tests/integration/job_state_test.rs new file mode 100644 index 0000000..2d46795 --- /dev/null +++ b/tests/integration/job_state_test.rs @@ -0,0 +1,190 @@ +use k8s_openapi::api::batch::v1::{Job, JobCondition, JobStatus}; +use k8s_openapi::apimachinery::pkg::apis::meta::v1::ObjectMeta; +use kafka_backup_operator::jobs::job_state::{ + classify_jobs, should_create_backup_job, should_create_restore_job, JobsState, +}; + +fn job(name: &str, status: Option) -> Job { + Job 
{ + metadata: ObjectMeta { + name: Some(name.to_string()), + ..Default::default() + }, + spec: None, + status, + } +} + +fn condition(type_: &str, status: &str) -> JobCondition { + JobCondition { + type_: type_.to_string(), + status: status.to_string(), + ..Default::default() + } +} + +fn active_job(name: &str) -> Job { + job( + name, + Some(JobStatus { + active: Some(1), + ..Default::default() + }), + ) +} + +fn succeeded_job(name: &str) -> Job { + job( + name, + Some(JobStatus { + succeeded: Some(1), + conditions: Some(vec![condition("Complete", "True")]), + ..Default::default() + }), + ) +} + +fn failed_job(name: &str) -> Job { + job( + name, + Some(JobStatus { + failed: Some(4), + conditions: Some(vec![condition("Failed", "True")]), + ..Default::default() + }), + ) +} + +#[test] +fn test_no_jobs_allows_creation() { + let state = classify_jobs(&[]); + assert_eq!(state, JobsState::NoJobs); + assert!(should_create_restore_job(&state)); + assert!(should_create_backup_job(&state, false)); +} + +#[test] +fn test_active_job_blocks_creation() { + let state = classify_jobs(&[active_job("r-1")]); + assert_eq!(state, JobsState::InProgress); + assert!(!should_create_restore_job(&state)); + assert!(!should_create_backup_job(&state, false)); +} + +/// A Job that was just created has no meaningful status yet (the Job +/// controller has not set `active`). It must still count as in progress — +/// otherwise a reconcile racing Job-controller status propagation creates a +/// duplicate. 
+#[test] +fn test_pending_job_without_status_blocks_creation() { + let state = classify_jobs(&[job("r-1", None)]); + assert_eq!(state, JobsState::InProgress); + assert!(!should_create_restore_job(&state)); +} + +#[test] +fn test_pending_job_with_empty_status_blocks_creation() { + let state = classify_jobs(&[job("r-1", Some(JobStatus::default()))]); + assert_eq!(state, JobsState::InProgress); + assert!(!should_create_restore_job(&state)); +} + +/// Issue #29: a completed Job has `active=0`, which the operator used to read +/// as "no job running" — re-creating the restore Job on every requeue. +#[test] +fn test_succeeded_job_blocks_creation() { + let state = classify_jobs(&[succeeded_job("r-1")]); + assert_eq!( + state, + JobsState::Succeeded { + job_name: "r-1".to_string() + } + ); + assert!(!should_create_restore_job(&state)); + assert!(!should_create_backup_job(&state, false)); +} + +/// `succeeded > 0` alone (no conditions yet) must already count as success. +#[test] +fn test_succeeded_count_without_conditions_blocks_creation() { + let state = classify_jobs(&[job( + "r-1", + Some(JobStatus { + succeeded: Some(1), + ..Default::default() + }), + )]); + assert_eq!( + state, + JobsState::Succeeded { + job_name: "r-1".to_string() + } + ); + assert!(!should_create_restore_job(&state)); +} + +/// A terminally failed restore must not be silently re-run every requeue; +/// pod-level retries are owned by the Job's backoffLimit. +#[test] +fn test_failed_job_blocks_creation() { + let state = classify_jobs(&[failed_job("r-1")]); + assert_eq!( + state, + JobsState::Failed { + job_name: "r-1".to_string() + } + ); + assert!(!should_create_restore_job(&state)); + assert!(!should_create_backup_job(&state, false)); +} + +/// `failed > 0` while the Job is still retrying within its backoffLimit (no +/// `Failed` condition) is not terminal. 
+#[test] +fn test_retrying_job_is_in_progress_not_failed() { + let state = classify_jobs(&[job( + "r-1", + Some(JobStatus { + failed: Some(2), + active: Some(1), + ..Default::default() + }), + )]); + assert_eq!(state, JobsState::InProgress); +} + +/// A stray duplicate may still be running next to an already-succeeded Job +/// (state left behind by pre-fix operator versions). Success wins: the +/// restore is complete. +#[test] +fn test_succeeded_takes_precedence_over_active() { + let state = classify_jobs(&[succeeded_job("r-1"), active_job("r-2")]); + assert_eq!( + state, + JobsState::Succeeded { + job_name: "r-1".to_string() + } + ); + assert!(!should_create_restore_job(&state)); +} + +/// A failed Job plus a newer active run (manual re-trigger) reports running. +#[test] +fn test_active_takes_precedence_over_failed() { + let state = classify_jobs(&[failed_job("b-1"), active_job("b-2")]); + assert_eq!(state, JobsState::InProgress); +} + +/// Manual trigger annotation requests a fresh backup run after a terminal +/// Job, but never stacks onto an active one. +#[test] +fn test_backup_trigger_allows_rerun_after_terminal_states() { + let succeeded = classify_jobs(&[succeeded_job("b-1")]); + assert!(should_create_backup_job(&succeeded, true)); + + let failed = classify_jobs(&[failed_job("b-1")]); + assert!(should_create_backup_job(&failed, true)); + + let in_progress = classify_jobs(&[active_job("b-1")]); + assert!(!should_create_backup_job(&in_progress, true)); +} diff --git a/tests/integration/mod.rs b/tests/integration/mod.rs index 5f25369..a084521 100644 --- a/tests/integration/mod.rs +++ b/tests/integration/mod.rs @@ -1,3 +1,4 @@ mod backup_test; +mod job_state_test; mod reconcile_backup_test; mod restore_test;