diff --git a/.github/workflows/chartpress.yaml b/.github/workflows/chartpress.yaml index 238bc9a9..4aadf2db 100644 --- a/.github/workflows/chartpress.yaml +++ b/.github/workflows/chartpress.yaml @@ -4,7 +4,7 @@ on: branches: - 'main' - 'staging' - - 'k8s_deploy' + - 'tiler_monitoring' jobs: build: runs-on: ubuntu-22.04 @@ -71,7 +71,7 @@ jobs: OHM_SLACK_WEBHOOK_URL: ${{ secrets.OHM_SLACK_WEBHOOK_URL }} ################ Staging secrets ################ - name: Staging - substitute secrets - if: github.ref == 'refs/heads/staging' || github.ref == 'refs/heads/k8s_deploy' + if: github.ref == 'refs/heads/staging' || github.ref == 'refs/heads/tiler_monitoring' uses: bluwy/substitute-string-action@v1 with: _input-file: 'values.staging.template.yaml' @@ -189,46 +189,46 @@ jobs: PRODUCTION_OPENSTREETMAP_AUTH_SECRET: ${{ secrets.PRODUCTION_OPENSTREETMAP_AUTH_SECRET }} - name: AWS Credentials - if: github.ref == 'refs/heads/staging' || github.ref == 'refs/heads/main' || github.ref == 'refs/heads/k8s_deploy' + if: github.ref == 'refs/heads/staging' || github.ref == 'refs/heads/main' || github.ref == 'refs/heads/tiler_monitoring' uses: aws-actions/configure-aws-credentials@v1 with: aws-access-key-id: ${{ secrets.AWS_ACCESS_KEY_ID }} aws-secret-access-key: ${{ secrets.AWS_SECRET_ACCESS_KEY }} aws-region: us-east-1 - name: Setup Kubectl and Helm Dependencies - if: github.ref == 'refs/heads/staging' || github.ref == 'refs/heads/main' || github.ref == 'refs/heads/k8s_deploy' + if: github.ref == 'refs/heads/staging' || github.ref == 'refs/heads/main' || github.ref == 'refs/heads/tiler_monitoring' run: | sudo pip install awscli --ignore-installed six - sudo curl -L -o /usr/bin/kubectl https://dl.k8s.io/release/v1.33.0/bin/linux/amd64/kubectl + sudo curl -L -o /usr/bin/kubectl https://amazon-eks.s3.us-west-2.amazonaws.com/1.17.7/2020-07-08/bin/linux/amd64/kubectl sudo chmod +x /usr/bin/kubectl - sudo curl -o /usr/bin/aws-iam-authenticator 
https://amazon-eks.s3.us-west-2.amazonaws.com/1.21.2/2021-07-05/bin/linux/amd64/aws-iam-authenticator + sudo curl -o /usr/bin/aws-iam-authenticator https://amazon-eks.s3.us-west-2.amazonaws.com/1.17.7/2020-07-08/bin/linux/amd64/aws-iam-authenticator sudo chmod +x /usr/bin/aws-iam-authenticator - curl -L https://get.helm.sh/helm-v3.17.3-linux-amd64.tar.gz -o helm.tar.gz + curl -L https://get.helm.sh/helm-v3.14.4-linux-amd64.tar.gz -o helm.tar.gz tar -xvzf helm.tar.gz sudo mv linux-amd64/helm /usr/local/bin/ sudo chmod +x /usr/local/bin/helm helm version - name: Update kube-config staging - if: github.ref == 'refs/heads/staging' || github.ref == 'refs/heads/k8s_deploy' + if: github.ref == 'refs/heads/staging' || github.ref == 'refs/heads/tiler_monitoring' run: aws eks --region us-east-1 update-kubeconfig --name osmseed-staging - name: Update kube-config prod if: github.ref == 'refs/heads/main' run: aws eks --region us-east-1 update-kubeconfig --name osmseed-production-v2 - name: Add Helm repository - if: github.ref == 'refs/heads/staging' || github.ref == 'refs/heads/main' || github.ref == 'refs/heads/k8s_deploy' + if: github.ref == 'refs/heads/staging' || github.ref == 'refs/heads/main' || github.ref == 'refs/heads/tiler_monitoring' run: | helm repo add osm-seed https://osm-seed.github.io/osm-seed-chart/ helm repo update - name: Install helm dependencies for - if: github.ref == 'refs/heads/staging' || github.ref == 'refs/heads/main' || github.ref == 'refs/heads/k8s_deploy' + if: github.ref == 'refs/heads/staging' || github.ref == 'refs/heads/main' || github.ref == 'refs/heads/tiler_monitoring' run: cd ohm && helm dep up # Staging - name: Staging - helm deploy - if: github.ref == 'refs/heads/staging' || github.ref == 'refs/heads/k8s_deploy' - run: helm upgrade --install staging --wait --timeout 10m ohm/ -f values.staging.yaml -f ohm/values.yaml + if: github.ref == 'refs/heads/staging' || github.ref == 'refs/heads/tiler_monitoring' + run: helm upgrade --install 
staging --wait ohm/ -f values.staging.yaml -f ohm/values.yaml # Production - name: Production - helm deploy if: github.ref == 'refs/heads/main' - run: helm upgrade --install production --wait --timeout 10m ohm/ -f values.production.yaml -f ohm/values.yaml + run: helm upgrade --install production --wait ohm/ -f values.production.yaml -f ohm/values.yaml \ No newline at end of file diff --git a/.gitignore b/.gitignore index 5ab9a7d6..69b81d66 100644 --- a/.gitignore +++ b/.gitignore @@ -33,8 +33,8 @@ hetzner/*/.envs.*.production hetzner/traefik/cloudflare-ips.txt hetzner/traefik/traefik.yml .vscode/ +.claude/ imposm3.json - cachedir_reimport/ config_reimport.json imposm3_reimport.json \ No newline at end of file diff --git a/compose/tiler.yml b/compose/tiler.yml index 10ac3158..ca194dda 100644 --- a/compose/tiler.yml +++ b/compose/tiler.yml @@ -71,21 +71,24 @@ services: # - ohm_network - # tiler-monitor: - # image: rub21/tiler-monitor:v1 - # build: - # context: ../images/tiler-monitor - # dockerfile: Dockerfile - # volumes: - # - /var/run/docker.sock:/var/run/docker.sock - # - ../images/tiler-monitor:/app - # - ../hetzner:/app/hetzner - # environment: - # - DOCKER_CONFIG_ENVIRONMENT=staging - # env_file: - # - ../envs/.env.tiler - # stdin_open: true - # tty: true + tiler-monitor: + image: rub21/tiler-monitor:v2 + build: + context: ../images/tiler-monitor + dockerfile: Dockerfile + volumes: + - /var/run/docker.sock:/var/run/docker.sock + - ../hetzner:/app/hetzner + - tiler_monitor_data:/data + ports: + - "8001:8001" + environment: + - TILER_MONITORING_DOCKER_CONFIG_ENVIRONMENT=staging + env_file: + - ../envs/.env.tiler + restart: always + networks: + - ohm_network networks: ohm_network: @@ -99,3 +102,7 @@ volumes: tiler_imposm_data: driver: local name: tiler_imposm + + tiler_monitor_data: + driver: local + name: tiler_monitor diff --git a/hetzner/tiler/tiler.production.yml b/hetzner/tiler/tiler.production.yml index ec45173d..6a719941 100644 --- 
FROM python:3.12-slim

# Pinned tool versions; override at build time with --build-arg.
ARG DOCKER_VERSION=27.5.1
ARG COMPOSE_VERSION=v2.32.4

RUN apt-get update && \
    apt-get install -y --no-install-recommends \
        curl \
        bash \
        postgresql-client && \
    rm -rf /var/lib/apt/lists/*

# Install Docker CLI from official static binaries.
# Only amd64 and arm64 are mapped; any other architecture aborts the build
# instead of silently downloading an aarch64 binary.
RUN ARCH="$(dpkg --print-architecture)" && \
    case "$ARCH" in \
        amd64) DOCKER_ARCH="x86_64" ;; \
        arm64) DOCKER_ARCH="aarch64" ;; \
        *) echo "Unsupported architecture: $ARCH" >&2; exit 1 ;; \
    esac && \
    curl -fsSL "https://download.docker.com/linux/static/stable/${DOCKER_ARCH}/docker-${DOCKER_VERSION}.tgz" \
        -o /tmp/docker.tgz && \
    tar xzf /tmp/docker.tgz --strip-components=1 -C /usr/local/bin docker/docker && \
    rm -f /tmp/docker.tgz

# Install Docker Compose plugin (same architecture mapping as above).
RUN ARCH="$(dpkg --print-architecture)" && \
    case "$ARCH" in \
        amd64) COMPOSE_ARCH="x86_64" ;; \
        arm64) COMPOSE_ARCH="aarch64" ;; \
        *) echo "Unsupported architecture: $ARCH" >&2; exit 1 ;; \
    esac && \
    mkdir -p /usr/local/lib/docker/cli-plugins && \
    curl -fsSL "https://github.com/docker/compose/releases/download/${COMPOSE_VERSION}/docker-compose-linux-${COMPOSE_ARCH}" \
        -o /usr/local/lib/docker/cli-plugins/docker-compose && \
    chmod +x /usr/local/lib/docker/cli-plugins/docker-compose

WORKDIR /app

# Install Python dependencies for pipeline monitor
# (copied before the sources so the pip layer is cached across code changes)
COPY pipeline-monitor/requirements.txt /app/pipeline-monitor/requirements.txt
RUN pip install --no-cache-dir -r /app/pipeline-monitor/requirements.txt

# Copy application code
COPY language-monitor/ /app/language-monitor/
COPY pipeline-monitor/ /app/pipeline-monitor/
COPY entrypoint.sh /app/entrypoint.sh

RUN chmod +x /app/entrypoint.sh /app/language-monitor/monitor_languages.sh

EXPOSE 8001

# Container health is derived from the HTTP /health endpoint on port 8001.
HEALTHCHECK --interval=30s --timeout=10s --retries=3 --start-period=30s \
    CMD curl -f http://localhost:8001/health || exit 1

CMD ["/app/entrypoint.sh"]
# Runtime configuration for the language monitor.
# Each setting is resolved in this order:
#   1. the TILER_MONITORING_-prefixed variable,
#   2. the legacy unprefixed variable (still set by existing deployments,
#      e.g. DOCKER_CONFIG_ENVIRONMENT=production in the production compose file),
#   3. the built-in default.
# Without step 2 a deployment that only sets the legacy name would silently
# run with the default (notably "staging" for the environment).
NIM_NUMBER_LANGUAGES="${TILER_MONITORING_NIM_NUMBER_LANGUAGES:-${NIM_NUMBER_LANGUAGES:-5}}" # Default to 5 languages
FORCE_LANGUAGES_GENERATION="${TILER_MONITORING_FORCE_LANGUAGES_GENERATION:-${FORCE_LANGUAGES_GENERATION:-false}}"
EVALUATION_INTERVAL="${TILER_MONITORING_EVALUATION_INTERVAL:-${EVALUATION_INTERVAL:-3600}}" # Default to 1 hour
DOCKER_CONFIG_ENVIRONMENT="${TILER_MONITORING_DOCKER_CONFIG_ENVIRONMENT:-${DOCKER_CONFIG_ENVIRONMENT:-staging}}"
+ +For each changeset in the 1-2 hour window: + 1. Check if minute replication covers it (replication timestamp >= closed_at) + 2. Check if its way/relation elements exist in the tiler DB with the correct version + 3. For a random sample: verify materialized views + S3 tile cache +""" + +import json +import logging +import os +import random +import xml.etree.ElementTree as ET +from datetime import datetime, timezone, timedelta + +import psycopg2 +import requests + +from config import Config +import retry_store + +logger = logging.getLogger(__name__) + +# Load table/view mapping from JSON config +_config_path = os.path.join(os.path.dirname(os.path.dirname(__file__)), "tables_config.json") +with open(_config_path) as f: + _tables_config = json.load(f) + +OHM_BASE = None # lazily computed + + +def _ohm_base(): + global OHM_BASE + if OHM_BASE is None: + OHM_BASE = Config.OHM_API_BASE.replace("/api/0.6", "") + return OHM_BASE + + +def _parse_timestamp(ts_str): + """Parse an ISO timestamp string to a timezone-aware datetime.""" + ts_str = ts_str.replace("Z", "+00:00") + return datetime.fromisoformat(ts_str) + + +def _relative_age(ts_str): + """Return a human-readable relative age string like '4h ago' or '25m ago'.""" + try: + dt = _parse_timestamp(ts_str) + delta = datetime.now(timezone.utc) - dt + total_seconds = int(delta.total_seconds()) + if total_seconds < 60: + return f"{total_seconds}s ago" + minutes = total_seconds // 60 + if minutes < 60: + return f"{minutes}m ago" + hours = minutes // 60 + remaining_min = minutes % 60 + if hours < 24: + return f"{hours}h{remaining_min}m ago" if remaining_min else f"{hours}h ago" + days = hours // 24 + remaining_hours = hours % 24 + return f"{days}d{remaining_hours}h ago" if remaining_hours else f"{days}d ago" + except Exception: + return "" + + +# --------------------------------------------------------------------------- +# Step 0: get changesets in the age window +# 
--------------------------------------------------------------------------- + +def _get_changesets_in_window(min_age, max_age, limit=10): + """Fetch closed changesets whose age is between min_age and max_age seconds. + + Fetches recent changesets and filters locally by age window. + """ + now = datetime.now(timezone.utc) + min_closed = now - timedelta(seconds=max_age) # oldest allowed + max_closed = now - timedelta(seconds=min_age) # newest allowed + + # Fetch enough to find some in the window + fetch_limit = 100 + url = f"{Config.OHM_API_BASE}/changesets" + params = {"limit": fetch_limit, "closed": "true"} + headers = {"User-Agent": "ohm-pipeline-monitor/1.0"} + + print(f"[pipeline] Fetching changesets: {url}?limit={fetch_limit}&closed=true") + print(f" Looking for changesets closed between " + f"{min_closed.strftime('%Y-%m-%dT%H:%M:%SZ')} and " + f"{max_closed.strftime('%Y-%m-%dT%H:%M:%SZ')} " + f"(age {min_age//60}-{max_age//60} min)") + + resp = requests.get(url, params=params, headers=headers, timeout=30) + resp.raise_for_status() + + root = ET.fromstring(resp.content) + changesets = [] + skipped_young = 0 + skipped_old = 0 + + for cs in root.findall("changeset"): + cs_id = int(cs.attrib["id"]) + closed_at = cs.attrib.get("closed_at", "") + if not closed_at: + continue + try: + closed_dt = _parse_timestamp(closed_at) + except (ValueError, TypeError): + continue + + age_minutes = (now - closed_dt).total_seconds() / 60 + + if closed_dt > max_closed: + skipped_young += 1 + continue + elif closed_dt < min_closed: + skipped_old += 1 + # Changesets are ordered by newest first, so once we hit old ones, stop + break + else: + changesets.append({ + "id": cs_id, + "created_at": cs.attrib.get("created_at", ""), + "closed_at": closed_at, + "closed_dt": closed_dt, + "age_minutes": round(age_minutes, 1), + }) + + if len(changesets) >= limit: + break + + print(f" Fetched {len(root.findall('changeset'))} changesets from API") + print(f" Skipped: {skipped_young} too young 
(<{min_age//60}min), " + f"{skipped_old} too old (>{max_age//60}min)") + print(f" Found {len(changesets)} changesets in window:") + for cs in changesets: + print(f" changeset {cs['id']}: closed_at={cs['closed_at']} " + f"(age={cs['age_minutes']}min)") + + return changesets + + +# --------------------------------------------------------------------------- +# Step 1: replication check +# --------------------------------------------------------------------------- + +def _parse_replication_state(text): + """Parse state.txt and return (sequence, timestamp).""" + data = {} + for line in text.strip().splitlines(): + if "=" in line: + key, _, value = line.partition("=") + data[key.strip()] = value.strip() + seq = int(data.get("sequenceNumber", 0)) + ts_raw = data.get("timestamp", "").replace("\\:", ":") + try: + ts = datetime.fromisoformat(ts_raw.replace("Z", "+00:00")) + except ValueError: + ts = None + return seq, ts + + +def _check_replication_covers(changeset, repl_seq, repl_ts): + """Check if the replication state covers this changeset.""" + if repl_ts is None: + return { + "status": "warning", + "message": "Cannot parse replication timestamp", + } + + closed_dt = changeset["closed_dt"] + if repl_ts >= closed_dt: + return { + "status": "ok", + "message": (f"Replication covers this changeset " + f"(repl_ts={repl_ts.isoformat()} >= closed_at={changeset['closed_at']})"), + "replication_sequence": repl_seq, + "replication_timestamp": repl_ts.isoformat(), + } + else: + lag = (closed_dt - repl_ts).total_seconds() + return { + "status": "critical", + "message": (f"Replication does NOT cover this changeset. 
" + f"Replication is {round(lag/60, 1)}min behind " + f"(repl_ts={repl_ts.isoformat()} < closed_at={changeset['closed_at']})"), + "replication_sequence": repl_seq, + "replication_timestamp": repl_ts.isoformat(), + } + + +# --------------------------------------------------------------------------- +# Step 2: tiler DB check +# --------------------------------------------------------------------------- + +def _get_changeset_elements(changeset_id): + """Download changeset diff and extract way/relation elements with versions.""" + url = f"{Config.OHM_API_BASE}/changeset/{changeset_id}/download" + headers = {"User-Agent": "ohm-pipeline-monitor/1.0"} + resp = requests.get(url, headers=headers, timeout=30) + resp.raise_for_status() + + root = ET.fromstring(resp.content) + elements = [] + + for action in root: # create, modify, delete + action_type = action.tag + for elem in action: + osm_id = elem.attrib.get("id") + version = elem.attrib.get("version") + elem_type = elem.tag + if osm_id and elem_type in ("way", "relation"): + # Extract tags to determine which imposm table this element belongs to + tags = {} + for tag in elem.findall("tag"): + k = tag.attrib.get("k") + v = tag.attrib.get("v") + if k and v: + tags[k] = v + timestamp = elem.attrib.get("timestamp", "") + # Count nodes for ways (to detect invalid geometries) + node_count = len(elem.findall("nd")) if elem_type == "way" else 0 + elements.append({ + "type": elem_type, + "osm_id": int(osm_id), + "version": int(version) if version else None, + "action": action_type, + "tags": tags, + "timestamp": timestamp, + "node_count": node_count, + }) + return elements + + + +# Loaded from tables_config.json +TAG_TO_CHECK = _tables_config["tag_to_check"] + +# Reject rules: tag key -> list of values that imposm rejects (not imported) +_REJECT_VALUES = _tables_config.get("reject_values", {}) + +# Relation types that imposm actually imports (multipolygon, boundary, route, street) +# Relations with other type= values (e.g. 
site, associatedStreet) are ignored by imposm +_IMPORTABLE_RELATION_TYPES = set(_tables_config.get("importable_relation_types", [])) + +# Split config keys into simple tags ("highway") and key=value tags ("type=street") +_SIMPLE_TAGS = {} +_KV_TAGS = {} +for key, val in TAG_TO_CHECK.items(): + if "=" in key: + _KV_TAGS[key] = val + else: + _SIMPLE_TAGS[key] = val + + +def _has_compatible_tables(entry, elem_type): + """Return True if the entry has at least one table compatible with the element type. + + - Nodes can only be stored in tables ending with '_points'. + - Ways and relations need tables ending with '_lines', '_areas', + '_multilines', '_relation_members', or other non-point suffixes. + """ + if elem_type == "node": + return any(t.endswith("_points") for t in entry.get("tables", [])) + # way or relation: needs at least one non-points table + return any(not t.endswith("_points") for t in entry.get("tables", [])) + + +def _matching_entries(elem): + """Return matching tag_to_check entries for this element's tags. + + Skips tags whose value is rejected by imposm (e.g. natural=coastline). + Skips entries whose tables are incompatible with the element type + (e.g. a way matching 'shop' which only has point tables). + Other mappable tags on the same element are still matched. + """ + tags = elem.get("tags", {}) + elem_type = elem.get("type", "") + entries = [] + # Simple tags: match if tag key exists (e.g. "highway"), + # but skip if the value is in the reject list for that key + for tag_key in tags: + if tag_key in _SIMPLE_TAGS: + rejected = _REJECT_VALUES.get(tag_key, []) + if tags[tag_key] in rejected: + continue + entry = _SIMPLE_TAGS[tag_key] + if not _has_compatible_tables(entry, elem_type): + continue + entries.append(entry) + # Key=value tags: match if tag key AND value match (e.g. 
"type=street") + for kv, entry in _KV_TAGS.items(): + k, v = kv.split("=", 1) + if tags.get(k) == v: + if not _has_compatible_tables(entry, elem_type): + continue + entries.append(entry) + return entries + + +def _has_mappable_tags(elem): + """Return True if the element has at least one tag that imposm imports.""" + return len(_matching_entries(elem)) > 0 + + +def _has_invalid_geometry(elem): + """Return True if the element has a geometry that imposm will reject. + + Cases detected: + - Ways with area=yes but fewer than 4 nodes (need 3 unique + closing node + to form a valid polygon). + - Ways (non-area) with fewer than 2 nodes. + """ + if elem.get("type") != "way": + return False + node_count = elem.get("node_count", 0) + if node_count == 0: + return False # no node info available, don't skip + tags = elem.get("tags", {}) + if tags.get("area") == "yes": + # A polygon needs at least 4 nodes (3 unique points + closing node) + return node_count < 4 + # A line needs at least 2 nodes + return node_count < 2 + + +def _is_non_importable_relation(elem): + """Return True if the element is a relation with a type that imposm does not import. + + Imposm only processes certain relation types (multipolygon, boundary, route, street). + Relations with other type= values (e.g. site, associatedStreet) are ignored. + """ + if elem.get("type") != "relation": + return False + tags = elem.get("tags", {}) + rel_type = tags.get("type", "") + if not rel_type: + return False # no type tag, don't skip (could be imported by other means) + return rel_type not in _IMPORTABLE_RELATION_TYPES + + +def _get_candidate_tables(elem): + """Return the specific tables where this element should exist based on its tags.""" + tables = set() + for entry in _matching_entries(elem): + tables.update(entry["tables"]) + return list(tables) + + +def _get_candidate_views(elem): + """Return the specific views where this element should exist based on its tags. 
+ + Returns a list of (view_name, column, id_mode) tuples. + id_mode is 'members' for views that store member way IDs (positive), + or 'standard' for views that store osm_id (negative for relations). + """ + views = {} + for entry in _matching_entries(elem): + col = entry.get("view_column", "osm_id") + id_mode = entry.get("view_id_mode", "standard") + for v in entry["views"]: + views[v] = (col, id_mode) + return [(v, col, mode) for v, (col, mode) in views.items()] + + +def _build_union_query(tables, search_id): + """Build a UNION ALL query to search osm_id across multiple tables in 1 round-trip.""" + parts = [] + for table in tables: + parts.append( + f"(SELECT '{table}' AS tbl " + f"FROM {table} WHERE osm_id = {int(search_id)} LIMIT 1)" + ) + return " UNION ALL ".join(parts) + + +def _check_element_in_tables(conn, elem): + """Check if an element exists in tiler DB tables using a single UNION ALL query.""" + osm_id = elem["osm_id"] + search_id = -osm_id if elem["type"] == "relation" else osm_id + candidate_tables = _get_candidate_tables(elem) + + cur = conn.cursor() + + # Get existing tables (cached per connection would be ideal, but simple first) + cur.execute(""" + SELECT table_name FROM information_schema.tables + WHERE table_schema = 'public' AND table_name LIKE 'osm_%%' + """) + existing_tables = {row[0] for row in cur.fetchall()} + + if candidate_tables: + # Normal path: filter to candidate tables that exist + tables = [t for t in candidate_tables if t in existing_tables] + else: + # Retry path: no tags available, search ALL osm_* tables + tables = sorted(existing_tables) + + if not tables: + cur.close() + return { + "type": elem["type"], + "osm_id": osm_id, + "action": elem["action"], + "found_in_tables": [], + "found_in_views": [], + "url": f"{_ohm_base()}/{elem['type']}/{elem['osm_id']}", + } + + # Single UNION ALL query across all candidate tables + query = _build_union_query(tables, search_id) + found_in_tables = [] + + try: + cur.execute(query) + for 
row in cur.fetchall(): + found_in_tables.append(row[0]) + except Exception: + conn.rollback() + + cur.close() + + return { + "type": elem["type"], + "osm_id": osm_id, + "action": elem["action"], + "found_in_tables": found_in_tables, + "found_in_views": [], + "url": f"{_ohm_base()}/{elem['type']}/{elem['osm_id']}", + } + + +def _check_element_in_views(conn, elem, check): + """Check if an element exists in materialized views using a single UNION ALL query.""" + osm_id = check["osm_id"] + is_relation = check["type"] == "relation" + + candidate_views = _get_candidate_views(elem) + + if not candidate_views: + return check + + cur = conn.cursor() + + # Filter to existing views + cur.execute(""" + SELECT matviewname FROM pg_matviews + WHERE schemaname = 'public' AND matviewname LIKE 'mv_%%' + """) + existing_views = {row[0] for row in cur.fetchall()} + + view_info = [(v, col, mode) for v, col, mode in candidate_views if v in existing_views] + missing_views = [v for v, _, _ in candidate_views if v not in existing_views] + if missing_views: + logger.debug(f"Views not found in DB for {check['type']}/{osm_id}: {missing_views}") + + if not view_info: + cur.close() + return check + + # For 'members' mode views (routes): the view stores member way IDs, + # so we need to find which way IDs belong to this relation. + # For 'standard' mode: use osm_id (negative for relations). 
+ member_way_ids = None + + # Build UNION ALL query, grouping by search strategy + parts = [] + for view, col, id_mode in sorted(view_info): + if id_mode == "members" and is_relation: + # Fetch member way IDs from the route table if not already done + if member_way_ids is None: + member_way_ids = _get_relation_member_ids(conn, osm_id) + if member_way_ids: + ids_list = ", ".join(str(mid) for mid in member_way_ids) + parts.append( + f"(SELECT '{view}' AS vw FROM {view} " + f"WHERE {col} IN ({ids_list}) LIMIT 1)" + ) + else: + search_id = -osm_id if is_relation else osm_id + parts.append( + f"(SELECT '{view}' AS vw FROM {view} " + f"WHERE {col} = {int(search_id)} LIMIT 1)" + ) + + found_in_views = [] + if parts: + query = " UNION ALL ".join(parts) + try: + cur.execute(query) + for row in cur.fetchall(): + found_in_views.append(row[0]) + except Exception as e: + logger.warning(f"View query failed for {check['type']}/{osm_id} in " + f"{[v for v,_,_ in view_info]}: {e}") + conn.rollback() + + cur.close() + check["found_in_views"] = found_in_views + return check + + +def _get_relation_member_ids(conn, relation_osm_id): + """Get member way IDs for a relation from osm_route_multilines.""" + cur = conn.cursor() + try: + cur.execute(""" + SELECT DISTINCT member + FROM osm_route_multilines + WHERE osm_id = %s + """, (-relation_osm_id,)) + ids = [row[0] for row in cur.fetchall()] + return ids + except Exception as e: + logger.debug(f"Could not get members for relation {relation_osm_id}: {e}") + conn.rollback() + return [] + finally: + cur.close() + + +def _is_element_deleted(elem): + """Check if an element has been deleted in OHM (visible=false or 410 Gone).""" + url = f"{Config.OHM_API_BASE}/{elem['type']}/{elem['osm_id']}" + headers = {"User-Agent": "ohm-pipeline-monitor/1.0"} + try: + resp = requests.get(url, headers=headers, timeout=15) + if resp.status_code == 410: + return True + if resp.status_code == 200: + root = ET.fromstring(resp.content) + el = 
root.find(elem["type"]) + if el is not None and el.attrib.get("visible") == "false": + return True + return False + except Exception: + return False + + +def _get_current_element_tags(element_type, osm_id): + """Fetch the latest version of an element from OHM API and return its tags. + + Returns None if the element is deleted/gone or the request fails. + Returns a dict of tags if the element exists. + """ + url = f"{Config.OHM_API_BASE}/{element_type}/{osm_id}" + headers = {"User-Agent": "ohm-pipeline-monitor/1.0"} + try: + resp = requests.get(url, headers=headers, timeout=15) + if resp.status_code == 410: + return None + if resp.status_code == 200: + root = ET.fromstring(resp.content) + el = root.find(element_type) + if el is not None: + if el.attrib.get("visible") == "false": + return None + tags = {} + for tag in el.findall("tag"): + k = tag.attrib.get("k") + v = tag.attrib.get("v") + if k and v: + tags[k] = v + return tags + return None + except Exception: + return None + + +def _get_current_element_info(element_type, osm_id): + """Fetch the latest version of an element from OHM API. + + Returns None if the element is deleted/gone or the request fails. + Returns a dict with 'tags' and 'node_count' (for ways) if the element exists. 
+ """ + url = f"{Config.OHM_API_BASE}/{element_type}/{osm_id}" + headers = {"User-Agent": "ohm-pipeline-monitor/1.0"} + try: + resp = requests.get(url, headers=headers, timeout=15) + if resp.status_code == 410: + return None + if resp.status_code == 200: + root = ET.fromstring(resp.content) + el = root.find(element_type) + if el is not None: + if el.attrib.get("visible") == "false": + return None + tags = {} + for tag in el.findall("tag"): + k = tag.attrib.get("k") + v = tag.attrib.get("v") + if k and v: + tags[k] = v + node_count = len(el.findall("nd")) if element_type == "way" else 0 + return {"tags": tags, "node_count": node_count} + return None + except Exception: + return None + + +def _check_elements_in_db(conn, changeset_id, changeset_closed_at=None, already_checked=None): + """Check all elements of a changeset in the tiler DB. + + - ALL elements: verified in osm_* tables (fast, tag-filtered) + - SAMPLE elements: full check → tables + views + S3 tile cache + + If *already_checked* is a set of (type, osm_id) tuples, elements that + were already verified in a newer changeset are skipped to avoid + duplicate reporting. 
+ """ + from checks.tile_cache import check_tile_cache_for_element + + try: + elements = _get_changeset_elements(changeset_id) + except requests.RequestException as e: + return { + "status": "critical", + "message": f"Failed to download changeset diff: {e}", + "elements": [], + } + + if not elements: + return { + "status": "ok", + "message": "No way/relation elements in this changeset", + "elements": [], + } + + # Filter elements: skip those without mappable tags or with invalid geometry + checkable_elements = [] + for elem in elements: + if not _has_mappable_tags(elem): + continue + if _has_invalid_geometry(elem): + tags = elem.get("tags", {}) + print(f" SKIP {elem['type']}/{elem['osm_id']} v{elem['version']} " + f"-> invalid geometry (area={tags.get('area', 'no')}, " + f"nodes={elem.get('node_count', '?')}), imposm will reject") + continue + if _is_non_importable_relation(elem): + rel_type = elem.get("tags", {}).get("type", "") + print(f" SKIP {elem['type']}/{elem['osm_id']} v{elem['version']} " + f"-> relation type={rel_type} is not imported by imposm") + continue + checkable_elements.append(elem) + + if not checkable_elements: + return { + "status": "ok", + "message": "No importable elements in this changeset", + "elements": [], + } + + # Deduplicate: skip elements already checked in a newer changeset + if already_checked is not None: + deduped = [] + for elem in checkable_elements: + key = (elem["type"], elem["osm_id"]) + if key in already_checked: + print(f" SKIP {elem['type']}/{elem['osm_id']} v{elem['version']} " + f"-> already checked in a newer changeset") + continue + deduped.append(elem) + checkable_elements = deduped + + if not checkable_elements: + return { + "status": "ok", + "message": "All elements already checked in newer changesets", + "elements": [], + } + + # Select random sample for full pipeline check (tables + views + S3) + # Only sample from create/modify elements + import math + create_modify = [e for e in checkable_elements if e["action"] 
!= "delete"] + sample_size = max(1, math.ceil(len(create_modify) * Config.FULL_CHECK_SAMPLE_PCT / 100)) + sample_size = min(sample_size, len(create_modify)) + sample_ids = set() + if create_modify: + sample_ids = {e["osm_id"] for e in random.sample(create_modify, sample_size)} + + print(f" [tiler_db] Checking {len(checkable_elements)} elements " + f"(full pipeline check on {sample_size}/{len(create_modify)} = {Config.FULL_CHECK_SAMPLE_PCT}% sampled)") + + missing = [] + not_deleted = [] + checked = [] + tile_cache_results = [] + + for elem in checkable_elements: + is_sample = elem["osm_id"] in sample_ids + sample_label = " [SAMPLE]" if is_sample else "" + ts_info = f" created={elem['timestamp']} ({_relative_age(elem['timestamp'])})" if elem.get("timestamp") else "" + + # Step 1: Check tables + check = _check_element_in_tables(conn, elem) + tables = check["found_in_tables"] + + if elem["action"] == "delete": + # DELETE: element should NOT be in the DB + if tables: + print(f" NOT_DELETED{sample_label} {elem['type']}/{elem['osm_id']} v{elem['version']} " + f"(delete){ts_info} -> still in tables: {tables}") + print(f" {check['url']}") + not_deleted.append(f"{elem['type']}/{elem['osm_id']}") + else: + print(f" OK{sample_label} {elem['type']}/{elem['osm_id']} v{elem['version']} " + f"(delete){ts_info} -> correctly removed") + checked.append(check) + continue + + # CREATE / MODIFY: element should be in the DB + # Step 2: Check views + if tables: + check = _check_element_in_views(conn, elem, check) + + checked.append(check) + views = check["found_in_views"] + + if tables: + print(f" OK{sample_label} {elem['type']}/{elem['osm_id']} v{elem['version']} " + f"({elem['action']}){ts_info}") + print(f" tables: {tables}") + print(f" views: {views}") + print(f" {check['url']}") + + # Step 3: Check S3 tile cache (SAMPLE only) + if is_sample and changeset_closed_at and Config.S3_BUCKET_CACHE_TILER: + try: + tile_result = check_tile_cache_for_element( + conn, check, 
changeset_closed_at + ) + tile_cache_results.append(tile_result) + cache_status = tile_result.get("cache", {}).get("status", "unknown") + tile_info = tile_result.get("tile", {}) + if cache_status == "stale": + print(f" [S3 CACHE] STALE tile z{tile_info.get('z')}/{tile_info.get('x')}/{tile_info.get('y')}") + elif cache_status == "ok": + print(f" [S3 CACHE] OK tile z{tile_info.get('z')}/{tile_info.get('x')}/{tile_info.get('y')}") + elif cache_status == "skipped": + print(f" [S3 CACHE] skipped: {tile_result.get('cache', {}).get('message', '')}") + except Exception as e: + print(f" [S3 CACHE] error: {e}") + else: + # Not found — check if deleted in a later changeset + if _is_element_deleted(elem): + print(f" SKIP {elem['type']}/{elem['osm_id']} v{elem['version']} " + f"({elem['action']}){ts_info} -> deleted in a later changeset") + print(f" {check['url']}") + check["deleted"] = True + continue + + print(f" MISSING{sample_label} {elem['type']}/{elem['osm_id']} v{elem['version']} " + f"({elem['action']}){ts_info} -> NOT in tables, queued for retry") + print(f" {check['url']}") + retry_store.add_missing( + changeset_id, elem["type"], elem["osm_id"], Config.MAX_RETRIES, + version=elem.get("version", 0), action=elem.get("action", ""), + closed_at=changeset_closed_at or "", + ) + missing.append(f"{elem['type']}/{elem['osm_id']}") + + # Build status message + stale_tiles = [r for r in tile_cache_results if r.get("cache", {}).get("status") == "stale"] + + problems_parts = [] + # Missing elements are queued for retry, only warn about other issues + if missing: + problems_parts.append(f"Queued for retry: {', '.join(missing)}") + if not_deleted: + problems_parts.append(f"Not deleted from tiler DB: {', '.join(not_deleted)}") + + if not_deleted: + status = "warning" + msg = ". ".join(problems_parts) + elif missing: + status = "retry_pending" + msg = ". 
".join(problems_parts) + elif stale_tiles: + status = "warning" + stale_ids = [f"{r['type']}/{r['osm_id']}" for r in stale_tiles] + msg = (f"All {len(checked)} elements in tables, " + f"but S3 tile cache stale for: {', '.join(stale_ids)}") + else: + status = "ok" + msg = f"All {len(checked)} elements verified in tiler DB" + if tile_cache_results: + msg += f" (S3 cache OK for {len(tile_cache_results)} sampled)" + + return { + "status": status, + "message": msg, + "elements": checked, + "tile_cache": tile_cache_results, + } + + +# --------------------------------------------------------------------------- +# Main pipeline check (scheduled) +# --------------------------------------------------------------------------- + +def check_pipeline(): + """Check the full pipeline for changesets in the 1-2 hour age window. + + For each changeset: + 1. Is it covered by minute replication? + 2. Are its elements in the tiler DB? + """ + now = datetime.now(timezone.utc) + min_age = Config.CHANGESET_MIN_AGE + max_age = Config.CHANGESET_MAX_AGE + + result = { + "name": "pipeline", + "status": "ok", + "message": "", + "details": { + "window": f"{min_age//60}-{max_age//60} minutes", + "replication": {}, + "changesets": [], + }, + "checked_at": now.isoformat(), + } + + # --- Fetch replication state --- + repl_seq, repl_ts = None, None + try: + resp = requests.get(Config.REPLICATION_STATE_URL, timeout=15) + resp.raise_for_status() + repl_seq, repl_ts = _parse_replication_state(resp.text) + result["details"]["replication"] = { + "status": "ok", + "sequence": repl_seq, + "timestamp": repl_ts.isoformat() if repl_ts else None, + } + if repl_ts: + lag_min = (now - repl_ts).total_seconds() / 60 + result["details"]["replication"]["lag_minutes"] = round(lag_min, 1) + print(f"\n[pipeline] Replication state: seq={repl_seq}, " + f"ts={repl_ts.isoformat()}, lag={lag_min:.1f}min") + except requests.RequestException as e: + result["details"]["replication"] = { + "status": "critical", + "message": 
f"Failed to fetch replication state: {e}", + } + print(f"\n[pipeline] WARNING: Cannot fetch replication state: {e}") + + # --- Get changesets in window --- + try: + changesets = _get_changesets_in_window( + min_age=min_age, + max_age=max_age, + limit=Config.CHANGESET_LIMIT, + ) + except requests.RequestException as e: + result["status"] = "critical" + result["message"] = f"Failed to fetch changesets from OHM API: {e}" + return result + + if not changesets: + result["message"] = ( + f"No changesets found in the {min_age//60}-{max_age//60} minute window" + ) + print(f"[pipeline] {result['message']}") + return result + + print(f"[pipeline] Found {len(changesets)} changesets in " + f"{min_age//60}-{max_age//60}min window") + + # --- Connect to tiler DB --- + conn = None + try: + conn = psycopg2.connect( + host=Config.POSTGRES_HOST, + port=Config.POSTGRES_PORT, + dbname=Config.POSTGRES_DB, + user=Config.POSTGRES_USER, + password=Config.POSTGRES_PASSWORD, + ) + except psycopg2.Error as e: + result["status"] = "critical" + result["message"] = f"Cannot connect to tiler DB: {e}" + print(f"[pipeline] ERROR: Cannot connect to tiler DB: {e}") + return result + + # --- Check each changeset through the pipeline --- + problems = [] + skipped = 0 + # Track already-checked elements to avoid duplicates across changesets. + # Changesets are ordered newest-first, so only the latest version is checked. 
+ checked_elements = set() # (type, osm_id) + + for cs in changesets: + # Skip changesets already checked with status OK + if retry_store.is_changeset_passed(cs["id"]): + skipped += 1 + continue + + print(f"\n[pipeline] === Changeset {cs['id']} === " + f"(closed_at={cs['closed_at']}, age={cs['age_minutes']}min)") + print(f" URL: {_ohm_base()}/changeset/{cs['id']}") + + cs_result = { + "changeset_id": cs["id"], + "changeset_url": f"{_ohm_base()}/changeset/{cs['id']}", + "closed_at": cs["closed_at"], + "age_minutes": cs["age_minutes"], + "replication": {}, + "tiler_db": {}, + } + + # Step 1: replication + if repl_seq is not None: + repl_check = _check_replication_covers(cs, repl_seq, repl_ts) + cs_result["replication"] = repl_check + print(f" [replication] {repl_check['status'].upper()}: {repl_check['message']}") + + if repl_check["status"] != "ok": + problems.append( + f"Changeset {cs['id']}: replication not covering" + ) + else: + cs_result["replication"] = {"status": "unknown", "message": "Replication state unavailable"} + print(f" [replication] UNKNOWN: Replication state unavailable") + + # Step 2: tiler DB + db_check = _check_elements_in_db(conn, cs["id"], cs["closed_at"], already_checked=checked_elements) + cs_result["tiler_db"] = db_check + print(f" [tiler_db] {db_check['status'].upper()}: {db_check['message']}") + + # Track checked elements to skip in older changesets + for elem in db_check.get("elements", []): + checked_elements.add((elem["type"], elem["osm_id"])) + + if db_check["status"] not in ("ok", "retry_pending"): + problems.append(f"Changeset {cs['id']}: {db_check['message']}") + + result["details"]["changesets"].append(cs_result) + + # Log to history + elements = db_check.get("elements", []) + missing_count = len([e for e in elements if not e.get("found_in_tables")]) + retry_store.log_changeset_check( + changeset_id=cs["id"], + status=db_check["status"], + total_elements=len(elements), + missing_count=missing_count, + ok_count=len(elements) - 
missing_count, + message=db_check["message"], + created_at=cs.get("created_at", ""), + closed_at=cs.get("closed_at", ""), + elements=elements, + ) + + # --- Recheck pending, failed, and warning retries --- + retryable = retry_store.get_pending() + retry_store.get_failed() + retry_store.get_warnings() + newly_failed = [] + + if retryable: + print(f"\n[pipeline] Rechecking {len(retryable)} retries (pending + failed + warning)...") + + for entry in retryable: + cs_id = entry["changeset_id"] + etype = entry["element_type"] + oid = entry["osm_id"] + retry_num = entry["retry_count"] + 1 + prev_status = entry["status"] + + # Check if the latest version still has mappable tags / valid geometry + current_info = _get_current_element_info(etype, oid) + if current_info is None: + # Element was deleted — no longer needs to be in DB + print(f" [retry] RESOLVED {etype}/{oid} (changeset {cs_id}) " + f"-> element deleted in latest version, no longer expected in DB") + retry_store.mark_resolved(cs_id, etype, oid) + continue + current_elem = {"type": etype, "tags": current_info["tags"], + "node_count": current_info["node_count"]} + if not _has_mappable_tags(current_elem): + # Latest version has no mappable tags — imposm won't import it + print(f" [retry] RESOLVED {etype}/{oid} (changeset {cs_id}) " + f"-> latest version has no mappable tags, no longer expected in DB") + retry_store.mark_resolved(cs_id, etype, oid) + continue + if _has_invalid_geometry(current_elem): + # Invalid geometry — imposm will reject it + tags = current_info["tags"] + print(f" [retry] RESOLVED {etype}/{oid} (changeset {cs_id}) " + f"-> invalid geometry (area={tags.get('area', 'no')}, " + f"nodes={current_info['node_count']}), imposm rejects this") + retry_store.mark_resolved(cs_id, etype, oid) + continue + if _is_non_importable_relation(current_elem): + rel_type = current_info["tags"].get("type", "") + print(f" [retry] RESOLVED {etype}/{oid} (changeset {cs_id}) " + f"-> relation type={rel_type} is not imported 
by imposm") + retry_store.mark_resolved(cs_id, etype, oid) + continue + + # Check if the element is now in the DB + check = _check_element_in_tables(conn, {"type": etype, "osm_id": oid, "action": "modify"}) + if check["found_in_tables"]: + print(f" [retry] RESOLVED {etype}/{oid} (changeset {cs_id}) " + f"-> found in tables after {retry_num} retries") + retry_store.mark_resolved(cs_id, etype, oid) + elif prev_status in ("failed", "warning"): + # Already exhausted retries, keep checking but don't increment + print(f" [retry] STILL MISSING {etype}/{oid} (changeset {cs_id}) " + f"-> {prev_status}, still monitoring") + else: + # Check missing percentage threshold to decide severity + cs_stats = retry_store.get_changeset_stats(cs_id) + if cs_stats and cs_stats["total_elements"] > 0: + missing_pct = (cs_stats["missing_count"] / cs_stats["total_elements"]) * 100 + else: + missing_pct = 100 # no stats available, assume worst case + + above_threshold = missing_pct >= Config.MISSING_THRESHOLD_PCT + final_status = "failed" if above_threshold else "warning" + new_status = retry_store.increment_retry(cs_id, etype, oid, final_status=final_status) + + if new_status == "failed": + print(f" [retry] FAILED {etype}/{oid} (changeset {cs_id}) " + f"-> still missing after {retry_num}/{Config.MAX_RETRIES} retries " + f"({missing_pct:.1f}% missing >= {Config.MISSING_THRESHOLD_PCT}% threshold)") + newly_failed.append({ + "type": etype, "osm_id": oid, "changeset_id": cs_id, + }) + elif new_status == "warning": + print(f" [retry] WARNING {etype}/{oid} (changeset {cs_id}) " + f"-> still missing after {retry_num}/{Config.MAX_RETRIES} retries " + f"({missing_pct:.1f}% missing < {Config.MISSING_THRESHOLD_PCT}% threshold, " + f"not alerting)") + else: + print(f" [retry] PENDING {etype}/{oid} (changeset {cs_id}) " + f"-> retry {retry_num}/{Config.MAX_RETRIES}") + + conn.close() + + # --- Overall status --- + retry_summary = retry_store.summary() + result["details"]["retries"] = retry_summary + + if 
newly_failed: + failed_summary = "; ".join( + f"{f['type']}/{f['osm_id']} (changeset {f['changeset_id']})" + for f in newly_failed + ) + problems.append(f"Failed after {Config.MAX_RETRIES} retries: {failed_summary}") + + has_cs_issues = any( + cs.get("replication", {}).get("status") == "critical" + or cs.get("tiler_db", {}).get("status") in ("warning", "critical") + for cs in result["details"]["changesets"] + ) + + # Include failed details for Slack alerting + failed_count = retry_summary.get("failed", 0) + warning_count = retry_summary.get("warning", 0) + result["details"]["newly_failed"] = newly_failed + result["details"]["total_failed"] = failed_count + result["details"]["total_warnings"] = warning_count + + if newly_failed: + result["status"] = "critical" + failed_labels = "; ".join( + f"{f['type']}/{f['osm_id']}" for f in newly_failed[:5] + ) + result["message"] = f"Elements missing after all retries: {failed_labels}" + elif failed_count > 0: + result["status"] = "critical" + result["message"] = f"{failed_count} elements still missing after all retries" + elif has_cs_issues: + result["status"] = "warning" + result["message"] = f"Issues found: {'; '.join(problems[:5])}" + else: + pending_count = retry_summary.get("pending", 0) + checked_count = len(changesets) - skipped + msg = ( + f"{checked_count} new changesets checked" + ) + if skipped: + msg += f", {skipped} already passed (skipped)" + if pending_count: + msg += f", {pending_count} elements pending retry" + if warning_count: + msg += f", {warning_count} elements below threshold (warning)" + result["message"] = msg + + if skipped: + print(f"[pipeline] Skipped {skipped} changesets already passed OK") + print(f"\n[pipeline] Result: {result['status'].upper()} — {result['message']}") + if retry_summary: + print(f"[pipeline] Retry store: {retry_summary}") + return result + + +# --------------------------------------------------------------------------- +# On-demand single changeset check +# 
---------------------------------------------------------------------------
+
+def check_single_changeset(changeset_id):
+    """Evaluate a single changeset through the full pipeline (on-demand).
+
+    Looks up the changeset's ``closed_at``, checks the replication state,
+    then checks the changeset's elements in the tiler DB.
+
+    Args:
+        changeset_id: ID of the changeset to evaluate.
+
+    Returns:
+        dict with ``name``, ``changeset_id``, ``changeset_url``, ``status``
+        ("ok", "warning" or "critical"), ``message``, ``details`` (the
+        "replication" and "tiler_db" sub-results) and ``checked_at``.
+    """
+    now = datetime.now(timezone.utc)
+    result = {
+        "name": "pipeline",
+        "changeset_id": changeset_id,
+        "changeset_url": f"{_ohm_base()}/changeset/{changeset_id}",
+        "status": "ok",
+        "message": "",
+        "details": {"replication": {}, "tiler_db": {}},
+        "checked_at": now.isoformat(),
+    }
+
+    # Get changeset info. This is best effort: without closed_at the
+    # replication step falls back to a generic state report and the S3 tile
+    # cache check is skipped (closed_at is passed on as None).
+    closed_at = ""
+    lookup_error = None
+    try:
+        url = f"{Config.OHM_API_BASE}/changeset/{changeset_id}"
+        headers = {"User-Agent": "ohm-pipeline-monitor/1.0"}
+        resp = requests.get(url, headers=headers, timeout=30)
+        resp.raise_for_status()
+        root = ET.fromstring(resp.content)
+        cs_elem = root.find("changeset")
+        if cs_elem is not None:
+            closed_at = cs_elem.attrib.get("closed_at", "")
+    except Exception as e:
+        # Keep going, but do not hide why closed_at is missing.
+        lookup_error = e
+
+    print(f"\n[pipeline] === Changeset {changeset_id} (on-demand) ===")
+    print(f" URL: {_ohm_base()}/changeset/{changeset_id}")
+    if closed_at:
+        print(f" closed_at: {closed_at}")
+    elif lookup_error is not None:
+        print(f" closed_at: unavailable ({lookup_error})")
+
+    # Step 1: replication
+    try:
+        resp = requests.get(Config.REPLICATION_STATE_URL, timeout=15)
+        resp.raise_for_status()
+        repl_seq, repl_ts = _parse_replication_state(resp.text)
+
+        if closed_at and repl_ts:
+            closed_dt = _parse_timestamp(closed_at)
+            cs_data = {"closed_at": closed_at, "closed_dt": closed_dt}
+            repl_check = _check_replication_covers(cs_data, repl_seq, repl_ts)
+        else:
+            # Nothing to compare against: just report the replication state.
+            repl_check = {
+                "status": "ok" if repl_ts else "warning",
+                "message": f"Replication seq={repl_seq}, ts={repl_ts.isoformat() if repl_ts else 'unknown'}",
+                "replication_sequence": repl_seq,
+                "replication_timestamp": repl_ts.isoformat() if repl_ts else None,
+            }
+
+        result["details"]["replication"] = repl_check
+        print(f" [replication] {repl_check['status'].upper()}: {repl_check['message']}")
+    except requests.RequestException as e:
+        result["details"]["replication"] = {
+            "status": "critical",
+            "message": f"Failed to fetch replication state: {e}",
+        }
+        print(f" [replication] CRITICAL: Cannot fetch replication state: {e}")
+
+    # Step 2: tiler DB
+    try:
+        conn = psycopg2.connect(
+            host=Config.POSTGRES_HOST,
+            port=Config.POSTGRES_PORT,
+            dbname=Config.POSTGRES_DB,
+            user=Config.POSTGRES_USER,
+            password=Config.POSTGRES_PASSWORD,
+        )
+    except psycopg2.Error as e:
+        result["status"] = "critical"
+        result["message"] = f"Cannot connect to tiler DB: {e}"
+        result["details"]["tiler_db"] = {"status": "critical", "message": str(e)}
+        return result
+
+    # Close the connection even when the element check raises.
+    try:
+        db_check = _check_elements_in_db(conn, changeset_id, closed_at or None)
+    finally:
+        conn.close()
+    result["details"]["tiler_db"] = db_check
+    print(f" [tiler_db] {db_check['status'].upper()}: {db_check['message']}")
+
+    # Overall: any problem downgrades the on-demand result to "warning".
+    problems = []
+    repl_status = result["details"]["replication"].get("status", "ok")
+    if repl_status == "critical":
+        problems.append("Replication not covering this changeset")
+    if db_check["status"] != "ok":
+        problems.append(db_check["message"])
+
+    if problems:
+        result["status"] = "warning"
+        result["message"] = "; ".join(problems)
+    else:
+        result["message"] = (
+            f"Changeset {changeset_id} passed full pipeline check "
+            f"({len(db_check.get('elements', []))} elements verified)"
+        )
+
+    print(f" [result] {result['status'].upper()}: {result['message']}")
+    return result
+
+
+def recheck_single_element(element_type, osm_id):
+    """Manually recheck a single element in the tiler DB.
+
+    Args:
+        element_type: OSM element type; "relation" ids are searched negated.
+        osm_id: OSM id of the element.
+
+    Returns:
+        dict describing where the element was found or why it is missing
+        (``status`` is "resolved" or "not_found"), or
+        ``{"status": "error", "message": ...}`` when the DB is unreachable.
+    """
+    try:
+        conn = psycopg2.connect(
+            host=Config.POSTGRES_HOST,
+            port=Config.POSTGRES_PORT,
+            dbname=Config.POSTGRES_DB,
+            user=Config.POSTGRES_USER,
+            password=Config.POSTGRES_PASSWORD,
+        )
+    except psycopg2.Error as e:
+        return {"status": "error", "message": f"Cannot connect to tiler DB: {e}"}
+
+    elem = {"type": element_type, "osm_id": osm_id, "action": "modify"}
+    search_id = -osm_id if element_type == "relation" else osm_id
+
+    # Cursor and connection are released even when a query or check raises.
+    try:
+        # Get all osm_* tables to report what was searched
+        cur = conn.cursor()
+        try:
+            cur.execute("""
+                SELECT table_name FROM information_schema.tables
+                WHERE table_schema = 'public' AND table_name LIKE 'osm_%%'
+                ORDER BY table_name
+            """)
+            all_tables = [row[0] for row in cur.fetchall()]
+        finally:
+            cur.close()
+
+        check = _check_element_in_tables(conn, elem)
+
+        # Also check views if found in tables
+        if check["found_in_tables"]:
+            check = _check_element_in_views(conn, elem, check)
+    finally:
+        conn.close()
+
+    found = bool(check["found_in_tables"])
+
+    # If found, resolve every open retry entry for this element. Warning
+    # entries are included, matching what check_pipeline rechecks.
+    if found:
+        open_entries = (
+            retry_store.get_pending()
+            + retry_store.get_failed()
+            + retry_store.get_warnings()
+        )
+        for entry in open_entries:
+            if entry["element_type"] == element_type and entry["osm_id"] == osm_id:
+                retry_store.mark_resolved(entry["changeset_id"], element_type, osm_id)
+
+    # Build detailed result
+    if found:
+        message = f"Found in: {', '.join(check['found_in_tables'])}"
+        if check["found_in_views"]:
+            message += f" | Views: {', '.join(check['found_in_views'])}"
+    else:
+        message = (
+            f"Not found in any of {len(all_tables)} osm_* tables "
+            f"(searched with osm_id={search_id})"
+        )
+
+    result = {
+        "status": "resolved" if found else "not_found",
+        "element_type": element_type,
+        "osm_id": osm_id,
+        "search_id": search_id,
+        "found_in_tables": check["found_in_tables"],
+        "found_in_views": check["found_in_views"],
+        "searched_tables": all_tables,
+        "message": message,
+    }
+    return result
+
+
+def recheck_retries():
+    """Manually recheck all pending, failed and warning retries against the tiler DB.
+
+    Entries whose latest element version is no longer expected in the DB
+    (deleted, no mappable tags, invalid geometry, non-importable relation)
+    are resolved without querying the DB.
+
+    Returns:
+        dict with ``resolved``, ``still_missing``, ``newly_failed`` and
+        ``message``; or ``{"error": ...}`` when the DB is unreachable.
+    """
+    retryable = (
+        retry_store.get_pending()
+        + retry_store.get_failed()
+        + retry_store.get_warnings()
+    )
+    if not retryable:
+        return {"resolved": [], "still_missing": [], "newly_failed": [], "message": "No retries to check"}
+
+    try:
+        conn = psycopg2.connect(
+            host=Config.POSTGRES_HOST,
+            port=Config.POSTGRES_PORT,
+            dbname=Config.POSTGRES_DB,
+            user=Config.POSTGRES_USER,
+            password=Config.POSTGRES_PASSWORD,
+        )
+    except psycopg2.Error as e:
+        return {"error": f"Cannot connect to tiler DB: {e}"}
+
+    resolved = []
+    still_missing = []
+    newly_failed = []
+
+    # Close the connection even when one of the per-entry checks raises.
+    try:
+        for entry in retryable:
+            cs_id = entry["changeset_id"]
+            etype = entry["element_type"]
+            oid = entry["osm_id"]
+            prev_status = entry["status"]
+            ref = {"type": etype, "osm_id": oid, "changeset_id": cs_id}
+
+            # Check if the latest version still has mappable tags / valid geometry
+            current_info = _get_current_element_info(etype, oid)
+            if current_info is None:
+                retry_store.mark_resolved(cs_id, etype, oid)
+                resolved.append({**ref, "reason": "element deleted"})
+                continue
+            current_elem = {"type": etype, "tags": current_info["tags"],
+                            "node_count": current_info["node_count"]}
+            if not _has_mappable_tags(current_elem):
+                retry_store.mark_resolved(cs_id, etype, oid)
+                resolved.append({**ref, "reason": "no mappable tags (rejected by imposm)"})
+                continue
+            if _has_invalid_geometry(current_elem):
+                retry_store.mark_resolved(cs_id, etype, oid)
+                resolved.append({**ref,
+                                 "reason": f"invalid geometry (area={current_info['tags'].get('area', 'no')}, nodes={current_info['node_count']})"})
+                continue
+            if _is_non_importable_relation(current_elem):
+                rel_type = current_info["tags"].get("type", "")
+                retry_store.mark_resolved(cs_id, etype, oid)
+                resolved.append({**ref,
+                                 "reason": f"relation type={rel_type} not imported by imposm"})
+                continue
+
+            check = _check_element_in_tables(conn, {"type": etype, "osm_id": oid, "action": "modify"})
+            if check["found_in_tables"]:
+                retry_store.mark_resolved(cs_id, etype, oid)
+                resolved.append({**ref, "found_in_tables": check["found_in_tables"]})
+            elif prev_status in ("failed", "warning"):
+                # Retries already exhausted: report, but do not increment again.
+                still_missing.append(ref)
+            else:
+                new_status = retry_store.increment_retry(cs_id, etype, oid)
+                if new_status == "failed":
+                    newly_failed.append(ref)
+                else:
+                    still_missing.append(ref)
+    finally:
+        conn.close()
+
+    msg_parts = []
+    if resolved:
+        msg_parts.append(f"{len(resolved)} resolved")
+    if still_missing:
+        msg_parts.append(f"{len(still_missing)} still missing")
+    if newly_failed:
+        msg_parts.append(f"{len(newly_failed)} newly failed")
+
+    return {
+        "resolved": resolved,
+        "still_missing": still_missing,
+        "newly_failed": newly_failed,
+        "message": ", ".join(msg_parts) if msg_parts else "No retries to check",
+    }
diff --git a/images/tiler-monitor/pipeline-monitor/checks/mv_freshness.py b/images/tiler-monitor/pipeline-monitor/checks/mv_freshness.py
new file mode 100644
index 00000000..35d5d4d6
--- /dev/null
+++ b/images/tiler-monitor/pipeline-monitor/checks/mv_freshness.py
@@ -0,0 +1,179 @@
+"""Check 3: Materialized view freshness monitor.
+
+Queries pg_stat_user_tables to check when materialized views were last
+analyzed (automatically or manually; used as a proxy for last refresh),
+and also checks if the views exist and have rows.
+"""
+
+from datetime import datetime, timezone
+
+import psycopg2
+
+from config import Config
+
+# Key materialized views grouped by expected refresh interval.
+# group_name -> (max_stale_seconds, [view_names])
+MV_GROUPS = {
+    "admin_boundaries_lines": (
+        300,  # expect refresh every ~60s + buffer
+        [
+            "mv_admin_boundaries_lines_z4_5",
+            "mv_admin_boundaries_lines_z6_7",
+            "mv_admin_boundaries_lines_z8_9",
+            "mv_admin_boundaries_lines_z10_11",
+            "mv_admin_boundaries_lines_z12_13",
+            "mv_admin_boundaries_lines_z14_15",
+            "mv_admin_boundaries_lines_z16_20",
+        ],
+    ),
+    "water": (
+        600,  # expect refresh every ~180s + buffer
+        [
+            "mv_water_lines_z10_11",
+            "mv_water_lines_z12_13",
+            "mv_water_lines_z14_15",
+            "mv_water_lines_z16_20",
+            "mv_water_areas_z6_7",
+            "mv_water_areas_z8_9",
+            "mv_water_areas_z10_11",
+            "mv_water_areas_z12_13",
+            "mv_water_areas_z14_15",
+            "mv_water_areas_z16_20",
+        ],
+    ),
+    "transport": (
+        600,
+        [
+            "mv_transport_lines_z8_9",
+            "mv_transport_lines_z10_11",
+            "mv_transport_lines_z12_13",
+            "mv_transport_lines_z14_15",
+            "mv_transport_lines_z16_20",
+        ],
+    ),
+}
+
+
+def check_mv_freshness():
+    """Check that key materialized views exist and are being refreshed.
+
+    For every view listed in MV_GROUPS: a missing view is "critical"; an
+    empty view, or one whose last analyze timestamp is older than the
+    group's max_stale_seconds, is "warning".
+
+    Returns:
+        dict with ``name``, ``status`` ("ok", "warning" or "critical"),
+        ``message``, ``details["groups"]`` (per-group and per-view results)
+        and ``checked_at``. A connection or query failure yields "critical".
+    """
+    result = {
+        "name": "mv_freshness",
+        "status": "ok",
+        "message": "",
+        "details": {"groups": {}},
+        "checked_at": datetime.now(timezone.utc).isoformat(),
+    }
+
+    try:
+        conn = psycopg2.connect(
+            host=Config.POSTGRES_HOST,
+            port=Config.POSTGRES_PORT,
+            dbname=Config.POSTGRES_DB,
+            user=Config.POSTGRES_USER,
+            password=Config.POSTGRES_PASSWORD,
+        )
+    except psycopg2.Error as e:
+        result["status"] = "critical"
+        result["message"] = f"Cannot connect to tiler DB: {e}"
+        return result
+
+    # Cursor and connection are always released; a failing query is reported
+    # as a critical result instead of escaping as an exception.
+    try:
+        cur = conn.cursor()
+        try:
+            # Get list of existing materialized views
+            cur.execute("SELECT matviewname FROM pg_matviews WHERE schemaname = 'public'")
+            existing_mvs = {row[0] for row in cur.fetchall()}
+
+            # Row counts and last analyze times for MVs via pg_stat_user_tables.
+            # NOTE(review): the analyze timestamp is only a proxy for "last
+            # refreshed"; confirm that these views get analyzed after every
+            # REFRESH MATERIALIZED VIEW in this deployment.
+            cur.execute("""
+                SELECT relname, n_live_tup, last_autoanalyze, last_analyze
+                FROM pg_stat_user_tables
+                WHERE schemaname = 'public'
+                  AND relname LIKE 'mv_%%'
+            """)
+            mv_stats = {}
+            for name, n_rows, last_autoanalyze, last_analyze in cur.fetchall():
+                # Use whichever is more recent
+                last_refreshed = max(
+                    filter(None, [last_autoanalyze, last_analyze]),
+                    default=None,
+                )
+                mv_stats[name] = {
+                    "n_rows": n_rows,
+                    "last_refreshed": last_refreshed,
+                }
+        finally:
+            cur.close()
+    except psycopg2.Error as e:
+        result["status"] = "critical"
+        result["message"] = f"Failed to query materialized view stats: {e}"
+        return result
+    finally:
+        conn.close()
+
+    missing_views = []
+    stale_views = []
+    empty_views = []
+    now = datetime.now(timezone.utc)
+
+    for group_name, (max_stale, views) in MV_GROUPS.items():
+        group_result = {"views": [], "status": "ok"}
+
+        for view_name in views:
+            view_info = {"name": view_name, "status": "ok"}
+
+            if view_name not in existing_mvs:
+                view_info["status"] = "critical"
+                view_info["message"] = "View does not exist"
+                missing_views.append(view_name)
+            elif view_name in mv_stats:
+                stats = mv_stats[view_name]
+                view_info["n_rows"] = stats["n_rows"]
+
+                if stats["n_rows"] == 0:
+                    view_info["status"] = "warning"
+                    view_info["message"] = "View is empty (0 rows)"
+                    empty_views.append(view_name)
+
+                if stats["last_refreshed"]:
+                    last_ref = stats["last_refreshed"]
+                    # Treat a timestamp without tzinfo as UTC so it can be
+                    # subtracted from the timezone-aware "now".
+                    if last_ref.tzinfo is None:
+                        last_ref = last_ref.replace(tzinfo=timezone.utc)
+                    age_seconds = (now - last_ref).total_seconds()
+                    view_info["last_refreshed"] = last_ref.isoformat()
+                    view_info["age_seconds"] = round(age_seconds)
+
+                    if age_seconds > max_stale:
+                        view_info["status"] = "warning"
+                        view_info["message"] = (
+                            f"Stale: last refreshed {round(age_seconds / 60, 1)} min ago "
+                            f"(threshold: {max_stale // 60} min)"
+                        )
+                        stale_views.append(view_name)
+                else:
+                    view_info["last_refreshed"] = None
+                    # Do not overwrite the "View is empty" warning text.
+                    view_info.setdefault("message", "No analyze timestamp available")
+            else:
+                view_info["message"] = "No stats available"
+
+            group_result["views"].append(view_info)
+
+        # A group is as bad as its worst view.
+        if any(v["status"] == "critical" for v in group_result["views"]):
+            group_result["status"] = "critical"
+        elif any(v["status"] == "warning" for v in group_result["views"]):
+            group_result["status"] = "warning"
+
+        result["details"]["groups"][group_name] = group_result
+
+    # Overall status: missing > stale > empty.
+    if missing_views:
+        result["status"] = "critical"
+        result["message"] = f"Missing views: {', '.join(missing_views[:5])}"
+    elif stale_views:
+        result["status"] = "warning"
+        result["message"] = f"Stale views: {', '.join(stale_views[:5])}"
+    elif empty_views:
+        result["status"] = "warning"
+        result["message"] = f"Empty views: {', '.join(empty_views[:5])}"
+    else:
+        total = sum(len(v) for _, v in MV_GROUPS.values())
+        result["message"] = f"All {total} monitored materialized views are healthy"
+
+    return result
diff --git a/images/tiler-monitor/pipeline-monitor/checks/replication_lag.py b/images/tiler-monitor/pipeline-monitor/checks/replication_lag.py
new file mode 100644
index 00000000..c15cb2ce
--- /dev/null
+++ b/images/tiler-monitor/pipeline-monitor/checks/replication_lag.py
@@ -0,0 +1,89 @@
+"""Check 1: Minute replication lag monitor.
+
+Compares the latest replication sequence number available on S3
+against the last sequence number processed by imposm (from the tiler DB
+or the replication state endpoint).
+
+Current implementation: only the age of the remote state timestamp is
+measured against the current time; imposm's own state is not read yet.
+"""
+
+import time
+from datetime import datetime, timezone
+
+import requests
+
+from config import Config
+
+
+def _parse_state(text):
+    """Parse an imposm/osm replication state.txt and return sequence + timestamp.
+
+    Args:
+        text: Contents of a state.txt file (``key=value`` lines).
+
+    Returns:
+        ``(seq, ts)``: ``seq`` is the integer ``sequenceNumber`` (0 when it
+        is missing or not a number); ``ts`` is a timezone-aware datetime, or
+        None when ``timestamp`` is missing or unparseable.
+    """
+    data = {}
+    for line in text.strip().splitlines():
+        if "=" in line:
+            key, _, value = line.partition("=")
+            data[key.strip()] = value.strip()
+
+    # A malformed sequence number must not crash the whole check.
+    try:
+        seq = int(data.get("sequenceNumber", 0))
+    except ValueError:
+        seq = 0
+
+    ts_raw = data.get("timestamp", "")
+    # Format: 2026-03-13T12\:05\:02Z (escaped colons in java properties)
+    ts_raw = ts_raw.replace("\\:", ":")
+    try:
+        ts = datetime.fromisoformat(ts_raw.replace("Z", "+00:00"))
+    except ValueError:
+        ts = None
+    # A timestamp without an offset parses as naive; subtracting it from an
+    # aware "now" would raise TypeError, so assume UTC.
+    if ts is not None and ts.tzinfo is None:
+        ts = ts.replace(tzinfo=timezone.utc)
+    return seq, ts
+
+
+def check_replication_lag():
+    """Return a dict with the replication lag check result.
+
+    Fetches Config.REPLICATION_STATE_URL and measures how old its timestamp
+    is compared to the current time.
+
+    Returns:
+        dict with ``name``, ``status`` ("ok", "warning" or "critical"),
+        ``message``, ``details`` (remote sequence/timestamp and lag) and
+        ``checked_at``.
+    """
+    result = {
+        "name": "replication_lag",
+        "status": "ok",
+        "message": "",
+        "details": {},
+        "checked_at": datetime.now(timezone.utc).isoformat(),
+    }
+
+    try:
+        # Get latest available replication state from S3
+        resp = requests.get(Config.REPLICATION_STATE_URL, timeout=15)
+        resp.raise_for_status()
+        remote_seq, remote_ts = _parse_state(resp.text)
+
+        result["details"]["remote_sequence"] = remote_seq
+        result["details"]["remote_timestamp"] = remote_ts.isoformat() if remote_ts else None
+
+        # Get imposm's last processed state
+        # The imposm diff dir stores last.state.txt - we query it via the same
+        # base URL pattern but from the local imposm state endpoint.
+        # In Docker, we can check the DB for the latest sequence via the
+        # osm_replication_status table if available, or fall back to comparing
+        # timestamps of recent data.
+        #
+        # For now: compare remote timestamp against current time.
+        # If remote_ts is stale, replication source itself is behind.
+        # A more precise check reads imposm's last.state.txt from the shared volume.
+
+        if remote_ts:
+            lag_seconds = (datetime.now(timezone.utc) - remote_ts).total_seconds()
+            lag_minutes = round(lag_seconds / 60, 1)
+            result["details"]["lag_seconds"] = round(lag_seconds)
+            result["details"]["lag_minutes"] = lag_minutes
+
+            if lag_seconds > Config.REPLICATION_LAG_THRESHOLD:
+                result["status"] = "critical"
+                result["message"] = (
+                    f"Replication lag is {lag_minutes} minutes "
+                    f"(threshold: {Config.REPLICATION_LAG_THRESHOLD // 60} min). "
+                    f"Last replication timestamp: {remote_ts.isoformat()}"
+                )
+            else:
+                result["message"] = (
+                    f"Replication is up to date. Lag: {lag_minutes} min, "
+                    f"sequence: {remote_seq}"
+                )
+        else:
+            result["status"] = "warning"
+            result["message"] = "Could not parse replication timestamp"
+
+    except requests.RequestException as e:
+        result["status"] = "critical"
+        result["message"] = f"Failed to fetch replication state: {e}"
+
+    return result
diff --git a/images/tiler-monitor/pipeline-monitor/checks/tile_cache.py b/images/tiler-monitor/pipeline-monitor/checks/tile_cache.py
new file mode 100644
index 00000000..9d21eb45
--- /dev/null
+++ b/images/tiler-monitor/pipeline-monitor/checks/tile_cache.py
@@ -0,0 +1,159 @@
+"""Pipeline check: verify tile cache in S3 is up-to-date.
+
+For a sampled element, check if the cached tile in S3 was modified
+after the changeset closed_at. If the tile is stale, the cache purge
+(SQS → tiler-cache) may have failed.
+""" + +import mercantile +import psycopg2.extensions +from datetime import datetime, timezone + +from config import Config + + +def _get_element_centroid(conn, elem): + """Get the centroid (lon, lat) of an element from the tiler DB.""" + osm_id = elem["osm_id"] + search_id = -osm_id if elem["type"] == "relation" else osm_id + + # Search in the tables where it was found + found_tables = elem.get("found_in_tables", []) + if not found_tables: + return None + + cur = conn.cursor() + for table in found_tables: + try: + quoted = psycopg2.extensions.quote_ident(table, cur) + cur.execute( + f"SELECT ST_X(ST_Centroid(ST_Transform(geometry, 4326))), " + f"ST_Y(ST_Centroid(ST_Transform(geometry, 4326))) " + f"FROM {quoted} WHERE osm_id = %s LIMIT 1", + (search_id,), + ) + row = cur.fetchone() + if row and row[0] is not None: + cur.close() + return {"lon": row[0], "lat": row[1]} + except Exception: + conn.rollback() + + cur.close() + return None + + +def _get_tile_for_point(lon, lat, zoom): + """Convert lon/lat to tile z/x/y.""" + tile = mercantile.tile(lon, lat, zoom) + return {"z": tile.z, "x": tile.x, "y": tile.y} + + +def _check_tile_in_s3(tile, changeset_closed_at): + """Check if a cached tile in S3 is stale (older than changeset). + + Returns dict with status and details for each S3 path. 
+ """ + if not Config.S3_BUCKET_CACHE_TILER: + return { + "status": "skipped", + "message": "S3_BUCKET_CACHE_TILER not configured", + } + + s3 = Config.get_s3_client() + bucket = Config.S3_BUCKET_CACHE_TILER + z, x, y = tile["z"], tile["x"], tile["y"] + + results = [] + stale_paths = [] + + for path_prefix in Config.S3_BUCKET_PATH_FILES: + key = f"{path_prefix}/{z}/{x}/{y}.pbf" + try: + resp = s3.head_object(Bucket=bucket, Key=key) + last_modified = resp["LastModified"] + + # Parse changeset closed_at + closed_dt = datetime.fromisoformat( + changeset_closed_at.replace("Z", "+00:00") + ) + + is_stale = last_modified < closed_dt + result = { + "path": key, + "last_modified": last_modified.isoformat(), + "is_stale": is_stale, + } + results.append(result) + if is_stale: + stale_paths.append(key) + + except s3.exceptions.ClientError as e: + if e.response["Error"]["Code"] == "404": + # Tile not in cache — not stale, tegola will generate on demand + results.append({ + "path": key, + "last_modified": None, + "is_stale": False, + "note": "not cached (tegola generates on demand)", + }) + else: + results.append({ + "path": key, + "error": str(e), + }) + + if stale_paths: + return { + "status": "stale", + "message": f"Tile cache is stale for: {', '.join(stale_paths)}", + "tile": tile, + "details": results, + } + else: + return { + "status": "ok", + "message": "Tile cache is up-to-date or not cached", + "tile": tile, + "details": results, + } + + +def check_tile_cache_for_element(conn, elem_check, changeset_closed_at): + """Full tile cache verification for a single element. 
+ + Args: + conn: DB connection + elem_check: result dict from _check_element_in_db (with found_in_tables) + changeset_closed_at: ISO timestamp string + + Returns: + dict with tile cache check results + """ + osm_id = elem_check["osm_id"] + elem_type = elem_check["type"] + zoom = Config.TILE_CHECK_ZOOM + + # Step 1: get geometry from DB + centroid = _get_element_centroid(conn, elem_check) + if not centroid: + return { + "osm_id": osm_id, + "type": elem_type, + "status": "skipped", + "message": "Could not get geometry from DB", + } + + # Step 2: calculate tile + tile = _get_tile_for_point(centroid["lon"], centroid["lat"], zoom) + + # Step 3: check S3 cache + cache_result = _check_tile_in_s3(tile, changeset_closed_at) + + return { + "osm_id": osm_id, + "type": elem_type, + "centroid": centroid, + "tile": tile, + "cache": cache_result, + } diff --git a/images/tiler-monitor/pipeline-monitor/config.py b/images/tiler-monitor/pipeline-monitor/config.py new file mode 100644 index 00000000..5b91f1d1 --- /dev/null +++ b/images/tiler-monitor/pipeline-monitor/config.py @@ -0,0 +1,103 @@ +import os +import re + + +def _parse_duration(value, default): + """Parse human-readable duration (e.g. 
'1h', '30m', '1.5h', '2h30m', '3600') to seconds.""" + raw = os.getenv(value, "") + if not raw: + return default + # If it's just a number, treat as seconds + try: + return int(float(raw)) + except ValueError: + pass + total = 0 + for amount, unit in re.findall(r"(\d+\.?\d*)\s*(h|m|s)", raw.lower()): + amount = float(amount) + if unit == "h": + total += amount * 3600 + elif unit == "m": + total += amount * 60 + elif unit == "s": + total += amount + return int(total) if total > 0 else default + + +class Config: + # PostgreSQL (tiler DB) + POSTGRES_HOST = os.getenv("POSTGRES_HOST", "localhost") + POSTGRES_PORT = int(os.getenv("POSTGRES_PORT", 5432)) + POSTGRES_DB = os.getenv("POSTGRES_DB", "tiler") + POSTGRES_USER = os.getenv("POSTGRES_USER", "postgres") + POSTGRES_PASSWORD = os.getenv("POSTGRES_PASSWORD", "") + + # Replication + REPLICATION_STATE_URL = os.getenv( + "REPLICATION_STATE_URL", + "https://s3.amazonaws.com/planet.openhistoricalmap.org/replication/minute/state.txt", + ) + OHM_API_BASE = os.getenv("OHM_API_BASE", "https://www.openhistoricalmap.org/api/0.6") + + # How often to run the pipeline check (e.g. "1h", "30m", "3600") + CHECK_INTERVAL = _parse_duration("TILER_MONITORING_CHECK_INTERVAL", 3600) + + # OHM changeset age window (e.g. "1h", "2h30m", "3600") + CHANGESET_MIN_AGE = _parse_duration("TILER_MONITORING_CHANGESET_MIN_AGE", 10800) + CHANGESET_MAX_AGE = _parse_duration("TILER_MONITORING_CHANGESET_MAX_AGE", 14400) + + # Max number of changesets to check per run + CHANGESET_LIMIT = int(os.getenv("CHANGESET_LIMIT", 30)) + + # Retry: how many times to recheck a missing element before alerting + MAX_RETRIES = int(os.getenv("TILER_MONITORING_MAX_RETRIES", 3)) + + # Missing threshold: minimum percentage of missing elements in a changeset + # to consider it a real failure. Below this threshold, elements are marked + # as "warning" instead of "failed" and do NOT trigger RSS/Slack alerts. + # e.g. 
10 = 10% — if only 1/44 elements is missing (2.3%), it's a warning. + MISSING_THRESHOLD_PCT = int(os.getenv("TILER_MONITORING_MISSING_THRESHOLD_PCT", 10)) + + # Verbose logging + VERBOSE_LOGGING = os.getenv("VERBOSE_LOGGING", "false").lower() == "true" + + # Alerting (optional) + SLACK_WEBHOOK_URL = os.getenv("TILER_MONITORING_SLACK_WEBHOOK_URL", "") + + # Server + MONITOR_PORT = int(os.getenv("MONITOR_PORT", 8001)) + MONITOR_BASE_URL = os.getenv("TILER_MONITORING_BASE_URL", "") + + # S3 tile cache verification + S3_BUCKET_CACHE_TILER = os.getenv("S3_BUCKET_CACHE_TILER", "") + S3_BUCKET_PATH_FILES = os.getenv("S3_BUCKET_PATH_FILES", "mnt/data/ohm,mnt/data/ohm_admin,mnt/data/ohm_other_boundaries").split(",") + TILER_CACHE_AWS_ACCESS_KEY_ID = os.getenv("TILER_CACHE_AWS_ACCESS_KEY_ID", "") + TILER_CACHE_AWS_SECRET_ACCESS_KEY = os.getenv("TILER_CACHE_AWS_SECRET_ACCESS_KEY", "") + TILER_CACHE_AWS_ENDPOINT = os.getenv("TILER_CACHE_AWS_ENDPOINT", "https://s3.amazonaws.com") + TILER_CACHE_REGION = os.getenv("TILER_CACHE_REGION", "us-east-1") + TILER_CACHE_CLOUD_INFRASTRUCTURE = os.getenv("TILER_CACHE_CLOUD_INFRASTRUCTURE", "aws") + # Zoom level to verify tile cache (use high zoom for precise check) + TILE_CHECK_ZOOM = int(os.getenv("TILE_CHECK_ZOOM", 16)) + # Percentage of elements to do full pipeline check (tables + views + S3) + # e.g. 
25 = 25% of elements, minimum 1 + FULL_CHECK_SAMPLE_PCT = int(os.getenv("TILER_MONITORING_FULL_CHECK_SAMPLE_PCT", 25)) + + @staticmethod + def get_s3_client(): + import boto3 + if Config.TILER_CACHE_CLOUD_INFRASTRUCTURE == "hetzner": + return boto3.client( + "s3", + aws_access_key_id=Config.TILER_CACHE_AWS_ACCESS_KEY_ID, + aws_secret_access_key=Config.TILER_CACHE_AWS_SECRET_ACCESS_KEY, + endpoint_url=Config.TILER_CACHE_AWS_ENDPOINT, + region_name=Config.TILER_CACHE_REGION, + ) + return boto3.client("s3") + + @staticmethod + def get_db_dsn(): + return ( + f"postgresql://{Config.POSTGRES_USER}:{Config.POSTGRES_PASSWORD}" + f"@{Config.POSTGRES_HOST}:{Config.POSTGRES_PORT}/{Config.POSTGRES_DB}" + ) diff --git a/images/tiler-monitor/pipeline-monitor/monitor.py b/images/tiler-monitor/pipeline-monitor/monitor.py new file mode 100644 index 00000000..18acb82b --- /dev/null +++ b/images/tiler-monitor/pipeline-monitor/monitor.py @@ -0,0 +1,397 @@ +"""Vtile pipeline monitor. + +Runs periodic changeset-centric checks and exposes results via a FastAPI HTTP +server. Optionally sends Slack alerts when checks fail. 
+""" + +import logging +import os +import threading +import time +from datetime import datetime, timezone + +import requests +import uvicorn +from fastapi import FastAPI +from fastapi.responses import HTMLResponse, JSONResponse, Response + +from checks.imposm_import import check_pipeline, check_single_changeset, recheck_retries +from config import Config +import retry_store + +logging.basicConfig( + level=logging.INFO, + format="%(asctime)s [%(levelname)s] %(message)s", +) +logger = logging.getLogger(__name__) + +# Store latest check result +_latest_result = None +_lock = threading.Lock() + +app = FastAPI(title="OHM Vtile Pipeline Monitor") + + +# --------------------------------------------------------------------------- +# Slack alerting +# --------------------------------------------------------------------------- + +def _send_slack_alert(check_result): + """Send a Slack notification when a check is not ok.""" + if not Config.SLACK_WEBHOOK_URL: + return + status_emoji = {"ok": ":white_check_mark:", "warning": ":warning:", "critical": ":rotating_light:"} + emoji = status_emoji.get(check_result["status"], ":question:") + ohm = "https://www.openhistoricalmap.org" + + lines = [f"{emoji} *OHM Tiler Pipeline Monitor* — {check_result['status'].upper()}"] + lines.append(check_result["message"]) + + # Add failed element details with links + newly_failed = check_result.get("details", {}).get("newly_failed", []) + if newly_failed: + lines.append("") + lines.append("*Elements missing after all retries:*") + for f in newly_failed[:10]: + cs_url = f"{ohm}/changeset/{f['changeset_id']}" + elem_url = f"{ohm}/{f['type']}/{f['osm_id']}" + lines.append(f" • <{elem_url}|{f['type']}/{f['osm_id']}> in <{cs_url}|changeset {f['changeset_id']}>") + + # Add dashboard link + if Config.MONITOR_BASE_URL: + lines.append("") + lines.append(f":mag: <{Config.MONITOR_BASE_URL}|Open Dashboard> · " + f"<{Config.MONITOR_BASE_URL}/retries|View Retries>") + + # Add changeset-level issues + changesets 
= check_result.get("details", {}).get("changesets", []) + cs_issues = [cs for cs in changesets + if cs.get("tiler_db", {}).get("status") not in ("ok", "retry_pending", None)] + if cs_issues and not newly_failed: + lines.append("") + for cs in cs_issues[:5]: + cs_url = f"{ohm}/changeset/{cs['changeset_id']}" + msg = cs.get("tiler_db", {}).get("message", "") + lines.append(f" • <{cs_url}|Changeset {cs['changeset_id']}>: {msg}") + + text = "\n".join(lines) + try: + requests.post( + Config.SLACK_WEBHOOK_URL, + json={"text": text}, + timeout=10, + ) + except requests.RequestException as e: + logger.error(f"Failed to send Slack alert: {e}") + + +# --------------------------------------------------------------------------- +# Background scheduler +# --------------------------------------------------------------------------- + +def _run_check(): + """Run the pipeline check and update stored result.""" + try: + logger.info("=============> Running pipeline check") + result = check_pipeline() + logger.info(f"Pipeline check: {result['status']} — {result['message']}") + + with _lock: + prev = _latest_result + globals()["_latest_result"] = result + + # Alert on state changes or new failures + newly_failed = result.get("details", {}).get("newly_failed", []) + if newly_failed: + # New elements just exhausted retries — always alert + _send_slack_alert(result) + # Log each failed element as a feed event + ohm = "https://www.openhistoricalmap.org" + for f in newly_failed: + retry_store.add_feed_event( + event_type="failed", + title=f"FAILED: {f['type']}/{f['osm_id']} not found in tiler DB after all retries", + description=( + f"Element {f['type']}/{f['osm_id']} from changeset {f['changeset_id']} " + f"was not found in the tiler database after all retries." 
+ ), + link=f"{ohm}/{f['type']}/{f['osm_id']}", + element_type=f["type"], + osm_id=f["osm_id"], + changeset_id=f["changeset_id"], + ) + elif result["status"] == "warning": + if prev is None or prev["status"] == "ok": + _send_slack_alert(result) + elif result["status"] == "ok" and prev and prev["status"] in ("critical", "warning"): + # Recovered — send ok notification + _send_slack_alert(result) + retry_store.add_feed_event( + event_type="recovered", + title="RECOVERED: All pipeline elements verified OK", + description=result["message"], + ) + + except Exception as e: + logger.exception(f"Pipeline check raised an exception: {e}") + with _lock: + globals()["_latest_result"] = { + "name": "pipeline", + "status": "critical", + "message": f"Check failed with exception: {e}", + "details": {}, + "checked_at": datetime.now(timezone.utc).isoformat(), + } + + +def _scheduler(): + """Background loop that runs checks at the configured interval.""" + logger.info(f"Pipeline monitor starting. Check interval: {Config.CHECK_INTERVAL}s") + time.sleep(10) + + while True: + _run_check() + time.sleep(Config.CHECK_INTERVAL) + + +# --------------------------------------------------------------------------- +# HTTP endpoints +# --------------------------------------------------------------------------- + +_STATIC_DIR = os.path.join(os.path.dirname(__file__), "static") + + +@app.get("/", response_class=HTMLResponse) +def dashboard(): + """Serve the monitoring dashboard.""" + html_path = os.path.join(_STATIC_DIR, "dashboard.html") + with open(html_path) as f: + return HTMLResponse(content=f.read()) + + +@app.get("/health") +def health(): + """Overall health: returns 200 if ok, 503 otherwise.""" + with _lock: + result = _latest_result + + if result is None: + return JSONResponse( + content={"status": "starting", "message": "No checks have run yet"}, + status_code=200, + ) + + status_code = 200 if result["status"] == "ok" else 503 + return JSONResponse( + content={ + "status": 
result["status"], + "message": result["message"], + "checked_at": result["checked_at"], + }, + status_code=status_code, + ) + + +@app.get("/checks") +def all_checks(): + """Return full details for the latest pipeline check.""" + with _lock: + result = _latest_result + if result is None: + return JSONResponse(content={"status": "starting"}) + return JSONResponse(content=result) + + +@app.get("/changeset/{changeset_id}") +def evaluate_changeset(changeset_id: int): + """Evaluate a specific changeset through the full pipeline (on-demand).""" + result = check_single_changeset(changeset_id) + status_code = 200 if result["status"] == "ok" else 503 + return JSONResponse(content=result, status_code=status_code) + + +@app.post("/retries/recheck") +def retries_recheck(): + """Manually trigger a recheck of all pending and failed retries.""" + result = recheck_retries() + + # Update cached status if all retries are now resolved + remaining = retry_store.summary() + if remaining.get("failed", 0) == 0 and remaining.get("pending", 0) == 0: + with _lock: + prev = _latest_result + if prev and prev.get("status") == "critical": + updated = dict(prev) + updated["status"] = "ok" + updated["message"] = "All retries resolved" + updated["details"] = dict(prev.get("details", {})) + updated["details"]["retries"] = remaining + updated["details"]["total_failed"] = 0 + updated["details"]["newly_failed"] = [] + globals()["_latest_result"] = updated + + return JSONResponse(content=result) + + +@app.post("/retries/recheck/{element_type}/{osm_id}") +def retries_recheck_single(element_type: str, osm_id: int): + """Manually recheck a single element in the tiler DB.""" + try: + from checks.imposm_import import recheck_single_element + result = recheck_single_element(element_type, osm_id) + + # Update cached status if no more failed retries + remaining = retry_store.summary() + if remaining.get("failed", 0) == 0 and remaining.get("pending", 0) == 0: + with _lock: + prev = _latest_result + if prev and 
prev.get("status") == "critical": + updated = dict(prev) + updated["status"] = "ok" + updated["message"] = "All retries resolved" + updated["details"] = dict(prev.get("details", {})) + updated["details"]["retries"] = remaining + updated["details"]["total_failed"] = 0 + updated["details"]["newly_failed"] = [] + globals()["_latest_result"] = updated + + return JSONResponse(content=result) + except Exception as e: + logger.exception(f"Recheck failed for {element_type}/{osm_id}") + return JSONResponse( + content={"status": "error", "message": f"Recheck failed: {e}"}, + status_code=500, + ) + + +@app.get("/retries") +def retries(): + """Return current retry state with full details for debugging.""" + all_entries = retry_store.get_all_details() + pending = [e for e in all_entries if e["status"] == "pending"] + failed = [e for e in all_entries if e["status"] == "failed"] + warnings = [e for e in all_entries if e["status"] == "warning"] + return JSONResponse(content={ + "summary": retry_store.summary(), + "total": len(all_entries), + "pending": pending, + "failed": failed, + "warnings": warnings, + }) + + +@app.get("/history") +def history(page: int = 1, per_page: int = 20): + """Paginated history of all changesets checked. 
+ + Example: /history?page=1&per_page=10 + """ + data = retry_store.get_changeset_history(page=page, per_page=per_page) + return JSONResponse(content=data) + + +@app.get("/history/{history_id}/elements") +def history_elements(history_id: int): + """Return all elements checked for a specific history entry.""" + elements = retry_store.get_changeset_elements(history_id) + return JSONResponse(content={"history_id": history_id, "elements": elements}) + + +# --------------------------------------------------------------------------- +# RSS / Atom feed +# --------------------------------------------------------------------------- + +def _xml_escape(text): + """Escape XML special characters.""" + return (str(text) + .replace("&", "&") + .replace("<", "<") + .replace(">", ">") + .replace('"', """) + .replace("'", "'")) + + +def _to_rfc822(iso_str): + """Convert ISO timestamp to RFC 822 format for RSS.""" + try: + dt = datetime.fromisoformat(iso_str.replace("Z", "+00:00")) + return dt.strftime("%a, %d %b %Y %H:%M:%S +0000") + except Exception: + return datetime.now(timezone.utc).strftime("%a, %d %b %Y %H:%M:%S +0000") + + +def _build_rss_feed(): + """Build an RSS 2.0 feed from persistent feed events. + + Each event is a unique item with a stable guid (based on DB id), + so Slack's /feed command detects new items as they appear. 
+ """ + base_url = Config.MONITOR_BASE_URL or "https://tiler-monitoring.openhistoricalmap.org" + now = datetime.now(timezone.utc) + rfc822_now = now.strftime("%a, %d %b %Y %H:%M:%S +0000") + + events = retry_store.get_feed_events(limit=50) + + items_xml = [] + for ev in events: + title = ev["title"] + link = ev["link"] or base_url + guid = f"ohm-tiler-feed-{ev['id']}" + pub_date = _to_rfc822(ev["created_at"]) + desc = ev["description"] + + # Add element/changeset links in description if available + if ev["osm_id"] and ev["element_type"]: + ohm = "https://www.openhistoricalmap.org" + elem_link = f"{ohm}/{ev['element_type']}/{ev['osm_id']}" + cs_link = f"{ohm}/changeset/{ev['changeset_id']}" if ev["changeset_id"] else "" + desc_parts = [desc] + desc_parts.append(f"Element: {elem_link}") + if cs_link: + desc_parts.append(f"Changeset: {cs_link}") + desc_parts.append(f"Dashboard: {base_url}") + desc = " | ".join(desc_parts) + + items_xml.append(f""" + {_xml_escape(title)} + {_xml_escape(link)} + {_xml_escape(guid)} + {pub_date} + {_xml_escape(desc)} + """) + + items_str = "\n".join(items_xml) if items_xml else "" + + feed = f""" + + + OHM Tiler Pipeline Monitor - Alerts + {_xml_escape(base_url)} + Alerts from the OHM tiler pipeline monitor: failed elements not found in the tiler DB after retries. 
+ en + {rfc822_now} + +{items_str} + +""" + return feed + + +@app.get("/feed.rss") +def rss_feed(): + """RSS 2.0 feed of pipeline alerts for Slack integration.""" + xml = _build_rss_feed() + return Response(content=xml, media_type="application/rss+xml") + + +# --------------------------------------------------------------------------- +# Entrypoint +# --------------------------------------------------------------------------- + +if __name__ == "__main__": + # Start background scheduler + t = threading.Thread(target=_scheduler, daemon=True) + t.start() + + # Start HTTP server + uvicorn.run(app, host="0.0.0.0", port=Config.MONITOR_PORT) diff --git a/images/tiler-monitor/pipeline-monitor/requirements.txt b/images/tiler-monitor/pipeline-monitor/requirements.txt new file mode 100644 index 00000000..1360cc74 --- /dev/null +++ b/images/tiler-monitor/pipeline-monitor/requirements.txt @@ -0,0 +1,6 @@ +fastapi +uvicorn +requests +psycopg2-binary +mercantile +boto3 diff --git a/images/tiler-monitor/pipeline-monitor/retry_store.py b/images/tiler-monitor/pipeline-monitor/retry_store.py new file mode 100644 index 00000000..ccfccc58 --- /dev/null +++ b/images/tiler-monitor/pipeline-monitor/retry_store.py @@ -0,0 +1,521 @@ +"""SQLite-backed retry store for missing pipeline elements. + +Tracks elements that were not found in the tiler DB so they can be +rechecked on subsequent runs. After MAX_RETRIES consecutive failures +the element is marked as "failed" and an alert can be triggered. + +Uses a single shared connection with a threading lock to avoid +"database is locked" errors from concurrent access. 
+""" + +import logging +import sqlite3 +import os +import threading +from datetime import datetime, timezone + +logger = logging.getLogger(__name__) + +_DB_PATH = os.getenv("TILER_MONITORING_RETRY_DB", "/data/pipeline_retries.db") +_lock = threading.Lock() +_conn = None + + +def _get_conn(): + """Return the shared connection, creating it on first call.""" + global _conn + if _conn is None: + _conn = sqlite3.connect(_DB_PATH, check_same_thread=False) + _conn.row_factory = sqlite3.Row + _conn.execute("PRAGMA journal_mode=WAL") + _conn.execute("PRAGMA busy_timeout=5000") + _init_tables(_conn) + return _conn + + +def _init_tables(conn): + """Create all tables and indexes.""" + conn.execute(""" + CREATE TABLE IF NOT EXISTS pending_retries ( + changeset_id INTEGER NOT NULL, + element_type TEXT NOT NULL, + osm_id INTEGER NOT NULL, + version INTEGER NOT NULL DEFAULT 0, + action TEXT NOT NULL DEFAULT '', + retry_count INTEGER NOT NULL DEFAULT 0, + max_retries INTEGER NOT NULL, + first_seen TEXT NOT NULL, + last_checked TEXT NOT NULL, + status TEXT NOT NULL DEFAULT 'pending', + closed_at TEXT NOT NULL DEFAULT '', + PRIMARY KEY (changeset_id, element_type, osm_id) + ) + """) + # Migrate: add columns if missing (for existing DBs) + for col, typedef in [("version", "INTEGER NOT NULL DEFAULT 0"), + ("action", "TEXT NOT NULL DEFAULT ''"), + ("closed_at", "TEXT NOT NULL DEFAULT ''")]: + try: + conn.execute(f"ALTER TABLE pending_retries ADD COLUMN {col} {typedef}") + except sqlite3.OperationalError: + pass + + conn.execute(""" + CREATE TABLE IF NOT EXISTS changeset_history ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + changeset_id INTEGER NOT NULL, + created_at TEXT NOT NULL DEFAULT '', + closed_at TEXT NOT NULL DEFAULT '', + checked_at TEXT NOT NULL, + status TEXT NOT NULL, + total_elements INTEGER NOT NULL DEFAULT 0, + missing_count INTEGER NOT NULL DEFAULT 0, + ok_count INTEGER NOT NULL DEFAULT 0, + message TEXT NOT NULL DEFAULT '' + ) + """) + for hist_col, hist_typedef in 
[("closed_at", "TEXT NOT NULL DEFAULT ''"), + ("created_at", "TEXT NOT NULL DEFAULT ''")]: + try: + conn.execute(f"ALTER TABLE changeset_history ADD COLUMN {hist_col} {hist_typedef}") + except sqlite3.OperationalError: + pass + conn.execute(""" + CREATE INDEX IF NOT EXISTS idx_history_checked_at + ON changeset_history(checked_at DESC) + """) + + conn.execute(""" + CREATE TABLE IF NOT EXISTS element_history ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + history_id INTEGER NOT NULL, + changeset_id INTEGER NOT NULL, + element_type TEXT NOT NULL, + osm_id INTEGER NOT NULL, + version INTEGER NOT NULL DEFAULT 0, + action TEXT NOT NULL DEFAULT '', + status TEXT NOT NULL, + found_in_tables TEXT NOT NULL DEFAULT '', + found_in_views TEXT NOT NULL DEFAULT '', + checked_at TEXT NOT NULL, + FOREIGN KEY (history_id) REFERENCES changeset_history(id) + ) + """) + conn.execute(""" + CREATE INDEX IF NOT EXISTS idx_element_history_id + ON element_history(history_id) + """) + + # Feed events: persistent log of alerts (failed elements, recoveries) + # for the RSS feed. Items are never deleted, only added. 
+ conn.execute(""" + CREATE TABLE IF NOT EXISTS feed_events ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + event_type TEXT NOT NULL, + element_type TEXT NOT NULL DEFAULT '', + osm_id INTEGER NOT NULL DEFAULT 0, + version INTEGER NOT NULL DEFAULT 0, + changeset_id INTEGER NOT NULL DEFAULT 0, + action TEXT NOT NULL DEFAULT '', + title TEXT NOT NULL, + description TEXT NOT NULL DEFAULT '', + link TEXT NOT NULL DEFAULT '', + created_at TEXT NOT NULL + ) + """) + conn.execute(""" + CREATE INDEX IF NOT EXISTS idx_feed_events_created + ON feed_events(created_at DESC) + """) + + conn.commit() + + +# --------------------------------------------------------------------------- +# Pending retries +# --------------------------------------------------------------------------- + +def add_missing(changeset_id: int, element_type: str, osm_id: int, + max_retries: int, version: int = 0, action: str = "", + closed_at: str = ""): + """Register a missing element for future retry. If it already exists, do nothing.""" + now = datetime.now(timezone.utc).isoformat() + with _lock: + conn = _get_conn() + conn.execute(""" + INSERT OR IGNORE INTO pending_retries + (changeset_id, element_type, osm_id, version, action, + retry_count, max_retries, first_seen, last_checked, status, closed_at) + VALUES (?, ?, ?, ?, ?, 0, ?, ?, ?, 'pending', ?) + """, (changeset_id, element_type, osm_id, version, action, max_retries, now, now, closed_at or "")) + conn.commit() + + +def get_pending(): + """Return all elements with status='pending' that still need to be rechecked.""" + with _lock: + conn = _get_conn() + rows = conn.execute( + "SELECT * FROM pending_retries WHERE status = 'pending'" + ).fetchall() + return [dict(r) for r in rows] + + +def mark_resolved(changeset_id: int, element_type: str, osm_id: int): + """Element was found in a retry — remove it from pending.""" + with _lock: + conn = _get_conn() + conn.execute(""" + DELETE FROM pending_retries + WHERE changeset_id = ? AND element_type = ? 
AND osm_id = ? + """, (changeset_id, element_type, osm_id)) + conn.commit() + + +def increment_retry(changeset_id: int, element_type: str, osm_id: int, + final_status: str = "failed"): + """Bump retry_count. If it reaches max_retries, flip status to *final_status*. + + *final_status* is normally 'failed', but callers may pass 'warning' when + the missing percentage is below the alerting threshold. + + Returns the new status ('pending', 'warning', or 'failed'). + """ + now = datetime.now(timezone.utc).isoformat() + with _lock: + conn = _get_conn() + conn.execute(""" + UPDATE pending_retries + SET retry_count = retry_count + 1, last_checked = ? + WHERE changeset_id = ? AND element_type = ? AND osm_id = ? + """, (now, changeset_id, element_type, osm_id)) + + row = conn.execute(""" + SELECT retry_count, max_retries FROM pending_retries + WHERE changeset_id = ? AND element_type = ? AND osm_id = ? + """, (changeset_id, element_type, osm_id)).fetchone() + + if row and row["retry_count"] >= row["max_retries"]: + conn.execute(""" + UPDATE pending_retries SET status = ? + WHERE changeset_id = ? AND element_type = ? AND osm_id = ? 
+ """, (final_status, changeset_id, element_type, osm_id)) + conn.commit() + return final_status + + conn.commit() + return "pending" + + +def get_failed(): + """Return all elements that exhausted their retries with status='failed'.""" + with _lock: + conn = _get_conn() + rows = conn.execute( + "SELECT * FROM pending_retries WHERE status = 'failed'" + ).fetchall() + return [dict(r) for r in rows] + + +def get_warnings(): + """Return all elements that exhausted retries but are below the missing threshold.""" + with _lock: + conn = _get_conn() + rows = conn.execute( + "SELECT * FROM pending_retries WHERE status = 'warning'" + ).fetchall() + return [dict(r) for r in rows] + + +def clear_failed(): + """Remove all failed entries (call after alerting).""" + with _lock: + conn = _get_conn() + conn.execute("DELETE FROM pending_retries WHERE status = 'failed'") + conn.commit() + + +def get_all_details(ohm_base="https://www.openhistoricalmap.org"): + """Return all entries enriched with URLs and human-readable times.""" + with _lock: + conn = _get_conn() + rows = conn.execute( + "SELECT * FROM pending_retries ORDER BY status, first_seen DESC" + ).fetchall() + + # Get changeset object counts from history (latest check per changeset) + cs_ids = list(set(r["changeset_id"] for r in rows)) + cs_stats = {} + if cs_ids: + placeholders = ",".join("?" 
* len(cs_ids)) + stats_rows = conn.execute(f""" + SELECT changeset_id, total_elements, missing_count, ok_count + FROM changeset_history + WHERE id IN ( + SELECT MAX(id) FROM changeset_history + WHERE changeset_id IN ({placeholders}) + GROUP BY changeset_id + ) + """, cs_ids).fetchall() + for sr in stats_rows: + cs_stats[sr["changeset_id"]] = { + "total_elements": sr["total_elements"], + "missing_count": sr["missing_count"], + "ok_count": sr["ok_count"], + } + + now = datetime.now(timezone.utc) + results = [] + for r in rows: + entry = dict(r) + entry["changeset_url"] = f"{ohm_base}/changeset/{r['changeset_id']}" + entry["element_url"] = f"{ohm_base}/{r['element_type']}/{r['osm_id']}" + try: + first = datetime.fromisoformat(r["first_seen"]) + entry["age"] = _human_duration((now - first).total_seconds()) + except Exception: + entry["age"] = "" + try: + last = datetime.fromisoformat(r["last_checked"]) + entry["last_checked_ago"] = _human_duration((now - last).total_seconds()) + except Exception: + entry["last_checked_ago"] = "" + # Changeset closed_at (close time as formatted date) + closed_at_val = r["closed_at"] if "closed_at" in r.keys() else "" + if closed_at_val: + try: + closed = datetime.fromisoformat(closed_at_val.replace("Z", "+00:00")) + entry["closed_at_fmt"] = closed.strftime("%Y-%m-%d %H:%M UTC") + except Exception: + entry["closed_at_fmt"] = "" + else: + entry["closed_at_fmt"] = "" + # Changeset object counts + stats = cs_stats.get(r["changeset_id"], {}) + entry["cs_total_elements"] = stats.get("total_elements", 0) + entry["cs_ok_count"] = stats.get("ok_count", 0) + entry["cs_missing_count"] = stats.get("missing_count", 0) + entry["retries_remaining"] = max(0, r["max_retries"] - r["retry_count"]) + results.append(entry) + return results + + +# --------------------------------------------------------------------------- +# Changeset stats helpers +# --------------------------------------------------------------------------- + +def 
def get_changeset_stats(changeset_id: int):
    """Return the latest total_elements, missing_count, ok_count for a changeset.

    "Latest" means the ``changeset_history`` row with the highest ``id`` for
    the given changeset.

    Returns a dict with those three keys, or None if no history exists.
    """
    with _lock:
        latest = _get_conn().execute(
            """
            SELECT total_elements, missing_count, ok_count
            FROM changeset_history
            WHERE changeset_id = ?
            ORDER BY id DESC LIMIT 1
            """,
            (changeset_id,),
        ).fetchone()
    if latest is None:
        return None
    return {
        key: latest[key]
        for key in ("total_elements", "missing_count", "ok_count")
    }


# ---------------------------------------------------------------------------
# Changeset history
# ---------------------------------------------------------------------------

def _element_status(elem: dict) -> str:
    """Classify one checked element as skipped / ok / not_deleted / missing."""
    found = bool(elem.get("found_in_tables"))
    if elem.get("deleted", False):
        return "skipped"
    if elem.get("action") == "delete":
        # A delete is only "ok" once the element is gone from every table.
        return "not_deleted" if found else "ok"
    return "ok" if found else "missing"


def log_changeset_check(changeset_id: int, status: str,
                        total_elements: int, missing_count: int,
                        ok_count: int, message: str,
                        created_at: str = "", closed_at: str = "",
                        elements: list = None):
    """Record a changeset check and its elements in the history tables.

    One row is written to ``changeset_history`` and one row per entry of
    ``elements`` to ``element_history``; every row shares the same
    ``checked_at`` timestamp (current UTC time, ISO-8601).

    Args:
        changeset_id: Changeset that was checked.
        status: Overall result stored on the history row.
        total_elements, missing_count, ok_count: Counters stored as given.
        message: Free-form text stored on the history row.
        created_at, closed_at: Changeset timestamps; falsy values are stored
            as empty strings.
        elements: Optional list of dicts. Keys read: ``type``, ``osm_id``,
            ``version``, ``action``, ``deleted``, ``found_in_tables``,
            ``found_in_views``. Missing or None table/view lists are stored
            as empty strings.

    Raises:
        Any exception raised while building or inserting the rows. The
        transaction is rolled back first, so a failed call never leaves a
        half-written history entry to be committed by a later call.
    """
    now = datetime.now(timezone.utc).isoformat()
    with _lock:
        conn = _get_conn()
        try:
            cur = conn.execute(
                """
                INSERT INTO changeset_history
                    (changeset_id, created_at, closed_at, checked_at, status,
                     total_elements, missing_count, ok_count, message)
                VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
                """,
                (changeset_id, created_at or "", closed_at or "", now, status,
                 total_elements, missing_count, ok_count, message),
            )
            history_id = cur.lastrowid

            element_rows = [
                (history_id, changeset_id, elem.get("type", ""),
                 elem.get("osm_id", 0), elem.get("version", 0),
                 elem.get("action", ""), _element_status(elem),
                 # `or []` tolerates keys that are present but set to None.
                 ", ".join(elem.get("found_in_tables") or []),
                 ", ".join(elem.get("found_in_views") or []),
                 now)
                for elem in (elements or [])
            ]
            if element_rows:
                conn.executemany(
                    """
                    INSERT INTO element_history
                        (history_id, changeset_id, element_type, osm_id, version,
                         action, status, found_in_tables, found_in_views, checked_at)
                    VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
                    """,
                    element_rows,
                )
            conn.commit()
        except Exception:
            # Undo the partially written entry before re-raising.
            conn.rollback()
            raise


def get_changeset_history(page: int = 1, per_page: int = 20,
                          ohm_base: str = "https://www.openhistoricalmap.org"):
    """Return paginated changeset check history with details.

    Args:
        page: 1-based page number; values below 1 are treated as 1.
        per_page: Page size; values below 1 are treated as 1 (this avoids a
            division by zero and SQLite's "negative LIMIT means unlimited").
        ohm_base: Base URL used to build ``changeset_url``.

    Returns:
        A dict with ``page`` and ``per_page`` (after clamping), ``total``,
        ``total_pages`` (at least 1) and ``changesets``: the history rows,
        newest check first, each extended with ``changeset_url``,
        ``checked_ago``, ``closed_ago`` and ``created_fmt``. The three
        derived time fields are "" when the stored value is empty or
        cannot be parsed.
    """
    page = max(1, page)
    per_page = max(1, per_page)

    with _lock:
        conn = _get_conn()
        total = conn.execute("SELECT COUNT(*) FROM changeset_history").fetchone()[0]
        # `id DESC` breaks ties between equal timestamps so that a row cannot
        # appear on two pages or on none.
        rows = conn.execute(
            """
            SELECT * FROM changeset_history
            ORDER BY checked_at DESC, id DESC
            LIMIT ? OFFSET ?
            """,
            (per_page, (page - 1) * per_page),
        ).fetchall()
    total_pages = max(1, (total + per_page - 1) // per_page)

    now = datetime.now(timezone.utc)
    results = []
    for r in rows:
        entry = dict(r)
        entry["changeset_url"] = f"{ohm_base}/changeset/{r['changeset_id']}"
        entry["checked_ago"] = _ago_label(r["checked_at"], now)
        entry["closed_ago"] = _ago_label(r["closed_at"], now)
        # NOTE(review): the column-existence check presumably supports
        # databases created before `created_at` was added; confirm.
        created_at_val = r["created_at"] if "created_at" in r.keys() else ""
        entry["created_fmt"] = _utc_label(created_at_val)
        results.append(entry)

    return {
        "page": page,
        "per_page": per_page,
        "total": total,
        "total_pages": total_pages,
        "changesets": results,
    }


def get_changeset_elements(history_id: int,
                           ohm_base: str = "https://www.openhistoricalmap.org"):
    """Return all elements checked for a specific history entry.

    Rows are ordered by status (descending), then element type and OSM id.
    Each row becomes a dict extended with ``element_url``; the stored
    comma-separated ``found_in_tables`` / ``found_in_views`` strings are
    split back into lists (empty list when nothing was stored).
    """
    with _lock:
        rows = _get_conn().execute(
            """
            SELECT * FROM element_history
            WHERE history_id = ?
            ORDER BY status DESC, element_type, osm_id
            """,
            (history_id,),
        ).fetchall()

    results = []
    for r in rows:
        entry = dict(r)
        entry["element_url"] = f"{ohm_base}/{r['element_type']}/{r['osm_id']}"
        for column in ("found_in_tables", "found_in_views"):
            entry[column] = r[column].split(", ") if r[column] else []
        results.append(entry)
    return results


def is_changeset_passed(changeset_id: int) -> bool:
    """Return True if this changeset was already checked with status 'ok'."""
    with _lock:
        hit = _get_conn().execute(
            """
            SELECT 1 FROM changeset_history
            WHERE changeset_id = ? AND status = 'ok'
            LIMIT 1
            """,
            (changeset_id,),
        ).fetchone()
    return hit is not None


def summary():
    """Return a ``{status: row_count}`` dict for ``pending_retries`` (for logging)."""
    with _lock:
        rows = _get_conn().execute(
            "SELECT status, COUNT(*) as cnt FROM pending_retries GROUP BY status"
        ).fetchall()
    return {r["status"]: r["cnt"] for r in rows}


# ---------------------------------------------------------------------------
# Feed events
# ---------------------------------------------------------------------------

def add_feed_event(event_type: str, title: str, description: str = "",
                   link: str = "", element_type: str = "", osm_id: int = 0,
                   version: int = 0, changeset_id: int = 0, action: str = ""):
    """Add a persistent event to the RSS feed.

    The event is stored in ``feed_events`` with ``created_at`` set to the
    current UTC time (ISO-8601) and committed immediately.
    """
    now = datetime.now(timezone.utc).isoformat()
    with _lock:
        conn = _get_conn()
        conn.execute(
            """
            INSERT INTO feed_events
                (event_type, element_type, osm_id, version, changeset_id,
                 action, title, description, link, created_at)
            VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
            """,
            (event_type, element_type, osm_id, version, changeset_id,
             action, title, description, link, now),
        )
        conn.commit()


def get_feed_events(limit: int = 50):
    """Return up to ``limit`` most recent feed events as dicts, newest first."""
    with _lock:
        rows = _get_conn().execute(
            """
            SELECT * FROM feed_events
            ORDER BY created_at DESC
            LIMIT ?
            """,
            (limit,),
        ).fetchall()
    return [dict(r) for r in rows]


# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------

def _parse_iso_timestamp(value):
    """Parse an ISO-8601 string (a trailing 'Z' is accepted).

    Returns a datetime, or None when ``value`` is empty or unparseable.
    """
    if not value:
        return None
    try:
        # Rewrite the 'Z' suffix as an explicit offset before parsing.
        return datetime.fromisoformat(value.replace("Z", "+00:00"))
    except (AttributeError, TypeError, ValueError):
        return None


def _ago_label(value, now):
    """Return the age of timestamp ``value`` relative to ``now``, or ""."""
    parsed = _parse_iso_timestamp(value)
    if parsed is None:
        return ""
    try:
        return _human_duration((now - parsed).total_seconds())
    except TypeError:
        # A naive timestamp cannot be subtracted from a timezone-aware `now`.
        return ""


def _utc_label(value):
    """Format timestamp ``value`` as 'YYYY-MM-DD HH:MM UTC', or "".

    Timestamps carrying an offset are converted to UTC first so that the
    "UTC" suffix is true; naive timestamps are formatted unchanged.
    """
    parsed = _parse_iso_timestamp(value)
    if parsed is None:
        return ""
    if parsed.tzinfo is not None:
        parsed = parsed.astimezone(timezone.utc)
    return parsed.strftime("%Y-%m-%d %H:%M UTC")


def _human_duration(seconds):
    """Convert seconds to human-readable string like '2h 15m ago'.

    Fractions are truncated. Negative durations (a timestamp slightly in the
    future, e.g. from clock skew) are reported as '0s ago'.
    """
    seconds = max(0, int(seconds))
    if seconds < 60:
        return f"{seconds}s ago"
    minutes = seconds // 60
    if minutes < 60:
        return f"{minutes}m ago"
    hours, remaining_min = divmod(minutes, 60)
    if hours < 24:
        return f"{hours}h {remaining_min}m ago" if remaining_min else f"{hours}h ago"
    days, remaining_hours = divmod(hours, 24)
    return f"{days}d {remaining_hours}h ago" if remaining_hours else f"{days}d ago"
+ +

OHM Tiler Pipeline Monitor

+
+ +
+
Status
LOADING...
+
+ +
+ + + +
+ +
+ + + +
+ + +
+ + + + + + + +
ChangesetStatusElementsMissingReplicationAgeMessage
+ +
+ + +
+
+ +
+ + + + + + + +
ChangesetStatusTotalOKMissingCreatedClosedCheckedMessage
+ + +
+ + +
+
+ + +
+ + + + + + +
ElementChangesetActionStatusRetriesObjectsChangeset ClosedFirst SeenLast Checked
+ +
+ + + + diff --git a/images/tiler-monitor/pipeline-monitor/tables_config.json b/images/tiler-monitor/pipeline-monitor/tables_config.json new file mode 100644 index 00000000..ef0c407f --- /dev/null +++ b/images/tiler-monitor/pipeline-monitor/tables_config.json @@ -0,0 +1,103 @@ +{ + "tag_to_check": { + "amenity": { + "tables": ["osm_amenity_areas", "osm_amenity_points"], + "views": ["mv_amenity_areas_z14_15", "mv_amenity_areas_z16_20", "mv_amenity_points", "mv_amenity_points_centroids_z14_15", "mv_amenity_points_centroids_z16_20"] + }, + "aeroway": { + "tables": ["osm_transport_lines", "osm_transport_areas", "osm_transport_points"], + "views": ["mv_transport_lines_z5", "mv_transport_lines_z16_20", "mv_transport_areas_z10_12", "mv_transport_areas_z16_20", "mv_transport_points", "mv_transport_points_centroids_z16_20"] + }, + "barrier": { + "tables": ["osm_other_areas", "osm_other_lines", "osm_other_points", "osm_water_lines"], + "views": ["mv_other_areas_z8_9", "mv_other_areas_z16_20", "mv_other_lines_z14_15", "mv_other_lines_z16_20", "mv_other_points", "mv_other_points_centroids_z16_20", "mv_water_lines_z8_9", "mv_water_lines_z16_20"] + }, + "boundary": { + "tables": ["osm_admin_areas", "osm_admin_lines", "osm_admin_relation_members"], + "views": ["mv_admin_boundaries_areas_z0_2", "mv_admin_boundaries_areas_z16_20", "mv_admin_boundaries_lines_z0_2", "mv_admin_boundaries_lines_z16_20", "mv_admin_boundaries_centroids_z0_2", "mv_admin_boundaries_centroids_z16_20", "mv_admin_maritime_lines_z0_5_v2", "mv_admin_maritime_lines_z10_15", "mv_non_admin_boundaries_areas_z0_2", "mv_non_admin_boundaries_areas_z16_20", "mv_non_admin_boundaries_centroids_z0_2", "mv_non_admin_boundaries_centroids_z16_20", "mv_relation_members_boundaries"] + }, + "building": { + "tables": ["osm_buildings", "osm_buildings_points"], + "views": ["mv_buildings_areas_z14_15", "mv_buildings_areas_z16_20", "mv_buildings_points", "mv_buildings_points_centroids_z14_15", "mv_buildings_points_centroids_z16_20"] 
+ }, + "communication": { + "tables": ["osm_communication_lines", "osm_communication_multilines"], + "views": ["mv_communication_z10_12", "mv_communication_z16_20"] + }, + "craft": { + "tables": ["osm_other_points"], + "views": ["mv_other_points", "mv_other_points_centroids_z16_20"] + }, + "highway": { + "tables": ["osm_transport_lines", "osm_transport_areas", "osm_transport_points", "osm_transport_multilines"], + "views": ["mv_transport_lines_z5", "mv_transport_lines_z16_20", "mv_transport_areas_z10_12", "mv_transport_areas_z16_20", "mv_transport_points", "mv_transport_points_centroids_z10_12", "mv_transport_points_centroids_z16_20"] + }, + "historic": { + "tables": ["osm_other_areas", "osm_other_lines", "osm_other_points"], + "views": ["mv_other_areas_z8_9", "mv_other_areas_z16_20", "mv_other_lines_z14_15", "mv_other_lines_z16_20", "mv_other_points", "mv_other_points_centroids_z8_9", "mv_other_points_centroids_z16_20"] + }, + "landuse": { + "tables": ["osm_landuse_areas", "osm_landuse_lines", "osm_landuse_points", "osm_water_areas"], + "views": ["mv_landuse_areas_z6_7", "mv_landuse_areas_z16_20", "mv_landuse_lines_z14_15", "mv_landuse_lines_z16_20", "mv_landuse_points", "mv_landuse_points_centroids_z6_7", "mv_landuse_points_centroids_z16_20", "mv_water_areas_z0_2", "mv_water_areas_z16_20", "mv_water_areas_centroids_z8_9", "mv_water_areas_centroids_z16_20"] + }, + "leisure": { + "tables": ["osm_landuse_areas", "osm_landuse_lines", "osm_landuse_points"], + "views": ["mv_landuse_areas_z6_7", "mv_landuse_areas_z16_20", "mv_landuse_lines_z14_15", "mv_landuse_lines_z16_20", "mv_landuse_points", "mv_landuse_points_centroids_z6_7", "mv_landuse_points_centroids_z16_20"] + }, + "man_made": { + "tables": ["osm_other_areas", "osm_other_lines", "osm_other_points"], + "views": ["mv_other_areas_z8_9", "mv_other_areas_z16_20", "mv_other_lines_z14_15", "mv_other_lines_z16_20", "mv_other_points", "mv_other_points_centroids_z8_9", "mv_other_points_centroids_z16_20"] + }, + 
"military": { + "tables": ["osm_other_areas", "osm_other_lines", "osm_other_points"], + "views": ["mv_other_areas_z8_9", "mv_other_areas_z16_20", "mv_other_lines_z14_15", "mv_other_lines_z16_20", "mv_other_points", "mv_other_points_centroids_z8_9", "mv_other_points_centroids_z16_20"] + }, + "natural": { + "tables": ["osm_landuse_areas", "osm_landuse_lines", "osm_landuse_points", "osm_water_areas", "osm_water_lines"], + "views": ["mv_landuse_areas_z6_7", "mv_landuse_areas_z16_20", "mv_landuse_lines_z14_15", "mv_landuse_lines_z16_20", "mv_landuse_points", "mv_landuse_points_centroids_z6_7", "mv_landuse_points_centroids_z16_20", "mv_water_areas_z0_2", "mv_water_areas_z16_20", "mv_water_areas_centroids_z8_9", "mv_water_areas_centroids_z16_20", "mv_water_lines_z8_9", "mv_water_lines_z16_20"] + }, + "place": { + "tables": ["osm_place_areas", "osm_place_points"], + "views": ["mv_place_areas_z14_20", "mv_place_points_centroids_z0_2", "mv_place_points_centroids_z11_20"] + }, + "power": { + "tables": ["osm_other_areas", "osm_other_lines", "osm_other_points"], + "views": ["mv_other_areas_z8_9", "mv_other_areas_z16_20", "mv_other_lines_z14_15", "mv_other_lines_z16_20", "mv_other_points", "mv_other_points_centroids_z8_9", "mv_other_points_centroids_z16_20"] + }, + "railway": { + "tables": ["osm_transport_lines", "osm_transport_areas", "osm_transport_points", "osm_transport_multilines"], + "views": ["mv_transport_lines_z5", "mv_transport_lines_z16_20", "mv_transport_areas_z10_12", "mv_transport_areas_z16_20", "mv_transport_points", "mv_transport_points_centroids_z10_12", "mv_transport_points_centroids_z16_20"] + }, + "route": { + "tables": ["osm_route_lines", "osm_route_multilines"], + "views": ["mv_routes_indexed_z16_20"], + "view_column": "osm_id", + "view_id_mode": "members" + }, + "shop": { + "tables": ["osm_other_points"], + "views": ["mv_other_points", "mv_other_points_centroids_z16_20"] + }, + "tourism": { + "tables": ["osm_other_points"], + "views": ["mv_other_points", 
"mv_other_points_centroids_z16_20"] + }, + "type=street": { + "tables": ["osm_street_multilines"], + "views": ["mv_transport_lines_z5", "mv_transport_lines_z16_20"] + }, + "waterway": { + "tables": ["osm_water_lines", "osm_water_areas"], + "views": ["mv_water_lines_z8_9", "mv_water_lines_z16_20", "mv_water_areas_z0_2", "mv_water_areas_z16_20", "mv_water_areas_centroids_z8_9", "mv_water_areas_centroids_z16_20"] + } + }, + "reject_values": { + "natural": ["coastline"] + }, + "importable_relation_types": [ + "multipolygon", + "boundary", + "route", + "street" + ] +}