Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 16 additions & 2 deletions nemo_curator/core/serve/dynamo/backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,11 @@ def _deploy_and_healthcheck(self, server: InferenceServer, backend_cfg: DynamoSe
self._nats_actor = self._start_nats(nats_port)
nats_url = f"nats://{self._infra_ip}:{nats_port}"

base_env = {"ETCD_ENDPOINTS": etcd_endpoint, "NATS_SERVER": nats_url}
base_env = {
**backend_cfg.subprocess_env,
"ETCD_ENDPOINTS": etcd_endpoint,
"NATS_SERVER": nats_url,
}

effective_router_mode, effective_router_kv_events = self._resolve_effective_router(
self._models, backend_cfg.router
Expand Down Expand Up @@ -440,6 +444,16 @@ def _start_nats(self, port: int) -> ManagedSubprocess:
# Frontend
# ------------------------------------------------------------------

def _frontend_router_kwargs(self, router_kwargs: dict[str, Any]) -> dict[str, Any]:
"""Pass ``router_kwargs`` through verbatim as frontend CLI flags.

Callers must set ``dyn_chat_processor`` explicitly when needed — for
example, Nemotron-Parse requires ``dyn_chat_processor="vllm"`` because
the native-Rust processor serializes multimodal content arrays instead
of flattening them, corrupting pass-through chat templates.
"""
return dict(router_kwargs)

def _launch_frontend( # noqa: PLR0913
self,
port: int,
Expand Down Expand Up @@ -489,7 +503,7 @@ def _launch_frontend( # noqa: PLR0913
python_args.extend(["--router-mode", router_mode])
if router_mode == "kv":
python_args.append("--router-kv-events" if router_kv_events else "--no-router-kv-events")
python_args.extend(engine_kwargs_to_cli_flags(router.router_kwargs))
python_args.extend(engine_kwargs_to_cli_flags(self._frontend_router_kwargs(router.router_kwargs)))

logger.info(f"Starting Dynamo frontend on port {port}")
return ManagedSubprocess.spawn(
Expand Down
1 change: 1 addition & 0 deletions nemo_curator/core/serve/dynamo/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -118,3 +118,4 @@ class DynamoServerConfig(BaseServerConfig):
request_plane: str = DEFAULT_DYNAMO_REQUEST_PLANE
event_plane: str = DEFAULT_DYNAMO_EVENT_PLANE
router: DynamoRouterConfig = field(default_factory=DynamoRouterConfig)
subprocess_env: dict[str, str] = field(default_factory=dict)
4 changes: 4 additions & 0 deletions nemo_curator/core/serve/dynamo/infra.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,13 +109,17 @@ def engine_kwargs_to_cli_flags(engine_kwargs: dict[str, Any]) -> list[str]:

Example: ``{"tensor_parallel_size": 4, "enforce_eager": True}``
becomes ``["--tensor-parallel-size", "4", "--enforce-eager"]``.
Boolean ``False`` values are emitted as ``--no-<flag>`` for vLLM
``BooleanOptionalAction`` arguments.
"""
flags: list[str] = []
for key, value in engine_kwargs.items():
flag = "--" + key.replace("_", "-")
if isinstance(value, bool):
if value:
flags.append(flag)
else:
flags.append("--no-" + key.replace("_", "-"))
Comment on lines 118 to +122

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 --no-<flag> emitted for all False booleans, not only BooleanOptionalAction ones

The docstring now states this is for vLLM BooleanOptionalAction arguments, but the code applies the rule to every bool value that is False in any kwargs dict — including dynamo_kwargs, router_kwargs, and future users of this helper. vLLM (and Dynamo) have many flags that use store_true and have no --no-<flag> form; passing an unrecognised --no-* flag will cause the subprocess to exit with an argument-parsing error. There is no guard, validator, or documented list of which flags are safe to set to False.

elif isinstance(value, (dict, list)):
flags.extend([flag, json.dumps(value)])
else:
Expand Down
42 changes: 22 additions & 20 deletions nemo_curator/core/serve/dynamo/vllm.py
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,11 @@ def aggregated_model_uses_exact_kv_events(
return router_kv_events


def explicit_hybrid_kv_cache_manager_enabled(engine_kwargs: dict[str, Any]) -> bool:
    """Report whether vLLM should receive ``--no-disable-hybrid-kv-cache-manager``.

    Args:
        engine_kwargs: Engine keyword arguments for a model.

    Returns:
        ``True`` only when ``disable_hybrid_kv_cache_manager`` is present and is
        the literal ``False``. A missing key, ``None``, ``True`` or a falsy
        non-bool such as ``0`` all yield ``False``.
    """
    hma_disabled = engine_kwargs.get("disable_hybrid_kv_cache_manager")
    # Identity check on purpose: only an explicit ``False`` counts as opting in.
    return hma_disabled is False


def build_worker_kv_events_config(
model_config: DynamoVLLMModelConfig,
*,
Expand All @@ -189,13 +194,7 @@ def build_worker_kv_events_config(
port_seed: int,
enabled: bool,
) -> str:
"""JSON blob for ``--kv-events-config``.

Always passed explicitly. Without this, Dynamo's ``args.py`` auto-creates
a ``KVEventsConfig`` bound to ``tcp://*:20080`` when ``prefix_caching`` is
enabled (vLLM >=0.16 default), causing every worker on the same node to
fight over the same port.
"""
"""JSON blob for ``--kv-events-config`` when Curator chooses to pass one."""
template = dict(model_config.kv_events_config)

if not enabled:
Expand Down Expand Up @@ -321,13 +320,17 @@ def _launch_vllm_worker( # noqa: PLR0913
kv_events_enabled = is_rank_zero and aggregated_model_uses_exact_kv_events(
model_config, router_mode=router_mode, router_kv_events=router_kv_events
)
kv_events_config = build_worker_kv_events_config(
model_config,
pg=pg,
bundle_index=node_rank,
port_seed=20080 + replica_index + node_rank,
enabled=kv_events_enabled,
)
kv_events_config = None
# vLLM treats any non-None kv_events_config as incompatible with an explicitly
# enabled hybrid KV cache manager, even when enable_kv_cache_events=False.
if kv_events_enabled or not explicit_hybrid_kv_cache_manager_enabled(model_config.engine_kwargs):

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need this same check in `_launch_disagg_role()` as well?

kv_events_config = build_worker_kv_events_config(
model_config,
pg=pg,
bundle_index=node_rank,
port_seed=20080 + replica_index + node_rank,
enabled=kv_events_enabled,
)

python_args: list[str] = [
"-m",
Expand All @@ -351,7 +354,8 @@ def _launch_vllm_worker( # noqa: PLR0913
else:
python_args.append("--headless")

python_args += ["--kv-events-config", kv_events_config]
if kv_events_config is not None:
python_args += ["--kv-events-config", kv_events_config]

if spec.is_multi_node:
assert master_addr is not None, "master_addr must be set for multi-node replicas" # noqa: S101
Expand Down Expand Up @@ -492,11 +496,9 @@ def _launch_disagg_role( # noqa: PLR0913
# Global-enough seed so concurrent workers on one node don't collide.
nixl_port = get_free_port_in_bundle(pg, 0, _DISAGG_NIXL_PORT_SEED + worker_index)

# Always pass an explicit ``--kv-events-config``. Decode workers set
# ``enable_kv_cache_events=False`` — without the flag, Dynamo's
# args.py auto-creates a KVEventsConfig bound to ``tcp://*:20080``
# when ``prefix_caching`` is enabled (vLLM >=0.16 default), causing
# every decode worker on the same node to fight over that port.
# Disaggregated workers pass an explicit ``--kv-events-config`` so
# prefill workers can publish events and decode workers can explicitly
# stay non-publishing.
kv_events_config = build_worker_kv_events_config(
model_config,
pg=pg,
Expand Down
71 changes: 71 additions & 0 deletions tests/core/serve/dynamo/test_backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,60 @@ def _make_backend(backend_cfg: DynamoServerConfig) -> DynamoBackend:
backend._infra_pg = object()
return backend

@staticmethod
def _make_multimodal_backend(backend_cfg: DynamoServerConfig) -> DynamoBackend:
    """Build a ``DynamoBackend`` whose only model is a multimodal Nemotron-Parse config.

    Args:
        backend_cfg: Server configuration used as the ``InferenceServer`` backend.

    Returns:
        A backend with ``_runtime_dir``, ``_actor_name_prefix`` and ``_infra_pg``
        pre-populated with placeholder values.
    """
    multimodal_model = DynamoVLLMModelConfig(
        model_identifier="nvidia/NVIDIA-Nemotron-Parse-v1.2",
        dynamo_kwargs={"enable_multimodal": True},
    )
    backend = DynamoBackend(InferenceServer(models=[multimodal_model], backend=backend_cfg))

    # Placeholder private state set directly on the instance for the tests.
    backend._runtime_dir = "/tmp/rt"  # noqa: S108
    backend._actor_name_prefix = "prefix"
    backend._infra_pg = object()
    return backend

def test_multimodal_without_explicit_processor_uses_native(self, captured_spawn: list[dict[str, Any]]) -> None:
    """A multimodal model with default router config gets no ``--dyn-chat-processor`` flag."""
    cfg = DynamoServerConfig()
    multimodal_backend = self._make_multimodal_backend(cfg)

    multimodal_backend._launch_frontend(port=9999, base_env={}, backend_cfg=cfg)

    # Only the first recorded spawn call is inspected.
    frontend_args = captured_spawn[0]["python_args"]
    assert "--dyn-chat-processor" not in frontend_args

def test_explicit_vllm_chat_processor_is_forwarded(self, captured_spawn: list[dict[str, Any]]) -> None:
    """Explicit ``router_kwargs`` entries show up as frontend CLI flags."""
    explicit_router_kwargs = {
        "dyn_chat_processor": "vllm",
        "chat_template_content_format": "string",
        "trust_remote_code": True,
    }
    cfg = DynamoServerConfig(router=DynamoRouterConfig(router_kwargs=explicit_router_kwargs))
    multimodal_backend = self._make_multimodal_backend(cfg)

    multimodal_backend._launch_frontend(port=9999, base_env={}, backend_cfg=cfg)

    frontend_args = captured_spawn[0]["python_args"]

    def value_after(flag: str) -> str:
        # The value of a valued flag is the argument right after the flag.
        return frontend_args[frontend_args.index(flag) + 1]

    assert value_after("--dyn-chat-processor") == "vllm"
    assert value_after("--chat-template-content-format") == "string"
    assert "--trust-remote-code" in frontend_args

def test_text_only_model_keeps_native_chat_processor(self, captured_spawn: list[dict[str, Any]]) -> None:
    """With default config and the standard test backend, no ``--dyn-chat-processor`` is emitted."""
    cfg = DynamoServerConfig()
    text_backend = self._make_backend(cfg)

    text_backend._launch_frontend(port=9999, base_env={}, backend_cfg=cfg)

    frontend_args = captured_spawn[0]["python_args"]
    assert "--dyn-chat-processor" not in frontend_args

def test_router_flags_and_router_kwargs_passthrough(self, captured_spawn: list[dict[str, Any]]) -> None:
"""``PYTHONHASHSEED=0`` is pinned when ``router-mode`` is set: Dynamo KV
routing relies on a stable prefix-hash across frontend + worker processes."""
Expand Down Expand Up @@ -273,6 +327,23 @@ def test_no_router_mode_omits_flag_and_hashseed(self, captured_spawn: list[dict[
]
assert captured_spawn[0]["subprocess_env"] == {}

def test_tcp_request_plane_and_timeout_env_passthrough(self, captured_spawn: list[dict[str, Any]]) -> None:
    """``request_plane="tcp"`` becomes a CLI flag and ``base_env`` reaches the subprocess unchanged."""
    cfg = DynamoServerConfig(request_plane="tcp")
    backend = self._make_backend(cfg)
    frontend_env = {"ETCD_ENDPOINTS": "e", "DYN_TCP_REQUEST_TIMEOUT": "180"}

    backend._launch_frontend(port=9999, base_env=frontend_env, backend_cfg=cfg)

    spawn_call = captured_spawn[0]
    frontend_args = spawn_call["python_args"]
    plane_value = frontend_args[frontend_args.index("--request-plane") + 1]
    assert plane_value == "tcp"
    # The environment handed to the subprocess must equal what was passed in.
    assert spawn_call["subprocess_env"] == {
        "ETCD_ENDPOINTS": "e",
        "DYN_TCP_REQUEST_TIMEOUT": "180",
    }

def test_kv_mode_without_events_emits_no_router_kv_events(self, captured_spawn: list[dict[str, Any]]) -> None:
backend_cfg = DynamoServerConfig(router=DynamoRouterConfig(mode="kv", kv_events=False))
backend = self._make_backend(backend_cfg)
Expand Down
6 changes: 5 additions & 1 deletion tests/core/serve/dynamo/test_infra.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,12 @@ def test_dynamo_endpoint(namespace: str, component: str, role: str | None, expec
("engine_kwargs", "expected"),
[
({}, []),
({"enforce_eager": False}, []),
({"enforce_eager": False}, ["--no-enforce-eager"]),
({"enforce_eager": True}, ["--enforce-eager"]),
(
{"disable_hybrid_kv_cache_manager": False},
["--no-disable-hybrid-kv-cache-manager"],
),
(
{"tensor_parallel_size": 4, "max_model_len": 8192},
["--tensor-parallel-size", "4", "--max-model-len", "8192"],
Expand Down
18 changes: 14 additions & 4 deletions tests/core/serve/dynamo/test_vllm.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,18 @@ def test_single_node_disables_kv_events_by_default(self, captured_spawn: list[di
assert "--headless" not in python_args
assert "--nnodes" not in python_args

def test_explicit_hma_omits_disabled_kv_events_config(self, captured_spawn: list[dict[str, Any]]) -> None:
    """``disable_hybrid_kv_cache_manager=False`` yields the ``--no-`` flag and no ``--kv-events-config``."""
    hma_engine_kwargs = {"disable_hybrid_kv_cache_manager": False}
    model_cfg = DynamoVLLMModelConfig(
        model_identifier="Qwen/Qwen3-0.6B",
        engine_kwargs=hma_engine_kwargs,
        num_replicas=1,
    )

    self._launch(model_cfg, topology=_SINGLE_NODE_1GPU)

    worker_args = captured_spawn[0]["python_args"]
    assert "--no-disable-hybrid-kv-cache-manager" in worker_args
    assert "--kv-events-config" not in worker_args

def test_kv_router_enables_exact_kv_events(self, captured_spawn: list[dict[str, Any]]) -> None:
mc = DynamoVLLMModelConfig(model_identifier="Qwen/Qwen3-0.6B", num_replicas=1)
self._launch(mc, topology=_SINGLE_NODE_1GPU, router_mode="kv", router_kv_events=True)
Expand Down Expand Up @@ -251,10 +263,8 @@ def test_decode_and_prefill_workers_launched(self, captured_spawn: list[dict[str
assert "VLLM_NIXL_SIDE_CHANNEL_PORT" in call["subprocess_env"]
assert call["subprocess_env"]["PYTHONHASHSEED"] == "0"

# Every disagg worker always receives ``--kv-events-config`` — even
# decode, which sets ``enable_kv_cache_events=False`` — so Dynamo's
# args.py does not auto-bind port 20080 and cause per-node
# collisions between decode workers.
# Every disagg worker receives ``--kv-events-config``: prefill publishes
# events, while decode explicitly stays non-publishing.
decode_args = captured_spawn[0]["python_args"]
prefill_args = captured_spawn[1]["python_args"]

Expand Down
Loading