Source code for stepmix.emission.categorical

"""Categorical emission models."""
import numpy as np
import pandas as pd

from stepmix.emission.emission import Emission
from stepmix.utils import max_one_hot


[docs]class Bernoulli(Emission): """Bernoulli (binary) emission model.""" def __init__(self, **kwargs): super().__init__(**kwargs) self.model_str = "binary"
[docs] def m_step(self, X, resp): pis = X.T @ resp pis /= resp.sum(axis=0, keepdims=True) pis = np.clip(pis, 1e-15, 1 - 1e-15) # avoid probabilities 0 or 1 self.parameters["pis"] = pis.T
[docs] def log_likelihood(self, X): # compute log emission probabilities pis = np.clip( self.parameters["pis"].T, 1e-15, 1 - 1e-15 ) # avoid probabilities 0 or 1 log_eps = X @ np.log(pis) + (1 - X) @ np.log(1 - pis) return log_eps
[docs] def predict_proba(self, log_resp): resp = np.exp(log_resp) probs = resp @ self.parameters["pis"] # Expand to explicitly model P(0) and P(1) probs = np.repeat(probs, 2, axis=1) probs[:, ::2] = 1 - probs[:, ::2] return probs
[docs] def predict(self, log_resp): probs = self.predict_proba(log_resp) n_features = self.parameters["pis"].shape[1] n_samples = log_resp.shape[0] probs = probs.reshape((n_samples, n_features, 2)) preds = probs.argmax(axis=2) if preds.shape[1] == 1: preds = preds.flatten() return preds
[docs] def sample(self, class_no, n_samples): feature_weights = self.parameters["pis"][class_no, :].reshape((1, -1)) K = feature_weights.shape[1] # number of features X = (self.random_state.uniform(size=(n_samples, K)) < feature_weights).astype( int ) return X
@property def n_parameters(self): return self.parameters["pis"].shape[0] * self.parameters["pis"].shape[1]
[docs]class BernoulliNan(Bernoulli): """Bernoulli (binary) emission model supporting missing values (Full Information Maximum Likelihood).""" def __init__(self, **kwargs): super().__init__(**kwargs) self.model_str = "binary_nan"
[docs] def m_step(self, X, resp): is_observed = ~np.isnan(X) # Replace all nans with 0 X = np.nan_to_num(X, nan=0) pis = X.T @ resp # Compute normalization factor over observed values for each feature for i in range(pis.shape[0]): resp_i = resp[is_observed[:, i]] pis[i] /= resp_i.sum(axis=0) self.parameters["pis"] = pis.T
[docs] def log_likelihood(self, X): is_observed = ~np.isnan(X) # Replace all nans with 0 X = np.nan_to_num(X, nan=0) # compute log emission probabilities pis = np.clip( self.parameters["pis"].T, 1e-15, 1 - 1e-15 ) # avoid probabilities 0 or 1 log_eps = X @ np.log(pis) + ((1 - X) * is_observed) @ np.log(1 - pis) return log_eps
[docs]class Multinoulli(Emission): """Multinoulli (categorical) emission model Uses one-hot encoded features. Expected data formatting: X[n,k*L+l]=1 if l is the observed outcome for the kth attribute of data point n, where n is the number of observations, K=n_features, L=max_n_outcomes for each multinoulli where max_n_outcomes represents the maximum number of outcomes for a given feature. If integer_codes is set to True, the model will expect integer-encoded categories and will one-hot encode the data itself. In this case, max_n_outcomes and total_outcomes are inferred by the model. Parameters ---------- n_components : int, default=2 The number of latent classes. random_state : int, RandomState instance or None, default=None Controls the random seed given to the method chosen to initialize the parameters. Pass an int for reproducible output across multiple function calls. integer_codes : bool, default=True Input X should be integer-encoded zero-indexed categories. max_n_outcomes : int, default=None Maximum number of outcomes for a single categorical feature. Each column in the input will have max_n_outcomes associated columns in the one-hot encoding. If None and integer_codes=True, will be inferred from the data. total_outcomes : int, default=None Total outcomes over all features. E.g., if we provide a categorical variable with two outcomes and another with 4 outcomes, total_outcomes = 6. If None and integer_codes=True, will be inferred from the data. Attributes ---------- pis[k*L+l,c]=P[ X[n,k*L+l]=1 | n belongs to class c] """ def __init__( self, n_components=2, random_state=None, integer_codes=True, max_n_outcomes=None, total_outcomes=None, ): super().__init__(n_components=n_components, random_state=random_state) self.integer_codes = integer_codes self.parameters["max_n_outcomes"] = max_n_outcomes self.parameters["total_outcomes"] = total_outcomes self.model_str = "categorical" if max_n_outcomes is None and not integer_codes: raise ValueError( "max_n_outcomes can only be set to None with integer_codes=True." ) if total_outcomes is None and not integer_codes: raise ValueError( "total_outcomes can only be set to None with integer_codes=True." )
[docs] def get_n_features(self): n_features_x_max_n_outcomes = self.parameters["pis"].shape[1] n_features = int( n_features_x_max_n_outcomes / self.parameters["max_n_outcomes"] ) return n_features
[docs] def encode_features(self, X): if self.integer_codes: # Self.n_outcomes will only be updated if it is still None ( X, self.parameters["max_n_outcomes"], self.parameters["total_outcomes"], ) = max_one_hot( X, max_n_outcomes=self.parameters["max_n_outcomes"], total_outcomes=self.parameters["total_outcomes"], ) return X
[docs] def m_step(self, X, resp): X = self.encode_features(X) pis = X.T @ resp pis /= resp.sum(axis=0, keepdims=True) pis = np.clip(pis, 1e-15, 1 - 1e-15) # avoid probabilities 0 or 1 self.parameters["pis"] = pis.T
[docs] def log_likelihood(self, X): X = self.encode_features(X) # compute log emission probabilities pis = np.clip(self.parameters["pis"].T, 1e-15, 1 - 1e-15) log_eps = X @ np.log(pis) return log_eps
[docs] def predict_proba(self, log_resp): n_samples, n_features, n_outcomes = ( log_resp.shape[0], self.get_n_features(), self.parameters["max_n_outcomes"], ) resp = np.exp(log_resp) pis = self.parameters["pis"].reshape( (self.n_components, n_features, n_outcomes) ) probs = np.einsum("nk,kfo->nfo", resp, pis) probs = probs.reshape((n_samples, n_features * n_outcomes)) return probs
[docs] def predict(self, log_resp): n_samples, n_features, n_outcomes = ( log_resp.shape[0], self.get_n_features(), self.parameters["max_n_outcomes"], ) probs = self.predict_proba(log_resp) probs = probs.reshape((n_samples, n_features, n_outcomes)) preds = probs.argmax(axis=2) if preds.shape[1] == 1: preds = preds.flatten() return preds
[docs] def sample(self, class_no, n_samples): pis = self.parameters["pis"].T n_features = self.get_n_features() feature_weights = pis[:, class_no].reshape( n_features, self.parameters["max_n_outcomes"] ) X = np.array( [ self.random_state.multinomial(1, feature_weights[k], size=n_samples) for k in range(n_features) ] ) X = np.argmax(X, axis=2) # Convert to integers return X.T
@property def n_parameters(self): n_classes = self.parameters["pis"].shape[0] # Only n_outcomes - 1 free parameters per feature since probabilities sum to 1 n_free_parameters_per_class = ( self.parameters["total_outcomes"] - self.get_n_features() ) return n_classes * n_free_parameters_per_class
[docs] def get_parameters_df(self, feature_names=None): """Expand the feature names since each feature may have up to max_n_outcomes outcomes.""" if feature_names is None: n_features = self.get_n_features() feature_names = self.get_default_feature_names(n_features) # Expand features to account for outcomes feature_names_ex = list() for name in feature_names: feature_names_ex += [ f"{name}_{i}" for i in range(self.parameters["max_n_outcomes"]) ] df = self._to_df( param_dict=self.parameters, keys=["pis"], feature_names=feature_names_ex ) # Drop columns where probabilities are all <= 1e-15 df_p = pd.pivot_table( df, index=["param", "class_no"], columns=["variable"], values="value" ) keep_vars = df_p.loc[:, (df_p > 1e-15).any(axis=0)].columns # Only keep rows in df that are in keep_vars df = df[df.variable.isin(keep_vars)] # We should have total_outcomes total variables # What happpends if user sends a categorical with 0, 2 for example? (Skipping 1) # assert(df.variable.nunique() == self.parameters["total_outcomes"]) return df
[docs]class MultinoulliNan(Multinoulli): """Multinoulli (categorical) emission model supporting missing values (Full Information Maximum Likelihood).""" def __init__(self, **kwargs): super().__init__(**kwargs) self.model_str = "categorical_nan"
[docs] def m_step(self, X, resp): X = self.encode_features(X) is_observed = ~np.isnan(X) # Replace all nans with 0 X = np.nan_to_num(X, nan=0) pis = X.T @ resp # Compute normalization factor over observed values for each feature for i in range(pis.shape[0]): resp_i = resp[is_observed[:, i]] pis[i] /= resp_i.sum(axis=0) self.parameters["pis"] = pis.T
[docs] def log_likelihood(self, X): X = self.encode_features(X) is_observed = ~np.isnan(X) # Replace all nans with 0 X = np.nan_to_num(X, nan=0) # compute log emission probabilities pis = np.clip(self.parameters["pis"].T, 1e-15, 1 - 1e-15) log_eps = X @ np.log(pis) return log_eps