diff --git a/txpipe/binning/random_forest.py b/txpipe/binning/random_forest.py index 1929be2f8..c3c4c02bc 100644 --- a/txpipe/binning/random_forest.py +++ b/txpipe/binning/random_forest.py @@ -83,7 +83,7 @@ def apply_classifier(classifier, features, bands, shear_catalog_type, shear_data prefixes = ["", "", "", "", ""] suffixes = ["", "_1p", "_2p", "_1m", "_2m"] elif shear_catalog_type == "metadetect": - prefixes = ["00/", "1p/", "2p/", "1m/", "2m/"] + prefixes = ["ns/", "1p/", "2p/", "1m/", "2m/"] suffixes = ["", "", "", "", ""] else: prefixes = [""] diff --git a/txpipe/calibrate.py b/txpipe/calibrate.py index 37342ce09..90f635812 100644 --- a/txpipe/calibrate.py +++ b/txpipe/calibrate.py @@ -89,9 +89,9 @@ def run(self): # cat_cols is everything we are reading in if cat_type == "metadetect": - cat_cols = cat_cols + [f"00/{c}" for c in extra_cols + mag_cols_in] - mag_cols_in = [f"00/{c}" for c in mag_cols_in] - renames.update({f"00/{c}": c for c in extra_cols}) + cat_cols = cat_cols + [f"ns/{c}" for c in extra_cols + mag_cols_in] + mag_cols_in = [f"ns/{c}" for c in mag_cols_in] + renames.update({f"ns/{c}": c for c in extra_cols}) else: cat_cols = cat_cols + extra_cols + mag_cols_in diff --git a/txpipe/data_types.py b/txpipe/data_types.py index 1d7e33f03..2ffdf3a65 100755 --- a/txpipe/data_types.py +++ b/txpipe/data_types.py @@ -73,29 +73,29 @@ def catalog_type(self): def get_size(self): if self.catalog_type == "metadetect": - return self.file["shear/00/ra"].size + return self.file["shear/ns/ra"].size else: return self.file["shear/ra"].size def get_primary_catalog_group(self): if self.catalog_type == "metadetect": - return "shear/00" + return "shear/ns" else: return "shear" def get_true_redshift_column(self): if self.catalog_type == "metadetect": - return "00/redshift_true" + return "ns/redshift_true" else: return "redshift_true" def get_primary_catalog_names(self, true_shear=False): if true_shear: if self.catalog_type == "metadetect": - shear_cols = ["00/true_g1", "00/true_g2", "00/ra", "00/dec", "00/weight"] + shear_cols = ["ns/true_g1", "ns/true_g2", "ns/ra", "ns/dec", "ns/weight"] rename = {c: c[3:] for c in shear_cols} - rename["00/true_g1"] = "g1" - rename["00/true_g2"] = "g2" + rename["ns/true_g1"] = "g1" + rename["ns/true_g2"] = "g2" else: rename = {"true_g1": "g1", "true_g2": "g2"} rename = {} @@ -106,7 +106,7 @@ def get_primary_catalog_names(self, true_shear=False): shear_cols = ["g1", "g2", "c1", "c2", "ra", "dec", "weight"] rename = {} elif self.catalog_type == "metadetect": - shear_cols = ["00/g1", "00/g2", "00/ra", "00/dec", "00/weight"] + shear_cols = ["ns/g1", "ns/g2", "ns/ra", "ns/dec", "ns/weight"] rename = {c: c[3:] for c in shear_cols} else: shear_cols = ["g1", "g2", "ra", "dec", "weight"] diff --git a/txpipe/diagnostics.py b/txpipe/diagnostics.py index adbb5b1fa..12cefe48f 100644 --- a/txpipe/diagnostics.py +++ b/txpipe/diagnostics.py @@ -237,7 +237,10 @@ def run(self): "m", ] + [f"mag_{b}" for b in self.config["bands"]] - shear_tomo_cols = ["bin"] + if self.config["shear_catalog_type"] == "metadetect": + shear_tomo_cols = ["bin_ns", "bin_1p", "bin_1m", "bin_2p", "bin_2m"] + else: + shear_tomo_cols = ["bin"] if self.config["shear_catalog_type"] == "metacal": more_iters = ["shear_tomography_catalog", "response", ["R_gamma"]] @@ -292,7 +295,7 @@ def plot_psf_shear(self): delta_gamma = self.config["delta_gamma"] psf_g_edges = self.get_bin_edges("psf_g1") - shear_prefix = "00/" if self.config["shear_catalog_type"] == "metadetect" else "" + shear_prefix = "ns/" if self.config["shear_catalog_type"] == "metadetect" else "" p1 = MeanShearInBins( f"{shear_prefix}psf_g1", @@ -391,7 +394,7 @@ def plot_psf_size_shear(self): delta_gamma = self.config["delta_gamma"] psf_T_edges = self.get_bin_edges("psf_T_mean") - shear_prefix = "00/" if self.config["shear_catalog_type"] == "metadetect" else "" + shear_prefix = "ns/" if self.config["shear_catalog_type"] == "metadetect" else "" binnedShear = MeanShearInBins( f"{shear_prefix}psf_T_mean", @@ -451,7 +454,7 @@ def plot_snr_shear(self): # Parameters of the binning in SNR shear_catalog_type = self.config["shear_catalog_type"] - shear_prefix = "00/" if shear_catalog_type == "metadetect" else "" + shear_prefix = "ns/" if shear_catalog_type == "metadetect" else "" delta_gamma = self.config["delta_gamma"] snr_edges = self.get_bin_edges("s2n") @@ -516,7 +519,7 @@ def plot_size_shear(self): from scipy import stats shear_catalog_type = self.config["shear_catalog_type"] - shear_prefix = "00/" if shear_catalog_type == "metadetect" else "" + shear_prefix = "ns/" if shear_catalog_type == "metadetect" else "" delta_gamma = self.config["delta_gamma"] T_edges = self.get_bin_edges("T") @@ -580,7 +583,7 @@ def plot_mag_shear(self): from scipy import stats shear_catalog_type = self.config["shear_catalog_type"] - shear_prefix = "00/" if shear_catalog_type == "metadetect" else "" + shear_prefix = "ns/" if shear_catalog_type == "metadetect" else "" delta_gamma = self.config["delta_gamma"] nbins = self.config["nbins"] @@ -712,16 +715,16 @@ def plot_g_histogram(self): if data is None: break - qual_cut = data["bin"] != -1 + #qual_cut = data["bin"] != -1 if cat_type == "metacal": g1 = data["g1"] g2 = data["g2"] w = data["weight"] elif cat_type == "metadetect": - g1 = data["00/g1"] - g2 = data["00/g2"] - w = data["00/weight"] + g1 = data["ns/g1"] + g2 = data["ns/g2"] + w = data["ns/weight"] elif cat_type == "lensfit": dec = data["dec"] g1 = data["g1"] @@ -790,7 +793,8 @@ def plot_snr_histogram(self): delta_gamma = self.config["delta_gamma"] shear_catalog_type = self.config["shear_catalog_type"] - shear_prefix = "00/" if shear_catalog_type == "metadetect" else "" + shear_prefix = "ns/" if shear_catalog_type == "metadetect" else "" + bin_type = "bin_ns" if self.config["shear_catalog_type"] == "metadetect" else "bin" bins = 10 edges = np.logspace(1, 3, bins + 1) mids = 0.5 * (edges[1:] + edges[:-1]) @@ -802,7 +806,7 @@ def plot_snr_histogram(self): if data is None: break - qual_cut = data["bin"] != -1 + qual_cut = data[bin_type] != -1 b1 = np.digitize(data[f"{shear_prefix}s2n"][qual_cut], edges) - 1 @@ -979,7 +983,8 @@ def plot_mag_histograms(self): mid = 0.5 * (edges[1:] + edges[:-1]) width = edges[1] - edges[0] bands = self.config["bands"] - shear_prefix = "00/" if self.config["shear_catalog_type"] == "metadetect" else "" + shear_prefix = "ns/" if self.config["shear_catalog_type"] == "metadetect" else "" + bin_type = "bin_ns" if self.config["shear_catalog_type"] == "metadetect" else "bin" nband = len(bands) full_hists = [np.zeros(size, dtype=int) for b in bands] source_hists = [np.zeros(size, dtype=int) for b in bands] @@ -998,7 +1003,7 @@ def plot_mag_histograms(self): count = w.sum() h1[i] += count - w &= data["bin"] >= 0 + w &= data[bin_type] >= 0 count = w.sum() h2[i] += count diff --git a/txpipe/ingest/__init__.py b/txpipe/ingest/__init__.py index 38573dda4..e3af22f3a 100644 --- a/txpipe/ingest/__init__.py +++ b/txpipe/ingest/__init__.py @@ -11,3 +11,4 @@ ) from .dp1 import TXIngestDataPreview1 from .stage3 import TXIngestDESY3Gold +from .metaDetect import TXIngestRubinMetaDetect diff --git a/txpipe/ingest/lsst.py b/txpipe/ingest/lsst.py index c0f3b0128..32f5c08fe 100644 --- a/txpipe/ingest/lsst.py +++ b/txpipe/ingest/lsst.py @@ -64,3 +64,55 @@ def process_shear_data(data): output["psf_g2"] = psf_g2 return output + + +def process_metadetect_data(data): + output = {} + for variant in ["ns", "1p", "1m", "2p", "2m"]: + var_data = data[data["metaStep"] == variant] + var_data = sanitize(var_data) + + var_output = { + "ra": var_data["ra"], + "dec": var_data["dec"], + "id": var_data["shearObjectId"], + "object_mask_fraction": var_data["mfrac"], + #"n_epoch": var_data["nEpochCell"], + "g1": var_data["gauss_g1"], + "g2": var_data["gauss_g2"], + "g1_err": var_data["gauss_g1_g1_Cov"], + "g2_err": var_data["gauss_g2_g2_Cov"], + "g_cross": var_data["gauss_g1_g2_Cov"], + "T": var_data["gauss_T"], + "s2n": var_data["gauss_snr"], + "T_err": var_data["gauss_TErr"], + "psf_g1_original": var_data["psfOriginal_e1"], + "psf_g2_original": var_data["psfOriginal_e2"], + "psf_T_mean_original": var_data["psfOriginal_T"], + "psf_g1": var_data["gauss_psfReconvolved_g1"], + "psf_g2": var_data["gauss_psfReconvolved_g1"], + "psf_T_mean": var_data["gauss_psfReconvolved_T"], + "flags": var_data["gauss_shape_flags"], # TO BE ADDRESSED! + "weight": 1 / (0.5 * (var_data["gauss_g1_g1_Cov"] + var_data["gauss_g2_g2_Cov"])), + } + for band in "gri": # For DP2, we only expect 4 bands + f = var_data[f"{band}_pgaussFlux"] + f_err = var_data[f"{band}_pgaussFluxErr"] + var_output[f"mag_{band}"] = nanojansky_to_mag_ab(f) + var_output[f"mag_err_{band}"] = nanojansky_err_to_mag_ab(f, f_err) + output[f"{variant}"] = var_output + + return output + +def sanitize(data): + """ + Convert unicode arrays into types that h5py can save + """ + # convert unicode to strings + if data.dtype.kind == "U": + data = data.astype("S") + # convert dates to integers + elif data.dtype.kind == "M": + data = data.astype(int) + + return data \ No newline at end of file diff --git a/txpipe/ingest/metaDetect.py b/txpipe/ingest/metaDetect.py new file mode 100644 index 000000000..e3c2b60a5 --- /dev/null +++ b/txpipe/ingest/metaDetect.py @@ -0,0 +1,312 @@ +from ..base_stage import PipelineStage +from ..data_types import ShearCatalog, PhotometryCatalog, HDFFile, FileCollection +from .lsst import process_metadetect_data, sanitize +from ceci.config import StageParameter +from ..utils.hdf_tools import h5py_shorten, repack +from ..utils.splitters import MetaDetectSplitter +import numpy as np +import os +import pyarrow.parquet as pq + +# All TRACT INFORMATION SHOULD BE MOVED ELSEWHERE +DP1_COSMOLOGY_FIELDS = [ + "EDFS", + "ECDFS", + "LGLF", +] + + +DP1_TRACTS = { + # Euclid Deep Field South + "EDFS": [2393, 2234, 2235, 2394], + # Extended Chandra Deep Field South + "ECDFS": [5062, 5063, 5064, 4848, 4849], + # Low Galactic Latitude Field / Rubin_SV_095_-25 + "LGLF": [5305, 5306, 5525, 5526], + # Fornax Dwarf Spheroidal Galaxy + "FDSG": [4016, 4217, 4218, 4017], + # Low Ecliptic Latitude Field / Rubin_SV_38_7 + "LELF": [10464, 10221, 10222, 10704, 10705, 10463], + # Seagull Nebula + "Seagull": [7850, 7849, 7610, 7611], + # 47 Tuc Globular Cluster + "47Tuc": [531, 532, 453, 454], +} + +DP1_COSMOLOGY_TRACTS = sum([DP1_TRACTS[_field] for _field in DP1_COSMOLOGY_FIELDS], []) +ALL_TRACTS = sum(DP1_TRACTS.values(), []) + + +# In case useful later: +DP1_FIELD_CENTERS = { + "47 Tuc Globular Cluster": (6.02, -72.08), + "Low Ecliptic Latitude Field": (37.86, 6.98), + "Fornax Dwarf Spheroidal Galaxy": (40.00, -34.45), + "Extended Chandra Deep Field South": (53.13, -28.10), + "Euclid Deep Field South": (59.10, -48.73), + "Low Galactic Latitude Field": (95.00, -25.00), + "Seagull Nebula": (106.23, -10.51), +} + + +DP1_SURVEY_PROPERTIES = { + "deepCoadd_exposure_time_consolidated_map_sum": "Total exposure time accumulated per sky position (second)", + "deepCoadd_epoch_consolidated_map_min": "Earliest observation epoch (MJD)", + "deepCoadd_epoch_consolidated_map_max": "Latest observation epoch (MJD)", + "deepCoadd_epoch_consolidated_map_mean": "Mean observation epoch (MJD)", + "deepCoadd_psf_size_consolidated_map_weighted_mean": "Weighted mean of PSF characteristic width as computed from the determinant radius (pixel)", + "deepCoadd_psf_e1_consolidated_map_weighted_mean": "Weighted mean of PSF ellipticity component e1", + "deepCoadd_psf_e2_consolidated_map_weighted_mean": "Weighted mean of PSF ellipticity component e2", + "deepCoadd_psf_maglim_consolidated_map_weighted_mean": "Weighted mean of PSF flux 5σ magnitude limit (magAB)", + "deepCoadd_sky_background_consolidated_map_weighted_mean": "Weighted mean of background light level from the sky (nJy)", + "deepCoadd_sky_noise_consolidated_map_weighted_mean": "Weighted mean of standard deviation of the sky level (nJy)", + "deepCoadd_dcr_dra_consolidated_map_weighted_mean": "Weighted mean of DCR-induced astrometric shift in right ascension direction, expressed as a proportionality factor", + "deepCoadd_dcr_ddec_consolidated_map_weighted_mean": "Weighted mean of DCR-induced astrometric shift in declination direction, expressed as a proportionality factor", + "deepCoadd_dcr_e1_consolidated_map_weighted_mean": "Weighted mean of DCR-induced change in PSF ellipticity (e1), expressed as a proportionality factor", + "deepCoadd_dcr_e2_consolidated_map_weighted_mean": "Weighted mean of DCR-induced change in PSF ellipticity (e2), expressed as a proportionality factor", +} + + +class TXIngestRubinMetaDetect(PipelineStage): + """ + Initial ingestion of the Rubin MetaDetect catalog + """ + + name = "TXIngestRubinMetaDetect" + inputs = [] + outputs = [ + ("shear_catalog", ShearCatalog), + ] + config_options= { + "use_butler": StageParameter( + bool, + True, + msg="Should be left on, unless you got an external file, in that case knock yourself out!"), + "butler_config_file": StageParameter( + str, + "/global/cfs/cdirs/lsst/production/gen3/rubin/DP1/repo/butler.yaml", + msg="Path to the LSST butler config file." + ), + "cosmology_tracts_only": StageParameter(bool, True, msg="Use only cosmology tracts."), + "select_field": StageParameter(str, "", msg="Field to select (overrides cosmology_tracts_only)."), + "collections": StageParameter(str, "LSSTComCam/DP1", msg="Butler collections to use."), + "file_path": StageParameter(str, None, msg="if not using a Butler, you need to give a path to the file.") + } + + def run(self): + if self.config["use_butler"]: + self.butler_run() + else: + self.file_run() + + # Run h5repack on the file + print("Repacking files") + repack(self.get_output("shear_catalog")) + + def butler_run(self): + error_msg = ( + "The LSST Science Pipelines are not installed in this environment, " + "or are not configured correctly to access the data. " + "See the note in the file example/dp1/ingest.yml for how to set " + "this up on NERSC." + ) + try: + from lsst.daf.butler import Butler + except: + raise ImportError(error_msg) + + + # Configure and create the butler. There are several ways to do this, + # Here we use a central collective butler yaml file from NERSC. + + butler_config_file = self.config["butler_config_file"] + collections = self.config["collections"] + try: + butler = Butler(butler_config_file, collections=collections) + except: + raise RuntimeError(error_msg) + + if self.config["select_field"]: + tracts = DP1_TRACTS[self.config["select_field"]] + elif self.config["cosmology_tracts_only"]: + tracts = DP1_COSMOLOGY_TRACTS + else: + tracts = ALL_TRACTS + + #n = self.get_catalog_size(butler, "ShearObject") + shear_outfile = self.open_output("shear_catalog") + group = shear_outfile.create_group("shear") + shear_outfile["shear"].attrs["catalog_type"] = "metadetect" + + created_files = False + data_set_refs = butler.query_datasets('object_shear_all') + n_chunks = len(data_set_refs) + input_columns = self.get_input_columns() + + for i, ref in enumerate(data_set_refs): + tract = ref.dataId["tract"] + if tract not in tracts: + print(f"Skipping chunk {i + 1} / {n_chunks} since tract {tract} is not selected") + continue + + d = butler.get('object_shear_all', + dataId=ref.dataId, + parameters={"columns": input_columns} + ) + chunk_size = len(d) + + if chunk_size == 0: + print(f"Skipping chunk {i + 1} / {n_chunks} since it is empty") + continue + + shear_data = process_metadetect_data(d) + if not created_files: + created_files = True + variants = { + "ns": len(shear_data["ns"]), + "1p": len(shear_data["1p"]), + "1m": len(shear_data["1m"]), + "2p": len(shear_data["2p"]), + "2m": len(shear_data["2m"]), + } + columns = list(shear_data["ns"].keys()) + dtypes = {key: shear_data["ns"][key].dtype for key in shear_data["ns"]} + splitter = MetaDetectSplitter(group, columns, variants, dtypes=dtypes) + + for variant in ["ns", "1p", "1m", "2p", "2m"]: + splitter.write_bin(shear_data[variant], variant) + print(f"Processing chunk {i + 1} / {n_chunks}") + + splitter.finish() + shear_outfile.close() + + def file_run(self): + file_path = self.config("file_path") + if file_path is None: + raise RuntimeError("You must either use a butler, or specify a file_path to your metadetect catalog.") + + if not os.path.exists(file_path): + raise RuntimeError("No file where you said it would be.") + + shear_outfile = self.open_output("shear_catalog") + group = shear_outfile.create_group("shear") + shear_outfile["shear"].attrs["catalog_type"] = "metadetect" + + created_files = False + input_columns = self.get_input_columns() + + chunk_size = self.config("chunk_rows") + pf = pq.ParquetFile(file_path) + for batch in pf.iter_batches(columns=input_columns, batch_size=chunk_size): + shear_data = process_metadetect_data(batch) + + if not created_files: + created_files = True + variants = { + "ns": len(shear_data["ns"]), + "1p": len(shear_data["1p"]), + "1m": len(shear_data["1m"]), + "2p": len(shear_data["2p"]), + "2m": len(shear_data["2m"]), + } + columns = list(shear_data["ns"].keys()) + splitter = MetaDetectSplitter(group, columns, variants) + + for variant in ["ns", "1p", "1m", "2p", "2m"]: + data = sanitize(shear_data[variant]) + splitter.write_bin(data, variant) + + splitter.finish() + shear_outfile.close() + + def get_input_columns(self): + input_columns = [ + "shearObjectId", + #"cellId", #removed + 'cell_x', + 'cell_y', + "metaStep", + "ra", + "dec", + "mfrac", + #"maskFractionCell", #Removed + #"nEpochCell", #Removed + "gauss_g1", + "gauss_g2", + "gauss_g1_g1_Cov", + "gauss_g1_g2_Cov", + "gauss_g2_g2_Cov", + "gauss_T", + "gauss_snr", + "gauss_TErr", + "gauss_psfReconvolved_g1", + "gauss_psfReconvolved_g2", + 'gauss_psfReconvolved_T', + "psfOriginal_e1", + "psfOriginal_e2", + "psfOriginal_T", + #Next follows the fluxes: + "g_pgaussFlux", + "r_pgaussFlux", + "i_pgaussFlux", + #"z_pgaussFlux", + "g_pgaussFluxErr", + "r_pgaussFluxErr", + "i_pgaussFluxErr", + #"z_pgaussFluxErr", + "pgauss_T", + "pgauss_TErr", + #Various flags + #"stamp_flags", + "psfOriginal_flags", + "gauss_psfReconvolved_flags", + "gauss_object_flags", + "g_gaussFlux_flags", + "r_gaussFlux_flags", + "i_gaussFlux_flags", + #"z_gaussFlux_flags", + "g_pgaussFlux_flags", + "r_pgaussFlux_flags", + "i_pgaussFlux_flags", + #"z_pgaussFlux_flags", + #"gauss_T_flags", + #"pgauss_T_flags", + "gauss_flags", + "pgauss_flags", + "gauss_shape_flags", + + ] + return input_columns + + def setup_output(self, tag, group, first_chunk): + f = self.open_output(tag) + g = f.create_group(group) + variants = { + "ns": len(first_chunk["ns"]), + "1p": len(first_chunk["1p"]), + "1m": len(first_chunk["1m"]), + "2p": len(first_chunk["2p"]), + "2m": len(first_chunk["2m"]), + } + columns = list(first_chunk["ns"].keys()) + splitter = MetaDetectSplitter(g, columns, variants) + for variant in ["ns", "1p", "1m", "2p", "2m"]: + splitter.write_bin(first_chunk[variant], variant) + return f + + def write_output(self, outfile, group, data): + g = outfile[group] + for variant in ["ns", "1p", "1m", "2p", "2m"]: + k = g[variant] + for name, col in data[variant].items(): + # replace masked values with nans + if np.ma.isMaskedArray(col): + col = col.filled(np.nan) + k[name].append(col) #NOT SURE THIS WORKS EITHER TBD + + + +# Outstanding issues! +# - 1 we don't have a fixed length on the things we add to the seperate variants, hence try append? need to figure out if it works +# - 2 same issue means the h5py shorten thing probably wont work? do we need it to or should we just drop it? +# - 3 write the version where data is taken from a parquet file instead of a butler file! diff --git a/txpipe/psf_diagnostics.py b/txpipe/psf_diagnostics.py index 6de43adf8..9e53c1e29 100644 --- a/txpipe/psf_diagnostics.py +++ b/txpipe/psf_diagnostics.py @@ -1119,7 +1119,7 @@ def load_galaxies(self): # Get the base catalog for metadetect if cat_type == "metadetect": - g = g["00"] + g = g["ns"] ra = g["ra"][:][mask] dec = g["dec"][:][mask] diff --git a/txpipe/shear_calibration/calibration_calculators.py b/txpipe/shear_calibration/calibration_calculators.py index 9430f6315..706a162e5 100755 --- a/txpipe/shear_calibration/calibration_calculators.py +++ b/txpipe/shear_calibration/calibration_calculators.py @@ -53,7 +53,7 @@ class CalibrationCalculator: The data that is added to the calculator should contain shear columns appropriate to the specific type of calibrationn used. For example, the metadetect calculator expects - columns 00/g1, 00/g2, etc. + columns ns/g1, ns/g2, etc. The selection function does not need to know about all these variants. The calculator will wrap the data dictionary passed in in a special class that chooses variant diff --git a/txpipe/shear_calibration/mean_shear_in_bins.py b/txpipe/shear_calibration/mean_shear_in_bins.py index 21f8ac221..945342033 100644 --- a/txpipe/shear_calibration/mean_shear_in_bins.py +++ b/txpipe/shear_calibration/mean_shear_in_bins.py @@ -39,10 +39,13 @@ def selector(self, data, i): # select objects in bin i of the x variable. w = (x > self.limits[i]) & (x < self.limits[i + 1]) - + # Optionally cut down to the source sample only if self.cut_source_bin: - w &= data["bin"] != -1 + if self.shear_catalog_type == "metadetect": + w & data[f"bin_{self.x_name[:2]}"] != -1 + else: + w &= data["bin"] != -1 return np.where(w)[0] def add_data(self, data): @@ -55,7 +58,7 @@ def add_data(self, data): # for all 5 variants. We just want the unsheared on # here. w = w[0] - weight = data["00/weight"][w] + weight = data["ns/weight"][w] else: weight = data["weight"][w] diff --git a/txpipe/shear_calibration/names.py b/txpipe/shear_calibration/names.py index 6bae3f63e..0f1976440 100644 --- a/txpipe/shear_calibration/names.py +++ b/txpipe/shear_calibration/names.py @@ -1,4 +1,4 @@ -META_VARIANTS = ["00", "1p", "1m", "2p", "2m"] +META_VARIANTS = ["ns", "1p", "1m", "2p", "2m"] def metacal_variants(*names): diff --git a/txpipe/source_selection/metadetect.py b/txpipe/source_selection/metadetect.py index 60e6dcf39..c3e8e6f13 100644 --- a/txpipe/source_selection/metadetect.py +++ b/txpipe/source_selection/metadetect.py @@ -41,7 +41,7 @@ def data_iterator(self): bands = self.config["bands"] # Core quantities we need - shear_cols = metadetect_variants("T", "s2n", "g1", "g2", "ra", "dec", "psf_T_mean", "weight", "flags") + shear_cols = metadetect_variants("T", "s2n", "g1", "g2", "ra", "dec", "weight", "psf_T_mean", "flags") # Magnitudes and errors shear_cols += band_variants(bands, "mag", "mag_err", shear_catalog_type="metadetect") @@ -81,7 +81,7 @@ def apply_simple_redshift_cut(self, data): # Otherwise we have to do it once for each variant pz_data = {} - variants = ["00/", "1p/", "2p/", "1m/", "2m/"] + variants = ["ns/", "1p/", "2p/", "1m/", "2m/"] for v in variants: if self.config["true_z"]: zz = data[f"{v}redshift_true"] @@ -119,7 +119,7 @@ def setup_output(self): n = infile[f"shear/{v}/ra"].size outfile["tomography"].create_dataset(f"bin_{v}", (n,), dtype=np.int32) # Link the 00 variant to the base tomography/bin dataset - outfile["tomography/bin_00"] = outfile["tomography/bin"] + outfile["tomography/bin_ns"] = outfile["tomography/bin"] # There is only global calibration information for metadetect, nothing # per-bin. diff --git a/txpipe/utils/splitters.py b/txpipe/utils/splitters.py index e53b1046a..87f0b5d27 100644 --- a/txpipe/utils/splitters.py +++ b/txpipe/utils/splitters.py @@ -206,3 +206,39 @@ def finish(self): sz = self.index[b] for col in self.columns: sub[col].resize((sz,)) + + +class MetaDetectSplitter(DynamicSplitter): + """ + A splitter for splitting the metadetect data into subgroups for the different variants + """ + def __init__(self, group, columns, bin_sizes, dtypes=None): + """Create a fixed-size splitter + + Parameters + ---------- + group: h5py.Group + The group where the output data will be written + name: str + The base name of the different bins to write + columns: list + The str names of the columns to be split + bin_sizes: dict + Maps bin_name (can be anything printable) to final_bin_size (int) + dtypes: dict or None + Maps bins to HDF5 data types for the output columns. Bins default + to 8 byte floats if not found in this. + """ + self.bins = list(bin_sizes.keys()) + + self.group = group + + self.columns = columns + + # self.index will track how much data is written so far + self.index = {b: 0 for b in self.bins} + self.bin_sizes = bin_sizes + + # Make a subgroup for each bin in our data + self.subgroups = {b: self.group.create_group(f"{b}") for b in self.bins} + self._setup_columns(dtypes or {}) \ No newline at end of file