[fully_async] feat: Fully Async Policy with TransferQueue #6628
ArronHZG wants to merge 5 commits into
Code Review
This pull request introduces a new fully asynchronous training backend that replaces the legacy Ray MessageQueue with TransferQueue (TQ) and a custom ReplayBuffer Ray Actor. The new architecture leverages TQ's zero-copy data transport and uses a dual-layer slot mechanism (physical and version window) for backpressure and staleness control. The trainer side (FullyAsyncTrainerTQ) utilizes multiple inheritance from PPOTrainer and FullyAsyncTrainer to reuse the native KVBatchMeta training pipeline. Feedback on the changes highlights two critical issues: first, a potential crash in FullyAsyncRollouterTQ._validate when converting scalar values extracted from NonTensorData into lists; second, an infinite loop in ReplayBuffer.sample where orphaned keys are popped from the in-memory cache but never permanently deleted from the underlying TransferQueue, causing them to be restored on subsequent polls.
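To illustrate the second issue, here is a minimal sketch of why cache-only removal lets orphaned keys come back on the next poll. `FakeStore`, `poll`, and `is_orphan` are hypothetical stand-ins written for this example; they are not the TransferQueue or `ReplayBuffer` API, and the prefix check is only an assumption about how orphans are detected.

```python
class FakeStore:
    """Hypothetical stand-in for the remote key-value store (not the TQ API)."""

    def __init__(self, key_to_uid):
        self._data = dict(key_to_uid)

    def list_keys(self):
        return dict(self._data)  # snapshot copy, like a periodic poll

    def delete(self, keys):
        for key in keys:
            self._data.pop(key, None)


def is_orphan(key, uid):
    # Assumption: a key is orphaned when its metadata uid is not its prefix.
    return not key.startswith(uid)


def poll(store, delete_orphans):
    """Rebuild the in-memory cache from a store snapshot and drop orphans."""
    cache = store.list_keys()
    orphans = [key for key, uid in cache.items() if is_orphan(key, uid)]
    for key in orphans:
        cache.pop(key)          # cache-only removal
    if delete_orphans:
        store.delete(orphans)   # permanent removal from the store
    return cache, orphans


store = FakeStore({"u1_s0_0": "u1", "u2_s0_0": "u9"})  # second key is orphaned
_, first = poll(store, delete_orphans=False)
_, second = poll(store, delete_orphans=False)  # the same orphan is found again
_, third = poll(store, delete_orphans=True)    # orphan deleted from the store
_, fourth = poll(store, delete_orphans=False)  # nothing left to clean
```

With cache-only removal the orphan is rediscovered on every poll; once it is also deleted from the store, later polls find nothing to clean.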
    try:
        import transfer_queue as tq
    except ImportError:
        print("Please install TQ by calling `pip install TransferQueue==0.1.6` and try again.")
TODO: update doc description
#6868: This PR adds fully async to the v1 trainer.
Fully Async Policy with TransferQueue (TQ)
Overview
This solution builds upon `fully_async_policy` by migrating the data transport channel from Ray MessageQueue to TransferQueue (TQ), while the training side reuses `main_ppo_sync.py`'s `PPOTrainer` via multiple inheritance to directly leverage TQ's native `KVBatchMeta` training pipeline.

Core Design Principles
- Keep `FullyAsyncAgentLoopManager` intact, preserving existing inference generation logic; only adapt lightly via `FullyAsyncAgentLoopManagerTQ`
- Replace `_should_pause_generation` with ReplayBuffer (Ray Actor)'s Dual-Layer Slot mechanism
- Define the trainer as `class FullyAsyncTrainerTQ(PPOTrainer, FullyAsyncTrainer)`

Core Objectives
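- Eliminate `ray.cloudpickle` serialization overhead
- Apply flow control at the source (`_feed_samples`) via `acquire_slot()`
- Reuse the `PPOTrainer` pipeline methods (`_compute_old_log_prob`, `_compute_advantage`, etc.)

Architecture Comparison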
Existing Architecture (MessageQueue + SeparateRayPPOTrainer)
    graph TB
        subgraph Rollouter[Rollouter - SeparateRayPPOTrainer]
            RM[FullyAsyncRollouter]
            ALM[FullyAsyncAgentLoopManager]
        end
        subgraph MQ[MessageQueue - Ray Actor]
            MQ_Q[deque - pickle serialization]
        end
        subgraph Trainer[Trainer - SeparateRayPPOTrainer]
            TM[FullyAsyncTrainer]
        end
        RM -->|_feed_samples| PQ[pending_queue]
        PQ -->|_processor_worker| ALM
        ALM -->|generate_sequences_single| RM
        RM -->|put_sample<br/>cloudpickle| MQ_Q
        MQ_Q -->|get_sample<br/>cloudpickle| TM
        TM -->|fit_step| TM
        RM -.->|_should_pause_generation<br/>check queue_size| MQ_Q
        RM -.->|staleness_samples<br/>manual counter| RM

Problems:
- `ray.cloudpickle` serialization/deserialization overhead is significant
- `_should_pause_generation` pause logic is complex (drain → resume)
- The training pipeline (`SeparateRayPPOTrainer`) differs greatly from the colocate training pipeline, requiring maintenance of two codebases

New Architecture (TQ + ReplayBuffer + PPOTrainer Multiple Inheritance)
    graph TB
        subgraph Rollouter["Rollouter (TQFullyAsyncRollouter)"]
            RM[TQFullyAsyncRollouter]
            ALM["FullyAsyncAgentLoopManager<br/>(+TQ adapter, wait=True)"]
        end
        subgraph RB["ReplayBuffer (Ray Actor)"]
            RB_SLOT["Layer1: Physical Slot<br/>acquire / release"]
            RB_VER["Layer2: Version Window<br/>reset_staleness resets to zero"]
            RB_META["Metadata + wait_and_sample"]
        end
        subgraph TQ[TransferQueue]
            TQ_DATA["Zero-Copy Tensor"]
        end
        subgraph Worker["AgentLoopWorkerTQ (main_ppo_sync.py)"]
            W_POST["_agent_loop_postprocess → tq.put"]
        end
        subgraph Trainer["Trainer (TQFullyAsyncTrainer)"]
            TM["Multi-inherit: PPOTrainer + FullyAsyncTrainer"]
        end
        RM -->|" 1. acquire_slot 🔒 "| RB_SLOT
        RM -->|" 2. processor "| ALM
        ALM -->|generate_sequences| Worker
        Worker -->|" 3. tq.put (status=success/finished) "| TQ_DATA
        TQ_DATA -->|kv_list poll| RB_META
        RM -->|" 4. release_slot ✅ "| RB_SLOT
        TM -->|" 5. wait_and_sample → KVBatchMeta "| RB_META
        TM -->|" 6. kv_batch_get → PPO pipeline "| TQ_DATA
        TM -->|" 7. kv_clear + remove "| TQ_DATA
        TM -->|" 8. reset_staleness "| RB_VER

Core Changes:
| Existing | New |
| --- | --- |
| `_should_pause_generation` + `staleness_samples` | `acquire_slot()` at `_feed_samples` (source-level control) |
| `SeparateRayPPOTrainer` | `PPOTrainer` × `FullyAsyncTrainer` multiple inheritance |
| | `AgentLoopWorkerTQ._agent_loop_postprocess` |
| `FullyAsyncAgentLoopManager` | `FullyAsyncAgentLoopManagerTQ` (lightweight subclass) |

Core Components
1. ReplayBuffer (Ray Actor) — Metadata Channel + Dual-Layer Slot Flow Control
File: `replay_buffer.py`

A lightweight Ray Actor that simultaneously handles metadata storage and Dual-Layer Slot flow control.
Dual-Layer Slot Control Mechanism
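`acquire_slot()` is the single gatekeeping interface between the Rollouter and the RB, simultaneously handling two responsibilities:

- Layer 1 (physical slot): admits a request only while `pending_slots < max_pending_slots`
- Layer 2 (version window): admits a request only while `_version_slots < max_version_slots`

State Transition: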
    stateDiagram-v2
        [*] --> Idle: Initialization
        Idle --> Acquired: acquire_slot() both conditions pass
        Acquired --> InFlight: Write to pending_queue
        InFlight --> Done: AgentLoopWorkerTQ writes to TQ status=success/finished
        Done --> Idle: release_slot()
        note right of Idle
            L1: pending_slots < max_pending_slots
            L2: _version_slots < max_version_slots
        end note
        note right of Acquired
            pending_slots++
            version_slots++
            Rollouter can continue processing data
        end note
        note right of InFlight
            Generating in progress
            Layer1 occupies 1 physical slot
            Layer2 occupies 1 version slot (cumulative)
        end note
        note right of Done
            Layer1: release_slot() → pending_slots--
            Layer2: unchanged (only reset_staleness resets to zero)
        end note

Core Interfaces
- `acquire_slot(timeout, uid)`
- `release_slot()`
- `wait_and_sample(partition_id, sample_size, rollout_n)`
- `remove(partition_id, keys)`
- `reset_staleness()`
- `signal_finish()`

Background Tasks
- `_poll_from_tq()`: Periodically polls `tq.kv_list()` to get a global TQ snapshot, atomically replacing `self.partitions`. Includes a UID integrity check: detects orphan keys (`meta.uid` doesn't match the key prefix) and auto-cleans them.
- `_monitor_loop()`: Prints buffer statistics every 60 seconds.

Usage Pattern
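A minimal sketch of the dual-layer slot semantics described above, using plain `threading` in place of a Ray Actor. The class name `SlotGate` and its constructor are hypothetical and this is not the PR's `ReplayBuffer`; the `uid` argument of `acquire_slot` is omitted for brevity.

```python
import threading


class SlotGate:
    """Hypothetical dual-layer gate; not the PR's ReplayBuffer."""

    def __init__(self, max_pending_slots, max_version_slots):
        self.max_pending_slots = max_pending_slots
        self.max_version_slots = max_version_slots
        self.pending_slots = 0   # Layer 1: in-flight generations
        self.version_slots = 0   # Layer 2: samples admitted since the last sync
        self._cond = threading.Condition()

    def _has_room(self):
        return (self.pending_slots < self.max_pending_slots
                and self.version_slots < self.max_version_slots)

    def acquire_slot(self, timeout=None):
        """Block until both layers have room; return False on timeout."""
        with self._cond:
            if not self._cond.wait_for(self._has_room, timeout=timeout):
                return False
            self.pending_slots += 1
            self.version_slots += 1
            return True

    def release_slot(self):
        """Free a Layer 1 slot only; Layer 2 keeps accumulating."""
        with self._cond:
            self.pending_slots -= 1
            self._cond.notify_all()

    def reset_staleness(self):
        """Zero the Layer 2 counter, e.g. after a parameter sync."""
        with self._cond:
            self.version_slots = 0
            self._cond.notify_all()


gate = SlotGate(max_pending_slots=1, max_version_slots=2)
a = gate.acquire_slot(timeout=0.01)   # True: both layers have room
b = gate.acquire_slot(timeout=0.01)   # False: Layer 1 is full
gate.release_slot()
c = gate.acquire_slot(timeout=0.01)   # True: Layer 1 was freed
gate.release_slot()
d = gate.acquire_slot(timeout=0.01)   # False: Layer 2 window is exhausted
gate.reset_staleness()
e = gate.acquire_slot(timeout=0.01)   # True: window was reset
```

Releasing a slot only relieves the physical layer, so a producer that keeps generating without a weight sync is eventually blocked by the version window.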
2. FullyAsyncAgentLoopManagerTQ — AgentLoop Lightweight Adapter
File: `fully_async_rollouter_tq.py`

Key Points:
- Worker: `AgentLoopWorkerTQ` (defined in `main_ppo_sync.py`)
- `generate_sequences_single` adds `wait=True`: ensures the Rollouter knows when generation completes, avoiding deadlocks
- `AgentLoopWorkerTQ._agent_loop_postprocess` writes results directly to TQ (`tq.async_kv_batch_put`) and does not return data to the Rollouter

Lifecycle after writing to TQ:
1. `AgentLoopWorkerTQ` writes `{uid}_{session_id}_{index}` response keys (status=success)
2. `AgentLoopWorkerTQ` writes the `{uid}` uid-level key (status=finished)
3. `_poll_from_tq` discovers new keys via `tq.kv_list()` and updates `self.partitions`
4. `wait_and_sample` detects enough finished uids and returns to the Trainer
5. The Trainer reads the data via `tq.kv_batch_get` and executes PPO training
6. `tq.kv_clear` + `rb.remove` cleanup

3. TQFullyAsyncRollouter — Rollouter Adapter
File: `fully_async_rollouter_tq.py`

An incremental modification subclass based on `FullyAsyncRollouter`. Core changes are concentrated in three areas: data feeding, sample processing, and validation.

3.1 `_feed_samples` — Source-Level Flow Control

Key Differences from Base Class:
- `prepare_single_generation_data()` (no repeat(n)); instead injects the `__rollout_n__` field so that `AgentLoopWorkerTQ._run_prompt` loops n times internally
- `batch_size=1` (bsz=1), each prompt processed individually
- `uid` / `__rollout_n__` / `sample_id` / `global_steps` injected as `np.array` into a plain dict, becoming `NonTensorStack` after `tu.get_tensordict()`

3.2 `_process_single_sample_streaming` — Simplified to generate + release

Core Difference from Base Class: The base class needs to manually put generation results to MessageQueue; in the TQ path, data writing is handled by `AgentLoopWorkerTQ._agent_loop_postprocess`, so the Rollouter only needs to call generate and then `release_slot`.

3.3 Deleted/Disabled Methods
- `_should_pause_generation()`: returns `False`; flow control is handled by `acquire_slot`
- `_async_monitor_loop()`

3.4 Validation Flow

`_validate` overrides the base class validation method, using the TQ + ReplayBuffer path.
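The generate-then-release flow from section 3.2 can be sketched as follows. This is an illustration under stated assumptions, not the PR's `_process_single_sample_streaming`: `FakeGate`, `process_single_sample`, and the two generate functions are hypothetical, and releasing the slot in a `finally` block (so a failed generation does not leak a physical slot) is a design choice of this sketch rather than something the PR description confirms.

```python
import asyncio


class FakeGate:
    """Hypothetical stand-in for the ReplayBuffer's Layer 1 counter."""

    def __init__(self, pending_slots):
        self.pending_slots = pending_slots

    async def release_slot(self):
        self.pending_slots -= 1


async def process_single_sample(generate, gate, sample):
    """Run generation, then free the physical slot even if generation fails."""
    try:
        # The worker writes results to the store itself, so nothing is returned.
        await generate(sample, wait=True)
    finally:
        await gate.release_slot()


async def ok_generate(sample, wait):
    await asyncio.sleep(0)  # placeholder for the real rollout


async def failing_generate(sample, wait):
    raise RuntimeError("generation failed")


async def demo():
    gate = FakeGate(pending_slots=2)  # two slots were acquired upstream
    await process_single_sample(ok_generate, gate, {"uid": "u1"})
    try:
        await process_single_sample(failing_generate, gate, {"uid": "u2"})
    except RuntimeError:
        pass  # the error still propagates to the caller
    return gate.pending_slots


remaining = asyncio.run(demo())
```

Both the successful and the failing call give their slot back, so `remaining` ends at zero.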
4. TQFullyAsyncTrainer — Multi-Inheritance Trainer
File: `fully_async_trainer_tq.py`

The central design decision: use Python multiple inheritance, `class FullyAsyncTrainerTQ(PPOTrainer, FullyAsyncTrainer)`, to gain capabilities from both sides:

Data Flow Details
Complete Lifecycle Sequence Diagram
    sequenceDiagram
        participant R as Rollouter<br/>(_feed_samples)
        participant RB as ReplayBuffer<br/>(Ray Actor)
        participant PQ as pending_queue
        participant P as processor<br/>(_processor_worker)
        participant ALM_TQ as ALM_TQ<br/>(DataProto→TensorDict)
        participant ALW as AgentLoopWorkerTQ<br/>(main_ppo_sync.py)
        participant TQ as TransferQueue
        participant T as Trainer<br/>(TQFullyAsyncTrainer)
        Note over R, T: === Phase 1: Rollouter Production (Source-Level Flow Control) ===
        loop dataloader iteration
            R ->> RB: acquire_slot(uid=sample_id) [may block🔒]
            RB -->> R: slot acquired (L1++, L2++)
            R ->> R: Inject uid/__rollout_n__/sample_id/global_steps
            R ->> R: tu.get_tensordict(batch_dict) → bsz=1 TensorDict
            R ->> PQ: put(RolloutSample{full_batch, sample_id})
        end
        R ->> PQ: put(None) [end signal]
        Note over R, T: === Phase 2: Generation (ALM_TQ Adapter + Worker Writes TQ) ===
        loop processor worker
            P ->> PQ: get()
            P ->> ALM_TQ: generate_sequences_single(batch, wait=True)
            ALM_TQ ->> ALM_TQ: Select worker (round-robin)
            ALM_TQ ->> ALW: generate_sequences.remote(batch, wait=True)
            ALW ->> ALW: _run_prompt loops __rollout_n__ times
            ALW ->> ALW: _compute_score (reward model)
            ALW ->> TQ: tq.async_kv_batch_put(response keys, status=success)
            ALW ->> TQ: tq.async_kv_batch_put(uid key, status=finished)
            ALW -->> ALM_TQ: return (wait=True ensures completion)
            ALM_TQ -->> P: return
            P ->> RB: release_slot() ✅ (L1--)
            RB -->> R: (wake up waiting acquire_slot)
        end
        Note over R, T: === Phase 3: Trainer Consumption + PPO Training ===
        loop fit_step (per step)
            T ->> RB: wait_and_sample(sample_size=N, rollout_n=n) [block⏳]
            RB -->> T: [(key1,tag1), ..., (keyN,tagN)] (KVBatchMeta)
            T ->> TQ: kv_batch_get(keys) [called internally by various _compute_* methods]
            TQ -->> T: Complete tensor data (prompts, responses, log_probs, ...)
            T ->> T: _balance_batch → _compute_old_log_prob
            T ->> TQ: kv_batch_put(old_log_prob, ...) [write back to TQ]
            T ->> T: _compute_advantage → _update_actor
            T ->> TQ: kv_batch_put(advantages, returns, ...)
            T ->> TQ: kv_clear(response keys + uid keys)
            T ->> RB: remove(keys)
        end
        Note over R, T: === Phase 4: Parameter Sync (every trigger_parameter_sync_step steps) ===
        T ->> T: _fit_update_weights(): checkpoint_manager.update_weights()
        T ->> RB: reset_staleness(): _version_slots reset to zero, unblock L2

Dual-Layer Slot Control Detailed Semantics
| Existing | New |
| --- | --- |
| `MessageQueue.queue_size` | `RB._pending_slots` (Layer 1: Physical Throttling) |
| `max_queue_size` | `max_pending_slots` (Layer 1) |
| `_should_pause_generation()` | `acquire_slot()` dual-condition = flow control |
| `staleness_samples` (manual counter) | `RB._version_slots` (Layer 2: Cumulative Counter) |
| `max_required_samples` | `max_version_slots` (Layer 2) |
| `paused` + drain + resume | |

Before vs After Comparison:
Parameter Sync Flow:
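A minimal sketch of the Phase 4 ordering from the sequence diagram: every `trigger_parameter_sync_step` steps the trainer updates weights and then resets the Layer 2 counter. `FakeReplayBuffer` and `run_steps` are hypothetical names for this illustration, and the per-step increment merely stands in for the Rollouter's `acquire_slot` calls.

```python
class FakeReplayBuffer:
    """Hypothetical stand-in that tracks only the Layer 2 counter."""

    def __init__(self):
        self.version_slots = 0

    def reset_staleness(self):
        self.version_slots = 0


def run_steps(num_steps, trigger_parameter_sync_step, samples_per_step,
              rb, update_weights):
    """Simulate training steps and return the steps at which a sync happened."""
    sync_steps = []
    for step in range(1, num_steps + 1):
        # Stand-in for samples admitted by the Rollouter during this step.
        rb.version_slots += samples_per_step
        if step % trigger_parameter_sync_step == 0:
            update_weights()       # push new weights first
            rb.reset_staleness()   # then reopen the version window
            sync_steps.append(step)
    return sync_steps


rb = FakeReplayBuffer()
weight_versions = []
sync_steps = run_steps(
    num_steps=5,
    trigger_parameter_sync_step=2,
    samples_per_step=4,
    rb=rb,
    update_weights=lambda: weight_versions.append(len(weight_versions) + 1),
)
```

With these inputs the sync fires at steps 2 and 4, and the counter holds only the samples admitted since the last sync.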
Usage
Launch Script Example
Key configuration points:
Dependency Installation
All TQ-related code has fallback behavior: when `import transfer_queue` fails, the mock implementation in `verl.utils.transferqueue_utils` is automatically used.