diff --git a/tools/clean_agg_metrics.py b/tools/clean_agg_metrics.py
new file mode 100755
index 0000000..6d2062e
--- /dev/null
+++ b/tools/clean_agg_metrics.py
@@ -0,0 +1,367 @@
+import argparse
+import logging
+import sys
+from datetime import datetime
+from pathlib import Path
+from typing import List, Optional, Tuple
+
+import fsspec
+import pandas as pd
+
+# Identifier columns that must be read as strings so that values such as HUC
+# codes keep their leading zeros when a file is read, cleaned and rewritten.
+STRING_COLUMN_DTYPES = {
+    "hucs": str,
+    "nws_lid": str,
+    "stac_item_id": str,
+    "scenario": str,
+    "flow": str,
+    "collection_id": str,
+}
+
+
+def setup_logging(log_file: str) -> logging.Logger:
+    """
+    Set up logging configuration.
+
+    Args:
+        log_file: Path to the log file
+
+    Returns:
+        Configured logger instance
+    """
+    # Create logger
+    logger = logging.getLogger(__name__)
+    logger.setLevel(logging.INFO)
+
+    # Console handler
+    console_handler = logging.StreamHandler()
+    console_handler.setLevel(logging.INFO)
+    console_format = logging.Formatter("%(asctime)s - %(levelname)s - %(message)s")
+    console_handler.setFormatter(console_format)
+
+    # File handler
+    file_handler = logging.FileHandler(log_file)
+    file_handler.setLevel(logging.INFO)
+    file_format = logging.Formatter("%(asctime)s - %(levelname)s - %(message)s")
+    file_handler.setFormatter(file_format)
+
+    # Add handlers
+    logger.addHandler(console_handler)
+    logger.addHandler(file_handler)
+
+    return logger
+
+
+def find_agg_metrics_files(s3_prefix: str, batch: str, aws_profile: str = "fimc-data") -> List[str]:
+    """
+    Find all agg_metrics.csv files under the batch directory.
+
+    Args:
+        s3_prefix: S3 prefix path (e.g., "s3://fimc-data/autoeval/batches/")
+        batch: Batch directory name (e.g., "fim100_huc12_5m_non_calibrated")
+        aws_profile: AWS profile to use for S3 access
+
+    Returns:
+        List of S3 paths to agg_metrics.csv files
+
+    Raises:
+        RuntimeError: If listing the batch directory or checking a file fails
+    """
+    # Construct full batch path
+    batch_path = f"{s3_prefix.rstrip('/')}/{batch}"
+
+    # Initialize S3 filesystem
+    fs = fsspec.filesystem("s3", profile=aws_profile)
+
+    agg_metrics_files = []
+
+    try:
+        # List all run directories under the batch
+        batch_items = fs.ls(batch_path, detail=True)
+
+        for item in batch_items:
+            if item["type"] == "directory":
+                run_dir = item["name"]
+                run_name = run_dir.split("/")[-1]
+
+                # Construct expected agg_metrics.csv path
+                agg_metrics_path = f"s3://{run_dir}/{run_name}__agg_metrics.csv"
+
+                # Check if file exists
+                if fs.exists(agg_metrics_path.replace("s3://", "")):
+                    agg_metrics_files.append(agg_metrics_path)
+
+    except Exception as e:
+        raise RuntimeError(f"Error listing S3 directories in {batch_path}: {e}") from e
+
+    return agg_metrics_files
+
+
+def get_run_directory_name(file_path: str) -> str:
+    """
+    Extract the run directory name from the file path.
+
+    Args:
+        file_path: Full S3 path to the agg_metrics.csv file
+
+    Returns:
+        Run directory name
+    """
+    # Split path and get parent directory name
+    path_parts = file_path.rstrip("/").split("/")
+    # The run directory is the parent of the CSV file
+    return path_parts[-2]
+
+
+def clean_agg_metrics_data(df: pd.DataFrame, run_dir_name: str, logger: logging.Logger) -> Tuple[pd.DataFrame, dict]:
+    """
+    Clean the agg_metrics dataframe.
+
+    Args:
+        df: Original dataframe
+        run_dir_name: Name of the run directory
+        logger: Logger instance
+
+    Returns:
+        Tuple of (cleaned dataframe, statistics dictionary)
+    """
+    stats = {"original_rows": len(df), "rows_removed_stac_mismatch": 0, "rows_removed_duplicates": 0, "final_rows": 0}
+
+    # Step 1: Filter rows where stac_item_id matches run directory
+    if "stac_item_id" in df.columns:
+        valid_mask = df["stac_item_id"] == run_dir_name
+        invalid_rows = int((~valid_mask).sum())
+
+        if invalid_rows > 0:
+            logger.info(f" Removing {invalid_rows} rows with non-matching stac_item_id")
+            df = df[valid_mask].copy()
+            stats["rows_removed_stac_mismatch"] = invalid_rows
+    else:
+        logger.warning(" Column 'stac_item_id' not found in dataframe")
+
+    # Step 2: Handle duplicates based on (collection_id, stac_item_id, scenario)
+    index_cols = ["collection_id", "stac_item_id", "scenario"]
+
+    # Check if all index columns exist
+    missing_cols = [col for col in index_cols if col not in df.columns]
+    if missing_cols:
+        logger.warning(f" Missing index columns: {missing_cols}")
+    else:
+        # Find duplicates
+        duplicated_mask = df.duplicated(subset=index_cols, keep=False)
+
+        if duplicated_mask.any():
+            # Process duplicates
+            df_no_dups = df[~duplicated_mask].copy()
+            df_dups = df[duplicated_mask].copy()
+
+            # For duplicates, calculate combined score and keep best
+            if "true_positives_count" in df.columns and "false_positives_count" in df.columns:
+                df_dups["combined_score"] = df_dups["true_positives_count"].fillna(0) + df_dups[
+                    "false_positives_count"
+                ].fillna(0)
+
+                # Group by index columns and keep row with max combined score.
+                # dropna=False keeps groups whose key contains NaN; duplicated()
+                # treats NaN keys as equal, so without it those rows would be
+                # flagged as duplicates and then silently dropped altogether.
+                idx_to_keep = df_dups.groupby(index_cols, dropna=False)["combined_score"].idxmax()
+                df_best = df_dups.loc[idx_to_keep].drop(columns=["combined_score"])
+
+                # Combine non-duplicates with best duplicates
+                df = pd.concat([df_no_dups, df_best], ignore_index=True)
+
+                rows_removed = len(df_dups) - len(df_best)
+                if rows_removed > 0:
+                    logger.info(f" Removed {rows_removed} duplicate rows")
+                    stats["rows_removed_duplicates"] = rows_removed
+            else:
+                logger.warning(" Columns for scoring not found, keeping first duplicate")
+                df = df.drop_duplicates(subset=index_cols, keep="first")
+
+    stats["final_rows"] = len(df)
+    return df, stats
+
+
+def backup_file(file_path: str, fs: fsspec.AbstractFileSystem, logger: logging.Logger) -> bool:
+    """
+    Create a backup of the original file.
+
+    Args:
+        file_path: S3 path to the file
+        fs: Filesystem instance
+        logger: Logger instance
+
+    Returns:
+        True if backup was successful
+    """
+    # Append the suffix instead of str.replace(): replace() would also rewrite
+    # any ".csv" occurring earlier in the path (e.g. in a directory name).
+    backup_path = f"{file_path}.backup"
+    s3_path = file_path.replace("s3://", "")
+    s3_backup_path = backup_path.replace("s3://", "")
+
+    try:
+        # Copy file to backup
+        fs.copy(s3_path, s3_backup_path)
+        logger.info(f" Created backup: {backup_path}")
+        return True
+    except Exception as e:
+        logger.error(f" Failed to create backup: {e}")
+        return False
+
+
+def process_agg_metrics_file(
+    file_path: str, aws_profile: str, logger: logging.Logger, dry_run: bool = False
+) -> Optional[dict]:
+    """
+    Process a single agg_metrics.csv file.
+
+    Args:
+        file_path: S3 path to the agg_metrics.csv file
+        aws_profile: AWS profile to use
+        logger: Logger instance
+        dry_run: If True, don't modify files
+
+    Returns:
+        Statistics dictionary or None if processing failed
+    """
+    logger.info(f"Processing: {file_path}")
+
+    # Get run directory name
+    run_dir_name = get_run_directory_name(file_path)
+    logger.info(f" Run directory: {run_dir_name}")
+
+    # Initialize filesystem
+    fs = fsspec.filesystem("s3", profile=aws_profile)
+
+    try:
+        # Read the CSV file. Identifier columns are read as strings; letting
+        # pandas infer numeric types would strip leading zeros, which breaks
+        # the stac_item_id comparison and corrupts the rewritten file.
+        with fsspec.open(file_path, "r", s3={"profile": aws_profile}) as f:
+            df = pd.read_csv(f, dtype=STRING_COLUMN_DTYPES)
+
+        if df.empty:
+            logger.warning(" File is empty, skipping")
+            return None
+
+        df_cleaned, stats = clean_agg_metrics_data(df, run_dir_name, logger)
+
+        # Check if any changes were made
+        if stats["rows_removed_stac_mismatch"] > 0 or stats["rows_removed_duplicates"] > 0:
+            if not dry_run:
+                # Create backup
+                if not backup_file(file_path, fs, logger):
+                    logger.error(" Skipping modification due to backup failure")
+                    return None
+
+                # Write cleaned data back
+                with fsspec.open(file_path, "w", s3={"profile": aws_profile}) as f:
+                    df_cleaned.to_csv(f, index=False)
+
+                logger.info(f"Modified: {stats['original_rows']} to {stats['final_rows']} rows")
+            else:
+                logger.info(f" [DRY RUN] Would modify: {stats['original_rows']} to {stats['final_rows']} rows")
+
+            return stats
+        else:
+            logger.info(" No changes needed")
+            return None
+
+    except Exception as e:
+        logger.error(f" Error processing file: {e}")
+        return None
+
+
+def main():
+    """
+    Clean agg_metrics.csv files in S3 batch directories.
+
+    This script:
+    1. Finds all agg_metrics.csv files under specified S3 batch directories
+    2. Validates that stac_item_id matches the parent run directory name
+    3. Handles duplicates based on (collection_id, stac_item_id, scenario) index
+    4. Creates backups before modifying files
+    5. Logs all modifications
+    """
+
+    parser = argparse.ArgumentParser(description="Clean agg_metrics.csv files in S3 batch directories")
+
+    parser.add_argument(
+        "--s3-prefix",
+        required=True,
+        help="S3 prefix path before batch directories (e.g., s3://fimc-data/autoeval/batches/)",
+    )
+
+    parser.add_argument("--batch", required=True, help="Batch directory name (e.g., fim100_huc12_5m_non_calibrated)")
+
+    parser.add_argument(
+        "--log-file", default="agg_metrics_cleaning.log", help="Path to log file (default: agg_metrics_cleaning.log)"
+    )
+
+    parser.add_argument("--profile", default="fimbucket", help="AWS profile for S3 access (default: %(default)s)")
+
+    parser.add_argument("--dry-run", action="store_true", help="Preview changes without modifying files")
+
+    args = parser.parse_args()
+
+    # Setup logging
+    logger = setup_logging(args.log_file)
+
+    # Log start
+    logger.info("=" * 80)
+    logger.info(f"Starting agg_metrics cleaning - {datetime.now()}")
+    logger.info(f"S3 Prefix: {args.s3_prefix}")
+    logger.info(f"Batch: {args.batch}")
+    logger.info(f"Dry Run: {args.dry_run}")
+    logger.info("=" * 80)
+
+    try:
+        # Find all agg_metrics.csv files
+        logger.info("Searching for agg_metrics.csv files...")
+        agg_files = find_agg_metrics_files(args.s3_prefix, args.batch, args.profile)
+
+        if not agg_files:
+            logger.warning("No agg_metrics.csv files found")
+            return 0
+
+        logger.info(f"Found {len(agg_files)} agg_metrics.csv files")
+
+        # Process each file
+        total_stats = {"files_processed": 0, "files_modified": 0, "total_rows_removed": 0}
+
+        for file_path in agg_files:
+            stats = process_agg_metrics_file(file_path, args.profile, logger, args.dry_run)
+
+            if stats:
+                total_stats["files_modified"] += 1
+                total_stats["total_rows_removed"] += (
+                    stats["rows_removed_stac_mismatch"] + stats["rows_removed_duplicates"]
+                )
+
+            total_stats["files_processed"] += 1
+
+        # Log summary
+        logger.info("=" * 80)
+        logger.info("SUMMARY")
+        logger.info(f"Files processed: {total_stats['files_processed']}")
+        logger.info(f"Files modified: {total_stats['files_modified']}")
+        logger.info(f"Total rows removed: {total_stats['total_rows_removed']}")
+
+        if args.dry_run:
+            logger.info("DRY RUN - No files were actually modified")
+
+        logger.info(f"Log file: {args.log_file}")
+        logger.info("=" * 80)
+
+        return 0
+
+    except Exception as e:
+        logger.error(f"Fatal error: {e}")
+        return 1
+
+
+if __name__ == "__main__":
+    sys.exit(main())
diff --git a/tools/make_master_metrics.py b/tools/make_master_metrics.py
new file mode 100755
index 0000000..61e5c8c
--- /dev/null
+++ b/tools/make_master_metrics.py
@@ -0,0 +1,183 @@
+import argparse
+import logging
+import sys
+from pathlib import Path
+from typing import List, Optional
+
+import fsspec
+import pandas as pd
+
+logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")
+logger = logging.getLogger(__name__)
+
+
+def get_stac_item_directories(output_root: str) -> List[tuple[str, str]]:
+    """
+    Get all STAC item subdirectories from the output root.
+
+    Args:
+        output_root: Root directory (local path or s3:// URL) to scan
+
+    Returns:
+        List of tuples (stac_item_code, full_path)
+    """
+    stac_item_dirs = []
+
+    if output_root.startswith("s3://"):
+        fs = fsspec.filesystem("s3", anon=False)
+        try:
+            items = fs.ls(output_root, detail=True)
+            for item in items:
+                if item["type"] == "directory":
+                    stac_item_code = item["name"].split("/")[-1]
+                    full_path = item["name"]
+                    if not full_path.endswith("/"):
+                        full_path += "/"
+                    stac_item_dirs.append((stac_item_code, full_path))
+        except Exception as e:
+            logger.error(f"Error listing S3 directories: {e}")
+    else:
+        # Local filesystem
+        output_path = Path(output_root)
+        if output_path.exists() and output_path.is_dir():
+            for item in sorted(output_path.iterdir()):
+                if item.is_dir():
+                    stac_item_dirs.append((item.name, str(item)))
+
+    return stac_item_dirs
+
+
+def read_agg_metrics(agg_metrics_path: str) -> Optional[pd.DataFrame]:
+    """
+    Read an agg_metrics.csv file.
+
+    Args:
+        agg_metrics_path: Path to the agg_metrics.csv file
+
+    Returns:
+        DataFrame or None if file doesn't exist or can't be read
+    """
+    try:
+        with fsspec.open(agg_metrics_path, "r") as f:
+            # Specify dtype for columns that should remain as strings
+            # to preserve leading zeros in HUC codes
+            dtype_spec = {
+                "hucs": str,
+                "nws_lid": str,
+                "stac_item_id": str,
+                "scenario": str,
+                "flow": str,
+                "collection_id": str,
+            }
+            df = pd.read_csv(f, dtype=dtype_spec)
+            return df
+    except FileNotFoundError:
+        logger.warning(f"File not found: {agg_metrics_path}")
+        return None
+    except Exception as e:
+        logger.error(f"Error reading {agg_metrics_path}: {e}")
+        return None
+
+
+def aggregate_metrics(output_root: str, calb: bool, hand_version: str, resolution: str) -> pd.DataFrame:
+    """
+    Aggregate all agg_metrics.csv files from HUC subdirectories.
+
+    Args:
+        output_root: Root directory containing HUC subdirectories
+        calb: Calibration flag (True/False)
+        hand_version: HAND version value
+        resolution: Resolution in meters
+
+    Returns:
+        Combined DataFrame with all metrics
+    """
+    all_metrics = []
+
+    stac_item_dirs = get_stac_item_directories(output_root)
+
+    if not stac_item_dirs:
+        logger.warning(f"No subdirectories found in {output_root}")
+        return pd.DataFrame()
+
+    logger.info(f"Found {len(stac_item_dirs)} STAC item directories")
+
+    for stac_item_code, stac_item_path in stac_item_dirs:
+        if stac_item_path.startswith("s3://"):
+            agg_metrics_path = f"{stac_item_path.rstrip('/')}/{stac_item_code}__agg_metrics.csv"
+        elif output_root.startswith("s3://"):
+            # Handle case where stac_item_path doesn't have s3:// prefix but output_root does
+            agg_metrics_path = f"s3://{stac_item_path.rstrip('/')}/{stac_item_code}__agg_metrics.csv"
+        else:
+            agg_metrics_path = str(Path(stac_item_path) / f"{stac_item_code}__agg_metrics.csv")
+
+        df = read_agg_metrics(agg_metrics_path)
+
+        if df is not None and not df.empty:
+            # Add new columns
+            df["calibrated"] = "True" if calb else "False"
+            df["version"] = hand_version
+            df["resolution_m"] = resolution
+            df["extent_config"] = "COMP"
+            df["full_json_path"] = "null"
+
+            all_metrics.append(df)
+            logger.info(f"Processed {len(df)} rows from STAC item {stac_item_code}")
+        else:
+            logger.warning(f"No valid data found for STAC item {stac_item_code}")
+
+    if not all_metrics:
+        logger.warning("No valid agg_metrics.csv files were found")
+        return pd.DataFrame()
+
+    # Combine all DataFrames
+    combined_df = pd.concat(all_metrics, ignore_index=True)
+    logger.info(f"Combined {len(combined_df)} total rows from {len(all_metrics)} STAC items")
+
+    return combined_df
+
+
+def main():
+    """Parse CLI arguments, aggregate the metrics and write master_metrics.csv; exit with status 1 on failure."""
+    parser = argparse.ArgumentParser(
+        description="Aggregate agg_metrics.csv files from multiple STAC items into a master_metrics.csv file"
+    )
+
+    parser.add_argument("output_root", help="Root directory containing STAC item subdirectories (can be S3 path)")
+
+    parser.add_argument("--calb", action="store_true", help="Set calibration flag to True (default: False)")
+
+    parser.add_argument("--hand-version", required=True, help="HAND version value to add to all rows")
+
+    parser.add_argument("--resolution", required=True, help="Resolution in meters to add to all rows")
+
+    args = parser.parse_args()
+
+    # Clean up output root path
+    output_root = args.output_root.rstrip("/")
+
+    logger.info(f"Starting aggregation from {output_root}")
+    logger.info(f"Parameters: calb={args.calb}, hand_version={args.hand_version}, resolution={args.resolution}")
+
+    master_df = aggregate_metrics(output_root, args.calb, args.hand_version, args.resolution)
+
+    if master_df.empty:
+        logger.error("No data to write to master_metrics.csv")
+        sys.exit(1)
+
+    if output_root.startswith("s3://"):
+        master_metrics_path = f"{output_root}/master_metrics.csv"
+    else:
+        master_metrics_path = str(Path(output_root) / "master_metrics.csv")
+
+    try:
+        with fsspec.open(master_metrics_path, "w") as f:
+            master_df.to_csv(f, index=False)
+        logger.info(f"Successfully wrote {len(master_df)} rows to {master_metrics_path}")
+    except Exception as e:
+        logger.error(f"Error writing master_metrics.csv: {e}")
+        sys.exit(1)
+
+
+if __name__ == "__main__":
+    main()