JavRedstone/sentinel-aml


Sentinel AML — Real-Time AML Intelligence Platform

Confluent AI Hackathon

Sentinel AML is a real-time Anti-Money Laundering (AML) detection and case-triage platform built on Confluent Cloud. It continuously monitors financial transaction streams, detects suspicious patterns with Flink SQL (including Confluent's built-in ML functions), enriches alerts with an LLM (Azure OpenAI or AWS Bedrock) through ML_PREDICT, and triages cases autonomously with a Streaming Agent, all without an external orchestration layer.

Architecture

                    Confluent Cloud
                                         
  [Python Producer]                                                      
   sentinel produce -------->  sentinel.transactions_raw  (Kafka)  
                                         |                           
                          +--------------+--------------+            
                          v              v              v            
                [Flink SQL Jobs -- continuously running]              
                +------------+  +----------+  +--------------+      
                |Structuring |  |Velocity  |  | Geo Anomaly  |      
                |Detection   |  |Anomaly   |  |  Detection   |      
                |TUMBLE 60m  |  |ML_DETECT |  | TUMBLE 30m   |      
                |>=5 cash txn|  |ANOMALIES |  | >=3 countries|      
                +------------+  +----------+  +--------------+      
                          |              |              |            
                          +--------------+--------------+            
                                         v                           
                          sentinel.aml_alerts  (Kafka)              
                                         |                           
                          +--------------v--------------+            
                          |  AI Enrichment (ML_PREDICT) |            
                          |  Azure OpenAI GPT-5-mini    |            
                          |  -> analyst summary         |            
                          +--------------+--------------+            
                                         v                           
                          aml_alerts_enriched                        
                                         |                           
                          +--------------v--------------+            
                          |  Streaming Agent            |            
                          |  (AI_RUN_AGENT)             |            
                          |  sentinel_triage_agent      |            
                          |  -> ESCALATE/REVIEW/DISMISS |            
                          |  -> SAR recommendation      |            
                          +--------------+--------------+            
                                         v                           
                          alert_dispositions  (Kafka)                
                                         |
                          +--------------v--------------+
                          |  [Python Dashboard]         |
                          |  sentinel dashboard         |
                          |  Rich live TUI              |
                          |  (--mode kafka|local)       |
                          +-----------------------------+

Confluent Platform Features Used

| Feature | How Sentinel Uses It |
|---|---|
| Kafka Topics | transactions_raw, aml_alerts, aml_alerts_enriched, alert_dispositions form the event backbone |
| Schema Registry | JSON schemas validated at produce time; governance for compliance |
| Flink SQL | Tumbling windows with GROUP BY and HAVING for pattern detection |
| ML_DETECT_ANOMALIES | Built-in Flink ML for per-account velocity spike detection |
| ML_PREDICT | Azure OpenAI (or AWS Bedrock) through a Confluent connection generates alert summaries in-stream |
| CREATE AGENT / AI_RUN_AGENT | A Streaming Agent autonomously triages every alert |
| Confluent Kafka clients | Python producer -> Kafka; Kafka -> Python consumer |
| Stream Processing | Three concurrent Flink detection jobs plus the enrichment pipeline |
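Schema validation at produce time means malformed transactions are rejected before they ever reach Kafka. In the deployed pipeline the Schema Registry serializer does this; the stdlib-only sketch below just illustrates the idea. The field names and types are hypothetical, not the project's registered schema.

```python
import json

# Hypothetical field names and types; the real contract lives in the
# project's registered JSON schema and models.py, which are not shown here.
TRANSACTION_FIELDS = {
    "transaction_id": str,
    "account_id": str,
    "amount": float,
    "channel": str,
    "country": str,
    "event_time": str,
}


def _type_ok(value, expected) -> bool:
    # Accept ints for float fields (JSON has one number type), but never bools.
    if expected is float:
        return isinstance(value, (int, float)) and not isinstance(value, bool)
    return isinstance(value, expected)


def validate_transaction(record: dict) -> list[str]:
    """Return a list of schema violations; an empty list means the record is valid."""
    errors = []
    for name, expected in TRANSACTION_FIELDS.items():
        if name not in record:
            errors.append(f"missing field: {name}")
        elif not _type_ok(record[name], expected):
            errors.append(f"wrong type for {name}: {type(record[name]).__name__}")
    return errors


def encode_transaction(record: dict) -> bytes:
    """Serialize a record for Kafka only if it passes validation."""
    errors = validate_transaction(record)
    if errors:
        raise ValueError("; ".join(errors))
    return json.dumps(record, sort_keys=True).encode("utf-8")
```

Failing fast in the producer keeps bad records out of every downstream Flink job, which matters when those jobs feed compliance decisions.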

AML Patterns Detected

| Pattern | Detection Method | Window | Threshold |
|---|---|---|---|
| Structuring (Smurfing) | TUMBLE window + HAVING | 60 min | >=5 cash/EFT txns, >=4 below $10k, total >=$30k |
| Transaction Velocity | ML_DETECT_ANOMALIES | 10 min | Statistical anomaly in per-account txn count |
| Geographic Anomaly | TUMBLE window + COUNT DISTINCT | 30 min | >=3 distinct countries |

Quick Start

Prerequisites

# Install uv (fast Python package manager); winget is Windows-only, see the uv docs for macOS/Linux
winget install astral-sh.uv

# Install Terraform (required for deployment); winget is Windows-only
winget install Hashicorp.Terraform

# Install Python deps
cd confluent-ai-hackathon
uv sync

API Keys Required

You need two sets of credentials:

  1. Confluent Cloud: an org-level API key (not a cluster key).
     Create at: https://confluent.cloud/settings/api-keys
     Select the "Cloud resource management" scope.

  2. An LLM provider, one of:

     a. Azure OpenAI: endpoint URL + API key.
        Find at: https://portal.azure.com > your Azure OpenAI resource > Keys and Endpoint.
        Requires deployments: gpt-5-mini and text-embedding-ada-002.

     b. AWS Bedrock: IAM access key + secret with Bedrock permissions.
        Requires model access enabled for Claude Sonnet 4.5 and Titan Embeddings.

1. Configure Credentials

cp credentials.env.example credentials.env
# Edit credentials.env — fill in the values shown in the file
notepad credentials.env
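Before deploying, it can help to confirm that no required value was left blank. The sketch below parses a dotenv-style KEY=VALUE file and reports empty keys. It assumes simple dotenv syntax, and the key names are hypothetical (use the ones listed in credentials.env.example).

```python
# Hypothetical key names for illustration; use the keys listed in
# credentials.env.example, which is not reproduced here.
REQUIRED_KEYS = (
    "CONFLUENT_CLOUD_API_KEY",
    "CONFLUENT_CLOUD_API_SECRET",
)


def parse_env_file(text: str) -> dict[str, str]:
    """Parse KEY=VALUE lines, skipping blanks and # comments."""
    values: dict[str, str] = {}
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line.startswith("#"):
            continue
        if line.startswith("export "):
            line = line[len("export "):]
        key, sep, value = line.partition("=")
        if not sep:
            continue  # ignore malformed lines that have no '='
        value = value.strip()
        # Strip one pair of matching surrounding quotes, if present.
        if len(value) >= 2 and value[0] == value[-1] and value[0] in "\"'":
            value = value[1:-1]
        values[key.strip()] = value
    return values


def missing_keys(values: dict[str, str], required=REQUIRED_KEYS) -> list[str]:
    """List required keys that are absent or left empty."""
    return [k for k in required if not values.get(k)]
```

For example, `missing_keys(parse_env_file(open("credentials.env").read()))` returns an empty list once every required value is filled in.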

2. Deploy Everything

uv run deploy

This single command:

  • Creates a Confluent Cloud environment, Kafka cluster, Schema Registry, and Flink compute pool
  • Sets up LLM connections (Azure OpenAI or AWS Bedrock)
  • Creates the sentinel Kafka topics
  • Deploys all Flink SQL detection jobs (structuring, velocity, geo anomaly)
  • Deploys AI enrichment (ML_PREDICT) and the triage streaming agent (AI_RUN_AGENT)

Deployment takes 3-5 minutes.

3. Run the Producer

uv run produce --loop --mode mixed --delay 150

4. Open the Dashboard

uv run dashboard

Local Demo (no Kafka required)

uv run dashboard --mode local
# or:
python -m sentinel demo

CLI Reference

sentinel <command> [options]

Commands:
  generate    Emit synthetic transactions to stdout (JSON lines)
  produce     Stream transactions -> Confluent Kafka
  consume     Consume AML alerts <- Kafka, enrich with AI, print to stdout
  dashboard   Rich live TUI showing real-time alerts
  demo        Quick in-process demo (no Kafka required)

sentinel produce --help:
  --events N          Events per run (default: 200)
  --customers N       Distinct accounts (default: 20)
  --mode MODE         normal|structuring|velocity|geo|mixed (default: mixed)
  --delay MS          Milliseconds between events (default: 100)
  --loop              Continuously produce until Ctrl-C
  --verbose           Print each transaction

sentinel dashboard --help:
  --mode MODE         local (in-process, no Kafka) | kafka (live, default)
  --max-alerts N      Max alerts shown (default: 50)
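The flags and defaults listed above map directly onto an argparse parser. The sketch below reproduces them for the `produce` and `dashboard` commands only. It is an illustration of the documented interface, not the project's `__main__.py`.

```python
import argparse


def build_parser() -> argparse.ArgumentParser:
    """Subcommand parser mirroring the documented produce/dashboard flags."""
    parser = argparse.ArgumentParser(prog="sentinel")
    sub = parser.add_subparsers(dest="command", required=True)

    produce = sub.add_parser("produce", help="Stream transactions -> Confluent Kafka")
    produce.add_argument("--events", type=int, default=200, metavar="N",
                         help="Events per run")
    produce.add_argument("--customers", type=int, default=20, metavar="N",
                         help="Distinct accounts")
    produce.add_argument("--mode", default="mixed",
                         choices=["normal", "structuring", "velocity", "geo", "mixed"])
    produce.add_argument("--delay", type=int, default=100, metavar="MS",
                         help="Milliseconds between events")
    produce.add_argument("--loop", action="store_true",
                         help="Continuously produce until Ctrl-C")
    produce.add_argument("--verbose", action="store_true",
                         help="Print each transaction")

    dashboard = sub.add_parser("dashboard", help="Rich live TUI showing real-time alerts")
    dashboard.add_argument("--mode", choices=["local", "kafka"], default="kafka")
    dashboard.add_argument("--max-alerts", type=int, default=50, metavar="N",
                           help="Max alerts shown")
    return parser
```

With this parser, `uv run produce --loop --mode mixed --delay 150` would yield `delay=150`, `loop=True`, and the default `events=200`.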

Project Structure

sentinel/
|-- credentials.env             <- your actual credentials (gitignored)
|-- credentials.env.example     <- template showing required keys
|-- deploy.py                   <- root deploy script (uv run deploy)
|-- pyproject.toml
|-- src/sentinel/
|   |-- config.py               <- credential loader (terraform state -> env file -> OS vars)
|   |-- models.py               <- TransactionEvent, AlertEvent dataclasses
|   |-- generator.py            <- synthetic transaction generator
|   |-- rules.py                <- heuristic AML rules (used in local mode)
|   |-- explanations.py         <- rule-based + LLM prompt builders
|   |-- explanation_service.py  <- Azure OpenAI explanation service
|   |-- kafka_producer.py       <- Confluent Kafka producer
|   |-- kafka_consumer.py       <- Confluent Kafka consumer
|   |-- dashboard.py            <- Rich live TUI
|   `-- __main__.py             <- CLI dispatcher
|-- sql/
|   `-- sentinel_rules.sql      <- Flink SQL pipeline (manual reference)
`-- terraform/core/
    |-- versions.tf
    |-- variables.tf
    |-- main.tf                 <- All Confluent resources + Flink SQL jobs
    `-- outputs.tf
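config.py is described as loading credentials from terraform state, then the env file, then OS variables. The sketch below assumes that arrow order is priority order (earlier sources win) and that empty values fall through to the next source. Both are assumptions, as are the key names in the usage example.

```python
import os
from typing import Mapping, Optional


def resolve_setting(key: str, *sources: Mapping[str, str]) -> Optional[str]:
    """Return the value from the first source that defines `key` with a non-empty value."""
    for source in sources:
        value = source.get(key)
        if value:  # empty strings count as "not set" and fall through
            return value
    return None


def load_settings(keys, terraform_outputs, env_file, environ=os.environ):
    """Resolve every key using terraform state, then the env file, then OS variables."""
    return {k: resolve_setting(k, terraform_outputs, env_file, environ) for k in keys}
```

Putting terraform state first means values created by `uv run deploy` (for example, cluster endpoints) override stale hand-edited copies.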

Flink SQL Pipeline Details

Step 0 — Source Table

Maps the sentinel.transactions_raw Kafka topic to a Flink table with a 10-second watermark, so events that arrive up to 10 seconds out of order are still assigned to the correct window.
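A 10-second watermark tracks the largest event time seen so far minus 10 seconds, and a tumbling window emits its result once the watermark passes the window's end. The sketch below models that behavior in epoch seconds. It is a simplified illustration of bounded-out-of-orderness watermarks, not Flink's implementation.

```python
WATERMARK_DELAY_S = 10  # mirrors the 10-second watermark on the source table


def tumble_window(event_time: int, size_s: int) -> tuple[int, int]:
    """Return the [start, end) tumbling window, in epoch seconds, containing event_time."""
    start = event_time - (event_time % size_s)
    return start, start + size_s


class WatermarkTracker:
    """Bounded-out-of-orderness watermark: max event time seen minus a fixed delay."""

    def __init__(self, delay_s: int = WATERMARK_DELAY_S):
        self.delay_s = delay_s
        self.max_event_time = None

    def observe(self, event_time: int):
        # Out-of-order events never move the watermark backwards.
        if self.max_event_time is None or event_time > self.max_event_time:
            self.max_event_time = event_time
        return self.watermark()

    def watermark(self):
        if self.max_event_time is None:
            return None
        return self.max_event_time - self.delay_s

    def window_closed(self, window_end: int) -> bool:
        """Simplified rule: a window fires (and later events for it are dropped)
        once the watermark reaches its end."""
        wm = self.watermark()
        return wm is not None and wm >= window_end
```

Because a window fires only after the watermark passes its end, a 60-minute structuring alert appears once that window has closed, not the moment the fifth qualifying transaction arrives.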

Step 1 — Structuring Detection

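60-minute TUMBLE window; triggers when an account has >=5 cash/EFT transactions, >=4 of them below $10,000, with a total of >=$30,000. This approximates structuring as defined in 31 U.S.C. 5324 (Bank Secrecy Act): splitting cash transactions to avoid the $10,000 Currency Transaction Report (CTR) threshold.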
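The GROUP BY plus HAVING logic can be sketched in plain Python. This assumes all three conditions apply to the cash/EFT subset of an account's transactions in the window. The `Txn` type and the channel labels are hypothetical, and this is one reading of the rule, not the project's Flink SQL.

```python
from collections import defaultdict
from dataclasses import dataclass

CTR_THRESHOLD = 10_000.0  # Currency Transaction Report threshold, USD


@dataclass(frozen=True)
class Txn:
    account_id: str
    ts: int         # event time, epoch seconds
    amount: float   # USD
    channel: str    # hypothetical values: "cash", "eft", "wire", ...


def is_structuring(txns: list[Txn]) -> bool:
    """Apply the three HAVING conditions to one account's 60-minute window."""
    cash_like = [t for t in txns if t.channel in ("cash", "eft")]
    below_ctr = [t for t in cash_like if t.amount < CTR_THRESHOLD]
    total = sum(t.amount for t in cash_like)
    return len(cash_like) >= 5 and len(below_ctr) >= 4 and total >= 30_000


def detect_structuring(txns: list[Txn], size_s: int = 3600) -> list[tuple[str, int, int]]:
    """GROUP BY (account, tumbling window), then keep groups that pass is_structuring."""
    groups: dict[tuple[str, int], list[Txn]] = defaultdict(list)
    for t in txns:
        start = t.ts - (t.ts % size_s)
        groups[(t.account_id, start)].append(t)
    return [
        (account, start, start + size_s)
        for (account, start), group in sorted(groups.items())
        if is_structuring(group)
    ]
```

Five cash deposits of $9,500 within an hour ($47,500 total) trip all three conditions, while three such deposits do not.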

Step 2 — Velocity Anomaly (ML_DETECT_ANOMALIES)

10-minute windows per account are fed into Confluent's built-in anomaly detection, which fires when a window's transaction count exceeds the model's statistical upper bound. No fixed count threshold has to be hand-tuned.
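To illustrate the "exceeds the statistical upper bound" idea, the sketch below swaps in a much simpler technique than Confluent's model: it flags a window whose count is above the mean plus k standard deviations of all earlier windows. `min_history` and `k` are hypothetical parameters, and this is not how ML_DETECT_ANOMALIES computes its bounds.

```python
import statistics


def velocity_anomalies(counts: list[int], min_history: int = 5, k: float = 3.0) -> list[int]:
    """Flag windows whose count exceeds mean + k * stdev of all earlier windows.

    counts: transactions per 10-minute window for one account, oldest first.
    Returns the indices of flagged windows.
    """
    flagged = []
    for i in range(min_history, len(counts)):
        history = counts[:i]
        upper = statistics.fmean(history) + k * statistics.pstdev(history)
        if counts[i] > upper:
            flagged.append(i)
    return flagged
```

In this sketch a flagged spike stays in the history and widens the band for later windows. A production model needs a deliberate policy for that, which is one reason to use the built-in function rather than hand-rolled statistics.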

Step 3 — Geo Anomaly Detection

30-minute TUMBLE window; flags accounts active in >=3 distinct countries, a common indicator of account takeover or layering.
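The COUNT DISTINCT rule amounts to collecting the set of countries per account and window. A minimal sketch follows, assuming each event is reduced to a hypothetical `(account_id, epoch_seconds, country_code)` tuple.

```python
from collections import defaultdict


def detect_geo_anomalies(events, size_s: int = 1800, min_countries: int = 3):
    """events: iterable of (account_id, epoch_seconds, country_code).

    Returns (account, window_start, window_end, distinct_countries) for each
    30-minute tumbling window with at least `min_countries` distinct countries.
    """
    countries = defaultdict(set)
    for account, ts, country in events:
        start = ts - ts % size_s
        countries[(account, start)].add(country)
    return sorted(
        (account, start, start + size_s, len(seen))
        for (account, start), seen in countries.items()
        if len(seen) >= min_countries
    )
```

Using a set makes repeated activity in the same country count once, which is exactly what COUNT DISTINCT does.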

Step 4 — Unified Alert Sink

All three detection streams write to sentinel.aml_alerts with INSERT INTO, so every alert type lands in a single Kafka topic.
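A single topic works best when every detector emits the same record shape, with detector-specific evidence kept in one nested field. The sketch below shows one such shape. `UnifiedAlert`, its fields, and the alert-type labels are hypothetical and are not the project's `AlertEvent`.

```python
import json
from dataclasses import asdict, dataclass, field


@dataclass
class UnifiedAlert:
    alert_type: str    # hypothetical labels: "STRUCTURING", "VELOCITY", "GEO_ANOMALY"
    account_id: str
    window_start: int  # epoch seconds
    window_end: int
    details: dict = field(default_factory=dict)  # detector-specific evidence


def to_sink_record(alert: UnifiedAlert) -> tuple[bytes, bytes]:
    """Encode an alert as a (key, value) pair for the shared alerts topic.

    Keying by account means that, with Kafka's default partitioner and a fixed
    partition count, all alerts for one account land in the same partition.
    """
    key = alert.account_id.encode("utf-8")
    value = json.dumps(asdict(alert), sort_keys=True).encode("utf-8")
    return key, value
```

Keying by account keeps each account's alerts ordered relative to one another, which helps downstream enrichment and triage.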

Step 5 — AI Enrichment (ML_PREDICT)

Each alert is enriched with an analyst-ready paragraph generated by Azure OpenAI GPT-5-mini through Confluent Flink's ML_PREDICT function. The model call is issued from Flink itself, so the Python layer makes no external service calls.
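ML_PREDICT sends a text input to the model, so each alert first has to be turned into a prompt. The sketch below assembles one from alert fields in a deterministic order. The prompt wording and field names are hypothetical, and it is not the code in explanations.py.

```python
def build_summary_prompt(alert: dict) -> str:
    """Build an LLM prompt asking for a one-paragraph analyst summary of an alert."""
    lines = [
        "You are an AML analyst assistant.",
        "Write one concise paragraph explaining why this alert may indicate money laundering,",
        "what evidence supports it, and what an analyst should check next.",
        "",
        f"Alert type: {alert['alert_type']}",
        f"Account: {alert['account_id']}",
        f"Window: {alert['window_start']} to {alert['window_end']}",
    ]
    # Sorted keys give a stable prompt for identical alerts.
    details = alert.get("details", {})
    for key in sorted(details):
        lines.append(f"{key}: {details[key]}")
    return "\n".join(lines)
```

Keeping the evidence as explicit `key: value` lines makes it easier to check the generated summary against the facts that triggered the alert.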

Step 6 — Streaming Agent (AI_RUN_AGENT)

sentinel_triage_agent reviews each enriched alert and outputs a structured case disposition:

  • ESCALATE / REVIEW / DISMISS
  • Priority score (1-5)
  • SAR (Suspicious Activity Report) recommendation
  • Recommended action for the compliance team
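A consumer of alert_dispositions can enforce this structure before acting on the agent's output. The sketch below parses and validates a disposition. It assumes the agent emits JSON with hypothetical field names (`decision`, `priority`, `sar_recommended`, `recommended_action`).

```python
import json
from dataclasses import dataclass

DECISIONS = {"ESCALATE", "REVIEW", "DISMISS"}


@dataclass(frozen=True)
class Disposition:
    decision: str
    priority: int             # 1-5
    sar_recommended: bool
    recommended_action: str


def parse_disposition(raw: str) -> Disposition:
    """Parse the agent's JSON output and reject values outside the allowed ranges."""
    data = json.loads(raw)
    decision = str(data["decision"]).strip().upper()
    if decision not in DECISIONS:
        raise ValueError(f"unknown decision: {decision!r}")
    priority = int(data["priority"])
    if not 1 <= priority <= 5:
        raise ValueError(f"priority out of range: {priority}")
    sar = data["sar_recommended"]
    if not isinstance(sar, bool):
        raise ValueError("sar_recommended must be a boolean")
    return Disposition(decision, priority, sar, str(data.get("recommended_action", "")))
```

Validating LLM output strictly means a malformed or out-of-range disposition is caught instead of silently becoming a dismissed case.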

Business Impact

AML compliance failures cost financial institutions billions in fines every year. Sentinel replaces batch-mode rule engines (which detect patterns hours or days late) with a continuous streaming pipeline that emits each alert as soon as its detection window closes:

| Metric | Traditional Batch | Sentinel |
|---|---|---|
| Detection latency | Hours to days | Within seconds of each window closing (10-60 min windows) |
| Alert enrichment | Manual analyst work | Automated (AI in-stream) |
| Case triage | Queue of days | Autonomous agent |
| False positive handling | Manual | Agent-assisted |
| Pattern coverage | Static rules | Static rules plus adaptive ML_DETECT_ANOMALIES |

About

First Place in the 2026 Confluent Data Streaming World Tour AI Day Toronto Hackathon
