[data] fix: make overlong prompt filter picklable #6888

Open
zhangdw156 wants to merge 2 commits into verl-project:main from zhangdw156:fix/picklable-prompt-filter

@zhangdw156 zhangdw156 commented Jun 29, 2026

What does this PR do?

Follow-up to #4894 and #6872.

This PR addresses the maintainer feedback to make the overlong prompt filter picklable under Dataset.filter(..., num_proc=N) instead of falling back to single-process filtering.

The previous implementation passed a nested lambda/doc2len closure to Hugging Face Datasets. In the multimodal path that closure captured self and the processor instance, so processors containing non-picklable state such as ssl.SSLContext could fail before multiprocessing workers processed any examples.
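The failure mode can be reproduced with the standard library alone. The following is a minimal sketch, assuming a stand-in processor (a `SimpleNamespace`, not a real Hugging Face processor) that holds an `ssl.SSLContext`. Hugging Face Datasets uses its own serializer, so the exact exception type may differ there; the key names in the kwargs are illustrative.

```python
import pickle
import ssl
from types import SimpleNamespace


def can_pickle(obj) -> bool:
    """Return True if obj survives pickle.dumps, False otherwise."""
    try:
        pickle.dumps(obj)
        return True
    except Exception:  # the exception type varies by object and serializer
        return False


# Hypothetical stand-in for a processor that keeps a live SSL context.
fake_processor = SimpleNamespace(ssl_context=ssl.create_default_context())

# Kwargs that carry a live object cannot cross a process boundary ...
live_kwargs = {"processor": fake_processor, "max_prompt_length": 1024}
# ... while kwargs that carry only a reload path can.
path_kwargs = {"processor_path": "path/to/processor", "max_prompt_length": 1024}

print(can_pickle(live_kwargs))
print(can_pickle(path_kwargs))
```

A closure that captures `self` drags the same live object into the pickled payload, which is why the failure appears before any worker processes an example.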

This change routes prompt-length filtering through module-level callables and picklable fn_kwargs. For multiprocessing, tokenizer/processor paths are passed to workers and loaded lazily per worker process with a small process-local cache. Single-process filtering keeps using the already-created tokenizer/processor object, and pathless tokenizer/processor fallbacks now explicitly disable multiprocessing so live objects are not sent through num_proc workers.

This does not duplicate an existing PR: a duplicate-work check found no open PRs for #4894 or the related area keywords.

AI assistance was used to prepare this change; the submitting human should review every changed line and the test evidence before merge.

Checklist Before Starting

Test

uvx pre-commit run ruff --files verl/utils/dataset/rl_dataset.py verl/utils/tokenizer/tokenizer.py tests/utils/dataset/test_rl_dataset_on_cpu.py
uvx pre-commit run ruff-format --files verl/utils/dataset/rl_dataset.py verl/utils/tokenizer/tokenizer.py tests/utils/dataset/test_rl_dataset_on_cpu.py
python -m compileall -q verl/utils/dataset/rl_dataset.py verl/utils/tokenizer/tokenizer.py tests/utils/dataset/test_rl_dataset_on_cpu.py
PYTHONPATH=. uv run --no-project --with pytest --with omegaconf --with pillow --with torch --with datasets --with transformers --with tensordict --with ray pytest tests/utils/dataset/test_rl_dataset_on_cpu.py -q -k 'picklable_multiprocess_processor_filter or disables_multiprocessing_without_processor_path or disables_multiprocessing_without_tokenizer_path or build_messages_accepts_image_path or build_messages_accepts_video_path_or_frame_list or maybe_filter_out_long_prompts_accepts_image_path'
PYTHONPATH=. uv run --no-project --with pytest --with omegaconf --with pillow --with torch --with datasets --with transformers --with tensordict --with ray --with qwen-vl-utils pytest tests/utils/test_audio_input_support_on_cpu.py -q
git diff --check

Also ran a local Dataset.from_list(...).filter(num_proc=2) smoke test with a fake processor holding ssl.create_default_context(). The filter completed with multiprocessing enabled and kept the expected short prompt.

API and Usage Example

No public API change. Existing data.filter_overlong_prompts_workers behavior is preserved; when multiprocessing is configured, the filter callable and kwargs are now picklable instead of capturing RLHFDataset or processor objects.

# Existing config continues to use multiprocessing filtering when the tokenizer/processor
# can be reloaded from a path.
data.filter_overlong_prompts=True
data.filter_overlong_prompts_workers=2

Design & Code Changes

  • Replace nested prompt-length filter closures with module-level filter callables.
  • Pass filter state through fn_kwargs so Dataset.filter(num_proc=...) can pickle the callable and arguments.
  • Pass tokenizer/processor paths to multiprocessing workers and lazily load them with a process-local cache.
  • Preserve single-process behavior by using the already-created tokenizer/processor object when num_proc is disabled.
  • Disable multiprocessing for pathless tokenizer/processor fallbacks so live objects are not serialized to workers.
  • Expose name_or_path on processors created by hf_processor() so worker processes can reconstruct the processor.
  • Add CPU regression coverage for the picklable multiprocessing processor path and both pathless processor/tokenizer fallbacks.
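Worker processes can only reconstruct a tokenizer or processor if a reload path can be recovered from the live object. The sketch below illustrates one way such a lookup could work; `resolve_reload_path` is a hypothetical helper, not the PR's `_get_processor_path` or `_get_tokenizer_path`, and the fallback to a nested `tokenizer` attribute is an assumption made for illustration.

```python
from types import SimpleNamespace
from typing import Any, Optional


def resolve_reload_path(obj: Any) -> Optional[str]:
    """Return a non-empty name_or_path for obj, or None if it has none.

    Assumption: if obj itself has no usable path, a nested `tokenizer`
    attribute is checked as a fallback.
    """
    for candidate in (obj, getattr(obj, "tokenizer", None)):
        path = getattr(candidate, "name_or_path", None)
        if isinstance(path, str) and path:
            return path
    return None


# Stand-in objects; real tokenizers and processors are not needed here.
with_path = SimpleNamespace(name_or_path="org/model")
nested = SimpleNamespace(tokenizer=SimpleNamespace(name_or_path="org/nested-model"))
pathless = SimpleNamespace(name_or_path="")

for obj in (with_path, nested, pathless):
    print(resolve_reload_path(obj))
```

A `None` result is the signal to fall back to single-process filtering, since there is nothing a worker could reload from.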

Checklist Before Submitting

Important

Please check all the following items before requesting a review, otherwise the reviewer might deprioritize this PR for review.

  • Read the Contribute Guide.
  • Apply pre-commit checks: pre-commit install && pre-commit run --all-files --show-diff-on-failure --color=always
    • Ran targeted pre-commit ruff and ruff-format hooks on the changed files listed above.
  • Add / Update the documentation.
    • Not applicable: no user-facing API or docs behavior change.
  • Add unit or end-to-end test(s) to the CI workflow to cover all the code. If not feasible, explain why: added CPU regression tests in tests/utils/dataset/test_rl_dataset_on_cpu.py.
  • Once your PR is ready for CI, send a message in the ci-request channel in the verl Slack workspace. (If not accessible, please try the Feishu group (飞书群).)
  • If your PR is related to the recipe submodule, please also update the reference to the submodule commit via git submodule update --remote or cd recipe && git pull origin main.
    • Not applicable: no recipe submodule change.

@gemini-code-assist gemini-code-assist Bot left a comment

Contributor

Code Review

This pull request refactors the prompt length filtering logic in RLHFDataset to support multiprocessing by extracting the filtering logic into standalone, picklable helper functions and resolving processor/tokenizer paths for worker processes. It also adds corresponding unit tests and ensures the processor's name or path is preserved. However, a potential issue was identified where multiprocessing is not disabled when a processor or tokenizer path cannot be resolved, which would lead to a PicklingError when attempting to serialize unpicklable objects across processes. A suggestion was provided to dynamically disable multiprocessing in these fallback scenarios.

Comment on lines +457 to 488
            use_multiprocessing = self.num_workers not in (None, 0)

            if self.processor is not None:
                processor_path = _get_processor_path(self.processor)
                if use_multiprocessing and processor_path is not None:
                    filter_kwargs["processor_path"] = processor_path
                else:
                    filter_kwargs["processor"] = self.processor
                filter_kwargs.update(
                    {
                        "image_key": self.image_key,
                        "video_key": self.video_key,
                        "audio_key": self.audio_key,
                        "image_patch_size": self.image_patch_size,
                        "mm_processor_kwargs": dict(**self.mm_processor_kwargs),
                    }
                )
                filter_function = _filter_prompt_length_with_processor
            else:

                def doc2len(doc) -> int:
                    try:
                        apply_kwargs = dict(**self.apply_chat_template_kwargs)
                        if self.tool_schemas is not None:
                            apply_kwargs["tools"] = self.tool_schemas

                        # Keep explicit tokenization to avoid transformers version default changes.
                        apply_kwargs.pop("tokenize", None)
                        apply_kwargs.pop("return_dict", None)
                        apply_kwargs.pop("return_tensors", None)

                        tokenized_prompt = tokenizer.apply_chat_template(
                            doc[prompt_key], add_generation_prompt=True, tokenize=True, **apply_kwargs
                        )
                        return len(normalize_token_ids(tokenized_prompt))
                    except Exception:
                        print("Error processing one of the samples, skipping...")
                        traceback.print_exc()
                        return self.max_prompt_length + 1
                tokenizer_path = _get_tokenizer_path(self.tokenizer)
                if use_multiprocessing and tokenizer_path is not None:
                    filter_kwargs["tokenizer_path"] = tokenizer_path
                else:
                    filter_kwargs["tokenizer"] = self.tokenizer
                filter_function = _filter_prompt_length_with_tokenizer

            dataframe = dataframe.filter(
                lambda doc: doc2len(doc) <= self.max_prompt_length,
                filter_function,
                num_proc=self.num_workers,
                desc=f"Filtering prompts longer than {self.max_prompt_length} tokens",
                fn_kwargs=filter_kwargs,
            )

Contributor

high

If self.num_workers is configured for multiprocessing (i.e., not None or 0), but the processor or tokenizer path cannot be resolved (returning None), the code currently falls back to passing the self.processor or self.tokenizer object directly in filter_kwargs. However, because num_proc is still set to self.num_workers (> 0), Hugging Face Datasets will attempt to serialize these objects across processes. If they contain unpicklable state (such as ssl.SSLContext), this will result in a PicklingError and crash.

To prevent this, we should dynamically disable multiprocessing (by setting use_multiprocessing = False and passing num_proc = None) whenever the path cannot be resolved, ensuring a graceful fallback to single-process filtering.

            use_multiprocessing = self.num_workers not in (None, 0)

            if self.processor is not None:
                processor_path = _get_processor_path(self.processor)
                if processor_path is None:
                    use_multiprocessing = False
                if use_multiprocessing:
                    filter_kwargs["processor_path"] = processor_path
                else:
                    filter_kwargs["processor"] = self.processor
                filter_kwargs.update(
                    {
                        "image_key": self.image_key,
                        "video_key": self.video_key,
                        "audio_key": self.audio_key,
                        "image_patch_size": self.image_patch_size,
                        "mm_processor_kwargs": dict(**self.mm_processor_kwargs),
                    }
                )
                filter_function = _filter_prompt_length_with_processor
            else:
                tokenizer_path = _get_tokenizer_path(self.tokenizer)
                if tokenizer_path is None:
                    use_multiprocessing = False
                if use_multiprocessing:
                    filter_kwargs["tokenizer_path"] = tokenizer_path
                else:
                    filter_kwargs["tokenizer"] = self.tokenizer
                filter_function = _filter_prompt_length_with_tokenizer

            dataframe = dataframe.filter(
                filter_function,
                num_proc=self.num_workers if use_multiprocessing else None,
                desc=f"Filtering prompts longer than {self.max_prompt_length} tokens",
                fn_kwargs=filter_kwargs,
            )

Author

Fixed in the latest push. Pathless processor/tokenizer objects now disable multiprocessing and call Dataset.filter with num_proc=None, while path-backed processors/tokenizers still use the reload-by-path multiprocessing path. Added regression coverage for both pathless processor and pathless tokenizer fallbacks.
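The fallback rule and the kind of regression check described here can be sketched without any real tokenizer or dataset. Everything below is illustrative: `plan_filter`, `RecordingDataset`, and `keep_all` are hypothetical names, and the test double only mimics the `num_proc` and `fn_kwargs` parameters of `Dataset.filter`.

```python
from typing import Any, Optional


def plan_filter(num_workers: Optional[int], live_object: Any, reload_path: Optional[str]):
    """Return (fn_kwargs, num_proc) for a prompt-length filter call."""
    use_multiprocessing = num_workers not in (None, 0) and reload_path is not None
    if use_multiprocessing:
        # Workers receive only a string and reload the object themselves.
        return {"tokenizer_path": reload_path}, num_workers
    # Single process: the live object is used in place and never pickled.
    return {"tokenizer": live_object}, None


class RecordingDataset:
    """Hypothetical test double that records the arguments given to filter()."""

    def __init__(self):
        self.calls = []

    def filter(self, function, num_proc=None, fn_kwargs=None):
        self.calls.append({"num_proc": num_proc, "fn_kwargs": fn_kwargs})
        return self


def keep_all(doc, **kwargs):
    """Placeholder filter callable; the sketch only inspects how it is invoked."""
    return True


live_tokenizer = object()  # stands in for a tokenizer with no reload path
dataset = RecordingDataset()

# First call: pathless object. Second call: path-backed object.
for path in (None, "path/to/tokenizer"):
    fn_kwargs, num_proc = plan_filter(num_workers=2, live_object=live_tokenizer, reload_path=path)
    dataset.filter(keep_all, num_proc=num_proc, fn_kwargs=fn_kwargs)

print(dataset.calls)
```

The invariant worth asserting in a regression test is that a live object and a non-`None` `num_proc` never appear in the same `filter` call.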

zhangdw156 and others added 2 commits June 29, 2026 17:33
Route prompt-length filtering through top-level callables and picklable fn_kwargs so Hugging Face Dataset.filter(num_proc=...) does not serialize RLHFDataset or processor instances when a processor path is available.

Constraint: maintainer feedback on verl-project#6872 requested making the filter picklable in multiprocessing rather than falling back to single-process filtering.

Rejected: single-process fallback | it hides the serialization issue and disables the configured multiprocessing path.

Confidence: high

Scope-risk: moderate

Directive: Keep Dataset.filter callables top-level and avoid capturing self or processor objects on multiprocessing paths.

Tested: uvx pre-commit run ruff --files verl/utils/dataset/rl_dataset.py verl/utils/tokenizer.py tests/utils/dataset/test_rl_dataset_on_cpu.py
Tested: uvx pre-commit run ruff-format --files verl/utils/dataset/rl_dataset.py verl/utils/tokenizer.py tests/utils/dataset/test_rl_dataset_on_cpu.py
Tested: python -m compileall -q on the changed files
Tested: PYTHONPATH=. uv run --no-project --with pytest --with omegaconf --with pillow --with torch --with datasets --with transformers --with tensordict --with ray pytest tests/utils/dataset/test_rl_dataset_on_cpu.py -q -k picklable_multiprocess_processor_filter
Tested: PYTHONPATH=. uv run --no-project --with pytest --with omegaconf --with pillow --with torch --with datasets --with transformers --with tensordict --with ray --with qwen-vl-utils pytest tests/utils/test_audio_input_support_on_cpu.py -q
Tested: local Dataset.from_list num_proc=2 smoke test with a processor holding an unpicklable SSLContext.

Not-tested: full dataset test file because existing tests require local model/data paths under ~/models and ~/data.

Co-authored-by: OpenAI Codex <codex@openai.com>

Avoid sending live tokenizer or processor objects through Dataset.filter(num_proc=...) when they cannot be reconstructed from a path. This keeps the picklable multiprocessing path for normal processors while using single-process filtering for pathless custom objects.

Constraint: Gemini review on PR verl-project#6888 identified pathless tokenizer/processor objects as still unpicklable across multiprocessing workers.
Rejected: Always disable multiprocessing for prompt filtering | Path-backed processors and tokenizers can be reloaded inside workers without pickling live objects.
Confidence: high
Scope-risk: narrow
Directive: Multiprocessing filter kwargs must contain reload paths, not live tokenizer or processor objects.
Tested: uvx pre-commit run ruff --files verl/utils/dataset/rl_dataset.py verl/utils/tokenizer/tokenizer.py tests/utils/dataset/test_rl_dataset_on_cpu.py
Tested: uvx pre-commit run ruff-format --files verl/utils/dataset/rl_dataset.py verl/utils/tokenizer/tokenizer.py tests/utils/dataset/test_rl_dataset_on_cpu.py
Tested: python -m compileall -q verl/utils/dataset/rl_dataset.py verl/utils/tokenizer/tokenizer.py tests/utils/dataset/test_rl_dataset_on_cpu.py
Tested: PYTHONPATH=. uv run --no-project --with pytest --with omegaconf --with pillow --with torch --with datasets --with transformers --with tensordict --with ray pytest tests/utils/dataset/test_rl_dataset_on_cpu.py -q -k 'picklable_multiprocess_processor_filter or disables_multiprocessing_without_processor_path or disables_multiprocessing_without_tokenizer_path or build_messages_accepts_image_path or build_messages_accepts_video_path_or_frame_list or maybe_filter_out_long_prompts_accepts_image_path'
Tested: PYTHONPATH=. uv run --no-project --with pytest --with omegaconf --with pillow --with torch --with datasets --with transformers --with tensordict --with ray --with qwen-vl-utils pytest tests/utils/test_audio_input_support_on_cpu.py -q
Tested: git diff --check
Not-tested: Full repository test suite.
Co-authored-by: OpenAI Codex <codex@openai.com>
@zhangdw156 zhangdw156 force-pushed the fix/picklable-prompt-filter branch from c053bfd to ddf6dec Compare June 29, 2026 09:38