diff --git a/bin/environment-local.yml b/bin/environment-local.yml index a20c5cf98..533c8b208 100644 --- a/bin/environment-local.yml +++ b/bin/environment-local.yml @@ -51,3 +51,4 @@ dependencies: # Numpy has to be here to make sure that pypi doesn't # try to install a different version - numpy==2.3.* + - dsigma==1.1.* diff --git a/bin/environment-perlmutter.yml b/bin/environment-perlmutter.yml index f2100d08b..cdc1a857c 100644 --- a/bin/environment-perlmutter.yml +++ b/bin/environment-perlmutter.yml @@ -62,3 +62,5 @@ dependencies: - numpy==2.3.* # NERSC-specific - git+https://github.com/LSSTDESC/dataregistry.git + - dsigma==1.1.* + \ No newline at end of file diff --git a/examples/desi_dr1_stage3/config.yml b/examples/desi_dr1_stage3/config.yml new file mode 100644 index 000000000..75eee5824 --- /dev/null +++ b/examples/desi_dr1_stage3/config.yml @@ -0,0 +1,286 @@ +# Values in this section are accessible to all the different stages. +# They can be overridden by individual stages though. +global: + # This is read by many stages that read complete + # catalog data, and tells them how many rows to read + # at once + chunk_rows: 100000 + # These mapping options are also read by a range of stages + pixelization: healpix + nside: 512 + sparse: true # Generate sparse maps - faster if using small areas + +TXGCRTwoCatalogInput: + metacal_dir: /global/cscratch1/sd/desc/DC2/data/Run2.2i/dpdd/Run2.2i-t3828/metacal_table_summary + photo_dir: /global/cscratch1/sd/desc/DC2/data/Run2.2i/dpdd/Run2.2i-t3828/object_table_summary + +TXMetacalGCRInput: + cat_name: dc2_object_run2.1i_dr1b_with_metacal_griz + +TXExposureInfo: + dc2_name: 1.2p + +TXShearCalibration: + use_true_shear: false + chunk_rows: 100000 + subtract_mean_shear: true + extra_cols: [''] + shear_catalog_type: hsc + +TXCosmoDC2Mock: + cat_name: cosmoDC2_v1.1.4_image + visits_per_band: 16 + extra_cols: redshift_true size_true shear_1 shear_2 + flip_g2: true # to match metacal + +TXIngestRedmagic: + lens_zbin_edges: [0.1, 0.3, 0.5] + +TXIngestDESI: + mock: false + lens_zbin_edges: [0.0, 3.0] + dz: 0.001 + +TXIngestDESIRandoms: + lens_zbin_edges: [0.0, 3.0] + ra_col: RA + dec_col: DEC + z_col: Z + +TXDESISelector: + ra_col: RA + dec_col: DEC + z_col: Z + w_col: WEIGHT + apply_mask: false + zmin: 0.0 + +TXDeltaSigma: + source_bins: [0,1,2,3] # no non-tomograhpic redshift distribution. + lens_bins: [0] + lens_source_sep: 0.1 + photoz: True + r_min: 0.35 + r_max: 70.0 + nbins: 15 + lower_bin_edge: [0.0, 0.358, 0.631, 0.872] + source_cat_w_col: w + lens_cat_w_col: w_sys + n_jobs: 128 + +TXSourceSelectorHSC: + input_pz: true + true_z: false + bands: i #used for selection + T_cut: 0.0 + s2n_cut: 10.0 + max_rows: 10000000000 + source_zbin_edges: [0.5, 1.5, 2.5, 3.5, 4.5] + shear_catalog_type: hsc + + +PZPDFMLZ: + nz: 301 + zmax: 3.0 + + +PZRailTrainSource: + class_name: BPZ_lite + zmin: 0.0 + zmax: 3.0 + dz: 0.01 + columns_file: ./data/bpz_riz.columns + data_path: ./data/example/rail-bpz-inputs + spectra_file: CWWSB4.list + prior_band: i + # Not sure about this + prior_file: hdfn_gen + p_min: 0.005 + gauss_kernel: 0.0 + mag_err_min: 0.005 + inform_options: {save_train: false, load_model: false, modelfile: BPZpriormodel.out} + madau_reddening: no + bands: riz + zp_errors: [0.01, 0.01, 0.01] + +PZRailEstimateSource: + convert_unseen: true # needed for BPZ + + +# Mock version of stacking: +TXSourceTrueNumberDensity: + nz: 301 + zmax: 3.0 + +# Mock version of stacking: +TXTrueNumberDensity: + nz: 301 + zmax: 3.0 + +TXSourceSelectorMetacal: + input_pz: false + true_z: true + bands: riz #used for selection + T_cut: 0.5 + s2n_cut: 10.0 + max_rows: 1000 + delta_gamma: 0.02 + source_zbin_edges: [0.5, 0.7, 0.9, 1.1, 2.0] + +TXSourceTomography: + bands: riz #used for selection + source_zbin_edges: [0.5, 0.7, 0.9, 1.1, 2.0] + +TXTruthLensSelector: + # Mag cuts + input_pz: false + true_z: true + lens_zbin_edges: [0.1, 0.3, 0.5] + cperp_cut: 0.2 + r_cpar_cut: 13.5 + r_lo_cut: 16.0 + r_hi_cut: 21.6 + i_lo_cut: 17.5 + i_hi_cut: 21.9 + r_i_cut: 2.0 + # may also need one for r_cpar_cut + +TXMeanLensSelector: + # Mag cuts + lens_zbin_edges: [0.0, 0.2, 0.4] + cperp_cut: 0.2 + r_cpar_cut: 13.5 + r_lo_cut: 16.0 + r_hi_cut: 21.6 + i_lo_cut: 17.5 + i_hi_cut: 21.9 + r_i_cut: 2.0 + +TXModeLensSelector: + # Mag cuts + lens_zbin_edges: [0.0, 0.2, 0.4] + cperp_cut: 0.2 + r_cpar_cut: 13.5 + r_lo_cut: 16.0 + r_hi_cut: 21.6 + i_lo_cut: 17.5 + i_hi_cut: 21.9 + r_i_cut: 2.0 + +TXRandomCat: + density: 10 # gals per sq arcmin + +TXTwoPoint: + bin_slop: 0.1 + delta_gamma: 0.02 + do_pos_pos: true + do_shear_shear: true + do_shear_pos: true + flip_g2: true # use true when using metacal shears + min_sep: 2.5 + max_sep: 60.0 + nbins: 10 + verbose: 0 + subtract_mean_shear: true + +TXGammaTBrightStars: {} + +TXGammaTDimStars: {} + +TXGammaTRandoms: {} + +TXGammaTFieldCenters: {} + +TXBlinding: + seed: 1972 ## seed uniquely specifies the shift in parameters + Omega_b: [0.0485, 0.001] ## fiducial_model_value, shift_sigma + Omega_c: [0.2545, 0.01] + w0: [-1.0, 0.1] + h: [0.682, 0.02] + sigma8: [0.801, 0.01] + n_s: [0.971, 0.03] + b0: 0.95 ## we use bias of the form b0/g + delete_unblinded: true + +TXSourceDiagnosticPlots: + nbins: 20 + g_min: -0.01 + g_max: 0.01 + psfT_min: 0.2 + psfT_max: 0.28 + T_min: 0.01 + T_max: 2.1 + s2n_min: 1.25 + s2n_max: 300 + +TXDiagnosticMaps: + sparse: true # Generate sparse maps - faster if using small areas + snr_threshold: 10.0 + snr_delta: 1.0 + # pixelization: gnomonic + pixel_size: 0.2 + ra_cent: 62. + dec_cent: -35. + npix_x: 60 + npix_y: 60 + depth_cut: 23.0 + +TXSourceMaps: + sparse: true # Generate sparse maps - faster if using small areas + +TXLensMaps: + sparse: true # Generate sparse maps - faster if using small areas + +# For redmagic mapping +TXExternalLensMaps: + chunk_rows: 100000 + sparse: true + pixelization: healpix + nside: 512 + + + +TXAuxiliarySourceMaps: + flag_exponent_max: 8 + +TXAuxiliaryLensMaps: + flag_exponent_max: 8 + bright_obj_threshold: 22.0 # The magnitude threshold for a object to be counted as bright + depth_band: i + snr_threshold: 10.0 # The S/N value to generate maps for (e.g. 5 for 5-sigma depth) + snr_delta: 1.0 # The range threshold +/- delta is used for finding objects at the boundary + +TXRealGaussianCovariance: + min_sep: 2.5 + max_sep: 60. + nbins: 10 + pickled_wigner_transform: data/example/inputs/wigner.pkl + + +TXJackknifeCenters: + npatch: 5 + + +TXTwoPointFourier: + flip_g2: true + bandwidth: 100 + +TXSimpleMask: + depth_cut: 23.5 + bright_object_max: 10.0 + +TXMapCorrelations: + supreme_path_root: data/example/inputs/supreme + outlier_fraction: 0.05 + nbin: 20 + +TXPhotozPlotSource: + name: TXPhotozPlotSource +TXPhotozPlotLens: + name: TXPhotozPlotLens +TXTruePhotozStackSource: + name: TXTruePhotozStackSource + weight_col: metacal/weight + redshift_group: metacal + zmax: 2.0 + nz: 201 diff --git a/examples/desi_dr1_stage3/pipeline.yml b/examples/desi_dr1_stage3/pipeline.yml new file mode 100644 index 000000000..27b0b4844 --- /dev/null +++ b/examples/desi_dr1_stage3/pipeline.yml @@ -0,0 +1,173 @@ +# Stages to run +stages: + # - name: TXIngestDataPreview2 + # - name: TXCutShearCatalog + # aliases: + # catalog: shear_catalog + # cut_catalog: cut_shear_catalog + # - name: TXSourceSelectorMetacal + # - name: TXSourceSelectorMetadetect + # aliases: + # shear_catalog: cut_shear_catalog + # - name: TXSourceTomography + # - name: TXSourceTomography + # - name: TXShearCalibration + # aliases: + # shear_catalog: cut_shear_catalog + # - name: TXCustomMask + - name: TXIngestDESIRandoms + # - name: PZRailSummarizeSource # Run the DIR method on the lens sample to find n(z) + # classname: PZRailSummarize + # aliases: + # tomography_catalog: shear_tomography_catalog + # binned_catalog: binned_shear_catalog + # model: source_photoz_model + # photoz_stack: shear_photoz_stack + # photoz_realizations: source_photoz_realizations + # - name: TXExternalLensCatalogSplitter + # - name: TXStarCatalogSplitter + # - name: TXTruePhotozStackSource + # classname: TXTruePhotozStack + # aliases: + # tomography_catalog: shear_tomography_catalog + # catalog: shear_catalog + # weights_catalog: shear_catalog + # photoz_stack: shear_photoz_stack + # - name: TXIngestRedmagic + - name: TXDESIJointMask + - name: TXDESISelector + - name: TXIngestDESI + - name: TXTracerMetadata + # aliases: + # shear_catalog: cut_shear_catalog + # - name: TXCutCatalog + # aliases: + # catalog: shear_catalog + # cut_catalog: cut_shear_catalog + # - name: TXSourceMaps + # - name: TXExternalLensMaps + # - name: TXAuxiliarySourceMaps + # - name: TXAuxiliaryLensMaps + # - name: TXSimpleMask + # - name: TXLSSWeightsUnit + # - name: TXDensityMaps + # - name: TXMapPlots + # - name: TXRandomCat + - name: TXJackknifeCenters + # - name: TXTwoPoint + # threads_per_process: 2 + - name: TXDeltaSigma + nprocess: 4 + threads_per_process: 1 + - name: TXDeltaSigmaPlots + # - name: TXBlinding # Blind the data following Muir et al + # threads_per_process: 2 + # - name: TXTwoPointTheoryReal # compute theory using CCL to save in sacc file and plot later + - name: TXPhotozPlotSource # Plot the bin n(z) + classname: TXPhotozPlot + aliases: + photoz_stack: shear_photoz_stack + nz_plot: nz_source + - name: TXPhotozPlotLens # Plot the bin n(z) + classname: TXPhotozPlot + aliases: + photoz_stack: lens_photoz_stack + nz_plot: nz_lens + # - name: TXDiagnosticQuantiles + # - name: TXSourceDiagnosticPlots # Make a suite of diagnostic plots + # - name: TXGammaTFieldCenters # Compute and plot gamma_t around center points + # threads_per_process: 2 + # - name: TXGammaTRandoms # Compute and plot gamma_t around randoms + # threads_per_process: 2 + # - name: TXGammaTStars # Compute and plot gamma_t around dim stars + # threads_per_process: 2 + # - name: TXRoweStatistics # Compute and plot Rowe statistics + # threads_per_process: 2 + # - name: TXPSFDiagnostics # Compute and plots other PSF diagnostics + # - name: TXBrighterFatterPlot # Make plots tracking the brighter-fatter effect + # - name: TXSourceNoiseMaps + # - name: TXExternalLensNoiseMaps + # - name: TXConvergenceMaps # Make convergence kappa maps from g1, g2 maps + # - name: TXConvergenceMapPlots # Plot the convergence map + # - name: TXTwoPointPlots + # - name: TXTwoPointFourier + # threads_per_process: 2 + # - name: TXRealGaussianCovariance # Compute covariance of real-space correlations + +# Where to put outputs +output_dir: data/example/outputs_desi_dr1_stage3 + +# How to run the pipeline: mini, parsl, or cwl +launcher: + name: mini + interval: 1.0 + +# Where to run the pipeline: cori-interactive, cori-batch, or local +site: + name: local + max_threads: 2 + +# modules and packages to import that have pipeline +# stages defined in them +modules: txpipe + +# where to find any modules that are not in this repo, +# and any other code we need. +python_paths: + - submodules/WLMassMap/python/desc/ + +# configuration settings +config: examples/desi_dr1_stage3/config.yml + +# On NERSC, set this before running: +# export DATA=${LSST}/groups/WL/users/zuntz/data/metacal-testbed + +inputs: + # See README for paths to download these files + # shear_catalog: /scratch/gpfs/AAMON/desc_analysis/TXPipe/data/shear_catalog_desy3_unmasked_withfakez_v2.h5 + # shear_tomography_classifier: none + shear_catalog: none #/scratch/gpfs/AAMON/desc_analysis/catalogs/metacal_desy3.hdf5 + shear_tomography_catalog: none + #binned_shear_catalog: /scratch/gpfs/AAMON/desc_analysis/catalogs/cut_binned_metacal_desy3.hdf5 + #shear_photoz_stack: /scratch/gpfs/AAMON/desc_analysis/catalogs/shear_photoz_stack_metacal_desy3.hdf5 + binned_shear_catalog: + name: desy3_binned_shear_catalog + shear_photoz_stack: + name: desy3_shear_photoz_stack + shear_mask: + name: desy3_shear_mask + desi_mask: + name: desi_dr1_mask + desi_catalog: + name: desi_dr1_catalog + desi_random_catalog: + name: desi_dr1_random_catalog + + # photometry_catalog: data/example/inputs/photometry_catalog.hdf5 + # photoz_trained_model: data/example/inputs/cosmoDC2_trees_i25.3.npy + # exposures: data/example/inputs/exposures.hdf5 + # star_catalog: data/example/inputs/star_catalog.hdf5 + # desi_random_catalog: /scratch/gpfs/AAMON/desc_analysis/catalogs/BGS_ANY_SGC_0_clustering.ran.fits + # random_cats: /scratch/gpfs/AAMON/desc_analysis/catalogs/random_cats.hdf5 + # binned_random_catalog: /scratch/gpfs/AAMON/desc_analysis/catalogs/binned_randoms_bin0.hdf5 + # binned_random_catalog_sub: /scratch/gpfs/AAMON/desc_analysis/catalogs/random_cats.hdf5 + + spectroscopic_catalog: None + # desi_catalog: /scratch/gpfs/AAMON/desc_analysis/catalogs/BGS_ANY_SGC_clustering.dat.fits + # mask should be already a combination of source and lens. + # desi_mask: /scratch/gpfs/AAMON/desc_analysis/catalogs/mask_BGS_ANY_SGC_clustering.h5 + # shear_mask: /scratch/gpfs/AAMON/desc_analysis/catalogs/desy3_mask.h5 + # This file comes with the code + fiducial_cosmology: data/fiducial_cosmology.yml + # spectroscopic_catalog: data/example/inputs/mock_spectroscopic_catalog.hdf5 + +# if supported by the launcher, restart the pipeline where it left off +# if interrupted +resume: true +# where to put output logs for individual stages +log_dir: data/example/logs_desi_dr1_stage3 +# where to put an overall parsl pipeline log +pipeline_log: data/example/log_desi_dr1_stage3.txt +registry: + owner_type: project + owner: wlss-dp2-desi-delta-sigma \ No newline at end of file diff --git a/examples/desi_mocks/config.yml b/examples/desi_mocks/config.yml new file mode 100644 index 000000000..8cb142a0b --- /dev/null +++ b/examples/desi_mocks/config.yml @@ -0,0 +1,253 @@ +# Values in this section are accessible to all the different stages. +# They can be overridden by individual stages though. +global: + # This is read by many stages that read complete + # catalog data, and tells them how many rows to read + # at once + chunk_rows: 100000 + # These mapping options are also read by a range of stages + pixelization: healpix + nside: 512 + sparse: true # Generate sparse maps - faster if using small areas + +TXGCRTwoCatalogInput: + metacal_dir: /global/cscratch1/sd/desc/DC2/data/Run2.2i/dpdd/Run2.2i-t3828/metacal_table_summary + photo_dir: /global/cscratch1/sd/desc/DC2/data/Run2.2i/dpdd/Run2.2i-t3828/object_table_summary + +TXMetacalGCRInput: + cat_name: dc2_object_run2.1i_dr1b_with_metacal_griz + +TXExposureInfo: + dc2_name: 1.2p + +TXShearCalibration: {} + +TXCosmoDC2Mock: + cat_name: cosmoDC2_v1.1.4_image + visits_per_band: 16 + extra_cols: redshift_true size_true shear_1 shear_2 + flip_g2: true # to match metacal + +TXIngestRedmagic: + lens_zbin_edges: [0.1, 0.3, 0.5] + +TXIngestDESI: + mock: true + lens_zbin_edges: [0.0, 3.0] + dz: 0.001 + +TXDESIMockSelector: + bands: ugrizy + ra_col: ra + dec_col: dec + z_col: redshift + w_col: weight + apply_mask: true + zmin: 0.5 + + +PZPDFMLZ: + nz: 301 + zmax: 3.0 + + +PZRailTrainSource: + class_name: BPZ_lite + zmin: 0.0 + zmax: 3.0 + dz: 0.01 + columns_file: ./data/bpz_riz.columns + data_path: ./data/example/rail-bpz-inputs + spectra_file: CWWSB4.list + prior_band: i + # Not sure about this + prior_file: hdfn_gen + p_min: 0.005 + gauss_kernel: 0.0 + mag_err_min: 0.005 + inform_options: {save_train: false, load_model: false, modelfile: BPZpriormodel.out} + madau_reddening: no + bands: riz + zp_errors: [0.01, 0.01, 0.01] + +PZRailEstimateSource: + convert_unseen: true # needed for BPZ + + +# Mock version of stacking: +TXSourceTrueNumberDensity: + nz: 301 + zmax: 3.0 + +# Mock version of stacking: +TXTrueNumberDensity: + nz: 301 + zmax: 3.0 + +TXSourceSelectorMetacal: + input_pz: false + true_z: true + bands: riz #used for selection + T_cut: 0.5 + s2n_cut: 10.0 + max_rows: 1000 + delta_gamma: 0.02 + source_zbin_edges: [0.5, 0.7, 0.9, 1.1, 2.0] + +TXSourceTomography: + bands: riz #used for selection + source_zbin_edges: [0.5, 0.7, 0.9, 1.1, 2.0] + +TXTruthLensSelector: + # Mag cuts + input_pz: false + true_z: true + lens_zbin_edges: [0.1, 0.3, 0.5] + cperp_cut: 0.2 + r_cpar_cut: 13.5 + r_lo_cut: 16.0 + r_hi_cut: 21.6 + i_lo_cut: 17.5 + i_hi_cut: 21.9 + r_i_cut: 2.0 + # may also need one for r_cpar_cut + +TXMeanLensSelector: + # Mag cuts + lens_zbin_edges: [0.0, 0.2, 0.4] + cperp_cut: 0.2 + r_cpar_cut: 13.5 + r_lo_cut: 16.0 + r_hi_cut: 21.6 + i_lo_cut: 17.5 + i_hi_cut: 21.9 + r_i_cut: 2.0 + +TXModeLensSelector: + # Mag cuts + lens_zbin_edges: [0.0, 0.2, 0.4] + cperp_cut: 0.2 + r_cpar_cut: 13.5 + r_lo_cut: 16.0 + r_hi_cut: 21.6 + i_lo_cut: 17.5 + i_hi_cut: 21.9 + r_i_cut: 2.0 + +TXRandomCat: + density: 10 # gals per sq arcmin + +TXTwoPoint: + bin_slop: 0.1 + delta_gamma: 0.02 + do_pos_pos: true + do_shear_shear: true + do_shear_pos: true + flip_g2: true # use true when using metacal shears + min_sep: 2.5 + max_sep: 60.0 + nbins: 10 + verbose: 0 + subtract_mean_shear: true + +TXGammaTBrightStars: {} + +TXGammaTDimStars: {} + +TXGammaTRandoms: {} + +TXGammaTFieldCenters: {} + +TXBlinding: + seed: 1972 ## seed uniquely specifies the shift in parameters + Omega_b: [0.0485, 0.001] ## fiducial_model_value, shift_sigma + Omega_c: [0.2545, 0.01] + w0: [-1.0, 0.1] + h: [0.682, 0.02] + sigma8: [0.801, 0.01] + n_s: [0.971, 0.03] + b0: 0.95 ## we use bias of the form b0/g + delete_unblinded: true + +TXSourceDiagnosticPlots: + nbins: 20 + g_min: -0.01 + g_max: 0.01 + psfT_min: 0.2 + psfT_max: 0.28 + T_min: 0.01 + T_max: 2.1 + s2n_min: 1.25 + s2n_max: 300 + +TXDiagnosticMaps: + sparse: true # Generate sparse maps - faster if using small areas + snr_threshold: 10.0 + snr_delta: 1.0 + # pixelization: gnomonic + pixel_size: 0.2 + ra_cent: 62. + dec_cent: -35. + npix_x: 60 + npix_y: 60 + depth_cut: 23.0 + +TXSourceMaps: + sparse: true # Generate sparse maps - faster if using small areas + +TXLensMaps: + sparse: true # Generate sparse maps - faster if using small areas + +# For redmagic mapping +TXExternalLensMaps: + chunk_rows: 100000 + sparse: true + pixelization: healpix + nside: 512 + + + +TXAuxiliarySourceMaps: + flag_exponent_max: 8 + +TXAuxiliaryLensMaps: + flag_exponent_max: 8 + bright_obj_threshold: 22.0 # The magnitude threshold for a object to be counted as bright + depth_band: i + snr_threshold: 10.0 # The S/N value to generate maps for (e.g. 5 for 5-sigma depth) + snr_delta: 1.0 # The range threshold +/- delta is used for finding objects at the boundary + +TXRealGaussianCovariance: + min_sep: 2.5 + max_sep: 60. + nbins: 10 + pickled_wigner_transform: data/example/inputs/wigner.pkl + + +TXJackknifeCenters: + npatch: 5 + + +TXTwoPointFourier: + flip_g2: true + bandwidth: 100 + +TXSimpleMask: + depth_cut: 23.5 + bright_object_max: 10.0 + +TXMapCorrelations: + supreme_path_root: data/example/inputs/supreme + outlier_fraction: 0.05 + nbin: 20 + +TXPhotozPlotSource: + name: TXPhotozPlotSource +TXPhotozPlotLens: + name: TXPhotozPlotLens +TXTruePhotozStackSource: + name: TXTruePhotozStackSource + weight_col: metacal/weight + redshift_group: metacal + zmax: 2.0 + nz: 201 diff --git a/examples/desi_mocks/pipeline.yml b/examples/desi_mocks/pipeline.yml new file mode 100644 index 000000000..6e4422361 --- /dev/null +++ b/examples/desi_mocks/pipeline.yml @@ -0,0 +1,113 @@ +# Stages to run +stages: + # - name: TXSourceSelectorMetacal + # - name: TXSourceTomography + # - name: TXShearCalibration + # - name: TXExternalLensCatalogSplitter + # - name: TXStarCatalogSplitter + # - name: TXTruePhotozStackSource + # classname: TXTruePhotozStack + # aliases: + # tomography_catalog: shear_tomography_catalog + # catalog: shear_catalog + # weights_catalog: shear_catalog + # photoz_stack: shear_photoz_stack + # - name: TXIngestRedmagic + - name: TXDESIMockSelector + - name: TXIngestDESI + # - name: TXTracerMetadata + # - name: TXSourceMaps + # - name: TXExternalLensMaps + # - name: TXAuxiliarySourceMaps + # - name: TXAuxiliaryLensMaps + # - name: TXSimpleMask + # - name: TXLSSWeightsUnit + # - name: TXDensityMaps + # - name: TXMapPlots + # - name: TXRandomCat + # - name: TXJackknifeCenters + # - name: TXTwoPoint + # threads_per_process: 2 + # - name: TXBlinding # Blind the data following Muir et al + # threads_per_process: 2 + # - name: TXTwoPointTheoryReal # compute theory using CCL to save in sacc file and plot later + # - name: TXPhotozPlotSource # Plot the bin n(z) + # classname: TXPhotozPlot + # aliases: + # photoz_stack: shear_photoz_stack + # nz_plot: nz_source + - name: TXPhotozPlotLens # Plot the bin n(z) + classname: TXPhotozPlot + aliases: + photoz_stack: lens_photoz_stack + nz_plot: nz_lens + # - name: TXDiagnosticQuantiles + # - name: TXSourceDiagnosticPlots # Make a suite of diagnostic plots + # - name: TXGammaTFieldCenters # Compute and plot gamma_t around center points + # threads_per_process: 2 + # - name: TXGammaTRandoms # Compute and plot gamma_t around randoms + # threads_per_process: 2 + # - name: TXGammaTStars # Compute and plot gamma_t around dim stars + # threads_per_process: 2 + # - name: TXRoweStatistics # Compute and plot Rowe statistics + # threads_per_process: 2 + # - name: TXPSFDiagnostics # Compute and plots other PSF diagnostics + # - name: TXBrighterFatterPlot # Make plots tracking the brighter-fatter effect + # - name: TXSourceNoiseMaps + # - name: TXExternalLensNoiseMaps + # - name: TXConvergenceMaps # Make convergence kappa maps from g1, g2 maps + # - name: TXConvergenceMapPlots # Plot the convergence map + # - name: TXTwoPointPlots + # - name: TXTwoPointFourier + # threads_per_process: 2 + # - name: TXRealGaussianCovariance # Compute covariance of real-space correlations + +# Where to put outputs +output_dir: data/example/outputs_desi_mocks + +# How to run the pipeline: mini, parsl, or cwl +launcher: + name: mini + interval: 1.0 + +# Where to run the pipeline: cori-interactive, cori-batch, or local +site: + name: local + max_threads: 2 + +# modules and packages to import that have pipeline +# stages defined in them +modules: txpipe + +# where to find any modules that are not in this repo, +# and any other code we need. +python_paths: + - submodules/WLMassMap/python/desc/ + +# configuration settings +config: examples/desi_mocks/config.yml + +# On NERSC, set this before running: +# export DATA=${LSST}/groups/WL/users/zuntz/data/metacal-testbed + +inputs: + # See README for paths to download these files + # shear_catalog: data/example/inputs/shear_catalog.hdf5 + # photometry_catalog: data/example/inputs/photometry_catalog.hdf5 + # photoz_trained_model: data/example/inputs/cosmoDC2_trees_i25.3.npy + # exposures: data/example/inputs/exposures.hdf5 + # star_catalog: data/example/inputs/star_catalog.hdf5 + desi_catalog: /scratch/gpfs/AAMON/desc_analysis/catalogs/output_select_lsst_obs_cond_dp2_DESI_ELG_flagship.pq + # mask should be already a combination of source and lens. + mask: /scratch/gpfs/AAMON/desc_analysis/catalogs/dp2_23p5_combined_mask_nmin=500_nside=512.hdf5 # /scratch/gpfs/AAMON/desc_analysis/catalogs/mask_eg.fits + # This file comes with the code + # fiducial_cosmology: data/fiducial_cosmology.yml + # spectroscopic_catalog: data/example/inputs/mock_spectroscopic_catalog.hdf5 + +# if supported by the launcher, restart the pipeline where it left off +# if interrupted +resume: true +# where to put output logs for individual stages +log_dir: data/example/logs_desi_mocks +# where to put an overall parsl pipeline log +pipeline_log: data/example/log_desi_mocks.txt \ No newline at end of file diff --git a/examples/metadetect/config.yml b/examples/metadetect/config.yml index 698f54b46..8c80beda0 100755 --- a/examples/metadetect/config.yml +++ b/examples/metadetect/config.yml @@ -396,8 +396,8 @@ PZRealizationsPlotLens: photoz_realizations_plot: lens_photoz_realizations_plot TXShearCalibration: - add_fiducial_distance: True - copy_redshift: True + add_fiducial_distance: true + copy_redshift: true redshift_name: '00/redshift_true' TXTwoPointSCIA: diff --git a/examples/metadetect/pipeline.yml b/examples/metadetect/pipeline.yml index 7907273b1..1630ced02 100644 --- a/examples/metadetect/pipeline.yml +++ b/examples/metadetect/pipeline.yml @@ -122,6 +122,8 @@ stages: - name: HOSFSB # Disabled since not yet synchronised with new Treecorr MPI - name: TXTwoPointSCIA # Self-calibration intrinsic alignments of galaxies + - name: TXDeltaSigma + - name: TXDeltaSigmaPlots # Disabling these as they takes too long for a quick test. # The latter two need NaMaster diff --git a/txpipe/__init__.py b/txpipe/__init__.py index 8164ef4d5..91c090c4e 100644 --- a/txpipe/__init__.py +++ b/txpipe/__init__.py @@ -11,7 +11,7 @@ TXSourceSelectorLensfit, TXSourceSelectorMetadetect, ) -from .lens_selector import TXMeanLensSelector +from .lens_selector import TXMeanLensSelector, TXDESISelector, TXDESIMockSelector from .photoz_stack import TXPhotozStack, TXPhotozPlot, TXTruePhotozStack from .random_cats import TXRandomCat from .twopoint_fourier import TXTwoPointFourier @@ -41,7 +41,7 @@ from .simulation import TXLogNormalGlass from .magnification import TXSSIMagnification from .covariance_nmt import TXFourierNamasterCovariance, TXRealNamasterCovariance - +from .delta_sigma import TXDeltaSigma # We no longer import all the extensions automatically here to avoid # some dependency problems when running under the LSST environment on NERSC. # You can still import them explicitly in your pipeline scripts by doing: diff --git a/txpipe/data_types.py b/txpipe/data_types.py index 1d7e33f03..dca816f98 100755 --- a/txpipe/data_types.py +++ b/txpipe/data_types.py @@ -705,11 +705,55 @@ def read_provenance(self): return provenance + @classmethod + def add_metadata(cls, sacc_object, provenance, meta): + # We also save the associated metadata to the file + for k, v in meta.items(): + if np.isscalar(v): + sacc_object.metadata[k] = v + else: + for i, vi in enumerate(v): + sacc_object.metadata[f"{k}_{i}"] = vi + + # Add provenance metadata. In managed formats this is done + # automatically, but because the Sacc library is external + # we do it manually here. + provenance.update(SACCFile.generate_provenance()) + for key, value in provenance.items(): + if isinstance(value, str) and "\n" in value: + values = value.split("\n") + for i, v in enumerate(values): + sacc_object.metadata[f"provenance/{key}_{i}"] = v + else: + sacc_object.metadata[f"provenance/{key}"] = value + + def close(self): pass class FiducialCosmology(YamlFile): + + def to_astropy(self): + from astropy.cosmology import w0waCDM + + with open(self.path, "r") as fp: + params = yaml.load(fp, Loader=yaml.Loader) + + astropy_params = dict( + H0=params["H0"], + Om0=params["Omega_m"], + Ode0=1 - params["Omega_k"] - params["Omega_m"], + w0=params["w0"], + wa=params["wa"], + Neff=params["Neff"], + m_nu=params["sum_nu_masses"] + + ) + + cosmo = w0waCDM(**astropy_params) + + return cosmo # TODO replace when CCL has more complete serialization tools. def to_ccl(self, **kwargs): import pyccl as ccl diff --git a/txpipe/delta_sigma.py b/txpipe/delta_sigma.py new file mode 100644 index 000000000..61bbab265 --- /dev/null +++ b/txpipe/delta_sigma.py @@ -0,0 +1,471 @@ +from .base_stage import PipelineStage +from .data_types import SACCFile, ShearCatalog, HDFFile, QPNOfZFile, FiducialCosmology, TextFile, PNGFile +import numpy as np +from ceci.config import StageParameter +import os + + +class TXDeltaSigma(PipelineStage): + """Compute Delta-Sigma, the excess surface density around lenses. + This version uses the dSigma code. + """ + + name = "TXDeltaSigma" + + inputs = [ + ("binned_shear_catalog", ShearCatalog), + ("binned_lens_catalog", HDFFile), + # we use both the binned randoms for the case where we split + # the lens catalog tomographically and the full version for + # when we do the 2D stack of all the lenses together + ("binned_random_catalog", HDFFile), + ("random_cats", HDFFile), + ("shear_photoz_stack", QPNOfZFile), + ("lens_photoz_stack", QPNOfZFile), + # ("tracer_metadata", HDFFile), + ("fiducial_cosmology", FiducialCosmology), + ] + outputs = [("delta_sigma", SACCFile)] + + config_options = { + "source_bins": StageParameter(list, [-1], msg="List of source bins to use (-1 means all)"), + "lens_bins": StageParameter(list, [-1], msg="List of lens bins to use (-1 means all)"), + "r_min": StageParameter(float, 0.1, msg="Minimum radius to use in Mpc"), + "r_max": StageParameter(float, 10.0, msg="Maximum radius to use in Mpc"), + "nbins": StageParameter(int, 10, msg="Number of radial bins"), + "photoz": StageParameter(bool, False, msg="Whether or not objects have point photo-z estimate"), + "lens_source_sep": StageParameter(float, 0.1, msg="Minimum redshift separation between lens and source bins to use for dSigma measurement"), + "lower_bin_edge": StageParameter(list, [0.0, 0.358, 0.631, 0.872], msg="Lower edge of the source redshift bins to use for dSigma measurement, only used if photoz=True"), + "source_cat_w_col": StageParameter(str, "weight", msg="Source catalog weight column name."), + "lens_cat_w_col": StageParameter(str, "weight", msg="Lens catalog weight column name."), + "n_jobs": StageParameter(int, 1, msg="Number of multiprocessing processes to use"), + "n_jackknife": StageParameter(int, 0, msg="Number of jackknife regions to use for error estimation, 0 means no jackknife"), + "subsample": StageParameter(float, 0.0, msg="If set to non-zero, randomly subsample this fraction of the all the data sets for testing."), + } + + def run(self): + import dsigma + import sacc + + bin_pairs = self.get_bin_pairs() + print(bin_pairs) + source_bin = 3 + lens_bin = 0 + + with self.open_input("fiducial_cosmology", wrapper=True) as cosmo_file: + cosmo = cosmo_file.to_astropy() + source_n_of_z = self.load_redshift_distribution("shear_photoz_stack") + + # The lens n_of_z is only used when saving at the end + lens_n_of_z = self.load_redshift_distribution("lens_photoz_stack") + + bins = np.logspace(np.log10(self.config["r_min"]), np.log10(self.config["r_max"]), self.config["nbins"]) + + results = [] + last_source_bin = None + + # suppress numpy division warnings. Real scientists divide by zero + # all the time. + numpy_error_settings = np.seterr(divide="ignore", invalid="ignore") + + for source_bin, lens_bin in self.split_tasks_by_rank(bin_pairs): + # Load the new source bin if changed + if source_bin != last_source_bin: + last_source_bin = source_bin + source_table = self.load_source_table(source_bin) + + # Always reload the lens bins because we will add some + # columns to them in-place in a minute. + lens_table = self.load_lens_table(lens_bin) + randoms_table = self.load_random_table(lens_bin) + + if self.config["photoz"]: + # if we do not have point photo-z estimates for the sources then we use the lower edge of the tomographic bins to make z column + source_table['z'] = np.array(self.config["lower_bin_edge"])[source_table['z_bin']] + + + # Add columns to two tables in-place to do most of the pre-computation work. + # We should look at using the n_jobs multiprocessing option here + # but I don't know if it will play well with MPI on NERSC that we are + # using to split the bin pairs across ranks + print( + f"Computing excess surface density for source = {source_bin}, lens = {lens_bin}, " + f"with {len(source_table)} sources, {len(lens_table)} lenses, {len(randoms_table)} randoms" + ) + progress_bar = self.rank == 0 + n_jobs = self.config["n_jobs"] + + dsigma.precompute.precompute(lens_table, source_table, table_n=source_n_of_z, bins=bins, cosmology=cosmo, progress_bar=progress_bar, n_jobs=n_jobs)#, lens_source_cut=self.config["lens_source_sep"]) + dsigma.precompute.precompute(randoms_table, source_table, table_n=source_n_of_z, bins=bins, cosmology=cosmo, progress_bar=progress_bar, n_jobs=n_jobs)#, lens_source_cut=self.config["lens_source_sep"]) + + + # stack to get the excess surface density Delta(Sigma) + result = dsigma.stacking.excess_surface_density( + lens_table, table_r=randoms_table, random_subtraction=True, boost_correction=True, return_table=True + ) + + # Optionally get the jackknife covariance. + # dSigma does this for us too. The stacking is the relatively + # fast bit so hopefully this is quite fast. + njack = self.config["n_jackknife"] + if njack > 0: + centers = dsigma.jackknife.compute_jackknife_fields(lens_table, njack) + dsigma.jackknife.compute_jackknife_fields(randoms_table, centers) + + # Get the covariance + delta_sigma_cov = dsigma.jackknife.jackknife_resampling( + dsigma.stacking.excess_surface_density, + lens_table, + randoms_table + ) + else: + delta_sigma_cov = None + + + results.append((source_bin, lens_bin, result, delta_sigma_cov)) + + # restore numpy settings + np.seterr(**numpy_error_settings) + + # Collate results from different ranks and save to SACC file + self.save_results(results, source_n_of_z, lens_n_of_z) + + def get_bin_pairs(self): + """ + Determine the list of (source, lens) bin pairs to measure. + + Returns + ------- + bin_pairs: list of tuples + A list of (source_bin, lens_bin) pairs to measure. + """ + + # Get the number of bins of each type from the input files. + with self.open_input("binned_shear_catalog") as shear_file: + nbin_source = shear_file["shear"].attrs["nbin_source"] + with self.open_input("binned_lens_catalog") as lens_file: + nbin_lens = lens_file["lens"].attrs["nbin_lens"] + + # User can override the list of bins to do in the options. + # but if they do not specify then we use everything + source_bins = self.config["source_bins"] + lens_bins = self.config["lens_bins"] + + if source_bins == [-1]: + source_bins = list(range(nbin_source)) + source_bins.append("all") + + if lens_bins == [-1]: + lens_bins = list(range(nbin_lens)) + lens_bins.append("all") + + # Collect all bin pairs together. + # This is everything to be done by all processes. + bin_pairs = [] + for s in source_bins: + for l in lens_bins: + bin_pairs.append((s, l)) + return bin_pairs + + def load_table(self, group, names): + """ + Helper function to read tables from HDF5 into the astropy + format that dSigma expects, with the right column names. + + Parameters + ---------- + group: h5py.Group + The group in the HDF5 file where the data is stored + names: dict + A mapping from the column names that dSigma expects to the column names in TXPipe. + """ + from astropy.table import Table + + table = Table() + for dsigma_name, txpipe_name in names.items(): + table[dsigma_name] = group[txpipe_name][:] + + # Optionally cut down to a random subsample. + # Faster for testing + frac = self.config["subsample"] + if frac > 0: + p = np.random.binomial(1, frac, size=len(table)).astype(bool) + table = table[p] + return table + + def load_source_table(self, bin_index): + """ + Load the lens table for a given bin index + as an astropy table with the columns we need for dSigma. + + Parameters + ---------- + bin_index: int or str + The index of the source bin to load, or "all" for the 2D stack of all the objects + """ + names = { + "ra": "ra", + "dec": "dec", + "z": "z", + "w": self.config["source_cat_w_col"], + "e_1": "g1", + "e_2": "g2", + } + with self.open_input("binned_shear_catalog") as shear_file: + group = shear_file[f"shear/bin_{bin_index}"] + if self.config["photoz"]: + del names["z"] + table = self.load_table(group, names) + nbin = shear_file["shear"].attrs["nbin_source"] + + if bin_index == "all": + table["z_bin"] = np.repeat(nbin, len(table)) + else: + table["z_bin"] = np.repeat(bin_index, len(table)) + + return table + + def load_redshift_distribution(self, file_tag): + """ + Load the redshift distribution for a given source bin index as a tuple of (z, n(z)). + + dSigma wants all the redshift distributions for all the source bins together in a single table, + so we load them all and check they are consistent. + + Parameters + ---------- + file_tag: str + The tag for the input file containing the redshift distribution + + Returns + ------- + table: astropy.table.Table + A table with columns 'z' and 'n' + """ + from astropy.table import Table + + zs = [] + ns = [] + with self.open_input(file_tag, wrapper=True) as f: + nbin = f.get_nbin() + for i in range(nbin): + z, n_of_z = f.get_bin_n_of_z(i) + zs.append(z) + ns.append(n_of_z) + z, n_of_z = f.get_2d_n_of_z() + zs.append(z) + ns.append(n_of_z) + + # check all the z arrays are the same + for i in range(1, len(zs)): + if not np.allclose(zs[i], zs[0]): + raise ValueError("Z arrays for different bins are not the same, cannot use for dSigma") + # stack the n_of_z arrays into a 2D array of shape (nz, nbin) + n_of_z = np.stack(ns, axis=0).T # shape (nz, nbin) + + table = Table({"z": z, "n": n_of_z}) + return table + + def load_lens_table(self, bin_index): + """ + Load the lens table for a given bin index + as an astropy table with the columns we need for dSigma. + + Parameters + ---------- + bin_index: int or str + The index of the lens bin to load, or "all" for the 2D stack of all the objects + + Returns + ------- + table: astropy.table.Table + A table in the format that dSigma expects for lens samples + """ + names = { + "ra": "ra", + "dec": "dec", + "z": "z", + "w_sys": self.config["lens_cat_w_col"], + } + with self.open_input("binned_lens_catalog") as lens_file: + group = lens_file[f"lens/bin_{bin_index}"] + table = self.load_table(group, names) + + return table + + def load_random_table(self, bin_index): + """ + Load the randoms table for a given bin index + as an astropy table with the columns we need for dSigma. + + Parameters + ---------- + bin_index: int or str + The index of the randoms bin to load, or "all" for the 2D stack of all the objects + + Returns + ------- + table: astropy.table.Table + A table in the format that dSigma expects for lens samples + """ + names = { + "ra": "ra", + "dec": "dec", + "z": "z", + } + + if bin_index == "all": + with self.open_input("random_cats") as random_file: + group = random_file["randoms"] + table = self.load_table(group, names) + else: + with self.open_input("binned_random_catalog") as random_file: + group = random_file[f"randoms/bin_{bin_index}"] + table = self.load_table(group, names) + + # randoms should be constructed to have unit weights + for col in list(table.colnames): + table[col] = table[col].astype(np.float64) + table["w_sys"] = np.ones(len(table)) + + return table + + def save_results(self, results, shear_photoz_stack, lens_photoz_stack): + import sacc + + if self.comm is not None: + # Gather results from all ranks to the root rank + results = self.comm.gather(results, root=0) + # flatten back into a single list. I always forget this + if self.rank == 0: + results = [r for sublist in results for r in sublist] + + # Only the root process saves the output file. + if self.rank != 0: + return + + # sort by the bin pairs to put the results in a deterministic order + results = sorted(results, key=lambda x: (str(x[0]), str(x[1]))) + + s = sacc.Sacc() + + # Create tracers for the source sample + # as SACC objects + z = shear_photoz_stack["z"] + Nz = shear_photoz_stack["n"] + nbin_source = Nz.shape[1] + + for i in range(nbin_source): + if i == nbin_source - 1: + i = "all" + s.add_tracer("NZ", f"source_{i}", z, Nz) + + # Create tracers for the lens sample + # as SACC objects + z = lens_photoz_stack["z"] + Nz = lens_photoz_stack["n"] + nbin_lens = Nz.shape[1] + for i in range(nbin_lens): + if i == nbin_lens - 1: + i = "all" + + s.add_tracer("NZ", f"lens_{i}", z, Nz) + + # for each bin pair's results, add all the + # measurements in the output data table + cov_blocks = [] + for source_bin, lens_bin, result, cov_block in results: + tracer1 = f"source_{source_bin}" + tracer2 = f"lens_{lens_bin}" + + for row in result: + # Add the data point and all the various tags + s.add_data_point( + "galaxy_shearDensity_deltasigma", + (tracer1, tracer2), + row["ds"], # Final corrected Delta(Sigma) measurement + rp=row["rp"], # radius of the bin + rp_min=row["rp_min"], # minimum radius of the bin + rp_max=row["rp_max"], # maximum radius of the bin + raw_value=row["ds_raw"], # the raw Delta(Sigma) measurement before boost correction + boost=row["b"], # the boost factor that was applied to get the final Delta(Sigma) + random_subtraction=row["ds_r"], # the contribution from the randoms that was subtracted off + n_pairs=row["n_pairs"], # the number of pairs in the bin + ) + + if cov_block is not None: + cov_blocks.append(cov_block) + + if cov_blocks: + s.add_covariance(cov_blocks) + + # Add provenance and potentially other metadata stuff. + provenance = self.gather_provenance() + other_metadata = {"nbin_source": nbin_source - 1, "nbin_lens": nbin_lens - 1} + + SACCFile.add_metadata(s, provenance, other_metadata) + output_filename = self.get_output("delta_sigma") + # switch to HDF5 backend for sacc since metadata + # handlind is better + print("Saving results to ", output_filename) + s.save_hdf5(output_filename, overwrite=True) + + +class TXDeltaSigmaPlots(PipelineStage): + """Make plots of Delta Sigma results.""" + + name = "TXDeltaSigmaPlots" + inputs = [ + ("delta_sigma", SACCFile), + ("fiducial_cosmology", FiducialCosmology), + ] + outputs = [ + ("delta_sigma_plot", PNGFile), + ] + config_options = {} + + def run(self): + import sacc + import matplotlib.pyplot as plt + + sacc_data = sacc.Sacc.load_hdf5(self.get_input("delta_sigma")) + + # Plot in theta coordinates + nbin_source = sacc_data.metadata["nbin_source"] + nbin_lens = sacc_data.metadata["nbin_lens"] + + if sacc_data.has_covariance(): + cov = sacc_data.covariance.dense + else: + cov = None + + + # Plot in r coordinates + nbin_source = sacc_data.metadata["nbin_source"] + nbin_lens = sacc_data.metadata["nbin_lens"] + with self.open_output("delta_sigma_plot", wrapper=True, figsize=(5 * nbin_lens, 4 * nbin_source)) as fig: + axes = fig.file.subplots(nbin_source, nbin_lens, squeeze=False) + for s in range(nbin_source): + for l in range(nbin_lens): + axes[s, l].set_title(f"Source {s}, Lens {l}") + axes[s, l].set_xlabel("Radius [Mpc]") + axes[s, l].set_ylabel(r"$R \cdot \Delta \Sigma [M_\odot / pc^2]$") + axes[s, l].grid() + x = sacc_data.get_tag("rp", tracers=(f"source_{s}", f"lens_{l}")) + x = np.array(x) + y = sacc_data.get_mean(tracers=(f"source_{s}", f"lens_{l}")) + raw_y = sacc_data.get_tag("raw_value", tracers=(f"source_{s}", f"lens_{l}")) + if cov is not None: + index = sacc_data.indices(tracers=(f"source_{s}", f"lens_{l}")) + cov_block = cov[index][:, index] + error = np.sqrt(np.diag(cov_block)) + axes[s, l].errorbar(x, y * x, yerr=error * x, fmt='+') + # Also plot the raw value with boost/randoms + axes[s, l].plot(x, raw_y * x, 'x') + else: + axes[s, l].plot(x, y * x, '+') + axes[s, l].plot(x, raw_y * x, 'x') + axes[s, l].set_ylim(0, None) + axes[s, l].set_xscale("log") + plt.subplots_adjust(hspace=0.05, wspace=0.05) + plt.tight_layout() diff --git a/txpipe/ingest/__init__.py b/txpipe/ingest/__init__.py index 38573dda4..abdb2892f 100644 --- a/txpipe/ingest/__init__.py +++ b/txpipe/ingest/__init__.py @@ -2,6 +2,7 @@ from .mocks import TXCosmoDC2Mock, TXGaussianSimsMock from .gcr import TXMetacalGCRInput, TXIngestStars from .redmagic import TXIngestRedmagic +from .desi import TXIngestDESI from .ssi import ( TXIngestSSIGCR, TXMatchSSI, diff --git a/txpipe/ingest/desi.py b/txpipe/ingest/desi.py new file mode 100644 index 000000000..0bc696447 --- /dev/null +++ b/txpipe/ingest/desi.py @@ -0,0 +1,321 @@ +from ..base_stage import PipelineStage +from ..data_types import HDFFile, FitsFile, QPNOfZFile, RandomsCatalog, FiducialCosmology +import numpy as np +from ceci.config import StageParameter + +class TXIngestDESIRandoms(PipelineStage): + """ + Ingest a DESI random catalog from FITS and write random_cats, + binned_random_catalog, and a subsampled binned_random_catalog_sub. + + Output format matches TXRandomCat: group "randoms" with datasets + ra, dec, z, comoving_distance, bin at the top level, and per-bin + subgroups bin_0 ... bin_N in the binned outputs. + """ + + name = "TXIngestDESIRandoms" + parallel = False + inputs = [ + ("desi_random_catalog", FitsFile), + ("fiducial_cosmology", FiducialCosmology), + ] + outputs = [ + ("random_cats", RandomsCatalog), + ("binned_random_catalog", RandomsCatalog), + ("binned_random_catalog_sub", RandomsCatalog), + ] + config_options = { + "lens_zbin_edges": StageParameter(list, [float], msg="Edges of lens redshift bins."), + "chunk_rows": StageParameter(int, 100_000, msg="Number of rows to process in each chunk."), + "sample_rate": StageParameter(float, 0.5, msg="Fraction of randoms retained in the sub-catalog."), + "z_col": StageParameter(str, "z", msg="Redshift column name in the FITS random catalog."), + "ra_col": StageParameter(str, "ra", msg="Right ascension column name in the FITS random catalog."), + "dec_col": StageParameter(str, "dec", msg="Declination column name in the FITS random catalog."), + } + + def run(self): + import pyccl + + with self.open_input("fiducial_cosmology", wrapper=True) as f: + cosmo = f.to_ccl() + + zbin_edges = self.config["lens_zbin_edges"] + nbin = len(zbin_edges) - 1 + chunk_rows = self.config["chunk_rows"] + z_col = self.config["z_col"] + ra_col = self.config["ra_col"] + dec_col = self.config["dec_col"] + + # Count total rows + f = self.open_input("desi_random_catalog") + n_total = f[1].get_nrows() + f.close() + print(f"Total randoms: {n_total}") + + # First pass: count objects per redshift bin + bin_counts = np.zeros(nbin, dtype=np.int64) + for s, e, data in self.iterate_fits("desi_random_catalog", 1, [z_col], chunk_rows): + zbin = np.digitize(data[z_col], zbin_edges) - 1 + zbin[zbin >= nbin] = -1 + valid = zbin >= 0 + bin_counts += np.bincount(zbin[valid], minlength=nbin) + for i, c in enumerate(bin_counts): + print(f" Bin {i}: {c} randoms") + + # Create output files and datasets + random_cats = self.open_output("random_cats") + binned_output = self.open_output("binned_random_catalog") + + g = random_cats.create_group("randoms") + g.create_dataset("ra", (n_total,), dtype=np.float64) + g.create_dataset("dec", (n_total,), dtype=np.float64) + g.create_dataset("z", (n_total,), dtype=np.float64) + g.create_dataset("comoving_distance", (n_total,), dtype=np.float64) + g.create_dataset("bin", (n_total,), dtype=np.int16) + + gb = binned_output.create_group("randoms") + gb.attrs["nbin"] = nbin + subgroups = [] + for i in range(nbin): + sg = gb.create_group(f"bin_{i}") + sg.create_dataset("ra", (bin_counts[i],), dtype=np.float64) + sg.create_dataset("dec", (bin_counts[i],), dtype=np.float64) + sg.create_dataset("z", (bin_counts[i],), dtype=np.float64) + sg.create_dataset("comoving_distance", (bin_counts[i],), dtype=np.float64) + subgroups.append(sg) + + # Second pass: fill data + bin_cursors = np.zeros(nbin, dtype=np.int64) + cols = [ra_col, dec_col, z_col] + for s, e, data in self.iterate_fits("desi_random_catalog", 1, cols, chunk_rows): + ra = data[ra_col] + dec = data[dec_col] + z = data[z_col] + zbin = np.digitize(z, zbin_edges) - 1 + zbin[zbin >= nbin] = -1 + chi = pyccl.comoving_radial_distance(cosmo, 1.0 / (1.0 + z)) + + g["ra"][s:e] = ra + g["dec"][s:e] = dec + g["z"][s:e] = z + g["comoving_distance"][s:e] = chi + g["bin"][s:e] = zbin + + for i in range(nbin): + sel = zbin == i + n_sel = int(sel.sum()) + if n_sel > 0: + c = bin_cursors[i] + subgroups[i]["ra"][c:c + n_sel] = ra[sel] + subgroups[i]["dec"][c:c + n_sel] = dec[sel] + subgroups[i]["z"][c:c + n_sel] = z[sel] + subgroups[i]["comoving_distance"][c:c + n_sel] = chi[sel] + bin_cursors[i] += n_sel + + self.subsample_randoms(binned_output, nbin) + + random_cats.close() + binned_output.close() + + def subsample_randoms(self, binned_output, nbin): + """Randomly subsample the binned random catalog and write binned_random_catalog_sub.""" + sample_rate = self.config["sample_rate"] + print(f"Sub-sampling randoms at rate {sample_rate}") + + binned_output_sub = self.open_output("binned_random_catalog_sub") + gb_sub = binned_output_sub.create_group("randoms") + gb_sub.attrs["nbin"] = nbin + + for j in range(nbin): + ra = binned_output[f"randoms/bin_{j}/ra"][:] + dec = binned_output[f"randoms/bin_{j}/dec"][:] + z = binned_output[f"randoms/bin_{j}/z"][:] + chi = binned_output[f"randoms/bin_{j}/comoving_distance"][:] + + ntotal = len(ra) + nsub = int(sample_rate * ntotal) + idx = np.random.choice(ntotal, size=nsub, replace=False) + + sg = gb_sub.create_group(f"bin_{j}") + sg.create_dataset("ra", data=ra[idx]) + sg.create_dataset("dec", data=dec[idx]) + sg.create_dataset("z", data=z[idx]) + sg.create_dataset("comoving_distance", data=chi[idx]) + + binned_output_sub.close() + + +class TXIngestDESI(PipelineStage): + """ + Ingest a DESI catalog (mock or real) + + This starts with the FITS file format. + """ + + name = "TXIngestDESI" + parallel = False + + inputs = [ + ("desi_catalog_selected", FitsFile), # from stage 1 +] + + outputs = [ + ("lens_catalog", HDFFile), + ("binned_lens_catalog", HDFFile), + ("lens_tomography_catalog_unweighted", HDFFile), + ("lens_tomography_catalog", HDFFile), + ("lens_photoz_stack", QPNOfZFile), + ] + + # TODO: CHANGE CONFIG OPTIONS + config_options = { + "lens_zbin_edges": StageParameter(list, [float], msg="Edges of lens redshift bins."), + "chunk_rows": StageParameter(int, 100_000, msg="Number of rows to process in each chunk."), + "zmin": StageParameter(float, 0.0, msg="Minimum redshift for binning."), + "zmax": StageParameter(float, 3.0, msg="Maximum redshift for binning."), + "dz": StageParameter(float, 0.01, msg="Redshift bin width."), + #"bands": StageParameter(str, "grizy", msg="Bands to use for DESI selection."), + "mock": StageParameter(bool, False, msg="Whether to use a mock catalog."), + } + + + def run(self): + import qp + + # Count number of objects + f = self.open_input("desi_catalog_selected") + n = f[1].get_nrows() + f.close() + + chunk_rows = self.config["chunk_rows"] + # bands = self.config["bands"] + zbin_edges = self.config["lens_zbin_edges"] + nbin_lens = len(zbin_edges) - 1 + + cat = self.open_output("lens_catalog") + cat_binned = self.open_output("binned_lens_catalog") + tomo_uw = self.open_output("lens_tomography_catalog_unweighted") + tomo_w = self.open_output("lens_tomography_catalog") + + # redshift grid + zmin = self.config["zmin"] + zmax = self.config["zmax"] + dz = self.config["dz"] + z_grid = np.arange(zmin, zmax, dz) + nz_grid = np.zeros((nbin_lens + 1, z_grid.size)) + nz = len(z_grid) + + # Create space in outputs + g = cat.create_group("lens") + # g.attrs["bands"] = bands + g.create_dataset("ra", (n,), dtype=np.float64) + g.create_dataset("dec", (n,), dtype=np.float64) + g.create_dataset("z", (n,), dtype=np.float64) + # for b in bands: + # g.create_dataset(f"mag_{b}", (n,), dtype=np.float64) + # g.create_dataset(f"mag_err_{b}", (n,), dtype=np.float64) + # g.attrs["bands"] = bands + + gb = cat_binned.create_group("lens") + gb.attrs["nbin_lens"] = nbin_lens + for bn in range(nbin_lens): + gb_ = gb.create_group(f"bin_{bn}") + gb_.create_dataset("ra", (n,), dtype=np.float64) + gb_.create_dataset("dec", (n,), dtype=np.float64) + gb_.create_dataset("z", (n,), dtype=np.float64) + gb_.create_dataset("w_sys", (n,), dtype=np.float64) + + h_uw = tomo_uw.create_group("tomography") + h_uw.create_dataset("bin", (n,), dtype=np.int32) + h_uw.create_dataset("lens_weight", (n,), dtype=np.float64) + h_uw.attrs["nbin"] = nbin_lens + h_uw.attrs["zbin_edges"] = zbin_edges + h_counts_uw = tomo_uw.create_group("counts") + h_counts_uw.create_dataset("counts", (nbin_lens,), dtype="i") + h_counts_uw.create_dataset("counts_2d", (1,), dtype="i") + + h_w = tomo_w.create_group("tomography") + h_w.create_dataset("bin", (n,), dtype=np.int32) + h_w.create_dataset("lens_weight", (n,), dtype=np.float64) + h_w.attrs["nbin"] = nbin_lens + h_w.attrs["zbin_edges"] = zbin_edges + h_counts_w = tomo_w.create_group("counts") + h_counts_w.create_dataset("counts", (nbin_lens,), dtype="i") + h_counts_w.create_dataset("counts_2d", (1,), dtype="i") + + # we keep track of the counts per-bin also + counts = np.zeros(nbin_lens, dtype=np.int64) + counts_2d = 0 + + # all cols that might be useful + cols = ["ra", "dec", "redshift", "weight"] + + for s, e, data in self.iterate_fits("desi_catalog_selected", 1, cols, chunk_rows): + n = data["ra"].size + z = data["redshift"] + # Unit weight still + weight = np.repeat(1.0, n) + + # work out the redshift bin for each object, if any. + # do any other selection here + zbin = np.digitize(z, zbin_edges) - 1 + zbin[zbin == nbin_lens] = -1 + + # # can select on any other criterion here, e.g. + # # mag or chisq. This is an example + # sel = data["chisq"] < 10 + # # deselect these objects + # zbin[~sel] = -1 + + # Build up the count of the n(z) histograms per-bin + z_grid_index = np.floor((z - zmin) / dz).astype(int) + for i, (i_z, b) in enumerate(zip(z_grid_index, zbin)): + if b >= 0: + nz_grid[b, i_z] += weight[i] + + # Build up the counts + any_bin = zbin >= 0 + counts += np.bincount(zbin[any_bin], minlength=nbin_lens) + counts_2d += any_bin.sum() + + # save data to tomography catalog + g["ra"][s:e] = data["ra"] + g["dec"][s:e] = data["dec"] + g["z"][s:e] = data["redshift"] + + # # including mags + # for i, b in enumerate(bands): + # g[f"mag_{b}"][s:e] = data["mag"][:, i] + # g[f"mag_err_{b}"][s:e] = data["mag_err"][:, i] + + h_uw["bin"][s:e] = zbin + h_uw["lens_weight"][s:e] = 1.0 + + h_w["bin"][s:e] = zbin + if self.config["mock"]: + h_w["lens_weight"][s:e] = 1.0 + else: + h_w["lens_weight"][s:e] = data["weight"] + + # save data to binned lens catalog + for bn in range(nbin_lens): + sel = h_w["bin"][:] == bn + gb_ = gb[f"bin_{bn}"] + gb_["ra"][:] = g["ra"][:][sel] + gb_["dec"][:] = g["dec"][:][sel] + gb_["z"][:] = g["z"][:][sel] + gb_["w_sys"][:] = h_w["lens_weight"][:][sel] + + # this is an overall count + h_counts_uw["counts"][:] = counts + h_counts_uw["counts_2d"][:] = counts_2d + h_counts_w["counts"][:] = counts + h_counts_w["counts_2d"][:] = counts_2d + + # Generate and save the 2D n(z) histogram also, just + # by summing up all the individual values. + nz_grid[-1] = nz_grid[:-1].sum(axis=0) + + stack_object = qp.Ensemble(qp.hist, data={"bins": z_grid, "pdfs": nz_grid[:, :-1]}) + with self.open_output("lens_photoz_stack", wrapper=True) as stack: + stack.write_ensemble(stack_object) \ No newline at end of file diff --git a/txpipe/ingest/dp1.py b/txpipe/ingest/dp1.py index d0c11abfb..f929f5dde 100644 --- a/txpipe/ingest/dp1.py +++ b/txpipe/ingest/dp1.py @@ -334,6 +334,10 @@ def get_catalog_size(self, butler, dataset_type): return n +class TXIngestDataPreview2(TXIngestDataPreview1): + name = "TXIngestDataPreview2" + + def sanitize(data): """ Convert unicode arrays into types that h5py can save diff --git a/txpipe/lens_selector.py b/txpipe/lens_selector.py index 0c264ec92..32c242a2b 100755 --- a/txpipe/lens_selector.py +++ b/txpipe/lens_selector.py @@ -8,6 +8,8 @@ PhotometryCatalog, FitsFile, MapsFile, + ParquetFile, + DataFile ) from .utils import LensNumberDensityStats, Splitter, rename_iterated from .binning import build_tomographic_classifier, apply_classifier, read_training_data @@ -289,7 +291,7 @@ def select_lens_boss(self, phot_data): lens_gals[lens_mask] = 1 return lens_gals - + def select_lens_maglim(self, phot_data): band = self.config["maglim_band"] limit = self.config["maglim_limit"] @@ -608,7 +610,7 @@ def run(self): extra_cols = [c for c in self.config["extra_cols"] if c] # Regular columns. - cols = ["ra", "dec", "weight", "comoving_distance"] + cols = ["ra", "dec", "weight", "comoving_distance", "z"] # Object we use to make the separate lens bins catalog cat_output = self.open_output(self.get_binned_lens_name(), parallel=True) @@ -683,6 +685,7 @@ def add_redshifts(self, iterator): d = np.zeros(len(a)) d[z > 0] = pyccl.comoving_radial_distance(cosmo, a[z > 0]) data["comoving_distance"] = d + data["z"] = z yield s, e, data @@ -805,6 +808,166 @@ def data_iterator(self): return self.add_redshifts(iterator) +class TXDESISelector(PipelineStage): + """ + Select lens galaxies from a DESI spectroscopic FITS catalog. + + Reads a raw DESI FITS catalog and applies: + TODO: ADJUST NECESSARY CUTS FOR REAL DATA + - Redshift range cut + - Optional survey mask + + Outputs a standardized FITS file consumed by TXIngestDESI. + """ + + name = "TXDESISelector" + parallel = False + + outputs = [ + ("desi_catalog_selected", FitsFile), + ] + + config_options = { + "chunk_rows": StageParameter(int, 100_000, msg="Rows per chunk."), + "zmin": StageParameter(float, 0.0, msg="Minimum spectroscopic redshift."), + "zmax": StageParameter(float, 3.0, msg="Maximum spectroscopic redshift."), + "mag_band": StageParameter(str, "r", msg="Band to apply magnitude limits in."), + "apply_mask": StageParameter(bool, False, msg="Apply survey mask from the mask input."), + "ra_col": StageParameter(str, "ra", msg="RA column name in the DESI catalog."), + "dec_col": StageParameter(str, "dec", msg="Dec column name in the DESI catalog."), + "z_col": StageParameter(str, "redshift", msg="Spectroscopic redshift column name."), + "w_col": StageParameter(str, "weight", msg="Weight column name."), + "bands": StageParameter(str, "", msg="Band labels for output magnitudes (one char each)."), + } + + inputs = [ + ("desi_catalog", FitsFile), + ("mask", MapsFile), + ] + + def run(self): + self.bands = self.config["bands"] + ra_col = self.config["ra_col"] + dec_col = self.config["dec_col"] + z_col = self.config["z_col"] + w_col = self.config["w_col"] + + if self.config["apply_mask"]: + with self.open_input("mask", wrapper=True) as f: + self.mask = f.read_mask("mask") + self.mask_nside = f.read_map_info("mask")["nside"] + # self.mask_fmt = f.read_map_info("mask")["nest"] + # print(self.mask_fmt) + else: + self.mask = self.mask_nside = None + + if len(self.bands) == 0: + out_cols = {name: [] for name in ["ra", "dec", "redshift", "weight"]} + else: + out_cols = {name: [] for name in ["ra", "dec", "redshift", "weight"] + [f"mag_{b}" for b in self.bands]} + + n_total = 0 + n_selected = 0 + + for s, e, data in self.data_iterator(): + sel = self._apply_cuts(data) + n_chunk = int(sel.sum()) + n_total += data[ra_col].size + n_selected += n_chunk + + print(f"Rows {s:,}-{e:,}: selected {n_chunk:,} / {data[ra_col].size:,}") + + if n_chunk == 0: + continue + + out_cols["ra"].append(data[ra_col][sel]) + out_cols["dec"].append(data[dec_col][sel]) + out_cols["redshift"].append(data[z_col][sel]) + out_cols["weight"].append(self._get_weights(data, sel, n_chunk, w_col)) + + if len(self.bands) > 0: + for b in self.bands: + out_cols[f"mag_{b}"].append(data[f"mag_{b}_lsst"][sel]) + + print(f"Total selected: {n_selected:,} / {n_total:,}") + + from astropy.table import Table + table = Table({name: np.concatenate(chunks) for name, chunks in out_cols.items()}) + table.write(self.get_output("desi_catalog_selected"), overwrite=True) + + def _get_weights(self, data, sel, n_selected, w_col): + return data[w_col][sel] + + def data_iterator(self): + chunk_rows = self.config["chunk_rows"] + ra_col = self.config["ra_col"] + dec_col = self.config["dec_col"] + z_col = self.config["z_col"] + w_col = self.config["w_col"] + cols = [ra_col, dec_col, z_col, w_col] + yield from self.iterate_fits("desi_catalog", 1, cols, chunk_rows) + + def _apply_cuts(self, data): + ra_col = self.config["ra_col"] + dec_col = self.config["dec_col"] + z_col = self.config["z_col"] + + n = data[ra_col].size + sel = np.ones(n, dtype=bool) + + z = data[z_col] + sel &= (z >= self.config["zmin"]) & (z < self.config["zmax"]) + + if self.mask is not None: + import healpy as hp + pix = hp.ang2pix(self.mask_nside, data[ra_col], data[dec_col], lonlat=True, nest=True) + sel &= self.mask[pix] != hp.UNSEEN + + return sel + + +class TXDESIMockSelector(TXDESISelector): + """ + Select lens galaxies from a DESI mock Parquet catalog. + + Identical cuts to TXDESISelector but reads a Parquet file and assigns + unit weights (mocks carry no per-object weight column). + """ + + name = "TXDESIMockSelector" + + inputs = [ + ("desi_catalog", ParquetFile), + ("mask", MapsFile), + ] + + config_options = { + **TXDESISelector.config_options, + } + # w_col is unused for mocks; keep it in config_options for compatibility + # but override _get_weights to return ones instead. + + def _get_weights(self, data, sel, n_selected, w_col): + return np.ones(n_selected) + + def data_iterator(self): + chunk_rows = self.config["chunk_rows"] + ra_col = self.config["ra_col"] + dec_col = self.config["dec_col"] + z_col = self.config["z_col"] + cols = [ra_col, dec_col, z_col] + [f"mag_{b}_lsst" for b in self.bands] + + from pyarrow.parquet import ParquetFile as PQFile + fn = self.get_input("desi_catalog") + start = 0 + with PQFile(fn) as f: + for batch in f.iter_batches(columns=cols, batch_size=chunk_rows): + data = {name: batch[name].to_numpy(zero_copy_only=False) for name in cols} + n = len(data[cols[0]]) + end = start + n + yield start, end, data + start = end + if __name__ == "__main__": PipelineStage.main() diff --git a/txpipe/masks.py b/txpipe/masks.py index 10e53d72e..be8804f9c 100644 --- a/txpipe/masks.py +++ b/txpipe/masks.py @@ -1,7 +1,7 @@ import numpy as np from .utils import choose_pixelization from .base_stage import PipelineStage -from .data_types import MapsFile +from .data_types import MapsFile, HDFFile from ceci.config import StageParameter from .mapping import degrade_healsparse @@ -482,3 +482,168 @@ def degrade(self, mask, metadata_in, nside_out): assert np.round(area_in, 3) == np.round(area_out, 3) return mask_out, metadata_out + +class TXJointMask(TXBaseMask): + """ + Combine two binary masks into one using AND intersection. + + Currently only supports binary masks. + + Subclasses can override `mask_input_names` to rename the two input tags + without duplicating any logic. + """ + + name = "TXJointMask" + inputs = [ + ("mask1", MapsFile), + ("mask2", MapsFile), + ] + outputs = [("mask", MapsFile)] + + # Subclasses override this tuple to change which input tags are opened. + mask_input_names = ("mask1", "mask2") + + def make_binary_mask(self): + """ + Load the two input masks and return their AND intersection. + + TODO: make this more flexible to allow for different types of masks (e.g. fractional coverage maps) and different combination logic (e.g. OR, product, etc.). Currently, we assume shear mask is in healsparse format and lens mask is in HDF5. + + Returns + ------- + mask : hsp.HealSparseMap + Combined boolean mask. + pixel_scheme : PixelScheme + Pixelization object taken from the first mask's metadata. + metadata : dict + Metadata from the first input mask. + """ + import healsparse as hsp + + name1, name2 = self.mask_input_names + + with self.open_input(name1, wrapper=True) as f: + metadata = dict(f.file["maps"].attrs) + nside1 = metadata["nside"] + pixel_scheme1 = choose_pixelization(**metadata) + mask1 = f.read_mask("mask", returnbool=True) + + with self.open_input(name2, wrapper=True) as f: + metadata = dict(f.file["maps"].attrs) + nside2 = metadata["nside"] + pixel_scheme2 = choose_pixelization(**metadata) + mask2 = f.read_mask("mask", returnbool=True) + + if nside1 != nside2: + if nside1 < nside2: + print(f"Degrading {name2} from Nside {nside2} to {nside1}") + mask2 = mask2.degrade(nside1) + elif nside2 < nside1: + print(f"Degrading {name1} from Nside {nside1} to {nside2}") + mask1 = mask1.degrade(nside2) + mask = hsp.operations.product_intersection([mask1, mask2]) + else: + mask = hsp.operations.product_intersection([mask1, mask2]) + nside_out = mask.nside_sparse + metadata["nside"] = nside_out + pixel_scheme = choose_pixelization(pixelization="healpix", nside=nside_out, nest=True) + + return mask, pixel_scheme, metadata + + +class TXDESIJointMask(TXJointMask): + """ + Combine the DESI lens mask and the shear catalog mask using AND intersection. + """ + + name = "TXDESIJointMask" + inputs = [ + ("desi_mask", MapsFile), + ("shear_mask", MapsFile), + ] + outputs = [("mask", MapsFile)] + + mask_input_names = ("desi_mask", "shear_mask") + + +class TXCutCatalog(PipelineStage): + """ + Cut a catalog to the footprint defined by an input mask, and write out the cut catalog. + + Subclasses can override `catalog_input_name`, `catalog_output_name`, and the + `config_options` defaults to handle different catalog types. + """ + + name = "TXCutCatalog" + inputs = [ + ("catalog", HDFFile), + ("mask", MapsFile), + ] + outputs = [("cut_catalog", HDFFile)] + config_options = { + "chunk_rows": StageParameter(int, 100000, msg="Number of rows to read per chunk."), + "catalog_group": StageParameter(str, "catalog", msg="HDF5 group name in the input catalog."), + "ra_col": StageParameter(str, "ra", msg="RA column name."), + "dec_col": StageParameter(str, "dec", msg="Dec column name."), + } + + catalog_input_name = "catalog" + catalog_output_name = "cut_catalog" + + def run(self): + import healpy as hp + import h5py + + with self.open_input("mask", wrapper=True) as f: + mask = f.read_mask("mask", returnbool=True) + nside = f.read_map_info("mask")["nside"] + + group_name = self.config["catalog_group"] + ra_col = self.config["ra_col"] + dec_col = self.config["dec_col"] + chunk_rows = self.config["chunk_rows"] + + in_path = self.get_input(self.catalog_input_name) + out_path = self.get_output(self.catalog_output_name) + + with h5py.File(in_path, "r") as f_in, h5py.File(out_path, "w") as f_out: + g_in = f_in[group_name] + n_total = g_in[ra_col].size + + # First pass: collect indices of objects inside the mask footprint + selected = [] + for start in range(0, n_total, chunk_rows): + end = min(start + chunk_rows, n_total) + ra = g_in[ra_col][start:end] + dec = g_in[dec_col][start:end] + pix = hp.ang2pix(nside, ra, dec, lonlat=True, nest=True) + sel = mask[pix].astype(bool) + selected.append(np.where(sel)[0] + start) + + selected = np.concatenate(selected) + print(f"Selected {len(selected)} / {n_total} objects inside mask") + + # Write selected rows for every column in the group + g_out = f_out.create_group(group_name) + for col in g_in.keys(): + g_out.create_dataset(col, data=g_in[col][selected]) + + +class TXCutShearCatalog(TXCutCatalog): + """ + Cut a shear catalog to the footprint defined by an input mask. + """ + + name = "TXCutShearCatalog" + inputs = [ + ("shear_catalog", HDFFile), + ("mask", MapsFile), + ] + outputs = [("cut_shear_catalog", HDFFile)] + config_options = { + **TXCutCatalog.config_options, + "catalog_group": StageParameter(str, "shear", msg="HDF5 group name in the input shear catalog."), + } + + catalog_input_name = "shear_catalog" + catalog_output_name = "cut_shear_catalog" \ No newline at end of file diff --git a/txpipe/twopoint.py b/txpipe/twopoint.py index 6c223ac34..f92164a24 100755 --- a/txpipe/twopoint.py +++ b/txpipe/twopoint.py @@ -81,7 +81,7 @@ class TXTwoPoint(PipelineStage): "use_randoms": StageParameter(bool, True, msg="Whether to use random catalogs"), "low_mem": StageParameter(bool, False, msg="Whether to use low memory mode"), "patch_dir": StageParameter(str, "./cache/patches", msg="Directory for storing patch files"), - "chunk_rows": StageParameter(int, 100_000, msg="Number of rows to process in each chunk"), + "chunk_rows": StageParameter(int, 100_000, msg="Number of rows to process in each chunk when making patches"), "share_patch_files": StageParameter(bool, False, msg="Whether to share patch files across processes"), "metric": StageParameter(str, "Euclidean", msg="Distance metric to use (Euclidean, Arc, etc.)"), "gaussian_sims_factor": StageParameter( @@ -423,7 +423,7 @@ def write_output(self, source_list, lens_list, meta, results): if self.config["do_shear_pos"] == True: S2.add_tracer("NZ", f"source_{i}", z, Nz) else: - sys.exit("Requesting a measurement that requires source galaxies but no source_list provided") + raise ValueError("Requesting a measurement that requires source galaxies but no source_list provided") if self.config["do_pos_pos"] or self.config["do_shear_pos"]: if lens_list: @@ -435,7 +435,7 @@ def write_output(self, source_list, lens_list, meta, results): if self.config["do_shear_pos"] == True: S2.add_tracer("NZ", f"lens_{i}", z, Nz) else: - sys.exit("Requesting a measurement that requires lens galaxies but no lens_list provided") + raise ValueError("Requesting a measurement that requires lens galaxies but no lens_list provided") # Now build up the collection of data points, adding them all to # the sacc data one by one. @@ -466,26 +466,8 @@ def write_output(self, source_list, lens_list, meta, results): S2.save_fits(self.get_output("twopoint_gamma_x"), overwrite=True) def write_metadata(self, S, meta): - # We also save the associated metadata to the file - for k, v in meta.items(): - if np.isscalar(v): - S.metadata[k] = v - else: - for i, vi in enumerate(v): - S.metadata[f"{k}_{i}"] = vi - - # Add provenance metadata. In managed formats this is done - # automatically, but because the Sacc library is external - # we do it manually here. provenance = self.gather_provenance() - provenance.update(SACCFile.generate_provenance()) - for key, value in provenance.items(): - if isinstance(value, str) and "\n" in value: - values = value.split("\n") - for i, v in enumerate(values): - S.metadata[f"provenance/{key}_{i}"] = v - else: - S.metadata[f"provenance/{key}"] = value + SACCFile.add_metadata(S, provenance, meta) def call_treecorr(self, i, j, k): """ diff --git a/txpipe/utils/blank.py b/txpipe/utils/blank.py new file mode 100644 index 000000000..b3281116c --- /dev/null +++ b/txpipe/utils/blank.py @@ -0,0 +1,3 @@ +# This file is intentionally left blank. +# As part of a workaround in TreeCorr we use it as a namespace to +# store things needed to modify the weights in the DeltaSigma calculation. \ No newline at end of file