From 36229b0917125723a1785db330aceafc674d3172 Mon Sep 17 00:00:00 2001 From: Dylan Lee Date: Thu, 28 Aug 2025 11:04:03 -0400 Subject: [PATCH 1/2] Add tools scripts to deal with metrics files clean_agg_metrics.py removes duplicate metrics rows. Duplicates get introduced occasionaly because of minor differences in the floating point values of metrics. Right now this script also removes metrics that aren't assocaiated with the stac item id implied by the aoi name in the directory. This can be undone in future if metrics from other stac items need to be included within a given stac items aoi make_master_metrics.py makes a master_metrics file from all the agg_metrics files in an eval batch --- tools/clean_agg_metrics.py | 385 +++++++++++++++++++++++++++++++++++ tools/make_master_metrics.py | 179 ++++++++++++++++ 2 files changed, 564 insertions(+) create mode 100755 tools/clean_agg_metrics.py create mode 100755 tools/make_master_metrics.py diff --git a/tools/clean_agg_metrics.py b/tools/clean_agg_metrics.py new file mode 100755 index 0000000..1a28004 --- /dev/null +++ b/tools/clean_agg_metrics.py @@ -0,0 +1,385 @@ +import argparse +import logging +import sys +from datetime import datetime +from pathlib import Path +from typing import List, Optional, Tuple + +import fsspec +import pandas as pd + + +def setup_logging(log_file: str) -> logging.Logger: + """ + Set up logging configuration. + + Args: + log_file: Path to the log file + + Returns: + Configured logger instance + """ + # Create logger + logger = logging.getLogger(__name__) + logger.setLevel(logging.INFO) + + # Console handler + console_handler = logging.StreamHandler() + console_handler.setLevel(logging.INFO) + console_format = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s') + console_handler.setFormatter(console_format) + + # File handler + file_handler = logging.FileHandler(log_file) + file_handler.setLevel(logging.INFO) + file_format = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s') + file_handler.setFormatter(file_format) + + # Add handlers + logger.addHandler(console_handler) + logger.addHandler(file_handler) + + return logger + + +def find_agg_metrics_files(s3_prefix: str, batch: str, aws_profile: str = "fimc-data") -> List[str]: + """ + Find all agg_metrics.csv files under the batch directory. + + Args: + s3_prefix: S3 prefix path (e.g., "s3://fimc-data/autoeval/batches/") + batch: Batch directory name (e.g., "fim100_huc12_5m_non_calibrated") + aws_profile: AWS profile to use for S3 access + + Returns: + List of S3 paths to agg_metrics.csv files + """ + # Construct full batch path + batch_path = f"{s3_prefix.rstrip('/')}/{batch}" + + # Initialize S3 filesystem + fs = fsspec.filesystem("s3", profile=aws_profile) + + agg_metrics_files = [] + + try: + # List all run directories under the batch + batch_items = fs.ls(batch_path, detail=True) + + for item in batch_items: + if item["type"] == "directory": + run_dir = item["name"] + run_name = run_dir.split("/")[-1] + + # Construct expected agg_metrics.csv path + agg_metrics_path = f"s3://{run_dir}/{run_name}__agg_metrics.csv" + + # Check if file exists + if fs.exists(agg_metrics_path.replace("s3://", "")): + agg_metrics_files.append(agg_metrics_path) + + except Exception as e: + raise RuntimeError(f"Error listing S3 directories in {batch_path}: {e}") + + return agg_metrics_files + + +def get_run_directory_name(file_path: str) -> str: + """ + Extract the run directory name from the file path. + + Args: + file_path: Full S3 path to the agg_metrics.csv file + + Returns: + Run directory name + """ + # Split path and get parent directory name + path_parts = file_path.rstrip("/").split("/") + # The run directory is the parent of the CSV file + return path_parts[-2] + + +def clean_agg_metrics_data( + df: pd.DataFrame, + run_dir_name: str, + logger: logging.Logger +) -> Tuple[pd.DataFrame, dict]: + """ + Clean the agg_metrics dataframe. + + Args: + df: Original dataframe + run_dir_name: Name of the run directory + logger: Logger instance + + Returns: + Tuple of (cleaned dataframe, statistics dictionary) + """ + stats = { + "original_rows": len(df), + "rows_removed_stac_mismatch": 0, + "rows_removed_duplicates": 0, + "final_rows": 0 + } + + # Step 1: Filter rows where stac_item_id matches run directory + if "stac_item_id" in df.columns: + valid_mask = df["stac_item_id"] == run_dir_name + invalid_rows = (~valid_mask).sum() + + if invalid_rows > 0: + logger.info(f" Removing {invalid_rows} rows with non-matching stac_item_id") + df = df[valid_mask].copy() + stats["rows_removed_stac_mismatch"] = invalid_rows + else: + logger.warning(" Column 'stac_item_id' not found in dataframe") + + # Step 2: Handle duplicates based on (collection_id, stac_item_id, scenario) + index_cols = ["collection_id", "stac_item_id", "scenario"] + + # Check if all index columns exist + missing_cols = [col for col in index_cols if col not in df.columns] + if missing_cols: + logger.warning(f" Missing index columns: {missing_cols}") + else: + # Find duplicates + duplicated_mask = df.duplicated(subset=index_cols, keep=False) + + if duplicated_mask.any(): + # Process duplicates + df_no_dups = df[~duplicated_mask].copy() + df_dups = df[duplicated_mask].copy() + + # For duplicates, calculate combined score and keep best + if "true_positives_count" in df.columns and "false_positives_count" in df.columns: + df_dups["combined_score"] = ( + df_dups["true_positives_count"].fillna(0) + + df_dups["false_positives_count"].fillna(0) + ) + + # Group by index columns and keep row with max combined score + idx_to_keep = df_dups.groupby(index_cols)["combined_score"].idxmax() + df_best = df_dups.loc[idx_to_keep].drop(columns=["combined_score"]) + + # Combine non-duplicates with best duplicates + df = pd.concat([df_no_dups, df_best], ignore_index=True) + + rows_removed = len(df_dups) - len(df_best) + if rows_removed > 0: + logger.info(f" Removed {rows_removed} duplicate rows") + stats["rows_removed_duplicates"] = rows_removed + else: + logger.warning(" Columns for scoring not found, keeping first duplicate") + df = df.drop_duplicates(subset=index_cols, keep="first") + + stats["final_rows"] = len(df) + return df, stats + + +def backup_file(file_path: str, fs: fsspec.AbstractFileSystem, logger: logging.Logger) -> bool: + """ + Create a backup of the original file. + + Args: + file_path: S3 path to the file + fs: Filesystem instance + logger: Logger instance + + Returns: + True if backup was successful + """ + backup_path = file_path.replace(".csv", ".csv.backup") + s3_path = file_path.replace("s3://", "") + s3_backup_path = backup_path.replace("s3://", "") + + try: + # Copy file to backup + fs.copy(s3_path, s3_backup_path) + logger.info(f" Created backup: {backup_path}") + return True + except Exception as e: + logger.error(f" Failed to create backup: {e}") + return False + + +def process_agg_metrics_file( + file_path: str, + aws_profile: str, + logger: logging.Logger, + dry_run: bool = False +) -> Optional[dict]: + """ + Process a single agg_metrics.csv file. + + Args: + file_path: S3 path to the agg_metrics.csv file + aws_profile: AWS profile to use + logger: Logger instance + dry_run: If True, don't modify files + + Returns: + Statistics dictionary or None if processing failed + """ + logger.info(f"Processing: {file_path}") + + # Get run directory name + run_dir_name = get_run_directory_name(file_path) + logger.info(f" Run directory: {run_dir_name}") + + # Initialize filesystem + fs = fsspec.filesystem("s3", profile=aws_profile) + + try: + # Read the CSV file + with fsspec.open(file_path, "r", s3={"profile": aws_profile}) as f: + df = pd.read_csv(f) + + if df.empty: + logger.warning(" File is empty, skipping") + return None + + df_cleaned, stats = clean_agg_metrics_data(df, run_dir_name, logger) + + # Check if any changes were made + if stats["rows_removed_stac_mismatch"] > 0 or stats["rows_removed_duplicates"] > 0: + if not dry_run: + # Create backup + if not backup_file(file_path, fs, logger): + logger.error(" Skipping modification due to backup failure") + return None + + # Write cleaned data back + with fsspec.open(file_path, "w", s3={"profile": aws_profile}) as f: + df_cleaned.to_csv(f, index=False) + + logger.info(f"Modified: {stats['original_rows']} to {stats['final_rows']} rows") + else: + logger.info(f" [DRY RUN] Would modify: {stats['original_rows']} to {stats['final_rows']} rows") + + return stats + else: + logger.info(" No changes needed") + return None + + except Exception as e: + logger.error(f" Error processing file: {e}") + return None + + +def main(): + """ + Clean agg_metrics.csv files in S3 batch directories. + + This script: + 1. Finds all agg_metrics.csv files under specified S3 batch directories + 2. Validates that stac_item_id matches the parent run directory name + 3. Handles duplicates based on (collection_id, stac_item_id, scenario) index + 4. Creates backups before modifying files + 5. Logs all modifications + """ + + parser = argparse.ArgumentParser( + description="Clean agg_metrics.csv files in S3 batch directories" + ) + + parser.add_argument( + "--s3-prefix", + required=True, + help="S3 prefix path before batch directories (e.g., s3://fimc-data/autoeval/batches/)" + ) + + parser.add_argument( + "--batch", + required=True, + help="Batch directory name (e.g., fim100_huc12_5m_non_calibrated)" + ) + + parser.add_argument( + "--log-file", + default="agg_metrics_cleaning.log", + help="Path to log file (default: agg_metrics_cleaning.log)" + ) + + parser.add_argument( + "--profile", + default="fimbucket", + help="AWS profile for S3 access (default: fimc-data)" + ) + + parser.add_argument( + "--dry-run", + action="store_true", + help="Preview changes without modifying files" + ) + + args = parser.parse_args() + + # Setup logging + logger = setup_logging(args.log_file) + + # Log start + logger.info("=" * 80) + logger.info(f"Starting agg_metrics cleaning - {datetime.now()}") + logger.info(f"S3 Prefix: {args.s3_prefix}") + logger.info(f"Batch: {args.batch}") + logger.info(f"Dry Run: {args.dry_run}") + logger.info("=" * 80) + + try: + # Find all agg_metrics.csv files + logger.info("Searching for agg_metrics.csv files...") + agg_files = find_agg_metrics_files(args.s3_prefix, args.batch, args.profile) + + if not agg_files: + logger.warning("No agg_metrics.csv files found") + return 0 + + logger.info(f"Found {len(agg_files)} agg_metrics.csv files") + + # Process each file + total_stats = { + "files_processed": 0, + "files_modified": 0, + "total_rows_removed": 0 + } + + for file_path in agg_files: + stats = process_agg_metrics_file( + file_path, + args.profile, + logger, + args.dry_run + ) + + if stats: + total_stats["files_modified"] += 1 + total_stats["total_rows_removed"] += ( + stats["rows_removed_stac_mismatch"] + + stats["rows_removed_duplicates"] + ) + + total_stats["files_processed"] += 1 + + # Log summary + logger.info("=" * 80) + logger.info("SUMMARY") + logger.info(f"Files processed: {total_stats['files_processed']}") + logger.info(f"Files modified: {total_stats['files_modified']}") + logger.info(f"Total rows removed: {total_stats['total_rows_removed']}") + + if args.dry_run: + logger.info("DRY RUN - No files were actually modified") + + logger.info(f"Log file: {args.log_file}") + logger.info("=" * 80) + + return 0 + + except Exception as e: + logger.error(f"Fatal error: {e}") + return 1 + + +if __name__ == "__main__": + sys.exit(main()) diff --git a/tools/make_master_metrics.py b/tools/make_master_metrics.py new file mode 100755 index 0000000..50f2dc5 --- /dev/null +++ b/tools/make_master_metrics.py @@ -0,0 +1,179 @@ +import argparse +import logging +import sys +from pathlib import Path +from typing import List, Optional + +import fsspec +import pandas as pd + +logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s") +logger = logging.getLogger(__name__) + + +def get_stac_item_directories(output_root: str) -> List[tuple[str, str]]: + """ + Get all STAC item subdirectories from the output root. + + Returns: + List of tuples (stac_item_code, full_path) + """ + stac_item_dirs = [] + + if output_root.startswith("s3://"): + fs = fsspec.filesystem("s3", anon=False) + try: + items = fs.ls(output_root, detail=True) + for item in items: + if item["type"] == "directory": + stac_item_code = item["name"].split("/")[-1] + full_path = item["name"] + if not full_path.endswith("/"): + full_path += "/" + stac_item_dirs.append((stac_item_code, full_path)) + except Exception as e: + logger.error(f"Error listing S3 directories: {e}") + else: + # Local filesystem + output_path = Path(output_root) + if output_path.exists() and output_path.is_dir(): + for item in output_path.iterdir(): + if item.is_dir(): + stac_item_dirs.append((item.name, str(item))) + + return stac_item_dirs + + +def read_agg_metrics(agg_metrics_path: str) -> Optional[pd.DataFrame]: + """ + Read an agg_metrics.csv file. + + Args: + agg_metrics_path: Path to the agg_metrics.csv file + + Returns: + DataFrame or None if file doesn't exist or can't be read + """ + try: + with fsspec.open(agg_metrics_path, "r") as f: + # Specify dtype for columns that should remain as strings + # to preserve leading zeros in HUC codes + dtype_spec = { + 'hucs': str, + 'nws_lid': str, + 'stac_item_id': str, + 'scenario': str, + 'flow': str, + 'collection_id': str + } + df = pd.read_csv(f, dtype=dtype_spec) + return df + except FileNotFoundError: + logger.warning(f"File not found: {agg_metrics_path}") + return None + except Exception as e: + logger.error(f"Error reading {agg_metrics_path}: {e}") + return None + + +def aggregate_metrics(output_root: str, calb: bool, hand_version: str, resolution: str) -> pd.DataFrame: + """ + Aggregate all agg_metrics.csv files from HUC subdirectories. + + Args: + output_root: Root directory containing HUC subdirectories + calb: Calibration flag (True/False) + hand_version: HAND version value + resolution: Resolution in meters + + Returns: + Combined DataFrame with all metrics + """ + all_metrics = [] + + stac_item_dirs = get_stac_item_directories(output_root) + + if not stac_item_dirs: + logger.warning(f"No subdirectories found in {output_root}") + return pd.DataFrame() + + logger.info(f"Found {len(stac_item_dirs)} STAC item directories") + + for stac_item_code, stac_item_path in stac_item_dirs: + if stac_item_path.startswith("s3://"): + agg_metrics_path = f"{stac_item_path.rstrip('/')}/{stac_item_code}__agg_metrics.csv" + elif output_root.startswith("s3://"): + # Handle case where stac_item_path doesn't have s3:// prefix but output_root does + agg_metrics_path = f"s3://{stac_item_path.rstrip('/')}/{stac_item_code}__agg_metrics.csv" + else: + agg_metrics_path = str(Path(stac_item_path) / f"{stac_item_code}__agg_metrics.csv") + + df = read_agg_metrics(agg_metrics_path) + + if df is not None and not df.empty: + # Add new columns + df["calibrated"] = "True" if calb else "False" + df["version"] = hand_version + df["resolution_m"] = resolution + df["extent_config"] = "COMP" + df["full_json_path"] = "null" + + all_metrics.append(df) + logger.info(f"Processed {len(df)} rows from STAC item {stac_item_code}") + else: + logger.warning(f"No valid data found for STAC item {stac_item_code}") + + if not all_metrics: + logger.warning("No valid agg_metrics.csv files were found") + return pd.DataFrame() + + # Combine all DataFrames + combined_df = pd.concat(all_metrics, ignore_index=True) + logger.info(f"Combined {len(combined_df)} total rows from {len(all_metrics)} STAC items") + + return combined_df + + +def main(): + parser = argparse.ArgumentParser( + description="Aggregate agg_metrics.csv files from multiple STAC items into a master_metrics.csv file" + ) + + parser.add_argument("output_root", help="Root directory containing STAC item subdirectories (can be S3 path)") + + parser.add_argument("--calb", action="store_true", help="Set calibration flag to True (default: False)") + + parser.add_argument("--hand-version", required=True, help="HAND version value to add to all rows") + + parser.add_argument("--resolution", required=True, help="Resolution in meters to add to all rows") + + args = parser.parse_args() + + # Clean up output root path + output_root = args.output_root.rstrip("/") + + logger.info(f"Starting aggregation from {output_root}") + logger.info(f"Parameters: calb={args.calb}, " f"hand_version={args.hand_version}, resolution={args.resolution}") + + master_df = aggregate_metrics(output_root, args.calb, args.hand_version, args.resolution) + + if master_df.empty: + logger.error("No data to write to master_metrics.csv") + sys.exit(1) + + if output_root.startswith("s3://"): + master_metrics_path = f"{output_root}/master_metrics.csv" + else: + master_metrics_path = str(Path(output_root) / "master_metrics.csv") + + try: + with fsspec.open(master_metrics_path, "w") as f: + master_df.to_csv(f, index=False) + logger.info(f"Successfully wrote {len(master_df)} rows to {master_metrics_path}") + except Exception as e: + logger.error(f"Error writing master_metrics.csv: {e}") + sys.exit(1) + + +if __name__ == "__main__": + main() From 7c2af771626784ec751133b708544ae3747307b8 Mon Sep 17 00:00:00 2001 From: Dylan Lee Date: Wed, 10 Sep 2025 15:25:44 -0400 Subject: [PATCH 2/2] Reformat line lengths --- tools/clean_agg_metrics.py | 203 ++++++++++++++--------------------- tools/make_master_metrics.py | 14 +-- 2 files changed, 89 insertions(+), 128 deletions(-) diff --git a/tools/clean_agg_metrics.py b/tools/clean_agg_metrics.py index 1a28004..6d2062e 100755 --- a/tools/clean_agg_metrics.py +++ b/tools/clean_agg_metrics.py @@ -12,85 +12,85 @@ def setup_logging(log_file: str) -> logging.Logger: """ Set up logging configuration. - + Args: log_file: Path to the log file - + Returns: Configured logger instance """ # Create logger logger = logging.getLogger(__name__) logger.setLevel(logging.INFO) - + # Console handler console_handler = logging.StreamHandler() console_handler.setLevel(logging.INFO) - console_format = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s') + console_format = logging.Formatter("%(asctime)s - %(levelname)s - %(message)s") console_handler.setFormatter(console_format) - + # File handler file_handler = logging.FileHandler(log_file) file_handler.setLevel(logging.INFO) - file_format = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s') + file_format = logging.Formatter("%(asctime)s - %(levelname)s - %(message)s") file_handler.setFormatter(file_format) - + # Add handlers logger.addHandler(console_handler) logger.addHandler(file_handler) - + return logger def find_agg_metrics_files(s3_prefix: str, batch: str, aws_profile: str = "fimc-data") -> List[str]: """ Find all agg_metrics.csv files under the batch directory. - + Args: s3_prefix: S3 prefix path (e.g., "s3://fimc-data/autoeval/batches/") batch: Batch directory name (e.g., "fim100_huc12_5m_non_calibrated") aws_profile: AWS profile to use for S3 access - + Returns: List of S3 paths to agg_metrics.csv files """ # Construct full batch path batch_path = f"{s3_prefix.rstrip('/')}/{batch}" - + # Initialize S3 filesystem fs = fsspec.filesystem("s3", profile=aws_profile) - + agg_metrics_files = [] - + try: # List all run directories under the batch batch_items = fs.ls(batch_path, detail=True) - + for item in batch_items: if item["type"] == "directory": run_dir = item["name"] run_name = run_dir.split("/")[-1] - + # Construct expected agg_metrics.csv path agg_metrics_path = f"s3://{run_dir}/{run_name}__agg_metrics.csv" - + # Check if file exists if fs.exists(agg_metrics_path.replace("s3://", "")): agg_metrics_files.append(agg_metrics_path) - + except Exception as e: raise RuntimeError(f"Error listing S3 directories in {batch_path}: {e}") - + return agg_metrics_files def get_run_directory_name(file_path: str) -> str: """ Extract the run directory name from the file path. - + Args: file_path: Full S3 path to the agg_metrics.csv file - + Returns: Run directory name """ @@ -100,44 +100,35 @@ def get_run_directory_name(file_path: str) -> str: return path_parts[-2] -def clean_agg_metrics_data( - df: pd.DataFrame, - run_dir_name: str, - logger: logging.Logger -) -> Tuple[pd.DataFrame, dict]: +def clean_agg_metrics_data(df: pd.DataFrame, run_dir_name: str, logger: logging.Logger) -> Tuple[pd.DataFrame, dict]: """ Clean the agg_metrics dataframe. - + Args: df: Original dataframe run_dir_name: Name of the run directory logger: Logger instance - + Returns: Tuple of (cleaned dataframe, statistics dictionary) """ - stats = { - "original_rows": len(df), - "rows_removed_stac_mismatch": 0, - "rows_removed_duplicates": 0, - "final_rows": 0 - } - + stats = {"original_rows": len(df), "rows_removed_stac_mismatch": 0, "rows_removed_duplicates": 0, "final_rows": 0} + # Step 1: Filter rows where stac_item_id matches run directory if "stac_item_id" in df.columns: valid_mask = df["stac_item_id"] == run_dir_name invalid_rows = (~valid_mask).sum() - + if invalid_rows > 0: logger.info(f" Removing {invalid_rows} rows with non-matching stac_item_id") df = df[valid_mask].copy() stats["rows_removed_stac_mismatch"] = invalid_rows else: logger.warning(" Column 'stac_item_id' not found in dataframe") - + # Step 2: Handle duplicates based on (collection_id, stac_item_id, scenario) index_cols = ["collection_id", "stac_item_id", "scenario"] - + # Check if all index columns exist missing_cols = [col for col in index_cols if col not in df.columns] if missing_cols: @@ -145,26 +136,25 @@ def clean_agg_metrics_data( else: # Find duplicates duplicated_mask = df.duplicated(subset=index_cols, keep=False) - + if duplicated_mask.any(): # Process duplicates df_no_dups = df[~duplicated_mask].copy() df_dups = df[duplicated_mask].copy() - + # For duplicates, calculate combined score and keep best if "true_positives_count" in df.columns and "false_positives_count" in df.columns: - df_dups["combined_score"] = ( - df_dups["true_positives_count"].fillna(0) + - df_dups["false_positives_count"].fillna(0) - ) - + df_dups["combined_score"] = df_dups["true_positives_count"].fillna(0) + df_dups[ + "false_positives_count" + ].fillna(0) + # Group by index columns and keep row with max combined score idx_to_keep = df_dups.groupby(index_cols)["combined_score"].idxmax() df_best = df_dups.loc[idx_to_keep].drop(columns=["combined_score"]) - + # Combine non-duplicates with best duplicates df = pd.concat([df_no_dups, df_best], ignore_index=True) - + rows_removed = len(df_dups) - len(df_best) if rows_removed > 0: logger.info(f" Removed {rows_removed} duplicate rows") @@ -172,7 +162,7 @@ def clean_agg_metrics_data( else: logger.warning(" Columns for scoring not found, keeping first duplicate") df = df.drop_duplicates(subset=index_cols, keep="first") - + stats["final_rows"] = len(df) return df, stats @@ -180,19 +170,19 @@ def clean_agg_metrics_data( def backup_file(file_path: str, fs: fsspec.AbstractFileSystem, logger: logging.Logger) -> bool: """ Create a backup of the original file. - + Args: file_path: S3 path to the file fs: Filesystem instance logger: Logger instance - + Returns: True if backup was successful """ backup_path = file_path.replace(".csv", ".csv.backup") s3_path = file_path.replace("s3://", "") s3_backup_path = backup_path.replace("s3://", "") - + try: # Copy file to backup fs.copy(s3_path, s3_backup_path) @@ -204,43 +194,40 @@ def backup_file(file_path: str, fs: fsspec.AbstractFileSystem, logger: logging.L def process_agg_metrics_file( - file_path: str, - aws_profile: str, - logger: logging.Logger, - dry_run: bool = False + file_path: str, aws_profile: str, logger: logging.Logger, dry_run: bool = False ) -> Optional[dict]: """ Process a single agg_metrics.csv file. - + Args: file_path: S3 path to the agg_metrics.csv file aws_profile: AWS profile to use logger: Logger instance dry_run: If True, don't modify files - + Returns: Statistics dictionary or None if processing failed """ logger.info(f"Processing: {file_path}") - + # Get run directory name run_dir_name = get_run_directory_name(file_path) logger.info(f" Run directory: {run_dir_name}") - + # Initialize filesystem fs = fsspec.filesystem("s3", profile=aws_profile) - + try: # Read the CSV file with fsspec.open(file_path, "r", s3={"profile": aws_profile}) as f: df = pd.read_csv(f) - + if df.empty: logger.warning(" File is empty, skipping") return None - + df_cleaned, stats = clean_agg_metrics_data(df, run_dir_name, logger) - + # Check if any changes were made if stats["rows_removed_stac_mismatch"] > 0 or stats["rows_removed_duplicates"] > 0: if not dry_run: @@ -248,20 +235,20 @@ def process_agg_metrics_file( if not backup_file(file_path, fs, logger): logger.error(" Skipping modification due to backup failure") return None - + # Write cleaned data back with fsspec.open(file_path, "w", s3={"profile": aws_profile}) as f: df_cleaned.to_csv(f, index=False) - + logger.info(f"Modified: {stats['original_rows']} to {stats['final_rows']} rows") else: logger.info(f" [DRY RUN] Would modify: {stats['original_rows']} to {stats['final_rows']} rows") - + return stats else: logger.info(" No changes needed") return None - + except Exception as e: logger.error(f" Error processing file: {e}") return None @@ -279,45 +266,29 @@ def main(): 5. Logs all modifications """ - parser = argparse.ArgumentParser( - description="Clean agg_metrics.csv files in S3 batch directories" - ) - + parser = argparse.ArgumentParser(description="Clean agg_metrics.csv files in S3 batch directories") + parser.add_argument( "--s3-prefix", required=True, - help="S3 prefix path before batch directories (e.g., s3://fimc-data/autoeval/batches/)" + help="S3 prefix path before batch directories (e.g., s3://fimc-data/autoeval/batches/)", ) - - parser.add_argument( - "--batch", - required=True, - help="Batch directory name (e.g., fim100_huc12_5m_non_calibrated)" - ) - - parser.add_argument( - "--log-file", - default="agg_metrics_cleaning.log", - help="Path to log file (default: agg_metrics_cleaning.log)" - ) - - parser.add_argument( - "--profile", - default="fimbucket", - help="AWS profile for S3 access (default: fimc-data)" - ) - + + parser.add_argument("--batch", required=True, help="Batch directory name (e.g., fim100_huc12_5m_non_calibrated)") + parser.add_argument( - "--dry-run", - action="store_true", - help="Preview changes without modifying files" + "--log-file", default="agg_metrics_cleaning.log", help="Path to log file (default: agg_metrics_cleaning.log)" ) - + + parser.add_argument("--profile", default="fimbucket", help="AWS profile for S3 access (default: fimc-data)") + + parser.add_argument("--dry-run", action="store_true", help="Preview changes without modifying files") + args = parser.parse_args() - + # Setup logging logger = setup_logging(args.log_file) - + # Log start logger.info("=" * 80) logger.info(f"Starting agg_metrics cleaning - {datetime.now()}") @@ -325,57 +296,47 @@ def main(): logger.info(f"Batch: {args.batch}") logger.info(f"Dry Run: {args.dry_run}") logger.info("=" * 80) - + try: # Find all agg_metrics.csv files logger.info("Searching for agg_metrics.csv files...") agg_files = find_agg_metrics_files(args.s3_prefix, args.batch, args.profile) - + if not agg_files: logger.warning("No agg_metrics.csv files found") return 0 - + logger.info(f"Found {len(agg_files)} agg_metrics.csv files") - + # Process each file - total_stats = { - "files_processed": 0, - "files_modified": 0, - "total_rows_removed": 0 - } - + total_stats = {"files_processed": 0, "files_modified": 0, "total_rows_removed": 0} + for file_path in agg_files: - stats = process_agg_metrics_file( - file_path, - args.profile, - logger, - args.dry_run - ) - + stats = process_agg_metrics_file(file_path, args.profile, logger, args.dry_run) + if stats: total_stats["files_modified"] += 1 total_stats["total_rows_removed"] += ( - stats["rows_removed_stac_mismatch"] + - stats["rows_removed_duplicates"] + stats["rows_removed_stac_mismatch"] + stats["rows_removed_duplicates"] ) - + total_stats["files_processed"] += 1 - + # Log summary logger.info("=" * 80) logger.info("SUMMARY") logger.info(f"Files processed: {total_stats['files_processed']}") logger.info(f"Files modified: {total_stats['files_modified']}") logger.info(f"Total rows removed: {total_stats['total_rows_removed']}") - + if args.dry_run: logger.info("DRY RUN - No files were actually modified") - + logger.info(f"Log file: {args.log_file}") logger.info("=" * 80) - + return 0 - + except Exception as e: logger.error(f"Fatal error: {e}") return 1 diff --git a/tools/make_master_metrics.py b/tools/make_master_metrics.py index 50f2dc5..61e5c8c 100755 --- a/tools/make_master_metrics.py +++ b/tools/make_master_metrics.py @@ -59,12 +59,12 @@ def read_agg_metrics(agg_metrics_path: str) -> Optional[pd.DataFrame]: # Specify dtype for columns that should remain as strings # to preserve leading zeros in HUC codes dtype_spec = { - 'hucs': str, - 'nws_lid': str, - 'stac_item_id': str, - 'scenario': str, - 'flow': str, - 'collection_id': str + "hucs": str, + "nws_lid": str, + "stac_item_id": str, + "scenario": str, + "flow": str, + "collection_id": str, } df = pd.read_csv(f, dtype=dtype_spec) return df @@ -153,7 +153,7 @@ def main(): output_root = args.output_root.rstrip("/") logger.info(f"Starting aggregation from {output_root}") - logger.info(f"Parameters: calb={args.calb}, " f"hand_version={args.hand_version}, resolution={args.resolution}") + logger.info(f"Parameters: calb={args.calb}, hand_version={args.hand_version}, resolution={args.resolution}") master_df = aggregate_metrics(output_root, args.calb, args.hand_version, args.resolution)