Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 62 additions & 0 deletions docs/cli.md
Original file line number Diff line number Diff line change
Expand Up @@ -269,3 +269,65 @@ name,reads1,reads2,project,organism,cell_type,source,source__annotation
$ flowbio samples batch-template --sample-type rna_seq --json
[{"name": "name", "kind": "reserved", "required": true, "options": null, "description": "..."}, ...]
```

### `samples upload-batch`

Upload many samples from a filled-in CSV sample sheet, applying one sample type
to every row.

```
flowbio samples upload-batch --sheet PATH --sample-type TYPE
[--skip-invalid] [--stop-on-error]
```

Run `flowbio samples upload-batch --help` for the full option list. The sheet is
the CSV produced by `samples batch-template` (see that command for the schema);
it must be a `.csv` — passing an `.xlsx` or `.tsv` fails with a usage error
telling you to export to CSV. Paths to reads files in the sheet are resolved relative to the **sheet
file's own directory** (absolute paths are used as-is), and empty cells are
omitted rather than sent as empty values. The sample type is sent as-is and
validated server-side.

**Validation is up front.** Every row is validated *before any upload*, and all
problems on a row are reported together — a missing `name`/`reads1`, a reads file
that is not on disk, a name containing spaces, a value outside a closed-option
attribute's allowed values, metadata required for the chosen type that is
missing, or a `<identifier>__annotation` companion set without its value or on an
attribute that does not permit annotations.

- By default, **any** invalid row aborts the whole run: every row's errors are
reported (with its 1-based row number and name), nothing is uploaded, exit `2`.
- `--skip-invalid` skips the invalid rows (reporting why) and uploads the rest.

Valid rows upload **sequentially in sheet order**. By default a row that fails to
upload is recorded and the run continues; `--stop-on-error` aborts on the first
failing row, reporting the rows already uploaded.

**Output** — human: each row's outcome on stderr, then a final counts summary on
stdout. `--json`: a single document on stdout with `uploaded`, `failed`, and
`skipped` lists plus a `counts` summary:

```json
{
"uploaded": [{"row_number": 1, "name": "s1", "sample_id": "samp_1"}],
"failed": [{"row_number": 2, "name": "s2", "message": "..."}],
"skipped": [{"row_number": 3, "name": "s3", "reasons": ["..."]}],
"counts": {"uploaded": 1, "failed": 1, "skipped": 1}
}
```

**Exit codes** — `0` no upload failed (rows skipped with `--skip-invalid` do
not change the exit code); `1` any upload failure; `2` a pre-flight validation
failure (without `--skip-invalid`) or a non-CSV sheet; `3` authentication
failure; otherwise the standard mapping above.

**Example**

```bash
$ flowbio samples upload-batch --sheet ./samples.csv --sample-type rna_seq
Row 1 (liver_r1): uploaded samp_1
Row 2 (liver_r2): uploaded samp_2
Uploaded 2, failed 0, skipped 0.

$ flowbio samples upload-batch --sheet ./samples.csv --sample-type rna_seq --json
{"uploaded": [{"row_number": 1, "name": "liver_r1", "sample_id": "samp_1"}, {"row_number": 2, "name": "liver_r2", "sample_id": "samp_2"}], "failed": [], "skipped": [], "counts": {"uploaded": 2, "failed": 0, "skipped": 0}}
```
179 changes: 178 additions & 1 deletion flowbio/cli/_samples.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,15 @@
from flowbio.cli._exit_codes import CliUsageError, ExitCode
from flowbio.cli._files import existing_file
from flowbio.cli._output import Output, format_issue
from flowbio.cli._sheet import (
ANNOTATION_SUFFIX,
SheetRow,
parse_sheet,
validate_row,
)
from flowbio.cli._types import JsonValue
from flowbio.v2.client import Client
from flowbio.v2.exceptions import FlowApiError
from flowbio.v2.samples import MetadataAttribute, SampleTypeId


Expand Down Expand Up @@ -60,6 +67,15 @@ def register(
"--json) for use with 'samples upload-batch'."
),
))
_configure_upload_batch(verbs.add_parser(
"upload-batch",
parents=[global_parent],
help="Upload many samples from a CSV sample sheet.",
description=(
"Validate every row of a CSV sample sheet up front, then upload the "
"valid rows sequentially, reporting each row's outcome."
),
))


def _configure_upload(upload: argparse.ArgumentParser) -> None:
Expand Down Expand Up @@ -187,6 +203,36 @@ def _configure_batch_template(batch_template: argparse.ArgumentParser) -> None:
)


def _configure_upload_batch(upload_batch: argparse.ArgumentParser) -> None:
    """Attach the ``upload-batch`` handler and options to its subparser.

    :param upload_batch: The ``upload-batch`` subparser to configure.
    """
    upload_batch.set_defaults(
        command_parser=upload_batch,
        handler=_upload_batch_command,
    )

    # Required, value-taking options: (flag, metavar, converter, help text).
    required_options = (
        (
            "--sheet",
            "PATH",
            Path,
            "CSV sample sheet (the filled-in `samples batch-template` output).",
        ),
        (
            "--sample-type",
            "TYPE",
            SampleTypeId,
            "Sample type applied to every row (sent as-is; validated server-side).",
        ),
    )
    for flag, metavar, converter, help_text in required_options:
        upload_batch.add_argument(
            flag,
            type=converter,
            metavar=metavar,
            required=True,
            help=help_text,
        )

    # Opt-in boolean switches: (flag, help text).
    switches = (
        (
            "--skip-invalid",
            "Skip invalid rows (reporting why) instead of aborting the batch.",
        ),
        (
            "--stop-on-error",
            "Abort on the first row that fails to upload.",
        ),
    )
    for flag, help_text in switches:
        upload_batch.add_argument(flag, action="store_true", help=help_text)


def _upload_command(args: argparse.Namespace, client: Client, output: Output) -> ExitCode:
"""Upload a single sample and report its identifier.

Expand Down Expand Up @@ -362,7 +408,7 @@ def _template_columns(
if attribute.allow_annotation:
columns.append(
_TemplateColumn(
name=f"{attribute.identifier}__annotation",
name=f"{attribute.identifier}{ANNOTATION_SUFFIX}",
kind="annotation",
required=False,
options=None,
Expand All @@ -381,6 +427,137 @@ def _required_summary(columns: list[_TemplateColumn]) -> str:
)


@dataclass(frozen=True)
class _BatchResult:
    """Collected per-row outcomes of one ``upload-batch`` run.

    Holds the three outcome lists and derives from them the counts, the JSON
    document, the one-line text summary and the exit code.
    """

    # One JSON-ready mapping per row, grouped by what happened to the row.
    uploaded: list[dict[str, JsonValue]]
    failed: list[dict[str, JsonValue]]
    skipped: list[dict[str, JsonValue]]

    @property
    def counts(self) -> dict[str, int]:
        """Number of entries in each outcome list, keyed by outcome name."""
        by_outcome = (
            ("uploaded", self.uploaded),
            ("failed", self.failed),
            ("skipped", self.skipped),
        )
        return {outcome: len(entries) for outcome, entries in by_outcome}

    @property
    def document(self) -> dict[str, JsonValue]:
        """The full result document: the three lists followed by ``counts``."""
        payload: dict[str, JsonValue] = dict(
            uploaded=self.uploaded,
            failed=self.failed,
            skipped=self.skipped,
        )
        payload["counts"] = self.counts
        return payload

    @property
    def summary(self) -> str:
        """One-line human-readable tally of the run."""
        tally = self.counts
        uploaded = tally["uploaded"]
        failed = tally["failed"]
        skipped = tally["skipped"]
        return f"Uploaded {uploaded}, failed {failed}, skipped {skipped}."

    @property
    def exit_code(self) -> ExitCode:
        """Exit code for the run; skipped rows do not affect it.

        :returns: :attr:`ExitCode.RUNTIME` when at least one row failed to
            upload, otherwise :attr:`ExitCode.SUCCESS`.
        """
        if self.failed:
            return ExitCode.RUNTIME
        return ExitCode.SUCCESS


def _upload_batch_command(
    args: argparse.Namespace, client: Client, output: Output,
) -> ExitCode:
    """Run ``samples upload-batch``: validate every row, then upload the valid ones.

    Every row of the sheet is validated before anything is uploaded. If any
    row is invalid and ``--skip-invalid`` was not given, all invalid rows are
    reported and nothing is uploaded. Otherwise the invalid rows are reported
    as skipped and the remaining rows are uploaded.

    :param args: Parsed command-line arguments.
    :param client: The authenticated Flow client.
    :param output: The result/error renderer.
    :returns: :attr:`ExitCode.USAGE` on a pre-flight validation failure
        without ``--skip-invalid``; otherwise :attr:`ExitCode.RUNTIME` if any
        upload failed, else :attr:`ExitCode.SUCCESS`.
    """
    sheet = parse_sheet(args.sheet)
    attributes = client.samples.get_metadata_attributes()

    # Partition the rows in sheet order; a row is invalid when validation
    # returns at least one reason.
    uploadable: list[SheetRow] = []
    rejected: list[tuple[SheetRow, list[str]]] = []
    for row in sheet.rows:
        reasons = validate_row(row, attributes, args.sample_type)
        if reasons:
            rejected.append((row, reasons))
        else:
            uploadable.append(row)

    if rejected and not args.skip_invalid:
        output.emit_error(
            "Sample sheet has invalid rows; nothing was uploaded.",
            details=[_invalid_line(row, reasons) for row, reasons in rejected],
        )
        return ExitCode.USAGE

    # Only reached with no invalid rows or with --skip-invalid: report each
    # skipped row and keep a record of it for the result document.
    skipped: list[dict[str, JsonValue]] = []
    for row, reasons in rejected:
        output.emit_advisory(f"Skipped {_invalid_line(row, reasons)}")
        skipped.append(
            {"row_number": row.row_number, "name": row.name, "reasons": reasons},
        )

    result = _upload_rows(uploadable, args, client, output, skipped)
    output.emit_result(result.summary, result.document)
    return result.exit_code


def _upload_rows(
    rows: list[SheetRow],
    args: argparse.Namespace,
    client: Client,
    output: Output,
    skipped: list[dict[str, JsonValue]],
) -> _BatchResult:
    """Upload rows one at a time, in order, recording each row's outcome.

    A row whose upload raises :class:`FlowApiError` is recorded as failed and
    reported; the loop then stops if ``args.stop_on_error`` is set and
    otherwise moves on to the next row.

    :param rows: The rows to upload, in the order they should be attempted.
    :param args: Parsed command-line arguments (``sample_type`` and
        ``stop_on_error`` are read).
    :param client: The Flow client used to upload each sample.
    :param output: The renderer that receives one advisory per attempted row.
    :param skipped: Entries for rows skipped before uploading; passed through
        unchanged into the result.
    :returns: The uploaded, failed and skipped entries of the run.
    """
    successes: list[dict[str, JsonValue]] = []
    failures: list[dict[str, JsonValue]] = []

    for row in rows:
        try:
            sample = client.samples.upload_sample(
                name=row.name,
                sample_type=args.sample_type,
                data=_row_reads(row),
                metadata=row.metadata or None,
                project_id=row.project,
                organism_id=row.organism,
            )
        except FlowApiError as error:
            failures.append(
                {
                    "row_number": row.row_number,
                    "name": row.name,
                    "message": error.message,
                },
            )
            output.emit_advisory(
                f"Row {row.row_number} ({row.name}): upload failed — {error.message}",
            )
            if args.stop_on_error:
                # Later rows are left unattempted and appear in no list.
                break
        else:
            successes.append(
                {
                    "row_number": row.row_number,
                    "name": row.name,
                    "sample_id": sample.id,
                },
            )
            output.emit_advisory(
                f"Row {row.row_number} ({row.name}): uploaded {sample.id}",
            )

    return _BatchResult(uploaded=successes, failed=failures, skipped=skipped)


def _row_reads(row: SheetRow) -> dict[str, Path]:
    """Map ``reads1``/``reads2`` to the row's reads paths, omitting unset ones.

    :param row: The sheet row whose ``reads1`` and ``reads2`` are read.
    :returns: A mapping with a key for each of the two paths that is not
        ``None``, ``reads1`` first.
    """
    first, second = row.reads1, row.reads2
    reads: dict[str, Path] = {}
    if first is not None:
        reads["reads1"] = first
    if second is not None:
        reads["reads2"] = second
    return reads


def _invalid_line(row: SheetRow, reasons: list[str]) -> str:
    """Format one invalid row as ``Row <number> (<name>): <reason>; <reason>``.

    :param row: The row the reasons belong to.
    :param reasons: The validation messages for that row.
    :returns: A single line naming the row, followed by its reasons joined
        with ``"; "``.
    """
    joined_reasons = "; ".join(reasons)
    return f"Row {row.row_number} ({row.name}): {joined_reasons}"


def _merge_metadata(
pairs: list[str] | None, json_text: str | None,
) -> dict[str, str]:
Expand Down
Loading
Loading