Binary file added backend/__pycache__/main.cpython-313.pyc
Binary file added backend/api/routes/__pycache__/ai.cpython-311.pyc
Binary file added backend/api/routes/__pycache__/ai.cpython-313.pyc
Binary file added backend/core/__pycache__/executor.cpython-313.pyc
42 changes: 42 additions & 0 deletions backend/core/executor.py
@@ -4,10 +4,18 @@
"""
import traceback
from datetime import datetime
import time
from typing import Dict, Any, Optional

from backend.models.pipeline import Pipeline, Stage, Execution, ExecutionStatus, LogLevel, StageType
from backend.core.pipeline_engine import PipelineEngine
from backend.monitoring.metrics import (
pipeline_executions_total,
pipeline_failures_total,
pipeline_execution_duration_seconds,
stage_execution_duration_seconds,
pipeline_active_executions
)


class PipelineExecutor:
@@ -36,6 +44,10 @@ def execute(self, pipeline: Pipeline) -> Execution:
execution.status = ExecutionStatus.RUNNING
execution.add_log(None, LogLevel.INFO, f"Starting pipeline execution: {pipeline.name}")

# Track active pipeline start
pipeline_active_executions.labels(pipeline_id=pipeline.id).inc()
start_time = time.time()
Comment on lines +47 to +49

⚠️ Potential issue | 🟡 Minor


Use a monotonic clock for elapsed-time metrics.

time.time() tracks wall time. NTP or manual clock changes can move it backward or forward and skew these histogram observations. time.perf_counter() is the recommended approach for measuring elapsed durations.

⏱️ Proposed refactor
-        start_time = time.time()
+        start_time = time.perf_counter()
...
-            duration = time.time() - start_time
+            duration = time.perf_counter() - start_time
...
-            duration = time.time() - start_time
+            duration = time.perf_counter() - start_time
...
-        stage_start_time = time.time()
+        stage_start_time = time.perf_counter()
...
-            stage_duration = time.time() - stage_start_time
+            stage_duration = time.perf_counter() - stage_start_time
...
-            stage_duration = time.time() - stage_start_time
+            stage_duration = time.perf_counter() - stage_start_time

Also applies to: 49, 76, 94, 111, 137, 153

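The reviewer's point can be checked with a short, self-contained sketch (an illustration, not the PR's code): `perf_counter()` is monotonic, so elapsed-time measurements cannot jump or go negative even if the system clock is adjusted mid-run.

```python
import time

def timed(fn, *args, **kwargs):
    """Measure fn's elapsed time with a monotonic clock.

    time.perf_counter() is immune to NTP/manual clock adjustments,
    unlike time.time(), which tracks wall-clock time.
    """
    start = time.perf_counter()
    result = fn(*args, **kwargs)
    return result, time.perf_counter() - start

result, duration = timed(sum, range(1_000))
assert result == 499500
assert duration >= 0.0  # monotonic: elapsed time can never be negative
```

The same one-line substitution (`time.time()` → `time.perf_counter()`) applies at every start/elapsed site listed in the diff above.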


try:
# Get execution order (topological sort)
execution_order = self.engine.get_execution_order(pipeline)
@@ -60,6 +72,12 @@ def execute(self, pipeline: Pipeline) -> Execution:
f"Pipeline completed successfully in {execution.duration:.2f}s"
)

# Record success metrics
duration = time.time() - start_time
pipeline_executions_total.labels(pipeline_id=pipeline.id, status='success').inc()
pipeline_execution_duration_seconds.labels(pipeline_id=pipeline.id).observe(duration)
pipeline_active_executions.labels(pipeline_id=pipeline.id).dec()
Comment on lines +75 to +79

🟡 Success-path metrics recording inside try block can overwrite COMPLETED status to FAILED

The Prometheus metrics recording for the success path (lines 76-79) is placed inside the try block, after the execution status is already set to ExecutionStatus.COMPLETED at line 67. If any of the metrics calls (pipeline_executions_total.labels(...).inc(), .observe(), or .dec()) raise an exception, the except Exception block at line 81 will catch it and overwrite execution.status to ExecutionStatus.FAILED at line 83 — even though all pipeline stages completed successfully. The returned Execution object would report a failed status, a misleading error message from the Prometheus client, and failure metrics would be emitted to Prometheus instead of success metrics. The metrics recording should be moved after the try/except, or into a separate try block, so it cannot interfere with the execution status determination.

Prompt for agents
In backend/core/executor.py, the metrics recording code for both success and failure paths (lines 75-79 and 93-98) should be restructured so that a failure in metrics recording cannot change the execution status. The recommended approach is to: 1) Move the pipeline_active_executions.dec() call into a finally block to guarantee gauge cleanup regardless of what happens. 2) Move the success metrics recording (lines 76-79) out of the try block and into a separate try/except or into a finally block with a status check. For example, after the main try/except, add a finally block that always decrements the gauge, and use a separate try/except for the Prometheus calls that cannot affect execution.status. The key invariant is: once execution.status is set to COMPLETED at line 67, no subsequent code path should overwrite it to FAILED unless the pipeline stages themselves failed.


except Exception as e:
# Handle execution failure
execution.status = ExecutionStatus.FAILED
@@ -71,6 +89,13 @@ def execute(self, pipeline: Pipeline) -> Execution:
f"Pipeline execution failed: {str(e)}",
metadata={"traceback": traceback.format_exc()}
)

# Record failure metrics
duration = time.time() - start_time
pipeline_executions_total.labels(pipeline_id=pipeline.id, status='failed').inc()
pipeline_failures_total.labels(pipeline_id=pipeline.id).inc()
pipeline_execution_duration_seconds.labels(pipeline_id=pipeline.id).observe(duration)
pipeline_active_executions.labels(pipeline_id=pipeline.id).dec()
Comment on lines +92 to +98

🔴 pipeline_active_executions gauge never decremented if metrics recording in except block raises

The pipeline_active_executions gauge is incremented at backend/core/executor.py:48 (before the try block), but it is only decremented inside the try success path (line 79) or inside the except block (line 98). If any of the metrics calls preceding the .dec() in the except block (lines 95–97) raise an exception, the decrement on line 98 is never reached and the exception propagates out. This causes the gauge to permanently drift upward, reporting phantom active executions. The .dec() call should be placed in a finally block to guarantee it always executes regardless of what happens during metrics recording.

Prompt for agents
In backend/core/executor.py, refactor the execute() method (lines 47-99) so that the pipeline_active_executions gauge decrement is guaranteed to run. Move `pipeline_active_executions.labels(pipeline_id=pipeline.id).dec()` out of both the try and except blocks and into a finally block. The structure should be:

  pipeline_active_executions.labels(pipeline_id=pipeline.id).inc()
  start_time = time.time()
  try:
      ... (success path, record success metrics but WITHOUT the .dec() call)
  except Exception as e:
      ... (failure path, record failure metrics but WITHOUT the .dec() call)
  finally:
      pipeline_active_executions.labels(pipeline_id=pipeline.id).dec()

Remove the .dec() calls from both the try block (currently line 79) and the except block (currently line 98).
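Taken together with the previous comment, the two findings suggest one restructuring. A minimal runnable sketch of the invariant (`FakeGauge` and the stage/metrics callables are stand-ins for the real Prometheus objects, not the PR's code):

```python
import time

class FakeGauge:
    """Stand-in for a prometheus_client Gauge, for illustration only."""
    def __init__(self):
        self.value = 0
    def inc(self):
        self.value += 1
    def dec(self):
        self.value -= 1

active = FakeGauge()

def execute(run_stages, record_metrics):
    """Status is decided only by the stages; the gauge decrement is
    guaranteed by finally; metrics failures cannot flip the status."""
    active.inc()
    start = time.perf_counter()
    try:
        run_stages()
        status = "COMPLETED"
    except Exception:
        status = "FAILED"
    finally:
        active.dec()  # guaranteed cleanup: no phantom active executions
    try:
        record_metrics(status, time.perf_counter() - start)
    except Exception:
        pass  # a metrics-recording failure must not alter the status
    return status

def boom(status, duration):
    raise RuntimeError("prometheus down")

# Even if metrics recording raises, the status stays COMPLETED
# and the gauge returns to zero.
assert execute(lambda: None, boom) == "COMPLETED"
assert active.value == 0
```

A genuine stage failure still reports FAILED, and the gauge still balances, because the decrement no longer depends on which branch the metrics code takes.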


return execution

@@ -83,6 +108,7 @@ def _execute_stage(self, stage: Stage, execution: Execution) -> None:
execution: Current execution context
"""
execution.add_log(stage.id, LogLevel.INFO, f"Starting stage: {stage.name}")
stage_start_time = time.time()

try:
# Execute based on stage type
@@ -107,13 +133,29 @@ def _execute_stage(self, stage: Stage, execution: Execution) -> None:
metadata={"result_keys": list(result.keys()) if isinstance(result, dict) else None}
)

# Record stage success metrics
stage_duration = time.time() - stage_start_time
stage_execution_duration_seconds.labels(
pipeline_id=execution.pipeline_id,
stage_name=stage.name,
status='success'
).observe(stage_duration)

except Exception as e:
execution.add_log(
stage.id,
LogLevel.ERROR,
f"Stage failed: {str(e)}",
metadata={"traceback": traceback.format_exc()}
)

# Record stage failure metrics
stage_duration = time.time() - stage_start_time
stage_execution_duration_seconds.labels(
pipeline_id=execution.pipeline_id,
stage_name=stage.name,
status='failed'
).observe(stage_duration)
raise

def _execute_input_stage(self, stage: Stage, execution: Execution) -> Dict[str, Any]:
Binary file added backend/db/__pycache__/crud.cpython-313.pyc
10 changes: 9 additions & 1 deletion backend/main.py
@@ -4,7 +4,8 @@
"""
from fastapi import Depends, FastAPI, Request
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse
from fastapi.responses import JSONResponse, Response
from prometheus_client import generate_latest, CONTENT_TYPE_LATEST
from datetime import datetime
import uvicorn

@@ -71,6 +72,13 @@ async def health_check():
}


# Prometheus Metrics scraping endpoint
@app.get("/metrics", tags=["monitoring"])
async def prometheus_metrics():
"""Prometheus metrics endpoint"""
return Response(generate_latest(), media_type=CONTENT_TYPE_LATEST)
Comment on lines +75 to +79

⚠️ Potential issue | 🟠 Major

Protect /metrics; it currently bypasses auth.

When settings.AUTH_ENABLED is true, only the routers on Lines 97-110 get auth_dependencies; this endpoint stays anonymous. That exposes operational data and any user-derived label values to anyone who can reach the service. Gate it behind the same auth policy or an internal-only ingress, and keep it out of the public schema.

🔒 Proposed fix
+# Define this before the route declarations.
+metrics_dependencies = [Depends(get_current_auth_context)] if settings.AUTH_ENABLED else []
+
-@app.get("/metrics", tags=["monitoring"])
+@app.get("/metrics", tags=["monitoring"], include_in_schema=False, dependencies=metrics_dependencies)
 async def prometheus_metrics():
     """Prometheus metrics endpoint"""
     return Response(generate_latest(), media_type=CONTENT_TYPE_LATEST)



# Root endpoint
@app.get("/", tags=["root"])
async def root():
Expand Down
52 changes: 52 additions & 0 deletions backend/monitoring/metrics.py
@@ -0,0 +1,52 @@
"""
Prometheus Metrics Definitions for Pipeline Monitoring
"""
import os
from prometheus_client import Counter, Histogram, Gauge, REGISTRY

# Disable the default process/platform metrics for cleaner app-specific dashboards if desired
# from prometheus_client import process_collector, platform_collector, gc_collector
# REGISTRY.unregister(process_collector.ProcessCollector())
# REGISTRY.unregister(platform_collector.PlatformCollector())
# REGISTRY.unregister(gc_collector.GCCollector())

# --- Counters ---
# Tracks the total number of pipeline executions, labeled by pipeline_id and final status
pipeline_executions_total = Counter(
'pipeline_executions_total',
'Total number of pipeline executions',
['pipeline_id', 'status']
)

# Tracks the total number of explicit failures, labeled by pipeline_id
pipeline_failures_total = Counter(
'pipeline_failures_total',
'Total number of failed pipeline executions',
['pipeline_id']
)

# --- Histograms ---
# Tracks the distribution of total end-to-end execution duration for a pipeline
# Buckets: 1s, 5s, 15s, 30s, 1m, 2m, 5m, 10m, 15m, 30m, 1h, +Inf
pipeline_execution_duration_seconds = Histogram(
'pipeline_execution_duration_seconds',
'Pipeline execution duration in seconds',
['pipeline_id'],
buckets=(1.0, 5.0, 15.0, 30.0, 60.0, 120.0, 300.0, 600.0, 900.0, 1800.0, 3600.0, float('inf'))
)

# Tracks the duration of individual stages within a pipeline
stage_execution_duration_seconds = Histogram(
'stage_execution_duration_seconds',
'Stage execution duration in seconds',
['pipeline_id', 'stage_name', 'status'],
buckets=(0.1, 0.5, 1.0, 5.0, 15.0, 30.0, 60.0, 120.0, 300.0, float('inf'))
)
Comment on lines +39 to +44

⚠️ Potential issue | 🟠 Major



Avoid unbounded stage_name labels.

Stage.name is defined as a free-form str in backend/models/pipeline.py and passed directly into the stage_execution_duration_seconds metric label at backend/core/executor.py lines 140 and 156. Every distinct stage name creates a new time-series in Prometheus, which will eventually exhaust memory and degrade query latency. Use a bounded dimension (e.g., stage type or index) instead, and keep the free-form name in logs or traces.

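The cardinality concern can be illustrated without Prometheus itself. Each distinct label tuple materialises a separate time-series in the server's memory, so a free-form name label grows storage without bound while a bounded label does not (a toy model, not prometheus_client internals):

```python
# Toy model of a labelled histogram: one store entry per distinct
# label tuple, mirroring one Prometheus time-series per label combination.
def observe(store, labels, value):
    store.setdefault(labels, []).append(value)

unbounded, bounded = {}, {}
for i in range(1000):
    # Free-form stage names: every new name creates a new series.
    observe(unbounded, ("pipe-1", f"stage-{i}"), 0.5)
    # Bounded stage type: the series count stays constant.
    observe(bounded, ("pipe-1", "transform"), 0.5)

assert len(unbounded) == 1000  # cardinality grows with distinct names
assert len(bounded) == 1       # cardinality fixed by the label's value set
```

The observation count is identical in both stores; only the series count differs, which is why swapping `stage_name` for a bounded dimension like the stage type loses no timing data.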


# --- Gauges ---
# Tracks the current number of concurrently running pipelines
pipeline_active_executions = Gauge(
'pipeline_active_executions',
'Number of currently running pipeline executions',
['pipeline_id']
)
149 changes: 149 additions & 0 deletions docs/monitoring/grafana_dashboard.json
@@ -0,0 +1,149 @@
{
"annotations": {
"list": []
},
"editable": true,
"fiscalYearStartMonth": 0,
"graphTooltip": 0,
"links": [],
"liveNow": false,
"panels": [
{
"collapsed": false,
"gridPos": {
"h": 8,
"w": 6,
"x": 0,
"y": 0
},
"id": 1,
"panels": [],
"title": "Pipeline Success Rate (%)",
"type": "stat",
"datasource": "Prometheus",
"targets": [
{
"expr": "sum(rate(pipeline_executions_total{status=\"success\"}[5m])) / sum(rate(pipeline_executions_total[5m])) * 100",
"refId": "A"
}
Comment on lines +24 to +28

⚠️ Potential issue | 🟡 Minor


Handle idle windows in the success-rate query.

When no executions occur in a 5-minute window, both sum(rate(...)) expressions return empty series. The denominator becomes 0 or absent, causing the division to produce NaN/N/A instead of 0%.

A fallback using or vector(0) (optionally with clamp_min()) would work; a cleaner pattern, however, filters out the zero denominator before dividing:

📊 Alternative approach
(
  100 * sum(rate(pipeline_executions_total{status="success"}[5m]))
  /
  sum(rate(pipeline_executions_total[5m]))
)
and on() (sum(rate(pipeline_executions_total[5m])) > 0)
or on() vector(0)

This filters out the division when the denominator is 0, then falls back to 0 when the entire expression returns no series.


],
"options": {
"colorMode": "value",
"graphMode": "area",
"justifyMode": "auto",
"orientation": "auto",
"reduceOptions": {
"calcs": [
"lastNotNull"
],
"fields": "",
"values": false
},
"textMode": "auto"
}
},
{
"collapsed": false,
"gridPos": {
"h": 8,
"w": 6,
"x": 6,
"y": 0
},
"id": 2,
"panels": [],
"title": "Active Pipeline Executions",
"type": "gauge",
"datasource": "Prometheus",
"targets": [
{
"expr": "sum(pipeline_active_executions)",
"refId": "A"
}
],
"options": {
"reduceOptions": {
"calcs": [
"lastNotNull"
],
"fields": "",
"values": false
},
"showThresholdLabels": false,
"showThresholdMarkers": true,
"text": {}
}
},
{
"collapsed": false,
"gridPos": {
"h": 8,
"w": 12,
"x": 12,
"y": 0
},
"id": 3,
"panels": [],
"title": "Pipeline Execution Duration (95th Percentile)",
"type": "timeseries",
"datasource": "Prometheus",
"targets": [
{
"expr": "histogram_quantile(0.95, sum(rate(pipeline_execution_duration_seconds_bucket[5m])) by (le, pipeline_id))",
"legendFormat": "{{pipeline_id}} (95% CI)",
"refId": "A"
Comment on lines +92 to +94

⚠️ Potential issue | 🟡 Minor

Rename the legend; this is a percentile, not a confidence interval.

histogram_quantile(0.95, ...) returns a p95 estimate. Labeling it 95% CI is statistically incorrect and will mislead dashboard readers.

📝 Proposed fix
-                    "legendFormat": "{{pipeline_id}} (95% CI)",
+                    "legendFormat": "{{pipeline_id}} p95",

}
],
"options": {
"tooltip": {
"mode": "single",
"sort": "none"
}
}
},
{
"collapsed": false,
"gridPos": {
"h": 8,
"w": 24,
"x": 0,
"y": 8
},
"id": 4,
"panels": [],
"title": "Global Pipeline Throughput (Executions per Minute)",
"type": "timeseries",
"datasource": "Prometheus",
"targets": [
{
"expr": "sum(rate(pipeline_executions_total[1m])) * 60",
"legendFormat": "Executions / Min",
"refId": "A"
}
],
"options": {
"tooltip": {
"mode": "single",
"sort": "none"
}
}
}
],
"refresh": "5s",
"schemaVersion": 38,
"style": "dark",
"tags": [
"flexiroaster",
"pipelines"
],
"templating": {
"list": []
},
"time": {
"from": "now-1h",
"to": "now"
},
"timepicker": {},
"timezone": "",
"title": "FlexiRoaster Pipeline Monitoring"
}