From e83798c4d3960a2966302135b452a34e7f6a0bff Mon Sep 17 00:00:00 2001 From: Abhinav Garg Date: Mon, 8 Jun 2026 10:22:09 -0700 Subject: [PATCH] Strip user-set task_id from tutorials & getting-started script PR #2036 made Task.task_id init=False (framework-owned, assigned by the executor adapter), but the tutorials/ examples and the getting-started verify script still passed task_id= to Task constructors (FileGroupTask, DocumentBatch, AudioTask, SampleTask), so they crash with: TypeError: __init__() got an unexpected keyword argument 'task_id' (reported for tutorials/math/1_cc_index_lookup.py). Remove the task_id= kwarg at every construction site; the framework assigns the id. Where a loop index existed only to build the removed task_id, drop it (for _ / for batch in ...). Read-only uses of task_id (logging, a hash seed, the audio checkpoint payload dict) are left as-is. Co-Authored-By: Claude Opus 4.8 (1M context) Signed-off-by: Abhinav Garg --- .../skills/getting-started/scripts/verify_pipeline_cpu.py | 1 - tutorials/audio/callhome_diar/run.py | 4 ---- tutorials/audio/single_speaker_filter/run.py | 3 --- tutorials/math/1_cc_index_lookup.py | 1 - tutorials/quickstart.py | 3 +-- tutorials/slurm/pipeline.py | 3 +-- .../nemotron_cc_sdg_high_quality_example_pipeline.py | 5 ++--- .../nemotron_cc_sdg_low_quality_example_pipeline.py | 3 +-- .../nemotron_cc_sdg_high_quality_example_pipeline.py | 5 ++--- .../nemotron_cc_sdg_low_quality_example_pipeline.py | 3 +-- tutorials/text/gliner-pii-redaction/gliner_pii_redactor.py | 1 - .../llama-nemotron-data-curation/filters/model_filters.py | 4 ---- tutorials/text/nemotron-climb-data-curation/3_prune.py | 1 - 13 files changed, 8 insertions(+), 29 deletions(-) diff --git a/.claude/skills/getting-started/scripts/verify_pipeline_cpu.py b/.claude/skills/getting-started/scripts/verify_pipeline_cpu.py index 0614d54f01..6496e3145b 100644 --- a/.claude/skills/getting-started/scripts/verify_pipeline_cpu.py +++ b/.claude/skills/getting-started/scripts/verify_pipeline_cpu.py @@ -38,7 +38,6 @@ def process(self, _): return [ SampleTask( data=pd.DataFrame({"text": ["Hello world", "Test sentence"]}), - task_id="1", dataset_name="test", ) ] diff --git a/tutorials/audio/callhome_diar/run.py b/tutorials/audio/callhome_diar/run.py index 91f48f6bb0..ef66ede42d 100644 --- a/tutorials/audio/callhome_diar/run.py +++ b/tutorials/audio/callhome_diar/run.py @@ -89,7 +89,6 @@ def _load_task(path: Path) -> AudioTask: """Reconstruct a single AudioTask from a checkpoint file.""" payload = json.loads(path.read_text()) return AudioTask( - task_id=payload["task_id"], dataset_name=payload["dataset_name"], data=payload["data"], _metadata=payload.get("_metadata", {}), @@ -159,7 +158,6 @@ def process(self, task: _EmptyTask) -> list[AudioTask]: # noqa: ARG002 tasks.append( AudioTask( data={self.filepath_key: str(wav), "session_name": fid}, - task_id=f"callhome_{fid}", dataset_name="callhome_eng0", ) ) @@ -196,7 +194,6 @@ def process(self, task: AudioTask) -> AudioTask: output_data = dict(task.data) output_data[self.filepath_key] = self._ensure_mono(task.data[self.filepath_key]) return AudioTask( - task_id=task.task_id, dataset_name=task.dataset_name, data=output_data, _metadata=task._metadata, @@ -237,7 +234,6 @@ def process(self, task: AudioTask) -> AudioTask: metrics = self._compute_der(gt, output_data[self.diar_segments_key], uem_start, uem_end) output_data[self.der_metrics_key] = metrics return AudioTask( - task_id=task.task_id, dataset_name=task.dataset_name, data=output_data, _metadata=task._metadata, diff --git a/tutorials/audio/single_speaker_filter/run.py b/tutorials/audio/single_speaker_filter/run.py index 1949697a02..e48103d6cb 100644 --- a/tutorials/audio/single_speaker_filter/run.py +++ b/tutorials/audio/single_speaker_filter/run.py @@ -93,14 +93,12 @@ def _load_task(path: Path) -> AudioTask | FileGroupTask: payload = json.loads(path.read_text()) if payload.get("_task_type") == "FileGroupTask": return FileGroupTask( - task_id=payload["task_id"], dataset_name=payload["dataset_name"], data=payload["data"], _metadata=payload.get("_metadata", {}), reader_config=payload.get("reader_config", {}), ) return AudioTask( - task_id=payload["task_id"], dataset_name=payload["dataset_name"], data=payload["data"], _metadata=payload.get("_metadata", {}), @@ -153,7 +151,6 @@ def process_batch(self, tasks: list[AudioTask]) -> list[AudioTask]: output_data["num_speakers"] = 1 results.append( AudioTask( - task_id=task.task_id, dataset_name=task.dataset_name, data=output_data, _metadata=task._metadata, diff --git a/tutorials/math/1_cc_index_lookup.py b/tutorials/math/1_cc_index_lookup.py index 35d76afbf4..fdeea87158 100644 --- a/tutorials/math/1_cc_index_lookup.py +++ b/tutorials/math/1_cc_index_lookup.py @@ -121,7 +121,6 @@ def process(self, task: FileGroupTask) -> FileGroupTask: logger.debug(f"Processed {len(task.data)} files: {total_input:,} -> {total_matched:,} rows") return FileGroupTask( - task_id=task.task_id, dataset_name=task.dataset_name, data=output_files, _metadata={ diff --git a/tutorials/quickstart.py b/tutorials/quickstart.py index 5cf2ade2fa..f027e9fbed 100644 --- a/tutorials/quickstart.py +++ b/tutorials/quickstart.py @@ -84,7 +84,6 @@ def process(self, _: _EmptyTask) -> SampleTask: tasks.append( SampleTask( data=pd.DataFrame({"sentence": sampled_sentences}), - task_id=random.randint(0, 1000000), # noqa: S311 dataset_name="SampleDataset", ) ) @@ -203,7 +202,7 @@ def process_batch(self, tasks: list[SampleTask]) -> list[SampleTask]: new_data = task.data.copy() new_data["sentiment"] = task_sentiments - result_task = SampleTask(data=new_data, task_id=task.task_id, dataset_name=task.dataset_name) + result_task = SampleTask(data=new_data, dataset_name=task.dataset_name) result_tasks.append(result_task) sentence_idx += num_sentences diff --git a/tutorials/slurm/pipeline.py b/tutorials/slurm/pipeline.py index 189107f629..7866559e44 100644 --- a/tutorials/slurm/pipeline.py +++ b/tutorials/slurm/pipeline.py @@ -114,12 +114,11 @@ def outputs(self) -> tuple[list[str], list[str]]: def process(self, _: _EmptyTask) -> list[SampleTask]: tasks = [] - for i in range(self.num_tasks): + for _ in range(self.num_tasks): sentences = random.choices(SAMPLE_SENTENCES, k=self.sentences_per_task) # noqa: S311 tasks.append( SampleTask( data=pd.DataFrame({"sentence": sentences}), - task_id=f"task_{i:04d}", dataset_name="slurm_demo", ) ) diff --git a/tutorials/synthetic/nemotron_cc/nemo_data_designer/nemotron_cc_sdg_high_quality_example_pipeline.py b/tutorials/synthetic/nemotron_cc/nemo_data_designer/nemotron_cc_sdg_high_quality_example_pipeline.py index 7f7a6e49be..785bfd9410 100644 --- a/tutorials/synthetic/nemotron_cc/nemo_data_designer/nemotron_cc_sdg_high_quality_example_pipeline.py +++ b/tutorials/synthetic/nemotron_cc/nemo_data_designer/nemotron_cc_sdg_high_quality_example_pipeline.py @@ -371,14 +371,13 @@ def main() -> None: # noqa: C901, PLR0912, PLR0915 input_batches = [input_data[i : i + batch_size] for i in range(0, len(input_data), batch_size)] input_tasks = [] id_counter = 0 - for i in range(num_input_tasks // len(input_batches)): - for j, batch in enumerate(input_batches): + for _ in range(num_input_tasks // len(input_batches)): + for batch in input_batches: df = pd.DataFrame(batch) df["id"] = [id_counter + k for k in range(len(df))] id_counter += len(df) input_task = DocumentBatch( data=df, - task_id=f"input_batch_{i * batch_size + j}", dataset_name="data_for_sdg", ) input_tasks.append(input_task) diff --git a/tutorials/synthetic/nemotron_cc/nemo_data_designer/nemotron_cc_sdg_low_quality_example_pipeline.py b/tutorials/synthetic/nemotron_cc/nemo_data_designer/nemotron_cc_sdg_low_quality_example_pipeline.py index 6e55edea70..3fecd99491 100644 --- a/tutorials/synthetic/nemotron_cc/nemo_data_designer/nemotron_cc_sdg_low_quality_example_pipeline.py +++ b/tutorials/synthetic/nemotron_cc/nemo_data_designer/nemotron_cc_sdg_low_quality_example_pipeline.py @@ -292,11 +292,10 @@ def main() -> None: # noqa: C901, PLR0912, PLR0915 batch_size = 5 input_batches = [input_data[i : i + batch_size] for i in range(0, len(input_data), batch_size)] input_tasks = [] - for i, batch in enumerate(input_batches): + for batch in input_batches: df = pd.DataFrame(batch) input_task = DocumentBatch( data=df, - task_id=f"input_batch_{i}", dataset_name="data_for_sdg", ) input_tasks.append(input_task) diff --git a/tutorials/synthetic/nemotron_cc/nemotron_cc_sdg_high_quality_example_pipeline.py b/tutorials/synthetic/nemotron_cc/nemotron_cc_sdg_high_quality_example_pipeline.py index 6e14ebd113..0fcd751a29 100644 --- a/tutorials/synthetic/nemotron_cc/nemotron_cc_sdg_high_quality_example_pipeline.py +++ b/tutorials/synthetic/nemotron_cc/nemotron_cc_sdg_high_quality_example_pipeline.py @@ -349,15 +349,14 @@ def main() -> None: # noqa: C901, PLR0912, PLR0915 input_batches = [input_data[i : i + batch_size] for i in range(0, len(input_data), batch_size)] input_tasks = [] id_counter = 0 - for i in range(num_input_tasks // len(input_batches)): - for j, batch in enumerate(input_batches): + for _ in range(num_input_tasks // len(input_batches)): + for batch in input_batches: df = pd.DataFrame(batch) # Ensure a stable document identifier required by DocumentJoiner df["id"] = [id_counter + j for j in range(len(df))] id_counter += len(df) input_task = DocumentBatch( data=df, - task_id=f"input_batch_{i * batch_size + j}", dataset_name="data_for_sdg", ) input_tasks.append(input_task) diff --git a/tutorials/synthetic/nemotron_cc/nemotron_cc_sdg_low_quality_example_pipeline.py b/tutorials/synthetic/nemotron_cc/nemotron_cc_sdg_low_quality_example_pipeline.py index a66ec066dd..9d74978482 100644 --- a/tutorials/synthetic/nemotron_cc/nemotron_cc_sdg_low_quality_example_pipeline.py +++ b/tutorials/synthetic/nemotron_cc/nemotron_cc_sdg_low_quality_example_pipeline.py @@ -272,11 +272,10 @@ def main() -> None: # noqa: C901, PLR0912, PLR0915 batch_size = 5 input_batches = [input_data[i : i + batch_size] for i in range(0, len(input_data), batch_size)] input_tasks = [] - for i, batch in enumerate(input_batches): + for batch in input_batches: df = pd.DataFrame(batch) input_task = DocumentBatch( data=df, - task_id=f"input_batch_{i}", dataset_name="data_for_sdg", ) input_tasks.append(input_task) diff --git a/tutorials/text/gliner-pii-redaction/gliner_pii_redactor.py b/tutorials/text/gliner-pii-redaction/gliner_pii_redactor.py index 2274381897..e1df7be738 100644 --- a/tutorials/text/gliner-pii-redaction/gliner_pii_redactor.py +++ b/tutorials/text/gliner-pii-redaction/gliner_pii_redactor.py @@ -178,7 +178,6 @@ def process(self, batch: DocumentBatch) -> DocumentBatch | None: # Create output batch return DocumentBatch( - task_id=f"{batch.task_id}_{self.name}", dataset_name=batch.dataset_name, data=df, _metadata=batch._metadata, diff --git a/tutorials/text/llama-nemotron-data-curation/filters/model_filters.py b/tutorials/text/llama-nemotron-data-curation/filters/model_filters.py index 2f44fed8bd..f337f9645e 100644 --- a/tutorials/text/llama-nemotron-data-curation/filters/model_filters.py +++ b/tutorials/text/llama-nemotron-data-curation/filters/model_filters.py @@ -97,7 +97,6 @@ def process(self, batch: DocumentBatch) -> DocumentBatch: df_filtered = df[mask] return DocumentBatch( - task_id=batch.task_id, dataset_name=batch.dataset_name, data=df_filtered, _metadata=batch._metadata, @@ -179,7 +178,6 @@ def process(self, batch: DocumentBatch) -> DocumentBatch: df_filtered = df[mask] return DocumentBatch( - task_id=batch.task_id, dataset_name=batch.dataset_name, data=df_filtered, _metadata=batch._metadata, @@ -256,7 +254,6 @@ def process(self, batch: DocumentBatch) -> DocumentBatch: df_filtered = df[mask] return DocumentBatch( - task_id=batch.task_id, dataset_name=batch.dataset_name, data=df_filtered, _metadata=batch._metadata, @@ -350,7 +347,6 @@ def process(self, batch: DocumentBatch) -> DocumentBatch: ) return DocumentBatch( - task_id=batch.task_id, dataset_name=batch.dataset_name, data=df, _metadata=batch._metadata, diff --git a/tutorials/text/nemotron-climb-data-curation/3_prune.py b/tutorials/text/nemotron-climb-data-curation/3_prune.py index e2ddd133b3..8730f23464 100644 --- a/tutorials/text/nemotron-climb-data-curation/3_prune.py +++ b/tutorials/text/nemotron-climb-data-curation/3_prune.py @@ -126,7 +126,6 @@ def process(self, task: DocumentBatch) -> FileGroupTask: # Create FileGroupTask with written files using the full protocol-prefixed path return FileGroupTask( - task_id=task.task_id, dataset_name=task.dataset_name, data=[file_path_with_protocol], _metadata={