[data] fix: make overlong prompt filter picklable#6888
Conversation
There was a problem hiding this comment.
Code Review
This pull request refactors the prompt length filtering logic in RLHFDataset to support multiprocessing by extracting the filtering logic into standalone, picklable helper functions and resolving processor/tokenizer paths for worker processes. It also adds corresponding unit tests and ensures the processor's name or path is preserved. However, a potential issue was identified where multiprocessing is not disabled when a processor or tokenizer path cannot be resolved, which would lead to a PicklingError when attempting to serialize unpicklable objects across processes. A suggestion was provided to dynamically disable multiprocessing in these fallback scenarios.
Important
The consumer version of Gemini Code Assist on GitHub is being sunset. Starting June 18, 2026, new organization installations will be blocked, and all code review activity will officially cease on July 17, 2026.
For more details on the timeline and next steps, please review the Help Documentation.
| use_multiprocessing = self.num_workers not in (None, 0) | ||
|
|
||
| if self.processor is not None: | ||
| processor_path = _get_processor_path(self.processor) | ||
| if use_multiprocessing and processor_path is not None: | ||
| filter_kwargs["processor_path"] = processor_path | ||
| else: | ||
| filter_kwargs["processor"] = self.processor | ||
| filter_kwargs.update( | ||
| { | ||
| "image_key": self.image_key, | ||
| "video_key": self.video_key, | ||
| "audio_key": self.audio_key, | ||
| "image_patch_size": self.image_patch_size, | ||
| "mm_processor_kwargs": dict(**self.mm_processor_kwargs), | ||
| } | ||
| ) | ||
| filter_function = _filter_prompt_length_with_processor | ||
| else: | ||
|
|
||
| def doc2len(doc) -> int: | ||
| try: | ||
| apply_kwargs = dict(**self.apply_chat_template_kwargs) | ||
| if self.tool_schemas is not None: | ||
| apply_kwargs["tools"] = self.tool_schemas | ||
|
|
||
| # Keep explicit tokenization to avoid transformers version default changes. | ||
| apply_kwargs.pop("tokenize", None) | ||
| apply_kwargs.pop("return_dict", None) | ||
| apply_kwargs.pop("return_tensors", None) | ||
|
|
||
| tokenized_prompt = tokenizer.apply_chat_template( | ||
| doc[prompt_key], add_generation_prompt=True, tokenize=True, **apply_kwargs | ||
| ) | ||
| return len(normalize_token_ids(tokenized_prompt)) | ||
| except Exception: | ||
| print("Error processing one of the samples, skipping...") | ||
| traceback.print_exc() | ||
| return self.max_prompt_length + 1 | ||
| tokenizer_path = _get_tokenizer_path(self.tokenizer) | ||
| if use_multiprocessing and tokenizer_path is not None: | ||
| filter_kwargs["tokenizer_path"] = tokenizer_path | ||
| else: | ||
| filter_kwargs["tokenizer"] = self.tokenizer | ||
| filter_function = _filter_prompt_length_with_tokenizer | ||
|
|
||
| dataframe = dataframe.filter( | ||
| lambda doc: doc2len(doc) <= self.max_prompt_length, | ||
| filter_function, | ||
| num_proc=self.num_workers, | ||
| desc=f"Filtering prompts longer than {self.max_prompt_length} tokens", | ||
| fn_kwargs=filter_kwargs, | ||
| ) |
There was a problem hiding this comment.
If self.num_workers is configured for multiprocessing (i.e., not None or 0), but the processor or tokenizer path cannot be resolved (returning None), the code currently falls back to passing the self.processor or self.tokenizer object directly in filter_kwargs. However, because num_proc is still set to self.num_workers (> 0), Hugging Face Datasets will attempt to serialize these objects across processes. If they contain unpicklable state (such as ssl.SSLContext), this will result in a PicklingError and crash.
To prevent this, we should dynamically disable multiprocessing (by setting use_multiprocessing = False and passing num_proc = None) whenever the path cannot be resolved, ensuring a graceful fallback to single-process filtering.
use_multiprocessing = self.num_workers not in (None, 0)
if self.processor is not None:
processor_path = _get_processor_path(self.processor)
if processor_path is None:
use_multiprocessing = False
if use_multiprocessing:
filter_kwargs["processor_path"] = processor_path
else:
filter_kwargs["processor"] = self.processor
filter_kwargs.update(
{
"image_key": self.image_key,
"video_key": self.video_key,
"audio_key": self.audio_key,
"image_patch_size": self.image_patch_size,
"mm_processor_kwargs": dict(**self.mm_processor_kwargs),
}
)
filter_function = _filter_prompt_length_with_processor
else:
tokenizer_path = _get_tokenizer_path(self.tokenizer)
if tokenizer_path is None:
use_multiprocessing = False
if use_multiprocessing:
filter_kwargs["tokenizer_path"] = tokenizer_path
else:
filter_kwargs["tokenizer"] = self.tokenizer
filter_function = _filter_prompt_length_with_tokenizer
dataframe = dataframe.filter(
filter_function,
num_proc=self.num_workers if use_multiprocessing else None,
desc=f"Filtering prompts longer than {self.max_prompt_length} tokens",
fn_kwargs=filter_kwargs,
)There was a problem hiding this comment.
Fixed in the latest push. Pathless processor/tokenizer objects now disable multiprocessing and call Dataset.filter with num_proc=None, while path-backed processors/tokenizers still use the reload-by-path multiprocessing path. Added regression coverage for both pathless processor and pathless tokenizer fallbacks.
Route prompt-length filtering through top-level callables and picklable fn_kwargs so Hugging Face Dataset.filter(num_proc=...) does not serialize RLHFDataset or processor instances when a processor path is available. Constraint: maintainer feedback on verl-project#6872 requested making the filter picklable in multiprocessing rather than falling back to single-process filtering. Rejected: single-process fallback | it hides the serialization issue and disables the configured multiprocessing path. Confidence: high Scope-risk: moderate Directive: Keep Dataset.filter callables top-level and avoid capturing self or processor objects on multiprocessing paths. Tested: uvx pre-commit run ruff --files verl/utils/dataset/rl_dataset.py verl/utils/tokenizer.py tests/utils/dataset/test_rl_dataset_on_cpu.py; uvx pre-commit run ruff-format --files verl/utils/dataset/rl_dataset.py verl/utils/tokenizer.py tests/utils/dataset/test_rl_dataset_on_cpu.py; python -m compileall -q changed files; PYTHONPATH=. uv run --no-project --with pytest --with omegaconf --with pillow --with torch --with datasets --with transformers --with tensordict --with ray pytest tests/utils/dataset/test_rl_dataset_on_cpu.py -q -k picklable_multiprocess_processor_filter; PYTHONPATH=. uv run --no-project --with pytest --with omegaconf --with pillow --with torch --with datasets --with transformers --with tensordict --with ray --with qwen-vl-utils pytest tests/utils/test_audio_input_support_on_cpu.py -q; local Dataset.from_list num_proc=2 smoke with unpickleable SSLContext processor. Not-tested: full dataset test file because existing tests require local model/data paths under ~/models and ~/data. Co-authored-by: OpenAI Codex <codex@openai.com>
Avoid sending live tokenizer or processor objects through Dataset.filter(num_proc=...) when they cannot be reconstructed from a path. This keeps the picklable multiprocessing path for normal processors while using single-process filtering for pathless custom objects. Constraint: Gemini review on PR verl-project#6888 identified pathless tokenizer/processor objects as still unpicklable across multiprocessing workers. Rejected: Always disable multiprocessing for prompt filtering | Path-backed processors and tokenizers can be reloaded inside workers without pickling live objects. Confidence: high Scope-risk: narrow Directive: Multiprocessing filter kwargs must contain reload paths, not live tokenizer or processor objects. Tested: uvx pre-commit run ruff --files verl/utils/dataset/rl_dataset.py verl/utils/tokenizer/tokenizer.py tests/utils/dataset/test_rl_dataset_on_cpu.py Tested: uvx pre-commit run ruff-format --files verl/utils/dataset/rl_dataset.py verl/utils/tokenizer/tokenizer.py tests/utils/dataset/test_rl_dataset_on_cpu.py Tested: python -m compileall -q verl/utils/dataset/rl_dataset.py verl/utils/tokenizer/tokenizer.py tests/utils/dataset/test_rl_dataset_on_cpu.py Tested: PYTHONPATH=. uv run --no-project --with pytest --with omegaconf --with pillow --with torch --with datasets --with transformers --with tensordict --with ray pytest tests/utils/dataset/test_rl_dataset_on_cpu.py -q -k 'picklable_multiprocess_processor_filter or disables_multiprocessing_without_processor_path or disables_multiprocessing_without_tokenizer_path or build_messages_accepts_image_path or build_messages_accepts_video_path_or_frame_list or maybe_filter_out_long_prompts_accepts_image_path' Tested: PYTHONPATH=. uv run --no-project --with pytest --with omegaconf --with pillow --with torch --with datasets --with transformers --with tensordict --with ray --with qwen-vl-utils pytest tests/utils/test_audio_input_support_on_cpu.py -q Tested: git diff --check Not-tested: Full repository test suite. Co-authored-by: OpenAI Codex <codex@openai.com>
c053bfd to
ddf6dec
Compare
What does this PR do?
Follow-up to #4894 and #6872.
This PR addresses the maintainer feedback to make the overlong prompt filter picklable under
Dataset.filter(..., num_proc=N)instead of falling back to single-process filtering.The previous implementation passed a nested
lambda/doc2lenclosure to Hugging Face Datasets. In the multimodal path that closure capturedselfand the processor instance, so processors containing non-picklable state such asssl.SSLContextcould fail before multiprocessing workers processed any examples.This change routes prompt-length filtering through module-level callables and picklable
fn_kwargs. For multiprocessing, tokenizer/processor paths are passed to workers and loaded lazily per worker process with a small process-local cache. Single-process filtering keeps using the already-created tokenizer/processor object, and pathless tokenizer/processor fallbacks now explicitly disable multiprocessing so live objects are not sent throughnum_procworkers.This is not duplicating an existing PR: duplicate-work checks found no open PRs for #4894 or the area keywords.
AI assistance was used to prepare this change; the submitting human should review every changed line and the test evidence before merge.
Checklist Before Starting
[{modules}] {type}: {description}Test
uvx pre-commit run ruff --files verl/utils/dataset/rl_dataset.py verl/utils/tokenizer/tokenizer.py tests/utils/dataset/test_rl_dataset_on_cpu.py uvx pre-commit run ruff-format --files verl/utils/dataset/rl_dataset.py verl/utils/tokenizer/tokenizer.py tests/utils/dataset/test_rl_dataset_on_cpu.py python -m compileall -q verl/utils/dataset/rl_dataset.py verl/utils/tokenizer/tokenizer.py tests/utils/dataset/test_rl_dataset_on_cpu.py PYTHONPATH=. uv run --no-project --with pytest --with omegaconf --with pillow --with torch --with datasets --with transformers --with tensordict --with ray pytest tests/utils/dataset/test_rl_dataset_on_cpu.py -q -k 'picklable_multiprocess_processor_filter or disables_multiprocessing_without_processor_path or disables_multiprocessing_without_tokenizer_path or build_messages_accepts_image_path or build_messages_accepts_video_path_or_frame_list or maybe_filter_out_long_prompts_accepts_image_path' PYTHONPATH=. uv run --no-project --with pytest --with omegaconf --with pillow --with torch --with datasets --with transformers --with tensordict --with ray --with qwen-vl-utils pytest tests/utils/test_audio_input_support_on_cpu.py -q git diff --checkAlso ran a local
Dataset.from_list(...).filter(num_proc=2)smoke test with a fake processor holdingssl.create_default_context(). The filter completed with multiprocessing enabled and kept the expected short prompt.API and Usage Example
No public API change. Existing
data.filter_overlong_prompts_workersbehavior is preserved; when multiprocessing is configured, the filter callable and kwargs are now picklable instead of capturingRLHFDatasetor processor objects.Design & Code Changes
fn_kwargssoDataset.filter(num_proc=...)can pickle the callable and arguments.num_procis disabled.name_or_pathon processors created byhf_processor()so worker processes can reconstruct the processor.Checklist Before Submitting
Important
Please check all the following items before requesting a review, otherwise the reviewer might deprioritize this PR for review.
pre-commit install && pre-commit run --all-files --show-diff-on-failure --color=alwayspre-commitruff and ruff-format hooks on the changed files listed above.tests/utils/dataset/test_rl_dataset_on_cpu.py.ci-requestchannel in theverlSlack workspace. (If not accessible, please try the Feishu group (飞书群).)recipesubmodule, please also update the reference to the submodule commit viagit submodule update --remoteorcd recipe && git pull origin main.