Source code for stepmix.datasets

"""Various synthetic datasets."""
import warnings

import numpy as np
from sklearn.utils.validation import check_random_state
from scipy.special import softmax
import pandas as pd

from .stepmix import StepMix


[docs]def random_nan(X, Y, nan_ratio, random_state=None): """Randomly replace values in X and Y with NaNs with probability nan_ratio.""" (n_samples, n_mm), n_sm = X.shape, Y.shape[1] rng = np.random.default_rng(random_state) observed_mask = rng.random((n_samples, n_mm + n_sm)) > nan_ratio if not observed_mask.sum(axis=1).all(): warnings.warn( "Some samples are completely unobserved. This will likely result in downstream errors. Reduce the nan_ratio or try another" " seed." ) mm_mask, sm_mask = observed_mask[:, :n_mm], observed_mask[:, n_mm:] X = np.where(mm_mask, X, np.nan) Y = np.where(sm_mask, Y, np.nan) return X, Y
[docs]def bakk_measurements(n_classes, n_mm, sep_level): """Binary measurement parameters in Bakk 2018. Parameters ---------- n_classes: int Number of latent classes. Use 3 for the paper simulation. n_mm: int Number of features in the measurement model. Use 6 for the paper simulation. sep_level : float Separation level in the measurement data. Use .7, .8 or .9 for the paper simulation. Returns ------- pis : ndarray of shape (n_mm, n_classes) Conditional bernoulli probabilities. """ pis = np.zeros((n_mm, n_classes)) pis[:, 0] = sep_level pis[: int(n_mm / 2), 1] = sep_level pis[int(n_mm / 2) :, 1] = 1 - sep_level pis[:, 2] = 1 - sep_level return pis
[docs]def data_bakk_response(n_samples, sep_level, n_classes=3, n_mm=6, random_state=None): """Simulated data for the response simulations in Bakk 2018. Parameters ---------- n_samples : int Number of samples. sep_level : float Separation level in the measurement data. Use .7, .8 or .9 for the paper simulation. n_classes: int Number of latent classes. Use 3 for the paper simulation. n_mm: int Number of features in the measurement model. Use 6 for the paper simulation. random_state: int Random state. Returns ------- X : ndarray of shape (n_samples, n_mm) Binary measurement samples. Y : ndarray of shape (n_samples, 1) Response structural samples. labels : ndarray of shape (n_samples,) Ground truth class membership. References ---------- Bakk, Z. and Kuha, J. Two-step estimation of models between latent classes and external variables. Psychometrika, 83(4):871–892, 2018 """ n_sm = 1 # Always 1 structural feature # Measurement probabilities pis = bakk_measurements(n_classes, n_mm, sep_level) # Structural means means = [[-1], [1], [0]] # Model parameters params = dict( weights=np.ones(n_classes) / n_classes, measurement=dict(pis=pis.T), structural=dict(means=np.array(means)), measurement_in=n_mm, structural_in=n_sm, ) # Sample data generator = StepMix( n_components=n_classes, measurement="bernoulli", structural="gaussian_unit", random_state=random_state, ) generator.set_parameters(params) X, Y, labels = generator.sample(n_samples) return X, Y, labels
[docs]def data_bakk_covariate(n_samples, sep_level, n_mm=6, random_state=None): """Simulated data for the covariate simulations in Bakk 2018. Parameters ---------- n_samples : int Number of samples. sep_level : float Separation level in the measurement data. Use .7, .8 or .9 for the paper simulation. n_mm: int Number of features in the measurement model. Use 6 for the paper simulation. random_state: int Random state. Returns ------- X : ndarray of shape (n_samples, n_mm) Binary measurement samples. Y : ndarray of shape (n_samples, 1) Covariate structural samples. labels : ndarray of shape (n_samples,) Ground truth class membership. References ---------- Bakk, Z. and Kuha, J. Two-step estimation of models between latent classes and external variables. Psychometrika, 83(4):871–892, 2018 """ rng = check_random_state(random_state) n_sm = 1 # Always one structural feature n_classes = 3 # Always 3 latent classes # Regression parameters beta = np.array([0, -1, 1]) # Intercepts are adjusted so that class sizes are equal in average intercepts = np.array([0.0, 2.34459467, -3.65540533]) # Sample covariate # Uniform integer 1-5 (see p.15 Bakk 2017) Y = rng.randint(low=1, high=6, size=(n_samples, n_sm)) # Scores logits = Y * beta.reshape((1, -1)) + intercepts # Latent class (Ground truth class membership) # probabilistic assignment ( realizations from the distribution of the r.v label_i|Y_i ) probas = softmax( logits, axis=1 ) # probas[n,c] = probability for unit n to be assigned to class c cumul_probas = np.cumsum(probas, axis=1) bool_tab = (cumul_probas - np.tile(rng.rand(n_samples), (n_classes, 1)).T) >= 0 labels = -np.sum(bool_tab, axis=1) + n_classes # labels = probas.argmax(axis=1) # Old deterministic assignment # Measurement probabilities pis = bakk_measurements(n_classes, n_mm, sep_level) # Measurement model parameters params = dict( weights=np.ones(n_classes) / n_classes, # Spoof. Will be ignored measurement=dict(pis=pis.T), measurement_in=n_mm, ) # Sample data generator = StepMix( n_components=n_classes, measurement="bernoulli", random_state=random_state ) generator.set_parameters(params) X, _, labels_new = generator.sample(n_samples, labels=labels) return X, Y, labels
[docs]def data_bakk_complete(n_samples, sep_level, n_mm=6, random_state=None, nan_ratio=0.0): """Stitch together data_bakk_covariate and data_bakk_response to get a complete model.""" # Get covariate data X, Y_p, labels = data_bakk_covariate(n_samples, sep_level, n_mm, random_state) # Now use the labels to conditionally sample from a response model means = [[-1], [1], [0]] # Model parameters params = dict( weights=np.ones(3) / 3, measurement=dict(means=np.array(means)), measurement_in=1, ) # Sample data generator = StepMix( n_components=3, measurement="gaussian_unit", random_state=random_state, ) generator.set_parameters(params) Y_o, _, _ = generator.sample(n_samples, labels=labels) # Drop some values in measurements and response if nan_ratio: X, Y_o = random_nan(X, Y_o, nan_ratio, random_state=random_state) return X, np.hstack((Y_p, Y_o)), labels
[docs]def data_bakk_complex(n_samples, sep_level, random_state=None, nan_ratio=0.0): """Build a simulated example with mixed data and missing values. Measurements: 3 binary variables + 1 continuous variable. Structural: 3 binary response variables + 1 continuous response variable + 1 covariate. Missing values everywhere except in the covariate. Return data as a dataframe. """ # Get covariate data X, Y_p, labels = data_bakk_covariate(n_samples, sep_level, 6, random_state) # Now use the labels to conditionally sample from a response model means = [[-1, 2], [1, -2], [0, 0]] # Model parameters params = dict( weights=np.ones(3) / 3, measurement=dict(means=np.array(means)), measurement_in=2, ) # Sample data generator = StepMix( n_components=3, measurement="gaussian_unit", random_state=random_state, ) generator.set_parameters(params) Y_o, _, _ = generator.sample(n_samples, labels=labels) # Drop some values in measurements and response if nan_ratio: X, Y_o = random_nan(X, Y_o, nan_ratio, random_state=random_state) # Dispatch binary variables in measurement and structural models X_mixed = np.hstack((X[:, :3], Y_o[:, [1]])) Y_mixed = np.hstack((X[:, 3:], Y_o[:, [0]], Y_p)) # Return as dataframes X_mixed = pd.DataFrame( X_mixed, columns=[ "Binary Measurement 1", "Binary Measurement 2", "Binary Measurement 3", "Continuous Measurement 1", ], ) Y_mixed = pd.DataFrame( Y_mixed, columns=[ "Binary Outcome 1", "Binary Outcome 2", "Binary Outcome 3", "Continuous Outcome 1", "Covariate 1", ], ) return X_mixed, Y_mixed, labels
# Data generation: Simulated problems from IFT6269 Hwk 4
[docs]def data_generation_gaussian(n_samples, sep_level, n_mm=6, random_state=None): """Bakk binary measurement model with more complex gaussian structural model. Parameters ---------- n_samples : int Number of samples. sep_level : float Separation level in the measurement data. Use .7, .8 or .9 for the paper simulation. n_mm: int Number of features in the measurement model. Use 6 for the paper simulation. random_state: int Random state. Returns ------- X : ndarray of shape (n_samples, n_mm) Binary Measurement samples. Y : ndarray of shape (n_samples, 2) Gaussian Structural samples. labels : ndarray of shape (n_samples,) Ground truth class membership. """ n_classes = 4 # Number of latent classes n_sm = 2 # Dimensions of the response variable Zo # True parameters # rho[c] = p(X=c) rho = np.ones(n_classes) / n_classes # mus[k] = E[Z0|X=c] # mus[k] = E[Z0|X=c] mus = np.array( [[-2.0344, 4.1726], [3.9779, 3.7735], [3.8007, -3.7972], [-3.0620, -3.5345]] ) # sigmas[k] = V[Z0|X=c] sigmas = np.array( [ [[2.9044, 0.2066], [0.2066, 2.7562]], [[0.2104, 0.2904], [0.2904, 12.2392]], [[0.9213, 0.0574], [0.0574, 1.8660]], [[6.2414, 6.0502], [6.0502, 6.1825]], ] ) # pis[k,c] = p(Yk=1|X=c) # sep_level = 0.9 #0.9->high, 0.8->medium, 0.7->low pis = bakk_measurements(n_classes, n_mm, sep_level) # Model parameters params = dict( weights=rho, measurement=dict(pis=pis.T), structural=dict(means=mus, covariances=sigmas), measurement_in=n_mm, structural_in=n_sm, ) # Sample data generator = StepMix( n_components=n_classes, measurement="bernoulli", structural="gaussian_full", random_state=random_state, ) generator.set_parameters(params) X, Y, labels = generator.sample(n_samples) return X, Y, labels
[docs]def data_gaussian_binary(n_samples, random_state=None): """Full Gaussian measurement model with 2 binary responses. The data has 4 latent classes. Parameters ---------- n_samples : int Number of samples. random_state: int Random state. Returns ------- X : ndarray of shape (n_samples, 2) Gaussian Measurement samples. Y : ndarray of shape (n_samples, 2) Binary Structural samples. labels : ndarray of shape (n_samples,) Ground truth class membership. """ n_classes = 4 # Number of latent classes # True parameters # rho[c] = p(X=c) rho = np.ones(n_classes) / n_classes # mus[k] = E[Z0|X=c] # mus[k] = E[Z0|X=c] mus = np.array( [[-2.0344, 4.1726], [3.9779, 3.7735], [3.8007, -3.7972], [-3.0620, -3.5345]] ) # sigmas[k] = V[Z0|X=c] sigmas = np.array( [ [[2.9044, 0.2066], [0.2066, 2.7562]], [[0.2104, 0.2904], [0.2904, 12.2392]], [[0.9213, 0.0574], [0.0574, 1.8660]], [[6.2414, 6.0502], [6.0502, 6.1825]], ] ) pis = np.array([[0.1, 0.9, 0.1, 0.9], [0.9, 0.9, 0.1, 0.1]]) # Model parameters params = dict( weights=rho, measurement=dict(pis=pis.T), structural=dict(means=mus, covariances=sigmas), measurement_in=2, structural_in=2, ) # Sample data generator = StepMix( n_components=n_classes, measurement="bernoulli", structural="gaussian_full", random_state=random_state, ) generator.set_parameters(params) Y, X, labels = generator.sample(n_samples) return X, Y, labels
[docs]def data_gaussian_categorical(n_samples, random_state=None): """Full Gaussian measurement model with 2 categorical responses. The data has 4 latent classes. Parameters ---------- n_samples : int Number of samples. random_state: int Random state. Returns ------- X : ndarray of shape (n_samples, 2) Gaussian Measurement samples. Y : ndarray of shape (n_samples, 2) Categorical Structural samples. labels : ndarray of shape (n_samples,) Ground truth class membership. """ n_classes = 4 # Number of latent classes # True parameters # rho[c] = p(X=c) rho = np.ones(n_classes) / n_classes # mus[k] = E[Z0|X=c] # mus[k] = E[Z0|X=c] mus = np.array( [[-2.0344, 4.1726], [3.9779, 3.7735], [3.8007, -3.7972], [-3.0620, -3.5345]] ) # sigmas[k] = V[Z0|X=c] sigmas = np.array( [ [[2.9044, 0.2066], [0.2066, 2.7562]], [[0.2104, 0.2904], [0.2904, 12.2392]], [[0.9213, 0.0574], [0.0574, 1.8660]], [[6.2414, 6.0502], [6.0502, 6.1825]], ] ) # 4 classes x (2 categorical variables x 3 outcomes) = 4 x 6 pis = np.array( [ [0.8, 0.1, 0.1, 0.9, 0.1, 0.0], [0.1, 0.1, 0.8, 0.9, 0.0, 0.1], [0.1, 0.8, 0.1, 0.1, 0.9, 0.0], [0.1, 0.1, 0.8, 0.1, 0.0, 0.9], ] ) # Model parameters params = dict( weights=rho, measurement=dict(means=mus, covariances=sigmas), structural=dict(pis=pis, max_n_outcomes=3, total_outcomes=2), measurement_in=2, structural_in=2, ) # Sample data generator = StepMix( n_components=n_classes, measurement="gaussian_full", structural="categorical", random_state=random_state, ) generator.set_parameters(params) X, Y, labels = generator.sample(n_samples) return X, Y, labels
[docs]def data_gaussian_diag(n_samples, sep_level, n_mm=6, random_state=None, nan_ratio=0.0): """Bakk binary measurement model with 2D diagonal gaussian structural model. Optionally, a random proportion of values can be replaced with missing values to test FIML models. Parameters ---------- n_samples : int Number of samples. sep_level : float Separation level in the measurement data. Use .7, .8 or .9 for the paper simulation. n_mm: int Number of features in the measurement model. Use 6 for the paper simulation. random_state: int Random state. nan_ratio: float Ratio of values to replace with missing values. Returns ------- X : ndarray of shape (n_samples, n_mm) Binary ,easurement samples. Y : ndarray of shape (n_samples, 2) Gaussian structural samples. labels : ndarray of shape (n_samples,) Ground truth class membership. """ n_classes = 3 # Number of latent classes n_sm = 2 # Dimensions of the response variable Zo # True parameters # rho[c] = p(X=c) rho = np.ones(n_classes) / n_classes # mus[k] = E[Z0|X=c] # mus[k] = E[Z0|X=c] mus = np.array([[0.0, 0.0], [5.0, 5.0], [-5.0, -5.0]]) # sigmas[k] = V[Z0|X=c] sigmas = np.array( [[[1.0, 0.0], [0.0, 1.0]], [[2.0, 0.0], [0.0, 2.0]], [[1.0, 0.0], [0.0, 3.0]]] ) # pis[k,c] = p(Yk=1|X=c) # sep_level = 0.9 #0.9->high, 0.8->medium, 0.7->low pis = bakk_measurements(n_classes, n_mm, sep_level) # Model parameters params = dict( weights=rho, measurement=dict(pis=pis.T), structural=dict(means=mus, covariances=sigmas), measurement_in=n_mm, structural_in=n_sm, ) # Sample data generator = StepMix( n_components=n_classes, measurement="bernoulli", structural="gaussian_full", random_state=random_state, ) generator.set_parameters(params) X, Y, labels = generator.sample(n_samples) # Drop some values if nan_ratio: rng = np.random.default_rng(random_state) observed_mask = rng.random((n_samples, n_mm + n_sm)) > nan_ratio if not observed_mask.sum(axis=1).all(): warnings.warn( "Some samples are completely unobserved. This will likely result in downstream errors. Reduce the nan_ratio or try another" "seed." ) mm_mask, sm_mask = observed_mask[:, :n_mm], observed_mask[:, n_mm:] X = np.where(mm_mask, X, np.nan) Y = np.where(sm_mask, Y, np.nan) return X, Y, labels