"""Nested emission model with support for multiple random variables."""
import copy
import numpy as np
import pandas as pd
from .emission import Emission
class Nested(Emission):
    """Nested emission model.

    Combines several emission models, each one responsible for a contiguous
    group of columns of the input matrix.

    The descriptor must be a dict of dicts, where the nested dicts hold
    arguments for nested models. Each nested dict is expected to have a
    ``model`` key referring to a valid emission model as well as an
    ``n_columns`` key describing the number of columns (i.e. features for
    univariate variables or features*n_outcomes for one-hot encoded variables)
    associated with that model.

    For example, a model where the first 3 features are gaussian with unit
    variance, the next 3 are multinoulli with 5 possible outcomes (for a total
    of 3*5=15 columns) and the last 4 are covariates would be described
    like so:

    .. code-block:: python

        descriptor = {
            'model_1': {
                'model': 'gaussian_unit',
                'n_columns': 3
            },
            'model_2': {
                'model': 'multinoulli',
                'n_columns': 15,
                'n_outcomes': 5
            },
            'model_3': {
                'model': 'covariate',
                'n_columns': 4,
                'method': "newton-raphson",
                'lr': 1e-3,
            }
        }

    The above model would then expect an n_samples x 22 matrix as input
    (3 + 15 + 4 = 22) where columns follow the same order of declaration
    (i.e., the columns of model_1 are first, columns of model_2 come after
    etc.).

    As demonstrated by the covariate argument, additional arguments can be
    specified and are passed to the associated Emission class. Particularly
    useful to specify optimization parameters for
    :class:`stepmix.emission.covariate.Covariate`.

    Parameters
    ----------
    descriptor : dict
        Dict of dicts describing the nested models, as explained above. It is
        deep-copied, so the caller's object is never modified.
    emission_dict : dict
        Mapping from the ``model`` string of each nested dict to a callable
        building the model. The callable is invoked with ``n_components``,
        ``random_state`` and every remaining key of the nested dict as
        keyword arguments.
    n_components : int
        Number of latent classes, forwarded to the base class and to every
        nested model.
    random_state
        Random state, forwarded to the base class and to every nested model.
    **kwargs
        Accepted and ignored.

    Attributes
    ----------
    models : dict
        Nested model instances keyed by their descriptor name, in declaration
        order.
    columns_per_model : list
        ``n_columns`` of each nested model, in the same order as ``models``.
    """

    def __init__(self, descriptor, emission_dict, n_components, random_state, **kwargs):
        super(Nested, self).__init__(n_components, random_state)

        # Work on a deep copy: "model" and "n_columns" are popped from the
        # nested dicts below and must not disappear from the caller's object.
        descriptor = copy.deepcopy(descriptor)

        self.models = dict()
        self.columns_per_model = list()
        self.n_components = n_components
        self.random_state = random_state

        # Build the nested models
        for key, item in descriptor.items():
            # Read in model type and the number of columns. Whatever is left in
            # the nested dict is passed as keyword arguments to the model.
            model = item.pop("model")
            n_columns = item.pop("n_columns")

            self.models[key] = emission_dict[model](
                n_components=self.n_components,
                random_state=self.random_state,
                **item
            )
            self.columns_per_model.append(n_columns)

    def _iter_column_slices(self):
        """Yield ``(name, model, columns)`` for every nested model.

        ``columns`` is the slice of consecutive column indices owned by the
        model, computed by accumulating ``columns_per_model`` in declaration
        order.
        """
        start = 0
        for (name, model), width in zip(self.models.items(), self.columns_per_model):
            stop = start + width
            yield name, model, slice(start, stop)
            start = stop

    def initialize(self, X, resp, random_state=None):
        """Initialize every nested model on its own columns of ``X``.

        ``resp`` and ``random_state`` are passed unchanged to each nested
        model's ``initialize``.
        """
        for _, model, columns in self._iter_column_slices():
            model.initialize(X[:, columns], resp, random_state)

    def m_step(self, X, resp):
        """Run the M-step of every nested model on its own columns of ``X``.

        ``resp`` is passed unchanged to each nested model's ``m_step``.
        """
        for _, model, columns in self._iter_column_slices():
            model.m_step(X[:, columns], resp)

    def log_likelihood(self, X):
        """Return the sum of the nested models' log-likelihoods.

        Each nested model is evaluated on its own columns of ``X`` and the
        results are accumulated into an array of shape
        ``(X.shape[0], n_components)``.
        """
        log_eps = np.zeros((X.shape[0], self.n_components))
        for _, model, columns in self._iter_column_slices():
            log_eps += model.log_likelihood(X[:, columns])
        return log_eps

    def sample(self, class_no, n_samples):
        """Sample from every nested model and stack the results column-wise.

        The samples are concatenated with ``np.hstack`` in declaration order
        of the nested models.
        """
        return np.hstack(
            [model.sample(class_no, n_samples) for model in self.models.values()]
        )

    def get_parameters(self):
        """Return a dict mapping each nested model name to its parameters."""
        return {name: model.get_parameters() for name, model in self.models.items()}

    def set_parameters(self, parameters):
        """Set the parameters of the nested models.

        ``parameters`` maps nested model names to the object passed to that
        model's ``set_parameters``. Only the models named in ``parameters``
        are updated; a name that is not in ``self.models`` raises ``KeyError``.
        """
        for name, model_parameters in parameters.items():
            self.models[name].set_parameters(model_parameters)

    def print_parameters(self, indent=1, feature_names=None):
        """Print the parameters of every nested model.

        ``feature_names`` holds one name per column of the full input. When it
        is ``None``, names come from ``self.get_default_feature_names`` called
        with the total number of columns. Each nested model receives only the
        names of its own columns.
        """
        if feature_names is None:
            n_columns = sum(self.columns_per_model)
            feature_names = self.get_default_feature_names(n_columns)

        for name, model, columns in self._iter_column_slices():
            # Slice feature names to get the right column names for this submodel
            model.print_parameters(
                indent, model_name=name, feature_names=feature_names[columns]
            )
            # NOTE(review): the original indentation of this separator was lost
            # in SOURCE; it is assumed to follow each submodel. Confirm.
            print("\n")

    @property
    def n_parameters(self):
        """Total number of parameters, summed over the nested models."""
        return sum(model.n_parameters for model in self.models.values())

    def permute_classes(self, perm, axis=0):
        """Permute the latent classes of every nested model.

        ``axis`` is accepted for interface compatibility but is deliberately
        not forwarded: every nested model is called with ``perm`` only, so
        whatever axis default the nested model defines applies.
        """
        for model in self.models.values():
            model.permute_classes(perm)

    def get_parameters_df(self, feature_names=None):
        """Return the parameters of all nested models as a single DataFrame.

        ``feature_names`` is handled as in ``print_parameters``. The DataFrame
        returned by each nested model's ``get_parameters_df`` gets a
        ``model_name`` column holding the nested model name, and the frames
        are concatenated with ``pd.concat``.
        """
        if feature_names is None:
            n_columns = sum(self.columns_per_model)
            feature_names = self.get_default_feature_names(n_columns)

        df_list = list()
        for name, model, columns in self._iter_column_slices():
            # Slice feature names to get the right column names for this submodel
            df_i = model.get_parameters_df(feature_names[columns])
            df_i["model_name"] = name
            df_list.append(df_i)
        return pd.concat(df_list)