"""Nested emission model with support for multiple random variables."""
import copy
import numpy as np
import pandas as pd
from .emission import Emission
class Nested(Emission):
    """Nested emission model.

    The descriptor must be a dict of dicts, where the nested dicts hold arguments for nested models. Each nested dict
    is expected to have a ``model`` key referring to a valid emission model as well as
    an ``n_columns`` key describing the number of columns (i.e. features for univariate variables or
    features*n_outcomes for one-hot encoded variables) associated with that model.

    For example, a model where the first 3 features are gaussian with unit variance, the next 3 are multinoulli
    with 5 possible outcomes (for a total of 3*5=15 columns) and the last 4 are covariates would be described
    like so:

    .. code-block:: python

        descriptor = {
            'model_1': {
                'model': 'gaussian_unit',
                'n_columns': 3
            },
            'model_2': {
                'model': 'multinoulli',
                'n_columns': 15,
                'n_outcomes': 5
            },
            'model_3': {
                'model': 'covariate',
                'n_columns': 4,
                'method': "newton-raphson",
                'lr': 1e-3,
            }
        }

    The above model would then expect an n_samples x 22 matrix as input (3 + 15 + 4 = 22) where columns follow the
    same order of declaration (i.e., the columns of model_1 are first, columns of model_2 come after etc.).

    As demonstrated by the covariate entry, additional arguments can be specified and are passed to the
    associated Emission class. This is particularly useful to specify optimization parameters for
    covariate models.

    Parameters
    ----------
    descriptor : dict of dict
        One entry per nested model. Each entry must contain the keys ``"model"`` and ``"n_columns"``;
        every other key is forwarded as a keyword argument when building the nested model.
    emission_dict : dict
        Maps the ``"model"`` value of each descriptor entry to a callable that builds the nested model.
        The callable is invoked with ``n_components``, ``random_state`` and the remaining descriptor keys.
    n_components : int
        Number of latent classes, shared by all nested models.
    random_state
        Random state forwarded to the parent class and to every nested model.
    **kwargs
        Accepted for signature compatibility and ignored.

    Attributes
    ----------
    models : dict
        Nested models keyed by their descriptor name, in declaration order.
    columns_per_model : list of int
        Number of input columns consumed by each nested model, aligned with ``models``.
    """

    def __init__(self, descriptor, emission_dict, n_components, random_state, **kwargs):
        super().__init__(n_components, random_state)

        # The loop below pops keys from the nested dicts, so work on a deep copy
        # to leave the caller's descriptor untouched.
        descriptor = copy.deepcopy(descriptor)

        self.models = dict()
        self.columns_per_model = list()
        self.n_components = n_components
        self.random_state = random_state

        # Build the nested models
        for key, item in descriptor.items():
            # Read in model type and the number of columns. Other keys are used as arguments
            model = item.pop("model")
            n_columns = item.pop("n_columns")

            # Build model
            m = emission_dict[model](
                n_components=self.n_components, random_state=self.random_state, **item
            )

            # Save model and its column count; both containers must stay aligned
            # because every other method walks them together with zip().
            self.models[key] = m
            self.columns_per_model.append(n_columns)

    def initialize(self, X, resp, random_state=None):
        """Initialize every nested model on its own slice of columns.

        Parameters
        ----------
        X : array-like
            Two-dimensional input indexed as ``X[:, start:stop]``; columns are consumed
            in the order the nested models were declared.
        resp
            Passed unchanged to each nested model's ``initialize``.
        random_state : optional
            Passed unchanged to each nested model's ``initialize``.
        """
        i = 0
        for m, range_ in zip(self.models.values(), self.columns_per_model):
            # Slice columns to initialize the model only on the appropriate features
            m.initialize(X[:, i : i + range_], resp, random_state)
            i += range_

    def m_step(self, X, resp):
        """Run the M-step of every nested model on its own slice of columns.

        Parameters
        ----------
        X : array-like
            Two-dimensional input indexed as ``X[:, start:stop]``.
        resp
            Passed unchanged to each nested model's ``m_step``.
        """
        i = 0
        for m, range_ in zip(self.models.values(), self.columns_per_model):
            # Slice columns to call the m-step only on the appropriate features
            m.m_step(X[:, i : i + range_], resp)
            i += range_

    def log_likelihood(self, X):
        """Sum the log-likelihoods of all nested models.

        Parameters
        ----------
        X : array-like
            Two-dimensional input indexed as ``X[:, start:stop]``.

        Returns
        -------
        numpy.ndarray
            Array of shape ``(X.shape[0], n_components)`` holding the sum of the
            nested models' log-likelihoods.
        """
        i = 0
        log_eps = np.zeros((X.shape[0], self.n_components))
        for m, range_ in zip(self.models.values(), self.columns_per_model):
            # Slice columns to compute the log-likelihood only on the appropriate columns
            log_eps += m.log_likelihood(X[:, i : i + range_])
            i += range_

        return log_eps

    def sample(self, class_no, n_samples):
        """Sample from every nested model and stack the results column-wise.

        Parameters
        ----------
        class_no
            Passed unchanged to each nested model's ``sample``.
        n_samples
            Passed unchanged to each nested model's ``sample``.

        Returns
        -------
        numpy.ndarray
            ``np.hstack`` of the per-model samples, in declaration order.
        """
        return np.hstack(
            [m.sample(class_no, n_samples) for m in self.models.values()]
        )

    def get_parameters(self):
        """Return the parameters of every nested model.

        Returns
        -------
        dict
            Maps each nested model name to the result of its ``get_parameters``.
        """
        return {key: m.get_parameters() for key, m in self.models.items()}

    def set_parameters(self, parameters):
        """Set the parameters of nested models.

        Parameters
        ----------
        parameters : dict
            Maps nested model names to the parameters handed to that model's
            ``set_parameters``. Names absent from ``self.models`` raise ``KeyError``.
        """
        for key, item in parameters.items():
            self.models[key].set_parameters(item)

    def print_parameters(self, indent=1, feature_names=None):
        """Print the parameters of every nested model.

        Parameters
        ----------
        indent : int, default=1
            Passed unchanged to each nested model's ``print_parameters``.
        feature_names : sequence, optional
            One name per input column. When ``None``, names come from
            ``get_default_feature_names`` for the total number of columns.
        """
        if feature_names is None:
            n_columns = sum(self.columns_per_model)
            feature_names = self.get_default_feature_names(n_columns)

        i = 0
        for (name, m), range_ in zip(self.models.items(), self.columns_per_model):
            # Slice parameter names to get the right column names for this submodel
            f_i = feature_names[i : i + range_]
            m.print_parameters(indent, model_name=name, feature_names=f_i)
            i += range_

    @property
    def n_parameters(self):
        """Total number of parameters, summed over the nested models.

        Exposed as a property because nested models are themselves read through
        the ``n_parameters`` attribute.
        """
        return sum(m.n_parameters for m in self.models.values())

    def permute_classes(self, perm, axis=0):
        """Apply the same class permutation to every nested model.

        Parameters
        ----------
        perm
            Passed unchanged to each nested model's ``permute_classes``.
        axis : int, default=0
            Passed unchanged to each nested model's ``permute_classes``.
        """
        for item in self.models.values():
            item.permute_classes(perm, axis)

    def get_parameters_df(self, feature_names=None):
        """Collect the parameters of every nested model in a single DataFrame.

        Parameters
        ----------
        feature_names : sequence, optional
            One name per input column. When ``None``, names come from
            ``get_default_feature_names`` for the total number of columns.

        Returns
        -------
        pandas.DataFrame
            Concatenation of the per-model DataFrames, each with its
            ``"model_name"`` column set to the nested model's name.
        """
        df_list = list()

        if feature_names is None:
            n_columns = sum(self.columns_per_model)
            feature_names = self.get_default_feature_names(n_columns)

        i = 0
        for (name, m), range_ in zip(self.models.items(), self.columns_per_model):
            # Slice parameter names to get the right column names for this submodel
            f_i = feature_names[i : i + range_]
            df_i = m.get_parameters_df(f_i)
            df_i["model_name"] = name
            df_list.append(df_i)
            i += range_

        return pd.concat(df_list)