Conversation
…ction handling in RedBeat scheduler
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
…s and improving formatting
…lery-rs into scheduler_beat_distributed
There was a problem hiding this comment.
Pull Request Overview
This PR implements a RedBeat scheduler backend for celery-rs, providing Redis-based distributed scheduling capabilities equivalent to Python's redbeat.RedBeatScheduler. This enables multiple beat instances to run simultaneously without duplicate task execution through distributed locking.
- Adds RedBeat distributed locking functionality with Redis-based coordination
- Implements scheduler backend with automatic failover and fault tolerance
- Provides example applications demonstrating RedBeat usage
Reviewed Changes
Copilot reviewed 9 out of 9 changed files in this pull request and generated 5 comments.
Show a summary per file
| File | Description |
|---|---|
| src/error.rs | Adds RedisError variant for Redis-related error handling |
| src/beat/scheduler.rs | Implements RedBeatLock trait and distributed locking in scheduler |
| src/beat/redbeat.rs | Core RedBeat implementation with Redis client and distributed locking |
| src/beat/mod.rs | Updates module exports and integrates RedBeat backend selection |
| src/beat/backend.rs | Adds RedBeat lock support to SchedulerBackend trait |
| examples/redbeat_distributed_lock.rs | Example showing distributed locking usage |
| examples/redbeat_app.rs | Full RedBeat scheduler application example |
| examples/celery_app.rs | Switches broker configuration from AMQP to Redis |
| REDBEAT.md | Comprehensive documentation for RedBeat implementation |
Tip: Customize your code reviews with copilot-instructions.md. Create the file or learn how to get started.
…e; fix compilation errors and formatting
src/beat/scheduler.rs
Outdated
|
|
||
| // Send the task | ||
| let result = { | ||
| let message = scheduled_task.message_factory.try_create_message()?; |
There was a problem hiding this comment.
When try_create_message() or broker.send() fail, the ? operator returns early before the lock-release block runs. The RedBeat lock then remains held until its TTL expires (30 s), so the task is
suppressed on every worker during that window.
- Updated DeltaSchedule to schedule tasks for future execution on first run. - Refactored Scheduler to support RedBeat distributed scheduling, removing unnecessary locking logic. - Introduced DistributedSchedulerBackend for managing distributed task scheduling using Redis. - Added heartbeat mechanism for leader election and lock renewal in distributed environment. - Created example applications for using RedBeat with Celery tasks. - Enhanced logging for better visibility of scheduler state and actions.
…cheduling methods in README
…ling - Removed cron_expression from ScheduledTask struct and related methods. - Introduced TaskExecutionCallback type for asynchronous task execution callbacks. - Added set_task_execution_callback method to Scheduler for setting task execution callbacks. - Updated tick method in Scheduler to notify about task execution status. - Added RedBeatConfig struct for configuration management of RedBeat scheduler backend. - Implemented builder pattern for RedBeatConfig with validation and resolution of configuration values. - Added tests for RedBeatConfig to ensure correct behavior of builder pattern and configuration resolution.
…Beat configuration
… and scheduler modules
|
@codex review |
There was a problem hiding this comment.
💡 Codex Review
Here are some automated review suggestions for this pull request.
ℹ️ About Codex in GitHub
Your team has set up Codex to review pull requests in this repo. Reviews are triggered when you
- Open a pull request for review
- Mark a draft as ready
- Comment "@codex review".
If Codex has suggestions, it will comment; otherwise it will react with 👍.
Codex can also answer questions or update the PR. Try commenting
| if next_task_time <= now { | ||
| let mut scheduled_task = self | ||
| .heap | ||
| .pop() | ||
| .expect("No scheduled tasks found even though there should be"); | ||
| let result = self.send_scheduled_task(&mut scheduled_task).await; | ||
|
|
||
| // Reschedule the task before checking if the task execution was successful. | ||
| // TODO: we may have more fine-grained logic here and reschedule the task | ||
| // only after examining the type of error. | ||
| let mut scheduled_task = self.heap.pop().unwrap(); | ||
| self.send_scheduled_task(&mut scheduled_task).await?; | ||
|
|
||
| if let Some(rescheduled_task) = scheduled_task.reschedule_task() { | ||
| self.heap.push(rescheduled_task); | ||
| } else { | ||
| debug!("A task is not scheduled to run anymore and will be dropped"); | ||
| } |
There was a problem hiding this comment.
Reschedule tasks even when broker send fails
In tick_local the call to send_scheduled_task now returns early on error (self.send_scheduled_task(...).await?) before the task is rescheduled. In the previous implementation the task was rescheduled before propagating the error. With the new flow any transient broker failure will drop the task from the heap permanently, so it will never execute once the broker becomes available again. Consider rescheduling before returning the error to preserve the retry behaviour.
Useful? React with 👍 / 👎.
| // FOLLOWER: Clear local tasks to prevent execution | ||
| if !scheduled_tasks.is_empty() { | ||
| log::debug!( | ||
| "👥 FOLLOWER sync: clearing {} local tasks", | ||
| scheduled_tasks.len() | ||
| ); | ||
| scheduled_tasks.clear(); | ||
| } |
There was a problem hiding this comment.
Clearing follower heap loses schedule after leadership change
When RedBeatSchedulerBackend::sync runs in follower mode it clears the entire local heap to avoid executing tasks. Nothing repopulates those tasks when the instance later acquires the lock, so after a failover the new leader has an empty schedule and never executes any jobs. Unless tasks are re-registered on every leadership change or reloaded from Redis, this effectively disables scheduling after the first follower sync.
Useful? React with 👍 / 👎.
Added distributed locking support for RedBeat scheduler backend