# -*- coding: utf-8 -*-
#
# Authors: Swolf <swolfforever@gmail.com>
# Date: 2021/1/07
# License: MIT License
from functools import partial
from typing import Union, Optional, Callable
import numpy as np
from numpy import ndarray
from sklearn.base import BaseEstimator, TransformerMixin
from sklearn.covariance import oas, ledoit_wolf, fast_mcd, empirical_covariance
from joblib import Parallel, delayed
from scipy.linalg import eigh
estimator = Callable[[ndarray], ndarray]
[docs]def isPD(B: ndarray) -> bool:
"""Returns true when input matrix is positive-definite, via Cholesky decompositon method.
Parameters
----------
B : ndarray
Any matrix, shape (N, N)
Returns
-------
bool
True if B is positve-definite.
Notes
-----
Use numpy.linalg rather than scipy.linalg. In this case, scipy.linalg has unpredictable behaviors.
"""
try:
_ = np.linalg.cholesky(B)
return True
except np.linalg.LinAlgError:
return False
[docs]def nearestPD(A: ndarray) -> ndarray:
"""Find the nearest positive-definite matrix to input.
Parameters
----------
A : ndarray
Any square matrxi, shape (N, N)
Returns
-------
A3 : ndarray
positive-definite matrix to A
Notes
-----
A Python/Numpy port of John D'Errico's `nearestSPD` MATLAB code [1]_, which
origins at [2]_.
References
----------
.. [1] https://www.mathworks.com/matlabcentral/fileexchange/42885-nearestspd
.. [2] N.J. Higham, "Computing a nearest symmetric positive semidefinite matrix" (1988):
https://doi.org/10.1016/0024-3795(88)90223-6
"""
B = (A + A.T) / 2
_, s, V = np.linalg.svd(B)
H = np.dot(V.T, np.dot(np.diag(s), V))
A2 = (B + H) / 2
A3 = (A2 + A2.T) / 2
if isPD(A3):
return A3
print("Replace current matrix with the nearest positive-definite matrix.")
spacing = np.spacing(np.linalg.norm(A))
# The above is different from [1]. It appears that MATLAB's `chol` Cholesky
# decomposition will accept matrixes with exactly 0-eigenvalue, whereas
# Numpy's will not. So where [1] uses `eps(mineig)` (where `eps` is Matlab
# for `numpy.spacing`), we use the above definition. CAVEAT: our `spacing`
# will be much larger than [1]'s `eps(mineig)`, since `mineig` is usually on
# the order of 1e-16, and `eps(1e-16)` is on the order of 1e-34, whereas
# `spacing` will, for Gaussian random matrixes of small dimension, be on
# othe order of 1e-16. In practice, both ways converge, as the unit test
# below suggests.
eye = np.eye(A.shape[0])
k = 1
while not isPD(A3):
mineig = np.min(np.real(np.linalg.eigvals(A3)))
A3 += eye * (-mineig * k**2 + spacing)
k += 1
return A3
def _lwf(X: ndarray) -> ndarray:
"""Wrapper for sklearn ledoit wolf covariance estimator.
Parameters
----------
X : ndarray
EEG signal, shape (n_channels, n_samples).
Returns
-------
C : ndarray
Estimated covariance, shape (n_channels, n_channels).
"""
C, _ = ledoit_wolf(X.T)
return C
def _oas(X: ndarray) -> ndarray:
"""Wrapper for sklearn oas covariance estimator.
Parameters
----------
X : ndarray
EEG signal, shape (n_channels, n_samples).
Returns
-------
C : ndarray
Estimated covariance, shape (n_channels, n_channels).
"""
C, _ = oas(X.T)
return C
def _cov(X: ndarray) -> ndarray:
"""Wrapper for sklearn sample covariance estimator.
Parameters
----------
X : ndarray
EEG signal, shape (n_channels, n_samples).
Returns
-------
C : ndarray
Estimated covariance, shape (n_channels, n_channels).
"""
C = empirical_covariance(X.T)
return C
def _mcd(X: ndarray) -> ndarray:
"""Wrapper for sklearn mcd covariance estimator.
Parameters
----------
X : ndarray
EEG signal, shape (n_channels, n_samples).
Returns
-------
C : ndarray
Estimated covariance, shape (n_channels, n_channels).
"""
_, C, _, _ = fast_mcd(X.T)
return C
estimators = {
"cov": _cov,
"lwf": _lwf,
"oas": _oas,
"mcd": _mcd,
}
def _check_est(est: Union[str, estimator]) -> estimator:
"""Check if a given estimator is valid.
Parameters
----------
est : callable object or str
Could be the name of estimator or a callable estimator itself.
Returns
-------
est: callable object
A callable estimator.
"""
if callable(est):
pass
elif est in estimators.keys():
est = estimators[est]
else:
raise ValueError(
"""%s is not an valid estimator ! Valid estimators are : %s or a
callable function"""
% (est, (" , ").join(estimators.keys()))
)
return est
[docs]def covariances(
X: ndarray, estimator: Union[str, estimator] = "cov", n_jobs: int = 1
) -> ndarray:
"""Estimation of covariance matrix.
Parameters
----------
X : ndarray
EEG signal, shape (..., n_channels, n_samples).
estimator : str or callable object, optional
Covariance estimator to use (the default is `cov`, which uses empirical covariance estimator). For regularization,
consider `lwf` or `oas`.
**supported estimators**
`cov`: empirial covariance estimator
`lwf`: ledoit wolf covariance estimator
`oas`: oracle approximating shrinkage covariance estimator
`mcd`: minimum covariance determinant covariance estimator
n_jobs : int or None, optional
The number of CPUs to use to do the computation (the default is 1, -1 for all processors).
Returns
-------
covmats : ndarray
covariance matrices, shape (..., n_channels, n_channels)
See Also
--------
covariances_erp
"""
X = np.asarray(X)
X = np.atleast_2d(X)
shape = X.shape
X = np.reshape(X, (-1, shape[-2], shape[-1]))
parallel = Parallel(n_jobs=n_jobs)
est = _check_est(estimator)
covmats = parallel(delayed(est)(x) for x in X)
covmats = np.reshape(covmats, (*shape[:-2], shape[-2], shape[-2]))
return covmats
[docs]class Covariance(BaseEstimator, TransformerMixin):
"""Estimation of covariance matrix.
Parameters
----------
estimator : str or callable object, optional
Covariance estimator to use (the default is `cov`, which uses empirical covariance estimator). For regularization,
consider `lwf` or `oas`.
**supported estimators**
`cov`: empirial covariance estimator
`lwf`: ledoit wolf covariance estimator
`oas`: oracle approximating shrinkage covariance estimator
`mcd`: minimum covariance determinant covariance estimator
n_jobs : int or None, optional
The number of CPUs to use to do the computation (the default is 1, -1 for all processors).
See Also
--------
ERPCovariance
"""
def __init__(self, estimator="cov", n_jobs=1):
self.estimator = estimator
self.n_jobs = n_jobs
[docs] def fit(self, X, y=None):
"""Not used, only for compatibility with sklearn API.
Parameters
----------
X : ndarray
EEG signal, shape (..., n_channels, n_samples).
y : ndarray
Labels.
Returns
-------
self : Covariance instance
The Covariance instance.
"""
return self
[docs]def matrix_operator(
Ci: ndarray, operator: estimator, n_jobs: Optional[int] = None
) -> ndarray:
"""Apply operator to any matrix.
Parameters
----------
Ci : ndarray
Input positive definite matrix.
operator : callable object
Operator function or callable object.
n_jobs: int, optional
the number of jobs to use.
Returns
-------
Co : ndarray
Operated matrix.
Raises
------
ValueError
If Ci is not positive definite.
Notes
-----
.. math::
\mathbf{Ci} = \mathbf{V} \left( \mathbf{\Lambda} \\right) \mathbf{V}^T \\\\
\mathbf{Co} = \mathbf{V} operator\left( \mathbf{\Lambda} \\right) \mathbf{V}^T
where :math:`\mathbf{\Lambda}` is the diagonal matrix of eigenvalues
and :math:`\mathbf{V}` the eigenvectors of :math:`\mathbf{Ci}`.
"""
def _single_matrix_operator(Ci: ndarray, operator: estimator) -> ndarray:
eigvals, eigvects = eigh(Ci, check_finite=False)
eigvals = np.diag(operator(eigvals))
Co = eigvects @ eigvals @ eigvects.T
return Co
ori_shape = Ci.shape
Ci = Ci.reshape((-1, *ori_shape[-2:]))
Co = Parallel(n_jobs=n_jobs)(
delayed(_single_matrix_operator)(C, operator) for C in Ci
)
Co = np.stack(Co)
Co = Co.reshape((*ori_shape,))
return Co
[docs]def sqrtm(Ci: ndarray, n_jobs: Optional[int] = None):
"""Return the matrix square root of a covariance matrix.
Parameters
----------
Ci : ndarray
Input positive-definite matrix.
Returns
-------
ndarray
Square root matrix of Ci.
Notes
-----
.. math::
\mathbf{C} = \mathbf{V} \left( \mathbf{\Lambda} \\right)^{1/2} \mathbf{V}^T
where :math:`\mathbf{\Lambda}` is the diagonal matrix of eigenvalues
and :math:`\mathbf{V}` the eigenvectors of :math:`\mathbf{Ci}`.
"""
return matrix_operator(Ci, np.sqrt, n_jobs=n_jobs)
[docs]def logm(Ci: ndarray, n_jobs: Optional[int] = None):
"""Return the matrix logrithm of a covariance matrix.
Parameters
----------
Ci : ndarray
Input positive-definite matrix.
Returns
-------
ndarray
Logrithm matrix of Ci.
Notes
-----
.. math::
\mathbf{C} = \mathbf{V} \log{(\mathbf{\Lambda})} \mathbf{V}^T
where :math:`\mathbf{\Lambda}` is the diagonal matrix of eigenvalues
and :math:`\mathbf{V}` the eigenvectors of :math:`\mathbf{Ci}`.
"""
return matrix_operator(Ci, np.log, n_jobs=n_jobs)
[docs]def expm(Ci: ndarray, n_jobs: Optional[int] = None):
"""Return the matrix exponential of a covariance matrix.
Parameters
----------
Ci : ndarray
Input positive-definite matrix.
Returns
-------
ndarray
Exponential matrix of Ci.
Notes
-----
.. math::
\mathbf{C} = \mathbf{V} \exp{(\mathbf{\Lambda})} \mathbf{V}^T
where :math:`\mathbf{\Lambda}` is the diagonal matrix of eigenvalues
and :math:`\mathbf{V}` the eigenvectors of :math:`\mathbf{Ci}`.
"""
return matrix_operator(Ci, np.exp, n_jobs=n_jobs)
[docs]def invsqrtm(Ci: ndarray, n_jobs: Optional[int] = None):
"""Return the inverse matrix square root of a covariance matrix.
Parameters
----------
Ci : ndarray
Input positive-definite matrix.
Returns
-------
ndarray
Inverse matrix square root of Ci.
Notes
-----
.. math::
\mathbf{C} = \mathbf{V} \left( \mathbf{\Lambda} \\right)^{-1/2} \mathbf{V}^T
where :math:`\mathbf{\Lambda}` is the diagonal matrix of eigenvalues
and :math:`\mathbf{V}` the eigenvectors of :math:`\mathbf{Ci}`.
"""
def isqrt(x):
return 1.0 / np.sqrt(x)
return matrix_operator(Ci, isqrt, n_jobs=n_jobs)
[docs]def powm(Ci: ndarray, alpha: float, n_jobs: Optional[int] = None):
"""Return the matrix power of a covariance matrix.
Parameters
----------
Ci : ndarray
Input positive-definite matrix.
alpha : float
Exponent.
Returns
-------
ndarray
Power matrix of Ci.
Notes
-----
.. math::
\mathbf{C} = \mathbf{V} \left( \mathbf{\Lambda} \\right)^{\\alpha} \mathbf{V}^T
where :math:`\mathbf{\Lambda}` is the diagonal matrix of eigenvalues
and :math:`\mathbf{V}` the eigenvectors of :math:`\mathbf{Ci}`.
"""
power = partial(lambda x, alpha=None: x**alpha, alpha=alpha)
return matrix_operator(Ci, power, n_jobs=n_jobs)