Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ def process(self, _):
return [
SampleTask(
data=pd.DataFrame({"text": ["Hello world", "Test sentence"]}),
task_id="1",
dataset_name="test",
)
]
Expand Down
4 changes: 0 additions & 4 deletions tutorials/audio/callhome_diar/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,6 @@ def _load_task(path: Path) -> AudioTask:
"""Reconstruct a single AudioTask from a checkpoint file."""
payload = json.loads(path.read_text())
return AudioTask(
task_id=payload["task_id"],
dataset_name=payload["dataset_name"],
data=payload["data"],
_metadata=payload.get("_metadata", {}),
Expand Down Expand Up @@ -159,7 +158,6 @@ def process(self, task: _EmptyTask) -> list[AudioTask]: # noqa: ARG002
tasks.append(
AudioTask(
data={self.filepath_key: str(wav), "session_name": fid},
task_id=f"callhome_{fid}",
dataset_name="callhome_eng0",
)
)
Expand Down Expand Up @@ -196,7 +194,6 @@ def process(self, task: AudioTask) -> AudioTask:
output_data = dict(task.data)
output_data[self.filepath_key] = self._ensure_mono(task.data[self.filepath_key])
return AudioTask(
task_id=task.task_id,
dataset_name=task.dataset_name,
data=output_data,
_metadata=task._metadata,
Expand Down Expand Up @@ -237,7 +234,6 @@ def process(self, task: AudioTask) -> AudioTask:
metrics = self._compute_der(gt, output_data[self.diar_segments_key], uem_start, uem_end)
output_data[self.der_metrics_key] = metrics
return AudioTask(
task_id=task.task_id,
dataset_name=task.dataset_name,
data=output_data,
_metadata=task._metadata,
Expand Down
3 changes: 0 additions & 3 deletions tutorials/audio/single_speaker_filter/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,14 +93,12 @@ def _load_task(path: Path) -> AudioTask | FileGroupTask:
payload = json.loads(path.read_text())
if payload.get("_task_type") == "FileGroupTask":
return FileGroupTask(
task_id=payload["task_id"],
dataset_name=payload["dataset_name"],
data=payload["data"],
_metadata=payload.get("_metadata", {}),
reader_config=payload.get("reader_config", {}),
)
return AudioTask(
task_id=payload["task_id"],
dataset_name=payload["dataset_name"],
data=payload["data"],
_metadata=payload.get("_metadata", {}),
Expand Down Expand Up @@ -153,7 +151,6 @@ def process_batch(self, tasks: list[AudioTask]) -> list[AudioTask]:
output_data["num_speakers"] = 1
results.append(
AudioTask(
task_id=task.task_id,
dataset_name=task.dataset_name,
data=output_data,
_metadata=task._metadata,
Expand Down
1 change: 0 additions & 1 deletion tutorials/math/1_cc_index_lookup.py
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,6 @@ def process(self, task: FileGroupTask) -> FileGroupTask:
logger.debug(f"Processed {len(task.data)} files: {total_input:,} -> {total_matched:,} rows")

return FileGroupTask(
task_id=task.task_id,
dataset_name=task.dataset_name,
data=output_files,
_metadata={
Expand Down
3 changes: 1 addition & 2 deletions tutorials/quickstart.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,6 @@ def process(self, _: _EmptyTask) -> SampleTask:
tasks.append(
SampleTask(
data=pd.DataFrame({"sentence": sampled_sentences}),
task_id=random.randint(0, 1000000), # noqa: S311
dataset_name="SampleDataset",
)
)
Expand Down Expand Up @@ -203,7 +202,7 @@ def process_batch(self, tasks: list[SampleTask]) -> list[SampleTask]:
new_data = task.data.copy()
new_data["sentiment"] = task_sentiments

result_task = SampleTask(data=new_data, task_id=task.task_id, dataset_name=task.dataset_name)
result_task = SampleTask(data=new_data, dataset_name=task.dataset_name)
result_tasks.append(result_task)

sentence_idx += num_sentences
Expand Down
3 changes: 1 addition & 2 deletions tutorials/slurm/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -114,12 +114,11 @@ def outputs(self) -> tuple[list[str], list[str]]:

def process(self, _: _EmptyTask) -> list[SampleTask]:
tasks = []
for i in range(self.num_tasks):
for _ in range(self.num_tasks):
sentences = random.choices(SAMPLE_SENTENCES, k=self.sentences_per_task) # noqa: S311
tasks.append(
SampleTask(
data=pd.DataFrame({"sentence": sentences}),
task_id=f"task_{i:04d}",
dataset_name="slurm_demo",
)
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -371,14 +371,13 @@ def main() -> None: # noqa: C901, PLR0912, PLR0915
input_batches = [input_data[i : i + batch_size] for i in range(0, len(input_data), batch_size)]
input_tasks = []
id_counter = 0
for i in range(num_input_tasks // len(input_batches)):
for j, batch in enumerate(input_batches):
for _ in range(num_input_tasks // len(input_batches)):
for batch in input_batches:
df = pd.DataFrame(batch)
df["id"] = [id_counter + k for k in range(len(df))]
id_counter += len(df)
input_task = DocumentBatch(
data=df,
task_id=f"input_batch_{i * batch_size + j}",
dataset_name="data_for_sdg",
)
input_tasks.append(input_task)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -292,11 +292,10 @@ def main() -> None: # noqa: C901, PLR0912, PLR0915
batch_size = 5
input_batches = [input_data[i : i + batch_size] for i in range(0, len(input_data), batch_size)]
input_tasks = []
for i, batch in enumerate(input_batches):
for batch in input_batches:
df = pd.DataFrame(batch)
input_task = DocumentBatch(
data=df,
task_id=f"input_batch_{i}",
dataset_name="data_for_sdg",
)
input_tasks.append(input_task)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -349,15 +349,14 @@ def main() -> None: # noqa: C901, PLR0912, PLR0915
input_batches = [input_data[i : i + batch_size] for i in range(0, len(input_data), batch_size)]
input_tasks = []
id_counter = 0
for i in range(num_input_tasks // len(input_batches)):
for j, batch in enumerate(input_batches):
for _ in range(num_input_tasks // len(input_batches)):
for batch in input_batches:
df = pd.DataFrame(batch)
# Ensure a stable document identifier required by DocumentJoiner
df["id"] = [id_counter + j for j in range(len(df))]
id_counter += len(df)
input_task = DocumentBatch(
data=df,
task_id=f"input_batch_{i * batch_size + j}",
dataset_name="data_for_sdg",
)
input_tasks.append(input_task)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -272,11 +272,10 @@ def main() -> None: # noqa: C901, PLR0912, PLR0915
batch_size = 5
input_batches = [input_data[i : i + batch_size] for i in range(0, len(input_data), batch_size)]
input_tasks = []
for i, batch in enumerate(input_batches):
for batch in input_batches:
df = pd.DataFrame(batch)
input_task = DocumentBatch(
data=df,
task_id=f"input_batch_{i}",
dataset_name="data_for_sdg",
)
input_tasks.append(input_task)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,6 @@ def process(self, batch: DocumentBatch) -> DocumentBatch | None:

# Create output batch
return DocumentBatch(
task_id=f"{batch.task_id}_{self.name}",
dataset_name=batch.dataset_name,
data=df,
_metadata=batch._metadata,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,6 @@ def process(self, batch: DocumentBatch) -> DocumentBatch:
df_filtered = df[mask]

return DocumentBatch(
task_id=batch.task_id,
dataset_name=batch.dataset_name,
data=df_filtered,
_metadata=batch._metadata,
Expand Down Expand Up @@ -179,7 +178,6 @@ def process(self, batch: DocumentBatch) -> DocumentBatch:
df_filtered = df[mask]

return DocumentBatch(
task_id=batch.task_id,
dataset_name=batch.dataset_name,
data=df_filtered,
_metadata=batch._metadata,
Expand Down Expand Up @@ -256,7 +254,6 @@ def process(self, batch: DocumentBatch) -> DocumentBatch:
df_filtered = df[mask]

return DocumentBatch(
task_id=batch.task_id,
dataset_name=batch.dataset_name,
data=df_filtered,
_metadata=batch._metadata,
Expand Down Expand Up @@ -350,7 +347,6 @@ def process(self, batch: DocumentBatch) -> DocumentBatch:
)

return DocumentBatch(
task_id=batch.task_id,
dataset_name=batch.dataset_name,
data=df,
_metadata=batch._metadata,
Expand Down
1 change: 0 additions & 1 deletion tutorials/text/nemotron-climb-data-curation/3_prune.py
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,6 @@ def process(self, task: DocumentBatch) -> FileGroupTask:

# Create FileGroupTask with written files using the full protocol-prefixed path
return FileGroupTask(
task_id=task.task_id,
dataset_name=task.dataset_name,
data=[file_path_with_protocol],
_metadata={
Expand Down
Loading