def _copilot_probe_succeeded(response):
    """Interpret a contents-API response as "the path exists".

    Mirrors the predicate used for every probe in this module: the call must
    have returned something (not None) and, when the payload is a dict, it
    must not carry a 'message' key.
    """
    if response is None:
        return False
    if isinstance(response, dict):
        return 'message' not in response
    return True


def check_copilot_files(repo):
    """Check a repository for additional Copilot-related files.

    Args:
        repo: Mapping with at least a 'nameWithOwner' key ("owner/name").

    Returns:
        dict with boolean values under the keys 'has_copilot_instructions',
        'has_agents_md', 'has_copilotignore' and 'has_mcp_config'.
    """
    repo_name = repo['nameWithOwner']

    # Result key -> repository path probed through the contents API.
    # Kept as an ordered table so each path is queried exactly once, in the
    # same order as before.
    # NOTE(review): CONFIG['copilot_files'] / CONFIG['copilot_nested_files']
    # list similar paths but are not consulted here - confirm whether they
    # are meant to drive this table.
    probe_paths = {
        'has_copilot_instructions': '.github/copilot-instructions.md',
        'has_agents_md': 'AGENTS.md',
        'has_copilotignore': '.copilotignore',
        'has_mcp_config': '.github/copilot/mcp.json',
    }

    results = {}
    for result_key, path in probe_paths.items():
        response = run_gh_command(
            f"{CONFIG['gh_command']} api repos/{repo_name}/contents/{path}"
        )
        results[result_key] = _copilot_probe_succeeded(response)

    return results
result['folders'][folder] else 'No' + # Add columns for copilot files + row['Has copilot-instructions.md'] = 'Yes' if files.get('has_copilot_instructions') else 'No' + row['Has AGENTS.md'] = 'Yes' if files.get('has_agents_md') else 'No' + row['Has .copilotignore'] = 'Yes' if files.get('has_copilotignore') else 'No' + row['Has MCP Config'] = 'Yes' if files.get('has_mcp_config') else 'No' + row['Has Copilot Directories'] = 'Yes' if any(result['folders'].values()) else 'No' + + # Recommendations + recommendations = [] + if not result['has_github_dir']: + recommendations.append('Create .github directory') + if not any(result['folders'].values()): + recommendations.append('Add Copilot directories (prompts, instructions, agents)') + if not files.get('has_copilot_instructions'): + recommendations.append('Add .github/copilot-instructions.md') + if not files.get('has_agents_md'): + recommendations.append('Add AGENTS.md') + if not files.get('has_copilotignore'): + recommendations.append('Add .copilotignore') + if not files.get('has_mcp_config'): + recommendations.append('Add .github/copilot/mcp.json') + + row['Recommendations'] = '; '.join(recommendations) if recommendations else 'None' row['Error'] = result['error'] if result['error'] else 'None' export_data.append(row) @@ -320,6 +397,12 @@ def print_summary(results, repos, fetch_time, check_time, total_time): repos_with_copilot = sum(1 for r in results if any(r['folders'].values())) repos_with_errors = sum(1 for r in results if r['error']) + # File check stats + repos_with_copilot_instructions = sum(1 for r in results if r.get('files', {}).get('has_copilot_instructions')) + repos_with_agents_md = sum(1 for r in results if r.get('files', {}).get('has_agents_md')) + repos_with_copilotignore = sum(1 for r in results if r.get('files', {}).get('has_copilotignore')) + repos_with_mcp_config = sum(1 for r in results if r.get('files', {}).get('has_mcp_config')) + log("\n" + "=" * 80) log("SUMMARY") log("=" * 80) @@ -328,6 +411,12 
def _ruleset_requires_pr(org_login, ruleset_id):
    """Return True when the ruleset's detail payload lists a pull-request rule."""
    detail = run_gh_command(
        f"{CONFIG['gh_command']} api orgs/{org_login}/rulesets/{ruleset_id}"
    )
    if not (detail and isinstance(detail, dict)):
        return False
    # 'rules' may be absent or null; treat both as "no rules".
    rules = detail.get('rules') or []
    return any(
        rule.get('type') in ('required_pull_request', 'pull_request')
        for rule in rules
    )


def check_org_rulesets(org_login):
    """Check organization-level rulesets.

    Args:
        org_login: Organization login used in the API path.

    Returns:
        dict with 'total_rulesets', 'active_rulesets', 'has_pr_requirement'
        and 'error' (None on success, otherwise a message string).
    """
    try:
        rulesets = run_gh_command(f"{CONFIG['gh_command']} api orgs/{org_login}/rulesets")

        if rulesets is None:
            # The call itself failed. Surface that instead of reporting a
            # clean "zero rulesets", matching how the outside-collaborator
            # check reports an inaccessible endpoint.
            return {
                'total_rulesets': 0,
                'active_rulesets': 0,
                'has_pr_requirement': False,
                'error': 'Not accessible or no permission',
            }

        if not isinstance(rulesets, list):
            return {'total_rulesets': 0, 'active_rulesets': 0, 'has_pr_requirement': False, 'error': None}

        active_rulesets = [r for r in rulesets if r.get('enforcement') == 'active']

        # Detail lookups are lazy: any() stops at the first active ruleset
        # that enforces pull requests, so no extra API calls are made.
        has_pr_requirement = any(
            _ruleset_requires_pr(org_login, ruleset.get('id'))
            for ruleset in active_rulesets
            if ruleset.get('id')
        )

        return {
            'total_rulesets': len(rulesets),
            'active_rulesets': len(active_rulesets),
            'has_pr_requirement': has_pr_requirement,
            'error': None
        }
    except Exception as e:
        # Best-effort audit: report the failure in the row rather than abort.
        return {'total_rulesets': 0, 'active_rulesets': 0, 'has_pr_requirement': False, 'error': str(e)}
def _is_positive_number(value):
    """Return True when *value* is an int/float strictly greater than zero."""
    return isinstance(value, (int, float)) and value > 0


def check_copilot_settings(org_login):
    """Check GitHub Copilot organization settings.

    Args:
        org_login: Organization login used in the API paths.

    Returns:
        On success, a dict with 'enabled' (True), seat counts, the
        'seat_management' and 'public_code_suggestions' settings,
        'has_premium_budget', 'premium_budget_limit' and 'error' (None).
        When billing data cannot be fetched or an exception occurs, a dict
        with 'enabled': False and an 'error' message.
    """
    try:
        billing = run_gh_command(
            f"{CONFIG['gh_command']} api orgs/{org_login}/copilot/billing"
        )

        if billing is None:
            return {'enabled': False, 'error': 'Not accessible or Copilot not enabled'}

        # `or {}` also covers an explicit null, which would otherwise raise
        # and make an enabled org look disabled.
        seat_breakdown = billing.get('seat_breakdown') or {}

        premium_budget_limit = None

        # Try the policies payload first for a premium-requests budget.
        # NOTE(review): confirm this endpoint and field name against the
        # current GitHub REST API; a failed call simply yields None here.
        policies = run_gh_command(
            f"{CONFIG['gh_command']} api orgs/{org_login}/copilot/policies"
        )
        if policies and isinstance(policies, dict):
            premium_budget_limit = policies.get('premium_requests_budget_monthly_limit')

        # Fall back to budget fields on the billing payload, but only replace
        # the current value when billing actually provides one.
        if not _is_positive_number(premium_budget_limit):
            for field in ('premium_requests_budget_monthly_limit', 'organization_monthly_budget'):
                candidate = billing.get(field)
                if candidate is not None:
                    premium_budget_limit = candidate
                    break

        # Type-checked comparison: a non-numeric value counts as "no budget"
        # instead of raising TypeError.
        has_premium_budget = _is_positive_number(premium_budget_limit)

        return {
            'enabled': True,
            'total_seats': seat_breakdown.get('total', 0),
            'active_seats': seat_breakdown.get('active_this_cycle', 0),
            'inactive_seats': seat_breakdown.get('inactive_this_cycle', 0),
            'seat_management': billing.get('seat_management_setting', 'unknown'),
            'public_code_suggestions': billing.get('public_code_suggestions', 'unknown'),
            'has_premium_budget': has_premium_budget,
            'premium_budget_limit': premium_budget_limit,
            'error': None
        }
    except Exception as e:
        # Best-effort audit: report the failure in the row rather than abort.
        return {'enabled': False, 'error': str(e)}
result['pat_policies'] = check_pat_policies(org_login) + result['code_security_config'] = check_code_security_config(org_login) + result['copilot_settings'] = check_copilot_settings(org_login) return result @@ -361,6 +558,12 @@ def export_to_csv(results): privs = result['member_privileges'] envs = result['environments'] tokens = result['token_security'] + rulesets = result.get('org_rulesets', {}) + outside_collabs = result.get('outside_collaborators', {}) + actions_perms = result.get('actions_permissions', {}) + pat_pol = result.get('pat_policies', {}) + code_sec = result.get('code_security_config', {}) + copilot = result.get('copilot_settings', {}) row = { 'Organization': result['org_login'], @@ -395,6 +598,42 @@ def export_to_csv(results): 'Dependabot Alerts Enabled': 'Yes' if tokens.get('dependabot_alerts_for_new_repos', False) else 'No', 'Token Security Status': 'āœ… Pass' if tokens.get('secret_scanning_for_new_repos', False) else 'āš ļø Review', + # Org Rulesets + 'Org Rulesets Total': rulesets.get('total_rulesets', 0), + 'Org Rulesets Active': rulesets.get('active_rulesets', 0), + 'Org Rulesets Require PR': 'Yes' if rulesets.get('has_pr_requirement', False) else 'No', + 'Org Rulesets Status': 'āœ… Pass' if rulesets.get('active_rulesets', 0) > 0 else 'āš ļø No org rulesets', + + # Outside Collaborators + 'Outside Collaborators': outside_collabs.get('total', 0), + 'Outside Collabs Without 2FA': outside_collabs.get('without_2fa', 0), + 'Outside Collabs Status': 'āœ… Pass' if outside_collabs.get('without_2fa', 0) == 0 else 'āš ļø Review', + + # Actions Permissions + 'Org Actions Allowed Policy': actions_perms.get('allowed_actions', 'unknown'), + 'Org Actions Default Permissions': actions_perms.get('default_workflow_permissions', 'unknown'), + 'Org Actions Can Approve PRs': 'Yes' if actions_perms.get('can_approve_prs', True) else 'No', + 'Org Actions Status': 'āœ… Pass' if actions_perms.get('default_workflow_permissions') == 'read' and not 
actions_perms.get('can_approve_prs', True) else 'āš ļø Review', + + # PAT Policies + 'Pending PAT Requests': pat_pol.get('pending_pat_requests', 0), + + # Code Security Configuration + 'Has Code Security Config': 'Yes' if code_sec.get('has_config', False) else 'No', + 'Code Security Configs Total': code_sec.get('total_configs', 0), + 'Code Security Enforced': 'Yes' if code_sec.get('has_enforced', False) else 'No', + 'Code Security Config Status': 'āœ… Pass' if code_sec.get('has_enforced', False) else 'āš ļø Review', + + # Copilot Settings + 'Copilot Enabled': 'Yes' if copilot.get('enabled', False) else 'No', + 'Copilot Total Seats': copilot.get('total_seats', 0), + 'Copilot Active Seats': copilot.get('active_seats', 0), + 'Copilot Inactive Seats': copilot.get('inactive_seats', 0), + 'Copilot Public Code Suggestions': copilot.get('public_code_suggestions', 'N/A'), + 'Copilot Premium Budget Set': 'Yes' if copilot.get('has_premium_budget', False) else 'No', + 'Copilot Premium Budget Limit': copilot.get('premium_budget_limit', 'N/A'), + 'Copilot Status': 'āœ… Pass' if (copilot.get('public_code_suggestions') == 'block' and copilot.get('has_premium_budget', False)) else 'āš ļø Review' if copilot.get('enabled') else 'N/A', + # Overall Compliance 'Overall IAM Status': 'āœ… Compliant' if all([ # Enterprise orgs use IdP for auth, so don't require org-level 2FA setting @@ -408,7 +647,13 @@ def export_to_csv(results): sso.get('error'), privs.get('error'), envs.get('error'), - tokens.get('error') + tokens.get('error'), + rulesets.get('error'), + outside_collabs.get('error'), + actions_perms.get('error'), + pat_pol.get('error'), + code_sec.get('error'), + copilot.get('error') ])) or 'None' } @@ -486,6 +731,39 @@ def print_summary(results, fetch_time, assess_time, total_time): log(f"\nšŸ”‘ TOKEN & SECRET SECURITY:") log(f" Secret Scanning for New Repos: {with_secret_scanning}/{total_orgs} ({(with_secret_scanning/total_orgs*100):.1f}%)") + # Org Rulesets + with_rulesets = sum(1 
for r in results if r.get('org_rulesets', {}).get('active_rulesets', 0) > 0) + log(f"\nšŸ“‹ ORG-LEVEL RULESETS:") + log(f" Organizations with active rulesets: {with_rulesets}/{total_orgs} ({(with_rulesets/total_orgs*100):.1f}%)") + + # Outside Collaborators + total_outside = sum(r.get('outside_collaborators', {}).get('total', 0) for r in results) + without_2fa = sum(r.get('outside_collaborators', {}).get('without_2fa', 0) for r in results) + log(f"\nšŸ‘„ OUTSIDE COLLABORATORS:") + log(f" Total across all orgs: {total_outside}") + if without_2fa > 0: + log(f" āš ļø Without 2FA: {without_2fa}") + + # Actions + secure_actions = sum(1 for r in results if r.get('actions_permissions', {}).get('default_workflow_permissions') == 'read') + log(f"\n⚔ GITHUB ACTIONS GOVERNANCE:") + log(f" Orgs with read-only default permissions: {secure_actions}/{total_orgs} ({(secure_actions/total_orgs*100):.1f}%)") + + # Code Security Config + with_config = sum(1 for r in results if r.get('code_security_config', {}).get('has_config', False)) + log(f"\nšŸ›”ļø CODE SECURITY CONFIGURATIONS:") + log(f" Organizations with security config: {with_config}/{total_orgs} ({(with_config/total_orgs*100):.1f}%)") + + # Copilot Governance + with_premium_budget = sum(1 for r in results if r.get('copilot_settings', {}).get('has_premium_budget', False)) + if any(r.get('copilot_settings', {}).get('enabled', False) for r in results): + copilot_enabled_count = sum(1 for r in results if r.get('copilot_settings', {}).get('enabled', False)) + log(f"\nšŸ¤– COPILOT GOVERNANCE:") + log(f" Orgs with Copilot enabled: {copilot_enabled_count}/{total_orgs}") + log(f" Orgs with premium requests budget: {with_premium_budget}/{copilot_enabled_count}") + if with_premium_budget < copilot_enabled_count: + log(f" āš ļø {copilot_enabled_count - with_premium_budget} org(s) without premium requests budget (risk of unbounded spending)") + log(f"\nāœ… FULLY COMPLIANT ORGANIZATIONS: {fully_compliant}/{total_orgs} 
({(fully_compliant/total_orgs*100):.1f}%)") # Show final rate limit diff --git a/repo_hygiene_assessment.py b/repo_hygiene_assessment.py new file mode 100644 index 0000000..2727af0 --- /dev/null +++ b/repo_hygiene_assessment.py @@ -0,0 +1,703 @@ +#!/usr/bin/env python3 +""" +GitHub Repository Hygiene Assessment Tool +Checks repositories for essential hygiene files, configurations, and staleness. + +Requirements: + - GitHub CLI (gh) installed and authenticated + - Python 3.8+ + - Appropriate permissions to access repository contents + +Usage: + python repo_hygiene_assessment.py + +Configuration: + Edit the CONFIG section below to customize behavior + +Checks performed: + - CODEOWNERS file presence (root or .github/) + - LICENSE file and SPDX identifier + - SECURITY.md file presence (root or .github/) + - README file presence + - .gitignore file presence + - dependabot.yml/yaml configuration + - Private Vulnerability Reporting status + - Stale repository detection (no push in >180 days, not archived) +""" + +import subprocess +import json +import sys +import csv +from concurrent.futures import ThreadPoolExecutor, as_completed +from time import time, sleep +from datetime import datetime, timezone +from pathlib import Path +import threading + +# ============================================================================ +# CONFIGURATION +# ============================================================================ + +CONFIG = { + # GitHub CLI command + 'gh_command': 'gh', + + # Performance settings + 'max_workers_fetch': 10, + 'max_workers_check': 15, + + # Rate limiting + 'enable_rate_limit_check': True, + 'rate_limit_threshold': 100, + 'rate_limit_wait_time': 60, + 'request_delay': 0.05, + + # Output settings + 'output_dir': '.', + 'csv_prefix': 'github_repo_hygiene_assessment', + + # Staleness threshold (days since last push) + 'stale_threshold_days': 180, + + # Personal account identifier (leave empty to auto-detect) + 'personal_account': '', + + # Verbose output 
def check_rate_limit():
    """Check GitHub API rate limit status.

    Returns:
        dict with 'remaining', 'limit' and 'reset_time' taken from the
        'core' resource of `gh api rate_limit` (5000 / 5000 / 0 when a field
        is missing), or None when the command fails, times out, or its
        output cannot be interpreted.
    """
    try:
        # argv form (no shell), consistent with check_gh_installed(): a
        # gh_command path containing spaces behaves the same in both places.
        completed = subprocess.run(
            [CONFIG['gh_command'], 'api', 'rate_limit'],
            capture_output=True,
            text=True,
            check=True,
            timeout=10,
        )
        payload = json.loads(completed.stdout)
        core_rate = payload.get('resources', {}).get('core', {})
        return {
            'remaining': core_rate.get('remaining', 5000),
            'limit': core_rate.get('limit', 5000),
            'reset_time': core_rate.get('reset', 0)
        }
    except (subprocess.SubprocessError, OSError, ValueError, AttributeError):
        # SubprocessError: non-zero exit or timeout; OSError: executable not
        # found; ValueError: invalid JSON; AttributeError: unexpected shape.
        return None
def run_gh_command(command, return_json=True):
    """Execute a GitHub CLI command line and return its output.

    Args:
        command: Full command line, run through the shell.
        return_json: When True and the command printed something, the output
            is parsed as JSON; otherwise the stripped text is returned.

    Returns:
        Parsed JSON, the stripped stdout text, or None when the command exits
        non-zero, exceeds the 30 second timeout, or prints invalid JSON.
    """
    wait_for_rate_limit()

    # NOTE(review): the command is a shell string assembled by callers from
    # repository/organization names; consider an argv list if those inputs
    # can ever contain shell metacharacters.
    try:
        completed = subprocess.run(
            command,
            shell=True,
            capture_output=True,
            text=True,
            check=True,
            timeout=30
        )
    except (subprocess.CalledProcessError, subprocess.TimeoutExpired):
        return None

    stripped = completed.stdout.strip()
    if not (return_json and stripped):
        return stripped

    try:
        return json.loads(completed.stdout)
    except json.JSONDecodeError:
        return None
def check_license(repo_name):
    """Check whether a license is detected and report its SPDX identifier.

    Args:
        repo_name: Repository in "owner/name" form.

    Returns:
        {'has_license': True, 'spdx_id': <id or 'NOASSERTION'>} when the
        license endpoint returns a non-empty dict, otherwise
        {'has_license': False, 'spdx_id': 'None'}.
    """
    result = run_gh_command(f"{CONFIG['gh_command']} api repos/{repo_name}/license")
    if result and isinstance(result, dict):
        # The 'license' key may be present but null (or not a dict);
        # `.get('license', {})` would then hand back None and crash.
        license_info = result.get('license')
        if not isinstance(license_info, dict):
            license_info = {}
        # A null/empty spdx_id is reported the same way as a missing one.
        spdx_id = license_info.get('spdx_id') or 'NOASSERTION'
        return {'has_license': True, 'spdx_id': spdx_id}

    return {'has_license': False, 'spdx_id': 'None'}
def check_dependabot_config(repo_name):
    """Report whether .github/ holds a dependabot.yml or dependabot.yaml.

    The .yml spelling is probed first; the .yaml spelling is only queried
    when the first probe does not find a file.
    """
    candidate_commands = (
        f"{CONFIG['gh_command']} api repos/{repo_name}/contents/.github/dependabot.yml",
        f"{CONFIG['gh_command']} api repos/{repo_name}/contents/.github/dependabot.yaml",
    )

    for command in candidate_commands:
        response = run_gh_command(command)
        # A hit is a non-empty dict payload carrying a 'name' entry.
        if response and isinstance(response, dict) and response.get('name'):
            return True

    return False
def _run_hygiene_check(label, check, repo_name, fallback, errors):
    """Run one check; on exception record '<label>: <exc>' and return *fallback*."""
    try:
        return check(repo_name)
    except Exception as e:
        # Best-effort: one failing check must not abort the whole assessment.
        errors.append(f"{label}: {e}")
        return fallback


def assess_repository_hygiene(repo):
    """Perform comprehensive hygiene assessment on a repository.

    Args:
        repo: Mapping with 'nameWithOwner' ("owner/name") and optionally
            'isPrivate'.

    Returns:
        dict with the repository identity ('repo_name', 'owner',
        'is_private'), one entry per check ('has_codeowners', 'license_info',
        'has_security_md', 'has_readme', 'has_gitignore', 'has_dependabot',
        'pvr_enabled', 'staleness') and 'errors', a list of
        "<check>: <message>" strings.
    """
    repo_name = repo['nameWithOwner']
    owner = repo_name.split('/')[0]

    log(f"Assessing {repo_name}...", verbose_only=True)

    errors = []

    # Run all checks in the original order; each falls back to its
    # "nothing found" value when it raises.
    has_codeowners = _run_hygiene_check('CODEOWNERS', check_codeowners, repo_name, False, errors)
    license_info = _run_hygiene_check(
        'LICENSE', check_license, repo_name, {'has_license': False, 'spdx_id': 'None'}, errors
    )
    has_security_md = _run_hygiene_check('SECURITY.md', check_security_md, repo_name, False, errors)
    has_readme = _run_hygiene_check('README', check_readme, repo_name, False, errors)
    has_gitignore = _run_hygiene_check('.gitignore', check_gitignore, repo_name, False, errors)
    has_dependabot = _run_hygiene_check('dependabot', check_dependabot_config, repo_name, False, errors)
    pvr_enabled = _run_hygiene_check('PVR', check_private_vuln_reporting, repo_name, False, errors)

    try:
        staleness = check_staleness(repo_name)
    except Exception as e:
        staleness = {'days_since_push': -1, 'is_stale': False, 'is_archived': False, 'error': str(e)}
        errors.append(f"Staleness: {e}")
    else:
        # Only forward an error the check reported itself. Doing this after
        # the except branch as well would record the same failure twice.
        if staleness.get('error'):
            errors.append(f"Staleness: {staleness['error']}")

    return {
        'repo_name': repo_name,
        'owner': owner,
        'is_private': repo.get('isPrivate', False),
        'has_codeowners': has_codeowners,
        'license_info': license_info,
        'has_security_md': has_security_md,
        'has_readme': has_readme,
        'has_gitignore': has_gitignore,
        'has_dependabot': has_dependabot,
        'pvr_enabled': pvr_enabled,
        'staleness': staleness,
        'errors': errors,
    }
result['has_codeowners'] + has_license = result['license_info']['has_license'] + license_type = result['license_info']['spdx_id'] + has_security_md = result['has_security_md'] + has_readme = result['has_readme'] + has_gitignore = result['has_gitignore'] + has_dependabot = result['has_dependabot'] + pvr_enabled = result['pvr_enabled'] + days_since_push = staleness['days_since_push'] + is_stale = staleness['is_stale'] + is_archived = staleness['is_archived'] + + # Generate recommendations + recommendations = [] + if not has_codeowners: + recommendations.append('Add CODEOWNERS') + if not has_license: + recommendations.append('Add LICENSE') + if not has_security_md: + recommendations.append('Add SECURITY.md') + if not has_readme: + recommendations.append('Add README') + if not has_gitignore: + recommendations.append('Add .gitignore') + if not has_dependabot: + recommendations.append('Add dependabot.yml') + if not pvr_enabled: + recommendations.append('Enable Private Vulnerability Reporting') + if is_stale: + recommendations.append('Archive or update stale repository') + + # Overall status: Pass if has CODEOWNERS + LICENSE + SECURITY.md + README + not stale (or archived) + is_passing = all([ + has_codeowners, + has_license, + has_security_md, + has_readme, + not is_stale or is_archived, + ]) + + overall_status = 'āœ… Pass' if is_passing else 'āŒ Fail' + + row = { + 'Repository': repo_name, + 'Owner': owner, + 'Visibility': 'Private' if is_private else 'Public', + 'Has CODEOWNERS': 'Yes' if has_codeowners else 'No', + 'Has LICENSE': 'Yes' if has_license else 'No', + 'License Type': license_type, + 'Has SECURITY.md': 'Yes' if has_security_md else 'No', + 'Has README': 'Yes' if has_readme else 'No', + 'Has .gitignore': 'Yes' if has_gitignore else 'No', + 'Has dependabot.yml': 'Yes' if has_dependabot else 'No', + 'Private Vuln Reporting': 'Yes' if pvr_enabled else 'No', + 'Days Since Last Push': days_since_push if days_since_push >= 0 else 'Unknown', + 'Is Stale': 'Yes' if 
is_stale else 'No', + 'Is Archived': 'Yes' if is_archived else 'No', + 'Overall Hygiene Status': overall_status, + 'Recommendations': '; '.join(recommendations) if recommendations else 'None', + 'Errors': '; '.join(result['errors']) if result['errors'] else 'None', + } + + export_data.append(row) + + # Generate filename + timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") + filename = f"{CONFIG['csv_prefix']}_{timestamp}.csv" + filepath = Path(CONFIG['output_dir']) / filename + + filepath.parent.mkdir(parents=True, exist_ok=True) + + if export_data: + fieldnames = list(export_data[0].keys()) + + with open(filepath, 'w', newline='', encoding='utf-8') as csvfile: + writer = csv.DictWriter(csvfile, fieldnames=fieldnames) + writer.writeheader() + writer.writerows(export_data) + + log(f"āœ… CSV file created: {filepath}") + log(f" Total rows: {len(export_data)}") + log(f" Columns: {len(fieldnames)}") + return str(filepath) + else: + log("āŒ No data to export") + return None + + +# ============================================================================ +# SUMMARY +# ============================================================================ + +def print_summary(results, repos, fetch_time, assess_time, total_time): + """Print summary statistics""" + total_repos = len(results) + + has_codeowners_count = sum(1 for r in results if r['has_codeowners']) + has_license_count = sum(1 for r in results if r['license_info']['has_license']) + has_security_count = sum(1 for r in results if r['has_security_md']) + has_readme_count = sum(1 for r in results if r['has_readme']) + has_gitignore_count = sum(1 for r in results if r['has_gitignore']) + has_dependabot_count = sum(1 for r in results if r['has_dependabot']) + pvr_enabled_count = sum(1 for r in results if r['pvr_enabled']) + stale_count = sum(1 for r in results if r['staleness']['is_stale']) + archived_count = sum(1 for r in results if r['staleness']['is_archived']) + + fully_compliant = sum(1 for r in results if all([ + 
r['has_codeowners'], + r['license_info']['has_license'], + r['has_security_md'], + r['has_readme'], + not r['staleness']['is_stale'] or r['staleness']['is_archived'], + ])) + + log("\n" + "=" * 80) + log("REPOSITORY HYGIENE ASSESSMENT SUMMARY") + log("=" * 80) + log(f"Total repositories assessed: {total_repos}") + + log(f"\nšŸ“Š HYGIENE CONTROLS ADOPTION:") + log(f" CODEOWNERS present: {has_codeowners_count}/{total_repos} ({(has_codeowners_count/total_repos*100):.1f}%)") + log(f" LICENSE present: {has_license_count}/{total_repos} ({(has_license_count/total_repos*100):.1f}%)") + log(f" SECURITY.md present: {has_security_count}/{total_repos} ({(has_security_count/total_repos*100):.1f}%)") + log(f" README present: {has_readme_count}/{total_repos} ({(has_readme_count/total_repos*100):.1f}%)") + log(f" .gitignore present: {has_gitignore_count}/{total_repos} ({(has_gitignore_count/total_repos*100):.1f}%)") + log(f" dependabot.yml present: {has_dependabot_count}/{total_repos} ({(has_dependabot_count/total_repos*100):.1f}%)") + log(f" Private Vuln Reporting enabled: {pvr_enabled_count}/{total_repos} ({(pvr_enabled_count/total_repos*100):.1f}%)") + + log(f"\nšŸ“… REPOSITORY FRESHNESS:") + log(f" Active repositories: {total_repos - stale_count - archived_count}") + log(f" Stale repositories: {stale_count} (no push in >{CONFIG['stale_threshold_days']} days)") + log(f" Archived repositories: {archived_count}") + + log(f"\nāœ… FULLY COMPLIANT REPOSITORIES: {fully_compliant}/{total_repos} ({(fully_compliant/total_repos*100):.1f}%)") + + # Show final rate limit + if CONFIG['enable_rate_limit_check']: + final_limit = check_rate_limit() + if final_limit: + log(f"\nšŸ“Š Final Rate Limit: {final_limit['remaining']}/{final_limit['limit']} requests remaining") + + log(f"\n⚔ PERFORMANCE METRICS:") + log(f" Repository fetch: {fetch_time:.2f}s") + log(f" Hygiene assessment: {assess_time:.2f}s") + log(f" Total execution: {total_time:.2f}s") + log(f" Average per repo: 
{(assess_time/len(repos)):.3f}s") + + # Show non-compliant repositories + non_compliant = [r for r in results if not all([ + r['has_codeowners'], + r['license_info']['has_license'], + r['has_security_md'], + r['has_readme'], + not r['staleness']['is_stale'] or r['staleness']['is_archived'], + ])] + + if non_compliant: + log(f"\nāŒ NON-COMPLIANT REPOSITORIES ({len(non_compliant)}):") + for result in non_compliant[:10]: + issues = [] + if not result['has_codeowners']: + issues.append('No CODEOWNERS') + if not result['license_info']['has_license']: + issues.append('No LICENSE') + if not result['has_security_md']: + issues.append('No SECURITY.md') + if not result['has_readme']: + issues.append('No README') + if result['staleness']['is_stale']: + issues.append(f"Stale ({result['staleness']['days_since_push']} days)") + + log(f" • {result['repo_name']}: {', '.join(issues)}") + + if len(non_compliant) > 10: + log(f" ... and {len(non_compliant) - 10} more (see CSV for full list)") + + +# ============================================================================ +# MAIN +# ============================================================================ + +def main(): + """Main execution function""" + start_time = time() + + log("=" * 80) + log("GITHUB REPOSITORY HYGIENE ASSESSMENT TOOL") + log("=" * 80) + + # Check prerequisites + if not check_gh_installed(): + log("\nāŒ GitHub CLI (gh) is not installed or not in PATH!") + log("\nTo install:") + log(" Windows: winget install --id GitHub.cli") + log(" macOS: brew install gh") + log(" Linux: See https://cli.github.com/") + log("\nAfter installation, authenticate with: gh auth login") + return 1 + + # Fetch repositories + fetch_start = time() + repos = fetch_repositories() + fetch_time = time() - fetch_start + + if not repos: + log("\nāŒ Could not fetch repositories. 
Make sure you're authenticated:") + log(" gh auth login") + return 1 + + log(f"āœ“ Found {len(repos)} repositories in {fetch_time:.2f}s") + + # Assess repositories + assess_start = time() + results = assess_all_repositories(repos) + assess_time = time() - assess_start + + # Export results + csv_file = export_to_csv(results) + + # Print summary + total_time = time() - start_time + print_summary(results, repos, fetch_time, assess_time, total_time) + + log("\n" + "=" * 80) + log("āœ… Repository hygiene assessment complete!") + log("=" * 80) + + return 0 + + +if __name__ == "__main__": + try: + sys.exit(main()) + except KeyboardInterrupt: + log("\n\nāš ļø Operation cancelled by user") + sys.exit(130) + except Exception as e: + log(f"\nāŒ Unexpected error: {e}") + import traceback + traceback.print_exc() + sys.exit(1) diff --git a/security_assessment.py b/security_assessment.py index deb910b..f4dcf27 100644 --- a/security_assessment.py +++ b/security_assessment.py @@ -398,6 +398,105 @@ def check_branch_protection(repo_name): 'error': str(e) } +def check_actions_security(repo_name): + """Check GitHub Actions security configuration""" + try: + # Check default workflow permissions + workflow_perms = run_gh_command( + f"{CONFIG['gh_command']} api repos/{repo_name}/actions/permissions/workflow" + ) + + default_permissions = 'unknown' + can_approve_prs = True + if workflow_perms: + default_permissions = workflow_perms.get('default_workflow_permissions', 'write') + can_approve_prs = workflow_perms.get('can_approve_pull_request_reviews', True) + + # Check allowed actions + actions_perms = run_gh_command( + f"{CONFIG['gh_command']} api repos/{repo_name}/actions/permissions" + ) + + actions_enabled = True + allowed_actions = 'all' + if actions_perms: + actions_enabled = actions_perms.get('enabled', True) + allowed_actions = actions_perms.get('allowed_actions', 'all') + + return { + 'actions_enabled': actions_enabled, + 'default_workflow_permissions': default_permissions, + 
'can_approve_prs': can_approve_prs, + 'allowed_actions': allowed_actions, + 'is_restrictive': default_permissions == 'read' and not can_approve_prs, + 'error': None + } + except Exception as e: + return { + 'actions_enabled': True, + 'default_workflow_permissions': 'unknown', + 'can_approve_prs': True, + 'allowed_actions': 'all', + 'is_restrictive': False, + 'error': str(e) + } + +def check_deploy_keys(repo_name): + """Audit deploy keys for security issues""" + try: + keys = run_gh_command( + f"{CONFIG['gh_command']} api repos/{repo_name}/keys" + ) + + if keys is None: + return {'total_keys': 0, 'write_keys': 0, 'error': 'Not accessible'} + + if not isinstance(keys, list): + return {'total_keys': 0, 'write_keys': 0, 'error': None} + + total_keys = len(keys) + write_keys = sum(1 for k in keys if not k.get('read_only', True)) + + return { + 'total_keys': total_keys, + 'write_keys': write_keys, + 'error': None + } + except Exception as e: + return {'total_keys': 0, 'write_keys': 0, 'error': str(e)} + +def check_webhooks(repo_name): + """Audit webhooks for security issues""" + try: + hooks = run_gh_command( + f"{CONFIG['gh_command']} api repos/{repo_name}/hooks" + ) + + if hooks is None: + return {'total_hooks': 0, 'insecure_hooks': 0, 'error': 'Not accessible'} + + if not isinstance(hooks, list): + return {'total_hooks': 0, 'insecure_hooks': 0, 'error': None} + + total_hooks = len(hooks) + insecure_hooks = 0 + + for hook in hooks: + config = hook.get('config', {}) + # Flag if SSL verification disabled or no secret + if config.get('insecure_ssl') == '1': + insecure_hooks += 1 + elif not config.get('url', '').startswith('https://'): + insecure_hooks += 1 + + return { + 'total_hooks': total_hooks, + 'insecure_hooks': insecure_hooks, + 'error': None + } + except Exception as e: + return {'total_hooks': 0, 'insecure_hooks': 0, 'error': str(e)} + def assess_repository_security(repo, org_settings): """Perform comprehensive security assessment on a repository""" repo_name = 
repo['nameWithOwner'] @@ -419,6 +518,9 @@ def assess_repository_security(repo, org_settings): result['secret_scanning'] = check_secret_scanning(repo_name) result['dependabot'] = check_dependabot(repo_name) result['branch_protection'] = check_branch_protection(repo_name) + result['actions_security'] = check_actions_security(repo_name) + result['deploy_keys'] = check_deploy_keys(repo_name) + result['webhooks'] = check_webhooks(repo_name) result['org_settings'] = org_config return result @@ -461,6 +563,9 @@ def export_to_csv(results): secret_scan = result['secret_scanning'] dependabot = result['dependabot'] branch_prot = result['branch_protection'] + actions_sec = result['actions_security'] + deploy_keys = result['deploy_keys'] + webhooks = result['webhooks'] org_config = result.get('org_settings', {}) # Determine if org has good defaults @@ -502,6 +607,22 @@ def export_to_csv(results): 'Requires Status Checks': 'Yes' if branch_prot.get('requires_status_checks', False) else 'No', 'Branch Protection Status': 'āœ… Pass' if branch_prot['enabled'] else 'āŒ Fail', + # Actions Security + 'Actions Default Permissions': actions_sec.get('default_workflow_permissions', 'unknown'), + 'Actions Can Approve PRs': 'Yes' if actions_sec.get('can_approve_prs', True) else 'No', + 'Actions Allowed Policy': actions_sec.get('allowed_actions', 'all'), + 'Actions Security Status': 'āœ… Pass' if actions_sec.get('is_restrictive', False) else 'āš ļø Review', + + # Deploy Keys + 'Deploy Keys Total': deploy_keys.get('total_keys', 0), + 'Deploy Keys Write Access': deploy_keys.get('write_keys', 0), + 'Deploy Keys Status': 'āœ… Pass' if deploy_keys.get('write_keys', 0) == 0 else 'āš ļø Review', + + # Webhooks + 'Webhooks Total': webhooks.get('total_hooks', 0), + 'Webhooks Insecure': webhooks.get('insecure_hooks', 0), + 'Webhooks Status': 'āœ… Pass' if webhooks.get('insecure_hooks', 0) == 0 else 'āš ļø Review', + # Overall Status 'Overall Security Status': 'āœ… Pass' if all([ code_scan['enabled'] 
and code_scan.get('critical_alerts', 0) == 0, @@ -515,7 +636,10 @@ def export_to_csv(results): code_scan.get('error'), secret_scan.get('error'), dependabot.get('error'), - branch_prot.get('error') + branch_prot.get('error'), + actions_sec.get('error'), + deploy_keys.get('error'), + webhooks.get('error') ])) or 'None' } @@ -559,6 +683,10 @@ def print_summary(results, repos, fetch_time, assess_time, total_time): branch_prot_enabled = sum(1 for r in results if r['branch_protection']['enabled']) + actions_restrictive = sum(1 for r in results if r['actions_security'].get('is_restrictive', False)) + deploy_keys_with_write = sum(1 for r in results if r['deploy_keys'].get('write_keys', 0) > 0) + insecure_webhooks = sum(1 for r in results if r['webhooks'].get('insecure_hooks', 0) > 0) + fully_compliant = sum(1 for r in results if all([ r['code_scanning']['enabled'] and r['code_scanning'].get('critical_alerts', 0) == 0, r['secret_scanning']['enabled'], @@ -576,6 +704,9 @@ def print_summary(results, repos, fetch_time, assess_time, total_time): log(f" Secret Scanning enabled: {secret_scan_enabled}/{total_repos} ({(secret_scan_enabled/total_repos*100):.1f}%)") log(f" Dependabot enabled: {dependabot_enabled}/{total_repos} ({(dependabot_enabled/total_repos*100):.1f}%)") log(f" Branch Protection enabled: {branch_prot_enabled}/{total_repos} ({(branch_prot_enabled/total_repos*100):.1f}%)") + log(f" Actions restrictive permissions: {actions_restrictive}/{total_repos} ({(actions_restrictive/total_repos*100):.1f}%)") + log(f" Repos with write deploy keys: {deploy_keys_with_write}/{total_repos} ({(deploy_keys_with_write/total_repos*100):.1f}%)") + log(f" Repos with insecure webhooks: {insecure_webhooks}/{total_repos} ({(insecure_webhooks/total_repos*100):.1f}%)") log(f"\nāœ… FULLY COMPLIANT REPOSITORIES: {fully_compliant}/{total_repos} ({(fully_compliant/total_repos*100):.1f}%)") # Show ruleset recommendation diff --git a/tests/__init__.py b/tests/__init__.py new file mode 100644 index 
0000000..e69de29 diff --git a/tests/conftest.py b/tests/conftest.py new file mode 100644 index 0000000..59fa147 --- /dev/null +++ b/tests/conftest.py @@ -0,0 +1,53 @@ +"""Shared fixtures for GitHub Assessment tests.""" +import json +import subprocess +import pytest + + +class MockCompletedProcess: + """Mock subprocess.CompletedProcess for gh CLI calls.""" + + def __init__(self, stdout="", stderr="", returncode=0): + self.stdout = stdout + self.stderr = stderr + self.returncode = returncode + + +def make_gh_mock(responses): + """Create a mock for subprocess.run that returns predefined responses based on command content. + + Args: + responses: dict mapping command substring patterns to (stdout_data, returncode) tuples. + If stdout_data is a dict/list, it will be JSON-serialized. + """ + def mock_run(command, **kwargs): + cmd_str = command if isinstance(command, str) else ' '.join(command) + + for pattern, (data, code) in responses.items(): + if pattern in cmd_str: + if isinstance(data, (dict, list)): + stdout = json.dumps(data) + elif data is None: + raise subprocess.CalledProcessError(code, cmd_str) + else: + stdout = str(data) + return MockCompletedProcess(stdout=stdout, returncode=code) + + # Default: command not found in patterns, raise error + raise subprocess.CalledProcessError(1, cmd_str) + + return mock_run + + +@pytest.fixture +def mock_rate_limit(): + """Standard rate limit response.""" + return { + "resources": { + "core": { + "remaining": 4999, + "limit": 5000, + "reset": 9999999999 + } + } + } diff --git a/tests/test_copilot_assessment.py b/tests/test_copilot_assessment.py new file mode 100644 index 0000000..a3209a0 --- /dev/null +++ b/tests/test_copilot_assessment.py @@ -0,0 +1,50 @@ +"""Tests for assess_copilot_repos.py new checks (Copilot files).""" +import json +from unittest.mock import patch +import pytest +import sys +import os + +sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) +import assess_copilot_repos as acr + 
+ +class TestCheckCopilotFiles: + """Tests for additional Copilot file detection.""" + + @patch('assess_copilot_repos.run_gh_command') + def test_all_files_present(self, mock_cmd): + mock_cmd.side_effect = [ + {"name": "copilot-instructions.md"}, # .github/copilot-instructions.md + {"name": "AGENTS.md"}, # AGENTS.md + {"name": ".copilotignore"}, # .copilotignore + {"name": "mcp.json"}, # .github/copilot/mcp.json + ] + result = acr.check_copilot_files({"nameWithOwner": "org/repo"}) + assert result['has_copilot_instructions'] is True + assert result['has_agents_md'] is True + assert result['has_copilotignore'] is True + assert result['has_mcp_config'] is True + + @patch('assess_copilot_repos.run_gh_command') + def test_no_files_present(self, mock_cmd): + mock_cmd.return_value = None + result = acr.check_copilot_files({"nameWithOwner": "org/repo"}) + assert result['has_copilot_instructions'] is False + assert result['has_agents_md'] is False + assert result['has_copilotignore'] is False + assert result['has_mcp_config'] is False + + @patch('assess_copilot_repos.run_gh_command') + def test_partial_files(self, mock_cmd): + mock_cmd.side_effect = [ + {"name": "copilot-instructions.md"}, # present + None, # AGENTS.md absent + None, # .copilotignore absent + {"name": "mcp.json"}, # present + ] + result = acr.check_copilot_files({"nameWithOwner": "org/repo"}) + assert result['has_copilot_instructions'] is True + assert result['has_agents_md'] is False + assert result['has_copilotignore'] is False + assert result['has_mcp_config'] is True diff --git a/tests/test_idp_assessment.py b/tests/test_idp_assessment.py new file mode 100644 index 0000000..1fce8ea --- /dev/null +++ b/tests/test_idp_assessment.py @@ -0,0 +1,213 @@ +"""Tests for idp_assessment.py new checks (Org Governance).""" +import json +from unittest.mock import patch +import pytest +import sys +import os + +sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) +import idp_assessment as idp 
+ + +class TestCheckOrgRulesets: + """Tests for organization-level rulesets check.""" + + @patch('idp_assessment.run_gh_command') + def test_no_rulesets(self, mock_cmd): + mock_cmd.return_value = [] + result = idp.check_org_rulesets("myorg") + assert result['total_rulesets'] == 0 + assert result['active_rulesets'] == 0 + assert result['has_pr_requirement'] is False + + @patch('idp_assessment.run_gh_command') + def test_active_rulesets_with_pr_requirement(self, mock_cmd): + mock_cmd.side_effect = [ + [{"id": 1, "name": "Protect main", "enforcement": "active"}], + {"id": 1, "rules": [{"type": "required_pull_request"}]} + ] + result = idp.check_org_rulesets("myorg") + assert result['total_rulesets'] == 1 + assert result['active_rulesets'] == 1 + assert result['has_pr_requirement'] is True + + @patch('idp_assessment.run_gh_command') + def test_disabled_rulesets(self, mock_cmd): + mock_cmd.side_effect = [ + [{"id": 1, "name": "Draft", "enforcement": "disabled"}], + ] + result = idp.check_org_rulesets("myorg") + assert result['total_rulesets'] == 1 + assert result['active_rulesets'] == 0 + assert result['has_pr_requirement'] is False + + @patch('idp_assessment.run_gh_command') + def test_rulesets_not_accessible(self, mock_cmd): + mock_cmd.return_value = None + result = idp.check_org_rulesets("myorg") + assert result['total_rulesets'] == 0 + + +class TestCheckOutsideCollaborators: + """Tests for outside collaborators audit.""" + + @patch('idp_assessment.run_gh_command') + def test_no_outside_collaborators(self, mock_cmd): + mock_cmd.side_effect = [[], []] + result = idp.check_outside_collaborators("myorg") + assert result['total'] == 0 + assert result['without_2fa'] == 0 + + @patch('idp_assessment.run_gh_command') + def test_collaborators_all_with_2fa(self, mock_cmd): + mock_cmd.side_effect = [ + [{"login": "ext1"}, {"login": "ext2"}], + [] # none without 2FA + ] + result = idp.check_outside_collaborators("myorg") + assert result['total'] == 2 + assert 
result['without_2fa'] == 0 + + @patch('idp_assessment.run_gh_command') + def test_collaborators_without_2fa(self, mock_cmd): + mock_cmd.side_effect = [ + [{"login": "ext1"}, {"login": "ext2"}, {"login": "ext3"}], + [{"login": "ext2"}] # one without 2FA + ] + result = idp.check_outside_collaborators("myorg") + assert result['total'] == 3 + assert result['without_2fa'] == 1 + + @patch('idp_assessment.run_gh_command') + def test_not_accessible(self, mock_cmd): + mock_cmd.return_value = None + result = idp.check_outside_collaborators("myorg") + assert result['total'] == 0 + assert 'Not accessible' in result['error'] + + +class TestCheckOrgActionsPermissions: + """Tests for organization Actions permissions.""" + + @patch('idp_assessment.run_gh_command') + def test_secure_actions_config(self, mock_cmd): + mock_cmd.side_effect = [ + {"enabled_repositories": "all", "allowed_actions": "selected"}, + {"default_workflow_permissions": "read", "can_approve_pull_request_reviews": False}, + ] + result = idp.check_org_actions_permissions("myorg") + assert result['allowed_actions'] == 'selected' + assert result['default_workflow_permissions'] == 'read' + assert result['can_approve_prs'] is False + + @patch('idp_assessment.run_gh_command') + def test_permissive_actions(self, mock_cmd): + mock_cmd.side_effect = [ + {"enabled_repositories": "all", "allowed_actions": "all"}, + {"default_workflow_permissions": "write", "can_approve_pull_request_reviews": True}, + ] + result = idp.check_org_actions_permissions("myorg") + assert result['allowed_actions'] == 'all' + assert result['default_workflow_permissions'] == 'write' + assert result['can_approve_prs'] is True + + @patch('idp_assessment.run_gh_command') + def test_not_accessible(self, mock_cmd): + mock_cmd.return_value = None + result = idp.check_org_actions_permissions("myorg") + assert 'error' in result + + +class TestCheckCodeSecurityConfig: + """Tests for code security configurations.""" + + @patch('idp_assessment.run_gh_command') 
+ def test_has_enforced_config(self, mock_cmd): + mock_cmd.return_value = [ + {"name": "High Security", "enforcement": "enforced"}, + {"name": "Standard", "enforcement": "unenforced"}, + ] + result = idp.check_code_security_config("myorg") + assert result['has_config'] is True + assert result['total_configs'] == 2 + assert result['has_enforced'] is True + + @patch('idp_assessment.run_gh_command') + def test_no_enforced_config(self, mock_cmd): + mock_cmd.return_value = [ + {"name": "Draft", "enforcement": "unenforced"}, + ] + result = idp.check_code_security_config("myorg") + assert result['has_config'] is True + assert result['has_enforced'] is False + + @patch('idp_assessment.run_gh_command') + def test_no_configs(self, mock_cmd): + mock_cmd.return_value = [] + result = idp.check_code_security_config("myorg") + assert result['has_config'] is False + assert result['total_configs'] == 0 + + +class TestCheckCopilotSettings: + """Tests for Copilot settings including premium budget.""" + + @patch('idp_assessment.run_gh_command') + def test_copilot_with_budget(self, mock_cmd): + mock_cmd.side_effect = [ + { # billing response + "seat_breakdown": {"total": 50, "active_this_cycle": 45, "inactive_this_cycle": 5}, + "seat_management_setting": "assign_selected", + "public_code_suggestions": "block", + }, + {"premium_requests_budget_monthly_limit": 1000}, # policies response + ] + result = idp.check_copilot_settings("myorg") + assert result['enabled'] is True + assert result['total_seats'] == 50 + assert result['public_code_suggestions'] == 'block' + assert result['has_premium_budget'] is True + assert result['premium_budget_limit'] == 1000 + + @patch('idp_assessment.run_gh_command') + def test_copilot_without_budget(self, mock_cmd): + mock_cmd.side_effect = [ + { # billing response + "seat_breakdown": {"total": 10, "active_this_cycle": 8, "inactive_this_cycle": 2}, + "seat_management_setting": "assign_all", + "public_code_suggestions": "allow", + }, + None, # policies not 
accessible + ] + result = idp.check_copilot_settings("myorg") + assert result['enabled'] is True + assert result['public_code_suggestions'] == 'allow' + assert result['has_premium_budget'] is False + + @patch('idp_assessment.run_gh_command') + def test_copilot_not_enabled(self, mock_cmd): + mock_cmd.return_value = None + result = idp.check_copilot_settings("myorg") + assert result['enabled'] is False + + +class TestCheckPatPolicies: + """Tests for PAT policies check.""" + + @patch('idp_assessment.run_gh_command') + def test_pending_requests(self, mock_cmd): + mock_cmd.side_effect = [ + {"login": "myorg"}, # org data + [{"id": 1}, {"id": 2}], # pending requests + ] + result = idp.check_pat_policies("myorg") + assert result['pending_pat_requests'] == 2 + + @patch('idp_assessment.run_gh_command') + def test_no_pending_requests(self, mock_cmd): + mock_cmd.side_effect = [ + {"login": "myorg"}, + [], + ] + result = idp.check_pat_policies("myorg") + assert result['pending_pat_requests'] == 0 diff --git a/tests/test_repo_hygiene.py b/tests/test_repo_hygiene.py new file mode 100644 index 0000000..32a4f88 --- /dev/null +++ b/tests/test_repo_hygiene.py @@ -0,0 +1,153 @@ +"""Tests for repo_hygiene_assessment.py new checks.""" +import json +import subprocess +from unittest.mock import patch, MagicMock +import pytest +import sys +import os + +sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) +import repo_hygiene_assessment as rha + + +class TestCheckCodeowners: + """Tests for CODEOWNERS detection.""" + + @patch('repo_hygiene_assessment.run_gh_command') + def test_codeowners_in_root(self, mock_cmd): + # Returns dict with 'name' → truthy → returns True on first call + mock_cmd.return_value = {"name": "CODEOWNERS", "path": "CODEOWNERS"} + result = rha.check_codeowners("org/repo") + assert result is True + + @patch('repo_hygiene_assessment.run_gh_command') + def test_codeowners_in_github_dir(self, mock_cmd): + mock_cmd.side_effect = [ + None, # root not 
found + {"name": "CODEOWNERS", "path": ".github/CODEOWNERS"}, + ] + result = rha.check_codeowners("org/repo") + assert result is True + + @patch('repo_hygiene_assessment.run_gh_command') + def test_no_codeowners(self, mock_cmd): + mock_cmd.side_effect = [None, None] + result = rha.check_codeowners("org/repo") + assert result is False + + +class TestCheckLicense: + """Tests for LICENSE detection.""" + + @patch('repo_hygiene_assessment.run_gh_command') + def test_has_license_mit(self, mock_cmd): + mock_cmd.return_value = { + "license": {"key": "mit", "name": "MIT License", "spdx_id": "MIT"} + } + result = rha.check_license("org/repo") + assert result['has_license'] is True + assert result['spdx_id'] == "MIT" + + @patch('repo_hygiene_assessment.run_gh_command') + def test_no_license(self, mock_cmd): + mock_cmd.return_value = None + result = rha.check_license("org/repo") + assert result['has_license'] is False + + +class TestCheckSecurityMd: + """Tests for SECURITY.md detection.""" + + @patch('repo_hygiene_assessment.run_gh_command') + def test_security_md_in_root(self, mock_cmd): + mock_cmd.return_value = {"name": "SECURITY.md"} + result = rha.check_security_md("org/repo") + assert result is True + + @patch('repo_hygiene_assessment.run_gh_command') + def test_no_security_md(self, mock_cmd): + mock_cmd.side_effect = [None, None] + result = rha.check_security_md("org/repo") + assert result is False + + +class TestCheckDependabotConfig: + """Tests for dependabot.yml detection.""" + + @patch('repo_hygiene_assessment.run_gh_command') + def test_has_dependabot_yml(self, mock_cmd): + mock_cmd.return_value = {"name": "dependabot.yml"} + result = rha.check_dependabot_config("org/repo") + assert result is True + + @patch('repo_hygiene_assessment.run_gh_command') + def test_has_dependabot_yaml(self, mock_cmd): + mock_cmd.side_effect = [None, {"name": "dependabot.yaml"}] + result = rha.check_dependabot_config("org/repo") + assert result is True + + 
@patch('repo_hygiene_assessment.run_gh_command') + def test_no_dependabot(self, mock_cmd): + mock_cmd.side_effect = [None, None] + result = rha.check_dependabot_config("org/repo") + assert result is False + + +class TestCheckPrivateVulnReporting: + """Tests for private vulnerability reporting.""" + + @patch('repo_hygiene_assessment.run_gh_command') + def test_pvr_enabled(self, mock_cmd): + mock_cmd.return_value = {"enabled": True} + result = rha.check_private_vuln_reporting("org/repo") + assert result is True + + @patch('repo_hygiene_assessment.run_gh_command') + def test_pvr_disabled(self, mock_cmd): + mock_cmd.return_value = {"enabled": False} + result = rha.check_private_vuln_reporting("org/repo") + assert result is False + + @patch('repo_hygiene_assessment.run_gh_command') + def test_pvr_not_accessible(self, mock_cmd): + mock_cmd.return_value = None + result = rha.check_private_vuln_reporting("org/repo") + assert result is False + + +class TestCheckStaleness: + """Tests for stale repository detection.""" + + @patch('repo_hygiene_assessment.run_gh_command') + def test_active_repo(self, mock_cmd): + from datetime import datetime, timezone + recent = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ") + mock_cmd.return_value = { + "pushed_at": recent, + "archived": False, + "size": 500 + } + result = rha.check_staleness("org/repo") + assert result['is_stale'] is False + assert result['is_archived'] is False + + @patch('repo_hygiene_assessment.run_gh_command') + def test_stale_repo(self, mock_cmd): + mock_cmd.return_value = { + "pushed_at": "2020-01-01T00:00:00Z", + "archived": False, + "size": 500 + } + result = rha.check_staleness("org/repo") + assert result['is_stale'] is True + assert result['is_archived'] is False + + @patch('repo_hygiene_assessment.run_gh_command') + def test_archived_repo(self, mock_cmd): + mock_cmd.return_value = { + "pushed_at": "2020-01-01T00:00:00Z", + "archived": True, + "size": 500 + } + result = rha.check_staleness("org/repo") + 
assert result['is_archived'] is True diff --git a/tests/test_security_assessment.py b/tests/test_security_assessment.py new file mode 100644 index 0000000..f9943ce --- /dev/null +++ b/tests/test_security_assessment.py @@ -0,0 +1,130 @@ +"""Tests for security_assessment.py new checks (Actions, Deploy Keys, Webhooks).""" +import json +from unittest.mock import patch +import pytest +import sys +import os + +sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) +import security_assessment as sa + + +class TestCheckActionsSecurity: + """Tests for GitHub Actions security configuration checks.""" + + @patch('security_assessment.run_gh_command') + def test_restrictive_actions_config(self, mock_cmd): + mock_cmd.side_effect = [ + {"default_workflow_permissions": "read", "can_approve_pull_request_reviews": False}, + {"enabled": True, "allowed_actions": "selected"}, + ] + result = sa.check_actions_security("org/repo") + assert result['default_workflow_permissions'] == 'read' + assert result['can_approve_prs'] is False + assert result['allowed_actions'] == 'selected' + assert result['is_restrictive'] is True + assert result['error'] is None + + @patch('security_assessment.run_gh_command') + def test_permissive_actions_config(self, mock_cmd): + mock_cmd.side_effect = [ + {"default_workflow_permissions": "write", "can_approve_pull_request_reviews": True}, + {"enabled": True, "allowed_actions": "all"}, + ] + result = sa.check_actions_security("org/repo") + assert result['default_workflow_permissions'] == 'write' + assert result['can_approve_prs'] is True + assert result['allowed_actions'] == 'all' + assert result['is_restrictive'] is False + + @patch('security_assessment.run_gh_command') + def test_actions_not_accessible(self, mock_cmd): + mock_cmd.side_effect = [None, None] + result = sa.check_actions_security("org/repo") + assert result['default_workflow_permissions'] == 'unknown' + assert result['is_restrictive'] is False + + +class 
TestCheckDeployKeys: + """Tests for deploy keys audit.""" + + @patch('security_assessment.run_gh_command') + def test_no_deploy_keys(self, mock_cmd): + mock_cmd.return_value = [] + result = sa.check_deploy_keys("org/repo") + assert result['total_keys'] == 0 + assert result['write_keys'] == 0 + + @patch('security_assessment.run_gh_command') + def test_read_only_keys(self, mock_cmd): + mock_cmd.return_value = [ + {"id": 1, "title": "CI Key", "read_only": True}, + {"id": 2, "title": "Deploy Key", "read_only": True}, + ] + result = sa.check_deploy_keys("org/repo") + assert result['total_keys'] == 2 + assert result['write_keys'] == 0 + + @patch('security_assessment.run_gh_command') + def test_write_access_keys_flagged(self, mock_cmd): + mock_cmd.return_value = [ + {"id": 1, "title": "CI Key", "read_only": True}, + {"id": 2, "title": "Admin Key", "read_only": False}, + {"id": 3, "title": "Deploy Key", "read_only": False}, + ] + result = sa.check_deploy_keys("org/repo") + assert result['total_keys'] == 3 + assert result['write_keys'] == 2 + + @patch('security_assessment.run_gh_command') + def test_keys_not_accessible(self, mock_cmd): + mock_cmd.return_value = None + result = sa.check_deploy_keys("org/repo") + assert result['total_keys'] == 0 + assert 'Not accessible' in result['error'] + + +class TestCheckWebhooks: + """Tests for webhook security audit.""" + + @patch('security_assessment.run_gh_command') + def test_no_webhooks(self, mock_cmd): + mock_cmd.return_value = [] + result = sa.check_webhooks("org/repo") + assert result['total_hooks'] == 0 + assert result['insecure_hooks'] == 0 + + @patch('security_assessment.run_gh_command') + def test_secure_webhooks(self, mock_cmd): + mock_cmd.return_value = [ + {"active": True, "config": {"url": "https://example.com/hook", "insecure_ssl": "0", "secret": "abc123"}}, + {"active": True, "config": {"url": "https://ci.example.com/hook", "insecure_ssl": "0", "secret": "def456"}}, + ] + result = sa.check_webhooks("org/repo") + assert 
result['total_hooks'] == 2 + assert result['insecure_hooks'] == 0 + + @patch('security_assessment.run_gh_command') + def test_insecure_ssl_flagged(self, mock_cmd): + mock_cmd.return_value = [ + {"active": True, "config": {"url": "https://example.com/hook", "insecure_ssl": "1", "secret": "abc"}}, + ] + result = sa.check_webhooks("org/repo") + assert result['total_hooks'] == 1 + assert result['insecure_hooks'] == 1 + + @patch('security_assessment.run_gh_command') + def test_non_https_flagged(self, mock_cmd): + mock_cmd.return_value = [ + {"active": True, "config": {"url": "http://example.com/hook", "insecure_ssl": "0"}}, + ] + result = sa.check_webhooks("org/repo") + assert result['total_hooks'] == 1 + assert result['insecure_hooks'] == 1 + + @patch('security_assessment.run_gh_command') + def test_webhooks_not_accessible(self, mock_cmd): + mock_cmd.return_value = None + result = sa.check_webhooks("org/repo") + assert result['total_hooks'] == 0 + assert 'Not accessible' in result['error']