Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
121 changes: 121 additions & 0 deletions .github/workflows/auth-tests.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
name: Auth Tests

on:
push:
paths:
- 'core/auth.py'
- 'core/creds_checker.py'
- 'core/target_parser.py'
- 'detectors/**'
- 'tests/**'
- '.github/workflows/auth-tests.yml'
pull_request:
paths:
- 'core/auth.py'
- 'core/creds_checker.py'
- 'core/target_parser.py'
- 'detectors/**'
- 'tests/**'

jobs:
test-stubbed:
name: "Stubbed – Python ${{ matrix.python-version }} / ${{ matrix.os }}"
runs-on: ${{ matrix.os }}
strategy:
fail-fast: false
matrix:
os: [ubuntu-latest, macos-latest, windows-latest]
python-version: ['3.9', '3.10', '3.11', '3.12', '3.13']
steps:
- uses: actions/checkout@v4
- uses: actions/setup-python@v5
with:
python-version: ${{ matrix.python-version }}
- run: pip install pytest pytest-cov
- name: Run tests with coverage
run: >
pytest tests/ -v
--cov=core.auth
--cov-report=term-missing
--cov-report=xml:coverage-stubbed.xml
- name: Upload coverage artifact
if: always()
uses: actions/upload-artifact@v4
with:
name: coverage-stubbed-${{ matrix.os }}-py${{ matrix.python-version }}
path: coverage-stubbed.xml

test-full:
name: "Full deps – Python ${{ matrix.python-version }} / ${{ matrix.os }}"
runs-on: ${{ matrix.os }}
strategy:
fail-fast: false
matrix:
os: [ubuntu-latest, macos-latest, windows-latest]
python-version: ['3.10', '3.11', '3.12', '3.13']
steps:
- uses: actions/checkout@v4
- uses: actions/setup-python@v5
with:
python-version: ${{ matrix.python-version }}
- run: pip install pytest pytest-cov -r requirements.txt
- name: Run tests with coverage
run: >
pytest tests/ -v
--cov=core.auth
--cov-report=term-missing
--cov-report=xml:coverage-full.xml
- name: Upload coverage artifact
if: always()
uses: actions/upload-artifact@v4
with:
name: coverage-full-${{ matrix.os }}-py${{ matrix.python-version }}
path: coverage-full.xml

coverage-report:
name: Coverage summary
needs: [test-stubbed, test-full]
if: github.event_name == 'pull_request'
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- uses: actions/setup-python@v5
with:
python-version: '3.12'
- run: pip install pytest pytest-cov coverage
- name: Download all coverage artifacts
uses: actions/download-artifact@v4
with:
path: coverage-reports
- name: Combine and report
run: |
pip install coverage
# find all coverage XML files and generate a combined report
echo "## Coverage reports by OS" > coverage-summary.md
for f in coverage-reports/*/coverage-*.xml; do
name=$(basename "$(dirname "$f")")
echo "### $name" >> coverage-summary.md
python -c "
import xml.etree.ElementTree as ET
tree = ET.parse('$f')
root = tree.getroot()
rate = float(root.attrib.get('line-rate', 0)) * 100
print(f' Line coverage: {rate:.1f}%')
" >> coverage-summary.md
done
echo "" >> coverage-summary.md
cat coverage-summary.md
- name: Post coverage to PR summary
run: cat coverage-summary.md >> "$GITHUB_STEP_SUMMARY"

lint:
name: Static analysis
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- uses: actions/setup-python@v5
with:
python-version: '3.12'
- run: pip install pyflakes pycodestyle
- run: pyflakes core/auth.py
- run: pycodestyle --max-line-length=120 --ignore=E501,W503 core/auth.py
2 changes: 1 addition & 1 deletion core/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,4 @@
RelayKing Core Module
"""

__all__ = ['banner', 'config', 'target_parser', 'scanner', 'relay_analyzer']
__all__ = ['auth', 'banner', 'config', 'target_parser', 'scanner', 'relay_analyzer']
169 changes: 169 additions & 0 deletions core/auth.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,169 @@
"""
RelayKing Shared Authentication Module
Consolidates LDAP and RPC authentication logic used across the codebase.
"""

from impacket.dcerpc.v5.rpcrt import RPC_C_AUTHN_GSS_NEGOTIATE
from impacket.ldap import ldap as ldap_impacket


def get_base_dn(domain):
"""Convert domain name to LDAP base DN (e.g. 'corp.local' -> 'DC=corp,DC=local')."""
if not domain:
return ""
return ','.join(f"DC={part}" for part in domain.split('.'))


def is_kerberos_error(error):
"""Check if an exception is Kerberos-specific (should not be retried)."""
err_str = str(error).lower()
return any(kw in err_str for kw in ('kdc', 'kerberos', 'krb'))


def connect_ldap(config, dc_ip=None):
"""
Establish an authenticated LDAP connection using impacket.

Handles Kerberos, pass-the-hash, password auth, and null/anonymous bind.
Automatically falls back from ldap:// to ldaps:// on channel binding
enforcement (80090346).

Args:
config: RelayKingConfig instance
dc_ip: DC IP address (defaults to config.dc_ip, then resolved from config.domain)

Returns:
Tuple of (connection, use_impacket: bool, search_base: str)
use_impacket is False only for null/anonymous bind (ldap3 connection).

Raises:
Exception on connection or auth failure
"""
import socket as _socket

if not dc_ip:
dc_ip = config.dc_ip
if not dc_ip and config.domain:
dc_ip = _socket.gethostbyname(config.domain)

search_base = get_base_dn(config.domain) if config.domain else ''

# Null/anonymous bind uses ldap3 (impacket requires credentials)
if config.null_auth:
import ssl
from ldap3 import Server, Connection, ALL, Tls
port = 636 if config.use_ldaps else 389
tls_config = Tls(validate=ssl.CERT_NONE) if config.use_ldaps else None
server = Server(dc_ip, port=port, use_ssl=config.use_ldaps, tls=tls_config, get_info=ALL)
conn = Connection(server, auto_bind=True, auto_referrals=False)
return conn, False, search_base

# All credentialed auth uses impacket
protos = ['ldaps'] if config.use_ldaps else ['ldap', 'ldaps']

for proto in protos:
try:
conn = ldap_impacket.LDAPConnection(
url=f"{proto}://{dc_ip}",
baseDN=config.domain,
dstIp=dc_ip,
signing=proto == 'ldap',
)

if config.use_kerberos:
krb_domain = (config.domain or '').upper()
conn.kerberosLogin(
user=config.username,
password=config.password or '',
domain=krb_domain,
lmhash=config.lmhash or '',
nthash=config.nthash or '',
aesKey=config.aesKey,
kdcHost=config.dc_ip,
useCache=True,
)
elif config.nthash:
conn.login(
user=config.username,
password='',
domain=config.domain or '',
lmhash=config.lmhash or '',
nthash=config.nthash,
authenticationChoice='sasl',
)
else:
conn.login(
user=config.username,
password=config.password,
domain=config.domain or '',
lmhash='',
nthash='',
authenticationChoice='sasl',
)

return conn, True, search_base

except Exception as e:
if '80090346' in str(e) and proto == 'ldap':
continue
raise


def configure_rpc_auth(config, rpc_transport, target):
"""
Set credentials and Kerberos options on an RPC transport.

Args:
config: RelayKingConfig instance
rpc_transport: DCERPCTransport from transport factory
target: Target hostname or IP (used for per-host Kerberos decision)

Returns:
bool: True if Kerberos auth is configured
"""
use_kerberos = config.should_use_kerberos(target)

if config.username:
if use_kerberos:
rpc_transport.set_credentials(
config.username,
config.password or '',
config.domain or '',
config.lmhash or '',
config.nthash or '',
config.aesKey,
)
rpc_transport.set_kerberos(True, config.dc_ip)
else:
rpc_transport.set_credentials(
config.username,
config.password,
config.domain or '',
config.lmhash or '',
config.nthash or '',
)

return use_kerberos


def connect_dce(rpc_transport, use_kerberos, uuid):
"""
Create, connect, and bind a DCE/RPC handle from a configured transport.

Args:
rpc_transport: Configured DCERPCTransport (credentials already set)
use_kerberos: Whether to set Kerberos auth type
uuid: Service UUID to bind to (e.g. rrp.MSRPC_UUID_RRP)

Returns:
Connected and bound DCE/RPC handle

Raises:
Exception on connection or bind failure
"""
dce = rpc_transport.get_dce_rpc()
if use_kerberos:
dce.set_auth_type(RPC_C_AUTHN_GSS_NEGOTIATE)
dce.connect()
dce.bind(uuid)
return dce
74 changes: 12 additions & 62 deletions core/creds_checker.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,7 @@
Check if given credential is valid to avoid unexpected account lockout
"""

from impacket.dcerpc.v5 import transport, rrp
from impacket.dcerpc.v5.rpcrt import RPC_C_AUTHN_GSS_NEGOTIATE
from impacket.ldap import ldap as ldap_impacket
from impacket.ldap import ldapasn1 as ldapasn1_impacket
from core.auth import connect_ldap, is_kerberos_error


class CredentialChecker:
Expand All @@ -32,63 +29,16 @@ def check_creds(self) -> str:
dc_host = self.config.dc_ip if self.config.dc_ip is not None else self.config.domain
print(f"[*] Checking given credentials against domain controller [{dc_host}] ... ")

last_error = None
for proto in ('ldap', 'ldaps'):
try:
ldap_url = f"{proto}://{dc_host}"
ldap_conn = ldap_impacket.LDAPConnection(url=ldap_url, baseDN=self.config.domain, dstIp=dc_host, signing=proto == 'ldap')
try:
connect_ldap(self.config, dc_host)
result['status'] = "success"
result['error'] = "None"
return result
except Exception as e:
if is_kerberos_error(e):
result['error'] = f'Kerberos auth failed: {e}'
else:
result['error'] = str(e)
return result

if self.config.should_use_kerberos(dc_host):
krb_domain = (self.config.domain or '').upper()
try:
ldap_conn.kerberosLogin(
user=self.config.username,
password=self.config.password or '',
domain=krb_domain,
lmhash=self.config.lmhash or '',
nthash=self.config.nthash or '',
aesKey=self.config.aesKey,
kdcHost=self.config.dc_ip,
useCache=True
)
except Exception as krb_err:
krb_error = str(krb_err).lower()
if 'kdc' in krb_error or 'kerberos' in krb_error or 'krb' in krb_error:
result['error'] = f'Kerberos auth failed: {krb_err}'
return result
raise
else:
ldap_conn.login(
user=self.config.username,
password=self.config.password,
domain=self.config.domain or '',
lmhash=self.config.lmhash or '',
nthash=self.config.nthash or '',
authenticationChoice='sasl'
)

result['status'] = "success"
result['error'] = "None"
return result

except Exception as e:
err_str = str(e)
last_error = err_str
# 80090346 = SEC_E_BAD_BINDINGS: channel binding required.
# Retry with LDAPS so impacket can negotiate CBT over TLS.
if '80090346' in err_str and proto == 'ldap':
continue
result['error'] = err_str
return result

result['error'] = last_error
return result


def _get_base_dn(self) -> str:
"""Convert domain to base DN"""
if not self.config.domain:
return ""

parts = self.config.domain.split('.')
return ','.join([f"DC={part}" for part in parts])
Loading