Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
.PHONY: all format test install download upload docker documentation data calibrate publish-local-area clean build paper clean-paper presentations database database-refresh promote-database promote-dataset
.PHONY: all format test install download upload docker documentation data validate-data calibrate publish-local-area clean build paper clean-paper presentations database database-refresh promote-database promote-dataset

HF_CLONE_DIR ?= $(HOME)/huggingface/policyengine-us-data

Expand All @@ -25,7 +25,7 @@ upload:

docker:
docker buildx build --platform linux/amd64 . -t policyengine-us-data:latest

documentation:
cd docs && \
rm -rf _build .jupyter_cache && \
Expand Down Expand Up @@ -101,6 +101,9 @@ calibrate: data
publish-local-area:
python policyengine_us_data/datasets/cps/local_area_calibration/publish_local_area.py

validate-data:
python -c "from policyengine_us_data.storage.upload_completed_datasets import validate_all_datasets; validate_all_datasets()"

clean:
rm -f policyengine_us_data/storage/*.h5
rm -f policyengine_us_data/storage/*.db
Expand Down
1 change: 1 addition & 0 deletions changelog.d/add-dataset-sanity-tests.added.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Hardened data pipeline against corrupted dataset uploads: pre-upload validation gate, post-generation assertions in enhanced CPS and sparse builders, CI workflow safety guards, file size checks, and comprehensive sanity tests for all dataset variants (5 layers of defense).
202 changes: 202 additions & 0 deletions oregon_ctc_analysis.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,202 @@
"""
Oregon Child Tax Credit Analysis by State Senate District

Calculates the impact of doubling Oregon's Young Child Tax Credit (or_ctc)
by State Legislative District Upper (SLDU) - i.e., State Senate districts.
"""

import numpy as np
import pandas as pd
from pathlib import Path
from policyengine_us import Microsimulation
from policyengine_core.reforms import Reform

# Local imports
from policyengine_us_data.datasets.cps.local_area_calibration.block_assignment import (
assign_geography_for_cd,
load_block_crosswalk,
)
from policyengine_us_data.storage import STORAGE_FOLDER

# Oregon congressional districts (119th Congress)
# Oregon has 6 CDs, geoid format: state_fips * 100 + district
# Oregon FIPS = 41, so: 4101, 4102, 4103, 4104, 4105, 4106
OREGON_CD_GEOIDS = [4101, 4102, 4103, 4104, 4105, 4106]


def load_district_data(cd_geoid: int) -> dict:
    """Load key household-level arrays from a district H5 file.

    Args:
        cd_geoid: Congressional district geoid (state FIPS * 100 + district),
            e.g. 4101 for OR-01.

    Returns:
        dict mapping variable name -> 1-D array, containing only the
        variables present in the file.

    Raises:
        FileNotFoundError: if the district H5 file does not exist.
    """
    district_file = STORAGE_FOLDER / "districts" / f"OR-{cd_geoid % 100:02d}.h5"
    if not district_file.exists():
        raise FileNotFoundError(f"District file not found: {district_file}")

    import h5py

    wanted = (
        "household_weight",
        "household_id",
        "person_id",
        "age",
        "is_tax_unit_head",
        "tax_unit_id",
    )
    loaded = {}
    with h5py.File(district_file, "r") as handle:
        for name in wanted:
            if name not in handle:
                continue
            values = handle[name][:]
            # Collapse a (n, years) array to the first year's column.
            if values.ndim > 1:
                values = values[:, 0]
            loaded[name] = values
    return loaded


def _build_double_or_ctc_reform():
    """Build the Reform class that doubles Oregon's Young Child Tax Credit.

    The reform does not depend on any per-district state, so it is
    constructed once here instead of being redefined (a new closure and a
    new class object) on every loop iteration.
    """

    def double_or_ctc(parameters):
        # Double the max credit amount from 2024 onward.
        or_ctc = parameters.gov.states.or_.tax.income.credits.ctc
        or_ctc.amount.update(
            start=pd.Timestamp("2024-01-01"),
            stop=pd.Timestamp("2100-12-31"),
            value=or_ctc.amount("2024-01-01") * 2,
        )
        return parameters

    class DoubleORCTC(Reform):
        def apply(self):
            self.modify_parameters(double_or_ctc)

    return DoubleORCTC


def run_oregon_ctc_analysis():
    """Run the Oregon CTC analysis by state senate district.

    For each Oregon congressional district H5 file, simulates baseline and
    doubled ``or_ctc`` for 2024, assigns households to state senate
    districts (SLDUs) via block assignment, aggregates weighted totals by
    SLDU, prints a summary table, and writes ``oregon_ctc_by_sldu.csv``.

    Returns:
        pd.DataFrame: one row per SLDU with baseline/reform/impact totals
        (both raw dollars and millions) and household counts.
    """
    print("=" * 60)
    print("Oregon Child Tax Credit Analysis by State Senate District")
    print("=" * 60)

    # Load block crosswalk for SLDU lookups (FIPS "41" = Oregon).
    print("\nLoading block crosswalk...")
    crosswalk = load_block_crosswalk()
    oregon_blocks = crosswalk[crosswalk["block_geoid"].str[:2] == "41"]
    print(f" Oregon blocks: {len(oregon_blocks):,}")
    print(f" Unique SLDUs: {oregon_blocks['sldu'].nunique()}")

    # Totals accumulated across congressional districts, keyed by SLDU code.
    results_by_sldu = {}

    # Loop-invariant: build the doubled-CTC reform once.
    reform_class = _build_double_or_ctc_reform()

    print("\nProcessing Oregon congressional districts...")

    for cd_geoid in OREGON_CD_GEOIDS:
        cd_name = f"OR-{cd_geoid % 100:02d}"
        print(f"\n Processing {cd_name}...")

        # Skip missing district files rather than failing the whole run.
        h5_path = STORAGE_FOLDER / "districts" / f"{cd_name}.h5"
        if not h5_path.exists():
            print(" Skipping - file not found")
            continue

        # Baseline simulation for this district.
        baseline = Microsimulation(dataset=str(h5_path))
        baseline_ctc = baseline.calculate("or_ctc", 2024)
        baseline_weights = baseline.calculate("household_weight", 2024)

        # Reform simulation: doubled OR CTC max amounts.
        reform = Microsimulation(dataset=str(h5_path), reform=reform_class)
        reform_ctc = reform.calculate("or_ctc", 2024)

        # Number of households drives the block assignment.
        n_households = len(baseline_weights)
        print(f" Households: {n_households:,}")

        # Assign blocks and get the SLDU for each household.
        geo = assign_geography_for_cd(
            cd_geoid=str(cd_geoid),
            n_households=n_households,
            seed=cd_geoid,  # Reproducible
        )
        sldu_assignments = geo["sldu"]

        # Per-household change in credit from the reform.
        impact = reform_ctc - baseline_ctc

        # Aggregate weighted totals by SLDU, ignoring unassigned ("").
        unique_sldus = np.unique(sldu_assignments[sldu_assignments != ""])
        for sldu in unique_sldus:
            mask = sldu_assignments == sldu
            totals = results_by_sldu.setdefault(
                sldu,
                {
                    "baseline_ctc": 0,
                    "reform_ctc": 0,
                    "impact": 0,
                    "households": 0,
                    "weighted_households": 0,
                },
            )
            totals["baseline_ctc"] += np.sum(
                baseline_ctc[mask] * baseline_weights[mask]
            )
            totals["reform_ctc"] += np.sum(
                reform_ctc[mask] * baseline_weights[mask]
            )
            totals["impact"] += np.sum(impact[mask] * baseline_weights[mask])
            totals["households"] += np.sum(mask)
            totals["weighted_households"] += np.sum(baseline_weights[mask])

    # Create results DataFrame
    print("\n" + "=" * 60)
    print("RESULTS: Impact of Doubling Oregon CTC by State Senate District")
    print("=" * 60)

    df = pd.DataFrame.from_dict(results_by_sldu, orient="index")
    df.index.name = "sldu"
    df = df.reset_index()

    # Convert dollar totals to millions for display.
    df["baseline_ctc_millions"] = df["baseline_ctc"] / 1e6
    df["reform_ctc_millions"] = df["reform_ctc"] / 1e6
    df["impact_millions"] = df["impact"] / 1e6

    # Sort by impact, largest first.
    df = df.sort_values("impact_millions", ascending=False)

    # Display results table.
    print(
        f"\n{'SLDU':<8} {'Baseline':>12} {'Reform':>12} {'Impact':>12} {'Households':>12}"
    )
    print(f"{'':8} {'($M)':>12} {'($M)':>12} {'($M)':>12} {'(weighted)':>12}")
    print("-" * 60)

    for _, row in df.iterrows():
        print(
            f"{row['sldu']:<8} "
            f"{row['baseline_ctc_millions']:>12.2f} "
            f"{row['reform_ctc_millions']:>12.2f} "
            f"{row['impact_millions']:>12.2f} "
            f"{row['weighted_households']:>12,.0f}"
        )

    print("-" * 60)
    total_baseline = df["baseline_ctc_millions"].sum()
    total_reform = df["reform_ctc_millions"].sum()
    total_impact = df["impact_millions"].sum()
    total_hh = df["weighted_households"].sum()
    print(
        f"{'TOTAL':<8} {total_baseline:>12.2f} {total_reform:>12.2f} "
        f"{total_impact:>12.2f} {total_hh:>12,.0f}"
    )

    # Save to CSV
    output_path = Path("oregon_ctc_by_sldu.csv")
    df.to_csv(output_path, index=False)
    print(f"\nResults saved to: {output_path}")

    return df


# Script entry point: run the full analysis when executed directly.
if __name__ == "__main__":
    run_oregon_ctc_analysis()
25 changes: 25 additions & 0 deletions policyengine_us_data/datasets/cps/enhanced_cps.py
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,31 @@ def generate(self):
)
data["household_weight"][year] = optimised_weights

# Validate dense weights
w = optimised_weights
if np.any(np.isnan(w)):
raise ValueError(
f"Year {year}: household_weight contains NaN values"
)
if np.any(w < 0):
raise ValueError(
f"Year {year}: household_weight contains negative values"
)
weighted_hh_count = float(np.sum(w))
if not (1e8 <= weighted_hh_count <= 2e8):
raise ValueError(
f"Year {year}: weighted household count "
f"{weighted_hh_count:,.0f} outside expected range "
f"[100M, 200M]"
)
logging.info(
f"Year {year}: weights validated — "
f"{weighted_hh_count:,.0f} weighted households, "
f"{int(np.sum(w > 0))} non-zero"
)

logging.info("Post-generation weight validation passed")

self.save_dataset(data)


Expand Down
52 changes: 51 additions & 1 deletion policyengine_us_data/datasets/cps/small_enhanced_cps.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import os
import pandas as pd
import numpy as np
import h5py
Expand All @@ -17,6 +18,19 @@ def create_small_ecps():
)
simulation.subsample(1_000)

# Basic validation that subsample has reasonable data
weights = simulation.calculate("household_weight").values
if np.all(weights == 0):
raise ValueError(
"create_small_ecps: all household weights are zero "
"after subsample"
)
logging.info(
f"create_small_ecps: subsample has "
f"{len(weights)} households, "
f"{int(np.sum(weights > 0))} with non-zero weight"
)

data = {}
for variable in simulation.tax_benefit_system.variables:
data[variable] = {}
Expand Down Expand Up @@ -75,6 +89,16 @@ def create_sparse_ecps():
h_ids = h_ids[h_weights > 0]
h_weights = h_weights[h_weights > 0]

if len(h_ids) < 1000:
raise ValueError(
f"create_sparse_ecps: only {len(h_ids)} households with "
f"non-zero weight (expected > 1000)"
)
logging.info(
f"create_sparse_ecps: {len(h_ids)} households after "
f"zero-weight filtering"
)

subset_df = df[df[df_household_id_column].isin(h_ids)].copy()

# Update the dataset and rebuild the simulation
Expand Down Expand Up @@ -104,12 +128,38 @@ def create_sparse_ecps():
if len(data[variable]) == 0:
del data[variable]

with h5py.File(STORAGE_FOLDER / "sparse_enhanced_cps_2024.h5", "w") as f:
# Validate critical variables exist before writing
critical_vars = [
"household_weight",
"employment_income",
"household_id",
"person_id",
]
missing = [v for v in critical_vars if v not in data]
if missing:
raise ValueError(
f"create_sparse_ecps: missing critical variables: {missing}"
)
logging.info(f"create_sparse_ecps: data dict has {len(data)} variables")

output_path = STORAGE_FOLDER / "sparse_enhanced_cps_2024.h5"
with h5py.File(output_path, "w") as f:
for variable, periods in data.items():
grp = f.create_group(variable)
for period, values in periods.items():
grp.create_dataset(str(period), data=values)

file_size = os.path.getsize(output_path)
if file_size < 1_000_000:
raise ValueError(
f"create_sparse_ecps: output file only {file_size:,} bytes "
f"(expected > 1MB)"
)
logging.info(
f"create_sparse_ecps: wrote {file_size / 1e6:.1f}MB to "
f"{output_path}"
)


if __name__ == "__main__":
create_small_ecps()
Expand Down

Large diffs are not rendered by default.

Large diffs are not rendered by default.

Large diffs are not rendered by default.

Large diffs are not rendered by default.

Large diffs are not rendered by default.

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
[{"state_fips": "01", "state_abbrev": "AL", "income_tax_collections": 5881000000}, {"state_fips": "02", "state_abbrev": "AK", "income_tax_collections": 0}, {"state_fips": "04", "state_abbrev": "AZ", "income_tax_collections": 5424000000}, {"state_fips": "05", "state_abbrev": "AR", "income_tax_collections": 4352000000}, {"state_fips": "06", "state_abbrev": "CA", "income_tax_collections": 115845000000}, {"state_fips": "08", "state_abbrev": "CO", "income_tax_collections": 13671000000}, {"state_fips": "09", "state_abbrev": "CT", "income_tax_collections": 10716000000}, {"state_fips": "10", "state_abbrev": "DE", "income_tax_collections": 1747000000}, {"state_fips": "11", "state_abbrev": "DC", "income_tax_collections": 3456000000}, {"state_fips": "12", "state_abbrev": "FL", "income_tax_collections": 0}, {"state_fips": "13", "state_abbrev": "GA", "income_tax_collections": 15297000000}, {"state_fips": "15", "state_abbrev": "HI", "income_tax_collections": 2725000000}, {"state_fips": "16", "state_abbrev": "ID", "income_tax_collections": 2593000000}, {"state_fips": "17", "state_abbrev": "IL", "income_tax_collections": 21453000000}, {"state_fips": "18", "state_abbrev": "IN", "income_tax_collections": 8098000000}, {"state_fips": "19", "state_abbrev": "IA", "income_tax_collections": 5243000000}, {"state_fips": "20", "state_abbrev": "KS", "income_tax_collections": 4304000000}, {"state_fips": "21", "state_abbrev": "KY", "income_tax_collections": 6163000000}, {"state_fips": "22", "state_abbrev": "LA", "income_tax_collections": 4088000000}, {"state_fips": "23", "state_abbrev": "ME", "income_tax_collections": 2246000000}, {"state_fips": "24", "state_abbrev": "MD", "income_tax_collections": 11635000000}, {"state_fips": "25", "state_abbrev": "MA", "income_tax_collections": 18645000000}, {"state_fips": "26", "state_abbrev": "MI", "income_tax_collections": 12139000000}, {"state_fips": "27", "state_abbrev": "MN", "income_tax_collections": 14239000000}, {"state_fips": "28", "state_abbrev": 
"MS", "income_tax_collections": 2477000000}, {"state_fips": "29", "state_abbrev": "MO", "income_tax_collections": 9006000000}, {"state_fips": "30", "state_abbrev": "MT", "income_tax_collections": 1718000000}, {"state_fips": "31", "state_abbrev": "NE", "income_tax_collections": 3248000000}, {"state_fips": "32", "state_abbrev": "NV", "income_tax_collections": 0}, {"state_fips": "33", "state_abbrev": "NH", "income_tax_collections": 0}, {"state_fips": "34", "state_abbrev": "NJ", "income_tax_collections": 17947000000}, {"state_fips": "35", "state_abbrev": "NM", "income_tax_collections": 2224000000}, {"state_fips": "36", "state_abbrev": "NY", "income_tax_collections": 63247000000}, {"state_fips": "37", "state_abbrev": "NC", "income_tax_collections": 17171000000}, {"state_fips": "38", "state_abbrev": "ND", "income_tax_collections": 534000000}, {"state_fips": "39", "state_abbrev": "OH", "income_tax_collections": 9520000000}, {"state_fips": "40", "state_abbrev": "OK", "income_tax_collections": 4253000000}, {"state_fips": "41", "state_abbrev": "OR", "income_tax_collections": 11583000000}, {"state_fips": "42", "state_abbrev": "PA", "income_tax_collections": 16898000000}, {"state_fips": "44", "state_abbrev": "RI", "income_tax_collections": 1739000000}, {"state_fips": "45", "state_abbrev": "SC", "income_tax_collections": 6367000000}, {"state_fips": "46", "state_abbrev": "SD", "income_tax_collections": 0}, {"state_fips": "47", "state_abbrev": "TN", "income_tax_collections": 0}, {"state_fips": "48", "state_abbrev": "TX", "income_tax_collections": 0}, {"state_fips": "49", "state_abbrev": "UT", "income_tax_collections": 5464000000}, {"state_fips": "50", "state_abbrev": "VT", "income_tax_collections": 1035000000}, {"state_fips": "51", "state_abbrev": "VA", "income_tax_collections": 17934000000}, {"state_fips": "53", "state_abbrev": "WA", "income_tax_collections": 0}, {"state_fips": "54", "state_abbrev": "WV", "income_tax_collections": 2163000000}, {"state_fips": "55", 
"state_abbrev": "WI", "income_tax_collections": 10396000000}, {"state_fips": "56", "state_abbrev": "WY", "income_tax_collections": 0}]
Loading
Loading