from typing import Optional, List, Tuple, Any
import numpy as np
from numpy import ndarray
from scipy import linalg as sLA
from math import sqrt, pow
from abc import abstractmethod, ABCMeta
[docs]def sign_sta(
x: float):
"""Standardization of decision coefficient based on sign(x).
Args:
x (float)
Returns:
y (float): y=sign(x)*x^2
"""
x = np.real(x)
return (abs(x) / x) * (x ** 2)
[docs]def combine_feature(
features: List[ndarray],
func: Any = sign_sta):
"""Coefficient-level integration.
Args:
features (List[float or int or ndarray]): Different features.
func (function): Quantization function.
Returns:
coef (the same type with elements of features): Integrated coefficients.
"""
coef = np.zeros_like(features[0])
for feature in features:
coef += func(feature)
return coef
[docs]def combine_fb_feature(
features: List[Any]):
"""Coefficient-level integration specially for filter-bank design.
Args:
features (List[Any]): Coefficient matrices of different sub-bands.
Returns:
coef (float): Integrated coefficients.
"""
coef = np.zeros_like(features[0])
for nf, feature in enumerate(features):
coef += (pow(nf + 1, -1.25) + 0.25) * (feature ** 2)
return coef
[docs]def pick_subspace(
descend_order: List[Tuple[int, float]],
e_val_sum: float,
ratio: float):
"""Config the number of subspaces.
Args:
descend_order (List[Tuple[int,float]]): See it in solve_gep() or solve_ep().
e_val_sum (float): Trace of covariance matrix.
ratio (float): 0-1. The ratio of the sum of eigenvalues to the total.
Returns:
n_components (int): The number of subspaces.
"""
temp_val_sum = 0.0
for n_components, do in enumerate(descend_order): # n_sp: n_subspace
temp_val_sum += do[-1]
if temp_val_sum > ratio * e_val_sum:
return n_components + 1
[docs]def solve_gep(
A: ndarray,
B: ndarray,
n_components: Optional[int] = None,
ratio: float = 0.5,
mode: Optional[str] = 'Max'):
"""Solve generalized problems | generalized Rayleigh quotient:
f(w)=wAw^T/(wBw^T) -> Aw = lambda Bw -> B^{-1}Aw = lambda w
Args:
A (ndarray): (m,m).
B (ndarray): (m,m).
n_components (int): Number of eigenvectors picked as filters.
Eigenvectors are referring to eigenvalues sorted in descend order.
ratio (float): 0-1. The ratio of the sum of eigenvalues to the total.
mode (str): 'Max' or 'Min'. Depends on target function.
Returns:
w (ndarray): (Nk,m). Picked eigenvectors.
"""
e_val, e_vec = sLA.eig(sLA.solve(a=B, b=A, assume_a='sym')) # ax=b -> x=a^{-1}b
e_val_sum = np.sum(e_val)
descend_order = sorted(enumerate(e_val), key=lambda x: x[1], reverse=True)
w_index = [do[0] for do in descend_order]
if not n_components:
n_components = pick_subspace(descend_order, e_val_sum, ratio)
if mode == 'Min':
return np.real(e_vec[:, w_index][:, n_components:].T)
elif mode == 'Max':
return np.real(e_vec[:, w_index][:, :n_components].T)
[docs]def pearson_corr(
X: ndarray,
Y: ndarray):
"""Pearson correlation coefficient (1-D or 2-D).
Args:
X (ndarray): (..., n_points)
Y (ndarray): (..., n_points). The dimension must be same with X.
Returns:
corrcoef (float)
"""
# check if not zero_mean():
# X,Y = zero_mean(X), zero_mean(Y)
cov_xy = np.sum(X * Y)
var_x = np.sum(X ** 2)
var_y = np.sum(Y ** 2)
corrcoef = cov_xy / sqrt(var_x * var_y)
return corrcoef
# %% Basic TRCA object
[docs]class BasicTRCA(metaclass=ABCMeta):
def __init__(self,
standard: Optional[bool] = True,
ensemble: Optional[bool] = True,
n_components: Optional[int] = 1,
ratio: float = 0.5):
"""Basic configuration.
Args:
standard (bool, optional): Standard TRCA model. Defaults to True.
ensemble (bool, optional): Ensemble TRCA model. Defaults to True.
n_components (int): Number of eigenvectors picked as filters.
Set to 'None' if ratio is not 'None'.
ratio (float): 0-1. The ratio of the sum of eigenvalues to the total.
Defaults to be 'None' when n_components is not 'None'.
"""
# config model
self.n_components = n_components
self.ratio = ratio
self.standard = standard
self.ensemble = ensemble
[docs] @abstractmethod
def fit(self,
X_train: ndarray,
y_train: ndarray,
sine_template: ndarray):
"""Load in training dataset and train model.
Args:
X_train (ndarray): (Ne*Nt,...,Np). Training dataset.
y_train (ndarray): (Ne*Nt,). Labels for X_train.
"""
pass
[docs] @abstractmethod
def predict(self,
X_test: ndarray):
"""Predict test data.
Args:
X_test (ndarray): (Ne*Nte,...,Np). Test dataset.
Return:
y_standard (ndarray): (Ne*Nte,). Predict labels.
y_ensemble (ndarray): (Ne*Nte,). Predict labels (ensemble).
"""
pass
[docs]class BasicFBTRCA(metaclass=ABCMeta):
def __init__(self,
standard: Optional[bool] = True,
ensemble: Optional[bool] = True,
n_components: Optional[int] = 1,
n_bands: int = 1,
ratio: float = 0.5):
"""Basic configuration.
Args:
standard (bool, optional): Standard TRCA model. Defaults to True.
ensemble (bool, optional): Ensemble TRCA model. Defaults to True.
n_components (int): Number of eigenvectors picked as filters.
Set to 'None' if ratio is not 'None'.
ratio (float): 0-1. The ratio of the sum of eigenvalues to the total.
Defaults to be 'None' when n_components is not 'None'.
"""
# config model
self.n_components = n_components
self.n_bands = n_bands
self.ratio = ratio
self.standard = standard
self.ensemble = ensemble
[docs] @abstractmethod
def fit(self,
X_train: ndarray,
y_train: ndarray,
sine_template: ndarray):
"""Load in training dataset and train model.
Args:
X_train (ndarray): (Nb,Ne*Nt,...,Np). Training dataset.
y_train (ndarray): (Ne*Nt,). Labels for X_train.
"""
pass
[docs] def predict(self,
X_test: ndarray):
"""Calculating the prediction labels based on the decision coefficients.
Args:
X_test (ndarray): (Nt*Nte,Nc,Np). Test dataset.
Return:
y_standard (ndarray): (Nt*Nte,). Predict labels of sc-TRCA.
y_ensemble (ndarray): (Nt*Nte,). Predict labels of sc-eTRCA.
"""
# basic information
n_test = X_test.shape[1]
self.fb_y_standard: List[List] = [[] for nb in range(self.n_bands)]
self.fb_y_ensemble: List[List] = [[] for nb in range(self.n_bands)]
for nb in range(self.n_bands):
fb_results = self.sub_models[nb].predict(X_test=X_test[nb])
self.fb_y_standard[nb] = fb_results[1]
self.fb_y_ensemble[nb] = fb_results[3]
# integration of multi-bands' results
self.y_standard = np.empty((n_test))
self.y_ensemble = np.empty_like(self.y_standard)
self.rou, self.erou = self.transform(X_test)
for nte in range(n_test):
self.y_standard[nte] = np.argmax(self.rou[nte, :])
self.y_ensemble[nte] = np.argmax(self.erou[nte, :])
return self.y_standard, self.y_ensemble
[docs]def sctrca_compute(
X_train: ndarray,
y_train: ndarray,
sine_template: ndarray,
train_info: dict,
n_components: Optional[int] = 1,
ratio: float = 0.5):
"""(Ensemble) similarity-constrained TRCA (sc-(e)TRCA).
Args:
X_train (ndarray): (Ne*Nt,Nc,Np). Training dataset. Nt>=2.
y_train (ndarray): (Ne*Nt,). Labels for X_train.
sine_template (ndarray): (Ne,2*Nh,Np). Sinusoidal template.
train_info (dict): {'event_type':ndarray (Ne,),
'n_events':int,
'n_train':ndarray (Ne,),
'n_chans':int,
'n_points':int,
'standard':True,
'ensemble':True}
n_components (int): Number of eigenvectors picked as filters.
Set to 'None' if ratio is not 'None'.
ratio (float): 0-1. The ratio of the sum of eigenvalues to the total.
Defaults to be 'None'.
Return: sc-(e)TRCA model (dict).
Q (ndarray): (Ne,Nc,Nc). Covariance of original data & average template.
S (ndarray): (Ne,Nc,Nc). Covariance of template.
u (List[ndarray]): Ne*(Nk,Nc). Spatial filters for EEG signal.
v (List[ndarray]): Ne*(Nk,2*Nh). Spatial filters for sinusoidal signal.
u_concat (ndarray): (Ne*Nk,Nc). Concatenated filter for EEG signal.
v_concat (ndarray): (Ne*Nk,2*Nh). Concatenated filter for sinusoidal signal.
uX (List[ndarray]): Ne*(Nk,Np). sc-TRCA templates for EEG signal.
vY (List[ndarray]): Ne*(Nk,Np). sc-TRCA templates for sinusoidal signal.
euX (List[ndarray]): (Ne,Ne*Nk,Np). sc-eTRCA templates for EEG signal.
evY (List[ndarray]): (Ne,Ne*Nk,Np). sc-eTRCA templates for sinusoidal signal.
"""
# basic information
event_type = train_info['event_type']
n_events = train_info['n_events'] # Ne
n_train = train_info['n_train'] # [Nt1,Nt2,...]
n_chans = train_info['n_chans'] # Nc
n_points = train_info['n_points'] # Np
standard = train_info['standard'] # bool
ensemble = train_info['ensemble'] # bool
n_2harmonics = sine_template.shape[1] # 2*Nh
S = np.zeros((n_events, n_chans + n_2harmonics, n_chans + n_2harmonics)) # (Ne,Nc+2Nh,Nc+2Nh)
Q = np.zeros_like(S) # (Ne,Nc+2Nh,Nc+2Nh)
avg_template = np.zeros((n_events, n_chans, n_points)) # (Ne,Nc,Np)
for ne, et in enumerate(event_type):
train_trials = n_train[ne] # Nt
X_temp = X_train[y_train == et] # (Nt,Nc,Np)
avg_template[ne] = np.mean(X_temp, axis=0) # (Nc,Np)
YY = sine_template[ne] @ sine_template[ne].T # (2Nh,2Nh)
XX = np.zeros((n_chans, n_chans)) # (Nc,Nc)
for tt in range(train_trials):
XX += X_temp[tt] @ X_temp[tt].T
XmXm = avg_template[ne] @ avg_template[ne].T # (Nc,Nc)
XmY = avg_template[ne] @ sine_template[ne].T # (Nc,2Nh)
# block covariance matrix S: [[S11,S12],[S21,S22]]
S[ne, :n_chans, :n_chans] = XmXm # S11
S[ne, :n_chans, n_chans:] = (1 - 1 / train_trials) * XmY # S12
S[ne, n_chans:, :n_chans] = S[ne, :n_chans, n_chans:].T # S21
S[ne, n_chans:, n_chans:] = YY # S22
# block covariance matrix Q: blkdiag(Q1,Q2)
for ntr in range(n_train[ne]):
Q[ne, :n_chans, :n_chans] += X_temp[ntr] @ X_temp[ntr].T # Q1
Q[ne, n_chans:, n_chans:] = train_trials * YY # Q2
# GEP | train spatial filters
u, v, ndim, correct = [], [], [], [False for ne in range(n_events)]
for ne in range(n_events):
spatial_filter = solve_gep(
A=S[ne],
B=Q[ne],
n_components=n_components,
ratio=ratio
)
ndim.append(spatial_filter.shape[0]) # Nk
u.append(spatial_filter[:, :n_chans]) # (Nk,Nc)
v.append(spatial_filter[:, n_chans:]) # (Nk,2Nh)
u_concat = np.zeros((np.sum(ndim), n_chans)) # (Ne*Nk,Nc)
v_concat = np.zeros((np.sum(ndim), n_2harmonics)) # (Ne*Nk,2Nh)
start_idx = 0
for ne, dims in enumerate(ndim):
u_concat[start_idx:start_idx + dims] = u[ne]
v_concat[start_idx:start_idx + dims] = v[ne]
start_idx += dims
# signal templates
uX, vY = [], [] # Ne*(Nk,Np)
euX = np.zeros((n_events, u_concat.shape[0], n_points)) # (Ne,Ne*Nk,Np)
evY = np.zeros_like(euX)
if standard:
for ne in range(n_events):
uX.append(u[ne] @ avg_template[ne]) # (Nk,Np)
vY.append(v[ne] @ sine_template[ne]) # (Nk,Np)
if ensemble:
for ne in range(n_events):
euX[ne] = u_concat @ avg_template[ne] # (Nk*Ne,Np)
evY[ne] = v_concat @ sine_template[ne] # (Nk*Ne,Np)
# sc-(e)TRCA model
model = {
'Q': Q, 'S': S,
'u': u, 'v': v, 'u_concat': u_concat, 'v_concat': v_concat,
'uX': uX, 'vY': vY, 'euX': euX, 'evY': evY, 'correct': correct
}
return model
# %% similarity constrained (e)TRCA | sc-(e)TRCA
[docs]class SC_TRCA(BasicTRCA):
[docs] def fit(self,
X_train: ndarray,
y_train: ndarray,
sine_template: ndarray):
"""Train sc-(e)TRCA model.
Args:
X_train (ndarray): (Ne*Nt,Nc,Np). Training dataset. Nt>=2.
y_train (ndarray): (Ne*Nt,). Labels for X_train.
sine_template (ndarray): (Ne,2*Nh,Np). Sinusoidal template.
"""
# basic information
self.X_train = X_train
self.y_train = y_train
event_type = np.unique(y_train) # [0,1,2,...,Ne-1]
self.train_info = {
'event_type': event_type,
'n_events': len(event_type),
'n_train': np.array([np.sum(self.y_train == et) for et in event_type]),
'n_chans': self.X_train.shape[-2],
'n_points': self.X_train.shape[-1],
'standard': self.standard,
'ensemble': self.ensemble
}
# train sc-TRCA models & templates
model = sctrca_compute(
X_train=self.X_train,
y_train=self.y_train,
sine_template=sine_template,
train_info=self.train_info,
n_components=self.n_components,
ratio=self.ratio
)
self.Q, self.S = model['Q'], model['S']
self.u, self.v = model['u'], model['v']
self.u_concat, self.v_concat = model['u_concat'], model['v_concat']
self.uX, self.vY = model['uX'], model['vY']
self.euX, self.evY = model['euX'], model['evY']
self.correct = model['correct']
return self
[docs] def predict(self,
X_test: ndarray):
"""Calculating the prediction labels based on the decision coefficients.
Args:
X_test (ndarray): (Nt*Nte,Nc,Np). Test dataset.
Return:
y_standard (ndarray): (Nt*Nte,). Predict labels of sc-TRCA.
y_ensemble (ndarray): (Nt*Nte,). Predict labels of sc-eTRCA.
"""
# basic information
n_test = X_test.shape[0]
event_type = self.train_info['event_type']
self.rou, self.erou = self.transform(X_test)
self.y_standard = np.empty((n_test))
self.y_ensemble = np.empty_like(self.y_standard)
if self.standard:
for nte in range(n_test):
self.y_standard[nte] = event_type[np.argmax(self.rou[nte, :])]
if self.ensemble:
for nte in range(n_test):
self.y_ensemble[nte] = event_type[np.argmax(self.erou[nte, :])]
return self.y_standard, self.y_ensemble
[docs]class FB_SC_TRCA(BasicFBTRCA):
[docs] def fit(self,
X_train: ndarray,
y_train: ndarray,
sine_template: ndarray):
"""Train filter-bank sc-(e)TRCA model.
Args:
X_train (ndarray): (Nb,Ne*Nt,Nc,Np). Training dataset. Nt>=2.
y_train (ndarray): (Ne*Nt,). Labels for X_train.
sine_template (ndarray): (Ne,2*Nh,Np). Sinusoidal template.
"""
# basic information
self.X_train = X_train
self.y_train = y_train
self.sine_template = sine_template
self.n_bands = X_train.shape[0]
# train sc-TRCA models & templates
self.sub_models = [[] for nb in range(self.n_bands)]
for nb in range(self.n_bands):
self.sub_models[nb] = SC_TRCA(
standard=self.standard,
ensemble=self.ensemble,
n_components=self.n_components,
ratio=self.ratio
)
self.sub_models[nb].fit(
X_train=self.X_train[nb],
y_train=self.y_train,
sine_template=self.sine_template
)
return self