Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions .github/copilot-instructions.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ Python style
Patterns
- `__init__` methods must not raise exceptions; defer validation and connection to first use (lazy init)
- Writers: inherit from `Writer(ABC)`, implement `write(topic, message) -> (bool, str|None)` and `check_health() -> (bool, str)`
- PostgreSQL: `WriterPostgres` and `ReaderPostgres` cache a single connection per instance
- Route dispatch via `ROUTE_MAP` dict mapping routes to handler functions in `event_gate_lambda.py` and `event_stats_lambda.py`
- Separate business logic from environment access (env vars, file I/O, network calls)
- No duplicate validation; centralize parsing in one layer where practical
Expand All @@ -50,3 +51,6 @@ Testing
Quality gates (run after changes, fix only if below threshold)
- Run all quality gates at once: `make qa`
- Once a quality gate passes, do not re-run it in different scenarios

Git workflow
- Do NOT create git commits; committing is the developer's responsibility
8 changes: 8 additions & 0 deletions .github/dependabot.yml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,10 @@ updates:
commit-message:
prefix: "chore"
include: "scope"
groups:
github-actions:
patterns:
- "*"

- package-ecosystem: "pip"
directory: "/"
Expand All @@ -31,3 +35,7 @@ updates:
include: "scope"
allow:
- dependency-type: "direct"
groups:
python-dependencies:
patterns:
- "*"
2 changes: 2 additions & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,9 @@ cryptography==46.0.7
jsonschema==4.25.1
PyJWT==2.12.1
requests==2.32.5
aiosql==15.0
boto3==1.42.91
botocore==1.42.91
confluent-kafka==2.14.0
moto[s3,secretsmanager,events]==5.1.22
testcontainers==4.14.2
Expand Down
7 changes: 4 additions & 3 deletions src/handlers/handler_topic.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
from src.handlers.handler_token import HandlerToken
from src.utils.conf_path import CONF_DIR
from src.utils.config_loader import load_access_config
from src.utils.constants import TOPIC_DLCHANGE, TOPIC_RUNS, TOPIC_TEST
from src.utils.utils import build_error_response
from src.writers.writer import Writer

Expand Down Expand Up @@ -69,11 +70,11 @@ def with_load_topic_schemas(self) -> "HandlerTopic":
logger.debug("Loading topic schemas from %s.", topic_schemas_dir)

with open(os.path.join(topic_schemas_dir, "runs.json"), "r", encoding="utf-8") as file:
self.topics["public.cps.za.runs"] = json.load(file)
self.topics[TOPIC_RUNS] = json.load(file)
with open(os.path.join(topic_schemas_dir, "dlchange.json"), "r", encoding="utf-8") as file:
self.topics["public.cps.za.dlchange"] = json.load(file)
self.topics[TOPIC_DLCHANGE] = json.load(file)
with open(os.path.join(topic_schemas_dir, "test.json"), "r", encoding="utf-8") as file:
self.topics["public.cps.za.test"] = json.load(file)
self.topics[TOPIC_TEST] = json.load(file)

logger.debug("Loaded topic schemas successfully.")
return self
Expand Down
148 changes: 77 additions & 71 deletions src/readers/reader_postgres.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,69 +17,57 @@
"""Postgres reader for run/job statistics."""

import logging
import os
import time
from dataclasses import dataclass
from datetime import datetime, timezone
from functools import cached_property
from pathlib import Path
from typing import Any

import aiosql
from botocore.exceptions import BotoCoreError, ClientError

from src.utils.constants import (
POSTGRES_DEFAULT_LIMIT,
POSTGRES_DEFAULT_WINDOW_MS,
POSTGRES_MAX_LIMIT,
POSTGRES_STATEMENT_TIMEOUT_MS,
REQUIRED_CONNECTION_FIELDS,
)
from src.utils.utils import load_postgres_config

try:
import psycopg2
from psycopg2 import Error as PsycopgError
from psycopg2 import sql as psycopg2_sql
except ImportError:
psycopg2 = None # type: ignore
psycopg2_sql = None # type: ignore

class PsycopgError(Exception): # type: ignore
"""Shim psycopg2 error base when psycopg2 is not installed."""

from src.utils.postgres_base import PsycopgError, PostgresBase

logger = logging.getLogger(__name__)

_RUNS_SQL_BASE = (
"SELECT r.event_id, r.job_ref, r.tenant_id, r.source_app,"
" r.source_app_version, r.environment,"
" r.timestamp_start AS run_timestamp_start,"
" r.timestamp_end AS run_timestamp_end,"
" j.internal_id, j.country, j.catalog_id, j.status,"
" j.timestamp_start, j.timestamp_end, j.message, j.additional_info"
" FROM public_cps_za_runs_jobs j"
" INNER JOIN public_cps_za_runs r ON j.event_id = r.event_id"
" WHERE r.timestamp_start >= %s AND r.timestamp_start <= %s"
)
_SQL_DIR = Path(__file__).parent / "sql"


_RUNS_SQL_TAIL = " ORDER BY j.internal_id DESC LIMIT %s"
@dataclass(frozen=True)
class ReaderQueries:
"""Typed holder for reader SQL query strings loaded via aiosql."""

_RUNS_SQL = _RUNS_SQL_BASE + _RUNS_SQL_TAIL
_RUNS_SQL_WITH_CURSOR = _RUNS_SQL_BASE + " AND j.internal_id < %s" + _RUNS_SQL_TAIL
get_stats: str
get_stats_with_cursor: str


class ReaderPostgres:
class ReaderPostgres(PostgresBase):
"""Read-only Postgres accessor for run/job statistics."""

def __init__(self) -> None:
self._secret_name = os.environ.get("POSTGRES_SECRET_NAME", "")
self._secret_region = os.environ.get("POSTGRES_SECRET_REGION", "")
self._db_config: dict[str, Any] | None = None
super().__init__()
logger.debug("Initialized PostgreSQL reader.")

def _load_db_config(self) -> dict[str, Any]:
"""Load database config from AWS Secrets Manager if not already loaded."""
if self._db_config is None:
self._db_config = load_postgres_config(self._secret_name, self._secret_region)
config = self._db_config
if config is None:
raise RuntimeError("Failed to load database configuration.")
return config
def _connect_options(self) -> str | None:
"""Set statement timeout and read-only mode for reader connections."""
return f"-c statement_timeout={POSTGRES_STATEMENT_TIMEOUT_MS}" " -c default_transaction_read_only=on"

@cached_property
def _queries(self) -> ReaderQueries:
"""Load SQL queries from the `sql/` directory via aiosql."""
queries = aiosql.from_path(_SQL_DIR, "psycopg2")
return ReaderQueries(
get_stats=queries.get_stats.sql,
get_stats_with_cursor=queries.get_stats_with_cursor.sql,
)

def read_stats(
self,
Expand All @@ -102,42 +90,25 @@ def read_stats(
Raises:
RuntimeError: On database connectivity or query errors.
"""
db_config = self._load_db_config()
required_keys = ("database", "host", "user", "password", "port")
missing_keys = [key for key in required_keys if not db_config.get(key)]
if missing_keys:
raise RuntimeError(f"PostgreSQL config missing: {', '.join(missing_keys)}.")
if psycopg2 is None:
raise RuntimeError("psycopg2 is not available.")
config = self._pg_config
if not config.get("database"):
raise RuntimeError("PostgreSQL config missing: database.")
if not all(config.get(field) for field in REQUIRED_CONNECTION_FIELDS):
missing = [field for field in REQUIRED_CONNECTION_FIELDS if not config.get(field)]
raise RuntimeError(f"PostgreSQL config missing: {', '.join(missing)}.")

limit = max(1, min(limit, POSTGRES_MAX_LIMIT))
now_ms = int(time.time() * 1000)
ts_start = timestamp_start if timestamp_start is not None else (now_ms - POSTGRES_DEFAULT_WINDOW_MS)
ts_end = timestamp_end if timestamp_end is not None else now_ms

params: list[Any] = [ts_start, ts_end]
if cursor is not None:
params.append(cursor)
query = psycopg2_sql.SQL(_RUNS_SQL_WITH_CURSOR)
else:
query = psycopg2_sql.SQL(_RUNS_SQL)
params.append(limit + 1)

try:
with psycopg2.connect( # type: ignore[attr-defined]
database=db_config["database"],
host=db_config["host"],
user=db_config["user"],
password=db_config["password"],
port=db_config["port"],
options="-c statement_timeout=30000 -c default_transaction_read_only=on",
) as connection:
with connection.cursor() as db_cursor:
db_cursor.execute(query, params)
col_names = [desc[0] for desc in db_cursor.description] # type: ignore[union-attr]
raw_rows = db_cursor.fetchall()
col_names, raw_rows = self._execute_with_retry(
lambda conn: self._run_stats_query(conn, ts_start, ts_end, cursor, limit)
)
except PsycopgError as exc:
raise RuntimeError(f"Database query failed: {exc}") from exc
self._close_connection()
raise RuntimeError(f"Database query error: {exc}") from exc

rows = [dict(zip(col_names, row, strict=True)) for row in raw_rows]

Expand All @@ -160,6 +131,41 @@ def read_stats(
logger.debug("Stats query returned %d rows.", len(rows))
return rows, pagination

    def _run_stats_query(
        self,
        connection: Any,
        ts_start: int,
        ts_end: int,
        cursor: int | None,
        limit: int,
    ) -> tuple[list[str], list[tuple[Any, ...]]]:
        """Execute the stats SQL query and return column names and raw rows.

        Args:
            connection: Open database connection supplied by the retry wrapper.
            ts_start: Inclusive lower bound on the run start timestamp
                (epoch milliseconds — see the caller's `time.time() * 1000`).
            ts_end: Inclusive upper bound on the run start timestamp (epoch ms).
            cursor: Keyset-pagination cursor (a previously seen `internal_id`),
                or None for the first page.
            limit: Page size; `limit + 1` rows are requested so the caller can
                detect whether a further page exists.

        Returns:
            Tuple of (column names, raw result rows).

        Raises:
            RuntimeError: If the driver reports no result description.
            PsycopgError: Propagated from the driver on execution failure.
        """
        try:
            with connection.cursor() as db_cursor:
                # Two pre-loaded query variants: with-cursor adds the
                # `internal_id < :cursor_id` keyset predicate.
                if cursor is not None:
                    db_cursor.execute(
                        self._queries.get_stats_with_cursor,
                        {"ts_start": ts_start, "ts_end": ts_end, "cursor_id": cursor, "lim": limit + 1},
                    )
                else:
                    db_cursor.execute(
                        self._queries.get_stats,
                        {"ts_start": ts_start, "ts_end": ts_end, "lim": limit + 1},
                    )
                # Defensive: a SELECT should always produce a description;
                # None would make the column-name extraction meaningless.
                if db_cursor.description is None:
                    raise RuntimeError("Stats query returned no result description.")
                col_names = [desc[0] for desc in db_cursor.description]
                raw_rows = db_cursor.fetchall()
        finally:
            # Rollback closes the implicit transaction opened by the SELECT,
            # leaving the cached connection in a clean idle state for reuse.
            try:
                connection.rollback()
            except PsycopgError:
                # If even rollback fails, drop the cached connection so a
                # subsequent call presumably reconnects cleanly — confirm
                # against PostgresBase's reconnect behavior.
                logger.debug("Failed to close the implicit transaction. Closing cached connection.", exc_info=True)
                self._close_connection()
        return col_names, raw_rows

@staticmethod
def _format_row(row: dict[str, Any]) -> dict[str, Any]:
"""Add computed columns to a result row.
Expand Down Expand Up @@ -225,14 +231,14 @@ def check_health(self) -> tuple[bool, str]:
return False, "postgres secret not configured"

try:
db_config = self._load_db_config()
pg_config = self._pg_config
except (BotoCoreError, ClientError, RuntimeError, ValueError, KeyError) as err:
return False, str(err)

if not db_config.get("database"):
if not pg_config.get("database"):
return False, "database not configured"

missing = [f for f in ("host", "user", "password", "port") if not db_config.get(f)]
missing = [field for field in REQUIRED_CONNECTION_FIELDS if not pg_config.get(field)]
if missing:
return False, f"{missing[0]} not configured"

Expand Down
28 changes: 28 additions & 0 deletions src/readers/sql/stats.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
-- name: get_stats(ts_start, ts_end, lim)
-- Get run/job statistics with keyset pagination.
-- First page: callers pass lim = page_size + 1 so the extra row signals
-- that another page exists. Timestamp bounds are inclusive, and the
-- descending internal_id order defines the keyset used by the cursor query.
SELECT r.event_id, r.job_ref, r.tenant_id, r.source_app,
       r.source_app_version, r.environment,
       r.timestamp_start AS run_timestamp_start,
       r.timestamp_end AS run_timestamp_end,
       j.internal_id, j.country, j.catalog_id, j.status,
       j.timestamp_start, j.timestamp_end, j.message, j.additional_info
FROM public_cps_za_runs_jobs j
INNER JOIN public_cps_za_runs r ON j.event_id = r.event_id
WHERE r.timestamp_start >= :ts_start AND r.timestamp_start <= :ts_end
ORDER BY j.internal_id DESC
LIMIT :lim;

-- name: get_stats_with_cursor(ts_start, ts_end, cursor_id, lim)
-- Get run/job statistics with cursor-based keyset pagination.
-- Same query as get_stats, with `internal_id < :cursor_id` resuming
-- strictly after the last row of the previous (descending) page.
SELECT r.event_id, r.job_ref, r.tenant_id, r.source_app,
       r.source_app_version, r.environment,
       r.timestamp_start AS run_timestamp_start,
       r.timestamp_end AS run_timestamp_end,
       j.internal_id, j.country, j.catalog_id, j.status,
       j.timestamp_start, j.timestamp_end, j.message, j.additional_info
FROM public_cps_za_runs_jobs j
INNER JOIN public_cps_za_runs r ON j.event_id = r.event_id
WHERE r.timestamp_start >= :ts_start AND r.timestamp_start <= :ts_end
  AND j.internal_id < :cursor_id
ORDER BY j.internal_id DESC
LIMIT :lim;
8 changes: 5 additions & 3 deletions src/utils/config_loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@

from boto3.resources.base import ServiceResource

from src.utils.constants import TOPIC_DLCHANGE, TOPIC_RUNS, TOPIC_TEST

logger = logging.getLogger(__name__)


Expand Down Expand Up @@ -76,9 +78,9 @@ def load_topic_names(conf_dir: str) -> list[str]:
List of topic name strings.
"""
filename_to_topic = {
"runs.json": "public.cps.za.runs",
"dlchange.json": "public.cps.za.dlchange",
"test.json": "public.cps.za.test",
"runs.json": TOPIC_RUNS,
"dlchange.json": TOPIC_DLCHANGE,
"test.json": TOPIC_TEST,
}
schemas_dir = os.path.join(conf_dir, "topic_schemas")
topics: list[str] = []
Expand Down
33 changes: 26 additions & 7 deletions src/utils/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,24 +16,43 @@

"""Constants and enums used across the project."""

from typing import Any
from typing import TypedDict

# Configuration keys
TOKEN_PROVIDER_URL_KEY = "token_provider_url"
TOKEN_PUBLIC_KEY_URL_KEY = "token_public_key_url"
TOKEN_PUBLIC_KEYS_URL_KEY = "token_public_keys_url"
SSL_CA_BUNDLE_KEY = "ssl_ca_bundle"

# Postgres connection
POSTGRES_CONNECT_TIMEOUT_SECONDS = 5
POSTGRES_STATEMENT_TIMEOUT_MS = 30000
POSTGRES_MAX_RETRIES = 2
REQUIRED_CONNECTION_FIELDS = ("host", "user", "password", "port")

# Postgres stats defaults
POSTGRES_DEFAULT_LIMIT = 50
POSTGRES_MAX_LIMIT = 1000
POSTGRES_DEFAULT_WINDOW_MS = 7 * 24 * 60 * 60 * 1000 # 7 days in milliseconds

SUPPORTED_TOPICS: list[str] = ["public.cps.za.runs"]
# Topic name constants
TOPIC_RUNS = "public.cps.za.runs"
TOPIC_DLCHANGE = "public.cps.za.dlchange"
TOPIC_TEST = "public.cps.za.test"

SUPPORTED_TOPICS: list[str] = [TOPIC_RUNS]


class TopicTableConfig(TypedDict, total=False):
    """Structure describing a topic's PostgreSQL table mapping.

    `total=False` because not every topic defines every key — e.g. only
    topics with per-job rows define `jobs`.
    """

    # Name of the topic's main table.
    main: str
    # Name of the per-job detail table; present only for topics with job rows.
    jobs: str
    # Maps a table role key ("main", "jobs") to its ordered column-name list.
    columns: dict[str, list[str]]


# Maps topic names to their PostgreSQL table(s)
TOPIC_TABLE_MAP: dict[str, dict[str, Any]] = {
"public.cps.za.runs": {
TOPIC_TABLE_MAP: dict[str, TopicTableConfig] = {
TOPIC_RUNS: {
"main": "public_cps_za_runs",
"jobs": "public_cps_za_runs_jobs",
"columns": {
Expand All @@ -60,7 +79,7 @@
],
},
},
"public.cps.za.dlchange": {
TOPIC_DLCHANGE: {
"main": "public_cps_za_dlchange",
"columns": {
"main": [
Expand All @@ -80,7 +99,7 @@
],
},
},
"public.cps.za.test": {
TOPIC_TEST: {
"main": "public_cps_za_test",
"columns": {
"main": [
Expand Down
Loading