Source code for glass.fields

"""Functions for random fields."""

from __future__ import annotations

import itertools
import math
import warnings
from collections.abc import Sequence
from itertools import combinations_with_replacement, product
from typing import TYPE_CHECKING

import healpy as hp
import numpy as np
from transformcl import cltovar

import array_api_extra as xpx

import glass
import glass._array_api_utils as _utils
import glass.grf

if TYPE_CHECKING:
    from collections.abc import Callable, Generator, Iterable, Iterator, Sequence
    from typing import Any, Literal, TypeVar

    from numpy.typing import NDArray

    from glass._array_api_utils import AnyArray, ComplexArray, FloatArray

    Fields = Sequence[glass.grf.Transformation]
    Cls = Sequence[AnyArray]

    T = TypeVar("T")

try:
    from warnings import deprecated
except ImportError:
    if TYPE_CHECKING:
        from typing import ParamSpec, TypeVar

        _P = ParamSpec("_P")
        _R = TypeVar("_R")

    def deprecated(msg: str, /) -> Callable[[Callable[_P, _R]], Callable[_P, _R]]:  # type: ignore[no-redef]
        """Backport of Python's warnings.deprecated()."""
        from functools import wraps  # noqa: PLC0415
        from warnings import warn  # noqa: PLC0415

        def decorator(func: Callable[_P, _R], /) -> Callable[_P, _R]:
            @wraps(func)
            def wrapper(*args: _P.args, **kwargs: _P.kwargs) -> _R:
                warn(msg, category=DeprecationWarning, stacklevel=2)
                return func(*args, **kwargs)

            return wrapper

        return decorator


def _inv_triangle_number(triangle_number: int) -> int:
    r"""
    The :math:`n`-th triangle number is :math:`T_n = n \, (n+1)/2`. If
    the argument is :math:`T_n`, then :math:`n` is returned. Otherwise,
    a :class:`ValueError` is raised.
    """
    n = math.floor(math.sqrt(2 * triangle_number))
    if n * (n + 1) // 2 != triangle_number:
        msg = f"not a triangle number: {triangle_number}"
        raise ValueError(msg)
    return n


def nfields_from_nspectra(nspectra: int) -> int:
    r"""
    Returns the number of fields for a number of spectra.

    Given the number of spectra *nspectra*, returns the number of
    fields *n* such that ``n * (n + 1) // 2 == nspectra`` or raises
    a :class:`ValueError` if the number of spectra is invalid.
    """
    try:
        n = _inv_triangle_number(nspectra)
    except ValueError:
        msg = f"invalid number of spectra: {nspectra}"
        raise ValueError(msg) from None
    return n


def iternorm(
    k: int,
    cov: Iterable[FloatArray],
    size: int | tuple[int, ...] = (),
) -> Generator[tuple[int | None, FloatArray, FloatArray]]:
    """
    Return the vector a and variance sigma^2 for iterative normal sampling.

    Parameters
    ----------
    k
        The number of fields.
    cov
        The covariance matrix for the fields.
    size
        The size of the covariance matrix.

    Yields
    ------
    index
        The index for iterative sampling.
    vector
        The vector for iterative sampling.
    standard_deviation
        The standard deviation for iterative sampling.

    Raises
    ------
    TypeError
        If the covariance matrix is not broadcastable to the given size.
    ValueError
        If the covariance matrix is not positive definite.

    """
    # Convert to list here to allow determining the namespace
    first = next(cov)  # type: ignore[call-overload]
    xp = _utils.get_namespace(first)

    n = (size,) if isinstance(size, int) else size

    m = xp.zeros((*n, k, k))
    a = xp.zeros((*n, k))
    s = xp.zeros((*n,))
    q = (*n, k + 1)
    j = 0 if k > 0 else None

    # We must use cov_expanded here as cov has been consumed to determine the namespace
    for i, x in enumerate(itertools.chain([first], cov)):
        # Ideally would be xp.asanyarray but this does not yet exist. The key difference
        # between the two in numpy is that asanyarray maintains subclasses of NDArray
        # whereas asarray will return the base class NDArray. Currently, we don't seem
        # to pass a subclass of NDArray so this, so it might be okay
        x = xp.asarray(x)  # noqa: PLW2901
        if x.shape != q:
            try:
                x = xp.broadcast_to(x, q)  # noqa: PLW2901
            except ValueError:
                msg = f"covariance row {i}: shape {x.shape} cannot be broadcast to {q}"
                raise TypeError(msg) from None

        # only need to update matrix A if there are correlations
        if j is not None:
            # compute new entries of matrix A
            m[..., :, j] = 0
            m[..., j : j + 1, :] = xp.matmul(a[..., xp.newaxis, :], m)
            m[..., j, j] = xp.where(s != 0, -1, s)
            # To ensure we don't divide by zero or nan we use a mask to only divide the
            # appropriate values of m and s
            m_j = m[..., j, :]
            s_broadcast = xp.broadcast_to(s[..., xp.newaxis], m_j.shape)
            mask = (m_j != 0) & (s_broadcast != 0) & ~xp.isnan(s_broadcast)
            m_j[mask] = xp.divide(m_j[mask], -s_broadcast[mask])
            m[..., j, :] = m_j

            # compute new vector a
            c = x[..., 1:, xp.newaxis]
            a = xp.matmul(m[..., :j], c[..., k - j :, :])
            a += xp.matmul(m[..., j:], c[..., : k - j, :])
            a = xp.reshape(a, (*n, k))

            # next rolling index
            j = (j - 1) % k

        # compute new standard deviation
        a_np = np.asarray(a, copy=True)
        einsum_result_np = np.einsum("...i,...i", a_np, a_np)
        s = x[..., 0] - xp.asarray(einsum_result_np, copy=True)
        if xp.any(s < 0):
            msg = "covariance matrix is not positive definite"
            raise ValueError(msg)
        s = xp.sqrt(s)

        # yield the next index, vector a, and standard deviation s
        yield j, a, s


def cls2cov(
    cls: Cls,
    nl: int,
    nf: int,
    nc: int,
) -> Generator[FloatArray]:
    """
    Return array of Cls as a covariance matrix for iterative sampling.

    Parameters
    ----------
    cls
        Angular matter power spectra in *GLASS* ordering.
    nl
        The number of modes.
    nf
        The number of fields.
    nc
        The number of correlated fields.

    Yields
    ------
    matrix
        The covariance matrix for iterative sampling.

    Raises
    ------
    ValueError
        If negative values are found in the Cls.

    """
    xp = _utils.get_namespace(*cls)

    cov = xp.zeros((nl, nc + 1))
    end = 0
    for j in range(nf):
        begin, end = end, end + j + 1
        for i, cl in enumerate(cls[begin:end][: nc + 1]):
            if i == 0 and np.any(xp.less(cl, 0)):
                msg = "negative values in cl"
                raise ValueError(msg)
            n = cl.size
            cov[:n, i] = cl
            cov[n:, i] = 0
        cov /= 2
        yield cov


def _multalm(
    alm: ComplexArray,
    bl: FloatArray,
    *,
    inplace: bool = False,
) -> ComplexArray:
    """
    Multiply alm by bl.

    The alm should be in GLASS order:

    [
        00,
        10, 11,
        20, 21, 22,
        30, 31, 32, 33,
        ...
    ]

    Parameters
    ----------
    alm
        The alm to multiply.
    bl
        The bl to multiply.
    inplace
        Whether to perform the operation in place.

    Returns
    -------
        The product of alm and bl.

    """
    xp = _utils.get_namespace(alm, bl)

    n = bl.size
    # Ideally would be xp.asanyarray but this does not yet exist. The key difference
    # between the two in numpy is that asanyarray maintains subclasses of NDArray
    # whereas asarray will return the base class NDArray. Currently, we don't seem
    # to pass a subclass of NDArray so this, so it might be okay
    out = xp.asarray(alm) if inplace else xp.asarray(alm, copy=True)
    for ell in range(n):
        out[ell * (ell + 1) // 2 : (ell + 1) * (ell + 2) // 2] *= bl[ell]

    return out


[docs] def discretized_cls( cls: Cls, *, lmax: int | None = None, ncorr: int | None = None, nside: int | None = None, ) -> Cls: """ Apply discretisation effects to angular power spectra. Depending on the given arguments, this truncates the angular power spectra to ``lmax``, removes all but ``ncorr`` correlations between fields, and applies the HEALPix pixel window function of the given ``nside``. If no arguments are given, no action is performed. Parameters ---------- cls Angular matter power spectra in *GLASS* ordering. lmax The maximum mode number to truncate the spectra. ncorr The number of correlated fields to keep. nside The resolution parameter for the HEALPix maps. Returns ------- The discretised angular power spectra. Raises ------ ValueError If the length of the Cls array is not a triangle number. """ if ncorr is not None: n = nfields_from_nspectra(len(cls)) cls = [ cls[i * (i + 1) // 2 + j] if j <= ncorr else np.asarray([]) for i in range(n) for j in range(i + 1) ] if nside is not None: pw = hp.pixwin(nside, lmax=lmax) gls = [] for cl in cls: if len(cl) > 0: if lmax is not None: cl = cl[: lmax + 1] # noqa: PLW2901 if nside is not None: n = min(len(cl), len(pw)) cl = cl[:n] * pw[:n] ** 2 # noqa: PLW2901 gls.append(cl) return gls
[docs] @deprecated("use glass.solve_gaussian_spectra() instead") def lognormal_gls( cls: Cls, shift: float = 1.0, ) -> Cls: """ Compute Gaussian Cls for a lognormal random field. .. deprecated:: 2025.1 Use :func:`glass.lognormal_fields` and :func:`glass.compute_gaussian_spectra` or :func:`glass.solve_gaussian_spectra` instead. Parameters ---------- cls Angular matter power spectra in *GLASS* ordering. shift The shift parameter for the lognormal transformation. Returns ------- The Gaussian angular power spectra for a lognormal random field. """ n = nfields_from_nspectra(len(cls)) fields = [glass.grf.Lognormal(shift) for _ in range(n)] return solve_gaussian_spectra(fields, cls)
def _generate_grf( gls: Cls, nside: int, *, ncorr: int | None = None, rng: np.random.Generator | None = None, ) -> Generator[NDArray[np.float64]]: """ Iteratively sample Gaussian random fields (internal use). A generator that iteratively samples HEALPix maps of Gaussian random fields with the given angular power spectra ``gls`` and resolution parameter ``nside``. The optional argument ``ncorr`` can be used to artificially limit now many realised fields are correlated. This saves memory, as only `ncorr` previous fields need to be kept. The ``gls`` array must contain the angular power power spectra of the Gaussian random fields in :ref:`standard order <twopoint_order>`. Parameters ---------- gls The Gaussian angular power spectra for a random field. nside The resolution parameter for the HEALPix maps. ncorr The number of correlated fields. If not given, all fields are correlated. rng Random number generator. If not given, a default RNG is used. Yields ------ fields The Gaussian random fields. Raises ------ ValueError If all gls are empty. """ if rng is None: rng = np.random.default_rng() # number of gls and number of fields ngls = len(gls) ngrf = nfields_from_nspectra(ngls) # number of correlated fields if not specified if ncorr is None: ncorr = ngrf - 1 # number of modes n = max((len(gl) for gl in gls), default=0) if n == 0: msg = "all gls are empty" raise ValueError(msg) # generates the covariance matrix for the iterative sampler cov = cls2cov(gls, n, ngrf, ncorr) # working arrays for the iterative sampling z = np.zeros(n * (n + 1) // 2, dtype=np.complex128) y = np.zeros((n * (n + 1) // 2, ncorr), dtype=np.complex128) # generate the conditional normal distribution for iterative sampling conditional_dist = iternorm(ncorr, cov, size=n) # sample the fields from the conditional distribution for j, a, s in conditional_dist: # standard normal random variates for alm # sample real and imaginary parts, then view as complex number rng.standard_normal(n * (n + 1), np.float64, z.view(np.float64)) # scale by standard deviation of the conditional distribution # variance is distributed over real and imaginary part alm = _multalm(z, s) # add the mean of the conditional distribution for i in range(ncorr): alm += _multalm(y[:, i], a[:, i]) # store the standard normal in y array at the indicated index if j is not None: y[:, j] = z alm = _glass_to_healpix_alm(alm) # modes with m = 0 are real-valued and come first in array alm[:n].real += alm[:n].imag alm[:n].imag[:] = 0 # transform alm to maps # can be performed in place on the temporary alm array yield hp.alm2map(alm, nside, pixwin=False, pol=False, inplace=True)
[docs] @deprecated("use glass.generate() instead") def generate_gaussian( gls: Cls, nside: int, *, ncorr: int | None = None, rng: np.random.Generator | None = None, ) -> Generator[NDArray[np.float64]]: """ Sample Gaussian random fields from Cls iteratively. .. deprecated:: 2025.1 Use :func:`glass.generate()` instead. A generator that iteratively samples HEALPix maps of Gaussian random fields with the given angular power spectra ``gls`` and resolution parameter ``nside``. The optional argument ``ncorr`` can be used to artificially limit now many realised fields are correlated. This saves memory, as only `ncorr` previous fields need to be kept. The ``gls`` array must contain the angular power power spectra of the Gaussian random fields in :ref:`standard order <twopoint_order>`. Parameters ---------- gls The Gaussian angular power spectra for a random field. nside The resolution parameter for the HEALPix maps. ncorr The number of correlated fields. If not given, all fields are correlated. rng Random number generator. If not given, a default RNG is used. Yields ------ fields The Gaussian random fields. Raises ------ ValueError If all gls are empty. """ n = nfields_from_nspectra(len(gls)) fields = [glass.grf.Normal() for _ in range(n)] yield from generate(fields, gls, nside, ncorr=ncorr, rng=rng)
[docs] @deprecated("use glass.generate() instead") def generate_lognormal( gls: Cls, nside: int, shift: float = 1.0, *, ncorr: int | None = None, rng: np.random.Generator | None = None, ) -> Generator[NDArray[np.float64]]: """ Sample lognormal random fields from Gaussian Cls iteratively. .. deprecated:: 2025.1 Use :func:`glass.generate()` instead. Parameters ---------- gls The Gaussian angular power spectra for a lognormal random field. nside The resolution parameter for the HEALPix maps. shift The shift parameter for the lognormal transformation. ncorr The number of correlated fields. If not given, all fields are correlated. rng Random number generator. If not given, a default RNG is used. Yields ------ fields The lognormal random fields. """ n = nfields_from_nspectra(len(gls)) fields = [glass.grf.Lognormal(shift) for _ in range(n)] yield from generate(fields, gls, nside, ncorr=ncorr, rng=rng)
[docs] def getcl( cls: Cls, i: int, j: int, lmax: int | None = None, ) -> FloatArray: """ Return a specific angular power spectrum from an array in :ref:`standard order <twopoint_order>`. Parameters ---------- cls Angular matter power spectra in *GLASS* ordering. i Indices to return. j Indices to return. lmax Truncate the returned spectrum at this mode number. Returns ------- The angular power spectrum for indices *i* and *j* from an array in *GLASS* ordering. """ if j > i: i, j = j, i cl = cls[i * (i + 1) // 2 + i - j] if lmax is not None: if cl.size > lmax + 1: cl = cl[: lmax + 1] else: cl = xpx.pad(cl, (0, lmax + 1 - cl.size)) return cl
[docs] def enumerate_spectra( entries: Iterable[NDArray[Any]], ) -> Iterator[tuple[int, int, NDArray[Any]]]: """ Iterate over a set of two-point functions in :ref:`standard order <twopoint_order>`, yielding a tuple of indices and their associated entry from the input. Examples -------- >>> spectra = [[1, 2, 3], [4, 5, 6], [7, 8, 9]] >>> list(enumerate_spectra(spectra)) [(0, 0, [1, 2, 3]), (1, 1, [4, 5, 6]), (1, 0, [7, 8, 9])] """ for k, cl in enumerate(entries): i = int((2 * k + 0.25) ** 0.5 - 0.5) j = i * (i + 3) // 2 - k yield i, j, cl
[docs] def spectra_indices(n: int) -> NDArray[np.integer]: """ Return an array of indices in :ref:`standard order <twopoint_order>` for a set of two-point functions for *n* fields. Each row is a pair of indices *i*, *j*. Examples -------- >>> spectra_indices(3) array([[0, 0], [1, 1], [1, 0], [2, 2], [2, 1], [2, 0]]) """ i, j = np.tril_indices(n) return np.transpose([i, i - j])
[docs] def effective_cls( cls: Cls, weights1: FloatArray, weights2: FloatArray | None = None, *, lmax: int | None = None, ) -> FloatArray: """ Compute effective angular power spectra from weights. Computes a linear combination of the angular power spectra *cls* using the factors provided by *weights1* and *weights2*. Additional axes in *weights1* and *weights2* produce arrays of spectra. Parameters ---------- cls Angular matter power spectra in *GLASS* ordering. weights1 Weight factors for spectra. The first axis must be equal to the number of fields. weights2 Second set of weights. If not given, *weights1* is used. lmax Truncate the angular power spectra at this mode number. If not given, the longest input in *cls* will be used. Returns ------- A dictionary of effective angular power spectra, where keys correspond to the leading axes of *weights1* and *weights2*. Raises ------ ValueError If the length of *cls* is not a triangle number. ValueError If the shapes of *weights1* and *weights2* are incompatible. """ # Try with cls and weights but if cls is a Sequence[float] then we use weights only # and convert cls to an xp array xp = _utils.get_namespace(*cls, weights1, weights2) # this is the number of fields n = nfields_from_nspectra(len(cls)) # find lmax if not given if lmax is None: lmax = max((cl.shape[0] for cl in cls), default=0) - 1 # broadcast weights1 such that its shape ends in n weights1 = xp.asarray(weights1) weights2 = xp.asarray(weights2) if weights2 is not None else weights1 shape1, shape2 = weights1.shape, weights2.shape for i, shape in enumerate((shape1, shape2)): if not shape or shape[0] != n: msg = f"shape mismatch between fields and weights{i + 1}" raise ValueError(msg) # get the iterator over leading weight axes # auto-spectra do not repeat identical computations pairs = ( combinations_with_replacement(np.ndindex(shape1[1:]), 2) if weights2 is weights1 else product(np.ndindex(shape1[1:]), np.ndindex(shape2[1:])) ) # create the output array: axes for all input axes plus lmax+1 out = xp.empty(shape1[1:] + shape2[1:] + (lmax + 1,)) # helper that will grab the entire first column (i.e. shells) c = (slice(None),) # compute all combined cls from pairs # if weights2 is weights1, set the transpose elements in one pass for j1, j2 in pairs: w1, w2 = weights1[c + j1], weights2[c + j2] cl = sum( w1[i1] * w2[i2] * getcl(cls, i1, i2, lmax=lmax) for i1 in range(n) for i2 in range(n) ) out[j1 + j2 + (...,)] = cl if weights2 is weights1 and j1 != j2: out[j2 + j1 + (...,)] = cl return out
def gaussian_fields(shells: Sequence[glass.RadialWindow]) -> Sequence[glass.grf.Normal]: """ Create Gaussian random fields for radial windows *shells*. Parameters ---------- shells Window functions for the simulated shells. Returns ------- A sequence describing the Gaussian random fields. """ return [glass.grf.Normal() for _shell in shells]
[docs] def lognormal_fields( shells: Sequence[glass.RadialWindow], shift: Callable[[float], float] | None = None, ) -> Sequence[glass.grf.Lognormal]: """ Create lognormal fields for radial windows *shells*. If *shifts* is given, it must be a callable that returns a lognormal shift (i.e. the scale parameter) at the nominal redshift of each shell. Parameters ---------- shells Window functions for the simulated shells. shift Callable that returns the lognormal shift for each field. Returns ------- A sequence describing the lognormal fields. """ if shift is None: shift = lambda _z: 1.0 # noqa: E731 return [glass.grf.Lognormal(shift(shell.zeff)) for shell in shells]
[docs] def compute_gaussian_spectra(fields: Fields, spectra: Cls) -> Cls: """ Compute a sequence of Gaussian angular power spectra. After transformation by *fields*, the expected two-point statistics should recover *spectra* when using a band-limited transform [Tessore23]_. Parameters ---------- fields The fields to be simulated. spectra The desired angular power spectra of the fields. Returns ------- Gaussian angular power spectra for simulation. """ n = len(fields) if len(spectra) != n * (n + 1) // 2: msg = "mismatch between number of fields and spectra" raise ValueError(msg) gls = [] for i, j, cl in enumerate_spectra(spectra): gl = glass.grf.compute(cl, fields[i], fields[j]) if cl.size > 0 else 0 * cl gls.append(gl) return gls
[docs] def solve_gaussian_spectra(fields: Fields, spectra: Cls) -> Cls: """ Solve a sequence of Gaussian angular power spectra. After transformation by *fields*, the expected two-point statistics should recover *spectra* when using a non-band-limited transform [Tessore23]_. Parameters ---------- fields The fields to be simulated. spectra The desired angular power spectra of the fields. Returns ------- Gaussian angular power spectra for simulation. """ n = len(fields) if len(spectra) != n * (n + 1) // 2: msg = "mismatch between number of fields and spectra" raise ValueError(msg) gls = [] for i, j, cl in enumerate_spectra(spectra): if cl.size > 0: # transformation pair t1, t2 = fields[i], fields[j] # set zero-padding of solver to 2N pad = 2 * cl.size # if the desired monopole is zero, that is most likely # and artefact of the theory spectra -- the variance of the # matter density in a finite shell is not zero # -> set output monopole to zero, which ignores cl[0] monopole = 0.0 if cl[0] == 0 else None # call solver gl, _cl_out, info = glass.grf.solve(cl, t1, t2, pad=pad, monopole=monopole) # warn if solver didn't converge if info == 0: warnings.warn( f"Gaussian spectrum for fields ({i}, {j}) did not converge", stacklevel=2, ) else: gl = 0 * cl # makes a copy of the empty array gls.append(gl) return gls
[docs] def generate( fields: Fields, gls: Cls, nside: int, *, ncorr: int | None = None, rng: np.random.Generator | None = None, ) -> Iterator[NDArray[Any]]: """ Sample random fields from Gaussian angular power spectra. Iteratively sample HEALPix maps of transformed Gaussian random fields with the given angular power spectra *gls* and resolution parameter *nside*. The random fields are sampled from Gaussian random fields using the transformations in *fields*. The *gls* array must contain the angular power power spectra of the Gaussian random fields in :ref:`standard order <twopoint_order>`. The optional number *ncorr* limits how many realised fields are correlated. This saves memory, as only *ncorr* previous fields are kept. Parameters ---------- fields Transformations for the random fields. gls Gaussian angular power spectra. nside Resolution parameter for the HEALPix maps. ncorr Number of correlated fields. If not given, all fields are correlated. rng Random number generator. If not given, a default RNG is used. Yields ------ x Sampled random fields. """ n = len(fields) if len(gls) != n * (n + 1) // 2: msg = "mismatch between number of fields and gls" raise ValueError(msg) variances = (cltovar(getcl(gls, i, i)) for i in range(n)) grf = _generate_grf(gls, nside, ncorr=ncorr, rng=rng) for t, x, var in zip(fields, grf, variances, strict=True): yield t(x, var)
[docs] def glass_to_healpix_spectra(spectra: Sequence[T]) -> list[T]: """ Reorder spectra from GLASS to HEALPix order. Reorder spectra in :ref:`GLASS order <twopoint_order>` to conform to (new) HEALPix order. Parameters ---------- spectra Sequence of spectra in GLASS order. Returns ------- Sequence of spectra in HEALPix order. """ n = nfields_from_nspectra(len(spectra)) comb = [(i, j) for i, j in spectra_indices(n)] return [spectra[comb.index((i + k, i))] for k in range(n) for i in range(n - k)]
[docs] def healpix_to_glass_spectra(spectra: Sequence[T]) -> list[T]: """ Reorder spectra from HEALPix to GLASS order. Reorder HEALPix spectra (in new order) to conform to :ref:`GLASS order <twopoint_order>`. Parameters ---------- spectra Sequence of spectra in HEALPix order. Returns ------- Sequence of spectra in GLASS order. """ n = nfields_from_nspectra(len(spectra)) comb = [(i + k, i) for k in range(n) for i in range(n - k)] return [spectra[comb.index((i, j))] for i, j in spectra_indices(n)]
def _glass_to_healpix_alm(alm: NDArray[np.complex128]) -> NDArray[np.complex128]: """ Reorder alms in GLASS order to conform to (new) HEALPix order. Parameters ---------- alm alm in GLASS order. Returns ------- alm in HEALPix order. """ n = _inv_triangle_number(alm.size) ell = np.arange(n) out = [alm[ell[m:] * (ell[m:] + 1) // 2 + m] for m in ell] return np.concatenate(out)
[docs] def lognormal_shift_hilbert2011(z: float) -> float: """ Lognormal shift of Hilbert et al. (2011) for convergence fields. Lognormal shift parameter for the weak lensing convergence from the fitting formula of [Hilbert11]_. Parameters ---------- z Redshift. Returns ------- Lognormal shift. """ return z * (0.008 + z * (0.029 + z * (-0.0079 + z * 0.00065)))
[docs] def cov_from_spectra(spectra: Cls, *, lmax: int | None = None) -> NDArray[Any]: """ Construct covariance matrix from spectra. Construct a covariance matrix from the angular power spectra in *spectra*. Parameters ---------- spectra Sequence of angular power spectra. lmax Maximum angular mode number. If not given, the maximum is taken from the provided spectra. Returns ------- Covariance matrix from the given spectra. """ # recover the number of fields from the number of spectra n = nfields_from_nspectra(len(spectra)) if lmax is None: # noqa: SIM108 # maximum length in input spectra k = max((cl.size for cl in spectra), default=0) else: k = lmax + 1 # this is the covariance matrix of the spectra # the leading dimension is k, then it is a n-by-n covariance matrix # missing entries are zero, which is the default value cov = np.zeros((k, n, n)) # fill the matrix up by going through the spectra in order # skip over entries that are None # if the spectra are ragged, some entries at high ell may remain zero # only fill the lower triangular part, everything is symmetric for i, j, cl in enumerate_spectra(spectra): cov[: cl.size, i, j] = cov[: cl.size, j, i] = cl.reshape(-1)[:k] return cov
def check_posdef_spectra(spectra: Cls) -> bool: """ Test whether angular power spectra are positive semi-definite. Parameters ---------- spectra Spectra to be tested. Returns ------- Whether spectra are positive semi-definite or not. """ cov = cov_from_spectra(spectra) xp = cov.__array_namespace__() is_positive_semi_definite: bool = xp.all(xp.linalg.eigvalsh(cov) >= 0) return is_positive_semi_definite
[docs] def regularized_spectra( spectra: Cls, *, lmax: int | None = None, method: Literal["nearest", "clip"] = "nearest", **method_kwargs: float | None, ) -> Cls: r""" Regularise a set of angular power spectra. Regularises a complete set *spectra* of angular power spectra in :ref:`standard order <twopoint_order>` such that at every angular mode number :math:`\ell`, the matrix :math:`C_\ell^{ij}` is a valid positive semi-definite covariance matrix. The length of the returned spectra is set by *lmax*, or the maximum length of the given spectra if *lmax* is not given. Shorter input spectra are padded with zeros as necessary. Missing spectra can be set to :data:`None` or, preferably, an empty array. The *method* parameter determines how the regularisation is carried out. Parameters ---------- spectra Spectra to be regularised. lmax Maximum angular mode number. If not given, the maximum is taken from the provided spectra. method Regularisation method. """ # regularise the cov matrix using the chosen method cov_method: Callable[..., NDArray[Any]] if method == "clip": from glass.algorithm import cov_clip as cov_method # noqa: PLC0415 elif method == "nearest": from glass.algorithm import cov_nearest as cov_method # noqa: PLC0415 else: msg = f"unknown method '{method}'" # type: ignore[unreachable] raise ValueError(msg) # get the cov matrix from spectra cov = cov_from_spectra(spectra, lmax=lmax) # regularise the cov matrix using the chosen method cov = cov_method(cov, **method_kwargs) # return regularised spectra from cov matrix array return [cov[:, i, j] for i, j in spectra_indices(cov.shape[-1])]