diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml index cf4ee615..fbb30271 100644 --- a/.github/workflows/release.yml +++ b/.github/workflows/release.yml @@ -14,7 +14,7 @@ jobs: runs-on: ubuntu-latest steps: - name: Checkout repository - uses: actions/checkout@v2 + uses: actions/checkout@v4 - name: Set up Docker Buildx uses: docker/setup-buildx-action@v3 diff --git a/.github/workflows/test-spras.yml b/.github/workflows/test-spras.yml index db6d1b0f..b0877223 100644 --- a/.github/workflows/test-spras.yml +++ b/.github/workflows/test-spras.yml @@ -12,13 +12,13 @@ jobs: os: [macos-latest, windows-latest] steps: - name: Checkout repository - uses: actions/checkout@v2 + uses: actions/checkout@v4 - name: Install conda environment - uses: conda-incubator/setup-miniconda@v2 + uses: conda-incubator/setup-miniconda@v4 with: activate-environment: spras environment-file: environment.yml - auto-activate-base: false + auto-activate: false miniconda-version: 'latest' - name: Log conda environment # Log conda environment contents @@ -49,18 +49,22 @@ jobs: sudo docker image prune --all --force sudo docker builder prune -a - name: Install conda environment - uses: conda-incubator/setup-miniconda@v2 + uses: conda-incubator/setup-miniconda@v4 with: activate-environment: spras environment-file: environment.yml - auto-activate-base: false + auto-activate: false miniconda-version: 'latest' - name: Install spras in conda env # Install spras in the environment using pip shell: bash --login {0} run: pip install . - - name: Log conda environment - # Log conda environment contents + - name: Get pipx + shell: bash --login {0} + run: pip install pipx + - shell: bash --login {0} + run: pipx install . 
+ - name: Log conda environment contents shell: bash --login {0} run: conda list - name: Install Apptainer @@ -85,11 +89,11 @@ jobs: - name: Checkout repository uses: actions/checkout@v4 - name: Install conda environment - uses: conda-incubator/setup-miniconda@v2 + uses: conda-incubator/setup-miniconda@v4 with: activate-environment: spras environment-file: environment.yml - auto-activate-base: false + auto-activate: false miniconda-version: 'latest' # Install spras in the environment using pip - name: Install spras in conda env @@ -100,7 +104,7 @@ jobs: # We enable high parallelization (cores 4) to test our way out of the experienced # race conditions from #268 and #279 # We also enforce strict DAG evaluation to catch DAG problems before they appear as user errors. (#359) - run: snakemake --cores 4 --configfile config/config.yaml --show-failed-logs --strict-dag-evaluation cyclic-graph --strict-dag-evaluation functions --strict-dag-evaluation periodic-wildcards + run: spras run --cores 4 --configfile config/config.yaml --show-failed-logs --strict-dag-evaluation cyclic-graph --strict-dag-evaluation functions --strict-dag-evaluation periodic-wildcards # Run pre-commit checks on source files pre-commit: @@ -110,11 +114,11 @@ jobs: - name: Checkout repository uses: actions/checkout@v3 - name: Install conda environment - uses: conda-incubator/setup-miniconda@v2 + uses: conda-incubator/setup-miniconda@v4 with: activate-environment: spras environment-file: environment.yml - auto-activate-base: false + auto-activate: false miniconda-version: 'latest' - name: Run pre-commit shell: bash --login {0} diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index fe010814..3b0581af 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -2,7 +2,7 @@ # See https://pre-commit.com/hooks.html for more hooks default_language_version: # Match this to the version specified in environment.yml - python: python3.11 + python: python3.13 repos: - repo: 
https://github.com/pre-commit/pre-commit-hooks rev: v4.4.0 # Use the ref you want to point at diff --git a/.readthedocs.yaml b/.readthedocs.yaml index cd6d3883..612c1d92 100644 --- a/.readthedocs.yaml +++ b/.readthedocs.yaml @@ -8,7 +8,7 @@ version: 2 build: os: ubuntu-22.04 tools: - python: "3.11" + python: "3.13" # Build documentation in the "docs/" directory with Sphinx sphinx: diff --git a/MANIFEST.in b/MANIFEST.in index 72dcf489..0afcd18f 100644 --- a/MANIFEST.in +++ b/MANIFEST.in @@ -1 +1,4 @@ +include README.md +include LICENSE +include Snakefile include spras/cgroup_wrapper.sh diff --git a/README.md b/README.md index 44895c76..21966070 100644 --- a/README.md +++ b/README.md @@ -53,7 +53,7 @@ After installing Docker, start Docker before running SPRAS. Once you have activated the conda environment and started Docker, you can run SPRAS with the example Snakemake workflow. From the root directory of the `spras` repository, run the command ``` -snakemake --cores 1 --configfile config/config.yaml +spras run --cores 1 --configfile config/config.yaml ``` This will run the SPRAS workflow with the example config file (`config/config.yaml`) and input files. Output files will be written to the `output` directory. 
diff --git a/Snakefile b/Snakefile index 3cd39ff8..4b14b8e1 100644 --- a/Snakefile +++ b/Snakefile @@ -1,12 +1,16 @@ import os from spras import runner import shutil +import json import yaml +from pathlib import Path +from spras.containers import TimeoutError from spras.dataset import Dataset from spras.evaluation import Evaluation from spras.analysis import ml, summary, cytoscape from spras.config.revision import detach_spras_revision import spras.config.config as _config +from spras.errors import mark_error, mark_success, is_error, TimeoutArtifactError, FailedDependencyError # Snakemake updated the behavior in the 6.5.0 release https://github.com/snakemake/snakemake/pull/1037 # and using the wrong separator prevents Snakemake from matching filenames to the rules that can produce them @@ -44,11 +48,14 @@ def algo_has_mult_param_combos(algo): algorithms_mult_param_combos = [algo for algo in algorithms if algo_has_mult_param_combos(algo)] +# Gets the associated parameter hash out of a params wildcard +def params_index(params_hash): + return params_hash.replace('params-', '') + # Get the parameter dictionary for the specified # algorithm and parameter combination hash def reconstruction_params(algorithm, params_hash): - index = params_hash.replace('params-', '') - return algorithm_params[algorithm][index] + return algorithm_params[algorithm][params_index(params_hash)] # Log the parameter dictionary for this parameter configuration in a yaml file def write_parameter_log(algorithm, param_label, logfile): @@ -262,33 +269,82 @@ def collect_prepared_input(wildcards): return prepared_inputs +def collect_dependent_artifact_info(wildcards): + # Get the associated runs that this run depends on + dependent_runs = _config.config.conditional_run_dependencies[params_index(wildcards.params)] + return [ + SEP.join([out_dir, f'{wildcards.dataset}-{wildcards.algorithm}-params-{params}', 'artifact-log.json']) + for params in dependent_runs + ] + +def filter_successful(files): + 
"""Convenient function for filtering iterators by whether or not their items are error files.""" + return [file for file in files if not is_error(file)] + +def filter_error(files): + """This function is precisely described as the list difference of `files` and `filter_successful(files)`""" + return [file for file in files if is_error(file)] + # Run the pathway reconstruction algorithm rule reconstruct: - input: collect_prepared_input + input: + prepared_files=collect_prepared_input, + required_artifact_info=collect_dependent_artifact_info # Each reconstruct call should be in a separate output subdirectory that is unique for the parameter combination so # that multiple instances of the container can run simultaneously without overwriting the output files # Overwriting files can happen because the pathway reconstruction algorithms often generate output files with the # same name regardless of the inputs or parameters, and these aren't renamed until after the container command # terminates - output: pathway_file = SEP.join([out_dir, '{dataset}-{algorithm}-{params}', 'raw-pathway.txt']) + output: + pathway_file = SEP.join([out_dir, '{dataset}-{algorithm}-{params}', 'raw-pathway.txt']), + # Despite this being a 'log' file, we don't use the log directive as this rule doesn't actually throw errors. + artifact_info = SEP.join([out_dir, '{dataset}-{algorithm}-{params}', 'artifact-log.json']) + params: + # Get the timeout from the config and use it as an input. + # TODO: This has unexpected behavior when this rule succeeds but the timeout extends, + # making this rule run again. + timeout = lambda wildcards: _config.config.algorithm_param_run_settings[params_index(wildcards.params)].timeout run: + successful_runs = filter_successful(input.required_artifact_info) + errorful_runs = filter_error(input.required_artifact_info) + # NOTE: conditionals happen as a big OR statement, so we are looking for at least one success. 
+ if len(successful_runs) == 0 and len(errorful_runs) != 0: + # We don't raise the error here (analogous to `--keep-going`, except we avoid unnecessarily re-running this rule.) + mark_error(output.artifact_info, FailedDependencyError(failing_dependencies=errorful_runs)) + # and we touch pathway_file still: Snakemake doesn't have optional files, so we output a 'artifact info' file, + # which contains the status (success/failure) of specific Snakemake jobs. + # We filter for the successful files (such as ones that didn't time out) with the `filter_successful` function. + Path(output.pathway_file).touch() + return  # every dependency failed: stop here instead of falling through to runner.run/mark_success below # Create a copy so that the updates are not written to the parameters logfile - params = reconstruction_params(wildcards.algorithm, wildcards.params).copy() + algorithm_params = reconstruction_params(wildcards.algorithm, wildcards.params).copy() # Declare the input files as a dictionary. - inputs = dict(zip(runner.get_required_inputs(detach_spras_revision(_config.config.immutable_files, wildcards.algorithm)), *{input}, strict=True)) + inputs = dict(zip(runner.get_required_inputs(detach_spras_revision(_config.config.immutable_files, wildcards.algorithm)), *{input.prepared_files}, strict=True)) # Remove the _spras_run_name parameter added for keeping track of the run name for parameters.yml - if '_spras_run_name' in params: - params.pop('_spras_run_name') - runner.run(detach_spras_revision(_config.config.immutable_files, wildcards.algorithm), inputs, output.pathway_file, params, container_settings) + if '_spras_run_name' in algorithm_params: + algorithm_params.pop('_spras_run_name') + try: + runner.run(detach_spras_revision(_config.config.immutable_files, wildcards.algorithm), inputs, output.pathway_file, algorithm_params, container_settings, params.timeout) + mark_success(output.artifact_info) + except TimeoutError as err: + # See the above notes on conditional runs for precisely why we write this as is. 
+ mark_error(output.artifact_info, TimeoutArtifactError(duration=params.timeout)) + Path(output.pathway_file).touch() # Original pathway reconstruction output to universal output # Use PRRunner as a wrapper to call the algorithm-specific parse_output rule parse_output: input: - raw_file = SEP.join([out_dir, '{dataset}-{algorithm}-{params}', 'raw-pathway.txt']), + # We propagate up the artifact_info error if it exists. + artifact_info = rules.reconstruct.output.artifact_info, + raw_file = rules.reconstruct.output.pathway_file, dataset_file = SEP.join([out_dir, 'dataset-{dataset}-merged.pickle']) output: standardized_file = SEP.join([out_dir, '{dataset}-{algorithm}-{params}', 'pathway.txt']) run: + if is_error(input.artifact_info): + mark_error(output.standardized_file) + return + params = reconstruction_params(wildcards.algorithm, wildcards.params).copy() params['dataset'] = input.dataset_file runner.parse_output(detach_spras_revision(_config.config.immutable_files, wildcards.algorithm), input.raw_file, output.standardized_file, params) @@ -310,7 +366,7 @@ rule viz_cytoscape: output: session = SEP.join([out_dir, '{dataset}-cytoscape.cys']) run: - cytoscape.run_cytoscape(input.pathways, output.session, container_settings) + cytoscape.run_cytoscape(filter_successful(input.pathways), output.session, container_settings) # Write a single summary table for all pathways for each dataset @@ -323,7 +379,7 @@ rule summary_table: run: # Load the node table from the pickled dataset file node_table = Dataset.from_file(input.dataset_file).node_table - summary_df = summary.summarize_networks(input.pathways, node_table, algorithm_params, algorithms_with_params) + summary_df = summary.summarize_networks(filter_successful(input.pathways), node_table, algorithm_params, algorithms_with_params) summary_df.to_csv(output.summary_table, sep='\t', index=False) # Cluster the output pathways for each dataset @@ -339,7 +395,7 @@ rule ml_analysis: hac_image_horizontal = SEP.join([out_dir, 
'{dataset}-ml', 'hac-horizontal.png']), hac_clusters_horizontal = SEP.join([out_dir, '{dataset}-ml', 'hac-clusters-horizontal.txt']), run: - summary_df = ml.summarize_networks(input.pathways) + summary_df = ml.summarize_networks(filter_successful(input.pathways)) ml.hac_vertical(summary_df, output.hac_image_vertical, output.hac_clusters_vertical, **hac_params) ml.hac_horizontal(summary_df, output.hac_image_horizontal, output.hac_clusters_horizontal, **hac_params) ml.pca(summary_df, output.pca_image, output.pca_variance, output.pca_coordinates, **pca_params) @@ -353,7 +409,7 @@ rule jaccard_similarity: jaccard_similarity_matrix = SEP.join([out_dir, '{dataset}-ml', 'jaccard-matrix.txt']), jaccard_similarity_heatmap = SEP.join([out_dir, '{dataset}-ml', 'jaccard-heatmap.png']) run: - summary_df = ml.summarize_networks(input.pathways) + summary_df = ml.summarize_networks(filter_successful(input.pathways)) ml.jaccard_similarity_eval(summary_df, output.jaccard_similarity_matrix, output.jaccard_similarity_heatmap) @@ -364,7 +420,7 @@ rule ensemble: output: ensemble_network_file = SEP.join([out_dir,'{dataset}-ml', 'ensemble-pathway.txt']) run: - summary_df = ml.summarize_networks(input.pathways) + summary_df = ml.summarize_networks(filter_successful(input.pathways)) ml.ensemble_network(summary_df, output.ensemble_network_file) # Returns all pathways for a specific algorithm @@ -386,7 +442,7 @@ rule ml_analysis_aggregate_algo: hac_image_horizontal = SEP.join([out_dir, '{dataset}-ml', '{algorithm}-hac-horizontal.png']), hac_clusters_horizontal = SEP.join([out_dir, '{dataset}-ml', '{algorithm}-hac-clusters-horizontal.txt']), run: - summary_df = ml.summarize_networks(input.pathways) + summary_df = ml.summarize_networks(filter_successful(input.pathways)) ml.hac_vertical(summary_df, output.hac_image_vertical, output.hac_clusters_vertical, **hac_params) ml.hac_horizontal(summary_df, output.hac_image_horizontal, output.hac_clusters_horizontal, **hac_params) ml.pca(summary_df, 
output.pca_image, output.pca_variance, output.pca_coordinates, **pca_params) @@ -398,7 +454,7 @@ rule ensemble_per_algo: output: ensemble_network_file = SEP.join([out_dir,'{dataset}-ml', '{algorithm}-ensemble-pathway.txt']) run: - summary_df = ml.summarize_networks(input.pathways) + summary_df = ml.summarize_networks(filter_successful(input.pathways)) ml.ensemble_network(summary_df, output.ensemble_network_file) # Calculated Jaccard similarity between output pathways for each dataset per algorithm @@ -409,7 +465,7 @@ rule jaccard_similarity_per_algo: jaccard_similarity_matrix = SEP.join([out_dir, '{dataset}-ml', '{algorithm}-jaccard-matrix.txt']), jaccard_similarity_heatmap = SEP.join([out_dir, '{dataset}-ml', '{algorithm}-jaccard-heatmap.png']) run: - summary_df = ml.summarize_networks(input.pathways) + summary_df = ml.summarize_networks(filter_successful(input.pathways)) ml.jaccard_similarity_eval(summary_df, output.jaccard_similarity_matrix, output.jaccard_similarity_heatmap) # Return the gold standard pickle file for a specific gold standard @@ -440,7 +496,7 @@ rule evaluation_pr_per_pathways: node_pr_png = SEP.join([out_dir, '{dataset_gold_standard_pair}-eval', 'pr-per-pathway-nodes.png']), run: node_table = Evaluation.from_file(input.node_gold_standard_file).node_table - pr_df = Evaluation.node_precision_and_recall(input.pathways, node_table) + pr_df = Evaluation.node_precision_and_recall(filter_successful(input.pathways), node_table) Evaluation.precision_and_recall_per_pathway(pr_df, output.node_pr_file, output.node_pr_png) # Returns all pathways for a specific algorithm and dataset @@ -459,7 +515,7 @@ rule evaluation_per_algo_pr_per_pathways: node_pr_png = SEP.join([out_dir, '{dataset_gold_standard_pair}-eval', 'pr-per-pathway-for-{algorithm}-nodes.png']), run: node_table = Evaluation.from_file(input.node_gold_standard_file).node_table - pr_df = Evaluation.node_precision_and_recall(input.pathways, node_table) + pr_df = 
Evaluation.node_precision_and_recall(filter_successful(input.pathways), node_table) Evaluation.precision_and_recall_per_pathway(pr_df, output.node_pr_file, output.node_pr_png, include_aggregate_algo_eval) # Return pathway summary file per dataset diff --git a/config/config.yaml b/config/config.yaml index fc718e3c..b0e1df94 100644 --- a/config/config.yaml +++ b/config/config.yaml @@ -73,65 +73,75 @@ algorithms: include: true runs: run1: - k: range(100,201,100) + params: + k: range(100,201,100) - name: "omicsintegrator1" include: true runs: run1: - b: [5, 6] - w: np.linspace(0,5,2) - d: 10 - dummy_mode: "file" # Or "terminals", "all", "others" + params: + b: [5, 6] + w: np.linspace(0,5,2) + d: 10 + dummy_mode: "file" # Or "terminals", "all", "others" - name: "omicsintegrator2" include: true runs: run1: - b: 4 - g: 0 + params: + b: 4 + g: 0 run2: - b: 2 - g: 3 + params: + b: 2 + g: 3 - name: "meo" include: true runs: run1: - max_path_length: 3 - local_search: true - rand_restarts: 10 + params: + max_path_length: 3 + local_search: true + rand_restarts: 10 - name: "mincostflow" include: true runs: run1: - flow: 1 - capacity: 1 + params: + flow: 1 + capacity: 1 - name: "allpairs" include: true + timeout: 1d - name: "domino" include: true runs: run1: - slice_threshold: 0.3 - module_threshold: 0.05 + params: + slice_threshold: 0.3 + module_threshold: 0.05 - name: "strwr" include: true runs: run1: - alpha: [0.85] - threshold: [100, 200] + params: + alpha: [0.85] + threshold: [100, 200] - name: "rwr" include: true runs: run1: - alpha: [0.85] - threshold: [100, 200] + params: + alpha: [0.85] + threshold: [100, 200] - name: "bowtiebuilder" include: true @@ -140,12 +150,14 @@ algorithms: include: true runs: run1: - gamma: [10] + params: + gamma: [10] - name: "diamond" include: true runs: run1: - n: 1 + params: + n: 1 # Here we specify which pathways to run and other file location information. 
# DataLoader.py can currently only load a single dataset diff --git a/config/egfr.yaml b/config/egfr.yaml index b93c593c..623b0121 100644 --- a/config/egfr.yaml +++ b/config/egfr.yaml @@ -31,77 +31,89 @@ algorithms: include: true runs: run1: - k: - - 10 - - 20 - - 70 + params: + k: + - 10 + - 20 + - 70 - name: omicsintegrator1 include: true runs: run1: - b: - - 0.55 - - 2 - - 10 - d: 10 - g: 1e-3 - r: 0.01 - w: 0.1 - mu: 0.008 - dummy_mode: ["file"] + params: + b: + - 0.55 + - 2 + - 10 + d: 10 + g: 1e-3 + r: 0.01 + w: 0.1 + mu: 0.008 + dummy_mode: ["file"] - name: omicsintegrator2 include: true runs: run1: - b: 4 - g: 0 + params: + b: 4 + g: 0 run2: - b: 2 - g: 3 + params: + b: 2 + g: 3 - name: meo include: true runs: run1: - local_search: true - max_path_length: 3 - rand_restarts: 10 + params: + local_search: true + max_path_length: 3 + rand_restarts: 10 run2: - local_search: false - max_path_length: 2 - rand_restarts: 10 + params: + local_search: false + max_path_length: 2 + rand_restarts: 10 - name: allpairs include: true - name: domino include: true runs: run1: - slice_threshold: 0.3 - module_threshold: 0.05 + params: + slice_threshold: 0.3 + module_threshold: 0.05 - name: mincostflow include: true runs: run1: - capacity: 15 - flow: 80 + params: + capacity: 15 + flow: 80 run2: - capacity: 1 - flow: 6 + params: + capacity: 1 + flow: 6 run3: - capacity: 5 - flow: 60 + params: + capacity: 5 + flow: 60 - name: "strwr" include: true runs: run1: - alpha: [0.85] - threshold: [100, 200] + params: + alpha: [0.85] + threshold: [100, 200] - name: "rwr" include: true runs: run1: - alpha: [0.85] - threshold: [100, 200] + params: + alpha: [0.85] + threshold: [100, 200] - name: "bowtiebuilder" include: false diff --git a/docker-wrappers/SPRAS/Dockerfile b/docker-wrappers/SPRAS/Dockerfile index 3d69943c..4bb0d660 100644 --- a/docker-wrappers/SPRAS/Dockerfile +++ b/docker-wrappers/SPRAS/Dockerfile @@ -4,7 +4,7 @@ FROM almalinux:9 RUN dnf update -y && \ dnf install -y 
epel-release && \ dnf install -y gcc gcc-c++ \ - python3.11 python3.11-pip python3.11-devel \ + python3.13 python3.13-pip python3.13-devel \ docker apptainer && \ dnf clean all @@ -13,4 +13,4 @@ RUN chmod -R 777 /spras WORKDIR /spras # Install spras into the container -RUN pip3.11 install . +RUN pip3.13 install . diff --git a/docs/contributing/index.rst b/docs/contributing/index.rst index f47e935b..38774d4c 100644 --- a/docs/contributing/index.rst +++ b/docs/contributing/index.rst @@ -306,7 +306,7 @@ through SPRAS with .. code:: bash - snakemake --cores 1 --configfile config/config.yaml + spras run --cores 1 --configfile config/config.yaml Make sure to run the command inside the ``spras`` conda environment. diff --git a/docs/design/errors.rst b/docs/design/errors.rst new file mode 100644 index 00000000..4d436601 --- /dev/null +++ b/docs/design/errors.rst @@ -0,0 +1,43 @@ +######## + Errors +######## + +By default, whenever SPRAS runs into a container error (i.e. an internal +algorithm error), the full workflow will fail. However, there are +certain designated errors where we don't want this to be the case. + +Due to the following design constraints: + +- Snakemake does not have support for such errors (the closest being + ``--keep-going``, which unnecessarily runs failed runs) +- SPRAS occasionally outputs empty files as genuine output +- We need to log errors that happen for user knowledge + +we opt to use an ``artifact-log.json`` file, which keeps track of the +success/failure status at certain failable parts of the workflow. This +file contains whether or not this part of the workflow succeeded, and if +it failed, how it failed. + +The ``artifact-log.json`` stores a JSON object, containing: + +- The key ``status``, which is either the value ``success`` or + ``error``, depending on what happened in the workflow. 
+ +- If ``status`` is ``error``, we store associated error details in the + ``details`` key, which contains an object: + + - The ``details`` object varies per error in the form of a tagged + union: they are differentiated by the ``type`` key. We describe + each error down below. + +************* + Error Types +************* + +With the above schema, we detail the ``details`` key below. + +Timeout +======= + +Timeout has ``type: "timeout"``, with a single key ``duration``, which +describes the ``timeout`` value associated to that run. diff --git a/docs/fordevs/spras.config.rst b/docs/fordevs/spras.config.rst index f7161447..a0f6f2e9 100644 --- a/docs/fordevs/spras.config.rst +++ b/docs/fordevs/spras.config.rst @@ -6,6 +6,15 @@ Submodules ************ +******************************** + spras.config.algorithms module +******************************** + +.. automodule:: spras.config.algorithms + :members: + :undoc-members: + :show-inheritance: + **************************** spras.config.config module **************************** @@ -15,6 +24,33 @@ :undoc-members: :show-inheritance: +************************************** + spras.config.container_schema module +************************************** + +.. automodule:: spras.config.container_schema + :members: + :undoc-members: + :show-inheritance: + +***************************** + spras.config.dataset module +***************************** + +.. automodule:: spras.config.dataset + :members: + :undoc-members: + :show-inheritance: + +****************************** + spras.config.revision module +****************************** + +.. 
automodule:: spras.config.revision + :members: + :undoc-members: + :show-inheritance: + **************************** spras.config.schema module **************************** diff --git a/docs/fordevs/spras.rst b/docs/fordevs/spras.rst index 8bd5060f..d63f52e9 100644 --- a/docs/fordevs/spras.rst +++ b/docs/fordevs/spras.rst @@ -52,6 +52,15 @@ :undoc-members: :show-inheritance: +********************** + spras.diamond module +********************** + +.. automodule:: spras.diamond + :members: + :undoc-members: + :show-inheritance: + ********************* spras.domino module ********************* @@ -61,6 +70,15 @@ :undoc-members: :show-inheritance: +********************* + spras.errors module +********************* + +.. automodule:: spras.errors + :members: + :undoc-members: + :show-inheritance: + ************************* spras.evaluation module ************************* @@ -142,6 +160,15 @@ :undoc-members: :show-inheritance: +************************ + spras.profiling module +************************ + +.. automodule:: spras.profiling + :members: + :undoc-members: + :show-inheritance: + ************************** spras.responsenet module ************************** diff --git a/docs/index.rst b/docs/index.rst index c8aae5e6..42fc92f8 100644 --- a/docs/index.rst +++ b/docs/index.rst @@ -40,6 +40,7 @@ reconstruction methods (PRMs) to omics data. output htcondor + timeout .. toctree:: :maxdepth: 1 @@ -62,6 +63,12 @@ reconstruction methods (PRMs) to omics data. contributing/patching contributing/design +.. toctree:: + :maxdepth: 1 + :caption: Internal Designs + + design/errors + .. toctree:: :maxdepth: 1 :caption: Tutorials diff --git a/docs/timeout.rst b/docs/timeout.rst new file mode 100644 index 00000000..e58b2e50 --- /dev/null +++ b/docs/timeout.rst @@ -0,0 +1,60 @@ +########## + Timeouts +########## + +The SPRAS global configuration can take optional per-algorithm timeouts. +For example, to give a specific run of the PathLinker algorithm a 1 day +timeout: + +.. 
code:: yaml + + - name: "pathlinker" + include: true + runs: + run1: + timeout: 1d + params: + k: 200 + +The timeout string parsing is delegated to `pytimeparse +<https://github.com/wroberts/pytimeparse>`__ (examples linked here). This +allows for more complicated timeout strings, such as ``3d2h32m``. + +If ``timeout`` is not specified, the algorithm will never be interrupted +due to running too long. + +**NOTE**: This feature only works with docker and apptainer/singularity +at the time of writing, not dsub. + +********************* + Configuration notes +********************* + +Since ``timeout`` is a run parameter, it can also be moved to the top +level of an algorithm configuration, where that value will become the +default unless otherwise specified in a specific run. + +.. code:: yaml + + - name: "pathlinker" + include: true + timeout: 1d + runs: + run1: + # this uses timeout: 2d + timeout: 2d + params: + k: 200 + run2: + # this uses timeout: 1d + params: + k: 100 + +This is also useful for algorithms which take in no parameters, such as +``allpairs``: + +.. code:: yaml + + - name: "allpairs" + include: true + timeout: 1d diff --git a/docs/tutorial/beginner.rst b/docs/tutorial/beginner.rst index a0846666..0e7238c3 100644 --- a/docs/tutorial/beginner.rst +++ b/docs/tutorial/beginner.rst @@ -161,13 +161,15 @@ Algorithms params: include: true run1: - b: 0.1 - d: 10 - g: 1e-3 + params: + b: 0.1 + d: 10 + g: 1e-3 run2: - b: [0.55, 2, 10] - d: [10, 20] - g: 1e-3 + params: + b: [0.55, 2, 10] + d: [10, 20] + g: 1e-3 When defining an algorithm in the configuration file, its name must match one of the supported SPRAS algorithms. Each algorithm includes an @@ -285,7 +287,7 @@ From the root directory, run the command below from the command line: .. code:: bash - snakemake --cores 1 --configfile config/beginner.yaml + spras run --cores 1 --configfile config/beginner.yaml This command starts the workflow manager that automates all steps defined by SPRAS. 
It tells Snakemake to use one CPU core and to load @@ -430,7 +432,7 @@ After saving the changes, rerun with: .. code:: bash - snakemake --cores 1 --configfile config/beginner.yaml + spras run --cores 1 --configfile config/beginner.yaml What happens when you run this command -------------------------------------- @@ -600,7 +602,7 @@ After saving the changes, rerun with: .. code:: bash - snakemake --cores 1 --configfile config/beginner.yaml + spras run --cores 1 --configfile config/beginner.yaml What happens when you run this command -------------------------------------- diff --git a/docs/tutorial/intermediate.rst b/docs/tutorial/intermediate.rst index 1055b879..21c1e84a 100644 --- a/docs/tutorial/intermediate.rst +++ b/docs/tutorial/intermediate.rst @@ -542,7 +542,7 @@ From the root directory, run the command below from the command line: .. code:: bash - snakemake --cores 4 --configfile config/intermediate.yaml + spras run --cores 4 --configfile config/intermediate.yaml What happens when you run this command -------------------------------------- @@ -836,7 +836,7 @@ After saving the changes in the configuration file, rerun with: .. code:: bash - snakemake --cores 4 --configfile config/intermediate.yaml + spras run --cores 4 --configfile config/intermediate.yaml What happens when you run this command -------------------------------------- diff --git a/docs/usage.rst b/docs/usage.rst index 9b277806..e4932df6 100644 --- a/docs/usage.rst +++ b/docs/usage.rst @@ -3,14 +3,15 @@ ############# SPRAS is run through `Snakemake `_, -which comes with the SPRAS conda environment. +which comes with the SPRAS conda environment and as a dependency of +SPRAS. To run SPRAS, run the following command inside the ``spras`` directory, specifying a ``config.yaml`` and the number of cores to run SPRAS with: .. 
code:: bash - snakemake --cores 1 --configfile config.yaml + spras run --cores 1 --configfile config.yaml ********************* Parallelizing SPRAS @@ -25,7 +26,7 @@ To parallelize SPRAS, specify ``--cores`` to be a value higher than .. code:: bash - snakemake --cores 4 --configfile config.yaml + spras run --cores 4 --configfile config.yaml SPRAS also supports high-performance computing with its integration with `HTCondor `_. See :doc:`Running with HTCondor diff --git a/environment.yml b/environment.yml index 4361834a..dcd91c6e 100644 --- a/environment.yml +++ b/environment.yml @@ -3,37 +3,38 @@ channels: - conda-forge dependencies: - adjusttext=1.3.0 - - bioconda::snakemake-minimal=9.6.2 + - bioconda::snakemake-minimal=9.19.0 # Conda refers to pypi/docker as docker-py. - docker-py=7.1.0 - - matplotlib=3.10.3 - - networkx=3.5 - - pandas=2.3.0 - - pydantic=2.11.7 - - numpy=2.3.1 - - requests=2.32.4 - - scikit-learn=1.7.0 + - matplotlib=3.10.9 + - networkx=3.6.1 + - pandas=3.0.2 + - pydantic=2.13.3 + - numpy=2.4.4 + - requests=2.33.1 + - scikit-learn=1.8.0 - seaborn=0.13.2 - spython=0.3.14 + - pytimeparse=1.1.8 # conda-specific for dsub - python-dateutil=2.9.0 - - pytz=2025.2 - - pyyaml=6.0.2 - - tenacity=9.1.2 - - tabulate=0.9.0 + - pytz=2026.1 + - pyyaml=6.0.3 + - tenacity=9.1.4 + - tabulate=0.10.0 # toolchain deps - - pip=25.3 + - pip=26.0.1 # This should be the same as requires-python minus the >=. 
- - python=3.11 + - python=3.13 # development dependencies - - pre-commit=4.2.0 - - pytest=8.4.1 - - pytest-split=0.10.0 - - sphinx=7.4.7 - - sphinx-rtd-theme=2.0.0 + - pre-commit=4.6.0 + - pytest=9.0.3 + - pytest-split=0.11.0 + - sphinx=9.1.0 + - sphinx-rtd-theme=3.1.0 - pip: - dsub==0.4.13 diff --git a/pyproject.toml b/pyproject.toml index bfc602c6..b7d787f7 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -16,43 +16,50 @@ classifiers = [ "Programming Language :: Python :: 3", "Topic :: Scientific/Engineering :: Bio-Informatics", ] -requires-python = ">=3.11" +requires-python = ">=3.13" dependencies = [ - "adjusttext==0.7.3", - "snakemake==9.6.2", + "adjusttext==1.3.0", + "snakemake==9.19.0", "docker==7.1.0", - "matplotlib==3.10.3", - "networkx==3.5", - "pandas==2.3.0", - "pydantic==2.11.7", - "numpy==2.3.1", - "requests==2.32.4", - "scikit-learn==1.7.0", + "matplotlib==3.10.9", + "networkx==3.6.1", + "pandas==3.0.2", + "pydantic==2.13.3", + "numpy==2.4.4", + "requests==2.33.1", + "scikit-learn==1.8.0", "seaborn==0.13.2", "spython==0.3.14", + "pytimeparse==1.1.8", # toolchain deps - "pip==25.3", + "pip==26.0.1", ] [project.optional-dependencies] dev = [ # Only required for development - "pre-commit==4.2.0", - "pytest==8.4.1", - "pytest-split==0.10.0", + "pre-commit==4.6.0", + "pytest==9.0.3", + "pytest-split==0.11.0", ] [project.urls] "Homepage" = "https://github.com/Reed-CompBio/spras" "Issues" = "https://github.com/Reed-CompBio/spras/issues" +[project.entry-points."pipx.run"] +spras = "spras.cli.cli:run" + +[project.scripts] +spras = "spras.cli.cli:run" + [build-system] requires = ["setuptools>=64.0"] build-backend = "setuptools.build_meta" [tool.ruff] -target-version = "py311" +target-version = "py313" # Autofix errors when possible fix = true # Select categories or specific rules from https://beta.ruff.rs/docs/rules/ diff --git a/spras/__main__.py b/spras/__main__.py new file mode 100644 index 00000000..40af0521 --- /dev/null +++ b/spras/__main__.py @@ 
-0,0 +1,3 @@ +if __name__ == "__main__": + from spras.cli.cli import run + run() diff --git a/spras/allpairs.py b/spras/allpairs.py index 51ba5432..b4e542ad 100644 --- a/spras/allpairs.py +++ b/spras/allpairs.py @@ -2,6 +2,7 @@ from pathlib import Path from spras.config.container_schema import ProcessedContainerSettings +from spras.config.runs import RunSettings from spras.config.util import Empty from spras.containers import prepare_volume, run_container_and_log from spras.dataset import Dataset @@ -72,8 +73,9 @@ def generate_inputs(data: Dataset, filename_map): header=["#Interactor1", "Interactor2", "Weight"]) @staticmethod - def run(inputs, output_file, args=None, container_settings=None): + def run(inputs, output_file, args=None, container_settings=None, run_settings=None): if not container_settings: container_settings = ProcessedContainerSettings() + if not run_settings: run_settings = RunSettings() AllPairs.validate_required_run_args(inputs) work_dir = '/apsp' @@ -109,7 +111,8 @@ def run(inputs, output_file, args=None, container_settings=None): volumes, work_dir, out_dir, - container_settings) + container_settings, + run_settings.timeout) @staticmethod def parse_output(raw_pathway_file, standardized_pathway_file, params): diff --git a/spras/analysis/cytoscape.py b/spras/analysis/cytoscape.py index e8489950..6eadfadd 100644 --- a/spras/analysis/cytoscape.py +++ b/spras/analysis/cytoscape.py @@ -58,5 +58,6 @@ def run_cytoscape(pathways: List[Union[str, PurePath]], output_file: str, contai # (https://github.com/Reed-CompBio/spras/pull/390/files#r2485100875) None, container_settings, + None, env) rmtree(cytoscape_output_dir) diff --git a/spras/btb.py b/spras/btb.py index 63b6c7ed..6ccfd3d5 100644 --- a/spras/btb.py +++ b/spras/btb.py @@ -1,6 +1,7 @@ from pathlib import Path from spras.config.container_schema import ProcessedContainerSettings +from spras.config.runs import RunSettings from spras.config.util import Empty from spras.containers import prepare_volume, 
run_container_and_log from spras.interactome import ( @@ -61,8 +62,9 @@ def generate_inputs(data, filename_map): # Skips parameter validation step @staticmethod - def run(inputs, output_file, args=None, container_settings=None): + def run(inputs, output_file, args=None, container_settings=None, run_settings=None): if not container_settings: container_settings = ProcessedContainerSettings() + if not run_settings: run_settings = RunSettings() BowTieBuilder.validate_required_run_args(inputs) # Tests for pytest (docker container also runs this) @@ -119,7 +121,8 @@ def run(inputs, output_file, args=None, container_settings=None): volumes, work_dir, out_dir, - container_settings) + container_settings, + run_settings.timeout) # Output is already written to raw-pathway.txt file diff --git a/spras/cli/cli.py b/spras/cli/cli.py new file mode 100644 index 00000000..eaef4749 --- /dev/null +++ b/spras/cli/cli.py @@ -0,0 +1,53 @@ +import argparse +import itertools +import os +import subprocess +from pathlib import Path + +# https://stackoverflow.com/a/5137509/7589775 +# The file we want, Snakefile, is also included in MANIFEST.in +dir_path = os.path.dirname(os.path.realpath(__file__)) +# we resolve to simplify the path name in errors +snakefile_path = Path(dir_path, "..", "Snakefile").resolve() + +# Removes the very awkwardly phrased "{subcommand1, subcommand2}" from the subcommand help +# from https://stackoverflow.com/a/13429281/7589775 +class SubcommandHelpFormatter(argparse.RawDescriptionHelpFormatter): + def _format_action(self, action): + parts = super(argparse.RawDescriptionHelpFormatter, self)._format_action(action) + if action.nargs == argparse.PARSER: + parts = "\n".join(parts.split("\n")[1:]) + return parts + +def get_parser(): + parser = argparse.ArgumentParser( + prog='SPRAS', + description='The wrapping tool for SPRAS (signaling pathway reconstruction analysis streamliner)', + epilog='SPRAS is in alpha. 
Report issues or suggest features on GitHub: https://github.com/Reed-CompBio/spras', + formatter_class=SubcommandHelpFormatter) + + subparsers = parser.add_subparsers(title='subcommands', + help='subcommand help', + dest='subcommand') + subparsers = subparsers.add_parser('run', + help='Run the SPRAS Snakemake workflow', + # We let snakemake handle help + add_help=False) + + return parser + +def run(): + parser = get_parser() + (args, unknown_args) = parser.parse_known_args() + + if args.subcommand == "run": + # Exit with snakemake's own status code so that a failed workflow is not reported as success. + raise SystemExit(subprocess.run(list(itertools.chain( + ["snakemake", "-s", snakefile_path], + unknown_args + ))).returncode) + + parser.print_help() + +if __name__ == '__main__': + run() diff --git a/spras/cli/tune.py b/spras/cli/tune.py new file mode 100644 index 00000000..74c5485b --- /dev/null +++ b/spras/cli/tune.py @@ -0,0 +1,211 @@ +""" +Generic algorithm tuning. We spend a good chunk of this file setting up configuration tuning, +then finally use it by wrapping it in a CLI subcommand that takes in a configuration.
+""" + +from dataclasses import dataclass, field +import itertools +from typing import Any, Callable, List, Mapping, NamedTuple, Optional +from pydantic import BaseModel, TypeAdapter +from spras.cli.util import window +from spras.config.schema import RawConfig +from spras.config.tunable import Tunable +from spras.util import NpHashEncoder, hash_params_sha1_base32 + +class Neighborly[C, N](NamedTuple): + """A neighborly element takes note of itself and its desired neighbors.""" + current: C + neighbors: List[N] = field(default_factory=list) + +def type_algorithm_params(params: BaseModel) -> dict[str, Tunable[Any]]: + """ + Applies runtime assertions to `params` and coerces it down into + the looser `dict` structure + """ + unfolded_params: dict[str, Tunable[Any]] = vars(params) + for key in unfolded_params.keys(): + # We assert at runtime our (implicit) cast of `unfolded_params` + assert issubclass(type(unfolded_params[key]), Tunable) + return unfolded_params + +@dataclass +class TunableNeighborly[S](Tunable[Neighborly[S, S]]): + """ + NOTE: This does not store data in the form of `Tunable[Neighborly[S]]`, + but that representation can be fetched through the `TunableNeighborly#to_list` method. 
+ """ + wrapped: Tunable[S] + neighborly_elements: Mapping[S, List[S]] = field(default_factory=dict) + + def tune(self): + old_wrapped = self.wrapped.to_list() + new_wrapped = self.wrapped.tune() + new_neighborly_elements = dict(self.neighborly_elements) + for prev, curr, nxt in window(new_wrapped.to_list(), n = 3): + if curr in old_wrapped: continue + new_neighborly_elements[curr] = [prev, nxt] + + return type(self)(new_wrapped, new_neighborly_elements) + + def to_list(self): + return [ + Neighborly(elem, neighbors=self.neighborly_elements[elem] if elem in self.neighborly_elements else []) + for elem in self.wrapped.to_list() + ] + +def flatten_params( + params: Mapping[str, List[Neighborly]], + hash_function: Callable[[dict[str, Any]], str] +) -> dict[str, Neighborly[dict[str, Any], str]]: + """ + Turns parameter dictionary lists of the form: + a: [Neighborly(av1, [...]), Neighborly(av2, [...]), ...] + b: [Neighborly(bv1, [...]), Neighborly(bv2, [...]), ...] + To a dictionary of the form + {hash: Neighborly({a: av1, b: bv1}, [hash1, ...]), ...} + + @param params: a dictionary with a description as described above. + @param hash_function: the hash function to use to compute hashes for the return dictionary + """ + final_dictionary: dict[str, Neighborly[dict[str, Any], str]] = dict() + # Each parameter is a tuple of the form (a: Neighborly(...), b: Neighborly(...), ...) 
+ for parameter_combination in itertools.product(*params.values()): + # We want to collect a list of neighbors of each parameter combination + neighbors: list[dict[str, Any]] = [] + # We transform it into {a: Neighborly(...), b: Neighborly(...), ...} + zipped_parameter_combination = dict(zip(params.keys(), parameter_combination)) + # and create a version of it with no neighbors + forgetful_parameter_combination = {k: v.current for k, v in zipped_parameter_combination.items()} + for key, value in zipped_parameter_combination.items(): + for neighbor in value.neighbors: + # Then, add the adjacent parameter combination to the `forgetful_parameter_combination`. + adjacent_parameter_combination = dict(forgetful_parameter_combination) + adjacent_parameter_combination[key] = neighbor + neighbors.append(adjacent_parameter_combination) + final_dictionary[hash_function(forgetful_parameter_combination)] = Neighborly( + forgetful_parameter_combination, + neighbors=[hash_function(neighbor) for neighbor in neighbors] + ) + return final_dictionary + +# The following is a list of utility functions to encapsulate the above behavior +# in decreasing granularity. + +def prepare_params( + params: Mapping[str, Tunable] +) -> dict[str, TunableNeighborly]: + """Attaches empty neighbor information to some given parameters.""" + return {k: TunableNeighborly(v) for k, v in params.items()} + +def forget_tuning_params( + params: Mapping[str, Tunable[Neighborly]] +) -> dict[str, List[Neighborly]]: + """Removes tuning information about some given parameters""" + return {k: v.to_list() for k, v in params.items()} + +def tune_params( + params: Mapping[str, Tunable], + hash_function: Callable[[dict[str, Any]], str], + tune_count: int = 1 +) -> dict[str, Neighborly[dict[str, Any], str]]: + """ + Tunes some `params` dictionary some `tune_count` number of times. + + @param params: a dictionary of parameter names to their `Tunable` values.
+ @param hash_function: The function to use to hash parameter dictionaries. + @param tune_count: The number of times to tune. + @returns: A dictionary from hashes to parameter combinations, with associated neighbor information + of what runs a run should depend on (see conditional runs.) + """ + if tune_count < 1: + raise ValueError(f"tune_count must be a positive integer: got {tune_count} instead.") + # We first attach empty neighbor information to associated parameters + prepared_params = prepare_params(params) + # Then we perform our tuning, feeding each round's result into the next so that every round refines the previous one + tune_ready_params: Optional[dict[str, TunableNeighborly]] = None + for _ in range(tune_count): + tune_ready_params = prepared_params = {k: v.tune() for k, v in prepared_params.items()} + assert tune_ready_params is not None, "Empty range? This should have been caught by the first assumption." + # We then forget tuning information + tuned_params = forget_tuning_params(tune_ready_params) + # then finally flatten the associated parameters + return flatten_params(tuned_params, hash_function) + +def preferred_model_dump(model: BaseModel) -> Any: + """ + A custom invocation of BaseModel#model_dump which uses some preferred defaults to keep + generated configuration sizes to a minimum. + """ + return model.model_dump(by_alias=True, exclude_defaults=True, exclude_unset=True) + +def model_vars(model: Any) -> dict[str, Any]: + """Ignores model variables that are the default or unset: this acts as a kind of `vars`.""" + dumped_model = preferred_model_dump(model) + return {k: v for k, v in dict(model).items() if k in dumped_model.keys()} + +# Beyond this lies code that depends on types generated in algorithms.py. +# NOTE: If algorithms.py is refactored to be type-safe, this should be refactored as well. +def tune_run( + run: Any, + hash_function: Callable[[dict[str, Any]], str], + tune_count: int = 1, + # TODO: refactor to a lambda?
+ process_run: Optional[Callable[[str], str]] = None +) -> dict[str, Any]: + """ + Takes a run (a dictionary with at least a key "params") and returns a dictionary of + hashes to new runs, preserving the associated non-params run properties + while adding in new conditional runs. + + @param run: A pydantic object of runs. + @param process_run: Maps a parameter hash to the name its run is stored under (defaults to the hash itself). + """ + if not process_run: process_run = lambda run_hash: run_hash + tuned_params = tune_params(model_vars(run.params), hash_function, tune_count) + other_run_settings = {k: v for k, v in preferred_model_dump(run).items() if k != "params"} + return {process_run(k): { + "params": v.current, + **other_run_settings, + "if": [ + *[process_run(neighbor) for neighbor in v.neighbors], + *(other_run_settings["if"] if "if" in other_run_settings else []) + ] + } for k, v in tuned_params.items()} + +def tune_runs( + runs: Any, + hash_function: Callable[[dict[str, Any]], str], + tune_count: int = 1, +) -> dict[str, Any]: + all_runs: dict[str, Any] = dict() + for run_name, run_model in runs.items(): + all_runs = {**all_runs, **tune_run(run_model, hash_function, tune_count, lambda run: f"{run_name}_{run}")} + return all_runs + +def tune_algorithm_union( + algorithm: Any, + hash_function: Callable[[dict[str, Any]], str], + tune_count: int = 1, +) -> Any: + dumped_model = preferred_model_dump(algorithm) + dumped_model["runs"] = tune_runs(algorithm.runs, hash_function, tune_count) + return TypeAdapter(type(algorithm)).validate_python(dumped_model) + +def tune( + config: RawConfig, + hash_function: Callable[[dict[str, Any]], str], + tune_count: int = 1, +) -> RawConfig: + return config.model_copy(update={ + "algorithms": [tune_algorithm_union(algorithm, hash_function, tune_count) for algorithm in config.algorithms] + }) + +# TODO: isolate and deduplicate from config.py +def hash_func(hash_length: int, params: dict[str, Any]) -> str: + """ + General algorithm parameter dictionary hasher.
+ """ + return hash_params_sha1_base32(params, hash_length, cls=NpHashEncoder) + +"""The curried variant of `hash_func`.""" +hash_factory = lambda hash_length: lambda params: hash_func(hash_length, params) diff --git a/spras/cli/util.py b/spras/cli/util.py new file mode 100644 index 00000000..448067b7 --- /dev/null +++ b/spras/cli/util.py @@ -0,0 +1,17 @@ +from itertools import islice +from typing import Generator, Iterable + +# Adopted from https://stackoverflow.com/a/6822773/7589775 +# with a proper type signature. +def window[T](sequence: Iterable[T], n: int) -> Generator[tuple[T, ...], None, None]: + """ + Returns a sliding window (of width n) over data from the iterable + s -> (s0,s1,...s[n-1]), (s1,s2,...,sn), ... + """ + it = iter(sequence) + result = tuple(islice(it, n)) + if len(result) == n: + yield result + for elem in it: + result = result[1:] + (elem,) + yield result diff --git a/spras/config/algorithms.py b/spras/config/algorithms.py index 552fbc4e..f364741c 100644 --- a/spras/config/algorithms.py +++ b/spras/config/algorithms.py @@ -3,91 +3,37 @@ parameter combinations. This has been isolated from schema.py as it is not declarative, and rather mainly contains validators and lower-level pydantic code. """ -import ast import copy -from typing import Annotated, Any, Callable, Literal, Union, cast, get_args +from typing import Annotated, Any, Literal, Union, cast, get_args, get_origin -import numpy as np from pydantic import ( BaseModel, BeforeValidator, ConfigDict, Field, + PlainSerializer, ValidationError, create_model, ) +from spras.config.runs import RunSettings +from spras.config.tunable import FloatTunable, IntegerTunable, TunableList from spras.runner import algorithms # This contains the dynamically generated algorithm schema for use in `schema.py` __all__ = ['AlgorithmUnion'] -def is_numpy_friendly(type: type[Any] | None) -> bool: - """ - Whether the passed in type can have any numpy helpers. 
- This is used to provide hints in the JSON schema, - and to determine whether or not to allow for easy ranges using - `python_evalish_coerce`. - """ - allowed_types = (int, float) - - # check basic types, then check optional types - return type in allowed_types or \ - any([arg for arg in get_args(type) if arg in allowed_types]) - -def python_evalish_coerce(value: Any) -> Any: - """ - Allows for using numpy and python calls: specifically, - `range`, `np.linspace`, `np.arange`, and `np.logspace` are supported. - - **Safety Note**: This does not prevent availability attacks: this can still exhaust - resources if wanted. This only prevents secret leakage. - """ - - if not isinstance(value, str): - return value - - # These strings are in the form of function calls `function.name(param1, param2, ...)`. - # Since we want to avoid `eval` (since this might be running in the secret-sensitive HTCondor), - # we need to parse these functions. - functions_dict: dict[str, Callable[[list[Any]], list[Union[int, float]]]] = { - 'range': lambda params: list(range(*params)), - "np.linspace": lambda params: list(np.linspace(*params)), - "np.arange": lambda params: list(np.arange(*params)), - "np.logspace": lambda params: list(np.logspace(*params)), - } +def determine_inner_type(outer_type: type): + # TODO: this doesn't handle annotated types. + if get_args(outer_type) != () and get_origin(outer_type) == Union: + # We map arbitrary unions Union[A, ...] -> Union[determine_inner_type(A), ..., TunableSet[Union[A, ...]]], + # where we include the extra entry at the end to allow for heterogeneous lists. + return Union[*(determine_inner_type(arg) for arg in get_args(outer_type)), TunableList[outer_type]] - # To do this, we get the AST of our string as an expression - # (filename='' is to make the error message more closely resemble that of eval.) 
- value_ast = ast.parse(value, mode='eval', filename='') - - # Then we do some light parsing - we're only looking to do some literal evaluation - # (allowing light python notation) and some basic function parsing. Full python programs - # should just generate a config.yaml. - - # This should always be an Expression whose body is Call (a function). - if not isinstance(value_ast.body, ast.Call): - raise ValueError(f'This argument "{value}" was interpreted as a non-function-calling string: it should be a function call (e.g. range(100, 201, 50)), or an int or a float.') - - # We get the function name back as a string - function_name = ast.unparse(value_ast.body.func) - - # and we use the (non-availability) safe `ast.literal_eval` to support literals passed into functions. - arguments = [ast.literal_eval(arg) for arg in value_ast.body.args] - - if function_name not in functions_dict: - raise ValueError(f"{function_name} is not an allowed function to be run! Allowed functions: {list(functions_dict.keys())}") - - return functions_dict[function_name](arguments) - -def list_coerce(value: Any) -> Any: - """ - Coerces to a value to a list if it isn't already. - Used as a BeforeValidator. - """ - if not isinstance(value, list): - return [value] - return value + if outer_type == float: return FloatTunable + if outer_type == int: return IntegerTunable + # We fall back to base `TunableSet` otherwise. + return TunableList[outer_type] # This is the most 'hacky' part of this code, but, thanks to pydantic, we avoid reflection # and preserve rich type information at runtime. 
@@ -100,12 +46,6 @@ def construct_algorithm_model(name: str, model: type[BaseModel]) -> type[BaseMod - Ranges and other convenient calls are expanded (see `python_evalish_coerce`) """ - # Get the default model instance by trying to serialize the empty dictionary - try: - model_default = model.model_validate({}) - except ValidationError: - model_default = None - # First, we need to take our 'model' and coerce it to permit parameter combinations. # This assumes that all of the keys are flattened, so we only get a structure like so: # class AlgorithmParams(BaseModel): @@ -114,34 +54,51 @@ def construct_algorithm_model(name: str, model: type[BaseModel]) -> type[BaseMod # ... # and we want to transform this to: # class AlgorithmParamsCombination(BaseModel): - # key1: list[int] - # key2: list[list[str]] + # key1: TunableInteger + # key2: TunableSet[list[str]] + # ... + # where all key-types are children of `Tunable`. # However, we want to preserve certain conveniences (singleton values, fake python evaluation), # so we also make use of BeforeValidators to do so, and we pass over their preferences into the JSON schema. # (Note: This function does not worry about getting the cartesian product of this.) - # Map our fields to a list (assuming we have no nested keys), + # Map our fields to `Tunable`s (assuming we have no nested keys), # and specify our user convenience validators - mapped_list_field: dict[str, Annotated] = dict() + mapped_list_field: dict[str, tuple] = dict() for field_name, field in model.model_fields.items(): # We need to create a copy of the field, # as we need to make sure that it gets mapped to the list coerced version of the field. new_field = copy.deepcopy(field) new_field.validate_default = True - mapped_list_field[field_name] = (Annotated[ - list[field.annotation], - # This order isn't arbitrary. - # https://docs.pydantic.dev/latest/concepts/validators/#ordering-of-validators - # This runs second. This coerces any singletons to lists. 
- BeforeValidator(list_coerce, json_schema_input_type=Union[field.annotation, list[field.annotation]]), - # This runs first. This evaluates numpy utils for integer/float lists - BeforeValidator( - python_evalish_coerce, - # json_schema_input_type (sensibly) overwrites, so we have to specify the entire union again here. - json_schema_input_type=Union[field.annotation, list[field.annotation], str] - ) if is_numpy_friendly(field.annotation) else None - ], new_field) + assert field.annotation is not None, f"Field {field.title} ({field}) has a None annotation, but we require type annotations on fields." + field_type = determine_inner_type(field.annotation) + + mapped_list_field[field_name] = ( + Annotated[ + Union[ + # We first try to validate the type directly + field_type, + # If that fails, we coerce it into a list beforehand then validate it again. + Annotated[ + field_type, + BeforeValidator( + lambda value: [value], + # and we ensure that in a JSON schema, this is properly marked + # as a singleton value. + json_schema_input_type=field.annotation + ), + ] + ], + # For cleaner serialization, we also serialize singleton types + # as single arrays rather than full arrays. This is especially useful + # for parameter tuning output. + PlainSerializer( + lambda value: value.to_list()[0] if len(value.to_list()) == 1 else value + ) + ], + new_field + ) # Runtime assertion check: mapped_list_field does not contain any `__-prefixed` fields for key in mapped_list_field.keys(): @@ -151,12 +108,30 @@ def construct_algorithm_model(name: str, model: type[BaseModel]) -> type[BaseMod # Pass this as kwargs to create_model, which usually takes in parameters field_name=type. # We do need to cast create_model, since otherwise the type-checker complains that we may # have had a key that starts with __ in mapped_list_fields. The above assertion prevents this. 
- run_model = (cast(Any, create_model))( - f'{name}RunModel', + params_model = (cast(Any, create_model))( + f'{name}ParamModel', __config__=ConfigDict(extra='forbid'), **mapped_list_field ) + # Get the default model instance by trying to serialize the empty dictionary + try: + params_model_default = params_model.model_validate({}) + except ValidationError: + params_model_default = None + + # Then, we create a wrapping `run_model` which contains our params_model, + # as well as any associated options with an individual run. + run_model = create_model( + f'{name}RunModel', + params=(params_model, params_model_default), + __base__=RunSettings, + __config__=ConfigDict(extra='forbid') + ) + + # We use `model_validate` instead of the `run_model` constructor since `run_model` is based off of `RunSettings` + run_model_default = None if params_model_default is None else run_model.model_validate({"params": params_model_default}) + # Here is an example of how this would look like inside config.yaml # name: pathlinker # include: true @@ -174,8 +149,11 @@ def construct_algorithm_model(name: str, model: type[BaseModel]) -> type[BaseMod # include: true # will run, despite there being no entries in `runs`. # (create_model entries take in either a type or (type, default)). - runs=dict[str, run_model] if model_default is None else (dict[str, run_model], {"default": model_default}), - __config__=ConfigDict(extra='forbid') + runs=dict[str, run_model] if run_model_default is None else (dict[str, run_model], {"default": run_model_default}), + __config__=ConfigDict(extra='forbid'), + # Note that both entire algorithms and their runs inherit from `RunSettings`, to allow default runs such as + # `allpairs` to have run-specific settings (e.g. allpairs with timeout.) 
+ __base__=RunSettings ) algorithm_models: list[type[BaseModel]] = [construct_algorithm_model(name, model.get_params_generic()) for name, model in algorithms.items()] diff --git a/spras/config/config.py b/spras/config/config.py index 0a4670a4..f12a7cb7 100644 --- a/spras/config/config.py +++ b/spras/config/config.py @@ -19,11 +19,11 @@ from pathlib import Path from typing import Any -import numpy as np import yaml from spras.config.container_schema import ProcessedContainerSettings from spras.config.revision import attach_spras_revision, spras_revision +from spras.config.runs import RunSettings from spras.config.schema import DatasetSchema, RawConfig from spras.util import LoosePathLike, NpHashEncoder, hash_params_sha1_base32 @@ -61,6 +61,10 @@ def __init__(self, raw_config: dict[str, Any]): self.hash_length = parsed_raw_config.hash_length # Container settings used by PRMs. self.container_settings = ProcessedContainerSettings.from_container_settings(parsed_raw_config.containers, self.hash_length) + # Dictionary of algorithm runs with their associated conditional requirements + self.conditional_run_dependencies: dict[str, set[str]] = dict() + # Dictionary of parameter hashes to their respective run settings + self.algorithm_param_run_settings: dict[str, RunSettings] = dict() # A nested dict mapping algorithm names to dicts that map parameter hashes to parameter combinations. # Only includes algorithms that are set to be run with 'include: true'. self.algorithm_params: dict[str, dict[str, Any]] = dict() @@ -165,6 +169,12 @@ def process_algorithms(self, raw_config: RawConfig): # We copy raw_config.algorithms to avoid mutating the original config # when we attach the SPRAS revision to algorithm names later. for alg in raw_config.algorithms[:]: + # We later use these dictionary to build up our conditional runs dictionary, + # choosing to handle it in the algorithms loop to avoid having to handle run name + # conflicts throughout separate algorithms. 
+ unexpanded_conditional_run_dependencies: dict[str, set[str]] = dict() + run_name_hashes: dict[str, set[str]] = dict() + alg.name = attach_spras_revision(self.immutable_files, alg.name) if alg.include: # This dict maps from parameter combinations hashes to parameter combination dictionaries @@ -179,30 +189,21 @@ def process_algorithms(self, raw_config: RawConfig): for run_name in runs.keys(): all_runs = [] + # Since we have to build up our runs first, we save the dictionary expansion until after we loop through all runs. + unexpanded_conditional_run_dependencies[run_name] = set(runs[run_name].conditionals).union(set(alg.conditionals)) + # We create the product of all param combinations for each run param_name_list = [] # We convert our run parameters to a dictionary, allowing us to iterate over it - run_subscriptable = vars(runs[run_name]) - for param in run_subscriptable: + run_subscriptable = vars(runs[run_name].params) + # `param_values` is guaranteed to be `Tunable[Any]` by algorithms.py + for param, param_values in run_subscriptable.items(): param_name_list.append(param) - # this is guaranteed to be list[Any] by algorithms.py - param_values: list[Any] = run_subscriptable[param] - all_runs.append(param_values) + all_runs.append(param_values.to_list()) run_list_tuples = list(it.product(*all_runs)) param_name_tuple = tuple(param_name_list) for r in run_list_tuples: run_dict = dict(zip(param_name_tuple, r, strict=True)) - # TODO: Workaround for yaml.safe_dump in Snakefile write_parameter_log. - # We would like to preserve np info for larger floats and integers on the config, - # but this isn't strictly necessary for the pretty yaml logging that's happening - if we - # want to preserve the precision, we need to output this into yaml as strings. 
- for param, value in run_dict.copy().items(): - if isinstance(value, np.integer): - run_dict[param] = int(value) - if isinstance(value, np.floating): - run_dict[param] = float(value) - if isinstance(value, np.ndarray): - run_dict[param] = value.tolist() hash_run_dict = copy.deepcopy(run_dict) if self.immutable_files: # Incorporates the `spras_revision` into the hash @@ -218,6 +219,21 @@ def process_algorithms(self, raw_config: RawConfig): self.algorithm_params[alg.name][params_hash] = run_dict + run_name_hashes.setdefault(run_name, set()) + run_name_hashes[run_name].add(params_hash) + + # We finalize by handling any associated information to each parameter hash. + self.algorithm_param_run_settings[params_hash] = RunSettings( + timeout=runs[run_name].timeout or alg.timeout + ) + + for key, values in unexpanded_conditional_run_dependencies.items(): + for key_run_hash in run_name_hashes[key]: + self.conditional_run_dependencies.setdefault(key_run_hash, set()) + for value in values: + for value_run_hash in run_name_hashes[value]: + self.conditional_run_dependencies[key_run_hash].add(value_run_hash) + def process_analysis(self, raw_config: RawConfig): if not raw_config.analysis: return diff --git a/spras/config/function_parsing.py b/spras/config/function_parsing.py new file mode 100644 index 00000000..dab3a5e0 --- /dev/null +++ b/spras/config/function_parsing.py @@ -0,0 +1,49 @@ +import ast +from typing import Callable + + +def python_evalish_coerce[T]( + value: str, + func: Callable[..., T], + desired_function_name: str +) -> T: + """ + A convenient function for parsing strings of the form + `function_name(arg1, arg2, named=arg3, ...)` using Python's `ast`, + and evaluating them against some anonymous function. We choose to immediately evaluate + against `func`, as Python's internal parameter validation logic encapsulates precisely + what we need for this function. 
+ + Since these errors occur in the context of a union type, we don't worry too much + about trying to collect all function names for erroring purposes. + + @param value: The value to parse against. + @param func: The function to pass arguments to. + @param desired_function_name: The function name to look for. + + @returns The result of calling `func` with the arguments in `value` + """ + # To do this, we get the AST of our string as an expression + # (filename='' is to make the error message more closely resemble that of eval.) + value_ast = ast.parse(value, mode='eval', filename='') + + # Then we do some light parsing - we're only looking to do some literal evaluation + # (allowing light python notation) and some basic function parsing. Full python programs + # should just generate a config.yaml. + + # This should always be an Expression whose body is Call (a function). + if not isinstance(value_ast.body, ast.Call): + raise ValueError(f'This argument "{value}" was interpreted as a non-function-calling string: it should be a function call (e.g. range(100, 201, 50)), or an int or a float.') + + # We get the function name back as a string + function_name = ast.unparse(value_ast.body.func) + + if desired_function_name != function_name: + raise ValueError(f"Tried looking for a function of the name {desired_function_name}, but got {function_name} instead.") + + # and we use the (non-availability) safe `ast.literal_eval` to support literals passed into functions. + arguments = [ast.literal_eval(arg) for arg in value_ast.body.args] + # `keyword.arg` is None for `**mapping` unpacking in a call; we keep only non-None values, which also satisfies the type checker.
+ kv_arguments = {keyword.arg: ast.literal_eval(keyword.value) for keyword in value_ast.body.keywords if keyword.arg is not None} + + return func(*arguments, **kv_arguments) diff --git a/spras/config/runs.py b/spras/config/runs.py new file mode 100644 index 00000000..7044f314 --- /dev/null +++ b/spras/config/runs.py @@ -0,0 +1,31 @@ +from typing import Annotated, Optional + +from pydantic import BaseModel, BeforeValidator, ConfigDict, Field +from pytimeparse import parse + + +def validate_duration(value): + if not isinstance(value, str): return value # non-strings are left to pydantic's own int validation + parsed_duration = parse(value, granularity='seconds') + if parsed_duration is None: raise ValueError(f"Encountered unparsable duration string '{value}'.") # ValueError so pydantic reports a ValidationError; a zero duration is valid + return parsed_duration + +PyDateTimeDuration = Annotated[ + int, + BeforeValidator(validate_duration) +] + +class RunSettings(BaseModel): + """All of the non-parameter settings associated with a run.""" + + timeout: Optional[PyDateTimeDuration] = None + """The associated timeout with a run, parsed with `pytimeparse`.""" + + conditionals: list[str] = Field(alias="if", default_factory=lambda: []) + """ + If any of the specified runs in this list succeed, then this run itself is permitted to run. + We refer to these as 'conditional runs,' and since Python reserves the `if` keyword, we call + them "conditionals" for short in-code, but users interface with this using `if`. + """ + + model_config = ConfigDict(extra='forbid', use_attribute_docstrings=True) diff --git a/spras/config/tunable.py b/spras/config/tunable.py new file mode 100644 index 00000000..4fdd2f62 --- /dev/null +++ b/spras/config/tunable.py @@ -0,0 +1,181 @@ +""" +A collection of 'tunable' functions, or functions which can be tuned to +produce 'midpoints.' + +We consider parameter tuning a first-class feature of SPRAS configs, +so all algorithm list parameters are marked with `Tunable`.
+""" + +from abc import ABC, abstractmethod +from collections.abc import Set +from typing import Annotated, Any, Callable, List, Self, Union + +import numpy +from pydantic import BaseModel, Field, RootModel, model_validator + +from spras.config.function_parsing import python_evalish_coerce + + +class Tunable[T](ABC): + @abstractmethod + def tune(self) -> Self: + """ + Creates a tuned version of `self` which contains some sembalance of 'midpoints' on top of `self.` + + We impose a restriction: for all `P` implementing `Tunable`, any valid implementation of `P#tune` must satisfy that + for all `p : P`, we can construct some `p' = p.tune()` such that: + 1. `to_list(p)` is a sublist of `to_list(p')`, + 2. for every x, y in `to_list(p)` such that no z in `to_list(p)` satisfies x < z < y, there either: + - exists a unique a z' in `to_list(p')` such that x < z' < y. + - no such x < z' < y exists for all z' in to_list(p'). + + We do this for the sake of parameter tuning (to avoid situations where what runs depend on what other runs is ambiguous), + though this can be extended in the future to avoid that. + """ + # TODO: enforce at runtime + pass + + @abstractmethod + def to_list(self) -> List[T]: + pass + +class TunableList[S](RootModel[List[S]], Tunable[S]): + """ + A thin wrapper class to allow generic sets to 'pretend' to be tunable. + Note that `TunableList#tune` is the identity function. + """ + + def tune(self): + # NOTE: We use `type(self)` to access the constructor [This is annoying and I'm not a particular fan of it]. + # This happens throughout this file. 
+ return type(self)(self.root) + + def to_list(self): + return self.root + +def parse_from_function[T](data: Any, func: Callable[..., T], desired_function_name: str) -> T: + """Convenience wrapper around `python_evalish_coerce` for trivial before model_validator declarations.""" + if not isinstance(data, str): + return data + + return python_evalish_coerce(data, func, desired_function_name) + +class BaseTunable[T](BaseModel, Tunable[T]): + """No-op class to avoid double inheritance issues for IntegerAdapter""" + pass + +# TODO: The way we do `parse_from_string` for these methods is lenghty and bad for types, +# and frankly, this section is a lot of length "declaring definitions" boilerplate, with the only substance +# happening in their respective `tune` methods: can we make this better? +class Range(BaseModel, Tunable[int]): + """A `range`-backed list of integers.""" + start: int = 0 + stop: int + step: int = 1 + + @model_validator(mode='before') + @classmethod + def parse_from_string(cls, data: Any) -> Any: + return parse_from_function(data, lambda start, stop, step=1: {"start": start, "stop": stop, "step": step}, "range") + + def tune(self): + return type(self)(start=self.start, stop=self.stop, step=int(self.step / 2.0)) + + def to_list(self): + return list(range(self.start, self.stop, self.step)) + +class LinSpace(BaseTunable[float]): + """ + A tuning wrapper for numpy.linspace: + see its associated documentation for parameter information. 
+ """ + + start: float + stop: float + num: int = 50 + endpoint: bool + + @model_validator(mode='before') + @classmethod + def parse_from_string(cls, data: Any) -> Any: + return parse_from_function( + data, + lambda start, stop, num=50, endpoint=True: {"start": start, "stop": stop, "num": num, "endpoint": endpoint}, + "np.linspace" + ) + + def tune(self): + return type(self)(start=self.start, stop=self.stop, num=self.num * 2, endpoint=self.endpoint) + + def to_list(self): + return list(numpy.linspace(self.start, self.stop, self.num, endpoint=self.endpoint).tolist()) + +class ARange(BaseTunable[float]): + """ + A tuning wrapper for numpy.arange: + see its associated documentation for parameter information. + """ + + start: float + stop: float + step: float = 1.0 + + @model_validator(mode='before') + @classmethod + def parse_from_string(cls, data: Any) -> Any: + return parse_from_function( + data, + lambda start, stop, step=1.0: {"start": start, "stop": stop, "step": step}, + "np.arange" + ) + + def tune(self): + return type(self)(start=self.start, stop=self.stop, step=self.step / 2) + + def to_list(self): + return list(numpy.arange(self.start, self.stop, self.step).tolist()) + +class LogSpace(BaseTunable[float]): + """ + A tuning wrapper for numpy.logspace: + see its associated documentation for parameter information. + """ + + start: float + stop: float + num: int = 50 + endpoint: bool = True + base: float = 10.0 + + @model_validator(mode='before') + @classmethod + def parse_from_string(cls, data: Any) -> Any: + return parse_from_function( + data, + lambda start, stop, num=50, endpoint=True, base=10.0: + {"start": start, "stop": stop, "num": num, "endpoint": endpoint, "base": base}, + "np.logspace" + ) + + def tune(self): + # TODO: does logspace admit a proper `tune` implementation? 
+ return type(self)(start=self.start, stop=self.stop, num=self.num, endpoint=self.endpoint, base=self.base) + + def to_list(self): + return list(numpy.logspace(self.start, self.stop, self.num, endpoint=self.endpoint, base=self.base).tolist()) + +class IntegerAdapter[S : (BaseTunable[float], BaseTunable[int])](RootModel[S], Tunable[int]): + """Takes some Tunable[float/int] backed by a BaseModel and turns it into a Tunable[int]""" + + def tune(self): + return type(self)(self.root.tune()) + + def to_list(self): + return list({int(elem) for elem in self.root.to_list()}) + + +FloatTunable = Annotated[Union[Range, LinSpace, ARange, LogSpace, TunableList[float]], Field(union_mode="left_to_right")] + +# We have to annoyingly spread IntegerAdapter across every type we care about, to avoid multiple inheritance issues +# TODO: maybe a better way to do this? +IntegerTunable = Annotated[Union[Range, IntegerAdapter[LinSpace], IntegerAdapter[ARange], IntegerAdapter[LogSpace], TunableList[int]], Field(union_mode="left_to_right")] diff --git a/spras/containers.py b/spras/containers.py index 124b9741..8c3f26b6 100644 --- a/spras/containers.py +++ b/spras/containers.py @@ -1,3 +1,4 @@ +import datetime import os import platform import re @@ -8,6 +9,7 @@ import docker import docker.errors +import requests from spras.config.container_schema import ProcessedContainerSettings from spras.logging import indent @@ -166,6 +168,20 @@ def streams_contain(self, needle: str): def __str__(self): return self.message +class TimeoutError(RuntimeError): + """Raises when a function times out.""" + timeout: int + message: str + + def __init__(self, timeout: int, *args): + self.timeout = timeout + self.message = f"Timed out after {datetime.timedelta(seconds=timeout)}." + + super(TimeoutError, self).__init__(timeout, *args) + + def __str__(self): + return self.message + def env_to_items(environment: dict[str, str]) -> Iterator[str]: """ Turns an environment variable dictionary to KEY=VALUE pairs. 
@@ -176,7 +192,17 @@ def env_to_items(environment: dict[str, str]) -> Iterator[str]: # TODO consider a better default environment variable # Follow docker-py's naming conventions (https://docker-py.readthedocs.io/en/stable/containers.html) # Technically the argument is an image, not a container, but we use container here. -def run_container(container_suffix: str, command: List[str], volumes: List[Tuple[PurePath, PurePath]], working_dir: str, out_dir: str | os.PathLike, container_settings: ProcessedContainerSettings, environment: Optional[dict[str, str]] = None, network_disabled = False): +def run_container( + container_suffix: str, + command: List[str], + volumes: List[Tuple[PurePath, PurePath]], + working_dir: str, + out_dir: str | os.PathLike, + container_settings: ProcessedContainerSettings, + timeout: Optional[int], + environment: Optional[dict[str, str]] = None, + network_disabled = False +): """ Runs a command in the container using Singularity or Docker @param container_suffix: name of the DockerHub container without the 'docker://' prefix @@ -185,6 +211,7 @@ def run_container(container_suffix: str, command: List[str], volumes: List[Tuple @param working_dir: the working directory in the container @param container_settings: the settings to use to run the container @param out_dir: output directory for the rule's artifacts. Only passed into run_container_singularity for the purpose of profiling. + @param timeout: the timeout (in seconds), throwing a TimeoutException if the timeout is reached. @param environment: environment variables to set in the container @param network_disabled: Disables the network on the container. Only works for docker for now. This acts as a 'runtime assertion' that a container works w/o networking. 
@return: output from Singularity execute or Docker run @@ -193,7 +220,7 @@ def run_container(container_suffix: str, command: List[str], volumes: List[Tuple container = container_settings.prefix + "/" + container_suffix if normalized_framework == 'docker': - return run_container_docker(container, command, volumes, working_dir, environment, network_disabled) + return run_container_docker(container, command, volumes, working_dir, environment, timeout, network_disabled) elif normalized_framework == 'singularity' or normalized_framework == "apptainer": return run_container_singularity(container, command, volumes, working_dir, out_dir, container_settings, environment) elif normalized_framework == 'dsub': @@ -201,7 +228,17 @@ def run_container(container_suffix: str, command: List[str], volumes: List[Tuple else: raise ValueError(f'{container_settings.framework} is not a recognized container framework. Choose "docker", "dsub", "apptainer", or "singularity".') -def run_container_and_log(name: str, container_suffix: str, command: List[str], volumes: List[Tuple[PurePath, PurePath]], working_dir: str, out_dir: str | os.PathLike, container_settings: ProcessedContainerSettings, environment: Optional[dict[str, str]] = None, network_disabled=False): +def run_container_and_log( + name: str, + container_suffix: str, + command: List[str], + volumes: List[Tuple[PurePath, PurePath]], + working_dir: str, out_dir: str | os.PathLike, + container_settings: ProcessedContainerSettings, + timeout: Optional[int], + environment: Optional[dict[str, str]] = None, + network_disabled=False +): """ Runs a command in the container using Singularity or Docker with associated pretty printed messages. 
@param name: the display name of the running container for logging purposes @@ -210,6 +247,7 @@ def run_container_and_log(name: str, container_suffix: str, command: List[str], @param volumes: a list of volumes to mount where each item is a (source, destination) tuple @param working_dir: the working directory in the container @param container_settings: the container settings to use + @param timeout: the timeout (in seconds), throwing a TimeoutException if the timeout is reached. @param environment: environment variables to set in the container @param network_disabled: Disables the network on the container. Only works for docker for now. This acts as a 'runtime assertion' that a container works w/o networking. @return: output from Singularity execute or Docker run @@ -219,7 +257,17 @@ def run_container_and_log(name: str, container_suffix: str, command: List[str], print('Running {} on container framework "{}" on env {} with command: {}'.format(name, container_settings.framework, list(env_to_items(environment)), ' '.join(command)), flush=True) try: - out = run_container(container_suffix=container_suffix, command=command, volumes=volumes, working_dir=working_dir, out_dir=out_dir, container_settings=container_settings, environment=environment, network_disabled=network_disabled) + out = run_container( + container_suffix=container_suffix, + command=command, + volumes=volumes, + working_dir=working_dir, + out_dir=out_dir, + container_settings=container_settings, + timeout=timeout, + environment=environment, + network_disabled=network_disabled + ) if out is not None: if isinstance(out, list): out = ''.join(out) @@ -250,7 +298,15 @@ def run_container_and_log(name: str, container_suffix: str, command: List[str], raise ContainerError(message, err.exit_status, stdout, stderr) from None # TODO any issue with creating a new client each time inside this function? 
-def run_container_docker(container: str, command: List[str], volumes: List[Tuple[PurePath, PurePath]], working_dir: str, environment: Optional[dict[str, str]] = None, network_disabled=False): +def run_container_docker( + container: str, + command: List[str], + volumes: List[Tuple[PurePath, PurePath]], + working_dir: str, + environment: Optional[dict[str, str]] = None, + timeout: Optional[int] = None, + network_disabled=False +): """ Runs a command in the container using Docker. Attempts to automatically correct file owner and group for new files created by the container, setting them to the @@ -261,6 +317,8 @@ def run_container_docker(container: str, command: List[str], volumes: List[Tuple @param volumes: a list of volumes to mount where each item is a (source, destination) tuple @param working_dir: the working directory in the container @param environment: environment variables to set in the container + @param timeout: the timeout (in seconds), throwing a TimeoutException if the timeout is reached. + @param network_disabled: if enabled, disables the underlying network: useful when containers don't fetch any online resources. @return: output from Docker run, or will error if the container errored. """ @@ -290,13 +348,28 @@ def run_container_docker(container: str, command: List[str], volumes: List[Tuple bind_paths = [f'{prepare_path_docker(src)}:{dest}' for src, dest in volumes] - out = client.containers.run(container, - command, - stderr=True, - volumes=bind_paths, - working_dir=working_dir, - network_disabled=network_disabled, - environment=environment).decode('utf-8') + # We detach the container, allowing dockerpy to return a + # `Container` object for our further use. This is currently only + # to set docker-based container timeouts. 
+ container_obj = client.containers.run( + container, + command, + volumes=bind_paths, + working_dir=working_dir, + network_disabled=network_disabled, + environment=environment, + detach=True + ) + + try: + container_obj.wait(timeout=timeout) + except requests.exceptions.ReadTimeout as err: + container_obj.stop() + client.close() + if timeout: raise TimeoutError(timeout) from err + else: raise RuntimeError("Timeout error but no timeout specified. Please file an issue with this error and stacktrace at https://github.com/Reed-CompBio/spras/issues/new.") from None + + out = container_obj.attach(stderr=True).decode('utf-8') # TODO does this cleanup need to still run even if there was an error in the above run command? # On Unix, files written by the above Docker run command will be owned by root and cannot be modified @@ -345,7 +418,16 @@ def run_container_docker(container: str, command: List[str], volumes: List[Tuple return out -def run_container_singularity(container: str, command: List[str], volumes: List[Tuple[PurePath, PurePath]], working_dir: str, out_dir: str, config: ProcessedContainerSettings, environment: Optional[dict[str, str]] = None): +def run_container_singularity( + container: str, + command: List[str], + volumes: List[Tuple[PurePath, PurePath]], + working_dir: str, + out_dir: str | os.PathLike, + config: ProcessedContainerSettings, + environment: Optional[dict[str, str]] = None, + timeout: Optional[int] = None, +): """ Runs a command in the container using Singularity. Only available on Linux. @@ -355,6 +437,7 @@ def run_container_singularity(container: str, command: List[str], volumes: List[ @param working_dir: the working directory in the container @param out_dir: output directory for the rule's artifacts -- used here to store profiling data @param environment: environment variable to set in the container + @param timeout: the timeout (in seconds), throwing a TimeoutException if the timeout is reached. 
@return: output from Singularity execute """ @@ -417,37 +500,44 @@ def run_container_singularity(container: str, command: List[str], volumes: List[ # If not using the expanded sandbox image, we still need to prepend the docker:// prefix # so apptainer knows to pull and convert the image format from docker to apptainer. image_to_run = expanded_image if expanded_image else "docker://" + container - if config.enable_profiling: - # We won't end up using the spython client if profiling is enabled because - # we need to run everything manually to set up the cgroup - # Build the apptainer run command, which gets passed to the cgroup wrapper script - singularity_cmd = [ - "apptainer", "exec" - ] - for bind in bind_paths: - singularity_cmd.extend(["--bind", bind]) - singularity_cmd.extend(singularity_options) - singularity_cmd.append(image_to_run) - singularity_cmd.extend(command) + # We won't end up using the spython client if profiling or timeout is enabled because + # we need to run everything manually to set up the cgroup and add the timeout command as a prefix. + # Build the apptainer run command, which gets passed to the cgroup wrapper script + cmd = [ + "apptainer", "exec" + ] + for bind in bind_paths: + cmd.extend(["--bind", bind]) + cmd.extend(singularity_options) + cmd.append(str(image_to_run)) + cmd.extend(command) + + my_cgroup: Optional[str] = None + if config.enable_profiling: my_cgroup = create_peer_cgroup() # The wrapper script is packaged with spras, and should be located in the same directory # as `containers.py`. 
wrapper = os.path.join(os.path.dirname(__file__), "cgroup_wrapper.sh") - cmd = [wrapper, my_cgroup] + singularity_cmd - proc = subprocess.run(cmd, capture_output=True, text=True, stderr=subprocess.STDOUT) + cmd = [wrapper, my_cgroup] + cmd + if timeout is not None: + cmd = ["timeout", f"{timeout}s"] + cmd + proc = subprocess.run(cmd, text=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT) + + # As per unix `timeout`, this is the status if the command times out and --preserve-status is not initially specified + # (where the latter above holds). + if proc.returncode == 124: + if timeout is not None: + raise TimeoutError(timeout) + else: + raise RuntimeError("Timeout return code occurred, yet `timeout` wasn't specified. " + \ + "Please file an issue with this error and stacktrace at https://github.com/Reed-CompBio/spras/issues/new.") + if my_cgroup is not None: print("Reading memory and CPU stats from cgroup") - create_apptainer_container_stats(my_cgroup, out_dir) + create_apptainer_container_stats(my_cgroup, str(out_dir)) - result = proc.stdout - else: - result = Client.execute( - image=image_to_run, - command=command, - options=singularity_options, - bind=bind_paths - ) + result = proc.stdout return result diff --git a/spras/dataset.py b/spras/dataset.py index ddf74736..3e9975f3 100644 --- a/spras/dataset.py +++ b/spras/dataset.py @@ -1,7 +1,7 @@ import os import pickle as pkl import warnings -from typing import Union +from typing import Self, Union import pandas as pd @@ -60,9 +60,8 @@ def to_file(self, file: LoosePathLike): with open(file, "wb") as f: pkl.dump(self, f) - # NOTE: When we bump to Python 3.13, we can use the reference Dataset instead of the literal "Dataset" for typing. @classmethod - def from_file(cls, file: Union[LoosePathLike, "Dataset"]): + def from_file(cls, file: Union[LoosePathLike, Self]): """ Loads dataset object from a pickle file or another `Dataset` object. 
Usage: dataset = Dataset.from_file(pickle_file) diff --git a/spras/diamond.py b/spras/diamond.py index e8d5ed7d..65f1ac9b 100644 --- a/spras/diamond.py +++ b/spras/diamond.py @@ -3,6 +3,7 @@ from pydantic import BaseModel, ConfigDict from spras.config.container_schema import ProcessedContainerSettings +from spras.config.runs import RunSettings from spras.containers import ContainerError, prepare_volume, run_container_and_log from spras.dataset import Dataset from spras.interactome import ( @@ -63,8 +64,9 @@ def generate_inputs(data, filename_map): edges_df.to_csv(filename_map["network"], columns=["Interactor1", "Interactor2"], index=False, header=None, sep=',') @staticmethod - def run(inputs, output_file, args, container_settings=None): + def run(inputs, output_file, args, container_settings=None, run_settings=None): if not container_settings: container_settings = ProcessedContainerSettings() + if not run_settings: run_settings = RunSettings() DIAMOnD.validate_required_run_args(inputs) work_dir = '/diamond' @@ -100,7 +102,8 @@ def run(inputs, output_file, args, container_settings=None): volumes, work_dir, out_dir, - container_settings) + container_settings, + run_settings.timeout) except ContainerError as err: if err.streams_contain("KeyError: 'nix'"): raise RuntimeError(f"{err.stderr}\n" + \ diff --git a/spras/domino.py b/spras/domino.py index 2e96a298..1e419ecf 100644 --- a/spras/domino.py +++ b/spras/domino.py @@ -6,6 +6,7 @@ from pydantic import BaseModel, ConfigDict from spras.config.container_schema import ProcessedContainerSettings +from spras.config.runs import RunSettings from spras.config.util import BaseModel from spras.containers import ContainerError, prepare_volume, run_container_and_log from spras.interactome import ( @@ -79,9 +80,10 @@ def generate_inputs(data, filename_map): header=['ID_interactor_A', 'ppi', 'ID_interactor_B']) @staticmethod - def run(inputs, output_file, args=None, container_settings=None): - if not container_settings: 
container_settings = ProcessedContainerSettings() + def run(inputs, output_file, args=None, container_settings=None, run_settings=None): if not args: args = DominoParams() + if not container_settings: container_settings = ProcessedContainerSettings() + if not run_settings: run_settings = RunSettings() DOMINO.validate_required_run_args(inputs) work_dir = '/spras' @@ -117,7 +119,8 @@ def run(inputs, output_file, args=None, container_settings=None): volumes, work_dir, out_dir, - container_settings) + container_settings, + run_settings.timeout) except ContainerError as err: # Occurs when DOMINO gets passed some empty dataframe from network_file. # This counts as an empty input, so we return an empty output. @@ -151,7 +154,8 @@ def run(inputs, output_file, args=None, container_settings=None): volumes, work_dir, out_dir, - container_settings) + container_settings, + run_settings.timeout) except ContainerError as err: # Occurs when DOMINO gets passed some empty dataframe from network_file. # This counts as an empty input, so we return an empty output. diff --git a/spras/errors.py b/spras/errors.py new file mode 100644 index 00000000..e45bf7fa --- /dev/null +++ b/spras/errors.py @@ -0,0 +1,79 @@ +""" +These are errors for the SPRAS workflow: we describe these as 'artifact' information, +as Snakemake produces artifacts, and the error/success status of these artifacts is associated with +a separate file, named `artifact-info.json`. Note that an `artifact-info.json` file is attached to a +part of the workflow run, which may produce multiple artifacts, and not a single artifact. 
+ +This file makes some heavy use of pydantic discriminated unions and type adapters, +both of which happen to be described in the unions page: +https://pydantic.dev/docs/validation/latest/concepts/unions/#discriminated-unions +""" + +import json +from pathlib import Path +from typing import Annotated, Literal, Union + +from pydantic import BaseModel, Field, TypeAdapter + +from spras.util import LoosePathLike + + +class TimeoutArtifactError(BaseModel): + # We can't use the key `type` without some extra pydantic aliasing. + error_type: Literal['timeout'] = 'timeout' + duration: int + +class FailedDependencyError(BaseModel): + error_type: Literal['depended'] = 'depended' + failing_dependencies: list[str] + +"""All possible distinguished errors.""" +ArtifactErrorOptions = Union[TimeoutArtifactError, FailedDependencyError] + +class ArtifactError(BaseModel): + """ + One of the two variants of artifact information describing errors. See `ArtifactSuccess` for the other option. + + This variant is returned when we have a failing point in the workflow, + with `details` delegating to `ArtifactErrorOptions` for more information about the error. + """ + details: ArtifactErrorOptions = Field(discriminator="error_type") + status: Literal['error'] = 'error' + +class ArtifactSuccess(BaseModel): + """ + One of the two variants of artifact information describing successes. See `ArtifactError` for the other option. + + This variant only says that this part of the workflow succeeded. + """ + status: Literal['success'] = 'success' + +"""Describes what happened to a [potentially collection of] artifacts at a point in the workflow.""" +ArtifactInfo = Annotated[Union[ArtifactError, ArtifactSuccess], Field(discriminator="status")] +ArtifactInfoAdapter = TypeAdapter[ArtifactInfo](ArtifactInfo) + +# Collection of Snakemake utilities. 
+ +def artifact_info_to_str(artifact_info: ArtifactInfo) -> str: + """Converts some `ArtifactInfo` into a string.""" + return json.dumps(ArtifactInfoAdapter.validate_python(artifact_info).model_dump(mode='json')) + +def artifact_info_from_file(file: LoosePathLike) -> ArtifactInfo: + """Converts a file into ArtifactInfo.""" + with open(file, 'r') as f: + return ArtifactInfoAdapter.validate_json(json.load(f)) + +def mark_error(file: LoosePathLike, artifact_error: ArtifactErrorOptions): + """Marks an artifact information file as an error with associated details.""" + Path(file).write_text(artifact_info_to_str(ArtifactError(details=artifact_error))) + +def mark_success(file: LoosePathLike): + """Marks an artifact information file as successful""" + Path(file).write_text(artifact_info_to_str(ArtifactSuccess())) + +def is_error(file: LoosePathLike): + """Checks if a file was produced by mark_error.""" + try: + return artifact_info_from_file(file).status == "error" + except ValueError: + return False diff --git a/spras/meo.py b/spras/meo.py index dcb7985e..003d6a62 100644 --- a/spras/meo.py +++ b/spras/meo.py @@ -5,6 +5,7 @@ from pydantic import BaseModel, ConfigDict from spras.config.container_schema import ProcessedContainerSettings +from spras.config.runs import RunSettings from spras.containers import prepare_volume, run_container_and_log from spras.interactome import ( add_directionality_constant, @@ -136,10 +137,9 @@ def generate_inputs(data, filename_map): edges.to_csv(filename_map['edges'], sep='\t', index=False, columns=['Interactor1', 'EdgeType', 'Interactor2', 'Weight'], header=False) - # TODO add parameter validation # TODO document required arguments @staticmethod - def run(inputs, output_file=None, args=None, container_settings=None): + def run(inputs, output_file, args=None, container_settings=None, run_settings=None): """ Run Maximum Edge Orientation in the Docker image with the provided parameters. 
The properties file is generated from the provided arguments. @@ -148,8 +148,9 @@ def run(inputs, output_file=None, args=None, container_settings=None): Only the edge output file is retained. All other output files are deleted. """ - if not container_settings: container_settings = ProcessedContainerSettings() if not args: args = MEOParams() + if not container_settings: container_settings = ProcessedContainerSettings() + if not run_settings: run_settings = RunSettings() MEO.validate_required_run_args(inputs) work_dir = '/spras' @@ -196,7 +197,8 @@ def run(inputs, output_file=None, args=None, container_settings=None): volumes, work_dir, out_dir, - container_settings) + container_settings, + run_settings.timeout) properties_file_local.unlink(missing_ok=True) diff --git a/spras/mincostflow.py b/spras/mincostflow.py index 96fb0e10..60f57900 100644 --- a/spras/mincostflow.py +++ b/spras/mincostflow.py @@ -4,6 +4,7 @@ from pydantic import BaseModel, ConfigDict from spras.config.container_schema import ProcessedContainerSettings +from spras.config.runs import RunSettings from spras.containers import prepare_volume, run_container_and_log from spras.interactome import ( convert_undirected_to_directed, @@ -71,9 +72,10 @@ def generate_inputs(data, filename_map): header=False) @staticmethod - def run(inputs, output_file, args=None, container_settings=None): - if not container_settings: container_settings = ProcessedContainerSettings() + def run(inputs, output_file, args=None, container_settings=None, run_settings=None): if not args: args = MinCostFlowParams() + if not container_settings: container_settings = ProcessedContainerSettings() + if not run_settings: run_settings = RunSettings() MinCostFlow.validate_required_run_args(inputs) # the data files will be mapped within this directory within the container @@ -122,7 +124,8 @@ def run(inputs, output_file, args=None, container_settings=None): volumes, work_dir, out_dir, - container_settings) + container_settings, + 
run_settings.timeout) # Check the output of the container out_dir_content = sorted(out_dir.glob('*.sif')) diff --git a/spras/omicsintegrator1.py b/spras/omicsintegrator1.py index 916b7c45..85f444b8 100644 --- a/spras/omicsintegrator1.py +++ b/spras/omicsintegrator1.py @@ -4,6 +4,7 @@ from pydantic import BaseModel, ConfigDict from spras.config.container_schema import ProcessedContainerSettings +from spras.config.runs import RunSettings from spras.config.util import CaseInsensitiveEnum from spras.containers import prepare_volume, run_container_and_log from spras.dataset import MissingDataError @@ -154,8 +155,9 @@ def generate_inputs(data, filename_map): # TODO add support for knockout argument # TODO add reasonable default values @staticmethod - def run(inputs, output_file, args, container_settings=None): + def run(inputs, output_file, args, container_settings=None, run_settings=None): if not container_settings: container_settings = ProcessedContainerSettings() + if not run_settings: run_settings = RunSettings() OmicsIntegrator1.validate_required_run_args(inputs, ["dummy_nodes"]) work_dir = '/spras' @@ -227,6 +229,7 @@ def run(inputs, output_file, args, container_settings=None): work_dir, out_dir, container_settings, + run_settings.timeout, {'TMPDIR': mapped_out_dir}) conf_file_local.unlink(missing_ok=True) diff --git a/spras/omicsintegrator2.py b/spras/omicsintegrator2.py index b6c18efd..7d11d267 100644 --- a/spras/omicsintegrator2.py +++ b/spras/omicsintegrator2.py @@ -5,6 +5,7 @@ from pydantic import BaseModel, ConfigDict from spras.config.container_schema import ProcessedContainerSettings +from spras.config.runs import RunSettings from spras.config.util import CaseInsensitiveEnum from spras.containers import prepare_volume, run_container_and_log from spras.dataset import Dataset, MissingDataError @@ -105,9 +106,10 @@ def generate_inputs(data: Dataset, filename_map): header=['protein1', 'protein2', 'cost']) @staticmethod - def run(inputs, output_file, args=None, 
container_settings=None): - if not container_settings: container_settings = ProcessedContainerSettings() + def run(inputs, output_file, args=None, container_settings=None, run_settings=None): if not args: args = OmicsIntegrator2Params() + if not container_settings: container_settings = ProcessedContainerSettings() + if not run_settings: run_settings = RunSettings() OmicsIntegrator2.validate_required_run_args(inputs) work_dir = '/spras' @@ -157,6 +159,7 @@ def run(inputs, output_file, args=None, container_settings=None): work_dir, out_dir, container_settings, + run_settings.timeout, network_disabled=True) # TODO do we want to retain other output files? diff --git a/spras/pathlinker.py b/spras/pathlinker.py index 7f070f55..6a05509f 100644 --- a/spras/pathlinker.py +++ b/spras/pathlinker.py @@ -4,6 +4,7 @@ from pydantic import BaseModel, ConfigDict from spras.config.container_schema import ProcessedContainerSettings +from spras.config.runs import RunSettings from spras.containers import prepare_volume, run_container_and_log from spras.dataset import Dataset from spras.interactome import ( @@ -73,9 +74,10 @@ def generate_inputs(data, filename_map): header=["#Interactor1","Interactor2","Weight"]) @staticmethod - def run(inputs, output_file, args=None, container_settings=None): - if not container_settings: container_settings = ProcessedContainerSettings() + def run(inputs, output_file, args=None, container_settings=None, run_settings=None): if not args: args = PathLinkerParams() + if not run_settings: run_settings = RunSettings() + if not container_settings: container_settings = ProcessedContainerSettings() PathLinker.validate_required_run_args(inputs) work_dir = '/spras' @@ -113,7 +115,8 @@ def run(inputs, output_file, args=None, container_settings=None): volumes, work_dir, out_dir, - container_settings) + container_settings, + run_settings.timeout) # Rename the primary output file to match the desired output filename # Currently PathLinker only writes one output file 
so we do not need to delete others diff --git a/spras/prm.py b/spras/prm.py index 18f3c8a9..f5d7c53a 100644 --- a/spras/prm.py +++ b/spras/prm.py @@ -1,11 +1,13 @@ import os from abc import ABC, abstractmethod from pathlib import Path +from types import get_original_bases from typing import Any, Generic, Mapping, Optional, TypeVar, cast, get_args from pydantic import BaseModel from spras.config.container_schema import ProcessedContainerSettings +from spras.config.runs import RunSettings from spras.dataset import Dataset from spras.util import LoosePathLike @@ -52,41 +54,55 @@ def get_params_generic(cls) -> type[T]: For example, on `class PathLinker(PRM[PathLinkerParams])`, calling `PathLinker.get_params_generic()` returns `PathLinkerParams`. """ - # TODO: use the type-safe get_original_bases when we bump to >= Python 3.12 - # This is hacky reflection from https://stackoverflow.com/a/71720366/7589775 - # which grabs the class of type T by the definition of `__orig_bases__`. - return get_args(cast(Any, cls).__orig_bases__[0])[0] + # This gives us (PRM[PathLinkerParams], ) + original_bases = get_original_bases(cls) + + # Since we just used reflection, we provide a few mountain-dewey error messages here + # to protect against any developer confusion. + assert len(original_bases) == 1, f"{cls} inherits from several classes, when precisely one is required." + original_bases_args = get_args(original_bases[0]) + assert len(original_bases_args) == 1, "There were several generics passed into PRM, when precisely one is required." 
+ T_class, = original_bases_args + + if not issubclass(T_class, BaseModel): + raise RuntimeError("The generic passed into PRM is not a pydantic.BaseModel.") + + # Finally, we cast, since issubclass overeagerly restricts T_class to type[BaseModel] + # instead of type[T] without imposing the restriction that T inherits from BaseModel + return cast(type[T], T_class) # This is used in `runner.py` to avoid a dependency diamond when trying # to import the actual algorithm schema. @classmethod - def run_typeless(cls, inputs: dict[str, str | os.PathLike], output_file: str | os.PathLike, args: dict[str, Any], container_settings: ProcessedContainerSettings): + def run_typeless(cls, inputs: dict[str, str | os.PathLike], output_file: str | os.PathLike, args: dict[str, Any], container_settings: ProcessedContainerSettings, run_settings: RunSettings): """ - This is similar to PRA.run, but it does pydantic logic internally to re-validate argument parameters. + This is similar to PRA.run, but `args` is a dictionary and not a pydantic structure. + However, this method still re-validates `args` against the associated pydantic PRM argument model. """ T_class = cls.get_params_generic() - # Since we just used reflection, we provide a mountain-dewey error message here - # to protect against any developer confusion. - if not issubclass(T_class, BaseModel): - raise RuntimeError("The generic passed into PRM is not a pydantic.BaseModel.") - # Validates our untyped `args` parameter against our parameter class of type T # using BaseModel.model_validate (https://docs.pydantic.dev/latest/api/base_model/#pydantic.BaseModel.model_validate) # (Pydantic already provides nice error messages, so we don't need to worry about catching this.) 
T_parsed = T_class.model_validate(args) - return cls.run(inputs, output_file, T_parsed, container_settings) + return cls.run(inputs, output_file, T_parsed, container_settings, run_settings) @staticmethod @abstractmethod - def run(inputs: dict[str, str | os.PathLike], output_file: str | os.PathLike, args: T, container_settings: ProcessedContainerSettings): + def run(inputs: dict[str, str | os.PathLike], output_file: str | os.PathLike, args: T, container_settings: ProcessedContainerSettings, run_settings: RunSettings): """ - Runs an algorithm with the specified inputs, algorithm params (T), - the designated output_file, and the desired container_settings. - - See the algorithm-specific `generate_inputs` and `parse_output` + Runs an algorithm. + @param inputs: specified inputs + @param output_file: designated reconstructed pathway output + @param args: (T) typed algorithm params + @param container_settings: what settings should be associated with the individual container. + @param run_settings: The particular run settings to use. See `RunSettings` for more info. + + See the algorithm-specific `PRM.generate_inputs` and `PRM.parse_output` for information about the input and output format. + + Also see `PRM.run_typeless` for the non-pydantic version of this method (where `args` is a dict). 
""" raise NotImplementedError diff --git a/spras/responsenet.py b/spras/responsenet.py index f53b5984..893fd841 100644 --- a/spras/responsenet.py +++ b/spras/responsenet.py @@ -3,6 +3,7 @@ from pydantic import BaseModel, ConfigDict from spras.config.container_schema import ProcessedContainerSettings +from spras.config.runs import RunSettings from spras.containers import prepare_volume, run_container_and_log from spras.interactome import ( convert_undirected_to_directed, @@ -63,10 +64,11 @@ def generate_inputs(data, filename_map): header=False) @staticmethod - def run(inputs, output_file, args=None, container_settings=None): + def run(inputs, output_file, args=None, container_settings=None, run_settings=None): + if not args: args = ResponseNetParams() + if not run_settings: run_settings = RunSettings() if not container_settings: container_settings = ProcessedContainerSettings() ResponseNet.validate_required_run_args(inputs) - if not args: args = ResponseNetParams() # the data files will be mapped within this directory within the container work_dir = '/ResponseNet' @@ -112,7 +114,8 @@ def run(inputs, output_file, args=None, container_settings=None): volumes, work_dir, out_dir, - container_settings) + container_settings, + run_settings.timeout) # Rename the primary output file to match the desired output filename out_file_suffixed.rename(output_file) @@ -136,7 +139,7 @@ def parse_output(raw_pathway_file, standardized_pathway_file, params): df = raw_pathway_df(raw_pathway_file, sep='\t', header=0) if not df.empty: df.columns = ['Node1', 'Node2', 'Flow'] - df = df.drop(columns=['Flow'], axis=1) + df = df.drop(columns=['Flow']) df = add_rank_column(df) # ResponseNet's outputs should be treated as undirected outputs. 
df = reinsert_direction_col_undirected(df) diff --git a/spras/runner.py b/spras/runner.py index fb9391f9..994cf5c7 100644 --- a/spras/runner.py +++ b/spras/runner.py @@ -1,8 +1,11 @@ +from os import PathLike from typing import Any, Mapping # supported algorithm imports from spras.allpairs import AllPairs from spras.btb import BowTieBuilder +from spras.config.container_schema import ProcessedContainerSettings +from spras.config.runs import RunSettings from spras.dataset import Dataset, DatasetSchema from spras.diamond import DIAMOnD from spras.domino import DOMINO @@ -38,14 +41,21 @@ def get_algorithm(algorithm: str) -> type[PRM]: except KeyError as exc: raise NotImplementedError(f'{algorithm} is not currently supported.') from exc -def run(algorithm: str, inputs, output_file, args, container_settings): +def run( + algorithm: str, + inputs: dict[str, str | PathLike], + output_file: str | PathLike, + args: dict[str, Any], + container_settings: ProcessedContainerSettings, + run_settings: RunSettings +): """ A generic interface to the algorithm-specific run functions """ algorithm_runner = get_algorithm(algorithm) # We can't use config.config here else we would get a cyclic dependency. # Since args is a dict here, we use the 'run_typeless' utility PRM function. 
- algorithm_runner.run_typeless(inputs, output_file, args, container_settings) + algorithm_runner.run_typeless(inputs, output_file, args, container_settings, run_settings) def get_required_inputs(algorithm: str): diff --git a/spras/rwr.py b/spras/rwr.py index e85eef8a..38bc4534 100644 --- a/spras/rwr.py +++ b/spras/rwr.py @@ -5,6 +5,7 @@ from pydantic import BaseModel, ConfigDict from spras.config.container_schema import ProcessedContainerSettings +from spras.config.runs import RunSettings from spras.containers import prepare_volume, run_container_and_log from spras.dataset import Dataset from spras.interactome import ( @@ -52,8 +53,9 @@ def generate_inputs(data, filename_map): edges.to_csv(filename_map['network'],sep='|',index=False,columns=['Interactor1','Interactor2'],header=False) @staticmethod - def run(inputs, output_file, args, container_settings=None): + def run(inputs, output_file, args, container_settings=None, run_settings=None): if not container_settings: container_settings = ProcessedContainerSettings() + if not run_settings: run_settings = RunSettings() RWR.validate_required_run_args(inputs) with Path(inputs["network"]).open() as network_f: @@ -99,7 +101,8 @@ def run(inputs, output_file, args, container_settings=None): volumes, work_dir, out_dir, - container_settings) + container_settings, + run_settings.timeout) # Rename the primary output file to match the desired output filename output_edges = Path(out_dir, 'output.txt') diff --git a/spras/strwr.py b/spras/strwr.py index ea5bc274..b3fd86c3 100644 --- a/spras/strwr.py +++ b/spras/strwr.py @@ -4,6 +4,7 @@ from pydantic import BaseModel, ConfigDict from spras.config.container_schema import ProcessedContainerSettings +from spras.config.runs import RunSettings from spras.containers import prepare_volume, run_container_and_log from spras.dataset import Dataset from spras.interactome import ( @@ -52,8 +53,9 @@ def generate_inputs(data, filename_map): 
edges.to_csv(filename_map['network'],sep='|',index=False,columns=['Interactor1','Interactor2'],header=False) @staticmethod - def run(inputs, output_file, args, container_settings=None): + def run(inputs, output_file, args, container_settings=None, run_settings=None): if not container_settings: container_settings = ProcessedContainerSettings() + if not run_settings: run_settings = RunSettings() ST_RWR.validate_required_run_args(inputs) with Path(inputs["network"]).open() as network_f: @@ -104,7 +106,8 @@ def run(inputs, output_file, args, container_settings=None): volumes, work_dir, out_dir, - container_settings) + container_settings, + run_settings.timeout) # Rename the primary output file to match the desired output filename output_edges = Path(out_dir, 'output.txt') diff --git a/test/analysis/input/egfr.yaml b/test/analysis/input/egfr.yaml index c9ed5f73..46bbfe00 100644 --- a/test/analysis/input/egfr.yaml +++ b/test/analysis/input/egfr.yaml @@ -12,14 +12,16 @@ algorithms: include: true runs: run1: - k: [10, 20] + params: + k: [10, 20] - name: meo include: true runs: run1: - local_search: true - max_path_length: 3 - rand_restarts: 10 + params: + local_search: true + max_path_length: 3 + rand_restarts: 10 datasets: - data_dir: "input" edge_files: diff --git a/test/analysis/input/example.yaml b/test/analysis/input/example.yaml index 1a4514c0..e80097cf 100644 --- a/test/analysis/input/example.yaml +++ b/test/analysis/input/example.yaml @@ -12,22 +12,25 @@ algorithms: include: true runs: run1: - k: range(100,201,100) + params: + k: range(100,201,100) - name: "meo" include: true runs: run1: - max_path_length: [3] - local_search: ["Yes"] - rand_restarts: [10] + params: + max_path_length: [3] + local_search: ["Yes"] + rand_restarts: [10] - name: "mincostflow" include: true runs: run1: - flow: [1] # The flow must be an int - capacity: [1] + params: + flow: [1] # The flow must be an int + capacity: [1] - name: "allpairs" include: true diff --git 
a/test/generate-inputs/inputs/test_config.yaml b/test/generate-inputs/inputs/test_config.yaml index 33900bdb..a4aa1e81 100644 --- a/test/generate-inputs/inputs/test_config.yaml +++ b/test/generate-inputs/inputs/test_config.yaml @@ -11,40 +11,46 @@ algorithms: include: true runs: run1: - k: range(100,201,100) + params: + k: range(100,201,100) - name: "omicsintegrator1" include: true runs: run1: - b: [5, 6] - w: np.linspace(0,5,2) - d: 10 + params: + b: [5, 6] + w: np.linspace(0,5,2) + d: 10 - name: "omicsintegrator2" include: true runs: run1: - b: 4 - g: 0 + params: + b: 4 + g: 0 run2: - b: 2 - g: 3 + params: + b: 2 + g: 3 - name: "meo" include: true runs: run1: - max_path_length: 3 - local_search: true - rand_restarts: 10 + params: + max_path_length: 3 + local_search: true + rand_restarts: 10 - name: "mincostflow" include: true runs: run1: - flow: 1 # The flow must be an int - capacity: 1 + params: + flow: 1 # The flow must be an int + capacity: 1 - name: "allpairs" include: true @@ -53,8 +59,9 @@ algorithms: include: true runs: run1: - slice_threshold: 0.3 - module_threshold: 0.05 + params: + slice_threshold: 0.3 + module_threshold: 0.05 datasets: - # Labels can only contain letters, numbers, or underscores diff --git a/test/test_config.py b/test/test_config.py index 41551c38..12b4a65f 100644 --- a/test/test_config.py +++ b/test/test_config.py @@ -66,27 +66,27 @@ def get_test_config(): "name": "omicsintegrator2", "include": True, "runs": { - "strings": {"dummy_mode": ["terminals", "others"], "b": 3}, + "strings": {"params": {"dummy_mode": ["terminals", "others"], "b": 3}}, # spacing in np.linspace is on purpose - "singleton_string_np_linspace": {"dummy_mode": "terminals", "b": "np.linspace(0, 5,2,)"}, - "str_array_np_logspace": {"dummy_mode": ["others", "all"], "g": "np.logspace(1,1)"} + "singleton_string_np_linspace": {"params": {"dummy_mode": "terminals", "b": "np.linspace(0, 5,2,)"}}, + "str_array_np_logspace": {"params": {"dummy_mode": ["others", "all"], "g": 
"np.logspace(1,1)"}} } }, { "name": "meo", "include": True, "runs": { - "numbersAndBoolsDuplicate": {"max_path_length": 1, "rand_restarts": [float(2.0), 3], "local_search": [True, False]}, - "numbersAndBool": {"max_path_length": 2, "rand_restarts": [float(2.0), 3], "local_search": [True]}, - "numbersAndBools": {"max_path_length": 1, "rand_restarts": [float(2.0), 3], "local_search": [True, False]}, - "boolArrTest": {"local_search": [True, False], "max_path_length": "range(1, 3)"} + "numbersAndBoolsDuplicate": {"params": {"max_path_length": 1, "rand_restarts": [float(2.0), 3], "local_search": [True, False]}}, + "numbersAndBool": {"params": {"max_path_length": 2, "rand_restarts": [float(2.0), 3], "local_search": [True]}}, + "numbersAndBools": {"params": {"max_path_length": 1, "rand_restarts": [float(2.0), 3], "local_search": [True, False]}}, + "boolArrTest": {"params": {"local_search": [True, False], "max_path_length": "range(1, 3)"}} } }, { "name": "mincostflow", "include": True, "runs": { - "int64artifact": {"flow": "np.arange(5, 7)", "capacity": [2, 3]} + "int64artifact": {"params": {"flow": "np.arange(5, 7)", "capacity": [2, 3]}} } }, ], @@ -110,7 +110,7 @@ def get_test_config(): return test_raw_config -def value_test_util(alg: str, run_name: str, param_type: type[BaseModel], configurations: Iterable[BaseModel]): +def value_test_util[T: BaseModel](alg: str, run_name: str, param_type: type[T], configurations: Iterable[T]): """ Utility test function to be able to test against certain named runs under algorithms. This is, unfortunately, a very holistic function that depends