
Add enterprise orchestration: dynamic DAGs, Kafka triggers, autoscaling, and SLA monitoring#93

Merged
fuzziecoder merged 2 commits into codex/fix-remaining-issues-and-raise-pr from codex/add-advanced-features-to-orchestration
Feb 25, 2026

Conversation

@fuzziecoder
Owner

@fuzziecoder fuzziecoder commented Feb 25, 2026

Motivation

  • Provide enterprise orchestration capabilities: generate pipelines from metadata, trigger runs from events, scale workers by queue pressure, monitor SLAs, and optimize stage parallelism with heuristic/AI-inspired logic.
  • Expose these capabilities through API endpoints and integrate them into the existing execution and dispatch pipelines for production-ready orchestration.

Description

  • Added a new orchestration core module pipeline/backend/core/enterprise_orchestration.py implementing DynamicDAGGenerator, DependencyGraphOptimizer, SLAMonitor, AutoScalingAdvisor, and KafkaExecutionTrigger for DAG generation, parallel-group optimization, SLA checks, autoscaling recommendations, and Kafka-based triggers.
  • Integrated orchestration into execution flow by having the executor use DependencyGraphOptimizer to form parallel execution groups and invoking SLAMonitor after stage completion; added _stage_to_dict and _handle_sla_alert helper methods in pipeline/backend/core/executor.py.
  • Extended the distributed dispatcher pipeline/backend/core/distributed_execution.py with an AutoScalingAdvisor instance and added queue_depth() and autoscaling_recommendation() methods to surface autoscaling signals.
  • Added API surface pipeline/backend/api/routes/orchestration.py with endpoints POST /api/orchestration/dag/generate, POST /api/orchestration/dag/optimize, and POST /api/orchestration/workers/recommend, and registered the route in main.py.
  • Wired a guarded Kafka trigger listener into application lifecycle in pipeline/backend/main.py so the app can consume triggers (metadata-driven or stored pipeline) when KAFKA_TRIGGER_ENABLED is enabled.
  • Added configuration knobs in pipeline/backend/config.py for Kafka trigger options and orchestration/autoscaling/SLA defaults, and made core/__init__.py tolerant to optional/missing Redis in minimal test environments.
  • Added unit tests pipeline/backend/tests/test_enterprise_orchestration.py covering DAG generation, dependency optimization, SLA alerting, and autoscaling recommendation logic.
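The parallel-group optimization described above (DependencyGraphOptimizer) amounts to a topological leveling of the stage dependency graph. A minimal sketch, assuming stages are given as a mapping of stage id to dependency list; the function name and input shape are illustrative, not the PR's actual API:

```python
from collections import defaultdict, deque

def parallel_groups(stages: dict[str, list[str]]) -> list[list[str]]:
    """Group stage ids into levels that can run concurrently.

    `stages` maps each stage id to the ids it depends on. Every stage
    in a level has all of its dependencies satisfied by earlier levels,
    so stages within a level may be dispatched in parallel.
    """
    indegree = {s: len(deps) for s, deps in stages.items()}
    dependents = defaultdict(list)
    for s, deps in stages.items():
        for d in deps:
            dependents[d].append(s)

    level = [s for s, n in indegree.items() if n == 0]
    groups: list[list[str]] = []
    while level:
        groups.append(sorted(level))
        next_level = []
        for s in level:
            for t in dependents[s]:
                indegree[t] -= 1
                if indegree[t] == 0:
                    next_level.append(t)
        level = next_level

    if sum(len(g) for g in groups) != len(stages):
        raise ValueError("cycle detected in stage dependencies")
    return groups

groups = parallel_groups({
    "extract": [], "validate": ["extract"],
    "transform": ["extract"], "load": ["validate", "transform"],
})
# extract runs first, then validate/transform in parallel, then load
```

Kahn's algorithm naturally yields these levels; any topological-sort variant that tracks in-degrees works equally well here.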

Testing

  • Compiled impacted modules with python -m py_compile pipeline/backend/core/enterprise_orchestration.py pipeline/backend/core/executor.py pipeline/backend/core/distributed_execution.py pipeline/backend/api/routes/orchestration.py pipeline/backend/main.py which succeeded.
  • Ran unit tests: pytest -q pipeline/backend/tests/test_enterprise_orchestration.py (4 tests) passed, as did a combined run of pytest -q pipeline/backend/tests/test_enterprise_orchestration.py pipeline/backend/tests/test_microservices_architecture.py. An initial collection error caused by the missing optional redis dependency was resolved by making the core import resilient, after which the tests were re-run successfully.
  • Small linter/compile checks (python -m py_compile against main.py and modified core files) were also executed and succeeded; test runs produced a few non-blocking warnings.


@vercel

vercel bot commented Feb 25, 2026

The latest updates on your projects. Learn more about Vercel for GitHub.

Project | Deployment | Actions | Updated (UTC)
flexi-roaster | Ready | Preview, Comment | Feb 25, 2026 1:27pm

@gitguardian

gitguardian bot commented Feb 25, 2026

⚠️ GitGuardian has uncovered 2 secrets following the scan of your pull request.

Please consider investigating the findings and remediating the incidents. Failure to do so may lead to compromising the associated services or software components.

🔎 Detected hardcoded secrets in your pull request
GitGuardian id | GitGuardian status | Secret | Commit | Filename
27568531 | Triggered | Username Password | c4025c9 | backend/tests/test_security.py
27600101 | Triggered | Generic Password | c4025c9 | pipeline/docker-compose.yml
🛠 Guidelines to remediate hardcoded secrets
  1. Understand the implications of revoking this secret by investigating where it is used in your code.
  2. Replace and store your secrets safely, following secret-management best practices.
  3. Revoke and rotate these secrets.
  4. If possible, rewrite git history. Rewriting git history is not a trivial act. You might completely break other contributing developers' workflow and you risk accidentally deleting legitimate data.

🦉 GitGuardian detects secrets in your source code to help developers and security teams secure the modern development process. You are seeing this because you or someone else with access to this repository has authorized GitGuardian to scan your pull request.

@fuzziecoder fuzziecoder merged commit c087985 into codex/fix-remaining-issues-and-raise-pr Feb 25, 2026
3 of 6 checks passed

@devin-ai-integration devin-ai-integration bot left a comment


Devin Review found 2 potential issues.

View 7 additional findings in Devin Review.


Comment on lines +198 to +202
while self._running:
    for message in consumer:
        event = message.value
        await on_trigger(event)
    await asyncio.sleep(0.2)


🔴 Blocking synchronous KafkaConsumer used inside async coroutine freezes the event loop

The KafkaExecutionTrigger.consume method is an async coroutine that is launched via asyncio.create_task in pipeline/backend/main.py:143. However, it uses kafka-python's synchronous KafkaConsumer, and the for message in consumer: iteration at line 199 is a blocking call that polls Kafka for up to consumer_timeout_ms (1000ms). Since this runs directly on the asyncio event loop thread without run_in_executor, it blocks the entire event loop for up to 1 second per poll cycle, making the FastAPI application unresponsive to all HTTP requests during that time.

Detailed Explanation

The kafka-python library (kafka-python==2.0.2 per requirements.txt) provides a synchronous KafkaConsumer. When for message in consumer: is called, it internally calls consumer.poll() which is a blocking I/O operation. In the async consume method:

async def consume(self, on_trigger):
    consumer = KafkaConsumer(..., consumer_timeout_ms=1000)
    while self._running:
        for message in consumer:       # <-- blocks event loop for up to 1s
            await on_trigger(event)
        await asyncio.sleep(0.2)

This is launched as a background task in main.py:143:

kafka_task = asyncio.create_task(kafka_trigger.consume(...))

Every iteration of the for loop blocks the event loop. During this blocking period, no other coroutines can run — all HTTP request handling, heartbeats, and other async operations are stalled.

Impact: When KAFKA_TRIGGER_ENABLED=True, the entire FastAPI application becomes periodically unresponsive, with up to 1-second freezes on every poll cycle. This effectively makes the application unusable in production.

Prompt for agents
In pipeline/backend/core/enterprise_orchestration.py, the KafkaExecutionTrigger.consume method (lines 178-205) uses the synchronous kafka-python KafkaConsumer directly in an async coroutine, which blocks the asyncio event loop. The blocking for message in consumer: iteration at line 199 should be offloaded to a thread using asyncio.get_event_loop().run_in_executor(). Wrap the synchronous consumer polling in a helper function and call it via await loop.run_in_executor(None, sync_poll_func). Alternatively, replace kafka-python with an async Kafka client like aiokafka.
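The run_in_executor fix suggested above can be sketched as follows. A `queue.Queue` stands in for kafka-python's blocking consumer poll (the real `KafkaConsumer` isn't needed to demonstrate the pattern); all names below are illustrative:

```python
import asyncio
import queue

# Stand-in for kafka-python's blocking poll: any synchronous call that
# can wait up to `timeout` seconds poses the same event-loop hazard.
def blocking_poll(q: "queue.Queue[str]", timeout: float) -> list[str]:
    try:
        return [q.get(timeout=timeout)]
    except queue.Empty:
        return []

async def consume(q, on_trigger, rounds: int = 3):
    loop = asyncio.get_running_loop()
    for _ in range(rounds):
        # Offload the blocking poll to the default thread pool so the
        # event loop stays free to serve HTTP requests in the meantime.
        events = await loop.run_in_executor(None, blocking_poll, q, 0.05)
        for event in events:
            await on_trigger(event)

async def main():
    q = queue.Queue()
    q.put("run-pipeline-42")
    seen = []

    async def on_trigger(event):
        seen.append(event)

    await consume(q, on_trigger)
    return seen

print(asyncio.run(main()))
```

The same shape applies to the real consumer: wrap the `for message in consumer:` iteration in a plain function and await it via `run_in_executor` (or `asyncio.to_thread` on Python 3.9+). Switching to an async client such as aiokafka removes the thread hop entirely.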


Comment on lines +334 to +335
context.current_stage = stage_id
tasks.append(asyncio.create_task(self._execute_stage_with_retry(context, stage)))


🟡 Shared mutable context.current_stage is overwritten by concurrent parallel tasks

When stages are executed in parallel groups, the loop at lines 330-335 sets context.current_stage = stage_id for each stage before creating the async task. Since all tasks in a group share the same ExecutionContext object, current_stage ends up being the last stage_id in the group by the time tasks start executing.

Root Cause

In _execute_stages, the code creates parallel tasks like this:

for stage_id in group:
    stage = next((s for s in context.stages if s.id == stage_id), None)
    context.current_stage = stage_id   # overwritten each iteration
    tasks.append(asyncio.create_task(self._execute_stage_with_retry(context, stage)))

All tasks share the same context object. After the loop, context.current_stage equals the last stage_id in the group. This means:

  1. get_execution_status() (pipeline/backend/core/executor.py:849) reports the wrong current stage during parallel execution.
  2. _perform_rollback() (pipeline/backend/core/executor.py:807) logs the wrong stage in its rollback message: f"Pipeline was rolled back after failing at stage {context.current_stage}".

Impact: Incorrect observability data and misleading rollback messages when multiple stages execute in parallel.

Suggested change
- context.current_stage = stage_id
- tasks.append(asyncio.create_task(self._execute_stage_with_retry(context, stage)))
+ tasks.append(asyncio.create_task(self._execute_stage_with_retry(context, stage)))
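An alternative to simply dropping the assignment is to stop sharing a single `current_stage` slot at all: let each task carry its own stage id and have the context track the set of in-flight stages. A minimal sketch; `ExecutionContext`, `execute_stage`, and `run_group` are hypothetical stand-ins for the executor's real types:

```python
import asyncio
from dataclasses import dataclass, field

@dataclass
class ExecutionContext:
    # Hypothetical stand-in for the executor's shared context: track
    # the set of in-flight stages instead of one mutable current_stage.
    running_stages: set[str] = field(default_factory=set)

async def execute_stage(context: ExecutionContext, stage_id: str) -> str:
    # The stage id travels with the task, so concurrent tasks never
    # clobber each other's notion of the "current" stage.
    context.running_stages.add(stage_id)
    try:
        await asyncio.sleep(0)  # stage work would happen here
        return stage_id
    finally:
        context.running_stages.discard(stage_id)

async def run_group(group: list[str]) -> list[str]:
    context = ExecutionContext()
    tasks = [asyncio.create_task(execute_stage(context, sid)) for sid in group]
    return list(await asyncio.gather(*tasks))

print(asyncio.run(run_group(["validate", "transform"])))
```

With this shape, `get_execution_status()` and rollback logging can report the full set of stages that were actually running, rather than whichever id happened to be written last.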


