[pull] master from ray-project:master#493

Open
pull[bot] wants to merge 5228 commits into
Marveliu:masterfrom
ray-project:master
Conversation

@pull pull Bot commented May 28, 2025

See Commits and Changes for more details.


Created by pull[bot] (v2.0.0-alpha.1)

Can you help keep this open source service alive? 💖 Please sponsor : )

@pull pull Bot added the ⤵️ pull label May 28, 2025
XuQianJin-Stars and others added 29 commits May 14, 2026 15:25
… tasks (#63291)

## Why are these changes needed?

Python's asyncio documentation explicitly warns that the event loop only
holds **weak** references to tasks created via `asyncio.create_task` /
`loop.create_task`. A task without any other strong reference can be
garbage collected at **any** time — even before it finishes:

> Important: Save a reference to the result of this function, to avoid a
task disappearing mid-execution. The event loop only keeps weak
references to tasks. A task that isn't referenced elsewhere may be
garbage collected at any time, even before it's done.
>
> —
https://docs.python.org/3/library/asyncio-task.html#asyncio.create_task

Three call sites in the repository were violating this contract (caught
by ruff's `RUF006` rule). All three create background tasks whose return
value is discarded, so the resulting `Task` object is only referenced by
the event loop's weak set and may be GC'd at any moment:

- **`python/ray/_private/async_utils.py`** — `enable_monitor_loop_lag()`
The event-loop lag monitor task could be silently collected, stopping
lag monitoring without warning.

-
**`python/ray/train/v2/_internal/execution/checkpoint/checkpoint_manager.py`**
— `CheckpointManager._notify()`
The notify task wakes up coroutines waiting on `self._condition`. If
it's collected before `notify_all()` runs, listeners may wait forever.

- **`python/ray/data/_internal/planner/plan_udf_map_op.py`** —
`_generate_transform_fn_for_async_map().<locals>._execute_transform()`
The `_reorder` background task is responsible for forwarding completed
results from `completed_tasks_queue` into the output queue in
deterministic order. Losing this task to GC would silently break the
entire async map operation.

## What was changed

Each of the three sites now retains a strong reference to the created
task using the pattern recommended by the asyncio docs:

| File | Pattern |
|---|---|
| `async_utils.py` | Module-level `_BACKGROUND_TASKS: Set[asyncio.Task]`
+ `task.add_done_callback(_BACKGROUND_TASKS.discard)` |
| `checkpoint_manager.py` | Per-instance `self._background_tasks: set` +
`add_done_callback(self._background_tasks.discard)` |
| `plan_udf_map_op.py` | Local variable `reorder_task =
asyncio.create_task(_reorder())`, awaited in the `finally` block of
`_execute_transform` (also propagates unexpected exceptions) |

No public API change. No behavior change in the happy path — only
correctness under GC pressure.

## Related issues

N/A (silences the `RUF006` lint warnings for these three files).

## Checks

- [x] I've signed off every commit (DCO).
- [x] `ruff check --select RUF006` passes on the three modified files.
- [x] `python -m py_compile` passes for all three files.
- [ ] I've made sure the tests are passing.

---------

Signed-off-by: forwardxu <forwardxu@apache.org>
## Description
The OOM kill message re-computed the memory threshold at kill time via
`GetMemoryThreshold()`, which under `--enable-resource-isolation` could
read a different cgroup `memory.max` value than at init time (e.g. "max"
instead of digits), silently falling back to 0.95. Fix: have each
monitor pass the threshold it actually fired on through the callback
instead of letting NodeManager guess.

With this change, the OOM message should look something like this:

```
  Memory on the node (IP: 10.0.0.1, ID: abc123) was 7.20GB / 8.00GB (0.900000);
  OOM kill reason: Memory usage 7728742400B exceeded threshold of 6871947674B (85.9% of 8589934592B total);
  Object store memory usage: [...];
  Ray killed 1 worker(s) based on the killing policy: [...];
```

---------

Signed-off-by: You-Cheng Lin <mses010108@gmail.com>
Signed-off-by: You-Cheng Lin <106612301+owenowenisme@users.noreply.github.com>
…63322)

## Description

Closes #45302.

Ray 2.50.x fails to register any GPU resources on hosts with NVIDIA
Blackwell-class consumer GPUs (e.g. RTX 5090, driver 570.x):

```
TimeoutError: Placement group creation timed out. Make sure your cluster has enough resources.
Error: No available node types can fulfill resource request {'GPU': 1.0}
```

Two independent bugs interact:

**1. TPU false positive on `/dev/accel*`.** NVIDIA driver 570.x
(Blackwell) creates `/dev/accel/accel0` on the host.
`TPUAcceleratorManager.get_current_node_num_accelerators` uses
`glob.glob("/dev/accel*")` to detect TPU chips and reports `TPU == 1`,
which then steals the resource slot from the NVIDIA detector and `GPU`
is never registered.

Evidence chain on an RTX 5090 host (driver 570.211.01):

| Layer | Sees GPU? | Detail |
|-------|-----------|--------|
| `nvidia-smi` | ✅ | `NVIDIA GeForce RTX 5090, 32607 MiB, 570.211.01` |
| `torch.cuda` | ✅ | `device_count()=1, is_available()=True` |
| Ray `NvidiaGPUAcceleratorManager` (pynvml) | ✅ |
`get_current_node_num_accelerators()=1` |
| Ray `TPUAcceleratorManager` | **false-positive 1** |
`glob("/dev/accel*")` matches NVIDIA device file |
| `ray.cluster_resources()` | **no GPU** | `{'TPU': 1.0, 'CPU': 24.0,
...}` |

Fix: only count `/dev/accel*` as TPU chips when `TPU_ACCELERATOR_TYPE`
is set in the environment. Real TPU VMs (GCE / GKE) always set this env
var (the constant is already defined as
`GKE_TPU_ACCELERATOR_TYPE_ENV_VAR` at the top of `tpu.py`). The
`/dev/vfio/*` fallback for non-GKE TPU hosts is preserved.
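
The gating described above could be sketched as follows. This is an illustration under stated assumptions, not the code in `tpu.py`: the function name, the injectable `glob_fn`, and the placeholder accelerator type value are hypothetical, and the `/dev/vfio/*` fallback is omitted for brevity.

```python
import glob
import os
from typing import Callable, List, Mapping, Optional

# Env var named in the description above.
TPU_ACCELERATOR_TYPE_ENV_VAR = "TPU_ACCELERATOR_TYPE"


def count_accel_tpu_chips(
    env: Optional[Mapping[str, str]] = None,
    glob_fn: Callable[[str], List[str]] = glob.glob,
) -> int:
    """Count /dev/accel* entries as TPU chips only on hosts that declare a TPU.

    Hypothetical helper: without the env var, /dev/accel* entries (for
    example, ones created by an NVIDIA driver) are ignored.
    """
    env = os.environ if env is None else env
    if not env.get(TPU_ACCELERATOR_TYPE_ENV_VAR):
        return 0
    return len(glob_fn("/dev/accel*"))


# Simulated host where a non-TPU driver created a /dev/accel entry.
def fake_glob(pattern: str) -> List[str]:
    return ["/dev/accel"] if pattern == "/dev/accel*" else []


without_env = count_accel_tpu_chips(env={}, glob_fn=fake_glob)
# "v4-8" is a placeholder value for illustration only.
with_env = count_accel_tpu_chips(
    env={TPU_ACCELERATOR_TYPE_ENV_VAR: "v4-8"}, glob_fn=fake_glob
)
```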

**2. NVIDIA GPU name regex captures only `"G"` on consumer cards.**
`NVIDIA_GPU_NAME_PATTERN = re.compile(r"\w+\s+([A-Z0-9]+)")` was
designed for datacenter cards (`"Tesla V100-SXM2-16GB"` → `"V100"`,
`"NVIDIA A100-SXM4-40GB"` → `"A100"`). On a consumer card name like
`"NVIDIA GeForce RTX 5090"` the regex stops at the lowercase `e` in
`GeForce` and captures just `"G"`, producing a useless
`accelerator_type:G` label.

Fix: when the existing regex returns a result of length ≤1, fall back to
a hyphen-joined product name. `"NVIDIA GeForce RTX 5090"` →
`"GeForce-RTX-5090"`. The original `TODO(Alex)` comment noted this exact
concern — this PR addresses it without regressing the Tesla/datacenter
behavior.
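
To make the regex behavior and the fallback concrete, here is a small sketch. The pattern is the one quoted above; the function name and the exact way the fallback strips a leading `NVIDIA` token are assumptions made for illustration.

```python
import re
from typing import Optional

# Pattern quoted in the description above.
NVIDIA_GPU_NAME_PATTERN = re.compile(r"\w+\s+([A-Z0-9]+)")


def gpu_name_to_accelerator_type(name: Optional[str]) -> Optional[str]:
    """Map a GPU product name to an accelerator type label (illustrative)."""
    if not name:
        return None
    match = NVIDIA_GPU_NAME_PATTERN.match(name)
    short = match.group(1) if match else None
    if short and len(short) > 1:
        return short  # datacenter-style names, e.g. "V100", "A100"
    # Fallback: hyphen-join the product name, dropping a leading vendor token.
    parts = name.split()
    if parts and parts[0].upper() == "NVIDIA":
        parts = parts[1:]
    return "-".join(parts) if parts else None


# The regex alone stops at the lowercase "e" in "GeForce".
raw_capture = NVIDIA_GPU_NAME_PATTERN.match("NVIDIA GeForce RTX 5090").group(1)
```

With this sketch, datacenter names keep their short labels while consumer names fall through to the hyphen-joined form.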

### After the fix

```python
>>> ray.cluster_resources()
{'GPU': 1.0,
 'accelerator_type:GeForce-RTX-5090': 1.0,
 'CPU': 24.0,
 ...}
```

## Test plan

```
pytest python/ray/tests/accelerators/test_tpu.py \
       python/ray/tests/accelerators/test_nvidia_gpu.py
# 71 passed locally
```

New/updated cases:
- `test_autodetect_num_tpus_accel_ignored_without_tpu_env` — exercises
the NVIDIA-Blackwell false-positive scenario.
- `test_set_tpu_visible_ids_and_bounds` now sets `TPU_ACCELERATOR_TYPE`
inside its cleared env block (matches real TPU VMs).
- `test_gpu_name_to_accelerator_type` parametrized over `Tesla
V100-SXM2-16GB`, `Tesla K80`, `NVIDIA A100-SXM4-40GB`, `NVIDIA H100 80GB
HBM3`, `NVIDIA GeForce RTX 5090`, `NVIDIA GeForce RTX 4090`, `None`, and
`""`.

## Additional information

This problem will get more common as Blackwell-class hardware (consumer
RTX 5xxx, plus B200 datacenter cards which also ship with driver 570.x)
reaches more users. The same patch has already been validated end-to-end
in an Applied Intuition fork running ray==2.50.1; opening here so the
fix benefits everyone.

Signed-off-by: Micah <micah@applied.co>
…rces (#63306)

#59353 open-sourced the `AutoscalingCoordinator`, including a `math.ceil`
block on incoming resource bundles. The block traces back to a validator
that was added in #26626 to fix a *shape* bug (#23166). The literal `int`
came from an informal type annotation; it seems that neither v1
(KV → load_metrics → bin-packing) nor v2 (proto `map<string, double>`)
requires it.

Drop the value-type check in the SDK and remove the `math.ceil`
workaround in Ray Data.

---------

Signed-off-by: Lehui Liu <lehui@anyscale.com>
Message should say "exiting immediately..."

Message was wrong to say "existing immediately" per comments and
behavior.

Signed-off-by: George Gensure <werkt@users.noreply.github.com>
## Description
Attempt to fix Serve's flaky Windows tests.

Most failures raise the same exception from
`python/ray/_private/node.py`
([example](https://buildkite.com/ray-project/postmerge/builds/17496/canvas?jid=019e157b-bea4-4918-8075-86e9396a89e4&tab=output)):
```
The current node timed out during startup...
```

GCS client polls GCS once per second for `raylet_start_wait_time_s`
(default 30 s) and raises if the raylet hasn't registered. The retry log
line `Failed to get node info ... node registration may not be complete
yet before the timeout. Try increase the RAY_raylet_start_wait_time_s
config.` shows up consistently across failed tests.


https://github.com/ray-project/ray/blob/c01dcdefbefa123ba8766e1d90c00588f59026bb/src/ray/gcs_rpc_client/global_state_accessor.cc#L421-L436

The current `--parallelism-per-worker 3` command line argument spawns
three docker containers concurrently within one Windows VM. Each
container runs its own `ray.init()`, launching raylet + GCS + plasma +
dashboard + log-monitor + runtime-env-agent.

**Hypothesis**: Windows pays more per `ray.init()` because it has no
`fork()` (every helper re-imports via spawn + CreateProcess), DLL load
goes through AV scan, and the raylet binary is on a network-mounted
bazel runfiles tree. Linux absorbs 3-way contention in <30 s; Windows
doesn't.

Signed-off-by: Jeffrey Wang <jeffreywang@anyscale.com>
When user code breaks out of `iter_batches()` early (`break` in a
for-loop), the dataset was not marked as finished. The streaming
executor's worker thread kept producing blocks that piled up in the
object store, and the executor held resources that could starve other
datasets (e.g. a validation dataset waiting to run).

The inner `_ClosingIterator` only shuts down the executor through its
`__del__`, which is non-deterministic — generator references may linger,
leaving the executor alive long after iteration stops.

Add a `try/finally` around `yield from batch_iterator` in
`DataIterator._iter_batches`, calling a new
`_on_iteration_end(executor)` hook on `StopIteration`, `GeneratorExit`
(early break), and exceptions.

- The default implementation calls `executor.shutdown(force=False)`
(idempotent), covering the local `iter_batches()` path.
- `StreamSplitDataIterator` overrides it to fire
`client_disengaged(epoch, split_idx)` on the remote `SplitCoordinator`,
since its executor lives on the actor and the iterator returns
`executor=None`.

Both run synchronously on the **consumer's thread**, so cleanup happens
the moment the consumer stops pulling.
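
The `try/finally` around `yield from` can be illustrated with a plain generator. This is a minimal sketch: `FakeExecutor` and this standalone `iter_batches` are stand-ins invented for the example, not Ray's classes.

```python
from typing import Iterable, Iterator


class FakeExecutor:
    """Stand-in for a streaming executor; only counts shutdown calls."""

    def __init__(self) -> None:
        self.shutdown_calls = 0

    def shutdown(self, force: bool = False) -> None:
        self.shutdown_calls += 1


def iter_batches(batches: Iterable[int], executor: FakeExecutor) -> Iterator[int]:
    try:
        yield from batches
    finally:
        # Runs on normal exhaustion, on GeneratorExit (generator closed
        # after an early break), and when an exception propagates.
        executor.shutdown(force=False)


# Early break: closing the generator raises GeneratorExit at the yield,
# which triggers the finally block on the consumer's thread.
early = FakeExecutor()
it = iter_batches([1, 2, 3], early)
for batch in it:
    if batch == 2:
        break
it.close()  # explicit close keeps the example deterministic

# Full consumption: the finally block runs when the source is exhausted.
full = FakeExecutor()
consumed = list(iter_batches([1, 2, 3], full))
```

In CPython, a generator that is only referenced by the `for` statement is closed as soon as the loop exits; the explicit `close()` here just avoids relying on that.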

---------

Signed-off-by: Xinyuan <43737116+xinyuangui2@users.noreply.github.com>
Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>
#62897)

## Summary

When a user passes an fsspec `S3FileSystem` with Okta/STS/profile-based
auth (wrapped as `PyFileSystem(FSSpecHandler(s3fs_fs))`),
`_extract_credentials_from_filesystem` read only static attrs
(`key`/`secret`/`token`, `storage_options`). Those attrs are `None` for
session-backed s3fs — credentials live on `fs.session.get_credentials()`
and may rotate. The function returned `{}`, obstore's `from_url` got no
keys, and obstore silently fell back to its own credential chain (IMDS
on EC2). Under concurrent S3 workloads this manifested as intermittent
`NoCredentialsError` — users never saw a warning, and their Okta
credentials had been silently dropped.

This is the first of two independent PRs addressing related bugs in the
`download` expression pipeline. The matching threaded-path pre-resolve
fix (which avoids an IMDS thundering herd on the PyArrow fallback path,
and is triggered whenever this PR routes an unextractable filesystem to
threaded) is in a separate PR. Either can land first.

## Changes

- **`_extract_credentials_from_filesystem`** now returns
`Optional[Dict]`. `None` signals "route to threaded" so the user's
filesystem stays authoritative. Unrecognized non-`None` filesystems also
return `None` per the new contract.
- **`_frozen_s3fs_credentials`** (new helper) snapshots credentials via
`session.get_credentials().get_frozen_credentials()` for both sync
(botocore) and async (aiobotocore) sessions.
- **`_plan_obstore_routing(fs) -> (use_obstore, fs_kwargs)`** (new
helper) centralizes dispatch at plan time with a one-shot `WARNING` per
filesystem when credentials can't be extracted.
- **`plan_download_op`** wires the routing helper so both the partition
actor choice (`AsyncPartitionActor` vs `PartitionActor`) and the
download function choice (`download_bytes_async` vs
`download_bytes_threaded`) use the same gate.
- **`download_bytes_async`** extracts credentials once in sync context
and threads the kwargs through to `_download_uris_with_obstore`. Fixes a
latent correctness issue where aiobotocore's async `get_credentials`
can't run from inside an already-running event loop.
- **`AsyncPartitionActor`** and **`_download_uris_with_obstore`** both
fail closed with a clear `RuntimeError` when constructed / called with a
filesystem whose credentials can't be statically extracted. Dispatch
should route such filesystems to the PyArrow path; reaching these code
paths anyway indicates a dispatch bug and the guard prevents regressions
back to the silent-drop behavior.
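
A rough sketch of the snapshot-or-`None` contract for a synchronous botocore-style session follows. The function name, the returned dict keys, and the fake session classes are assumptions for illustration; the async (aiobotocore) case is not covered here.

```python
from collections import namedtuple
from typing import Any, Dict, Optional

# Mirrors the shape of botocore's frozen credentials (access_key, secret_key, token).
FrozenCreds = namedtuple("FrozenCreds", ["access_key", "secret_key", "token"])


def snapshot_session_credentials(session: Any) -> Optional[Dict[str, str]]:
    """Return a static credential snapshot, or None to signal 'route to threaded'.

    Hypothetical helper; the dict keys are illustrative, not obstore's API.
    """
    try:
        creds = session.get_credentials()
    except Exception:
        # Any resolution failure is treated as "cannot extract" in this sketch.
        return None
    if creds is None:
        return None
    frozen = creds.get_frozen_credentials()
    if not frozen.access_key:
        return None
    snapshot = {
        "access_key_id": frozen.access_key,
        "secret_access_key": frozen.secret_key,
    }
    if frozen.token:
        snapshot["session_token"] = frozen.token
    return snapshot


class _FakeCreds:
    def get_frozen_credentials(self) -> FrozenCreds:
        return FrozenCreds("AKIAEXAMPLE", "secret", "token123")


class _FakeSession:
    def __init__(self, creds: Optional[_FakeCreds]) -> None:
        self._creds = creds

    def get_credentials(self) -> Optional[_FakeCreds]:
        return self._creds


resolved = snapshot_session_credentials(_FakeSession(_FakeCreds()))
unresolved = snapshot_session_credentials(_FakeSession(None))
```

Returning `None` rather than `{}` lets the caller distinguish "no credentials needed" from "credentials exist but cannot be extracted".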

## Test plan

- [x] `pre-commit run` passes (ruff, pydoclint, black, docstyle,
semgrep, import order, mock-method / logger checks).
- [x] New unit tests in `test_obstore_download.py`:
- `TestSessionBackedFsspecCredentials` — sync botocore session,
aiobotocore async session, `_session` fallback for older s3fs,
unresolvable-session → `None`, no-access-key → `None`, `anon=True` skips
the session, static attrs win over session.
- `TestPlanObstoreRouting` — `None` filesystem → obstore, non-S3 fsspec
→ threaded, unextractable fsspec-S3 → threaded with warning, warning
dedup (same FS twice = one warning), extractable fsspec-S3 → obstore,
`AsyncPartitionActor` raises on unextractable creds,
`_download_uris_with_obstore` raises on unextractable creds.
- [ ] End-to-end against moto/minio with Okta-style fsspec
`S3FileSystem` over ~100 URIs (no `NoCredentialsError`, moto sees signed
requests with the session's current keys).

---------

Signed-off-by: xyuzh <xinyzng@gmail.com>
…63335)

Two small comment-only typos: `check that that you set a horizon on
your` becomes `check that you set a horizon on your` in
`rllib/evaluation/collectors/simple_list_collector.py` and
`rllib/evaluation/env_runner_v2.py`. No code change.

Signed-off-by: Mira Sato <275437409+oab24413gmai@users.noreply.github.com>
Co-authored-by: Mira Sato <275437409+oab24413gmai@users.noreply.github.com>
## Description
`linux://python/ray/tests:test_placement_group_3` has been flaking on
postmerge. The failing case is `test_placement_group_status` on the
autoscaler v1 path.

The test waits for autoscaler v1 to reflect a placement group's
reservation in ray status. The v1 monitor writes status to GCS internal
KV once per `AUTOSCALER_UPDATE_INTERVAL_S` = 5s tick.


The test only waited `AUTOSCALER_UPDATE_INTERVAL_S` seconds, which causes
two problems:

- `AUTOSCALER_UPDATE_INTERVAL_S` is the interval between ticks, not a
deadline. If the placement group lands just after a tick, the next fresh
status is up to a full interval away, so the test needs to wait at least
2 * `AUTOSCALER_UPDATE_INTERVAL_S`.
- Autoscaler v1 sleeps 5 s between ticks, but when the OS is busy the
actual sleep may be longer than 5 s. To be safe, the test now waits 3 *
`AUTOSCALER_UPDATE_INTERVAL_S`.
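
One way to express the timing argument above is a polling wait with a deadline of three intervals. This is a sketch, not the test's actual code: `wait_until` is a hypothetical helper, and the interval constant is redefined locally with the 5 s value stated above.

```python
import time
from typing import Callable

AUTOSCALER_UPDATE_INTERVAL_S = 5  # value stated in the description above


def wait_until(
    predicate: Callable[[], bool],
    timeout_s: float,
    poll_interval_s: float = 0.1,
) -> bool:
    """Poll `predicate` until it returns True or `timeout_s` elapses."""
    deadline = time.monotonic() + timeout_s
    while True:
        if predicate():
            return True
        if time.monotonic() >= deadline:
            return False
        time.sleep(poll_interval_s)


# One interval for a tick that was just missed, one for the next tick,
# and one more as slack for a busy machine.
status_timeout_s = 3 * AUTOSCALER_UPDATE_INTERVAL_S

# Tiny demo with short timeouts so it finishes quickly.
immediate = wait_until(lambda: True, timeout_s=0.1)
never = wait_until(lambda: False, timeout_s=0.05, poll_interval_s=0.01)
```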



Flaky tests (the following links are auto-generated by http://go/flaky):
- 3eb5a1d FLAKY [Buildkite :ray: core:
python tests
[g6_s10]](https://buildkite.com/ray-project/postmerge/builds/17471#019e0932-2837-4d11-a82e-8e9d6afcd0a7)
- 8f8cc68 FLAKY [Buildkite :ray: core:
python tests
[g6_s10]](https://buildkite.com/ray-project/postmerge/builds/17466#019e0857-c6d3-4aba-ab4e-d66559b37ca1)
- 35629a8 FLAKY [Buildkite :ray: core:
python tests
[g6_s10]](https://buildkite.com/ray-project/postmerge/builds/17445#019e00d4-11ad-4799-ba6c-1eb168d060bf)
- 86d83e9 FLAKY [Buildkite :ray: core:
python tests
[g6_s10]](https://buildkite.com/ray-project/postmerge/builds/17420#019dfe0d-71fa-4431-90cb-eb196ae2cfd0)
- d155399 FLAKY [Buildkite :ray: core:
python tests
[g6_s10]](https://buildkite.com/ray-project/postmerge/builds/17409#019dfa9c-6cfb-4f5d-92a6-dbe1eaa79cce)

Signed-off-by: yicheng <yicheng@anyscale.com>
Co-authored-by: yicheng <yicheng@anyscale.com>
## Description

Mirror of `custom_resources_per_env_runner`: lets users attach custom
Ray resource requirements to each Learner worker. Plumbed into
`learner_group.py`'s `resources_per_learner` dict so the custom
resources are claimed alongside CPU/GPU when Ray Train schedules
Learners.

---------

Signed-off-by: Artur Niederfahrenhorst <artur@anyscale.com>
Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
## Summary

Follow-up to #62907

`MultiAgentEpisode.get_extra_model_outputs(key=...)` no longer returns
the entire `extra_model_outputs` dict (or crashes) when the agent has
hanging extra model outputs at the requested env step.

When an agent sends an action with extra model outputs (e.g.,
`vf_preds`, `action_dist_inputs`) but hasn't received its next
observation yet, the outputs are cached in
`_hanging_extra_model_outputs_end` as a dict of all keys. When
`get_extra_model_outputs(key="vf_preds", env_steps=True)` is called,
`inf_lookback_buffer` is correctly indexed by the key to the specific
sub-buffer, but `hanging_val` (the cached dict) is **not indexed** — the
entire dict is passed as `_add_last_ts_value` to the buffer's `get()`
method.



The agent_steps path (`_get_data_by_agent_steps`) already does this
indexing at
[L2412-2413](https://github.com/ray-project/ray/blob/a157d4d6f298ec5b8cf022338a9b14fa0a132ec0/rllib/env/multi_agent_episode.py#L2412-L2413);
this PR applies the same fix to the two `env_steps=True` paths, which
manifest differently:

- **Non-finalized + list/slice indices**:
`_get_single_agent_data_by_env_step_indices` silently returns the whole
dict instead of the scalar, which is **data corruption without error**.
- **Finalized + list/slice indices**: crashes in `tree.map_structure` due
to a structure mismatch between dict and scalar.
- **Single int index**: `_get_single_agent_data_by_index` crashes with
`AttributeError: 'dict' object has no attribute 'lookback'`.



<details>
<summary>Before</summary>

```python
# Non-finalized: silent data corruption
episode.get_extra_model_outputs(key="vf_preds", indices=[-1, -2], env_steps=True)
# Returns: {"a0": [{"vf_preds": 0.7, "action_dist_inputs": 1.2}, ...]}
#          ^^^^^^ entire dict instead of scalar 0.7

# Finalized (after to_numpy()): crash
episode.to_numpy()
episode.get_extra_model_outputs(key="vf_preds", indices=[-1, -2], env_steps=True)
# ValueError: The two structures don't have the same nested structure.
# First structure: type=ndarray str=[0.5]
# Second structure: type=dict str={'vf_preds': 0.7, 'action_dist_inputs': 1.2}

# Single int: crash
episode.get_extra_model_outputs(key="vf_preds", indices=-1, env_steps=True)
# AttributeError: 'dict' object has no attribute 'lookback'
```

</details>

<details>
<summary>After</summary>

```python
episode.get_extra_model_outputs(key="vf_preds", indices=[-1, -2], env_steps=True)
# Returns: {"a0": [0.7, 0.5]}

episode.get_extra_model_outputs(key="vf_preds", indices=-1, env_steps=True)
# Returns: {"a0": 0.7}
```

</details>

<details>
<summary>Test results</summary>

```
$ python -m pytest test_multi_agent_episode.py -v -x

test_add_env_reset PASSED [  5%]
test_add_env_step PASSED [ 11%]
test_cut PASSED [ 16%]
test_get_actions PASSED [ 22%]
test_get_extra_model_outputs_hanging_val PASSED [ 27%]
test_get_infos PASSED [ 33%]
test_get_observations PASSED [ 38%]
test_get_return PASSED [ 44%]
test_get_rewards PASSED [ 50%]
test_get_sample_batch PASSED [ 55%]
test_get_state_and_from_state PASSED [ 61%]
test_init PASSED [ 66%]
test_len PASSED [ 72%]
test_other_getters PASSED [ 77%]
test_setters PASSED [ 83%]
test_slice PASSED [ 88%]
test_slice_with_lookback PASSED [ 94%]
test_multi_agent_episode_functionality PASSED [100%]

======================== 18 passed, 3 warnings in 5.08s ========================
```

</details>

## Test plan
- [x] Reproduced silent data corruption (non-finalized, list/slice
indices): returns entire dict instead of scalar
- [x] Reproduced crash (single int index): `AttributeError: 'dict'
object has no attribute 'lookback'`
- [x] Reproduced crash (finalized via `to_numpy()` + list indices):
`ValueError: The two structures don't have the same nested structure`
- [x] Verified fix returns correct scalar values for all index types
- [x] Added regression test `test_get_extra_model_outputs_hanging_val`
to `test_multi_agent_episode.py` covering list, slice, single int, and
agent_steps control paths
- [x] All 18 tests in `test_multi_agent_episode.py` pass

---------

Signed-off-by: Cursx <33718736+Cursx@users.noreply.github.com>
Co-authored-by: Mark Towers <mark.m.towers@gmail.com>
## Description
Run and clear the actor creation callback before actor recreation to
avoid a race condition.

A race condition occurs when an actor is successfully created on a
worker, but the worker dies before GCS completes the asynchronous write
of the actor's ALIVE state to storage. While waiting for the storage
write, GCS processes the worker death and clears the actor's address in
memory. When the storage write finally completes, its callback reads the
cleared (Nil) address and sends it to the client, causing a crash.

With the
[suggestion](#59642 (comment))
from @MengjinYan, in RestartActor, before clearing the actor's address
in memory, we check if the actor was already successfully created
(ALIVE) and has pending creation callbacks. If so, we invoke the
callbacks early with the valid address still in memory and the stored
`borrowed_refs`. This ensures the client receives a valid address and
avoids the crash.
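
The ordering fix can be modeled in a few lines of Python. This is a toy model of the idea only; the real logic lives in GCS (C++), and every name below is invented for illustration.

```python
from typing import Callable, List, Optional


class ActorRecord:
    """Toy in-memory actor state; not the GCS data structure."""

    def __init__(self) -> None:
        self.address: Optional[str] = None
        self.alive = False
        self.pending_callbacks: List[Callable[[Optional[str]], None]] = []


def flush_creation_callbacks(actor: ActorRecord) -> None:
    # Run and clear, so each callback fires at most once.
    callbacks, actor.pending_callbacks = actor.pending_callbacks, []
    for callback in callbacks:
        callback(actor.address)


def restart_actor(actor: ActorRecord) -> None:
    # The fix: if creation already succeeded, notify waiters while the
    # address is still valid, before it is cleared.
    if actor.alive and actor.pending_callbacks:
        flush_creation_callbacks(actor)
    actor.address = None
    actor.alive = False


def on_storage_write_done(actor: ActorRecord) -> None:
    # Late storage callback; finds nothing left to run after an early flush.
    flush_creation_callbacks(actor)


received: List[Optional[str]] = []
actor = ActorRecord()
actor.address, actor.alive = "10.0.0.1:1234", True
actor.pending_callbacks.append(received.append)

restart_actor(actor)          # worker death processed first
on_storage_write_done(actor)  # storage write completes afterwards
```

In this model the client only ever observes the valid address, never the cleared one.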

## Related issues
Fixes #59642

## Tested
- added a new unit test
- reproduce the issue and verify the fix

Signed-off-by: Yuchen Zhou <yczhou@google.com>
…e_replica` (#63280)

Signed-off-by: Jeffrey Wang <jeffreywang@anyscale.com>
Co-authored-by: Kourosh Hakhamaneshi <kourosh@anyscale.com>
…#63352)

Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Unify Serve test dependencies:

- Updating requirements_compiled.txt & requirements_compiled_py3.13.txt
to include serve-test-requirements.txt
- Removing redundant serve test requirements that exist in ray deps

---------

Signed-off-by: elliot-barn <elliot.barnwell@anyscale.com>
Adds a release test benchmark for `Dataset.mix()` (introduced in #63168)
that measures mixing throughput and ratio accuracy.

Benchmark design:
- Creates 8 datasets reading ImageNet parquet, each stamped with a
ds_index column
- Mixes with `Dataset.mix()`, then repartitions to 4 * batch_size rows per
block
- Consumes via `TorchTrainer` to mimic the weighting ratio seen when
ingesting multiple local batches that are split across workers.
- Tracks per-batch mixing ratios per worker, then aggregates across
workers via all_reduce to get the mean and standard deviation across
**global batches.**
- Asserts ratio mean is within 0.05 of target and std < 0.1
- Tests with and without a shuffling step after mixing with
`--num-workers=1`, to show that shuffling removes the dependency of
mixing quality on the number of workers. [See here for more
details.](https://docs.ray.io/en/master/data/mixing-data.html#random-mixing)
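
The ratio check in the list above can be sketched in single-process form. This is only an illustration: the helper names are hypothetical, the population standard deviation is an assumption, and the real benchmark aggregates across workers with all_reduce.

```python
from collections import Counter
from statistics import mean, pstdev
from typing import List, Sequence


def batch_ratios(ds_indices: Sequence[int], num_datasets: int) -> List[float]:
    """Fraction of rows in one batch that came from each source dataset."""
    counts = Counter(ds_indices)
    return [counts.get(i, 0) / len(ds_indices) for i in range(num_datasets)]


def mix_is_acceptable(
    batches: Sequence[Sequence[int]],
    target: Sequence[float],
    mean_tol: float = 0.05,
    std_max: float = 0.1,
) -> bool:
    """Check mean-within-tolerance and std-below-threshold per dataset."""
    per_batch = [batch_ratios(batch, len(target)) for batch in batches]
    for i, expected in enumerate(target):
        ratios = [row[i] for row in per_batch]
        if abs(mean(ratios) - expected) > mean_tol:
            return False
        if pstdev(ratios) >= std_max:
            return False
    return True


# Each inner list holds the ds_index column of one batch.
well_mixed = mix_is_acceptable([[0, 0, 1, 1], [0, 1, 0, 1]], target=[0.5, 0.5])
# Right mean overall, but every batch is drawn from a single dataset.
clumped = mix_is_acceptable([[0, 0, 0, 0], [1, 1, 1, 1]], target=[0.5, 0.5])
```

The second case shows why the std assertion matters: the mean alone would pass even though no batch is actually mixed.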

---------

Signed-off-by: Justin Yu <justinvyu@anyscale.com>
Co-authored-by: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
…heck_stopped() (#63339)

## Description
Fix a potential `UnboundLocalError` in
`ActorReplicaWrapper.check_stopped()`
(`python/ray/serve/_private/deployment_state.py`).

## Related issues
none

## Additional information
The fix is a single-line defensive initialization. It does not change
any behavior in the normal code path — `stopped` will still be correctly
set to `True` by either the `try` body or the `except ValueError`
handler when those paths execute successfully.

```python
def check_stopped(self) -> bool:
    """Check if the actor has exited."""
    stopped = False  # Ensure 'stopped' is always defined for finally/return
    try:
        handle = ray.get_actor(self._actor_name, namespace=SERVE_NAMESPACE)
        stopped = check_obj_ref_ready_nowait(self._graceful_shutdown_ref)
        ...
    except ValueError:
        stopped = True
    finally:
        if stopped and self._placement_group is not None:
            ...
    return stopped
```
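
For context, the failure mode the initialization guards against can be reproduced in isolation. The functions below are a simplified illustration, not the Serve code: if the `try` body raises something other than `ValueError` before assigning, the `finally` block reads an unbound local.

```python
from typing import Callable


def check_unsafe(probe: Callable[[], bool]) -> bool:
    try:
        stopped = probe()
    except ValueError:
        stopped = True
    finally:
        # If probe() raised a non-ValueError, `stopped` was never bound.
        if stopped:
            pass
    return stopped


def check_safe(probe: Callable[[], bool]) -> bool:
    stopped = False  # defensive initialization
    try:
        stopped = probe()
    except ValueError:
        stopped = True
    finally:
        if stopped:
            pass
    return stopped


def failing_probe() -> bool:
    raise RuntimeError("actor lookup failed")


def error_name(fn: Callable[[Callable[[], bool]], bool]) -> str:
    try:
        fn(failing_probe)
    except Exception as exc:  # capture the type for comparison
        return type(exc).__name__
    return "no error"


unsafe_error = error_name(check_unsafe)
safe_error = error_name(check_safe)
```

Without the initialization, the original `RuntimeError` is masked by an `UnboundLocalError` raised from `finally`; with it, the original error propagates.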

Signed-off-by: chenshi5012 <chenshi5012@163.com>
…3226)

## Description

In `python/ray/serve/schema.py`, the module imports `field` from
`dataclasses` at the top of the file:

    from dataclasses import dataclass, field

and uses it later as a default-factory helper:

    proxies: Dict[str, ProxyStatus] = field(default_factory=dict)
    applications: Dict[str, ApplicationStatusOverview] = field(default_factory=dict)

However, two places inside `DeploymentSchema` use `field` as the name of
a loop / comprehension variable:

- `validate_gang_scheduling_and_num_replicas` — `for field in
["min_replicas", "max_replicas", "initial_replicas"]:`
- `_get_user_configured_option_names` — `{field for field in
self.model_fields_set ...}`

This shadows the module-level `dataclasses.field` inside those function
scopes. It does not cause a runtime bug today (both usages are
self-contained), but it:

- triggers a `pyflakes` warning: *"import 'field' from line 4 shadowed
by loop variable"*,
- makes the code harder to read — the loop variable actually holds a
*field name string*, not a `dataclass` field object,
- is a latent footgun if anyone later adds a `field(...)` call inside
these functions.

This PR renames the two loop variables to `field_name`, which:

- removes the shadowing (and the pyflakes warning),
- more accurately describes what the variable holds,
- is a pure rename with **no behavior change**.
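
The "latent footgun" can be shown with a short standalone example. The function names here are made up for illustration and are not taken from `schema.py`.

```python
import dataclasses
from dataclasses import field


def shadowed():
    # The loop variable makes `field` a local name for the whole function.
    for field in ["min_replicas", "max_replicas", "initial_replicas"]:
        pass
    # `field` is now the string "initial_replicas", not dataclasses.field.
    return field(default_factory=dict)


def renamed():
    for field_name in ["min_replicas", "max_replicas", "initial_replicas"]:
        pass
    # The module-level import is untouched.
    return field(default_factory=dict)


try:
    shadowed()
    shadow_error = None
except TypeError as exc:
    shadow_error = exc

renamed_result = renamed()
```

Calling `field(...)` after the shadowing loop fails because a `str` is not callable, while the renamed version still reaches `dataclasses.field`.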

### Diff summary

Minimal: 6 insertions / 6 deletions in a single file.

    -                for field in ["min_replicas", "max_replicas", "initial_replicas"]:
    -                    val = autoscaling_config.get(field)
    +                for field_name in ["min_replicas", "max_replicas", "initial_replicas"]:
    +                    val = autoscaling_config.get(field_name)
                         if val is not None and val % gang_config.gang_size != 0:
                             raise ValueError(
    -                            f"autoscaling_config.{field} ({val}) must be a "
    +                            f"autoscaling_config.{field_name} ({val}) must be a "
                                 f"multiple of gang_size ({gang_config.gang_size})."
                             )

    -        return {
    -            field
    -            for field in self.model_fields_set
    -            if getattr(self, field) is not DEFAULT.VALUE
    -        }
    +        return {
    +            field_name
    +            for field_name in self.model_fields_set
    +            if getattr(self, field_name) is not DEFAULT.VALUE
    +        }

### Verification

- `python -m py_compile python/ray/serve/schema.py` ✅
- `python -m pyflakes python/ray/serve/schema.py` ✅ (no warnings)
- `ruff check python/ray/serve/schema.py` ✅ (All checks passed)
- `black --check python/ray/serve/schema.py` ✅ (would be left unchanged)

## Related issues

None — pure code hygiene / lint cleanup, no user-visible behavior
change.

## Additional information

- No public API changes.
- No tests added: this is a pure rename of two local loop variables;
existing tests cover the surrounding logic unchanged.
- No docs changes needed.

---

#### Checks

- [x] I've signed off every commit (DCO).
- [x] I've run the formatters and linters locally (`ruff`, `black`,
`pyflakes`, `py_compile`).
- [x] Diff is minimal and strictly a rename.
- [ ] Release tests — not applicable (no behavior change).
- [ ] Documentation — not applicable.

---------

Signed-off-by: forwardxu <forwardxu@apache.org>
Fixes the following issue:
1. When `label_selector` support was added in #58845,
[`FixedScalingPolicy`](https://github.com/ray-project/ray/blob/master/python/ray/train/v2/_internal/execution/scaling_policy/scaling_policy.py#L111-L137)
had no `resource_requests`; that was added later in #61703.
2. After that change, `FixedScalingPolicy` also sends an autoscaling
request, but without forwarding the label selectors, since the
`AutoscalingCoordinator` does not support a label selector field.
3. This caused an issue: the Ray autoscaler sees a bare unlabeled
`{"CPU": N}` demand alongside the **labeled** PlacementGroup demand,
which can cause it to scale up the wrong worker group. See the repro in
the additional information.

---------

Signed-off-by: Lehui Liu <lehui@anyscale.com>
…pattern (#62940)

## Description
See #61797

## Related issues
Closes #61797

Signed-off-by: win5923 <ken89@kimo.com>
)

Signed-off-by: Prass, the Nomadic coder <atemysemicolon@gmail.com>
Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
## Why

Turn on DataSource V2 by default for `ray.data.read_parquet` (and
related reads that go through the same path), so the stack that landed
in prior PRs is what users get unless they opt out via
`DataContext.get_current().use_datasource_v2 = False`. Keeping the
default flip in a dedicated change makes revert/revert-cherry-pick
straightforward.

## What

**Default:** `DEFAULT_USE_DATASOURCE_V2 = True` in
`python/ray/data/context.py`.

**Logical plan:** `ReadFiles` no longer uses a bespoke `InitVar` +
`input_op` pattern. It now takes `input_dependencies` like other
operators, which removes the custom `_apply_transform` /
`input_dependency` plumbing and simplifies every `replace(...)` that
used to thread `input_op` through (`read_operator.py`, `read_api.py`,
`limit_pushdown.py`, and unit tests). A `per_block_limit` field is
declared on `ReadFiles` (always `None` here) so inherited `AbstractMap`
machinery can resolve it; V2 limits still go through
`scanner.push_limit` in `LimitPushdownRule`. The read stats metadata key
is aligned with the operator name (`"ReadFiles"`).

**Tests and docs for V2-by-default:** Several tests still assumed V1
behavior or patterns that V2 does not support the same way. Updates are
grouped below. The `read_parquet` docstring example uses chained
`.filter(expr=...)` instead of deprecated `filter=`.

### Test changes (by reason)

**V2 default + deprecated read API**

- `test_parquet.py` — `test_parquet_read_partitioned_with_filter`: stop
`read_parquet(..., filter=pds.field(...))`; use
`read_parquet(...).filter(expr=col(...) == lit(...))`.
- `test_parquet.py` — `test_count_with_filter`: same
(`.filter(expr=col(...) < lit(...))`).
- `test_predicate_pushdown.py` — `test_filter_pushdown_source_and_op`:
remove `read_parquet(..., filter=pc.greater(...))`; chain
`.filter(expr=col("sepal.length") > lit(5.0))` before the
string-expression filter; drop unused `pyarrow.compute` import.

**`include_row_hash` + column projection (V2 ordering)**

- `test_parquet.py` — `test_include_row_hash_with_column_projection` and
`test_include_row_hash_existing_column_with_projection`: replace
`read_parquet(..., columns=[...], include_row_hash=True)` with
`read_parquet(..., include_row_hash=True).select_columns([...,
"row_hash"])` so `row_hash` remains in the schema after projection.

**Empty input path (V2 fails fast)**

- `test_streaming_executor.py` —
`test_execution_callbacks_executor_arg`: write a one-row Parquet file
under the input directory after `makedirs`. V2 raises "no files found"
for an empty directory; V1 could return an empty dataset.

**Nested ARROW-5030 fallback assertion (worker vs driver)**

- `test_parquet.py` —
`test_read_parquet_nested_fallback_triggered_when_filter_references_nested_column`:
remove `patch` + `assert_called()` on
`_get_safe_batch_size_for_nested_types` (called in a Ray worker, so a
driver-side patch never fires). Assert with `_resolve_read_columns(...)`
and `_needs_nested_type_fallback(fragment, read_columns)` on a real
fragment from the fixture.

**Logical `ReadFiles` constructor / graph shape**

- `test_read_files_logical.py`: construct `ReadFiles` with
`input_dependencies=[list_files_op]` instead of
`input_op=list_files_op`.
- `test_read_parquet_v2.py` —
`test_read_parquet_builds_list_files_read_files_chain`: assert
`dag.input_dependencies[0]` is `ListFiles` instead of
`dag.input_dependency`.

## Performance tuning for the V2 reader

Several knobs in the V2 Parquet read path defaulted to conservative
values that left throughput on the table for typical S3+Parquet
workloads — the `read_large_parquet_*` release benchmark in particular
saw underutilized CPU traceable to a mix of sequential fragment reads,
small per-fragment batch readahead, and a tiny per-stream buffer. Bumped
each default and gated every change behind an env var so the prior
behavior is recoverable for memory-sensitive workloads (e.g. wide tensor
columns).

**Concurrent fragment reads** (`file_reader.py`)

- New `RAY_DATA_READ_FILES_NUM_THREADS` env var, default `4`. When `>
1`, `_read_fragment_batches` reads fragments concurrently via
`make_async_gen` instead of the previous serial loop.
- Worker count is capped at runtime by `min(env_default,
len(fragments))` so single-fragment tasks don't spin up extra threads.
- Falls back to the sequential path when
`DataContext.execution_options.preserve_order` is set.
- Uses `preserve_ordering=True` so block ordering is deterministic
across task retries (required for safe block reconstruction).
- Per-worker loop extracted as `_read_fragments_sequential` so the
threaded and sequential paths share the same fragment-read body.
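
The worker cap and the ordered fan-out described above can be sketched with the standard library alone. This is a rough illustration, not the code in `file_reader.py`: it uses `ThreadPoolExecutor` in place of `make_async_gen`, and `read_fragments`, `read_fragment`, and the `preserve_order` parameter are hypothetical stand-ins.

```python
import os
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, Iterator, List, Sequence, TypeVar

F = TypeVar("F")
B = TypeVar("B")

# Assumption: mirrors the env var named above, with its default of 4.
DEFAULT_NUM_THREADS = int(os.environ.get("RAY_DATA_READ_FILES_NUM_THREADS", "4"))


def read_fragments(
    fragments: Sequence[F],
    read_fragment: Callable[[F], List[B]],
    num_threads: int = DEFAULT_NUM_THREADS,
    preserve_order: bool = False,
) -> Iterator[B]:
    """Yield the batches of every fragment, in fragment order."""
    # Cap the workers so a single-fragment task never starts extra threads.
    num_workers = min(num_threads, len(fragments))
    if preserve_order or num_workers <= 1:
        # Sequential path: one fragment at a time.
        for fragment in fragments:
            yield from read_fragment(fragment)
        return
    with ThreadPoolExecutor(max_workers=num_workers) as pool:
        # Executor.map overlaps the reads but returns results in input
        # order, so the output is deterministic across retries.
        for batches in pool.map(read_fragment, fragments):
            yield from batches
```

Unlike a streaming generator, this sketch materializes each fragment's batches as a list before yielding them, which trades memory for simplicity.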

**Batch readahead** (`file_reader.py`)

- `_ARROW_SCANNER_BATCH_READAHEAD` is now
`env_integer("RAY_DATA_ARROW_SCANNER_BATCH_READAHEAD", 8)` (was
hardcoded `1`). The previous value was set to bound peak memory on jumbo
tensor columns; 8 keeps decode pipelined for non-tensor workloads while
remaining tunable down via env var.

**Parquet fragment buffer size** (`parquet_file_reader.py`)

- New `RAY_DATA_PARQUET_FRAGMENT_BUFFER_SIZE` env var, default `8 MiB`.
Passed as `ParquetFragmentScanOptions.buffer_size` so
`use_buffered_stream=True` issues meaningfully sized range requests
against S3 instead of pyarrow's small default (~8 KiB).

## Testing

Ran the updated Parquet, predicate pushdown, streaming executor, and V2
read unit tests in `.venv` (including nested fallback and row-hash cases
touched above). Performance defaults are exercised by the existing V2
read tests; release benchmarks (`read_large_parquet_fixed_size` /
`_autoscaling`) verify the wall-clock improvement on the full S3
dataset.

---------

Signed-off-by: Goutam <goutam@anyscale.com>
Signed-off-by: goutam <goutam@anyscale.com>
Co-authored-by: Goutam V. <>
Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…DER_KEY) (#63362)

Signed-off-by: Kourosh Hakhamaneshi <kourosh@anyscale.com>
## Description
Changes to fix constructor replica failure tests for both gang
scheduling and normal tests:
Gang Scheduling:
- Added `FailedGangReplicaStore`, which uses `gang_id` instead of
`replica_id`. The old `set_if_first` / `get` API is replaced with
`mark_first_failing_gang` and `mark_retry_failing_gang`, which let each
test precisely control which gangs fail and which succeed.
- Rewrote `test_partial_constructor_failure` to actually test consistent
failure.
- Updated `test_transient_constructor_failure` and
`test_startup_failure_stops_entire_gang` to use the new `gang_id`-based
API.

Normal constructor failure tests:
- Rewrote `FailedReplicaStore` to remove `replica_id` entirely. The old
`set_if_first` / `get` API is replaced with a single atomic helper,
`should_fail`, to prevent race conditions due to concurrent replica
calls.
- The store also takes a boolean `fail_first` argument in its
constructor to differentiate between partial and transient constructor
failure tests and raises accordingly.
- Rewrote `test_partial_constructor_failure` and
`test_transient_constructor_failure` tests to pass the appropriate
argument when calling the store.

## Related issues
Fixes #62829

---------

Signed-off-by: Vaishnavi Panchavati <vaishdho10@gmail.com>
## Description

The Ray Data progress bar shows CPU, GPU, and object store memory usage,
but not logical memory — even though `ExecutionResources` already tracks
it and operators already report it via `current_logical_usage()`. This
makes it hard to monitor memory-constrained workloads. This PR adds
logical memory to both the global resource status line and per-operator
usage strings, displayed conditionally when usage is non-zero (matching
the existing GPU pattern).

## Related issues

None.

## Additional information

The display order is: CPU, memory, GPU, object store. Memory is only
shown when usage > 0, consistent with how GPU is handled.

Example global status line:
```
Active & requested resources: 4/8 CPU, 2.0GiB/8.0GiB memory, 1/2 GPU, 1.0GiB/2.0GiB object store
```

---------

Signed-off-by: Balaji Veeramani <bveeramani@berkeley.edu>
## Description
When debugging Ray Data issues, knowing which `RAY_DATA_*` environment
variables are set is essential since they override default behavior.
This adds a debug log of all `RAY_DATA_*` env vars (plus
`RAY_DEFAULT_OBJECT_STORE_MEMORY_PROPORTION`) at the start of every
`StreamingExecutor.execute()` call.

## Related issues
None.

## Additional information
The utility function `_log_ray_data_env_vars()` is guarded by
`logger.isEnabledFor(logging.DEBUG)` so there is zero overhead when
debug logging is disabled.
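
A minimal sketch of such a guarded helper follows. The function names and the log message format are illustrative assumptions, not the actual `_log_ray_data_env_vars()` implementation.

```python
import logging
import os
from typing import Dict, Mapping

logger = logging.getLogger(__name__)

# From the description above: this variable is logged in addition to
# everything carrying the RAY_DATA_ prefix.
_EXTRA_ENV_VARS = ("RAY_DEFAULT_OBJECT_STORE_MEMORY_PROPORTION",)


def collect_ray_data_env_vars(environ: Mapping[str, str]) -> Dict[str, str]:
    """Return the relevant env vars, sorted by name for stable output."""
    return {
        name: value
        for name, value in sorted(environ.items())
        if name.startswith("RAY_DATA_") or name in _EXTRA_ENV_VARS
    }


def log_ray_data_env_vars(environ: Mapping[str, str] = os.environ) -> None:
    # Check the level first so the environment is only scanned when the
    # message would actually be emitted.
    if not logger.isEnabledFor(logging.DEBUG):
        return
    logger.debug("Ray Data env vars: %s", collect_ray_data_env_vars(environ))
```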

---------

Signed-off-by: Balaji Veeramani <bveeramani@berkeley.edu>
## Description
Several Serve APIs have been marked `[EXPERIMENTAL]` or
`@PublicAPI(stability="alpha")` long after they were adopted in
production. This PR promotes them to stable so the public surface
matches actual usage.

Promoted to stable: 
- `target_capacity` (in `ServeDeploySchema`, `ServeStatus`,
`ServeInstanceDetails`)
- `num_replicas="auto"`
- `status_trigger` / `DeploymentStatusTrigger`
- `gRPCOptionsSchema` and `gRPCOptions`
- `max_queued_requests`
- `AutoscalingPolicy`, `AutoscalingContext` (and the default
`replica_queue_length_autoscaling_policy`)

## Related issues
Closes #62727


---------

Signed-off-by: win5923 <ken89@kimo.com>
Co-authored-by: Abrar Sheikh <abrar@anyscale.com>
harshit-anyscale and others added 30 commits June 2, 2026 23:27
…gle reload (#63623)

## Summary

Carved out of #63308 so the coalescing change can land independently of
the stderr-redirect (#63621) and redispatch (#63622) changes in that PR.

Under autoscaling churn the controller's `target_groups` broadcast fires
frequently — its replica set changes on essentially every reconcile that
adds or removes a replica — and each broadcast triggers its own config
regeneration and graceful reload via `-sf`. Both `target_groups` and
`fallback_targets` are emitted from the same control-loop step, so when
both change in one ~100 ms tick they reach the proxy microseconds apart
and (pre-coalescing) cause two back-to-back reloads.

Both broadcasts are already change-gated on the controller
(`broadcast_*_if_changed`), so this PR collapses genuine consecutive /
same-tick changes into one reload — it is not suppressing redundant
no-op broadcasts. (`fallback_targets` in particular is near-static in
steady state; it only changes when the fallback proxy's health flips or
its actor is replaced.)

This PR collapses adjacent broadcasts into a single reload:

- `HAProxyManager` keeps two pieces of state: `_update_pending: bool`
and `_coalesce_task: Optional[asyncio.Task]`.
- Each incoming broadcast sets `_update_pending = True` and arms (or
re-arms) a single sleeping coalesce task.
- The task sleeps for `RAY_SERVE_HAPROXY_BROADCAST_COALESCE_S` (default
100 ms; set to 0 to disable) and then runs one
`_update_haproxy_backends()` call against whatever the latest state is.
- Broadcasts arriving inside the window are absorbed — they flip the
dirty flag, but the task is already armed, so no second reload is
scheduled.

**Failure handling.** If the apply fails, the task re-arms itself and
retries on the next tick. After 3 consecutive failures it stops
re-arming and waits for the next broadcast to trigger fresh state — this
prevents busy-spinning against a persistent error (e.g. HAProxy crashed
and is stuck restarting) while still recovering automatically once a new
broadcast lands.

**Knob.** `RAY_SERVE_HAPROXY_BROADCAST_COALESCE_S` (env var, float
seconds). 100 ms by default — small enough that the worst-case extra
latency from a single broadcast is bounded, large enough to catch the
common back-to-back pattern observed during scale events. Set to `0` to
opt out entirely.
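
The dirty-flag pattern above can be sketched in plain `asyncio`. The class name `BroadcastCoalescer`, the `apply` callback, and the constructor parameters are hypothetical. The sketch also does not special-case a window of `0`, so it should be read as an illustration of the coalescing and retry logic rather than as `HAProxyManager` itself.

```python
import asyncio
from typing import Awaitable, Callable, Optional


class BroadcastCoalescer:
    """Collapse bursts of notifications into one apply() call per window."""

    def __init__(
        self,
        apply: Callable[[], Awaitable[None]],
        window_s: float = 0.1,
        max_failures: int = 3,
    ) -> None:
        self._apply = apply
        self._window_s = window_s
        self._max_failures = max_failures
        self._update_pending = False
        self._coalesce_task: Optional[asyncio.Task] = None

    def notify(self) -> None:
        """Mark state dirty and arm the task unless one is already armed."""
        self._update_pending = True
        if self._coalesce_task is None or self._coalesce_task.done():
            # Hold a strong reference so the task cannot be collected early.
            self._coalesce_task = asyncio.get_running_loop().create_task(self._run())

    async def _run(self) -> None:
        failures = 0
        while self._update_pending and failures < self._max_failures:
            await asyncio.sleep(self._window_s)
            # Clear the flag before applying: a notify() that arrives
            # during apply() sets it again and causes one more pass.
            self._update_pending = False
            try:
                await self._apply()
                failures = 0
            except Exception:
                # Stay dirty so the next tick retries, up to max_failures.
                self._update_pending = True
                failures += 1
```

After `max_failures` consecutive errors the loop exits with the flag still set, and the next `notify()` arms a fresh task with a reset failure count.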

---------

Co-authored-by: Claude Opus 4.7 <noreply@anthropic.com>
…ation, node-size clamp, and column pruning (#63809)

## Description

The hash-shuffle/join/aggregate operators size each aggregator's
`memory` reservation from the estimated input dataset size. That
estimate was both **missing** in some plans (causing under-reservation →
OOM) and **over-counted** in others (causing over-reservation →
unschedulable). This PR makes the estimate available, accurate, and
bounded.

### Triggering change

This was surfaced by **#63384 ([Data] Remove column renaming from the
read stage)**. Before that PR the read stage performed column renames
inline, so a TPCH plan looked like `ReadParquet → Join` with no
intervening op. #63384 moved renames out of the read into a separate
`Project` (of `AliasExpr`s) above the read, so the plan became
`ReadParquet → Project → Join`. That exposed a latent gap:
`Project.infer_metadata` returned all-`None`, so once a `Project` sat
between a read and a join/aggregate the size estimate disappeared and
the aggregator fell back to a fixed default reservation — which
under-provisioned the TPCH Q3 autoscaling release test and OOM-killed
it. The changes below fix that latent gap and the issues uncovered while
fixing it.

### 1. Propagate metadata through one-to-one ops (fixes join OOM)

With a rename `Project` now between the read and the join, the join's
input op was a `Project`, which inherited the base all-`None`
`infer_metadata`. So `_try_estimate_output_bytes` saw `size_bytes=None`
and the `HashShuffleAggregator` silently fell back to a fixed default
(`DEFAULT_HASH_SHUFFLE_AGGREGATOR_MEMORY_ALLOCATION`, 1 GiB/aggregator)
instead of a size-derived reservation (~5.3 GiB/aggregator for TPCH Q3
SF100). On an autoscaling cluster this under-provisions the aggregators
and the worker is OOM-killed mid-join (`ActorDiedError` / `SYSTEM_ERROR:
Worker connection closed unexpectedly`).

`AbstractOneToOne.infer_metadata` now propagates
`num_rows`/`size_bytes`/`input_files` from its single input, gated on
`can_modify_num_rows` (row-preserving ops like `Project` propagate;
`Filter`/`FlatMap` fall back to `None`). `Download` overrides this to
report `size_bytes=None`, since it appends blob columns and the input
size would be a misleading under-estimate.
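
As a rough sketch of the propagation rule, assuming a simplified `Metadata` record (the dataclass and both function names are hypothetical, and whether `Download` keeps the other fields is an assumption):

```python
from dataclasses import dataclass, replace
from typing import Optional, Tuple


@dataclass(frozen=True)
class Metadata:
    num_rows: Optional[int] = None
    size_bytes: Optional[int] = None
    input_files: Optional[Tuple[str, ...]] = None


def infer_one_to_one_metadata(input_meta: Metadata, can_modify_num_rows: bool) -> Metadata:
    """Propagate the input's metadata only for row-preserving operators."""
    if can_modify_num_rows:
        # Filter / FlatMap: the output size is unknown, so report nothing.
        return Metadata()
    # Project and similar operators: the input estimate still applies.
    return input_meta


def infer_download_metadata(input_meta: Metadata) -> Metadata:
    """Download appends blob columns, so the input size would under-estimate."""
    return replace(input_meta, size_bytes=None)
```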

### 2. Prune unused columns before hash-shuffle aggregations

An `Aggregate` only reads its group keys and each aggregation's target
column, but nothing pruned the rest before the shuffle — so wide unused
columns (e.g. a string column carried through by `with_column`) were
dragged through the aggregator, inflating both its memory reservation
and the bytes shuffled. `ProjectionPushdown` now inserts a pruning
`Project` below an `Aggregate` keeping only the consumed columns (keys +
aggregation targets); the existing fuse/push steps carry the narrowed
set into the read. It only fires when the input schema is known and has
extra columns (keeping the fixed-point optimizer idempotent), and leaves
generic `AggregateFn`s (unknown columns) untouched.
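
The pruning decision can be sketched as a pure function. The name `columns_to_keep` and the `has_opaque_aggregation` flag are hypothetical; the real rule operates on logical operators rather than on column-name lists.

```python
from typing import List, Optional, Sequence


def columns_to_keep(
    input_columns: Optional[Sequence[str]],
    group_keys: Sequence[str],
    agg_targets: Sequence[str],
    has_opaque_aggregation: bool = False,
) -> Optional[List[str]]:
    """Return the columns a pruning Project should keep, or None to skip."""
    if input_columns is None:
        return None  # Input schema unknown: do not guess.
    if has_opaque_aggregation:
        return None  # A generic aggregation may read any column.
    consumed = set(group_keys) | set(agg_targets)
    keep = [name for name in input_columns if name in consumed]
    if len(keep) == len(input_columns):
        # Nothing extra to drop. Returning None here means a second
        # optimizer pass makes no further change.
        return None
    return keep
```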

### 3. Clamp the aggregator reservation to the largest node (safety net)

With estimates now firing more often, a low-partition shuffle over a
large dataset (e.g. a global aggregation with `num_partitions=1`) could
reserve ~2× the whole dataset as a single aggregator's `memory` request
— exceeding any node and making the actor unschedulable
(`ActorUnschedulableError`). The per-aggregator reservation is now
clamped to the largest single node's memory: aggregators hold shuffle
data in the (spillable) object store, so reserving more heap `memory`
than a node has only makes the actor unschedulable, not faster. The
clamp only triggers when the estimate exceeds node capacity (well-sized
reservations are unaffected) and logs a warning when it engages.
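
A minimal sketch of the clamp, assuming the per-node memory capacities are already known as a list of byte counts (the function name and the warning text are illustrative):

```python
import logging
from typing import Sequence

logger = logging.getLogger(__name__)


def clamp_aggregator_memory(estimated_bytes: int, node_memory_bytes: Sequence[int]) -> int:
    """Cap a per-aggregator reservation at the largest node's memory."""
    if not node_memory_bytes:
        return estimated_bytes  # No cluster information: leave the estimate alone.
    largest_node = max(node_memory_bytes)
    if estimated_bytes <= largest_node:
        return estimated_bytes  # Well-sized reservations pass through unchanged.
    logger.warning(
        "Clamping aggregator memory from %d to %d bytes (largest node).",
        estimated_bytes,
        largest_node,
    )
    return largest_node
```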

## Verification

- Join estimate restored through a plan with a `Project` between the
read and the join (was `None` → 1 GiB fallback).
- Q6-shaped global aggregation: read is pruned to the consumed columns;
end-to-end result matches a pandas ground truth.
- Clamp engages only when the estimate exceeds node memory and the run
completes via spilling.
- Correctness validated across combos (filters, group-by,
multi-aggregation, post-shuffle join→aggregate, computed columns) by
comparing pruned vs unpruned pipelines.
- New parameterized tests in `test_projection_fusion.py` (aggregate
input pruning) and existing suites pass (`test_projection_fusion`,
`test_predicate_pushdown`, `test_execution_optimizer_basic`,
`test_infer_schema`, aggregation tests).

## Related issues

Surfaced by #63384 (column renames moved out of the read stage into a
`Project`).

🤖 Generated with [Claude Code](https://claude.com/claude-code)

---------

Signed-off-by: Goutam <goutam@anyscale.com>
Co-authored-by: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
## Description

Adds a `build.jobs.post_checkout` block to `.readthedocs.yaml` that
exits with code 183 on PR ("external") builds when the diff doesn't
touch any files that affect documentation output. Tag and branch builds
(master, stable, etc.) always run.

## Why

Today, every PR to `ray-project/ray` triggers a full Sphinx build on
Read the Docs, regardless of what the PR touches. A PR that only changes
C++, Java, Bazel, release tooling, or CI config produces the same docs
build as a PR that rewrites `doc/`.

When several PRs land in quick succession, this overflows the project's
concurrent build slots on RTD. RTD's `finish_inactive_builds` periodic
task then marks the older queued builds as `FAILED` with the message
*"This build was terminated due to inactivity"* (see
[readthedocs.org#4386](readthedocs/readthedocs.org#4386)
— the name is misleading; the build isn't inactive, it just couldn't get
a worker within ~1080 seconds). Contributors then see red checks on
their PRs that have nothing to do with their changes.

[Auto-cancellation of older builds for the same
PR](https://blog.readthedocs.com/cancel-old-builds/) has been on by
default in RTD since October 2022, but that only helps when you push
twice to one PR — not across multiple open PRs. This change complements
that by reducing the load from cross-PR concurrency.

## What the filter does

On every PR build, it computes the diff against `origin/master` and
exits 183 — RTD's [skip-build
sentinel](https://docs.readthedocs.com/platform/stable/guides/build/skip-build.html)
— only when **none** of the changed files match:

- `doc/` — Sphinx sources, BUILD files, doc-only requirements.
- `python/ray/` — anything `autodoc` could pull (kept intentionally
broad).
- `rllib/` — same.
- `.readthedocs.yaml` — this config itself, so changes to it always
test.

If even one file falls in the allow-list, the build runs as before. If
the diff can't be computed (shallow clone, fetch failure), the build
runs.
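
The actual filter is shell logic inside `.readthedocs.yaml`; the decision it makes can be sketched in Python as follows. The function names and the `is_pr_build` parameter are hypothetical.

```python
from typing import Iterable, Optional

ALLOWED_PREFIXES = ("doc/", "python/ray/", "rllib/")
ALLOWED_FILES = (".readthedocs.yaml",)
SKIP_EXIT_CODE = 183  # Read the Docs treats this exit code as "skip the build".


def affects_docs(path: str) -> bool:
    return path in ALLOWED_FILES or path.startswith(ALLOWED_PREFIXES)


def should_build(changed_files: Optional[Iterable[str]], is_pr_build: bool) -> bool:
    """Decide whether the docs build should run."""
    if not is_pr_build:
        return True  # Tag and branch builds always run.
    if changed_files is None:
        return True  # The diff could not be computed: build to be safe.
    # A single matching file is enough to require a build.
    return any(affects_docs(path) for path in changed_files)
```

A caller would exit with `SKIP_EXIT_CODE` when `should_build(...)` returns `False`.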

## What this skips in practice

PRs that don't touch any of the four paths above. Concrete examples:

- C++ / Cython changes under `src/`, `cpp/`.
- Java changes under `java/`.
- Bazel changes under `bazel/`, top-level `BUILD.bazel`, `WORKSPACE`.
- CI changes under `ci/`, `.buildkite/` (including
`.buildkite/doc.rayci.yml`, which isn't read by Sphinx and is left out
of the allow-list since it doesn't change RTD output).
- Release tooling under `release/`.
- Docker changes under `docker/`.
- Top-level scripts and configs.

## What still triggers a build

- All content under `doc/`.
- Any `python/ray/**` change, because `autodoc` could pick it up.
- Any `rllib/**` change, same reason.
- The RTD config itself.

The filter is deliberately conservative — broad inclusion, narrow
exclusion. Better to do an unnecessary build than to miss a doc change.

## Test plan

- Manual trace of the bash logic against representative paths: ✅ passes
(Ray serve, RLlib, doc content, `.readthedocs.yaml`, mixed PRs all
trigger BUILD; Java+C++, Docker, release, `python/setup.py`, empty diff
all SKIP).
- YAML syntax validated locally with `yaml.safe_load`.
- This PR itself touches `.readthedocs.yaml`, so it will exercise the
BUILD path on the RTD preview.

## Future work

If queue pressure persists after this lands, two further levers:

1. Narrow the `python/ray/**` allow-list to exclude paths that don't
appear in any `autodoc` directive (`python/ray/tests/`,
`python/ray/_private/` candidates). Higher maintenance cost.
2. File an RTD support request to ask about the project's
concurrent-build limit on the current plan.

## Related issues

None linked.

---------

Signed-off-by: Douglas Strodtman <douglas@anyscale.com>
Co-authored-by: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
## Why

`set_extra_model_outputs` currently asserts that the provided key
already exists before calling `InfiniteLookbackBuffer.set`. The
docstrings for `SingleAgentEpisode` and `MultiAgentEpisode` also said
the method could insert a new key, which made the behavior ambiguous.

This updates the docstrings to describe the current overwrite-only
behavior.

Closes #63217

## Tests

Not run. Documentation-only change.

Signed-off-by: GoparapukethaN <goparapukethan01@gmail.com>
## Summary

- `safe_round` in `ExecutionResources.__init__` was the largest leaf in
the scheduler thread (~8% of wall time in the worker_scaling_5000_tasks
release test); every
`add`/`subtract`/`max`/`min`/`copy`/`scale`/`zero`/`inf`/`for_limits`
paid four rounds per construction.
- Drop rounding from `__init__`; store values at native precision. The
boundary that feeds Ray Core (`to_resource_dict()`) now does the
quantization (5 digits for cpu/gpu, integer bytes for memory).
- `__eq__` / `__hash__` / `is_zero` / `is_non_negative` /
`satisfies_limit` quantize on access so accumulated float drift (~1e-15
per cpu/gpu op, up to ~1e-4 per memory op on byte-magnitude floats)
doesn't flip results.
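
The idea of storing native precision and quantizing only on comparison can be sketched with a small stand-in class. `Resources` is hypothetical and much smaller than `ExecutionResources`, and rounding memory to the nearest byte (rather than truncating) is an assumption.

```python
from typing import Dict, Tuple


class Resources:
    """Stores values unrounded; quantizes only when compared or exported."""

    __slots__ = ("cpu", "gpu", "memory")

    def __init__(self, cpu: float = 0.0, gpu: float = 0.0, memory: float = 0.0) -> None:
        # No rounding here: construction stays cheap on the hot path.
        self.cpu = cpu
        self.gpu = gpu
        self.memory = memory

    def _quantized(self) -> Tuple[float, float, int]:
        # 5 digits for cpu/gpu, whole bytes for memory.
        return (round(self.cpu, 5), round(self.gpu, 5), int(round(self.memory)))

    def __eq__(self, other: object) -> bool:
        if not isinstance(other, Resources):
            return NotImplemented
        return self._quantized() == other._quantized()

    def __hash__(self) -> int:
        return hash(self._quantized())

    def add(self, other: "Resources") -> "Resources":
        return Resources(self.cpu + other.cpu, self.gpu + other.gpu, self.memory + other.memory)

    def to_resource_dict(self) -> Dict[str, float]:
        cpu, gpu, memory = self._quantized()
        return {"CPU": cpu, "GPU": gpu, "memory": memory}
```

With this shape, `Resources(cpu=0.1).add(Resources(cpu=0.2))` compares equal to `Resources(cpu=0.3)` even though the stored float differs in its last bits.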

Microbenchmark (1M iterations of `add + subtract + max + min`): **22.9µs
→ 7.9µs per loop (~2.9×)**.

## Scheduling-loop benchmark

Scheduling-loop duration from the worker-scaling release test, master vs
this PR, across actor/task counts (seconds):

| Row Name | p50_scheduling_loop_duration_s | p90_scheduling_loop_duration_s | max_scheduling_loop_duration_s |
| :--- | :--- | :--- | :--- |
| master_500_actors | 0.612625923 | 0.665133368 | 0.722738731 |
| remove_safe_round_500_actors | 0.601569453 | 0.640638011 | 0.740870783 |
| master_500_tasks | 0.130088642 | 1.37959566 | 2.648603352 |
| remove_safe_round_500_tasks | 0.110020964 | 1.160841493 | 2.169771337 |
| master_1000_actors | 1.305331189 | 1.427004707 | 1.558304391 |
| remove_safe_round_1000_actors | 1.213747345 | 1.375785287 | 1.496900667 |
| master_1000_tasks | 0.196409489 | 2.933294435 | 5.940470687 |
| remove_safe_round_1000_tasks | 0.199019694 | 2.894881022 | 6.349671409 |
| master_2000_actors | 2.635618952 | 2.854903978 | 3.065606266 |
| remove_safe_round_2000_actors | 2.431840577 | 2.738552787 | 2.981410969 |
| master_2000_tasks | 6.467886639 | 6.96452463 | 15.59612749 |
| remove_safe_round_2000_tasks | 4.14211371 | 6.906086244 | 14.25746993 |
| master_5000_actors | 7.535207658 | 8.594098917 | 8.973420257 |
| remove_safe_round_5000_actors | 6.220295621 | 6.859902615 | 7.413166343 |
| master_5000_tasks | 11.25641787 | 17.33123665 | 32.68019135 |
| remove_safe_round_5000_tasks | 9.844983811 | 13.24368429 | 30.86245854 |

p50/p90 improve consistently at scale — e.g. 5000_actors p50 7.54s →
6.22s (~17%) and p90 8.59s → 6.86s (~20%); 2000_tasks p50 6.47s → 4.14s;
5000_tasks p90 17.33s → 13.24s.

---------

Signed-off-by: xgui <xgui@anyscale.com>
Co-authored-by: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Send a burst of n requests and ensure that n-1 are backpressured,
instead of relying on specific requests being backpressured.

---------

Signed-off-by: akyang-anyscale <alexyang@anyscale.com>
## Description
This PR improves `hash_partition` performance when the table contains
types that pandas can't handle, by moving the table-column retrieval out
of the loop.
Use the following script to verify:
```python
import time

import pyarrow as pa
from ray.data._internal.arrow_ops.transform_pyarrow import hash_partition

NUM_ROWS = 50_000_000

# `idx` is a plain integer column; `ints` is a nested list column.
idx = list(range(NUM_ROWS))
ints = [[i] for i in range(NUM_ROWS)]
t = pa.Table.from_pydict(
    {
        "idx": pa.array(idx),
        "ints": pa.array(ints),
    }
)

# Time a single hash_partition call over both columns.
start = time.perf_counter()
hash_partition(t, hash_cols=["idx", "ints"], num_partitions=10)
end = time.perf_counter()
print(end - start)
```
The test result is:

|CPU spec|Code Version|Time consumed|
| --- | --- | --- |
|Apple M4| original|66s|
|Apple M4| optimized|18s|

## Related issues
> Link related issues: "Closes #62550". Can't reopen the original PR
after a force push, so this PR recreates it.

---------

Signed-off-by: yifan.xie <xyfabcd@163.com>
Co-authored-by: You-Cheng Lin <106612301+owenowenisme@users.noreply.github.com>
… sorting (#61904)

This PR lets the Concat aggregation use Polars for its internal sort
implementation when Polars is available.

---------

Signed-off-by: yifan.xie <xyfabcd@163.com>
Co-authored-by: You-Cheng Lin <106612301+owenowenisme@users.noreply.github.com>
## Description

The old stack (for example the loss defined in our APPO TF policy) had
importance sampling metrics to track off-policyness.
This PR introduces the same two metrics to the new stack.

---------

Signed-off-by: Artur Niederfahrenhorst <artur@anyscale.com>
Signed-off-by: Artur Niederfahrenhorst <attaismyname@googlemail.com>
Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>
## Description
This PR updates Ray’s Starlette dependency to require starlette>=1.0.1
in order to address CVE-2026-48710.

Starlette versions prior to 1.0.1 are affected by a Host header
validation issue where malformed Host headers can cause request.url
reconstruction to differ from the actual routed request path. This can
impact path-based auth or middleware logic in ASGI applications.

Importantly, as a byproduct this required updating:
1. `fastapi>=0.133.0` as this is the first to support `starlette>=1.0.0`
2. `prometheus-fastapi-instrumentator>=8.0.0` which is imported through
`vllm`, as the first to support `starlette>=1.0.0`
3. `gradio>=6.x` as the first to support `starlette>=1.0.0`
(`gradio-client` updated to `2.5.0`)
4. `release/requirements_compile.txt` is updated

## Related PR
#63715

---------

Signed-off-by: Mark Towers <mark@anyscale.com>
Signed-off-by: pseudo-rnd-thoughts <mark.m.towers@gmail.com>
Co-authored-by: Mark Towers <mark@anyscale.com>
The `read_large_parquet` release test frequently OOMs.

This isn't totally unexpected -- Parquet reads allocate lots of memory
because of apache/arrow#39808, and the current
implementation of Ray Data can't guarantee memory safety unless you
specify the heap memory required for high-memory operations.

Since this is a case where we expect the system to need a hint, I've
updated the release test to specify 3.4 GiB of heap memory for reads.
This was the maximum heap usage per task I observed in the latest
release test run.

I've also re-introduced the ratchet that fails the release test on OOMs.
If the system upholds its guarantee that it doesn't OOM when you specify
the required heap memory, this shouldn't fail anymore.

Signed-off-by: Balaji Veeramani <bveeramani@berkeley.edu>
In #63490, I added a flag to
`DataContext` called `isolate_read_workers`.

In this PR, I'm adding documentation to `read_parquet` describing how to
use the flag.

---------

Signed-off-by: Balaji Veeramani <bveeramani@berkeley.edu>
Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>
…3825)

## Why

#63586 added a `DistributionTracker` to `Timer` for percentile tracking.
`DistributionTracker` is not JSON-serializable, which broke the
`training-ingest-benchmark` release test
(`image_classification.full_training`, s3_url + parquet):

```
TypeError: Object of type DistributionTracker is not JSON serializable
```

The benchmark checkpoints each `Timer` via `v.__dict__.copy()` +
`json.dump`, and `__dict__` now carries the tracker. The tracker's
`__getstate__`/`__setstate__` only cover **pickle**, not `json`, so they
don't help here.

## Fix

- Add `Timer.as_dict()` / `Timer.from_dict()` that round-trip only the
scalar fields (`_total`, `_min`, `_max`, `_total_count`).
`_distribution` is excluded — the KLL sketch isn't reconstructable from
summary stats and isn't meant to persist across checkpoints, so it
restarts empty on restore (same graceful degradation as when
`datasketches` isn't installed).
- Switch the benchmark runner to use these instead of poking `__dict__`.
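
A minimal sketch of the round-trip, using a stand-in `Timer` whose `_distribution` is a plain `object()` in place of the real `DistributionTracker` (the `add` method and the field defaults are assumptions made for illustration):

```python
import json
from typing import Any, Dict


class Timer:
    def __init__(self) -> None:
        self._total = 0.0
        self._min = float("inf")
        self._max = 0.0
        self._total_count = 0
        # Stand-in for the percentile tracker; not JSON-serializable.
        self._distribution = object()

    def add(self, value: float) -> None:
        self._total += value
        self._min = min(self._min, value)
        self._max = max(self._max, value)
        self._total_count += 1

    def as_dict(self) -> Dict[str, Any]:
        # Only scalar summary fields; the tracker is deliberately left out.
        return {
            "_total": self._total,
            "_min": self._min,
            "_max": self._max,
            "_total_count": self._total_count,
        }

    @classmethod
    def from_dict(cls, state: Dict[str, Any]) -> "Timer":
        timer = cls()  # Starts with a fresh, empty tracker.
        timer._total = state["_total"]
        timer._min = state["_min"]
        timer._max = state["_max"]
        timer._total_count = state["_total_count"]
        return timer
```

Calling `json.dumps(timer.__dict__)` on this stand-in raises the same kind of `TypeError`, while `json.dumps(timer.as_dict())` succeeds.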

🤖 Generated with [Claude Code](https://claude.com/claude-code)

---------

Signed-off-by: xgui <xgui@anyscale.com>
Co-authored-by: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
…put schema is known (#63813)

## Change
`drop_columns` reshapes into `self.select_columns(keep_cols)` when the
input op's `infer_schema()` returns a `pa.Schema`, keeping the typed
schema chain intact so `Dataset.schema()` resolves without a `limit(1)`
execution. Missing columns raise `KeyError` eagerly at the call site on
the typed path.

When the input schema is opaque (UDF chain, `PandasBlockSchema` source)
or all columns are dropped (avoids the internal __bsp_stub placeholder
that `select_columns([])` inserts), falls back to the existing
`MapBatches` path.
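
The branching above can be sketched as a small planning function over column names. `plan_drop_columns` is a hypothetical helper; returning `None` stands for "use the existing `MapBatches` fallback".

```python
from typing import List, Optional, Sequence


def plan_drop_columns(
    schema_names: Optional[Sequence[str]], cols: Sequence[str]
) -> Optional[List[str]]:
    """Return the columns to select, or None to take the fallback path."""
    if schema_names is None:
        return None  # Opaque input schema: cannot compute keep_cols.
    missing = [c for c in cols if c not in schema_names]
    if missing:
        # Typed path: fail at the call site rather than at execution time.
        raise KeyError(f"Columns not found in schema: {missing}")
    to_drop = set(cols)
    keep_cols = [name for name in schema_names if name not in to_drop]
    if not keep_cols:
        return None  # Dropping everything: avoid an empty select.
    return keep_cols
```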

---------

Signed-off-by: Goutam <goutam@anyscale.com>
Co-authored-by: Claude Opus 4.7 <noreply@anthropic.com>
…INARY_PATH (#63829)

## Why

`RAY_SERVE_EXPERIMENTAL_PIP_HAPROXY=1` is supposed to make Ray Serve use
the HAProxy binary bundled in the `ray-haproxy` PyPI package. In
practice it was a **no-op** whenever the runtime image sets
`RAY_SERVE_HAPROXY_BINARY_PATH` — which the Ray images do
(`docker/base-deps/Dockerfile` sets
`RAY_SERVE_HAPROXY_BINARY_PATH=/usr/local/bin/haproxy`).

`_resolve_haproxy_binary()` honored the explicit-path override *before*
trying the bundled wheel, so with the flag on it returned
`/usr/local/bin/haproxy` and never reached the `ray_haproxy` package. As
a result the `*.aws.pip_haproxy` nightly release variants
(`serve_controller_benchmark_haproxy`,
`pytest_serve_autoscaling_load_test`) silently benchmarked the image's
system build, not the pip binary they were meant to validate.

Confirmed from a `pip_haproxy` release run's proxy logs: every proxy
logged `Using HAProxy binary: /usr/local/bin/haproxy`; the wheel path
never appeared.

## What

- Resolve the bundled `ray-haproxy` binary **first**, so the flag takes
precedence over an image-baked `RAY_SERVE_HAPROXY_BINARY_PATH`.
- The explicit-path override and system-PATH fallback are unchanged, and
still apply when the `ray-haproxy` package is unavailable.
- Flag-off (default) behavior is unchanged: return
`RAY_SERVE_HAPROXY_BINARY_PATH` verbatim.
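
The new precedence can be sketched as follows. This is not `_resolve_haproxy_binary()` itself: the function signature, the injected `find_bundled` callback, and the parsing of the flag as the literal string `"1"` are assumptions made so the example is self-contained.

```python
import shutil
from typing import Callable, Mapping, Optional


def resolve_haproxy_binary(
    env: Mapping[str, str],
    find_bundled: Callable[[], Optional[str]],
    which: Callable[[str], Optional[str]] = shutil.which,
) -> Optional[str]:
    """Pick the HAProxy binary path using the ordering described above."""
    explicit = env.get("RAY_SERVE_HAPROXY_BINARY_PATH")
    if env.get("RAY_SERVE_EXPERIMENTAL_PIP_HAPROXY") != "1":
        # Flag off (default): use the configured path verbatim.
        return explicit
    # Flag on: the bundled wheel wins over an image-baked explicit path.
    bundled = find_bundled()
    if bundled:
        return bundled
    # Package unavailable: explicit override, then the system PATH.
    if explicit:
        return explicit
    return which("haproxy")
```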

No change needed in `release_tests.yaml`: the existing `pip_haproxy`
variants now actually exercise the pip binary.

## Testing

Existing `python/ray/serve/tests/unit/test_haproxy_binary.py` suite
passes against the new ordering (4 tests). Docstring updated to reflect
the resolution order.

---------

Signed-off-by: Seiji Eicher <seiji@anyscale.com>
## Why are these changes needed?

In the non-compiled DAG path, calling `DAGNode.execute()` executes
`FunctionNode.execute()`, which dynamically defines a new remote
function via `ray.remote(self._body)` on every execution. This exports
new metadata to the GCS KV store on every run, leading to an unbounded
memory leak (GCS KV leak) during the job lifetime (#63666).

Since the non-compiled DAG execution path is not recommended for
production and is a primary source of this leak, we are deprecating
`DAGNode.execute()` in favor of the compiled DAG API.

This PR adds a `DeprecationWarning` to `DAGNode.execute()` to warn users
that it is deprecated and will be removed in a future release.

## Related issue number
Closes #63666

---------

Signed-off-by: Vũ Trần Phúc <Vuphuccc@gmail.com>
Signed-off-by: Edward Oakes <ed.nmi.oakes@gmail.com>
Co-authored-by: Edward Oakes <ed.nmi.oakes@gmail.com>
…n` to reduce untracked buffered batches + reduce prefetch onto GPU (#63660)

- `iter_batches` uses `make_async_gen(ref_bundle_iterator,
num_workers=1)` to decouple the batching pipeline from the consumer
thread. With a single worker, the multi-worker machinery (filling
worker, per-worker input/output queues, round-robin draining) is
unnecessary — it just adds complexity and hidden buffering.
- With `buffer_size=prefetch_batches` (added in #58657),
`make_async_gen` creates an input queue (capacity `prefetch_batches +
1`) and an output queue (capacity `prefetch_batches`), buffering up to
`~2 * prefetch_batches` items that are invisible to the resource
manager's memory accounting.
- **BEHAVIOR CHANGE AFTER THIS PR**: The outer `make_async_gen` output
buffer was a queue of **GPU batches** with max size `prefetch_batches`.
This means that up to `prefetch_batches + 1` batches (including the
working batch) can sit in GPU memory. This happens implicitly and is not
good default behavior, since users expect their entire GPU memory to be
usable for model params, grads, optimizer states, and the current batch
and associated activations. Prefetching too many batches onto the GPU
can silently hurt GPU utilization and throughput by forcing users to
reduce their batch size. This PR bounds the number of prefetched GPU
batches to 1, which has parity with the current defaults. A follow-up PR
will introduce a configuration option to choose how many batches are
prefetched to the GPU at a time.

---------

Signed-off-by: Justin Yu <justinvyu@anyscale.com>
Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
…e `make_async_gen` with `iter_threaded` (#63682)

Replaces the inner format/collate `make_async_gen` with `iter_threaded`
from #63660, cutting the number of untracked batches pinned in object
store memory from ~16 to ~8 (2× reduction).

- `_format_in_threadpool` runs format + collate across a threadpool via
`make_async_gen(num_workers=min(4, prefetch_batches),
preserve_ordering=False)`. With the default `buffer_size=1`, this
allocates one shared input queue of size `(buffer_size + 1) *
num_workers` and `num_workers` per-worker output queues of size
`buffer_size` — for `num_workers=4`, that is **8 (input) + 4 (in-flight
in workers) + 4 (output) ≈ 16** batches buffered inside the threadpool,
none of which are visible to the resource manager.
- These buffered batches are pre-format `pa.Table.slice()` views that
pin their **full** source blocks in the object store (`pa.Table.slice`
is zero-copy and references the entire underlying buffer). They keep
blocks pinned in shared memory even after the distributed reference
counter considers them out of scope, which is the accounting decoupling
that contributes to streaming-split underestimation and spilling.
- Replace with `iter_threaded(..., num_workers=num_threadpool_workers,
output_buffer_size=num_threadpool_workers)` from #63660 (generalized in
this stack to take a required `fn` and `num_workers`). Workers share
`batch_iter` under a lock and funnel results through a single bounded
queue sized to match the worker count — enough depth to keep workers
from blocking on each other's `put()` when collate is non-trivial.
In-flight is now bounded to **~2 × num_workers ≈ 8** (workers + shared
output buffer) — roughly a 2× reduction in untracked pinned batches.
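
The shared-iterator design described in the last bullet can be sketched as follows. This is a minimal illustration, not Ray's `iter_threaded`: the function name `iter_threaded_sketch` and its signature are assumptions, output order is not preserved, and error propagation from `fn` is left out.

```python
import queue
import threading

_DONE = object()  # sentinel each worker emits when it exits


def iter_threaded_sketch(items, fn, num_workers, output_buffer_size):
    """Hypothetical sketch: workers pull from one shared iterator under a
    lock and push results into a single bounded output queue."""
    source = iter(items)
    lock = threading.Lock()
    out = queue.Queue(maxsize=output_buffer_size)

    def worker():
        try:
            while True:
                with lock:  # only one worker advances the iterator at a time
                    item = next(source, _DONE)
                if item is _DONE:
                    break
                out.put(fn(item))  # blocks when the output buffer is full
        finally:
            out.put(_DONE)  # always signal exit so the consumer cannot hang

    for _ in range(num_workers):
        threading.Thread(target=worker, daemon=True).start()

    finished = 0
    while finished < num_workers:
        result = out.get()
        if result is _DONE:
            finished += 1
        else:
            yield result
```

In this sketch each worker holds at most one item while the queue holds at most `output_buffer_size` results, so with `output_buffer_size=num_workers` the number of in-flight items stays at roughly `2 × num_workers`.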

---------

Signed-off-by: Justin Yu <justinvyu@anyscale.com>
Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
## Description
`hash_partition` previously did three expensive things in sequence
(N = `num_partition`, R = `num_rows`):
1. Built per-partition index arrays via `N × np.where(part_ids == p)` —
O(N · R) scans
2. Defragmented the input via `try_combine_chunked_columns(table)` — a
full-table copy
3. Ran `N` independent `table.take(indices[p])` calls

This change replaces all three with:
1. `pyarrow.compute.sort_indices(partition_ids)` — radix sort on
integers, one O(R) pass
2. One `take_table(table, sort_indices)` on the original (possibly
chunked) input
3. `N` zero-copy `Table.slice()` calls

The `N` takes together form a permutation of the table, so consolidating
them into one sort + N zero-copy slices is equivalent and strictly
cheaper (fixed take overhead paid once instead of N times). The defrag
copy can also be removed: the original Arrow problem
(apache/arrow#35126) is that every `take` on a chunked table internally
concatenates all chunks first, so `try_combine_chunked_columns` exists
to pay that concat once externally and let the subsequent N takes use
the fast path. By calling `take` only once, the internal concat happens
just once anyway — the external defrag becomes redundant. And because
the take output already arranges each partition's rows contiguously, we
can carve out the N partitions with zero-copy slices instead of
materializing a second copy — which would be another 1GB for a 1GB
input.
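
The equivalence of "N takes" and "one sort + one take + N slices" can be sketched with plain Python lists standing in for Arrow tables. The function name `hash_partition_sketch` is hypothetical. List slicing copies and `sorted` is a comparison sort, so this only illustrates the structure of the approach, not its cost profile.

```python
from itertools import accumulate


def hash_partition_sketch(rows, part_ids, num_partitions):
    """Hypothetical sketch: partition rows with one sort, one gather, N slices."""
    # 1. One stable sort of row indices by partition id.
    order = sorted(range(len(rows)), key=part_ids.__getitem__)
    # 2. One "take": gather rows so each partition's rows are contiguous.
    taken = [rows[i] for i in order]
    # 3. Per-partition row counts become slice boundaries.
    counts = [0] * num_partitions
    for p in part_ids:
        counts[p] += 1
    offsets = [0, *accumulate(counts)]
    return [taken[offsets[p]:offsets[p + 1]] for p in range(num_partitions)]


partitions = hash_partition_sketch(
    ["a", "b", "c", "d", "e"], [2, 0, 2, 1, 0], num_partitions=3
)
# partitions == [["b", "e"], ["d"], ["a", "c"]]
```
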
## Benchmark: 1GB block → 1000 partitions

Single thread, PyArrow 23.0.1. `K` = number of chunks in the input
table; `K=256` mirrors realistic multi-chunk input.

| Block shape | K | Time before | Time after | Speedup | Peak Arrow before | Peak Arrow after |
|---|---|---|---|---|---|---|
| 16M rows × 8 int64 | 1 | 6542 ms | **939 ms** | **6.96×** | 1024 MB (no copy needed) | 1152 MB |
| 16M rows × 8 int64 | 256 | 6887 ms | **1057 ms** | **6.51×** | 2048 MB | **1280 MB** |
| 8M rows × 16 int64 | 1 | 4818 ms | **825 ms** | **5.84×** | 1024 MB (no copy needed) | 1088 MB |
| 8M rows × 16 int64 | 256 | 5086 ms | **982 ms** | **5.18×** | 2048 MB | **1152 MB** |
| 2M rows × 64 int64 | 1 | 2468 ms | **322 ms** | **7.66×** | 1026 MB (no copy needed) | 1040 MB |
| 2M rows × 64 int64 | 256 | 2216 ms | **369 ms** | **6.01×** | 2050 MB | **1056 MB** |

- **Throughput**: 5–8× faster across all shapes.
- **Peak Arrow allocation** on chunked inputs (K=256): ~2.0 GB → ~1.1 GB
(~40% reduction) — the input no longer has to coexist with a
defragmented copy.


| | Before | After | Speedup |
|---|---|---|---|
| `aggregate_groups` (84 groups, mean) | 61 s | **40 s** | **1.53×** |

---------

Signed-off-by: You-Cheng Lin <mses010108@gmail.com>
## Description
Add index-based field access for struct namespace operations in Ray
Data.

### Changes
- Add `struct.field_by_index(field_index: int)` in
`python/ray/data/namespace_expressions/struct_namespace.py`
- Extend `struct.__getitem__` to accept both:
  - `str` (existing behavior): `col("s").struct["field"]`
  - `int` (new): `col("s").struct[0]`
- Keep type inference for Arrow-backed struct dtypes
- Add key-type validation for `.struct[...]`
- Update `Expr.struct` examples in `python/ray/data/expressions.py` to
show index-based usage
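
The `str`/`int` dispatch with key-type validation can be sketched in plain Python. `StructValueSketch` is a hypothetical stand-in, not the Ray Data struct namespace, and rejecting `bool` keys is an assumption made here because `bool` is a subclass of `int`.

```python
class StructValueSketch:
    """Hypothetical stand-in for a struct value with ordered, named fields."""

    def __init__(self, fields):
        self._fields = dict(fields)  # dicts keep insertion order

    def field_by_name(self, name):
        return self._fields[name]

    def field_by_index(self, field_index):
        names = list(self._fields)
        return self._fields[names[field_index]]

    def __getitem__(self, key):
        # Assumption: bool keys are rejected even though bool subclasses int.
        if isinstance(key, bool):
            raise TypeError("struct key must be str or int, got bool")
        if isinstance(key, str):
            return self.field_by_name(key)
        if isinstance(key, int):
            return self.field_by_index(key)
        raise TypeError(f"struct key must be str or int, got {type(key).__name__}")


s = StructValueSketch({"name": "ray", "size": 3})
first_by_index = s[0]      # same field as s["name"]
second_by_name = s["size"]
```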

## Related issues
Related to #58674

## Additional information
- `python -m pytest -v --doctest-modules
python/ray/data/namespace_expressions/struct_namespace.py`
- `python -m pytest -v
python/ray/data/tests/expressions/test_namespace_struct.py`
- `python -m pytest -v --doctest-modules python/ray/data/expressions.py
-k 'struct and Expr'`

---------

Signed-off-by: wanadzhar913 <adzhar.faiq@gmail.com>
…dicates (#63781)

## Description
Fixes a bug where applying a UDF predicate on top of a datasource read
results in a `TypeError` during the creation of a logical plan. This
exception occurs because the optimizer attempts to push down a predicate
which cannot be converted to a PyArrow expression.

The most straightforward solution is to check if the expression is
PyArrow convertible before performing the pushdown.
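
A minimal sketch of the "check before pushdown" guard, using hypothetical stand-in classes (`ColumnPredicate`, `UdfPredicate`) and a string in place of a real PyArrow expression. It is not the optimizer rule itself and only shows the control flow the fix describes.

```python
class ColumnPredicate:
    """Hypothetical predicate that maps onto a datasource-level filter."""

    def __init__(self, column, op, value):
        self.column, self.op, self.value = column, op, value


class UdfPredicate:
    """Hypothetical predicate wrapping an arbitrary Python callable."""

    def __init__(self, fn):
        self.fn = fn


def to_pushdown_filter(expr):
    # Stand-in for the conversion to a PyArrow expression.
    if isinstance(expr, UdfPredicate):
        raise TypeError("UDF predicates cannot be converted")
    return f"{expr.column} {expr.op} {expr.value!r}"


def is_convertible(expr):
    try:
        to_pushdown_filter(expr)
    except TypeError:
        return False
    return True


def plan_filter(expr):
    # Push down only when conversion is known to succeed; otherwise keep
    # the predicate as a regular filter that runs after the read.
    if is_convertible(expr):
        return ("pushed_down", to_pushdown_filter(expr))
    return ("filter_after_read", expr)
```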

## Related issues
Fixes #63761

---------

Signed-off-by: Lukas Hering <lukasheringb@gmail.com>
Signed-off-by: Lukas Hering <40302054+herin049@users.noreply.github.com>
Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>
Previously, when placement groups were removed we made an RPC for each
bundle to `CancelResourceReserve`. This RPC handler was only called when
removing a placement group and the handler assumed that the entire
placement group had been removed (it canceled all leases for the
placement group).

This PR makes a per-node batched request to cancel all bundles and
remove the placement group instead. I've also updated the naming of the
RPC for clarity.
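
The change from one RPC per bundle to one request per node can be sketched as a grouping step. The function name and request fields below are hypothetical and do not reflect the actual RPC schema.

```python
from collections import defaultdict


def build_per_node_cancel_requests(placement_group_id, bundle_locations):
    """Hypothetical sketch: group a placement group's bundles by node so each
    node receives a single batched cancel request.

    bundle_locations maps bundle index -> node id.
    """
    bundles_by_node = defaultdict(list)
    for bundle_index, node_id in bundle_locations.items():
        bundles_by_node[node_id].append(bundle_index)
    return {
        node_id: {
            "placement_group_id": placement_group_id,
            "bundle_indices": sorted(indices),
        }
        for node_id, indices in bundles_by_node.items()
    }


requests = build_per_node_cancel_requests(
    "pg-1", {0: "node-a", 1: "node-b", 2: "node-a", 3: "node-b"}
)
# Four bundles on two nodes produce two requests instead of four RPCs.
```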

---------

Signed-off-by: Edward Oakes <ed.nmi.oakes@gmail.com>
## Description

The release test `llm_serve_llama_3dot1_8B_quantized_tp1_2p6d` turned
flaky once Ray Serve LLM switched vLLM to the **`RayExecutorV2`**
backend. There are two independent bugs:

1. **GPU collision**: workers initialize CUDA before `RayExecutorV2`
sets `CUDA_VISIBLE_DEVICES`, so every colocated TP1 worker binds to
physical GPU 0 and the losers OOM.
2. **NIXL port collision**: `_compute_port_offset()` silently returns
`0` for every replica after a Ray Serve type change, so same-role
replicas fight over one handshake port.

### Root cause 1: every worker lands on GPU 0

#### Symptom
Four worker processes pile onto physical GPU 0 (GPUs 1–3 idle) and OOM.
Each worker gets a distinct `CUDA_VISIBLE_DEVICES` (Ray assigned GPUs
0–3 correctly), yet all resolve to the same `phys_uuid`, so the
environment variable had no effect.

#### Mechanism
`RayExecutorV2` passes `vllm_config` as a Ray actor constructor arg,
which Ray deserializes before `__init__` runs. Unpickling the
w4a16/AutoGPTQ `VllmConfig` triggers an import chain (`fused_moe →
all2all → flashinfer`) whose module-level `CompilationContext()` calls
`torch.cuda.get_device_capability()`
([code](https://github.com/flashinfer-ai/flashinfer/blob/7a263cd3a704db2fd642ed40ae6d03ae7a8f164d/flashinfer/compilation_context.py#L78)).
With `CUDA_VISIBLE_DEVICES` still unset, this binds the process to
`cuda:0`. The later `os.environ` assignment can't move an
already-initialized context, so every worker selects `cuda:0`.

#### Why the old executor was immune
`RayDistributedExecutor` set `CUDA_VISIBLE_DEVICES` via a dedicated RPC
and only then shipped `vllm_config` in a separate `init_worker` RPC, so
the CUDA-initializing import ran after the env was correct.

#### Why it's flaky
The collision is deterministic per fresh worker, but Ray reuses workers,
which retain their prior CUDA context. Depending on process reuse and
start ordering across the 8 engines, replicas sometimes spread and
sometimes collide.

#### Fix
Don't deserialize `vllm_config` until `CUDA_VISIBLE_DEVICES` is set.
Ship it to the actor as cloudpickle bytes, then in `initialize_worker()`
apply the env vars first and `loads()` the config second, before
`super().__init__()`.
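
The ordering problem can be reproduced without CUDA. In the sketch below, JSON stands in for cloudpickle, the decode hook stands in for the import-time side effect, and `DEMO_VISIBLE_DEVICES` stands in for `CUDA_VISIBLE_DEVICES`. All of these names are assumptions for illustration only.

```python
import json
import os

ENV_VAR = "DEMO_VISIBLE_DEVICES"  # hypothetical stand-in for CUDA_VISIBLE_DEVICES


def _decode(obj):
    # Stand-in for the side effect: whatever the environment says at
    # deserialization time is captured and cannot be changed afterwards.
    obj["devices_seen_at_load"] = os.environ.get(ENV_VAR, "<unset>")
    return obj


def loads(payload):
    return json.loads(payload, object_hook=_decode)


def initialize_worker(payload, env):
    os.environ.update(env)  # 1. apply the environment first
    return loads(payload)   # 2. deserialize the config second


os.environ.pop(ENV_VAR, None)
payload = json.dumps({"model": "demo"}).encode()

eager = loads(payload)                                  # deserialized too early
deferred = initialize_worker(payload, {ENV_VAR: "2"})   # bytes shipped, loaded late
```

Here `eager` records `<unset>` while `deferred` records `2`, which mirrors why shipping opaque bytes and deserializing after the environment is set avoids the early binding.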

#### Not all models experience this problem
For example, Qwen/Qwen3-0.6B (bf16) has no quantization config, so
deserializing its `vllm_config` never triggers the
`quant→fused_moe→flashinfer` import that creates a CUDA context.
Qwen/Qwen3-0.6B-FP8, on the other hand, carries an FP8 quant config, so
deserializing it imports flashinfer and binds cuda:0 before
`CUDA_VISIBLE_DEVICES` is set.

### Root cause 2: NIXL side-channel port collision

#### Symptom
This is only visible after fixing the first root cause, because until
then the engines OOM first. Both prefill replicas bind `20000`, both
decode replicas bind `22000` → `ZMQError: Address already in use`. The
losing handshake-listener thread dies, so cross-engine KV transfer can't
complete its handshake and requests fail.

#### Mechanism
#58471 changed `ReplicaContext.rank` from a plain `int` to a
`ReplicaRank` pydantic model. `_compute_port_offset()` does `rc.rank *
num_devices`, which now raises `TypeError`; a bare `except Exception:
pass` swallows it and returns the fallback `0` for every replica,
collapsing the per-replica spacing.

#### Fix
Pull the integer rank out of the model:

```python
rc = serve.get_replica_context()
return rc.rank.rank * num_devices
```

Now prefill gets ports `20000,20001`, decode gets `22000..22005`, and
each TP worker still adds its `tp_rank` at bind time.
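
How the swallowed `TypeError` collapses the spacing can be shown with stand-in classes. `ReplicaRankSketch` and `ReplicaContextSketch` below are hypothetical plain classes, not the Ray Serve types, and the two functions only mimic the control flow described above.

```python
class ReplicaRankSketch:
    """Hypothetical stand-in for a rank model that wraps the integer rank."""

    def __init__(self, rank):
        self.rank = rank


class ReplicaContextSketch:
    def __init__(self, rank):
        self.rank = ReplicaRankSketch(rank)


def port_offset_buggy(rc, num_devices):
    try:
        return rc.rank * num_devices  # TypeError: rank is an object, not an int
    except Exception:
        pass  # the error is swallowed silently
    return 0  # every replica gets the same fallback offset


def port_offset_fixed(rc, num_devices):
    return rc.rank.rank * num_devices


base_port = 20000
buggy_ports = [base_port + port_offset_buggy(ReplicaContextSketch(r), 1) for r in range(2)]
fixed_ports = [base_port + port_offset_fixed(ReplicaContextSketch(r), 1) for r in range(2)]
# buggy_ports == [20000, 20000]; fixed_ports == [20000, 20001]
```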

---------

Signed-off-by: Jeffrey Wang <jeffreywang@anyscale.com>
Co-authored-by: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
## Description
1. Treat the S3 read `500 UNKNOWN` error as retryable.
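
As a rough illustration of message-based retry matching, the sketch below retries only when the error text contains a listed substring. The list name, helper names, and matching rule are assumptions and may differ from how the retry list is actually implemented.

```python
import time

RETRIED_ERROR_SUBSTRINGS = ["500 UNKNOWN"]  # hypothetical retry list


def is_retryable(error):
    return any(s in str(error) for s in RETRIED_ERROR_SUBSTRINGS)


def call_with_retry(fn, max_attempts=3, delay_s=0.0):
    """Hypothetical sketch: retry fn() only for errors on the retry list."""
    for attempt in range(1, max_attempts + 1):
        try:
            return fn()
        except Exception as error:
            if attempt == max_attempts or not is_retryable(error):
                raise
            time.sleep(delay_s)
```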

## Additional information
release test passed here
https://buildkite.com/ray-project/release/builds/95416#

---------

Signed-off-by: Lehui Liu <lehui@anyscale.com>
…e behind `preserve_order` (#63792)

Part of the iter_batches consumer pipeline cleanup (#63660, #63682).
Gates `restore_original_order` behind
`DataContext.execution_options.preserve_order` (default off). When one
format/collate worker lags, the reorder buffer grows with the other
workers' completed batches, and none of those ready batches can be
yielded until the lagging one arrives; this PR skips the reorder step
when ordering isn't required. This recovers steady-state next-batch
latency from the 113 ms regression introduced by #63660 and #63682 back
to 23 ms (lower than master's 32 ms), with no other regressions.
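
The head-of-line blocking and the gate can be sketched with generators. Both function names are hypothetical, and the input is assumed to be `(index, batch)` pairs in arrival order.

```python
def restore_original_order_sketch(indexed_batches):
    """Hypothetical sketch: buffer out-of-order batches until the next
    expected index arrives, then drain everything that is now in order."""
    buffered = {}
    next_index = 0
    for index, batch in indexed_batches:
        buffered[index] = batch
        while next_index in buffered:
            yield buffered.pop(next_index)
            next_index += 1


def finalize_sketch(indexed_batches, preserve_order):
    if preserve_order:
        yield from restore_original_order_sketch(indexed_batches)
    else:
        # Skip the reorder buffer: yield each batch as soon as it arrives.
        for _, batch in indexed_batches:
            yield batch


arrivals = [(1, "b"), (2, "c"), (0, "a")]  # batch 0 comes from the lagging worker
ordered = list(finalize_sketch(arrivals, preserve_order=True))
unordered = list(finalize_sketch(arrivals, preserve_order=False))
# ordered == ["a", "b", "c"]; unordered == ["b", "c", "a"]
```

With `preserve_order=True`, batches 1 and 2 sit in the buffer until batch 0 arrives, which is the latency cost the gate avoids.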

---------

Signed-off-by: Justin Yu <justinvyu@anyscale.com>
Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
…gger (#63843)

Compiled graphs are not under active development, so there's no need to
trigger their tests in every premerge/postmerge run.

---------

Signed-off-by: davik <davik@anyscale.com>
Co-authored-by: davik <davik@anyscale.com>
## Description
Pull the bucketing logic out of `RoundRobinPartitioner` into a reusable,
generic `WeightedRoundRobinPartitioner` in `ray.data._internal`. No
behavior change; `RoundRobinPartitioner` now delegates to it.

The goal is to make the abstraction shareable between `ReadFiles` and
`download()`.

---------

Signed-off-by: Goutam <goutam@anyscale.com>
Co-authored-by: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
… temporarily to unblock release (#63863)

## Description
The `image_embedding_from_uris_fixed_size` release test is spilling and
blocking the release. Disable the fail-on-spill check temporarily to
unblock the release. The cause still needs to be investigated afterwards.

---------

Signed-off-by: Sirui Huang <ray.huang@anyscale.com>
Signed-off-by: HFFuture <ray-huangsirui@hotmail.com>
Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>
…#63859)

## Why

`llm_serve_llama_3dot1_8B_tp_2` and
`llm_serve_llama_3dot1_8B_tp_2_direct_streaming` intermittently fail at
service startup:

```
AssertionError: Service ... is STARTING, expected RUNNING.
```

The service starts successfully, just slowly. Inspecting the service
logs from a failing run, the 600s budget is dominated by cold infra, not
the engine:

| phase | ~duration |
|---|---|
| head node provisioning | ~80s |
| GPU worker node autoscale + cu130 image pull + cluster join (replica `PENDING`; controller repeatedly logs "waiting for the cluster to auto-scale") | ~300s |
| vLLM engine init (weight download, load, torch.compile, CUDA graph capture) | ~100s |

In the failing build the `LLMServer` replica became healthy at **459s**
and the service reached RUNNING just after the **600s** deadline, so the
test terminated it ~30s short. The variance is almost entirely in GPU
node provisioning, so 600s is too tight.

## Change

Raise the `--timeout` default in `run_llm_serve_test_and_bms.py` (the
value these tests actually use) and the `start_service` default in
`test_utils.py` to **900s**, matching the value
`llm_serve_llama_3dot1_8b_lora` already passes explicitly for the same
reason.

No behavioral change beyond the longer startup grace period.

Signed-off-by: Seiji Eicher <seiji@anyscale.com>
… truth (#63367)

## What this PR is

`doc/redirects/current.yaml` is the curated source of truth for the HTTP
redirects configured on the `anyscale-ray` Read the Docs project
(docs.ray.io), managed with
[rtd-redirects](https://github.com/anyscale/rtd-redirects). This PR
establishes the file and the README policy for changing redirects.

The original 287-rule bootstrap snapshot (a February 2023 bulk import,
never cleaned) is preserved in this PR's history. The earlier review-bot
findings (circular redirects, duplicate entries, a `.rst` typo) were
properties of that historical snapshot and are resolved in the curated
state.

## Cleanup summary (May 15 - June 3, 2026)

- 287 -> 172 rules. 169 version-agnostic `page` rules plus 3 intentional
version-pinned `exact` rules.
- Every rule's target verified live (HTTP 200); broken, duplicate,
shadowed, and test rules removed or retargeted.
- All redirect chains flattened to single hops, anchors preserved.
- All rules now return 301 (previously 99% returned 302).
- Two prefix-rename groups consolidated into wildcard rules
(`/ray-core/tasks/patterns/*`, `/auto_examples/*`).
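
Flattening chains to single hops and catching circular redirects can be sketched over a plain source-to-target mapping. This is an illustrative sketch, not how `rtd-redirects` works internally; the function name is hypothetical and anchors and wildcards are ignored.

```python
def flatten_redirects_sketch(rules):
    """Hypothetical sketch: rewrite every rule to point at its final target.

    rules maps source path -> target path. Raises on circular redirects.
    """
    flat = {}
    for source, target in rules.items():
        seen = {source}
        while target in rules:  # the target is itself redirected: follow it
            if target in seen:
                raise ValueError(f"circular redirect involving {target}")
            seen.add(target)
            target = rules[target]
        flat[source] = target
    return flat


flattened = flatten_redirects_sketch({"/a": "/b", "/b": "/c", "/x": "/y"})
# flattened == {"/a": "/c", "/b": "/c", "/x": "/y"}
```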

## Verification

- `rtd-redirects plan --file doc/redirects/current.yaml --strict`
against the live project: **no changes** and a clean validator run. The
file mirrors production exactly.
- End-to-end probes across every change category: 10/10 serve a single
301 hop to a 200 destination.

## Policy

Details in `doc/redirects/README.md`: redirect changes go through this
file via PR; dashboard edits are out-of-process; `rtd-redirects plan`
audits drift; a maintainer runs `apply` after merge (CI automation
planned).

[DOC-928] [DOC-947]

🤖 Generated with [Claude Code](https://claude.com/claude-code)

---------

Signed-off-by: Douglas Strodtman <douglas@anyscale.com>
Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>