Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions docker/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,15 @@ RUN export MAX_JOBS=$(( $(nproc) > 16 ? 16 : $(nproc) )) && \
echo "ERROR: GHSA-72hv-8253-57qq not fixed — ray_dist.jar still present after deletion" && exit 1; \
fi

# Ray nightly wheels omit the prebuilt dashboard frontend (client/build is an npm artifact),
# so the dashboard process dies with FrontendNotFoundError and its HTTP/API server never
# registers — which breaks every ray.util.state call (cosmos-xenna uses it to drive the
# pipeline, failing with "Could not read 'dashboard' from GCS"). Stub the build dir so the
# dashboard + API server start (the web UI itself is unused). No-op on stable wheels that
# already ship client/build. Remove once the nightly wheel bundles the frontend.
# Kept as its own layer so it does not invalidate the expensive uv sync layer above.
# Guard first: `mkdir -p` succeeds on any path, so if the venv's Python version or
# location changes, an unguarded stub would be created where ray never looks and the
# dashboard failure would silently come back. Fail the build instead.
RUN test -d /opt/venv/lib/python3.13/site-packages/ray/dashboard && \
    mkdir -p /opt/venv/lib/python3.13/site-packages/ray/dashboard/client/build/static

# Patch wandb-core: bump google.golang.org/grpc from 1.79.2 to 1.79.3 (CVE fix)
ARG TARGETARCH
RUN if python3 -c "import wandb" 2>/dev/null; then \
Expand Down
126 changes: 126 additions & 0 deletions docker/Dockerfile_inference
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
# Copyright (c) 2026, NVIDIA CORPORATION. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# Base-image selectors. They are declared before the first FROM so they can be
# interpolated into the FROM line below; override with --build-arg to pick a
# different CUDA release or OS flavour.
ARG CUDA_VER=12.9.1
ARG LINUX_VER=ubuntu24.04

# CUDA + cuDNN development image; the later stages in this file derive from it.
FROM nvidia/cuda:${CUDA_VER}-cudnn-devel-${LINUX_VER} AS cuda


########################################################################
# Base image
########################################################################

FROM cuda AS build

# Build-time selector (default "ci"), persisted as CURATOR_ENVIRONMENT so it is
# also visible to processes running inside the container.
ARG CURATOR_ENV=ci
ENV CURATOR_ENVIRONMENT=${CURATOR_ENV}

ENV NVIDIA_PRODUCT_NAME="NeMo Curator Inference Nightly"

ENV PIP_BREAK_SYSTEM_PACKAGES=1
ENV DEBIAN_FRONTEND=noninteractive
# All OS packages are installed in a single layer:
#   1. base tooling (software-properties-common provides add-apt-repository),
#   2. Python 3.13 packages, installed after adding the deadsnakes PPA,
#   3. gnupg is upgraded only if it is already installed (--only-upgrade),
#   4. apt caches/lists are removed in the same layer so they never reach the image.
RUN apt-get update && apt-get install -y --no-install-recommends \
        ca-certificates \
        software-properties-common \
        curl \
        git \
        vim && \
    add-apt-repository -y ppa:deadsnakes/ppa && \
    apt-get update && apt-get install -y --no-install-recommends \
        python3.13 \
        python3.13-dev \
        python3.13-venv \
        python-is-python3 && \
    apt-get install -y --only-upgrade gnupg && \
    apt-get clean && \
    rm -rf /var/lib/apt/lists/*

WORKDIR /opt

# Install uv. The installer is downloaded to a file and then executed rather than
# piped into `sh`: the default /bin/sh has no pipefail, so with `curl | sh` a failed
# download is masked by the exit status of `sh` and the build only breaks later at
# the first `uv` call. Chaining with && makes a curl failure stop the build here.
RUN curl -LsSf https://astral.sh/uv/install.sh -o /tmp/uv-install.sh && \
    sh /tmp/uv-install.sh && \
    rm -f /tmp/uv-install.sh
# Directory added to PATH so the `uv` invocations below resolve.
ENV PATH="/root/.local/bin:$PATH"
# uv manages a single project environment at /opt/venv; its bin dir goes first on
# PATH so `python` resolves to the venv interpreter.
ENV UV_PROJECT_ENVIRONMENT=/opt/venv
ENV UV_CACHE_DIR=/opt/uv_cache
ENV PATH="$UV_PROJECT_ENVIRONMENT/bin:$PATH"
ENV UV_LINK_MODE=copy
RUN uv venv ${UV_PROJECT_ENVIRONMENT} --python /usr/bin/python3.13 --system-site-packages --seed


########################################################################
# Focused inference image
########################################################################

FROM build AS nemo_curator_inference

WORKDIR /opt

# etcd + NATS: required by the Dynamo inference backend. The installer script is
# deleted in the same layer that runs it.
COPY docker/common/install_etcd_nats.sh .
RUN bash install_etcd_nats.sh && rm install_etcd_nats.sh

# HAProxy: required by Ray Serve HAProxy ingress mode.
COPY docker/common/install_haproxy.sh .
RUN bash install_haproxy.sh && rm install_haproxy.sh
ENV RAY_SERVE_HAPROXY_BINARY_PATH=/usr/local/bin/haproxy

# Space-separated pyproject extras to install, and the dependency-group flags that
# are passed through to `uv sync` verbatim.
ARG CURATOR_EXTRAS="inference_server interleaved_cpu text_cpu"
ARG CURATOR_GROUPS="--no-default-groups"
WORKDIR /opt/Curator

# Only the files needed to resolve dependencies are copied before the sync, so the
# dependency layer is not invalidated by unrelated source changes.
COPY pyproject.toml uv.lock /opt/Curator/
COPY nemo_curator/__init__.py nemo_curator/package_info.py /opt/Curator/nemo_curator/

# Install only the Curator dependency surface needed for inference plus CPU
# text/interleaved workflows: each word in CURATOR_EXTRAS becomes `--extra <word>`.
RUN <<'EOS'
set -eux
extras_args=""
for extra in ${CURATOR_EXTRAS}; do
    extras_args="${extras_args} --extra ${extra}"
done
uv sync --link-mode copy --locked ${extras_args} ${CURATOR_GROUPS} --no-cache
EOS

# Post-sync hygiene only: the dependency graph stays owned by pyproject.toml/uv.lock,
# so this step removes files and must never install or upgrade packages. The final
# check fails the build if any ray_dist.jar survives the deletion.
RUN <<'EOS'
set -eux
find /opt/venv -type d -path "*ray/_private/runtime_env/agent/thirdparty_files/aiohttp*" -exec rm -rf {} +
find /opt/venv -name "ray_dist.jar" -delete
if find /opt/venv -name "ray_dist.jar" | grep -q .; then
    echo "ERROR: GHSA-72hv-8253-57qq not fixed - ray_dist.jar still present after deletion" && exit 1
fi
EOS

COPY . /opt/Curator

# Shell snippet that re-exports the image's environment settings.
# NOTE(review): the EOF_ENV delimiter is unquoted, so per the Dockerfile here-document
# rules `$PATH` below is presumably substituted at build time (freezing the build-time
# PATH into env.sh) rather than when the file is sourced. Confirm which is intended;
# quoting the delimiter would defer expansion to the sourcing shell.
COPY <<EOF_ENV /opt/venv/env.sh
export UV_PROJECT_ENVIRONMENT=/opt/venv
export PATH="/opt/venv/bin:$PATH"
export UV_LINK_MODE=copy
export PATH="/root/.local/bin:$PATH"
export RAY_SERVE_HAPROXY_BINARY_PATH=/usr/local/bin/haproxy
EOF_ENV

# Make env.sh executable, then smoke-check the install: print the resolved version
# of each key distribution. A missing distribution raises and fails the build.
RUN chmod +x /opt/venv/env.sh && \
    python - <<'PY'
import importlib.metadata as dist_meta

for dist_name in ("nemo-curator", "ray", "ai-dynamo"):
    print(f"{dist_name}={dist_meta.version(dist_name)}")
PY

# Build provenance, declared last so per-build values do not invalidate earlier layers.
ARG NVIDIA_BUILD_ID
ENV NVIDIA_BUILD_ID=${NVIDIA_BUILD_ID:-<unknown>}
LABEL com.nvidia.build.id="${NVIDIA_BUILD_ID}"
ARG NVIDIA_BUILD_REF
LABEL com.nvidia.build.ref="${NVIDIA_BUILD_REF}"
86 changes: 76 additions & 10 deletions nemo_curator/core/serve/dynamo/vllm.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

from __future__ import annotations

import importlib.metadata
import json
import tempfile
from functools import reduce
Expand All @@ -24,6 +25,7 @@

import ray
from loguru import logger
from packaging.requirements import Requirement

from nemo_curator.core.serve.base import BaseModelConfig
from nemo_curator.core.serve.dynamo.infra import (
Expand All @@ -50,19 +52,82 @@
from nemo_curator.core.serve.placement import ReplicaBundleSpec


# ai-dynamo[vllm]'s [vllm] extra carries a hard ray pin, but Ray refuses
# actor venvs whose ray version differs from the cluster head's. uv has no
# inline override syntax — only ``--override <file>`` — so we materialize a
# tiny constraints file at a fixed path on every node via
# ``ensure_actor_overrides_on_all_nodes``; the content is derived from the
# driver's ``ray.__version__`` at fan-out time so a future Curator ray bump
# doesn't need a code change here.
# The actor venv ``uv pip install`` needs overrides that pyproject's ``[tool.uv]``
# can't reach (Ray runs it in an empty cwd). uv has no inline override syntax —
# only ``--override <file>`` — so we materialize a constraints file at a fixed path
# on every node via ``ensure_actor_overrides_on_all_nodes``. It carries:
# * ``ray==<driver version>`` — ai-dynamo[vllm]'s [vllm] extra has a hard ray pin,
# but Ray refuses actor venvs whose ray differs from the cluster head's. Derived
# from the driver's ``ray.__version__`` so a future Curator ray bump needs no edit.
# * ``nixl-cu13`` dropped — ai-dynamo[vllm] pulls the CUDA-13 NIXL backend, whose
# eagerly-imported ``nixl_ep_cpp.so`` dlopens libcudart.so.13 (absent on this
# CUDA-12.9 image). The base image excludes it via pyproject, but that override
# doesn't reach this standalone install; re-apply it here so the cu12 backend wins.
_ACTOR_VENV_OVERRIDES_PATH = Path(tempfile.gettempdir()) / "nemo_curator_dynamo_actor_overrides.txt"
_ACTOR_VENV_NIXL_CU13_EXCLUSION = "nixl-cu13 ; sys_platform == 'never'"


def _vllm_cu129_index_url() -> str | None:
"""The vLLM cu129 wheel index for the exact version ai-dynamo[vllm] pins.

ai-dynamo's [vllm] extra pins an exact vllm (e.g. ``==0.22.1``) that may
differ from Curator's base vllm — the base installs ai-dynamo WITHOUT its
[vllm] extra, so its vllm comes from Curator's own pin, while the actor
venv installs ``ai-dynamo[vllm]`` and must honor ai-dynamo's pin. vLLM
publishes a per-version cu129 wheel index at ``wheels.vllm.ai/<v>/cu129``;
pointing at the pinned version means its ``+cu129`` local build sorts above
the default cu130 wheel under unsafe-best-match. Derived from ai-dynamo's
own metadata so a nightly bump (which changes the vllm pin) needs no edit.

Returns None if ai-dynamo (or its vllm pin) can't be found — only happens
when the dynamo backend isn't actually installed, where this is unused.
"""
try:
requirements = importlib.metadata.requires("ai-dynamo") or []
except importlib.metadata.PackageNotFoundError:
return None
for raw in requirements:
req = Requirement(raw)
if req.name == "vllm":
pinned = next((spec.version for spec in req.specifier if spec.operator in ("==", "===")), None)
if pinned:
return f"https://wheels.vllm.ai/{pinned}/cu129"
return None


# Ray builds the actor venv with a bare ``uv pip install`` in an empty cwd, so it
# inherits none of the project's ``[tool.uv]`` index/source/prerelease config — only
# what we pass here. The venv is cloned from the base image, so flags matter only when
# uv fetches or upgrades a dep (e.g. ai-dynamo[vllm] pins a newer vllm than the base).
# Force CUDA 12.9 the way vLLM documents for uv:
# * ``--torch-backend cu129`` routes the torch ecosystem to the cu129 PyTorch index.
# * ``unsafe-best-match`` is REQUIRED so nixl resolves — its needed version is split
# across pypi.nvidia.com and PyPI, which the default first-match strategy can't
# combine. The catch: best-match maximizes version globally and ``+cu130`` sorts
# above ``+cu129`` (PEP 440 local order), so it would grab the cu130 vllm whose
# ``vllm._C`` dlopens libcudart.so.13. We counter that by adding the cu129 wheel
# index for the EXACT vllm version ai-dynamo pins, so ``<v>+cu129`` is the highest
# local build of that version and wins. (No fixed version baked in — see above.)
# Extra wheel indexes for the actor venv install; the vLLM cu129 index is left out
# when no exact vllm pin could be derived (helper returned None).
_ACTOR_VENV_EXTRA_INDEX_URLS = tuple(
    filter(
        lambda candidate: candidate is not None,
        ("https://pypi.nvidia.com", _vllm_cu129_index_url()),
    )
)
# Flag/value pairs passed to ``uv pip install``, grouped one pair per line.
_ACTOR_VENV_UV_OPTIONS = [
    *("--override", str(_ACTOR_VENV_OVERRIDES_PATH)),
    *("--torch-backend", "cu129"),
    *("--index-strategy", "unsafe-best-match"),
    *("--prerelease", "if-necessary-or-explicit"),
]
# One ``--extra-index-url <url>`` pair per extra index, in declaration order.
for _extra_index_url in _ACTOR_VENV_EXTRA_INDEX_URLS:
    _ACTOR_VENV_UV_OPTIONS.extend(("--extra-index-url", _extra_index_url))

DYNAMO_VLLM_RUNTIME_ENV: dict[str, Any] = {
"uv": {
"packages": ["ai-dynamo[vllm]"],
"uv_pip_install_options": ["--override", str(_ACTOR_VENV_OVERRIDES_PATH)],
"uv_pip_install_options": _ACTOR_VENV_UV_OPTIONS,
},
"config": {"setup_timeout_seconds": 600},
}
Expand All @@ -78,7 +143,8 @@ def ensure_actor_overrides_on_all_nodes(*, ignore_head_node: bool = False) -> No

The file pins ``ray=={ray.__version__}`` (read from the driver) so the
actor venv keeps the same ray patch as the cluster head — Ray rejects
any mismatch.
any mismatch — and drops ``nixl-cu13`` so the cu12 NIXL backend is used
(see module comment on :data:`_ACTOR_VENV_OVERRIDES_PATH`).

Must run inside an active Ray context, before any worker spawned with
:data:`DYNAMO_VLLM_RUNTIME_ENV` lands. The runtime_env_agent on each
Expand All @@ -91,7 +157,7 @@ def ensure_actor_overrides_on_all_nodes(*, ignore_head_node: bool = False) -> No
run_on_each_node(
_write_actor_overrides_file,
str(_ACTOR_VENV_OVERRIDES_PATH),
f"ray=={ray.__version__}\n",
f"ray=={ray.__version__}\n{_ACTOR_VENV_NIXL_CU13_EXCLUSION}\n",
ignore_head_node=ignore_head_node,
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,12 @@ def weakly_connected_components(self, df: cudf.DataFrame, src_col: str, dst_col:
edge_type_array=None,
num_arrays=1,
store_transposed=False,
symmetrize=False,
# The dedup edge list (from buckets_to_edges) is one-directional while
# graph_properties declares is_symmetric=True. cugraph 26.08 honors symmetrize
# literally, so symmetrize=False left reverse edges absent and dropped borderline
# undirected connections (missed duplicate groups). Symmetrize to match the
# declared undirected graph; drop_multi_edges=True dedupes any now-duplicate edges.
symmetrize=True,
do_expensive_check=False,
drop_multi_edges=True,
)
Expand Down
8 changes: 6 additions & 2 deletions nemo_curator/stages/deduplication/fuzzy/lsh/lsh.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,15 @@

import cudf
from loguru import logger
from rapidsmpf.utils.cudf import pylibcudf_to_cudf_dataframe

from nemo_curator.stages.deduplication.fuzzy.utils import CURATOR_DEFAULT_MINHASH_FIELD, CURATOR_LSH_BUCKET_FIELD
from nemo_curator.stages.deduplication.id_generator import CURATOR_DEDUP_ID_STR
from nemo_curator.stages.deduplication.shuffle_utils.rapidsmpf_shuffler import BulkRapidsMPFShuffler

# rapidsmpf 26.08 removed rapidsmpf.utils.cudf; pylibcudf_to_cudf_dataframe is re-exported by the shuffler module.
from nemo_curator.stages.deduplication.shuffle_utils.rapidsmpf_shuffler import (
BulkRapidsMPFShuffler,
pylibcudf_to_cudf_dataframe,
)

if TYPE_CHECKING:
from collections.abc import Iterator
Expand Down
14 changes: 11 additions & 3 deletions nemo_curator/stages/deduplication/semantic/kmeans.py
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,10 @@ def _process_batch_single_pass(self, tasks: list[FileGroupTask], groups: list[li

# Fit the model cooperatively across actors, then predict on local data
concatenated_embeddings = cp.concatenate(embeddings_arrays, axis=0)
self.kmeans._fit(concatenated_embeddings, sample_weight=None, convert_dtype=False, multigpu=True)
# cuml 26.08: the cooperative multi-GPU fit is KMeansMG.fit() — the private
# _fit(multigpu=True) entrypoint was removed. KMeansMG carries _multi_gpu=True,
# so fit() dispatches to the multi-GPU C++ impl using the RAFT comms handle.
self.kmeans.fit(concatenated_embeddings, sample_weight=None, convert_dtype=False)

if self.cache_path is not None and getattr(self, "_actor_index", 0) == 0:
os.makedirs(self.cache_path, exist_ok=True)
Expand Down Expand Up @@ -350,7 +353,8 @@ def _fit_pass(self, groups: list[list[str]]) -> float:
f"(fit_data_fraction={fraction:.4f}, {len(fit_files)}/{len(all_files)} files)"
)

self.kmeans._fit(concatenated_samples, sample_weight=None, convert_dtype=False, multigpu=True)
# cuml 26.08: cooperative multi-GPU fit via KMeansMG.fit() (see setup()).
self.kmeans.fit(concatenated_samples, sample_weight=None, convert_dtype=False)
del concatenated_samples
gc.collect()
# Stop the fit-time clock before centroid I/O so the metric isn't skewed
Expand Down Expand Up @@ -425,7 +429,11 @@ def _predict_write_pass(
return results, pass2_read_time, total_rows

def setup(self, _: WorkerMetadata | None = None) -> None:
from cuml.cluster.kmeans import KMeans as cumlKMeans
# cuml 26.08: the single-GPU KMeans no longer accepts a ``handle`` and the
# private multi-GPU ``_fit(multigpu=True)`` entrypoint was removed. The
# multi-node/multi-GPU estimator is KMeansMG, constructed with the RAFT comms
# handle; its fit() does the cooperative fit (see _process_batch_single_pass).
from cuml.cluster.kmeans_mg import KMeansMG as cumlKMeans

if not hasattr(self, "_raft_handle"):
msg = "RAFT handle not found. Make sure the stage is initialized with RAFT"
Expand Down
Loading
Loading