Scheduler beat distributed #5

Closed
YouxinChu wants to merge 32 commits into main from scheduler_beat_distributed

YouxinChu commented Sep 22, 2025

Added distributed locking support for RedBeat scheduler backend

Copilot AI review requested due to automatic review settings September 22, 2025 03:34

Copilot AI left a comment

Pull Request Overview

This PR implements a RedBeat scheduler backend for celery-rs, providing Redis-based distributed scheduling capabilities equivalent to Python's redbeat.RedBeatScheduler. This enables multiple beat instances to run simultaneously without duplicate task execution through distributed locking.

  • Adds RedBeat distributed locking functionality with Redis-based coordination
  • Implements scheduler backend with automatic failover and fault tolerance
  • Provides example applications demonstrating RedBeat usage
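
The locking behaviour summarized above can be illustrated with a small in-memory model. This is only a sketch under stated assumptions: `LockStore` and its methods are hypothetical names, not types from this PR, and a `HashMap` stands in for Redis. The acquire step mirrors the semantics of the Redis command `SET key value NX PX ttl` (set only if absent, with an expiry); the PR's actual mechanism may differ.

```rust
use std::collections::HashMap;
use std::time::{Duration, Instant};

/// In-memory stand-in for the Redis key space that holds locks (hypothetical type).
#[derive(Default)]
pub struct LockStore {
    // key -> (owner id, expiry instant)
    locks: HashMap<String, (String, Instant)>,
}

impl LockStore {
    /// Acquire `key` for `owner` only if it is free or its TTL has expired.
    pub fn try_acquire(&mut self, key: &str, owner: &str, ttl: Duration, now: Instant) -> bool {
        let held = matches!(self.locks.get(key), Some((_, expires)) if *expires > now);
        if held {
            return false;
        }
        self.locks.insert(key.to_string(), (owner.to_string(), now + ttl));
        true
    }

    /// Extend the TTL, but only while `owner` still holds an unexpired lock.
    pub fn renew(&mut self, key: &str, owner: &str, ttl: Duration, now: Instant) -> bool {
        if let Some((holder, expires)) = self.locks.get_mut(key) {
            if holder.as_str() == owner && *expires > now {
                *expires = now + ttl;
                return true;
            }
        }
        false
    }

    /// Release the lock only if `owner` is the current holder.
    pub fn release(&mut self, key: &str, owner: &str) -> bool {
        let held = matches!(self.locks.get(key), Some((holder, _)) if holder.as_str() == owner);
        if held {
            self.locks.remove(key);
        }
        held
    }
}
```

Passing `now` explicitly keeps the model deterministic: a second beat instance cannot acquire the key until the holder's TTL lapses, and a holder that stops renewing loses the lock automatically, which is the failover path.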

Reviewed Changes

Copilot reviewed 9 out of 9 changed files in this pull request and generated 5 comments.

| File | Description |
| --- | --- |
| src/error.rs | Adds RedisError variant for Redis-related error handling |
| src/beat/scheduler.rs | Implements RedBeatLock trait and distributed locking in scheduler |
| src/beat/redbeat.rs | Core RedBeat implementation with Redis client and distributed locking |
| src/beat/mod.rs | Updates module exports and integrates RedBeat backend selection |
| src/beat/backend.rs | Adds RedBeat lock support to SchedulerBackend trait |
| examples/redbeat_distributed_lock.rs | Example showing distributed locking usage |
| examples/redbeat_app.rs | Full RedBeat scheduler application example |
| examples/celery_app.rs | Switches broker configuration from AMQP to Redis |
| REDBEAT.md | Comprehensive documentation for RedBeat implementation |


// Send the task
let result = {
    let message = scheduled_task.message_factory.try_create_message()?;

When try_create_message() or broker.send() fails, the ? operator returns early before the lock-release block runs. The RedBeat lock then remains held until its TTL expires (30 s), so the task is suppressed on every worker during that window.
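
One way to avoid the leaked lock is to capture the send result, release the lock, and only then propagate the error. The sketch below is synchronous and uses hypothetical names (`Lock`, `send_then_release`, `CountingLock`); the PR's `RedBeatLock` trait and its async send path may be shaped differently, but the same ordering applies with `.await`.

```rust
/// Hypothetical minimal lock interface; the PR's `RedBeatLock` trait may differ.
pub trait Lock {
    fn release(&mut self);
}

/// Run `send`, always release the lock afterwards, then return the outcome.
pub fn send_then_release<L, F, T, E>(lock: &mut L, send: F) -> Result<T, E>
where
    L: Lock,
    F: FnOnce() -> Result<T, E>,
{
    // Keep the result in a variable instead of applying `?` here,
    // so a failed send cannot skip the release below.
    let result = send();
    lock.release();
    result
}

/// Test double that counts how many times it was released.
#[derive(Default)]
pub struct CountingLock {
    pub releases: u32,
}

impl Lock for CountingLock {
    fn release(&mut self) {
        self.releases += 1;
    }
}
```

With this ordering a failing closure still increments `releases`, so the lock never waits for its TTL. A guard type that releases in `Drop` would be an alternative, although async release calls do not fit `Drop` directly.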

- Updated DeltaSchedule to schedule tasks for future execution on first run.
- Refactored Scheduler to support RedBeat distributed scheduling, removing unnecessary locking logic.
- Introduced DistributedSchedulerBackend for managing distributed task scheduling using Redis.
- Added heartbeat mechanism for leader election and lock renewal in distributed environment.
- Created example applications for using RedBeat with Celery tasks.
- Enhanced logging for better visibility of scheduler state and actions.
…ling

- Removed cron_expression from ScheduledTask struct and related methods.
- Introduced TaskExecutionCallback type for asynchronous task execution callbacks.
- Added set_task_execution_callback method to Scheduler for setting task execution callbacks.
- Updated tick method in Scheduler to notify about task execution status.
- Added RedBeatConfig struct for configuration management of RedBeat scheduler backend.
- Implemented builder pattern for RedBeatConfig with validation and resolution of configuration values.
- Added tests for RedBeatConfig to ensure correct behavior of builder pattern and configuration resolution.
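
For readers unfamiliar with the pattern, a builder with validation could look roughly like the sketch below. All field names, the default values, and the error type are assumptions made for illustration; they are not taken from the PR's `RedBeatConfig`.

```rust
use std::time::Duration;

/// Hypothetical resolved configuration; field names are illustrative only.
#[derive(Debug, Clone, PartialEq)]
pub struct RedBeatConfig {
    pub redis_url: String,
    pub lock_key: String,
    pub lock_timeout: Duration,
}

/// Builder that collects optional values and resolves defaults in `build`.
#[derive(Debug, Default)]
pub struct RedBeatConfigBuilder {
    redis_url: Option<String>,
    lock_key: Option<String>,
    lock_timeout: Option<Duration>,
}

impl RedBeatConfigBuilder {
    pub fn redis_url(mut self, url: &str) -> Self {
        self.redis_url = Some(url.to_string());
        self
    }

    pub fn lock_key(mut self, key: &str) -> Self {
        self.lock_key = Some(key.to_string());
        self
    }

    pub fn lock_timeout(mut self, timeout: Duration) -> Self {
        self.lock_timeout = Some(timeout);
        self
    }

    /// Validate the inputs and fill in defaults (both assumed, not from the PR).
    pub fn build(self) -> Result<RedBeatConfig, String> {
        let redis_url = self
            .redis_url
            .ok_or_else(|| "redis_url is required".to_string())?;
        let lock_timeout = self.lock_timeout.unwrap_or(Duration::from_secs(30));
        if lock_timeout == Duration::from_secs(0) {
            return Err("lock_timeout must be greater than zero".to_string());
        }
        let lock_key = self.lock_key.unwrap_or_else(|| "redbeat:lock".to_string());
        Ok(RedBeatConfig { redis_url, lock_key, lock_timeout })
    }
}
```

Deferring validation to `build` keeps the setters infallible and gives a single place where missing or invalid values are reported.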

longzhi commented Oct 1, 2025

@codex review

chatgpt-codex-connector bot left a comment

💡 Codex Review

Here are some automated review suggestions for this pull request.

Comment on lines 93 to 99
if next_task_time <= now {
    let mut scheduled_task = self
        .heap
        .pop()
        .expect("No scheduled tasks found even though there should be");
    let result = self.send_scheduled_task(&mut scheduled_task).await;

    // Reschedule the task before checking if the task execution was successful.
    // TODO: we may have more fine-grained logic here and reschedule the task
    // only after examining the type of error.
    let mut scheduled_task = self.heap.pop().unwrap();
    self.send_scheduled_task(&mut scheduled_task).await?;

    if let Some(rescheduled_task) = scheduled_task.reschedule_task() {
        self.heap.push(rescheduled_task);
    } else {
        debug!("A task is not scheduled to run anymore and will be dropped");
    }

P1: Reschedule tasks even when broker send fails

In tick_local the call to send_scheduled_task now returns early on error (self.send_scheduled_task(...).await?) before the task is rescheduled. In the previous implementation the task was rescheduled before propagating the error. With the new flow any transient broker failure will drop the task from the heap permanently, so it will never execute once the broker becomes available again. Consider rescheduling before returning the error to preserve the retry behaviour.
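
The suggested ordering (send, reschedule, then propagate) can be sketched with a plain `BinaryHeap`. This is a simplified, synchronous illustration: `Task`, `tick`, the integer timestamps, and the `String` error are hypothetical stand-ins for the scheduler's real types, and unlike `reschedule_task()` this version always re-queues.

```rust
use std::cmp::Reverse;
use std::collections::BinaryHeap;

/// Hypothetical periodic task; times are plain seconds to keep the sketch simple.
/// Field order matters: the derived ordering compares `next_run` first.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
pub struct Task {
    pub next_run: u64,
    pub interval: u64,
    pub name: String,
}

/// Pop the earliest due task, try to send it, and re-queue it whatever the outcome.
pub fn tick<F>(heap: &mut BinaryHeap<Reverse<Task>>, now: u64, mut send: F) -> Result<(), String>
where
    F: FnMut(&Task) -> Result<(), String>,
{
    let due = matches!(heap.peek(), Some(Reverse(task)) if task.next_run <= now);
    if !due {
        return Ok(());
    }
    let Reverse(mut task) = heap.pop().expect("peek just returned a task");
    // Hold on to the result instead of applying `?`, so a transient
    // broker failure cannot drop the task from the heap.
    let result = send(&task);
    task.next_run = now + task.interval;
    heap.push(Reverse(task));
    // Only now surface the send error to the caller.
    result
}
```

After a failed send the heap still contains the task with its next due time, so it runs again once the broker is reachable.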

Comment on lines +681 to +688
// FOLLOWER: Clear local tasks to prevent execution
if !scheduled_tasks.is_empty() {
    log::debug!(
        "👥 FOLLOWER sync: clearing {} local tasks",
        scheduled_tasks.len()
    );
    scheduled_tasks.clear();
}

P1: Clearing follower heap loses schedule after leadership change

When RedBeatSchedulerBackend::sync runs in follower mode it clears the entire local heap to avoid executing tasks. Nothing repopulates those tasks when the instance later acquires the lock, so after a failover the new leader has an empty schedule and never executes any jobs. Unless tasks are re-registered on every leadership change or reloaded from Redis, this effectively disables scheduling after the first follower sync.
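
A possible shape for the fix is to keep the registered task definitions separate from the active schedule and rebuild the latter on promotion. The sketch below is an assumption-laden illustration: `Backend`, `register`, and `sync(holds_lock)` are hypothetical names, tasks are reduced to their names, and the registry lives in memory rather than being reloaded from Redis as the comment also suggests.

```rust
/// Hypothetical backend state. `registered` survives role changes;
/// `active` is what the local scheduler is allowed to execute.
#[derive(Debug, Default)]
pub struct Backend {
    registered: Vec<String>,
    active: Vec<String>,
    is_leader: bool,
}

impl Backend {
    /// Record a task definition; it only becomes active while this instance leads.
    pub fn register(&mut self, name: &str) {
        self.registered.push(name.to_string());
        if self.is_leader {
            self.active.push(name.to_string());
        }
    }

    /// Apply the outcome of the latest lock attempt.
    pub fn sync(&mut self, holds_lock: bool) {
        if holds_lock && !self.is_leader {
            // Promotion: rebuild the active schedule from the registry.
            self.active = self.registered.clone();
        } else if !holds_lock {
            // Follower: execute nothing, but leave `registered` untouched.
            self.active.clear();
        }
        self.is_leader = holds_lock;
    }

    pub fn active_tasks(&self) -> &[String] {
        &self.active
    }
}
```

Because only `active` is cleared in follower mode, an instance that later wins the lock starts with the full schedule instead of an empty heap.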

longzhi closed this Oct 2, 2025

3 participants