Source code for stepmix.emission.gaussian

"""Gaussian emission models."""
import numpy as np
import pandas as pd
from scipy.stats import multivariate_normal
from sklearn.mixture import GaussianMixture
from sklearn.mixture._gaussian_mixture import (
    _estimate_gaussian_parameters,
    _compute_precision_cholesky,
)

from stepmix.emission.emission import Emission
from stepmix.utils import check_in, check_nonneg, cov_np_to_df


class GaussianUnit(Emission):
    """Gaussian emission model with fixed unit variance.

    sklearn.mixture.GaussianMixture does not have an implementation for fixed
    unit variance, so we provide one.
    """

    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self.model_str = "gaussian_unit"

    def m_step(self, X, resp):
        self.parameters["means"] = (resp[..., np.newaxis] * X[:, np.newaxis, :]).sum(
            axis=0
        ) / resp.sum(axis=0, keepdims=True).T

    def log_likelihood(self, X):
        n, D = X.shape
        log_eps = np.zeros((n, self.n_components))
        for c in range(self.n_components):
            log_eps[:, c] = multivariate_normal.logpdf(
                x=X, mean=self.parameters["means"][c], cov=1
            )
        return log_eps

    def sample(self, class_no, n_samples):
        D = self.parameters["means"].shape[1]
        X = self.random_state.normal(
            loc=self.parameters["means"][class_no],
            scale=np.ones(D),
            size=(n_samples, D),
        )
        return X

    @property
    def n_parameters(self):
        return self.parameters["means"].shape[0] * self.parameters["means"].shape[1]
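

# A minimal usage sketch for GaussianUnit (illustrative only: it assumes the
# Emission base class accepts n_components and random_state and initializes
# self.parameters as a dict; within StepMix these calls are driven by the EM
# loop):
#
#   >>> import numpy as np
#   >>> X = np.random.default_rng(0).normal(size=(100, 3))
#   >>> resp = np.full((100, 2), 0.5)            # soft assignments to 2 classes
#   >>> model = GaussianUnit(n_components=2, random_state=42)
#   >>> model.m_step(X, resp)                    # estimate per-class means
#   >>> log_eps = model.log_likelihood(X)        # (100, 2) per-class log densities
#   >>> X_new = model.sample(class_no=0, n_samples=10)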


class Gaussian(Emission):
    """Gaussian emission model with various covariance options.

    This class spoofs the scikit-learn GaussianMixture class by reusing the
    same attributes and calling its methods.
    """

    def __init__(
        self,
        n_components=2,
        covariance_type="spherical",
        init_params="random",
        reg_covar=1e-6,
        random_state=None,
    ):
        super().__init__(n_components=n_components, random_state=random_state)
        self.covariance_type = covariance_type
        self.init_params = init_params
        self.reg_covar = reg_covar

        # Actual parameters.
        # We spoof the sklearn GaussianMixture class. This is not needed for
        # typical children of the Emission class; we do it only to be
        # compatible with the sklearn GaussianMixture machinery. Normally you
        # should keep all parameters in a dict under self.parameters.
        self.parameters = None
        self.means_ = None
        self.covariances_ = None
        self.precisions_cholesky_ = None

        # This is for compatibility with GaussianMixture methods. Could be
        # implemented in the future.
        self.means_init = None
        self.weights_init = None
        self.precisions_init = None

    def check_parameters(self):
        super().check_parameters()
        check_in(
            ["spherical", "tied", "diag", "full"], covariance_type=self.covariance_type
        )
        check_in(["random"], init_params=self.init_params)
        check_nonneg(reg_covar=self.reg_covar)

    def initialize(self, X, resp, random_state=None):
        self.check_parameters()

        # Required to get the initial means, covariances and precisions right.
        # Already performs the M-step.
        GaussianMixture._initialize(self, X, resp)

    def m_step(self, X, resp):
        """M step.

        Adapted from the sklearn GaussianMixture class to accept
        responsibilities instead of log responsibilities.

        Parameters
        ----------
        X : array-like of shape (n_samples, n_features)
        resp : array-like of shape (n_samples, n_components)
            Posterior probabilities (or responsibilities) of each sample in X.
        """
        n_samples, _ = X.shape
        self.weights_, self.means_, self.covariances_ = _estimate_gaussian_parameters(
            X, resp, self.reg_covar, self.covariance_type
        )
        self.weights_ /= n_samples
        self.precisions_cholesky_ = _compute_precision_cholesky(
            self.covariances_, self.covariance_type
        )

        # Save attributes to a dict for an API compatible with other stepmix models
        self.parameters = self.get_parameters()

    def log_likelihood(self, X):
        return GaussianMixture._estimate_log_prob(self, X)

    def sample(self, class_no, n_samples):
        if self.covariance_type == "full":
            X = self.random_state.multivariate_normal(
                self.means_[class_no], self.covariances_[class_no], n_samples
            )
        elif self.covariance_type == "tied":
            X = self.random_state.multivariate_normal(
                self.means_[class_no], self.covariances_, n_samples
            )
        else:
            n_features = self.means_.shape[1]
            X = self.means_[class_no] + self.random_state.standard_normal(
                size=(n_samples, n_features)
            ) * np.sqrt(self.covariances_[class_no])
        return X

    def get_parameters(self):
        return dict(
            means=self.means_.copy(),
            covariances=self.covariances_.copy(),
            precisions_cholesky=self.precisions_cholesky_.copy(),
        )

    def set_parameters(self, params):
        # We spoof the sklearn GaussianMixture class. This is not needed for
        # typical children of the Emission class; we do it only to be
        # compatible with the sklearn GaussianMixture machinery.
        # This will update self.means_, self.covariances_ and
        # self.precisions_cholesky_.
        if "precisions_cholesky" not in params:
            params["precisions_cholesky"] = _compute_precision_cholesky(
                params["covariances"], self.covariance_type
            )
        GaussianMixture._set_parameters(
            self,
            (
                None,
                params["means"],
                params["covariances"],
                params["precisions_cholesky"],
            ),
        )

        # Save attributes to a dict for an API compatible with other stepmix models
        self.parameters = self.get_parameters()

    @property
    def n_parameters(self):
        n = GaussianMixture._n_parameters(self)

        # Remove class weights from the count.
        # We add them back in the main StepMix class.
        n -= self.n_components - 1
        return n

    def permute_classes(self, perm, axis=0):
        # Latent classes are on the first axis
        self.means_ = self.means_[perm]

        # Tied has a single covariance (and precisions) shared among all
        # classes. No need to permute.
        if self.covariance_type != "tied":
            self.covariances_ = self.covariances_[perm]
            self.precisions_cholesky_ = self.precisions_cholesky_[perm]
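

# For reference, the covariance options map to the following parameter shapes,
# with K = n_components classes and D features (these are the sklearn
# GaussianMixture conventions that this class spoofs):
#
#   covariance_type="spherical" -> covariances_ has shape (K,)
#   covariance_type="diag"      -> covariances_ has shape (K, D)
#   covariance_type="tied"      -> covariances_ has shape (D, D), shared by all classes
#   covariance_type="full"      -> covariances_ has shape (K, D, D)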


class GaussianFull(Gaussian):
    def __init__(self, **kwargs):
        # Make sure no other covariance_type is specified
        kwargs.pop("covariance_type", None)
        super().__init__(covariance_type="full", **kwargs)
        self.model_str = "gaussian_full"

    def print_parameters(
        self,
        indent=1,
        feature_names=None,
        index=["class_no", "param"],
        columns=["model_name", "variable"],
    ):
        """Flipping class_no and variable is nicer for full covariances."""
        super().print_parameters(
            indent=indent, feature_names=feature_names, index=index, columns=columns
        )

    def get_parameters_df(self, feature_names=None):
        """Return self.parameters as a long dataframe.

        Call self._to_df or implement a custom method.
        """
        if feature_names is None:
            n_features = self.parameters["means"].shape[1]
            feature_names = self.get_default_feature_names(n_features)

        means = self._to_df(
            param_dict=self.parameters, keys=["means"], feature_names=feature_names
        )
        cov = cov_np_to_df(
            self.parameters["covariances"], feature_names, self.model_str
        )
        return pd.concat([means, cov])


class GaussianSpherical(Gaussian):
    def __init__(self, **kwargs):
        # Make sure no other covariance_type is specified
        kwargs.pop("covariance_type", None)
        super().__init__(covariance_type="spherical", **kwargs)
        self.model_str = "gaussian_spherical"

    def get_parameters_df(self, feature_names=None):
        params = self.get_parameters()

        # Spherical covariances have shape (n_components,). Tile the single
        # variance of each class across features to get (n_components, n_features).
        n_features = params["means"].shape[1]
        params["covariances"] = np.tile(params["covariances"], (n_features, 1)).T
        return self._to_df(
            param_dict=params,
            keys=["means", "covariances"],
            feature_names=feature_names,
        )


class GaussianDiag(Gaussian):
    def __init__(self, **kwargs):
        # Make sure no other covariance_type is specified
        kwargs.pop("covariance_type", None)
        super().__init__(covariance_type="diag", **kwargs)
        self.model_str = "gaussian_diag"

    def get_parameters_df(self, feature_names=None):
        params = self.get_parameters()
        return self._to_df(
            param_dict=params,
            keys=["means", "covariances"],
            feature_names=feature_names,
        )


class GaussianTied(Gaussian):
    def __init__(self, **kwargs):
        # Make sure no other covariance_type is specified
        kwargs.pop("covariance_type", None)
        super().__init__(covariance_type="tied", **kwargs)
        self.model_str = "gaussian_tied"

    def print_parameters(
        self,
        indent=1,
        feature_names=None,
        index=["class_no", "param"],
        columns=["model_name", "variable"],
    ):
        """Flipping class_no and variable is nicer for full covariances."""
        super().print_parameters(
            indent=indent, feature_names=feature_names, index=index, columns=columns
        )

    def get_parameters_df(self, feature_names=None):
        """Return self.parameters as a long dataframe.

        Call self._to_df or implement a custom method.
        """
        if feature_names is None:
            n_features = self.parameters["means"].shape[1]
            feature_names = self.get_default_feature_names(n_features)

        means = self._to_df(
            param_dict=self.parameters, keys=["means"], feature_names=feature_names
        )

        # The tied covariance matrix has shape (n_features, n_features).
        # Repeat it for every class so the dataframe has one block per class.
        cov = np.tile(self.parameters["covariances"], (self.n_components, 1, 1))
        cov = cov_np_to_df(cov, feature_names, self.model_str)
        return pd.concat([means, cov])
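

# Worked example for n_parameters, with K classes and D features: the means
# always contribute K * D free parameters, and the covariances contribute K
# (spherical), K * D (diag), D * (D + 1) / 2 (tied) or K * D * (D + 1) / 2
# (full), counting each symmetric matrix once. GaussianMixture._n_parameters
# also counts K - 1 class weights, which Gaussian.n_parameters subtracts; the
# main StepMix class adds them back.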


class GaussianNan(Emission):
    """Gaussian emission model supporting missing values (Full Information
    Maximum Likelihood).

    This class assumes a diagonal covariance structure. The covariances are
    therefore represented as an (n_components, n_features) array.
    """

    def __init__(self, debug_likelihood=False, **kwargs):
        super().__init__(**kwargs)
        self.debug_likelihood = debug_likelihood

    def m_step(self, X, resp):
        is_observed = ~np.isnan(X)

        # Compute the normalization factor over observed values for each feature
        resp_sums = list()
        for i in range(X.shape[1]):
            resp_i = resp[is_observed[:, i]]
            resp_sums.append(resp_i.sum(axis=0))

        self.parameters["means"] = self._compute_means(X, resp, resp_sums)
        self.parameters["covariances"] = self._compute_cov(X, resp, resp_sums)

    def _compute_means(self, X, resp, resp_sums):
        # Replace all nans with 0 so they do not contribute to the sums
        X = np.nan_to_num(X)
        means = (resp[..., np.newaxis] * X[:, np.newaxis, :]).sum(axis=0)

        # Normalize over the responsibilities of observed values only
        for i in range(means.shape[1]):
            means[:, i] /= resp_sums[i]

        return means

    def _compute_cov(self, X, resp, resp_sums):
        raise NotImplementedError("No covariance estimator is implemented.")
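
    # For reference, the FIML mean update only averages over the samples where
    # a feature is observed:
    #
    #   mu_{c,d} = sum_{i : x_{i,d} observed} resp_{i,c} * x_{i,d}
    #              / sum_{i : x_{i,d} observed} resp_{i,c}
    #
    # resp_sums (computed in m_step) holds the per-feature denominators.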

    def log_likelihood(self, X):
        if not self.debug_likelihood:
            return self._log_likelihood(X)
        else:
            # Compute the log likelihood naively. Useful for debugging, but slow.
            return self._debug_ll(X)

    def _log_likelihood(self, X):
        # To be tested, but this should work for any diagonal covariance
        is_observed = ~np.isnan(X)
        n, D = X.shape
        log_eps = np.zeros((n, self.n_components))
        for c in range(self.n_components):
            diff = X - self.parameters["means"][c].reshape(1, -1)

            # Zero out the nans
            diff = np.nan_to_num(diff)

            # First compute the likelihood from the term (x-\mu)^T \Sigma^{-1} (x-\mu)
            precision_c = 1 / self.parameters["covariances"][c]
            ll_diff = ((diff**2) * precision_c.reshape(1, -1)).sum(axis=1)

            # Then compute the likelihood from the term log det(2*\pi*\Sigma)
            pi_cov_c = 2 * np.pi * np.tile(self.parameters["covariances"][c], (n, 1))

            # Replace nan dimensions with 1, since this won't affect the
            # product of the diagonal (the determinant)
            pi_cov_c = np.where(is_observed, pi_cov_c, 1)
            log_dets = np.log(pi_cov_c).sum(axis=1)

            log_eps[:, c] = -0.5 * (log_dets + ll_diff)

        return log_eps

    def _debug_ll(self, X):
        # Naive loopy way to compute the log likelihood.
        # Useful for debugging; otherwise self._log_likelihood should be preferred.
        is_observed = ~np.isnan(X)
        n, D = X.shape
        log_eps = np.zeros((n, self.n_components))
        for i in range(n):
            # Only select observed dimensions
            mask = is_observed[i]
            if mask.any():
                for c in range(self.n_components):
                    x_i = X[i, mask]
                    mean_c = self.parameters["means"][c, mask]
                    cov_c = self.parameters["covariances"][c, mask]
                    log_eps[i, c] = multivariate_normal.logpdf(
                        x=x_i, mean=mean_c, cov=cov_c
                    )
            else:
                # Undefined log likelihood. We use 0, as it won't affect the
                # overall likelihood when summing over independent models.
                log_eps[i, :] = 0

        return log_eps
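
    # For reference, with a diagonal covariance the log density factorizes over
    # the observed dimensions O_i of sample x_i, which is what _log_likelihood
    # vectorizes:
    #
    #   log p(x_i | c) = -0.5 * sum_{d in O_i} [ log(2 * pi * sigma_{c,d}^2)
    #                                            + (x_{i,d} - mu_{c,d})^2 / sigma_{c,d}^2 ]
    #
    # Missing dimensions contribute log(1) = 0 to the determinant term and 0 to
    # the quadratic term, so they are effectively marginalized out.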

    def sample(self, class_no, n_samples):
        D = self.parameters["means"].shape[1]
        X = self.random_state.normal(
            loc=self.parameters["means"][class_no],
            # The covariances hold variances; normal() expects a standard deviation
            scale=np.sqrt(self.parameters["covariances"][class_no]),
            size=(n_samples, D),
        )
        return X


class GaussianUnitNan(GaussianNan):
    """Gaussian emission model with unit covariance supporting missing values
    (Full Information Maximum Likelihood)."""

    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self.model_str = "gaussian_unit_nan"

    def _compute_cov(self, X, resp, resp_sums):
        """No estimate. Simply return diagonal covariance 1 for all features."""
        return np.ones_like(self.parameters["means"])

    @property
    def n_parameters(self):
        return self.parameters["means"].shape[0] * self.parameters["means"].shape[1]


class GaussianSphericalNan(GaussianNan):
    """Gaussian emission model with spherical covariance supporting missing
    values (Full Information Maximum Likelihood)."""

    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self.model_str = "gaussian_spherical_nan"

    def _compute_cov(self, X, resp, resp_sums):
        """One covariance parameter per component."""
        covs = list()
        is_observed = ~np.isnan(X)
        is_observed_row = is_observed.sum(axis=1)
        for c in range(self.n_components):
            diff = X - self.parameters["means"][c].reshape(1, -1)

            # Zero out the nans
            diff = np.nan_to_num(diff)
            cov_c = resp[:, c][..., np.newaxis] * (diff**2)

            # Normalize by the responsibilities of observed values only
            z = resp[:, c] @ is_observed_row
            covs.append(cov_c.sum() / z)
        covs = np.array(covs)

        # Keep a diagonal format for compatibility with the parent class
        result = np.ones_like(self.parameters["means"]) * covs.reshape((-1, 1))
        return result

    @property
    def n_parameters(self):
        mean_params = (
            self.parameters["means"].shape[0] * self.parameters["means"].shape[1]
        )

        # Covariances are n latent classes x n features, but we only have one
        # degree of freedom per class
        cov_params = self.parameters["covariances"].shape[0]
        return mean_params + cov_params
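

# For reference, the single spherical variance per class pools the squared
# deviations of all observed entries, matching _compute_cov above:
#
#   sigma_c^2 = sum_i sum_{d in O_i} resp_{i,c} * (x_{i,d} - mu_{c,d})^2
#               / sum_i resp_{i,c} * |O_i|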


class GaussianDiagNan(GaussianNan):
    """Gaussian emission model with diagonal covariance supporting missing
    values (Full Information Maximum Likelihood)."""

    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self.model_str = "gaussian_diag_nan"

    def _compute_cov(self, X, resp, resp_sums):
        """One covariance parameter per column."""
        covs = list()
        for c in range(self.n_components):
            diff = X - self.parameters["means"][c].reshape(1, -1)

            # Zero out the nans
            diff = np.nan_to_num(diff)
            cov_c = resp[:, c][..., np.newaxis] * (diff**2)
            covs.append(cov_c.sum(axis=0))
        covs = np.vstack(covs)

        # Normalize
        for i in range(covs.shape[1]):
            covs[:, i] /= resp_sums[i]

        return covs

    @property
    def n_parameters(self):
        mean_params = (
            self.parameters["means"].shape[0] * self.parameters["means"].shape[1]
        )
        cov_params = (
            self.parameters["covariances"].shape[0]
            * self.parameters["covariances"].shape[1]
        )
        return mean_params + cov_params
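

# A minimal end-to-end sketch for the FIML models (illustrative only: it
# assumes the Emission base class initializes self.parameters as a dict and
# provides a random_state; within StepMix these calls are driven by the EM
# loop):
#
#   >>> import numpy as np
#   >>> X = np.random.default_rng(0).normal(size=(100, 3))
#   >>> X[::10, 0] = np.nan                      # introduce missing values
#   >>> resp = np.full((100, 2), 0.5)            # soft assignments to 2 classes
#   >>> model = GaussianDiagNan(n_components=2, random_state=42)
#   >>> model.m_step(X, resp)                    # FIML estimates of means and variances
#   >>> log_eps = model.log_likelihood(X)        # nan entries are marginalized out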