diff --git a/.github/workflows/chartpress.yaml b/.github/workflows/chartpress.yaml
index 238bc9a9..4aadf2db 100644
--- a/.github/workflows/chartpress.yaml
+++ b/.github/workflows/chartpress.yaml
@@ -4,7 +4,7 @@ on:
branches:
- 'main'
- 'staging'
- - 'k8s_deploy'
+ - 'tiler_monitoring'
jobs:
build:
runs-on: ubuntu-22.04
@@ -71,7 +71,7 @@ jobs:
OHM_SLACK_WEBHOOK_URL: ${{ secrets.OHM_SLACK_WEBHOOK_URL }}
################ Staging secrets ################
- name: Staging - substitute secrets
- if: github.ref == 'refs/heads/staging' || github.ref == 'refs/heads/k8s_deploy'
+ if: github.ref == 'refs/heads/staging' || github.ref == 'refs/heads/tiler_monitoring'
uses: bluwy/substitute-string-action@v1
with:
_input-file: 'values.staging.template.yaml'
@@ -189,46 +189,46 @@ jobs:
PRODUCTION_OPENSTREETMAP_AUTH_SECRET: ${{ secrets.PRODUCTION_OPENSTREETMAP_AUTH_SECRET }}
- name: AWS Credentials
- if: github.ref == 'refs/heads/staging' || github.ref == 'refs/heads/main' || github.ref == 'refs/heads/k8s_deploy'
+ if: github.ref == 'refs/heads/staging' || github.ref == 'refs/heads/main' || github.ref == 'refs/heads/tiler_monitoring'
uses: aws-actions/configure-aws-credentials@v1
with:
aws-access-key-id: ${{ secrets.AWS_ACCESS_KEY_ID }}
aws-secret-access-key: ${{ secrets.AWS_SECRET_ACCESS_KEY }}
aws-region: us-east-1
- name: Setup Kubectl and Helm Dependencies
- if: github.ref == 'refs/heads/staging' || github.ref == 'refs/heads/main' || github.ref == 'refs/heads/k8s_deploy'
+ if: github.ref == 'refs/heads/staging' || github.ref == 'refs/heads/main' || github.ref == 'refs/heads/tiler_monitoring'
run: |
sudo pip install awscli --ignore-installed six
- sudo curl -L -o /usr/bin/kubectl https://dl.k8s.io/release/v1.33.0/bin/linux/amd64/kubectl
+ sudo curl -L -o /usr/bin/kubectl https://amazon-eks.s3.us-west-2.amazonaws.com/1.17.7/2020-07-08/bin/linux/amd64/kubectl
sudo chmod +x /usr/bin/kubectl
- sudo curl -o /usr/bin/aws-iam-authenticator https://amazon-eks.s3.us-west-2.amazonaws.com/1.21.2/2021-07-05/bin/linux/amd64/aws-iam-authenticator
+ sudo curl -o /usr/bin/aws-iam-authenticator https://amazon-eks.s3.us-west-2.amazonaws.com/1.17.7/2020-07-08/bin/linux/amd64/aws-iam-authenticator
sudo chmod +x /usr/bin/aws-iam-authenticator
- curl -L https://get.helm.sh/helm-v3.17.3-linux-amd64.tar.gz -o helm.tar.gz
+ curl -L https://get.helm.sh/helm-v3.14.4-linux-amd64.tar.gz -o helm.tar.gz
tar -xvzf helm.tar.gz
sudo mv linux-amd64/helm /usr/local/bin/
sudo chmod +x /usr/local/bin/helm
helm version
- name: Update kube-config staging
- if: github.ref == 'refs/heads/staging' || github.ref == 'refs/heads/k8s_deploy'
+ if: github.ref == 'refs/heads/staging' || github.ref == 'refs/heads/tiler_monitoring'
run: aws eks --region us-east-1 update-kubeconfig --name osmseed-staging
- name: Update kube-config prod
if: github.ref == 'refs/heads/main'
run: aws eks --region us-east-1 update-kubeconfig --name osmseed-production-v2
- name: Add Helm repository
- if: github.ref == 'refs/heads/staging' || github.ref == 'refs/heads/main' || github.ref == 'refs/heads/k8s_deploy'
+ if: github.ref == 'refs/heads/staging' || github.ref == 'refs/heads/main' || github.ref == 'refs/heads/tiler_monitoring'
run: |
helm repo add osm-seed https://osm-seed.github.io/osm-seed-chart/
helm repo update
- name: Install helm dependencies for
- if: github.ref == 'refs/heads/staging' || github.ref == 'refs/heads/main' || github.ref == 'refs/heads/k8s_deploy'
+ if: github.ref == 'refs/heads/staging' || github.ref == 'refs/heads/main' || github.ref == 'refs/heads/tiler_monitoring'
run: cd ohm && helm dep up
# Staging
- name: Staging - helm deploy
- if: github.ref == 'refs/heads/staging' || github.ref == 'refs/heads/k8s_deploy'
- run: helm upgrade --install staging --wait --timeout 10m ohm/ -f values.staging.yaml -f ohm/values.yaml
+ if: github.ref == 'refs/heads/staging' || github.ref == 'refs/heads/tiler_monitoring'
+ run: helm upgrade --install staging --wait ohm/ -f values.staging.yaml -f ohm/values.yaml
# Production
- name: Production - helm deploy
if: github.ref == 'refs/heads/main'
- run: helm upgrade --install production --wait --timeout 10m ohm/ -f values.production.yaml -f ohm/values.yaml
+ run: helm upgrade --install production --wait ohm/ -f values.production.yaml -f ohm/values.yaml
\ No newline at end of file
diff --git a/.gitignore b/.gitignore
index 5ab9a7d6..69b81d66 100644
--- a/.gitignore
+++ b/.gitignore
@@ -33,8 +33,8 @@ hetzner/*/.envs.*.production
hetzner/traefik/cloudflare-ips.txt
hetzner/traefik/traefik.yml
.vscode/
+.claude/
imposm3.json
-
cachedir_reimport/
config_reimport.json
imposm3_reimport.json
\ No newline at end of file
diff --git a/compose/tiler.yml b/compose/tiler.yml
index 10ac3158..ca194dda 100644
--- a/compose/tiler.yml
+++ b/compose/tiler.yml
@@ -71,21 +71,24 @@ services:
# - ohm_network
- # tiler-monitor:
- # image: rub21/tiler-monitor:v1
- # build:
- # context: ../images/tiler-monitor
- # dockerfile: Dockerfile
- # volumes:
- # - /var/run/docker.sock:/var/run/docker.sock
- # - ../images/tiler-monitor:/app
- # - ../hetzner:/app/hetzner
- # environment:
- # - DOCKER_CONFIG_ENVIRONMENT=staging
- # env_file:
- # - ../envs/.env.tiler
- # stdin_open: true
- # tty: true
+ tiler-monitor:
+ image: rub21/tiler-monitor:v2
+ build:
+ context: ../images/tiler-monitor
+ dockerfile: Dockerfile
+ volumes:
+ - /var/run/docker.sock:/var/run/docker.sock
+ - ../hetzner:/app/hetzner
+ - tiler_monitor_data:/data
+ ports:
+ - "8001:8001"
+ environment:
+ - TILER_MONITORING_DOCKER_CONFIG_ENVIRONMENT=staging
+ env_file:
+ - ../envs/.env.tiler
+ restart: always
+ networks:
+ - ohm_network
networks:
ohm_network:
@@ -99,3 +102,7 @@ volumes:
tiler_imposm_data:
driver: local
name: tiler_imposm
+
+ tiler_monitor_data:
+ driver: local
+ name: tiler_monitor
diff --git a/hetzner/tiler/tiler.production.yml b/hetzner/tiler/tiler.production.yml
index ec45173d..6a719941 100644
--- a/hetzner/tiler/tiler.production.yml
+++ b/hetzner/tiler/tiler.production.yml
@@ -120,11 +120,12 @@ services:
tiler_monitor:
container_name: tiler_monitor
- image: ghcr.io/openhistoricalmap/tiler-monitor:0.0.1-0.dev.git.2874.ha9bff68
+ image: ghcr.io/openhistoricalmap/tiler-monitor:0.0.1-0.dev.git.3353.h63df1944
volumes:
- /var/run/docker.sock:/var/run/docker.sock
- - ../../images/tiler-monitor:/app
+ # - ../../images/tiler-monitor:/app
- ../../hetzner:/app/hetzner
+ - tiler_monitor_data:/data
environment:
- DOCKER_CONFIG_ENVIRONMENT=production
stdin_open: true
@@ -141,6 +142,9 @@ volumes:
tiler_imposm_data:
driver: local
name: tiler_imposm_17_03
+ tiler_monitor_data:
+ driver: local
+ name: tiler_monitor_data
networks:
ohm_network:
diff --git a/hetzner/traefik/traefik.template.yml b/hetzner/traefik/traefik.template.yml
index b2f64c91..b10905f5 100644
--- a/hetzner/traefik/traefik.template.yml
+++ b/hetzner/traefik/traefik.template.yml
@@ -161,6 +161,14 @@ http:
middlewares:
- secure-headers
+ tiler-monitoring-router:
+ rule: Host(`tiler-monitoring.{{OHM_DOMAIN}}`)
+ entryPoints:
+ - port-web
+ service: tiler_monitor
+ middlewares:
+ - secure-headers
+
services:
tiler_server:
loadBalancer:
@@ -207,6 +215,11 @@ http:
servers:
- url: http://cadvisor:8080
+ tiler_monitor:
+ loadBalancer:
+ servers:
+ - url: http://tiler_monitor:8001
+
providers:
file:
filename: /etc/traefik/traefik.yml
diff --git a/images/tiler-monitor/Dockerfile b/images/tiler-monitor/Dockerfile
index 0aa466bd..ca68d327 100644
--- a/images/tiler-monitor/Dockerfile
+++ b/images/tiler-monitor/Dockerfile
@@ -1,14 +1,42 @@
-FROM docker:cli
+FROM python:3.12-slim
-RUN apk add --no-cache \
- bash \
- curl \
- postgresql-client \
- docker-cli-compose
+RUN apt-get update && \
+ apt-get install -y --no-install-recommends \
+ curl \
+ bash \
+ postgresql-client && \
+ rm -rf /var/lib/apt/lists/*
+
+# Install Docker CLI from official static binaries
+RUN ARCH=$(dpkg --print-architecture) && \
+ if [ "$ARCH" = "amd64" ]; then DOCKER_ARCH="x86_64"; else DOCKER_ARCH="aarch64"; fi && \
+ curl -fsSL "https://download.docker.com/linux/static/stable/${DOCKER_ARCH}/docker-27.5.1.tgz" | \
+ tar xz --strip-components=1 -C /usr/local/bin docker/docker
+
+# Install Docker Compose plugin
+RUN ARCH=$(dpkg --print-architecture) && \
+ if [ "$ARCH" = "amd64" ]; then COMPOSE_ARCH="x86_64"; else COMPOSE_ARCH="aarch64"; fi && \
+ mkdir -p /usr/local/lib/docker/cli-plugins && \
+ curl -fsSL "https://github.com/docker/compose/releases/download/v2.32.4/docker-compose-linux-${COMPOSE_ARCH}" \
+ -o /usr/local/lib/docker/cli-plugins/docker-compose && \
+ chmod +x /usr/local/lib/docker/cli-plugins/docker-compose
WORKDIR /app
-COPY monitor_languages.sh .
-RUN chmod +x monitor_languages.sh
+# Install Python dependencies for pipeline monitor
+COPY pipeline-monitor/requirements.txt /app/pipeline-monitor/requirements.txt
+RUN pip install --no-cache-dir -r /app/pipeline-monitor/requirements.txt
+
+# Copy application code
+COPY language-monitor/ /app/language-monitor/
+COPY pipeline-monitor/ /app/pipeline-monitor/
+COPY entrypoint.sh /app/entrypoint.sh
+
+RUN chmod +x /app/entrypoint.sh /app/language-monitor/monitor_languages.sh
+
+EXPOSE 8001
+
+HEALTHCHECK --interval=30s --timeout=10s --retries=3 --start-period=30s \
+ CMD curl -f http://localhost:8001/health || exit 1
-CMD ["bash", "monitor_languages.sh"]
+CMD ["/app/entrypoint.sh"]
diff --git a/images/tiler-monitor/entrypoint.sh b/images/tiler-monitor/entrypoint.sh
new file mode 100755
index 00000000..f1520d1c
--- /dev/null
+++ b/images/tiler-monitor/entrypoint.sh
@@ -0,0 +1,13 @@
+#!/bin/bash
+set -e
+
+echo "$(date +'%Y-%m-%d %H:%M:%S') - Starting tiler-monitor (combined)"
+
+# Start language monitor in background
+echo "$(date +'%Y-%m-%d %H:%M:%S') - Starting language monitor in background..."
+bash /app/language-monitor/monitor_languages.sh &
+
+# Start pipeline monitor in foreground
+echo "$(date +'%Y-%m-%d %H:%M:%S') - Starting pipeline monitor (FastAPI on port 8001)..."
+cd /app/pipeline-monitor
+exec python monitor.py
diff --git a/images/tiler-monitor/monitor_languages.sh b/images/tiler-monitor/language-monitor/monitor_languages.sh
similarity index 90%
rename from images/tiler-monitor/monitor_languages.sh
rename to images/tiler-monitor/language-monitor/monitor_languages.sh
index 84ae9913..978df24a 100755
--- a/images/tiler-monitor/monitor_languages.sh
+++ b/images/tiler-monitor/language-monitor/monitor_languages.sh
@@ -11,9 +11,10 @@ log_message() {
PG_CONNECTION="postgresql://$POSTGRES_USER:$POSTGRES_PASSWORD@$POSTGRES_HOST:$POSTGRES_PORT/$POSTGRES_DB"
-NIM_NUMBER_LANGUAGES="${NIM_NUMBER_LANGUAGES:-5}" # Default to 5 languages
-FORCE_LANGUAGES_GENERATION="${FORCE_LANGUAGES_GENERATION:-false}"
-EVALUATION_INTERVAL="${EVALUATION_INTERVAL:-3600}" # Default to 1 hour
+NIM_NUMBER_LANGUAGES="${TILER_MONITORING_NIM_NUMBER_LANGUAGES:-5}" # Default to 5 languages
+FORCE_LANGUAGES_GENERATION="${TILER_MONITORING_FORCE_LANGUAGES_GENERATION:-false}"
+EVALUATION_INTERVAL="${TILER_MONITORING_EVALUATION_INTERVAL:-3600}" # Default to 1 hour
+DOCKER_CONFIG_ENVIRONMENT="${TILER_MONITORING_DOCKER_CONFIG_ENVIRONMENT:-${DOCKER_CONFIG_ENVIRONMENT:-staging}}"
log_message "Configuration Summary:"
log_message " Environment: $DOCKER_CONFIG_ENVIRONMENT"
diff --git a/images/tiler-monitor/pipeline-monitor/checks/__init__.py b/images/tiler-monitor/pipeline-monitor/checks/__init__.py
new file mode 100644
index 00000000..e69de29b
diff --git a/images/tiler-monitor/pipeline-monitor/checks/imposm_import.py b/images/tiler-monitor/pipeline-monitor/checks/imposm_import.py
new file mode 100644
index 00000000..3dae996a
--- /dev/null
+++ b/images/tiler-monitor/pipeline-monitor/checks/imposm_import.py
@@ -0,0 +1,1356 @@
+"""Pipeline check: changeset-centric verification.
+
+For each changeset in the 1-2 hour window:
+ 1. Check if minute replication covers it (replication timestamp >= closed_at)
+ 2. Check if its way/relation elements exist in the tiler DB with the correct version
+ 3. For a random sample: verify materialized views + S3 tile cache
+"""
+
+import json
+import logging
+import os
+import random
+import xml.etree.ElementTree as ET
+from datetime import datetime, timezone, timedelta
+
+import psycopg2
+import requests
+
+from config import Config
+import retry_store
+
+logger = logging.getLogger(__name__)
+
+# Load table/view mapping from JSON config
+_config_path = os.path.join(os.path.dirname(os.path.dirname(__file__)), "tables_config.json")
+with open(_config_path) as f:
+ _tables_config = json.load(f)
+
+OHM_BASE = None # lazily computed
+
+
+def _ohm_base():
+ global OHM_BASE
+ if OHM_BASE is None:
+ OHM_BASE = Config.OHM_API_BASE.replace("/api/0.6", "")
+ return OHM_BASE
+
+
+def _parse_timestamp(ts_str):
+ """Parse an ISO timestamp string to a timezone-aware datetime."""
+ ts_str = ts_str.replace("Z", "+00:00")
+ return datetime.fromisoformat(ts_str)
+
+
+def _relative_age(ts_str):
+ """Return a human-readable relative age string like '4h ago' or '25m ago'."""
+ try:
+ dt = _parse_timestamp(ts_str)
+ delta = datetime.now(timezone.utc) - dt
+ total_seconds = int(delta.total_seconds())
+ if total_seconds < 60:
+ return f"{total_seconds}s ago"
+ minutes = total_seconds // 60
+ if minutes < 60:
+ return f"{minutes}m ago"
+ hours = minutes // 60
+ remaining_min = minutes % 60
+ if hours < 24:
+ return f"{hours}h{remaining_min}m ago" if remaining_min else f"{hours}h ago"
+ days = hours // 24
+ remaining_hours = hours % 24
+ return f"{days}d{remaining_hours}h ago" if remaining_hours else f"{days}d ago"
+ except Exception:
+ return ""
+
+
+# ---------------------------------------------------------------------------
+# Step 0: get changesets in the age window
+# ---------------------------------------------------------------------------
+
+def _get_changesets_in_window(min_age, max_age, limit=10):
+ """Fetch closed changesets whose age is between min_age and max_age seconds.
+
+ Fetches recent changesets and filters locally by age window.
+ """
+ now = datetime.now(timezone.utc)
+ min_closed = now - timedelta(seconds=max_age) # oldest allowed
+ max_closed = now - timedelta(seconds=min_age) # newest allowed
+
+ # Fetch enough to find some in the window
+ fetch_limit = 100
+ url = f"{Config.OHM_API_BASE}/changesets"
+ params = {"limit": fetch_limit, "closed": "true"}
+ headers = {"User-Agent": "ohm-pipeline-monitor/1.0"}
+
+ print(f"[pipeline] Fetching changesets: {url}?limit={fetch_limit}&closed=true")
+ print(f" Looking for changesets closed between "
+ f"{min_closed.strftime('%Y-%m-%dT%H:%M:%SZ')} and "
+ f"{max_closed.strftime('%Y-%m-%dT%H:%M:%SZ')} "
+ f"(age {min_age//60}-{max_age//60} min)")
+
+ resp = requests.get(url, params=params, headers=headers, timeout=30)
+ resp.raise_for_status()
+
+ root = ET.fromstring(resp.content)
+ changesets = []
+ skipped_young = 0
+ skipped_old = 0
+
+ for cs in root.findall("changeset"):
+ cs_id = int(cs.attrib["id"])
+ closed_at = cs.attrib.get("closed_at", "")
+ if not closed_at:
+ continue
+ try:
+ closed_dt = _parse_timestamp(closed_at)
+ except (ValueError, TypeError):
+ continue
+
+ age_minutes = (now - closed_dt).total_seconds() / 60
+
+ if closed_dt > max_closed:
+ skipped_young += 1
+ continue
+ elif closed_dt < min_closed:
+ skipped_old += 1
+ # Changesets are ordered by newest first, so once we hit old ones, stop
+ break
+ else:
+ changesets.append({
+ "id": cs_id,
+ "created_at": cs.attrib.get("created_at", ""),
+ "closed_at": closed_at,
+ "closed_dt": closed_dt,
+ "age_minutes": round(age_minutes, 1),
+ })
+
+ if len(changesets) >= limit:
+ break
+
+ print(f" Fetched {len(root.findall('changeset'))} changesets from API")
+ print(f" Skipped: {skipped_young} too young (<{min_age//60}min), "
+ f"{skipped_old} too old (>{max_age//60}min)")
+ print(f" Found {len(changesets)} changesets in window:")
+ for cs in changesets:
+ print(f" changeset {cs['id']}: closed_at={cs['closed_at']} "
+ f"(age={cs['age_minutes']}min)")
+
+ return changesets
+
+
+# ---------------------------------------------------------------------------
+# Step 1: replication check
+# ---------------------------------------------------------------------------
+
+def _parse_replication_state(text):
+ """Parse state.txt and return (sequence, timestamp)."""
+ data = {}
+ for line in text.strip().splitlines():
+ if "=" in line:
+ key, _, value = line.partition("=")
+ data[key.strip()] = value.strip()
+ seq = int(data.get("sequenceNumber", 0))
+ ts_raw = data.get("timestamp", "").replace("\\:", ":")
+ try:
+ ts = datetime.fromisoformat(ts_raw.replace("Z", "+00:00"))
+ except ValueError:
+ ts = None
+ return seq, ts
+
+
+def _check_replication_covers(changeset, repl_seq, repl_ts):
+ """Check if the replication state covers this changeset."""
+ if repl_ts is None:
+ return {
+ "status": "warning",
+ "message": "Cannot parse replication timestamp",
+ }
+
+ closed_dt = changeset["closed_dt"]
+ if repl_ts >= closed_dt:
+ return {
+ "status": "ok",
+ "message": (f"Replication covers this changeset "
+ f"(repl_ts={repl_ts.isoformat()} >= closed_at={changeset['closed_at']})"),
+ "replication_sequence": repl_seq,
+ "replication_timestamp": repl_ts.isoformat(),
+ }
+ else:
+ lag = (closed_dt - repl_ts).total_seconds()
+ return {
+ "status": "critical",
+ "message": (f"Replication does NOT cover this changeset. "
+ f"Replication is {round(lag/60, 1)}min behind "
+ f"(repl_ts={repl_ts.isoformat()} < closed_at={changeset['closed_at']})"),
+ "replication_sequence": repl_seq,
+ "replication_timestamp": repl_ts.isoformat(),
+ }
+
+
+# ---------------------------------------------------------------------------
+# Step 2: tiler DB check
+# ---------------------------------------------------------------------------
+
+def _get_changeset_elements(changeset_id):
+ """Download changeset diff and extract way/relation elements with versions."""
+ url = f"{Config.OHM_API_BASE}/changeset/{changeset_id}/download"
+ headers = {"User-Agent": "ohm-pipeline-monitor/1.0"}
+ resp = requests.get(url, headers=headers, timeout=30)
+ resp.raise_for_status()
+
+ root = ET.fromstring(resp.content)
+ elements = []
+
+ for action in root: # create, modify, delete
+ action_type = action.tag
+ for elem in action:
+ osm_id = elem.attrib.get("id")
+ version = elem.attrib.get("version")
+ elem_type = elem.tag
+ if osm_id and elem_type in ("way", "relation"):
+ # Extract tags to determine which imposm table this element belongs to
+ tags = {}
+ for tag in elem.findall("tag"):
+ k = tag.attrib.get("k")
+ v = tag.attrib.get("v")
+ if k and v:
+ tags[k] = v
+ timestamp = elem.attrib.get("timestamp", "")
+ # Count nodes for ways (to detect invalid geometries)
+ node_count = len(elem.findall("nd")) if elem_type == "way" else 0
+ elements.append({
+ "type": elem_type,
+ "osm_id": int(osm_id),
+ "version": int(version) if version else None,
+ "action": action_type,
+ "tags": tags,
+ "timestamp": timestamp,
+ "node_count": node_count,
+ })
+ return elements
+
+
+
+# Loaded from tables_config.json
+TAG_TO_CHECK = _tables_config["tag_to_check"]
+
+# Reject rules: tag key -> list of values that imposm rejects (not imported)
+_REJECT_VALUES = _tables_config.get("reject_values", {})
+
+# Relation types that imposm actually imports (multipolygon, boundary, route, street)
+# Relations with other type= values (e.g. site, associatedStreet) are ignored by imposm
+_IMPORTABLE_RELATION_TYPES = set(_tables_config.get("importable_relation_types", []))
+
+# Split config keys into simple tags ("highway") and key=value tags ("type=street")
+_SIMPLE_TAGS = {}
+_KV_TAGS = {}
+for key, val in TAG_TO_CHECK.items():
+ if "=" in key:
+ _KV_TAGS[key] = val
+ else:
+ _SIMPLE_TAGS[key] = val
+
+
+def _has_compatible_tables(entry, elem_type):
+ """Return True if the entry has at least one table compatible with the element type.
+
+ - Nodes can only be stored in tables ending with '_points'.
+ - Ways and relations need tables ending with '_lines', '_areas',
+ '_multilines', '_relation_members', or other non-point suffixes.
+ """
+ if elem_type == "node":
+ return any(t.endswith("_points") for t in entry.get("tables", []))
+ # way or relation: needs at least one non-points table
+ return any(not t.endswith("_points") for t in entry.get("tables", []))
+
+
+def _matching_entries(elem):
+ """Return matching tag_to_check entries for this element's tags.
+
+ Skips tags whose value is rejected by imposm (e.g. natural=coastline).
+ Skips entries whose tables are incompatible with the element type
+ (e.g. a way matching 'shop' which only has point tables).
+ Other mappable tags on the same element are still matched.
+ """
+ tags = elem.get("tags", {})
+ elem_type = elem.get("type", "")
+ entries = []
+ # Simple tags: match if tag key exists (e.g. "highway"),
+ # but skip if the value is in the reject list for that key
+ for tag_key in tags:
+ if tag_key in _SIMPLE_TAGS:
+ rejected = _REJECT_VALUES.get(tag_key, [])
+ if tags[tag_key] in rejected:
+ continue
+ entry = _SIMPLE_TAGS[tag_key]
+ if not _has_compatible_tables(entry, elem_type):
+ continue
+ entries.append(entry)
+ # Key=value tags: match if tag key AND value match (e.g. "type=street")
+ for kv, entry in _KV_TAGS.items():
+ k, v = kv.split("=", 1)
+ if tags.get(k) == v:
+ if not _has_compatible_tables(entry, elem_type):
+ continue
+ entries.append(entry)
+ return entries
+
+
+def _has_mappable_tags(elem):
+ """Return True if the element has at least one tag that imposm imports."""
+ return len(_matching_entries(elem)) > 0
+
+
+def _has_invalid_geometry(elem):
+ """Return True if the element has a geometry that imposm will reject.
+
+ Cases detected:
+ - Ways with area=yes but fewer than 4 nodes (need 3 unique + closing node
+ to form a valid polygon).
+ - Ways (non-area) with fewer than 2 nodes.
+ """
+ if elem.get("type") != "way":
+ return False
+ node_count = elem.get("node_count", 0)
+ if node_count == 0:
+ return False # no node info available, don't skip
+ tags = elem.get("tags", {})
+ if tags.get("area") == "yes":
+ # A polygon needs at least 4 nodes (3 unique points + closing node)
+ return node_count < 4
+ # A line needs at least 2 nodes
+ return node_count < 2
+
+
+def _is_non_importable_relation(elem):
+ """Return True if the element is a relation with a type that imposm does not import.
+
+ Imposm only processes certain relation types (multipolygon, boundary, route, street).
+ Relations with other type= values (e.g. site, associatedStreet) are ignored.
+ """
+ if elem.get("type") != "relation":
+ return False
+ tags = elem.get("tags", {})
+ rel_type = tags.get("type", "")
+ if not rel_type:
+ return False # no type tag, don't skip (could be imported by other means)
+ return rel_type not in _IMPORTABLE_RELATION_TYPES
+
+
+def _get_candidate_tables(elem):
+ """Return the specific tables where this element should exist based on its tags."""
+ tables = set()
+ for entry in _matching_entries(elem):
+ tables.update(entry["tables"])
+ return list(tables)
+
+
+def _get_candidate_views(elem):
+ """Return the specific views where this element should exist based on its tags.
+
+ Returns a list of (view_name, column, id_mode) tuples.
+ id_mode is 'members' for views that store member way IDs (positive),
+ or 'standard' for views that store osm_id (negative for relations).
+ """
+ views = {}
+ for entry in _matching_entries(elem):
+ col = entry.get("view_column", "osm_id")
+ id_mode = entry.get("view_id_mode", "standard")
+ for v in entry["views"]:
+ views[v] = (col, id_mode)
+ return [(v, col, mode) for v, (col, mode) in views.items()]
+
+
+def _build_union_query(tables, search_id):
+ """Build a UNION ALL query to search osm_id across multiple tables in 1 round-trip."""
+ parts = []
+ for table in tables:
+ parts.append(
+ f"(SELECT '{table}' AS tbl "
+ f"FROM {table} WHERE osm_id = {int(search_id)} LIMIT 1)"
+ )
+ return " UNION ALL ".join(parts)
+
+
+def _check_element_in_tables(conn, elem):
+ """Check if an element exists in tiler DB tables using a single UNION ALL query."""
+ osm_id = elem["osm_id"]
+ search_id = -osm_id if elem["type"] == "relation" else osm_id
+ candidate_tables = _get_candidate_tables(elem)
+
+ cur = conn.cursor()
+
+ # Get existing tables (cached per connection would be ideal, but simple first)
+ cur.execute("""
+ SELECT table_name FROM information_schema.tables
+ WHERE table_schema = 'public' AND table_name LIKE 'osm_%%'
+ """)
+ existing_tables = {row[0] for row in cur.fetchall()}
+
+ if candidate_tables:
+ # Normal path: filter to candidate tables that exist
+ tables = [t for t in candidate_tables if t in existing_tables]
+ else:
+ # Retry path: no tags available, search ALL osm_* tables
+ tables = sorted(existing_tables)
+
+ if not tables:
+ cur.close()
+ return {
+ "type": elem["type"],
+ "osm_id": osm_id,
+ "action": elem["action"],
+ "found_in_tables": [],
+ "found_in_views": [],
+ "url": f"{_ohm_base()}/{elem['type']}/{elem['osm_id']}",
+ }
+
+ # Single UNION ALL query across all candidate tables
+ query = _build_union_query(tables, search_id)
+ found_in_tables = []
+
+ try:
+ cur.execute(query)
+ for row in cur.fetchall():
+ found_in_tables.append(row[0])
+ except Exception:
+ conn.rollback()
+
+ cur.close()
+
+ return {
+ "type": elem["type"],
+ "osm_id": osm_id,
+ "action": elem["action"],
+ "found_in_tables": found_in_tables,
+ "found_in_views": [],
+ "url": f"{_ohm_base()}/{elem['type']}/{elem['osm_id']}",
+ }
+
+
+def _check_element_in_views(conn, elem, check):
+ """Check if an element exists in materialized views using a single UNION ALL query."""
+ osm_id = check["osm_id"]
+ is_relation = check["type"] == "relation"
+
+ candidate_views = _get_candidate_views(elem)
+
+ if not candidate_views:
+ return check
+
+ cur = conn.cursor()
+
+ # Filter to existing views
+ cur.execute("""
+ SELECT matviewname FROM pg_matviews
+ WHERE schemaname = 'public' AND matviewname LIKE 'mv_%%'
+ """)
+ existing_views = {row[0] for row in cur.fetchall()}
+
+ view_info = [(v, col, mode) for v, col, mode in candidate_views if v in existing_views]
+ missing_views = [v for v, _, _ in candidate_views if v not in existing_views]
+ if missing_views:
+ logger.debug(f"Views not found in DB for {check['type']}/{osm_id}: {missing_views}")
+
+ if not view_info:
+ cur.close()
+ return check
+
+ # For 'members' mode views (routes): the view stores member way IDs,
+ # so we need to find which way IDs belong to this relation.
+ # For 'standard' mode: use osm_id (negative for relations).
+ member_way_ids = None
+
+ # Build UNION ALL query, grouping by search strategy
+ parts = []
+ for view, col, id_mode in sorted(view_info):
+ if id_mode == "members" and is_relation:
+ # Fetch member way IDs from the route table if not already done
+ if member_way_ids is None:
+ member_way_ids = _get_relation_member_ids(conn, osm_id)
+ if member_way_ids:
+ ids_list = ", ".join(str(mid) for mid in member_way_ids)
+ parts.append(
+ f"(SELECT '{view}' AS vw FROM {view} "
+ f"WHERE {col} IN ({ids_list}) LIMIT 1)"
+ )
+ else:
+ search_id = -osm_id if is_relation else osm_id
+ parts.append(
+ f"(SELECT '{view}' AS vw FROM {view} "
+ f"WHERE {col} = {int(search_id)} LIMIT 1)"
+ )
+
+ found_in_views = []
+ if parts:
+ query = " UNION ALL ".join(parts)
+ try:
+ cur.execute(query)
+ for row in cur.fetchall():
+ found_in_views.append(row[0])
+ except Exception as e:
+ logger.warning(f"View query failed for {check['type']}/{osm_id} in "
+ f"{[v for v,_,_ in view_info]}: {e}")
+ conn.rollback()
+
+ cur.close()
+ check["found_in_views"] = found_in_views
+ return check
+
+
+def _get_relation_member_ids(conn, relation_osm_id):
+ """Get member way IDs for a relation from osm_route_multilines."""
+ cur = conn.cursor()
+ try:
+ cur.execute("""
+ SELECT DISTINCT member
+ FROM osm_route_multilines
+ WHERE osm_id = %s
+ """, (-relation_osm_id,))
+ ids = [row[0] for row in cur.fetchall()]
+ return ids
+ except Exception as e:
+ logger.debug(f"Could not get members for relation {relation_osm_id}: {e}")
+ conn.rollback()
+ return []
+ finally:
+ cur.close()
+
+
+def _is_element_deleted(elem):
+ """Check if an element has been deleted in OHM (visible=false or 410 Gone)."""
+ url = f"{Config.OHM_API_BASE}/{elem['type']}/{elem['osm_id']}"
+ headers = {"User-Agent": "ohm-pipeline-monitor/1.0"}
+ try:
+ resp = requests.get(url, headers=headers, timeout=15)
+ if resp.status_code == 410:
+ return True
+ if resp.status_code == 200:
+ root = ET.fromstring(resp.content)
+ el = root.find(elem["type"])
+ if el is not None and el.attrib.get("visible") == "false":
+ return True
+ return False
+ except Exception:
+ return False
+
+
+def _get_current_element_tags(element_type, osm_id):
+ """Fetch the latest version of an element from OHM API and return its tags.
+
+ Returns None if the element is deleted/gone or the request fails.
+ Returns a dict of tags if the element exists.
+ """
+ url = f"{Config.OHM_API_BASE}/{element_type}/{osm_id}"
+ headers = {"User-Agent": "ohm-pipeline-monitor/1.0"}
+ try:
+ resp = requests.get(url, headers=headers, timeout=15)
+ if resp.status_code == 410:
+ return None
+ if resp.status_code == 200:
+ root = ET.fromstring(resp.content)
+ el = root.find(element_type)
+ if el is not None:
+ if el.attrib.get("visible") == "false":
+ return None
+ tags = {}
+ for tag in el.findall("tag"):
+ k = tag.attrib.get("k")
+ v = tag.attrib.get("v")
+ if k and v:
+ tags[k] = v
+ return tags
+ return None
+ except Exception:
+ return None
+
+
+def _get_current_element_info(element_type, osm_id):
+ """Fetch the latest version of an element from OHM API.
+
+ Returns None if the element is deleted/gone or the request fails.
+ Returns a dict with 'tags' and 'node_count' (for ways) if the element exists.
+ """
+ url = f"{Config.OHM_API_BASE}/{element_type}/{osm_id}"
+ headers = {"User-Agent": "ohm-pipeline-monitor/1.0"}
+ try:
+ resp = requests.get(url, headers=headers, timeout=15)
+ if resp.status_code == 410:
+ return None
+ if resp.status_code == 200:
+ root = ET.fromstring(resp.content)
+ el = root.find(element_type)
+ if el is not None:
+ if el.attrib.get("visible") == "false":
+ return None
+ tags = {}
+ for tag in el.findall("tag"):
+ k = tag.attrib.get("k")
+ v = tag.attrib.get("v")
+ if k and v:
+ tags[k] = v
+ node_count = len(el.findall("nd")) if element_type == "way" else 0
+ return {"tags": tags, "node_count": node_count}
+ return None
+ except Exception:
+ return None
+
+
+def _check_elements_in_db(conn, changeset_id, changeset_closed_at=None, already_checked=None):
+    """Check all elements of a changeset in the tiler DB.
+
+    - ALL elements: verified in osm_* tables (fast, tag-filtered)
+    - SAMPLE elements: full check → tables + views + S3 tile cache
+
+    If *already_checked* is a set of (type, osm_id) tuples, elements that
+    were already verified in a newer changeset are skipped to avoid
+    duplicate reporting.
+
+    Args:
+        conn: open psycopg2 connection to the tiler DB.
+        changeset_id: OHM changeset id whose diff is downloaded and checked.
+        changeset_closed_at: ISO timestamp string; when set (and the S3
+            cache bucket is configured) sampled elements also get a
+            tile-cache check, and it is stored with retry entries.
+        already_checked: optional set of (type, osm_id) tuples verified in
+            a newer changeset; matching elements are skipped.
+
+    Returns:
+        dict with "status" ("ok" | "warning" | "critical" |
+        "retry_pending"), "message", "elements" (per-element check dicts)
+        and, when the full check path runs, "tile_cache" (sampled S3
+        results).
+    """
+    # Function-scope import — presumably to avoid a circular import with
+    # checks.tile_cache; TODO confirm.
+    from checks.tile_cache import check_tile_cache_for_element
+
+    try:
+        elements = _get_changeset_elements(changeset_id)
+    except requests.RequestException as e:
+        return {
+            "status": "critical",
+            "message": f"Failed to download changeset diff: {e}",
+            "elements": [],
+        }
+
+    if not elements:
+        return {
+            "status": "ok",
+            "message": "No way/relation elements in this changeset",
+            "elements": [],
+        }
+
+    # Filter elements: skip those without mappable tags or with invalid geometry
+    checkable_elements = []
+    for elem in elements:
+        if not _has_mappable_tags(elem):
+            continue
+        if _has_invalid_geometry(elem):
+            tags = elem.get("tags", {})
+            print(f" SKIP {elem['type']}/{elem['osm_id']} v{elem['version']} "
+                  f"-> invalid geometry (area={tags.get('area', 'no')}, "
+                  f"nodes={elem.get('node_count', '?')}), imposm will reject")
+            continue
+        if _is_non_importable_relation(elem):
+            rel_type = elem.get("tags", {}).get("type", "")
+            print(f" SKIP {elem['type']}/{elem['osm_id']} v{elem['version']} "
+                  f"-> relation type={rel_type} is not imported by imposm")
+            continue
+        checkable_elements.append(elem)
+
+    if not checkable_elements:
+        return {
+            "status": "ok",
+            "message": "No importable elements in this changeset",
+            "elements": [],
+        }
+
+    # Deduplicate: skip elements already checked in a newer changeset
+    if already_checked is not None:
+        deduped = []
+        for elem in checkable_elements:
+            key = (elem["type"], elem["osm_id"])
+            if key in already_checked:
+                print(f" SKIP {elem['type']}/{elem['osm_id']} v{elem['version']} "
+                      f"-> already checked in a newer changeset")
+                continue
+            deduped.append(elem)
+        checkable_elements = deduped
+
+    if not checkable_elements:
+        return {
+            "status": "ok",
+            "message": "All elements already checked in newer changesets",
+            "elements": [],
+        }
+
+    # Select random sample for full pipeline check (tables + views + S3)
+    # Only sample from create/modify elements
+    import math
+    create_modify = [e for e in checkable_elements if e["action"] != "delete"]
+    sample_size = max(1, math.ceil(len(create_modify) * Config.FULL_CHECK_SAMPLE_PCT / 100))
+    sample_size = min(sample_size, len(create_modify))
+    sample_ids = set()
+    if create_modify:
+        # NOTE(review): sampling keys on osm_id alone — a way and a
+        # relation sharing the same numeric id would both be treated as
+        # sampled. Consider keying on (type, osm_id).
+        sample_ids = {e["osm_id"] for e in random.sample(create_modify, sample_size)}
+
+    print(f" [tiler_db] Checking {len(checkable_elements)} elements "
+          f"(full pipeline check on {sample_size}/{len(create_modify)} = {Config.FULL_CHECK_SAMPLE_PCT}% sampled)")
+
+    missing = []
+    not_deleted = []
+    checked = []
+    tile_cache_results = []
+
+    for elem in checkable_elements:
+        is_sample = elem["osm_id"] in sample_ids
+        sample_label = " [SAMPLE]" if is_sample else ""
+        ts_info = f" created={elem['timestamp']} ({_relative_age(elem['timestamp'])})" if elem.get("timestamp") else ""
+
+        # Step 1: Check tables
+        check = _check_element_in_tables(conn, elem)
+        tables = check["found_in_tables"]
+
+        if elem["action"] == "delete":
+            # DELETE: element should NOT be in the DB
+            if tables:
+                print(f" NOT_DELETED{sample_label} {elem['type']}/{elem['osm_id']} v{elem['version']} "
+                      f"(delete){ts_info} -> still in tables: {tables}")
+                print(f" {check['url']}")
+                not_deleted.append(f"{elem['type']}/{elem['osm_id']}")
+            else:
+                print(f" OK{sample_label} {elem['type']}/{elem['osm_id']} v{elem['version']} "
+                      f"(delete){ts_info} -> correctly removed")
+            checked.append(check)
+            continue
+
+        # CREATE / MODIFY: element should be in the DB
+        # Step 2: Check views
+        if tables:
+            check = _check_element_in_views(conn, elem, check)
+
+        checked.append(check)
+        views = check["found_in_views"]
+
+        if tables:
+            print(f" OK{sample_label} {elem['type']}/{elem['osm_id']} v{elem['version']} "
+                  f"({elem['action']}){ts_info}")
+            print(f" tables: {tables}")
+            print(f" views: {views}")
+            print(f" {check['url']}")
+
+            # Step 3: Check S3 tile cache (SAMPLE only)
+            if is_sample and changeset_closed_at and Config.S3_BUCKET_CACHE_TILER:
+                try:
+                    tile_result = check_tile_cache_for_element(
+                        conn, check, changeset_closed_at
+                    )
+                    tile_cache_results.append(tile_result)
+                    cache_status = tile_result.get("cache", {}).get("status", "unknown")
+                    tile_info = tile_result.get("tile", {})
+                    if cache_status == "stale":
+                        print(f" [S3 CACHE] STALE tile z{tile_info.get('z')}/{tile_info.get('x')}/{tile_info.get('y')}")
+                    elif cache_status == "ok":
+                        print(f" [S3 CACHE] OK tile z{tile_info.get('z')}/{tile_info.get('x')}/{tile_info.get('y')}")
+                    elif cache_status == "skipped":
+                        print(f" [S3 CACHE] skipped: {tile_result.get('cache', {}).get('message', '')}")
+                except Exception as e:
+                    # Best-effort: a cache-check failure must not fail the DB check.
+                    print(f" [S3 CACHE] error: {e}")
+        else:
+            # Not found — check if deleted in a later changeset
+            if _is_element_deleted(elem):
+                print(f" SKIP {elem['type']}/{elem['osm_id']} v{elem['version']} "
+                      f"({elem['action']}){ts_info} -> deleted in a later changeset")
+                print(f" {check['url']}")
+                check["deleted"] = True
+                continue
+
+            # Queue for retry instead of alerting immediately.
+            print(f" MISSING{sample_label} {elem['type']}/{elem['osm_id']} v{elem['version']} "
+                  f"({elem['action']}){ts_info} -> NOT in tables, queued for retry")
+            print(f" {check['url']}")
+            retry_store.add_missing(
+                changeset_id, elem["type"], elem["osm_id"], Config.MAX_RETRIES,
+                version=elem.get("version", 0), action=elem.get("action", ""),
+                closed_at=changeset_closed_at or "",
+            )
+            missing.append(f"{elem['type']}/{elem['osm_id']}")
+
+    # Build status message
+    stale_tiles = [r for r in tile_cache_results if r.get("cache", {}).get("status") == "stale"]
+
+    problems_parts = []
+    # Missing elements are queued for retry, only warn about other issues
+    if missing:
+        problems_parts.append(f"Queued for retry: {', '.join(missing)}")
+    if not_deleted:
+        problems_parts.append(f"Not deleted from tiler DB: {', '.join(not_deleted)}")
+
+    # Severity order: not_deleted (warning) > missing (retry_pending)
+    # > stale tiles (warning) > ok.
+    if not_deleted:
+        status = "warning"
+        msg = ". ".join(problems_parts)
+    elif missing:
+        status = "retry_pending"
+        msg = ". ".join(problems_parts)
+    elif stale_tiles:
+        status = "warning"
+        stale_ids = [f"{r['type']}/{r['osm_id']}" for r in stale_tiles]
+        msg = (f"All {len(checked)} elements in tables, "
+               f"but S3 tile cache stale for: {', '.join(stale_ids)}")
+    else:
+        status = "ok"
+        msg = f"All {len(checked)} elements verified in tiler DB"
+        if tile_cache_results:
+            msg += f" (S3 cache OK for {len(tile_cache_results)} sampled)"
+
+    return {
+        "status": status,
+        "message": msg,
+        "elements": checked,
+        "tile_cache": tile_cache_results,
+    }
+
+
+# ---------------------------------------------------------------------------
+# Main pipeline check (scheduled)
+# ---------------------------------------------------------------------------
+
+def check_pipeline():
+    """Check the full pipeline for changesets in the 1-2 hour age window.
+
+    For each changeset:
+    1. Is it covered by minute replication?
+    2. Are its elements in the tiler DB?
+
+    It then rechecks pending/failed/warning retry entries and aggregates
+    an overall status, skipping changesets that already passed.
+
+    Returns:
+        dict with "name", "status" ("ok" | "warning" | "critical"),
+        "message", "details" (window, replication, changesets, retries,
+        newly_failed, total_failed, total_warnings) and "checked_at".
+    """
+    now = datetime.now(timezone.utc)
+    min_age = Config.CHANGESET_MIN_AGE
+    max_age = Config.CHANGESET_MAX_AGE
+
+    result = {
+        "name": "pipeline",
+        "status": "ok",
+        "message": "",
+        "details": {
+            "window": f"{min_age//60}-{max_age//60} minutes",
+            "replication": {},
+            "changesets": [],
+        },
+        "checked_at": now.isoformat(),
+    }
+
+    # --- Fetch replication state ---
+    # A failure here is non-fatal: the per-changeset replication step
+    # degrades to "unknown" below.
+    repl_seq, repl_ts = None, None
+    try:
+        resp = requests.get(Config.REPLICATION_STATE_URL, timeout=15)
+        resp.raise_for_status()
+        repl_seq, repl_ts = _parse_replication_state(resp.text)
+        result["details"]["replication"] = {
+            "status": "ok",
+            "sequence": repl_seq,
+            "timestamp": repl_ts.isoformat() if repl_ts else None,
+        }
+        if repl_ts:
+            lag_min = (now - repl_ts).total_seconds() / 60
+            result["details"]["replication"]["lag_minutes"] = round(lag_min, 1)
+            print(f"\n[pipeline] Replication state: seq={repl_seq}, "
+                  f"ts={repl_ts.isoformat()}, lag={lag_min:.1f}min")
+    except requests.RequestException as e:
+        result["details"]["replication"] = {
+            "status": "critical",
+            "message": f"Failed to fetch replication state: {e}",
+        }
+        print(f"\n[pipeline] WARNING: Cannot fetch replication state: {e}")
+
+    # --- Get changesets in window ---
+    try:
+        changesets = _get_changesets_in_window(
+            min_age=min_age,
+            max_age=max_age,
+            limit=Config.CHANGESET_LIMIT,
+        )
+    except requests.RequestException as e:
+        result["status"] = "critical"
+        result["message"] = f"Failed to fetch changesets from OHM API: {e}"
+        return result
+
+    if not changesets:
+        result["message"] = (
+            f"No changesets found in the {min_age//60}-{max_age//60} minute window"
+        )
+        print(f"[pipeline] {result['message']}")
+        return result
+
+    print(f"[pipeline] Found {len(changesets)} changesets in "
+          f"{min_age//60}-{max_age//60}min window")
+
+    # --- Connect to tiler DB ---
+    conn = None
+    try:
+        conn = psycopg2.connect(
+            host=Config.POSTGRES_HOST,
+            port=Config.POSTGRES_PORT,
+            dbname=Config.POSTGRES_DB,
+            user=Config.POSTGRES_USER,
+            password=Config.POSTGRES_PASSWORD,
+        )
+    except psycopg2.Error as e:
+        result["status"] = "critical"
+        result["message"] = f"Cannot connect to tiler DB: {e}"
+        print(f"[pipeline] ERROR: Cannot connect to tiler DB: {e}")
+        return result
+
+    # --- Check each changeset through the pipeline ---
+    # NOTE(review): conn is only closed on the normal path further down;
+    # an exception escaping these loops leaks the connection — consider
+    # wrapping in try/finally.
+    problems = []
+    skipped = 0
+    # Track already-checked elements to avoid duplicates across changesets.
+    # Changesets are ordered newest-first, so only the latest version is checked.
+    checked_elements = set()  # (type, osm_id)
+
+    for cs in changesets:
+        # Skip changesets already checked with status OK
+        if retry_store.is_changeset_passed(cs["id"]):
+            skipped += 1
+            continue
+
+        print(f"\n[pipeline] === Changeset {cs['id']} === "
+              f"(closed_at={cs['closed_at']}, age={cs['age_minutes']}min)")
+        print(f" URL: {_ohm_base()}/changeset/{cs['id']}")
+
+        cs_result = {
+            "changeset_id": cs["id"],
+            "changeset_url": f"{_ohm_base()}/changeset/{cs['id']}",
+            "closed_at": cs["closed_at"],
+            "age_minutes": cs["age_minutes"],
+            "replication": {},
+            "tiler_db": {},
+        }
+
+        # Step 1: replication
+        if repl_seq is not None:
+            repl_check = _check_replication_covers(cs, repl_seq, repl_ts)
+            cs_result["replication"] = repl_check
+            print(f" [replication] {repl_check['status'].upper()}: {repl_check['message']}")
+
+            if repl_check["status"] != "ok":
+                problems.append(
+                    f"Changeset {cs['id']}: replication not covering"
+                )
+        else:
+            cs_result["replication"] = {"status": "unknown", "message": "Replication state unavailable"}
+            print(f" [replication] UNKNOWN: Replication state unavailable")
+
+        # Step 2: tiler DB
+        db_check = _check_elements_in_db(conn, cs["id"], cs["closed_at"], already_checked=checked_elements)
+        cs_result["tiler_db"] = db_check
+        print(f" [tiler_db] {db_check['status'].upper()}: {db_check['message']}")
+
+        # Track checked elements to skip in older changesets
+        for elem in db_check.get("elements", []):
+            checked_elements.add((elem["type"], elem["osm_id"]))
+
+        # "retry_pending" is not a problem yet — elements are queued.
+        if db_check["status"] not in ("ok", "retry_pending"):
+            problems.append(f"Changeset {cs['id']}: {db_check['message']}")
+
+        result["details"]["changesets"].append(cs_result)
+
+        # Log to history
+        elements = db_check.get("elements", [])
+        missing_count = len([e for e in elements if not e.get("found_in_tables")])
+        retry_store.log_changeset_check(
+            changeset_id=cs["id"],
+            status=db_check["status"],
+            total_elements=len(elements),
+            missing_count=missing_count,
+            ok_count=len(elements) - missing_count,
+            message=db_check["message"],
+            created_at=cs.get("created_at", ""),
+            closed_at=cs.get("closed_at", ""),
+            elements=elements,
+        )
+
+    # --- Recheck pending, failed, and warning retries ---
+    retryable = retry_store.get_pending() + retry_store.get_failed() + retry_store.get_warnings()
+    newly_failed = []
+
+    if retryable:
+        print(f"\n[pipeline] Rechecking {len(retryable)} retries (pending + failed + warning)...")
+
+    for entry in retryable:
+        cs_id = entry["changeset_id"]
+        etype = entry["element_type"]
+        oid = entry["osm_id"]
+        retry_num = entry["retry_count"] + 1
+        prev_status = entry["status"]
+
+        # Check if the latest version still has mappable tags / valid geometry
+        current_info = _get_current_element_info(etype, oid)
+        if current_info is None:
+            # Element was deleted — no longer needs to be in DB
+            print(f" [retry] RESOLVED {etype}/{oid} (changeset {cs_id}) "
+                  f"-> element deleted in latest version, no longer expected in DB")
+            retry_store.mark_resolved(cs_id, etype, oid)
+            continue
+        current_elem = {"type": etype, "tags": current_info["tags"],
+                        "node_count": current_info["node_count"]}
+        if not _has_mappable_tags(current_elem):
+            # Latest version has no mappable tags — imposm won't import it
+            print(f" [retry] RESOLVED {etype}/{oid} (changeset {cs_id}) "
+                  f"-> latest version has no mappable tags, no longer expected in DB")
+            retry_store.mark_resolved(cs_id, etype, oid)
+            continue
+        if _has_invalid_geometry(current_elem):
+            # Invalid geometry — imposm will reject it
+            tags = current_info["tags"]
+            print(f" [retry] RESOLVED {etype}/{oid} (changeset {cs_id}) "
+                  f"-> invalid geometry (area={tags.get('area', 'no')}, "
+                  f"nodes={current_info['node_count']}), imposm rejects this")
+            retry_store.mark_resolved(cs_id, etype, oid)
+            continue
+        if _is_non_importable_relation(current_elem):
+            rel_type = current_info["tags"].get("type", "")
+            print(f" [retry] RESOLVED {etype}/{oid} (changeset {cs_id}) "
+                  f"-> relation type={rel_type} is not imported by imposm")
+            retry_store.mark_resolved(cs_id, etype, oid)
+            continue
+
+        # Check if the element is now in the DB
+        check = _check_element_in_tables(conn, {"type": etype, "osm_id": oid, "action": "modify"})
+        if check["found_in_tables"]:
+            print(f" [retry] RESOLVED {etype}/{oid} (changeset {cs_id}) "
+                  f"-> found in tables after {retry_num} retries")
+            retry_store.mark_resolved(cs_id, etype, oid)
+        elif prev_status in ("failed", "warning"):
+            # Already exhausted retries, keep checking but don't increment
+            print(f" [retry] STILL MISSING {etype}/{oid} (changeset {cs_id}) "
+                  f"-> {prev_status}, still monitoring")
+        else:
+            # Check missing percentage threshold to decide severity
+            cs_stats = retry_store.get_changeset_stats(cs_id)
+            if cs_stats and cs_stats["total_elements"] > 0:
+                missing_pct = (cs_stats["missing_count"] / cs_stats["total_elements"]) * 100
+            else:
+                missing_pct = 100  # no stats available, assume worst case
+
+            above_threshold = missing_pct >= Config.MISSING_THRESHOLD_PCT
+            final_status = "failed" if above_threshold else "warning"
+            new_status = retry_store.increment_retry(cs_id, etype, oid, final_status=final_status)
+
+            if new_status == "failed":
+                print(f" [retry] FAILED {etype}/{oid} (changeset {cs_id}) "
+                      f"-> still missing after {retry_num}/{Config.MAX_RETRIES} retries "
+                      f"({missing_pct:.1f}% missing >= {Config.MISSING_THRESHOLD_PCT}% threshold)")
+                newly_failed.append({
+                    "type": etype, "osm_id": oid, "changeset_id": cs_id,
+                })
+            elif new_status == "warning":
+                print(f" [retry] WARNING {etype}/{oid} (changeset {cs_id}) "
+                      f"-> still missing after {retry_num}/{Config.MAX_RETRIES} retries "
+                      f"({missing_pct:.1f}% missing < {Config.MISSING_THRESHOLD_PCT}% threshold, "
+                      f"not alerting)")
+            else:
+                print(f" [retry] PENDING {etype}/{oid} (changeset {cs_id}) "
+                      f"-> retry {retry_num}/{Config.MAX_RETRIES}")
+
+    conn.close()
+
+    # --- Overall status ---
+    retry_summary = retry_store.summary()
+    result["details"]["retries"] = retry_summary
+
+    if newly_failed:
+        failed_summary = "; ".join(
+            f"{f['type']}/{f['osm_id']} (changeset {f['changeset_id']})"
+            for f in newly_failed
+        )
+        problems.append(f"Failed after {Config.MAX_RETRIES} retries: {failed_summary}")
+
+    has_cs_issues = any(
+        cs.get("replication", {}).get("status") == "critical"
+        or cs.get("tiler_db", {}).get("status") in ("warning", "critical")
+        for cs in result["details"]["changesets"]
+    )
+
+    # Include failed details for Slack alerting
+    failed_count = retry_summary.get("failed", 0)
+    warning_count = retry_summary.get("warning", 0)
+    result["details"]["newly_failed"] = newly_failed
+    result["details"]["total_failed"] = failed_count
+    result["details"]["total_warnings"] = warning_count
+
+    # Severity: newly failed > previously failed > changeset issues > ok.
+    if newly_failed:
+        result["status"] = "critical"
+        failed_labels = "; ".join(
+            f"{f['type']}/{f['osm_id']}" for f in newly_failed[:5]
+        )
+        result["message"] = f"Elements missing after all retries: {failed_labels}"
+    elif failed_count > 0:
+        result["status"] = "critical"
+        result["message"] = f"{failed_count} elements still missing after all retries"
+    elif has_cs_issues:
+        result["status"] = "warning"
+        result["message"] = f"Issues found: {'; '.join(problems[:5])}"
+    else:
+        pending_count = retry_summary.get("pending", 0)
+        checked_count = len(changesets) - skipped
+        msg = (
+            f"{checked_count} new changesets checked"
+        )
+        if skipped:
+            msg += f", {skipped} already passed (skipped)"
+        if pending_count:
+            msg += f", {pending_count} elements pending retry"
+        if warning_count:
+            msg += f", {warning_count} elements below threshold (warning)"
+        result["message"] = msg
+
+    if skipped:
+        print(f"[pipeline] Skipped {skipped} changesets already passed OK")
+    print(f"\n[pipeline] Result: {result['status'].upper()} — {result['message']}")
+    if retry_summary:
+        print(f"[pipeline] Retry store: {retry_summary}")
+    return result
+
+
+# ---------------------------------------------------------------------------
+# On-demand single changeset check
+# ---------------------------------------------------------------------------
+
+def check_single_changeset(changeset_id):
+    """Evaluate a single changeset through the full pipeline (on-demand).
+
+    Args:
+        changeset_id: OHM changeset id to check.
+
+    Returns:
+        dict with "name", "changeset_id", "changeset_url", "status"
+        ("ok" | "warning" | "critical"), "message", "details"
+        (replication + tiler_db sub-results) and "checked_at".
+    """
+    now = datetime.now(timezone.utc)
+    result = {
+        "name": "pipeline",
+        "changeset_id": changeset_id,
+        "changeset_url": f"{_ohm_base()}/changeset/{changeset_id}",
+        "status": "ok",
+        "message": "",
+        "details": {"replication": {}, "tiler_db": {}},
+        "checked_at": now.isoformat(),
+    }
+
+    # Get changeset info
+    try:
+        url = f"{Config.OHM_API_BASE}/changeset/{changeset_id}"
+        headers = {"User-Agent": "ohm-pipeline-monitor/1.0"}
+        resp = requests.get(url, headers=headers, timeout=30)
+        resp.raise_for_status()
+        root = ET.fromstring(resp.content)
+        cs_elem = root.find("changeset")
+        closed_at = cs_elem.attrib.get("closed_at", "") if cs_elem is not None else ""
+    except Exception:
+        # Best-effort: closed_at only gates the S3 tile-cache check, so a
+        # fetch/parse failure degrades to an empty value rather than aborting.
+        closed_at = ""
+
+    print(f"\n[pipeline] === Changeset {changeset_id} (on-demand) ===")
+    print(f" URL: {_ohm_base()}/changeset/{changeset_id}")
+    if closed_at:
+        print(f" closed_at: {closed_at}")
+
+    # Step 1: replication
+    try:
+        resp = requests.get(Config.REPLICATION_STATE_URL, timeout=15)
+        resp.raise_for_status()
+        repl_seq, repl_ts = _parse_replication_state(resp.text)
+
+        if closed_at and repl_ts:
+            closed_dt = _parse_timestamp(closed_at)
+            cs_data = {"closed_at": closed_at, "closed_dt": closed_dt}
+            repl_check = _check_replication_covers(cs_data, repl_seq, repl_ts)
+        else:
+            # Without a closed_at we cannot assert coverage; report the
+            # raw replication state instead.
+            repl_check = {
+                "status": "ok" if repl_ts else "warning",
+                "message": f"Replication seq={repl_seq}, ts={repl_ts.isoformat() if repl_ts else 'unknown'}",
+                "replication_sequence": repl_seq,
+                "replication_timestamp": repl_ts.isoformat() if repl_ts else None,
+            }
+
+        result["details"]["replication"] = repl_check
+        print(f" [replication] {repl_check['status'].upper()}: {repl_check['message']}")
+    except requests.RequestException as e:
+        result["details"]["replication"] = {
+            "status": "critical",
+            "message": f"Failed to fetch replication state: {e}",
+        }
+        print(f" [replication] CRITICAL: Cannot fetch replication state: {e}")
+
+    # Step 2: tiler DB
+    try:
+        conn = psycopg2.connect(
+            host=Config.POSTGRES_HOST,
+            port=Config.POSTGRES_PORT,
+            dbname=Config.POSTGRES_DB,
+            user=Config.POSTGRES_USER,
+            password=Config.POSTGRES_PASSWORD,
+        )
+    except psycopg2.Error as e:
+        result["status"] = "critical"
+        result["message"] = f"Cannot connect to tiler DB: {e}"
+        result["details"]["tiler_db"] = {"status": "critical", "message": str(e)}
+        return result
+
+    db_check = _check_elements_in_db(conn, changeset_id, closed_at or None)
+    conn.close()
+    result["details"]["tiler_db"] = db_check
+    print(f" [tiler_db] {db_check['status'].upper()}: {db_check['message']}")
+
+    # Overall
+    problems = []
+    repl_status = result["details"]["replication"].get("status", "ok")
+    if repl_status == "critical":
+        problems.append("Replication not covering this changeset")
+    if db_check["status"] != "ok":
+        problems.append(db_check["message"])
+
+    if problems:
+        result["status"] = "warning"
+        result["message"] = "; ".join(problems)
+    else:
+        result["message"] = (
+            f"Changeset {changeset_id} passed full pipeline check "
+            f"({len(db_check.get('elements', []))} elements verified)"
+        )
+
+    print(f" [result] {result['status'].upper()}: {result['message']}")
+    return result
+
+
+def recheck_single_element(element_type, osm_id):
+    """Manually recheck a single element in the tiler DB.
+
+    Returns detailed info about where it was found or why it's missing.
+
+    Args:
+        element_type: element kind ("way", "relation", ...); relations are
+            searched with a negated id (see search_id below).
+        osm_id: OSM id of the element as reported by the API.
+
+    Returns:
+        dict with "status" ("resolved" | "not_found" | "error"), the
+        tables/views searched and found, and a human-readable "message".
+        Finding the element also resolves any matching retry entries.
+    """
+    try:
+        conn = psycopg2.connect(
+            host=Config.POSTGRES_HOST,
+            port=Config.POSTGRES_PORT,
+            dbname=Config.POSTGRES_DB,
+            user=Config.POSTGRES_USER,
+            password=Config.POSTGRES_PASSWORD,
+        )
+    except psycopg2.Error as e:
+        return {"status": "error", "message": f"Cannot connect to tiler DB: {e}"}
+
+    elem = {"type": element_type, "osm_id": osm_id, "action": "modify"}
+    # Relations are stored with a negative osm_id — presumably the imposm
+    # mapping convention; TODO confirm against this deployment's mapping.
+    search_id = -osm_id if element_type == "relation" else osm_id
+
+    # Get all osm_* tables to report what was searched.
+    # (execute() gets no parameters here, so '%%' is sent to the server
+    # literally; LIKE 'osm_%%' matches the same names as LIKE 'osm_%'.)
+    cur = conn.cursor()
+    cur.execute("""
+        SELECT table_name FROM information_schema.tables
+        WHERE table_schema = 'public' AND table_name LIKE 'osm_%%'
+        ORDER BY table_name
+    """)
+    all_tables = [row[0] for row in cur.fetchall()]
+    cur.close()
+
+    check = _check_element_in_tables(conn, elem)
+
+    # Also check views if found in tables
+    if check["found_in_tables"]:
+        check = _check_element_in_views(conn, elem, check)
+
+    conn.close()
+
+    found = bool(check["found_in_tables"])
+
+    # If found, resolve all pending retries for this element
+    if found:
+        pending = retry_store.get_pending() + retry_store.get_failed()
+        for entry in pending:
+            if entry["element_type"] == element_type and entry["osm_id"] == osm_id:
+                retry_store.mark_resolved(entry["changeset_id"], element_type, osm_id)
+
+    # Build detailed result
+    if found:
+        message = f"Found in: {', '.join(check['found_in_tables'])}"
+        if check["found_in_views"]:
+            message += f" | Views: {', '.join(check['found_in_views'])}"
+    else:
+        message = (
+            f"Not found in any of {len(all_tables)} osm_* tables "
+            f"(searched with osm_id={search_id})"
+        )
+
+    result = {
+        "status": "resolved" if found else "not_found",
+        "element_type": element_type,
+        "osm_id": osm_id,
+        "search_id": search_id,
+        "found_in_tables": check["found_in_tables"],
+        "found_in_views": check["found_in_views"],
+        "searched_tables": all_tables,
+        "message": message,
+    }
+    return result
+
+
+def recheck_retries():
+    """Manually recheck all pending and failed retries against the tiler DB.
+
+    Returns a summary of resolved, still-missing, and newly-failed elements.
+
+    Returns:
+        dict with lists "resolved", "still_missing", "newly_failed" and a
+        human-readable "message"; or {"error": ...} when the tiler DB is
+        unreachable.
+    """
+    retryable = retry_store.get_pending() + retry_store.get_failed()
+    if not retryable:
+        return {"resolved": [], "still_missing": [], "newly_failed": [], "message": "No retries to check"}
+
+    try:
+        conn = psycopg2.connect(
+            host=Config.POSTGRES_HOST,
+            port=Config.POSTGRES_PORT,
+            dbname=Config.POSTGRES_DB,
+            user=Config.POSTGRES_USER,
+            password=Config.POSTGRES_PASSWORD,
+        )
+    except psycopg2.Error as e:
+        return {"error": f"Cannot connect to tiler DB: {e}"}
+
+    resolved = []
+    still_missing = []
+    newly_failed = []
+
+    for entry in retryable:
+        cs_id = entry["changeset_id"]
+        etype = entry["element_type"]
+        oid = entry["osm_id"]
+        # NOTE(review): retry_num is assigned but never used in this
+        # function (check_pipeline's retry loop uses it for logging).
+        retry_num = entry["retry_count"] + 1
+        prev_status = entry["status"]
+
+        # Check if the latest version still has mappable tags / valid geometry
+        current_info = _get_current_element_info(etype, oid)
+        if current_info is None:
+            # Deleted upstream — no longer expected in the DB.
+            retry_store.mark_resolved(cs_id, etype, oid)
+            resolved.append({"type": etype, "osm_id": oid, "changeset_id": cs_id,
+                             "reason": "element deleted"})
+            continue
+        current_elem = {"type": etype, "tags": current_info["tags"],
+                        "node_count": current_info["node_count"]}
+        if not _has_mappable_tags(current_elem):
+            retry_store.mark_resolved(cs_id, etype, oid)
+            resolved.append({"type": etype, "osm_id": oid, "changeset_id": cs_id,
+                             "reason": "no mappable tags (rejected by imposm)"})
+            continue
+        if _has_invalid_geometry(current_elem):
+            retry_store.mark_resolved(cs_id, etype, oid)
+            resolved.append({"type": etype, "osm_id": oid, "changeset_id": cs_id,
+                             "reason": f"invalid geometry (area={current_info['tags'].get('area', 'no')}, nodes={current_info['node_count']})"})
+            continue
+        if _is_non_importable_relation(current_elem):
+            rel_type = current_info["tags"].get("type", "")
+            retry_store.mark_resolved(cs_id, etype, oid)
+            resolved.append({"type": etype, "osm_id": oid, "changeset_id": cs_id,
+                             "reason": f"relation type={rel_type} not imported by imposm"})
+            continue
+
+        check = _check_element_in_tables(conn, {"type": etype, "osm_id": oid, "action": "modify"})
+        if check["found_in_tables"]:
+            retry_store.mark_resolved(cs_id, etype, oid)
+            resolved.append({"type": etype, "osm_id": oid, "changeset_id": cs_id,
+                             "found_in_tables": check["found_in_tables"]})
+        elif prev_status == "failed":
+            # Already failed — keep monitoring without incrementing the count.
+            still_missing.append({"type": etype, "osm_id": oid, "changeset_id": cs_id})
+        else:
+            new_status = retry_store.increment_retry(cs_id, etype, oid)
+            if new_status == "failed":
+                newly_failed.append({"type": etype, "osm_id": oid, "changeset_id": cs_id})
+            else:
+                still_missing.append({"type": etype, "osm_id": oid, "changeset_id": cs_id})
+
+    conn.close()
+
+    msg_parts = []
+    if resolved:
+        msg_parts.append(f"{len(resolved)} resolved")
+    if still_missing:
+        msg_parts.append(f"{len(still_missing)} still missing")
+    if newly_failed:
+        msg_parts.append(f"{len(newly_failed)} newly failed")
+
+    return {
+        "resolved": resolved,
+        "still_missing": still_missing,
+        "newly_failed": newly_failed,
+        "message": ", ".join(msg_parts) if msg_parts else "No retries to check",
+    }
diff --git a/images/tiler-monitor/pipeline-monitor/checks/mv_freshness.py b/images/tiler-monitor/pipeline-monitor/checks/mv_freshness.py
new file mode 100644
index 00000000..35d5d4d6
--- /dev/null
+++ b/images/tiler-monitor/pipeline-monitor/checks/mv_freshness.py
@@ -0,0 +1,179 @@
+"""Check 3: Materialized view freshness monitor.
+
+Queries pg_stat_user_tables to check when materialized views were last
+auto-analyzed/auto-vacuumed (proxy for last refresh), and also checks
+if the views exist and have rows.
+"""
+
+from datetime import datetime, timezone
+
+import psycopg2
+
+from config import Config
+
+# Key materialized views grouped by expected refresh interval.
+# group_name -> (max_stale_seconds, [view_names])
+# Each value is a 2-tuple: (max_stale_seconds, list_of_view_names).
+# The staleness threshold includes headroom over the nominal refresh
+# cadence noted on each entry.
+MV_GROUPS = {
+    "admin_boundaries_lines": (
+        300,  # expect refresh every ~60s + buffer
+        [
+            "mv_admin_boundaries_lines_z4_5",
+            "mv_admin_boundaries_lines_z6_7",
+            "mv_admin_boundaries_lines_z8_9",
+            "mv_admin_boundaries_lines_z10_11",
+            "mv_admin_boundaries_lines_z12_13",
+            "mv_admin_boundaries_lines_z14_15",
+            "mv_admin_boundaries_lines_z16_20",
+        ],
+    ),
+    "water": (
+        600,  # expect refresh every ~180s + buffer
+        [
+            "mv_water_lines_z10_11",
+            "mv_water_lines_z12_13",
+            "mv_water_lines_z14_15",
+            "mv_water_lines_z16_20",
+            "mv_water_areas_z6_7",
+            "mv_water_areas_z8_9",
+            "mv_water_areas_z10_11",
+            "mv_water_areas_z12_13",
+            "mv_water_areas_z14_15",
+            "mv_water_areas_z16_20",
+        ],
+    ),
+    "transport": (
+        600,  # same threshold as the water group
+        [
+            "mv_transport_lines_z8_9",
+            "mv_transport_lines_z10_11",
+            "mv_transport_lines_z12_13",
+            "mv_transport_lines_z14_15",
+            "mv_transport_lines_z16_20",
+        ],
+    ),
+}
+
+
+def check_mv_freshness():
+    """Check that key materialized views exist and are being refreshed.
+
+    Existence comes from pg_matviews; row counts and a "last refreshed"
+    proxy come from pg_stat_user_tables (REFRESH MATERIALIZED VIEW
+    triggers auto-analyze, so last_autoanalyze/last_analyze approximate
+    the last refresh time).
+
+    Returns:
+        dict with "name", "status" ("ok" | "warning" | "critical"),
+        "message", per-group view details and "checked_at". Missing views
+        are critical; stale or empty views are warnings.
+    """
+    result = {
+        "name": "mv_freshness",
+        "status": "ok",
+        "message": "",
+        "details": {"groups": {}},
+        "checked_at": datetime.now(timezone.utc).isoformat(),
+    }
+
+    try:
+        conn = psycopg2.connect(
+            host=Config.POSTGRES_HOST,
+            port=Config.POSTGRES_PORT,
+            dbname=Config.POSTGRES_DB,
+            user=Config.POSTGRES_USER,
+            password=Config.POSTGRES_PASSWORD,
+        )
+    except psycopg2.Error as e:
+        result["status"] = "critical"
+        result["message"] = f"Cannot connect to tiler DB: {e}"
+        return result
+
+    cur = conn.cursor()
+
+    # Get list of existing materialized views
+    cur.execute("SELECT matviewname FROM pg_matviews WHERE schemaname = 'public'")
+    existing_mvs = {row[0] for row in cur.fetchall()}
+
+    # Check row counts and last analyze times for MVs via pg_stat_user_tables.
+    # REFRESH MATERIALIZED VIEW triggers auto-analyze, so last_autoanalyze
+    # is a good proxy for "last refreshed".
+    # (execute() gets no parameters, so '%%' is sent literally; the
+    # pattern 'mv_%%' matches the same names as 'mv_%'.)
+    cur.execute("""
+        SELECT relname, n_live_tup, last_autoanalyze, last_analyze
+        FROM pg_stat_user_tables
+        WHERE schemaname = 'public'
+          AND relname LIKE 'mv_%%'
+    """)
+    mv_stats = {}
+    for row in cur.fetchall():
+        name, n_rows, last_autoanalyze, last_analyze = row
+        # Use whichever is more recent
+        last_refreshed = max(
+            filter(None, [last_autoanalyze, last_analyze]),
+            default=None,
+        )
+        mv_stats[name] = {
+            "n_rows": n_rows,
+            "last_refreshed": last_refreshed,
+        }
+
+    # DB work done up front; everything below is pure processing.
+    cur.close()
+    conn.close()
+
+    missing_views = []
+    stale_views = []
+    empty_views = []
+    now = datetime.now(timezone.utc)
+
+    for group_name, (max_stale, views) in MV_GROUPS.items():
+        group_result = {"views": [], "status": "ok"}
+
+        for view_name in views:
+            view_info = {"name": view_name, "status": "ok"}
+
+            if view_name not in existing_mvs:
+                view_info["status"] = "critical"
+                view_info["message"] = "View does not exist"
+                missing_views.append(view_name)
+            elif view_name in mv_stats:
+                stats = mv_stats[view_name]
+                view_info["n_rows"] = stats["n_rows"]
+
+                if stats["n_rows"] == 0:
+                    view_info["status"] = "warning"
+                    view_info["message"] = "View is empty (0 rows)"
+                    empty_views.append(view_name)
+
+                if stats["last_refreshed"]:
+                    last_ref = stats["last_refreshed"]
+                    # Timestamps may come back naive; treated as UTC here —
+                    # TODO confirm the server timezone matches.
+                    if last_ref.tzinfo is None:
+                        last_ref = last_ref.replace(tzinfo=timezone.utc)
+                    age_seconds = (now - last_ref).total_seconds()
+                    view_info["last_refreshed"] = last_ref.isoformat()
+                    view_info["age_seconds"] = round(age_seconds)
+
+                    if age_seconds > max_stale:
+                        # NOTE(review): a view that is both empty and stale
+                        # gets its "empty" message overwritten here.
+                        view_info["status"] = "warning"
+                        view_info["message"] = (
+                            f"Stale: last refreshed {round(age_seconds / 60, 1)} min ago "
+                            f"(threshold: {max_stale // 60} min)"
+                        )
+                        stale_views.append(view_name)
+                else:
+                    view_info["last_refreshed"] = None
+                    view_info["message"] = "No analyze timestamp available"
+            else:
+                view_info["message"] = "No stats available"
+
+            group_result["views"].append(view_info)
+
+        # Group status is the worst of its views.
+        if any(v["status"] == "critical" for v in group_result["views"]):
+            group_result["status"] = "critical"
+        elif any(v["status"] == "warning" for v in group_result["views"]):
+            group_result["status"] = "warning"
+
+        result["details"]["groups"][group_name] = group_result
+
+    # Overall status
+    if missing_views:
+        result["status"] = "critical"
+        result["message"] = f"Missing views: {', '.join(missing_views[:5])}"
+    elif stale_views:
+        result["status"] = "warning"
+        result["message"] = f"Stale views: {', '.join(stale_views[:5])}"
+    elif empty_views:
+        result["status"] = "warning"
+        result["message"] = f"Empty views: {', '.join(empty_views[:5])}"
+    else:
+        # MV_GROUPS values are (max_stale, view_list) tuples.
+        total = sum(len(v) for _, v in MV_GROUPS.values())
+        result["message"] = f"All {total} monitored materialized views are healthy"
+
+    return result
diff --git a/images/tiler-monitor/pipeline-monitor/checks/replication_lag.py b/images/tiler-monitor/pipeline-monitor/checks/replication_lag.py
new file mode 100644
index 00000000..c15cb2ce
--- /dev/null
+++ b/images/tiler-monitor/pipeline-monitor/checks/replication_lag.py
@@ -0,0 +1,89 @@
+"""Check 1: Minute replication lag monitor.
+
+Compares the latest replication sequence number available on S3
+against the last sequence number processed by imposm (from the tiler DB
+or the replication state endpoint).
+"""
+
+import time
+from datetime import datetime, timezone
+
+import requests
+
+from config import Config
+
+
+def _parse_state(text):
+    """Parse an imposm/osm replication state.txt and return sequence + timestamp.
+
+    Args:
+        text: raw contents of a state.txt file (java-properties style
+            "key=value" lines).
+
+    Returns:
+        (sequence, timestamp): sequence as int (0 when the key is absent),
+        timestamp as a timezone-aware datetime, or None when unparseable.
+    """
+    data = {}
+    for line in text.strip().splitlines():
+        if "=" in line:
+            key, _, value = line.partition("=")
+            data[key.strip()] = value.strip()
+    # NOTE(review): int() raises ValueError if sequenceNumber is present
+    # but non-numeric (e.g. empty); the 0 default only covers a missing key.
+    seq = int(data.get("sequenceNumber", 0))
+    ts_raw = data.get("timestamp", "")
+    # Format: 2026-03-13T12\:05\:02Z (escaped colons in java properties)
+    ts_raw = ts_raw.replace("\\:", ":")
+    try:
+        # "Z" suffix is not accepted by fromisoformat on older Pythons.
+        ts = datetime.fromisoformat(ts_raw.replace("Z", "+00:00"))
+    except ValueError:
+        ts = None
+    return seq, ts
+
+
+def check_replication_lag():
+    """Return a dict with the replication lag check result.
+
+    Compares the remote replication state timestamp against the current
+    time. Per the inline notes below, this measures staleness of the
+    replication *source*, not how far behind imposm's own processing is.
+
+    Returns:
+        dict with "name", "status" ("ok" | "warning" | "critical"),
+        "message", "details" (sequence/timestamp/lag) and "checked_at".
+    """
+    result = {
+        "name": "replication_lag",
+        "status": "ok",
+        "message": "",
+        "details": {},
+        "checked_at": datetime.now(timezone.utc).isoformat(),
+    }
+
+    try:
+        # Get latest available replication state from S3
+        resp = requests.get(Config.REPLICATION_STATE_URL, timeout=15)
+        resp.raise_for_status()
+        remote_seq, remote_ts = _parse_state(resp.text)
+
+        result["details"]["remote_sequence"] = remote_seq
+        result["details"]["remote_timestamp"] = remote_ts.isoformat() if remote_ts else None
+
+        # Get imposm's last processed state
+        # The imposm diff dir stores last.state.txt - we query it via the same
+        # base URL pattern but from the local imposm state endpoint.
+        # In Docker, we can check the DB for the latest sequence via the
+        # osm_replication_status table if available, or fall back to comparing
+        # timestamps of recent data.
+        #
+        # For now: compare remote timestamp against current time.
+        # If remote_ts is stale, replication source itself is behind.
+        # A more precise check reads imposm's last.state.txt from the shared volume.
+
+        if remote_ts:
+            lag_seconds = (datetime.now(timezone.utc) - remote_ts).total_seconds()
+            result["details"]["lag_seconds"] = round(lag_seconds)
+            result["details"]["lag_minutes"] = round(lag_seconds / 60, 1)
+
+            if lag_seconds > Config.REPLICATION_LAG_THRESHOLD:
+                result["status"] = "critical"
+                result["message"] = (
+                    f"Replication lag is {round(lag_seconds / 60, 1)} minutes "
+                    f"(threshold: {Config.REPLICATION_LAG_THRESHOLD // 60} min). "
+                    f"Last replication timestamp: {remote_ts.isoformat()}"
+                )
+            else:
+                result["message"] = (
+                    f"Replication is up to date. Lag: {round(lag_seconds / 60, 1)} min, "
+                    f"sequence: {remote_seq}"
+                )
+        else:
+            result["status"] = "warning"
+            result["message"] = "Could not parse replication timestamp"
+
+    except requests.RequestException as e:
+        result["status"] = "critical"
+        result["message"] = f"Failed to fetch replication state: {e}"
+
+    return result
diff --git a/images/tiler-monitor/pipeline-monitor/checks/tile_cache.py b/images/tiler-monitor/pipeline-monitor/checks/tile_cache.py
new file mode 100644
index 00000000..9d21eb45
--- /dev/null
+++ b/images/tiler-monitor/pipeline-monitor/checks/tile_cache.py
@@ -0,0 +1,159 @@
+"""Pipeline check: verify tile cache in S3 is up-to-date.
+
+For a sampled element, check if the cached tile in S3 was modified
+after the changeset closed_at. If the tile is stale, the cache purge
+(SQS → tiler-cache) may have failed.
+"""
+
+import mercantile
+import psycopg2.extensions
+from datetime import datetime, timezone
+
+from config import Config
+
+
+def _get_element_centroid(conn, elem):
+    """Get the centroid (lon, lat) of an element from the tiler DB.
+
+    Args:
+        conn: open psycopg2 connection to the tiler database.
+        elem: element dict with "osm_id", "type" and "found_in_tables" keys.
+
+    Returns:
+        {"lon": float, "lat": float} or None if no geometry could be read.
+    """
+    osm_id = elem["osm_id"]
+    # Relations are looked up with a negated id — presumably the imposm3
+    # convention for relation rows; TODO confirm against the import mapping.
+    search_id = -osm_id if elem["type"] == "relation" else osm_id
+
+    # Search in the tables where it was found
+    found_tables = elem.get("found_in_tables", [])
+    if not found_tables:
+        return None
+
+    cur = conn.cursor()
+    for table in found_tables:
+        try:
+            # quote_ident guards against SQL injection via the table name,
+            # which comes from data rather than a literal.
+            quoted = psycopg2.extensions.quote_ident(table, cur)
+            cur.execute(
+                f"SELECT ST_X(ST_Centroid(ST_Transform(geometry, 4326))), "
+                f"ST_Y(ST_Centroid(ST_Transform(geometry, 4326))) "
+                f"FROM {quoted} WHERE osm_id = %s LIMIT 1",
+                (search_id,),
+            )
+            row = cur.fetchone()
+            if row and row[0] is not None:
+                cur.close()
+                return {"lon": row[0], "lat": row[1]}
+        except Exception:
+            # Roll back so the aborted transaction does not poison the
+            # queries against the remaining tables.
+            conn.rollback()
+
+    cur.close()
+    return None
+
+
+def _get_tile_for_point(lon, lat, zoom):
+    """Convert lon/lat (WGS84 degrees) to a tile address at *zoom*.
+
+    Returns:
+        {"z": int, "x": int, "y": int} in the XYZ tiling scheme.
+    """
+    tile = mercantile.tile(lon, lat, zoom)
+    return {"z": tile.z, "x": tile.x, "y": tile.y}
+
+
+def _check_tile_in_s3(tile, changeset_closed_at):
+    """Check if a cached tile in S3 is stale (older than changeset).
+
+    Args:
+        tile: {"z", "x", "y"} tile address to probe.
+        changeset_closed_at: ISO-8601 timestamp string (may end in "Z").
+
+    Returns dict with status and details for each S3 path.
+    """
+    if not Config.S3_BUCKET_CACHE_TILER:
+        return {
+            "status": "skipped",
+            "message": "S3_BUCKET_CACHE_TILER not configured",
+        }
+
+    s3 = Config.get_s3_client()
+    bucket = Config.S3_BUCKET_CACHE_TILER
+    z, x, y = tile["z"], tile["x"], tile["y"]
+
+    results = []
+    stale_paths = []
+
+    # One cached copy of the tile may exist under each configured prefix.
+    for path_prefix in Config.S3_BUCKET_PATH_FILES:
+        key = f"{path_prefix}/{z}/{x}/{y}.pbf"
+        try:
+            resp = s3.head_object(Bucket=bucket, Key=key)
+            last_modified = resp["LastModified"]
+
+            # Parse changeset closed_at
+            # NOTE(review): re-parsed on every iteration; could be hoisted
+            # out of the loop, value is loop-invariant.
+            closed_dt = datetime.fromisoformat(
+                changeset_closed_at.replace("Z", "+00:00")
+            )
+
+            # Stale = the cached object predates the changeset close time.
+            # Assumes LastModified is tz-aware so the comparison with the
+            # +00:00-parsed closed_dt is valid — TODO confirm for non-AWS
+            # (Hetzner) endpoints.
+            is_stale = last_modified < closed_dt
+            result = {
+                "path": key,
+                "last_modified": last_modified.isoformat(),
+                "is_stale": is_stale,
+            }
+            results.append(result)
+            if is_stale:
+                stale_paths.append(key)
+
+        except s3.exceptions.ClientError as e:
+            if e.response["Error"]["Code"] == "404":
+                # Tile not in cache — not stale, tegola will generate on demand
+                results.append({
+                    "path": key,
+                    "last_modified": None,
+                    "is_stale": False,
+                    "note": "not cached (tegola generates on demand)",
+                })
+            else:
+                # Any other S3 error (403, throttling, ...) is recorded but
+                # does not mark the tile stale.
+                results.append({
+                    "path": key,
+                    "error": str(e),
+                })
+
+    if stale_paths:
+        return {
+            "status": "stale",
+            "message": f"Tile cache is stale for: {', '.join(stale_paths)}",
+            "tile": tile,
+            "details": results,
+        }
+    else:
+        return {
+            "status": "ok",
+            "message": "Tile cache is up-to-date or not cached",
+            "tile": tile,
+            "details": results,
+        }
+
+
+def check_tile_cache_for_element(conn, elem_check, changeset_closed_at):
+    """Full tile cache verification for a single element.
+
+    Pipeline: DB centroid -> tile address at Config.TILE_CHECK_ZOOM -> S3
+    freshness check against the changeset close time.
+
+    Args:
+        conn: DB connection
+        elem_check: result dict from _check_element_in_db (with found_in_tables)
+        changeset_closed_at: ISO timestamp string
+
+    Returns:
+        dict with tile cache check results
+    """
+    osm_id = elem_check["osm_id"]
+    elem_type = elem_check["type"]
+    zoom = Config.TILE_CHECK_ZOOM
+
+    # Step 1: get geometry from DB
+    centroid = _get_element_centroid(conn, elem_check)
+    if not centroid:
+        # No geometry found in any candidate table — skip rather than fail,
+        # so a single unreadable element does not abort the whole run.
+        return {
+            "osm_id": osm_id,
+            "type": elem_type,
+            "status": "skipped",
+            "message": "Could not get geometry from DB",
+        }
+
+    # Step 2: calculate tile
+    tile = _get_tile_for_point(centroid["lon"], centroid["lat"], zoom)
+
+    # Step 3: check S3 cache
+    cache_result = _check_tile_in_s3(tile, changeset_closed_at)
+
+    return {
+        "osm_id": osm_id,
+        "type": elem_type,
+        "centroid": centroid,
+        "tile": tile,
+        "cache": cache_result,
+    }
diff --git a/images/tiler-monitor/pipeline-monitor/config.py b/images/tiler-monitor/pipeline-monitor/config.py
new file mode 100644
index 00000000..5b91f1d1
--- /dev/null
+++ b/images/tiler-monitor/pipeline-monitor/config.py
@@ -0,0 +1,103 @@
+import os
+import re
+
+
+def _parse_duration(value, default):
+    """Parse human-readable duration (e.g. '1h', '30m', '1.5h', '2h30m', '3600') to seconds.
+
+    Args:
+        value: the *name* of the environment variable to read (not a raw
+            duration string — note the slightly misleading parameter name).
+        default: seconds returned when the variable is unset or unparseable.
+
+    Returns:
+        Duration in whole seconds (int). Unit-less input is treated as
+        seconds; tokens without an h/m/s unit are ignored.
+    """
+    raw = os.getenv(value, "")
+    if not raw:
+        return default
+    # If it's just a number, treat as seconds
+    try:
+        return int(float(raw))
+    except ValueError:
+        pass
+    total = 0
+    # Sum every "<number><unit>" token; e.g. "2h30m" -> 7200 + 1800.
+    for amount, unit in re.findall(r"(\d+\.?\d*)\s*(h|m|s)", raw.lower()):
+        amount = float(amount)
+        if unit == "h":
+            total += amount * 3600
+        elif unit == "m":
+            total += amount * 60
+        elif unit == "s":
+            total += amount
+    return int(total) if total > 0 else default
+
+
+class Config:
+ # PostgreSQL (tiler DB)
+ POSTGRES_HOST = os.getenv("POSTGRES_HOST", "localhost")
+ POSTGRES_PORT = int(os.getenv("POSTGRES_PORT", 5432))
+ POSTGRES_DB = os.getenv("POSTGRES_DB", "tiler")
+ POSTGRES_USER = os.getenv("POSTGRES_USER", "postgres")
+ POSTGRES_PASSWORD = os.getenv("POSTGRES_PASSWORD", "")
+
+ # Replication
+ REPLICATION_STATE_URL = os.getenv(
+ "REPLICATION_STATE_URL",
+ "https://s3.amazonaws.com/planet.openhistoricalmap.org/replication/minute/state.txt",
+ )
+ OHM_API_BASE = os.getenv("OHM_API_BASE", "https://www.openhistoricalmap.org/api/0.6")
+
+ # How often to run the pipeline check (e.g. "1h", "30m", "3600")
+ CHECK_INTERVAL = _parse_duration("TILER_MONITORING_CHECK_INTERVAL", 3600)
+
+ # OHM changeset age window (e.g. "1h", "2h30m", "3600")
+ CHANGESET_MIN_AGE = _parse_duration("TILER_MONITORING_CHANGESET_MIN_AGE", 10800)
+ CHANGESET_MAX_AGE = _parse_duration("TILER_MONITORING_CHANGESET_MAX_AGE", 14400)
+
+ # Max number of changesets to check per run
+ CHANGESET_LIMIT = int(os.getenv("CHANGESET_LIMIT", 30))
+
+ # Retry: how many times to recheck a missing element before alerting
+ MAX_RETRIES = int(os.getenv("TILER_MONITORING_MAX_RETRIES", 3))
+
+ # Missing threshold: minimum percentage of missing elements in a changeset
+ # to consider it a real failure. Below this threshold, elements are marked
+ # as "warning" instead of "failed" and do NOT trigger RSS/Slack alerts.
+ # e.g. 10 = 10% — if only 1/44 elements is missing (2.3%), it's a warning.
+ MISSING_THRESHOLD_PCT = int(os.getenv("TILER_MONITORING_MISSING_THRESHOLD_PCT", 10))
+
+ # Verbose logging
+ VERBOSE_LOGGING = os.getenv("VERBOSE_LOGGING", "false").lower() == "true"
+
+ # Alerting (optional)
+ SLACK_WEBHOOK_URL = os.getenv("TILER_MONITORING_SLACK_WEBHOOK_URL", "")
+
+ # Server
+ MONITOR_PORT = int(os.getenv("MONITOR_PORT", 8001))
+ MONITOR_BASE_URL = os.getenv("TILER_MONITORING_BASE_URL", "")
+
+ # S3 tile cache verification
+ S3_BUCKET_CACHE_TILER = os.getenv("S3_BUCKET_CACHE_TILER", "")
+ S3_BUCKET_PATH_FILES = os.getenv("S3_BUCKET_PATH_FILES", "mnt/data/ohm,mnt/data/ohm_admin,mnt/data/ohm_other_boundaries").split(",")
+ TILER_CACHE_AWS_ACCESS_KEY_ID = os.getenv("TILER_CACHE_AWS_ACCESS_KEY_ID", "")
+ TILER_CACHE_AWS_SECRET_ACCESS_KEY = os.getenv("TILER_CACHE_AWS_SECRET_ACCESS_KEY", "")
+ TILER_CACHE_AWS_ENDPOINT = os.getenv("TILER_CACHE_AWS_ENDPOINT", "https://s3.amazonaws.com")
+ TILER_CACHE_REGION = os.getenv("TILER_CACHE_REGION", "us-east-1")
+ TILER_CACHE_CLOUD_INFRASTRUCTURE = os.getenv("TILER_CACHE_CLOUD_INFRASTRUCTURE", "aws")
+ # Zoom level to verify tile cache (use high zoom for precise check)
+ TILE_CHECK_ZOOM = int(os.getenv("TILE_CHECK_ZOOM", 16))
+ # Percentage of elements to do full pipeline check (tables + views + S3)
+ # e.g. 25 = 25% of elements, minimum 1
+ FULL_CHECK_SAMPLE_PCT = int(os.getenv("TILER_MONITORING_FULL_CHECK_SAMPLE_PCT", 25))
+
+ @staticmethod
+ def get_s3_client():
+ import boto3
+ if Config.TILER_CACHE_CLOUD_INFRASTRUCTURE == "hetzner":
+ return boto3.client(
+ "s3",
+ aws_access_key_id=Config.TILER_CACHE_AWS_ACCESS_KEY_ID,
+ aws_secret_access_key=Config.TILER_CACHE_AWS_SECRET_ACCESS_KEY,
+ endpoint_url=Config.TILER_CACHE_AWS_ENDPOINT,
+ region_name=Config.TILER_CACHE_REGION,
+ )
+ return boto3.client("s3")
+
+ @staticmethod
+ def get_db_dsn():
+ return (
+ f"postgresql://{Config.POSTGRES_USER}:{Config.POSTGRES_PASSWORD}"
+ f"@{Config.POSTGRES_HOST}:{Config.POSTGRES_PORT}/{Config.POSTGRES_DB}"
+ )
diff --git a/images/tiler-monitor/pipeline-monitor/monitor.py b/images/tiler-monitor/pipeline-monitor/monitor.py
new file mode 100644
index 00000000..18acb82b
--- /dev/null
+++ b/images/tiler-monitor/pipeline-monitor/monitor.py
@@ -0,0 +1,397 @@
+"""Vtile pipeline monitor.
+
+Runs periodic changeset-centric checks and exposes results via a FastAPI HTTP
+server. Optionally sends Slack alerts when checks fail.
+"""
+
+import logging
+import os
+import threading
+import time
+from datetime import datetime, timezone
+
+import requests
+import uvicorn
+from fastapi import FastAPI
+from fastapi.responses import HTMLResponse, JSONResponse, Response
+
+from checks.imposm_import import check_pipeline, check_single_changeset, recheck_retries
+from config import Config
+import retry_store
+
+logging.basicConfig(
+ level=logging.INFO,
+ format="%(asctime)s [%(levelname)s] %(message)s",
+)
+logger = logging.getLogger(__name__)
+
+# Store latest check result
+_latest_result = None
+_lock = threading.Lock()
+
+app = FastAPI(title="OHM Vtile Pipeline Monitor")
+
+
+# ---------------------------------------------------------------------------
+# Slack alerting
+# ---------------------------------------------------------------------------
+
+def _send_slack_alert(check_result):
+    """Send a Slack notification when a check is not ok.
+
+    No-op when SLACK_WEBHOOK_URL is unset. Network failures are logged,
+    never raised, so alerting cannot break the check loop.
+    """
+    if not Config.SLACK_WEBHOOK_URL:
+        return
+    status_emoji = {"ok": ":white_check_mark:", "warning": ":warning:", "critical": ":rotating_light:"}
+    emoji = status_emoji.get(check_result["status"], ":question:")
+    ohm = "https://www.openhistoricalmap.org"
+
+    lines = [f"{emoji} *OHM Tiler Pipeline Monitor* — {check_result['status'].upper()}"]
+    lines.append(check_result["message"])
+
+    # Add failed element details with links
+    # Capped at 10 to keep the Slack message within reasonable size.
+    newly_failed = check_result.get("details", {}).get("newly_failed", [])
+    if newly_failed:
+        lines.append("")
+        lines.append("*Elements missing after all retries:*")
+        for f in newly_failed[:10]:
+            cs_url = f"{ohm}/changeset/{f['changeset_id']}"
+            elem_url = f"{ohm}/{f['type']}/{f['osm_id']}"
+            lines.append(f"  • <{elem_url}|{f['type']}/{f['osm_id']}> in <{cs_url}|changeset {f['changeset_id']}>")
+
+    # Add dashboard link
+    if Config.MONITOR_BASE_URL:
+        lines.append("")
+        lines.append(f":mag: <{Config.MONITOR_BASE_URL}|Open Dashboard> · "
+                     f"<{Config.MONITOR_BASE_URL}/retries|View Retries>")
+
+    # Add changeset-level issues
+    # Only shown when there are no newly failed elements, to avoid
+    # duplicating information in one alert.
+    changesets = check_result.get("details", {}).get("changesets", [])
+    cs_issues = [cs for cs in changesets
+                 if cs.get("tiler_db", {}).get("status") not in ("ok", "retry_pending", None)]
+    if cs_issues and not newly_failed:
+        lines.append("")
+        for cs in cs_issues[:5]:
+            cs_url = f"{ohm}/changeset/{cs['changeset_id']}"
+            msg = cs.get("tiler_db", {}).get("message", "")
+            lines.append(f"  • <{cs_url}|Changeset {cs['changeset_id']}>: {msg}")
+
+    text = "\n".join(lines)
+    try:
+        requests.post(
+            Config.SLACK_WEBHOOK_URL,
+            json={"text": text},
+            timeout=10,
+        )
+    except requests.RequestException as e:
+        logger.error(f"Failed to send Slack alert: {e}")
+
+
+# ---------------------------------------------------------------------------
+# Background scheduler
+# ---------------------------------------------------------------------------
+
+def _run_check():
+    """Run the pipeline check and update stored result.
+
+    Alerting rules: always alert on newly failed elements; alert on
+    ok->warning transitions; alert once on recovery from warning/critical.
+    Any exception is converted into a 'critical' stored result.
+    """
+    try:
+        logger.info("=============> Running pipeline check")
+        result = check_pipeline()
+        logger.info(f"Pipeline check: {result['status']} — {result['message']}")
+
+        with _lock:
+            prev = _latest_result
+            # NOTE(review): globals() assignment works but a
+            # `global _latest_result` declaration would be the usual idiom.
+            globals()["_latest_result"] = result
+
+        # Alert on state changes or new failures
+        newly_failed = result.get("details", {}).get("newly_failed", [])
+        if newly_failed:
+            # New elements just exhausted retries — always alert
+            _send_slack_alert(result)
+            # Log each failed element as a feed event
+            ohm = "https://www.openhistoricalmap.org"
+            for f in newly_failed:
+                retry_store.add_feed_event(
+                    event_type="failed",
+                    title=f"FAILED: {f['type']}/{f['osm_id']} not found in tiler DB after all retries",
+                    description=(
+                        f"Element {f['type']}/{f['osm_id']} from changeset {f['changeset_id']} "
+                        f"was not found in the tiler database after all retries."
+                    ),
+                    link=f"{ohm}/{f['type']}/{f['osm_id']}",
+                    element_type=f["type"],
+                    osm_id=f["osm_id"],
+                    changeset_id=f["changeset_id"],
+                )
+        elif result["status"] == "warning":
+            # Alert only on entering the warning state, not on every run.
+            if prev is None or prev["status"] == "ok":
+                _send_slack_alert(result)
+        elif result["status"] == "ok" and prev and prev["status"] in ("critical", "warning"):
+            # Recovered — send ok notification
+            _send_slack_alert(result)
+            retry_store.add_feed_event(
+                event_type="recovered",
+                title="RECOVERED: All pipeline elements verified OK",
+                description=result["message"],
+            )
+
+    except Exception as e:
+        logger.exception(f"Pipeline check raised an exception: {e}")
+        with _lock:
+            globals()["_latest_result"] = {
+                "name": "pipeline",
+                "status": "critical",
+                "message": f"Check failed with exception: {e}",
+                "details": {},
+                "checked_at": datetime.now(timezone.utc).isoformat(),
+            }
+
+
+def _scheduler():
+    """Background loop that runs checks at the configured interval.
+
+    Runs forever in a daemon thread; _run_check never raises, so the loop
+    cannot die from a failing check.
+    """
+    logger.info(f"Pipeline monitor starting. Check interval: {Config.CHECK_INTERVAL}s")
+    # Short startup delay before the first check run.
+    time.sleep(10)
+
+    while True:
+        _run_check()
+        time.sleep(Config.CHECK_INTERVAL)
+
+
+# ---------------------------------------------------------------------------
+# HTTP endpoints
+# ---------------------------------------------------------------------------
+
+# Directory holding the static dashboard assets, relative to this file.
+_STATIC_DIR = os.path.join(os.path.dirname(__file__), "static")
+
+
+@app.get("/", response_class=HTMLResponse)
+def dashboard():
+    """Serve the monitoring dashboard."""
+    # Re-read from disk on every request; no caching.
+    html_path = os.path.join(_STATIC_DIR, "dashboard.html")
+    with open(html_path) as f:
+        return HTMLResponse(content=f.read())
+
+
+@app.get("/health")
+def health():
+    """Overall health: returns 200 if ok, 503 otherwise.
+
+    Before the first check completes, reports 'starting' with 200 so
+    liveness probes do not kill a freshly started pod.
+    """
+    with _lock:
+        result = _latest_result
+
+    if result is None:
+        return JSONResponse(
+            content={"status": "starting", "message": "No checks have run yet"},
+            status_code=200,
+        )
+
+    status_code = 200 if result["status"] == "ok" else 503
+    return JSONResponse(
+        content={
+            "status": result["status"],
+            "message": result["message"],
+            "checked_at": result["checked_at"],
+        },
+        status_code=status_code,
+    )
+
+
+@app.get("/checks")
+def all_checks():
+    """Return full details for the latest pipeline check."""
+    with _lock:
+        result = _latest_result
+    if result is None:
+        return JSONResponse(content={"status": "starting"})
+    return JSONResponse(content=result)
+
+
+@app.get("/changeset/{changeset_id}")
+def evaluate_changeset(changeset_id: int):
+    """Evaluate a specific changeset through the full pipeline (on-demand).
+
+    Runs synchronously in the request, so response time depends on the
+    changeset's size.
+    """
+    result = check_single_changeset(changeset_id)
+    status_code = 200 if result["status"] == "ok" else 503
+    return JSONResponse(content=result, status_code=status_code)
+
+
+@app.post("/retries/recheck")
+def retries_recheck():
+    """Manually trigger a recheck of all pending and failed retries."""
+    result = recheck_retries()
+
+    # Update cached status if all retries are now resolved
+    # NOTE(review): this status-reset block is duplicated in
+    # retries_recheck_single below; consider extracting a shared helper.
+    remaining = retry_store.summary()
+    if remaining.get("failed", 0) == 0 and remaining.get("pending", 0) == 0:
+        with _lock:
+            prev = _latest_result
+            if prev and prev.get("status") == "critical":
+                updated = dict(prev)
+                updated["status"] = "ok"
+                updated["message"] = "All retries resolved"
+                updated["details"] = dict(prev.get("details", {}))
+                updated["details"]["retries"] = remaining
+                updated["details"]["total_failed"] = 0
+                updated["details"]["newly_failed"] = []
+                globals()["_latest_result"] = updated
+
+    return JSONResponse(content=result)
+
+
+@app.post("/retries/recheck/{element_type}/{osm_id}")
+def retries_recheck_single(element_type: str, osm_id: int):
+    """Manually recheck a single element in the tiler DB.
+
+    Returns 500 with an error payload instead of raising, so the caller
+    always receives JSON.
+    """
+    try:
+        from checks.imposm_import import recheck_single_element
+        result = recheck_single_element(element_type, osm_id)
+
+        # Update cached status if no more failed retries
+        remaining = retry_store.summary()
+        if remaining.get("failed", 0) == 0 and remaining.get("pending", 0) == 0:
+            with _lock:
+                prev = _latest_result
+                if prev and prev.get("status") == "critical":
+                    updated = dict(prev)
+                    updated["status"] = "ok"
+                    updated["message"] = "All retries resolved"
+                    updated["details"] = dict(prev.get("details", {}))
+                    updated["details"]["retries"] = remaining
+                    updated["details"]["total_failed"] = 0
+                    updated["details"]["newly_failed"] = []
+                    globals()["_latest_result"] = updated
+
+        return JSONResponse(content=result)
+    except Exception as e:
+        logger.exception(f"Recheck failed for {element_type}/{osm_id}")
+        return JSONResponse(
+            content={"status": "error", "message": f"Recheck failed: {e}"},
+            status_code=500,
+        )
+
+
+@app.get("/retries")
+def retries():
+    """Return current retry state with full details for debugging.
+
+    Groups entries by status (pending / failed / warning) on top of the
+    aggregate summary.
+    """
+    all_entries = retry_store.get_all_details()
+    pending = [e for e in all_entries if e["status"] == "pending"]
+    failed = [e for e in all_entries if e["status"] == "failed"]
+    warnings = [e for e in all_entries if e["status"] == "warning"]
+    return JSONResponse(content={
+        "summary": retry_store.summary(),
+        "total": len(all_entries),
+        "pending": pending,
+        "failed": failed,
+        "warnings": warnings,
+    })
+
+
+@app.get("/history")
+def history(page: int = 1, per_page: int = 20):
+    """Paginated history of all changesets checked.
+
+    Example: /history?page=1&per_page=10
+    """
+    data = retry_store.get_changeset_history(page=page, per_page=per_page)
+    return JSONResponse(content=data)
+
+
+@app.get("/history/{history_id}/elements")
+def history_elements(history_id: int):
+    """Return all elements checked for a specific history entry."""
+    elements = retry_store.get_changeset_elements(history_id)
+    return JSONResponse(content={"history_id": history_id, "elements": elements})
+
+
+# ---------------------------------------------------------------------------
+# RSS / Atom feed
+# ---------------------------------------------------------------------------
+
+def _xml_escape(text):
+ """Escape XML special characters."""
+ return (str(text)
+ .replace("&", "&")
+ .replace("<", "<")
+ .replace(">", ">")
+ .replace('"', """)
+ .replace("'", "'"))
+
+
+def _to_rfc822(iso_str):
+    """Convert ISO timestamp to RFC 822 format for RSS.
+
+    Falls back to the current UTC time if the input cannot be parsed,
+    so a malformed stored timestamp never breaks feed generation.
+    """
+    try:
+        dt = datetime.fromisoformat(iso_str.replace("Z", "+00:00"))
+        return dt.strftime("%a, %d %b %Y %H:%M:%S +0000")
+    except Exception:
+        return datetime.now(timezone.utc).strftime("%a, %d %b %Y %H:%M:%S +0000")
+
+
+def _build_rss_feed():
+ """Build an RSS 2.0 feed from persistent feed events.
+
+ Each event is a unique item with a stable guid (based on DB id),
+ so Slack's /feed command detects new items as they appear.
+ """
+ base_url = Config.MONITOR_BASE_URL or "https://tiler-monitoring.openhistoricalmap.org"
+ now = datetime.now(timezone.utc)
+ rfc822_now = now.strftime("%a, %d %b %Y %H:%M:%S +0000")
+
+ events = retry_store.get_feed_events(limit=50)
+
+ items_xml = []
+ for ev in events:
+ title = ev["title"]
+ link = ev["link"] or base_url
+ guid = f"ohm-tiler-feed-{ev['id']}"
+ pub_date = _to_rfc822(ev["created_at"])
+ desc = ev["description"]
+
+ # Add element/changeset links in description if available
+ if ev["osm_id"] and ev["element_type"]:
+ ohm = "https://www.openhistoricalmap.org"
+ elem_link = f"{ohm}/{ev['element_type']}/{ev['osm_id']}"
+ cs_link = f"{ohm}/changeset/{ev['changeset_id']}" if ev["changeset_id"] else ""
+ desc_parts = [desc]
+ desc_parts.append(f"Element: {elem_link}")
+ if cs_link:
+ desc_parts.append(f"Changeset: {cs_link}")
+ desc_parts.append(f"Dashboard: {base_url}")
+ desc = " | ".join(desc_parts)
+
+ items_xml.append(f""" -
+ {_xml_escape(title)}
+ {_xml_escape(link)}
+ {_xml_escape(guid)}
+ {pub_date}
+ {_xml_escape(desc)}
+
""")
+
+ items_str = "\n".join(items_xml) if items_xml else ""
+
+ feed = f"""
+
+
+ OHM Tiler Pipeline Monitor - Alerts
+ {_xml_escape(base_url)}
+ Alerts from the OHM tiler pipeline monitor: failed elements not found in the tiler DB after retries.
+ en
+ {rfc822_now}
+
+{items_str}
+
+"""
+ return feed
+
+
+@app.get("/feed.rss")
+def rss_feed():
+    """RSS 2.0 feed of pipeline alerts for Slack integration."""
+    # Rebuilt from the feed_events table on every request.
+    xml = _build_rss_feed()
+    return Response(content=xml, media_type="application/rss+xml")
+
+
+# ---------------------------------------------------------------------------
+# Entrypoint
+# ---------------------------------------------------------------------------
+
+if __name__ == "__main__":
+    # Start background scheduler
+    # daemon=True so the process exits when uvicorn stops.
+    t = threading.Thread(target=_scheduler, daemon=True)
+    t.start()
+
+    # Start HTTP server
+    uvicorn.run(app, host="0.0.0.0", port=Config.MONITOR_PORT)
diff --git a/images/tiler-monitor/pipeline-monitor/requirements.txt b/images/tiler-monitor/pipeline-monitor/requirements.txt
new file mode 100644
index 00000000..1360cc74
--- /dev/null
+++ b/images/tiler-monitor/pipeline-monitor/requirements.txt
@@ -0,0 +1,6 @@
+fastapi
+uvicorn
+requests
+psycopg2-binary
+mercantile
+boto3
diff --git a/images/tiler-monitor/pipeline-monitor/retry_store.py b/images/tiler-monitor/pipeline-monitor/retry_store.py
new file mode 100644
index 00000000..ccfccc58
--- /dev/null
+++ b/images/tiler-monitor/pipeline-monitor/retry_store.py
@@ -0,0 +1,521 @@
+"""SQLite-backed retry store for missing pipeline elements.
+
+Tracks elements that were not found in the tiler DB so they can be
+rechecked on subsequent runs. After MAX_RETRIES consecutive failures
+the element is marked as "failed" and an alert can be triggered.
+
+Uses a single shared connection with a threading lock to avoid
+"database is locked" errors from concurrent access.
+"""
+
+import logging
+import sqlite3
+import os
+import threading
+from datetime import datetime, timezone
+
+logger = logging.getLogger(__name__)
+
+_DB_PATH = os.getenv("TILER_MONITORING_RETRY_DB", "/data/pipeline_retries.db")
+_lock = threading.Lock()
+_conn = None
+
+
+def _get_conn():
+    """Return the shared connection, creating it on first call.
+
+    check_same_thread=False allows use from any thread; callers must hold
+    _lock. WAL mode plus a 5s busy timeout reduce "database is locked"
+    errors.
+    """
+    global _conn
+    if _conn is None:
+        _conn = sqlite3.connect(_DB_PATH, check_same_thread=False)
+        _conn.row_factory = sqlite3.Row
+        _conn.execute("PRAGMA journal_mode=WAL")
+        _conn.execute("PRAGMA busy_timeout=5000")
+        _init_tables(_conn)
+    return _conn
+
+
+def _init_tables(conn):
+    """Create all tables and indexes.
+
+    Idempotent: uses CREATE ... IF NOT EXISTS and ALTER TABLE migrations
+    that swallow "duplicate column" errors, so it is safe on every start.
+    """
+    conn.execute("""
+        CREATE TABLE IF NOT EXISTS pending_retries (
+            changeset_id INTEGER NOT NULL,
+            element_type TEXT NOT NULL,
+            osm_id INTEGER NOT NULL,
+            version INTEGER NOT NULL DEFAULT 0,
+            action TEXT NOT NULL DEFAULT '',
+            retry_count INTEGER NOT NULL DEFAULT 0,
+            max_retries INTEGER NOT NULL,
+            first_seen TEXT NOT NULL,
+            last_checked TEXT NOT NULL,
+            status TEXT NOT NULL DEFAULT 'pending',
+            closed_at TEXT NOT NULL DEFAULT '',
+            PRIMARY KEY (changeset_id, element_type, osm_id)
+        )
+    """)
+    # Migrate: add columns if missing (for existing DBs)
+    # ALTER TABLE raises OperationalError when the column exists; that is
+    # the expected "already migrated" path.
+    for col, typedef in [("version", "INTEGER NOT NULL DEFAULT 0"),
+                         ("action", "TEXT NOT NULL DEFAULT ''"),
+                         ("closed_at", "TEXT NOT NULL DEFAULT ''")]:
+        try:
+            conn.execute(f"ALTER TABLE pending_retries ADD COLUMN {col} {typedef}")
+        except sqlite3.OperationalError:
+            pass
+
+    conn.execute("""
+        CREATE TABLE IF NOT EXISTS changeset_history (
+            id INTEGER PRIMARY KEY AUTOINCREMENT,
+            changeset_id INTEGER NOT NULL,
+            created_at TEXT NOT NULL DEFAULT '',
+            closed_at TEXT NOT NULL DEFAULT '',
+            checked_at TEXT NOT NULL,
+            status TEXT NOT NULL,
+            total_elements INTEGER NOT NULL DEFAULT 0,
+            missing_count INTEGER NOT NULL DEFAULT 0,
+            ok_count INTEGER NOT NULL DEFAULT 0,
+            message TEXT NOT NULL DEFAULT ''
+        )
+    """)
+    for hist_col, hist_typedef in [("closed_at", "TEXT NOT NULL DEFAULT ''"),
+                                   ("created_at", "TEXT NOT NULL DEFAULT ''")]:
+        try:
+            conn.execute(f"ALTER TABLE changeset_history ADD COLUMN {hist_col} {hist_typedef}")
+        except sqlite3.OperationalError:
+            pass
+    conn.execute("""
+        CREATE INDEX IF NOT EXISTS idx_history_checked_at
+        ON changeset_history(checked_at DESC)
+    """)
+
+    conn.execute("""
+        CREATE TABLE IF NOT EXISTS element_history (
+            id INTEGER PRIMARY KEY AUTOINCREMENT,
+            history_id INTEGER NOT NULL,
+            changeset_id INTEGER NOT NULL,
+            element_type TEXT NOT NULL,
+            osm_id INTEGER NOT NULL,
+            version INTEGER NOT NULL DEFAULT 0,
+            action TEXT NOT NULL DEFAULT '',
+            status TEXT NOT NULL,
+            found_in_tables TEXT NOT NULL DEFAULT '',
+            found_in_views TEXT NOT NULL DEFAULT '',
+            checked_at TEXT NOT NULL,
+            FOREIGN KEY (history_id) REFERENCES changeset_history(id)
+        )
+    """)
+    conn.execute("""
+        CREATE INDEX IF NOT EXISTS idx_element_history_id
+        ON element_history(history_id)
+    """)
+
+    # Feed events: persistent log of alerts (failed elements, recoveries)
+    # for the RSS feed. Items are never deleted, only added.
+    conn.execute("""
+        CREATE TABLE IF NOT EXISTS feed_events (
+            id INTEGER PRIMARY KEY AUTOINCREMENT,
+            event_type TEXT NOT NULL,
+            element_type TEXT NOT NULL DEFAULT '',
+            osm_id INTEGER NOT NULL DEFAULT 0,
+            version INTEGER NOT NULL DEFAULT 0,
+            changeset_id INTEGER NOT NULL DEFAULT 0,
+            action TEXT NOT NULL DEFAULT '',
+            title TEXT NOT NULL,
+            description TEXT NOT NULL DEFAULT '',
+            link TEXT NOT NULL DEFAULT '',
+            created_at TEXT NOT NULL
+        )
+    """)
+    conn.execute("""
+        CREATE INDEX IF NOT EXISTS idx_feed_events_created
+        ON feed_events(created_at DESC)
+    """)
+
+    conn.commit()
+
+
+# ---------------------------------------------------------------------------
+# Pending retries
+# ---------------------------------------------------------------------------
+
+def add_missing(changeset_id: int, element_type: str, osm_id: int,
+                max_retries: int, version: int = 0, action: str = "",
+                closed_at: str = ""):
+    """Register a missing element for future retry. If it already exists, do nothing.
+
+    Uniqueness is enforced by the (changeset_id, element_type, osm_id)
+    primary key via INSERT OR IGNORE.
+    """
+    now = datetime.now(timezone.utc).isoformat()
+    with _lock:
+        conn = _get_conn()
+        conn.execute("""
+            INSERT OR IGNORE INTO pending_retries
+            (changeset_id, element_type, osm_id, version, action,
+             retry_count, max_retries, first_seen, last_checked, status, closed_at)
+            VALUES (?, ?, ?, ?, ?, 0, ?, ?, ?, 'pending', ?)
+        """, (changeset_id, element_type, osm_id, version, action, max_retries, now, now, closed_at or ""))
+        conn.commit()
+
+
+def get_pending():
+    """Return all elements with status='pending' that still need to be rechecked."""
+    with _lock:
+        conn = _get_conn()
+        rows = conn.execute(
+            "SELECT * FROM pending_retries WHERE status = 'pending'"
+        ).fetchall()
+    return [dict(r) for r in rows]
+
+
+def mark_resolved(changeset_id: int, element_type: str, osm_id: int):
+    """Element was found in a retry — remove it from pending."""
+    with _lock:
+        conn = _get_conn()
+        conn.execute("""
+            DELETE FROM pending_retries
+            WHERE changeset_id = ? AND element_type = ? AND osm_id = ?
+        """, (changeset_id, element_type, osm_id))
+        conn.commit()
+
+
+def increment_retry(changeset_id: int, element_type: str, osm_id: int,
+                    final_status: str = "failed"):
+    """Bump retry_count. If it reaches max_retries, flip status to *final_status*.
+
+    *final_status* is normally 'failed', but callers may pass 'warning' when
+    the missing percentage is below the alerting threshold.
+
+    Returns the new status ('pending', 'warning', or 'failed').
+    """
+    now = datetime.now(timezone.utc).isoformat()
+    with _lock:
+        conn = _get_conn()
+        conn.execute("""
+            UPDATE pending_retries
+            SET retry_count = retry_count + 1, last_checked = ?
+            WHERE changeset_id = ? AND element_type = ? AND osm_id = ?
+        """, (now, changeset_id, element_type, osm_id))
+
+        # Re-read the bumped counter to decide whether retries are exhausted.
+        row = conn.execute("""
+            SELECT retry_count, max_retries FROM pending_retries
+            WHERE changeset_id = ? AND element_type = ? AND osm_id = ?
+        """, (changeset_id, element_type, osm_id)).fetchone()
+
+        if row and row["retry_count"] >= row["max_retries"]:
+            conn.execute("""
+                UPDATE pending_retries SET status = ?
+                WHERE changeset_id = ? AND element_type = ? AND osm_id = ?
+            """, (final_status, changeset_id, element_type, osm_id))
+            conn.commit()
+            return final_status
+
+        conn.commit()
+        return "pending"
+
+
+def get_failed():
+    """Return all elements that exhausted their retries with status='failed'."""
+    with _lock:
+        conn = _get_conn()
+        rows = conn.execute(
+            "SELECT * FROM pending_retries WHERE status = 'failed'"
+        ).fetchall()
+    return [dict(r) for r in rows]
+
+
+def get_warnings():
+    """Return all elements that exhausted retries but are below the missing threshold."""
+    with _lock:
+        conn = _get_conn()
+        rows = conn.execute(
+            "SELECT * FROM pending_retries WHERE status = 'warning'"
+        ).fetchall()
+    return [dict(r) for r in rows]
+
+
+def clear_failed():
+    """Remove all failed entries (call after alerting).
+
+    Warnings and pending entries are untouched.
+    """
+    with _lock:
+        conn = _get_conn()
+        conn.execute("DELETE FROM pending_retries WHERE status = 'failed'")
+        conn.commit()
+
+
+def get_all_details(ohm_base="https://www.openhistoricalmap.org"):
+ """Return all entries enriched with URLs and human-readable times."""
+ with _lock:
+ conn = _get_conn()
+ rows = conn.execute(
+ "SELECT * FROM pending_retries ORDER BY status, first_seen DESC"
+ ).fetchall()
+
+ # Get changeset object counts from history (latest check per changeset)
+ cs_ids = list(set(r["changeset_id"] for r in rows))
+ cs_stats = {}
+ if cs_ids:
+ placeholders = ",".join("?" * len(cs_ids))
+ stats_rows = conn.execute(f"""
+ SELECT changeset_id, total_elements, missing_count, ok_count
+ FROM changeset_history
+ WHERE id IN (
+ SELECT MAX(id) FROM changeset_history
+ WHERE changeset_id IN ({placeholders})
+ GROUP BY changeset_id
+ )
+ """, cs_ids).fetchall()
+ for sr in stats_rows:
+ cs_stats[sr["changeset_id"]] = {
+ "total_elements": sr["total_elements"],
+ "missing_count": sr["missing_count"],
+ "ok_count": sr["ok_count"],
+ }
+
+ now = datetime.now(timezone.utc)
+ results = []
+ for r in rows:
+ entry = dict(r)
+ entry["changeset_url"] = f"{ohm_base}/changeset/{r['changeset_id']}"
+ entry["element_url"] = f"{ohm_base}/{r['element_type']}/{r['osm_id']}"
+ try:
+ first = datetime.fromisoformat(r["first_seen"])
+ entry["age"] = _human_duration((now - first).total_seconds())
+ except Exception:
+ entry["age"] = ""
+ try:
+ last = datetime.fromisoformat(r["last_checked"])
+ entry["last_checked_ago"] = _human_duration((now - last).total_seconds())
+ except Exception:
+ entry["last_checked_ago"] = ""
+ # Changeset closed_at (close time as formatted date)
+ closed_at_val = r["closed_at"] if "closed_at" in r.keys() else ""
+ if closed_at_val:
+ try:
+ closed = datetime.fromisoformat(closed_at_val.replace("Z", "+00:00"))
+ entry["closed_at_fmt"] = closed.strftime("%Y-%m-%d %H:%M UTC")
+ except Exception:
+ entry["closed_at_fmt"] = ""
+ else:
+ entry["closed_at_fmt"] = ""
+ # Changeset object counts
+ stats = cs_stats.get(r["changeset_id"], {})
+ entry["cs_total_elements"] = stats.get("total_elements", 0)
+ entry["cs_ok_count"] = stats.get("ok_count", 0)
+ entry["cs_missing_count"] = stats.get("missing_count", 0)
+ entry["retries_remaining"] = max(0, r["max_retries"] - r["retry_count"])
+ results.append(entry)
+ return results
+
+
+# ---------------------------------------------------------------------------
+# Changeset stats helpers
+# ---------------------------------------------------------------------------
+
+def get_changeset_stats(changeset_id: int):
+    """Return the latest total_elements, missing_count, ok_count for a changeset.
+
+    "Latest" is the history row with the highest id for that changeset.
+
+    Returns a dict with those keys, or None if no history exists.
+    """
+    with _lock:
+        conn = _get_conn()
+        row = conn.execute("""
+            SELECT total_elements, missing_count, ok_count
+            FROM changeset_history
+            WHERE changeset_id = ?
+            ORDER BY id DESC LIMIT 1
+        """, (changeset_id,)).fetchone()
+    if row:
+        return {
+            "total_elements": row["total_elements"],
+            "missing_count": row["missing_count"],
+            "ok_count": row["ok_count"],
+        }
+    return None
+
+
+# ---------------------------------------------------------------------------
+# Changeset history
+# ---------------------------------------------------------------------------
+
def log_changeset_check(changeset_id: int, status: str,
                        total_elements: int, missing_count: int,
                        ok_count: int, message: str,
                        created_at: str = "", closed_at: str = "",
                        elements: list = None):
    """Record a changeset check and its per-element results.

    Inserts one ``changeset_history`` row, then (when ``elements`` is
    given) one ``element_history`` row per element, all stamped with the
    same UTC timestamp, and commits once at the end.
    """
    checked_at = datetime.now(timezone.utc).isoformat()
    with _lock:
        conn = _get_conn()
        cur = conn.execute("""
            INSERT INTO changeset_history
            (changeset_id, created_at, closed_at, checked_at, status, total_elements, missing_count, ok_count, message)
            VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
        """, (changeset_id, created_at or "", closed_at or "", checked_at,
              status, total_elements, missing_count, ok_count, message))
        history_id = cur.lastrowid

        for elem in (elements or []):
            tables_csv = ", ".join(elem.get("found_in_tables", []))
            views_csv = ", ".join(elem.get("found_in_views", []))
            present = bool(elem.get("found_in_tables"))
            if elem.get("deleted", False):
                # Element deleted upstream: nothing to verify in the tiler DB.
                state = "skipped"
            elif elem.get("action") == "delete":
                # A delete is correct precisely when the element is gone.
                state = "not_deleted" if present else "ok"
            elif present:
                state = "ok"
            else:
                state = "missing"
            conn.execute("""
                INSERT INTO element_history
                (history_id, changeset_id, element_type, osm_id, version,
                action, status, found_in_tables, found_in_views, checked_at)
                VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
            """, (history_id, changeset_id, elem.get("type", ""),
                  elem.get("osm_id", 0), elem.get("version", 0),
                  elem.get("action", ""), state, tables_csv, views_csv, checked_at))
        conn.commit()
+
+
def get_changeset_history(page: int = 1, per_page: int = 20,
                          ohm_base: str = "https://www.openhistoricalmap.org"):
    """Return one page of changeset check history, newest check first.

    Each row is augmented with ``changeset_url``, relative-time strings
    (``checked_ago``, ``closed_ago``) and a formatted creation date
    (``created_fmt``); unparsable timestamps yield empty strings.

    Returns:
        dict with pagination metadata (``page``, ``per_page``, ``total``,
        ``total_pages``) and the row list under ``changesets``.
    """
    with _lock:
        conn = _get_conn()
        total = conn.execute("SELECT COUNT(*) FROM changeset_history").fetchone()[0]
        total_pages = max(1, (total + per_page - 1) // per_page)
        offset = (page - 1) * per_page

        rows = conn.execute("""
            SELECT * FROM changeset_history
            ORDER BY checked_at DESC
            LIMIT ? OFFSET ?
        """, (per_page, offset)).fetchall()

        now = datetime.now(timezone.utc)
        changesets = []
        for row in rows:
            item = dict(row)
            item["changeset_url"] = f"{ohm_base}/changeset/{row['changeset_id']}"

            # Relative time since this check ran.
            try:
                checked = datetime.fromisoformat(row["checked_at"])
                item["checked_ago"] = _human_duration((now - checked).total_seconds())
            except Exception:
                item["checked_ago"] = ""

            # Relative time since the changeset closed, when known.
            item["closed_ago"] = ""
            if row["closed_at"]:
                try:
                    closed = datetime.fromisoformat(row["closed_at"].replace("Z", "+00:00"))
                    item["closed_ago"] = _human_duration((now - closed).total_seconds())
                except Exception:
                    pass

            # Creation time as a fixed-format date; column may be absent
            # in rows written by older schema versions.
            item["created_fmt"] = ""
            created_raw = row["created_at"] if "created_at" in row.keys() else ""
            if created_raw:
                try:
                    created = datetime.fromisoformat(created_raw.replace("Z", "+00:00"))
                    item["created_fmt"] = created.strftime("%Y-%m-%d %H:%M UTC")
                except Exception:
                    pass

            changesets.append(item)

    return {
        "page": page,
        "per_page": per_page,
        "total": total,
        "total_pages": total_pages,
        "changesets": changesets,
    }
+
+
def get_changeset_elements(history_id: int,
                           ohm_base: str = "https://www.openhistoricalmap.org"):
    """Return every element recorded for one changeset_history entry.

    Splits the comma-separated ``found_in_tables``/``found_in_views``
    columns back into lists and attaches a browse URL per element.
    """
    with _lock:
        conn = _get_conn()
        rows = conn.execute("""
            SELECT * FROM element_history
            WHERE history_id = ?
            ORDER BY status DESC, element_type, osm_id
        """, (history_id,)).fetchall()

        out = []
        for row in rows:
            item = dict(row)
            item["element_url"] = f"{ohm_base}/{row['element_type']}/{row['osm_id']}"
            # Columns were stored as ", "-joined strings; empty → empty list.
            for col in ("found_in_tables", "found_in_views"):
                item[col] = row[col].split(", ") if row[col] else []
            out.append(item)
        return out
+
+
def is_changeset_passed(changeset_id: int) -> bool:
    """Return True when this changeset already has an 'ok' history entry."""
    with _lock:
        conn = _get_conn()
        hit = conn.execute("""
            SELECT 1 FROM changeset_history
            WHERE changeset_id = ? AND status = 'ok'
            LIMIT 1
        """, (changeset_id,)).fetchone()
        return hit is not None
+
+
def summary():
    """Return a mapping of status -> row count for pending_retries (for logs)."""
    with _lock:
        conn = _get_conn()
        counts = conn.execute(
            "SELECT status, COUNT(*) as cnt FROM pending_retries GROUP BY status"
        ).fetchall()
        return {row["status"]: row["cnt"] for row in counts}
+
+
+# ---------------------------------------------------------------------------
+# Feed events
+# ---------------------------------------------------------------------------
+
def add_feed_event(event_type: str, title: str, description: str = "",
                   link: str = "", element_type: str = "", osm_id: int = 0,
                   version: int = 0, changeset_id: int = 0, action: str = ""):
    """Persist one event row for the RSS feed, timestamped with UTC now."""
    created_at = datetime.now(timezone.utc).isoformat()
    values = (event_type, element_type, osm_id, version, changeset_id,
              action, title, description, link, created_at)
    with _lock:
        conn = _get_conn()
        conn.execute("""
            INSERT INTO feed_events
            (event_type, element_type, osm_id, version, changeset_id,
            action, title, description, link, created_at)
            VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
        """, values)
        conn.commit()
+
+
def get_feed_events(limit: int = 50):
    """Return up to `limit` newest feed events, as plain dicts."""
    with _lock:
        conn = _get_conn()
        rows = conn.execute("""
            SELECT * FROM feed_events
            ORDER BY created_at DESC
            LIMIT ?
        """, (limit,)).fetchall()
        return [dict(row) for row in rows]
+
+
+# ---------------------------------------------------------------------------
+# Helpers
+# ---------------------------------------------------------------------------
+
+def _human_duration(seconds):
+ """Convert seconds to human-readable string like '2h 15m ago'."""
+ seconds = int(seconds)
+ if seconds < 60:
+ return f"{seconds}s ago"
+ minutes = seconds // 60
+ if minutes < 60:
+ return f"{minutes}m ago"
+ hours = minutes // 60
+ remaining_min = minutes % 60
+ if hours < 24:
+ return f"{hours}h {remaining_min}m ago" if remaining_min else f"{hours}h ago"
+ days = hours // 24
+ remaining_hours = hours % 24
+ return f"{days}d {remaining_hours}h ago" if remaining_hours else f"{days}d ago"
diff --git a/images/tiler-monitor/pipeline-monitor/static/dashboard.html b/images/tiler-monitor/pipeline-monitor/static/dashboard.html
new file mode 100644
index 00000000..b6980c94
--- /dev/null
+++ b/images/tiler-monitor/pipeline-monitor/static/dashboard.html
@@ -0,0 +1,596 @@
+
+
+
+
+
+OHM Tiler Pipeline Monitor
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+ |
+ Changeset | Status | Elements | Missing |
+ Replication | Age | Message |
+
+
+
+
Waiting for current run to complete...
+
+
+
+
+
+
+
+
+
+ |
+ Changeset | Status | Total | OK |
+ Missing | Created | Closed | Checked | Message |
+
+
+
+
No changesets checked yet.
+
+
+
+
+
+
+
+
+
+
+
+ | Element | Changeset | Action | Status |
+ Retries | Objects | Changeset Closed | First Seen | Last Checked | |
+
+
+
+
No pending retries. All elements are in sync.
+
+
+
+
+
diff --git a/images/tiler-monitor/pipeline-monitor/tables_config.json b/images/tiler-monitor/pipeline-monitor/tables_config.json
new file mode 100644
index 00000000..ef0c407f
--- /dev/null
+++ b/images/tiler-monitor/pipeline-monitor/tables_config.json
@@ -0,0 +1,103 @@
+{
+ "tag_to_check": {
+ "amenity": {
+ "tables": ["osm_amenity_areas", "osm_amenity_points"],
+ "views": ["mv_amenity_areas_z14_15", "mv_amenity_areas_z16_20", "mv_amenity_points", "mv_amenity_points_centroids_z14_15", "mv_amenity_points_centroids_z16_20"]
+ },
+ "aeroway": {
+ "tables": ["osm_transport_lines", "osm_transport_areas", "osm_transport_points"],
+ "views": ["mv_transport_lines_z5", "mv_transport_lines_z16_20", "mv_transport_areas_z10_12", "mv_transport_areas_z16_20", "mv_transport_points", "mv_transport_points_centroids_z16_20"]
+ },
+ "barrier": {
+ "tables": ["osm_other_areas", "osm_other_lines", "osm_other_points", "osm_water_lines"],
+ "views": ["mv_other_areas_z8_9", "mv_other_areas_z16_20", "mv_other_lines_z14_15", "mv_other_lines_z16_20", "mv_other_points", "mv_other_points_centroids_z16_20", "mv_water_lines_z8_9", "mv_water_lines_z16_20"]
+ },
+ "boundary": {
+ "tables": ["osm_admin_areas", "osm_admin_lines", "osm_admin_relation_members"],
+ "views": ["mv_admin_boundaries_areas_z0_2", "mv_admin_boundaries_areas_z16_20", "mv_admin_boundaries_lines_z0_2", "mv_admin_boundaries_lines_z16_20", "mv_admin_boundaries_centroids_z0_2", "mv_admin_boundaries_centroids_z16_20", "mv_admin_maritime_lines_z0_5_v2", "mv_admin_maritime_lines_z10_15", "mv_non_admin_boundaries_areas_z0_2", "mv_non_admin_boundaries_areas_z16_20", "mv_non_admin_boundaries_centroids_z0_2", "mv_non_admin_boundaries_centroids_z16_20", "mv_relation_members_boundaries"]
+ },
+ "building": {
+ "tables": ["osm_buildings", "osm_buildings_points"],
+ "views": ["mv_buildings_areas_z14_15", "mv_buildings_areas_z16_20", "mv_buildings_points", "mv_buildings_points_centroids_z14_15", "mv_buildings_points_centroids_z16_20"]
+ },
+ "communication": {
+ "tables": ["osm_communication_lines", "osm_communication_multilines"],
+ "views": ["mv_communication_z10_12", "mv_communication_z16_20"]
+ },
+ "craft": {
+ "tables": ["osm_other_points"],
+ "views": ["mv_other_points", "mv_other_points_centroids_z16_20"]
+ },
+ "highway": {
+ "tables": ["osm_transport_lines", "osm_transport_areas", "osm_transport_points", "osm_transport_multilines"],
+ "views": ["mv_transport_lines_z5", "mv_transport_lines_z16_20", "mv_transport_areas_z10_12", "mv_transport_areas_z16_20", "mv_transport_points", "mv_transport_points_centroids_z10_12", "mv_transport_points_centroids_z16_20"]
+ },
+ "historic": {
+ "tables": ["osm_other_areas", "osm_other_lines", "osm_other_points"],
+ "views": ["mv_other_areas_z8_9", "mv_other_areas_z16_20", "mv_other_lines_z14_15", "mv_other_lines_z16_20", "mv_other_points", "mv_other_points_centroids_z8_9", "mv_other_points_centroids_z16_20"]
+ },
+ "landuse": {
+ "tables": ["osm_landuse_areas", "osm_landuse_lines", "osm_landuse_points", "osm_water_areas"],
+ "views": ["mv_landuse_areas_z6_7", "mv_landuse_areas_z16_20", "mv_landuse_lines_z14_15", "mv_landuse_lines_z16_20", "mv_landuse_points", "mv_landuse_points_centroids_z6_7", "mv_landuse_points_centroids_z16_20", "mv_water_areas_z0_2", "mv_water_areas_z16_20", "mv_water_areas_centroids_z8_9", "mv_water_areas_centroids_z16_20"]
+ },
+ "leisure": {
+ "tables": ["osm_landuse_areas", "osm_landuse_lines", "osm_landuse_points"],
+ "views": ["mv_landuse_areas_z6_7", "mv_landuse_areas_z16_20", "mv_landuse_lines_z14_15", "mv_landuse_lines_z16_20", "mv_landuse_points", "mv_landuse_points_centroids_z6_7", "mv_landuse_points_centroids_z16_20"]
+ },
+ "man_made": {
+ "tables": ["osm_other_areas", "osm_other_lines", "osm_other_points"],
+ "views": ["mv_other_areas_z8_9", "mv_other_areas_z16_20", "mv_other_lines_z14_15", "mv_other_lines_z16_20", "mv_other_points", "mv_other_points_centroids_z8_9", "mv_other_points_centroids_z16_20"]
+ },
+ "military": {
+ "tables": ["osm_other_areas", "osm_other_lines", "osm_other_points"],
+ "views": ["mv_other_areas_z8_9", "mv_other_areas_z16_20", "mv_other_lines_z14_15", "mv_other_lines_z16_20", "mv_other_points", "mv_other_points_centroids_z8_9", "mv_other_points_centroids_z16_20"]
+ },
+ "natural": {
+ "tables": ["osm_landuse_areas", "osm_landuse_lines", "osm_landuse_points", "osm_water_areas", "osm_water_lines"],
+ "views": ["mv_landuse_areas_z6_7", "mv_landuse_areas_z16_20", "mv_landuse_lines_z14_15", "mv_landuse_lines_z16_20", "mv_landuse_points", "mv_landuse_points_centroids_z6_7", "mv_landuse_points_centroids_z16_20", "mv_water_areas_z0_2", "mv_water_areas_z16_20", "mv_water_areas_centroids_z8_9", "mv_water_areas_centroids_z16_20", "mv_water_lines_z8_9", "mv_water_lines_z16_20"]
+ },
+ "place": {
+ "tables": ["osm_place_areas", "osm_place_points"],
+ "views": ["mv_place_areas_z14_20", "mv_place_points_centroids_z0_2", "mv_place_points_centroids_z11_20"]
+ },
+ "power": {
+ "tables": ["osm_other_areas", "osm_other_lines", "osm_other_points"],
+ "views": ["mv_other_areas_z8_9", "mv_other_areas_z16_20", "mv_other_lines_z14_15", "mv_other_lines_z16_20", "mv_other_points", "mv_other_points_centroids_z8_9", "mv_other_points_centroids_z16_20"]
+ },
+ "railway": {
+ "tables": ["osm_transport_lines", "osm_transport_areas", "osm_transport_points", "osm_transport_multilines"],
+ "views": ["mv_transport_lines_z5", "mv_transport_lines_z16_20", "mv_transport_areas_z10_12", "mv_transport_areas_z16_20", "mv_transport_points", "mv_transport_points_centroids_z10_12", "mv_transport_points_centroids_z16_20"]
+ },
+ "route": {
+ "tables": ["osm_route_lines", "osm_route_multilines"],
+ "views": ["mv_routes_indexed_z16_20"],
+ "view_column": "osm_id",
+ "view_id_mode": "members"
+ },
+ "shop": {
+ "tables": ["osm_other_points"],
+ "views": ["mv_other_points", "mv_other_points_centroids_z16_20"]
+ },
+ "tourism": {
+ "tables": ["osm_other_points"],
+ "views": ["mv_other_points", "mv_other_points_centroids_z16_20"]
+ },
+ "type=street": {
+ "tables": ["osm_street_multilines"],
+ "views": ["mv_transport_lines_z5", "mv_transport_lines_z16_20"]
+ },
+ "waterway": {
+ "tables": ["osm_water_lines", "osm_water_areas"],
+ "views": ["mv_water_lines_z8_9", "mv_water_lines_z16_20", "mv_water_areas_z0_2", "mv_water_areas_z16_20", "mv_water_areas_centroids_z8_9", "mv_water_areas_centroids_z16_20"]
+ }
+ },
+ "reject_values": {
+ "natural": ["coastline"]
+ },
+ "importable_relation_types": [
+ "multipolygon",
+ "boundary",
+ "route",
+ "street"
+ ]
+}