Confluent AI Hackathon
Sentinel AML is a fully real-time Anti-Money Laundering (AML) detection and case-triage platform built entirely on Confluent Cloud. It continuously monitors financial transaction streams, detects suspicious patterns using Flink SQL (including Confluent's built-in ML functions), enriches alerts with Azure OpenAI via ML_PREDICT, and autonomously triages cases using a Streaming Agent — all without any external orchestration.
Confluent Cloud
[Python Producer]
sentinel produce --------> sentinel.transactions_raw (Kafka)
                                       |
                      +----------------+----------------+
                      v                v                v
                   [Flink SQL Jobs -- continuously running]
               +-------------+   +-----------+  +---------------+
               | Structuring |   | Velocity  |  | Geo Anomaly   |
               | Detection   |   | Anomaly   |  | Detection     |
               | TUMBLE 60m  |   | ML_DETECT_|  | TUMBLE 30m    |
               | >=5 cash txn|   | ANOMALIES |  | >=3 countries |
               +-------------+   +-----------+  +---------------+
                      |                |                |
                      +----------------+----------------+
                                       v
                          sentinel.aml_alerts (Kafka)
                                       |
                        +--------------v--------------+
                        | AI Enrichment (ML_PREDICT)  |
                        | Azure OpenAI GPT-5-mini     |
                        | -> analyst summary          |
                        +--------------+--------------+
                                       v
                              aml_alerts_enriched
                                       |
                        +--------------v--------------+
                        | Streaming Agent             |
                        | (AI_RUN_AGENT)              |
                        | sentinel_triage_agent       |
                        | -> ESCALATE/REVIEW/DISMISS  |
                        | -> SAR recommendation       |
                        +--------------+--------------+
                                       v
                          alert_dispositions (Kafka)
                                       |
                        +--------------v--------------+
                        | [Python Dashboard]          |
                        | sentinel dashboard          |
                        | Rich live TUI               |
                        | (--mode kafka or local)     |
                        +-----------------------------+
| Feature | How Sentinel Uses It |
|---|---|
| Kafka Topics | transactions_raw, aml_alerts, alert_dispositions: the event backbone |
| Schema Registry | JSON schemas validated at produce time; governance for compliance |
| Flink SQL | Tumbling windows, GROUP BY, and HAVING for pattern detection |
| ML_DETECT_ANOMALIES | Built-in Flink ML for per-account velocity spike detection |
| ML_PREDICT | Azure OpenAI (or AWS Bedrock) via a Confluent connection: AI alert summaries in-stream |
| CREATE AGENT / AI_RUN_AGENT | Streaming Agent autonomously triages every alert |
| Kafka Clients | Python producer -> Kafka; Kafka -> Python consumer |
| Stream Processing | 3 simultaneous Flink detection jobs plus an enrichment pipeline |
| Pattern | Detection Method | Window | Threshold |
|---|---|---|---|
| Structuring (Smurfing) | TUMBLE window + HAVING | 60 min | >=5 cash/EFT txns, >=4 sub-$10k, total >=$30k |
| Transaction Velocity | ML_DETECT_ANOMALIES | 10 min | Statistical anomaly in txn count per account |
| Geographic Anomaly | TUMBLE window + COUNT DISTINCT | 30 min | >=3 distinct countries |
# Install uv (fast Python package manager)
winget install astral-sh.uv
# Install Terraform (required for deployment)
winget install Hashicorp.Terraform
# Install Python deps
cd confluent-ai-hackathon
uv sync
You need two sets of credentials:
- Confluent Cloud: an org-level API key (not a cluster key).
  Create it at: https://confluent.cloud/settings/api-keys
  Select the "Cloud resource management" scope.
- One LLM provider, either:
  - Azure OpenAI: endpoint URL + API key.
    Find them at: https://portal.azure.com > your Azure OpenAI resource > Keys and Endpoint.
    Requires the deployments gpt-5-mini and text-embedding-ada-002.
  - AWS Bedrock: an IAM access key + secret with Bedrock permissions.
    Requires model access enabled for Claude Sonnet 4.5 and Titan Embeddings.
cp credentials.env.example credentials.env
# Edit credentials.env — fill in the values shown in the file
notepad credentials.env
uv run deploy
This single command:
- Creates a Confluent Cloud environment, Kafka cluster, Schema Registry, and Flink compute pool
- Sets up LLM connections (Azure OpenAI or AWS Bedrock)
- Creates the sentinel Kafka topics
- Deploys all Flink SQL detection jobs (structuring, velocity, geo anomaly)
- Deploys AI enrichment (ML_PREDICT) and the triage streaming agent (AI_RUN_AGENT)
Deployment takes 3-5 minutes.
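# Stream synthetic transactions into Kafka until Ctrl-C
uv run produce --loop --mode mixed --delay 150
# Watch live alerts from Kafka
uv run dashboard
# Run without Kafka (in-process)
uv run dashboard --mode local
# or:
python -m sentinel demo
sentinel <command> [options]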
Commands:
generate Emit synthetic transactions to stdout (JSON lines)
produce Stream transactions -> Confluent Kafka
consume Consume AML alerts <- Kafka, enrich with AI, print to stdout
dashboard Rich live TUI showing real-time alerts
demo Quick in-process demo (no Kafka required)
sentinel produce --help:
--events N Events per run (default: 200)
--customers N Distinct accounts (default: 20)
--mode MODE normal|structuring|velocity|geo|mixed (default: mixed)
--delay MS Milliseconds between events (default: 100)
--loop Continuously produce until Ctrl-C
--verbose Print each transaction
sentinel dashboard --help:
--mode MODE local (in-process, no Kafka) | kafka (live, default)
--max-alerts N Max alerts shown (default: 50)
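For readers extending the CLI, the documented subcommands and flags map naturally onto the standard library's argparse. The sketch below is a hypothetical reconstruction, not the project's actual `__main__.py`: it declares only the options listed above, and `build_parser` is an illustrative name.

```python
import argparse


def build_parser() -> argparse.ArgumentParser:
    """Hypothetical parser mirroring the documented `sentinel` subcommands and flags."""
    parser = argparse.ArgumentParser(prog="sentinel")
    sub = parser.add_subparsers(dest="command", required=True)

    # Commands whose options are not documented above get no flags here.
    sub.add_parser("generate", help="Emit synthetic transactions to stdout (JSON lines)")
    sub.add_parser("consume", help="Consume AML alerts <- Kafka, enrich with AI")
    sub.add_parser("demo", help="Quick in-process demo (no Kafka required)")

    produce = sub.add_parser("produce", help="Stream transactions -> Confluent Kafka")
    produce.add_argument("--events", type=int, default=200, help="Events per run")
    produce.add_argument("--customers", type=int, default=20, help="Distinct accounts")
    produce.add_argument(
        "--mode",
        choices=["normal", "structuring", "velocity", "geo", "mixed"],
        default="mixed",
    )
    produce.add_argument("--delay", type=int, default=100, help="Milliseconds between events")
    produce.add_argument("--loop", action="store_true", help="Produce until Ctrl-C")
    produce.add_argument("--verbose", action="store_true", help="Print each transaction")

    dashboard = sub.add_parser("dashboard", help="Rich live TUI")
    dashboard.add_argument("--mode", choices=["local", "kafka"], default="kafka")
    dashboard.add_argument("--max-alerts", type=int, default=50, help="Max alerts shown")
    return parser


# Usage: parse an explicit argument list instead of sys.argv.
args = build_parser().parse_args(["produce", "--loop", "--delay", "150"])
print(args.command, args.mode, args.delay)  # produce mixed 150
```

Using one subparser per command keeps the per-command defaults (for example `--mode` meaning different things for `produce` and `dashboard`) isolated from each other.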
sentinel/
|-- credentials.env <- your actual credentials (gitignored)
|-- credentials.env.example <- template showing required keys
|-- deploy.py <- root deploy script (uv run deploy)
|-- pyproject.toml
|-- src/sentinel/
| |-- config.py <- credential loader (terraform state -> env file -> OS vars)
| |-- models.py <- TransactionEvent, AlertEvent dataclasses
| |-- generator.py <- synthetic transaction generator
| |-- rules.py <- heuristic AML rules (used in local mode)
| |-- explanations.py <- rule-based + LLM prompt builders
| |-- explanation_service.py <- Azure OpenAI explanation service
| |-- kafka_producer.py <- Confluent Kafka producer
| |-- kafka_consumer.py <- Confluent Kafka consumer
| |-- dashboard.py <- Rich live TUI
| `-- __main__.py <- CLI dispatcher
|-- sql/
| `-- sentinel_rules.sql <- Flink SQL pipeline (manual reference)
`-- terraform/core/
|-- versions.tf
|-- variables.tf
|-- main.tf <- All Confluent resources + Flink SQL jobs
`-- outputs.tf
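config.py is described as resolving credentials in the order Terraform state, then env file, then OS variables. A minimal sketch of that precedence follows, assuming a local `terraform.tfstate` whose top-level `outputs` map each name to a `{"value": ...}` object (the standard Terraform state layout) and a `credentials.env` of `KEY=VALUE` lines. The helper names, and the assumption that Terraform output names match the env-file keys, are hypothetical.

```python
import json
import os
from pathlib import Path
from typing import Optional


def _from_tfstate(path: Path, name: str) -> Optional[str]:
    """Look up a Terraform output (outputs -> name -> value) in a local state file."""
    if not path.is_file():
        return None
    outputs = json.loads(path.read_text()).get("outputs", {})
    entry = outputs.get(name)
    return None if entry is None else str(entry["value"])


def _from_env_file(path: Path, name: str) -> Optional[str]:
    """Parse simple KEY=VALUE lines, skipping blanks and # comments."""
    if not path.is_file():
        return None
    for raw in path.read_text().splitlines():
        line = raw.strip()
        if not line or line.startswith("#") or "=" not in line:
            continue
        key, value = line.split("=", 1)
        if key.strip() == name:
            return value.strip().strip('"').strip("'")
    return None


def resolve_setting(name: str, tfstate: Path, env_file: Path) -> Optional[str]:
    """First hit wins: Terraform state, then the env file, then OS environment."""
    value = _from_tfstate(tfstate, name)
    if value is None:
        value = _from_env_file(env_file, name)
    if value is None:
        value = os.environ.get(name)
    return value
```

Checking Terraform state first means values produced by `uv run deploy` override stale hand-edited entries, while OS variables remain a last-resort fallback.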
Maps the sentinel.transactions_raw Kafka topic to a Flink table with a 10-second watermark.
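A bounded watermark like this is what lets Flink decide when a tumbling window is complete. The sketch below models that logic in plain Python, assuming the watermark is simply the largest event time seen minus 10 seconds. It illustrates the idea rather than Flink's implementation, and `WatermarkTracker` and `tumble_start` are hypothetical names.

```python
from datetime import datetime, timedelta
from typing import Optional

WATERMARK_DELAY = timedelta(seconds=10)  # the 10-second watermark described above


def tumble_start(ts: datetime, size: timedelta) -> datetime:
    """Start of the epoch-aligned tumbling window of length `size` containing ts."""
    epoch = datetime(1970, 1, 1, tzinfo=ts.tzinfo)
    return ts - (ts - epoch) % size


class WatermarkTracker:
    """Bounded-out-of-orderness watermark: max event time seen minus a fixed delay."""

    def __init__(self, delay: timedelta = WATERMARK_DELAY) -> None:
        self.delay = delay
        self.max_event_time: Optional[datetime] = None

    def observe(self, event_time: datetime) -> None:
        """Advance the high-water mark; older (late) events never move it backwards."""
        if self.max_event_time is None or event_time > self.max_event_time:
            self.max_event_time = event_time

    @property
    def watermark(self) -> Optional[datetime]:
        if self.max_event_time is None:
            return None
        return self.max_event_time - self.delay

    def window_closed(self, window_end: datetime) -> bool:
        """True once the watermark reaches the window end (Flink's 1 ms end-exclusive detail ignored)."""
        wm = self.watermark
        return wm is not None and wm >= window_end
```

With a 60-minute window, the 10:00-11:00 result can therefore be emitted only after an event stamped 11:00:10 or later has arrived.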
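60-minute TUMBLE window; triggers when an account has >=5 cash/EFT transactions, >=4 of them below $10,000, totaling >=$30,000. This targets structuring as prohibited by the Bank Secrecy Act (31 U.S.C. 5324): splitting cash transactions to avoid the $10,000 Currency Transaction Report threshold. The counts and totals are Sentinel's heuristics; the statute itself sets no such thresholds.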
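The window-plus-HAVING logic above can be sketched in plain Python, in the spirit of the heuristic local-mode rules (the actual rules.py is not shown here). The `Txn` shape, its `channel` values, and `structuring_alerts` are hypothetical; the thresholds are the ones stated above.

```python
from collections import defaultdict
from dataclasses import dataclass
from datetime import datetime, timedelta

WINDOW = timedelta(minutes=60)          # 60-minute tumbling window
CTR_LIMIT = 10_000.0                    # "below $10,000" cut-off
MIN_TXNS, MIN_BELOW, MIN_TOTAL = 5, 4, 30_000.0


@dataclass(frozen=True)
class Txn:
    """Hypothetical transaction shape; the real TransactionEvent fields may differ."""
    account_id: str
    ts: datetime
    amount: float
    channel: str  # assumed values such as "CASH", "EFT", "WIRE"


def window_start(ts: datetime) -> datetime:
    """Align ts to the start of its epoch-aligned tumbling window."""
    epoch = datetime(1970, 1, 1, tzinfo=ts.tzinfo)
    return ts - (ts - epoch) % WINDOW


def structuring_alerts(txns: list[Txn]) -> list[tuple[str, datetime, int, float]]:
    """GROUP BY (account, window) over cash/EFT txns, then apply HAVING-style thresholds."""
    buckets: dict[tuple[str, datetime], list[Txn]] = defaultdict(list)
    for t in txns:
        if t.channel in ("CASH", "EFT"):  # WHERE-clause equivalent
            buckets[(t.account_id, window_start(t.ts))].append(t)

    alerts = []
    for (account, start), group in sorted(buckets.items()):
        below = sum(1 for t in group if t.amount < CTR_LIMIT)
        total = sum(t.amount for t in group)
        if len(group) >= MIN_TXNS and below >= MIN_BELOW and total >= MIN_TOTAL:
            alerts.append((account, start, len(group), total))
    return alerts
```

Requiring all three conditions together is what separates a burst of just-under-threshold deposits from an account that simply makes many small payments.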
10-minute windows per account are fed into Confluent's built-in anomaly detection (ML_DETECT_ANOMALIES). An alert fires when an account's transaction count exceeds the model's statistical upper bound, so there is no fixed count threshold to tune.
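ML_DETECT_ANOMALIES uses its own forecasting model, documented by Confluent and not reproduced here. To illustrate the idea of "count above an upper bound learned from the account's own history", the sketch below swaps in a much simpler technique: a mean + k x standard-deviation bound over all earlier 10-minute buckets. The function names, k=3, the minimum history of 3 buckets, and the floor of 1.0 on the spread are all assumptions.

```python
import statistics
from collections import Counter
from datetime import datetime, timedelta

BUCKET = timedelta(minutes=10)


def bucket_counts(timestamps: list[datetime]) -> list[int]:
    """Transactions per 10-minute bucket for one account, zero-filling empty buckets."""
    if not timestamps:
        return []
    epoch = datetime(1970, 1, 1, tzinfo=timestamps[0].tzinfo)
    per_bucket = Counter((ts - epoch) // BUCKET for ts in timestamps)
    first, last = min(per_bucket), max(per_bucket)
    return [per_bucket[i] for i in range(first, last + 1)]


def velocity_anomalies(counts: list[int], k: float = 3.0, min_history: int = 3) -> list[int]:
    """Indices of buckets whose count exceeds mean + k * stdev of all earlier buckets.

    A deliberately simple stand-in for ML_DETECT_ANOMALIES, not its actual model.
    """
    flagged = []
    for i in range(min_history, len(counts)):
        history = counts[:i]
        # Floor the spread so a perfectly flat history does not flag a +1 blip.
        spread = max(statistics.pstdev(history), 1.0)
        upper = statistics.fmean(history) + k * spread
        if counts[i] > upper:
            flagged.append(i)
    return flagged
```

Because the bound is computed from each account's own past buckets, a busy merchant account and a quiet personal account get different effective thresholds, which is the property the built-in function provides in a more principled way.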
30-minute TUMBLE window; flags accounts active in >=3 distinct countries, a common indicator of account takeover or layering.
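The COUNT(DISTINCT ...) rule reduces to collecting a set of countries per account and window. A minimal sketch, assuming events arrive as hypothetical `(account_id, timestamp, country_code)` tuples:

```python
from collections import defaultdict
from datetime import datetime, timedelta

GEO_WINDOW = timedelta(minutes=30)
MIN_COUNTRIES = 3


def geo_anomalies(events):
    """Return (account_id, window_start, sorted_countries) for every 30-minute
    tumbling window in which an account touched >= MIN_COUNTRIES distinct countries.

    `events` is an iterable of (account_id, timestamp, country_code) tuples (assumed shape).
    """
    countries = defaultdict(set)
    for account, ts, country in events:
        epoch = datetime(1970, 1, 1, tzinfo=ts.tzinfo)
        start = ts - (ts - epoch) % GEO_WINDOW
        countries[(account, start)].add(country)  # set size plays COUNT(DISTINCT country)
    return sorted(
        (account, start, sorted(seen))
        for (account, start), seen in countries.items()
        if len(seen) >= MIN_COUNTRIES
    )
```

Because windows are tumbling rather than sliding, three countries split across a window boundary (for example two before 09:30 and one after) are not flagged; that is a trade-off of the fixed 30-minute buckets.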
INSERT INTO sentinel.aml_alerts combines all three detection streams, so every alert type lands in a single Kafka topic.
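A single topic for all alert types works when each record carries a discriminator field that downstream consumers can route on. The sketch below assumes a hypothetical `AlertRecord` with an `alert_type` field and made-up type labels; the real AlertEvent in models.py may use different fields.

```python
import json
from dataclasses import asdict, dataclass, field
from typing import Any

ALERT_TYPES = {"STRUCTURING", "VELOCITY", "GEO_ANOMALY"}  # hypothetical labels


@dataclass
class AlertRecord:
    alert_type: str      # discriminator shared by every alert on the topic
    account_id: str
    window_start: str    # ISO-8601 string keeps the JSON payload simple
    details: dict[str, Any] = field(default_factory=dict)

    def to_json(self) -> str:
        """Serialize for the shared alerts topic, rejecting unknown alert types."""
        if self.alert_type not in ALERT_TYPES:
            raise ValueError(f"unknown alert_type: {self.alert_type}")
        return json.dumps(asdict(self), sort_keys=True)

    @classmethod
    def from_json(cls, payload: str) -> "AlertRecord":
        return cls(**json.loads(payload))
```

A consumer can then branch on `alert_type` after deserializing, instead of subscribing to one topic per detector.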
Each alert is enriched with an analyst-ready paragraph generated by Azure OpenAI GPT-5-mini through the Confluent Flink ML_PREDICT function. The model call runs inside Flink, so the Python layer does not need to make any external service calls for this step.
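The quality of the summary depends mostly on the prompt built from the alert fields. The prompt actually used by the Flink job lives in main.tf / sentinel_rules.sql and is not reproduced here; the sketch below is a hypothetical builder, in the spirit of explanations.py, showing one way to embed the alert as structured JSON.

```python
import json


def build_summary_prompt(alert: dict) -> str:
    """Hypothetical builder for the analyst-summary prompt sent to the LLM.

    The alert is embedded as sorted, indented JSON so the model sees every field
    verbatim; the instruction wording is illustrative only.
    """
    facts = json.dumps(alert, sort_keys=True, indent=2, default=str)
    return (
        "You are assisting an AML compliance analyst. In one short paragraph, "
        "explain which laundering pattern this alert matches, why the listed "
        "facts are suspicious, and what the analyst should verify next. "
        "Use only the facts provided.\n\n"
        f"Alert:\n{facts}"
    )


print(build_summary_prompt({"alert_type": "STRUCTURING", "account_id": "ACC-42", "txn_count": 5}))
```

Telling the model to use only the provided facts, and passing those facts in a machine-readable block, makes it easier for an analyst to check the summary against the alert.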
sentinel_triage_agent reviews each enriched alert and outputs a structured case disposition:
- ESCALATE / REVIEW / DISMISS
- Priority score (1-5)
- SAR (Suspicious Activity Report) recommendation
- Recommended action for the compliance team
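Because the agent's output drives compliance decisions, a downstream consumer should validate it before acting on it. A minimal sketch, assuming the agent replies with a JSON object using the hypothetical keys `decision`, `priority`, `sar_recommended`, and `recommended_action` (the real output schema is not shown here):

```python
import json
from dataclasses import dataclass

VALID_DECISIONS = {"ESCALATE", "REVIEW", "DISMISS"}


@dataclass(frozen=True)
class Disposition:
    decision: str
    priority: int
    sar_recommended: bool
    recommended_action: str


def parse_disposition(raw: str) -> Disposition:
    """Parse and validate one agent reply; raise ValueError on anything out of spec."""
    data = json.loads(raw)
    decision = str(data["decision"]).strip().upper()
    if decision not in VALID_DECISIONS:
        raise ValueError(f"unexpected decision: {decision!r}")
    priority = int(data["priority"])
    if not 1 <= priority <= 5:
        raise ValueError(f"priority out of range 1-5: {priority}")
    sar = data["sar_recommended"]
    if not isinstance(sar, bool):
        raise ValueError("sar_recommended must be a JSON boolean")
    return Disposition(decision, priority, sar, str(data.get("recommended_action", "")))
```

Rejecting malformed replies outright, rather than guessing, lets them be routed to a human reviewer instead of silently becoming a DISMISS.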
AML compliance failures cost financial institutions billions annually in fines. Sentinel replaces batch-mode rule engines (which detect patterns hours or days late) with a streaming pipeline that raises alerts as soon as each detection window closes:
| Metric | Traditional Batch | Sentinel |
|---|---|---|
| Detection latency | Hours to days | < 60 seconds after a detection window closes |
| Alert enrichment | Manual analyst work | Automated (AI in-stream) |
| Case triage | Queue of days | Autonomous agent |
| False positive handling | Manual | Agent-assisted |
| Pattern coverage | Static rules | Rules plus adaptive anomaly detection (ML_DETECT_ANOMALIES) |