Source code for soliket.gaussian.gaussian_data

import json
from collections.abc import Sequence
from typing import Optional

import numpy as np
import sacc
from cobaya.functions import chi_squared


[docs] class GaussianData: """Container for named multivariate Gaussian data. Stores a data vector with its covariance matrix and provides methods for computing the Gaussian log-likelihood. Parameters ---------- name : str Name identifier for the data x : Sequence Labels or coordinates for each data point (e.g., ell values) y : Sequence[float] The data vector values cov : np.ndarray Covariance matrix with shape (n, n) where n = len(x) ncovsims : int, optional Number of simulations used to estimate covariance. If provided, applies the Hartlap correction factor to the inverse covariance. indices : np.ndarray, optional Boolean array for trimming cross-covariances when scale cuts are applied Attributes ---------- inv_cov : np.ndarray Inverse covariance matrix (with Hartlap correction if applicable) norm_const : float Normalization constant for the Gaussian likelihood Raises ------ ValueError If dimensions of x, y, and cov are incompatible If covariance matrix has non-positive determinant """ name: str # name identifier for the data x: Sequence # labels for each data point y: np.ndarray # data point values cov: np.ndarray # covariance matrix inv_cov: np.ndarray # inverse covariance matrix ncovsims: int | None # number of simulations used to estimate covariance indices: np.ndarray | None # boolean array to trim cross-cov with selected bandpowers _fast_chi_squared = staticmethod(chi_squared) def __init__( self, name, x: Sequence, y: Sequence[float], cov: np.ndarray, ncovsims: int | None = None, indices: np.ndarray | None = None, ): self.name = str(name) self.ncovsims = ncovsims self.indices = ( indices if indices is not None and not all(indices) else np.ones(len(x), dtype=bool) ) if not (len(x) == len(y) and cov.shape == (len(x), len(x))): raise ValueError( f"Incompatible shapes! x={len(x)}, y={len(y)}, \ cov={cov.shape}" ) self.x: Sequence[float] = x self.y: np.ndarray = np.ascontiguousarray(y) self.cov: np.ndarray = cov # self.eigenevalues = np.linalg.eigvalsh(cov) # if self.eigenevalues.min() <= 0: # print(self.eigenevalues) # raise ValueError("Covariance is not positive definite!") self.inv_cov: np.ndarray = np.linalg.inv(self.cov) if ncovsims is not None: hartlap_factor = (self.ncovsims - len(x) - 2) / (self.ncovsims - 1) self.inv_cov *= hartlap_factor # log_det = np.log(self.eigenevalues).sum() sign_log_det, log_det = np.linalg.slogdet(self.cov) if sign_log_det != 1: raise ValueError( f"Negative or zero determinant: \ sign(det)={sign_log_det}" ) self.norm_const = -(np.log(2 * np.pi) * len(x) + log_det) / 2 def __len__(self) -> int: return len(self.x)
[docs] def loglike(self, theory: np.ndarray) -> float: """Compute the Gaussian log-likelihood. Parameters ---------- theory : np.ndarray Theory prediction vector with same length as data Returns ------- float Log-likelihood value including normalization constant """ delta = self.y - theory return -0.5 * self._fast_chi_squared(self.inv_cov, delta) + self.norm_const
[docs] class CrossCov(dict): """Cross-covariance container for multi-component Gaussian likelihoods. Stores cross-covariances between named components (e.g., "mflike", "lensing") and optionally the full covariances for each component. Supports saving and loading in SACC format for persistence. The dictionary keys are tuples of component names, e.g., ("mflike", "lensing"). Values are the corresponding covariance matrices. For the full joint covariance, use: - Diagonal blocks: ``(name, name)`` -> auto-covariance matrix - Off-diagonal blocks: ``(name1, name2)`` -> cross-covariance matrix Examples -------- **Mode 1: Full covariance specification** Use ``add_component()`` for auto-covariances and ``add_cross_covariance()`` for off-diagonal blocks:: cross_cov = CrossCov() cross_cov.add_component("mflike", mflike_cov) cross_cov.add_component("lensing", lensing_cov) cross_cov.add_cross_covariance("mflike", "lensing", cross_block) cross_cov.save("cross_cov.fits") **Mode 2: Cross-covariance only** If auto-covariances will come from individual likelihoods:: cross_cov = CrossCov() cross_cov.add_cross_covariance("mflike", "lensing", cross_block) cross_cov.save("cross_cov.fits") **Loading**:: cross_cov = CrossCov.load("cross_cov.fits") block = cross_cov[("mflike", "lensing")] """ def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self._metadata = {} self._component_info: dict[str, dict] = {}
[docs] def add_component( self, name: str, cov: np.ndarray, ): """Add a component with its full covariance. Parameters ---------- name : str Component name (e.g., "mflike", "kk") cov : np.ndarray Full covariance matrix for this component """ if isinstance(cov, dict): raise TypeError( f"cov must be a numpy array, not a dict. Got: {type(cov)}" ) cov_array = np.asarray(cov) self._component_info[name] = { "size": cov_array.shape[0], "cov": cov_array, } # Also store the auto-covariance in the dict self[(name, name)] = cov_array
[docs] def add_cross_covariance( self, name1: str, name2: str, cross_cov: np.ndarray, ): """Add cross-covariance between two components. Parameters ---------- name1 : str First component name name2 : str Second component name cross_cov : np.ndarray Cross-covariance matrix with shape (n1, n2) """ self[(name1, name2)] = np.asarray(cross_cov) # Also store the transpose for convenience self[(name2, name1)] = np.asarray(cross_cov).T
@property def component_names(self) -> list[str]: """Get ordered list of component names.""" return list(self._component_info.keys()) def _infer_component_info(self): """Infer component sizes from stored covariance blocks. This is called when save() is invoked without explicit add_component() calls. Sizes are inferred from the shapes of cross-covariance matrices. """ sizes: dict[str, int] = {} for key, cov in self.items(): name1, name2 = key n1, n2 = cov.shape if name1 in sizes: if sizes[name1] != n1: raise ValueError( f"Inconsistent sizes for component '{name1}': " f"{sizes[name1]} vs {n1}" ) else: sizes[name1] = n1 if name2 in sizes: if sizes[name2] != n2: raise ValueError( f"Inconsistent sizes for component '{name2}': " f"{sizes[name2]} vs {n2}" ) else: sizes[name2] = n2 # Populate _component_info with inferred sizes for name, size in sizes.items(): self._component_info[name] = { "size": size, "cov": self.get((name, name)), # May be None if only cross-covs }
[docs] def save(self, path: str): """Save cross-covariance to SACC format. The SACC file will contain: - A misc tracer for each component - Dummy data points to establish the data vector structure - The full joint covariance matrix Parameters ---------- path : str Output path (must end with .fits or .sacc) """ if not path.endswith((".fits", ".sacc")): raise ValueError("Only .fits or .sacc files are supported!") # If no components were added explicitly, infer sizes from cross-covariances if not self._component_info: self._infer_component_info() cross_sacc = sacc.Sacc() # Add metadata about component order (as JSON string for FITS compatibility) cross_sacc.metadata["component_names"] = json.dumps(self.component_names) # Add a misc tracer for each component for name in self.component_names: cross_sacc.add_tracer("misc", name, quantity="generic", spin=0) # Add dummy data points to establish SACC structure # (SACC requires data points before covariance can be added) # The actual data values are not meaningful - only the covariance matters for name in self.component_names: n_points = self._component_info[name]["size"] for i in range(n_points): cross_sacc.add_data_point( "generic", (name, name), 0.0, ell=float(i) ) # Build and add the full joint covariance matrix full_cov = self._build_full_covariance() cross_sacc.add_covariance(full_cov) cross_sacc.save_fits(path, overwrite=True)
def _build_full_covariance(self) -> np.ndarray: """Build the full joint covariance matrix from stored blocks.""" names = self.component_names sizes = [self._component_info[name]["size"] for name in names] total_size = sum(sizes) full_cov = np.zeros((total_size, total_size)) # Fill in blocks row_start = 0 for i, name_i in enumerate(names): col_start = 0 for j, name_j in enumerate(names): key = (name_i, name_j) if key in self: block = np.asarray(self[key]) full_cov[ row_start : row_start + sizes[i], col_start : col_start + sizes[j], ] = block col_start += sizes[j] row_start += sizes[i] return full_cov
[docs] @classmethod def load(cls, path: str | None) -> Optional["CrossCov"]: """Load cross-covariance from SACC format. Parameters ---------- path : str or None Path to SACC file. If None, returns None. Returns ------- CrossCov or None Loaded cross-covariance object, or None if path is None. """ if path is None: return None if not path.endswith((".fits", ".sacc")): raise ValueError("Only .fits or .sacc files are supported!") cross_sacc = sacc.Sacc.load_fits(path) cross_cov = cls() # Get component names from metadata or infer from tracers if "component_names" in cross_sacc.metadata: # Parse JSON string back to list component_names = json.loads(cross_sacc.metadata["component_names"]) else: # Infer from tracer names component_names = list(cross_sacc.tracers.keys()) # Extract indices for each component component_indices = {} for name in component_names: indices = cross_sacc.indices(tracers=(name, name)) component_indices[name] = indices # Store component info (cov will be extracted below) cross_cov._component_info[name] = { "size": len(indices), "cov": None, # Will be filled below } # Extract covariance blocks if cross_sacc.covariance is not None: full_cov = cross_sacc.covariance.covmat for name_i in component_names: indices_i = component_indices[name_i] for name_j in component_names: indices_j = component_indices[name_j] # Extract the covariance block cov_block = full_cov[np.ix_(indices_i, indices_j)] cross_cov[(name_i, name_j)] = cov_block # Update auto-covariance in component_info if name_i == name_j: cross_cov._component_info[name_i]["cov"] = cov_block return cross_cov
# Keep old methods for backwards compatibility with existing metadata approach
[docs] def add_metadata( self, key: tuple[str], tracers: tuple[tuple[str]], data_types: tuple[str], tracer_info: dict[str, dict[str, str | int]] = None, ): """Store metadata for cross-covariance entries (legacy method). Parameters ---------- key : tuple[str] Component identifier key tracers : tuple[tuple[str]] Tracer pairs for each component data_types : tuple[str] Data types (e.g., "cl_00", "cl_22") tracer_info : dict[str, dict[str, str | int]] Dictionary mapping tracer names to their properties """ self._metadata[key] = { "tracers": tracers, "data_types": data_types, "tracer_info": tracer_info or {}, }
[docs] class MultiGaussianData(GaussianData): """Combined Gaussian data from multiple components with cross-covariances. Assembles multiple ``GaussianData`` objects into a single joint data vector with a combined covariance matrix that includes both auto-covariances and cross-covariances between components. Parameters ---------- data_list : list of GaussianData Individual data objects to combine cross_covs : CrossCov, optional Cross-covariance container. If None, components are assumed independent. Auto-covariances can come from either the CrossCov or the individual GaussianData objects (individual data takes precedence if CrossCov doesn't contain auto-covariance for a component). Attributes ---------- data_list : list of GaussianData The original individual data objects names : list of str Names of all components lengths : list of int Data vector lengths for each component labels : list of str Component name for each element in the combined data vector Examples -------- Combining two datasets with cross-covariance:: data1 = GaussianData("mflike", x1, y1, cov1) data2 = GaussianData("lensing", x2, y2, cov2) cross_cov = CrossCov() cross_cov.add_cross_covariance("mflike", "lensing", cross_block) multi_data = MultiGaussianData([data1, data2], cross_cov) # Access combined properties print(multi_data.cov.shape) # (n1 + n2, n1 + n2) loglike = multi_data.loglike(theory_vector) """ def __init__( self, data_list: list[GaussianData], cross_covs: CrossCov | None = None, ): if cross_covs is None: cross_covs = CrossCov() self.cross_covs = {} # Build covariance blocks: use CrossCov if available, otherwise use defaults for d1 in data_list: for d2 in data_list: key = (d1.name, d2.name) rev_key = (d2.name, d1.name) if d1 == d2: # Auto-covariance: prefer CrossCov, fallback to individual likelihood if key in cross_covs: cov_block = cross_covs[key] # Only trim if not already the right size if cov_block.shape == (len(d1), len(d1)): self.cross_covs[key] = cov_block else: self.cross_covs[key] = cov_block[d1.indices, :][:, d1.indices] else: # Fallback to individual likelihood's covariance self.cross_covs[key] = d1.cov continue # Cross-covariance: use CrossCov if available, otherwise zeros if key not in cross_covs and rev_key not in cross_covs: self.cross_covs[key] = np.zeros((len(d1), len(d2))) elif key in cross_covs: cov_block = cross_covs[key] # Only trim if not already the right size if cov_block.shape == (len(d1), len(d2)): self.cross_covs[key] = cov_block else: self.cross_covs[key] = cov_block[d1.indices, :][:, d2.indices] if self.cross_covs[key].shape != (len(d1), len(d2)): raise ValueError( f"Cross-covariance (for {d1.name} x {d2.name}) " f"has wrong shape: {self.cross_covs[key].shape} " f"instead of {len(d1)} x {len(d2)}!" ) self.cross_covs[rev_key] = self.cross_covs[key].T self.data_list: list[GaussianData] = data_list self.lengths: list[int] = [len(d) for d in data_list] self.names: list[str] = [d.name for d in data_list] self._data: np.ndarray | None = None @property def data(self) -> GaussianData: if self._data is None: self._assemble_data() return self._data
[docs] def loglike(self, theory: np.ndarray) -> float: return self.data.loglike(theory)
@property def name(self) -> str: return self.data.name @property def inv_cov(self) -> np.ndarray: return self.data.inv_cov @property def cov(self) -> np.ndarray: return self.data.cov @property def norm_const(self) -> float: return self.data.norm_const @property def labels(self) -> list[str]: return [ x for y in [[name] * len(d) for name, d in zip(self.names, self.data_list)] for x in y ] def _index_range(self, name: str) -> tuple[int, int]: if name not in self.names: raise ValueError(f"{name} not in {self.names}!") i0 = 0 for n, length in zip(self.names, self.lengths): if n == name: i1 = i0 + length break i0 += length return i0, i1 def _slice(self, *names: str) -> slice: if isinstance(names, str): names = [names] return np.s_[tuple(slice(*self._index_range(n)) for n in names)] def _assemble_data(self): x = np.concatenate([d.x for d in self.data_list]) y = np.concatenate([d.y for d in self.data_list]) N = sum([len(d) for d in self.data_list]) cov = np.zeros((N, N)) for n1 in self.names: for n2 in self.names: cov[self._slice(n1, n2)] = self.cross_covs[(n1, n2)] self._data = GaussianData(" + ".join(self.names), x, y, cov) def plot_cov(self, **kwargs): import matplotlib.pyplot as plt labels = [ f"{label}: {value:.2f}" for label, value in zip(self.labels, self.data.x) ] x_indices = np.arange(len(labels) + 1) y_indices = np.arange(len(labels) + 1) _, ax = plt.subplots(figsize=(10, 8)) heatmap = ax.pcolormesh( x_indices, y_indices, self.cov, cmap="viridis", shading="auto" ) ax.set_xticks(x_indices[:-1] + 0.5) ax.set_yticks(y_indices[:-1] + 0.5) ax.set_xticklabels(labels, rotation=90) ax.set_yticklabels(labels) ax.invert_yaxis() plt.colorbar(heatmap, ax=ax) plt.show() return heatmap