diff --git a/narf/histutils.py b/narf/histutils.py index f08514b..d1ef73d 100644 --- a/narf/histutils.py +++ b/narf/histutils.py @@ -19,6 +19,24 @@ narf.clingutils.Declare('#include ') narf.clingutils.Declare('#include ') +class SparseStorage: + """Storage option for HistoBoost selecting a narf::concurrent_sparse_storage + backed by a narf::concurrent_flat_map. Conversion to a python hist.Hist + object is not supported in this mode. + + Parameters + ---------- + fill_fraction : float + Estimated fraction of bins (including under/overflow) that will be + populated. Used to size the underlying concurrent_flat_map so that + most fills hit the initial allocation rather than triggering + on-the-fly expansion. Values outside (0, 1] are accepted; pass a + small number for very sparse fills. + """ + def __init__(self, fill_fraction=0.1): + self.fill_fraction = float(fill_fraction) + + def bool_to_string(b): if b: return "true" @@ -114,7 +132,7 @@ def make_array_interface_view(boost_hist): underflow = [axis.traits.underflow for axis in boost_hist.axes] - acc_type = convert_storage_type(boost_hist._storage_type) + acc_type = convert_storage_type(boost_hist.storage_type) arrview = ROOT.narf.array_interface_view[acc_type, len(shape)](arr, shape, strides, underflow) return arrview @@ -132,9 +150,9 @@ def hist_to_pyroot_boost(hist_hist, tensor_rank = 0, force_atomic = False): scalar_type = ROOT.double dimensions = ROOT.Eigen.Sizes[tuple(tensor_sizes)] - if issubclass(hist_hist._storage_type, bh.storage.Double): + if issubclass(hist_hist.storage_type, bh.storage.Double): cppstoragetype = ROOT.narf.tensor_accumulator[scalar_type, dimensions] - elif issubclass(hist_hist._storage_type, bh.storage.Weight): + elif issubclass(hist_hist.storage_type, bh.storage.Weight): cppstoragetype = ROOT.narf.tensor_accumulator[ROOT.boost.histogram.accumulators.weighted_sum[scalar_type], dimensions] else: raise TypeError("Requested storage type is not supported with tensor weights currently") @@ -143,7 +161,7 @@ def hist_to_pyroot_boost(hist_hist, tensor_rank = 0, force_atomic = False): cppstoragetype = ROOT.narf.atomic_adaptor[cppstoragetype] else: python_axes = hist_hist.axes - cppstoragetype = convert_storage_type(hist_hist._storage_type, force_atomic = force_atomic) + cppstoragetype = convert_storage_type(hist_hist.storage_type, force_atomic = force_atomic) cppaxes = [ROOT.std.move(convert_axis(axis)) for axis in python_axes] @@ -153,7 +171,7 @@ def hist_to_pyroot_boost(hist_hist, tensor_rank = 0, force_atomic = False): return pyroot_boost_hist -def _histo_boost(df, name, axes, cols, storage = bh.storage.Weight(), force_atomic = None, tensor_axes = None, convert_to_hist = True): +def _histo_boost(df, name, axes, cols, storage = bh.storage.Weight(), force_atomic = None, tensor_axes = None, convert_to_hist = True, metadata = None): # first construct a histogram from the hist python interface, then construct a boost histogram # using PyROOT with compatible axes and storage types, adopting the underlying storage # of the python hist histogram @@ -161,6 +179,68 @@ def _histo_boost(df, name, axes, cols, storage = bh.storage.Weight(), force_atom if force_atomic is None: force_atomic = ROOT.ROOT.IsImplicitMTEnabled() + # Sparse storage path: build a narf::sparse_histogram backed by a + # concurrent_flat_map. The result is exposed as a wums.SparseHist. + if isinstance(storage, SparseStorage): + if tensor_axes is not None: + raise NotImplementedError("Tensor weights are not supported with SparseStorage") + coltypes = [df.GetColumnType(col) for col in cols] + for coltype in coltypes[len(axes):]: + traits = ROOT.narf.tensor_traits[coltype] + if traits.is_tensor: + raise NotImplementedError("Tensor weights are not supported with SparseStorage") + cppaxes = [ROOT.std.move(convert_axis(axis)) for axis in axes] + hfill = ROOT.narf.make_histogram_sparse[ROOT.narf.atomic_adaptor[ROOT.double]](storage.fill_fraction, *cppaxes) + helper = ROOT.narf.FillBoostHelperAtomic[type(hfill)](ROOT.std.move(hfill)) + targs = tuple([type(df), type(helper)] + coltypes) + res = ROOT.narf.book_helper[targs](df, ROOT.std.move(helper), cols) + + if not convert_to_hist: + return res + + # Lazily convert the underlying C++ sparse histogram to a wums.SparseHist + # the first time the result is dereferenced. + from wums.sparse_hist import SparseHist + + res._GetPtr = res.GetPtr + res._sparse_hist = None + python_axes_sparse = list(axes) + + def _build_sparse(): + if res._sparse_hist is not None: + return res._sparse_hist + cpp_hist = res._GetPtr() + snapshot = ROOT.narf.sparse_histogram_snapshot(cpp_hist) + n = len(snapshot) + boost_flat = np.empty(n, dtype=np.int64) + vals = np.empty(n, dtype=np.float64) + for i, kv in enumerate(snapshot): + boost_flat[i] = int(kv.first) + vals[i] = float(kv.second) + extents = tuple(int(ax.extent) for ax in python_axes_sparse) + size = int(np.prod(extents)) if extents else 1 + # boost::histogram linearizes column-major (leftmost axis = stride 1), + # but wums.SparseHist expects numpy row-major (C order). Remap by + # un-raveling under F order and re-raveling under C order. + if n and len(extents) > 1: + multi = np.unravel_index(boost_flat, extents, order="F") + flat = np.ravel_multi_index(multi, extents, order="C").astype(np.int64) + else: + flat = boost_flat + res._sparse_hist = SparseHist._from_flat( + flat, vals, python_axes_sparse, size, metadata=metadata + ) + return res._sparse_hist + + ret_null = lambda: None + res.__deref__ = _build_sparse + res.__follow__ = _build_sparse + res.begin = ret_null + res.end = ret_null + res.GetPtr = _build_sparse + res.GetValue = _build_sparse + return res + #TODO some of this code can be shared with root histogram version #FIXME make this more generic @@ -244,7 +324,7 @@ def _histo_boost(df, name, axes, cols, storage = bh.storage.Weight(), force_atom if convert_to_hist: - _hist = hist.Hist(*python_axes, storage = storage, name = name) + _hist = hist.Hist(*python_axes, storage = storage, name = name, metadata = metadata) arrview = make_array_interface_view(_hist) helper = ROOT.narf.FillBoostHelperAtomic[type(arrview), type(hfill)](ROOT.std.move(arrview), ROOT.std.move(hfill)) @@ -641,8 +721,23 @@ def pythonize_rdataframe(klass): klass.HistoNDWithBoost = _histond_with_boost klass.SumAndCount = _sum_and_count -def build_quantile_hists(df, cols, condaxes, quantaxes): - """Build histograms which encode conditional quantiles for the provided variables, to be used with define_quantile_ints""" +def build_quantile_hists(df, cols, condaxes, quantaxes, continuous=False): + """Build histograms which encode conditional quantiles for the provided variables, to be used with define_quantiles. + + When ``continuous=True`` the original quantile axes are kept as-is in the + returned helper histograms (instead of being replaced by ``Integer`` axes). + ``define_quantiles`` detects this from the axis type and uses the + continuous quantile helpers, which return CDF-style values in ``[0, 1]`` + obtained by linearly interpolating between the stored quantile edges. + + Returns a tuple ``(quantile_hists, centers_hist, volume_hist)`` where + ``centers_hist`` gives the multidimensional center (one component per + quantile variable, along an extra ``coord`` ``StrCategory`` axis labelled + with the input quantile axis names when available) of each final + transformed quantile bin, and ``volume_hist`` gives the product of the + per-dimension widths of the same bin, both indexed by the full set of + conditional and (integer or original) quantile axes. + """ arraxes = condaxes + quantaxes @@ -678,6 +773,8 @@ def build_quantile_hists(df, cols, condaxes, quantaxes): quantile_integer_axes = [] quantile_hists = [] + quantile_mins_list = [] + quantile_maxs_list = [] for iax, (col, axis) in enumerate(zip(cols, arraxes)): @@ -707,29 +804,355 @@ def build_quantile_hists(df, cols, condaxes, quantaxes): quantile_edges = np.reshape(quantile_edges, shape_final[:iax+1]) quantile_edges = ak.to_numpy(quantile_edges) + quantile_mins = ak.min(arr[..., col], axis=-1, mask_identity=False) + quantile_mins = np.reshape(quantile_mins, shape_final[:iax+1]) + quantile_mins = ak.to_numpy(quantile_mins) + # replace -infinity from empty values with the previous bin edge # so that the quantile edges are at least still monotonic nquants = quantile_edges.shape[-1] for iquant in range(1, nquants): quantile_edges[..., iquant] = np.where(quantile_edges[..., iquant]==-np.inf, quantile_edges[..., iquant-1], quantile_edges[..., iquant]) + # replace +infinity from empty values with the next bin's min, + # keeping mins monotonic and avoiding negative bin widths; any + # remaining +infinities (trailing empty bins) fall back to the + # corresponding max so the bin collapses to zero width. + for iquant in range(nquants - 2, -1, -1): + quantile_mins[..., iquant] = np.where(quantile_mins[..., iquant]==np.inf, quantile_mins[..., iquant+1], quantile_mins[..., iquant]) + quantile_mins = np.where(quantile_mins==np.inf, quantile_edges, quantile_mins) + + quantile_mins_list.append(quantile_mins) + quantile_maxs_list.append(quantile_edges) + iquantax = iax - ncond - quantile_integer_axis = hist.axis.Integer(0, axis.size, underflow=False, overflow=False, name=f"{axis.name}_int") + # Output quantile axis: N bins, used as conditioning axis for + # later helpers and as the bin axis of centers/volume hists. + if continuous: + quantile_integer_axis = axis + else: + quantile_integer_axis = hist.axis.Integer(0, axis.size, underflow=False, overflow=False, name=f"{axis.name}_int") quantile_integer_axes.append(quantile_integer_axis) - helper_axes = condaxes[:iax+1] + quantile_integer_axes + # Helper histogram: N + 1 storage slots in both modes: + # edges[0] = left boundary of bin 0 (observed minimum of first + # quantile bin, where CDF = 0), edges[1..N] = right boundaries. + # Uniform layout gives bin 0 its own dedicated segment. + if continuous: + storage_axis = hist.axis.Regular( + axis.size + 1, 0.0, 1.0, + name=f"{axis.name}_edges", + underflow=False, overflow=False, + ) + else: + storage_axis = hist.axis.Integer( + 0, axis.size + 1, + underflow=False, overflow=False, + name=f"{axis.name}_edges_int", + ) + helper_axes = ( + condaxes[:iax+1] + quantile_integer_axes[:-1] + + [storage_axis] + ) helper_hist = hist.Hist(*helper_axes) - helper_hist.values(flow=True)[...] = quantile_edges + val_min_arr = quantile_mins[..., 0:1].astype( + quantile_edges.dtype, copy=False + ) + stored_edges = np.concatenate( + [val_min_arr, quantile_edges], axis=-1 + ) + helper_hist.values(flow=True)[...] = stored_edges quantile_hists.append(helper_hist) - return quantile_hists + # Compute multidim centers and volumes for the final transformed bins. + # Each per-axis widths/centers array has shape covering only the + # conditional dims plus the quantile dims up to that axis; broadcast to + # the full (cond + all quant) shape and combine. + full_shape = tuple(shape_final) + per_dim_widths = [] + per_dim_centers = [] + for i in range(nquant): + mins_i = quantile_mins_list[i] + maxs_i = quantile_maxs_list[i] + widths_i = maxs_i - mins_i + centers_i = 0.5 * (maxs_i + mins_i) + ndim_extra = nquant - i - 1 + new_shape = widths_i.shape + (1,) * ndim_extra + per_dim_widths.append(np.broadcast_to(widths_i.reshape(new_shape), full_shape)) + per_dim_centers.append(np.broadcast_to(centers_i.reshape(new_shape), full_shape)) + + final_axes = condaxes + quantile_integer_axes + volume_hist = hist.Hist(*final_axes) + if nquant > 0: + volume = per_dim_widths[0].copy() + for w in per_dim_widths[1:]: + volume = volume * w + volume_hist.values(flow=True)[...] = volume + + # Use the input quantile axis names as labels on the coord axis, with + # a unique placeholder for any unnamed axis. + coord_names = [] + for i, ax in enumerate(quantaxes): + name = getattr(ax, "name", "") or "" + if not name or name in coord_names: + name = f"quant_{i}" + coord_names.append(name) + coord_axis = hist.axis.StrCategory(coord_names, name="coord", overflow=False) + centers_hist = hist.Hist(*final_axes, coord_axis) + centers_hist.values(flow=True)[...] = np.stack(per_dim_centers, axis=-1) + else: + centers_hist = hist.Hist(*final_axes) + + return quantile_hists, centers_hist, volume_hist + +def build_quantile_hists_from_fine(fine_hist, condaxes, quantaxes, continuous=False): + """Build quantile helper histograms from a pre-filled finely-binned histogram. -def define_quantile_ints(df, cols, quantile_hists): - """Define transformed columns corresponding to conditional quantile bins (integers)""" + Takes a single multi-dimensional finely-binned histogram (filled in one + event loop) and computes chained conditional quantile edges by cumulative- + sum analysis. Quantile boundaries are restricted to the bin edges of the + fine histogram. + + The fine histogram axes must be ``condaxes + fine_quantile_axes`` in order, + where each fine quantile axis is a finely-binned version of the + corresponding quantile variable covering its full data range. + + Returns ``(quantile_hists, centers_hist, volume_hist)`` with the same + format as :func:`build_quantile_hists`. + """ + + ncond = len(condaxes) + nquant = len(quantaxes) + fine_axes_all = list(fine_hist.axes) + assert len(fine_axes_all) == ncond + nquant, ( + f"fine_hist has {len(fine_axes_all)} axes, expected {ncond + nquant}" + ) + + vals = fine_hist.values(flow=True) + + quantile_integer_axes = [] + quantile_hists_list = [] + quantile_mins_list = [] + quantile_maxs_list = [] + + for iquant in range(nquant): + qax = quantaxes[iquant] + nqbins = qax.size + fine_ax = fine_axes_all[ncond + iquant] + fine_edges = np.asarray(fine_ax.edges, dtype=np.float64) + n_fine = fine_ax.size + n_fine_flow = fine_ax.extent + + # Project out remaining fine axes (after the current one). + projected = vals + for _ in range(nquant - iquant - 1): + projected = projected.sum(axis=-1) + # projected shape: (cond_extents..., [quant_sizes...], fine_extent) + + # Cumulative sum along the last (fine) axis. + cumsum = np.cumsum(projected, axis=-1) + + # Map from fine-bin index (flow-inclusive) to variable value at + # the right edge of that bin. + bin_right = np.empty(n_fine_flow) + bin_right[0] = fine_edges[0] + bin_right[1 : n_fine + 1] = fine_edges[1:] + if n_fine_flow > n_fine + 1: + bin_right[-1] = fine_edges[-1] + + bin_left = np.empty(n_fine_flow) + bin_left[0] = fine_edges[0] + bin_left[1 : n_fine + 1] = fine_edges[:-1] + if n_fine_flow > n_fine + 1: + bin_left[-1] = fine_edges[-1] + + # Quantile fractions (right edge of each output bin in CDF space). + fractions = np.asarray( + [qax.edges[j + 1] for j in range(nqbins)], dtype=np.float64 + ) + + # Flatten conditional dimensions for the searchsorted loop. + cond_shape = projected.shape[:-1] + n_cells = max(1, int(np.prod(cond_shape))) + flat_cumsum = cumsum.reshape(n_cells, -1) + flat_total = flat_cumsum[:, -1] + + flat_maxs = np.full((n_cells, nqbins), -np.inf) + flat_mins = np.full((n_cells, nqbins), np.inf) + bin_starts = np.zeros((n_cells, nqbins), dtype=np.int64) + bin_ends = np.zeros((n_cells, nqbins), dtype=np.int64) + + for icell in range(n_cells): + if flat_total[icell] <= 0: + continue + prev = 0 + for iq in range(nqbins): + threshold = fractions[iq] * flat_total[icell] + idx = int(np.searchsorted(flat_cumsum[icell], threshold)) + idx = min(idx, n_fine_flow - 1) + flat_maxs[icell, iq] = bin_right[idx] + if prev < n_fine_flow: + flat_mins[icell, iq] = bin_left[prev] + bin_starts[icell, iq] = prev + bin_ends[icell, iq] = idx + 1 + prev = idx + 1 + + quantile_maxs = flat_maxs.reshape(cond_shape + (nqbins,)) + quantile_mins = flat_mins.reshape(cond_shape + (nqbins,)) + + # Fix empty-bin infinities (same logic as build_quantile_hists). + for iq in range(1, nqbins): + quantile_maxs[..., iq] = np.where( + quantile_maxs[..., iq] == -np.inf, + quantile_maxs[..., iq - 1], + quantile_maxs[..., iq], + ) + for iq in range(nqbins - 2, -1, -1): + quantile_mins[..., iq] = np.where( + quantile_mins[..., iq] == np.inf, + quantile_mins[..., iq + 1], + quantile_mins[..., iq], + ) + quantile_mins = np.where( + quantile_mins == np.inf, quantile_maxs, quantile_mins + ) + + quantile_mins_list.append(quantile_mins) + quantile_maxs_list.append(quantile_maxs) + + # Output quantile axis: indexes the N quantile bins and is used + # both as the conditioning axis for later helpers in the chain + # and as the bin axis of the returned centers/volume hists. + # Always N bins, independent of integer vs continuous mode. + if continuous: + quantile_integer_axis = qax + else: + quantile_integer_axis = hist.axis.Integer( + 0, qax.size, underflow=False, overflow=False, + name=f"{qax.name}_int", + ) + quantile_integer_axes.append(quantile_integer_axis) + + # Helper histogram: stores per-conditional-cell edge tensor for + # the C++ quantile_lookup. In both modes the last axis has N + 1 + # slots: edges[0] = left boundary of bin 0 (the fine-axis lower + # edge, where CDF = 0), edges[1..N] = right boundaries of the N + # bins. The uniform N + 1 layout gives each bin — including the + # first — its own dedicated segment in the CDF and its own + # dedicated bin index under iquant - 1 clamp. Integer mode uses + # an Integer(0, N+1) storage axis so define_quantiles' isinstance + # check keeps detecting the mode. + if continuous: + storage_axis = hist.axis.Regular( + nqbins + 1, 0.0, 1.0, + name=f"{qax.name}_edges", + underflow=False, overflow=False, + ) + else: + storage_axis = hist.axis.Integer( + 0, nqbins + 1, + underflow=False, overflow=False, + name=f"{qax.name}_edges_int", + ) + helper_axes = ( + list(condaxes) + list(quantile_integer_axes[:-1]) + + [storage_axis] + ) + helper_hist = hist.Hist(*helper_axes) + val_min = float(fine_edges[0]) + lead_shape = quantile_maxs.shape[:-1] + (1,) + val_min_arr = np.full( + lead_shape, val_min, dtype=quantile_maxs.dtype + ) + stored_edges = np.concatenate( + [val_min_arr, quantile_maxs], axis=-1 + ) + helper_hist.values(flow=True)[...] = stored_edges + quantile_hists_list.append(helper_hist) + + # Rebin vals: replace the current fine axis with the quantile axis + # by summing fine bins within each quantile bin (contiguous ranges + # determined by the cumsum thresholds above). + fine_pos = ncond + iquant + pre_shape = vals.shape[:fine_pos] + post_shape = vals.shape[fine_pos + 1 :] + n_pre = max(1, int(np.prod(pre_shape))) + n_post = max(1, int(np.prod(post_shape))) + flat_vals = vals.reshape(n_pre, vals.shape[fine_pos], n_post) + flat_new = np.zeros((n_pre, nqbins, n_post), dtype=vals.dtype) + for icell in range(n_pre): + for iq in range(nqbins): + s = bin_starts[icell, iq] + e = bin_ends[icell, iq] + if s < e: + flat_new[icell, iq] = flat_vals[icell, s:e].sum(axis=0) + new_shape = list(vals.shape) + new_shape[fine_pos] = nqbins + vals = flat_new.reshape(new_shape) + + # Centers and volumes (same logic as build_quantile_hists). + full_shape = ( + quantile_maxs_list[-1].shape if quantile_maxs_list else () + ) + per_dim_widths = [] + per_dim_centers = [] + for i in range(nquant): + widths_i = quantile_maxs_list[i] - quantile_mins_list[i] + centers_i = 0.5 * (quantile_maxs_list[i] + quantile_mins_list[i]) + ndim_extra = nquant - i - 1 + new_shape = widths_i.shape + (1,) * ndim_extra + per_dim_widths.append( + np.broadcast_to(widths_i.reshape(new_shape), full_shape) + ) + per_dim_centers.append( + np.broadcast_to(centers_i.reshape(new_shape), full_shape) + ) + + final_axes = list(condaxes) + quantile_integer_axes + volume_hist = hist.Hist(*final_axes) + if nquant > 0: + volume = per_dim_widths[0].copy() + for w in per_dim_widths[1:]: + volume *= w + volume_hist.values(flow=True)[...] = volume + + coord_names = [] + for i, ax in enumerate(quantaxes): + name = getattr(ax, "name", "") or "" + if not name or name in coord_names: + name = f"quant_{i}" + coord_names.append(name) + coord_axis = hist.axis.StrCategory( + coord_names, name="coord", overflow=False + ) + centers_hist = hist.Hist(*final_axes, coord_axis) + centers_hist.values(flow=True)[...] = np.stack( + per_dim_centers, axis=-1 + ) + else: + centers_hist = hist.Hist(*final_axes) + + return quantile_hists_list, centers_hist, volume_hist + + +def define_quantiles(df, cols, quantile_hists, label=""): + """Define transformed columns corresponding to conditional quantiles. + + By default the helpers return the integer quantile bin index. If the + helper histograms produced by :func:`build_quantile_hists` were built in + continuous mode (their trailing quantile axis is not a plain ``Integer`` + axis), the continuous quantile helpers are used instead, returning a + CDF-style value in ``[0, 1]``. + + An optional ``label`` string is inserted into the output column names + (e.g. ``label="plus"`` turns ``col_quant`` into ``col_plus_quant``) so + that the same transform can be applied independently to different sets of + input columns without naming collisions. + """ ncols = len(cols) nquant = len(quantile_hists) @@ -738,23 +1161,39 @@ def define_quantile_ints(df, cols, quantile_hists): condcols = cols[:ncond] quantcols = cols[ncond:] + # Detect continuous mode from the trailing (quantile) axis of the first + # helper histogram: continuous-mode helper histograms preserve the + # original (Regular / Variable) quantile axis, integer-mode ones use a + # generated Integer axis. + continuous = not isinstance(quantile_hists[0].axes[-1], hist.axis.Integer) + helper_cols_cond = condcols.copy() + suffix = "_quant" if continuous else "_iquant" + if label: + suffix = f"_{label}{suffix}" + for col, quantile_hist in zip(quantcols, quantile_hists): if len(quantile_hist.axes) > 1: helper_hist = narf.hist_to_pyroot_boost(quantile_hist, tensor_rank=1) - quanthelper = ROOT.narf.make_quantile_helper(ROOT.std.move(helper_hist)) + if continuous: + quanthelper = ROOT.narf.make_quantile_helper_continuous(ROOT.std.move(helper_hist)) + else: + quanthelper = ROOT.narf.make_quantile_helper(ROOT.std.move(helper_hist)) else: # special case for static quantiles with no conditional variables vals = quantile_hist.values() arr = ROOT.std.array["double", vals.size](vals) - quanthelper = ROOT.narf.QuantileHelperStatic[vals.size](arr) + if continuous: + quanthelper = ROOT.narf.QuantileHelperStaticContinuous[vals.size](arr) + else: + quanthelper = ROOT.narf.QuantileHelperStatic[vals.size](arr) helper_cols = helper_cols_cond + [col] - outname = f"{col}_iquant" - df = df.Define(outname, quanthelper, helper_cols) + outname = f"{col}{suffix}" + df = narf.rdfutils.flexible_define(df, outname, quanthelper, helper_cols) helper_cols_cond.append(outname) quantile_axes = list(quantile_hists[-1].axes) diff --git a/narf/include/concurrent_flat_map.hpp b/narf/include/concurrent_flat_map.hpp new file mode 100644 index 0000000..e1c0cd2 --- /dev/null +++ b/narf/include/concurrent_flat_map.hpp @@ -0,0 +1,384 @@ +#ifndef NARF_CONCURRENT_FLAT_MAP_HPP +#define NARF_CONCURRENT_FLAT_MAP_HPP + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +namespace narf { + +// Lock-free, insert-only, expandable concurrent flat hash map. +// +// Properties: +// - find / insert / emplace / expansion are all lock-free and safe to call +// concurrently from any number of threads. +// - Elements are never erased; once inserted, the address of an element's +// value is stable for the lifetime of the map (pointers returned by +// find/emplace remain valid). +// - The container grows by appending geometrically larger segments; existing +// segments are never rehashed, so concurrent readers are never disturbed. +// +// Requirements on Key: +// - Must be an integer type. +// - The two most significant bits of any user-supplied key must be zero; +// they are reserved for internal slot state (occupied / busy markers). +template > +class concurrent_flat_map { + static_assert(std::is_integral_v, "Key must be an integer type"); + +public: + using key_type = Key; + using mapped_type = Value; + using hasher = Hash; + +private: + using UKey = std::make_unsigned_t; + + static constexpr unsigned kKeyBits = sizeof(UKey) * 8; + static constexpr UKey kOccupiedBit = UKey(1) << (kKeyBits - 1); + static constexpr UKey kBusyBit = UKey(1) << (kKeyBits - 2); + static constexpr UKey kStateMask = kOccupiedBit | kBusyBit; + static constexpr UKey kPayloadMask = ~kStateMask; + static constexpr UKey kEmpty = 0; + + struct Slot { + std::atomic key{kEmpty}; + alignas(Value) unsigned char storage[sizeof(Value)]; + + Value* value_ptr() noexcept { + return std::launder(reinterpret_cast(&storage)); + } + }; + + struct Segment { + const std::size_t capacity; + const std::size_t mask; + std::atomic size{0}; + std::unique_ptr slots; + std::atomic next{nullptr}; + + explicit Segment(std::size_t cap) + : capacity(cap), mask(cap - 1), slots(new Slot[cap]) {} + + ~Segment() { + if constexpr (!std::is_trivially_destructible_v) { + for (std::size_t i = 0; i < capacity; ++i) { + UKey k = slots[i].key.load(std::memory_order_relaxed); + if ((k & kOccupiedBit) && !(k & kBusyBit)) { + slots[i].value_ptr()->~Value(); + } + } + } + } + }; + + static constexpr std::size_t kDefaultInitialCapacity = 64; + static constexpr std::size_t kMaxProbe = 32; + + Segment* head_; + std::atomic tail_; + Hash hash_; + + static UKey encode(Key key) noexcept { + return (static_cast(key) & kPayloadMask) | kOccupiedBit; + } + + static std::size_t round_up_pow2(std::size_t n) noexcept { + std::size_t c = 1; + while (c < n) c <<= 1; + return c; + } + + // Sentinel published in `Segment::next` while a thread is allocating the + // successor segment. Distinct from both nullptr and any valid pointer. + static Segment* growing_sentinel() noexcept { + return reinterpret_cast(static_cast(1)); + } + + // Returns the published successor of `s`, or nullptr if there isn't one + // yet (or if a growth is in progress and not yet visible to readers). + static Segment* observed_next(const Segment* s) noexcept { + Segment* n = s->next.load(std::memory_order_acquire); + return (n == growing_sentinel()) ? nullptr : n; + } + + // Spin until the slot's busy bit clears, then return the stable key. + static UKey wait_not_busy(Slot& slot) noexcept { + UKey k = slot.key.load(std::memory_order_acquire); + while (k & kBusyBit) { + k = slot.key.load(std::memory_order_acquire); + } + return k; + } + + // Allocate (or observe) the segment that follows `current`. Only the thread + // that wins the right to grow performs the allocation; other threads spin + // briefly on a sentinel until the new segment is published. This avoids the + // thundering-herd of speculative allocations that doubling-segment growth + // would otherwise produce under high thread contention. + Segment* ensure_next(Segment* current) { + Segment* next = current->next.load(std::memory_order_acquire); + if (next && next != growing_sentinel()) return next; + + Segment* expected = nullptr; + if (current->next.compare_exchange_strong( + expected, growing_sentinel(), + std::memory_order_acq_rel, std::memory_order_acquire)) { + // Won the race: this thread is the sole allocator. + Segment* fresh = nullptr; + try { + fresh = new Segment(current->capacity * 2); + } catch (...) { + current->next.store(nullptr, std::memory_order_release); + throw; + } + current->next.store(fresh, std::memory_order_release); + + // Best-effort tail advance so future inserters skip filled segments. + Segment* t = tail_.load(std::memory_order_acquire); + while (true) { + Segment* tn = t->next.load(std::memory_order_acquire); + if (!tn || tn == growing_sentinel()) break; + if (tail_.compare_exchange_weak(t, tn, + std::memory_order_acq_rel, + std::memory_order_acquire)) { + t = tn; + } + } + return fresh; + } + + // Lost the race. Either another thread is mid-allocation (sentinel + // observed) or the new segment is already published. + while (true) { + Segment* obs = current->next.load(std::memory_order_acquire); + if (obs && obs != growing_sentinel()) return obs; + std::this_thread::yield(); + } + } + + // Search a single segment for `target`. Returns pointer to value or nullptr. + Value* find_in(Segment* seg, std::size_t h, UKey target) const noexcept { + const std::size_t base = h & seg->mask; + const std::size_t probe_limit = std::min(kMaxProbe, seg->capacity); + for (std::size_t i = 0; i < probe_limit; ++i) { + Slot& slot = seg->slots[(base + i) & seg->mask]; + UKey k = slot.key.load(std::memory_order_acquire); + if (k == kEmpty) return nullptr; + if (k & kBusyBit) k = wait_not_busy(slot); + if (k == target) return slot.value_ptr(); + } + return nullptr; + } + + // Try to insert `target` into a single segment. Returns + // {ptr, true} : newly inserted, value constructed from args + // {ptr, false} : key was already present + // {nullptr, false} : segment full along this probe sequence + template + std::pair emplace_in(Segment* seg, std::size_t h, UKey target, + Args&&... args) { + const std::size_t base = h & seg->mask; + const std::size_t probe_limit = std::min(kMaxProbe, seg->capacity); + const UKey busy = (target & kPayloadMask) | kOccupiedBit | kBusyBit; + for (std::size_t i = 0; i < probe_limit; ++i) { + Slot& slot = seg->slots[(base + i) & seg->mask]; + UKey k = slot.key.load(std::memory_order_acquire); + if (k == kEmpty) { + UKey expected = kEmpty; + if (slot.key.compare_exchange_strong( + expected, busy, + std::memory_order_acq_rel, std::memory_order_acquire)) { + ::new (static_cast(&slot.storage)) + Value(std::forward(args)...); + slot.key.store(target, std::memory_order_release); + seg->size.fetch_add(1, std::memory_order_relaxed); + return {slot.value_ptr(), true}; + } + k = expected; + } + if (k & kBusyBit) k = wait_not_busy(slot); + if (k == target) return {slot.value_ptr(), false}; + } + return {nullptr, false}; + } + +public: + explicit concurrent_flat_map(std::size_t initial_capacity = kDefaultInitialCapacity) { + const std::size_t cap = round_up_pow2(std::max(initial_capacity, 2)); + head_ = new Segment(cap); + tail_.store(head_, std::memory_order_release); + } + + ~concurrent_flat_map() { + Segment* s = head_; + while (s) { + Segment* n = s->next.load(std::memory_order_relaxed); + delete s; + s = n; + } + } + + concurrent_flat_map(const concurrent_flat_map&) = delete; + concurrent_flat_map& operator=(const concurrent_flat_map&) = delete; + + // Move constructor: transfers ownership. The moved-from object is left in + // a destroy-only state; calling any other method on it is undefined. + concurrent_flat_map(concurrent_flat_map&& other) noexcept + : head_(other.head_), + tail_(other.tail_.load(std::memory_order_relaxed)), + hash_(std::move(other.hash_)) { + other.head_ = nullptr; + other.tail_.store(nullptr, std::memory_order_relaxed); + } + + concurrent_flat_map& operator=(concurrent_flat_map&& other) noexcept { + if (this != &other) { + Segment* s = head_; + while (s) { + Segment* n = s->next.load(std::memory_order_relaxed); + delete s; + s = n; + } + head_ = other.head_; + tail_.store(other.tail_.load(std::memory_order_relaxed), + std::memory_order_relaxed); + hash_ = std::move(other.hash_); + other.head_ = nullptr; + other.tail_.store(nullptr, std::memory_order_relaxed); + } + return *this; + } + + // Returns a pointer to the value associated with `key`, or nullptr. + // The returned pointer remains valid for the lifetime of the map. + Value* find(Key key) noexcept { + const UKey target = encode(key); + const std::size_t h = hash_(key); + for (Segment* seg = head_; seg; + seg = observed_next(seg)) { + if (Value* v = find_in(seg, h, target)) return v; + } + return nullptr; + } + + const Value* find(Key key) const noexcept { + return const_cast(this)->find(key); + } + + bool contains(Key key) const noexcept { return find(key) != nullptr; } + + // Construct a value in-place if `key` is not yet present. + // Returns {pointer-to-value, inserted?}. + template + std::pair emplace(Key key, Args&&... args) { + const UKey target = encode(key); + const std::size_t h = hash_(key); + + // Look in every existing segment first to honour insert-once semantics. + for (Segment* seg = head_; seg; + seg = observed_next(seg)) { + if (Value* v = find_in(seg, h, target)) return {v, false}; + } + + // Then try to insert at the tail, growing as required. + Segment* seg = tail_.load(std::memory_order_acquire); + while (true) { + auto result = emplace_in(seg, h, target, std::forward(args)...); + if (result.first) { + return result; + } + // This probe sequence is saturated in `seg`; another thread may have + // already inserted the same key into a later segment, so re-check + // everything past `seg` before allocating. + Segment* next = observed_next(seg); + if (!next) next = ensure_next(seg); + for (Segment* s = next; s; s = observed_next(s)) { + if (Value* v = find_in(s, h, target)) return {v, false}; + } + seg = next; + } + } + + std::pair insert(Key key, const Value& v) { + return emplace(key, v); + } + std::pair insert(Key key, Value&& v) { + return emplace(key, std::move(v)); + } + + // Total number of inserted elements summed across all segments. Reads each + // segment's atomic counter; safe to call concurrently with other operations + // but the result reflects an instantaneous, possibly racing snapshot. + std::size_t size() const noexcept { + std::size_t total = 0; + for (Segment* seg = head_; seg; + seg = observed_next(seg)) { + total += seg->size.load(std::memory_order_acquire); + } + return total; + } + + // Visit every (key, value) pair currently in the map. Not safe to call + // concurrently with insertions if the visitor relies on a stable snapshot; + // the visitor only sees fully-constructed slots and waits past any in-flight + // insert that races with iteration. + template + void for_each(F&& f) { + for (Segment* seg = head_; seg; + seg = observed_next(seg)) { + for (std::size_t i = 0; i < seg->capacity; ++i) { + Slot& slot = seg->slots[i]; + UKey k = slot.key.load(std::memory_order_acquire); + if (k == kEmpty) continue; + if (k & kBusyBit) k = wait_not_busy(slot); + if (!(k & kOccupiedBit)) continue; + f(static_cast(k & kPayloadMask), *slot.value_ptr()); + } + } + } + + template + void for_each(F&& f) const { + const_cast(this)->for_each(std::forward(f)); + } + + // Remove all elements and shrink back to a single head segment. NOT thread + // safe with respect to any other operation; the caller must establish + // exclusive access (e.g. between processing passes). + void clear() { + Segment* s = head_->next.load(std::memory_order_relaxed); + while (s) { + Segment* n = s->next.load(std::memory_order_relaxed); + delete s; + s = n; + } + head_->next.store(nullptr, std::memory_order_relaxed); + if constexpr (!std::is_trivially_destructible_v) { + for (std::size_t i = 0; i < head_->capacity; ++i) { + UKey k = head_->slots[i].key.load(std::memory_order_relaxed); + if ((k & kOccupiedBit) && !(k & kBusyBit)) { + head_->slots[i].value_ptr()->~Value(); + } + head_->slots[i].key.store(kEmpty, std::memory_order_relaxed); + } + } else { + for (std::size_t i = 0; i < head_->capacity; ++i) { + head_->slots[i].key.store(kEmpty, std::memory_order_relaxed); + } + } + head_->size.store(0, std::memory_order_relaxed); + tail_.store(head_, std::memory_order_release); + } +}; + +} // namespace narf + +#endif // NARF_CONCURRENT_FLAT_MAP_HPP diff --git a/narf/include/histutils.hpp b/narf/include/histutils.hpp index ad4fc31..6fc8513 100644 --- a/narf/include/histutils.hpp +++ b/narf/include/histutils.hpp @@ -6,8 +6,10 @@ #include "traits.hpp" #include "utils.hpp" #include "atomic_adaptor.hpp" +#include "sparse_histogram.hpp" #include "tensorutils.hpp" #include "tensorevalutils.hpp" +#include "rdfutils.hpp" #include #include #include @@ -616,27 +618,68 @@ namespace narf { // Helper which facilitates conversion from value to quantile for a single variable // The underlying histogram holds a tensor with the bin edges for the quantiles in the last variable, // conditional on all the previous variables - template - class QuantileHelper : public HistHelper { + /// Look up the quantile bin for `val` in the sorted edge array [begin, end). + /// + /// Edge layout is the same in both modes: the ``n`` stored edges are + /// ``[val_min, e_0, e_1, ..., e_{N-1}]`` where ``val_min`` is the left + /// boundary of the first quantile bin (where ``CDF = 0``) and + /// ``e_k = edges[k+1]`` is the right boundary of the ``k``-th quantile + /// bin. So ``nbins = n - 1`` quantile bins are encoded by ``n`` edges. + /// + /// * ``Continuous = false`` (integer mode): returns the clamped integer + /// bin index ``i ∈ [0, nbins - 1]``. + /// + /// * ``Continuous = true`` (continuous CDF mode): returns a CDF-style + /// double in ``[0, 1)`` obtained by linear interpolation on segment + /// ``i`` (from ``edges[i]`` at ``CDF = i/nbins`` to ``edges[i+1]`` at + /// ``CDF = (i+1)/nbins``). Every bin — including the first and last — + /// gets its own dedicated slope. + template + auto quantile_lookup(It begin, It end, const T &val) { + const auto n = std::distance(begin, end); + auto const upper = std::upper_bound(begin, end, val); + auto const iquant = std::distance(begin, upper); + const std::ptrdiff_t nbins = n - 1; + auto const i = std::clamp(iquant - 1, 0, nbins - 1); + if constexpr (Continuous) { + auto const lo = *(begin + i); + auto const hi = *(begin + i + 1); + // Guard against degenerate (collapsed) bins where hi == lo. + double const frac = (hi != lo) ? double(val - lo) / double(hi - lo) : 0.5; + double const res = (double(i) + frac) / double(nbins); + return std::clamp(res, 0.0, std::nextafter(1.0, 0.0)); + } else { + return static_cast(i); + } + } + + template + class QuantileHelperImpl : public HistHelper { using base_t = HistHelper; using hist_t = typename base_t::hist_t; using scalar_t = typename Storage::value_type::tensor_t::Scalar; static constexpr auto nquants = Storage::value_type::size; public: - QuantileHelper(hist_t &&resource) : base_t(std::forward(resource)) {} + QuantileHelperImpl(hist_t &&resource) : base_t(std::forward(resource)) {} - boost::histogram::axis::index_type operator()(const boost::histogram::axis::traits::value_type&... args, const scalar_t &last) { + auto operator()(const boost::histogram::axis::traits::value_type&... args, const scalar_t &last) const { auto const &hist = *base_t::resourceHist_; auto const &edges = narf::get_value(hist, args...).data(); - - // find the quantile bin corresponding to the last argument - auto const upper = std::upper_bound(edges.data(), edges.data()+nquants, last); - auto const iquant = std::distance(edges.data(), upper); - return std::clamp(iquant, 0, nquants-1); + return quantile_lookup(edges.data(), edges.data() + nquants, last); } }; + // MapWrapper> so that: + // - RVec arguments are broadcast element-wise (MapWrapper) + // - Eigen tensor arguments are broadcast element-wise (TensorMapWrapper) + // - scalar arguments call through directly + template + using QuantileHelper = MapWrapper>>; + + template + using QuantileHelperContinuous = MapWrapper>>; + // CTAD doesn't work reliably from cppyy so add factory function template QuantileHelper make_quantile_helper(boost::histogram::histogram, Storage> &&h) { @@ -644,25 +687,34 @@ namespace narf { return QuantileHelper(std::forward(h)); } + template + QuantileHelperContinuous make_quantile_helper_continuous(boost::histogram::histogram, Storage> &&h) { + using hist_t = boost::histogram::histogram, Storage>; + return QuantileHelperContinuous(std::forward(h)); + } + // simple version for static quantiles - template - class QuantileHelperStatic { + template + class QuantileHelperStaticImpl { public: using edge_t = std::array; - QuantileHelperStatic(const edge_t &edges) : edges_(edges) {} + QuantileHelperStaticImpl(const edge_t &edges) : edges_(edges) {} - boost::histogram::axis::index_type operator() (double val) { - // find the quantile bin corresponding to the last argument - auto const upper = std::upper_bound(edges_.begin(), edges_.end(), val); - auto const iquant = std::distance(edges_.begin(), upper); - return std::clamp(iquant, 0, N-1); + auto operator() (double val) const { + return quantile_lookup(edges_.begin(), edges_.end(), val); } private: const edge_t edges_; }; + template + using QuantileHelperStatic = MapWrapper>>; + + template + using QuantileHelperStaticContinuous = MapWrapper>>; + /// Computes the minimum-variance reweighting to approximate a shift /// or smearing in the underlying variables of a multidimensional histogram. /// @@ -670,10 +722,10 @@ namespace narf { /// summing over the contribution from each axis template - class HistShiftHelper { + class HistShiftHelperImpl { public: - HistShiftHelper(const Axes&... axes) : axes_(axes...) {} - HistShiftHelper(Axes&&... axes) : axes_(std::move(axes)...) {} + HistShiftHelperImpl(const Axes&... axes) : axes_(axes...) {} + HistShiftHelperImpl(Axes&&... axes) : axes_(std::move(axes)...) {} template auto operator()(const Args&... args) const { @@ -714,39 +766,6 @@ namespace narf { /// Core computation dispatched over axis indices. template auto compute(const Nominal& orig, - const Shifted& shifted, - const SmearShifted& smear_shifted, - const Weight& nominal_weight, - std::index_sequence idxs) const { - - constexpr bool is_container_any = (is_container> || ...); - - if constexpr(is_container_any) { - auto make_range_of_tuples = [](auto const&...xs){ return make_zip_view(make_view(xs)...); }; - - auto const orig_v = std::apply(make_range_of_tuples, orig); - auto const shifted_v = std::apply(make_range_of_tuples, shifted); - auto const smear_shifted_v = std::apply(make_range_of_tuples, smear_shifted); - - auto compute_elem_impl = [this, &nominal_weight, &idxs](auto &orig_elem, auto &shifted_elem, auto&smear_shifted_elem) { - return compute_impl(orig_elem, shifted_elem, smear_shifted_elem, nominal_weight, idxs); - }; - auto compute_elem = [&compute_elem_impl](auto const &elem_tuple) { return std::apply(compute_elem_impl, elem_tuple); }; - - auto const res_view = make_zip_view(orig_v, shifted_v, smear_shifted_v) - | std::views::transform(compute_elem); - - auto const res = range_to(res_view); - return res; - } - else { - return compute_impl(orig, shifted, smear_shifted, nominal_weight, idxs); - } - } - - /// Core computation dispatched over axis indices. - template - auto compute_impl(const Nominal& orig, const Shifted& shifted, const SmearShifted& smear_shifted, const Weight& nominal_weight, @@ -806,9 +825,11 @@ namespace narf { // will probably have to return u,d,s individually with more complicated logic // in the calling layer to do the matrix multiplication - // Underflow / overflow: no reliable bin geometry, return no correction. + // Underflow / overflow or degenerate bin geometry (e.g. infinite bin + // edge): no reliable bin geometry, return no correction. // (note that a is infinity in this case such that delta and v are also zero) - const bool flow = bin_idx < 0 || bin_idx >= ax.size(); + const bool degenerate = !std::isfinite(a) || !std::isfinite(x_c); + const bool flow = bin_idx < 0 || bin_idx >= ax.size() || degenerate; auto const u = flow ? 0.*x_orig : (x_orig - x_c)/a; auto const delta = (x_shifted - x_orig)/a; @@ -838,12 +859,18 @@ namespace narf { const std::tuple axes_; }; + /// Public helper: MapWrapper around HistShiftHelperImpl so container + /// arguments are automatically broadcast/zipped element-wise, while scalar + /// arguments are passed through directly. + template + using HistShiftHelper = MapWrapper>; + // factory function needed because CTAD doesn't work reliably from cppyy // also the trailing return type is needed because cppyy has issues with auto // return types template HistShiftHelper...> make_hist_shift_helper(Axes&&... axes) { - return HistShiftHelper(std::forward(axes)...); + return HistShiftHelper...>(std::forward(axes)...); } } diff --git a/narf/include/matrix_utils.hpp b/narf/include/matrix_utils.hpp new file mode 100644 index 0000000..8a74729 --- /dev/null +++ b/narf/include/matrix_utils.hpp @@ -0,0 +1,151 @@ +#pragma once + +#include +#include +#include +#include "concurrent_flat_map.hpp" + + +class SymMatrixAtomic { +public: + SymMatrixAtomic() = default; + SymMatrixAtomic(std::size_t n) : n_(n), data_(n*(n+1)/2) {} + + double fetch_add(std::size_t iidx, std::size_t jidx, double val) { + std::atomic& ref = data_[packed_index(iidx, jidx)]; + return ref.fetch_add(val); + } + + void fill_row(std::size_t row, double *rowData) { + const std::size_t offset = packed_index(row, row); + std::fill(rowData, rowData + row, 0.); + std::copy(data_.begin() + offset, data_.begin() + offset + n_ - row, rowData + row); + } + +private: + +/** + * Converts (row, col) indices of a symmetric matrix into a linearized index + * for packed storage of unique elements (upper triangle, row-major). + * + * Storage layout (0-indexed, n=4 example): + * (0,0)(0,1)(0,2)(0,3) | (1,1)(1,2)(1,3) | (2,2)(2,3) | (3,3) + * 0 1 2 3 4 5 6 7 8 9 + * + * Total elements stored: n*(n+1)/2 + * + * @param row Row index (0-based) + * @param col Column index (0-based) + * @return Linearized index into packed storage array + */ + inline std::size_t packed_index(std::size_t row, std::size_t col) { + // Normalize to upper triangle: ensure row <= col + if (row > col) { + std::size_t tmp = row; + row = col; + col = tmp; + } + + // Number of elements in rows 0..(row-1): row*n - row*(row-1)/2 + // Plus offset within current row: (col - row) + return row * n_ - row * (row - 1) / 2 + (col - row); + } + + std::size_t n_{}; + std::vector > data_; + +}; + +class SparseMatrixIndexValues { +public: + const std::vector &idxs0() const { return idxs0_; } + const std::vector &idxs1() const { return idxs1_; } + const std::vector &vals() const { return vals_; } + + void emplace_back(std::size_t idx0, std::size_t idx1, double val) { + idxs0_.emplace_back(idx0); + idxs1_.emplace_back(idx1); + vals_.emplace_back(val); + } + + void reserve(std::size_t i) { + idxs0_.reserve(i); + idxs1_.reserve(i); + vals_.reserve(i); + } + + std::size_t size() const { return vals_.size(); } + +private: + std::vector idxs0_; + std::vector idxs1_; + std::vector vals_; +}; + +class SparseMatrixAtomic { +public: + using map_type = narf::concurrent_flat_map>; + + SparseMatrixAtomic(std::size_t size0, std::size_t size1, + double fill_fraction = 0.025) + : size0_(size0), size1_(size1), + data_(std::max( + static_cast(fill_fraction * static_cast(size0 * size1)), + 16)) {} + + std::atomic &operator() (std::size_t idx0, std::size_t idx1) { + const std::size_t i = globalidx(idx0, idx1); + auto res = data_.emplace(i); + return *res.first; + } + + const std::atomic &operator() (std::size_t idx0, std::size_t idx1) const { + const std::size_t i = globalidx(idx0, idx1); + auto* p = data_.find(i); + return *p; + } + + void fetch_add(std::size_t idx0, std::size_t idx1, double val) { + if (val != 0.) { + auto &elemval = operator()(idx0, idx1); + elemval.fetch_add(val); + } + } + + SparseMatrixIndexValues index_values() const { + SparseMatrixIndexValues res; + res.reserve(data_.size()); + + data_.for_each([&](std::size_t key, const std::atomic& val) { + auto is = idxs(key); + res.emplace_back(is[0], is[1], val.load()); + }); + + return res; + } + + void clear() { data_.clear(); } + + void reserve(std::size_t /*i*/) { /* no-op: map grows on demand */ } + + std::size_t dense_size() const { return size0_*size1_; } + + map_type &data() { return data_; } + +private: + std::size_t globalidx(std::size_t idx0, std::size_t idx1) const { + return idx0*size0_ + idx1; + } + + std::array idxs(std::size_t globalidx) const { + const std::size_t idx0 = globalidx/size0_; + const std::size_t idx1 = globalidx % size0_; + + return std::array{idx0, idx1}; + } + + const std::size_t size0_; + const std::size_t size1_; + map_type data_; + +}; \ No newline at end of file diff --git a/narf/include/rdfutils.hpp b/narf/include/rdfutils.hpp index eaddbbb..41c195c 100755 --- a/narf/include/rdfutils.hpp +++ b/narf/include/rdfutils.hpp @@ -1,5 +1,8 @@ #pragma once +#include "tensorevalutils.hpp" +#include "utils.hpp" + namespace narf { template @@ -8,15 +11,113 @@ namespace narf { private: Callable callable_; - using return_t = decltype(callable_(std::declval()...)); + using return_t = std::decay_t()...))>; public: DefineWrapper(const Callable &callable) : callable_(callable) {} DefineWrapper(Callable &&callable) : callable_(std::move(callable)) {} - return_t operator() (const Args&... args) const { + return_t operator() (const Args&... args) { return callable_(args...); } }; + + template + class MapWrapper { + + private: + Callable callable_; + public: + + template + requires std::is_constructible_v + explicit MapWrapper(CArgs&&... cargs) : callable_(std::forward(cargs)...) {} + + template + auto operator() (const Args&... args) { + if constexpr ((is_container || ...)) { + auto apply_elem = [this](auto const &elem_tuple) { + return std::apply(callable_, elem_tuple); + }; + auto const res_view = make_zip_view(make_view(args)...) | std::views::transform(apply_elem); + return range_to(res_view); + } else { + return callable_(args...); + } + } + }; + + // TensorMapWrapper: applies the wrapped callable element-wise over + // Eigen tensor arguments. Exactly one argument may be a concrete + // Eigen tensor; all other arguments are broadcast (passed through + // unchanged to every element call). The result is a tensor of the + // same shape whose scalar type is the callable's return type. + // When no tensor argument is present, calls through directly. + template + class TensorMapWrapper { + + private: + Callable callable_; + + // View adapter: tensor args produce a finite view over their + // elements; everything else repeats (same pattern as MapWrapper's + // make_view, but keyed on is_concrete_tensor_v). + template + static auto make_tensor_view(const T &x) { + if constexpr (is_concrete_tensor_v) { + return std::views::counted(x.data(), x.size()); + } else { + return make_repeat_view(x); + } + } + + template + static consteval std::size_t find_tensor_idx() { + constexpr bool flags[] = {is_concrete_tensor_v...}; + for (std::size_t i = 0; i < sizeof...(Args); ++i) + if (flags[i]) return i; + return sizeof...(Args); + } + + public: + + template + requires std::is_constructible_v + explicit TensorMapWrapper(CArgs&&... cargs) + : callable_(std::forward(cargs)...) {} + + template + auto operator() (const Args&... args) { + if constexpr ((is_concrete_tensor_v || ...)) { + auto apply_elem = [this](auto const &elem_tuple) { + return std::apply(callable_, elem_tuple); + }; + auto const res_view = + make_zip_view(make_tensor_view(args)...) + | std::views::transform(apply_elem); + + // Deduce output tensor type from the input tensor's + // shape and the callable's return type. + constexpr std::size_t TIdx = find_tensor_idx(); + auto const &tensor_arg = + std::get(std::tie(args...)); + using in_tensor_t = + std::remove_cvref_t; + using out_scalar_t = + std::decay_t; + + Eigen::TensorFixedSize + result; + auto it = res_view.begin(); + for (Eigen::Index k = 0; k < result.size(); ++k, ++it) + result.data()[k] = *it; + return result; + } else { + return callable_(args...); + } + } + }; + } diff --git a/narf/include/sparse_histogram.hpp b/narf/include/sparse_histogram.hpp new file mode 100644 index 0000000..a2e5d20 --- /dev/null +++ b/narf/include/sparse_histogram.hpp @@ -0,0 +1,202 @@ +#ifndef NARF_SPARSE_HISTOGRAM_HPP +#define NARF_SPARSE_HISTOGRAM_HPP + +#include +#include +#include +#include +#include +#include + +#include +#include + +#include "atomic_adaptor.hpp" +#include "concurrent_flat_map.hpp" + +namespace narf { + +// A boost::histogram-compatible Storage backed by narf::concurrent_flat_map. +// +// Bins are addressed by their linearized index (the same scheme boost +// histogram itself uses for dense storages). Only bins that have been touched +// by a fill consume memory; the underlying lock-free map allows concurrent +// fills from many threads. +// +// Iteration via the standard begin()/end() interface walks every virtual bin +// (including never-touched bins, which materialize on access via the map's +// emplace path). For sparse traversals prefer iterating data() directly with +// for_each. +// +// Conversion of histograms using this storage to python hist.Hist objects is +// not supported in the current implementation. +template > +class concurrent_sparse_storage { +public: + using value_type = T; + using reference = T&; + using const_reference = const T&; + using map_type = concurrent_flat_map; + + static constexpr bool has_threading_support = true; + + concurrent_sparse_storage() = default; + explicit concurrent_sparse_storage(double fill_fraction) + : fill_fraction_(fill_fraction) {} + concurrent_sparse_storage(concurrent_sparse_storage&&) = default; + concurrent_sparse_storage& operator=(concurrent_sparse_storage&&) = default; + concurrent_sparse_storage(const concurrent_sparse_storage&) = delete; + concurrent_sparse_storage& operator=(const concurrent_sparse_storage&) = delete; + + // boost::histogram::histogram calls reset() with the total number of bins + // (including under/overflow) on construction and after axis growth. + // The map is sized to fill_fraction * n to avoid most early expansions + // when an estimate of occupancy is supplied by the caller. + void reset(std::size_t n) { + size_ = n; + const double cap_d = fill_fraction_ * static_cast(n); + std::size_t cap = cap_d > 1.0 ? static_cast(cap_d) : 1; + map_ = map_type{cap}; + } + + double fill_fraction() const noexcept { return fill_fraction_; } + + std::size_t size() const noexcept { return size_; } + + // Insert-on-access; safe for concurrent fills. + reference operator[](std::size_t i) { + return *map_.emplace(i).first; + } + + const_reference operator[](std::size_t i) const { + if (auto* p = map_.find(i)) return *p; + // Materialize a default-constructed cell so the const overload still + // returns a stable reference. This matches the dense_storage contract + // that "every bin index in [0, size()) is addressable". + return *const_cast(map_).emplace(i).first; + } + + // Required by boost::histogram::storage concept, but not used by our fill + // path. Two sparse storages compare equal iff they have the same logical + // size and identical populated entries. + bool operator==(const concurrent_sparse_storage& other) const { + if (size_ != other.size_) return false; + bool eq = true; + map_.for_each([&](std::size_t k, const T& v) { + if (!eq) return; + auto* p = const_cast(other.map_).find(k); + if (!p || !(*p == v)) eq = false; + }); + return eq; + } + + // Random-access iterator over the full virtual bin range. Dereferencing + // materializes the bin (via operator[]). boost::histogram's fill path + // requires random-access semantics so it can do `begin() + idx`. + class iterator { + public: + using iterator_category = std::random_access_iterator_tag; + using value_type = T; + using reference = T&; + using pointer = T*; + using difference_type = std::ptrdiff_t; + + iterator() = default; + iterator(concurrent_sparse_storage* s, std::size_t i) : s_(s), i_(i) {} + + reference operator*() const { return (*s_)[i_]; } + reference operator[](difference_type n) const { + return (*s_)[i_ + static_cast(n)]; + } + + iterator& operator++() { ++i_; return *this; } + iterator operator++(int) { auto t = *this; ++i_; return t; } + iterator& operator--() { --i_; return *this; } + iterator operator--(int) { auto t = *this; --i_; return t; } + + iterator& operator+=(difference_type n) { + i_ = static_cast(static_cast(i_) + n); + return *this; + } + iterator& operator-=(difference_type n) { return *this += -n; } + iterator operator+(difference_type n) const { auto t = *this; t += n; return t; } + iterator operator-(difference_type n) const { auto t = *this; t -= n; return t; } + friend iterator operator+(difference_type n, iterator it) { return it + n; } + difference_type operator-(const iterator& o) const { + return static_cast(i_) - static_cast(o.i_); + } + + bool operator==(const iterator& o) const { return i_ == o.i_; } + bool operator!=(const iterator& o) const { return i_ != o.i_; } + bool operator< (const iterator& o) const { return i_ < o.i_; } + bool operator<=(const iterator& o) const { return i_ <= o.i_; } + bool operator> (const iterator& o) const { return i_ > o.i_; } + bool operator>=(const iterator& o) const { return i_ >= o.i_; } + + private: + concurrent_sparse_storage* s_ = nullptr; + std::size_t i_ = 0; + }; + using const_iterator = iterator; + + iterator begin() { return iterator(this, 0); } + iterator end() { return iterator(this, size_); } + const_iterator begin() const { + return iterator(const_cast(this), 0); + } + const_iterator end() const { + return iterator(const_cast(this), size_); + } + + map_type& data() noexcept { return map_; } + const map_type& data() const noexcept { return map_; } + +private: + double fill_fraction_ = 1.0; + std::size_t size_ = 0; + map_type map_; +}; + +// Convenience factory: builds a boost::histogram::histogram with the +// concurrent sparse storage and the supplied axes. +template +boost::histogram::histogram...>, + concurrent_sparse_storage> +make_histogram_sparse(double fill_fraction, Axes&&... axes) { + return boost::histogram::make_histogram_with( + concurrent_sparse_storage{fill_fraction}, + std::forward(axes)...); +} + +// Helpers to inspect the underlying concurrent_flat_map of a sparse-storage +// boost histogram. Free functions because cppyy does not expose +// boost::histogram::histogram::storage_ directly. +template +typename concurrent_sparse_storage::map_type& +sparse_histogram_data( + boost::histogram::histogram>& h) { + return boost::histogram::unsafe_access::storage(h).data(); +} + +// Snapshot the populated bins of a sparse-storage boost histogram into a +// vector of (linearized_index, value) pairs. Convenience for python +// inspection where passing a python callable to for_each is awkward. +template +std::vector> sparse_histogram_snapshot( + boost::histogram::histogram>& h) { + std::vector> out; + auto& m = boost::histogram::unsafe_access::storage(h).data(); + out.reserve(m.size()); + m.for_each([&](std::size_t k, const T& v) { + if constexpr (requires { v.load(); }) { + out.emplace_back(k, static_cast(v.load())); + } else { + out.emplace_back(k, static_cast(v)); + } + }); + return out; +} + +} // namespace narf + +#endif // NARF_SPARSE_HISTOGRAM_HPP diff --git a/narf/include/tests.hpp b/narf/include/tests.hpp index 36b99d6..89340ab 100644 --- a/narf/include/tests.hpp +++ b/narf/include/tests.hpp @@ -1,5 +1,16 @@ #pragma once +#include "concurrent_flat_map.hpp" +#include "histutils.hpp" +#include "matrix_utils.hpp" +#include "rdfutils.hpp" + +#include +#include +#include +#include +#include + namespace narf { ROOT::VecOps::RVec testshift() { boost::histogram::axis::regular a(100, 0., 1.); @@ -35,18 +46,272 @@ namespace narf { void testshiftrw() { std::vector a{0, 1, 2}; std::vector b{3, 4, 5}; - + std::cout << a.front() << std::endl; std::cout << b.front() << std::endl; - + for (auto const &[ael, bel] : make_zip_view(a, b)) { ael = 0; bel = 1; } - + std::cout << a.front() << std::endl; std::cout << b.front() << std::endl; - - + + + } + + // Test SymMatrixAtomic: construction, fetch_add, fill_row, and index symmetry. + // Returns true if all checks pass. + bool testSymMatrixAtomic() { + const std::size_t n = 4; + SymMatrixAtomic mat(n); + + // Fill upper triangle: mat[i][j] = (i+1)*(j+1) for i <= j + for (std::size_t i = 0; i < n; ++i) { + for (std::size_t j = i; j < n; ++j) { + mat.fetch_add(i, j, double((i + 1) * (j + 1))); + } + } + + // Verify via fill_row: rowData[j] == (i+1)*(j+1) for j >= i, else 0 + std::vector rowData(n); + for (std::size_t i = 0; i < n; ++i) { + mat.fill_row(i, rowData.data()); + for (std::size_t j = 0; j < i; ++j) { + if (rowData[j] != 0.0) return false; + } + for (std::size_t j = i; j < n; ++j) { + if (rowData[j] != double((i + 1) * (j + 1))) return false; + } + } + + // Verify symmetry: fetch_add(i,j) and fetch_add(j,i) address the same element + SymMatrixAtomic mat2(n); + mat2.fetch_add(1, 3, 5.0); // upper triangle (1,3) + mat2.fetch_add(3, 1, 3.0); // lower triangle: should map to same element + mat2.fill_row(1, rowData.data()); + if (rowData[3] != 8.0) return false; + + return true; + } + + // Test SparseMatrixAtomic: single-threaded fetch_add, index_values round-trip, + // clear, and multi-threaded concurrent fetch_add. Returns true on success. + bool testSparseMatrixAtomic() { + // ---- single-threaded ---- + { + SparseMatrixAtomic mat(20, 20); + mat.fetch_add(1, 2, 3.0); + mat.fetch_add(1, 2, 4.0); + mat.fetch_add(5, 7, 1.5); + mat.fetch_add(0, 0, 2.0); + mat.fetch_add(0, 0, 0.0); // no-op + if (mat(1, 2).load() != 7.0) return false; + if (mat(5, 7).load() != 1.5) return false; + if (mat(0, 0).load() != 2.0) return false; + + auto iv = mat.index_values(); + if (iv.size() != 3) return false; + double sum = 0.0; + for (std::size_t k = 0; k < iv.size(); ++k) sum += iv.vals()[k]; + if (sum != 10.5) return false; + + mat.clear(); + if (mat.index_values().size() != 0) return false; + // reuse after clear + mat.fetch_add(3, 4, 9.0); + if (mat(3, 4).load() != 9.0) return false; + if (mat.index_values().size() != 1) return false; + } + + // ---- multi-threaded fetch_add: each (i,j) cell receives a known total ---- + { + const std::size_t N = 32; + SparseMatrixAtomic mat(N, N); + const unsigned T = 8; + const unsigned reps = 500; + std::vector threads; + threads.reserve(T); + for (unsigned t = 0; t < T; ++t) { + threads.emplace_back([&, t] { + for (unsigned r = 0; r < reps; ++r) { + for (std::size_t i = 0; i < N; ++i) { + // Use a sparse pattern: only ~half the columns + std::size_t j = (i * 3 + 1) % N; + mat.fetch_add(i, j, 1.0 + 0.01 * t); + } + } + }); + } + for (auto& th : threads) th.join(); + + double per_cell_expected = 0.0; + for (unsigned t = 0; t < T; ++t) per_cell_expected += reps * (1.0 + 0.01 * t); + + auto iv = mat.index_values(); + if (iv.size() != N) return false; + for (std::size_t k = 0; k < iv.size(); ++k) { + std::size_t i = iv.idxs0()[k]; + std::size_t j = iv.idxs1()[k]; + if (j != (i * 3 + 1) % N) return false; + if (std::abs(iv.vals()[k] - per_cell_expected) > 1e-9) return false; + } + } + + return true; + } + + // Test concurrent_flat_map: single-threaded correctness, expansion, and + // multi-threaded concurrent insert / find. Returns true on success. + bool testConcurrentFlatMap() { + // ---- single-threaded correctness, force several expansions ---- + { + concurrent_flat_map map(8); + const std::uint64_t N = 5000; + for (std::uint64_t i = 1; i <= N; ++i) { + auto [p, inserted] = map.emplace(i, i * 7u + 3u); + if (!inserted || !p || *p != i * 7u + 3u) return false; + } + // re-insert: must report not-inserted but return existing value + for (std::uint64_t i = 1; i <= N; ++i) { + auto [p, inserted] = map.emplace(i, std::uint64_t(0)); + if (inserted || !p || *p != i * 7u + 3u) return false; + } + // find every key + for (std::uint64_t i = 1; i <= N; ++i) { + auto* p = map.find(i); + if (!p || *p != i * 7u + 3u) return false; + } + // missing keys + if (map.find(0) != nullptr) return false; + if (map.find(N + 1) != nullptr) return false; + } + + // ---- pointer stability across expansion ---- + { + concurrent_flat_map map(4); + std::vector ptrs; + const std::uint64_t N = 1000; + ptrs.reserve(N); + for (std::uint64_t i = 1; i <= N; ++i) { + ptrs.push_back(map.emplace(i, i).first); + } + for (std::uint64_t i = 1; i <= N; ++i) { + if (ptrs[i - 1] != map.find(i)) return false; + if (*ptrs[i - 1] != i) return false; + } + } + + // ---- multi-threaded insert / find ---- + { + concurrent_flat_map map(16); + const unsigned T = 8; + const std::uint64_t per = 4000; + std::atomic dup_inserts{0}; + std::atomic bad{0}; + std::vector threads; + threads.reserve(T); + for (unsigned t = 0; t < T; ++t) { + threads.emplace_back([&, t] { + for (std::uint64_t i = 0; i < per; ++i) { + // Overlapping key ranges across threads exercise contention. + std::uint64_t key = (i % (per / 2)) + 1 + (t % 2) * (per / 2); + std::uint64_t val = key * 1315423911u; + auto [p, ins] = map.emplace(key, val); + if (!p || *p != val) bad.fetch_add(1); + (void)ins; + auto* f = map.find(key); + if (!f || *f != val) bad.fetch_add(1); + } + // Each thread also inserts a unique key block. + for (std::uint64_t i = 0; i < per; ++i) { + std::uint64_t key = 1000000ull + t * per + i; + auto [p, ins] = map.emplace(key, key ^ 0xdeadbeefu); + if (!ins || !p || *p != (key ^ 0xdeadbeefu)) { + dup_inserts.fetch_add(1); + } + } + }); + } + for (auto& th : threads) th.join(); + if (bad.load() != 0) return false; + if (dup_inserts.load() != 0) return false; + + // Verify all unique-block keys present and correct. + for (unsigned t = 0; t < T; ++t) { + for (std::uint64_t i = 0; i < per; ++i) { + std::uint64_t key = 1000000ull + t * per + i; + auto* p = map.find(key); + if (!p || *p != (key ^ 0xdeadbeefu)) return false; + } + } + } + + return true; + } + + // Test QuantileHelperStatic: edges {0.25, 0.5, 0.75} partition into 4 bins. + // Exercise both scalar and container (RVec) call paths via MapWrapper. + bool testQuantileHelperStatic() { + QuantileHelperStatic<4> helper(std::array{0.25, 0.5, 0.75, 1.0}); + + if (helper(0.1) != 0) return false; + if (helper(0.25) != 1) return false; + if (helper(0.4) != 1) return false; + if (helper(0.6) != 2) return false; + if (helper(0.9) != 3) return false; + + ROOT::VecOps::RVec vals{0.1, 0.4, 0.6, 0.9}; + auto res = helper(vals); + if (res.size() != 4) return false; + if (res[0] != 0 || res[1] != 1 || res[2] != 2 || res[3] != 3) return false; + + // Continuous mode: CDF in [0, 1), edges[k] -> (k+1)/N. + // With 4 edges {0.25, 0.5, 0.75, 1.0}, edges[0]->1/4, edges[1]->2/4, etc. + QuantileHelperStaticContinuous<4> helper_c(std::array{0.25, 0.5, 0.75, 1.0}); + auto const eps = 1e-9; + auto const max_cdf = std::nextafter(1.0, 0.0); + if (std::abs(helper_c(0.25) - 0.25) > eps) return false; + if (std::abs(helper_c(0.5) - 0.5) > eps) return false; + if (helper_c(1.0) != max_cdf) return false; + // val below first edge clamps to 0 + if (std::abs(helper_c(0.0) - 0.0) > eps) return false; + // val above last edge clamps to max_cdf + if (helper_c(2.0) != max_cdf) return false; + // midpoint of [edges[0], edges[1]]: CDF = (0 + 1 + 0.5) / 4 = 0.375 + if (std::abs(helper_c(0.375) - 0.375) > eps) return false; + + // Container path through MapWrapper + auto res_c = helper_c(ROOT::VecOps::RVec{0.25, 0.5, 0.75, 1.0}); + if (res_c.size() != 4) return false; + if (std::abs(res_c[0] - 0.25) > eps) return false; + if (std::abs(res_c[1] - 0.5) > eps) return false; + if (std::abs(res_c[2] - 0.75) > eps) return false; + if (res_c[3] != max_cdf) return false; + + return true; + } + + // Test MapWrapper: constructs a wrapper over a simple callable and applies + // it over zipped input ranges. + bool testMapWrapper() { + auto add = [](int a, int b) { return a + b; }; + MapWrapper wrapper(add); + + // Scalar passthrough: no container args -> callable invoked directly. + auto add_scalar = [](int x, int y) { return x + y; }; + MapWrapper scalar_wrapper(add_scalar); + if (scalar_wrapper(2, 5) != 7) return false; + + std::vector a{1, 2, 3, 4}; + std::vector b{10, 20, 30, 40}; + auto res = wrapper(a, b); + if (res.size() != 4) return false; + if (res[0] != 11) return false; + if (res[1] != 22) return false; + if (res[2] != 33) return false; + if (res[3] != 44) return false; + return true; } } diff --git a/narf/include/utils.hpp b/narf/include/utils.hpp index e5eaf3b..fc99960 100755 --- a/narf/include/utils.hpp +++ b/narf/include/utils.hpp @@ -1,5 +1,7 @@ #pragma once +#include + namespace narf { template @@ -134,8 +136,8 @@ namespace narf { | std::views::transform([](auto const &it){ return *it; }); } - template