Add distributed execution support (Celery & Ray) #88
Note: CodeRabbit review skipped — auto reviews are disabled on base/target branches other than the default branch. Check the settings in the CodeRabbit UI.
Branch: …lement-ray-for-ml-orchestration
| GitGuardian id | GitGuardian status | Secret | Commit | Filename |
|---|---|---|---|---|
| 27568531 | Triggered | Username Password | 0141730 | backend/tests/test_security.py |
🛠 Guidelines to remediate hardcoded secrets
- Understand the implications of revoking this secret by investigating where it is used in your code.
- Replace and store your secret safely. Learn here the best practices.
- Revoke and rotate this secret.
- If possible, rewrite git history. Rewriting git history is not a trivial act. You might completely break other contributing developers' workflow and you risk accidentally deleting legitimate data.
To avoid such incidents in the future, consider:
- following these best practices for managing and storing secrets, including API keys and other credentials;
- installing secret detection on pre-commit to catch secrets before they leave your machine and ease remediation.
Merged c04e459 into codex/fix-remaining-issues-and-raise-pr
```python
from backend.core.distributed_executor import DistributedExecutionDispatcher
from backend.core.executor import PipelineExecutor
from backend.core.orchestration import OrchestrationEngine, OrchestrationRequest, OrchestrationRegistry
    ExecutionResponse,
```
🔴 Broken merge: new import inserted mid-statement creates SyntaxError
The new `from backend.core.distributed_executor import DistributedExecutionDispatcher` import at line 18 was inserted in the middle of the existing import block, leaving the orphaned lines `ExecutionResponse,`, `SuccessResponse,`, and `)` at lines 21-23 as bare expressions outside any `from ... import (...)` statement. Python raises a `SyntaxError`/`IndentationError` when parsing the file, making the entire executions module non-functional.
Root Cause
The original code had a single import block:

```python
from backend.api.schemas import (
    ExecutionCreate,
    ...
    ExecutionResponse,
    SuccessResponse,
)
```

The PR inserted line 18 (`from backend.core.distributed_executor import ...`) between the orchestration import and the continuation of the schemas import, breaking the syntax. Lines 21-23 are now dangling:
```python
    ExecutionResponse,
    SuccessResponse,
)
```

Impact: the entire `executions.py` module cannot be imported. All execution API endpoints (`POST /api/executions`, `GET /api/executions`, etc.) will fail with an import error at application startup.
(Refers to lines 18-23)
Prompt for agents
In backend/api/routes/executions.py, the imports at lines 10-28 are broken due to a bad merge. The new import on line 18 was inserted in the middle of the original import block. The fix requires consolidating the imports into a single clean block. Lines 10-28 should be replaced with:
```python
from backend.api.schemas import (
    ExecutionCreate,
    ExecutionDetailResponse,
    ExecutionListResponse,
    ExecutionResponse,
    OrchestrationEngineSchema,
    SuccessResponse,
)
from backend.models.pipeline import Execution, ExecutionStatus
from backend.core.distributed_executor import DistributedExecutionDispatcher
from backend.core.executor import PipelineExecutor
from backend.core.orchestration import OrchestrationEngine, OrchestrationRequest, OrchestrationRegistry
from backend.api.security import UserPrincipal, get_current_user
from backend.config import settings
from backend.events import get_event_publisher
```
Remove the duplicate imports of `PipelineExecutor` (line 26), `Execution`/`ExecutionStatus` (line 28), and `SuccessResponse` (line 22).
```python
status=ExecutionStatus.PENDING,
started_at=datetime.now(),
total_stages=len(pipeline.stages),
context=merged_context
context=context or {},
```
🔴 Broken merge: initialize_execution has two conflicting function signatures
The initialize_execution function has two overlapping, syntactically invalid signatures merged together. The new signature (lines 90-95) and the old signature (lines 96-99) are both present, creating invalid Python syntax.
Root Cause
Lines 90-99 show:

```python
def initialize_execution(
    pipeline_id: str,
    context: Optional[Dict[str, Any]] = None,
    execution_backend: Optional[str] = None,
) -> Execution:
    """Create and store a pending execution record."""
    user_id: str,
    context: Optional[Dict[str, Any]] = None,
) -> Execution:
```

The old function signature (`user_id: str`, `context: ...`) was not removed when the new signature was added. Additionally, the new signature drops the `user_id` parameter entirely, so the `Execution(... user_id=user_id ...)` constructor call at line 111 would reference an undefined variable `user_id`.
Similarly, lines 115-116 have duplicate `context=` keyword arguments in the `Execution()` constructor:

```python
context=merged_context
context=context or {},
```

Impact: this is part of the same `SyntaxError` that prevents the module from loading. Even if the syntax were fixed, the missing `user_id` parameter would cause a `NameError` at runtime, and the duplicate keyword argument would be a `SyntaxError`.
(Refers to lines 92-116)
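A minimal sketch of what a unified signature could look like, combining the `user_id` parameter from the old signature with the `execution_backend` parameter from the new one. The simplified `Execution` dataclass and the `requested_execution_backend` context key are stand-ins based on the review text, not the project's actual code:

```python
from dataclasses import dataclass, field
from datetime import datetime
from typing import Any, Dict, Optional


@dataclass
class Execution:
    """Simplified stand-in for the dataclass in backend/models/pipeline.py."""
    pipeline_id: str
    user_id: str
    context: Dict[str, Any] = field(default_factory=dict)
    started_at: datetime = field(default_factory=datetime.now)


def initialize_execution(
    pipeline_id: str,
    user_id: str,
    context: Optional[Dict[str, Any]] = None,
    execution_backend: Optional[str] = None,
) -> Execution:
    """Create a pending execution record with a single merged context."""
    merged_context = dict(context or {})
    if execution_backend:
        # Record which backend the caller requested, as the PR intends.
        merged_context["requested_execution_backend"] = execution_backend
    return Execution(pipeline_id=pipeline_id, user_id=user_id, context=merged_context)
```

This keeps exactly one `context=` keyword in the constructor call and makes `user_id` a required parameter, so the constructor can never reference an undefined name.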
```python
execution = initialize_execution(
    execution_data.pipeline_id,
    execution_backend=execution_data.execution_backend
)

# Start execution in background
background_tasks.add_task(
    execute_pipeline_background,
    execution_data.pipeline_id,
    execution.id,
    execution_data.execution_backend
)

execution = initialize_execution(execution_data.pipeline_id, context=orchestration_context)

orchestration_engine = ORCHESTRATION_SCHEMA_TO_CORE[execution_data.orchestration.engine]
```
🔴 Broken merge: create_execution calls initialize_execution three times and schedules background task three times
The `create_execution` endpoint has both the new distributed-execution code (lines 236-247) and the old orchestration code (lines 249-283) present simultaneously, causing `initialize_execution` to be called three times and `background_tasks.add_task` to be called up to three times per request.
Root Cause
Lines 235-283 show three sequential calls to `initialize_execution`:

- Line 236: `execution = initialize_execution(execution_data.pipeline_id, execution_backend=...)` — new code
- Line 249: `execution = initialize_execution(execution_data.pipeline_id, context=orchestration_context)` — old code
- Line 281: `execution = initialize_execution(execution_data.pipeline_id, user_id=current_user.user_id)` — old code

Each call creates a new execution record in `executions_db`, so a single API call would create three orphaned execution records. The first two would be immediately abandoned (their IDs lost when `execution` is reassigned). Background tasks are also scheduled multiple times (lines 242-247, 254-258, 283).
Additionally, the first call at lines 236-238 does not pass `user_id` or `context` (orchestration context), so the execution record would have no user association and no orchestration metadata. The new code also doesn't pass `execution_backend` to the old code path's `initialize_execution` calls.

Impact: every execution creation request would create 3 execution records (2 orphaned), schedule 2-3 background tasks, and the returned execution would lack the `requested_execution_backend` context that was set in the first (discarded) execution.
(Refers to lines 236-283)
Prompt for agents
In backend/api/routes/executions.py, the `create_execution` function (lines 220-285) has duplicate code from a bad merge. The new distributed execution code (lines 236-247) and the old orchestration code (lines 249-283) need to be unified into a single flow. The function should:
1. Call `initialize_execution` exactly once, passing `pipeline_id`, `user_id=current_user.user_id`, `context=orchestration_context`, and `execution_backend=execution_data.execution_backend`.
2. Schedule `execute_pipeline_background` exactly once (for local orchestration), passing `execution_data.execution_backend`.
3. For non-local orchestration engines, dispatch via `OrchestrationRegistry` as before.
4. Return the single execution record.
Remove the duplicate `initialize_execution` calls at lines 236-238, 249, and 281. Remove the duplicate `background_tasks.add_task` calls at lines 242-247 and 283. Keep only one unified flow.
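The four steps in the prompt above can be sketched as a single flow. This is a hypothetical, self-contained illustration: `scheduled`, `add_task`, and the stub helpers stand in for FastAPI's `BackgroundTasks` and the project's real `initialize_execution` and `execute_pipeline_background`:

```python
from typing import Any, Callable, Dict, List, Optional, Tuple

scheduled: List[Tuple[Callable, tuple]] = []  # stand-in for BackgroundTasks


def add_task(fn: Callable, *args: Any) -> None:
    """Stand-in for background_tasks.add_task."""
    scheduled.append((fn, args))


def initialize_execution(pipeline_id, user_id, context=None, execution_backend=None):
    """Stub: the real helper persists a pending record in executions_db."""
    return {"id": "exec-1", "pipeline_id": pipeline_id, "user_id": user_id,
            "context": dict(context or {})}


def execute_pipeline_background(*args):
    """Stub for the real background worker."""


def create_execution(pipeline_id: str, user_id: str,
                     orchestration_context: Optional[Dict[str, Any]] = None,
                     execution_backend: Optional[str] = None,
                     orchestration_engine: str = "local") -> Dict[str, Any]:
    # 1. Exactly one execution record per request.
    execution = initialize_execution(pipeline_id, user_id,
                                     context=orchestration_context,
                                     execution_backend=execution_backend)
    if orchestration_engine == "local":
        # 2. Exactly one background task for local orchestration.
        add_task(execute_pipeline_background, pipeline_id, execution["id"],
                 execution_backend)
    else:
        # 3. Non-local engines would dispatch via OrchestrationRegistry here.
        pass
    # 4. Return the single record.
    return execution
```

The key property is that `execution` is assigned once and never reassigned, so no record is orphaned and no task is scheduled twice.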
```python
payload = pipeline.model_dump(mode="json")
async_result = app.send_task(task_name, kwargs={"pipeline": payload})
remote_output = async_result.get(timeout=600)
execution = Execution.model_validate(remote_output)
```
🔴 model_dump() and model_validate() called on plain dataclasses that lack these methods
The `DistributedExecutionDispatcher` calls `pipeline.model_dump(mode="json")` and `Execution.model_validate(remote_output)` in both the Celery and Ray code paths, but `Pipeline` and `Execution` are plain Python `@dataclass` classes (defined in backend/models/pipeline.py), not Pydantic `BaseModel` subclasses. These methods don't exist on dataclasses.
Root Cause
At backend/models/pipeline.py:50-51, `Pipeline` is defined as `@dataclass class Pipeline`, and at lines 85-86, `Execution` is `@dataclass class Execution`. Neither inherits from `pydantic.BaseModel`.
In backend/core/distributed_executor.py:

- Line 61: `pipeline.model_dump(mode="json")` → `AttributeError: 'Pipeline' object has no attribute 'model_dump'`
- Line 64: `Execution.model_validate(remote_output)` → `AttributeError: type object 'Execution' has no attribute 'model_validate'`
- Lines 93, 95, 97, 100: the same issue in the Ray path.

Because these calls are inside `try/except Exception` blocks, the `AttributeError` will be caught and the code will silently fall back to local execution. This means the Celery and Ray backends can never actually work: every attempt fails with an `AttributeError` and falls back to local, with the recorded fallback reason being the misleading attribute error rather than an actual connectivity issue.
Impact: The distributed execution feature (Celery/Ray) is completely non-functional. All requests will silently fall back to local execution regardless of backend availability.
Prompt for agents
In backend/core/distributed_executor.py, the `Pipeline` and `Execution` classes (from backend/models/pipeline.py) are plain `@dataclass` classes, not Pydantic `BaseModel` subclasses. They don't have `model_dump()` or `model_validate()` methods.
Fix all occurrences in `_execute_with_celery` (lines 61, 64) and `_execute_with_ray` (lines 93, 95, 97, 100):
- For serialization, replace `pipeline.model_dump(mode="json")` with something like `dataclasses.asdict(pipeline)` (import `dataclasses` at the top).
- For deserialization, replace `Execution.model_validate(remote_output)` with `Execution(**remote_output)` or a custom deserialization function that handles nested types (`LogEntry`, `datetime`, etc.).
- Similarly for `Pipeline.model_validate` in the Ray remote function at line 93.

Note: you may also need to handle datetime serialization/deserialization, since `dataclasses.asdict` won't convert datetimes to JSON-safe strings automatically.
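The suggested fix can be illustrated with a small self-contained sketch. The `Execution` dataclass here is a simplified stand-in (the real one has more fields and nested types); it only demonstrates the `dataclasses.asdict` round trip plus the datetime handling the note above warns about:

```python
import dataclasses
from dataclasses import dataclass, field
from datetime import datetime
from typing import Any, Dict


@dataclass
class Execution:
    """Simplified stand-in for the dataclass in backend/models/pipeline.py."""
    pipeline_id: str
    status: str
    started_at: datetime
    context: Dict[str, Any] = field(default_factory=dict)


def serialize_execution(execution: Execution) -> Dict[str, Any]:
    """dataclasses.asdict plus a manual datetime -> ISO-string conversion,
    so the payload is JSON-safe for a Celery/Ray task boundary."""
    payload = dataclasses.asdict(execution)
    payload["started_at"] = execution.started_at.isoformat()
    return payload


def deserialize_execution(payload: Dict[str, Any]) -> Execution:
    """Inverse of serialize_execution; parses the ISO timestamp back."""
    data = dict(payload)
    data["started_at"] = datetime.fromisoformat(data["started_at"])
    return Execution(**data)
```

A real fix would extend the same pattern to every datetime and nested dataclass field (e.g. `LogEntry` entries) on both the `Pipeline` and `Execution` sides.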
```python
execution = initialize_execution(
    execution_data.pipeline_id,
    execution_backend=execution_data.execution_backend
)
```
🔴 create_execution in new code path doesn't pass user_id, creating execution without owner
The new `initialize_execution` call at lines 236-238 does not pass `user_id`, meaning the execution record will have no user association. This breaks the user-scoped access control used throughout the API.
Root Cause
At lines 236-238:

```python
execution = initialize_execution(
    execution_data.pipeline_id,
    execution_backend=execution_data.execution_backend
)
```

The `user_id` parameter (from `current_user.user_id`) is not passed. Looking at the `initialize_execution` function, the new signature at lines 90-94 doesn't even have a `user_id` parameter, and the `Execution` constructor at line 111 references `user_id`, which would be undefined in the new signature's scope.
All query endpoints (`list_executions` at line 301, `get_execution` at line 324, etc.) filter by `e.user_id == current_user.user_id`, so executions created without a proper `user_id` would be invisible to the user who created them.

Impact: executions created via the new code path would have no owner and would be inaccessible through all GET endpoints.
Motivation
Description
- Added settings in `backend/config.py` for `DISTRIBUTED_EXECUTION_BACKEND`, Celery (`CELERY_BROKER_URL`, `CELERY_RESULT_BACKEND`, `CELERY_EXECUTION_TASK`), and Ray (`RAY_ADDRESS`, `RAY_NAMESPACE`).
- Added `execution_backend` to the API schema in `backend/api/schemas.py` so executions can request `local`, `celery`, or `ray` backends.
- Added `backend/core/distributed_executor.py`, which dispatches execution to Celery or Ray when available and falls back to local execution, recording fallback metadata in the execution `context` when distributed runtimes are unavailable.
- Updated `backend/api/routes/executions.py` to use the new dispatcher, persist `requested_execution_backend` in the execution record, and store `distributed_execution.backend_used` after the run; added docs to `backend/README.md` for usage and env vars.
- Added `backend/tests/test_distributed_execution.py` to validate backend fallback behavior and that requested/used backend metadata is tracked.

Testing
- Running pytest without `PYTHONPATH` initially failed with import errors when collecting tests (`ModuleNotFoundError`); this was an environment invocation issue rather than a code correctness problem.
- Re-ran with `PYTHONPATH=.` using `PYTHONPATH=. pytest backend/tests/test_distributed_execution.py backend/tests/test_event_streaming.py -q`, and all tests passed (4 passed, 6 warnings).
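The settings listed in the description might look roughly like this in `backend/config.py`. Only the setting names come from the PR description; the defaults, the plain-class structure, and the broker URLs are assumptions for illustration (the real module may use pydantic settings instead):

```python
import os


class Settings:
    """Hypothetical sketch of the distributed-execution settings.

    All defaults here are assumptions; only the environment variable
    names are taken from the PR description.
    """
    # "local", "celery", or "ray"
    DISTRIBUTED_EXECUTION_BACKEND = os.getenv("DISTRIBUTED_EXECUTION_BACKEND", "local")
    # Celery connection settings
    CELERY_BROKER_URL = os.getenv("CELERY_BROKER_URL", "redis://localhost:6379/0")
    CELERY_RESULT_BACKEND = os.getenv("CELERY_RESULT_BACKEND", "redis://localhost:6379/1")
    CELERY_EXECUTION_TASK = os.getenv("CELERY_EXECUTION_TASK", "execute_pipeline")
    # Ray connection settings
    RAY_ADDRESS = os.getenv("RAY_ADDRESS", "auto")
    RAY_NAMESPACE = os.getenv("RAY_NAMESPACE", "default")


settings = Settings()
```

Reading everything through `os.getenv` keeps local development working with no configuration while letting deployments opt into Celery or Ray via environment variables.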