Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
f7b5c1b
perf: change api route handlers to def instead of async def
dylanlee Apr 22, 2026
41006cd
perf: only load local catalog once independent of uvicorn worker count
dylanlee Apr 22, 2026
8d93db0
perf: prewarm hydrofabric graph JSONs in parent process to avoid work…
dylanlee Apr 22, 2026
5f8bb3b
perf: raise nginx timeouts
dylanlee Apr 22, 2026
9ff522a
feat: PET module returns empty list
dylanlee Apr 22, 2026
c9886b0
feat: allow nullable nhd_feature_id's to account for missing alaska N…
dylanlee Apr 22, 2026
2cd1c1a
perf: added semaphore for subset heavy endpoints
dylanlee Apr 22, 2026
a5104f3
perf: limit semaphore threads to 1 for Test instance size
dylanlee Apr 22, 2026
c45d489
test: fix tests to use new gpkglimiter
dylanlee Apr 22, 2026
407c6ad
perf: reset workers and increase gpkg timeout
dylanlee Apr 22, 2026
373bbae
infra: modify user_data.sh.tpl to add concurrency env variables
dylanlee Apr 22, 2026
d06e931
fix: add 404 exception for vpu not found
dylanlee Apr 22, 2026
5720f5a
perf: garbage collect after writing all layers in gpkg instead of per…
dylanlee Apr 22, 2026
8eb9f79
perf: lower default MAX_REQUESTS_PER_WORKER
dylanlee Apr 22, 2026
4bbcb3f
test: add dask as test dependency
dylanlee Apr 22, 2026
04b7cda
perf: add gpkg, icechunk, and sqlite catalog table caching
dylanlee Apr 22, 2026
0f4c2d2
infra: bump cache size to 250 gpkg's
dylanlee Apr 22, 2026
6d3b549
perf: add a max request queue depth on requests that trigger subsetter
dylanlee Apr 22, 2026
d5cbdd5
fix: fix gpkg cache file response bug
dylanlee Apr 22, 2026
5eb85bb
fix: make it so that queue_depth applies to both hydrofabric and para…
dylanlee Apr 22, 2026
d91c2d3
fix: address user provide codeql path error
dylanlee Apr 22, 2026
049b97a
log: send no associated flowpath error from gage subsetter when appro…
dylanlee Apr 22, 2026
1f86ef6
infra: tune down max_requests_per_worker to 100
dylanlee Apr 22, 2026
47bf549
nit: set 100 as app main max_requests_per_worker default
dylanlee Apr 22, 2026
c58fb38
fix: codeql path resolution issues
dylanlee Apr 22, 2026
f77b8b8
feat: maintain ability to subset when gage is on virtual flowpath
dylanlee Apr 22, 2026
d1539a1
fix: vfp gages make it to subset gpkg gages layer
dylanlee Apr 22, 2026
b443fd2
fix: vfp subset only returns divide vfp is in
dylanlee Apr 23, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -224,3 +224,4 @@ data/*
.Rdata
.Rhistory
icefabric.Rproj
docs/api_schema.json
199 changes: 199 additions & 0 deletions app/__init__.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,187 @@
import hashlib
import logging
import os
import threading
import time
from dataclasses import dataclass, field
from logging import Logger
from pathlib import Path
from threading import BoundedSemaphore
from typing import Any

from fastapi import HTTPException, Request
from pyiceberg.catalog import Catalog
from rustworkx import PyDiGraph

_log = logging.getLogger(__name__)


class _GpkgAdmission:
"""Context manager returned by :meth:`GpkgLimiter.admit`.

Enforces, in order: queue-depth admission (429 fast-fail when too many
requests are already waiting), then semaphore acquire with timeout
(503 if the wait exceeds ``queue_timeout_s``). Releases the semaphore
on exit. ``release()`` is idempotent so callers streaming a response
(e.g. FastAPI FileResponse) can free the slot early while still being
wrapped in ``with``.
"""

def __init__(
self,
limiter: "GpkgLimiter",
*,
logger: logging.Logger | None,
context: str,
) -> None:
self._limiter = limiter
self._logger = logger
self._context = context
self._sem_held = False

def __enter__(self) -> "_GpkgAdmission":
lim = self._limiter
with lim.queue_lock:
if lim.waiting >= lim.max_queue_depth:
current = lim.waiting
raise HTTPException(
status_code=429,
detail=(
"Hydrofabric service is over capacity "
f"({current} requests already queued, max "
f"{lim.max_queue_depth}). Please retry shortly."
),
headers={"Retry-After": "30"},
)
lim.waiting += 1
start = time.monotonic()
try:
acquired = lim.semaphore.acquire(timeout=lim.queue_timeout_s)
finally:
with lim.queue_lock:
lim.waiting -= 1
if not acquired:
raise HTTPException(
status_code=503,
detail=(
"Hydrofabric service is at capacity. Please retry shortly. "
f"(waited {lim.queue_timeout_s:.0f}s for a slot)"
),
headers={"Retry-After": "30"},
)
self._sem_held = True
wait_ms = (time.monotonic() - start) * 1000
if wait_ms > 250 and self._logger is not None:
suffix = f" {self._context}" if self._context else ""
self._logger.info(f"gpkg semaphore wait: {wait_ms:.0f} ms{suffix}")
return self

def release(self) -> None:
"""Release the semaphore slot. Idempotent; safe to call in error paths."""
if self._sem_held:
self._sem_held = False
self._limiter.semaphore.release()

def __exit__(self, exc_type, exc, tb) -> None:
self.release()


@dataclass
class GpkgLimiter:
"""Per-app concurrency + admission guard for the hydrofabric gpkg endpoint.

``semaphore`` caps concurrent heavy builds per worker. ``max_queue_depth``
caps how many additional requests may be *waiting* on that semaphore
before we shed load with a 429 instead of letting clients sit through
``queue_timeout_s`` of silence. ``waiting`` is the live waiter count,
guarded by ``queue_lock``. Use :meth:`admit` from request handlers; it
wraps the full admission protocol as a context manager.
"""

semaphore: BoundedSemaphore
queue_timeout_s: float
max_queue_depth: int = 15
queue_lock: threading.Lock = field(default_factory=threading.Lock)
waiting: int = 0

def admit(self, *, logger: logging.Logger | None = None, context: str = "") -> _GpkgAdmission:
"""Return a context manager that performs queue-depth + timeout admission.

Raises ``HTTPException(429)`` if the queue is full, ``HTTPException(503)``
if the semaphore can't be acquired within ``queue_timeout_s``. ``context``
is appended to the ``semaphore wait`` log line when it exceeds 250ms.
"""
return _GpkgAdmission(self, logger=logger, context=context)


@dataclass
class StreamflowData:
"""Icechunk repo + xarray Dataset for streamflow. Open once per worker."""

dataset: Any
repo: Any


class GpkgCache:
"""Disk-based result cache for hydrofabric gpkg responses.

Key: sha256 of (namespace, id_type, identifier, snapshot_id). Any table
snapshot change naturally invalidates keys. Eviction is an LRU-by-atime
cap on file count (simpler than tracking bytes and good enough here).
"""

def __init__(self, cache_dir: Path, max_entries: int):
self.cache_dir = Path(cache_dir)
self.cache_dir.mkdir(parents=True, exist_ok=True)
self.max_entries = max_entries
self._evict_lock = threading.Lock()

@staticmethod
def key(namespace: str, id_type: str, identifier: str, snapshot_id: str) -> str:
"""Build the deterministic cache key for a subset request."""
raw = f"{namespace}:{id_type}:{identifier}:{snapshot_id}".encode()
return hashlib.sha256(raw).hexdigest()

def path(self, key: str) -> Path:
"""Return the on-disk path for a given cache key."""
return self.cache_dir / f"{key}.gpkg"

def get(self, key: str) -> Path | None:
"""Return cached path if present (and bump atime for LRU); else None."""
p = self.path(key)
if not p.exists():
return None
try:
os.utime(p, None) # bump atime/mtime for LRU
except OSError:
pass
return p

def commit(self, key: str, built_path: Path) -> Path:
"""Atomically install a freshly-built gpkg at the cache path.

Caller should write to a temp path first, then call commit() to move.
Safe under concurrent writers for the same key (last writer wins on
identical content).
"""
dest = self.path(key)
os.replace(built_path, dest)
self._evict_if_needed()
return dest

def _evict_if_needed(self) -> None:
with self._evict_lock:
files = list(self.cache_dir.glob("*.gpkg"))
if len(files) <= self.max_entries:
return
files.sort(key=lambda p: p.stat().st_atime)
for old in files[: len(files) - self.max_entries]:
try:
old.unlink()
_log.info(f"gpkg cache evicted {old.name}")
except OSError:
pass


def get_catalog(request: Request) -> Catalog:
"""Gets the pyiceberg catalog reference from the app state
Expand Down Expand Up @@ -92,3 +270,24 @@ def get_graphs(request: Request) -> PyDiGraph:
if not hasattr(request.app.state, "network_graphs") or request.app.state.network_graphs is None:
raise HTTPException(status_code=500, detail="network_graphs not loaded")
return request.app.state.network_graphs


def get_gpkg_limiter(request: Request) -> GpkgLimiter:
"""Returns the per-app GpkgLimiter; 500 if not configured in lifespan."""
limiter = getattr(request.app.state, "gpkg_limiter", None)
if limiter is None:
raise HTTPException(status_code=500, detail="gpkg_limiter not loaded")
return limiter


def get_streamflow_data(request: Request) -> StreamflowData:
"""Returns the cached StreamflowData opened in lifespan."""
data = getattr(request.app.state, "streamflow_data", None)
if data is None:
raise HTTPException(status_code=503, detail="streamflow_data not loaded")
return data


def get_gpkg_cache(request: Request) -> GpkgCache | None:
"""Returns the per-app GpkgCache; None if disabled (caching skipped)."""
return getattr(request.app.state, "gpkg_cache", None)
131 changes: 129 additions & 2 deletions app/main.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
import argparse
import logging
import os
import threading
from contextlib import asynccontextmanager
from pathlib import Path

import anyio
import uvicorn
from fastapi import FastAPI, status
from fastapi.staticfiles import StaticFiles
Expand All @@ -12,6 +14,7 @@
from pyiceberg.exceptions import NoSuchTableError
from pyprojroot import here

from app import GpkgCache, GpkgLimiter, StreamflowData
from app.routers.hydrofabric.router import api_router as hydrofabric_api_router
from app.routers.nwm_modules.router import (
cfe_router,
Expand Down Expand Up @@ -118,18 +121,79 @@ async def lifespan(app: FastAPI):
"""
app.state.main_logger = main_logger
app.state.main_logger.info("Application starting up.")
# Cap per-worker sync-handler concurrency. Hydrofabric/ras_xs/nwm handlers
# can spike to hundreds of MB of pandas/geopandas memory per in-flight
# request, so on a t3.large (8 GB / 2 workers) we keep this low to avoid
# OOM. Effective per-instance concurrency = workers * total_tokens.
thread_limiter = anyio.to_thread.current_default_thread_limiter()
thread_limiter.total_tokens = 20
app.state.main_logger.info(f"AnyIO threadpool limit set to {thread_limiter.total_tokens}")
deploy_env = os.environ.get("ICEFABRIC_DEPLOY_ENV") or os.environ.get("ENVIRONMENT") or args.deploy_env
deploy_env = deploy_env.lower()
load_creds(deploy_env)
if args.cache_catalog == "sql":
if args.cache_catalog == "sql" and not os.environ.get("ICEFABRIC_CACHE_BUILT"):
app.state.main_logger.info("Building local SQL cache...")
build_cache(set(args.cached_namespaces), deploy_env)
else:
app.state.main_logger.info(
"Skipping local SQL cache build (already built by parent process or disabled)."
)
catalog = load_catalog(args.catalog)
cache_catalog = load_catalog(args.cache_catalog)
hydrofabric_namespaces = ["conus_hf", "ak_hf", "hi_hf", "prvi_hf"]

# Cache pyiceberg Table objects for the local SQL catalog. Data in the
# SQL warehouse is frozen for this worker's lifetime (parent built it),
# so repeated load_table() calls can safely return the same metadata
# object. Each call otherwise round-trips SQLite + re-parses manifest
# JSON (~5-20ms); per-request we hit 11 tables for hydrofabric gpkg.
_sql_table_cache: dict = {}
_sql_table_cache_lock = threading.Lock()
_sql_original_load_table = cache_catalog.load_table

def _cached_load_table(identifier):
key = str(identifier)
with _sql_table_cache_lock:
cached = _sql_table_cache.get(key)
if cached is None:
cached = _sql_original_load_table(identifier)
_sql_table_cache[key] = cached
return cached

cache_catalog.load_table = _cached_load_table # type: ignore[method-assign]

app.state.catalog = catalog
app.state.cache_catalog = cache_catalog
app.state.cached_namespaces = {e.split(":")[0] for e in args.cached_namespaces}
# Per-worker concurrency cap for the heavy gpkg endpoint. Tunable via env.
gpkg_concurrency = int(os.environ.get("ICEFABRIC_HF_GPKG_CONCURRENCY", "1"))
gpkg_queue_timeout_s = float(os.environ.get("ICEFABRIC_HF_GPKG_QUEUE_TIMEOUT_S", "300"))
# Queue-depth admission: reject with 429 once this many requests are
# already waiting for a semaphore slot. Chosen so the worst-case wait
# (max_queue_depth * expected build time) stays under queue_timeout_s.
gpkg_max_queue_depth = int(os.environ.get("ICEFABRIC_HF_GPKG_MAX_QUEUE_DEPTH", "15"))
app.state.gpkg_limiter = GpkgLimiter(
semaphore=threading.BoundedSemaphore(gpkg_concurrency),
queue_timeout_s=gpkg_queue_timeout_s,
max_queue_depth=gpkg_max_queue_depth,
)
app.state.main_logger.info(
f"gpkg concurrency cap per worker = {gpkg_concurrency} "
f"(queue timeout {gpkg_queue_timeout_s:.0f}s, max queue depth {gpkg_max_queue_depth})"
)

# Disk-based result cache for hydrofabric gpkg. A given (namespace,
# id_type, identifier, snapshot_id) is deterministic, so repeat requests
# can skip the subset entirely. Disk-only -> zero RAM cost. Set max
# entries conservatively; at ~200 MB per CONUS VPU, 30 entries ~= 6 GiB.
gpkg_cache_enabled = os.environ.get("ICEFABRIC_GPKG_CACHE_ENABLED", "1") != "0"
if gpkg_cache_enabled:
gpkg_cache_dir = Path(os.environ.get("ICEFABRIC_GPKG_CACHE_DIR", "/tmp/hf_gpkg_cache"))
gpkg_cache_max = int(os.environ.get("ICEFABRIC_GPKG_CACHE_MAX_ENTRIES", "30"))
app.state.gpkg_cache = GpkgCache(cache_dir=gpkg_cache_dir, max_entries=gpkg_cache_max)
app.state.main_logger.info(f"gpkg result cache at {gpkg_cache_dir} (max {gpkg_cache_max} entries)")
else:
app.state.gpkg_cache = None
try:
app.state.network_graphs = load_upstream_json(
catalog=catalog,
Expand All @@ -140,6 +204,27 @@ async def lifespan(app: FastAPI):
raise NotImplementedError(
"Cannot load API as the Hydrofabric Database/Namespace cannot be connected to. Please ensure you are have access to the correct hydrofabric namespaces"
) from None

# Open the streamflow icechunk repo + zarr dataset once per worker.
# zarr reads are lazy so RAM cost is metadata only.
try:
import icechunk
import xarray as xr

from icefabric.cli.streamflow import PREFIX, get_bucket

storage_config = icechunk.s3_storage(
bucket=get_bucket(), prefix=PREFIX, region="us-east-1", from_env=True
)
_streamflow_repo = icechunk.Repository.open(storage_config)
_streamflow_session = _streamflow_repo.writable_session("main")
_streamflow_ds = xr.open_zarr(_streamflow_session.store, consolidated=False)
app.state.streamflow_data = StreamflowData(dataset=_streamflow_ds, repo=_streamflow_repo)
app.state.main_logger.info("Opened streamflow icechunk dataset.")
except Exception as e: # noqa: BLE001
app.state.main_logger.warning(f"Could not open streamflow dataset: {e}")
app.state.streamflow_data = None

yield
app.state.main_logger.info("Application shutting down.")

Expand Down Expand Up @@ -211,4 +296,46 @@ def get_health() -> HealthCheck:
print("INFO: Documentation directory 'static/docs' not found. Docs will not be served.")

if __name__ == "__main__":
uvicorn.run("app.main:app", host="0.0.0.0", port=8000, reload=True, log_level="info")
# One-time setup in the parent before forking workers. With workers>1,
# doing this in lifespan races: concurrent SQL cache builds clobber the
# warehouse, and concurrent load_upstream_json() calls have one worker
# reading a partially-written graph JSON (-> EOF). Pre-building here
# means workers only hit the safe "read existing" paths.
_deploy_env = (
os.environ.get("ICEFABRIC_DEPLOY_ENV") or os.environ.get("ENVIRONMENT") or args.deploy_env
).lower()
load_creds(_deploy_env)

if args.cache_catalog == "sql":
main_logger.info("Building local SQL cache (parent process, one-time)...")
build_cache(set(args.cached_namespaces), _deploy_env)
os.environ["ICEFABRIC_CACHE_BUILT"] = "1"

# Prewarm hydrofabric graph JSON files so workers only read, never write.
_hf_namespaces = ["conus_hf", "ak_hf", "hi_hf", "prvi_hf"]
try:
main_logger.info("Prewarming hydrofabric network graphs (parent process)...")
_prewarm_catalog = load_catalog(args.catalog)
load_upstream_json(
catalog=_prewarm_catalog,
namespaces=_hf_namespaces,
output_path=here() / "data",
)
except NoSuchTableError:
main_logger.warning(
"Hydrofabric namespaces not reachable at prewarm time; workers will attempt at startup."
)

# Recycle each worker after this many requests. Resets per-process RSS
# that otherwise creeps from glibc/numpy fragmentation over time. The
# supervisor respawns the worker; new workers skip the heavy one-time
# setup (cache + graphs are already on disk), so churn is ~seconds.
max_requests_per_worker = int(os.environ.get("ICEFABRIC_MAX_REQUESTS_PER_WORKER", "100"))
uvicorn.run(
"app.main:app",
host="0.0.0.0",
port=8000,
workers=2,
log_level="info",
limit_max_requests=max_requests_per_worker,
)
Loading
Loading