Add enterprise orchestration: dynamic DAGs, Kafka triggers, autoscaling, and SLA monitoring#93
| GitGuardian id | GitGuardian status | Secret | Commit | Filename | |
|---|---|---|---|---|---|
| 27568531 | Triggered | Username Password | c4025c9 | backend/tests/test_security.py | View secret |
| 27600101 | Triggered | Generic Password | c4025c9 | pipeline/docker-compose.yml | View secret |
🛠 Guidelines to remediate hardcoded secrets
- Understand the implications of revoking this secret by investigating where it is used in your code.
- Replace and store your secrets safely. Learn the best practices here.
- Revoke and rotate these secrets.
- If possible, rewrite git history. Rewriting git history is not a trivial act. You might completely break other contributing developers' workflow and you risk accidentally deleting legitimate data.
To avoid such incidents in the future, consider:
- following these best practices for managing and storing secrets, including API keys and other credentials
- installing secret detection on pre-commit to catch secrets before they leave your machine and to ease remediation.
🦉 GitGuardian detects secrets in your source code to help developers and security teams secure the modern development process. You are seeing this because you or someone else with access to this repository has authorized GitGuardian to scan your pull request.
Merged c087985 into codex/fix-remaining-issues-and-raise-pr
```python
while self._running:
    for message in consumer:
        event = message.value
        await on_trigger(event)
    await asyncio.sleep(0.2)
```
🔴 Blocking synchronous KafkaConsumer used inside async coroutine freezes the event loop
The KafkaExecutionTrigger.consume method is an async coroutine that is launched via asyncio.create_task in pipeline/backend/main.py:143. However, it uses kafka-python's synchronous KafkaConsumer, and the for message in consumer: iteration at line 199 is a blocking call that polls Kafka for up to consumer_timeout_ms (1000ms). Since this runs directly on the asyncio event loop thread without run_in_executor, it blocks the entire event loop for up to 1 second per poll cycle, making the FastAPI application unresponsive to all HTTP requests during that time.
Detailed Explanation
The kafka-python library (kafka-python==2.0.2 per requirements.txt) provides a synchronous KafkaConsumer. When for message in consumer: is called, it internally calls consumer.poll() which is a blocking I/O operation. In the async consume method:
```python
async def consume(self, on_trigger):
    consumer = KafkaConsumer(..., consumer_timeout_ms=1000)
    while self._running:
        for message in consumer:  # <-- blocks event loop for up to 1s
            await on_trigger(event)
        await asyncio.sleep(0.2)
```

This is launched as a background task in main.py:143:

```python
kafka_task = asyncio.create_task(kafka_trigger.consume(...))
```

Every iteration of the for loop blocks the event loop. During this blocking period, no other coroutines can run: all HTTP request handling, heartbeats, and other async operations are stalled.
Impact: When KAFKA_TRIGGER_ENABLED=True, the entire FastAPI application becomes periodically unresponsive, with up to 1-second freezes on every poll cycle. This effectively makes the application unusable in production.
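The freeze is easy to reproduce with a minimal standalone demo (all names below are illustrative, not from the PR): a `time.sleep` stands in for the synchronous Kafka poll, and a heartbeat coroutine that should tick every 0.1 s is stalled for the full duration of the blocking call.

```python
import asyncio
import time

async def blocking_poll():
    # Stands in for the synchronous KafkaConsumer iteration: a blocking
    # call executed directly on the event loop thread.
    time.sleep(0.5)

async def heartbeat(ticks):
    # Should record a tick roughly every 0.1 s.
    for _ in range(3):
        ticks.append(time.monotonic())
        await asyncio.sleep(0.1)

async def main():
    ticks = []
    hb = asyncio.create_task(heartbeat(ticks))
    await asyncio.sleep(0)   # let the heartbeat record its first tick
    await blocking_poll()    # blocks the loop; the heartbeat cannot run
    await hb
    return ticks
```

Running `asyncio.run(main())` shows the gap between the first two ticks is roughly the length of the blocking call, not the intended 0.1 s.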
Prompt for agents
In pipeline/backend/core/enterprise_orchestration.py, the KafkaExecutionTrigger.consume method (lines 178-205) uses the synchronous kafka-python KafkaConsumer directly in an async coroutine, which blocks the asyncio event loop. The blocking for message in consumer: iteration at line 199 should be offloaded to a thread: wrap the synchronous consumer polling in a helper function and call it via await asyncio.get_running_loop().run_in_executor(None, sync_poll_func). Alternatively, replace kafka-python with an async Kafka client such as aiokafka.
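A minimal sketch of the `run_in_executor` fix described above (hedged: `poll_once`, `_make_consumer`, and the method shape are hypothetical; the real trigger class in the PR differs):

```python
import asyncio

def poll_once(consumer):
    # Blocking call: drains whatever the synchronous consumer yields within
    # its consumer_timeout_ms window. Runs on a worker thread, not the loop.
    return [message.value for message in consumer]

async def consume(self, on_trigger):
    loop = asyncio.get_running_loop()
    consumer = self._make_consumer()  # hypothetical: builds a KafkaConsumer
    while self._running:
        # Offload the blocking iteration to the default thread pool so the
        # event loop stays responsive while Kafka is polled.
        events = await loop.run_in_executor(None, poll_once, consumer)
        for event in events:
            await on_trigger(event)
        await asyncio.sleep(0.2)
```

The coroutine only awaits; all blocking I/O happens on the executor thread, so HTTP handlers keep running during each poll.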
```python
context.current_stage = stage_id
tasks.append(asyncio.create_task(self._execute_stage_with_retry(context, stage)))
```
🟡 Shared mutable context.current_stage is overwritten by concurrent parallel tasks
When stages are executed in parallel groups, the loop at lines 330-335 sets context.current_stage = stage_id for each stage before creating the async task. Since all tasks in a group share the same ExecutionContext object, current_stage ends up being the last stage_id in the group by the time tasks start executing.
Root Cause
In _execute_stages, the code creates parallel tasks like this:
```python
for stage_id in group:
    stage = next((s for s in context.stages if s.id == stage_id), None)
    context.current_stage = stage_id  # overwritten each iteration
    tasks.append(asyncio.create_task(self._execute_stage_with_retry(context, stage)))
```

All tasks share the same context object. After the loop, context.current_stage equals the last stage_id in the group. This means:

- `get_execution_status()` (pipeline/backend/core/executor.py:849) reports the wrong current stage during parallel execution.
- `_perform_rollback()` (pipeline/backend/core/executor.py:807) logs the wrong stage in its rollback message: `f"Pipeline was rolled back after failing at stage {context.current_stage}"`.
Impact: Incorrect observability data and misleading rollback messages when multiple stages execute in parallel.
Suggested change:

```diff
- context.current_stage = stage_id
  tasks.append(asyncio.create_task(self._execute_stage_with_retry(context, stage)))
```
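One way to avoid the shared-state race described above is to let the stage id travel with each task instead of mutating a single shared field, tracking in-flight stages as a set. This is an illustrative sketch (the `Context`, `active_stages`, and function names are hypothetical; the PR's real `ExecutionContext` and executor differ):

```python
import asyncio

class Context:
    def __init__(self):
        # Replaces the single current_stage field: several stages can be
        # in flight at once during parallel execution.
        self.active_stages = set()

async def execute_stage_with_retry(context, stage_id):
    context.active_stages.add(stage_id)
    try:
        await asyncio.sleep(0)  # placeholder for the real stage work
        return stage_id
    finally:
        context.active_stages.discard(stage_id)

async def run_group(context, group):
    # Each task receives its own stage_id; no shared field is overwritten.
    tasks = [asyncio.create_task(execute_stage_with_retry(context, s))
             for s in group]
    return await asyncio.gather(*tasks)
```

Status reporting can then surface the whole `active_stages` set, and a rollback message can name the stage that actually failed rather than whichever id was written last.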
Motivation
Description
- `pipeline/backend/core/enterprise_orchestration.py` implementing `DynamicDAGGenerator`, `DependencyGraphOptimizer`, `SLAMonitor`, `AutoScalingAdvisor`, and `KafkaExecutionTrigger` for DAG generation, parallel-group optimization, SLA checks, autoscaling recommendations, and Kafka-based triggers.
- `DependencyGraphOptimizer` used to form parallel execution groups, invoking `SLAMonitor` after stage completion; added `_stage_to_dict` and `_handle_sla_alert` helper methods in `pipeline/backend/core/executor.py`.
- `pipeline/backend/core/distributed_execution.py` with an `AutoScalingAdvisor` instance and added `queue_depth()` and `autoscaling_recommendation()` methods to surface autoscaling signals.
- `pipeline/backend/api/routes/orchestration.py` with endpoints `POST /api/orchestration/dag/generate`, `POST /api/orchestration/dag/optimize`, and `POST /api/orchestration/workers/recommend`, and registered the route in `main.py`.
- `pipeline/backend/main.py` so the app can consume triggers (metadata-driven or stored pipeline) when `KAFKA_TRIGGER_ENABLED` is enabled.
- `pipeline/backend/config.py` for Kafka trigger options and orchestration/autoscaling/SLA defaults, and made `core/__init__.py` tolerant of an optional/missing Redis in minimal test environments.
- `pipeline/backend/tests/test_enterprise_orchestration.py` covering DAG generation, dependency optimization, SLA alerting, and autoscaling recommendation logic.

Testing
- `python -m py_compile pipeline/backend/core/enterprise_orchestration.py pipeline/backend/core/executor.py pipeline/backend/core/distributed_execution.py pipeline/backend/api/routes/orchestration.py pipeline/backend/main.py`, which succeeded.
- `pytest -q pipeline/backend/tests/test_enterprise_orchestration.py` (4 tests), which passed, and `pytest -q pipeline/backend/tests/test_enterprise_orchestration.py pipeline/backend/tests/test_microservices_architecture.py` (combined), which also passed; the initial collection error caused by a missing optional `redis` dependency was addressed by making the core import resilient and re-running the tests.
- Additional checks (`python -m py_compile` against `main.py` and the modified core files) were also executed and succeeded; test runs produced a few non-blocking warnings.
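For reference, the parallel-group optimization that `DependencyGraphOptimizer` performs can be approximated with a level-order topological sort: each group contains the stages whose dependencies all lie in earlier groups and can therefore run concurrently. The sketch below is illustrative only (the `parallel_groups` name and the `deps` input shape are hypothetical, not the PR's actual API):

```python
def parallel_groups(deps):
    """Compute parallel execution groups from a dependency map.

    deps maps stage_id -> set of stage_ids it depends on.
    Returns a list of groups; stages within a group can run concurrently.
    """
    remaining = {stage: set(d) for stage, d in deps.items()}
    groups = []
    while remaining:
        # Stages with no unsatisfied dependencies are ready to run.
        ready = sorted(s for s, d in remaining.items() if not d)
        if not ready:
            raise ValueError("dependency cycle detected")
        groups.append(ready)
        for stage in ready:
            del remaining[stage]
        # Satisfied dependencies are removed before computing the next level.
        for d in remaining.values():
            d.difference_update(ready)
    return groups
```

For example, with `build` and `lint` independent, `test` depending on `build`, and `deploy` depending on both `test` and `lint`, the groups come out as `[["build", "lint"], ["test"], ["deploy"]]`.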