from collections.abc import Sequence
import numpy as np
import sacc
from cobaya.input import get_default_info, merge_info
from cobaya.likelihood import Likelihood
from cobaya.log import LoggedError
from cobaya.theory import Provider, Theory
from cobaya.tools import recursive_update
from cobaya.typing import empty_dict
from soliket.gaussian.gaussian_data import CrossCov, GaussianData, MultiGaussianData
from soliket.utils import get_likelihood
[docs]
class GaussianLikelihood(Likelihood):
"""Base class for Gaussian likelihoods in SOLikeT.
This class provides the infrastructure for computing Gaussian log-likelihoods
from SACC data files. Subclasses must implement the ``_get_theory()`` method
to compute the theory prediction for the data vector.
Parameters
----------
name : str
Name identifier for the likelihood (default: "Gaussian")
datapath : str
Path to the SACC file containing data and covariance
use_spectra : str or list
Which spectra to use. Either "all" or a list of tracer pairs
like ``[("tracer1", "tracer2")]``
ncovsims : int, optional
Number of simulations used to estimate covariance. If provided,
applies the Hartlap correction factor to the inverse covariance.
Attributes
----------
data : GaussianData
The assembled Gaussian data object with covariance
sacc_data : sacc.Sacc
The loaded SACC data object
x : np.ndarray
The bin centers (ell values)
y : np.ndarray
The data vector
cov : np.ndarray
The covariance matrix
Examples
--------
To create a custom Gaussian likelihood::
class MyLikelihood(GaussianLikelihood):
name = "my_likelihood"
_allowable_tracers = ("cmb_temperature", "cmb_polarization")
def _get_theory(self, **params):
# Compute theory prediction
return theory_vector
"""
name: str = "Gaussian"
use_spectra: (
str | tuple[str, str] | list[tuple[str, str]] | list[list[str, str]] | None
) = None
datapath: str | None = None
sacc_data: sacc.Sacc | None = None
ncovsims: int | None = None
provider: Provider
_enforce_types: bool = True
_allowable_tracers: tuple[str] | None = None
def initialize(self):
self.log.info(f"Initialising {self.name}...")
self._check_use_spectra()
if self.datapath is None and self.sacc_data is None:
raise LoggedError(
self.log,
"You must provide either datapath or sacc_data!",
)
self.sacc_data = self._get_sacc_data()
if self._allowable_tracers is None:
raise LoggedError(
self.log,
"You must set _allowable_tracers in the subclass of GaussianLikelihood!",
)
self._check_tracers()
self.tracer_comb = self.sacc_data.get_tracer_combinations()[0]
self.data = self._get_gauss_data()
def _check_use_spectra(self):
if self.use_spectra is None:
raise LoggedError(self.log, "You must provide use_spectra!")
elif isinstance(self.use_spectra, str):
assert self.use_spectra == "all", "The only allowed string is 'all'!"
elif isinstance(self.use_spectra, tuple):
self.use_spectra = [self.use_spectra]
elif isinstance(self.use_spectra, list):
for item in self.use_spectra:
if isinstance(item, list):
self.use_spectra[self.use_spectra.index(item)] = tuple(item)
elif not isinstance(item, tuple) or len(item) != 2:
raise LoggedError(
self.log,
"Each item in `use_spectra` list must "
"be a tuple of two tracer names!",
)
def _get_sacc_data(self, **params_values):
if self.sacc_data is not None:
self.log.warning(
"You have provided sacc_data directly, so datapath will be ignored!"
)
else:
self.log.info(f"Loading data from {self.datapath}...")
sacc_data = sacc.Sacc.load_fits(self.datapath)
if self.use_spectra == "all":
pass
else:
for tracer_comb in sacc_data.get_tracer_combinations():
if tracer_comb not in self.use_spectra:
sacc_data.remove_selection(tracers=tracer_comb)
tracer_combs = sacc_data.get_tracer_combinations()
assert tracer_combs != [], "No tracer was found!"
return sacc_data
def _get_gauss_data(self, **params_values):
self.x = self._construct_ell_bins()
self.y = self.sacc_data.mean
self.cov = self.sacc_data.covariance.covmat
data = GaussianData(self.name, self.x, self.y, self.cov, self.ncovsims)
return data
def _check_tracers(self):
for tracer_comb in self.sacc_data.get_tracer_combinations():
assert len(tracer_comb) == 2, "Only auto- and cross-spectra are supported!"
for tracer in tracer_comb:
if self.sacc_data.tracers[tracer].quantity not in self._allowable_tracers:
raise LoggedError(
self.log,
(
f"You have tried to use a "
f"{self.sacc_data.tracers[tracer].quantity} tracer in "
f"{self.__class__.__name__}, which only allows "
f"{self._allowable_tracers}. Please check your "
"tracer selection in the ini file."
),
)
def _construct_ell_bins(self) -> np.ndarray:
ell_eff = []
for tracer_comb in self.sacc_data.get_tracer_combinations():
ind = self.sacc_data.indices(tracers=tracer_comb)
ell = np.array(self.sacc_data._get_tags_by_index(["ell"], ind)[0])
ell_eff.append(ell)
return np.concatenate(ell_eff)
def _get_data(self) -> tuple[np.ndarray, np.ndarray]:
return self.x, self.y
def _get_cov(self) -> np.ndarray:
return self.cov
def _get_bin_centers(self) -> np.ndarray:
return self.x
def _get_data_spectrum(self) -> np.ndarray:
return self.y
def get_binning(self, tracer_comb: tuple) -> tuple[np.ndarray, np.ndarray]:
bpw_idx = self.sacc_data.indices(data_type="cl_00", tracers=tracer_comb)
bpw = self.sacc_data.get_bandpower_windows(bpw_idx)
ells_theory = bpw.values
ells_theory = np.asarray(ells_theory, dtype=int)
w_bins = bpw.weight.T
return ells_theory, w_bins
def _get_theory(self, **kwargs) -> np.ndarray:
raise NotImplementedError
[docs]
def logp(self, **params_values) -> float:
theory = self._get_theory(**params_values)
return self.data.loglike(theory)
[docs]
class MultiGaussianLikelihood(GaussianLikelihood):
"""A likelihood combining multiple Gaussian likelihoods with cross-covariances.
This class enables joint analysis of multiple datasets by combining their
data vectors and covariance matrices. Cross-covariances between datasets
can be specified via a ``CrossCov`` object stored in SACC format.
Parameters
----------
components : list of str
List of likelihood class names to combine, e.g.,
``["soliket.mflike.MFLike", "soliket.lensing.LensingLikelihood"]``
options : list of dict
Configuration options for each component likelihood. Each dict should
contain at minimum ``datapath`` and any other required parameters.
cross_cov_path : str, optional
Path to a SACC file containing cross-covariances between components.
If not provided, components are assumed independent (zero cross-covariance).
Attributes
----------
likelihoods : list of Likelihood
The instantiated component likelihoods
cross_cov : CrossCov or None
The loaded cross-covariance container
data : MultiGaussianData
The combined data object with joint covariance
Examples
--------
YAML configuration::
likelihood:
soliket.MultiGaussianLikelihood:
components:
- soliket.mflike.MFLike
- soliket.lensing.LensingLikelihood
options:
- datapath: /path/to/mflike.fits
use_spectra: all
- datapath: /path/to/lensing.fits
cross_cov_path: /path/to/cross_cov.fits
Python usage::
from soliket import MultiGaussianLikelihood
info = {
"components": ["soliket.mflike.MFLike", "soliket.lensing.LensingLikelihood"],
"options": [
{"datapath": "mflike.fits", "use_spectra": "all"},
{"datapath": "lensing.fits"},
],
"cross_cov_path": "cross_cov.fits",
}
like = MultiGaussianLikelihood(info)
"""
components: Sequence | None = None
options: Sequence | None = None
cross_cov_path: str | None = None
def __init__(self, info=empty_dict, **kwargs):
if "components" in info:
self.likelihoods: list[Likelihood] = [
get_likelihood(*kv) for kv in zip(info["components"], info["options"])
]
default_info = self.get_defaults(input_options=info)
default_info.update(info)
default_info = self.get_modified_defaults(default_info, input_options=info)
super().__init__(info=default_info, **kwargs)
[docs]
@classmethod
def get_defaults(
cls, return_yaml=False, yaml_expand_defaults=True, input_options=empty_dict
):
default_info = merge_info(
*[
get_default_info(like, input_options=info)
for like, info in zip(
input_options["components"], input_options["options"]
)
]
)
return default_info
[docs]
@classmethod
def get_modified_defaults(cls, defaults, input_options=empty_dict):
return defaults
def initialize(self):
self.cross_cov: CrossCov | None = CrossCov.load(self.cross_cov_path)
data_list = [like._get_gauss_data() for like in self.likelihoods]
self.data = MultiGaussianData(data_list, self.cross_cov)
self.log.info("Initialized.")
[docs]
def initialize_with_provider(self, provider: Provider):
for like in self.likelihoods:
like.initialize_with_provider(provider)
super().initialize_with_provider(provider)
[docs]
def get_helper_theories(self) -> dict[str, Theory]: # pragma: no cover
helpers: dict[str, Theory] = {}
for like in self.likelihoods:
helpers.update(like.get_helper_theories())
return helpers
def _get_theory(self, **kwargs) -> np.ndarray:
return np.concatenate([like._get_theory(**kwargs) for like in self.likelihoods])
[docs]
def get_requirements(self): # pragma: no cover
# Reqs with arguments like 'lmax', etc. may have to be carefully treated here to
# merge
reqs = {}
for like in self.likelihoods:
new_reqs = like.get_requirements()
# Deal with special cases requiring careful merging
# Make sure the max of the lmax/union of Cls is taken.
# (should make a unit test for this)
if "Cl" in new_reqs and "Cl" in reqs:
new_cl_spec = new_reqs["Cl"]
old_cl_spec = reqs["Cl"]
merged_cl_spec = {}
all_keys = set(new_cl_spec.keys()).union(set(old_cl_spec.keys()))
for k in all_keys:
new_lmax = new_cl_spec.get(k, 0)
old_lmax = old_cl_spec.get(k, 0)
merged_cl_spec[k] = max(new_lmax, old_lmax)
new_reqs["Cl"] = merged_cl_spec
reqs = recursive_update(reqs, new_reqs)
return reqs