Skip to content

[trainer, fully_async] feat: add streaming rollouter mode to the V1 PPO trainer#6868

Open
huaiyizhao wants to merge 5 commits into
verl-project:mainfrom
huaiyizhao:feat/v1-fully-async-streaming-rollouter
Open

[trainer, fully_async] feat: add streaming rollouter mode to the V1 PPO trainer#6868
huaiyizhao wants to merge 5 commits into
verl-project:mainfrom
huaiyizhao:feat/v1-fully-async-streaming-rollouter

Conversation

@huaiyizhao

Copy link
Copy Markdown
Contributor

What does this PR do?

Adds a new fully_async trainer mode to the V1 PPO trainer that decouples rollout generation from the training step, so generation and training overlap instead of running lock-step (one batch fed per step). This brings the streaming-rollouter capability of verl/experimental/fully_async_policy to the V1 trainer, while reusing the existing TransferQueue as the streaming channel (no separate MessageQueue / Rollouter actor).

Add concise overview of what this PR aims to achieve or accomplish. Reference related GitHub issues and PRs that help with the review.

Checklist Before Starting

  • Search for similar PRs. Paste at least one query link here: ...
  • Format the PR title as [{modules}] {type}: {description} (This will be checked by the CI)
    • {modules} include fsdp, megatron, veomni, sglang, vllm, rollout, trainer, ci, training_utils, recipe, hardware, deployment, ray, worker, single_controller, misc, perf, model, algo, env, tool, ckpt, doc, data, cfg, reward, fully_async, one_step_off
    • If this PR involves multiple modules, separate them with , like [megatron, fsdp, doc]
    • {type} is in feat, fix, refactor, chore, test
    • If this PR breaks any API (CLI arguments, config, function signature, etc.), add [BREAKING] to the beginning of the title.
    • Example: [BREAKING][fsdp, megatron] feat: dynamic batching

Test

For changes that can not be tested by CI (e.g., algorithm implementation, new model support), validate by experiment(s) and show results like training curve plots, evaluation results, etc.

API and Usage Example

Demonstrate how the API changes if any, and provide usage example(s) if possible.

# Add code snippet or script demonstrating how to use this

Design & Code Changes

Demonstrate the high-level design if this PR is complex, and list the specific changes.

Checklist Before Submitting

Important

Please check all the following items before requesting a review, otherwise the reviewer might deprioritize this PR for review.

@gemini-code-assist gemini-code-assist Bot left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request introduces a new fully_async (streaming) mode for the V1 PPO trainer, allowing an autonomous background feeder to continuously stream prompts into the TransferQueue under an in-flight budget. This decouples rollout generation from training and overlaps the two processes. The changes include the core streaming feeder logic, the PPOTrainerFullyAsync implementation, thread-safety guards for the training dataloader, configuration updates, and comprehensive CPU unit/integration tests and E2E scripts. The review feedback highlights two key improvement opportunities: first, updating the parameter version only during actual weight synchronization steps to prevent underestimating off-policy staleness; second, refactoring the feeder's pause/resume mechanism using a threading.Condition to eliminate polling delays and avoid cumulative GPU idle time.

Important

The consumer version of Gemini Code Assist on GitHub is being sunset. Starting June 18, 2026, new organization installations will be blocked, and all code review activity will officially cease on July 17, 2026.
For more details on the timeline and next steps, please review the Help Documentation.

Comment on lines +98 to +99
with self._param_version_lock:
self._param_version = self.global_steps

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

Updating self._param_version at the end of every training step is incorrect because weight synchronization with the standalone rollout only occurs periodically (every parameter_sync_step steps). Tagging prompts with intermediate step numbers makes the trainer believe they were generated with fresher weights than they actually were, which underestimates off-policy staleness and can bypass the max_off_policy_threshold safety guard.

self._param_version should only be updated to self.global_steps when a weight synchronization actually occurs.

Suggested change
with self._param_version_lock:
self._param_version = self.global_steps
if self.global_steps % self.parameter_sync_step == 0:
with self._param_version_lock:
self._param_version = self.global_steps

Comment thread verl/trainer/ppo/v1/streaming_feeder.py Outdated
Comment on lines +85 to +140
self._stop = threading.Event()
self._paused = threading.Event() # when set, the loop stops dispatching new prompts
self.error = False # set True if the feeder thread dies unexpectedly
self._thread: threading.Thread | None = None

def _loop(self):
while not self._stop.is_set():
if self._paused.is_set():
# paused (e.g. during a weight sync): do not dispatch; generation already in
# flight keeps running and is aborted+continued by the checkpoint engine.
self._stop.wait(self._poll_interval)
continue
try:
counts = self._count_inflight()
inflight = counts["pending"] + counts["running"] + counts["finished"] + counts["failure"]
if inflight < self._budget:
self._feed_one_batch(self._param_version())
else:
# interruptible sleep: avoids a tight busy loop while the budget is full
self._stop.wait(self._poll_interval)
except StopIteration:
logger.info("Streaming feeder: dataset exhausted, stopping feeder")
break
except Exception:
logger.exception("Streaming feeder thread crashed")
self.error = True
break

def start(self):
"""Start the background feeder thread."""
self._stop.clear()
self.error = False
self._thread = threading.Thread(target=self._loop, name="streaming-rollout-feeder", daemon=True)
self._thread.start()

def stop(self, timeout: float = 30.0):
"""Signal the feeder to stop and join the thread."""
self._stop.set()
if self._thread is not None and self._thread.is_alive():
self._thread.join(timeout=timeout)

def pause(self):
"""Pause dispatching new prompts (e.g. during a weight sync).

Generation already in flight is unaffected (it is aborted+continued by the checkpoint
engine / FullyAsyncLLMServerClient). Idempotent.
"""
self._paused.set()

def resume(self):
"""Resume dispatching after :meth:`pause`. Idempotent."""
self._paused.clear()

@property
def paused(self) -> bool:
return self._paused.is_set()

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

Using self._paused.wait(self._poll_interval) or polling self._paused.is_set() with a sleep interval introduces an unnecessary delay (up to feeder_poll_interval, which defaults to 1.0s) when resuming the feeder after a weight synchronization. This can lead to significant cumulative GPU idle time during training.

Using a threading.Condition variable allows the feeder to be resumed instantly and efficiently without any polling or sleeping delays.

        self._stop = threading.Event()
        self._paused = False
        self._cv = threading.Condition()
        self.error = False  # set True if the feeder thread dies unexpectedly
        self._thread: threading.Thread | None = None

    def _loop(self):
        while not self._stop.is_set():
            with self._cv:
                while self._paused and not self._stop.is_set():
                    self._cv.wait()
            if self._stop.is_set():
                break
            try:
                counts = self._count_inflight()
                inflight = counts["pending"] + counts["running"] + counts["finished"] + counts["failure"]
                if inflight < self._budget:
                    self._feed_one_batch(self._param_version())
                else:
                    # interruptible sleep: avoids a tight busy loop while the budget is full
                    self._stop.wait(self._poll_interval)
            except StopIteration:
                logger.info("Streaming feeder: dataset exhausted, stopping feeder")
                break
            except Exception:
                logger.exception("Streaming feeder thread crashed")
                self.error = True
                break

    def start(self):
        """Start the background feeder thread."""
        self._stop.clear()
        with self._cv:
            self._paused = False
        self.error = False
        self._thread = threading.Thread(target=self._loop, name="streaming-rollout-feeder", daemon=True)
        self._thread.start()

    def stop(self, timeout: float = 30.0):
        """Signal the feeder to stop and join the thread."""
        self._stop.set()
        with self._cv:
            self._cv.notify_all()
        if self._thread is not None and self._thread.is_alive():
            self._thread.join(timeout=timeout)

    def pause(self):
        """Pause dispatching new prompts (e.g. during a weight sync).

        Generation already in flight is unaffected (it is aborted+continued by the checkpoint
        engine / FullyAsyncLLMServerClient). Idempotent.
        """
        with self._cv:
            self._paused = True

    def resume(self):
        """Resume dispatching after :meth:`pause`. Idempotent."""
        with self._cv:
            self._paused = False
            self._cv.notify_all()

    @property
    def paused(self) -> bool:
        with self._cv:
            return self._paused

@wuxibin89 wuxibin89 requested a review from Begunner June 29, 2026 06:59
@wuxibin89

Copy link
Copy Markdown
Collaborator

@huaiyizhao Thanks for you contribution. I have 2 questions:

  1. What's the advantage of streaming rollouter mode compared to lock-step mode?
  2. Do you have e2e experiment result on streaming rollouter mode?

@huaiyizhao huaiyizhao force-pushed the feat/v1-fully-async-streaming-rollouter branch from 30bb9e9 to bf6b966 Compare June 30, 2026 11:41
@CLAassistant

CLAassistant commented Jun 30, 2026

Copy link
Copy Markdown

CLA assistant check
All committers have signed the CLA.

huaiyizhao and others added 3 commits June 30, 2026 19:52
Add a fully_async trainer mode: an autonomous background feeder thread continuously
streams prompts into TransferQueue (bounded by a staleness/in-flight budget) while step()
only samples + trains, decoupling rollout production from training consumption.

The feeder (thread loop, throttling, weight-sync pause/resume) is fully self-contained in
trainer_fully_async.py and touches the base trainer only through its public state. trainer_base
is left exactly as upstream: the subclass overrides _add_batch_to_generate so the base step()'s
unconditional feed becomes a no-op once the feeder owns generation, owns its own dataloader lock,
and overrides _save_checkpoint to serialize against the feeder thread.

replay_buffer gains count_inflight + a 'none' staleness strategy (TIS-corrected streaming) and a
no-op dead_prompt_keys hook. separate_async reads num_warmup_batches/parameter_sync_step from the
active mode's config so fully_async uses its own cadence.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
…mal-rollout discard

Add an opt-in alternative to the default prompt-level agent-loop worker, selected via the existing
agent_loop_manager_class / custom_sampler config injection (the prompt-level implementation is left
unchanged):

- RolloutAgentLoopManagerTQ/RolloutAgentLoopWorkerTQ (agent_loop_tq_rollout.py): dispatch one
  (prompt, session) rollout at a time round-robin across the worker pool, so sibling sessions run
  on different workers and a long-tail rollout never blocks the rest. Each session writes a
  {uid}_sess{session_id} completion marker and offloads postprocess CPU work to a thread pool.
- SessionReplayBuffer (replay_buffer_session.py): readiness derived from per-session completion
  markers; only prompts with >=1 successful session are sampleable; dead_prompt_keys() surfaces
  all-failed prompts so the streaming feeder discards them and a fresh prompt takes the slot.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
…real TQ

Cover the default prompt-level ReplayBuffer (status readiness) and the opt-in rollout-level
SessionReplayBuffer (session-counting readiness, abnormal-rollout discard via dead_prompt_keys,
sample/clear cycle) against a real TransferQueue on a local Ray cluster, plus a buffer-level
concurrent produce/consume steady-state check (bounded, no deadlock). The inlined feeder thread
lives in PPOTrainerFullyAsync (pulls the GPU serving stack) and is covered by the GPU e2e scripts.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
@huaiyizhao huaiyizhao force-pushed the feat/v1-fully-async-streaming-rollouter branch from 62ab8fb to 6c27b8f Compare June 30, 2026 11:56
huaiyizhao and others added 2 commits June 30, 2026 20:10
File-load agent_loop_tq_rollout with the GPU serving stack stubbed, covering what the GPU smoke
run does not assert: build_trajectory_info / extract_sample / mm_token_feature_counts, the
RolloutAgentLoopManagerTQ round-robin session fan-out (+ persistent dispatch cursor), and
_execute_rollout writing a per-session success OR failure completion marker against a real
TransferQueue. The failure-marker branch is the one a healthy GPU run never trips, yet the
abnormal-rollout discard depends on it.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Register "Copyright 2026 Tencent Inc. and/or its affiliates" in the license-header allowlist and
apply it to the files newly added by the streaming work.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
@huaiyizhao

Copy link
Copy Markdown
Contributor Author

@wuxibin89 Hi.

  1. Streaming mode keeps the rollout side as an autonomous capacity-driven service. It accepts input prompts whenever the inflight budget has room, which makes it more robust to rollout-side irregularities such as failures, drops, partial rollout abort/continue, long-tail requests, or uneven completion. In the ideal steady state where warmup exactly fills the budget and every step consumes/refills the same amount, its throughput can be close to warmup-based separate async. The practical benefit is flexibility and recovery under non-ideal rollout behavior.
  2. No, it is an re-implementation of fully async trainer in v1. I expect similar speedups compared with colocated mode.
  3. I have pushed another version with rollout-level agentloop worker, which also aims for a more fine-grained control over the rollout process.
  4. The other trainers are left unchanged

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants