diff --git a/README.md b/README.md index bd62e04..6ddaa7b 100644 --- a/README.md +++ b/README.md @@ -75,7 +75,7 @@ print(f"Total damaged built-up: {damage.values.sum():,.0f} m²") | `floodpath.soil` | ISRIC SoilGrids 2.0 (250 m, COG) | Sand/silt/clay topsoil composition + USDA texture-triangle classification + NEH 630 Ch7 hydrologic soil group (A/B/C/D) | | `floodpath.precip` | Synthetic uniform (real fetchers later: ERA5 / IMERG / CHIRPS) | Precipitation depth raster (mm) — pluggable input to the runoff equation | | `floodpath.runoff` | NEH 630 Ch9 + Ch10 + landuse + HSG + precip | SCS Curve Number raster + SCS-CN runoff equation `Q = (P-0.2S)²/(P+0.8S)` | -| `floodpath.routing` | runoff + flow direction (pyflwdir) | Steady-state hydrologic routing: accumulated runoff volume + peak discharge | +| `floodpath.routing` | runoff + flow direction (pyflwdir) + roughness + HAND | Hydrologic routing (accumulation + peak discharge) + hydraulic closure (Manning normal-depth at streams, Leopold-Maddock width) + rainfall-driven HAND flood depth | | `floodpath.damage` | JRC Huizinga 2017 + DEM/HAND/GHSL | Per-cell flood depth and damage in m² of built-up surface | ## Depth-damage curves diff --git a/floodpath/routing/__init__.py b/floodpath/routing/__init__.py index a2a3915..8f71bde 100644 --- a/floodpath/routing/__init__.py +++ b/floodpath/routing/__init__.py @@ -1,32 +1,57 @@ -"""Routing module: hydrologic + (later) hydraulic routing of runoff. +"""Routing module: hydrologic + hydraulic routing of runoff. -Currently provides steady-state hydrologic routing — accumulation of -SCS-CN runoff along the DEM-derived flow direction grid, plus a -volume-to-peak-discharge conversion given an event duration. - -Manning normal-depth (the hydraulic closure that converts discharge to -water level at stream cells) is the natural next addition; it lives in -this module's `manning.py` once shipped. +Provides: + * accumulate_runoff(...) — flow-accumulation of SCS-CN runoff into + upstream-volume and peak-discharge fields (PR1: hydrologic side). + * compute_water_level(...) — Manning normal-depth at every stream + cell, given discharge + roughness + slope + Leopold-Maddock width + (PR2: hydraulic closure). + * compute_rainfall_inundation(...) — final step: broadcast per-stream + water levels upstream via flow direction and apply the existing HAND + machinery to produce a rainfall-driven flood-depth raster. """ from .accumulate import accumulate_runoff from .constants import ( DEFAULT_EVENT_DURATION_S, EARTH_RADIUS_M, + LEOPOLD_MADDOCK_A, + LEOPOLD_MADDOCK_B, + MANNING_MIN_DEPTH, + MANNING_MIN_SLOPE, ) from .discharge import peak_discharge +from .flood import RainfallInundationDepth, compute_rainfall_inundation +from .manning import ( + WaterLevelGrid, + compute_water_level, + leopold_maddock_width, + manning_normal_depth, +) from .models import ( AccumulatedRunoffGrid, DischargeGrid, ) -from .utils import cell_areas_m2 +from .utils import cell_areas_m2, pixel_sizes_m, slope_along_flow __all__ = [ "AccumulatedRunoffGrid", "DEFAULT_EVENT_DURATION_S", "DischargeGrid", "EARTH_RADIUS_M", + "LEOPOLD_MADDOCK_A", + "LEOPOLD_MADDOCK_B", + "MANNING_MIN_DEPTH", + "MANNING_MIN_SLOPE", + "RainfallInundationDepth", + "WaterLevelGrid", "accumulate_runoff", "cell_areas_m2", + "compute_rainfall_inundation", + "compute_water_level", + "leopold_maddock_width", + "manning_normal_depth", "peak_discharge", + "pixel_sizes_m", + "slope_along_flow", ] diff --git a/floodpath/routing/constants.py b/floodpath/routing/constants.py index 03d75b4..76fe2db 100644 --- a/floodpath/routing/constants.py +++ b/floodpath/routing/constants.py @@ -21,3 +21,28 @@ # when produced by these helpers. Future routing schemes # (unit hydrograph, kinematic wave) will use distinct strings. ROUTING_METHOD: str = "steady-state-flow-accumulation" + + +# --- Channel hydraulics (Manning normal-depth + Leopold-Maddock width) ----- +# +# Leopold & Maddock (1953) "The hydraulic geometry of stream channels and +# some physiographic implications" — empirical relation between bankfull +# discharge and channel width: +# +# w = a * Q^b +# +# where a, b are calibrated regional coefficients. The values below are the +# global natural-channel averages cited in Leopold & Maddock and refined by +# Andreadis et al. (2013) "A simple global river bankfull width and depth +# database" — appropriate when no local data are available. Override per +# basin / climate when calibration data exist. +LEOPOLD_MADDOCK_A: float = 2.5 # m * (m^3/s)^(-b) +LEOPOLD_MADDOCK_B: float = 0.5 # dimensionless + +# Numerical floors. Manning's equation has a sqrt(slope) and divisions by +# width and slope, so zero values would blow up. These caps are +# conservatively tight — the right answer is "no flow" when slopes are +# this small, but flagging the cell at all gives smoother boundary output +# than a hard NaN. +MANNING_MIN_SLOPE: float = 1e-5 # m/m — flat-as-a-pond +MANNING_MIN_DEPTH: float = 0.05 # m — 5 cm channel-bottom default diff --git a/floodpath/routing/flood.py b/floodpath/routing/flood.py new file mode 100644 index 0000000..07dc393 --- /dev/null +++ b/floodpath/routing/flood.py @@ -0,0 +1,154 @@ +"""Rainfall-driven HAND inundation: combine per-stream water levels with HAND.""" + +from dataclasses import dataclass + +import numpy as np +from rasterio.transform import Affine + +from floodpath.dem.models import BoundingBox +from floodpath.hydrology.models import FlowGrid, Hand, StreamNetwork + +from .manning import WaterLevelGrid + + +@dataclass +class RainfallInundationDepth: + """Per-cell flood depth driven by per-stream-cell channel water levels. + + Replaces the flat-water-level inundation used by `damage`'s static + `compute_inundation_depth(hand, water_level=5.0)`. The water level + here varies across the basin: each off-stream cell's depth is + + depth = max(h_at_drainage_stream - HAND, 0) + + where `h_at_drainage_stream` is the Manning normal-depth at the + stream cell that the off-stream cell drains to (via the flow + direction grid). + """ + + values: np.ndarray + transform: Affine + crs: str + bbox: BoundingBox + duration_s: float + discharge_source: str + method: str = "hand-driven-by-manning-water-levels" + units: str = "m (flood depth, rainfall-driven)" + + @property + def shape(self) -> tuple[int, int]: + """Return the (rows, cols) shape of the underlying raster.""" + return self.values.shape # type: ignore[return-value] + + def stats(self) -> dict[str, float]: + """Summary statistics over wet cells (depth > 0).""" + wet = self.values[self.values > 0] + if wet.size == 0: + return { + "min": 0.0, "max": 0.0, "mean": 0.0, "median": 0.0, + "wet_cells": 0, + } + return { + "min": float(wet.min()), + "max": float(wet.max()), + "mean": float(wet.mean()), + "median": float(np.median(wet)), + "wet_cells": int(wet.size), + } + + def flooded_fraction(self) -> float: + """Fraction of the patch with depth > 0.""" + if self.values.size == 0: + return 0.0 + return float((self.values > 0).sum() / self.values.size) + + +def _broadcast_stream_levels( + flow_grid: FlowGrid, + streams: StreamNetwork, + water_level: WaterLevelGrid, +) -> np.ndarray: + """Project per-stream-cell water levels onto every cell that drains to them. + + Uses `pyflwdir.FlwdirRaster.basins(idxs=stream_indices)` to label + every cell with the 1-based index of its drainage stream cell; + cells outside any stream's catchment (e.g. ridge cells that flow + off the bbox edge before hitting a stream) get basin_id=0 and water + level NaN. + """ + flwdir = flow_grid.flwdir_raster + stream_flat_idxs = np.flatnonzero(streams.mask.flatten()) + + basins = flwdir.basins(idxs=stream_flat_idxs) # 0 = background, else 1..N + + # Build a lookup table: water level for each basin id (1-based). + # Position 0 is reserved for "out of basin" -> NaN. + h_at_streams = water_level.values.flatten() + h_lookup = np.full(stream_flat_idxs.size + 1, np.nan, dtype=np.float32) + h_lookup[1:] = h_at_streams[stream_flat_idxs] + + return h_lookup[basins] + + +def compute_rainfall_inundation( + water_level: WaterLevelGrid, + hand: Hand, + flow_grid: FlowGrid, + streams: StreamNetwork, +) -> RainfallInundationDepth: + """Produce a rainfall-driven flood depth raster from the full chain. + + For every cell: + + depth = max(h_at_drainage - HAND, 0) + + where `h_at_drainage` is the Manning normal-depth at the cell's + associated stream cell (the first downstream stream cell along the + flow direction). Cells outside any stream's catchment, or with HAND + nodata, stay dry (depth = 0). + + Args: + water_level: WaterLevelGrid from `compute_water_level`. + hand: HAND raster from `compute_hand`. + flow_grid: Same flow grid used for HAND and routing. + streams: Same StreamNetwork used for HAND and water-level computation. + + Returns: + RainfallInundationDepth carrying per-cell flood depth in m. + + Raises: + ValueError: any of the inputs has a mismatched shape. + """ + if water_level.shape != flow_grid.shape: + raise ValueError( + f"water_level shape {water_level.shape} does not match flow_grid " + f"shape {flow_grid.shape}" + ) + if hand.shape != flow_grid.shape: + raise ValueError( + f"hand shape {hand.shape} does not match flow_grid shape " + f"{flow_grid.shape}" + ) + + h_field = _broadcast_stream_levels(flow_grid, streams, water_level) + + # depth = max(h - HAND, 0). HAND nodata is a large negative value + # from pyflwdir; treat anything < 0 as nodata -> dry. + valid_hand = hand.values >= 0 + valid_h = ~np.isnan(h_field) + diff = np.where(valid_hand & valid_h, h_field - hand.values, 0.0) + depth = np.maximum(diff, 0.0).astype(np.float32) + + return RainfallInundationDepth( + values=depth, + transform=flow_grid.transform, + crs=flow_grid.crs, + bbox=BoundingBox( + min_lon=flow_grid.bbox.min_lon, + min_lat=flow_grid.bbox.min_lat, + max_lon=flow_grid.bbox.max_lon, + max_lat=flow_grid.bbox.max_lat, + ), + duration_s=water_level.duration_s, + discharge_source=water_level.discharge_source, + ) diff --git a/floodpath/routing/manning.py b/floodpath/routing/manning.py new file mode 100644 index 0000000..7fc704e --- /dev/null +++ b/floodpath/routing/manning.py @@ -0,0 +1,243 @@ +"""Channel hydraulics: Manning normal-depth solver at stream cells. + +Closes the hydraulic side of the rainfall->flood chain. Inputs are +discharge (from `peak_discharge`) and Manning's roughness (from +`floodpath.landuse.landuse_to_roughness`); outputs are per-stream-cell +water levels h that feed the rainfall-driven HAND inundation in +`floodpath.routing.flood`. + +Manning's equation (in SI units): + + Q = (1/n) * A * R^(2/3) * S^(1/2) + +For a wide rectangular channel (h << w) we approximate: + A = w * h + R ~ h +which yields a closed-form normal-depth solution: + + h = ((Q * n) / (w * sqrt(S)))^(3/5) + +Channel width w comes from the Leopold-Maddock 1953 hydraulic-geometry +relation `w = a * Q^b` with global natural-channel defaults; users with +local calibration override `width_a, width_b`. +""" + +from dataclasses import dataclass + +import numpy as np +import rasterio.warp +from rasterio.enums import Resampling +from rasterio.transform import Affine + +from floodpath.dem.models import DEM, BoundingBox +from floodpath.hydrology.models import FlowGrid, StreamNetwork +from floodpath.landuse.roughness import RoughnessGrid + +from .constants import ( + LEOPOLD_MADDOCK_A, + LEOPOLD_MADDOCK_B, + MANNING_MIN_DEPTH, + MANNING_MIN_SLOPE, +) +from .models import DischargeGrid +from .utils import slope_along_flow + + +@dataclass +class WaterLevelGrid: + """Per-cell channel water depth (m), populated only at stream cells. + + `values[i, j]` is the Manning normal-depth water level above the + channel bed at stream cell (i, j). Non-stream cells carry NaN to + keep the field semantically clean for the upstream broadcast in + `compute_rainfall_inundation`. + + `discharge_source` records where the input discharge came from + (e.g. the precip-source string carried through accumulate->discharge), + plus method tags for the hydraulic solver. + """ + + values: np.ndarray + transform: Affine + crs: str + bbox: BoundingBox + duration_s: float + discharge_source: str + method: str = "manning-normal-depth" + units: str = "m (channel water depth)" + + @property + def shape(self) -> tuple[int, int]: + """Return the (rows, cols) shape of the underlying raster.""" + return self.values.shape # type: ignore[return-value] + + def stats_at_streams(self) -> dict[str, float]: + """Summary statistics over stream cells (NaN-skipping).""" + valid = self.values[~np.isnan(self.values)] + if valid.size == 0: + return {"min": 0.0, "max": 0.0, "mean": 0.0, "median": 0.0} + return { + "min": float(valid.min()), + "max": float(valid.max()), + "mean": float(valid.mean()), + "median": float(np.median(valid)), + } + + +def leopold_maddock_width( + discharge_q: np.ndarray, + a: float = LEOPOLD_MADDOCK_A, + b: float = LEOPOLD_MADDOCK_B, +) -> np.ndarray: + """Estimate channel width (m) from discharge via Leopold-Maddock. + + w = a * Q^b + + Args: + discharge_q: Discharge in m^3/s. Scalar or array. + a, b: Hydraulic-geometry coefficients. Global natural-channel + defaults are 2.5 and 0.5 (Andreadis et al. 2013). + + Returns: + Channel width in m, same shape as input. + """ + q = np.maximum(discharge_q, 0.0) + return float(a) * np.power(q, float(b)) + + +def manning_normal_depth( + discharge_q: np.ndarray, + width_w: np.ndarray, + slope_s: np.ndarray, + manning_n: np.ndarray | float, + min_depth: float = MANNING_MIN_DEPTH, + min_slope: float = MANNING_MIN_SLOPE, +) -> np.ndarray: + """Solve Manning's wide-channel approximation for normal depth h. + + h = ((Q * n) / (w * sqrt(S)))^(3/5) + + All inputs are array-broadcast compatible. Floors are applied to + slope (`min_slope`) to avoid sqrt(0) and to the output (`min_depth`) + to keep a tiny minimum channel depth where flow exists. + + Args: + discharge_q: Discharge in m^3/s. + width_w: Channel width in m. + slope_s: Channel slope in m/m. Clamped to `min_slope`. + manning_n: Manning's n in s/m^(1/3). Scalar or array. + min_depth: Minimum returned depth in m. Default 0.05 m + (ankle-deep cap to avoid zero-thin channels). + min_slope: Slope floor in m/m. + + Returns: + Water depth (m) at each cell, broadcast shape of inputs. + """ + s = np.maximum(slope_s, float(min_slope)) + sqrt_s = np.sqrt(s) + # Avoid divide-by-zero at width=0 (no channel) — caller usually masks + # to stream cells, but defend here too. + safe_w = np.where(width_w > 0, width_w, 1.0) + h = ((discharge_q * manning_n) / (safe_w * sqrt_s)) ** (3.0 / 5.0) + h = np.where(width_w > 0, h, 0.0) + h = np.maximum(h, float(min_depth)) + return h.astype(np.float32) + + +def _roughness_at_flow_grid( + roughness: RoughnessGrid, + flow_grid: FlowGrid, +) -> np.ndarray: + """Reproject a (typically 10 m) RoughnessGrid onto the flow grid. + + Roughness is a continuous physical property; an area-weighted average + is the right downsampling for it (same logic as the runoff reproject + in `accumulate_runoff`). + """ + out = np.zeros(flow_grid.shape, dtype=np.float32) + rasterio.warp.reproject( + source=roughness.values, + destination=out, + src_transform=roughness.transform, + src_crs=roughness.crs, + dst_transform=flow_grid.transform, + dst_crs=flow_grid.crs, + resampling=Resampling.average, + ) + return out + + +def compute_water_level( + discharge: DischargeGrid, + roughness: RoughnessGrid, + flow_grid: FlowGrid, + streams: StreamNetwork, + dem: DEM, + width_a: float = LEOPOLD_MADDOCK_A, + width_b: float = LEOPOLD_MADDOCK_B, + min_depth: float = MANNING_MIN_DEPTH, + min_slope: float = MANNING_MIN_SLOPE, +) -> WaterLevelGrid: + """Compute water level h at every stream cell via Manning's equation. + + Pipeline: + 1. Reproject the (typically 10 m) RoughnessGrid onto the flow grid. + 2. Compute slope along the flow direction at every cell from the DEM. + 3. Width at every cell = `leopold_maddock_width(Q)`. + 4. h at every cell = `manning_normal_depth(Q, w, S, n)`. + 5. Mask non-stream cells to NaN so the field cleanly says "water + level only at the channel." + + Args: + discharge: Per-cell peak discharge (m^3/s) from `peak_discharge`. + roughness: Manning's n raster from `landuse_to_roughness`. May + differ in resolution from the flow grid; gets reprojected. + flow_grid: FlowGrid (carries pyflwdir + transform + CRS). + streams: StreamNetwork mask defining channel cells. + dem: DEM whose elevations are used to compute slope. + width_a, width_b: Leopold-Maddock coefficients. + min_depth: Minimum channel depth floor (m). + min_slope: Slope floor (m/m). + + Returns: + WaterLevelGrid with h (m) at stream cells, NaN elsewhere. + """ + if discharge.shape != flow_grid.shape: + raise ValueError( + f"discharge shape {discharge.shape} does not match flow_grid shape " + f"{flow_grid.shape}" + ) + if streams.shape != flow_grid.shape: + raise ValueError( + f"streams shape {streams.shape} does not match flow_grid shape " + f"{flow_grid.shape}" + ) + + n_at_flow = _roughness_at_flow_grid(roughness, flow_grid) + slope = slope_along_flow(flow_grid, dem, min_slope=min_slope) + width = leopold_maddock_width(discharge.values, a=width_a, b=width_b) + h_full = manning_normal_depth( + discharge_q=discharge.values, + width_w=width, + slope_s=slope, + manning_n=n_at_flow, + min_depth=min_depth, + min_slope=min_slope, + ) + + # Mask non-stream cells. Keep stream cells' depths; NaN everywhere else. + h = np.where(streams.mask, h_full, np.nan).astype(np.float32) + + return WaterLevelGrid( + values=h, + transform=flow_grid.transform, + crs=flow_grid.crs, + bbox=BoundingBox( + min_lon=flow_grid.bbox.min_lon, + min_lat=flow_grid.bbox.min_lat, + max_lon=flow_grid.bbox.max_lon, + max_lat=flow_grid.bbox.max_lat, + ), + duration_s=discharge.duration_s, + discharge_source=discharge.precip_source, + ) diff --git a/floodpath/routing/utils.py b/floodpath/routing/utils.py index bed261e..ef7fade 100644 --- a/floodpath/routing/utils.py +++ b/floodpath/routing/utils.py @@ -5,7 +5,10 @@ import numpy as np from rasterio.transform import Affine -from .constants import DEG_LAT_M +from floodpath.dem.models import DEM +from floodpath.hydrology.models import FlowGrid + +from .constants import DEG_LAT_M, MANNING_MIN_SLOPE def cell_areas_m2( @@ -58,3 +61,107 @@ def cell_areas_m2( areas_per_row[:, None], shape, ).astype(np.float64).copy() + + +def pixel_sizes_m( + transform: Affine, + shape: tuple[int, int], + crs: str, +) -> tuple[np.ndarray, float]: + """Return per-row pixel x-size (m) and constant pixel y-size (m). + + Helper used by `slope_along_flow` to convert flow-direction step + distances from pixel-units to metres on the WGS84 graticule. + + Args: + transform: Rasterio Affine transform. + shape: (rows, cols) of the raster. + crs: CRS string. + + Returns: + Tuple `(pixel_x_per_row, pixel_y)` where: + * `pixel_x_per_row` is a (rows,) float array — x-size in metres + for each row's centre latitude. + * `pixel_y` is a scalar — y-size in metres (constant for WGS84 + and projected CRSes). + """ + pixel_x = abs(transform.a) + pixel_y = abs(transform.e) + rows, _ = shape + + is_geographic = ("4326" in crs) or ("WGS 84" in crs) or ("WGS84" in crs) + if not is_geographic: + return np.full(rows, pixel_x, dtype=np.float64), float(pixel_y) + + row_indices = np.arange(rows) + lat_centres = transform.f + (row_indices + 0.5) * transform.e + cos_lat = np.cos(np.radians(lat_centres)) + pixel_x_per_row = pixel_x * DEG_LAT_M * cos_lat + pixel_y_m = pixel_y * DEG_LAT_M + return pixel_x_per_row.astype(np.float64), float(pixel_y_m) + + +def slope_along_flow( + flow_grid: FlowGrid, + dem: DEM, + min_slope: float = MANNING_MIN_SLOPE, +) -> np.ndarray: + """Per-cell slope (m/m) measured along the downstream flow direction. + + For each cell c with downstream cell c' (= idxs_ds[c]): + + slope = (elev[c] - elev[c']) / distance(c, c') + + where distance accounts for diagonal vs orthogonal D8 steps and uses + metric pixel sizes (lat-aware in WGS84). + + Pits (cells where idxs_ds points to self) and cells where the + computed slope would be non-positive are clamped to `min_slope` so + Manning's `1/sqrt(S)` term doesn't blow up. The convention follows + pyflwdir's own `subgrid_rivslp` defaults. + + Args: + flow_grid: FlowGrid with attached pyflwdir FlwdirRaster. + dem: DEM whose elevations the slope is measured against. + min_slope: Floor applied to all slope values (m/m). Default 1e-5 + corresponds to ~1 cm drop per kilometre. + + Returns: + 2D float64 array of shape `dem.shape` with per-cell slope in m/m, + guaranteed >= `min_slope` everywhere. + """ + flwdir = flow_grid.flwdir_raster + nrows, ncols = dem.elevation.shape + n = nrows * ncols + + elev = dem.elevation.astype(np.float64).flatten() + idxs_ds = np.asarray(flwdir.idxs_ds) + + # Identify pits (downstream is self or out of bounds — idxs_ds = -1). + self_idx = np.arange(n, dtype=idxs_ds.dtype) + pit_mask = (idxs_ds == self_idx) | (idxs_ds < 0) + safe_ds = np.where(pit_mask, self_idx, idxs_ds) + + # Row/col of each cell and its downstream neighbour. + rows = self_idx // ncols + cols = self_idx % ncols + rows_ds = safe_ds // ncols + cols_ds = safe_ds % ncols + drow = (rows - rows_ds).astype(np.int64) # +1 = downstream is north + dcol = (cols - cols_ds).astype(np.int64) + + pixel_x_per_row, pixel_y = pixel_sizes_m( + flow_grid.transform, dem.elevation.shape, dem.crs + ) + pixel_x_flat = pixel_x_per_row[rows] + distance = np.sqrt((drow * pixel_y) ** 2 + (dcol * pixel_x_flat) ** 2) + + delta_elev = elev - elev[safe_ds] + with np.errstate(divide="ignore", invalid="ignore"): + slope = np.where(distance > 0, delta_elev / distance, 0.0) + + # Pits get min_slope; non-positive slopes (sinks, flat valleys) get min_slope. + slope = np.where(pit_mask, min_slope, slope) + slope = np.maximum(slope, float(min_slope)) + + return slope.reshape(dem.elevation.shape) diff --git a/tests/routing/test_flood.py b/tests/routing/test_flood.py new file mode 100644 index 0000000..c4d79c7 --- /dev/null +++ b/tests/routing/test_flood.py @@ -0,0 +1,179 @@ +"""Unit tests for routing.flood (rainfall-driven inundation).""" + +import numpy as np +import pytest +from rasterio.transform import Affine + +from floodpath.dem.models import DEM, BoundingBox +from floodpath.hydrology.models import FlowGrid, Hand, StreamNetwork +from floodpath.landuse.models import LanduseGrid +from floodpath.routing import ( + RainfallInundationDepth, + WaterLevelGrid, + compute_rainfall_inundation, +) + + +class TestRainfallInundationDepthBasics: + def test_stats_skip_dry_cells(self) -> None: + d = RainfallInundationDepth( + values=np.array([[0.0, 1.0, 2.0]], dtype=np.float32), + transform=Affine.translation(0, 0) * Affine.scale(0.001, -0.001), + crs="EPSG:4326", + bbox=BoundingBox(0, 0, 0.003, 0.001), + duration_s=21600.0, + discharge_source="uniform", + ) + s = d.stats() + assert s["wet_cells"] == 2 + assert s["min"] == pytest.approx(1.0) + assert s["max"] == pytest.approx(2.0) + assert s["mean"] == pytest.approx(1.5) + + def test_stats_all_dry(self) -> None: + d = RainfallInundationDepth( + values=np.zeros((3, 3), dtype=np.float32), + transform=Affine.translation(0, 0) * Affine.scale(0.001, -0.001), + crs="EPSG:4326", + bbox=BoundingBox(0, 0, 0.003, 0.003), + duration_s=21600.0, + discharge_source="uniform", + ) + s = d.stats() + assert s["wet_cells"] == 0 + + def test_flooded_fraction(self) -> None: + d = RainfallInundationDepth( + values=np.array([[0.0, 0.0, 1.0, 2.0]], dtype=np.float32), + transform=Affine.translation(0, 0) * Affine.scale(0.001, -0.001), + crs="EPSG:4326", + bbox=BoundingBox(0, 0, 0.004, 0.001), + duration_s=21600.0, + discharge_source="uniform", + ) + assert d.flooded_fraction() == pytest.approx(0.5) + + +class TestRainfallInundationEndToEnd: + """Run the full rainfall-driven inundation chain on Robit Bata.""" + + @pytest.fixture + def robit_bata_flood( + self, + robit_bata_dem: DEM, + robit_bata_flow_grid: FlowGrid, + robit_bata_streams: StreamNetwork, + robit_bata_hand: Hand, + robit_bata_worldcover: LanduseGrid, + robit_bata_soil, + ) -> RainfallInundationDepth: + from floodpath.landuse import landuse_to_roughness + from floodpath.precip import uniform_precip_like + from floodpath.routing import ( + accumulate_runoff, compute_water_level, peak_discharge, + ) + from floodpath.runoff import apply_scs_cn, compute_curve_number + from floodpath.soil import texture_to_hsg + + hsg = texture_to_hsg(robit_bata_soil) + cn = compute_curve_number(robit_bata_worldcover, hsg) + roughness = landuse_to_roughness(robit_bata_worldcover) + precip = uniform_precip_like(cn, depth_mm=100.0) + runoff = apply_scs_cn(cn, precip) + acc = accumulate_runoff(runoff, robit_bata_flow_grid) + discharge = peak_discharge(acc, duration_s=6 * 3600.0) + water_level = compute_water_level( + discharge, roughness, robit_bata_flow_grid, + robit_bata_streams, robit_bata_dem, + ) + return compute_rainfall_inundation( + water_level, robit_bata_hand, + robit_bata_flow_grid, robit_bata_streams, + ) + + def test_flooded_fraction_pinned(self, robit_bata_flood) -> None: + # Pinned: 100 mm uniform + 6 hr -> ~9.5% of patch is wet under + # rainfall-driven HAND inundation. This catches regressions in + # any of the upstream chain steps (CN, SCS-CN, accumulation, + # Manning, broadcast, HAND). + assert robit_bata_flood.flooded_fraction() == pytest.approx(0.095, abs=0.005) + + def test_max_depth_pinned(self, robit_bata_flood) -> None: + # Pinned: max flood depth ~10.07 m at the outlet (= max water + # level since HAND=0 at the stream cell itself). + s = robit_bata_flood.stats() + assert s["max"] == pytest.approx(10.07, abs=0.10) + + def test_min_depth_among_wet_is_positive(self, robit_bata_flood) -> None: + # Every "wet" cell must have depth > 0 by construction. + s = robit_bata_flood.stats() + assert s["min"] > 0 + assert s["wet_cells"] > 0 + + def test_stream_cells_have_max_depth( + self, + robit_bata_flood, + robit_bata_streams: StreamNetwork, + robit_bata_hand: Hand, + ) -> None: + # At a stream cell, HAND=0, so depth = water_level (the maximum + # possible at that cell). All stream cells should be wet. + stream_depths = robit_bata_flood.values[robit_bata_streams.mask] + # Stream cells with valid HAND should be wet. + valid_stream_hand = robit_bata_hand.values[robit_bata_streams.mask] >= 0 + assert (stream_depths[valid_stream_hand] > 0).all() + + +class TestShapeMismatchErrors: + @pytest.fixture + def stub_inputs(self): + # Minimal stubs for shape-mismatch testing — bypass the real + # FlowGrid/Hand assembly since we never reach the pyflwdir call. + from types import SimpleNamespace + transform = Affine.translation(0, 0) * Affine.scale(0.001, -0.001) + bbox = BoundingBox(0, 0, 0.003, 0.003) + hand = Hand( + values=np.zeros((3, 3), dtype=np.float32), + transform=transform, crs="EPSG:4326", bbox=bbox, + stream_threshold=200, + ) + flow = SimpleNamespace( + shape=(3, 3), + transform=transform, + crs="EPSG:4326", + bbox=bbox, + flwdir_raster=None, + accumulation=np.zeros((3, 3)), + ) + streams = StreamNetwork( + mask=np.zeros((3, 3), dtype=bool), + order=np.zeros((3, 3), dtype=np.int16), + threshold=200, + transform=transform, crs="EPSG:4326", + ) + return hand, flow, streams + + def test_water_level_shape_mismatch_raises(self, stub_inputs) -> None: + hand, flow, streams = stub_inputs + bad_wl = WaterLevelGrid( + values=np.full((4, 4), np.nan, dtype=np.float32), # wrong shape + transform=flow.transform, crs=flow.crs, bbox=flow.bbox, + duration_s=21600.0, discharge_source="uniform", + ) + with pytest.raises(ValueError, match="water_level shape"): + compute_rainfall_inundation(bad_wl, hand, flow, streams) + + def test_hand_shape_mismatch_raises(self, stub_inputs) -> None: + hand, flow, streams = stub_inputs + bad_hand = Hand( + values=np.zeros((4, 4), dtype=np.float32), # wrong shape + transform=flow.transform, crs=flow.crs, bbox=flow.bbox, + stream_threshold=200, + ) + good_wl = WaterLevelGrid( + values=np.full(flow.shape, np.nan, dtype=np.float32), + transform=flow.transform, crs=flow.crs, bbox=flow.bbox, + duration_s=21600.0, discharge_source="uniform", + ) + with pytest.raises(ValueError, match="hand shape"): + compute_rainfall_inundation(good_wl, bad_hand, flow, streams) diff --git a/tests/routing/test_manning.py b/tests/routing/test_manning.py new file mode 100644 index 0000000..c3e137c --- /dev/null +++ b/tests/routing/test_manning.py @@ -0,0 +1,256 @@ +"""Unit tests for routing.manning.""" + +import numpy as np +import pytest +from rasterio.transform import Affine + +from floodpath.dem.models import DEM, BoundingBox +from floodpath.hydrology.models import FlowGrid, StreamNetwork +from floodpath.landuse.models import LanduseGrid +from floodpath.landuse.roughness import RoughnessGrid +from floodpath.routing import ( + DischargeGrid, + LEOPOLD_MADDOCK_A, + LEOPOLD_MADDOCK_B, + MANNING_MIN_DEPTH, + WaterLevelGrid, + compute_water_level, + leopold_maddock_width, + manning_normal_depth, +) + + +class TestLeopoldMaddockWidth: + def test_zero_q_yields_zero_width(self) -> None: + w = leopold_maddock_width(np.array([0.0])) + assert w[0] == pytest.approx(0.0) + + def test_one_cubic_metre_per_second(self) -> None: + # Q=1 -> w = 2.5 * 1^0.5 = 2.5 m + w = leopold_maddock_width(np.array([1.0])) + assert w[0] == pytest.approx(LEOPOLD_MADDOCK_A, rel=1e-6) + + def test_hundred_q_with_default_b(self) -> None: + # Q=100, b=0.5 -> w = 2.5 * sqrt(100) = 25 m + w = leopold_maddock_width(np.array([100.0])) + assert w[0] == pytest.approx(25.0, rel=1e-6) + + def test_custom_coefficients(self) -> None: + # Override a, b to known values. + w = leopold_maddock_width(np.array([16.0]), a=3.0, b=0.25) + # 3 * 16^0.25 = 3 * 2 = 6 + assert w[0] == pytest.approx(6.0, rel=1e-6) + + def test_negative_q_clipped_to_zero(self) -> None: + # Negative discharge shouldn't happen, but should not produce NaN. + w = leopold_maddock_width(np.array([-5.0])) + assert w[0] == pytest.approx(0.0) + + +class TestManningNormalDepth: + """Hand-computed reference values for Q=10 m^3/s, w=10 m, S=0.001, n=0.04: + + h = ((Q*n) / (w*sqrt(S)))^(3/5) + = ((10*0.04) / (10*sqrt(0.001)))^(3/5) + = (0.4 / 0.3162)^(3/5) + = 1.2649^(3/5) + = 1.150 m + + All explicit pinned values in this test class share that setup + unless noted. + """ + + def test_canonical_value(self) -> None: + h = manning_normal_depth( + discharge_q=np.array([10.0]), + width_w=np.array([10.0]), + slope_s=np.array([0.001]), + manning_n=np.array([0.04]), + min_depth=0.0, + ) + assert h[0] == pytest.approx(1.150, abs=0.01) + + def test_higher_n_gives_deeper_water(self) -> None: + # Doubling n at constant Q/w/S must increase h (rougher channel + # needs more head to push the same discharge). + h_low = manning_normal_depth( + discharge_q=np.array([10.0]), width_w=np.array([10.0]), + slope_s=np.array([0.001]), manning_n=np.array([0.02]), + min_depth=0.0, + ) + h_high = manning_normal_depth( + discharge_q=np.array([10.0]), width_w=np.array([10.0]), + slope_s=np.array([0.001]), manning_n=np.array([0.04]), + min_depth=0.0, + ) + assert h_high[0] > h_low[0] + + def test_steeper_slope_gives_shallower(self) -> None: + # 100x slope -> sqrt(slope) 10x -> denominator 10x -> h reduced. + h_flat = manning_normal_depth( + discharge_q=np.array([10.0]), width_w=np.array([10.0]), + slope_s=np.array([0.0001]), manning_n=np.array([0.04]), + min_depth=0.0, + ) + h_steep = manning_normal_depth( + discharge_q=np.array([10.0]), width_w=np.array([10.0]), + slope_s=np.array([0.01]), manning_n=np.array([0.04]), + min_depth=0.0, + ) + assert h_flat[0] > h_steep[0] + + def test_zero_q_yields_min_depth_floor(self) -> None: + # Q=0 should give h=0 mathematically; floor pushes to min_depth. + h = manning_normal_depth( + discharge_q=np.array([0.0]), width_w=np.array([10.0]), + slope_s=np.array([0.01]), manning_n=np.array([0.04]), + min_depth=0.05, + ) + assert h[0] == pytest.approx(0.05) + + def test_min_slope_clamp(self) -> None: + # Slope 0 must be clamped to min_slope, not produce inf/NaN. + h = manning_normal_depth( + discharge_q=np.array([10.0]), width_w=np.array([10.0]), + slope_s=np.array([0.0]), manning_n=np.array([0.04]), + min_depth=0.0, + min_slope=1e-5, + ) + assert np.isfinite(h[0]) + # With slope=1e-5, h = ((10*0.04)/(10*sqrt(1e-5)))^(3/5) + # = (0.4/0.0316)^(3/5) = (12.65)^(3/5) = 4.62 m approx + assert h[0] == pytest.approx(4.62, abs=0.05) + + def test_zero_width_yields_zero_then_floor(self) -> None: + # Width=0 means no channel; depth should fall back to min_depth. + h = manning_normal_depth( + discharge_q=np.array([10.0]), width_w=np.array([0.0]), + slope_s=np.array([0.01]), manning_n=np.array([0.04]), + min_depth=0.05, + ) + assert h[0] == pytest.approx(0.05) + + +class TestWaterLevelGridStats: + def test_stats_skip_nan(self) -> None: + # Build a tiny grid with mixed NaN (non-stream) and finite values. + values = np.array([[0.5, np.nan, 1.0]], dtype=np.float32) + grid = WaterLevelGrid( + values=values, + transform=Affine.translation(0, 0) * Affine.scale(0.001, -0.001), + crs="EPSG:4326", + bbox=BoundingBox(0, 0, 0.003, 0.001), + duration_s=21600.0, + discharge_source="uniform", + ) + s = grid.stats_at_streams() + assert s["min"] == pytest.approx(0.5) + assert s["max"] == pytest.approx(1.0) + assert s["mean"] == pytest.approx(0.75) + + +class TestComputeWaterLevelEndToEnd: + """Ties Manning to the existing Robit Bata fixtures + pipeline.""" + + def test_outlet_depth_pinned( + self, + robit_bata_dem: DEM, + robit_bata_flow_grid: FlowGrid, + robit_bata_streams: StreamNetwork, + robit_bata_worldcover: LanduseGrid, + robit_bata_soil, + ) -> None: + # Pinned: 100 mm uniform precip + 6-hr duration -> outlet + # peak Q=66.2 m^3/s. Manning normal-depth at that outlet given + # mild slope and ~20 m channel width gives h around 10 m. + from floodpath.landuse import landuse_to_roughness + from floodpath.precip import uniform_precip_like + from floodpath.runoff import compute_curve_number, apply_scs_cn + from floodpath.routing import ( + accumulate_runoff, peak_discharge, compute_water_level, + ) + from floodpath.soil import texture_to_hsg + + hsg = texture_to_hsg(robit_bata_soil) + cn = compute_curve_number(robit_bata_worldcover, hsg) + roughness = landuse_to_roughness(robit_bata_worldcover) + precip = uniform_precip_like(cn, depth_mm=100.0) + runoff = apply_scs_cn(cn, precip) + acc = accumulate_runoff(runoff, robit_bata_flow_grid) + discharge = peak_discharge(acc, duration_s=6 * 3600.0) + water_level = compute_water_level( + discharge=discharge, + roughness=roughness, + flow_grid=robit_bata_flow_grid, + streams=robit_bata_streams, + dem=robit_bata_dem, + ) + s = water_level.stats_at_streams() + assert s["max"] == pytest.approx(10.07, abs=0.10) + assert s["mean"] == pytest.approx(0.96, abs=0.05) + # Min should be at or above the floor. + assert s["min"] >= MANNING_MIN_DEPTH - 1e-6 + + def test_metadata_propagates( + self, + robit_bata_dem: DEM, + robit_bata_flow_grid: FlowGrid, + robit_bata_streams: StreamNetwork, + robit_bata_worldcover: LanduseGrid, + robit_bata_soil, + ) -> None: + from floodpath.landuse import landuse_to_roughness + from floodpath.precip import uniform_precip_like + from floodpath.runoff import compute_curve_number, apply_scs_cn + from floodpath.routing import ( + accumulate_runoff, peak_discharge, compute_water_level, + ) + from floodpath.soil import texture_to_hsg + + hsg = texture_to_hsg(robit_bata_soil) + cn = compute_curve_number(robit_bata_worldcover, hsg) + roughness = landuse_to_roughness(robit_bata_worldcover) + precip = uniform_precip_like(cn, depth_mm=100.0) + runoff = apply_scs_cn(cn, precip) + acc = accumulate_runoff(runoff, robit_bata_flow_grid) + discharge = peak_discharge(acc, duration_s=6 * 3600.0) + water_level = compute_water_level( + discharge, roughness, robit_bata_flow_grid, + robit_bata_streams, robit_bata_dem, + ) + assert water_level.duration_s == 6 * 3600.0 + assert water_level.discharge_source == "uniform" + assert water_level.method == "manning-normal-depth" + # Stream cell shape preserved + assert water_level.shape == robit_bata_flow_grid.shape + + def test_non_stream_cells_are_nan( + self, + robit_bata_dem: DEM, + robit_bata_flow_grid: FlowGrid, + robit_bata_streams: StreamNetwork, + robit_bata_worldcover: LanduseGrid, + robit_bata_soil, + ) -> None: + from floodpath.landuse import landuse_to_roughness + from floodpath.precip import uniform_precip_like + from floodpath.runoff import compute_curve_number, apply_scs_cn + from floodpath.routing import ( + accumulate_runoff, peak_discharge, compute_water_level, + ) + from floodpath.soil import texture_to_hsg + + hsg = texture_to_hsg(robit_bata_soil) + cn = compute_curve_number(robit_bata_worldcover, hsg) + roughness = landuse_to_roughness(robit_bata_worldcover) + precip = uniform_precip_like(cn, depth_mm=100.0) + runoff = apply_scs_cn(cn, precip) + acc = accumulate_runoff(runoff, robit_bata_flow_grid) + discharge = peak_discharge(acc, duration_s=6 * 3600.0) + water_level = compute_water_level( + discharge, roughness, robit_bata_flow_grid, + robit_bata_streams, robit_bata_dem, + ) + # NaN where streams.mask is False; finite where True. + assert np.isnan(water_level.values[~robit_bata_streams.mask]).all() + assert np.isfinite(water_level.values[robit_bata_streams.mask]).all() diff --git a/tests/routing/test_slope.py b/tests/routing/test_slope.py new file mode 100644 index 0000000..81a1d82 --- /dev/null +++ b/tests/routing/test_slope.py @@ -0,0 +1,93 @@ +"""Unit tests for slope_along_flow + pixel_sizes_m.""" + +import numpy as np +import pytest +from rasterio.transform import Affine + +from floodpath.dem.models import DEM, BoundingBox +from floodpath.hydrology.models import FlowGrid +from floodpath.hydrology import build_flow_grid +from floodpath.routing import pixel_sizes_m, slope_along_flow +from floodpath.routing.constants import MANNING_MIN_SLOPE + + +class TestPixelSizesM: + def test_projected_metres(self) -> None: + # 30m projected pixel: row size scalar = 30, x-per-row = 30 everywhere. + transform = Affine.translation(0, 0) * Affine.scale(30.0, -30.0) + x_per_row, y = pixel_sizes_m(transform, shape=(4, 4), crs="EPSG:32636") + np.testing.assert_array_equal(x_per_row, np.full(4, 30.0)) + assert y == pytest.approx(30.0) + + def test_geographic_equator_vs_60n(self) -> None: + # Geographic CRS: x-size shrinks by cos(60) ~ 0.5 at 60 deg N. + transform_eq = Affine(0.01, 0.0, 0.0, 0.0, -0.01, 0.005) + transform_hi = Affine(0.01, 0.0, 0.0, 0.0, -0.01, 60.005) + eq_x, eq_y = pixel_sizes_m(transform_eq, shape=(1, 1), crs="EPSG:4326") + hi_x, hi_y = pixel_sizes_m(transform_hi, shape=(1, 1), crs="EPSG:4326") + # y is constant + assert hi_y == pytest.approx(eq_y, rel=1e-9) + # x at 60 deg N is half (cos(60) = 0.5) + assert hi_x[0] == pytest.approx(eq_x[0] * 0.5, rel=1e-3) + + +class TestSlopeAlongFlow: + def test_slope_clamped_to_min(self) -> None: + # Flat DEM: every cell's downstream is itself or has zero drop. + # Slope must be clamped to min_slope, NOT zero (otherwise sqrt + # blows up downstream in Manning). + flat = np.full((10, 10), 100.0, dtype=np.float32) + # build_flow_grid uses pyflwdir.from_dem which fills depressions + transform = Affine.translation(0, 0) * Affine.scale(0.0001, -0.0001) + dem = DEM( + elevation=flat, + transform=transform, + crs="EPSG:4326", + bbox=BoundingBox(0, 0, 0.001, 0.001), + ) + flow = build_flow_grid(dem) + slope = slope_along_flow(flow, dem) + assert (slope >= MANNING_MIN_SLOPE).all() + # On a flat DEM, basically every cell is at the floor. + assert slope.max() == pytest.approx(MANNING_MIN_SLOPE, abs=1e-9) + + def test_inclined_dem_yields_finite_slopes(self) -> None: + # 10x10 DEM that drops 1 m per cell from north to south. + nrows, ncols = 10, 10 + elev = np.array( + [[100 - r for c in range(ncols)] for r in range(nrows)], + dtype=np.float32, + ) + transform = Affine.translation(0, 0) * Affine.scale(0.0001, -0.0001) + dem = DEM( + elevation=elev, + transform=transform, + crs="EPSG:4326", + bbox=BoundingBox(0, 0, 0.001, 0.001), + ) + flow = build_flow_grid(dem) + slope = slope_along_flow(flow, dem) + # Slopes must be positive and at least min_slope; some cells will + # have substantial slopes (~1m drop / ~10m pixel = 0.1 m/m). + assert (slope >= MANNING_MIN_SLOPE).all() + # Most cells should have actual slopes well above the floor. + substantial = (slope > 0.001).sum() + assert substantial > 0 + + def test_robit_bata_slope_distribution( + self, + robit_bata_dem: DEM, + robit_bata_flow_grid: FlowGrid, + ) -> None: + # Robit Bata is a real mountain catchment — expect slopes ranging + # from the floor (valley flats) up to several percent (steep + # hillsides). Median should be in mountain-catchment range. + slope = slope_along_flow(robit_bata_flow_grid, robit_bata_dem) + assert slope.shape == robit_bata_dem.shape + assert (slope >= MANNING_MIN_SLOPE).all() + # Median slope > 1% — verifies we're picking up real terrain + # gradients, not just flat-fill artefacts. + assert float(np.median(slope)) > 0.01 + # Max slope should be substantial but not absurd (no cliffs in + # this part of Ethiopia). + assert float(slope.max()) < 1.0 # less than 100% (45 deg)