Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
82 changes: 69 additions & 13 deletions torchbase/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -172,12 +172,32 @@ def compress_stream(file_obj):
# Main running method
#

def _strategy_callback(ctx, param, value):
"""Callback to mark when strategy is explicitly set."""
ctx.ensure_object(dict)
# Check if the parameter came from user input (not default)
if hasattr(ctx, 'get_parameter_source'):
source = ctx.get_parameter_source(param.name)
if source and source.name == 'COMMANDLINE':
ctx.obj['_strategy_explicit'] = True
return value


@cli.command("run", context_settings=dict(ignore_unknown_options=True, allow_extra_args=True))
@click.option("--cromwell-opts", "cromwell_options", nargs=1, default="", type=click.STRING)
@torch
@click.option("-m", "--method", nargs=1, default="main", type=click.STRING)
@click.option("--workflow", default=None, help="Override workflow torch (namespace/name format)")
@click.option("-o", "--output", default=None, help="Output file for results")
@click.option(
"--strategy",
type=click.Choice(['fast', 'balanced', 'sensitive']),
default='balanced',
callback=_strategy_callback,
is_eager=True,
help="Typing strategy (default=balanced): fast (MinHash only), "
"balanced (MinHash+alignment), sensitive (full alignment). "
"Cannot be used with embedded workflows.")
@ReadsParam("-c", "--contigs")
@ReadsParam("-r", "--reads")
@ReadsParam("-pe1", "--paired1", "--pe1")
Expand All @@ -186,7 +206,7 @@ def compress_stream(file_obj):
@ReadsParam("-l", "--longreads")
@click.argument('torch_args', nargs=-1, type=click.UNPROCESSED)
@click.pass_context
def _run(clx, torch, cromwell_options="", method="main", workflow=None, output=None, contigs=None, reads=None, paired1=None, paired2=None, interlaced=None, longreads=None, torch_args=[]):
def _run(clx, torch, cromwell_options="", method="main", workflow=None, output=None, strategy='balanced', contigs=None, reads=None, paired1=None, paired2=None, interlaced=None, longreads=None, torch_args=[]):
"Run the selected torch."
from torchbase.torchfs import Torch
from torchbase.registry import RegistryManager
Expand All @@ -203,8 +223,18 @@ def _run(clx, torch, cromwell_options="", method="main", workflow=None, output=N
# Load data torch
data_torch = Torch.load(torch)

# Determine workflow to use
workflow_torch = data_torch
# Check for conflict: --strategy cannot be used with embedded workflows
# Check if user explicitly specified --strategy via the callback flag
user_specified_strategy = clx.obj.get('_strategy_explicit', False) if clx.obj else False

if user_specified_strategy and data_torch.workflow:
raise click.ClickException(
"Cannot use --strategy with torch-embedded workflows. "
"The torch already has a custom workflow (main.wdl) defined."
)

# Determine workflow file to use
workflow_file = None

if workflow:
# User specified custom workflow
Expand All @@ -213,31 +243,57 @@ def _run(clx, torch, cromwell_options="", method="main", workflow=None, output=N
try:
workflow_path = manager.fetch_torch(workflow)
workflow_torch = Torch.load(workflow_path)
workflow_file = workflow_torch.workflow
except Exception as e:
raise click.ClickException(f"Failed to fetch workflow {workflow}: {str(e)}")
elif not data_torch.workflow:
# No workflow in data torch, try default
elif data_torch.workflow:
# Torch has embedded workflow
workflow_file = data_torch.workflow
elif user_specified_strategy:
# User explicitly specified --strategy, use built-in workflow
strategy_to_workflow = {
'fast': 'fast_typing.wdl',
'balanced': 'balanced_typing.wdl',
'sensitive': 'sensitive_typing.wdl',
}
workflow_filename = strategy_to_workflow.get(strategy)
if not workflow_filename:
raise click.ClickException(f"Unknown strategy: {strategy}")

# Resolve workflow path relative to torchbase package
import torchbase
torchbase_dir = Path(torchbase.__file__).parent
builtin_workflow = torchbase_dir / 'workflows' / 'builtin' / workflow_filename

if not builtin_workflow.exists():
raise click.ClickException(
f"Built-in workflow not found: {builtin_workflow}"
)

workflow_file = builtin_workflow
else:
# No --strategy specified and torch has no workflow
# Try default workflow for backward compatibility
try:
config = RegistryConfig.load()
manager = RegistryManager(config)
default_workflow_path = manager.fetch_torch("torchbase/default-workflow")
workflow_torch = Torch.load(default_workflow_path)
workflow_file = workflow_torch.workflow
except Exception as e:
raise click.ClickException(
f"Workflow not found in torch and default workflow fetch failed: {str(e)}"
)

# Validate workflow exists and is named main.wdl
if not workflow_torch.workflow:
raise click.ClickException("No workflow found (main.wdl) in torch")
# Validate workflow exists
if not workflow_file:
raise click.ClickException("No workflow found")

if workflow_torch.workflow.name != "main.wdl":
raise click.ClickException(
f"Workflow must be named 'main.wdl', found: {workflow_torch.workflow.name}"
)
if isinstance(workflow_file, str):
workflow_file = Path(workflow_file)

# Build miniwdl command
miniwdl_cmd = ['miniwdl', 'run', str(workflow_torch.workflow)]
miniwdl_cmd = ['miniwdl', 'run', str(workflow_file)]

# Add input files
if contigs:
Expand Down
Loading
Loading