"""
Random points
=============
.. currentmodule:: glass
The following functions provide functionality for simulating point
processes on the sphere and sampling random positions.
Sampling
--------
.. autofunction:: positions_from_delta
.. autofunction:: uniform_positions
.. autofunction:: position_weights
Bias
----
.. autofunction:: effective_bias
Bias models
-----------
.. autofunction:: linear_bias
.. autofunction:: loglinear_bias
Displacing points
-----------------
.. autofunction:: displace
.. autofunction:: displacement
""" # noqa: D400
from __future__ import annotations
import itertools
import math
from typing import TYPE_CHECKING
import array_api_compat
import array_api_extra as xpx
import glass.arraytools
import glass.healpix as hp
import glass.shells
from glass import _rng
from glass._array_api_utils import xp_additions as uxpx
if TYPE_CHECKING:
from collections.abc import Callable, Generator
from types import ModuleType
from typing import Any
from glass._types import (
ComplexArray,
FloatArray,
IntArray,
UnifiedGenerator,
)
ARCMIN2_SPHERE = 60**6 // 100 / math.pi
[docs]
def effective_bias(
z: FloatArray,
bz: FloatArray,
w: glass.shells.RadialWindow,
) -> float | FloatArray:
r"""
Effective bias parameter from a redshift-dependent bias function.
This function takes a redshift-dependent bias function :math:`b(z)`
and computes an effective bias parameter :math:`\bar{b}` for a
given window function :math:`w(z)`.
Parameters
----------
z
Redshifts and values of the bias function :math:`b(z)`.
bz
Redshifts and values of the bias function :math:`b(z)`.
w
The radial window function :math:`w(z)`.
Returns
-------
The effective bias parameter for the window.
Notes
-----
The effective bias parameter :math:`\bar{b}` is computed using the
window function :math:`w(z)` as the weighted average
.. math::
\bar{b} = \frac{\int b(z) \, w(z) \, dz}{\int w(z) \, dz}
\;.
"""
norm = uxpx.trapezoid(w.wa, w.za)
return glass.arraytools.trapezoid_product((z, bz), (w.za, w.wa)) / norm
[docs]
def linear_bias(
delta: FloatArray,
b: float | FloatArray,
) -> FloatArray:
r"""
Linear bias model :math:`\delta_g = b \, \delta`.
Parameters
----------
delta
The input density contrast.
b
The bias parameter.
Returns
-------
The density contrast after biasing.
"""
return b * delta
[docs]
def loglinear_bias(
delta: FloatArray,
b: float | FloatArray,
) -> FloatArray:
r"""
Log-linear bias model :math:`\ln(1 + \delta_g) = b \ln(1 + \delta)`.
Parameters
----------
delta
The input density contrast.
b
The bias parameter.
Returns
-------
The density contrast after biasing.
"""
xp = array_api_compat.array_namespace(delta, b, use_compat=False)
delta_g = xp.log1p(delta)
delta_g *= b
return xp.expm1(delta_g)
def _broadcast_inputs(
bias: float | FloatArray | None,
delta: FloatArray,
ngal: float | FloatArray,
vis: FloatArray | None,
) -> tuple[
float | FloatArray | None,
FloatArray,
tuple[int, ...],
float | FloatArray,
FloatArray | None,
]:
"""
Broadcast inputs to common shape of extra dimensions.
Figure out how many "populations" of objects we are
dealing with, by broadcasting all inputs together.
Parameters
----------
bias
Bias parameter, is passed as an argument to the bias model.
delta
Map of the input density contrast. This is fed into the bias
model to produce the density contrast for sampling.
ngal
Number density, expected number of points per arcmin2.
vis
Visibility map for the observed points. This is multiplied with
the full sky number count map, and must hence be of compatible shape.
Returns
-------
The broadcasted inputs.
"""
inputs: list[tuple[float | FloatArray, int]] = [(ngal, 0), (delta, 1)]
if bias is not None:
inputs.append((bias, 0))
if vis is not None:
inputs.append((vis, 1))
dims, *rest = glass.arraytools.broadcast_leading_axes(*inputs)
ngal, delta, *rest = rest
if bias is not None:
bias, *rest = rest
if vis is not None:
vis, *rest = rest
return bias, delta, dims, ngal, vis
def _compute_density_contrast(
bias: float | FloatArray | None,
bias_model: Callable[..., Any],
delta: FloatArray,
k: tuple[int, ...],
) -> FloatArray:
"""
Compute density contrast from bias model, or copy.
Applies the bias model to ``delta``.
Parameters
----------
bias
Bias parameter, is passed as an argument to the bias model.
bias_model
The bias model to apply. For examples, :func:`glass.linear_bias`
or :func:`glass.loglinear_bias`.
delta
Map of the input density contrast. This is fed into the bias
model to produce the density contrast for sampling.
k
Indices for the extra dimensions.
Returns
-------
The density contrast after biasing.
"""
xp = array_api_compat.array_namespace(delta, bias, use_compat=False)
return (
xp.asarray(delta[(*k, ...)], copy=True)
if bias is None
else bias_model(delta[(*k, ...)], bias[(*k, ...)]) # ty: ignore[not-subscriptable]
)
def _compute_expected_count(
k: tuple[int, ...],
n: FloatArray,
ngal: float | FloatArray,
*,
remove_monopole: bool,
) -> FloatArray:
"""
Computes the expected number of objects per pixel.
Parameters
----------
k
Indices for the extra dimensions.
n
Number density.
ngal
Number density, expected number of points per arcmin2.
remove_monopole
If true, the monopole of the density contrast
after biasing is fixed to zero.
Returns
-------
Expected number of objects per pixel.
"""
xp = array_api_compat.array_namespace(n, ngal, use_compat=False)
# remove monopole if asked to
if remove_monopole:
n = n - xp.mean(n, keepdims=True)
# turn into number count, modifying the array in place
n = n + 1
n *= ARCMIN2_SPHERE / n.size * ngal[k] # ty: ignore[not-subscriptable]
return n
def _apply_visibility(
k: tuple[int, ...],
n: FloatArray,
vis: FloatArray | None,
) -> FloatArray:
"""
Apply visibility if given.
Parameters
----------
k
Indices for the extra dimensions.
n
Number density.
vis
Visibility map for the observed points. This is multiplied with
the full sky number count map, and must hence be of compatible shape.
Returns
-------
The visibility-applied number count map.
"""
if vis is not None:
n *= vis[(*k, ...)]
return n
def _sample_number_galaxies(
n: FloatArray,
*,
rng: UnifiedGenerator | None = None,
) -> IntArray:
"""
Sample the actual number of galaxies in each
pixel from the Poisson distribution.
Parameters
----------
n
Number density.
rng
Random number generator.
Returns
-------
The sampled number of galaxies per pixel.
"""
xp = n.__array_namespace__()
# get default RNG if not given
if rng is None:
rng = _rng.rng_dispatcher(xp=xp)
# clip number density at zero
n = xp.clip(n, min=0.0)
# sample actual number in each pixel
return rng.poisson(n)
def _sample_galaxies_per_pixel(
batch: int,
dims: tuple[int, ...],
k: tuple[int, ...],
n: FloatArray,
) -> Generator[
tuple[
FloatArray,
FloatArray,
int | IntArray,
]
]:
"""
Sample the individual galaxies in each pixel,
randomly distributed over sub pixels, in batches.
Parameters
----------
batch
Maximum number of positions to yield in one batch.
dims
Shape of the extra dimensions.
k
Indices for the extra dimensions.
n
Number density.
Yields
------
lon
Columns of longitudes for the sampled points.
lat
Columns of latitudes for the sampled points.
count
The number of sampled points If multiple populations are sampled, an
array of counts in the shape of the extra dimensions is returned.
"""
xp = n.__array_namespace__()
# total number of points
count = xp.sum(n)
# don't go through pixels if there are no points
if count == 0:
return
# for converting randomly sampled positions to HEALPix indices
npix = n.shape[-1]
nside = hp.npix2nside(npix)
# create a mask to report the count in the right axis
if dims:
cmask = xp.zeros(dims, dtype=xp.int64)
cmask = xpx.at(cmask)[k].set(1)
else:
cmask = 1
# sample the map in batches
step = 1_000
start, stop, size = 0, 0, 0
while count:
# tally this group of pixels
q = xp.cumulative_sum(n[stop : min(npix, stop + step)])
# does this group of pixels fill the batch?
if size + q[-1] < min(batch, count):
# no, we need the next group of pixels to fill the batch
stop += step
size += q[-1]
else:
# how many pixels from this group do we need?
stop += int(xp.searchsorted(q, xp.asarray(batch - size), side="right"))
# if the first pixel alone is too much, use it anyway
if stop == start:
stop += 1
# sample this batch of pixels
ipix = xp.repeat(xp.arange(start, stop), n[start:stop])
lon, lat = hp.randang(
nside,
ipix,
lonlat=True,
)
# next batch
start, size = stop, 0
# keep track of remaining number of points
count -= ipix.size
# yield the batch
yield lon, lat, ipix.size * cmask
# make sure that the correct number of pixels was sampled
assert xp.sum(n[stop:]) == 0 # noqa: S101
[docs]
def positions_from_delta( # noqa: PLR0913
ngal: float | FloatArray,
delta: FloatArray,
bias: float | FloatArray | None = None,
vis: FloatArray | None = None,
*,
bias_model: Callable[..., Any] = linear_bias,
remove_monopole: bool = False,
batch: int = 1_000_000,
rng: UnifiedGenerator | None = None,
) -> Generator[
tuple[
FloatArray,
FloatArray,
int | IntArray,
]
]:
"""
Generate positions tracing a density contrast.
The map of expected number counts is constructed from the number
density, density contrast, an optional bias model, and an optional
visibility map.
If ``remove_monopole`` is set, the monopole of the computed density
contrast is removed. Over the full sky, the mean number density of
the map will then match the given number density exactly. This,
however, means that an effectively different bias model is being
used, unless the monopole is already zero in the first place.
The function supports multi-dimensional input for the ``ngal``,
``delta``, ``bias``, and ``vis`` parameters. Extra dimensions are
broadcast to a common shape, and treated as separate populations of
points. These are then sampled independently, and the results
concatenated into a flat list of longitudes and latitudes. The
number of points per population is returned in ``count`` as an array
in the shape of the extra dimensions.
Parameters
----------
ngal
Number density, expected number of points per arcmin2.
delta
Map of the input density contrast. This is fed into the bias
model to produce the density contrast for sampling.
bias
Bias parameter, is passed as an argument to the bias model.
vis
Visibility map for the observed points. This is multiplied with
the full sky number count map, and must hence be of compatible shape.
bias_model
The bias model to apply. For examples, :func:`glass.linear_bias`
or :func:`glass.loglinear_bias`.
remove_monopole
If true, the monopole of the density contrast
after biasing is fixed to zero.
batch
Maximum number of positions to yield in one batch.
rng
Random number generator. If not given, a default RNG is used.
Yields
------
lon
Columns of longitudes for the sampled points.
lat
Columns of latitudes for the sampled points.
count
The number of sampled points If multiple populations are sampled, an
array of counts in the shape of the extra dimensions is returned.
Raises
------
TypeError
If the bias model is not a string or callable.
"""
xp = array_api_compat.array_namespace(ngal, delta, bias, vis, use_compat=False)
# get default RNG if not given
if rng is None:
rng = _rng.rng_dispatcher(xp=xp)
# ensure bias_model is a function
if not callable(bias_model):
raise TypeError("bias_model must be callable")
bias, delta, dims, ngal, vis = _broadcast_inputs(bias, delta, ngal, vis)
# iterate the leading dimensions
for k in itertools.product(*map(range, dims)):
n = _compute_density_contrast(bias, bias_model, delta, k)
n = _compute_expected_count(k, n, ngal, remove_monopole=remove_monopole)
n = _apply_visibility(k, n, vis)
n = _sample_number_galaxies(n, rng=rng)
yield from _sample_galaxies_per_pixel(batch, dims, k, n)
[docs]
def position_weights(
densities: FloatArray,
bias: FloatArray | float | None = None,
) -> FloatArray:
r"""
Compute relative weights for angular clustering.
Takes an array *densities* of densities in arbitrary units and
returns the relative weight of each shell. If *bias* is given, a
linear bias is applied to each shell.
This is the equivalent of computing the product of normalised
redshift distribution and bias factor :math:`n(z) \, b(z)` for the
discretised shells.
Parameters
----------
densities
Density of points in each shell. The first axis must broadcast
against the number of shells, and is normalised internally.
bias
Value or values of the linear bias parameter for each shell.
Returns
-------
The relative weight of each shell for angular clustering.
"""
xp = array_api_compat.array_namespace(densities, bias, use_compat=False)
bias = bias if bias is None or not isinstance(bias, float) else xp.asarray(bias)
# bring densities and bias into the same shape
if bias is not None:
densities, bias = glass.arraytools.broadcast_first(densities, bias)
# normalise densities after shape has been fixed
densities = densities / xp.sum(densities, axis=0)
# apply bias after normalisation
if bias is not None:
densities = densities * bias
# densities now contains the relative contribution with bias applied
return densities
[docs]
def displace(
lon: FloatArray,
lat: FloatArray,
alpha: ComplexArray | FloatArray,
) -> tuple[FloatArray, FloatArray]:
r"""
Displace positions on the sphere.
Takes an array of :term:`displacement` values and applies them to
the given positions.
Parameters
----------
lon
Longitudes to be displaced.
lat
Latitudes to be displaced.
alpha
Displacement values. Must be complex-valued or have a leading
axis of size 2 for the real and imaginary component.
Returns
-------
The longitudes and latitudes after displacement.
Notes
-----
Displacements on the sphere are :term:`defined <displacement>` as
follows: The complex displacement :math:`\alpha` transports a point
on the sphere an angular distance :math:`|\alpha|` along the
geodesic with bearing :math:`\arg\alpha` in the original point.
In the language of differential geometry, this function is the
exponential map.
"""
xp = array_api_compat.get_namespace(lon, lat, alpha, use_compat=False)
alpha = xp.asarray(alpha)
if xp.isdtype(alpha.dtype, "complex floating"):
alpha1, alpha2 = xp.real(alpha), xp.imag(alpha)
else:
alpha1, alpha2 = alpha
# we know great-circle navigation:
# θ' = arctan2(√[(cosθ sin|α| - sinθ cos|α| cosγ)² + (sinθ sinγ)²],
# cosθ cos|α| + sinθ sin|α| cosγ)
# δ = arctan2(sin|α| sinγ, sinθ cos|α| - cosθ sin|α| cosγ)
t = xp.asarray(lat) / 180 * math.pi
ct, st = xp.sin(t), xp.cos(t) # sin and cos flipped: lat not co-lat
a = xp.hypot(alpha1, alpha2) # abs(alpha)
g = xp.atan2(alpha2, alpha1) # arg(alpha)
ca, sa = xp.cos(a), xp.sin(a)
cg, sg = xp.cos(g), xp.sin(g)
# flipped atan2 arguments for lat instead of co-lat
tp = xp.atan2(ct * ca + st * sa * cg, xp.hypot(ct * sa - st * ca * cg, st * sg))
d = xp.atan2(sa * sg, st * ca - ct * sa * cg)
return lon + d / math.pi * 180, tp / math.pi * 180
[docs]
def displacement(
from_lon: FloatArray,
from_lat: FloatArray,
to_lon: FloatArray,
to_lat: FloatArray,
) -> ComplexArray:
"""
Compute the displacement between two sets of positions.
Compute the complex :term:`displacement` that transforms points with
longitude *from_lon* and latitude *from_lat* into points with
longitude *to_lon* and latitude *to_lat* (all in degrees).
Parameters
----------
from_lon
Points before displacement.
from_lat
Points before displacement.
to_lon
Points after displacement.
to_lat
Points after displacement.
Returns
-------
Array of complex displacement.
See Also
--------
displace : Apply displacement to a set of points.
"""
xp = array_api_compat.get_namespace(
from_lon,
from_lat,
to_lon,
to_lat,
use_compat=False,
)
a = uxpx.radians(from_lat)
b = uxpx.radians(to_lat)
g = uxpx.radians(to_lon - from_lon)
sa, ca = xp.sin(a), xp.cos(a)
sb, cb = xp.sin(b), xp.cos(b)
sg, cg = xp.sin(g), xp.cos(g)
r = xp.atan2(xp.hypot(cb * sg, ca * sb - sa * cb * cg), sa * sb + ca * cb * cg)
x = xp.atan2(cb * sg, ca * sb - sa * cb * cg)
return r * xp.exp(1j * x)