Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions app/__init__.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,20 @@
from dataclasses import dataclass
from logging import Logger
from threading import BoundedSemaphore

from fastapi import HTTPException, Request
from pyiceberg.catalog import Catalog
from rustworkx import PyDiGraph


@dataclass
class GpkgLimiter:
"""Per-app concurrency guard for the hydrofabric gpkg endpoint."""

semaphore: BoundedSemaphore
queue_timeout_s: float


def get_catalog(request: Request) -> Catalog:
"""Gets the pyiceberg catalog reference from the app state

Expand Down Expand Up @@ -92,3 +102,11 @@ def get_graphs(request: Request) -> PyDiGraph:
if not hasattr(request.app.state, "network_graphs") or request.app.state.network_graphs is None:
raise HTTPException(status_code=500, detail="network_graphs not loaded")
return request.app.state.network_graphs


def get_gpkg_limiter(request: Request) -> GpkgLimiter:
"""Returns the per-app GpkgLimiter; 500 if not configured in lifespan."""
limiter = getattr(request.app.state, "gpkg_limiter", None)
if limiter is None:
raise HTTPException(status_code=500, detail="gpkg_limiter not loaded")
return limiter
70 changes: 68 additions & 2 deletions app/main.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
import argparse
import logging
import os
import threading
from contextlib import asynccontextmanager
from pathlib import Path

import anyio
import uvicorn
from fastapi import FastAPI, status
from fastapi.staticfiles import StaticFiles
Expand All @@ -12,6 +14,7 @@
from pyiceberg.exceptions import NoSuchTableError
from pyprojroot import here

from app import GpkgLimiter
from app.routers.hydrofabric.router import api_router as hydrofabric_api_router
from app.routers.nwm_modules.router import (
cfe_router,
Expand Down Expand Up @@ -118,18 +121,39 @@ async def lifespan(app: FastAPI):
"""
app.state.main_logger = main_logger
app.state.main_logger.info("Application starting up.")
# Cap per-worker sync-handler concurrency. Hydrofabric/ras_xs/nwm handlers
# can spike to hundreds of MB of pandas/geopandas memory per in-flight
# request, so on a t3.large (8 GB / 2 workers) we keep this low to avoid
# OOM. Effective per-instance concurrency = workers * total_tokens.
thread_limiter = anyio.to_thread.current_default_thread_limiter()
thread_limiter.total_tokens = 20
app.state.main_logger.info(f"AnyIO threadpool limit set to {thread_limiter.total_tokens}")
deploy_env = os.environ.get("ICEFABRIC_DEPLOY_ENV") or os.environ.get("ENVIRONMENT") or args.deploy_env
deploy_env = deploy_env.lower()
load_creds(deploy_env)
if args.cache_catalog == "sql":
if args.cache_catalog == "sql" and not os.environ.get("ICEFABRIC_CACHE_BUILT"):
app.state.main_logger.info("Building local SQL cache...")
build_cache(set(args.cached_namespaces), deploy_env)
else:
app.state.main_logger.info(
"Skipping local SQL cache build (already built by parent process or disabled)."
)
catalog = load_catalog(args.catalog)
cache_catalog = load_catalog(args.cache_catalog)
hydrofabric_namespaces = ["conus_hf", "ak_hf", "hi_hf", "prvi_hf"]
app.state.catalog = catalog
app.state.cache_catalog = cache_catalog
app.state.cached_namespaces = {e.split(":")[0] for e in args.cached_namespaces}
# Per-worker concurrency cap for the heavy gpkg endpoint. Tunable via env.
gpkg_concurrency = int(os.environ.get("ICEFABRIC_HF_GPKG_CONCURRENCY", "1"))
gpkg_queue_timeout_s = float(os.environ.get("ICEFABRIC_HF_GPKG_QUEUE_TIMEOUT_S", "300"))
app.state.gpkg_limiter = GpkgLimiter(
semaphore=threading.BoundedSemaphore(gpkg_concurrency),
queue_timeout_s=gpkg_queue_timeout_s,
)
app.state.main_logger.info(
f"gpkg concurrency cap per worker = {gpkg_concurrency} (queue timeout {gpkg_queue_timeout_s:.0f}s)"
)
try:
app.state.network_graphs = load_upstream_json(
catalog=catalog,
Expand Down Expand Up @@ -211,4 +235,46 @@ def get_health() -> HealthCheck:
print("INFO: Documentation directory 'static/docs' not found. Docs will not be served.")

if __name__ == "__main__":
uvicorn.run("app.main:app", host="0.0.0.0", port=8000, reload=True, log_level="info")
# One-time setup in the parent before forking workers. With workers>1,
# doing this in lifespan races: concurrent SQL cache builds clobber the
# warehouse, and concurrent load_upstream_json() calls have one worker
# reading a partially-written graph JSON (-> EOF). Pre-building here
# means workers only hit the safe "read existing" paths.
_deploy_env = (
os.environ.get("ICEFABRIC_DEPLOY_ENV") or os.environ.get("ENVIRONMENT") or args.deploy_env
).lower()
load_creds(_deploy_env)

if args.cache_catalog == "sql":
main_logger.info("Building local SQL cache (parent process, one-time)...")
build_cache(set(args.cached_namespaces), _deploy_env)
os.environ["ICEFABRIC_CACHE_BUILT"] = "1"

# Prewarm hydrofabric graph JSON files so workers only read, never write.
_hf_namespaces = ["conus_hf", "ak_hf", "hi_hf", "prvi_hf"]
try:
main_logger.info("Prewarming hydrofabric network graphs (parent process)...")
_prewarm_catalog = load_catalog(args.catalog)
load_upstream_json(
catalog=_prewarm_catalog,
namespaces=_hf_namespaces,
output_path=here() / "data",
)
except NoSuchTableError:
main_logger.warning(
"Hydrofabric namespaces not reachable at prewarm time; workers will attempt at startup."
)

# Recycle each worker after this many requests. Resets per-process RSS
# that otherwise creeps from glibc/numpy fragmentation over time. The
# supervisor respawns the worker; new workers skip the heavy one-time
# setup (cache + graphs are already on disk), so churn is ~seconds.
max_requests_per_worker = int(os.environ.get("ICEFABRIC_MAX_REQUESTS_PER_WORKER", "500"))
uvicorn.run(
"app.main:app",
host="0.0.0.0",
port=8000,
workers=2,
log_level="info",
limit_max_requests=max_requests_per_worker,
)
128 changes: 94 additions & 34 deletions app/routers/hydrofabric/router.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
import gc
import logging
import pathlib
import sqlite3
import tempfile
import time
import uuid

import geopandas as gpd
Expand All @@ -13,7 +15,14 @@
from pyiceberg.expressions import EqualTo
from starlette.background import BackgroundTask

from app import get_cache_catalog, get_cached_namespaces, get_catalog, get_graphs
from app import (
GpkgLimiter,
get_cache_catalog,
get_cached_namespaces,
get_catalog,
get_gpkg_limiter,
get_graphs,
)
from icefabric.hydrofabric import subset_hydrofabric, subset_nhf
from icefabric.schemas import (
DivideAttributes,
Expand Down Expand Up @@ -41,8 +50,16 @@
api_router = APIRouter(prefix="/hydrofabric")


def _cleanup_tmp(path: pathlib.Path) -> None:
"""Delete a temp file; safe to call multiple times."""
try:
path.unlink(missing_ok=True)
except OSError as e: # pragma: no cover - defensive
logger.warning(f"cleanup: failed to delete {path}: {e}")


@api_router.get("/{identifier}/gpkg", tags=["Hydrofabric Services"])
async def get_hydrofabric_subset_gpkg(
def get_hydrofabric_subset_gpkg(
identifier: str = FastAPIPath(
...,
description="Identifier to start tracing from (e.g., catchment ID, POI ID, HL_URI)",
Expand Down Expand Up @@ -81,6 +98,7 @@ async def get_hydrofabric_subset_gpkg(
cache_catalog=Depends(get_cache_catalog),
cached_namespaces=Depends(get_cached_namespaces),
network_graphs=Depends(get_graphs),
gpkg_limiter: GpkgLimiter = Depends(get_gpkg_limiter),
):
"""
Get hydrofabric subset as a geopackage file (.gpkg)
Expand Down Expand Up @@ -122,6 +140,31 @@ async def get_hydrofabric_subset_gpkg(
# swap catalog for cached catalog if appropriate
catalog = cache_catalog if namespace in cached_namespaces else catalog

# Cap concurrent heavy builds per worker. Each CONUS VPU subset peaks
# at hundreds of MB of pandas/geopandas memory, so uncapped concurrency
# can OOM the instance. Queue timeouts surface as 503 rather than a
# silently stalled client.
sem_wait_start = time.monotonic()
if not gpkg_limiter.semaphore.acquire(timeout=gpkg_limiter.queue_timeout_s):
raise HTTPException(
status_code=503,
detail=(
"Hydrofabric service is at capacity. Please retry shortly. "
f"(waited {gpkg_limiter.queue_timeout_s:.0f}s for a slot)"
),
headers={"Retry-After": "30"},
)
sem_held = True
sem_wait_ms = (time.monotonic() - sem_wait_start) * 1000
if sem_wait_ms > 250:
logger.info(f"gpkg semaphore wait: {sem_wait_ms:.0f} ms for {identifier}")

def _release_sem() -> None:
nonlocal sem_held
if sem_held:
sem_held = False
gpkg_limiter.semaphore.release()

try:
if namespace.is_nhf:
if id_type == QueryIdType.VPU_ID:
Expand Down Expand Up @@ -175,30 +218,50 @@ async def get_hydrofabric_subset_gpkg(

tmp_path.parent.mkdir(parents=True, exist_ok=True)

# Partition layers up front so we can pop + free as we write.
# pyogrio handles spatial, sqlite handles tabular (incl. empty ones).
spatial_names: list[str] = []
nonspatial_names: list[str] = []
for name, data in output_layers.items():
if isinstance(data, gpd.GeoDataFrame) and len(data) > 0:
spatial_names.append(name)
elif not isinstance(data, gpd.GeoDataFrame):
nonspatial_names.append(name)

layers_written = 0
spatial_layers = {}
nonspatial_layers = {}

# Separate spatial vs non-spatial
for table_name, layer_data in output_layers.items():
if isinstance(layer_data, gpd.GeoDataFrame) and len(layer_data) > 0:
spatial_layers[table_name] = layer_data
elif not isinstance(layer_data, gpd.GeoDataFrame):
nonspatial_layers[table_name] = layer_data

# Write spatial layers first with pyogrio
for table_name, layer_data in spatial_layers.items():
pyogrio.write_dataframe(layer_data, tmp_path, layer=table_name)
layers_written += 1
logger.info(f"Written spatial layer '{table_name}' with {len(layer_data)} records")

# Then write non-spatial layers with sqlite3 (includes empty layers)
conn = sqlite3.connect(tmp_path)
for table_name, layer_data in nonspatial_layers.items():
layer_data.to_sql(table_name, conn, if_exists="replace", index=False)
# Stream spatial layers one at a time: pop -> write -> del + gc so
# RSS stays flat across layers instead of accumulating.
for name in spatial_names:
layer_data = output_layers.pop(name)
n_rows = len(layer_data)
pyogrio.write_dataframe(layer_data, tmp_path, layer=name)
del layer_data
gc.collect()
layers_written += 1
logger.info(f"Written non-spatial layer '{table_name}' with {len(layer_data)} records")
conn.close()
logger.info(f"Written spatial layer '{name}' with {n_rows} records")

# Share one sqlite connection across tabular layers.
if nonspatial_names:
conn = sqlite3.connect(tmp_path)
try:
for name in nonspatial_names:
layer_data = output_layers.pop(name)
n_rows = len(layer_data)
layer_data.to_sql(name, conn, if_exists="replace", index=False)
del layer_data
gc.collect()
layers_written += 1
logger.info(f"Written non-spatial layer '{name}' with {n_rows} records")
finally:
conn.close()

# Drop any layers skipped above (empty spatial frames etc.).
output_layers.clear()
gc.collect()

# Heavy work is done; release the slot before streaming to the client.
_release_sem()

if layers_written == 0:
raise HTTPException(
Expand Down Expand Up @@ -234,34 +297,31 @@ async def get_hydrofabric_subset_gpkg(
"X-Domain": namespace,
"X-Layers-Count": str(layers_written),
},
background=BackgroundTask(lambda: tmp_path.unlink(missing_ok=True)),
background=BackgroundTask(_cleanup_tmp, tmp_path),
)

except HTTPException:
# Clean up temp file if it exists and re-raise HTTP exceptions
if tmp_path.exists():
tmp_path.unlink(missing_ok=True)
_cleanup_tmp(tmp_path)
raise
except FileNotFoundError as e:
# Clean up temp file if it exists
if tmp_path.exists():
tmp_path.unlink(missing_ok=True)
_cleanup_tmp(tmp_path)
raise HTTPException(status_code=404, detail=f"Required file not found: {str(e)}") from None
except ValueError as e:
# Clean up temp file if it exists
if tmp_path.exists():
tmp_path.unlink(missing_ok=True)
_cleanup_tmp(tmp_path)
if "No origin found" in str(e):
raise HTTPException(
status_code=404,
detail=f"No origin found for {id_type.value}='{identifier}' in namespace '{namespace}'",
) from None
else:
raise HTTPException(status_code=400, detail=f"Invalid request: {str(e)}") from None
finally:
# Idempotent: no-op if already released on the happy path.
_release_sem()


@api_router.get("/history", tags=["Hydrofabric Services"])
async def get_hydrofabric_history(
def get_hydrofabric_history(
domain: str = Query("conus_hf", description="The iceberg namespace used to query the hydrofabric"),
catalog=Depends(get_catalog),
):
Expand Down
Loading
Loading