-
Notifications
You must be signed in to change notification settings - Fork 0
Detection Rules
Comprehensive guide to all Anti-Money Laundering detection algorithms implemented in the platform.
The AML Detection Engine implements 5 core detection rules that analyze transactions in real-time to identify suspicious activities. Each rule is designed to detect specific money laundering typologies based on regulatory guidelines and industry best practices.
| Rule | Typology | Risk Score | Detection Method | Regulatory Basis |
|---|---|---|---|---|
| R1 | Sanctions Screening | 95% | Fuzzy name matching | OFAC, UN, EU sanctions |
| R2 | Geography Risk | 60-85% | Country risk mapping | FATF high-risk jurisdictions |
| R3 | Structuring | 80% | Pattern detection | BSA $10K reporting threshold |
| R4 | Velocity Anomalies | 70% | Time-based analysis | Unusual activity patterns |
| R5 | Round-Trip Transactions | 75% | Graph analysis | Circular money flows |
Purpose: Detect transactions involving individuals or entities on sanctions lists.
def _check_sanctions_screening(self, transaction_data):
"""Screen transaction parties against sanctions lists"""
alerts = []
# Check sender
sender_sanctions = self.db.get_sanctions_by_name(
transaction_data['sender_name']
)
# Check receiver
receiver_sanctions = self.db.get_sanctions_by_name(
transaction_data['receiver_name']
)
# Generate alerts for matches
if sender_sanctions:
alerts.append(self._create_sanctions_alert(
transaction_data, sender_sanctions[0], 'sender'
))
if receiver_sanctions:
alerts.append(self._create_sanctions_alert(
transaction_data, receiver_sanctions[0], 'receiver'
))
return alertsFuzzy String Matching:
- Name Normalization: Remove special characters, convert to lowercase
- Similarity Calculation: Levenshtein distance algorithm
- Threshold Matching: 90% similarity for positive match
- Multiple Variants: Check aliases and alternative spellings
| Source | Coverage | Update Frequency | Records |
|---|---|---|---|
| OFAC SDN | US Treasury sanctions | Daily | ~8,000 |
| OpenSanctions | Global consolidated list | Real-time | ~40,000 |
| UN Sanctions | United Nations lists | Weekly | ~1,500 |
| EU Sanctions | European Union lists | Daily | ~2,000 |
{
"matched_name": "Vladimir Petrov",
"sanctions_entry": "Vladimir Petrov (Russia - OFAC SDN)",
"match_confidence": 0.98,
"sanctions_program": "UKRAINE-EO13662",
"sanctions_type": "SDN",
"party_role": "sender",
"source": "OFAC"
}- Exact Match: 95% risk score
- High Confidence (>95% similarity): 90% risk score
- Medium Confidence (90-95% similarity): 85% risk score
-
Additional Factors:
- Country alignment: +5%
- Program severity: +5%
- Multiple list presence: +5%
Purpose: Identify transactions involving high-risk countries or corridors.
def _check_high_risk_geography(self, transaction_data):
"""Analyze geographic risk of transaction corridor"""
alerts = []
sender_country = transaction_data.get('sender_country', '').upper()
receiver_country = transaction_data.get('receiver_country', '').upper()
# Check high-risk corridors
corridor = f"{sender_country}β{receiver_country}"
high_risk_corridors = {
"USβIR": 0.85, # US to Iran
"USβKP": 0.90, # US to North Korea
"DEβRU": 0.75, # Germany to Russia
"GBβIR": 0.80, # UK to Iran
"FRβSY": 0.85, # France to Syria
}
if corridor in high_risk_corridors:
risk_score = high_risk_corridors[corridor]
alerts.append(self._create_geography_alert(
transaction_data, corridor, risk_score
))
return alertsCountry Risk Classifications:
| Risk Level | Countries | Risk Score | Examples |
|---|---|---|---|
| Very High | Sanctioned countries | 85-90% | Iran, North Korea, Syria |
| High | FATF blacklist | 75-85% | Myanmar, Nigeria (some regions) |
| Medium | Enhanced monitoring | 60-75% | Russia, Belarus, Afghanistan |
| Low | Standard monitoring | <60% | Most OECD countries |
Corridor Analysis:
- Direct Sanctions: USβIran, EUβRussia
- Indirect Routes: USβUAEβIran (layering detection)
- Offshore Centers: Transactions via BVI, Cayman Islands
- Cash-Intensive: Countries with high cash usage
{
"sender_country": "US",
"receiver_country": "IR",
"risk_corridor": "USβIR",
"corridor_risk": 0.85,
"fatf_classification": "High Risk",
"sanctions_status": "US Treasury Sanctions",
"additional_factors": [
"FATF Statement country",
"Enhanced due diligence required"
]
}Purpose: Detect multiple transactions designed to avoid reporting thresholds.
def _check_structuring_patterns(self, transaction_data):
"""Detect structuring patterns to avoid reporting thresholds"""
alerts = []
# Look for patterns in the same day
sender_name = transaction_data['sender_name']
transaction_date = transaction_data['transaction_date']
# Query transactions from same sender on same date
daily_transactions = self.db.get_transactions_by_sender_date(
sender_name, transaction_date
)
# Check for structuring indicators
if len(daily_transactions) >= 4: # Multiple transactions
amounts = [tx['amount'] for tx in daily_transactions]
# Check if most amounts are under $10K threshold
under_threshold = [amt for amt in amounts if amt < 10000]
if len(under_threshold) >= 3 and sum(amounts) > 15000:
alerts.append(self._create_structuring_alert(
transaction_data, daily_transactions
))
return alertsClassic Structuring:
- Multiple transactions under $10,000 USD
- Same parties within short time period
- Round number amounts ($9,000, $9,500)
- Sequential timing patterns
Smurfing:
- Multiple accounts or parties
- Coordinated transactions
- Similar amounts across different transactions
- Geographic distribution
Threshold Avoidance:
- Just under reporting limits
- Currency conversion to avoid thresholds
- Split across multiple days/weeks
{
"pattern_type": "Classic Structuring",
"transactions": [
{"amount": 9000, "time": "09:15"},
{"amount": 8500, "time": "11:30"},
{"amount": 9200, "time": "14:45"},
{"amount": 8800, "time": "16:20"}
],
"total_amount": 35500,
"threshold_analysis": {
"reporting_threshold": 10000,
"transactions_under_threshold": 4,
"average_amount": 8875,
"suspicious_indicators": [
"Multiple sub-threshold transactions",
"Round number amounts",
"Same day pattern"
]
}
}- 4+ transactions under threshold: 80% base score
- Same day execution: +10%
- Round amounts: +5%
- Total amount >$25K: +5%
- Sequential timing: +5%
Purpose: Detect unusual transaction frequency patterns that deviate from normal behavior.
def _check_velocity_anomalies(self, transaction_data):
"""Detect unusual transaction velocity patterns"""
alerts = []
sender_name = transaction_data['sender_name']
current_date = datetime.fromisoformat(transaction_data['transaction_date'])
# Check 24-hour window
window_start = current_date - timedelta(days=1)
recent_transactions = self.db.get_transactions_by_sender_window(
sender_name, window_start, current_date
)
# Velocity thresholds
if len(recent_transactions) >= 10: # 10+ transactions in 24 hours
alerts.append(self._create_velocity_alert(
transaction_data, recent_transactions, "HIGH_FREQUENCY"
))
# Amount velocity
total_amount = sum(tx['amount'] for tx in recent_transactions)
if total_amount > 500000: # $500K+ in 24 hours
alerts.append(self._create_velocity_alert(
transaction_data, recent_transactions, "HIGH_VOLUME"
))
return alertsFrequency Anomalies:
- 10+ transactions in 24 hours
- 20+ transactions in 7 days
- Sudden spike from baseline activity
- Weekend/holiday activity spikes
Volume Anomalies:
- $500K+ in 24 hours
- $2M+ in 7 days
- 10x normal transaction size
- Increasing amount patterns
Pattern Anomalies:
- Regular interval transactions (every 2 hours)
- Identical amounts repeatedly
- Multiple jurisdictions rapidly
- Off-hours activity
def calculate_baseline(self, sender_name, days=30):
"""Calculate normal activity baseline for comparison"""
historical_data = self.db.get_historical_transactions(
sender_name, days
)
return {
"avg_daily_transactions": np.mean(daily_counts),
"avg_daily_volume": np.mean(daily_volumes),
"std_transactions": np.std(daily_counts),
"std_volume": np.std(daily_volumes),
"normal_hours": most_common_hours,
"typical_amounts": percentile_amounts
}{
"velocity_type": "HIGH_FREQUENCY",
"window_period": "24_hours",
"transaction_count": 12,
"total_volume": 340000,
"baseline_comparison": {
"normal_daily_count": 2.3,
"normal_daily_volume": 75000,
"deviation_factor": 5.2
},
"time_pattern": {
"transaction_times": ["02:15", "04:30", "06:45", "09:00"],
"off_hours_count": 8,
"regular_intervals": true
}
}Purpose: Identify circular money flows that may indicate layering or integration schemes.
def _check_round_trip_transactions(self, transaction_data):
"""Detect round-trip and circular transaction patterns"""
alerts = []
sender = transaction_data['sender_name']
receiver = transaction_data['receiver_name']
amount = transaction_data['amount']
date = transaction_data['transaction_date']
# Look for reverse transactions within 30 days
window_start = datetime.fromisoformat(date) - timedelta(days=30)
reverse_transactions = self.db.get_transactions_by_parties_window(
sender=receiver, # Reversed roles
receiver=sender,
start_date=window_start,
end_date=date
)
for reverse_tx in reverse_transactions:
# Check for similar amounts (within 10%)
amount_diff = abs(amount - reverse_tx['amount']) / amount
if amount_diff <= 0.1: # Within 10% of original amount
alerts.append(self._create_roundtrip_alert(
transaction_data, reverse_tx
))
return alertsSimple Round-Trip:
- AβB: $100K
- BβA: $95K (within 30 days)
- Net flow analysis
Complex Layering:
- AβBβCβA (multi-hop)
- Currency conversion loops
- Multiple intermediary parties
- Time-delayed completion
Integration Schemes:
- Business-to-business flows
- Invoice manipulation
- Trade-based money laundering
- Investment round-trips
def analyze_transaction_graph(self, party_name, depth=3):
"""Analyze transaction flows using graph theory"""
# Build transaction graph
graph = nx.DiGraph()
transactions = self.db.get_party_transactions(party_name, days=90)
for tx in transactions:
graph.add_edge(
tx['sender_name'],
tx['receiver_name'],
weight=tx['amount'],
date=tx['transaction_date']
)
# Find cycles
cycles = list(nx.simple_cycles(graph))
# Analyze cycle characteristics
suspicious_cycles = []
for cycle in cycles:
if len(cycle) <= 5: # Short cycles more suspicious
cycle_value = self._calculate_cycle_value(graph, cycle)
if cycle_value > 50000: # Significant amounts
suspicious_cycles.append({
"cycle": cycle,
"value": cycle_value,
"length": len(cycle)
})
return suspicious_cycles{
"round_trip_type": "SIMPLE_REVERSE",
"original_transaction": {
"transaction_id": "TXN_001",
"amount": 100000,
"date": "2025-08-15",
"direction": "AβB"
},
"reverse_transaction": {
"transaction_id": "TXN_045",
"amount": 95000,
"date": "2025-08-18",
"direction": "BβA"
},
"analysis": {
"time_gap_days": 3,
"amount_difference": 5000,
"amount_difference_pct": 5.0,
"net_flow": 5000,
"possible_motivation": "Layering scheme"
}
}# Configurable detection thresholds
DETECTION_THRESHOLDS = {
"sanctions_match_confidence": 0.90,
"geography_risk_minimum": 0.60,
"structuring_transaction_count": 4,
"structuring_threshold_amount": 10000,
"velocity_transaction_count": 10,
"velocity_volume_threshold": 500000,
"roundtrip_amount_tolerance": 0.10,
"roundtrip_time_window_days": 30
}def calculate_aggregate_risk(self, alerts):
"""Calculate overall transaction risk score"""
if not alerts:
return 0.0
# Weight different alert types
weights = {
"SANCTIONS_MATCH": 1.0,
"HIGH_RISK_GEOGRAPHY": 0.8,
"STRUCTURING": 0.9,
"VELOCITY_ANOMALY": 0.7,
"ROUND_TRIP": 0.8
}
weighted_scores = []
for alert in alerts:
weight = weights.get(alert['typology'], 0.5)
weighted_scores.append(alert['risk_score'] * weight)
# Use maximum approach (not additive to avoid double-counting)
return min(max(weighted_scores), 1.0)| Rule | Avg Processing Time | Memory Usage | Detection Rate |
|---|---|---|---|
| Sanctions | 15ms | 2MB | 99.8% |
| Geography | 5ms | 0.5MB | 100% |
| Structuring | 25ms | 1MB | 95.2% |
| Velocity | 20ms | 1.5MB | 92.1% |
| Round-Trip | 35ms | 3MB | 87.3% |
| Rule | False Positive Rate | Tuning Status |
|---|---|---|
| Sanctions | 0.1% | Well-tuned |
| Geography | 2.3% | Needs refinement |
| Structuring | 1.8% | Acceptable |
| Velocity | 3.1% | Under review |
| Round-Trip | 2.7% | Acceptable |
def test_sanctions_screening():
"""Test sanctions rule with known positive"""
transaction = {
"sender_name": "Vladimir Petrov",
"receiver_name": "John Smith",
"amount": 50000,
"sender_country": "RU",
"receiver_country": "US"
}
alerts = engine._check_sanctions_screening(transaction)
assert len(alerts) == 1
assert alerts[0]['typology'] == 'SANCTIONS_MATCH'
assert alerts[0]['risk_score'] >= 0.90
def test_structuring_detection():
"""Test structuring with known pattern"""
# Generate 4 transactions under $10K
transactions = generate_structuring_pattern(
count=4, max_amount=9500
)
for tx in transactions:
alerts = engine._check_structuring_patterns(tx)
# Should detect on 4th transaction
assert len(alerts) >= 1
assert alerts[0]['typology'] == 'STRUCTURING'def test_full_detection_pipeline():
"""Test complete transaction processing"""
transaction = create_high_risk_transaction()
all_alerts = engine.process_transaction(transaction)
# Verify multiple rules triggered
typologies = [alert['typology'] for alert in all_alerts]
assert 'SANCTIONS_MATCH' in typologies
assert 'HIGH_RISK_GEOGRAPHY' in typologies
# Verify risk aggregation
max_risk = max(alert['risk_score'] for alert in all_alerts)
assert max_risk >= 0.85-
Machine Learning Integration
- Anomaly detection models
- Behavioral profiling
- Adaptive thresholds
-
Advanced Pattern Recognition
- Deep graph analysis
- Temporal pattern mining
- Cross-correlation detection
-
Real-time Streaming
- Apache Kafka integration
- Stream processing with Apache Flink
- Real-time model updates
-
Enhanced Sanctions
- Real-time PEP screening
- Adverse media monitoring
- Beneficial ownership analysis
Next: Database Schema - Complete database design and relationships