diff --git a/examples/ssi/config_ssi_sel_func.yml b/examples/ssi/config_ssi_sel_func.yml new file mode 100644 index 000000000..d6471a7ae --- /dev/null +++ b/examples/ssi/config_ssi_sel_func.yml @@ -0,0 +1,39 @@ +# Values in this section are accessible to all the different stages. +# They can be overridden by individual stages though. +global: + # This is read by many stages that read complete + # catalog data, and tells them how many rows to read + # at once + chunk_rows: 100_000 + # These mapping options are also read by a range of stages + pixelization: healpix + nside: 256 + sparse: true # Generate sparse maps - faster if using small areas + + +TXIngestSSIDetectionDESBalrog: + name: TXIngestSSIDetectionDESBalrog + +TXIngestSSIMatchedDESBalrog: + name: TXIngestSSIMatchedDESBalrog + +TXAuxiliarySSIMaps: + name: TXAuxiliarySSIMaps + mag_delta: 0.5 + smooth_det_frac: false + min_depth: 23 + max_depth: 24 + smooth_window: 2.0 + + +TXPredictDensitySelectionFunction: + name: TXPredictDensitySelectionFunction + systmaps_dir: /global/cfs/cdirs/lsst/groups/WLSS/users/tcornish/balrog_txpipe_example/survey_properties/ + degree: 2 + mask_thresh: 0. + mask_thresh_coarse: 0. + inj_count_thresh: 1 + sel_func_err_type: gaussian + block_size: 2_000 + bins_to_model: [0,1] + diff --git a/examples/ssi/pipeline_ssi_sel_func.yml b/examples/ssi/pipeline_ssi_sel_func.yml new file mode 100644 index 000000000..c7bd15184 --- /dev/null +++ b/examples/ssi/pipeline_ssi_sel_func.yml @@ -0,0 +1,43 @@ +# Stages to run +stages: + - name: TXIngestSSIDetectionDESBalrog + - name: TXIngestSSIMatchedDESBalrog + - name: TXAuxiliarySSIMaps + - name: TXPredictDensitySelectionFunction + +modules: txpipe + +# Where to put outputs +output_dir: /pscratch/sd/t/tcornish/txpipe_out/ + +# How to run the pipeline: mini, parsl, or cwl +launcher: + name: mini + interval: 1.0 + +# Where to run the pipeline: cori-interactive, cori-batch, or local +site: + name: local + max_threads: 2 + +# configuration settings +config: examples/ssi/config_ssi_sel_func.yml + +inputs: + # See README for paths to download these files + + #NERSC locations of public DES balrog data + balrog_detection_catalog: /global/cfs/cdirs/lsst/groups/WLSS/users/tcornish/balrog_txpipe_example/balrog_detection_catalog_sof_y3-merged_v1.2.fits + balrog_matched_catalog: /global/cfs/cdirs/lsst/groups/WLSS/users/tcornish/balrog_txpipe_example/balrog_matched_catalog_sof_y3-merged_v1.2.fits + mask: /global/cfs/cdirs/lsst/groups/WLSS/users/tcornish/balrog_txpipe_example/des_y3_mask.hdf5 + #aux_ssi_maps: /home/cornisht/cloned_repos/TXPipe/data/example/outputs_ssi_des_depth/aux_ssi_maps.hdf5 + + fiducial_cosmology: data/fiducial_cosmology.yml + +# if supported by the launcher, restart the pipeline where it left off +# if interrupted +resume: false +# where to put output logs for individual stages +log_dir: data/example/outputs_ssi_des_sel_func/logs/ +# where to put an overall parsl pipeline log +pipeline_log: data/example/outputs_ssi_des_sel_func/log.txt diff --git a/txpipe/auxiliary_maps.py b/txpipe/auxiliary_maps.py index bc2f1550f..202047562 100644 --- a/txpipe/auxiliary_maps.py +++ b/txpipe/auxiliary_maps.py @@ -11,7 +11,6 @@ ) from .data_types import MapsFile, HDFFile, ShearCatalog from .utils import choose_pixelization, import_dask -from .maps import map_config_options from ceci.config import StageParameter @@ -421,6 +420,9 @@ def run(self): maps["depth_det_prob/det_frac_by_mag_thres"] = depth_map_results[ "det_frac_by_mag_thres" ] + maps["depth_det_prob/inj_count_by_mag_thres"] = depth_map_results[ + "inj_count_by_mag_thres" + ] (maps,) = da.compute(maps) @@ -459,3 +461,458 @@ def run(self): for map_name, m in hsp_maps.items(): out.write_map(map_name, m, metadata) out.file["maps"].attrs.update(metadata) + + +class TXPredictDensitySelectionFunction(TXBaseMaps): + """ + Model selection function across footprint using survey property maps. + + Uses the selection measured in a subset of the footprint to model its dependence on + survey property maps, then uses that model to predict the selection function across + the remainder of the footprint. The model is fit via polynomial regression with + respect to the survey property maps, where the degree of the polynomial is specified + via the config. + TODO: Make it possible to select the degree of the fit for each SP map individually. + TODO: Add other options for the form of the model. + """ + + name = "TXPredictDensitySelectionFunction" + dask_parallel = True + inputs = [ + ("aux_ssi_maps", MapsFile), # Measured selection function and uncertainties + ("mask", MapsFile) # Mask defining survey geometry + + ] + outputs = [ + ("sel_func_pred_maps", MapsFile), # Model prediction of selection function and its uncertainties + ("sel_func_model_info", HDFFile), # Best-fit params and their covariance matrix + ] + + config_options = { + "block_size": StageParameter(int, 0, msg="Block size for dask processing (0 means auto)."), + "systmaps_dir": StageParameter(str, "", msg="Directory containing systematic maps."), + "degree": StageParameter(int, 1, msg="Degree of the polynomial fit."), + "mask_thresh": StageParameter(float, 0.0, msg="Threshold for masking pixels at native resolution of mask."), + "mask_thresh_coarse": StageParameter(float, 0.0, msg="Threshold for masking pixels after mask is degraded."), + "inj_count_thresh": StageParameter(int, 1, msg="Exclude pixels containing fewer injections than this number."), + "sel_func_err_type": StageParameter( + str, + "none", + msg="Type of uncertainty attributed to measured selection function. Can be either 'none' (assumes no " + "uncertainty) or 'gaussian' (uses a Gaussian approximation to estimate uncertainties)." + ), + "bins_to_model": StageParameter( + list, + [-1], + msg="List of depth bins for which the selection function is to be modelled. -1 means selection functions will be " + "modelled for every bin for which fracdet maps were created in TXAuxiliarySSIMaps." + ), + **map_config_options + } + + def run(self): + import glob + import healpy as hp + import healsparse as hsp + from functools import reduce + # Import dask and alias it as 'da' + _, da = import_dask() + # Assign imports to self to avoid repeated imports in other functions + self.da = da + + # Retrieve configuration parameters + block_size = self.config["block_size"] + if block_size == 0: + block_size = "auto" + + # Type of uncertainty estimate to use + err_type = self.config["sel_func_err_type"].lower() + + pixel_scheme = choose_pixelization(**self.config) + + # Load the map of the measured selection function and its uncertainties + with self.open_input("aux_ssi_maps", wrapper=True) as f: + sel_func_meas = f.read_map("depth_det_prob/det_frac_by_mag_thres") + ninj = f.read_map("depth_det_prob/inj_count_by_mag_thres") + # Ensure correct resolution (desired resolution can only be lower than + # or equal to the native resolution of the selection function map). + # "sum" reduction used for ninj since this is a counts map + sel_func_meas = sel_func_meas.degrade( + pixel_scheme.nside, + reduction="wmean", + weights=ninj + ) + ninj = ninj.degrade(pixel_scheme.nside, reduction="sum") + + with self.open_input("mask", wrapper=True) as f: + mask = f.read_mask( + thresh=self.config["mask_thresh"], + degrade_nside=pixel_scheme.nside + ) + + # Load survey property maps at the valid pixels + # TODO: Define these as individual inputs rather than glob search? + root = self.config["systmaps_dir"] + sysfiles = glob.glob(f"{root}*.hs") + nsys = len(sysfiles) + print(f"Found {nsys} total systematic maps") + spmaps = [ + hsp.HealSparseMap.read( + sf, + degrade_nside=pixel_scheme.nside + ) + for sf in sysfiles + ] + + # Identify valid pixels across mask and all SP maps + goodmap = hsp.operations.product_intersection([mask] + spmaps) + goodpix = goodmap.valid_pixels + + # Training pixels must also be valid in the selection function + pix_train = np.intersect1d(sel_func_meas.valid_pixels, goodpix) + # Remove any pixels below specified coverage fraction after degrading + pix_train = pix_train[mask[pix_train] >= self.config["mask_thresh_coarse"]] + + # Determine which samples are to have their selection functions modelled + if self.config["bins_to_model"] == [-1]: + samples = sel_func_meas.dtype.names + else: + samples = [f"bin{k}" for k in self.config["bins_to_model"]] + + # Cycle through the samples for which selection functions are to be modelled + maps = { + "sel_func": [], + "err_sel_func": [] + } + model_info = { + "alphas": [], + "cov_alphas": [] + } + for k in samples: + print(f"Modelling selection function for sample {k}") + + # Remove pixels with fewer than the specified no. of injections + pix_train_k = pix_train[ninj[k][pix_train] >= self.config["inj_count_thresh"]] + # Check there is at least one pixel which has survived all cuts + errmsg = f"Cannot model selection function for sample {k}; no valid pixels." + assert len(pix_train_k) > 0, errmsg + # Select training data + sel_func_train = da.from_array(sel_func_meas[k][pix_train_k], block_size) + block_size = sel_func_train.chunksize + ninj_train = da.from_array(ninj[k][pix_train_k], block_size) + spmaps_train = da.stack( + [ + da.from_array(m[pix_train_k], block_size) for m in spmaps + ] + ).T + # Get uncertainties on training data + err_sel_func_train = self.sel_func_uncertainties( + sel_func_train, + err_type=err_type, + ninj=(None if err_type == "none" else ninj_train) + ) + + # Get survey property values across remainder of footprint + pix_pred = np.setdiff1d(goodpix, pix_train_k) + spmaps_pred = da.stack( + [ + da.from_array(m[pix_pred], block_size) for m in spmaps + ] + ).T + + # Perform polynomial regression and retrieve the following: + # - mean prediction on the selection function (in each pixel) + # - 1-sigma uncertainty on these predictions (in each pixel) + # - best-fit coefficients for each survey property (+ an intercept term) + # - covariance matrix for the best-fit parameters + sel_func_pred, err_sel_func_pred, alphas, cov_alphas = self.polynomial_model( + spmaps_train, + sel_func_train, + err_sel_func_train, + spmaps_pred, + err_type + ) + + # Combine training data and predictions into single HealSparse maps + pix_all = np.concatenate([pix_train_k, pix_pred]) + sel_func_full = da.concatenate([sel_func_train, sel_func_pred]) + err_sel_func_full = da.concatenate([err_sel_func_train, err_sel_func_pred]) + # Sort by pixel number (will make saving to HealSparse format easier) + inds = pix_all.argsort() + chunk_sizes = sel_func_full.chunks[0] + chunk_bounds = np.cumsum([0] + list(chunk_sizes)) + sel_func_full = self.dask_sort(sel_func_full, inds, chunk_bounds) + err_sel_func_full = self.dask_sort(err_sel_func_full, inds, chunk_bounds) + maps["sel_func"].append(sel_func_full) + maps["err_sel_func"].append(err_sel_func_full) + # Store model info + model_info["alphas"].append(alphas) + model_info["cov_alphas"].append(cov_alphas) + + (maps,) = da.compute(maps) + + # Convert maps to healsparse map objects + hsp_maps = {} + dtypes = [(k, float) for k in samples] + for n, m in maps.items(): + # Both sel_func and err_sel_func are 2D; save as recarrays + primary = "bin0" + m_hsp = hsp.HealSparseMap.make_empty( + pixel_scheme.nside_coverage, + pixel_scheme.nside, + dtypes, + primary=primary + ) + m_hsp.update_values_pix( + goodpix, + np.zeros(len(goodpix), dtype=dtypes) + ) + for i,k in enumerate(samples): + m_hsp[k][goodpix] = m[i] + hsp_maps[n] = m_hsp + + # Prepare metadata for the maps. Copy the pixelization-related + # configuration options only here + metadata = {key: self.config[key] for key in map_config_options if key in self.config} + # Then add the other configuration options + metadata.update(pixel_scheme.metadata) + + # Write the output maps to the output file + with self.open_output("sel_func_pred_maps", wrapper=True) as out: + for map_name, hsp_map in hsp_maps.items(): + out.write_map(map_name, hsp_map, metadata) + out.file["maps"].attrs.update(metadata) + + # Prepare metadata for model parameter info + params_metadata = { + "model": f"polynomial (deg = {self.config['degree']})", + "inputs": [ + sf.split('/')[-1] for sf in sysfiles + ] + } + + # Write the model parameters and covariances to the output file + with self.open_output("sel_func_model_info", wrapper=True) as out: + mi = out.file.create_group("model_info") + out.file["model_info"].attrs.update(params_metadata) + gp_alphas = mi.create_group("alphas") + gp_cov = mi.create_group("cov_alphas") + for i,k in enumerate(samples): + gp_alphas.create_dataset(k, data=model_info["alphas"][i]) + gp_cov.create_dataset(k, data=model_info["cov_alphas"][i]) + + def sel_func_uncertainties(self, sel_func, err_type="none", ninj=None): + """ + Estimates uncertainties on the measured selection function. + + Parameters + ---------- + sel_func: dask.array, shape=(N,) + Selection function computed in TXAuxiliarySSIMaps, evaluated at pixels + which are valid in the N pixels which have at least the requisite + number of injections and are valid in all survey property maps. + + err_type: str, optional (default="none") + Determines the treatment of uncertainties on the measured selection + function. Currently two values for err_type are accommodated (case-independent): + - "none": selection function is treated as exact; zero uncertainty + - "gaussian": uncertainties are calculated under a Gaussian approx. + TODO: add other options, e.g. Wilson score interval. + + ninj: dask.array (shape=N,) or None; optional (default=None) + Total injected counts in each of the pixels covered by sel_func. Only required + if err_type=="gaussian". + + Returns + ------- + dask.array, shape=(N,) + Estimates of the uncertainty on the measured selection function in each pixel. + """ + if err_type == "none": + return self.da.zeros_like(sel_func) + elif err_type == "gaussian": + return self.da.sqrt(sel_func * (1 - sel_func) / ninj) + else: + raise ValueError( + "err_type must be either 'none' or 'gaussian'." + ) + + def sel_func_weights(self, err_type="none", err_sel_func=None): + """ + Converts selection function uncertainties into weights for modelling. + + Parameters + ---------- + err_type: str, optional (default="none") + Determines the treatment of uncertainties on the measured selection + function. Currently two values for err_type are accommodated (case-independent): + - "none": selection function is treated as exact; zero uncertainty + - "gaussian": uncertainties are calculated under a Gaussian approx. + + err_sel_func: dask.array, shape=(N,) or None; optional (default=None) + Uncertainty on the measured selection function in the N pixels which have at + least the requisite number of injections and are valid in all survey property + maps. Only required if err_type=="gaussian". + + Returns + ------- + dask.array, shape=(N,) + Weights corresponding to the uncertainty on the measured selection function in + each pixel. + """ + if err_type == "none": + # Return ones if uncertanties are treated as being zero + return self.da.ones(len(err_sel_func)) + else: + # Return inverse of variance + return 1 / (self.da.power(err_sel_func, 2)) + + def polynomial_model(self, X_train, y_train, yerr_train, X_pred, err_type="none"): + """ + Polynomial regression and model predictions for test data. + + Performs a polynomial regression of y with respect to X_train, then + generates predictions at X_pred. In this context, X is an array + containing survey property maps in certain pixels, and y is + the selection function measured in those pixels. + + Parameters + ---------- + X_train: dask.array, shape=(N_train, N_sys) + Survey property maps evaluated at the N_train pixels that have been selected + for use as training data for the model. + + y_train: dask.array, shape=(N_train,) + Measured selection function in the N_train training data pixels. + + yerr_train: dask.array, shape=(N_train,) + Uncertainties on the measured selection function in the N_train training data + pixels. + + X_pred: dask.array, shape=(N_pred, N_sys) + Survey property maps evaluated at all pixels in the survey footprint (as defined + by the mask) not included in the training set, which are also valid across all + survey property maps. I.e. if the footprint comprises N_tot of these valid pixels, + then N_pred = N_tot - N_train. + + err_type: str, optional (default="none") + Determines the treatment of uncertainties on the measured selection + function. Currently two values for err_type are accommodated (case-independent): + - "none": selection function is treated as exact; zero uncertainty + - "gaussian": uncertainties are calculated under a Gaussian approx. + + Returns + ------- + y_pred: dask.array, shape=(N_pred,) + Model predictions for the selection function in the N_pred pixels at which the + survey property maps were evaluated to construct X_pred. + + yerr_pred: dask.array, shape=(N_pred,) + Uncertainties on the model preditions for the selection function. + + alphas: numpy.array, shape=(deg*N_sys+1,) + Best-fit parameters from the model. Shape will depend on the degree of the + polynomial (deg) specified in the config file. The first parameter is the bias + term; the next N_sys parameters correspond to each survey property map. If + deg >= 2, then the following N_sys parameters correspond to the squares of the + survey property maps, etc. The indices of the parameters corresponding to + the i'th survey property, its square, its cube, etc., can be obtained via: + `inds = [1 + i + d * N_sys for d in range(deg)].` + The names of the survey property maps are saved in the correct order as part of + the outputs of this stage so that they can be associated with their corresponding + parameters. + + cov_alphas: numpy.array, shape=(deg*N_sys+1, deg*N_sys+1) + Covariance matrix of the best-fit parameters. + """ + da = self.da + # Degree of polynomial fit + deg = self.config["degree"] + + # Weights for each training pixel + W = self.sel_func_weights(err_type, yerr_train) + # If Gaussian approx was used for uncertainty estimation, pixels in + # which the selection function is 0 or 1 will have zero uncertainty + # (and thus infinite weight); remove these pixels here. + (keep,) = da.compute(da.isfinite(W)) + X_train = X_train[keep, :] + y_train = y_train[keep] + W = W[keep] + + # Construct inputs matrix for regression + m = len(X_train[:, 0]) + ones = da.ones((m, 1), chunks=(X_train.chunks[0], 1)) + X = da.hstack([ones, X_train]) + for n in range(2, deg + 1): + X = da.hstack([X, da.power(X_train, n)]) + + # Explicitly compute necessary array combinations + # NOTE: this assumes a diagonal covariance matrix for y, + # i.e. cov_y = diag(W) + (XtWX,) = da.compute(X.T @ (W[:, None] * X)) + (XtWy,) = da.compute(X.T @ (W * y_train)) + + # Compute best-fit coeffs (alphas) and their covariance + alphas = np.linalg.solve(XtWX, XtWy) + cov_alphas = np.linalg.inv(XtWX) + + # Now make predictions at X_pred + m = len(X_pred[:,0]) + ones = da.ones((m, 1), chunks=(X_pred.chunks[0], 1)) + X = da.hstack([ones, X_pred]) + for n in range(2, deg + 1): + X = da.hstack([X, da.power(X_pred, n)]) + y_pred = alphas @ X.T + # The following is a cheaper equivalent of doing + # `sqrt(diag(X @ cov_alphas @ X.T))` + yerr_pred = (X @ cov_alphas * X).sum(axis=1) + + return y_pred, yerr_pred, alphas, cov_alphas + + def dask_sort(self, X, inds, chunk_boundaries): + """ + Reorders a dask array using `inds`, without creating more chunks. + + This is motivated by wanting to order certain dask arrays by their corresponding + HEALPix pixel indices, which can result in dask creating many more chunks and + slowing down subsequent computations. This function sorts the pixel indices by chunk, + and then gathers from the target array one chunk at a time. + + Parameters + ---------- + X: dask.array, shape=(N,) + Dask array to be sorted according to `inds`. + + inds: numpy.array, shape=(N,) + Array containing the indices of X in order of ascending HEALPix pixel ID. + + chunk_boundaries: numpy.array, shape=(ceil(N/m)+1,) + Boundaries of each chunk of size m, including the left edge and excluding the right. + E.g. The first chunk will have boundaries [0, m), the next will have [m, 2*m), etc. + + Returns + ------- + X_sorted: dask.array, shape=(N,) + Sorted version of the input array + """ + n_chunks = len(chunk_boundaries) - 1 + + # Assign each index in `inds` to a chunk + chunk_ids = np.searchsorted(chunk_boundaries[1:], inds, side='right') + + # Sort inds by chunk, preserving the mapping back to output positions + order = np.argsort(chunk_ids, kind='stable') + inds_sorted = inds[order] + + # Gather from X one chunk at a time + X_sorted = self.da.empty(len(inds), dtype=X.dtype, chunks=X.chunks) + + for c in range(n_chunks): + mask = chunk_ids[order] == c + if not mask.any(): + continue + local_inds = inds_sorted[mask] - chunk_boundaries[c] + chunk_data = X[chunk_boundaries[c]:chunk_boundaries[c+1]] + X_sorted[order[mask]] = chunk_data[local_inds] + + return X_sorted diff --git a/txpipe/data_types.py b/txpipe/data_types.py index fb3fe261c..808a5a465 100755 --- a/txpipe/data_types.py +++ b/txpipe/data_types.py @@ -345,8 +345,8 @@ def read_mask( # Setting pixels to the sentinel can be buggy in healsparse # so we'll make a copy of the mask with the < thresh pixels removed mask = hsp.HealSparseMap.make_empty_like(mask_in) - pix_to_keep = mask.valid_pixels[mask[mask.valid_pixels] > thresh] - mask.update_values_pix(pix_to_keep, mask[pix_to_keep]) + pix_to_keep = mask_in.valid_pixels[mask_in[mask_in.valid_pixels] > thresh] + mask.update_values_pix(pix_to_keep, mask_in[pix_to_keep]) else: mask = mask_in diff --git a/txpipe/mapping/dr1.py b/txpipe/mapping/dr1.py index c62479b42..ccd4672b4 100755 --- a/txpipe/mapping/dr1.py +++ b/txpipe/mapping/dr1.py @@ -187,6 +187,7 @@ def make_dask_depth_map_det_prob( # loop over mag bins # TODO: add option to compute fraction *at* each magnitude, rather than below frac_list = [] + ntot_list = [] # Needed for error estimation on det frac (e.g. as done in TXModelSelectionFunction) for mag_thresh in mag_edges: above_thresh = mag < mag_thresh ntot = da.bincount(sparse_index, weights=above_thresh, minlength=npix_sparse) @@ -195,7 +196,9 @@ def make_dask_depth_map_det_prob( ) frac_det = da.where(ntot != 0, ndet / ntot, np.nan) frac_list.append(frac_det) + ntot_list.append(ntot) det_frac_by_mag_thres = da.stack(frac_list) + inj_count_by_mag_thres = da.stack(ntot_list) # Optional smoothing of the stacked detection fractions if smooth_det_frac: @@ -233,5 +236,6 @@ def make_dask_depth_map_det_prob( "inj_count_map": inj_count_map, "depth_map": depth_map, "det_frac_by_mag_thres": det_frac_by_mag_thres, + "inj_count_by_mag_thres": inj_count_by_mag_thres, "mag_edges": mag_edges, }