From 5bc86e8f2c6fe94e33aaf0ab618bc6457c8078d1 Mon Sep 17 00:00:00 2001 From: empEvil Date: Thu, 6 Nov 2025 11:18:31 +0000 Subject: [PATCH 01/33] adding a metadetect ingestion stage --- txpipe/ingest/MetaDetect.py | 183 ++++++++++++++++++++++++++++++++++++ txpipe/ingest/lsst.py | 39 ++++++++ txpipe/utils/__init__.py | 2 +- txpipe/utils/splitters.py | 36 +++++++ 4 files changed, 259 insertions(+), 1 deletion(-) create mode 100644 txpipe/ingest/MetaDetect.py diff --git a/txpipe/ingest/MetaDetect.py b/txpipe/ingest/MetaDetect.py new file mode 100644 index 000000000..432287c5b --- /dev/null +++ b/txpipe/ingest/MetaDetect.py @@ -0,0 +1,183 @@ +from ..base_stage import PipelineStage +from ..data_types import ShearCatalog, PhotometryCatalog, HDFFile, FileCollection +from .lsst import process_metadetect_data +from ceci.config import StageParameter +from ..utils.hdf_tools import h5py_shorten, repack +from ..utils.splitters import MetaDetectSplitter +import numpy as np + + +class TXIngestMetaDetect(PipelineStage): + """ + Initial ingestion of the Rubin MetaDetect catalog + """ + + name = "TXIngestMetaDetect" + inputs = [] + outputs = [ + ("shear_catalog", ShearCatalog), + ] + config_options= { + "use_butler": StageParameter( + bool, + True, + msg="Should be left on, unless you got an external file, in that case knock yourself out!"), + "butler_config_file": StageParameter( + str, + "/global/cfs/cdirs/lsst/production/gen3/rubin/DP1/repo/butler.yaml", + msg="Path to the LSST butler config file." + ), + "cosmology_tracts_only": StageParameter(bool, True, msg="Use only cosmology tracts."), + "select_field": StageParameter(str, "", msg="Field to select (overrides cosmology_tracts_only)."), + "collections": StageParameter(str, "LSSTComCam/DP1", msg="Butler collections to use."), + "file_path": StageParameter(str, "Nah", msg="if not using a Butler, you need to give a path to the file.") + } + + def run(self): + if self.config("use_butler"): + self.butler_run() + else: + self.file_run() + + # Run h5repack on the file + print("Repacking files") + repack(self.get_output("shear_catalog")) + + def butler_run(self): + error_msg = ( + "The LSST Science Pipelines are not installed in this environment, " + "or are not configured correctly to access the data. " + "See the note in the file example/dp1/ingest.yml for how to set " + "this up on NERSC." + ) + try: + from lsst.daf.butler import Butler + except: + raise ImportError(error_msg) + + + # Configure and create the butler. There are several ways to do this, + # Here we use a central collective butler yaml file from NERSC. + + butler_config_file = self.config["butler_config_file"] + collections = self.config["collections"] + try: + butler = Butler(butler_config_file, collections=collections) + except: + raise RuntimeError(error_msg) + + if self.config["select_field"]: + tracts = DP1_TRACTS[self.config["select_field"]] + elif self.config["cosmology_tracts_only"]: + tracts = DP1_COSMOLOGY_TRACTS + else: + tracts = ALL_TRACTS + + #n = self.get_catalog_size(butler, "ShearObject") + shear_outfile = self.open_output("shear_catalog") + group = shear_outfile.create_group("shear") + shear_outfile["shear"].attrs["catalog_type"] = "metadetect" + + created_files = False + data_set_refs = butler.query_datasets("ShearObject") + n_chunks = len(data_set_refs) + input_columns = self.get_input_columns() + + for i, ref in enumerate(data_set_refs): + tract = ref.dataId["tract"] + if tract not in tracts: + print(f"Skipping chunk {i + 1} / {n_chunks} since tract {tract} is not selected") + continue + + d = butler.get("ShearObject", + dataId=ref.dataId, + parameters={"columns": input_columns} + ) + chunk_size = len(d) + + if chunk_size == 0: + print(f"Skipping chunk {i + 1} / {n_chunks} since it is empty") + continue + + shear_data = process_metadetect_data(d) + if not created_files: + created_files = True + variants = { + "00": len(shear_data["00"]), + "1p": len(shear_data["1p"]), + "1m": len(shear_data["1m"]), + "2p": len(shear_data["2p"]), + "2m": len(shear_data["2m"]), + } + columns = list(shear_data["00"].keys()) + splitter = MetaDetectSplitter(group, columns, variants) + + for variant in ["00", "1p", "1m", "2p", "2m"]: + splitter.write_bin(shear_data[variant], variant) + print(f"Processing chunk {i + 1} / {n_chunks}") + + splitter.finish() + shear_outfile.close() + + def file_run(): + print("Not implemented yet!") + + def get_input_columns(self): + input_columns = [ + "shearObjectId", + "cellId", + "metaStep", + "radec", + "maskFractionObj", + "maskFractionCell", + "nEpochCell", + "g1", + "g2", + "gCov", + "T", + "SNR", + "TErr", + "g1PSFMeta", + "g2PSFMeta", + "g1PSFOrig", + "g2PSFOrig", + "TPSFOrig", + "stdFlux", + "stdFluxErr", + "stdFluxT", + "stdFluxTErr", + "flags" + ] + return input_columns + + def setup_output(self, tag, group, first_chunk): + f = self.open_output(tag) + g = f.create_group(group) + variants = { + "00": len(first_chunk["00"]), + "1p": len(first_chunk["1p"]), + "1m": len(first_chunk["1m"]), + "2p": len(first_chunk["2p"]), + "2m": len(first_chunk["2m"]), + } + columns = list(first_chunk["00"].keys()) + splitter = MetaDetectSplitter(g, columns, variants) + for variant in ["00", "1p", "1m", "2p", "2m"]: + splitter.write_bin(first_chunk[variant], variant) + return f + + def write_output(self, outfile, group, data): + g = outfile[group] + for variant in ["00", "1p", "1m", "2p", "2m"]: + k = g[variant] + for name, col in data[variant].items(): + # replace masked values with nans + if np.ma.isMaskedArray(col): + col = col.filled(np.nan) + k[name].append(col) #NOT SURE THIS WORKS EITHER TBD + + +# Outstanding issues! +# - 1 we don't have a fixed length on the things we add to the seperate variants, hence try append? need to figure out if it works +# - 2 same issue means the h5py shorten thing probably wont work? do we need it to or should we just drop it? +# - 3 write the version where data is taken from a parquet file instead of a butler file! diff --git a/txpipe/ingest/lsst.py b/txpipe/ingest/lsst.py index c0f3b0128..bc2cc1a68 100644 --- a/txpipe/ingest/lsst.py +++ b/txpipe/ingest/lsst.py @@ -64,3 +64,42 @@ def process_shear_data(data): output["psf_g2"] = psf_g2 return output + + +def process_metadetect_data(data): + output = {} + for variant in ["00", "1p", "1m", "2p", "2m"]: + var_data = data[data["metaStep"] == variant] + + var_output = { + "ra": var_data["radec"][0], + "dec": var_data["radec"][1], + "cell_id": var_data["cellId"], + "id": var_data["shearObjectId"], + "metaStep": var_data["metaStep"], + "object_mask_fraction": var_data["maskFractionObj"], + "cell_mask_fraction": var_data["maskFractionCell"], + "n_epoch": var_data["nEpochCell"], + "g1": var_data["g1"], + "g2": var_data["g2"], + "g1_err": var_data["gCov"][0], + "g2_err": var_data["gCov"][1], + "g_cross": var_data["gCov"][2], + "T": var_data["T"], + "s2n": var_data["SNR"], + "T_err": var_data["TErr"], + "psf_g1": var_data["g1PSFOrig"], + "psf_g2": var_data["g2PSFOrig"], + "mcal_psf_g1": var_data["g1PSFMeta"], + "mcal_psf_g2": var_data["g2PSFMeta"], + "mcal_psf_T_mean": var_data["TPSFMeta"], + "flags": var_data["flags"], + } + for band in "ugrizy": + f = var_data["stdFlux"][band] + f_err = var_data["stdFluxErr"][band] + var_output[f"flux_{band}"] = f + var_output[f"flux_err_{band}"] = f_err + output[f"{variant}"] = var_output + + return output diff --git a/txpipe/utils/__init__.py b/txpipe/utils/__init__.py index 6f02561ce..5373e857b 100755 --- a/txpipe/utils/__init__.py +++ b/txpipe/utils/__init__.py @@ -10,7 +10,7 @@ LensfitCalibrator, HSCCalibrator, ) -from .splitters import Splitter, DynamicSplitter +from .splitters import Splitter, DynamicSplitter, MetaDetectSplitter from .calibration_tools import read_shear_catalog_type, band_variants, metacal_variants, metadetect_variants from .calibration_tools import MetacalCalculator, LensfitCalculator, MeanShearInBins from .conversion import nanojansky_err_to_mag_ab, nanojansky_to_mag_ab, moments_to_shear, mag_ab_to_nanojansky diff --git a/txpipe/utils/splitters.py b/txpipe/utils/splitters.py index e53b1046a..87f0b5d27 100644 --- a/txpipe/utils/splitters.py +++ b/txpipe/utils/splitters.py @@ -206,3 +206,39 @@ def finish(self): sz = self.index[b] for col in self.columns: sub[col].resize((sz,)) + + +class MetaDetectSplitter(DynamicSplitter): + """ + A splitter for splitting the metadetect data into subgroups for the different variants + """ + def __init__(self, group, columns, bin_sizes, dtypes=None): + """Create a fixed-size splitter + + Parameters + ---------- + group: h5py.Group + The group where the output data will be written + name: str + The base name of the different bins to write + columns: list + The str names of the columns to be split + bin_sizes: dict + Maps bin_name (can be anything printable) to final_bin_size (int) + dtypes: dict or None + Maps bins to HDF5 data types for the output columns. Bins default + to 8 byte floats if not found in this. + """ + self.bins = list(bin_sizes.keys()) + + self.group = group + + self.columns = columns + + # self.index will track how much data is written so far + self.index = {b: 0 for b in self.bins} + self.bin_sizes = bin_sizes + + # Make a subgroup for each bin in our data + self.subgroups = {b: self.group.create_group(f"{b}") for b in self.bins} + self._setup_columns(dtypes or {}) \ No newline at end of file From c59725c15f051f9dc09ac99f01e54752a1417a62 Mon Sep 17 00:00:00 2001 From: empEvil Date: Thu, 6 Nov 2025 13:01:30 +0000 Subject: [PATCH 02/33] added options to just provide a parquet file --- txpipe/ingest/MetaDetect.py | 45 ++++++++++++++++++++++++++++++++----- 1 file changed, 40 insertions(+), 5 deletions(-) diff --git a/txpipe/ingest/MetaDetect.py b/txpipe/ingest/MetaDetect.py index 432287c5b..ae9ce6528 100644 --- a/txpipe/ingest/MetaDetect.py +++ b/txpipe/ingest/MetaDetect.py @@ -5,7 +5,8 @@ from ..utils.hdf_tools import h5py_shorten, repack from ..utils.splitters import MetaDetectSplitter import numpy as np - +import os +import pyarrow.parquet as pq class TXIngestMetaDetect(PipelineStage): """ @@ -30,7 +31,7 @@ class TXIngestMetaDetect(PipelineStage): "cosmology_tracts_only": StageParameter(bool, True, msg="Use only cosmology tracts."), "select_field": StageParameter(str, "", msg="Field to select (overrides cosmology_tracts_only)."), "collections": StageParameter(str, "LSSTComCam/DP1", msg="Butler collections to use."), - "file_path": StageParameter(str, "Nah", msg="if not using a Butler, you need to give a path to the file.") + "file_path": StageParameter(str, None, msg="if not using a Butler, you need to give a path to the file.") } def run(self): @@ -111,7 +112,7 @@ def butler_run(self): } columns = list(shear_data["00"].keys()) splitter = MetaDetectSplitter(group, columns, variants) - + for variant in ["00", "1p", "1m", "2p", "2m"]: splitter.write_bin(shear_data[variant], variant) print(f"Processing chunk {i + 1} / {n_chunks}") @@ -119,8 +120,42 @@ def butler_run(self): splitter.finish() shear_outfile.close() - def file_run(): - print("Not implemented yet!") + def file_run(self): + file_path = self.config("file_path") + if file_path == None: + raise RuntimeError("You must either use a butler, or specify a file_path to your metadetect catalog.") + + if not os.path.exists(file_path): + raise RuntimeError("No file where you said it would be.") + + shear_outfile = self.open_output("shear_catalog") + group = shear_outfile.create_group("shear") + shear_outfile["shear"].attrs["catalog_type"] = "metadetect" + + created_files = False + input_columns = self.get_input_columns() + + chunk_size = self.config("chunk_rows") + pf = pq.ParquetFile(file_path) + for batch in pf.iter_batches(columns=input_columns, batch_size=chunk_size): + shear_data = process_metadetect_data(batch) + if not created_files: + created_files = True + variants = { + "00": len(shear_data["00"]), + "1p": len(shear_data["1p"]), + "1m": len(shear_data["1m"]), + "2p": len(shear_data["2p"]), + "2m": len(shear_data["2m"]), + } + columns = list(shear_data["00"].keys()) + splitter = MetaDetectSplitter(group, columns, variants) + + for variant in ["00", "1p", "1m", "2p", "2m"]: + splitter.write_bin(shear_data[variant], variant) + + splitter.finish() + shear_outfile.close() def get_input_columns(self): input_columns = [ From ecf9d8573d3976423a2a9a169264d2a203d006c6 Mon Sep 17 00:00:00 2001 From: "Eske M. Pedersen" Date: Fri, 20 Feb 2026 14:51:07 +0000 Subject: [PATCH 03/33] updated to DP2 schema --- txpipe/ingest/MetaDetect.py | 69 ++++++++++++++++++++++++++----------- txpipe/ingest/lsst.py | 46 ++++++++++++------------- 2 files changed, 70 insertions(+), 45 deletions(-) diff --git a/txpipe/ingest/MetaDetect.py b/txpipe/ingest/MetaDetect.py index ae9ce6528..358e619e6 100644 --- a/txpipe/ingest/MetaDetect.py +++ b/txpipe/ingest/MetaDetect.py @@ -160,28 +160,55 @@ def file_run(self): def get_input_columns(self): input_columns = [ "shearObjectId", - "cellId", + "cellId", #removed "metaStep", - "radec", - "maskFractionObj", - "maskFractionCell", - "nEpochCell", - "g1", - "g2", - "gCov", - "T", - "SNR", - "TErr", - "g1PSFMeta", - "g2PSFMeta", - "g1PSFOrig", - "g2PSFOrig", - "TPSFOrig", - "stdFlux", - "stdFluxErr", - "stdFluxT", - "stdFluxTErr", - "flags" + "coord_ra", + "coord_dec", + "mfrac", + #"maskFractionCell", #Removed + #"nEpochCell", #Removed + "gauss_g1", + "gauss_g2", + "gauss_g1_g1_Cov", + "gauss_g1_g2_Cov", + "gauss_g2_g2_Cov", + "gauss_T", + "gauss_snr", + "gauss_TErr", + "gauss_psfReconvolved_g1", + "gauss_psfReconvolved_g2", + "psfOriginal_e1", + "psfOriginal_e2", + "psfOriginal_T", + #Next follows the fluxes: + "g_pgaussFlux", + "r_pgaussFlux", + "i_pgaussFlux", + "z_pgaussFlux", + "g_pgaussFluxErr", + "r_pgaussFluxErr", + "i_pgaussFluxErr", + "z_pgaussFluxErr", + "pgauss_T", + "pgauss_TErr", + #Various flags + "stamp_flags", + "psfOriginal_flags", + "gauss_psfReconvolved_flags", + "gauss_object_flags", + "g_gaussFlux_flags", + "r_gaussFlux_flags", + "i_gaussFlux_flags", + "z_gaussFlux_flags", + "g_pgaussFlux_flags", + "r_pgaussFlux_flags", + "i_pgaussFlux_flags", + "z_pgaussFlux_flags", + "gauss_T_flags", + "pgauss_T_flags", + "gauss_flags", + "pgauss_flags", + ] return input_columns diff --git a/txpipe/ingest/lsst.py b/txpipe/ingest/lsst.py index bc2cc1a68..d70fd8d6e 100644 --- a/txpipe/ingest/lsst.py +++ b/txpipe/ingest/lsst.py @@ -72,32 +72,30 @@ def process_metadetect_data(data): var_data = data[data["metaStep"] == variant] var_output = { - "ra": var_data["radec"][0], - "dec": var_data["radec"][1], - "cell_id": var_data["cellId"], + "ra": var_data["coord_ra"], + "dec": var_data["coord_dec"], "id": var_data["shearObjectId"], - "metaStep": var_data["metaStep"], - "object_mask_fraction": var_data["maskFractionObj"], - "cell_mask_fraction": var_data["maskFractionCell"], - "n_epoch": var_data["nEpochCell"], - "g1": var_data["g1"], - "g2": var_data["g2"], - "g1_err": var_data["gCov"][0], - "g2_err": var_data["gCov"][1], - "g_cross": var_data["gCov"][2], - "T": var_data["T"], - "s2n": var_data["SNR"], - "T_err": var_data["TErr"], - "psf_g1": var_data["g1PSFOrig"], - "psf_g2": var_data["g2PSFOrig"], - "mcal_psf_g1": var_data["g1PSFMeta"], - "mcal_psf_g2": var_data["g2PSFMeta"], - "mcal_psf_T_mean": var_data["TPSFMeta"], - "flags": var_data["flags"], + "metaStep": var_data["metaStep"], #might not be needed + "object_mask_fraction": var_data["mfrac"], + #"n_epoch": var_data["nEpochCell"], + "g1": var_data["gauss_g1"], + "g2": var_data["gauss_g2"], + "g1_err": var_data["gauss_g1_g1_Cov"], + "g2_err": var_data["gauss_g2_g2_Cov"], + "g_cross": var_data["gauss_g1_g2_Cov"], + "T": var_data["gauss_T"], + "s2n": var_data["gauss_snr"], + "T_err": var_data["gauss_TErr"], + "psf_g1": var_data["psfOriginal_e1"], + "psf_g2": var_data["psfOriginal_e2"], + "mcal_psf_g1": var_data["gauss_psfReconvolved_g1"], + "mcal_psf_g2": var_data["gauss_psfReconvolved_g1"], + "mcal_psf_T_mean": var_data["gauss_psfReconvolved_T"], + "flags": var_data["flags"], # TO BE ADDRESSED! } - for band in "ugrizy": - f = var_data["stdFlux"][band] - f_err = var_data["stdFluxErr"][band] + for band in "griz": # For DP2, we only expect 4 bands + f = var_data[f"{band}_pgaussFlux"] + f_err = var_data[f"{band}_pgaussFluxErr"] var_output[f"flux_{band}"] = f var_output[f"flux_err_{band}"] = f_err output[f"{variant}"] = var_output From 41befad6d2677e0db5304ac4b036106af6fc95dc Mon Sep 17 00:00:00 2001 From: "Eske M. Pedersen" Date: Fri, 27 Feb 2026 17:04:37 +0000 Subject: [PATCH 04/33] more updates --- txpipe/ingest/lsst.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/txpipe/ingest/lsst.py b/txpipe/ingest/lsst.py index d70fd8d6e..1065d67bd 100644 --- a/txpipe/ingest/lsst.py +++ b/txpipe/ingest/lsst.py @@ -68,7 +68,7 @@ def process_shear_data(data): def process_metadetect_data(data): output = {} - for variant in ["00", "1p", "1m", "2p", "2m"]: + for variant in ["ns", "1p", "1m", "2p", "2m"]: var_data = data[data["metaStep"] == variant] var_output = { From 78d6545437ef8386b819969b4ef07eb604addc25 Mon Sep 17 00:00:00 2001 From: empEvil Date: Tue, 17 Mar 2026 14:28:58 +0000 Subject: [PATCH 05/33] fixed up for dp1 --- txpipe/ingest/MetaDetect.py | 132 ++++++++++++++++++++++++++++-------- txpipe/ingest/__init__.py | 1 + txpipe/ingest/lsst.py | 26 +++++-- 3 files changed, 126 insertions(+), 33 deletions(-) diff --git a/txpipe/ingest/MetaDetect.py b/txpipe/ingest/MetaDetect.py index 358e619e6..ef275dec8 100644 --- a/txpipe/ingest/MetaDetect.py +++ b/txpipe/ingest/MetaDetect.py @@ -8,6 +8,65 @@ import os import pyarrow.parquet as pq +# The tract values are listed in table 2 of that paper: +DP1_COSMOLOGY_FIELDS = [ + "EDFS", + "ECDFS", + "LGLF", +] + + +DP1_TRACTS = { + # Euclid Deep Field South + "EDFS": [2393, 2234, 2235, 2394], + # Extended Chandra Deep Field South + "ECDFS": [5062, 5063, 5064, 4848, 4849], + # Low Galactic Latitude Field / Rubin_SV_095_-25 + "LGLF": [5305, 5306, 5525, 5526], + # Fornax Dwarf Spheroidal Galaxy + "FDSG": [4016, 4217, 4218, 4017], + # Low Ecliptic Latitude Field / Rubin_SV_38_7 + "LELF": [10464, 10221, 10222, 10704, 10705, 10463], + # Seagull Nebula + "Seagull": [7850, 7849, 7610, 7611], + # 47 Tuc Globular Cluster + "47Tuc": [531, 532, 453, 454], +} + +DP1_COSMOLOGY_TRACTS = sum([DP1_TRACTS[_field] for _field in DP1_COSMOLOGY_FIELDS], []) +ALL_TRACTS = sum(DP1_TRACTS.values(), []) + + +# In case useful later: +DP1_FIELD_CENTERS = { + "47 Tuc Globular Cluster": (6.02, -72.08), + "Low Ecliptic Latitude Field": (37.86, 6.98), + "Fornax Dwarf Spheroidal Galaxy": (40.00, -34.45), + "Extended Chandra Deep Field South": (53.13, -28.10), + "Euclid Deep Field South": (59.10, -48.73), + "Low Galactic Latitude Field": (95.00, -25.00), + "Seagull Nebula": (106.23, -10.51), +} + + +DP1_SURVEY_PROPERTIES = { + "deepCoadd_exposure_time_consolidated_map_sum": "Total exposure time accumulated per sky position (second)", + "deepCoadd_epoch_consolidated_map_min": "Earliest observation epoch (MJD)", + "deepCoadd_epoch_consolidated_map_max": "Latest observation epoch (MJD)", + "deepCoadd_epoch_consolidated_map_mean": "Mean observation epoch (MJD)", + "deepCoadd_psf_size_consolidated_map_weighted_mean": "Weighted mean of PSF characteristic width as computed from the determinant radius (pixel)", + "deepCoadd_psf_e1_consolidated_map_weighted_mean": "Weighted mean of PSF ellipticity component e1", + "deepCoadd_psf_e2_consolidated_map_weighted_mean": "Weighted mean of PSF ellipticity component e2", + "deepCoadd_psf_maglim_consolidated_map_weighted_mean": "Weighted mean of PSF flux 5σ magnitude limit (magAB)", + "deepCoadd_sky_background_consolidated_map_weighted_mean": "Weighted mean of background light level from the sky (nJy)", + "deepCoadd_sky_noise_consolidated_map_weighted_mean": "Weighted mean of standard deviation of the sky level (nJy)", + "deepCoadd_dcr_dra_consolidated_map_weighted_mean": "Weighted mean of DCR-induced astrometric shift in right ascension direction, expressed as a proportionality factor", + "deepCoadd_dcr_ddec_consolidated_map_weighted_mean": "Weighted mean of DCR-induced astrometric shift in declination direction, expressed as a proportionality factor", + "deepCoadd_dcr_e1_consolidated_map_weighted_mean": "Weighted mean of DCR-induced change in PSF ellipticity (e1), expressed as a proportionality factor", + "deepCoadd_dcr_e2_consolidated_map_weighted_mean": "Weighted mean of DCR-induced change in PSF ellipticity (e2), expressed as a proportionality factor", +} + + class TXIngestMetaDetect(PipelineStage): """ Initial ingestion of the Rubin MetaDetect catalog @@ -35,7 +94,7 @@ class TXIngestMetaDetect(PipelineStage): } def run(self): - if self.config("use_butler"): + if self.config["use_butler"]: self.butler_run() else: self.file_run() @@ -80,7 +139,7 @@ def butler_run(self): shear_outfile["shear"].attrs["catalog_type"] = "metadetect" created_files = False - data_set_refs = butler.query_datasets("ShearObject") + data_set_refs = butler.query_datasets('object_shear_all') n_chunks = len(data_set_refs) input_columns = self.get_input_columns() @@ -90,7 +149,7 @@ def butler_run(self): print(f"Skipping chunk {i + 1} / {n_chunks} since tract {tract} is not selected") continue - d = butler.get("ShearObject", + d = butler.get('object_shear_all', dataId=ref.dataId, parameters={"columns": input_columns} ) @@ -104,16 +163,17 @@ def butler_run(self): if not created_files: created_files = True variants = { - "00": len(shear_data["00"]), + "ns": len(shear_data["ns"]), "1p": len(shear_data["1p"]), "1m": len(shear_data["1m"]), "2p": len(shear_data["2p"]), "2m": len(shear_data["2m"]), } - columns = list(shear_data["00"].keys()) - splitter = MetaDetectSplitter(group, columns, variants) + columns = list(shear_data["ns"].keys()) + dtypes = {key: shear_data["ns"][key].dtype for key in shear_data["ns"]} + splitter = MetaDetectSplitter(group, columns, variants, dtypes=dtypes) - for variant in ["00", "1p", "1m", "2p", "2m"]: + for variant in ["ns", "1p", "1m", "2p", "2m"]: splitter.write_bin(shear_data[variant], variant) print(f"Processing chunk {i + 1} / {n_chunks}") @@ -139,20 +199,22 @@ def file_run(self): pf = pq.ParquetFile(file_path) for batch in pf.iter_batches(columns=input_columns, batch_size=chunk_size): shear_data = process_metadetect_data(batch) + if not created_files: created_files = True variants = { - "00": len(shear_data["00"]), + "ns": len(shear_data["ns"]), "1p": len(shear_data["1p"]), "1m": len(shear_data["1m"]), "2p": len(shear_data["2p"]), "2m": len(shear_data["2m"]), } - columns = list(shear_data["00"].keys()) + columns = list(shear_data["ns"].keys()) splitter = MetaDetectSplitter(group, columns, variants) - for variant in ["00", "1p", "1m", "2p", "2m"]: - splitter.write_bin(shear_data[variant], variant) + for variant in ["ns", "1p", "1m", "2p", "2m"]: + data = sanitize(shear_data[variant]) + splitter.write_bin(data, variant) splitter.finish() shear_outfile.close() @@ -160,11 +222,13 @@ def file_run(self): def get_input_columns(self): input_columns = [ "shearObjectId", - "cellId", #removed + #"cellId", #removed + 'cell_x', + 'cell_y', "metaStep", - "coord_ra", - "coord_dec", - "mfrac", + "ra", + "dec", + #"mfrac", #"maskFractionCell", #Removed #"nEpochCell", #Removed "gauss_g1", @@ -177,6 +241,7 @@ def get_input_columns(self): "gauss_TErr", "gauss_psfReconvolved_g1", "gauss_psfReconvolved_g2", + 'gauss_psfReconvolved_T', "psfOriginal_e1", "psfOriginal_e2", "psfOriginal_T", @@ -184,31 +249,32 @@ def get_input_columns(self): "g_pgaussFlux", "r_pgaussFlux", "i_pgaussFlux", - "z_pgaussFlux", + #"z_pgaussFlux", "g_pgaussFluxErr", "r_pgaussFluxErr", "i_pgaussFluxErr", - "z_pgaussFluxErr", + #"z_pgaussFluxErr", "pgauss_T", "pgauss_TErr", #Various flags - "stamp_flags", + #"stamp_flags", "psfOriginal_flags", "gauss_psfReconvolved_flags", "gauss_object_flags", "g_gaussFlux_flags", "r_gaussFlux_flags", "i_gaussFlux_flags", - "z_gaussFlux_flags", + #"z_gaussFlux_flags", "g_pgaussFlux_flags", "r_pgaussFlux_flags", "i_pgaussFlux_flags", - "z_pgaussFlux_flags", - "gauss_T_flags", - "pgauss_T_flags", + #"z_pgaussFlux_flags", + #"gauss_T_flags", + #"pgauss_T_flags", "gauss_flags", "pgauss_flags", - + "gauss_shape_flags", + ] return input_columns @@ -216,21 +282,21 @@ def setup_output(self, tag, group, first_chunk): f = self.open_output(tag) g = f.create_group(group) variants = { - "00": len(first_chunk["00"]), + "ns": len(first_chunk["ns"]), "1p": len(first_chunk["1p"]), "1m": len(first_chunk["1m"]), "2p": len(first_chunk["2p"]), "2m": len(first_chunk["2m"]), } - columns = list(first_chunk["00"].keys()) + columns = list(first_chunk["ns"].keys()) splitter = MetaDetectSplitter(g, columns, variants) - for variant in ["00", "1p", "1m", "2p", "2m"]: + for variant in ["ns", "1p", "1m", "2p", "2m"]: splitter.write_bin(first_chunk[variant], variant) return f def write_output(self, outfile, group, data): g = outfile[group] - for variant in ["00", "1p", "1m", "2p", "2m"]: + for variant in ["ns", "1p", "1m", "2p", "2m"]: k = g[variant] for name, col in data[variant].items(): # replace masked values with nans @@ -238,6 +304,18 @@ def write_output(self, outfile, group, data): col = col.filled(np.nan) k[name].append(col) #NOT SURE THIS WORKS EITHER TBD +def sanitize(data): + """ + Convert unicode arrays into types that h5py can save + """ + # convert unicode to strings + if data.dtype.kind == "U": + data = data.astype("S") + # convert dates to integers + elif data.dtype.kind == "M": + data = data.astype(int) + + return data # Outstanding issues! # - 1 we don't have a fixed length on the things we add to the seperate variants, hence try append? need to figure out if it works diff --git a/txpipe/ingest/__init__.py b/txpipe/ingest/__init__.py index 0b381c8d7..110e1a650 100644 --- a/txpipe/ingest/__init__.py +++ b/txpipe/ingest/__init__.py @@ -11,3 +11,4 @@ ) from .dp1 import TXIngestDataPreview1 from .legacy import TXIngestDESY3Gold +from .MetaDetect import TXIngestMetaDetect diff --git a/txpipe/ingest/lsst.py b/txpipe/ingest/lsst.py index 1065d67bd..52130f062 100644 --- a/txpipe/ingest/lsst.py +++ b/txpipe/ingest/lsst.py @@ -70,13 +70,14 @@ def process_metadetect_data(data): output = {} for variant in ["ns", "1p", "1m", "2p", "2m"]: var_data = data[data["metaStep"] == variant] + var_data = sanitize(var_data) var_output = { - "ra": var_data["coord_ra"], - "dec": var_data["coord_dec"], + "ra": var_data["ra"], + "dec": var_data["dec"], "id": var_data["shearObjectId"], - "metaStep": var_data["metaStep"], #might not be needed - "object_mask_fraction": var_data["mfrac"], + "metaStep": var_data["metaStep"].astype("S"), #might not be needed + #"object_mask_fraction": var_data["mfrac"], #"n_epoch": var_data["nEpochCell"], "g1": var_data["gauss_g1"], "g2": var_data["gauss_g2"], @@ -91,9 +92,9 @@ def process_metadetect_data(data): "mcal_psf_g1": var_data["gauss_psfReconvolved_g1"], "mcal_psf_g2": var_data["gauss_psfReconvolved_g1"], "mcal_psf_T_mean": var_data["gauss_psfReconvolved_T"], - "flags": var_data["flags"], # TO BE ADDRESSED! + "flags": var_data["gauss_shape_flags"], # TO BE ADDRESSED! } - for band in "griz": # For DP2, we only expect 4 bands + for band in "gri": # For DP2, we only expect 4 bands f = var_data[f"{band}_pgaussFlux"] f_err = var_data[f"{band}_pgaussFluxErr"] var_output[f"flux_{band}"] = f @@ -101,3 +102,16 @@ def process_metadetect_data(data): output[f"{variant}"] = var_output return output + +def sanitize(data): + """ + Convert unicode arrays into types that h5py can save + """ + # convert unicode to strings + if data.dtype.kind == "U": + data = data.astype("S") + # convert dates to integers + elif data.dtype.kind == "M": + data = data.astype(int) + + return data \ No newline at end of file From f75c150ea9d5d6c251e7082217f205ace5f1d2f2 Mon Sep 17 00:00:00 2001 From: empEvil Date: Tue, 17 Mar 2026 15:10:15 +0000 Subject: [PATCH 06/33] re-enabling mfrac --- txpipe/ingest/MetaDetect.py | 2 +- txpipe/ingest/lsst.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/txpipe/ingest/MetaDetect.py b/txpipe/ingest/MetaDetect.py index ef275dec8..768d4ba8f 100644 --- a/txpipe/ingest/MetaDetect.py +++ b/txpipe/ingest/MetaDetect.py @@ -228,7 +228,7 @@ def get_input_columns(self): "metaStep", "ra", "dec", - #"mfrac", + "mfrac", #"maskFractionCell", #Removed #"nEpochCell", #Removed "gauss_g1", diff --git a/txpipe/ingest/lsst.py b/txpipe/ingest/lsst.py index 52130f062..b5b5f1a29 100644 --- a/txpipe/ingest/lsst.py +++ b/txpipe/ingest/lsst.py @@ -77,7 +77,7 @@ def process_metadetect_data(data): "dec": var_data["dec"], "id": var_data["shearObjectId"], "metaStep": var_data["metaStep"].astype("S"), #might not be needed - #"object_mask_fraction": var_data["mfrac"], + "object_mask_fraction": var_data["mfrac"], #"n_epoch": var_data["nEpochCell"], "g1": var_data["gauss_g1"], "g2": var_data["gauss_g2"], From d40683406e5952a3402a82d0719b61af5cf8f4f4 Mon Sep 17 00:00:00 2001 From: "Eske M. Pedersen" Date: Fri, 20 Mar 2026 16:31:08 +0000 Subject: [PATCH 07/33] changing to mags --- txpipe/ingest/lsst.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/txpipe/ingest/lsst.py b/txpipe/ingest/lsst.py index b5b5f1a29..39417fdb8 100644 --- a/txpipe/ingest/lsst.py +++ b/txpipe/ingest/lsst.py @@ -97,8 +97,8 @@ def process_metadetect_data(data): for band in "gri": # For DP2, we only expect 4 bands f = var_data[f"{band}_pgaussFlux"] f_err = var_data[f"{band}_pgaussFluxErr"] - var_output[f"flux_{band}"] = f - var_output[f"flux_err_{band}"] = f_err + var_output[f"mag_{band}"] = nanojansky_to_mag_ab(f) + var_output[f"mag_err_{band}"] = nanojansky_err_to_mag_ab(f, f_err) output[f"{variant}"] = var_output return output From 796ba55ae477721417c15a4f6aaf061d27235a0b Mon Sep 17 00:00:00 2001 From: empEvil Date: Tue, 14 Apr 2026 14:08:46 +0100 Subject: [PATCH 08/33] just a few lint errors --- txpipe/ingest/MetaDetect.py | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/txpipe/ingest/MetaDetect.py b/txpipe/ingest/MetaDetect.py index 768d4ba8f..217ddb835 100644 --- a/txpipe/ingest/MetaDetect.py +++ b/txpipe/ingest/MetaDetect.py @@ -182,7 +182,7 @@ def butler_run(self): def file_run(self): file_path = self.config("file_path") - if file_path == None: + if file_path is None: raise RuntimeError("You must either use a butler, or specify a file_path to your metadetect catalog.") if not os.path.exists(file_path): @@ -274,14 +274,14 @@ def get_input_columns(self): "gauss_flags", "pgauss_flags", "gauss_shape_flags", - + ] return input_columns def setup_output(self, tag, group, first_chunk): f = self.open_output(tag) g = f.create_group(group) - variants = { + variants = { "ns": len(first_chunk["ns"]), "1p": len(first_chunk["1p"]), "1m": len(first_chunk["1m"]), @@ -302,7 +302,8 @@ def write_output(self, outfile, group, data): # replace masked values with nans if np.ma.isMaskedArray(col): col = col.filled(np.nan) - k[name].append(col) #NOT SURE THIS WORKS EITHER TBD + k[name].append(col) #NOT SURE THIS WORKS EITHER TBD + def sanitize(data): """ From 0ba3e520061ac1daff3124f4fb550dd9b960cd07 Mon Sep 17 00:00:00 2001 From: empEvil Date: Tue, 5 May 2026 14:27:35 +0100 Subject: [PATCH 09/33] added missing original T --- txpipe/ingest/lsst.py | 1 + 1 file changed, 1 insertion(+) diff --git a/txpipe/ingest/lsst.py b/txpipe/ingest/lsst.py index 39417fdb8..20c2db44b 100644 --- a/txpipe/ingest/lsst.py +++ b/txpipe/ingest/lsst.py @@ -89,6 +89,7 @@ def process_metadetect_data(data): "T_err": var_data["gauss_TErr"], "psf_g1": var_data["psfOriginal_e1"], "psf_g2": var_data["psfOriginal_e2"], + "psf_T_mean": var_data["psfOriginal_T"], "mcal_psf_g1": var_data["gauss_psfReconvolved_g1"], "mcal_psf_g2": var_data["gauss_psfReconvolved_g1"], "mcal_psf_T_mean": var_data["gauss_psfReconvolved_T"], From 35d5ebe84de88641655ddfe5734e5e018ec3e457 Mon Sep 17 00:00:00 2001 From: "Eske M. Pedersen" Date: Tue, 5 May 2026 16:34:40 +0100 Subject: [PATCH 10/33] updated naming --- txpipe/ingest/lsst.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/txpipe/ingest/lsst.py b/txpipe/ingest/lsst.py index 20c2db44b..218557134 100644 --- a/txpipe/ingest/lsst.py +++ b/txpipe/ingest/lsst.py @@ -87,12 +87,12 @@ def process_metadetect_data(data): "T": var_data["gauss_T"], "s2n": var_data["gauss_snr"], "T_err": var_data["gauss_TErr"], - "psf_g1": var_data["psfOriginal_e1"], - "psf_g2": var_data["psfOriginal_e2"], - "psf_T_mean": var_data["psfOriginal_T"], - "mcal_psf_g1": var_data["gauss_psfReconvolved_g1"], - "mcal_psf_g2": var_data["gauss_psfReconvolved_g1"], - "mcal_psf_T_mean": var_data["gauss_psfReconvolved_T"], + "psf_g1_original": var_data["psfOriginal_e1"], + "psf_g2_original": var_data["psfOriginal_e2"], + "psf_T_mean_original": var_data["psfOriginal_T"], + "psf_g1": var_data["gauss_psfReconvolved_g1"], + "psf_g2": var_data["gauss_psfReconvolved_g1"], + "psf_T_mean": var_data["gauss_psfReconvolved_T"], "flags": var_data["gauss_shape_flags"], # TO BE ADDRESSED! } for band in "gri": # For DP2, we only expect 4 bands From d08bb91917c92c9806d192911f7de69c7e1ce903 Mon Sep 17 00:00:00 2001 From: "Eske M. Pedersen" Date: Mon, 18 May 2026 11:24:26 +0100 Subject: [PATCH 11/33] fixing smaller details --- txpipe/ingest/MetaDetect.py | 20 ++++---------------- txpipe/ingest/__init__.py | 2 +- 2 files changed, 5 insertions(+), 17 deletions(-) diff --git a/txpipe/ingest/MetaDetect.py b/txpipe/ingest/MetaDetect.py index 217ddb835..e3c2b60a5 100644 --- a/txpipe/ingest/MetaDetect.py +++ b/txpipe/ingest/MetaDetect.py @@ -1,6 +1,6 @@ from ..base_stage import PipelineStage from ..data_types import ShearCatalog, PhotometryCatalog, HDFFile, FileCollection -from .lsst import process_metadetect_data +from .lsst import process_metadetect_data, sanitize from ceci.config import StageParameter from ..utils.hdf_tools import h5py_shorten, repack from ..utils.splitters import MetaDetectSplitter @@ -8,7 +8,7 @@ import os import pyarrow.parquet as pq -# The tract values are listed in table 2 of that paper: +# All TRACT INFORMATION SHOULD BE MOVED ELSEWHERE DP1_COSMOLOGY_FIELDS = [ "EDFS", "ECDFS", @@ -67,12 +67,12 @@ } -class TXIngestMetaDetect(PipelineStage): +class TXIngestRubinMetaDetect(PipelineStage): """ Initial ingestion of the Rubin MetaDetect catalog """ - name = "TXIngestMetaDetect" + name = "TXIngestRubinMetaDetect" inputs = [] outputs = [ ("shear_catalog", ShearCatalog), @@ -305,18 +305,6 @@ def write_output(self, outfile, group, data): k[name].append(col) #NOT SURE THIS WORKS EITHER TBD -def sanitize(data): - """ - Convert unicode arrays into types that h5py can save - """ - # convert unicode to strings - if data.dtype.kind == "U": - data = data.astype("S") - # convert dates to integers - elif data.dtype.kind == "M": - data = data.astype(int) - - return data # Outstanding issues! # - 1 we don't have a fixed length on the things we add to the seperate variants, hence try append? need to figure out if it works diff --git a/txpipe/ingest/__init__.py b/txpipe/ingest/__init__.py index e5ed353be..e3af22f3a 100644 --- a/txpipe/ingest/__init__.py +++ b/txpipe/ingest/__init__.py @@ -11,4 +11,4 @@ ) from .dp1 import TXIngestDataPreview1 from .stage3 import TXIngestDESY3Gold -from .MetaDetect import TXIngestMetaDetect +from .metaDetect import TXIngestRubinMetaDetect From a485fda4f0bf8de7e99acd021b8c72d54cf9838f Mon Sep 17 00:00:00 2001 From: "Eske M. Pedersen" Date: Mon, 18 May 2026 11:38:50 +0100 Subject: [PATCH 12/33] further updates --- txpipe/shear_calibration/names.py | 2 +- txpipe/source_selection/metadetect.py | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/txpipe/shear_calibration/names.py b/txpipe/shear_calibration/names.py index 6bae3f63e..0f1976440 100644 --- a/txpipe/shear_calibration/names.py +++ b/txpipe/shear_calibration/names.py @@ -1,4 +1,4 @@ -META_VARIANTS = ["00", "1p", "1m", "2p", "2m"] +META_VARIANTS = ["ns", "1p", "1m", "2p", "2m"] def metacal_variants(*names): diff --git a/txpipe/source_selection/metadetect.py b/txpipe/source_selection/metadetect.py index 60e6dcf39..938dd5781 100644 --- a/txpipe/source_selection/metadetect.py +++ b/txpipe/source_selection/metadetect.py @@ -81,7 +81,7 @@ def apply_simple_redshift_cut(self, data): # Otherwise we have to do it once for each variant pz_data = {} - variants = ["00/", "1p/", "2p/", "1m/", "2m/"] + variants = ["ns/", "1p/", "2p/", "1m/", "2m/"] for v in variants: if self.config["true_z"]: zz = data[f"{v}redshift_true"] @@ -119,7 +119,7 @@ def setup_output(self): n = infile[f"shear/{v}/ra"].size outfile["tomography"].create_dataset(f"bin_{v}", (n,), dtype=np.int32) # Link the 00 variant to the base tomography/bin dataset - outfile["tomography/bin_00"] = outfile["tomography/bin"] + outfile["tomography/bin_ns"] = outfile["tomography/bin"] # There is only global calibration information for metadetect, nothing # per-bin. From 6ab7c896e4be124249c35f3b6fb3eeffd85a4e1b Mon Sep 17 00:00:00 2001 From: "Eske M. Pedersen" Date: Mon, 18 May 2026 11:39:23 +0100 Subject: [PATCH 13/33] Rename MetaDetect.py to metaDetect.py --- txpipe/ingest/{MetaDetect.py => metaDetect.py} | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename txpipe/ingest/{MetaDetect.py => metaDetect.py} (100%) diff --git a/txpipe/ingest/MetaDetect.py b/txpipe/ingest/metaDetect.py similarity index 100% rename from txpipe/ingest/MetaDetect.py rename to txpipe/ingest/metaDetect.py From 0f94175c1174c34e35671ff23e91c4a35cff3c1e Mon Sep 17 00:00:00 2001 From: "Eske M. Pedersen" Date: Mon, 18 May 2026 16:20:52 +0100 Subject: [PATCH 14/33] changing 00 to ns in datatypes --- txpipe/data_types.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/txpipe/data_types.py b/txpipe/data_types.py index 1d7e33f03..4a574e38b 100755 --- a/txpipe/data_types.py +++ b/txpipe/data_types.py @@ -79,23 +79,23 @@ def get_size(self): def get_primary_catalog_group(self): if self.catalog_type == "metadetect": - return "shear/00" + return "shear/ns" else: return "shear" def get_true_redshift_column(self): if self.catalog_type == "metadetect": - return "00/redshift_true" + return "ns/redshift_true" else: return "redshift_true" def get_primary_catalog_names(self, true_shear=False): if true_shear: if self.catalog_type == "metadetect": - shear_cols = ["00/true_g1", "00/true_g2", "00/ra", "00/dec", "00/weight"] + shear_cols = ["ns/true_g1", "ns/true_g2", "ns/ra", "ns/dec", "ns/weight"] rename = {c: c[3:] for c in shear_cols} - rename["00/true_g1"] = "g1" - rename["00/true_g2"] = "g2" + rename["ns/true_g1"] = "g1" + rename["ns/true_g2"] = "g2" else: rename = {"true_g1": "g1", "true_g2": "g2"} rename = {} @@ -106,7 +106,7 @@ def get_primary_catalog_names(self, true_shear=False): shear_cols = ["g1", "g2", "c1", "c2", "ra", "dec", "weight"] rename = {} elif self.catalog_type == "metadetect": - shear_cols = ["00/g1", "00/g2", "00/ra", "00/dec", "00/weight"] + shear_cols = ["ns/g1", "ns/g2", "ns/ra", "ns/dec", "ns/weight"] rename = {c: c[3:] for c in shear_cols} else: shear_cols = ["g1", "g2", "ra", "dec", "weight"] From de41d62711093b55b6332e93311e8ca9b432bc8a Mon Sep 17 00:00:00 2001 From: empEvil Date: Tue, 19 May 2026 12:05:11 +0100 Subject: [PATCH 15/33] missed a 00 --- txpipe/data_types.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/txpipe/data_types.py b/txpipe/data_types.py index 4a574e38b..2ffdf3a65 100755 --- a/txpipe/data_types.py +++ b/txpipe/data_types.py @@ -73,7 +73,7 @@ def catalog_type(self): def get_size(self): if self.catalog_type == "metadetect": - return self.file["shear/00/ra"].size + return self.file["shear/ns/ra"].size else: return self.file["shear/ra"].size From 7b3f25eba98132ca35814308110e19bfe6a06d12 Mon Sep 17 00:00:00 2001 From: empEvil Date: Tue, 19 May 2026 12:20:55 +0100 Subject: [PATCH 16/33] change 00 to ns in random_forest --- txpipe/binning/random_forest.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/txpipe/binning/random_forest.py b/txpipe/binning/random_forest.py index 1929be2f8..c3c4c02bc 100644 --- a/txpipe/binning/random_forest.py +++ b/txpipe/binning/random_forest.py @@ -83,7 +83,7 @@ def apply_classifier(classifier, features, bands, shear_catalog_type, shear_data prefixes = ["", "", "", "", ""] suffixes = ["", "_1p", "_2p", "_1m", "_2m"] elif shear_catalog_type == "metadetect": - prefixes = ["00/", "1p/", "2p/", "1m/", "2m/"] + prefixes = ["ns/", "1p/", "2p/", "1m/", "2m/"] suffixes = ["", "", "", "", ""] else: prefixes = [""] From 3fe61494c5ceedf0c01a8e2f0cc72bf978891466 Mon Sep 17 00:00:00 2001 From: empEvil Date: Tue, 19 May 2026 12:21:42 +0100 Subject: [PATCH 17/33] removed weight from source selector --- txpipe/source_selection/metadetect.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/txpipe/source_selection/metadetect.py b/txpipe/source_selection/metadetect.py index 938dd5781..4b994d2bd 100644 --- a/txpipe/source_selection/metadetect.py +++ b/txpipe/source_selection/metadetect.py @@ -41,7 +41,7 @@ def data_iterator(self): bands = self.config["bands"] # Core quantities we need - shear_cols = metadetect_variants("T", "s2n", "g1", "g2", "ra", "dec", "psf_T_mean", "weight", "flags") + shear_cols = metadetect_variants("T", "s2n", "g1", "g2", "ra", "dec", "psf_T_mean", "flags") # Magnitudes and errors shear_cols += band_variants(bands, "mag", "mag_err", shear_catalog_type="metadetect") From 28e3735dea7a4e9e6085155c5ca7f1f8f5224fcb Mon Sep 17 00:00:00 2001 From: empEvil Date: Tue, 19 May 2026 12:24:57 +0100 Subject: [PATCH 18/33] removed weight from source selector --- txpipe/source_selection/metadetect.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/txpipe/source_selection/metadetect.py b/txpipe/source_selection/metadetect.py index 938dd5781..4b994d2bd 100644 --- a/txpipe/source_selection/metadetect.py +++ b/txpipe/source_selection/metadetect.py @@ -41,7 +41,7 @@ def data_iterator(self): bands = self.config["bands"] # Core quantities we need - shear_cols = metadetect_variants("T", "s2n", "g1", "g2", "ra", "dec", "psf_T_mean", "weight", "flags") + shear_cols = metadetect_variants("T", "s2n", "g1", "g2", "ra", "dec", "psf_T_mean", "flags") # Magnitudes and errors shear_cols += band_variants(bands, "mag", "mag_err", shear_catalog_type="metadetect") From 0cf4a5d250b1ebc5dceafa9dc459c9ef1be78d41 Mon Sep 17 00:00:00 2001 From: empEvil Date: Tue, 19 May 2026 14:05:23 +0100 Subject: [PATCH 19/33] adding a weight column with 1s --- txpipe/ingest/lsst.py | 1 + 1 file changed, 1 insertion(+) diff --git a/txpipe/ingest/lsst.py b/txpipe/ingest/lsst.py index 218557134..c04d424e5 100644 --- a/txpipe/ingest/lsst.py +++ b/txpipe/ingest/lsst.py @@ -94,6 +94,7 @@ def process_metadetect_data(data): "psf_g2": var_data["gauss_psfReconvolved_g1"], "psf_T_mean": var_data["gauss_psfReconvolved_T"], "flags": var_data["gauss_shape_flags"], # TO BE ADDRESSED! + "weight": np.ones_like(var_data["ra"]), } for band in "gri": # For DP2, we only expect 4 bands f = var_data[f"{band}_pgaussFlux"] From b134bfd98f2b76613a466f0f8a7160a461326a09 Mon Sep 17 00:00:00 2001 From: empEvil Date: Tue, 19 May 2026 14:13:13 +0100 Subject: [PATCH 20/33] readding weights --- txpipe/source_selection/metadetect.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/txpipe/source_selection/metadetect.py b/txpipe/source_selection/metadetect.py index 4b994d2bd..c3e8e6f13 100644 --- a/txpipe/source_selection/metadetect.py +++ b/txpipe/source_selection/metadetect.py @@ -41,7 +41,7 @@ def data_iterator(self): bands = self.config["bands"] # Core quantities we need - shear_cols = metadetect_variants("T", "s2n", "g1", "g2", "ra", "dec", "psf_T_mean", "flags") + shear_cols = metadetect_variants("T", "s2n", "g1", "g2", "ra", "dec", "weight", "psf_T_mean", "flags") # Magnitudes and errors shear_cols += band_variants(bands, "mag", "mag_err", shear_catalog_type="metadetect") From 04a9e5ee04589a3c6f786578acb689079fc4f431 Mon Sep 17 00:00:00 2001 From: empEvil Date: Tue, 19 May 2026 14:34:26 +0100 Subject: [PATCH 21/33] changing 00 to ns --- txpipe/calibrate.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/txpipe/calibrate.py b/txpipe/calibrate.py index 37342ce09..90f635812 100644 --- a/txpipe/calibrate.py +++ b/txpipe/calibrate.py @@ -89,9 +89,9 @@ def run(self): # cat_cols is everything we are reading in if cat_type == "metadetect": - cat_cols = cat_cols + [f"00/{c}" for c in extra_cols + mag_cols_in] - mag_cols_in = [f"00/{c}" for c in mag_cols_in] - renames.update({f"00/{c}": c for c in extra_cols}) + cat_cols = cat_cols + [f"ns/{c}" for c in extra_cols + mag_cols_in] + mag_cols_in = [f"ns/{c}" for c in mag_cols_in] + renames.update({f"ns/{c}": c for c in extra_cols}) else: cat_cols = cat_cols + extra_cols + mag_cols_in From 633fbfcac61898381c4f8d7072fd8e77d1717d00 Mon Sep 17 00:00:00 2001 From: empEvil Date: Tue, 19 May 2026 14:40:15 +0100 Subject: [PATCH 22/33] psf diagnostics fixed --- txpipe/psf_diagnostics.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/txpipe/psf_diagnostics.py b/txpipe/psf_diagnostics.py index 6de43adf8..9e53c1e29 100644 --- a/txpipe/psf_diagnostics.py +++ b/txpipe/psf_diagnostics.py @@ -1119,7 +1119,7 @@ def load_galaxies(self): # Get the base catalog for metadetect if cat_type == "metadetect": - g = g["00"] + g = g["ns"] ra = g["ra"][:][mask] dec = g["dec"][:][mask] From c41df07fb961ab0241870a1eaf653053524817a4 Mon Sep 17 00:00:00 2001 From: empEvil Date: Tue, 19 May 2026 14:45:03 +0100 Subject: [PATCH 23/33] updating diagnostics to ns --- txpipe/diagnostics.py | 20 ++++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/txpipe/diagnostics.py b/txpipe/diagnostics.py index adbb5b1fa..9f35b8ed9 100644 --- a/txpipe/diagnostics.py +++ b/txpipe/diagnostics.py @@ -292,7 +292,7 @@ def plot_psf_shear(self): delta_gamma = self.config["delta_gamma"] psf_g_edges = self.get_bin_edges("psf_g1") - shear_prefix = "00/" if self.config["shear_catalog_type"] == "metadetect" else "" + shear_prefix = "ns/" if self.config["shear_catalog_type"] == "metadetect" else "" p1 = MeanShearInBins( f"{shear_prefix}psf_g1", @@ -391,7 +391,7 @@ def plot_psf_size_shear(self): delta_gamma = self.config["delta_gamma"] psf_T_edges = self.get_bin_edges("psf_T_mean") - shear_prefix = "00/" if self.config["shear_catalog_type"] == "metadetect" else "" + shear_prefix = "ns/" if self.config["shear_catalog_type"] == "metadetect" else "" binnedShear = MeanShearInBins( f"{shear_prefix}psf_T_mean", @@ -451,7 +451,7 @@ def plot_snr_shear(self): # Parameters of the binning in SNR shear_catalog_type = self.config["shear_catalog_type"] - shear_prefix = "00/" if shear_catalog_type == "metadetect" else "" + shear_prefix = "ns/" if shear_catalog_type == "metadetect" else "" delta_gamma = self.config["delta_gamma"] snr_edges = self.get_bin_edges("s2n") @@ -516,7 +516,7 @@ def plot_size_shear(self): from scipy import stats shear_catalog_type = self.config["shear_catalog_type"] - shear_prefix = "00/" if shear_catalog_type == "metadetect" else "" + shear_prefix = "ns/" if shear_catalog_type == "metadetect" else "" delta_gamma = self.config["delta_gamma"] T_edges = self.get_bin_edges("T") @@ -580,7 +580,7 @@ def plot_mag_shear(self): from scipy import stats shear_catalog_type = self.config["shear_catalog_type"] - shear_prefix = "00/" if shear_catalog_type == "metadetect" else "" + shear_prefix = "ns/" if shear_catalog_type == "metadetect" else "" delta_gamma = self.config["delta_gamma"] nbins = self.config["nbins"] @@ -719,9 +719,9 @@ def plot_g_histogram(self): g2 = data["g2"] w = data["weight"] elif cat_type == "metadetect": - g1 = data["00/g1"] - g2 = data["00/g2"] - w = data["00/weight"] + g1 = data["ns/g1"] + g2 = data["ns/g2"] + w = data["ns/weight"] elif cat_type == "lensfit": dec = data["dec"] g1 = data["g1"] @@ -790,7 +790,7 @@ def plot_snr_histogram(self): delta_gamma = self.config["delta_gamma"] shear_catalog_type = self.config["shear_catalog_type"] - shear_prefix = "00/" if shear_catalog_type == "metadetect" else "" + shear_prefix = "ns/" if shear_catalog_type == "metadetect" else "" bins = 10 edges = np.logspace(1, 3, bins + 1) mids = 0.5 * (edges[1:] + edges[:-1]) @@ -979,7 +979,7 @@ def plot_mag_histograms(self): mid = 0.5 * (edges[1:] + edges[:-1]) width = edges[1] - edges[0] bands = self.config["bands"] - shear_prefix = "00/" if self.config["shear_catalog_type"] == "metadetect" else "" + shear_prefix = "ns/" if self.config["shear_catalog_type"] == "metadetect" else "" nband = len(bands) full_hists = [np.zeros(size, dtype=int) for b in bands] source_hists = [np.zeros(size, dtype=int) for b in bands] From b54aa011dd360d8c1d5479725a33d8d16853aeaa Mon Sep 17 00:00:00 2001 From: empEvil Date: Tue, 19 May 2026 14:47:21 +0100 Subject: [PATCH 24/33] missed 00s in calibration --- txpipe/shear_calibration/calibration_calculators.py | 2 +- txpipe/shear_calibration/mean_shear_in_bins.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/txpipe/shear_calibration/calibration_calculators.py b/txpipe/shear_calibration/calibration_calculators.py index 9430f6315..706a162e5 100755 --- a/txpipe/shear_calibration/calibration_calculators.py +++ b/txpipe/shear_calibration/calibration_calculators.py @@ -53,7 +53,7 @@ class CalibrationCalculator: The data that is added to the calculator should contain shear columns appropriate to the specific type of calibrationn used. For example, the metadetect calculator expects - columns 00/g1, 00/g2, etc. + columns ns/g1, ns/g2, etc. The selection function does not need to know about all these variants. The calculator will wrap the data dictionary passed in in a special class that chooses variant diff --git a/txpipe/shear_calibration/mean_shear_in_bins.py b/txpipe/shear_calibration/mean_shear_in_bins.py index 21f8ac221..cb7d81ad0 100644 --- a/txpipe/shear_calibration/mean_shear_in_bins.py +++ b/txpipe/shear_calibration/mean_shear_in_bins.py @@ -55,7 +55,7 @@ def add_data(self, data): # for all 5 variants. We just want the unsheared on # here. w = w[0] - weight = data["00/weight"][w] + weight = data["ns/weight"][w] else: weight = data["weight"][w] From 14621733999800af1a19ac219bb5e5d87f98ee96 Mon Sep 17 00:00:00 2001 From: empEvil Date: Tue, 19 May 2026 15:18:28 +0100 Subject: [PATCH 25/33] removing shear_prefix --- txpipe/diagnostics.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/txpipe/diagnostics.py b/txpipe/diagnostics.py index 9f35b8ed9..9c6f82f49 100644 --- a/txpipe/diagnostics.py +++ b/txpipe/diagnostics.py @@ -459,7 +459,7 @@ def plot_snr_shear(self): # This class includes all the cutting and calibration, both for # estimator and selection biases binnedShear = MeanShearInBins( - f"{shear_prefix}s2n", + f"s2n", snr_edges, delta_gamma, cut_source_bin=True, @@ -522,7 +522,7 @@ def plot_size_shear(self): T_edges = self.get_bin_edges("T") binnedShear = MeanShearInBins( - f"{shear_prefix}T", + f"T", T_edges, delta_gamma, cut_source_bin=True, @@ -590,7 +590,7 @@ def plot_mag_shear(self): m_edges = self.get_bin_edges(f"mag_{band}") binnedShear[f"{band}"] = MeanShearInBins( - f"{shear_prefix}mag_{band}", + f"mag_{band}", m_edges, delta_gamma, cut_source_bin=True, From 6a7e6602643bb66fc2c809834bc8f5030b9ff750 Mon Sep 17 00:00:00 2001 From: empEvil Date: Tue, 19 May 2026 15:20:21 +0100 Subject: [PATCH 26/33] undoing removal of shear prefix --- txpipe/diagnostics.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/txpipe/diagnostics.py b/txpipe/diagnostics.py index 9c6f82f49..9f35b8ed9 100644 --- a/txpipe/diagnostics.py +++ b/txpipe/diagnostics.py @@ -459,7 +459,7 @@ def plot_snr_shear(self): # This class includes all the cutting and calibration, both for # estimator and selection biases binnedShear = MeanShearInBins( - f"s2n", + f"{shear_prefix}s2n", snr_edges, delta_gamma, cut_source_bin=True, @@ -522,7 +522,7 @@ def plot_size_shear(self): T_edges = self.get_bin_edges("T") binnedShear = MeanShearInBins( - f"T", + f"{shear_prefix}T", T_edges, delta_gamma, cut_source_bin=True, @@ -590,7 +590,7 @@ def plot_mag_shear(self): m_edges = self.get_bin_edges(f"mag_{band}") binnedShear[f"{band}"] = MeanShearInBins( - f"mag_{band}", + f"{shear_prefix}mag_{band}", m_edges, delta_gamma, cut_source_bin=True, From fd593cb8f044ab98bc79ef896a7f565b70f39d27 Mon Sep 17 00:00:00 2001 From: empEvil Date: Wed, 20 May 2026 11:59:11 +0100 Subject: [PATCH 27/33] update on the weighting --- txpipe/ingest/lsst.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/txpipe/ingest/lsst.py b/txpipe/ingest/lsst.py index c04d424e5..32f5c08fe 100644 --- a/txpipe/ingest/lsst.py +++ b/txpipe/ingest/lsst.py @@ -76,7 +76,6 @@ def process_metadetect_data(data): "ra": var_data["ra"], "dec": var_data["dec"], "id": var_data["shearObjectId"], - "metaStep": var_data["metaStep"].astype("S"), #might not be needed "object_mask_fraction": var_data["mfrac"], #"n_epoch": var_data["nEpochCell"], "g1": var_data["gauss_g1"], @@ -94,7 +93,7 @@ def process_metadetect_data(data): "psf_g2": var_data["gauss_psfReconvolved_g1"], "psf_T_mean": var_data["gauss_psfReconvolved_T"], "flags": var_data["gauss_shape_flags"], # TO BE ADDRESSED! - "weight": np.ones_like(var_data["ra"]), + "weight": 1 / (0.5 * (var_data["gauss_g1_g1_Cov"] + var_data["gauss_g2_g2_Cov"])), } for band in "gri": # For DP2, we only expect 4 bands f = var_data[f"{band}_pgaussFlux"] From 91f90e5945c067a2532cff5a097b8ac3228ff2a8 Mon Sep 17 00:00:00 2001 From: "Eske M. Pedersen" Date: Fri, 22 May 2026 17:27:27 +0100 Subject: [PATCH 28/33] updating shear_tomo_cols --- txpipe/diagnostics.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/txpipe/diagnostics.py b/txpipe/diagnostics.py index 9f35b8ed9..30800f68c 100644 --- a/txpipe/diagnostics.py +++ b/txpipe/diagnostics.py @@ -237,7 +237,10 @@ def run(self): "m", ] + [f"mag_{b}" for b in self.config["bands"]] - shear_tomo_cols = ["bin"] + if self.config["shear_catalog_type"] == "metadetect": + shear_tomo_cols = ["bin_ns", "bin_1p", "bin_1m", "bin_2p", "bin_2m"] + else: + shear_tomo_cols = ["bin"] if self.config["shear_catalog_type"] == "metacal": more_iters = ["shear_tomography_catalog", "response", ["R_gamma"]] From 0b6f19e379798f4a81067ea65cb53a8e17d49a2e Mon Sep 17 00:00:00 2001 From: "Eske M. Pedersen" Date: Fri, 22 May 2026 17:30:31 +0100 Subject: [PATCH 29/33] commenting out a qual_cut, to be deleted --- txpipe/diagnostics.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/txpipe/diagnostics.py b/txpipe/diagnostics.py index 30800f68c..31ff30e22 100644 --- a/txpipe/diagnostics.py +++ b/txpipe/diagnostics.py @@ -715,7 +715,7 @@ def plot_g_histogram(self): if data is None: break - qual_cut = data["bin"] != -1 + #qual_cut = data["bin"] != -1 if cat_type == "metacal": g1 = data["g1"] From 60a0d1f45a37966c11e34396ece13e4267fce4c7 Mon Sep 17 00:00:00 2001 From: "Eske M. Pedersen" Date: Fri, 22 May 2026 17:32:33 +0100 Subject: [PATCH 30/33] addding bin to list --- txpipe/diagnostics.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/txpipe/diagnostics.py b/txpipe/diagnostics.py index 31ff30e22..1ee34cd44 100644 --- a/txpipe/diagnostics.py +++ b/txpipe/diagnostics.py @@ -238,7 +238,7 @@ def run(self): ] + [f"mag_{b}" for b in self.config["bands"]] if self.config["shear_catalog_type"] == "metadetect": - shear_tomo_cols = ["bin_ns", "bin_1p", "bin_1m", "bin_2p", "bin_2m"] + shear_tomo_cols = ["bin", "bin_ns", "bin_1p", "bin_1m", "bin_2p", "bin_2m"] else: shear_tomo_cols = ["bin"] From 00682d94184e7a77aa3d0a9e601d113110440d8a Mon Sep 17 00:00:00 2001 From: "Eske M. Pedersen" Date: Fri, 22 May 2026 17:36:53 +0100 Subject: [PATCH 31/33] trying to fix issue with plots --- txpipe/diagnostics.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/txpipe/diagnostics.py b/txpipe/diagnostics.py index 1ee34cd44..12cefe48f 100644 --- a/txpipe/diagnostics.py +++ b/txpipe/diagnostics.py @@ -238,7 +238,7 @@ def run(self): ] + [f"mag_{b}" for b in self.config["bands"]] if self.config["shear_catalog_type"] == "metadetect": - shear_tomo_cols = ["bin", "bin_ns", "bin_1p", "bin_1m", "bin_2p", "bin_2m"] + shear_tomo_cols = ["bin_ns", "bin_1p", "bin_1m", "bin_2p", "bin_2m"] else: shear_tomo_cols = ["bin"] @@ -794,6 +794,7 @@ def plot_snr_histogram(self): delta_gamma = self.config["delta_gamma"] shear_catalog_type = self.config["shear_catalog_type"] shear_prefix = "ns/" if shear_catalog_type == "metadetect" else "" + bin_type = "bin_ns" if self.config["shear_catalog_type"] == "metadetect" else "bin" bins = 10 edges = np.logspace(1, 3, bins + 1) mids = 0.5 * (edges[1:] + edges[:-1]) @@ -805,7 +806,7 @@ def plot_snr_histogram(self): if data is None: break - qual_cut = data["bin"] != -1 + qual_cut = data[bin_type] != -1 b1 = np.digitize(data[f"{shear_prefix}s2n"][qual_cut], edges) - 1 @@ -983,6 +984,7 @@ def plot_mag_histograms(self): width = edges[1] - edges[0] bands = self.config["bands"] shear_prefix = "ns/" if self.config["shear_catalog_type"] == "metadetect" else "" + bin_type = "bin_ns" if self.config["shear_catalog_type"] == "metadetect" else "bin" nband = len(bands) full_hists = [np.zeros(size, dtype=int) for b in bands] source_hists = [np.zeros(size, dtype=int) for b in bands] @@ -1001,7 +1003,7 @@ def plot_mag_histograms(self): count = w.sum() h1[i] += count - w &= data["bin"] >= 0 + w &= data[bin_type] >= 0 count = w.sum() h2[i] += count From 0204935d71a3dd11065bd76cd806454a11178a46 Mon Sep 17 00:00:00 2001 From: "Eske M. Pedersen" Date: Fri, 22 May 2026 17:50:51 +0100 Subject: [PATCH 32/33] trying to fix mean shear for metadetect --- txpipe/shear_calibration/mean_shear_in_bins.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/txpipe/shear_calibration/mean_shear_in_bins.py b/txpipe/shear_calibration/mean_shear_in_bins.py index cb7d81ad0..30b5b7fd1 100644 --- a/txpipe/shear_calibration/mean_shear_in_bins.py +++ b/txpipe/shear_calibration/mean_shear_in_bins.py @@ -39,10 +39,13 @@ def selector(self, data, i): # select objects in bin i of the x variable. w = (x > self.limits[i]) & (x < self.limits[i + 1]) - + # Optionally cut down to the source sample only if self.cut_source_bin: - w &= data["bin"] != -1 + if self.config["shear_catalog_type"] == "metaddetect": + w & data[f"bin_{self.x_name[:2]}"] != -1 + else: + w &= data["bin"] != -1 return np.where(w)[0] def add_data(self, data): From 1b34c3466b031d4c3bc81f88f108f8052c550daa Mon Sep 17 00:00:00 2001 From: "Eske M. Pedersen" Date: Fri, 22 May 2026 17:52:10 +0100 Subject: [PATCH 33/33] fixing typo --- txpipe/shear_calibration/mean_shear_in_bins.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/txpipe/shear_calibration/mean_shear_in_bins.py b/txpipe/shear_calibration/mean_shear_in_bins.py index 30b5b7fd1..945342033 100644 --- a/txpipe/shear_calibration/mean_shear_in_bins.py +++ b/txpipe/shear_calibration/mean_shear_in_bins.py @@ -42,7 +42,7 @@ def selector(self, data, i): # Optionally cut down to the source sample only if self.cut_source_bin: - if self.config["shear_catalog_type"] == "metaddetect": + if self.shear_catalog_type == "metadetect": w & data[f"bin_{self.x_name[:2]}"] != -1 else: w &= data["bin"] != -1