Source code for soliket.cosmopower.cosmopower

r"""
.. module:: soliket.cosmopower

:Synopsis: Simple CosmoPower theory wrapper for Cobaya.
:Author: Hidde T. Jense

.. |br| raw:: html

   <br />

.. note::

   **If you use this cosmological code, please cite:**
   |br|
   A. Spurio Mancini et al.
   *CosmoPower: emulating cosmological power spectra for accelerated Bayesian
   inference from next-generation surveys*
   (`arXiv:210603846 <https://arxiv.org/abs/2106.03846>`_)

   And remember to cite any sources for trained networks you use.

Usage
-----

After installing SOLikeT and cosmopower, you can use the ``CosmoPower`` theory
codes by adding the ``soliket.CosmoPower`` code as a block in your parameter
files.

Example: CMB emulators
----------------------

You can get the example CMB emulators from the
`cosmopower release repository
<https://github.com/alessiospuriomancini/cosmopower/tree/main/cosmopower/trained_models/CP_paper>`_.
After downloading these, you should have a directory structure like:

.. code-block:: bash

  /path/to/cosmopower/data
    ├── cmb_TT_NN.pkl
    ├── cmb_TE_PCAplusNN.pkl
    └── cmb_EE_NN.pkl

With these and with ``soliket.CosmoPower`` installed and visible to cobaya, you
can add it as a theory block to your run yaml as:

.. code-block:: yaml

  theory:
    soliket.CosmoPower:
      network_path: /path/to/cosmopower/data
      network_settings:
        tt:
          type: NN
          filename: cmb_TT_NN
        te:
          type: PCAplusNN
          filename: cmb_TE_PCAplusNN
          log: False
        ee:
          type: NN
          filename: cmb_EE_NN

Running this with cobaya will use ``soliket.CosmoPower`` as a theory to
calculate the CMB Cl's from the emulators.

If you want to add the example PP networks as well, you can do that simply with
a block as:

.. code-block:: yaml

  theory:
    soliket.CosmoPower:
      network_path: /path/to/cosmopower/data
      network_settings:
        [...]
        pp:
          type: PCAplusNN
          filename: cmb_PP_PCAplusNN

In this example, the TT, EE and PP networks output :math:`\log(C_\ell)`, hence
the default (``log: True``) is left, while the TE network outputs the
:math:`C_\ell` values directly. The TT and EE networks use the CosmoPower
Neural Network (NN) type emulators, while TE and PP use the Principal Component
Analysis + NN (PCAplusNN) types.

SOLikeT will automatically use the correct conversion prefactors
:math:`\ell (\ell + 1) / 2 \pi` terms and similar, as well as the CMB
temperature. See the :func:`~soliket.cosmopower.CosmoPower.ell_factor` and
:func:`~soliket.cosmopower.CosmoPower.cmb_unit_factor` functions for more
information on how SOLikeT infers these values.
"""

import os
from collections.abc import Iterable

import numpy as np
from cobaya.log import LoggedError
from cobaya.theories.cosmo import BoltzmannBase
from cobaya.theory import Theory

try:
    import cosmopower as cp
except ImportError:
    pass


[docs] class CosmoPower(BoltzmannBase): """A CosmoPower Network wrapper for Cobaya.""" _enforce_types: bool = True def initialize(self): super().initialize() if self.network_settings is None: # pragma: no cover raise LoggedError("No network settings were provided.") self.networks = {} self.all_parameters = set() for spectype in self.network_settings: netdata = {} nettype = self.network_settings[spectype] netpath = os.path.join(self.network_path, nettype["filename"]) if nettype["type"] == "NN": network = cp.cosmopower_NN(restore=True, restore_filename=netpath) elif nettype["type"] == "PCAplusNN": network = cp.cosmopower_PCAplusNN(restore=True, restore_filename=netpath) elif self.stop_at_error: # pragma: no cover raise ValueError( f"Unknown network type {nettype['type']} for network {spectype}." ) else: # pragma: no cover self.log.warn( f"Unknown network type {nettype['type']}\ for network {spectype}: skipped!" ) netdata["type"] = nettype["type"] netdata["log"] = nettype.get("log", True) netdata["network"] = network netdata["parameters"] = list(network.parameters) netdata["lmax"] = network.modes.max() netdata["has_ell_factor"] = nettype.get("has_ell_factor", False) self.all_parameters = self.all_parameters | set(network.parameters) if network is not None: self.networks[spectype.lower()] = netdata if "lmax" not in self.extra_args: # pragma: no cover self.extra_args["lmax"] = None self.log.info(f"Loaded CosmoPower from directory {self.network_path}") self.log.info(f"CosmoPower will expect the parameters {self.all_parameters}")
[docs] def calculate(self, state: dict, want_derived: bool = True, **params) -> bool: ## sadly, this syntax not valid until python 3.9 # cmb_params = { # p: [params[p]] for p in params # } | { # self.translate_param(p): [params[p]] for p in params # } cmb_params = { **{p: [params[p]] for p in params}, **{self.translate_param(p): [params[p]] for p in params}, } ells = None for spectype in self.networks: network = self.networks[spectype] used_params = { par: (cmb_params[par] if par in cmb_params else [params[par]]) for par in network["parameters"] } if network["log"]: data = network["network"].ten_to_predictions_np(used_params)[0, :] else: data = network["network"].predictions_np(used_params)[0, :] state[spectype] = data if ells is None: ells = network["network"].modes state["ell"] = ells.astype(int) return True
[docs] def get_Cl(self, ell_factor: bool = False, units: str = "FIRASmuK2") -> dict: cls_old = self.current_state.copy() lmax = self.extra_args["lmax"] or cls_old["ell"].max() cls = {"ell": np.arange(lmax + 1).astype(int)} ls = cls_old["ell"] for k in self.networks: cls[k] = np.tile(np.nan, cls["ell"].shape) for k in self.networks: prefac = np.ones_like(ls).astype(float) if self.networks[k]["has_ell_factor"]: prefac /= self.ell_factor(ls, k) if ell_factor: prefac *= self.ell_factor(ls, k) cls[k][ls] = cls_old[k] * prefac * self.cmb_unit_factor(k, units, 2.7255) cls[k][:2] = 0.0 if np.any(np.isnan(cls[k])): self.log.warning( "CosmoPower used outside of trained " "{} ell range. Filled in with NaNs.".format(k) ) return cls
[docs] def ell_factor(self, ls: np.ndarray, spectra: str) -> np.ndarray: r""" Calculate the ell factor for a specific spectrum. These prefactors are used to convert from Cell to Dell and vice-versa. See also: cobaya.BoltzmannBase.get_Cl `camb.CAMBresults.get_cmb_power_spectra <https://camb.readthedocs.io/en/latest/results.html#camb.results.CAMBdata.get_cmb_power_spectra>`_ Examples: ell_factor(l, "tt") -> :math:`\ell ( \ell + 1 )/(2 \pi)` ell_factor(l, "pp") -> :math:`\ell^2 ( \ell + 1 )^2/(2 \pi)`. :param ls: the range of ells. :param spectra: a two-character string with each character being one of [tebp]. :return: an array filled with ell factors for the given spectrum. """ ellfac = np.ones_like(ls).astype(float) if spectra in ["tt", "te", "tb", "ee", "et", "eb", "bb", "bt", "be"]: ellfac = ls * (ls + 1.0) / (2.0 * np.pi) elif spectra in ["pt", "pe", "pb", "tp", "ep", "bp"]: ellfac = (ls * (ls + 1.0)) ** (3.0 / 2.0) / (2.0 * np.pi) elif spectra in ["pp"]: ellfac = (ls * (ls + 1.0)) ** 2.0 / (2.0 * np.pi) return ellfac
[docs] def cmb_unit_factor( self, spectra: str, units: str = "FIRASmuK2", Tcmb: float = 2.7255 ) -> float: """ Calculate the CMB prefactor for going from dimensionless power spectra to CMB units. :param spectra: a length 2 string specifying the spectrum for which to calculate the units. :param units: a string specifying which units to use. :param Tcmb: the used CMB temperature [units of K]. :return: The CMB unit conversion factor. """ res = 1.0 x, y = spectra.lower() if x == "t" or x == "e" or x == "b": res *= self._cmb_unit_factor(units, Tcmb) elif x == "p": res *= 1.0 / np.sqrt(2.0 * np.pi) if y == "t" or y == "e" or y == "b": res *= self._cmb_unit_factor(units, Tcmb) elif y == "p": res *= 1.0 / np.sqrt(2.0 * np.pi) return res
def get_can_support_parameters(self) -> list[str]: return self.all_parameters
[docs] def get_requirements(self) -> Iterable[tuple[str, str]]: requirements = [] for k in self.all_parameters: if k in self.renames.values(): for v in self.renames: if self.renames[v] == k: requirements.append((v, None)) break else: requirements.append((k, None)) return requirements
[docs] class CosmoPowerDerived(Theory): """A theory class that can calculate derived parameters from CosmoPower networks.""" def initialize(self): super().initialize() if self.network_settings is None: raise LoggedError("No network settings were provided.") netpath = os.path.join(self.network_path, self.network_settings["filename"]) if self.network_settings["type"] == "NN": self.network = cp.cosmopower_NN(restore=True, restore_filename=netpath) elif self.network_settings["type"] == "PCAplusNN": self.network = cp.cosmopower_PCAplusNN(restore=True, restore_filename=netpath) else: raise LoggedError(f"Unknown network type {self.network_settings['type']}.") self.input_parameters = set(self.network.parameters) self.log_data = self.network_settings.get("log", False) self.log.info(f"Loaded CosmoPowerDerived from directory {self.network_path}") self.log.info( f"CosmoPowerDerived will expect the parameters {self.input_parameters}" ) self.log.info( f"CosmoPowerDerived can provide the following parameters: \ {self.get_can_provide()}." ) def translate_param(self, p): return self.renames.get(p, p)
[docs] def calculate(self, state: dict, want_derived: bool = True, **params) -> bool: ## sadly, this syntax not valid until python 3.9 # input_params = { # p: [params[p]] for p in params # } | { # self.translate_param(p): [params[p]] for p in params # } input_params = { **{p: [params[p]] for p in params}, **{self.translate_param(p): [params[p]] for p in params}, } if self.log_data: data = self.network.ten_to_predictions_np(input_params)[0, :] else: data = self.network.predictions_np(input_params)[0, :] for k, v in zip(self.derived_parameters, data): if len(k) == 0 or k == "_": continue state["derived"][k] = v return True
[docs] def get_param(self, p) -> float: return self.current_state["derived"][self.translate_param(p)]
def get_can_support_parameters(self) -> list[str]: return self.input_parameters
[docs] def get_requirements(self) -> Iterable[tuple[str, str]]: requirements = [] for k in self.input_parameters: if k in self.renames.values(): for v in self.renames: if self.renames[v] == k: requirements.append((v, None)) break else: requirements.append((k, None)) return requirements
[docs] def get_can_provide(self) -> Iterable[str]: return { par for par in self.derived_parameters if (len(par) > 0 and not par == "_") }