Hackathon Project: Automatic root cause analysis for data pipeline failures using OpenMetadata lineage and AI reasoning.
When a data asset breaks (a dbt test fails, a column gets renamed, a pipeline produces nulls), FixFlow automatically walks the column-level lineage graph to find the exact breaking node, then explains the root cause in plain English and surfaces a fix.
A GitHub PR bot catches schema-breaking changes before they're merged, posting AI-generated impact warnings directly in pull request comments.
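The upstream lineage walk described above can be sketched as a breadth-first search over an in-memory graph. This is a minimal illustration, not the project's actual implementation: the function name `find_breaking_node`, the `LINEAGE` mapping, and the `BROKEN` set are all hypothetical, and a real run would fetch lineage from the OpenMetadata API instead of a dictionary.

```python
from collections import deque
from typing import Callable, Dict, List, Optional


def find_breaking_node(
    upstream: Dict[str, List[str]],
    is_broken: Callable[[str], bool],
    start: str,
) -> Optional[str]:
    """Walk upstream from `start` and return the first broken node whose
    own parents are all healthy, i.e. the likely origin of the failure."""
    queue = deque([start])
    seen = {start}
    while queue:
        node = queue.popleft()
        if not is_broken(node):
            continue  # healthy nodes cannot be the origin
        broken_parents = [p for p in upstream.get(node, []) if is_broken(p)]
        if not broken_parents:
            return node  # broken, but everything feeding it is fine
        for parent in broken_parents:
            if parent not in seen:
                seen.add(parent)
                queue.append(parent)
    return None


# Hypothetical column-level lineage: each key lists the columns it reads from.
LINEAGE = {
    "mart.orders.user_id": ["staging.orders.user_id"],
    "staging.orders.user_id": ["raw.orders.user_id", "raw.users.id"],
    "raw.orders.user_id": [],
    "raw.users.id": [],
}
# Hypothetical set of columns currently known to be failing.
BROKEN = {"mart.orders.user_id", "staging.orders.user_id", "raw.orders.user_id"}

root = find_breaking_node(LINEAGE, BROKEN.__contains__, "mart.orders.user_id")
print(root)
```

Following only broken parents keeps the search narrow: healthy branches such as `raw.users.id` are never expanded.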
- 3 Input Sources: dbt webhooks, GitHub PR webhooks, manual chat queries
- Lineage Traversal: Real-time upstream navigation via OpenMetadata API
- Schema Diff Detection: Identifies breaking changes (renames, drops, type changes)
- AI Root Cause Analysis: Groq (llama3-70b-8192) with structured JSON responses
- Chat Interface: Multi-turn conversation with investigation context
- GitHub PR Bot: Auto-comment with impact analysis before merge
- 70+ Comprehensive Tests: 85%+ coverage, including edge cases and error handling
- Docker Compose: Full stack deployment with OpenMetadata, MongoDB, Elasticsearch
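To make the "Schema Diff Detection" feature above concrete, here is a small sketch of how renames, drops, and type changes could be told apart when comparing two `{column: type}` mappings. The function `diff_schema`, the output dictionary shape, and the rename heuristic (a dropped column whose type matches exactly one added column) are assumptions for illustration; the project's own detection logic may differ.

```python
from typing import Dict, List


def diff_schema(old: Dict[str, str], new: Dict[str, str]) -> List[dict]:
    """Compare two {column: type} mappings and list breaking changes."""
    changes: List[dict] = []
    dropped = [c for c in old if c not in new]
    added = [c for c in new if c not in old]
    for col in dropped:
        # Heuristic (assumption): a dropped column whose type matches exactly
        # one newly added column is treated as a rename, otherwise as a drop.
        candidates = [a for a in added if new[a] == old[col]]
        if len(candidates) == 1:
            changes.append({"kind": "rename", "column": col, "to": candidates[0]})
            added.remove(candidates[0])
        else:
            changes.append({"kind": "drop", "column": col})
    for col in old:
        if col in new and old[col] != new[col]:
            changes.append(
                {"kind": "type_change", "column": col, "from": old[col], "to": new[col]}
            )
    return changes


old_schema = {"user_id": "INT", "email": "VARCHAR", "amount": "INT", "legacy_flag": "BOOLEAN"}
new_schema = {"customer_id": "INT", "email": "VARCHAR", "amount": "DECIMAL"}

breaking = diff_schema(old_schema, new_schema)
for change in breaking:
    print(change)
```

Columns that are only added are ignored here on the assumption that they do not break downstream consumers.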
| Component | Layer | Status | Code Location |
|---|---|---|---|
| dbt Test Webhook | Input | ✅ Complete | routes/events.py |
| GitHub PR Webhook | Input | ✅ Complete | routes/github.py |
| Manual Query (Chat) | Input | ✅ Complete | routes/chats.py |
| Event Router | Core | ✅ Complete | controllers/event_controller.py |
| Lineage Engine | Core | ✅ Complete | controllers/lineage_controller.py |
| Context Builder | Core | ✅ Complete | controllers/investigation_controller.py |
| AI Reasoning Layer | Core | ✅ Complete | controllers/investigation_controller.py |
| Chat UI | Frontend | ✅ Complete | frontend/app/components/ |
| Lineage Visualization | Frontend | ✅ Complete | frontend/app/components/LineageVisualizer.tsx |

- Backend: 100% Complete (7 of 7 components)
- Tests: 70+ comprehensive test cases
- Frontend: 90% Complete (7 of 7 components implemented)

| Endpoint | Method | Status | Notes |
|---|---|---|---|
| /health | GET | ✅ 200 OK | |
| /api/v1/users/register | POST | ✅ 201 Created | Returns JWT token |
| /api/v1/users/login | POST | ✅ 200 OK | Body JSON, not query params |
| /api/v1/users/me | GET | ✅ 200 OK | Bearer token required |
| /api/v1/connections | POST | ✅ 201 Created | Use name + openmetadata_host fields |
| /api/v1/connections | GET | ✅ 200 OK | Returns masked tokens |
| /api/v1/events/manual-query | POST | ✅ 202 Accepted | Starts async investigation |
| /api/v1/investigations | GET | ✅ 200 OK | Returns [] when empty |
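
Because `/api/v1/events/manual-query` returns 202 and `/api/v1/investigations` returns `[]` until a result exists, a client has to poll. The sketch below shows one way to do that with a bounded retry loop. The helper `poll_until_ready`, its parameters, and the shape of the investigation record are hypothetical; `fetch` stands in for whatever function performs the authenticated GET.

```python
import time
from typing import Callable, List, Optional


def poll_until_ready(
    fetch: Callable[[], List[dict]],
    attempts: int = 10,
    delay_seconds: float = 2.0,
    sleep: Callable[[float], None] = time.sleep,
) -> Optional[List[dict]]:
    """Call `fetch` until it returns a non-empty list or attempts run out."""
    for attempt in range(attempts):
        results = fetch()
        if results:
            return results
        if attempt < attempts - 1:
            sleep(delay_seconds)  # wait before the next try
    return None


# Stand-in for GET /api/v1/investigations: empty twice, then one record.
# The record fields are illustrative, not the API's documented schema.
_responses = iter([[], [], [{"id": "inv-1", "status": "completed"}]])

found = poll_until_ready(lambda: next(_responses), attempts=5, sleep=lambda s: None)
print(found)
```

Injecting `sleep` keeps the loop easy to test; production code would leave the default `time.sleep` in place.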
- Docker Desktop (running)
- 8GB+ RAM (Elasticsearch needs ~2GB)
git clone https://github.com/Krishna41357/Pipeline-Autopsy.git
cd Pipeline-Autopsy
Create a `.env` file at the project root (same level as `docker-compose.yml`):
SECRET_KEY=your-secret-key-min-32-chars-change-this
GROQ_API_KEY=gsk_your_groq_key_here
DEFAULT_LLM_PROVIDER=groq
AI_MODEL=llama3-70b-8192
OPENMETADATA_API_KEY=your-openmetadata-bot-token
DEBUG=true
⚠️ Important: The root `.env` is for Docker Compose. `server/.env` is for local non-Docker development. Both must exist with appropriate values.
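
Since a blank or missing key in either `.env` file tends to surface only later as a runtime failure, a quick pre-flight check can help. The following is a sketch, not part of the project: `parse_env`, `check_env`, and `REQUIRED_KEYS` are hypothetical names, and the 32-character minimum for `SECRET_KEY` is inferred from the placeholder value above rather than from a documented rule.

```python
from typing import Dict, List

# Keys taken from the example .env above; treating them as required is an assumption.
REQUIRED_KEYS = ["SECRET_KEY", "GROQ_API_KEY", "OPENMETADATA_API_KEY"]


def parse_env(text: str) -> Dict[str, str]:
    """Parse simple KEY=VALUE lines, skipping blanks and comments."""
    values: Dict[str, str] = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#") or "=" not in line:
            continue
        key, _, value = line.partition("=")
        values[key.strip()] = value.strip()
    return values


def check_env(values: Dict[str, str]) -> List[str]:
    """Return human-readable problems; an empty list means the file looks fine."""
    problems = [f"{k} is missing or blank" for k in REQUIRED_KEYS if not values.get(k)]
    if 0 < len(values.get("SECRET_KEY", "")) < 32:
        problems.append("SECRET_KEY is shorter than 32 characters")
    return problems


SAMPLE = """
SECRET_KEY=short
GROQ_API_KEY=gsk_example
OPENMETADATA_API_KEY=
DEBUG=true
"""

problems = check_env(parse_env(SAMPLE))
for problem in problems:
    print(problem)
```

The same check can be run against both the root `.env` and `server/.env` by reading each file's text and passing it to `parse_env`.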
docker pull mongo:7.0
docker pull postgres:13
docker pull elasticsearch:8.10.2
docker pull openmetadata/server:1.3.1
docker-compose up -d
OpenMetadata takes ~2-3 minutes to boot. Watch progress:
docker-compose logs -f openmetadata-server
Wait until you see:
Started @Xms to org.eclipse.jetty
- Open `http://localhost:8585` in your browser
- Sign up / log in
- Navigate to Settings → Integrations → Bots → ingestion-bot
- Copy the JWT Token
- Update `OPENMETADATA_API_KEY` in your root `.env`
- Restart backend:
docker-compose restart backend
curl http://localhost:8000/health
# Expected: {"status":"ok","service":"ks-rag","version":"1.0.0"}
curl http://localhost:8585/api/v1/system/status
# Expected: {"status":"healthy"}
- Python 3.11 (recommended) or 3.10+
- MongoDB running locally
- Node.js 18+
cd server
# Create virtual environment
python -m venv venv
source venv/bin/activate # Windows: venv\Scripts\activate
# Install dependencies
pip install -r requirements.txt
Configure `server/.env`:
# Database: must be rag_database (hardcoded in controllers)
MONGO_URI=mongodb://localhost:27017/rag_database
# Authentication
SECRET_KEY=your-secret-key-min-32-chars
# OpenMetadata
OPENMETADATA_URL=http://localhost:8585
OPENMETADATA_API_KEY=your-ingestion-bot-token
# AI: Groq recommended
GROQ_API_KEY=gsk_your_key_here
DEFAULT_LLM_PROVIDER=groq
AI_MODEL=llama3-70b-8192
# API
CORS_ORIGINS=["http://localhost:3000"]
APP_HOST=0.0.0.0
APP_PORT=8000
DEBUG=true
python app.py
# Server starts on http://localhost:8000
cd frontend
npm install
echo "NEXT_PUBLIC_API_BASE_URL=http://localhost:8000" > .env.local
npm run dev
# Frontend starts on http://localhost:3000
The following API details were discovered during live testing (April 12, 2026):
1. Connection fields use different names than old docs:
// ✅ Correct
{
"name": "Production",
"openmetadata_host": "http://localhost:8585",
"openmetadata_token": "eyJ...",
"github_repo": "owner/repo" // optional, must match pattern owner/repo
}
// ❌ Wrong (old docs)
{
"workspace_name": "Production",
"openmetadata_url": "http://localhost:8585"
}
2. Manual query fields:
// ✅ Correct
{
"asset_name": "sample_data.ecommerce_db.shopify.dim_customer",
"question": "Why is this table failing?",
"connection_id": "your-connection-id"
}
// ❌ Wrong (old docs)
{
"asset_fqn": "...",
"failure_query": "..."
}
3. Login takes JSON body (not query params):
# ✅ Correct
POST /api/v1/users/login
Body: {"email": "user@example.com", "password": "Testpass123"}
# ❌ Wrong
POST /api/v1/users/login?email=...&password=...
# Register
curl -X POST http://localhost:8000/api/v1/users/register \
-H "Content-Type: application/json" \
-d '{
"email": "user@example.com",
"username": "myusername",
"password": "Testpass123",
"full_name": "Optional Name"
}'
# Returns: {"access_token": "eyJ...", "token_type": "bearer"}
# Login
curl -X POST http://localhost:8000/api/v1/users/login \
-H "Content-Type: application/json" \
-d '{"email": "user@example.com", "password": "Testpass123"}'
curl -X POST http://localhost:8000/api/v1/connections \
-H "Authorization: Bearer YOUR_TOKEN" \
-H "Content-Type: application/json" \
-d '{
"name": "Production",
"openmetadata_host": "http://localhost:8585",
"openmetadata_token": "your-ingestion-bot-token"
}'
# Returns: {"id": "...", "name": "Production", ...}
# Step 1: Create event (starts investigation automatically)
curl -X POST http://localhost:8000/api/v1/events/manual-query \
-H "Authorization: Bearer YOUR_TOKEN" \
-H "Content-Type: application/json" \
-d '{
"asset_name": "sample_data.ecommerce_db.shopify.dim_customer",
"question": "Why is this table failing?",
"connection_id": "YOUR_CONNECTION_ID"
}'
# Returns: {"event_id": "...", "status": "accepted", "message": "Investigation started"}
# Step 2: Poll for results
curl http://localhost:8000/api/v1/investigations \
-H "Authorization: Bearer YOUR_TOKEN"
curl -X POST http://localhost:8000/api/v1/events/dbt-webhook \
-H "Content-Type: application/json" \
-d '{
"data": {
"run_id": "dbt_run_123",
"node_id": "model.proj.orders",
"error_message": "Column user_id not found",
"status": "error"
}
}'
# Create session
curl -X POST http://localhost:8000/api/v1/chats \
-H "Authorization: Bearer YOUR_TOKEN" \
-H "Content-Type: application/json" \
-d '{"title": "Orders Investigation"}'
# Send query
curl -X POST http://localhost:8000/api/v1/chats/SESSION_ID/query \
-H "Authorization: Bearer YOUR_TOKEN" \
-H "Content-Type: application/json" \
-d '{"message": "Why is my pipeline breaking?"}'
┌───────────────────────────────────────────────────────────┐
│                 INPUT TRIGGERS (Layer 1)                  │
├───────────────────┬───────────────────┬───────────────────┤
│     dbt Test      │     GitHub PR     │   Manual Query    │
│      Webhook      │      Webhook      │      (Chat)       │
└─────────┬─────────┴─────────┬─────────┴─────────┬─────────┘
          │                   │                   │
          └───────────────────┼───────────────────┘
                              ▼
          ┌───────────────────────────────────────┐
          │        EVENT ROUTER (Layer 2)         │
          │  Normalize all inputs → FailureEvent  │
          └───────────────────┬───────────────────┘
                              │
          ┌───────────────────┴───────────────────┐
          │        BACKEND CORE (Layer 3)         │
          ├───────────────────────────────────────┤
          │ - Lineage Traversal                   │
          │   (OpenMetadata REST API)             │
          │ - Schema Diff Detection               │
          │ - Context Building                    │
          │ - AI Root Cause Analysis              │
          │   (Groq llama3-70b-8192)              │
          └───────────────────┬───────────────────┘
                              │
          ┌───────────────────┴───────────────────┐
          │           OUTPUTS (Layer 4)           │
          ├───────────────────┬───────────────────┤
          │   Chat Response   │  GitHub Comment   │
          │    JSON Result    │  Formatted Text   │
          └───────────────────┴───────────────────┘
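
The Event Router step in the diagram (normalizing every input into a `FailureEvent`) might look roughly like the sketch below. Only the name `FailureEvent` and the payload field names (`node_id`, `error_message`, `asset_name`, `question`, `connection_id`) come from this README; the dataclass fields and the two `from_*` helpers are hypothetical and chosen for illustration.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class FailureEvent:
    """Common shape every trigger is reduced to (field names are assumptions)."""
    source: str
    asset: str
    detail: str
    connection_id: Optional[str] = None


def from_dbt_webhook(payload: dict) -> FailureEvent:
    """Map the dbt webhook body shown in the curl example to a FailureEvent."""
    data = payload["data"]
    return FailureEvent(source="dbt", asset=data["node_id"], detail=data["error_message"])


def from_manual_query(payload: dict) -> FailureEvent:
    """Map a manual-query body to a FailureEvent."""
    return FailureEvent(
        source="manual",
        asset=payload["asset_name"],
        detail=payload["question"],
        connection_id=payload.get("connection_id"),
    )


dbt_event = from_dbt_webhook(
    {"data": {"run_id": "dbt_run_123", "node_id": "model.proj.orders",
              "error_message": "Column user_id not found", "status": "error"}}
)
manual_event = from_manual_query(
    {"asset_name": "sample_data.ecommerce_db.shopify.dim_customer",
     "question": "Why is this table failing?", "connection_id": "abc123"}
)
print(dbt_event)
print(manual_event)
```

Once both inputs share one type, the lineage and AI layers only need to handle a single event shape.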
Backend:
- Framework: FastAPI 0.104.1
- Database: MongoDB (`rag_database`, hardcoded in controllers)
- Authentication: JWT (python-jose) + bcrypt (direct, no passlib)
- LLM: Groq `llama3-70b-8192` (primary), OpenAI/Claude (fallback)
- External APIs: OpenMetadata REST API, GitHub API
- Testing: Pytest 70+ test cases, 85%+ coverage
Infrastructure (Docker):
- MongoDB 7.0
- PostgreSQL 13 (OpenMetadata backend)
- Elasticsearch 8.10.2 (OpenMetadata search)
- OpenMetadata Server 1.3.1
Frontend:
- Next.js 16 + React 19 + TypeScript
- Tailwind CSS 4.0
- D3.js 7.8.5 (lineage visualization)
All controllers hardcode `rag_database` as the MongoDB database name:
db = client["rag_database"]  # hardcoded in all controllers
Always use `MONGO_URI=mongodb://host:27017/rag_database`.
Authentication uses bcrypt directly, not passlib (passlib is incompatible with bcrypt 4.x+):
import bcrypt as bcrypt_lib
# Passwords truncated to 72 bytes (bcrypt hard limit)
Groq is the recommended provider. Claude/OpenAI keys can be set to `skip` if unused:
DEFAULT_LLM_PROVIDER=groq
AI_MODEL=llama3-70b-8192
OPENAI_API_KEY=skip
CLAUDE_API_KEY=skip
KS-RAG/
├── .env                    ← Docker Compose reads this (root)
├── docker-compose.yml
└── server/
    └── .env                ← Local development reads this
"GROQ_API_KEY variable is not set" warning:
- Ensure `.env` exists at project root (same folder as `docker-compose.yml`)
- Add `env_file: - .env` to the backend service in `docker-compose.yml`
`mongo:7.0-alpine` not found:
- Use `mongo:7.0`; MongoDB does not publish alpine variants for 7.x
OpenMetadata fails with "relation does not exist":
- Database migration hasn't run. Add an `openmetadata-migrate` service to compose:

      openmetadata-migrate:
        image: openmetadata/server:1.3.1
        command: "./bootstrap/bootstrap_storage.sh migrate-all"
        depends_on:
          postgresql:
            condition: service_healthy
        restart: "no"

- Run `docker-compose down -v` then `docker-compose up -d` for a clean start
Investigations return empty `[]`:
- Check that `MONGO_URI` in the running container points to `rag_database`
- Verify `GROQ_API_KEY` and `OPENMETADATA_API_KEY` are not blank in the container
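
The `MONGO_URI` check above can be automated by parsing the database name out of the URI. This is a small sketch using only the standard library; `database_name` and `uses_expected_database` are hypothetical helpers, and it assumes a plain single-host `mongodb://` URI like the ones shown in this README.

```python
from urllib.parse import urlparse

EXPECTED_DB = "rag_database"  # the name the controllers hardcode, per this README


def database_name(mongo_uri: str) -> str:
    """Return the database segment of a mongodb:// URI ('' if absent)."""
    return urlparse(mongo_uri).path.lstrip("/")


def uses_expected_database(mongo_uri: str) -> bool:
    """True when the URI targets the database the controllers expect."""
    return database_name(mongo_uri) == EXPECTED_DB


for uri in (
    "mongodb://localhost:27017/rag_database",
    "mongodb://localhost:27017/test",
    "mongodb://localhost:27017",
):
    print(uri, "->", uses_expected_database(uri))
```

Inside a container, the value to test would typically come from `os.environ["MONGO_URI"]`.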
`docker exec` returns 500 error on Windows:
- Remove the `-it` flag: `docker exec container_name mongosh --eval "..."`
- Or update Docker Desktop to the latest version
Server returns 422 on connection creation:
- Use `name` (not `workspace_name`) and `openmetadata_host` (not `openmetadata_url`)
- `github_repo` must match pattern `owner/repo` or be omitted entirely
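
The two 422 causes above can be caught client-side before the request is sent. The sketch below is illustrative only: `connection_payload_problems` is a hypothetical helper, the regular expression is an assumed approximation of the `owner/repo` pattern, and the server's own Pydantic validation remains the source of truth.

```python
import re
from typing import List

# Assumed approximation of the owner/repo pattern; the server's rule may differ.
REPO_PATTERN = re.compile(r"^[\w.-]+/[\w.-]+$")
# Old field names from earlier docs mapped to the names the API accepts.
RENAMED_FIELDS = {"workspace_name": "name", "openmetadata_url": "openmetadata_host"}


def connection_payload_problems(payload: dict) -> List[str]:
    """List likely reasons a connection payload would be rejected."""
    problems: List[str] = []
    for old, new in RENAMED_FIELDS.items():
        if old in payload:
            problems.append(f"use '{new}' instead of '{old}'")
    for field in ("name", "openmetadata_host"):
        if not payload.get(field):
            problems.append(f"'{field}' is required")
    if "github_repo" in payload:
        repo = payload["github_repo"]
        if not (isinstance(repo, str) and REPO_PATTERN.match(repo)):
            problems.append("'github_repo' must look like owner/repo or be omitted")
    return problems


good = {"name": "Production", "openmetadata_host": "http://localhost:8585",
        "openmetadata_token": "eyJ...", "github_repo": "owner/repo"}
old_style = {"workspace_name": "Production", "openmetadata_url": "http://localhost:8585"}

print(connection_payload_problems(good))
print(connection_payload_problems(old_style))
```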
Token expires after 1 hour:
- User session tokens expire. Use the ingestion-bot JWT token for `OPENMETADATA_API_KEY`; it has `"exp": null` (never expires)
cd server
# Run all 70+ tests
pytest tests/ -v
# Run specific suite
pytest tests/test_auth_controller.py -v
pytest tests/test_lineage_controller.py -v
pytest tests/test_investigation_controller.py -v
pytest tests/test_event_controller.py -v
pytest tests/test_other_controllers.py -v
# Coverage report
pytest tests/ --cov=controllers --cov-report=html
| Test File | Tests | Areas Covered |
|---|---|---|
| test_auth_controller.py | 25 | Password, JWT, registration, login |
| test_lineage_controller.py | 15 | Traversal, break detection, errors |
| test_investigation_controller.py | 15 | Pipeline, AI context, retry logic |
| test_event_controller.py | 12 | dbt/GitHub/manual webhooks |
| test_other_controllers.py | 30 | Connections, GitHub, chat CRUD |
Pipeline-Autopsy/
├── .env                              # ← Docker Compose env (root)
├── docker-compose.yml                # Full stack deployment
├── README.md
│
├── server/                           # FastAPI backend
│   ├── app.py                        # Entry point
│   ├── requirements.txt              # Python dependencies
│   ├── Dockerfile                    # Python 3.11-slim
│   ├── .env                          # ← Local dev env (server/)
│   │
│   ├── routes/                       # API endpoints
│   │   ├── auth.py
│   │   ├── connections.py
│   │   ├── events.py
│   │   ├── investigations.py
│   │   ├── chats.py
│   │   └── github.py
│   │
│   ├── controllers/                  # Business logic
│   │   ├── auth_controller.py        # bcrypt direct (no passlib)
│   │   ├── lineage_controller.py     # OpenMetadata traversal
│   │   ├── investigation_controller.py
│   │   ├── event_controller.py
│   │   ├── connection_controller.py
│   │   ├── github_controller.py
│   │   └── chat_controller.py
│   │
│   ├── models/                       # Pydantic v2 schemas
│   │   ├── base.py
│   │   ├── users.py                  # ConnectionCreate uses name + openmetadata_host
│   │   ├── events.py                 # ManualQueryPayload uses asset_name + question
│   │   ├── investigations.py
│   │   ├── lineage.py
│   │   ├── github.py
│   │   └── chat.py
│   │
│   └── tests/                        # 70+ test cases
│       ├── conftest.py
│       ├── test_auth_controller.py
│       ├── test_lineage_controller.py
│       ├── test_investigation_controller.py
│       ├── test_event_controller.py
│       └── test_other_controllers.py
│
└── frontend/                         # Next.js 16 frontend
    ├── app/
    │   ├── components/
    │   │   ├── AuthContext.tsx
    │   │   ├── LoginSignup.tsx
    │   │   ├── PipelineAutopsy.tsx
    │   │   ├── InvestigationHistory.tsx
    │   │   ├── ConnectionManager.tsx
    │   │   └── LineageVisualizer.tsx # D3.js graph
    │   └── hooks/
    │       └── useApi.ts
    ├── package.json
    └── Dockerfile
| File | Purpose |
|---|---|
| server/context.md | Full architecture, patch notes, API examples |
| TESTING.md | Test suite guide and coverage goals |
Krishna Srivastava
- GitHub: @Krishna41357
- Email: krishnasrivastava41357@gmail.com

MIT License. See LICENSE file for details.

Built with ❤️ for data engineers who want visibility into their pipelines