"""EM for stepwise estimation of latent class models with structural variables.
Please note the class weights rho are now referred to as 'weights' and the assignments tau are known as
'responsibilities' or resp to match the sklearn stack terminology.
"""
# Author: Sacha Morin <morin.sacha@gmail.com>
# Author: Robin Legault <robin.legault@umontreal.ca>
# License:
import warnings
import copy
import pandas as pd
import numpy as np
from scipy.special import logsumexp
from sklearn.mixture._base import BaseEstimator
from sklearn.exceptions import ConvergenceWarning
from sklearn.utils.validation import (
check_random_state,
check_is_fitted,
_check_sample_weight,
)
from sklearn.cluster import KMeans
import tqdm
from . import utils
from .corrections import compute_bch_matrix, compute_log_emission_pm
from .emission.build_emission import EMISSION_DICT, build_emission
from .bootstrap import bootstrap
class StepMix(BaseEstimator):
"""StepMix estimator for Latent Class Analysis.
Multi-step EM estimation of latent class models with measurement and structural models. The measurement and
structural models can be fit together (1-step) or sequentially (2-step and 3-step). This estimator implements
the BCH and ML bias correction methods for 3-step estimation.
The measurement and structural models can be any of those defined in stepmix.emission. The measurement model
can be used alone to effectively fit a latent mixture model.
This class was adapted from the scikit-learn BaseMixture and GaussianMixture classes.
.. versionadded:: 0.00
Parameters
----------
n_components : int, default=2
The number of latent classes.
n_steps : {1, 2, 3}, default=1
Number of steps in the estimation. Must be one of :
- 1: run EM on both the measurement and structural models.
- 2: first run EM on the measurement model, then on the complete model, but keep the measurement parameters\
fixed for the second step. See *Bakk, 2018*.
- 3: first run EM on the measurement model, assign class probabilities, then fit the structural model via\
maximum likelihood. See the correction parameter for bias correction.
measurement : {'bernoulli', 'bernoulli_nan', 'binary', 'binary_nan', 'categorical', 'categorical_nan', 'continuous', \
'continuous_nan', 'covariate',\
'gaussian', 'gaussian_nan', 'gaussian_unit', 'gaussian_unit_nan', 'gaussian_spherical', 'gaussian_spherical_nan',\
'gaussian_tied', 'gaussian_diag', 'gaussian_diag_nan', 'gaussian_full', 'multinoulli', 'multinoulli_nan', dict},\
default='bernoulli'
String describing the measurement model. Must be one of:
- 'bernoulli': the observed data consists of n_features bernoulli (binary) random variables.
- 'bernoulli_nan': the observed data consists of n_features bernoulli (binary) random variables. Supports missing values.
- 'binary': alias for bernoulli.
- 'binary_nan': alias for bernoulli_nan.
- 'categorical': alias for multinoulli.
- 'categorical_nan': alias for multinoulli_nan.
- 'continuous': alias for gaussian_diag.
- 'continuous_nan': alias for gaussian_diag_nan. Supports missing values.
- 'covariate': covariate model where class probabilities are a multinomial logistic model of the features.
- 'gaussian': alias for gaussian_unit.
- 'gaussian_nan': alias for gaussian_unit. Supports missing values.
- 'gaussian_unit': each gaussian component has unit variance. Only fit the mean.
- 'gaussian_unit_nan': each gaussian component has unit variance. Only fit the mean. Supports missing values.
- 'gaussian_spherical': each gaussian component has its own single variance.
- 'gaussian_spherical_nan': each gaussian component has its own single variance. Supports missing values.
- 'gaussian_tied': all gaussian components share the same general covariance matrix.
- 'gaussian_diag': each gaussian component has its own diagonal covariance matrix.
- 'gaussian_diag_nan': each gaussian component has its own diagonal covariance matrix. Supports missing values.
- 'gaussian_full': each gaussian component has its own general covariance matrix.
- 'multinoulli': the observed data consists of n_features multinoulli (categorical) random variables.
- 'multinoulli_nan': the observed data consists of n_features multinoulli (categorical) random variables. Supports missing values.
Models suffixed with ``_nan`` support missing values, but may be slower than their fully observed counterpart.
Alternatively accepts a dict to define a nested model, e.g., 3 gaussian features and 2 binary features. Please
refer to :class:`stepmix.emission.nested.Nested` for details.
structural : {'bernoulli', 'bernoulli_nan', 'binary', 'binary_nan', 'categorical', 'categorical_nan', 'continuous', \
'continuous_nan', 'covariate',\
'gaussian', 'gaussian_nan', 'gaussian_unit', 'gaussian_unit_nan', 'gaussian_spherical', 'gaussian_spherical_nan',\
'gaussian_tied', 'gaussian_diag', 'gaussian_diag_nan', 'gaussian_full', 'multinoulli', 'multinoulli_nan', dict},\
default='bernoulli'
String describing the structural model. Same options as those for the measurement model.
assignment : {'soft', 'modal'}, default='modal'
Class assignments for 3-step estimation.
Must be one of:
- 'soft': keep class responsibilities (posterior probabilities) as is.
- 'modal': assign 1 to the class with max probability, 0 otherwise (one-hot encoding).
correction : {None, 'BCH', 'ML'}, default=None
Bias correction for 3-step estimation.
Must be one of:
- None : No correction. Run Naive 3-step.
- 'BCH' : Apply the empirical BCH correction from *Vermunt, 2004*.
- 'ML' : Apply the ML correction from *Vermunt, 2010; Bakk et al., 2013*.
abs_tol : float, default=1e-10
The convergence threshold. EM iterations will stop when the
lower bound average gain is below this threshold.
rel_tol : float, default=0.00
The convergence threshold. EM iterations will stop when the
relative lower bound average gain is below this threshold.
max_iter : int, default=1000
The number of EM iterations to perform.
n_init : int, default=1
The number of initializations to perform. The best results are kept.
save_param_init : bool, default=False
Save the estimated parameters of all initializations to self.param_buffer\_.
init_params : {'kmeans', 'random'}, default='random'
The method used to initialize the weights, the means and the
precisions.
Must be one of:
- 'kmeans' : responsibilities are initialized using kmeans.
- 'random' : responsibilities are initialized randomly.
random_state : int, RandomState instance or None, default=None
Controls the random seed given to the method chosen to initialize the
parameters. Pass an int for reproducible output across multiple function calls.
verbose : int, default=0
Enable verbose output. If 1, will print detailed report of the model and the performance metrics after fitting.
progress_bar : int, default=1
Display a tqdm progress bar during fitting.
- 0 : No progress bar.
- 1 : Progress bar for initializations.
- 2 : Progress bars for initializations and iterations. This requires a nested tqdm bar and may not work\
properly in some terminals.
measurement_params: {dict, None}, default=None
Additional params passed to the measurement model class. Particularly useful to specify optimization parameters
for :class:`stepmix.emission.covariate.Covariate`. Ignored if the measurement descriptor is a nested object
(see :class:`stepmix.emission.nested.Nested`).
structural_params: {dict, None}, default=None
Additional params passed to the structural model class. Particularly useful to specify optimization parameters
for :class:`stepmix.emission.covariate.Covariate`. Ignored if the structural descriptor is a nested object
(see :class:`stepmix.emission.nested.Nested`).
Attributes
----------
weights_ : ndarray of shape (n_components,)
The weights of each mixture components.
_mm : stepmix.emission.Emission
Measurement model, including parameters and estimation methods.
_sm : stepmix.emission.Emission
Structural model, including parameters and estimation methods.
log_resp_ : ndarray of shape (n_samples, n_components)
Initial log responsibilities.
measurement_in_: int
Number of features in the measurement model.
structural_in_: int
Number of features in the structural model.
converged_ : bool
True when convergence was reached in fit(), False otherwise.
n_iter_ : int
Number of steps used by the best fit of EM to reach convergence.
lower_bound_ : float
Lower bound value on the log-likelihood (of the training data with
respect to the model) of the best fit of EM.
lower_bound_buffer_ : float
Lower bound values on the log-likelihood (of the training data with
respect to the model) of all EM initializations.
param_buffer_ : list
Final parameters of all initializations. Only updated if save_param_init=True.
x_names_ : list
If input is a DataFrame, column names of X.
y_names_: list
If input is a DataFrame, column names of Y.
Notes
-----
References
----------
Bolck, A., Croon, M., and Hagenaars, J. Estimating latent structure models with categorical variables: One-step
versus three-step estimators. Political analysis, 12(1): 3–27, 2004.
Vermunt, J. K. Latent class modeling with covariates: Two improved three-step approaches. Political analysis,
18 (4):450–469, 2010.
Bakk, Z., Tekle, F. B., and Vermunt, J. K. Estimating the association between latent class membership and external
variables using bias-adjusted three-step approaches. Sociological Methodology, 43(1):272–311, 2013.
Bakk, Z. and Kuha, J. Two-step estimation of models between latent classes and external variables. Psychometrika,
83(4):871–892, 2018
Examples
--------
.. code-block:: python
from stepmix.datasets import data_bakk_response
from stepmix.stepmix import StepMix
# Soft 3-step
X, Y, _ = data_bakk_response(n_samples=1000, sep_level=.7, random_state=42)
model = StepMix(n_components=3, n_steps=3, measurement='bernoulli', structural='gaussian_unit', random_state=42, assignment='soft')
model.fit(X, Y)
model.score(X, Y) # Average log-likelihood
# Equivalently, each step can be performed individually. See the code of the fit method for details.
model = StepMix(n_components=3, measurement='bernoulli', structural='gaussian_unit', random_state=42)
model.em(X) # Step 1
probs = model.predict_proba(X) # Step 2
model.m_step_structural(probs, Y) # Step 3
model.score(X, Y) # Average log-likelihood
"""
def __init__(
    self,
    n_components=2,
    *,
    n_steps=1,
    measurement="bernoulli",
    structural="gaussian_unit",
    assignment="modal",
    correction=None,
    abs_tol=1e-10,
    rel_tol=0.00,
    max_iter=1000,
    n_init=1,
    save_param_init=False,
    init_params="random",
    random_state=None,
    verbose=0,
    progress_bar=1,
    measurement_params=None,
    structural_params=None,
):
    """Store hyperparameters untouched, per scikit-learn convention.

    No validation happens here; see ``_check_initial_parameters``.
    """
    # Model structure
    self.n_components = n_components
    self.n_steps = n_steps
    self.measurement = measurement
    self.measurement_params = measurement_params
    self.structural = structural
    self.structural_params = structural_params
    # 3-step estimation options
    self.assignment = assignment
    self.correction = correction
    # Optimization settings
    self.abs_tol = abs_tol
    self.rel_tol = rel_tol
    self.max_iter = max_iter
    self.n_init = n_init
    self.init_params = init_params
    self.save_param_init = save_param_init
    # Reproducibility and reporting
    self.random_state = random_state
    self.verbose = verbose
    self.progress_bar = progress_bar
########################################################################################################################
# INPUT VALIDATION, INITIALIZATIONS AND PARAMETER MANAGEMENT
def _check_initial_parameters(self, X):
    """Validate class attributes.

    Parameters
    ----------
    X : array-like of shape (n_samples, n_features)

    Raises
    ------
    ValueError : unacceptable choice of parameters
    """
    # Type and sign checks on scalar hyperparameters
    utils.check_type(
        int,
        n_components=self.n_components,
        max_iter=self.max_iter,
        n_init=self.n_init,
        verbose=self.verbose,
    )
    utils.check_positive(
        n_components=self.n_components,
        max_iter=self.max_iter,
        n_init=self.n_init,
    )
    utils.check_nonneg(
        abs_tol=self.abs_tol, rel_tol=self.rel_tol, verbose=self.verbose
    )
    # Categorical hyperparameters must come from a fixed set of options
    utils.check_in([1, 2, 3], n_steps=self.n_steps)
    utils.check_in([0, 1, 2], progress_bar=self.progress_bar)
    utils.check_in(["kmeans", "random"], init_params=self.init_params)
    utils.check_in(["modal", "soft"], assignment=self.assignment)
    utils.check_in([None, "BCH", "ML"], correction=self.correction)
    utils.check_in([False, True], save_param_init=self.save_param_init)
    # Emission descriptors must map to known emission models
    utils.check_descriptor(self.measurement, keys=EMISSION_DICT.keys())
    utils.check_descriptor(self.structural, keys=EMISSION_DICT.keys())
    utils.check_type(
        (dict, type(None)),
        measurement_params=self.measurement_params,
        structural_params=self.structural_params,
    )
    # Decide whether NaNs are acceptable in each input; this flag is
    # forwarded to sklearn.utils.check_array
    if utils.check_descriptor_nan(self.measurement):
        self._force_all_finite_mm = "allow-nan"
    else:
        self._force_all_finite_mm = True
    if utils.check_descriptor_nan(self.structural):
        self._force_all_finite_sm = "allow-nan"
    else:
        self._force_all_finite_sm = True
    # Debugging buffers: per-init likelihoods and (optionally) parameters
    self.lower_bound_buffer_ = list()
    self.param_buffer_ = list()
    # Covariate models have special constraints. Check them.
    self._is_covariate = utils.check_covariate(self.measurement, self.structural)
    # Covariate models use a different conditional likelihood (See Bakk and Kuha, 2018), which should
    # not include the marginal likelihood over the latent classes in the E-step
    self._conditional_likelihood = self._is_covariate
def _initialize_parameters(self, X, random_state):
    """Initialize the weights and measurement model parameters.

    We do not initialize the structural model here, since the StepMix class can be used without one.

    Parameters
    ----------
    X : array-like of shape (n_samples, n_features)
    random_state : RandomState
        A random number generator instance that controls the random seed
        used for the method chosen to initialize the parameters.

    Raises
    ------
    ValueError : illegal self.init_params parameter
    """
    n_samples, _ = X.shape
    # Build initial responsibilities according to the chosen strategy
    if self.init_params == "random":
        responsibilities = random_state.uniform(size=(n_samples, self.n_components))
        responsibilities = responsibilities / responsibilities.sum(axis=1, keepdims=True)
    elif self.init_params == "kmeans":
        # One-hot encode the k-means cluster labels
        responsibilities = np.zeros((n_samples, self.n_components))
        km = KMeans(
            n_clusters=self.n_components, n_init=1, random_state=random_state
        )
        labels = km.fit(X).labels_
        responsibilities[np.arange(n_samples), labels] = 1
    else:
        raise ValueError(f"Unimplemented initialization method {self.init_params}.")
    # Store log responsibilities; clipping avoids log(0)
    self.log_resp_ = np.log(np.clip(responsibilities, 1e-15, 1 - 1e-15))
    # Start from uniform class weights
    self.weights_ = np.ones((self.n_components,)) / self.n_components
    # Initialize the measurement model from the initial responsibilities
    self._initialize_parameters_measurement(X, random_state)
def _initialize_parameters_measurement(
    self, X, random_state=None, init_emission=True
):
    """Initialize parameters of measurement model.

    Builds the emission object on first call; optionally initializes its
    parameters from the current responsibilities.

    Parameters
    ----------
    X : array-like of shape (n_samples, n_features)
    random_state : RandomState, default=None
    init_emission : bool, default=True
        If False, only build the emission object without initializing parameters.
    """
    # Build the measurement emission model once
    if not hasattr(self, "_mm"):
        if self.measurement_params is None:
            extra_params = dict()
        else:
            extra_params = self.measurement_params
        self._mm = build_emission(
            self.measurement,
            n_components=self.n_components,
            random_state=random_state,
            **extra_params,
        )
    if init_emission:
        self._mm.initialize(X, np.exp(self.log_resp_), random_state)
def _initialize_parameters_structural(
    self, Y, random_state=None, init_emission=True
):
    """Initialize parameters of structural model.

    Builds the emission object on first call; optionally initializes its
    parameters from the current responsibilities.

    Parameters
    ----------
    Y : array-like of shape (n_samples, n_features_structural)
        Structural data.
    random_state : RandomState, default=None
    init_emission : bool, default=True
        If False, only build the emission object without initializing parameters.
    """
    # Build the structural emission model once
    if not hasattr(self, "_sm"):
        if self.structural_params is None:
            extra_params = dict()
        else:
            extra_params = self.structural_params
        self._sm = build_emission(
            self.structural,
            n_components=self.n_components,
            random_state=random_state,
            **extra_params,
        )
    if init_emission:
        self._sm.initialize(Y, np.exp(self.log_resp_), random_state)
@property
def n_parameters(self):
    """Get number of free parameters.

    Sums the class weight degrees of freedom (when applicable) with the
    measurement and (optional) structural emission parameter counts.
    """
    check_is_fitted(self)
    # Class weights only contribute degrees of freedom when they appear in
    # the likelihood; conditional (covariate) models exclude them
    if self._conditional_likelihood:
        n = 0.0
    else:
        n = self.n_components - 1
    # Measurement parameters
    n += self._mm.n_parameters
    # Structural parameters, if a structural model exists
    if hasattr(self, "_sm"):
        n += self._sm.n_parameters
    return n
def _check_x_y(self, X=None, Y=None, reset=False):
    """Input validation function.

    Set reset=True to memorize input sizes for future validation.

    Parameters
    ----------
    X : array-like of shape (n_samples, n_features)
    Y : array-like of shape (n_samples, n_features_structural), default=None
    reset : bool, default=False
        Reset input sizes for future validation.

    Returns
    -------
    X : ndarray of shape (n_samples, n_features) or None
        Validated measurement data or None if not provided.
    Y : ndarray of shape (n_samples, n_features_structural) or None
        Validated structural data or None if not provided.

    Raises
    ------
    ValueError : X or Y feature count disagrees with the memorized sizes.
    """
    # We use reset=True since we take care of dimensions in this class (and not in the parent)
    if X is not None:
        X_names = utils.extract_column_names(X)
        X = self._validate_data(
            X,
            dtype=[np.float64, np.float32],
            reset=True,
            force_all_finite=self._force_all_finite_mm,
        )
    if Y is not None:
        # Handle 1D Y array
        Y_names = utils.extract_column_names(Y)
        Y = self._validate_data(
            Y,
            dtype=[np.float64, np.float32],
            reset=True,
            ensure_2d=False,
            force_all_finite=self._force_all_finite_sm,
        )
        # Force a matrix format
        if Y.ndim == 1:
            Y = Y.reshape((-1, 1))
    if reset:
        if X is not None:
            self.measurement_in_ = X.shape[1]
            self.n_features_in_ = self.measurement_in_  # Keep sklearn happy
            self.x_names_ = X_names
        if Y is not None:
            self.structural_in_ = Y.shape[1]
            self.y_names_ = Y_names
    else:
        # FIX: guard the X check with hasattr, mirroring the Y check below.
        # The model may have memorized only structural sizes (e.g., after
        # m_step_structural alone), in which case measurement_in_ does not
        # exist yet and the unguarded access raised AttributeError.
        if (
            X is not None
            and hasattr(self, "measurement_in_")
            and X.shape[1] != self.measurement_in_
        ):
            raise ValueError(
                f"X has {X.shape[1]} features, but StepMix is expecting {self.measurement_in_} measurement"
                f" features as input."
            )
        if (
            Y is not None
            and hasattr(self, "structural_in_")
            and Y.shape[1] != self.structural_in_
        ):
            raise ValueError(
                f"Y has {Y.shape[1]} features, but StepMix is expecting {self.structural_in_} structural "
                f"features as input."
            )
    return X, Y
def get_parameters(self):
    """Get model parameters as a Python dictionary.

    Returns
    -------
    params: dict,
        Nested dict {'weights': Current class weights,\
        'measurement': dict of measurement params,\
        'structural': dict of structural params,\
        'measurement_in': number of measurements,\
        'structural_in': number of structural features,\
        }.
    """
    check_is_fitted(self)
    params = {
        "weights": self.weights_,
        "measurement": self._mm.get_parameters(),
        "measurement_in": self.measurement_in_,
    }
    # Structural entries are included only when a structural model was fitted
    if hasattr(self, "_sm"):
        params["structural"] = self._sm.get_parameters()
        params["structural_in"] = self.structural_in_
    return params
def get_parameters_df(self, x_names=None, y_names=None):
    """Get model parameters as a long-form DataFrame.

    Parameters
    ----------
    x_names : List of str
        Column names of X.
    y_names : List of str
        Column names of Y.

    Returns
    -------
    params: pd.DataFrame
    """
    check_is_fitted(self)
    if x_names is None:
        x_names = self.x_names_
    # Measurement model parameters
    df_mm = self._mm.get_parameters_df(x_names)
    df_mm["model"] = "measurement"
    frames = [df_mm]
    # Class weights exist only under the marginal likelihood perspective
    if not self._conditional_likelihood:
        weight_records = [
            dict(
                model="measurement",
                model_name="class_weights",
                param="class_weights",
                class_no=class_no,
                variable="nan",
                value=w,
            )
            for class_no, w in enumerate(self.weights_)
        ]
        frames.append(pd.DataFrame.from_records(weight_records))
    # Structural model parameters, if a structural model was fitted
    if hasattr(self, "_sm"):
        if y_names is None:
            y_names = self.y_names_
        df_sm = self._sm.get_parameters_df(y_names)
        df_sm["model"] = "structural"
        frames.append(df_sm)
    combined = pd.concat(frames)
    return combined.set_index(
        ["model", "model_name", "param", "class_no", "variable"]
    )
def get_cw_df(self, x_names=None, y_names=None):
    """Get class weights as DataFrame with classes as columns.

    Parameters
    ----------
    x_names : List of str
        Column names of X.
    y_names : List of str
        Column names of Y.

    Returns
    -------
    params: pd.DataFrame

    Raises
    ------
    ValueError : the model uses the conditional likelihood perspective.
    """
    if self._conditional_likelihood:
        raise ValueError(
            "Class weights are not defined when using the conditional likelihood perspective (covariates)."
        )
    params = self.get_parameters_df(x_names, y_names)
    cw = params.loc["measurement", "class_weights"]
    # Aggregating with the mean is a no-op here: one value per cell
    return self._pivot_cw(cw, aggfunc=np.mean)
def get_mm_df(self, x_names=None, y_names=None):
    """Get measurement model parameters as DataFrame with classes as columns.

    Parameters
    ----------
    x_names : List of str
        Column names of X.
    y_names : List of str
        Column names of Y.

    Returns
    -------
    params: pd.DataFrame
    """
    params = self.get_parameters_df(x_names, y_names)
    mm = params.loc["measurement"]
    # Class weights are reported separately (see get_cw_df)
    mm = mm.drop("class_weights", level=0, errors="ignore")
    # Aggregating with the mean is a no-op here: one value per cell
    return self._pivot_param(mm, aggfunc=np.mean)
def get_sm_df(self, x_names=None, y_names=None):
    """Get structural model parameters as DataFrame with classes as columns.

    Parameters
    ----------
    x_names : List of str
        Column names of X.
    y_names : List of str
        Column names of Y.

    Returns
    -------
    params: pd.DataFrame

    Raises
    ------
    ValueError : no structural model has been fitted.
    """
    if not hasattr(self, "_sm"):
        raise ValueError("No structural model fitted.")
    params = self.get_parameters_df(x_names, y_names)
    sm = params.loc["structural"]
    # Aggregating with the mean is a no-op here: one value per cell
    return self._pivot_param(sm, aggfunc=np.mean)
def set_parameters(self, params):
    """Set parameters.

    Parameters
    ----------
    params: dict,
        Same format as self.get_parameters().
    """
    self.weights_ = params["weights"]
    random_state = check_random_state(self.random_state)
    # Build an uninitialized measurement model shell if needed, then load
    # the provided parameters into it
    if not hasattr(self, "_mm"):
        self._initialize_parameters_measurement(
            None, random_state=random_state, init_emission=False
        )
    self._mm.set_parameters(params["measurement"])
    self.measurement_in_ = params["measurement_in"]
    # Same for the structural model, when structural parameters are provided
    if "structural" in params:
        if not hasattr(self, "_sm"):
            self._initialize_parameters_structural(
                None, random_state=random_state, init_emission=False
            )
        self._sm.set_parameters(params["structural"])
        self.structural_in_ = params["structural_in"]
def permute_classes(self, perm):
    """Permute the latent class and associated parameters of this estimator.

    Effectively remaps latent classes. The permutation is applied to the
    class weights and to both emission models.

    Parameters
    ----------
    perm : ndarray of shape (n_classes,)
        Integer array representing the target permutation. Should be a permutation of np.arange(n_classes).
    """
    check_is_fitted(self)
    # Reorder class weights
    self.weights_ = self.weights_[perm]
    # Reorder the classes of the measurement model
    self._mm.permute_classes(perm)
    # Reorder the classes of the structural model, if one exists
    if hasattr(self, "_sm"):
        self._sm.permute_classes(perm)
#######################################################################################################################
# ESTIMATION AND EM METHODS
def fit(self, X, Y=None, sample_weight=None, y=None):
    """Fit StepMix measurement model and optionally the structural model.

    Setting Y=None will fit the measurement model only. Providing both X and Y will fit the full model following
    the self.n_steps argument.

    Parameters
    ----------
    X : array-like of shape (n_samples, n_features)
        List of n_features-dimensional data points to fit the measurement model. Each row
        corresponds to a single data point. If the data is categorical, by default it should be
        0-indexed and integer encoded (not one-hot encoded).
    Y : array-like of shape (n_samples, n_features_structural), default=None
        List of n_features-dimensional data points to fit the structural model. Each row
        corresponds to a single data point. If the data is categorical, by default it should be
        0-indexed and integer encoded (not one-hot encoded).
    y: array-like of shape (n_samples, n_features), default=None
        Alias for Y. Ignored if Y is provided.
    sample_weight : array-like of shape(n_samples,), default=None
        Array of weights that are assigned to individual samples.
        If not provided, then each sample is given unit weight.
    """
    if y is not None and Y is None:
        Y = y
    if Y is None:
        # No structural data. Simply fit the measurement data
        self.em(X, sample_weight=sample_weight)
    elif self.n_steps == 1:
        # One-step estimation
        # 1) Maximum likelihood with both measurement and structural models
        self.em(X, Y, sample_weight=sample_weight)
    elif self.n_steps == 2:
        # Two-step estimation
        # 1) Fit the measurement model
        self.em(X, sample_weight=sample_weight)
        # 2) Fit the structural model by keeping the parameters of the measurement model fixed
        self.em(X, Y, sample_weight=sample_weight, freeze_measurement=True)
    elif self.n_steps == 3 and self.correction is None:
        # Three-step estimation
        # 1) Fit the measurement model
        self.em(X, sample_weight=sample_weight)
        # 2) Assign class probabilities
        soft_resp = self.predict_proba_class(X)
        # Modal assignment (clipped for numerical reasons)
        # Else we simply keep the assignment as is (soft)
        resp = (
            utils.modal(soft_resp, clip=True)
            if self.assignment == "modal"
            else soft_resp
        )
        # 3) M-step on the structural model
        self.m_step_structural(resp, Y, sample_weight=sample_weight)
    elif self.n_steps == 3 and self.correction == "BCH":
        # Three-step estimation with BCH correction
        # 1) Fit the measurement model
        # FIX: propagate sample_weight, previously dropped in this branch
        self.em(X, sample_weight=sample_weight)
        # 2) Assign class probabilities
        soft_resp = self.predict_proba_class(X)
        # Apply BCH correction
        _, D_inv = compute_bch_matrix(soft_resp, self.assignment)
        # Modal assignment (clipped for numerical reasons) or assignment as is (soft)
        resp = (
            utils.modal(soft_resp, clip=True)
            if self.assignment == "modal"
            else soft_resp
        )
        resp = resp @ D_inv
        # 3) M-step on the structural model (FIX: weighted, for consistency
        # with the other estimation branches)
        self.m_step_structural(resp, Y, sample_weight=sample_weight)
    elif self.n_steps == 3 and self.correction == "ML":
        # Three-step estimation with ML correction
        # 1) Fit the measurement model
        # FIX: propagate sample_weight, previously dropped in this branch
        self.em(X, sample_weight=sample_weight)
        # 2) Assign class probabilities
        soft_resp = self.predict_proba_class(X)
        # Compute log_emission_pm
        log_emission_pm = compute_log_emission_pm(soft_resp, self.assignment)
        # 3) Fit the structural model by keeping the parameters of the measurement model fixed
        self.em(
            X,
            Y,
            sample_weight=sample_weight,
            freeze_measurement=True,
            log_emission_pm=log_emission_pm,
        )
    # Print report if required
    if self.verbose == 1:
        self.report(X, Y, sample_weight=sample_weight)
    return self
def report(self, X, Y=None, sample_weight=None):
    """Print detailed report of the model and performance metrics.

    Parameters
    ----------
    X : array-like of shape (n_samples, n_features)
    Y : array-like of shape (n_samples, n_features_structural), default=None
    sample_weight : array-like of shape(n_samples,), default=None
    """
    # Column names are only available after fitting on DataFrames
    x_names = getattr(self, "x_names_", None)
    y_names = getattr(self, "y_names_", None)
    utils.print_report(self, X, Y, sample_weight, x_names, y_names)
def em(
    self,
    X,
    Y=None,
    sample_weight=None,
    freeze_measurement=False,
    log_emission_pm=None,
):
    """EM algorithm to fit the weights, measurement parameters and structural parameters.

    Adapted from the fit_predict method of the sklearn BaseMixture class to include (optional) structural model
    computations.

    Setting Y=None will run EM on the measurement model only. Providing both X and Y will run EM on the complete
    model, unless otherwise specified by freeze_measurement.

    Runs self.n_init random restarts and keeps the parameters of the restart
    with the highest average log-likelihood.

    Parameters
    ----------
    X : array-like of shape (n_samples, n_features)
    Y : array-like of shape (n_samples, n_features_structural), default=None
    sample_weight : array-like of shape(n_samples,), default=None
    freeze_measurement : bool, default=False
        Run EM on the complete model, but do not update measurement model parameters.
        Useful for 2-step estimation and 3-step with ML correction.
    log_emission_pm : ndarray of shape (n, n_components), default=None
        Log probabilities of the predicted class given the true latent class for ML correction.

    Raises
    ------
    ValueError : fewer samples than latent classes.
    """
    self._check_initial_parameters(X)
    # First validate the input and the class attributes
    X, Y = self._check_x_y(X, Y, reset=True)
    n_samples = X.shape[0]
    # If sample weights exist, convert them to array (support for lists)
    # and check length
    # Otherwise set them to 1 for all examples
    sample_weight = _check_sample_weight(sample_weight, X, dtype=X.dtype, copy=True)
    if n_samples < self.n_components:
        raise ValueError(
            "Expected n_samples >= n_components "
            f"but got n_components = {self.n_components}, "
            f"n_samples = {X.shape[0]}"
        )
    # Set up useful values for optimization
    random_state = check_random_state(self.random_state)
    max_lower_bound = -np.inf
    self.converged_ = False
    # Run multiple restarts
    if self.progress_bar:
        print("Fitting StepMix...")
    if self.progress_bar == 2:
        print(
            "The Iteration bar may update too quickly to be visualized depending on dataset size and StepMix settings.\n"
        )
    tqdm_init = tqdm.trange(
        self.n_init, disable=not self.progress_bar, desc="Initializations (n_init) "
    )
    for init in tqdm_init:
        # Fresh random initialization for each restart; frozen measurement
        # models keep the parameters fitted in a previous step
        if not freeze_measurement:
            self._initialize_parameters(X, random_state)  # Measurement model
        if Y is not None:
            self._initialize_parameters_structural(
                Y, random_state
            )  # Structural Model
        lower_bound = -np.inf
        # EM iterations
        tqdm_iter = tqdm.tqdm(
            range(1, self.max_iter + 1),
            disable=self.progress_bar < 2,
            desc="Iterations (max_iter) ",
            leave=False,
        )
        for n_iter in tqdm_iter:
            prev_lower_bound = lower_bound
            # E-step: responsibilities and average log-likelihood under the
            # current parameters
            log_prob_norm, log_resp = self._e_step(
                X, Y=Y, sample_weight=sample_weight, log_emission_pm=log_emission_pm
            )
            # M-step: update weights and emission parameters
            self._m_step(
                X,
                np.exp(log_resp),
                Y,
                sample_weight=sample_weight,
                freeze_measurement=freeze_measurement,
            )
            # Likelihood & stopping criterion
            lower_bound = log_prob_norm
            change = lower_bound - prev_lower_bound
            # NOTE(review): divides by lower_bound, which could be 0 if the
            # average log-likelihood is exactly 0 — confirm this is acceptable
            rel_change = change / lower_bound
            # if both an absolute and a relative tolerance threshold are given, the EM algorithm stops
            # as soon as one of them is respected
            if abs(change) < self.abs_tol or abs(rel_change) < self.rel_tol:
                self.converged_ = True
                break
            # Ask tqdm to display current max lower bound
            # NOTE(review): sample_weight is never None here since
            # _check_sample_weight always returns an array — the else branch
            # looks unreachable; confirm
            ll = (
                lower_bound * np.sum(sample_weight)
                if sample_weight is not None
                else lower_bound * n_samples
            )
            tqdm_iter.set_postfix(avg_LL=lower_bound, LL=ll)
        # Update buffers
        current_parameters = self.get_parameters()
        # Keep the best restart; the -inf/NaN clauses guarantee the first
        # restart always populates best_params
        if (
            lower_bound > max_lower_bound
            or max_lower_bound == -np.inf
            or np.isnan(max_lower_bound)
        ):
            max_lower_bound = lower_bound
            best_params = current_parameters
            best_n_iter = n_iter
        if self.save_param_init:
            self.param_buffer_.append(copy.deepcopy(current_parameters))
        # Save lower bound
        # (extra E-step so the buffer reflects the post-M-step parameters)
        ll, _ = self._e_step(
            X, Y=Y, sample_weight=sample_weight, log_emission_pm=log_emission_pm
        )
        self.lower_bound_buffer_.append(ll)
        # Ask tqdm to display current max lower bound
        max_ll = (
            max_lower_bound * np.sum(sample_weight)
            if sample_weight is not None
            else max_lower_bound * n_samples
        )
        tqdm_init.set_postfix(max_avg_LL=max_lower_bound, max_LL=max_ll)
    if not self.converged_:
        warnings.warn(
            "Initializations did not converge. "
            "Try different init parameters, "
            "or increase max_iter, abs_tol, rel_tol "
            "or check for degenerate data.",
            ConvergenceWarning,
        )
    # Restore the best restart's parameters and record fit statistics
    self.set_parameters(best_params)
    self.n_iter_ = best_n_iter
    self.lower_bound_ = max_lower_bound
def _e_step(self, X, Y=None, sample_weight=None, log_emission_pm=None):
    """E-step of the EM algorithm to compute posterior probabilities.

    Setting Y=None will ignore the structural likelihood.

    Parameters
    ----------
    X : array-like of shape (n_samples, n_features)
    Y : array-like of shape (n_samples, n_features_structural), default=None
    sample_weight : array-like of shape(n_samples,), default=None
    log_emission_pm : ndarray of shape (n, n_components), default=None
        Log probabilities of the predicted class given the true latent class for ML correction. If provided, the
        measurement model likelihood is ignored and this is used instead.

    Returns
    -------
    avg_ll: float,
        Weighted average log likelihood over samples.
    log_resp: ndarray of shape (n_samples, n_components)
        Log responsibilities, i.e., log posterior probabilities over the latent classes.
    """
    # Measurement log-likelihood
    if log_emission_pm is not None:
        # ML correction: replace the measurement likelihood with the
        # provided predicted-class probabilities (copied so the caller's
        # array is not mutated by the in-place ops below)
        log_resp = log_emission_pm.copy()
    else:
        # Standard Measurement Log likelihood
        log_resp = self._mm.log_likelihood(X)
    # If Y is not provided, we are only using the MM. Use the full likelihood
    # Also use the full likelihood if the model is not conditional
    if Y is None or not self._conditional_likelihood:
        # Add class prior probabilities (log class weights)
        log_resp += np.log(self.weights_).reshape((1, -1))
    # Add structural model likelihood (if structural data is provided)
    if Y is not None:
        log_resp += self._sm.log_likelihood(Y)
    # Per-sample log-likelihood: log-sum-exp over latent classes
    ll = logsumexp(log_resp, axis=1)
    # Normalization in log space yields log posteriors
    with np.errstate(under="ignore"):
        # ignore underflow: vanishing responsibilities may underflow to 0
        log_resp -= ll.reshape((-1, 1))
    return np.average(ll, weights=sample_weight), log_resp
def _m_step(self, X, resp, Y=None, sample_weight=None, freeze_measurement=False):
"""M-step of the EM algorithm to compute maximum likelihood estimators
Update parameters of self._mm (measurement) and optionally self._sm (structural).
Setting Y=None will ignore the structural likelihood. freeze_measurement allows to only update the
structural model.
Parameters
----------
X : array-like of shape (n_samples, n_features)
resp : ndarray of shape (n_samples, n_components)
Responsibilities, i.e., posterior probabilities over the latent classes.
Y : array-like of shape (n_samples, n_features_structural), default=None
sample_weight : array-like of shape(n_samples,), default=None
freeze_measurement : bool, default =False
Do not update the parameters of the measurement model.
"""
sample_weight = _check_sample_weight(sample_weight, X, dtype=X.dtype, copy=True)
if not freeze_measurement:
# Update measurement model parameters
self.weights_ = np.clip(
np.average(resp, weights=sample_weight, axis=0), 1e-15, 1 - 1e-15
)
self._mm.m_step(X, resp * sample_weight[:, np.newaxis])
if Y is not None:
# Update structural model parameters
self._sm.m_step(Y, resp * sample_weight[:, np.newaxis])
[docs] def m_step_structural(self, resp, Y, sample_weight=None):
"""M-step for the structural model only.
Handy for 3-step estimation.
Parameters
----------
resp : ndarray of shape (n_samples, n_components)
Responsibilities, i.e., posterior probabilities over the latent classes of each point in Y.
Y : array-like of shape (n_samples, n_features_structural), default=None
sample_weight : array-like of shape(n_samples,), default=None
"""
check_is_fitted(self)
_, Y = self._check_x_y(None, Y, reset=True)
sample_weight = _check_sample_weight(sample_weight, Y, dtype=Y.dtype, copy=True)
# For the third step of the 3-step approach
random_state = check_random_state(self.random_state)
self._initialize_parameters_structural(Y, random_state=random_state)
self._sm.m_step(Y, resp * sample_weight[:, np.newaxis])
[docs] def bootstrap(
self,
X,
Y=None,
n_repetitions=1000,
sample_weight=None,
parametric=False,
sampler=None,
identify_classes=True,
progress_bar=True,
random_state=None,
):
"""Parametric or Non-parametric boostrap of this estimator.
Fit n_repetitions clones of the estimator on resampled datasets.
If identify_classes=True, repeated parameter estimates are aligned with the class order of the main estimator using
a permutation search.
Parameters
----------
X : array-like of shape (n_samples, n_features)
Y : array-like of shape (n_samples, n_features_structural), default=None
sample_weight : array-like of shape(n_samples,), default=None
Array of weights that are assigned to individual samples.
If not provided, then each sample is given unit weight. Ignored if parametric=True.
n_repetitions: int
Number of repetitions to fit.
parametric: bool, default=False
Use parametric bootstrap instead of non-parametric. Data will be generated by sampling the estimator.
sampler: bool, default=None
Another fitted estimator to use for sampling instead of the main estimator. Only used for parametric
bootstrapping.
identify_classes: bool, default=True
Run a permutation test to align the classes of the repetitions to the classes of the main estimator. This is
required if inference on the model parameters is needed, but can be turned off if only the likelihood needs
to be bootstrapped to save computations.
progress_bar : bool, default=True
Display a tqdm progress bar for repetitions.
progress_bar : bool, default=True
Display a tqdm progress bar for repetitions.
random_state : int, default=None
If none, use self.random_state.
Returns
----------
samples: DataFrame
Parameter DataFrame for all repetitions. Follows the convention of StepMix.get_parameters_df() with an additional
'rep' column.
rep_stats: DataFrame
Likelihood statistics of each repetition.
"""
check_is_fitted(self)
if random_state is None:
random_state = self.random_state
return bootstrap(
self,
X,
Y,
n_repetitions=n_repetitions,
sample_weight=sample_weight,
parametric=parametric,
sampler=sampler,
identify_classes=identify_classes,
progress_bar=progress_bar,
random_state=random_state,
)
[docs] def bootstrap_stats(
self,
X,
Y=None,
n_repetitions=1000,
sample_weight=None,
parametric=False,
progress_bar=True,
):
"""Bootstrapping of a StepMix estimator. Obtain boostrapped parameters and some statistics
(mean and standard deviation).
If a covariate model is used in the structural model, the output keys "cw_mean" and "cw_std" are omitted.
Parameters
----------
X : array-like of shape (n_samples, n_features)
Y : array-like of shape (n_samples, n_features_structural), default=None
sample_weight : array-like of shape(n_samples,), default=None
n_repetitions: int
Number of repetitions to fit.
parametric: bool, default=False
Use parametric bootstrap instead of non-parametric. Data will be generated by sampling the estimator.
progress_bar : bool, default=True
Display a tqdm progress bar for repetitions.
Returns
----------
bootstrap_and_stats: dict,
Dictionary of dataframes {
'samples': Parameters estimated by self.boostrap in a long-form DataFrame,\
'rep_stats': Likelihood statistics of each repetition provided by self.boostrap,\
'cw_mean': Bootstrapped means of the class weights,\
'cw_std': Bootstrapped standard deviations of the class weights,\
'mm_mean': Bootstrapped means of the measurement model parameters,\
'mm_std': Bootstrapped standard deviations of the measurement model parameters,\
'sm_mean': Bootstrapped means of the structural model parameters,\
'sm_std': Bootstrapped standard deviations of the structural model parameters,\
}.
"""
bootstrap_df, bootstrap_stats = self.bootstrap(
X=X,
Y=Y,
n_repetitions=n_repetitions,
sample_weight=sample_weight,
parametric=parametric,
progress_bar=progress_bar,
)
mm_data = bootstrap_df.loc["measurement"].drop(
index="class_weights", level=0, errors="ignore"
)
result = dict()
result["samples"] = bootstrap_df
result["rep_stats"] = bootstrap_stats
result["mm_mean"] = self._pivot_param(mm_data, np.mean)
result["mm_std"] = self._pivot_param(mm_data, np.std)
if hasattr(self, "_sm"):
sm_data = bootstrap_df.loc["structural"]
result["sm_mean"] = self._pivot_param(sm_data, np.mean)
result["sm_std"] = self._pivot_param(sm_data, np.std)
if not self._conditional_likelihood:
cw_data = bootstrap_df.loc["measurement", "class_weights"]
result["cw_mean"] = self._pivot_cw(cw_data, np.mean)
result["cw_std"] = self._pivot_cw(cw_data, np.std)
return result
def _pivot_param(self, df, aggfunc=np.mean):
# Standard pivot function that we reuse for bootstrapping
return pd.pivot_table(
df,
columns="class_no",
values="value",
index=["model_name", "param", "variable"],
aggfunc=aggfunc,
)
def _pivot_cw(self, df, aggfunc=np.std):
# Standard pivot function that we reuse for bootstrapping
return pd.pivot_table(
df, columns="class_no", values="value", index=["param"], aggfunc=aggfunc
)
########################################################################################################################
# INFERENCE
[docs] def score(self, X, Y=None, sample_weight=None):
"""Compute the average log-likelihood over samples.
Setting Y=None will ignore the structural likelihood.
Parameters
----------
X : array-like of shape (n_samples, n_features)
List of n_features-dimensional data points to fit the measurement model. Each row
corresponds to a single data point. If the data is categorical, by default it should be
0-indexed and integer encoded (not one-hot encoded).
Y : array-like of shape (n_samples, n_features_structural), default=None
List of n_features-dimensional data points to fit the structural model. Each row
corresponds to a single data point. If the data is categorical, by default it should be
0-indexed and integer encoded (not one-hot encoded).
sample_weight : array-like of shape(n_samples,), default=None
Array of weights that are assigned to individual samples.
If not provided, then each sample is given unit weight.
Returns
----------
avg_ll: float
Average log likelihood over samples.
"""
check_is_fitted(self)
X, Y = self._check_x_y(X, Y)
sample_weight = _check_sample_weight(sample_weight, X, dtype=X.dtype, copy=True)
avg_ll, _ = self._e_step(X, Y=Y, sample_weight=sample_weight)
return avg_ll
[docs] def bic(self, X, Y=None):
"""Bayesian information criterion for the current model on the measurement data X and optionally the structural
data Y.
Adapted from https://github.com/scikit-learn/scikit-learn/blob/baf0ea25d6dd034403370fea552b21a6776bef18/sklearn/mixture/_base.py
Parameters
----------
X : array-like of shape (n_samples, n_features)
Y : array-like of shape (n_samples, n_features_structural), default=None
Returns
-------
bic : float
The lower the better.
"""
return -2 * self.score(X, Y) * X.shape[0] + self.n_parameters * np.log(
X.shape[0]
)
[docs] def aic(self, X, Y=None):
"""Akaike information criterion for the current model on the measurement data X and optionally the structural
data Y.
Adapted from https://github.com/scikit-learn/scikit-learn/blob/baf0ea25d6dd034403370fea552b21a6776bef18/sklearn/mixture/_base.py
Parameters
----------
X : array-like of shape (n_samples, n_features)
Y : array-like of shape (n_samples, n_features_structural), default=None
Returns
-------
aic : float
The lower the better.
"""
return -2 * self.score(X, Y) * X.shape[0] + 2 * self.n_parameters
[docs] def entropy(self, X, Y=None):
"""Entropy of the posterior over latent classes.
Parameters
----------
X : array-like of shape (n_samples, n_features)
Y : array-like of shape (n_samples, n_features_structural), default=None
Returns
-------
entropy : float
"""
check_is_fitted(self)
resp = self.predict_proba_class(X, Y)
resp = np.clip(resp, 1e-15, 1 - 1e-15)
return -1 * np.sum(resp * np.log(resp))
[docs] def relative_entropy(self, X, Y=None):
"""Scaled Relative Entropy of the posterior over latent classes.
Ramaswamy et al., 1993.
1 - entropy / (n_samples * log(n_components))
Parameters
----------
X : array-like of shape (n_samples, n_features)
Y : array-like of shape (n_samples, n_features_structural), default=None
Returns
-------
relative_entropy : float
"""
entropy = self.entropy(X, Y)
n_samples = X.shape[0]
rel_entropy = (
1 - entropy / (n_samples * np.log(self.n_components))
if self.n_components > 1
else np.nan
)
return rel_entropy
[docs] def sabic(self, X, Y=None):
"""Sample-Sized Adjusted BIC.
References
----------
Sclove SL. Application of model-selection criteria to some problems in multivariate analysis. Psychometrika. 1987;52(3):333–343.
Parameters
----------
X : array-like of shape (n_samples, n_features)
Y : array-like of shape (n_samples, n_features_structural), default=None
Returns
-------
ssa_bic : float
"""
n = X.shape[0]
return -2 * self.score(X, Y) * n + self.n_parameters * np.log(
n * ((n + 2) / 24)
)
[docs] def caic(self, X, Y=None):
"""Consistent AIC.
References
----------
Bozdogan, H. 1987. Model selection and Akaike’s information criterion (AIC):
The general theory and its analytical extensions. Psychometrika 52: 345–370.
Parameters
----------
X : array-like of shape (n_samples, n_features)
Y : array-like of shape (n_samples, n_features_structural), default=None
Returns
-------
caic : float
The lower the better.
"""
n = X.shape[0]
return -2 * self.score(X, Y) * n + self.n_parameters * (np.log(n) + 1)
[docs] def predict_class(self, X, Y=None):
"""Predict the cluster/latent class/component labels for the data samples in X.
Optionally, an array-like Y can be provided to predict the labels based on both the measurement and structural
models.
Parameters
----------
X : array-like of shape (n_samples, n_features)
List of n_features-dimensional data points to fit the measurement model. Each row
corresponds to a single data point. If the data is categorical, by default it should be
0-indexed and integer encoded (not one-hot encoded).
Y : array-like of shape (n_samples, n_features_structural), default=None
List of n_features-dimensional data points to fit the structural model. Each row
corresponds to a single data point. If the data is categorical, by default it should be
0-indexed and integer encoded (not one-hot encoded).
Returns
-------
labels : array, shape (n_samples,)
Component labels.
"""
return self.predict_proba_class(X, Y).argmax(axis=1)
[docs] def predict_proba_class(self, X, Y=None):
"""Predict the latent class probabilities for the data samples in X using the measurement model.
Optionally, an array-like Y can be provided to predict the posterior based on both the measurement and structural
models.
Parameters
----------
X : array-like of shape (n_samples, n_features)
List of n_features-dimensional data points to fit the measurement model. Each row
corresponds to a single data point. If the data is categorical, by default it should be
0-indexed and integer encoded (not one-hot encoded).
Y : array-like of shape (n_samples, n_features_structural), default=None
List of n_features-dimensional data points to fit the structural model. Each row
corresponds to a single data point. If the data is categorical, by default it should be
0-indexed and integer encoded (not one-hot encoded).
Returns
-------
resp : array, shape (n_samples, n_components)
P(class|X, Y) for each sample in X.
"""
check_is_fitted(self)
X, Y = self._check_x_y(X, Y)
_, log_resp = self._e_step(X, Y=Y)
return np.exp(log_resp)
[docs] def predict_Y(self, X):
"""Call the predict method of the structural model to predict argmax P(Y|X) (Supervised prediction).
Inference over Y is only supported if the structural model (sm) is set to 'binary', 'binary_nan', 'categorical', or 'categorical_nan'.
Parameters
----------
X : array-like of shape (n_samples, n_features)
List of n_features-dimensional data points to fit the measurement model. Each row
corresponds to a single data point. If the data is categorical, by default it should be
0-indexed and integer encoded (not one-hot encoded).
Returns
-------
predictions : array, shape (n_samples, n_columns)
Y predictions.
"""
if not hasattr(self, "_sm"):
raise ValueError("Calling predict_Y requires a structural model.")
check_is_fitted(self)
X, _ = self._check_x_y(X, None)
_, log_resp = self._e_step(X, Y=None)
return self._sm.predict(log_resp)
[docs] def predict_proba_Y(self, X):
"""Call the predict method of the structural model to predict the full conditional P(Y|X).
Inference over Y is only supported if the structural model (sm) is set to 'binary', 'binary_nan', 'categorical', or 'categorical_nan'.
Parameters
----------
X : array-like of shape (n_samples, n_features)
List of n_features-dimensional data points to fit the measurement model. Each row
corresponds to a single data point. If the data is categorical, by default it should be
0-indexed and integer encoded (not one-hot encoded).
Returns
-------
conditional : array, shape (n_samples, n_columns)
P(Y|X).
"""
if not hasattr(self, "_sm"):
raise ValueError("Calling predict_proba_Y requires a structural model.")
check_is_fitted(self)
X, _ = self._check_x_y(X, None)
_, log_resp = self._e_step(X, Y=None)
return self._sm.predict_proba(log_resp)
[docs] def predict(self, X, Y=None):
"""Predict the cluster/latent class/component labels for the data samples in X.
Optionally, an array-like Y can be provided to predict the labels based on both the measurement and structural
models.
Parameters
----------
X : array-like of shape (n_samples, n_features)
List of n_features-dimensional data points to fit the measurement model. Each row
corresponds to a single data point. If the data is categorical, by default it should be
0-indexed and integer encoded (not one-hot encoded).
Y : array-like of shape (n_samples, n_features_structural), default=None
List of n_features-dimensional data points to fit the structural model. Each row
corresponds to a single data point. If the data is categorical, by default it should be
0-indexed and integer encoded (not one-hot encoded).
Returns
-------
labels : array, shape (n_samples,)
Component labels.
"""
return self.predict_class(X, Y)
[docs] def predict_proba(self, X, Y=None):
"""Predict the latent class probabilities for the data samples in X using the measurement model.
Optionally, an array-like Y can be provided to predict the posterior based on both the measurement and structural
models.
Parameters
----------
X : array-like of shape (n_samples, n_features)
List of n_features-dimensional data points to fit the measurement model. Each row
corresponds to a single data point. If the data is categorical, by default it should be
0-indexed and integer encoded (not one-hot encoded).
Y : array-like of shape (n_samples, n_features_structural), default=None
List of n_features-dimensional data points to fit the structural model. Each row
corresponds to a single data point. If the data is categorical, by default it should be
0-indexed and integer encoded (not one-hot encoded).
Returns
-------
resp : array, shape (n_samples, n_components)
P(class|X, Y) for each sample in X.
"""
return self.predict_proba_class(X, Y)
[docs] def sample(self, n_samples, labels=None):
"""Sample method for fitted StepMix model.
Adapted from the sklearn BaseMixture sample method.
Parameters
----------
n_samples : int
Number of samples.
labels : ndarray of shape (n_samples,)
Predetermined class labels. Will ignore class weights if provided.
Returns
-------
X : array-like of shape (n_samples, n_columns)
Measurement samples.
Y : array-like of shape (n_samples, n_columns_structural)
Structural samples.
labels : ndarray of shape (n_samples,)
Ground truth class membership.
"""
check_is_fitted(self)
# Validate n_samples argument
utils.check_type(int, n_samples=n_samples)
if n_samples < 1:
raise ValueError(
"Invalid value for 'n_samples': %d . The sampling requires at "
"least one sample." % (self.n_components)
)
# Covariate sampling is not supported
# You need to first sample input data, then apply the covariate model to infer weights
if self.structural == "covariate":
raise NotImplementedError(
"Sampling for the covariate model is not implemented."
)
# Sample
# Use measurement model generator
rng = self._mm.random_state
if labels is None:
n_samples_comp = rng.multinomial(n_samples, self.weights_)
else:
classes, n_samples_comp = np.unique(labels, return_counts=True)
X = np.vstack(
[self._mm.sample(c, int(sample)) for c, sample in enumerate(n_samples_comp)]
)
if hasattr(self, "_sm"):
Y = np.vstack(
[
self._sm.sample(c, int(sample))
for c, sample in enumerate(n_samples_comp)
]
)
else:
Y = None
# Also return labels
labels_ret = []
for i, n in enumerate(n_samples_comp):
labels_ret += [i] * n
labels_ret = np.array(labels_ret)
if labels is not None:
# Reorder samples according to provided labels
X_new = np.zeros_like(X)
if Y is not None:
# Optional structural data
Y_new = np.zeros_like(Y)
for i, c in enumerate(classes):
mask = labels_ret == i
mask_labels = labels == c
X_new[mask_labels] = X[mask]
if Y is not None:
# Optional structural data
Y_new[mask_labels] = Y[mask]
labels_ret = labels
X = X_new
if Y is not None:
# Optional structural data
Y = Y_new
else:
# Shuffle everything
shuffle_mask = rng.permutation(X.shape[0])
X, labels_ret = (
X[shuffle_mask],
labels_ret[shuffle_mask],
)
if Y is not None:
# Optional structural data
Y = Y[shuffle_mask]
return X, Y, labels_ret
[docs]class StepMixClassifier(StepMix):
"""StepMix Supervised Classifier
Identical to a StepMix estimator, but we remap predict and predict_proba
to perform inference over Y instead of the latent class. This follows the supervised
learning convention of sklearn.
We call this a classifier since inference over Y is only supported if the structural
model (sm) is set to 'binary', 'binary_nan', 'categorical', or 'categorical_nan'.
Also works with the aliases 'bernoulli', 'bernoulli_nan', 'multinoulli' and 'multinoulli_nan'."""
def _check_initial_parameters(self, X):
utils.check_in(
[
"binary",
"binary_nan",
"bernoulli",
"bernoulli_nan",
"categorical",
"categorical_nan",
"multinoulli",
"multinoulli_nan",
],
structural=self.structural,
)
super()._check_initial_parameters(X)
[docs] def predict(self, X):
"""Call the predict method of the structural model to predict argmax P(Y|X) (Supervised prediction).
Parameters
----------
X : array-like of shape (n_samples, n_features)
List of n_features-dimensional data points to fit the measurement model. Each row
corresponds to a single data point. If the data is categorical, by default it should be
0-indexed and integer encoded (not one-hot encoded).
Returns
-------
predictions : array, shape (n_samples, n_columns)
Y predictions.
"""
return self.predict_Y(X)
[docs] def predict_proba(self, X):
"""Call the predict method of the structural model to predict the full conditional P(Y|X).
Parameters
----------
X : array-like of shape (n_samples, n_features)
List of n_features-dimensional data points to fit the measurement model. Each row
corresponds to a single data point. If the data is categorical, by default it should be
0-indexed and integer encoded (not one-hot encoded).
Returns
-------
conditional : array, shape (n_samples, n_columns)
P(Y|X).
"""
return self.predict_proba_Y(X)