diff --git a/.github/workflows/auth-tests.yml b/.github/workflows/auth-tests.yml new file mode 100644 index 0000000..b29b8a9 --- /dev/null +++ b/.github/workflows/auth-tests.yml @@ -0,0 +1,121 @@ +name: Auth Tests + +on: + push: + paths: + - 'core/auth.py' + - 'core/creds_checker.py' + - 'core/target_parser.py' + - 'detectors/**' + - 'tests/**' + - '.github/workflows/auth-tests.yml' + pull_request: + paths: + - 'core/auth.py' + - 'core/creds_checker.py' + - 'core/target_parser.py' + - 'detectors/**' + - 'tests/**' + +jobs: + test-stubbed: + name: "Stubbed – Python ${{ matrix.python-version }} / ${{ matrix.os }}" + runs-on: ${{ matrix.os }} + strategy: + fail-fast: false + matrix: + os: [ubuntu-latest, macos-latest, windows-latest] + python-version: ['3.9', '3.10', '3.11', '3.12', '3.13'] + steps: + - uses: actions/checkout@v4 + - uses: actions/setup-python@v5 + with: + python-version: ${{ matrix.python-version }} + - run: pip install pytest pytest-cov + - name: Run tests with coverage + run: > + pytest tests/ -v + --cov=core.auth + --cov-report=term-missing + --cov-report=xml:coverage-stubbed.xml + - name: Upload coverage artifact + if: always() + uses: actions/upload-artifact@v4 + with: + name: coverage-stubbed-${{ matrix.os }}-py${{ matrix.python-version }} + path: coverage-stubbed.xml + + test-full: + name: "Full deps – Python ${{ matrix.python-version }} / ${{ matrix.os }}" + runs-on: ${{ matrix.os }} + strategy: + fail-fast: false + matrix: + os: [ubuntu-latest, macos-latest, windows-latest] + python-version: ['3.10', '3.11', '3.12', '3.13'] + steps: + - uses: actions/checkout@v4 + - uses: actions/setup-python@v5 + with: + python-version: ${{ matrix.python-version }} + - run: pip install pytest pytest-cov -r requirements.txt + - name: Run tests with coverage + run: > + pytest tests/ -v + --cov=core.auth + --cov-report=term-missing + --cov-report=xml:coverage-full.xml + - name: Upload coverage artifact + if: always() + uses: actions/upload-artifact@v4 + with: + name: coverage-full-${{ matrix.os }}-py${{ matrix.python-version }} + path: coverage-full.xml + + coverage-report: + name: Coverage summary + needs: [test-stubbed, test-full] + if: github.event_name == 'pull_request' + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v4 + - uses: actions/setup-python@v5 + with: + python-version: '3.12' + - run: pip install pytest pytest-cov coverage + - name: Download all coverage artifacts + uses: actions/download-artifact@v4 + with: + path: coverage-reports + - name: Combine and report + run: | + pip install coverage + # find all coverage XML files and generate a combined report + echo "## Coverage reports by OS" > coverage-summary.md + for f in coverage-reports/*/coverage-*.xml; do + name=$(basename "$(dirname "$f")") + echo "### $name" >> coverage-summary.md + python -c " + import xml.etree.ElementTree as ET + tree = ET.parse('$f') + root = tree.getroot() + rate = float(root.attrib.get('line-rate', 0)) * 100 + print(f' Line coverage: {rate:.1f}%') + " >> coverage-summary.md + done + echo "" >> coverage-summary.md + cat coverage-summary.md + - name: Post coverage to PR summary + run: cat coverage-summary.md >> "$GITHUB_STEP_SUMMARY" + + lint: + name: Static analysis + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v4 + - uses: actions/setup-python@v5 + with: + python-version: '3.12' + - run: pip install pyflakes pycodestyle + - run: pyflakes core/auth.py + - run: pycodestyle --max-line-length=120 --ignore=E501,W503 core/auth.py diff --git a/core/__init__.py b/core/__init__.py index d80f790..fb368bc 100644 --- a/core/__init__.py +++ b/core/__init__.py @@ -2,4 +2,4 @@ RelayKing Core Module """ -__all__ = ['banner', 'config', 'target_parser', 'scanner', 'relay_analyzer'] +__all__ = ['auth', 'banner', 'config', 'target_parser', 'scanner', 'relay_analyzer'] diff --git a/core/auth.py b/core/auth.py new file mode 100644 index 0000000..537347e --- /dev/null +++ b/core/auth.py @@ -0,0 +1,169 @@ +""" +RelayKing Shared Authentication Module +Consolidates LDAP and RPC authentication logic used across the codebase. +""" + +from impacket.dcerpc.v5.rpcrt import RPC_C_AUTHN_GSS_NEGOTIATE +from impacket.ldap import ldap as ldap_impacket + + +def get_base_dn(domain): + """Convert domain name to LDAP base DN (e.g. 'corp.local' -> 'DC=corp,DC=local').""" + if not domain: + return "" + return ','.join(f"DC={part}" for part in domain.split('.')) + + +def is_kerberos_error(error): + """Check if an exception is Kerberos-specific (should not be retried).""" + err_str = str(error).lower() + return any(kw in err_str for kw in ('kdc', 'kerberos', 'krb')) + + +def connect_ldap(config, dc_ip=None): + """ + Establish an authenticated LDAP connection using impacket. + + Handles Kerberos, pass-the-hash, password auth, and null/anonymous bind. + Automatically falls back from ldap:// to ldaps:// on channel binding + enforcement (80090346). + + Args: + config: RelayKingConfig instance + dc_ip: DC IP address (defaults to config.dc_ip, then resolved from config.domain) + + Returns: + Tuple of (connection, use_impacket: bool, search_base: str) + use_impacket is False only for null/anonymous bind (ldap3 connection). + + Raises: + Exception on connection or auth failure + """ + import socket as _socket + + if not dc_ip: + dc_ip = config.dc_ip + if not dc_ip and config.domain: + dc_ip = _socket.gethostbyname(config.domain) + + search_base = get_base_dn(config.domain) if config.domain else '' + + # Null/anonymous bind uses ldap3 (impacket requires credentials) + if config.null_auth: + import ssl + from ldap3 import Server, Connection, ALL, Tls + port = 636 if config.use_ldaps else 389 + tls_config = Tls(validate=ssl.CERT_NONE) if config.use_ldaps else None + server = Server(dc_ip, port=port, use_ssl=config.use_ldaps, tls=tls_config, get_info=ALL) + conn = Connection(server, auto_bind=True, auto_referrals=False) + return conn, False, search_base + + # All credentialed auth uses impacket + protos = ['ldaps'] if config.use_ldaps else ['ldap', 'ldaps'] + + for proto in protos: + try: + conn = ldap_impacket.LDAPConnection( + url=f"{proto}://{dc_ip}", + baseDN=config.domain, + dstIp=dc_ip, + signing=proto == 'ldap', + ) + + if config.use_kerberos: + krb_domain = (config.domain or '').upper() + conn.kerberosLogin( + user=config.username, + password=config.password or '', + domain=krb_domain, + lmhash=config.lmhash or '', + nthash=config.nthash or '', + aesKey=config.aesKey, + kdcHost=config.dc_ip, + useCache=True, + ) + elif config.nthash: + conn.login( + user=config.username, + password='', + domain=config.domain or '', + lmhash=config.lmhash or '', + nthash=config.nthash, + authenticationChoice='sasl', + ) + else: + conn.login( + user=config.username, + password=config.password, + domain=config.domain or '', + lmhash='', + nthash='', + authenticationChoice='sasl', + ) + + return conn, True, search_base + + except Exception as e: + if '80090346' in str(e) and proto == 'ldap': + continue + raise + + +def configure_rpc_auth(config, rpc_transport, target): + """ + Set credentials and Kerberos options on an RPC transport. + + Args: + config: RelayKingConfig instance + rpc_transport: DCERPCTransport from transport factory + target: Target hostname or IP (used for per-host Kerberos decision) + + Returns: + bool: True if Kerberos auth is configured + """ + use_kerberos = config.should_use_kerberos(target) + + if config.username: + if use_kerberos: + rpc_transport.set_credentials( + config.username, + config.password or '', + config.domain or '', + config.lmhash or '', + config.nthash or '', + config.aesKey, + ) + rpc_transport.set_kerberos(True, config.dc_ip) + else: + rpc_transport.set_credentials( + config.username, + config.password, + config.domain or '', + config.lmhash or '', + config.nthash or '', + ) + + return use_kerberos + + +def connect_dce(rpc_transport, use_kerberos, uuid): + """ + Create, connect, and bind a DCE/RPC handle from a configured transport. + + Args: + rpc_transport: Configured DCERPCTransport (credentials already set) + use_kerberos: Whether to set Kerberos auth type + uuid: Service UUID to bind to (e.g. rrp.MSRPC_UUID_RRP) + + Returns: + Connected and bound DCE/RPC handle + + Raises: + Exception on connection or bind failure + """ + dce = rpc_transport.get_dce_rpc() + if use_kerberos: + dce.set_auth_type(RPC_C_AUTHN_GSS_NEGOTIATE) + dce.connect() + dce.bind(uuid) + return dce diff --git a/core/creds_checker.py b/core/creds_checker.py index 10eca81..b223cf0 100644 --- a/core/creds_checker.py +++ b/core/creds_checker.py @@ -3,10 +3,7 @@ Check if given credential is valid to avoid unexpected account lockout """ -from impacket.dcerpc.v5 import transport, rrp -from impacket.dcerpc.v5.rpcrt import RPC_C_AUTHN_GSS_NEGOTIATE -from impacket.ldap import ldap as ldap_impacket -from impacket.ldap import ldapasn1 as ldapasn1_impacket +from core.auth import connect_ldap, is_kerberos_error class CredentialChecker: @@ -32,63 +29,16 @@ def check_creds(self) -> str: dc_host = self.config.dc_ip if self.config.dc_ip is not None else self.config.domain print(f"[*] Checking given credentials against domain controller [{dc_host}] ... ") - last_error = None - for proto in ('ldap', 'ldaps'): - try: - ldap_url = f"{proto}://{dc_host}" - ldap_conn = ldap_impacket.LDAPConnection(url=ldap_url, baseDN=self.config.domain, dstIp=dc_host, signing=proto == 'ldap') + try: + connect_ldap(self.config, dc_host) + result['status'] = "success" + result['error'] = "None" + return result + except Exception as e: + if is_kerberos_error(e): + result['error'] = f'Kerberos auth failed: {e}' + else: + result['error'] = str(e) + return result - if self.config.should_use_kerberos(dc_host): - krb_domain = (self.config.domain or '').upper() - try: - ldap_conn.kerberosLogin( - user=self.config.username, - password=self.config.password or '', - domain=krb_domain, - lmhash=self.config.lmhash or '', - nthash=self.config.nthash or '', - aesKey=self.config.aesKey, - kdcHost=self.config.dc_ip, - useCache=True - ) - except Exception as krb_err: - krb_error = str(krb_err).lower() - if 'kdc' in krb_error or 'kerberos' in krb_error or 'krb' in krb_error: - result['error'] = f'Kerberos auth failed: {krb_err}' - return result - raise - else: - ldap_conn.login( - user=self.config.username, - password=self.config.password, - domain=self.config.domain or '', - lmhash=self.config.lmhash or '', - nthash=self.config.nthash or '', - authenticationChoice='sasl' - ) - result['status'] = "success" - result['error'] = "None" - return result - - except Exception as e: - err_str = str(e) - last_error = err_str - # 80090346 = SEC_E_BAD_BINDINGS: channel binding required. - # Retry with LDAPS so impacket can negotiate CBT over TLS. - if '80090346' in err_str and proto == 'ldap': - continue - result['error'] = err_str - return result - - result['error'] = last_error - return result - - - def _get_base_dn(self) -> str: - """Convert domain to base DN""" - if not self.config.domain: - return "" - - parts = self.config.domain.split('.') - return ','.join([f"DC={part}" for part in parts]) diff --git a/core/target_parser.py b/core/target_parser.py index 638abf2..44acfb1 100644 --- a/core/target_parser.py +++ b/core/target_parser.py @@ -4,16 +4,16 @@ """ import ipaddress +import os import re import subprocess import platform from typing import List, Set from concurrent.futures import ThreadPoolExecutor, as_completed -from impacket.dcerpc.v5 import transport, samr -from impacket.ldap import ldap, ldapasn1 -from impacket.smbconnection import SMBConnection import socket +from core.auth import connect_ldap + def _is_valid_unicast_ip(ip: str) -> bool: """ @@ -188,124 +188,18 @@ def _enumerate_ad(self): print(f"[!] Could not resolve domain: {self.config.domain}") return - # Build LDAP connection string - if self.config.use_ldaps: - ldap_scheme = 'ldaps' - ldap_port = 636 - else: - ldap_scheme = 'ldap' - ldap_port = 389 - - # Connect to LDAP - ldap_url = f"{ldap_scheme}://{dc_ip}:{ldap_port}" - try: - # Use Kerberos if specified, otherwise use ldap3 with NTLM + # Print auth method info if self.config.use_kerberos: - # Use impacket for Kerberos LDAP authentication - from impacket.ldap import ldap as ldap_impacket - from ldap3 import Server, Connection, NTLM, ALL, SUBTREE, SASL, KERBEROS - import os - - # Check for ccache in KRB5CCNAME environment variable ccache_file = os.environ.get('KRB5CCNAME', '') if ccache_file: print(f"[*] Using Kerberos authentication with ccache: {ccache_file}") else: print("[*] Using Kerberos authentication for AD enumeration...") - - ldap_url = f"{'ldaps' if self.config.use_ldaps else 'ldap'}://{dc_ip}" - impacket_conn = ldap_impacket.LDAPConnection(url=ldap_url, baseDN=self.config.domain, dstIp=dc_ip) - - # Kerberos login via impacket - # useCache=True (default) tells impacket to use ccache from KRB5CCNAME if available - # Domain should be uppercase for Kerberos realm matching - krb_domain = self.config.domain.upper() if self.config.domain else '' - impacket_conn.kerberosLogin( - user=self.config.username, - password=self.config.password or '', - domain=krb_domain, - lmhash=self.config.lmhash or '', - nthash=self.config.nthash or '', - aesKey=self.config.aesKey, - kdcHost=self.config.dc_ip, - useCache=True # Use ccache from KRB5CCNAME if available - ) - - # For the rest of the enumeration, we need to use the impacket connection - # since ldap3's Kerberos support is complex to set up - conn = impacket_conn - use_impacket = True elif self.config.nthash: - # Use impacket for NTLM pass-the-hash authentication - # ldap3 doesn't support pass-the-hash natively, so we use impacket - from impacket.ldap import ldap as ldap_impacket - print("[*] Using NTLM pass-the-hash authentication for AD enumeration...") - protos = ['ldaps'] if self.config.use_ldaps else ['ldap', 'ldaps'] - impacket_conn = None - for proto in protos: - try: - ldap_url = f"{proto}://{dc_ip}" - impacket_conn = ldap_impacket.LDAPConnection(url=ldap_url, baseDN=self.config.domain, dstIp=dc_ip, signing=proto == 'ldap') - impacket_conn.login( - user=self.config.username, - password='', - domain=self.config.domain, - lmhash=self.config.lmhash or '', - nthash=self.config.nthash, - authenticationChoice='sasl' - ) - break - except Exception as e: - if '80090346' in str(e) and proto == 'ldap': - continue - raise - - conn = impacket_conn - use_impacket = True - else: - # Use impacket for NTLM password auth. Unlike ldap3, impacket negotiates - # NTLM signing flags in the handshake, satisfying LDAP signing enforcement - # on port 389. For channel binding enforcement, retry with ldaps://. - from impacket.ldap import ldap as ldap_impacket - - if self.config.null_auth: - # Null/anonymous bind — impacket requires credentials, use ldap3 for this - import ssl - from ldap3 import Server, Connection, ALL, Tls - port = 636 if self.config.use_ldaps else 389 - tls_config = Tls(validate=ssl.CERT_NONE) if self.config.use_ldaps else None - server = Server(dc_ip, port=port, use_ssl=self.config.use_ldaps, tls=tls_config, get_info=ALL) - conn = Connection(server, auto_bind=True, auto_referrals=False) - use_impacket = False - else: - protos = ['ldaps'] if self.config.use_ldaps else ['ldap', 'ldaps'] - impacket_conn = None - for proto in protos: - try: - impacket_conn = ldap_impacket.LDAPConnection( - url=f"{proto}://{dc_ip}", baseDN=self.config.domain, dstIp=dc_ip, signing=proto == 'ldap' - ) - impacket_conn.login( - user=self.config.username, - password=self.config.password, - domain=self.config.domain or '', - lmhash='', - nthash='', - authenticationChoice='sasl' - ) - break - except Exception as e: - if '80090346' in str(e) and proto == 'ldap': - continue - raise - conn = impacket_conn - use_impacket = True - - # Build search base from domain - search_base = ','.join([f"DC={part}" for part in self.config.domain.split('.')]) + conn, use_impacket, search_base = connect_ldap(self.config, dc_ip) # Detect tier-0 assets (SCCM, ADCS, Exchange) if not using null auth # Note: Tier0Detector requires ldap3 connection, skip for Kerberos/impacket @@ -381,7 +275,8 @@ def _on_computer(item): print(f"[+] Retrieved {len(hostnames_to_resolve)} computers") else: - # Use ldap3's paged search for NTLM + # Use ldap3's paged search for null/anonymous auth + from ldap3 import SUBTREE page_size = self.config.ad_page_size cookie = None total_retrieved = 0 diff --git a/detectors/ghost_spn.py b/detectors/ghost_spn.py index cad9653..b3caf27 100644 --- a/detectors/ghost_spn.py +++ b/detectors/ghost_spn.py @@ -9,6 +9,8 @@ from concurrent.futures import ThreadPoolExecutor, as_completed from typing import Dict, List, Set, Tuple, Optional +from core.auth import connect_ldap as _connect_ldap_shared + class GhostSPNDetector: """ @@ -160,65 +162,11 @@ def detect(self) -> Dict: def _connect_ldap(self, dc_ip: str) -> Tuple: """Establish LDAP connection. Returns (conn, use_impacket, search_base). - Uses impacket for all auth types: - - signing=True on ldap:// satisfies LDAP signing enforcement (00002028) - - authenticationChoice='sasl' on ldaps:// computes CBT from the TLS cert, - satisfying channel binding enforcement (80090346) - - Falls back ldap -> ldaps automatically on 80090346 + Delegates to the shared auth module which handles all auth types + (Kerberos, pass-the-hash, password) and automatic ldap→ldaps fallback + on channel binding enforcement (80090346). """ - from impacket.ldap import ldap as ldap_impacket - - search_base = '' - if self.config.domain: - search_base = ','.join(f"DC={part}" for part in self.config.domain.split('.')) - - use_ldaps = self.config.use_ldaps - protos = ['ldaps'] if use_ldaps else ['ldap', 'ldaps'] - - for proto in protos: - try: - conn = ldap_impacket.LDAPConnection( - url=f"{proto}://{dc_ip}", baseDN=self.config.domain, dstIp=dc_ip, - signing=proto == 'ldap' - ) - - if self.config.use_kerberos: - krb_domain = self.config.domain.upper() if self.config.domain else '' - conn.kerberosLogin( - user=self.config.username, - password=self.config.password or '', - domain=krb_domain, - lmhash=self.config.lmhash or '', - nthash=self.config.nthash or '', - aesKey=self.config.aesKey, - kdcHost=self.config.dc_ip, - useCache=True, - ) - elif self.config.nthash: - conn.login( - user=self.config.username, - password='', - domain=self.config.domain, - lmhash=self.config.lmhash or '', - nthash=self.config.nthash, - authenticationChoice='sasl' - ) - else: - conn.login( - user=self.config.username, - password=self.config.password, - domain=self.config.domain or '', - lmhash='', - nthash='', - authenticationChoice='sasl' - ) - - return conn, True, search_base - - except Exception as e: - if '80090346' in str(e) and proto == 'ldap': - continue - raise + return _connect_ldap_shared(self.config, dc_ip) def _check_wildcard_dns(self, conn, search_base: str, use_impacket: bool) -> bool: """Return True if any wildcard DNS entry exists in DomainDnsZones.""" diff --git a/detectors/ntlm_reflection.py b/detectors/ntlm_reflection.py index f1a420e..4c061ea 100644 --- a/detectors/ntlm_reflection.py +++ b/detectors/ntlm_reflection.py @@ -4,12 +4,13 @@ """ from impacket.dcerpc.v5 import transport, rrp, rprn -from impacket.dcerpc.v5.rpcrt import RPC_C_AUTHN_GSS_NEGOTIATE from impacket.smbconnection import SessionError from concurrent.futures import ThreadPoolExecutor, TimeoutError as FuturesTimeoutError import threading import time +from core.auth import configure_rpc_auth, connect_dce, is_kerberos_error + class NTLMReflectionDetector: """Detector for CVE-2025-33073 NTLM reflection vulnerability""" @@ -271,44 +272,13 @@ def _get_ubr_from_registry(self, target: str) -> int: try: # Create RPC transport over SMB named pipe rpc = transport.DCERPCTransportFactory(f"ncacn_np:{target}[\\pipe\\winreg]") - - # Set credentials - use_kerberos = self.config.should_use_kerberos(target) - if self.config.username: - if use_kerberos: - rpc.set_credentials( - self.config.username, - self.config.password or '', - self.config.domain or '', - self.config.lmhash or '', - self.config.nthash or '', - self.config.aesKey - ) - rpc.set_kerberos(True, self.config.dc_ip) - else: - rpc.set_credentials( - self.config.username, - self.config.password, - self.config.domain or '', - self.config.lmhash or '', - self.config.nthash or '' - ) - - # Get DCE/RPC handle - dce = rpc.get_dce_rpc() - - # Set Kerberos auth if needed - if use_kerberos: - dce.set_auth_type(RPC_C_AUTHN_GSS_NEGOTIATE) + use_kerberos = configure_rpc_auth(self.config, rpc, target) # Connect and bind to winreg try: - dce.connect() - dce.bind(rrp.MSRPC_UUID_RRP) + dce = connect_dce(rpc, use_kerberos, rrp.MSRPC_UUID_RRP) except Exception as conn_err: - # Handle Kerberos-specific errors - do NOT retry - conn_error = str(conn_err).lower() - if 'kdc' in conn_error or 'kerberos' in conn_error or 'krb' in conn_error: + if is_kerberos_error(conn_err): if self.config.verbose >= 3: print(f"[!] Kerberos auth failed for UBR check on {target}: {conn_err}") return None @@ -379,45 +349,23 @@ def _check_printspooler_enabled(self, target: str) -> bool: if self.config.verbose >= 3: print(f"[*] Checking PrintSpooler on {target} via RPC over TCP") - # Create RPC transport + # Create RPC transport and set credentials rpctransport = transport.DCERPCTransportFactory(stringbinding) - - # Set credentials - use_kerberos = self.config.should_use_kerberos(target) - if self.config.username: - if use_kerberos: - rpctransport.set_credentials( - self.config.username, - self.config.password or '', - self.config.domain or '', - self.config.lmhash or '', - self.config.nthash or '', - self.config.aesKey - ) - rpctransport.set_kerberos(True, self.config.dc_ip) - else: - rpctransport.set_credentials( - self.config.username, - self.config.password, - self.config.domain or '', - self.config.lmhash or '', - self.config.nthash or '' - ) + use_kerberos = configure_rpc_auth(self.config, rpctransport, target) # Get DCE/RPC connection dce = rpctransport.get_dce_rpc() # Set Kerberos auth if needed if use_kerberos: - dce.set_auth_type(RPC_C_AUTHN_GSS_NEGOTIATE) + from impacket.dcerpc.v5.rpcrt import RPC_C_AUTHN_GSS_NEGOTIATE as _GSS + dce.set_auth_type(_GSS) # Connect try: dce.connect() except Exception as conn_err: - # Handle Kerberos-specific errors - do NOT retry - conn_error = str(conn_err).lower() - if 'kdc' in conn_error or 'kerberos' in conn_error or 'krb' in conn_error: + if is_kerberos_error(conn_err): if self.config.verbose >= 3: print(f"[!] Kerberos auth failed for PrintSpooler check on {target}: {conn_err}") return False diff --git a/detectors/ntlmv1_detector.py b/detectors/ntlmv1_detector.py index f43e5e7..c63c0f1 100644 --- a/detectors/ntlmv1_detector.py +++ b/detectors/ntlmv1_detector.py @@ -4,9 +4,7 @@ """ from impacket.dcerpc.v5 import transport, rrp -from impacket.dcerpc.v5.rpcrt import RPC_C_AUTHN_GSS_NEGOTIATE -from impacket.ldap import ldap as ldap_impacket -from impacket.ldap import ldapasn1 as ldapasn1_impacket +from core.auth import connect_ldap, get_base_dn, is_kerberos_error, configure_rpc_auth, connect_dce class NTLMv1Detector: @@ -50,46 +48,19 @@ def check_gpo(self, dc_host: str) -> dict: # Computer Configuration -> Windows Settings -> Security Settings -> Local Policies -> Security Options # "Network security: LAN Manager authentication level" - ldap_url = f"ldap://{dc_host}" - ldap_conn = ldap_impacket.LDAPConnection(url=ldap_url, baseDN=self.config.domain, dstIp=dc_host) - - # Authenticate using Kerberos if specified, otherwise NTLM - # For GPO check, dc_host IS a DC so use should_use_kerberos - if self.config.should_use_kerberos(dc_host): - # Use uppercase domain for Kerberos realm matching, useCache for ccache - krb_domain = (self.config.domain or '').upper() - try: - ldap_conn.kerberosLogin( - user=self.config.username, - password=self.config.password or '', - domain=krb_domain, - lmhash=self.config.lmhash or '', - nthash=self.config.nthash or '', - aesKey=self.config.aesKey, - kdcHost=self.config.dc_ip, - useCache=True - ) - except Exception as krb_err: - # Handle Kerberos-specific errors - do NOT retry - # This prevents account lockouts from repeated auth failures - krb_error = str(krb_err).lower() - if 'kdc' in krb_error or 'kerberos' in krb_error or 'krb' in krb_error: - result['error'] = f'Kerberos auth failed: {krb_err}' - return result - raise - else: - ldap_conn.login( - user=self.config.username, - password=self.config.password, - domain=self.config.domain or '', - lmhash=self.config.lmhash or '', - nthash=self.config.nthash or '' - ) + try: + ldap_conn, _, _ = connect_ldap(self.config, dc_host) + except Exception as e: + if is_kerberos_error(e): + result['error'] = f'Kerberos auth failed: {e}' + return result + raise # Search for GPO objects with NTLMv1 settings # Look in Default Domain Policy and Default Domain Controllers Policy search_filter = "(objectClass=groupPolicyContainer)" + from impacket.ldap import ldapasn1 as ldapasn1_impacket resp = ldap_conn.search( searchBase=f"CN=Policies,CN=System,{self._get_base_dn()}", searchFilter=search_filter, @@ -166,42 +137,13 @@ def _get_lm_compat_level(self, host: str) -> int: try: # Create RPC transport over SMB rpc = transport.DCERPCTransportFactory(f"ncacn_np:{host}[\\pipe\\winreg]") - - # Set credentials - use_kerberos = self.config.should_use_kerberos(host) - if self.config.username: - if use_kerberos: - rpc.set_credentials( - self.config.username, - self.config.password or '', - self.config.domain or '', - self.config.lmhash or '', - self.config.nthash or '', - self.config.aesKey - ) - rpc.set_kerberos(True, self.config.dc_ip) - else: - rpc.set_credentials( - self.config.username, - self.config.password, - self.config.domain or '', - self.config.lmhash or '', - self.config.nthash or '' - ) + use_kerberos = configure_rpc_auth(self.config, rpc, host) # Connect and bind - dce = rpc.get_dce_rpc() - if use_kerberos: - dce.set_auth_type(RPC_C_AUTHN_GSS_NEGOTIATE) - try: - dce.connect() - dce.bind(rrp.MSRPC_UUID_RRP) + dce = connect_dce(rpc, use_kerberos, rrp.MSRPC_UUID_RRP) except Exception as conn_err: - # Handle Kerberos-specific errors - do NOT retry - # This prevents account lockouts from repeated auth failures - conn_error = str(conn_err).lower() - if 'kdc' in conn_error or 'kerberos' in conn_error or 'krb' in conn_error: + if is_kerberos_error(conn_err): if self.config.verbose >= 2: print(f"[!] Kerberos auth failed for registry access to {host}: {conn_err}") return None @@ -233,8 +175,4 @@ def _get_lm_compat_level(self, host: str) -> int: def _get_base_dn(self) -> str: """Convert domain to base DN""" - if not self.config.domain: - return "" - - parts = self.config.domain.split('.') - return ','.join([f"DC={part}" for part in parts]) + return get_base_dn(self.config.domain) diff --git a/pyproject.toml b/pyproject.toml index ab0384b..a040fd7 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -38,3 +38,7 @@ py-modules = ["relayking"] [tool.setuptools.dynamic] dependencies = {file = ["requirements.txt"]} + +[tool.pytest.ini_options] +testpaths = ["tests"] +addopts = "-v" diff --git a/tests/__init__.py b/tests/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/conftest.py b/tests/conftest.py new file mode 100644 index 0000000..eb5b08b --- /dev/null +++ b/tests/conftest.py @@ -0,0 +1,173 @@ +""" +Shared test fixtures for RelayKing test suite. +""" + +import sys +import os +import pytest +from unittest.mock import MagicMock, patch +from dataclasses import dataclass, field +from typing import Optional, List, Set + +# Ensure project root is importable +sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) + + +# --------------------------------------------------------------------------- +# Stub out heavy third-party packages so tests run without them installed. +# This block must execute BEFORE any RelayKing imports. +# --------------------------------------------------------------------------- + +def _ensure_mock_module(name): + """Insert a MagicMock for *name* into sys.modules if it isn't importable.""" + try: + __import__(name) + except ImportError: + parts = name.split(".") + for i in range(len(parts)): + partial = ".".join(parts[: i + 1]) + if partial not in sys.modules: + sys.modules[partial] = MagicMock() + + +# impacket tree +for _mod in [ + "impacket", + "impacket.dcerpc", + "impacket.dcerpc.v5", + "impacket.dcerpc.v5.transport", + "impacket.dcerpc.v5.rpcrt", + "impacket.dcerpc.v5.rrp", + "impacket.dcerpc.v5.rprn", + "impacket.ldap", + "impacket.ldap.ldap", + "impacket.ldap.ldapasn1", + "impacket.smbconnection", + "impacket.tds", + "impacket.ntlm", + "impacket.smb", + "impacket.smb3", + "impacket.uuid", + "impacket.dcerpc.v5.samr", + "impacket.dcerpc.v5.epm", + "impacket.dcerpc.v5.even", + "impacket.dcerpc.v5.ndr", + "impacket.dcerpc.v5.dtypes", +]: + _ensure_mock_module(_mod) + +# ldap3 +for _mod in ["ldap3"]: + _ensure_mock_module(_mod) + +# requests / requests_ntlm / urllib3 / dnspython +for _mod in [ + "requests", + "requests.exceptions", + "requests_ntlm", + "urllib3", + "dns", + "dns.resolver", + "dns.rdatatype", + "dns.query", + "dns.message", +]: + _ensure_mock_module(_mod) + + +# --------------------------------------------------------------------------- +# Lightweight config stand-in (mirrors RelayKingConfig fields used by auth.py) +# --------------------------------------------------------------------------- + +@dataclass +class FakeConfig: + """Minimal stand-in for RelayKingConfig that covers every field auth.py touches.""" + username: Optional[str] = None + password: Optional[str] = None + domain: Optional[str] = None + lmhash: str = '' + nthash: str = '' + aesKey: Optional[str] = None + use_kerberos: bool = False + krb_dc_only: bool = False + dc_ip: Optional[str] = None + use_ldaps: bool = False + null_auth: bool = False + verbose: int = 0 + _dc_hostnames: Set[str] = field(default_factory=set) + + def should_use_kerberos(self, target: str) -> bool: + if not self.krb_dc_only: + return self.use_kerberos + return target.lower() in self._dc_hostnames + + +@pytest.fixture +def password_config(): + """Config for standard password authentication.""" + return FakeConfig( + username='lowpriv', + password='Password1', + domain='corp.local', + dc_ip='10.0.0.1', + ) + + +@pytest.fixture +def pth_config(): + """Config for pass-the-hash authentication.""" + return FakeConfig( + username='lowpriv', + domain='corp.local', + dc_ip='10.0.0.1', + nthash='aabbccdd11223344aabbccdd11223344', + ) + + +@pytest.fixture +def kerberos_config(): + """Config for Kerberos authentication.""" + return FakeConfig( + username='lowpriv', + password='Password1', + domain='corp.local', + dc_ip='10.0.0.1', + use_kerberos=True, + ) + + +@pytest.fixture +def null_config(): + """Config for null/anonymous authentication.""" + return FakeConfig( + null_auth=True, + domain='corp.local', + dc_ip='10.0.0.1', + ) + + +@pytest.fixture +def ldaps_config(): + """Config for password auth forced over LDAPS.""" + return FakeConfig( + username='lowpriv', + password='Password1', + domain='corp.local', + dc_ip='10.0.0.1', + use_ldaps=True, + ) + + +@pytest.fixture +def krb_dc_only_config(): + """Config with --krb-dc-only: Kerberos for DCs, NTLM for everything else.""" + cfg = FakeConfig( + username='lowpriv', + password='Password1', + domain='corp.local', + dc_ip='10.0.0.1', + use_kerberos=True, + krb_dc_only=True, + _dc_hostnames={'dc01.corp.local'}, + ) + return cfg diff --git a/tests/test_auth.py b/tests/test_auth.py new file mode 100644 index 0000000..3506618 --- /dev/null +++ b/tests/test_auth.py @@ -0,0 +1,568 @@ +""" +Comprehensive tests for core.auth — the shared authentication module. + +Every function is tested across all auth modes (password, pass-the-hash, +Kerberos, null/anonymous) and edge cases (fallback, errors, missing fields). +""" + +import pytest +from unittest.mock import MagicMock, patch, call + +from core.auth import ( + get_base_dn, + is_kerberos_error, + connect_ldap, + configure_rpc_auth, + connect_dce, +) + + +# ═══════════════════════════════════════════════════════════════════ +# get_base_dn +# ═══════════════════════════════════════════════════════════════════ + +class TestGetBaseDn: + def test_simple_domain(self): + assert get_base_dn('corp.local') == 'DC=corp,DC=local' + + def test_three_part_domain(self): + assert get_base_dn('sub.corp.local') == 'DC=sub,DC=corp,DC=local' + + def test_single_label(self): + assert get_base_dn('WORKGROUP') == 'DC=WORKGROUP' + + def test_empty_string(self): + assert get_base_dn('') == '' + + def test_none(self): + assert get_base_dn(None) == '' + + +# ═══════════════════════════════════════════════════════════════════ +# is_kerberos_error +# ═══════════════════════════════════════════════════════════════════ + +class TestIsKerberosError: + @pytest.mark.parametrize('msg', [ + 'KDC_ERR_CLIENT_REVOKED', + 'Kerberos SessionError: KRB_AP_ERR_SKEW', + 'kdc unreachable', + 'krb5 library error', + 'KERBEROS authentication failed', + ]) + def test_detects_kerberos_errors(self, msg): + assert is_kerberos_error(Exception(msg)) is True + + @pytest.mark.parametrize('msg', [ + 'Connection refused', + 'STATUS_LOGON_FAILURE', + '80090346', # channel binding — NOT kerberos + 'timeout', + '', + ]) + def test_rejects_non_kerberos_errors(self, msg): + assert is_kerberos_error(Exception(msg)) is False + + def test_accepts_plain_string(self): + """is_kerberos_error should work with str(exception) internally.""" + assert is_kerberos_error(Exception('KDC timeout')) is True + + def test_case_insensitive(self): + assert is_kerberos_error(Exception('kErBeRoS')) is True + + +# ═══════════════════════════════════════════════════════════════════ +# connect_ldap — password auth +# ═══════════════════════════════════════════════════════════════════ + +class TestConnectLdapPassword: + """Standard username + password auth via impacket.""" + + @patch('core.auth.ldap_impacket') + def test_password_auth_returns_connection(self, mock_ldap, password_config): + mock_conn = MagicMock() + mock_ldap.LDAPConnection.return_value = mock_conn + + conn, use_impacket, search_base = connect_ldap(password_config) + + assert conn is mock_conn + assert use_impacket is True + assert search_base == 'DC=corp,DC=local' + + @patch('core.auth.ldap_impacket') + def test_password_calls_login_with_sasl(self, mock_ldap, password_config): + mock_conn = MagicMock() + mock_ldap.LDAPConnection.return_value = mock_conn + + connect_ldap(password_config) + + mock_conn.login.assert_called_once_with( + user='lowpriv', + password='Password1', + domain='corp.local', + lmhash='', + nthash='', + authenticationChoice='sasl', + ) + + @patch('core.auth.ldap_impacket') + def test_password_tries_ldap_first(self, mock_ldap, password_config): + mock_conn = MagicMock() + mock_ldap.LDAPConnection.return_value = mock_conn + + connect_ldap(password_config) + + # First call should be ldap:// with signing=True + mock_ldap.LDAPConnection.assert_called_once_with( + url='ldap://10.0.0.1', + baseDN='corp.local', + dstIp='10.0.0.1', + signing=True, + ) + + @patch('core.auth.ldap_impacket') + def test_password_ldaps_forced(self, mock_ldap, ldaps_config): + mock_conn = MagicMock() + mock_ldap.LDAPConnection.return_value = mock_conn + + connect_ldap(ldaps_config) + + mock_ldap.LDAPConnection.assert_called_once_with( + url='ldaps://10.0.0.1', + baseDN='corp.local', + dstIp='10.0.0.1', + signing=False, + ) + + @patch('core.auth.ldap_impacket') + def test_explicit_dc_ip_overrides_config(self, mock_ldap, password_config): + mock_conn = MagicMock() + mock_ldap.LDAPConnection.return_value = mock_conn + + connect_ldap(password_config, dc_ip='10.99.99.99') + + mock_ldap.LDAPConnection.assert_called_once_with( + url='ldap://10.99.99.99', + baseDN='corp.local', + dstIp='10.99.99.99', + signing=True, + ) + + +# ═══════════════════════════════════════════════════════════════════ +# connect_ldap — channel binding fallback (80090346) +# ═══════════════════════════════════════════════════════════════════ + +class TestConnectLdapChannelBindingFallback: + """When ldap:// fails with 80090346 the function should retry with ldaps://.""" + + @patch('core.auth.ldap_impacket') + def test_fallback_to_ldaps_on_channel_binding(self, mock_ldap, password_config): + mock_conn_ldap = MagicMock() + mock_conn_ldaps = MagicMock() + + # First call (ldap://) raises channel binding error, second (ldaps://) succeeds + mock_ldap.LDAPConnection.side_effect = [mock_conn_ldap, mock_conn_ldaps] + mock_conn_ldap.login.side_effect = Exception('80090346 SEC_E_BAD_BINDINGS') + mock_conn_ldaps.login.return_value = None + + conn, use_impacket, _ = connect_ldap(password_config) + + assert conn is mock_conn_ldaps + assert use_impacket is True + assert mock_ldap.LDAPConnection.call_count == 2 + + # Verify second call was ldaps with signing=False + second_call = mock_ldap.LDAPConnection.call_args_list[1] + assert second_call == call( + url='ldaps://10.0.0.1', + baseDN='corp.local', + dstIp='10.0.0.1', + signing=False, + ) + + @patch('core.auth.ldap_impacket') + def test_non_channel_binding_error_raises_immediately(self, mock_ldap, password_config): + mock_conn = MagicMock() + mock_ldap.LDAPConnection.return_value = mock_conn + mock_conn.login.side_effect = Exception('STATUS_LOGON_FAILURE') + + with pytest.raises(Exception, match='STATUS_LOGON_FAILURE'): + connect_ldap(password_config) + + # Should NOT retry with ldaps + assert mock_ldap.LDAPConnection.call_count == 1 + + @patch('core.auth.ldap_impacket') + def test_ldaps_forced_no_fallback_needed(self, mock_ldap, ldaps_config): + """When use_ldaps=True, only ldaps:// is tried — no fallback loop.""" + mock_conn = MagicMock() + mock_ldap.LDAPConnection.return_value = mock_conn + mock_conn.login.side_effect = Exception('80090346 bad bindings') + + with pytest.raises(Exception, match='80090346'): + connect_ldap(ldaps_config) + + # Only one attempt (ldaps) + assert mock_ldap.LDAPConnection.call_count == 1 + + +# ═══════════════════════════════════════════════════════════════════ +# connect_ldap — pass-the-hash +# ═══════════════════════════════════════════════════════════════════ + +class TestConnectLdapPassTheHash: + + @patch('core.auth.ldap_impacket') + def test_pth_sends_empty_password_and_nthash(self, mock_ldap, pth_config): + mock_conn = MagicMock() + mock_ldap.LDAPConnection.return_value = mock_conn + + connect_ldap(pth_config) + + mock_conn.login.assert_called_once_with( + user='lowpriv', + password='', + domain='corp.local', + lmhash='', + nthash='aabbccdd11223344aabbccdd11223344', + authenticationChoice='sasl', + ) + + @patch('core.auth.ldap_impacket') + def test_pth_returns_impacket_connection(self, mock_ldap, pth_config): + mock_conn = MagicMock() + mock_ldap.LDAPConnection.return_value = mock_conn + + conn, use_impacket, _ = connect_ldap(pth_config) + assert use_impacket is True + + @patch('core.auth.ldap_impacket') + def test_pth_falls_back_on_channel_binding(self, mock_ldap, pth_config): + mock_conn_ldap = MagicMock() + mock_conn_ldaps = MagicMock() + mock_ldap.LDAPConnection.side_effect = [mock_conn_ldap, mock_conn_ldaps] + mock_conn_ldap.login.side_effect = Exception('80090346') + + conn, _, _ = connect_ldap(pth_config) + assert conn is mock_conn_ldaps + + +# ═══════════════════════════════════════════════════════════════════ +# connect_ldap — Kerberos +# ═══════════════════════════════════════════════════════════════════ + +class TestConnectLdapKerberos: + + @patch('core.auth.ldap_impacket') + def test_kerberos_calls_kerberosLogin(self, mock_ldap, kerberos_config): + mock_conn = MagicMock() + mock_ldap.LDAPConnection.return_value = mock_conn + + connect_ldap(kerberos_config) + + mock_conn.kerberosLogin.assert_called_once_with( + user='lowpriv', + password='Password1', + domain='CORP.LOCAL', + lmhash='', + nthash='', + aesKey=None, + kdcHost='10.0.0.1', + useCache=True, + ) + mock_conn.login.assert_not_called() + + @patch('core.auth.ldap_impacket') + def test_kerberos_uppercases_domain(self, mock_ldap, kerberos_config): + mock_conn = MagicMock() + mock_ldap.LDAPConnection.return_value = mock_conn + + connect_ldap(kerberos_config) + + kw = mock_conn.kerberosLogin.call_args + assert kw[1]['domain'] == 'CORP.LOCAL' + + @patch('core.auth.ldap_impacket') + def test_kerberos_with_aeskey(self, mock_ldap, kerberos_config): + kerberos_config.aesKey = 'deadbeef' * 8 + mock_conn = MagicMock() + mock_ldap.LDAPConnection.return_value = mock_conn + + connect_ldap(kerberos_config) + + kw = mock_conn.kerberosLogin.call_args + assert kw[1]['aesKey'] == 'deadbeef' * 8 + + +# ═══════════════════════════════════════════════════════════════════ +# connect_ldap — null/anonymous auth +# ═══════════════════════════════════════════════════════════════════ + +class TestConnectLdapNull: + + @patch('core.auth.ldap_impacket') + def test_null_returns_ldap3_connection(self, mock_ldap, null_config): + """Null auth must use ldap3 (not impacket) because impacket needs creds.""" + import ldap3 + ldap3.Server = MagicMock() + ldap3.Connection = MagicMock(return_value=MagicMock()) + ldap3.ALL = 'ALL' + ldap3.Tls = MagicMock() + + conn, use_impacket, search_base = connect_ldap(null_config) + + assert use_impacket is False + assert search_base == 'DC=corp,DC=local' + # impacket should NOT be called + mock_ldap.LDAPConnection.assert_not_called() + + @patch('core.auth.ldap_impacket') + def test_null_ldaps(self, mock_ldap, null_config): + null_config.use_ldaps = True + import ldap3 + ldap3.Server = MagicMock() + ldap3.Connection = MagicMock(return_value=MagicMock()) + ldap3.ALL = 'ALL' + ldap3.Tls = MagicMock() + + connect_ldap(null_config) + + # Server should be created with use_ssl=True and port 636 + server_call = ldap3.Server.call_args + assert server_call[0][0] == '10.0.0.1' + assert server_call[1].get('port') == 636 or server_call[0][1] == 636 + assert server_call[1].get('use_ssl') is True + + +# ═══════════════════════════════════════════════════════════════════ +# connect_ldap — dc_ip resolution +# ═══════════════════════════════════════════════════════════════════ + +class TestConnectLdapDcResolution: + + @patch('core.auth.ldap_impacket') + def test_falls_back_to_config_dc_ip(self, mock_ldap, password_config): + mock_conn = MagicMock() + mock_ldap.LDAPConnection.return_value = mock_conn + + connect_ldap(password_config) # dc_ip not passed; uses config.dc_ip + + mock_ldap.LDAPConnection.assert_called_once_with( + url='ldap://10.0.0.1', + baseDN='corp.local', + dstIp='10.0.0.1', + signing=True, + ) + + @patch('socket.gethostbyname', return_value='10.0.0.42') + @patch('core.auth.ldap_impacket') + def test_resolves_domain_when_no_dc_ip(self, mock_ldap, mock_dns, password_config): + password_config.dc_ip = None + mock_conn = MagicMock() + mock_ldap.LDAPConnection.return_value = mock_conn + + connect_ldap(password_config) + + mock_dns.assert_called_once_with('corp.local') + mock_ldap.LDAPConnection.assert_called_once_with( + url='ldap://10.0.0.42', + baseDN='corp.local', + dstIp='10.0.0.42', + signing=True, + ) + + @patch('core.auth.ldap_impacket') + def test_no_domain_gives_empty_search_base(self, mock_ldap, password_config): + password_config.domain = None + mock_conn = MagicMock() + mock_ldap.LDAPConnection.return_value = mock_conn + + _, _, search_base = connect_ldap(password_config) + assert search_base == '' + + +# ═══════════════════════════════════════════════════════════════════ +# configure_rpc_auth +# ═══════════════════════════════════════════════════════════════════ + +class TestConfigureRpcAuth: + + def test_ntlm_sets_credentials(self, password_config): + transport = MagicMock() + + use_krb = configure_rpc_auth(password_config, transport, 'target.corp.local') + + assert use_krb is False + transport.set_credentials.assert_called_once_with( + 'lowpriv', 'Password1', 'corp.local', '', '', + ) + transport.set_kerberos.assert_not_called() + + def test_kerberos_sets_credentials_and_kerberos(self, kerberos_config): + transport = MagicMock() + + use_krb = configure_rpc_auth(kerberos_config, transport, 'dc01.corp.local') + + assert use_krb is True + transport.set_credentials.assert_called_once_with( + 'lowpriv', 'Password1', 'corp.local', '', '', None, + ) + transport.set_kerberos.assert_called_once_with(True, '10.0.0.1') + + def test_pth_sets_nthash(self, pth_config): + transport = MagicMock() + + configure_rpc_auth(pth_config, transport, 'target.corp.local') + + transport.set_credentials.assert_called_once_with( + 'lowpriv', None, 'corp.local', '', 'aabbccdd11223344aabbccdd11223344', + ) + + def test_no_username_skips_credentials(self, null_config): + transport = MagicMock() + + configure_rpc_auth(null_config, transport, 'target.corp.local') + + transport.set_credentials.assert_not_called() + transport.set_kerberos.assert_not_called() + + def test_krb_dc_only_uses_kerberos_for_dc(self, krb_dc_only_config): + transport = MagicMock() + + use_krb = configure_rpc_auth(krb_dc_only_config, transport, 'dc01.corp.local') + + assert use_krb is True + transport.set_kerberos.assert_called_once() + + def test_krb_dc_only_uses_ntlm_for_member(self, krb_dc_only_config): + transport = MagicMock() + + use_krb = configure_rpc_auth(krb_dc_only_config, transport, 'workstation1.corp.local') + + assert use_krb is False + transport.set_kerberos.assert_not_called() + + def test_kerberos_includes_aeskey(self, kerberos_config): + kerberos_config.aesKey = 'aes256key' + transport = MagicMock() + + configure_rpc_auth(kerberos_config, transport, 'dc.corp.local') + + cred_args = transport.set_credentials.call_args[0] + assert cred_args[5] == 'aes256key' + + +# ═══════════════════════════════════════════════════════════════════ +# connect_dce +# ═══════════════════════════════════════════════════════════════════ + +class TestConnectDce: + + def test_creates_connects_binds_dce(self): + transport = MagicMock() + mock_dce = MagicMock() + transport.get_dce_rpc.return_value = mock_dce + fake_uuid = b'\x01\x02\x03' + + result = connect_dce(transport, use_kerberos=False, uuid=fake_uuid) + + assert result is mock_dce + mock_dce.connect.assert_called_once() + mock_dce.bind.assert_called_once_with(fake_uuid) + mock_dce.set_auth_type.assert_not_called() + + def test_sets_kerberos_auth_type(self): + from impacket.dcerpc.v5.rpcrt import RPC_C_AUTHN_GSS_NEGOTIATE + + transport = MagicMock() + mock_dce = MagicMock() + transport.get_dce_rpc.return_value = mock_dce + + connect_dce(transport, use_kerberos=True, uuid=b'\x01') + + mock_dce.set_auth_type.assert_called_once_with(RPC_C_AUTHN_GSS_NEGOTIATE) + + def test_connect_failure_propagates(self): + transport = MagicMock() + mock_dce = MagicMock() + transport.get_dce_rpc.return_value = mock_dce + mock_dce.connect.side_effect = OSError('Connection refused') + + with pytest.raises(OSError, match='Connection refused'): + connect_dce(transport, use_kerberos=False, uuid=b'\x01') + + def test_bind_failure_propagates(self): + transport = MagicMock() + mock_dce = MagicMock() + transport.get_dce_rpc.return_value = mock_dce + mock_dce.bind.side_effect = Exception('RPC_S_SERVER_UNAVAILABLE') + + with pytest.raises(Exception, match='RPC_S_SERVER_UNAVAILABLE'): + connect_dce(transport, use_kerberos=False, uuid=b'\x01') + + +# ═══════════════════════════════════════════════════════════════════ +# Integration-style: end-to-end auth flow scenarios +# ═══════════════════════════════════════════════════════════════════ + +class TestAuthIntegrationScenarios: + """Test realistic multi-step auth flows matching how callers use the module.""" + + @patch('core.auth.ldap_impacket') + def test_creds_checker_flow(self, mock_ldap, password_config): + """Simulates CredentialChecker.check_creds() calling connect_ldap.""" + mock_conn = MagicMock() + mock_ldap.LDAPConnection.return_value = mock_conn + + # CredentialChecker calls connect_ldap(config, dc_host) + conn, use_impacket, _ = connect_ldap(password_config, dc_ip='10.0.0.1') + + assert conn is mock_conn + assert use_impacket is True + mock_conn.login.assert_called_once() + + @patch('core.auth.ldap_impacket') + def test_ghost_spn_flow_kerberos(self, mock_ldap, kerberos_config): + """Simulates GhostSPNDetector._connect_ldap() flow with Kerberos.""" + mock_conn = MagicMock() + mock_ldap.LDAPConnection.return_value = mock_conn + + conn, use_impacket, search_base = connect_ldap(kerberos_config, dc_ip='10.0.0.1') + + assert use_impacket is True + assert search_base == 'DC=corp,DC=local' + mock_conn.kerberosLogin.assert_called_once() + + def test_ntlmv1_registry_check_flow(self, password_config): + """Simulates NTLMv1Detector._get_lm_compat_level() RPC flow.""" + from impacket.dcerpc.v5 import rrp + + rpc_transport = MagicMock() + mock_dce = MagicMock() + rpc_transport.get_dce_rpc.return_value = mock_dce + + # Step 1: configure auth + use_krb = configure_rpc_auth(password_config, rpc_transport, 'target.corp.local') + assert use_krb is False + + # Step 2: connect + bind + dce = connect_dce(rpc_transport, use_krb, rrp.MSRPC_UUID_RRP) + assert dce is mock_dce + mock_dce.connect.assert_called_once() + mock_dce.bind.assert_called_once_with(rrp.MSRPC_UUID_RRP) + + @patch('core.auth.ldap_impacket') + def test_channel_binding_then_kerberos_error(self, mock_ldap, kerberos_config): + """Channel binding fallback should still propagate Kerberos errors.""" + mock_conn_ldap = MagicMock() + mock_conn_ldaps = MagicMock() + mock_ldap.LDAPConnection.side_effect = [mock_conn_ldap, mock_conn_ldaps] + + # ldap:// fails with channel binding + mock_conn_ldap.kerberosLogin.side_effect = Exception('80090346') + # ldaps:// fails with Kerberos error + mock_conn_ldaps.kerberosLogin.side_effect = Exception('KDC_ERR_C_PRINCIPAL_UNKNOWN') + + with pytest.raises(Exception, match='KDC_ERR_C_PRINCIPAL_UNKNOWN'): + connect_ldap(kerberos_config) diff --git a/tox.ini b/tox.ini new file mode 100644 index 0000000..65b25de --- /dev/null +++ b/tox.ini @@ -0,0 +1,25 @@ +[tox] +envlist = py{39,310,311,312,313}-{full,stubbed}, lint +skip_missing_interpreters = true + +[testenv] +description = run auth tests +deps = + pytest>=7.0 + full: impacket>=0.11.0 + full: ldap3>=2.9.1 + full: requests>=2.28.0 + full: requests-ntlm>=1.2.0 + full: dnspython>=2.3.0 + full: pyasn1>=0.4.8 +commands = + pytest tests/ -v {posargs} + +[testenv:lint] +description = static analysis +deps = + pyflakes + pycodestyle +commands = + pyflakes core/auth.py + pycodestyle --max-line-length=120 --ignore=E501,W503 core/auth.py