Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions .claude/settings.local.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
{
"permissions": {
"allow": [
"Bash(git add:*)",
"Bash(git commit:*)",
"Bash(git push:*)",
"Bash(python:*)",
"Bash(make lint:*)",
"Bash(pip install:*)",
"Bash(ruff check:*)",
"Bash(ruff format:*)",
"Bash(python3:*)",
"Bash(pip3 install:*)",
"Bash(source:*)"
]
}
}
6 changes: 3 additions & 3 deletions .env.example
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ CORS_ORIGINS=http://localhost:3000,http://localhost:8000
METADATA_DB_HOST=postgres
METADATA_DB_PORT=5432
METADATA_DB_USER=observakit
METADATA_DB_PASSWORD=changeme
METADATA_DB_PASSWORD=your_secure_password_here
METADATA_DB_NAME=observakit

# --- Target Warehouse (the warehouse you're monitoring) ---
Expand All @@ -24,7 +24,7 @@ WAREHOUSE_TYPE=postgres
WAREHOUSE_HOST=host.docker.internal
WAREHOUSE_PORT=5432
WAREHOUSE_USER=your_user
WAREHOUSE_PASSWORD=your_password
WAREHOUSE_PASSWORD=your_warehouse_password
WAREHOUSE_DB=your_database
WAREHOUSE_SCHEMA=public

Expand All @@ -44,7 +44,7 @@ WAREHOUSE_SCHEMA=public
# --- Airflow ---
AIRFLOW_BASE_URL=http://localhost:8080
AIRFLOW_USERNAME=admin
AIRFLOW_PASSWORD=admin
AIRFLOW_PASSWORD=your_airflow_password

# --- Prefect (uncomment if using Prefect) ---
# PREFECT_API_URL=http://localhost:4200/api
Expand Down
4 changes: 1 addition & 3 deletions alembic/env.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,9 +50,7 @@ def run_migrations_online() -> None:
)

with connectable.connect() as connection:
context.configure(
connection=connection, target_metadata=target_metadata
)
context.configure(connection=connection, target_metadata=target_metadata)

with context.begin_transaction():
context.run_migrations()
Expand Down
65 changes: 36 additions & 29 deletions alembic/versions/81c7c80e0f60_add_project_and_apikey_models.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,67 +5,74 @@
Create Date: 2026-04-05 13:43:13.652164

"""

from typing import Sequence, Union

from alembic import op
import sqlalchemy as sa
from sqlalchemy.dialects import postgresql

from alembic import op

# revision identifiers, used by Alembic.
revision: str = '81c7c80e0f60'
down_revision: Union[str, Sequence[str], None] = '002_biginteger_numeric_columns'
revision: str = "81c7c80e0f60"
down_revision: Union[str, Sequence[str], None] = "002_biginteger_numeric_columns"
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None


def upgrade() -> None:
    """Create the ``projects`` and ``api_keys`` tables and seed an admin key.

    After creating both tables this inserts a project named ``default`` and
    one active ``admin`` API key that belongs to it. Only the SHA-256 hex
    digest of the key is stored; the plaintext key is printed to stdout once
    so the operator can record it.
    """
    # Imported locally: only this one-off seeding step needs them.
    import hashlib
    import secrets

    # ### commands auto generated by Alembic - please adjust! ###
    op.create_table(
        "projects",
        sa.Column("id", sa.Integer(), autoincrement=True, nullable=False),
        sa.Column("name", sa.String(length=100), nullable=False),
        sa.Column("created_at", sa.DateTime(timezone=True), nullable=True),
        sa.PrimaryKeyConstraint("id"),
        sa.UniqueConstraint("name"),
    )
    op.create_table(
        "api_keys",
        sa.Column("id", sa.Integer(), autoincrement=True, nullable=False),
        sa.Column("project_id", sa.Integer(), nullable=False),
        sa.Column("hashed_key", sa.String(length=255), nullable=False),
        sa.Column("role", sa.String(length=20), nullable=False),
        sa.Column("created_at", sa.DateTime(timezone=True), nullable=True),
        sa.Column("is_active", sa.Boolean(), nullable=True),
        sa.ForeignKeyConstraint(
            ["project_id"],
            ["projects.id"],
        ),
        sa.PrimaryKeyConstraint("id"),
        sa.UniqueConstraint("hashed_key"),
    )

    # Generate a random 32-character hex key; only its digest is persisted.
    initial_key = secrets.token_hex(16)
    hashed_key = hashlib.sha256(initial_key.encode()).hexdigest()

    # Seed the default project.
    op.execute("INSERT INTO projects (name, created_at) VALUES ('default', CURRENT_TIMESTAMP)")

    # Seed the admin key. The owning project is looked up by its unique name
    # rather than assuming the autoincrement counter handed out id 1.
    # hashed_key is a hex digest (characters 0-9a-f only), so interpolating it
    # into the statement cannot break out of the quoted literal.
    op.execute(
        "INSERT INTO api_keys (project_id, hashed_key, role, is_active, created_at) "
        f"SELECT id, '{hashed_key}', 'admin', true, CURRENT_TIMESTAMP "
        "FROM projects WHERE name = 'default'"
    )

    # Print the key to the console so the user gets it during init/upgrade.
    # "\n" (not "\\n") so the banner is padded with real blank lines.
    banner = "=" * 60
    print("\n" + banner)
    print("🚀 OBSERVAKIT RBAC ENABLED")
    print(f"🔑 Your initial Admin API Key is: {initial_key}")
    print("Store this safely! It will not be shown again.")
    print(banner + "\n")
    # ### end Alembic commands ###


def downgrade() -> None:
    """Revert the schema by dropping the tables created in ``upgrade``."""
    # ### commands auto generated by Alembic - please adjust! ###
    # api_keys goes first: it holds the foreign key that references projects.
    for table_name in ("api_keys", "projects"):
        op.drop_table(table_name)
    # ### end Alembic commands ###
71 changes: 53 additions & 18 deletions alerts/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,14 @@ class AlertDispatcher(ABC):
"""Abstract base class for alert dispatchers."""

@abstractmethod
def send(self, message: str, subject: str = None, alert_type: str = None,
table_name: str = None, **kwargs) -> bool:
def send(
self,
message: str,
subject: str = None,
alert_type: str = None,
table_name: str = None,
**kwargs,
) -> bool:
"""
Send an alert message.
Returns True if successful, False otherwise.
Expand All @@ -23,21 +29,27 @@ def get_alert_dispatcher(channel: str, **kwargs) -> AlertDispatcher:
"""Factory: return the appropriate alert dispatcher."""
if channel == "slack":
from alerts.slack import SlackDispatcher

return SlackDispatcher(**kwargs)
elif channel == "email":
from alerts.email import EmailDispatcher

return EmailDispatcher(**kwargs)
elif channel == "discord":
from alerts.discord import DiscordDispatcher

return DiscordDispatcher(**kwargs)
elif channel == "webhook":
from alerts.webhook import WebhookDispatcher

return WebhookDispatcher(**kwargs)
elif channel == "teams":
from alerts.teams import TeamsDispatcher

return TeamsDispatcher(**kwargs)
elif channel == "pagerduty":
from alerts.pagerduty import PagerDutyDispatcher

return PagerDutyDispatcher(**kwargs)
else:
raise ValueError(
Expand All @@ -46,7 +58,14 @@ def get_alert_dispatcher(channel: str, **kwargs) -> AlertDispatcher:
)


def dispatch_alert(alert_type: str, message: str, table_name: str = None, subject: str = None, db=None, severity: str = "fail"):
def dispatch_alert(
alert_type: str,
message: str,
table_name: str = None,
subject: str = None,
db=None,
severity: str = "fail",
):
"""
Dispatch an alert using routing rules from kit.yml.
Uses load_config() so that ${VAR:-default} env vars are properly expanded.
Expand Down Expand Up @@ -90,7 +109,9 @@ def dispatch_alert(alert_type: str, message: str, table_name: str = None, subjec
kwargs = {k: v for k, v in rule.items() if k not in ["match", "channel"]}
try:
dispatcher = get_alert_dispatcher(channel, **kwargs)
if dispatcher.send(formatted_message, subject, alert_type=alert_type, table_name=table_name):
if dispatcher.send(
formatted_message, subject, alert_type=alert_type, table_name=table_name
):
dispatched = True
used_channel = channel
except Exception as e:
Expand All @@ -101,21 +122,26 @@ def dispatch_alert(alert_type: str, message: str, table_name: str = None, subjec
default_channel = config.get("alerts", {}).get("default_channel", "slack")
try:
dispatcher = get_alert_dispatcher(default_channel)
if dispatcher.send(formatted_message, subject, alert_type=alert_type, table_name=table_name):
if dispatcher.send(
formatted_message, subject, alert_type=alert_type, table_name=table_name
):
dispatched = True
used_channel = default_channel
except Exception as e:
logging.getLogger(__name__).error(f"Failed to send default alert via {default_channel}: {e}")
logging.getLogger(__name__).error(
f"Failed to send default alert via {default_channel}: {e}"
)

if dispatched and db:
from backend.models import AlertLog

try:
log = AlertLog(
alert_type=alert_type,
channel=used_channel,
table_name=table_name,
message=formatted_message,
success=True
success=True,
)
db.add(log)
db.commit()
Expand All @@ -133,12 +159,17 @@ def is_alert_suppressed(db, table_name: str) -> bool:

from backend.models import CheckSuppression

suppression = db.query(CheckSuppression).filter(
CheckSuppression.table_name == table_name,
CheckSuppression.suppressed_until >= datetime.now(timezone.utc),
).first()
suppression = (
db.query(CheckSuppression)
.filter(
CheckSuppression.table_name == table_name,
CheckSuppression.suppressed_until >= datetime.now(timezone.utc),
)
.first()
)
if suppression:
import logging

logging.getLogger(__name__).info(
f"Alert suppressed for {table_name} until {suppression.suppressed_until} "
f"— reason: {suppression.reason}"
Expand All @@ -156,16 +187,20 @@ def is_alert_deduped(db, table_name: str, alert_type: str, window_minutes: int =

from backend.models import AlertLog

recent = db.query(AlertLog).filter(
AlertLog.table_name == table_name,
AlertLog.alert_type == alert_type,
AlertLog.sent_at >= datetime.now(timezone.utc) - timedelta(minutes=window_minutes),
).first()
recent = (
db.query(AlertLog)
.filter(
AlertLog.table_name == table_name,
AlertLog.alert_type == alert_type,
AlertLog.sent_at >= datetime.now(timezone.utc) - timedelta(minutes=window_minutes),
)
.first()
)
if recent:
import logging

logging.getLogger(__name__).info(
f"Skipping duplicate {alert_type} alert for {table_name} "
f"(last sent {recent.sent_at})"
f"Skipping duplicate {alert_type} alert for {table_name} (last sent {recent.sent_at})"
)
return True
return False
Expand Down
11 changes: 6 additions & 5 deletions alerts/discord.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,10 @@
logger = logging.getLogger(__name__)

# Discord message colour codes
COLOUR_OK = 0x57F287 # green
COLOUR_WARN = 0xFEE75C # yellow
COLOUR_FAIL = 0xED4245 # red
COLOUR_INFO = 0x5865F2 # blurple (default)
COLOUR_OK = 0x57F287 # green
COLOUR_WARN = 0xFEE75C # yellow
COLOUR_FAIL = 0xED4245 # red
COLOUR_INFO = 0x5865F2 # blurple (default)

# Alert type → colour mapping
_ALERT_COLOURS = {
Expand All @@ -47,7 +47,7 @@ class DiscordDispatcher(AlertDispatcher):

def __init__(self, **kwargs):
    """Load the Discord settings from environment variables.

    Keyword arguments are accepted but not read by this constructor.
    """
    env = os.environ
    # Falls back to an empty string when the variable is unset.
    self._webhook_url = env.get("DISCORD_WEBHOOK_URL", "")
    # Optional mention text, e.g. "@here" or "<@&ROLE_ID>".
    self._mention = env.get("DISCORD_MENTION", "")

def send(self, message: str, subject: str = None, alert_type: str = None, **kwargs) -> bool:
if not self._webhook_url:
Expand Down Expand Up @@ -94,4 +94,5 @@ def send(self, message: str, subject: str = None, alert_type: str = None, **kwar

def _utc_now_iso() -> str:
    """Return the current UTC time as a timezone-aware ISO 8601 string."""
    import datetime as _dt

    now_utc = _dt.datetime.now(tz=_dt.timezone.utc)
    return now_utc.isoformat()
8 changes: 4 additions & 4 deletions alerts/pagerduty.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,9 @@ class PagerDutyDispatcher(AlertDispatcher):
"""Sends alerts using the PagerDuty Events API v2."""

def __init__(self, **kwargs):
self._routing_key = kwargs.get("pagerduty_routing_key") or os.getenv("PAGERDUTY_ROUTING_KEY", "")
self._routing_key = kwargs.get("pagerduty_routing_key") or os.getenv(
"PAGERDUTY_ROUTING_KEY", ""
)
# Allow overriding severity map via env var (JSON string)
custom_map = os.getenv("PAGERDUTY_SEVERITY_MAP")
if custom_map:
Expand Down Expand Up @@ -103,9 +105,7 @@ def send(
)
return True
else:
logger.error(
"PagerDuty API returned %d: %s", resp.status_code, resp.text
)
logger.error("PagerDuty API returned %d: %s", resp.status_code, resp.text)
return False
except Exception as exc:
logger.error("Failed to send PagerDuty alert: %s", exc)
Expand Down
Loading
Loading