diff --git a/bin/cut_out_example.py b/bin/cut_out_example.py index e81c8d40b..6d28146f7 100755 --- a/bin/cut_out_example.py +++ b/bin/cut_out_example.py @@ -10,34 +10,41 @@ def trim_file(input_file, output_file, group_name, thin): + if isinstance(group_name, str): + group_name = [group_name] file_in = h5py.File(input_file, 'r') file_out = h5py.File(output_file, 'w') - group_in = file_in[group_name] - group_out = file_out.create_group(group_name) file_in.copy('provenance', file_out) - ra = group_in['ra'][:] - print("read RA", ra.size) - dec = group_in['dec'][:] - print("read dec") - w = ra > ra_min - w &= ra < ra_max - w &= dec > dec_min - w &= dec < dec_max - del ra, dec - w[np.arange(w.size) % thin > 0] = False + for group_name in group_name: + group_in = file_in[group_name] + group_out = file_out.create_group(group_name) - print('Output count:', w.sum()) + ra = group_in['ra'][:] + print("read group", group_name, "size:", ra.size) + dec = group_in['dec'][:] + w = ra > ra_min + w &= ra < ra_max + w &= dec > dec_min + w &= dec < dec_max + del ra, dec - for col in group_in.keys(): - print("Copying", col) - d = group_in[col][w] - group_out.create_dataset(col, data=d) + if thin > 1: + w[np.arange(w.size) % thin > 0] = False + + print('Output count:', w.sum()) + + for col in group_in.keys(): + print("Copying", col) + d = group_in[col][w] + group_out.create_dataset(col, data=d) if __name__ == '__main__': - trim_file('photometry_catalog.hdf5', 'example_photometry_catalog.hdf5', 'photometry', 5) - trim_file('shear_catalog.hdf5', 'example_shear_catalog.hdf5', 'photometry', 5) - trim_file('star_catalog.hdf5', 'example_star_catalog.hdf5', 'stars', 5) + trim_file('photometry_catalog.hdf5', 'example_photometry_catalog.hdf5', 'photometry', 1) + + shear_groups = ["shear/00", "shear/1p", "shear/1m", "shear/2p", "shear/2m"] + trim_file('shear_catalog.hdf5', 'example_shear_catalog.hdf5', shear_groups, 1) + # trim_file('star_catalog.hdf5', 'example_star_catalog.hdf5', 'stars', 5) diff --git a/bin/get_des_y6_response.py b/bin/get_des_y6_response.py new file mode 100644 index 000000000..1662aae68 --- /dev/null +++ b/bin/get_des_y6_response.py @@ -0,0 +1,63 @@ +from parallel_statistics import ParallelMean +import h5py +import os + +params = [ + "gauss_g_1", + "gauss_g_2", + "gauss_s2n", + "pgauss_band_flux_g_nodered", + "pgauss_band_flux_r_nodered", + "pgauss_band_flux_i_nodered", + "pgauss_band_flux_z_nodered", + "pgauss_T", +] + +nparam = len(params) +variants = [ + "noshear", + "1p", + "1m", + "2p", + "2m", +] + +stats = { + v: ParallelMean(nparam) for v in variants +} + +results = {} +delta_gamma = 0.02 +chunk_size = 1_000_000 + +cfs_dir = os.environ["CFS"] +filename = os.path.join(cfs_dir, "des", "y6kp-cats", "final_version", "metadetect_notomo2024-11-07.hdf5") + +with h5py.File(filename) as f: + for v in variants: + stat = stats[v] + s = 0 + e = 0 + ntot = f[f"{v}/ra"].shape[0] + while e < ntot: + s = e + e = min(s + chunk_size, ntot) + # print(f"Processing {v} {s}:{e} / {ntot}") + data = {p: f[f"{v}/{p}"][s:e] for p in params} + flag = [f"{v}/mdet_flags"][s:e] + sel = flag == 0 + for i, p in enumerate(params): + stat.add_data(i, data[p][sel]) + _, mu = stat.collect() + print(f"Variant {v}:") + for i, p in enumerate(params): + print(f" {p}: {mu[i]}") + print("") + results[v] = mu + +print("") +for i, p in enumerate(params): + dp_dg1 = (results["1p"][i] - results["1m"][i]) / delta_gamma + dp_dg2 = (results["2p"][i] - results["2m"][i]) / delta_gamma + print(f"d({p}) / dg1: {dp_dg1}") + print(f"d({p}) / dg2: {dp_dg2}") diff --git a/examples/cosmodc2/config.yml b/examples/cosmodc2/config.yml index c4fc76644..da1582a86 100644 --- a/examples/cosmodc2/config.yml +++ b/examples/cosmodc2/config.yml @@ -2,18 +2,20 @@ global: pixelization: healpix chunk_rows: 1000000 chunk_size: 200000 + nside: 512 + ell_max: 1536 TXLSSWeightsUnit: - nside: 2048 pixelization: healpix TXDiagnosticQuantiles: - psf_prefix: mcal_psf_ - shear_prefix: mcal_ + psf_prefix: psf_ + shear_prefix: "" nbins: 20 TXShearCalibration: - shear_prefix: mcal_ + shear_prefix: "" + TXCosmoDC2Mock: cat_name: cosmoDC2_v1.1.4_image @@ -43,22 +45,18 @@ TXLensTrueNumberDensity: TXLensMaps: pixelization: healpix - nside: 2048 sparse: true TXSourceMaps: - nside: 2048 sparse: true pixelization: healpix true_shear: false TXExternalLensMaps: - nside: 2048 sparse: true pixelization: healpix TXExternalLensNoiseMaps: - nside: 2048 pixelization: healpix TXAuxiliarySourceMaps: @@ -67,7 +65,6 @@ TXAuxiliarySourceMaps: TXAuxiliaryLensMaps: sparse: true - nside: 2048 pixelization: healpix bright_obj_threshold: 22.0 @@ -75,19 +72,11 @@ TXSimpleMask: depth_cut: 23.0 bright_object_max: 10.0 - - -TXSourceSelectorMetacal: - input_pz: false +TXSourceTomography: bands: riz #used for selection - T_cut: 0.5 - s2n_cut: 10.0 - max_rows: 1000 - delta_gamma: 0.02 + random_seed: 123213 source_zbin_edges: [0.19285902, 0.40831394, 0.65503818, 0.94499109, 1.2947086, 1.72779632, 2.27855242, 3.] # 7 bins # source_zbin_edges: [0.25588604, 0.55455363, 0.91863365, 1.38232001, 2.] # 4 bins - true_z: false - shear_prefix: mcal_ TXSourceSelectorMetadetect: input_pz: false @@ -96,10 +85,10 @@ TXSourceSelectorMetadetect: s2n_cut: 10.0 max_rows: 1000 delta_gamma: 0.02 - source_zbin_edges: [0.19285902, 0.40831394, 0.65503818, 0.94499109, 1.2947086, 1.72779632, 2.27855242, 3.] # 7 bins - # source_zbin_edges: [0.25588604, 0.55455363, 0.91863365, 1.38232001, 2.] # 4 bins true_z: false shear_prefix: '' + source_zbin_edges: [0.19285902, 0.40831394, 0.65503818, 0.94499109, 1.2947086, 1.72779632, 2.27855242, 3.] # 7 bins + # source_zbin_edges: [0.25588604, 0.55455363, 0.91863365, 1.38232001, 2.] # 4 bins TXRandomCat: @@ -109,15 +98,16 @@ TXJackknifeCenters: npatch: 40 TXSourceDiagnosticPlots: - shear_prefix: mcal_ + shear_prefix: "00/" + psf_prefix: "00/psf_" TXFourierGaussianCovariance: galaxy_bias: [1.404, 1.458, 1.693, 1.922, 2.133] # Tinker bias values - cache_dir: ./cache_nmt/cosmodc2/nside2048/ + cache_dir: ./cache_nmt/cosmodc2/nside512/ TXFourierTJPCovariance: galaxy_bias: [1.404, 1.458, 1.693, 1.922, 2.133] # Tinker bias values - cache_dir: ./cache_nmt/cosmodc2/nside2048/ + cache_dir: ./cache_nmt/cosmodc2/nside512/ IA: 0. TXRealGaussianCovariance: @@ -131,11 +121,9 @@ TXTwoPointFourier: flip_g1: true flip_g2: true apodization_size: 0.0 - cache_dir: ./cache_nmt/cosmodc2/nside2048/ + cache_dir: ./cache_nmt/cosmodc2/nside512/ true_shear: false n_ell: 30 - ell_max: 6144 # nside * 3 , since Namaster computes that anyway. - nside: 2048 analytic_noise: true TXTwoPoint: @@ -179,6 +167,21 @@ FlowCreator: n_samples: 1000000 seed: 5763248 +LSSTErrorModel: + renameDict: {u: mag_u_lsst, g: mag_g_lsst, r: mag_r_lsst, i: mag_i_lsst, z: mag_z_lsst, "y": mag_y_lsst} + seed: 29 + +InvRedshiftIncompleteness: + pivot_redshift: 1.0 + +LineConfusion: + true_wavelen: 5007.0 + wrong_wavelen: 3727.0 + frac_wrong: 0.05 + +QuantityCut: + cuts: {mag_i_lsst: 25.0} + GridSelection: redshift_cut: 5.1 diff --git a/examples/cosmodc2/pipeline.yml b/examples/cosmodc2/pipeline.yml index d12512ca8..03f6e9809 100644 --- a/examples/cosmodc2/pipeline.yml +++ b/examples/cosmodc2/pipeline.yml @@ -3,12 +3,14 @@ launcher: interval: 3.0 site: - name: nersc-interactive + name: local modules: > txpipe - rail.creation.degraders.grid_selection + rail.creation.degraders.photometric_errors + rail.creation.degraders.quantityCut + rail.creation.degraders.spectroscopic_degraders rail.creation.engines.flowEngine rail.estimation.algos.nz_dir rail.estimation.algos.bpz_lite @@ -22,18 +24,32 @@ stages: aliases: output: ideal_specz_catalog model: flow - - name: GridSelection + - name: LSSTErrorModel aliases: input: ideal_specz_catalog - output: specz_catalog_pq + output: noisy_specz_catalog + - name: InvRedshiftIncompleteness + aliases: + input: noisy_specz_catalog + output: incomplete_specz_catalog + - name: LineConfusion + aliases: + input: incomplete_specz_catalog + output: confused_specz_catalog + - name: QuantityCut + aliases: + input: confused_specz_catalog + output: final_specz_catalog - name: TXParqetToHDF aliases: - input: specz_catalog_pq + input: final_specz_catalog output: spectroscopic_catalog - - name: TXSourceSelectorMetacal - nprocess: 32 + - name: TXSourceTomography + threads_per_process: 8 + - name: TXSourceSelectorMetadetect + # nprocess: 32 - name: TXShearCalibration - nprocess: 32 + # nprocess: 32 - name: PZPrepareEstimatorLens # Prepare the p(z) estimator classname: BPZliteInformer aliases: @@ -42,7 +58,8 @@ stages: - name: PZEstimatorLens classname: BPZliteEstimator - nprocess: 32 + # nprocess: 256 + # nodes: 2 aliases: model: lens_photoz_model input: photometry_catalog @@ -81,10 +98,10 @@ stages: - name: TXRandomCat - nprocess: 16 + # nprocess: 16 - name: TXLSSWeightsUnit - name: TXRandomForestLensSelector - nprocess: 32 + # nprocess: 32 - name: TXPhotozPlotSource classname: TXPhotozPlot aliases: @@ -96,59 +113,59 @@ stages: photoz_stack: lens_photoz_stack nz_plot: lens_nz - name: TXLensCatalogSplitter - nprocess: 32 + # nprocess: 32 - name: TXTwoPointFourier - nprocess: 8 - nodes: 2 - threads_per_process: 32 + # nprocess: 8 + # nodes: 2 + # threads_per_process: 32 - name: TXTwoPointTheoryFourier - name: TXTwoPointPlotsFourier - name: TXJackknifeCenters - name: TXSourceMaps - nprocess: 8 + # nprocess: 8 - name: TXLensMaps - nprocess: 8 + # nprocess: 8 - name: TXAuxiliarySourceMaps - nprocess: 8 + # nprocess: 8 - name: TXAuxiliaryLensMaps - nprocess: 8 + # nprocess: 8 - name: TXDensityMaps - name: TXSourceNoiseMaps - nprocess: 4 - nodes: 1 - threads_per_process: 1 + # nprocess: 4 + # nodes: 1 + # threads_per_process: 1 - name: TXLensNoiseMaps - nprocess: 4 - nodes: 1 - threads_per_process: 1 + # nprocess: 4 + # nodes: 1 + # threads_per_process: 1 - name: TXSimpleMask - name: TXMapPlots - name: TXTracerMetadata - name: TXNullBlinding - name: TXTwoPoint - threads_per_process: 32 - nprocess: 8 - nodes: 2 + # threads_per_process: 32 + # nprocess: 8 + # nodes: 2 - name: TXTwoPointPlotsTheory - name: TXDiagnosticQuantiles - nodes: 1 - nprocess: 16 + # nodes: 1 + # nprocess: 16 - name: TXLensDiagnosticPlots - nprocess: 16 - nodes: 1 + # nprocess: 16 + # nodes: 1 - name: TXSourceDiagnosticPlots - nprocess: 16 - nodes: 1 + # nprocess: 16 + # nodes: 1 - name: TXFourierGaussianCovariance - threads_per_process: 32 + # threads_per_process: 32 - name: TXTwoPointTheoryReal - name: TXRealGaussianCovariance - threads_per_process: 32 + # threads_per_process: 32 - name: TXConvergenceMaps - threads_per_process: 32 + # threads_per_process: 32 - name: TXConvergenceMapPlots -output_dir: data/cosmodc2/outputs +output_dir: data/cosmodc2/cutout-outputs config: examples/cosmodc2/config.yml # On NERSC, set this before running: @@ -156,11 +173,9 @@ config: examples/cosmodc2/config.yml inputs: # See README for paths to download these files - shear_catalog: data/cosmodc2/inputs/shear_catalog.hdf5 - photometry_catalog: /global/cfs/cdirs/lsst/groups/WL/users/zuntz/data/cosmoDC2-1.1.4_oneyear_unit_response/photometry_catalog.hdf5 - photoz_trained_model: /global/cfs/cdirs/lsst/groups/WL/users/zuntz/data/cosmoDC2-1.1.4_oneyear_unit_response/cosmoDC2_trees_i25.3.npy + shear_catalog: data/cosmodc2/cutout/example_shear_catalog.hdf5 + photometry_catalog: data/cosmodc2/cutout/example_photometry_catalog.hdf5 fiducial_cosmology: data/fiducial_cosmology.yml - calibration_table: /global/cfs/cdirs/lsst/groups/WL/users/zuntz/data/cosmoDC2-1.1.4_oneyear_unit_response/sample_cosmodc2_w10year_errors.dat none: none flow: data/example/inputs/example_flow.pkl diff --git a/examples/metadetect/config.yml b/examples/metadetect/config.yml index 187941bf7..9079e6775 100755 --- a/examples/metadetect/config.yml +++ b/examples/metadetect/config.yml @@ -128,9 +128,14 @@ TXTrueNumberDensity: nz: 301 zmax: 3.0 +TXSourceTomography: + source_zbin_edges: [0.5, 0.7, 0.9, 1.1, 2.0] + bands: riz + random_seed: 6789 + TXSourceSelectorMetadetect: input_pz: false - true_z: true + true_z: false bands: riz #used for selection T_cut: 0.5 s2n_cut: 10.0 @@ -210,12 +215,12 @@ TXBlinding: delete_unblinded: true TXDiagnosticQuantiles: - psf_prefix: 00/mcal_psf_ + psf_prefix: 00/psf_ shear_prefix: 00/ nbins: 20 TXSourceDiagnosticPlots: - psf_prefix: 00/mcal_psf_ + psf_prefix: 00/psf_ shear_prefix: 00/ nbins: 20 g_min: -0.01 diff --git a/examples/metadetect/pipeline.yml b/examples/metadetect/pipeline.yml index 41b817eca..1875fd3f3 100644 --- a/examples/metadetect/pipeline.yml +++ b/examples/metadetect/pipeline.yml @@ -46,6 +46,8 @@ stages: model: source_direct_calibration_model photoz_stack: shear_photoz_stack photoz_realizations: source_photoz_realizations + - name: TXSourceTomography + # threads_per_process: 8 - name: TXSourceSelectorMetadetect # select and split objects into source bins - name: NZDirInformerSource # Prepare the DIR method inputs for the source sample classname: NZDirInformer @@ -148,7 +150,7 @@ python_paths: - submodules/pyfsb # Where to put outputs -output_dir: data/example/outputs_metadetect +output_dir: data/cosmodc2-1deg2/outputs # How to run the pipeline: mini, parsl, or cwl launcher: @@ -168,8 +170,8 @@ config: examples/metadetect/config.yml inputs: # See README for paths to download these files - shear_catalog: data/example/inputs/metadetect_shear_catalog.hdf5 - photometry_catalog: data/example/inputs/photometry_catalog.hdf5 + shear_catalog: data/cosmodc2-1deg2/inputs/shear_catalog.hdf5 + photometry_catalog: data/cosmodc2-1deg2/inputs/photometry_catalog.hdf5 exposures: data/example/inputs/exposures.hdf5 star_catalog: data/example/inputs/star_catalog.hdf5 # This file comes with the code diff --git a/txpipe/base_stage.py b/txpipe/base_stage.py index e60fcef75..8cb7dccbc 100755 --- a/txpipe/base_stage.py +++ b/txpipe/base_stage.py @@ -71,7 +71,7 @@ def memory_report(self, tag=None): print(f"{t}: Process {self.rank}:{tag} Remaining memory on {host} {avail:.1f} GB / {total:.1f} GB") sys.stdout.flush() - def combined_iterators(self, rows, *inputs, parallel=True): + def combined_iterators(self, rows, *inputs, parallel=True, longest=False): """ Iterate through multiple files at the same time. @@ -110,7 +110,7 @@ def combined_iterators(self, rows, *inputs, parallel=True): tag = inputs[3 * i] section = inputs[3 * i + 1] cols = inputs[3 * i + 2] - iterators.append(self.iterate_hdf(tag, section, cols, rows, parallel=parallel)) + iterators.append(self.iterate_hdf(tag, section, cols, rows, parallel=parallel, longest=longest)) for it in zip(*iterators): data = {} diff --git a/txpipe/binning/random_forest.py b/txpipe/binning/random_forest.py index bc1aa0ee1..511f193d9 100644 --- a/txpipe/binning/random_forest.py +++ b/txpipe/binning/random_forest.py @@ -113,7 +113,11 @@ def apply_classifier(classifier, features, bands, shear_catalog_type, shear_data data = np.array(data).T # Run the random forest on this data chunk - pz_data[f"{prefix}zbin{suffix}"] = classifier.predict(data) + if data.size == 0: + bins = np.array([], dtype="i") + else: + bins = classifier.predict(data) + pz_data[f"{prefix}zbin{suffix}"] = bins if shear_catalog_type == "metacal": pz_data[f"zbin{suffix}"] = pz_data[f"{prefix}zbin{suffix}"] return pz_data diff --git a/txpipe/diagnostics.py b/txpipe/diagnostics.py index 201fa26d5..425fc2fbf 100644 --- a/txpipe/diagnostics.py +++ b/txpipe/diagnostics.py @@ -42,8 +42,8 @@ class TXDiagnosticQuantiles(PipelineStage): ("shear_catalog_quantiles", HDFFile), ] config_options = { - "shear_prefix": StageParameter(str, "mcal_", msg="Prefix for shear columns in the catalog."), - "psf_prefix": StageParameter(str, "mcal_psf_", msg="Prefix for PSF columns in the catalog."), + "shear_prefix": StageParameter(str, "", msg="Prefix for shear columns in the catalog."), + "psf_prefix": StageParameter(str, "", msg="Prefix for PSF columns in the catalog."), "nbins": StageParameter(int, 20, msg="Number of quantile bins to compute."), "chunk_rows": StageParameter(int, 0, msg="Number of rows to process in each chunk (0 means auto)."), "bands": StageParameter(str, "riz", msg="Bands to use for diagnostics."), @@ -53,47 +53,42 @@ def run(self): _, da = import_dask() # Configuration parameters - psf_prefix = self.config["psf_prefix"] - shear_prefix = self.config["shear_prefix"] chunk_rows = self.config["chunk_rows"] nedge = self.config["nbins"] + 1 if chunk_rows == 0: chunk_rows = "auto" # We canonicalise the names here - col_names = { - "psf_g1": f"{psf_prefix}g1", - "psf_T_mean": f"{psf_prefix}T_mean", - "s2n": f"{shear_prefix}s2n", - "T": f"{shear_prefix}T", - } - + col_names = ["psf_g1", "psf_g2", "psf_T", "s2n", "T"] for band in self.config["bands"]: - col_names[f"mag_{band}"] = f"{shear_prefix}mag_{band}" + col_names.append(f"mag_{band}") # We ask for quantiles at these points quantiles = np.linspace(0, 1, nedge, endpoint=True) percentiles = quantiles * 100 - with self.open_input("shear_catalog") as f, self.open_input("shear_tomography_catalog") as g: + with self.open_input("shear_catalog", wrapper=True) as f, self.open_input("shear_tomography_catalog") as g: # We will be checking if the source is in a tomographic bin # and doing quantiles only of selected obejcts (in any bin) bins = da.from_array(g["tomography/bin"], chunks=chunk_rows) selected = bins >= 0 + fg = f.file[f.get_primary_catalog_group()] + # We now build up the quantile values quantile_values = {} - for new_name, old_name in col_names.items(): + for name in col_names: + print(name) # Create dask arrays of the columns. This loads them lazily, # so no data is actually loaded here. Only when the "compute" # method is called below does anything actually happen. - col = da.from_array(f[f"shear/{old_name}"], chunks=chunk_rows) + col = da.from_array(fg[name], chunks=chunk_rows) # Ask dask to compute the percentiles of this column. # Again, it will not actually do anything until the "compute" # method is called below. When that happens, it will # chunk up the data and calculate the percentiles in parallel. - quantile_values[new_name] = da.percentile(col[selected], percentiles) + quantile_values[name] = da.percentile(col[selected], percentiles) # Now ask dask to actually do the calculations (quantile_values,) = da.compute(quantile_values) @@ -194,7 +189,7 @@ def run(self): shear_cols = [ f"{psf_prefix}g1", f"{psf_prefix}g2", - f"{psf_prefix}T_mean", + f"{psf_prefix}T", "mcal_g1", "mcal_g1_1p", "mcal_g1_2p", @@ -223,9 +218,9 @@ def run(self): "g1", "g2", "T", - "mcal_psf_g1", - "mcal_psf_g2", - "mcal_psf_T_mean", + "psf_g1", + "psf_g2", + "psf_T", "s2n", "weight", ) @@ -237,7 +232,7 @@ def run(self): "psf_g2", "g1", "g2", - "psf_T_mean", + "psf_T", "s2n", "T", "weight", @@ -260,12 +255,14 @@ def run(self): "tomography", shear_tomo_cols, *more_iters, + longest=True, ) # Now loop through each chunk of input data, one at a time. # Each time we get a new segment of data, which goes to all the plotters for start, end, data in it: print(f"Read data {start} - {end}") + # This causes each data = yield statement in each plotter to # be given this data chunk as the variable data. @@ -361,6 +358,11 @@ def plot_psf_shear(self): plt.plot(mu1, line12, color="blue", label=r"$m=%.2e \pm %.2e$" % (slope12, std_err12)) plt.plot(mu1, [0] * len(line11), color="black") + std11[std11<0] = 0 + std12[std12<0] = 0 + std21[std21<0] = 0 + std22[std22<0] = 0 + plt.errorbar(mu1 + dx, mean11, std11, label="g1", fmt="s", markersize=5, color="red") plt.errorbar(mu1 - dx, mean12, std12, label="g2", fmt="o", markersize=5, color="blue") @@ -398,10 +400,10 @@ def plot_psf_size_shear(self): psf_prefix = self.config["psf_prefix"] delta_gamma = self.config["delta_gamma"] - psf_T_edges = self.get_bin_edges("psf_T_mean") + psf_T_edges = self.get_bin_edges("psf_T") binnedShear = MeanShearInBins( - f"{psf_prefix}T_mean", + f"{psf_prefix}T", psf_T_edges, delta_gamma, cut_source_bin=True, @@ -421,6 +423,7 @@ def plot_psf_size_shear(self): if self.rank != 0: return + dx = 0.05 * (psf_T_edges[1] - psf_T_edges[0]) idx = np.where(np.isfinite(mu))[0] @@ -433,6 +436,10 @@ def plot_psf_size_shear(self): fig = self.open_output("g_psf_T", wrapper=True) + + std1[std1<0] = 0 + std2[std2<0] = 0 + plt.plot(mu, line1, color="red", label=r"$m=%.2e \pm %.2e$" % (slope1, std_err1)) plt.plot(mu, line2, color="blue", label=r"$m=%.2e \pm %.2e$" % (slope2, std_err2)) plt.plot(mu, [0] * len(mu), color="black") @@ -497,6 +504,10 @@ def plot_snr_shear(self): std_err2 = mc_cov[0, 0] ** 0.5 line2 = slope2 * mu + intercept2 + + std1[std1<0] = 0 + std2[std2<0] = 0 + fig = self.open_output("g_snr", wrapper=True) plt.plot(mu, line1, color="red", label=r"$m=%.2e \pm %.2e$" % (slope1, std_err1)) @@ -559,6 +570,9 @@ def plot_size_shear(self): std_err2 = mc_cov[0, 0] ** 0.5 line2 = slope2 * mu + intercept2 + std1[std1<0] = 0 + std2[std2<0] = 0 + fig = self.open_output("g_T", wrapper=True) plt.plot(mu, line1, color="red", label=r"$m=%.2e \pm %.2e$" % (slope1, std_err1)) @@ -613,6 +627,7 @@ def plot_mag_shear(self): binnedShear[f"{band}"].add_data(data) for band in self.config["bands"]: + print("collecting for", band) ( stat[f"mu_{band}"], stat[f"mean1_{band}"], @@ -653,6 +668,8 @@ def plot_mag_shear(self): label=r"$m=%.2e \pm %.2e$" % (stat[f"slope1_{band}"], stat[f"std_err1_{band}"]), ) plt.plot(stat[f"mu_{band}"], [0] * len(stat[f"mu_{band}"]), color="black") + stat[f"std1_{band}"][stat[f"std1_{band}"] < 0] = 0 + stat[f"std2_{band}"][stat[f"std2_{band}"] < 0] = 0 plt.errorbar( stat[f"mu_{band}"] + dx, stat[f"mean1_{band}"], diff --git a/txpipe/ingest/__init__.py b/txpipe/ingest/__init__.py index 0b381c8d7..ab2f8ce71 100644 --- a/txpipe/ingest/__init__.py +++ b/txpipe/ingest/__init__.py @@ -1,5 +1,5 @@ from .dp02 import TXIngestDataPreview02 -from .mocks import TXCosmoDC2Mock, TXGaussianSimsMock +from .mocks import TXIngestRomanRubin, TXIngestSkySim, TXIngestCosmoDC2, TXIngestBuzzard, TXSimpleMock, TXMockTruthPZ from .gcr import TXMetacalGCRInput, TXIngestStars from .redmagic import TXIngestRedmagic from .ssi import ( diff --git a/txpipe/ingest/mock_tools.py b/txpipe/ingest/mock_tools.py new file mode 100644 index 000000000..80270c53d --- /dev/null +++ b/txpipe/ingest/mock_tools.py @@ -0,0 +1,328 @@ +import numpy as np +from ..utils.conversion import mag_ab_to_nanojansky, nanojansky_to_mag_ab, nanojansky_err_to_mag_ab, mag_ab_err_to_nanojansky_err + + + +# These are quick, rough, and global numbers JZ measured on DES Y6 +# metadetect catalogs to use as defaults for response values +# The value d(parameter)/d(g_i) has the key (parameter, i) +desy6_responses = { + ("g1", 1): 0.7522786345399349, + ("g1", 2): -7.221603811297746e-05, + ("g2", 1): 0.000695986991087691, + ("g2", 2): 0.753689800584438, + ("s2n", 1): 0.4495460648463734, + ("s2n", 2): -0.2446177233672131, + ("flux_g", 1): 2.200867381810667, + ("flux_g", 2): -0.7594371207147788, + ("flux_r", 1): 6.4665622482152685, + ("flux_r", 2): -3.319059863883922, + ("flux_i", 1): 9.973907233552382, + ("flux_i", 2): -5.844535374410498, + ("flux_z", 1): 13.01874158668852, + ("flux_z", 2): -7.8327478646315285, + ("T", 1): 0.0002450368855461127, + ("T", 2): -0.0005327801992860426, +} + + +mock_psf_stats = { + # trace size in arcsec^2 + # corresponds to FWHM 0.75 arcsec + "T": (0.20287899012501048, 0.025), + "e1": (0.0, 0.03), + "e2": (0.0, 0.03), +} + +unit_responses = { + ("g1", 1): 1.0, + ("g1", 2): 0.0, + ("g2", 1): 0.0, + ("g2", 2): 1.0, + ("s2n", 1): 0.0, + ("s2n", 2): 0.0, + ("flux_g", 1): 0.0, + ("flux_g", 2): 0.0, + ("flux_r", 1): 0.0, + ("flux_r", 2): 0.0, + ("flux_i", 1): 0.0, + ("flux_i", 2): 0.0, + ("flux_z", 1): 0.0, + ("flux_z", 2): 0.0, + ("T", 1): 0.0, + ("T", 2): 0.0, +} + + + +def add_lsst_like_noise(data, random_state, year=1, **config): + from photerr import LsstErrorModel + import pandas as pd + + # LsstErrorModel is the newer of the photometry models + # and should be more representative of real data. + + # Find all bands available in the data + bands = [b for b in "ugrizy" if f"mag_{b}" in data] + + model = LsstErrorModel( + nYrObs=year, + **config + ) + + df = pd.DataFrame({b: data[f"mag_{b}"] for b in bands}) + noisy_data = model(df, random_state=random_state) + + # Keep all the input columns as expected. + + for b in bands: + mag = np.array(noisy_data[b].values) + mag_err = np.array(noisy_data[f"{b}_err"].values) + flux = mag_ab_to_nanojansky(mag) + flux_err = mag_ab_err_to_nanojansky_err(mag, mag_err) + snr = flux / flux_err + data[f"mag_{b}"] = mag + data[f"mag_err_{b}"] = mag_err + data[f"snr_{b}"] = snr + data[f"s2n_{b}"] = snr + + + + +def make_metadetect_catalog(data, response_type, delta_gamma, bands, rng, snr_cut=4): + output = {} + + # These parameters have response factors defined for them. + params = ["g1", "g2", "T"] + + # For these parameters we do not apply any response correction, + # they have the same value in the metadetect variants. + # Note that this will not be the case in real life - different + # sets of galaxies will be identified in each metadetect variant, + # and locations will change. + non_response_params = ["ra", "dec", "id", "redshift_true"] + + if response_type == "desy6": + responses = desy6_responses + elif response_type == "unit": + responses = unit_responses + else: + raise ValueError(f"Unknown response type {response_type}") + + + + for param in params: + output[f"00/{param}"] = data[param] + output[f"1p/{param}"] = data[param] + responses[param, 1] * delta_gamma / 2 + output[f"1m/{param}"] = data[param] - responses[param, 1] * delta_gamma / 2 + output[f"2p/{param}"] = data[param] + responses[param, 2] * delta_gamma / 2 + output[f"2m/{param}"] = data[param] - responses[param, 2] * delta_gamma / 2 + + for param in non_response_params: + output[f"00/{param}"] = data[param] + output[f"1p/{param}"] = data[param] + output[f"1m/{param}"] = data[param] + output[f"2p/{param}"] = data[param] + output[f"2m/{param}"] = data[param] + + s2n_total = 0 + s2n_total_1p = 0 + s2n_total_1m = 0 + s2n_total_2p = 0 + s2n_total_2m = 0 + + dg2 = delta_gamma / 2 + + for b in bands: + mag = data["mag_" + b] + mag_err = data["mag_err_" + b] + flux = mag_ab_to_nanojansky(mag) + flux_err = mag_ab_err_to_nanojansky_err(mag, mag_err) + + s2n = flux / flux_err + s2n_1p = s2n + responses["s2n", 1] * dg2 + s2n_1m = s2n - responses["s2n", 1] * dg2 + s2n_2p = s2n + responses["s2n", 2] * dg2 + s2n_2m = s2n - responses["s2n", 2] * dg2 + + s2n_total += s2n ** 2 + s2n_total_1p += s2n_1p ** 2 + s2n_total_1m += s2n_1m ** 2 + s2n_total_2p += s2n_2p ** 2 + s2n_total_2m += s2n_2m ** 2 + + flux_1p = flux + responses[(f"flux_{b}", 1)] * dg2 + flux_1m = flux - responses[(f"flux_{b}", 1)] * dg2 + flux_2p = flux + responses[(f"flux_{b}", 2)] * dg2 + flux_2m = flux - responses[(f"flux_{b}", 2)] * dg2 + # there are some zero fluxes for non-detections. We silence the warning + with np.errstate(divide = 'ignore'): + output[f"00/mag_{b}"] = nanojansky_to_mag_ab(flux) + output[f"1p/mag_{b}"] = nanojansky_to_mag_ab(flux_1p) + output[f"1m/mag_{b}"] = nanojansky_to_mag_ab(flux_1m) + output[f"2p/mag_{b}"] = nanojansky_to_mag_ab(flux_2p) + output[f"2m/mag_{b}"] = nanojansky_to_mag_ab(flux_2m) + + output[f"00/mag_err_{b}"] = nanojansky_err_to_mag_ab(flux, flux_err) + output[f"1p/mag_err_{b}"] = nanojansky_err_to_mag_ab(flux_1p, flux_err) + output[f"1m/mag_err_{b}"] = nanojansky_err_to_mag_ab(flux_1m, flux_err) + output[f"2p/mag_err_{b}"] = nanojansky_err_to_mag_ab(flux_2p, flux_err) + output[f"2m/mag_err_{b}"] = nanojansky_err_to_mag_ab(flux_2m, flux_err) + + + + output["00/s2n"] = np.sqrt(s2n_total) + output["1p/s2n"] = np.sqrt(s2n_total_1p) + output["1m/s2n"] = np.sqrt(s2n_total_1m) + output["2p/s2n"] = np.sqrt(s2n_total_2p) + output["2m/s2n"] = np.sqrt(s2n_total_2m) + + + psf_T, psf_e1, psf_e2 = make_mock_psf(output["00/ra"].size, rng) + + for variant in ["00", "1p", "1m", "2p", "2m"]: + output[f"{variant}/psf_T"] = psf_T + output[f"{variant}/psf_g1"] = psf_e1 + output[f"{variant}/psf_g2"] = psf_e2 + + # Now we do the cuts on each variant separately + for variant in ["00", "1p", "1m", "2p", "2m"]: + snr = output[f"{variant}/s2n"] + mask = snr > snr_cut + # T cut is done later + for key in output: + if key.startswith(f"{variant}/"): + output[key] = output[key][mask] + n = output[f"{variant}/ra"].size + output[f"{variant}/weight"] = np.ones(n, dtype=np.float32) + output[f"{variant}/flags"] = np.zeros(n, dtype=np.int32) + + return output + + +def make_mock_psf(n, rng): + T = rng.normal(mock_psf_stats["T"][0], mock_psf_stats["T"][1], n) + e1 = rng.normal(mock_psf_stats["e1"][0], mock_psf_stats["e1"][1], n) + e2 = rng.normal(mock_psf_stats["e2"][0], mock_psf_stats["e2"][1], n) + return T, e1, e2 + + +def generate_mock_metacal_mag_responses(bands, nobj): + nband = len(bands) + mu = np.zeros(nband) # seems approx mean of response across bands, from HSC tract + rho = 0.25 # approx correlation between response in bands, from HSC tract + sigma2 = 1.7**2 # approx variance of response, from HSC tract + covmat = np.full((nband, nband), rho * sigma2) + np.fill_diagonal(covmat, sigma2) + mag_responses = np.random.multivariate_normal(mu, covmat, nobj).T + return mag_responses + + +class TableWriter: + def __init__(self, group, initial_size=1_000_000): + import h5py + self.group = group + self.created_datasets = False + self.initial_size = initial_size + self.keys = [] + self.current_size = {} + self.start = 0 + + def do_size_check(self, data): + sz = None + for key, value in data.items(): + if sz is None: + sz = len(value) + elif sz != len(value): + raise ValueError("All arrays to write must have the same length") + return sz + + def __enter__(self): + return self + + def __exit__(self, exc_type, exc_value, traceback): + self.resize(self.start) + + def create_datasets(self, data): + size = max(self.initial_size, len(data[next(iter(data))])) + shape = (size,) + for key, value in data.items(): + maxshape = (None,) + self.group.create_dataset(key, shape, maxshape=maxshape, dtype=value.dtype) + self.current_size = size + self.created_datasets = True + self.keys = list(data.keys()) + + def resize(self, new_size): + for key in self.keys: + self.group[key].resize((new_size,)) + self.current_size = new_size + + def write(self, data): + sz = self.do_size_check(data) + + if not self.created_datasets: + self.create_datasets(data) + + end = self.start + sz + + if end > self.current_size: + new_size = max(self.current_size * 2, end) + self.resize(new_size) + + for key in self.keys: + value = data[key] + self.group[key][self.start:end] = value + self.start = end + + +class ShearTableWriter: + def __init__(self, filename, initial_size=1_000_000): + import h5py + self.file = h5py.File(filename, "w") + self.group = self.file.create_group("shear") + self.writers = {} + for variant in ["00", "1p", "1m", "2p", "2m"]: + group = self.file.create_group(f"shear/{variant}") + self.writers[variant] = TableWriter(group, initial_size=initial_size) + + def __enter__(self): + for writer in self.writers.values(): + writer.__enter__() + return self + + def __exit__(self, exc_type, exc_value, traceback): + for writer in self.writers.values(): + writer.__exit__(exc_type, exc_value, traceback) + self.file.close() + + def write(self, data): + for variant in self.writers: + tag = variant + "/" + data_variant = {key.removeprefix(tag): value for key, value in data.items() if key.startswith(tag)} + self.writers[variant].write(data_variant) + + + + + +def make_photo_cuts(data, bands, snr_cut): + # check if object is detected in any band + detected = np.zeros_like(data["id"], dtype=bool) + for b in bands: + snr = data[f"snr_{b}"] + detected |= (snr >= snr_cut) + + output = {} + for key, value in data.items(): + output[key] = value[detected] + return output + + + + +def half_light_radius_to_trace(hlr): + """Convert half-light radius to trace T of the Gaussian matrix""" + size_sigma = hlr / np.sqrt(2 * np.log(2)) + size_T = 2 * size_sigma**2 + return size_T diff --git a/txpipe/ingest/mocks.py b/txpipe/ingest/mocks.py index 39981b780..5edf135bb 100755 --- a/txpipe/ingest/mocks.py +++ b/txpipe/ingest/mocks.py @@ -1,64 +1,170 @@ +import glob +import numpy as np +from ..data_types import ShearCatalog, TextFile, QPPDFFile, PhotometryCatalog from ..base_stage import PipelineStage -from ..data_types import ShearCatalog, HDFFile, TextFile, QPPDFFile -from ..utils import band_variants, metacal_variants, metadetect_variants, Timer +from .mock_tools import add_lsst_like_noise, make_metadetect_catalog, TableWriter, ShearTableWriter, make_photo_cuts, half_light_radius_to_trace from ceci.config import StageParameter -import numpy as np - - -class TXCosmoDC2Mock(PipelineStage): - """ - Simulate mock shear and photometry measurements from CosmoDC2 (or similar) - - This stage simulates metacal data and metacalibrated - photometry measurements, starting from a cosmology catalogs - of the kind used as an input to DC2 image and obs-catalog simulations. - This is mainly useful for testing infrastructure in advance - of the DC2 catalogs being available, but might also be handy - for starting from a purer simulation. - """ - name = "TXCosmoDC2Mock" +class TXIngestExtraGalactic(PipelineStage): + name = "TXIngestExtraGalactic"; + inputs = [] parallel = False - inputs = [("response_model", HDFFile)] - outputs = [ ("shear_catalog", ShearCatalog), - ("photometry_catalog", HDFFile), + ("photometry_catalog", PhotometryCatalog), ] config_options = { - "cat_name": StageParameter(str, "cosmoDC2", msg="Name of the mock catalog to use."), - "visits_per_band": StageParameter(int, 165, msg="Number of visits per band for noise simulation."), - "snr_limit": StageParameter(float, 4.0, msg="S/N limit for object selection."), - "max_size": StageParameter(int, 99999999999999, msg="Maximum catalog size for testing."), - "extra_cols": StageParameter(str, "", msg="Extra columns to include (space-separated)."), - "max_npix": StageParameter(int, 99999999999999, msg="Maximum number of pixels."), - "unit_response": StageParameter(bool, False, msg="Whether to use unit response in simulation."), - "cat_size": StageParameter(int, 0, msg="Pre-computed catalog size, if known."), - "flip_g2": StageParameter( - bool, True, msg="Whether to flip g2 sign to match conventions. TreeCorr and NaMaster use flip_g2=True" - ), - "apply_mag_cut": StageParameter(bool, False, msg="Apply magnitude cut for descqa comparison."), - "Mag_r_limit": StageParameter(float, -19, msg="Magnitude r limit for object selection."), - "metadetect": StageParameter( - bool, True, msg="Whether to make a metadetect-style catalog (True) or metacal (False)." - ), - "add_shape_noise": StageParameter(bool, True, msg="Whether to add shape noise to simulation."), - "healpixels": StageParameter(list, [-1], msg="List of HEALPix pixels to use."), + "delta_gamma": StageParameter(float, default=0.02, msg="Delta gamma value for metadetect response calculations"), + "year": StageParameter(int, default=1, msg="Number of years of LSST observations to simulate photometric noise for"), + "response_type": StageParameter(str, default="desy6", msg="Type of response to apply for metadetect"), + "shear_snr_cut": StageParameter(float, default=4.0, msg="SNR cut for shear, total over shear bands"), + "phot_snr_cut": StageParameter(float, default=5.0, msg="SNR cut to be in the photometry catalog, any band"), + "T_ratio_cut": StageParameter(float, default=0.5, msg="T/PSF_T cut for metadetect catalog"), + "random_seed": StageParameter(int, default=0, msg="Random seed"), + "bands": StageParameter(str, default="ugrizy", msg="Bands to ingest photometry for"), + "shear_bands": StageParameter(str, default="griz", msg="Bands to use for shear metadetect"), + } + + def run(self): + shear_filename = self.get_output("shear_catalog") + photo_file = self.open_output("photometry_catalog") + photo_group = photo_file.create_group("photometry") + + shear_snr_cut = self.config["shear_snr_cut"] + phot_snr_cut = self.config["phot_snr_cut"] + + rng = np.random.default_rng(self.config['random_seed']) + size = self.config.get("initial_size", 10_000_000) + + with ShearTableWriter(shear_filename, initial_size=size) as shear_writer, \ + TableWriter(photo_group, initial_size=size) as photo_writer: + for data in self.data_iterator(): + self.add_additional_columns(data) + add_lsst_like_noise(data, rng, year=self.config["year"]) + shear_data = make_metadetect_catalog(data, self.config["response_type"], self.config["delta_gamma"], self.config["shear_bands"], rng, snr_cut=shear_snr_cut) + photo_data = make_photo_cuts(data, self.config["bands"], phot_snr_cut) + shear_writer.write(shear_data) + photo_writer.write(photo_data) + + def add_additional_columns(self, data): + if "extendedness" not in data: + # Extra-galactic objects are all extended, no stars are included + data["extendedness"] = np.ones_like(data["ra"], dtype=np.int8) + + def data_iterator(self): + raise NotImplementedError("data_iterator must be implemented in subclass") + + + + +class TXIngestRomanRubin(TXIngestExtraGalactic): + """Ingest skysim simulation data for TXPipe processing. + """ + name = "TXIngestRomanRubin" + config_options = TXIngestExtraGalactic.config_options | { + "xgal_dir_name": StageParameter(str, default="/global/cfs/cdirs/lsst/shared/xgal/roman-rubin/roman_rubin_2023_v1.1.3", msg="Directory name for skysim xgal files"), + "file_pattern": StageParameter(str, default="roman_rubin_2023_*.hdf5", msg="Pattern for file name"), + } + + def data_iterator(self): + import h5py + bands = "ugrizy" + file_format = self.config["file_pattern"] + files = glob.glob(f"{self.config.xgal_dir_name}/{file_format}") + nfile = len(files) + for j, filename in enumerate(files): + print(f"Processing file {j+1}/{nfile}: {filename}") + with h5py.File(filename, "r") as f: + nkey = len(f.keys()) - 1 + + # The keys here are mostly healpix pixel indices. + for i, key in enumerate(f.keys()): + if key == "metaData": + continue + + group = f[key] + + if "ra" not in group.keys(): + print(f" No data - skipping healpix pixel {i+1}/{nkey}: {key} ") + continue + + print(f" Processing healpix pixel {i+1}/{nkey}: {key}") + data = self.extract_roman_rubin_truth_info(group, bands) + yield data + + def extract_roman_rubin_truth_info(self, group, bands): + params = [f"LSST_obs_{b}" for b in bands] + params += ["redshift", "ra", "dec", "galaxy_id", "shear1", "shear2", "diskHalfLightRadiusArcsec", "spheroidHalfLightRadiusArcsec", "bulge_frac"] + if "totalEllipticity1" in group: + params += ["totalEllipticity1", "totalEllipticity2"] + else: + params += ["spheroidEllipticity1", "spheroidEllipticity2", "diskEllipticity1", "diskEllipticity2"] + data = {p: group[p][:] for p in params} + output = {} + output["ra"] = data["ra"] + output["dec"] = data["dec"] + output["redshift_true"] = data["redshift"] + + output["id"] = data["galaxy_id"] + for b in bands: + output[f"mag_{b}"] = data[f'LSST_obs_{b}'] + + # Compute the overall (bulge + disc) half-light radius. + # This copies the GCRCatalogs approach, which + # I'm noy sure is quite right. + hlr_disc = data["diskHalfLightRadiusArcsec"] + hlr_bulge = data["spheroidHalfLightRadiusArcsec"] + f = data["bulge_frac"] + hlr = (hlr_disc * (1 - f) + hlr_bulge * f) + + if "totalEllipticity1" in data: + e1 = data["totalEllipticity1"] + e2 = data["totalEllipticity2"] + else: + e1 = data["spheroidEllipticity1"] * data["bulge_frac"] + data["diskEllipticity1"] * (1 - data["bulge_frac"]) + e2 = data["spheroidEllipticity2"] * data["bulge_frac"] + data["diskEllipticity2"] * (1 - data["bulge_frac"]) + + g = data["shear1"] + 1j * data["shear2"] + e = e1 + 1j * e2 + # Apply shear to get observed ellipticity + denom = 1 + np.conj(g) * e + e_obs = (e + g) / denom + output["g1"] = np.real(e_obs) + output["g2"] = np.imag(e_obs) + output["true_g1"] = data["shear1"] + output["true_g2"] = data["shear2"] + + + # Convert half-light radius to T + output["T"] = half_light_radius_to_trace(hlr) + return output + + + +class TXIngestSkySim(TXIngestExtraGalactic): + """Ingest skysim simulation data for TXPipe processing. + """ + name = "TXIngestSkySim" + config_options = TXIngestExtraGalactic.config_options | { + "cat_name": StageParameter(str, default="skysim5000_v1.2", msg="Name of the GCR catalog to use"), + "extra_cols": StageParameter(str, default="", msg="Extra columns to ingest from the catalog"), } - def data_iterator(self, gc): + def data_iterator(self): + import GCRCatalogs + gc = GCRCatalogs.load_catalog(self.config["cat_name"]) # Columns we need from the cosmo simulation cols = [ + "ra", + "dec", "mag_true_u_lsst", "mag_true_g_lsst", "mag_true_r_lsst", "mag_true_i_lsst", "mag_true_z_lsst", "mag_true_y_lsst", - "ra", - "dec", "ellipticity_1_true", "ellipticity_2_true", "shear_1", @@ -77,1094 +183,59 @@ def data_iterator(self, gc): if nfile: j = i + 1 print(f"Loading chunk {j}/{nfile}") - yield data - - def load_catalog(self): - import GCRCatalogs - - cat_name = self.config["cat_name"] - self.bands = ("u", "g", "r", "i", "z", "y") - - print(f"Loading from catalog {cat_name}") - - kw = {} - if self.config["healpixels"] != [-1]: - kw = {"config_overwrite": {"healpix_pixels": self.config["healpixels"]}} - - gc = GCRCatalogs.load_catalog(cat_name, **kw) - - # GCR sometimes tries to read the entire catalog - # to measure its length rather than looking at metadata - # this can take a very long time. - # allow the user to say that already know it. - N = self.config["cat_size"] - if N == 0: - N = len(gc) - - return gc, N - - def run(self): - gc, N = self.load_catalog() - - print(f"Rank {self.rank} loaded: length = {N}.") - - target_size = min(N, self.config["max_size"]) - select_fraction = target_size / N - - if target_size != N: - print(f"Will select approx {100 * select_fraction:.2f}% of objects ({target_size})") - - # Prepare output files - metacal_file = self.open_output("shear_catalog", parallel=self.is_mpi()) - photo_file = self.open_output("photometry_catalog", parallel=self.is_mpi()) - photo_cols = self.setup_photometry_output(photo_file, target_size) - if self.config["metadetect"]: - metacal_cols = self.setup_metadetect_output(metacal_file, target_size) - else: - metacal_cols = self.setup_metacal_output(metacal_file, target_size) - - # Load the metacal response file - self.load_metacal_response_model() - - # Keep track of catalog position - start = 0 - count = 0 - - # Loop through chunks of - for data in self.data_iterator(gc): - # The initial chunk size, of all the input data. - # This will be reduced later as we remove objects - some_col = list(data.keys())[0] - chunk_size = len(data[some_col]) - print(f"Process {self.rank} read chunk {count} - {count + chunk_size} of {N}") - count += chunk_size - # Select a random fraction of the catalog if we are cutting down - # We can't just take the earliest galaxies because they are ordered - # by redshift - if target_size != N: - select = np.random.uniform(size=chunk_size) < select_fraction - nselect = select.sum() - print(f"Cutting down to {nselect}/{chunk_size} objects") - for name in list(data.keys()): - data[name] = data[name][select] - - # Simulate the various output data sets - mock_photometry = self.make_mock_photometry(data) - - # Cut out any objects too faint to be detected and measured. - # We have to do this after the photometry, so that we know if - # the object is detected, but we can do it before making the mock - # metacal info, saving us some time simulating un-needed objects - if self.config["snr_limit"] > 0: # otherwise there is no need to run this function which is slow - self.remove_undetected(data, mock_photometry) - - if self.config["apply_mag_cut"]: - self.apply_magnitude_cut(data) - - if self.config["metadetect"]: - mock_shear = self.make_mock_metadetect(data, mock_photometry) - else: - mock_shear = self.make_mock_metacal(data, mock_photometry) - - # The chunk size has now changed - some_col = list(mock_photometry.keys())[0] - chunk_size = len(mock_photometry[some_col]) - - # start is where this process should start writing this - # chunk of data. end is where the final process will finish - # writing, becoming the starting point for the whole next - # chunk over all the processes - start, end = self.next_output_indices(start, chunk_size) - - # Save all output - self.write_output( - start, - target_size, - photo_cols, - metacal_cols, - photo_file, - mock_photometry, - metacal_file, - mock_shear, - ) - - # The next iteration starts writing where the current one ends. - start = end - - if start >= target_size: - break - # Tidy up - - photo_file.close() - metacal_file.close() - - self.truncate_outputs(end) - - def truncate_outputs(self, n): - import h5py - - if self.comm is not None: - self.comm.Barrier() - - def visitor(name, node): - if isinstance(node, h5py.Dataset): - print(f"Resizing {name}") - node.resize((n,)) - - if self.rank == 0: - # all files should now be closed for all procs - print(f"Resizing all outupts to size {n}") - - with h5py.File(self.get_output("photometry_catalog"), "r+") as f: - f["photometry"].visititems(visitor) - - with h5py.File(self.get_output("shear_catalog"), "r+") as f: - f["shear"].visititems(visitor) - - def next_output_indices(self, start, chunk_size): - if self.comm is None: - end = start + chunk_size - else: - all_indices = self.comm.allgather(chunk_size) - starting_points = np.concatenate(([0], np.cumsum(all_indices))) - # use the old start to find the end point. - # the final starting point (not used below, since it is larger - # than the largest self.rank value) is the total data length - end = start + starting_points[-1] - start = start + starting_points[self.rank] - print(f"- Rank {self.rank} writing output to {start}-{start + chunk_size}") - return start, end - - def setup_photometry_output(self, photo_file, target_size): - # Get a list of all the column names - cols = ["ra", "dec", "extendedness"] - for band in self.bands: - cols.append(f"mag_{band}") - cols.append(f"mag_{band}_err") - cols.append(f"snr_{band}") - - for col in self.config["extra_cols"].split(): - cols.append(col) - - # Make group for all the photometry - group = photo_file.create_group("photometry") - group.attrs["bands"] = self.bands - - # Extensible columns becase we don't know the size yet. - # We will cut down the size at the end. - for col in cols: - group.create_dataset(col, (target_size,), maxshape=(target_size,), dtype="f8") - - # The only non-float column for now - group.create_dataset("id", (target_size,), maxshape=(target_size,), dtype="i8") - - return cols + ["id"] - - def setup_metadetect_output(self, metacal_file, target_size): - # Get a list of all the column names - cols = metadetect_variants( - "g1", - "g2", - "T", - "s2n", - "T_err", - "ra", - "dec", - "psf_g1", - "psf_g2", - "mcal_psf_g1", - "mcal_psf_g2", - "mcal_psf_T_mean", - "weight", - ) + band_variants("riz", "mag", "mag_err", shear_catalog_type="metadetect") - - # Store the truth values only for the primary catalog - cols += ["00/true_g1", "00/true_g2", "00/redshift_true"] - - # Make group for all the photometry - group = metacal_file.create_group("shear") - group.attrs["bands"] = self.bands - - # Extensible columns becase we don't know the size yet. - # We will cut down the size at the end. - for col in cols: - group.create_dataset(col, (target_size,), maxshape=(target_size,), dtype="f8") - - # Integer columns - int_cols = metadetect_variants("id", "flags") - for col in int_cols: - group.create_dataset(col, (target_size,), maxshape=(target_size,), dtype="i8") - - return cols + int_cols - - def setup_metacal_output(self, metacal_file, target_size): - # Get a list of all the column names - cols = ( - [ - "ra", - "dec", - "psf_g1", - "psf_g2", - "mcal_psf_g1", - "mcal_psf_g2", - "mcal_psf_T_mean", - ] - + metacal_variants("mcal_g1", "mcal_g2", "mcal_T", "mcal_s2n", "mcal_T_err") - + band_variants("riz", "mcal_mag", "mcal_mag_err", shear_catalog_type="metacal") - + ["weight"] - ) - - cols += ["true_g1", "true_g2", "redshift_true"] - - # Make group for all the photometry - group = metacal_file.create_group("shear") - group.attrs["bands"] = self.bands - - # Extensible columns becase we don't know the size yet. - # We will cut down the size at the end. - - for col in cols: - group.create_dataset(col, (target_size,), maxshape=(target_size,), dtype="f8") - - group.create_dataset("id", (target_size,), maxshape=(target_size,), dtype="i8") - - for col in metacal_variants("mcal_flags"): - group.create_dataset(col, (target_size,), maxshape=(target_size,), dtype="i8") - - return cols + ["id", "mcal_flags"] - - def load_metacal_response_model(self): - """ - Load an HDF file containing the response model - R(log10(snr), size) - R_std(log10(snr), size) - - where R is the mean metacal response in a bin and - R_std is its standard deviation. - - So far only one of these files exists! - - """ - import scipy.interpolate - - if self.config["unit_response"]: - return - - model_file = self.open_input("response_model") - snr_centers = model_file["R_model/log10_snr"][:] - sz_centers = model_file["R_model/size"][:] - R_mean = model_file["R_model/R_mean"][:] - R_std = model_file["R_model/R_std"][:] - model_file.close() - - # Save a 2D spline - snr_grid, sz_grid = np.meshgrid(snr_centers, sz_centers) - self.R_spline = scipy.interpolate.SmoothBivariateSpline( - snr_grid.T.flatten(), - sz_grid.T.flatten(), - R_mean.flatten(), - w=R_std.flatten(), - ) - self.Rstd_spline = scipy.interpolate.SmoothBivariateSpline( - snr_grid.T.flatten(), sz_grid.T.flatten(), R_std.flatten() - ) - - def write_output( - self, - start, - target_size, - photo_cols, - metacal_cols, - photo_file, - photo_data, - metacal_file, - metacal_data, - ): - """ - Save the photometry we have just simulated to disc - - Parameters - ---------- - - photo_file: HDF File object - - metacal_file: HDF File object - - photo_data: dict - Dictionary of simulated photometry - - metacal_data: dict - Dictionary of simulated metacal data - - """ - # Work out the range of data to output (since we will be - # doing this in chunks). If we have cut down to a random - # subset of the catalog then we may have gone over the - # target length, depending on the random selection - n = len(photo_data["id"]) - end = min(start + n, target_size) - - # assert photo_data['id'].min()>0 - - # Save each column - for name in photo_cols: - photo_file[f"photometry/{name}"][start:end] = photo_data[name] - - for name in metacal_cols: - metacal_file[f"shear/{name}"][start:end] = metacal_data[name] - - def make_mock_photometry(self, data): - # The visit count affects the overall noise levels - n_visit = self.config["visits_per_band"] - # Do all the work in the function below - photo = make_mock_photometry(n_visit, self.bands, data, self.config["unit_response"]) - - for col in self.config["extra_cols"].split(): - photo[col] = data[col] - - return photo + yield self.process_data(data) - def make_mock_metadetect(self, data, photo): - """ - !!! This function is not working properly yet!!! - Generate a mock metadetect table. - This is a long and complicated function unfortunately. - Throughout we assume a single R = R11 = R22, with R12 = R21 = 0 + def process_data(self, data): + output = {} - Parameters - ---------- - data: dict - Dictionary of arrays read from the input cosmo catalog - photo: dict - Dictionary of arrays generated to simulate photometry - """ + # Basic info + output["ra"] = data["ra"] + output["dec"] = data["dec"] + output["redshift_true"] = data["redshift_true"] + output["id"] = data["galaxy_id"] - # These are the numbers from figure F1 of the DES Y1 shear catalog paper - # (this version is not yet public but is awaiting a second referee response) + # Magnitudes + for b in "ugrizy": + output[f"mag_{b}"] = data[f"mag_true_{b}_lsst"] - # Overall SNR for the three bands usually used for shape measurement - # We use the true SNR not the estimated one, though these are pretty close - snr = (photo["snr_r"] ** 2 + photo["snr_i"] ** 2 + photo["snr_z"] ** 2) ** 0.5 - snr_1p = (photo["snr_r_1p"] ** 2 + photo["snr_i_1p"] ** 2 + photo["snr_z_1p"] ** 2) ** 0.5 - snr_1m = (photo["snr_r_1m"] ** 2 + photo["snr_i_1m"] ** 2 + photo["snr_z_1m"] ** 2) ** 0.5 - snr_2p = (photo["snr_r_2p"] ** 2 + photo["snr_i_2p"] ** 2 + photo["snr_z_2p"] ** 2) ** 0.5 - snr_2m = (photo["snr_r_2m"] ** 2 + photo["snr_i_2m"] ** 2 + photo["snr_z_2m"] ** 2) ** 0.5 + # Shear and ellipticity info + g = data["shear_1"] + 1j * data["shear_2"] + e = data["ellipticity_1_true"] + 1j * data["ellipticity_2_true"] - if self.config["unit_response"]: - assert np.allclose(snr, snr_1p) - assert np.allclose(snr, snr_2m) + # Apply shear to get observed ellipticity + denom = 1 + np.conj(g) * e + e_obs = (e + g) / denom + output["g1"] = np.real(e_obs) + output["g2"] = np.imag(e_obs) + output["true_g1"] = data["shear_1"] + output["true_g2"] = data["shear_2"] - nobj = snr.size - - log10_snr = np.log10(snr) - - # Convert from the half-light radius which is in the - # input catalogs to a sigma. Do this by pretending - # that it is a Gaussian. This is clearly wrong, and - # if this causes major errors in the size cuts we may - # have to modify this size_hlr = data["size_true"] - size_sigma = size_hlr / np.sqrt(2 * np.log(2)) - size_T = 2 * size_sigma**2 - - # Use a fixed PSF across all the objects - psf_fwhm = 0.75 - psf_sigma = psf_fwhm / (2 * np.sqrt(2 * np.log(2))) - psf_T = 2 * psf_sigma**2 - - if self.config["unit_response"]: - R = 1.0 - R_size = 0.0 - - else: - # Use the response model to get a reasonable response - # value for this size and SNR - R_mean = self.R_spline(log10_snr, size_T, grid=False) - R_std = self.Rstd_spline(log10_snr, size_T, grid=False) - - # Assume a 0.2 correlation between the size response - # and the shear response. - rho = 0.2 - f = np.random.multivariate_normal([0.0, 0.0], [[1.0, rho], [rho, 1.0]], nobj).T - R, R_size = f * R_std + R_mean - - # Convert magnitudes to fluxes according to the baseline - # use in the metacal numbers - flux_r = 10**0.4 * (27 - photo["mag_r"]) - flux_i = 10**0.4 * (27 - photo["mag_i"]) - flux_z = 10**0.4 * (27 - photo["mag_z"]) - - # Note that this is delta_gamma not 2*delta_gamma, because - # of how we use it below - delta_gamma = 0.01 - - # Use a fixed shape noise per component to generate - # an overall - if self.config["add_shape_noise"]: - shape_noise = 0.26 - else: - shape_noise = 0.0 - - eps = np.random.normal(0, shape_noise, nobj) + 1.0j * np.random.normal(0, shape_noise, nobj) - # True shears without shape noise - g1 = data["shear_1"] - g2 = data["shear_2"] - - if self.config["flip_g2"]: - g2 *= -1 - - # Do the full combination of (g,epsilon) -> e, not the approx one - g = g1 + 1j * g2 - e = (eps + g) / (1 + g.conj() * eps) - e1 = e.real - e2 = e.imag - - zero = np.zeros(nobj) - # Now collect together everything to go into the metacal - # file - output = { - # Basic values - "00/id": photo["id"], - "1p/id": photo["id"], - "1m/id": photo["id"], - "2p/id": photo["id"], - "2m/id": photo["id"], - "00/ra": photo["ra"], - "1p/ra": photo["ra"], - "1m/ra": photo["ra"], - "2p/ra": photo["ra"], - "2m/ra": photo["ra"], - "00/dec": photo["dec"], - "1p/dec": photo["dec"], - "1m/dec": photo["dec"], - "2p/dec": photo["dec"], - "2m/dec": photo["dec"], - # Keep the truth values for use in some testing paths - # We are not pretending to do meta - "00/true_g1": g1, - "00/true_g2": g2, - "00/redshift_true": photo["redshift_true"], - # g1 - "00/g1": e1 * R, - "1p/g1": (e1 + delta_gamma) * R, - "1m/g1": (e1 - delta_gamma) * R, - "2p/g1": e1 * R, - "2m/g1": e1 * R, - # g2 - "00/g2": e2 * R, - "1p/g2": e2 * R, - "1m/g2": e2 * R, - "2p/g2": (e2 + delta_gamma) * R, - "2m/g2": (e2 - delta_gamma) * R, - # T - "00/T": size_T, - "1p/T": size_T + R_size * delta_gamma, - "1m/T": size_T - R_size * delta_gamma, - "2p/T": size_T + R_size * delta_gamma, - "2m/T": size_T - R_size * delta_gamma, - # Terr - "00/T_err": zero, - "1p/T_err": zero, - "1m/T_err": zero, - "2p/T_err": zero, - "2m/T_err": zero, - # size - "00/s2n": snr, - "1p/s2n": snr_1p, - "1m/s2n": snr_1m, - "2p/s2n": snr_2p, - "2m/s2n": snr_2m, - # Magntiudes and fluxes, just copied from the inputs. - "00/mag_r": photo["mag_r"], - "00/mag_i": photo["mag_i"], - "00/mag_z": photo["mag_z"], - "00/mag_err_r": photo["mag_r_err"], - "00/mag_err_i": photo["mag_i_err"], - "00/mag_err_z": photo["mag_z_err"], - "1p/mag_r": photo["mag_r_1p"], - "2p/mag_r": photo["mag_r_2p"], - "1m/mag_r": photo["mag_r_1m"], - "2m/mag_r": photo["mag_r_2m"], - "1p/mag_i": photo["mag_i_1p"], - "2p/mag_i": photo["mag_i_2p"], - "1m/mag_i": photo["mag_i_1m"], - "2m/mag_i": photo["mag_i_2m"], - "1p/mag_z": photo["mag_z_1p"], - "2p/mag_z": photo["mag_z_2p"], - "1m/mag_z": photo["mag_z_1m"], - "2m/mag_z": photo["mag_z_2m"], - "1p/mag_err_r": photo["mag_r_err"], - "2p/mag_err_r": photo["mag_r_err"], - "1m/mag_err_r": photo["mag_r_err"], - "2m/mag_err_r": photo["mag_r_err"], - "1p/mag_err_i": photo["mag_i_err"], - "2p/mag_err_i": photo["mag_i_err"], - "1m/mag_err_i": photo["mag_i_err"], - "2m/mag_err_i": photo["mag_i_err"], - "1p/mag_err_z": photo["mag_z_err"], - "2p/mag_err_z": photo["mag_z_err"], - "1m/mag_err_z": photo["mag_z_err"], - "2m/mag_err_z": photo["mag_z_err"], - # Fixed PSF parameters - all round with same size - "00/mcal_psf_g1": zero, - "1p/mcal_psf_g1": zero, - "1m/mcal_psf_g1": zero, - "2p/mcal_psf_g1": zero, - "2m/mcal_psf_g1": zero, - "00/mcal_psf_g2": zero, - "1p/mcal_psf_g2": zero, - "1m/mcal_psf_g2": zero, - "2p/mcal_psf_g2": zero, - "2m/mcal_psf_g2": zero, - "00/psf_g1": zero, - "1p/psf_g1": zero, - "1m/psf_g1": zero, - "2p/psf_g1": zero, - "2m/psf_g1": zero, - "00/psf_g2": zero, - "1p/psf_g2": zero, - "1m/psf_g2": zero, - "2p/psf_g2": zero, - "2m/psf_g2": zero, - "00/mcal_psf_T_mean": np.repeat(psf_T, nobj), - "1p/mcal_psf_T_mean": np.repeat(psf_T, nobj), - "1m/mcal_psf_T_mean": np.repeat(psf_T, nobj), - "2p/mcal_psf_T_mean": np.repeat(psf_T, nobj), - "2m/mcal_psf_T_mean": np.repeat(psf_T, nobj), - # Everything that gets this far should be used, so flag=0 - "00/flags": np.zeros(nobj, dtype=np.int32), - "1p/flags": np.zeros(nobj, dtype=np.int32), - "1m/flags": np.zeros(nobj, dtype=np.int32), - "2p/flags": np.zeros(nobj, dtype=np.int32), - "2m/flags": np.zeros(nobj, dtype=np.int32), - # we use weights of one for everything for metacal - # if that ever changes we may also need to add - # weight_1p, etc. - "00/weight": np.ones(nobj), - "1p/weight": np.ones(nobj), - "1m/weight": np.ones(nobj), - "2p/weight": np.ones(nobj), - "2m/weight": np.ones(nobj), - } + output["T"] = half_light_radius_to_trace(size_hlr) return output - def make_mock_metacal(self, data, photo): - """ - I copied the old version here to use if we do not want metadetect. - Generate a mock metacal table. - This is a long and complicated function unfortunately. - Throughout we assume a single R = R11 = R22, with R12 = R21 = 0 - - Parameters - ---------- - data: dict - Dictionary of arrays read from the input cosmo catalog - photo: dict - Dictionary of arrays generated to simulate photometry - """ - - # These are the numbers from figure F1 of the DES Y1 shear catalog paper - # (this version is not yet public but is awaiting a second referee response) - - # Overall SNR for the three bands usually used for shape measurement - # We use the true SNR not the estimated one, though these are pretty close - snr = (photo["snr_r"] ** 2 + photo["snr_i"] ** 2 + photo["snr_z"] ** 2) ** 0.5 - snr_1p = (photo["snr_r_1p"] ** 2 + photo["snr_i_1p"] ** 2 + photo["snr_z_1p"] ** 2) ** 0.5 - snr_1m = (photo["snr_r_1m"] ** 2 + photo["snr_i_1m"] ** 2 + photo["snr_z_1m"] ** 2) ** 0.5 - snr_2p = (photo["snr_r_2p"] ** 2 + photo["snr_i_2p"] ** 2 + photo["snr_z_2p"] ** 2) ** 0.5 - snr_2m = (photo["snr_r_2m"] ** 2 + photo["snr_i_2m"] ** 2 + photo["snr_z_2m"] ** 2) ** 0.5 - - if self.config["unit_response"]: - assert np.allclose(snr, snr_1p) - assert np.allclose(snr, snr_2m) - - nobj = snr.size - - log10_snr = np.log10(snr) - - # Convert from the half-light radius which is in the - # input catalogs to a sigma. Do this by pretending - # that it is a Gaussian. This is clearly wrong, and - # if this causes major errors in the size cuts we may - # have to modify this - size_hlr = data["size_true"] - size_sigma = size_hlr / np.sqrt(2 * np.log(2)) - size_T = 2 * size_sigma**2 - - # Use a fixed PSF across all the objects - psf_fwhm = 0.75 - psf_sigma = psf_fwhm / (2 * np.sqrt(2 * np.log(2))) - psf_T = 2 * psf_sigma**2 - - if self.config["unit_response"]: - R = 1.0 - R_size = 0.0 - - else: - # Use the response model to get a reasonable response - # value for this size and SNR - R_mean = self.R_spline(log10_snr, size_T, grid=False) - R_std = self.Rstd_spline(log10_snr, size_T, grid=False) - - # Assume a 0.2 correlation between the size response - # and the shear response. - rho = 0.2 - f = np.random.multivariate_normal([0.0, 0.0], [[1.0, rho], [rho, 1.0]], nobj).T - R, R_size = f * R_std + R_mean - # Convert magnitudes to fluxes according to the baseline - # use in the metacal numbers - flux_r = 10**0.4 * (27 - photo["mag_r"]) - flux_i = 10**0.4 * (27 - photo["mag_i"]) - flux_z = 10**0.4 * (27 - photo["mag_z"]) - - # Note that this is delta_gamma not 2*delta_gamma, because - # of how we use it below - delta_gamma = 0.01 - - # Use a fixed shape noise per component to generate - # an overall - if self.config["add_shape_noise"]: - shape_noise = 0.26 - else: - shape_noise = 0.0 - - eps = np.random.normal(0, shape_noise, nobj) + 1.0j * np.random.normal(0, shape_noise, nobj) - # True shears without shape noise - g1 = data["shear_1"] - g2 = data["shear_2"] - - if self.config["flip_g2"]: - g2 *= -1 - - # Do the full combination of (g,epsilon) -> e, not the approx one - g = g1 + 1j * g2 - e = (eps + g) / (1 + g.conj() * eps) - e1 = e.real - e2 = e.imag - - zero = np.zeros(nobj) - # Now collect together everything to go into the metacal - # file - output = { - # Basic values - "id": photo["id"], - "ra": photo["ra"], - "dec": photo["dec"], - # Keep the truth value just in case - "true_g1": g1, - "true_g2": g2, - # add true redshift since it is used in source selector - "redshift_true": photo["redshift_true"], - # g1 - "mcal_g1": e1 * R, - "mcal_g1_1p": (e1 + delta_gamma) * R, - "mcal_g1_1m": (e1 - delta_gamma) * R, - "mcal_g1_2p": e1 * R, - "mcal_g1_2m": e1 * R, - # g2 - "mcal_g2": e2 * R, - "mcal_g2_1p": e2 * R, - "mcal_g2_1m": e2 * R, - "mcal_g2_2p": (e2 + delta_gamma) * R, - "mcal_g2_2m": (e2 - delta_gamma) * R, - # T - "mcal_T": size_T, - "mcal_T_1p": size_T + R_size * delta_gamma, - "mcal_T_1m": size_T - R_size * delta_gamma, - "mcal_T_2p": size_T + R_size * delta_gamma, - "mcal_T_2m": size_T - R_size * delta_gamma, - # Terr - "mcal_T_err": zero, - "mcal_T_err_1p": zero, - "mcal_T_err_1m": zero, - "mcal_T_err_2p": zero, - "mcal_T_err_2m": zero, - # size - "mcal_s2n": snr, - "mcal_s2n_1p": snr_1p, - "mcal_s2n_1m": snr_1m, - "mcal_s2n_2p": snr_2p, - "mcal_s2n_2m": snr_2m, - # Magntiudes and fluxes, just copied from the inputs. - "mcal_mag_r": photo["mag_r"], - "mcal_mag_i": photo["mag_i"], - "mcal_mag_z": photo["mag_z"], - "mcal_mag_err_r": photo["mag_r_err"], - "mcal_mag_err_i": photo["mag_i_err"], - "mcal_mag_err_z": photo["mag_z_err"], - "mcal_mag_r_1p": photo["mag_r_1p"], - "mcal_mag_r_2p": photo["mag_r_2p"], - "mcal_mag_r_1m": photo["mag_r_1m"], - "mcal_mag_r_2m": photo["mag_r_2m"], - "mcal_mag_i_1p": photo["mag_i_1p"], - "mcal_mag_i_2p": photo["mag_i_2p"], - "mcal_mag_i_1m": photo["mag_i_1m"], - "mcal_mag_i_2m": photo["mag_i_2m"], - "mcal_mag_z_1p": photo["mag_z_1p"], - "mcal_mag_z_2p": photo["mag_z_2p"], - "mcal_mag_z_1m": photo["mag_z_1m"], - "mcal_mag_z_2m": photo["mag_z_2m"], - "mcal_mag_err_r_1p": photo["mag_r_err"], - "mcal_mag_err_r_2p": photo["mag_r_err"], - "mcal_mag_err_r_1m": photo["mag_r_err"], - "mcal_mag_err_r_2m": photo["mag_r_err"], - "mcal_mag_err_i_1p": photo["mag_i_err"], - "mcal_mag_err_i_2p": photo["mag_i_err"], - "mcal_mag_err_i_1m": photo["mag_i_err"], - "mcal_mag_err_i_2m": photo["mag_i_err"], - "mcal_mag_err_z_1p": photo["mag_z_err"], - "mcal_mag_err_z_2p": photo["mag_z_err"], - "mcal_mag_err_z_1m": photo["mag_z_err"], - "mcal_mag_err_z_2m": photo["mag_z_err"], - # Fixed PSF parameters - all round with same size - "mcal_psf_g1": zero, - "mcal_psf_g2": zero, - "psf_g1": zero, - "psf_g2": zero, - "mcal_psf_T_mean": np.repeat(psf_T, nobj), - # Everything that gets this far should be used, so flag=0 - "mcal_flags": np.zeros(nobj, dtype=np.int32), - "mcal_flags_1p": np.zeros(nobj, dtype=np.int32), - "mcal_flags_1m": np.zeros(nobj, dtype=np.int32), - "mcal_flags_2p": np.zeros(nobj, dtype=np.int32), - "mcal_flags_2m": np.zeros(nobj, dtype=np.int32), - # we use weights of one for everything for metacal - # if that ever changes we may also need to add - # weight_1p, etc. - "weight": np.ones(nobj), - } - - return output - - def apply_magnitude_cut(self, data): - """ - Allow for a cut in absolute magnitude. - """ - mag_limit = self.config["Mag_r_limit"] - sel = data["Mag_true_r_sdss_z0"] < mag_limit - - ndet = sel.sum() - ntot = sel.size - fract = ndet * 100.0 / ntot - print(f"{ndet} objects pass magnitude cut out of {ntot} objects ({fract:.1f}%)") - - # Remove all objects not selected - for key in list(data.keys()): - data[key] = data[key][sel] - - def remove_undetected(self, data, photo): - """ - Strip out any undetected objects from the two - simulated data sets. - - Use a configuration parameter snr_limit to decide - on the detection limit. - """ - snr_limit = self.config["snr_limit"] - - # This will become a boolean array in a minute when - # we OR it with an array - detected = False - - # Check if detected in any band. Makes a boolean array - # Even though we started with just a single False. - for band in self.bands: - detected_in_band = photo[f"snr_{band}"] > snr_limit - not_detected_in_band = ~detected_in_band - # Set objects not detected in one band that are detected in another - # to inf magnitude in that band, and the SNR to zero. - # We have to do this for each of the variants also, because otherwise - # we end up with wildly different final SNR values later. - # This is the metadetection issue really! - for v in metacal_variants(f"snr_{band}"): - photo[v][not_detected_in_band] = 0.0 - for v in metacal_variants(f"mag_{band}"): - photo[v][not_detected_in_band] = np.inf - - # Record that we have detected this object at all - detected |= detected_in_band - - # the protoDC2 sims have an edge with zero shear. - # Remove it. - zero_shear_edge = (abs(data["shear_1"]) == 0) & (abs(data["shear_2"]) == 0) - print("Removing {} objects with identically zero shear in both terms".format(zero_shear_edge.sum())) - - detected &= ~zero_shear_edge - - # Print out interesting information - ndet = detected.sum() - ntot = detected.size - fract = ndet * 100.0 / ntot - print(f"Detected {ndet} out of {ntot} objects ({fract:.1f}%)") - - # Remove all objects not detected in *any* band - # make a copy of the keys with list(photo.keys()) so we are not - # modifying during the iteration - for key in list(photo.keys()): - photo[key] = photo[key][detected] - - for key in list(data.keys()): - data[key] = data[key][detected] - - -class TXBuzzardMock(TXCosmoDC2Mock): +class TXIngestCosmoDC2(TXIngestSkySim): + """Ingest CosmoDC2 data for TXPipe processing. """ - Simulate mock photometry from Buzzard. - - May be obsolete. - """ - - name = "TXBuzzardMock" - parallel = False - inputs = [("response_model", HDFFile)] - - outputs = [ - ("shear_catalog", ShearCatalog), - ("photometry_catalog", HDFFile), - ] - - config_options = { - "cat_name": StageParameter(str, "buzzard", msg="Name of the mock catalog to use."), - "visits_per_band": StageParameter(int, 165, msg="Number of visits per band for noise simulation."), - "snr_limit": StageParameter(float, 4.0, msg="S/N limit for object selection."), - "max_size": StageParameter(int, 99999999999999, msg="Maximum catalog size for testing."), - "extra_cols": StageParameter(str, "", msg="Extra columns to include (space-separated)."), - "max_npix": StageParameter(int, 99999999999999, msg="Maximum number of pixels."), - "unit_response": StageParameter(bool, False, msg="Whether to use unit response in simulation."), - "flip_g2": StageParameter(bool, True, msg="Whether to flip g2 sign to match conventions."), + name = "TXIngestCosmoDC2" + config_options = TXIngestSkySim.config_options | { + "cat_name": StageParameter(str, default="cosmoDC2", msg="Name of the GCR catalog to use"), } -class TXGaussianSimsMock(TXCosmoDC2Mock): +class TXIngestBuzzard(TXIngestSkySim): + """Ingest Buzzard data for TXPipe processing. """ - Simulate mock photometry from gaussian simulations - - This stage simulates metacal data and metacalibrated - photometry measurements, starting from simple Gaussian simulations - produced starting from CCL power spectra and poission sampling galaxies - from it. - - This is mainly useful for testing infrastructure - starting from a purer simulation. - """ - - name = "TXGaussianSimsMock" - parallel = False - inputs = [("response_model", HDFFile)] - - outputs = [ - ("shear_catalog", ShearCatalog), - ("photometry_catalog", HDFFile), - ] - - config_options = { - "cat_name": StageParameter(str, "GaussianSims", msg="Name of the Gaussian simulation catalog."), - "visits_per_band": StageParameter(int, 165, msg="Number of visits per band for noise simulation."), - "snr_limit": StageParameter(float, 0.0, msg="S/N limit for object selection (0 for all)."), - "max_size": StageParameter(int, 99999999999999, msg="Maximum catalog size for testing."), - "extra_cols": StageParameter(str, "", msg="Extra columns to include (space-separated)."), - "max_npix": StageParameter(int, 99999999999999, msg="Maximum number of pixels."), - "unit_response": StageParameter(bool, True, msg="Whether to use unit response in simulation."), - "cat_size": StageParameter(int, 0, msg="Catalog size (0 for all)."), - "flip_g2": StageParameter(bool, False, msg="Whether to flip g2 sign to match conventions."), - "apply_mag_cut": StageParameter(bool, False, msg="Apply magnitude cut for descqa comparison."), - "metadetect": StageParameter( - bool, True, msg="Whether to make a metadetect-style catalog (True) or metacal (False)." - ), - "add_shape_noise": StageParameter(bool, False, msg="Whether to add shape noise to simulation."), + name = "TXIngestBuzzard" + config_options = TXIngestSkySim.config_options | { + "cat_name": StageParameter(str, default="buzzard", msg="Name of the GCR catalog to use"), } - def data_iterator(self, cat): - # all cols we need - ( - ra, - dec, - g1, - g2, - z, - m_u, - m_g, - m_r, - m_i, - m_z, - m_y, - etrue1, - etrue2, - size, - galaxy_id, - ) = cat - - # figuring out number of chunks - chunk_size = 1_000_000 - nchunk = len(ra) // chunk_size - if nchunk * chunk_size < len(ra): - nchunk += 1 - - # main loop - for i in range(nchunk): - # start and end index of this chunk in full data - s = chunk_size * i - e = s + chunk_size - - # make dict just for this chunk of data - data_chunk = {} - data_chunk["ra"] = ra[s:e] - data_chunk["dec"] = dec[s:e] - data_chunk["shear_1"] = g1[s:e] - data_chunk["shear_2"] = g2[s:e] - data_chunk["size_true"] = size[s:e] - data_chunk["galaxy_id"] = galaxy_id[s:e] - data_chunk["redshift_true"] = z[s:e] - data_chunk["ellipticity_1_true"] = etrue1[s:e] - data_chunk["ellipticity_2_true"] = etrue2[s:e] - data_chunk["mag_true_u_lsst"] = m_u[s:e] - data_chunk["mag_true_g_lsst"] = m_g[s:e] - data_chunk["mag_true_r_lsst"] = m_r[s:e] - data_chunk["mag_true_i_lsst"] = m_i[s:e] - data_chunk["mag_true_z_lsst"] = m_z[s:e] - data_chunk["mag_true_y_lsst"] = m_y[s:e] - - # send this chunk of data back to caller - yield data_chunk - def load_catalog(self): - """ - This method loads the Gaussian sims produced in this notebook - https://github.com/LSSTDESC/txpipe-cosmodc2/blob/master/notebooks/MaskAreaGaussianSims_and_FakeTXPipeInput.ipynb - """ - cat_name = self.config["cat_name"] - self.bands = ("u", "g", "r", "i", "z", "y") - print(f"Loading from catalog {cat_name}") - - cat = np.load(cat_name, allow_pickle=True) - - N = len(cat[0]) - - return cat, N - - -def make_mock_photometry(n_visit, bands, data, unit_response): - """ - Generate a mock photometric table with noise added - - This is mostly from LSE‐40 by - Zeljko Ivezic, Lynne Jones, and Robert Lupton - retrieved here: - http://faculty.washington.edu/ivezic/Teaching/Astr511/LSST_SNRdoc.pdf - """ - - output = {} - nobj = data["galaxy_id"].size - output["ra"] = data["ra"] - output["dec"] = data["dec"] - output["id"] = data["galaxy_id"] - output["extendedness"] = np.ones(nobj) - - # Sky background, seeing FWHM, and system throughput, - # all from table 2 of Ivezic, Jones, & Lupton - B_b = np.array([85.07, 467.9, 1085.2, 1800.3, 2775.7, 3614.3]) - fwhm = np.array([0.77, 0.73, 0.70, 0.67, 0.65, 0.63]) - T_b = np.array([0.0379, 0.1493, 0.1386, 0.1198, 0.0838, 0.0413]) - - # effective pixels size for a Gaussian PSF, from equation - # 27 of Ivezic, Jones, & Lupton - pixel = 0.2 # arcsec - N_eff = 2.436 * (fwhm / pixel) ** 2 - - # other numbers from Ivezic, Jones, & Lupton - sigma_inst2 = 10.0**2 # instrumental noise in photons per pixel, just below eq 42 - gain = 1 # ADU units per photon, also just below eq 42 - D = 6.5 # primary mirror diameter in meters, from LSST key numbers page (effective clear diameter) - # Not sure what effective clear diameter means but it's closer to the number used in the paper - time = 30.0 # seconds per exposure, from LSST key numbers page - sigma_b2 = 0.0 # error on background, just above eq 42 - - # combination of these used below, from various equations - factor = 5455.0 / gain * (D / 6.5) ** 2 * (time / 30.0) - - # Fake some metacal responses - if unit_response: - mag_responses = [0.0 for i in bands] - else: - mag_responses = generate_mock_metacal_mag_responses(bands, nobj) - - delta_gamma = 0.01 # this is the half-delta gamma, i.e. gamma_+ - gamma_0 - # that's the right thing to use here because we are doing m+ = m0 + dm/dy*dy - # Use the same response for gamma1 and gamma2 - - for band, b_b, t_b, n_eff, mag_resp in zip(bands, B_b, T_b, N_eff, mag_responses): - # truth magnitude - mag = data[f"mag_true_{band}_lsst"] - output[f"mag_true_{band}_lsst"] = mag - - # expected signal photons, over all visits - c_b = factor * 10 ** (0.4 * (25 - mag)) * t_b * n_visit - - # expected background photons, over all visits - background = np.sqrt((b_b + sigma_inst2 + sigma_b2) * n_eff * n_visit) - # total expected photons - mu = c_b + background - - # Observed number of photons in excess of the expected background. - # This can go negative for faint magnitudes, indicating that the object is - # not going to be detected - n_obs = np.random.poisson(mu) - background - n_obs_err = np.sqrt(mu) - - # signal to noise, true and estimated values - true_snr = c_b / background - obs_snr = n_obs / background - - # observed magnitude from inverting c_b expression above - mag_obs = 25 - 2.5 * np.log10(n_obs / factor / t_b / n_visit) - - # converting error on n_obs to error on mag - mag_err = 2.5 / np.log(10.0) / obs_snr - - output[f"true_snr_{band}"] = true_snr - output[f"snr_{band}"] = obs_snr - output[f"mag_{band}"] = mag_obs - output[f"mag_err_{band}"] = mag_err - output[f"mag_{band}_err"] = mag_err - - m = mag_resp * delta_gamma - - m1 = mag_obs + m - m2 = mag_obs - m - mag_obs_1p = m1 - mag_obs_1m = m2 - - output[f"mag_{band}_1p"] = m1 - output[f"mag_{band}_1m"] = m2 - output[f"mag_{band}_2p"] = m1 - output[f"mag_{band}_2m"] = m2 - - # Scale the SNR values according the to change in magnitude.r - s = np.power(10.0, -0.4 * m) - - snr1 = obs_snr * s - snr2 = obs_snr / s - output[f"snr_{band}_1p"] = snr1 - output[f"snr_{band}_1m"] = snr2 - output[f"snr_{band}_2p"] = snr1 - output[f"snr_{band}_2m"] = snr2 - - return output - - -def generate_mock_metacal_mag_responses(bands, nobj): - nband = len(bands) - mu = np.zeros(nband) # seems approx mean of response across bands, from HSC tract - rho = 0.25 # approx correlation between response in bands, from HSC tract - sigma2 = 1.7**2 # approx variance of response, from HSC tract - covmat = np.full((nband, nband), rho * sigma2) - np.fill_diagonal(covmat, sigma2) - mag_responses = np.random.multivariate_normal(mu, covmat, nobj).T - return mag_responses class TXSimpleMock(PipelineStage): @@ -1264,21 +335,3 @@ def run(self): q = qp.Ensemble(qp.interp, data=dict(xvals=zgrid, yvals=pdfs)) q.set_ancil(dict(zmode=redshifts, zmean=redshifts, zmedian=redshifts)) q.write_to(self.get_output("photoz_pdfs")) - - -def test(): - import pylab - - data = { - "ra": None, - "dec": None, - "galaxy_id": None, - } - bands = "ugrizy" - n_visit = 165 - M5 = [24.22, 25.17, 24.74, 24.38, 23.80] - for b, m5 in zip(bands, M5): - data[f"mag_true_{b}_lsst"] = np.repeat(m5, 10000) - results = make_mock_photometry(n_visit, bands, data, True) - pylab.hist(results["snr_r"], bins=50, histtype="step") - pylab.savefig("snr_r.png") diff --git a/txpipe/maps.py b/txpipe/maps.py index cc766f540..0346cf7d1 100644 --- a/txpipe/maps.py +++ b/txpipe/maps.py @@ -192,6 +192,7 @@ def run(self): output = {} for i in bins: + bin_output = {} # These don't actually load all the data - everything is lazy ra = da.from_array(f[f"shear/bin_{i}/ra"], block_size) block_size = ra.chunksize @@ -209,17 +210,22 @@ def run(self): i = "2D" # Save all the stuff we want here. - output[f"count_{i}"] = count_map - output[f"g1_{i}"] = g1_map - output[f"g2_{i}"] = g2_map - output[f"lensing_weight_{i}"] = weight_map - output[f"count_{i}"] = count_map - output[f"var_e_{i}"] = esq_map - output[f"var_g1_{i}"] = var1_map - output[f"var_g2_{i}"] = var2_map + bin_output[f"count_{i}"] = count_map + bin_output[f"g1_{i}"] = g1_map + bin_output[f"g2_{i}"] = g2_map + bin_output[f"lensing_weight_{i}"] = weight_map + bin_output[f"count_{i}"] = count_map + bin_output[f"var_e_{i}"] = esq_map + bin_output[f"var_g1_{i}"] = var1_map + bin_output[f"var_g2_{i}"] = var2_map + + bin_output, = dask.compute(bin_output) + output.update(bin_output) + + f.close() # mask is where a pixel is hit in any of the tomo bins - mask = da.zeros(npix, dtype=bool) + mask = np.zeros(npix, dtype=bool) for i in bins: if i == "all": i = "2D" @@ -229,8 +235,6 @@ def run(self): # Everything above is lazy - this forces the computation. # It works out an efficient (we hope) way of doing everything in parallel - (output,) = dask.compute(output) - f.close() # collate metadata metadata = {key: self.config[key] for key in map_config_options} diff --git a/txpipe/masks.py b/txpipe/masks.py index e189e9e39..9c9d62015 100644 --- a/txpipe/masks.py +++ b/txpipe/masks.py @@ -257,7 +257,7 @@ def run(self): "You have specified both map_file and map_files, pick one" ) - if self.config["supreme_map_file"] is not "none": + if self.config["supreme_map_file"] != "none": fracdet = self.compute_fracdet_from_hsp(metadata) else: fracdet = self.compute_fracdet_from_hsp_list(metadata) diff --git a/txpipe/source_selector.py b/txpipe/source_selector.py index 8394c9a1c..4087ce2ca 100755 --- a/txpipe/source_selector.py +++ b/txpipe/source_selector.py @@ -6,6 +6,7 @@ TomographyCatalog, HDFFile, TextFile, + PickleFile, ) from .utils import SourceNumberDensityStats, rename_iterated from .utils.calibration_tools import read_shear_catalog_type @@ -31,7 +32,6 @@ import numpy as np import warnings - class BinStats: """ This is a small helper class to store and write the statistics of a @@ -81,7 +81,7 @@ def write_to(self, outfile, i): group["sigma_e_2d"][:] = self.sigma_e else: group["counts"][i] = self.source_count - group["N_eff"][i] = self.N_eff + group["N_eff"][i] = self.N_eff group["mean_e1"][i] = self.mean_e[0] group["mean_e2"][i] = self.mean_e[1] group["sigma_e"][i] = self.sigma_e @@ -89,6 +89,51 @@ def write_to(self, outfile, i): self.calibrator.save(outfile, i) +class TXSourceTomography(PipelineStage): + name = "TXSourceTomography" + inputs = [ + ("spectroscopic_catalog", HDFFile), + ] + outputs = [ + ("shear_tomography_classifier", PickleFile), + ] + config_options = { + "bands": StageParameter(list, ["r", "i", "z"], msg="Bands from the catalog to use for selection"), + "spec_mag_column_format": StageParameter(str, "photometry/{band}", msg="Format string for spectroscopic magnitude columns"), + "spec_redshift_column": StageParameter(str, "photometry/redshift", msg="Column name for spectroscopic redshifts"), + "source_zbin_edges": StageParameter(list, required=True, msg="Redshift bin edges for source tomography"), + "random_seed": StageParameter(int, 42, msg="Random seed for reproducibility"), + } + + def run(self): + bands = self.config['bands'] + with self.open_input("spectroscopic_catalog") as spec_file: + training_data = read_training_data( + spec_file, + bands, + self.config["spec_mag_column_format"], + self.config["spec_redshift_column"], + ) + + + classifier, features = build_tomographic_classifier( + bands, + training_data, + self.config["source_zbin_edges"], + self.config["random_seed"], + self.comm, + ) + + with self.open_output("shear_tomography_classifier", wrapper=True) as outfile: + pickle_data = { + "classifier": classifier, + "features": features, + "bands": bands, + "source_zbin_edges": self.config["source_zbin_edges"], + } + outfile.write(pickle_data) + + class TXSourceSelectorBase(PipelineStage): """ Base stage for source selection using S/N, size, and flag cuts and tomography @@ -120,6 +165,7 @@ class TXSourceSelectorBase(PipelineStage): inputs = [ ("shear_catalog", ShearCatalog), + ("shear_tomography_classifier", PickleFile), ("spectroscopic_catalog", HDFFile), ] @@ -128,19 +174,12 @@ class TXSourceSelectorBase(PipelineStage): config_options = { "input_pz": StageParameter(bool, False, msg="Whether to use input photo-z posteriors"), "true_z": StageParameter(bool, False, msg="Whether to use true redshifts instead of photo-z"), - "bands": StageParameter(str, "riz", msg="Bands from the catalog to use for selection"), + "bands": StageParameter(list, ["r", "i", "z"], msg="Bands from the catalog to use for selection"), "verbose": StageParameter(bool, False, msg="Whether to print verbose output"), "T_cut": StageParameter(float, required=True, msg="Size cut threshold for object selection"), "s2n_cut": StageParameter(float, required=True, msg="Signal-to-noise cut threshold for object selection"), "chunk_rows": StageParameter(int, 10000, msg="Number of rows to process in each chunk"), "source_zbin_edges": StageParameter(list, required=True, msg="Redshift bin edges for source tomography"), - "random_seed": StageParameter(int, 42, msg="Random seed for reproducibility"), - "spec_mag_column_format": StageParameter( - str, "photometry/{band}", msg="Format string for spectroscopic magnitude columns" - ), - "spec_redshift_column": StageParameter( - str, "photometry/redshift", msg="Column name for spectroscopic redshifts" - ), } def run(self): @@ -151,7 +190,8 @@ def run(self): # accidentally doing so we give a clear message if they try. if self.name == "TXSourceSelector": raise ValueError( - "Do not use the class TXSourceSelector any more. Use one of the subclasses like TXSourceSelectorMetacal" + "Do not use the class TXSourceSelector any more. " + "Use one of the subclasses like TXSourceSelectorMetacal" ) # Suppress some warnings from numpy that are not relevant @@ -173,33 +213,22 @@ def run(self): # Build a classifier used to put objects into tomographic bins if not (self.config["input_pz"] or self.config["true_z"]): - with self.open_input("spectroscopic_catalog") as spec_file: - training_data = read_training_data( - spec_file, - bands, - self.config["spec_mag_column_format"], - self.config["spec_redshift_column"], - ) + classifier, features = self.read_tomography_classifier() - classifier, features = build_tomographic_classifier( - bands, - training_data, - self.config["source_zbin_edges"], - self.config["random_seed"], - self.comm, - ) # We will collect the selection biases for each bin # as a matrix. We will collect together the different # matrices for each chunk and do a weighted average at the end. nbin_source = len(self.config["source_zbin_edges"]) - 1 - number_density_stats = SourceNumberDensityStats(nbin_source, comm=self.comm, shear_type=shear_catalog_type) + number_density_stats = SourceNumberDensityStats( + nbin_source, comm=self.comm, shear_type=shear_catalog_type + ) calculators = self.setup_response_calculators(nbin_source) # Loop through the input data, processing it chunk by chunk - for start, end, shear_data in it: + for (start, end, shear_data) in it: print(f"Process {self.rank} running selection for rows {start:,}-{end:,}") # Either apply a simple z cut if we have an input PZ estimate or @@ -218,7 +247,9 @@ def run(self): ) # Combine this selection with size and snr cuts to produce a source selection # and calculate the shear bias it would generate - tomo_bin, R, counts = self.calculate_tomography(pz_data, shear_data, calculators) + tomo_bin, R, counts = self.calculate_tomography( + pz_data, shear_data, calculators + ) # Save the tomography for this chunk self.write_tomography(output_file, start, end, tomo_bin, R) @@ -236,6 +267,32 @@ def run(self): # Restore the original warning settings in case we are being called from a library np.seterr(**original_warning_settings) + def read_tomography_classifier(self): + # Read the tomography classifier from file + with self.open_input("shear_tomography_classifier", wrapper=True) as infile: + pickle_data = infile.read() + + # Check that the tomographer used the same configuration + # as we have here. + if not pickle_data["bands"] == self.config["bands"]: + raise ValueError( + "Bands used in tomography classifier do not match those " + "in source selector configuration." + ) + # Also check bin edges are close enough + if not np.allclose(pickle_data["source_zbin_edges"], self.config["source_zbin_edges"]): + raise ValueError( + "Source redshift bin edges used in tomography classifier do not match those " + "in source selector configuration." + ) + + # This is all that is actually needed in this class + classifier = pickle_data["classifier"] + features = pickle_data["features"] + return classifier, features + + + def apply_simple_redshift_cut(self, shear_data): pz_data = {} if self.config["input_pz"]: @@ -245,7 +302,9 @@ def apply_simple_redshift_cut(self, shear_data): pz_data_bin = np.zeros(len(zz), dtype=int) - 1 for zi in range(len(self.config["source_zbin_edges"]) - 1): - mask_zbin = (zz >= self.config["source_zbin_edges"][zi]) & (zz < self.config["source_zbin_edges"][zi + 1]) + mask_zbin = (zz >= self.config["source_zbin_edges"][zi]) & ( + zz < self.config["source_zbin_edges"][zi + 1] + ) pz_data_bin[mask_zbin] = zi return {"zbin": pz_data_bin} @@ -315,7 +374,7 @@ def setup_output(self): output = self.open_output("shear_tomography_catalog", parallel=True, wrapper=True) outfile = output.file group = outfile.create_group("tomography") - group.attrs["catalog_type"] = cat_type + group.attrs['catalog_type'] = cat_type output.write_zbins(zbins) group.create_dataset("bin", (n,), dtype="i") @@ -417,11 +476,10 @@ def select_2d(self, data, calling_from_select=False): verbose = self.config["verbose"] variant = data.suffix - shear_prefix = self.config["shear_prefix"] - s2n = data[f"{shear_prefix}s2n{variant}"] - T = data[f"{shear_prefix}T{variant}"] - Tpsf = data[f"{shear_prefix}psf_T_mean"] - flag = data[f"{shear_prefix}flags{variant}"] + s2n = data[f"s2n{variant}"] + T = data[f"T{variant}"] + Tpsf = data[f"psf_T"] + flag = data[f"flags{variant}"] # Apply our cuts. We keep track of the number of objects # reject by each cut in case it's important. @@ -449,11 +507,15 @@ def select_2d(self, data, calling_from_select=False): # as above if verbose and calling_from_select: print( - f"Tomo selection ({variant}) {f1:.2%} flag, {f2:.2%} size, {f3:.2%} SNR, ", + f"Tomo selection ({variant}) {f1:.2%} flag, {f2:.2%} size, " + f"{f3:.2%} SNR, ", end="", ) elif verbose: - print(f"2D selection ({variant}) {f1:.2%} flag, {f2:.2%} size, {f3:.2%} SNR, {f4:.2%} any z bin") + print( + f"2D selection ({variant}) {f1:.2%} flag, {f2:.2%} size, " + f"{f3:.2%} SNR, {f4:.2%} any z bin" + ) print("total 2D", sel.sum()) return sel @@ -473,11 +535,10 @@ class TXSourceSelectorMetacal(TXSourceSelectorBase): config_options = { **TXSourceSelectorBase.config_options, "delta_gamma": StageParameter(float, required=True, msg="Delta gamma value for metacal response calculation"), - "use_diagonal_response": StageParameter( - bool, False, msg="Whether to use only diagonal elements of the response matrix" - ), + "use_diagonal_response": StageParameter(bool, False, msg="Whether to use only diagonal elements of the response matrix") } + # The main differences between the classes are to do with how the data is read # and what output response values are generated. def data_iterator(self): @@ -488,9 +549,13 @@ def data_iterator(self): just choosing which columns to read. """ bands = self.config["bands"] - shear_cols = metacal_variants("mcal_T", "mcal_s2n", "mcal_g1", "mcal_g2", "mcal_flags", "weight") - shear_cols += ["ra", "dec", "mcal_psf_T_mean"] - shear_cols += band_variants(bands, "mcal_mag", "mcal_mag_err", shear_catalog_type="metacal") + shear_cols = metacal_variants( + "mcal_T", "mcal_s2n", "mcal_g1", "mcal_g2", "mcal_flags", "weight" + ) + shear_cols += ["ra", "dec", "psf_T"] + shear_cols += band_variants( + bands, "mcal_mag", "mcal_mag_err", shear_catalog_type="metacal" + ) if self.config["input_pz"]: shear_cols += metacal_variants("mean_z") @@ -531,8 +596,10 @@ def setup_output(self): def setup_response_calculators(self, nbin_source): delta_gamma = self.config["delta_gamma"] use_diagonal_response = self.config["use_diagonal_response"] - calculators = [MetacalCalculator(self.select, delta_gamma, use_diagonal_response) for i in range(nbin_source)] - calculators.append(MetacalCalculator(self.select_2d, delta_gamma, use_diagonal_response)) + calculators = [ + MetacalCalculator(self.select, delta_gamma,use_diagonal_response) for i in range(nbin_source) + ] + calculators.append(MetacalCalculator(self.select_2d, delta_gamma,use_diagonal_response)) return calculators def write_tomography(self, outfile, start, end, source_bin, R): @@ -618,21 +685,24 @@ class TXSourceSelectorMetadetect(TXSourceSelectorBase): # add one option to the base class configuration config_options = { **TXSourceSelectorBase.config_options, - "delta_gamma": StageParameter( - float, required=True, msg="Delta gamma value for metadetect response calculation" - ), + "delta_gamma": StageParameter(float, required=True, msg="Delta gamma value for metadetect response calculation") } + def data_iterator(self): # As above, this is where we work out which columns we need. chunk_rows = self.config["chunk_rows"] bands = self.config["bands"] # Core quantities we need - shear_cols = metadetect_variants("T", "s2n", "g1", "g2", "ra", "dec", "mcal_psf_T_mean", "weight", "flags") + shear_cols = metadetect_variants( + "T", "s2n", "g1", "g2", "ra", "dec", "psf_T", "weight", "flags" + ) # Magnitudes and errors - shear_cols += band_variants(bands, "mag", "mag_err", shear_catalog_type="metadetect") + shear_cols += band_variants( + bands, "mag", "mag_err", shear_catalog_type="metadetect" + ) renames = {} # We need truth shears and/or PZ point-estimates for each shear too @@ -642,17 +712,18 @@ def data_iterator(self): shear_cols += ["00/redshift_true"] renames["00/redshift_true"] = "redshift_true" - for prefix in ["00", "1p", "1m", "2p", "2m"]: - renames[f"{prefix}/mcal_psf_T_mean"] = f"{prefix}/psf_T_mean" - # This is a parent ceci.PipelineStage method. # It returns an iterator we loop through - it = self.iterate_hdf("shear_catalog", "shear", shear_cols, chunk_rows, longest=True) + it = self.iterate_hdf( + "shear_catalog", "shear", shear_cols, chunk_rows, longest=True + ) return rename_iterated(it, renames) def setup_response_calculators(self, nbin_source): delta_gamma = self.config["delta_gamma"] - calculators = [MetaDetectCalculator(self.select, delta_gamma) for i in range(nbin_source)] + calculators = [ + MetaDetectCalculator(self.select, delta_gamma) for i in range(nbin_source) + ] calculators.append(MetaDetectCalculator(self.select_2d, delta_gamma)) return calculators @@ -711,7 +782,6 @@ def compute_output_stats(self, calculator, mean, variance): # Like metacal, N_eff = N for metadetect return BinStats(N, Neff, mean_e, sigma_e, calibrator) - class TXSourceSelectorLensfit(TXSourceSelectorBase): """ Source selection and tomography for lensfit catalogs @@ -727,12 +797,11 @@ class TXSourceSelectorLensfit(TXSourceSelectorBase): # add one option to the base class configuration config_options = { **TXSourceSelectorBase.config_options, - "input_m_is_weighted": StageParameter( - bool, required=True, msg="Whether the input m values are already weighted" - ), + "input_m_is_weighted": StageParameter(bool, required=True, msg="Whether the input m values are already weighted"), "dec_cut": StageParameter(bool, True, msg="Whether to apply a declination cut"), } + def data_iterator(self): chunk_rows = self.config["chunk_rows"] bands = self.config["bands"] @@ -748,7 +817,9 @@ def data_iterator(self): "weight", "m", ] - shear_cols += band_variants(bands, "mag", "mag_err", shear_catalog_type="lensfit") + shear_cols += band_variants( + bands, "mag", "mag_err", shear_catalog_type="lensfit" + ) if self.config["input_pz"]: shear_cols += ["mean_z"] elif self.config["true_z"]: @@ -760,7 +831,9 @@ def setup_response_calculators(self, nbin_source): LensfitCalculator(self.select, input_m_is_weighted=self.config["input_m_is_weighted"]) for i in range(nbin_source) ] - calculators.append(LensfitCalculator(self.select_2d, input_m_is_weighted=self.config["input_m_is_weighted"])) + calculators.append( + LensfitCalculator(self.select_2d, input_m_is_weighted=self.config["input_m_is_weighted"]) + ) return calculators def setup_output(self): @@ -781,20 +854,18 @@ def setup_output(self): return outfile def compute_output_stats(self, calculator, mean, variance): - K, C_N, C_S, N, Neff = calculator.collect(self.comm, allgather=True) + K, C_N,C_S, N, Neff = calculator.collect(self.comm, allgather=True) calibrator = LensfitCalibrator(K, C_N, C_S) - mean_e = (C_N + C_S) / 2 + mean_e = (C_N+C_S)/2 sigma_e = np.sqrt((0.5 * (variance[0] + variance[1]))) / (1 + K) return BinStats(N, Neff, mean_e, sigma_e, calibrator) - class TXSourceSelectorSimple(TXSourceSelectorBase): """ Source selection and tomography for mock catalogs that do not require any calibration. """ - name = "TXSourceSelectorSimple" config_options = TXSourceSelectorBase.config_options.copy() @@ -837,6 +908,7 @@ def compute_output_stats(self, calculator, mean, variance): return BinStats(N, Neff, mean_e, sigma_e, calibrator) + class TXSourceSelectorHSC(TXSourceSelectorBase): """ Source selection and tomography for HSC catalogs @@ -853,7 +925,7 @@ class TXSourceSelectorHSC(TXSourceSelectorBase): name = "TXSourceSelectorHSC" config_options = TXSourceSelectorBase.config_options.copy() config_options["max_shear_cut"] = StageParameter(float, 0.0, msg="Maximum shear value for object selection") - + def data_iterator(self): chunk_rows = self.config["chunk_rows"] bands = self.config["bands"] @@ -911,7 +983,10 @@ def write_tomography(self, outfile, start, end, source_bin, R): def compute_per_object_response(self, data): w_tot = np.sum(data["weight"]) - R = np.array([1.0 - np.sum(data["weight"] * data["sigma_e"]) / w_tot] * len(data["weight"])) + R = np.array( + [1.0 - np.sum(data["weight"] * data["sigma_e"]) / w_tot] + * len(data["weight"]) + ) return R def compute_output_stats(self, calculator, mean, variance): @@ -921,8 +996,13 @@ def compute_output_stats(self, calculator, mean, variance): return BinStats(N, Neff, mean, sigma_e, calibrator) def setup_response_calculators(self, nbin_source): - calculators = [HSCCalculator(self.select) for i in range(nbin_source)] - calculators.append(HSCCalculator(self.select_2d)) + calculators = [ + HSCCalculator(self.select) + for i in range(nbin_source) + ] + calculators.append( + HSCCalculator(self.select_2d) + ) return calculators def select_2d(self, data, calling_from_select=False): @@ -943,6 +1023,5 @@ def select_2d(self, data, calling_from_select=False): print(f" after shear cut retain {p:.2f}% of objects") return sel - if __name__ == "__main__": PipelineStage.main() diff --git a/txpipe/utils/conversion.py b/txpipe/utils/conversion.py index bb601e810..87a7557bd 100644 --- a/txpipe/utils/conversion.py +++ b/txpipe/utils/conversion.py @@ -5,7 +5,11 @@ # follow https://pipelines.lsst.io/v/DM-22499/cpp-api/file/_photo_calib_8h.html def nanojansky_to_mag_ab(flux): - return -2.5 * np.log10(flux * 1e-9 / REF_FLUX) + zero = flux == 0 + out = np.empty_like(flux) + out[zero] = np.inf + out[~zero] = -2.5 * np.log10(flux[~zero] * 1e-9 / REF_FLUX) + return out def mag_ab_to_nanojansky(mag): @@ -15,6 +19,11 @@ def mag_ab_to_nanojansky(mag): def nanojansky_err_to_mag_ab(flux, flux_err): return 2.5 / np.log(10) * (flux_err / flux) +def mag_ab_err_to_nanojansky_err(mag, mag_err): + flux = mag_ab_to_nanojansky(mag) + with np.errstate(invalid='ignore'): + flux_err = flux * np.log(10) / 2.5 * mag_err + return flux_err def moments_to_shear(Ixx, Iyy, Ixy): b = Ixx + Iyy + 2 * np.sqrt(Ixx * Iyy - Ixy**2)