# SPDX-License-Identifier: BSD-3-Clause
# Copyright (c) 2026 Antoine COLLET
"""Provide covariance matrix representation."""
from __future__ import annotations
import abc
import logging
from abc import abstractmethod
from functools import cached_property
from time import time
from typing import Callable, List, Optional, Sequence, Tuple, Union
import numpy as np
import scipy as sp
from numpy.random import Generator, RandomState
from scipy.sparse import csc_array, csr_array
from scipy.sparse.linalg import LinearOperator, gmres
from scipy.spatial.distance import cdist
from covmats._helpers import (
check_random_state,
get_pts_coords_regular_grid,
)
from covmats._sparse_helpers import (
SparseCholeskyFactor,
)
from covmats._toeplitz import (
create_toepliz_first_row,
toeplitz_product,
)
from covmats._types import ArrayLike, NDArrayFloat, NDArrayInt
class CallBack:
"""Represents a callback instance."""
__slots__: List[str] = ["res"]
def __init__(self) -> None:
"""Initialize the instance."""
self.res: List[NDArrayFloat] = []
def __call__(self, rk) -> None:
self.res.append(rk)
@property
def itercount(self) -> int:
"""Return the number of times the callback as been called."""
return len(self.res)
def clear(self) -> None:
"""Delete all results."""
self.res = []
[docs]
class CovarianceMatrix(LinearOperator, sp.stats.Covariance, abc.ABC):
"""
Abstract representation of a covariance matrix.
Calculations involving covariance matrices (e.g. data whitening,
multivariate normal function evaluation) are often performed more
efficiently using a decomposition of the covariance matrix instead of the
covariance matrix itself. This class allows the user to construct an
object representing a covariance matrix using any of several
decompositions and perform calculations using a common interface.
.. note::
The `CovarianceMatrix` class cannot be instantiated directly. Instead, use
one of the derived class:
- :py:class:`CovViaDiagonal`
- :py:class:`CovViaCholesky`
- :py:class:`CovViaSparseCholesky`
- :py:class:`CovViaPrecisionCholesky`
- :py:class:`CovViaSparsePrecisionCholesky`
- :py:class:`CovViaEigenFactorization`
- :py:class:`CovViaEnsemble`
Notes
-----
None of the :py:class:`Covariance` child implementation supports singular
or indefinite covariance matrices. Otherwise, they would not be invertible,
colorizing and whitening would not well defined.
"""
__slots__ = [
"_rank",
"_n_pts",
"_log_pdet",
"_allow_singular",
"_subspace_size",
]
[docs]
def __init__(self) -> None:
"""Initialize the instance."""
# counters
self.dtype = np.dtype("d") # float64 for LinearOperator
self._allow_singular = False # must be invertible
@property
def n_pts(self) -> int:
"""Number of points in the domain (n)."""
return self._n_pts
@n_pts.setter
def n_pts(self, n_pts: int) -> None:
"""Set the number of points in the domain (n)."""
assert int(n_pts) > 0
self._n_pts = n_pts
@property
def shape(self) -> Tuple[int, int]:
"""Shape of the covariance matrix (n, n)."""
return (self.n_pts, self.n_pts)
[docs]
@abstractmethod
def solve(self, b: NDArrayFloat) -> NDArrayFloat:
"""Solve Ax = b, with A, the current covariance matrix instance."""
[docs]
@abstractmethod
def get_diagonal(self) -> NDArrayFloat:
"""Return the diagonal entries of the matrix (variances) as a vector (n,)."""
...
[docs]
def get_trace(self) -> float:
"""Return the trace of the covariance matrix (sum of diagonal elements)."""
return np.sum(self.get_diagonal()).item()
@property
def log_pdet(self) -> float:
"""
Log of the pseudo-determinant of the covariance matrix.
Note
----
In linear algebra and statistics, the pseudo-determinant [1] is the product of
all non-zero eigenvalues of a square matrix. It coincides with the regular
determinant when the matrix is non-singular.
"""
return np.array(self._log_pdet, dtype=float)[()]
@property
def rank(self) -> int:
"""
Rank of the covariance matrix.
"""
return np.array(self._rank, dtype=int)[()]
@property
def subspace_size(self) -> int:
"""
Subspace size of the covariance matrix.
"""
return self._subspace_size
@abstractmethod
def _todense(self) -> NDArrayFloat:
"""
Return an explicit dense representation of the covariance matrix.
"""
...
[docs]
def todense(self) -> NDArrayFloat:
"""Explicit dense representation of the covariance matrix with shape (n, n)."""
return self._todense()
@property
def covariance(self) -> NDArrayFloat:
"""
Explicit dense representation of the covariance matrix.
Notes
-----
Alias for :py:meth:`CovarianceMatrix.todense()`.
"""
return self.todense()
@property
@abstractmethod
def precision(self) -> NDArrayFloat:
"""
Explicit dense representation of the precision matrix with shape (n, n).
"""
...
@staticmethod
def _validate_cov_mat(A: Union[NDArrayFloat, sp.sparse.sparray], name: str) -> None:
"""Raise an error if the input array is not square and of real number values."""
m, n = A.shape[-2:]
if (
m != n
or A.ndim != 2
or not (
np.issubdtype(A.dtype, np.integer)
or np.issubdtype(A.dtype, np.floating)
)
):
message = (
f"The input `{name}` must be a square, "
"two-dimensional array of real numbers."
)
raise ValueError(message)
@staticmethod
def _validate_dense_matrix(A: ArrayLike, name: str) -> NDArrayFloat:
"""Raise an error if the input array is not square and of real number values."""
A = np.atleast_2d(A)
CovarianceMatrix._validate_cov_mat(A, name)
return A
@staticmethod
def _validate_sparse_matrix(A: sp.sparse.sparray, name: str) -> sp.sparse.sparray:
"""Raise an error if the input array is not square and of real number values."""
CovarianceMatrix._validate_cov_mat(A, name)
return A
@staticmethod
def _validate_dense_lower_triangle(A: ArrayLike, name: str) -> NDArrayFloat:
if not np.allclose(A, np.tril(A)):
message = f"The input `{name}` must be a lower-triangular matrix."
raise ValueError(message)
return A
@staticmethod
def _validate_vector(A, name):
"""Raise an error if the input array is not a vector of real number values."""
A = np.atleast_1d(A)
if A.ndim != 1 or not (
np.issubdtype(A.dtype, np.integer) or np.issubdtype(A.dtype, np.floating)
):
message = (
f"The input `{name}` must be a one-dimensional array of real numbers."
)
raise ValueError(message)
return A
def _rmatvec(self, x: NDArrayFloat) -> NDArrayFloat:
"""Return A.T @ x."""
return self._matvec(x)
def _matmat(self, X: NDArrayFloat) -> NDArrayFloat:
"""Return A @ X."""
return self._matvec(X)
def _rmatmat(self, X: NDArrayFloat) -> NDArrayFloat:
"""Return A @ X."""
return self._matmat(X)
@abc.abstractmethod
def _whiten(self, x: NDArrayFloat) -> NDArrayFloat:
"""Perform a whitening transformation on data."""
...
[docs]
def whiten(self, x):
"""
Perform a whitening transformation on data.
"Whitening" ("white" as in "white noise", in which each frequency has
equal magnitude) transforms a set of random variables into a new set of
random variables with unit-diagonal covariance. When a whitening
transform is applied to a sample of points distributed according to
a multivariate normal distribution with zero mean, the covariance of
the transformed sample is approximately the identity matrix
:cite:p:`wikipediaWhiteningTransformation2025,
novakGeneralizationColoringLinear2019`.
Parameters
----------
x : array_like
An array of points. The last dimension must correspond with the
dimensionality of the space, i.e., the number of columns in the
covariance matrix.
Returns
-------
x_ : array_like
The transformed array of points.
References
----------
.. bibliography::
:list: enumerated
:filter: False
novakGeneralizationColoringLinear2019
wikipediaWhiteningTransformation2025
Examples
--------
>>> import numpy as np
>>> import covmats
>>> rng = np.random.default_rng()
>>> n = 3
>>> A = rng.random(size=(n, n))
>>> cov_array = A @ A.T # make matrix symmetric positive definite
>>> precision = np.linalg.inv(cov_array)
>>> cov_object = covmats.CovViaPrecisionCholesky(
.. sp.linalg.cholesky(precision, lower=True))
>>> x = rng.multivariate_normal(np.zeros(n), cov_array, size=(10000))
>>> x_ = cov_object.whiten(x)
>>> np.cov(x_, rowvar=False) # near-identity covariance
array([[0.97862122, 0.00893147, 0.02430451],
[0.00893147, 0.96719062, 0.02201312],
[0.02430451, 0.02201312, 0.99206881]])
"""
return self._whiten(np.asarray(x))
@abc.abstractmethod
def _colorize(self, x: NDArrayFloat) -> NDArrayFloat:
"""Perform a colorizing transformation on data."""
...
[docs]
def colorize(self, x):
"""
Perform a colorizing transformation on data.
"Colorizing" ("color" as in "colored noise", in which different
frequencies may have different magnitudes) transforms a set of
uncorrelated random variables into a new set of random variables with
the desired covariance. When a coloring transform is applied to a
sample of points distributed according to a multivariate normal
distribution with identity covariance and zero mean, the covariance of
the transformed sample is approximately the covariance matrix used
in the coloring transform
:cite:p:`wikipediaWhiteningTransformation2025,novakGeneralizationColoringLinear2019`.
Parameters
----------
x : array_like
An array of points. The last dimension must correspond with the
dimensionality of the space, i.e., the number of columns in the
covariance matrix.
Returns
-------
x_ : array_like
The transformed array of points.
References
----------
.. bibliography::
:list: enumerated
:filter: False
wikipediaWhiteningTransformation2025
novakGeneralizationColoringLinear2019
Examples
--------
>>> import numpy as np
>>> import covmats
>>> rng = np.random.default_rng(1638083107694713882823079058616272161)
>>> n = 3
>>> A = rng.random(size=(n, n))
>>> cov_array = A @ A.T # make matrix symmetric positive definite
>>> cholesky = np.linalg.cholesky(cov_array)
>>> cov_object = covmats.CovViaCholesky(cholesky)
>>> x = rng.multivariate_normal(np.zeros(n), np.eye(n), size=(10000))
>>> x_ = cov_object.colorize(x)
>>> cov_data = np.cov(x_, rowvar=False)
>>> np.allclose(cov_data, cov_array, rtol=3e-2)
True
"""
return self._colorize(np.asarray(x))
[docs]
def sample_mvnormal(
self,
shape: Sequence[int],
random_state: Optional[Union[int, RandomState, Generator]] = None,
) -> NDArrayFloat:
r"""
Draw samples from the multivariate normal N(0, A).
Parameters
----------
shape: Sequence[int]
Number of random vectors to sample. The resulting array will be of shape
(shape, n), n being the number of elements per random vector
(the covariance) matrix has shape (n, n).
random_state: Optional[Union[int, np.random.Generator, np.random.RandomState]]
Pseudorandom number generator state used to generate resamples.
If `random_state` is ``None`` (or `np.random`), the
`numpy.random.RandomState` singleton is used.
If `random_state` is an int, a new ``RandomState`` instance is used,
seeded with `random_state`.
If `random_state` is already a ``Generator`` or ``RandomState``
instance then that instance is used. The default is None.
Return
------
X: The transformed array of points. It has shape (input shape, n), n being
the number of elements per random vector (the covariance)
matrix has shape (n, n).
Examples
--------
>>> import numpy as np
>>> import scipy as sp
>>> import covmats
>>> covd = covmats.CovViaDiagonal(np.array([5.0, 10.0, 15.0]))
>>> rng_seed = 42
>>> covd.sample_mvnormal(shape=[2], random_state=rng_seed)
array([[ 1.11068661, -0.43723011, 2.50848692],
[ 3.40559829, -0.74045799, -0.90680853]])
>>> x = covd.sample_mvnormal(shape=[2, 4], random_state=rng_seed)
>>> x
array([[[ 1.11068661, -0.43723011, 2.50848692],
[ 3.40559829, -0.74045799, -0.90680853],
[ 3.53122721, 2.4268417 , -1.81826648],
[ 1.21320114, -1.46545542, -1.80376358]],
[[ 0.54104409, -6.05032338, -6.68057804],
[-1.25731314, -3.20285323, 1.21707469],
[-2.03040356, -4.46609644, 5.67643327],
[-0.50485116, 0.21354293, -5.518026 ]]])
>>> x.shape
(2, 4, 3)
>>> cov_cho = covmats.CovViaCholesky(sp.linalg.cholesky(covd.todense()))
>>> cov_cho.sample_mvnormal(shape=[2], random_state=rng_seed)
array([[-0.95777013, -1.11354406, 2.06162461],
[ 0.81715777, 1.30517512, 1.66856257]])
>>> cov_cho.sample_mvnormal(shape=[2, 2], random_state=rng_seed)
array([[[ 1.11068661, -0.43723011, 2.50848692],
[ 3.40559829, -0.74045799, -0.90680853],
[ 3.53122721, 2.4268417 , -1.81826648],
[ 1.21320114, -1.46545542, -1.80376358]],
[[ 0.54104409, -6.05032338, -6.68057804],
[-1.25731314, -3.20285323, 1.21707469],
[-2.03040356, -4.46609644, 5.67643327],
[-0.50485116, 0.21354293, -5.518026 ]]])
"""
# A 1D diagonal of a covariance matrix was passed
return self._colorize(
check_random_state(seed=random_state).standard_normal(
size=(*shape, self.subspace_size)
)
)
[docs]
@classmethod
def from_cholesky(self, *args):
"""Representation of a covariance provided via choleksy factorization.
.. error::
Removed from parent class, do not use.
"""
raise NotImplementedError(
"`from_cholesky` is not available, please instantiate"
" with `CovViaCholesky(...)` directly!"
)
[docs]
@classmethod
def from_diagonal(self, *args):
"""Representation of a covariance provided via diagonal.
.. error::
Removed from parent class, do not use.
"""
raise NotImplementedError(
"`from_diagonal` is not available, please instantiate"
" with `CovViaDiagonal(...)` directly!"
)
[docs]
@classmethod
def from_eigendecomposition(self, *args):
"""
Representation of a covariance provided via eigendecomposition.
.. error::
Removed from parent class, do not use.
"""
raise NotImplementedError(
"`from_eigendecomposition` is not available, please instantiate"
" with `CovViaEigenFactorization(...)` directly!"
)
[docs]
@classmethod
def from_precision(self, *args):
"""
Return a representation of a covariance from its precision matrix.
.. error::
Removed from parent class, do not use.
"""
raise NotImplementedError(
"`from_precision` is not available, please instantiate"
" with `CovViaPrecisionCholesky(...)` directly!"
)
def _dot_diag(x: NDArrayFloat, d: NDArrayFloat):
"""
Perform the dot product between x with shape (..., n) and d with shape (n).
Parameters
----------
x : NDArrayFloat
Input array with shape (..., n).
d : NDArrayFloat
1D vector representing a the diagonal elements of a (n, n) diagonal matrix.
Returns
-------
A @ x
"""
# If d were a full diagonal matrix, x @ d would always do what we want.
# Special treatment is needed for n-dimensional `d` in which each row
# includes only the diagonal elements of a covariance matrix.
return x * d if x.ndim < 2 else x * np.expand_dims(d, -2)
[docs]
class CovViaDiagonal(CovarianceMatrix):
r"""
Representation of a covariance matrix via its diagonal.
Attributes
----------
diagonal : NDArrayFloat
The diagonal elements of a diagonal matrix.
Notes
-----
Let the diagonal elements of a diagonal covariance matrix :math:`D` be
stored in the vector :math:`d`.
When all elements of :math:`d` are strictly positive, whitening of a
data point :math:`x` is performed by computing
:math:`x \cdot d^{-1/2}`, where the inverse square root can be taken
element-wise.
:math:`\log\det{D}` is calculated as :math:`-2 \sum(\log{d})`,
where the :math:`\log` operation is performed element-wise.
Notes
-----
No null nor negative elements are allowed, otherwise the matrix would be
singular or indefinite and consequently non invertible.
Examples
--------
Prepare a symmetric positive definite covariance matrix ``A`` and a
data point ``x``.
>>> import numpy as np
>>> import covmats
>>> rng = np.random.default_rng()
>>> n = 5
>>> A = np.diag(rng.random(n))
>>> x = rng.random(size=n)
Extract the diagonal from ``A`` and create the `Covariance` object.
>>> d = np.diag(A)
>>> cov = covmats.CovViaDiagonal(d)
Compare the functionality of the `Covariance` object against a
reference implementations.
>>> res = cov.whiten(x)
>>> ref = np.diag(d**-0.5) @ x
>>> np.allclose(res, ref)
True
>>> res = cov.log_pdet
>>> ref = np.linalg.slogdet(A)[-1]
>>> np.allclose(res, ref)
True
"""
__slots__ = ["_D", "_sqrtD", "_invD", "_i_zero"]
[docs]
def __init__(self, diagonal: ArrayLike) -> None:
"""
Initialize the instance.
Parameters
----------
diagonal : ArrayLike
The diagonal elements of a diagonal matrix.
"""
self.D = diagonal
super().__init__()
@property
def D(self) -> NDArrayFloat:
"""1D vector representing the diagonal elements of the covariance matrix."""
return self._D
@D.setter
def D(self, D: NDArrayFloat) -> None:
"""
Set the diagonal elements of the covariance matrix.
Notes
-----
No null nor negative elements are allowed, otherwise the matrix would be
singular or indefinite and consequently non invertible.
"""
self._D = self._validate_vector(
A=np.asarray(D, dtype=np.float64), name="diagonal"
)
nnv = np.count_nonzero(self.D < 0.0)
if nnv != 0:
raise ValueError(
f"{nnv} negative values have been detected in `D` which "
"means the matrix is indefinite and non invertible."
)
nnv = np.count_nonzero(self.D == 0.0)
if nnv != 0:
raise ValueError(
f"{nnv} null values have been detected in `D` which "
"means the matrix is singular and non invertible."
)
self.n_pts = np.size(self.D)
self._subspace_size = self.n_pts
self._rank = self.n_pts
# Update the pseudo inverse as well
self._invD = 1.0 / self.D
# Store the square roots for whitening and colorizing transformations
self._sqrtD = np.sqrt(self.D)
self._sqrtinvD = np.sqrt(self._invD)
# Update the log pseudo determinant
self._log_pdet = np.sum(np.log(self.D), axis=-1)
def _matvec(self, x: NDArrayFloat) -> NDArrayFloat:
"""Return the covariance matrix times the vector x."""
return self.get_diagonal() * x
def _matmat(self, X: NDArrayFloat) -> NDArrayFloat:
"""Return the covariance matrix times the matrix X."""
return self.get_diagonal()[:, np.newaxis] * X
def _whiten(self, x: NDArrayFloat) -> NDArrayFloat:
"""
Perform a whitening transformation on data.
Parameters
----------
x : NDArrayFloat
Data to whiten with shape (..., n), n being the number of points covered
by the covariance matrix.
Returns
-------
NDArrayFloat
Transformed data with shape (..., n)
"""
return _dot_diag(x, self._sqrtinvD)
def _colorize(self, x: NDArrayFloat) -> NDArrayFloat:
"""
Perform a colorizing transformation on data.
Parameters
----------
x : NDArrayFloat
Data to colorize with shape (..., n), n being the number of points covered
by the covariance matrix.
Returns
-------
NDArrayFloat
Transformed data with shape (..., n)
"""
return _dot_diag(x, self._sqrtD)
def _todense(self) -> NDArrayFloat:
return np.apply_along_axis(np.diag, -1, self.D)
[docs]
def solve(self, b: NDArrayFloat) -> NDArrayFloat:
"""Solve Ax = b, with A, the current covariance matrix instance."""
return self._invD * b if b.ndim < 2 else (self._invD)[:, np.newaxis] * b
[docs]
def get_diagonal(self) -> NDArrayFloat:
"""Return the diagonal entries of the matrix (variances)."""
return self.D
@property
def precision(self) -> NDArrayFloat:
"""
Explicit dense representation of the precision matrix.
"""
return np.diag(self._invD)
[docs]
class CovViaCholesky(CovarianceMatrix):
r"""
Representation of a covariance via a Cholesky factorization.
Parameters
----------
cholesky : array_like
The lower triangular Cholesky factor of the covariance matrix.
Notes
-----
Let the covariance matrix be :math:`A` and :math:`L` be the lower
Cholesky factor such that :math:`L L^T = A`.
Whitening of a data point :math:`x` is performed by computing
:math:`L^{-1} x`. :math:`\log\det{A}` is calculated as
:math:`2tr(\log{L})`, where the :math:`\log` operation is performed
element-wise.
This `Covariance` class does not support singular covariance matrices
because the Cholesky decomposition does not exist for a singular
covariance matrix.
Examples
--------
Prepare a symmetric positive definite covariance matrix ``A`` and a
data point ``x``.
>>> import numpy as np
>>> import covmats
>>> rng = np.random.default_rng()
>>> n = 5
>>> A = rng.random(size=(n, n))
>>> A = A @ A.T # make the covariance symmetric positive definite
>>> x = rng.random(size=n)
Perform the Cholesky decomposition of ``A`` and create the
`Covariance` object.
>>> L = np.linalg.cholesky(A)
>>> cov = covmats.CovViaCholesky(L)
Compare the functionality of the `Covariance` object against
reference implementation.
>>> from scipy.linalg import solve_triangular
>>> res = cov.whiten(x)
>>> ref = solve_triangular(L, x, lower=True)
>>> np.allclose(res, ref)
True
>>> res = cov.log_pdet
>>> ref = np.linalg.slogdet(A)[-1]
>>> np.allclose(res, ref)
True
"""
__slots__ = ["_L", "_covariance"]
[docs]
def __init__(self, L: ArrayLike, covariance: Optional[ArrayLike] = None) -> None:
"""
Initialize the instance.
Parameters
----------
L : NDArrayFloat
Lower triangle of the covariance matrix Cholesky factorization.
covariance : Optional[NDArrayFloat], optional
Dense covariance matrix, by default None
"""
self.L = L
super().__init__()
if covariance is not None:
self._covariance = self._validate_dense_matrix(covariance, "covariance")
else:
self._covariance = None
@property
def L(self) -> NDArrayFloat:
"""Lower triangle of the covariance matrix Cholesky factorization."""
return self._L
@L.setter
def L(self, L: NDArrayFloat) -> None:
"""Set the lower triangle of the covariance matrix Cholesky factorization."""
self._L = self._validate_dense_lower_triangle(
self._validate_matrix(L, "L"), "L"
)
self._log_pdet = 2 * np.log(np.diag(self._L)).sum(axis=-1)
self.n_pts = np.shape(self.L)[0]
self._subspace_size = self.n_pts
self._rank = self.n_pts
@property
def precision(self) -> NDArrayFloat:
"""
Explicit dense representation of the precision matrix.
"""
return self.solve(np.eye(self.n_pts))
def _todense(self) -> NDArrayFloat:
if self._covariance is not None:
return self._covariance
return self.L @ self.L.T
def _matvec(self, x: NDArrayFloat) -> NDArrayFloat:
"""Return the covariance matrix times the vector x."""
return np.linalg.multi_dot([self.L, self.L.T, x])
def _whiten(self, x: NDArrayFloat) -> NDArrayFloat:
"""
Perform a whitening transformation on data.
Parameters
----------
x : NDArrayFloat
Data to whiten with shape (..., n), n being the number of points covered
by the covariance matrix.
Returns
-------
NDArrayFloat
Transformed data with shape (..., n)
"""
m = x.T.shape[0]
res = sp.linalg.solve_triangular(self.L, x.T.reshape(m, -1), lower=True)
return res.reshape(x.T.shape).T
def _colorize(self, x: NDArrayFloat) -> NDArrayFloat:
return x @ self.L.T
[docs]
def solve(self, b: NDArrayFloat) -> NDArrayFloat:
"""Solve Ax = b, with A, the current covariance matrix instance."""
return sp.linalg.cho_solve((self.L, True), b)
[docs]
def get_diagonal(self) -> NDArrayFloat:
"""Return the diagonal entries of the matrix (variances)."""
return np.sum((self.L**2), axis=1)
[docs]
class CovViaSparseCholesky(CovarianceMatrix):
r"""
Representation of a covariance via a sparse Cholesky factorization.
Notes
-----
Let the covariance matrix be :math:`A` and :math:`L` be the lower
Cholesky factor such that :math:`L L^T = A`.
Whitening of a data point :math:`x` is performed by computing
:math:`L^{-1} x`. :math:`\log\det{A}` is calculated as
:math:`2tr(\log{L})`, where the :math:`\log` operation is performed
element-wise.
This `Covariance` class does not support singular covariance matrices
because the Cholesky decomposition does not exist for a singular
covariance matrix.
TODO: indicate that LGPL licence is expected here because it relies on CHOLMOD.
Examples
--------
Prepare a symmetric positive definite covariance matrix ``A`` and a
data point ``x``.
>>> import numpy as np
>>> import covmats
>>> rng = np.random.default_rng()
>>> n = 5
>>> A = rng.random(size=(n, n))
>>> A = A @ A.T # make the covariance symmetric positive definite
>>> x = rng.random(size=n)
Perform the Cholesky decomposition of ``A`` and create the
`Covariance` object.
>>> L = np.linalg.cholesky(A)
>>> cov = covmats.CovViaCholesky(L)
Compare the functionality of the `Covariance` object against
reference implementation.
>>> from scipy.linalg import solve_triangular
>>> res = cov.whiten(x)
>>> ref = solve_triangular(L, x, lower=True)
>>> np.allclose(res, ref)
True
>>> res = cov.log_pdet
>>> ref = np.linalg.slogdet(A)[-1]
>>> np.allclose(res, ref)
True
"""
__slots__ = ["_scf", "_sparse_covariance"]
[docs]
def __init__(
self,
scf: SparseCholeskyFactor,
sparse_covariance: Optional[sp.sparse.sparray] = None,
) -> None:
"""
Initialize the instance.
Parameters
----------
scf : SparseCholeskyFactor
Lower triangle of the sparse covariance matrix Cholesky factorization.
sparse_covariance : Optional[sp.sparse.sparray], optional
Sparse covariance matrix, by default None
"""
self.scf = scf
if sparse_covariance is not None:
self._validate_sparse_matrix(sparse_covariance, name="sparse_covariance")
self._sparse_covariance: Optional[sp.sparse.sparray] = sparse_covariance
super().__init__()
@property
def scf(self) -> SparseCholeskyFactor:
return self._scf
@scf.setter
def scf(self, scf: SparseCholeskyFactor) -> None:
self._scf = scf
self._n_pts = self._scf.L.shape[0]
self._log_pdet = self._scf.log_pdet
self._subspace_size = self.n_pts
self._rank = self._n_pts
def _todense(self):
return self.scf.todense()
@property
def precision(self) -> NDArrayFloat:
"""
Explicit dense representation of the precision matrix.
"""
return self.scf.inv()
def _matvec(self, x: NDArrayFloat) -> NDArrayFloat:
"""Return the covariance matrix times the vector x."""
if self._sparse_covariance is not None:
return self._sparse_covariance @ x
return self.scf @ x
def _whiten(self, x: NDArrayFloat) -> NDArrayFloat:
"""
Perform a whitening transformation on data.
Parameters
----------
x : NDArrayFloat
Data to whiten with shape (..., n), n being the number of points covered
by the covariance matrix.
Returns
-------
NDArrayFloat
Transformed data with shape (..., n)
"""
return self.scf.whiten(x)
def _colorize(self, x: NDArrayFloat) -> NDArrayFloat:
return self.scf.colorize(x)
[docs]
def solve(self, b: NDArrayFloat) -> NDArrayFloat:
"""Solve Ax = b, with A, the current covariance matrix instance."""
return self.scf.solve(b)
[docs]
def get_diagonal(self) -> NDArrayFloat:
"""
Return the diagonal entries of the matrix (variances).
The matrix is never built explicitly. Instead the matvec interface is
used to multiply all column of the identity matrix.
"""
return self.scf.get_diagonal()
[docs]
class CovViaPrecisionCholesky(CovarianceMatrix):
r"""
Representation of a covariance via the Cholesky factorization of its inverse
(aka the precision matrix).
Notes
-----
Let the covariance matrix be :math:`A`, its precision matrix be
:math:`P = A^{-1}`, and :math:`L` be the lower Cholesky factor such
that :math:`L L^T = P`.
Whitening of a data point :math:`x` is performed by computing
:math:`x^T L`. :math:`\log\det{A}` is calculated as
:math:`-2tr(\log{L})`, where the :math:`\log` operation is performed
element-wise.
This `Covariance` class does not support singular covariance matrices
because the precision matrix does not exist for a singular covariance
matrix.
Examples
--------
Prepare a symmetric positive definite precision matrix ``P`` and a
data point ``x``. (If the precision matrix is not already available,
consider the other factory methods of the `Covariance` class.)
>>> import numpy as np
>>> import covmats
>>> rng = np.random.default_rng()
>>> n = 5
>>> P = rng.random(size=(n, n))
>>> P = P @ P.T # a precision matrix must be positive definite
>>> x = rng.random(size=n)
Create the `Covariance` object.
>>> cov = covmats.CovViaPrecisionCholesky(np.linalg.cholesky(P))
Compare the functionality of the `Covariance` object against
reference implementations.
>>> res = cov.whiten(x)
>>> ref = x @ np.linalg.cholesky(P)
>>> np.allclose(res, ref)
True
>>> res = cov.log_pdet
>>> ref = -np.linalg.slogdet(P)[-1]
>>> np.allclose(res, ref)
True
"""
__slots__: List[str] = ["_L", "_precision"]
[docs]
def __init__(self, L: ArrayLike, precision: Optional[ArrayLike] = None) -> None:
"""
Initialize the instance.
Parameters
----------
L : NDArrayFloat
Lower triangle of the precision matrix Cholesky factorization.
covariance : Optional[NDArrayFloat], optional
Dense precision matrix, by default None
Raises
------
ValueError
_description_
"""
self.L = L
super().__init__()
if precision is not None:
self._precision: Optional[NDArrayFloat] = self._validate_dense_matrix(
precision, "precision"
)
else:
self._precision = None
@property
def L(self) -> NDArrayFloat:
"""Lower triangle of the precision matrix Cholesky factorization."""
return self._L
@L.setter
def L(self, L: NDArrayFloat) -> None:
"""Set the lower triangle of the precision matrix Cholesky factorization."""
self._L = self._validate_dense_lower_triangle(
self._validate_matrix(L, "L"), "L"
)
self._log_pdet = -2 * np.log(np.diag(self.L)).sum(axis=-1)
self.n_pts = np.shape(self.L)[0]
self._subspace_size = self.n_pts
self._rank = self.n_pts
def _matvec(self, x: NDArrayFloat) -> NDArrayFloat:
"""Return the covariance matrix times the vector x."""
return sp.linalg.cho_solve((self.L, True), x)
def _todense(self) -> NDArrayFloat:
return sp.linalg.cho_solve((self.L, True), np.eye(self.n_pts))
@property
def precision(self) -> NDArrayFloat:
"""
Explicit dense representation of the precision matrix.
"""
if self._precision is not None:
return self._precision
return self.L @ self.L.T
def _whiten(self, x: NDArrayFloat) -> NDArrayFloat:
"""
Perform a whitening transformation on data.
Parameters
----------
x : NDArrayFloat
Data to whiten with shape (..., n), n being the number of points covered
by the covariance matrix.
Returns
-------
NDArrayFloat
Transformed data with shape (..., n)
"""
return x @ self.L
def _colorize(self, x: NDArrayFloat) -> NDArrayFloat:
m = x.T.shape[0]
res = sp.linalg.solve_triangular(self.L.T, x.T.reshape(m, -1), lower=False)
# L1^T @ b.T = x.T
return res.reshape(x.T.shape).T
[docs]
def solve(self, b: NDArrayFloat) -> NDArrayFloat:
"""Solve Ax = b, with A, the current covariance matrix instance."""
if self._precision is not None:
return self._precision @ b
return self.L @ (self.L.T) @ b
[docs]
def get_diagonal(self) -> NDArrayFloat:
"""Return the diagonal entries of the matrix (variances)."""
# Otherwise extract it using matrix vector products
approx_diag = np.zeros(self.n_pts)
# TODO: multiprocessing to speed up things ? Or by chunks of 1000 ?
for i in range(self.n_pts):
# construct the ith row of the identity matrix
v = np.zeros(self.n_pts)
v[i] = 1.0
approx_diag[i] = self.matvec(v)[i]
return approx_diag
[docs]
class CovViaSparsePrecisionCholesky(CovarianceMatrix):
"""
Representation of a covariance via the sparse Cholesky factorization of its
sparse inverse (aka the precision matrix).
Notes
-----
Blablabla.
"""
__slots__: List[str] = [
"_sparse_precision",
"_scf",
]
[docs]
def __init__(
self,
scf: SparseCholeskyFactor,
sparse_precision: Optional[sp.sparse.sparray] = None,
) -> None:
"""
Initialize the instance.
Parameters
----------
scf : SparseCholeskyFactor
Lower triangle of the sparse precision matrix Cholesky factorization.
sparse_precision : Optional[sp.sparse.sparray], optional
Sparse precision matrix (inverse of the covariance matrix), by default None
"""
self.scf = scf
if sparse_precision is not None:
self._sparse_precision = self._validate_sparse_matrix(
sparse_precision, "sparse_precision"
)
self._sparse_precision: Optional[sp.sparse.sparray] = sparse_precision
super().__init__()
@property
def scf(self) -> SparseCholeskyFactor:
return self._scf
@scf.setter
def scf(self, scf: SparseCholeskyFactor) -> None:
self._scf = scf
self._n_pts = self._scf.L.shape[0]
self._log_pdet = -self._scf.log_pdet
self._subspace_size = self.n_pts
self._rank = self._n_pts
def _matvec(self, x: NDArrayFloat) -> NDArrayFloat:
"""Return the covariance matrix times the vector x."""
return self.scf.solve(x)
def _whiten(self, x: NDArrayFloat) -> NDArrayFloat:
"""
Perform a whitening transformation on data.
Parameters
----------
x : NDArrayFloat
Data to whiten with shape (..., n), n being the number of points covered
by the covariance matrix.
Returns
-------
NDArrayFloat
Transformed data with shape (..., n)
"""
return self.scf.whiten_inv(x)
def _colorize(self, x: NDArrayFloat) -> NDArrayFloat:
return self.scf.colorize_inv(x)
[docs]
def solve(self, b: NDArrayFloat) -> NDArrayFloat:
"""Return x = A^{-1} b."""
if self._sparse_precision is not None:
return self._sparse_precision.dot(b)
return self.scf.mat @ b
[docs]
def get_diagonal(self) -> NDArrayFloat:
"""
Return the diagonal entries of the matrix (variances).
The matrix is never built explicitly. Instead the matvec interface is
used to multiply all column of the identity matrix.
"""
return self.scf.get_invdiagonal()
def _todense(self):
return self.scf.inv()
@property
def precision(self) -> sp.sparse.sparray:
if self._sparse_precision is not None:
return self._sparse_precision
return self.scf.mat
[docs]
class CovViaEigenFactorization(CovarianceMatrix):
r"""
Representation of a covariance provided via eigenfactorization
Parameters
----------
eigenfactorization : sequence
A sequence (nominally a tuple) containing the eigenvalue and
eigenvector arrays as computed by `scipy.sparse.eigsh`.
Notes
-----
Let the covariance matrix be :math:`A`, let :math:`V` be matrix of
eigenvectors, and let :math:`W` be the diagonal matrix of eigenvalues
such that `V W V^T = A`.
When all of the eigenvalues are strictly positive, whitening of a
data point :math:`x` is performed by computing
:math:`x^T (V W^{-1/2})`, where the inverse square root can be taken
element-wise.
:math:`\log\det{A}` is calculated as :math:`tr(\log{W})`,
where the :math:`\log` operation is performed element-wise.
This `Covariance` class supports singular covariance matrices. When
computing ``_log_pdet``, non-positive eigenvalues are ignored.
Whitening is not well defined when the point to be whitened
does not lie in the span of the columns of the covariance matrix. The
convention taken here is to treat the inverse square root of
non-positive eigenvalues as zeros.
Examples
--------
Prepare a symmetric positive definite covariance matrix ``A`` and a
data point ``x``.
>>> import numpy as np
>>> import covmats
>>> rng = np.random.default_rng()
>>> n = 5
>>> A = rng.random(size=(n, n))
>>> A = A @ A.T # make the covariance symmetric positive definite
>>> x = rng.random(size=n)
Perform the eigenfactorization of ``A`` and create the `Covariance`
object.
>>> w, v = np.linalg.eigh(A)
>>> cov = covmats.CovViaEigenFactorization((w, v))
Compare the functionality of the `Covariance` object against
reference implementations.
>>> res = cov.whiten(x)
>>> ref = x @ (v @ np.diag(w**-0.5))
>>> np.allclose(res, ref)
True
>>> res = cov.log_pdet
>>> ref = np.linalg.slogdet(A)[-1]
>>> np.allclose(res, ref)
True
"""
__slots__: List[str] = ["_w", "_v", "_inv_w", "_sqrt_w", "_inv_sqrt_w"]
[docs]
def __init__(self, eigenfactorization: Tuple[NDArrayFloat, NDArrayFloat]) -> None:
"""
Initialize the instance.
Parameters
----------
(eig_vals, eig_vects) : Tuple[NDArrayFloat, NDArrayFloat]
- 1D vector of eigen values with size `n_pc`.
- 2D arrays of eigen vectors (columns) with size `(Ns, n_pc)`. Ns being the
number of elements in the original covariance matrix.
"""
eigenvalues, eigenvectors = eigenfactorization
# See if we do a common setter interface ?
self.eig_vals = eigenvalues
self.eig_vects = eigenvectors
super().__init__()
@property
def n_pc(self) -> int:
"""
Return the number of eigen vectors/values, i.e. principal components.
It is determined from the eigen values vector size.
"""
return self._w.size
@property
def subspace_size(self) -> int:
return self.n_pc
@property
def eig_vals(self) -> NDArrayFloat:
"""Return the Eigen values."""
return self._w
@eig_vals.setter
def eig_vals(self, value: NDArrayFloat) -> None:
"""Return the Eigen values."""
self._w = np.reshape(value, (-1, 1))
self._sqrt_w = np.sqrt(self._w)
self._inv_w = 1.0 / self._w
self._inv_sqrt_w = 1.0 / (self._sqrt_w)
self._log_pdet = np.sum(np.log(self._w), axis=0).item()
self._subspace_size = np.size(self._w)
self._rank = self._subspace_size
@property
def eig_vects(self) -> NDArrayFloat:
"""Return the Eigen vectors."""
return self._v
@eig_vects.setter
def eig_vects(self, value: NDArrayFloat) -> None:
self._v = value
self.n_pts = self._v.shape[0]
[docs]
def get_diagonal(self) -> NDArrayFloat:
"""
Return the diagonal entries of the matrix (variances).
"""
return np.sum(((self._v.T * np.sqrt(self._w)) ** 2), axis=0)
def _todense(self) -> NDArrayFloat:
return np.dot(self._v, np.multiply(self._w, self._v.T))
def _whiten(self, x: NDArrayFloat) -> NDArrayFloat:
"""
Perform a whitening transformation on data.
Parameters
----------
x : NDArrayFloat
Data to whiten with shape (..., n), n being the number of points covered
by the covariance matrix.
Returns
-------
NDArrayFloat
Transformed data with shape (..., n)
"""
return x @ (self._inv_sqrt_w * self._v.T).T
def _colorize(self, x: NDArrayFloat) -> NDArrayFloat:
return x @ (self._sqrt_w * self._v.T)
def _matvec(self, x: NDArrayFloat) -> NDArrayFloat:
"""Return the covariance matrix times the vector x."""
return np.dot(
self._v,
np.multiply(self._w, np.dot(self._v.T, x.reshape(-1, 1))),
)
def _matmat(self, X: NDArrayFloat) -> NDArrayFloat:
"""Return the covariance matrix times the vector x."""
assert np.shape(X)[0] == self.shape[0]
return np.dot(
self._v,
np.multiply(self._w, np.dot(self._v.T, X)),
)
[docs]
def solve(self, b: NDArrayFloat) -> NDArrayFloat:
r"""
Return $Q^{-1} b = ZD^{-1}Z^{T}b$.
Parameters
----------
b: NDArrayFloat
Column vector with shape ($N_{\mathrm{s}}$, 1) or ensemble matrix with
shape ($N_{\mathrm{s}}$, $N_e$).
Returns
-------
NDAarrayFloat
Column vector with shape ($N_{\mathrm{s}}$, 1) or ensemble matrix with
shape ($N_{\mathrm{s}}$, $N_e$).
"""
# np.dot(invZs.T, invZs)
# Note: x must be a column vector of a matrix with size (Ns, Ne)
ne = 1 # case of a column vector
if b.ndim > 1:
ne = b.shape[1]
return np.dot(
self._v,
np.multiply(1.0 / self._w, np.dot(self._v.T, b.reshape(-1, ne))),
)
[docs]
def get_sparse_LLT_factor(self) -> csc_array:
"""
Return the sparse factor L of the LL^T factorization of the eigen matrix.
Return
------
L: csc_array
L = U * V^{T/2}.
"""
# 1) Convert U sqrt(V) to a sparse format
sp_mat = sp.sparse.lil_array(self._v * np.sqrt(self._w).T)
# 2) Resize -> we now have a square matrix and indices are preserved
sp_mat.resize(self.shape)
# 3) Convert to column format
return sp_mat.tocsc()
@property
def precision(self) -> sp.sparse.sparray:
return np.dot(
self._v,
np.multiply(1.0 / self._w, self._v.T),
)
def get_eigen_factorization_gram_matrix_trick(anomalies: NDArrayFloat) -> NDArrayFloat:
"""
_summary_
Parameters
----------
anomalies : NDArrayFloat
_description_
Returns
-------
NDArrayFloat
_description_
"""
ne, n_pts = np.shape(anomalies)
if ne < n_pts:
GramM = anomalies @ anomalies.T # ne x ne matrix
else:
GramM = anomalies.T @ anomalies # n_pts x n_pts matrix
# Computed eigen decomposition
# Could be randomized for large matrices
eigvals, U = sp.linalg.eigh(GramM)
# Sort
idx = eigvals.argsort()[::-1]
eigvals = eigvals[idx]
U = U[:, idx]
# Keep only positive values(truncate)
tol = np.max(eigvals) * max(GramM.shape) * np.finfo(eigvals.dtype).eps
mask = eigvals > tol
eigvals = eigvals[mask]
U = U[:, mask]
# Rebuild the Eigen decomposition of anomalies
s = np.sqrt(eigvals)
if ne < n_pts:
# U has shape (ne, n_pc)
# anomalies has shape (ne, n_pts)
V = (anomalies.T @ U) / s
else:
# U has shape (n_pts, p_pc)
V = U
# Vt has shape (n_pc, n_pts)
return V, s
[docs]
class CovViaEnsemble(CovarianceMatrix):
r"""
Represents a covariance matrix as an ensemble of realizations.
For a given ensemble with shape (:math:`N_{s}`, :math:`N_{e}`), the number of
points and the number of members in the ensemble respectively, the covariance
matrix :math:`\mathbf{\Sigma_{ss}}` is approximated from the ensemble
in the standard way of EnKF
:cite:p:`evensenDataAssimilationEnsemble2007,aanonsenEnsembleKalmanFilter2009`:
.. math::
\mathbf{\Sigma_{ss}} = \frac{1}{N_{e} - 1} \sum_{j=1}^{N_{e}}\left(s_{j} -
\overline{s}\right)\left(s_{j}
- \overline{s^{l}} \right)^{T}
Or by defining a matrix of anomalies
:math:`\mathbf{A} = \mathbf{S} - \overline{\mathbf{S}}`
with shape (:math:`N_{s}`, :math:`N_{e}`):
.. math::
\mathbf{\Sigma_{ss}} = \frac{1}{N_{e} - 1} \mathbf{A}^{T}\mathbf{A}
Note
----
Practically, the dense covariance matrix is never built,
only the anomalies matrix :math:`\mathbf{A}` is used. The product between the
inverse of the covariance matrix and a vector
:math:`\mathbf{x} = \mathbf{\Sigma_{ss}}^{-1}\mathbf{b}`
is obtained solving the system :math:`\mathbf{A}^{T}\mathbf{Ax} = \mathbf{b}`,
using gmres, where only anomalies matrix vector products are required.
References
----------
.. bibliography::
:list: enumerated
:filter: False
evensenDataAssimilationEnsemble2007
aanonsenEnsembleKalmanFilter2009
"""
__slots__ = ["_ensemble", "_v", "_w", "_sqrt_w", "_inv_sqrt_w", "_inv_w"]
[docs]
def __init__(
self,
ensemble: NDArrayFloat,
) -> None:
"""
Initiate the instance.
Parameters
----------
ensemble : NDArrayFloat
Ensemble of realization with shape (:math:`N_{e}`, :math:`N_{s}`).
"""
self.ensemble = ensemble
super().__init__()
@property
def ensemble(self) -> NDArrayFloat:
return self._ensemble
@ensemble.setter
def ensemble(self, value: NDArrayFloat) -> None:
self._ensemble = value
# build preconditioner using Eigen decomposition
self._v, self._sqrt_w = get_eigen_factorization_gram_matrix_trick(
self.anomalies / np.sqrt((self.n_ens - 1))
)
self._w = self._sqrt_w**2
self._inv_w = 1.0 / self._w
self._inv_sqrt_w = 1.0 / (self._sqrt_w)
self.n_pts = np.shape(self.ensemble)[1]
self._subspace_size = np.shape(self._v)[1]
self._rank = 0
self._log_pdet = 0.0
@cached_property
def anomalies(self) -> NDArrayFloat:
"""
Return the matrix of anomalies.
"""
return self.ensemble - np.mean(self.ensemble, axis=0, keepdims=True)
@cached_property
def n_ens(self) -> int:
"""Return the number of members in the ensemble."""
return self.ensemble.shape[0]
def _matvec(self, x: NDArrayFloat) -> NDArrayFloat:
"""Return the covariance matrix times the vector x (dot product)."""
return np.linalg.multi_dot([self.anomalies.T, self.anomalies, x]) / (
self.n_ens - 1
)
def _whiten(self, x: NDArrayFloat) -> NDArrayFloat:
"""
Perform a whitening transformation on data.
Parameters
----------
x : NDArrayFloat
Data to whiten with shape (..., n), n being the number of points covered
by the covariance matrix.
Returns
-------
NDArrayFloat
Transformed data with shape (..., n)
"""
return x @ (self._v * self._inv_sqrt_w[None, :])
def _colorize(self, x: NDArrayFloat) -> NDArrayFloat:
return x @ (self._sqrt_w[None, :].T * self._v.T)
def _todense(self) -> NDArrayFloat:
"""
Return a dense representation of the matrix.
"""
return self.anomalies.T @ self.anomalies / (self.n_ens - 1)
[docs]
def solve(
self, b: NDArrayFloat, rtol: float = 1e-12, maxiter: int = 1000
) -> NDArrayFloat:
"""
Solve A^{T}Ax = b, with A, the anomalies matrix instance.
Note that the dense covariance matrix is never built.
TODO: explains that it relies on the eigen decomposition.
"""
return np.linalg.multi_dot(
[
(self._v * self._inv_w[None, :]),
self._v.T,
b,
]
)
[docs]
def get_diagonal(self) -> NDArrayFloat:
"""Return the diagonal entries of the matrix (variances)."""
return np.sum((self.anomalies**2), axis=0) / (self.n_ens - 1.0)
@property
def precision(self) -> NDArrayFloat:
return (self._v * self._inv_w[None, :]) @ self._v.T
def _generate_dense_matrix_from_kernel(
pts: NDArrayFloat, kernel: Callable, len_scale: NDArrayFloat, nugget: float = 0.0
) -> NDArrayFloat:
"""
Generate a dense matrix.
Compute O(dim^2) interactions.
Parameters
----------
pts : NDArrayFloat
DESCRIPTION.
kernel : Callable
DESCRIPTION.
len_scale: NDArrayFloat
DESCRIPTION.
nugget: float
It acts as a regularization. The default is 0.0 (no nugget effect).
Returns
-------
NDArrayFloat
The dense matrix.
"""
# Scale the points coordinates
scaled_pts = np.array(pts, copy=True)
for dim in range(scaled_pts.shape[1]):
scaled_pts[:, dim] /= len_scale[dim]
return kernel(sp.spatial.distance_matrix(scaled_pts, scaled_pts))
[docs]
class CovKernelAsLinop(abc.ABC, LinearOperator):
"""Abstract class providing linear operator capability from a kernel definition."""
__slots__: List[str] = [
"_kernel",
"_pts",
"_nugget",
"_preconditioner",
"count",
"solvematvecs",
]
[docs]
def __init__(
self,
pts: NDArrayFloat,
kernel: Callable,
len_scale: NDArrayFloat,
nugget: float = 0.0,
k: int = 100,
is_use_preconditioner: bool = False,
) -> None:
"""
Initialize the instance.
Parameters
----------
pts : NDArrayFloat
_description_
kernel : Callable
_description_
nugget : float, optional
_description_, by default 0.0
"""
self._kernel: Callable = kernel
self._pts: NDArrayFloat = pts
self._nugget: float = nugget
self._len_scale: NDArrayFloat = len_scale
# counters
self.count: int = 0
self.solvmatvecs: int = 0
super().__init__(shape=(self.n_pts, self.n_pts), dtype="d")
if is_use_preconditioner:
self._preconditioner: Optional[csr_array] = _build_kernel_preconditioner(
pts, kernel, k=k
)
else:
self._preconditioner = None
@property
def n_pts(self) -> int:
"""Return the number of points covered."""
return np.shape(self._pts)[0]
[docs]
def todense(self) -> NDArrayFloat:
"""
Explicit dense representation of the covariance matrix.
"""
return _generate_dense_matrix_from_kernel(
self._pts, self._kernel, self._len_scale, self._nugget
)
[docs]
def get_diagonal(self) -> NDArrayFloat:
"""Return the diagonal entries of the matrix (variances)."""
return self._kernel(np.zeros(len(self._pts)))
[docs]
def reset_comptors(self) -> None:
"""Set the comptors to zero."""
self.count = 0
self.solvmatvecs = 0
[docs]
def itercount(self) -> int:
"""Return the number of counts."""
return self.count
[docs]
def solve(
self, b: NDArrayFloat, rtol: float = 1e-12, maxiter: int = 1000
) -> NDArrayFloat:
"""
Solve Ax = b, with A, the current covariance matrix instance.
Notes
-----
It relies on GMRES and matrix-vector multiplications (column by column).
Parameters
----------
b : NDArrayFloat
Array with shape (n,), (n, 1) or (n, ne), ne being the number of
vectors (columns). `x` will have the same shape as b.
rtol : float, optional
Relative tolerance for the convergence of GMRES, by default 1e-12
maxiter : int, optional
Maximum number of GMRES iterations, by default 1000
Returns
-------
NDArrayFloat
The x array with same shape as b.
"""
residual = CallBack()
# make sure to have 2d array
_b = np.reshape(b, (self.n_pts, -1))
nv = np.shape(_b)[1]
x = np.zeros_like(_b)
# does not work for matrices ?
for i in range(nv):
x[:, i], info = gmres(
self,
b=_b[:, i],
rtol=rtol,
maxiter=maxiter,
callback=residual,
callback_type="legacy",
M=self._preconditioner,
atol=0.0,
)
self.solvmatvecs += residual.itercount
# Get the original shape back
return x.reshape(np.shape(b))
def _build_kernel_preconditioner(
pts: NDArrayFloat, kernel: Callable, k: int = 100
) -> csr_array:
"""
Implementation of the preconditioner based on changing basis.
Parameters
----------
pts : NDArrayFloat
The points (n, m) with n the number of data points and m the dimension of
coordinates.
kernel: Callable
The covariance kernel defined as a callable instance.
k : int, optional
Number of local centers in the preconditioner. Controls the sparity of
the preconditioner. By default 100.
Returns
-------
csr_array
_description_
Raises
------
ValueError
_description_
Notes:
------
Implementation of the preconditioner based on local centers.
The parameter k controls the sparsity and the effectiveness of the preconditioner.
Larger k is more expensive but results in fewer iterations.
For large ill-conditioned systems, it was best to use a nugget effect to make the
problem better conditioned.
To Do: implementation based on local centers and additional points. Will remove the
hack of using nugget effect.
"""
nb_pts: int = pts.shape[0]
if nb_pts <= 0:
raise ValueError("The number of points cannot be null !")
if nb_pts < k:
raise ValueError(
f"k ({k}) must be lower or equal to the number of points ({nb_pts})!"
)
# Build the tree
start: float = time()
tree: sp.spatial.cKDTree = sp.spatial.cKDTree(pts, leafsize=32)
end: float = time()
logging.log(logging.INFO, f"Tree building time = {end - start}")
# Find the nearest neighbors of all the points
start = time()
_dist, ind = tree.query(pts, k=k)
end = time()
logging.log(logging.INFO, f"Nearest neighbor computation time = {end - start}")
Q = np.zeros((k, k), dtype="d")
y = np.zeros((k, 1), dtype="d")
row = np.tile(np.arange(nb_pts), (k, 1)).transpose()
col = np.copy(ind)
nu = np.zeros((nb_pts, k), dtype="d")
y[0] = 1.0
start = time()
# TODO: This is very inefficient and must be re-written
for i in np.arange(nb_pts):
Q = kernel(cdist(pts[ind[i, :], :], pts[ind[i, :], :]))
nui = sp.linalg.solve(Q, y)
nu[i] = nui.ravel()
end = time()
logging.log(logging.INFO, "Elapsed time = %g" % (end - start))
ij = np.zeros((nb_pts * k, 2), dtype="i")
ij[:, 0] = np.copy(np.reshape(row, nb_pts * k, order="F").transpose())
ij[:, 1] = np.copy(np.reshape(col, nb_pts * k, order="F").transpose())
data = np.copy(np.reshape(nu, nb_pts * k, order="F").transpose())
return csr_array((data, ij.transpose()), shape=(nb_pts, nb_pts), dtype="d")
[docs]
class CovKernelAsLinopViaFFT(CovKernelAsLinop):
"""
Represents a fast fourier transform covariance matrix.
FFT based operations if kernel is stationary or translation invariant and points
are on a regular grid.
Notes
-----
This FFT-based code is a reimplementation of Arvind Saibaba's work
(https://github.com/arvindks/kle).
TODO: add references.
"""
__slots__: List[str] = ["_first_row"]
[docs]
def __init__(
self,
kernel,
mesh_dim: Union[float, NDArrayFloat, Sequence[float]],
domain_shape: Union[int, NDArrayInt, Sequence[int]],
len_scale: NDArrayFloat,
nugget: float = 0.0,
k: int = 100,
is_use_preconditioner: bool = False,
) -> None:
"""
Initialize the instance.
Parameters
----------
kernel : _type_
_description_
mesh_dim : Union[NDArrayInt, Tuple[float, float]]
_description_
domain_shape : Union[NDArrayInt, Tuple[int, int]]
_description_
len_scale : NDArrayFloat
_description_
nugget : float, optional
_description_, by default 0.0
k : int, optional
Number of local centers in the preconditioner. Controls the sparity of
the preconditioner. It should be inferior to the number of points.
By default 100.
is_use_preconditioner: bool
Whether to build the preconditioner at instance creation and use it to
solve Ax = b systems. The default is False.
"""
self.param_shape: NDArrayInt = np.array(domain_shape, dtype=np.int8)
# Coordinates of the points in the grid with shape (Npts, Ndim)
pts = get_pts_coords_regular_grid(mesh_dim, self.param_shape)
self._first_row = create_toepliz_first_row(pts, kernel, len_scale)
super().__init__(
pts,
kernel,
len_scale=len_scale,
nugget=nugget,
k=k,
is_use_preconditioner=is_use_preconditioner,
)
def _matvec(self, x: NDArrayFloat) -> NDArrayFloat:
"""Return the covariance matrix times the vector x."""
return toeplitz_product(x, self._first_row, self.param_shape) * (
1 + self._nugget
)
[docs]
def get_linop_eigen_factorization(
linop: LinearOperator,
size: int,
n_pc: int,
random_state: Optional[Union[int, RandomState, Generator]] = None,
) -> Tuple[NDArrayFloat, NDArrayFloat]:
"""
Compute Eigenmodes of the covariance.
Parameters
----------
linop: LinearOperator
The covariance matrix as a linear operator to factorize.
size: int
Input vector size for the linear operator.
n_pc : int
Number of principal component in the matrix.
random_state: Optional[Union[int, np.random.Generator, np.random.RandomState]]
Pseudorandom number generator state used to generate resamples.
If `random_state` is ``None`` (or `np.random`), the
`numpy.random.RandomState` singleton is used.
If `random_state` is an int, a new ``RandomState`` instance is used,
seeded with `random_state`.
If `random_state` is already a ``Generator`` or ``RandomState``
instance then that instance is used.
Raises
------
NotImplementedError
If a method difference from arpack is used for decomposition.
Returns
-------
Tuple[NDArrayFloat, NDArrayFloat]
Eigen values and eigen vectors.
"""
logging.info("Eigen factorization of linear operator")
# twopass = False if not 'twopass' in self.params else self.params['twopass']
start = time()
# Random state for v0 vector used by eigsh and svds
if random_state is not None:
random_state = check_random_state(random_state)
v0 = random_state.uniform(size=(size,))
else:
v0 = None
eig_vals, eig_vects = sp.sparse.linalg.eigsh(linop, k=n_pc, v0=v0)
eig_vals = eig_vals[::-1]
eig_vals = eig_vals.reshape(-1, 1) # make a column vector
eig_vects = eig_vects[:, ::-1]
logging.info(
"- time for eigenfactorization with k = %d is %g sec"
% (n_pc, round(time() - start))
)
if (eig_vals > 0).sum() < n_pc:
n_pc = (eig_vals > 0).sum()
eig_vals = eig_vals[:n_pc, :]
eig_vects = eig_vects[:, :n_pc]
logging.warning("Warning: n_pc changed to %d for positive eigenvalues" % (n_pc))
logging.info(
f"- 1st eigv : {eig_vals[0]}, {n_pc}-th eigv : {eig_vals[-1]}, "
f"ratio: {eig_vals[-1] / eig_vals[0]}"
)
return eig_vals, eig_vects
[docs]
def eigen_factorize_cov_mat(
cov_mat: Union[CovarianceMatrix, CovKernelAsLinop],
n_pc: int,
random_state: Optional[Union[int, RandomState, Generator]] = None,
) -> CovViaEigenFactorization:
"""
Return an eigen factorized covariance matrix from the input covariance matrix.
Parameters
----------
cov_mat : Union[CovarianceMatrix, CovKernelAsLinop]
The covariance matrix instance to decompose.
n_pc : int
Number of principal component in the matrix.
random_state: Optional[Union[int, np.random.Generator, np.random.RandomState]]
Pseudorandom number generator state used to generate resamples.
If `random_state` is ``None`` (or `np.random`), the
`numpy.random.RandomState` singleton is used.
If `random_state` is an int, a new ``RandomState`` instance is used,
seeded with `random_state`.
If `random_state` is already a ``Generator`` or ``RandomState``
instance then that instance is used.
Returns
-------
CovViaEigenFactorization
Decomposed matrix instance.
"""
if isinstance(cov_mat, CovViaEigenFactorization):
return cov_mat
return CovViaEigenFactorization(
get_linop_eigen_factorization(cov_mat, cov_mat.shape[0], n_pc, random_state)
)
[docs]
def get_explained_var(
eigval: NDArrayFloat,
cov_mat: Optional[CovarianceMatrix] = None,
trace_cov_mat: Optional[float] = None,
) -> NDArrayFloat:
"""Return the variance explained by each eigen value."""
if trace_cov_mat is not None:
return eigval / trace_cov_mat
if cov_mat is not None:
return eigval / cov_mat.get_trace()
else:
raise ValueError("You must provide a Covariance matrix instance or the trace !")