From 9a6a4bba93f7ce6f4d9db33a2f4e7b9975003ddd Mon Sep 17 00:00:00 2001 From: Alejandro Aristizabal Date: Wed, 11 Dec 2024 16:46:42 -0500 Subject: [PATCH 1/3] Move rano data prep to examples --- examples/RANO/data_preparator/.gitignore | 12 + examples/RANO/data_preparator/Dockerfile | 35 ++ .../data_preparator/mlcube/SanityCheck.py | 343 +++++++++++++++ examples/RANO/data_preparator/mlcube/clean.sh | 5 + .../RANO/data_preparator/mlcube/constants.py | 43 ++ .../RANO/data_preparator/mlcube/mlcube.yaml | 50 +++ examples/RANO/data_preparator/mlcube/tests.sh | 63 +++ .../RANO/data_preparator/mlcube/tests_sing.sh | 62 +++ .../mlcube/workspace/parameters.yaml | 21 + .../data_preparator/project/Dockerfile.dev | 17 + .../RANO/data_preparator/project/README.md | 12 + .../RANO/data_preparator/project/mlcube.py | 66 +++ .../RANO/data_preparator/project/prepare.py | 182 ++++++++ .../data_preparator/project/requirements.txt | 11 + .../data_preparator/project/sanity_check.py | 45 ++ .../project/stages/comparison.py | 168 +++++++ .../data_preparator/project/stages/confirm.py | 155 +++++++ .../project/stages/dset_stage.py | 32 ++ .../data_preparator/project/stages/extract.py | 179 ++++++++ .../project/stages/extract_nnunet.py | 187 ++++++++ .../project/stages/generate_report.py | 414 ++++++++++++++++++ .../data_preparator/project/stages/get_csv.py | 132 ++++++ .../data_preparator/project/stages/manual.py | 224 ++++++++++ .../project/stages/mlcube_constants.py | 19 + .../project/stages/nifti_transform.py | 166 +++++++ .../project/stages/pipeline.py | 313 +++++++++++++ .../project/stages/row_stage.py | 34 ++ .../data_preparator/project/stages/split.py | 101 +++++ .../data_preparator/project/stages/stage.py | 5 + .../data_preparator/project/stages/utils.py | 150 +++++++ .../data_preparator/project/statistics.py | 53 +++ 31 files changed, 3299 insertions(+) create mode 100644 examples/RANO/data_preparator/.gitignore create mode 100644 examples/RANO/data_preparator/Dockerfile create mode 
100644 examples/RANO/data_preparator/mlcube/SanityCheck.py create mode 100644 examples/RANO/data_preparator/mlcube/clean.sh create mode 100644 examples/RANO/data_preparator/mlcube/constants.py create mode 100644 examples/RANO/data_preparator/mlcube/mlcube.yaml create mode 100644 examples/RANO/data_preparator/mlcube/tests.sh create mode 100644 examples/RANO/data_preparator/mlcube/tests_sing.sh create mode 100644 examples/RANO/data_preparator/mlcube/workspace/parameters.yaml create mode 100644 examples/RANO/data_preparator/project/Dockerfile.dev create mode 100644 examples/RANO/data_preparator/project/README.md create mode 100644 examples/RANO/data_preparator/project/mlcube.py create mode 100644 examples/RANO/data_preparator/project/prepare.py create mode 100644 examples/RANO/data_preparator/project/requirements.txt create mode 100644 examples/RANO/data_preparator/project/sanity_check.py create mode 100644 examples/RANO/data_preparator/project/stages/comparison.py create mode 100644 examples/RANO/data_preparator/project/stages/confirm.py create mode 100644 examples/RANO/data_preparator/project/stages/dset_stage.py create mode 100644 examples/RANO/data_preparator/project/stages/extract.py create mode 100644 examples/RANO/data_preparator/project/stages/extract_nnunet.py create mode 100644 examples/RANO/data_preparator/project/stages/generate_report.py create mode 100644 examples/RANO/data_preparator/project/stages/get_csv.py create mode 100644 examples/RANO/data_preparator/project/stages/manual.py create mode 100644 examples/RANO/data_preparator/project/stages/mlcube_constants.py create mode 100644 examples/RANO/data_preparator/project/stages/nifti_transform.py create mode 100644 examples/RANO/data_preparator/project/stages/pipeline.py create mode 100644 examples/RANO/data_preparator/project/stages/row_stage.py create mode 100644 examples/RANO/data_preparator/project/stages/split.py create mode 100644 examples/RANO/data_preparator/project/stages/stage.py create mode 
100644 examples/RANO/data_preparator/project/stages/utils.py create mode 100644 examples/RANO/data_preparator/project/statistics.py diff --git a/examples/RANO/data_preparator/.gitignore b/examples/RANO/data_preparator/.gitignore new file mode 100644 index 000000000..b34df3efb --- /dev/null +++ b/examples/RANO/data_preparator/.gitignore @@ -0,0 +1,12 @@ +*.csv +*.txt +*.nii.gz +*.zip +*.mat +*.dcm +*.png +*/mlcube/workspace/* +!requirements.txt +!*/mlcube/workspace/parameters.yaml +models +tmpmodel \ No newline at end of file diff --git a/examples/RANO/data_preparator/Dockerfile b/examples/RANO/data_preparator/Dockerfile new file mode 100644 index 000000000..e75bd646e --- /dev/null +++ b/examples/RANO/data_preparator/Dockerfile @@ -0,0 +1,35 @@ +FROM locally-built-fetstool AS data_prep + +RUN find /Front-End/bin/install/appdir/usr/bin -type f \( -perm -u=x -o -type l \) -exec cp -P {} /usr/bin \; + +WORKDIR / + +COPY ./mlcubes/data_preparation/project/requirements.txt /project/requirements.txt + +RUN pip install --upgrade pip + +RUN pip install -r /project/requirements.txt + +ENV LANG C.UTF-8 + +RUN mkdir /project/stages + +RUN cp /Front-End/bin/install/appdir/usr/bin/*.py /project/stages/ + +RUN cp -R /Front-End/bin/install/appdir/usr/bin/data_prep_models /project/stages/data_prep_models + +# Hotfix: install more recent version of GaNDLF for metrics generation +RUN pip install git+https://github.com/mlcommons/GaNDLF@616b37bafad8f89d5c816a88f44fa30470601311 + +# setup a separate env for nnunet +RUN python -m venv /nnunet_env && /nnunet_env/bin/pip install --upgrade pip + +RUN /nnunet_env/bin/pip install --default-timeout=1000 torch torchvision torchaudio --extra-index-url https://download.pytorch.org/whl/cu118 + +RUN /nnunet_env/bin/pip install git+https://github.com/MIC-DKFZ/nnUNet.git@nnunetv1 + +ENV CUDA_VISIBLE_DEVICES="0" + +COPY ./mlcubes/data_preparation/project /project + +ENTRYPOINT ["python", "/project/mlcube.py"] diff --git 
a/examples/RANO/data_preparator/mlcube/SanityCheck.py b/examples/RANO/data_preparator/mlcube/SanityCheck.py new file mode 100644 index 000000000..7219bc60f --- /dev/null +++ b/examples/RANO/data_preparator/mlcube/SanityCheck.py @@ -0,0 +1,343 @@ +import os, argparse, sys, platform +from copy import deepcopy +from datetime import date +import SimpleITK as sitk +import numpy as np + + +def read_image_with_min_check(filename): + """ + this function fixes negatives by scaling + if min(input) < 0: + for all x in image: + if x != 0: + x -= min + """ + input_image = sitk.ReadImage(filename) + input_image_array = sitk.GetArrayFromImage(input_image) + min = np.min(input_image_array) + + # fixme: apply following logic + # check for connected components with less than 100 voxels + ## if less than the threshold, then apply above logic to the negative voxels + ## else, give error to user for manual QC + if min < 0: + print("Negative values in", filename) + + return input_image + + +def imageSanityCheck(targetImageFile, inputImageFile) -> bool: + """ + This function does sanity checking of 2 images + """ + targetImage = read_image_with_min_check(targetImageFile) + inputImage = read_image_with_min_check(inputImageFile) + + size = targetImage.GetSize() + size_expected = np.array([240, 240, 155]) + if not (np.array_equal(size, size_expected)): + print( + "Size for target image, '" + + targetImageFile + + "' is not in the BraTS format", + size_expected, + file=sys.stderr, + ) + return False + + if targetImage.GetDimension() != 3: + print( + "Dimension for target image, '" + targetImageFile + "' is not 3", + file=sys.stderr, + ) + return False + + if inputImage.GetDimension() != 3: + print( + "Dimension for input image, '" + inputImageFile + "' is not 3", + file=sys.stderr, + ) + return False + + commonMessage = ( + " mismatch for target image, '" + + targetImageFile + + "' and input image, '" + + inputImageFile + + "'" + ) + problemsIn = "" + returnTrue = True + + if 
targetImage.GetSize() != inputImage.GetSize(): + if not problemsIn: + problemsIn += "Size" + else: + problemsIn += ", Size" + returnTrue = False + + if targetImage.GetOrigin() != inputImage.GetOrigin(): + if not problemsIn: + problemsIn += "Origin" + else: + problemsIn += ", Origin" + returnTrue = False + + if targetImage.GetSpacing() != inputImage.GetSpacing(): + if not problemsIn: + problemsIn += "Spacing" + else: + problemsIn += ", Spacing" + returnTrue = False + + if returnTrue: + return True + else: + print(problemsIn + commonMessage, file=sys.stderr) + return False + + +def checkBraTSLabels( + subject_id, currentLabelFile, label_values_expected=np.array([0, 1, 2, 4]) +) -> str: + """ + This function checks for the expected labels and returns a string that will be provided as output for user + """ + returnString = "" + mask_array = sitk.GetArrayFromImage(sitk.ReadImage(currentLabelFile)) + # get unique elements and their counts + unique, counts = np.unique(mask_array, return_counts=True) + # this is for the case where the label contains numbers other than 0,1,2,4 + if not (np.array_equal(unique, label_values_expected)): + for j in range(0, len(unique)): # iterate over a range to get counts easier + if not (unique[j] in label_values_expected): + if ( + counts[j] > 1000 + ): # threshold for mis-labelling, anything less is ignored + returnString += ( + subject_id + + "," + + currentLabelFile + + "," + + str(unique[j]) + + "," + + str(counts[j]) + + "\n" + ) + + return returnString + + +def fixForLabelThree(currentLabelFile): + """ + This function checks for the label '3' and changes it to '4' and save it in the same location + """ + base_image = sitk.ReadImage(currentLabelFile) + mask_array = sitk.GetArrayFromImage(sitk.ReadImage(currentLabelFile)) + unique = np.sort(np.unique(mask_array)) + if unique[-1] == 3: + mask_array[mask_array == 3] = 4 + image_to_write = sitk.GetImageFromArray(mask_array) + image_to_write.CopyInformation(base_image) + 
sitk.WriteImage(image_to_write, currentLabelFile) + + +def main(): + copyrightMessage = ( + "Contact: admin@fets.ai\n\n" + + "This program is NOT FDA/CE approved and NOT intended for clinical use.\nCopyright (c) " + + str(date.today().year) + + " University of Pennsylvania. All rights reserved." + ) + parser = argparse.ArgumentParser( + prog="SanityCheck", + formatter_class=argparse.RawTextHelpFormatter, + description="This application performs rudimentary sanity checks the input data folder for FeTS training.\n\n" + + copyrightMessage, + ) + parser.add_argument( + "-inputDir", + type=str, + help="The input directory (DataForFeTS) that needs to be checked", + required=True, + ) + parser.add_argument( + "-outputFile", + type=str, + help="The CSV file of outputs, which is only generated if there are problematic cases", + required=True, + ) + + args = parser.parse_args() + inputDir = args.inputDir + + if not os.path.isdir(inputDir): + sys.exit("The specified inputDir is not present, please try again") + + errorMessage = "Subject_ID,Recommendation_for_initial_annotations\n" + numberOfProblematicCases = 0 + + # initialize modality dict + files_to_check = { + "T1": "_t1.nii.gz", + "T1CE": "_t1ce.nii.gz", + "T2": "_t2.nii.gz", + "FL": "_flair.nii.gz", + "MASK": "_final_seg.nii.gz", + } + + label_values_expected = np.array([0, 1, 2, 4]) # initialize label array + + for dirs in os.listdir(inputDir): + if (dirs != "logs") and ( + dirs != "split_info" + ): # don't perform sanity check for the 'logs' folder + currentSubjectDir = os.path.join(inputDir, dirs) + if os.path.isdir(currentSubjectDir): # for detected subject dir + filesInDir = os.listdir( + currentSubjectDir + ) # get all files in each directory + files_for_subject = {} + for i in range(len(filesInDir)): + for modality in files_to_check: # check all modalities + if filesInDir[i].endswith( + files_to_check[modality] + ): # if modality detected, populate subject dict + files_for_subject[modality] = os.path.abspath( + 
os.path.join(currentSubjectDir, filesInDir[i]) + ) + + currentSubjectsLabelIsAbsent = ( + False # check if current subject's final_seg is present or not + ) + all_modalities_present = True + if ( + len(files_for_subject) != 5 + ): # if all modalities are not present, add exit statement + if ( + (len(files_for_subject) == 4) and ("MASK" in files_for_subject) + ) or (len(files_for_subject) < 4): + numberOfProblematicCases += 1 + errorMessage += ( + dirs + ",All_required_modalities_are_not_present.\n" + ) + all_modalities_present = False + + if all_modalities_present and len(files_for_subject) > 0: + first, *rest = files_for_subject.items() # split the dict + for i in range(0, len(rest)): + if not ( + imageSanityCheck(first[1], rest[i][1]) + ): # image sanity check + numberOfProblematicCases += 1 + errorMessage += ( + dirs + + ",Image_dimension/size/origin/spacing_mismatch_between_" + + first[0] + + "_and_" + + rest[i][0] + + "\n" + ) + + currentSubjectsLabelIsProblematic = ( + False # check if current subject's label has issues + ) + if "MASK" in files_for_subject: + currentLabelFile = files_for_subject["MASK"] + fixForLabelThree(currentLabelFile) + returnString = checkBraTSLabels(dirs, currentLabelFile) + if ( + returnString + ): # if there is something present in the return string + numberOfProblematicCases += 1 + currentSubjectsLabelIsProblematic = True + errorMessage += returnString + else: + currentSubjectsLabelIsAbsent = True + + fusionToRecommend = "" + segmentationsForQCPresent = True + problematicSegmentationMessage = "" + if ( + currentSubjectsLabelIsProblematic + or currentSubjectsLabelIsAbsent + ): # if final_seg is absent or is problematic + segmentationsFolder = os.path.join( + currentSubjectDir, "SegmentationsForQC" + ) + if os.path.isdir(segmentationsFolder): + segmentationFiles = os.listdir( + segmentationsFolder + ) # get all files in each directory + for i in range(len(segmentationFiles)): + if ( + "fused" in segmentationFiles[i] + ): # only 
perform checks for fusion results + currentLabelFile = os.path.join( + segmentationsFolder, segmentationFiles[i] + ) + returnString = checkBraTSLabels( + dirs, currentLabelFile + ) + if ( + returnString + ): # if there is something present in the return string + problematicSegmentationMessage += returnString + else: + if not ( + "staple" in fusionToRecommend + ): # overwrite the fusion result to recommend if not staple that was fine + fusionToRecommend = currentLabelFile + + if not fusionToRecommend: + errorMessage += problematicSegmentationMessage + if not ( + "staple" in fusionToRecommend + ): # recommend nnunet or deepscan if not staple + if not ("itkvoting" in fusionToRecommend): + if not ("majorityvoting" in fusionToRecommend): + fusionToRecommend = "nnunet_or_deepscan" + else: + fusionToRecommend = "majorityvoting" + else: + fusionToRecommend = "itkvoting" + else: + fusionToRecommend = "staple" + + else: + errorMessage += ( + dirs + ",SegmentationsForQC_folder_is_absent\n" + ) + numberOfProblematicCases += 1 + segmentationsForQCPresent = False + # errorMessage += dirs + ',Label_file_absent,N.A.,N.A.\n' + + if currentSubjectsLabelIsAbsent and segmentationsForQCPresent: + numberOfProblematicCases += 1 + if fusionToRecommend: + errorMessage += dirs + "," + fusionToRecommend + "\n" + else: + errorMessage += ( + dirs + + ",final_seg_absent_and_use_either_nnunet_or_deepscan,N.A.,N.A.\n" + ) + + if numberOfProblematicCases > 0: + # print(errorMessage) + with open(args.outputFile, "a") as the_file: + the_file.write(errorMessage) + sys.exit( + "There were subjects with either missing annotations or where annotations had problematic labels. 
Please see the recommendation(s) for new initialization in the outputFile: '" + + args.outputFile + + "'" + ) + else: + print("Congratulations, all subjects are fine and ready to train!") + + +if __name__ == "__main__": + if platform.system().lower() == "darwin": + sys.exit("macOS is not supported") + else: + main() diff --git a/examples/RANO/data_preparator/mlcube/clean.sh b/examples/RANO/data_preparator/mlcube/clean.sh new file mode 100644 index 000000000..d0bbc3cc2 --- /dev/null +++ b/examples/RANO/data_preparator/mlcube/clean.sh @@ -0,0 +1,5 @@ +rm -rf workspace/data +rm -rf workspace/labels +rm -rf workspace/metadata +rm -rf workspace/report +rm -rf workspace/statistics diff --git a/examples/RANO/data_preparator/mlcube/constants.py b/examples/RANO/data_preparator/mlcube/constants.py new file mode 100644 index 000000000..2f3109d32 --- /dev/null +++ b/examples/RANO/data_preparator/mlcube/constants.py @@ -0,0 +1,43 @@ +# check against all these modality ID strings with extensions +MODALITY_ID_DICT = { + "T1": ["t1", "t1pre", "t1precontrast", "t1n"], + "T1GD": ["t1ce", "t1gd", "t1post", "t1postcontrast", "t1gallodinium", "t1c"], + "T2": ["t2", "t2w"], + "FLAIR": ["flair", "fl", "t2flair", "t2f"], +} +# this is used to keep a mapping between the fets1 nomenclature +MODALITY_ID_MAPPING = { + "T1": "t1n", + "T1GD": "t1c", + "T2": "t2w", + "FLAIR": "t2f", +} +MODALITIES_LIST = list(MODALITY_ID_DICT.keys()) +SUBJECT_NAMES = {"patientid", "subjectid", "subject", "subid"} +TIMEPOINT_NAMES = {"timepoint", "tp", "time", "series", "subseries"} +INPUT_FILENAMES = { + "T1": "T1_to_SRI.nii.gz", + "T1GD": "T1CE_to_SRI.nii.gz", + "T2": "T2_to_SRI.nii.gz", + "FLAIR": "FL_to_SRI.nii.gz", +} + +GANDLF_DF_COLUMNS = ["SubjectID", "Channel_0"] + +INTERIM_FOLDER = "DataForQC" +FINAL_FOLDER = "DataForFeTS" +TUMOR_MASK_FOLDER = "TumorMasksForQC" +TESTING_FOLDER = "testing" +REORIENTED_FOLDER = "reoriented" + +BRAIN_FILENAME = "gandlf_brain_extraction.csv" +TUMOR_FILENAME = 
"gandlf_tumor_segmentation.csv" +SUBJECTS_FILENAME = "processed_data.csv" +NEG_SUBJECTS_FILENAME = "QC_subjects_with_negative_intensities.csv" +FAIL_SUBJECTS_FILENAME = "QC_subjects_with_bratspipeline_error.csv" +DICOM_ANON_FILENAME = "dicom_tag_information_to_write_anon.yaml" +DICOM_COLLAB_FILENAME = "dicom_tag_information_to_write_collab.yaml" +STDOUT_FILENAME = "preparedataset_stdout.txt" +STDERR_FILENAME = "preparedataset_stderr.txt" + +EXEC_NAME = "BraTSPipeline" diff --git a/examples/RANO/data_preparator/mlcube/mlcube.yaml b/examples/RANO/data_preparator/mlcube/mlcube.yaml new file mode 100644 index 000000000..bed726942 --- /dev/null +++ b/examples/RANO/data_preparator/mlcube/mlcube.yaml @@ -0,0 +1,50 @@ +name: Data Preparator MLCube with Manual preparation steps +description: Data Preparator MLCube showcasing examples were automated and manual steps are required. Provided by MLCommons +authors: + - {name: MLCommons} + +platform: + accelerator_count: 0 + +docker: + # Image name + image: mlcommons/rano-data-prep-mlcube:1.0.10 + # Docker build context relative to $MLCUBE_ROOT. Default is `build`. + build_context: "../project" + # Docker file name within docker build context, default is `Dockerfile`. 
+ build_file: "Dockerfile" + +tasks: + prepare: + parameters: + inputs: { + data_path: input_data, + labels_path: input_labels, + parameters_file: parameters.yaml, + models: additional_files/models, + } + outputs: { + output_path: data/, + output_labels_path: labels/, + report_file: {type: file, default: report.yaml}, + metadata_path: metadata/, + } + sanity_check: + parameters: + inputs: { + data_path: data/, + labels_path: labels/, + parameters_file: parameters.yaml, + metadata_path: metadata/, + } + statistics: + parameters: + inputs: { + data_path: data/, + labels_path: labels/, + parameters_file: parameters.yaml, + metadata_path: metadata/, + } + outputs: { + output_path: {type: file, default: statistics.yaml} + } \ No newline at end of file diff --git a/examples/RANO/data_preparator/mlcube/tests.sh b/examples/RANO/data_preparator/mlcube/tests.sh new file mode 100644 index 000000000..8c7880805 --- /dev/null +++ b/examples/RANO/data_preparator/mlcube/tests.sh @@ -0,0 +1,63 @@ + +DATA=./workspace/data + +run() { +mlcube run --mlcube ./mlcube.yaml --task prepare --network=none --mount=ro \ + report_file=report/report.yaml \ + labels_path=input_data \ + -Pdocker.cpu_args="-u $(id -u):$(id -g)" \ + -Pdocker.gpu_args="-u $(id -u):$(id -g)" +} + +run_other() { +mlcube run --mlcube ./mlcube.yaml --task sanity_check --network=none --mount=ro \ + -Pdocker.cpu_args="-u $(id -u):$(id -g)" \ + -Pdocker.gpu_args="-u $(id -u):$(id -g)" + +mlcube run --mlcube ./mlcube.yaml --task statistics --network=none --mount=ro \ + output_path=statistics/statistics.yaml \ + -Pdocker.cpu_args="-u $(id -u):$(id -g)" \ + -Pdocker.gpu_args="-u $(id -u):$(id -g)" +} + +STARTTIME=$(date +%s.%N) + + +run + +# manual review +cp $DATA/tumor_extracted/DataForQC/AAAC_1/2008.03.31/TumorMasksForQC/AAAC_1_2008.03.31_tumorMask_model_0.nii.gz \ + $DATA/tumor_extracted/DataForQC/AAAC_1/2008.03.31/TumorMasksForQC/finalized/AAAC_1_2008.03.31_tumorMask_model_0.nii.gz + +cp 
$DATA/tumor_extracted/DataForQC/AAAC_1/2012.01.02/TumorMasksForQC/AAAC_1_2012.01.02_tumorMask_model_0.nii.gz \ + $DATA/tumor_extracted/DataForQC/AAAC_1/2012.01.02/TumorMasksForQC/finalized/AAAC_1_2012.01.02_tumorMask_model_0.nii.gz + +cp $DATA/tumor_extracted/DataForQC/AAAC_2/2001.01.01/TumorMasksForQC/AAAC_2_2001.01.01_tumorMask_model_0.nii.gz \ + $DATA/tumor_extracted/DataForQC/AAAC_2/2001.01.01/TumorMasksForQC/finalized/AAAC_2_2001.01.01_tumorMask_model_0.nii.gz +# end manual review + +run & +PID=$! + +# prompt response +BREAK=0 +while [ $BREAK -eq "0" ] +do +if [ -f $DATA/".prompt.txt" ]; + then BREAK=1; +else + sleep 0.1s; +fi + +done + +echo -n "y" >> $DATA/.response.txt +# end prompt response + +wait ${PID} + +ENDTIME=$(date +%s.%N) +DIFF=$(echo "$ENDTIME - $STARTTIME" | bc) +echo $DIFF + +run_other \ No newline at end of file diff --git a/examples/RANO/data_preparator/mlcube/tests_sing.sh b/examples/RANO/data_preparator/mlcube/tests_sing.sh new file mode 100644 index 000000000..df9762c34 --- /dev/null +++ b/examples/RANO/data_preparator/mlcube/tests_sing.sh @@ -0,0 +1,62 @@ + +DATA=./workspace/data + +run() { +mlcube run --mlcube ./mlcube.yaml --task prepare --network=none --mount=ro --platform=singularity \ + report_file=report/report.yaml \ + labels_path=input_data \ + -Psingularity.run_args="-nce" +} + +run_other() { +mlcube run --mlcube ./mlcube.yaml --task sanity_check --network=none --mount=ro --platform=singularity \ + -Psingularity.run_args="-nce" + + +mlcube run --mlcube ./mlcube.yaml --task statistics --network=none --mount=ro --platform=singularity \ + output_path=statistics/statistics.yaml \ + -Psingularity.run_args="-nce" + +} + +STARTTIME=$(date +%s.%N) + + +run + +# manual review +cp $DATA/tumor_extracted/DataForQC/AAAC_1/2008.03.31/TumorMasksForQC/AAAC_1_2008.03.31_tumorMask_model_0.nii.gz \ + $DATA/tumor_extracted/DataForQC/AAAC_1/2008.03.31/TumorMasksForQC/finalized/AAAC_1_2008.03.31_tumorMask_model_0.nii.gz + +cp 
$DATA/tumor_extracted/DataForQC/AAAC_1/2012.01.02/TumorMasksForQC/AAAC_1_2012.01.02_tumorMask_model_0.nii.gz \ + $DATA/tumor_extracted/DataForQC/AAAC_1/2012.01.02/TumorMasksForQC/finalized/AAAC_1_2012.01.02_tumorMask_model_0.nii.gz + +cp $DATA/tumor_extracted/DataForQC/AAAC_2/2001.01.01/TumorMasksForQC/AAAC_2_2001.01.01_tumorMask_model_0.nii.gz \ + $DATA/tumor_extracted/DataForQC/AAAC_2/2001.01.01/TumorMasksForQC/finalized/AAAC_2_2001.01.01_tumorMask_model_0.nii.gz +# end manual review + +run & +PID=$! + +# prompt response +BREAK=0 +while [ $BREAK -eq "0" ] +do +if [ -f $DATA/".prompt.txt" ]; + then BREAK=1; +else + sleep 0.1s; +fi + +done + +echo -n "y" >> $DATA/.response.txt +# end prompt response + +wait ${PID} + +ENDTIME=$(date +%s.%N) +DIFF=$(echo "$ENDTIME - $STARTTIME" | bc) +echo $DIFF + +run_other \ No newline at end of file diff --git a/examples/RANO/data_preparator/mlcube/workspace/parameters.yaml b/examples/RANO/data_preparator/mlcube/workspace/parameters.yaml new file mode 100644 index 000000000..7755bbbd7 --- /dev/null +++ b/examples/RANO/data_preparator/mlcube/workspace/parameters.yaml @@ -0,0 +1,21 @@ +seed: 2784 +train_percent: 0.8 +medperf_report_stages: +- "IDENTIFIED" +- "VALIDATED" +- "MISSING_MODALITIES" +- "EXTRA_MODALITIES" +- "VALIDATION_FAILED" +- "CONVERTED_TO_NIfTI" +- "NIfTI_CONVERSION_FAILED" +- "BRAIN_EXTRACT_FINISHED" +- "BRAIN_EXTRACT_FINISHED" +- "TUMOR_EXTRACT_FAILED" +- "MANUAL_REVIEW_COMPLETE" +- "MANUAL_REVIEW_REQUIRED" +- "MULTIPLE_ANNOTATIONS_ERROR" +- "COMPARISON_COMPLETE" +- "EXACT_MATCH_IDENTIFIED" +- "ANNOTATION_COMPARISON_FAILED" +- "ANNOTATION_CONFIRMED" +- "DONE" \ No newline at end of file diff --git a/examples/RANO/data_preparator/project/Dockerfile.dev b/examples/RANO/data_preparator/project/Dockerfile.dev new file mode 100644 index 000000000..4eb71cf2d --- /dev/null +++ b/examples/RANO/data_preparator/project/Dockerfile.dev @@ -0,0 +1,17 @@ +FROM hasan7/baselocal:0.0.0 + +COPY ./atlasImage_0.125.nii.gz /project 
+COPY ./tmpmodel /project + +# use a downsampled reference image for DICOM to NIFTI conversion +RUN mv /project/atlasImage_0.125.nii.gz /Front-End/bin/install/appdir/usr/data/sri24/atlasImage.nii.gz + +# remove heavy brain extraction models +RUN rm -rf /project/stages/data_prep_models/brain_extraction/model_0/ +RUN rm -rf /project/stages/data_prep_models/brain_extraction/model_1/ + +# use dummy brain extraction models +RUN cp -r /project/tmpmodel /project/stages/data_prep_models/brain_extraction/model_0 +RUN mv /project/tmpmodel /project/stages/data_prep_models/brain_extraction/model_1 + +ENTRYPOINT ["python", "/project/mlcube.py"] \ No newline at end of file diff --git a/examples/RANO/data_preparator/project/README.md b/examples/RANO/data_preparator/project/README.md new file mode 100644 index 000000000..46e8cd47d --- /dev/null +++ b/examples/RANO/data_preparator/project/README.md @@ -0,0 +1,12 @@ +# How to run tests + +1. Download and extract (sha256: 701fbba8b253fc5b2f54660837c493a38dec986df9bdbf3d97f07c8bc276a965): + + +2. Move `additional_files` and `input_data` to the mlcube workspace +3. Move `tmpmodel` and `atlasImage_0.125.nii.gz` to the mlcube project folder + +4. Build the base docker image from the repo's root folder Dockerfile +5. Build the dev docker image using `Dockerfile.dev` in the mlcube project folder. +6. Then change the docker image name in `mlcube.yaml` according to step 5. +7. 
Then go to `mlcube` folder and run the tests scripts diff --git a/examples/RANO/data_preparator/project/mlcube.py b/examples/RANO/data_preparator/project/mlcube.py new file mode 100644 index 000000000..20036ca89 --- /dev/null +++ b/examples/RANO/data_preparator/project/mlcube.py @@ -0,0 +1,66 @@ +"""MLCube handler file""" +import os +import typer +import subprocess +import shutil + +app = typer.Typer() + + +def exec_python(cmd: str, check_for_failure=True) -> None: + """Execute a python script as a subprocess + + Args: + cmd (str): command to run as would be written inside the terminal + """ + splitted_cmd = cmd.split() + process = subprocess.Popen(splitted_cmd, cwd=".") + process.wait() + if check_for_failure: + assert process.returncode == 0, f"command failed: {cmd}" + else: + if not process.returncode == 0: + exit(process.returncode) + + +@app.command("prepare") +def prepare( + data_path: str = typer.Option(..., "--data_path"), + labels_path: str = typer.Option(..., "--labels_path"), + parameters_file: str = typer.Option(..., "--parameters_file"), + models_path: str = typer.Option(..., "--models"), + output_path: str = typer.Option(..., "--output_path"), + output_labels_path: str = typer.Option(..., "--output_labels_path"), + report_file: str = typer.Option(..., "--report_file"), + metadata_path: str = typer.Option(..., "--metadata_path"), +): + cmd = f"python3 /project/prepare.py --data_path={data_path} --labels_path={labels_path} --models_path={models_path} --data_out={output_path} --labels_out={output_labels_path} --report={report_file} --parameters={parameters_file} --metadata_path={metadata_path}" + exec_python(cmd) + +@app.command("sanity_check") +def sanity_check( + data_path: str = typer.Option(..., "--data_path"), + labels_path: str = typer.Option(..., "--labels_path"), + parameters_file: str = typer.Option(..., "--parameters_file"), + metadata_path: str = typer.Option(..., "--metadata_path"), +): + # Modify the sanity_check command as needed + cmd = 
f"python3 /project/sanity_check.py --data_path={data_path} --labels_path={labels_path} --metadata={metadata_path}" + exec_python(cmd, check_for_failure=False) # Don't throw an error if it fails, to avoid traceback and confusion from users + + +@app.command("statistics") +def sanity_check( + data_path: str = typer.Option(..., "--data_path"), + labels_path: str = typer.Option(..., "--labels_path"), + parameters_file: str = typer.Option(..., "--parameters_file"), + metadata_path: str = typer.Option(..., "--metadata_path"), + out_path: str = typer.Option(..., "--output_path"), +): + # Modify the statistics command as needed + cmd = f"python3 /project/statistics.py --data_path={data_path} --labels_path={labels_path} --out_file={out_path} --metadata={metadata_path}" + exec_python(cmd) + + +if __name__ == "__main__": + app() diff --git a/examples/RANO/data_preparator/project/prepare.py b/examples/RANO/data_preparator/project/prepare.py new file mode 100644 index 000000000..e0eb3bf9d --- /dev/null +++ b/examples/RANO/data_preparator/project/prepare.py @@ -0,0 +1,182 @@ +import os +import shutil +import argparse +import pandas as pd +import yaml +from stages.generate_report import GenerateReport +from stages.get_csv import AddToCSV +from stages.nifti_transform import NIfTITransform +from stages.extract import Extract +from stages.extract_nnunet import ExtractNnUNet +from stages.manual import ManualStage +from stages.comparison import SegmentationComparisonStage +from stages.confirm import ConfirmStage +from stages.split import SplitStage +from stages.pipeline import Pipeline +from stages.mlcube_constants import * +from stages.constants import INTERIM_FOLDER, FINAL_FOLDER, TUMOR_MASK_FOLDER + +def find_csv_filenames(path_to_dir, suffix=".csv"): + filenames = os.listdir(path_to_dir) + return [filename for filename in filenames if filename.endswith(suffix)] + + +def setup_argparser(): + parser = argparse.ArgumentParser("Medperf Data Preparator Example") + parser.add_argument( 
def init_pipeline(args):
    """Build the full data-preparation Pipeline from the CLI arguments.

    Wires every stage (CSV indexing, NIfTI conversion, brain extraction,
    tumor extraction, manual review, segmentation comparison, confirmation
    and split) to a shared report CSV and a set of staging folders rooted at
    ``args.data_out``.

    Args:
        args: parsed namespace from ``setup_argparser``

    Returns:
        Pipeline: the configured pipeline, ready to ``run``
    """
    # RUN COLUMN-WISE PROCESSING
    # Staging locations for each intermediate stage, all under the output path
    out_raw = os.path.join(args.data_out, RAW_PATH)
    valid_data_out = os.path.join(args.data_out, VALID_PATH)
    nifti_data_out = os.path.join(args.data_out, PREP_PATH)
    brain_data_out = os.path.join(args.data_out, BRAIN_PATH)
    tumor_data_out = os.path.join(args.data_out, TUMOR_PATH)
    match_data_out = args.labels_out
    backup_out = os.path.join(args.labels_out, TUMOR_BACKUP_PATH)
    # Folders removed by the confirmation/split stages once data is finalized
    staging_folders = [
        out_raw,
        valid_data_out,
        nifti_data_out,
        brain_data_out,
        tumor_data_out,
        backup_out,
    ]
    out_data_csv = os.path.join(args.data_out, OUT_CSV)
    trash_folder = os.path.join(args.data_out, TRASH_PATH)
    invalid_subjects_file = os.path.join(args.metadata_path, INVALID_FILE)

    loop = None
    # Report generator: (re)builds the report from raw inputs, mapping each
    # staged output folder to the status code it represents
    # NOTE(review): the stray line continuation after MANUAL_STAGE_STATUS is
    # redundant inside parentheses
    report_gen = GenerateReport(
        out_data_csv,
        args.data,
        out_raw,
        args.labels,
        args.labels_out,
        args.data_out,
        DONE_STAGE_STATUS,
        brain_data_out,
        BRAIN_STAGE_STATUS,
        tumor_data_out,
        MANUAL_STAGE_STATUS\
    )
    csv_proc = AddToCSV(out_raw, out_data_csv, valid_data_out, out_raw)
    nifti_proc = NIfTITransform(out_data_csv, nifti_data_out, valid_data_out, args.metadata_path, args.data_out)
    # Brain extraction runs the Preparator's "extract_brain" function
    brain_extract_proc = Extract(
        out_data_csv,
        brain_data_out,
        INTERIM_FOLDER,
        nifti_data_out,
        INTERIM_FOLDER,
        # loop,
        "extract_brain",
        BRAIN_STAGE_STATUS,
    )
    tumor_extract_proc = ExtractNnUNet(
        out_data_csv,
        tumor_data_out,
        INTERIM_FOLDER,
        brain_data_out,
        INTERIM_FOLDER,
        TUMOR_STAGE_STATUS,
    )
    manual_proc = ManualStage(out_data_csv, tumor_data_out, tumor_data_out, backup_out)
    match_proc = SegmentationComparisonStage(
        out_data_csv,
        match_data_out,
        tumor_data_out,
        backup_out,
    )
    confirm_proc = ConfirmStage(
        out_data_csv,
        args.data_out,
        args.labels_out,
        tumor_data_out,
        backup_out,
        staging_folders,
    )
    split_proc = SplitStage(
        args.parameters, args.data_out, args.labels_out, staging_folders
    )
    # Stage order defines the preparation flow
    stages = [
        csv_proc,
        nifti_proc,
        brain_extract_proc,
        tumor_extract_proc,
        manual_proc,
        match_proc,
        confirm_proc,
        split_proc
    ]
    return Pipeline(report_gen, stages, staging_folders, [trash_folder], invalid_subjects_file)
def sanity_check(data_path: str, labels_path: str):
    """Runs a few checks to ensure data quality and integrity

    Terminates the process with exit code 1 when the contents of the given
    folders don't resemble a prepared dataset.

    Args:
        data_path (str): Path to data.
        labels_path (str): Path to labels.
    """
    # Here you must add all the checks you consider important regarding the
    # state of the data
    if not has_prepared_folder_structure(data_path, labels_path):
        print("The contents of the labels and data don't resemble a prepared dataset", flush=True)
        # Raise SystemExit rather than calling the site-injected exit()
        # helper, which is meant for interactive sessions and is not
        # guaranteed to exist (e.g. when running with -S)
        raise SystemExit(1)
os.path.join( + self.prev_stage_path, INTERIM_FOLDER, id, tp, TUMOR_MASK_FOLDER, "finalized" + ) + return path + + def __get_backup_path(self, index: Union[str, int]) -> str: + id, tp = get_id_tp(index) + path = os.path.join(self.backup_path, id, tp, TUMOR_MASK_FOLDER) + return path + + def __get_output_path(self, index: Union[str, int]) -> str: + id, tp = get_id_tp(index) + path = os.path.join(self.out_path, id, tp) + return path + + def __get_case_path(self, index: Union[str, int]) -> str: + path = self.__get_input_path(index) + case = os.listdir(path)[0] + + return os.path.join(path, case) + + def __report_gt_not_found( + self, index: Union[str, int], report: pd.DataFrame, reviewed_hash: str + ) -> pd.DataFrame: + case_path = self.__get_case_path(index) + data_path = report.loc[index, "data_path"] + report_data = { + "status": -self.status_code - 0.2, # -6.2 + "data_path": data_path, + "labels_path": case_path, + "segmentation_hash": reviewed_hash, + } + update_row_with_dict(report, report_data, index) + return report + + def __report_exact_match( + self, index: Union[str, int], report: pd.DataFrame, reviewed_hash: str + ) -> pd.DataFrame: + case_path = self.__get_case_path(index) + data_path = report.loc[index, "data_path"] + report_data = { + "status": -self.status_code - 0.1, # -6.1 + "data_path": data_path, + "labels_path": case_path, + "num_changed_voxels": 0, + "segmentation_hash": reviewed_hash, + } + update_row_with_dict(report, report_data, index) + return report + + def __report_success( + self, + index: Union[str, int], + report: pd.DataFrame, + num_changed_voxels: int, + reviewed_hash: str, + ) -> pd.DataFrame: + case_path = self.__get_case_path(index) + data_path = report.loc[index, "data_path"] + report_data = { + "status": -self.status_code, # -6 + "data_path": data_path, + "labels_path": case_path, + "num_changed_voxels": num_changed_voxels, + "segmentation_hash": reviewed_hash, + } + update_row_with_dict(report, report_data, index) + return 
    def could_run(self, index: Union[str, int], report: DataFrame) -> bool:
        """Decide whether the comparison stage applies to this case.

        Requires that the finalized-review folder exists and holds exactly
        one file, that the ground-truth backup folder exists, and that the
        reviewed file's hash differs from the one recorded in the report.

        Args:
            index (Union[str, int]): case index in the report
            report (DataFrame): current preparation report

        Returns:
            bool: True when the comparison should run for this case
        """
        print(f"Checking if {self.name} can run")
        # Ensure a single reviewed segmentation file exists
        path = self.__get_input_path(index)
        gt_path = self.__get_backup_path(index)

        is_valid = True
        path_exists = os.path.exists(path)
        gt_path_exists = os.path.exists(gt_path)
        contains_case = False
        reviewed_hash = None
        if path_exists:
            cases = os.listdir(path)
            num_cases = len(cases)
            if num_cases:
                # Hash the (single expected) reviewed file to detect edits
                reviewed_file = os.path.join(path, cases[0])
                reviewed_hash = md5_file(reviewed_file)
            contains_case = num_cases == 1

        # Skip when the reviewed file hasn't changed since the last run
        prev_hash = report.loc[index]["segmentation_hash"]
        hash_changed = prev_hash != reviewed_hash
        print(f"{path_exists=} and {contains_case=} and {gt_path_exists=} and {hash_changed=}")
        is_valid = path_exists and contains_case and gt_path_exists and hash_changed

        return is_valid
    def __init__(
        self,
        data_csv: str,
        out_data_path: str,
        out_labels_path: str,
        prev_stage_path: str,
        backup_path: str,
        staging_folders: List[str],
    ):
        """Final confirmation stage.

        Copies reviewed cases to their output locations once the user
        confirms, then removes the staging folders and the preparation CSV.

        Args:
            data_csv (str): path to the preparation CSV (removed on success)
            out_data_path (str): root folder for confirmed data
            out_labels_path (str): root folder for confirmed labels
            prev_stage_path (str): output folder of the previous stage
            backup_path (str): tumor segmentation backup folder
            staging_folders (List[str]): intermediate folders removed on success
        """
        self.data_csv = data_csv
        self.out_data_path = out_data_path
        self.out_labels_path = out_labels_path
        self.prev_stage_path = prev_stage_path
        self.backup_path = backup_path
        self.staging_folders = staging_folders
        # Files used to exchange a question/answer with an external reviewer
        # process (polled on disk by __confirm)
        self.prompt_file = ".prompt.txt"
        self.response_file = ".response.txt"
__get_output_label_path(self, index: Union[str, int]): + id, tp = get_id_tp(index) + path = os.path.join(self.out_labels_path, id, tp) + filename = f"{id}_{tp}_final_seg.nii.gz" + return path, filename + + def __confirm(self, exact_match_percent: float) -> bool: + exact_match_percent = round(exact_match_percent * 100, 2) + msg = ( + f"We've identified {exact_match_percent}% of cases have not been modified " + + "with respect to the baseline segmentation. Do you confirm this is intended? " + + "[Y]/n" + ) + + # user_input = input(msg).lower() + prompt_path = os.path.join(self.out_data_path, self.prompt_file) + response_path = os.path.join(self.out_data_path, self.response_file) + + with open(prompt_path, "w") as f: + f.write(msg) + + while not os.path.exists(response_path): + sleep(1) + + with open(response_path, "r") as f: + user_input = f.readline().strip() + + os.remove(prompt_path) + os.remove(response_path) + + return user_input == "y" or user_input == "" + + def __report_failure(self, report: DataFrame) -> DataFrame: + # For this stage, failure is done when the user doesn't confirm + # This means he probably wants to keep working on the data + # And needs to know which rows are exact matches. 
    def __process_row(self, row: pd.Series) -> pd.Series:
        """process a row by moving the required files
        to their respective locations, and removing any extra files

        Args:
            row (pd.Series): report row for one case; its name is the case index

        Returns:
            pd.Series: modified report row, pointing at the confirmed outputs
        """
        index = row.name
        input_data_path = self.__get_input_data_path(index)
        input_label_filepath = self.__get_input_label_path(index)
        output_data_path = self.__get_output_data_path(index)
        output_label_path, filename = self.__get_output_label_path(index)
        output_label_filepath = os.path.join(output_label_path, filename)

        # Replace any previous output for this case before copying fresh data
        shutil.rmtree(output_data_path, ignore_errors=True)
        shutil.copytree(input_data_path, output_data_path)
        os.makedirs(output_label_path, exist_ok=True)
        shutil.copy(input_label_filepath, output_label_filepath)

        row["status"] = self.status_code
        row["data_path"] = output_data_path
        row["labels_path"] = output_label_path
        return row
class DatasetStage(Stage, ABC):
    """Abstract stage that operates on the whole report at once (its methods
    take no per-case index, unlike row-level stages)."""

    @abstractmethod
    def could_run(self, report: pd.DataFrame) -> bool:
        """Establishes if this step could be executed

        Args:
            report (pd.DataFrame): Dataframe containing the current state of the preparation flow

        Returns:
            bool: whether this stage could be executed
        """

    @abstractmethod
    def execute(self, report: pd.DataFrame) -> Tuple[pd.DataFrame, bool]:
        """Executes the stage

        Args:
            report (pd.DataFrame): DataFrame containing the current state of the preparation flow

        Returns:
            pd.DataFrame: Updated report dataframe
            bool: Success status
        """
extra_labels_path=[], + ): + self.data_csv = data_csv + self.out_path = out_path + self.subpath = subpath + self.data_subpath = FINAL_FOLDER + self.prev_path = prev_stage_path + self.prev_subpath = prev_subpath + os.makedirs(self.out_path, exist_ok=True) + self.prep = Preparator(data_csv, out_path, "BraTSPipeline") + self.func_name = func_name + self.func = getattr(self.prep, func_name) + self.pbar = tqdm() + self.failed = False + self.exception = None + self.__status_code = status_code + self.extra_labels_path = extra_labels_path + + @property + def name(self) -> str: + return self.func_name.replace("_", " ").capitalize() + + @property + def status_code(self) -> str: + return self.__status_code + + def could_run(self, index: Union[str, int], report: pd.DataFrame) -> bool: + """Determine if case at given index needs to be converted to NIfTI + + Args: + index (Union[str, int]): Case index, as used by the report dataframe + report (pd.DataFrame): Report Dataframe for providing additional context + + Returns: + bool: Wether this stage could be executed for the given case + """ + print(f"Checking if {self.name} can run") + prev_paths = self.__get_paths(index, self.prev_path, self.prev_subpath) + is_valid = all([os.path.exists(path) for path in prev_paths]) + print(f"{is_valid=}") + return is_valid + + def execute( + self, index: Union[str, int], report: pd.DataFrame + ) -> Tuple[pd.DataFrame, bool]: + """Executes the NIfTI transformation stage on the given case + + Args: + index (Union[str, int]): case index, as used by the report + report (pd.DataFrame): DataFrame containing the current state of the preparation flow + + Returns: + pd.DataFrame: Updated report dataframe + """ + self.__prepare_exec() + self.__copy_case(index) + self._process_case(index) + report, success = self.__update_state(index, report) + self.prep.write() + + return report, success + + def __prepare_exec(self): + # Reset the file contents for errors + open(self.prep.stderr_log, "w").close() + + # 
    def _process_case(self, index: Union[str, int]):
        """Run this stage's preparation function (``self.func``) on one case.

        Looks up the case's row in the preparator's subjects dataframe; when
        the case is absent a placeholder row with empty modality paths is
        passed instead.

        Args:
            index (Union[str, int]): case index, as used by the report dataframe
        """
        id, tp = get_id_tp(index)
        df = self.prep.subjects_df
        row_search = df[(df["SubjectID"] == id) & (df["Timepoint"] == tp)]
        if len(row_search) > 0:
            row = row_search.iloc[0]
        else:
            # Most probably this case was semi-prepared. Mock a row
            row = pd.Series({"SubjectID": id, "Timepoint": tp, "T1": "", "T1GD": "", "T2": "", "FLAIR": ""})
        self.func(row, self.pbar)
    def __report_failure(
        self, index: Union[str, int], report: pd.DataFrame
    ) -> Tuple[pd.DataFrame, bool]:
        """Mark the case as failed, pointing its paths back at the previous
        stage's outputs so the case can be retried from there.

        Args:
            index (Union[str, int]): case index, as used by the report dataframe
            report (pd.DataFrame): current preparation report

        Returns:
            Tuple[pd.DataFrame, bool]: updated report and False (failure)
        """
        prev_data_path, prev_labels_path = self.__get_paths(
            index, self.prev_path, self.prev_subpath
        )
        # NOTE(review): self.traceback is not initialized in __init__ (only
        # self.exception is); presumably it is assigned externally when the
        # failure is captured -- confirm, otherwise this raises AttributeError
        msg = f"{str(self.exception)}: {self.traceback}"

        report_data = {
            "status": -self.status_code,  # negative status encodes failure
            "comment": msg,
            "data_path": prev_data_path,
            "labels_path": prev_labels_path,
        }
        update_row_with_dict(report, report_data, index)
        return report, False
+ generate_tumor_segmentation_fused_images, + save_screenshot, +) +from .utils import update_row_with_dict, get_id_tp, MockTqdm + +MODALITY_MAPPING = { + "t1c": "t1c", + "t1ce": "t1c", + "t1": "t1n", + "t1n": "t1n", + "t2": "t2w", + "t2w": "t2w", + "t2f": "t2f", + "flair": "t2f", +} + +MODALITY_VARIANTS = { + "t1c": "T1GD", + "t1ce": "T1GD", + "t1": "T1", + "t1n": "T1", + "t2": "T2", + "t2w": "T2", + "t2f": "FLAIR", + "flair": "FLAIR", +} + + +class ExtractNnUNet(Extract): + def __init__( + self, + data_csv: str, + out_path: str, + subpath: str, + prev_stage_path: str, + prev_subpath: str, + status_code: int, + extra_labels_path=[], + nnunet_executable: str = "/nnunet_env/bin/nnUNet_predict" + ): + self.data_csv = data_csv + self.out_path = out_path + self.subpath = subpath + self.data_subpath = FINAL_FOLDER + self.prev_path = prev_stage_path + self.prev_subpath = prev_subpath + os.makedirs(self.out_path, exist_ok=True) + self.prep = Preparator(data_csv, out_path, "BraTSPipeline") + self.pbar = tqdm() + self.failed = False + self.exception = None + self.__status_code = status_code + self.extra_labels_path = extra_labels_path + self.nnunet_executable = nnunet_executable + + @property + def name(self) -> str: + return "nnUNet Tumor Extraction" + + @property + def status_code(self) -> str: + return self.__status_code + + def __get_models(self): + models_path = os.path.join(os.environ["RESULTS_FOLDER"], "nnUNet", "3d_fullres") + return os.listdir(models_path) + + def __get_mod_order(self, model): + order_path = os.path.join(os.environ["RESULTS_FOLDER"], os.pardir, "nnUNet_modality_order", model, "order") + with open(order_path, "r") as f: + order_str = f.readline() + # remove 'order = ' from the splitted list + modalities = order_str.split()[2:] + modalities = [MODALITY_MAPPING[mod] for mod in modalities] + return modalities + + def __prepare_case(self, path, id, tp, order): + tmp_subject = f"{id}-{tp}" + tmp_path = os.path.join(path, "tmp-data") + tmp_subject_path = 
os.path.join(tmp_path, tmp_subject) + tmp_out_path = os.path.join(path, "tmp-out") + shutil.rmtree(tmp_path, ignore_errors=True) + shutil.rmtree(tmp_out_path, ignore_errors=True) + os.makedirs(tmp_subject_path) + os.makedirs(tmp_out_path) + in_modalities_path = os.path.join(path, "DataForFeTS", id, tp) + input_modalities = {} + for modality_file in os.listdir(in_modalities_path): + if not modality_file.endswith(".nii.gz"): + continue + modality = modality_file[:-7].split("_")[-1] + norm_mod = MODALITY_MAPPING[modality] + mod_idx = order.index(norm_mod) + mod_idx = str(mod_idx).zfill(4) + + out_modality_file = f"{tmp_subject}_{mod_idx}.nii.gz" + in_file = os.path.join(in_modalities_path, modality_file) + out_file = os.path.join(tmp_subject_path, out_modality_file) + input_modalities[MODALITY_VARIANTS[modality]] = in_file + shutil.copyfile(in_file, out_file) + + return tmp_subject_path, tmp_out_path, input_modalities + + def __run_model(self, model, data_path, out_path): + # models are named Task_..., where is always 3 numbers + task_id = model[4:7] + cmd = f"{self.nnunet_executable} -i {data_path} -o {out_path} -t {task_id}" + print(cmd) + print(os.listdir(data_path)) + start = time.time() + subprocess.call(cmd, shell=True) + end = time.time() + total_time = end - start + print(f"Total time elapsed is {total_time} seconds") + + def __finalize_pred(self, tmp_out_path, out_pred_filepath): + # We assume there's only one file in out_path + pred = None + for file in os.listdir(tmp_out_path): + if file.endswith(".nii.gz"): + pred = file + + if pred is None: + raise RuntimeError("No tumor segmentation was found") + + pred_filepath = os.path.join(tmp_out_path, pred) + shutil.move(pred_filepath, out_pred_filepath) + return out_pred_filepath + + def _process_case(self, index: Union[str, int]): + id, tp = get_id_tp(index) + subject_id = f"{id}_{tp}" + models = self.__get_models() + outputs = [] + images_for_fusion = [] + out_path = os.path.join(self.out_path, "DataForQC", id, 
tp) + out_pred_path = os.path.join(out_path, "TumorMasksForQC") + os.makedirs(out_pred_path, exist_ok=True) + for i, model in enumerate(models): + order = self.__get_mod_order(model) + tmp_data_path, tmp_out_path, input_modalities = self.__prepare_case( + self.out_path, id, tp, order + ) + out_pred_filepath = os.path.join( + out_pred_path, f"{id}_{tp}_tumorMask_model_{i}.nii.gz" + ) + self.__run_model(model, tmp_data_path, tmp_out_path) + output = self.__finalize_pred(tmp_out_path, out_pred_filepath) + outputs.append(output) + images_for_fusion.append(sitk.ReadImage(output, sitk.sitkUInt8)) + + # cleanup + shutil.rmtree(tmp_data_path, ignore_errors=True) + shutil.rmtree(tmp_out_path, ignore_errors=True) + + fused_outputs = generate_tumor_segmentation_fused_images( + images_for_fusion, out_pred_path, subject_id + ) + outputs += fused_outputs + + for output in outputs: + # save the screenshot + tumor_mask_id = os.path.basename(output).replace(".nii.gz", "") + save_screenshot( + input_modalities, + os.path.join( + out_path, + f"{tumor_mask_id}_summary.png", + ), + output, + ) diff --git a/examples/RANO/data_preparator/project/stages/generate_report.py b/examples/RANO/data_preparator/project/stages/generate_report.py new file mode 100644 index 000000000..443179351 --- /dev/null +++ b/examples/RANO/data_preparator/project/stages/generate_report.py @@ -0,0 +1,414 @@ +from .dset_stage import DatasetStage +import pandas as pd +import numpy as np +import os +import re +import shutil +from typing import Tuple +from .utils import has_prepared_folder_structure, md5_dir, md5_file +from .constants import INTERIM_FOLDER, FINAL_FOLDER, TUMOR_MASK_FOLDER +from .mlcube_constants import REPORT_STAGE_STATUS + +DICOM_MODALITIES_PREFIX = {"fl": "t2_Flair", "t1": "t1_axial-3", "t1c": "t1_axial_stealth", "t2": "T2_SAG"} +NIFTI_MODALITIES = ["t1c", "t1n", "t2f", "t2w"] +BRAIN_SCAN_NAME = "brain_(.*)" +TUMOR_SEG_NAME = "final_seg" +CSV_HEADERS = ["SubjectID", "Timepoint", "T1", "T1GD", 
def has_alternative_folder_structure(subject_tp_path, og_path):
    """Detect a flat DICOM layout identified by known filename prefixes.

    Recursively searches subject_tp_path for a folder whose .dcm files cover
    every prefix in DICOM_MODALITIES_PREFIX.

    Args:
        subject_tp_path (str): folder to inspect
        og_path (str): original path, returned when nothing is found

    Returns:
        Tuple[bool, str]: (found, path of the folder holding the structure,
        or og_path when not found)
    """
    contents = os.listdir(subject_tp_path)
    prefixes_presence = {prefix: False for prefix in DICOM_MODALITIES_PREFIX.values()}
    for content in contents:
        content_path = os.path.join(subject_tp_path, content)
        # Search recursively across folders. Keep scanning sibling entries
        # when a subtree doesn't contain the structure (previously the first
        # subfolder's result was returned unconditionally, so a structure in
        # a later sibling was missed)
        if os.path.isdir(content_path):
            found, found_path = has_alternative_folder_structure(content_path, og_path)
            if found:
                return True, found_path
            continue

        # Check if the file is a dicom file with an expected prefix
        if not content.endswith(".dcm"):
            continue

        for prefix in DICOM_MODALITIES_PREFIX.values():
            if content.startswith(prefix):
                prefixes_presence[prefix] = True

    # If all prefixes are found within the current path, then it has the folder structure
    if all(prefixes_presence.values()):
        return True, subject_tp_path

    # Structure not identified at this tree
    return False, og_path
def get_timepoints(subject, subject_tp_path):
    """List the timepoints found for a subject in a semi-prepared folder.

    A timepoint is any subdirectory, or the middle component of a file named
    <subject>_<timepoint>_<brain scan|final seg>.nii.gz.

    Args:
        subject (str): subject identifier
        subject_tp_path (str): folder containing the subject's files

    Returns:
        list: unique timepoint identifiers
    """
    contents = os.listdir(subject_tp_path)
    timepoints = set()
    # Compile once outside the loop; raw f-string keeps the \. escapes literal
    pattern = re.compile(
        rf"{subject}_(.*)_(?:{BRAIN_SCAN_NAME}|{TUMOR_SEG_NAME})\.nii\.gz"
    )
    for content in contents:
        # Fix: the path must be built from the entry name (`content`), not
        # the subject id -- the old code joined `subject`, so directories were
        # never detected and `search(...).group(1)` crashed on them
        content_path = os.path.join(subject_tp_path, content)
        if os.path.isdir(content_path):
            # Assume any directory at this point represents a timepoint
            timepoints.add(content)
            continue

        result = pattern.search(content)
        if result is None:
            # Skip files that are neither brain scans nor segmentations
            continue
        timepoints.add(result.group(1))

    return list(timepoints)
def move_tumor_segmentation(
    subject, timepoint, seg_file, in_subject_path, out_data_path, out_labels_path
):
    """Copy a tumor segmentation into the review folders and a hidden backup.

    The segmentation is copied to the tumor-mask root, ``under_review`` and
    ``finalized`` folders (so a pre-existing segmentation starts out already
    "reviewed"), plus a read-only style backup under the labels output.

    Args:
        subject: subject ID
        timepoint: timepoint name
        seg_file: segmentation filename
        in_subject_path: folder containing ``seg_file``
        out_data_path: output data root
        out_labels_path: output labels root (receives the hidden backup)

    Returns:
        Tuple[str, str]: (input segmentation path, finalized segmentation path)
    """
    interim_path = os.path.join(out_data_path, INTERIM_FOLDER, subject, timepoint)
    os.makedirs(interim_path, exist_ok=True)

    in_seg_path = os.path.join(in_subject_path, seg_file)
    tumor_mask_path = os.path.join(interim_path, TUMOR_MASK_FOLDER)
    under_review_path = os.path.join(tumor_mask_path, "under_review")
    finalized_path = os.path.join(tumor_mask_path, "finalized")
    os.makedirs(under_review_path, exist_ok=True)
    os.makedirs(finalized_path, exist_ok=True)

    seg_root_path = os.path.join(tumor_mask_path, seg_file)
    seg_under_review_path = os.path.join(under_review_path, seg_file)
    seg_finalized_path = os.path.join(finalized_path, seg_file)
    shutil.copyfile(in_seg_path, seg_root_path)
    shutil.copyfile(in_seg_path, seg_under_review_path)
    shutil.copyfile(in_seg_path, seg_finalized_path)

    # Place the segmentation in the backup folder
    backup_path = os.path.join(out_labels_path, ".tumor_segmentation_backup")
    subject_tp_backup_path = os.path.join(
        backup_path, subject, timepoint, TUMOR_MASK_FOLDER
    )
    os.makedirs(subject_tp_backup_path, exist_ok=True)
    seg_backup_path = os.path.join(subject_tp_backup_path, seg_file)
    shutil.copyfile(in_seg_path, seg_backup_path)

    return in_seg_path, seg_finalized_path


def write_partial_csv(csv_path, subject, timepoint):
    """Register a semi-prepared case in the data CSV.

    Used when cases are semi-prepared, in which case they skip the formal
    csv creation. Appends a mostly-empty row for (subject, timepoint) if one
    is not already present.

    Args:
        csv_path: path of the CSV to create or update
        subject: subject ID
        timepoint: timepoint name
    """
    if os.path.exists(csv_path):
        df = pd.read_csv(csv_path)
    else:
        df = pd.DataFrame(columns=CSV_HEADERS)

    # dtype=object avoids the implicit float64 series (and its warning);
    # values are blanked below either way
    row = pd.Series(index=CSV_HEADERS, dtype=object)
    row["SubjectID"] = subject
    row["Timepoint"] = timepoint
    row.name = get_index(subject, timepoint)
    row = row.fillna("")

    # Only append when this (subject, timepoint) is not already registered
    row_search = df[(df["SubjectID"] == subject) & (df["Timepoint"] == timepoint)]
    if len(row_search) == 0:
        # FIX: DataFrame.append was deprecated in pandas 1.4 and removed in
        # 2.0; pd.concat with a one-row frame is the equivalent operation
        df = pd.concat([df, row.to_frame().T])

    df.to_csv(csv_path, index=False)
class GenerateReport(DatasetStage):
    """Initial dataset stage: discovers input cases and builds/updates the report.

    Detects three input layouts — fully prepared, semi-prepared (NIfTI brain
    scans, optionally with a tumor segmentation) and raw DICOM — and routes
    each case to the appropriate downstream status, copying its data into the
    corresponding output folder.
    """

    def __init__(
        self,
        data_csv: str,
        input_path: str,
        output_path: str,
        input_labels_path: str,
        output_labels_path: str,
        done_data_out_path: str,
        done_status: int,
        brain_data_out_path: str,
        brain_status: int,
        tumor_data_out_path: str,
        reviewed_status: int,
    ):
        self.data_csv = data_csv
        self.input_path = input_path
        self.output_path = output_path
        self.input_labels_path = input_labels_path
        self.output_labels_path = output_labels_path
        self.done_data_out_path = done_data_out_path
        self.done_status_code = done_status
        self.brain_data_out_path = brain_data_out_path
        self.brain_status = brain_status
        self.tumor_data_out_path = tumor_data_out_path
        self.reviewed_status = reviewed_status

    @property
    def name(self) -> str:
        return "Generate Report"

    @property
    def status_code(self) -> int:
        return REPORT_STAGE_STATUS

    @staticmethod
    def _append_row(report: pd.DataFrame, row: pd.Series) -> pd.DataFrame:
        """Append a named Series as a row (pandas>=2 compatible replacement
        for the removed DataFrame.append)."""
        return pd.concat([report, row.to_frame().T])

    def _proceed_to_comparison(self, subject, timepoint, in_subject_path, report):
        """Route a semi-prepared case WITH a tumor segmentation to review."""
        index = get_index(subject, timepoint)
        final_path = os.path.join(
            self.tumor_data_out_path, FINAL_FOLDER, subject, timepoint
        )
        input_hash = md5_dir(in_subject_path)
        # Stop if the subject was already present and no input change has happened
        if index in report.index:
            if input_hash == report.loc[index]["input_hash"]:
                return report

        # Move brain scans to its expected location
        move_brain_scans(subject, timepoint, in_subject_path, self.tumor_data_out_path)

        # Move tumor segmentation to its expected location
        seg_file = f"{subject}_{timepoint}_{TUMOR_SEG_NAME}.nii.gz"
        _, seg_finalized_path = move_tumor_segmentation(
            subject,
            timepoint,
            seg_file,
            in_subject_path,
            self.tumor_data_out_path,
            self.output_labels_path,
        )

        # Update the report
        data = {
            "status": self.reviewed_status,
            "data_path": final_path,
            "labels_path": seg_finalized_path,
            "num_changed_voxels": np.nan,
            "brain_mask_hash": "",
            "segmentation_hash": "",
            "input_hash": input_hash,
        }

        subject_series = pd.Series(data)
        subject_series.name = index
        report = self._append_row(report, subject_series)

        write_partial_csv(self.data_csv, subject, timepoint)

        return report

    def _proceed_to_tumor_extraction(self, subject, timepoint, in_subject_path, report):
        """Route a semi-prepared case WITHOUT a segmentation to tumor extraction."""
        index = get_index(subject, timepoint)
        input_hash = md5_dir(in_subject_path)
        # Stop if the subject was already present and no input change has happened
        if index in report.index:
            if input_hash == report.loc[index]["input_hash"]:
                return report
        final_path = os.path.join(
            self.brain_data_out_path, FINAL_FOLDER, subject, timepoint
        )
        labels_path = os.path.join(
            self.brain_data_out_path, INTERIM_FOLDER, subject, timepoint
        )
        os.makedirs(final_path, exist_ok=True)
        os.makedirs(labels_path, exist_ok=True)

        # Move brain scans to its expected location
        move_brain_scans(subject, timepoint, in_subject_path, self.brain_data_out_path)

        # Update the report
        data = {
            "status": self.brain_status,
            "data_path": final_path,
            "labels_path": labels_path,
            "num_changed_voxels": np.nan,
            "brain_mask_hash": "",
            "segmentation_hash": "",
            "input_hash": input_hash,
        }

        subject_series = pd.Series(data)
        subject_series.name = index
        report = self._append_row(report, subject_series)

        write_partial_csv(self.data_csv, subject, timepoint)

        return report

    def could_run(self, report: pd.DataFrame):
        # The report stage is always applicable
        return True

    def execute(self, report: pd.DataFrame) -> Tuple[pd.DataFrame, bool]:
        """Scan the input tree, (re)initialize the report and stage new cases.

        Args:
            report: current report, or None on a fresh run

        Returns:
            Tuple[pd.DataFrame, bool]: updated report and success flag
        """
        # Rewrite the report
        cols = [
            "status",
            "status_name",
            "comment",
            "data_path",
            "labels_path",
            "input_hash",
        ]
        print("Initializing report")
        if report is None:
            print("No previous report was identified. Creating a new one")
            report = pd.DataFrame(columns=cols)

        input_is_prepared = has_prepared_folder_structure(
            self.input_path, self.input_labels_path
        )
        if input_is_prepared:
            # If prepared, store data directly in the data folder
            print("Input data looks prepared already. Skipping preprocessing")
            self.output_path = self.done_data_out_path

        observed_cases = set()

        for subject in os.listdir(self.input_path):
            in_subject_path = os.path.join(self.input_path, subject)
            out_subject_path = os.path.join(self.output_path, subject)
            in_labels_subject_path = os.path.join(self.input_labels_path, subject)
            out_labels_subject_path = os.path.join(self.output_labels_path, subject)

            if not os.path.isdir(in_subject_path):
                continue

            # Semi-prepared data directly at the subject level (flat layout)
            has_semiprepared, _ = has_semiprepared_folder_structure(
                in_subject_path, in_subject_path, recursive=False
            )
            if has_semiprepared:
                timepoints = get_timepoints(subject, in_subject_path)
                for timepoint in timepoints:
                    index = get_index(subject, timepoint)
                    tumor_seg = get_tumor_segmentation(
                        subject, timepoint, in_subject_path
                    )
                    if tumor_seg is not None:
                        report = self._proceed_to_comparison(
                            subject, timepoint, in_subject_path, report
                        )
                    else:
                        report = self._proceed_to_tumor_extraction(
                            subject, timepoint, in_subject_path, report
                        )
                    observed_cases.add(index)
                continue

            for timepoint in os.listdir(in_subject_path):
                in_tp_path = os.path.join(in_subject_path, timepoint)
                out_tp_path = os.path.join(out_subject_path, timepoint)
                in_labels_tp_path = os.path.join(in_labels_subject_path, timepoint)
                out_labels_tp_path = os.path.join(out_labels_subject_path, timepoint)

                if not os.path.isdir(in_tp_path):
                    continue

                input_hash = md5_dir(in_tp_path)

                index = get_index(subject, timepoint)

                # Keep track of the cases that were found on the input folder
                observed_cases.add(index)

                # Semi-prepared data at the timepoint level (possibly nested)
                has_semiprepared, in_tp_path = has_semiprepared_folder_structure(
                    in_tp_path, in_tp_path, recursive=True
                )
                if has_semiprepared:
                    tumor_seg = get_tumor_segmentation(subject, timepoint, in_tp_path)
                    if tumor_seg is not None:
                        report = self._proceed_to_comparison(
                            subject, timepoint, in_tp_path, report
                        )
                    else:
                        report = self._proceed_to_tumor_extraction(
                            subject, timepoint, in_tp_path, report
                        )
                    continue

                if index in report.index:
                    # Case has already been identified, see if input hash is different
                    # or if status is corrupted
                    # if so, override the contents and restart the state for that case
                    case = report.loc[index]
                    has_not_changed = case["input_hash"] == input_hash
                    has_a_valid_status = not np.isnan(case["status"])
                    if has_not_changed and has_a_valid_status:
                        continue

                    print(
                        f"Case {index} has either changed ({not has_not_changed}) or has a corrupted status ({not has_a_valid_status}). Starting from scratch"
                    )

                    shutil.rmtree(out_tp_path, ignore_errors=True)
                    shutil.copytree(in_tp_path, out_tp_path)
                    report = report.drop(index)
                else:
                    # New case not identified by the report. Add it
                    print(f"New case identified: {index}. Adding to report")
                    shutil.rmtree(out_tp_path, ignore_errors=True)
                    shutil.copytree(in_tp_path, out_tp_path)

                data = {
                    "status": self.status_code,
                    "data_path": out_tp_path,
                    "labels_path": "",
                    "num_changed_voxels": np.nan,
                    "brain_mask_hash": "",
                    "segmentation_hash": "",
                    "input_hash": input_hash,
                }

                has_alternative, contents_path = has_alternative_folder_structure(
                    out_tp_path, out_tp_path
                )
                if has_alternative:
                    # Move files around so it has the expected structure
                    to_expected_folder_structure(out_tp_path, contents_path)

                if input_is_prepared:
                    # BUGFIX: this previously assigned to data["status_code"],
                    # a key the report schema does not use, so prepared cases
                    # kept the initial status and were reprocessed
                    data["status"] = self.done_status_code
                    shutil.rmtree(out_labels_tp_path, ignore_errors=True)
                    shutil.copytree(in_labels_tp_path, out_labels_tp_path)

                subject_series = pd.Series(data)
                subject_series.name = index
                report = self._append_row(report, subject_series)

        reported_cases = set(report.index)
        removed_cases = reported_cases - observed_cases

        # Stop reporting removed cases
        for case_index in removed_cases:
            report = report.drop(case_index)

        report = report.sort_index()
        return report, True
class AddToCSV(RowStage):
    """Initial validation stage: registers a case in the processing CSV.

    Copies the timepoint into this stage's output folder, runs the external
    ``CSVCreator`` validation on it, and records missing/extra modalities in
    the report with fractional error codes.
    """

    def __init__(
        self, input_dir: str, output_csv: str, out_dir: str, prev_stage_path: str
    ):
        self.input_dir = input_dir
        self.output_csv = output_csv
        self.out_dir = out_dir
        self.prev_stage_path = prev_stage_path
        os.makedirs(self.out_dir, exist_ok=True)
        self.csv_processor = CSVCreator(self.input_dir, self.output_csv)
        if os.path.exists(self.output_csv):
            # Use the updated version of the CSV
            self.contents = pd.read_csv(self.output_csv)
            self.csv_processor.output_df_for_csv = self.contents
        else:
            # Use the default, empty version
            self.contents = self.csv_processor.output_df_for_csv

    @property
    def name(self) -> str:
        return "Initial Validation"

    @property
    def status_code(self) -> int:
        return CSV_STAGE_STATUS

    def could_run(self, index: Union[str, int], report: pd.DataFrame) -> bool:
        """Determines if getting a new CSV is necessary.
        This is done by checking the existence of the expected file.

        Args:
            index (Union[str, int]): case index in the report
            report (pd.DataFrame): Dataframe containing the current state of the preparation flow

        Returns:
            bool: whether this stage could be executed
        """
        print(f"Checking if {self.name} can run")
        id, tp = get_id_tp(index)
        prev_case_path = os.path.join(self.prev_stage_path, id, tp)
        is_valid = os.path.exists(prev_case_path)
        print(f"{is_valid=}")
        return is_valid

    def execute(
        self, index: Union[str, int], report: pd.DataFrame
    ) -> Tuple[pd.DataFrame, bool]:
        """Adds valid cases to the data csv that is used for later processing.
        Invalid cases are flagged in the report with fractional status codes:
        -N-0.1 missing modalities, -N-0.2 extra modalities, -N-0.3 processing
        exception (N = this stage's status code).

        Args:
            index (Union[str, int]): case index in the report
            report (pd.DataFrame): DataFrame containing the current state of the preparation flow

        Returns:
            Tuple[pd.DataFrame, bool]: updated report dataframe and success flag
        """
        id, tp = get_id_tp(index)
        subject_path = os.path.join(self.input_dir, id)
        tp_path = os.path.join(subject_path, tp)
        subject_out_path = os.path.join(self.out_dir, id)
        tp_out_path = os.path.join(subject_out_path, tp)
        # We will first copy the timepoint to the out folder
        # This is so, if successful, the csv will point to the data
        # in the next stage, instead of the previous
        shutil.rmtree(tp_out_path, ignore_errors=True)
        shutil.copytree(tp_path, tp_out_path)

        try:
            self.csv_processor.process_timepoint(tp, id, subject_out_path)
            report_data = {
                "status": self.status_code,
                "data_path": tp_out_path,
                "labels_path": "",
            }
        except Exception as e:
            # Broad catch is deliberate: any validation failure is recorded
            # in the report instead of aborting the whole preparation run
            report_data = {
                "status": -self.status_code - 0.3,
                "comment": str(e),
                "data_path": tp_path,
                "labels_path": "",
            }
            update_row_with_dict(report, report_data, index)
            return report, False

        # NOTE(review): assumes CSVCreator exposes these as iterables of
        # (subject_identifier, message) pairs — confirm against CSVCreator
        missing = self.csv_processor.subject_timepoint_missing_modalities
        extra = self.csv_processor.subject_timepoint_extra_modalities

        success = True
        report_data["comment"] = ""
        for missing_subject, msg in missing:
            if f"{id}_{tp}" in missing_subject:
                # Differentiate errors by floating point value
                status_code = -self.status_code - 0.1  # -1.1
                report_data["status"] = status_code
                report_data["data_path"] = tp_path
                report_data["comment"] += "\n\n" + msg
                success = False

        for extra_subject, msg in extra:
            if f"{id}_{tp}" in extra_subject:
                # Differentiate errors by floating point value
                status_code = -self.status_code - 0.2  # -1.2
                report_data["status"] = status_code
                report_data["data_path"] = tp_path
                report_data["comment"] += "\n\n" + msg
                success = False

        # On success the input copy is removed; on failure the staged output
        # copy is discarded so the case stays pointing at the previous stage
        if success:
            shutil.rmtree(tp_path)
        else:
            shutil.rmtree(tp_out_path, ignore_errors=True)

        report_data["comment"] = report_data["comment"].strip()

        update_row_with_dict(report, report_data, index)

        self.csv_processor.write()

        return report, success
MANUAL_STAGE_STATUS + + def __get_input_paths(self, index: Union[str, int]): + id, tp = get_id_tp(index) + tumor_mask_path = os.path.join( + self.prev_stage_path, INTERIM_FOLDER, id, tp, TUMOR_MASK_FOLDER + ) + brain_mask_path = os.path.join( + self.prev_stage_path, INTERIM_FOLDER, id, tp, self.brain_mask_file + ) + return tumor_mask_path, brain_mask_path + + def __get_under_review_path(self, index: Union[str, int]): + id, tp = get_id_tp(index) + path = os.path.join( + self.out_path, INTERIM_FOLDER, id, tp, TUMOR_MASK_FOLDER, "under_review" + ) + return path + + def __get_output_path(self, index: Union[str, int]): + id, tp = get_id_tp(index) + path = os.path.join( + self.out_path, INTERIM_FOLDER, id, tp, TUMOR_MASK_FOLDER, "finalized" + ) + return path + + def __get_backup_path(self, index: Union[str, int]): + id, tp = get_id_tp(index) + path = os.path.join(self.backup_path, id, tp, TUMOR_MASK_FOLDER) + return path + + def __get_rollback_paths(self, index: Union[str, int]): + id, tp = get_id_tp(index) + data_path = os.path.join(self.rollback_path, FINAL_FOLDER, id, tp) + labels_path = os.path.join(self.rollback_path, INTERIM_FOLDER, id, tp) + return data_path, labels_path + + def __report_success( + self, index: Union[str, int], report: pd.DataFrame + ) -> pd.DataFrame: + labels_path = self.__get_output_path(index) + data_path = report.loc[index, "data_path"] + report_data = { + "status": 5, + "data_path": data_path, + "labels_path": labels_path, + } + update_row_with_dict(report, report_data, index) + return report + + def __report_step_missing( + self, index: Union[str, int], report: pd.DataFrame + ) -> pd.DataFrame: + in_path, _ = self.__get_input_paths(index) + data_path = report.loc[index, "data_path"] + + report_data = { + "status": -self.status_code, + "data_path": data_path, + "labels_path": in_path, + } + update_row_with_dict(report, report_data, index) + return report + + def __report_multiple_cases_error( + self, index: Union[str, int], report: 
pd.DataFrame, cases: list + ) -> pd.DataFrame: + path = self.__get_output_path(index) + data_path = report.loc[index, "data_path"] + + report_data = { + "status": -self.status_code - 0.1, # -5.1 + "data_path": data_path, + "labels_path": path, + "comment": f"Multiple files were identified in the labels path: {cases}. " \ + + "Please ensure that there is only the manually corrected segmentation file." + } + update_row_with_dict(report, report_data, index) + return report + + def __rollback(self, index): + # Unhide the rollback paths + rollback_paths = self.__get_rollback_paths(index) + for rollback_path in rollback_paths: + rollback_dirname = os.path.dirname(rollback_path) + rollback_basename = os.path.basename(rollback_path) + hidden_rollback_path = os.path.join( + rollback_dirname, f".{rollback_basename}" + ) + + if os.path.exists(hidden_rollback_path): + shutil.move(hidden_rollback_path, rollback_path) + + # Move the modified brain mask to the rollback path + _, rollback_labels_path = rollback_paths + tumor_masks_path, brain_mask_path = self.__get_input_paths(index) + rollback_brain_mask_path = os.path.join( + rollback_labels_path, self.brain_mask_file + ) + if os.path.exists(rollback_brain_mask_path): + os.remove(rollback_brain_mask_path) + shutil.move(brain_mask_path, rollback_brain_mask_path) + + # Remove the complete subject path + subject_path = os.path.abspath(os.path.join(tumor_masks_path, "..")) + + shutil.rmtree(subject_path) + + def __report_rollback( + self, index: Union[str, int], report: pd.DataFrame, mask_hash + ) -> pd.DataFrame: + rollback_fets_path, rollback_qc_path = self.__get_rollback_paths(index) + + report_data = { + "status": 2, # Move back to nifti transform finished + "data_path": rollback_qc_path, + "labels_path": rollback_fets_path, + "brain_mask_hash": mask_hash, + "num_changed_voxels": 0.0, # Ensure voxel count is reset + "segmentation_hash": "", + } + update_row_with_dict(report, report_data, index) + return report + + def 
could_run(self, index: Union[str, int], report: pd.DataFrame) -> bool: + print(f"Checking if {self.name} can run") + out_path = self.__get_output_path(index) + cases = [] + if os.path.exists(out_path): + cases = os.listdir(out_path) + + in_path, brain_path = self.__get_input_paths(index) + brain_mask_hash = "" + if os.path.exists(brain_path): + brain_mask_hash = md5_file(brain_path) + expected_brain_mask_hash = report.loc[index, "brain_mask_hash"] + + segmentation_exists = os.path.exists(in_path) + annotation_exists = len(cases) == 1 + brain_mask_changed = brain_mask_hash != expected_brain_mask_hash + print(f"{segmentation_exists=} and (not {annotation_exists=} or {brain_mask_changed=})") + return segmentation_exists and (not annotation_exists or brain_mask_changed) + + def execute( + self, index: Union[str, int], report: pd.DataFrame + ) -> Tuple[pd.DataFrame, bool]: + """Manual steps are by definition not doable by an algorithm. Therefore, + execution of this step leads to a failed stage message, indicating that + the manual step has not been done. 
+ + Args: + index (Union[str, int]): current case index + report (pd.DataFrame): data preparation report + + Returns: + pd.DataFrame: _description_ + """ + + # Generate a hidden copy of the baseline segmentations + in_path, brain_path = self.__get_input_paths(index) + out_path = self.__get_output_path(index) + under_review_path = self.__get_under_review_path(index) + bak_path = self.__get_backup_path(index) + if not os.path.exists(bak_path): + copy_files(in_path, bak_path) + set_files_read_only(bak_path) + os.makedirs(under_review_path, exist_ok=True) + os.makedirs(out_path, exist_ok=True) + + cases = os.listdir(out_path) + + brain_mask_hash = "" + if os.path.exists(brain_path): + brain_mask_hash = md5_file(brain_path) + expected_brain_mask_hash = report.loc[index, "brain_mask_hash"] + brain_mask_changed = brain_mask_hash != expected_brain_mask_hash + + if brain_mask_changed: + # Found brain mask changed + self.__rollback(index) + # Label this as able to continue + return self.__report_rollback(index, report, brain_mask_hash), True + + if len(cases) > 1: + # Found more than one reviewed case + return self.__report_multiple_cases_error(index, report, cases), False + elif not len(cases): + # Found no cases yet reviewed + return self.__report_step_missing(index, report), False + return self.__report_success(index, report), True diff --git a/examples/RANO/data_preparator/project/stages/mlcube_constants.py b/examples/RANO/data_preparator/project/stages/mlcube_constants.py new file mode 100644 index 000000000..89eed6911 --- /dev/null +++ b/examples/RANO/data_preparator/project/stages/mlcube_constants.py @@ -0,0 +1,19 @@ +RAW_PATH = "raw" +VALID_PATH = "validated" +PREP_PATH = "prepared" +BRAIN_PATH = "brain_extracted" +TUMOR_PATH = "tumor_extracted" +TUMOR_BACKUP_PATH = ".tumor_segmentation_backup" +OUT_CSV = "data.csv" +TRASH_PATH = ".trash" +INVALID_FILE = ".invalid.txt" + +REPORT_STAGE_STATUS = 0 +CSV_STAGE_STATUS = 1 +NIFTI_STAGE_STATUS = 2 +BRAIN_STAGE_STATUS = 3 
# Folder names for each stage's working area inside the MLCube workspace
RAW_PATH = "raw"
VALID_PATH = "validated"
PREP_PATH = "prepared"
BRAIN_PATH = "brain_extracted"
TUMOR_PATH = "tumor_extracted"
# Hidden backup of baseline tumor segmentations
TUMOR_BACKUP_PATH = ".tumor_segmentation_backup"
# Processing CSV consumed by the pipeline stages
OUT_CSV = "data.csv"
# Staging area for data pending deletion
TRASH_PATH = ".trash"
# File listing subjects flagged as invalid
INVALID_FILE = ".invalid.txt"

# Status codes recorded in the report, in pipeline order. A negative value
# of the same magnitude marks a failure at that stage (with fractional
# offsets, e.g. -1.1, distinguishing failure reasons).
REPORT_STAGE_STATUS = 0
CSV_STAGE_STATUS = 1
NIFTI_STAGE_STATUS = 2
BRAIN_STAGE_STATUS = 3
TUMOR_STAGE_STATUS = 4
MANUAL_STAGE_STATUS = 5
COMPARISON_STAGE_STATUS = 6
CONFIRM_STAGE_STATUS = 7
DONE_STAGE_STATUS = 8
class NIfTITransform(RowStage):
    """Converts a validated DICOM case to NIfTI via the BraTS pipeline.

    Wraps the external ``Preparator`` to run the conversion, cleans up
    intermediate artifacts, and updates the report with success/failure.
    """

    def __init__(
        self, data_csv: str, out_path: str, prev_stage_path: str, metadata_path: str, data_out: str,
    ):
        self.data_csv = data_csv
        self.out_path = out_path
        self.data_out = data_out
        self.prev_stage_path = prev_stage_path
        self.metadata_path = metadata_path
        os.makedirs(self.out_path, exist_ok=True)
        self.prep = Preparator(data_csv, out_path, "BraTSPipeline")
        # self.pbar = pbar
        self.pbar = tqdm()

    @property
    def name(self) -> str:
        return "NiFTI Conversion"

    @property
    def status_code(self) -> int:
        return NIFTI_STAGE_STATUS

    def could_run(self, index: Union[str, int], report: pd.DataFrame) -> bool:
        """Determine if case at given index needs to be converted to NIfTI

        Args:
            index (Union[str, int]): Case index, as used by the report dataframe
            report (pd.DataFrame): Report Dataframe for providing additional context

        Returns:
            bool: Whether this stage could be executed for the given case
        """
        print(f"Checking if {self.name} can run")
        id, tp = get_id_tp(index)
        prev_case_path = os.path.join(self.prev_stage_path, id, tp)
        if os.path.exists(prev_case_path):
            is_valid = len(os.listdir(prev_case_path)) > 0
            # FIX: use the f"{...=}" debug form like every other stage
            print(f"{is_valid=}")
            return is_valid
        return False

    def execute(
        self, index: Union[str, int], report: pd.DataFrame
    ) -> "tuple[pd.DataFrame, bool]":
        """Executes the NIfTI transformation stage on the given case

        Args:
            index (Union[str, int]): case index, as used by the report
            report (pd.DataFrame): DataFrame containing the current state of the preparation flow

        Returns:
            Tuple[pd.DataFrame, bool]: updated report dataframe and success flag
        """
        self.__prepare_exec()
        self.__process_case(index)
        self.__cleanup_artifacts(index)
        report, success = self.__update_report(index, report)
        self.prep.write()
        self.__update_metadata()

        return report, success

    def __cleanup_artifacts(self, index):
        """Remove conversion byproducts (raw copies, registration matrices)."""
        unused_artifacts_substrs = ["raw", "to_SRI", ".mat"]
        _, out_path = self.__get_output_paths(index)
        root_artifacts = os.listdir(out_path)
        for artifact in root_artifacts:
            if not any([substr in artifact for substr in unused_artifacts_substrs]):
                continue
            # NOTE(review): os.remove fails on directories — assumes all
            # matching artifacts are plain files; confirm BraTSPipeline output
            artifact_path = os.path.join(out_path, artifact)
            os.remove(artifact_path)

    def __get_output_paths(self, index: Union[str, int]):
        """Return (DataForFeTS path, DataForQC path) for this case."""
        id, tp = get_id_tp(index)
        fets_path = os.path.join(self.prep.final_output_dir, id, tp)
        qc_path = os.path.join(self.prep.interim_output_dir, id, tp)

        return fets_path, qc_path

    def __prepare_exec(self):
        # Reset the file contents for errors
        open(self.prep.stderr_log, "w").close()

        self.prep.read()

    def __process_case(self, index: Union[str, int]):
        """Run the conversion for the single row matching this case."""
        id, tp = get_id_tp(index)
        df = self.prep.subjects_df
        row = df[(df["SubjectID"] == id) & (df["Timepoint"] == tp)].iloc[0]
        # NOTE(review): hash() of a str is salted per interpreter run
        # (PYTHONHASHSEED); if Preparator needs a stable id across runs,
        # a content hash would be required — confirm its expectation
        self.prep.convert_to_dicom(hash(index), row, self.pbar)

    def __update_prev_stage_state(self, index: Union[str, int], report: pd.DataFrame):
        """Delete the previous stage's copy of the case after success."""
        prev_data_path = report.loc[index]["data_path"]
        prev_data_path = unnormalize_path(prev_data_path, self.data_out)
        shutil.rmtree(prev_data_path, ignore_errors=True)

    def __undo_current_stage_changes(self, index: Union[str, int]):
        """Discard this stage's partial outputs after a failed conversion."""
        fets_path, qc_path = self.__get_output_paths(index)
        shutil.rmtree(fets_path, ignore_errors=True)
        shutil.rmtree(qc_path, ignore_errors=True)

    def __update_report(
        self, index: Union[str, int], report: pd.DataFrame
    ) -> "tuple[pd.DataFrame, bool]":
        """Record success or failure for this case based on Preparator state."""
        id, tp = get_id_tp(index)
        failing = self.prep.failing_subjects
        failing_subject = failing[
            (failing["SubjectID"] == id) & (failing["Timepoint"] == tp)
        ]
        if len(failing_subject):
            self.__undo_current_stage_changes(index)
            report = self.__report_failure(index, report)
            success = False
        else:
            self.__update_prev_stage_state(index, report)
            report = self.__report_success(index, report)
            success = True

        return report, success

    def __update_metadata(self):
        """Copy conversion YAML metadata files to the metadata output path."""
        fets_path = os.path.join(self.out_path, "DataForFeTS")
        for file in os.listdir(fets_path):
            filepath = os.path.join(fets_path, file)
            out_filepath = os.path.join(self.metadata_path, file)
            if os.path.isfile(filepath) and filepath.endswith(".yaml"):
                shutil.copyfile(filepath, out_filepath)

    def __report_success(
        self, index: Union[str, int], report: pd.DataFrame
    ) -> pd.DataFrame:
        fets_path, qc_path = self.__get_output_paths(index)
        report_data = {
            "status": self.status_code,
            "data_path": qc_path,
            "labels_path": fets_path,
        }
        update_row_with_dict(report, report_data, index)
        return report

    def __report_failure(
        self, index: Union[str, int], report: pd.DataFrame
    ) -> pd.DataFrame:
        """Flag the failure, attaching the pipeline's stderr log as comment."""
        prev_data_path = report.loc[index]["data_path"]

        with open(self.prep.stderr_log, "r") as f:
            msg = f.read()

        report_data = {
            "status": -self.status_code,
            "comment": msg,
            "data_path": prev_data_path,
            "labels_path": "",
        }
        update_row_with_dict(report, report_data, index)
        return report
def normalize_report_paths(report: DataFrame) -> DataFrame:
    """Ensures paths are normalized and converts them to relative paths for the local machine

    Strips everything up to and including the ``mlcube_ioN`` mount prefix so
    stored paths are machine-independent.

    Args:
        report (DataFrame): report to normalize

    Returns:
        DataFrame: report with transformed paths
    """
    # FIX: raw string — "\d" in a normal string literal is an invalid escape
    # sequence (SyntaxWarning today, an error in future Python versions)
    pattern = r"mlcube_io\d+"
    report["data_path"] = report["data_path"].str.split(pattern).str[-1]
    report["labels_path"] = report["labels_path"].str.split(pattern).str[-1]
    return report


def write_report(report: DataFrame, filepath: str):
    """Atomically persist the report as YAML at ``filepath``.

    Args:
        report (DataFrame): report to write
        filepath (str): destination YAML file
    """
    report = normalize_report_paths(report)
    report_dict = report.to_dict()

    # Use a temporary file to avoid quick writes collisions and corruption
    temp_path = Path(filepath).parent / ".report.yaml"
    with open(temp_path, 'w') as f:
        yaml.dump(report_dict, f)
    # FIX: os.replace is atomic AND overwrites an existing destination on all
    # platforms; os.rename raises on Windows when the target already exists
    os.replace(temp_path, filepath)
    def __is_subject_done(self, subject: Union[str, int], report: DataFrame) -> bool:
        """Determines if a subject is considered done

        Args:
            subject (Union[str, int]): subject index
            report (DataFrame): DataFrame containing the state of the processing

        Returns:
            bool: whether the subject is done or not
        """
        subject_status = report.loc[subject, "status"]

        return subject_status == DONE_STAGE_STATUS

    def __is_done(self, report: DataFrame) -> bool:
        """Determines if the preparation is complete

        Args:
            report (DataFrame): DataFrame containing the state of the processing

        Returns:
            bool: whether the preparation is complete
        """
        return all(report["status"] == DONE_STAGE_STATUS)

    def __get_report_stage_to_run(
        self, subject: Union[str, int], report: DataFrame
    ) -> Union[DatasetStage, RowStage]:
        """Retrieves the stage a subject is in indicated by the report

        Args:
            subject (Union[str, int]): Subject index
            report (DataFrame): Dataframe containing the state of the processing

        Returns:
            Union[DatasetStage, RowStage]: Stage the current subject is in,
            or None when no stage matches the derived status code
        """
        report_status_code = int(report.loc[subject, "status"])
        if report_status_code < 0:
            # Error code, rerun the stage specified in the report
            report_status_code = abs(report_status_code)
        else:
            # Success code, reported stage works so move to the next one
            report_status_code += 1
        for stage in self.stages:
            if stage.status_code == report_status_code:
                return stage

        return None

    def determine_next_stage(
        self, subject: Union[str, int], report
    ) -> Tuple[List[Union[DatasetStage, RowStage]], bool]:
        """Determines what stage to run
        First priority goes to a stage if it is the only one that could run. (only one stage can run)
        Second priority goes to what the report says should run next. (The report knows what stage can run)
        Third priority goes to the first of all possible stages that could run. (Earliest of all possible stages)

        Args:
            subject (Union[str, int]): Subject name (SubjectID, Timepoint)
            report (pd.DataFrame): report dataframe

        Returns:
            Tuple[List[Union[DatasetStage, RowStage]], bool]: Stage to run, and whether the subject is done
        """
        # Poll every stage; row stages are asked per-subject, dataset stages
        # are asked about the whole report
        could_run_stages = []
        for i, stage in enumerate(self.stages):
            could_run = False
            if isinstance(stage, RowStage):
                could_run = stage.could_run(subject, report)
            else:
                could_run = stage.could_run(report)

            if could_run:
                runnable_stage = self.stages[i]
                could_run_stages.append(runnable_stage)

        print(f"Possible next stages: {[stage.name for stage in could_run_stages]}")

        # TODO: split into a function
        if len(could_run_stages) == 1:
            # Unambiguous: exactly one stage is runnable
            stage = could_run_stages[0]
            is_last_subject = subject == report.index[-1]
            if isinstance(stage, DatasetStage) and not is_last_subject:
                # Only run dataset stages on the last subject, so all subjects can update
                # their state if needed before proceeding
                return None, False
            return stage, False

        # Handle errors
        # Either no stage can be executed (len(could_run_stages == 0))
        # or multiple stages can be executed (len(could_run_stages > 1))
        report_stage = self.__get_report_stage_to_run(subject, report)
        if report_stage is not None:
            print(f"Reported next stage: {report_stage.name}")

        # TODO: split into a function
        if len(could_run_stages) == 0:
            # Either the case processing was on-going but it's state is broken
            # or the next stage is a dataset stage, which means we're done with this one
            # or the case is done and no stage can nor should run
            # We can tell this by looking at the report
            is_done = self.__is_subject_done(subject, report)
            is_dset_stage = isinstance(report_stage, DatasetStage)
            if is_done or is_dset_stage:
                return None, True
            else:
                return None, False
        # TODO: split into a function
        else:
            # Multiple stages could run. Remove ambiguity by
            # syncing with the report
            if report_stage in could_run_stages:
                return report_stage, False

            # Fall back to the earliest runnable stage
            return could_run_stages[0], False
Remove ambiguity by + # syncing with the report + if report_stage in could_run_stages: + return report_stage, False + + return could_run_stages[0], False + + def run(self, report: DataFrame, report_path: str): + # cleanup the trash at the very beginning + cleanup_storage(self.trash_folders) + + # The init stage always has to be executed + report, _ = self.init_stage.execute(report) + write_report(report, report_path) + + invalid_subjects = self.__invalid_subjects() + + should_loop = True + should_stop = False + while should_loop: + + # Since we could have row and dataset stages interwoven, we want + # to make sure we continue processing subjects until nothing new has happened. + # This means we can resume a given subject and its row stages even after a dataset stage + prev_status = report["status"].copy() + subjects = list(report.index) + subjects_loop = tqdm(subjects) + + for subject in subjects_loop: + report, should_stop = self.process_subject( + subject, report, report_path, subjects_loop + ) + + if should_stop: + break + + # If a new invalid subject is identified, start over + new_invalid_subjects = self.__invalid_subjects() + if invalid_subjects != new_invalid_subjects: + invalid_subjects = new_invalid_subjects + # We're going to restart the subjects loop + break + + # Check for report differences. 
If there are, rerun the loop + should_loop = any(report["status"] != prev_status) and not should_stop + + if self.__is_done(report): + cleanup_folders = self.staging_folders + self.trash_folders + cleanup_storage(cleanup_folders) + + def process_subject( + self, subject: Union[int, str], report: DataFrame, report_path: str, pbar: tqdm + ): + should_stop = False + while True: + # Check if subject has been invalidated + invalid_subjects = self.__invalid_subjects() + if subject in invalid_subjects: + break + + # Filter out invalid subjects + working_report = report[~report.index.isin(invalid_subjects)].copy() + + print(f"Determining next stage for {subject}", flush=True) + stage, done = self.determine_next_stage(subject, working_report) + if stage is not None: + print(f"Next stage for {subject}: {stage.name}", flush=True) + + if done: + print(f"Subject {subject} is Done", flush=True) + break + + try: + working_report, successful = self.run_stage( + stage, subject, working_report, pbar + ) + except Exception: + # TODO: The superclass could be in charge of catching the error, reporting it and cleaning up + # and raise the exception again to be caught here + working_report = self.__report_unhandled_exception( + stage, subject, working_report + ) + print(traceback.format_exc()) + successful = False + + report.update(working_report) + write_report(report, report_path) + + if not successful: + # Send back a signal that a dset stage failed + if isinstance(stage, DatasetStage): + should_stop = True + break + + return report, should_stop + + def run_stage(self, stage, subject, report, pbar): + successful = False + if isinstance(stage, RowStage): + pbar.set_description(f"{subject} | {stage.name}") + report, successful = stage.execute(subject, report) + elif isinstance(stage, DatasetStage): + pbar.set_description(f"{stage.name}") + report, successful = stage.execute(report) + + + return report, successful + + def __report_unhandled_exception( + self, + stage: Stage, + subject: 
Union[int, str], + report: DataFrame, + ): + # Assign a special status code for unhandled errors, associated + # to the stage status code + status_code = -stage.status_code - 0.101 + name = f"{stage.name.upper().replace(' ', '_')}_UNHANDLED_ERROR" + comment = traceback.format_exc() + data_path = report.loc[subject]["data_path"] + labels_path = report.loc[subject]["labels_path"] + + body = { + "status": status_code, + "status_name": name, + "comment": comment, + "data_path": data_path, + "labels_path": labels_path, + } + + report.loc[subject] = body + + return report diff --git a/examples/RANO/data_preparator/project/stages/row_stage.py b/examples/RANO/data_preparator/project/stages/row_stage.py new file mode 100644 index 000000000..70701beb0 --- /dev/null +++ b/examples/RANO/data_preparator/project/stages/row_stage.py @@ -0,0 +1,34 @@ +from abc import ABC, abstractmethod +from typing import Union, Tuple +import pandas as pd + +from .stage import Stage + + +class RowStage(Stage, ABC): + @abstractmethod + def could_run(self, index: Union[str, int], report: pd.DataFrame) -> bool: + """Establishes if this step could be executed for the given case + + Args: + index (Union[str, int]): case index in the report + report (pd.DataFrame): Dataframe containing the current state of the preparation flow + + Returns: + bool: wether this stage could be executed + """ + + @abstractmethod + def execute( + self, index: Union[str, int], report: pd.DataFrame + ) -> Tuple[pd.DataFrame, bool]: + """Executes the stage on the given case + + Args: + index (Union[str, int]): case index in the report + report (pd.DataFrame): DataFrame containing the current state of the preparation flow + + Returns: + pd.DataFrame: Updated report dataframe + bool: Success status + """ diff --git a/examples/RANO/data_preparator/project/stages/split.py b/examples/RANO/data_preparator/project/stages/split.py new file mode 100644 index 000000000..7580b06ba --- /dev/null +++ 
# --- examples/RANO/data_preparator/project/stages/split.py ---
import os
import yaml
import pandas as pd
from typing import List, Tuple
import math

from .dset_stage import DatasetStage
from .utils import get_id_tp, cleanup_storage
from .mlcube_constants import DONE_STAGE_STATUS


def row_to_path(row: pd.Series) -> str:
    """Builds the relative "<SubjectID>/<Timepoint>" path for a report row."""
    subject_id = row["SubjectID"]
    timepoint = row["Timepoint"]
    return os.path.join(subject_id, timepoint)


class SplitStage(DatasetStage):
    """Final dataset stage: generates train/val split CSVs and removes the
    staging folders once every subject is fully prepared."""

    def __init__(
        self,
        params: str,
        data_path: str,
        labels_path: str,
        staging_folders: List[str],
    ):
        self.params = params
        self.data_path = data_path
        self.labels_path = labels_path
        self.split_csv_path = os.path.join(data_path, "splits.csv")
        self.train_csv_path = os.path.join(data_path, "train.csv")
        self.val_csv_path = os.path.join(data_path, "val.csv")
        self.staging_folders = staging_folders

    @property
    def name(self) -> str:
        return "Generate splits"

    @property
    def status_code(self) -> int:
        # This is the terminal stage, so its code is the DONE status
        return DONE_STAGE_STATUS

    def could_run(self, report: pd.DataFrame) -> bool:
        """The stage can run once the splits file has not yet been generated
        and every subject's data and labels folders exist on disk.

        Args:
            report (pd.DataFrame): current report dataframe

        Returns:
            bool: whether this stage can run
        """
        if os.path.exists(self.split_csv_path):
            # This stage already ran
            return False

        for index in report.index:
            subject_id, timepoint = get_id_tp(index)
            case_data_path = os.path.join(self.data_path, subject_id, timepoint)
            case_labels_path = os.path.join(self.labels_path, subject_id, timepoint)
            data_exists = os.path.exists(case_data_path)
            labels_exist = os.path.exists(case_labels_path)

            if not data_exists or not labels_exist:
                # Some subjects are not ready
                return False

        return True

    def __report_success(self, report: pd.DataFrame) -> pd.DataFrame:
        # Mark every subject as done
        report["status"] = self.status_code
        return report

    def execute(self, report: pd.DataFrame) -> Tuple[pd.DataFrame, bool]:
        """Generates the split CSVs and cleans up staging storage.

        Splitting happens at the subject level so that no subject has
        timepoints in both the train and validation sets.

        Args:
            report (pd.DataFrame): current report dataframe

        Returns:
            Tuple[pd.DataFrame, bool]: updated report and success flag
        """
        with open(self.params, "r") as f:
            params = yaml.safe_load(f)

        seed = params["seed"]
        # assumes train_percent is a fraction in [0, 1] — TODO confirm
        train_pct = params["train_percent"]

        split_df = report.copy(deep=True)
        # Report indices look like "<SubjectID>|<Timepoint>"
        split_df["SubjectID"] = split_df.index.str.split("|").str[0]
        split_df["Timepoint"] = split_df.index.str.split("|").str[1]
        split_df = split_df[["SubjectID", "Timepoint"]].reset_index(drop=True)

        # Shuffle unique subjects deterministically, then cut at the
        # requested train fraction
        subjects = split_df["SubjectID"].drop_duplicates()
        subjects = subjects.sample(frac=1, random_state=seed)
        train_size = math.floor(len(subjects) * train_pct)

        train_subjects = subjects.iloc[:train_size]
        val_subjects = subjects.iloc[train_size:]

        train_mask = split_df["SubjectID"].isin(train_subjects)
        val_mask = split_df["SubjectID"].isin(val_subjects)

        split_df.loc[train_mask, "Split"] = "Train"
        split_df.loc[val_mask, "Split"] = "Val"

        split_df.to_csv(self.split_csv_path, index=False)

        # Generate separate splits files with relative path
        split_df["path"] = split_df.apply(row_to_path, axis=1)

        split_df.loc[train_mask].to_csv(self.train_csv_path, index=False)
        split_df.loc[val_mask].to_csv(self.val_csv_path, index=False)

        report = self.__report_success(report)
        cleanup_storage(self.staging_folders)

        return report, True


# --- examples/RANO/data_preparator/project/stages/stage.py ---
from abc import ABC


class Stage(ABC):
    """Common interface shared by all pipeline stages."""

    # Human-readable stage name, used in logs and progress bars
    name: str
    # Numeric code written to the report "status" column
    status_code: int


# --- examples/RANO/data_preparator/project/stages/utils.py ---
import os
import shutil
from tqdm import tqdm
from functools import reduce
from pathlib import Path
import hashlib
# Taken from https://code.activestate.com/recipes/577879-create-a-nested-dictionary-from-oswalk/
def get_directory_structure(rootdir):
    """
    Creates a nested dictionary that represents the folder structure of rootdir.
    Files map to None; directories map to nested dicts.
    """
    tree = {}  # renamed from `dir` to avoid shadowing the builtin
    rootdir = rootdir.rstrip(os.sep)
    start = rootdir.rfind(os.sep) + 1
    for path, dirs, files in os.walk(rootdir):
        folders = path[start:].split(os.sep)
        subdir = dict.fromkeys(files)
        parent = reduce(dict.get, folders[:-1], tree)
        parent[folders[-1]] = subdir
    return tree


def has_prepared_folder_structure(data_path, labels_path) -> bool:
    """Checks whether data/labels already follow the prepared layout:
    a splits.csv at the data root and, per <SubjectID>/<Timepoint>, the
    four brain modality files plus a final segmentation label file."""
    data_struct = list(get_directory_structure(data_path).values())[0]
    labels_struct = list(get_directory_structure(labels_path).values())[0]

    expected_data_files = [
        "brain_t1c.nii.gz",
        "brain_t1n.nii.gz",
        "brain_t2f.nii.gz",
        "brain_t2w.nii.gz",
    ]
    expected_labels_files = ["final_seg.nii.gz"]

    if "splits.csv" not in data_struct:
        return False

    for subject_id, timepoints in data_struct.items():
        if timepoints is None:
            # This is a file, ignore
            continue
        for tp in timepoints.keys():
            expected_subject_data_files = set(
                "_".join([subject_id, tp, file]) for file in expected_data_files
            )
            expected_subject_labels_files = set(
                "_".join([subject_id, tp, file]) for file in expected_labels_files
            )

            found_data_files = set((timepoints[tp] or {}).keys())
            # A subject/timepoint missing from the labels tree means the data
            # is not prepared (indexing directly would raise KeyError here)
            labels_tp = (labels_struct.get(subject_id) or {}).get(tp)
            if labels_tp is None:
                return False
            found_labels_files = set(labels_tp.keys())

            data_files_diff = len(expected_subject_data_files - found_data_files)
            labels_files_diff = len(expected_subject_labels_files - found_labels_files)
            if data_files_diff or labels_files_diff:
                return False

    # Passed all checks
    return True


def normalize_path(path: str) -> str:
    """Remove mlcube-specific components from the given path

    Args:
        path (str): mlcube path

    Returns:
        str: normalized path
    """
    # All mlcube paths start with "/mlcube_io<N>/". Strip the first path
    # component by locating the separator that follows it; unlike the fixed
    # 12-character slice this also handles mount indices >= 10.
    if path.startswith("/mlcube_io"):
        sep_idx = path.find("/", 1)
        return path[sep_idx + 1:] if sep_idx != -1 else ""

    # In case the path has already been normalized
    return path


def unnormalize_path(path: str, parent: str) -> str:
    """Add back mlcube-specific components to the given path

    Args:
        path (str): normalized path
        parent (str): mlcube mount folder to prepend

    Returns:
        str: mlcube-specific path
    """
    if path.startswith(os.path.sep):
        path = path[1:]
    return os.path.join(parent, path)


def update_row_with_dict(df, d, idx):
    """Writes each key/value of `d` into row `idx` of `df`, in place."""
    for key, value in d.items():
        df.loc[idx, key] = value


def get_id_tp(index: str):
    """Splits a report index "<SubjectID>|<Timepoint>" into its two parts."""
    return index.split("|")


def set_files_read_only(path):
    """Sets read-only permission (0o444) on every file under `path`.

    os.walk already descends into every subdirectory, so one walk covers
    the whole tree (the original explicit recursion re-walked each subtree
    redundantly). Directories themselves are left writable.
    """
    for root, _dirs, files in os.walk(path):
        for file_name in files:
            file_path = os.path.join(root, file_name)
            os.chmod(file_path, 0o444)  # Set read-only permission for files


def cleanup_storage(remove_folders):
    """Best-effort removal of the given folder trees."""
    for folder in remove_folders:
        shutil.rmtree(folder, ignore_errors=True)


def copy_files(src_dir, dest_dir):
    """Copies the regular files (not subdirectories) of src_dir into dest_dir."""
    # Ensure the destination directory exists
    os.makedirs(dest_dir, exist_ok=True)

    # Iterate through the files in the source directory
    for filename in os.listdir(src_dir):
        src_file = os.path.join(src_dir, filename)
        dest_file = os.path.join(dest_dir, filename)

        # Check if the item is a file (not a directory)
        if os.path.isfile(src_file):
            shutil.copy2(src_file, dest_file)  # copy2 preserves metadata


# Taken from https://stackoverflow.com/questions/24937495/how-can-i-calculate-a-hash-for-a-filesystem-directory-using-python
def md5_update_from_dir(directory, hash):
    """Folds a directory tree (entry names + file contents, in deterministic
    case-insensitive order) into the given hash object and returns it."""
    assert Path(directory).is_dir()
    for path in sorted(Path(directory).iterdir(), key=lambda p: str(p).lower()):
        hash.update(path.name.encode())
        if path.is_file():
            with open(path, "rb") as f:
                for chunk in iter(lambda: f.read(4096), b""):
                    hash.update(chunk)
        elif path.is_dir():
            hash = md5_update_from_dir(path, hash)
    return hash


def md5_dir(directory):
    """MD5 hex digest of a whole directory tree."""
    return md5_update_from_dir(directory, hashlib.md5()).hexdigest()


def md5_file(filepath):
    """MD5 hex digest of a single file's contents."""
    # Context manager closes the handle (the original open().read()
    # one-liner leaked it)
    with open(filepath, "rb") as f:
        return hashlib.md5(f.read()).hexdigest()


class MockTqdm(tqdm):
    """A tqdm stand-in that silently ignores any method call."""

    def __getattr__(self, attr):
        # Only invoked for attributes tqdm itself doesn't define
        return lambda *args, **kwargs: None


# --- examples/RANO/data_preparator/project/statistics.py ---
import os
import yaml
import argparse
import pandas as pd


if __name__ == "__main__":
    parser = argparse.ArgumentParser("MedPerf Statistics Example")
    parser.add_argument(
        "--data_path",
        dest="data",
        type=str,
        help="directory containing the prepared data",
    )
    parser.add_argument(
        "--labels_path",
        dest="labels",
        type=str,
        help="directory containing the prepared labels",
    )
    parser.add_argument(
        "--out_file", dest="out_file", type=str, help="file to store statistics"
    )
    parser.add_argument(
        "--metadata_path",
        dest="metadata_path",
        type=str,
        help="path to the local metadata folder",
    )

    args = parser.parse_args()

    splits_path = os.path.join(args.data, "splits.csv")
    invalid_path = os.path.join(args.metadata_path, ".invalid.txt")

    invalid_subjects = []
    if os.path.exists(invalid_path):
        with open(invalid_path, "r") as f:
            # Strip newlines and skip blank lines so the count matches the
            # set built by Pipeline.__invalid_subjects
            invalid_subjects = [line.strip() for line in f if line.strip()]

    splits_df = pd.read_csv(splits_path)

    num_train_subjects = len(splits_df[splits_df["Split"] == "Train"])
    num_val_subjects = len(splits_df[splits_df["Split"] == "Val"])

    num_invalid_subjects = len(invalid_subjects)

    stats = {
        "num_train_subjects": num_train_subjects,
        "num_val_subjects": num_val_subjects,
        "num_invalid_subjects": num_invalid_subjects,
    }

    with open(args.out_file, "w") as f:
        yaml.dump(stats, f)
Aristizabal <aristizabal95@gmail.com>
Date: Thu, 12 Dec 2024 16:08:33 -0500
Subject: [PATCH 2/3] Write a build script for rano data prep

---
 examples/RANO/data_preparator/build.sh | 15 +++++++++++++++
 1 file changed, 15 insertions(+)
 create mode 100755 examples/RANO/data_preparator/build.sh

diff --git a/examples/RANO/data_preparator/build.sh b/examples/RANO/data_preparator/build.sh
new file mode 100755
index 000000000..3a26699ce
--- /dev/null
+++ b/examples/RANO/data_preparator/build.sh
@@ -0,0 +1,15 @@
+#! /bin/bash
+# Build the RANO data-prep MLCube on top of the locally-built FeTS tool image.
+# Abort on the first failing command instead of building from a broken tree.
+set -euo pipefail
+
+ROOT="$(pwd)"
+# Scratch checkout; removed on exit even when a step fails mid-way.
+trap 'rm -rf "$ROOT/tmp"' EXIT
+mkdir -p "$ROOT/tmp"
+cd "$ROOT/tmp"
+git clone -b fets_2.0 https://github.com/FeTS-AI/Front-End.git
+cd Front-End
+git submodule update --init
+docker build --target=fets_base -t local/fets_tool .
+cd "$ROOT"
+docker build --platform=linux/amd64 -t mlcommons/rano-data-prep-mlcube:1.0.10 .

From ce44f2fda336f99d4db2096e93fb1cd3fd9bc8ad Mon Sep 17 00:00:00 2001
From: Alejandro Aristizabal <aristizabal95@gmail.com>
Date: Thu, 12 Dec 2024 16:08:51 -0500
Subject: [PATCH 3/3] Fix Dockerfile to point to correct paths

---
 examples/RANO/data_preparator/Dockerfile | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/examples/RANO/data_preparator/Dockerfile b/examples/RANO/data_preparator/Dockerfile
index e75bd646e..6ef4715ae 100644
--- a/examples/RANO/data_preparator/Dockerfile
+++ b/examples/RANO/data_preparator/Dockerfile
@@ -1,10 +1,10 @@
-FROM locally-built-fetstool AS data_prep
+FROM local/fets_tool AS data_prep
 
 RUN find /Front-End/bin/install/appdir/usr/bin -type f \( -perm -u=x -o -type l \) -exec cp -P {} /usr/bin \;
 
 WORKDIR /
 
-COPY ./mlcubes/data_preparation/project/requirements.txt /project/requirements.txt
+COPY ./project/requirements.txt /project/requirements.txt
 
 RUN pip install --upgrade pip
 
@@ -30,6 +30,6 @@ RUN /nnunet_env/bin/pip install git+https://github.com/MIC-DKFZ/nnUNet.git@nnune
 
 ENV CUDA_VISIBLE_DEVICES="0"
 
-COPY ./mlcubes/data_preparation/project /project
+COPY ./project /project
 
 ENTRYPOINT ["python", "/project/mlcube.py"]