Skip to content

Add Spark/Dask execution backends and DB alternatives configuration#96

Merged
fuzziecoder merged 2 commits intocodex/fix-remaining-issues-and-raise-prfrom
codex/implement-distributed-compute-and-database-alternatives
Feb 25, 2026
Merged

Add Spark/Dask execution backends and DB alternatives configuration#96
fuzziecoder merged 2 commits intocodex/fix-remaining-issues-and-raise-prfrom
codex/implement-distributed-compute-and-database-alternatives

Conversation

@fuzziecoder
Copy link
Copy Markdown
Owner

@fuzziecoder fuzziecoder commented Feb 25, 2026

Motivation

  • Provide additional distributed compute options (Apache Spark and Dask) as alternatives to Ray/Celery to support ETL/batch and Python-native parallel workloads.
  • Expose database backend alternatives (CockroachDB, MongoDB, Cassandra) to allow deployments to choose a persistence engine beyond the current default.

Description

  • Extended the dispatcher to support spark and dask backends with runtime probing and graceful fallback to local execution while recording distributed_execution metadata in the execution context (backend/core/distributed_executor.py).
  • Added Spark/Dask runtime settings and new database-selection settings plus Cassandra contact-point parsing to the configuration (backend/config.py) and updated backend/.env.example with example variables.
  • Updated the API schema text for ExecutionCreate.execution_backend to list the new options (backend/api/schemas.py) and updated backend/README.md to document compute and database alternatives with example env snippets.
  • Enriched the modern orchestration stack metadata to advertise compute alternatives and database alternatives and added a Spark fallback command path (pipeline/backend/core/modern_stack.py).
  • Added/adjusted tests to validate unknown-backend behavior and Spark/Dask fallback semantics, and updated the modern stack tests to assert advertised alternatives (backend/tests/test_distributed_execution.py, pipeline/backend/tests/test_modern_stack.py).

Testing

  • Ran pytest pipeline/backend/tests/test_modern_stack.py and both tests passed.
  • Ran python -m py_compile backend/core/distributed_executor.py backend/config.py backend/api/schemas.py pipeline/backend/core/modern_stack.py and byte-compilation succeeded.
  • Attempted PYTHONPATH=/workspace/Flexi-Roaster pytest backend/tests/test_distributed_execution.py and collection failed due to a pre-existing IndentationError in backend/api/routes/executions.py that is unrelated to these changes (prevents running the backend test file end-to-end).

Codex Task


Open with Devin

@vercel
Copy link
Copy Markdown

vercel bot commented Feb 25, 2026

The latest updates on your projects. Learn more about Vercel for GitHub.

Project Deployment Actions Updated (UTC)
flexi-roaster Ready Ready Preview, Comment Feb 25, 2026 1:38pm

…lement-distributed-compute-and-database-alternatives
@fuzziecoder fuzziecoder merged commit 883e82a into codex/fix-remaining-issues-and-raise-pr Feb 25, 2026
4 of 6 checks passed
@coderabbitai
Copy link
Copy Markdown

coderabbitai bot commented Feb 25, 2026

Important

Review skipped

Auto reviews are disabled on base/target branches other than the default branch.

Please check the settings in the CodeRabbit UI or the .coderabbit.yaml file in this repository. To trigger a single review, invoke the @coderabbitai review command.

You can disable this status message by setting the reviews.review_status to false in the CodeRabbit configuration file.

Use the checkbox below for a quick retry:

  • 🔍 Trigger review
✨ Finishing Touches
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Post copyable unit tests in a comment
  • Commit unit tests in branch codex/implement-distributed-compute-and-database-alternatives

Comment @coderabbitai help to get the list of available commands and usage tips.

Copy link
Copy Markdown

@devin-ai-integration devin-ai-integration bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Devin Review found 3 potential issues.

View 5 additional findings in Devin Review.

Open in Devin Review

Comment on lines +195 to 214
"distributed_compute": StackComponent(
name="ray",
enabled=settings.RAY_ENABLED,
config={
"dashboard_url": settings.RAY_DASHBOARD_URL,
"entrypoint": settings.RAY_JOB_ENTRYPOINT,
"alternatives": ["spark", "dask"],
},
).__dict__,
"storage": {
"database": "postgresql",
"database_alternatives": ["cockroachdb", "mongodb", "cassandra"],
"object_storage": {
"enabled": settings.OBJECT_STORAGE_ENABLED,
"bucket": settings.OBJECT_STORAGE_BUCKET,
"endpoint": settings.OBJECT_STORAGE_ENDPOINT,
},
},
"distributed_compute": self._distributed_compute(),
"storage": self._storage(),
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🔴 Duplicate dictionary keys in architecture() cause new entries to be silently overwritten

The architecture() method defines "distributed_compute" and "storage" keys twice in the same dict literal. In Python, when a dictionary literal has duplicate keys, the last value wins silently.

Root Cause and Impact

The new code at lines 195-212 adds hardcoded entries:

"distributed_compute": StackComponent(
    name="ray", ...,
    config={..., "alternatives": ["spark", "dask"]},
).__dict__,
"storage": {
    "database": "postgresql",
    "database_alternatives": ["cockroachdb", "mongodb", "cassandra"],
    ...
},

But then lines 213-214 define the same keys again:

"distributed_compute": self._distributed_compute(),
"storage": self._storage(),

Since the last value wins, the hardcoded entries (with alternatives and database_alternatives) are completely dead code. The returned dict will never contain config.alternatives or database_alternatives.

Impact: The new test assertions at pipeline/backend/tests/test_modern_stack.py:24-26 will fail at runtime:

  • architecture["distributed_compute"]["config"]["alternatives"] → KeyError
  • architecture["storage"]["database_alternatives"] → KeyError

More broadly, the advertised compute alternatives and database alternatives metadata that this PR intends to expose will never appear in the architecture output.

Prompt for agents
In pipeline/backend/core/modern_stack.py, the architecture() method (lines 167-224) has duplicate dictionary keys "distributed_compute" (lines 195 and 213) and "storage" (lines 204 and 214). The second occurrence silently overwrites the first. You need to either:

1. Remove the first (hardcoded) entries at lines 195-212 and instead merge the "alternatives" and "database_alternatives" metadata into the dynamic _distributed_compute() and _storage() results, OR
2. Remove the second (dynamic) entries at lines 213-214 and keep only the hardcoded ones (but this would lose the dynamic configuration behavior).

The recommended approach is option 1: remove lines 195-212 entirely, and modify _distributed_compute() to include an "alternatives" key in its config, and modify _storage() to include a "database_alternatives" key in its returned dict. This preserves both the dynamic behavior and the new metadata the PR intends to add.
Open in Devin Review

Was this helpful? React with 👍 or 👎 to provide feedback.

Comment on lines +250 to +276
if settings.RAY_ENABLED:
commands.append(
{
"layer": "distributed_compute",
"engine": "ray",
"action": "submit_ray_job",
"dashboard": settings.RAY_DASHBOARD_URL,
}
)
else:
commands.append(
{
"layer": "distributed_compute",
"engine": "spark",
"action": "submit_spark_job",
}
)

if settings.KAFKA_ENABLED:
commands.append(
{
"layer": "event_layer",
"engine": "kafka",
"action": "publish_event",
"topic": settings.KAFKA_EXECUTION_TOPIC,
}
)
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🔴 Duplicate distributed_compute and event_layer commands appended in submit_execution()

The submit_execution() method appends hardcoded compute and event commands (lines 250-276) and then the existing dynamic logic (lines 277-308) appends the same types of commands again, resulting in duplicate entries in the commands list.

Root Cause and Impact

The new code at lines 250-266 unconditionally appends a distributed_compute command (either ray or spark based on RAY_ENABLED). Then the existing code at lines 277-290 also appends a distributed_compute command from self._distributed_compute() if it's enabled.

Similarly, lines 268-276 append a kafka event_layer command if KAFKA_ENABLED, and lines 292-308 append an event_layer command from self._event_layer() if it's enabled.

With default settings (RAY_ENABLED=True, KAFKA_ENABLED=False, DISTRIBUTED_COMPUTE_BACKEND=ray), the commands list will contain two distributed_compute commands — one hardcoded ray command and one dynamic ray command from _distributed_compute(). This produces a malformed execution envelope with duplicate instructions.

Impact: Downstream consumers processing the commands list will see duplicate distributed_compute (and potentially event_layer) commands, which could cause double-submission of jobs or confusing metadata.

Prompt for agents
In pipeline/backend/core/modern_stack.py, the submit_execution() method has duplicate command-appending logic. Lines 250-276 add hardcoded distributed_compute and event_layer commands, and then lines 277-308 add dynamic versions of the same commands. Remove the hardcoded block at lines 250-276 (the if/else for RAY_ENABLED and the if for KAFKA_ENABLED) since the existing dynamic logic at lines 277-308 already handles these cases correctly based on the configured backends.
Open in Devin Review

Was this helpful? React with 👍 or 👎 to provide feedback.

Comment on lines +136 to +145
spark.stop()

execution = self.executor.execute(pipeline)
execution.context.setdefault("distributed_execution", {})
execution.context["distributed_execution"].update(
{
"requested_backend": "spark",
"execution_mode": "spark-driver",
}
)
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🔴 Spark session is stopped before pipeline execution, making Spark backend useless

In _execute_with_spark, the SparkSession is created and immediately stopped (spark.stop()) before the pipeline is actually executed via self.executor.execute(pipeline). The pipeline never runs on Spark.

Root Cause and Impact

At backend/core/distributed_executor.py:129-136:

spark = (
    SparkSession.builder
    .master(settings.SPARK_MASTER_URL)
    .appName(settings.SPARK_APP_NAME)
    .getOrCreate()
)
logger.info("Spark backend initialized for pipeline %s", pipeline.id)
spark.stop()

The Spark session is stopped on line 136, and then the pipeline is executed locally via self.executor.execute(pipeline) on line 138. The Spark session is only used as a connectivity probe — the actual execution always runs locally. Yet the method reports backend_used="spark" and execution_mode="spark-driver", which is misleading.

The same pattern exists for Dask at lines 164-167: a Dask client is created and immediately closed before local execution.

Impact: When a user requests Spark or Dask execution, the pipeline always runs locally but the metadata falsely claims it ran on Spark/Dask. This is functionally equivalent to local execution with misleading metadata.

Open in Devin Review

Was this helpful? React with 👍 or 👎 to provide feedback.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant