Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 21 additions & 11 deletions src/icefabric/hydrofabric/subset_nhf.py
Original file line number Diff line number Diff line change
Expand Up @@ -410,16 +410,17 @@ def generate_subset_virtual_only(
) -> dict[str, gpd.GeoDataFrame]:
"""Subset hydrofabric for a virtual-only gage (no main-network upstream).

Returns all layers that belong to the ``div_id`` the virtual flowpath
sits in. No graph traversal is performed because virtual-only flowpaths
are leaf nodes — there is no upstream virtual or main network.
Returns all layers and attributes that belong to the ``div_id`` the
virtual flowpath sits in — including ALL virtual flowpaths within the
divide, not just the originating one. No graph traversal is performed
because virtual-only flowpaths are leaf nodes — there is no upstream
virtual or main network.
"""
logger.debug(f"Virtual-only subset: virtual_fp_id={virtual_fp_id}, div_id={div_id}")
crs = source.namespace.crs
div_ids = {int(div_id)}
vfp_ids = {int(virtual_fp_id)}

# Wave 1: direct div_id / virtual_fp_id filters
# Wave 1: direct div_id filters (parallel)
with ThreadPoolExecutor(max_workers=6) as ex:
f = {
"fp": ex.submit(source.load_filtered, "flowpaths", "div_id", div_ids),
Expand All @@ -428,29 +429,36 @@ def generate_subset_virtual_only(
"gages": ex.submit(source.load_filtered, "gages", "div_id", div_ids),
"ref_fp": ex.submit(source.load_filtered, "reference_flowpaths", "div_id", div_ids),
"lakes": ex.submit(source.load_filtered, "lakes", "div_id", div_ids),
"v_fp": ex.submit(source.load_filtered, "virtual_flowpaths", "virtual_fp_id", vfp_ids),
}
subset_fp = f["fp"].result()
subset_div = f["div"].result()
subset_wb = f["wb"].result()
subset_gages = f["gages"].result()
subset_ref_fp = f["ref_fp"].result()
subset_lakes = f["lakes"].result()
subset_v_fp = f["v_fp"].result()

# Derive nexus IDs from the single mainstem flowpath in this divide
# Collect ALL virtual_fp_ids from reference_flowpaths for this divide
# (not just the originating one), plus the originating vfp itself to
# ensure it's always included.
all_v_fp_ids = set(subset_ref_fp["virtual_fp_id"].to_list()) | {int(virtual_fp_id)}
subset_v_fp = source.load_filtered("virtual_flowpaths", "virtual_fp_id", all_v_fp_ids)

# Derive nexus IDs from the mainstem flowpath(s) in this divide
all_nex_ids = set()
if len(subset_fp) > 0:
all_nex_ids = set(
subset_fp.filter(pl.col("up_nex_id").is_not_null())["up_nex_id"].cast(pl.Int64).to_list()
+ subset_fp.filter(pl.col("dn_nex_id").is_not_null())["dn_nex_id"].cast(pl.Int64).to_list()
)

# Derive virtual_nexus from the virtual flowpath
# Derive virtual_nexus from ALL virtual flowpaths in this divide
all_v_nex_ids = set()
if len(subset_v_fp) > 0:
all_v_nex_ids = set(
subset_v_fp.filter(pl.col("dn_virtual_nex_id").is_not_null())["dn_virtual_nex_id"]
subset_v_fp.filter(pl.col("up_virtual_nex_id").is_not_null())["up_virtual_nex_id"]
.cast(pl.Int64)
.to_list()
+ subset_v_fp.filter(pl.col("dn_virtual_nex_id").is_not_null())["dn_virtual_nex_id"]
.cast(pl.Int64)
.to_list()
)
Expand All @@ -476,12 +484,14 @@ def generate_subset_virtual_only(
subset_nhd = source.load_filtered("nhd", "ref_id", all_ref_ids)

# Null downstream pointers (outlets)
subset_v_fp_ids = set(subset_v_fp["virtual_fp_id"].to_list())

subset_nex = subset_nex.with_columns(
pl.when(pl.col("dn_fp_id").is_in(div_ids)).then(pl.col("dn_fp_id")).otherwise(None).alias("dn_fp_id")
)

subset_v_nex = subset_v_nex.with_columns(
pl.when(pl.col("dn_virtual_fp_id").is_in(vfp_ids))
pl.when(pl.col("dn_virtual_fp_id").is_in(subset_v_fp_ids))
.then(pl.col("dn_virtual_fp_id"))
.otherwise(None)
.alias("dn_virtual_fp_id")
Expand Down
Loading