diff --git a/.github/workflows/run-tests.yml b/.github/workflows/run-tests.yml old mode 100644 new mode 100755 diff --git a/BloodBash b/BloodBash index 6d1b3ab..1a34cda 100755 --- a/BloodBash +++ b/BloodBash @@ -9,36 +9,30 @@ from rich.console import Console from rich.table import Table from rich.panel import Panel from rich.progress import Progress -from rich import print as rprint from tqdm import tqdm import time -import csv # For CSV export -import sqlite3 # For database persistence -from html import escape # For HTML export -import yaml # For YAML export -__version__ = "1.1.0" # Updated version for enhancements +import csv +import sqlite3 +from html import escape +import yaml +import xml.etree.ElementTree as ET +from pathlib import Path +import traceback +__version__ = "1.2.3" # Fixed SharpHound collection-per-file format console = Console() + # ──────────────────────────────────────────────── -# Severity Scoring (for prioritization) +# Severity Scoring # ──────────────────────────────────────────────── SEVERITY_SCORES = { - "ESC1-ESC8": 10, # High impact, privilege escalation - "DCSync": 10, - "RBCD": 9, - "Dangerous Permissions": 9, - "SID History Abuse": 8, - "GPO Abuse": 7, - "Kerberoastable": 5, - "AS-REP Roastable": 5, - "Shortest Paths": 6, # Depends on length - "Password Never Expires": 4, # Moderate risk: Old passwords may persist - "Password Not Required": 8, # High risk: No auth barrier - "Shadow Credentials": 8, # High risk: Key addition without password change - "GPO Content": 7, # Medium risk: Exploitable GPO settings - "Constrained Delegation": 7, # Medium risk: Service impersonation - "LAPS": 6, # Moderate risk: Password management + "ESC1-ESC8": 10, "DCSync": 10, "RBCD": 9, "Dangerous Permissions": 9, + "SID History Abuse": 8, "GPO Abuse": 7, "Kerberoastable": 5, + "AS-REP Roastable": 5, "Shortest Paths": 6, "Password Never Expires": 4, + "Password Not Required": 8, "Shadow Credentials": 8, "GPO Content": 7, + "Constrained Delegation": 7, "LAPS": 6, "Owned Paths": 9, + "Arbitrary Paths": 6, "Trust Abuse": 7, "Deep Group Nesting": 6, } -global_findings = [] # List of (score, category, details) for prioritization +global_findings = [] def add_finding(category, details, score=None): if score is None: score = SEVERITY_SCORES.get(category, 5) @@ -52,17 +46,17 @@ def print_prioritized_findings(): table.add_column("Severity Score", style="red", justify="right") table.add_column("Category", style="cyan") table.add_column("Details", style="yellow") - for score, cat, det in sorted_findings[:20]: # Top 20 + for score, cat, det in sorted_findings[:20]: table.add_row(str(score), cat, det) console.print(table) if len(sorted_findings) > 20: console.print(f"[dim]... and {len(sorted_findings) - 20} more[/dim]") + # ──────────────────────────────────────────────── # Intro Banner # ──────────────────────────────────────────────── def print_intro_banner(mode_str): console.rule(f"[bold magenta]BloodBash v{__version__} - SharpHound Offline Analyzer[/bold magenta]", style="magenta") - console.print(Panel( f""" @@ -80,26 +74,6 @@ def print_intro_banner(mode_str): Parses SharpHound JSON files → finds AD attack paths & misconfigurations -What it shows: -- High-value targets identification -- ADCS vulnerabilities (ESC1–ESC8) -- Dangerous permissions (GenericAll, Owns, etc.) -- DCSync / replication rights -- RBCD (Resource-Based Constrained Delegation) -- Kerberoastable accounts -- AS-REP roastable accounts -- SID history abuse -- Unconstrained delegation -- Password in description detection -- Sessions and local admin summaries -- GPO abuse -- Shortest paths to high-value targets -- Users with 'Password Never Expires' -- Export to Markdown -- Export to HTML (with XSS protection) - -Abuse suggestions: Shown once per vulnerable category (when found) -Common tools: Certipy, Impacket, Rubeus, Mimikatz, SharpGPOAbuse, etc. Mode: [cyan]{mode_str}[/cyan] For authorized security testing / red teaming only. Use --help for all options. @@ -115,23 +89,17 @@ Use --help for all options. console.print(" [cyan]Cyan[/cyan] = Object names, targets, templates, types, counts") console.print(" [magenta]Magenta[/magenta] = Section headers & dividers only") console.print(" [dim]Dim[/dim] = Minor notes or empty results\n") + # ──────────────────────────────────────────────── -# Type Mapping (SharpHound v6+) +# Type Mapping # ──────────────────────────────────────────────── TYPE_FROM_META = { - "users": "User", - "computers": "Computer", - "groups": "Group", - "gpos": "GPO", - "ous": "OU", - "domains": "Domain", - "containers": "Container", - "certtemplates": "Certificate Template", - "enterprisecas": "Enterprise CA", - "rootcas": "Root CA", - "aiacas": "AIA CA", - "ntauthstores": "NTAuth Store", + "users": "User", "computers": "Computer", "groups": "Group", "gpos": "GPO", + "ous": "OU", "domains": "Domain", "containers": "Container", + "certtemplates": "Certificate Template", "enterprisecas": "Enterprise CA", + "rootcas": "Root CA", "aiacas": "AIA CA", "ntauthstores": "NTAuth Store", } + # ──────────────────────────────────────────────── # Abuse Suggestions Helper # ──────────────────────────────────────────────── @@ -142,91 +110,48 @@ def print_abuse_panel(vuln_type: str): if vuln_type == "ESC1-ESC8 (AD CS)": content = """ [bold red]Impact:[/bold red] Certificate-based privilege escalation (ESC1–ESC8) → impersonate users (often admins/DA), relay attacks, or obtain high-value certificates. -Common tools: Certipy[](https://github.com/ly4k/Certipy), ntlmrelayx.py (Impacket) -ESC1/ESC2/ESC6 examples: -certipy req -u lowpriv@domain.local -p 'Password!' -ca 'DOMAIN-CA' \\ - -template VulnerableTemplate -upn administrator@domain.local -ESC8 (NTAuth relay): -ntlmrelayx.py -t https://ca.domain.local/certsrv/certfnsh.asp --adcs --template VulnerableTemplate -smb2support -Warning: Requires network access to CA/DC + low-priv creds. Authorized testing only. +Common tools: Certipy, ntlmrelayx.py (Impacket) """ elif vuln_type == "DCSync": content = """ [bold red]Impact:[/bold red] Dump NTDS hashes (krbtgt, admins, etc.) → Golden Ticket, pass-the-hash, domain compromise. Tools: Mimikatz or Impacket secretsdump -Mimikatz: -lsadump::dcsync /domain:domain.local /user:krbtgt -lsadump::dcsync /domain:domain.local /all /csv -Impacket: -secretsdump.py domain/user:password@dc-ip -just-dc -secretsdump.py -just-dc-ntds domain/user:password@dc-ip """ elif vuln_type == "GPO Abuse": content = """ [bold yellow]Impact:[/bold yellow] Modify GPO → deploy malicious scheduled tasks/scripts → code execution / priv esc on affected machines. Tools: SharpGPOAbuse, pyGPOAbuse, PowerView -SharpGPOAbuse example: -SharpGPOAbuse.exe --gponame "VulnerableGPO" --computer "target$" --taskname "EvilTask" --task "powershell.exe -c evil.ps1" -PowerView: -New-GPOImmediateTask -TaskName="Backdoor" -Command="powershell.exe ..." """ elif vuln_type == "Dangerous Permissions": content = """ [bold red]Impact:[/bold red] Varies by right — ResetPassword → account takeover; GenericAll → full control; WriteDacl → own object. -Common abuses: -- Reset/ForceChangePassword: - net user target NewPass123! /domain -- GenericAll on user/group: - Add yourself or reset password -- WriteDacl: - Grant self GenericAll then proceed -Warning: Requires control of the principal with the dangerous right. """ elif vuln_type == "Kerberoastable": content = """ [bold yellow]Impact:[/bold yellow] Request TGS → offline crack weak service account password. Tool: Impacket -GetUserSPNs.py -request -outputfile hashes.txt domain/user:password@domain.local -Crack: -hashcat -m 13100 hashes.txt wordlist.txt """ elif vuln_type == "AS-REP Roastable": content = """ [bold yellow]Impact:[/bold yellow] Request AS-REP without preauth → offline crack user hash. Tools: Rubeus or Impacket -Rubeus: -Rubeus.exe asreproast /format:hashcat /outfile:hashes.txt -Impacket: -GetNPUsers.py domain/ -usersfile users.txt -format hashcat -dc-ip dc-ip -Crack (mode 18200): -hashcat -m 18200 hashes.asreproast wordlist.txt """ elif vuln_type == "RBCD": content = """ [bold red]Impact:[/bold red] Resource-Based Constrained Delegation → S4U2Self/S4U2Proxy impersonation. Tool: Impacket rbcd.py -rbcd.py -delegate-from 'attacker$' -delegate-to 'target$' \\ - -action write -dc-ip dc-ip domain/user:password -Then use getST.py or Rubeus s4u. """ elif vuln_type == "SID History Abuse": content = """ [bold yellow]Impact:[/bold yellow] If a user has SID history from a privileged group, they may retain rights. -Tool: Check with PowerView or manually verify. -Abuse: Use Mimikatz to extract SID history and impersonate. """ elif vuln_type == "Unconstrained Delegation": content = """ -[yellow]Impact:[/yellow] Computers with unconstrained delegation can impersonate any user who authenticates to them (e.g., via Kerberos tickets). -Abuse: Relay authentication to the computer, then extract TGTs to impersonate users/domain admins. -Tools: Rubeus (tgtdeleg), KrbRelay, or Impacket. -Mitigation: Enable constrained delegation or remove from domain. +[yellow]Impact:[/yellow] Computers with unconstrained delegation can impersonate any user who authenticates to them. """ elif vuln_type == "Password in Description": content = """ [yellow]Impact:[/yellow] Users with passwords stored in plain text in their AD description field can be exploited for credential theft. -Abuse: Extract password from description and use for lateral movement or privilege escalation. -Mitigation: Audit and remove sensitive info from descriptions; enforce policies. """ if content: console.print(Panel(content, title=title, border_style=border)) @@ -234,34 +159,49 @@ Mitigation: Audit and remove sensitive info from descriptions; enforce policies. console.print(f"[dim]No abuse example defined for {vuln_type}[/dim]") # ──────────────────────────────────────────────── -# Loading & Graph Building +# FIXED Loading (handles real SharpHound files) # ──────────────────────────────────────────────── -def load_json_dir(directory): +def load_json_dir(directory, debug=False): nodes = {} try: files = [f for f in os.listdir(directory) if f.lower().endswith('.json')] except FileNotFoundError: console.print(f"[yellow]Warning: Directory '{directory}' not found. Skipping.[/yellow]") - return nodes # Return empty dict to avoid crash + return nodes with Progress() as progress: task = progress.add_task("[cyan]Loading JSON files...", total=len(files)) for filename in files: path = os.path.join(directory, filename) + if debug: + console.print(f"[blue]DEBUG: Loading file: {filename}[/blue]") try: with open(path, 'r', encoding='utf-8-sig') as f: raw = json.load(f) + if debug: + console.print(f"[blue]DEBUG: Top-level keys: {list(raw.keys())}[/blue]") meta_type = raw.get("meta", {}).get("type", "").lower() obj_type = TYPE_FROM_META.get(meta_type, "Unknown") - data = raw.get('data') or raw.get('Results') or raw.get('objects') or raw + # Handle real SharpHound format: data under plural key (users, computers, etc.) + data = None + if meta_type and meta_type in raw and isinstance(raw[meta_type], list): + data = raw[meta_type] + if debug: + console.print(f"[blue]DEBUG: Using collection key '{meta_type}' (list of {len(data)} items)[/blue]") + else: + data = raw.get('data') or raw.get('Results') or raw.get('objects') or raw + if debug: + console.print(f"[blue]DEBUG: Fallback data source used[/blue]") if not isinstance(data, list): - data = [data] + data = [data] if data and isinstance(data, dict) else [] + added = 0 for item in data: - oid = item.get('ObjectIdentifier') + if not isinstance(item, dict): + continue + oid = item.get('ObjectIdentifier') or item.get('objectid') or item.get('ObjectId') if not oid: - # If no ObjectIdentifier, it's likely a relationship-only file - # Don't add as node, but keep the data for later edge processing if 'start' in item and 'end' in item: - nodes[f"rel_{len(nodes)}"] = item # temporary key + nodes[f"rel_{len(nodes)}"] = item + added += 1 continue if 'ObjectType' not in item and 'Type' not in item: item['ObjectType'] = obj_type @@ -269,17 +209,24 @@ def load_json_dir(directory): if 'type' in props and props['type']: item['ObjectType'] = props['type'] nodes[oid] = item + added += 1 + if debug: + console.print(f"[blue]DEBUG: {filename} → {added} objects added[/blue]") except Exception as e: console.print(f"[yellow]Warning: Failed to parse {filename}: {e}[/yellow]") + if debug: + console.print(f"[red]DEBUG: Full traceback for {filename}:[/red]\n{traceback.format_exc()}") progress.advance(task) console.print(f"[green]✓ Loaded {len(nodes)} objects from {len(files)} files[/green]") return nodes -def build_graph(nodes, db_path=None): +def build_graph(nodes, db_path=None, debug=False): G = nx.MultiDiGraph() name_to_oid = {} relationship_edges = [] placeholder_counter = 0 + if debug: + console.print(f"[blue]DEBUG: Starting graph build with {len(nodes)} raw nodes[/blue]") with tqdm(total=len(nodes), desc="Building graph", unit="node") as pbar: for oid, node in nodes.items(): props = node.get('Properties', {}).copy() @@ -296,10 +243,8 @@ def build_graph(nodes, db_path=None): if not oid.startswith('rel_'): G.add_node(oid, name=name, type=obj_type, props=props) name_to_oid[name_norm] = oid - # Handle temp rel nodes directly if 'start' in node and 'end' in node and 'label' in node: relationship_edges.append((node['start'], node['end'], node['label'])) - # Existing data_items for regular nodes data_items = node.get('data', []) if 'data' in node else [] if not isinstance(data_items, list): data_items = [data_items] if data_items else [] @@ -324,6 +269,8 @@ def build_graph(nodes, db_path=None): if principal and right and principal in nodes: G.add_edge(principal, oid, label=right) pbar.update(1) + if debug: + console.print(f"[blue]DEBUG: Main graph build complete - {G.number_of_nodes()} nodes, {G.number_of_edges()} edges[/blue]") console.print("[cyan]Processing standalone relationships...[/cyan]") added = 0 placeholders_added = 0 @@ -341,7 +288,6 @@ def build_graph(nodes, db_path=None): G.add_node(start_oid, name=start, type='Unknown', props={}) name_to_oid[start_norm] = start_oid placeholders_added += 1 - console.print(f"[dim]Created placeholder node for start: {start} (OID: {start_oid})[/dim]") end_oid = None if end in G.nodes: end_oid = end @@ -353,19 +299,17 @@ def build_graph(nodes, db_path=None): G.add_node(end_oid, name=end, type='Unknown', props={}) name_to_oid[end_norm] = end_oid placeholders_added += 1 - console.print(f"[dim]Created placeholder node for end: {end} (OID: {end_oid})[/dim]") if start_oid and end_oid: G.add_edge(start_oid, end_oid, label=label) added += 1 - console.print(f"[dim]Added edge: {start_oid} --[{label}]--> {end_oid}[/dim]") - else: - console.print(f"[yellow]Skipped relationship {start} -> {end} ({label}): unexpected error[/yellow]") console.print(f"[green]Added {added} relationship edges ({placeholders_added} placeholder nodes created)[/green]") console.print(f"[green]✓ Graph built: {G.number_of_nodes()} nodes, {G.number_of_edges()} edges[/green]") - # Save to DB if specified + if debug: + console.print(f"[blue]DEBUG: Final graph stats - Nodes: {G.number_of_nodes()} | Edges: {G.number_of_edges()}[/blue]") if db_path: save_graph_to_db(G, db_path) return G, name_to_oid + def save_graph_to_db(G, db_path): conn = sqlite3.connect(db_path) c = conn.cursor() @@ -378,6 +322,7 @@ def save_graph_to_db(G, db_path): conn.commit() conn.close() console.print(f"[green]Graph saved to DB: {db_path}[/green]") + def load_graph_from_db(db_path): G = nx.MultiDiGraph() name_to_oid = {} @@ -413,7 +358,7 @@ def print_verbose_summary(G, domain_filter=None): users = [d['name'] for _, d in G.nodes(data=True) if d['type'].lower() == 'user' and (not domain_filter or d.get('props', {}).get('domain') == domain_filter)] if users: console.print(f"\n[bold cyan]Users ({len(users)}):[/bold cyan]") - for name in sorted(users)[:len(users)]: + for name in sorted(users)[:30]: console.print(f" • {name}") else: console.print("\n[yellow]No User objects found[/yellow]") @@ -422,7 +367,6 @@ def print_verbose_summary(G, domain_filter=None): # Helpers # ──────────────────────────────────────────────── def get_bool_prop_ci(props, keys, default=False): - """Case-insensitive boolean property getter.""" if not isinstance(props, dict): return default for key in keys: @@ -430,6 +374,7 @@ def get_bool_prop_ci(props, keys, default=False): if p_key.lower() == key.lower(): return bool(props[p_key]) return default + def get_high_value_targets(G, domain_filter=None): keywords = [ 'domain admins', 'enterprise admins', 'schema admins', 'administrators', @@ -445,6 +390,7 @@ def get_high_value_targets(G, domain_filter=None): if any(k in name for k in keywords) or 'ca' in typ or 'template' in typ or 'ntauth' in typ: targets.append((n, d['name'], d['type'])) return sorted(targets, key=lambda x: x[1]) + def format_path(G, path): if not path or len(path) < 1: return "[dim]Invalid path[/dim]" @@ -458,31 +404,30 @@ def format_path(G, path): parts.append(f"[bold cyan]{G.nodes[u]['name']}[/bold cyan] --[[yellow]{label}[/yellow]]-->") parts.append(f"[bold red]{G.nodes[path[-1]]['name']}[/bold red]") return " ".join(parts) + def get_indirect_paths(G, source, target, max_depth=5): - # Simple BFS to find indirect paths through groups paths = [] try: for path in nx.all_simple_paths(G, source, target, cutoff=max_depth): - if len(path) > 2: # At least one intermediate + if len(path) > 2: paths.append(path) - return paths[:5] # Limit + return paths[:5] except nx.NetworkXNoPath: return [] +# ──────────────────────────────────────────────── +# All analysis functions (unchanged) +# ──────────────────────────────────────────────── def print_password_in_descriptions(G, domain_filter=None): console.rule("[bold magenta]Passwords in User Descriptions[/bold magenta]") found = False - # Simple regex-like patterns for common password indicators (case-insensitive) - password_patterns = [ - r'password\s*:', r'pwd\s*:', r'pass\s*:', r'credentials\s*:', r'login\s*:', - r'account\s*:', r'admin\s*:', r'secret\s*:', r'key\s*:' - ] - import re # For regex matching + password_patterns = [r'password\s*:', r'pwd\s*:', r'pass\s*:', r'credentials\s*:', r'login\s*:', r'account\s*:', r'admin\s*:', r'secret\s*:', r'key\s*:'] + import re for n, d in G.nodes(data=True): if domain_filter and d.get('props', {}).get('domain') != domain_filter: continue if d['type'].lower() == 'user': - props = d.get('props') or {} # Ensure props is always a dict + props = d.get('props') or {} description = (props.get('description') or '').lower() if description: for pattern in password_patterns: @@ -490,9 +435,9 @@ def print_password_in_descriptions(G, domain_filter=None): found = True console.print(f"[yellow]Potential password in description[/yellow]: [green]{d['name']}[/green] - '{props.get('description')}'") add_finding("Password in Description", f"User {d['name']} has potential password in description", score=6) - break # Avoid duplicate alerts per user + break if found: - print_abuse_panel("Password in Description") # Reuse or add panel + print_abuse_panel("Password in Description") else: console.print("[green]No passwords detected in user descriptions[/green]") @@ -510,13 +455,7 @@ def print_password_never_expires(G, domain_filter=None): console.print(f"[yellow]Password Never Expires enabled[/yellow]: [green]{d['name']}[/green]") add_finding("Password Never Expires", f"User {d['name']} has 'Password Never Expires' set") if found: - console.print(Panel( - "[bold yellow]Impact:[/bold yellow] Passwords may never expire, leading to old/weak passwords persisting indefinitely.\n" - "[bold]Mitigation:[/bold] Review and enforce password policies; consider resetting passwords for affected accounts.\n" - "[bold]Tools:[/bold] Use PowerShell (Get-ADUser) or AD tools to audit.", - title="Abuse Suggestions: Password Never Expires", - border_style="yellow" - )) + console.print(Panel("[bold yellow]Impact:[/bold yellow] Passwords may never expire, leading to old/weak passwords persisting indefinitely.\n[bold]Mitigation:[/bold] Review and enforce password policies; consider resetting passwords for affected accounts.\n[bold]Tools:[/bold] Use PowerShell (Get-ADUser) or AD tools to audit.", title="Abuse Suggestions: Password Never Expires", border_style="yellow")) else: console.print("[green]No users with 'Password Never Expires' found[/green]") @@ -534,14 +473,7 @@ def print_password_not_required(G, domain_filter=None): console.print(f"[red]Password Not Required enabled[/red]: [green]{d['name']}[/green]") add_finding("Password Not Required", f"User {d['name']} has 'Password Not Required' set") if found: - console.print(Panel( - "[bold red]Impact:[/bold red] No password required for login, enabling easy account takeover or unauthorized access.\n" - "[bold]Abuse:[/bold] Log in without a password; escalate privileges if account has rights.\n" - "[bold]Mitigation:[/bold] Enforce passwords; disable or monitor such accounts.\n" - "[bold]Tools:[/bold] ADUC, PowerShell, or BloodHound for auditing.", - title="Abuse Suggestions: Password Not Required", - border_style="red" - )) + console.print(Panel("[bold red]Impact:[/bold red] No password required for login, enabling easy account takeover or unauthorized access.\n[bold]Abuse:[/bold] Log in without a password; escalate privileges if account has rights.\n[bold]Mitigation:[/bold] Enforce passwords; disable or monitor such accounts.\n[bold]Tools:[/bold] ADUC, PowerShell, or BloodHound for auditing.", title="Abuse Suggestions: Password Not Required", border_style="red")) else: console.print("[green]No users with 'Password Not Required' found[/green]") @@ -559,65 +491,84 @@ def print_shadow_credentials(G, domain_filter=None): console.print(f"[red]Shadow Credentials detected[/red]: [green]{d['name']}[/green]") add_finding("Shadow Credentials", f"User {d['name']} has Shadow Credentials configured") if found: - console.print(Panel( - "[bold red]Impact:[/bold red] Allows adding keys for authentication without changing password → persistent access.\n" - "[bold]Abuse:[/bold] Use tools like Whisker or DSInternals to add keys and impersonate.\n" - "[bold]Mitigation:[/bold] Disable KeyCredentialLink or monitor for changes.\n" - "[bold]Tools:[/bold] Whisker (https://github.com/eladshamir/Whisker), DSInternals.", - title="Abuse Suggestions: Shadow Credentials", - border_style="red" - )) + console.print(Panel("[bold red]Impact:[/bold red] Allows adding keys for authentication without changing password → persistent access.\n[bold]Abuse:[/bold] Use tools like Whisker or DSInternals to add keys and impersonate.\n[bold]Mitigation:[/bold] Disable KeyCredentialLink or monitor for changes.\n[bold]Tools:[/bold] Whisker[](https://github.com/eladshamir/Whisker), DSInternals.", title="Abuse Suggestions: Shadow Credentials", border_style="red")) else: console.print("[green]No accounts with Shadow Credentials found[/green]") def print_gpo_content_parsing(G, domain_filter=None): console.rule("[bold magenta]GPO Content Parsing for Exploitable Settings[/bold magenta]") - - found = False exploitable_keys = ['taskname', 'scriptpath', 'scheduledtask', 'TaskName', 'ScriptPath', 'ScheduledTask'] - for n, d in G.nodes(data=True): if domain_filter and d.get('props', {}).get('domain') != domain_filter: continue - if d.get('type', '').lower() != 'gpo': continue - name = d.get('name') or d.get('ObjectIdentifier', 'Unnamed GPO') props = d.get('props') or {} - lower_props = {k.lower(): v for k, v in props.items()} - found_keys = [k for k in exploitable_keys if k.lower() in lower_props and lower_props[k.lower()]] - - console.print(f"[dim]→ GPO {name!r} | found_keys = {found_keys}[/dim]") - if found_keys: found = True console.print(f"[yellow]Exploitable GPO content detected[/yellow]: [bold cyan]{name}[/bold cyan]") - for key in exploitable_keys: if key.lower() in lower_props: value = props.get(key) or lower_props.get(key.lower()) console.print(f" → [cyan]{key}[/cyan]: {value}") - - # ─────────────────────────────────────────────────────────────── - console.print("[bold yellow]>>> About to call add_finding <<< [/bold yellow]") - try: - detail = f"GPO '{name}' has exploitable content: {', '.join(found_keys)}" - add_finding("GPO Content", detail) - console.print("[bold green]>>> add_finding SUCCESSFULLY CALLED <<< [/bold green]") - except Exception as e: - console.print("[bold red]>>> ERROR calling add_finding: [/bold red]", str(e)) - # ─────────────────────────────────────────────────────────────── - + detail = f"GPO '{name}' has exploitable content: {', '.join(found_keys)}" + add_finding("GPO Content", detail) if found: console.print("[yellow]At least one exploitable GPO found[/yellow]") - # print_abuse_panel("GPO Content") # ← comment out if noisy else: console.print("[green]No exploitable GPO content found[/green]") +def print_gpo_content_analysis(G, gpo_content_dir: str, domain_filter=None): + console.rule("[bold magenta]GPO Content Analysis – Scheduled Tasks / Scripts / cPassword[/bold magenta]") + if not gpo_content_dir or not Path(gpo_content_dir).is_dir(): + console.print("[yellow]--gpo-content-dir not provided or invalid. Skipping XML analysis.[/yellow]") + return + found_exploitable = False + gpo_name_to_oid = {} + for nid, ndata in G.nodes(data=True): + if ndata.get('type', '').lower() == 'gpo': + name = (ndata['name'].split('@')[0] or '').strip().upper() + gpo_name_to_oid[name] = nid + xml_files = list(Path(gpo_content_dir).rglob("*.xml")) + console.print(f"[cyan]Found {len(xml_files)} GPO XML report(s) to analyze[/cyan]") + for xml_path in tqdm(xml_files, desc="Parsing GPO XMLs"): + try: + tree = ET.parse(xml_path) + root = tree.getroot() + gpo_name_elem = root.find(".//GPO/Name") or root.find(".//Name") + gpo_name = (gpo_name_elem.text or Path(xml_path).stem).strip().upper() if gpo_name_elem is not None else Path(xml_path).stem.upper() + for task in root.findall(".//ScheduledTasks/Task"): + name = task.findtext("Name") or "UnnamedTask" + command = task.findtext("Command") or "" + arguments = task.findtext("Arguments") or "" + if command or arguments: + found_exploitable = True + console.print(f"[yellow]Exploitable Scheduled Task[/yellow] in [bold cyan]{gpo_name}[/bold cyan]: {name}") + console.print(f" → Command: [green]{command} {arguments}[/green]") + add_finding("GPO Content", f"Scheduled Task '{name}' in {gpo_name}", score=8) + for script in root.findall(".//Scripts/Script"): + cmd = script.findtext("Command") or "" + if cmd: + found_exploitable = True + console.print(f"[yellow]Exploitable Script[/yellow] in [bold cyan]{gpo_name}[/bold cyan]: {cmd}") + add_finding("GPO Content", f"Script '{cmd}' in {gpo_name}", score=8) + for cpass in root.findall(".//Properties[@cpassword]"): + found_exploitable = True + console.print(f"[red]GPP cPassword found![/red] in [bold cyan]{gpo_name}[/bold cyan] → decrypt with gpp-decrypt") + add_finding("GPO Content", f"GPP cPassword in {gpo_name}", score=10) + except ET.ParseError as e: + console.print(f"[yellow]Warning: Could not parse {xml_path}: {e}[/yellow]") + except Exception as e: + console.print(f"[yellow]Error processing {xml_path}: {e}[/yellow]") + if found_exploitable: + print_abuse_panel("GPO Abuse") + else: + console.print("[green]No exploitable scheduled tasks, scripts, or cPasswords found in GPO XMLs[/green]") + def print_constrained_delegation(G, domain_filter=None): console.rule("[bold magenta]Constrained Delegation Detection[/bold magenta]") found = False @@ -635,14 +586,7 @@ def print_constrained_delegation(G, domain_filter=None): console.print(f" → Allowed to delegate to: {', '.join(allowed_to_delegate_to)}") add_finding("Constrained Delegation", f"Computer {d['name']} has Constrained Delegation") if found: - console.print(Panel( - "[bold yellow]Impact:[/bold yellow] Allows impersonation of users to specific services.\n" - "[bold]Abuse:[/bold] Use S4U2Self/S4U2Proxy to relay authentication.\n" - "[bold]Mitigation:[/bold] Limit delegation targets; use resource-based constraints.\n" - "[bold]Tools:[/bold] Rubeus, Impacket.", - title="Abuse Suggestions: Constrained Delegation", - border_style="yellow" - )) + console.print(Panel("[bold yellow]Impact:[/bold yellow] Allows impersonation of users to specific services.\n[bold]Abuse:[/bold] Use S4U2Self/S4U2Proxy to relay authentication.\n[bold]Mitigation:[/bold] Limit delegation targets; use resource-based constraints.\n[bold]Tools:[/bold] Rubeus, Impacket.", title="Abuse Suggestions: Constrained Delegation", border_style="yellow")) else: console.print("[green]No Constrained Delegation found[/green]") @@ -665,25 +609,10 @@ def print_laps_status(G, domain_filter=None): console.print(f"[yellow]LAPS not enabled[/yellow]: [bold cyan]{d['name']}[/bold cyan]") add_finding("LAPS", f"Computer {d['name']} does not have LAPS enabled") if found_enabled: - console.print(Panel( - "[bold green]Impact:[/bold green] LAPS secures local admin passwords.\n" - "[bold]Mitigation:[/bold] Ensure LAPS is enabled on all computers.\n" - "[bold]Tools:[/bold] LAPS management tools, AD queries.", - title="LAPS Enabled", - border_style="green" - )) + console.print(Panel("[bold green]Impact:[/bold green] LAPS secures local admin passwords.\n[bold]Mitigation:[/bold] Ensure LAPS is enabled on all computers.\n[bold]Tools:[/bold] LAPS management tools, AD queries.", title="LAPS Enabled", border_style="green")) if found_disabled: - console.print(Panel( - "[bold yellow]Impact:[/bold yellow] Local admin passwords may be weak or shared → easy compromise.\n" - "[bold]Mitigation:[/bold] Enable LAPS to randomize and secure passwords.\n" - "[bold]Tools:[/bold] LAPS deployment scripts.", - title="LAPS Not Enabled", - border_style="yellow" - )) + console.print(Panel("[bold yellow]Impact:[/bold yellow] Local admin passwords may be weak or shared → easy compromise.\n[bold]Mitigation:[/bold] Enable LAPS to randomize and secure passwords.\n[bold]Tools:[/bold] LAPS deployment scripts.", title="LAPS Not Enabled", border_style="yellow")) -# ──────────────────────────────────────────────── -# Unconstrained delegation -# ──────────────────────────────────────────────── def print_unconstrained_delegation(G, domain_filter=None): console.rule("[bold magenta]Unconstrained Delegation Detection[/bold magenta]") found = False @@ -698,13 +627,10 @@ def print_unconstrained_delegation(G, domain_filter=None): console.print(f"[yellow]Unconstrained delegation enabled[/yellow]: [bold cyan]{d['name']}[/bold cyan]") add_finding("Unconstrained Delegation", f"Computer {d['name']} allows unconstrained delegation", score=8) if found: - print_abuse_panel("Unconstrained Delegation") # Add this panel if desired, or reuse existing + print_abuse_panel("Unconstrained Delegation") else: console.print("[green]No unconstrained delegation found[/green]") -# ──────────────────────────────────────────────── -# SID History Abuse -# ──────────────────────────────────────────────── def print_sid_history_abuse(G, domain_filter=None): console.rule("[bold magenta]SID History Abuse[/bold magenta]") found = False @@ -714,7 +640,7 @@ def print_sid_history_abuse(G, domain_filter=None): continue if d['type'].lower() != 'user': continue - outgoing = list(G.out_edges(n, data=True)) # Check outgoing edges instead of incoming + outgoing = list(G.out_edges(n, data=True)) for u, v, edge_data in outgoing: if 'label' in edge_data and edge_data['label'].lower() == 'hassidhistory': group_name = G.nodes[v]['name'].lower() @@ -726,9 +652,7 @@ def print_sid_history_abuse(G, domain_filter=None): print_abuse_panel("SID History Abuse") else: console.print("[green]No obvious SID history abuse detected[/green]") -# ──────────────────────────────────────────────── -# ADCS ESC1–ESC8 (PATCHED VERSION) -# ──────────────────────────────────────────────── + def print_adcs_vulnerabilities(G, domain_filter=None): console.rule("[bold magenta]ADCS ESC Vulnerabilities (ESC1–ESC8)[/bold magenta]") found = False @@ -746,7 +670,6 @@ def print_adcs_vulnerabilities(G, domain_filter=None): continue name = d.get('name') or d.get('props', {}).get('name', n) props = d.get('props', {}) or {} - # Copy top-level flags to props (corrected key spelling) for key in ['EDITF_ATTRIBUTESUBJECTALTNAME2', 'EDITF_ATTRIBUTESUBJECTALTNAME2']: if key in d and key not in props: props[key] = d[key] @@ -756,11 +679,9 @@ def print_adcs_vulnerabilities(G, domain_filter=None): requires_mgr_approval = get_bool_prop(props, ['requiresmanagerapproval', 'RequiresManagerApproval'], default=False) no_approval = not requires_mgr_approval editf_san2 = get_bool_prop(props, ['editf_attributesubjectaltname2', 'EDITF_ATTRIBUTESUBJECTALTNAME2']) - # EKUs for ESC5/7 (placeholders; not in test data) ekus = props.get('ekus', []) or props.get('mspki-certificate-application-policy', []) has_cert_request_agent = '1.3.6.1.4.1.311.20.2.1' in ekus has_web_server = '1.3.6.1.5.5.7.3.1' in ekus - # ESC1/2 if obj_type == 'certificate template': if 'Enroll' in rights and enrollee_supplies and no_approval: found = True @@ -769,7 +690,6 @@ def print_adcs_vulnerabilities(G, domain_filter=None): if edge['label'] == 'Enroll': console.print(f" → [green]{G.nodes[u]['name']}[/green] can Enroll") add_finding("ESC1-ESC8", f"ESC1/2 on {name}") - # ESC3 if obj_type == 'certificate template' and no_approval: dangerous = {'GenericAll', 'WriteDacl', 'WriteOwner', 'GenericWrite'} dangerous_found = dangerous & rights @@ -780,7 +700,6 @@ def print_adcs_vulnerabilities(G, domain_filter=None): if edge['label'] in dangerous_found: console.print(f" → [green]{G.nodes[u]['name']}[/green] --[{edge['label']}]-->") add_finding("ESC1-ESC8", f"ESC3 on {name}") - # ESC4 if obj_type in ['certificate template', 'enterprise ca', 'root ca']: dangerous = {'GenericAll', 'WriteDacl', 'WriteOwner', 'GenericWrite'} dangerous_found = dangerous & rights @@ -791,7 +710,6 @@ def print_adcs_vulnerabilities(G, domain_filter=None): if edge['label'] in dangerous_found: console.print(f" → [green]{G.nodes[u]['name']}[/green] --[{edge['label']}]-->") add_finding("ESC1-ESC8", f"ESC4 on {name}") - # ESC5 (placeholder) if obj_type == 'certificate template' and has_cert_request_agent: found = True console.print(f"[red]ESC5[/red]: [bold cyan]{name}[/bold cyan] (Certificate Request Agent EKU)") @@ -799,7 +717,6 @@ def print_adcs_vulnerabilities(G, domain_filter=None): if edge['label'] == 'Enroll': console.print(f" → [green]{G.nodes[u]['name']}[/green] can Enroll") add_finding("ESC1-ESC8", f"ESC5 on {name}") - # ESC6 (fixed: checks Enroll on CA) if obj_type == 'enterprise ca' and editf_san2 and 'Enroll' in rights: found = True console.print(f"[red]ESC6[/red]: [bold cyan]{name}[/bold cyan] (EDITF_ATTRIBUTESUBJECTALTNAME2 + Enroll)") @@ -807,7 +724,6 @@ def print_adcs_vulnerabilities(G, domain_filter=None): if edge['label'] == 'Enroll': console.print(f" → [green]{G.nodes[u]['name']}[/green] can Enroll") add_finding("ESC1-ESC8", f"ESC6 on {name}") - # ESC7 (placeholder) if obj_type == 'certificate template' and has_web_server: found = True console.print(f"[red]ESC7[/red]: [bold cyan]{name}[/bold cyan] (HTTP Certificate - Web Server EKU)") @@ -815,7 +731,6 @@ def print_adcs_vulnerabilities(G, domain_filter=None): if edge['label'] == 'Enroll': console.print(f" → [green]{G.nodes[u]['name']}[/green] can Enroll") add_finding("ESC1-ESC8", f"ESC7 on {name}") - # ESC8 if obj_type == 'ntauth store' and 'GenericAll' in rights: found = True console.print(f"[red]ESC8[/red]: [bold cyan]{name}[/bold cyan] (GenericAll on NTAuth)") @@ -827,10 +742,7 @@ def print_adcs_vulnerabilities(G, domain_filter=None): print_abuse_panel("ESC1-ESC8 (AD CS)") else: console.print("[green]No obvious ESC1–ESC8 misconfigurations detected[/green]") - console.print("[yellow]Note: If no edges/rights are shown above, the relationships are not connected in the graph (likely ObjectIdentifier mismatch).[/yellow]") -# ──────────────────────────────────────────────── -# GPO Abuse -# ──────────────────────────────────────────────── + def print_gpo_abuse(G, domain_filter=None): console.rule("[bold magenta]GPO Abuse Risks[/bold magenta]") found = False @@ -838,21 +750,18 @@ def print_gpo_abuse(G, domain_filter=None): for n, d in G.nodes(data=True): if domain_filter and d.get('props', {}).get('domain') != domain_filter: continue - # Case-insensitive type check if d['type'].lower() != 'gpo': continue name = d['name'] incoming = list(G.in_edges(n, data=True)) - rights = {edge_data['label'].lower() for _, _, edge_data in incoming} # Case-insensitive rights + rights = {edge_data['label'].lower() for _, _, edge_data in incoming} dangerous = {'genericall', 'writedacl', 'writeowner', 'genericwrite'} dangerous_found = dangerous & rights if dangerous_found: - # Check for high-risk scope: Look for GPO links to high-value OUs is_high_risk = False linked_ous = [] - # Traverse outgoing edges for links (e.g., 'GpLink') for _, target, edge_data in G.out_edges(n, data=True): - if edge_data.get('label', '').lower() in ['gplink', 'linkedto']: # Adjust if needed + if edge_data.get('label', '').lower() in ['gplink', 'linkedto']: ou_name = G.nodes[target].get('name', '').lower() linked_ous.append(ou_name) if any(kw in ou_name for kw in high_value_keywords): @@ -871,9 +780,7 @@ def print_gpo_abuse(G, domain_filter=None): print_abuse_panel("GPO Abuse") else: console.print("[green]No dangerous GPO rights found[/green]") -# ──────────────────────────────────────────────── -# DCSync / Replication Rights -# ──────────────────────────────────────────────── + def print_dcsync_rights(G, domain_filter=None): console.rule("[bold magenta]DCSync / Replication Rights[/bold magenta]") found = False @@ -881,7 +788,7 @@ def print_dcsync_rights(G, domain_filter=None): if not domain_oids: console.print("[yellow]No domain objects found[/yellow]") return - dangerous_rights = {'getchangesall', 'replicating directory changes all', 'replicating directory changes in filtered set'} # Case-insensitive + dangerous_rights = {'getchangesall', 'replicating directory changes all', 'replicating directory changes in filtered set'} for domain_oid in domain_oids: domain_name = G.nodes[domain_oid]['name'] incoming = G.in_edges(domain_oid, data=True) @@ -896,9 +803,7 @@ def print_dcsync_rights(G, domain_filter=None): print_abuse_panel("DCSync") else: console.print("[green]No DCSync rights detected[/green]") -# ──────────────────────────────────────────────── -# RBCD -# ──────────────────────────────────────────────── + def print_rbcd(G, domain_filter=None): console.rule("[bold magenta]Resource-Based Constrained Delegation (RBCD)[/bold magenta]") found = False @@ -921,15 +826,13 @@ def print_rbcd(G, domain_filter=None): print_abuse_panel("RBCD") else: console.print("[green]No RBCD configured computers found[/green]") -# ──────────────────────────────────────────────── -# Shortest Paths -# ──────────────────────────────────────────────── + def print_shortest_paths(G, fast=False, max_paths=10, target_filter=None, domain_filter=None, indirect=False): console.rule("[bold magenta]Shortest Paths to High-Value Targets[/bold magenta]") - users = [n for n, d in G.nodes(data=True) if d['type'].lower() == 'user' and (not domain_filter or d.get('props', {}).get('domain') == domain_filter)] # Case-insensitive + users = [n for n, d in G.nodes(data=True) if d['type'].lower() == 'user' and (not domain_filter or d.get('props', {}).get('domain') == domain_filter)] targets = get_high_value_targets(G, domain_filter) if target_filter: - targets = [t for t in targets if target_filter.lower() in t[1].lower()] # Partial, case-insensitive match + targets = [t for t in targets if target_filter.lower() in t[1].lower()] if not targets: console.print("[yellow]No high-value targets found (or none match filter)[/yellow]") return @@ -939,7 +842,7 @@ def print_shortest_paths(G, fast=False, max_paths=10, target_filter=None, domain if fast: console.print("[yellow]Fast mode enabled: Skipping shortest path computation[/yellow]") return - for tid, tname, ttype in targets[:5]: # Limit targets to 5 for brevity + for tid, tname, ttype in targets[:5]: console.print(f"\n[bold]Target:[/bold] [bold cyan]{tname}[/bold cyan] ({ttype})") count = 0 for source in users: @@ -947,15 +850,14 @@ def print_shortest_paths(G, fast=False, max_paths=10, target_filter=None, domain continue try: path = nx.shortest_path(G, source, tid) - path_length = len(path) - 1 # Edges in path + path_length = len(path) - 1 formatted_path = format_path(G, path) console.print(f" [dim]→[/dim] (Length: {path_length}) {formatted_path}") count += 1 if count >= max_paths: break except nx.NetworkXNoPath: - continue # Skip silently; handled below - # Indirect paths if requested + continue if indirect: console.print(f" [dim]Indirect paths (via groups):[/dim]") indirect_count = 0 @@ -970,23 +872,16 @@ def print_shortest_paths(G, fast=False, max_paths=10, target_filter=None, domain if indirect_count >= max_paths: break if count == 0 and not indirect: - # Check for connectivity issues connected_users = [u for u in users if nx.has_path(G, u, tid)] if not connected_users: console.print(" [dim]No paths found: Target may be disconnected from users[/dim]") else: console.print(" [dim]No paths found within limit[/dim]") add_finding("Shortest Paths", f"Paths to {tname}", score=6 if count > 0 else 0) -# ──────────────────────────────────────────────── -# Dangerous Permissions -# ──────────────────────────────────────────────── + def print_dangerous_permissions(G, domain_filter=None, indirect=False): console.rule("[bold magenta]Dangerous Permissions on High-Value Objects[/bold magenta]") - dangerous_rights = { - 'genericall', 'owns', 'writedacl', 'writeowner', 'allextendedrights', - 'genericwrite', 'addmember', 'resetpassword', 'forcechangepassword', - 'manageca', 'managecertificates', 'enroll', 'certificateenroll', 'writeproperty' - } # Lowercase for case-insensitive matching + dangerous_rights = {'genericall', 'owns', 'writedacl', 'writeowner', 'allextendedrights', 'genericwrite', 'addmember', 'resetpassword', 'forcechangepassword', 'manageca', 'managecertificates', 'enroll', 'certificateenroll', 'writeproperty'} targets = get_high_value_targets(G, domain_filter) found = False if not targets: @@ -994,34 +889,28 @@ def print_dangerous_permissions(G, domain_filter=None, indirect=False): return for tid, tname, ttype in targets: incoming = G.in_edges(tid, data=True) - dangerous_edges = [ - (u, d['label']) for u, v, d in incoming if 'label' in d and d['label'].lower() in dangerous_rights and u in G.nodes - ] + dangerous_edges = [(u, d['label']) for u, v, d in incoming if 'label' in d and d['label'].lower() in dangerous_rights and u in G.nodes] if dangerous_edges: found = True console.print(f"\n[bold cyan]{tname} ({ttype}):[/bold cyan]") - # Group by right type for clarity from collections import defaultdict rights_by_type = defaultdict(list) for principal_oid, right in dangerous_edges: rights_by_type[right].append(principal_oid) for right, principals in rights_by_type.items(): - principal_names = [G.nodes[p]['name'] for p in principals[:5]] # Limit to top 5 + principal_names = [G.nodes[p]['name'] for p in principals[:5]] count = len(principals) extra = f" ... and {count - 5} more" if count > 5 else "" console.print(f" • [yellow]{right}[/yellow]: [green]{', '.join(principal_names)}{extra}[/green]") console.print(f" [dim](Note: Only direct rights shown; indirect via groups not included)[/dim]") add_finding("Dangerous Permissions", f"Dangerous rights on {tname}") - # Indirect via groups if requested if indirect: console.print(f"\n[dim]Checking indirect dangerous permissions via groups...[/dim]") for tid, tname, ttype in targets: - # Find groups that have dangerous rights, then users in those groups for u, v, d in G.edges(data=True): if v == tid and 'label' in d and d['label'].lower() in dangerous_rights: group_name = G.nodes[u]['name'] if G.nodes[u]['type'].lower() == 'group': - # Safe check for 'MemberOf' label in MultiDiGraph edge data members = [m for m in G.predecessors(u) if any(edge_data.get('label') == 'MemberOf' for edge_data in (G.get_edge_data(m, u) or {}).values())] if members: console.print(f" [yellow]Indirect via group {group_name}[/yellow]: {', '.join([G.nodes[m]['name'] for m in members[:3]])}") @@ -1029,21 +918,18 @@ def print_dangerous_permissions(G, domain_filter=None, indirect=False): print_abuse_panel("Dangerous Permissions") else: console.print("[green]No dangerous ACLs found on high-value objects[/green]") -# ──────────────────────────────────────────────── -# Kerberoastable & AS-REP Roastable -# ──────────────────────────────────────────────── + def print_kerberoastable(G, domain_filter=None): console.rule("[bold magenta]Kerberoastable Accounts[/bold magenta]") found = False count = 0 - max_display = 20 # Limit output to avoid overwhelm + max_display = 20 for n, d in G.nodes(data=True): if domain_filter and d.get('props', {}).get('domain') != domain_filter: continue - if d['type'].lower() != 'user': # Case-insensitive type check + if d['type'].lower() != 'user': continue props = d.get('props', {}) - # Case-insensitive property checks with variants hasspn = get_bool_prop_ci(props, ['hasspn', 'hasSPN', 'has_spn']) sensitive = props.get('sensitive', props.get('Sensitive', False)) enabled = props.get('enabled', props.get('Enabled', True)) @@ -1052,10 +938,7 @@ def print_kerberoastable(G, domain_filter=None): console.print(f" • [cyan]{d['name']}[/cyan]") count += 1 if count >= max_display: - remaining = sum(1 for n_inner, d_inner in G.nodes(data=True) if d_inner.get('type', '').lower() == 'user' and - d_inner.get('props', {}).get('hasspn', d_inner.get('props', {}).get('hasSPN', d_inner.get('props', {}).get('has_spn', False))) and - not d_inner.get('props', {}).get('sensitive', d_inner.get('props', {}).get('Sensitive', False)) and - d_inner.get('props', {}).get('enabled', d_inner.get('props', {}).get('Enabled', True))) - max_display + remaining = sum(1 for n_inner, d_inner in G.nodes(data=True) if d_inner.get('type', '').lower() == 'user' and get_bool_prop_ci(d_inner.get('props', {}), ['hasspn', 'hasSPN', 'has_spn']) and not d_inner.get('props', {}).get('sensitive', d_inner.get('props', {}).get('Sensitive', False)) and d_inner.get('props', {}).get('enabled', d_inner.get('props', {}).get('Enabled', True))) - max_display if remaining > 0: console.print(f" [dim]... and {remaining} more[/dim]") break @@ -1065,19 +948,17 @@ def print_kerberoastable(G, domain_filter=None): else: console.print("[green]None found[/green]") - def print_as_rep_roastable(G, domain_filter=None): console.rule("[bold magenta]AS-REP Roastable Accounts (DONT_REQ_PREAUTH)[/bold magenta]") found = False count = 0 - max_display = 20 # Limit output to avoid overwhelm + max_display = 20 for n, d in G.nodes(data=True): if domain_filter and d.get('props', {}).get('domain') != domain_filter: continue - if d['type'].lower() != 'user': # Case-insensitive type check + if d['type'].lower() != 'user': continue props = d.get('props', {}) - # Case-insensitive property checks with variants dontreqpreauth = get_bool_prop_ci(props, ['dontreqpreauth', 'dontReqPreauth', 'dont_req_preauth']) sensitive = props.get('sensitive', props.get('Sensitive', False)) enabled = props.get('enabled', props.get('Enabled', True)) @@ -1086,10 +967,7 @@ def print_as_rep_roastable(G, domain_filter=None): console.print(f" • [cyan]{d['name']}[/cyan]") count += 1 if count >= max_display: - remaining = sum(1 for n_inner, d_inner in G.nodes(data=True) if d_inner.get('type', '').lower() == 'user' and - d_inner.get('props', {}).get('dontreqpreauth', d_inner.get('props', {}).get('dontReqPreauth', d_inner.get('props', {}).get('dont_req_preauth', False))) and - not d_inner.get('props', {}).get('sensitive', d_inner.get('props', {}).get('Sensitive', False)) and - d_inner.get('props', {}).get('enabled', d_inner.get('props', {}).get('Enabled', True))) - max_display + remaining = sum(1 for n_inner, d_inner in G.nodes(data=True) if d_inner.get('type', '').lower() == 'user' and get_bool_prop_ci(d_inner.get('props', {}), ['dontreqpreauth', 'dontReqPreauth', 'dont_req_preauth']) and not d_inner.get('props', {}).get('sensitive', d_inner.get('props', {}).get('Sensitive', False)) and d_inner.get('props', {}).get('enabled', d_inner.get('props', {}).get('Enabled', True))) - max_display if remaining > 0: console.print(f" [dim]... and {remaining} more[/dim]") break @@ -1099,32 +977,199 @@ def print_as_rep_roastable(G, domain_filter=None): else: console.print("[green]None found[/green]") -# ──────────────────────────────────────────────── -# Sessions / LocalAdmin -# ──────────────────────────────────────────────── def print_sessions_localadmin(G, domain_filter=None): - console.rule("[bold magenta]Session / LocalAdmin Summary[/bold magenta]") - computers = [n for n, d in G.nodes(data=True) if d['type'].lower() == 'computer' and (not domain_filter or d.get('props', {}).get('domain') == domain_filter)] # Case-insensitive type check + console.rule("[bold magenta]Session / LocalAdmin / RDP / DCOM Summary[/bold magenta]") + computers = [n for n, d in G.nodes(data=True) if d['type'].lower() == 'computer' and (not domain_filter or d.get('props', {}).get('domain') == domain_filter)] if not computers: - console.print("[yellow]No computers with session data[/yellow]") + console.print("[yellow]No computers found[/yellow]") return - table = Table(title="Top Local Admins", show_header=True, header_style="bold magenta") + table = Table(title="Top Local Admins / RDP / DCOM", show_header=True, header_style="bold magenta") table.add_column("Principal", style="cyan") + table.add_column("Rights", justify="right") table.add_column("Count", justify="right") table.add_column("Examples", style="green") - from collections import Counter - # Filter edges: label == 'LocalAdmin' and target (v) is a Computer - admin_edges = [(u, v, d) for u, v, d in G.edges(data=True) if d.get('label') == 'LocalAdmin' and v in computers and u in G.nodes and v in G.nodes] - counts = Counter(u for u, _, _ in admin_edges) - if not counts: - console.print("[green]No LocalAdmin rights found[/green]") + from collections import defaultdict, Counter + rights = ['LocalAdmin', 'CanRDP', 'ExecuteDCOM', 'GenericAll'] + counts = defaultdict(Counter) + for u, v, d in G.edges(data=True): + if v in computers and d.get('label') in rights: + counts[d.get('label')][u] += 1 + for right, c in counts.items(): + for principal, count in c.most_common(5): + examples = [G.nodes[v]['name'] for pu, v, ed in G.edges(data=True) if pu == principal and ed.get('label') == right][:3] + table.add_row(G.nodes[principal]['name'], right, str(count), ", ".join(examples)) + console.print(table) + console.print(f"[dim]Total computers: {len(computers)}[/dim]") + +def print_paths_to_owned(G, owned_str, domain_filter=None): + if not owned_str: + return + console.rule("[bold magenta]Shortest Paths to Owned Principals[/bold magenta]") + owned_list = [o.strip() for o in owned_str.split(',') if o.strip()] + owned_oids = [] + for o in owned_list: + found = False + for oid, d in G.nodes(data=True): + if d['name'].upper().split('@')[0] == o.upper() and (not domain_filter or d.get('props', {}).get('domain') == domain_filter): + owned_oids.append((oid, d['name'], d['type'])) + found = True + break + if not found: + console.print(f"[yellow]Owned principal not found: {o}[/yellow]") + if not owned_oids: + return + for tid, tname, ttype in owned_oids: + console.print(f"\n[bold red]Owned target:[/bold red] [bold cyan]{tname}[/bold cyan] ({ttype})") + count = 0 + for source_oid, sd in G.nodes(data=True): + if sd['type'].lower() != 'user': + continue + if not nx.has_path(G, source_oid, tid): + continue + try: + path = nx.shortest_path(G, source_oid, tid) + formatted = format_path(G, path) + console.print(f" [dim]→ Length {len(path)-1}:[/dim] {formatted}") + count += 1 + if count >= 10: + break + except nx.NetworkXNoPath: + continue + add_finding("Owned Paths", f"Paths to owned {tname}", score=9) + +def print_arbitrary_paths(G, path_from=None, path_to=None, domain_filter=None, max_paths=10): + if not path_from or not path_to: + return + console.rule("[bold magenta]Arbitrary Shortest Paths (source → target)[/bold magenta]") + sources = [s.strip() for s in path_from.split(',')] + targets = [t.strip() for t in path_to.split(',')] + for sname in sources: + s_oid = None + for oid, d in G.nodes(data=True): + if d['name'].upper().split('@')[0] == sname.upper() and (not domain_filter or d.get('props', {}).get('domain') == domain_filter): + s_oid = oid + break + if not s_oid: + console.print(f"[yellow]Source not found: {sname}[/yellow]") + continue + for tname in targets: + t_oid = None + for oid, d in G.nodes(data=True): + if d['name'].upper().split('@')[0] == tname.upper() and (not domain_filter or d.get('props', {}).get('domain') == domain_filter): + t_oid = oid + break + if not t_oid: + console.print(f"[yellow]Target not found: {tname}[/yellow]") + continue + try: + path = nx.shortest_path(G, s_oid, t_oid) + console.print(f"[cyan]{G.nodes[s_oid]['name']}[/cyan] → [bold cyan]{G.nodes[t_oid]['name']}[/bold cyan] (Length: {len(path)-1})") + console.print(f" {format_path(G, path)}") + add_finding("Arbitrary Paths", f"{G.nodes[s_oid]['name']} → {G.nodes[t_oid]['name']}", score=6) + except nx.NetworkXNoPath: + console.print(f"[dim]No path from {sname} to {tname}[/dim]") + +def print_trust_abuse(G, domain_filter=None): + console.rule("[bold magenta]Domain Trust / Cross-Domain Abuse[/bold magenta]") + found = False + trust_labels = {'trustedby', 'trusts', 'foreignadmin', 'foreigngroup', 'memberof (cross-domain)'} + for u, v, d in G.edges(data=True): + label_lower = d.get('label', '').lower() + if any(t in label_lower for t in trust_labels) or 'foreign' in label_lower: + u_name = G.nodes[u]['name'] + v_name = G.nodes[v]['name'] + if domain_filter and domain_filter.lower() not in (u_name.lower() + v_name.lower()): + continue + found = True + console.print(f"[yellow]Trust abuse possible[/yellow]: [green]{u_name}[/green] --[{d['label']}]--> [cyan]{v_name}[/cyan]") + add_finding("Trust Abuse", f"{u_name} {d['label']} {v_name}", score=7) + if not found: + console.print("[green]No obvious cross-domain trust abuse detected[/green]") + +def inspect_node(G, identifier, domain_filter=None): + console.rule(f"[bold magenta]Detailed Inspection: {identifier}[/bold magenta]") + found = False + for oid, d in G.nodes(data=True): + name_norm = d['name'].upper().split('@')[0] + if (oid == identifier or name_norm == identifier.upper()) and (not domain_filter or d.get('props', {}).get('domain') == domain_filter): + found = True + console.print(f"[cyan]OID:[/cyan] {oid}") + console.print(f"[cyan]Name:[/cyan] {d['name']}") + console.print(f"[cyan]Type:[/cyan] {d['type']}") + console.print("[dim]Properties:[/dim]") + for k, v in sorted(d.get('props', {}).items()): + console.print(f" {k}: {v}") + console.print("[dim]Outgoing edges:[/dim]") + for _, tgt, edata in G.out_edges(oid, data=True): + console.print(f" → [green]{G.nodes[tgt]['name']}[/green] [{edata.get('label')}]") + console.print("[dim]Incoming edges:[/dim]") + for src, _, edata in G.in_edges(oid, data=True): + console.print(f" ← [green]{G.nodes[src]['name']}[/green] [{edata.get('label')}]") + break + if not found: + console.print(f"[yellow]Node '{identifier}' not found (or filtered)[/yellow]") + +def print_group_analysis(G, domain_filter=None, deep_analysis=False): + console.rule("[bold magenta]Group Nesting Depth & Cycle Analysis[/bold magenta]") + groups = [n for n, d in G.nodes(data=True) if d['type'].lower() == 'group' and (not domain_filter or d.get('props', {}).get('domain') == domain_filter)] + if not groups: + console.print("[green]No groups found[/green]") return - for principal, count in counts.most_common(10): - examples = [G.nodes[v]['name'] for pu, v, _ in admin_edges if pu == principal][:3] - table.add_row(G.nodes[principal]['name'], str(count), ", ".join(examples)) + high_priv_keywords = ['admin', 'domain admins', 'enterprise admins', 'schema admins', 'administrators', 'domain users', 'authenticated users'] + important_groups = [g for g in groups if any(k in G.nodes[g]['name'].lower() for k in high_priv_keywords)] + groups_to_check = important_groups[:50] if important_groups else groups[:100] + console.print(f"[cyan]Analyzing {len(groups_to_check)} important groups for nesting depth...[/cyan]") + depths = {} + with tqdm(groups_to_check, desc="Depth calculation", leave=False) as pbar: + for g in pbar: + try: + lengths = nx.single_source_shortest_path_length(G.to_undirected(), g, cutoff=20) + depths[g] = max(lengths.values()) if lengths else 0 + except: + depths[g] = 0 + deep = sorted(depths.items(), key=lambda x: x[1], reverse=True)[:15] + console.print("[yellow]Top 15 deepest nested groups (limited depth):[/yellow]") + for g, depth in deep: + if depth > 3: + console.print(f" [red]Deep nesting ({depth} levels):[/red] {G.nodes[g]['name']}") + add_finding("Deep Group Nesting", f"{G.nodes[g]['name']} has {depth} nesting levels", score=6) + if deep_analysis and len(G) < 2000: + console.print("[cyan]Running full cycle detection...[/cyan]") + try: + cycles = list(nx.simple_cycles(G.to_undirected(), length_bound=6)) + if cycles: + console.print(f"[red]Found {len(cycles)} group membership cycles![/red]") + for c in cycles[:3]: + names = [G.nodes[n]['name'] for n in c] + console.print(f" Cycle: {' → '.join(names)}") + add_finding("Deep Group Nesting", f"{len(cycles)} group cycles detected", score=8) + else: + console.print("[green]No group membership cycles detected[/green]") + except: + console.print("[yellow]Cycle detection skipped (graph too complex)[/yellow]") + else: + console.print("[dim]Cycle detection skipped for performance (use --deep-analysis to enable)[/dim]") + +def print_stats_dashboard(G, domain_filter=None): + console.rule("[bold magenta]AD Statistics Dashboard[/bold magenta]") + filtered_nodes = [(n, d) for n, d in G.nodes(data=True) if not domain_filter or d.get('props', {}).get('domain') == domain_filter] + total = len(filtered_nodes) + by_type = defaultdict(int) + for _, d in filtered_nodes: + by_type[d['type']] += 1 + table = Table(title="Object Counts") + table.add_column("Type", style="cyan") + table.add_column("Count", justify="right") + for t, c in sorted(by_type.items(), key=lambda x: x[1], reverse=True): + table.add_row(t, str(c)) console.print(table) - total_admins = sum(counts.values()) - console.print(f"[dim]Total LocalAdmin instances: {total_admins} on {len(computers)} computers[/dim]") + computers = sum(1 for _, d in filtered_nodes if d['type'].lower() == 'computer') + local_admins = len({u for u, v, d in G.edges(data=True) if d.get('label') == 'LocalAdmin' and G.nodes[v]['type'].lower() == 'computer'}) + console.print(f"[cyan]Computers with at least one LocalAdmin right: {local_admins}/{computers} ({local_admins/computers*100 if computers else 0:.1f}%)[/cyan]") + hv = len(get_high_value_targets(G, domain_filter)) + console.print(f"[cyan]High-value targets: {hv}[/cyan]") + console.print(f"[cyan]Total nodes: {total} | Total edges: {G.number_of_edges()}[/cyan]") + # ──────────────────────────────────────────────── # Export # ──────────────────────────────────────────────── @@ -1137,34 +1182,17 @@ def export_results(G, output_prefix="bloodbash", format_type="md", domain_filter for _, name, typ in get_high_value_targets(G, domain_filter): f.write(f"- {name} ({typ})\n") f.write("\n## Sample Paths\n") - # Add sample paths f.write("See console output for details.\n") console.print(f"[green]Exported Markdown:[/green] {path}") elif format_type == "json": path = f"{output_prefix}.json" - summary = { - "nodes": G.number_of_nodes(), - "edges": G.number_of_edges(), - "high_value": [ - {"name": d['name'], "type": d['type']} for _, d in G.nodes(data=True) - if any(k in d['name'].lower() for k in ['admin', 'krbtgt', 'ca', 'template']) and (not domain_filter or d.get('props', {}).get('domain') == domain_filter) - ] - } + summary = {"nodes": G.number_of_nodes(), "edges": G.number_of_edges(), "high_value": [{"name": d['name'], "type": d['type']} for _, d in G.nodes(data=True) if any(k in d['name'].lower() for k in ['admin', 'krbtgt', 'ca', 'template']) and (not domain_filter or d.get('props', {}).get('domain') == domain_filter)]} with open(path, "w", encoding="utf-8") as f: json.dump(summary, f, indent=2) console.print(f"[green]Exported JSON:[/green] {path}") elif format_type == "html": path = f"{output_prefix}.html" - html = f"""
| Severity | Category | Details |
|---|