-
-
Notifications
You must be signed in to change notification settings - Fork 5
feat(monitoring): add prometheus metrics collection and grafana dashboard #100
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -4,10 +4,18 @@ | |
| """ | ||
| import traceback | ||
| from datetime import datetime | ||
| import time | ||
| from typing import Dict, Any, Optional | ||
|
|
||
| from backend.models.pipeline import Pipeline, Stage, Execution, ExecutionStatus, LogLevel, StageType | ||
| from backend.core.pipeline_engine import PipelineEngine | ||
| from backend.monitoring.metrics import ( | ||
| pipeline_executions_total, | ||
| pipeline_failures_total, | ||
| pipeline_execution_duration_seconds, | ||
| stage_execution_duration_seconds, | ||
| pipeline_active_executions | ||
| ) | ||
|
|
||
|
|
||
| class PipelineExecutor: | ||
|
|
@@ -36,6 +44,10 @@ def execute(self, pipeline: Pipeline) -> Execution: | |
| execution.status = ExecutionStatus.RUNNING | ||
| execution.add_log(None, LogLevel.INFO, f"Starting pipeline execution: {pipeline.name}") | ||
|
|
||
| # Track active pipeline start | ||
| pipeline_active_executions.labels(pipeline_id=pipeline.id).inc() | ||
| start_time = time.time() | ||
|
|
||
| try: | ||
| # Get execution order (topological sort) | ||
| execution_order = self.engine.get_execution_order(pipeline) | ||
|
|
@@ -60,6 +72,12 @@ def execute(self, pipeline: Pipeline) -> Execution: | |
| f"Pipeline completed successfully in {execution.duration:.2f}s" | ||
| ) | ||
|
|
||
| # Record success metrics | ||
| duration = time.time() - start_time | ||
| pipeline_executions_total.labels(pipeline_id=pipeline.id, status='success').inc() | ||
| pipeline_execution_duration_seconds.labels(pipeline_id=pipeline.id).observe(duration) | ||
| pipeline_active_executions.labels(pipeline_id=pipeline.id).dec() | ||
|
Comment on lines
+75
to
+79
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🟡 Success-path metrics recording inside try block can overwrite COMPLETED status to FAILED The Prometheus metrics recording for the success path (lines 76-79) is placed inside the `try` block, after `execution.status` has already been set to COMPLETED. If any of these metrics calls raises, control falls into the `except` handler, which overwrites the status to FAILED even though the pipeline itself succeeded. Move the success-path metrics after the status is finalized (e.g. into a `finally` block keyed on the final status) so a metrics failure cannot corrupt the execution result. Prompt for agentsWas this helpful? React with 👍 or 👎 to provide feedback. |
||
|
|
||
| except Exception as e: | ||
| # Handle execution failure | ||
| execution.status = ExecutionStatus.FAILED | ||
|
|
@@ -71,6 +89,13 @@ def execute(self, pipeline: Pipeline) -> Execution: | |
| f"Pipeline execution failed: {str(e)}", | ||
| metadata={"traceback": traceback.format_exc()} | ||
| ) | ||
|
|
||
| # Record failure metrics | ||
| duration = time.time() - start_time | ||
| pipeline_executions_total.labels(pipeline_id=pipeline.id, status='failed').inc() | ||
| pipeline_failures_total.labels(pipeline_id=pipeline.id).inc() | ||
| pipeline_execution_duration_seconds.labels(pipeline_id=pipeline.id).observe(duration) | ||
| pipeline_active_executions.labels(pipeline_id=pipeline.id).dec() | ||
|
Comment on lines
+92
to
+98
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🔴 pipeline_active_executions gauge never decremented if metrics recording in except block raises The `pipeline_active_executions.labels(...).dec()` call sits after the other metrics calls inside the `except` handler; if any of those earlier calls raises, the decrement is skipped and the gauge stays elevated forever. Move the `dec()` into a `finally` block so the gauge is always balanced regardless of how metrics recording fails. Prompt for agentsWas this helpful? React with 👍 or 👎 to provide feedback. |
||
|
|
||
| return execution | ||
|
|
||
|
|
@@ -83,6 +108,7 @@ def _execute_stage(self, stage: Stage, execution: Execution) -> None: | |
| execution: Current execution context | ||
| """ | ||
| execution.add_log(stage.id, LogLevel.INFO, f"Starting stage: {stage.name}") | ||
| stage_start_time = time.time() | ||
|
|
||
| try: | ||
| # Execute based on stage type | ||
|
|
@@ -107,13 +133,29 @@ def _execute_stage(self, stage: Stage, execution: Execution) -> None: | |
| metadata={"result_keys": list(result.keys()) if isinstance(result, dict) else None} | ||
| ) | ||
|
|
||
| # Record stage success metrics | ||
| stage_duration = time.time() - stage_start_time | ||
| stage_execution_duration_seconds.labels( | ||
| pipeline_id=execution.pipeline_id, | ||
| stage_name=stage.name, | ||
| status='success' | ||
| ).observe(stage_duration) | ||
|
|
||
| except Exception as e: | ||
| execution.add_log( | ||
| stage.id, | ||
| LogLevel.ERROR, | ||
| f"Stage failed: {str(e)}", | ||
| metadata={"traceback": traceback.format_exc()} | ||
| ) | ||
|
|
||
| # Record stage failure metrics | ||
| stage_duration = time.time() - stage_start_time | ||
| stage_execution_duration_seconds.labels( | ||
| pipeline_id=execution.pipeline_id, | ||
| stage_name=stage.name, | ||
| status='failed' | ||
| ).observe(stage_duration) | ||
| raise | ||
|
|
||
| def _execute_input_stage(self, stage: Stage, execution: Execution) -> Dict[str, Any]: | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -4,7 +4,8 @@ | |
| """ | ||
| from fastapi import Depends, FastAPI, Request | ||
| from fastapi.middleware.cors import CORSMiddleware | ||
| from fastapi.responses import JSONResponse | ||
| from fastapi.responses import JSONResponse, Response | ||
| from prometheus_client import generate_latest, CONTENT_TYPE_LATEST | ||
| from datetime import datetime | ||
| import uvicorn | ||
|
|
||
|
|
@@ -71,6 +72,13 @@ async def health_check(): | |
| } | ||
|
|
||
|
|
||
| # Prometheus Metrics scraping endpoint | ||
| @app.get("/metrics", tags=["monitoring"]) | ||
| async def prometheus_metrics(): | ||
| """Prometheus metrics endpoint""" | ||
| return Response(generate_latest(), media_type=CONTENT_TYPE_LATEST) | ||
|
Comment on lines
+75
to
+79
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Protect When 🔒 Proposed fix+# Define this before the route declarations.
+metrics_dependencies = [Depends(get_current_auth_context)] if settings.AUTH_ENABLED else []
+
-@app.get("/metrics", tags=["monitoring"])
+@app.get("/metrics", tags=["monitoring"], include_in_schema=False, dependencies=metrics_dependencies)
async def prometheus_metrics():
"""Prometheus metrics endpoint"""
return Response(generate_latest(), media_type=CONTENT_TYPE_LATEST)🤖 Prompt for AI Agents |
||
|
|
||
|
|
||
| # Root endpoint | ||
| @app.get("/", tags=["root"]) | ||
| async def root(): | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,52 @@ | ||
| """ | ||
| Prometheus Metrics Definitions for Pipeline Monitoring | ||
| """ | ||
| import os | ||
| from prometheus_client import Counter, Histogram, Gauge, REGISTRY | ||
|
|
||
| # Disable the default process/platform metrics for cleaner app-specific dashboards if desired | ||
| # from prometheus_client import process_collector, platform_collector, gc_collector | ||
| # REGISTRY.unregister(process_collector.ProcessCollector()) | ||
| # REGISTRY.unregister(platform_collector.PlatformCollector()) | ||
| # REGISTRY.unregister(gc_collector.GCCollector()) | ||
|
|
||
| # --- Counters --- | ||
| # Tracks the total number of pipeline executions, labeled by pipeline_id and final status | ||
| pipeline_executions_total = Counter( | ||
| 'pipeline_executions_total', | ||
| 'Total number of pipeline executions', | ||
| ['pipeline_id', 'status'] | ||
| ) | ||
|
|
||
| # Tracks the total number of explicit failures, labeled by pipeline_id | ||
| pipeline_failures_total = Counter( | ||
| 'pipeline_failures_total', | ||
| 'Total number of failed pipeline executions', | ||
| ['pipeline_id'] | ||
| ) | ||
|
|
||
| # --- Histograms --- | ||
| # Tracks the distribution of total end-to-end execution duration for a pipeline | ||
| # Buckets: 1s, 5s, 15s, 30s, 1m, 2m, 5m, 10m, 15m, 30m, 1h, +Inf | ||
| pipeline_execution_duration_seconds = Histogram( | ||
| 'pipeline_execution_duration_seconds', | ||
| 'Pipeline execution duration in seconds', | ||
| ['pipeline_id'], | ||
| buckets=(1.0, 5.0, 15.0, 30.0, 60.0, 120.0, 300.0, 600.0, 900.0, 1800.0, 3600.0, float('inf')) | ||
| ) | ||
|
|
||
| # Tracks the duration of individual stages within a pipeline | ||
| stage_execution_duration_seconds = Histogram( | ||
| 'stage_execution_duration_seconds', | ||
| 'Stage execution duration in seconds', | ||
| ['pipeline_id', 'stage_name', 'status'], | ||
| buckets=(0.1, 0.5, 1.0, 5.0, 15.0, 30.0, 60.0, 120.0, 300.0, float('inf')) | ||
| ) | ||
|
Comment on lines
+39
to
+44
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🧩 Analysis chain🏁 Script executed: #!/bin/bash
# Verify that `Stage.name` is unconstrained and flows directly into the metric label.
rg -n -C2 'class Stage|name:\s*str|stage_execution_duration_seconds|stage_name\s*=\s*stage\.name' backendRepository: fuzziecoder/Flexi-Roaster Length of output: 8075 Avoid unbounded
🤖 Prompt for AI Agents |
||
|
|
||
| # --- Gauges --- | ||
| # Tracks the current number of concurrently running pipelines | ||
| pipeline_active_executions = Gauge( | ||
| 'pipeline_active_executions', | ||
| 'Number of currently running pipeline executions', | ||
| ['pipeline_id'] | ||
| ) | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,149 @@ | ||
| { | ||
| "annotations": { | ||
| "list": [] | ||
| }, | ||
| "editable": true, | ||
| "fiscalYearStartMonth": 0, | ||
| "graphTooltip": 0, | ||
| "links": [], | ||
| "liveNow": false, | ||
| "panels": [ | ||
| { | ||
| "collapsed": false, | ||
| "gridPos": { | ||
| "h": 8, | ||
| "w": 6, | ||
| "x": 0, | ||
| "y": 0 | ||
| }, | ||
| "id": 1, | ||
| "panels": [], | ||
| "title": "Pipeline Success Rate (%)", | ||
| "type": "stat", | ||
| "datasource": "Prometheus", | ||
| "targets": [ | ||
| { | ||
| "expr": "sum(rate(pipeline_executions_total{status=\"success\"}[5m])) / sum(rate(pipeline_executions_total[5m])) * 100", | ||
| "refId": "A" | ||
| } | ||
|
Comment on lines
+24
to
+28
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🧩 Analysis chain🌐 Web query:
💡 Result: Use a filter to drop the divide-by-zero series, then fallback to 0 when the result is empty:
If you’re aggregating (common case), apply the same idea to the aggregated vectors: Sources: [1] https://grafana.com/docs/grafana/latest/alerting/guides/missing-data/ (Prometheus example: 🏁 Script executed: find . -name "grafana_dashboard.json" -type fRepository: fuzziecoder/Flexi-Roaster Length of output: 109 🏁 Script executed: # Once we find the file, let's examine it
cat -n docs/monitoring/grafana_dashboard.json | head -100Repository: fuzziecoder/Flexi-Roaster Length of output: 3701 Handle idle windows in the success-rate query. When no executions occur in a 5-minute window, both The proposed fix using 📊 Alternative approachThis filters out the division when the denominator is 0, then falls back to 0 when the entire expression returns no series. 🤖 Prompt for AI Agents |
||
| ], | ||
| "options": { | ||
| "colorMode": "value", | ||
| "graphMode": "area", | ||
| "justifyMode": "auto", | ||
| "orientation": "auto", | ||
| "reduceOptions": { | ||
| "calcs": [ | ||
| "lastNotNull" | ||
| ], | ||
| "fields": "", | ||
| "values": false | ||
| }, | ||
| "textMode": "auto" | ||
| } | ||
| }, | ||
| { | ||
| "collapsed": false, | ||
| "gridPos": { | ||
| "h": 8, | ||
| "w": 6, | ||
| "x": 6, | ||
| "y": 0 | ||
| }, | ||
| "id": 2, | ||
| "panels": [], | ||
| "title": "Active Pipeline Executions", | ||
| "type": "gauge", | ||
| "datasource": "Prometheus", | ||
| "targets": [ | ||
| { | ||
| "expr": "sum(pipeline_active_executions)", | ||
| "refId": "A" | ||
| } | ||
| ], | ||
| "options": { | ||
| "reduceOptions": { | ||
| "calcs": [ | ||
| "lastNotNull" | ||
| ], | ||
| "fields": "", | ||
| "values": false | ||
| }, | ||
| "showThresholdLabels": false, | ||
| "showThresholdMarkers": true, | ||
| "text": {} | ||
| } | ||
| }, | ||
| { | ||
| "collapsed": false, | ||
| "gridPos": { | ||
| "h": 8, | ||
| "w": 12, | ||
| "x": 12, | ||
| "y": 0 | ||
| }, | ||
| "id": 3, | ||
| "panels": [], | ||
| "title": "Pipeline Execution Duration (95th Percentile)", | ||
| "type": "timeseries", | ||
| "datasource": "Prometheus", | ||
| "targets": [ | ||
| { | ||
| "expr": "histogram_quantile(0.95, sum(rate(pipeline_execution_duration_seconds_bucket[5m])) by (le, pipeline_id))", | ||
| "legendFormat": "{{pipeline_id}} (95% CI)", | ||
| "refId": "A" | ||
|
Comment on lines
+92
to
+94
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Rename the legend; this is a percentile, not a confidence interval.
📝 Proposed fix- "legendFormat": "{{pipeline_id}} (95% CI)",
+ "legendFormat": "{{pipeline_id}} p95",🤖 Prompt for AI Agents |
||
| } | ||
| ], | ||
| "options": { | ||
| "tooltip": { | ||
| "mode": "single", | ||
| "sort": "none" | ||
| } | ||
| } | ||
| }, | ||
| { | ||
| "collapsed": false, | ||
| "gridPos": { | ||
| "h": 8, | ||
| "w": 24, | ||
| "x": 0, | ||
| "y": 8 | ||
| }, | ||
| "id": 4, | ||
| "panels": [], | ||
| "title": "Global Pipeline Throughput (Executions per Minute)", | ||
| "type": "timeseries", | ||
| "datasource": "Prometheus", | ||
| "targets": [ | ||
| { | ||
| "expr": "sum(rate(pipeline_executions_total[1m])) * 60", | ||
| "legendFormat": "Executions / Min", | ||
| "refId": "A" | ||
| } | ||
| ], | ||
| "options": { | ||
| "tooltip": { | ||
| "mode": "single", | ||
| "sort": "none" | ||
| } | ||
| } | ||
| } | ||
| ], | ||
| "refresh": "5s", | ||
| "schemaVersion": 38, | ||
| "style": "dark", | ||
| "tags": [ | ||
| "flexiroaster", | ||
| "pipelines" | ||
| ], | ||
| "templating": { | ||
| "list": [] | ||
| }, | ||
| "time": { | ||
| "from": "now-1h", | ||
| "to": "now" | ||
| }, | ||
| "timepicker": {}, | ||
| "timezone": "", | ||
| "title": "FlexiRoaster Pipeline Monitoring" | ||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🧩 Analysis chain
🌐 Web query:
In Python, what clock is recommended for measuring elapsed durations: `time.time()` or `time.perf_counter()` / `time.monotonic()`?💡 Result:
For measuring elapsed durations, prefer a monotonic clock:
`time.perf_counter()` (or `time.perf_counter_ns()`) for timing/benchmarking elapsed time, because it’s monotonic and uses the highest available resolution clock. [1], [2]`time.monotonic()` (or `time.monotonic_ns()`) when you specifically want a monotonic elapsed-time clock (also immune to system clock changes), typically with slightly less “benchmark-focused” intent than `perf_counter()`. [1], [3]`time.time()` for durations is discouraged because it’s “wall clock” time and can jump forwards/backwards if the system time is adjusted (NTP, manual changes), which can break elapsed-time measurements. [1], [3]
perf_counter()(most common); timestamps/“seconds since epoch” →time.time(). [1], [3]Sources:
[1] PEP 418 (monotonic/perf_counter rationale and use) (peps.python.org)
[2] Python
timemodule docs (perf_counter) (docs.python.org)[3] Python
timemodule docs (clock semantics; monotonic vs realtime) (docs.python.org)Citations:
🏁 Script executed:
Repository: fuzziecoder/Flexi-Roaster
Length of output: 131
🏁 Script executed:
head -160 backend/core/executor.py | cat -nRepository: fuzziecoder/Flexi-Roaster
Length of output: 7403
🏁 Script executed:
Repository: fuzziecoder/Flexi-Roaster
Length of output: 51
Use a monotonic clock for elapsed-time metrics.
time.time()tracks wall time. NTP or manual clock changes can move it backward or forward and skew these histogram observations.time.perf_counter()is the recommended approach for measuring elapsed durations.⏱️ Proposed refactor
Also applies to: 49, 76, 94, 111, 137, 153
🤖 Prompt for AI Agents