diff --git a/source/conf.py b/source/conf.py index 18582aca..1e95d087 100644 --- a/source/conf.py +++ b/source/conf.py @@ -49,7 +49,7 @@ "rapids_conda_channels": "-c rapidsai-nightly -c conda-forge", "rapids_conda_packages": f"rapids={nightly_version} python=3.13 'cuda-version>=12.0,<=12.9'", "rapids_pip_index": "https://pypi.anaconda.org/rapidsai-wheels-nightly/simple", - "rapids_pip_version": f"{nightly_version}.*,>=0.0.0a0", + "rapids_pip_version": f"~={nightly_version}.0a0", # SageMaker Notebook Instance examples need to stay pinned to an older RAPIDS until this is resolved: # https://github.com/rapidsai/deployment/issues/520 "rapids_sagemaker_conda_packages": f"rapids={nightly_version} python=3.12 cuda-version=13", diff --git a/source/examples/fraud-detection-mlops-pipeline/.env.example b/source/examples/fraud-detection-mlops-pipeline/.env.example new file mode 100644 index 00000000..5d82bd02 --- /dev/null +++ b/source/examples/fraud-detection-mlops-pipeline/.env.example @@ -0,0 +1,22 @@ +# Required: NGC API key for pulling the training container +NGC_API_KEY= + +# MLflow tracking server URL +MLFLOW_TRACKING_URI=http://localhost:5050 + +# Prefect API URL +PREFECT_API_URL=http://localhost:4200/api + +# Triton server URL +TRITON_HTTP_URL=localhost:8000 +TRITON_GRPC_URL=localhost:8001 + +# Pipeline data paths (adjust for your environment) +DATA_ROOT=./data/TabFormer +MODEL_OUTPUT_DIR=./data/trained_models +TRITON_MODEL_REPO=./data/models + +# Docker-compose volume paths +MLFLOW_DATA_DIR=./mlflow-data +PREFECT_DATA_DIR=./prefect-data +TRITON_IMAGE=triton-fraud:latest diff --git a/source/examples/fraud-detection-mlops-pipeline/docker-compose.yml b/source/examples/fraud-detection-mlops-pipeline/docker-compose.yml new file mode 100644 index 00000000..ee19bffa --- /dev/null +++ b/source/examples/fraud-detection-mlops-pipeline/docker-compose.yml @@ -0,0 +1,49 @@ +services: + mlflow: + image: ghcr.io/mlflow/mlflow:v2.19.0 + ports: + - "5050:5050" + volumes: + - ${MLFLOW_DATA_DIR:-./mlflow-data}:/mlflow + command: > + mlflow server + --host 0.0.0.0 + --port 5050 + --backend-store-uri sqlite:///mlflow/mlflow.db + --default-artifact-root mlflow-artifacts:/ + --serve-artifacts + --artifacts-destination /mlflow/artifacts + + prefect: + image: prefecthq/prefect:3-latest + ports: + - "4200:4200" + volumes: + - ${PREFECT_DATA_DIR:-./prefect-data}:/root/.prefect + command: prefect server start --host 0.0.0.0 + + triton: + image: ${TRITON_IMAGE:-triton-fraud:latest} + ports: + - "8000:8000" + - "8001:8001" + - "8002:8002" + volumes: + - ${TRITON_MODEL_REPO}:/models + deploy: + resources: + reservations: + devices: + - driver: nvidia + count: 1 + capabilities: [gpu] + command: > + tritonserver + --model-repository=/models + --model-control-mode=explicit + --exit-timeout-secs=6000 + --http-port=8000 + --grpc-port=8001 + --metrics-port=8002 + profiles: + - gpu diff --git a/source/examples/fraud-detection-mlops-pipeline/environment.yml b/source/examples/fraud-detection-mlops-pipeline/environment.yml new file mode 100644 index 00000000..b4c2b645 --- /dev/null +++ b/source/examples/fraud-detection-mlops-pipeline/environment.yml @@ -0,0 +1,17 @@ +name: fraud-mlops +channels: + - rapidsai + - conda-forge + - nvidia +dependencies: + - python=3.13 + - cuda-version>=13.0,<=13.1 + - rapids=26.04 + - prefect + - mlflow + - tritonclient + - python-dotenv + - category_encoders + - scipy + - networkx + - matplotlib diff --git a/source/examples/fraud-detection-mlops-pipeline/notebook.ipynb 
b/source/examples/fraud-detection-mlops-pipeline/notebook.ipynb new file mode 100644 index 00000000..66bcde74 --- /dev/null +++ b/source/examples/fraud-detection-mlops-pipeline/notebook.ipynb @@ -0,0 +1,601 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "id": "356e542f", + "metadata": { + "tags": [ + "library/cudf", + "library/xgboost", + "library/prefect", + "library/mlflow", + "library/triton", + "data-format/csv", + "workflow/mlops" + ] + }, + "source": [ + "# Orchestrating a Fraud Detection Model Lifecycle with cuDF, Prefect, MLflow, and Triton\n", + "\n", + "_April, 2026_\n", + "\n", + "A production MLOps pipeline for the [NVIDIA Financial Fraud Detection AI Blueprint](https://github.com/NVIDIA-AI-Blueprints/financial-fraud-detection), built with [Prefect](https://www.prefect.io/), [MLflow](https://mlflow.org/), and [Triton Inference Server](https://github.com/triton-inference-server/server).\n", + "\n", + "The blueprint provides a [notebook](https://github.com/NVIDIA-AI-Blueprints/financial-fraud-detection/blob/main/notebooks/financial-fraud-usage-v2.ipynb) that manually walks through data preprocessing, GNN + XGBoost model training, Triton serving, and inference evaluation. This example takes that workflow and wraps each stage in automated orchestration with experiment tracking and continuous deployment for a production-ready model lifecycle implementation." + ] + }, + { + "cell_type": "markdown", + "id": "518f17ff", + "metadata": {}, + "source": [ + "## What the Blueprint Does\n", + "\n", + "The blueprint notebook can be divided into 4 steps:\n", + "\n", + "1. **Load and preprocess** 24M credit card transactions from the [IBM TabFormer](https://github.com/IBM/TabFormer) dataset using cuDF, building a bipartite User \u2194 Merchant graph where edges represent transactions\n", + "2. **Train** a 2-hop SAGEConv GNN that produces node embeddings, then an XGBoost classifier on those embeddings for edge classification (To determine fraudulent transactions)\n", + "3. **Serve** the trained model on Triton Inference Server with a custom Python backend\n", + "4. **Evaluate** by sending held-out test data to Triton and computing F1, precision, recall, and accuracy\n", + "\n", + "In the blueprint notebook, each step is a notebook cell that you run manually and inspect the output. 
However, in a production scenario, there is a need for orchestration, experiment tracking, model versioning, and automated deployment.\n", + "\n", + "The aim of this workflow example is to make this flow production-ready by adding the following components:\n", + "\n", + "- **Orchestration**: [Prefect](https://docs.prefect.io) flows chain the stages together, track runs, and handle failures\n", + "- **Experiment tracking**: [MLflow](https://mlflow.org/docs/latest) logs hyperparameters, model artifacts, and evaluation metrics for every run\n", + "- **Continuous deployment**: champion/challenger evaluation via [Triton's](https://docs.nvidia.com/deeplearning/triton-inference-server/user-guide/docs/index.html) native model versioning, with automatic promotion or rollback" + ] + }, + { + "cell_type": "markdown", + "id": "4fc21661", + "metadata": {}, + "source": [ + "## Architecture\n", + "\n", + "Similar to the structure of the notebook, the production pipeline has four stages, each implemented as a Prefect [flow](https://docs.prefect.io/3.0/develop/write-flows) composed of reusable [tasks](https://docs.prefect.io/3.0/develop/write-tasks):\n", + "\n", + "```text\n", + "full_pipeline_flow\n", + "\u251c\u2500\u2500 preprocess_flow \u2192 cuDF graph formation + MLflow metadata logging\n", + "\u251c\u2500\u2500 train_flow \u2192 NGC training container + MLflow experiment logging\n", + "\u251c\u2500\u2500 evaluate_flow \u2192 Triton champion/challenger scoring + MLflow metrics\n", + "\u2514\u2500\u2500 deploy_flow \u2192 Promote or rollback + MLflow model registry\n", + "```\n", + "\n", + "The full pipeline runs all four as subflows in sequence, passing results between them. Each stage can also run independently for ad-hoc experiments.\n", + "\n", + "### Infrastructure\n", + "\n", + "Three services support the pipeline, all running as Docker containers via `docker-compose.yml`:\n", + "\n", + "| Service | Role | Port |\n", + "|---------|------|------|\n", + "| **Prefect** | Flow orchestration, run tracking, scheduling, UI | 4200 |\n", + "| **MLflow** | Experiment tracking, artifact storage, model registry | 5050 |\n", + "| **Triton** | GPU inference, model versioning, model control API | 8000/8001/8002 |\n", + "\n", + "### Champion/Challenger via Triton Native Versioning\n", + "\n", + "The evaluate stage uses Triton's built-in model versioning for zero-downtime model comparison. Triton's model repository supports multiple version directories (`1/`, `2/`, `3/`...), and with `version_policy: { all: {} }` in `config.pbtxt`, all versions are served simultaneously." 
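A minimal `config.pbtxt` for this setup might look like the sketch below. Only the versioning-related fields are shown; the blueprint's actual config also declares the Python backend model's input and output tensors, which are omitted here:

```text
name: "prediction_and_shapley"
backend: "python"
version_policy: { all: {} }
# input/output tensor definitions omitted for brevity
```

With this policy in place, a newly staged version directory becomes servable as soon as the model is reloaded, while the current champion keeps serving traffic.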
+ ] + }, + { + "cell_type": "markdown", + "id": "2930520b", + "metadata": {}, + "source": [ + "## Component Structure\n", + "\n", + "```text\n", + "fraud-detection-mlops/\n", + "\u251c\u2500\u2500 pipeline/\n", + "\u2502 \u251c\u2500\u2500 config.py Paths, service URLs, hyperparameters, thresholds\n", + "\u2502 \u251c\u2500\u2500 deployments.py Register flows with work pools and schedules\n", + "\u2502 \u251c\u2500\u2500 flows/\n", + "\u2502 \u2502 \u251c\u2500\u2500 preprocess.py Stage 1: cuDF graph formation + MLflow\n", + "\u2502 \u2502 \u251c\u2500\u2500 train.py Stage 2: NGC container + MLflow logging\n", + "\u2502 \u2502 \u251c\u2500\u2500 evaluate.py Stage 3: Triton versioning + champion/challenger scoring\n", + "\u2502 \u2502 \u251c\u2500\u2500 deploy.py Stage 4: promote or rollback\n", + "\u2502 \u2502 \u2514\u2500\u2500 full_pipeline.py Orchestrator: chains all stages as subflows\n", + "\u2502 \u2514\u2500\u2500 tasks/\n", + "\u2502 \u251c\u2500\u2500 data.py Test data loading for evaluation\n", + "\u2502 \u251c\u2500\u2500 training.py Config generation + Docker container lifecycle\n", + "\u2502 \u251c\u2500\u2500 mlflow_utils.py Experiment tracking + model registry\n", + "\u2502 \u2514\u2500\u2500 triton.py Version staging, model reload, inference scoring\n", + "\u251c\u2500\u2500 scripts/\n", + "\u2502 \u251c\u2500\u2500 preprocess_tabformer.py cuDF preprocessing (adapted from blueprint)\n", + "\u2502 \u2514\u2500\u2500 download_data.sh TabFormer dataset download helper\n", + "\u251c\u2500\u2500 triton/\n", + "\u2502 \u2514\u2500\u2500 Dockerfile Custom Triton image with PyTorch, XGBoost GPU, PyG\n", + "\u251c\u2500\u2500 docker-compose.yml MLflow + Prefect + Triton services\n", + "\u251c\u2500\u2500 environment.yml Conda environment (RAPIDS + pipeline deps)\n", + "\u2514\u2500\u2500 .env.example Configuration template\n", + "```\n", + "\n", + "The `tasks/` layer contains reusable units of work (data loading, container execution, MLflow logging, Triton management). The `flows/` layer composes tasks into pipeline stages. This separation is important for scaling: tasks can be distributed across workers and [work pools](https://docs.prefect.io/3.0/deploy/infrastructure-concepts/work-pools), while flows define the orchestration logic." + ] + }, + { + "cell_type": "markdown", + "id": "6fd03b8b", + "metadata": {}, + "source": [ + "## Prerequisites\n", + "\n", + "- **NVIDIA GPU**: Turing architecture or newer (T4, A10G, L4, A100, H100). 
Compute capability >= 7.5 is required by the training container.\n", + "- **Docker** with [NVIDIA Container Toolkit](https://docs.nvidia.com/datacenter/cloud-native/container-toolkit/latest/install-guide.html) for GPU access\n", + "- **NGC API key** from [ngc.nvidia.com/setup/api-key](https://ngc.nvidia.com/setup/api-key) (needed to pull the training container)\n", + "- **Conda**: [miniforge](https://github.com/conda-forge/miniforge) is recommended\n" + ] + }, + { + "cell_type": "markdown", + "id": "8048a7c1", + "metadata": {}, + "source": [ + "## Setup\n", + "\n", + "Download the companion files from the sidebar or clone the repository, then configure your environment.\n", + "\n", + "### Configure\n", + "\n", + "Copy the environment template and set your NGC API key and data paths:\n", + "\n", + "```console\n", + "$ cp .env.example .env\n", + "```\n", + "\n", + "Edit `.env` to set your `NGC_API_KEY` and adjust data paths for your environment.\n", + "\n", + "### Install Python dependencies\n", + "\n", + "All dependencies (RAPIDS, Prefect, MLflow, Triton client) are specified in `environment.yml`:\n", + "\n", + "```console\n", + "$ conda env create -f environment.yml\n", + "$ conda activate fraud-mlops\n", + "```\n", + "\n", + "### Download the TabFormer dataset\n", + "\n", + "The dataset is 266MB of synthetic credit card transactions (24M rows, 15 columns) from [IBM TabFormer](https://github.com/IBM/TabFormer):\n", + "\n", + "```console\n", + "$ ./scripts/download_data.sh ./data/TabFormer\n", + "```\n", + "\n", + "This prompts you to download `transactions.tgz` from [IBM Box](https://ibm.ent.box.com/v/tabformer-data/folder/130747715605) and extract it.\n", + "\n", + "### Start the infrastructure\n", + "\n", + "```console\n", + "# MLflow + Prefect (no GPU needed)\n", + "$ docker compose up -d\n", + "\n", + "# With Triton (needs GPU and a pre-built image, see below)\n", + "$ docker compose --profile gpu up -d\n", + "```\n", + "\n", + "Verify the services are running:\n", + "- Prefect UI: [http://localhost:4200](http://localhost:4200)\n", + "- MLflow UI: [http://localhost:5050](http://localhost:5050)\n", + "\n", + "### Pull the training container\n", + "\n", + "```console\n", + "$ echo \"$NGC_API_KEY\" | docker login nvcr.io --username '$oauthtoken' --password-stdin\n", + "$ docker pull nvcr.io/nvidia/cugraph/financial-fraud-training:2.0.0\n", + "```\n", + "\n", + "### Build the Triton serving image\n", + "\n", + "The custom Triton image (`triton/Dockerfile`) installs PyTorch 2.7, XGBoost 3.0 (GPU), PyTorch Geometric 2.6, and Captum on top of the Triton 25.04 base image:\n", + "\n", + "```console\n", + "$ docker build -t triton-fraud:latest triton/\n", + "```\n", + "\n", + "This build takes 15-20 minutes and produces a ~24GB image." + ] + }, + { + "cell_type": "markdown", + "id": "6e78c5a1", + "metadata": {}, + "source": [ + "## Stage 1: Preprocessing\n", + "\n", + "**Flow:** `pipeline/flows/preprocess.py` | **Script:** `scripts/preprocess_tabformer.py`\n", + "\n", + "The preprocessing stage takes the raw TabFormer CSV and produces graph-structured data for the GNN training container.\n", + "\n", + "The script (`scripts/preprocess_tabformer.py`, adapted from the blueprint's `preprocess_TabFormer_lp.py`) uses **cuDF** for the following GPU-accelerated DataFrame operations on the 24M-row dataset:\n", + "\n", + "1. **Clean column names**: strip spaces, standardize field names\n", + "2. 
**Encode categorical features**: one-hot encoding for low-cardinality fields (< 8 categories), binary encoding for high-cardinality fields\n", + "3. **Build the bipartite graph**: Users and Merchants become nodes, each transaction becomes an edge with encoded attributes and a fraud/not-fraud label\n", + "4. **Undersample**: balance the dataset to a configurable fraud ratio (default 10%)\n", + "5. **Split by year**: pre-2018 for training, 2018 for validation, post-2018 for testing\n", + "6. **Write CSVs**: node features, edge indices, edge attributes, edge labels, and feature masks to `gnn/` and `gnn/test_gnn/`\n", + "\n", + "The Prefect flow wraps this script and logs preprocessing metadata (row counts, graph dimensions) to MLflow:\n", + "\n", + "```python\n", + "# pipeline/flows/preprocess.py\n", + "@flow(name=\"preprocess\", log_prints=True)\n", + "def preprocess_flow(\n", + " raw_csv_path: str = RAW_CSV_PATH,\n", + " output_base_path: str = DATA_ROOT,\n", + " fraud_ratio: float = 0.1,\n", + " under_sample: bool = True,\n", + ") -> dict:\n", + " metadata, user_mask_map, mx_mask_map, tx_mask_map = preprocess_data(...)\n", + " # Log to MLflow\n", + " experiment_id = get_or_create_experiment()\n", + " with mlflow.start_run(experiment_id=experiment_id, run_name=\"preprocess\"):\n", + " mlflow.log_params({\"preprocess.fraud_ratio\": fraud_ratio, ...})\n", + " mlflow.log_metrics({\"preprocess.num_transactions\": float(metadata[\"num_transactions\"]), ...})\n", + "```" + ] + }, + { + "cell_type": "markdown", + "id": "68b48cfd", + "metadata": {}, + "source": [ + "## Stage 2: Training\n", + "\n", + "**Flow:** `pipeline/flows/train.py` | **Tasks:** `pipeline/tasks/training.py`, `pipeline/tasks/mlflow_utils.py`\n", + "\n", + "Training uses NVIDIA's `financial-fraud-training:2.0.0` NGC container for training. The pipeline generates the config in the format the container expects, runs the container, and collects the artifacts.\n", + "\n", + "### Config generation\n", + "\n", + "The `generate_training_config` task builds a JSON config from the flow's hyperparameters:\n", + "\n", + "```python\n", + "# Default hyperparameters from pipeline/config.py\n", + "DEFAULT_GNN_PARAMS = {\n", + " \"hidden_channels\": 32, \"n_hops\": 2, \"layer\": \"SAGEConv\",\n", + " \"dropout_prob\": 0.1, \"batch_size\": 4096, \"fan_out\": 10, \"num_epochs\": 8,\n", + "}\n", + "DEFAULT_XGB_PARAMS = {\n", + " \"max_depth\": 6, \"learning_rate\": 0.2, \"num_parallel_tree\": 3,\n", + " \"num_boost_round\": 512, \"gamma\": 0.0,\n", + "}\n", + "```\n", + "\n", + "### Container execution\n", + "\n", + "The `run_training_container` task runs the NGC container with volume mounts for graph data, output directory, and config:\n", + "\n", + "```python\n", + "# pipeline/tasks/training.py\n", + "cmd = [\n", + " \"docker\", \"run\", \"--rm\", \"--gpus\", \"device=0\",\n", + " \"--cap-add\", \"SYS_NICE\", \"--shm-size=8g\", \"--privileged\",\n", + " \"-e\", \"NCCL_IB_DISABLE=1\", \"-e\", \"NCCL_NET=Socket\", # Cloud NCCL workarounds\n", + " \"-v\", f\"{data_dir}:/data\",\n", + " \"-v\", f\"{output_dir}:/trained_models\",\n", + " \"-v\", f\"{config_path}:/app/config.json\",\n", + " \"--entrypoint\", \"bash\", training_image,\n", + " \"-c\", \"torchrun --standalone --nproc_per_node=1 /app/main.py --config /app/config.json\",\n", + "]\n", + "```\n", + "\n", + "The NCCL environment variables (`NCCL_IB_DISABLE`, `NCCL_NET=Socket`, etc.) are a workaround for instances without InfiniBand hardware. 
The container is bundled with UCX that is designed for faster HPC networking, which is not usually available on standard single GPU instances.\n", + "\n", + "```{note}\n", + "If running on an InfiniBand-enabled system (DGX, HPC cluster), remove the NCCL environment variables from `pipeline/tasks/training.py`. These overrides disable InfiniBand, NVLink, and shared memory transports. On hardware that supports them, NCCL auto-detects and uses the optimal transport without any configuration.\n", + "```\n", + "\n", + "### MLflow logging\n", + "\n", + "After training completes, the flow logs everything to MLflow:\n", + "\n", + "- **Hyperparameters**: `gnn.hidden_channels`, `gnn.n_hops`, `xgb.max_depth`, `xgb.num_boost_round`, etc.\n", + "- **Artifacts**: the training config JSON and the full model directory\n", + "- **Tags**: `trigger: automated` (or custom tags for ad-hoc experiments)\n", + "\n", + "The flow returns the MLflow run ID, which the evaluate stage uses to associate metrics with the same run." + ] + }, + { + "cell_type": "markdown", + "id": "a75c60f7", + "metadata": {}, + "source": [ + "## Stage 3: Evaluation\n", + "\n", + "**Flow:** `pipeline/flows/evaluate.py` | **Tasks:** `pipeline/tasks/triton.py`, `pipeline/tasks/data.py`\n", + "\n", + "The evaluation stage compares the newly trained model (challenger) against the currently deployed model (champion) using Triton's native versioning.\n", + "\n", + "### Staging the challenger\n", + "\n", + "The `stage_challenger_version` task copies the new model's artifacts as the next version directory in Triton's model repository. If the champion is version 1, the challenger becomes version 2:\n", + "\n", + "```text\n", + "prediction_and_shapley/\n", + "\u251c\u2500\u2500 1/ \u2190 champion (currently serving)\n", + "\u2502 \u251c\u2500\u2500 model.py\n", + "\u2502 \u251c\u2500\u2500 state_dict_gnn_model.pth\n", + "\u2502 \u2514\u2500\u2500 embedding_based_xgboost.json\n", + "\u251c\u2500\u2500 2/ \u2190 challenger (just staged)\n", + "\u2502 \u251c\u2500\u2500 model.py\n", + "\u2502 \u251c\u2500\u2500 state_dict_gnn_model.pth\n", + "\u2502 \u2514\u2500\u2500 embedding_based_xgboost.json\n", + "\u2514\u2500\u2500 config.pbtxt (version_policy { all {} })\n", + "```\n", + "\n", + "The task also ensures `config.pbtxt` has `version_policy { all {} }` so Triton serves both versions simultaneously. Without this, Triton defaults to `latest: { num_versions: 1 }` and would drop the champion when the challenger is loaded.\n", + "\n", + "### Scoring\n", + "\n", + "After reloading the model in Triton, the flow loads the held-out test data (25,803 samples, ~8% fraud) and sends it to both versions using Triton's version-specific inference:\n", + "\n", + "```python\n", + "# pipeline/tasks/triton.py\n", + "response = client.infer(\n", + " model_name,\n", + " inputs=inputs,\n", + " model_version=str(version), # \"1\" for champion, \"2\" for challenger\n", + " outputs=outputs,\n", + ")\n", + "```\n", + "\n", + "Each version returns fraud probability scores. The task applies a decision threshold (default 0.5) and computes F1, precision, recall, and accuracy against the ground truth labels.\n", + "\n", + "### Promotion decision\n", + "\n", + "The flow compares the challenger's F1 score against the champion's. 
Promotion requires the challenger to exceed the champion by at least `min_improvement` (default 0.0):\n", + "\n", + "```python\n", + "should_promote = challenger_f1 > champion_f1 + min_improvement\n", + "```\n", + "\n", + "Both sets of metrics, the deltas, and the promotion decision are logged to the same MLflow run that was created during training." + ] + }, + { + "cell_type": "markdown", + "id": "c75168ae", + "metadata": {}, + "source": [ + "## Stage 4: Deployment\n", + "\n", + "**Flow:** `pipeline/flows/deploy.py`\n", + "\n", + "The deploy stage acts on the evaluation result.\n", + "\n", + "### If the challenger wins (promote)\n", + "\n", + "1. Remove the old champion's version directory from the model repository\n", + "2. Reload the model in Triton (only the challenger version remains)\n", + "3. Run a health check to verify the new champion is responsive\n", + "4. Register the model version in MLflow's model registry and assign the `champion` alias\n", + "\n", + "If the health check fails, the flow raises an error. This surfaces in Prefect as a failed run that you can investigate.\n", + "\n", + "### If the champion wins (reject)\n", + "\n", + "1. Remove the challenger's version directory\n", + "2. Reload Triton to restore the champion-only state\n", + "\n", + "In both cases, Triton ends up serving exactly one version: the winner. The MLflow model registry provides an authoritative record of which model version is in production via the `champion` alias:\n", + "\n", + "```python\n", + "# pipeline/tasks/mlflow_utils.py\n", + "mv = mlflow.register_model(f\"runs:/{run_id}/model\", model_name)\n", + "client.set_registered_model_alias(model_name, \"champion\", mv.version)\n", + "```" + ] + }, + { + "cell_type": "markdown", + "id": "d02b3bc0", + "metadata": {}, + "source": [ + "## Running the Pipeline\n", + "\n", + "### Full pipeline\n", + "\n", + "Run all four stages end-to-end:\n", + "\n", + "```console\n", + "$ python -m pipeline.flows.full_pipeline\n", + "```\n", + "\n", + "This chains preprocess \u2192 train \u2192 evaluate \u2192 deploy as subflows, passing results between stages automatically. If graph data already exists from a previous preprocessing run, set `skip_preprocess=True` in the flow parameters.\n", + "\n", + "### Individual stages\n", + "\n", + "Each stage can be run independently for debugging or ad-hoc experiments:\n", + "\n", + "```console\n", + "# Preprocess raw CSV into graph data\n", + "$ python -m pipeline.flows.preprocess\n", + "\n", + "# Train with default hyperparameters\n", + "$ python -m pipeline.flows.train\n", + "\n", + "# Evaluate a specific training run\n", + "$ python -m pipeline.flows.evaluate \n", + "\n", + "# Deploy based on evaluation result\n", + "$ python -m pipeline.flows.deploy '{\"should_promote\": true, ...}'\n", + "```\n", + "\n", + "### Experimenting with hyperparameters\n", + "\n", + "To try different model configurations, pass custom hyperparameters to the train flow:\n", + "\n", + "```python\n", + "from pipeline.flows.train import train_flow\n", + "\n", + "run_id = train_flow(\n", + " gnn_params={\"hidden_channels\": 64, \"n_hops\": 3, \"num_epochs\": 12, ...},\n", + " xgb_params={\"max_depth\": 8, \"num_boost_round\": 1024, ...},\n", + ")\n", + "```\n", + "\n", + "Each run is tracked in MLflow with full hyperparameters and metrics, making it straightforward to compare experiments. The evaluate stage will then score this new model against the current champion." 
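Putting the two steps together for an ad-hoc experiment, the run ID returned by the train flow feeds directly into the evaluate flow. A short sketch using the flow signatures above (the specific override values are illustrative):

```python
from pipeline.config import DEFAULT_GNN_PARAMS, DEFAULT_XGB_PARAMS
from pipeline.flows.evaluate import evaluate_flow
from pipeline.flows.train import train_flow

# Start from the pipeline defaults and override a few hyperparameters
gnn_params = {**DEFAULT_GNN_PARAMS, "hidden_channels": 64, "num_epochs": 12}
xgb_params = {**DEFAULT_XGB_PARAMS, "max_depth": 8}

# Train the challenger; returns the MLflow run ID for this experiment
run_id = train_flow(gnn_params=gnn_params, xgb_params=xgb_params)

# Score the challenger against the current champion on the held-out test set
eval_result = evaluate_flow(challenger_run_id=run_id)
print(eval_result["should_promote"], eval_result["reason"])
```

Because the flows accept complete parameter dictionaries rather than partial overrides, merging with the defaults keeps every field the training container expects.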
+ ] + }, + { + "cell_type": "markdown", + "id": "881b10b3", + "metadata": {}, + "source": [ + "## Monitoring and Logging\n", + "\n", + "### Prefect UI\n", + "\n", + "The [Prefect UI](http://localhost:4200) at port 4200 tracks every flow run. Here is a completed full pipeline run showing the three subflows (train, evaluate, deploy) with their execution timeline:\n", + "\n", + "![Prefect flow run showing subflow hierarchy](../../images/prefect-flow-runs.png)\n", + "\n", + "The UI also shows task-level logs from each step, which is useful for debugging container failures or Triton connection issues.\n", + "\n", + "### MLflow UI\n", + "\n", + "The [MLflow UI](http://localhost:5050) at port 5050 tracks experiments across runs. The experiment table shows all training runs with their hyperparameters and evaluation metrics side by side:\n", + "\n", + "![MLflow experiment table with training runs](../../images/mlflow-experiment-table.png)\n", + "\n", + "Clicking into a run shows the full details: all GNN and XGBoost hyperparameters, challenger and champion metrics, deltas, and the promotion decision:\n", + "\n", + "![MLflow run details with parameters and metrics](../../images/mlflow-run-details.png)\n", + "\n", + "The model registry tracks which version is in production. The `champion` alias always points to the current production model:\n", + "\n", + "![MLflow model registry with champion alias](../../images/mlflow-model-registry.png)\n", + "\n", + "In a typical production cycle, start with a full pipeline run to establish a baseline champion. From there, the cycle is: tweak hyperparameters, run `train` + `evaluate`, and check the MLflow experiment table to compare the challenger against the current champion. Promoted models are immediately live in Triton without any need for a manual deployment." + ] + }, + { + "cell_type": "markdown", + "id": "10b99a5d", + "metadata": {}, + "source": [ + "## Scaling to Multiple Machines\n", + "\n", + "The default mode runs all stages as subflows in a single process on one GPU machine. For multi-node architecture, `pipeline/deployments.py` registers each flow as a Prefect [deployment](https://docs.prefect.io/3.0/deploy/infrastructure-concepts/deployments) with its own work pool and schedule.\n", + "\n", + "### How it works\n", + "\n", + "The evaluate and deploy stages need both filesystem access to Triton's model repository (for staging model versions) and HTTP access to the Triton API (for reloading models and running inference). Due to this dependency, all pipeline stages must run on the same machine as the Triton server.\n", + "\n", + "The recommended multi-node setup splits responsibilities across two machines:\n", + "- **Local machine** \u2014 runs Prefect and MLflow servers, providing direct browser access to both UIs without port forwarding. Used only for monitoring and triggering runs.\n", + "- **GPU instance** \u2014 executes all pipeline flows and serves models via Triton.\n", + "\n", + "### Setting up deployments\n", + "\n", + "**1. Start the services on the local machine:**\n", + "\n", + "```console\n", + "$ docker compose up -d\n", + "```\n", + "\n", + "This starts Prefect (port 4200) and MLflow (port 5050). Both UIs are available at `localhost` in your browser.\n", + "\n", + "**2. 
Point the GPU instance at the local machine** by setting the API URLs:\n", + "\n", + "```console\n", + "# On the GPU instance (replace with the local machine IP)\n", + "$ export PREFECT_API_URL=http://LOCAL_MACHINE_IP:4200/api\n", + "$ export MLFLOW_TRACKING_URI=http://LOCAL_MACHINE_IP:5050\n", + "```\n", + "\n", + "```{note}\n", + "The GPU instance must have network access to the local machine on ports 4200 (Prefect) and 5050 (MLflow). Ensure these ports are open in your firewall or security group rules.\n", + "```\n", + "\n", + "**3. Create the work pool** (run once, from the local machine):\n", + "\n", + "```console\n", + "$ prefect work-pool create gpu --type process\n", + "```\n", + "\n", + "**4. Register and serve all deployments:**\n", + "\n", + "```console\n", + "$ python -m pipeline.deployments\n", + "```\n", + "\n", + "This registers 5 deployments. Simulating a real world scenario, preprocessing runs at midnight to rebuild graph data from the latest raw CSV, and the full pipeline runs at 2am (after preprocessing completes) to retrain, evaluate, and potentially deploy a new champion. The individual stages are on-demand for ad-hoc experiments:\n", + "\n", + "- `preprocess` (gpu pool, nightly at midnight)\n", + "- `train` (gpu pool, on-demand)\n", + "- `evaluate` (gpu pool, on-demand)\n", + "- `deploy` (gpu pool, on-demand)\n", + "- `full-pipeline` (gpu pool, nightly at 2am)\n", + "\n", + "**5. Start a worker on the GPU instance:**\n", + "\n", + "```console\n", + "$ prefect worker start --pool gpu\n", + "```\n", + "\n", + "You can trigger runs from the Prefect UI on the local machine or via the CLI:\n", + "\n", + "```console\n", + "$ prefect deployment run full-pipeline/full-pipeline\n", + "```\n", + "\n", + "The components of the pipeline do not change between single-machine and multi-machine modes. The difference is that flow runs are tracked and triggered via the Prefect server on the local machine, while all execution happens on the GPU instance.\n", + "\n", + "### Data ingestion in production\n", + "\n", + "This example uses a static CSV (`card_transaction.v1.csv`), but in a production scenario, new data arrives continuously. The preprocessing flow accepts a `raw_csv_path` parameter, which serves as the integration point with your data infrastructure. Common patterns on how newer data is fetched include:\n", + "\n", + "- **Scheduled ETL jobs.** An upstream pipeline (Spark, Airflow, dbt) lands a fresh CSV or Parquet file at a known path on a schedule. The preprocessing cron picks it up at midnight.\n", + "- **Cloud storage pulls.** Transaction data accumulates in S3 or GCS. A lightweight script or Prefect task pulls the latest file to the GPU instance before preprocessing runs.\n", + "- **Data warehouse queries.** A scheduled query extracts new records since the last run and writes them to the expected path.\n", + "\n", + "To integrate any of these, add your data fetching logic as a Prefect task at the start of the preprocessing flow. It runs as part of the same nightly cron schedule, so fresh data is pulled and processed in a single run without any additional orchestration." 
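As a concrete sketch of that integration point, a fetch task might look like the following. The bucket, object key, and the boto3 dependency are assumptions for illustration; the only contract is that the task returns a local path the flow can use as `raw_csv_path`:

```python
from prefect import task


@task(name="fetch-latest-transactions", retries=2)
def fetch_latest_transactions(bucket: str, key: str, local_path: str) -> str:
    """Pull the newest raw transactions file from object storage (illustrative example)."""
    import boto3  # hypothetical dependency; any storage or warehouse client works here

    s3 = boto3.client("s3")
    s3.download_file(bucket, key, local_path)  # bucket and key are placeholders
    return local_path
```

Calling this task first inside `preprocess_flow` and passing its return value on as `raw_csv_path` keeps the data pull on the same nightly schedule as the rest of the preprocessing run.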
+ ] + }, + { + "cell_type": "markdown", + "id": "cffc18f0", + "metadata": {}, + "source": [ + "## Conclusion\n", + "\n", + "This example demonstrates how to take a ML experiment and build production infrastructure around it:\n", + "\n", + "- **Prefect** orchestrates the pipeline stages, tracks runs, and enables scheduling\n", + "- **MLflow** tracks every experiment (hyperparameters, artifacts, metrics) and manages the model registry\n", + "- **Triton** serves models with native versioning, enabling champion/challenger evaluation without separate infrastructure\n", + "\n", + "The same patterns (flow orchestration, experiment tracking, versioned deployment) broadly apply to any ML production scenario where you need to automate training, compare model versions, and deploy the winner.\n", + "\n", + "### Resources\n", + "\n", + "- [NVIDIA Financial Fraud Detection AI Blueprint](https://github.com/NVIDIA-AI-Blueprints/financial-fraud-detection)\n", + "- [IBM TabFormer dataset](https://github.com/IBM/TabFormer)\n", + "- [Prefect 3.x documentation](https://docs.prefect.io)\n", + "- [MLflow documentation](https://mlflow.org/docs/latest)\n", + "- [Triton Inference Server](https://github.com/triton-inference-server/server)" + ] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python 3 (ipykernel)", + "language": "python", + "name": "python3" + }, + "language_info": { + "name": "python", + "version": "3.13.0" + } + }, + "nbformat": 4, + "nbformat_minor": 5 +} diff --git a/source/examples/fraud-detection-mlops-pipeline/pipeline/__init__.py b/source/examples/fraud-detection-mlops-pipeline/pipeline/__init__.py new file mode 100644 index 00000000..5cbd5422 --- /dev/null +++ b/source/examples/fraud-detection-mlops-pipeline/pipeline/__init__.py @@ -0,0 +1 @@ +"""Fraud Detection MLOps Pipeline.""" diff --git a/source/examples/fraud-detection-mlops-pipeline/pipeline/config.py b/source/examples/fraud-detection-mlops-pipeline/pipeline/config.py new file mode 100644 index 00000000..fc7e3b8d --- /dev/null +++ b/source/examples/fraud-detection-mlops-pipeline/pipeline/config.py @@ -0,0 +1,60 @@ +"""Centralized configuration for the fraud detection MLOps pipeline.""" + +import os + +from dotenv import load_dotenv + +load_dotenv() + +# --- Paths --- + +DATA_ROOT = os.getenv("DATA_ROOT", "/data/TabFormer") +RAW_CSV_PATH = os.path.join(DATA_ROOT, "raw", "card_transaction.v1.csv") +GNN_DATA_DIR = os.path.join(DATA_ROOT, "gnn") +TEST_DATA_DIR = os.path.join(DATA_ROOT, "gnn", "test_gnn") +MODEL_OUTPUT_DIR = os.getenv("MODEL_OUTPUT_DIR", "/data/trained_models") +TRITON_MODEL_REPO = os.getenv("TRITON_MODEL_REPO", "/models") + +# --- Infrastructure --- + +MLFLOW_TRACKING_URI = os.getenv("MLFLOW_TRACKING_URI", "http://localhost:5050") +MLFLOW_EXPERIMENT_NAME = "fraud-detection-training" +MLFLOW_PREPROCESS_EXPERIMENT_NAME = "fraud-detection-preprocess" +PREFECT_API_URL = os.getenv("PREFECT_API_URL", "http://localhost:4200/api") +TRITON_HTTP_URL = os.getenv("TRITON_HTTP_URL", "localhost:8000") +TRITON_GRPC_URL = os.getenv("TRITON_GRPC_URL", "localhost:8001") + +# --- Training --- + +NGC_API_KEY = os.getenv("NGC_API_KEY", "") +TRAINING_IMAGE = "nvcr.io/nvidia/cugraph/financial-fraud-training:2.0.0" +TRITON_MODEL_NAME = "prediction_and_shapley" + +DEFAULT_GNN_PARAMS = { + "hidden_channels": 32, + "n_hops": 2, + "layer": "SAGEConv", + "dropout_prob": 0.1, + "batch_size": 4096, + "fan_out": 10, + "num_epochs": 8, +} + +DEFAULT_XGB_PARAMS = { + "max_depth": 6, + "learning_rate": 0.2, + "num_parallel_tree": 3, + 
"num_boost_round": 512, + "gamma": 0.0, +} + +# --- Preprocessing --- + +DEFAULT_FRAUD_RATIO = 0.1 +DEFAULT_UNDER_SAMPLE = True + +# --- Evaluation --- + +PROMOTION_METRIC = "f1_score" +MIN_IMPROVEMENT = 0.0 +DECISION_THRESHOLD = 0.5 diff --git a/source/examples/fraud-detection-mlops-pipeline/pipeline/deployments.py b/source/examples/fraud-detection-mlops-pipeline/pipeline/deployments.py new file mode 100644 index 00000000..cb33b747 --- /dev/null +++ b/source/examples/fraud-detection-mlops-pipeline/pipeline/deployments.py @@ -0,0 +1,47 @@ +"""Register all flows with Prefect work pools and schedules.""" + +from prefect import serve + +from pipeline.flows.deploy import deploy_flow +from pipeline.flows.evaluate import evaluate_flow +from pipeline.flows.full_pipeline import full_pipeline_flow +from pipeline.flows.preprocess import preprocess_flow +from pipeline.flows.train import train_flow + + +def main(): + """Register and serve all deployments.""" + preprocess_deploy = preprocess_flow.to_deployment( + name="preprocess", + work_pool_name="gpu", + cron="0 0 * * *", + ) + train_deploy = train_flow.to_deployment( + name="train", + work_pool_name="gpu", + ) + evaluate_deploy = evaluate_flow.to_deployment( + name="evaluate", + work_pool_name="gpu", + ) + deploy_deploy = deploy_flow.to_deployment( + name="deploy", + work_pool_name="gpu", + ) + full_pipeline_deploy = full_pipeline_flow.to_deployment( + name="full-pipeline", + work_pool_name="gpu", + cron="0 2 * * *", + ) + + serve( + preprocess_deploy, + train_deploy, + evaluate_deploy, + deploy_deploy, + full_pipeline_deploy, + ) + + +if __name__ == "__main__": + main() diff --git a/source/examples/fraud-detection-mlops-pipeline/pipeline/flows/__init__.py b/source/examples/fraud-detection-mlops-pipeline/pipeline/flows/__init__.py new file mode 100644 index 00000000..a296c359 --- /dev/null +++ b/source/examples/fraud-detection-mlops-pipeline/pipeline/flows/__init__.py @@ -0,0 +1 @@ +"""Prefect flows for each pipeline stage.""" diff --git a/source/examples/fraud-detection-mlops-pipeline/pipeline/flows/deploy.py b/source/examples/fraud-detection-mlops-pipeline/pipeline/flows/deploy.py new file mode 100644 index 00000000..a8b80045 --- /dev/null +++ b/source/examples/fraud-detection-mlops-pipeline/pipeline/flows/deploy.py @@ -0,0 +1,91 @@ +"""Deploy flow with Triton model version promotion or rollback.""" + +import logging + +from prefect import flow + +from pipeline.config import TRITON_HTTP_URL, TRITON_MODEL_NAME, TRITON_MODEL_REPO +from pipeline.tasks.mlflow_utils import register_champion +from pipeline.tasks.triton import ( + cleanup_version_artifacts, + health_check, + reload_model, +) + +logger = logging.getLogger(__name__) + + +@flow(name="deploy", log_prints=True) +def deploy_flow( + eval_result: dict, + triton_url: str = TRITON_HTTP_URL, + model_name: str = TRITON_MODEL_NAME, + model_repo_path: str = TRITON_MODEL_REPO, +) -> dict: + """Promote or roll back a model version in Triton based on the evaluation result. + + If the challenger won, removes the old champion, health-checks the new one, + and registers it in MLflow's model registry with the champion alias. If the + champion won, cleans up the challenger artifacts and restores Triton to its + previous state. 
+ """ + should_promote = eval_result["should_promote"] + champion_version = eval_result["champion_version"] + challenger_version = eval_result["challenger_version"] + challenger_run_id = eval_result["challenger_run_id"] + + if should_promote: + logger.info( + "Promoting challenger version %d as new champion", challenger_version + ) + + # Remove old champion artifacts and reload + if champion_version > 0: + cleanup_version_artifacts(model_repo_path, model_name, champion_version) + reload_model(triton_url, model_name) + + # Health check new champion + healthy = health_check(triton_url, model_name, challenger_version) + if not healthy: + logger.error("Health check failed for version %d", challenger_version) + raise RuntimeError( + f"Deploy failed: version {challenger_version} not healthy after promotion." + ) + + # Register in MLflow + register_champion(challenger_run_id, model_name) + + logger.info("Deploy complete: version %d is now champion", challenger_version) + return { + "action": "promoted", + "version": challenger_version, + "run_id": challenger_run_id, + } + + else: + logger.info("Rejecting challenger version %d", challenger_version) + + # Remove challenger artifacts and reload to restore champion-only state + cleanup_version_artifacts(model_repo_path, model_name, challenger_version) + if champion_version > 0: + reload_model(triton_url, model_name) + + logger.info( + "Rejected: version %d cleaned up, champion version %d unchanged", + challenger_version, + champion_version, + ) + return { + "action": "rejected", + "version": challenger_version, + "reason": eval_result.get("reason", ""), + } + + +if __name__ == "__main__": + import json + import sys + + if len(sys.argv) > 1: + eval_result = json.loads(sys.argv[1]) + deploy_flow(eval_result=eval_result) diff --git a/source/examples/fraud-detection-mlops-pipeline/pipeline/flows/evaluate.py b/source/examples/fraud-detection-mlops-pipeline/pipeline/flows/evaluate.py new file mode 100644 index 00000000..5d60a80a --- /dev/null +++ b/source/examples/fraud-detection-mlops-pipeline/pipeline/flows/evaluate.py @@ -0,0 +1,145 @@ +"""Evaluate flow — champion/challenger comparison via Triton native versioning.""" + +import logging +import os + +import numpy as np +from prefect import flow + +from pipeline.config import ( + DECISION_THRESHOLD, + MIN_IMPROVEMENT, + MODEL_OUTPUT_DIR, + PROMOTION_METRIC, + TEST_DATA_DIR, + TRITON_HTTP_URL, + TRITON_MODEL_NAME, + TRITON_MODEL_REPO, +) +from pipeline.tasks.data import load_test_data +from pipeline.tasks.mlflow_utils import ( + get_champion_metrics, + log_evaluation_metrics, + log_promotion_decision, +) +from pipeline.tasks.triton import ( + health_check, + reload_model, + score_model_version, + stage_challenger_version, +) + +logger = logging.getLogger(__name__) + + +@flow(name="evaluate", log_prints=True) +def evaluate_flow( + challenger_run_id: str, + challenger_artifacts_path: str = MODEL_OUTPUT_DIR, + test_data_dir: str = TEST_DATA_DIR, + triton_url: str = TRITON_HTTP_URL, + model_name: str = TRITON_MODEL_NAME, + model_repo_path: str = TRITON_MODEL_REPO, + promotion_metric: str = PROMOTION_METRIC, + min_improvement: float = MIN_IMPROVEMENT, + decision_threshold: float = DECISION_THRESHOLD, +) -> dict: + """Compare a newly trained model against the current champion via Triton. + + Stages the challenger as the next version in Triton's model repository, + scores both versions on held-out test data, and decides whether the + challenger should be promoted based on the configured metric and threshold. 
+ Results are logged to the same MLflow run that was created during training. + """ + # Step 1: Stage challenger as next version + source_artifacts = os.path.join( + challenger_artifacts_path, + "python_backend_model_repository", + model_name, + "1", + ) + champion_version, challenger_version = stage_challenger_version( + source_artifacts, + model_repo_path, + model_name, + ) + + # Step 2: Reload model to pick up new version + reload_model(triton_url, model_name) + + # Step 3: Load test data + test_data = load_test_data(test_data_dir) + labels = test_data["edge_label_user_to_merchant"] + if hasattr(labels, "to_numpy"): + labels = labels.to_numpy(dtype=np.int32) + if hasattr(labels, "values"): + labels = labels.values + labels = np.asarray(labels, dtype=np.int32).ravel() + + # Remove labels from inference data + inference_data = { + k: v for k, v in test_data.items() if not k.startswith("edge_label_") + } + + # Step 4: Score challenger + challenger_metrics = score_model_version( + triton_url, + model_name, + challenger_version, + inference_data, + labels, + decision_threshold, + ) + + # Step 5: Score champion (if exists) + champion_metrics = None + if champion_version > 0: + if health_check(triton_url, model_name, champion_version): + champion_metrics = score_model_version( + triton_url, + model_name, + champion_version, + inference_data, + labels, + decision_threshold, + ) + else: + champion_metrics = get_champion_metrics(model_name) + + # Step 6: Decide promotion + if champion_metrics is None: + should_promote = True + reason = "First model deployment — auto-promoted" + else: + challenger_val = challenger_metrics.get(promotion_metric, 0) + champion_val = champion_metrics.get(promotion_metric, 0) + should_promote = challenger_val > champion_val + min_improvement + reason = ( + f"Challenger {promotion_metric}={challenger_val:.4f} vs " + f"champion {promotion_metric}={champion_val:.4f} " + f"(delta={challenger_val - champion_val:.4f}, min={min_improvement})" + ) + + logger.info( + "Promotion decision: %s — %s", "PROMOTE" if should_promote else "REJECT", reason + ) + + # Step 7: Log to MLflow + log_evaluation_metrics(challenger_run_id, challenger_metrics, champion_metrics) + log_promotion_decision(challenger_run_id, should_promote, reason) + + return { + "should_promote": should_promote, + "champion_version": champion_version, + "challenger_version": challenger_version, + "challenger_run_id": challenger_run_id, + "challenger_metrics": challenger_metrics, + "champion_metrics": champion_metrics, + "reason": reason, + } + + +if __name__ == "__main__": + import sys + + evaluate_flow(challenger_run_id=sys.argv[1] if len(sys.argv) > 1 else "manual") diff --git a/source/examples/fraud-detection-mlops-pipeline/pipeline/flows/full_pipeline.py b/source/examples/fraud-detection-mlops-pipeline/pipeline/flows/full_pipeline.py new file mode 100644 index 00000000..1bc06c4d --- /dev/null +++ b/source/examples/fraud-detection-mlops-pipeline/pipeline/flows/full_pipeline.py @@ -0,0 +1,110 @@ +"""Full pipeline orchestrator — chains preprocess → train → evaluate → deploy.""" + +import logging +import os + +from prefect import flow + +from pipeline.config import ( + DATA_ROOT, + DEFAULT_FRAUD_RATIO, + DEFAULT_GNN_PARAMS, + DEFAULT_UNDER_SAMPLE, + DEFAULT_XGB_PARAMS, + MIN_IMPROVEMENT, + MODEL_OUTPUT_DIR, + RAW_CSV_PATH, + TRITON_HTTP_URL, + TRITON_MODEL_REPO, +) +from pipeline.flows.deploy import deploy_flow +from pipeline.flows.evaluate import evaluate_flow +from pipeline.flows.preprocess import preprocess_flow 
+from pipeline.flows.train import train_flow + +logger = logging.getLogger(__name__) + + +@flow(name="full-pipeline", log_prints=True) +def full_pipeline_flow( + raw_csv_path: str = RAW_CSV_PATH, + data_output_path: str = DATA_ROOT, + model_output_dir: str = MODEL_OUTPUT_DIR, + triton_url: str = TRITON_HTTP_URL, + model_repo_path: str = TRITON_MODEL_REPO, + fraud_ratio: float = DEFAULT_FRAUD_RATIO, + under_sample: bool = DEFAULT_UNDER_SAMPLE, + gnn_params: dict | None = None, + xgb_params: dict | None = None, + min_improvement: float = MIN_IMPROVEMENT, + skip_preprocess: bool = False, +) -> dict: + """End-to-end fraud detection pipeline. + + Chains all four stages as subflows. Each stage runs as a child flow + within the same process, with results passed directly between stages. + + Set skip_preprocess=True if graph data already exists from a previous run. + """ + gnn_params = gnn_params or DEFAULT_GNN_PARAMS + xgb_params = xgb_params or DEFAULT_XGB_PARAMS + + logger.info("Starting full pipeline") + + # Stage 1: Preprocess + if not skip_preprocess: + preprocess_result = preprocess_flow( + raw_csv_path=raw_csv_path, + output_base_path=data_output_path, + fraud_ratio=fraud_ratio, + under_sample=under_sample, + ) + logger.info( + "Preprocessing complete: %d transactions", + preprocess_result["metadata"]["num_transactions"], + ) + + # Stage 2: Train + gnn_data_dir = os.path.join(data_output_path, "gnn") + run_id = train_flow( + data_dir=gnn_data_dir, + output_dir=model_output_dir, + gnn_params=gnn_params, + xgb_params=xgb_params, + ) + logger.info("Training complete — run_id: %s", run_id) + + # Stage 3: Evaluate + test_data_dir = os.path.join(data_output_path, "gnn", "test_gnn") + eval_result = evaluate_flow( + challenger_run_id=run_id, + challenger_artifacts_path=model_output_dir, + test_data_dir=test_data_dir, + triton_url=triton_url, + model_repo_path=model_repo_path, + min_improvement=min_improvement, + ) + logger.info( + "Evaluation: %s — %s", + "PROMOTE" if eval_result["should_promote"] else "REJECT", + eval_result["reason"], + ) + + # Stage 4: Deploy + deploy_result = deploy_flow( + eval_result=eval_result, + triton_url=triton_url, + model_repo_path=model_repo_path, + ) + logger.info("Deploy: %s", deploy_result["action"]) + + return { + "status": "complete", + "train_run_id": run_id, + "eval_result": eval_result, + "deploy_result": deploy_result, + } + + +if __name__ == "__main__": + full_pipeline_flow() diff --git a/source/examples/fraud-detection-mlops-pipeline/pipeline/flows/preprocess.py b/source/examples/fraud-detection-mlops-pipeline/pipeline/flows/preprocess.py new file mode 100644 index 00000000..bf9618d3 --- /dev/null +++ b/source/examples/fraud-detection-mlops-pipeline/pipeline/flows/preprocess.py @@ -0,0 +1,87 @@ +"""Preprocessing flow — runs cuDF-based graph formation and logs metadata to MLflow.""" + +import logging +import os +import sys + +import mlflow +from prefect import flow + +from pipeline.config import ( + DATA_ROOT, + DEFAULT_FRAUD_RATIO, + DEFAULT_UNDER_SAMPLE, + MLFLOW_PREPROCESS_EXPERIMENT_NAME, + RAW_CSV_PATH, +) +from pipeline.tasks.mlflow_utils import get_or_create_experiment + +logger = logging.getLogger(__name__) + + +@flow(name="preprocess", log_prints=True) +def preprocess_flow( + raw_csv_path: str = RAW_CSV_PATH, + output_base_path: str = DATA_ROOT, + fraud_ratio: float = DEFAULT_FRAUD_RATIO, + under_sample: bool = DEFAULT_UNDER_SAMPLE, +) -> dict: + """Preprocess raw TabFormer CSV into graph data for training. + + Steps: + 1. 
Run cuDF-based preprocessing (adapted from blueprint) + 2. Log metadata to MLflow + """ + # Step 1: Run preprocessing (import here because it requires cuDF/GPU) + scripts_dir = os.path.join(os.path.dirname(__file__), "..", "..", "scripts") + sys.path.insert(0, os.path.abspath(scripts_dir)) + from preprocess_tabformer import preprocess_data + + metadata, user_mask_map, mx_mask_map, tx_mask_map = preprocess_data( + raw_csv_path=raw_csv_path, + output_base_path=output_base_path, + fraud_ratio=fraud_ratio, + under_sample=under_sample, + ) + + logger.info( + "Preprocessing complete: %d transactions, %d users, %d merchants", + metadata["num_transactions"], + metadata["num_users"], + metadata["num_merchants"], + ) + + # Step 2: Log to MLflow + experiment_id = get_or_create_experiment(MLFLOW_PREPROCESS_EXPERIMENT_NAME) + with mlflow.start_run(experiment_id=experiment_id, run_name="preprocess"): + mlflow.log_params( + { + "preprocess.fraud_ratio": fraud_ratio, + "preprocess.under_sample": under_sample, + } + ) + mlflow.log_metrics( + { + "preprocess.row_count": float(metadata["row_count"]), + "preprocess.num_users": float(metadata["num_users"]), + "preprocess.num_merchants": float(metadata["num_merchants"]), + "preprocess.num_transactions": float(metadata["num_transactions"]), + } + ) + mlflow.set_tag("stage", "preprocess") + + gnn_dir = os.path.join(output_base_path, "gnn") + return { + "output_base_path": output_base_path, + "gnn_dir": gnn_dir, + "metadata": metadata, + "masks": { + "user": user_mask_map, + "merchant": mx_mask_map, + "transaction": tx_mask_map, + }, + } + + +if __name__ == "__main__": + preprocess_flow() diff --git a/source/examples/fraud-detection-mlops-pipeline/pipeline/flows/train.py b/source/examples/fraud-detection-mlops-pipeline/pipeline/flows/train.py new file mode 100644 index 00000000..05fca071 --- /dev/null +++ b/source/examples/fraud-detection-mlops-pipeline/pipeline/flows/train.py @@ -0,0 +1,81 @@ +"""Training flow — config generation, container execution, MLflow logging.""" + +import logging +import tempfile + +from prefect import flow + +from pipeline.config import ( + DEFAULT_GNN_PARAMS, + DEFAULT_XGB_PARAMS, + GNN_DATA_DIR, + MLFLOW_EXPERIMENT_NAME, + MODEL_OUTPUT_DIR, + TRAINING_IMAGE, +) +from pipeline.tasks.mlflow_utils import get_or_create_experiment, log_training_run +from pipeline.tasks.training import generate_training_config, run_training_container + +logger = logging.getLogger(__name__) + + +@flow(name="train", log_prints=True) +def train_flow( + data_dir: str = GNN_DATA_DIR, + output_dir: str = MODEL_OUTPUT_DIR, + gnn_params: dict | None = None, + xgb_params: dict | None = None, + experiment_name: str = MLFLOW_EXPERIMENT_NAME, + run_tags: dict | None = None, + training_image: str = TRAINING_IMAGE, + gpu_device: str = "0", +) -> str: + """Train the GNN + XGBoost fraud detection model. + + Steps: + 1. Generate training config JSON from parameters + 2. Run the financial-fraud-training container + 3. Validate model outputs were produced + 4. Log everything to MLflow + + Returns the MLflow run ID. 
+ """ + gnn_params = gnn_params or DEFAULT_GNN_PARAMS + xgb_params = xgb_params or DEFAULT_XGB_PARAMS + + # Step 1: Generate config + with tempfile.TemporaryDirectory() as config_dir: + config_path = generate_training_config( + data_dir=data_dir, + output_dir=output_dir, + gnn_params=gnn_params, + xgb_params=xgb_params, + config_dir=config_dir, + ) + + # Step 2: Run training container + run_training_container( + data_dir=data_dir, + output_dir=output_dir, + config_path=config_path, + training_image=training_image, + gpu_device=gpu_device, + ) + + # Step 3: Log to MLflow + experiment_id = get_or_create_experiment(experiment_name) + run_id = log_training_run( + experiment_id=experiment_id, + gnn_params=gnn_params, + xgb_params=xgb_params, + config_path=config_path, + model_output_dir=output_dir, + tags=run_tags, + ) + + logger.info("Training complete — MLflow run ID: %s", run_id) + return run_id + + +if __name__ == "__main__": + train_flow() diff --git a/source/examples/fraud-detection-mlops-pipeline/pipeline/tasks/__init__.py b/source/examples/fraud-detection-mlops-pipeline/pipeline/tasks/__init__.py new file mode 100644 index 00000000..a503b28e --- /dev/null +++ b/source/examples/fraud-detection-mlops-pipeline/pipeline/tasks/__init__.py @@ -0,0 +1 @@ +"""Prefect tasks — reusable units of work.""" diff --git a/source/examples/fraud-detection-mlops-pipeline/pipeline/tasks/data.py b/source/examples/fraud-detection-mlops-pipeline/pipeline/tasks/data.py new file mode 100644 index 00000000..3bfe1d18 --- /dev/null +++ b/source/examples/fraud-detection-mlops-pipeline/pipeline/tasks/data.py @@ -0,0 +1,85 @@ +"""Data loading tasks.""" + +import logging +import os + +import numpy as np +import pandas as pd +from prefect import task + +logger = logging.getLogger(__name__) + + +@task(name="load-test-data") +def load_test_data(test_data_dir: str) -> dict: + """Load heterogeneous graph test data for inference. + + Adapted from preprocess_TabFormer_lp.load_hetero_graph(). + Returns dict with node features, edge indices, edge attrs, labels, and feature masks. 
+ """ + nodes_dir = os.path.join(test_data_dir, "nodes") + edges_dir = os.path.join(test_data_dir, "edges") + out = {} + + # Load node features and feature masks + if os.path.isdir(nodes_dir): + for fname in sorted(os.listdir(nodes_dir)): + if fname.endswith(".csv") and not fname.endswith("_feature_mask.csv"): + node_name = fname[: -len(".csv")] + node_df = pd.read_csv(os.path.join(nodes_dir, fname)) + out[f"x_{node_name}"] = node_df.to_numpy(dtype=np.float32) + + mask_path = os.path.join(nodes_dir, f"{node_name}_feature_mask.csv") + if os.path.exists(mask_path): + mask = ( + pd.read_csv(mask_path, header=None) + .to_numpy(dtype=np.int32) + .ravel() + ) + else: + mask = np.zeros(node_df.shape[1], dtype=np.int32) + out[f"feature_mask_{node_name}"] = mask + + # Load edges: base, attrs, labels, feature masks + base_edges = {} + edge_attrs = {} + edge_labels = {} + edge_feature_masks = {} + + if os.path.isdir(edges_dir): + for fname in sorted(os.listdir(edges_dir)): + if not fname.endswith(".csv"): + continue + path = os.path.join(edges_dir, fname) + if fname.endswith("_attr.csv"): + edge_name = fname[: -len("_attr.csv")] + edge_attrs[edge_name] = pd.read_csv(path) + elif fname.endswith("_label.csv"): + edge_name = fname[: -len("_label.csv")] + edge_labels[edge_name] = pd.read_csv(path) + elif fname.endswith("_feature_mask.csv"): + edge_name = fname[: -len("_feature_mask.csv")] + edge_feature_masks[edge_name] = pd.read_csv(path, header=None) + else: + edge_name = fname[: -len(".csv")] + base_edges[edge_name] = pd.read_csv(path) + + for edge_name, df in base_edges.items(): + out[f"edge_index_{edge_name}"] = df.to_numpy(dtype=np.int64).T + if edge_name in edge_attrs: + out[f"edge_attr_{edge_name}"] = edge_attrs[edge_name].to_numpy( + dtype=np.float32 + ) + if edge_name in edge_feature_masks: + out[f"edge_feature_mask_{edge_name}"] = ( + edge_feature_masks[edge_name].to_numpy(dtype=np.int32).ravel() + ) + elif edge_name in edge_attrs: + out[f"edge_feature_mask_{edge_name}"] = np.zeros( + edge_attrs[edge_name].shape[1], dtype=np.int32 + ) + + for label_edge_name, label_df in edge_labels.items(): + out[f"edge_label_{label_edge_name}"] = label_df + + return out diff --git a/source/examples/fraud-detection-mlops-pipeline/pipeline/tasks/mlflow_utils.py b/source/examples/fraud-detection-mlops-pipeline/pipeline/tasks/mlflow_utils.py new file mode 100644 index 00000000..bc97b9d8 --- /dev/null +++ b/source/examples/fraud-detection-mlops-pipeline/pipeline/tasks/mlflow_utils.py @@ -0,0 +1,144 @@ +"""MLflow utility tasks — experiment management, metrics, model registry.""" + +import logging + +import mlflow +from mlflow import MlflowClient +from prefect import task + +from pipeline.config import ( + MLFLOW_EXPERIMENT_NAME, + MLFLOW_TRACKING_URI, + TRITON_MODEL_NAME, +) + +logger = logging.getLogger(__name__) + +# Configure once — all functions in this module use the same tracking server +mlflow.set_tracking_uri(MLFLOW_TRACKING_URI) + + +@task(name="get-or-create-experiment") +def get_or_create_experiment(experiment_name: str = MLFLOW_EXPERIMENT_NAME) -> str: + """Get existing MLflow experiment or create a new one. 
Returns experiment ID.""" + experiment = mlflow.get_experiment_by_name(experiment_name) + if experiment is not None: + return experiment.experiment_id + experiment_id = mlflow.create_experiment(experiment_name) + logger.info( + "Created MLflow experiment '%s' (id=%s)", experiment_name, experiment_id + ) + return experiment_id + + +@task(name="log-training-run") +def log_training_run( + experiment_id: str, + gnn_params: dict, + xgb_params: dict, + config_path: str, + model_output_dir: str, + tags: dict | None = None, +) -> str: + """Create an MLflow run logging hyperparams, config artifact, and model artifacts. + + Returns the MLflow run ID. + """ + with mlflow.start_run(experiment_id=experiment_id) as run: + # Log hyperparameters (flatten with prefix) + for k, v in gnn_params.items(): + mlflow.log_param(f"gnn.{k}", v) + for k, v in xgb_params.items(): + mlflow.log_param(f"xgb.{k}", v) + + # Log config as artifact + mlflow.log_artifact(config_path) + + # Log model artifacts directory + mlflow.log_artifacts(model_output_dir, artifact_path="model") + + # Tags + run_tags = {"trigger": "automated"} + if tags: + run_tags.update(tags) + mlflow.set_tags(run_tags) + + logger.info("Logged training run %s", run.info.run_id) + return run.info.run_id + + +@task(name="log-evaluation-metrics") +def log_evaluation_metrics( + run_id: str, + challenger_metrics: dict, + champion_metrics: dict | None, +) -> None: + """Log evaluation metrics to an existing MLflow run.""" + with mlflow.start_run(run_id=run_id): + # Log challenger metrics + mlflow.log_metrics( + {f"challenger_{k}": v for k, v in challenger_metrics.items()} + ) + + # Log champion metrics and delta if champion exists + if champion_metrics: + mlflow.log_metrics( + {f"champion_{k}": v for k, v in champion_metrics.items()} + ) + deltas = { + f"{k}_delta": challenger_metrics.get(k, 0) - champion_metrics.get(k, 0) + for k in challenger_metrics + } + mlflow.log_metrics(deltas) + + +@task(name="get-champion-metrics") +def get_champion_metrics(model_name: str = TRITON_MODEL_NAME) -> dict | None: + """Retrieve metrics for the current champion model from MLflow registry. + + Returns None if no champion exists (first deployment). 
+ """ + client = MlflowClient(MLFLOW_TRACKING_URI) + + versions = client.search_model_versions(f"name='{model_name}'") + champion_version = None + for v in versions: + if "champion" in (v.aliases or []): + champion_version = v + break + + if champion_version is None: + logger.info("No champion model found — this is the first deployment") + return None + + run = client.get_run(champion_version.run_id) + metrics = dict(run.data.metrics) + logger.info("Champion metrics (run %s): %s", champion_version.run_id, metrics) + return metrics + + +@task(name="register-champion") +def register_champion(run_id: str, model_name: str = TRITON_MODEL_NAME) -> None: + """Register a model version and assign the 'champion' alias.""" + client = MlflowClient(MLFLOW_TRACKING_URI) + + # Register model version + model_uri = f"runs:/{run_id}/model" + mv = mlflow.register_model(model_uri, model_name) + + # Move champion alias to this version + client.set_registered_model_alias(model_name, "champion", mv.version) + logger.info( + "Registered %s version %s as champion (run %s)", model_name, mv.version, run_id + ) + + +@task(name="log-promotion-decision") +def log_promotion_decision(run_id: str, should_promote: bool, reason: str = "") -> None: + """Log the promotion decision to the MLflow run.""" + with mlflow.start_run(run_id=run_id): + mlflow.set_tag( + "promotion_decision", "promoted" if should_promote else "rejected" + ) + if reason: + mlflow.set_tag("promotion_reason", reason) diff --git a/source/examples/fraud-detection-mlops-pipeline/pipeline/tasks/training.py b/source/examples/fraud-detection-mlops-pipeline/pipeline/tasks/training.py new file mode 100644 index 00000000..e75dbccb --- /dev/null +++ b/source/examples/fraud-detection-mlops-pipeline/pipeline/tasks/training.py @@ -0,0 +1,112 @@ +"""Training tasks — config generation and container lifecycle.""" + +import json +import logging +import os +import subprocess + +from prefect import task + +from pipeline.config import TRAINING_IMAGE + +logger = logging.getLogger(__name__) + + +@task(name="generate-training-config") +def generate_training_config( + data_dir: str, + output_dir: str, + gnn_params: dict, + xgb_params: dict, + config_dir: str = ".", +) -> str: + """Build training JSON config and write to disk. + + The training container expects: + - paths.data_dir = /data (container mount point) + - paths.output_dir = /trained_models (container mount point) + """ + config = { + "paths": { + "data_dir": "/data", + "output_dir": "/trained_models", + }, + "models": [ + { + "kind": "GNN_XGBoost", + "gpu": "single", + "hyperparameters": { + "gnn": gnn_params, + "xgb": xgb_params, + }, + } + ], + } + + config_path = os.path.join(config_dir, "training_config.json") + with open(config_path, "w") as f: + json.dump(config, f, indent=2) + + logger.info("Training config written to %s", config_path) + return config_path + + +@task(name="run-training-container") +def run_training_container( + data_dir: str, + output_dir: str, + config_path: str, + training_image: str = TRAINING_IMAGE, + gpu_device: str = "0", +) -> None: + """Run the financial-fraud-training container. + + Includes NCCL workarounds for cloud instances without InfiniBand: + forces socket-based communication, bypassing UCX and OFI plugins. 
+ """ + cmd = [ + "docker", + "run", + "--rm", + "--gpus", + f"device={gpu_device}", + "--cap-add", + "SYS_NICE", + "--shm-size=8g", + "--privileged", + "-e", + "NCCL_IB_DISABLE=1", + "-e", + "NCCL_NET=Socket", + "-e", + "NCCL_SOCKET_IFNAME=eth0", + "-e", + "NCCL_P2P_DISABLE=1", + "-e", + "NCCL_SHM_DISABLE=1", + "-v", + f"{data_dir}:/data", + "-v", + f"{output_dir}:/trained_models", + "-v", + f"{config_path}:/app/config.json", + "--entrypoint", + "bash", + training_image, + "-c", + "torchrun --standalone --nproc_per_node=1 /app/main.py --config /app/config.json", + ] + logger.info("Running training container: %s", " ".join(cmd)) + + result = subprocess.run(cmd, capture_output=True, text=True) + if result.returncode != 0: + logger.error( + "Training container failed:\nstdout: %s\nstderr: %s", + result.stdout, + result.stderr, + ) + raise RuntimeError( + f"Training container exited with code {result.returncode}: {result.stderr}" + ) + + logger.info("Training container completed successfully") diff --git a/source/examples/fraud-detection-mlops-pipeline/pipeline/tasks/triton.py b/source/examples/fraud-detection-mlops-pipeline/pipeline/tasks/triton.py new file mode 100644 index 00000000..8f0da0fc --- /dev/null +++ b/source/examples/fraud-detection-mlops-pipeline/pipeline/tasks/triton.py @@ -0,0 +1,208 @@ +"""Triton Inference Server tasks — version management, model control API, health checks.""" + +import logging +import os +import shutil +import time + +import numpy as np +import tritonclient.http as httpclient +from prefect import task + +from pipeline.config import TRITON_MODEL_NAME + +logger = logging.getLogger(__name__) + + +@task(name="get-current-version") +def get_current_version( + model_repo_path: str, model_name: str = TRITON_MODEL_NAME +) -> int: + """Get the highest version number in the Triton model repository. + + Returns 0 if the model directory doesn't exist. + """ + model_dir = os.path.join(model_repo_path, model_name) + if not os.path.isdir(model_dir): + return 0 + + versions = [] + for entry in os.listdir(model_dir): + entry_path = os.path.join(model_dir, entry) + if os.path.isdir(entry_path) and entry.isdigit(): + versions.append(int(entry)) + + return max(versions) if versions else 0 + + +@task(name="stage-challenger-version") +def stage_challenger_version( + challenger_artifacts_path: str, + model_repo_path: str, + model_name: str = TRITON_MODEL_NAME, +) -> tuple[int, int]: + """Copy challenger model artifacts as the next version in the Triton model repo. 
+ + Args: + challenger_artifacts_path: Path to the version directory with model files + (e.g., .../python_backend_model_repository/prediction_and_shapley/1/) + model_repo_path: Root of the Triton model repository + model_name: Triton model name + + Returns: + (champion_version, challenger_version) tuple + """ + champion_version = get_current_version.fn(model_repo_path, model_name) + challenger_version = champion_version + 1 + + target_dir = os.path.join(model_repo_path, model_name, str(challenger_version)) + shutil.copytree(challenger_artifacts_path, target_dir) + + # Ensure config.pbtxt has version_policy { all {} } so both versions are served + config_path = os.path.join(model_repo_path, model_name, "config.pbtxt") + if os.path.exists(config_path): + with open(config_path) as f: + config_text = f.read() + if "version_policy" not in config_text: + with open(config_path, "a") as f: + f.write("\nversion_policy { all {} }\n") + logger.info("Added version_policy { all {} } to config.pbtxt") + + logger.info( + "Staged challenger as version %d (champion is version %d)", + challenger_version, + champion_version, + ) + return champion_version, challenger_version + + +@task(name="reload-model") +def reload_model( + triton_url: str, + model_name: str, + timeout: int = 120, +) -> None: + """Unload then reload a model in Triton to pick up filesystem changes. + + Since load_model/unload_model operate on the entire model (not individual versions), + this is how we refresh Triton's view of available versions. + """ + client = httpclient.InferenceServerClient(url=triton_url) + try: + client.unload_model(model_name) + except Exception: + pass # Model might not be loaded yet + + client.load_model(model_name) + + # Wait for model to be ready + start = time.time() + while time.time() - start < timeout: + try: + if client.is_server_ready() and client.is_model_ready(model_name): + logger.info("Model %s reloaded successfully", model_name) + return + except Exception: + pass + time.sleep(2) + + raise TimeoutError(f"Model {model_name} not ready after {timeout}s") + + +@task(name="health-check") +def health_check( + triton_url: str, + model_name: str, + version: int, + timeout: int = 30, +) -> bool: + """Check if a specific model version is ready and responsive.""" + try: + client = httpclient.InferenceServerClient(url=triton_url) + return client.is_model_ready(model_name, model_version=str(version)) + except Exception as e: + logger.warning("Health check failed for %s v%d: %s", model_name, version, e) + return False + + +@task(name="score-model-version") +def score_model_version( + triton_url: str, + model_name: str, + version: int, + test_data: dict, + labels: np.ndarray, + decision_threshold: float = 0.5, +) -> dict: + """Send test data to a specific Triton model version and compute metrics. + + Returns dict with f1_score, precision, recall, accuracy. 
+ """ + from sklearn.metrics import accuracy_score, f1_score, precision_score, recall_score + from tritonclient.http import InferInput, InferRequestedOutput + + client = httpclient.InferenceServerClient(url=triton_url) + + # Build inputs from test_data dict + inputs = [] + for key, value in test_data.items(): + if key.startswith("x_"): + dtype = "FP32" + elif key.startswith("feature_mask_"): + dtype = "INT32" + elif key.startswith("edge_feature_mask_"): + dtype = "INT32" + elif key.startswith("edge_index_"): + dtype = "INT64" + elif key.startswith("edge_attr_"): + dtype = "FP32" + elif key == "COMPUTE_SHAP": + dtype = "BOOL" + else: + continue + + inp = InferInput(key, list(value.shape), datatype=dtype) + inp.set_data_from_numpy(value) + inputs.append(inp) + + # Add COMPUTE_SHAP = False for evaluation (faster) + if "COMPUTE_SHAP" not in test_data: + shap_flag = np.array([False], dtype=np.bool_) + inp = InferInput("COMPUTE_SHAP", [1], datatype="BOOL") + inp.set_data_from_numpy(shap_flag) + inputs.append(inp) + + outputs = [InferRequestedOutput("PREDICTION")] + + response = client.infer( + model_name, + inputs=inputs, + model_version=str(version), + outputs=outputs, + ) + + predictions = response.as_numpy("PREDICTION") + y_pred = (predictions > decision_threshold).astype(int).ravel() + y_true = labels.ravel() + + metrics = { + "f1_score": float(f1_score(y_true, y_pred, zero_division=0)), + "precision": float(precision_score(y_true, y_pred, zero_division=0)), + "recall": float(recall_score(y_true, y_pred, zero_division=0)), + "accuracy": float(accuracy_score(y_true, y_pred)), + } + logger.info("Model %s v%d metrics: %s", model_name, version, metrics) + return metrics + + +@task(name="cleanup-version-artifacts") +def cleanup_version_artifacts( + model_repo_path: str, + model_name: str, + version: int, +) -> None: + """Remove a version directory from the model repository.""" + version_dir = os.path.join(model_repo_path, model_name, str(version)) + if os.path.isdir(version_dir): + shutil.rmtree(version_dir) + logger.info("Cleaned up version %d artifacts", version) diff --git a/source/examples/fraud-detection-mlops-pipeline/scripts/download_data.sh b/source/examples/fraud-detection-mlops-pipeline/scripts/download_data.sh new file mode 100755 index 00000000..d8b4f8a0 --- /dev/null +++ b/source/examples/fraud-detection-mlops-pipeline/scripts/download_data.sh @@ -0,0 +1,36 @@ +#!/usr/bin/env bash +set -euo pipefail + +DATA_DIR="${1:?Usage: $0 }" +RAW_DIR="${DATA_DIR}/raw" + +mkdir -p "${RAW_DIR}" + +echo "============================================" +echo "TabFormer Dataset Download" +echo "============================================" +echo "" +echo "The TabFormer dataset must be downloaded manually from IBM Box:" +echo " https://ibm.ent.box.com/v/tabformer-data/folder/130747715605" +echo "" +echo "1. Download 'transactions.tgz' from the link above" +echo "2. Place it in: ${RAW_DIR}/" +echo "3. Run this script again to extract it" +echo "" + +if [ -f "${RAW_DIR}/card_transaction.v1.csv" ]; then + echo "Dataset already extracted at ${RAW_DIR}/card_transaction.v1.csv" + wc -l "${RAW_DIR}/card_transaction.v1.csv" + exit 0 +fi + +if [ -f "${RAW_DIR}/transactions.tgz" ]; then + echo "Extracting transactions.tgz..." + tar xzf "${RAW_DIR}/transactions.tgz" -C "${RAW_DIR}/" + echo "Extracted to ${RAW_DIR}/card_transaction.v1.csv" + wc -l "${RAW_DIR}/card_transaction.v1.csv" +else + echo "transactions.tgz not found in ${RAW_DIR}/" + echo "Please download it from the IBM Box link above." 
+ exit 1 +fi diff --git a/source/examples/fraud-detection-mlops-pipeline/scripts/preprocess_tabformer.py b/source/examples/fraud-detection-mlops-pipeline/scripts/preprocess_tabformer.py new file mode 100644 index 00000000..0c3fa5a5 --- /dev/null +++ b/source/examples/fraud-detection-mlops-pipeline/scripts/preprocess_tabformer.py @@ -0,0 +1,1193 @@ +# Copyright (c) 2025, NVIDIA CORPORATION. +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +# # Credit Card Transaction Data Cleanup and Prep +# +# This source code shows the steps for cleanup and preparing the credit card +# transaction data for training models with Training NIM. +# +# ### The dataset: +# * IBM TabFormer: https://github.com/IBM/TabFormer +# * Released under an Apache 2.0 license +# +# Contains 24M records with 15 fields, one field being the "is fraud" label which we use for training. +# +# ### Goals +# The goal is to: +# * Cleanup the data +# * Make field names just single word +# * while field names are not used within the GNN, it makes accessing fields easier during cleanup +# * Encode categorical fields +# * use one-hot encoding for fields with less than 8 categories +# * use binary encoding for fields with more than 8 categories +# * Create a continuous node index across users, merchants, and transactions +# * having node ID start at zero and then be contiguous is critical +# for creation of Compressed Sparse Row (CSR) formatted data +# without wasting memory. +# * Produce: +# * For XGBoost: +# * Training - all data before 2018 +# * Validation - all data during 2018 +# * Test. - all data after 2018 +# * For GNN +# * Training Data +# * Edge List +# * Feature data +# * Test set - all data after 2018 +# +# +# +# ### Graph formation +# Given that we are limited to just the data in the transaction file, the +# ideal model would be to have a bipartite graph of Users to Merchants where +# the edges represent the credit card transaction and then perform Link +# Classification on the Edges to identify fraud. Unfortunately the current +# version of cuGraph does not support GNN Link Prediction. That limitation +# will be lifted over the next few release at which time this code will be +# updated. Luckily, there is precedence for viewing transactions as nodes +# and then doing node classification using the popular GraphSAGE GNN. That +# is the approach this code takes. The produced graph will be a tri-partite +# graph where each transaction is represented as a node. +# +# +# +# +# ### Features +# For the XGBoost approach, there is no need to generate empty features +# for the Merchants. However, for GNN processing, every node needs to +# have the same set of feature data. Therefore, we need to generate +# empty features for the User and Merchant nodes. +# +# ----- + +# #### Import the necessary libraries. 
In this case we will use cuDF and perform most of the data prep on the GPU
+#
+
+
+import logging
+import os
+
+import cudf
+import matplotlib.pyplot as plt
+import networkx as nx
+import numpy as np
+import pandas as pd
+import scipy.stats as ss
+from category_encoders import BinaryEncoder
+from scipy.stats import pointbiserialr
+from sklearn.compose import ColumnTransformer
+from sklearn.impute import SimpleImputer
+from sklearn.pipeline import Pipeline
+from sklearn.preprocessing import OneHotEncoder, RobustScaler
+
+COL_USER = "User"
+COL_CARD = "Card"
+COL_AMOUNT = "Amount"
+COL_MCC = "MCC"
+COL_TIME = "Time"
+COL_DAY = "Day"
+COL_MONTH = "Month"
+COL_YEAR = "Year"
+
+COL_MERCHANT = "Merchant"
+COL_STATE = "State"
+COL_CITY = "City"
+COL_ZIP = "Zip"
+COL_ERROR = "Errors"
+COL_CHIP = "Chip"
+COL_FRAUD = "Fraud"
+COL_TRANSACTION_ID = "Tx_ID"
+COL_MERCHANT_ID = "Merchant_ID"
+COL_USER_ID = "User_ID"
+
+UNKNOWN_STRING_MARKER = "XX"
+UNKNOWN_ZIP_CODE = 0
+
+COL_GRAPH_SRC = "src"
+COL_GRAPH_DST = "dst"
+COL_GRAPH_WEIGHT = "wgt"
+MERCHANT_AND_USER_COLS = [COL_MERCHANT, COL_CARD, COL_MCC]
+
+logger = logging.getLogger(__name__)
+
+
+def cramers_v(x, y):
+    """
+    Compute correlation of categorical field x with target y.
+    See https://en.wikipedia.org/wiki/Cram%C3%A9r's_V
+    """
+    confusion_matrix = cudf.crosstab(x, y).to_numpy()
+    chi2 = ss.chi2_contingency(confusion_matrix)[0]
+    n = confusion_matrix.sum().sum()
+    r, k = confusion_matrix.shape
+    return np.sqrt(chi2 / (n * (min(k - 1, r - 1))))
+
+
+def create_feature_mask(columns, start_mask_id=0):
+    # Dictionary to store mapping from original column to mask value
+    mask_mapping = {}
+    mask_values = []
+    current_mask = start_mask_id
+
+    for col in columns:
+        # For encoded columns, assume the base is before the underscore
+        if "_" in col:
+            base_feature = col.split("_")[0]
+        else:
+            base_feature = col  # For non-encoded columns, use the column name directly
+
+        # Assign a new mask value if this base feature hasn't been seen before
+        if base_feature not in mask_mapping:
+            mask_mapping[base_feature] = current_mask
+            current_mask += 1
+
+        # Append the mask value for this column
+        mask_values.append(mask_mapping[base_feature])
+
+    # Convert list to numpy array for further processing if needed
+    feature_mask = np.array(mask_values)
+
+    return mask_mapping, feature_mask
+
+
+def preprocess_data(
+    raw_csv_path: str,
+    output_base_path: str,
+    fraud_ratio: float = 0.1,
+    under_sample: bool = True,
+) -> tuple[dict, dict, dict, dict]:
+
+    tabformer_xgb = os.path.join(output_base_path, "xgb")
+    tabformer_gnn = os.path.join(output_base_path, "gnn")
+
+    if not os.path.exists(tabformer_xgb):
+        os.makedirs(tabformer_xgb)
+    if not os.path.exists(tabformer_gnn):
+        os.makedirs(tabformer_gnn)
+
+    # Read the dataset
+
+    data = cudf.read_csv(raw_csv_path)
+
+    _ = data.rename(
+        columns={
+            "Merchant Name": COL_MERCHANT,
+            "Merchant State": COL_STATE,
+            "Merchant City": COL_CITY,
+            "Errors?": COL_ERROR,
+            "Use Chip": COL_CHIP,
+            "Is Fraud?": COL_FRAUD,
+        },
+        inplace=True,
+    )
+
+    # #### Handle missing values
+    # * Zip codes are numeric, replace missing zip codes by 0
+    # * State and Error are strings, replace missing values by the marker 'XX'
+
+    # Make sure that 'XX' doesn't exist in the State and Error fields before we replace missing values by 'XX'
+    assert UNKNOWN_STRING_MARKER not in set(data[COL_STATE].unique().to_pandas())
+    assert UNKNOWN_STRING_MARKER not in set(data[COL_ERROR].unique().to_pandas())
+
+    # Make sure that 0 or 0.0 doesn't exist in the Zip field before we
replace missing values by 0 + assert float(0) not in set(data[COL_ZIP].unique().to_pandas()) + assert 0 not in set(data[COL_ZIP].unique().to_pandas()) + + # Replace missing values with markers + data[COL_STATE] = data[COL_STATE].fillna(UNKNOWN_STRING_MARKER) + data[COL_ERROR] = data[COL_ERROR].fillna(UNKNOWN_STRING_MARKER) + data[COL_ZIP] = data[COL_ZIP].fillna(UNKNOWN_ZIP_CODE) + + # There shouldn't be any missing values in the data now. + assert data.isnull().sum().sum() == 0 + + # ### Clean up the Amount field + # * Drop the "$" from the Amount field and then convert from string to float + # * Look into spread of Amount and choose right scaler for it + + # Drop the "$" from the Amount field and then convert from string to float + data[COL_AMOUNT] = ( + data[COL_AMOUNT].str.replace("$", "", regex=False).astype("float") + ) + + # #### Change the 'Fraud' values to be integer where + # * 1 == Fraud + # * 0 == Non-fraud + + fraud_to_binary = {"No": 0, "Yes": 1} + data[COL_FRAUD] = data[COL_FRAUD].map(fraud_to_binary).astype("int8") + + # Remove ',' in error descriptions + data[COL_ERROR] = data[COL_ERROR].str.replace(",", "") + + # Split the time column into hours and minutes and then cast to int32 + T = data[COL_TIME].str.split(":", expand=True) + T[0] = T[0].astype("int32") + T[1] = T[1].astype("int32") + + # replace the 'Time' column with the new columns + data[COL_TIME] = (T[0] * 60) + T[1] + data[COL_TIME] = data[COL_TIME].astype("int32") + + # Delete temporary DataFrame + del T + + # #### Convert Merchant column to str type + data[COL_MERCHANT] = data[COL_MERCHANT].astype("str") + + # Combine User and Card to generate unique numbers + data[COL_CARD] = data[COL_USER] * len(data[COL_CARD].unique()) + data[COL_CARD] + data[COL_CARD] = data[COL_CARD].astype("int") + + # Collect unique merchant, card and MCC in a dataframe and fit a binary transformer + data = data.to_pandas() + + data_ids = pd.DataFrame() + + nr_unique_card = data[COL_CARD].unique().shape[0] + nr_unique_merchant = data[COL_MERCHANT].unique().shape[0] + nr_unique_mcc = data[COL_MCC].unique().shape[0] + nr_elements = max(nr_unique_merchant, nr_unique_card) + + data_ids[COL_CARD] = [data[COL_CARD][0]] * nr_elements + data_ids[COL_MERCHANT] = [data[COL_MERCHANT][0]] * nr_elements + data_ids[COL_MCC] = [data[COL_MCC][0]] * nr_elements + + data_ids.loc[np.arange(nr_unique_card), COL_CARD] = data[COL_CARD].unique() + data_ids.loc[np.arange(nr_unique_merchant), COL_MERCHANT] = data[ + COL_MERCHANT + ].unique() + data_ids.loc[np.arange(nr_unique_mcc), COL_MCC] = data[COL_MCC].unique() + + data_ids = data_ids[MERCHANT_AND_USER_COLS].astype("category") + + id_bin_encoder = Pipeline( + steps=[ + ("binary", BinaryEncoder(handle_missing="value", handle_unknown="value")) + ] + ) + + id_transformer = ColumnTransformer( + transformers=[ + ("binary", id_bin_encoder, MERCHANT_AND_USER_COLS), + ], + remainder="passthrough", + ) + + pd.set_option("future.no_silent_downcasting", True) + id_transformer = id_transformer.fit(data_ids) + + preprocessed_id_data_raw = id_transformer.transform( + data[MERCHANT_AND_USER_COLS].astype("category") + ) + + # transformed column names + columns_of_transformed_id_data = list( + map( + lambda name: name.split("__")[1], + list(id_transformer.get_feature_names_out(MERCHANT_AND_USER_COLS)), + ) + ) + + # data type of transformed columns + id_col_type_mapping = {} + for col in columns_of_transformed_id_data: + if col.split("_")[0] in MERCHANT_AND_USER_COLS: + id_col_type_mapping[col] = "int8" + + assert 
data_ids.isnull().sum().sum() == 0
+
+    preprocessed_id_data = pd.DataFrame(
+        preprocessed_id_data_raw, columns=columns_of_transformed_id_data
+    )
+
+    del data_ids
+    del preprocessed_id_data_raw
+
+    data = pd.concat(
+        [data.reset_index(drop=True), preprocessed_id_data.reset_index(drop=True)],
+        axis=1,
+    )
+
+    # ##### Compute correlation of different fields with target
+    sparse_factor = 1
+    columns_to_compute_corr = [
+        COL_CARD,
+        COL_CHIP,
+        COL_ERROR,
+        COL_STATE,
+        COL_CITY,
+        COL_ZIP,
+        COL_MCC,
+        COL_MERCHANT,
+        COL_USER,
+        COL_DAY,
+        COL_MONTH,
+        COL_YEAR,
+    ]
+    for c1 in columns_to_compute_corr:
+        for c2 in [COL_FRAUD]:
+            coff = 100 * cramers_v(data[c1][::sparse_factor], data[c2][::sparse_factor])
+            logger.info(f"Correlation ({c1}, {c2}) = {coff:6.2f}%")
+
+    # ### Correlation of target with numerical columns
+
+    for col in [COL_TIME, COL_AMOUNT]:
+        r_pb, p_value = pointbiserialr(
+            # data[COL_FRAUD].to_pandas(), data[col].to_pandas()
+            data[COL_FRAUD],
+            data[col],
+        )
+        logger.info(f"r_pb ({col}) = {r_pb:3.2f} with p_value {p_value:3.2f}")
+
+    numerical_predictors = [COL_AMOUNT]
+    nominal_predictors = [
+        COL_ERROR,
+        COL_CARD,
+        COL_CHIP,
+        COL_CITY,
+        COL_ZIP,
+        COL_MCC,
+        COL_MERCHANT,
+    ]
+
+    predictor_columns = numerical_predictors + nominal_predictors
+    target_column = [COL_FRAUD]
+
+    # #### Remove duplicate non-fraud data points
+
+    # Remove duplicate data points
+    fraud_data = data[data[COL_FRAUD] == 1]
+    data = data[data[COL_FRAUD] == 0]
+
+    data = data.drop_duplicates(subset=nominal_predictors)
+    data = pd.concat([data, fraud_data])
+
+    # ### Split the data
+    # The data will be split into three groups based on event date
+    # * Training - all data before 2018
+    # * Validation - all data during 2018
+    # * Test - all data after 2018
+
+    if under_sample:
+        fraud_df = data[data[COL_FRAUD] == 1]
+        non_fraud_df = data[data[COL_FRAUD] == 0]
+        nr_non_fraud_samples = min(
+            (len(data) - len(fraud_df)), int(len(fraud_df) / fraud_ratio)
+        )
+        data = pd.concat(
+            [fraud_df, non_fraud_df.sample(nr_non_fraud_samples, random_state=42)]
+        )
+
+    predictor_columns = list(set(predictor_columns) - set(MERCHANT_AND_USER_COLS))
+    nominal_predictors = list(set(nominal_predictors) - set(MERCHANT_AND_USER_COLS))
+
+    data = data.sample(frac=1, random_state=42).reset_index(drop=True)
+
+    training_idx = data[COL_YEAR] < 2018
+    validation_idx = data[COL_YEAR] == 2018
+    test_idx = data[COL_YEAR] > 2018
+
+    # ### Scale numerical columns and encode categorical columns of training data
+
+    # As some of the encoders we want to use are not available in cuML, we use pandas for now.
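+
+    # A rough illustration of the encoding rule applied below (category counts are
+    # hypothetical): a column like Chip with 3 distinct values gets 3 one-hot
+    # columns, while a high-cardinality column like Merchant with ~100,000 distinct
+    # values gets only about ceil(log2(100,000)) = 17 binary-encoded columns,
+    # which keeps the feature matrix narrow.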
+ # Move training data to pandas for preprocessing + pdf_training = data[training_idx][predictor_columns + target_column] + + # Use one-hot encoding for columns with <= 8 categories, and binary encoding for columns with more categories + columns_for_binary_encoding = [] + columns_for_one_hot_encoding = [] + for col in nominal_predictors: + if len(data[col].unique()) <= 8: + columns_for_one_hot_encoding.append(col) + else: + columns_for_binary_encoding.append(col) + + assert (training_idx.sum() + validation_idx.sum() + test_idx.sum()) == data.shape[0] + + # Mark categorical column as "category" + pdf_training[nominal_predictors] = pdf_training[nominal_predictors].astype( + "category" + ) + + # encoders to encode categorical columns and scalers to scale numerical columns + + bin_encoder = Pipeline( + steps=[ + ("binary", BinaryEncoder(handle_missing="value", handle_unknown="value")) + ] + ) + one_hot_encoder = Pipeline(steps=[("onehot", OneHotEncoder())]) + + robust_scaler = Pipeline( + steps=[ + ("imputer", SimpleImputer(strategy="median")), + ("robust", RobustScaler()), + ], + ) + + # compose encoders and scalers in a column transformer + transformer = ColumnTransformer( + transformers=[ + ("binary", bin_encoder, columns_for_binary_encoding), + ("onehot", one_hot_encoder, columns_for_one_hot_encoding), + ("robust", robust_scaler, [COL_AMOUNT]), + ], + remainder="passthrough", + ) + + # Fit column transformer with training data + + pd.set_option("future.no_silent_downcasting", True) + transformer = transformer.fit(pdf_training[predictor_columns]) + + # transformed column names + columns_of_transformed_txs = list( + map( + lambda name: name.split("__")[1], + list(transformer.get_feature_names_out(predictor_columns)), + ) + ) + + # data type of transformed columns + type_mapping = {} + for col in columns_of_transformed_txs: + if col.split("_")[0] in nominal_predictors: + type_mapping[col] = "int8" + elif col in numerical_predictors: + type_mapping[col] = "float" + elif col in target_column: + type_mapping[col] = data.dtypes.to_dict()[col] + + # transform training data + preprocessed_training_data = transformer.transform(pdf_training[predictor_columns]) + + # Convert transformed data to panda DataFrame + preprocessed_training_data = pd.DataFrame( + preprocessed_training_data, columns=columns_of_transformed_txs + ) + + # Transform test data using the transformer fitted on training data + pdf_test = data[test_idx][predictor_columns + target_column] + pdf_test[nominal_predictors] = pdf_test[nominal_predictors].astype("category") + + preprocessed_test_data = transformer.transform(pdf_test[predictor_columns]) + preprocessed_test_data = pd.DataFrame( + preprocessed_test_data, columns=columns_of_transformed_txs + ) + + # Transform validation data using the transformer fitted on training data + pdf_validation = data[validation_idx][predictor_columns + target_column] + pdf_validation[nominal_predictors] = pdf_validation[nominal_predictors].astype( + "category" + ) + + preprocessed_validation_data = transformer.transform( + pdf_validation[predictor_columns] + ) + preprocessed_validation_data = pd.DataFrame( + preprocessed_validation_data, columns=columns_of_transformed_txs + ) + + preprocessed_id_data_train = pd.DataFrame( + id_transformer.transform(data[training_idx][MERCHANT_AND_USER_COLS]), + columns=columns_of_transformed_id_data, + ) + preprocessed_training_data = pd.concat( + [preprocessed_training_data, preprocessed_id_data_train], axis=1 + ) + + # ## Write out the data for XGB + + # Copy target 
column + preprocessed_training_data[COL_FRAUD] = pdf_training[COL_FRAUD].values + preprocessed_training_data = preprocessed_training_data.astype(type_mapping) + + assert preprocessed_training_data.columns[-1] == COL_FRAUD + assert ( + set(preprocessed_training_data.columns) + - set( + columns_of_transformed_txs + columns_of_transformed_id_data + target_column + ) + == set() + ) + assert ( + set(columns_of_transformed_txs + columns_of_transformed_id_data + target_column) + - set(preprocessed_training_data.columns) + == set() + ) + + ## Training data + out_path = os.path.join(tabformer_xgb, "training.csv") + if not os.path.exists(os.path.dirname(out_path)): + os.makedirs(os.path.dirname(out_path)) + preprocessed_training_data.to_csv( + out_path, + header=True, + index=False, + columns=preprocessed_training_data.columns, + ) + + preprocessed_id_data_val = pd.DataFrame( + id_transformer.transform(data[validation_idx][MERCHANT_AND_USER_COLS]), + columns=columns_of_transformed_id_data, + ) + preprocessed_validation_data = pd.concat( + [preprocessed_validation_data, preprocessed_id_data_val], axis=1 + ) + + # Copy target column + preprocessed_validation_data[COL_FRAUD] = pdf_validation[COL_FRAUD].values + preprocessed_validation_data = preprocessed_validation_data.astype(type_mapping) + + assert preprocessed_validation_data.columns[-1] == COL_FRAUD + assert ( + set(preprocessed_validation_data.columns) + - set( + columns_of_transformed_txs + columns_of_transformed_id_data + target_column + ) + == set() + ) + assert ( + set(columns_of_transformed_txs + columns_of_transformed_id_data + target_column) + - set(preprocessed_validation_data.columns) + == set() + ) + + ## validation data + out_path = os.path.join(tabformer_xgb, "validation.csv") + if not os.path.exists(os.path.dirname(out_path)): + os.makedirs(os.path.dirname(out_path)) + preprocessed_validation_data.to_csv( + out_path, + header=True, + index=False, + columns=preprocessed_validation_data.columns, + ) + # preprocessed_validation_data.to_parquet(out_path, index=False, compression='gzip') + + preprocessed_id_data_test = pd.DataFrame( + id_transformer.transform(data[test_idx][MERCHANT_AND_USER_COLS]), + columns=columns_of_transformed_id_data, + ) + preprocessed_test_data = pd.concat( + [preprocessed_test_data, preprocessed_id_data_test], axis=1 + ) + + # Copy target column + preprocessed_test_data[COL_FRAUD] = pdf_test[COL_FRAUD].values + preprocessed_test_data = preprocessed_test_data.astype(type_mapping) + + assert preprocessed_test_data.columns[-1] == COL_FRAUD + assert ( + set(preprocessed_test_data.columns) + - set( + columns_of_transformed_txs + columns_of_transformed_id_data + target_column + ) + == set() + ) + assert ( + set(columns_of_transformed_txs + columns_of_transformed_id_data + target_column) + - set(preprocessed_test_data.columns) + == set() + ) + + ## test data + out_path = os.path.join(tabformer_xgb, "test.csv") + preprocessed_test_data.to_csv( + out_path, + header=True, + index=False, + columns=preprocessed_test_data.columns, + ) + + # Delete dataFrames that are not needed anymore + del pdf_training + del pdf_validation + del pdf_test + del preprocessed_training_data + del preprocessed_validation_data + del preprocessed_test_data + + # ### GNN Data + + # #### Setting Vertex IDs + # In order to create a graph, the different vertices need to be assigned + # unique vertex IDs. Additionally, the IDs needs to be consecutive + # and positive. + # + # There are three nodes groups here: Transactions, Users, and Merchants. 
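+    #
+    # As a hypothetical illustration of the remapping below: if the unique Card
+    # values in this split were [1402, 7, 533], the dict built from
+    # zip(unique_values, arange) assigns {1402: 0, 7: 1, 533: 2}, so User node IDs
+    # run from 0 to NR_USERS - 1; Merchant and Transaction IDs are made consecutive
+    # the same way, which is what the downstream CSR graph construction expects.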
+    #
+    # These IDs are not used in training, just for graph processing.
+
+    # Use the same training data as used for XGBoost
+
+    data_all = data.copy()
+    data = pd.concat([data[training_idx], data[validation_idx]])
+    data.reset_index(inplace=True, drop=True)
+
+    # The number of transactions is the same as the size of the list, and hence the index value
+    data[COL_TRANSACTION_ID] = data.index
+
+    merchant_name_to_id = dict(
+        zip(
+            data[COL_MERCHANT].unique(),
+            np.arange(len(data[COL_MERCHANT].unique())),
+            strict=False,
+        )
+    )
+
+    data[COL_MERCHANT_ID] = data[COL_MERCHANT].map(merchant_name_to_id)
+
+    # ##### NOTE: the 'User' and 'Card' columns of the original data were used to create the updated 'Card' column
+    # * You can use user or card as nodes
+
+    id_to_consecutive_id = dict(
+        zip(
+            data[COL_CARD].unique(),
+            np.arange(len(data[COL_CARD].unique())),
+            strict=False,
+        )
+    )
+
+    # Convert Card to consecutive IDs
+    data[COL_USER_ID] = data[COL_CARD].map(id_to_consecutive_id)
+
+    NR_USERS = data[COL_USER_ID].max() + 1
+    NR_MXS = data[COL_MERCHANT_ID].max() + 1
+    NR_TXS = data[COL_TRANSACTION_ID].max() + 1
+
+    # Check that the transaction, merchant and user ids are consecutive
+    id_range = data[COL_MERCHANT_ID].min(), data[COL_MERCHANT_ID].max()
+    logger.info(f"Merchant ID range {id_range}")
+    id_range = data[COL_USER_ID].min(), data[COL_USER_ID].max()
+    logger.info(f"User ID range {id_range}")
+
+    # #### Create Edges in COO format
+
+    U_2_M = cudf.DataFrame()
+    U_2_M[COL_GRAPH_SRC] = data[COL_USER_ID]
+    U_2_M[COL_GRAPH_DST] = data[COL_MERCHANT_ID]
+
+    Edge = cudf.concat([U_2_M])
+
+    # Write out Edge data
+    out_path = os.path.join(tabformer_gnn, "edges/user_to_merchant.csv")
+
+    if not os.path.exists(os.path.dirname(out_path)):
+        os.makedirs(os.path.dirname(out_path))
+
+    Edge.to_csv(out_path, header=True, index=False)
+
+    # ### Now the feature data
+    # Feature data needs to be sorted in order, where the row index corresponds to the node ID
+    #
+    # The data comprises three sets of features
+    # * Transactions
+    # * Merchants
+    # * Users
+
+    # #### To get feature vectors of Transactions, transform the training data using the pre-fitted transformer
+
+    transaction_feature_df = pd.DataFrame(
+        transformer.transform(data[predictor_columns]),
+        columns=columns_of_transformed_txs,
+    ).astype(type_mapping)
+
+    transaction_feature_df[COL_FRAUD] = data[COL_FRAUD]
+
+    data_merchant = data[[COL_MERCHANT, COL_MCC, COL_CARD]].drop_duplicates(
+        subset=[COL_MERCHANT]
+    )
+    data_merchant[COL_MERCHANT_ID] = data_merchant[COL_MERCHANT].map(
+        merchant_name_to_id
+    )
+    data_merchant_sorted = data_merchant.sort_values(by=COL_MERCHANT_ID)
+
+    data_user = data[[COL_MERCHANT, COL_MCC, COL_CARD]].drop_duplicates(
+        subset=[COL_CARD]
+    )
+    data_user[COL_USER_ID] = data_user[COL_CARD].map(id_to_consecutive_id)
+    data_user_sorted = data_user.sort_values(by=COL_USER_ID)
+
+    user_feature_columns = []
+    mx_feature_columns = []
+    for c in columns_of_transformed_id_data:
+        if c.startswith("Card"):
+            user_feature_columns.append(c)
+        else:
+            mx_feature_columns.append(c)
+
+    preprocessed_merchant_data = pd.DataFrame(
+        id_transformer.transform(data_merchant_sorted[MERCHANT_AND_USER_COLS]),
+        columns=columns_of_transformed_id_data,
+    )[mx_feature_columns]
+
+    preprocessed_user_data = pd.DataFrame(
+        id_transformer.transform(data_user_sorted[MERCHANT_AND_USER_COLS]),
+        columns=columns_of_transformed_id_data,
+    )[user_feature_columns]
+
+    # User features
+
+    out_path = os.path.join(tabformer_gnn, "nodes/user.csv")
+    if
not os.path.exists(os.path.dirname(out_path)): + os.makedirs(os.path.dirname(out_path)) + + preprocessed_user_data.to_csv( + out_path, header=True, index=False, columns=user_feature_columns + ) + + # Merchant features + + out_path = os.path.join(tabformer_gnn, "nodes/merchant.csv") + if not os.path.exists(os.path.dirname(out_path)): + os.makedirs(os.path.dirname(out_path)) + preprocessed_merchant_data.to_csv( + out_path, header=True, index=False, columns=mx_feature_columns + ) + + # User to merchant edge labels + + out_path = os.path.join(tabformer_gnn, "edges/user_to_merchant_label.csv") + if not os.path.exists(os.path.dirname(out_path)): + os.makedirs(os.path.dirname(out_path)) + transaction_feature_df[[COL_FRAUD]].to_csv( + out_path, header=True, index=False, columns=[COL_FRAUD] + ) + + # User to merchant edge features + + out_path = os.path.join(tabformer_gnn, "edges/user_to_merchant_attr.csv") + if not os.path.exists(os.path.dirname(out_path)): + os.makedirs(os.path.dirname(out_path)) + transaction_feature_df[columns_of_transformed_txs].to_csv( + out_path, header=True, index=False, columns=columns_of_transformed_txs + ) + + # # Create and save feature masks for training data + user_mask_map, user_mask = create_feature_mask(user_feature_columns, 0) + mx_mask_map, mx_mask = create_feature_mask( + mx_feature_columns, np.max(user_mask) + 1 + ) + tx_mask_map, tx_mask = create_feature_mask( + columns_of_transformed_txs, np.max(mx_mask) + 1 + ) + + # np.savetxt( + # os.path.join(tabformer_gnn, "nodes/user_feature_mask.csv"), + # user_mask, + # delimiter=",", + # fmt="%d", + # ) + # np.savetxt( + # os.path.join(tabformer_gnn, "nodes/merchant_feature_mask.csv"), + # mx_mask, + # delimiter=",", + # fmt="%d", + # ) + # np.savetxt( + # os.path.join(tabformer_gnn, "edges/user_to_merchant_feature_mask.csv"), + # tx_mask, + # delimiter=",", + # fmt="%d", + # ) + + ## Test data + + data = data_all[test_idx].copy() + + data.reset_index(inplace=True, drop=True) + + # The number of transaction is the same as the size of the list, and hence the index value + data[COL_TRANSACTION_ID] = data.index + + merchant_name_to_id = dict( + zip( + data[COL_MERCHANT].unique(), + np.arange(len(data[COL_MERCHANT].unique())), + strict=False, + ) + ) + + data[COL_MERCHANT_ID] = data[COL_MERCHANT].map(merchant_name_to_id) + + # ##### NOTE: the 'User' and 'Card' columns of the original data were used to crate updated 'Card' column + # * You can use user or card as nodes + + id_to_consecutive_id = dict( + zip( + data[COL_CARD].unique(), + np.arange(len(data[COL_CARD].unique())), + strict=False, + ) + ) + + # Convert Card to consecutive IDs + data[COL_USER_ID] = data[COL_CARD].map(id_to_consecutive_id) + + # Check the the transaction, merchant and user ids are consecutive + id_range = data[COL_MERCHANT_ID].min(), data[COL_MERCHANT_ID].max() + logger.info(f"Merchant ID range {id_range}") + id_range = data[COL_USER_ID].min(), data[COL_USER_ID].max() + logger.info(f"User ID range {id_range}") + + NR_USERS = data[COL_USER_ID].max() + 1 + NR_MXS = data[COL_MERCHANT_ID].max() + 1 + NR_TXS = data[COL_TRANSACTION_ID].max() + 1 + + # #### Test Edges in COO format + + # User to Merchant edges + U_2_M = cudf.DataFrame() + U_2_M[COL_GRAPH_SRC] = data[COL_USER_ID] + U_2_M[COL_GRAPH_DST] = data[COL_MERCHANT_ID] + + Edge = cudf.concat([U_2_M]) + + # Write out Edge data + out_path = os.path.join(tabformer_gnn, "test_gnn/edges/user_to_merchant.csv") + + if not os.path.exists(os.path.dirname(out_path)): + 
os.makedirs(os.path.dirname(out_path)) + + Edge.to_csv(out_path, header=True, index=False) + + # ### Now the feature data + # Feature data needs to be is sorted in order, where the row index corresponds to the node ID + # + # The data is comprised of three sets of features + # * Transactions + # * Merchants + # * Users + + transaction_feature_df = pd.DataFrame( + transformer.transform(data[predictor_columns]), + columns=columns_of_transformed_txs, + ).astype(type_mapping) + + transaction_feature_df[COL_FRAUD] = data[COL_FRAUD] + + data_merchant = data[[COL_MERCHANT, COL_MCC, COL_CARD]].drop_duplicates( + subset=[COL_MERCHANT] + ) + data_merchant[COL_MERCHANT_ID] = data_merchant[COL_MERCHANT].map( + merchant_name_to_id + ) + data_merchant_sorted = data_merchant.sort_values(by=COL_MERCHANT_ID) + + data_user = data[[COL_MERCHANT, COL_MCC, COL_CARD]].drop_duplicates( + subset=[COL_CARD] + ) + data_user[COL_USER_ID] = data_user[COL_CARD].map(id_to_consecutive_id) + data_user_sorted = data_user.sort_values(by=COL_USER_ID) + + preprocessed_merchant_data = pd.DataFrame( + id_transformer.transform(data_merchant_sorted[MERCHANT_AND_USER_COLS]), + columns=columns_of_transformed_id_data, + )[mx_feature_columns] + + preprocessed_user_data = pd.DataFrame( + id_transformer.transform(data_user_sorted[MERCHANT_AND_USER_COLS]), + columns=columns_of_transformed_id_data, + )[user_feature_columns] + + ## feature matrix + + out_path = os.path.join(tabformer_gnn, "test_gnn/nodes/user.csv") + if not os.path.exists(os.path.dirname(out_path)): + os.makedirs(os.path.dirname(out_path)) + + preprocessed_user_data.to_csv( + out_path, header=True, index=False, columns=user_feature_columns + ) + + out_path = os.path.join(tabformer_gnn, "test_gnn/nodes/merchant.csv") + if not os.path.exists(os.path.dirname(out_path)): + os.makedirs(os.path.dirname(out_path)) + preprocessed_merchant_data.to_csv( + out_path, header=True, index=False, columns=mx_feature_columns + ) + + out_path = os.path.join(tabformer_gnn, "test_gnn/edges/user_to_merchant_label.csv") + if not os.path.exists(os.path.dirname(out_path)): + os.makedirs(os.path.dirname(out_path)) + transaction_feature_df[[COL_FRAUD]].to_csv( + out_path, header=True, index=False, columns=[COL_FRAUD] + ) + + out_path = os.path.join(tabformer_gnn, "test_gnn/edges/user_to_merchant_attr.csv") + if not os.path.exists(os.path.dirname(out_path)): + os.makedirs(os.path.dirname(out_path)) + + transaction_feature_df[columns_of_transformed_txs].to_csv( + out_path, header=True, index=False, columns=columns_of_transformed_txs + ) + + # Test feature masks were already created for training data, just save them for test + np.savetxt( + os.path.join(tabformer_gnn, "test_gnn/nodes/user_feature_mask.csv"), + user_mask, + delimiter=",", + fmt="%d", + ) + np.savetxt( + os.path.join(tabformer_gnn, "test_gnn/nodes/merchant_feature_mask.csv"), + mx_mask, + delimiter=",", + fmt="%d", + ) + np.savetxt( + os.path.join(tabformer_gnn, "test_gnn/edges/user_to_merchant_feature_mask.csv"), + tx_mask, + delimiter=",", + fmt="%d", + ) + + metadata = { + "row_count": len(data_all), + "fraud_ratio": float(fraud_ratio), + "under_sample": under_sample, + "num_users": int(NR_USERS), + "num_merchants": int(NR_MXS), + "num_transactions": int(NR_TXS), + } + return metadata, user_mask_map, mx_mask_map, tx_mask_map + + +def load_hetero_graph(base): + """ + Reads: + - All node CSVs from nodes/, plus their matching feature masks (_feature_mask.csv) + If missing, a mask of all ones is created (np.int32). 
+ - All edge CSVs from edges/: + base -> edge_index_ (np.int64) + *_attr.csv -> edge_attr_ (np.float32) + *_label.csv -> exactly one -> edge_label_ (DataFrame) + """ + + nodes_dir = os.path.join(base, "nodes") + edges_dir = os.path.join(base, "edges") + + out = {} + node_feature_mask = {} + + # --- Nodes: every CSV becomes x_; also read/create feature_mask_ --- + if os.path.isdir(nodes_dir): + for fname in os.listdir(nodes_dir): + if fname.lower().endswith(".csv") and not fname.lower().endswith( + "_feature_mask.csv" + ): + node_name = fname[: -len(".csv")] + node_path = os.path.join(nodes_dir, fname) + node_df = pd.read_csv(node_path) + out[f"x_{node_name}"] = node_df.to_numpy(dtype=np.float32) + + # feature mask file (optional) + mask_fname = f"{node_name}_feature_mask.csv" + mask_path = os.path.join(nodes_dir, mask_fname) + if os.path.exists(mask_path): + mask_df = pd.read_csv(mask_path, header=None) + node_feature_mask[node_name] = mask_df + feature_mask = mask_df.to_numpy(dtype=np.int32).ravel() + else: + # create a mask with all zeros + feature_mask = np.zeros(node_df.shape[1], dtype=np.int32) + out[f"feature_mask_{node_name}"] = feature_mask + + # --- Edges: group into base, attr, label by filename suffix --- + base_edges = {} + edge_attrs = {} + edge_labels = {} + edge_feature_mask = {} + + if os.path.isdir(edges_dir): + for fname in os.listdir(edges_dir): + if not fname.lower().endswith(".csv"): + continue + path = os.path.join(edges_dir, fname) + lower = fname.lower() + if lower.endswith("_attr.csv"): + edge_name = fname[: -len("_attr.csv")] + edge_attrs[edge_name] = pd.read_csv(path) # , header=None) + elif lower.endswith("_label.csv"): + edge_name = fname[: -len("_label.csv")] + edge_labels[edge_name] = pd.read_csv(path) + elif lower.endswith("_feature_mask.csv"): + edge_name = fname[: -len("_feature_mask.csv")] + edge_feature_mask[edge_name] = pd.read_csv(path, header=None) + else: + edge_name = fname[: -len(".csv")] + base_edges[edge_name] = pd.read_csv(path) # , header=None) + + # Enforce: only one label file total + if len(edge_labels) == 0: + raise FileNotFoundError( + "No '*_label.csv' found in edges/. Exactly one label file is required." + ) + if len(edge_labels) > 1: + raise ValueError( + f"Found multiple label files: {list(edge_labels.keys())}. Exactly one is allowed." 
+ ) + + # Build output keys for edges + for edge_name, df in base_edges.items(): + out[f"edge_index_{edge_name}"] = df.to_numpy(dtype=np.int64).T + if edge_name in edge_attrs: + out[f"edge_attr_{edge_name}"] = edge_attrs[edge_name].to_numpy( + dtype=np.float32 + ) + if edge_name in edge_feature_mask: + out[f"edge_feature_mask_{edge_name}"] = ( + edge_feature_mask[edge_name].to_numpy(dtype=np.int32).ravel() + ) + else: + # create a mask with all zeros + out[f"edge_feature_mask_{edge_name}"] = np.zeros( + edge_attrs[edge_name].shape[1], dtype=np.int32 + ) + + # Add the single label file (kept as DataFrame) + ((label_edge_name, label_df),) = edge_labels.items() + out[f"edge_label_{label_edge_name}"] = label_df + + return out + + +def prepare_bipartite_structures(edge_list: np.ndarray): + """ + From a (2, E) edge array (row0=A, row1=B): + - returns unique node arrays A_nodes, B_nodes + - returns neighbor dicts neighbors_A[a]->set(B), neighbors_B[b]->set(A) + - returns highest-degree node in A, plus its 1-hop (B) and 2-hop (A) neighbors + """ + assert ( + edge_list.ndim == 2 and edge_list.shape[0] == 2 + ), "edge_list must be shape (2, E)" + A_nodes = np.unique(edge_list[0, :]) + B_nodes = np.unique(edge_list[1, :]) + + # Neighbor maps + neighbors_A = {int(a): set() for a in A_nodes} + neighbors_B = {int(b): set() for b in B_nodes} + for a, b in edge_list.T: + a = int(a) + b = int(b) + neighbors_A[a].add(b) + neighbors_B[b].add(a) + + # Degrees in A and highest-degree anchor + degrees_A = {a: len(neighbors_A[a]) for a in neighbors_A} + if not degrees_A: + raise ValueError("No nodes found in partition A.") + + max_deg = min(5, max(degrees_A.values())) + # deterministic tie-break: smallest node id + anchor_A = min([a for a, d in degrees_A.items() if d == max_deg]) + + # 1-hop (B) and 2-hop (A) around the anchor + one_hop_B = sorted(neighbors_A[anchor_A]) + two_hop_A = set() + + for b in one_hop_B: + two_hop_A.update(list(neighbors_B[b])[:3]) + two_hop_A.discard(anchor_A) + two_hop_A = sorted(two_hop_A) + + return { + "A_nodes": A_nodes, + "B_nodes": B_nodes, + "neighbors_A": neighbors_A, + "neighbors_B": neighbors_B, + "degrees_A": degrees_A, + "anchor_A": anchor_A, + "anchor_degree": max_deg, + "one_hop_B": one_hop_B, + "two_hop_A": two_hop_A, + } + + +def build_bipartite_graph(edge_list: np.ndarray) -> nx.Graph: + """Build a NetworkX bipartite graph with node attribute 'bipartite' (0 for A, 1 for B).""" + A_nodes = np.unique(edge_list[0, :]).astype(int) + B_nodes = np.unique(edge_list[1, :]).astype(int) + G = nx.Graph() + G.add_nodes_from(A_nodes, bipartite=0) + G.add_nodes_from(B_nodes, bipartite=1) + G.add_edges_from([(int(a), int(b)) for a, b in edge_list.T]) + return G + + +def induced_ego_two_hop_subgraph_namespaced(edge_list: np.ndarray): + """ + Same logic as your induced_ego_two_hop_subgraph, but nodes are namespaced: + A-node a -> ('A', a) + B-node b -> ('B', b) + This guarantees edges draw only between the partitions. 
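+    For example, an A-node 7 and a B-node 7 become ('A', 7) and ('B', 7), so the two
+    partitions can never collide on a shared integer ID.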
+ """ + info = prepare_bipartite_structures(edge_list) + + A_nodes = info["A_nodes"].astype(int) + B_nodes = info["B_nodes"].astype(int) + A_map = {a: ("A", a) for a in A_nodes} + B_map = {b: ("B", b) for b in B_nodes} + + anchor_A = info["anchor_A"] + one_hop_B = set(info["one_hop_B"]) + two_hop_A = set(info["two_hop_A"]) + + # Build subgraph with namespaced nodes and bipartite attribute + G_sub = nx.Graph() + G_sub.add_nodes_from([A_map[anchor_A], *[A_map[a] for a in two_hop_A]], bipartite=0) + G_sub.add_nodes_from([B_map[b] for b in one_hop_B], bipartite=1) + + for a, b in edge_list.T: + a = int(a) + b = int(b) + if (a == anchor_A or a in two_hop_A) and (b in one_hop_B): + G_sub.add_edge(A_map[a], B_map[b]) + + # Return maps so the plotter can highlight the anchor + return G_sub, info, A_map, B_map + + +def plot_bipartite_subgraph_namespaced(G_sub: nx.Graph, info: dict, A_map: dict): + """ + Plot the 2-hop ego subgraph with namespaced nodes. + Circles = A, Squares = B. The anchor A-node is larger. + """ + anchor_tuple = A_map[info["anchor_A"]] + sub_A = [n for n, d in G_sub.nodes(data=True) if d.get("bipartite") == 0] + sub_B = [n for n, d in G_sub.nodes(data=True) if d.get("bipartite") == 1] + + pos = nx.bipartite_layout(G_sub, nodes=sub_A) + + plt.figure(figsize=(6, 5)) + nx.draw_networkx_nodes( + G_sub, + pos, + nodelist=sub_A, + node_size=[600 if n == anchor_tuple else 300 for n in sub_A], + node_shape="o", + ) + nx.draw_networkx_nodes(G_sub, pos, nodelist=sub_B, node_size=400, node_shape="s") + nx.draw_networkx_edges(G_sub, pos) + + # Pretty labels: show only the numeric part (n[1]) + labels = {n: str(n[1]) for n in G_sub.nodes()} + nx.draw_networkx_labels(G_sub, pos, labels=labels, font_size=10) + + plt.title("A subgraph with a few user and merchant nodes.") + plt.axis("off") + plt.show() diff --git a/source/examples/fraud-detection-mlops-pipeline/triton/Dockerfile b/source/examples/fraud-detection-mlops-pipeline/triton/Dockerfile new file mode 100644 index 00000000..7b17825a --- /dev/null +++ b/source/examples/fraud-detection-mlops-pipeline/triton/Dockerfile @@ -0,0 +1,71 @@ +FROM nvcr.io/nvidia/tritonserver:25.04-py3 + +WORKDIR /opt + +ENV DEBIAN_FRONTEND=noninteractive +ENV CONDA_DIR=/opt/miniconda + +# Install base dependencies and clean cache +RUN apt-get update && apt-get install -y \ + wget curl git build-essential cmake sudo \ + # libnccl2 libnccl-dev \ + libopenblas-dev libssl-dev libtinfo-dev \ + && rm -rf /var/lib/apt/lists/* + + +# Ensure bash features and a stable PATH +SHELL ["/bin/bash", "-o", "pipefail", "-c"] +ENV CONDA_DIR=/opt/miniconda +ENV PATH=${CONDA_DIR}/bin:${PATH} + + +RUN set -eux; \ + ARCH="$(uname -m)"; \ + if [ "$ARCH" = "aarch64" ]; then \ + MINICONDA_URL="https://repo.anaconda.com/miniconda/Miniconda3-latest-Linux-aarch64.sh"; \ + CONDA_SUBDIR=linux-aarch64; \ + else \ + MINICONDA_URL="https://repo.anaconda.com/miniconda/Miniconda3-latest-Linux-x86_64.sh"; \ + CONDA_SUBDIR=linux-64; \ + fi; \ + wget -q "$MINICONDA_URL" -O miniconda.sh; \ + bash miniconda.sh -b -p "$CONDA_DIR"; \ + rm miniconda.sh; \ + conda config --set subdir "$CONDA_SUBDIR"; \ + conda config --set always_yes true; \ + # Accept TOS + conda tos accept --override-channels --channel https://repo.anaconda.com/pkgs/main; \ + conda tos accept --override-channels --channel https://repo.anaconda.com/pkgs/r; \ + conda update -n base -c defaults conda + + +# Set Conda Python as default +ENV PATH="$CONDA_DIR/bin:$PATH" +ENV PYTHONPATH="$CONDA_DIR/lib/python3.12/site-packages:$PYTHONPATH" +ENV 
LD_LIBRARY_PATH="$CONDA_DIR/lib:$LD_LIBRARY_PATH" +ENV CONDA_OVERRIDE_CUDA=12.8 +ENV CONDA_ALWAYS_YES=true + +RUN conda install python=3.12 + +# Install GPU-enabled PyTorch and XGBoost + +RUN conda install --override-channels -c nvidia -c conda-forge \ + pytorch=2.7.0=*cuda126* \ + py-xgboost=3.0.2=*cuda128* \ + ncurses + +RUN conda install --override-channels -c conda-forge cupy=13.4.1 +# Install PyTorch Geometric and Captum for CUDA 12.6 + torch 2.7.0 +RUN $CONDA_DIR/bin/pip install \ + torch-geometric==2.6.1 \ + captum==0.7.0 \ + --extra-index-url https://data.pyg.org/whl/torch-2.7.0+cu126.html + +RUN conda install -y --override-channels -c rapidsai -c nvidia -c conda-forge cudf=25.04 python=3.12 + +# Validate installation with explicit Python path +RUN python -c "import torch; print('Torch:', torch.__version__, '| CUDA:', torch.cuda.is_available())" +RUN python -c "import xgboost; print('XGBoost:', xgboost.__version__)" +RUN python -c "import captum; print('Captum:', captum.__version__)" +RUN python -c "import torch_geometric; print('PyG:', torch_geometric.__version__)" diff --git a/source/examples/index.md b/source/examples/index.md index 2dcc2c27..4a5843ad 100644 --- a/source/examples/index.md +++ b/source/examples/index.md @@ -24,6 +24,7 @@ rapids-snowflake-cudf/notebook cuml-snowflake-nb/notebook rapids-coiled-cudf/notebook rapids-morpheus-pipeline/notebook +fraud-detection-mlops-pipeline/notebook lulc-classification-gpu/notebook cuml-ray-hpo/notebook ``` diff --git a/source/images/mlflow-experiment-table.png b/source/images/mlflow-experiment-table.png new file mode 100644 index 00000000..9383956e Binary files /dev/null and b/source/images/mlflow-experiment-table.png differ diff --git a/source/images/mlflow-model-registry.png b/source/images/mlflow-model-registry.png new file mode 100644 index 00000000..f7ebdf7e Binary files /dev/null and b/source/images/mlflow-model-registry.png differ diff --git a/source/images/mlflow-run-details.png b/source/images/mlflow-run-details.png new file mode 100644 index 00000000..09c53816 Binary files /dev/null and b/source/images/mlflow-run-details.png differ diff --git a/source/images/prefect-flow-runs.png b/source/images/prefect-flow-runs.png new file mode 100644 index 00000000..82a0faff Binary files /dev/null and b/source/images/prefect-flow-runs.png differ
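A minimal, hypothetical smoke test for the Triton tasks added above (version lookup, explicit reload, readiness check). The server URL and model repository path below are illustrative assumptions; adjust them to your environment:

    # smoke_test_triton.py (illustrative sketch only)
    from pipeline.config import TRITON_MODEL_NAME
    from pipeline.tasks.triton import get_current_version, health_check, reload_model

    TRITON_URL = "localhost:8000"  # assumed local Triton HTTP endpoint
    MODEL_REPO = "./data/models"   # assumed Triton model repository path

    # Calling .fn() runs the undecorated function, so no Prefect flow context is needed
    version = get_current_version.fn(MODEL_REPO, TRITON_MODEL_NAME)
    if version == 0:
        print("No versions staged yet; run the training flow first")
    else:
        reload_model.fn(TRITON_URL, TRITON_MODEL_NAME)
        ready = health_check.fn(TRITON_URL, TRITON_MODEL_NAME, version)
        print(f"{TRITON_MODEL_NAME} v{version} ready: {ready}")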