# ...
# Copyright 2021 Brooks M. Musangu and Jan Drugowitsch.
# Copyright 2014-2020 by the Elephant team.
# Modified BSD, see LICENSE.txt for details.
# ...
from __future__ import division, print_function, unicode_literals
import time
import warnings
import numpy as np
import scipy as sp
import scipy.linalg as linalg
import scipy.optimize as optimize
import sklearn
from sklearn.base import clone
from sklearn.decomposition import FactorAnalysis
from sklearn.gaussian_process.kernels import (
RBF,
ConstantKernel,
Kernel,
WhiteKernel
)
from tqdm import trange
__all__ = [
"BlockInvGPFA"
]
[docs]
class BlockInvGPFA(sklearn.base.BaseEstimator):
"""BlockInvGPFA is a high-performance implementation of Gaussian
Process Factor Analysis (GPFA) is a probabilistic dimensionality
reduction technique that extracts smooth latent trajectories from
noisy, high-dimensional time series data. GPFA combines Factor
Analysis (FA) for dimensionality reduction with Gaussian Processes
(GPs) to model the time courses of the latent factors in a unified,
probabilistic framework.
GPFA operates on a set of time-series data, where each time series is
given by an ``x_dim`` x ``bins`` matrix. ``x_dim`` needs to be the same
across all time-series, but their individual durations, as determined
by ``bins``, can vary across time-series. When fitting the model to data,
GPFA assumes all model parameters to be shared across the fitted
time-series.
Please consult the :ref:`main page <gpfa_prob_model>` for more details on
the underlying probabilistic model.
This implementation, BlockInvGPFA, builds on the core model described
in [Yu et al., 2009] and is adapted from the Elephant's toolkit, with
substantial algorithmic enhancements:
1. It uses block-matrix identities and Schur complement updates to
efficiently share kernel inversion computations across
variable-length trials.
2. It introduces a new variance-explained metric to evaluate BlockInvGPFA
model fits
Parameters
----------
bin_size : float, optional, default=0.02
Size of time-series data bins in seconds.
gp_kernel : GP kernel instance, optional, default=None
Please consult the `scikit-learn GP documentation
<https://scikit-learn.org/stable/modules/
gaussian_process.html#kernels-for-gaussian-processes>`_
for a list of available GP kernels.
If ``None`` is passed, the kernel defaults to::
ConstantKernel(1-0.001, constant_value_bounds='fixed')
* RBF(length_scale=0.1)
+ ConstantKernel(0.001, constant_value_bounds='fixed')
* WhiteKernel(noise_level=1, noise_level_bounds='fixed')
where only kernel hyperparameters not marked as ``fixed``
are learned - in the above case only the ``length scale``
of the RBF kernel.
.. note::
The ``gp_kernel`` can either be a single kernel (in which case it
will be replicated across all latent dimensions) or a sequence
of kernels, one per latent dimension.
z_dim : int, optional, default=3
Dimensionality of latent space.
min_var_frac : float, optional, default=0.01
Fraction of overall data variance for each observed dimension to set as
the private variance floor. This is used to combat Heywood cases,
where ML parameter learning returns one or more zero private variances
(see Martin & McDonald, Psychometrika, Dec 1975).
em_tol : float, optional, default=1e-5
Stopping criterion for Expectation Maximization (EM) algorithm.
em_max_iters : int, optional, default=500
Maximum number of EM iterations.
freq_ll : int, optional, default=5
Frequency with which data likelihood is updated across EM iterations.
verbose : int, optional, default=0
Determines whether to display status messages when calling :meth:`fit`.
- verbose <= 0: no output
- verbose >= 1: output of EM iteration statistics
- verbose >= 2: additional fitting information.
Attributes
----------
C_ : numpy.ndarray, shape (x_dim, z_dim)
Loading matrix parameters, mapping observed data space to latent
trajectory space.
d_ : numpy.ndarray, shape (x_dim, 1)
Observation mean parameter.
R_ : numpy.ndarray, shape (x_dim, x_dim)
Observation noise covariance parameter.
gp_kernel.theta : numpy.ndarray
Flattened and log-transformed non-fixed GP hyperparameters
for optimization.
fit_info_ : dict
Parameter fitting information. Updated with each call to :meth:`fit`.
- `iteration_time` : list
Runtime for each EM iteration.
- `log_likelihoods` : list
Log likelihoods after each EM iteration.
train_latent_seqs_ : numpy.recarray
Copy of latent variable time-courses, inferred for training data, and
augmented with additional fields.
Z_mu : numpy.ndarray, shape (z_dim, bins)
Latent variable posterior means for each time bin.
Z_cov : numpy.ndarray, shape (z_dim, z_dim, bins)
Latent variable posterior covariances across different time bins.
Z_covGP : numpy.ndarray, shape (bins, bins, z_dim)
Posterior covariance over time for each latent variable.
valid_data_names_ : tuple of str
Sequence of valid names for ``returned_data`` argument of
:meth:`predict` method.
Notes
-----
Two major uses cases of GPFA are:
1. **Inferring latent variable time-courses**: GPFA fits the data,
consisting of a set of high-dimensional time-series, using the
:func:`fit` method. It then returns the inferred, and, on request,
orthonormalized latent variable time-courses by calling :func:`predict`.
2. **Identifying the latent space dimensionality**: GPFA's main
hyperparameter is the dimensionality ``z_dim`` of the latent space.
The standard approach to identify the best value for ``z_dim`` is to
perform cross validation. First, one splits the data into a training and
a test set, and fits GPFA to the training set only, using :func:`fit`.
Second, one applies :func:`predict` to the test set, to compute the
test set data log-likelihood. This procedure is repeated for different
``z_dim`` values, to identify the ``z_dim`` leading to the best
test set data log-likelihood.
This GPFA class support the use of the `cross-validation (CV)
<https://scikit-learn.org/stable/modules/generated/
sklearn.model_selection.cross_validate.html#sklearn.
model_selection.cross_validate>`_ functions of
`sklearn.model_selection <https://scikit-learn.org/stable/modules/
classes.html#module-sklearn.model_selection>`_.
In both cases, the data provided to :func:`fit` is a sequence of matrices
:math:`\\boldsymbol{X}_1, \\boldsymbol{X}_2, \\dots` of arbitrary order,
each containing a separate time-series. Each matrix
:math:`\\boldsymbol{X}_n` is of size ``x_dim`` x ``bins``. Here, the
observation dimensionality ``x_dim`` needs to be shared across all
matrices, but their duration ``bins`` can differ.
The latent variable time-courses returned by :func:`predict` have a
similar structure. They are again a sequence of matrices,
:math:`\\boldsymbol{Z}_1, \\boldsymbol{Z}_2, \\dots`, where each
:math:`\\boldsymbol{Z}_n` corresponds to a provided
:math:`\\boldsymbol{X}_n`. The size of each :math:`\\boldsymbol{Z}_n` is
``z_dim`` x ``bins``, where ``bins`` equals that of the associated
:math:`\\boldsymbol{X}_n`.
GPFA fits the model parameters :math:`\\boldsymbol{C}`,
:math:`\\boldsymbol{d}`, and :math:`\\boldsymbol{R}` (stored in attributes
``C_``, ``d_``, and ``R_``) and the GP kernel parameters (stored
log-transformed in attribute ``gp_kernel.theta``). Please consult the
:ref:`main page <gpfa_prob_model>` for how these attributes parameterize
the probabilistic model underlying GPFA. Parameter fitting is performed by
the Expectation Maximization method. See :meth:`fit` for details.
Examples
--------
>>> import numpy as np
>>> from blockinvgpfa import BlockInvGPFA
>>> X = np.array([[
... [3, 1, 0, 4, 1, 2, 1, 3, 4, 2, 2, 1, 2, 0, 0, 2, 2, 5, 1, 3],
... [1, 0, 2, 0, 2, 1, 4, 2, 0, 1, 4, 4, 1, 2, 8, 3, 2, 1, 3, 1],
... [2, 2, 1, 1, 3, 2, 3, 2, 2, 0, 3, 4, 1, 2, 3, 1, 4, 1, 0, 1]
... ]])
>>>
>>> blockinv_gpfa = BlockInvGPFA(z_dim=1)
>>> # Fit the model
>>> blockinv_gpfa.fit(X)
Initializing parameters using factor analysis...
Fitting GP parameters by EM...
>>> # Infere latent variable time-courses for the same data
>>> Zs, _ = blockinv_gpfa.predict()
>>> print(Zs)
[array([[-1.18405633, -2.00878 , -0.01470251, -2.3143544 , 0.03651376,
-1.06948736, 2.05355342, -0.16920794, -2.26437342, -1.21934552,
1.98656088, 2.13305066, -1.14113106, 0.11319858, 6.14998095,
0.89241818, 0.03509708, -1.3936327 , 0.84781358, -1.2281484 ]])]
>>> # Display the loading matrix (C_) and observation mean (d_) parameters
>>> print(blockinv_gpfa.C_)
[[-0.78376501]
[ 1.77773876]
[ 0.51134474]]
>>> print(blockinv_gpfa.d_)
[1.95470037 2.08933859 1.89693338]
>>> # Obtaining log-likelihood scores
>>> llhs = blockinv_gpfa.score()
>>> print(llhs)
[-117.54588379661148, -107.17193271370158, ..., -100.13200154180569]
>>> # Evaluate the total explained variance regression score
>>> print(blockinv_gpfa.variance_explained())
(0.6581475357501596, array([0.65814754]))
Methods
-------
fit:
Fits model parameters to training data.
predict:
Provides inferred latent variable time-series.
score:
Returns the log-likelihood scores.
variance_explained:
Computes the total explained variance regression score.
"""
def __init__(self, bin_size=0.02, gp_kernel=None, z_dim=3,
min_var_frac=0.01, em_tol=1.0E-5,
em_max_iters=500, freq_ll=5, verbose=0):
self.bin_size = bin_size
self.z_dim = z_dim
self.gp_kernel = gp_kernel
self.min_var_frac = min_var_frac
self.em_tol = em_tol
self.em_max_iters = em_max_iters
self.freq_ll = freq_ll
self.valid_data_names_ = (
'Z_mu_orth',
'Z_mu',
'Z_cov',
'Z_covGP',
'X')
self.verbose = verbose
# ==================================
# Initialize state model parameters
# ==================================
# will be updated later
if self.gp_kernel is None: # Use an RBF kernel as default
self.gp_kernel = ConstantKernel(
1-0.001, constant_value_bounds='fixed'
) * RBF(length_scale=0.1) + ConstantKernel(
0.001, constant_value_bounds='fixed'
) * WhiteKernel(
noise_level=1, noise_level_bounds='fixed'
)
if isinstance(self.gp_kernel, Kernel):
self.gp_kernel = [
clone(self.gp_kernel) for _ in range(self.z_dim)
]
elif len(self.gp_kernel) != self.z_dim:
raise ValueError(
"The sequence length of gp_kernel: "
f"{len(self.gp_kernel)}, doesn't match with the "
f"number of latent dimensions: {self.z_dim}."
)
self.fit_info_ = {}
self.train_latent_seqs_ = None
[docs]
def fit(self, X, use_cut_trials=False):
"""
Fit the GPFA model parameters to the given training data using the
Expectation Maximization algorithm. The function also computes the
orthonormalization transform of the loading matrix for subsequent use
by :meth:`predict`.
Parameters
----------
X : array-like, default=None
An array-like sequence of high-dimensional time-series.
Each element :math:`\\boldsymbol{X}_n` in this sequence is an
``x_dim`` x ``bins`` matrix containing a sequence of
``x_dim``-dimensional observations along its columns. The
observation dimensionality ``x_dim`` needs to be the same across
each :math:`\\boldsymbol{X}_n`, but ``bins`` can differ across
them. The order in which the elements :math:`\\boldsymbol{X}_1,
\\boldsymbol{X}_2, \\dots` are provided in the sequence is
arbitrary and has no impact on the fitted parameters.
use_cut_trials : bool, optional, default=False
If True, long time-series are cut into multiple shorter ones to
potentially speed up training.
.. attention:: Cutting long time-series might worsen parameter
fits, in particular as it removes any long-distance temporal
dependencies that might exist in the data.
Returns
-------
self : object
Returns the instance itself.
Raises
------
ValueError
If covariance matrix of input data is rank deficient.
Notes
-----
The :meth:`fit` method finds the model parameters that best explain the
provided data using Expectation Maximization (EM), using the following
steps:
1. **Initialization**: The emission model parameters,
:math:`\\boldsymbol{C}`, :math:`\\boldsymbol{d}`, and
:math:`\\boldsymbol{R}` are initialized using Factor Analysis
while leaving the latent variable time-course unconstrained.
GP kernel parameters are left at their default values.
2. **Expecation step**: Given current parameter estimates,
the Expectation step computes the posterior distribution over
the latent variables :math:`\\boldsymbol{Z}`, and the complete
data log-likelihood.
3. **Maximization step**: Given the :matH:`\\boldsymbol{Z}`-posterior,
the Maximization step corresponds to finding the emission model
parameters (see first step) that maximize the expected complete data
log-likelihood analytically, and optimizes the GP kernel parameters
using gradient descent.
4. **EM iteration**: Steps 2 and 3 are repeated until either the change
in complete data likelihood drops below the set threshold, or if the
maximum number of interactions is reached.
**Orthonormalization:** Finally, this function computes an
orthonormalization transform to the loading matrix
:math:`\\boldsymbol{C}` that is in turn used by :meth:`predict` to
return an orthonormalized set of latent variables.
"""
# ====================================================================
# Cut trials: Extracts trial segments that are all of the same length.
# ====================================================================
X_in = X
if use_cut_trials:
# For compute efficiency, train on shorter segments of trials
if self.verbose >= 2:
print('Cutting trials to all have the same length')
X_in = self._cut_trials(X_in)
if len(X_in) == 0:
warnings.warn('No segments extracted for training. Defaulting '
'to segLength=Inf.')
X_in = self._cut_trials(X_in, seg_length=np.inf)
# =================================================
# Check if training data's covariance is full rank.
# =================================================
X_all = np.hstack(X_in)
x_dim = X_all.shape[0]
if np.linalg.matrix_rank(np.cov(X_all)) < x_dim:
errmesg = 'Observation covariance matrix is rank deficient.\n' \
'Possible causes: ' \
'repeated units, not enough observations.'
raise ValueError(errmesg)
if self.verbose >= 2:
print(f'Number of training trials: {len(X_in)}')
print(f'Latent space dimensionality: {self.z_dim}')
print(f'Observation dimensionality: {x_dim}')
# ========================================
# Initialize observation model parameters
# ========================================
if self.verbose >= 2:
print('Initializing parameters using factor analysis')
fa = FactorAnalysis(
n_components=self.z_dim, copy=True,
noise_variance_init=np.diag(np.cov(X_all, bias=True))
)
fa.fit(X_all.T)
self.d_ = X_all.mean(axis=1)
self.C_ = fa.components_.T
self.R_ = np.diag(fa.noise_variance_)
# Define parameter constraints
self.notes_ = {
'learnKernelParams': True,
'RforceDiagonal': True,
}
# =====================
# Fit model parameters
# =====================
if self.verbose >= 2:
print('Fitting GP parameters by EM')
self.train_latent_seqs_ = self._em(X_in)
# If `use_cut_trials=True` re-compute the latent sequence on a full
# X rather than on the cut_trial
if use_cut_trials:
self.train_latent_seqs_, _ = self._infer_latents(X)
# ===========================================
# compute the orthonormalization parameters.
# ===========================================
# Orthonormalize the columns of the loading matrix `C`.
if self.z_dim == 1:
# OrthTrans_ is transform matrix
self.OrthTrans_ = np.sqrt(np.dot(self.C_.T, self.C_))
# Orthonormalized loading matrix
Corth = np.linalg.solve(self.OrthTrans_.T, self.C_.T).T
else:
UU, DD, VV = sp.linalg.svd(self.C_, full_matrices=False)
self.OrthTrans_ = np.dot(np.diag(DD), VV)
# Orthonormalized loading matrix
Corth = UU
self.Corth_ = Corth
return self
[docs]
def predict(self, X=None, returned_data=['Z_mu_orth']):
"""
Provides the inferred latent variable time-courses and other
moments thereof, if requested.
Parameters
----------
X : array-like, default=None
An array-like sequence of time-series. The format is the same
as for :meth:`fit`.
.. note::
If ``X=None``, the latent state estimates for the last ``X``
that :meth:`fit` was called with are returned.
returned_data : list of str, default ['Z_mu_orth']
Determines which moments of the inferred latent variable
time-courses, and other data are returned. Valid strings are:
``'Z_mu'`` : posterior mean of latent variables before
orthonormalization
``'Z_mu_orth'`` : orthonormalized posterior mean of latent
variable
``'Z_cov'`` : posterior covariance between latent variables
``'Z_covGP'`` : posterior covariance over time for each latent
variable
``'X'`` : time-series data used to estimate the GPFA model
parameters
Returns
-------
numpy.ndarray or dict
Returns a single ``numpy.ndarray`` if only a single string is
provided to the ``returned_data`` argument, containing the
requested data. If multiple strings are provided, then the
method returns a dictionaries whose keys match the strings
provided to ``returned_data``.
Either way, the :math:`n` th item in either of the returned
`numpy.ndarray` provides the latent state moments associated with
the :math:`n` th time-series in the provided ``X``. Its size
differs depending on the requested moment, and is
``Z_mu``: numpy.ndarray of shape (z_dim x bins)
``Z_mu_orth``: numpy.ndarray of shape (z_dim x bins)
``Z_cov``: numpy.ndarray of shape (z_dim x z_dim x bins)
``Z_covGP``: numpy.ndarray of shape (bins x bins x z_dim)
``X`` : numpy.ndarray of shape (x_dim x bins)
.. note::
Note that the number of bins (``bins``) can vary across
elements in the sequence, reflecting the length of the time
series in the provided time-series data.
lls : float
data log likelihoods
Raises
------
ValueError
If `returned_data` contains strings that aren't present in
`self.valid_data_names_`.
"""
invalid_keys = set(returned_data).difference(self.valid_data_names_)
if len(invalid_keys) > 0:
raise ValueError("'returned_data' can only have the following "
f"entries: {self.valid_data_names_}")
if X is None:
seqs = self.train_latent_seqs_
lls = self.fit_info_['log_likelihoods']
else:
seqs, lls = self._infer_latents(X, get_ll=True)
if 'Z_mu_orth' in returned_data:
seqs = self._orthonormalize(seqs)
if len(returned_data) == 1:
return seqs[returned_data[0]], lls
return {i: seqs[i] for i in returned_data}, lls
[docs]
def score(self, X=None):
"""
Returns the `log-likelihood` scores. If ``X = None``, the training
data `log-likelihood` scores will be returned.
Parameters
----------
X : an array-like, default=None
An array-like sequence of time-series. The format is the same as
for :meth:`fit``.
.. note::
If ``X=None``, the log-likelihoods for the last ``X``
that :meth:`fit` was called with are returned.
Returns
-------
log_likelihood : list
List of log-likelihoods, one per element in ``X``.
"""
if X is None:
return self.fit_info_['log_likelihoods']
_, lls = self._infer_latents(X, get_ll=True)
return lls
[docs]
def variance_explained(self, X=None):
"""
Computes the total explained variance regression score using
:math:`R^2` (coefficient of determination) for the overall `X`.
The `total explained variance` is decomposed into individual
contributions by each orthonormalized latent trajectory.
Returns
-------
R2 : float
The total :math:`R^2` score, i.e., the total variance explained.
Calculated as
`(total variance - residual variance) / total variance`.
latent_R2s : numpy.array
An array containing the variance explained by each latent
trajectory.
Notes
-----
This function calculates the coefficient of determination (:math:`R^2`) to
measure how well the latent trajectories explain the variance in the
observed data. It follows these steps:
1. Compute the total explained variance (`var_total`) across all
bins and trials.
2. Calculate the residual variance (`var_res`) by decomposing the
total explained variance (`var_total`).
3. Compute the explained sum of squares (`SSreg`) for each latent
trajectory.
4. Sum up the `SSreg` values to obtain the total :math:`R^2` score.
5. Calculate the variance explained by each latent trajectory and
store it in `latent_R2s`.
"""
if X is None:
seqs, _ = self.predict(returned_data=['X', 'Z_mu_orth'])
X, Z_mu_orth = seqs['X'], seqs['Z_mu_orth']
else:
seqs, _ = self.predict(X=X, returned_data=['Z_mu_orth'])
Z_mu_orth = seqs
Corth = self.Corth_
x_mean = self.d_[:, np.newaxis]
SStot = 0.0
SSreg = np.zeros(Corth.shape[1])
Corth2 = np.sum(Corth ** 2, axis=0)
for i, x in enumerate(X):
xc = x - x_mean
SStot += np.sum(xc ** 2)
SSreg += 2 * np.sum(Z_mu_orth[i] * (Corth.T @ xc), axis=1) \
- Corth2 * np.sum(Z_mu_orth[i] ** 2, axis=1)
latent_R2s = SSreg / SStot
R2 = float(np.sum(latent_R2s))
return R2, latent_R2s
def _em(self, X):
"""
Fits GPFA model parameter attributes by Expectation Maximization (EM).
The method also updates ``self.fit_info_`` to store additional
fitting information.
Parameters
----------
X : an array-like, default=None
An array-like sequence of time-series. The format is the same as
for :meth:`fit``.
Returns
-------
latent_seqs : numpy.recarray
a copy of the training data structure, augmented with the new
fields:
Z_mu : numpy.ndarray of shape (z_dim x bins)
posterior mean of latent variables at each time bin
Z_cov : numpy.ndarray of shape (z_dim, z_dim, bins)
posterior covariance between latent variables at each
timepoint
Z_covGP : numpy.ndarray of shape (bins, bins, z_dim)
posterior covariance over time for each latent
variable
"""
lls = []
ll_prev = ll_base = ll = 0.0
iter_time = []
var_floor = self.min_var_frac * np.diag(np.cov(np.hstack(X)))
Tall = np.array([X_n.shape[1] for X_n in X])
Tmax = max(Tall)
Tsdt = np.arange(0, Tmax) * self.bin_size
unique_Ts = np.unique(Tall)
# ============== Make Precomp_init ==============
# assign some helpful precomp items
precomp = {'Tsdt': Tsdt[:, np.newaxis], 'Tu': np.empty(len(unique_Ts),
dtype=[('nList', object), ('T', int), ('numTrials', int),
('PautoSUM', object)])}
# Loop once for each unique trial length
for j, trial_len_num in enumerate(unique_Ts):
precomp['Tu'][j]['nList'] = np.where(Tall == trial_len_num)[0]
precomp['Tu'][j]['T'] = trial_len_num
precomp['Tu'][j]['numTrials'] = len(precomp['Tu'][j]['nList'])
precomp['Tu'][j]['PautoSUM'] = np.zeros(
(self.z_dim, precomp['Tu'][j]['T'], precomp['Tu'][j]['T']))
# Loop over EM algorothm iterations
with trange(self.em_max_iters, desc='EM iteration ', total=np.inf,
disable=(self.verbose <= 0)) as t:
for iter_id in t:
tic = time.time()
# ==== E STEP =====
if not np.isnan(ll):
ll_prev = ll
# recompute llh every freq_ll iterations (and in first 2)
get_ll = ((iter_id + 1) % self.freq_ll == 0) or (iter_id <= 1)
if get_ll:
latent_seqs, ll = self._infer_latents(X)
else:
latent_seqs = self._infer_latents(X, get_ll=False)
ll = np.nan
lls.append(ll)
if iter_id <= 1:
ll_base = ll # set baseline in first two iterations
t.set_postfix(llh=ll, delta_llh=0.0)
# ==== M STEP ====
sum_p_auto = np.zeros((self.z_dim, self.z_dim))
for seq_latent in latent_seqs:
sum_p_auto += seq_latent['Z_cov'].sum(axis=2) \
+ seq_latent['Z_mu'].dot(seq_latent['Z_mu'].T)
X_all = np.hstack(X)
Z_all = np.hstack(latent_seqs['Z_mu'])
sum_XZtrans = X_all.dot(Z_all.T)
sum_Zall = Z_all.sum(axis=1)[:, np.newaxis]
sum_Xall = X_all.sum(axis=1)[:, np.newaxis]
# term is (z_dim+1) x (z_dim+1)
term = np.vstack(
[np.hstack([sum_p_auto, sum_Zall]),
np.hstack([sum_Zall.T, Tall.sum().reshape((1, 1))])]
)
# x_dim x (z_dim+1)
cd = np.linalg.solve(
term.T, np.hstack([sum_XZtrans, sum_Xall]).T).T
self.C_ = cd[:, :self.z_dim]
self.d_ = cd[:, -1]
# xd must be based on the new d
# xd = X * d
# R = (X * X.T - 2 * xd * (
# (sum_XZtrans - d.dot(sum_Zall.T)) * c).sum(axis=1)
# ) * Tall.sum()
c = self.C_
d = self.d_[:, np.newaxis]
if self.notes_['RforceDiagonal']:
sum_XXtrans = (X_all * X_all).sum(axis=1)[:, np.newaxis]
xd = sum_Xall * d
term = ((sum_XZtrans - d.dot(sum_Zall.T)) * c).sum(axis=1)
term = term[:, np.newaxis]
r = d ** 2 + (sum_XXtrans - 2 * xd - term) / Tall.sum()
# Set minimum private variance
r = np.maximum(var_floor, r)
self.R_ = np.diag(r[:, 0])
else:
sum_XXtrans = X_all.dot(X_all.T)
xd = sum_Xall.dot(d.T)
term = (sum_XZtrans - d.dot(sum_Zall.T)).dot(c.T)
r = d.dot(d.T) + (sum_XXtrans - xd - xd.T - term) / Tall.sum()
self.R_ = (r + r.T) / 2 # ensure symmetry
if self.notes_['learnKernelParams']:
self._learn_gp_params(latent_seqs, precomp)
t_end = time.time() - tic
iter_time.append(t_end)
# Verify that likelihood is growing monotonically
if iter_id > 1 and not np.isnan(ll):
delta_ll = np.inf if ll_prev == ll_base \
else (ll - ll_prev) / (ll_prev - ll_base)
t.set_postfix(llh=ll, delta_llh=delta_ll)
if ll < ll_prev:
if self.verbose >= 2:
t.write(f'Error: Data likelihood has decreased '
f'from {ll_prev} to {ll}')
elif delta_ll < self.em_tol:
break
if self.verbose >= 2:
if len(lls) < self.em_max_iters:
print(f'Fitting has converged after {len(lls)} EM iterations')
else:
print('Maximum number of EM iterations reached')
if np.any(np.diag(self.R_) == var_floor):
warnings.warn('Private variance floor used for one or more '
'observed dimensions in BlockInvGPFA.')
self.fit_info_ = {'iteration_time': iter_time, 'log_likelihoods': lls}
return latent_seqs
def _infer_latents(self, X, get_ll=True):
"""
Inferrs latent trajectories from observed data given GPFA model
parameters.
Parameters
----------
X : an array-like, default=None
An array-like sequence of time-series. The format is the same as
for :meth:`fit``.
get_ll : bool, optional, default=True
specifies whether to compute data log likelihood
Returns
-------
latent_seqs : numpy.recarray
X_out : numpy.ndarray
input data structure, whose n-th element (corresponding to the
n-th experimental trial) has fields:
X : numpy.ndarray of shape (x_dim, bins)
Z_mu : (z_dim, bins) numpy.ndarray
posterior mean of latent variables at each time bin
Z_cov : (z_dim, z_dim, bins) numpy.ndarray
posterior covariance between latent variables at each timepoint
Z_covGP : (bins, bins, z_dim) numpy.ndarray
posterior covariance over time for each latent trajectory
ll : float
data log likelihood, returned when `get_ll` is set True
"""
x_dim = self.C_.shape[0]
# copy the contents of the input data structure to output structure
X_out = np.empty(len(X), dtype=[('X', object)])
for s, seq in enumerate(X_out):
seq['X'] = X[s]
dtype_out = [(i, X_out[i].dtype) for i in X_out.dtype.names]
dtype_out.extend([('Z_mu', object), ('Z_cov', object),
('Z_covGP', object)])
latent_seqs = np.empty(len(X_out), dtype=dtype_out)
for dtype_name in X_out.dtype.names:
latent_seqs[dtype_name] = X_out[dtype_name]
# Precomputations
if self.notes_['RforceDiagonal']:
rinv = np.diag(1.0 / np.diag(self.R_))
logdet_r = (np.log(np.diag(self.R_))).sum()
else:
rinv = linalg.inv(self.R_)
rinv = (rinv + rinv.T) / 2 # ensure symmetry
logdet_r = self._logdet(self.R_)
c_rinv = self.C_.T.dot(rinv)
c_rinv_c = c_rinv.dot(self.C_)
# Get all trial lengths and find the unique lengths
# Find the maximum trial length
Tall = [X_n.shape[1] for X_n in X]
unique_Ts = np.unique(Tall)
Tmax = max(unique_Ts)
ll = 0.
K_big = self._make_k_big(Tmax)
blah = [c_rinv_c for _ in range(Tmax)]
C_rinv_c_big = linalg.block_diag(*blah) # (z_dim*T) x (z_dim*T)
# Overview:
# - Outer loop on each element of Tu.
# - Do inference and LL computation for all those trials together.
for t in unique_Ts:
if t == unique_Ts[0]:
K_big_inv = linalg.inv(K_big[:t * self.z_dim, :t * self.z_dim])
K_big_inv = (K_big_inv + K_big_inv.T) / 2
logdet_k_big = self._logdet(K_big[:t * self.z_dim, :t * self.z_dim])
M = K_big_inv + C_rinv_c_big[:t * self.z_dim,:t * self.z_dim]
M_inv = linalg.inv(M)
M_inv = (M_inv + M_inv.T) / 2
logdet_M = self._logdet(M)
else:
# Here, we compute the inverse of K for the current t from its
# known inverse for the previous t, using block matrix
# inversion identities. We also use those to update the
# previously computed M_inv by using the (top-left block of
# the) new K_big_inv rather than the K_big_inv for the previous
# t. This updated M_inv (here called MAinv) is in turn used to
# compute the M_inv for the current t using similar block
# matrix inversion identities.
K_big_inv, logdet_k_big, MAinv, logdet_MAinv = \
self._sym_block_inversion(
K_big[:t * self.z_dim, :t * self.z_dim],
K_big_inv,
-logdet_k_big,
M_inv, -logdet_M
)
M = K_big_inv + C_rinv_c_big[:t * self.z_dim, :t * self.z_dim]
M_inv, logdet_M = self._sym_block_inversion(
M, MAinv,
logdet_MAinv
)
# Note that posterior covariance does not depend on observations,
# so can compute once for all trials with same T.
# z_dim x z_dim x T posterior covariance for each timepoint
vsm = np.full((self.z_dim, self.z_dim, t), np.nan)
idx = np.arange(0, self.z_dim * t + 1, self.z_dim)
for i in range(t):
vsm[:, :, i] = M_inv[idx[i]:idx[i + 1], idx[i]:idx[i + 1]]
# T x T posterior covariance for each GP
vsm_gp = np.full((self.z_dim, t, t), np.nan)
for i in range(self.z_dim):
vsm_gp[i, :, :] = M_inv[i::self.z_dim, i::self.z_dim]
# Process all trials with length T
n_list = np.where(Tall == t)[0]
# dif is x_dim x sum(T)
dif = np.hstack(latent_seqs[n_list]['X']) - self.d_[:, np.newaxis]
# term1Mat is (z_dim*T) x length(nList)
term1_mat = c_rinv.dot(dif).reshape(
(self.z_dim * t, -1), order='F'
)
# latent_variable Matrix (Z_mat) is (z_dim*T) x length(nList)
Z_mat = M_inv.dot(term1_mat)
for i, n in enumerate(n_list):
latent_seqs[n]['Z_mu'] = \
Z_mat[:, i].reshape((self.z_dim, t), order='F')
latent_seqs[n]['Z_cov'] = vsm
latent_seqs[n]['Z_covGP'] = vsm_gp
if get_ll:
# Compute data likelihood
val = -t * logdet_r - logdet_k_big - logdet_M \
- x_dim * t * np.log(2 * np.pi)
ll = ll + len(n_list) * val - (rinv.dot(dif) * dif).sum() \
+ (term1_mat.T.dot(M_inv) * term1_mat.T).sum()
if get_ll:
ll /= 2
return latent_seqs, ll
return latent_seqs
def _learn_gp_params(self, latent_seqs, precomp):
"""
Updates parameters of GP kernels, given latent trajectories.
Parameters
----------
latent_seqs : numpy.recarray
data structure containing trajectories
precomp : numpy.recarray
The precomp structure will be updated with the
posterior covariance
"""
precomp = self._fill_p_auto_sum(latent_seqs, precomp)
# Loop once for each state dimension (each GP)
for i in trange(self.z_dim, desc='fitting latents ', leave=False,
disable=(self.verbose <= 0)):
gp_kernel_i = self.gp_kernel[i]
init_theta = self.gp_kernel[i].theta
res_opt = optimize.minimize(
self._grad_bet_theta,
init_theta,
args=(gp_kernel_i, precomp, i),
method='L-BFGS-B',
jac=True
)
self.gp_kernel[i].theta = res_opt.x
for j in range(len(precomp['Tu'])):
precomp['Tu'][j]['PautoSUM'][i, :, :].fill(0)
def _orthonormalize(self, seqs):
"""
Perform orthonormalization transform of the latent variables.
Parameters
----------
seqs : numpy.recarray
Contains the embedding of the training data into the latent
variable space.
Data structure, whose n-th entry (corresponding to the n-th
experimental trial) has field
X : numpy.ndarray of shape (x_dim, bins)
observed data
Z_mu : numpy.ndarray of shape (z_dim, bins)
posterior mean of latent variables at each time bin
Z_cov : numpy.ndarray of shape (z_dim, z_dim, bins)
posterior covariance between latent variables at each timepoint
Z_covGP : numpy.ndarray of shape (bins, bins, z_dim)
posterior covariance over time for each latent trajectory
Returns
-------
seqs : numpy.recarray
Training data structure that contains the new field
`Z_mu_orth`, the orthonormalized trajectories.
"""
Z_all = np.hstack(seqs['Z_mu'])
Z_mu_orth = np.dot(self.OrthTrans_, Z_all)
# add the field `Z_mu_orth` to seq
seqs = self._segment_by_trial(seqs, Z_mu_orth, 'Z_mu_orth')
return seqs
def _logdet(self, A):
"""
log(det(A)) where A is positive-definite.
This is faster and more stable than using log(det(A)).
"""
U = np.linalg.cholesky(A)
return 2 * (np.log(np.diag(U))).sum()
def _cut_trials(self, X_in, seg_length=20):
"""
Extracts trial segments that are all of the same length. Uses
overlapping segments if trial length is not integer multiple
of segment length. Ignores trials with length shorter than
one segment length.
Parameters
----------
X_in : an array-like, default=None
An array-like of observation sequences, one per trial.
Each element in `X` is a matrix of size ``x_dim x bins``,
containing an observation sequence. The input dimensionality
``x_dim`` needs to be the same across elements in `X`, but ``bins``
can be different for each observation sequence.
seg_length : int
length of segments to extract, in number of timesteps. If infinite,
entire trials are extracted, i.e., no segmenting.
Default: 20
Returns
-------
X_out : np.ndarray
data structure containing np.ndarrays whose n-th element
(corresponding to the n-th segment) has shape of
(x_dim x seg_length)
Raises
------
ValueError
If `seq_length == 0`.
"""
if seg_length == 0:
raise ValueError("At least 1 extracted trial must be returned")
if np.isinf(seg_length):
X_out = X_in
return X_out
X_out_buff = []
for n, X_in_n in enumerate(X_in):
T = X_in_n.shape[1]
# Skip trials that are shorter than segLength
if T < seg_length:
warnings.warn(
f'trial corresponding to index {n} is \
shorter than one segLength...'
'skipping')
continue
numSeg = int(np.ceil(float(T) / seg_length))
# Randomize the sizes of overlaps
if numSeg == 1:
cumOL = np.array([0, ])
else:
totalOL = (seg_length * numSeg) - T
probs = np.ones(numSeg - 1, float) / (numSeg - 1)
randOL = np.random.multinomial(totalOL, probs)
cumOL = np.hstack([0, np.cumsum(randOL)])
seg = np.empty(numSeg, object)
for n_seg in range(numSeg):
tStart = seg_length * n_seg - cumOL[n_seg]
seg[n_seg] = X_in_n[:, tStart:tStart + seg_length]
X_out_buff.append(seg)
if len(X_out_buff) > 0:
X_out = np.hstack(X_out_buff)
else:
X_out = np.empty(0)
return X_out
def _sym_block_inversion(
self, M, Ainv, logdet_Ainv, X=None, logdet_X=None
):
"""
Inverts the symmetric matrix M,
[ A B ]^-1 [ MAinv MBinv ]
Minv = M^-1 = [ ] = [ ]
[ B^T D ] [ MBinv^T MDinv ]
exploiting the existing knowledge of Ainv = A^-1 and its
log-determinant logdet_Ainv = log(|A^-1|) to speed up the computation,
see `Block matrix inversion
<https://en.wikipedia.org/wiki/Block_matrix>`_.
This function is faster than calling inv(M) directly. It also supports
a faster computation of (MAinv + X)^-1 for some X, if given, where X
must be the same size as Ainv.
Parameters
----------
M : numpy.ndarray
The symmetric matrix to be inverted.
Ainv : numpy.ndarray
The (symmetric) inverse of the top-left block of M.
logdet_Ainv : float
The log-determinant of A^-1 already known.
X : numpy.ndarray, optional
An arbitrary matrix of the same size as Ainv.
logdet_X : float, optional
The log-determinant of X already known.
Returns
-------
Minv : numpy.ndarray
Inverse of M
logdet_M : float
Log-determinant of M
MAinvPXinv : numpy.ndarray
The inverse of (MAinv + X)
loget_MAinv : float
Log-determinant of MAinvPXinv
"""
t = len(Ainv)
B = M[:t, t:]
D = M[t:, t:]
AinvB = Ainv @ B
MD = D - AinvB.T @ B
MDinv = linalg.inv(MD)
MCinv = - MDinv @ AinvB.T
MAinv = Ainv + AinvB @ - MCinv
Minv = np.block([
[MAinv, MCinv.T],
[MCinv, MDinv]
])
Minv = (Minv + Minv.T) / 2 # Ensure symmetry
# Check if MD is positive definite
try:
logdet_MD = self._logdet(MD) # Use Cholesky decomposition if possible
except np.linalg.LinAlgError:
logdet_MD = np.linalg.slogdet(MD)[1] # Fallback to slogdet for non-PD matrices
warnings.warn("Cholesky decomposition failed for MD; using slogdet instead. "
"MD may not be positive definite.",
UserWarning)
logdet_M = -logdet_Ainv + logdet_MD
if X is not None:
if logdet_X is None:
logdet_X = self._logdet(X)
MDpAinvBXAinvB = MD + AinvB.T @ X @ AinvB
MAinv = X - X @ AinvB @ linalg.inv(MDpAinvBXAinvB) @ AinvB.T @ X
logdet_MAinv = logdet_MD + logdet_X - np.linalg.slogdet(MDpAinvBXAinvB)[1]
return Minv, logdet_M, MAinv, logdet_MAinv
return Minv, logdet_M
def _make_k_big(self, n_timesteps):
"""
Constructs full GP covariance matrix across all state dimensions and
timesteps.
Parameters
----------
n_timesteps : int
number of timesteps
Returns
-------
K_big : numpy.ndarray
GP covariance matrix with dimensions (z_dim * T) x (z_dim * T).
The (t1, t2) block is diagonal, has dimensions z_dim x z_dim, and
represents the covariance between the state vectors at timesteps
t1 and t2. K_big is sparse and striped.
"""
K_big = np.zeros((self.z_dim * n_timesteps, self.z_dim * n_timesteps))
tsdt = np.arange(0, n_timesteps) * self.bin_size
for i in range(self.z_dim):
K = self.gp_kernel[i](tsdt[:, np.newaxis])
K_big[i::self.z_dim, i::self.z_dim] = K
return K_big
def _fill_p_auto_sum(self, Seqs, precomp):
"""
Fill the PautoSUM item in precomp
Parameters
----------
Seqs : numpy.recarray
The sequence structure of inferred latents, etc.
precomp : numpy.recarray
The precomp structure will be updated with the
posterior covariance.
Returns
-------
precomp : numpy.recarray
Structure containing precomputations.
.. note::
All inputs are named sensibly to those in
:funct:`_learn_gp_params`. This code probably should not be
called from anywhere but there.
We bother with this method because we need this particular matrix
sum to be as fast as possible. Thus, no error checking is done
here as that would add needless computation. Instead, the onus is
on the caller (which should be :func:`_learn_gp_params()`) to make
sure this is called correctly.
Finally, see the notes in the BlockInvGPFA README.
"""
############################################################
# Fill out PautoSum
############################################################
# Loop once for each state dimension (each GP)
for i in range(self.z_dim):
# Loop once for each trial length (each of Tu)
for j in range(len(precomp['Tu'])):
# Loop once for each trial (each of nList)
for n in precomp['Tu'][j]['nList']:
precomp['Tu'][j]['PautoSUM'][i, :, :] += \
Seqs[n]['Z_covGP'][i, :, :] \
+ np.outer(
Seqs[n]['Z_mu'][i, :], Seqs[n]['Z_mu'][i, :]
)
return precomp
def _grad_bet_theta(self, theta, gp_kernel_i, precomp, i):
"""
Gradient computation for GP timescale optimization.
This function is called by `_learn_gp_params()`
Parameters
----------
theta : numpy.array
the flattened and log-transformed non-fixed hyperparams
to which optimization is performed,
where :math:`{\\theta} = log(\\text{kernel parameters})`
gp_kernel_i : kernel instance
the i-th GP kernel corresponding to the i-th latent trajectory
precomp : numpy.recarray
structure containing precomputations
i : int
The i-th index of the i-th latent trajectory
Returns
-------
f : float
values of objective function E[log P({x},{y})] at {\\theta}
df_arr : numpy.array
gradients at {\\theta}
"""
gp_kernel_i.theta = theta
Kmax, K_gradient = gp_kernel_i(
precomp['Tsdt'], eval_gradient=True
)
dEdtheta = np.zeros(len(theta))
f = 0.0
for j in range(len(precomp['Tu'])):
T = precomp['Tu'][j]['T']
if j == 0:
Kinv = linalg.inv(Kmax[:T, :T])
Kinv = (Kinv + Kinv.T) / 2
logdet_K = self._logdet(Kmax[:T, :T])
else:
# Here, we compute the inverse of K for the current
# T from its known inverse for the previous T,
# using block matrix inversion identities.
Kinv, logdet_K = self._sym_block_inversion(
Kmax[:T, :T], Kinv, -logdet_K
)
Thalf = int(np.ceil(T / 2.0))
mkr = int(np.ceil(0.5 * T ** 2))
numTrials = precomp['Tu'][j]['numTrials']
PautoSUM = precomp['Tu'][j]['PautoSUM'][i, :, :]
for k, dKdtheta in enumerate(K_gradient.T):
KinvM = Kinv[:Thalf, :].dot(dKdtheta[:T, :T]) # Thalf x T
KinvMKinv = (KinvM.dot(Kinv)).T # Thalf x T
dg_KinvM = np.diag(KinvM)
tr_KinvM = 2 * dg_KinvM.sum() - np.fmod(T, 2) * dg_KinvM[-1]
pauto_kinv_dot = PautoSUM.ravel('F')[:mkr].dot(
KinvMKinv.ravel('F')[:mkr])
pauto_kinv_dot_rest = PautoSUM.ravel('F')[-1:mkr - 1:- 1].dot(
KinvMKinv.ravel('F')[:(T ** 2 - mkr)])
dEdtheta[k] = dEdtheta[k] - 0.5 * numTrials * tr_KinvM \
+ 0.5 * pauto_kinv_dot \
+ 0.5 * pauto_kinv_dot_rest
f = f - 0.5 * numTrials * logdet_K \
- 0.5 * (PautoSUM * Kinv).sum()
f = -f
df_arr = -dEdtheta
return f, df_arr
def _segment_by_trial(self, seqs, Z, fn):
"""
Segment and store data by trial.
Parameters
----------
seqs : numpy.recarray
Data structure that has field Z, the observations
Z : numpy.ndarray
Data to be segmented (any dimensionality Z total number of
timesteps)
fn : str
New field name of seq where segments of X are stored
Returns
-------
seqs_new : numpy.recarray
Data structure with new field `fn`
Raises
------
ValueError
If "`All timespets` != Z.shape[1]".
"""
T_all = [X_n.shape[1] for X_n in seqs['X']]
if np.sum(T_all) != Z.shape[1]:
raise ValueError('size of X incorrect.')
dtype_new = [(i, seqs[i].dtype) for i in seqs.dtype.names]
dtype_new.append((fn, object))
seqs_new = np.empty(len(seqs), dtype=dtype_new)
for dtype_name in seqs.dtype.names:
seqs_new[dtype_name] = seqs[dtype_name]
ctr = 0
for n, T_n in enumerate(T_all):
seqs_new[n][fn] = Z[:, ctr:ctr + T_n]
ctr += T_n
return seqs_new