diff --git a/src/icefabric/hydrofabric/subset_nhf.py b/src/icefabric/hydrofabric/subset_nhf.py index 1291281..f425240 100644 --- a/src/icefabric/hydrofabric/subset_nhf.py +++ b/src/icefabric/hydrofabric/subset_nhf.py @@ -410,16 +410,17 @@ def generate_subset_virtual_only( ) -> dict[str, gpd.GeoDataFrame]: """Subset hydrofabric for a virtual-only gage (no main-network upstream). - Returns all layers that belong to the ``div_id`` the virtual flowpath - sits in. No graph traversal is performed because virtual-only flowpaths - are leaf nodes — there is no upstream virtual or main network. + Returns all layers and attributes that belong to the ``div_id`` the + virtual flowpath sits in — including ALL virtual flowpaths within the + divide, not just the originating one. No graph traversal is performed + because virtual-only flowpaths are leaf nodes — there is no upstream + virtual or main network. """ logger.debug(f"Virtual-only subset: virtual_fp_id={virtual_fp_id}, div_id={div_id}") crs = source.namespace.crs div_ids = {int(div_id)} - vfp_ids = {int(virtual_fp_id)} - # Wave 1: direct div_id / virtual_fp_id filters + # Wave 1: direct div_id filters (parallel) with ThreadPoolExecutor(max_workers=6) as ex: f = { "fp": ex.submit(source.load_filtered, "flowpaths", "div_id", div_ids), @@ -428,7 +429,6 @@ def generate_subset_virtual_only( "gages": ex.submit(source.load_filtered, "gages", "div_id", div_ids), "ref_fp": ex.submit(source.load_filtered, "reference_flowpaths", "div_id", div_ids), "lakes": ex.submit(source.load_filtered, "lakes", "div_id", div_ids), - "v_fp": ex.submit(source.load_filtered, "virtual_flowpaths", "virtual_fp_id", vfp_ids), } subset_fp = f["fp"].result() subset_div = f["div"].result() @@ -436,9 +436,14 @@ def generate_subset_virtual_only( subset_gages = f["gages"].result() subset_ref_fp = f["ref_fp"].result() subset_lakes = f["lakes"].result() - subset_v_fp = f["v_fp"].result() - # Derive nexus IDs from the single mainstem flowpath in this divide + # Collect ALL virtual_fp_ids from reference_flowpaths for this divide + # (not just the originating one), plus the originating vfp itself to + # ensure it's always included. + all_v_fp_ids = set(subset_ref_fp["virtual_fp_id"].to_list()) | {int(virtual_fp_id)} + subset_v_fp = source.load_filtered("virtual_flowpaths", "virtual_fp_id", all_v_fp_ids) + + # Derive nexus IDs from the mainstem flowpath(s) in this divide all_nex_ids = set() if len(subset_fp) > 0: all_nex_ids = set( @@ -446,11 +451,14 @@ def generate_subset_virtual_only( + subset_fp.filter(pl.col("dn_nex_id").is_not_null())["dn_nex_id"].cast(pl.Int64).to_list() ) - # Derive virtual_nexus from the virtual flowpath + # Derive virtual_nexus from ALL virtual flowpaths in this divide all_v_nex_ids = set() if len(subset_v_fp) > 0: all_v_nex_ids = set( - subset_v_fp.filter(pl.col("dn_virtual_nex_id").is_not_null())["dn_virtual_nex_id"] + subset_v_fp.filter(pl.col("up_virtual_nex_id").is_not_null())["up_virtual_nex_id"] + .cast(pl.Int64) + .to_list() + + subset_v_fp.filter(pl.col("dn_virtual_nex_id").is_not_null())["dn_virtual_nex_id"] .cast(pl.Int64) .to_list() ) @@ -476,12 +484,14 @@ def generate_subset_virtual_only( subset_nhd = source.load_filtered("nhd", "ref_id", all_ref_ids) # Null downstream pointers (outlets) + subset_v_fp_ids = set(subset_v_fp["virtual_fp_id"].to_list()) + subset_nex = subset_nex.with_columns( pl.when(pl.col("dn_fp_id").is_in(div_ids)).then(pl.col("dn_fp_id")).otherwise(None).alias("dn_fp_id") ) subset_v_nex = subset_v_nex.with_columns( - pl.when(pl.col("dn_virtual_fp_id").is_in(vfp_ids)) + pl.when(pl.col("dn_virtual_fp_id").is_in(subset_v_fp_ids)) .then(pl.col("dn_virtual_fp_id")) .otherwise(None) .alias("dn_virtual_fp_id")