Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,15 @@

All notable changes to this project will be documented in this file.

## 0.2.9 - 2026-06-12

### Fixed

- Stop re-running `KafkaRestore` jobs after completion. A finished restore Job has `active=0`, which the reconciler read as "no job running" and re-created the Job on every 5-minute requeue; Job creation is now gated on the full set of Jobs for the resource (running, succeeded, or failed), and one-shot `KafkaBackup` runs are gated the same way. Fixes [#29](https://github.com/osodevops/strimzi-backup-operator/issues/29).
- Watch backup/restore Jobs from the controllers so `KafkaBackup`/`KafkaRestore` status reflects Job completion or failure within seconds instead of after the next periodic requeue.
- Treat a terminally failed restore Job (backoffLimit exhausted) as terminal: report a `RestoreFailed` condition instead of silently re-creating the Job every requeue. Pod-level retries remain owned by the Job's `backoffLimit`.
- Make status patches idempotent so repeated reconciles no longer rewrite `lastTransitionTime`/`completionTime` with the current wall clock on every pass.

## 0.2.8 - 2026-06-11

### Added
Expand Down
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "kafka-backup-operator"
version = "0.2.8"
version = "0.2.9"
edition = "2021"
rust-version = "1.88"
license = "Apache-2.0"
Expand Down
4 changes: 2 additions & 2 deletions deploy/helm/strimzi-backup-operator/Chart.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@ apiVersion: v2
name: strimzi-backup-operator
description: Kubernetes operator for Kafka backup and restore, integrated with Strimzi
type: application
version: 0.2.8
appVersion: "0.2.8"
version: 0.2.9
appVersion: "0.2.9"
home: https://github.com/osodevops/strimzi-backup-operator
sources:
- https://github.com/osodevops/strimzi-backup-operator
Expand Down
8 changes: 8 additions & 0 deletions src/controllers/backup.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use std::sync::Arc;

use futures::StreamExt;
use k8s_openapi::api::batch::v1::Job;
use kube::{
runtime::{
controller::{Action, Controller},
Expand Down Expand Up @@ -46,6 +47,7 @@ fn error_policy(

pub async fn run(client: Client, metrics: Arc<MetricsState>) {
let backups = Api::<KafkaBackup>::all(client.clone());
let jobs = Api::<Job>::all(client.clone());

let context = Arc::new(Context {
client: client.clone(),
Expand All @@ -55,6 +57,12 @@ pub async fn run(client: Client, metrics: Arc<MetricsState>) {
info!("Starting KafkaBackup controller");

Controller::new(backups, Config::default().any_semantic())
// Watch owned Jobs so completion/failure updates the KafkaBackup
// status immediately instead of waiting for the periodic requeue.
.owns(
jobs,
Config::default().labels("kafkabackup.com/type=backup"),
)
.shutdown_on_signal()
.run(reconcile, error_policy, context)
.for_each(|res| async move {
Expand Down
8 changes: 8 additions & 0 deletions src/controllers/restore.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use std::sync::Arc;

use futures::StreamExt;
use k8s_openapi::api::batch::v1::Job;
use kube::{
runtime::{
controller::{Action, Controller},
Expand Down Expand Up @@ -46,6 +47,7 @@ fn error_policy(

pub async fn run(client: Client, metrics: Arc<MetricsState>) {
let restores = Api::<KafkaRestore>::all(client.clone());
let jobs = Api::<Job>::all(client.clone());

let context = Arc::new(Context {
client: client.clone(),
Expand All @@ -55,6 +57,12 @@ pub async fn run(client: Client, metrics: Arc<MetricsState>) {
info!("Starting KafkaRestore controller");

Controller::new(restores, Config::default().any_semantic())
// Watch owned Jobs so completion/failure updates the KafkaRestore
// status immediately instead of waiting for the periodic requeue.
.owns(
jobs,
Config::default().labels("kafkabackup.com/type=restore"),
)
.shutdown_on_signal()
.run(reconcile, error_policy, context)
.for_each(|res| async move {
Expand Down
88 changes: 88 additions & 0 deletions src/jobs/job_state.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
use k8s_openapi::api::batch::v1::Job;

/// Aggregate state of the Jobs spawned for a backup/restore CR.
///
/// Derived from the full list of Jobs matching the CR's label selector so
/// the reconciler can decide whether another Job may be created. A completed
/// or failed Job must block re-creation just like a running one: backup and
/// restore runs are one-shot, and pod-level retries are owned by the Job's
/// `backoffLimit`, not the reconciler.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum JobsState {
/// No Jobs exist for this resource.
NoJobs,
/// At least one Job is running or has not reached a terminal state yet.
InProgress,
/// A Job completed successfully.
Succeeded { job_name: String },
/// A Job failed terminally (exhausted its backoffLimit).
Failed { job_name: String },
}

/// Classify the Jobs belonging to one CR into an aggregate state.
///
/// Precedence: Succeeded > InProgress > Failed. A succeeded Job means the
/// operation completed even if a stray duplicate is still running; a failed
/// Job with a newer active run (manual re-trigger) reports running.
pub fn classify_jobs(jobs: &[Job]) -> JobsState {
if let Some(job) = jobs.iter().find(|j| job_succeeded(j)) {
return JobsState::Succeeded {
job_name: job.metadata.name.clone().unwrap_or_default(),
};
}
// Any non-terminal Job counts as in progress, including a just-created
// Job whose status the Job controller has not populated yet.
if jobs.iter().any(|j| !job_failed(j)) {
return JobsState::InProgress;
}
if let Some(job) = jobs.iter().find(|j| job_failed(j)) {
return JobsState::Failed {
job_name: job.metadata.name.clone().unwrap_or_default(),
};
}
JobsState::NoJobs
}

/// A Job succeeded if it reports a `Complete=True` condition or any
/// succeeded pods.
pub fn job_succeeded(job: &Job) -> bool {
let Some(status) = job.status.as_ref() else {
return false;
};
status.succeeded.unwrap_or(0) > 0 || has_condition(job, "Complete")
}

/// A Job failed terminally only when it reports a `Failed=True` condition
/// (backoffLimit exhausted or deadline exceeded). `status.failed > 0` alone
/// counts pod retries still owned by the Job controller and is not terminal.
pub fn job_failed(job: &Job) -> bool {
has_condition(job, "Failed")
}

fn has_condition(job: &Job, condition_type: &str) -> bool {
job.status
.as_ref()
.and_then(|s| s.conditions.as_ref())
.is_some_and(|conds| {
conds
.iter()
.any(|c| c.type_ == condition_type && c.status == "True")
})
}

/// Whether a restore Job may be created. Restores are strictly one-shot:
/// any existing Job — running, succeeded, or failed — suppresses creation.
pub fn should_create_restore_job(state: &JobsState) -> bool {
matches!(state, JobsState::NoJobs)
}

/// Whether a one-shot backup Job may be created. A manual trigger
/// (`kafkabackup.com/trigger=now`) requests a fresh run, so it bypasses the
/// "a terminal Job already exists" gate — but never stacks onto an active Job.
pub fn should_create_backup_job(state: &JobsState, manually_triggered: bool) -> bool {
match state {
JobsState::InProgress => false,
JobsState::NoJobs => true,
JobsState::Succeeded { .. } | JobsState::Failed { .. } => manually_triggered,
}
}
1 change: 1 addition & 0 deletions src/jobs/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
pub mod backup_job;
pub mod cronjob;
pub mod job_state;
pub mod restore_job;
pub mod templates;
Loading
Loading