[feat] Add StreamingTokenBudgetSampler for token-budget streaming fetch #3

Merged
NINGBENZHE merged 2 commits into main from feat/yuzhe/support_stream_perfetch_bucket on Jun 25, 2026
@NINGBENZHE

Member

Introduce StreamingTokenBudgetSampler and wire up the controller, client, and storage managers to support a token-budget fetch mode for fully-async / dynamic-batch consumers:

  • get_metadata/get_meta accept token_budget (mutually exclusive with batch_size); the controller polls the streaming sampler instead of waiting for N ready samples.
  • user_custom_meta is written before samples are marked ready, so streaming consumers never observe a ready sample without its custom_meta.
  • async_put accepts inline custom_meta that lands atomically with readiness, avoiding the put/set_custom_meta round-trip and race.
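
The difference between the two fetch modes can be sketched as follows. This is only an illustration under stated assumptions: `select_ready` and its arguments are hypothetical names, not the actual `get_metadata` signature, and the rule that a single oversized sample is still returned on its own is an assumption rather than documented behavior.

```python
from typing import List, Optional, Sequence


def select_ready(
    token_counts: Sequence[int],
    batch_size: Optional[int] = None,
    token_budget: Optional[int] = None,
) -> List[int]:
    """Return indexes of ready samples to hand to a consumer (hypothetical helper).

    token_counts[i] is the token length of the i-th ready sample.
    """
    # The two modes are mutually exclusive: exactly one must be given.
    if (batch_size is None) == (token_budget is None):
        raise ValueError("pass exactly one of batch_size or token_budget")

    if batch_size is not None:
        # Fixed-size mode: nothing is returned until N samples are ready.
        return list(range(batch_size)) if len(token_counts) >= batch_size else []

    # Token-budget mode: take ready samples in order while they fit the budget.
    picked: List[int] = []
    used = 0
    for idx, n_tokens in enumerate(token_counts):
        # Assumption: an oversized first sample is still returned alone,
        # so the stream cannot stall on it.
        if picked and used + n_tokens > token_budget:
            break
        picked.append(idx)
        used += n_tokens
    return picked
```

With ready samples of 300, 500, 400 and 200 tokens and `token_budget=1000`, the sketch returns the first two samples immediately, whereas `batch_size=8` would return nothing until eight samples are ready.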

@NINGBENZHE NINGBENZHE force-pushed the feat/yuzhe/support_stream_perfetch_bucket branch from 9147a0b to 739506f Compare June 11, 2026 09:05
Signed-off-by: 宁本哲 <ningbenzhe@xiaohongshu.com>
@NINGBENZHE NINGBENZHE force-pushed the feat/yuzhe/support_stream_perfetch_bucket branch from 739506f to f98d76a Compare June 11, 2026 09:56
@NINGBENZHE NINGBENZHE force-pushed the feat/yuzhe/support_stream_perfetch_bucket branch from 5f2de10 to 7d5979c Compare June 25, 2026 03:26
- Track per-partition completion via the producer's is_last signal instead of
  pre-allocating to global_batch_size: add production_completed, actual_sample_count,
  pending_last_indexes/fields on DataPartitionStatus
- Thread is_last through put -> get_meta(insert) -> get_metadata; accumulate the
  true inserted sample count on the insert path only
- Flip production_completed only once the is_last batch is produced for the
  PRODUCER's own fields (not downstream-backfilled columns), avoiding a
  completion deadlock when advantages/ref backfill extra columns
- Add check_stream_drained (production_completed AND all inserted samples consumed)
  and check_production_completed (producer-side gate) ZMQ ops + client wrappers
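
The two gates described above can be sketched like this. `PartitionStatus` is a hypothetical stand-in that borrows the field names from this commit message; the real `DataPartitionStatus`, its pending-last bookkeeping, and the ZMQ ops are not reproduced here.

```python
from dataclasses import dataclass, field
from typing import Iterable, Set


@dataclass
class PartitionStatus:
    """Hypothetical, simplified per-partition state (not the real class)."""

    production_completed: bool = False
    actual_sample_count: int = 0  # accumulated on the insert path only
    consumed: Set[int] = field(default_factory=set)

    def record_insert(self, n_samples: int, is_last: bool = False) -> None:
        # Count what was truly inserted instead of assuming global_batch_size.
        self.actual_sample_count += n_samples
        if is_last:
            # Simplification: the real code flips this flag only once the
            # producer's own fields for the is_last batch have been produced.
            self.production_completed = True

    def mark_consumed(self, indexes: Iterable[int]) -> None:
        self.consumed.update(indexes)

    def check_production_completed(self) -> bool:
        # Producer-side gate: the producer has sent its final batch.
        return self.production_completed

    def check_stream_drained(self) -> bool:
        # Drained = production finished AND every inserted sample was consumed.
        return (
            self.production_completed
            and len(self.consumed) >= self.actual_sample_count
        )
```

A consumer loop would keep polling while `check_stream_drained()` is false, so it neither exits before the `is_last` batch arrives nor waits for samples that were never inserted.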

- Pad/grow the lazily-sized per-task consumption tensor in is_stream_drained
  (read-only) and mark_consumed (write) so a high global index never overruns it
  (was: IndexError that killed the controller request thread)
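
A minimal sketch of the pad/grow idea, using a plain Python list in place of the real consumption tensor. `padded_view` and the free function `mark_consumed` are hypothetical helpers written for this illustration, not the project's methods.

```python
from typing import Iterable, List


def padded_view(consumed: List[bool], size: int) -> List[bool]:
    """Read-only path: return a copy padded with False up to `size` entries.

    The caller's buffer is left untouched.
    """
    if len(consumed) >= size:
        return list(consumed)
    return consumed + [False] * (size - len(consumed))


def mark_consumed(consumed: List[bool], indexes: Iterable[int]) -> None:
    """Write path: grow the buffer in place so every index is addressable."""
    indexes = list(indexes)
    if not indexes:
        return
    needed = max(indexes) + 1
    if needed > len(consumed):
        # Grow instead of raising IndexError on a high global index.
        consumed.extend([False] * (needed - len(consumed)))
    for i in indexes:
        consumed[i] = True
```

Keeping the read path copy-based means a drain check cannot change the buffer size as a side effect; only the write path resizes it.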

- Re-run the completion check right after the insert sets has_pending_last, in
  case the is_last batch's production NOTIFY arrived before the insert RPC

- At EOS, slice each DP bucket by token budget over successive batch_index rounds
  (instead of one oversized pop that can OOM); cache an explicit empty result for an
  empty DP so batch_index alignment is not frozen and residue is never stranded
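
The EOS drain can be pictured as below. This is a sketch under assumptions: `slice_by_budget` and `plan_eos_rounds` are hypothetical names, each bucket is modeled as a list of per-sample token counts keyed by DP rank, and the rounds are precomputed here, whereas the commit describes caching results per batch_index.

```python
from typing import Dict, List


def slice_by_budget(token_counts: List[int], token_budget: int) -> List[List[int]]:
    """Split one bucket's leftover samples into rounds that fit the budget."""
    rounds: List[List[int]] = []
    current: List[int] = []
    used = 0
    for idx, n_tokens in enumerate(token_counts):
        # Close the current round before it would exceed the budget.
        if current and used + n_tokens > token_budget:
            rounds.append(current)
            current, used = [], 0
        current.append(idx)
        used += n_tokens
    if current:
        rounds.append(current)
    return rounds


def plan_eos_rounds(
    buckets: Dict[int, List[int]], token_budget: int
) -> Dict[int, List[List[int]]]:
    """Give every DP rank the same number of rounds, padding with empty ones."""
    per_rank = {
        rank: slice_by_budget(counts, token_budget)
        for rank, counts in buckets.items()
    }
    n_rounds = max((len(r) for r in per_rank.values()), default=0)
    # An empty bucket gets explicit empty rounds, so all ranks advance
    # through the same batch_index sequence.
    return {
        rank: rounds + [[] for _ in range(n_rounds - len(rounds))]
        for rank, rounds in per_rank.items()
    }
```

For example, with a budget of 1000 tokens, a bucket holding samples of 600, 600 and 300 tokens drains in two rounds, and a rank with an empty bucket receives two empty rounds alongside it.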

---

- production_completed gating, backfilled-field independence, OOB consumption
  tensor, notify-before-is_last race, EOS multi-round drain within token budget

Signed-off-by: 宁本哲 <ningbenzhe@xiaohongshu.com>
@NINGBENZHE NINGBENZHE force-pushed the feat/yuzhe/support_stream_perfetch_bucket branch from 7d5979c to a447841 Compare June 25, 2026 03:28
@NINGBENZHE NINGBENZHE merged commit dcc78f0 into main Jun 25, 2026
8 checks passed