Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion source/conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@
"rapids_conda_channels": "-c rapidsai-nightly -c conda-forge",
"rapids_conda_packages": f"rapids={nightly_version} python=3.13 'cuda-version>=12.0,<=12.9'",
"rapids_pip_index": "https://pypi.anaconda.org/rapidsai-wheels-nightly/simple",
"rapids_pip_version": f"{nightly_version}.*,>=0.0.0a0",
"rapids_pip_version": f"~={nightly_version}.0a0",
# SageMaker Notebook Instance examples need to stay pinned to an older RAPIDS until this is resolved:
# https://github.com/rapidsai/deployment/issues/520
"rapids_sagemaker_conda_packages": f"rapids={nightly_version} python=3.12 cuda-version=13",
Expand Down
22 changes: 22 additions & 0 deletions source/examples/fraud-detection-mlops-pipeline/.env.example
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
# Required: NGC API key for pulling the training container from nvcr.io
NGC_API_KEY=

# MLflow tracking server URL (matches the mlflow service in docker-compose)
MLFLOW_TRACKING_URI=http://localhost:5050

# Prefect API URL (matches the prefect service in docker-compose)
PREFECT_API_URL=http://localhost:4200/api

# Triton inference server endpoints (HTTP and gRPC)
TRITON_HTTP_URL=localhost:8000
TRITON_GRPC_URL=localhost:8001

# Pipeline data paths (adjust for your environment)
DATA_ROOT=./data/TabFormer
MODEL_OUTPUT_DIR=./data/trained_models
TRITON_MODEL_REPO=./data/models

# Docker-compose volume paths and the Triton image to run
MLFLOW_DATA_DIR=./mlflow-data
PREFECT_DATA_DIR=./prefect-data
TRITON_IMAGE=triton-fraud:latest
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
services:
  # MLflow tracking server backed by SQLite; artifacts are proxied through
  # the server ("--serve-artifacts") and persisted under the mounted volume.
  # Note: comments cannot go inside the folded "command:" scalars below —
  # they would become part of the command string.
  mlflow:
    image: ghcr.io/mlflow/mlflow:v2.19.0
    ports:
      - "5050:5050"
    volumes:
      - ${MLFLOW_DATA_DIR:-./mlflow-data}:/mlflow
    command: >
      mlflow server
      --host 0.0.0.0
      --port 5050
      --backend-store-uri sqlite:///mlflow/mlflow.db
      --default-artifact-root mlflow-artifacts:/
      --serve-artifacts
      --artifacts-destination /mlflow/artifacts

  # Prefect orchestration server; state lives in the mounted ~/.prefect dir.
  prefect:
    image: prefecthq/prefect:3-latest
    ports:
      - "4200:4200"
    volumes:
      - ${PREFECT_DATA_DIR:-./prefect-data}:/root/.prefect
    command: prefect server start --host 0.0.0.0

  # Triton inference server (HTTP 8000, gRPC 8001, metrics 8002).
  # Explicit model control so the pipeline can load/unload versions itself.
  # Only started when the "gpu" profile is enabled (docker compose --profile gpu).
  triton:
    image: ${TRITON_IMAGE:-triton-fraud:latest}
    ports:
      - "8000:8000"
      - "8001:8001"
      - "8002:8002"
    volumes:
      # NOTE(review): TRITON_MODEL_REPO has no ":-" default here, unlike the
      # other volumes — presumably intentional (must be set in .env); verify.
      - ${TRITON_MODEL_REPO}:/models
    deploy:
      resources:
        reservations:
          devices:
            - driver: nvidia
              count: 1
              capabilities: [gpu]
    command: >
      tritonserver
      --model-repository=/models
      --model-control-mode=explicit
      --exit-timeout-secs=6000
      --http-port=8000
      --grpc-port=8001
      --metrics-port=8002
    profiles:
      - gpu
17 changes: 17 additions & 0 deletions source/examples/fraud-detection-mlops-pipeline/environment.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
# Conda environment for running the fraud-detection MLOps example locally.
name: fraud-mlops
channels:
  - rapidsai
  - conda-forge
  - nvidia
dependencies:
  - python=3.13
  # CUDA toolkit pin — assumes a compatible host driver; confirm before use.
  - cuda-version>=13.0,<=13.1
  - rapids=26.04
  # Orchestration, experiment tracking, serving client, env-file loading
  - prefect
  - mlflow
  - tritonclient
  - python-dotenv
  # Feature engineering and analysis
  - category_encoders
  - scipy
  - networkx
  - matplotlib
601 changes: 601 additions & 0 deletions source/examples/fraud-detection-mlops-pipeline/notebook.ipynb

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
"""Fraud Detection MLOps Pipeline."""
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
"""Centralized configuration for the fraud detection MLOps pipeline."""

import os

from dotenv import load_dotenv

load_dotenv()

# --- Paths ---

DATA_ROOT = os.getenv("DATA_ROOT", "/data/TabFormer")
RAW_CSV_PATH = os.path.join(DATA_ROOT, "raw", "card_transaction.v1.csv")
GNN_DATA_DIR = os.path.join(DATA_ROOT, "gnn")
TEST_DATA_DIR = os.path.join(DATA_ROOT, "gnn", "test_gnn")
MODEL_OUTPUT_DIR = os.getenv("MODEL_OUTPUT_DIR", "/data/trained_models")
TRITON_MODEL_REPO = os.getenv("TRITON_MODEL_REPO", "/models")

# --- Infrastructure ---

MLFLOW_TRACKING_URI = os.getenv("MLFLOW_TRACKING_URI", "http://localhost:5050")
MLFLOW_EXPERIMENT_NAME = "fraud-detection-training"
MLFLOW_PREPROCESS_EXPERIMENT_NAME = "fraud-detection-preprocess"
PREFECT_API_URL = os.getenv("PREFECT_API_URL", "http://localhost:4200/api")
TRITON_HTTP_URL = os.getenv("TRITON_HTTP_URL", "localhost:8000")
TRITON_GRPC_URL = os.getenv("TRITON_GRPC_URL", "localhost:8001")

# --- Training ---

NGC_API_KEY = os.getenv("NGC_API_KEY", "")
TRAINING_IMAGE = "nvcr.io/nvidia/cugraph/financial-fraud-training:2.0.0"
TRITON_MODEL_NAME = "prediction_and_shapley"

DEFAULT_GNN_PARAMS = {
"hidden_channels": 32,
"n_hops": 2,
"layer": "SAGEConv",
"dropout_prob": 0.1,
"batch_size": 4096,
"fan_out": 10,
"num_epochs": 8,
}

DEFAULT_XGB_PARAMS = {
"max_depth": 6,
"learning_rate": 0.2,
"num_parallel_tree": 3,
"num_boost_round": 512,
"gamma": 0.0,
}

# --- Preprocessing ---

DEFAULT_FRAUD_RATIO = 0.1
DEFAULT_UNDER_SAMPLE = True

# --- Evaluation ---

PROMOTION_METRIC = "f1_score"
MIN_IMPROVEMENT = 0.0
DECISION_THRESHOLD = 0.5
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
"""Register all flows with Prefect work pools and schedules."""

from prefect import serve

from pipeline.flows.deploy import deploy_flow
from pipeline.flows.evaluate import evaluate_flow
from pipeline.flows.full_pipeline import full_pipeline_flow
from pipeline.flows.preprocess import preprocess_flow
from pipeline.flows.train import train_flow


def main():
    """Build a Prefect deployment for every pipeline flow and serve them."""
    # (flow, deployment name, optional cron schedule) — every deployment
    # targets the "gpu" work pool.
    specs = [
        (preprocess_flow, "preprocess", "0 0 * * *"),
        (train_flow, "train", None),
        (evaluate_flow, "evaluate", None),
        (deploy_flow, "deploy", None),
        (full_pipeline_flow, "full-pipeline", "0 2 * * *"),
    ]

    deployments = []
    for pipeline_flow, deployment_name, cron in specs:
        kwargs = {"name": deployment_name, "work_pool_name": "gpu"}
        if cron is not None:
            kwargs["cron"] = cron
        deployments.append(pipeline_flow.to_deployment(**kwargs))

    serve(*deployments)


if __name__ == "__main__":
    main()
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
"""Prefect flows for each pipeline stage."""
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
"""Deploy flow with Triton model version promotion or rollback."""

import logging

from prefect import flow

from pipeline.config import TRITON_HTTP_URL, TRITON_MODEL_NAME, TRITON_MODEL_REPO
from pipeline.tasks.mlflow_utils import register_champion
from pipeline.tasks.triton import (
cleanup_version_artifacts,
health_check,
reload_model,
)

logger = logging.getLogger(__name__)


@flow(name="deploy", log_prints=True)
def deploy_flow(
eval_result: dict,
triton_url: str = TRITON_HTTP_URL,
model_name: str = TRITON_MODEL_NAME,
model_repo_path: str = TRITON_MODEL_REPO,
) -> dict:
"""Promote or roll back a model version in Triton based on the evaluation result.

If the challenger won, removes the old champion, health-checks the new one,
and registers it in MLflow's model registry with the champion alias. If the
champion won, cleans up the challenger artifacts and restores Triton to its
previous state.
"""
should_promote = eval_result["should_promote"]
champion_version = eval_result["champion_version"]
challenger_version = eval_result["challenger_version"]
challenger_run_id = eval_result["challenger_run_id"]

if should_promote:
logger.info(
"Promoting challenger version %d as new champion", challenger_version
)

# Remove old champion artifacts and reload
if champion_version > 0:
cleanup_version_artifacts(model_repo_path, model_name, champion_version)
reload_model(triton_url, model_name)

# Health check new champion
healthy = health_check(triton_url, model_name, challenger_version)
if not healthy:
logger.error("Health check failed for version %d", challenger_version)
raise RuntimeError(
f"Deploy failed: version {challenger_version} not healthy after promotion."
)

# Register in MLflow
register_champion(challenger_run_id, model_name)

logger.info("Deploy complete: version %d is now champion", challenger_version)
return {
"action": "promoted",
"version": challenger_version,
"run_id": challenger_run_id,
}

else:
logger.info("Rejecting challenger version %d", challenger_version)

# Remove challenger artifacts and reload to restore champion-only state
cleanup_version_artifacts(model_repo_path, model_name, challenger_version)
if champion_version > 0:
reload_model(triton_url, model_name)

logger.info(
"Rejected: version %d cleaned up, champion version %d unchanged",
challenger_version,
champion_version,
)
return {
"action": "rejected",
"version": challenger_version,
"reason": eval_result.get("reason", ""),
}


if __name__ == "__main__":
import json
import sys

if len(sys.argv) > 1:
eval_result = json.loads(sys.argv[1])
deploy_flow(eval_result=eval_result)
Loading
Loading