A Real-Time Fraud Detection System designed to expose money launderers, circular trading rings, and mule accounts. The system combines graph topology features (PageRank, cycles found through Strongly Connected Components) with transactional behavioral profiling, using a cost-sensitive XGBoost classifier and a FastAPI / Redis / Kafka real-time scoring stream.
To ensure zero-friction testing and deployment, the codebase is built with a Dual-Mode Fallback Architecture:
- Enterprise Mode: Connects to Neo4j as the graph database, Redis as the low-latency Feature Store, and Confluent-Kafka as the real-time transaction event queue.
- Offline Mock Mode: If any local database or broker is unreachable, the pipeline automatically and seamlessly falls back to in-memory NetworkX graph processing, local JSON file caches, and thread-safe in-process stream queues.
- No complex Docker setup is required to run and test the complete pipeline from data generation to real-time alerting.
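
The fallback idea can be sketched roughly as follows. This is a minimal illustration, not the project's actual code: `connect_with_fallback`, `InMemoryFeatureStore`, and `unreachable_service` are hypothetical names, and the real pipeline may detect outages differently.

```python
class InMemoryFeatureStore:
    """Dict-backed stand-in for a hash-style feature store (hypothetical)."""

    def __init__(self):
        self._data = {}

    def hset(self, key, mapping):
        # Merge the new fields into whatever is already stored for the key.
        self._data.setdefault(key, {}).update(mapping)

    def hgetall(self, key):
        return dict(self._data.get(key, {}))


def connect_with_fallback(connect, fallback):
    """Try the real service first; on any failure, build the mock instead.

    Returns a (backend, mode) pair so callers can report which mode is live.
    """
    try:
        return connect(), "enterprise"
    except Exception:
        # Broad catch on purpose: client libraries raise their own error types.
        return fallback(), "offline-mock"


def unreachable_service():
    # Simulates a database or broker that is down.
    raise ConnectionError("service unreachable")


store, mode = connect_with_fallback(unreachable_service, InMemoryFeatureStore)
store.hset("user:42", {"pagerank": 0.013})
print(mode, store.hgetall("user:42"))
```

Returning the mode alongside the backend is one way to feed a status report such as the environment summary printed at shutdown.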
├── data
│ ├── raw # Generated users.csv and transactions.csv
│ └── processed # Processed features.csv and offline mock_redis.json
├── docker
│ └── docker-compose.yml # Docker environment for Neo4j, Redis, Zookeeper, and Kafka
├── models
│ ├── xgb_fraud_model.json # Serialized XGBoost model weights
│ └── feature_names.pkl # Serialized trained feature column ordering
├── src
│ ├── generator.py # Heuristic synthetic data & fraud patterns generator
│ ├── graph_ingest.py # Neo4j batch node and relationship ingestor
│ ├── graph_analytics.py # Neo4j modular community and SCC loop detector
│ ├── rules_engine.py # Baseline heuristic rules engine (Velocity & Pass-through)
│ ├── features.py # Batch feature extraction pipeline (Neo4j / NetworkX)
│ ├── model.py # Train & evaluate cost-sensitive XGBoost classifier
│ ├── preload_redis.py # Redis Feature Store preloader (Redis / JSON Cache)
│ ├── realtime_api.py # FastAPI REST scoring engine (Redis / JSON Cache)
│ └── streaming_pipeline.py # Real-time transaction ingestion and scoring simulator
├── check_env.py # Diagnostic script for checking service connectivity
├── requirements.txt # System library dependencies
└── README.md # Project documentation
Follow these steps in sequence to configure the environment, extract features, train the machine learning model, and launch the real-time scoring simulation.
- Create and Activate Virtual Environment:

      # Windows PowerShell
      python -m venv fraud_detection_env
      .\fraud_detection_env\Scripts\Activate.ps1

- Install Dependencies:

      pip install -r requirements.txt

- Verify Settings: The system parameters are stored in the .env file. The default configuration points at the local Neo4j Bolt port:

      NEO4J_URI=bolt://localhost:7687
      NEO4J_USER=neo4j
      NEO4J_PASSWORD=password123

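
As a rough sketch of how a script might pick up these settings, the snippet below reads the three variables from the process environment and falls back to the defaults listed above. `load_neo4j_settings` is a hypothetical helper; how the project actually loads the .env file (for example through a dotenv library) is not stated here.

```python
import os

# Defaults copied from the .env values shown in this README.
DEFAULTS = {
    "NEO4J_URI": "bolt://localhost:7687",
    "NEO4J_USER": "neo4j",
    "NEO4J_PASSWORD": "password123",
}


def load_neo4j_settings(env=None):
    """Return the Neo4j settings, preferring values found in `env`.

    `env` defaults to os.environ; passing a plain dict makes testing easier.
    """
    env = os.environ if env is None else env
    return {key: env.get(key, default) for key, default in DEFAULTS.items()}


settings = load_neo4j_settings({"NEO4J_USER": "analyst"})
print(settings["NEO4J_URI"], settings["NEO4J_USER"])
```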
Generate a realistic transaction log containing 1,000 users and over 5,000 transactions, injecting sophisticated fraud patterns:
- Mule Layering & Cashout: Multiple incoming small transfers to one node followed by a large cashout transfer.
- Collusive Ring Loops: Circular trading structures (e.g. A -> B -> C -> A) to artificially inflate velocity.

    python src/generator.py

Output: Generates users.csv and transactions.csv under data/raw/.
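
To make the two injected patterns concrete, here is a small sketch of how such transactions could be built. The helper names, the amount ranges, and the tuple layout `(sender, receiver, amount)` are assumptions for illustration and are not taken from generator.py.

```python
import random


def make_ring(accounts, amount):
    """Circular structure: each account pays the next, the last pays the first."""
    count = len(accounts)
    return [(accounts[i], accounts[(i + 1) % count], amount) for i in range(count)]


def make_mule_layering(senders, mule, cashout, rng, low=50, high=300):
    """Many small transfers into one mule, then a single large cashout."""
    inflows = [(sender, mule, rng.randint(low, high)) for sender in senders]
    total = sum(amount for _, _, amount in inflows)
    return inflows + [(mule, cashout, total)]


rng = random.Random(7)  # fixed seed so the sample is reproducible
ring = make_ring(["A", "B", "C"], 500)
mule = make_mule_layering(["S1", "S2", "S3", "S4"], "M", "X", rng)
print(ring)
print(mule[-1])
```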
Run the feature engineering pipeline to extract 16 descriptive metrics per user.
If Neo4j is offline, the script automatically spins up an in-memory NetworkX directed graph to calculate centralities:
- PageRank: Captures user centrality and structural influence in high-value flows.
- Money Loops (SCCs): Flags nodes that belong to a strongly connected component of size 3–8, i.e. a group of accounts through which funds can circulate back to the sender.
- Node Degrees: In-degree (unique senders fan-in) and out-degree (unique receivers fan-out).
- Aggregated Balances: Total received, total sent, net balance, and pass-through ratios.
- Device Sharing: Counts of other unique users logged in on the same device fingerprint.

    python src/features.py

Output: Generates clean, balanced datasets in data/processed/features.csv.
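
A few of the metrics listed above can be illustrated with the standard library alone. The sketch below is a stand-in, not the pipeline's implementation (which uses Neo4j or NetworkX): it computes degrees, balances, and a pass-through ratio, and finds loop members with Kosaraju's two-pass algorithm. PageRank and device sharing are left out, and the pass-through definition (sent divided by received) is an assumption.

```python
from collections import defaultdict


def flow_features(transactions):
    """Per-user fan-in, fan-out, totals, net balance, and pass-through ratio."""
    senders, receivers = defaultdict(set), defaultdict(set)
    received, sent = defaultdict(float), defaultdict(float)
    for src, dst, amount in transactions:
        receivers[src].add(dst)
        senders[dst].add(src)
        sent[src] += amount
        received[dst] += amount
    features = {}
    for user in set(sent) | set(received):
        features[user] = {
            "in_degree": len(senders[user]),
            "out_degree": len(receivers[user]),
            "total_received": received[user],
            "total_sent": sent[user],
            "net_balance": received[user] - sent[user],
            # Assumed definition: share of received funds that was sent on.
            "pass_through_ratio": sent[user] / received[user] if received[user] else 0.0,
        }
    return features


def strongly_connected_components(transactions):
    """Kosaraju's algorithm, written iteratively to avoid recursion limits."""
    graph, reverse, nodes = defaultdict(set), defaultdict(set), set()
    for src, dst, _ in transactions:
        graph[src].add(dst)
        reverse[dst].add(src)
        nodes.update((src, dst))
    # Pass 1: record nodes in order of depth-first completion.
    order, seen = [], set()
    for start in nodes:
        if start in seen:
            continue
        seen.add(start)
        stack = [(start, iter(graph[start]))]
        while stack:
            node, neighbours = stack[-1]
            for nxt in neighbours:
                if nxt not in seen:
                    seen.add(nxt)
                    stack.append((nxt, iter(graph[nxt])))
                    break
            else:
                order.append(node)
                stack.pop()
    # Pass 2: sweep the reversed graph in reverse completion order.
    components, assigned = [], set()
    for start in reversed(order):
        if start in assigned:
            continue
        component, todo = set(), [start]
        assigned.add(start)
        while todo:
            node = todo.pop()
            component.add(node)
            for prev in reverse[node]:
                if prev not in assigned:
                    assigned.add(prev)
                    todo.append(prev)
        components.append(component)
    return components


def loop_members(transactions, min_size=3, max_size=8):
    """Nodes sitting in a strongly connected component of size 3 to 8."""
    members = set()
    for component in strongly_connected_components(transactions):
        if min_size <= len(component) <= max_size:
            members |= component
    return members


sample = [("A", "B", 500), ("B", "C", 500), ("C", "A", 500), ("C", "D", 40),
          ("S1", "M", 100), ("S2", "M", 150), ("M", "X", 250)]
print(sorted(loop_members(sample)))
print(flow_features(sample)["M"])
```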
Train a cost-sensitive XGBClassifier on stratified splits with regularized parameters. To counter class imbalance, the script computes the positive-class loss weight dynamically from the data (scale_pos_weight = 1.4096).

    python src/model.py

Output: Logs classification reports (Accuracy, F1-Score, PR-AUC), lists top feature importance rankings, and serializes the model weights to models/xgb_fraud_model.json.
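
A common convention for scale_pos_weight is the ratio of negative to positive training examples. The sketch below assumes model.py follows that convention, which this README does not confirm. The label counts are hypothetical and were chosen only because they reproduce the ratio quoted above.

```python
def scale_pos_weight(labels):
    """Ratio of negative to positive labels (1 = fraud, 0 = legitimate)."""
    positives = sum(1 for label in labels if label == 1)
    negatives = len(labels) - positives
    if positives == 0:
        raise ValueError("no positive (fraud) labels in the training set")
    return negatives / positives


# Hypothetical label counts, not the project's real dataset.
labels = [0] * 585 + [1] * 415
weight = scale_pos_weight(labels)
print(round(weight, 4))
```

The resulting value would then be passed as the `scale_pos_weight` parameter when constructing the classifier.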
Preload the pre-computed graph features into the real-time datastore.
If local Redis is offline, this script automatically creates a shared JSON database (data/processed/mock_redis.json) which acts as an active, file-persisted mock Feature Store:

    python src/preload_redis.py

Output: Populates Redis database hashes or serializes mock_redis.json.
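
The JSON fallback can be pictured as a file keyed by user ID. The sketch below is an assumption about its shape, not the layout preload_redis.py actually writes; `save_mock_store` and `load_user_features` are hypothetical helpers. Writing to a temporary file and renaming it keeps readers from seeing a half-written store.

```python
import json
import os
import tempfile


def save_mock_store(features_by_user, path):
    """Write {user_id: {feature: value}} to `path` via an atomic rename."""
    directory = os.path.dirname(os.path.abspath(path))
    os.makedirs(directory, exist_ok=True)
    fd, tmp_path = tempfile.mkstemp(dir=directory, suffix=".tmp")
    with os.fdopen(fd, "w", encoding="utf-8") as handle:
        json.dump(features_by_user, handle)
    os.replace(tmp_path, path)  # swap the finished file into place


def load_user_features(path, user_id):
    """Return the stored features for one user, or {} if the user is unknown."""
    with open(path, encoding="utf-8") as handle:
        return json.load(handle).get(user_id, {})


with tempfile.TemporaryDirectory() as workdir:
    store_path = os.path.join(workdir, "processed", "mock_redis.json")
    save_mock_store({"U001": {"pagerank": 0.013, "in_degree": 4}}, store_path)
    known = load_user_features(store_path, "U001")
    unknown = load_user_features(store_path, "U999")
print(known, unknown)
```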
Launch the end-to-end simulated transaction scoring pipeline. The pipeline streams transactions chronologically, queries the Feature Store, updates user velocity stats in real time, executes XGBoost predictions on both senders/receivers, and outputs active security alerts:

    python src/streaming_pipeline.py

- Integration Dual-Mode: If you have not started the FastAPI server (python src/realtime_api.py) in another console, streaming_pipeline.py will automatically import the scoring logic in-process so you can view live alerts instantly!
- Alert Trigger Profiles:
  - SUSPECTED_ACTIVE_MULE: Spikes in inflow-to-outflow pass-through ratios matching flagged profiles.
  - COLLUSIVE_LOOP_CASHOUT: Receiving funds while being an identified member of an SCC loop.
  - HIGH_VALUE_EXFILTRATION: Large transfers ($\ge 4000$) involving flagged accounts.

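
The three alert profiles could be expressed as simple rules over a transaction and the two accounts' stored features, roughly as sketched below. Only the 4000 threshold comes from this README. The field names (`flagged`, `in_scc_loop`, `pass_through_ratio`), the pass-through threshold, and the choice to evaluate the mule rule on the receiver are assumptions made for illustration.

```python
HIGH_VALUE_THRESHOLD = 4000    # from the alert profile description
PASS_THROUGH_THRESHOLD = 0.9   # assumed value, not taken from the project


def classify_alerts(txn, sender, receiver):
    """Return the alert names raised by one transaction.

    `sender` and `receiver` are feature dicts as a feature store might
    return them; `flagged` stands for an account the model marked as risky.
    """
    alerts = []
    if receiver["flagged"] and receiver["pass_through_ratio"] >= PASS_THROUGH_THRESHOLD:
        alerts.append("SUSPECTED_ACTIVE_MULE")
    if receiver["in_scc_loop"]:
        alerts.append("COLLUSIVE_LOOP_CASHOUT")
    if txn["amount"] >= HIGH_VALUE_THRESHOLD and (sender["flagged"] or receiver["flagged"]):
        alerts.append("HIGH_VALUE_EXFILTRATION")
    return alerts


clean = {"flagged": False, "in_scc_loop": False, "pass_through_ratio": 0.2}
mule = {"flagged": True, "in_scc_loop": False, "pass_through_ratio": 0.97}
ring_member = {"flagged": False, "in_scc_loop": True, "pass_through_ratio": 0.5}

print(classify_alerts({"amount": 4500}, clean, mule))
print(classify_alerts({"amount": 120}, clean, ring_member))
```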
============================================================
REAL-TIME FRAUD SCORING PIPELINE SHUTDOWN
============================================================
Simulation Status: COMPLETED SUCCESSFUL
Total Transactions Scored: 120
Total Suspicious Flags: 59
------------------------------------------------------------
FRAUD PATTERN DETECTION SUMMARY:
[MULE_LAYERING] Active Mules Flagged: 17
[COLLUSIVE_RING] Loops Cashed-Out Alert: 5
[HIGH_VALUE_TXN] Exfiltrations Flagged: 10
------------------------------------------------------------
ENVIRONMENT REPORT:
- Database: Redis Hash Store (Offline fallback: mock_redis.json)
- Ingestion: Confluent-Kafka stream (Offline fallback: Threaded Queue)
- Model: Cost-Sensitive XGBoost (models/xgb_fraud_model.json)
============================================================